concurrency

package
v0.0.0-...-04f6dc1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 16, 2017 License: MIT Imports: 3 Imported by: 0

Documentation

Overview

Package concurrency provides common concurrency patterns and utilities.

Index

Constants

This section is empty.

Variables

View Source
var Stopped = errors.New("stopped")

Stopped is a special error value is signals that the runnable is stopped

Functions

func NewRetryableError

func NewRetryableError(err error) error

NewRetryableError is a convenience to wrap another error in a retryable

Types

type AsyncRunnable

type AsyncRunnable interface {
	Runnable
	StopWait()
}

AsyncRunnable is a runnable which is can run asynchrounously

type Retryable

type Retryable interface {
	Retryable() bool
}

Retryable is an interface which describes whether something is retryable

type RetryableError

type RetryableError struct {
	Err error
}

RetryableError is an error which is retryable

func (RetryableError) Error

func (e RetryableError) Error() string

func (RetryableError) Retryable

func (e RetryableError) Retryable() bool

Retryable is true

type Runnable

type Runnable interface {
	Start() error
	Stop()
}

Runnable describes something which can start and stop.

type WorkFunc

type WorkFunc func() error

WorkFunc is the worker function

func (WorkFunc) Work

func (f WorkFunc) Work() error

Work adapts WorkFunc to the Worker interface.

type Worker

type Worker interface {
	Work() error
}

Worker is anything that can work

type WorkerPool

type WorkerPool struct {
	// contains filtered or unexported fields
}

WorkerPool is an implementation of a start/stoppable worker pool.

In this implementation, jobs are essentially tokens to perform some work. Jobs are not delivered to the pool, but instead 'claimed' by a worker, and 'returned' when finished (technically, a new job is posted to the queue). In this case, the worker itself is the one which determines the work it should do. This gives a lot of flexibility when combined with implementations of Worker, and maintains a generic but type safe implementation.

It uses a number of channels to control the concurrency: - jobs is a buffered channel that signals that a worker should process a job - results signals that a result was computed by work() - errors collects any errors from work(). An error on the channel will stop the ingester - done is used to signal when the ingester has totally stopped (i.e. all workers drained)

func NewWorkerPool

func NewWorkerPool(logger *zap.Logger, numWorkers int, w Worker) *WorkerPool

NewWorkerPool creates a new worker-pool

func (*WorkerPool) Start

func (s *WorkerPool) Start() error

Start makes the ingester-pool start to process messages.

It continually loops and looks for either a result, in which case it adds another job to the pool to be processed, or an error, in which case it stops the ingester-pool, waits for the workers to drain, then signals that it is done.

func (*WorkerPool) Stop

func (s *WorkerPool) Stop()

Stop signals the ingester-pool to stop processing new messages. Use StopWait to wait until all messages are processed

func (*WorkerPool) StopWait

func (s *WorkerPool) StopWait()

StopWait starts the process of stopping, and waits for all workers to stop before returning.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL