async

package
v0.40.4
Warning

This package is not in the latest version of its module.

Published: May 19, 2022 | License: Apache-2.0 | Imports: 8 | Imported by: 0

Documentation

Constants

This section is empty.

Variables

var ErrWrongEpoch = errors.New("ring buffer: wrong epoch")

Returned from Push() when the supplied epoch does not match the buffer's current epoch.

Functions

func GoWithCancel

func GoWithCancel(ctx context.Context, g *errgroup.Group, f func(context.Context) error) func()

GoWithCancel runs a context-aware error-returning function on the provided |*errgroup.Group|. It passes the function a child context of the provided |ctx| and returns a |func()| to cancel that child context. If the provided function returns a |context.Canceled| error, then the |*errgroup.Group| will see a return of |ctx.Err()|, instead of the child context's |Err()|.

As long as the provided function returns or appropriately wraps any |context.Canceled| error that it sees during processing, this function allows dispatching cancelable work on an |*errgroup.Group| and canceling that work without the |*errgroup.Group| itself seeing an |err| and canceling.
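
The error translation described above can be sketched with the standard library alone. This is a minimal illustration, not the package's implementation: |goWithCancel| and |cancelChildOnly| are hypothetical names, and a buffered channel stands in for the |*errgroup.Group| so that the example has no external dependencies.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// goWithCancel is a hypothetical, stdlib-only stand-in for the behavior
// described above. It returns a cancel func for the child context and a wait
// func that reports the error a group would see.
func goWithCancel(ctx context.Context, f func(context.Context) error) (cancel func(), wait func() error) {
	child, cancelChild := context.WithCancel(ctx)
	done := make(chan error, 1)
	go func() {
		err := f(child)
		if errors.Is(err, context.Canceled) {
			// Report the parent's state instead of the child's cancellation.
			err = ctx.Err()
		}
		done <- err
	}()
	return cancelChild, func() error { return <-done }
}

// cancelChildOnly cancels only the child context and returns the error the
// caller observes afterwards.
func cancelChildOnly() error {
	cancel, wait := goWithCancel(context.Background(), func(ctx context.Context) error {
		<-ctx.Done()
		// Wrapping keeps context.Canceled visible to errors.Is.
		return fmt.Errorf("worker stopped: %w", ctx.Err())
	})
	cancel()
	return wait()
}

func main() {
	// The parent context was never canceled, so no error is reported.
	fmt.Println(cancelChildOnly() == nil)
}
```

Because the parent context is still live when the child is canceled, the caller sees a nil error. The same work canceled through the parent would surface the parent's |Err()|.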

Types

type Action

type Action func(ctx context.Context, val interface{}) error

Action is the function called by an ActionExecutor on each given value.

type ActionExecutor

type ActionExecutor struct {
	// contains filtered or unexported fields
}

ActionExecutor is designed for asynchronous workloads that should run whenever a new task is available. The closest analog is a long-running goroutine that receives from a channel, but ActionExecutor differs in three major ways. First, there is no need to close the queue, because goroutines exit automatically when the queue is empty. Second, a concurrency parameter may be set, which spins up goroutines as needed until the maximum number is reached. Third, the buffer size does not have to be declared beforehand as it does with channels, which lets the queue grow with demand. A maximum buffer size may still be declared for RAM-limited situations; appends then block until the buffer is below the given maximum.

func NewActionExecutor

func NewActionExecutor(ctx context.Context, action Action, concurrency uint32, maxBuffer uint64) *ActionExecutor

NewActionExecutor returns an ActionExecutor that will run the given action on each appended value, and run up to the max number of goroutines as defined by concurrency. If concurrency is 0, then it is set to 1. If maxBuffer is 0, then it is unlimited. Panics on a nil action.

func (*ActionExecutor) Execute

func (aq *ActionExecutor) Execute(val interface{})

Execute adds the value to the end of the queue to be executed. If any action encountered an error before this call, then the value is not added and this returns immediately.

func (*ActionExecutor) WaitForEmpty

func (aq *ActionExecutor) WaitForEmpty() error

WaitForEmpty waits until all the work that has been submitted before the call to |WaitForEmpty| has completed. It returns any errors that any actions may have encountered.
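
The spawn-on-demand behavior described for ActionExecutor can be sketched as follows. This is a simplified illustration under stated assumptions, not the package's code: the |executor| type and its methods are hypothetical, values are plain ints, and error handling, contexts, and the max buffer are left out.

```go
package main

import (
	"fmt"
	"sync"
)

// executor is a hypothetical, simplified model of the pattern: an unbounded
// queue plus workers that start on demand and exit when the queue is empty.
type executor struct {
	mu      sync.Mutex
	queue   []int
	running int // number of live worker goroutines
	max     int // concurrency limit
	action  func(int)
	wg      sync.WaitGroup
}

// execute appends a value and starts a worker if the limit allows it.
func (e *executor) execute(v int) {
	e.mu.Lock()
	defer e.mu.Unlock()
	e.queue = append(e.queue, v)
	if e.running < e.max {
		e.running++
		e.wg.Add(1)
		go e.work()
	}
}

// work drains the queue and returns once it is empty, so nothing has to be
// closed to stop the workers.
func (e *executor) work() {
	defer e.wg.Done()
	for {
		e.mu.Lock()
		if len(e.queue) == 0 {
			e.running--
			e.mu.Unlock()
			return
		}
		v := e.queue[0]
		e.queue = e.queue[1:]
		e.mu.Unlock()
		e.action(v)
	}
}

// waitForEmpty blocks until every worker has exited.
func (e *executor) waitForEmpty() { e.wg.Wait() }

// sumWithExecutor adds 1..n using up to the given number of workers.
func sumWithExecutor(n, workers int) int {
	var mu sync.Mutex
	sum := 0
	e := &executor{max: workers, action: func(v int) {
		mu.Lock()
		sum += v
		mu.Unlock()
	}}
	for i := 1; i <= n; i++ {
		e.execute(i)
	}
	e.waitForEmpty()
	return sum
}

func main() {
	fmt.Println(sumWithExecutor(100, 3)) // 5050
}
```

The queue check and the |running| decrement happen under the same lock, so a value appended while a worker is exiting is always picked up, either by that worker or by a newly started one.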

type AsyncReader

type AsyncReader struct {
	// contains filtered or unexported fields
}

AsyncReader is a TableReadCloser implementation that spins up a goroutine to keep reading data into a buffer so that it is ready when the caller wants it.

func NewAsyncReader

func NewAsyncReader(rf ReadFunc, bufferSize int) *AsyncReader

NewAsyncReader creates a new AsyncReader.

func (*AsyncReader) Close

func (asRd *AsyncReader) Close() error

Close releases the resources being held.

func (*AsyncReader) Read

func (asRd *AsyncReader) Read() (interface{}, error)

Read reads an object.

func (*AsyncReader) Start

func (asRd *AsyncReader) Start(ctx context.Context) error

Start starts the worker goroutine, which reads rows into the channel.
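
The read-ahead idea behind AsyncReader can be sketched with a buffered channel. This is only an illustration of the pattern, not the package's implementation: |prefetch|, |result|, and |collect| are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
	"io"
)

// result pairs a value with the error returned alongside it.
type result struct {
	val interface{}
	err error
}

// prefetch starts a goroutine that calls read repeatedly and keeps up to
// bufferSize results ready for the consumer. The channel is closed after the
// first error (including io.EOF) or when ctx is canceled.
func prefetch(ctx context.Context, read func(context.Context) (interface{}, error), bufferSize int) <-chan result {
	ch := make(chan result, bufferSize)
	go func() {
		defer close(ch)
		for {
			v, err := read(ctx)
			select {
			case ch <- result{v, err}:
			case <-ctx.Done():
				return
			}
			if err != nil {
				return
			}
		}
	}()
	return ch
}

// collect reads the integers 1..n through prefetch and returns them in order.
func collect(n, bufferSize int) []interface{} {
	i := 0
	read := func(ctx context.Context) (interface{}, error) {
		if i == n {
			return nil, io.EOF
		}
		i++
		return i, nil
	}
	var out []interface{}
	for r := range prefetch(context.Background(), read, bufferSize) {
		if r.err != nil {
			break // io.EOF ends the stream
		}
		out = append(out, r.val)
	}
	return out
}

func main() {
	fmt.Println(collect(3, 2)) // [1 2 3]
}
```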

type ReadFunc

type ReadFunc func(ctx context.Context) (interface{}, error)

ReadFunc is a function that is called repeatedly in order to retrieve a stream of objects. When all objects have been read from the stream, (nil, io.EOF) should be returned.
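
A minimal sketch of a function that satisfies this contract is shown below. The |ReadFunc| type is redeclared locally so the example is self-contained; |sliceReadFunc| and |readAll| are hypothetical helpers, and the context check is an assumption about reasonable behavior rather than a documented requirement.

```go
package main

import (
	"context"
	"fmt"
	"io"
)

// ReadFunc mirrors the signature documented above.
type ReadFunc func(ctx context.Context) (interface{}, error)

// sliceReadFunc returns a ReadFunc that yields each item in turn and then
// (nil, io.EOF) once the slice is exhausted.
func sliceReadFunc(items []interface{}) ReadFunc {
	i := 0
	return func(ctx context.Context) (interface{}, error) {
		if err := ctx.Err(); err != nil {
			return nil, err // assumption: stop early if the context is done
		}
		if i >= len(items) {
			return nil, io.EOF
		}
		item := items[i]
		i++
		return item, nil
	}
}

// readAll calls the ReadFunc until io.EOF and returns everything it yielded.
func readAll(items []interface{}) ([]interface{}, error) {
	rf := sliceReadFunc(items)
	ctx := context.Background()
	var out []interface{}
	for {
		v, err := rf(ctx)
		if err == io.EOF {
			return out, nil
		}
		if err != nil {
			return out, err
		}
		out = append(out, v)
	}
}

func main() {
	out, err := readAll([]interface{}{"a", "b", "c"})
	fmt.Println(out, err) // [a b c] <nil>
}
```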

type RingBuffer

type RingBuffer struct {
	// contains filtered or unexported fields
}

RingBuffer is a dynamically sized, thread-safe ring buffer.

func NewRingBuffer

func NewRingBuffer(allocSize int) *RingBuffer

NewRingBuffer creates a new RingBuffer instance.

func (*RingBuffer) Close

func (rb *RingBuffer) Close() error

Close closes a RingBuffer so that no new items can be pushed onto it. Items that are already in the buffer can still be read via Pop and TryPop. Close broadcasts to all goroutines waiting inside Pop.

func (*RingBuffer) Pop

func (rb *RingBuffer) Pop() (item interface{}, err error)

Pop will return the next item in the RingBuffer. If there are no items available, Pop will wait until a new item is pushed, or the RingBuffer is closed.

func (*RingBuffer) Push

func (rb *RingBuffer) Push(item interface{}, epoch int) error

Push will add a new item to the RingBuffer and signal a goroutine waiting inside Pop that new data is available.

func (*RingBuffer) Reset

func (rb *RingBuffer) Reset() int

Reset clears a ring buffer so that it can be reused.

func (*RingBuffer) TryPop

func (rb *RingBuffer) TryPop() (item interface{}, ok bool)

TryPop will return the next item in the RingBuffer. If there are no items available, TryPop will return immediately with `ok` set to false.
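
The blocking and signaling behavior described for Push, Pop, TryPop, Close, and Reset can be sketched with a |sync.Cond|. This is a simplified model, not the package's implementation: the |queue| type is hypothetical and uses a plain slice instead of a ring. It also assumes that Pop on a closed, empty queue returns |io.EOF|, that Push on a closed queue fails, and that the int returned by Reset is the new epoch; none of these are stated in the documentation above.

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"sync"
)

var (
	errWrongEpoch = errors.New("queue: wrong epoch")
	errClosed     = errors.New("queue: closed") // assumption, see lead-in
)

// queue is a hypothetical slice-backed model of the documented behavior.
type queue struct {
	mu     sync.Mutex
	cond   *sync.Cond
	items  []interface{}
	epoch  int
	closed bool
}

func newQueue() *queue {
	q := &queue{}
	q.cond = sync.NewCond(&q.mu)
	return q
}

// push appends an item and wakes one goroutine waiting in pop.
func (q *queue) push(item interface{}, epoch int) error {
	q.mu.Lock()
	defer q.mu.Unlock()
	if epoch != q.epoch {
		return errWrongEpoch
	}
	if q.closed {
		return errClosed
	}
	q.items = append(q.items, item)
	q.cond.Signal()
	return nil
}

// pop blocks until an item is available or the queue is closed and drained.
func (q *queue) pop() (interface{}, error) {
	q.mu.Lock()
	defer q.mu.Unlock()
	for len(q.items) == 0 {
		if q.closed {
			return nil, io.EOF
		}
		q.cond.Wait()
	}
	item := q.items[0]
	q.items = q.items[1:]
	return item, nil
}

// tryPop never blocks; ok is false when nothing is available.
func (q *queue) tryPop() (item interface{}, ok bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if len(q.items) == 0 {
		return nil, false
	}
	item = q.items[0]
	q.items = q.items[1:]
	return item, true
}

// close rejects further pushes and wakes every goroutine waiting in pop.
func (q *queue) close() {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.closed = true
	q.cond.Broadcast()
}

// reset empties and reopens the queue and returns the new epoch, so pushes
// that still carry an older epoch are rejected.
func (q *queue) reset() int {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.items, q.closed = nil, false
	q.epoch++
	return q.epoch
}

func main() {
	q := newQueue()
	epoch := q.reset()
	go q.push("row", epoch) // wakes the pop below if it is already waiting
	item, err := q.pop()
	fmt.Println(item, err)          // row <nil>
	fmt.Println(q.push("stale", 0)) // queue: wrong epoch
}
```

Under this model, a producer that was started before a reset cannot leak items into the reused buffer, because its pushes carry the previous epoch.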
