com

package
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 13, 2024 License: AGPL-3.0 Imports: 5 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Bulk

func Bulk[T any](
	ctx context.Context, ch <-chan T, count int, splitPolicyFactory BulkChunkSplitPolicyFactory[T],
) <-chan []T

Bulk reads all values from a channel and streams them in chunks into a returned channel.

func CopyFirst

func CopyFirst[T any](
	ctx context.Context, input <-chan T,
) (first T, forward <-chan T, err error)

CopyFirst asynchronously forwards all items from input to forward and synchronously returns the first item.

func ErrgroupReceive

func ErrgroupReceive(g *errgroup.Group, err <-chan error)

ErrgroupReceive adds a goroutine to the specified group that returns the first non-nil error (if any) from the specified channel. If the channel is closed, it will return nil.

func WaitAsync

func WaitAsync(w Waiter) <-chan error

WaitAsync calls Wait() on the passed Waiter in a new goroutine and sends the first non-nil error (if any) to the returned channel. The returned channel is always closed when the Waiter is done.

Types

type BulkChunkSplitPolicy

type BulkChunkSplitPolicy[T any] func(T) bool

BulkChunkSplitPolicy is a state machine which tracks the items of a chunk a bulker assembles. A call takes an item for the current chunk into account. Output true indicates that the state machine was reset first and the bulker shall finish the current chunk now (not e.g. once $size is reached) without the given item.

func NeverSplit

func NeverSplit[T any]() BulkChunkSplitPolicy[T]

NeverSplit returns a pseudo state machine which never demands splitting.

type BulkChunkSplitPolicyFactory

type BulkChunkSplitPolicyFactory[T any] func() BulkChunkSplitPolicy[T]

type Bulker

type Bulker[T any] struct {
	// contains filtered or unexported fields
}

Bulker reads all values from a channel and streams them in chunks into a Bulk channel.

func NewBulker

func NewBulker[T any](
	ctx context.Context, ch <-chan T, count int, splitPolicyFactory BulkChunkSplitPolicyFactory[T],
) *Bulker[T]

NewBulker returns a new Bulker and starts streaming.

func (*Bulker[T]) Bulk

func (b *Bulker[T]) Bulk() <-chan []T

Bulk returns the channel on which the bulks are delivered.

type Counter

type Counter struct {
	// contains filtered or unexported fields
}

Counter implements an atomic counter.

func (*Counter) Add

func (c *Counter) Add(delta uint64)

Add adds the given delta to the counter.

func (*Counter) Inc

func (c *Counter) Inc()

Inc increments the counter by one.

func (*Counter) Reset

func (c *Counter) Reset() uint64

Reset resets the counter to 0 and returns its previous value. Does not reset the total value returned from Total.

func (*Counter) Total

func (c *Counter) Total() uint64

Total returns the total counter value.

func (*Counter) Val

func (c *Counter) Val() uint64

Val returns the current counter value.

type ProcessBulk

type ProcessBulk[T any] func(ctx context.Context, bulk []T) (err error)

func ForwardBulk

func ForwardBulk[T any](ch chan<- T) ProcessBulk[T]

type Waiter

type Waiter interface {
	Wait() error // Wait waits for execution to complete.
}

Waiter implements the Wait method, which blocks until execution is complete.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL