Documentation ¶
Overview ¶
Statemachine is a featureful statemachine implementation for Metafora handlers to use. It is implemented as a Handler wrapper which provides a channel of incoming commands to wrapped handlers. Internal handlers are expected to shutdown cleanly and exit upon receiving a command from the state machine. The state machine will handle the state transition and restart the internal handler if necesary.
Users must provide a StateStore implementation for persisting task state and Command Listener implementation for receiving commands. See the m_etcd or embedded packages for example Command Listener implementations.
See the README in this package for details.
Index ¶
- Constants
- Variables
- func DefaultErrHandler(_ metafora.Task, errs []Err) (*Message, []Err)
- func New(task metafora.Task, h StatefulHandler, ss StateStore, cl CommandListener, ...) metafora.Handler
- type CommandListener
- type Commander
- type Err
- type ErrHandler
- type Message
- type MessageCode
- type State
- type StateCode
- type StateStore
- type StatefulHandler
- type Transition
Constants ¶
const ( DefaultErrLifetime = -4 * time.Hour DefaultErrMax = 8 )
Variables ¶
var ( MissingUntilError = errors.New("sleeping state missing deadline") MissingErrorsError = errors.New("fault state has no errors") ReleasableError = errors.New("network error, release and retry") )
var ExceededErrorRate = errors.New("exceeded error rate")
ExceededErrorRate is returned by error handlers in an Error Message when retry logic has been exhausted for a handler and it should transition to Failed.
var ( // Rules is the state transition table. Rules = [...]Transition{ {Event: Checkpoint, From: Runnable, To: Runnable}, {Event: Release, From: Runnable, To: Runnable}, {Event: Sleep, From: Runnable, To: Sleeping}, {Event: Complete, From: Runnable, To: Completed}, {Event: Kill, From: Runnable, To: Killed}, {Event: Error, From: Runnable, To: Fault}, {Event: Pause, From: Runnable, To: Paused}, {Event: Run, From: Runnable, To: Runnable}, {Event: Checkpoint, From: Sleeping, To: Sleeping}, {Event: Release, From: Sleeping, To: Sleeping}, {Event: Sleep, From: Sleeping, To: Sleeping}, {Event: Run, From: Sleeping, To: Runnable}, {Event: Kill, From: Sleeping, To: Killed}, {Event: Pause, From: Sleeping, To: Paused}, {Event: Error, From: Sleeping, To: Fault}, {Event: Sleep, From: Fault, To: Sleeping}, {Event: Error, From: Fault, To: Failed}, {Event: Checkpoint, From: Paused, To: Paused}, {Event: Release, From: Paused, To: Paused}, {Event: Run, From: Paused, To: Runnable}, {Event: Sleep, From: Paused, To: Sleeping}, {Event: Kill, From: Paused, To: Killed}, {Event: Pause, From: Paused, To: Paused}, } )
Functions ¶
func DefaultErrHandler ¶
DefaultErrHandler returns a Fail message if 8 errors have occurred in 4 hours. Otherwise it enters the Sleep state for 10 minutes before trying again.
func New ¶
func New(task metafora.Task, h StatefulHandler, ss StateStore, cl CommandListener, e ErrHandler) metafora.Handler
New handler that creates a state machine and exposes state transitions to the given handler by calling its Transition method. It should be created in the HandlerFunc you use with metafora's Consumer.
If ErrHandler is nil DefaultErrHandler will be used.
Types ¶
type CommandListener ¶
type CommandListener interface { Receive() <-chan *Message Stop() }
type Err ¶
type Err struct { Time time.Time `json:"timestamp"` Err string `json:"error"` // contains filtered or unexported fields }
Err represents an error that occurred while a stateful handler was running.
NewErr was added to allow callers to construct an instance from an underlying error. The underlying error is now preserved so that Err can be converted back using errors.As This is useful for custom error handlers that wish to inspect underlying error types and decision accordingly.
type ErrHandler ¶
ErrHandler functions should return Run, Sleep, or Fail messages depending on the rate of errors.
Either ErrHandler and/or StateStore should trim the error slice to keep it from growing without bound.
type Message ¶
type Message struct { Code MessageCode `json:"message"` // Until is when the statemachine should transition from sleeping to runnable Until *time.Time `json:"until,omitempty"` // Err is the error that caused this Error message Err error `json:"error,omitempty"` }
Messages are events that cause state transitions. Until and Err are used by the Sleep and Error messages respectively.
func CheckpointMessage ¶
func CheckpointMessage() *Message
func CompleteMessage ¶
func CompleteMessage() *Message
func ErrorMessage ¶
ErrorMessage is a simpler helper for creating error messages from an error.
func KillMessage ¶
func KillMessage() *Message
func PauseMessage ¶
func PauseMessage() *Message
func ReleaseMessage ¶
func ReleaseMessage() *Message
func RunMessage ¶
func RunMessage() *Message
func SleepMessage ¶
SleepMessage is a simpler helper for creating sleep messages from a time.
type MessageCode ¶
type MessageCode string
MessageCode is the symbolic name of a state transition.
const ( Run MessageCode = "run" Sleep MessageCode = "sleep" Pause MessageCode = "pause" Kill MessageCode = "kill" Error MessageCode = "error" Complete MessageCode = "complete" Checkpoint MessageCode = "checkpoint" // Special event which triggers state machine to exit without transitioning // between states. Release MessageCode = "release" )
func (MessageCode) String ¶
func (m MessageCode) String() string
type State ¶
type State struct { Code StateCode `json:"state"` Until *time.Time `json:"until,omitempty"` Errors []Err `json:"errors,omitempty"` }
State represents the current state of a stateful handler. See StateCode for details. Until and Errors are extra state used by the Sleeping and Fault states respectively.
type StateCode ¶
type StateCode string
StateCode is the actual state key. The State struct adds additional metadata related to certain StateCodes.
const ( Runnable StateCode = "runnable" // Scheduled Sleeping StateCode = "sleeping" // Scheduled, not running until time has elapsed Completed StateCode = "completed" // Terminal, not scheduled Killed StateCode = "killed" // Terminal, not scheduled Failed StateCode = "failed" // Terminal, not scheduled Fault StateCode = "fault" // Scheduled, in error handling / retry logic Paused StateCode = "paused" // Scheduled, not running )
type StateStore ¶
type StateStore interface { // Load the persisted or initial state for a task. Errors will cause tasks to // be marked as done. // // The one exception is the special error StateNotFound which will cause the // state machine to start from the initial (Runnable) state. Load(metafora.Task) (*State, error) // Store the current task state. Errors will prevent current state from being // persisted and prevent state transitions. Store(metafora.Task, *State) error }
StateStore is an interface implementations must provide for persisting task state. Since the task ID is provided on each method call a single global StateStore can be used and implementations should be safe for concurrent access.
type StatefulHandler ¶
StatefulHandler is the function signature that the state machine is able to run. Instead of metafora.Handler's Stop method, StatefulHandlers receive Messages via the commands chan and return their exit status via a Message.
Normally StatefulHandlers simply return a Message as soon as it's received on the commands chan. However, it's also acceptable for a handler to return a different Message. For example if it encounters an error during shutdown, it may choose to return that error as an Error Message as opposed to the original command.
type Transition ¶
type Transition struct { Event MessageCode From StateCode To StateCode }
Transitions represent a state machine transition from one state to another given an event message.
func (Transition) String ¶
func (t Transition) String() string