fanoutbuffer

package
v0.0.0-...-5c79d48 Latest
Warning

This package is not in the latest version of its module.

Published: Feb 15, 2024 License: AGPL-3.0 Imports: 8 Imported by: 0

Documentation

Constants

This section is empty.

Variables

var ErrBufferClosed = errors.New("fanout buffer closed")

ErrBufferClosed is an error indicating that the event buffer as a whole has been closed.

var ErrGracePeriodExceeded = errors.New("failed to process fanout buffer backlog within grace period")

ErrGracePeriodExceeded is an error returned by Cursor.Read indicating that the cursor fell too far behind. Observing this error means that either the reader is too slow, or the buffer has been configured with insufficient capacity or grace period for the use case.

var ErrUseOfClosedCursor = errors.New("use of closed fanout buffer cursor (this is a bug)")

ErrUseOfClosedCursor is an error indicating that Cursor.Read was called after the cursor had either been explicitly closed, or had previously returned an error.
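
A caller will usually want to tell these three errors apart. The following is a minimal sketch of one way to do that with errors.Is. The three variables are local stand-ins that copy the messages shown above so the example compiles without the package, and describeReadError is a hypothetical helper, not part of fanoutbuffer.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins that mirror the package's sentinel errors so the sketch
// compiles on its own; real code would reference the fanoutbuffer variables.
var (
	ErrBufferClosed        = errors.New("fanout buffer closed")
	ErrGracePeriodExceeded = errors.New("failed to process fanout buffer backlog within grace period")
	ErrUseOfClosedCursor   = errors.New("use of closed fanout buffer cursor (this is a bug)")
)

// describeReadError is a hypothetical helper that maps a read error to the
// action a caller might take.
func describeReadError(err error) string {
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, ErrBufferClosed):
		return "stop: buffer closed"
	case errors.Is(err, ErrGracePeriodExceeded):
		return "resync: cursor fell too far behind"
	case errors.Is(err, ErrUseOfClosedCursor):
		return "bug: cursor used after close"
	default:
		return "other: " + err.Error()
	}
}

func main() {
	// errors.Is also matches when the sentinel has been wrapped with %w.
	wrapped := fmt.Errorf("watcher 7: %w", ErrGracePeriodExceeded)
	fmt.Println(describeReadError(wrapped))
	fmt.Println(describeReadError(ErrBufferClosed))
}
```

Using errors.Is instead of == keeps the check working if an intermediate layer wraps the error before it reaches the caller.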

Functions

This section is empty.

Types

type Buffer

type Buffer[T any] struct {
	// contains filtered or unexported fields
}

Buffer is a circular buffer that keeps track of how many cursors exist, and how many have seen each item, so that it knows when items can be cleared. If one or more cursors fall behind, the items they have yet to see go into a temporary backlog of unbounded size. If the backlog persists for longer than the allowed grace period, it is cleared and all cursors still in the backlog fail on their next attempt to read.
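
The bookkeeping described above can be illustrated with a toy model. This is not the package's implementation: toyBuffer and all of its methods are hypothetical, it stores items in a plain slice instead of a ring plus backlog, it ignores the grace period and locking, and it assumes a new cursor starts at the current end of the buffer. It only shows how tracking each cursor's position reveals which items every cursor has seen and can therefore be freed.

```go
package main

import "fmt"

// toyBuffer is an illustrative model only: it keeps every item that some
// cursor has not yet read and remembers each cursor's absolute position.
type toyBuffer struct {
	base    int         // absolute index of items[0]
	items   []string    // items not yet seen by every cursor
	cursors map[int]int // cursor id -> absolute index of the next item to read
	nextID  int
}

func newToyBuffer() *toyBuffer {
	return &toyBuffer{cursors: make(map[int]int)}
}

// newCursor registers a cursor positioned at the current end of the buffer
// (an assumption of this sketch).
func (b *toyBuffer) newCursor() int {
	id := b.nextID
	b.nextID++
	b.cursors[id] = b.base + len(b.items)
	return id
}

func (b *toyBuffer) append(items ...string) {
	b.items = append(b.items, items...)
}

// read returns everything the given cursor has not seen yet, advances the
// cursor, and then frees items that all cursors have passed.
func (b *toyBuffer) read(id int) []string {
	pos := b.cursors[id]
	out := append([]string(nil), b.items[pos-b.base:]...)
	b.cursors[id] = b.base + len(b.items)
	b.prune()
	return out
}

// prune drops the prefix of items that every cursor has already read.
func (b *toyBuffer) prune() {
	lowest := b.base + len(b.items)
	for _, pos := range b.cursors {
		if pos < lowest {
			lowest = pos
		}
	}
	b.items = b.items[lowest-b.base:]
	b.base = lowest
}

func main() {
	b := newToyBuffer()
	fast, slow := b.newCursor(), b.newCursor()
	b.append("a", "b", "c")

	// After only the fast cursor reads, nothing can be freed yet.
	fmt.Println(b.read(fast), len(b.items))
	// Once the slow cursor catches up, all three items are released.
	fmt.Println(b.read(slow), len(b.items))
}
```

In this model a cursor that never reads pins every item appended after it was created, which is the situation the grace period in the real buffer exists to bound.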

func NewBuffer

func NewBuffer[T any](cfg Config) *Buffer[T]

NewBuffer creates a new fanout buffer instance.

func (*Buffer[T]) Append

func (b *Buffer[T]) Append(items ...T)

Append appends to the buffer and wakes all dormant cursors.

func (*Buffer[T]) Close

func (b *Buffer[T]) Close()

Close permanently closes the fanout buffer. Note that all cursors are terminated immediately and may therefore miss items that had not been read at the time Close is called.

func (*Buffer[T]) NewCursor

func (b *Buffer[T]) NewCursor() *Cursor[T]

NewCursor gets a new cursor into the buffer. Cursors *must* be closed as soon as they are no longer being read from. Stale cursors may cause performance degradation.

type Config

type Config struct {
	// Capacity is the capacity to allocate for the main circular buffer. Capacity should be selected s.t. cursors rarely
	// fall behind capacity during normal operation.
	Capacity uint64

	// GracePeriod is the amount of time a backlog (beyond the specified capacity) will be allowed to persist. Longer grace
	// periods give cursors more time to catch up in the event of spikes, at the cost of higher potential memory usage and
	// longer waits before unhealthy cursors are ejected.
	GracePeriod time.Duration

	// Clock is used to override default time-behaviors in tests.
	Clock clockwork.Clock
}

Config holds all configuration parameters for the fanout buffer. All parameters are optional.
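
Because every parameter is optional, a caller can set only the fields it cares about. The sketch below shows the common Go pattern of filling zero-valued fields with fallbacks. It is an assumption that SetDefaults works this way; sketchConfig is a hypothetical mirror of Config without the Clock field, and the placeholder values are invented for the example and are not the package's real defaults, which this page does not list.

```go
package main

import (
	"fmt"
	"time"
)

// Placeholder values chosen for this sketch only. They are NOT the
// package's real defaults.
const (
	placeholderCapacity    uint64 = 128
	placeholderGracePeriod        = 10 * time.Second
)

// sketchConfig mirrors the Capacity and GracePeriod fields of Config. The
// Clock field is omitted to keep the example free of third-party imports.
type sketchConfig struct {
	Capacity    uint64
	GracePeriod time.Duration
}

// setDefaults fills in any field that was left at its zero value.
func (c *sketchConfig) setDefaults() {
	if c.Capacity == 0 {
		c.Capacity = placeholderCapacity
	}
	if c.GracePeriod == 0 {
		c.GracePeriod = placeholderGracePeriod
	}
}

func main() {
	cfg := sketchConfig{Capacity: 512} // GracePeriod intentionally left unset
	cfg.setDefaults()
	fmt.Println(cfg.Capacity, cfg.GracePeriod)
}
```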

func (*Config) SetDefaults

func (c *Config) SetDefaults()

SetDefaults sets default config parameters.

type Cursor

type Cursor[T any] struct {
	// contains filtered or unexported fields
}

Cursor is a cursor into a fanout buffer. Cursors *must* be closed if not being actively read to avoid buffer performance degradation. Cursors are not intended for concurrent use: they are "safe", but concurrent calls may block longer than expected because the lock is held across blocking reads.

func (*Cursor[T]) Close

func (c *Cursor[T]) Close() error

Close closes the cursor. Close is safe to double-call and should be called as soon as possible if the cursor is no longer in use.

func (*Cursor[T]) Read

func (c *Cursor[T]) Read(ctx context.Context, out []T) (n int, err error)

Read blocks until items become available and then reads them into the supplied out slice, returning the number of items that were read. The length of out should be selected based on the expected throughput.

func (*Cursor[T]) TryRead

func (c *Cursor[T]) TryRead(out []T) (n int, err error)

TryRead performs a non-blocking read. It returns (0, nil) if no output is available.
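
The (0, nil) result makes it possible to drain whatever is currently available without blocking. Below is a minimal sketch of that pattern, written against an interface that copies the TryRead signature so it runs on its own. The names tryReader, sliceCursor, and drainAvailable are hypothetical.

```go
package main

import "fmt"

// tryReader captures the TryRead method of Cursor.
type tryReader[T any] interface {
	TryRead(out []T) (n int, err error)
}

// sliceCursor is a hypothetical in-memory reader used only to make the
// example runnable. It reports (0, nil) once nothing is pending.
type sliceCursor struct{ pending []int }

func (s *sliceCursor) TryRead(out []int) (int, error) {
	n := copy(out, s.pending)
	s.pending = s.pending[n:]
	return n, nil
}

// drainAvailable collects everything that can be read without blocking.
// It stops at the first (0, nil) result or at the first error.
func drainAvailable[T any](r tryReader[T], batchSize int) ([]T, error) {
	var all []T
	out := make([]T, batchSize)
	for {
		n, err := r.TryRead(out)
		if err != nil {
			return all, err
		}
		if n == 0 {
			return all, nil
		}
		all = append(all, out[:n]...)
	}
}

func main() {
	cur := &sliceCursor{pending: []int{1, 2, 3, 4, 5}}
	items, err := drainAvailable[int](cur, 2)
	fmt.Println(items, err)
}
```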
