command

package
v0.4.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 28, 2023 License: Apache-2.0 Imports: 17 Imported by: 7

README

Commands

Package command defines and implements a command system for distributed applications. Communication between processes is accomplished using the underlying event system. Read the aggregate documentation before reading further.

The command system does not add any benefits to an application that does not consist of multiple services that need to dispatch commands to each other. Applications that do not need to dispatch commands to other services should call the "commands" directly on the aggregates instead.

Introduction

A command bus can dispatch and subscribe to commands:

package command

type Bus interface {
	Dispatch(context.Context, Command, ...DispatchOption) error

	Subscribe(ctx context.Context, names ...string) (
		<-chan Context,
		<-chan error,
		error,
	)
}

The cmdbus package provides the event-driven implementation of the command bus. Use the cmdbus.New constructor to create a command bus from an event bus:

package example

import (
	"github.com/modernice/goes/event"
	"github.com/modernice/goes/command/cmdbus"
)

func example(ebus event.Bus) {
	bus := cmdbus.New(ebus)
}

Read the documentation of Bus for information on how to use it.

Dispatch a command

Use the Bus.Dispatch() method to dispatch a command to a subscribed bus. Optionally provide the dispatch.Sync() option to synchronously dispatch the command (wait for the execution of the command before returning).

package example

import (
	"github.com/modernice/goes/command"
	"github.com/modernice/goes/command/dispatch"
)

func example(bus command.Bus) {
	cmd := command.New("foo", <some-payload>, <options>...).Any()

	if err := bus.Dispatch(context.TODO(), cmd); err != nil {
		panic(fmt.Errof("dispatch command: %w", err))
	}

	if err := bus.Dispatch(context.TODO(), cmd, dispatch.Sync()); err != nil {
		panic(fmt.Errof("dispatch and execute command: %w", err))
	}
}
Subscribe to commands

Use the Bus.Subscribe() method to subscribe to commands.

package example

import (
	"github.com/modernice/goes/command"
	"github.com/modernice/goes/helper/streams"
)

func example(bus command.Bus) {
	commands, errs, err := bus.Subscribe(context.TODO(), "foo", "bar", "baz")
	if err != nil {
		panic(fmt.Errorf("subscribe to commands: %w", err))
	}

	streams.ForEach(
		context.TODO(),
		func(ctx command.Context) {
			defer ctx.Finish(ctx)
			log.Printf("Received %q command.", ctx.Name())
		},
		func(err error) { log.Println(err) },
		commands,
		errs,
	)
}

Command handling

Standalone command handler

The command bus provides the low-level API for command communication between services. For the actual implementation of command handlers, this package provides a *Handler type that wraps a Bus to allow for a convenient setup of command handlers. *Handler also automatically calls ctx.Finish() after handling the command.

package example

func example(bus command.Bus) {
	h := command.NewHandler(bus)

	errs, err := h.Handle(
		context.TODO(),
		"foo",
		func(ctx command.Context) error {
			log.Printf("Handling %q command ...", ctx.Name())
			return nil
		},
	)
	// handle err

	for err := range errs {
		log.Printf("failed to handle %q command: %v", "foo", err)
	}
}
Aggregate-based command handler

For commands that act on aggregates, you can use the command handler provided by the handler package for a convenient command setup. Using this handler, you can to do the following:

package todo

import (
	"github.com/modernice/goes/aggregate"
	"github.com/modernice/goes/command"
	"github.com/modernice/goes/command/handler"
	"github.com/modernice/goes/event"
)

type List struct {
	*aggregate.Base
	*handler.BaseHandler

	Tasks []string
}

func NewList(id uuid.UUID) *List {
	list := &List{
		Base: aggregate.New("list", id),
		BaseHandler: handler.NewBase(),
	}

	event.ApplyWith(list, list.taskAdded, "task_added")
	event.ApplyWith(list, list.taskRemoved, "task_removed")

	command.ApplyWith(list, list.AddTask, "add_task")
	command.ApplyWith(list, list.RemoveTask, "remove_task")

	return list
}

func (l *List) AddTask(task string) error {
	aggregate.Next(l, "task_added", task)
	return nil
}

func (l *List) RemoveTask(task string) error {
	aggregate.Next(l, "task_removed", task)
	return nil
}

func (l *List) taskAdded(evt event.Of[string]) {
	l.Tasks = append(l.Tasks, evt.Data())
}

func (l *List) taskRemoved(evt event.Of[string]) {
	for i, task := range l.Tasks {
		if task == evt.Data() {
			l.Tasks = append(l.Tasks[:i], l.Tasks[i+1:]...)
			return
		}
	}
}

func example(bus command.Bus, repo aggregate.Repository) {
	h := handler.New(NewList, repo, bus)

	errs, err := h.Handle(context.TODO())
	// handle err

	for err := range errs {
		log.Printf("failed to handle %q command: %v", "list", err)
	}
}

Things to consider

Load-balancing

Do not provide a load-balanced event bus as the underlying event bus for the command bus implemented by the cmdbus package. This will result in broken communication between the command buses of the different services / service instances.

Long-running commands

Handling of commands is done synchronously for each received command within the standalone *Handler and aggregate-based *handler.Of command handlers. This means that while a command is handled, the command handler does not receive from the underlying event bus, which may cause the event bus to drop events, depending on the implementation. For example, the NATS event bus has a PullTimeout() option that specifies the timeout after which an event is dropped if it's not received.

For long-running commands, consider pushing the commands into a queue to avoid event losses:

package example

func example(bus command.Bus) {
	h := command.NewHandler(bus)

	queue := make(chan command.Context)

	enqueueErrors := h.MustHandle(context.TODO(), "foo", func(ctx command.Context) error {
		go func(){
			select {
			case <-ctx.Done():
			case queue <- ctx:
			}
		}()
		return nil
	})

	cmdErrors := make(chan error)

	go func(){
		defer close(cmdErrors)
		for {
			select {
			case <-ctx.Done():
				return
			case ctx := <-queue:
				var err error // handler error
				if err != nil {
					select {
					case <-ctx.Done():
						return
					case cmdErrors <- err:
					}
				}
			}
		}
	}()

	for err := range cmdErrors {
		log.Printf("failed to handle %q command: %v", "foo", err)
	}
}

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func ApplyWith added in v0.1.2

func ApplyWith[Payload any](r Registerer, apply func(Payload) error, commandNames ...string)

ApplyWith registers the command applier for the given commands. When a command handler one of the given commands, it calls the provided apply function with the command payload to execute the command.

ApplyWith calls HandleWith under the hood to register the command handler.

func Handle

func Handle[P any](ctx context.Context, bus Bus, name string, handler func(Ctx[P]) error) (<-chan error, error)

Handle is a shortcut for

NewHandler(bus).Handle(ctx, name, handler)

func HandleWith added in v0.1.2

func HandleWith[Payload any](r Registerer, handler func(Ctx[Payload]) error, commandNames ...string)

HandleWith is an alias for RegisterHandler.

func MustHandle

func MustHandle[P any](ctx context.Context, bus Bus, name string, handler func(Ctx[P]) error) <-chan error

MustHandle is a shortcut for

NewHandler(bus).MustHandle(ctx, name, handler)

func NewRegistry added in v0.1.2

func NewRegistry(opts ...codec.Option) *codec.Registry

NewRegistry returns a new command registry for encoding and decoding of command payloads for transmission over a network.

func RegisterHandler added in v0.1.2

func RegisterHandler[Payload any](r Registerer, commandName string, handler func(Ctx[Payload]) error)

