executor

package
v0.0.0-...-c8a214a
Warning

This package is not in the latest version of its module.

Published: Oct 25, 2018 License: Apache-2.0 Imports: 14 Imported by: 5

Documentation

Overview

Package executor provides a mechanism for asynchronous execution of tasks, using callbacks to indicate success or failure.

Functions

func ForEachAttemptResult

func ForEachAttemptResult(f func(AttemptResult))

Types

type AttemptResult

type AttemptResult int

AttemptResult represents whether or not a task attempt (e.g. an individual invocation of a Func passed to Exec) succeeded or failed. Failures may be retried depending on the Executor configuration.

const (
	// AttemptSuccess indicates the attempt succeeded.
	AttemptSuccess AttemptResult = iota

	// AttemptTimeout indicates the attempt timed out. It may be
	// retried, depending on Executor configuration.
	AttemptTimeout

	// AttemptGlobalTimeout indicates the attempt timed out
	// because the overall timeout for the task's execution
	// expired.
	AttemptGlobalTimeout

	// AttemptCancellation indicates the task was canceled
	// (typically because another task within an ExecGathered call
	// failed).
	AttemptCancellation

	// AttemptError indicates that the attempt failed because it
	// returned an error.
	AttemptError
)

func (AttemptResult) String

func (r AttemptResult) String() string

String returns a string representation of the AttemptResult.

func (AttemptResult) Valid

func (r AttemptResult) Valid() bool

Valid returns true if the AttemptResult is a valid value.
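
As a rough illustration of how String, Valid, and ForEachAttemptResult could fit together, the sketch below re-declares AttemptResult locally. It is not the package's source: the string labels are hypothetical, and it assumes ForEachAttemptResult calls f once for each declared value, which the documentation above does not state.

```go
package main

import "fmt"

// AttemptResult is a local re-declaration of the documented type, for
// illustration only.
type AttemptResult int

const (
	AttemptSuccess AttemptResult = iota
	AttemptTimeout
	AttemptGlobalTimeout
	AttemptCancellation
	AttemptError
)

// names holds hypothetical labels; the real String output is not documented.
var names = [...]string{"success", "timeout", "global_timeout", "cancellation", "error"}

// Valid reports whether r is one of the declared constants.
func (r AttemptResult) Valid() bool {
	return r >= AttemptSuccess && r <= AttemptError
}

// String returns the label for a valid value and a fallback otherwise.
func (r AttemptResult) String() string {
	if !r.Valid() {
		return fmt.Sprintf("unknown(%d)", int(r))
	}
	return names[r]
}

// ForEachAttemptResult calls f once per declared value, in declaration
// order (assumed behavior).
func ForEachAttemptResult(f func(AttemptResult)) {
	for r := AttemptSuccess; r <= AttemptError; r++ {
		f(r)
	}
}

func main() {
	ForEachAttemptResult(func(r AttemptResult) {
		fmt.Println(int(r), r, r.Valid())
	})
}
```

A helper like this is convenient for pre-registering one statistics counter per result value.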

type CallbackFunc

type CallbackFunc func(Try)

CallbackFunc is invoked at most once to return the result of Func.

type DelayFunc

type DelayFunc func(int) time.Duration

DelayFunc is invoked to compute the delay before the next attempt. The value passed is the number of times the action has previously been attempted. It is always greater than or equal to 1.

func NewConstantDelayFunc

func NewConstantDelayFunc(delay time.Duration) DelayFunc

NewConstantDelayFunc creates a DelayFunc where all retries occur after a fixed delay.
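
To show how a DelayFunc might be consumed, here is a minimal retry loop built around a constant delay. The names newConstantDelayFunc, retry, and failNTimes are hypothetical helpers written for this sketch; the package's own retry logic is not shown in this documentation and may differ.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// DelayFunc is re-declared locally to keep the sketch self-contained.
type DelayFunc func(int) time.Duration

// newConstantDelayFunc returns the same delay regardless of attempt count.
func newConstantDelayFunc(delay time.Duration) DelayFunc {
	return func(int) time.Duration { return delay }
}

// retry calls action up to maxAttempts times. Between attempts it sleeps
// for delay(n), where n is the number of attempts made so far (>= 1).
// It returns the number of attempts made and the last error.
func retry(maxAttempts int, delay DelayFunc, action func() error) (int, error) {
	if maxAttempts < 1 {
		maxAttempts = 1
	}
	var err error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		if err = action(); err == nil {
			return attempt, nil
		}
		if attempt < maxAttempts {
			time.Sleep(delay(attempt))
		}
	}
	return maxAttempts, err
}

// failNTimes returns an action that fails n times and then succeeds.
func failNTimes(n int) func() error {
	calls := 0
	return func() error {
		calls++
		if calls <= n {
			return errors.New("transient failure")
		}
		return nil
	}
}

func main() {
	attempts, err := retry(5, newConstantDelayFunc(10*time.Millisecond), failNTimes(2))
	fmt.Println(attempts, err)
}
```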

func NewExponentialDelayFunc

func NewExponentialDelayFunc(delay time.Duration, maxDelay time.Duration) DelayFunc

NewExponentialDelayFunc creates a new DelayFunc where the first retry occurs after a duration of delay and each subsequent retry is delayed by twice the previous delay. E.g., given a delay of 1s, the delays for retries are 1s, 2s, 4s, 8s, ... The return value is capped at the specified maximum delay. Delays of less than 0 are treated as 0.
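
The doubling-with-cap rule described above can be sketched as follows. This is an assumption-based reimplementation under a hypothetical name (newExponentialDelayFunc), not the package's code; it only mirrors the documented behavior: the first retry waits delay, each later retry doubles it, the result is capped at maxDelay, and negative delays are treated as 0.

```go
package main

import (
	"fmt"
	"time"
)

// DelayFunc is re-declared locally to keep the sketch self-contained.
type DelayFunc func(int) time.Duration

// newExponentialDelayFunc returns delay * 2^(attempts-1), capped at maxDelay.
func newExponentialDelayFunc(delay, maxDelay time.Duration) DelayFunc {
	if delay < 0 {
		delay = 0
	}
	return func(attempts int) time.Duration {
		d := delay
		for i := 1; i < attempts; i++ {
			// Stop before doubling would exceed the cap (also avoids overflow).
			if d > maxDelay/2 {
				return maxDelay
			}
			d *= 2
		}
		if d > maxDelay {
			return maxDelay
		}
		return d
	}
}

func main() {
	delay := newExponentialDelayFunc(time.Second, 5*time.Second)
	for attempts := 1; attempts <= 5; attempts++ {
		fmt.Println(attempts, delay(attempts))
	}
}
```

With a 1s initial delay and a 5s cap, the computed delays are 1s, 2s, 4s, and then 5s for every later retry.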

type DelayType

type DelayType string

DelayType represents an algorithm for computing retry delays.

const (
	// ConstantDelayType specifies a constant delay between
	// retries.
	ConstantDelayType DelayType = "constant"

	// ExponentialDelayType specifies an exponentially increasing
	// delay between retries.
	ExponentialDelayType DelayType = "exponential"
)

type DiagnosticsCallback

type DiagnosticsCallback interface {
	// A task was accepted for execution. The value is the "width"
	// of the task. The width is one for calls to Exec or
	// ExecAndForget, and the number of Funcs passed to ExecMany
	// or ExecGathered.
	TaskStarted(int)

	// A task completed. The duration is the total time taken
	// executing the task, including delays between retries. Each
	// Func passed to ExecMany or ExecGathered will trigger a call
	// to this function.
	TaskCompleted(AttemptResult, time.Duration)

	// An attempt was started. The duration is the delay between
	// the time an attempt was scheduled to start and the time it
	// actually started. The attempt may time out before the
	// actual Func is invoked.
	AttemptStarted(time.Duration)

	// An attempt completed. The duration is the amount of time
	// spent executing the attempt.
	AttemptCompleted(AttemptResult, time.Duration)

	// The amount of time spent executing a task's callback.
	CallbackDuration(time.Duration)
}

DiagnosticsCallback provides information about tasks and attempts within an Executor. Typically, this interface is used to record statistics about the Executor.
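
As an example of recording statistics, the sketch below implements the interface with simple in-memory counters. The interface and a subset of AttemptResult are re-declared locally so the example compiles on its own; countingDiagnostics and its fields are hypothetical names, and a mutex is used on the assumption that callbacks may arrive from several goroutines.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// Local re-declarations, for illustration only.
type AttemptResult int

const (
	AttemptSuccess AttemptResult = iota
	AttemptTimeout
)

type DiagnosticsCallback interface {
	TaskStarted(int)
	TaskCompleted(AttemptResult, time.Duration)
	AttemptStarted(time.Duration)
	AttemptCompleted(AttemptResult, time.Duration)
	CallbackDuration(time.Duration)
}

// countingDiagnostics tallies events per AttemptResult.
type countingDiagnostics struct {
	mu             sync.Mutex
	tasksStarted   int
	tasksCompleted map[AttemptResult]int
	attempts       map[AttemptResult]int
	totalTaskTime  time.Duration
}

// Compile-time check that the interface is satisfied.
var _ DiagnosticsCallback = (*countingDiagnostics)(nil)

func newCountingDiagnostics() *countingDiagnostics {
	return &countingDiagnostics{
		tasksCompleted: map[AttemptResult]int{},
		attempts:       map[AttemptResult]int{},
	}
}

// TaskStarted adds the task's width to the running total.
func (c *countingDiagnostics) TaskStarted(width int) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.tasksStarted += width
}

func (c *countingDiagnostics) TaskCompleted(r AttemptResult, d time.Duration) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.tasksCompleted[r]++
	c.totalTaskTime += d
}

// AttemptStarted and CallbackDuration are ignored in this sketch.
func (c *countingDiagnostics) AttemptStarted(time.Duration) {}

func (c *countingDiagnostics) AttemptCompleted(r AttemptResult, _ time.Duration) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.attempts[r]++
}

func (c *countingDiagnostics) CallbackDuration(time.Duration) {}

func main() {
	c := newCountingDiagnostics()
	c.TaskStarted(1)
	c.AttemptCompleted(AttemptTimeout, 50*time.Millisecond)
	c.AttemptCompleted(AttemptSuccess, 20*time.Millisecond)
	c.TaskCompleted(AttemptSuccess, 170*time.Millisecond)
	fmt.Println(c.tasksStarted, c.attempts, c.tasksCompleted, c.totalTaskTime)
}
```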

func NewLoggingDiagnosticsCallback

func NewLoggingDiagnosticsCallback(logger *log.Logger, period time.Duration) DiagnosticsCallback

NewLoggingDiagnosticsCallback creates an implementation of DiagnosticsCallback that logs diagnostics information periodically.

func NewNoopDiagnosticsCallback

func NewNoopDiagnosticsCallback() DiagnosticsCallback

NewNoopDiagnosticsCallback creates an implementation of DiagnosticsCallback that does nothing.

type Executor

type Executor interface {
	// Invoke the Func, possibly in parallel with other
	// invocations. The function's result is ignored.
	ExecAndForget(Func)

	// Invoke the Func, possibly in parallel with other
	// invocations. Calls back with the result of the call at some
	// future point.
	Exec(Func, CallbackFunc)

	// Invoke the given Funcs, possibly in parallel with other
	// invocations. Calls back with the result of each invocation
	// at some future point. If no Funcs are given, the callback
	// is never invoked.
	ExecMany([]Func, ManyCallbackFunc)

	// Invoke the given Funcs, as in ExecMany. Calls back with a
	// Try containing an []interface{} of the successful results
	// or the first error encountered. If no Funcs are given,
	// the callback is invoked with an empty []interface{}.
	ExecGathered([]Func, CallbackFunc)

	// Stop executor activity and release related resources.
	// In-progress actions will complete their current
	// attempt. Pending actions and retries are dropped and
	// their callbacks are not invoked.
	Stop()

	// Sets the DiagnosticsCallback for this Executor. Must be
	// called before the first invocation of any Exec
	// function. See also the WithDiagnostics Option to
	// NewRetryingExecutor and NewGoroutineExecutor.
	SetDiagnosticsCallback(DiagnosticsCallback)
}

Executor invokes functions asynchronously with callbacks on completion and automatic retries, if configured.
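
The "successful results or the first error" contract of ExecGathered can be illustrated with a small stand-alone function. This sketch is synchronous and returns a plain ([]interface{}, error) pair instead of invoking a CallbackFunc with a Try, so it only approximates the documented semantics; gather, constant, and failing are hypothetical names.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// Func is re-declared locally to keep the sketch self-contained.
type Func func(context.Context) (interface{}, error)

// gather runs every Func in its own goroutine and waits for all of them.
// It returns the results in input order, or the first error observed.
// On the first error the shared context is canceled so that the
// remaining Funcs can stop early.
func gather(fs []Func) ([]interface{}, error) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	results := make([]interface{}, len(fs))
	var (
		wg       sync.WaitGroup
		once     sync.Once
		firstErr error
	)
	for i, f := range fs {
		wg.Add(1)
		go func(i int, f Func) {
			defer wg.Done()
			v, err := f(ctx)
			if err != nil {
				once.Do(func() {
					firstErr = err
					cancel()
				})
				return
			}
			results[i] = v
		}(i, f)
	}
	wg.Wait()
	if firstErr != nil {
		return nil, firstErr
	}
	return results, nil
}

// constant returns a Func that always succeeds with v.
func constant(v interface{}) Func {
	return func(context.Context) (interface{}, error) { return v, nil }
}

// failing returns a Func that always fails with the given message.
func failing(msg string) Func {
	return func(context.Context) (interface{}, error) { return nil, errors.New(msg) }
}

func main() {
	fmt.Println(gather([]Func{constant(1), constant("two")}))
	fmt.Println(gather([]Func{constant(1), failing("boom")}))
}
```

As in the interface comment, an empty input yields an empty result slice rather than an error.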

func NewExecutor

func NewExecutor(options ...Option) Executor

NewExecutor constructs a new Executor. The current default executor uses goroutines. See NewGoroutineExecutor.

func NewGoroutineExecutor

func NewGoroutineExecutor(options ...Option) Executor

NewGoroutineExecutor constructs a new Executor. Each task attempt is executed in a new goroutine, but only a fixed number (the parallelism) are allowed to execute at once. By default, the Executor never retries, has parallelism of 1, and a maximum queue depth of 10.
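
One common way to get "a new goroutine per attempt, but only a fixed number running at once" is a buffered channel used as a semaphore. The sketch below shows that technique in isolation; it is an assumption about how such a limit can be enforced, not a description of this package's internals, and runBounded and sleepTasks are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// runBounded starts one goroutine per task but lets at most parallelism
// of them execute at once. It returns the peak number of tasks that were
// observed running concurrently.
func runBounded(parallelism int, tasks []func()) int32 {
	if parallelism < 1 {
		parallelism = 1
	}
	sem := make(chan struct{}, parallelism) // counting semaphore
	var (
		wg            sync.WaitGroup
		running, peak int32
	)
	for _, task := range tasks {
		wg.Add(1)
		go func(task func()) {
			defer wg.Done()
			sem <- struct{}{}        // acquire a slot
			defer func() { <-sem }() // release it when done

			n := atomic.AddInt32(&running, 1)
			for { // record a new peak if n exceeds it
				p := atomic.LoadInt32(&peak)
				if n <= p || atomic.CompareAndSwapInt32(&peak, p, n) {
					break
				}
			}
			task()
			atomic.AddInt32(&running, -1)
		}(task)
	}
	wg.Wait()
	return atomic.LoadInt32(&peak)
}

// sleepTasks builds n tasks that each sleep for d.
func sleepTasks(n int, d time.Duration) []func() {
	tasks := make([]func(), n)
	for i := range tasks {
		tasks[i] = func() { time.Sleep(d) }
	}
	return tasks
}

func main() {
	peak := runBounded(2, sleepTasks(8, 10*time.Millisecond))
	fmt.Println("peak concurrency:", peak)
}
```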

type FromFlags

type FromFlags interface {
	// Returns the configured Executor. Multiple invocations
	// return the same Executor even if the arguments
	// change. DiagnosticsCallback may be nil.
	Make(*log.Logger) Executor
}

FromFlags validates and constructs an Executor from command line flags.

func NewFromFlags

func NewFromFlags(f tbnflag.FlagSet) FromFlags

NewFromFlags constructs a FromFlags with application-agnostic default flag values. Most callers should use NewFromFlagsWithDefaults.

func NewFromFlagsWithDefaults

func NewFromFlagsWithDefaults(
	f tbnflag.FlagSet,
	defaults FromFlagsDefaults,
) FromFlags

NewFromFlagsWithDefaults constructs a FromFlags with application-provided default flag values.

type FromFlagsDefaults

type FromFlagsDefaults struct {
	DelayType      DelayType
	InitialDelay   time.Duration
	MaxDelay       time.Duration
	MaxAttempts    int
	Parallelism    int
	Timeout        time.Duration
	AttemptTimeout time.Duration
}

FromFlagsDefaults represents default values for Executor flags. Values are ignored if they are the zero value for their type.

func (FromFlagsDefaults) DefaultAttemptTimeout

func (defaults FromFlagsDefaults) DefaultAttemptTimeout() time.Duration

DefaultAttemptTimeout returns the default per-attempt timeout. If not overridden, the default attempt timeout is 0 (attempt timeouts disabled).

func (FromFlagsDefaults) DefaultDelayType

func (defaults FromFlagsDefaults) DefaultDelayType() DelayType

DefaultDelayType returns the default delay type. If not overridden, the default delay type is ExponentialDelayType.

func (FromFlagsDefaults) DefaultInitialDelay

func (defaults FromFlagsDefaults) DefaultInitialDelay() time.Duration

DefaultInitialDelay returns the default initial delay. If not overridden, the default initial delay is 100 milliseconds.

func (FromFlagsDefaults) DefaultMaxAttempts

func (defaults FromFlagsDefaults) DefaultMaxAttempts() int

DefaultMaxAttempts returns the default maximum number of attempts. If not overridden, the default max attempts is 8.

func (FromFlagsDefaults) DefaultMaxDelay

func (defaults FromFlagsDefaults) DefaultMaxDelay() time.Duration

DefaultMaxDelay returns the default maximum delay. If not overridden, the default maximum delay is 30 seconds.

func (FromFlagsDefaults) DefaultParallelism

func (defaults FromFlagsDefaults) DefaultParallelism() int

DefaultParallelism returns the default parallelism. If not overridden, the default parallelism is 2 times the number of system CPU cores.

func (FromFlagsDefaults) DefaultTimeout

func (defaults FromFlagsDefaults) DefaultTimeout() time.Duration

DefaultTimeout returns the default global timeout. If not overridden, the default timeout is 0 (timeouts disabled).
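
The "zero value means not overridden" rule used by these accessors can be sketched with a reduced, locally declared struct. The type and method names below are hypothetical; the fallback values (100 milliseconds, 30 seconds, 8 attempts, twice the CPU count) are the ones stated in the descriptions above.

```go
package main

import (
	"fmt"
	"runtime"
	"time"
)

// defaults is a reduced stand-in for FromFlagsDefaults.
type defaults struct {
	InitialDelay time.Duration
	MaxDelay     time.Duration
	MaxAttempts  int
	Parallelism  int
}

// Each accessor returns the override if set, otherwise the documented default.
func (d defaults) initialDelay() time.Duration {
	if d.InitialDelay == 0 {
		return 100 * time.Millisecond
	}
	return d.InitialDelay
}

func (d defaults) maxDelay() time.Duration {
	if d.MaxDelay == 0 {
		return 30 * time.Second
	}
	return d.MaxDelay
}

func (d defaults) maxAttempts() int {
	if d.MaxAttempts == 0 {
		return 8
	}
	return d.MaxAttempts
}

func (d defaults) parallelism() int {
	if d.Parallelism == 0 {
		return 2 * runtime.NumCPU()
	}
	return d.Parallelism
}

func main() {
	d := defaults{MaxAttempts: 3} // override only one field
	fmt.Println(d.initialDelay(), d.maxDelay(), d.maxAttempts(), d.parallelism())
}
```

One consequence of this design is that a caller cannot use the struct to request an explicit zero; a zero field always falls back to the built-in default.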

type Func

type Func func(context.Context) (interface{}, error)

Func is invoked to execute an action. The given Context should be used to make HTTP requests. The function should return as soon as possible if the context's Done channel is closed. It must return a nil error if the action succeeded. Returning a non-nil error marks the attempt as failed; it may then be retried, depending on the Executor configuration.
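
A Func that respects its Context typically selects on ctx.Done() alongside its real work. The sketch below simulates the work with a timer; slowAction and runWithTimeout are hypothetical helpers, and Func is re-declared locally so the example runs on its own.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Func is re-declared locally to keep the sketch self-contained.
type Func func(context.Context) (interface{}, error)

// slowAction returns a Func that simulates work taking d. It returns
// early with the context's error if the context is canceled first.
func slowAction(d time.Duration, result interface{}) Func {
	return func(ctx context.Context) (interface{}, error) {
		select {
		case <-time.After(d):
			return result, nil // success: nil error
		case <-ctx.Done():
			return nil, ctx.Err() // canceled or timed out
		}
	}
}

// runWithTimeout invokes f with a context that expires after timeout,
// standing in for an executor's per-attempt timeout.
func runWithTimeout(f Func, timeout time.Duration) (interface{}, error) {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	return f(ctx)
}

func main() {
	v, err := runWithTimeout(slowAction(time.Millisecond, "ok"), time.Second)
	fmt.Println(v, err)

	v, err = runWithTimeout(slowAction(time.Second, "late"), time.Millisecond)
	fmt.Println(v, err)
}
```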

type ManyCallbackFunc

type ManyCallbackFunc func(int, Try)

ManyCallbackFunc is invoked at most once for each function passed to a single ExecMany call. Each invocation includes the index of the function in ExecMany's array argument.

type MockDiagnosticsCallback

type MockDiagnosticsCallback struct {
	// contains filtered or unexported fields
}

MockDiagnosticsCallback is a mock of DiagnosticsCallback interface

func NewMockDiagnosticsCallback

func NewMockDiagnosticsCallback(ctrl *gomock.Controller) *MockDiagnosticsCallback

NewMockDiagnosticsCallback creates a new mock instance

func (*MockDiagnosticsCallback) AttemptCompleted

func (m *MockDiagnosticsCallback) AttemptCompleted(arg0 AttemptResult, arg1 time.Duration)

AttemptCompleted mocks base method

func (*MockDiagnosticsCallback) AttemptStarted

func (m *MockDiagnosticsCallback) AttemptStarted(arg0 time.Duration)

AttemptStarted mocks base method

func (*MockDiagnosticsCallback) CallbackDuration

func (m *MockDiagnosticsCallback) CallbackDuration(arg0 time.Duration)

CallbackDuration mocks base method

func (*MockDiagnosticsCallback) EXPECT

EXPECT returns an object that allows the caller to indicate expected use

func (*MockDiagnosticsCallback) TaskCompleted

func (m *MockDiagnosticsCallback) TaskCompleted(arg0 AttemptResult, arg1 time.Duration)

TaskCompleted mocks base method

func (*MockDiagnosticsCallback) TaskStarted

func (m *MockDiagnosticsCallback) TaskStarted(arg0 int)

TaskStarted mocks base method

type MockDiagnosticsCallbackMockRecorder

type MockDiagnosticsCallbackMockRecorder struct {
	// contains filtered or unexported fields
}

MockDiagnosticsCallbackMockRecorder is the mock recorder for MockDiagnosticsCallback

func (*MockDiagnosticsCallbackMockRecorder) AttemptCompleted

func (mr *MockDiagnosticsCallbackMockRecorder) AttemptCompleted(arg0, arg1 interface{}) *gomock.Call

AttemptCompleted indicates an expected call of AttemptCompleted

func (*MockDiagnosticsCallbackMockRecorder) AttemptStarted

func (mr *MockDiagnosticsCallbackMockRecorder) AttemptStarted(arg0 interface{}) *gomock.Call

AttemptStarted indicates an expected call of AttemptStarted

func (*MockDiagnosticsCallbackMockRecorder) CallbackDuration

func (mr *MockDiagnosticsCallbackMockRecorder) CallbackDuration(arg0 interface{}) *gomock.Call

CallbackDuration indicates an expected call of CallbackDuration

func (*MockDiagnosticsCallbackMockRecorder) TaskCompleted

func (mr *MockDiagnosticsCallbackMockRecorder) TaskCompleted(arg0, arg1 interface{}) *gomock.Call

TaskCompleted indicates an expected call of TaskCompleted

func (*MockDiagnosticsCallbackMockRecorder) TaskStarted

func (mr *MockDiagnosticsCallbackMockRecorder) TaskStarted(arg0 interface{}) *gomock.Call

TaskStarted indicates an expected call of TaskStarted

type MockExecutor

type MockExecutor struct {
	// contains filtered or unexported fields
}

MockExecutor is a mock of Executor interface

func NewMockExecutor

func NewMockExecutor(ctrl *gomock.Controller) *MockExecutor

NewMockExecutor creates a new mock instance

func (*MockExecutor) EXPECT

EXPECT returns an object that allows the caller to indicate expected use

func (*MockExecutor) Exec

func (m *MockExecutor) Exec(arg0 Func, arg1 CallbackFunc)

Exec mocks base method

func (*MockExecutor) ExecAndForget

func (m *MockExecutor) ExecAndForget(arg0 Func)

ExecAndForget mocks base method

func (*MockExecutor) ExecGathered

func (m *MockExecutor) ExecGathered(arg0 []Func, arg1 CallbackFunc)

ExecGathered mocks base method

func (*MockExecutor) ExecMany

func (m *MockExecutor) ExecMany(arg0 []Func, arg1 ManyCallbackFunc)

ExecMany mocks base method

func (*MockExecutor) SetDiagnosticsCallback

func (m *MockExecutor) SetDiagnosticsCallback(arg0 DiagnosticsCallback)

SetDiagnosticsCallback mocks base method

func (*MockExecutor) Stop

func (m *MockExecutor) Stop()

Stop mocks base method

type MockExecutorMockRecorder

type MockExecutorMockRecorder struct {
	// contains filtered or unexported fields
}

MockExecutorMockRecorder is the mock recorder for MockExecutor

func (*MockExecutorMockRecorder) Exec

func (mr *MockExecutorMockRecorder) Exec(arg0, arg1 interface{}) *gomock.Call

Exec indicates an expected call of Exec

func (*MockExecutorMockRecorder) ExecAndForget

func (mr *MockExecutorMockRecorder) ExecAndForget(arg0 interface{}) *gomock.Call

ExecAndForget indicates an expected call of ExecAndForget

func (*MockExecutorMockRecorder) ExecGathered

func (mr *MockExecutorMockRecorder) ExecGathered(arg0, arg1 interface{}) *gomock.Call

ExecGathered indicates an expected call of ExecGathered

func (*MockExecutorMockRecorder) ExecMany

func (mr *MockExecutorMockRecorder) ExecMany(arg0, arg1 interface{}) *gomock.Call

ExecMany indicates an expected call of ExecMany

func (*MockExecutorMockRecorder) SetDiagnosticsCallback

func (mr *MockExecutorMockRecorder) SetDiagnosticsCallback(arg0 interface{}) *gomock.Call

SetDiagnosticsCallback indicates an expected call of SetDiagnosticsCallback

func (*MockExecutorMockRecorder) Stop

func (mr *MockExecutorMockRecorder) Stop() *gomock.Call

Stop indicates an expected call of Stop

type MockFromFlags

type MockFromFlags struct {
	// contains filtered or unexported fields
}

MockFromFlags is a mock of FromFlags interface

func NewMockFromFlags

func NewMockFromFlags(ctrl *gomock.Controller) *MockFromFlags

NewMockFromFlags creates a new mock instance

func (*MockFromFlags) EXPECT

EXPECT returns an object that allows the caller to indicate expected use

func (*MockFromFlags) Make

func (m *MockFromFlags) Make(arg0 *log.Logger) Executor

Make mocks base method

type MockFromFlagsMockRecorder

type MockFromFlagsMockRecorder struct {
	// contains filtered or unexported fields
}

MockFromFlagsMockRecorder is the mock recorder for MockFromFlags

func (*MockFromFlagsMockRecorder) Make

func (mr *MockFromFlagsMockRecorder) Make(arg0 interface{}) *gomock.Call

Make indicates an expected call of Make

type Option

type Option func(*commonExec)

Option supplies configuration for an Executor implementation.

func WithAttemptTimeout

func WithAttemptTimeout(timeout time.Duration) Option

WithAttemptTimeout sets the timeout for completion of individual attempts of an action. If the attempt has not completed within the given duration, it is canceled (and potentially retried). Timeouts less than or equal to zero are treated as "no timeout."

func WithDiagnostics

func WithDiagnostics(cb DiagnosticsCallback) Option

WithDiagnostics sets a DiagnosticsCallback for the Executor.

func WithLogger

func WithLogger(log *log.Logger) Option

WithLogger sets a Logger for panics recovered while executing actions.

func WithMaxAttempts

func WithMaxAttempts(maxAttempts int) Option

WithMaxAttempts sets the absolute maximum number of attempts made to complete an action (including the initial attempt). Values less than 1 act as if 1 had been passed.

func WithParallelism

func WithParallelism(parallelism int) Option

WithParallelism sets the number of goroutines used to execute actions. No more than this many actions can be executing at once. Values less than 1 act as if 1 had been passed.

func WithRetryDelayFunc

func WithRetryDelayFunc(d DelayFunc) Option

WithRetryDelayFunc sets the DelayFunc used when retrying actions.

func WithTimeSource

func WithTimeSource(src tbntime.Source) Option

WithTimeSource sets the tbntime.Source used for obtaining the current time. This option should only be used for testing.

func WithTimeout

func WithTimeout(timeout time.Duration) Option

WithTimeout sets the timeout for completion of actions. If the action has not completed (including retries) within the given duration, it is canceled. Timeouts less than or equal to zero are treated as "no timeout."
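
The With... functions follow the functional-options pattern: each returns a closure that mutates a configuration struct. Because Option operates on the unexported commonExec type, the sketch below uses a hypothetical config struct and lowercase option names instead. The clamping rules and the starting values (no retries, parallelism of 1) are taken from the descriptions in this documentation.

```go
package main

import (
	"fmt"
	"time"
)

// config is a hypothetical stand-in for the unexported commonExec type.
type config struct {
	maxAttempts int
	parallelism int
	timeout     time.Duration // 0 means no timeout
}

// option mirrors the shape of the documented Option type.
type option func(*config)

// withMaxAttempts treats values below 1 as 1.
func withMaxAttempts(n int) option {
	if n < 1 {
		n = 1
	}
	return func(c *config) { c.maxAttempts = n }
}

// withParallelism treats values below 1 as 1.
func withParallelism(n int) option {
	if n < 1 {
		n = 1
	}
	return func(c *config) { c.parallelism = n }
}

// withTimeout treats values at or below zero as "no timeout".
func withTimeout(d time.Duration) option {
	if d < 0 {
		d = 0
	}
	return func(c *config) { c.timeout = d }
}

// newConfig starts from the documented defaults and applies each option
// in order, so later options win.
func newConfig(opts ...option) config {
	c := config{maxAttempts: 1, parallelism: 1}
	for _, o := range opts {
		o(&c)
	}
	return c
}

func main() {
	c := newConfig(withMaxAttempts(5), withParallelism(0), withTimeout(2*time.Second))
	fmt.Printf("%+v\n", c)
}
```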

type Try

type Try interface {
	// IsReturn returns true if the computation produced a return value.
	IsReturn() bool

	// IsError returns true if the computation resulted in failure.
	IsError() bool

	// Get returns the successful result of the computation.
	// All calls to Get should be guarded by IsReturn; if the computation
	// produced an error, calls to Get will panic.
	Get() interface{}

	// Error returns the error that caused the computation to fail.
	// All calls to Error should be guarded by IsError; if the computation
	// succeeded, calls to Error will panic.
	Error() error
}

Try represents the result of a computation, which may return a value or an error. The following table shows what IsReturn, IsError, Get, and Error return for a successful and for a failed computation:

Function | Success | Failure
---------|---------|--------
IsReturn | true    | false
IsError  | false   | true
Get      | result  | panic
Error    | panic   | error

func NewError

func NewError(err error) Try

NewError produces a Try representing a failed computation.

func NewReturn

func NewReturn(i interface{}) Try

NewReturn produces a Try representing a successful computation.

func NewTry

func NewTry(i interface{}, err error) Try

NewTry produces a Try based on the given interface{} and error. If the error is non-nil, a Try is returned for which IsError returns true. Otherwise, a Try is returned for which IsReturn returns true.
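
The guard-before-access contract of Try can be made concrete with a small local implementation. The concrete types returnTry and errorTry and the helpers newTry, describe, failed, and panics are hypothetical; only the Try interface and the success/failure behavior in the table above come from this documentation.

```go
package main

import (
	"errors"
	"fmt"
)

// Try is re-declared locally to keep the sketch self-contained.
type Try interface {
	IsReturn() bool
	IsError() bool
	Get() interface{}
	Error() error
}

// returnTry represents a successful computation.
type returnTry struct{ value interface{} }

func (r returnTry) IsReturn() bool   { return true }
func (r returnTry) IsError() bool    { return false }
func (r returnTry) Get() interface{} { return r.value }
func (r returnTry) Error() error     { panic("Error called on a successful Try") }

// errorTry represents a failed computation.
type errorTry struct{ err error }

func (e errorTry) IsReturn() bool   { return false }
func (e errorTry) IsError() bool    { return true }
func (e errorTry) Get() interface{} { panic("Get called on a failed Try") }
func (e errorTry) Error() error     { return e.err }

// newTry picks the variant based on whether err is nil.
func newTry(v interface{}, err error) Try {
	if err != nil {
		return errorTry{err}
	}
	return returnTry{v}
}

// describe shows the guarded access pattern a callback would use.
func describe(t Try) string {
	if t.IsError() {
		return "error: " + t.Error().Error()
	}
	return fmt.Sprintf("value: %v", t.Get())
}

// failed builds a failed Try from a message.
func failed(msg string) Try { return newTry(nil, errors.New(msg)) }

// panics reports whether calling f panics.
func panics(f func()) (panicked bool) {
	defer func() {
		if recover() != nil {
			panicked = true
		}
	}()
	f()
	return false
}

func main() {
	fmt.Println(describe(newTry(42, nil)))
	fmt.Println(describe(failed("boom")))
	fmt.Println(panics(func() { failed("boom").Get() }))
}
```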
