bus

package module
v0.0.0-...-2652a1e Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 16, 2022 License: MIT Imports: 13 Imported by: 4

README

Bus

标准消息总线 Standard Message Bus

Sender

消息发送器, 封装发送细节 (错误处理, 事务消息)

Handler

消息处理器, 封装处理细节 (错误处理, 幂等性, 延迟重试, 死信存储)

处理流程图

tx_flow

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type DLStorageInterface

type DLStorageInterface interface {
	// Store 存储队列中无法处理的消息内容
	Store(queue string, data []byte) error
	// Fetch 取出可重试的消息内容
	Fetch(queue string) (map[string][]byte, error)
	// Remove 根据标识移除内容
	Remove(id string) error
}

DLStorageInterface 死信存储接口

type DriverInterface

type DriverInterface interface {
	// CreateQueue 创建队列
	// name 队列名称, 确保唯一
	// delay 队列消息延迟时长, 指定时长后方可被消费者获取
	CreateQueue(name string, delay time.Duration) error

	// CreateTopic 创建主题
	// name 主题名称, 确保唯一
	CreateTopic(name string) error

	// Subscribe 订阅主题
	// topic 订阅的主题名称
	// queue 消息流转队列名称
	// routeKey 路由键, 匹配的消息才会被路由到队列
	// 请格外注意, 关于routeKey的使用具体实现会有不同
	Subscribe(topic, queue, routeKey string) error

	// UnSubscribe 取消订阅, 参数同Subscribe
	UnSubscribe(topic, queue, routeKey string) error

	// SendToQueue 发送消息至队列
	// queue 发送目标队列名称
	// content 发送消息字节内容
	// delay 消息延迟时长, 优先级高于CreateQueue时指定的延迟
	SendToQueue(queue string, content []byte, delay time.Duration) error

	// SendToTopic 发送消息至主题
	// topic 发送目标主题名称
	// content 发送消息字节内容
	// routeKey 路由键, 仅路由到匹配的订阅队列
	SendToTopic(topic string, content []byte, routeKey string) error

	// ReceiveMessage 监听队列获取消息
	// ctx 上下文, 用于中断监听
	// queue 接受消息的队列名称
	// errChan 异常错误传输通道
	// handler 消息回调处理函数
	ReceiveMessage(ctx context.Context, queue string, errChan chan error, handler func([]byte) bool)
}

DriverInterface 驱动接口

type Handler

type Handler struct {
	sync.Once
	Context context.Context

	// Queue 处理队列的名称
	Queue string

	// Delay 消息处理延迟时长
	Delay time.Duration

	// Subscribe 订阅配置
	Subscribe Subscribe

	// Driver 驱动实例
	Driver DriverInterface

	// Logger 异常日志
	Logger LoggerInterface

	// DLStorage 死信存储
	// 无法处理的消息最终流转到这里
	DLStorage DLStorageInterface

	// Idempotent 幂等判断实现
	// 防止消息被重复处理保证数据一致性
	// 若幂等性判断自身异常则可能导致判断失效
	// 因此再严格一致的场景下配置EnsureFn进行二次确认
	Idempotent IdempotentInterface

	// HandleFunc 消息处理回调函数
	// 若返回值为true则表示处理成功, 将删除该消息
	// 若返回值为false则表示处理失败, 消息将延迟重试
	HandleFunc func(msg *Message) (done bool)

	// EnsureFunc 幂等性的二次确认
	// 请一定要注意布尔返回值的代表含义
	// 若返回值为true表示未处理, 即允许处理
	// 若返回值为false表示已处理, 即不允许处理
	// 若使用场景不严格要求数据一致的可以不用配置
	EnsureFunc func(msg *Message) (allow bool)

	// RetryDelay 重试延迟机制
	// 返回值为重试间隔, 若 < 0 则代表不进行重试
	RetryDelay func(attempts int) time.Duration
	// contains filtered or unexported fields
}

Handler 消息处理器

func (*Handler) Prepare

func (h *Handler) Prepare() *Handler

Prepare 准备就绪

func (*Handler) Run

func (h *Handler) Run()

Run 启动处理器

func (*Handler) RunCtx

func (h *Handler) RunCtx(ctx context.Context)

RunCtx 启动处理器

func (*Handler) Wait

func (h *Handler) Wait()

Wait 等待退出

type HandlerOpt

type HandlerOpt func(h *Handler)

func HandlerDelay

func HandlerDelay(delay time.Duration) HandlerOpt

type IdempotentInterface

type IdempotentInterface interface {
	// Acquire 获取key的操作权
	// 若返回值为true表示获取成功, 允许操作
	// 若返回值为false表示获取失败, 不允许操作
	Acquire(key string) (bool, error)

	// Release 释放key的操作权
	Release(key string) error
}

IdempotentInterface 幂等性接口

type LoggerInterface

type LoggerInterface interface {
	Errorf(format string, args ...interface{})
}

type Message

type Message struct {
	// BizUID 消息唯一标识
	// 无特殊业务含义, 通常用于幂等性处理防止重复消费
	BizUID string `json:"b,omitempty"`

	// Payload 原始消息内容
	Payload []byte `json:"p,omitempty"`

	// Retried 记录消息重试次数
	Retried int `json:"r,omitempty"`

	// RouteKey 路由键
	RouteKey string `json:"k,omitempty"`
}

Message 消息结构体

func MessageAutoId

func MessageAutoId(payload interface{}, routeKey string) *Message

MessageAutoId 实例化消息

func MessageWithId

func MessageWithId(id string, payload interface{}, routeKey string) *Message

MessageWithId 实例化消息

func (*Message) Scan

func (m *Message) Scan(dest interface{})

Scan 将消息内容赋值给目标参数

type Sender

type Sender struct {
	sync.Once

	// Topic 发送主题
	Topic string

	// Driver 驱动实例
	Driver DriverInterface

	// Logger 异常日志
	Logger LoggerInterface

	// TxOptions 事务配置
	TxOptions *TxOptions
	// contains filtered or unexported fields
}

Sender 发送器

func (*Sender) Prepare

func (s *Sender) Prepare() *Sender

Prepare 创建主题和日志队列

func (*Sender) Send

func (s *Sender) Send(msg *Message, localTx ...func() error) (err error)

Send 发送消息 msg 发送的消息结构体 localTx 本地事务执行函数

func (*Sender) Wait

func (s *Sender) Wait()

Wait 等待退出

type Subscribe

type Subscribe struct {
	// Topic 订阅主题
	Topic string

	// RouteKey 路由键
	RouteKey string
}

Subscribe 处理器订阅

type TXStorageInterface

type TXStorageInterface interface {
	// Store 将消息预存
	// id 返回存储后的唯一标识
	Store(data []byte) (id string, err error)

	// Fetch 根据标识取出消息
	Fetch(id string) (data []byte, err error)

	// Remove 根据标识移除消息
	Remove(id string) error
}

TXStorageInterface 预发存储接口

type TxOptions

type TxOptions struct {
	Context context.Context
	// Timeout 事务处理时长
	// 启用事务的消息不会立即发布给消费者
	// 当本地事务回调执行返回true才会正式发布
	// 详见事务流程图 ./tx_flow.png
	Timeout time.Duration

	// EnsureFunc 事务完成确认
	// 请一定要注意布尔返回值的代表含义
	// 若返回值为true则表示事务已处理, 发布消息
	// 若返回值为false则表示事务未处理, 撤销消息
	EnsureFunc func(msg *Message) (done bool)

	// RetryDelay 重试延迟机制
	// 返回值为重试间隔, 若 < 0 则代表不进行重试
	RetryDelay func(attempts int) time.Duration

	// TxStorage 事务消息存储
	TxStorage TXStorageInterface
	// contains filtered or unexported fields
}

TxOptions 事务配置

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL