saga

package
v0.5.0

This package is not in the latest version of its module.
Published: May 3, 2022 License: MIT Imports: 12 Imported by: 0

Documentation

Overview

Package saga provides tools for building stateful message handlers.

Functions

func MarshalData

func MarshalData(d Data) (contentType string, data []byte, err error)

MarshalData marshals d to a binary representation.

Types

type Aggregate

type Aggregate struct {
	ErrorIfNotFound
	CompletableByData

	Prototype Data
	Triggers  ax.MessageTypeSet
	Handle    typeswitch.Switch
	Apply     typeswitch.Switch
}

Aggregate is a Saga for implementing application-defined domain aggregates.

func NewAggregate

func NewAggregate(p Data) *Aggregate

NewAggregate returns a new aggregate saga.

Aggregates are a specialization of sagas that handle commands and produce events.

It accepts a prototype data instance which is cloned for new instances.

For each command type to be handled, the aggregate's data struct must implement a "handler" method that adheres to one of the following signatures:

func (cmd *<T>, rec ax.EventRecorder)
func (cmd *<T>, mctx ax.MessageContext, rec ax.EventRecorder)

Where T is a struct type that implements ax.Command.

Handler methods are responsible for producing new events based on the command being handled. They may inspect the current state of the aggregate, and then record zero or more events using rec. Handlers should never mutate the aggregate state.

The names of handler methods are meaningful. Each handler method's name must begin with "Do". By convention this prefix is followed by the message name, such as:

func (*BankAccount) DoCreditAccount(*messages.CreditAccount, ax.EventRecorder)

For each of the event types passed to rec, the aggregate must implement an "applier" method that adheres to one of the following signatures:

func (ev *<T>)
func (ev *<T>, mctx ax.MessageContext)

Where T is a struct type that implements ax.Event.

Applier methods are responsible for mutating the aggregate state. The applier is called every time an event is recorded, *and* when loading an event-sourced aggregate from the message store.

The names of applier methods are meaningful. Each applier method's name must begin with "When". By convention this prefix is followed by the message name, such as:

func (*BankAccount) WhenAccountCredited(*messages.AccountCredited)
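
The split between handlers and appliers can be illustrated with a minimal, self-contained sketch. It does not import ax: EventRecorder here is a local stand-in (assumed to be a function that records one event), and CreditAccount, AccountCredited and the recorder wiring in main are hypothetical.

```go
package main

import "fmt"

// EventRecorder is a local stand-in for ax.EventRecorder (assumption: a
// function that records a single event).
type EventRecorder func(ev interface{})

// CreditAccount and AccountCredited are hypothetical message types.
type CreditAccount struct{ Cents int64 }
type AccountCredited struct{ Cents int64 }

// BankAccount is the aggregate's data struct.
type BankAccount struct{ Balance int64 }

// DoCreditAccount is a handler: it may inspect state and record events,
// but it never mutates the aggregate.
func (a *BankAccount) DoCreditAccount(cmd *CreditAccount, rec EventRecorder) {
	if cmd.Cents > 0 {
		rec(&AccountCredited{Cents: cmd.Cents})
	}
}

// WhenAccountCredited is an applier: the only place state is mutated.
func (a *BankAccount) WhenAccountCredited(ev *AccountCredited) {
	a.Balance += ev.Cents
}

func main() {
	acct := &BankAccount{}
	// This recorder applies each event as soon as it is recorded.
	rec := func(ev interface{}) {
		if e, ok := ev.(*AccountCredited); ok {
			acct.WhenAccountCredited(e)
		}
	}
	acct.DoCreditAccount(&CreditAccount{Cents: 500}, rec)
	fmt.Println(acct.Balance) // prints 500
}
```

Because the handler only records events, the same appliers can rebuild the state later by replaying those events.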

func (*Aggregate) ApplyEvent

func (a *Aggregate) ApplyEvent(d Data, env ax.Envelope)

ApplyEvent updates d to reflect the fact that an event has occurred.

It may panic if env.Message does not implement ax.Event.

func (*Aggregate) HandleMessage

func (a *Aggregate) HandleMessage(
	ctx context.Context,
	s ax.Sender,
	mctx ax.MessageContext,
	i Instance,
) (err error)

HandleMessage handles a message for a particular saga instance.

func (*Aggregate) MessageTypes

func (a *Aggregate) MessageTypes() (tr ax.MessageTypeSet, mt ax.MessageTypeSet)

MessageTypes returns the set of messages that are routed to this saga.

tr is the set of "trigger" messages. If they cannot be routed to an existing saga instance, a new instance is created.

mt is the set of messages that are only routed to existing instances. If they cannot be routed to an existing instance, the HandleNotFound() method is called instead.
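
The routing rule described above can be sketched as a small decision function. This is only an illustration of the documented behavior, not the package's implementation: route, its string results, and the use of plain maps in place of ax.MessageTypeSet are all assumptions.

```go
package main

import "fmt"

// route decides what happens to a message of type msgType, given the saga's
// trigger set (tr), its non-trigger set (nt), and whether an existing
// instance was found for the message. All names are illustrative.
func route(msgType string, tr, nt map[string]bool, found bool) string {
	switch {
	case !tr[msgType] && !nt[msgType]:
		return "not routed to this saga"
	case found:
		return "handle with existing instance"
	case tr[msgType]:
		return "create new instance"
	default:
		return "call HandleNotFound"
	}
}

func main() {
	tr := map[string]bool{"OpenAccount": true}
	nt := map[string]bool{"CreditAccount": true}
	fmt.Println(route("OpenAccount", tr, nt, false))
	fmt.Println(route("CreditAccount", tr, nt, false))
}
```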

func (*Aggregate) NewData

func (a *Aggregate) NewData() Data

NewData returns a pointer to a new zero-value instance of the saga's data type.

func (*Aggregate) PersistenceKey

func (a *Aggregate) PersistenceKey() string

PersistenceKey returns a unique identifier for the saga.

The persistence key is used to relate persisted data with the saga implementation that owns it. Persistence keys should not be changed once a saga has active instances.

type Applier

type Applier struct {
	Saga EventedSaga
	Data Data

	Next ax.Sender
}

Applier is an implementation of ax.Sender that applies published events to saga data for evented sagas.

func (*Applier) ExecuteCommand

func (s *Applier) ExecuteCommand(
	ctx context.Context,
	m ax.Command,
	opts ...ax.ExecuteOption,
) (ax.Envelope, error)

ExecuteCommand sends a command message.

If ctx contains a message envelope, m is sent as a child of the message in that envelope.

func (*Applier) PublishEvent

func (s *Applier) PublishEvent(
	ctx context.Context,
	m ax.Event,
	opts ...ax.PublishOption,
) (ax.Envelope, error)

PublishEvent sends an event message.

If ctx contains a message envelope, m is sent as a child of the message in that envelope.
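
Applier follows the decorator pattern: it wraps another sender and keeps the in-memory saga data in step with the events being published. The sketch below shows that shape with simplified stand-in types. The publisher interface, the string events, and the order of operations (forward to next first, apply only on success) are assumptions made for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// publisher is a simplified stand-in for the PublishEvent half of ax.Sender.
type publisher interface {
	PublishEvent(ev string) error
}

// recorder is a terminal publisher that remembers what it published.
type recorder struct{ published []string }

func (r *recorder) PublishEvent(ev string) error {
	if ev == "" {
		return errors.New("empty event")
	}
	r.published = append(r.published, ev)
	return nil
}

// applier decorates next: once next accepts an event, apply is called so
// that the saga data reflects it.
type applier struct {
	apply func(ev string)
	next  publisher
}

func (a *applier) PublishEvent(ev string) error {
	if err := a.next.PublishEvent(ev); err != nil {
		return err // nothing was published, so nothing is applied
	}
	a.apply(ev)
	return nil
}

func main() {
	var applied []string
	rec := &recorder{}
	s := &applier{apply: func(ev string) { applied = append(applied, ev) }, next: rec}
	_ = s.PublishEvent("AccountCredited")
	fmt.Println(rec.published, applied)
}
```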

type CompletableByData

type CompletableByData struct{}

CompletableByData is an embeddable struct that implements a Saga.IsInstanceComplete() method that forwards the completion check on to a CompletableData value.

func (CompletableByData) IsInstanceComplete

func (CompletableByData) IsInstanceComplete(_ context.Context, i Instance) (bool, error)

IsInstanceComplete returns true if i.Data implements CompletableData and i.Data.IsInstanceComplete() returns true.

type CompletableData

type CompletableData interface {
	Data

	// IsInstanceComplete returns true if the data describes a "completed"
	// instance.
	IsInstanceComplete() bool
}

CompletableData is an interface for application-defined saga data that can be queried as to whether the saga instance is "complete".
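
The forwarding behavior of CompletableByData amounts to a type assertion on the instance data. A minimal sketch, using local copies of the two interfaces (without proto.Message) and two hypothetical data types, transfer and counter:

```go
package main

import "fmt"

// Data and CompletableData mirror the interfaces in this package, minus the
// embedded proto.Message, so that the sketch is self-contained.
type Data interface{ InstanceDescription() string }

type CompletableData interface {
	Data
	IsInstanceComplete() bool
}

// transfer is hypothetical saga data that knows when it is complete.
type transfer struct{ done bool }

func (t *transfer) InstanceDescription() string { return "bank transfer" }
func (t *transfer) IsInstanceComplete() bool    { return t.done }

// counter is hypothetical saga data with no notion of completion.
type counter struct{}

func (counter) InstanceDescription() string { return "counter" }

// isComplete forwards the check to the data when it opts in by implementing
// CompletableData, and otherwise reports false.
func isComplete(d Data) bool {
	cd, ok := d.(CompletableData)
	return ok && cd.IsInstanceComplete()
}

func main() {
	fmt.Println(isComplete(&transfer{done: true}), isComplete(counter{})) // prints "true false"
}
```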

type Data

type Data interface {
	proto.Message

	// InstanceDescription returns a human-readable description of the saga
	// instance.
	//
	// Assume that the description will be used inside log messages or displayed
	// in audit logs.
	//
	// Follow the same conventions as for error messages:
	// https://github.com/golang/go/wiki/CodeReviewComments#error-strings
	InstanceDescription() string
}

Data is an interface for application-defined data associated with a saga instance.

func UnmarshalData

func UnmarshalData(ct string, data []byte) (Data, error)

UnmarshalData unmarshals a saga instance from some serialized representation. ct is the MIME content-type for the binary data.

type ErrorIfNotFound

type ErrorIfNotFound struct{}

ErrorIfNotFound is an embeddable struct that implements a Saga.HandleNotFound() method that always returns an error.

func (ErrorIfNotFound) HandleNotFound

func (ErrorIfNotFound) HandleNotFound(context.Context, ax.Sender, ax.MessageContext) error

HandleNotFound always returns an error.

type EventedSaga

type EventedSaga interface {
	Saga

	// ApplyEvent updates d to reflect the fact that an event has occurred.
	//
	// It may panic if env.Message does not implement ax.Event.
	ApplyEvent(d Data, env ax.Envelope)
}

EventedSaga is a saga that only mutates its data when an event occurs.

CRUD sagas may be evented or non-evented, but event-sourced sagas are always evented.

Implementors should take care not to mutate the saga data directly inside the saga HandleMessage() method, only in ApplyEvent().

type IgnoreNotFound

type IgnoreNotFound struct{}

IgnoreNotFound is an embeddable struct that implements a Saga.HandleNotFound() method that is a no-op.

func (IgnoreNotFound) HandleNotFound

func (IgnoreNotFound) HandleNotFound(context.Context, ax.Sender, ax.MessageContext) error

HandleNotFound always returns nil.

type Instance

type Instance struct {
	// InstanceID is a globally unique identifier for the saga instance.
	InstanceID InstanceID

	// Data is the application-defined data associated with this instance.
	Data Data

	// Revision is the version of the instance that the Data field reflects.
	// A value of zero indicates that the instance has not yet been persisted.
	Revision Revision
}

Instance is an instance of a saga.

It encapsulates the application-defined saga data and its meta-data.

type InstanceID

type InstanceID struct {
	ident.ID
}

InstanceID uniquely identifies a saga instance.

func GenerateInstanceID

func GenerateInstanceID() InstanceID

GenerateInstanceID generates a new unique identifier for a saga instance.

func MustParseInstanceID

func MustParseInstanceID(s string) InstanceID

MustParseInstanceID parses s into a saga instance ID and returns it. It panics if s is empty.

func ParseInstanceID

func ParseInstanceID(s string) (InstanceID, error)

ParseInstanceID parses s into a saga instance ID and returns it. It returns an error if s is empty.
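
ParseInstanceID and MustParseInstanceID follow the usual Go pairing of an error-returning parser with a panicking wrapper. The sketch below reproduces only the documented contract (empty input fails). instanceID, parse and mustParse are local stand-ins, not the package's own code.

```go
package main

import (
	"errors"
	"fmt"
)

// instanceID is a stand-in for saga.InstanceID, which embeds ident.ID.
type instanceID struct{ value string }

// parse mirrors the documented contract of ParseInstanceID: empty input is
// an error.
func parse(s string) (instanceID, error) {
	if s == "" {
		return instanceID{}, errors.New("instance ID must not be empty")
	}
	return instanceID{s}, nil
}

// mustParse mirrors MustParseInstanceID: same parsing, but it panics
// instead of returning an error.
func mustParse(s string) instanceID {
	id, err := parse(s)
	if err != nil {
		panic(err)
	}
	return id
}

func main() {
	_, err := parse("")
	fmt.Println(err != nil)
	fmt.Println(mustParse("abc").value)
}
```

The panicking form suits IDs that are fixed in source code or test fixtures; IDs that arrive from outside the program are better parsed with the error-returning form.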

type InstancesNeverComplete

type InstancesNeverComplete struct{}

InstancesNeverComplete is an embeddable struct that implements a Saga.IsInstanceComplete() method that always returns false.

func (InstancesNeverComplete) IsInstanceComplete

func (InstancesNeverComplete) IsInstanceComplete(context.Context, Instance) (bool, error)

IsInstanceComplete always returns false.

type Mapper

type Mapper interface {
	// MapMessageToInstance returns the ID of the saga instance that is the
	// target of the given message.
	//
	// It returns false if the message should be ignored.
	MapMessageToInstance(
		ctx context.Context,
		sg Saga,
		tx persistence.Tx,
		env ax.Envelope,
	) (InstanceID, bool, error)

	// UpdateMapping notifies the mapper that an instance has been modified,
	// allowing it to update its mapping information, if necessary.
	UpdateMapping(
		ctx context.Context,
		sg Saga,
		tx persistence.Tx,
		i Instance,
	) error

	// DeleteMapping notifies the mapper that an instance has been completed,
	// allowing it to remove its mapping information, if necessary.
	DeleteMapping(
		ctx context.Context,
		sg Saga,
		tx persistence.Tx,
		i Instance,
	) error
}

Mapper is an interface for mapping inbound messages to their target saga instance.
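
The three Mapper operations form a small lifecycle: look up, update, delete. The toy in-memory mapper below illustrates it with simplified signatures (no context, transaction or envelope). keyMapper, its string keys, and its choice to report ok=false when no mapping exists are assumptions, not the behavior of any mapper shipped with this package.

```go
package main

import "fmt"

// keyMapper is a toy, in-memory mapper. It associates a key derived from a
// message (here, simply a string) with an instance ID.
type keyMapper struct{ byKey map[string]string }

// mapMessage returns the instance ID associated with key. ok is false when
// no mapping exists, which this sketch treats as "ignore the message".
func (m *keyMapper) mapMessage(key string) (id string, ok bool) {
	id, ok = m.byKey[key]
	return id, ok
}

// update records that the given keys now refer to instance id.
func (m *keyMapper) update(id string, keys ...string) {
	for _, k := range keys {
		m.byKey[k] = id
	}
}

// remove discards every key that refers to instance id, as would happen
// once the instance has completed.
func (m *keyMapper) remove(id string) {
	for k, v := range m.byKey {
		if v == id {
			delete(m.byKey, k)
		}
	}
}

func main() {
	m := &keyMapper{byKey: map[string]string{}}
	m.update("instance-1", "account:42")
	id, ok := m.mapMessage("account:42")
	fmt.Println(id, ok)
	m.remove("instance-1")
	_, ok = m.mapMessage("account:42")
	fmt.Println(ok)
}
```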

type MessageHandler

type MessageHandler struct {
	Saga      Saga
	Mapper    Mapper
	Persister Persister
}

MessageHandler is an implementation of routing.MessageHandler that loads a saga instance, forwards the message to the saga, then persists any changes to the instance.

func (*MessageHandler) HandleMessage

func (h *MessageHandler) HandleMessage(ctx context.Context, s ax.Sender, mctx ax.MessageContext) error

HandleMessage loads a saga instance, passes the message in mctx to the saga to be handled, and saves the changes to the saga instance.

Changes to the saga are persisted within the existing transaction in ctx, if present.

func (*MessageHandler) MessageTypes

func (h *MessageHandler) MessageTypes() ax.MessageTypeSet

MessageTypes returns the set of messages that the handler can handle.

For sagas, this is the union of the message types that trigger new instances and the message types that are routed to existing instances.

type Persister

type Persister interface {
	// BeginUnitOfWork starts a new unit-of-work that modifies a saga instance.
	//
	// If the saga instance does not exist, it returns a UnitOfWork with an
	// instance at revision zero.
	BeginUnitOfWork(
		ctx context.Context,
		sg Saga,
		tx persistence.Tx,
		s ax.Sender,
		id InstanceID,
	) (UnitOfWork, error)
}

Persister is an interface for loading saga instances, and persisting the changes that occur to them.

type Revision

type Revision uint64

Revision is a one-based version of a saga instance. An instance with a revision of zero has not yet been persisted.

type Saga

type Saga interface {
	// PersistenceKey returns a unique identifier for the saga.
	//
	// The persistence key is used to relate persisted data with the saga
	// implementation that owns it. Persistence keys should not be changed once
	// a saga has active instances.
	PersistenceKey() string

	// MessageTypes returns the set of messages that are routed to this saga.
	//
	// tr is the set of "trigger" messages. If they cannot be routed to an
	// existing saga instance, a new instance is created.
	//
	// mt is the set of messages that are only routed to existing instances. If
	// they cannot be routed to an existing instance, the HandleNotFound()
	// method is called instead.
	MessageTypes() (tr ax.MessageTypeSet, mt ax.MessageTypeSet)

	// NewData returns a pointer to a new zero-value instance of the
	// saga's data type.
	NewData() Data

	// HandleMessage handles a message for a particular saga instance.
	HandleMessage(context.Context, ax.Sender, ax.MessageContext, Instance) error

	// HandleNotFound handles a message that is intended for a saga instance
	// that could not be found.
	HandleNotFound(context.Context, ax.Sender, ax.MessageContext) error

	// IsInstanceComplete returns true if the given instance is complete.
	IsInstanceComplete(ctx context.Context, i Instance) (bool, error)
}

A Saga is a stateful message handler.

Sagas are typically used to model "long-running" business processes. They are the foundation on which aggregates and workflows are built.

Each saga can have multiple instances, represented by the saga.Instance struct. Each instance has associated application-defined data, represented by the saga.Data interface.

For each saga, an inbound message is always routed to one saga instance.

Saga instances are persisted using an implementation of the Persister interface, which is typically provided by a specific persistence implementation.

type UnitOfWork

type UnitOfWork interface {
	// Sender returns the ax.Sender that the saga must use to send messages.
	// This allows the persister to capture new messages if necessary.
	Sender() ax.Sender

	// Instance returns the saga instance that the unit-of-work applies to.
	Instance() Instance

	// Save persists changes to the instance.
	//
	// It returns true if any changes have occurred.
	// On success, the Instance().Revision is updated to match the new revision.
	Save(ctx context.Context) (bool, error)

	// SaveAndComplete persists changes to a completed instance.
	//
	// The precise behavior is implementation defined. Typically meta-data about
	// the instance is discarded. The implementation may completely remove any
	// record of the instance.
	//
	// On success, the Instance().Revision is updated to match the revision
	// produced by the save.
	SaveAndComplete(ctx context.Context) error

	// Close is called when the unit-of-work has ended, regardless of whether
	// Save() has been called.
	Close()
}

UnitOfWork encapsulates the logic for persisting changes to an instance.
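
A caller would typically drive a unit-of-work as follows: defer Close, then call either Save or SaveAndComplete depending on whether the instance has completed. The sketch below uses a local subset of the interface and a fake implementation that records calls. finish, traceUnit and run are hypothetical names, and this is not the package's own handler code.

```go
package main

import (
	"context"
	"fmt"
)

// unitOfWork is the subset of saga.UnitOfWork that this sketch needs.
type unitOfWork interface {
	Save(ctx context.Context) (bool, error)
	SaveAndComplete(ctx context.Context) error
	Close()
}

// traceUnit is a fake that records which methods were called.
type traceUnit struct{ calls []string }

func (u *traceUnit) Save(context.Context) (bool, error) {
	u.calls = append(u.calls, "Save")
	return true, nil
}

func (u *traceUnit) SaveAndComplete(context.Context) error {
	u.calls = append(u.calls, "SaveAndComplete")
	return nil
}

func (u *traceUnit) Close() { u.calls = append(u.calls, "Close") }

// finish persists the outcome of handling a message. Close is deferred so
// that it runs whether or not a save happens or succeeds.
func finish(ctx context.Context, w unitOfWork, complete bool) error {
	defer w.Close()
	if complete {
		return w.SaveAndComplete(ctx)
	}
	_, err := w.Save(ctx)
	return err
}

// run drives finish against a fresh fake and returns the recorded calls.
func run(complete bool) []string {
	u := &traceUnit{}
	_ = finish(context.Background(), u, complete)
	return u.calls
}

func main() {
	fmt.Println(run(false))
	fmt.Println(run(true))
}
```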

type Workflow

type Workflow struct {
	ErrorIfNotFound
	CompletableByData

	Prototype     Data
	Triggers      ax.MessageTypeSet
	NonTriggers   ax.MessageTypeSet
	HandleCommand typeswitch.Switch
	HandleEvent   typeswitch.Switch
}

Workflow is a Saga for implementing application-defined workflows.

func NewWorkflow

func NewWorkflow(p Data) *Workflow

NewWorkflow returns a new workflow saga.

Workflows are a specialization of sagas that handle commands and/or events and produce commands.

It accepts a prototype data instance which is cloned for new instances.

For each message type to be handled, the workflow's data struct must implement a "handler" method that adheres to one of the following signatures:

func (m *<T>, exec ax.CommandExecutor)
func (m *<T>, mctx ax.MessageContext, exec ax.CommandExecutor)

Where T is a struct type that implements ax.Message.

Handler methods are responsible for mutating the state of the workflow and producing new commands, based on the message being handled.

The names of handler methods are meaningful to the workflow system. If a message is meant to trigger a new workflow instance, its handler method's name must be prefixed with "Begin" if it is a command handler, or "BeginWhen" if it is an event handler. Handlers for messages that can be routed to existing workflow instances, but cannot trigger new instances, must have their method names prefixed with "Do" for commands and "When" for events.

By convention these prefixes are followed by the message name, such as:

// workflow-triggering command handler
func (*BankTransferWorkflow) BeginDebitAccount(
	*messages.DebitAccount,
	ax.CommandExecutor,
)

// non-triggering command handler
func (*BankTransferWorkflow) DoDebitAccount(
	*messages.DebitAccount,
	ax.CommandExecutor,
)

// workflow-triggering event handler
func (*BankTransferWorkflow) BeginWhenAccountDebited(
	*messages.AccountDebited,
	ax.CommandExecutor,
)

// non-triggering event handler
func (*BankTransferWorkflow) WhenAccountDebited(
	*messages.AccountDebited,
	ax.CommandExecutor,
)
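
The prefix rules above can be expressed as a small classifier. This sketch looks only at the method name, and how the package itself inspects handler methods is not shown in this documentation. classify and its return values are hypothetical.

```go
package main

import (
	"fmt"
	"strings"
)

// classify reports how a workflow handler method name would be interpreted
// under the prefix rules described above. ok is false if no prefix matches.
func classify(method string) (trigger bool, kind string, ok bool) {
	switch {
	case strings.HasPrefix(method, "BeginWhen"): // must be tested before "Begin"
		return true, "event", true
	case strings.HasPrefix(method, "Begin"):
		return true, "command", true
	case strings.HasPrefix(method, "Do"):
		return false, "command", true
	case strings.HasPrefix(method, "When"):
		return false, "event", true
	}
	return false, "", false
}

func main() {
	names := []string{
		"BeginDebitAccount",
		"DoDebitAccount",
		"BeginWhenAccountDebited",
		"WhenAccountDebited",
	}
	for _, m := range names {
		trigger, kind, _ := classify(m)
		fmt.Printf("%s: trigger=%t kind=%s\n", m, trigger, kind)
	}
}
```

The order of the cases matters: "BeginWhen" also starts with "Begin", so the longer prefix has to be checked first.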

func (*Workflow) HandleMessage

func (w *Workflow) HandleMessage(
	ctx context.Context,
	s ax.Sender,
	mctx ax.MessageContext,
	i Instance,
) error

HandleMessage handles a message for a particular saga instance.

func (*Workflow) MessageTypes

func (w *Workflow) MessageTypes() (tr ax.MessageTypeSet, mt ax.MessageTypeSet)

MessageTypes returns the set of messages that are routed to this saga.

tr is the set of "trigger" messages. If they cannot be routed to an existing saga instance, a new instance is created.

mt is the set of messages that are only routed to existing instances. If they cannot be routed to an existing instance, the HandleNotFound() method is called instead.

func (*Workflow) NewData

func (w *Workflow) NewData() Data

NewData returns a pointer to a new zero-value instance of the saga's data type.

func (*Workflow) PersistenceKey

func (w *Workflow) PersistenceKey() string

PersistenceKey returns a unique identifier for the saga.

The persistence key is used to relate persisted data with the saga implementation that owns it. Persistence keys should not be changed once a saga has active instances.

Directories

Path Synopsis
mapping
Package mapping contains subpackages that implement various message-to-saga mapping strategies.
mapping/direct
Package direct provides a saga mapping strategy that maps messages to saga instances by having the saga implement a method that returns the instance ID directly.
mapping/keyset
Package keyset provides a saga mapping strategy that maps messages to instances by looking up which instance is associated with a key derived from the message.
persistence
Package persistence contains subpackages that implement various saga persistence strategies.
persistence/crud
Package crud provides an implementation of saga.Persister that persists saga instances using "CRUD" semantics.
persistence/eventsourcing
Package eventsourcing provides an implementation of saga.Persister that persists saga instances as a stream of events with optional snapshots.