sio

package
v2.0.0+incompatible
Published: Apr 15, 2019 License: Apache-2.0 Imports: 20 Imported by: 3

README

Single-crew core

This single-crew approach is designed for running individual crews in containers or micro VMs.

Can be coupled to MQTT, Redis, SQS, SNS, Kafka, stdin/stdout, etc.

See command-line programs siostd and siomq. mqclient is a simple MQTT command-line client that's convenient for talking to siomq (or to MQTT brokers in general).

Documentation

Overview

Package sio provides a single-crew system with pluggable IO.

Index

Constants

This section is empty.

Variables

var (
	// CaptainMachine is the id of the captain.
	CaptainMachine = "captain"
)
var DefaultLimits = &Limits{
	MaxSheens:        100,
	MaxStatePerSheen: 10 * 1024,
}

DefaultLimits is just that.

var (

	// Interpreters are the standard action interpreters.
	Interpreters = core.InterpretersMap{
		"goja":           interpreter,
		"ecmascript":     interpreter,
		"ecmascript-5.1": interpreter,
	}
)
var (
	// TimersMachine is the id of the timers machine.
	TimersMachine = "timers"
)

Functions

func DefaultState

func DefaultState(s *core.State) *core.State

DefaultState returns a state at "state" with empty bindings.

func JS

func JS(x interface{}) string

JS renders its argument as JSON or as '%#v'.

func JSON

func JSON(x interface{}) string

JSON renders its argument as pretty JSON or as '%#v'.

func JShort

func JShort(x interface{}) string

JShort renders its argument as JS() but only up to 73 characters.
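The fallback and truncation behavior described for JS and JShort can be sketched roughly as follows. This is only an illustration under stated assumptions: renderJS and renderShort are hypothetical local functions, not the package's code, and the byte-based cut-off is a guess at how the 73-character limit might be applied.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// renderJS is a hypothetical stand-in for JS: it tries compact JSON first
// and falls back to the '%#v' representation when marshalling fails.
func renderJS(x interface{}) string {
	bs, err := json.Marshal(x)
	if err != nil {
		return fmt.Sprintf("%#v", x)
	}
	return string(bs)
}

// renderShort is a hypothetical stand-in for JShort: it keeps at most
// limit bytes of the rendering. Cutting by bytes is an assumption and can
// split a multi-byte character.
func renderShort(x interface{}, limit int) string {
	s := renderJS(x)
	if len(s) > limit {
		return s[:limit]
	}
	return s
}

func main() {
	fmt.Println(renderJS(map[string]int{"a": 1}))
	// Channels cannot be marshalled to JSON, so this takes the fallback path.
	fmt.Println(renderJS(make(chan int)))
	fmt.Println(len(renderShort(make([]int, 100), 73)))
}
```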

func ResolveSpecSource

func ResolveSpecSource(ctx context.Context, specSource interface{}) (*crew.SpecSource, *core.Spec, error)

ResolveSpecSource attempts to find and compile a spec based on a crew.SpecSource (or something that looks like one).

ToDo: Test and document.

func ShellExpand

func ShellExpand(msg string) (string, error)

ShellExpand expands shell commands delimited by '<<' and '>>'. Use at your own risk, of course!
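To make the '<<' and '>>' delimiters concrete, here is a minimal sketch of that kind of expansion. It does not call a real shell: the run parameter and fakeRun are hypothetical, and the choices to trim whitespace from command output and to report an unterminated '<<' as an error are assumptions, not documented behavior of ShellExpand.

```go
package main

import (
	"fmt"
	"strings"
)

// expand replaces each '<<cmd>>' span in msg with the output of run(cmd).
// The run parameter is hypothetical: it stands in for whatever executes
// the command, so this sketch never touches a real shell.
func expand(msg string, run func(cmd string) (string, error)) (string, error) {
	var out strings.Builder
	for {
		open := strings.Index(msg, "<<")
		if open < 0 {
			out.WriteString(msg)
			return out.String(), nil
		}
		end := strings.Index(msg[open+2:], ">>")
		if end < 0 {
			return "", fmt.Errorf("unterminated '<<' in %q", msg)
		}
		res, err := run(msg[open+2 : open+2+end])
		if err != nil {
			return "", err
		}
		out.WriteString(msg[:open])
		out.WriteString(strings.TrimSpace(res))
		// Continue scanning after the closing '>>'.
		msg = msg[open+2+end+2:]
	}
}

// fakeRun pretends to run a command by echoing it back in upper case.
func fakeRun(cmd string) (string, error) {
	return strings.ToUpper(cmd) + "\n", nil
}

func main() {
	s, err := expand(`{"host":"<<hostname>>"}`, fakeRun)
	fmt.Println(s, err)
}
```

Passing the runner as a function keeps the sketch testable and makes the risk explicit: whatever text sits between the delimiters is handed to the runner as-is.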

Types

type Changed

type Changed struct {
	State   *core.State      `json:",omitempty"`
	SpecSrc *crew.SpecSource `json:",omitempty"`
	Deleted bool             `json:",omitempty"`

	// PreviousState is optional data that can be used to decide
	// if the new state is really different from the old state.
	//
	// In this implementation, PreviousState is a JSON
	// representation of this struct.
	PreviousState []byte `json:"-"`
}

Changed represents changes to a machine after message processing.
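One way PreviousState could be used to decide whether a machine really changed is to compare JSON renderings, as sketched below. The state and changed types are simplified local stand-ins, and snapshot and reallyChanged are hypothetical helpers; none of this is taken from the package's implementation.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

// state is a simplified, hypothetical stand-in for core.State.
type state struct {
	Node string                 `json:"node"`
	Bs   map[string]interface{} `json:"bs"`
}

// changed mirrors the State, Deleted, and PreviousState fields of Changed.
type changed struct {
	State         *state `json:",omitempty"`
	Deleted       bool   `json:",omitempty"`
	PreviousState []byte `json:"-"`
}

// snapshot records the current JSON rendering of c in c.PreviousState.
func snapshot(c *changed) error {
	bs, err := json.Marshal(c)
	if err != nil {
		return err
	}
	c.PreviousState = bs
	return nil
}

// reallyChanged reports whether the JSON rendering of c differs from the
// snapshot kept in c.PreviousState.
func reallyChanged(c *changed) (bool, error) {
	now, err := json.Marshal(c)
	if err != nil {
		return false, err
	}
	return !bytes.Equal(now, c.PreviousState), nil
}

func main() {
	c := &changed{State: &state{Node: "start", Bs: map[string]interface{}{"n": 1}}}
	if err := snapshot(c); err != nil {
		panic(err)
	}
	before, _ := reallyChanged(c)
	c.State.Node = "done"
	after, _ := reallyChanged(c)
	fmt.Println(before, after)
}
```

The comparison relies on encoding/json writing map keys in sorted order, so equal states produce equal bytes.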

type Couplings

type Couplings interface {
	// Start initializes the Couplings.
	Start(context.Context) error

	// IO returns the input and result channels.
	//
	// Consumer can see all emitted messages and state updates via
	// the Result(s).
	IO(context.Context) (chan interface{}, chan *Result, chan bool, error)

	// Read (optionally) returns an initial set of machines.
	//
	// An implementation that supports persistence would read
	// machine state and pass it to this method.
	Read(context.Context) (map[string]*crew.Machine, error)

	// Stop shuts down the Couplings.
	Stop(context.Context) error
}

Couplings provide channels for message input, results output, and persistence.

For example, an implementation could couple a crew to an MQTT broker (for IO). For persistence, an implementation could use https://github.com/etcd-io/bbolt, DynamoDB, SQLite, etc.
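As a rough illustration of the interface's shape, the sketch below implements an in-memory coupling. The machine, result, and couplings types are local stand-ins for crew.Machine, Result, and Couplings, and memCouplings is hypothetical. Treating the third channel returned by IO as a "done" signal is an assumption, since its meaning is not documented here.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// machine and result are hypothetical stand-ins for crew.Machine and Result.
type machine struct{}

type result struct {
	Emitted [][]interface{}
}

// couplings mirrors the method set of Couplings using the local types.
type couplings interface {
	Start(context.Context) error
	IO(context.Context) (chan interface{}, chan *result, chan bool, error)
	Read(context.Context) (map[string]*machine, error)
	Stop(context.Context) error
}

// memCouplings keeps everything in memory: buffered channels for IO and a
// plain map playing the role of persisted machines.
type memCouplings struct {
	in       chan interface{}
	out      chan *result
	done     chan bool // assumed to signal that IO has finished
	machines map[string]*machine
	stopOnce sync.Once
}

func newMemCouplings() *memCouplings {
	return &memCouplings{
		in:       make(chan interface{}, 8),
		out:      make(chan *result, 8),
		done:     make(chan bool),
		machines: map[string]*machine{"m1": {}},
	}
}

func (m *memCouplings) Start(ctx context.Context) error { return nil }

func (m *memCouplings) IO(ctx context.Context) (chan interface{}, chan *result, chan bool, error) {
	return m.in, m.out, m.done, nil
}

func (m *memCouplings) Read(ctx context.Context) (map[string]*machine, error) {
	return m.machines, nil
}

// Stop closes the done channel at most once, so repeated calls are safe.
func (m *memCouplings) Stop(ctx context.Context) error {
	m.stopOnce.Do(func() { close(m.done) })
	return nil
}

// demo exercises the coupling the way a crew might: read the initial
// machines, take a message from in, and publish a result on out.
func demo() (int, int, error) {
	ctx := context.Background()
	var c couplings = newMemCouplings()
	if err := c.Start(ctx); err != nil {
		return 0, 0, err
	}
	machines, err := c.Read(ctx)
	if err != nil {
		return 0, 0, err
	}
	in, out, _, err := c.IO(ctx)
	if err != nil {
		return 0, 0, err
	}
	in <- "ping"
	out <- &result{Emitted: [][]interface{}{{<-in}}}
	r := <-out
	return len(machines), len(r.Emitted), c.Stop(ctx)
}

func main() {
	fmt.Println(demo())
}
```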

type Crew

type Crew struct {
	// Machines represents this Crew's current machines.
	Machines map[string]*crew.Machine

	// Conf provides some basic Crew parameters.
	Conf *CrewConf `json:"conf"`

	// Verbose turns on logging.
	Verbose bool

	// Mutex can probably be removed once code is cleaned up to
	// perform all state changes, including timers state changes,
	// in the Crew loop.  ToDo.
	sync.Mutex
	// contains filtered or unexported fields
}

Crew represents a collection of machines and associated gear to support message processing, with I/O coupled via two channels (in and out).

func NewCrew

func NewCrew(ctx context.Context, conf *CrewConf, couplings Couplings) (*Crew, error)

NewCrew makes a crew with the given configuration and couplings.

The coupling's IO() method is called to obtain the crew's in/out channels.

func (*Crew) DeleteMachine

func (c *Crew) DeleteMachine(ctx context.Context, mid string) error

DeleteMachine removes a machine from the crew.

No error is returned if the machine doesn't exist.

func (*Crew) DoOp

func (c *Crew) DoOp(ctx context.Context, op *CrewOp) error

DoOp executes the given CrewOp.

func (*Crew) Errorf

func (c *Crew) Errorf(format string, args ...interface{})

Errorf writes a log with "ERROR" prepended.

func (*Crew) GetChanged

func (c *Crew) GetChanged(ctx context.Context) (map[string]*Changed, error)

GetChanged computes the net machine changes since this method was previously called.

ToDo: Make private.

func (*Crew) GetTimers

func (c *Crew) GetTimers(ctx context.Context) (*Timers, error)

GetTimers gets the Timers for the crew.

func (*Crew) Logf

func (c *Crew) Logf(format string, args ...interface{})

Logf logs if c.Verbose.

func (*Crew) Loop

func (c *Crew) Loop(ctx context.Context) error

Loop starts the input processing loop in the current goroutine.

This loop calls ProcessMsg on each message that arrives via the input coupling, and the loop halts when ctx.Done().
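The loop described above follows a common select-on-context pattern, sketched here with local names. The loop function, its process callback, and the string results are hypothetical simplifications; returning ctx.Err() on cancellation and nil on a closed input channel are assumptions rather than documented behavior of Loop.

```go
package main

import (
	"context"
	"fmt"
)

// loop calls process on each message from in and forwards the result to
// out. It returns when ctx is done or when in is closed.
func loop(ctx context.Context, in <-chan interface{}, out chan<- string,
	process func(interface{}) (string, error)) error {
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case msg, ok := <-in:
			if !ok {
				return nil // input closed: nothing more to process
			}
			r, err := process(msg)
			if err != nil {
				r = "error: " + err.Error()
			}
			// Do not block forever on out if the context ends meanwhile.
			select {
			case out <- r:
			case <-ctx.Done():
				return ctx.Err()
			}
		}
	}
}

// demo runs the loop in a goroutine, feeds it two messages, collects the
// results, and then cancels the context to stop the loop.
func demo() ([]string, error) {
	ctx, cancel := context.WithCancel(context.Background())
	in := make(chan interface{})
	out := make(chan string, 2)
	errc := make(chan error, 1)
	go func() {
		errc <- loop(ctx, in, out, func(m interface{}) (string, error) {
			return fmt.Sprintf("saw %v", m), nil
		})
	}()
	in <- "a"
	in <- "b"
	got := []string{<-out, <-out}
	cancel()
	return got, <-errc
}

func main() {
	fmt.Println(demo())
}
```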

func (*Crew) NewCaptainSpec

func (c *Crew) NewCaptainSpec() *core.Spec

NewCaptainSpec creates a machine Spec for a "captain" who can execute CrewOps.

func (*Crew) NewTimersSpec

func (c *Crew) NewTimersSpec() *core.Spec

NewTimersSpec creates a new spec that can process a TimerMsg.

func (*Crew) ProcessMsg

func (c *Crew) ProcessMsg(ctx context.Context, msg interface{}) (*Result, error)

ProcessMsg processes the given message and returns the results, which can then be processed by the crew's Result coupling.

func (*Crew) RunMachine

func (c *Crew) RunMachine(ctx context.Context, msg interface{}, m *crew.Machine) (*core.Walked, error)

RunMachine presents the message to the given machine.

func (*Crew) RunMachines

func (c *Crew) RunMachines(ctx context.Context, msg interface{}) (map[string]*core.Walked, error)

RunMachines presents the message to the machines returned by toMachines.

func (*Crew) SetMachine

func (c *Crew) SetMachine(ctx context.Context, mid string, src *crew.SpecSource, state *core.State) error

SetMachine creates or updates a machine.

When the mid is (the variable) TimersMachine and the given state is nil, the timers machine's state is reset.

type CrewConf

type CrewConf struct {
	Id  string        `json:"id,omitempty"`
	Ctl *core.Control `json:"ctl"`
}

CrewConf contains (or will contain) basic crew configuration data.

Not much is needed now.

type CrewOp

type CrewOp struct {
	Update map[string]*crew.Machine `json:"update,omitempty"`
	Delete []string                 `json:"delete,omitempty"`
}

CrewOp is a crude structure for crew-level operations (such as adding a machine).

func AsCrewOp

func AsCrewOp(msg interface{}) (*CrewOp, error)

AsCrewOp attempts to interpret the given message (hopefully a map) as a CrewOp.
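A generic message (a map decoded from JSON, say) can be turned into a typed operation by a JSON round trip, as sketched below. This is one plausible approach, not necessarily how AsCrewOp works. The machine type is a hypothetical stand-in for crew.Machine, and crewOp copies the field tags shown for CrewOp.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// machine is a hypothetical stand-in for crew.Machine.
type machine struct {
	State map[string]interface{} `json:"state,omitempty"`
}

// crewOp mirrors the fields and JSON tags of CrewOp.
type crewOp struct {
	Update map[string]*machine `json:"update,omitempty"`
	Delete []string            `json:"delete,omitempty"`
}

// asCrewOp re-encodes msg as JSON and decodes it into a crewOp. Messages
// that are not JSON objects (a bare string, for example) produce an error.
func asCrewOp(msg interface{}) (*crewOp, error) {
	bs, err := json.Marshal(msg)
	if err != nil {
		return nil, err
	}
	var op crewOp
	if err := json.Unmarshal(bs, &op); err != nil {
		return nil, err
	}
	return &op, nil
}

func main() {
	msg := map[string]interface{}{
		"update": map[string]interface{}{"m3": map[string]interface{}{}},
		"delete": []interface{}{"m1", "m2"},
	}
	op, err := asCrewOp(msg)
	if err != nil {
		panic(err)
	}
	fmt.Println(len(op.Update), op.Delete)
}
```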

type JSONStore

type JSONStore struct {
	// StateOutputFilename, if not empty, is the name of the file
	// to which state is written as JSON.
	StateOutputFilename string

	// StateInputFilename optionally gives a filename that contains
	// state to return when Read is called.
	StateInputFilename string

	WG sync.WaitGroup
	// contains filtered or unexported fields
}

JSONStore is a primitive facility to store crew state as JSON in a file.

Not glamorous or efficient.

func NewJSONStore

func NewJSONStore() *JSONStore

func (*JSONStore) Read

func (s *JSONStore) Read(ctx context.Context) (map[string]*crew.Machine, error)

Read reads s.StateInputFilename, which should contain a JSON representation of the crew's state.

func (*JSONStore) Start

func (s *JSONStore) Start(ctx context.Context) error

Start does nothing.

func (*JSONStore) Stop

func (s *JSONStore) Stop(ctx context.Context, wait bool) error

Stop writes out the state if requested by StateOutputFilename.

This function first waits for s.WG if told to.

func (*JSONStore) WriteState

func (s *JSONStore) WriteState(ctx context.Context) error

WriteState writes the entire crew as JSON.

type Limits

type Limits struct {
	MaxSheens        int `json:"maxSheens"`
	MaxStatePerSheen int `json:"maxStatePerSheen"`
}

Limits provides some operation limits.

Currently these limits are not enforced at all.

ToDo: Use!

type Result

type Result struct {
	// Changed represents all machine changes.
	Changed map[string]*Changed

	// Emitted is a list of message batches emitted by machines
	// during processing.
	//
	// A message batch is ordered: A machine (usually) emits
	// messages in a specified, deterministic order.
	//
	// The collection of batches is a partial order given by
	// recursive message processing calls.  When processing a
	// message results in emitted messages that are directed back
	// to the crew, the results of those recursive processings
	// give a deterministic order to their emitted batches.  However,
	// with respect to processing a single message, multiple
	// batches are NOT ordered (because the order in which machines
	// are presented with an in-bound message is not specified).
	Emitted [][]interface{}

	// Diag includes internal processing data.
	Diag []*Stroll
}

Result represents all visible output from processing a message.

type Stdio

type Stdio struct {
	// In is coupled to crew input.
	In io.Reader

	// Out is coupled to crew output.
	Out io.Writer

	// ShellExpand enables input to include inline shell commands
	// delimited by '<<' and '>>'.  Use at your own risk, of
	// course!
	ShellExpand bool

	// Timestamps prepends a timestamp to each output line.
	Timestamps bool

	// EchoInput writes input lines (prepended with "input") to
	// the output.
	EchoInput bool

	// Tags prefixes tags indicating type of output ("input",
	// "emit", "diag").
	Tags bool

	// PadTags adds some padding to tags ("input", "emit",
	// "update") used in output.
	PadTags bool

	// PrintUpdates will print update messages to stdout.
	PrintUpdates bool

	JSONStore

	// InputEOF will be closed on EOF from stdin.
	InputEOF chan bool

	// WriteStatePerMsg will write out ALL state after every input
	// message is processed.
	//
	// Inefficient!
	WriteStatePerMsg bool

	// PrintDiag turns on printing of diagnostic data.
	PrintDiag bool
}

Stdio is a fairly simple Couplings that uses stdin for input and stdout for output.

State is optionally crudely written as JSON to a file.

func NewStdio

func NewStdio(shellExpand bool) *Stdio

NewStdio creates a new Stdio.

ShellExpand enables input to include inline shell commands delimited by '<<' and '>>'. Use at your own risk, of course!

In and Out are initialized with os.Stdin and os.Stdout respectively.

func (*Stdio) IO

func (s *Stdio) IO(ctx context.Context) (chan interface{}, chan *Result, chan bool, error)

IO returns channels for reading from stdin and writing to stdout.

func (*Stdio) Read

func (s *Stdio) Read(ctx context.Context) (map[string]*crew.Machine, error)

Read reads s.StateInputFilename, which should contain a JSON representation of the crew's state.

func (*Stdio) Start

func (s *Stdio) Start(ctx context.Context) error

Start does nothing.

func (*Stdio) Stop

func (s *Stdio) Stop(ctx context.Context) error

Stop writes out the state if requested by StateOutputFilename.

This function waits until IO is complete or was terminated via its context.

type Stroll

type Stroll struct {
	Msg     interface{} `json:"msg"`
	Walkeds interface{} `json:"walks"`
	Err     string      `json:"err,omitempty"`
}

Stroll is internal processing data for the given message.

Result.Diag gathers this information.

type TimerEntry

type TimerEntry struct {
	Id  string
	Msg interface{}
	At  time.Time
	Ctl chan bool `json:"-"`
	// contains filtered or unexported fields
}

TimerEntry represents a pending timer.

type TimerMsg

type TimerMsg struct {

	// Add the given timer.
	Add struct {
		Id  string      `json:"id"`
		Msg interface{} `json:"msg"`
		In  string      `json:"in"`
		To  string      `json:"to"` // ToDo: Support array
	} `json:"makeTimer"`

	// Cancel the given timer.
	Cancel struct {
		Id string
	} `json:"cancelTimer"`
}

TimerMsg is a command that the timers machine can execute.
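The JSON tags on TimerMsg imply message shapes like the ones decoded in this sketch. The timerMsg type is a local copy of the struct above and parseTimerMsg is a hypothetical helper. Reading the "in" field as a Go duration string such as "2s" is an assumption, as are the sample ids and payload.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// timerMsg is a local copy of the TimerMsg struct shown above.
type timerMsg struct {
	Add struct {
		Id  string      `json:"id"`
		Msg interface{} `json:"msg"`
		In  string      `json:"in"`
		To  string      `json:"to"`
	} `json:"makeTimer"`
	Cancel struct {
		Id string
	} `json:"cancelTimer"`
}

// parseTimerMsg decodes js and, for a makeTimer request, parses the "in"
// field as a duration (assumed format; see time.ParseDuration).
func parseTimerMsg(js string) (*timerMsg, time.Duration, error) {
	var tm timerMsg
	if err := json.Unmarshal([]byte(js), &tm); err != nil {
		return nil, 0, err
	}
	var d time.Duration
	if tm.Add.Id != "" {
		var err error
		if d, err = time.ParseDuration(tm.Add.In); err != nil {
			return nil, 0, err
		}
	}
	return &tm, d, nil
}

func main() {
	add := `{"makeTimer":{"id":"t1","msg":{"wake":"up"},"in":"2s","to":"m1"}}`
	tm, d, err := parseTimerMsg(add)
	if err != nil {
		panic(err)
	}
	fmt.Println(tm.Add.Id, tm.Add.To, d)

	tm, _, err = parseTimerMsg(`{"cancelTimer":{"Id":"t1"}}`)
	if err != nil {
		panic(err)
	}
	fmt.Println(tm.Cancel.Id)
}
```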

type Timers

type Timers struct {
	Map     map[string]*TimerEntry
	Emitter func(context.Context, *TimerEntry) `json:"-"`

	sync.Mutex
	// contains filtered or unexported fields
}

Timers represents pending timers.

func NewTimers

func NewTimers(emitter func(context.Context, *TimerEntry)) *Timers

NewTimers creates a Timers with the given function that the TimerEntries will use to emit their messages.

func (*Timers) Add

func (ts *Timers) Add(ctx context.Context, id string, msg interface{}, d time.Duration) error

Add creates a new Timer that will emit the given message later (if the timer isn't cancelled first).

func (*Timers) Cancel

func (ts *Timers) Cancel(ctx context.Context, id string) error

Cancel attempts to cancel the timer with the given id.
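The add-then-maybe-cancel lifecycle can be sketched with time.AfterFunc and a mutex-guarded map. Everything here is hypothetical: the timers type, its add and cancel methods, and the decisions to reject duplicate ids and to report unknown ids as errors are assumptions, not the package's behavior.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// timers is a hypothetical, simplified registry of pending timers.
type timers struct {
	sync.Mutex
	pending map[string]*time.Timer
	emit    func(id string, msg interface{})
}

func newTimers(emit func(string, interface{})) *timers {
	return &timers{pending: map[string]*time.Timer{}, emit: emit}
}

// add schedules msg to be emitted after d unless the timer is cancelled.
func (ts *timers) add(id string, msg interface{}, d time.Duration) error {
	ts.Lock()
	defer ts.Unlock()
	if _, have := ts.pending[id]; have {
		return fmt.Errorf("timer %q already exists", id)
	}
	var t *time.Timer
	t = time.AfterFunc(d, func() {
		ts.Lock()
		// Emit only if this timer is still the registered one, which
		// guards against a cancel that raced with the firing.
		live := ts.pending[id] == t
		if live {
			delete(ts.pending, id)
		}
		ts.Unlock()
		if live {
			ts.emit(id, msg)
		}
	})
	ts.pending[id] = t
	return nil
}

// cancel removes the timer with the given id if it is still pending.
func (ts *timers) cancel(id string) error {
	ts.Lock()
	defer ts.Unlock()
	t, have := ts.pending[id]
	if !have {
		return fmt.Errorf("timer %q not found", id)
	}
	delete(ts.pending, id)
	t.Stop()
	return nil
}

// demo adds two timers, cancels one, and returns the ids that fired.
func demo() []string {
	fired := make(chan string, 2)
	ts := newTimers(func(id string, _ interface{}) { fired <- id })
	ts.add("keep", "hello", 10*time.Millisecond)
	ts.add("drop", "bye", 10*time.Millisecond)
	ts.cancel("drop")
	got := []string{<-fired}
	select {
	case id := <-fired:
		got = append(got, id)
	case <-time.After(100 * time.Millisecond):
	}
	return got
}

func main() {
	fmt.Println(demo())
}
```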

func (*Timers) Start

func (ts *Timers) Start(ctx context.Context) error

Start starts all known timers.

Call this method when you have just created a Timers with existing data.

func (*Timers) State

func (ts *Timers) State() *core.State

State creates a machine state that Timers.withMap can use.
