Documentation ¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type BatchedSubscription ¶
type BatchedSubscription struct { Receiver // contains filtered or unexported fields }
BatchedSubscription implements automatic batching based on a defined batch Capacity and a FlushInterval.
func NewBatchedSubscription ¶
func NewBatchedSubscription(receiver Receiver, capacity int, flushInterval time.Duration) *BatchedSubscription
NewBatchedSubscription returns an initalized BatNewBatchedSubscription.
func (*BatchedSubscription) ReceiveBatch ¶
func (sub *BatchedSubscription) ReceiveBatch(ctx context.Context, f BatchFunc) error
ReceiveBatch calls f with the outstanding batched messages from the subscription.
Basic Example:
err := sub.ReceiveBatch(ctx, func(ctx context.Context, messages []*pubsub.Message){ for i, m := range messages { // TODO: handle message m.Ack() } })
Batch Processing Example:
err := sub.ReceiveBatch(ctx, func(ctx context.Context, messages []*pubsub.Message){
// handle batch of messages using a batch-processing library errors := mylib.BatchProcessMessages(messages) for i, err := errors { if err != nil { // TODO: handle error continue } messages[i].Ack() } })
Click to show internal directories.
Click to hide internal directories.