com

package
v0.0.0-...-f0e227c Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 5, 2024 License: MIT Imports: 7 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Bulk

func Bulk[T any](
	ctx context.Context, ch <-chan T, count int, splitPolicyFactory BulkChunkSplitPolicyFactory[T],
) <-chan []T

Bulk reads all values from a channel and streams them in chunks into a returned channel.

func CopyFirst

func CopyFirst[T any](ctx context.Context, input <-chan T) (T, <-chan T, error)

CopyFirst asynchronously forwards all items from input to forward and synchronously returns the first item.

func ErrgroupReceive

func ErrgroupReceive(ctx context.Context, g *errgroup.Group, err <-chan error)

ErrgroupReceive adds a goroutine to the specified group that returns the first non-nil error (if any) from the specified channel. If the channel is closed, it will return nil.

func WaitAsync

func WaitAsync(ctx context.Context, w Waiter) <-chan error

WaitAsync calls Wait() on the passed Waiter in a new goroutine and sends the first non-nil error (if any) to the returned channel. The returned channel is always closed when the Waiter is done.

Types

type Atomic

type Atomic[T any] struct {
	// contains filtered or unexported fields
}

Atomic is a type-safe wrapper around atomic.Value.

func (*Atomic[T]) CompareAndSwap

func (a *Atomic[T]) CompareAndSwap(old, new T) (swapped bool)

func (*Atomic[T]) Load

func (a *Atomic[T]) Load() (_ T, ok bool)

func (*Atomic[T]) Store

func (a *Atomic[T]) Store(v T)

func (*Atomic[T]) Swap

func (a *Atomic[T]) Swap(new T) (old T, ok bool)

type BulkChunkSplitPolicy

type BulkChunkSplitPolicy[T any] func(T) bool

BulkChunkSplitPolicy is a state machine which tracks the items of a chunk a bulker assembles. A call takes an item for the current chunk into account. Output true indicates that the state machine was reset first and the bulker shall finish the current chunk now (not e.g. once $size is reached) without the given item.

func NeverSplit

func NeverSplit[T any]() BulkChunkSplitPolicy[T]

NeverSplit returns a pseudo state machine which never demands splitting.

type BulkChunkSplitPolicyFactory

type BulkChunkSplitPolicyFactory[T any] func() BulkChunkSplitPolicy[T]

type Bulker

type Bulker[T any] struct {
	// contains filtered or unexported fields
}

Bulker reads all values from a channel and streams them in chunks into a Bulk channel.

func NewBulker

func NewBulker[T any](
	ctx context.Context, ch <-chan T, count int, splitPolicyFactory BulkChunkSplitPolicyFactory[T],
) *Bulker[T]

NewBulker returns a new Bulker and starts streaming.

func (*Bulker[T]) Bulk

func (b *Bulker[T]) Bulk() <-chan []T

Bulk returns the channel on which the bulks are delivered.

type Cond

type Cond struct {
	// contains filtered or unexported fields
}

Cond implements a channel-based synchronization for goroutines that wait for signals or send them. Internally based on a controller loop that handles the synchronization of new listeners and signal propagation, which is only started when NewCond is called. Thus the zero value cannot be used.

func NewCond

func NewCond(ctx context.Context) *Cond

NewCond returns a new Cond and starts the controller loop.

func (*Cond) Broadcast

func (c *Cond) Broadcast()

Broadcast sends a signal to all current listeners by closing the previously returned channel from Wait. Panics if the controller loop has already ended.

func (*Cond) Close

func (c *Cond) Close() error

Close stops the controller loop, waits for it to finish, and returns an error if any. Implements the io.Closer interface.

func (*Cond) Done

func (c *Cond) Done() <-chan struct{}

Done returns a channel that will be closed when the controller loop has ended.

func (*Cond) Wait

func (c *Cond) Wait() <-chan struct{}

Wait returns a channel that is closed with the next signal. Panics if the controller loop has already ended.

type Counter

type Counter struct {
	// contains filtered or unexported fields
}

Counter implements an atomic counter.

func (*Counter) Add

func (c *Counter) Add(delta uint64)

Add adds the given delta to the counter.

func (*Counter) Inc

func (c *Counter) Inc()

Inc increments the counter by one.

func (*Counter) Reset

func (c *Counter) Reset() uint64

Reset resets the counter to 0 and returns its previous value. Does not reset the total value returned from Total.

func (*Counter) Total

func (c *Counter) Total() uint64

Total returns the total counter value.

func (*Counter) Val

func (c *Counter) Val() uint64

Val returns the current counter value.

type Waiter

type Waiter interface {
	Wait() error // Wait waits for execution to complete.
}

Waiter implements the Wait method, which blocks until execution is complete.

type WaiterFunc

type WaiterFunc func() error

The WaiterFunc type is an adapter to allow the use of ordinary functions as Waiter. If f is a function with the appropriate signature, WaiterFunc(f) is a Waiter that calls f.

func (WaiterFunc) Wait

func (f WaiterFunc) Wait() error

Wait implements the Waiter interface.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL