concurrent

package
v0.0.0-...-2bb2ce4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 29, 2022 License: MIT Imports: 8 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var RandomOpts = RetryOpts{
	BaseTimeout:     defaultBaseTimeout,
	BaseExp:         defaultBaseExp,
	MaxRetryTimeout: defaultMaxRetryTimeout,
	Attempts:        defaultAttempts,
	Random:          true,
}

RandomOpts is the default configuration for a Retry with random settings

Functions

func Batch

func Batch(
	ctx context.Context,
	inC <-chan Supplier,
) <-chan BatchResult

Batch runs the operations provided by the input channel and returns the results of those operations on the output channel

func BatchWithOpts

func BatchWithOpts(
	ctx context.Context,
	inC <-chan Supplier,
	opts BatchOpts,
) <-chan BatchResult

BatchWithOpts runs the operations provided by the input channel and returns the results of those operations on the output channel. It allows specifying the configuration options that control how the batch is run

func Retry

func Retry(ctx context.Context, supplier Supplier) (interface{}, error)

Retry is the same operation as RetryWithOpts but in this case the default values for RetryOpts are used

func RetryRandom

func RetryRandom(ctx context.Context, supplier Supplier) (interface{}, error)

RetryRandom is the same operation as RetryWithOpts but in this case the default values for RetryOpts are used with random exponential backoff

func RetryWithOpts

func RetryWithOpts(
	ctx context.Context,
	supplier Supplier,
	opts RetryOpts,
) (interface{}, error)

RetryWithOpts is an implementation of an exponential back off retry operation for a supplier. It keeps retrying the operation until the maximum number of attempts has been reached, in which case it returns the associated error, or until it succeeds.

Types

type BatchOpts

type BatchOpts struct {
	// Concurrency specifies the maximum number of goroutines
	// that will be used to run all the operations in the batch
	Concurrency int
}

BatchOpts are the options to configure how a batch of operations will be executed

type BatchResult

type BatchResult struct {
	Result
	// contains filtered or unexported fields
}

BatchResult of a batch operation

func BatchSlice

func BatchSlice(
	ctx context.Context,
	in []Supplier,
) []BatchResult

BatchSlice runs a slice of operations as a batch and returns an ordered batch with the results

func BatchSliceWithOpts

func BatchSliceWithOpts(
	ctx context.Context,
	in []Supplier,
	opts BatchOpts,
) []BatchResult

BatchSliceWithOpts runs all the operations in the slice as a batch and equally returns a slice with the results

func (BatchResult) Index

func (r BatchResult) Index() int64

Index is the position of the result within the batch of operations

type BatchRunner

type BatchRunner struct {
	// contains filtered or unexported fields
}

BatchRunner executes a batch of operations until all of them complete, and returns the results in the order in which the suppliers were provided. This is useful when a set of operations must be executed as a block and all the results of the block are needed before moving forward.

func NewBatchRunner

func NewBatchRunner() *BatchRunner

NewBatchRunner creates a new instance of a BatchRunner using the default values as configuration

func NewBatchRunnerWithOpts

func NewBatchRunnerWithOpts(opts BatchOpts) *BatchRunner

NewBatchRunnerWithOpts creates a new instance of a BatchRunner with the specified options

func (*BatchRunner) Run

func (r *BatchRunner) Run(
	ctx context.Context,
	inC <-chan Supplier,
) <-chan BatchResult

Run runs all the operations provided by the input channel and returns the results. The results may be returned in a different order compared to the order in which the inputs are received

type CreateWorkerEvent

type CreateWorkerEvent struct {
	Key   string
	Value interface{}
	Opts  *CreateWorkerOpts
}

CreateWorkerEvent is triggered by a master when a new worker is created and is available to receive events

func (CreateWorkerEvent) WorkerKey

func (e CreateWorkerEvent) WorkerKey() string

WorkerKey implementation of MasterEvent for CreateWorkerEvent

type CreateWorkerOpts

type CreateWorkerOpts struct {
	// WorkerHandler is the handler used by the worker to handle
	// incoming requests
	WorkerHandler WorkerHandler

	// ErrC is an error channel the worker can listen to and report errors
	// through events in case they happen
	ErrC <-chan error

	// UserData is data that the user can attach to the worker in case any
	// external context is required
	UserData interface{}

	// MaxInactivity is the maximum time the worker is allowed to exist
	// without serving any request. When this time expires the worker
	// should destroy itself
	MaxInactivity time.Duration
}

CreateWorkerOpts is the place where a user defined MasterHandler can put the defined properties for a Worker on a CreateWorkerEvent

type DestroyWorkerEvent

type DestroyWorkerEvent struct {
	Worker *Worker
	Key    string
}

DestroyWorkerEvent is triggered by a master when an existing worker is destroyed

func (DestroyWorkerEvent) WorkerKey

func (e DestroyWorkerEvent) WorkerKey() string

WorkerKey implementation of MasterEvent for DestroyWorkerEvent

type ErrCannotRecover

type ErrCannotRecover struct {
	Cause error
}

ErrCannotRecover is an error that can be passed by clients to retry mechanisms so that the attempted action is not retried

func (ErrCannotRecover) Error

func (e ErrCannotRecover) Error() string

Error implementation of error for ErrCannotRecover

type ErrMaxAttemptsReached

type ErrMaxAttemptsReached struct {
	Causes []error
}

ErrMaxAttemptsReached is an error that is returned after attempting an action multiple times with failures

func (ErrMaxAttemptsReached) Error

func (e ErrMaxAttemptsReached) Error() string

Error implementation of error for ErrMaxAttemptsReached

type ErrNoOccurrence

type ErrNoOccurrence struct{}

ErrNoOccurrence is returned when waiting for an event that never occurred

func (ErrNoOccurrence) Error

func (e ErrNoOccurrence) Error() string

type ErrorWorkerEvent

type ErrorWorkerEvent struct {
	Worker *Worker
	Error  error
}

ErrorWorkerEvent is emitted by the worker when an event on the error channel is received

func (ErrorWorkerEvent) GetWorker

func (e ErrorWorkerEvent) GetWorker() *Worker

GetWorker implementation of WorkerEvent for ErrorWorkerEvent

type Master

type Master struct {

	// Error is set in case of exiting with an error
	Error error
	// contains filtered or unexported fields
}

Master manages a set of workers and distributes work amongst them. It also keeps track of the workers' lifetimes

func NewMaster

func NewMaster(opts MasterOpts) *Master

NewMaster creates a new master

func (*Master) Broadcast

func (m *Master) Broadcast(ctx context.Context, req interface{}) ([]Response, error)

Broadcast sends the same request to all workers and waits until a response from each is received

func (*Master) Create

func (m *Master) Create(ctx context.Context, key string, value interface{}) error

Create a new worker

func (*Master) Destroy

func (m *Master) Destroy(ctx context.Context, key string) error

Destroy an existing worker

func (*Master) Execute

func (m *Master) Execute(ctx context.Context, req interface{}) (interface{}, error)

Execute sends a request that will be picked up and executed by any worker that is available

func (*Master) Exists

func (m *Master) Exists(ctx context.Context, key string) (bool, error)

Exists returns true if the worker exists, false otherwise

func (*Master) IsStopped

func (m *Master) IsStopped() bool

IsStopped returns true if the master is not running

func (*Master) Request

func (m *Master) Request(ctx context.Context, key string, req interface{}) (interface{}, error)

Request sends a request to a specific worker and returns back the response

func (*Master) Start

func (m *Master) Start(ctx context.Context) error

Start the master

func (*Master) Stop

func (m *Master) Stop() error

Stop the master and shutdown all the workers that are still running. This method blocks until all the workers have exited

type MasterEvent

type MasterEvent interface {
	WorkerKey() string
}

MasterEvent is the interface implemented by all events triggered by the master and handled by a MasterHandler

type MasterHandler

type MasterHandler interface {
	Handle(ctx context.Context, ev MasterEvent) error
}

MasterHandler is the user defined handler to handle events for the master

type MasterHandlerFunc

type MasterHandlerFunc func(ctx context.Context, ev MasterEvent) error

MasterHandlerFunc is the implementation of MasterHandler for functions

func (MasterHandlerFunc) Handle

Handle implementation of MasterHandler for MasterHandlerFunc

type MasterOpts

type MasterOpts struct {
	// MasterHandler is the handler the master will use to provide access
	// to the master events
	MasterHandler MasterHandler

	// CreateWorkerOnRequest creates a worker when a request is addressed to
	// a worker that was not created beforehand. Should only be
	// used if a worker does not need a specific request passed on to the
	// CreateWorkerEvent handler
	CreateWorkerOnRequest bool
}

MasterOpts are the properties used by the master to define its behaviour and that of its workers

type PoolInput

type PoolInput struct {
	OutC     chan<- PoolResult
	Supplier Supplier
}

PoolInput is the input to a batch operation

type PoolOpts

type PoolOpts struct {
	// Concurrency is the number of Suppliers that the pool can
	// run in parallel at most
	Concurrency int
}

PoolOpts is the configuration for a PoolRunner

type PoolResult

type PoolResult struct {
	Result
}

PoolResult of a batch operation

type PoolRunner

type PoolRunner struct {
	// contains filtered or unexported fields
}

PoolRunner has a fixed number of goroutines that are used to run an arbitrary number of tasks. A PoolRunner is a convenient way to execute multiple operations in parallel while keeping control over how many goroutines are run in the system.

func NewPoolRunner

func NewPoolRunner(ctx context.Context) *PoolRunner

NewPoolRunner creates and starts a new PoolRunner with the default configuration parameters

func NewPoolRunnerWithOpts

func NewPoolRunnerWithOpts(ctx context.Context, opts PoolOpts) *PoolRunner

NewPoolRunnerWithOpts creates a new PoolRunner with the specified configuration

func (*PoolRunner) Run

func (r *PoolRunner) Run(input PoolInput) error

func (*PoolRunner) Stop

func (r *PoolRunner) Stop()

Stop orderly stops all the goroutines in the PoolRunner and returns once all the goroutines have exited

type RequestWorkerEvent

type RequestWorkerEvent struct {
	Worker *Worker
	Value  interface{}
}

RequestWorkerEvent is emitted by the worker when a request is received by the worker

func (RequestWorkerEvent) GetWorker

func (e RequestWorkerEvent) GetWorker() *Worker

GetWorker implementation of WorkerEvent for RequestWorkerEvent

type Response

type Response struct {
	Value interface{}
	Key   string
	Error error
}

type Result

type Result struct {
	// contains filtered or unexported fields
}

Result of a supplier

func (Result) Err

func (r Result) Err() error

Err returned by the supplier

func (Result) Value

func (r Result) Value() interface{}

Value returned by the supplier

type RetryOpts

type RetryOpts struct {
	// Random sets the retry to wait a random time based on the
	// exponential back off
	Random bool

	// UnlimitedAttempts when set to true, Attempts will be ignored
	// and the action will be retried until it succeeds or the context
	// stops
	UnlimitedAttempts bool

	// Attempts is the maximum number of attempts allowed by a
	// Retry operation
	Attempts uint8

	// BaseExp is the base exponent for the calculation of the next
	// time an attempt must be triggered using exponential backoff
	BaseExp uint8

	// BaseTimeout is the initial timeout used after the first
	// attempt fails
	BaseTimeout time.Duration

	// MaxRetryTimeout sets an upper bound on the time that
	// the retry will wait until attempting an operation again.
	MaxRetryTimeout time.Duration
}

RetryOpts are the configuration parameters for the Retry concurrent utility. Look at RetryWithOpts for more information

type Supplier

type Supplier interface {
	Supply() (interface{}, error)
}

Supplier defines an arbitrary operation

type SupplierFunc

type SupplierFunc func() (interface{}, error)

SupplierFunc allows a function to act as a Supplier

func (SupplierFunc) Supply

func (f SupplierFunc) Supply() (interface{}, error)

Implementation of Supplier interface for SupplierFunc.

type Worker

type Worker struct {

	// SharedC is the shared channel between the master and the worker
	// for requests met by the worker who is available
	SharedC <-chan executeRequest

	// C is the channel the worker only reads from
	C chan workerRequest

	// ShutdownC is a channel used by the worker to signal that it
	// has been completely shutdown and removed
	ShutdownC chan error

	// ErrC is an error channel the worker can listen to and report errors
	// through events in case they happen
	ErrC <-chan error

	// UserData is data that the user can attach to the worker in case any
	// external context is required
	UserData interface{}
	// contains filtered or unexported fields
}

Worker handles requests issued by the master in a separate goroutine and gives back results. Its lifetime is managed by the Master

func NewWorker

func NewWorker(ctx context.Context, opts WorkerOpts) *Worker

NewWorker creates a new worker instance

type WorkerEvent

type WorkerEvent interface {
	GetWorker() *Worker
}

WorkerEvent is the interface defined for events that the worker emits

type WorkerHandler

type WorkerHandler interface {
	Handle(ctx context.Context, req WorkerEvent) (interface{}, error)
}

WorkerHandler is the user defined handler to handle events targeting a worker

type WorkerHandlerFunc

type WorkerHandlerFunc func(ctx context.Context, ev WorkerEvent) (interface{}, error)

WorkerHandlerFunc is the implementation of WorkerHandler for functions

func (WorkerHandlerFunc) Handle

func (f WorkerHandlerFunc) Handle(ctx context.Context, ev WorkerEvent) (interface{}, error)

Handle implementation of WorkerHandler for WorkerHandlerFunc

type WorkerOpts

type WorkerOpts struct {
	// Key that uniquely identifies the worker
	Key string

	// DoneC is a write once channel the worker uses to notify the master
	// that the worker has exited
	DoneC chan<- workerDestroyed

	// WorkerHandler is the handler used by the worker to handle
	// incoming requests
	WorkerHandler WorkerHandler

	// C is the channel the worker gets requests from
	C chan workerRequest

	// ErrC is an error channel the worker can listen to and report errors
	// through events in case they happen
	ErrC <-chan error

	// SharedC is the shared channel between the master and the worker
	// for requests met by the worker who is available
	SharedC <-chan executeRequest

	// UserData is data that the user can attach to the worker in case any
	// external context is required
	UserData interface{}

	// MaxInactivity is the maximum time the worker is allowed to exist
	// without serving any request. When this time expires the worker
	// should destroy itself
	MaxInactivity time.Duration
}

WorkerOpts are the properties used to construct a worker instance

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL