concurrency

package module
v0.1.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 5, 2023 License: MIT Imports: 10 Imported by: 1

README

concurrency

Concurrent/parallel processing for Go.

Documentation

Index

Constants

View Source
// Values of the unexported routineStatus type. iota numbers them
// consecutively in declaration order, from AwaitingInput = 0 to Finished = 5.
// Each name has a matching GetNumRoutines<Name> accessor on
// RoutineStatusTracker.
// NOTE(review): presumably each accessor counts the routines currently in
// that state — confirm against the tracker implementation.
const (
	AwaitingInput routineStatus = iota
	Processing
	AwaitingOutput
	Errored
	ContextDone
	Finished
)

Variables

View Source
// Default callback intervals; both start at one second. They are declared as
// variables, not constants, so the values can be reassigned.
// NOTE(review): judging by the names, these presumably set how often the
// empty-input-channel and full-output-channel callbacks fire when an executor
// does not specify its own interval — confirm against the executor code.
var (
	DefaultEmptyInputChannelCallbackInterval time.Duration = 1 * time.Second
	DefaultFullOutputChannelCallbackInterval time.Duration = 1 * time.Second
)

Functions

func RangeToChan

func RangeToChan[InputType int | int8 | int16 | int32 | int64 | uint | uint8 | uint16 | uint32 | uint64](start InputType, end InputType) <-chan InputType

RangeToChan returns a closed channel that contains all of the integer values, in ascending order, from `start` (inclusive) to `end` (exclusive).

func SliceToChan

func SliceToChan[InputType any](inputs []InputType) <-chan InputType

SliceToChan returns a closed channel that contains all of the values in the `inputs` slice.

Types

type BaseExecutorCallbackInput

// BaseExecutorCallbackInput holds the values shared by executor-level
// callback inputs. It is embedded by pointer in
// ExecutorContextDoneCallbackInput, ExecutorErrorCallbackInput and
// ExecutorSuccessCallbackInput.
type BaseExecutorCallbackInput struct {
	// The name of the executor
	ExecutorName string
}

The common set of values for callbacks that are not specific to a single routine.

type ContinuousBatchInput

// ContinuousBatchInput is the input type accepted by ContinuousBatch.
// It is executorInput instantiated with time.Time, OutputType, []OutputType
// and a processing function that takes no input value and returns one
// OutputType. The third type argument, []OutputType, matches the element type
// of the ExecutorOutput that ContinuousBatch returns.
type ContinuousBatchInput[OutputType any] executorInput[time.Time, OutputType, []OutputType, ProcessingFuncWithoutInputWithOutput[OutputType]]

type ContinuousFinalInput

// ContinuousFinalInput is the input type accepted by ContinuousFinal.
// It is executorInput instantiated with time.Time, any, any and a processing
// function that takes no input value and returns only an error.
type ContinuousFinalInput executorInput[time.Time, any, any, ProcessingFuncWithoutInputWithoutOutput]

type ContinuousInput

// ContinuousInput is the input type accepted by Continuous.
// It is executorInput instantiated with time.Time, OutputType, OutputType and
// a processing function that takes no input value and returns one OutputType.
// The third type argument, OutputType, matches the element type of the
// ExecutorOutput that Continuous returns.
type ContinuousInput[OutputType any] executorInput[time.Time, OutputType, OutputType, ProcessingFuncWithoutInputWithOutput[OutputType]]

type ContinuousRebatchInput

// ContinuousRebatchInput is the input type accepted by ContinuousRebatch.
// It is executorInput instantiated with time.Time, []OutputType, []OutputType
// and a processing function that takes no input value and returns a
// []OutputType. The third type argument, []OutputType, matches the element
// type of the ExecutorOutput that ContinuousRebatch returns.
type ContinuousRebatchInput[OutputType any] executorInput[time.Time, []OutputType, []OutputType, ProcessingFuncWithoutInputWithOutput[[]OutputType]]

type ContinuousUnbatchInput

// ContinuousUnbatchInput is the input type accepted by ContinuousUnbatch.
// It is executorInput instantiated with time.Time, []OutputChanType,
// OutputChanType and a processing function that takes no input value and
// returns a []OutputChanType. The processing function yields a slice, while
// the ExecutorOutput returned by ContinuousUnbatch carries single
// OutputChanType elements.
type ContinuousUnbatchInput[OutputChanType any] executorInput[time.Time, []OutputChanType, OutputChanType, ProcessingFuncWithoutInputWithOutput[[]OutputChanType]]

type EmptyInputChannelCallbackInput

// EmptyInputChannelCallbackInput combines the routine's metadata (embedded by
// pointer, so its fields are promoted) with the time elapsed since the last
// input.
// NOTE(review): presumably passed to a callback that fires while a routine is
// waiting on an empty input channel — confirm against the executor code.
type EmptyInputChannelCallbackInput struct {
	*RoutineFunctionMetadata
	// The duration since the last input was received
	TimeSinceLastInput time.Duration
}

type ExecutorBatchInput

// ExecutorBatchInput is the input type accepted by ExecutorBatch and
// ChainBatch. It is executorInput instantiated with InputType, OutputType,
// []OutputType and a processing function that maps one InputType to one
// OutputType. The third type argument, []OutputType, matches the element type
// of the ExecutorOutput those functions return.
type ExecutorBatchInput[InputType any, OutputType any] executorInput[InputType, OutputType, []OutputType, ProcessingFuncWithInputWithOutput[InputType, OutputType]]

type ExecutorContextDoneCallbackInput

// ExecutorContextDoneCallbackInput combines the executor-level base values
// (embedded by pointer, so ExecutorName is promoted) with the context
// cancellation error.
// NOTE(review): presumably passed to an executor-level callback when the
// context is done — confirm against the executor code.
type ExecutorContextDoneCallbackInput struct {
	*BaseExecutorCallbackInput
	// The context cancellation error (may wrap other info)
	Err stackerr.Error
}

type ExecutorErrorCallbackInput

// ExecutorErrorCallbackInput combines the executor-level base values
// (embedded by pointer, so ExecutorName is promoted) with the error that made
// one or more routines fail.
// NOTE(review): presumably passed to an executor-level error callback —
// confirm against the executor code.
type ExecutorErrorCallbackInput struct {
	*BaseExecutorCallbackInput
	// The error that caused one or more of the routines to fail
	Err stackerr.Error
}

type ExecutorFinalInput

// ExecutorFinalInput is the input type accepted by ExecutorFinal and
// ChainFinal. It is executorInput instantiated with InputType, any, any and a
// processing function that takes one InputType and returns only an error.
type ExecutorFinalInput[InputType any] executorInput[InputType, any, any, ProcessingFuncWithInputWithoutOutput[InputType]]

type ExecutorInput

// ExecutorInput is the input type accepted by Executor and Chain.
// It is executorInput instantiated with InputType, OutputType, OutputType and
// a processing function that maps one InputType to one OutputType. The third
// type argument, OutputType, matches the element type of the ExecutorOutput
// those functions return.
type ExecutorInput[InputType any, OutputType any] executorInput[InputType, OutputType, OutputType, ProcessingFuncWithInputWithOutput[InputType, OutputType]]

type ExecutorOutput

// ExecutorOutput is the handle returned (by pointer) by the Executor*,
// Continuous* and Chain* functions. The Chain* functions also take one as
// their upstream argument. OutputChanType is the element type of the
// receive-only output channel.
type ExecutorOutput[OutputChanType any] struct {

	// The name of the executor
	Name string

	// The status tracker for the routines.
	RoutineStatusTracker *RoutineStatusTracker

	// The channel that the outputs are written to
	OutputChan <-chan OutputChanType
	// contains filtered or unexported fields
}

func Chain

func Chain[InputType any, OutputType any](upstream *ExecutorOutput[InputType], input ExecutorInput[InputType, OutputType]) *ExecutorOutput[OutputType]

func ChainBatch

func ChainBatch[InputType any, OutputType any](upstream *ExecutorOutput[InputType], input ExecutorBatchInput[InputType, OutputType]) *ExecutorOutput[[]OutputType]

func ChainFinal

func ChainFinal[InputType any](upstream *ExecutorOutput[InputType], input ExecutorFinalInput[InputType]) *ExecutorOutput[any]

func ChainRebatch

func ChainRebatch[InputType any, OutputType any](upstream *ExecutorOutput[InputType], input ExecutorRebatchInput[InputType, OutputType]) *ExecutorOutput[[]OutputType]

func ChainUnbatch

func ChainUnbatch[InputType any, OutputChanType any](upstream *ExecutorOutput[InputType], input ExecutorUnbatchInput[InputType, OutputChanType]) *ExecutorOutput[OutputChanType]

func Continuous

func Continuous[OutputType any](
	ctx context.Context,
	input ContinuousInput[OutputType],
	period time.Duration,
) *ExecutorOutput[OutputType]

func ContinuousBatch

func ContinuousBatch[OutputType any](
	ctx context.Context,
	input ContinuousBatchInput[OutputType],
	period time.Duration,
) *ExecutorOutput[[]OutputType]

func ContinuousFinal

func ContinuousFinal(
	ctx context.Context,
	input ContinuousFinalInput,
	period time.Duration,
) *ExecutorOutput[any]

func ContinuousRebatch

func ContinuousRebatch[OutputType any](
	ctx context.Context,
	input ContinuousRebatchInput[OutputType],
	period time.Duration,
) *ExecutorOutput[[]OutputType]

func ContinuousUnbatch

func ContinuousUnbatch[OutputChanType any](
	ctx context.Context,
	input ContinuousUnbatchInput[OutputChanType],
	period time.Duration,
) *ExecutorOutput[OutputChanType]

func Executor

func Executor[InputType any, OutputType any](ctx context.Context, input ExecutorInput[InputType, OutputType]) *ExecutorOutput[OutputType]

func ExecutorBatch

func ExecutorBatch[InputType any, OutputType any](ctx context.Context, input ExecutorBatchInput[InputType, OutputType]) *ExecutorOutput[[]OutputType]

func ExecutorFinal

func ExecutorFinal[InputType any](ctx context.Context, input ExecutorFinalInput[InputType]) *ExecutorOutput[any]

func ExecutorRebatch

func ExecutorRebatch[InputType any, OutputType any](ctx context.Context, input ExecutorRebatchInput[InputType, OutputType]) *ExecutorOutput[[]OutputType]

func ExecutorUnbatch

func ExecutorUnbatch[InputType any, OutputChanType any](ctx context.Context, input ExecutorUnbatchInput[InputType, OutputChanType]) *ExecutorOutput[OutputChanType]

func (*ExecutorOutput[OutputChanType]) Ctx

func (eo *ExecutorOutput[OutputChanType]) Ctx() context.Context

Ctx returns a context that is derived from the top-level executor's input context and is cancelled if any of the executors in a chain fail (after they are all cleaned up).

func (*ExecutorOutput[OutputChanType]) Errored

func (eo *ExecutorOutput[OutputChanType]) Errored() <-chan struct{}

Errored returns a channel that never carries a value. The channel stays open unless this executor finishes with an error, in which case it is closed.

func (*ExecutorOutput[OutputType]) IntoSlice

func (eo *ExecutorOutput[OutputType]) IntoSlice() ([]OutputType, stackerr.Error)

func (*ExecutorOutput[OutputChanType]) Wait

func (eo *ExecutorOutput[OutputChanType]) Wait() stackerr.Error

Wait waits for an executor to finish. If the executor exited with an error, that error will be returned.

type ExecutorRebatchInput

// ExecutorRebatchInput is the input type accepted by ExecutorRebatch and
// ChainRebatch. It is executorInput instantiated with InputType,
// []OutputType, []OutputType and a processing function that maps one
// InputType to a []OutputType. The third type argument, []OutputType, matches
// the element type of the ExecutorOutput those functions return.
type ExecutorRebatchInput[InputType any, OutputType any] executorInput[InputType, []OutputType, []OutputType, ProcessingFuncWithInputWithOutput[InputType, []OutputType]]

type ExecutorSuccessCallbackInput

// ExecutorSuccessCallbackInput carries only the executor-level base values
// (embedded by pointer, so ExecutorName is promoted).
// NOTE(review): presumably passed to an executor-level success callback —
// confirm against the executor code.
type ExecutorSuccessCallbackInput struct {
	*BaseExecutorCallbackInput
}

type ExecutorUnbatchInput

// ExecutorUnbatchInput is the input type accepted by ExecutorUnbatch and
// ChainUnbatch. It is executorInput instantiated with InputType,
// []OutputChanType, OutputChanType and a processing function that maps one
// InputType to a []OutputChanType. The processing function yields a slice,
// while the ExecutorOutput those functions return carries single
// OutputChanType elements.
type ExecutorUnbatchInput[InputType any, OutputChanType any] executorInput[InputType, []OutputChanType, OutputChanType, ProcessingFuncWithInputWithOutput[InputType, []OutputChanType]]

type FullOutputChannelCallbackInput

// FullOutputChannelCallbackInput combines the routine's metadata (embedded by
// pointer, so its fields are promoted) with the time elapsed since the last
// output was stored and the index of the pending output.
// NOTE(review): presumably passed to a callback that fires while a routine is
// blocked on a full output channel — confirm against the executor code.
type FullOutputChannelCallbackInput struct {
	*RoutineFunctionMetadata
	// The duration since the last output was stored
	TimeSinceLastOutput time.Duration
	// The output index, specific to the corresponding input
	// (i.e. resets at 0 for each input)
	OutputIndex uint64
}

type ProcessingFuncTypes

// ProcessingFuncTypes is a type constraint: a union satisfied only by the four
// processing-function types declared in this package, i.e. with or without an
// input value, and with or without an output value.
type ProcessingFuncTypes[InputType any, OutputType any] interface {
	ProcessingFuncWithInputWithOutput[InputType, OutputType] | ProcessingFuncWithInputWithoutOutput[InputType] | ProcessingFuncWithoutInputWithOutput[OutputType] | ProcessingFuncWithoutInputWithoutOutput
}

type ProcessingFuncWithInputWithOutput

// ProcessingFuncWithInputWithOutput is a processing function that receives a
// context, one input value and a *RoutineFunctionMetadata, and returns one
// output value together with a stackerr.Error.
type ProcessingFuncWithInputWithOutput[InputType any, OutputType any] func(ctx context.Context, input InputType, metadata *RoutineFunctionMetadata) (output OutputType, err stackerr.Error)

type ProcessingFuncWithInputWithoutOutput

// ProcessingFuncWithInputWithoutOutput is a processing function that receives
// a context, one input value and a *RoutineFunctionMetadata, and returns only
// a stackerr.Error.
type ProcessingFuncWithInputWithoutOutput[InputType any] func(ctx context.Context, input InputType, metadata *RoutineFunctionMetadata) (err stackerr.Error)

type ProcessingFuncWithoutInputWithOutput

// ProcessingFuncWithoutInputWithOutput is a processing function that receives
// a context and a *RoutineFunctionMetadata (no input value), and returns one
// output value together with a stackerr.Error.
type ProcessingFuncWithoutInputWithOutput[OutputType any] func(ctx context.Context, metadata *RoutineFunctionMetadata) (output OutputType, err stackerr.Error)

type ProcessingFuncWithoutInputWithoutOutput

// ProcessingFuncWithoutInputWithoutOutput is a processing function that
// receives a context and a *RoutineFunctionMetadata (no input value), and
// returns only a stackerr.Error.
type ProcessingFuncWithoutInputWithoutOutput func(ctx context.Context, metadata *RoutineFunctionMetadata) (err stackerr.Error)

type RoutineContextDoneCallbackInput

