Documentation ¶
Index ¶
- Constants
- Variables
- type Broker
- func (b *Broker) AsyncSend(frame *proto.TransferFrame, message proto.Message) error
- func (b *Broker) Done() <-chan struct{}
- func (b *Broker) Handler(r *tcp.Remote) error
- func (b *Broker) HeartbeatInterval() time.Duration
- func (b *Broker) HeartbeatTask()
- func (b *Broker) IsConnected() bool
- func (b *Broker) IsRegistered() bool
- func (b *Broker) LinkType() proto.LinkType
- func (b *Broker) Logger() logger.Iface
- func (b *Broker) OnAccepted(_ *tcp.Remote) error
- func (b *Broker) OnClosed(_ *tcp.Remote) error
- func (b *Broker) ReRegister(delay bool) error
- func (b *Broker) Send(frame *proto.TransferFrame, message proto.Message) error
- func (b *Broker) SetCrypto(crypto proto.Crypto) *Broker
- func (b *Broker) SetCryptoPlan(option string, key ...string) *Broker
- func (b *Broker) SetRegisterMessage(message *proto.RegisterMessage) *Broker
- func (b *Broker) SetTransfer(trans string) *Broker
- func (b *Broker) StatusOK() bool
- func (b *Broker) TickerInterval() time.Duration
- type CHandler
- func (c *CHandler) Handler(_ *ConsumerMessage)
- func (c *CHandler) OnClosed()
- func (c *CHandler) OnConnected()
- func (c *CHandler) OnNotImplementMessageType(_ *proto.TransferFrame, _ transfer.Conn)
- func (c *CHandler) OnRegisterExpire()
- func (c *CHandler) OnRegisterFailed(_ proto.MessageResponseStatus)
- func (c *CHandler) OnRegistered()
- type Config
- type Consumer
- func (client *Consumer) Crypto() proto.Crypto
- func (client *Consumer) Done() <-chan struct{}
- func (client *Consumer) HandlerFunc() ConsumerHandler
- func (client *Consumer) HeartbeatInterval() time.Duration
- func (client *Consumer) IsConnected() bool
- func (client *Consumer) IsRegistered() bool
- func (client *Consumer) JSONMarshal(v any) ([]byte, error)
- func (client *Consumer) JSONUnmarshal(data []byte, v any) error
- func (client *Consumer) Logger() logger.Iface
- func (client *Consumer) SetCrypto(crypto proto.Crypto) *Consumer
- func (client *Consumer) SetCryptoPlan(option string, key ...string) *Consumer
- func (client *Consumer) Start() error
- func (client *Consumer) StatusOK() bool
- func (client *Consumer) Stop()
- func (client *Consumer) TokenCrypto() *proto.TokenCrypto
- type ConsumerHandler
- type ConsumerMessage
- func (m *ConsumerMessage) MarshalMethod() proto.MarshalMethodType
- func (m *ConsumerMessage) MessageType() proto.MessageType
- func (m *ConsumerMessage) ParseFromCMessage(cm *proto.CMessage)
- func (m *ConsumerMessage) Reset()
- func (m *ConsumerMessage) ShouldBindJSON(v any) error
- func (m *ConsumerMessage) String() string
- type HCPMessagePool
- type HttpProducer
- func (p *HttpProducer) Addr() string
- func (p *HttpProducer) AsyncUrl() string
- func (p *HttpProducer) CreateSHA(pass string) string
- func (p *HttpProducer) Post(topic, key string, form any) (*ProductResponse, error)
- func (p *HttpProducer) Send(topic, key string, value []byte) (*ProductResponse, error)
- func (p *HttpProducer) SetAsyncPath(path string) *HttpProducer
- func (p *HttpProducer) SetPath(path string) *HttpProducer
- func (p *HttpProducer) SetToken(token string) *HttpProducer
- func (p *HttpProducer) Url() string
- type Link
- type PHandler
- type Producer
- func (client *Producer) Beautify(data []byte) string
- func (client *Producer) Crypto() proto.Crypto
- func (client *Producer) Done() <-chan struct{}
- func (client *Producer) HeartbeatInterval() time.Duration
- func (client *Producer) IsConnected() bool
- func (client *Producer) IsRegistered() bool
- func (client *Producer) JSONMarshal(v any) ([]byte, error)
- func (client *Producer) JSONUnmarshal(data []byte, v any) error
- func (client *Producer) Logger() logger.Iface
- func (client *Producer) NewRecord() *ProducerMessage
- func (client *Producer) Publisher(msg *ProducerMessage) error
- func (client *Producer) PutRecord(msg *ProducerMessage)
- func (client *Producer) Send(fn func(record *ProducerMessage) error) error
- func (client *Producer) SetCrypto(crypto proto.Crypto) *Producer
- func (client *Producer) SetCryptoPlan(option string, key ...string) *Producer
- func (client *Producer) Start() error
- func (client *Producer) StatusOK() bool
- func (client *Producer) Stop()
- func (client *Producer) TokenCrypto() *proto.TokenCrypto
- type ProducerForm
- type ProducerHandler
- type ProducerMessage
- type ProductResponse
- type Queue
- type TCPLink
Constants ¶
const ( AllConfirm = proto.AllConfirm NoConfirm = proto.NoConfirm LeaderConfirm = proto.LeaderConfirm )
const (
DefaultProducerSendInterval = 500 * time.Millisecond
)
const DefaultRegisterDelay = time.Second * 2
Variables ¶
var ( ErrTopicEmpty = errors.New("topic is empty") ErrConsumerHandlerIsNil = errors.New("consumer messageHandler is nil") ErrConsumerUnregistered = errors.New("consumer unregistered") ErrConsumerUnconnected = errors.New("consumer unconnected") ErrProducerUnregistered = errors.New("producer unregistered") ErrProducerUnconnected = errors.New("producer unconnected") ErrTokenIncorrect = errors.New("token incorrect") )
var NewQueue = proto.NewQueue
Functions ¶
This section is empty.
Types ¶
type Broker ¶ added in v0.3.3
type Broker struct {
// contains filtered or unexported fields
}
Broker Broker连接管理, 负责连接服务器并完成注册任务, 在检测到连接断开时主动重连
func (*Broker) HeartbeatInterval ¶ added in v0.3.3
HeartbeatInterval 心跳周期
func (*Broker) HeartbeatTask ¶ added in v0.3.3
func (b *Broker) HeartbeatTask()
HeartbeatTask 心跳轮询任务
func (*Broker) IsConnected ¶ added in v0.3.3
IsConnected 与服务端是否连接成功
func (*Broker) IsRegistered ¶ added in v0.3.3
IsRegistered 向服务端注册消费者是否成功
func (*Broker) OnAccepted ¶ added in v0.3.3
OnAccepted 当TCP连接成功时会自行发送注册消息
func (*Broker) ReRegister ¶ added in v0.3.3
ReRegister 重新发起注册流程
func (*Broker) SetCryptoPlan ¶ added in v0.3.4
SetCryptoPlan 设置加密方案
@param option string 加密方案, 支持token/no (令牌加密和不加密)
@param key []string 其他加密参数
func (*Broker) SetRegisterMessage ¶ added in v0.3.3
func (b *Broker) SetRegisterMessage(message *proto.RegisterMessage) *Broker
func (*Broker) SetTransfer ¶ added in v0.3.3
func (*Broker) TickerInterval ¶ added in v0.3.3
TickerInterval 数据发送周期
type CHandler ¶ added in v0.3.3
type CHandler struct{}
func (*CHandler) Handler ¶ added in v0.3.3
func (c *CHandler) Handler(_ *ConsumerMessage)
func (*CHandler) OnConnected ¶ added in v0.3.3
func (c *CHandler) OnConnected()
func (*CHandler) OnNotImplementMessageType ¶ added in v0.3.3
func (c *CHandler) OnNotImplementMessageType(_ *proto.TransferFrame, _ transfer.Conn)
func (*CHandler) OnRegisterExpire ¶ added in v0.3.3
func (c *CHandler) OnRegisterExpire()
func (*CHandler) OnRegisterFailed ¶ added in v0.3.3
func (c *CHandler) OnRegisterFailed(_ proto.MessageResponseStatus)
func (*CHandler) OnRegistered ¶ added in v0.3.3
func (c *CHandler) OnRegistered()
type Config ¶
type Config struct {
	Host     string          `json:"host"`
	Port     string          `json:"port"`
	Ack      proto.AckType   `json:"ack"`
	LinkType string          `json:"link_type" description:"tcp/udp"`
	PCtx     context.Context `json:"-"` // 父context,默认为 context.Background()
	Logger   logger.Iface    `json:"-"`
	Token    string          `json:"-"`
}
Config 生产者和消费者配置参数
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer 消费者
func NewAsyncConsumer ¶
func NewAsyncConsumer(conf Config, handler ConsumerHandler) (*Consumer, error)
NewAsyncConsumer 创建异步消费者
func NewConsumer ¶
func NewConsumer(conf Config, handler ConsumerHandler) (*Consumer, error)
NewConsumer 创建一个消费者,需要手动Start
func (*Consumer) HandlerFunc ¶
func (client *Consumer) HandlerFunc() ConsumerHandler
HandlerFunc 获取注册的消息处理方法
func (*Consumer) HeartbeatInterval ¶ added in v0.3.3
HeartbeatInterval 心跳周期
func (*Consumer) IsRegistered ¶
IsRegistered 向服务端注册消费者是否成功
func (*Consumer) JSONUnmarshal ¶
JSONUnmarshal 反序列化方法
func (*Consumer) SetCryptoPlan ¶ added in v0.3.4
SetCryptoPlan 设置加密方案
@param option string 加密方案, 支持token/no (令牌加密和不加密)
@param key []string 其他加密参数
func (*Consumer) TokenCrypto ¶ added in v0.3.3
func (client *Consumer) TokenCrypto() *proto.TokenCrypto
TokenCrypto Token加解密器,亦可作为全局加解密器
type ConsumerHandler ¶
type ConsumerHandler interface {
	ProducerHandler
	Topics() []string
	Handler(record *ConsumerMessage) // (异步执行)
}
type ConsumerMessage ¶
type ConsumerMessage struct { Topic string `json:"topic"` Key string `json:"key"` Value []byte `json:"value"` Offset uint64 `json:"offset"` ProductTime time.Time `json:"product_time"` // 服务端收到消息时的时间戳 // contains filtered or unexported fields }
ConsumerMessage 用于SDK直接传递给消费者的单条数据消息, 需要从 TransferFrame 中转换
func (*ConsumerMessage) MarshalMethod ¶ added in v0.3.3
func (m *ConsumerMessage) MarshalMethod() proto.MarshalMethodType
func (*ConsumerMessage) MessageType ¶ added in v0.3.3
func (m *ConsumerMessage) MessageType() proto.MessageType
func (*ConsumerMessage) ParseFromCMessage ¶ added in v0.3.3
func (m *ConsumerMessage) ParseFromCMessage(cm *proto.CMessage)
func (*ConsumerMessage) Reset ¶ added in v0.3.3
func (m *ConsumerMessage) Reset()
func (*ConsumerMessage) ShouldBindJSON ¶ added in v0.3.3
func (m *ConsumerMessage) ShouldBindJSON(v any) error
ShouldBindJSON 将数据反序列化到一个JSON模型上
func (*ConsumerMessage) String ¶ added in v0.3.3
func (m *ConsumerMessage) String() string
type HCPMessagePool ¶ added in v0.3.3
type HCPMessagePool struct {
// contains filtered or unexported fields
}
func NewHCPMPool ¶ added in v0.3.3
func NewHCPMPool() *HCPMessagePool
func (*HCPMessagePool) CMHistoryNum ¶ added in v0.3.3
func (m *HCPMessagePool) CMHistoryNum() uint64
CMHistoryNum ConsumerMessage 历史数量
func (*HCPMessagePool) GetCM ¶ added in v0.3.3
func (m *HCPMessagePool) GetCM() *ConsumerMessage
func (*HCPMessagePool) GetPM ¶ added in v0.3.3
func (m *HCPMessagePool) GetPM() *ProducerMessage
func (*HCPMessagePool) PMHistoryNum ¶ added in v0.3.3
func (m *HCPMessagePool) PMHistoryNum() uint64
PMHistoryNum ProducerMessage 历史数量
func (*HCPMessagePool) PutCM ¶ added in v0.3.3
func (m *HCPMessagePool) PutCM(v *ConsumerMessage)
func (*HCPMessagePool) PutPM ¶ added in v0.3.3
func (m *HCPMessagePool) PutPM(v *ProducerMessage)
type HttpProducer ¶ added in v0.3.5
type HttpProducer struct {
// contains filtered or unexported fields
}
HttpProducer HTTP 方式生产者
# Usage:

	p := NewHttpProducer("127.0.0.1", "8080")
	p.SetToken("token")
	p.SetPath("/api/edge/product") // 可选的
	resp, err := p.Send("topic", "key", []byte("value"))
	if err != nil {
		panic(err)
	}
	if !resp.IsOK() {
		// not ok: 服务端未接受该消息, 在此处理失败情况
	}
func NewHttpProducer ¶ added in v0.3.5
func NewHttpProducer(host, port string) *HttpProducer
NewHttpProducer 创建一个HTTP的生产者
func (*HttpProducer) AsyncUrl ¶ added in v0.3.5
func (p *HttpProducer) AsyncUrl() string
AsyncUrl 异步方法请求路由
func (*HttpProducer) CreateSHA ¶ added in v0.3.5
func (p *HttpProducer) CreateSHA(pass string) string
CreateSHA 计算字符串的HASH值,默认为SHA256
func (*HttpProducer) Post ¶ added in v0.3.5
func (p *HttpProducer) Post(topic, key string, form any) (*ProductResponse, error)
Post 发送消息
func (*HttpProducer) Send ¶ added in v0.3.5
func (p *HttpProducer) Send(topic, key string, value []byte) (*ProductResponse, error)
Send 发送消息
func (*HttpProducer) SetAsyncPath ¶ added in v0.3.5
func (p *HttpProducer) SetAsyncPath(path string) *HttpProducer
SetAsyncPath 修改broker异步方法路径
func (*HttpProducer) SetPath ¶ added in v0.3.5
func (p *HttpProducer) SetPath(path string) *HttpProducer
SetPath 修改broker路径
func (*HttpProducer) SetToken ¶ added in v0.3.5
func (p *HttpProducer) SetToken(token string) *HttpProducer
SetToken 设置认证密钥
type PHandler ¶ added in v0.3.3
type PHandler struct{}
PHandler 默认实现
func (PHandler) OnConnected ¶ added in v0.3.3
func (h PHandler) OnConnected()
func (PHandler) OnNotImplementMessageType ¶ added in v0.3.3
func (h PHandler) OnNotImplementMessageType(frame *proto.TransferFrame, con transfer.Conn)
func (PHandler) OnRegisterExpire ¶ added in v0.3.3
func (h PHandler) OnRegisterExpire()
func (PHandler) OnRegisterFailed ¶ added in v0.3.3
func (h PHandler) OnRegisterFailed(status proto.MessageResponseStatus)
func (PHandler) OnRegistered ¶ added in v0.3.3
func (h PHandler) OnRegistered()
type Producer ¶
type Producer struct {
// contains filtered or unexported fields
}
Producer 生产者, 通过 Send 发送的消息并不会立即投递给服务端, 而是会按照服务器下发的配置定时批量发送消息,通常为500ms
func NewAsyncProducer ¶
func NewAsyncProducer(conf Config, handlers ...ProducerHandler) (*Producer, error)
NewAsyncProducer 创建异步生产者,无需再手动启动
func NewProducer ¶
func NewProducer(conf Config, handlers ...ProducerHandler) *Producer
NewProducer 创建一个生产者,需手动调用 Start 启动
func (*Producer) HeartbeatInterval ¶ added in v0.3.3
HeartbeatInterval 心跳周期
func (*Producer) IsRegistered ¶
IsRegistered 向服务端注册生产者是否成功
func (*Producer) JSONMarshal ¶
JSONMarshal 序列化方法
func (*Producer) NewRecord ¶
func (client *Producer) NewRecord() *ProducerMessage
NewRecord 从池中初始化一个新的消息记录
func (*Producer) Publisher ¶
func (client *Producer) Publisher(msg *ProducerMessage) error
Publisher 发送消息
func (*Producer) PutRecord ¶
func (client *Producer) PutRecord(msg *ProducerMessage)
PutRecord 主动归还消息记录到池,仅在主动调用 NewRecord 却没发送数据时使用
func (*Producer) Send ¶
func (client *Producer) Send(fn func(record *ProducerMessage) error) error
Send 发送一条消息
func (*Producer) SetCryptoPlan ¶ added in v0.3.4
SetCryptoPlan 设置加密方案
@param option string 加密方案, 支持token/no (令牌加密和不加密)
@param key []string 其他加密参数
func (*Producer) TokenCrypto ¶ added in v0.3.3
func (client *Producer) TokenCrypto() *proto.TokenCrypto
TokenCrypto Token加解密器,亦可作为全局加解密器
type ProducerForm ¶ added in v0.3.5
type ProducerForm struct { Topic string `json:"topic" description:"消息主题"` Key string `json:"key" description:"消息键"` Value string `json:"value" description:"base64编码后的消息体"` Token string `json:"token,omitempty" description:"认证密钥"` }
ProducerForm 生产者消息投递表单, 不允许将多个消息编码成一个消息帧; token若为空则认为不加密; value是对加密后的消息体进行base64编码后的结果,依据token判断是否需要解密
func (ProducerForm) IsEncrypt ¶ added in v0.3.5
func (m ProducerForm) IsEncrypt() bool
func (ProducerForm) String ¶ added in v0.3.5
func (m ProducerForm) String() string
type ProducerHandler ¶
type ProducerHandler interface {
	OnConnected()                                        // (同步执行)当连接成功时触发的事件, 此事件必须在执行完成之后才会进行后续的处理,因此需自行控制
	OnClosed()                                           // (同步执行)当连接中断时触发的事件, 此事件必须在执行完成之后才会进行重连操作(若有)
	OnRegistered()                                       // (同步执行)当注册成功触发的事件
	OnRegisterFailed(status proto.MessageResponseStatus) // (同步执行)当注册失败触发的事件
	OnRegisterExpire()                                   // (同步执行)当注册过期时触发的事件, 此事件必须在执行完成之后才会进行重连操作(若有)
	// 阻塞调用
	// OnNotImplementMessageType 当收到一个未实现的消息帧时触发的事件
	OnNotImplementMessageType(frame *proto.TransferFrame, con transfer.Conn)
}
type ProducerMessage ¶
type ProducerMessage struct { Topic string `json:"topic"` Key string `json:"key"` Value []byte `json:"value"` // contains filtered or unexported fields }
ProducerMessage 生产者直接发送的数据, 会转换成 TransferFrame 后发送
func (*ProducerMessage) BindFromJSON ¶ added in v0.3.3
func (m *ProducerMessage) BindFromJSON(v any) error
BindFromJSON 从JSON模型获取序列化数据
func (*ProducerMessage) MarshalMethod ¶ added in v0.3.3
func (m *ProducerMessage) MarshalMethod() proto.MarshalMethodType
func (*ProducerMessage) MessageType ¶ added in v0.3.3
func (m *ProducerMessage) MessageType() proto.MessageType
func (*ProducerMessage) Reset ¶ added in v0.3.3
func (m *ProducerMessage) Reset()
func (*ProducerMessage) String ¶ added in v0.3.3
func (m *ProducerMessage) String() string
type ProductResponse ¶ added in v0.3.5
type ProductResponse struct { Status string `` /* 126-byte string literal not displayed */ Offset uint64 `json:"offset" description:"消息偏移量"` ResponseTime int64 `json:"response_time" description:"服务端返回消息时的时间戳"` Message string `json:"message" description:"额外的消息描述"` }
ProductResponse 消息返回值; 仅当 status=Accepted 时才认为服务器接受了请求并正确地处理了消息
func (ProductResponse) Error ¶ added in v0.3.5
func (m ProductResponse) Error() error
func (ProductResponse) String ¶ added in v0.3.5
func (m ProductResponse) String() string
type TCPLink ¶ added in v0.3.3
type TCPLink struct { Host string `json:"host"` Port string `json:"port"` LinkType proto.LinkType `json:"link_type"` // contains filtered or unexported fields }
func (*TCPLink) SetTCPHandler ¶ added in v0.3.3
func (l *TCPLink) SetTCPHandler(handler tcp.HandlerFunc)
func (*TCPLink) SetUDPHandler ¶ added in v0.3.3
func (l *TCPLink) SetUDPHandler(_ func())