Documentation ¶
Overview ¶
Package saga provides tools for building stateful message handlers.
Index ¶
- func MarshalData(d Data) (contentType string, data []byte, err error)
- type Aggregate
- func (a *Aggregate) ApplyEvent(d Data, env ax.Envelope)
- func (a *Aggregate) HandleMessage(ctx context.Context, s ax.Sender, mctx ax.MessageContext, i Instance) (err error)
- func (a *Aggregate) MessageTypes() (tr ax.MessageTypeSet, mt ax.MessageTypeSet)
- func (a *Aggregate) NewData() Data
- func (a *Aggregate) PersistenceKey() string
- type Applier
- type CompletableByData
- type CompletableData
- type Data
- type ErrorIfNotFound
- type EventedSaga
- type IgnoreNotFound
- type Instance
- type InstanceID
- type InstancesNeverComplete
- type Mapper
- type MessageHandler
- type Persister
- type Revision
- type Saga
- type UnitOfWork
- type Workflow
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
Types ¶
type Aggregate ¶
type Aggregate struct { ErrorIfNotFound CompletableByData Prototype Data Triggers ax.MessageTypeSet Handle typeswitch.Switch Apply typeswitch.Switch }
Aggregate is a Saga for implementing application-defined domain aggregates.
func NewAggregate ¶
NewAggregate returns a new aggregate saga.
Aggregates are a specialization of sagas that handle commands and produce events.
It accepts a prototype data instance which is cloned for new instances.
For each command type to be handled, the aggregate's data struct must implement a "handler" method that adheres to one of the following signatures:
func (cmd *<T>, rec ax.EventRecorder) func (cmd *<T>, mctx ax.MessageContext, rec ax.EventRecorder)
Where T is a struct type that implements ax.Command.
Handler methods are responsible for producing new events based on the command being handled. They may inspect the current state of the aggregate, and then record zero or more events using rec. Handlers should never mutate the aggregate state.
The names of handler methods are meaningful. Each handler method's name must begin with "Do". By convention these prefixes are followed by the message name, such as:
func (*BankAccount) DoCreditAccount(*messages.CreditAccount, ax.EventRecorder)
For each of the event types passed to rec, the aggregate must implement an "applier" method that adheres to one of the following signatures:
func (ev *T) func (ev *T, mctx ax.MessageContext)
Where T is a struct type that implements ax.Event.
Applier methods are responsible for mutating the aggregate state. The applier is called every time an event is recorded, *and* when loading an event-sourced aggregate from the message store.
The names of handler methods are meaningful. Each handler method's name must begin with "When". By convention these prefixes are followed by the message name, such as:
func (*BankAccount) WhenAccountCredited(*messages.AccountCredited)
func (*Aggregate) ApplyEvent ¶
ApplyEvent updates d to reflect the fact that an event has occurred.
It may panic if env.Message does not implement ax.Event.
func (*Aggregate) HandleMessage ¶
func (a *Aggregate) HandleMessage( ctx context.Context, s ax.Sender, mctx ax.MessageContext, i Instance, ) (err error)
HandleMessage handles a message for a particular saga instance.
func (*Aggregate) MessageTypes ¶
func (a *Aggregate) MessageTypes() (tr ax.MessageTypeSet, mt ax.MessageTypeSet)
MessageTypes returns the set of messages that are routed to this saga.
tr is the set of "trigger" messages. If they can not be routed to an existing saga instance a new instance is created.
mt is the set of messages that are only routed to existing instances. If they can not be routed to an existing instance, the HandleNotFound() method is called instead.
func (*Aggregate) NewData ¶
NewData returns a pointer to a new zero-value instance of the saga's data type.
func (*Aggregate) PersistenceKey ¶
PersistenceKey returns a unique identifier for the saga.
The persistence key is used to relate persisted data with the saga implementation that owns it. Persistence keys should not be changed once a saga has active instances.
type Applier ¶
type Applier struct { Saga EventedSaga Data Data Next ax.Sender }
Applier is an implementation of ax.Sender that applies published events to saga data for evented sagas.
type CompletableByData ¶
type CompletableByData struct{}
CompletableByData is an embeddable struct that implements a Saga.IsInstanceComplete() method that forwards the completion check on to a CompletableData value.
func (CompletableByData) IsInstanceComplete ¶
IsInstanceComplete returns true if i.Data implements CompletableData and i.Data.IsInstanceComplete() returns true.
type CompletableData ¶
type CompletableData interface { Data // IsInstanceComplete returns true if the data describes a "completed" // instance. IsInstanceComplete() bool }
CompletableData is an interface for application-defined saga data that can be queried as to whether the saga instance is "complete".
type Data ¶
type Data interface { proto.Message // InstanceDescription returns a human-readable description of the saga // instance. // // Assume that the description will be used inside log messages or displayed // in audit logs. // // Follow the same conventions as for error messages: // https://github.com/golang/go/wiki/CodeReviewComments#error-strings InstanceDescription() string }
Data is an interface for application-defined data associated with a saga instance.
type ErrorIfNotFound ¶
type ErrorIfNotFound struct{}
ErrorIfNotFound is an embeddable struct that implements a Saga.HandleNotFound() method that always returns an error.
func (ErrorIfNotFound) HandleNotFound ¶
func (ErrorIfNotFound) HandleNotFound(context.Context, ax.Sender, ax.MessageContext) error
HandleNotFound always returns an error.
type EventedSaga ¶
type EventedSaga interface { Saga // ApplyEvent updates d to reflect the fact that an event has occurred. // // It may panic if env.Message does not implement ax.Event. ApplyEvent(d Data, env ax.Envelope) }
EventedSaga is a saga that only mutates its data when an event occurs.
CRUD sagas may be evented or non-evented, but eventsourced sagas are always evented.
Implementors should take care not to mutate the saga data directly inside the saga HandleMessage() method, only in ApplyEvent().
type IgnoreNotFound ¶
type IgnoreNotFound struct{}
IgnoreNotFound is an embeddable struct that implements a Saga.HandleNotFound() method that is a no-op.
func (IgnoreNotFound) HandleNotFound ¶
func (IgnoreNotFound) HandleNotFound(context.Context, ax.Sender, ax.MessageContext) error
HandleNotFound always returns nil.
type Instance ¶
type Instance struct { // InstanceID is a globally unique identifier for the saga instance. InstanceID InstanceID // Data is the application-defined data associated with this instance. Data Data // Revision is the version of the instance that the Data field reflects. // A value of zero indicates that the instance has not yet been persisted. Revision Revision }
Instance is an instance of a saga.
It encapsulates the application-defined saga data and its meta-data.
type InstanceID ¶
InstanceID uniquely identifies a saga instance.
func GenerateInstanceID ¶
func GenerateInstanceID() InstanceID
GenerateInstanceID generates a new unique identifier for a saga instance.
func MustParseInstanceID ¶
func MustParseInstanceID(s string) InstanceID
MustParseInstanceID parses s into a saga instance ID and returns it. It panics if s is empty.
func ParseInstanceID ¶
func ParseInstanceID(s string) (InstanceID, error)
ParseInstanceID parses s into a saga instance ID and returns it. It returns an error if s is empty.
type InstancesNeverComplete ¶
type InstancesNeverComplete struct{}
InstancesNeverComplete is an embeddable struct that implements a Saga.IsInstanceComplete() method that always returns false.
func (InstancesNeverComplete) IsInstanceComplete ¶
IsInstanceComplete always returns false.
type Mapper ¶
type Mapper interface { // MapMessageToInstance returns the ID of the saga instance that is the // target of the given message. // // It returns false if the message should be ignored. MapMessageToInstance( ctx context.Context, sg Saga, tx persistence.Tx, env ax.Envelope, ) (InstanceID, bool, error) // UpdateMapping notifies the mapper that an instance has been modified, // allowing it to update it's mapping information, if necessary. UpdateMapping( ctx context.Context, sg Saga, tx persistence.Tx, i Instance, ) error // DeleteMapping notifies the mapper that an instance has been completed, // allowing it to remove it's mapping information, if necessary. DeleteMapping( ctx context.Context, sg Saga, tx persistence.Tx, i Instance, ) error }
Mapper is an interface for mapping inbound messages to their target saga instance.
type MessageHandler ¶
MessageHandler is an implementation of routing.MessageHandler that loads a saga instance, forwards the message to the saga, then perists any changes to the instance.
func (*MessageHandler) HandleMessage ¶
func (h *MessageHandler) HandleMessage(ctx context.Context, s ax.Sender, mctx ax.MessageContext) error
HandleMessage loads a saga instance, passes env to the saga to be handled, and saves the changes to the saga instance.
Changes to the saga are persisted within the existing transaction in ctx, if present.
func (*MessageHandler) MessageTypes ¶
func (h *MessageHandler) MessageTypes() ax.MessageTypeSet
MessageTypes returns the set of messages that the handler can handle.
For sagas, this is the union of the message types that trigger new instances and the message types that are routed to existing instances.
type Persister ¶
type Persister interface { // BeginUnitOfWork starts a new unit-of-work that modifies a saga instance. // // If the saga instance does not exist, it returns a UnitOfWork with an // instance at revision zero. BeginUnitOfWork( ctx context.Context, sg Saga, tx persistence.Tx, s ax.Sender, id InstanceID, ) (UnitOfWork, error) }
Persister is an interface for loading saga instances, and persisting the changes that occur to them.
type Revision ¶
type Revision uint64
Revision is a one-based version of a saga instance. An instance with a revision of zero has not yet been persisted.
type Saga ¶
type Saga interface { // PersistenceKey returns a unique identifier for the saga. // // The persistence key is used to relate persisted data with the saga // implementation that owns it. Persistence keys should not be changed once // a saga has active instances. PersistenceKey() string // MessageTypes returns the set of messages that are routed to this saga. // // tr is the set of "trigger" messages. If they can not be routed to an // existing saga instance a new instance is created. // // mt is the set of messages that are only routed to existing instances. If // they can not be routed to an existing instance, the HandleNotFound() // method is called instead. MessageTypes() (tr ax.MessageTypeSet, mt ax.MessageTypeSet) // NewData returns a pointer to a new zero-value instance of the // saga's data type. NewData() Data // HandleMessage handles a message for a particular saga instance. HandleMessage(context.Context, ax.Sender, ax.MessageContext, Instance) error // HandleNotFound handles a message that is intended for a saga instance // that could not be found. HandleNotFound(context.Context, ax.Sender, ax.MessageContext) error // IsInstanceComplete returns true if the given instance is complete. IsInstanceComplete(ctx context.Context, i Instance) (bool, error) }
A Saga is a stateful message handler.
They are typically used to model "long-running" business processes. They are the foundation on which aggregates and workflows are built.
Each saga can have multiple instances, represented by the saga.Instance struct. Each instance has associated application-defined data, represented by the saga.Data interface.
For each saga, an inbound message is always routed to one saga instance.
Saga instances are persisted using an implementation of the Repository interface, which is typically provided by a specific persistence implementation.
type UnitOfWork ¶
type UnitOfWork interface { // Sender returns the ax.Sender that the saga must use to send messages. // This allows the persister to capture new messages if necessary. Sender() ax.Sender // Instance returns the saga instance that the unit-of-work applies to. Instance() Instance // Save persists changes to the instance. // // It returns true if any changes have occurred. // On success, the Instance().Revision is updated to match the new revision. Save(ctx context.Context) (bool, error) // SaveAndComplete persists changes to a completed instance. // // The precise behavior is implementation defined. Typically meta-data about // the instance is discarded. The implementation may completely remove any // record of the instance. // // On success, the Instance().Revision is updated to match the revision // produced by the save. SaveAndComplete(ctx context.Context) error // Close is called when the unit-of-work has ended, regardless of whether // Save() has been called. Close() }
UnitOfWork encapsulates the logic for persisting changes to an instance.
type Workflow ¶
type Workflow struct { ErrorIfNotFound CompletableByData Prototype Data Triggers ax.MessageTypeSet NonTriggers ax.MessageTypeSet HandleCommand typeswitch.Switch HandleEvent typeswitch.Switch }
Workflow is a Saga for implementing application-defined workflows.
func NewWorkflow ¶
NewWorkflow returns a saga that forwards to the given aggregate.
Workflows are a specialization of sagas that handle commands and/or events and produce commands.
It accepts a prototype data instance which is cloned for new instances.
For each message type to be handled, the aggregate must implement a "handler" method that adheres to one of the following signatures:
func (m *<T>, ax.CommandExecutor) func (m *<T>, mctx ax.MessageContext, ax.CommandExecutor)
Where T is a struct type that implements ax.Message.
Handler methods are responsible for mutating the state of the workflow and producing new commands, based on the message being handled.
The names of handler methods are meaningful to the workflow system. If a message is meant to trigger a new workflow instance, its handler method's name must prefixed with "Begin", if it is a command handler, or "BeginWhen" if it is an event handler. Messages that can be routed to existing workflow instances, but not cause new instances must have their method names prefixed with "Do" and "When" for commands and events, respectively.
By convention these prefixes are followed by the message name, such as:
// workflow-triggering command handler func (*BankTransferWorkflow) BeginDebitAccount( *messages.DebitAccount, ax.CommandExecutor, ) // non-triggering command handler func (*BankTransferWorkflow) DoDebitAccount( *messages.DebitAccount, ax.CommandExecutor, ) // workflow-triggering event handler func (*BankTransferWorkflow) BeginWhenAccountDebited( *messages.AccountDebited, ax.CommandExecutor, ) // non-triggering event handler func (*BankTransferWorkflow) WhenAccountDebited( *messages.AccountDebited, ax.CommandExecutor, )
func (*Workflow) HandleMessage ¶
func (w *Workflow) HandleMessage( ctx context.Context, s ax.Sender, mctx ax.MessageContext, i Instance, ) error
HandleMessage handles a message for a particular saga instance.
func (*Workflow) MessageTypes ¶
func (w *Workflow) MessageTypes() (tr ax.MessageTypeSet, mt ax.MessageTypeSet)
MessageTypes returns the set of messages that are routed to this saga.
tr is the set of "trigger" messages. If they can not be routed to an existing saga instance a new instance is created.
mt is the set of messages that are only routed to existing instances. If they can not be routed to an existing instance, the HandleNotFound() method is called instead.
func (*Workflow) NewData ¶
NewData returns a pointer to a new zero-value instance of the saga's data type.
func (*Workflow) PersistenceKey ¶
PersistenceKey returns a unique identifier for the saga.
The persistence key is used to relate persisted data with the saga implementation that owns it. Persistence keys should not be changed once a saga has active instances.
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
Package mapping contains subpackages that implement various message-to-saga mapping strategies.
|
Package mapping contains subpackages that implement various message-to-saga mapping strategies. |
direct
Package direct provides a saga mapping strategy that maps messages to saga instances by having the saga implement a method that returns the instance ID directly.
|
Package direct provides a saga mapping strategy that maps messages to saga instances by having the saga implement a method that returns the instance ID directly. |
keyset
Package keyset provides a saga mapping strategy that maps messages to instances by looking up which instance is associated with a key derived from the message.
|
Package keyset provides a saga mapping strategy that maps messages to instances by looking up which instance is associated with a key derived from the message. |
Package persistence contains subpackages that implement various saga persistence strategies.
|
Package persistence contains subpackages that implement various saga persistence strategies. |
crud
Package crud provides an implementation of saga.Persister that persists saga instances using "CRUD" semantics.
|
Package crud provides an implementation of saga.Persister that persists saga instances using "CRUD" semantics. |
eventsourcing
Package eventsourcing provides an implementation of saga.Persister that persists saga instances as a stream of events with optional snapshots.
|
Package eventsourcing provides an implementation of saga.Persister that persists saga instances as a stream of events with optional snapshots. |