// RoutineContextDoneCallbackInput combines the routine's metadata (embedded by
// pointer, so its fields are promoted) with the error that killed the context.
// NOTE(review): presumably passed to a per-routine callback when the context
// is done — confirm against the executor code.
type RoutineContextDoneCallbackInput struct {
	*RoutineFunctionMetadata
	// The error that killed the context
	Err stackerr.Error
}

type RoutineErrorCallbackInput

// RoutineErrorCallbackInput combines the routine's metadata (embedded by
// pointer, so its fields are promoted) with the error returned by the
// processing function.
// NOTE(review): presumably passed to a per-routine error callback — confirm
// against the executor code.
type RoutineErrorCallbackInput struct {
	*RoutineFunctionMetadata
	// The error that was returned by the function
	Err stackerr.Error
}

type RoutineFunctionMetadata

// RoutineFunctionMetadata describes one routine and the input it is handling.
// Every processing function type receives a pointer to it as its metadata
// parameter, and the routine-level callback inputs embed it by pointer.
type RoutineFunctionMetadata struct {
	// The name of the executor
	ExecutorName string
	// The index of the routine (in the half-open range [0, concurrency))
	RoutineIndex uint
	// The index of the input (the input value is the Nth value
	// from the input channel)
	ExecutorInputIndex uint64
	// The index of the input for this routine
	RoutineInputIndex uint64
	// The status tracker for this executor
	RoutineStatusTracker *RoutineStatusTracker
	// The status trackers for all executors in this chain (by map)
	// NOTE(review): the map key is presumably the executor name — confirm.
	RoutineStatusTrackersMap map[string]*RoutineStatusTracker
	// The status trackers for all executors in this chain (by slice, in order of chaining)
	RoutineStatusTrackersSlice []*RoutineStatusTracker
}

The common set of values for callbacks that are specific to a single routine.

type RoutineStatusTracker

// RoutineStatusTracker exposes its state only through methods: the executor
// name, the input and output channel lengths, and the GetNumRoutines*
// counters. All of its fields are filtered or unexported in this listing.
type RoutineStatusTracker struct {
	// contains filtered or unexported fields
}

func (*RoutineStatusTracker) GetExecutorName

func (rst *RoutineStatusTracker) GetExecutorName() string

func (*RoutineStatusTracker) GetInputChanLength

func (rst *RoutineStatusTracker) GetInputChanLength() int

func (*RoutineStatusTracker) GetNumRoutinesAwaitingInput

func (rst *RoutineStatusTracker) GetNumRoutinesAwaitingInput() int32

func (*RoutineStatusTracker) GetNumRoutinesAwaitingOutput

func (rst *RoutineStatusTracker) GetNumRoutinesAwaitingOutput() int32

func (*RoutineStatusTracker) GetNumRoutinesContextDone

func (rst *RoutineStatusTracker) GetNumRoutinesContextDone() int32

func (*RoutineStatusTracker) GetNumRoutinesErrored

func (rst *RoutineStatusTracker) GetNumRoutinesErrored() int32

func (*RoutineStatusTracker) GetNumRoutinesFinished

func (rst *RoutineStatusTracker) GetNumRoutinesFinished() int32

func (*RoutineStatusTracker) GetNumRoutinesProcessing

func (rst *RoutineStatusTracker) GetNumRoutinesProcessing() int32

func (*RoutineStatusTracker) GetNumRoutinesRunning

func (rst *RoutineStatusTracker) GetNumRoutinesRunning() int32

func (*RoutineStatusTracker) GetOutputChanLength

func (rst *RoutineStatusTracker) GetOutputChanLength() *int

GetOutputChanLength returns nil if there is no output channel, or a pointer to an int if there is one.

type RoutineSuccessCallbackInput

// RoutineSuccessCallbackInput carries only the routine's metadata (embedded by
// pointer, so its fields are promoted).
// NOTE(review): presumably passed to a per-routine success callback — confirm
// against the executor code.
type RoutineSuccessCallbackInput struct {
	*RoutineFunctionMetadata
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL