eventhorizon

v0.0.0-...-12e3172 Latest
Warning

This package is not in the latest version of its module.

Published: Nov 25, 2015 License: Apache-2.0 Imports: 16 Imported by: 0

README


Event Horizon

Event Horizon is a CQRS/ES toolkit for Go.

Event Horizon is used in at least one production system but may not be considered stable just yet!

CQRS stands for Command Query Responsibility Segregation and is a technique where object access (the Query part) and modification (the Command part) are separated from each other. This helps in designing complex data models where the actions can be totally independent from the data output.

ES stands for Event Sourcing and is a technique where all events that have happened in a system are recorded, and all future actions are based on the events instead of a single data model. The main benefit of adding Event Sourcing is traceability of changes, which can be used, for example, in audit logging. Additionally, "incorrect" events that happened in the past (for example due to a bug) can be edited, which will make the current data "correct", as that is based on the events.
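
As an illustration of the idea that current data is derived from recorded events, here is a minimal sketch. It does not use Event Horizon; the Event struct, the event type strings and the Replay function are hypothetical names chosen for this example only.

```go
package main

import "fmt"

// Event is a hypothetical, simplified event used only in this sketch.
type Event struct {
	Type   string // "Deposited" or "Withdrawn"
	Amount int
}

// Replay rebuilds the current balance by applying every recorded event in order.
// There is no stored "balance" field; the event log is the source of truth.
func Replay(events []Event) int {
	balance := 0
	for _, e := range events {
		switch e.Type {
		case "Deposited":
			balance += e.Amount
		case "Withdrawn":
			balance -= e.Amount
		}
	}
	return balance
}

func main() {
	eventLog := []Event{{"Deposited", 100}, {"Withdrawn", 30}, {"Deposited", 5}}
	fmt.Println(Replay(eventLog))
}
```

Because the state is always recomputed from the log, the log itself doubles as an audit trail.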

Read more about CQRS/ES from one of the major authors/contributors on the subject: http://codebetter.com/gregyoung/2010/02/16/cqrs-task-based-uis-event-sourcing-agh/

Other material on CQRS/ES:

Inspired by the following libraries/examples:

Suggestions are welcome!

Usage

See the example folder for a basic usage example to get you started.

Changes

2015-01-20

Added CommandBus that routes commands to handlers. This is for upcoming Saga support. The dispatcher is now renamed to AggregateCommandHandler and must be added to the CommandBus. At the moment, commands have to be registered both in the handler and on the bus; this may change in the future.

Added MongoDB ReadRepository implementation. Use with "-tags mongo", same as the MongoDB event store.

2015-01-14

Added Repository that creates/loads and saves aggregates. This needed additional methods in the Aggregate interface.

Removed the reflection-based dispatcher; the code performed worse and was harder to test. There was also a bit too much magic going on. If you would like it back, open an issue for further discussion.

Renamed Repository to ReadRepository to better adhere to CQRS standards and to free the name for an Aggregate/Saga repository in development.

2015-01-12

Added an EventStore implementation for MongoDB. It currently uses one document per aggregate with all events as an array to make the most out of MongoDB's lack of transactions. It still takes two operations when adding events, but at least there is a check that the version has not been changed by another operation in between. If you want to use the MongoDB event store, add "-tags mongo" to your project build.
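
The version check described above is a form of optimistic concurrency control. The following is a minimal in-memory sketch of that idea, not the MongoDB implementation: the docStore and aggregateDoc types, the Append method and the error value are hypothetical, and the sketch is not safe for concurrent use.

```go
package main

import (
	"errors"
	"fmt"
)

// errVersionConflict is a hypothetical error for this sketch.
var errVersionConflict = errors.New("version changed by another operation")

// aggregateDoc mimics "one document per aggregate": a version plus all events.
type aggregateDoc struct {
	Version int
	Events  []string
}

type docStore struct {
	docs map[string]*aggregateDoc
}

// Append adds events only if the stored version still equals expectedVersion,
// i.e. nobody else has appended since the caller last loaded the aggregate.
func (s *docStore) Append(id string, expectedVersion int, events ...string) error {
	doc := s.docs[id]
	current := 0
	if doc != nil {
		current = doc.Version
	}
	if current != expectedVersion {
		return errVersionConflict
	}
	if doc == nil {
		doc = &aggregateDoc{}
		s.docs[id] = doc
	}
	doc.Events = append(doc.Events, events...)
	doc.Version += len(events)
	return nil
}

func main() {
	s := &docStore{docs: map[string]*aggregateDoc{}}
	fmt.Println(s.Append("user-1", 0, "UserCreated")) // first writer succeeds
	fmt.Println(s.Append("user-1", 0, "UserRenamed")) // stale version is rejected
	fmt.Println(s.Append("user-1", 1, "UserRenamed")) // retry with the fresh version succeeds
}
```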

2015-01-07

As of this version, commands and events are recommended to be passed around as pointers instead of as values, as the previous versions did. Passing as values may still work, but is not tested at the moment. It should not require many changes in applications using Event Horizon; simply pass all commands and events with & before them or create them as *XXXCommand. See the examples and tests for usage. There are also some other API changes to method names, mostly using "handler" as a more common term.

License

Event Horizon is licensed under Apache License 2.0

http://www.apache.org/licenses/LICENSE-2.0

Documentation

Overview

Package eventhorizon is a CQRS/ES toolkit.

Index

Constants

This section is empty.

Variables

var ErrAggregateAlreadyRegistered = errors.New("aggregate is already registered")

Error returned when an aggregate is already registered.

var ErrAggregateAlreadySet = errors.New("aggregate is already set")

Error returned when an aggregate is already registered for a command.

var ErrAggregateNotFound = errors.New("no aggregate for command")

Error returned when no aggregate can be found.

var ErrAggregateNotRegistered = errors.New("aggregate is not registered")

Error returned when an aggregate is not registered.

var ErrCouldNotClearDB = errors.New("could not clear database")

ErrCouldNotClearDB returned when the database could not be cleared.

var ErrCouldNotCreateTables = errors.New("could not create tables")

ErrCouldNotCreateTables returned when necessary tables could not be created.

var ErrCouldNotDialDB = errors.New("could not dial database")

ErrCouldNotDialDB returned when the database could not be dialed.

var ErrCouldNotLoadAggregate = errors.New("could not load aggregate")

ErrCouldNotLoadAggregate returned when an aggregate could not be loaded.

var ErrCouldNotMarshalEvent = errors.New("could not marshal event")

ErrCouldNotMarshalEvent returned when an event could not be marshaled into BSON.

var ErrCouldNotSaveAggregate = errors.New("could not save aggregate")

ErrCouldNotSaveAggregate returned when an aggregate could not be saved.

var ErrCouldNotSaveEvent = errors.New("could not save event")

ErrCouldNotSaveEvent returned when an event could not be saved.

var ErrCouldNotSaveModel = errors.New("could not save model")

Error returned when a model could not be saved.

var ErrCouldNotUnmarshalEvent = errors.New("could not unmarshal event")

ErrCouldNotUnmarshalEvent returned when an event could not be unmarshaled into a concrete type.

var ErrEventNotRegistered = errors.New("event not registered")

ErrEventNotRegistered returned when an event is not registered.

var ErrHandlerAlreadySet = errors.New("handler is already set")

ErrHandlerAlreadySet returned when a handler is already registered for a command.

var ErrHandlerNotFound = errors.New("no handlers for command")

ErrHandlerNotFound returned when no handler can be found.

var ErrInvalidEvent = errors.New("invalid event")

ErrInvalidEvent returned when an event does not implement the Event interface.

var ErrModelNotFound = errors.New("could not find model")

Error returned when a model could not be found.

var ErrModelNotSet = errors.New("model not set")

ErrModelNotSet returned when a model is not set on a read repository.

var ErrNilEventStore = errors.New("event store is nil")

Error returned when a dispatcher is created with a nil event store.

var ErrNilRepository = errors.New("repository is nil")

Error returned when a dispatcher is created with a nil repository.

var ErrNoDBSession = errors.New("no database session")

ErrNoDBSession returned when no database session is set.

var ErrNoEventStoreDefined = errors.New("no event store defined")

ErrNoEventStoreDefined returned if no event store has been defined.

var ErrNoEventsFound = errors.New("could not find events")

ErrNoEventsFound returned when no events are found.

var ErrNoEventsToAppend = errors.New("no events to append")

ErrNoEventsToAppend returned when no events are available to append.

Functions

This section is empty.

Types

type Aggregate

type Aggregate interface {
	// AggregateID returns the id of the aggregate.
	AggregateID() string

	// AggregateType returns the type name of the aggregate.
	AggregateType() string

	// Version returns the version of the aggregate.
	Version() int

	// IncrementVersion increments the aggregate version.
	IncrementVersion()

	// HandleCommand handles a command and stores events.
	HandleCommand(Command) error

	// ApplyEvent applies an event to the aggregate by setting its values.
	ApplyEvent(events Event)

	// StoreEvent stores an event as uncommitted.
	StoreEvent(Event)

	// GetUncommittedEvents gets all uncommitted events for storing.
	GetUncommittedEvents() []Event

	// ClearUncommittedEvents clears all uncommitted events after storing.
	ClearUncommittedEvents()
}

Aggregate is an interface representing a versioned data entity created from events. It receives commands and generates events that are stored.

The aggregate is created/loaded and saved by the Repository inside the Dispatcher. A domain specific aggregate can either implement the full interface, or more commonly embed *AggregateBase to take care of the common methods.

type AggregateBase

type AggregateBase struct {
	// contains filtered or unexported fields
}

AggregateBase is a CQRS aggregate base to embed in domain specific aggregates.

A typical aggregate example:

type UserAggregate struct {
    *eventhorizon.AggregateBase

    name string
}

The embedded aggregate is then initialized by the factory function in the callback repository.

func NewAggregateBase

func NewAggregateBase(id string) *AggregateBase

NewAggregateBase creates an aggregate.

func (*AggregateBase) AggregateID

func (a *AggregateBase) AggregateID() string

AggregateID returns the ID of the aggregate.

func (*AggregateBase) ClearUncommittedEvents

func (a *AggregateBase) ClearUncommittedEvents()

ClearUncommittedEvents clears all uncommitted events after storing.

func (*AggregateBase) GetUncommittedEvents

func (a *AggregateBase) GetUncommittedEvents() []Event

GetUncommittedEvents gets all uncommitted events for storing.

func (*AggregateBase) IncrementVersion

func (a *AggregateBase) IncrementVersion()

IncrementVersion increments the aggregate version.

func (*AggregateBase) StoreEvent

func (a *AggregateBase) StoreEvent(event Event)

StoreEvent stores an event as uncommitted.

func (*AggregateBase) Version

func (a *AggregateBase) Version() int

Version returns the version of the aggregate.

type AggregateCommandHandler

type AggregateCommandHandler struct {
	// contains filtered or unexported fields
}

AggregateCommandHandler dispatches commands to registered aggregates.

The dispatch process is as follows:

1. The handler receives a command
2. An aggregate is created or rebuilt from previous events by the repository
3. The aggregate's command handler is called
4. The aggregate stores events in response to the command
5. The new events are stored in the event store by the repository
6. The events are published to the event bus when stored by the event store
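
The six dispatch steps can be sketched in a self-contained form as follows. This is a simplified illustration, not the package's implementation: the trimmed Event and Command interfaces, the userAggregate type, the dispatch function and the in-memory event log are all assumptions made for this example, and the point at which new events are applied to the aggregate is a guess.

```go
package main

import "fmt"

// Trimmed-down stand-ins for the package's Event and Command interfaces.
type Event interface{ EventType() string }
type Command interface{ CommandType() string }

type UserCreated struct{ Name string }

func (*UserCreated) EventType() string { return "UserCreated" }

type CreateUser struct{ Name string }

func (*CreateUser) CommandType() string { return "CreateUser" }

// userAggregate is a hypothetical aggregate for this sketch.
type userAggregate struct {
	name        string
	version     int
	uncommitted []Event
}

// HandleCommand stores a new event in response to the command (steps 3 and 4).
func (a *userAggregate) HandleCommand(c Command) error {
	if cmd, ok := c.(*CreateUser); ok {
		a.uncommitted = append(a.uncommitted, &UserCreated{Name: cmd.Name})
	}
	return nil
}

// ApplyEvent sets the aggregate's values from an event and bumps the version.
func (a *userAggregate) ApplyEvent(e Event) {
	if ev, ok := e.(*UserCreated); ok {
		a.name = ev.Name
	}
	a.version++
}

// dispatch walks through the steps with an in-memory log and a publish callback.
func dispatch(eventLog *[]Event, publish func(Event), c Command) (*userAggregate, error) {
	agg := &userAggregate{} // step 1: command received; step 2: create the aggregate...
	for _, e := range *eventLog {
		agg.ApplyEvent(e) // ...and rebuild it from previous events
	}
	if err := agg.HandleCommand(c); err != nil { // steps 3 and 4
		return nil, err
	}
	for _, e := range agg.uncommitted {
		*eventLog = append(*eventLog, e) // step 5: store the new events
		publish(e)                       // step 6: publish the stored events
		agg.ApplyEvent(e)
	}
	agg.uncommitted = nil
	return agg, nil
}

func main() {
	var eventLog []Event
	published := 0
	agg, err := dispatch(&eventLog, func(Event) { published++ }, &CreateUser{Name: "Ada"})
	fmt.Println(agg.name, agg.version, len(eventLog), published, err)
}
```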

func NewAggregateCommandHandler

func NewAggregateCommandHandler(repository Repository) (*AggregateCommandHandler, error)

NewAggregateCommandHandler creates a new AggregateCommandHandler.

func (*AggregateCommandHandler) HandleCommand

func (h *AggregateCommandHandler) HandleCommand(command Command) error

HandleCommand handles a command with the registered aggregate. Returns ErrAggregateNotFound if no aggregate could be found.

func (*AggregateCommandHandler) SetAggregate

func (h *AggregateCommandHandler) SetAggregate(aggregate Aggregate, command Command) error

SetAggregate sets an aggregate as handler for a command.

type AggregateRecord

type AggregateRecord interface {
	AggregateID() string
	Version() int
	EventRecords() []EventRecord
}

AggregateRecord is a stored record of an aggregate in form of its events.

type CallbackRepository

type CallbackRepository struct {
	// contains filtered or unexported fields
}

CallbackRepository is an aggregate repository using factory functions.

func NewCallbackRepository

func NewCallbackRepository(eventStore EventStore) (*CallbackRepository, error)

NewCallbackRepository creates a repository and associates it with an event store.

func (*CallbackRepository) Load

func (r *CallbackRepository) Load(aggregateType string, id string) (Aggregate, error)

Load loads an aggregate by creating it and applying all events.

func (*CallbackRepository) RegisterAggregate

func (r *CallbackRepository) RegisterAggregate(aggregate Aggregate, callback func(string) Aggregate) error

RegisterAggregate registers an aggregate factory for a type. The factory is used to create concrete aggregate types when loading from the database.

An example would be:

repository.RegisterAggregate(&MyAggregate{}, func(id string) Aggregate { return &MyAggregate{id} })

func (*CallbackRepository) Save

func (r *CallbackRepository) Save(aggregate Aggregate) error

Save saves all uncommitted events from an aggregate.

type Command

type Command interface {
	AggregateID() string
	AggregateType() string
	CommandType() string
}

Command is a domain command that is sent to a Dispatcher.

A command name should 1) be in present tense and 2) contain the intent (MoveCustomer vs CorrectCustomerAddress).

The command should contain all the data needed when handling it as fields. These fields can take an optional "eh" tag, which adds properties. For now only "optional" is a valid tag: `eh:"optional"`.
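
A command with an optional field might look like the sketch below. Only the `eh:"optional"` tag and the three Command methods come from the documentation above; the InviteGuest type, its fields and the missingFields helper are hypothetical, and the helper merely illustrates how such a tag can be read with the reflect package, not how the package itself validates commands.

```go
package main

import (
	"fmt"
	"reflect"
)

// InviteGuest is a hypothetical command: present tense, intent in the name.
type InviteGuest struct {
	InvitationID string
	Name         string
	Age          int `eh:"optional"`
}

func (c *InviteGuest) AggregateID() string   { return c.InvitationID }
func (c *InviteGuest) AggregateType() string { return "Invitation" }
func (c *InviteGuest) CommandType() string   { return "InviteGuest" }

// missingFields lists struct fields that are left at their zero value and are
// not tagged `eh:"optional"`. It expects a struct or a pointer to a struct.
func missingFields(cmd interface{}) []string {
	v := reflect.Indirect(reflect.ValueOf(cmd))
	var missing []string
	for i := 0; i < v.NumField(); i++ {
		f := v.Type().Field(i)
		if f.Tag.Get("eh") == "optional" {
			continue
		}
		if v.Field(i).IsZero() {
			missing = append(missing, f.Name)
		}
	}
	return missing
}

func main() {
	// Name is required but empty; Age is empty but optional.
	fmt.Println(missingFields(&InviteGuest{InvitationID: "42"}))
}
```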

type CommandBus

type CommandBus interface {
	// PublishCommand publishes a command on the command bus.
	PublishCommand(Command) error
	// HandleCommand handles a command on the command bus.
	HandleCommand(Command) error
	// SetHandler registers a handler with a command.
	SetHandler(CommandHandler, Command) error
}

CommandBus is an interface defining a command bus for distributing commands.
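
To make the routing concrete, here is a minimal in-process sketch of a bus that maps each command type to one handler. It mirrors the SetHandler and HandleCommand method names documented above, but the sketchBus type, the trimmed Command interface and the recorder handler are assumptions for illustration and not the package's InternalCommandBus.

```go
package main

import (
	"errors"
	"fmt"
)

// Trimmed-down stand-ins for the package's Command and CommandHandler.
type Command interface{ CommandType() string }

type CommandHandler interface{ HandleCommand(Command) error }

var (
	errHandlerAlreadySet = errors.New("handler is already set")
	errHandlerNotFound   = errors.New("no handlers for command")
)

// sketchBus is a hypothetical bus keyed by command type.
type sketchBus struct{ handlers map[string]CommandHandler }

func newSketchBus() *sketchBus {
	return &sketchBus{handlers: map[string]CommandHandler{}}
}

// SetHandler registers exactly one handler per command type.
func (b *sketchBus) SetHandler(h CommandHandler, c Command) error {
	if _, ok := b.handlers[c.CommandType()]; ok {
		return errHandlerAlreadySet
	}
	b.handlers[c.CommandType()] = h
	return nil
}

// HandleCommand routes the command to the handler registered for its type.
func (b *sketchBus) HandleCommand(c Command) error {
	h, ok := b.handlers[c.CommandType()]
	if !ok {
		return errHandlerNotFound
	}
	return h.HandleCommand(c)
}

type CreateUser struct{ Name string }

func (*CreateUser) CommandType() string { return "CreateUser" }

// recorder is a handler that remembers which command types it has seen.
type recorder struct{ seen []string }

func (r *recorder) HandleCommand(c Command) error {
	r.seen = append(r.seen, c.CommandType())
	return nil
}

func main() {
	bus := newSketchBus()
	rec := &recorder{}
	if err := bus.SetHandler(rec, &CreateUser{}); err != nil {
		fmt.Println("set handler:", err)
		return
	}
	fmt.Println(bus.HandleCommand(&CreateUser{Name: "Ada"}), rec.seen)
}
```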

func NewInternalCommandBus

func NewInternalCommandBus() CommandBus

NewInternalCommandBus creates an InternalCommandBus.

type CommandFieldError

type CommandFieldError struct {
	Field string
}

CommandFieldError is returned by Dispatch when a field is incorrect.

func (CommandFieldError) Error

func (c CommandFieldError) Error() string

type CommandHandler

type CommandHandler interface {
	HandleCommand(Command) error
}

CommandHandler is an interface that all handlers of commands should implement.

type Event

type Event interface {
	AggregateID() string
	AggregateType() string
	EventType() string
}

Event is a domain event describing a change that has happened to an aggregate.

An event name should 1) be in past tense and 2) contain the intent (CustomerMoved vs CustomerAddressCorrected).

The event should contain all the data needed when applying/handling it.

type EventBus

type EventBus interface {
	// PublishEvent publishes an event on the event bus.
	PublishEvent(Event)
	// AddHandler adds a handler for a specific local event.
	AddHandler(EventHandler, Event)
	// AddLocalHandler adds a handler for local events.
	AddLocalHandler(EventHandler)
	// AddGlobalHandler adds a handler for global (remote) events.
	AddGlobalHandler(EventHandler)
}

EventBus is an interface defining an event bus for distributing events.
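
A rough in-memory sketch of the handler registration is shown below. The behavior is an assumption: handlers added with AddHandler receive only the given event type, and handlers added with AddLocalHandler receive every locally published event. Global (remote) handlers are left out, and the sketchEventBus, counter and event types are hypothetical.

```go
package main

import "fmt"

// Trimmed-down stand-ins for the package's Event and EventHandler.
type Event interface{ EventType() string }

type EventHandler interface{ HandleEvent(Event) }

// sketchEventBus is a hypothetical in-memory bus.
type sketchEventBus struct {
	byType map[string][]EventHandler // handlers for one specific event type
	local  []EventHandler            // handlers for all local events
}

func newSketchEventBus() *sketchEventBus {
	return &sketchEventBus{byType: map[string][]EventHandler{}}
}

func (b *sketchEventBus) AddHandler(h EventHandler, e Event) {
	b.byType[e.EventType()] = append(b.byType[e.EventType()], h)
}

func (b *sketchEventBus) AddLocalHandler(h EventHandler) {
	b.local = append(b.local, h)
}

// PublishEvent notifies the type-specific handlers first, then the local ones.
func (b *sketchEventBus) PublishEvent(e Event) {
	for _, h := range b.byType[e.EventType()] {
		h.HandleEvent(e)
	}
	for _, h := range b.local {
		h.HandleEvent(e)
	}
}

type UserCreated struct{}

func (*UserCreated) EventType() string { return "UserCreated" }

type UserRemoved struct{}

func (*UserRemoved) EventType() string { return "UserRemoved" }

// counter counts how many events it has handled.
type counter struct{ n int }

func (c *counter) HandleEvent(Event) { c.n++ }

func main() {
	bus := newSketchEventBus()
	specific, all := &counter{}, &counter{}
	bus.AddHandler(specific, &UserCreated{})
	bus.AddLocalHandler(all)
	bus.PublishEvent(&UserCreated{})
	bus.PublishEvent(&UserRemoved{})
	fmt.Println(specific.n, all.n)
}
```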

type EventHandler

type EventHandler interface {
	// HandleEvent handles an event.
	HandleEvent(Event)
}

EventHandler is an interface that all handlers of events should implement.

type EventRecord

type EventRecord interface {
	Type() string
	Version() int
	Events() []Event
}

EventRecord is a single event record with timestamp

type EventStore

type EventStore interface {
	// Save appends all events in the event stream to the store.
	Save([]Event) error

	// Load loads all events for the aggregate id from the store.
	Load(string) ([]Event, error)
}

EventStore is an interface for an event sourcing event store.
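
For orientation, a minimal in-memory store with the same Save and Load shape could look like this. It is a sketch under assumptions, not the package's MemoryEventStore: the sketchStore type and the trimmed Event interface are hypothetical, and the two error values simply echo the messages of ErrNoEventsToAppend and ErrNoEventsFound.

```go
package main

import (
	"errors"
	"fmt"
)

// Trimmed-down stand-in for the package's Event interface.
type Event interface {
	AggregateID() string
	EventType() string
}

var (
	errNoEventsToAppend = errors.New("no events to append")
	errNoEventsFound    = errors.New("could not find events")
)

// sketchStore keeps one ordered event stream per aggregate id.
type sketchStore struct{ byAggregate map[string][]Event }

// Save appends every event to the stream of its own aggregate.
func (s *sketchStore) Save(events []Event) error {
	if len(events) == 0 {
		return errNoEventsToAppend
	}
	for _, e := range events {
		s.byAggregate[e.AggregateID()] = append(s.byAggregate[e.AggregateID()], e)
	}
	return nil
}

// Load returns the stream for one aggregate id in insertion order.
func (s *sketchStore) Load(id string) ([]Event, error) {
	events, ok := s.byAggregate[id]
	if !ok {
		return nil, errNoEventsFound
	}
	return events, nil
}

type UserCreated struct{ ID string }

func (e *UserCreated) AggregateID() string { return e.ID }
func (*UserCreated) EventType() string     { return "UserCreated" }

func main() {
	s := &sketchStore{byAggregate: map[string][]Event{}}
	fmt.Println(s.Save([]Event{&UserCreated{ID: "user-1"}}))
	events, err := s.Load("user-1")
	fmt.Println(len(events), err)
}
```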

type InternalCommandBus

type InternalCommandBus struct {
	// contains filtered or unexported fields
}

InternalCommandBus is a command bus that handles commands with the registered CommandHandlers

func (*InternalCommandBus) HandleCommand

func (b *InternalCommandBus) HandleCommand(command Command) error

HandleCommand handles a command with a handler capable of handling it.

func (*InternalCommandBus) PublishCommand

func (b *InternalCommandBus) PublishCommand(command Command) error

PublishCommand publishes a command to the internal command bus.

func (*InternalCommandBus) SetHandler

func (b *InternalCommandBus) SetHandler(handler CommandHandler, command Command) error

SetHandler adds a handler for a specific command.

type InternalEventBus

type InternalEventBus struct {
	// contains filtered or unexported fields
}

InternalEventBus is an event bus that notifies registered EventHandlers of published events.

func NewInternalEventBus

func NewInternalEventBus() *InternalEventBus

NewInternalEventBus creates an InternalEventBus.

func (*InternalEventBus) AddGlobalHandler

func (b *InternalEventBus) AddGlobalHandler(handler EventHandler)

AddGlobalHandler adds a handler for global (remote) events.

func (*InternalEventBus) AddHandler

func (b *InternalEventBus) AddHandler(handler EventHandler, event Event)

AddHandler adds a handler for a specific local event.

func (*InternalEventBus) AddLocalHandler

func (b *InternalEventBus) AddLocalHandler(handler EventHandler)

AddLocalHandler adds a handler for local events.

func (*InternalEventBus) PublishEvent

func (b *InternalEventBus) PublishEvent(event Event)

PublishEvent publishes an event to all handlers capable of handling it.

type MemoryEventStore

type MemoryEventStore struct {
	// contains filtered or unexported fields
}

MemoryEventStore implements EventStore as an in memory structure.

func NewMemoryEventStore

func NewMemoryEventStore(eventBus EventBus) *MemoryEventStore

NewMemoryEventStore creates a new MemoryEventStore.

func (*MemoryEventStore) Close

func (s *MemoryEventStore) Close() error

Close closes the store.

func (*MemoryEventStore) Load

func (s *MemoryEventStore) Load(id string) ([]Event, error)

Load loads all events for the aggregate id from the memory store. Returns ErrNoEventsFound if no events can be found.

func (*MemoryEventStore) Save

func (s *MemoryEventStore) Save(events []Event) error

Save appends all events in the event stream to the memory store.

type MemoryReadRepository

type MemoryReadRepository struct {
	// contains filtered or unexported fields
}

MemoryReadRepository implements an in memory repository of read models.

func NewMemoryReadRepository

func NewMemoryReadRepository() *MemoryReadRepository

NewMemoryReadRepository creates a new MemoryReadRepository.

func (*MemoryReadRepository) Find

func (r *MemoryReadRepository) Find(id string) (interface{}, error)

Find returns one read model using an id. Returns ErrModelNotFound if no model could be found.

func (*MemoryReadRepository) FindAll

func (r *MemoryReadRepository) FindAll() ([]interface{}, error)

FindAll returns all read models in the repository.

func (*MemoryReadRepository) Remove

func (r *MemoryReadRepository) Remove(id string) error

Remove removes a read model with id from the repository. Returns ErrModelNotFound if no model could be found.

func (*MemoryReadRepository) Save

func (r *MemoryReadRepository) Save(id string, model interface{}) error

Save saves a read model with id to the repository.

type MongoEventStore

type MongoEventStore struct {
	// contains filtered or unexported fields
}

MongoEventStore implements an EventStore for MongoDB.

func NewMongoEventStore

func NewMongoEventStore(eventBus EventBus, url, database string) (*MongoEventStore, error)

NewMongoEventStore creates a new MongoEventStore.

func NewMongoEventStoreWithSession

func NewMongoEventStoreWithSession(eventBus EventBus, session *mgo.Session, database string) (*MongoEventStore, error)

NewMongoEventStoreWithSession creates a new MongoEventStore with a session.

func (*MongoEventStore) Clear

func (s *MongoEventStore) Clear() error

Clear clears the event storage.

func (*MongoEventStore) Close

func (s *MongoEventStore) Close() error

Close closes the database session.

func (*MongoEventStore) Load

func (s *MongoEventStore) Load(id string) ([]Event, error)

Load loads all events for the aggregate id from the database. Returns ErrNoEventsFound if no events can be found.

func (*MongoEventStore) RegisterEventType

func (s *MongoEventStore) RegisterEventType(event Event, factory func() Event) error

RegisterEventType registers an event factory for an event type. The factory is used to create concrete event types when loading from the database.

An example would be:

eventStore.RegisterEventType(&MyEvent{}, func() Event { return &MyEvent{} })
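
The reason a factory is needed is that stored data only carries a type name, so the store must be told how to create an empty concrete value to decode into. The sketch below illustrates that pattern with encoding/json from the standard library standing in for BSON; the registry type and its Decode method are hypothetical and not part of the package.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// Trimmed-down stand-in for the package's Event interface.
type Event interface{ EventType() string }

type MyEvent struct{ Name string }

func (*MyEvent) EventType() string { return "MyEvent" }

var errEventNotRegistered = errors.New("event not registered")

// registry maps an event type name to a factory for its concrete type.
type registry struct{ factories map[string]func() Event }

func (r *registry) RegisterEventType(e Event, factory func() Event) {
	r.factories[e.EventType()] = factory
}

// Decode creates an empty concrete event via the factory and fills it from data.
func (r *registry) Decode(eventType string, data []byte) (Event, error) {
	factory, ok := r.factories[eventType]
	if !ok {
		return nil, errEventNotRegistered
	}
	e := factory() // holds a pointer, so Unmarshal can populate it
	if err := json.Unmarshal(data, e); err != nil {
		return nil, err
	}
	return e, nil
}

func main() {
	r := &registry{factories: map[string]func() Event{}}
	r.RegisterEventType(&MyEvent{}, func() Event { return &MyEvent{} })
	e, err := r.Decode("MyEvent", []byte(`{"Name":"Ada"}`))
	fmt.Println(e.(*MyEvent).Name, err)
}
```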

func (*MongoEventStore) Save

func (s *MongoEventStore) Save(events []Event) error

Save appends all events in the event stream to the database.

func (*MongoEventStore) SetDB

func (s *MongoEventStore) SetDB(db string)

SetDB sets the database session.

type MongoReadRepository

type MongoReadRepository struct {
	// contains filtered or unexported fields
}

MongoReadRepository implements a MongoDB repository of read models.

func NewMongoReadRepository

func NewMongoReadRepository(url, database, collection string) (*MongoReadRepository, error)

NewMongoReadRepository creates a new MongoReadRepository.

func NewMongoReadRepositoryWithSession

func NewMongoReadRepositoryWithSession(session *mgo.Session, database, collection string) (*MongoReadRepository, error)

NewMongoReadRepositoryWithSession creates a new MongoReadRepository with a session.

func (*MongoReadRepository) Clear

func (r *MongoReadRepository) Clear() error

Clear clears the read model database.

func (*MongoReadRepository) Close

func (r *MongoReadRepository) Close() error

Close closes a database session.

func (*MongoReadRepository) Find

func (r *MongoReadRepository) Find(id string) (interface{}, error)

Find returns one read model using an id. Returns ErrModelNotFound if no model could be found.

func (*MongoReadRepository) FindAll

func (r *MongoReadRepository) FindAll() ([]interface{}, error)

FindAll returns all read models in the repository.

func (*MongoReadRepository) FindCustom

func (r *MongoReadRepository) FindCustom(callback func(*mgo.Collection) *mgo.Query) ([]interface{}, error)

FindCustom uses a callback to specify a custom query.

func (*MongoReadRepository) Remove

func (r *MongoReadRepository) Remove(id string) error

Remove removes a read model with id from the repository. Returns ErrModelNotFound if no model could be found.

func (*MongoReadRepository) Save

func (r *MongoReadRepository) Save(id string, model interface{}) error

Save saves a read model with id to the repository.

func (*MongoReadRepository) SetDB

func (r *MongoReadRepository) SetDB(db string)

SetDB sets the database session and database.

func (*MongoReadRepository) SetModel

func (r *MongoReadRepository) SetModel(factory func() interface{})

SetModel sets a factory function that creates concrete model types.

type PostgresEventStore

type PostgresEventStore struct {
	// contains filtered or unexported fields
}

PostgresEventStore implements an EventStore for Postgres.

func NewPostgresEventStore

func NewPostgresEventStore(eventBus EventBus, conn string) (*PostgresEventStore, error)

NewPostgresEventStore creates a new PostgresEventStore.

func (*PostgresEventStore) Clear

func (s *PostgresEventStore) Clear() error

Clear clears the postgres storage.

func (*PostgresEventStore) Close

func (s *PostgresEventStore) Close() error

Close closes the postgres db connection.

func (*PostgresEventStore) Load

func (s *PostgresEventStore) Load(id string) ([]Event, error)

Load loads all events for the aggregate id from the store.

func (*PostgresEventStore) RegisterEventType

func (s *PostgresEventStore) RegisterEventType(event Event, factory func() Event) error

RegisterEventType registers an event factory for an event type. The factory is used to create concrete event types when loading from the database.

An example would be:

eventStore.RegisterEventType(&MyEvent{}, func() Event { return &MyEvent{} })

func (*PostgresEventStore) Save

func (s *PostgresEventStore) Save(events []Event) error

Save appends all events in the event stream to the store.

type PostgresReadRepository

type PostgresReadRepository struct {
	// contains filtered or unexported fields
}

PostgresReadRepository implements a Postgres repository of read models.

func NewPostgresReadRepository

func NewPostgresReadRepository(conn, table string) (*PostgresReadRepository, error)

NewPostgresReadRepository creates a new PostgresReadRepository.

func (*PostgresReadRepository) Clear

func (r *PostgresReadRepository) Clear() error

Clear clears the read model table.

func (*PostgresReadRepository) Close

func (r *PostgresReadRepository) Close() error

Close closes the postgres db connection.

func (*PostgresReadRepository) Find

func (r *PostgresReadRepository) Find(id string) (interface{}, error)

Find returns one read model using an id.

func (*PostgresReadRepository) FindAll

func (r *PostgresReadRepository) FindAll() ([]interface{}, error)

FindAll returns all read models in the repository.

func (*PostgresReadRepository) Remove

func (r *PostgresReadRepository) Remove(id string) error

Remove removes a read model with id from the repository.

func (*PostgresReadRepository) Save

func (r *PostgresReadRepository) Save(id string, model interface{}) error

Save saves a read model with id to the repository.

func (*PostgresReadRepository) SetModel

func (r *PostgresReadRepository) SetModel(factory func() interface{})

SetModel sets a factory function that creates concrete model types.

type RabbitMQCommandBus

type RabbitMQCommandBus struct {
	// contains filtered or unexported fields
}

RabbitMQCommandBus implements CommandBus using RabbitMQ.

func NewRabbitMQCommandBus

func NewRabbitMQCommandBus(amqpURI, app, tag string) (*RabbitMQCommandBus, error)

NewRabbitMQCommandBus creates a new RabbitMQ command bus. amqpURI is the URI of the RabbitMQ server. app provides a namespace for this application, allowing multiple command buses to run on one RabbitMQ server without conflicting with each other. tag is used as the RabbitMQ consumer tag for this bus.

func (*RabbitMQCommandBus) Close

func (b *RabbitMQCommandBus) Close() error

Close closes the command bus, closing the rabbitmq connection.

func (*RabbitMQCommandBus) HandleCommand

func (b *RabbitMQCommandBus) HandleCommand(command Command) error

HandleCommand handles a command, dispatching it to the proper handlers.

func (*RabbitMQCommandBus) PublishCommand

func (b *RabbitMQCommandBus) PublishCommand(command Command) error

PublishCommand publishes a command to the commands exchange.

func (*RabbitMQCommandBus) RegisterCommandType

func (b *RabbitMQCommandBus) RegisterCommandType(command Command, factory func() Command) error

RegisterCommandType registers a command factory for a specific command.

func (*RabbitMQCommandBus) SetHandler

func (b *RabbitMQCommandBus) SetHandler(handler CommandHandler, command Command) error

SetHandler sets a handler for a specific command.

type RabbitMQEventBus

type RabbitMQEventBus struct {
	// contains filtered or unexported fields
}

RabbitMQEventBus implements EventBus using RabbitMQ.

func NewRabbitMQEventBus

func NewRabbitMQEventBus(amqpURI, app, tag string) (*RabbitMQEventBus, error)

NewRabbitMQEventBus creates a new RabbitMQ event bus. amqpURI is the URI of the RabbitMQ server. app provides a namespace for this application, allowing multiple event buses to run on one RabbitMQ server without conflicting with each other. tag is used as the RabbitMQ consumer tag for this bus.

func (*RabbitMQEventBus) AddGlobalHandler

func (b *RabbitMQEventBus) AddGlobalHandler(handler EventHandler)

AddGlobalHandler adds a handler for global (remote) events.

func (*RabbitMQEventBus) AddHandler

func (b *RabbitMQEventBus) AddHandler(handler EventHandler, event Event)

AddHandler adds a handler for a specific local event.

func (*RabbitMQEventBus) AddLocalHandler

func (b *RabbitMQEventBus) AddLocalHandler(handler EventHandler)

AddLocalHandler adds a handler for local events.

func (*RabbitMQEventBus) Close

func (b *RabbitMQEventBus) Close() error

Close closes the event bus, closing the rabbitmq connection.

func (*RabbitMQEventBus) PublishEvent

func (b *RabbitMQEventBus) PublishEvent(event Event)

PublishEvent publishes an event to the event exchange.

func (*RabbitMQEventBus) RegisterEventType

func (b *RabbitMQEventBus) RegisterEventType(event Event, factory func() Event) error

RegisterEventType registers an event factory for a specific event.

type ReadRepository

type ReadRepository interface {
	// Save saves a read model with id to the repository.
	Save(string, interface{}) error

	// Find returns one read model using an id.
	Find(string) (interface{}, error)

	// FindAll returns all read models in the repository.
	FindAll() ([]interface{}, error)

	// Remove removes a read model with id from the repository.
	Remove(string) error
}

ReadRepository is a storage for read models.
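
A small in-memory sketch of the same four operations follows. It is illustrative only and not the package's MemoryReadRepository: the sketchReadRepo and userView types are hypothetical, and the error value simply echoes the message of ErrModelNotFound.

```go
package main

import (
	"errors"
	"fmt"
)

var errModelNotFound = errors.New("could not find model")

// sketchReadRepo stores read models by id in a map.
type sketchReadRepo struct{ models map[string]interface{} }

func (r *sketchReadRepo) Save(id string, model interface{}) error {
	r.models[id] = model
	return nil
}

func (r *sketchReadRepo) Find(id string) (interface{}, error) {
	m, ok := r.models[id]
	if !ok {
		return nil, errModelNotFound
	}
	return m, nil
}

// FindAll returns all models; map iteration means the order is unspecified.
func (r *sketchReadRepo) FindAll() ([]interface{}, error) {
	all := make([]interface{}, 0, len(r.models))
	for _, m := range r.models {
		all = append(all, m)
	}
	return all, nil
}

func (r *sketchReadRepo) Remove(id string) error {
	if _, ok := r.models[id]; !ok {
		return errModelNotFound
	}
	delete(r.models, id)
	return nil
}

// userView is a hypothetical read model, shaped for querying.
type userView struct{ Name string }

func main() {
	repo := &sketchReadRepo{models: map[string]interface{}{}}
	if err := repo.Save("user-1", &userView{Name: "Ada"}); err != nil {
		fmt.Println("save:", err)
		return
	}
	m, err := repo.Find("user-1")
	fmt.Println(m.(*userView).Name, err)
	fmt.Println(repo.Remove("user-1"), repo.Remove("user-1"))
}
```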

type RedisEventBus

type RedisEventBus struct {
	// contains filtered or unexported fields
}

RedisEventBus is an event bus that notifies registered EventHandlers of published events.

func NewRedisEventBus

func NewRedisEventBus(appID, server, password string) (*RedisEventBus, error)

NewRedisEventBus creates a RedisEventBus for remote events.

func NewRedisEventBusWithPool

func NewRedisEventBusWithPool(appID string, pool *redis.Pool) (*RedisEventBus, error)

NewRedisEventBusWithPool creates a RedisEventBus for remote events.

func (*RedisEventBus) AddGlobalHandler

func (b *RedisEventBus) AddGlobalHandler(handler EventHandler)

AddGlobalHandler adds a handler for global (remote) events.

func (*RedisEventBus) AddHandler

func (b *RedisEventBus) AddHandler(handler EventHandler, event Event)

AddHandler adds a handler for a specific local event.

func (*RedisEventBus) AddLocalHandler

func (b *RedisEventBus) AddLocalHandler(handler EventHandler)

AddLocalHandler adds a handler for local events.

func (*RedisEventBus) Close

func (b *RedisEventBus) Close() error

Close exits the receive goroutine by unsubscribing from all channels.

func (*RedisEventBus) PublishEvent

func (b *RedisEventBus) PublishEvent(event Event)

PublishEvent publishes an event to all handlers capable of handling it.

func (*RedisEventBus) RegisterEventType

func (b *RedisEventBus) RegisterEventType(event Event, factory func() Event) error

RegisterEventType registers an event factory for an event type. The factory is used to create concrete event types when receiving from subscriptions.

An example would be:

eventBus.RegisterEventType(&MyEvent{}, func() Event { return &MyEvent{} })

type RemoteCommandBus

type RemoteCommandBus interface {
	CommandBus
	RegisterCommandType(command Command, factory func() Command) error
	Close() error
}

RemoteCommandBus is a command bus that uses a networked service.

type RemoteEventBus

type RemoteEventBus interface {
	EventBus
	RemoteHandler
	// Close cleans up any connections
	Close() error
}

RemoteEventBus is an EventBus that uses a networked service.

type RemoteEventStore

type RemoteEventStore interface {
	EventStore
	RemoteHandler
	// Close the event store.
	Close() error
	// Clear deletes all data from the event store.
	Clear() error
}

RemoteEventStore is a store that is remote, requiring serialization and closing

type RemoteHandler

type RemoteHandler interface {
	// Register a function to create a new event from an event
	RegisterEventType(Event, func() Event) error
}

RemoteHandler enables deserializing remote objects to events.

type RemoteReadRepository

type RemoteReadRepository interface {
	ReadRepository
	SetModel(factory func() interface{})
	Close() error
	Clear() error
}

RemoteReadRepository is a read repository that uses a networked service

type Repository

type Repository interface {
	// Load loads an aggregate with a type and id.
	Load(string, string) (Aggregate, error)

	// Save saves an aggregate's uncommitted events.
	Save(Aggregate) error
}

Repository is a repository responsible for loading and saving aggregates.

type TraceEventStore

type TraceEventStore struct {
	// contains filtered or unexported fields
}

TraceEventStore wraps an EventStore and adds debug tracing.
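
Wrapping a store to record what passes through it is a decorator pattern, which is mostly useful in tests. The sketch below shows the general shape under assumptions: the tracingStore and nopStore types are hypothetical, the interfaces are trimmed, and recording events before delegating to the base store is a choice made for this example.

```go
package main

import "fmt"

// Trimmed-down stand-ins for the package's Event and EventStore.
type Event interface{ EventType() string }

type EventStore interface {
	Save([]Event) error
	Load(string) ([]Event, error)
}

// nopStore is a base store that accepts everything and stores nothing.
type nopStore struct{}

func (nopStore) Save([]Event) error           { return nil }
func (nopStore) Load(string) ([]Event, error) { return nil, nil }

// tracingStore embeds a base EventStore and records saved events while tracing.
type tracingStore struct {
	EventStore
	tracing bool
	trace   []Event
}

func (s *tracingStore) StartTracing()     { s.tracing = true }
func (s *tracingStore) StopTracing()      { s.tracing = false }
func (s *tracingStore) GetTrace() []Event { return s.trace }
func (s *tracingStore) ResetTrace()       { s.trace = nil }

// Save records the events if tracing is on, then delegates to the base store.
func (s *tracingStore) Save(events []Event) error {
	if s.tracing {
		s.trace = append(s.trace, events...)
	}
	return s.EventStore.Save(events)
}

type UserCreated struct{}

func (*UserCreated) EventType() string { return "UserCreated" }

func main() {
	s := &tracingStore{EventStore: nopStore{}}
	_ = s.Save([]Event{&UserCreated{}}) // not traced: tracing is off
	s.StartTracing()
	_ = s.Save([]Event{&UserCreated{}, &UserCreated{}})
	s.StopTracing()
	fmt.Println(len(s.GetTrace()))
}
```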

func NewTraceEventStore

func NewTraceEventStore(eventStore EventStore) *TraceEventStore

NewTraceEventStore creates a new TraceEventStore.

func (*TraceEventStore) GetTrace

func (s *TraceEventStore) GetTrace() []Event

GetTrace returns the events that happened during the tracing.

func (*TraceEventStore) Load

func (s *TraceEventStore) Load(id string) ([]Event, error)

Load loads all events for the aggregate id from the base store. Returns ErrNoEventStoreDefined if no event store could be found.

func (*TraceEventStore) ResetTrace

func (s *TraceEventStore) ResetTrace()

ResetTrace resets the trace.

func (*TraceEventStore) Save

func (s *TraceEventStore) Save(events []Event) error

Save appends all events to the base store and traces them if enabled.

func (*TraceEventStore) StartTracing

func (s *TraceEventStore) StartTracing()

StartTracing starts the tracing of events.

func (*TraceEventStore) StopTracing

func (s *TraceEventStore) StopTracing()

StopTracing stops the tracing of events.

Directories

Path Synopsis
examples
delegation
Package example contains a simple runnable example of a CQRS/ES app.
Package example contains a simple runnable example of a CQRS/ES app.
