Documentation ¶
Index ¶
Constants ¶
const ( BATSUB = "batsub" SUBSCRIPTION = "subscription" )
metrics labels
Variables ¶
var DefaultFlushInterval = time.Second
DefaultFlushInterval defines the default time between forced flushes of incomplete batches.
Functions ¶
This section is empty.
Types ¶
type BatchedSubscription ¶
type BatchedSubscription struct { Receiver // contains filtered or unexported fields }
BatchedSubscription implements automatic batching based on a defined batch Capacity and a FlushInterval.
func NewBatchedSubscription ¶
func NewBatchedSubscription(receiver Receiver, opt ...Option) *BatchedSubscription
NewBatchedSubscription returns an initalized BatNewBatchedSubscription.
func (*BatchedSubscription) ReceiveBatches ¶
func (sub *BatchedSubscription) ReceiveBatches(ctx context.Context, f BatchFunc) error
ReceiveBatches calls f with the outstanding batched messages from the subscription.
Basic Example:
err := sub.ReceiveBatches(ctx, func(ctx context.Context, messages []*pubsub.Message){ for i, m := range messages { // TODO: handle message m.Ack() } })
Batch Processing Example:
err := sub.ReceiveBatches(ctx, func(ctx context.Context, messages []*pubsub.Message){
// handle batch of messages using a batch-processing library errors := mylib.BatchProcessMessages(messages) for i, err := errors { if err != nil { // TODO: handle error continue } messages[i].Ack() } })
type FlushInterval ¶ added in v0.0.4
FlushInterval sets the batcher's flush interval.
type Metrics ¶ added in v0.0.4
type Metrics struct { // State PendingMessages *prometheus.GaugeVec // Results ProcessedMessages *prometheus.CounterVec ProcessedBatches *prometheus.CounterVec // Latencies ProcessingLatency *prometheus.HistogramVec }
Metrics stores Batcher Metrics.
func NewMetrics ¶ added in v0.0.4
NewMetrics returns prefixed metrics.
func (*Metrics) Register ¶ added in v0.0.4
func (m *Metrics) Register(reg prometheus.Registerer)
Register registers all metrics.
type Option ¶ added in v0.0.4
type Option interface {
// contains filtered or unexported methods
}
Option defines a batching option.
type Receiver ¶
type Receiver interface { // Receive handles receiving messages from a subscription. Receive(ctx context.Context, f func(context.Context, *pubsub.Message)) error // ID returns an identifier for a subscription and is used for the metrics. ID() string }
Receiver defines pubsub.Subscription compatible interface with a `Receive` and `ID` method.