Documentation ¶
Overview ¶
Package fsm layers an erlang/akka style finite state machine abstraction on top of SWF, and facilitates modeling your workflows as FSMs. The FSM will be responsible for handling the decision tasks in your workflow that implicitly model it.
The FSM takes care of serializing/deserializing and threading a data model through the workflow history for you, as well as serialization/deserialization of any payloads in events your workflows recieve, as well as optionally sending the data model snapshots to kinesis, to facilitate a CQRS style application where the query models will be built off the Kinesis stream.
From http://www.erlang.org/doc/design_principles/fsm.html, a finite state machine, or FSM, can be described as a set of relations of the form:
State(S) x Event(E) -> Actions(A), State(S')
Substituting the relevant SWF/swf4go concepts, we get
(Your main data struct) x (an swf.HistoryEvent) -> (zero or more swf.Decisions), (A possibly updated main data struct)
See the http://godoc.org/github.com/sclasen/swfsm/fsm#example-FSM for a complete usage example.
Index ¶
- Constants
- func CloseDecisionIncompatableDecisionTypes() []string
- func CloseDecisionTypes() []string
- func GetTagsIfTaggable(data interface{}) []*string
- func NewHistorySegmentor(c *client) *historySegmentor
- func StartFSMWorkflowInput(serializer Serialization, data interface{}) *string
- type ActivityInfo
- type BoundedGoroutineDispatcher
- type CallingGoroutineDispatcher
- type CancellationInfo
- type ChildInfo
- type ClientSWFOps
- type ComposedDecider
- type ComposedDecisionInterceptor
- func (c *ComposedDecisionInterceptor) AfterDecision(decision *swf.PollForDecisionTaskOutput, ctx *FSMContext, outcome *Outcome)
- func (c *ComposedDecisionInterceptor) BeforeDecision(decision *swf.PollForDecisionTaskOutput, ctx *FSMContext, outcome *Outcome)
- func (c *ComposedDecisionInterceptor) BeforeTask(decision *swf.PollForDecisionTaskOutput)
- type Decider
- func AddDecision(decisionFn DecisionFunc) Decider
- func AddDecisions(signalFn MultiDecisionFunc) Decider
- func CancelWorkflow(details *string) Decider
- func CompleteWorkflow() Decider
- func DefaultDecider() Decider
- func FailWorkflow(details *string) Decider
- func NewComposedDecider(deciders ...Decider) Decider
- func OnActivityCanceled(activityName string, deciders ...Decider) Decider
- func OnActivityCompleted(activityName string, deciders ...Decider) Decider
- func OnActivityEvents(activityName string, eventTypes []string, deciders ...Decider) Decider
- func OnActivityFailed(activityName string, deciders ...Decider) Decider
- func OnActivityFailedTimedOutCanceled(activityName string, deciders ...Decider) Decider
- func OnActivityHeartbeatTimeout(activityName string, deciders ...Decider) Decider
- func OnActivityScheduleToCloseTimeout(activityName string, deciders ...Decider) Decider
- func OnActivityScheduleToStartTimeout(activityName string, deciders ...Decider) Decider
- func OnActivityStartToCloseTimeout(activityName string, deciders ...Decider) Decider
- func OnActivityStarted(activityName string, deciders ...Decider) Decider
- func OnActivityTimedOut(activityName string, deciders ...Decider) Decider
- func OnChildCompleted(deciders ...Decider) Decider
- func OnChildStartFailed(deciders ...Decider) Decider
- func OnChildStartFailedAlreadyRunning(deciders ...Decider) Decider
- func OnChildStartFailedAndNotAlreadyRunning(deciders ...Decider) Decider
- func OnChildStarted(deciders ...Decider) Decider
- func OnChildStartedOrAlreadyRunning(deciders ...Decider) Decider
- func OnContinueFailed(deciders ...Decider) Decider
- func OnData(predicate PredicateFunc, deciders ...Decider) Decider
- func OnDataUnless(predicate PredicateFunc, deciders ...Decider) Decider
- func OnExternalCancellationResponse(exitDecider Decider) Decider
- func OnExternalWorkflowExecutionCancelRequested(deciders ...Decider) Decider
- func OnRequestCancelExternalWorkflowExecutionFailed(deciders ...Decider) Decider
- func OnSignalFailed(signalName string, deciders ...Decider) Decider
- func OnSignalFailedAndNotUnknown(signalName string, deciders ...Decider) Decider
- func OnSignalReceived(signalName string, deciders ...Decider) Decider
- func OnSignalSent(signalName string, deciders ...Decider) Decider
- func OnSignalsReceived(signalNames []string, deciders ...Decider) Decider
- func OnStartTimerFailed(timer string, deciders ...Decider) Decider
- func OnStarted(deciders ...Decider) Decider
- func OnTimerCanceled(timer string, deciders ...Decider) Decider
- func OnTimerFired(timerId string, deciders ...Decider) Decider
- func OnUnknownWorkflowSignaled(signalName string, deciders ...Decider) Decider
- func OnWorkflowCancelRequested(deciders ...Decider) Decider
- func Stay() Decider
- func Transition(toState string) Decider
- func UpdateState(updateFunc StateFunc) Decider
- type DecisionErrorHandler
- type DecisionFunc
- type DecisionInterceptor
- func CloseWorkflowRemoveIncompatibleDecisionInterceptor() DecisionInterceptor
- func DedupeDecisions(decisionType string) DecisionInterceptor
- func DedupeWorkflowCancellations() DecisionInterceptor
- func DedupeWorkflowCloseDecisions() DecisionInterceptor
- func DedupeWorkflowCompletes() DecisionInterceptor
- func DedupeWorkflowFailures() DecisionInterceptor
- func ManagedContinuations(historySize int, workflowAgeInSec int, timerRetrySeconds int) DecisionInterceptor
- func ManagedContinuationsWithJitter(historySize int, maxSizeJitter int, workflowAgeInSec int, ...) DecisionInterceptor
- func MoveDecisionsToEnd(decisionType string) DecisionInterceptor
- func MoveWorkflowCloseDecisionsToEnd() DecisionInterceptor
- func NewComposedDecisionInterceptor(interceptors ...DecisionInterceptor) DecisionInterceptor
- func RemoveLowerPriorityDecisions(prioritizedDecisionTypes ...string) DecisionInterceptor
- func StartCancelInterceptor() DecisionInterceptor
- type DecisionTaskDispatcher
- type EventCorrelator
- func (a *EventCorrelator) ActivityInfo(h *swf.HistoryEvent) *ActivityInfo
- func (a *EventCorrelator) Attempts(h *swf.HistoryEvent) int
- func (a *EventCorrelator) AttemptsForActivity(info *ActivityInfo) int
- func (a *EventCorrelator) AttemptsForCancellation(info *CancellationInfo) int
- func (a *EventCorrelator) AttemptsForChild(info *ChildInfo) int
- func (a *EventCorrelator) AttemptsForSignal(signalInfo *SignalInfo) int
- func (a *EventCorrelator) CancellationInfo(h *swf.HistoryEvent) *CancellationInfo
- func (a *EventCorrelator) ChildInfo(h *swf.HistoryEvent) *ChildInfo
- func (a *EventCorrelator) Correlate(h *swf.HistoryEvent)
- func (a *EventCorrelator) RemoveCorrelation(h *swf.HistoryEvent)
- func (a *EventCorrelator) SignalInfo(h *swf.HistoryEvent) *SignalInfo
- func (a *EventCorrelator) TimerInfo(h *swf.HistoryEvent) *TimerInfo
- func (a *EventCorrelator) TimerScheduled(timerId string) bool
- func (a *EventCorrelator) Track(h *swf.HistoryEvent)
- type FSM
- func (f *FSM) AddCanceledState(state *FSMState)
- func (f *FSM) AddCompleteState(state *FSMState)
- func (f *FSM) AddCompleteStateWithHandler(state *FSMState, handler DecisionErrorHandler)
- func (f *FSM) AddErrorHandler(state string, handler DecisionErrorHandler)
- func (f *FSM) AddFailedState(state *FSMState)
- func (f *FSM) AddInitialState(state *FSMState)
- func (f *FSM) AddInitialStateWithHandler(state *FSMState, handler DecisionErrorHandler)
- func (f *FSM) AddState(state *FSMState)
- func (f *FSM) DefaultCanceledState() *FSMState
- func (f *FSM) DefaultCompleteState() *FSMState
- func (f *FSM) DefaultDecisionErrorHandler(ctx *FSMContext, event *swf.HistoryEvent, stateBeforeEvent interface{}, ...) (*Outcome, error)
- func (f *FSM) DefaultDecisionInterceptor() DecisionInterceptor
- func (f *FSM) DefaultFailedState() *FSMState
- func (f *FSM) DefaultTaskErrorHandler(decisionTask *swf.PollForDecisionTaskOutput, err error)
- func (f *FSM) Deserialize(serialized string, data interface{})
- func (f *FSM) EmptyDecisions() []*swf.Decision
- func (f *FSM) ErrorDeserializingStateData(decisionTask *swf.PollForDecisionTaskOutput, serializedStateData string, ...)
- func (f *FSM) ErrorFindingCorrelator(decisionTask *swf.PollForDecisionTaskOutput, err error)
- func (f *FSM) ErrorFindingStateData(decisionTask *swf.PollForDecisionTaskOutput, err error)
- func (f *FSM) ErrorMissingFSMState(decisionTask *swf.PollForDecisionTaskOutput, outcome Outcome)
- func (f *FSM) ErrorSerializingStateData(decisionTask *swf.PollForDecisionTaskOutput, outcome Outcome, ...)
- func (f *FSM) ErrorStateTick(decisionTask *swf.PollForDecisionTaskOutput, error *SerializedErrorState, ...) (*Outcome, error)
- func (f *FSM) EventData(event *swf.HistoryEvent, eventData interface{})
- func (f *FSM) Init()
- func (f *FSM) InitialState() string
- func (f *FSM) Serialize(data interface{}) string
- func (f *FSM) Start()
- func (f *FSM) StateSerializer() StateSerializer
- func (f *FSM) Stop()
- func (f *FSM) Tick(decisionTask *swf.PollForDecisionTaskOutput) (*FSMContext, []*swf.Decision, *SerializedState, error)
- type FSMClient
- type FSMContext
- func (f *FSMContext) ActivitiesInfo() map[string]*ActivityInfo
- func (f *FSMContext) ActivityInfo(h *swf.HistoryEvent) *ActivityInfo
- func (f *FSMContext) Attempts(h *swf.HistoryEvent) int
- func (f *FSMContext) CancelWorkflow(data interface{}, details *string) Outcome
- func (f *FSMContext) CompleteWorkflow(data interface{}, decisions ...*swf.Decision) Outcome
- func (f *FSMContext) CompleteWorkflowDecision(data interface{}) *swf.Decision
- func (f *FSMContext) ContinueDecider(data interface{}, decisions []*swf.Decision) Outcome
- func (f *FSMContext) ContinueWorkflow(data interface{}, decisions ...*swf.Decision) Outcome
- func (f *FSMContext) ContinueWorkflowDecision(continuedState string, data interface{}) *swf.Decision
- func (f *FSMContext) Correlator() *EventCorrelator
- func (f *FSMContext) Decide(h *swf.HistoryEvent, data interface{}, decider Decider) Outcome
- func (f *FSMContext) Decision(d *swf.Decision) []*swf.Decision
- func (f *FSMContext) Deserialize(serialized string, data interface{})
- func (f *FSMContext) EmptyDecisions() []*swf.Decision
- func (f *FSMContext) EventData(h *swf.HistoryEvent, data interface{})
- func (f *FSMContext) FailWorkflow(data interface{}, details *string) Outcome
- func (f *FSMContext) Goto(state string, data interface{}, decisions []*swf.Decision) Outcome
- func (f *FSMContext) InitialState() string
- func (f *FSMContext) Pass() Outcome
- func (f *FSMContext) Serialize(data interface{}) string
- func (f *FSMContext) Serializer() StateSerializer
- func (f *FSMContext) SignalInfo(h *swf.HistoryEvent) *SignalInfo
- func (f *FSMContext) SignalsInfo() map[string]*SignalInfo
- func (f *FSMContext) StateSerializer() StateSerializer
- func (f *FSMContext) Stay(data interface{}, decisions []*swf.Decision) Outcome
- type FSMErrorReporter
- type FSMState
- type FindInput
- type FindOutput
- type Finder
- type FuncInterceptor
- func (i *FuncInterceptor) AfterDecision(decision *swf.PollForDecisionTaskOutput, ctx *FSMContext, outcome *Outcome)
- func (i *FuncInterceptor) BeforeDecision(decision *swf.PollForDecisionTaskOutput, ctx *FSMContext, outcome *Outcome)
- func (i *FuncInterceptor) BeforeTask(decision *swf.PollForDecisionTaskOutput)
- type HistorySegment
- type HistorySegmentEvent
- type HistorySegmentState
- type HistorySegmentor
- type JSONStateSerializer
- type KinesisOps
- type KinesisReplication
- type KinesisReplicator
- type MultiDecisionFunc
- type NewGoroutineDispatcher
- type Outcome
- type PredicateFunc
- type ReplicationHandler
- type SWFOps
- type Serialization
- type SerializedActivityState
- type SerializedErrorState
- type SerializedState
- type SignalInfo
- type StartCancelPair
- type Stasher
- type StateFunc
- type StateSerializer
- type Taggable
- type TaskErrorHandler
- type TimerInfo
- type TypedFuncs
- func (t *TypedFuncs) Decider(decider interface{}) Decider
- func (t *TypedFuncs) DecisionFunc(decisionFunc interface{}) DecisionFunc
- func (t *TypedFuncs) MultiDecisionFunc(decisionFunc interface{}) MultiDecisionFunc
- func (t *TypedFuncs) PredicateFunc(stateFunc interface{}) PredicateFunc
- func (t *TypedFuncs) StateFunc(stateFunc interface{}) StateFunc
Examples ¶
Constants ¶
const ( FilterStatusAll = "ALL" // open + closed FilterStatusOpen = "OPEN" // open only FilterStatusOpenPriority = "OPEN_PRIORITY" // open (+ closed, only if open is totally empty) FilterStatusOpenPriorityWorkflow = "OPEN_PRIORITY_WORKFLOW" // open (+ closed, only if open not present workflow-by-workflow) FilterStatusClosed = "CLOSED" // closed only )
const ( StateMarker = "FSM.State" CorrelatorMarker = "FSM.Correlator" ErrorMarker = "FSM.Error" RepiarStateSignal = "FSM.RepairState" ContinueTimer = "FSM.ContinueWorkflow" ContinueSignal = "FSM.ContinueWorkflow" CompleteState = "complete" CanceledState = "canceled" FailedState = "failed" ErrorState = "error" //the FSM was not configured with a state named in an outcome. FSMErrorMissingState = "ErrorMissingFsmState" //the FSM encountered an erryor while serializaing stateData FSMErrorStateSerialization = "ErrorStateSerialization" //the FSM encountered an erryor while deserializaing stateData FSMErrorStateDeserialization = "ErrorStateDeserialization" //the FSM encountered an erryor while deserializaing stateData FSMErrorCorrelationDeserialization = "ErrorCorrelationDeserialization" //Signal sent when a Long Lived Worker Start() ActivityStartedSignal = "FSM.ActivityStarted" //Signal send when long Lived worker sends an update from Work() ActivityUpdatedSignal = "FSM.ActivityUpdated" )
constants used as marker names or signal names
Variables ¶
This section is empty.
Functions ¶
func CloseDecisionIncompatableDecisionTypes ¶
func CloseDecisionIncompatableDecisionTypes() []string
func CloseDecisionTypes ¶
func CloseDecisionTypes() []string
func GetTagsIfTaggable ¶
func GetTagsIfTaggable(data interface{}) []*string
func NewHistorySegmentor ¶
func NewHistorySegmentor(c *client) *historySegmentor
func StartFSMWorkflowInput ¶
func StartFSMWorkflowInput(serializer Serialization, data interface{}) *string
StartFSMWorkflowInput should be used to construct the input for any StartWorkflowExecutionRequests. This panics on errors cause really this should never err.
Types ¶
type ActivityInfo ¶
type ActivityInfo struct { ActivityId string *swf.ActivityType Input *string }
ActivityInfo holds the ActivityId and ActivityType for an activity
type BoundedGoroutineDispatcher ¶
type BoundedGoroutineDispatcher struct { NumGoroutines int // contains filtered or unexported fields }
BoundedGoroutineDispatcher is a DecisionTaskDispatcher that uses a bounded number of goroutines to run decision handlers.
func (*BoundedGoroutineDispatcher) DispatchTask ¶
func (b *BoundedGoroutineDispatcher) DispatchTask(task *swf.PollForDecisionTaskOutput, handler func(*swf.PollForDecisionTaskOutput))
DispatchTask calls sends the task on a channel that NumGoroutines goroutines are selecting on. Goroutines recieving a task run it in the same goroutine. note that this is unsynchronized as DispatchTask will only be called by the single poller goroutine.
type CallingGoroutineDispatcher ¶
type CallingGoroutineDispatcher struct{}
CallingGoroutineDispatcher is a DecisionTaskDispatcher that runs the decision handler in the polling goroutine
func (*CallingGoroutineDispatcher) DispatchTask ¶
func (*CallingGoroutineDispatcher) DispatchTask(task *swf.PollForDecisionTaskOutput, handler func(*swf.PollForDecisionTaskOutput))
DispatchTask calls the handler in the same goroutine.
type CancellationInfo ¶
CancellationInfo holds the Control data and workflow that was being canceled
type ChildInfo ¶
type ChildInfo struct { WorkflowId string Input *string *swf.WorkflowType }
ChildInfo holds the Input data and Workflow info for the child workflow being started
type ClientSWFOps ¶
type ClientSWFOps interface { ListOpenWorkflowExecutions(req *swf.ListOpenWorkflowExecutionsInput) (resp *swf.WorkflowExecutionInfos, err error) ListClosedWorkflowExecutions(req *swf.ListClosedWorkflowExecutionsInput) (resp *swf.WorkflowExecutionInfos, err error) GetWorkflowExecutionHistory(req *swf.GetWorkflowExecutionHistoryInput) (resp *swf.GetWorkflowExecutionHistoryOutput, err error) GetWorkflowExecutionHistoryPages(input *swf.GetWorkflowExecutionHistoryInput, fn func(p *swf.GetWorkflowExecutionHistoryOutput, lastPage bool) (shouldContinue bool)) error SignalWorkflowExecution(req *swf.SignalWorkflowExecutionInput) (resp *swf.SignalWorkflowExecutionOutput, err error) StartWorkflowExecution(req *swf.StartWorkflowExecutionInput) (resp *swf.StartWorkflowExecutionOutput, err error) TerminateWorkflowExecution(req *swf.TerminateWorkflowExecutionInput) (resp *swf.TerminateWorkflowExecutionOutput, err error) RequestCancelWorkflowExecution(req *swf.RequestCancelWorkflowExecutionInput) (resp *swf.RequestCancelWorkflowExecutionOutput, err error) }
type ComposedDecider ¶
type ComposedDecider struct {
// contains filtered or unexported fields
}
ComposedDecider can be used to build a decider out of a number of sub Deciders the sub deciders should return Pass when they dont wish to handle an event.
Example ¶
//to reduce boilerplate you can create reusable components to compose Deciders with, //that use functions that have the dataType of your FSM. typedFuncs := Typed(new(TestingType)) //for example. reduced boilerplate for the retry of failed activities. //first, you would have one of these typed DecisionFuncs for each activity decision type you create. fooActivityDecision := func(ctx *FSMContext, h *swf.HistoryEvent, data *TestingType) *swf.Decision { return &swf.Decision{ DecisionType: aws.String(swf.DecisionTypeScheduleActivityTask), ScheduleActivityTaskDecisionAttributes: &swf.ScheduleActivityTaskDecisionAttributes{ ActivityType: &swf.ActivityType{Name: aws.String("foo-activity"), Version: aws.String("1")}, }, } } barActivityDecision := func(ctx *FSMContext, h *swf.HistoryEvent, data *TestingType) *swf.Decision { return &swf.Decision{ DecisionType: aws.String(swf.DecisionTypeScheduleActivityTask), ScheduleActivityTaskDecisionAttributes: &swf.ScheduleActivityTaskDecisionAttributes{ ActivityType: &swf.ActivityType{Name: aws.String("bar-activity"), Version: aws.String("1")}, }, } } // optionally a type alias for your 'typed' decision fn. // if you dont do this the retryFailedActivities below will need to be // func(activityName string, activityFn interface{}) // instead of // func(activityName string, activityFn TestingTypeDecisionFunc) type TestingTypeDecisionFunc func(*FSMContext, *swf.HistoryEvent, *TestingType) *swf.Decision //now the retryFailedActivities function, which can be used for all activity funcs like the above. retryFailedActivities := func(activityName string, activityFn TestingTypeDecisionFunc) Decider { typedDecisionFn := typedFuncs.DecisionFunc(activityFn) return func(ctx *FSMContext, h *swf.HistoryEvent, data interface{}) Outcome { switch *h.EventType { case swf.EventTypeActivityTaskFailed, swf.EventTypeActivityTaskTimedOut, swf.EventTypeActivityTaskCanceled: if *ctx.ActivityInfo(h).Name == activityName { decisions := ctx.EmptyDecisions() retry := typedDecisionFn(ctx, h, data) decisions = append(decisions, retry) return ctx.Stay(data, decisions) } } return ctx.Pass() } } //now build a decider out of the parts. //the one thing you need to be careful of is having a unit test that executes the following //since the type checking can only be done at initialization at runtime here. decider := NewComposedDecider( retryFailedActivities("foo-activity", fooActivityDecision), retryFailedActivities("bar-activity", barActivityDecision), DefaultDecider(), ) decider(new(FSMContext), &swf.HistoryEvent{}, new(TestData))
Output:
func (*ComposedDecider) Decide ¶
func (c *ComposedDecider) Decide(ctx *FSMContext, h *swf.HistoryEvent, data interface{}) Outcome
Decide is the the Decider func for a ComposedDecider
type ComposedDecisionInterceptor ¶
type ComposedDecisionInterceptor struct {
// contains filtered or unexported fields
}
func (*ComposedDecisionInterceptor) AfterDecision ¶
func (c *ComposedDecisionInterceptor) AfterDecision(decision *swf.PollForDecisionTaskOutput, ctx *FSMContext, outcome *Outcome)
func (*ComposedDecisionInterceptor) BeforeDecision ¶
func (c *ComposedDecisionInterceptor) BeforeDecision(decision *swf.PollForDecisionTaskOutput, ctx *FSMContext, outcome *Outcome)
func (*ComposedDecisionInterceptor) BeforeTask ¶
func (c *ComposedDecisionInterceptor) BeforeTask(decision *swf.PollForDecisionTaskOutput)
type Decider ¶
type Decider func(*FSMContext, *swf.HistoryEvent, interface{}) Outcome
Decider decides an Outcome based on an event and the current data for an FSM. You can assert the interface{} parameter that is passed to the Decider as the type of the DataType field in the FSM. Alternatively, you can use TypedFuncs to create a typed decider to avoid having to do the assertion.
func AddDecision ¶
func AddDecision(decisionFn DecisionFunc) Decider
AddDecision adds a single decision to a ContinueDecider outcome
func AddDecisions ¶
func AddDecisions(signalFn MultiDecisionFunc) Decider
AddDecisions adds decisions to a ContinueDecider outcome
func CancelWorkflow ¶
CancelWorkflow cancels the workflow
func DefaultDecider ¶
func DefaultDecider() Decider
DefaultDecider is a 'catch-all' decider that simply logs the unhandled decision. You should place this or one like it as the last decider in your top level ComposableDecider.
func NewComposedDecider ¶
NewComposedDecider builds a Composed Decider from a list of sub Deciders. You can compose your fiinal composable decider from other composable deciders, but you should make sure that the final decider includes a 'catch-all' decider in last place you can use DefaultDecider() or your own.
func OnActivityCanceled ¶
OnActivityCanceled builds a composed decider that fires when a matching activity is canceled.
func OnActivityCompleted ¶
OnActivityCompleted builds a composed decider that fires when a matching activity completes.
func OnActivityEvents ¶
func OnActivityFailed ¶
OnActivityFailed builds a composed decider that fires when a matching activity fails.
func OnActivityFailedTimedOutCanceled ¶
OnActivityFailedTimedOutCanceled builds a composed decider that fires when a matching activity fails, times out, or is canceled.
func OnActivityStarted ¶
OnActivityStarted builds a composed decider that fires when a matching activity starts.
func OnActivityTimedOut ¶
OnActivityTimedOut builds a composed decider that fires when a matching activity times out.
func OnChildCompleted ¶
OnChildCompleted builds a composed decider that fires on EventTypeChildWorkflowExecutionCompleted.
func OnChildStartFailed ¶
OnChildStarted builds a composed decider that fires on EventTypeStartChildWorkflowExecutionFailed.
func OnChildStartFailedAndNotAlreadyRunning ¶
OnChildStartFailedAndNotAlreadyRunning builds a composed decider that fires on EventTypeStartChildWorkflowExecutionFailed and Cause != "WORKFLOW_ALREADY_RUNNING".
func OnChildStarted ¶
OnChildStarted builds a composed decider that fires on swf.EventTypeChildWorkflowExecutionStarted.
func OnChildStartedOrAlreadyRunning ¶
OnChildStartedOrAlreadyRunning builds a composed decider that fires on EventTypeChildWorkflowExecutionStarted OR EventTypeStartChildWorkflowExecutionFailed with Cause == "WORKFLOW_ALREADY_RUNNING".
func OnContinueFailed ¶
func OnData ¶
func OnData(predicate PredicateFunc, deciders ...Decider) Decider
OnData builds a composed decider that fires on when the PredicateFunc is satisfied.
func OnDataUnless ¶
func OnDataUnless(predicate PredicateFunc, deciders ...Decider) Decider
OnDataUnless builds a composed decider that fires on when the PredicateFunc is NOT satisfied.
func OnSignalFailed ¶
OnSignalFailed builds a composed decider that fires on when a matching signal fails.
func OnSignalFailedAndNotUnknown ¶
OnSignalFailedAndNotUnknown passes the event to OnSignalFailed only if the signal specified by signalName matches and the signalling was targeting a known Workflow ID.
func OnSignalReceived ¶
OnSignalReceived builds a composed decider that fires on when a matching signal is received.
func OnSignalSent ¶
OnSignalSent builds a composed decider that fires on when a matching signal is received.
func OnSignalsReceived ¶
OnSignalsReceived builds a composed decider that fires on when one of the matching signal is received.
func OnStartTimerFailed ¶
OnStartTimerFailed builds a composed decider that fires on EventTypeStartTimerFailed.
func OnStarted ¶
OnStarted builds a composed decider that fires on swf.EventTypeWorkflowExecutionStarted.
func OnTimerCanceled ¶
OnTimerCanceled builds a composed decider that fires on EventTypeTimerCanceled.
func OnTimerFired ¶
OnTimerFired builds a composed decider that fires on when a matching timer is fired.
func OnUnknownWorkflowSignaled ¶
OnUnknownWorkflowSignaled builds a composed decider that fires if the signal specified by signalName is signaled on an unknown Workflow ID.
func Transition ¶
Transition transitions the FSM to a new state, and terminates the decdier.
func UpdateState ¶
UpdateState allows you to modicy the state data without generating decisions.
type DecisionErrorHandler ¶
type DecisionErrorHandler func(ctx *FSMContext, event *swf.HistoryEvent, stateBeforeEvent interface{}, stateAfterError interface{}, err error) (*Outcome, error)
DecisionErrorHandler is the error handling contract for panics that occur in Deciders. If your DecisionErrorHandler does not return a non nil Outcome, any further attempt to process the decisionTask is abandoned and the task will time out.
type DecisionFunc ¶
type DecisionFunc func(ctx *FSMContext, h *swf.HistoryEvent, data interface{}) *swf.Decision
DecisionFunc is a building block for composable deciders that returns a decision.
type DecisionInterceptor ¶
type DecisionInterceptor interface { BeforeTask(decision *swf.PollForDecisionTaskOutput) BeforeDecision(decision *swf.PollForDecisionTaskOutput, ctx *FSMContext, outcome *Outcome) AfterDecision(decision *swf.PollForDecisionTaskOutput, ctx *FSMContext, outcome *Outcome) }
DecisionInterceptor allows manipulation of the decision task and the outcome at key points in the task lifecycle.
func CloseWorkflowRemoveIncompatibleDecisionInterceptor ¶
func CloseWorkflowRemoveIncompatibleDecisionInterceptor() DecisionInterceptor
CloseWorkflowRemoveIncompatibleDecisionInterceptor checks for incompatible decisions with a Complete workflow decision, and if found removes it from the outcome.
func DedupeDecisions ¶
func DedupeDecisions(decisionType string) DecisionInterceptor
DedupeDecisions returns an interceptor that executes after a decision and removes any duplicate decisions of the specified type from the outcome. Duplicates are removed from the beginning of the input list, so that the last decision of the specified type is the one that remains in the list.
e.g. An outcome with a list of decisions [a, a, b, a, c] where the type to dedupe was 'a' would result in an outcome with a list of decisions [b, a, c]
func DedupeWorkflowCancellations ¶
func DedupeWorkflowCancellations() DecisionInterceptor
DedupeWorkflowCancellations returns an interceptor that executes after a decision and removes any duplicate swf.DecisionTypeCancelWorkflowExecution decisions from the outcome. Duplicates are removed from the beginning of the input list, so that the last cancel decision is the one that remains in the list.
func DedupeWorkflowCloseDecisions ¶
func DedupeWorkflowCloseDecisions() DecisionInterceptor
DedupeWorkflowCloseDecisions returns an interceptor that executes after a decision and removes any duplicate workflow close decisions (cancel, complete, fail) from the outcome. Duplicates are removed from the beginning of the input list, so that the last failure decision is the one that remains in the list.
func DedupeWorkflowCompletes ¶
func DedupeWorkflowCompletes() DecisionInterceptor
DedupeWorkflowCompletes returns an interceptor that executes after a decision and removes any duplicate swf.DecisionTypeCompleteWorkflowExecution decisions from the outcome. Duplicates are removed from the beginning of the input list, so that the last complete decision is the one that remains in the list.
func DedupeWorkflowFailures ¶
func DedupeWorkflowFailures() DecisionInterceptor
DedupeWorkflowFailures returns an interceptor that executes after a decision and removes any duplicate swf.DecisionTypeFailWorkflowExecution decisions from the outcome. Duplicates are removed from the beginning of the input list, so that the last failure decision is the one that remains in the list.
func ManagedContinuations ¶
func ManagedContinuations(historySize int, workflowAgeInSec int, timerRetrySeconds int) DecisionInterceptor
ManagedContinuations is an interceptor that will handle most of the mechanics of automatically continuing workflows.
For workflows without persistent, heartbeating activities, it should do everything.
ContinueSignal: How to continue fsms with persistent activities. How it works FSM in steady+activity, listens for ContinueTimer. OnTimer, cancel activity, transition to continuing. In continuing, OnActivityCanceled send ContinueSignal. Interceptor handles ContinueSignal, if 0,0,0,0 Continue, else start ContinueTimer. In continuing, OnStarted re-starts the activity, transition back to steady.
func ManagedContinuationsWithJitter ¶
func ManagedContinuationsWithJitter(historySize int, maxSizeJitter int, workflowAgeInSec int, maxAgeJitterInSec int, timerRetrySeconds int) DecisionInterceptor
To avoid stampedes of workflows that are started at the same time being continued at the same time ManagedContinuationsWithJitter will schedule the initial continue randomly between workflowAgeInSec and workflowAgeInSec + maxAgeJitterInSec and will attempt to continue workflows with more than between historySize and historySize + maxSizeJitter events
func MoveDecisionsToEnd ¶
func MoveDecisionsToEnd(decisionType string) DecisionInterceptor
MoveDecisionsToEnd returns an interceptor that executes after a decision and moves any decisions of the specified type to the end of an outcome's decision list.
e.g. An outcome with a list of decisions [a, a, b, a, c] where the type to move was 'a' would result in an outcome with a list of decisions [b, c, a, a, a]
func MoveWorkflowCloseDecisionsToEnd ¶
func MoveWorkflowCloseDecisionsToEnd() DecisionInterceptor
MoveWorkflowCloseDecisionsToEnd returns an interceptor that executes after a decision and moves any workflow close decisions (complete, fail, cancel) to the end of an outcome's decision list.
Note: SWF responds with a 400 error if a workflow close decision is not the last decision in the list of decisions.
func NewComposedDecisionInterceptor ¶
func NewComposedDecisionInterceptor(interceptors ...DecisionInterceptor) DecisionInterceptor
func RemoveLowerPriorityDecisions ¶
func RemoveLowerPriorityDecisions(prioritizedDecisionTypes ...string) DecisionInterceptor
RemoveLowerPriorityDecisions returns an interceptor that executes after a decision and removes any lower priority decisions from an outcome if a higher priority decision exists. The decisionTypes passed to this function should be listed in highest to lowest priority order.
e.g. An outcome with a list of decisions [a, a, b, a, c] where the priority was a > b > c would return [a, a, a]
func StartCancelInterceptor ¶
func StartCancelInterceptor() DecisionInterceptor
type DecisionTaskDispatcher ¶
type DecisionTaskDispatcher interface {
DispatchTask(*swf.PollForDecisionTaskOutput, func(*swf.PollForDecisionTaskOutput))
}
DecisionTaskDispatcher is used by the FSM machinery to
func GoroutinePerWorkflowDispatcher ¶
func GoroutinePerWorkflowDispatcher(maxPendingTasks int) DecisionTaskDispatcher
GoroutinePerWorkflowDispatcher allows a single goroutine per workflow execution (RunID) to run at a time. Tasks are queued for each workflow execution. Any workflow execution with maxPendingTasks can cause DispatchTask to block until at least one of them gets handled.
type EventCorrelator ¶
type EventCorrelator struct { Activities map[string]*ActivityInfo // schedueledEventId -> info ActivityAttempts map[string]int // activityId -> attempts Signals map[string]*SignalInfo // schedueledEventId -> info SignalAttempts map[string]int // workflowId + signalName -> attempts Timers map[string]*TimerInfo // startedEventId -> info Cancellations map[string]*CancellationInfo // schedueledEventId -> info CancelationAttempts map[string]int // workflowId -> attempts Children map[string]*ChildInfo // initiatedEventID -> info ChildrenAttempts map[string]int // workflowID -> attempts Serializer StateSerializer `json:"-"` }
EventCorrelator is a serialization-friendly struct that is automatically managed by the FSM machinery It tracks signal and activity correlation info, so you know how to react when an event that signals the end of an activity or signal hits your Decider. This is missing from the SWF api. Activities and Signals are string instead of int64 beacuse json.
func (*EventCorrelator) ActivityInfo ¶
func (a *EventCorrelator) ActivityInfo(h *swf.HistoryEvent) *ActivityInfo
ActivityInfo returns the ActivityInfo that is correlates with a given event. The HistoryEvent is expected to be of type EventTypeActivityTaskCompleted,EventTypeActivityTaskFailed,EventTypeActivityTaskTimedOut.
func (*EventCorrelator) Attempts ¶
func (a *EventCorrelator) Attempts(h *swf.HistoryEvent) int
func (*EventCorrelator) AttemptsForActivity ¶
func (a *EventCorrelator) AttemptsForActivity(info *ActivityInfo) int
AttemptsForActivity returns the number of times a given activity has been attempted. It will return 0 if the activity has never failed, has been canceled, or has been completed successfully
func (*EventCorrelator) AttemptsForCancellation ¶
func (a *EventCorrelator) AttemptsForCancellation(info *CancellationInfo) int
AttemptsForCancellation returns the number of times a given signal has been attempted. It will return 0 if the signal has never failed, or has been completed successfully
func (*EventCorrelator) AttemptsForChild ¶
func (a *EventCorrelator) AttemptsForChild(info *ChildInfo) int
AttemptsForCancellation returns the number of times a given signal has been attempted. It will return 0 if the signal has never failed, or has been completed successfully
func (*EventCorrelator) AttemptsForSignal ¶
func (a *EventCorrelator) AttemptsForSignal(signalInfo *SignalInfo) int
AttemptsForSignal returns the number of times a given signal has been attempted. It will return 0 if the signal has never failed, or has been completed successfully
func (*EventCorrelator) CancellationInfo ¶
func (a *EventCorrelator) CancellationInfo(h *swf.HistoryEvent) *CancellationInfo
func (*EventCorrelator) ChildInfo ¶
func (a *EventCorrelator) ChildInfo(h *swf.HistoryEvent) *ChildInfo
func (*EventCorrelator) Correlate ¶
func (a *EventCorrelator) Correlate(h *swf.HistoryEvent)
Correlate establishes a mapping of eventId to ActivityType. The HistoryEvent is expected to be of type EventTypeActivityTaskScheduled.
func (*EventCorrelator) RemoveCorrelation ¶
func (a *EventCorrelator) RemoveCorrelation(h *swf.HistoryEvent)
RemoveCorrelation gcs a mapping of eventId to ActivityType. The HistoryEvent is expected to be of type EventTypeActivityTaskCompleted,EventTypeActivityTaskFailed,EventTypeActivityTaskTimedOut.
func (*EventCorrelator) SignalInfo ¶
func (a *EventCorrelator) SignalInfo(h *swf.HistoryEvent) *SignalInfo
SignalInfo returns the SignalInfo that is correlates with a given event. The HistoryEvent is expected to be of type EventTypeSignalExternalWorkflowExecutionFailed,EventTypeExternalWorkflowExecutionSignaled.
func (*EventCorrelator) TimerInfo ¶
func (a *EventCorrelator) TimerInfo(h *swf.HistoryEvent) *TimerInfo
func (*EventCorrelator) TimerScheduled ¶
func (a *EventCorrelator) TimerScheduled(timerId string) bool
func (*EventCorrelator) Track ¶
func (a *EventCorrelator) Track(h *swf.HistoryEvent)
Track will add or remove entries based on the EventType. A new entry is added when there is a new ActivityTask, or an entry is removed when the ActivityTask is terminating.
type FSM ¶
type FSM struct { //Name of the fsm. Used when emitting logs. Should probably be set to the name of the workflow associated with the fsm. Name string // Domain of the workflow associated with the FSM. Domain string // TaskList that the underlying poller will poll for decision tasks. TaskList string // Identity used in PollForDecisionTaskRequests, can be empty. Identity string // Client used to make SWF api requests. SWF SWFOps // Strategy for replication of state. Events may be delivered out of order. ReplicationHandler ReplicationHandler // DataType of the data struct associated with this FSM. // The data is automatically peristed to and loaded from workflow history by the FSM. DataType interface{} // Serializer used to serialize/deserialise fsm state data to/from workflow history. Serializer StateSerializer // Serializer used to serialize/deserialise in json the fsm managed marker recorded events to/from workflow history. SystemSerializer StateSerializer //PollerShutdownManager is used when the FSM is managing the polling ShutdownManager *poller.ShutdownManager //PollerCount is the number of DecisionTaskPollers to start when the FSM is started. //Default 1, if you increase this, be sure your DecisionTaskDispatcher is goroutine-safe. PollerCount int //DecisionTaskDispatcher determines the concurrency strategy for processing tasks in your fsm DecisionTaskDispatcher DecisionTaskDispatcher // DecisionInterceptor fsm will call BeforeDecision/AfterDecision. If unset // will use DefaultDecisionInterceptor. DecisionInterceptor DecisionInterceptor //DecisionErrorHandler is called whenever there is a panic in your decider. //if it returns a nil *Outcome, the attempt to handle the DecisionTask is abandoned. //fsm will then mark the workflow as being in error, by recording 3 markers. state, correlator and error //the error marker contains an ErrorState which tracks the range of unprocessed events since the error occurred. //on subsequent decision tasks if the fsm detects an error state, it will get the ErrorEvent from the ErrorState //and call the DecisionErrorHandler again. // //If there are errors here a new ErrorMarker with the increased range of unprocessed events //will be recorded. //If there is a good outcome, then we use that as the starting point from which to grab and Decide on the range of unprocessed //events. If this works out fine, we then process the initiating decisionTask range of events. DecisionErrorHandler DecisionErrorHandler // TaskErrorHandler is called when an error occurs // outside of the Decider machinery. When this handler is called the decision // task has been abandoned and the task will timeout without any further intervention. // // If unset, the DefaultTaskErrorHandler will be used. // If more "cleanup" is desired, set this field with a custom TaskErrorHandler. TaskErrorHandler TaskErrorHandler //FSMErrorReporter is called whenever there is an error within the FSM, usually indicating bad state or configuration of your FSM. FSMErrorReporter FSMErrorReporter //AllowPanics is mainly for testing, it should be set to false in production. //when true, instead of recovering from panics in deciders, it allows them to propagate. AllowPanics bool // Logger is used for output on a FSM. If not set, will use log.Log Logger StdLogger // contains filtered or unexported fields }
FSM models the decision handling logic a workflow in SWF
Example ¶
// create with swf.NewClient var client *swf.SWF // data type that will be managed by the type StateData struct { Message string `json:"message,omitempty"` Count int `json:"count,omitempty"` } //event type that will be signalled to the FSM with signal name "hello" type Hello struct { Message string `json:"message,omitempty"` } //This is an example of building Deciders without using decider composition. //the FSM we will create will oscillate between 2 states, //waitForSignal -> will wait till the workflow is started or signalled, and update the StateData based on the Hello message received, set a timer, and transition to waitForTimer //waitForTimer -> will wait till the timer set by waitForSignal fires, and will signal the workflow with a Hello message, and transition to waitFotSignal waitForSignal := func(f *FSMContext, h *swf.HistoryEvent, d *StateData) Outcome { decisions := f.EmptyDecisions() switch *h.EventType { case swf.EventTypeWorkflowExecutionStarted, swf.EventTypeWorkflowExecutionSignaled: if *h.EventType == swf.EventTypeWorkflowExecutionSignaled && *h.WorkflowExecutionSignaledEventAttributes.SignalName == "hello" { hello := &Hello{} f.EventData(h, &Hello{}) d.Count++ d.Message = hello.Message } timeoutSeconds := "5" //swf uses stringy numbers in many places timerDecision := &swf.Decision{ DecisionType: S(swf.DecisionTypeStartTimer), StartTimerDecisionAttributes: &swf.StartTimerDecisionAttributes{ StartToFireTimeout: S(timeoutSeconds), TimerId: S("timeToSignal"), }, } decisions = append(decisions, timerDecision) return f.Goto("waitForTimer", d, decisions) } //if the event was unexpected just stay here return f.Stay(d, decisions) } waitForTimer := func(f *FSMContext, h *swf.HistoryEvent, d *StateData) Outcome { decisions := f.EmptyDecisions() switch *h.EventType { case swf.EventTypeTimerFired: //every time the timer fires, signal the workflow with a Hello message := strconv.FormatInt(time.Now().Unix(), 10) signalInput := &Hello{message} signalDecision := &swf.Decision{ DecisionType: S(swf.DecisionTypeSignalExternalWorkflowExecution), SignalExternalWorkflowExecutionDecisionAttributes: &swf.SignalExternalWorkflowExecutionDecisionAttributes{ SignalName: S("hello"), Input: S(f.Serialize(signalInput)), RunId: f.RunId, WorkflowId: f.WorkflowId, }, } decisions = append(decisions, signalDecision) return f.Goto("waitForSignal", d, decisions) } //if the event was unexpected just stay here return f.Stay(d, decisions) } //These 2 deciders are the same as the ones above, but use composable decider bits. typed := Typed(new(StateData)) updateState := typed.StateFunc(func(f *FSMContext, h *swf.HistoryEvent, d *StateData) { hello := &Hello{} f.EventData(h, &Hello{}) d.Count++ d.Message = hello.Message }) setTimer := typed.DecisionFunc(func(f *FSMContext, h *swf.HistoryEvent, d *StateData) *swf.Decision { timeoutSeconds := "5" //swf uses stringy numbers in many places return &swf.Decision{ DecisionType: S(swf.DecisionTypeStartTimer), StartTimerDecisionAttributes: &swf.StartTimerDecisionAttributes{ StartToFireTimeout: S(timeoutSeconds), TimerId: S("timeToSignal"), }, } }) sendSignal := typed.DecisionFunc(func(f *FSMContext, h *swf.HistoryEvent, d *StateData) *swf.Decision { message := strconv.FormatInt(time.Now().Unix(), 10) signalInput := &Hello{message} return &swf.Decision{ DecisionType: S(swf.DecisionTypeSignalExternalWorkflowExecution), SignalExternalWorkflowExecutionDecisionAttributes: &swf.SignalExternalWorkflowExecutionDecisionAttributes{ SignalName: S("hello"), Input: S(f.Serialize(signalInput)), RunId: f.RunId, WorkflowId: f.WorkflowId, }, } }) waitForSignalComposedDecider := NewComposedDecider( OnStarted(UpdateState(updateState), AddDecision(setTimer), Transition("waitForTimer")), OnSignalReceived("hello", UpdateState(updateState), AddDecision(setTimer), Transition("waitForTimer")), DefaultDecider(), ) waitForTimerComposedDecider := NewComposedDecider( OnTimerFired("timeToSignal", AddDecision(sendSignal), Transition("waitForSignal")), DefaultDecider(), ) //create the FSMState by passing the decider function through TypedDecider(), //which lets you use d *StateData rather than d interface{} in your decider. waitForSignalState := &FSMState{Name: "waitForSignal", Decider: typed.Decider(waitForSignal)} waitForTimerState := &FSMState{Name: "waitForTimer", Decider: typed.Decider(waitForTimer)} //or with the composed deciders waitForSignalState = &FSMState{Name: "waitForSignal", Decider: waitForSignalComposedDecider} waitForTimerState = &FSMState{Name: "waitForTimer", Decider: waitForTimerComposedDecider} //wire it up in an fsm fsm := &FSM{ Name: "example-fsm", SWF: client, DataType: StateData{}, Domain: "exaple-swf-domain", TaskList: "example-decision-task-list-to-poll", Serializer: &JSONStateSerializer{}, } //add states to FSM fsm.AddInitialState(waitForSignalState) fsm.AddState(waitForTimerState) //start it up! fsm.Start() //To start workflows using this fsm client.StartWorkflowExecution(&swf.StartWorkflowExecutionInput{ Domain: S("exaple-swf-domain"), WorkflowId: S("your-id"), //you will have previously regiestered a WorkflowType that this FSM will work. WorkflowType: &swf.WorkflowType{Name: S("the-name"), Version: S("the-version")}, Input: StartFSMWorkflowInput(fsm, &StateData{Count: 0, Message: "starting message"}), })
Output:
func (*FSM) AddCanceledState ¶
AddCanceledState adds a state to the FSM and uses it as the final state of a workflow. It will only receive events if you returned FSMContext.CancelWorkflow(...) and the workflow was unable to cancel.
func (*FSM) AddCompleteState ¶
AddCompleteState adds a state to the FSM and uses it as the final state of a workflow. It will only receive events if you returned FSMContext.Complete(...) and the workflow was unable to complete.
func (*FSM) AddCompleteStateWithHandler ¶
func (f *FSM) AddCompleteStateWithHandler(state *FSMState, handler DecisionErrorHandler)
AddCompleteStateWithHandler adds a state to the FSM and uses it as the final state of a workflow. it will only receive events if you returned FSMContext.Complete(...) and the workflow was unable to complete. It also adds a DecisionErrorHandler to the state.
func (*FSM) AddErrorHandler ¶
func (f *FSM) AddErrorHandler(state string, handler DecisionErrorHandler)
AddErrorHandler adds a DecisionErrorHandler to a state in the FSM.
func (*FSM) AddFailedState ¶
AddFailedState adds a state to the FSM and uses it as the final state of a workflow. It will only receive events if you returned FSMContext.FailWorkflow(...) and the workflow was unable to fail.
func (*FSM) AddInitialState ¶
AddInitialState adds a state to the FSM and uses it as the initial state when a workflow execution is started.
func (*FSM) AddInitialStateWithHandler ¶
func (f *FSM) AddInitialStateWithHandler(state *FSMState, handler DecisionErrorHandler)
AddInitialStateWithHandler adds a state to the FSM and uses it as the initial state when a workflow execution is started. it uses the FSM DefaultDecisionErrorHandler, which defaults to FSM.DefaultDecisionErrorHandler if unset.
func (*FSM) DefaultCanceledState ¶
DefaultCanceledState is the canceled state used in an FSM if one has not been set. It simply responds with a CancelWorkflow which attempts to Cancel the workflow. This state will only get events if you previously attempted to cancel the workflow and it failed.
func (*FSM) DefaultCompleteState ¶
DefaultCompleteState is the complete state used in an FSM if one has not been set. It simply responds with a CompleteDecision which attempts to Complete the workflow. This state will only get events if you previously attempted to complete the workflow and it failed.
func (*FSM) DefaultDecisionErrorHandler ¶
func (f *FSM) DefaultDecisionErrorHandler(ctx *FSMContext, event *swf.HistoryEvent, stateBeforeEvent interface{}, stateAfterError interface{}, err error) (*Outcome, error)
DefaultDecisionErrorHandler is the default DecisionErrorHandler that is used if a handler is not set on the FSM or a handler is not associated with the current state. This default handler simply logs the error and the decision task will timeout.
func (*FSM) DefaultDecisionInterceptor ¶
func (f *FSM) DefaultDecisionInterceptor() DecisionInterceptor
DefaultDecisionInterceptor is an interceptor that handles removing duplicate close decisions, moving close decisions to the end of the decision list for an outcome, and making sure the highest priority close decision is the one returned to SWF.
Close decision types in priority order are: swf.DecisionTypeFailWorkflowExecution swf.DecisionTypeCompleteWorkflowExecution swf.DecisionTypeCancelWorkflowExecution
func (*FSM) DefaultFailedState ¶
DefaultFailedState is the failed state used in an FSM if one has not been set. It simply responds with a FailWorkflow which attempts to Fail the workflow. This state will only get events if you previously attempted to fail the workflow and the call failed.
func (*FSM) DefaultTaskErrorHandler ¶
func (f *FSM) DefaultTaskErrorHandler(decisionTask *swf.PollForDecisionTaskOutput, err error)
DefaultTaskErrorHandler is the default TaskErrorHandler that is used if a TaskErrorHandler is not set on this FSM. DefaultTaskErrorHandler simply logs the error. With no further intervention the decision task will timeout.
func (*FSM) Deserialize ¶
Deserialize uses the FSM.Serializer to deserialize data from a string. If there is an error in deserialization this func will panic, so this should usually only be used inside Deciders where the panics are recovered and proper errors are recorded in the workflow.
func (*FSM) EmptyDecisions ¶
EmptyDecisions is a helper method to give you an empty decisions array for use in your Deciders.
func (*FSM) ErrorDeserializingStateData ¶
func (f *FSM) ErrorDeserializingStateData(decisionTask *swf.PollForDecisionTaskOutput, serializedStateData string, err error)
ErrorDeserializingStateData is part of the FSM implementation of FSMErrorReporter
func (*FSM) ErrorFindingCorrelator ¶
func (f *FSM) ErrorFindingCorrelator(decisionTask *swf.PollForDecisionTaskOutput, err error)
ErrorFindingCorrelator is part of the FSM implementation of FSMErrorReporter
func (*FSM) ErrorFindingStateData ¶
func (f *FSM) ErrorFindingStateData(decisionTask *swf.PollForDecisionTaskOutput, err error)
ErrorFindingStateData is part of the FSM implementation of FSMErrorReporter
func (*FSM) ErrorMissingFSMState ¶
func (f *FSM) ErrorMissingFSMState(decisionTask *swf.PollForDecisionTaskOutput, outcome Outcome)
ErrorMissingFSMState is part of the FSM implementation of FSMErrorReporter
func (*FSM) ErrorSerializingStateData ¶
func (f *FSM) ErrorSerializingStateData(decisionTask *swf.PollForDecisionTaskOutput, outcome Outcome, eventCorrelator EventCorrelator, err error)
ErrorSerializingStateData is part of the FSM implementation of FSMErrorReporter
func (*FSM) ErrorStateTick ¶
func (f *FSM) ErrorStateTick(decisionTask *swf.PollForDecisionTaskOutput, error *SerializedErrorState, context *FSMContext, data interface{}) (*Outcome, error)
ErrorStateTick is called when the DecisionTaskPoller receives a PollForDecisionTaskResponse in its polling loop that contains an error marker in its history.
func (*FSM) EventData ¶
func (f *FSM) EventData(event *swf.HistoryEvent, eventData interface{})
EventData works in combination with the FSM.Serializer to provide deserialization of data sent in a HistoryEvent. It is sugar around extracting the event payload from the proper field of the proper Attributes struct on the HistoryEvent
func (*FSM) Init ¶
func (f *FSM) Init()
Init initializes any optional, unspecified values such as the error state, stop channel, serializer, PollerShutdownManager. it gets called by Start(), so you should only call this if you are manually managing polling for tasks, and calling Tick yourself.
func (*FSM) InitialState ¶
InitialState is the implementation of FSMSerializer.InitialState()
func (*FSM) Serialize ¶
Serialize uses the FSM.Serializer to serialize data to a string. If there is an error in serialization this func will panic, so this should usually only be used inside Deciders where the panics are recovered and proper errors are recorded in the workflow.
func (*FSM) Start ¶
func (f *FSM) Start()
Start begins processing DecisionTasks with the FSM. It creates one or more DecisionTaskPollers and spawns a goroutine that continues polling until Stop() is called and any in-flight polls have completed. If you wish to manage polling and calling Tick() yourself, you dont need to start the FSM, just call Init().
func (*FSM) StateSerializer ¶
func (f *FSM) StateSerializer() StateSerializer
StateSerializer is the implementation of FSMSerializer.StateSerializer()
func (*FSM) Stop ¶
func (f *FSM) Stop()
Stop causes the DecisionTask select loop to exit, and to stop the DecisionTaskPoller
func (*FSM) Tick ¶
func (f *FSM) Tick(decisionTask *swf.PollForDecisionTaskOutput) (*FSMContext, []*swf.Decision, *SerializedState, error)
Tick is called when the DecisionTaskPoller receives a PollForDecisionTaskResponse in its polling loop. On errors, a nil *SerializedState is returned, and an error Outcome is included in the Decision list. It is exported to facilitate testing.
type FSMClient ¶
type FSMClient interface { GetState(id string) (string, interface{}, error) GetStateForRun(workflow, run string) (string, interface{}, error) GetSerializedStateForRun(workflow, run string) (*SerializedState, *swf.GetWorkflowExecutionHistoryOutput, error) Signal(id string, signal string, input interface{}) error Start(startTemplate swf.StartWorkflowExecutionInput, id string, input interface{}) (*swf.StartWorkflowExecutionOutput, error) RequestCancel(id string) error GetWorkflowExecutionHistoryPages(execution *swf.WorkflowExecution, fn func(p *swf.GetWorkflowExecutionHistoryOutput, lastPage bool) (shouldContinue bool)) error GetWorkflowExecutionHistoryFromReader(reader io.Reader) (*swf.GetWorkflowExecutionHistoryOutput, error) FindAll(input *FindInput) (output *FindOutput, err error) FindAllWalk(input *FindInput, fn func(info *swf.WorkflowExecutionInfo, done bool) (cont bool)) (err error) FindLatestByWorkflowID(workflowID string) (exec *swf.WorkflowExecution, err error) NewHistorySegmentor() HistorySegmentor }
func NewFSMClient ¶
func NewFSMClient(f *FSM, c ClientSWFOps) FSMClient
type FSMContext ¶
type FSMContext struct { swf.WorkflowType swf.WorkflowExecution State string // contains filtered or unexported fields }
FSMContext is populated by the FSM machinery and passed to Deciders.
func NewFSMContext ¶
func NewFSMContext( serialization Serialization, wfType swf.WorkflowType, wfExec swf.WorkflowExecution, eventCorrelator *EventCorrelator, state string, stateData interface{}, stateVersion uint64) *FSMContext
NewFSMContext constructs an FSMContext.
func (*FSMContext) ActivitiesInfo ¶
func (f *FSMContext) ActivitiesInfo() map[string]*ActivityInfo
ActivitiesInfo will return a map of scheduledId -> ActivityInfo for all in-flight activities in the workflow.
func (*FSMContext) ActivityInfo ¶
func (f *FSMContext) ActivityInfo(h *swf.HistoryEvent) *ActivityInfo
ActivityInfo will find information for ActivityTasks being tracked. It can only be used when handling events related to ActivityTasks. ActivityTasks are automatically tracked after a EventTypeActivityTaskScheduled event. When there is no pending activity related to the event, nil is returned.
func (*FSMContext) Attempts ¶
func (f *FSMContext) Attempts(h *swf.HistoryEvent) int
func (*FSMContext) CancelWorkflow ¶
func (f *FSMContext) CancelWorkflow(data interface{}, details *string) Outcome
CancelWorkflow is a helper func to easily create a CompleteOutcome that sends a CancelWorklfow decision.
func (*FSMContext) CompleteWorkflow ¶
func (f *FSMContext) CompleteWorkflow(data interface{}, decisions ...*swf.Decision) Outcome
CompleteWorkflow is a helper func to easily create a CompleteOutcome that sends a CompleteWorkflow decision.
func (*FSMContext) CompleteWorkflowDecision ¶
func (f *FSMContext) CompleteWorkflowDecision(data interface{}) *swf.Decision
CompleteWorkflowDecision will build a CompleteWorkflowExecutionDecision decision that has the expected SerializedState marshalled to json as its result. This decision should be used when it is appropriate to Complete your workflow.
func (*FSMContext) ContinueDecider ¶
func (f *FSMContext) ContinueDecider(data interface{}, decisions []*swf.Decision) Outcome
ContinueDecider is a helper func to easily create a ContinueOutcome.
func (*FSMContext) ContinueWorkflow ¶
func (f *FSMContext) ContinueWorkflow(data interface{}, decisions ...*swf.Decision) Outcome
ContinueWorkflow is a helper func to easily create a CompleteOutcome that sends a ContinueWorklfow decision.
func (*FSMContext) ContinueWorkflowDecision ¶
func (f *FSMContext) ContinueWorkflowDecision(continuedState string, data interface{}) *swf.Decision
ContinueWorkflowDecision will build a ContinueAsNewWorkflow decision that has the expected SerializedState marshalled to json as its input. This decision should be used when it is appropriate to Continue your workflow. You are unable to ContinueAsNew a workflow that has running activites, so you should assure there are none running before using this. As such there is no need to copy over the ActivityCorrelator. If the FSM Data Struct is Taggable, its tags will be used on the Continue Decisions
func (*FSMContext) Correlator ¶
func (f *FSMContext) Correlator() *EventCorrelator
func (*FSMContext) Decide ¶
func (f *FSMContext) Decide(h *swf.HistoryEvent, data interface{}, decider Decider) Outcome
Decide executes a decider making sure that Activity tasks are being tracked.
func (*FSMContext) Decision ¶
func (f *FSMContext) Decision(d *swf.Decision) []*swf.Decision
EmptyDecisions is a helper to give you an empty Decision slice.
func (*FSMContext) Deserialize ¶
func (f *FSMContext) Deserialize(serialized string, data interface{})
Deserialize will use the current fsm' Serializer to deserialize the given string into the given struct. It will panic on errors, which is ok in the context of a Decider. If you want to handle errors, use Serializer().Deserialize(...) instead.
func (*FSMContext) EmptyDecisions ¶
func (f *FSMContext) EmptyDecisions() []*swf.Decision
EmptyDecisions is a helper to give you an empty Decision slice.
func (*FSMContext) EventData ¶
func (f *FSMContext) EventData(h *swf.HistoryEvent, data interface{})
EventData will extract a payload from the given HistoryEvent and unmarshall it into the given struct.
func (*FSMContext) FailWorkflow ¶
func (f *FSMContext) FailWorkflow(data interface{}, details *string) Outcome
FailWorkflow is a helper func to easily create a FailOutcome that sends a FailWorklfow decision.
func (*FSMContext) Goto ¶
func (f *FSMContext) Goto(state string, data interface{}, decisions []*swf.Decision) Outcome
Goto is a helper func to easily create a TransitionOutcome.
func (*FSMContext) InitialState ¶
func (f *FSMContext) InitialState() string
func (*FSMContext) Pass ¶
func (f *FSMContext) Pass() Outcome
func (*FSMContext) Serialize ¶
func (f *FSMContext) Serialize(data interface{}) string
Serialize will use the current fsm's Serializer to serialize the given struct. It will panic on errors, which is ok in the context of a Decider. If you want to handle errors, use Serializer().Serialize(...) instead.
func (*FSMContext) Serializer ¶
func (f *FSMContext) Serializer() StateSerializer
Serializer returns the current fsm's Serializer.
func (*FSMContext) SignalInfo ¶
func (f *FSMContext) SignalInfo(h *swf.HistoryEvent) *SignalInfo
SignalInfo will find information for ActivityTasks being tracked. It can only be used when handling events related to ActivityTasks. ActivityTasks are automatically tracked after a EventTypeActivityTaskScheduled event. When there is no pending activity related to the event, nil is returned.
func (*FSMContext) SignalsInfo ¶
func (f *FSMContext) SignalsInfo() map[string]*SignalInfo
SignalsInfo will return a map of scheduledId -> ActivityInfo for all in-flight activities in the workflow.
func (*FSMContext) StateSerializer ¶
func (f *FSMContext) StateSerializer() StateSerializer
type FSMErrorReporter ¶
type FSMErrorReporter interface { ErrorFindingStateData(decisionTask *swf.PollForDecisionTaskOutput, err error) ErrorFindingCorrelator(decisionTask *swf.PollForDecisionTaskOutput, err error) ErrorMissingFSMState(decisionTask *swf.PollForDecisionTaskOutput, outcome Outcome) ErrorDeserializingStateData(decisionTask *swf.PollForDecisionTaskOutput, serializedStateData string, err error) ErrorSerializingStateData(decisionTask *swf.PollForDecisionTaskOutput, outcome Outcome, eventCorrelator EventCorrelator, err error) }
FSMErrorHandler is the error handling contract for errors in the FSM machinery itself. These are generally a misconfiguration of your FSM or mismatch between struct and serialized form and cant be resolved without config/code changes the paramaters to each method provide all availabe info at the time of the error so you can diagnose issues. Note that this is a diagnostic interface that basically leaks implementation details, and as such may change from release to release.
type FSMState ¶
type FSMState struct { // Name is the name of the state. When returning an Outcome, the NextState should match the Name of an FSMState in your FSM. Name string // Decider decides an Outcome given the current state, data, and an event. Decider Decider }
FSMState defines the behavior of one state of an FSM
type FindInput ¶
type FindInput struct { MaximumPageSize *int64 OpenNextPageToken *string ClosedNextPageToken *string ReverseOrder *bool StatusFilter string StartTimeFilter *swf.ExecutionTimeFilter CloseTimeFilter *swf.ExecutionTimeFilter // only closed ExecutionFilter *swf.WorkflowExecutionFilter TagFilter *swf.TagFilter TypeFilter *swf.WorkflowTypeFilter CloseStatusFilter *swf.CloseStatusFilter // only closed }
type FindOutput ¶
type FindOutput struct { ExecutionInfos []*swf.WorkflowExecutionInfo OpenNextPageToken *string ClosedNextPageToken *string }
type Finder ¶
type Finder interface { FindAll(*FindInput) (*FindOutput, error) FindLatestByWorkflowID(workflowID string) (*swf.WorkflowExecution, error) Reset() }
func NewFinder ¶
func NewFinder(domain string, c ClientSWFOps) Finder
type FuncInterceptor ¶
type FuncInterceptor struct { BeforeTaskFn func(decision *swf.PollForDecisionTaskOutput) BeforeDecisionFn func(decision *swf.PollForDecisionTaskOutput, ctx *FSMContext, outcome *Outcome) AfterDecisionFn func(decision *swf.PollForDecisionTaskOutput, ctx *FSMContext, outcome *Outcome) }
FuncInterceptor is a DecisionInterceptor that you can set handler funcs on. if any are unset, they are no-ops.
func (*FuncInterceptor) AfterDecision ¶
func (i *FuncInterceptor) AfterDecision(decision *swf.PollForDecisionTaskOutput, ctx *FSMContext, outcome *Outcome)
AfterDecision runs the AfterDecisionFn if not nil
func (*FuncInterceptor) BeforeDecision ¶
func (i *FuncInterceptor) BeforeDecision(decision *swf.PollForDecisionTaskOutput, ctx *FSMContext, outcome *Outcome)
BeforeDecision runs the BeforeDecisionFn if not nil
func (*FuncInterceptor) BeforeTask ¶
func (i *FuncInterceptor) BeforeTask(decision *swf.PollForDecisionTaskOutput)
BeforeTask runs the BeforeTaskFn if not nil
type HistorySegment ¶
type HistorySegment struct { State *HistorySegmentState Correlator *EventCorrelator Error *SerializedErrorState Events []*HistorySegmentEvent ContinuedExecutionRunId *string }
type HistorySegmentEvent ¶
type HistorySegmentState ¶
type HistorySegmentor ¶
type HistorySegmentor interface { FromPage(p *swf.GetWorkflowExecutionHistoryOutput, lastPage bool) (shouldContinue bool) OnStart(fn func()) HistorySegmentor OnSegment(func(HistorySegment)) HistorySegmentor OnPage(fn func()) HistorySegmentor OnError(func(error)) HistorySegmentor OnFinish(fn func()) HistorySegmentor }
type JSONStateSerializer ¶
type JSONStateSerializer struct{}
JSONStateSerializer is a StateSerializer that uses go json serialization.
func (JSONStateSerializer) Deserialize ¶
func (j JSONStateSerializer) Deserialize(serialized string, state interface{}) error
Deserialize unmarshalls the given (json) string into the given struct
func (JSONStateSerializer) Serialize ¶
func (j JSONStateSerializer) Serialize(state interface{}) (string, error)
Serialize serializes the given struct to a json string.
type KinesisOps ¶
type KinesisOps interface {
PutRecord(*kinesis.PutRecordInput) (*kinesis.PutRecordOutput, error)
}
KinesisOps is the subset of kinesis.Kinesis ops required by KinesisReplication
type KinesisReplication ¶
type KinesisReplication struct { KinesisStream string KinesisReplicator KinesisReplicator KinesisOps KinesisOps }
KinesisReplication can be used as a ReplicationHandler by setting its Handler func as the FSM ReplicationHandler
func (*KinesisReplication) Handler ¶
func (f *KinesisReplication) Handler(ctx *FSMContext, decisionTask *swf.PollForDecisionTaskOutput, completedDecision *swf.RespondDecisionTaskCompletedInput, state *SerializedState) error
Handler is a ReplicationHandler. to configure it on your FSM, do fsm.ReplicationHandler = &KinesisReplication{...).Handler
type KinesisReplicator ¶
type KinesisReplicator func(fsm, workflowId string, put func() (*kinesis.PutRecordOutput, error)) (*kinesis.PutRecordOutput, error)
KinesisReplicator lets you customize the retry logic around Replicating State to Kinesis.
type MultiDecisionFunc ¶
type MultiDecisionFunc func(ctx *FSMContext, h *swf.HistoryEvent, data interface{}) []*swf.Decision
MultiDecisionFunc is a building block for composable deciders that returns a [] of decision.
type NewGoroutineDispatcher ¶
type NewGoroutineDispatcher struct { }
NewGoroutineDispatcher is a DecisionTaskDispatcher that runs the decision handler in a new goroutine.
func (*NewGoroutineDispatcher) DispatchTask ¶
func (*NewGoroutineDispatcher) DispatchTask(task *swf.PollForDecisionTaskOutput, handler func(*swf.PollForDecisionTaskOutput))
DispatchTask calls the handler in a new goroutine.
type Outcome ¶
type Outcome struct { //State is the desired next state in the FSM. the empty string ("") is a signal that you wish decision processing to continue //if the FSM machinery recieves the empty string as the state of a final outcome, it will substitute the current state. State string Data interface{} Decisions []*swf.Decision }
Outcome is the result of a Decider processing a HistoryEvent
type PredicateFunc ¶
type PredicateFunc func(data interface{}) bool
PredicateFunc is a building block for composable deciders, a predicate based on the FSM stateData.
type ReplicationHandler ¶
type ReplicationHandler func(*FSMContext, *swf.PollForDecisionTaskOutput, *swf.RespondDecisionTaskCompletedInput, *SerializedState) error
ReplicationHandler can be configured on an FSM and will be called when a DecisionTask is successfully completed. Note that events can be delivered out of order to the ReplicationHandler.
type SWFOps ¶
type SWFOps interface { PollForDecisionTaskPages(*swf.PollForDecisionTaskInput, func(*swf.PollForDecisionTaskOutput, bool) bool) error RespondDecisionTaskCompleted(*swf.RespondDecisionTaskCompletedInput) (*swf.RespondDecisionTaskCompletedOutput, error) }
SWFOps is the subset of swf.SWF ops required by the fsm package
type Serialization ¶
type Serialization interface { EventData(h *swf.HistoryEvent, data interface{}) Serialize(data interface{}) string StateSerializer() StateSerializer Deserialize(serialized string, data interface{}) InitialState() string }
Serialization is the contract for de/serializing state inside an FSM, typically implemented by the FSM itself but serves to break the circular dep between FSMContext and FSM.
type SerializedActivityState ¶
Payload of Signals ActivityStartedSignal and ActivityUpdatedSignal
type SerializedErrorState ¶
type SerializedErrorState struct { Details string EarliestUnprocessedEventId int64 LatestUnprocessedEventId int64 ErrorEvent *swf.HistoryEvent }
ErrorState is used as the input to a marker that signifies that the workflow is in an error state.
type SerializedState ¶
type SerializedState struct { StateVersion uint64 `json:"stateVersion"` StateName string `json:"stateName"` StateData string `json:"stateData"` WorkflowId string `json:"workflowId"` }
SerializedState is a wrapper struct that allows serializing the current state and current data for the FSM in a MarkerRecorded event in the workflow history. We also maintain an epoch, which counts the number of times a workflow has been continued, and the StartedId of the DecisionTask that generated this state. The epoch + the id provide a total ordering of state over the lifetime of different runs of a workflow.
type SignalInfo ¶
SignalInfo holds the SignalName and Input for an activity
type StartCancelPair ¶
type StartCancelPair struct {
// contains filtered or unexported fields
}
type Stasher ¶
type Stasher struct {
// contains filtered or unexported fields
}
Stasher is used to take snapshots of StateData between each event so that we can have shap
func NewStasher ¶
func NewStasher(dataType interface{}) *Stasher
type StateFunc ¶
type StateFunc func(ctx *FSMContext, h *swf.HistoryEvent, data interface{})
StateFunc is a building block for composable deciders mutates the FSM stateData.
type StateSerializer ¶
type StateSerializer interface { Serialize(state interface{}) (string, error) Deserialize(serialized string, state interface{}) error }
StateSerializer defines the interface for serializing state to and deserializing state from the workflow history.
type Taggable ¶
type Taggable interface {
Tags() []*string
}
FSM Data types that implement this interface will have the resulting tags used by FSMClient when starting workflows and by the FSMContext when calling ContinueWorkflow() it is []*string since thats what SWF api takes atm.
type TaskErrorHandler ¶
type TaskErrorHandler func(decisionTask *swf.PollForDecisionTaskOutput, err error)
TaskErrorHandler is the error handling contract for errors that occur outside of the Decider machinery when handling receiving incoming tasks, sending outgoing decisions for tasks, or replicating state. This handler is called when a decision task has been abandoned and the task will timeout without any further intervention.
type TypedFuncs ¶
type TypedFuncs struct {
// contains filtered or unexported fields
}
TypedFuncs lets you construct building block for composable deciders, that have arguments that are checked against the type of your FSM stateData.
func Typed ¶
func Typed(typed interface{}) *TypedFuncs
Typed allows you to create Typed building blocks for composable deciders. the type checking here is done on constriction at runtime, so be sure to have a unit test that constructs your funcs.
func (*TypedFuncs) Decider ¶
func (t *TypedFuncs) Decider(decider interface{}) Decider
Decider builds a Decider from your typed Decider that verifies the right typing at construction time.
func (*TypedFuncs) DecisionFunc ¶
func (t *TypedFuncs) DecisionFunc(decisionFunc interface{}) DecisionFunc
DecisionFunc builds a DecisionFunc from your typed DecisionFunc that verifies the right typing at construction time.
func (*TypedFuncs) MultiDecisionFunc ¶
func (t *TypedFuncs) MultiDecisionFunc(decisionFunc interface{}) MultiDecisionFunc
MultiDecisionFunc builds a MultiDecisionFunc from your typed MultiDecisionFunc that verifies the right typing at construction time.
func (*TypedFuncs) PredicateFunc ¶
func (t *TypedFuncs) PredicateFunc(stateFunc interface{}) PredicateFunc
PredicateFunc builds a PredicateFunc from your typed PredicateFunc that verifies the right typing at construction time.
func (*TypedFuncs) StateFunc ¶
func (t *TypedFuncs) StateFunc(stateFunc interface{}) StateFunc
StateFunc builds a StateFunc from your typed StateFunc that verifies the right typing at construction time.
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
Package jsonpbserializer implements a fsm.StateSerializer that uses jsonpb as the underlying JSON serializer, rather than stdlibs.
|
Package jsonpbserializer implements a fsm.StateSerializer that uses jsonpb as the underlying JSON serializer, rather than stdlibs. |