Documentation ¶
Overview ¶
SwfHelper is meant to make executing decision tasks easier. Because a decision task can go to any decider, each decider must be able to recreate the workflow's history easily and make decisions from it.
The SWF helper has two main kinds of component: deciders and activities.
Deciders
========

Deciders must be deterministic in flow. A decider is registered by giving the workflow type and a callback function.
    // this creates the decider manager/poller
    // it will poll the specified region, domain and task list to get decision tasks
    decider := &swfhelper.Decider{
        Region:     "us-west-2",
        Domain:     "yewwdev",
        TaskList:   "hello1",
        Identity:   "atHome1",
        MaxWorkers: 1,
    }

    // We must register a function to a workflow. Any decision tasks that come in,
    // and match this type, and version will be handled by the 'BasicDecider' function
    decider.RegisterWorkflow(swf.WorkflowType{Name: "hello", Version: "1"}, BasicDecider)

    // we now block constantly polling SWF for decision tasks
    decider.Start()
The decider itself is just a function, like so:
    // the swfhelper.SwfWorkflow is the object we will manipulate to our needs.
    // This function will be called multiple times during the lifetime of a single
    // workflow. Log lines, or anything not contained in an activity will be executed
    // multiple times, as each time a new decision is required for a workflow, this function
    // will be called. Thus the ordering of w.Go calls must be deterministic.
    func BasicDecider(w *swfhelper.SwfWorkflow) {
        // we must always make a decision after a workflow
        // otherwise swfhelper will not respond with a decision
        defer w.Decide()

        // first we define a task/activity that we want to execute
        task := swfhelper.Task{
            Activity: swf.ActivityType{
                Name:    "getnum",
                Version: "1",
            },
        }

        // we then ask for the execution of it, passing in the 'hello1' string as a variable
        // this will return a channel. If we have not executed the task yet, or the task is
        // not yet complete, a closed channel will be returned. If we are wanting to wait
        // for the response of this task, we must return on a closed channel.
        helloTask := w.Go(task, "hello1")
        res, ok := <-helloTask
        if !ok {
            // return as the channel is closed, and we need the response to continue
            return
        }

        // logging the output of the first task
        // note: this log line will appear again after helloTask2 is completed
        log.Println("1. Error:", res.FailureType, res.FailureCause)
        log.Println("1. Result:", res.Result)

        // here we will execute two tasks in parallel, and then wait for their responses
        helloTask2 := w.Go(task, "hello2")
        helloTask3 := w.Go(task, "hello3")

        // wait for hello2
        res2, ok := <-helloTask2
        if !ok {
            log.Println("Task2 not ready")
            return
        }

        // wait for hello3
        res3, ok := <-helloTask3
        if !ok {
            log.Println("Task3 not ready")
            return
        }

        log.Println("2. Error:", res2.FailureType, res2.FailureCause)
        log.Println("2. Result:", res2.Result)
        log.Println("3. Error:", res3.FailureType, res3.FailureCause)
        log.Println("3. Result:", res3.Result)

        // now that all the tasks are done, complete the workflow
        w.Complete("Just amazing")
    }
The SwfWorkflow object uses the method 'Go' to launch an activity. It returns a channel, which will be closed if the task has not yet produced a result (success or failure). You should never block in the decider; while it is blocked, no decisions (e.g. 'Go', 'Complete', 'StartTimer') are posted. Decision tasks are not sticky, so if you have multiple deciders running, any of them can handle the next decision for a single workflow. Calls to 'Go' must be made in a deterministic order that is exactly the same regardless of the order in which responses arrive from SWF. This is how each response is matched to the call that produced it.
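The replay behaviour described above can be sketched with a toy model. Everything here (toyWorkflow, result, the call-order counter) is hypothetical and written only for illustration; it is not how swfhelper is implemented, and it ignores tasks that are scheduled but still in flight. It shows why a decider that is re-run from the top needs a stable call order: each call to Go is identified only by its position.

```go
package main

import "fmt"

// result is a toy stand-in for a task result; it is not swfhelper.TaskResult.
type result struct{ Value string }

// toyWorkflow is a hypothetical model of a single decision pass.
// history holds completed results keyed by call order; scheduled collects
// the inputs this pass asked to run. In-flight tasks are not modelled.
type toyWorkflow struct {
	history   map[int]string
	calls     int
	scheduled []string
}

// Go returns a channel holding the recorded result, or an empty closed
// channel when the call at this position has no result yet.
func (w *toyWorkflow) Go(input string) chan result {
	id := w.calls // identity comes purely from call order
	w.calls++
	ch := make(chan result, 1)
	if v, done := w.history[id]; done {
		ch <- result{Value: v}
	} else {
		w.scheduled = append(w.scheduled, input)
	}
	close(ch)
	return ch
}

// decider is re-run from the top on every pass, like BasicDecider.
func decider(w *toyWorkflow) (string, bool) {
	first, ok := <-w.Go("hello1")
	if !ok {
		return "", false
	}
	// Issue both calls before waiting so they can run in parallel.
	c2, c3 := w.Go("hello2"), w.Go("hello3")
	r2, ok2 := <-c2
	r3, ok3 := <-c3
	if !ok2 || !ok3 {
		return "", false
	}
	return first.Value + r2.Value + r3.Value, true
}

func main() {
	histories := []map[int]string{
		{},
		{0: "a"},
		{0: "a", 1: "b", 2: "c"},
	}
	for _, h := range histories {
		w := &toyWorkflow{history: h}
		out, done := decider(w)
		fmt.Println(w.scheduled, out, done)
	}
}
```

Each pass either schedules more work and returns early, or finds every result in the history and finishes. Swapping the order of the two parallel calls between passes would attach the recorded results to the wrong calls.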
Activities
==========

Activities do not need to be deterministic. They are executed once per task and return a response to the decider.
    // the worker here will listen to the domain, region and task list for activity tasks
    act := &swfhelper.ActivityWorker{
        Region:     "us-west-2",
        Domain:     "yewwdev",
        TaskList:   "actlist",
        Identity:   "atHome1-worker",
        MaxWorkers: 1,
    }

    // we register the function 'ActivityOne' to handle the 'getnum' swf activity.
    act.RegisterActivity(swf.ActivityType{Name: "getnum", Version: "1"}, ActivityOne)

    // block polling for new activity tasks
    act.Start()
The actual activity is defined as such:
    // swfhelper.ActivityContext provides us with the input to the activity, and ways
    // to respond to it.
    func ActivityOne(a *swfhelper.ActivityContext) {
        myRes := int64(rand.Int())

        // we defer the completed decision, as we must make a decision at the end
        defer a.Completed(strconv.FormatInt(myRes, 10))
        defer log.Println("Hey:", a.Input, myRes)

        time.Sleep(5*time.Second + time.Second*time.Duration(rand.Float32()*5))
    }
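ActivityOne relies on two properties of Go's defer statement: arguments to a deferred call are evaluated when the defer statement executes, and deferred calls run in last-in, first-out order. The standalone sketch below (run and its event strings are made up for illustration) shows both, using only the standard library.

```go
package main

import (
	"fmt"
	"strconv"
)

// run returns the order in which two deferred calls fire and the values they see.
func run() (events []string) {
	n := int64(7)
	// The argument is evaluated here, so this call always sees "7".
	defer func(s string) { events = append(events, "completed:"+s) }(strconv.FormatInt(n, 10))
	// This closure reads n only when it runs, and it runs first (LIFO).
	defer func() { events = append(events, "log:"+strconv.FormatInt(n, 10)) }()
	n = 99
	return events
}

func main() {
	fmt.Println(run())
}
```

In ActivityOne this means the log line is printed before Completed is called, and the result string passed to Completed is fixed before the sleep begins.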
Activities and deciders do not need to run on the same machine or in the same executable.
Index ¶
- type ActivityContext
- func (a *ActivityContext) As(out interface{}) error
- func (a *ActivityContext) CloseWithoutResult()
- func (a *ActivityContext) Completed(result interface{})
- func (a *ActivityContext) Failed(reason, details string)
- func (a *ActivityContext) GetRawTask() *swf.PollForActivityTaskResponse
- func (a *ActivityContext) SuccesfullyCancel(details string)
- type ActivityHandler
- type ActivityWorker
- type Decider
- type JsonMarshaler
- type Marshaler
- type SwfWorkflow
- func (w *SwfWorkflow) As(a interface{}) error
- func (s *SwfWorkflow) Complete(a interface{})
- func (s *SwfWorkflow) ContinueAsNewWorkflow(a interface{})
- func (w *SwfWorkflow) Decide()
- func (s *SwfWorkflow) Fail(reason, details string)
- func (s *SwfWorkflow) Go(do Task, data interface{}) TaskResultChan
- func (s *SwfWorkflow) GoWithCustomId(do Task, data interface{}, thisId string) TaskResultChan
- func (s *SwfWorkflow) Mark(details interface{}, markerName string)
- func (s *SwfWorkflow) MarkString(details, markerName string)
- func (s *SwfWorkflow) Once(what func())
- func (s *SwfWorkflow) StartTimer(dur time.Duration) TimerResultChan
- type Task
- type TaskResult
- type TaskResultChan
- type TimerResult
- type TimerResultChan
- type WorkflowHandler
Types ¶
type ActivityContext ¶
type ActivityContext struct {
    Input    string
    Activity swf.ActivityType
    // when the next heartbeat is sent out, this channel will be checked for the last message.
    HeartbeatDetails chan string
    CancelRequested  chan bool
    // contains filtered or unexported fields
}
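The comment on HeartbeatDetails says the channel is checked for the last message when a heartbeat goes out. One way such a check could work is a non-blocking drain that keeps only the newest value. The sketch below is an assumption about the pattern, not swfhelper's code; latest is a hypothetical helper, and the buffered channel is chosen here so the sending side does not block.

```go
package main

import "fmt"

// latest drains ch without blocking and returns the most recent message.
// ok is false when nothing was waiting.
func latest(ch chan string) (msg string, ok bool) {
	for {
		select {
		case m, open := <-ch:
			if !open {
				return msg, ok // closed: nothing more will arrive
			}
			msg, ok = m, true
		default:
			return msg, ok // empty: do not wait for the activity
		}
	}
}

func main() {
	// A buffered channel is an assumption so the activity side never blocks.
	details := make(chan string, 8)
	details <- "10%"
	details <- "50%"
	details <- "90%"
	fmt.Println(latest(details))
	fmt.Println(latest(details))
}
```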
func (*ActivityContext) As ¶
func (a *ActivityContext) As(out interface{}) error
func (*ActivityContext) CloseWithoutResult ¶
func (a *ActivityContext) CloseWithoutResult()
Call this if you want to stop processing an activity and stop its heartbeat, but do not want to respond with a result. You may want to do this if the activity requires human intervention and you do not want to consume resources while waiting for it.
func (*ActivityContext) Completed ¶
func (a *ActivityContext) Completed(result interface{})
Marks this activity as successfully completed.
func (*ActivityContext) Failed ¶
func (a *ActivityContext) Failed(reason, details string)
Marks this activity as failed.
func (*ActivityContext) GetRawTask ¶
func (a *ActivityContext) GetRawTask() *swf.PollForActivityTaskResponse
func (*ActivityContext) SuccesfullyCancel ¶
func (a *ActivityContext) SuccesfullyCancel(details string)
Marks this activity as successfully canceled.
type ActivityHandler ¶
type ActivityHandler func(*ActivityContext)
type ActivityWorker ¶
type ActivityWorker struct {
    Domain string
    // default is: ec2 instance Id + uuid
    Identity string
    TaskList string
    Region   string
    // leave blank to use default json marshaller
    Marshaler Marshaler
    // maximum number of workers we can spawn. Default is infinite
    MaxWorkers int
    // contains filtered or unexported fields
}
func (*ActivityWorker) RegisterActivity ¶
func (a *ActivityWorker) RegisterActivity(actType swf.ActivityType, handler ActivityHandler)
func (*ActivityWorker) SetActivityHeartbeatInterval ¶
func (a *ActivityWorker) SetActivityHeartbeatInterval(actType swf.ActivityType, dur time.Duration)
overrides our default heartbeat interval of 1 minute
func (*ActivityWorker) Start ¶
func (a *ActivityWorker) Start() error
Starts polling for activities, indefinitely.
type Decider ¶
type Decider struct {
    Domain string
    // default is: ec2 instance Id + uuid
    Identity string
    TaskList string
    Region   string
    // maximum number of workers we can spawn. Default is infinite
    MaxWorkers int
    // leave blank for default JSON marshaller
    Marshaler Marshaler
    // contains filtered or unexported fields
}
Decider polls for new decision tasks and delegates each one to the registered WorkflowHandler in a new goroutine.
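A common way to hand tasks to goroutines while honouring a cap such as MaxWorkers is a counting semaphore built from a buffered channel. The sketch below is a generic illustration of that pattern under stated assumptions; dispatch is a hypothetical function, and nothing here is taken from swfhelper's source.

```go
package main

import (
	"fmt"
	"sync"
)

// dispatch runs handle for every task in its own goroutine while keeping at
// most maxWorkers running. It returns the highest concurrency it observed.
func dispatch(tasks []string, maxWorkers int, handle func(string)) int {
	if maxWorkers < 1 {
		maxWorkers = len(tasks) + 1 // toy reading of "infinite"
	}
	sem := make(chan struct{}, maxWorkers)
	var (
		wg            sync.WaitGroup
		mu            sync.Mutex
		running, peak int
	)
	for _, t := range tasks {
		sem <- struct{}{} // blocks while maxWorkers handlers are busy
		wg.Add(1)
		go func(t string) {
			defer wg.Done()
			defer func() { <-sem }() // free the slot once the handler is done
			mu.Lock()
			running++
			if running > peak {
				peak = running
			}
			mu.Unlock()
			handle(t)
			mu.Lock()
			running--
			mu.Unlock()
		}(t)
	}
	wg.Wait()
	return peak
}

func main() {
	var mu sync.Mutex
	handled := 0
	peak := dispatch([]string{"a", "b", "c", "d", "e"}, 2, func(string) {
		mu.Lock()
		handled++
		mu.Unlock()
	})
	fmt.Println(handled, peak <= 2)
}
```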
func (*Decider) RegisterWorkflow ¶
func (d *Decider) RegisterWorkflow(workflow swf.WorkflowType, handler WorkflowHandler)
type JsonMarshaler ¶
type JsonMarshaler struct { }
func (JsonMarshaler) Marshal ¶
func (j JsonMarshaler) Marshal(a interface{}) (string, error)
func (JsonMarshaler) Unmarshal ¶
func (j JsonMarshaler) Unmarshal(in string, out interface{}) error
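The Marshaler type is listed in the index but its definition is not shown here. Judging only from the two JsonMarshaler method signatures, a custom marshaler would plausibly need the same pair of methods. The sketch below defines a local, hypothetical marshaler interface with that shape and a JSON implementation using encoding/json; the order type is made up for the round trip.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// marshaler is a hypothetical interface inferred from the documented
// JsonMarshaler method signatures; the real Marshaler may differ.
type marshaler interface {
	Marshal(a interface{}) (string, error)
	Unmarshal(in string, out interface{}) error
}

// jsonMarshaler is a local stand-in, not swfhelper.JsonMarshaler.
type jsonMarshaler struct{}

func (jsonMarshaler) Marshal(a interface{}) (string, error) {
	b, err := json.Marshal(a)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func (jsonMarshaler) Unmarshal(in string, out interface{}) error {
	return json.Unmarshal([]byte(in), out)
}

// order is an illustrative payload type.
type order struct {
	ID  int    `json:"id"`
	SKU string `json:"sku"`
}

// roundTrip encodes in and decodes it again through m.
func roundTrip(m marshaler, in order) (string, order, error) {
	var out order
	s, err := m.Marshal(in)
	if err != nil {
		return "", out, err
	}
	err = m.Unmarshal(s, &out)
	return s, out, err
}

func main() {
	s, out, err := roundTrip(jsonMarshaler{}, order{ID: 1, SKU: "a"})
	fmt.Println(s, out, err)
}
```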
type SwfWorkflow ¶
type SwfWorkflow struct {
// contains filtered or unexported fields
}
an instance of a workflow
func (*SwfWorkflow) As ¶
func (w *SwfWorkflow) As(a interface{}) error
Decodes the workflow execution's input into the given value.
func (*SwfWorkflow) Complete ¶
func (s *SwfWorkflow) Complete(a interface{})
func (*SwfWorkflow) ContinueAsNewWorkflow ¶
func (s *SwfWorkflow) ContinueAsNewWorkflow(a interface{})
Continues this workflow as a new one, using defaults for all settings. 'a' will be marshalled to form the input.
func (*SwfWorkflow) Decide ¶
func (w *SwfWorkflow) Decide()
Must be called for our decisions to be posted to the server. In your handler you should always defer Decide() so that it gets executed.
func (*SwfWorkflow) Fail ¶
func (s *SwfWorkflow) Fail(reason, details string)
func (*SwfWorkflow) Go ¶
func (s *SwfWorkflow) Go(do Task, data interface{}) TaskResultChan
Launches an activity. Calls to this method must be deterministic in ordering.
func (*SwfWorkflow) GoWithCustomId ¶
func (s *SwfWorkflow) GoWithCustomId(do Task, data interface{}, thisId string) TaskResultChan
Launches an activity, but with a custom ID. You must ensure 'thisId' is created deterministically with respect to ordering, as 'thisId' is used to see whether the activity is scheduled, completed, failed, etc.
func (*SwfWorkflow) Mark ¶
func (s *SwfWorkflow) Mark(details interface{}, markerName string)
Creates a marker of your choice, marshaling details with the marshaler. Do not use a name in the pattern of 'awsgo.swfhelper.%d' as we reserve the use of that format.
func (*SwfWorkflow) MarkString ¶
func (s *SwfWorkflow) MarkString(details, markerName string)
Creates a marker of your choice with a raw string. Do not use a name in the pattern of 'awsgo.swfhelper.%d' as we reserve the use of that format.
func (*SwfWorkflow) Once ¶
func (s *SwfWorkflow) Once(what func())
Executes the given function once. Whether the function has already been executed is recorded by leaving a marker in the decision history.
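Because a decider function is re-run on every decision, a run-once guard has to consult something that survives between runs. The toy below models that with a shared map standing in for history markers. The pass type, the "toy.once.%d" marker name, and the call-order counter are all assumptions made for illustration, not swfhelper's actual scheme.

```go
package main

import "fmt"

// pass models one run of a decider function. markers stands in for the
// marker events already recorded in the workflow history.
type pass struct {
	markers map[string]bool
	calls   int
}

// Once runs what only if no marker exists for this call position, then
// records one. The "toy.once.%d" naming is a guess, not swfhelper's format.
func (p *pass) Once(what func()) {
	name := fmt.Sprintf("toy.once.%d", p.calls)
	p.calls++
	if p.markers[name] {
		return
	}
	what()
	p.markers[name] = true
}

// replay runs the same decider body n times against a shared history and
// reports how often the side effect fired.
func replay(n int) int {
	markers := map[string]bool{}
	fired := 0
	for i := 0; i < n; i++ {
		p := &pass{markers: markers}
		p.Once(func() { fired++ })
	}
	return fired
}

func main() {
	fmt.Println(replay(3))
}
```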
func (*SwfWorkflow) StartTimer ¶
func (s *SwfWorkflow) StartTimer(dur time.Duration) TimerResultChan
Starts a timer for the given duration.
type Task ¶
type Task struct {
    // which activity to execute
    Activity swf.ActivityType
    // specifies the maximum time before which a worker processing a task of this type must report progress
    HeartbeatTimeout *int64
    // maximum duration for this activity task.
    ScheduleToCloseTimeout *int64
    // specifies the maximum duration the activity task can wait to be assigned to a worker
    ScheduleToStartTimeout *int64
    // specifies the maximum duration a worker may take to process this activity task
    StartToCloseTimeout *int64
    // specifies the name of the task list in which to schedule the activity task.
    TaskList string
}
type TaskResult ¶
type TaskResult struct {
    Result string
    // the type of failure.
    // ScheduleActivityTaskFailed
    //   FailureCause: see 'cause' http://docs.aws.amazon.com/amazonswf/latest/apireference/API_ScheduleActivityTaskFailedEventAttributes.html
    // ActivityTaskCanceled
    //   FailureCause: 'details' provided in cancel request
    // ActivityTaskTimedOut
    //   FailureCause: 'details' last provided by a heartbeat
    // ActivityTaskFailed
    //   FailureCause: 'details' provided in the failure if any.
    FailureType  string
    FailureCause string
    // only appears for failure type 'ActivityTaskFailed'
    FailureReason string
    // contains filtered or unexported fields
}
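A decider will usually branch on FailureType before using Result. The sketch below uses a local taskResult struct that mirrors the documented exported fields, with a hypothetical describe function. It assumes an empty FailureType means the task succeeded, which this page does not state explicitly.

```go
package main

import "fmt"

// taskResult mirrors the exported fields documented for TaskResult.
type taskResult struct {
	Result, FailureType, FailureCause, FailureReason string
}

// describe summarises a result according to the documented failure types.
func describe(r taskResult) string {
	switch r.FailureType {
	case "": // assumed to mean success
		return "ok: " + r.Result
	case "ActivityTaskFailed":
		return "failed: " + r.FailureReason + " (" + r.FailureCause + ")"
	case "ActivityTaskTimedOut":
		return "timed out, last heartbeat: " + r.FailureCause
	case "ActivityTaskCanceled":
		return "canceled: " + r.FailureCause
	case "ScheduleActivityTaskFailed":
		return "could not schedule: " + r.FailureCause
	default:
		return "unknown failure type: " + r.FailureType
	}
}

func main() {
	fmt.Println(describe(taskResult{Result: "42"}))
	fmt.Println(describe(taskResult{
		FailureType:   "ActivityTaskFailed",
		FailureReason: "bad input",
		FailureCause:  "id missing",
	}))
}
```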
func (TaskResult) As ¶
func (t TaskResult) As(out interface{}) error
type TaskResultChan ¶
type TaskResultChan chan TaskResult
A channel to get task results from. If this channel is closed, then we do not have a result yet.
type TimerResult ¶
type TimerResult struct {
    // if this was canceled or failed to be fired, this will be populated
    FailureCause string
    // contains filtered or unexported fields
}
type TimerResultChan ¶
type TimerResultChan chan TimerResult
type WorkflowHandler ¶
type WorkflowHandler func(w *SwfWorkflow)
Called in a new goroutine when a new workflow needs to be executed.