consumer

package
v0.0.0-...-ba2213e Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 21, 2019 License: Apache-2.0 Imports: 12 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ConsumeConcurrentlyContext

// ConsumeConcurrentlyContext is the context value handed to
// MessageListenerConcurrently.ConsumeMessage alongside the batch of messages.
// NOTE(review): the meaning of AckIndex is not shown here — confirm against
// the consuming code before relying on it.
type ConsumeConcurrentlyContext struct {
	MessageQueue *message.MessageQueue
	// Delay level to apply to the next consumption attempt after a consume failure.
	DelayLevelWhenNextConsume int
	AckIndex                  int
}

func NewConsumeConcurrentlyContext

func NewConsumeConcurrentlyContext(mq *message.MessageQueue) *ConsumeConcurrentlyContext

type MQConsumerInner

// MQConsumerInner is the interface through which a consumer exposes its
// subscriptions, group name, consumption settings, offset persistence and
// rebalancing.
type MQConsumerInner interface {
	// Set<SubscriptionData>
	Subscriptions() set.Set
	// Set<MessageQueue>
	UpdateTopicSubscribeInfo(topic string, info set.Set)
	// Group name.
	GroupName() string
	// Message model.
	MessageModel() heartbeat.MessageModel
	// Consume type.
	ConsumeType() heartbeat.ConsumeType
	// Position to start consuming from.
	ConsumeFromWhere() heartbeat.ConsumeFromWhere
	IsUnitMode() bool
	// Reports whether the subscription info for the topic needs to be updated.
	IsSubscribeTopicNeedUpdate(topic string) bool
	// Persist the consumer offset.
	PersistConsumerOffset()
	// Rebalance the load.
	DoRebalance()
}

type MQPullConsumer

// MQPullConsumer is the lifecycle interface of a pull consumer: it can be
// started and shut down.
type MQPullConsumer interface {
	// Start the consumer.
	Start()
	// Shut the consumer down.
	Shutdown()
}

type MQPushConsumer

// MQPushConsumer is the lifecycle interface of a push consumer: it can be
// started and shut down.
type MQPushConsumer interface {
	// Start the consumer.
	Start()
	// Shut the consumer down.
	Shutdown()
}

type MessageListenerConcurrently

// MessageListenerConcurrently is the callback interface for message handling.
// ConsumeMessage receives a slice of messages together with a
// ConsumeConcurrentlyContext and returns a listener.ConsumeConcurrentlyStatus
// describing the outcome.
type MessageListenerConcurrently interface {
	ConsumeMessage(msgs []*message.MessageExt, context *ConsumeConcurrentlyContext) listener.ConsumeConcurrentlyStatus
}

type ProcessQueue

// ProcessQueue holds the exported state of a queue being processed, including
// a TreeMap of messages (MsgTreeMap) and several int64 counters and flags.
// NOTE(review): field semantics (units of LastPullTimestamp/PullMaxIdleTime,
// what MsgAccCnt counts) are not visible in this listing — confirm against
// the implementation. Instances are obtained via NewProcessQueue.
type ProcessQueue struct {
	Dropped           bool
	LastPullTimestamp int64
	PullMaxIdleTime   int64
	MsgCount          int64
	MsgTreeMap        *TreeMap
	QueueOffsetMax    int64
	Consuming         bool
	MsgAccCnt         int64
	// contains filtered or unexported fields
}

func NewProcessQueue

func NewProcessQueue() *ProcessQueue

func (*ProcessQueue) GetMaxSpan

func (pq *ProcessQueue) GetMaxSpan() int64

func (*ProcessQueue) IsPullExpired

func (pq *ProcessQueue) IsPullExpired() bool

func (*ProcessQueue) PutMessage

func (pq *ProcessQueue) PutMessage(msgs []*message.MessageExt) bool

func (*ProcessQueue) RemoveMessage

func (pq *ProcessQueue) RemoveMessage(msgs []*message.MessageExt) int64

func (*ProcessQueue) ToString

func (pq *ProcessQueue) ToString() string

type PullRequest

// PullRequest groups a consumer group name, a message queue, the
// ProcessQueue associated with it, and an offset.
// NOTE(review): NextOffset is presumably the offset the next pull starts
// from — confirm against the pull logic.
type PullRequest struct {
	ConsumerGroup string
	MessageQueue  *message.MessageQueue
	ProcessQueue  *ProcessQueue
	NextOffset    int64
}

type PullResult

// PullResult carries the outcome of a pull: a PullStatus, three offsets
// (next begin, min, max) and the list of messages found.
// NOTE(review): whether MsgFoundList is nil or empty for non-FOUND statuses
// is not shown here — confirm against the producer of this value.
type PullResult struct {
	PullStatus      PullStatus
	NextBeginOffset int64
	MinOffset       int64
	MaxOffset       int64
	MsgFoundList    []*message.MessageExt
}

type PullStatus

// PullStatus enumerates the possible outcomes of a pull. Values are assigned
// with iota starting at FOUND = 0.
type PullStatus int
const (
	// FOUND: messages were found.
	FOUND PullStatus = iota
	// NO_NEW_MSG: no new messages can be pulled.
	NO_NEW_MSG
	// NO_MATCHED_MSG: no messages matched the filter.
	NO_MATCHED_MSG
	// OFFSET_ILLEGAL: the offset is illegal; it may be too large or too small.
	OFFSET_ILLEGAL
)

func (PullStatus) String

func (status PullStatus) String() string

type TreeMap

// TreeMap embeds sync.RWMutex, so the mutex's locking methods are promoted
// onto it; its remaining fields are unexported. Because it contains a mutex
// it should be used through a pointer (NewTreeMap returns *TreeMap).
type TreeMap struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func NewTreeMap

func NewTreeMap() *TreeMap

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL