consumer

package
v0.0.0-...-dbcf9a4
Warning

This package is not in the latest version of its module.

Published: Jan 17, 2020 License: Apache-2.0 Imports: 20 Imported by: 0

Documentation


Constants

const (
	// Keywords: AND, OR, NOT, BETWEEN, IN, TRUE, FALSE, IS, NULL
	//
	// Data types:
	//   - Boolean, such as TRUE or FALSE
	//   - String, such as 'abc'
	//   - Decimal, such as 123
	//   - Float, such as 3.1415
	//
	// Grammar:
	//   - AND, OR
	//   - >, >=, <, <=, =
	//   - BETWEEN A AND B, equivalent to >=A AND <=B
	//   - NOT BETWEEN A AND B, equivalent to >B OR <A
	//   - IN ('a', 'b'), equivalent to ='a' OR ='b'; this operator supports only the String type
	//   - IS NULL, IS NOT NULL: check whether a parameter is null or not
	//   - =TRUE, =FALSE: check whether a parameter is true or false
	//
	// Example:
	//
	//	(a > 10 AND a < 100) OR (b IS NOT NULL AND b=TRUE)
	SQL92 = ExpressionType("SQL92")

	// Supports only the OR operation, such as "tag1 || tag2 || tag3".
	// A null or "*" expression means subscribe to all.
	TAG = ExpressionType("TAG")
)
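
The TAG semantics described above (OR-only matching, with an empty or "*" expression subscribing to everything) can be illustrated with a minimal sketch. The helper `matchesTagExpression` is hypothetical and is not part of this package; it only shows how such an expression reads, not how the client or broker actually evaluates it.

```go
package main

import (
	"fmt"
	"strings"
)

// matchesTagExpression is a hypothetical helper (not part of the package).
// It reports whether tag satisfies a TAG-style expression such as
// "tag1 || tag2 || tag3". An empty or "*" expression matches every tag.
func matchesTagExpression(expression, tag string) bool {
	expr := strings.TrimSpace(expression)
	if expr == "" || expr == "*" {
		return true
	}
	// Only the OR operation is supported, so split on "||" and compare.
	for _, candidate := range strings.Split(expr, "||") {
		if strings.TrimSpace(candidate) == tag {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(matchesTagExpression("tag1 || tag2 || tag3", "tag2")) // true
	fmt.Println(matchesTagExpression("tag1 || tag2", "tag9"))         // false
	fmt.Println(matchesTagExpression("*", "anything"))                // true
}
```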
const (
	// An allocation strategy proxy that gives priority to consumers in the same machine room. An actual allocation
	// strategy can be specified.
	//
	// If any consumer is alive in a machine room, the message queues of the brokers deployed in that machine room
	// should be allocated only to those consumers. Otherwise, those message queues can be shared among all consumers,
	// since no alive consumer is there to monopolize them.
	StrategyMachineNearby = AllocateStrategy("MachineNearby")

	// Average Hashing queue algorithm
	StrategyAveragely = AllocateStrategy("Averagely")

	// Cycle average Hashing queue algorithm
	StrategyAveragelyCircle = AllocateStrategy("AveragelyCircle")

	// Use the message queues specified by configuration
	StrategyConfig = AllocateStrategy("Config")

	// Computer room Hashing queue algorithm, such as Alipay logic room
	StrategyMachineRoom = AllocateStrategy("MachineRoom")

	// Consistent Hashing queue algorithm
	StrategyConsistentHash = AllocateStrategy("ConsistentHash")
)

Variables

This section is empty.

Functions

func IsTagType

func IsTagType(exp string) bool

func NewConsumer

func NewConsumer(config ConsumerOption) *defaultPullConsumer

Types

type AllocateStrategy

type AllocateStrategy string

Strategy algorithm for allocating message queues among consumers.

type ConsumeFromWhere

type ConsumeFromWhere int

Consuming point when a consumer boots.

There are three consuming points:

  - CONSUME_FROM_LAST_OFFSET: the consumer client picks up where it stopped previously. For a newly booted consumer client, there are two cases, depending on the age of the consumer group:
      1. If the consumer group was created so recently that the earliest subscribed message has not yet expired, which means the consumer group represents a recently launched business, consuming starts from the very beginning.
      2. If the earliest subscribed message has expired, consuming starts from the latest messages, meaning messages born before the booting timestamp are ignored.
  - CONSUME_FROM_FIRST_OFFSET: the consumer client starts from the earliest messages available.
  - CONSUME_FROM_TIMESTAMP: the consumer client starts from the specified timestamp, which means messages born before ConsumeTimestamp are ignored.

const (
	ConsumeFromLastOffset ConsumeFromWhere = iota
	ConsumeFromFirstOffset
	ConsumeFromTimestamp
)

type ConsumeMessageContext

type ConsumeMessageContext struct {
	// contains filtered or unexported fields
}

type ConsumeResult

type ConsumeResult int

In most scenarios, the push consumer is the recommended way to consume messages.

Technically speaking, the push client is a wrapper around the underlying pull service. When messages pulled from brokers arrive, it invokes the registered callback handler to deliver them.

See quick start/Consumer in the example module for typical usage.

Thread Safety: After initialization, the instance can be regarded as thread-safe.

const (
	Mb                           = 1024 * 1024
	ConsumeSuccess ConsumeResult = iota
	ConsumeRetryLater
)
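
One detail of the block above is worth checking: because Mb occupies the first position in the same const block, iota is already 1 when ConsumeSuccess is declared. The sketch below copies the declaration exactly as displayed here (it does not import the package) and prints the resulting values.

```go
package main

import "fmt"

// ConsumeResult mirrors the type shown on this page for illustration only.
type ConsumeResult int

// The const block is copied as displayed. iota counts constant
// specifications within the block, so it is 0 for Mb, 1 for
// ConsumeSuccess, and 2 for ConsumeRetryLater.
const (
	Mb                           = 1024 * 1024
	ConsumeSuccess ConsumeResult = iota
	ConsumeRetryLater
)

func main() {
	fmt.Println(Mb, ConsumeSuccess, ConsumeRetryLater) // 1048576 1 2

	// The zero value of ConsumeResult equals neither constant.
	var zero ConsumeResult
	fmt.Println(zero == ConsumeSuccess, zero == ConsumeRetryLater) // false false
}
```

Under this reading, a handler that returns the zero value of ConsumeResult returns neither ConsumeSuccess nor ConsumeRetryLater, so returning one of the named constants explicitly is the safer habit.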

type ConsumeType

type ConsumeType string

type ConsumerOption

type ConsumerOption struct {
	kernel.ClientOption
	NameServerAddr string

	// Backtracking consumption time with second precision. The time format is
	// 20131223171201, meaning 17:12:01 on December 23, 2013.
	// The default backtracking consumption time is half an hour ago.
	ConsumeTimestamp string

	// The socket timeout in milliseconds
	ConsumerPullTimeout time.Duration

	// Maximum offset span for concurrent consumption. It has no effect on sequential consumption.
	ConsumeConcurrentlyMaxSpan int

	// Flow control threshold at queue level. Each message queue caches at most 1000 messages by default.
	// Because of PullBatchSize, the instantaneous value may exceed the limit.
	PullThresholdForQueue int64

	// Limit on the cached message size at queue level. Each message queue caches at most 100 MiB of messages by
	// default. Because of PullBatchSize, the instantaneous value may exceed the limit.
	//
	// The size of a message is measured by its body only, so the figure is not exact.
	PullThresholdSizeForQueue int

	// Flow control threshold at topic level. The default value is -1 (unlimited).
	//
	// If PullThresholdForTopic is not unlimited, the value of PullThresholdForQueue is overwritten with a value
	// calculated from it.
	//
	// For example, if PullThresholdForTopic is 1000 and 10 message queues are assigned to this consumer,
	// then PullThresholdForQueue is set to 100.
	PullThresholdForTopic int

	// Limit on the cached message size at topic level. The default value is -1 (unlimited).
	//
	// If PullThresholdSizeForTopic is not unlimited, the value of PullThresholdSizeForQueue is overwritten with a
	// value calculated from it.
	//
	// For example, if PullThresholdSizeForTopic is 1000 MiB and 10 message queues are
	// assigned to this consumer, then PullThresholdSizeForQueue is set to 100 MiB.
	PullThresholdSizeForTopic int

	// Message pull Interval
	PullInterval time.Duration

	// Batch consumption size
	ConsumeMessageBatchMaxSize int

	// Batch pull size
	PullBatchSize int32

	// Whether to update the subscription relationship on every pull
	PostSubscriptionWhenPull bool

	// Maximum number of re-consume attempts. -1 means 16 times.
	//
	// If a message is re-consumed more than MaxReconsumeTimes times before succeeding, it is directed to a
	// deletion queue.
	MaxReconsumeTimes int

	// Time to suspend pulling in cases that require slow pulling, such as flow control.
	SuspendCurrentQueueTimeMillis time.Duration

	// Maximum amount of time a message may block the consuming thread.
	ConsumeTimeout time.Duration

	ConsumerModel  MessageModel
	Strategy       AllocateStrategy
	ConsumeOrderly bool
	FromWhere      ConsumeFromWhere
}
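
The topic-level thresholds in the struct comments describe a simple division across assigned queues (1000 over 10 queues gives 100). A minimal sketch of that arithmetic follows. The function name, the floor of 1, and the handling of a zero queue count are assumptions for illustration and are not taken from this package.

```go
package main

import "fmt"

// perQueueThreshold is a hypothetical helper. Given a topic-level threshold
// and the number of message queues assigned to this consumer, it returns the
// per-queue threshold. A topic threshold of -1 means unlimited, in which
// case the existing per-queue value is kept.
func perQueueThreshold(topicThreshold, assignedQueues, currentQueueThreshold int) int {
	if topicThreshold == -1 || assignedQueues <= 0 {
		return currentQueueThreshold
	}
	v := topicThreshold / assignedQueues
	if v < 1 {
		v = 1 // assumption: never drop below one message (or one MiB) per queue
	}
	return v
}

func main() {
	fmt.Println(perQueueThreshold(1000, 10, 1000)) // 100
	fmt.Println(perQueueThreshold(-1, 10, 1000))   // 1000
}
```

The same helper applies to the size thresholds if all three arguments are expressed in MiB.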

type ExpressionType

type ExpressionType string

type MessageModel

type MessageModel int

Message model defines how messages are delivered to each consumer client.

RocketMQ supports two message models: clustering and broadcasting. If clustering is set, consumer clients in the same consumer group each consume only a share of the subscribed messages, which achieves load balancing. Conversely, if broadcasting is set, each consumer client consumes all subscribed messages separately.

This field defaults to clustering.

const (
	BroadCasting MessageModel = iota
	Clustering
)

func (MessageModel) String

func (mode MessageModel) String() string

type MessageSelector

type MessageSelector struct {
	Type       ExpressionType
	Expression string
}

type OffsetStore

type OffsetStore interface {
	// contains filtered or unexported methods
}

func NewLocalFileOffsetStore

func NewLocalFileOffsetStore(clientID, group string) OffsetStore

func NewRemoteOffsetStore

func NewRemoteOffsetStore(group string, client *kernel.RMQClient) OffsetStore

type PullConsumer

type PullConsumer interface {
	Start()
	Shutdown()
	Pull(ctx context.Context, topic string, selector MessageSelector, numbers int) (*kernel.PullResult, error)
}

type PullRequest

type PullRequest struct {
	// contains filtered or unexported fields
}

func (*PullRequest) String

func (pr *PullRequest) String() string

type PushConsumer

type PushConsumer interface {
	Start() error
	Shutdown()
	Subscribe(topic string, selector MessageSelector,
		f func(*ConsumeMessageContext, []*kernel.MessageExt) (ConsumeResult, error)) error
}

func NewPushConsumer

func NewPushConsumer(consumerGroup string, opt ConsumerOption) (PushConsumer, error)
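
The callback passed to Subscribe takes a context and a batch of messages and returns a ConsumeResult plus an error. The sketch below shows the shape of such a handler using local stand-in types, so that it compiles without the package. `consumeContext`, `messageExt`, its fields, and the retry-on-empty-body rule are all hypothetical; the real types are ConsumeMessageContext and kernel.MessageExt, whose fields are not shown on this page.

```go
package main

import "fmt"

// ConsumeResult and its constants are copied from the block shown earlier.
type ConsumeResult int

const (
	Mb                           = 1024 * 1024
	ConsumeSuccess ConsumeResult = iota
	ConsumeRetryLater
)

// consumeContext and messageExt are hypothetical stand-ins for
// *ConsumeMessageContext and *kernel.MessageExt.
type consumeContext struct{}

type messageExt struct {
	Topic string
	Body  []byte
}

// handleMessages has the same shape as the callback expected by Subscribe.
// As an illustrative rule, it asks for a retry when a message has no body.
func handleMessages(ctx *consumeContext, msgs []*messageExt) (ConsumeResult, error) {
	for _, m := range msgs {
		if len(m.Body) == 0 {
			return ConsumeRetryLater, fmt.Errorf("empty body on topic %q", m.Topic)
		}
		fmt.Printf("consumed %d bytes from %s\n", len(m.Body), m.Topic)
	}
	return ConsumeSuccess, nil
}

func main() {
	res, err := handleMessages(&consumeContext{}, []*messageExt{{Topic: "demo", Body: []byte("hi")}})
	fmt.Println(res == ConsumeSuccess, err)
}
```

With the real package, a function of this shape would be passed as the third argument to Subscribe, alongside a topic and a MessageSelector such as one with Type set to TAG.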
