Documentation ¶
Index ¶
- Constants
- Variables
- func LinkJointAfterNode(nw NodeWorker, jw JointWorker, index int) error
- func LinkNodeAfterJoint(jw JointWorker, nw NodeWorker) error
- func LinkWorker2Worker(a NodeWorker, b NodeWorker) error
- type CnvContext
- type ConcreteJointExecutor
- type ConcreteJointWorker
- type ConcreteNodeExecutor
- func (cnh *ConcreteNodeExecutor) CleanUp() error
- func (cnh *ConcreteNodeExecutor) Count() int
- func (cnh *ConcreteNodeExecutor) Execute(ctx CnvContext, inData map[string]interface{}) (map[string]interface{}, error)
- func (cnh *ConcreteNodeExecutor) ExecuteLoop(ctx CnvContext, inChan <-chan map[string]interface{}, ...) error
- func (cnh *ConcreteNodeExecutor) GetName() string
- func (cnh *ConcreteNodeExecutor) GetUniqueIdentifier() string
- type ConcreteNodeWorker
- type Conveyor
- func (cnv *Conveyor) AddJointExecutor(jointExecutor JointExecutor) error
- func (cnv *Conveyor) AddJointExecutorAfterNode(jointExecutor JointExecutor, workerMode WorkerMode, workerType string) error
- func (cnv *Conveyor) AddJointWorker(joint JointWorker) error
- func (cnv *Conveyor) AddNodeExecutor(nodeExecutor NodeExecutor, workerMode WorkerMode, workerType string) error
- func (cnv *Conveyor) AddNodeExecutorAfterJoint(nodeExecutor NodeExecutor, workerMode WorkerMode, workerType string) error
- func (cnv *Conveyor) AddNodeWorker(worker NodeWorker, toLink bool) error
- func (cnv *Conveyor) Done() <-chan struct{}
- func (cnv *Conveyor) EnableProgress(expectedDuration time.Duration) *Conveyor
- func (cnv *Conveyor) GetConveyorContext() CnvContext
- func (cnv *Conveyor) GetLastWorker() (NodeWorker, error)
- func (cnv *Conveyor) Logs() <-chan Message
- func (cnv *Conveyor) MarkCurrentState(state string) error
- func (cnv *Conveyor) Progress() <-chan float64
- func (cnv *Conveyor) SetCustomContext(ctx CnvContext) *Conveyor
- func (cnv *Conveyor) SetID(id string) *Conveyor
- func (cnv *Conveyor) SetLifeCycleHandler(lch LifeCycleHandler) *Conveyor
- func (cnv *Conveyor) SetTimeout(timeout time.Duration) *Conveyor
- func (cnv *Conveyor) Start() error
- func (cnv *Conveyor) Status() <-chan string
- func (cnv *Conveyor) Stop() time.Duration
- type CtxData
- type JointExecutor
- type JointWorker
- type JointWorkerPool
- func (jwp *JointWorkerPool) AddInputChannel(inChan chan map[string]interface{}) error
- func (jwp *JointWorkerPool) AddOutputChannel(outChan chan map[string]interface{}) error
- func (jwp *JointWorkerPool) CreateChannels(buffer int)
- func (jwp *JointWorkerPool) GetInputChannels() ([]chan map[string]interface{}, error)
- func (jwp *JointWorkerPool) GetOutputChannels() ([]chan map[string]interface{}, error)
- func (jwp *JointWorkerPool) SetInputChannels(inChans []chan map[string]interface{}) error
- func (jwp *JointWorkerPool) SetOutputChannels(outChans []chan map[string]interface{}) error
- func (jwp *JointWorkerPool) Start(ctx CnvContext) error
- func (jwp *JointWorkerPool) WaitAndStop() error
- type LifeCycleHandler
- type LocalLifeCycleHandler
- type Message
- type NodeExecutor
- type NodeWorker
- type OperationNode
- type OperationWorkerPool
- func (fwp *OperationWorkerPool) CreateChannels(buffer int)
- func (fwp *OperationWorkerPool) GetInputChannel() (chan map[string]interface{}, error)
- func (fwp *OperationWorkerPool) GetOutputChannel() (chan map[string]interface{}, error)
- func (fwp *OperationWorkerPool) SetInputChannel(inChan chan map[string]interface{}) error
- func (fwp *OperationWorkerPool) SetOutputChannel(outChan chan map[string]interface{}) error
- func (fwp *OperationWorkerPool) Start(ctx CnvContext) error
- func (fwp *OperationWorkerPool) WaitAndStop(ctx CnvContext) error
- func (fwp *OperationWorkerPool) WorkerType() string
- type ProgressUpdater
- type ReplicateJoint
- type SinkWorkerPool
- func (swp *SinkWorkerPool) CreateChannels(buffer int)
- func (swp *SinkWorkerPool) GetInputChannel() (chan map[string]interface{}, error)
- func (swp *SinkWorkerPool) GetOutputChannel() (chan map[string]interface{}, error)
- func (swp *SinkWorkerPool) SetInputChannel(inChan chan map[string]interface{}) error
- func (swp *SinkWorkerPool) SetOutputChannel(outChan chan map[string]interface{}) error
- func (swp *SinkWorkerPool) Start(ctx CnvContext) error
- func (swp *SinkWorkerPool) WaitAndStop(ctx CnvContext) error
- func (swp *SinkWorkerPool) WorkerType() string
- type SourceWorkerPool
- func (swp *SourceWorkerPool) GetInputChannel() (chan map[string]interface{}, error)
- func (swp *SourceWorkerPool) GetOutputChannel() (chan map[string]interface{}, error)
- func (swp *SourceWorkerPool) SetInputChannel(inChan chan map[string]interface{}) error
- func (swp *SourceWorkerPool) SetOutputChannel(outChan chan map[string]interface{}) error
- func (swp *SourceWorkerPool) Start(ctx CnvContext) error
- func (swp *SourceWorkerPool) WaitAndStop(ctx CnvContext) error
- func (swp *SourceWorkerPool) WorkerType() string
- type StateUpdater
- type WPool
- type WorkerMode
Constants ¶
const ( // StatusPreparing status is used to mark a conveyor to be in "preparing" state StatusPreparing = "preparing" // StateStarted status is used to mark a conveyor to be in "started" state StateStarted = "started" // StateToKill status is used to mark a conveyor has been setup "to be killed", but isn't yet dead StateToKill = "toKill" // StateKilled status is used to mark a conveyor as "successfully killed" StateKilled = "killed" // StateFinished status is used to mark a conveyor as "successfully finished" StateFinished = "finished" // StateInternalError status is used to indicate that conveyor couldn't finish due to some internal error StateInternalError = "internalError" )
Valid States for a Conveyor
const ( // WorkerTypeSource constant WorkerTypeSource = "SOURCE_WORKER" // WorkerTypeOperation constant WorkerTypeOperation = "OPERATION_WORKER" // WorkerTypeSink constant WorkerTypeSink = "SINK_WORKER" // WorkerTypeJoint constant WorkerTypeJoint = "JOINT_WORKER" )
const ( // WorkerModeTransaction is the worker mode in which executor just needs to implement Execute(ctx) method // and doesn't need to handle worker's channel or shutdown of worker. // But executor should still monitor ctx.Done() to shutdown or cleanup/close any files/connections, etc that it opens. // This one helps keep the executor code leaner & simple // This mode is useful for most of the cases, including executors that do file I/O, database lookup, remote API call WorkerModeTransaction = WorkerMode(iota + 10) // WorkerModeLoop is the worker mode in which executor just needs to implement ExecuteLoop(ctx, inChan, outChan) method // and has to handle the copying of data from/to channels (except, closing them), executor will also need to ensure // that it monitors ctx.Done() to shutdown worker, in case of any error. // Needs more code, use only if you are ready to peek into how it works. // Some use cases are, where you can't fetch data on-demand with a function call. Eg. Running an API server as source WorkerModeLoop )
Variables ¶
var ( // ErrInvalidWorkerType error ErrInvalidWorkerType = errors.New("Invalid worker type. pick one from conveyor.WorkerTypeSource/conveyor.WorkerTypeOperation/conveyor.WorkerTypeSink") // ErrInvalidWorkerMode error ErrInvalidWorkerMode = errors.New("Invalid worker mode. pick either conveyor.WorkerModeTransaction or conveyor.WorkerModeLoop") // ErrNoNodesAvailable error ErrNoNodesAvailable = errors.New("Your action assumes presence of Node Executors in conveyor, but none were found") // ErrNoJointsAvailable error ErrNoJointsAvailable = errors.New("Your action assumes presence of Joint Executors in conveyor, but none were found") // ErrExecuteNotImplemented error ErrExecuteNotImplemented = errors.New("This executor doesn't implement Execute() method") // ErrExecuteLoopNotImplemented error ErrExecuteLoopNotImplemented = errors.New("This executor doesn't implement ExecuteLoop() method") // ErrInputChanDoesNotExist error ErrInputChanDoesNotExist = errors.New("input channel doesn't exist for this node") // ErrOutputChanDoesNotExist error ErrOutputChanDoesNotExist = errors.New("output channel doesn't exist for this node") // ErrSourceExhausted error ErrSourceExhausted = errors.New("Source executor is exhausted") // ErrSourceInternal error ErrSourceInternal = errors.New("Source executor internal error") // ErrFetchRejected error ErrFetchRejected = errors.New("Fetch executor rejected the transaction") // ErrFetchInternal error ErrFetchInternal = errors.New("Fetch executor internal error") // ErrSinkRejected error ErrSinkRejected = errors.New("Sink executor rejected data") // ErrSinkInternal error ErrSinkInternal = errors.New("Sink executor internal error") // ErrLessInputChannelsInJoint error ErrLessInputChannelsInJoint = errors.New("JointWorker doesn't have enough input channels") // ErrLessOutputChannelsInJoint error ErrLessOutputChannelsInJoint = errors.New("JointWorker doesn't have enough output channels") // ErrLifeCycleNotSupported error ErrLifeCycleNotSupported = errors.New("This conveyor instance isn't created with Life Cycle Support") )
var ( // ErrNoInputChannel error ErrNoInputChannel = errors.New("number of input channels is 0") // ErrNoOutputChannel error ErrNoOutputChannel = errors.New("number of output channels is 0") // ErrMultipleInputChannels error ErrMultipleInputChannels = errors.New("only one input channel can be replicated") // ErrOneToOneConnection error ErrOneToOneConnection = errors.New("replicate joint isn't needed for one-to one mapping, " + "you can just link the nodes directly") )
var ( // ErrEmptyConveyor error ErrEmptyConveyor = errors.New("conveyor is empty, no workers employed") )
Functions ¶
func LinkJointAfterNode ¶ added in v1.0.2
func LinkJointAfterNode(nw NodeWorker, jw JointWorker, index int) error
LinkJointAfterNode links JointWorker after NodeWorkers, maps input channel of joint worker on output channel of node worker
func LinkNodeAfterJoint ¶ added in v1.0.2
func LinkNodeAfterJoint(jw JointWorker, nw NodeWorker) error
LinkNodeAfterJoint links NodeWorkers after JointWorker, maps input channel of a b on output channel of a
func LinkWorker2Worker ¶
func LinkWorker2Worker(a NodeWorker, b NodeWorker) error
LinkWorker2Worker links two NodeWorkers, maps input channel of a b on output channel of a
Types ¶
type CnvContext ¶
type CnvContext interface { context.Context WithCancel() CnvContext WithTimeout(time.Duration) CnvContext Cancel() SendLog(int32, string, error) SendStatus(string) GetData() interface{} }
CnvContext is an interface, which is satisfied by CnvContext. This interface is primarily to enabling mocking for unit-testing.CnvContextAble Or may be, something fancy that you might want to do.
type ConcreteJointExecutor ¶ added in v1.0.1
type ConcreteJointExecutor struct {
Name string
}
ConcreteJointExecutor struct represents a concrete node structure
func (*ConcreteJointExecutor) Count ¶ added in v1.0.1
func (cjh *ConcreteJointExecutor) Count() int
Count returns the number of executors required for joint
func (*ConcreteJointExecutor) GetName ¶ added in v1.0.2
func (cjh *ConcreteJointExecutor) GetName() string
GetName returns the name of the executor
func (*ConcreteJointExecutor) GetUniqueIdentifier ¶ added in v1.0.2
func (cjh *ConcreteJointExecutor) GetUniqueIdentifier() string
GetUniqueIdentifier can be used to fetch a unique string identifying the executor
func (*ConcreteJointExecutor) InputCount ¶ added in v1.0.1
func (cjh *ConcreteJointExecutor) InputCount() int
InputCount returns the number of executors required
func (*ConcreteJointExecutor) OutputCount ¶ added in v1.0.1
func (cjh *ConcreteJointExecutor) OutputCount() int
OutputCount returns the number of executors required
type ConcreteJointWorker ¶
type ConcreteJointWorker struct { *WPool Executor JointExecutor }
ConcreteJointWorker to run different joints
type ConcreteNodeExecutor ¶ added in v1.0.1
type ConcreteNodeExecutor struct { Name string Data interface{} }
ConcreteNodeExecutor struct represents a concrete node structure, you should compose it into your node structures
func (*ConcreteNodeExecutor) CleanUp ¶ added in v1.0.1
func (cnh *ConcreteNodeExecutor) CleanUp() error
CleanUp does any cleanup if needed after executors are done
func (*ConcreteNodeExecutor) Count ¶ added in v1.0.1
func (cnh *ConcreteNodeExecutor) Count() int
Count returns the number of executors required
func (*ConcreteNodeExecutor) Execute ¶ added in v1.0.2
func (cnh *ConcreteNodeExecutor) Execute(ctx CnvContext, inData map[string]interface{}) (map[string]interface{}, error)
Execute should take a "map[string]interface{}" as input and returns a map[string]interface{}" as output Ideally it should process the input and either return a new map, or just add it's own keys to it, if we want to retain the data from previous node. This base implementation, just returns an error, so you need to override it with your own. Any struct may define both Execute & ExecuteLoop.Execute Execute will be used if mode is set to conveyor.WorkerModeTransaction
func (*ConcreteNodeExecutor) ExecuteLoop ¶ added in v1.0.2
func (cnh *ConcreteNodeExecutor) ExecuteLoop(ctx CnvContext, inChan <-chan map[string]interface{}, outChan chan<- map[string]interface{}) error
ExecuteLoop should take two "map[string]interface{}" channels. It is a more hands-on version of Execute() method, where you have to handle reading from input channel and writing to output channel, after processing on your own This base implementation, just returns an error, so you need to override it with your own. Any struct may define both Execute & ExecuteLoop. ExecuteLoop will be used if mode is set to conveyor.WorkerModeLoop
func (*ConcreteNodeExecutor) GetName ¶ added in v1.0.2
func (cnh *ConcreteNodeExecutor) GetName() string
GetName returns the name of the executor
func (*ConcreteNodeExecutor) GetUniqueIdentifier ¶ added in v1.0.2
func (cnh *ConcreteNodeExecutor) GetUniqueIdentifier() string
GetUniqueIdentifier can be used to fetch a unique string identifying the executor
type ConcreteNodeWorker ¶
type ConcreteNodeWorker struct { *WPool WorkerCount int Mode WorkerMode Executor NodeExecutor }
ConcreteNodeWorker to run different nodes
func (*ConcreteNodeWorker) CreateChannels ¶ added in v1.0.2
func (cnw *ConcreteNodeWorker) CreateChannels(buffer int)
CreateChannels creates channels for the worker
func (*ConcreteNodeWorker) WaitAndStop ¶ added in v1.0.2
func (cnw *ConcreteNodeWorker) WaitAndStop(ctx CnvContext) error
WaitAndStop ConcreteNodeWorker
type Conveyor ¶
type Conveyor struct { Name string // contains filtered or unexported fields }
Conveyor is base
func NewConveyor ¶ added in v1.0.2
NewConveyor creates a new Conveyor instance, with all options set to default values/implementations
func (*Conveyor) AddJointExecutor ¶ added in v1.0.2
func (cnv *Conveyor) AddJointExecutor(jointExecutor JointExecutor) error
AddJointExecutor creates a worker for a given executor (based on workerMode & workerType) And then links it to the last "Node" added to the conveyor, by creating and mapping connecting channels In case there was no node added previously, it skips the linking part
func (*Conveyor) AddJointExecutorAfterNode ¶ added in v1.0.2
func (cnv *Conveyor) AddJointExecutorAfterNode(jointExecutor JointExecutor, workerMode WorkerMode, workerType string) error
AddJointExecutorAfterNode creates a worker for a given executor (based on workerMode & workerType) And then links it to the last "Joint" added to the conveyor, by creating and mapping connecting channels In case there was no "Joint" added previously, it returns an error
func (*Conveyor) AddJointWorker ¶
func (cnv *Conveyor) AddJointWorker(joint JointWorker) error
AddJointWorker employs a new joint station to the conveyor
func (*Conveyor) AddNodeExecutor ¶ added in v1.0.2
func (cnv *Conveyor) AddNodeExecutor(nodeExecutor NodeExecutor, workerMode WorkerMode, workerType string) error
AddNodeExecutor creates a worker for a given executor (based on workerMode & workerType) And then links it to the last "Node" added to the conveyor, by creating and mapping connecting channels In case there was no node added previously, it skips the linking part
func (*Conveyor) AddNodeExecutorAfterJoint ¶ added in v1.0.2
func (cnv *Conveyor) AddNodeExecutorAfterJoint(nodeExecutor NodeExecutor, workerMode WorkerMode, workerType string) error
AddNodeExecutorAfterJoint creates a worker for a given executor (based on workerMode & workerType) And then links it to the last "Joint" added to the conveyor, by creating and mapping connecting channels In case there was no "Joint" added previously, it returns an error
func (*Conveyor) AddNodeWorker ¶ added in v1.0.2
func (cnv *Conveyor) AddNodeWorker(worker NodeWorker, toLink bool) error
AddNodeWorker employs a new worker station to the conveyor
func (*Conveyor) Done ¶ added in v1.0.2
func (cnv *Conveyor) Done() <-chan struct{}
Done returns the context.Done() channel of Conveyor
func (*Conveyor) EnableProgress ¶ added in v1.0.2
EnableProgress sets the expectedDuration of Conveyor to a given value. Also enables progress based on this value of expectedDuration Will have no effect, once you add your first node
func (*Conveyor) GetConveyorContext ¶ added in v1.0.2
func (cnv *Conveyor) GetConveyorContext() CnvContext
GetConveyorContext gives the conveyor's context object
func (*Conveyor) GetLastWorker ¶
func (cnv *Conveyor) GetLastWorker() (NodeWorker, error)
GetLastWorker returns the last added worker, or error if conveyor is empty
func (*Conveyor) MarkCurrentState ¶ added in v1.0.2
MarkCurrentState marks the current stage of conveyor using internal life-cycle handler interface
func (*Conveyor) SetCustomContext ¶ added in v1.0.2
func (cnv *Conveyor) SetCustomContext(ctx CnvContext) *Conveyor
SetCustomContext sets the conveyor's CnvContext interface to a given implementation Will have no effect, once you add your first node This method must be called before you call "SetTimeout()"
func (*Conveyor) SetID ¶ added in v1.0.2
SetID sets the id of Conveyor to a given string Will have no effect, once you add your first node
func (*Conveyor) SetLifeCycleHandler ¶ added in v1.0.2
func (cnv *Conveyor) SetLifeCycleHandler(lch LifeCycleHandler) *Conveyor
SetLifeCycleHandler sets the conveyor's LifeCycleHandler interface to a given implementation Will have no effect, once you add your first node
func (*Conveyor) SetTimeout ¶ added in v1.0.2
SetTimeout sets the timeout of Conveyor to a given value Will have no effect, once you add your first node If you change the context using "SetCustomContext()" after calling this method, timeout will get reset
type CtxData ¶
type CtxData struct { Name string // contains filtered or unexported fields }
CtxData stores the information that is stored inside a conveyor, useful for it's lifecycle.ConveyorData Any fields only useful for initialization shouldn't be here
type JointExecutor ¶ added in v1.0.1
type JointExecutor interface { GetName() string GetUniqueIdentifier() string ExecuteLoop(ctx CnvContext, inChan []chan map[string]interface{}, outChan []chan map[string]interface{}) error Count() int InputCount() int OutputCount() int }
JointExecutor interface is the interface that you need to implement in your own types of joints
type JointWorker ¶
type JointWorker interface { Start(ctx CnvContext) error WaitAndStop() error CreateChannels(int) SetInputChannels([]chan map[string]interface{}) error SetOutputChannels([]chan map[string]interface{}) error GetInputChannels() ([]chan map[string]interface{}, error) GetOutputChannels() ([]chan map[string]interface{}, error) AddInputChannel(chan map[string]interface{}) error AddOutputChannel(chan map[string]interface{}) error }
JointWorker interface binds to nodes that have the capability to fetch intermidiate data, and forward it to next node
func NewJointWorkerPool ¶
func NewJointWorkerPool(executor JointExecutor) JointWorker
NewJointWorkerPool creates a new OperationWorkerPool
type JointWorkerPool ¶
type JointWorkerPool struct { *ConcreteJointWorker // contains filtered or unexported fields }
JointWorkerPool struct provides the worker pool infra for Joint interface, that act as connections between nodes
func (*JointWorkerPool) AddInputChannel ¶
func (jwp *JointWorkerPool) AddInputChannel(inChan chan map[string]interface{}) error
AddInputChannel maps a slice of channels on the join't outupt channels
func (*JointWorkerPool) AddOutputChannel ¶
func (jwp *JointWorkerPool) AddOutputChannel(outChan chan map[string]interface{}) error
AddOutputChannel maps a slice of channels on the join't outupt channels
func (*JointWorkerPool) CreateChannels ¶ added in v1.0.2
func (jwp *JointWorkerPool) CreateChannels(buffer int)
CreateChannels creates channels for the joint worker
func (*JointWorkerPool) GetInputChannels ¶
func (jwp *JointWorkerPool) GetInputChannels() ([]chan map[string]interface{}, error)
GetInputChannels returns the input channel of Joint WorkerPool
func (*JointWorkerPool) GetOutputChannels ¶
func (jwp *JointWorkerPool) GetOutputChannels() ([]chan map[string]interface{}, error)
GetOutputChannels returns the output channel of Joint WorkerPool
func (*JointWorkerPool) SetInputChannels ¶
func (jwp *JointWorkerPool) SetInputChannels(inChans []chan map[string]interface{}) error
SetInputChannels updates the input channel of Joint WorkerPool
func (*JointWorkerPool) SetOutputChannels ¶
func (jwp *JointWorkerPool) SetOutputChannels(outChans []chan map[string]interface{}) error
SetOutputChannels updates the output channel of Joint WorkerPool
func (*JointWorkerPool) Start ¶
func (jwp *JointWorkerPool) Start(ctx CnvContext) error
Start JoinWorkerPool
func (*JointWorkerPool) WaitAndStop ¶
func (jwp *JointWorkerPool) WaitAndStop() error
WaitAndStop JointWorkerPool
type LifeCycleHandler ¶ added in v1.0.2
type LifeCycleHandler interface { ProgressUpdater StateUpdater }
LifeCycleHandler handles conveyor start/stop
type LocalLifeCycleHandler ¶ added in v1.0.2
type LocalLifeCycleHandler struct { }
type NodeExecutor ¶ added in v1.0.1
type NodeExecutor interface { GetName() string GetUniqueIdentifier() string ExecuteLoop(ctx CnvContext, inChan <-chan map[string]interface{}, outChan chan<- map[string]interface{}) error Execute(ctx CnvContext, inData map[string]interface{}) (map[string]interface{}, error) Count() int CleanUp() error }
NodeExecutor interface is the interface that you need to implement in your own types of nodes
type NodeWorker ¶
type NodeWorker interface { Start(ctx CnvContext) error WaitAndStop(ctx CnvContext) error CreateChannels(int) WorkerType() string SetInputChannel(chan map[string]interface{}) error SetOutputChannel(chan map[string]interface{}) error GetInputChannel() (chan map[string]interface{}, error) GetOutputChannel() (chan map[string]interface{}, error) }
NodeWorker interface binds to nodes that have the capability to fetch intermediate data, and forward it to next node
func NewOperationWorkerPool ¶ added in v1.0.2
func NewOperationWorkerPool(executor NodeExecutor, mode WorkerMode) NodeWorker
NewOperationWorkerPool creates a new OperationWorkerPool
func NewSinkWorkerPool ¶
func NewSinkWorkerPool(executor NodeExecutor, mode WorkerMode) NodeWorker
NewSinkWorkerPool creates a new SinkWorkerPool
func NewSourceWorkerPool ¶
func NewSourceWorkerPool(executor NodeExecutor, mode WorkerMode) NodeWorker
NewSourceWorkerPool creates a new SourceWorkerPool
type OperationNode ¶ added in v1.0.2
type OperationNode struct {
Pool *OperationWorkerPool
}
OperationNode structue
type OperationWorkerPool ¶ added in v1.0.2
type OperationWorkerPool struct { *ConcreteNodeWorker // contains filtered or unexported fields }
OperationWorkerPool struct provides the worker pool infra for Operation interface
func (*OperationWorkerPool) CreateChannels ¶ added in v1.0.2
func (fwp *OperationWorkerPool) CreateChannels(buffer int)
CreateChannels creates channels for the Operation WorkerPool
func (*OperationWorkerPool) GetInputChannel ¶ added in v1.0.2
func (fwp *OperationWorkerPool) GetInputChannel() (chan map[string]interface{}, error)
GetInputChannel returns the input channel of Operation WorkerPool
func (*OperationWorkerPool) GetOutputChannel ¶ added in v1.0.2
func (fwp *OperationWorkerPool) GetOutputChannel() (chan map[string]interface{}, error)
GetOutputChannel returns the output channel of Operation WorkerPool
func (*OperationWorkerPool) SetInputChannel ¶ added in v1.0.2
func (fwp *OperationWorkerPool) SetInputChannel(inChan chan map[string]interface{}) error
SetInputChannel updates the input channel of Operation WorkerPool
func (*OperationWorkerPool) SetOutputChannel ¶ added in v1.0.2
func (fwp *OperationWorkerPool) SetOutputChannel(outChan chan map[string]interface{}) error
SetOutputChannel updates the output channel of Operation WorkerPool
func (*OperationWorkerPool) Start ¶ added in v1.0.2
func (fwp *OperationWorkerPool) Start(ctx CnvContext) error
Start Operation Worker Pool
func (*OperationWorkerPool) WaitAndStop ¶ added in v1.0.2
func (fwp *OperationWorkerPool) WaitAndStop(ctx CnvContext) error
WaitAndStop OperationWorkerPool
func (*OperationWorkerPool) WorkerType ¶ added in v1.0.2
func (fwp *OperationWorkerPool) WorkerType() string
WorkerType returns the type of worker
type ProgressUpdater ¶ added in v1.0.2
type ReplicateJoint ¶ added in v1.0.2
type ReplicateJoint struct { *ConcreteJointExecutor OutChanCount int }
ReplicateJoint is a plumbing joint that connects a source/operation node to multiple operation/sink nodes
func NewReplicateJoint ¶ added in v1.0.2
func NewReplicateJoint(name string, outChanCount int) (*ReplicateJoint, error)
NewReplicateJoint creates a new joint to replicate same data to multiple channels
func NewReplicateJointWithContext ¶ added in v1.0.2
func NewReplicateJointWithContext(cnvCtx CnvContext, name string, outChanCount int) (*ReplicateJoint, error)
NewReplicateJointWithContext creates a new joint (with context) to replicate same data to multiple channels
func (*ReplicateJoint) ExecuteLoop ¶ added in v1.0.2
func (rj *ReplicateJoint) ExecuteLoop(cnvCtx CnvContext, inChans []chan map[string]interface{}, outChans []chan map[string]interface{}) error
ExecuteLoop method produces data for other nodes from inputChannel file, and broadcasts copies of this data on all of it's output channels
func (*ReplicateJoint) OutputCount ¶ added in v1.0.2
func (rj *ReplicateJoint) OutputCount() int
OutputCount returns the number of executors required
type SinkWorkerPool ¶
type SinkWorkerPool struct { *ConcreteNodeWorker // contains filtered or unexported fields }
SinkWorkerPool struct provides the worker pool infra for Sink interface
func (*SinkWorkerPool) CreateChannels ¶ added in v1.0.2
func (swp *SinkWorkerPool) CreateChannels(buffer int)
CreateChannels creates channels for the sink worker
func (*SinkWorkerPool) GetInputChannel ¶
func (swp *SinkWorkerPool) GetInputChannel() (chan map[string]interface{}, error)
GetInputChannel returns the input channel of Sink WorkerPool
func (*SinkWorkerPool) GetOutputChannel ¶
func (swp *SinkWorkerPool) GetOutputChannel() (chan map[string]interface{}, error)
GetOutputChannel returns the output channel of Sink WorkerPool
func (*SinkWorkerPool) SetInputChannel ¶
func (swp *SinkWorkerPool) SetInputChannel(inChan chan map[string]interface{}) error
SetInputChannel updates the input channel of Sink WorkerPool
func (*SinkWorkerPool) SetOutputChannel ¶
func (swp *SinkWorkerPool) SetOutputChannel(outChan chan map[string]interface{}) error
SetOutputChannel updates the output channel of Sink WorkerPool
func (*SinkWorkerPool) Start ¶
func (swp *SinkWorkerPool) Start(ctx CnvContext) error
Start Sink Worker Pool
func (*SinkWorkerPool) WaitAndStop ¶
func (swp *SinkWorkerPool) WaitAndStop(ctx CnvContext) error
WaitAndStop SinkWorkerPool
func (*SinkWorkerPool) WorkerType ¶
func (swp *SinkWorkerPool) WorkerType() string
WorkerType returns the type of worker
type SourceWorkerPool ¶
type SourceWorkerPool struct { *ConcreteNodeWorker // contains filtered or unexported fields }
SourceWorkerPool struct provides the worker pool infra for Source interface
func (*SourceWorkerPool) GetInputChannel ¶
func (swp *SourceWorkerPool) GetInputChannel() (chan map[string]interface{}, error)
GetInputChannel returns the input channel of Source WorkerPool
func (*SourceWorkerPool) GetOutputChannel ¶
func (swp *SourceWorkerPool) GetOutputChannel() (chan map[string]interface{}, error)
GetOutputChannel returns the output channel of Source WorkerPool
func (*SourceWorkerPool) SetInputChannel ¶
func (swp *SourceWorkerPool) SetInputChannel(inChan chan map[string]interface{}) error
SetInputChannel updates the input channel of Source WorkerPool
func (*SourceWorkerPool) SetOutputChannel ¶
func (swp *SourceWorkerPool) SetOutputChannel(outChan chan map[string]interface{}) error
SetOutputChannel updates the output channel of Source WorkerPool
func (*SourceWorkerPool) Start ¶
func (swp *SourceWorkerPool) Start(ctx CnvContext) error
Start Source Worker Pool
func (*SourceWorkerPool) WaitAndStop ¶
func (swp *SourceWorkerPool) WaitAndStop(ctx CnvContext) error
WaitAndStop SourceWorkerPool
func (*SourceWorkerPool) WorkerType ¶
func (swp *SourceWorkerPool) WorkerType() string
WorkerType returns the type of worker
type StateUpdater ¶ added in v1.0.2
type WorkerMode ¶ added in v1.0.2
type WorkerMode uint8
WorkerMode decides if worker would run in loop mode or single transaction mode