Documentation ¶
Index ¶
- Constants
- Variables
- func RangeToChan[...](start InputType, end InputType) <-chan InputType
- func SliceToChan[InputType any](inputs []InputType) <-chan InputType
- type BaseExecutorCallbackInput
- type ContinuousBatchInput
- type ContinuousFinalInput
- type ContinuousInput
- type ContinuousRebatchInput
- type ContinuousUnbatchInput
- type EmptyInputChannelCallbackInput
- type ExecutorBatchInput
- type ExecutorContextDoneCallbackInput
- type ExecutorErrorCallbackInput
- type ExecutorFinalInput
- type ExecutorInput
- type ExecutorOutput
- func Chain[InputType any, OutputType any](upstream *ExecutorOutput[InputType], ...) *ExecutorOutput[OutputType]
- func ChainBatch[InputType any, OutputType any](upstream *ExecutorOutput[InputType], ...) *ExecutorOutput[[]OutputType]
- func ChainFinal[InputType any](upstream *ExecutorOutput[InputType], input ExecutorFinalInput[InputType]) *ExecutorOutput[any]
- func ChainRebatch[InputType any, OutputType any](upstream *ExecutorOutput[InputType], ...) *ExecutorOutput[[]OutputType]
- func ChainUnbatch[InputType any, OutputChanType any](upstream *ExecutorOutput[InputType], ...) *ExecutorOutput[OutputChanType]
- func Continuous[OutputType any](ctx context.Context, input ContinuousInput[OutputType], period time.Duration) *ExecutorOutput[OutputType]
- func ContinuousBatch[OutputType any](ctx context.Context, input ContinuousBatchInput[OutputType], ...) *ExecutorOutput[[]OutputType]
- func ContinuousFinal(ctx context.Context, input ContinuousFinalInput, period time.Duration) *ExecutorOutput[any]
- func ContinuousRebatch[OutputType any](ctx context.Context, input ContinuousRebatchInput[OutputType], ...) *ExecutorOutput[[]OutputType]
- func ContinuousUnbatch[OutputChanType any](ctx context.Context, input ContinuousUnbatchInput[OutputChanType], ...) *ExecutorOutput[OutputChanType]
- func Executor[InputType any, OutputType any](ctx context.Context, input ExecutorInput[InputType, OutputType]) *ExecutorOutput[OutputType]
- func ExecutorBatch[InputType any, OutputType any](ctx context.Context, input ExecutorBatchInput[InputType, OutputType]) *ExecutorOutput[[]OutputType]
- func ExecutorFinal[InputType any](ctx context.Context, input ExecutorFinalInput[InputType]) *ExecutorOutput[any]
- func ExecutorRebatch[InputType any, OutputType any](ctx context.Context, input ExecutorRebatchInput[InputType, OutputType]) *ExecutorOutput[[]OutputType]
- func ExecutorUnbatch[InputType any, OutputChanType any](ctx context.Context, input ExecutorUnbatchInput[InputType, OutputChanType]) *ExecutorOutput[OutputChanType]
- type ExecutorRebatchInput
- type ExecutorSuccessCallbackInput
- type ExecutorUnbatchInput
- type FullOutputChannelCallbackInput
- type ProcessingFuncTypes
- type ProcessingFuncWithInputWithOutput
- type ProcessingFuncWithInputWithoutOutput
- type ProcessingFuncWithoutInputWithOutput
- type ProcessingFuncWithoutInputWithoutOutput
- type RoutineContextDoneCallbackInput
- type RoutineErrorCallbackInput
- type RoutineFunctionMetadata
- type RoutineStatusTracker
- func (rst *RoutineStatusTracker) GetExecutorName() string
- func (rst *RoutineStatusTracker) GetInputChanLength() int
- func (rst *RoutineStatusTracker) GetNumRoutinesAwaitingInput() int32
- func (rst *RoutineStatusTracker) GetNumRoutinesAwaitingOutput() int32
- func (rst *RoutineStatusTracker) GetNumRoutinesContextDone() int32
- func (rst *RoutineStatusTracker) GetNumRoutinesErrored() int32
- func (rst *RoutineStatusTracker) GetNumRoutinesFinished() int32
- func (rst *RoutineStatusTracker) GetNumRoutinesProcessing() int32
- func (rst *RoutineStatusTracker) GetNumRoutinesRunning() int32
- func (rst *RoutineStatusTracker) GetOutputChanLength() *int
- type RoutineSuccessCallbackInput
Constants ¶
const ( AwaitingInput routineStatus = iota Processing AwaitingOutput Errored ContextDone Finished )
Variables ¶
Functions ¶
func RangeToChan ¶
func RangeToChan[InputType int | int8 | int16 | int32 | int64 | uint | uint8 | uint16 | uint32 | uint64](start InputType, end InputType) <-chan InputType
RangeToChan returns a closed channel that contains all of the integer values, in ascending order, from `start` (inclusive) to `end` (exclusive)
func SliceToChan ¶
func SliceToChan[InputType any](inputs []InputType) <-chan InputType
SliceToChan returns a closed channel that contains all of the values in the `inputs` slice
Types ¶
type BaseExecutorCallbackInput ¶
type BaseExecutorCallbackInput struct { // The name of the executor ExecutorName string }
The common set of values for callbacks that are not specific to a single routine
type ContinuousBatchInput ¶
type ContinuousBatchInput[OutputType any] executorInput[time.Time, OutputType, []OutputType, ProcessingFuncWithoutInputWithOutput[OutputType]]
type ContinuousFinalInput ¶
type ContinuousFinalInput executorInput[time.Time, any, any, ProcessingFuncWithoutInputWithoutOutput]
type ContinuousInput ¶
type ContinuousInput[OutputType any] executorInput[time.Time, OutputType, OutputType, ProcessingFuncWithoutInputWithOutput[OutputType]]
type ContinuousRebatchInput ¶
type ContinuousRebatchInput[OutputType any] executorInput[time.Time, []OutputType, []OutputType, ProcessingFuncWithoutInputWithOutput[[]OutputType]]
type ContinuousUnbatchInput ¶
type ContinuousUnbatchInput[OutputChanType any] executorInput[time.Time, []OutputChanType, OutputChanType, ProcessingFuncWithoutInputWithOutput[[]OutputChanType]]
type EmptyInputChannelCallbackInput ¶
type EmptyInputChannelCallbackInput struct { *RoutineFunctionMetadata // The duration since the last input was received TimeSinceLastInput time.Duration }
type ExecutorBatchInput ¶
type ExecutorBatchInput[InputType any, OutputType any] executorInput[InputType, OutputType, []OutputType, ProcessingFuncWithInputWithOutput[InputType, OutputType]]
type ExecutorContextDoneCallbackInput ¶
type ExecutorContextDoneCallbackInput struct { *BaseExecutorCallbackInput // The context cancellation error (may wrap other info) Err stackerr.Error }
type ExecutorErrorCallbackInput ¶
type ExecutorErrorCallbackInput struct { *BaseExecutorCallbackInput // The error that caused one or more of the routines to fail Err stackerr.Error }
type ExecutorFinalInput ¶
type ExecutorFinalInput[InputType any] executorInput[InputType, any, any, ProcessingFuncWithInputWithoutOutput[InputType]]
type ExecutorInput ¶
type ExecutorInput[InputType any, OutputType any] executorInput[InputType, OutputType, OutputType, ProcessingFuncWithInputWithOutput[InputType, OutputType]]
type ExecutorOutput ¶
type ExecutorOutput[OutputChanType any] struct { // The name of the executor Name string // The status tracker for the routines. RoutineStatusTracker *RoutineStatusTracker // The channel that the outputs are written to OutputChan <-chan OutputChanType // contains filtered or unexported fields }
func Chain ¶
func Chain[InputType any, OutputType any](upstream *ExecutorOutput[InputType], input ExecutorInput[InputType, OutputType]) *ExecutorOutput[OutputType]
func ChainBatch ¶
func ChainBatch[InputType any, OutputType any](upstream *ExecutorOutput[InputType], input ExecutorBatchInput[InputType, OutputType]) *ExecutorOutput[[]OutputType]
func ChainFinal ¶
func ChainFinal[InputType any](upstream *ExecutorOutput[InputType], input ExecutorFinalInput[InputType]) *ExecutorOutput[any]
func ChainRebatch ¶
func ChainRebatch[InputType any, OutputType any](upstream *ExecutorOutput[InputType], input ExecutorRebatchInput[InputType, OutputType]) *ExecutorOutput[[]OutputType]
func ChainUnbatch ¶
func ChainUnbatch[InputType any, OutputChanType any](upstream *ExecutorOutput[InputType], input ExecutorUnbatchInput[InputType, OutputChanType]) *ExecutorOutput[OutputChanType]
func Continuous ¶
func Continuous[OutputType any]( ctx context.Context, input ContinuousInput[OutputType], period time.Duration, ) *ExecutorOutput[OutputType]
func ContinuousBatch ¶
func ContinuousBatch[OutputType any]( ctx context.Context, input ContinuousBatchInput[OutputType], period time.Duration, ) *ExecutorOutput[[]OutputType]
func ContinuousFinal ¶
func ContinuousFinal( ctx context.Context, input ContinuousFinalInput, period time.Duration, ) *ExecutorOutput[any]
func ContinuousRebatch ¶
func ContinuousRebatch[OutputType any]( ctx context.Context, input ContinuousRebatchInput[OutputType], period time.Duration, ) *ExecutorOutput[[]OutputType]
func ContinuousUnbatch ¶
func ContinuousUnbatch[OutputChanType any]( ctx context.Context, input ContinuousUnbatchInput[OutputChanType], period time.Duration, ) *ExecutorOutput[OutputChanType]
func Executor ¶
func Executor[InputType any, OutputType any](ctx context.Context, input ExecutorInput[InputType, OutputType]) *ExecutorOutput[OutputType]
func ExecutorBatch ¶
func ExecutorBatch[InputType any, OutputType any](ctx context.Context, input ExecutorBatchInput[InputType, OutputType]) *ExecutorOutput[[]OutputType]
func ExecutorFinal ¶
func ExecutorFinal[InputType any](ctx context.Context, input ExecutorFinalInput[InputType]) *ExecutorOutput[any]
func ExecutorRebatch ¶
func ExecutorRebatch[InputType any, OutputType any](ctx context.Context, input ExecutorRebatchInput[InputType, OutputType]) *ExecutorOutput[[]OutputType]
func ExecutorUnbatch ¶
func ExecutorUnbatch[InputType any, OutputChanType any](ctx context.Context, input ExecutorUnbatchInput[InputType, OutputChanType]) *ExecutorOutput[OutputChanType]
func (*ExecutorOutput[OutputChanType]) Ctx ¶
func (eo *ExecutorOutput[OutputChanType]) Ctx() context.Context
Ctx returns a context that is derived from the top-level executor's input context and is cancelled if any of the executors in a chain fail (after they are all cleaned up).
func (*ExecutorOutput[OutputChanType]) Errored ¶
func (eo *ExecutorOutput[OutputChanType]) Errored() <-chan struct{}
Errored returns a channel that never has a value, but will always remain open UNLESS this executor finished with an error.
func (*ExecutorOutput[OutputType]) IntoSlice ¶
func (eo *ExecutorOutput[OutputType]) IntoSlice() ([]OutputType, stackerr.Error)
func (*ExecutorOutput[OutputChanType]) Wait ¶
func (eo *ExecutorOutput[OutputChanType]) Wait() stackerr.Error
Wait waits for an executor to finish. If the executor exited with an error, that error will be returned.
type ExecutorRebatchInput ¶
type ExecutorRebatchInput[InputType any, OutputType any] executorInput[InputType, []OutputType, []OutputType, ProcessingFuncWithInputWithOutput[InputType, []OutputType]]
type ExecutorSuccessCallbackInput ¶
type ExecutorSuccessCallbackInput struct {
*BaseExecutorCallbackInput
}
type ExecutorUnbatchInput ¶
type ExecutorUnbatchInput[InputType any, OutputChanType any] executorInput[InputType, []OutputChanType, OutputChanType, ProcessingFuncWithInputWithOutput[InputType, []OutputChanType]]
type FullOutputChannelCallbackInput ¶
type FullOutputChannelCallbackInput struct { *RoutineFunctionMetadata // The duration since the last output was stored TimeSinceLastOutput time.Duration // The output index, specific to the corresponding input // (i.e. resets at 0 for each input) OutputIndex uint64 }
type ProcessingFuncTypes ¶
type ProcessingFuncTypes[InputType any, OutputType any] interface { ProcessingFuncWithInputWithOutput[InputType, OutputType] | ProcessingFuncWithInputWithoutOutput[InputType] | ProcessingFuncWithoutInputWithOutput[OutputType] | ProcessingFuncWithoutInputWithoutOutput }
type ProcessingFuncWithInputWithOutput ¶
type ProcessingFuncWithInputWithOutput[InputType any, OutputType any] func(ctx context.Context, input InputType, metadata *RoutineFunctionMetadata) (output OutputType, err stackerr.Error)
type ProcessingFuncWithInputWithoutOutput ¶
type ProcessingFuncWithInputWithoutOutput[InputType any] func(ctx context.Context, input InputType, metadata *RoutineFunctionMetadata) (err stackerr.Error)
type ProcessingFuncWithoutInputWithOutput ¶
type ProcessingFuncWithoutInputWithOutput[OutputType any] func(ctx context.Context, metadata *RoutineFunctionMetadata) (output OutputType, err stackerr.Error)
type ProcessingFuncWithoutInputWithoutOutput ¶
type ProcessingFuncWithoutInputWithoutOutput func(ctx context.Context, metadata *RoutineFunctionMetadata) (err stackerr.Error)
type RoutineContextDoneCallbackInput ¶
type RoutineContextDoneCallbackInput struct { *RoutineFunctionMetadata // The error that killed the context Err stackerr.Error }
type RoutineErrorCallbackInput ¶
type RoutineErrorCallbackInput struct { *RoutineFunctionMetadata // The error that was returned by the function Err stackerr.Error }
type RoutineFunctionMetadata ¶
type RoutineFunctionMetadata struct { // The name of the executor ExecutorName string // The index of the routine (will be in [0-concurrency)) RoutineIndex uint // The index of the input (the input value is the Nth value // from the input channel) ExecutorInputIndex uint64 // The index of the input for this routine RoutineInputIndex uint64 // The status tracker for this executor RoutineStatusTracker *RoutineStatusTracker // The status trackers for all executors in this chain (by map) RoutineStatusTrackersMap map[string]*RoutineStatusTracker // The status trackers for all executors in this chain (by slice, in order of chaining) RoutineStatusTrackersSlice []*RoutineStatusTracker }
The common set of values for callbacks that are specific to single routine
type RoutineStatusTracker ¶
type RoutineStatusTracker struct {
// contains filtered or unexported fields
}
func (*RoutineStatusTracker) GetExecutorName ¶
func (rst *RoutineStatusTracker) GetExecutorName() string
func (*RoutineStatusTracker) GetInputChanLength ¶
func (rst *RoutineStatusTracker) GetInputChanLength() int
func (*RoutineStatusTracker) GetNumRoutinesAwaitingInput ¶
func (rst *RoutineStatusTracker) GetNumRoutinesAwaitingInput() int32
func (*RoutineStatusTracker) GetNumRoutinesAwaitingOutput ¶
func (rst *RoutineStatusTracker) GetNumRoutinesAwaitingOutput() int32
func (*RoutineStatusTracker) GetNumRoutinesContextDone ¶
func (rst *RoutineStatusTracker) GetNumRoutinesContextDone() int32
func (*RoutineStatusTracker) GetNumRoutinesErrored ¶
func (rst *RoutineStatusTracker) GetNumRoutinesErrored() int32
func (*RoutineStatusTracker) GetNumRoutinesFinished ¶
func (rst *RoutineStatusTracker) GetNumRoutinesFinished() int32
func (*RoutineStatusTracker) GetNumRoutinesProcessing ¶
func (rst *RoutineStatusTracker) GetNumRoutinesProcessing() int32
func (*RoutineStatusTracker) GetNumRoutinesRunning ¶
func (rst *RoutineStatusTracker) GetNumRoutinesRunning() int32
func (*RoutineStatusTracker) GetOutputChanLength ¶
func (rst *RoutineStatusTracker) GetOutputChanLength() *int
Returns nil if there is no output channel, or a pointer to an int if there is
type RoutineSuccessCallbackInput ¶
type RoutineSuccessCallbackInput struct {
*RoutineFunctionMetadata
}