buffer

package
v0.0.0-...-1f88c41
Published: Apr 17, 2024 License: Apache-2.0 Imports: 8 Imported by: 6

Documentation

Overview

Package buffer implements a batching buffer with batch lease and retry management.

It is meant to be used by something which handles synchronization (primarily the "common/sync/dispatcher.Channel"). As such, it IS NOT GOROUTINE-SAFE. If you use this outside of dispatcher.Channel, you must synchronize all access to the Buffer's methods.
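As a minimal sketch of direct, single-goroutine use (assuming the luci import path "go.chromium.org/luci/common/sync/dispatcher/buffer"; zero-valued Options fields take their Defaults):

```go
package main

import (
	"fmt"
	"time"

	"go.chromium.org/luci/common/sync/dispatcher/buffer"
)

func main() {
	// Zero-valued Options fields are filled in from Defaults.
	buf, err := buffer.NewBuffer(&buffer.Options{})
	if err != nil {
		panic(err)
	}

	now := time.Now()
	// Add one item of size 1; dropped is non-nil only if the FullBehavior
	// policy evicted a batch to make room.
	if dropped, err := buf.AddNoBlock(now, "work item", 1); err != nil {
		panic(err)
	} else if dropped != nil {
		fmt.Println("dropped a batch of", len(dropped.Data), "items")
	}

	// Cut any buffered-but-unbatched data into a Batch immediately.
	buf.Flush(now)

	if batch := buf.LeaseOne(now); batch != nil {
		for _, bi := range batch.Data {
			fmt.Println("sending:", bi.Item)
		}
		buf.ACK(batch) // or buf.NACK(ctx, err, batch) to schedule a retry
	}
}
```

Remember that all of these calls must be externally synchronized if more than one goroutine touches the Buffer.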

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrBufferFull   = errors.New("buffer is full")
	ErrItemTooLarge = errors.New("item exceeds buffer's BatchSizeMax")
	ErrItemTooSmall = errors.New("item has zero or negative size, and BatchSizeMax is set")
)

These errors can be returned from AddNoBlock.

View Source
var Defaults = Options{
	MaxLeases:     4,
	BatchItemsMax: 20,
	BatchSizeMax:  -1,
	BatchAgeMax:   10 * time.Second,
	FullBehavior: &BlockNewItems{
		MaxItems: 1000,
	},
	Retry: func() retry.Iterator {
		return &retry.ExponentialBackoff{
			Limited: retry.Limited{
				Delay:   200 * time.Millisecond,
				Retries: -1,
			},
			Multiplier: 1.2,
			MaxDelay:   60 * time.Second,
		}
	},
}

Defaults defines the defaults for Options when it contains 0-valued fields.

DO NOT ASSIGN/WRITE TO THIS STRUCT.
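Since Defaults is applied to any 0-valued fields, you never need to copy or modify it; just set the fields you care about (sketch, assuming the luci import path):

```go
// Unset fields (MaxLeases, BatchSizeMax, Retry, ...) keep their Defaults.
opts := &buffer.Options{
	BatchItemsMax: 50,
	BatchAgeMax:   2 * time.Second,
	FullBehavior:  &buffer.BlockNewItems{MaxItems: 5000},
}
buf, err := buffer.NewBuffer(opts)
```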

Functions

This section is empty.

Types

type Batch

type Batch struct {
	// Data is the work items pushed into the Buffer, plus their Size as provided
	// to AddNoBlock.
	Data []BatchItem

	// Meta is an object which dispatcher.Channel will treat as totally opaque;
	// You may manipulate it in SendFn or ErrorFn as you see fit. This can be used
	// for e.g. associating a nonce with the Batch for retries, or stashing
	// a constructed RPC proto, etc.
	Meta any
	// contains filtered or unexported fields
}

Batch represents a collection of individual work items and associated metadata.

Batches are cut by the Channel according to Options.Buffer, and can be manipulated by ErrorFn and SendFn.

ErrorFn and SendFn may manipulate the contents of the Batch (Data and Meta) to do things such as:

  • Associate a UID with the Batch (e.g. in the Meta field) to identify it to remote services for deduplication.
  • Remove already-processed items from Data in case the SendFn partially succeeded.

The dispatcher accounts for the number of work items in the Batch as it leases the Batch out; initially the Batch's length will be len(Data). If the SendFn reduces the length of Data before the NACK, the accounted number of work items will be accordingly reduced. The accounted length can never grow (i.e. extending Data doesn't do anything).

Similarly, if the buffer is configured with BatchSizeMax, the accounted Size of the batch is defined as the sum of the cached sizes in Data. Reducing this amount (by removing items, or by reducing the Size of a BatchItem) reduces the effective Size of this Batch, but adding to Data cannot increase the Size of the batch.
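For example, a handler that partially succeeded can trim the sent prefix from Data before NACKing, so only the unsent tail is retried (sketch; `buf` is the Buffer, `batch` came from LeaseOne, and `sendSome` is a hypothetical transport helper returning how many leading items were transmitted):

```go
n, err := sendSome(batch.Data)
if err != nil {
	// The accounted item count (and Size) of the Batch shrinks with Data.
	batch.Data = batch.Data[n:]
	buf.NACK(ctx, err, batch) // only the remaining items are retried
} else {
	buf.ACK(batch)
}
```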

type BatchItem

type BatchItem struct {
	Item any
	Size int
}

BatchItem is just a container for the user-provided work items.

This includes the Size of the item at the time it was added to the buffer. Buffer will not modify this, but your application may adjust it while handling a Batch if your handler needs to trim the item somehow.

type BlockNewItems

type BlockNewItems struct {
	// The maximum number of items that this Buffer is allowed to house (including
	// both leased and unleased items).
	//
	// Default: -1 if BatchItemsMax == -1 else max(1000, BatchItemsMax)
	// Required: Must be >= BatchItemsMax or -1 (only MaxSize applies)
	MaxItems int

	// The maximum* number of 'size units' that this Buffer is allowed to house
	// (including both leased and unleased items).
	//
	// NOTE(*): This only blocks addition of new items once the buffer is
	// at-or-past capacity. e.g. if the buffer has a MaxSize of 1000 and
	// a BatchSizeMax of 100, and you insert items worth 999 units, this will
	// still allow the addition of another item (of up to 100 units) before
	// claiming the buffer is full.
	//
	// NOTE: This may only be set if BatchSizeMax is > 0; Otherwise buffer will
	// not enforce item sizes, and so BlockNewItems will not be able to enforce
	// this policy.
	//
	// Default: -1 if BatchSizeMax == -1 else BatchSizeMax * 5
	// Required: Must be >= BatchSizeMax or -1 (only MaxItems applies)
	MaxSize int
}

BlockNewItems prevents the Buffer from accepting any new items as long as it has MaxItems worth of items (or MaxSize worth of size units).

This will never drop batches, but will block inserts.
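A sketch of configuring size-based blocking (MaxSize may only be set when BatchSizeMax > 0, per the field docs above):

```go
opts := &buffer.Options{
	BatchSizeMax: 100, // sizes are tracked, so MaxSize may be enforced
	FullBehavior: &buffer.BlockNewItems{
		MaxItems: -1,   // only MaxSize applies
		MaxSize:  1000, // must be >= BatchSizeMax
	},
}
```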

func (*BlockNewItems) Check

func (b *BlockNewItems) Check(opts Options) (err error)

Check implements FullBehavior.Check.

func (*BlockNewItems) ComputeState

func (b *BlockNewItems) ComputeState(stats Stats) (okToInsert, dropBatch bool)

ComputeState implements FullBehavior.ComputeState.

func (*BlockNewItems) NeedsItemSize

func (b *BlockNewItems) NeedsItemSize() bool

NeedsItemSize implements FullBehavior.NeedsItemSize.

type Buffer

type Buffer struct {
	// contains filtered or unexported fields
}

Buffer batches individual data items into Batch objects.

All access to the Buffer (as well as invoking ACK/NACK on LeasedBatches) must be synchronized because Buffer is not goroutine-safe.

func NewBuffer

func NewBuffer(o *Options) (*Buffer, error)

NewBuffer returns a new Buffer configured with the given Options.

If there's an issue with the provided Options, this returns an error.

func (*Buffer) ACK

func (buf *Buffer) ACK(leased *Batch)

ACK records that all the items in the batch have been processed.

The Batch is no longer tracked in any form by the Buffer.

Calling ACK/NACK on the same Batch twice will panic. Calling ACK/NACK on a Batch not returned from LeaseOne will panic.

func (*Buffer) AddNoBlock

func (buf *Buffer) AddNoBlock(now time.Time, item any, itemSize int) (dropped *Batch, err error)

AddNoBlock adds the item to the Buffer with the given size.

Possibly drops a batch according to FullBehavior.

Returns an error under the following conditions:

  • ErrBufferFull - If FullBehavior.ComputeState returns okToInsert=false.
  • ErrItemTooLarge - If this buffer has a BatchSizeMax configured and `itemSize` is larger than this.
  • ErrItemTooSmall - If this buffer has a BatchSizeMax configured and `itemSize` is zero, or if `itemSize` is negative.
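Because these are sentinel errors, callers can distinguish them with errors.Is (sketch; `buf`, `item`, and `size` are assumed from surrounding code):

```go
if _, err := buf.AddNoBlock(time.Now(), item, size); err != nil {
	switch {
	case errors.Is(err, buffer.ErrBufferFull):
		// Back off; retry once CanAddItem() reports true.
	case errors.Is(err, buffer.ErrItemTooLarge):
		// This item can never fit in a batch; reject or split it.
	case errors.Is(err, buffer.ErrItemTooSmall):
		// Caller bug: sizes must be > 0 when BatchSizeMax is set.
	}
}
```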

func (*Buffer) CanAddItem

func (buf *Buffer) CanAddItem() bool

CanAddItem returns true iff the Buffer will accept an item from AddNoBlock without returning ErrBufferFull.

func (*Buffer) Flush

func (buf *Buffer) Flush(now time.Time)

Flush causes any buffered-but-not-batched data to be immediately cut into a Batch.

No-op if there's no such data.

func (*Buffer) ForceLeaseAll

func (buf *Buffer) ForceLeaseAll() []*Batch

ForceLeaseAll leases and returns all unleased Batches immediately, regardless of their send times.

This is useful for cancelation scenarios where you no longer want to do full processing on the remaining batches.

NOTE: It's helpful to call Flush before ForceLeaseAll to include the currently buffered, but unbatched, data.
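A cancelation-time drain might look like this sketch (what you do with each batch before releasing the lease is up to you):

```go
// Batch up everything left, including buffered-but-unbatched data...
buf.Flush(time.Now())
// ...then lease it all at once, regardless of send times.
for _, batch := range buf.ForceLeaseAll() {
	// e.g. log or persist batch.Data, then release the lease:
	buf.ACK(batch)
}
```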

func (*Buffer) LeaseOne

func (buf *Buffer) LeaseOne(now time.Time) (leased *Batch)

LeaseOne returns the most-available-to-send Batch from this Buffer.

The caller must invoke one of ACK/NACK on the Batch. The Batch will count against this Buffer's Stats().Total() until the caller does so.

Returns nil if no batch is available to lease, or if the Buffer has reached MaxLeases.

func (*Buffer) NACK

func (buf *Buffer) NACK(ctx context.Context, err error, leased *Batch)

NACK analyzes the current state of Batch.Data, potentially reducing the Buffer's UnleasedItemCount if the given Batch.Data length is smaller than when the Batch was originally leased.

The Batch will be re-enqueued unless:

  • The Batch's retry Iterator returns retry.Stop
  • The Batch has been dropped already due to FullBehavior policy. If this is the case, AddNoBlock would already have returned this *Batch pointer to you.

Calling ACK/NACK on the same Batch twice will panic. Calling ACK/NACK on a Batch not returned from LeaseOne will panic.

func (*Buffer) NextSendTime

func (buf *Buffer) NextSendTime() time.Time

NextSendTime returns the send time for the next-most-available-to-send Batch, or a zero time.Time if no batches are available to send.

NOTE: Because LeaseOne enforces MaxLeases, this time may not guarantee an available lease.
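A single-goroutine send loop might use NextSendTime to pace itself (sketch; note that LeaseOne can still return nil here if MaxLeases is reached, and that time-based batch cutting only happens inside LeaseOne):

```go
for {
	next := buf.NextSendTime()
	if next.IsZero() {
		break // no batches are currently available to send
	}
	time.Sleep(time.Until(next))
	if batch := buf.LeaseOne(time.Now()); batch != nil {
		// send(batch) ... then ACK or NACK:
		buf.ACK(batch)
	}
	// batch may be nil despite the send time: MaxLeases may be exhausted.
}
```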

func (*Buffer) Stats

func (buf *Buffer) Stats() Stats

Stats returns information about the Buffer's state.

type DropOldestBatch

type DropOldestBatch struct {
	// The maximum number of leased and unleased items that the Buffer may have
	// before dropping data.
	//
	// Once a batch is dropped, it no longer counts against MaxLiveItems, but it
	// may still be in memory if the dropped batch was currently leased.
	//
	// NOTE: The maximum Stats.Total number of items the Buffer could have at
	// a given time is:
	//
	//     MaxLiveItems + (BatchItemsMax * MaxLeases)
	//
	// Default: -1 if BatchItemsMax == -1 else max(1000, BatchItemsMax)
	// Required: Must be >= BatchItemsMax or -1 (only MaxLiveSize applies)
	MaxLiveItems int

	// The maximum number of leased and unleased size units that the Buffer may
	// have before dropping data.
	//
	// Once a batch is dropped, it no longer counts against MaxLiveSize, but it
	// may still be in memory if the dropped batch was currently leased at the
	// time it was dropped.
	//
	// NOTE: The maximum Stats.TotalSize the Buffer could have at a given
	// time is:
	//
	//     MaxLiveSize + (BatchSizeMax * MaxLeases)
	//
	// NOTE: This may only be set if BatchSizeMax is > 0; Otherwise buffer will
	// not size inserted items, and so DropOldestBatch will not be able to enforce
	// this policy.
	//
	// Default: -1 if BatchSizeMax == -1 else BatchSizeMax * 5
	// Required: Must be >= BatchSizeMax or -1 (only MaxLiveItems applies)
	MaxLiveSize int
}

DropOldestBatch will drop buffered data whenever the number of unleased items plus leased items would grow beyond MaxLiveItems (or their total size would grow beyond MaxLiveSize).

This will never block inserts, but will drop batches.
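With this policy, AddNoBlock never fails with ErrBufferFull, but it may hand you the evicted batch (sketch):

```go
opts := &buffer.Options{
	FullBehavior: &buffer.DropOldestBatch{MaxLiveItems: 100},
}
// ...
if dropped, err := buf.AddNoBlock(time.Now(), item, 1); err == nil && dropped != nil {
	// The oldest batch was evicted to make room; its items are lost
	// unless you do something with dropped.Data here.
	log.Printf("dropped %d items", len(dropped.Data))
}
```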

func (*DropOldestBatch) Check

func (d *DropOldestBatch) Check(opts Options) (err error)

Check implements FullBehavior.Check.

func (*DropOldestBatch) ComputeState

func (d *DropOldestBatch) ComputeState(stats Stats) (okToInsert, dropBatch bool)

ComputeState implements FullBehavior.ComputeState.

func (*DropOldestBatch) NeedsItemSize

func (d *DropOldestBatch) NeedsItemSize() bool

NeedsItemSize implements FullBehavior.NeedsItemSize.

type FullBehavior

type FullBehavior interface {
	// Check inspects Options to see if it's compatible with this FullBehavior
	// implementation.
	//
	// Called exactly once during Buffer creation before any other methods are
	// used.
	Check(Options) error

	// ComputeState evaluates the state of the Buffer (via Stats) and returns:
	//
	//   * okToInsert - User can add an item without blocking.
	//   * dropBatch - Buffer needs to drop the oldest batch if the user does
	//     insert data.
	//
	// Called after Check.
	ComputeState(stats Stats) (okToInsert, dropBatch bool)

	// NeedsItemSize should return true iff this FullBehavior requires item sizes
	// to effectively apply its policy.
	//
	// Called after Check.
	NeedsItemSize() bool
}

FullBehavior allows you to customize the Buffer's behavior when it gets too full.

Generally you'll pick one of DropOldestBatch or BlockNewItems.
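If neither fits, you can implement the interface yourself. A hypothetical policy that blocks inserts past a fixed total item count, without needing item sizes, might look like:

```go
// capItems is a hypothetical FullBehavior: block inserts once the Buffer
// holds `limit` items (leased + unleased), and never drop batches.
type capItems struct{ limit int }

// Check: no constraints on Options for this policy.
func (c *capItems) Check(opts buffer.Options) error { return nil }

// ComputeState: ok to insert while under the cap; never request a drop.
func (c *capItems) ComputeState(stats buffer.Stats) (okToInsert, dropBatch bool) {
	return stats.Total() < c.limit, false
}

// NeedsItemSize: this policy only counts items, not size units.
func (c *capItems) NeedsItemSize() bool { return false }
```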

type InfiniteGrowth

type InfiniteGrowth struct{}

InfiniteGrowth will not drop data or block new items. It just grows until your computer runs out of memory.

This will never block inserts, and will not drop batches.

func (InfiniteGrowth) Check

func (i InfiniteGrowth) Check(opts Options) (err error)

Check implements FullBehavior.Check.

func (InfiniteGrowth) ComputeState

func (i InfiniteGrowth) ComputeState(Stats) (okToInsert, dropBatch bool)

ComputeState implements FullBehavior.ComputeState.

func (InfiniteGrowth) NeedsItemSize

func (i InfiniteGrowth) NeedsItemSize() bool

NeedsItemSize implements FullBehavior.NeedsItemSize.

type Options

type Options struct {
	// [OPTIONAL] The maximum number of outstanding leases permitted.
	//
	// Attempting additional leases (with LeaseOne) while at the maximum will
	// return nil.
	//
	// Requirement: Must be > 0
	MaxLeases int

	// [OPTIONAL] The maximum number of items to allow in a Batch before making it
	// available to lease.
	//
	// Special value -1: unlimited
	// Requirement: Must be == -1 (i.e. cut batches on BatchAgeMax/BatchSizeMax),
	// or > 0
	BatchItemsMax int

	// [OPTIONAL] The maximum number of "size units" to allow in a Batch before
	// making it available to lease.
	//
	// The units used here are arbitrary and are only checked vs the value
	// provided to AddNoBlock.
	//
	// Size is explicitly provided to AddNoBlock by the caller.
	//
	// Inserting an item which exceeds BatchSizeMax will result in ErrItemTooLarge.
	// It's up to the caller to ensure that this doesn't happen.
	//
	// Special value -1: unlimited
	// Requirement: Must be == -1 (i.e. cut batches on BatchAgeMax/BatchItemsMax),
	// or > 0
	BatchSizeMax int

	// [OPTIONAL] The maximum amount of time to wait before queuing a Batch for
	// transmission. Note that batches are only cut by time when a worker is ready
	// to process them (i.e. LeaseOne is invoked).
	//
	// Requirement: Must be > 0
	BatchAgeMax time.Duration

	// [OPTIONAL] Sets the policy for the Buffer around how many items the Buffer
	// is allowed to hold, and what happens when that number is reached.
	FullBehavior FullBehavior

	// [OPTIONAL] If true, ensures that the next available batch is always the one
	// with the oldest data.
	//
	// If this is false (the default), batches will be leased in the order that
	// they're available to send; If a Batch has a retry with a high delay, it's
	// possible that the next leased Batch actually contains newer data than
	// a later batch.
	//
	// NOTE: if this is combined with high Retry values, it can lead to a
	// head-of-line blocking situation.
	//
	// Requirement: May only be true if MaxLeases == 1
	FIFO bool

	// [OPTIONAL] Each batch will have a retry.Iterator assigned to it from this
	// retry.Factory.
	//
	// When a Batch is NACK'd, it will be retried at "now" plus the Duration
	// returned by the retry.Iterator.
	//
	// If the retry.Iterator returns retry.Stop, the Batch will be silently
	// dropped.
	Retry retry.Factory
}

Options configures policy for the Buffer.

See Defaults for default values.
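Putting the constraints together, a fully explicit Options value might look like this sketch (mirroring the shape of Defaults; FIFO is legal here only because MaxLeases == 1):

```go
opts := &buffer.Options{
	MaxLeases:     1,
	BatchItemsMax: 100,
	BatchSizeMax:  -1, // don't track item sizes
	BatchAgeMax:   5 * time.Second,
	FullBehavior:  &buffer.BlockNewItems{MaxItems: 10000},
	FIFO:          true, // requires MaxLeases == 1
	Retry: func() retry.Iterator {
		return &retry.ExponentialBackoff{
			Limited: retry.Limited{
				Delay:   100 * time.Millisecond,
				Retries: 10,
			},
			Multiplier: 2,
			MaxDelay:   30 * time.Second,
		}
	},
}
```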

type Stats

type Stats struct {
	// UnleasedItemCount is the total number of items (i.e. objects passed to
	// AddNoBlock) which are currently owned by the Buffer but are not currently
	// leased. This includes:
	//    * Items buffered, but not yet cut into a Batch.
	//    * Items in unleased Batches.
	//
	// UnleasedItemSize is the size in 'size units' of the same items.
	UnleasedItemCount int
	UnleasedItemSize  int

	// LeasedItemCount is the total number of items (i.e. objects passed to
	// AddNoBlock) which are currently owned by the Buffer and are in active
	// leases.
	//
	// LeasedItemSize is the size in 'size units' of the same items.
	LeasedItemCount int
	LeasedItemSize  int

	// DroppedLeasedItemCount is the total number of items (i.e. objects passed to
	// AddNoBlock) which were part of leases, but where those leases have been
	// dropped (due to FullBehavior policy), but have not yet been ACK/NACK'd.
	//
	// Once these batches are ACK/NACK'd they'll be dropped from Stats entirely.
	//
	// DroppedLeasedItemSize is the size in 'size units' of the same items.
	DroppedLeasedItemCount int
	DroppedLeasedItemSize  int
}

Stats is a block of information about the Buffer's present state.

func (Stats) Empty

func (s Stats) Empty() bool

Empty returns true iff the Buffer is totally empty (has zero user-provided items).

func (Stats) Total

func (s Stats) Total() int

Total returns the total number of items currently referenced by the Buffer.

func (Stats) TotalSize

func (s Stats) TotalSize() int

TotalSize returns the total size, in 'size units', of all items currently referenced by the Buffer.