RegisterHandler registers the handler for the given command.

	type Foo struct {
		*aggregate.Base
		*handler.BaseHandler

		Foo string
		Bar string
		Baz string
	}

	type FooEvent { Foo string }
	type BarEvent { Bar string }
	type BazEvent { Bar string }

	func NewFoo(id uuid.UUID) *Foo  {
		foo := &Foo{
			Base: aggregate.New("foo", id),
         Handler: handler.NewBase(),
		}

	    // Register event appliers.
		event.ApplyWith(foo, foo.foo, "foo")
		event.ApplyWith(foo, foo.bar, "bar")
		event.ApplyWith(foo, foo.baz, "baz")

     // Register command handlers.
		command.HandleWith(foo, func(ctx command.Ctx[string]) {
			return foo.Foo(ctx.Payload())
		}, "foo")
		command.HandleWith(foo, func(ctx command.Ctx[string]) {
			return foo.Bar(ctx.Payload())
		}, "bar")
		command.HandleWith(foo, func(ctx command.Ctx[string]) {
			return foo.Baz(ctx.Payload())
		}, "baz")

		return foo
	}

	func (f *Foo) Foo(input string) error {
		aggregate.Next(f, "foo", FooEvent{Foo: input})
 }

	func (f *Foo) foo(e event.Of[FooEvent]) {
		f.Foo = e.Data().Foo
	}

	func (f *Foo) Bar(input string) error {
		aggregate.Next(f, "bar", BarEvent{Bar: input})
 }

	func (f *Foo) bar(e event.Of[BarEvent]) {
		f.Bar = e.Data().Bar
	}

	func (f *Foo) Baz(input string) error {
		aggregate.Next(f, "baz", BazEvent{Baz: input})
 }

	func (f *Foo) baz(e event.Of[BazEvent]) {
		f.Baz = e.Data().Baz
	}

Types

type Bus

type Bus interface {
	Dispatcher
	Subscriber
}

Bus is the pub-sub client for commands.

type Cmd

type Cmd[Payload any] struct {
	Data Data[Payload]
}

Cmd is the command implementation.

func Any added in v0.1.2

func Any[P any](cmd Of[P]) Cmd[any]

Any returns the command with its type paramter set to `any`.

func Cast added in v0.1.2

func Cast[To, From any](cmd Of[From]) Cmd[To]

Cast casts the payload of the given command to the given `To` type. If the payload is not of type `To`, Cast panics.

func New

func New[P any](name string, pl P, opts ...Option) Cmd[P]

New returns a new command with the given name and payload. A random UUID is generated and set as the command id.

func TryCast added in v0.1.2

func TryCast[To, From any](cmd Of[From]) (Cmd[To], bool)

TryCast tries to cast the payload of the given command to the given `To` type. If the payload is not of type `To`, false is returned.

func (Cmd[P]) Aggregate

func (cmd Cmd[P]) Aggregate() event.AggregateRef

Aggregate returns the aggregate that the command acts on.

func (Cmd[P]) Any added in v0.1.2

func (cmd Cmd[P]) Any() Cmd[any]

Any returns the command with its type paramter set to `any`.

func (Cmd[P]) Command added in v0.1.2

func (cmd Cmd[P]) Command() Of[P]

Command returns the command as an interface.

func (Cmd[P]) ID

func (cmd Cmd[P]) ID() uuid.UUID

ID returns the command id.

func (Cmd[P]) Name

func (cmd Cmd[P]) Name() string

Name returns the command name.

func (Cmd[P]) Payload

func (cmd Cmd[P]) Payload() P

Payload returns the command payload.

type CodedError added in v0.3.0

type CodedError[Code constraints.Integer] interface {
	Code() Code
}

CodedError is an error with an error code.

type Command

type Command = Of[any]

Command is a command with arbitrary payload.

type Context

type Context = Ctx[any]

Context is the context of a dispatched command with an arbitrary payload.

type ContextOption added in v0.1.2

type ContextOption func(*options)

ContextOption is a Context option.

func WhenDone added in v0.1.2

func WhenDone(fn func(context.Context, finish.Config) error) ContextOption

WhenDone returns an Option that calls the provided function when the Finish() method of the context is called.

type Ctx added in v0.1.2

type Ctx[P any] interface {
	context.Context
	Of[P]

	// AggregateID returns the id of the aggregate that the command is linked to,
	// or uuid.Nil if no aggregate was linked to this command.
	AggregateID() uuid.UUID

	// AggregateName returns the name of the aggregate that the command is linked to,
	// or "" if no aggregate was linked to this command.
	AggregateName() string

	// Finish should be called after the command has been handled to notify the
	// dispatcher bus about the execution result. If a command was dispatched
	// synchronously, Finish must be called; otherwise the dispatcher bus will
	// never return.
	Finish(context.Context, ...finish.Option) error
}

Ctx is the context of a dispatched command with a specific payload type.

func CastContext added in v0.1.2

func CastContext[To, From any](ctx Ctx[From]) Ctx[To]

CastContext casts the payload of the given context to the given `To` type. If the payload is not a `To`, CastContext panics.

func NewContext added in v0.1.2

func NewContext[P any](base context.Context, cmd Of[P], opts ...ContextOption) Ctx[P]

NewContext returns a context for the given command.

func TryCastContext added in v0.1.2

func TryCastContext[To, From any](ctx Ctx[From]) (Ctx[To], bool)

TryCastContext tries to cast the payload of the given context to the given `To` type. If the payload is not a `To`, TryCastContext returns false.

type Data

type Data[Payload any] struct {
	ID            uuid.UUID
	Name          string
	Payload       Payload
	AggregateName string
	AggregateID   uuid.UUID
}

Data contains the fields of a Cmd.

type DetailedError added in v0.3.0

type DetailedError interface {
	Details() []*ErrDetail
}

DetailedError is an error with details.

type DispatchConfig

type DispatchConfig struct {
	// A synchronous dispatch waits for the execution of the Command to finish
	// and returns the execution error if there was any.
	//
	// A dispatch is automatically made synchronous when Repoter is non-nil.
	Synchronous bool

	// If Reporter is not nil, the Bus will report the execution result of a
	// Command to Reporter by calling Reporter.Report().
	//
	// A non-nil Reporter makes the dispatch synchronous.
	Reporter Reporter
}

Config is the configuration for dispatching a command.

type DispatchOption

type DispatchOption func(*DispatchConfig)

DispatchOption is an option for dispatching commands.

type Dispatcher added in v0.1.2

type Dispatcher interface {
	// Dispatch dispatches the provided command to subscribers of the command.
	//
	// Depending on the implementation, the command may be dispatched to a
	// single or multiple subscribers. The implementation provided by the
	// `cmdbus` package ensures that a command is dispatched to at most one
	// subscriber.
	Dispatch(context.Context, Command, ...DispatchOption) error
}

A Dispatcher dispatches commands to subscribed handlers.

type Err added in v0.3.0

type Err[Code constraints.Integer] struct {
	// contains filtered or unexported fields
}

Err wraps an error with an error code and optional details.

Err is used to transmit errors from the handler back to the dispatcher, including the error code and optional details (e.g. localized messages).

To create an *Err, call NewError or Error. NewError accepts the error code and the underlying error, and returns a new *Err. If the underlying error implements DetailedError, the details of the underlying error will be applied to the returned *Err.

Error tries to convert an [error] to an *Err. If the error is already an *Err, it is returned as is. Otherwise, it first extracts the error code from the error, then calls NewError with the error code and error. If the provided error does not implement CodedError, the error code is set to 0.

*Err implements CodedError and DetailedError.

func Error added in v0.3.0

func Error[Code constraints.Integer](err error) *Err[Code]

Error converts the error to an *Err. If the error is already an *Err, it is returned as is. Otherwise, it first extracts the error code from the error, then calls NewError with the error code and error. If the provided error does not satisfy `errors.As(err, new(CodedError))`, the error code is set to 0.

func NewError added in v0.3.0

func NewError[Code constraints.Integer](code Code, underlying error, opts ...ErrorOption) *Err[Code]

NewError creates a new *Err with the provided error code and underlying error. If the underlying error implements DetailedError, the details of the underlying error will be applied to the returned *Err.

func (*Err[Code]) Code added in v0.3.0

func (err *Err[Code]) Code() Code

Code returns the error code.

func (*Err[Code]) Details added in v0.3.0

func (err *Err[Code]) Details() []*ErrDetail

Details returns the details of the error.

func (*Err[Code]) Error added in v0.3.0

func (err *Err[Code]) Error() string

Error implements [error]. It returns the underlying error's message if it is not nil, otherwise it returns a string representation of the error code formatted as:

fmt.Sprintf("<ERROR CODE %d>", code)

func (*Err[Code]) Localized added in v0.3.0

func (err *Err[Code]) Localized(locale string) string

Localized returns the localized message for the given locale.

func (*Err[Code]) Underlying added in v0.3.0

func (err *Err[Code]) Underlying() error

Underlying returns the underlying error.

func (*Err[Code]) Unwrap added in v0.3.0

func (err *Err[Code]) Unwrap() error

Unwrap returns the underlying error.

func (*Err[Code]) WithDetails added in v0.3.0

func (err *Err[Code]) WithDetails(details ...*ErrDetail) *Err[Code]

WithDetails returns a new *Err with the provided details appended to the details of the original error. The returned error will have the same error code as the original error but will not be the same instance.

type ErrDetail added in v0.3.0

type ErrDetail struct {
	// contains filtered or unexported fields
}

ErrDetail is an error detail. A detail can be an arbitrary protobuf message.

To create an *ErrDetail, call NewErrorDetail:

d, err := NewErrorDetail(&errdetails.LocalizedMessage{
	Locale:  "en",
	Message: "hello",
})

You can also use the predefined constructors (e.g. LocalizeError):

d := LocalizeError("en", "hello")

func LocalizeError added in v0.3.0

func LocalizeError(locale, msg string) *ErrDetail

LocalizeError creates a new *ErrDetail that contains the provided localized error message. The message is encoded as a *errdetails.LocalizedMessage. The *Err that contains this detail will return the provided message when calling Err.Localized with the same locale.

func NewErrorDetail added in v0.3.0

func NewErrorDetail(msg proto.Message) (*ErrDetail, error)

NewErrorDetail creates a new *ErrDetail from the provided protobuf message. If the provided message is not already an *anypb.Any, it is wrapped in a new *anypb.Any.

func (*ErrDetail) AsAny added in v0.3.0

func (detail *ErrDetail) AsAny() *anypb.Any

AsAny returns the underlying *anypb.Any that contains the detail as a protobuf message.

func (*ErrDetail) UnmarshalNew added in v0.3.0

func (detail *ErrDetail) UnmarshalNew() (proto.Message, error)

UnmarshalNew unmarshals the detail as a protobuf message. This is similar to [Value], but it always unmarshals as a new message, instead of returning the cached value.

func (*ErrDetail) Value added in v0.3.0

func (detail *ErrDetail) Value() (proto.Message, error)

Value returns the detail as a protobuf message. Multiple calls to [Value] will return the same value.

type ErrorOption added in v0.3.0

type ErrorOption func(*errorOptions)

ErrorOption is an option for creating a command error.

func WithErrorDetails added in v0.3.0

func WithErrorDetails(details ...*ErrDetail) ErrorOption

WithErrorDetails adds details to the error.

type Handler

type Handler[P any] struct {
	// contains filtered or unexported fields
}

Handler wraps a Bus to provide a convenient way to subscribe to and handle commands.

func NewHandler

func NewHandler[P any](bus Bus) *Handler[P]

NewHandler wraps the provided Bus in a *Handler.

func (*Handler[P]) Handle

func (h *Handler[P]) Handle(ctx context.Context, name string, handler func(Ctx[P]) error) (<-chan error, error)

Handle registers the provided function as a handler for the given command. Handle subscribes to the command over the underlying Bus. The command.Context returned by the Bus is passed to the provided handler function. Afterwards, the `Finish` method of the command.Context is called by *Handler to report the execution result of the command.

Handle returns a channel of asynchronous errors. Users are responsible for receiving the errors from the channel, to avoid blocking. Errors that are sent into the channel are

  • all asynchronous errors from the underlying Bus
  • all errors returned by the provided handler function
  • errors returned by the `Finish` method of command.Context

When ctx is canceled, the returned error channel is closed.

func (*Handler[P]) MustHandle

func (h *Handler[P]) MustHandle(ctx context.Context, name string, handler func(Ctx[P]) error) <-chan error

MustHandle does the same as Handle, but panics if the command subscription fails.

type Handlers added in v0.2.2

type Handlers map[string]func(Context) error

Handlers is a map of event names to command handlers. Handlers can be embedded into structs to implement Registerer.

func (Handlers) CommandHandler added in v0.2.2

func (h Handlers) CommandHandler(commandName string) func(Context) error

CommandHandlers returns the handlers for the given command.

func (Handlers) CommandNames added in v0.2.4

func (h Handlers) CommandNames() []string

CommandNames returns the registered command names.

func (Handlers) HandleCommand added in v0.2.2

func (h Handlers) HandleCommand(ctx Context) error

HandleEvent calls the registered handler of the given Command.

func (Handlers) RegisterCommandHandler added in v0.2.2

func (h Handlers) RegisterCommandHandler(commandName string, handler func(Context) error)

RegisterCommandHandler implements Registerer.

type LocalizedMessage added in v0.3.0

type LocalizedMessage interface {
	GetLocale() string
	GetMessage() string
}

LocalizedMessage is a localized error message. This interface is implemented by *errdetails.LocalizedMessage. You can create a localized message using LocalizeError:

var msg LocalizedMessage = &errdetails.LocalizedMessage{
	Locale:  "en",
	Message: "hello",
}

type Of added in v0.1.2

type Of[Payload any] interface {
	// ID returns the command id.
	ID() uuid.UUID

	// Name returns the command name.
	Name() string

	// Payload returns the command payload.
	Payload() Payload

	// Aggregate returns the aggregate this command acts on.
	Aggregate() event.AggregateRef
}

Of is a command with the given specific payload type. A command has a unique id, a name, and a user-provided payload. A command can optionally provide the aggregate that it acts on.

type Option

type Option func(*Cmd[any])

Option is an option for creating a command.

func Aggregate

func Aggregate(name string, id uuid.UUID) Option

Aggregate returns an Option that links a command to an aggregate.

func ID

func ID(id uuid.UUID) Option

ID returns an Option that overrides the auto-generated UUID of a command.

type Registerer added in v0.1.2

type Registerer interface {
	// RegisterCommandHandler registers a command handler for the given command name.
	RegisterCommandHandler(commandName string, handler func(Context) error)
}

A Registerer is a type that can register handlers for different commands.

type Reporter

type Reporter interface {
	Report(report.Report)
}

A Reporter reports execution results of a Command.

type Subscriber added in v0.1.2

type Subscriber interface {
	// Subscribe subscribes to the given commands and returns two channels –
	// a Context channel and an error channel. When a command is dispatched to
	// this subscriber, a new Context is sent into the Context channel. Context
	// is a context.Context that additionally provides the dispatched command data.
	//
	//	var bus command.Bus
	//	// Subscribe to "foo" and "bar" commands.
	//	res, errs, err := bus.Subscribe(context.TODO(), "foo", "bar")
	//	// handle err
	//	for ctx := range res {
	//		log.Printf(
	//			"Handling %q command ... [aggregate_name: %q, aggregate_id: %q]",
	//			ctx.Name(), ctx.AggregateName(), ctx.AggregateID(),
	//		)
	//
	//	    log.Printf("Payload:\n%v", ctx.Payload())
	//	}
	Subscribe(ctx context.Context, names ...string) (<-chan Context, <-chan error, error)
}

A Subscriber subscribes to commands that are dispatched by a Dispatcher.

Directories

Path Synopsis
Package cmdbus provides a distributed & event-driven Command Bus.
Package cmdbus provides a distributed & event-driven Command Bus.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL