batcher

package v1.0.1
Published: Jun 23, 2023 License: Apache-2.0 Imports: 8 Imported by: 0

Documentation

Overview

Package batcher collects values and dispatches them in batches for amortizing processing cost or limiting processing concurrency.

By default, a new non-empty batch is dispatched immediately after a new value is received. Exception: If the previous batch is still being processed, a new batch will be dispatched once the previous batch has been processed. Thus, each batch typically has only one value, unless batch processing is slow relative to the frequency of new values.
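The default policy can be pictured as a small loop that dispatches whatever has accumulated as soon as the processor is free. The following is a self-contained sketch of that idea, not the package's actual implementation; `dispatchAll` and its `busy` flags are hypothetical names used only to simulate in-flight processing:

```go
package main

import "fmt"

// dispatchAll mimics the default policy: each value is dispatched
// immediately unless a batch is already being processed, in which
// case values accumulate and go out together in the next batch.
// busy[i] simulates whether the previous batch is still in flight
// when value i arrives.
func dispatchAll(values []int, busy []bool, process func([]int)) {
	var pending []int
	for i, v := range values {
		pending = append(pending, v)
		if !busy[i] {
			process(pending)
			pending = nil
		}
	}
	if len(pending) > 0 {
		process(pending) // final flush once processing is free again
	}
}

func main() {
	var batches [][]int
	// Values 2 and 3 arrive while the processor is busy, so they
	// share a batch with value 4.
	dispatchAll(
		[]int{1, 2, 3, 4},
		[]bool{false, true, true, false},
		func(b []int) { batches = append(batches, append([]int(nil), b...)) },
	)
	fmt.Println(batches) // [[1] [2 3 4]]
}
```

With fast processing each batch holds one value; slow processing naturally grows the batches.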

Options are available to change this behavior to meet your requirements.

Inputs and Outputs

A Batcher can acquire new values in the following ways, which may be combined:

  • One value at a time, via [Add].
  • From one or more input channels registered via Batcher.AddFrom.

A Batcher can output batches in one of the following ways:

  • By calling a BatchProcessor function (see New).
  • By sending batches to an output channel (see ToChan and Chan).

Defer Thresholds

Defer thresholds are set via [Option]s returned from the WithDefer* family of functions. A new batch is not dispatched until all defer thresholds are met, allowing more values to accumulate in the new batch. Thus, changing a defer threshold can change how often accumulated values are dispatched as a batch, as well as the size of those batches.

Each type of defer threshold has two subtypes: soft and hard. A soft threshold is a wish-to-have threshold that will be violated if necessary to satisfy a dispatch constraint (discussed below). A hard threshold is a must-have threshold that will not be violated even if it means violating a dispatch constraint, with one exception: when the stream of input values is finished, any hard thresholds that cannot possibly be met by pending values (values received but not yet dispatched) are ignored when deciding when to dispatch the final batch.

The following defer thresholds are currently configurable:

  • MaxInFlight: The maximum number of batches that can be "in flight". (The number of in-flight batches is the number of calls to the BatchProcessor that have not yet returned.) Defaults to 1 (hard and soft).
  • MinCount: The minimum number of values in a batch. Defaults to 1 (hard and soft).
  • MinAge: The minimum age of the youngest value in the batch. Defaults to 0s (hard and soft).
  • RateLimit: Defer dispatching the next batch if the number of recently dispatched batches exceeds a limit. Defaults to infinite (hard and soft).

TODO: The following defer thresholds are not yet implemented and thus not configurable, but they should be straightforward to implement if desired:

  • MinWeight: Minimum "weight" of a batch. (A batch's weight would be determined by the application, "number of bytes" for example.) Currently this is effectively 0 (both soft and hard), meaning batch weight is ignored.
  • MaxInFlightCount: Maximum total number of values across all in-flight batches. Currently this is effectively infinite (both soft and hard), meaning there is no limit to the number of in-flight values.
  • MaxInFlightWeight: Maximum total weight of all in-flight batches. Currently this is effectively infinite (both soft and hard), meaning there is no limit to the total weight of in-flight batches.

Dispatch Constraints

Dispatch constraints are set via [Option]s returned from the WithConstrain* family of functions. While defer thresholds control when batches should NOT be dispatched, dispatch constraints are the opposite: they control when batches SHOULD be dispatched. If any dispatch constraint is satisfied, and all hard defer thresholds have been met, a new batch is dispatched even if there are some soft defer thresholds that have not yet been met.

Dispatch constraints are useful for setting upper bounds on batch size and processing latency.
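The interaction between the two mechanisms reduces to a single predicate. This is an illustrative sketch of the rule described above, with made-up names rather than the package's internals:

```go
package main

import "fmt"

// shouldDispatch captures the dispatch rule: a batch goes out only
// when every hard defer threshold is met, and additionally either
// every soft defer threshold is met or at least one dispatch
// constraint (e.g. MaxAge, MaxCount) is satisfied.
func shouldDispatch(hardMet, softMet, constraintSatisfied bool) bool {
	return hardMet && (softMet || constraintSatisfied)
}

func main() {
	// A satisfied constraint preempts unmet soft thresholds...
	fmt.Println(shouldDispatch(true, false, true)) // true
	// ...but never an unmet hard threshold.
	fmt.Println(shouldDispatch(false, true, true)) // false
}
```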

The following dispatch constraints are currently configurable:

  • MaxAge: The maximum age of the oldest value in the batch. Defaults to unlimited.
  • MaxCount: Maximum number of values in a batch. Defaults to unlimited.

TODO: The following dispatch constraints are not yet implemented and thus not configurable, but they should be straightforward to implement if desired:

  • MinRate: Dispatch a new batch if the number of recently dispatched batches drops below a limit. Currently this is effectively 0.
  • MaxWeight: Maximum weight of a batch. (A newly received value would be placed in the next batch if it would cause the current batch to exceed the weight limit.) Currently this is effectively infinite, meaning batch weight is ignored.
  • MinInFlight: Minimum number of in-flight batches. (A new batch is dispatched if the number of in-flight batches drops below this threshold.) Currently this is effectively 0, meaning the number of in-flight batches is ignored.
  • MinInFlightCount: Minimum total number of in-flight values across all in-flight batches. Currently this is effectively 0, meaning the number of in-flight values is ignored.
  • MinInFlightWeight: Minimum total weight of all in-flight batches. Currently this is effectively 0, meaning the weight is ignored.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Chan

func Chan[Q any](ch <-chan Q, opts ...Option) <-chan []Q

Chan is a convenience wrapper around ToChan and Batcher.AddFrom. It returns a new unbuffered channel that receives batches of values accumulated from the given input channel. Each batch includes every value received from the input channel (in the order they were received) since the previous batch was sent.

If the input channel is closed, any previously received values that have not yet been dispatched are batched together and dispatched once all hard defer thresholds that can be met are met. After that last batch is dispatched, the returned channel is closed. The input channel must be closed to free resources and to ensure that all input values are sent to the returned channel.
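Conceptually, Chan with default options behaves like the following self-contained sketch (a model of the semantics, not the package's code; it omits options and uses int values for concreteness):

```go
package main

import "fmt"

// batchChan is a stripped-down model of Chan: it forwards everything
// received from in as batches on the returned unbuffered channel.
// A batch holds every value that arrived since the previous batch was
// sent, and the returned channel is closed after in is closed and
// all pending values have been flushed.
func batchChan(in <-chan int) <-chan []int {
	out := make(chan []int)
	go func() {
		defer close(out)
		var pending []int
		for {
			if len(pending) == 0 {
				v, ok := <-in
				if !ok {
					return
				}
				pending = append(pending, v)
				continue
			}
			select {
			case v, ok := <-in:
				if !ok {
					out <- pending // final flush
					return
				}
				pending = append(pending, v)
			case out <- pending:
				pending = nil
			}
		}
	}()
	return out
}

func main() {
	in := make(chan int)
	go func() {
		for i := 1; i <= 5; i++ {
			in <- i
		}
		close(in)
	}()
	total := 0
	for batch := range batchChan(in) {
		for _, v := range batch {
			total += v
		}
	}
	fmt.Println(total) // 15: every value lands in exactly one batch
}
```

Note how closing the input channel is what triggers the final flush and the close of the output channel, matching the requirement above.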

Types

type BatchProcessor

type BatchProcessor[Q, R any] interface {
	func(context.Context, []Q) ([]R, []error) |
		func(context.Context, []Q) ([]R, error) |
		func(context.Context, []Q) (R, error)
}

A BatchProcessor is a function that takes a batch of values (queries), processes them, and returns their corresponding responses and errors. A Batcher considers the batch to be "in flight" until this function returns. A Batcher may call this function concurrently if the number of in-flight batches is under the Batcher's configured MaxInFlight defer threshold.

For convenience, the return values can take one of these forms:

  • ([]R, []error): The response and error slices have length equal to the number of queries in the batch, and entry i contains the response/error corresponding to query i in the batch.
  • ([]R, error): Same as above, except a single error value applies to all queries.
  • (R, error): The same response and error value applies to all queries.

The response and error values for a query added via [Add] are propagated back to the waiting caller. The response and error values for a query added via an input channel registered with Batcher.AddFrom are discarded.

Canceling the context passed to New or ToChan cancels the context that the Batcher passes to this function.

type Batcher

type Batcher[Q, R any] struct {
	// contains filtered or unexported fields
}

A Batcher collects values in batches, dispatching each batch to a custom BatchProcessor or channel.

func New

func New[Q, R any, F BatchProcessor[Q, R]](ctx context.Context, fn F, opts ...Option) *Batcher[Q, R]

New returns a new Batcher that dispatches batches to the given BatchProcessor function.

Canceling ctx has the following effects:

  • The context passed to the given BatchProcessor is canceled.
  • Any remaining values are batched and processed once all hard defer thresholds that can still be met by the remaining values are met. (Hard defer thresholds that cannot possibly be met, such as a MinCount greater than the number of remaining values, are ignored.)
  • No new values can be enqueued; all future calls to Add immediately return ctx's error.
  • Once the final batch is processed, the Batcher's background goroutine exits and the channel returned from [Done] is closed.

func ToChan

func ToChan[Q any](ctx context.Context, ch chan<- []Q, opts ...Option) *Batcher[Q, struct{}]

ToChan returns a new Batcher that sends each batch to the output channel ch. The output channel is not closed when done. The Batcher will not send any new batches to the channel after the channel returned from [Done] is closed.

The effects of canceling ctx are as described for New.

The MaxInFlight hard defer threshold is forced to 1, and a batch is considered to be in flight if ch is not ready to receive a batch.

func (*Batcher[Q, R]) Add

func (b *Batcher[Q, R]) Add(ctx context.Context, q Q) (R, error)

Add enqueues a value for a future batch and blocks until the batch is processed or the provided context is canceled.

If either ctx or the context passed to New or ToChan is canceled before the Batcher's background goroutine accepts the value, a zero value and the canceled context's error are returned.

If ctx is canceled before processing completes, a zero value and ctx's error are returned. (Canceling ctx does not abort processing; if that is needed, cancel the context passed to New or ToChan and ensure that the BatchProcessor function observes the state of its context argument.)

Otherwise, the returned value and error are taken from the BatchProcessor's return values.

func (*Batcher[Q, R]) AddFrom

func (b *Batcher[Q, R]) AddFrom(ch <-chan Q) <-chan struct{}

AddFrom registers ch with the Batcher. Values received from ch are added to batches in addition to any values passed to [Add] and any values received from another channel registered by a separate call to AddFrom. Values are received from ch until it is closed or the context passed to New or ToChan is canceled, at which point the returned channel is closed. The response and error return values that result from processing the values received from ch are not available.

func (*Batcher[Q, R]) Done

func (b *Batcher[Q, R]) Done() <-chan struct{}

Done returns a channel that is closed when the Batcher has flushed the remaining input values after the context passed to New or ToChan is canceled.

type Option

type Option func(*batcherConfig)

Option customizes the batching behavior.

func WithClock

func WithClock(c clockwork.Clock) Option

WithClock returns an Option that changes the clock implementation that is used to reckon ages and time intervals. Its primary purpose is to facilitate testing. If this option is not used, the system's real clock is used.

func WithConstrainMaxAge

func WithConstrainMaxAge(d time.Duration) Option

WithConstrainMaxAge returns an Option that sets the MaxAge dispatch constraint to d. A new batch is dispatched immediately if the age of the oldest value in the batch exceeds d and all hard defer thresholds have been met. If this option is not used, there is no maximum age.

func WithConstrainMaxCount

func WithConstrainMaxCount(n int) Option

WithConstrainMaxCount returns an Option that sets the MaxCount dispatch constraint to n. A new batch is dispatched immediately if the number of values in the batch is n and all hard defer thresholds have been met. If this option is not used, there is no limit.

func WithDeferMaxInFlightHard

func WithDeferMaxInFlightHard(n int) Option

WithDeferMaxInFlightHard returns an Option that sets the hard MaxInFlight defer threshold to n, which must be positive. This is the same as WithDeferMaxInFlightSoft, except it cannot be preempted by a dispatch constraint. If this option is not used, the hard threshold is 1.

func WithDeferMaxInFlightSoft

func WithDeferMaxInFlightSoft(n int) Option

WithDeferMaxInFlightSoft returns an Option that sets the soft MaxInFlight defer threshold to n, which must be positive. Dispatch of a new batch is postponed until the number of in-flight batches (the number of calls to the BatchProcessor function that have not yet returned) has dropped below this number, unless preempted by a dispatch constraint. If this option is not used, the soft threshold is 1.

func WithDeferMinAgeHard

func WithDeferMinAgeHard(d time.Duration) Option

WithDeferMinAgeHard returns an Option that sets the hard MinAge defer threshold to d. This is the same as WithDeferMinAgeSoft, except it cannot be preempted by a dispatch constraint. If unsure, you are encouraged to use WithDeferMinAgeSoft instead of this. If this option is not used, the hard threshold is 0.

func WithDeferMinAgeSoft

func WithDeferMinAgeSoft(d time.Duration) Option

WithDeferMinAgeSoft returns an Option that sets the soft MinAge defer threshold to d. Dispatch of a new batch is postponed until after the age of the youngest value in the batch reaches d, unless preempted by a dispatch constraint. If this option is not used, the soft threshold is 0.

func WithDeferMinCountHard

func WithDeferMinCountHard(n int) Option

WithDeferMinCountHard returns an Option that sets the hard MinCount defer threshold to n. This is the same as WithDeferMinCountSoft, except it cannot be preempted by a dispatch constraint. If this option is not used, the hard threshold is 1.

func WithDeferMinCountSoft

func WithDeferMinCountSoft(n int) Option

WithDeferMinCountSoft returns an Option that sets the soft MinCount defer threshold to n. Dispatch of a new batch is postponed until the number of values in the batch has reached n, unless preempted by a dispatch constraint. If this option is not used, the soft threshold is 1.

func WithDeferRateLimitHard

func WithDeferRateLimitHard(lim *rate.Limiter) Option

WithDeferRateLimitHard returns an Option that sets the hard RateLimit defer threshold to lim. This is the same as WithDeferRateLimitSoft, except it cannot be preempted by a dispatch constraint. If unsure, you are encouraged to use WithDeferRateLimitSoft instead of this. If this option is not used, or if lim is nil, the hard rate is unlimited.

func WithDeferRateLimitSoft

func WithDeferRateLimitSoft(lim *rate.Limiter) Option

WithDeferRateLimitSoft returns an Option that sets the soft RateLimit defer threshold to lim. Dispatch of a new batch is postponed until a token is available in lim, unless preempted by a dispatch constraint. If this option is not used, the soft rate is unlimited.

Note: If this threshold is preempted by a dispatch constraint, the next batch is delayed longer than usual to keep the overall rate under the limit. If it is preempted consistently, the limiter can build up a significant backlog, causing a long delay before the next batch is dispatched after the preemption ceases.

Directories

Path Synopsis
internal
  sel    Package sel makes it easier to work with dynamic select cases.
