batsub

package module
v0.0.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 12, 2020 License: Apache-2.0 Imports: 6 Imported by: 0

README

Go Report Card cover-badge

Batched PubSub Reader

This package implements batched reading of messages from a pubsub.Subscription. It provides a BatchedSubscription with a ReceiveBatch method to read messages in batches based on a given batch capacity or batching interval.

Usage

capacity := batsub.Capacity(1000)
interval := batsub.FlushInterval(time.Second)

sub := batsub.NewBatchedSubscription(subscription, capacity, interval)
err := sub.ReceiveBatch(ctx, func(ctx context.Context, messages []*pubsub.Message){
    // handle batch of messages using a batch-processing library
    errors := mylib.BatchProcessMessages(messages)
    for i, err := errors {
        if err != nil {
            // TODO: handle error
            continue
        }
        messages[i].Ack()
    }
})

Documentation

Index

Constants

View Source
const (
	BATSUB       = "batsub"
	SUBSCRIPTION = "subscription"
)

metrics labels

Variables

View Source
var DefaultFlushInterval = time.Second

DefaultFlushInterval defines the default time between forced flushes of incomplete batches.

Functions

This section is empty.

Types

type BatchFunc

type BatchFunc func(ctx context.Context, messages []*pubsub.Message)

BatchFunc handles a batch of messages.

type BatchedSubscription

type BatchedSubscription struct {
	Receiver
	// contains filtered or unexported fields
}

BatchedSubscription implements automatic batching based on a defined batch Capacity and a FlushInterval.

func NewBatchedSubscription

func NewBatchedSubscription(receiver Receiver, opt ...Option) *BatchedSubscription

NewBatchedSubscription returns an initalized BatNewBatchedSubscription.

func (*BatchedSubscription) ReceiveBatches

func (sub *BatchedSubscription) ReceiveBatches(ctx context.Context, f BatchFunc) error

ReceiveBatches calls f with the outstanding batched messages from the subscription.

Basic Example:

err := sub.ReceiveBatches(ctx, func(ctx context.Context, messages []*pubsub.Message){
    for i, m := range messages {
        // TODO: handle message
	       m.Ack()
    }
})

Batch Processing Example:

err := sub.ReceiveBatches(ctx, func(ctx context.Context, messages []*pubsub.Message){

    // handle batch of messages using a batch-processing library
    errors := mylib.BatchProcessMessages(messages)
    for i, err := errors {
        if err != nil {
            // TODO: handle error
            continue
        }
        messages[i].Ack()
    }
})

type Capacity added in v0.0.4

type Capacity int

Capacity sets the batcher's capacity.

type FlushInterval added in v0.0.4

type FlushInterval time.Duration

FlushInterval sets the batcher's flush interval.

type Metrics added in v0.0.4

type Metrics struct {
	// State
	PendingMessages *prometheus.GaugeVec

	// Results
	ProcessedMessages *prometheus.CounterVec
	ProcessedBatches  *prometheus.CounterVec

	// Latencies
	ProcessingLatency *prometheus.HistogramVec
}

Metrics stores Batcher Metrics.

func NewMetrics added in v0.0.4

func NewMetrics(prefix ...string) *Metrics

NewMetrics returns prefixed metrics.

func (*Metrics) Register added in v0.0.4

func (m *Metrics) Register(reg prometheus.Registerer)

Register registers all metrics.

type Option added in v0.0.4

type Option interface {
	// contains filtered or unexported methods
}

Option defines a batching option.

type Receiver

type Receiver interface {
	// Receive handles receiving messages from a subscription.
	Receive(ctx context.Context, f func(context.Context, *pubsub.Message)) error
	// ID returns an identifier for a subscription and is used for the metrics.
	ID() string
}

Receiver defines pubsub.Subscription compatible interface with a `Receive` and `ID` method.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL