Documentation ¶
Index ¶
- Variables
- func Batch(ctx context.Context, inC <-chan Supplier) <-chan BatchResult
- func BatchWithOpts(ctx context.Context, inC <-chan Supplier, opts BatchOpts) <-chan BatchResult
- func Retry(ctx context.Context, supplier Supplier) (interface{}, error)
- func RetryRandom(ctx context.Context, supplier Supplier) (interface{}, error)
- func RetryWithOpts(ctx context.Context, supplier Supplier, opts RetryOpts) (interface{}, error)
- type BatchOpts
- type BatchResult
- type BatchRunner
- type CreateWorkerEvent
- type CreateWorkerOpts
- type DestroyWorkerEvent
- type ErrCannotRecover
- type ErrMaxAttemptsReached
- type ErrNoOccurrence
- type ErrorWorkerEvent
- type Master
- func (m *Master) Broadcast(ctx context.Context, req interface{}) ([]Response, error)
- func (m *Master) Create(ctx context.Context, key string, value interface{}) error
- func (m *Master) Destroy(ctx context.Context, key string) error
- func (m *Master) Execute(ctx context.Context, req interface{}) (interface{}, error)
- func (m *Master) Exists(ctx context.Context, key string) (bool, error)
- func (m *Master) IsStopped() bool
- func (m *Master) Request(ctx context.Context, key string, req interface{}) (interface{}, error)
- func (m *Master) Start(ctx context.Context) error
- func (m *Master) Stop() error
- type MasterEvent
- type MasterHandler
- type MasterHandlerFunc
- type MasterOpts
- type PoolInput
- type PoolOpts
- type PoolResult
- type PoolRunner
- type RequestWorkerEvent
- type Response
- type Result
- type RetryOpts
- type Supplier
- type SupplierFunc
- type Worker
- type WorkerEvent
- type WorkerHandler
- type WorkerHandlerFunc
- type WorkerOpts
Constants ¶
This section is empty.
Variables ¶
var RandomOpts = RetryOpts{ BaseTimeout: defaultBaseTimeout, BaseExp: defaultBaseExp, MaxRetryTimeout: defaultMaxRetryTimeout, Attempts: defaultAttempts, Random: true, }
RandomOpts is the default configuration for a Retry with random settings
Functions ¶
func Batch ¶
func Batch( ctx context.Context, inC <-chan Supplier, ) <-chan BatchResult
Batch operations provided by the input channel and return the results of those operations on the output channel
func BatchWithOpts ¶
func BatchWithOpts( ctx context.Context, inC <-chan Supplier, opts BatchOpts, ) <-chan BatchResult
BatchWithOpts operations provided by the input channel and return the results of those operations on the output channel. It allows to specify the configuration options on how the batch will be run
func Retry ¶
Retry is the same operation as RetryWithOpts but in this case the default values for RetryOpts are used
func RetryRandom ¶
RetryRandom is the same operation as RetryWithOpts but in this case the default values for RetryOpts are used with random exponential backoff
func RetryWithOpts ¶
RetryWithOpts is an implementation of an exponential back off retry operation for a supplier. It keeps retrying the operation until the maximum number of attempts has been reached, in which case it returns the associated error, or until it succeeds.
Types ¶
type BatchOpts ¶
type BatchOpts struct { // Concurrency specificies the maximum number of goroutines // that will be used to run all the operations in the batch Concurrency int }
BatchOpts are the options to configure how a batch of operations will be executed
type BatchResult ¶
type BatchResult struct { Result // contains filtered or unexported fields }
BatchResult of a batch operation
func BatchSlice ¶
func BatchSlice( ctx context.Context, in []Supplier, ) []BatchResult
BatchSlice runs as a batch a slice of operations and returns an ordered batch with the results
func BatchSliceWithOpts ¶
func BatchSliceWithOpts( ctx context.Context, in []Supplier, opts BatchOpts, ) []BatchResult
BatchSliceWithOpts runs all the operations in the slice as a batch and equally returns a slice with the results
func (BatchResult) Index ¶
func (r BatchResult) Index() int64
Index is the position of the result within the batch of operations
type BatchRunner ¶
type BatchRunner struct {
// contains filtered or unexported fields
}
BatchRunner executes a batch of operations until all of them complete, and returns the results in the order in which the suppliers were provided. This is useful to execute a set of operations as a block, and there's a need to wait for the whole results of the block before moving forward.
func NewBatchRunner ¶
func NewBatchRunner() *BatchRunner
NewBatchRunner creates a new instance of a BatchRunner using the default values as configuration
func NewBatchRunnerWithOpts ¶
func NewBatchRunnerWithOpts(opts BatchOpts) *BatchRunner
NewBatchRunnerWithOpts creates a new instance of a BatchRunner with the specified options
func (*BatchRunner) Run ¶
func (r *BatchRunner) Run( ctx context.Context, inC <-chan Supplier, ) <-chan BatchResult
Run runs all the operations provided by the input channel and returns the results. The results may be returned in a different order compared to the order in which the inputs are received
type CreateWorkerEvent ¶
type CreateWorkerEvent struct { Key string Value interface{} Opts *CreateWorkerOpts }
CreateWorkerEvent is triggered by a master when a new worker is created and available to be sent events to
func (CreateWorkerEvent) WorkerKey ¶
func (e CreateWorkerEvent) WorkerKey() string
WorkerKey implementation of MasterEvent for CreateWorkerEvent
type CreateWorkerOpts ¶
type CreateWorkerOpts struct { // WorkerHandler is the handler used by the worker to handle // incoming requests WorkerHandler WorkerHandler // ErrC is an error channel the worker can listen to and report errors // though events in case they happen ErrC <-chan error // UserData is data that the user can attach to the worker in case any // external context is required UserData interface{} // MaxInactivity is the maximum time the worker is allowed to exist // without serving any request. When this time expires the worker // should destroy itself MaxInactivity time.Duration }
CreateWorkerOpts is the place where a user defined MasterHandler can put the defined properties for a Worker on a CreateWorkerEvent
type DestroyWorkerEvent ¶
DestroyWorkerEvent is triggered by a master when an existing worker is destroyed
func (DestroyWorkerEvent) WorkerKey ¶
func (e DestroyWorkerEvent) WorkerKey() string
WorkerKey implementation of MasterEvent for DestroyWorkerEvent
type ErrCannotRecover ¶
type ErrCannotRecover struct {
Cause error
}
ErrCannotRecover is an error that can be passed by clients to retry mechanisms so that the attempted action is not retried
func (ErrCannotRecover) Error ¶
func (e ErrCannotRecover) Error() string
Error implementation of error for ErrCannotRecover
type ErrMaxAttemptsReached ¶
type ErrMaxAttemptsReached struct {
Causes []error
}
ErrMaxAttemptsReached is an error that is returned after attempting an action multiple times with failures
func (ErrMaxAttemptsReached) Error ¶
func (e ErrMaxAttemptsReached) Error() string
Error implementation of error for ErrCannotRecover
type ErrNoOccurrence ¶
type ErrNoOccurrence struct{}
ErrNoOccurrence is returned failing to wait for an event that never occurred
func (ErrNoOccurrence) Error ¶
func (e ErrNoOccurrence) Error() string
type ErrorWorkerEvent ¶
ErrorWorkerEvent is emitted by the worker when an event on the error channel is received
func (ErrorWorkerEvent) GetWorker ¶
func (e ErrorWorkerEvent) GetWorker() *Worker
GetWorker implementation of WorkerEvent for ErrorWorkerEvent
type Master ¶
type Master struct { // Error is set in case of exiting with an error Error error // contains filtered or unexported fields }
Master manages a set of workers and distributes workers amongst them. It also keeps track of the workers lifetimes
func (*Master) Broadcast ¶
Broadcast sends the same request to all workers and waits until a response from each is received
func (*Master) Execute ¶
Execute sends a request that will be caught by any worker which is available and execute it
type MasterEvent ¶
type MasterEvent interface {
WorkerKey() string
}
MasterEvent is the interface implemented by all events triggered by the master and handled for a MasterHandler
type MasterHandler ¶
type MasterHandler interface {
Handle(ctx context.Context, ev MasterEvent) error
}
MasterHandler is the user defined handler to handle events for the master
type MasterHandlerFunc ¶
type MasterHandlerFunc func(ctx context.Context, ev MasterEvent) error
MasterHandlerFunc is the implementation of MasterHandler for functions
func (MasterHandlerFunc) Handle ¶
func (f MasterHandlerFunc) Handle(ctx context.Context, ev MasterEvent) error
Handle implementation of MasterHandler for MasterHandlerFunc
type MasterOpts ¶
type MasterOpts struct { // MasterHandler is the handler the master will use to provide access // to the master events MasterHandler MasterHandler // CreateWorkerOnRequest creates a worker if a request is received by // a worker and the worker was not created beforehand. Should only be // used if a worker does not need a specific request passed on to the // CreateWorkerEvent handler CreateWorkerOnRequest bool }
MasterOpts are the properties used by the master to define its behaviour and that of its workers
type PoolInput ¶
type PoolInput struct { OutC chan<- PoolResult Supplier Supplier }
poolInput to a batch operation
type PoolOpts ¶
type PoolOpts struct { // Concurrency is the number of Suppliers that the pool can // run in parallel at most Concurrency int }
PoolOpts is the optsuration for a PoolRunner
type PoolRunner ¶
type PoolRunner struct {
// contains filtered or unexported fields
}
PoolRunner has a fixed number of goroutines that are used to run an arbitrary number of tasks. A PoolRunner is a convenient way to execute multiple operations in parallel having control on how many go routines are run in the system.
func NewPoolRunner ¶
func NewPoolRunner(ctx context.Context) *PoolRunner
NewPoolRunner creates and starts a new PoolRunner with the default optsuration parameters
func NewPoolRunnerWithOpts ¶
func NewPoolRunnerWithOpts(ctx context.Context, opts PoolOpts) *PoolRunner
NewPoolRunnerWithOpts creates a new PoolRunner with the specified optsuration
func (*PoolRunner) Run ¶
func (r *PoolRunner) Run(input PoolInput) error
func (*PoolRunner) Stop ¶
func (r *PoolRunner) Stop()
Stop orderly stops all the goroutines in the PoolRunner and returns once all the goroutines have exited
type RequestWorkerEvent ¶
type RequestWorkerEvent struct { Worker *Worker Value interface{} }
RequestWorkerEvent is emitted by the worker when a request is received by the worker
func (RequestWorkerEvent) GetWorker ¶
func (e RequestWorkerEvent) GetWorker() *Worker
GetWorker implementation of WorkerEvent for ErrorWorkerEvent
type RetryOpts ¶
type RetryOpts struct { // Random sets the retry to wait a random time based on the // exponential back off Random bool // UnlimitedAttempts when set to true, Attempts will be ignored // and the action will be retried until it succeeds or the context // stops UnlimitedAttempts bool // Attempts is the maximum number of attempts allowed by a // Retry operation Attempts uint8 // BaseExp is the base exponent for the calculation of the next // time an attempt must be triggered using exponential backoff BaseExp uint8 // BaseTimeout is the initial timeout used after the first // attempt fails BaseTimeout time.Duration // MaxRetryTimeout sets an upper bound into the time that // the retry will wait until attempting an operation again. MaxRetryTimeout time.Duration }
RetryOpts is the configuration parameters for the Retry concurrent utility. Look at RetryWithOpts for more information
type Supplier ¶
type Supplier interface {
Supply() (interface{}, error)
}
Supplier defines an arbitrary operation
type SupplierFunc ¶
type SupplierFunc func() (interface{}, error)
SupplierFunc allows a function to act as a Supplier
func (SupplierFunc) Supply ¶
func (f SupplierFunc) Supply() (interface{}, error)
Implementation of Supplier interface for SupplierFunc.
type Worker ¶
type Worker struct { // for requests met by the worker who is available SharedC <-chan executeRequest // C is the channel the worker only reads from C chan workerRequest // ShutdownC is a channel used by the worker to signal that it // has been completely shutdown and removed ShutdownC chan error // ErrC is an error channel the worker can listen to and report errors // though events in case they happen ErrC <-chan error // UserData is data that the user can attach to the worker in case any // external context is required UserData interface{} // contains filtered or unexported fields }
Worker handles requests issued by the master in a separate goroutine and gives back results. Its lifetime is managed by the Master
type WorkerEvent ¶
type WorkerEvent interface {
GetWorker() *Worker
}
WorkerEvent is the interface defined for events that the worker emits
type WorkerHandler ¶
type WorkerHandler interface {
Handle(ctx context.Context, req WorkerEvent) (interface{}, error)
}
WorkerHandler is the user defined handler to handle events targeting a worker
type WorkerHandlerFunc ¶
type WorkerHandlerFunc func(ctx context.Context, ev WorkerEvent) (interface{}, error)
WorkerHandlerFunc is the implementation of MasterHandler for functions
func (WorkerHandlerFunc) Handle ¶
func (f WorkerHandlerFunc) Handle(ctx context.Context, ev WorkerEvent) (interface{}, error)
Handle implementation of WorkerHandler for WorkerHandlerFunc
type WorkerOpts ¶
type WorkerOpts struct { // Key that uniquely identifies the worker Key string // DoneC is a write once channel the worker uses to notify the master // that the worker has exited DoneC chan<- workerDestroyed // WorkerHandler is the handler used by the worker to handle // incoming requests WorkerHandler WorkerHandler // C is the channel the worker gets requests from C chan workerRequest // ErrC is an error channel the worker can listen to and report errors // though events in case they happen ErrC <-chan error // for requests met by the worker who is available SharedC <-chan executeRequest // UserData is data that the user can attach to the worker in case any // external context is required UserData interface{} // MaxInactivity is the maximum time the worker is allowed to exist // without serving any request. When this time expires the worker // should destroy itself MaxInactivity time.Duration }
WorkerOpts are the properties used to construct a worker instance