batsub

package module
v0.0.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 5, 2020 License: Apache-2.0 Imports: 5 Imported by: 0

README

Go Report Card cover-badge

Batched PubSub Reader

This package implements batched reading of messages from a pubsub.Subscription. It provides a BatchedSubscription with a ReceiveBatch method to read messages in batches based on a given batch capacity or batching interval.

Usage

capacity := 100
interval := time.Second

sub := batsub.NewBatchedSubscription(subscription, capacity, interval)
err := sub.ReceiveBatch(ctx, func(ctx context.Context, messages []*pubsub.Message){
    // handle batch of messages using a batch-processing library
    errors := mylib.BatchProcessMessages(messages)
    for i, err := errors {
        if err != nil {
            // TODO: handle error
            continue
        }
        messages[i].Ack()
    }
})

Also see the PubSub to BigQuery example.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type BatchFunc

type BatchFunc func(ctx context.Context, messages []*pubsub.Message)

BatchFunc handles a batch of messages.

type BatchedSubscription

type BatchedSubscription struct {
	Receiver
	// contains filtered or unexported fields
}

BatchedSubscription implements automatic batching based on a defined batch Capacity and a FlushInterval.

func NewBatchedSubscription

func NewBatchedSubscription(receiver Receiver, capacity int, flushInterval time.Duration) *BatchedSubscription

NewBatchedSubscription returns an initalized BatNewBatchedSubscription.

func (*BatchedSubscription) ReceiveBatch

func (sub *BatchedSubscription) ReceiveBatch(ctx context.Context, f BatchFunc) error

ReceiveBatch calls f with the outstanding batched messages from the subscription.

Basic Example:

err := sub.ReceiveBatch(ctx, func(ctx context.Context, messages []*pubsub.Message){
    for i, m := range messages {
        // TODO: handle message
	       m.Ack()
    }
})

Batch Processing Example:

err := sub.ReceiveBatch(ctx, func(ctx context.Context, messages []*pubsub.Message){

    // handle batch of messages using a batch-processing library
    errors := mylib.BatchProcessMessages(messages)
    for i, err := errors {
        if err != nil {
            // TODO: handle error
            continue
        }
        messages[i].Ack()
    }
})

type Receiver

type Receiver interface {
	Receive(ctx context.Context, f func(context.Context, *pubsub.Message)) error
}

Receiver defines a pubsub compatible `Receive` func.

Directories

Path Synopsis
_examples

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL