buffer

package
v0.0.0-...-4a11b79 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 2, 2020 License: Apache-2.0 Imports: 7 Imported by: 0

Documentation

Overview

Package buffer implements a batching buffer with batch lease and retry management.

It is meant to be used by something which handles synchronization (primarially the "common/sync/dispatcher.Channel"). As such, it IS NOT GOROUTINE-SAFE. If you use this outside of dispatcher.Channel, you must synchronize all access to the methods of Buffer.

Index

Constants

This section is empty.

Variables

View Source
var Defaults = Options{
	MaxLeases:     4,
	BatchSize:     20,
	BatchDuration: 10 * time.Second,
	FullBehavior: &BlockNewItems{
		MaxItems: 1000,
	},
	Retry: func() retry.Iterator {
		return &retry.ExponentialBackoff{
			Limited: retry.Limited{
				Delay:   200 * time.Millisecond,
				Retries: -1,
			},
			Multiplier: 1.2,
			MaxDelay:   60 * time.Second,
		}
	},
}

Defaults defines the defaults for Options when it contains 0-valued fields.

DO NOT ASSIGN/WRITE TO THIS STRUCT.

Functions

This section is empty.

Types

type Batch

type Batch struct {
	// Data is the individual work items pushed into the Buffer.
	Data []interface{}

	// Meta is an object which dispatcher.Channel will treat as totally opaque;
	// You may manipulate it in SendFn or ErrorFn as you see fit. This can be used
	// for e.g. associating a nonce with the Batch for retries, or stashing
	// a constructed RPC proto, etc.
	Meta interface{}
	// contains filtered or unexported fields
}

Batch represents a collection of individual work items and associated metadata.

Batches are are cut by the Channel according to Options.Buffer, and can be manipulated by ErrorFn and SendFn.

ErrorFn and SendFn may manipulate the contents of the Batch (Data and Meta) to do things such as:

  • Associate a UID with the Batch (e.g. in the Meta field) to identify it to remote services for deduplication.
  • Remove already-processed items from Data in case the SendFn partially succeeded.

The dispatcher accounts for the number of work items in the Batch as it leases the Batch out; initially the Batch's length will be len(Data). If the SendFn reduces the length of Data before the NACK, the accounted number of work items will be accordingly reduced. The accounted length can never grow (e.g. extending Data doesn't do anything).

type BlockNewItems

type BlockNewItems struct {
	// The maximum number of items that this Buffer is allowed to house (including
	// both leased and unleased items).
	//
	// Default: max(1000, BatchSize)
	// Required: Must be >= BatchSize
	MaxItems int
}

BlockNewItems prevents the Buffer from accepting any new items as long as it it has MaxItems worth of items.

This will never drop batches, but will block inserts.

func (*BlockNewItems) Check

func (b *BlockNewItems) Check(opts Options) (err error)

Check implements FullBehavior.Check.

func (*BlockNewItems) ComputeState

func (b *BlockNewItems) ComputeState(stats Stats) (okToInsert, dropBatch bool)

ComputeState implements FullBehavior.ComputeState.

type Buffer

type Buffer struct {
	// contains filtered or unexported fields
}

Buffer batches individual data items into Batch objects.

All access to the Buffer (as well as invoking ACK/NACK on LeasedBatches) must be synchronized because Buffer is not goroutine-safe.

func NewBuffer

func NewBuffer(o *Options) (*Buffer, error)

NewBuffer returns a new Buffer configured with the given Options.

If there's an issue with the provided Options, this returns an error.

func (*Buffer) ACK

func (buf *Buffer) ACK(leased *Batch)

ACK records that all the items in the batch have been processed.

The Batch is no longer tracked in any form by the Buffer.

Calling ACK/NACK on the same Batch twice will panic. Calling ACK/NACK on a Batch not returned from LeaseOne will panic.

func (*Buffer) AddNoBlock

func (buf *Buffer) AddNoBlock(now time.Time, item interface{}) (dropped *Batch)

AddNoBlock adds the item to the Buffer.

Possibly drops a batch according to FullBehavior. If FullBehavior.ComputeState returns okToInsert=false, AddNoBlock panics.

func (*Buffer) CanAddItem

func (buf *Buffer) CanAddItem() bool

CanAddItem returns true iff the Buffer will accept an item from AddNoBlock without panicing.

func (*Buffer) Flush

func (buf *Buffer) Flush(now time.Time)

Flush causes any buffered-but-not-batched data to be immediately cut into a Batch.

No-op if there's no such data.

func (*Buffer) ForceLeaseAll

func (buf *Buffer) ForceLeaseAll() []*Batch

ForceLeaseAll leases and returns all unleased Batches immediately, regardless of their send times.

This is useful for cancelation scenarios where you no longer want to do full processing on the remaining batches.

NOTE: It's helpful to call Flush before ForceLeaseAll to include the currently buffered, but unbatched, data.

func (*Buffer) LeaseOne

func (buf *Buffer) LeaseOne(now time.Time) (leased *Batch)

LeaseOne returns the most-available-to-send Batch from this Buffer.

The caller must invoke one of ACK/NACK on the Batch. The Batch will count against this Buffer's Stats().Total() until the caller does so.

Returns nil if no batch is available to lease, or if the Buffer has reached MaxLeases.

func (*Buffer) NACK

func (buf *Buffer) NACK(ctx context.Context, err error, leased *Batch)

NACK analyzes the current state of Batch.Data, potentially reducing UnleasedItemCount in the Buffer's if the given Batch.Data length is smaller than when the Batch was originally leased.

The Batch will be re-enqueued unless:

  • The Batch's retry Iterator returns retry.Stop
  • The Batch has been dropped already due to FullBehavior policy. If this is the case, AddNoBlock would already have returned this *Batch pointer to you.

Calling ACK/NACK on the same Batch twice will panic. Calling ACK/NACK on a Batch not returned from LeaseOne will panic.

func (*Buffer) NextSendTime

func (buf *Buffer) NextSendTime() time.Time

NextSendTime returns the send time for the next-most-available-to-send Batch, or a Zero time.Time if no batches are available to send.

NOTE: Because LeaseOne enforces MaxLeases, this time may not guarantee an available lease.

func (*Buffer) Stats

func (buf *Buffer) Stats() Stats

Stats returns information about the Buffer's state.

type DropOldestBatch

type DropOldestBatch struct {
	// The maximum number of leased and unleased items that the Buffer may have
	// before dropping data.
	//
	// Once a batch is dropped, it no longer counts against MaxLiveItems, but it
	// may still be in memory if the dropped batch was currently leased.
	//
	// NOTE: The maximum Stats.Total number of items the Buffer could have at
	// a given time is:
	//
	//     MaxLiveItems + (BatchSize * MaxLeases)
	//
	// Default: max(1000, BatchSize)
	// Required: Must be >= BatchSize
	MaxLiveItems int
}

DropOldestBatch will drop buffered data whenever the number of unleased items plus leased items would grow beyond MaxLiveItems.

This will never block inserts, but will drop batches.

func (*DropOldestBatch) Check

func (d *DropOldestBatch) Check(opts Options) (err error)

Check implements FullBehavior.Check.

func (*DropOldestBatch) ComputeState

func (d *DropOldestBatch) ComputeState(stats Stats) (okToInsert, dropBatch bool)

ComputeState implements FullBehavior.ComputeState.

type FullBehavior

type FullBehavior interface {
	// ComputeState evaluates the state of the Buffer (via Stats) and returns:
	//
	//   * okToInsert - User can add item without blocking.
	//   * dropBatch - Buffer needs to drop the oldest batch if the user does
	//     insert data.
	ComputeState(Stats) (okToInsert, dropBatch bool)

	// Check inspects Options to see if it's compatible with this FullBehavior
	// implementation.
	//
	// Called exactly once during Buffer creation.
	Check(Options) error
}

FullBehavior allows you to customize the Buffer's behavior when it gets too full.

Generally you'll pick one of DropOldestBatch or BlockNewItems.

type InfiniteGrowth

type InfiniteGrowth struct{}

InfiniteGrowth will not drop data or block new items. It just grows until your computer runs out of memory.

This will never block inserts, and will not drop batches.

func (InfiniteGrowth) Check

func (i InfiniteGrowth) Check(opts Options) (err error)

Check implements FullBehavior.Check.

func (InfiniteGrowth) ComputeState

func (i InfiniteGrowth) ComputeState(stats Stats) (okToInsert, dropBatch bool)

ComputeState implements FullBehavior.ComputeState.

type Options

type Options struct {
	// [OPTIONAL] The maximum number of outstanding leases permitted.
	//
	// Attempting additional leases (with LeaseOne) while at the maximum will
	// return nil.
	//
	// Requirement: Must be > 0
	MaxLeases int

	// [OPTIONAL] The maximum number of items to allow in a Batch before making it
	// available to lease.
	//
	// Special value -1: unlimited
	// Requirement: Must be == -1 (i.e. cut batches based on BatchDuration), or > 0
	BatchSize int

	// [OPTIONAL] The maximum amount of time to wait before queuing a Batch for
	// transmission. Note that batches are only cut by time when a worker is ready
	// to process them (i.e. LeaseOne is invoked).
	//
	// Requirement: Must be > 0
	BatchDuration time.Duration

	// [OPTIONAL] Sets the policy for the Buffer around how many items the Buffer
	// is allowed to hold, and what happens when that number is reached.
	FullBehavior FullBehavior

	// [OPTIONAL] If true, ensures that the next available batch is always the one
	// with the oldest data.
	//
	// If this is false (the default), batches will be leased in the order that
	// they're available to send; If a Batch has a retry with a high delay, it's
	// possible that the next leased Batch actually contains newer data than
	// a later batch.
	//
	// NOTE: if this is combined with high Retry values, it can lead to a
	// head-of-line blocking situation.
	//
	// Requirement: May only be true if MaxLeases == 1
	FIFO bool

	// [OPTIONAL] Each batch will have a retry.Iterator assigned to it from this
	// retry.Factory.
	//
	// When a Batch is NACK'd, it will be retried at "now" plus the Duration
	// returned by the retry.Iterator.
	//
	// If the retry.Iterator returns retry.Stop, the Batch will be silently
	// dropped.
	Retry retry.Factory
}

Options configures policy for the Buffer.

See Defaults for default values.

type Stats

type Stats struct {
	// UnleasedItemCount is the total number of items (i.e. objects passed to
	// AddNoBlock) which are currently owned by the Buffer but are not currently
	// leased. This includes:
	//    * Items buffered, but not yet cut into a Batch.
	//    * Items in unleased Batches.
	UnleasedItemCount int

	// LeasedItemCount is the total number of items (i.e. objects passed to
	// AddNoBlock) which are currently owned by the Buffer and are in active
	// leases.
	LeasedItemCount int

	// DroppedLeasedItemCount is the total number of items (i.e. objects passed to
	// AddNoBlock) which were part of leases, but where those leases have been
	// dropped (due to FullBehavior policy).
	DroppedLeasedItemCount int
}

Stats is a block of information about the Buffer's present state.

func (Stats) Empty

func (s Stats) Empty() bool

Empty returns true iff the Buffer is totally empty (has zero user-provided items).

func (Stats) Total

func (s Stats) Total() int

Total returns the total number of items currently referenced by the Buffer.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL