luci: go.chromium.org/luci/common/sync/dispatcher/buffer

package buffer

import "go.chromium.org/luci/common/sync/dispatcher/buffer"

Package buffer implements a batching buffer with batch lease and retry management.

It is meant to be used by something which handles synchronization (primarily the "common/sync/dispatcher.Channel"). As such, it IS NOT GOROUTINE-SAFE. If you use this outside of dispatcher.Channel, you must synchronize all access to the methods of Buffer.
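One way to satisfy the synchronization requirement outside of dispatcher.Channel is to route every call through a single mutex. The sketch below is illustrative only: itemBuffer and lockedBuffer are hypothetical stand-ins, not types from this package, and a real wrapper would forward to the Buffer methods instead.

```go
package main

import (
	"fmt"
	"sync"
)

// itemBuffer is a hypothetical stand-in for a non-goroutine-safe buffer.
// It is NOT the real buffer.Buffer type.
type itemBuffer struct {
	items []interface{}
}

func (b *itemBuffer) add(item interface{}) { b.items = append(b.items, item) }

func (b *itemBuffer) len() int { return len(b.items) }

// lockedBuffer serializes every call to the wrapped buffer with one mutex.
type lockedBuffer struct {
	mu  sync.Mutex
	buf itemBuffer
}

// Add appends an item while holding the lock.
func (l *lockedBuffer) Add(item interface{}) {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.buf.add(item)
}

// Len reads the item count while holding the lock.
func (l *lockedBuffer) Len() int {
	l.mu.Lock()
	defer l.mu.Unlock()
	return l.buf.len()
}

// addConcurrently adds n items from n goroutines and returns the final count.
func addConcurrently(n int) int {
	var lb lockedBuffer
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(v int) {
			defer wg.Done()
			lb.Add(v)
		}(i)
	}
	wg.Wait()
	return lb.Len()
}

func main() {
	fmt.Println(addConcurrently(100))
}
```

The same lock would also have to cover ACK and NACK calls, since those mutate the Buffer as well.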

Index

Package Files

batch.go buffer.go full_behavior.go heap.go moving_average.go options.go

Variables

var Defaults = Options{
    MaxLeases:     4,
    BatchSize:     20,
    BatchDuration: 10 * time.Second,
    FullBehavior: &BlockNewItems{
        MaxItems: 1000,
    },
    Retry: func() retry.Iterator {
        return &retry.ExponentialBackoff{
            Limited: retry.Limited{
                Delay:   200 * time.Millisecond,
                Retries: -1,
            },
            Multiplier: 1.2,
            MaxDelay:   60 * time.Second,
        }
    },
}

Defaults defines the defaults for Options when it contains 0-valued fields.

DO NOT ASSIGN/WRITE TO THIS STRUCT.
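To get a feel for the default Retry settings, the sketch below computes a retry schedule from the three numbers in Defaults (200ms initial delay, 1.2 multiplier, 60s cap). It assumes that each retry multiplies the previous delay by the multiplier and clamps the result to the cap; backoffDelays is a hypothetical helper and does not call the retry package.

```go
package main

import (
	"fmt"
	"time"
)

// backoffDelays returns the first n delays of an exponential backoff that
// starts at initial, grows by multiplier per retry, and never exceeds maxDelay.
// This models the assumed behavior; it is not the retry package's code.
func backoffDelays(initial time.Duration, multiplier float64, maxDelay time.Duration, n int) []time.Duration {
	delays := make([]time.Duration, 0, n)
	d := initial
	for i := 0; i < n; i++ {
		if d > maxDelay {
			d = maxDelay
		}
		delays = append(delays, d)
		d = time.Duration(float64(d) * multiplier)
	}
	return delays
}

func main() {
	for i, d := range backoffDelays(200*time.Millisecond, 1.2, 60*time.Second, 5) {
		fmt.Printf("retry %d: %v\n", i+1, d)
	}
}
```

Under these assumptions the delay grows slowly and takes a few dozen retries to reach the 60 second cap. Because Retries is -1 in Defaults, batches are retried indefinitely.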

type Batch

type Batch struct {
    // Data is the individual work items pushed into the Buffer.
    Data []interface{}

    // Meta is an object which dispatcher.Channel will treat as totally opaque;
    // you may manipulate it in SendFn or ErrorFn as you see fit. This can be used
    // for e.g. associating a nonce with the Batch for retries, or stashing
    // a constructed RPC proto, etc.
    Meta interface{}
    // contains filtered or unexported fields
}

Batch represents a collection of individual work items and associated metadata.

Batches are cut by the Channel according to Options.Buffer, and can be manipulated by ErrorFn and SendFn.

ErrorFn and SendFn may manipulate the contents of the Batch (Data and Meta) to do things such as:

* Associate a UID with the Batch (e.g. in the Meta field) to identify it to
  remote services for deduplication.
* Remove already-processed items from Data in case the SendFn partially
  succeeded.

The dispatcher accounts for the number of work items in the Batch as it leases the Batch out; initially the Batch's length will be len(Data). If the SendFn reduces the length of Data before the NACK, the accounted number of work items will be accordingly reduced. The accounted length can never grow (e.g. extending Data doesn't do anything).
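The shrink-only accounting rule can be modeled in a few lines. The sketch below is an assumption-laden illustration: leasedBatch, lease and reaccount are hypothetical names, and the real bookkeeping lives in unexported fields of Batch and Buffer.

```go
package main

import "fmt"

// leasedBatch is a hypothetical model of a leased Batch: data stands in for
// Batch.Data and accounted for the size recorded when the lease was taken.
type leasedBatch struct {
	data      []interface{}
	accounted int
}

// lease records the initial accounted size as len(data).
func lease(data []interface{}) *leasedBatch {
	return &leasedBatch{data: data, accounted: len(data)}
}

// reaccount models what happens at NACK time: the accounted size may shrink
// to len(data) but never grows. It returns how many items were released.
func (b *leasedBatch) reaccount() int {
	released := 0
	if n := len(b.data); n < b.accounted {
		released = b.accounted - n
		b.accounted = n
	}
	return released
}

func main() {
	b := lease([]interface{}{"a", "b", "c", "d"})

	// The send partially succeeded, so drop the two processed items.
	b.data = b.data[2:]
	fmt.Println(b.reaccount(), b.accounted)

	// Extending Data afterwards does not change the accounted size.
	b.data = append(b.data, "e", "f", "g")
	fmt.Println(b.reaccount(), b.accounted)
}
```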

type BlockNewItems

type BlockNewItems struct {
    // The maximum number of items that this Buffer is allowed to house (including
    // both leased and unleased items).
    //
    // Default: max(1000, BatchSize)
    // Required: Must be >= BatchSize
    MaxItems int
}

BlockNewItems prevents the Buffer from accepting any new items as long as it has MaxItems worth of items.

This will never drop batches, but will block inserts.
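A minimal model of this policy, written from the description above and not copied from the package source, might look like the following. The stats and blockNewItems types are hypothetical, and the exact comparison used by the real ComputeState is an assumption.

```go
package main

import "fmt"

// stats is a hypothetical, trimmed-down stand-in for Stats.
type stats struct {
	unleased, leased int
}

// blockNewItems mirrors the documented policy: refuse inserts once the buffer
// holds maxItems (leased plus unleased), and never ask for a batch drop.
type blockNewItems struct {
	maxItems int
}

// computeState returns okToInsert=false at or above the limit; dropBatch is
// always false for this policy.
func (b blockNewItems) computeState(s stats) (okToInsert, dropBatch bool) {
	return s.unleased+s.leased < b.maxItems, false
}

func main() {
	p := blockNewItems{maxItems: 1000}
	fmt.Println(p.computeState(stats{unleased: 900, leased: 80}))
	fmt.Println(p.computeState(stats{unleased: 920, leased: 80}))
}
```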

func (*BlockNewItems) Check

func (b *BlockNewItems) Check(opts Options) (err error)

Check implements FullBehavior.Check.

func (*BlockNewItems) ComputeState

func (b *BlockNewItems) ComputeState(stats Stats) (okToInsert, dropBatch bool)

ComputeState implements FullBehavior.ComputeState.

type Buffer

type Buffer struct {
    // contains filtered or unexported fields
}

Buffer batches individual data items into Batch objects.

All access to the Buffer (as well as invoking ACK/NACK on leased Batches) must be synchronized because Buffer is not goroutine-safe.

func NewBuffer

func NewBuffer(o *Options) (*Buffer, error)

NewBuffer returns a new Buffer configured with the given Options.

If there's an issue with the provided Options, this returns an error.

func (*Buffer) ACK

func (buf *Buffer) ACK(leased *Batch)

ACK records that all the items in the batch have been processed.

The Batch is no longer tracked in any form by the Buffer.

Calling ACK/NACK on the same Batch twice will panic. Calling ACK/NACK on a Batch not returned from LeaseOne will panic.

func (*Buffer) AddNoBlock

func (buf *Buffer) AddNoBlock(now time.Time, item interface{}) (dropped *Batch)

AddNoBlock adds the item to the Buffer.

Possibly drops a batch according to FullBehavior. If FullBehavior.ComputeState returns okToInsert=false, AddNoBlock panics.

func (*Buffer) CanAddItem

func (buf *Buffer) CanAddItem() bool

CanAddItem returns true iff the Buffer will accept an item from AddNoBlock without panicking.
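The intended calling pattern is to check CanAddItem before every AddNoBlock. The sketch below shows that guard on a hypothetical boundedBuffer; the type, its methods and tryAdd are illustrative stand-ins and do not reproduce the real Buffer's signatures (which also take a time.Time and may return a dropped Batch).

```go
package main

import "fmt"

// boundedBuffer is a hypothetical buffer with a blocking-style capacity limit.
type boundedBuffer struct {
	items    []interface{}
	maxItems int
}

// canAddItem reports whether addNoBlock would succeed without panicking.
func (b *boundedBuffer) canAddItem() bool { return len(b.items) < b.maxItems }

// addNoBlock panics when the buffer is full, mirroring the documented contract.
func (b *boundedBuffer) addNoBlock(item interface{}) {
	if !b.canAddItem() {
		panic("addNoBlock called on a full buffer")
	}
	b.items = append(b.items, item)
}

// tryAdd checks canAddItem first and reports whether the item was accepted.
func tryAdd(b *boundedBuffer, item interface{}) bool {
	if !b.canAddItem() {
		return false
	}
	b.addNoBlock(item)
	return true
}

func main() {
	b := &boundedBuffer{maxItems: 2}
	for _, item := range []string{"a", "b", "c"} {
		fmt.Println(item, tryAdd(b, item))
	}
}
```

A caller that gets false back would typically wait for an ACK or NACK to free space before trying again.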

func (*Buffer) Flush

func (buf *Buffer) Flush(now time.Time)

Flush causes any buffered-but-not-batched data to be immediately cut into a Batch.

No-op if there's no such data.

func (*Buffer) LeaseOne

func (buf *Buffer) LeaseOne(now time.Time) (leased *Batch)

LeaseOne returns the most-available-to-send Batch from this Buffer.

The caller must invoke one of ACK/NACK on the Batch. The Batch will count against this Buffer's Stats().Total() until the caller does so.

Returns nil if no batch is available to lease, or if the Buffer has reached MaxLeases.

func (*Buffer) NACK

func (buf *Buffer) NACK(ctx context.Context, err error, leased *Batch)

NACK analyzes the current state of Batch.Data, potentially reducing UnleasedItemCount in the Buffer if the given Batch.Data length is smaller than when the Batch was originally leased.

The Batch will be re-enqueued unless:

* The Batch's retry Iterator returns retry.Stop
* The Batch has been dropped already due to FullBehavior policy. If this
  is the case, AddNoBlock would already have returned this *Batch pointer
  to you.

Calling ACK/NACK on the same Batch twice will panic. Calling ACK/NACK on a Batch not returned from LeaseOne will panic.
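The lease lifecycle described for LeaseOne, ACK and NACK can be summarized with a small model. Everything below is a hypothetical sketch: leaseTracker, batch and resolve are illustrative names, retry timing is ignored, and the real Buffer orders batches by send time and not by a simple queue.

```go
package main

import "fmt"

// batch is a hypothetical stand-in for a Batch.
type batch struct{ id int }

// leaseTracker models lease bookkeeping: a pending queue, a set of live
// leases, and a cap on how many leases may be outstanding at once.
type leaseTracker struct {
	maxLeases int
	pending   []*batch
	live      map[*batch]bool
}

func newLeaseTracker(maxLeases int, pending ...*batch) *leaseTracker {
	return &leaseTracker{maxLeases: maxLeases, pending: pending, live: map[*batch]bool{}}
}

// leaseOne returns nil when nothing is pending or the lease cap is reached.
func (t *leaseTracker) leaseOne() *batch {
	if len(t.live) >= t.maxLeases || len(t.pending) == 0 {
		return nil
	}
	b := t.pending[0]
	t.pending = t.pending[1:]
	t.live[b] = true
	return b
}

// resolve ends a lease. requeue=false models ACK; requeue=true models a NACK
// whose batch is retried. Resolving a batch that is not leased panics.
func (t *leaseTracker) resolve(b *batch, requeue bool) {
	if !t.live[b] {
		panic("batch is not currently leased")
	}
	delete(t.live, b)
	if requeue {
		t.pending = append(t.pending, b)
	}
}

func main() {
	first, second := &batch{id: 1}, &batch{id: 2}
	lt := newLeaseTracker(1, first, second)

	leased := lt.leaseOne()
	fmt.Println(leased.id, lt.leaseOne() == nil)

	lt.resolve(leased, true) // NACK: the batch goes back into the queue
	fmt.Println(lt.leaseOne().id)
}
```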

func (*Buffer) NextSendTime

func (buf *Buffer) NextSendTime() time.Time

NextSendTime returns the send time for the next-most-available-to-send Batch, or a zero time.Time if no batches are available to send.

NOTE: Because LeaseOne enforces MaxLeases, this time may not guarantee an available lease.

func (*Buffer) Stats

func (buf *Buffer) Stats() Stats

Stats returns information about the Buffer's state.

type DropOldestBatch

type DropOldestBatch struct {
    // The maximum number of leased and unleased items that the Buffer may have
    // before dropping data.
    //
    // Once a batch is dropped, it no longer counts against MaxLiveItems, but it
    // may still be in memory if the dropped batch was currently leased.
    //
    // NOTE: The maximum Stats.Total number of items the Buffer could have at
    // a given time is:
    //
    //     MaxLiveItems + (BatchSize * MaxLeases)
    //
    // Default: max(1000, BatchSize)
    // Required: Must be >= BatchSize
    MaxLiveItems int
}

DropOldestBatch will drop buffered data whenever the number of unleased items plus leased items would grow beyond MaxLiveItems.

This will never block inserts, but will drop batches.
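The following sketch models this policy and the worst-case Stats.Total bound quoted in the MaxLiveItems comment. It is written from the description above, not from the package source: dropOldestBatch and worstCaseTotal are hypothetical names, and the exact threshold comparison is an assumption.

```go
package main

import "fmt"

// dropOldestBatch is a hypothetical model of the documented policy.
type dropOldestBatch struct {
	maxLiveItems int
}

// computeState takes the live item count (unleased plus leased). Inserts are
// always allowed; a drop is requested once one more item would exceed the cap.
func (d dropOldestBatch) computeState(liveItems int) (okToInsert, dropBatch bool) {
	return true, liveItems >= d.maxLiveItems
}

// worstCaseTotal applies the documented bound:
// MaxLiveItems + (BatchSize * MaxLeases).
func worstCaseTotal(maxLiveItems, batchSize, maxLeases int) int {
	return maxLiveItems + batchSize*maxLeases
}

func main() {
	d := dropOldestBatch{maxLiveItems: 1000}
	fmt.Println(d.computeState(999))
	fmt.Println(d.computeState(1000))

	// With MaxLiveItems 1000 and the BatchSize and MaxLeases from Defaults.
	fmt.Println(worstCaseTotal(1000, 20, 4))
}
```

The extra BatchSize * MaxLeases term covers batches that were dropped while leased: they stop counting against MaxLiveItems but stay in memory until the lease is resolved.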

func (*DropOldestBatch) Check

func (d *DropOldestBatch) Check(opts Options) (err error)

Check implements FullBehavior.Check.

func (*DropOldestBatch) ComputeState

func (d *DropOldestBatch) ComputeState(stats Stats) (okToInsert, dropBatch bool)

ComputeState implements FullBehavior.ComputeState.

type FullBehavior

type FullBehavior interface {
    // ComputeState evaluates the state of the Buffer (via Stats) and returns:
    //
    //   * okToInsert - User can add item without blocking.
    //   * dropBatch - Buffer needs to drop the oldest batch if the user does
    //     insert data.
    ComputeState(Stats) (okToInsert, dropBatch bool)

    // Check inspects Options to see if it's compatible with this FullBehavior
    // implementation.
    //
    // Called exactly once during Buffer creation.
    Check(Options) error
}

FullBehavior allows you to customize the Buffer's behavior when it gets too full.

Generally you'll pick one of DropOldestBatch or BlockNewItems.

type InfiniteGrowth

type InfiniteGrowth struct{}

InfiniteGrowth will not drop data or block new items. It just grows until your computer runs out of memory.

This will never block inserts, and will not drop batches.

func (InfiniteGrowth) Check

func (i InfiniteGrowth) Check(opts Options) (err error)

Check implements FullBehavior.Check.

func (InfiniteGrowth) ComputeState

func (i InfiniteGrowth) ComputeState(stats Stats) (okToInsert, dropBatch bool)

ComputeState implements FullBehavior.ComputeState.

type Options

type Options struct {
    // [OPTIONAL] The maximum number of outstanding leases permitted.
    //
    // Attempting additional leases (with LeaseOne) while at the maximum will
    // return nil.
    //
    // Requirement: Must be > 0
    MaxLeases int

    // [OPTIONAL] The maximum number of items to allow in a Batch before making it
    // available to lease.
    //
    // Special value -1: unlimited
    // Requirement: Must be == -1 (i.e. cut batches based on BatchDuration), or > 0
    BatchSize int

    // [OPTIONAL] The maximum amount of time to wait before queuing a Batch for
    // transmission. Note that batches are only cut by time when a worker is ready
    // to process them (i.e. LeaseOne is invoked).
    //
    // Requirement: Must be > 0
    BatchDuration time.Duration

    // [OPTIONAL] Sets the policy for the Buffer around how many items the Buffer
    // is allowed to hold, and what happens when that number is reached.
    FullBehavior FullBehavior

    // [OPTIONAL] If true, ensures that the next available batch is always the one
    // with the oldest data.
    //
    // If this is false (the default), batches will be leased in the order that
    // they're available to send; if a Batch has a retry with a high delay, it's
    // possible that the next leased Batch actually contains newer data than
    // a later batch.
    //
    // NOTE: if this is combined with high Retry values, it can lead to a
    // head-of-line blocking situation.
    //
    // Required: May only be true if MaxLeases == 1
    FIFO bool

    // [OPTIONAL] Each batch will have a retry.Iterator assigned to it from this
    // retry.Factory.
    //
    // When a Batch is NACK'd, it will be retried at "now" plus the Duration
    // returned by the retry.Iterator.
    //
    // If the retry.Iterator returns retry.Stop, the Batch will be silently
    // dropped.
    Retry retry.Factory
}

Options configures policy for the Buffer.

See Defaults for default values.
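The requirements listed in the field comments can be collected into one validation routine. The sketch below is a hypothetical illustration of those rules only: options and check are stand-in names, and it assumes zero-valued fields have already been replaced with the values from Defaults before the check runs.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// options is a hypothetical, trimmed-down copy of the documented fields.
type options struct {
	maxLeases     int
	batchSize     int
	batchDuration time.Duration
	fifo          bool
}

// check applies the requirements stated in the field comments. It assumes
// zero-valued fields were already replaced with defaults.
func (o options) check() error {
	switch {
	case o.maxLeases <= 0:
		return errors.New("MaxLeases must be > 0")
	case o.batchSize != -1 && o.batchSize <= 0:
		return errors.New("BatchSize must be -1 or > 0")
	case o.batchDuration <= 0:
		return errors.New("BatchDuration must be > 0")
	case o.fifo && o.maxLeases != 1:
		return errors.New("FIFO requires MaxLeases == 1")
	}
	return nil
}

func main() {
	ok := options{maxLeases: 1, batchSize: -1, batchDuration: 10 * time.Second, fifo: true}
	bad := options{maxLeases: 4, batchSize: 20, batchDuration: 10 * time.Second, fifo: true}
	fmt.Println(ok.check())
	fmt.Println(bad.check())
}
```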

type Stats

type Stats struct {
    // UnleasedItemCount is the total number of items (i.e. objects passed to
    // AddNoBlock) which are currently owned by the Buffer but are not currently
    // leased. This includes:
    //    * Items buffered, but not yet cut into a Batch.
    //    * Items in unleased Batches.
    UnleasedItemCount int

    // LeasedItemCount is the total number of items (i.e. objects passed to
    // AddNoBlock) which are currently owned by the Buffer and are in active
    // leases.
    LeasedItemCount int

    // DroppedLeasedItemCount is the total number of items (i.e. objects passed to
    // AddNoBlock) which were part of leases, but where those leases have been
    // dropped (due to FullBehavior policy).
    DroppedLeasedItemCount int
}

Stats is a block of information about the Buffer's present state.

func (Stats) Empty

func (s Stats) Empty() bool

Empty returns true iff the Buffer is totally empty (has zero user-provided items).

func (Stats) Total

func (s Stats) Total() int

Total returns the total number of items currently referenced by the Buffer.
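As a rough illustration of how the three counters relate to Total and Empty, the sketch below assumes that Total is the sum of all three counts (which is consistent with the worst-case bound given for DropOldestBatch) and that Empty means Total is zero. The stats type here is a hypothetical copy, not the package's own.

```go
package main

import "fmt"

// stats is a hypothetical copy of the three documented counters.
type stats struct {
	unleasedItemCount      int
	leasedItemCount        int
	droppedLeasedItemCount int
}

// total sums every item the buffer still references. That dropped-but-leased
// items are included is an assumption of this sketch.
func (s stats) total() int {
	return s.unleasedItemCount + s.leasedItemCount + s.droppedLeasedItemCount
}

// empty reports whether no user-provided items remain.
func (s stats) empty() bool { return s.total() == 0 }

func main() {
	s := stats{unleasedItemCount: 30, leasedItemCount: 20, droppedLeasedItemCount: 5}
	fmt.Println(s.total(), s.empty())
	fmt.Println(stats{}.total(), stats{}.empty())
}
```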

Package buffer imports 7 packages and is imported by 4 packages. Updated 2019-08-25.