consumer

package
v2.0.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 7, 2019 License: MIT Imports: 28 Imported by: 0

Documentation

Index

Constants

View Source
// Subscription filter expression types, re-exported from the client package.
const (
	// ExprTypeTag is the same value as client.ExprTypeTag; see client.ExprTypeTag.
	ExprTypeTag = client.ExprTypeTag
	// ExprTypeSQL92 is the same value as client.ExprTypeSQL92; see client.ExprTypeSQL92.
	ExprTypeSQL92 = client.ExprTypeSQL92
)
View Source
// Pull flags. Each constant is a distinct single bit (1 << iota), so the
// values are 1, 2, 4 and 8 and can be combined with bitwise OR.
const (
	// PullCommitOffset is bit 0 (value 1).
	PullCommitOffset = 1 << iota
	// PullSuspend is bit 1 (value 2).
	PullSuspend
	// PullSubscribe is bit 2 (value 4).
	PullSubscribe
	// PullClassFilter is bit 3 (value 8).
	PullClassFilter
)

pull flags

View Source
// Starting positions for consumption, of the unexported type fromWhere.
const (
	// ConsumeFromLastOffset starts from the stored offset; if none is found, from the max offset.
	ConsumeFromLastOffset fromWhere = iota
	// ConsumeFromFirstOffset starts from the stored offset; if none is found, from zero.
	ConsumeFromFirstOffset
	// ConsumeFromTimestamp starts from the offset specified by the timestamp.
	ConsumeFromTimestamp
)
View Source
// Read offset types, numbered 0, 1 and 2 by iota.
// NOTE(review): the per-constant behavior below is inferred from the names;
// confirm against the offset store implementation.
const (
	// ReadOffsetFromMemory (0): presumably read the offset from memory only.
	ReadOffsetFromMemory = iota
	// ReadOffsetFromStore (1): presumably read the offset from the store only.
	ReadOffsetFromStore
	// ReadOffsetMemoryFirstThenStore (2): presumably try memory first, then fall back to the store.
	ReadOffsetMemoryFirstThenStore
)

read offset type

View Source
// Predefined time.Duration values used by the consumer.
// NOTE(review): where each value is applied is inferred from its name;
// confirm against the call sites.
const (
	// PullTimeDelayWhenException is 3 seconds.
	PullTimeDelayWhenException   = time.Second * 3
	// PullTimeDelayWhenFlowControl is 50 milliseconds.
	PullTimeDelayWhenFlowControl = time.Millisecond * 50
	// PullTimeDelayWhenPause is 1 second.
	PullTimeDelayWhenPause       = time.Second
	// BrokerSuspendMaxTime is 15 seconds.
	BrokerSuspendMaxTime         = time.Second * 15
	// ConsumerTimeoutWhenSuspend is 30 seconds.
	ConsumerTimeoutWhenSuspend   = time.Second * 30
)

times defined

Variables

This section is empty.

Functions

func ClearCommitOffset

func ClearCommitOffset(flag int32) int32

ClearCommitOffset clears the commitoffset flag

func Home

func Home() (string, error)

Home returns the home directory for the executing user.

This uses an OS-specific method for discovering the home directory. An error is returned if a home directory cannot be detected.

Types

type Averagely

type Averagely struct{}

Averagely is a rebalance strategy that assigns the queues averagely.

func (*Averagely) Assign

func (a *Averagely) Assign(group, curClientID string, clientIDs []string, queues []*message.Queue) (
	[]*message.Queue, error,
)

Assign assigns the queues averagely.

func (*Averagely) Name

func (a *Averagely) Name() string

Name returns the name of the rebalance strategy.

type ConcurrentlyConsumer

// ConcurrentlyConsumer is the user-supplied handler that consumes messages concurrently.
type ConcurrentlyConsumer interface {
	// Consume receives a batch of messages together with the consume context
	// and returns a ConsumeConcurrentlyStatus (ConcurrentlySuccess or ReconsumeLater).
	Consume(messages []*message.Ext, ctx *ConcurrentlyContext) ConsumeConcurrentlyStatus
}

ConcurrentlyConsumer consumer consumes the messages concurrently

type ConcurrentlyContext

// ConcurrentlyContext is the context passed to ConcurrentlyConsumer.Consume.
type ConcurrentlyContext struct {
	// MessageQueue is the message queue associated with this consume call.
	// NOTE(review): presumably the queue the messages were pulled from — confirm.
	MessageQueue *message.Queue
	// DelayLevelWhenNextConsume is the message consume retry strategy:
	// -1, no retry, put into the DLQ directly
	// 0, the broker controls the retry frequency
	// >0, the client controls the retry frequency
	DelayLevelWhenNextConsume int
	// AckIndex is the index of the message; any message with an index greater
	// than this one is treated as having failed to be consumed.
	AckIndex int
}

ConcurrentlyContext consume concurrently context

type Config

// Config is the configuration of a consumer.
type Config struct {
	// Client is the embedded rocketmq.Client; its exported fields and methods
	// are promoted onto Config.
	rocketmq.Client
	// ReblanceInterval is a duration related to rebalancing.
	// NOTE(review): presumably the interval between rebalance runs — confirm.
	ReblanceInterval  time.Duration
	// MessageModel is the consume message model (BroadCasting or Clustering).
	MessageModel      Model
	// Typ is the consume type (Pull or Push).
	Typ               Type
	// FromWhere is the starting position for consumption (one of the
	// ConsumeFrom* constants).
	FromWhere         fromWhere
	// MaxReconsumeTimes is a retry limit.
	// NOTE(review): presumably the maximum number of times a message is re-consumed — confirm.
	MaxReconsumeTimes int
}

Config the configuration of consumer

type ConsumeConcurrentlyStatus

type ConsumeConcurrentlyStatus int

ConsumeConcurrentlyStatus consume concurrently result

// Predefined results of consuming concurrently, numbered 0 and 1 by iota.
const (
	// ConcurrentlySuccess (0) reports that the messages were consumed successfully.
	ConcurrentlySuccess ConsumeConcurrentlyStatus = iota
	// ReconsumeLater (1) asks for the messages to be consumed again later.
	ReconsumeLater
)

predefined consume concurrently result

func (ConsumeConcurrentlyStatus) String

func (s ConsumeConcurrentlyStatus) String() string

type ConsumeOrderlyStatus

type ConsumeOrderlyStatus int

ConsumeOrderlyStatus consume orderly result

// Predefined results of consuming orderly, numbered 0 and 1 by iota.
const (
	// OrderlySuccess (0) reports that the messages were consumed successfully.
	OrderlySuccess ConsumeOrderlyStatus = iota
	// SuspendCurrentQueueMoment (1) asks to suspend the current queue for a moment.
	// NOTE(review): presumably the duration comes from OrderlyContext.SupsendCurrentQueueTime — confirm.
	SuspendCurrentQueueMoment
)

predefined consume orderly result

type ExprType

type ExprType = client.ExprType

ExprType is the filter type of the subscription

type MessageQueueChanger

// MessageQueueChanger is the callback invoked when the consume queues change.
type MessageQueueChanger interface {
	// Change is called with the topic and two queue lists.
	// NOTE(review): presumably all is every queue of the topic and divided is
	// the subset assigned to this consumer — confirm against the caller.
	Change(topic string, all, divided []*message.Queue)
}

MessageQueueChanger the callback when the consume queue is changed

type Model

type Model int8

Model consume message model type

// Consume message models, numbered 0 and 1 by iota.
const (
	// BroadCasting (0): messages are consumed by all consumers.
	BroadCasting Model = iota
	// Clustering (1): messages are consumed by different consumers.
	Clustering
)

func (Model) String

func (m Model) String() string

type OrderlyConsumer

// OrderlyConsumer is the user-supplied handler that consumes messages orderly.
type OrderlyConsumer interface {
	// Consume receives a batch of messages together with the orderly consume
	// context and returns a ConsumeOrderlyStatus (OrderlySuccess or
	// SuspendCurrentQueueMoment).
	Consume(messages []*message.Ext, ctx *OrderlyContext) ConsumeOrderlyStatus
}

OrderlyConsumer consume orderly logic

type OrderlyContext

// OrderlyContext is the context passed to OrderlyConsumer.Consume.
type OrderlyContext struct {
	// Queue is the message queue associated with this consume call.
	// NOTE(review): presumably the queue the messages were pulled from — confirm.
	Queue *message.Queue

	// SupsendCurrentQueueTime is a duration related to suspending the current queue.
	// NOTE(review): presumably how long the queue is suspended when Consume
	// returns SuspendCurrentQueueMoment — confirm. The field name is spelled
	// "Supsend" in the API and must be used as-is.
	SupsendCurrentQueueTime time.Duration
	// contains filtered or unexported fields
}

OrderlyContext consume orderly context

type PullConsumer

// PullConsumer consumes messages using the pull model; create one with NewPullConsumer.
type PullConsumer struct {
	// contains filtered or unexported fields
}

PullConsumer consumes the messages using pulling method

func NewPullConsumer

func NewPullConsumer(group string, namesrvAddrs []string, logger log.Logger) *PullConsumer

NewPullConsumer creates consumer using the pull model

func (PullConsumer) ConsumeFromWhere

func (c PullConsumer) ConsumeFromWhere() string

func (*PullConsumer) ConsumeMessageDirectly

func (c *PullConsumer) ConsumeMessageDirectly(
	msg *message.Ext, broker string,
) (
	r client.ConsumeMessageDirectlyResult, err error,
)

ConsumeMessageDirectly is a no-op.

func (PullConsumer) Group

func (c PullConsumer) Group() string

func (PullConsumer) Model

func (c PullConsumer) Model() string

func (PullConsumer) NeedUpdateTopicSubscribe

func (c PullConsumer) NeedUpdateTopicSubscribe(topic string) bool

func (PullConsumer) PersistOffset

func (c PullConsumer) PersistOffset()

func (*PullConsumer) PullSync

func (c *PullConsumer) PullSync(q *message.Queue, expr string, offset int64, maxCount int) (
	*PullResult, error,
)

PullSync pull the messages sync

func (*PullConsumer) PullSyncBlockIfNotFound

func (c *PullConsumer) PullSyncBlockIfNotFound(
	q *message.Queue, expr string, offset int64, maxCount int,
) (
	*PullResult, error,
)

PullSyncBlockIfNotFound pull the messages sync and block when no message

func (PullConsumer) QueryConsumerOffset

func (c PullConsumer) QueryConsumerOffset(q *message.Queue) (int64, error)

func (PullConsumer) QueryMaxOffset

func (c PullConsumer) QueryMaxOffset(q *message.Queue) (int64, error)

func (PullConsumer) ReblanceQueue

func (c PullConsumer) ReblanceQueue()

ReblanceQueue rebalances the consume queues between the different consumers

func (*PullConsumer) Register

func (c *PullConsumer) Register(topics []string, listener MessageQueueChanger)

Register registers the listener for the message-queue-changed event of the topics. It must be called before calling the Start function; otherwise the topics are not subscribed.

func (*PullConsumer) ResetOffset

func (c *PullConsumer) ResetOffset(topic string, offsets map[message.Queue]int64) error

ResetOffset is a no-op.

func (*PullConsumer) RunningInfo

func (c *PullConsumer) RunningInfo() client.RunningInfo

RunningInfo returns the consumer's running information

func (PullConsumer) SendBack

func (c PullConsumer) SendBack(m *message.Ext, delayLevel int, group, brokerName string) error

SendBack send back message

func (*PullConsumer) Subscribe

func (c *PullConsumer) Subscribe(topic string)

Subscribe subscribes to the topic dynamically

func (PullConsumer) SubscribeTopics

func (c PullConsumer) SubscribeTopics() []string

func (PullConsumer) Subscriptions

func (c PullConsumer) Subscriptions() []*client.SubscribeData

func (PullConsumer) Type

func (c PullConsumer) Type() string

func (PullConsumer) UnitMode

func (c PullConsumer) UnitMode() bool

func (PullConsumer) Unsubscribe

func (c PullConsumer) Unsubscribe(topic string)

func (PullConsumer) UpdateOffset

func (c PullConsumer) UpdateOffset(q *message.Queue, offset int64, oneway bool) error

func (PullConsumer) UpdateTopicSubscribe

func (c PullConsumer) UpdateTopicSubscribe(topic string, router *route.TopicRouter)

UpdateTopicSubscribe only updates the subscribed topic

type PullResult

// PullResult is the result of a pull request (returned by PullSync and
// PullSyncBlockIfNotFound).
// NOTE(review): the offset field descriptions are inferred from the field
// names — confirm against the pull implementation.
type PullResult struct {
	// NextBeginOffset is presumably the offset to use for the next pull.
	NextBeginOffset int64
	// MinOffset is presumably the minimum offset of the queue.
	MinOffset       int64
	// MaxOffset is presumably the maximum offset of the queue.
	MaxOffset       int64
	// Messages holds the pulled messages.
	Messages        []*message.Ext
	// Status is the pull status (Found, NoNewMessage, NoMatchedMessage or OffsetIllegal).
	Status          PullStatus
}

PullResult pull result

func (*PullResult) String

func (pr *PullResult) String() string

type PullStatus

type PullStatus int8

PullStatus pull status

// Pull statuses, numbered 0 through 3 by iota.
const (
	// Found (0): messages were found.
	Found PullStatus = iota
	// NoNewMessage (1): there is no new message in the broker.
	NoNewMessage
	// NoMatchedMessage (2): no message matched the expr.
	NoMatchedMessage
	// OffsetIllegal (3): the offset is illegal.
	OffsetIllegal
)

func (PullStatus) String

func (s PullStatus) String() string

type PushConsumer

// PushConsumer is the consumer with the push model; create one with
// NewConcurrentlyConsumer or NewOrderlyConsumer.
type PushConsumer struct {
	// contains filtered or unexported fields
}

PushConsumer the consumer with push model

func NewConcurrentlyConsumer

func NewConcurrentlyConsumer(
	group string, namesrvAddrs []string, userConsumer ConcurrentlyConsumer, logger log.Logger,
) (
	c *PushConsumer, err error,
)

NewConcurrentlyConsumer creates the push consumer consuming the message concurrently

func NewOrderlyConsumer

func NewOrderlyConsumer(
	group string, namesrvAddrs []string, userConsumer OrderlyConsumer, logger log.Logger,
) (
	c *PushConsumer, err error,
)

NewOrderlyConsumer creates the push consumer consuming the message orderly

func (PushConsumer) ConsumeFromWhere

func (c PushConsumer) ConsumeFromWhere() string

func (*PushConsumer) ConsumeMessageDirectly

func (c *PushConsumer) ConsumeMessageDirectly(
	msg *message.Ext, broker string,
) (
	r client.ConsumeMessageDirectlyResult, err error,
)

ConsumeMessageDirectly consume the specified message notified by the broker

func (PushConsumer) Group

func (c PushConsumer) Group() string

func (*PushConsumer) Lock

func (c *PushConsumer) Lock(broker string, mqs []message.Queue) ([]message.Queue, error)

Lock locks the message queues in the broker

func (PushConsumer) Model

func (c PushConsumer) Model() string

func (PushConsumer) NeedUpdateTopicSubscribe

func (c PushConsumer) NeedUpdateTopicSubscribe(topic string) bool

func (*PushConsumer) Pause

func (c *PushConsumer) Pause()

Pause pauses the consumer; this operation is thread-safe

func (PushConsumer) PersistOffset

func (c PushConsumer) PersistOffset()

func (PushConsumer) QueryConsumerOffset

func (c PushConsumer) QueryConsumerOffset(q *message.Queue) (int64, error)

func (PushConsumer) QueryMaxOffset

func (c PushConsumer) QueryMaxOffset(q *message.Queue) (int64, error)

func (PushConsumer) ReblanceQueue

func (c PushConsumer) ReblanceQueue()

ReblanceQueue rebalances the consume queues between the different consumers

func (*PushConsumer) ResetOffset

func (c *PushConsumer) ResetOffset(topic string, offsets map[message.Queue]int64) error

ResetOffset resets the offsets of the topic

func (*PushConsumer) Resume

func (c *PushConsumer) Resume()

Resume un-pauses the consumer; this operation is thread-safe

func (*PushConsumer) RunningInfo

func (c *PushConsumer) RunningInfo() client.RunningInfo

RunningInfo returns the consumer's running information

func (*PushConsumer) SendBack

func (c *PushConsumer) SendBack(m *message.Ext, delayLevel int, broker string) error

SendBack sends the message back to the broker; the message will be consumed again after at least the time specified by the delayLevel

func (PushConsumer) Subscribe

func (c PushConsumer) Subscribe(topic string, expr string)

func (PushConsumer) SubscribeTopics

func (c PushConsumer) SubscribeTopics() []string

func (PushConsumer) Subscriptions

func (c PushConsumer) Subscriptions() []*client.SubscribeData

func (PushConsumer) Type

func (c PushConsumer) Type() string

func (PushConsumer) UnitMode

func (c PushConsumer) UnitMode() bool

func (*PushConsumer) Unlock

func (c *PushConsumer) Unlock(mq message.Queue) error

Unlock unlocks the message queue in the broker

func (PushConsumer) Unsubscribe

func (c PushConsumer) Unsubscribe(topic string)

func (PushConsumer) UpdateOffset

func (c PushConsumer) UpdateOffset(q *message.Queue, offset int64, oneway bool) error

func (PushConsumer) UpdateTopicSubscribe

func (c PushConsumer) UpdateTopicSubscribe(topic string, router *route.TopicRouter)

UpdateTopicSubscribe only updates the subscribed topic

type Type

type Type int8

Type consume type

// Consume types, numbered 0 and 1 by iota.
const (
	// Pull (0): consume messages by pulling.
	Pull Type = iota
	// Push (1): consume messages by pushing.
	Push
)

func (Type) String

func (t Type) String() string

Directories

Path Synopsis
internel

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL