luci: Index | Files | Directories

package dispatcher

import ""

Package dispatcher implements a super-charged version of a buffered channel connected to a (potentially) parallelized work dispatcher.

This can be used when you have a mismatch between the rate of production of items and the rate of consumption of those items. For example:

* if you have a producer which constantly produces new world states,
  and you want to sink the latest one into a slow external RPC (but still
  do retries if no new state appears).
* if you have bursty user data which you'd like to batch according to some
  maximum batch size, but you don't want the data to get too stale in case
  you don't hit that batch limit.
* your external RPC can absorb almost infinite data, and the order of
  delivery doesn't matter, but you don't want to block the data producer.
* etc.

The dispatcher can be configured to:

* Buffer a certain amount of work (with possible backpressure to the
* Batch pending work into chunks for the send function.
* Drop stale work which is no longer important to send.
* Enforce a maximum QPS on the send function (even with parallel senders).
* Retry batches independently with configurable per-batch retry policy.


Package Files

channel.go coordinator.go doc.go options.go options_validate.go

func DropFnQuiet Uses

func DropFnQuiet(*buffer.Batch, bool)

DropFnQuiet is an implementation of Options.DropFn which drops batches without logging anything.

func DropFnSummarized Uses

func DropFnSummarized(ctx context.Context, lim *rate.Limiter) func(*buffer.Batch, bool)

DropFnSummarized returns an implementation of Options.DropFn which counts the number of dropped batches, and only reports it at the rate provided.

Unlike the default log function, this only logs the number of dropped items and the duration that they were collected over.

func ErrorFnQuiet Uses

func ErrorFnQuiet(b *buffer.Batch, err error) (retry bool)

ErrorFnQuiet is an implementation of Options.ErrorFn which doesn't log the batch, but does check for `transient.Tag` to determine `retry`.

type Channel Uses

type Channel struct {
    // C is an unbuffered channel which you can push single work items into.
    // Close this to shut down the Channel.
    C   chan<- interface{}

    // DrainC will unblock when this Channel is closed/canceled and fully drained.
    DrainC <-chan struct{}

Channel holds a chan which you can push individual work items to.

func NewChannel Uses

func NewChannel(ctx context.Context, opts *Options, send SendFn) (Channel, error)

NewChannel produces a new Channel ready to listen and send work items.


* `ctx` will be used for cancellation and logging. When the `ctx` is
  canceled, the Channel will:
    * drop all incoming data on Channel.C; All new data will be dropped
      (calling DropFn).
    * drop all existing unleased batches (calling DropFn)
    * ignore all errors from SendFn (i.e. even if ErrorFn returns
      'retry=true', the batch will be dropped anyway)
    If you want to gracefully drain the Channel, you must close the channel
    and wait for DrainC before canceling the context.
* `send` is required, and defines the function to use to process Batches
  of data. This function MUST respect `ctx.Done`, or the Channel cannot
  drain properly.
* `opts` is optional (see Options for the defaults).

The Channel MUST be Close()'d when you're done with it, or the Channel will not terminate. This applies even if you cancel it via ctx. The caller is responsible for this (as opposed to having Channel implement this internally) because there is no generally-safe way in Go to close a channel without coordinating that event with all senders on that channel. Because the caller of NewChannel is effectively the sender (owner) of Channel.C, they must coordinate closure of this channel with all their use of sends to this channel.

func (Channel) Close Uses

func (c Channel) Close()

Close is a convenience function which closes C (and swallows panic if already closed).

func (Channel) CloseAndDrain Uses

func (c Channel) CloseAndDrain(ctx context.Context)

CloseAndDrain is a convenience function which closes C (and swallows panic if already closed) and then blocks on DrainC/ctx.Done().

func (Channel) IsDrained Uses

func (c Channel) IsDrained() bool

IsDrained returns true iff the Channel is closed and drained.

type ErrorFn Uses

type ErrorFn func(failedBatch *buffer.Batch, err error) (retry bool)

ErrorFn is called to handle the error from SendFn.

It executes in the main handler loop of the dispatcher so it can make synchronous decisions about the dispatcher state.

Blocking in this function will block ALL dispatcher actions, so be quick :).

DO NOT WRITE TO THE CHANNEL DIRECTLY FROM THIS FUNCTION. Doing so will very likely cause deadlocks.

This may:

* inspect/log the error
* manipulate the contents of failedBatch
* return a boolean of whether this Batch should be retried or not. If
  this is false then the Batch is dropped. If it's true, then it will be
  re-queued as-is for transmission according to BufferFullBehavior.
* pass the Batch.Data to another goroutine (in a non-blocking way!) to be
  re-queued through Channel.WriteChan.


* failedBatch - The Batch for which SendFn produced a non-nil error.
* err - The error SendFn produced.

Returns true iff the dispatcher should re-try sending this Batch, according to Buffer.Retry.

func ErrorFnReport Uses

func ErrorFnReport(bufferSize int, inner ErrorFn) (ErrorFn, <-chan error)

ErrorFnReport is an implementation of Options.ErrorFn which sends all errors to a buffered channel. The channel MUST be drained as quickly as possible. Otherwise, it may block all dispatcher actions.

If `inner` error function is provided, it is used to determine `retry`. Otherwise, `retry` is always false.

type Options Uses

type Options struct {
    // [OPTIONAL] The ErrorFn to use (see ErrorFn docs for details).
    // Default: Logs the error (at Info for retryable errors, and Error for
    // non-retryable errors) and returns true on a transient error.
    ErrorFn ErrorFn

    // [OPTIONAL] Called with the dropped batch any time the Channel drops a batch.
    // This includes:
    //   * When FullBehavior==DropOldestBatch and we get new data.
    //   * When FullBehavior==DropOldestBatch and we attempt to retry old data.
    //   * When ErrorFn returns false for a batch.
    // It executes in the main handler loop of the dispatcher so it can make
    // synchronous decisions about the dispatcher state.
    // Blocking in this function will block ALL dispatcher actions, so be quick
    // :).
    // likely cause deadlocks.
    // When the channel is fully drained, this will be invoked exactly once with
    // `(nil, true)`. This will occur immediately before the DrainedFn is called.
    // Some drop functions buffer their information, and this gives them an
    // opportunity to flush out any buffered data.
    // Default: logs (at Info level if FullBehavior==DropOldestBatch, or Warning
    // level otherwise) the number of data items in the Batch being dropped.
    DropFn func(b *buffer.Batch, flush bool)

    // [OPTIONAL] Called exactly once when the associated Channel is closed and
    // has fully drained its buffer, but before DrainC is closed.
    // Note that this takes effect whether the Channel is shut down via Context
    // cancellation or explicitly by closing Channel.C.
    // This is useful for performing final state synchronization tasks/metrics
    // finalization/helpful "everything is done!" messages/etc. without having to
    // poll the Channel to see if it's done and also maintain external
    // synchronization around the finalization action.
    // Called in the main handler loop, but it's called after all other work is
    // done by the Channel, so the only thing it blocks is the closure of DrainC.
    // Default: No action.
    DrainedFn func()

    // [OPTIONAL] A rate limiter for how frequently this will invoke SendFn.
    // Default: No limit.
    QPSLimit *rate.Limiter

    Buffer buffer.Options
    // contains filtered or unexported fields

Options is the configuration options for NewChannel.

type SendFn Uses

type SendFn func(data *buffer.Batch) error

SendFn is the function which does the work to actually transmit the Batch to the next stage of your processing pipeline (e.g. do an RPC to a remote service).

The function may manipulate the Batch however it wants (see Batch).

In particular, shrinking the size of Batch.Data for confirmed-sent items will allow the dispatcher to reduce its buffer count when SendFn returns, even if SendFn returns an error. Removing items from the Batch will not cause the remaining items to be coalesced into a different Batch.

The SendFn MUST be bound to this Channel's Context; if the Channel's Context is Cancel'd, SendFn MUST terminate, or the Channel's DrainC will be blocked. We don't pass it as part of SendFn's signature in case SendFn needs to be bound to a derived Context.

Non-nil errors returned by this function will be handled by ErrorFn.


bufferPackage buffer implements a batching buffer with batch lease and retry management.

Package dispatcher imports 8 packages (graph) and is imported by 6 packages. Updated 2020-11-26. Refresh now. Tools for package owners.