batchproducer

package
v0.0.0-...-8de9069
Published: Mar 6, 2019 License: MIT Imports: 6 Imported by: 0

Documentation

Constants

const MaxKinesisBatchSize = 500

MaxKinesisBatchSize is the maximum number of records that Kinesis accepts in a single PutRecords request.

Variables

var (
	// ErrAlreadyStarted is returned by Start if the Producer is already started.
	ErrAlreadyStarted = errors.New("already started")

	// ErrAlreadyStopped is returned by Stop if the Producer is already stopped.
	ErrAlreadyStopped = errors.New("already stopped")
)
var DefaultConfig = Config{
	AddBlocksWhenBufferFull: false,
	BufferSize:              10000,
	FlushInterval:           1 * time.Second,
	BatchSize:               10,
	MaxAttemptsPerRecord:    10,
	StatInterval:            1 * time.Second,
	Logger:                  log.New(os.Stderr, "", log.LstdFlags),
}

DefaultConfig is provided for convenience; if you have no specific preferences about how to configure your Producer, you can pass it to New. The default Logger is equivalent to the standard logger in "log": `log.New(os.Stderr, "", log.LstdFlags)`.

Functions

This section is empty.

Types

type BatchProducerLogger

type BatchProducerLogger interface {
	Printf(format string, args ...interface{})
}
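
Because the interface requires only Printf, a *log.Logger satisfies it directly, and so does any small custom type. The following is a minimal sketch, not taken from the package: the interface is copied locally so the example compiles on its own, and captureLogger is a hypothetical in-memory logger of the kind that might be useful in tests.

```go
package main

import (
	"fmt"
	"log"
	"os"
	"sync"
)

// BatchProducerLogger is copied from the documentation above so that this
// sketch is self-contained.
type BatchProducerLogger interface {
	Printf(format string, args ...interface{})
}

// captureLogger is a hypothetical logger that keeps formatted lines in memory.
type captureLogger struct {
	mu    sync.Mutex
	lines []string
}

// Printf formats the message and appends it to the captured lines.
func (c *captureLogger) Printf(format string, args ...interface{}) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.lines = append(c.lines, fmt.Sprintf(format, args...))
}

// Lines returns a copy of the captured lines.
func (c *captureLogger) Lines() []string {
	c.mu.Lock()
	defer c.mu.Unlock()
	return append([]string(nil), c.lines...)
}

// Compile-time checks that both types satisfy the interface.
var (
	_ BatchProducerLogger = (*log.Logger)(nil)
	_ BatchProducerLogger = (*captureLogger)(nil)
)

func main() {
	// The standard library logger can be used as-is.
	var std BatchProducerLogger = log.New(os.Stderr, "", log.LstdFlags)
	std.Printf("flushed %d records", 10)

	// The capturing logger records the same kind of message in memory.
	c := &captureLogger{}
	var l BatchProducerLogger = c
	l.Printf("dropped %d records", 3)
	fmt.Println(c.Lines())
}
```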

type BatchingKinesisClient

type BatchingKinesisClient interface {
	PutRecords(args *kinesis.RequestArgs) (resp *kinesis.PutRecordsResp, err error)
}

BatchingKinesisClient is a subset of KinesisClient to ease mocking.

type Config

type Config struct {
	// AddBlocksWhenBufferFull controls the behavior of Add when the buffer is full. If true, Add
	// will block. If false, Add will return an error. This enables integrating applications to
	// decide how they want to handle a full buffer e.g. so they can discard records if there’s
	// a problem.
	AddBlocksWhenBufferFull bool

	// BatchSize controls the maximum size of the batches sent to Kinesis. If the number of records
	// in the buffer hits this size, a batch of this size will be sent at that time, regardless of
	// whether FlushInterval has a value or not.
	BatchSize int

	// BufferSize is the size of the buffer that stores records before they are sent to the Kinesis
	// stream. If the number of records in the buffer is >= BufferSize when Add is called, then
	// Add will either block or return an error, depending on the value of AddBlocksWhenBufferFull.
	BufferSize int

	// FlushInterval controls how often the buffer is flushed to Kinesis. If nonzero, then every
	// time this interval occurs, if there are any records in the buffer, they will be flushed,
	// no matter how few there are. The size of the batch that’s flushed may be as small as 1 but
	// will be no larger than BatchSize.
	FlushInterval time.Duration

	// The logger used by the Producer.
	Logger BatchProducerLogger

	// MaxAttemptsPerRecord defines how many attempts should be made for each record before it is
	// dropped. You probably want this higher than the zero-value default of 0.
	MaxAttemptsPerRecord int

	// StatInterval will be used to make a *best effort* attempt to send stats *approximately*
	// when this interval elapses. There’s no guarantee, however, since the main goroutine is
	// used to send the stats and therefore there may be some skew.
	StatInterval time.Duration

	// StatReceiver will have its Receive method called approximately every StatInterval.
	StatReceiver StatReceiver
}

Config is a collection of config values for a Producer.
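
The two full-buffer behaviours selected by AddBlocksWhenBufferFull can be illustrated with a buffered channel. This is only a sketch of the idea, not the package's implementation: the buffer type, its add method, and errBufferFull are all hypothetical names introduced for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// errBufferFull is a hypothetical error value; the error the package actually
// returns is not shown in the documentation above.
var errBufferFull = errors.New("buffer full")

// buffer is a stand-in for a producer's record buffer.
type buffer struct {
	records chan []byte
	block   bool // plays the role of AddBlocksWhenBufferFull
}

// newBuffer creates a buffer holding up to size records.
func newBuffer(size int, block bool) *buffer {
	return &buffer{records: make(chan []byte, size), block: block}
}

// add either waits for room (block == true) or fails fast (block == false).
func (b *buffer) add(data []byte) error {
	if b.block {
		b.records <- data // waits until a consumer makes room
		return nil
	}
	select {
	case b.records <- data:
		return nil
	default:
		return errBufferFull
	}
}

func main() {
	b := newBuffer(2, false)
	for i := 0; i < 3; i++ {
		if err := b.add([]byte{byte(i)}); err != nil {
			// The caller decides what to do here, e.g. discard the record.
			fmt.Printf("record %d rejected: %v\n", i, err)
		}
	}
}
```

Returning an error rather than blocking lets the calling application shed load when Kinesis is slow or unavailable, at the cost of losing the rejected records unless the caller retries.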

type Producer

type Producer interface {
	// Start starts the main goroutine. No need to call it using `go`.
	Start() error

	// Stop signals the main goroutine to finish. Once this is called, Add will immediately start
	// returning errors (unless and until Start is called again).
	Stop() error

	// Add might block if the Producer has a buffer and the buffer is full.
	// To avoid filling the buffer and eventually blocking indefinitely,
	// Add will fail and return an error if the Producer is stopped or stopping. Note
	// that it’s critical to check the return value because the Producer could have
	// died in the background, for example due to a panic.
	Add(data []byte, partitionKey string) error

	// Flush stops the Producer using Stop and attempts to send all buffered records to Kinesis as
	// fast as possible with batches of size 500 (the maximum). It blocks until either all records
	// are sent or the timeout expires. It returns the number of records sent, the number still
	// remaining in the buffer, and (possibly) an error. (It doesn’t currently return errors but
	// that is in the signature for future-proofing.) A timeout value of 0 means no timeout.
	// If Flush finishes sending all records without timing out, and sendStats is true, it will
	// cause a single final StatsBatch to be sent to the StatReceiver in Config, if set.
	Flush(timeout time.Duration, sendStats bool) (sent int, remaining int, err error)
}

Producer collects records individually and then sends them to Kinesis in batches in the background using PutRecords, with retries. A Producer will do nothing until Start is called.
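
The Start, Add, Flush lifecycle described above might be exercised as in the following sketch. It is written against a local copy of the Producer interface so that it compiles without the package; addAll and fakeProducer are hypothetical names introduced for illustration, and the fake's behaviour is an assumption that only loosely imitates a real Producer.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Producer is copied from the documentation above so that this sketch is
// self-contained.
type Producer interface {
	Start() error
	Stop() error
	Add(data []byte, partitionKey string) error
	Flush(timeout time.Duration, sendStats bool) (sent int, remaining int, err error)
}

// addAll is a hypothetical helper that adds every record, checks each return
// value, and reports how many records were rejected.
func addAll(p Producer, partitionKey string, records [][]byte) (rejected int) {
	for _, r := range records {
		if err := p.Add(r, partitionKey); err != nil {
			rejected++
		}
	}
	return rejected
}

// fakeProducer is an in-memory stand-in used only to exercise addAll.
type fakeProducer struct {
	running bool
	buf     [][]byte
}

func (f *fakeProducer) Start() error { f.running = true; return nil }
func (f *fakeProducer) Stop() error  { f.running = false; return nil }

// Add rejects records unless the fake has been started.
func (f *fakeProducer) Add(data []byte, _ string) error {
	if !f.running {
		return errors.New("not running")
	}
	f.buf = append(f.buf, data)
	return nil
}

// Flush stops the fake and pretends every buffered record was sent.
func (f *fakeProducer) Flush(time.Duration, bool) (int, int, error) {
	f.running = false
	sent := len(f.buf)
	f.buf = nil
	return sent, 0, nil
}

func main() {
	var p Producer = &fakeProducer{}
	if err := p.Start(); err != nil {
		fmt.Println("start failed:", err)
		return
	}
	rejected := addAll(p, "key", [][]byte{[]byte("a"), []byte("b")})
	sent, remaining, err := p.Flush(5*time.Second, false)
	fmt.Println(rejected, sent, remaining, err)
}
```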

func New

func New(
	client BatchingKinesisClient,
	streamName string,
	config Config,
) (Producer, error)

New creates and returns a Producer that will do nothing until its Start method is called. Once started, it flushes a batch to Kinesis whenever FlushInterval elapses (if FlushInterval > 0) or the buffer reaches BatchSize records, whichever happens first.
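
The "size or interval, whichever happens first" rule can be sketched with a select loop over an input channel and a ticker. This is an illustration under stated assumptions, not the package's code: batchLoop and its parameters are hypothetical, retries are omitted, and the sketch flushes any leftover records when its input channel is closed.

```go
package main

import (
	"fmt"
	"time"
)

// batchLoop is a hypothetical loop that collects records from in and calls
// send whenever batchSize is reached or flushInterval elapses.
func batchLoop(in <-chan []byte, batchSize int, flushInterval time.Duration, send func([][]byte)) {
	var tick <-chan time.Time // stays nil, and so never fires, when flushInterval is 0
	if flushInterval > 0 {
		t := time.NewTicker(flushInterval)
		defer t.Stop()
		tick = t.C
	}

	var batch [][]byte
	flush := func() {
		if len(batch) > 0 {
			send(batch)
			batch = nil
		}
	}

	for {
		select {
		case rec, ok := <-in:
			if !ok {
				flush() // input closed: send whatever is left and exit
				return
			}
			batch = append(batch, rec)
			if len(batch) >= batchSize {
				flush()
			}
		case <-tick:
			flush() // interval elapsed: send even a partial batch
		}
	}
}

func main() {
	in := make(chan []byte)
	done := make(chan struct{})
	go func() {
		defer close(done)
		batchLoop(in, 2, time.Hour, func(b [][]byte) {
			fmt.Println("batch of", len(b))
		})
	}()
	for i := 0; i < 5; i++ {
		in <- []byte{byte(i)}
	}
	close(in)
	<-done
}
```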

type StatReceiver

type StatReceiver interface {
	// Receive is called by the main Producer goroutine, so no batches are sent while it runs.
	// Make sure it is either very fast or never blocks at all!
	Receive(StatsBatch)
}

StatReceiver defines an object that can accept stats.
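
One way to keep Receive from blocking is to hand each StatsBatch to a buffered channel with a non-blocking send and let another goroutine do the slow work. The sketch below assumes local copies of the two types so it compiles on its own; chanReceiver is a hypothetical name, and dropping a batch when the consumer falls behind is a design choice of this example, not something the package prescribes.

```go
package main

import "fmt"

// StatsBatch and StatReceiver are copied from this documentation so that the
// sketch is self-contained.
type StatsBatch struct {
	BufferSize                           int
	KinesisErrorsSinceLastStat           int
	RecordsSentSuccessfullySinceLastStat int
	RecordsDroppedSinceLastStat          int
}

type StatReceiver interface {
	Receive(StatsBatch)
}

// chanReceiver is a hypothetical StatReceiver that never blocks its caller.
type chanReceiver struct {
	ch chan StatsBatch
}

// newChanReceiver creates a receiver that can queue up to n batches.
func newChanReceiver(n int) *chanReceiver {
	return &chanReceiver{ch: make(chan StatsBatch, n)}
}

// Receive queues the batch if there is room and drops it otherwise.
func (r *chanReceiver) Receive(sb StatsBatch) {
	select {
	case r.ch <- sb:
	default:
		// The consumer is behind; drop this batch rather than stall the producer.
	}
}

func main() {
	r := newChanReceiver(1)
	var sr StatReceiver = r
	sr.Receive(StatsBatch{BufferSize: 1})
	sr.Receive(StatsBatch{BufferSize: 2}) // dropped: the queue is already full
	fmt.Println((<-r.ch).BufferSize)
}
```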

type StatsBatch

type StatsBatch struct {
	// Moment-in-time stats
	BufferSize int

	// Cumulative stats
	KinesisErrorsSinceLastStat           int
	RecordsSentSuccessfullySinceLastStat int
	RecordsDroppedSinceLastStat          int
}

StatsBatch is a snapshot of recent activity. Some of its fields represent "moment-in-time" values, e.g. BufferSize is the size of the buffer at the moment the StatsBatch is sent. Other fields are cumulative since the last StatsBatch, e.g. KinesisErrorsSinceLastStat.
