pool

package
v0.0.13 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 21, 2023 License: MIT Imports: 6 Imported by: 7

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func WithBandwidth

func WithBandwidth(bandwidth int) opt

WithBandwidth overrides the default bandwidth value

func WithLogger

func WithLogger(logger util.Logger) opt

WithLogger overrides the default logger

func WithOutputChannel

func WithOutputChannel() opt

WithOutputChannel configures the WorkerPool to push results to a channel for external consumption

func WithThrottler

func WithThrottler(tt *Throttler) opt

WithThrottler specifies a throttler for controlling workload

Types

type ErrHandler

type ErrHandler func(err error)

ErrHandler handles an error result from a Runner

type FeedTransformer

type FeedTransformer func(res interface{}) Runner

FeedTransformer transforms a generic input into a Runner

type ResultSet

type ResultSet map[string]interface{}

ResultSet is a set of results accumulated from a group

type Runner

type Runner func(ctx context.Context) (interface{}, error)

Runner is an executable function that runs as a job

type Throttler

type Throttler struct {
	// contains filtered or unexported fields
}

func NewThrottler

func NewThrottler(burst int, duration time.Duration) *Throttler

func (*Throttler) Start

func (t *Throttler) Start(ctx context.Context) error

func (*Throttler) Stop

func (t *Throttler) Stop()

func (*Throttler) WaitForGo

func (t *Throttler) WaitForGo()

type WorkerPool

type WorkerPool struct {
	// contains filtered or unexported fields
}

WorkerPool is a configurable container for running concurrent tasks, both as one-offs and in groups with a receipt signal

func NewWorkerPool

func NewWorkerPool(id string, opts ...opt) *WorkerPool

NewWorkerPool instantiates a worker pool with default options

func (*WorkerPool) FlushAndRestart

func (wp *WorkerPool) FlushAndRestart()

func (*WorkerPool) Insights

func (wp *WorkerPool) Insights() map[string]int

func (*WorkerPool) PushGroup

func (wp *WorkerPool) PushGroup(fns map[string]Runner, wg *sync.WaitGroup)

PushGroup queues a group of Runners for execution, with a receipt signal to be sent to the supplied receiptWg when all Runners are completed

func (*WorkerPool) PushJob

func (wp *WorkerPool) PushJob(fn Runner, wg *sync.WaitGroup)

PushJob queues a one-off job for execution

func (*WorkerPool) Results

func (wp *WorkerPool) Results() <-chan result

Results gives public access to a channel that will receive results as they are processed; requires that the WithOutputChannel() option be passed to the constructor for proper functionality

func (*WorkerPool) SetGroupInputFeed added in v0.0.9

func (wp *WorkerPool) SetGroupInputFeed(feed <-chan result, groupMap map[string]FeedTransformer)

SetGroupInputFeed configures the workerpool to receive jobs from an input channel, with "transformer" methods that convert a generic input interface into a Runner; with the runners executed as a group

func (*WorkerPool) SetInputFeed

func (wp *WorkerPool) SetInputFeed(feed <-chan result, transformers ...FeedTransformer)

SetInputFeed configures the workerpool to receive jobs from an input channel, with "transformer" methods that convert a generic input interface into a Runner

func (*WorkerPool) Start

func (wp *WorkerPool) Start(parentCtx context.Context) error

Start initializes workers and readies the worker pool to receive jobs and groups

func (*WorkerPool) Stop

func (wp *WorkerPool) Stop()

Stop performs a graceful shutdown of all workers

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL