Documentation ¶
Overview ¶
Package eventhorizon is a CQRS/ES toolkit.
Index ¶
- type Aggregate
- type Command
- type CommandHandler
- type DelegateAggregate
- type DelegateDispatcher
- type Dispatcher
- type Event
- type EventBus
- type EventHandler
- type EventStore
- type HandlerEventBus
- type MemoryEventStore
- type MemoryRepository
- type ReflectAggregate
- type ReflectDispatcher
- type ReflectEventHandler
- type Repository
- type TraceEventStore
- type UUID
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Aggregate ¶
type Aggregate interface { // AggregateID returns the id of the aggregate. AggregateID() UUID // ApplyEvent applies an event to the aggregate by setting it's values. ApplyEvent(event Event) // ApplyEvents applies several events by calling ApplyEvent. ApplyEvents(events []Event) }
Aggregate is a CQRS aggregate base to embedd in domain specific aggregates.
A domain specific aggregate is any struct that implements the Aggregate interface, often by embedding. A typical aggregate examyple:
type UserAggregate struct { eventhorizon.Aggregate name string }
The embeddde aggregate is then initialized by the dispatcher with for exapmle ReflectAggregate, depending on the type of dispatcher.
type Command ¶
type Command interface {
AggregateID() UUID
}
Command is a domain command that is sent to a Dispatcher.
A command name should: * Be in present tense. * Contain the intent (MoveCustomer vs CorrectCustomerAddress).
The command should contain all the data needed when handling it.
type CommandHandler ¶
CommandHandler is an interface that all handlers of commands should implement.
type DelegateAggregate ¶
type DelegateAggregate struct {
// contains filtered or unexported fields
}
DelegateAggregate is an implementation of Aggregate using delegation.
This implementation is used by the DelegateDispatcher and will delegate all event handling to the concrete aggregate.
func NewDelegateAggregate ¶
func NewDelegateAggregate(id UUID, delegate EventHandler) *DelegateAggregate
NewDelegateAggregate creates an aggregate wich applies it's events by using methods detected with reflection by the methodApplier.
func (*DelegateAggregate) AggregateID ¶
func (a *DelegateAggregate) AggregateID() UUID
AggregateID returnes the ID of the aggregate.
func (*DelegateAggregate) ApplyEvent ¶
func (a *DelegateAggregate) ApplyEvent(event Event)
ApplyEvent applies an event using the handler.
func (*DelegateAggregate) ApplyEvents ¶
func (a *DelegateAggregate) ApplyEvents(events []Event)
ApplyEvents applies an event stream using the handler.
type DelegateDispatcher ¶
type DelegateDispatcher struct {
// contains filtered or unexported fields
}
DelegateDispatcher is a dispather that dispatches commands and publishes events based on method names.
func NewDelegateDispatcher ¶
func NewDelegateDispatcher(store EventStore, bus EventBus) *DelegateDispatcher
NewDelegateDispatcher creates a dispather and associates it with an event store.
func (*DelegateDispatcher) AddHandler ¶
func (d *DelegateDispatcher) AddHandler(command Command, handler CommandHandler)
AddHandler adds a handler for a command.
func (*DelegateDispatcher) Dispatch ¶
func (d *DelegateDispatcher) Dispatch(command Command) error
Dispatch dispatches a command to the registered command handler.
type Dispatcher ¶
type Dispatcher interface { // Dispatch dispatches a command to the registered command handler. Dispatch(Command) error }
Dispatcher is a interface defining a command and event dispatcher.
The dispatch process is as follows: 1. The dispather receives a command 2. An aggregate is created or rebuilt from previous events in event store 3. The aggregate's command handler is called 4. The aggregate generates events in response to the command 5. The events are stored in the event store 6. The events are published to the event bus
type Event ¶
type Event interface {
AggregateID() UUID
}
Event is a domain event describing a change that has happened to an aggregate.
An event name should: * Be in past tense. * Contain the intent (CustomerMoved vs CustomerAddressCorrected).
The event should contain all the data needed when appling/handling it.
type EventBus ¶
type EventBus interface { // PublishEvent publishes an event on the event bus. PublishEvent(Event) }
EventBus is a interface defining an event bus for distributing events.
type EventHandler ¶
type EventHandler interface {
HandleEvent(Event)
}
EventHandler is an interface that all handlers of events should implement.
type EventStore ¶
type EventStore interface { // Append appends all events in the event stream to the store. Append([]Event) // Load loads all events for the aggregate id from the store. Load(UUID) ([]Event, error) }
EventStore is an interface for an event sourcing event store.
type HandlerEventBus ¶
type HandlerEventBus struct {
// contains filtered or unexported fields
}
HandlerEventBus is an event bus that notifies registered EventHandlers of published events.
func NewHandlerEventBus ¶
func NewHandlerEventBus() *HandlerEventBus
NewHandlerEventBus creates a HandlerEventBus.
func (*HandlerEventBus) AddAllSubscribers ¶
func (b *HandlerEventBus) AddAllSubscribers(subscriber EventHandler)
AddAllSubscribers scans a event handler for handling methods and adds it for every event it detects in the method name.
func (*HandlerEventBus) AddGlobalSubscriber ¶
func (b *HandlerEventBus) AddGlobalSubscriber(subscriber EventHandler)
AddGlobalSubscriber adds the subscriber as a handler for a specific event.
func (*HandlerEventBus) AddSubscriber ¶
func (b *HandlerEventBus) AddSubscriber(event Event, subscriber EventHandler)
AddSubscriber adds the subscriber as a handler for a specific event.
func (*HandlerEventBus) PublishEvent ¶
func (b *HandlerEventBus) PublishEvent(event Event)
PublishEvent publishes an event to all subscribers capable of handling it.
type MemoryEventStore ¶
type MemoryEventStore struct {
// contains filtered or unexported fields
}
MemoryEventStore implements EventStore as an in memory structure.
func NewMemoryEventStore ¶
func NewMemoryEventStore() *MemoryEventStore
NewMemoryEventStore creates a new MemoryEventStore.
func (*MemoryEventStore) Append ¶
func (s *MemoryEventStore) Append(events []Event)
Append appends all events in the event stream to the memory store.
type MemoryRepository ¶
type MemoryRepository struct {
// contains filtered or unexported fields
}
MemoryRepository implements a in memory repository of read models.
func NewMemoryRepository ¶
func NewMemoryRepository() *MemoryRepository
NewMemoryRepository creates a new MemoryRepository.
func (*MemoryRepository) Find ¶
func (r *MemoryRepository) Find(id UUID) (interface{}, error)
Find returns one read model with using an id.
func (*MemoryRepository) FindAll ¶
func (r *MemoryRepository) FindAll() ([]interface{}, error)
FindAll returns all read models in the repository.
func (*MemoryRepository) Remove ¶
func (r *MemoryRepository) Remove(id UUID) error
Remove removes a read model with id from the repository.
func (*MemoryRepository) Save ¶
func (r *MemoryRepository) Save(id UUID, model interface{})
Save saves a read model with id to the repository.
type ReflectAggregate ¶
type ReflectAggregate struct {
// contains filtered or unexported fields
}
ReflectAggregate is an implementation of Aggregate using method reflection.
This implementation is used by the ReflectDispatcher and will add handler based on the aggregate's methods prefixed with "Apply". See docs for ReflectEventHandler for more info.
func NewReflectAggregate ¶
func NewReflectAggregate(id UUID, source interface{}) *ReflectAggregate
NewReflectAggregate creates an aggregate wich applies it's events by using methods detected with reflection by the methodApplier.
func (*ReflectAggregate) AggregateID ¶
func (a *ReflectAggregate) AggregateID() UUID
AggregateID returnes the ID of the aggregate.
func (*ReflectAggregate) ApplyEvent ¶
func (a *ReflectAggregate) ApplyEvent(event Event)
ApplyEvent applies an event using the handler.
func (*ReflectAggregate) ApplyEvents ¶
func (a *ReflectAggregate) ApplyEvents(events []Event)
ApplyEvents applies an event stream using the handler.
type ReflectDispatcher ¶
type ReflectDispatcher struct {
// contains filtered or unexported fields
}
ReflectDispatcher is a dispather that dispatches commands and publishes events based on method names.
func NewReflectDispatcher ¶
func NewReflectDispatcher(store EventStore, bus EventBus) *ReflectDispatcher
NewReflectDispatcher creates a dispather and associates it with an event store.
func (*ReflectDispatcher) AddAllHandlers ¶
func (d *ReflectDispatcher) AddAllHandlers(source interface{})
AddAllHandlers scans an aggregate for command handling methods and adds it for every event it can handle.
func (*ReflectDispatcher) AddHandler ¶
func (d *ReflectDispatcher) AddHandler(command Command, source interface{})
AddHandler adds an aggregate as a handler for a command.
Handling methods are defined in code by:
func (source *MySource) HandleMyCommand(c MyCommand).
When getting the type of this methods by reflection the signature is as following:
func HandleMyCommand(source *MySource, c MyCommand).
Only add method that has the correct type.
func (*ReflectDispatcher) Dispatch ¶
func (d *ReflectDispatcher) Dispatch(command Command) error
Dispatch dispatches a command to the registered command handler.
type ReflectEventHandler ¶
type ReflectEventHandler struct {
// contains filtered or unexported fields
}
ReflectEventHandler routes events to methods of a struct by convention. There should be one router per event source instance.
The convention is: func(s MySource) HandleXXX(e EventType)
func NewReflectEventHandler ¶
func NewReflectEventHandler(source interface{}, methodPrefix string) *ReflectEventHandler
NewReflectEventHandler returns an EventHandler that uses reflection to handle events based on method names.
func (*ReflectEventHandler) HandleEvent ¶
func (h *ReflectEventHandler) HandleEvent(event Event)
HandleEvent handles an event by routing it to the handler method of the source.
type Repository ¶
type Repository interface { // Save saves a read model with id to the repository. Save(UUID, interface{}) // Find returns one read model with using an id. Find(UUID) (interface{}, error) // FindAll returns all read models in the repository. FindAll() ([]interface{}, error) // Remove removes a read model with id from the repository. Remove(UUID) error }
Repository is a storage for read models.
type TraceEventStore ¶
type TraceEventStore struct {
// contains filtered or unexported fields
}
TraceEventStore wraps an EventStore and adds debug tracing.
func NewTraceEventStore ¶
func NewTraceEventStore(eventStore EventStore) *TraceEventStore
NewTraceEventStore creates a new TraceEventStore.
func (*TraceEventStore) Append ¶
func (s *TraceEventStore) Append(events []Event)
Append appends all events to the base store and trace them if enabled.
func (*TraceEventStore) GetTrace ¶
func (s *TraceEventStore) GetTrace() []Event
GetTrace returns the events that happened during the tracing.
func (*TraceEventStore) Load ¶
func (s *TraceEventStore) Load(id UUID) ([]Event, error)
Load loads all events for the aggregate id from the base store.
func (*TraceEventStore) ResetTrace ¶
func (s *TraceEventStore) ResetTrace()
ResetTrace resets the trace.
func (*TraceEventStore) StartTracing ¶
func (s *TraceEventStore) StartTracing()
StartTracing starts the tracing of events.
func (*TraceEventStore) StopTracing ¶
func (s *TraceEventStore) StopTracing()
StopTracing stops the tracing of events.
type UUID ¶
type UUID uuid.UUID
UUID is a unique identifier, based on the UUID spec.
func (*UUID) MarshalJSON ¶
MarshalJSON turns UUID into a json.Marshaller.
func (*UUID) UnmarshalJSON ¶
UnmarshalJSON turns *UUID into a json.Unmarshaller.
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
examples
|
|
delegation
Package example contains a simple runnable example of a CQRS/ES app.
|
Package example contains a simple runnable example of a CQRS/ES app. |
reflection
Package example contains a simple runnable example of a CQRS/ES app.
|
Package example contains a simple runnable example of a CQRS/ES app. |