client

package
v0.0.0-...-693787e Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 14, 2019 License: MIT Imports: 15 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func BuildMQClientID

func BuildMQClientID(ip, unitName, instanceName string) string

BuildMQClientID build the mq client ID

func IsTag

func IsTag(typ string) bool

IsTag returns true if the expresstion type is "TAG" or empty string, false otherwise

func ParseTags

func ParseTags(expr string) []string

ParseTags parse the expression as tag elements

Types

type Admin

type Admin interface {
	Group() string
}

Admin admin operation

type Config

type Config struct {
	HeartbeatBrokerInterval time.Duration
	PollNameServerInterval  time.Duration
	NameServerAddrs         []string
}

Config the remote client configurations

type ConsumeDirectlyResult

type ConsumeDirectlyResult = rpc.ConsumeDirectlyResult

ConsumeDirectlyResult the flag of the consuming directly

const (
	Success ConsumeDirectlyResult = iota
	Later
	Rollback
	Commit
	Error
	ReturnNil
)

predefined `ConsumeDirectlyResult` values

type ConsumeMessageDirectlyResult

type ConsumeMessageDirectlyResult = rpc.ConsumeMessageDirectlyResult

ConsumeMessageDirectlyResult consume the message directly, sending by the broker

type Consumer

type Consumer interface {
	Group() string
	SubscribeTopics() []string
	UpdateTopicSubscribe(topic string, router *route.TopicRouter)
	NeedUpdateTopicSubscribe(topic string) bool
	ConsumeFromWhere() string
	Model() string
	Type() string
	UnitMode() bool
	Subscriptions() []*SubscribeData
	ReblanceQueue()
	RunningInfo() RunningInfo
	ResetOffset(topic string, offsets map[message.Queue]int64) error
	ConsumeMessageDirectly(msg *message.Ext, broker string) (ConsumeMessageDirectlyResult, error)
}

Consumer interface needed by reblance

type ExprType

type ExprType string

ExprType the filter type of the subcription

const (
	// ExprTypeTag tag filter
	// Only support or operation such as
	// "tag1 || tag2 || tag3", <br>
	// If null or * expression,meaning subscribe all.
	ExprTypeTag ExprType = "TAG"

	// ExprTypeSQL92 sql filter
	//
	//Keywords:
	//AND, OR, NOT, BETWEEN, IN, TRUE, FALSE, IS, NULL
	//Boolean, like: TRUE, FALSE
	//String, like: 'abc'
	//Decimal, like: 123
	//Float number, like: 3.1415
	//
	//Grammar:
	//AND, OR
	//>, >=, <, <=, =
	//BETWEEN A AND B, equals to >=A AND <=B
	//NOT BETWEEN A AND B, equals to >B OR <A
	//IN ('a', 'b'), equals to ='a' OR ='b', this operation only support String type.
	//IS NULL, IS NOT NULL, check parameter whether is null, or not.
	//=TRUE, =FALSE, check parameter whether is true, or false.
	//
	//Example: (a > 10 AND a < 100) OR (b IS NOT NULL AND b=TRUE)
	ExprTypeSQL92 ExprType = "SQL92"

	// ExprAll the tags indicates subscribing all the message
	ExprAll = "*"
)

func (ExprType) String

func (t ExprType) String() string

type FindBrokerResult

type FindBrokerResult struct {
	Addr    string
	IsSlave bool
	Version int32
}

FindBrokerResult the data returned by FindBrokerAddr

type MQClient

type MQClient struct {
	Config
	sync.RWMutex

	sync.WaitGroup

	remote.Client
	// contains filtered or unexported fields
}

MQClient the client commuicate with broker it's shared by all consumer & producer with the same client id

func New

func New(config *Config, clientID string, logger log.Logger) (c *MQClient, err error)

New create the client

func (*MQClient) AdminCount

func (c *MQClient) AdminCount() int

AdminCount return the registered admin count

func (*MQClient) ConsumeMessageDirectly

func (c *MQClient) ConsumeMessageDirectly(addr, group, clientID, offsetID string) (ConsumeMessageDirectlyResult, error)

ConsumeMessageDirectly send the request to broker to push the message specified by the offsetID to the specified client in the group

func (*MQClient) ConsumerCount

func (c *MQClient) ConsumerCount() int

ConsumerCount return the registered consumer count

func (*MQClient) CreateOrUpdateTopic

func (c *MQClient) CreateOrUpdateTopic(
	addr string, header *rpc.CreateOrUpdateTopicHeader, to time.Duration,
) error

CreateOrUpdateTopic create topic from broker

func (*MQClient) DeleteTopicInBroker

func (c *MQClient) DeleteTopicInBroker(addr, topic string, to time.Duration) error

DeleteTopicInBroker delete topic in the broker

func (*MQClient) DeleteTopicInNamesrv

func (c *MQClient) DeleteTopicInNamesrv(addr, topic string, to time.Duration) error

DeleteTopicInNamesrv delete topic in the broker

func (*MQClient) FindAnyBrokerAddr

func (c *MQClient) FindAnyBrokerAddr(brokerName string) (
	*FindBrokerResult, error,
)

FindAnyBrokerAddr returns any broker whose name is the specified name

func (*MQClient) FindBrokerAddr

func (c *MQClient) FindBrokerAddr(brokerName string, hintBrokerID int32, lock bool) (
	*FindBrokerResult, error,
)

FindBrokerAddr finds the broker address, returns the address with specified broker id first otherwise, returns any one

func (*MQClient) GetBrokerClusterInfo

func (c *MQClient) GetBrokerClusterInfo(addr string, to time.Duration) (*route.ClusterInfo, error)

GetBrokerClusterInfo get the cluster info from the namesrv

func (*MQClient) GetConsumerIDs

func (c *MQClient) GetConsumerIDs(addr, group string, to time.Duration) (ids []string, err error)

GetConsumerIDs get the client id from the broker wraper

func (*MQClient) GetMasterBrokerAddr

func (c *MQClient) GetMasterBrokerAddr(brokerName string) string

GetMasterBrokerAddr returns the master broker address

func (*MQClient) GetMasterBrokerAddrs

func (c *MQClient) GetMasterBrokerAddrs() []string

GetMasterBrokerAddrs returns all the master broker addresses

func (*MQClient) LockMessageQueues

func (c *MQClient) LockMessageQueues(
	broker, group string, queues []message.Queue, to time.Duration,
) (
	[]message.Queue, error,
)

LockMessageQueues send lock message queue request to the broker

func (*MQClient) MaxOffset

func (c *MQClient) MaxOffset(addr, topic string, queueID uint8, to time.Duration) (
	int64, *rpc.Error,
)

MaxOffset returns the max offset in the consume queue

func (*MQClient) ProducerCount

func (c *MQClient) ProducerCount() int

ProducerCount return the registered producer count

func (*MQClient) PullMessageAsync

func (c *MQClient) PullMessageAsync(
	addr string, header *rpc.PullHeader, to time.Duration, callback func(*rpc.PullResponse, error),
) error

PullMessageAsync pull message async

func (*MQClient) PullMessageSync

func (c *MQClient) PullMessageSync(addr string, header *rpc.PullHeader, to time.Duration) (
	pr *rpc.PullResponse, err error,
)

PullMessageSync pull message sync

func (*MQClient) QueryConsumerOffset

func (c *MQClient) QueryConsumerOffset(addr, topic, group string, queueID int, timeout time.Duration) (
	int64, *rpc.Error,
)

QueryConsumerOffset query cosnume offset wraper

func (*MQClient) QueryMessageByOffset

func (c *MQClient) QueryMessageByOffset(addr string, offset int64, to time.Duration) (
	*message.Ext, error,
)

QueryMessageByOffset querys the message by message id

func (*MQClient) RegisterAdmin

func (c *MQClient) RegisterAdmin(a Admin) error

RegisterAdmin registers admin

func (*MQClient) RegisterConsumer

func (c *MQClient) RegisterConsumer(co Consumer) error

RegisterConsumer registers consumer

func (*MQClient) RegisterFilter

func (c *MQClient) RegisterFilter(group string, subData *SubscribeData) error

RegisterFilter register the filter to the broker

func (*MQClient) RegisterProducer

func (c *MQClient) RegisterProducer(p Producer) error

RegisterProducer registers producer

func (*MQClient) ResetConsumeOffset

func (c *MQClient) ResetConsumeOffset(
	addr, topic, group string, timestamp time.Time, isForce bool, timeout time.Duration,
) (
	map[message.Queue]int64, error,
)

ResetConsumeOffset requests the broker to reset the offsets of the specified topic, the offsets' owner is specified by the group

func (*MQClient) SearchOffsetByTimestamp

func (c *MQClient) SearchOffsetByTimestamp(
	addr, topic string, queueID uint8, timestamp time.Time, to time.Duration,
) (
	int64, *rpc.Error,
)

SearchOffsetByTimestamp returns the offset of the specified message queue and the timestamp

func (*MQClient) SendBack

func (c *MQClient) SendBack(addr string, h *rpc.SendBackHeader, to time.Duration) (err error)

SendBack send back the message

func (*MQClient) SendHeartbeat

func (c *MQClient) SendHeartbeat()

SendHeartbeat send the heart beart to the broker server, it locked when sending the data

func (*MQClient) SendMessageSync

func (c *MQClient) SendMessageSync(
	broker string, data []byte, header *rpc.SendHeader, timeout time.Duration,
) (
	*rpc.SendResponse, error,
)

SendMessageSync send message to the broker

func (*MQClient) Shutdown

func (c *MQClient) Shutdown()

Shutdown client

func (*MQClient) Start

func (c *MQClient) Start() error

Start client tasks

func (*MQClient) UnlockMessageQueuesOneway

func (c *MQClient) UnlockMessageQueuesOneway(group, broker string, queues []message.Queue) error

UnlockMessageQueuesOneway send unlock message queue request to the broker

func (*MQClient) UnregisterAdmin

func (c *MQClient) UnregisterAdmin(group string)

UnregisterAdmin unregister producer

func (*MQClient) UnregisterConsumer

func (c *MQClient) UnregisterConsumer(group string)

UnregisterConsumer unregister producer

func (*MQClient) UnregisterProducer

func (c *MQClient) UnregisterProducer(group string)

UnregisterProducer unregister producer

func (*MQClient) UpdateConsumerOffset

func (c *MQClient) UpdateConsumerOffset(
	addr, topic, group string, queueID int, offset int64, timeout time.Duration,
) error

UpdateConsumerOffset updates the offset of the consumer queue wraper

func (*MQClient) UpdateConsumerOffsetOneway

func (c *MQClient) UpdateConsumerOffsetOneway(
	addr, topic, group string, queueID int, offset int64,
) error

UpdateConsumerOffsetOneway updates the offset of the consumer queue wraper

func (*MQClient) UpdateTopicRouterInfoFromNamesrv

func (c *MQClient) UpdateTopicRouterInfoFromNamesrv(topic string) (err error)

UpdateTopicRouterInfoFromNamesrv udpate the topic for the producer/consumer from the namesrv

type Producer

type Producer interface {
	Group() string
	PublishTopics() []string
	UpdateTopicPublish(topic string, router *route.TopicRouter)
	NeedUpdateTopicPublish(topic string) bool
}

Producer interface needed by reblance

type RunningInfo

type RunningInfo struct {
	Properties    map[string]string `json:"properties"`
	Subscriptions []*SubscribeData  `json:"subscriptionSet"`
}

RunningInfo consumer running information

type SubscribeData

type SubscribeData struct {
	Topic             string   `json:"topic"`
	Expr              string   `json:"subString"`
	Type              string   `json:"expressionType"`
	Tags              []string `json:"tagsSet"`
	Codes             []uint32 `json:"codeSet"`
	Version           int64    `json:"subVersion"`
	IsClassFilterMode bool     `json:"classFilterMode"`
	FilterClassSource string   `json:"-"`
}

SubscribeData subscription information

func BuildSubscribe

func BuildSubscribe(group, topic, expr string, typ ExprType) *SubscribeData

BuildSubscribe build the subscribe data with tag type

func (*SubscribeData) Equal

func (s *SubscribeData) Equal(o *SubscribeData) bool

Equal returns true if equals another, otherwise false

func (*SubscribeData) String

func (s *SubscribeData) String() string

type SubscribeDataTable

type SubscribeDataTable struct {
	// contains filtered or unexported fields
}

SubscribeDataTable contains the subscription information of topic, the operations is thread-safe NOTE: donot modify directly

func NewSubcribeTable

func NewSubcribeTable() *SubscribeDataTable

NewSubcribeTable creates one DataTable

func (*SubscribeDataTable) Datas

func (t *SubscribeDataTable) Datas() []*SubscribeData

Datas returns the subscribed datas

func (*SubscribeDataTable) Delete

func (t *SubscribeDataTable) Delete(topic string) *SubscribeData

Delete deletes the data of the specified topic, and return the previous one

func (*SubscribeDataTable) Get

func (t *SubscribeDataTable) Get(topic string) *SubscribeData

Get returns the subcribe data of the topic

func (*SubscribeDataTable) Put

Put stores the subcribe data and returns the previous one

func (*SubscribeDataTable) PutIfAbsent

func (t *SubscribeDataTable) PutIfAbsent(topic string, d *SubscribeData) *SubscribeData

PutIfAbsent stores the subcribe data and returns the previous one

func (*SubscribeDataTable) Topics

func (t *SubscribeDataTable) Topics() []string

Topics returns the topics

type SubscribeQueueTable

type SubscribeQueueTable struct {
	// contains filtered or unexported fields
}

SubscribeQueueTable contains the message queues of topic, the operations is thread-safe

func NewSubscribeQueueTable

func NewSubscribeQueueTable() *SubscribeQueueTable

NewSubscribeQueueTable creates the QueueTable

func (*SubscribeQueueTable) Delete

func (t *SubscribeQueueTable) Delete(topic string) []*message.Queue

Delete returns the topics

func (*SubscribeQueueTable) Get

func (t *SubscribeQueueTable) Get(topic string) []*message.Queue

Get returns the queues of the topic

func (*SubscribeQueueTable) Put

func (t *SubscribeQueueTable) Put(topic string, q []*message.Queue) []*message.Queue

Put stores the queues and returns the previous queue

func (*SubscribeQueueTable) Topics

func (t *SubscribeQueueTable) Topics() []string

Topics returns the topics

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL