work

package
v0.0.0-...-c5dc566 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 6, 2017 License: Apache-2.0 Imports: 15 Imported by: 0

Documentation

Overview

Package work provides mechanisms for executing a workflow, which is represented by the Labor object. The Labor object has the following structure:

Labor

  |__Task
	|  |__Sequence
	|	 | |__Job
	|	 | |__Job
	|  |__Sequence
	|    |__Job
	|	   |__Job
	|__Task
	   |__Sequence
		 ...
	...

Labor

Represents the body of work to be done.

Tasks

Execute one at a time. Failure of a task prevents other tasks from running.

Sequences

Different sequences *May* run concurrently. They represent a change to a single entity.

Job

A Job is what does the work. Each job in a Sequence is run sequentially. Failure of one Job fails the Sequence.

Index

Constants

This section is empty.

Variables

View Source
var Registry = map[string]func(args interface{}) Storage{}

Registry holds a register of names to store implementations. Public to allow testing.

Functions

func RegisterStore

func RegisterStore(name string, f func(interface{}) Storage)

RegisterStore registers a storage method that can be utilized. Should only be called during init() and duplicate names panic.

Types

type BaseFilter

type BaseFilter struct {
	// States returns objects that are currently at one the "states".
	// If empty, all states are included.
	States []State
}

BaseFilter is a basic filter that is included in all search related filters.

type CogInfo

type CogInfo struct {
	// contains filtered or unexported fields
}

CogInfo contains information about Cogs running on the system.

func NewCogInfo

func NewCogInfo(maxCrashes int) (*CogInfo, error)

NewCogInfo creates a CogInfo.

type Hub

type Hub struct {

	// Signal provides access to signalling methods.
	Signal chan SignalMsg
	// contains filtered or unexported fields
}

Hub provides access to an object's state data, various statistics, and signally methods.

func NewHub

func NewHub() *Hub

NewHub is a constructor for Hub.

func (*Hub) Executing

func (s *Hub) Executing() int

Executing gets the number of sub objects that are currently executing.

func (*Hub) SetState

func (s *Hub) SetState(st State)

SetState allows setting of the state data in a thread safe manner.

func (*Hub) State

func (s *Hub) State() State

State retrieves the state in a thread safe manner.

type Job

type Job struct {
	*Hub

	Meta

	// CogPath is the name of the Cog the Job will use.
	CogPath string

	// Args are the arguments to the Cog.
	Args string

	// ArgsType is the encoding method for the Args.
	ArgsType pb.ArgsType

	// Output is the output from the Cog.
	Output string

	// Timeout is the amount of time a Job is allowed to run.
	Timeout time.Duration

	// Retries is the number of times to retry a Cog that fails.
	Retries int

	// RetryDelay is how long to wait between retrying a Cog.
	RetryDelay time.Duration

	// RealUser is the username of user Submitting the Labor to Marmot.
	RealUser string
	// contains filtered or unexported fields
}

Job represents work to be done, via a Cog.

func NewJob

func NewJob(cogPath, desc string) *Job

NewJob is the constructor for Job.

func (*Job) SetState

func (j *Job) SetState(st State)

SetState overrides the internal Hub.SetState() adding writing to storage.

func (*Job) Validate

func (j *Job) Validate() error

Validate validates Job's attributes.

type Labor

type Labor struct {
	*Hub

	Meta

	// ClientID is a unique ID sent by the client.
	ClientID string

	// Tags are single word text strings that can be used to group Labors when
	// doing a search.
	Tags []string

	// Tasks are the tasks that are associated with this Labor.
	Tasks []*Task

	// Completed is closed when the Labor has finished execution.
	Completed chan bool
	// contains filtered or unexported fields
}

Labor represents a total unit of work to be done.

func NewLabor

func NewLabor(name, desc string) *Labor

NewLabor is the constructor for Labor.

func (*Labor) SetCogInfo

func (l *Labor) SetCogInfo(cogInfo *CogInfo)

SetCogInfo sets the internal .cogInfof for all containers to "cogInfo".

func (*Labor) SetIDs

func (l *Labor) SetIDs()

SetIDs creates new UUIDv4 IDs for all containers.

func (*Labor) SetSignalDrain

func (l *Labor) SetSignalDrain()

SetSignalDrain sets the internal .signalDrain for all containers.

func (*Labor) SetState

func (l *Labor) SetState(st State)

SetState overrides the internal Hub.SetState() adding writing to storage.

func (*Labor) SetStorage

func (l *Labor) SetStorage(store Storage)

SetStorage sets the internal .store for all containers to "store".

func (*Labor) Signal

func (l *Labor) Signal(signal Signal) error

Signal adjusts the running state to a preliminary state to be in line with the signal (PAUSE signal would induce a PAUSING state), sends the signal to all Tasks, and then adjusts the Labor's state to its final state.

func (*Labor) Start

func (l *Labor) Start() error

Start starts processing the Labor.

func (*Labor) Validate

func (l *Labor) Validate() error

Validate validates that the Labor is correctly formed.

type LaborFilter

type LaborFilter struct {
	BaseFilter

	// NamePrefix locates Labors starting with the string.
	// NameSuffix locates Labors ending with the string.
	NamePrefix, NameSuffix string

	// Tags matches any Labor that has any tag listed.
	Tags []string

	// SubmitBegin includes only Labors that were submitted at or after
	// SubmitBegin.
	// SubmitEnd will only include an object which was submitted before SubmitEnd.
	SubmitBegin, SubmitEnd time.Time
}

LaborFilter is a search filter used for searching for Labors.

type Meta

type Meta struct {
	// ID contains a unique UUIDv4 representing the object.
	ID string

	// Name contains the name to be displayed to users.
	Name string

	// Desc contains a description of what the object is doing.
	Desc string

	// Started is when an object begins execution.
	Started time.Time

	// Ended is when an object stops execution.
	Ended time.Time

	// Submitted is when a Labor was submitted to the system.  Only valid for Labors.
	Submitted time.Time
	// contains filtered or unexported fields
}

Meta contains metadata information about different objects. Not all fields are used in every object.

func (*Meta) Reason

func (m *Meta) Reason() Reason

Reason retrieves the failure reason for the object.

func (*Meta) SetReason

func (m *Meta) SetReason(r Reason)

SetReason sets the failure reason in a thread safe manner.

func (*Meta) Validate

func (m *Meta) Validate() error

Validate validates Meta's attributes.

type Read

type Read interface {
	// Labor reads a Labor from storage.  If full==false, it only returns the
	// Labor data itself and none of the sub-containers.
	RLabor(ctx context.Context, id string, full bool) (*Labor, error)

	// Task reads a Task from storage. This only returns the Task data.
	RTask(ctx context.Context, id string) (*Task, error)

	// Sequence reads a Sequence from storage.  It only returns the Sequence
	// data and non of the sub-containers.
	RSequence(ctx context.Context, id string) (*Sequence, error)

	// Job reads a Job from storage.  If full==false, it will not return the
	// Job.Args or Job.Output.
	RJob(ctx context.Context, id string, full bool) (*Job, error)
}

Read provides read access to data in Storage. All methods must return copies of the containers, not the actual containers.

type Reason

type Reason int

Reason details reasons that a Container fails.

const (
	// NoFailure indicates there was no failures in the execution.
	NoFailure Reason = 0

	// PreCheckFailure indicates that a pre check caused a failure.
	PreCheckFailure Reason = 1

	// ContChecks indicates a continuous check caused a failure.
	ContCheckFailure Reason = 2

	// MaxFailures indicates that the maximum number of Jobs failed.
	MaxFailures Reason = 3
)

Note: Changes to this list require running stringer!

func (Reason) String

func (i Reason) String() string
type Search interface {
	// Labors streams a time ordered stream of Labors matching the filter.
	// They are ordered from earliest submitted.  They do not contain data.
	Labors(ctx context.Context, filter LaborFilter) (chan SearchResult, error)
}

Search provides search access to data in Storage.

type SearchResult

type SearchResult struct {
	// Labor is the found Labor.
	Labor *Labor

	// Error indicates the stream had an error.
	Error error
}

SearchResult is the result of a Search.Labors().

type Sequence

type Sequence struct {
	*Hub
	Meta

	// Target is the name of what this sequence targets. This could be a device,
	// a service, a directory... whatever makes sense for whatever is being done.
	Target string

	// Jobs is a sequence of jobs to execute.
	Jobs []*Job
	// contains filtered or unexported fields
}

Sequence represents a sequence of Jobs to execute.

func NewSequence

func NewSequence(target string, done chan bool) *Sequence

NewSequence is the constructor for Sequence.

func (*Sequence) SetState

func (s *Sequence) SetState(st State)

SetState overrides the internal Hub.SetState() adding writing to storage.

func (*Sequence) Signal

func (s *Sequence) Signal(signal Signal)

Signal allows signalling of this Task.

func (*Sequence) Validate

func (s *Sequence) Validate() error

Validate validates the Sequences attributes.

type Signal

type Signal int

Signal is used to tell objects to switch execution states.

const (
	// Stop indicates to stop execution if running.
	Stop Signal = iota

	// Pause indicates to pause execution if running.
	Pause

	// CrisisPause indicates to pause execution if running.
	CrisisPause

	// AdminPause indicates to pause execution if running.
	AdminPause

	// Resume indicates to resume execution if in Paused state.
	Resume

	// CrisisResume indicates to resume execution if in CrisisPaused state.
	CrisisResume

	// AdminResume indicates to resume execution if in the AdminPaused state.
	AdminResume
)

type SignalMsg

type SignalMsg struct {
	// Signal is the signal type you are sending.
	Signal Signal

	// Ack is closed when the sub-object has finished its signal processing.
	Ack chan bool
}

SignalMsg is used to send a signal to an object and receive an ackknowledgement back.

type State

type State int

State represents the exeuction state of an object. Not all states can be used in all objects.

const (
	// UnknownState indicates the object's state hasn't been set.  This should
	// be resolved before object execution.
	UnknownState State = 0

	// NotStarted indicates that the object hasn't started execution.
	NotStarted State = 1

	// AdminNotStarted indicates the Labor object was sent to the server, but
	// was not intended to run until started by an RPC or human.
	AdminNotStarted State = 2

	// Running indicates the object is currently executing.
	Running State = 3

	// Pausing indicates the object is intending to pause execution.
	Pausing State = 4

	// Paused indicates the object has paused execution.
	Paused State = 5

	// AdminPaused indicates that a human has paused execution.
	AdminPaused State = 6

	// CrisisPaused indicates that the crisis service for emergency control has
	// paused execution.
	CrisisPaused State = 7

	// Completed indicates the object has completed execution succcessfully.
	Completed State = 8

	// Failed indicates the object failed its execution.
	Failed State = 9

	// Stopping indicates the object is attempting to stop.
	Stopping State = 10

	// Stopped indicates the object's execution was stopped.
	Stopped State = 11
)

Note: Changes to this list require running stringer!

func (State) String

func (i State) String() string

type Storage

type Storage interface {
	Read
	Write
	Search
}

Storage provides access to the underlying storage system.

type Task

type Task struct {
	*Hub
	Meta

	// PreChecks are Jobs that are executed before executing the Jobs in the
	// Task. If any of these fail, the Task is not executed and fails.
	// This is used provide checks before initiating actions. These are run
	// concurrently.
	PreChecks []*Job

	// ContChecks are Jobs that are exeucted continuously until the task ends
	// with ContCheckInterval between runs.  If any check fails, the Task stops
	// execution and fails.  These are run concurrently.
	ContChecks []*Job

	// ToleratedFailures is how many failures to tolerate before stopping.
	ToleratedFailures int

	// Concurrency is how many Jobs to run simultaneously.
	Concurrency int

	// ContCheckInterval is how long between runs of ContCheck Jobs.
	ContCheckInterval time.Duration

	// PassFailures causes the failures in this Task to be passed to the next
	// Task, acting againsts its ToleratedFailures.
	PassFailures bool

	// Sequence are a set of Jobs.
	Sequences []*Sequence
	// contains filtered or unexported fields
}

Task represents a selection of work to be executed. This may represents a canary, general rollout, or set of work to be executed concurrently.

func NewTask

func NewTask(name, desc string, done chan bool) *Task

NewTask is the constructor for Task.

func (*Task) SetState

func (t *Task) SetState(st State)

SetState overrides the internal Hub.SetState() adding writing to storage.

func (*Task) Signal

func (t *Task) Signal(signal Signal)

Signal allows signalling of this Task.

func (*Task) Validate

func (t *Task) Validate() error

Validate validates the Tasks attributes.

type Write

type Write interface {
	// Labor writes the Labor and sub-containers to storage.
	// This should only be used either on ingest of a Labor or during a
	// recovery of a Labor.
	WLabor(ctx context.Context, l *Labor) error

	// WLaborState allows updating of the Labor's State and Reason.
	WLaborState(ctx context.Context, l *Labor) error

	// Task writes out a Task to storage.  This only writes the Task data, none
	// of the sub-containers.
	WTask(ctx context.Context, t *Task) error

	// Sequence writes out a Sequence to storage.  This only writes the Sequence
	// data, none of the sub-containers.
	WSequence(ctx context.Context, s *Sequence) error

	// Job writes out a Job to storage.  This only writes the Job data.  It does
	// not write the Job.Args, as they should only be recorded once by a call to
	// Write.Labor().
	WJob(ctx context.Context, j *Job) error
}

Write provides write access to data in Storage.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL