sdk

package
v0.3.7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 23, 2023 License: MIT Imports: 17 Imported by: 0

Documentation

Index

Constants

View Source
const (
	AllConfirm    = proto.AllConfirm
	NoConfirm     = proto.NoConfirm
	LeaderConfirm = proto.LeaderConfirm
)
View Source
const (
	DefaultProducerSendInterval = 500 * time.Millisecond
)
View Source
const DefaultRegisterDelay = time.Second * 2

Variables

View Source
var (
	ErrTopicEmpty           = errors.New("topic is empty")
	ErrConsumerHandlerIsNil = errors.New("consumer messageHandler is nil")
	ErrConsumerUnregistered = errors.New("consumer unregistered")
	ErrConsumerUnconnected  = errors.New("consumer unconnected")
	ErrProducerUnregistered = errors.New("producer unregistered")
	ErrProducerUnconnected  = errors.New("producer unconnected")
	ErrTokenIncorrect       = errors.New("token incorrect")
)
View Source
var NewQueue = proto.NewQueue

Functions

This section is empty.

Types

type Broker added in v0.3.3

type Broker struct {
	// contains filtered or unexported fields
}

Broker Broker连接管理,负责连接服务器并完成注册任务 在检测到连接断开时主动重连

func (*Broker) AsyncSend added in v0.3.4

func (b *Broker) AsyncSend(frame *proto.TransferFrame, message proto.Message) error

AsyncSend 异步发送消息

func (*Broker) Done added in v0.3.3

func (b *Broker) Done() <-chan struct{}

func (*Broker) Handler added in v0.3.3

func (b *Broker) Handler(r *tcp.Remote) error

func (*Broker) HeartbeatInterval added in v0.3.3

func (b *Broker) HeartbeatInterval() time.Duration

HeartbeatInterval 心跳周期

func (*Broker) HeartbeatTask added in v0.3.3

func (b *Broker) HeartbeatTask()

HeartbeatTask 心跳轮询任务

func (*Broker) IsConnected added in v0.3.3

func (b *Broker) IsConnected() bool

IsConnected 与服务端是否连接成功

func (*Broker) IsRegistered added in v0.3.3

func (b *Broker) IsRegistered() bool

IsRegistered 向服务端注册消费者是否成功

func (*Broker) LinkType added in v0.3.3

func (b *Broker) LinkType() proto.LinkType

func (*Broker) Logger added in v0.3.3

func (b *Broker) Logger() logger.Iface

func (*Broker) OnAccepted added in v0.3.3

func (b *Broker) OnAccepted(_ *tcp.Remote) error

OnAccepted 当TCP连接成功时会自行发送注册消息

func (*Broker) OnClosed added in v0.3.3

func (b *Broker) OnClosed(_ *tcp.Remote) error

func (*Broker) ReRegister added in v0.3.3

func (b *Broker) ReRegister(delay bool) error

ReRegister 重新发起注册流程

func (*Broker) Send added in v0.3.4

func (b *Broker) Send(frame *proto.TransferFrame, message proto.Message) error

Send 同步发送消息

func (*Broker) SetCrypto added in v0.3.4

func (b *Broker) SetCrypto(crypto proto.Crypto) *Broker

SetCrypto 修改全局加解密器, 必须在 Serve 之前设置

func (*Broker) SetCryptoPlan added in v0.3.4

func (b *Broker) SetCryptoPlan(option string, key ...string) *Broker

SetCryptoPlan 设置加密方案

@param	option	string		加密方案, 支持token/no (令牌加密和不加密)
@param	key 	[]string	其他加密参数

func (*Broker) SetRegisterMessage added in v0.3.3

func (b *Broker) SetRegisterMessage(message *proto.RegisterMessage) *Broker

func (*Broker) SetTransfer added in v0.3.3

func (b *Broker) SetTransfer(trans string) *Broker

func (*Broker) StatusOK added in v0.3.3

func (b *Broker) StatusOK() bool

StatusOK 连接状态是否正常

func (*Broker) TickerInterval added in v0.3.3

func (b *Broker) TickerInterval() time.Duration

TickerInterval 数据发送周期

type CHandler added in v0.3.3

type CHandler struct{}

func (*CHandler) Handler added in v0.3.3

func (c *CHandler) Handler(_ *ConsumerMessage)

func (*CHandler) OnClosed added in v0.3.3

func (c *CHandler) OnClosed()

func (*CHandler) OnConnected added in v0.3.3

func (c *CHandler) OnConnected()

func (*CHandler) OnNotImplementMessageType added in v0.3.3

func (c *CHandler) OnNotImplementMessageType(_ *proto.TransferFrame, _ transfer.Conn)

func (*CHandler) OnRegisterExpire added in v0.3.3

func (c *CHandler) OnRegisterExpire()

func (*CHandler) OnRegisterFailed added in v0.3.3

func (c *CHandler) OnRegisterFailed(_ proto.MessageResponseStatus)

func (*CHandler) OnRegistered added in v0.3.3

func (c *CHandler) OnRegistered()

type Config

type Config struct {
	Host     string          `json:"host"`
	Port     string          `json:"port"`
	Ack      proto.AckType   `json:"ack"`
	LinkType string          `json:"link_type" description:"tcp/udp"`
	PCtx     context.Context `json:"-"` // 父context,默认为 context.Background()
	Logger   logger.Iface    `json:"-"`
	Token    string          `json:"-"`
}

Config 生产者和消费者配置参数

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer 消费者

func NewAsyncConsumer

func NewAsyncConsumer(conf Config, handler ConsumerHandler) (*Consumer, error)

NewAsyncConsumer 创建异步消费者

func NewConsumer

func NewConsumer(conf Config, handler ConsumerHandler) (*Consumer, error)

NewConsumer 创建一个消费者,需要手动Start

func (*Consumer) Crypto added in v0.3.3

func (client *Consumer) Crypto() proto.Crypto

Crypto 全局加密器

func (*Consumer) Done added in v0.3.3

func (client *Consumer) Done() <-chan struct{}

func (*Consumer) HandlerFunc

func (client *Consumer) HandlerFunc() ConsumerHandler

HandlerFunc 获取注册的消息处理方法

func (*Consumer) HeartbeatInterval added in v0.3.3

func (client *Consumer) HeartbeatInterval() time.Duration

HeartbeatInterval 心跳周期

func (*Consumer) IsConnected

func (client *Consumer) IsConnected() bool

IsConnected 与服务端是否连接成功

func (*Consumer) IsRegistered

func (client *Consumer) IsRegistered() bool

IsRegistered 向服务端注册消费者是否成功

func (*Consumer) JSONMarshal

func (client *Consumer) JSONMarshal(v any) ([]byte, error)

func (*Consumer) JSONUnmarshal

func (client *Consumer) JSONUnmarshal(data []byte, v any) error

JSONUnmarshal 反序列化方法

func (*Consumer) Logger

func (client *Consumer) Logger() logger.Iface

func (*Consumer) SetCrypto added in v0.3.4

func (client *Consumer) SetCrypto(crypto proto.Crypto) *Consumer

SetCrypto 修改全局加解密器, 必须在 Serve 之前设置

func (*Consumer) SetCryptoPlan added in v0.3.4

func (client *Consumer) SetCryptoPlan(option string, key ...string) *Consumer

SetCryptoPlan 设置加密方案

@param	option	string		加密方案, 支持token/no (令牌加密和不加密)
@param	key 	[]string	其他加密参数

func (*Consumer) Start

func (client *Consumer) Start() error

Start 异步启动

func (*Consumer) StatusOK

func (client *Consumer) StatusOK() bool

StatusOK 连接状态是否正常,以及是否可以向服务器发送消息

func (*Consumer) Stop added in v0.3.1

func (client *Consumer) Stop()

func (*Consumer) TokenCrypto added in v0.3.3

func (client *Consumer) TokenCrypto() *proto.TokenCrypto

TokenCrypto Token加解密器,亦可作为全局加解密器

type ConsumerHandler

type ConsumerHandler interface {
	ProducerHandler
	Topics() []string
	Handler(record *ConsumerMessage) // (异步执行)
}

type ConsumerMessage

type ConsumerMessage struct {
	Topic       string    `json:"topic"`
	Key         string    `json:"key"`
	Value       []byte    `json:"value"`
	Offset      uint64    `json:"offset"`
	ProductTime time.Time `json:"product_time"` // 服务端收到消息时的时间戳
	// contains filtered or unexported fields
}

ConsumerMessage 用于SDK直接传递给消费者的单条数据消息 需要从 TransferFrame 中转换

func (*ConsumerMessage) MarshalMethod added in v0.3.3

func (m *ConsumerMessage) MarshalMethod() proto.MarshalMethodType

func (*ConsumerMessage) MessageType added in v0.3.3

func (m *ConsumerMessage) MessageType() proto.MessageType

func (*ConsumerMessage) ParseFromCMessage added in v0.3.3

func (m *ConsumerMessage) ParseFromCMessage(cm *proto.CMessage)

func (*ConsumerMessage) Reset added in v0.3.3

func (m *ConsumerMessage) Reset()

func (*ConsumerMessage) ShouldBindJSON added in v0.3.3

func (m *ConsumerMessage) ShouldBindJSON(v any) error

ShouldBindJSON 将数据反序列化到一个JSON模型上

func (*ConsumerMessage) String added in v0.3.3

func (m *ConsumerMessage) String() string

type HCPMessagePool added in v0.3.3

type HCPMessagePool struct {
	// contains filtered or unexported fields
}

func NewHCPMPool added in v0.3.3

func NewHCPMPool() *HCPMessagePool

func (*HCPMessagePool) CMHistoryNum added in v0.3.3

func (m *HCPMessagePool) CMHistoryNum() uint64

CMHistoryNum ConsumerMessage 历史数量

func (*HCPMessagePool) GetCM added in v0.3.3

func (m *HCPMessagePool) GetCM() *ConsumerMessage

func (*HCPMessagePool) GetPM added in v0.3.3

func (m *HCPMessagePool) GetPM() *ProducerMessage

func (*HCPMessagePool) PMHistoryNum added in v0.3.3

func (m *HCPMessagePool) PMHistoryNum() uint64

PMHistoryNum ProducerMessage 历史数量

func (*HCPMessagePool) PutCM added in v0.3.3

func (m *HCPMessagePool) PutCM(v *ConsumerMessage)

func (*HCPMessagePool) PutPM added in v0.3.3

func (m *HCPMessagePool) PutPM(v *ProducerMessage)

type HttpProducer added in v0.3.5

type HttpProducer struct {
	// contains filtered or unexported fields
}

HttpProducer HTTP 方式生产者

# Usage:

	p := NewHttpProducer("127.0.0.1", "8080")
	p.SetToken("token")
	p.SetPath("/api/edge/product")	// 可选的

	resp, err := p.Send("topic", "key", []byte("value"))
	if err != nil {
		panic(err)
	}
	if !resp.IsOK() {
		// ok
	}

func NewHttpProducer added in v0.3.5

func NewHttpProducer(host, port string) *HttpProducer

NewHttpProducer 创建一个HTTP的生产者

func (*HttpProducer) Addr added in v0.3.5

func (p *HttpProducer) Addr() string

Addr broker 地址

func (*HttpProducer) AsyncUrl added in v0.3.5

func (p *HttpProducer) AsyncUrl() string

AsyncUrl 异步方法请求路由

func (*HttpProducer) CreateSHA added in v0.3.5

func (p *HttpProducer) CreateSHA(pass string) string

CreateSHA 计算字符串的HASH值,默认为SHA256

func (*HttpProducer) Post added in v0.3.5

func (p *HttpProducer) Post(topic, key string, form any) (*ProductResponse, error)

Post 发送消息

func (*HttpProducer) Send added in v0.3.5

func (p *HttpProducer) Send(topic, key string, value []byte) (*ProductResponse, error)

Send 发送消息

func (*HttpProducer) SetAsyncPath added in v0.3.5

func (p *HttpProducer) SetAsyncPath(path string) *HttpProducer

SetAsyncPath 修改broker异步方法路径

func (*HttpProducer) SetPath added in v0.3.5

func (p *HttpProducer) SetPath(path string) *HttpProducer

SetPath 修改broker路径

func (*HttpProducer) SetToken added in v0.3.5

func (p *HttpProducer) SetToken(token string) *HttpProducer

SetToken 设置认证密钥

func (*HttpProducer) Url added in v0.3.5

func (p *HttpProducer) Url() string

Url 请求路由

type Link interface {
	Connect() error
	Close() error
	SetTCPHandler(handler tcp.HandlerFunc)
	SetUDPHandler(handler func())
	Write(p []byte) (int, error) // 将切片buf中的内容追加到发数据缓冲区内,并返回写入的数据长度
	Drain() error                // 将缓冲区的数据发生到客户端
}

type PHandler added in v0.3.3

type PHandler struct{}

PHandler 默认实现

func (PHandler) OnClosed added in v0.3.3

func (h PHandler) OnClosed()

func (PHandler) OnConnected added in v0.3.3

func (h PHandler) OnConnected()

func (PHandler) OnNotImplementMessageType added in v0.3.3

func (h PHandler) OnNotImplementMessageType(frame *proto.TransferFrame, con transfer.Conn)

func (PHandler) OnRegisterExpire added in v0.3.3

func (h PHandler) OnRegisterExpire()

func (PHandler) OnRegisterFailed added in v0.3.3

func (h PHandler) OnRegisterFailed(status proto.MessageResponseStatus)

func (PHandler) OnRegistered added in v0.3.3

func (h PHandler) OnRegistered()

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

Producer 生产者, 通过 Send 发送的消息并非会立即投递给服务端 而是会按照服务器下发的配置定时批量发送消息,通常为500ms

func NewAsyncProducer

func NewAsyncProducer(conf Config, handlers ...ProducerHandler) (*Producer, error)

NewAsyncProducer 创建异步生产者,无需再手动启动

func NewProducer

func NewProducer(conf Config, handlers ...ProducerHandler) *Producer

NewProducer 创建异步生产者,需手动启动

func (*Producer) Beautify

func (client *Producer) Beautify(data []byte) string

Beautify 格式化显示字节流

func (*Producer) Crypto added in v0.3.3

func (client *Producer) Crypto() proto.Crypto

Crypto 全局加密器

func (*Producer) Done

func (client *Producer) Done() <-chan struct{}

func (*Producer) HeartbeatInterval added in v0.3.3

func (client *Producer) HeartbeatInterval() time.Duration

HeartbeatInterval 心跳周期

func (*Producer) IsConnected

func (client *Producer) IsConnected() bool

IsConnected 与服务端是否连接成功

func (*Producer) IsRegistered

func (client *Producer) IsRegistered() bool

IsRegistered 向服务端注册消费者是否成功

func (*Producer) JSONMarshal

func (client *Producer) JSONMarshal(v any) ([]byte, error)

JSONMarshal 序列化方法

func (*Producer) JSONUnmarshal

func (client *Producer) JSONUnmarshal(data []byte, v any) error

func (*Producer) Logger

func (client *Producer) Logger() logger.Iface

func (*Producer) NewRecord

func (client *Producer) NewRecord() *ProducerMessage

NewRecord 从池中初始化一个新的消息记录

func (*Producer) Publisher

func (client *Producer) Publisher(msg *ProducerMessage) error

Publisher 发送消息

func (*Producer) PutRecord

func (client *Producer) PutRecord(msg *ProducerMessage)

PutRecord 主动归还消息记录到池,仅在主动调用 NewRecord 却没发送数据时使用

func (*Producer) Send

func (client *Producer) Send(fn func(record *ProducerMessage) error) error

Send 发送一条消息

func (*Producer) SetCrypto added in v0.3.4

func (client *Producer) SetCrypto(crypto proto.Crypto) *Producer

SetCrypto 修改全局加解密器, 必须在 Serve 之前设置

func (*Producer) SetCryptoPlan added in v0.3.4

func (client *Producer) SetCryptoPlan(option string, key ...string) *Producer

SetCryptoPlan 设置加密方案

@param	option	string		加密方案, 支持token/no (令牌加密和不加密)
@param	key 	[]string	其他加密参数

func (*Producer) Start

func (client *Producer) Start() error

func (*Producer) StatusOK added in v0.3.3

func (client *Producer) StatusOK() bool

StatusOK 连接状态是否正常,以及是否可以向服务器发送消息

func (*Producer) Stop

func (client *Producer) Stop()

func (*Producer) TokenCrypto added in v0.3.3

func (client *Producer) TokenCrypto() *proto.TokenCrypto

TokenCrypto Token加解密器,亦可作为全局加密器

type ProducerForm added in v0.3.5

type ProducerForm struct {
	Topic string `json:"topic" description:"消息主题"`
	Key   string `json:"key" description:"消息键"`
	Value string `json:"value" description:"base64编码后的消息体"`
	Token string `json:"token,omitempty" description:"认证密钥"`
}

ProducerForm 生产者消息投递表单, 不允许将多个消息编码成一个消息帧; token若为空则认为不加密; value是对加密后的消息体进行base64编码后的结果,依据token判断是否需要解密

func (ProducerForm) IsEncrypt added in v0.3.5

func (m ProducerForm) IsEncrypt() bool

func (ProducerForm) String added in v0.3.5

func (m ProducerForm) String() string

type ProducerHandler

type ProducerHandler interface {
	OnConnected()                                        // (同步执行)当连接成功时触发的事件, 此事件必须在执行完成之后才会进行后续的处理,因此需自行控制
	OnClosed()                                           // (同步执行)当连接中断时触发的事件, 此事件必须在执行完成之后才会进行重连操作(若有)
	OnRegistered()                                       // (同步执行)当注册成功触发的事件
	OnRegisterFailed(status proto.MessageResponseStatus) // (同步执行)当注册失败触发的事件
	OnRegisterExpire()                                   // (同步执行)当连接中断时触发的事件, 此事件必须在执行完成之后才会进行重连操作(若有)                                  // 阻塞调用
	// OnNotImplementMessageType 当收到一个未实现的消息帧时触发的事件
	OnNotImplementMessageType(frame *proto.TransferFrame, con transfer.Conn)
}

type ProducerMessage

type ProducerMessage struct {
	Topic string `json:"topic"`
	Key   string `json:"key"`
	Value []byte `json:"value"`
	// contains filtered or unexported fields
}

ProducerMessage 生产者直接发送的数据 会转换成 TransferFrame 后发送

func (*ProducerMessage) BindFromJSON added in v0.3.3

func (m *ProducerMessage) BindFromJSON(v any) error

BindFromJSON 从JSON模型获取序列化数据

func (*ProducerMessage) MarshalMethod added in v0.3.3

func (m *ProducerMessage) MarshalMethod() proto.MarshalMethodType

func (*ProducerMessage) MessageType added in v0.3.3

func (m *ProducerMessage) MessageType() proto.MessageType

func (*ProducerMessage) Reset added in v0.3.3

func (m *ProducerMessage) Reset()

func (*ProducerMessage) String added in v0.3.3

func (m *ProducerMessage) String() string

type ProductResponse added in v0.3.5

type ProductResponse struct {
	Status       string `` /* 126-byte string literal not displayed */
	Offset       uint64 `json:"offset" description:"消息偏移量"`
	ResponseTime int64  `json:"response_time" description:"服务端返回消息时的时间戳"`
	Message      string `json:"message" description:"额外的消息描述"`
}

ProductResponse 消息返回值; 仅当 status=Accepted 时才认为服务器接受了请求并正确的处理了消息

func (ProductResponse) Error added in v0.3.5

func (m ProductResponse) Error() error

func (ProductResponse) IsOK added in v0.3.5

func (m ProductResponse) IsOK() bool

IsOK 消息是否发送成功

func (ProductResponse) String added in v0.3.5

func (m ProductResponse) String() string

type Queue

type Queue = proto.Queue
type TCPLink struct {
	Host     string         `json:"host"`
	Port     string         `json:"port"`
	LinkType proto.LinkType `json:"link_type"`
	// contains filtered or unexported fields
}

func (*TCPLink) Close added in v0.3.3

func (l *TCPLink) Close() error

func (*TCPLink) Connect added in v0.3.3

func (l *TCPLink) Connect() error

Connect 阻塞式连接

func (*TCPLink) Drain added in v0.3.3

func (l *TCPLink) Drain() error

func (*TCPLink) SetTCPHandler added in v0.3.3

func (l *TCPLink) SetTCPHandler(handler tcp.HandlerFunc)

func (*TCPLink) SetUDPHandler added in v0.3.3

func (l *TCPLink) SetUDPHandler(_ func())

func (*TCPLink) Write added in v0.3.3

func (l *TCPLink) Write(p []byte) (int, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL