Documentation ¶
Index ¶
- Constants
- func IsTagType(exp string) bool
- func NewConsumer(config ConsumerOption) *defaultPullConsumer
- type AllocateStrategy
- type ConsumeFromWhere
- type ConsumeMessageContext
- type ConsumeResult
- type ConsumeType
- type ConsumerOption
- type ExpressionType
- type MessageModel
- type MessageSelector
- type OffsetStore
- type PullConsumer
- type PullRequest
- type PushConsumer
Constants ¶
const ( /** * <ul> * Keywords: * <li>{@code AND, OR, NOT, BETWEEN, IN, TRUE, FALSE, IS, NULL}</li> * </ul> * <p/> * <ul> * Data type: * <li>Boolean, like: TRUE, FALSE</li> * <li>String, like: 'abc'</li> * <li>Decimal, like: 123</li> * <li>Float number, like: 3.1415</li> * </ul> * <p/> * <ul> * Grammar: * <li>{@code AND, OR}</li> * <li>{@code >, >=, <, <=, =}</li> * <li>{@code BETWEEN A AND B}, equals to {@code >=A AND <=B}</li> * <li>{@code NOT BETWEEN A AND B}, equals to {@code >B OR <A}</li> * <li>{@code IN ('a', 'b')}, equals to {@code ='a' OR ='b'}, this operation only support String type.</li> * <li>{@code IS NULL}, {@code IS NOT NULL}, check parameter whether is null, or not.</li> * <li>{@code =TRUE}, {@code =FALSE}, check parameter whether is true, or false.</li> * </ul> * <p/> * <p> * Example: * (a > 10 AND a < 100) OR (b IS NOT NULL AND b=TRUE) * </p> */ SQL92 = ExpressionType("SQL92") /** * Only support or operation such as * "tag1 || tag2 || tag3", <br> * If null or * expression, meaning subscribe all. */ TAG = ExpressionType("TAG") )
const ( // An allocate strategy proxy for based on machine room nearside priority. An actual allocate strategy can be // specified. // // If any consumer is alive in a machine room, the message queue of the broker which is deployed in the same machine // should only be allocated to those. Otherwise, those message queues can be shared along all consumers since there are // no alive consumer to monopolize them. StrategyMachineNearby = AllocateStrategy("MachineNearby") // Average Hashing queue algorithm StrategyAveragely = AllocateStrategy("Averagely") // Cycle average Hashing queue algorithm StrategyAveragelyCircle = AllocateStrategy("AveragelyCircle") // Use Message Queue specified StrategyConfig = AllocateStrategy("Config") // Computer room Hashing queue algorithm, such as Alipay logic room StrategyMachineRoom = AllocateStrategy("MachineRoom") // Consistent Hashing queue algorithm StrategyConsistentHash = AllocateStrategy("ConsistentHash") )
Variables ¶
This section is empty.
Functions ¶
func NewConsumer ¶
func NewConsumer(config ConsumerOption) *defaultPullConsumer
Types ¶
type AllocateStrategy ¶
type AllocateStrategy string
Strategy Algorithm for message allocating between consumers
type ConsumeFromWhere ¶
type ConsumeFromWhere int
Consuming point on consumer booting. </p>
There are three consuming points: <ul> <li> <code>CONSUME_FROM_LAST_OFFSET</code>: consumer clients pick up where it stopped previously. If it were a newly booting up consumer client, according aging of the consumer group, there are two cases: <ol> <li> if the consumer group is created so recently that the earliest message being subscribed has yet expired, which means the consumer group represents a lately launched business, consuming will start from the very beginning; </li> <li> if the earliest message being subscribed has expired, consuming will start from the latest messages, meaning messages born prior to the booting timestamp would be ignored. </li> </ol> </li> <li> <code>CONSUME_FROM_FIRST_OFFSET</code>: Consumer client will start from earliest messages available. </li> <li> <code>CONSUME_FROM_TIMESTAMP</code>: Consumer client will start from specified timestamp, which means messages born prior to {@link #consumeTimestamp} will be ignored </li> </ul>
const ( ConsumeFromLastOffset ConsumeFromWhere = iota ConsumeFromFirstOffset ConsumeFromTimestamp )
type ConsumeMessageContext ¶
type ConsumeMessageContext struct {
// contains filtered or unexported fields
}
type ConsumeResult ¶
type ConsumeResult int
In most scenarios, this is the mostly recommended usage to consume messages.
Technically speaking, this push client is virtually a wrapper of the underlying pull service. Specifically, on arrival of messages pulled from brokers, it roughly invokes the registered callback handler to feed the messages.
See quick start/Consumer in the example module for a typical usage.
<strong>Thread Safety:</strong> After initialization, the instance can be regarded as thread-safe.
const ( Mb = 1024 * 1024 ConsumeSuccess ConsumeResult = iota ConsumeRetryLater )
type ConsumeType ¶
type ConsumeType string
type ConsumerOption ¶
type ConsumerOption struct { kernel.ClientOption NameServerAddr string /** * Backtracking consumption time with second precision. Time format is * 20131223171201<br> * Implying Seventeen twelve and 01 seconds on December 23, 2013 year<br> * Default backtracking consumption time Half an hour ago. */ ConsumeTimestamp string // The socket timeout in milliseconds ConsumerPullTimeout time.Duration // Concurrently max span offset.it has no effect on sequential consumption ConsumeConcurrentlyMaxSpan int // Flow control threshold on queue level, each message queue will cache at most 1000 messages by default, // Consider the {PullBatchSize}, the instantaneous value may exceed the limit PullThresholdForQueue int64 // Limit the cached message size on queue level, each message queue will cache at most 100 MiB messages by default, // Consider the {@code pullBatchSize}, the instantaneous value may exceed the limit // // The size of a message only measured by message body, so it's not accurate PullThresholdSizeForQueue int // Flow control threshold on topic level, default value is -1(Unlimited) // // The value of {@code pullThresholdForQueue} will be overwrote and calculated based on // {@code pullThresholdForTopic} if it is't unlimited // // For example, if the value of pullThresholdForTopic is 1000 and 10 message queues are assigned to this consumer, // then pullThresholdForQueue will be set to 100 PullThresholdForTopic int // Limit the cached message size on topic level, default value is -1 MiB(Unlimited) // // The value of {@code pullThresholdSizeForQueue} will be overwrote and calculated based on // {@code pullThresholdSizeForTopic} if it is't unlimited // // For example, if the value of pullThresholdSizeForTopic is 1000 MiB and 10 message queues are // assigned to this consumer, then pullThresholdSizeForQueue will be set to 100 MiB PullThresholdSizeForTopic int // Message pull Interval PullInterval time.Duration // Batch consumption size ConsumeMessageBatchMaxSize int // Batch pull size PullBatchSize int32 // Whether update subscription relationship when every pull PostSubscriptionWhenPull bool // Max re-consume times. -1 means 16 times. // // If messages are re-consumed more than {@link #maxReconsumeTimes} before success, it's be directed to a deletion // queue waiting. MaxReconsumeTimes int // Suspending pulling time for cases requiring slow pulling like flow-control scenario. SuspendCurrentQueueTimeMillis time.Duration // Maximum amount of time a message may block the consuming thread. ConsumeTimeout time.Duration ConsumerModel MessageModel Strategy AllocateStrategy ConsumeOrderly bool FromWhere ConsumeFromWhere }
type ExpressionType ¶
type ExpressionType string
type MessageModel ¶
type MessageModel int
Message model defines the way how messages are delivered to each consumer clients. </p>
RocketMQ supports two message models: clustering and broadcasting. If clustering is set, consumer clients with the same {@link #consumerGroup} would only consume shards of the messages subscribed, which achieves load balances; Conversely, if the broadcasting is set, each consumer client will consume all subscribed messages separately. </p>
This field defaults to clustering.
const ( BroadCasting MessageModel = iota Clustering )
func (MessageModel) String ¶
func (mode MessageModel) String() string
type MessageSelector ¶
type MessageSelector struct { Type ExpressionType Expression string }
type OffsetStore ¶
type OffsetStore interface {
// contains filtered or unexported methods
}
func NewLocalFileOffsetStore ¶
func NewLocalFileOffsetStore(clientID, group string) OffsetStore
func NewRemoteOffsetStore ¶
func NewRemoteOffsetStore(group string, client *kernel.RMQClient) OffsetStore
type PullConsumer ¶
type PullConsumer interface { Start() Shutdown() Pull(ctx context.Context, topic string, selector MessageSelector, numbers int) (*kernel.PullResult, error) }
type PullRequest ¶
type PullRequest struct {
// contains filtered or unexported fields
}
func (*PullRequest) String ¶
func (pr *PullRequest) String() string
type PushConsumer ¶
type PushConsumer interface { Start() error Shutdown() Subscribe(topic string, selector MessageSelector, f func(*ConsumeMessageContext, []*kernel.MessageExt) (ConsumeResult, error)) error }
func NewPushConsumer ¶
func NewPushConsumer(consumerGroup string, opt ConsumerOption) (PushConsumer, error)