eventhorizon

package module
v0.0.0-...-1759ba1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 20, 2016 License: Apache-2.0 Imports: 6 Imported by: 0

README

wercker status Coverage Status GoDoc

Event Horizon

Event Horizon is a CQRS/ES toolkit for Go.

Event Horizon is used in at least one production system but may not be considered stable just yet!

CQRS stands for Command Query Responsibility Segregation and is a technique where object access (the Query part) and modification (the Command part) are separated from each other. This helps in designing complex data models where the actions can be totally independent from the data output.

ES stands for Event Sourcing and is a technique where all events that have happened in a system are recorded, and all future actions are based on the events instead of a single data model. The main benefit of adding Event Sourcing is traceability of changes which can be used for example in audit logging. Additionally, "incorrect" events that happened in the past (for example due to a bug) can be edited which will make the current data "correct", as that is based on the events.

Read more about CQRS/ES from one of the major authors/contributors on the subject: http://codebetter.com/gregyoung/2010/02/16/cqrs-task-based-uis-event-sourcing-agh/

Other material on CQRS/ES:

Inspired by the following libraries/examples:

Suggestions are welcome!

Usage

See the example folder for a basic usage example to get you started.

Storage and messaging implementations

There are simple in memory implementations of all components in the toolkit (event store, read repository, event bus, command bus). Most of these are meant for testing and development, the command bus (and in some cases the event bus) could however fulfill the needs of a production system.

In addition there is MongoDB implementations of the event store and a simple read repository, and a Redis implementation of the event bus.

There is also experimental support for AWS DynamoDB as an event store. Support for a event bus using AWS SQS is also planned but not started.

License

Event Horizon is licensed under Apache License 2.0

http://www.apache.org/licenses/LICENSE-2.0

Documentation

Overview

Package eventhorizon is a CQRS/ES toolkit.

Index

Constants

This section is empty.

Variables

View Source
var ErrAggregateAlreadyRegistered = errors.New("aggregate is already registered")

ErrAggregateAlreadyRegistered is when an aggregate is already registered.

View Source
var ErrAggregateAlreadySet = errors.New("aggregate is already set")

ErrAggregateAlreadySet is when an aggregate is already registered for a command.

View Source
var ErrAggregateNotFound = errors.New("no aggregate for command")

ErrAggregateNotFound is when no aggregate can be found.

View Source
var ErrAggregateNotRegistered = errors.New("aggregate is not registered")

ErrAggregateNotRegistered is when an aggregate is not registered.

View Source
var ErrCouldNotSaveModel = errors.New("could not save model")

ErrCouldNotSaveModel is when a model could not be found.

View Source
var ErrHandlerAlreadySet = errors.New("handler is already set")

ErrHandlerAlreadySet is when a handler is already registered for a command.

View Source
var ErrHandlerNotFound = errors.New("no handlers for command")

ErrHandlerNotFound is when no handler can be found.

View Source
var ErrMismatchedEventType = errors.New("mismatched event type and aggregate type")

ErrMismatchedEventType occurs when loaded events from ID does not match aggregate type.

View Source
var ErrModelNotFound = errors.New("could not find model")

ErrModelNotFound is when a model could not be found.

View Source
var ErrNilEventStore = errors.New("event store is nil")

ErrNilEventStore is when a dispatcher is created with a nil event store.

View Source
var ErrNilRepository = errors.New("repository is nil")

ErrNilRepository is when a dispatcher is created with a nil repository.

View Source
var ErrNoEventStoreDefined = errors.New("no event store defined")

ErrNoEventStoreDefined is if no event store has been defined.

View Source
var ErrNoEventsFound = errors.New("could not find events")

ErrNoEventsFound is when no events are found.

View Source
var ErrNoEventsToAppend = errors.New("no events to append")

ErrNoEventsToAppend is when no events are available to append.

Functions

This section is empty.

Types

type Aggregate

type Aggregate interface {
	// AggregateID returns the id of the aggregate.
	AggregateID() UUID

	// AggregateType returns the type name of the aggregate.
	AggregateType() string

	// Version returns the version of the aggregate.
	Version() int

	// IncrementVersion increments the aggregate version.
	IncrementVersion()

	// HandleCommand handles a command and stores events.
	HandleCommand(Command) error

	// ApplyEvent applies an event to the aggregate by setting its values.
	ApplyEvent(events Event)

	// StoreEvent stores an event until as uncommitted.
	StoreEvent(Event)

	// GetUncommittedEvents gets all uncommitted events for storing.
	GetUncommittedEvents() []Event

	// ClearUncommittedEvents clears all uncommitted events after storing.
	ClearUncommittedEvents()
}

Aggregate is an interface representing a versioned data entity created from events. It receives commands and generates evens that are stored.

The aggregate is created/loaded and saved by the Repository inside the Dispatcher. A domain specific aggregate can either imlement the full interface, or more commonly embed *AggregateBase to take care of the common methods.

type AggregateBase

type AggregateBase struct {
	// contains filtered or unexported fields
}

AggregateBase is a CQRS aggregate base to embed in domain specific aggregates.

A typical aggregate example:

type UserAggregate struct {
    *eventhorizon.AggregateBase

    name string
}

The embedded aggregate is then initialized by the factory function in the callback repository.

func NewAggregateBase

func NewAggregateBase(id UUID) *AggregateBase

NewAggregateBase creates an aggregate.

func (*AggregateBase) AggregateID

func (a *AggregateBase) AggregateID() UUID

AggregateID returns the ID of the aggregate.

func (*AggregateBase) ClearUncommittedEvents

func (a *AggregateBase) ClearUncommittedEvents()

ClearUncommittedEvents clears all uncommitted events after storing.

func (*AggregateBase) GetUncommittedEvents

func (a *AggregateBase) GetUncommittedEvents() []Event

GetUncommittedEvents gets all uncommitted events for storing.

func (*AggregateBase) IncrementVersion

func (a *AggregateBase) IncrementVersion()

IncrementVersion increments the aggregate version.

func (*AggregateBase) StoreEvent

func (a *AggregateBase) StoreEvent(event Event)

StoreEvent stores an event until as uncommitted.

func (*AggregateBase) Version

func (a *AggregateBase) Version() int

Version returns the version of the aggregate.

type AggregateCommandHandler

type AggregateCommandHandler struct {
	// contains filtered or unexported fields
}

AggregateCommandHandler dispatches commands to registered aggregates.

The dispatch process is as follows: 1. The handler receives a command 2. An aggregate is created or rebuilt from previous events by the repository 3. The aggregate's command handler is called 4. The aggregate stores events in response to the command 5. The new events are stored in the event store by the repository 6. The events are published to the event bus when stored by the event store

func NewAggregateCommandHandler

func NewAggregateCommandHandler(repository Repository) (*AggregateCommandHandler, error)

NewAggregateCommandHandler creates a new AggregateCommandHandler.

func (*AggregateCommandHandler) HandleCommand

func (h *AggregateCommandHandler) HandleCommand(command Command) error

HandleCommand handles a command with the registered aggregate. Returns ErrAggregateNotFound if no aggregate could be found.

func (*AggregateCommandHandler) SetAggregate

func (h *AggregateCommandHandler) SetAggregate(aggregate Aggregate, command Command) error

SetAggregate sets an aggregate as handler for a command.

type AggregateRecord

type AggregateRecord interface {
	AggregateID() UUID
	Version() int
	EventRecords() []EventRecord
}

AggregateRecord is a stored record of an aggregate in form of its events.

type CallbackRepository

type CallbackRepository struct {
	// contains filtered or unexported fields
}

CallbackRepository is an aggregate repository using factory functions.

func NewCallbackRepository

func NewCallbackRepository(eventStore EventStore) (*CallbackRepository, error)

NewCallbackRepository creates a repository and associates it with an event store.

func (*CallbackRepository) Load

func (r *CallbackRepository) Load(aggregateType string, id UUID) (Aggregate, error)

Load loads an aggregate by creating it and applying all events.

func (*CallbackRepository) RegisterAggregate

func (r *CallbackRepository) RegisterAggregate(aggregate Aggregate, callback func(UUID) Aggregate) error

RegisterAggregate registers an aggregate factory for a type. The factory is used to create concrete aggregate types when loading from the database.

An example would be:

repository.RegisterAggregate(&Aggregate{}, func(id UUID) interface{} { return &Aggregate{id} })

func (*CallbackRepository) Save

func (r *CallbackRepository) Save(aggregate Aggregate) error

Save saves all uncommitted events from an aggregate.

type Command

type Command interface {
	AggregateID() UUID
	AggregateType() string
	CommandType() string
}

Command is a domain command that is sent to a Dispatcher.

A command name should 1) be in present tense and 2) contain the intent (MoveCustomer vs CorrectCustomerAddress).

The command should contain all the data needed when handling it as fields. These fields can take an optional "eh" tag, which adds properties. For now only "optional" is a valid tag: `eh:"optional"`.

type CommandBus

type CommandBus interface {
	// HandleCommand handles a command on the event bus.
	HandleCommand(Command) error
	// SetHandler registers a handler with a command.
	SetHandler(CommandHandler, Command) error
}

CommandBus is an interface defining an event bus for distributing events.

type CommandFieldError

type CommandFieldError struct {
	Field string
}

CommandFieldError is returned by Dispatch when a field is incorrect.

func (CommandFieldError) Error

func (c CommandFieldError) Error() string

type CommandHandler

type CommandHandler interface {
	HandleCommand(Command) error
}

CommandHandler is an interface that all handlers of commands should implement.

type Event

type Event interface {
	AggregateID() UUID
	AggregateType() string
	EventType() string
}

Event is a domain event describing a change that has happened to an aggregate.

An event name should 1) be in past tense and 2) contain the intent (CustomerMoved vs CustomerAddressCorrected).

The event should contain all the data needed when applying/handling it.

type EventBus

type EventBus interface {
	// PublishEvent publishes an event on the event bus.
	PublishEvent(Event)
	// AddHandler adds a handler for a specific local event.
	AddHandler(EventHandler, Event)
	// AddLocalHandler adds a handler for local events.
	AddLocalHandler(EventHandler)
	// AddGlobalHandler adds a handler for global (remote) events.
	AddGlobalHandler(EventHandler)
}

EventBus is an interface defining an event bus for distributing events.

type EventHandler

type EventHandler interface {
	// HandleEvent handles an event.
	HandleEvent(Event)
}

EventHandler is an interface that all handlers of events should implement.

type EventRecord

type EventRecord interface {
	Type() string
	Version() int
	Events() []Event
}

EventRecord is a single event record with timestamp

type EventStore

type EventStore interface {
	// Save appends all events in the event stream to the store.
	Save([]Event) error

	// Load loads all events for the aggregate id from the store.
	Load(UUID) ([]Event, error)
}

EventStore is an interface for an event sourcing event store.

type ReadRepository

type ReadRepository interface {
	// Save saves a read model with id to the repository.
	Save(UUID, interface{}) error

	// Find returns one read model with using an id.
	Find(UUID) (interface{}, error)

	// FindAll returns all read models in the repository.
	FindAll() ([]interface{}, error)

	// Remove removes a read model with id from the repository.
	Remove(UUID) error
}

ReadRepository is a storage for read models.

type Repository

type Repository interface {
	// Load loads an aggregate with a type and id.
	Load(string, UUID) (Aggregate, error)

	// Save saves an aggregets uncommitted events.
	Save(Aggregate) error
}

Repository is a repository responsible for loading and saving aggregates.

type UUID

type UUID string

UUID is a unique identifier, based on the UUID spec. It must be exactly 16 bytes long.

func NewUUID

func NewUUID() UUID

NewUUID creates a new UUID of type v4.

func ParseUUID

func ParseUUID(s string) (UUID, error)

ParseUUID parses a UUID from a string representation. ParseUUID creates a UUID object from given hex string representation. The function accepts UUID string in following formats:

ParseUUID("6ba7b814-9dad-11d1-80b4-00c04fd430c8")
ParseUUID("{6ba7b814-9dad-11d1-80b4-00c04fd430c8}")
ParseUUID("urn:uuid:6ba7b814-9dad-11d1-80b4-00c04fd430c8")

func (UUID) MarshalJSON

func (id UUID) MarshalJSON() ([]byte, error)

MarshalJSON turns UUID into a json.Marshaller.

func (UUID) String

func (id UUID) String() string

String implements the Stringer interface for UUID.

func (*UUID) UnmarshalJSON

func (id *UUID) UnmarshalJSON(data []byte) error

UnmarshalJSON turns *UUID into a json.Unmarshaller.

Directories

Path Synopsis
examples
mongodb
Package mongodb contains an example of a CQRS/ES app using the MongoDB adapter.
Package mongodb contains an example of a CQRS/ES app using the MongoDB adapter.
simple
Package delegation contains a simple runnable example of a CQRS/ES app.
Package delegation contains a simple runnable example of a CQRS/ES app.
messaging
storage

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL