mq

package
v0.0.9 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 16, 2022 License: MIT Imports: 15 Imported by: 2

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrMessageIsNil     = errors.New("message is nil show alloc memory")
	ErrMessageIsInvalid = errors.New("message is invalid")
)

Functions

func DefaultDecodeFunc

func DefaultDecodeFunc(ctx context.Context, message Message, args interface{}) error

func DefaultErrorHandler

func DefaultErrorHandler(err error)

func GetMiddlewareFromContext

func GetMiddlewareFromContext(ctx context.Context) []middleware.Middleware

func MiddlewareWithContext

func MiddlewareWithContext(ctx context.Context, list ...middleware.Middleware) context.Context

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

func NewClient

func NewClient(producer Producer, opts ...ClientOptionFunc) *Client

func (*Client) Invoke

func (x *Client) Invoke(ctx context.Context, topic string, args interface{}) error

type ClientOptionFunc

type ClientOptionFunc func(*clientOptions)

func ClientOptionWithEncodeFunc

func ClientOptionWithEncodeFunc(f EncodeFunc) ClientOptionFunc

ClientOptionWithEncodeFunc 中间件

type Consumer

type Consumer interface {
	// Subscribe topic 是主题,相同主题不同 channel 可以分发相同消息实现广播
	Subscribe(ctx context.Context, topic string, channel string) (<-chan Message, error)
	// Close 停止消费
	Close(ctx context.Context) error
}

type DecodeFunc

type DecodeFunc func(ctx context.Context, message Message, args interface{}) error

type EncodeFunc

type EncodeFunc func(ctx context.Context, args interface{}) (Message, error)

type ErrorHandler

type ErrorHandler interface {
	Handle(err error)
}

type ErrorHandlerFunc

type ErrorHandlerFunc func(err error)

func (ErrorHandlerFunc) Handle

func (f ErrorHandlerFunc) Handle(err error)

type HandleFunc

type HandleFunc func(ctx context.Context, message Message)

func (HandleFunc) Handle

func (f HandleFunc) Handle(ctx context.Context, message Message)

type Handler

type Handler interface {
	Handle(ctx context.Context, message Message)
}

type Message

type Message interface {
	// Metadata 元数据
	Metadata() metadata.Metadata
	// Payload 荷载信息
	Payload() Payload
	// Err 错误
	Err() error
	// UniKey 消息唯一键 用于消息去重(已经消费的消息不再消费)
	UniKey() string

	// Check 检查消息完整性
	Check() error
	// Marshal 序列化消息
	Marshal() ([]byte, error)
	// UnMarshal 解析消息
	UnMarshal([]byte) error
}

Message 消息定义

func DefaultEncodeFunc

func DefaultEncodeFunc(ctx context.Context, args interface{}) (Message, error)

func NewMessage added in v0.0.2

func NewMessage(payload Payload, opts ...MessageOption) Message

func NewMessageFromByte added in v0.0.2

func NewMessageFromByte(b []byte) (Message, error)

type MessageOption added in v0.0.2

type MessageOption func(x *MessageV1)

func MessageOptionWithMetadata added in v0.0.2

func MessageOptionWithMetadata(md metadata.Metadata) MessageOption

type MessageV1 added in v0.0.2

type MessageV1 messagev1.Message

func (*MessageV1) Check added in v0.0.2

func (x *MessageV1) Check() error

func (*MessageV1) Err added in v0.0.2

func (x *MessageV1) Err() error

func (*MessageV1) Marshal added in v0.0.2

func (x *MessageV1) Marshal() ([]byte, error)

func (*MessageV1) Metadata added in v0.0.2

func (x *MessageV1) Metadata() metadata.Metadata

func (*MessageV1) Payload added in v0.0.2

func (x *MessageV1) Payload() Payload

func (*MessageV1) UnMarshal added in v0.0.2

func (x *MessageV1) UnMarshal(bytes []byte) error

func (*MessageV1) UniKey added in v0.0.2

func (x *MessageV1) UniKey() string

type Payload

type Payload []byte

type Producer

type Producer interface {
	// Publish topic 是主题 向 topic 投递消息,监听此 topic 的 Producer 可以收到消息
	Publish(ctx context.Context, topic string, message Message) error
	// Close 停止消费
	Close(ctx context.Context) error
}

type Server

type Server struct {
	// contains filtered or unexported fields
}

func NewServer

func NewServer(ctx context.Context, consumer Consumer, opts ...ServerOptionFunc) *Server

func (*Server) DecodeFunc

func (x *Server) DecodeFunc() DecodeFunc

func (*Server) ErrHandler

func (x *Server) ErrHandler() ErrorHandler

func (*Server) Start

func (x *Server) Start(ctx context.Context) error

func (*Server) Stop

func (x *Server) Stop(ctx context.Context) error

func (*Server) Subscriber

func (x *Server) Subscriber(topic string, channel string, handler Handler, ms ...middleware.Middleware) error

type ServerOptionFunc

type ServerOptionFunc func(*serverOptions)

func ServerOptionWithConcurrencyNum

func ServerOptionWithConcurrencyNum(num int32) ServerOptionFunc

ServerOptionWithConcurrencyNum 并发数

func ServerOptionWithDecodeFunc

func ServerOptionWithDecodeFunc(f DecodeFunc) ServerOptionFunc

ServerOptionWithDecodeFunc 解码函数

func ServerOptionWithErrHandleFunc

func ServerOptionWithErrHandleFunc(f ErrorHandlerFunc) ServerOptionFunc

ServerOptionWithErrHandleFunc 运行错误处理函数

func ServerOptionWithErrHandler

func ServerOptionWithErrHandler(handler ErrorHandler) ServerOptionFunc

ServerOptionWithErrHandler 运行错误处理器

func ServerOptionWithMiddleware

func ServerOptionWithMiddleware(ms ...middleware.Middleware) ServerOptionFunc

ServerOptionWithMiddleware 中间件

func ServerOptionWithScaleThreshold

func ServerOptionWithScaleThreshold(num int32) ServerOptionFunc

ServerOptionWithScaleThreshold 阈值 任务队列长度超过 这个数字 则会增加 go routine

func ServerOptionWithServerName

func ServerOptionWithServerName(name string) ServerOptionFunc

ServerOptionWithServerName 服务名定义

func ServerOptionWithTimeout

func ServerOptionWithTimeout(duration time.Duration) ServerOptionFunc

ServerOptionWithTimeout 超时控制

Directories

Path Synopsis
message
v1

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL