Documentation ¶
Index ¶
- Constants
- func ClearCommitOffset(flag int32) int32
- func Home() (string, error)
- type Averagely
- type ConcurrentlyConsumer
- type ConcurrentlyContext
- type Config
- type ConsumeConcurrentlyStatus
- type ConsumeOrderlyStatus
- type ExprType
- type MessageQueueChanger
- type Model
- type OrderlyConsumer
- type OrderlyContext
- type PullConsumer
- func (c PullConsumer) ConsumeFromWhere() string
- func (c *PullConsumer) ConsumeMessageDirectly(msg *message.Ext, broker string) (r client.ConsumeMessageDirectlyResult, err error)
- func (c PullConsumer) Group() string
- func (c PullConsumer) Model() string
- func (c PullConsumer) NeedUpdateTopicSubscribe(topic string) bool
- func (c PullConsumer) PersistOffset()
- func (c *PullConsumer) PullSync(q *message.Queue, expr string, offset int64, maxCount int) (*PullResult, error)
- func (c *PullConsumer) PullSyncBlockIfNotFound(q *message.Queue, expr string, offset int64, maxCount int) (*PullResult, error)
- func (c PullConsumer) QueryConsumerOffset(q *message.Queue) (int64, error)
- func (c PullConsumer) QueryMaxOffset(q *message.Queue) (int64, error)
- func (c PullConsumer) ReblanceQueue()
- func (c *PullConsumer) Register(topics []string, listener MessageQueueChanger)
- func (c *PullConsumer) ResetOffset(topic string, offsets map[message.Queue]int64) error
- func (c *PullConsumer) RunningInfo() client.RunningInfo
- func (c PullConsumer) SendBack(m *message.Ext, delayLevel int, group, brokerName string) error
- func (c *PullConsumer) Subscribe(topic string)
- func (c PullConsumer) SubscribeTopics() []string
- func (c PullConsumer) Subscriptions() []*client.SubscribeData
- func (c PullConsumer) Type() string
- func (c PullConsumer) UnitMode() bool
- func (c PullConsumer) Unsubscribe(topic string)
- func (c PullConsumer) UpdateOffset(q *message.Queue, offset int64, oneway bool) error
- func (c PullConsumer) UpdateTopicSubscribe(topic string, router *route.TopicRouter)
- type PullResult
- type PullStatus
- type PushConsumer
- func (c PushConsumer) ConsumeFromWhere() string
- func (c *PushConsumer) ConsumeMessageDirectly(msg *message.Ext, broker string) (r client.ConsumeMessageDirectlyResult, err error)
- func (c PushConsumer) Group() string
- func (c *PushConsumer) Lock(broker string, mqs []message.Queue) ([]message.Queue, error)
- func (c PushConsumer) Model() string
- func (c PushConsumer) NeedUpdateTopicSubscribe(topic string) bool
- func (c *PushConsumer) Pause()
- func (c PushConsumer) PersistOffset()
- func (c PushConsumer) QueryConsumerOffset(q *message.Queue) (int64, error)
- func (c PushConsumer) QueryMaxOffset(q *message.Queue) (int64, error)
- func (c PushConsumer) ReblanceQueue()
- func (c *PushConsumer) ResetOffset(topic string, offsets map[message.Queue]int64) error
- func (c *PushConsumer) Resume()
- func (c *PushConsumer) RunningInfo() client.RunningInfo
- func (c *PushConsumer) SendBack(m *message.Ext, delayLevel int, broker string) error
- func (c PushConsumer) Subscribe(topic string, expr string)
- func (c PushConsumer) SubscribeTopics() []string
- func (c PushConsumer) Subscriptions() []*client.SubscribeData
- func (c PushConsumer) Type() string
- func (c PushConsumer) UnitMode() bool
- func (c *PushConsumer) Unlock(mq message.Queue) error
- func (c PushConsumer) Unsubscribe(topic string)
- func (c PushConsumer) UpdateOffset(q *message.Queue, offset int64, oneway bool) error
- func (c PushConsumer) UpdateTopicSubscribe(topic string, router *route.TopicRouter)
- type Type
Constants ¶
const ( // ExprTypeTag see client.ExprTypeTag ExprTypeTag = client.ExprTypeTag // ExprTypeSQL92 see client.ExprTypeSQL92 ExprTypeSQL92 = client.ExprTypeSQL92 )
const ( PullCommitOffset = 1 << iota PullSuspend PullSubscribe PullClassFilter )
pull flags
const ( // ConsumeFromLastOffset from the offset stored, if not found from the maxoffset ConsumeFromLastOffset fromWhere = iota // ConsumeFromFirstOffset from the offset stored, if not found from the zeor ConsumeFromFirstOffset // ConsumeFromTimestamp from the offset specified by the timestamp ConsumeFromTimestamp )
const ( ReadOffsetFromMemory = iota ReadOffsetFromStore ReadOffsetMemoryFirstThenStore )
read offset type
const ( PullTimeDelayWhenException = time.Second * 3 PullTimeDelayWhenFlowControl = time.Millisecond * 50 PullTimeDelayWhenPause = time.Second BrokerSuspendMaxTime = time.Second * 15 ConsumerTimeoutWhenSuspend = time.Second * 30 )
times defined
Variables ¶
This section is empty.
Functions ¶
func ClearCommitOffset ¶
ClearCommitOffset clears the commitoffset flag
Types ¶
type Averagely ¶
type Averagely struct{}
Averagely reblance average
type ConcurrentlyConsumer ¶
type ConcurrentlyConsumer interface {
Consume(messages []*message.Ext, ctx *ConcurrentlyContext) ConsumeConcurrentlyStatus
}
ConcurrentlyConsumer consumer consumes the messages concurrently
type ConcurrentlyContext ¶
type ConcurrentlyContext struct { MessageQueue *message.Queue // message cosnume retry strategy // -1, no retry, put into DLQ directly // 0, broker control retry frequency // >0, client control retry frequency DelayLevelWhenNextConsume int // the index of the message, any message with the index greater than this one is consumed failed AckIndex int }
ConcurrentlyContext consume concurrently context
type Config ¶
type Config struct { rocketmq.Client ReblanceInterval time.Duration MessageModel Model Typ Type FromWhere fromWhere MaxReconsumeTimes int }
Config the configuration of consumer
type ConsumeConcurrentlyStatus ¶
type ConsumeConcurrentlyStatus int
ConsumeConcurrentlyStatus consume concurrently result
const ( ConcurrentlySuccess ConsumeConcurrentlyStatus = iota ReconsumeLater )
predefined consume concurrently result
func (ConsumeConcurrentlyStatus) String ¶
func (s ConsumeConcurrentlyStatus) String() string
type ConsumeOrderlyStatus ¶
type ConsumeOrderlyStatus int
ConsumeOrderlyStatus consume orderly result
const ( OrderlySuccess ConsumeOrderlyStatus = iota SuspendCurrentQueueMoment )
predefined consume concurrently result
type MessageQueueChanger ¶
MessageQueueChanger the callback when the consume queue is changed
type OrderlyConsumer ¶
type OrderlyConsumer interface {
Consume(messages []*message.Ext, ctx *OrderlyContext) ConsumeOrderlyStatus
}
OrderlyConsumer consume orderly logic
type OrderlyContext ¶
type OrderlyContext struct { Queue *message.Queue SupsendCurrentQueueTime time.Duration // contains filtered or unexported fields }
OrderlyContext consume orderly context
type PullConsumer ¶
type PullConsumer struct {
// contains filtered or unexported fields
}
PullConsumer consumes the messages using pulling method
func NewPullConsumer ¶
func NewPullConsumer(group string, namesrvAddrs []string, logger log.Logger) *PullConsumer
NewPullConsumer creates consumer using the pull model
func (PullConsumer) ConsumeFromWhere ¶
func (c PullConsumer) ConsumeFromWhere() string
func (*PullConsumer) ConsumeMessageDirectly ¶
func (c *PullConsumer) ConsumeMessageDirectly( msg *message.Ext, broker string, ) ( r client.ConsumeMessageDirectlyResult, err error, )
ConsumeMessageDirectly NOOPS
func (PullConsumer) NeedUpdateTopicSubscribe ¶
func (PullConsumer) PersistOffset ¶
func (c PullConsumer) PersistOffset()
func (*PullConsumer) PullSync ¶
func (c *PullConsumer) PullSync(q *message.Queue, expr string, offset int64, maxCount int) ( *PullResult, error, )
PullSync pull the messages sync
func (*PullConsumer) PullSyncBlockIfNotFound ¶
func (c *PullConsumer) PullSyncBlockIfNotFound( q *message.Queue, expr string, offset int64, maxCount int, ) ( *PullResult, error, )
PullSyncBlockIfNotFound pull the messages sync and block when no message
func (PullConsumer) QueryConsumerOffset ¶
func (PullConsumer) QueryMaxOffset ¶
func (PullConsumer) ReblanceQueue ¶
func (c PullConsumer) ReblanceQueue()
ReblanceQueue reblances the consume queues between the different consumers
func (*PullConsumer) Register ¶
func (c *PullConsumer) Register(topics []string, listener MessageQueueChanger)
Register register the message queue changed event of the topics it must be called before calling Start function, otherwise the topic is not subscribe
func (*PullConsumer) ResetOffset ¶
ResetOffset NOOPS
func (*PullConsumer) RunningInfo ¶
func (c *PullConsumer) RunningInfo() client.RunningInfo
RunningInfo returns the consumter's running information
func (*PullConsumer) Subscribe ¶
func (c *PullConsumer) Subscribe(topic string)
Subscribe subscribe the topic dynamic
func (PullConsumer) SubscribeTopics ¶
func (c PullConsumer) SubscribeTopics() []string
func (PullConsumer) Subscriptions ¶
func (c PullConsumer) Subscriptions() []*client.SubscribeData
func (PullConsumer) Unsubscribe ¶
func (c PullConsumer) Unsubscribe(topic string)
func (PullConsumer) UpdateOffset ¶
func (PullConsumer) UpdateTopicSubscribe ¶
func (c PullConsumer) UpdateTopicSubscribe(topic string, router *route.TopicRouter)
UpdateTopicSubscribe only updates the subsribed topic
type PullResult ¶
type PullResult struct { NextBeginOffset int64 MinOffset int64 MaxOffset int64 Messages []*message.Ext Status PullStatus }
PullResult pull result
func (*PullResult) String ¶
func (pr *PullResult) String() string
type PullStatus ¶
type PullStatus int8
PullStatus pull status
const ( // Found find the message Found PullStatus = iota // NoNewMessage no message in the broker NoNewMessage // NoMatchedMessage no matched with expr NoMatchedMessage // OffsetIllegal illegal offset OffsetIllegal )
func (PullStatus) String ¶
func (s PullStatus) String() string
type PushConsumer ¶
type PushConsumer struct {
// contains filtered or unexported fields
}
PushConsumer the consumer with push model
func NewConcurrentlyConsumer ¶
func NewConcurrentlyConsumer( group string, namesrvAddrs []string, userConsumer ConcurrentlyConsumer, logger log.Logger, ) ( c *PushConsumer, err error, )
NewConcurrentlyConsumer creates the push consumer consuming the message concurrently
func NewOrderlyConsumer ¶
func NewOrderlyConsumer( group string, namesrvAddrs []string, userConsumer OrderlyConsumer, logger log.Logger, ) ( c *PushConsumer, err error, )
NewOrderlyConsumer creates the push consumer consuming the message orderly
func (PushConsumer) ConsumeFromWhere ¶
func (c PushConsumer) ConsumeFromWhere() string
func (*PushConsumer) ConsumeMessageDirectly ¶
func (c *PushConsumer) ConsumeMessageDirectly( msg *message.Ext, broker string, ) ( r client.ConsumeMessageDirectlyResult, err error, )
ConsumeMessageDirectly consume the specified message notified by the broker
func (PushConsumer) NeedUpdateTopicSubscribe ¶
func (*PushConsumer) Pause ¶
func (c *PushConsumer) Pause()
Pause pause the consumer, this operation is thread-safe
func (PushConsumer) PersistOffset ¶
func (c PushConsumer) PersistOffset()
func (PushConsumer) QueryConsumerOffset ¶
func (PushConsumer) QueryMaxOffset ¶
func (PushConsumer) ReblanceQueue ¶
func (c PushConsumer) ReblanceQueue()
ReblanceQueue reblances the consume queues between the different consumers
func (*PushConsumer) ResetOffset ¶
ResetOffset the offsets of the topic
func (*PushConsumer) Resume ¶
func (c *PushConsumer) Resume()
Resume un-pause the consumer, this operation is thread-safe
func (*PushConsumer) RunningInfo ¶
func (c *PushConsumer) RunningInfo() client.RunningInfo
RunningInfo returns the consumter's running information
func (*PushConsumer) SendBack ¶
SendBack sends the message to the broker, the message will be consumed again after the at least time specified by the delayLevel
func (PushConsumer) SubscribeTopics ¶
func (c PushConsumer) SubscribeTopics() []string
func (PushConsumer) Subscriptions ¶
func (c PushConsumer) Subscriptions() []*client.SubscribeData
func (*PushConsumer) Unlock ¶
func (c *PushConsumer) Unlock(mq message.Queue) error
Unlock unlocks the message queue in the broker
func (PushConsumer) Unsubscribe ¶
func (c PushConsumer) Unsubscribe(topic string)
func (PushConsumer) UpdateOffset ¶
func (PushConsumer) UpdateTopicSubscribe ¶
func (c PushConsumer) UpdateTopicSubscribe(topic string, router *route.TopicRouter)
UpdateTopicSubscribe only updates the subsribed topic
Source Files ¶
- callback.go
- client.go
- consume_service.go
- consumer.go
- filter.go
- flag.go
- fromwhere.go
- model.go
- offsetstore.go
- offsetstore_local.go
- offsetstore_remote.go
- path.go
- processtable.go
- pull.go
- pullresult.go
- push.go
- push_consume_concurrently.go
- push_consume_orderly.go
- push_consume_service.go
- push_pullservice.go
- reblance_stragegy.go
- schedule.go
- suggest.go
- timeout_locker.go
- type.go