Documentation ¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type DLStorageInterface ¶
type DLStorageInterface interface { // Store 存储队列中无法处理的消息内容 Store(queue string, data []byte) error // Fetch 取出可重试的消息内容 Fetch(queue string) (map[string][]byte, error) // Remove 根据标识移除内容 Remove(id string) error }
DLStorageInterface 死信存储接口
type DriverInterface ¶
type DriverInterface interface { // CreateQueue 创建队列 // name 队列名称, 确保唯一 // delay 队列消息延迟时长, 指定时长后方可被消费者获取 CreateQueue(name string, delay time.Duration) error // CreateTopic 创建主题 // name 主题名称, 确保唯一 CreateTopic(name string) error // Subscribe 订阅主题 // topic 订阅的主题名称 // queue 消息流转队列名称 // routeKey 路由键, 匹配的消息才会被路由到队列 // 请格外注意, 关于routeKey的使用具体实现会有不同 Subscribe(topic, queue, routeKey string) error // UnSubscribe 取消订阅, 参数同Subscribe UnSubscribe(topic, queue, routeKey string) error // SendToQueue 发送消息至队列 // queue 发送目标队列名称 // content 发送消息字节内容 // delay 消息延迟时长, 优先级高于CreateQueue时指定的延迟 SendToQueue(queue string, content []byte, delay time.Duration) error // SendToTopic 发送消息至主题 // topic 发送目标主题名称 // content 发送消息字节内容 // routeKey 路由键, 仅路由到匹配的订阅队列 SendToTopic(topic string, content []byte, routeKey string) error // ReceiveMessage 监听队列获取消息 // ctx 上下文, 用于中断监听 // queue 接受消息的队列名称 // errChan 异常错误传输通道 // handler 消息回调处理函数 ReceiveMessage(ctx context.Context, queue string, errChan chan error, handler func([]byte) bool) }
DriverInterface 驱动接口
type Handler ¶
type Handler struct { sync.Once Context context.Context // Queue 处理队列的名称 Queue string // Delay 消息处理延迟时长 Delay time.Duration // Subscribe 订阅配置 Subscribe Subscribe // Driver 驱动实例 Driver DriverInterface // Logger 异常日志 Logger LoggerInterface // DLStorage 死信存储 // 无法处理的消息最终流转到这里 DLStorage DLStorageInterface // Idempotent 幂等判断实现 // 防止消息被重复处理保证数据一致性 // 若幂等性判断自身异常则可能导致判断失效 // 因此再严格一致的场景下配置EnsureFn进行二次确认 Idempotent IdempotentInterface // HandleFunc 消息处理回调函数 // 若返回值为true则表示处理成功, 将删除该消息 // 若返回值为false则表示处理失败, 消息将延迟重试 HandleFunc func(msg *Message) (done bool) // EnsureFunc 幂等性的二次确认 // 请一定要注意布尔返回值的代表含义 // 若返回值为true表示未处理, 即允许处理 // 若返回值为false表示已处理, 即不允许处理 // 若使用场景不严格要求数据一致的可以不用配置 EnsureFunc func(msg *Message) (allow bool) // RetryDelay 重试延迟机制 // 返回值为重试间隔, 若 < 0 则代表不进行重试 RetryDelay func(attempts int) time.Duration // contains filtered or unexported fields }
Handler 消息处理器
type HandlerOpt ¶
type HandlerOpt func(h *Handler)
func HandlerDelay ¶
func HandlerDelay(delay time.Duration) HandlerOpt
type IdempotentInterface ¶
type IdempotentInterface interface { // Acquire 获取key的操作权 // 若返回值为true表示获取成功, 允许操作 // 若返回值为false表示获取失败, 不允许操作 Acquire(key string) (bool, error) // Release 释放key的操作权 Release(key string) error }
IdempotentInterface 幂等性接口
type LoggerInterface ¶
type LoggerInterface interface {
Errorf(format string, args ...interface{})
}
type Message ¶
type Message struct { // BizUID 消息唯一标识 // 无特殊业务含义, 通常用于幂等性处理防止重复消费 BizUID string `json:"b,omitempty"` // Payload 原始消息内容 Payload []byte `json:"p,omitempty"` // Retried 记录消息重试次数 Retried int `json:"r,omitempty"` // RouteKey 路由键 RouteKey string `json:"k,omitempty"` }
Message 消息结构体
func MessageAutoId ¶
MessageAutoId 实例化消息
func MessageWithId ¶
MessageWithId 实例化消息
type Sender ¶
type Sender struct { sync.Once // Topic 发送主题 Topic string // Driver 驱动实例 Driver DriverInterface // Logger 异常日志 Logger LoggerInterface // TxOptions 事务配置 TxOptions *TxOptions // contains filtered or unexported fields }
Sender 发送器
type TXStorageInterface ¶
type TXStorageInterface interface { // Store 将消息预存 // id 返回存储后的唯一标识 Store(data []byte) (id string, err error) // Fetch 根据标识取出消息 Fetch(id string) (data []byte, err error) // Remove 根据标识移除消息 Remove(id string) error }
TXStorageInterface 预发存储接口
type TxOptions ¶
type TxOptions struct { Context context.Context // Timeout 事务处理时长 // 启用事务的消息不会立即发布给消费者 // 当本地事务回调执行返回true才会正式发布 // 详见事务流程图 ./tx_flow.png Timeout time.Duration // EnsureFunc 事务完成确认 // 请一定要注意布尔返回值的代表含义 // 若返回值为true则表示事务已处理, 发布消息 // 若返回值为false则表示事务未处理, 撤销消息 EnsureFunc func(msg *Message) (done bool) // RetryDelay 重试延迟机制 // 返回值为重试间隔, 若 < 0 则代表不进行重试 RetryDelay func(attempts int) time.Duration // TxStorage 事务消息存储 TxStorage TXStorageInterface // contains filtered or unexported fields }
TxOptions 事务配置
Source Files ¶
Click to show internal directories.
Click to hide internal directories.