mqlib

package module
v1.2.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 27, 2024 License: Apache-2.0 Imports: 20 Imported by: 0

README

Go library for communicating with Apache RocketMQ

Documentation

Index

Constants

View Source
// Package-level defaults. Their use sites are not visible in this listing.
const (
	// DEFAULT_RETRY is the untyped constant 3.
	// NOTE(review): presumably the default number of retries — confirm against callers.
	DEFAULT_RETRY = 3

	// MSG_IDS_CACHE_EX is a duration of two hours.
	// NOTE(review): presumably the expiry of cached message IDs — confirm against callers.
	MSG_IDS_CACHE_EX = time.Hour * 2
)
View Source
const (
	DefaultRpcTTL = time.Minute * 10 // default timeout for RPC requests (10 minutes)

)

Variables

View Source
// Sentinel errors exported by the package, each created with errors.New and a
// fixed message. The code paths that return them are not visible in this listing.
var (
	ErrSendFailed   = errors.New("failed to send msg")
	ErrFetchFailed  = errors.New("failed to fetch msg")
	ErrMisformedMsg = errors.New("input msg is invalid")
	ErrMsgTimeout   = errors.New("msg timeout")
	ErrReachedLimit = errors.New("reached concurrent limit")
)
View Source
var (
	// ErrNewTopicAfterStart is a sentinel error with the message
	// "add new topic after consumer started".
	// NOTE(review): presumably returned when a topic is added to an already
	// started consumer — confirm against the code that returns it.
	ErrNewTopicAfterStart = errors.New("add new topic after consumer started")
)

Functions

func CreateTopic

func CreateTopic(nsName string, topics ...string) (err error)

func GetTTLFromContext

func GetTTLFromContext(ctx context.Context) (ttl time.Duration)

func GetUniqKey

func GetUniqKey() string

Types

type Consumer

// Consumer embeds a sync.RWMutex (so its lock methods are promoted onto the
// type) and exposes the GroupName field; all other fields are unexported.
// Because it contains a mutex, a Consumer should be handled by pointer.
type Consumer struct {
	sync.RWMutex
	GroupName string
	// contains filtered or unexported fields
}

func NewConsumer

func NewConsumer(gpName, nsName string, broadcast bool, topics ...Topic) (c *Consumer, err error)

func (*Consumer) SetDeDup

func (c *Consumer) SetDeDup(on bool)

func (*Consumer) Shutdown

func (c *Consumer) Shutdown()

func (*Consumer) UnsubscribeTopic

func (c *Consumer) UnsubscribeTopic(topic string) error

type GenericClient

// GenericClient is the union of the RpcSrvClient and PubSubClient interfaces:
// an implementation must satisfy both.
type GenericClient interface {
	RpcSrvClient
	PubSubClient
}

func NewGenericClient

func NewGenericClient(nameServer, app string, subDispatcher SubDispatcher, ttl time.Duration, topics ...Topic) (gc GenericClient, err error)

An MQ client that can publish/subscribe to messages and also receive RPC calls.

type Message

// Message is the package's message type. The JSON tags control which exported
// fields appear when a Message is JSON-encoded; Body is tagged "-" and is
// therefore never included in the JSON form.
type Message struct {
	Id        string   `json:"id,omitempty"`         // message ID
	RemoteApp string   `json:"remote_app,omitempty"` // name of the server-side application for an RPC call
	Topic     string   `json:"topic,omitempty"`      // message topic (optional)
	Tag       string   `json:"tag"`                  // name of the RPC interface being called
	Keys      []string `json:"keys"`                 // business keys carried with the message (optional)
	Body      []byte   `json:"-"`                    // business message body
	// contains filtered or unexported fields
}

func (*Message) ToRkMessage

func (m *Message) ToRkMessage() *primitive.Message

func (*Message) ToString

func (m *Message) ToString() string

type Producer

// Producer exposes the GroupName field; all other fields are unexported.
type Producer struct {
	GroupName string
	// contains filtered or unexported fields
}

func NewProducer

func NewProducer(gpName, nsName string) (p *Producer, err error)

func (*Producer) Shutdown

func (p *Producer) Shutdown()

type PubClient

// PubClient is the sending-side client interface. It also has unexported
// methods, so it can only be implemented inside this package.
//
// In the Request* methods ctx is variadic, so callers may omit it entirely.
// NOTE(review): how more than one ctx argument is treated is not visible here.
type PubClient interface {
	// GetPub returns a *Producer.
	GetPub() *Producer
	// SetRpcTimeout takes the RPC timeout as a time.Duration.
	SetRpcTimeout(ttl time.Duration)
	// SendMessage takes a message and returns only an error.
	SendMessage(msg *Message) error
	// Request takes a message and returns a response message and an error.
	Request(msg *Message, ctx ...context.Context) (resp *Message, err error)
	// RequestAsync takes a message plus a channel of *Message.
	// NOTE(review): presumably the response is delivered on out — confirm.
	RequestAsync(msg *Message, out chan *Message, ctx ...context.Context) error
	// RequestAsyncWithFunc takes a message plus an RpcCallback.
	// NOTE(review): presumably callback is invoked with the response — confirm.
	RequestAsyncWithFunc(msg *Message, callback RpcCallback, ctx ...context.Context) error
	// contains filtered or unexported methods
}

func NewClient

func NewClient(nameServer, app string, ttl ...time.Duration) (pc PubClient, err error)

An MQ client that only makes RPC calls and publishes messages.

type PubSubClient

// PubSubClient is the union of the PubClient and SubClient interfaces:
// an implementation must satisfy both.
type PubSubClient interface {
	PubClient
	SubClient
}

func NewPubSubClient

func NewPubSubClient(nameServer, consumerGroup string, topics ...Topic) (psc PubSubClient, err error)

An MQ client that can publish and subscribe to messages.

type RpcCallback

// RpcCallback is an alias for a function that receives a *Message and returns nothing.
type RpcCallback = func(msg *Message)

type RpcSrvClient

// RpcSrvClient is the RPC server-side client interface. It also has unexported
// methods, so it can only be implemented inside this package.
type RpcSrvClient interface {
	// Respond takes a message and returns only an error.
	// NOTE(review): presumably msg is the reply to a received RPC call — confirm.
	Respond(msg *Message) error
	// contains filtered or unexported methods
}

func NewSrvClient

func NewSrvClient(nameServer, app string, subDispatcher SubDispatcher) (rsc RpcSrvClient, err error)

An MQ client that only receives RPC calls.

type SubCallback

// SubCallback is an alias for a function that receives a *Message and returns an error.
type SubCallback = func(msg *Message) error

type SubClient

// SubClient is the subscribing-side client interface. It also has unexported
// methods, so it can only be implemented inside this package.
type SubClient interface {
	// GetSub returns a *Consumer.
	GetSub() *Consumer
	// SetDeDup takes a boolean switch.
	// NOTE(review): presumably toggles message de-duplication — confirm.
	SetDeDup(bool)
	// contains filtered or unexported methods
}

func NewBroadcastSubClient

func NewBroadcastSubClient(nameServer, consumerGroup string, topics ...Topic) (sc SubClient, err error)

An MQ client that subscribes to messages in broadcast mode.

func NewSubClient

func NewSubClient(nameServer, consumerGroup string, topics ...Topic) (sc SubClient, err error)

An MQ client that only subscribes to messages.

type SubDispatcher

// SubDispatcher is a single-method interface supplied by callers of
// NewGenericClient and NewSrvClient.
type SubDispatcher interface {
	// ProcessMsg takes a message and returns an error.
	ProcessMsg(msg *Message) error
}

type Topic

// Topic describes one subscription: a topic name, an optional tag filter, and
// the callback for messages received on it.
type Topic struct {
	Name     string      // name of the subscribed topic
	Tags     []string    // tags to subscribe to under the topic; may be left empty (meaning all tags); cannot be modified after subscribing
	Callback SubCallback // callback invoked when the subscribed topic+tag receives a message; specified when the topic is first subscribed; cannot be modified after subscribing
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL