swfhelper

package
v0.0.0-...-1a19813
Published: Jul 21, 2015 License: BSD-3-Clause Imports: 12 Imported by: 0

Documentation

Overview

SwfHelper is meant to make executing decision tasks easier. Because a decision task can go to any decider, each decider needs to be able to easily recreate the workflow history and make decisions from it.

The swf helper has two main types: deciders and activities.

Deciders
========

Deciders must be deterministic in flow. A decider is registered by giving the workflow type and a callback function.

// this creates the decider manager/poller
// it will poll the specified region, domain and task list to get decision tasks
decider := &swfhelper.Decider{
    Region: "us-west-2",
    Domain: "yewwdev",
    TaskList: "hello1",
    Identity: "atHome1",
    MaxWorkers: 1,
}
// We must register a function to a workflow. Any decision tasks that come in,
// and match this type, and version will be handled by the 'BasicDecider' function
decider.RegisterWorkflow(swf.WorkflowType{Name:"hello", Version:"1"}, BasicDecider)
// we now block, constantly polling SWF for decision tasks
if err := decider.Start(); err != nil {
    log.Fatal(err)
}

The decider itself is just a function, like so:

// the swfhelper.SwfWorkflow is the object we will manipulate to our needs.
// This function will be called multiple times during the lifetime of a single
// workflow. Log lines, or anything not contained in an activity will be executed
// multiple times, as each time a new decision is required for a workflow, this function
// will be called. Thus the ordering of w.Go calls must be deterministic.
func BasicDecider(w *swfhelper.SwfWorkflow) {
    // we must always call Decide when the handler returns,
    // otherwise swfhelper will not post our decisions
    defer w.Decide()

    // first we define a task/activity that we want to execute
    task := swfhelper.Task{
        Activity: swf.ActivityType{
            Name:"getnum", Version:"1",
        },
    }
    // we then ask for the execution of it, passing in the 'hello1' string as input.
    // this will return a channel. If we have not executed the task yet, or the task is
    // not yet complete, a closed channel will be returned. If we need the response
    // of this task to continue, we must return when the channel is closed.
    helloTask := w.Go(task, "hello1")
    res, ok := <- helloTask
    if !ok {
        // return as the channel is closed, and we need the response to continue
        return
    }
    // logging the output of the first task
    // note: this log line will appear again after helloTask2 is completed
    log.Println("1. Error:", res.FailureType, res.FailureCause)
    log.Println("1. Result:", res.Result)

    // here we will execute two tasks in parallel, and then wait for their responses
    helloTask2 := w.Go(task, "hello2")
    helloTask3 := w.Go(task, "hello3")
    // wait for hello2
    res2, ok := <- helloTask2
    if !ok {
        log.Println("Task2 not ready")
        return
    }
    // wait for hello3
    res3, ok := <- helloTask3
    if !ok {
        log.Println("Task3 not ready")
        return
    }

    log.Println("2. Error:", res2.FailureType, res2.FailureCause)
    log.Println("2. Result:", res2.Result)
    log.Println("3. Error:", res3.FailureType, res3.FailureCause)
    log.Println("3. Result:", res3.Result)

    // now that all the tasks are done, complete the workflow
    w.Complete("Just amazing")
}

The SwfWorkflow object uses the method 'Go' to launch an activity. It returns a channel, which will be closed if the task has not yet reached a result (success or failure). You should never block in the decider unless you want no decisions (e.g. 'Go', 'Complete', 'StartTimer') to be posted. Decision tasks are not sticky, so if you have multiple deciders running, any of them can handle the next decision for a given workflow. The ordering of calls to 'Go' must be deterministic and exactly the same no matter the order in which responses arrive from SWF. This is how each response is matched to the call that produced it.
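The replay behaviour described above can be illustrated with a small, stdlib-only model. This is only a sketch and not swfhelper's implementation: `replay`, `result`, and `decider` are hypothetical names, and matching results to calls purely by call position is an assumption made to keep the model short.

```go
package main

import "fmt"

// result is a hypothetical stand-in for a task result.
type result struct{ Value string }

// replay models one invocation of a decider against the recorded history.
// history maps the position of a Go call (0, 1, 2, ...) to its result, if known.
type replay struct {
	history   map[int]result
	next      int   // position of the next Go call
	scheduled []int // positions that had no result during this invocation
}

// Go mimics the documented contract: a closed channel when no result is
// known yet, otherwise a channel holding the result.
func (r *replay) Go(input string) chan result {
	id := r.next
	r.next++
	ch := make(chan result, 1)
	res, done := r.history[id]
	if !done {
		r.scheduled = append(r.scheduled, id)
		close(ch) // a receive now yields the zero value with ok == false
		return ch
	}
	ch <- res
	return ch
}

// decider never blocks: it returns as soon as a needed result is missing.
func decider(r *replay) (string, bool) {
	first, ok := <-r.Go("hello1")
	if !ok {
		return "", false
	}
	// Both calls are issued before either result is read.
	c2, c3 := r.Go("hello2"), r.Go("hello3")
	second, ok2 := <-c2
	third, ok3 := <-c3
	if !ok2 || !ok3 {
		return "", false
	}
	return first.Value + second.Value + third.Value, true
}

func main() {
	history := map[int]result{}
	for pass := 1; ; pass++ {
		r := &replay{history: history}
		out, done := decider(r)
		fmt.Println("pass", pass, "missing results for calls", r.scheduled)
		if done {
			fmt.Println("workflow output:", out)
			return
		}
		// Simplification: every missing result arrives before the next pass.
		for _, id := range r.scheduled {
			history[id] = result{Value: fmt.Sprintf("r%d", id)}
		}
	}
}
```

Because results are looked up by call position, swapping the order of two `Go` calls between invocations would hand each call the other's result, which is why the ordering has to be deterministic.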

Activities
==========

Activities do not need to be deterministic. They are executed once per task and return a response to the decider.

// the worker here will listen to the domain, region and task list for activity tasks
act := &swfhelper.ActivityWorker{
    Region: "us-west-2",
    Domain: "yewwdev",
    TaskList: "actlist",
    Identity: "atHome1-worker",
    MaxWorkers: 1,
}
// we register the function 'ActivityOne' to handle the 'getnum' swf activity.
act.RegisterActivity(swf.ActivityType{Name:"getnum", Version:"1"}, ActivityOne)
// block, polling for new activity tasks
if err := act.Start(); err != nil {
    log.Fatal(err)
}

The activity itself is defined as follows:

// swfhelper.ActivityContext provides us with the input to the activity, and ways
// to respond to it.
func ActivityOne(a *swfhelper.ActivityContext) {
    myRes := int64(rand.Int())
    // we defer Completed so that a result is always reported when the function returns
    defer a.Completed(strconv.FormatInt(myRes, 10))
    defer log.Println("Hey:", a.Input, myRes)
    time.Sleep(5 * time.Second + time.Second * time.Duration(rand.Float32() * 5))
}

Activities and deciders do not need to run on the same machine or in the same executable.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ActivityContext

type ActivityContext struct {
	Input    string
	Activity swf.ActivityType

	// when the next heartbeat is sent out, this channel will be checked for the last message.
	HeartbeatDetails chan string
	CancelRequested  chan bool
	// contains filtered or unexported fields
}
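The two channel fields suggest a pattern where the handler publishes progress and checks for cancellation between units of work. The sketch below shows one way to do that without ever blocking the handler. It uses plain channels rather than the real ActivityContext; `report` and `process` are hypothetical helpers, and the choice of a buffered channel of size 1 is an assumption, since the source does not say how the channels are created.

```go
package main

import "fmt"

// report makes msg the pending heartbeat detail without ever blocking.
// If a stale message is still waiting, it is dropped in favour of msg.
func report(hb chan string, msg string) bool {
	select {
	case hb <- msg:
		return true
	default:
	}
	select {
	case <-hb: // drop the stale message, if any
	default:
	}
	select {
	case hb <- msg:
		return true
	default:
		return false // nobody can take the message right now; give up
	}
}

// process handles items one at a time and checks for a cancel request
// before starting each item. Any message on cancel is treated as a request.
func process(items []string, hb chan string, cancel chan bool) (done int, canceled bool) {
	for i := range items {
		select {
		case <-cancel:
			return done, true
		default:
		}
		// ... the real work on items[i] would go here ...
		done++
		report(hb, fmt.Sprintf("%d/%d", i+1, len(items)))
	}
	return done, false
}

func main() {
	hb := make(chan string, 1)   // assumption: buffered, size 1
	cancel := make(chan bool, 1) // assumption: buffered, size 1
	n, canceled := process([]string{"a", "b", "c"}, hb, cancel)
	fmt.Println(n, canceled, <-hb)
}
```

In a real handler the two channels would presumably be `a.HeartbeatDetails` and `a.CancelRequested`, with `a.SuccesfullyCancel` called when a cancel request is seen and `a.Completed` otherwise.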

func (*ActivityContext) As

func (a *ActivityContext) As(out interface{}) error

func (*ActivityContext) CloseWithoutResult

func (a *ActivityContext) CloseWithoutResult()

call this if you want to stop processing an activity and stop its heartbeat, but don't want to respond with a result for this activity. You may want to do this if the activity requires human intervention and you don't want to consume resources waiting for that interaction.

func (*ActivityContext) Completed

func (a *ActivityContext) Completed(result interface{})

mark this activity as successfully completed

func (*ActivityContext) Failed

func (a *ActivityContext) Failed(reason, details string)

mark this activity as failed

func (*ActivityContext) GetRawTask

func (*ActivityContext) SuccesfullyCancel

func (a *ActivityContext) SuccesfullyCancel(details string)

mark this activity as successfully canceled

type ActivityHandler

type ActivityHandler func(*ActivityContext)

type ActivityWorker

type ActivityWorker struct {
	Domain string
	// default is: EC2 instance ID + UUID
	Identity string
	TaskList string

	Region string

	// leave blank to use default json marshaller
	Marshaler Marshaler

	// maximum number of workers we can spawn. Default is infinite
	MaxWorkers int
	// contains filtered or unexported fields
}

func (*ActivityWorker) RegisterActivity

func (a *ActivityWorker) RegisterActivity(actType swf.ActivityType, handler ActivityHandler)

func (*ActivityWorker) SetActivityHeartbeatInterval

func (a *ActivityWorker) SetActivityHeartbeatInterval(actType swf.ActivityType, dur time.Duration)

overrides our default heartbeat interval of 1 minute

func (*ActivityWorker) Start

func (a *ActivityWorker) Start() error

starts polling for activities, indefinitely.

type Decider

type Decider struct {
	Domain string
	// default is: EC2 instance ID + UUID
	Identity string
	TaskList string

	Region string
	// maximum number of workers we can spawn. Default is infinite
	MaxWorkers int

	// leave blank for default JSON marshaller
	Marshaler Marshaler
	// contains filtered or unexported fields
}

will poll for new decision tasks, and delegate them to new goroutines via the WorkflowHandler.
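A common way to cap the number of concurrently running handlers, as MaxWorkers describes, is a buffered channel used as a semaphore. The following is a generic sketch of that technique and is not taken from swfhelper; `runBounded` and `peakConcurrency` are hypothetical names, and treating a non-positive limit as "no limit" is an assumption modelled on the documented default.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// runBounded runs handle for every task in its own goroutine, but never
// more than maxWorkers at once. maxWorkers <= 0 means no limit.
func runBounded(tasks []string, maxWorkers int, handle func(string)) {
	var sem chan struct{}
	if maxWorkers > 0 {
		sem = make(chan struct{}, maxWorkers)
	}
	var wg sync.WaitGroup
	for _, t := range tasks {
		if sem != nil {
			sem <- struct{}{} // blocks while maxWorkers handlers are running
		}
		wg.Add(1)
		go func(task string) {
			defer wg.Done()
			if sem != nil {
				defer func() { <-sem }() // free the slot when the handler returns
			}
			handle(task)
		}(t)
	}
	wg.Wait()
}

// peakConcurrency reports the highest number of handlers seen running at once.
func peakConcurrency(n, maxWorkers int) int32 {
	var running, peak int32
	runBounded(make([]string, n), maxWorkers, func(string) {
		cur := atomic.AddInt32(&running, 1)
		for {
			old := atomic.LoadInt32(&peak)
			if cur <= old || atomic.CompareAndSwapInt32(&peak, old, cur) {
				break
			}
		}
		atomic.AddInt32(&running, -1)
	})
	return peak
}

func main() {
	fmt.Println("peak with a limit of 2:", peakConcurrency(50, 2))
}
```

The polling loop blocks on the semaphore rather than dropping work, so a full set of workers simply delays the pickup of the next task.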

func (*Decider) RegisterWorkflow

func (d *Decider) RegisterWorkflow(workflow swf.WorkflowType, handler WorkflowHandler)
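Registration by workflow type amounts to a lookup table keyed on name and version. The sketch below models that idea with the standard library only. `workflowType`, `handler`, and `registry` are hypothetical stand-ins, not the real swf.WorkflowType or WorkflowHandler, and returning an error for an unregistered type is an assumption about how a mismatch might be reported.

```go
package main

import (
	"errors"
	"fmt"
)

// workflowType is a hypothetical stand-in with comparable fields, so it can be a map key.
type workflowType struct{ Name, Version string }

// handler is a simplified stand-in for a workflow handler.
type handler func(input string) string

type registry struct {
	handlers map[workflowType]handler
}

var errNoHandler = errors.New("no handler registered for workflow type")

// register associates a handler with an exact name and version.
func (r *registry) register(t workflowType, h handler) {
	if r.handlers == nil {
		r.handlers = make(map[workflowType]handler)
	}
	r.handlers[t] = h
}

// dispatch runs the handler whose name and version both match.
func (r *registry) dispatch(t workflowType, input string) (string, error) {
	h, ok := r.handlers[t]
	if !ok {
		return "", errNoHandler
	}
	return h(input), nil
}

func main() {
	var r registry
	r.register(workflowType{Name: "hello", Version: "1"}, func(in string) string {
		return "handled " + in
	})
	fmt.Println(r.dispatch(workflowType{Name: "hello", Version: "1"}, "task-a"))
	fmt.Println(r.dispatch(workflowType{Name: "hello", Version: "2"}, "task-b"))
}
```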

func (*Decider) Start

func (d *Decider) Start() error

starts polling for decisions, indefinitely.

type JsonMarshaler

type JsonMarshaler struct {
}

func (JsonMarshaler) Marshal

func (j JsonMarshaler) Marshal(a interface{}) (string, error)

func (JsonMarshaler) Unmarshal

func (j JsonMarshaler) Unmarshal(in string, out interface{}) error

type Marshaler

type Marshaler interface {
	Marshal(a interface{}) (string, error)
	Unmarshal(in string, out interface{}) error
}
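Since Marshaler is an interface, any encoding that can round-trip through a string can be plugged in. As a rough sketch, the following defines an XML-based implementation. `XMLMarshaler` and `order` are hypothetical and not part of the package, and the interface is redeclared locally only so the example compiles on its own.

```go
package main

import (
	"encoding/xml"
	"fmt"
)

// Marshaler repeats the interface documented above so this file is self-contained.
type Marshaler interface {
	Marshal(a interface{}) (string, error)
	Unmarshal(in string, out interface{}) error
}

// XMLMarshaler is a hypothetical alternative to the default JSON marshaler.
type XMLMarshaler struct{}

// Marshal encodes a as an XML document held in a string.
func (XMLMarshaler) Marshal(a interface{}) (string, error) {
	b, err := xml.Marshal(a)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

// Unmarshal decodes the XML in in into out, which must be a pointer.
func (XMLMarshaler) Unmarshal(in string, out interface{}) error {
	return xml.Unmarshal([]byte(in), out)
}

// Compile-time check that XMLMarshaler satisfies the interface.
var _ Marshaler = XMLMarshaler{}

// order is an example payload.
type order struct {
	ID    int    `xml:"id"`
	Owner string `xml:"owner"`
}

func main() {
	var m Marshaler = XMLMarshaler{}
	s, err := m.Marshal(order{ID: 7, Owner: "ann"})
	if err != nil {
		fmt.Println("marshal failed:", err)
		return
	}
	var back order
	if err := m.Unmarshal(s, &back); err != nil {
		fmt.Println("unmarshal failed:", err)
		return
	}
	fmt.Println(s)
	fmt.Printf("%+v\n", back)
}
```

Deciders and activity workers that exchange data would need to be configured with compatible marshalers, since one side's output is the other side's input.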

type SwfWorkflow

type SwfWorkflow struct {
	// contains filtered or unexported fields
}

an instance of a workflow

func (*SwfWorkflow) As

func (w *SwfWorkflow) As(a interface{}) error

decodes the workflow execution's input into the given interface

func (*SwfWorkflow) Complete

func (s *SwfWorkflow) Complete(a interface{})

func (*SwfWorkflow) ContinueAsNewWorkflow

func (s *SwfWorkflow) ContinueAsNewWorkflow(a interface{})

continues this workflow as a new one, using defaults for all settings. 'a' will be marshalled to form the input.

func (*SwfWorkflow) Decide

func (w *SwfWorkflow) Decide()

Must be called for our decisions to be posted to the server. In your handler you should always defer Decide() so that it gets executed.

func (*SwfWorkflow) Fail

func (s *SwfWorkflow) Fail(reason, details string)

func (*SwfWorkflow) Go

func (s *SwfWorkflow) Go(do Task, data interface{}) TaskResultChan

launches an activity. Calls to this method must be deterministic in ordering.

func (*SwfWorkflow) GoWithCustomId

func (s *SwfWorkflow) GoWithCustomId(do Task, data interface{}, thisId string) TaskResultChan

launches an activity, but sets a custom id. You must ensure 'thisId' is created deterministically with respect to ordering, as 'thisId' is used to see if the activity is scheduled, completed, failed, etc.
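One way to keep a custom id deterministic is to derive it only from values that are identical on every replay, such as fields of the workflow input, and never from the clock or a random source. The helper below is a minimal sketch of that idea; `stableID` is a hypothetical name and the hashing scheme is just one possible choice.

```go
package main

import (
	"crypto/sha256"
	"fmt"
)

// stableID derives an id from values that are the same on every replay.
// The same kind and parts always produce the same id.
func stableID(kind string, parts ...string) string {
	h := sha256.New()
	for _, p := range parts {
		h.Write([]byte(p))
		h.Write([]byte{0}) // separator, so ("ab", "c") and ("a", "bc") differ
	}
	// Eight bytes of the digest, hex encoded, keep the id short.
	return fmt.Sprintf("%s-%x", kind, h.Sum(nil)[:8])
}

func main() {
	fmt.Println(stableID("resize", "order-42", "thumbnail"))
	fmt.Println(stableID("resize", "order-42", "thumbnail")) // identical to the line above
	fmt.Println(stableID("resize", "order-43", "thumbnail")) // different input, different id
}
```

An id built from something like `time.Now()` or a fresh UUID would differ on each invocation of the decider, so the helper could not recognise the activity it scheduled earlier.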

func (*SwfWorkflow) Mark

func (s *SwfWorkflow) Mark(details interface{}, markerName string)

creates a marker of your choice, marshaling details with the marshaler. Do not use a name matching the pattern 'awsgo.swfhelper.%d', as that format is reserved.

func (*SwfWorkflow) MarkString

func (s *SwfWorkflow) MarkString(details, markerName string)

creates a marker of your choice with a raw string. Do not use a name matching the pattern 'awsgo.swfhelper.%d', as that format is reserved.

func (*SwfWorkflow) Once

func (s *SwfWorkflow) Once(what func())

Executes the given function once. We record whether this function has been executed by leaving a marker in the decision history.
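The marker idea can be modelled in a few lines: each call to Once gets a name from its position in the handler, and the function runs only if no marker with that name is in the history. This is an illustrative model under those assumptions, not the package's code; `workflow`, `replayTimes`, and the marker name format `example.once.%d` are all hypothetical.

```go
package main

import "fmt"

// workflow is a hypothetical model of one decider invocation.
type workflow struct {
	markers map[string]bool // marker names already recorded in the history
	calls   int             // number of Once calls made so far in this invocation
}

// Once runs what only if no marker exists for this call position,
// then records the marker so later replays skip it.
func (w *workflow) Once(what func()) {
	name := fmt.Sprintf("example.once.%d", w.calls)
	w.calls++
	if w.markers[name] {
		return
	}
	what()
	w.markers[name] = true
}

// replayTimes invokes the same handler n times against a shared history
// and reports how often the side effect actually ran.
func replayTimes(n int) int {
	history := map[string]bool{}
	sent := 0
	for i := 0; i < n; i++ {
		w := &workflow{markers: history}
		w.Once(func() { sent++ })
	}
	return sent
}

func main() {
	fmt.Println("side effect ran", replayTimes(3), "time(s) across 3 replays")
}
```

As with Go, the model only works if the Once calls happen in the same order on every replay, because the marker name depends on the call position.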

func (*SwfWorkflow) StartTimer

func (s *SwfWorkflow) StartTimer(dur time.Duration) TimerResultChan

starts a timer for the given duration

type Task

type Task struct {
	// which activity to execute
	Activity swf.ActivityType
	// specifies the maximum time before which a worker processing a task of this type must report progress
	HeartbeatTimeout *int64
	// maximum duration for this activity task.
	ScheduleToCloseTimeout *int64
	// specifies the maximum duration the activity task can wait to be assigned to a worker
	ScheduleToStartTimeout *int64
	// specifies the maximum duration a worker may take to process this activity task
	StartToCloseTimeout *int64
	// specifies the name of the task list in which to schedule the activity task.
	TaskList string
}

type TaskResult

type TaskResult struct {
	Result string

	// the type of failure.
	//      ScheduleActivityTaskFailed
	//          FailureCause: see 'cause' http://docs.aws.amazon.com/amazonswf/latest/apireference/API_ScheduleActivityTaskFailedEventAttributes.html
	//      ActivityTaskCanceled
	//          FailureCause: 'details' provided in cancel request
	//      ActivityTaskTimedOut
	//          FailureCause: 'details' last provided by a heartbeat
	//      ActivityTaskFailed
	//          FailureCause: 'details' provided in the failure if any.
	FailureType  string
	FailureCause string
	// only appears for failure type 'ActivityTaskFailed'
	FailureReason string
	// contains filtered or unexported fields
}

func (TaskResult) As

func (t TaskResult) As(out interface{}) error
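The failure fields lend themselves to a switch on FailureType. The sketch below uses a local `taskResult` struct that copies only the exported string fields shown above, so it runs without the package. `describe` is a hypothetical helper, and treating an empty FailureType as success is an assumption the source does not confirm.

```go
package main

import "fmt"

// taskResult mirrors the exported string fields of TaskResult for illustration only.
type taskResult struct {
	Result        string
	FailureType   string
	FailureCause  string
	FailureReason string
}

// describe turns a result into a one-line summary, using the meaning of
// FailureCause documented for each failure type.
func describe(r taskResult) string {
	switch r.FailureType {
	case "": // assumption: no failure type means the activity succeeded
		return "ok: " + r.Result
	case "ScheduleActivityTaskFailed":
		return "could not schedule: " + r.FailureCause
	case "ActivityTaskCanceled":
		return "canceled: " + r.FailureCause
	case "ActivityTaskTimedOut":
		return "timed out, last heartbeat details: " + r.FailureCause
	case "ActivityTaskFailed":
		return "failed: " + r.FailureReason + " (" + r.FailureCause + ")"
	default:
		return "unknown failure type: " + r.FailureType
	}
}

func main() {
	fmt.Println(describe(taskResult{Result: "42"}))
	fmt.Println(describe(taskResult{
		FailureType:   "ActivityTaskFailed",
		FailureReason: "bad input",
		FailureCause:  "id was empty",
	}))
}
```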

type TaskResultChan

type TaskResultChan chan TaskResult

a channel to get task results from. If this channel is closed, then we do not have a result yet.

type TimerResult

type TimerResult struct {

	// if this was canceled or failed to fire, this will be populated
	FailureCause string
	// contains filtered or unexported fields
}

type TimerResultChan

type TimerResultChan chan TimerResult

type WorkflowHandler

type WorkflowHandler func(w *SwfWorkflow)

called on a new goroutine when a new workflow needs to be executed.
