ycq

package module
v0.0.0-...-e4d812d Latest Latest
Published: Feb 6, 2019 License: MIT Imports: 5 Imported by: 28

README

Go.CQRS

A Golang CQRS Reference implementation

Go.CQRS provides interfaces and implementations to support a CQRS implementation in Golang. The examples directory contains a sample application that demonstrates how to use Go.CQRS.

As far as possible, Go.CQRS has been designed around the principles of CQRS espoused by Greg Young, which represent the best thinking on the topic.

CQRS Pattern vs CQRS Framework

CQRS is an architectural pattern. When implementing the CQRS pattern, it is easy to imagine how the code could be packaged into a framework. However, it is recommended that those working with CQRS focus on learning the underlying detail of the pattern rather than simply use a framework.

Implementing the CQRS pattern is not especially difficult, but the learning curve is steep because the pattern differs so much from a traditional non-CQRS architecture. Topics such as aggregate design are very different. If you are going to use event sourcing and eventual consistency, there is a lot to learn.

If you are new to CQRS or simply interested in best practices, there is a great six-hour video of a hands-on CQRS workshop by Greg Young.

Once the pattern is understood, implementations such as Go.CQRS can be used as a reference for learning how to implement the pattern in Golang and also as a foundation upon which to build your CQRS implementation.

What does Go.CQRS provide?

Feature Description
Aggregate AggregateRoot interface and Aggregate base type that can be embedded in your own types to provide common functions required by aggregates
Event An Event interface and an EventDescriptor which is a message envelope for events. Events in Go.CQRS are simply plain Go structs and there are no magic strings to describe them as is the case in some other Go implementations.
Command A Command interface and a CommandDescriptor which is a message envelope for commands. Commands in Go.CQRS are simply plain Go structs and there are no magic strings to describe them as is the case in some other Go implementations.
CommandHandler Interface and base functionality for chaining command handlers
Dispatcher Dispatcher interface and an in memory dispatcher implementation
EventBus EventBus interface and in memory implementation
EventHandler EventHandler interface
Repository Repository interface and an implementation of the CommonDomain repository that persists events in GetEventStore. While there are many generic event store implementations over common databases such as MongoDB, GetEventStore is a specialised EventSourcing database that is open source, performant and reflects the best thinking on the topic from a highly experienced team in this field.
StreamNamer A StreamNamer interface and a DelegateStreamNamer implementation that supports the use of functions with the signature func(string, string) string to provide flexibility around stream naming. A common way to construct a stream name might be to use the name of your BoundedContext suffixed with an AggregateID.

All implementations are easily replaced to suit your particular requirements.
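
The stream naming convention mentioned in the StreamNamer row can be illustrated with a small standalone sketch. It does not import Go.CQRS; only the delegate signature func(string, string) string comes from the description above. The helper name boundedContextNamer, the "Inventory" context and the "context-id" format are assumptions made for illustration.

```go
package main

import "fmt"

// boundedContextNamer returns a delegate with the documented signature
// func(string, string) string. The helper name and the "context-id"
// format are assumptions for this sketch, not part of Go.CQRS.
func boundedContextNamer(boundedContext string) func(string, string) string {
	return func(aggregateType, aggregateID string) string {
		// aggregateType is ignored here; a delegate could equally
		// include it in the stream name.
		return boundedContext + "-" + aggregateID
	}
}

func main() {
	namer := boundedContextNamer("Inventory")
	fmt.Println(namer("InventoryItem", "7c1e"))
}
```

A delegate like this keeps the naming policy in one place, so changing the convention does not touch the aggregates themselves.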

Example code

The examples folder contains a simple and clear example of how to use go.cqrs to construct your service. The example is a port of the classic reference implementation m-r by Greg Young.

Getting Started

    $ go get github.com/jetbasrawi/go.cqrs

Refer to the example application for guidance on how to use Go.CQRS.

Documentation

Overview

Package ycq provides a CQRS reference implementation.

The implementation follows as much as possible the classic reference implementation m-r by Greg Young.

The implementation differs in a number of respects because the original is written in C# and uses generics, which Go did not support when this package was written. This implementation instead uses interfaces to deal with types in a generic manner and uses delegate functions to instantiate specific types.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Int

func Int(i int) *int

Int returns a pointer to int.

There are a number of places where a pointer to int is required, such as the expectedVersion argument on the repository, and this helper function keeps the code cleaner in these cases.
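
To show why such a helper is convenient, here is a minimal standalone sketch. Int is reimplemented locally to match the documented signature. The save function is a hypothetical stand-in for a call that takes an optional expected version, and treating nil as "no version check" is an assumption of this sketch rather than documented repository behaviour.

```go
package main

import "fmt"

// Int mirrors the documented helper: it returns a pointer to a copy of i.
func Int(i int) *int { return &i }

// save is a hypothetical stand-in for a call that accepts an optional
// expected version. Treating nil as "no version check" is an assumption.
func save(expectedVersion *int) string {
	if expectedVersion == nil {
		return "no version check"
	}
	return fmt.Sprintf("expect version %d", *expectedVersion)
}

func main() {
	fmt.Println(save(nil))
	// Without the helper, a temporary variable would be needed
	// because Go cannot take the address of a literal such as &3.
	fmt.Println(save(Int(3)))
}
```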

func NewUUID

func NewUUID() string

NewUUID returns a new v4 uuid as a string

Types

type AggregateBase

type AggregateBase struct {
	// contains filtered or unexported fields
}

AggregateBase is a type that can be embedded in an AggregateRoot implementation to handle common aggregate behaviour.

AggregateBase provides all of the methods required to implement an aggregate except Apply. To satisfy the AggregateRoot interface, your aggregate must implement the Apply method, which contains the behaviour specific to your aggregate.
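
The embedding approach can be sketched without importing the package. In the example below, base is a reduced local stand-in for an embeddable base type, events are passed as plain interface{} values rather than EventMessage envelopes, and inventoryItem and itemRenamed are hypothetical names. It is an illustration of the idea, not the package's implementation.

```go
package main

import "fmt"

// base is a reduced local stand-in for an embeddable aggregate base type.
type base struct {
	id      string
	changes []interface{}
}

func (b *base) AggregateID() string       { return b.id }
func (b *base) TrackChange(e interface{}) { b.changes = append(b.changes, e) }
func (b *base) GetChanges() []interface{} { return b.changes }

// itemRenamed is a hypothetical event: a plain Go struct.
type itemRenamed struct{ NewName string }

// inventoryItem embeds the base type and adds only its own state.
type inventoryItem struct {
	*base
	name string
}

// Apply updates state from an event and tracks the event when it is new.
func (a *inventoryItem) Apply(event interface{}, isNew bool) {
	if isNew {
		a.TrackChange(event)
	}
	switch e := event.(type) {
	case *itemRenamed:
		a.name = e.NewName
	}
}

func main() {
	item := &inventoryItem{base: &base{id: "item-1"}}
	item.Apply(&itemRenamed{NewName: "widget"}, true)  // new event: tracked
	item.Apply(&itemRenamed{NewName: "gadget"}, false) // replayed: not tracked
	fmt.Println(item.AggregateID(), item.name, len(item.GetChanges()))
}
```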

func NewAggregateBase

func NewAggregateBase(id string) *AggregateBase

NewAggregateBase constructs a new AggregateBase.

func (*AggregateBase) AggregateID

func (a *AggregateBase) AggregateID() string

AggregateID returns the AggregateID

func (*AggregateBase) ClearChanges

func (a *AggregateBase) ClearChanges()

ClearChanges removes all unpersisted events from the aggregate.

func (*AggregateBase) CurrentVersion

func (a *AggregateBase) CurrentVersion() int

CurrentVersion returns the version of the aggregate as it was when it was instantiated or loaded from the repository.

Importantly, an aggregate with one event applied will be at version 0. This allows aggregates to match the version in the event store, where the first event is version 0.

func (*AggregateBase) GetChanges

func (a *AggregateBase) GetChanges() []EventMessage

GetChanges returns the collection of new unpersisted events that have been applied to the aggregate.

func (*AggregateBase) IncrementVersion

func (a *AggregateBase) IncrementVersion()

IncrementVersion increments the aggregate version number by one.

func (*AggregateBase) OriginalVersion

func (a *AggregateBase) OriginalVersion() int

OriginalVersion returns the version of the aggregate as it was when it was instantiated or loaded from the repository.

Importantly, an aggregate with one event applied will be at version 0. This allows aggregates to match the version in the event store, where the first event is version 0.
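
One way to obtain this zero-based numbering is to start a fresh aggregate at version -1 and increment once per applied event. The sketch below assumes that starting value; the documentation above states only the resulting behaviour, and versioned is a hypothetical local type.

```go
package main

import "fmt"

// versioned is a hypothetical local type. Starting at -1 is an
// assumption chosen so that the first applied event yields version 0.
type versioned struct{ version int }

func newVersioned() *versioned { return &versioned{version: -1} }

// IncrementVersion advances the version by one, once per applied event.
func (v *versioned) IncrementVersion() { v.version++ }

// Version returns the current version number.
func (v *versioned) Version() int { return v.version }

func main() {
	a := newVersioned()
	a.IncrementVersion() // first event applied
	fmt.Println(a.Version())
}
```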

func (*AggregateBase) TrackChange

func (a *AggregateBase) TrackChange(event EventMessage)

TrackChange stores the EventMessage in the changes collection.

Changes are new, unpersisted events that have been applied to the aggregate.

type AggregateFactory

type AggregateFactory interface {
	GetAggregate(string, string) AggregateRoot
}

AggregateFactory returns aggregate instances of a specified type with the AggregateID set to the uuid provided.

An aggregate factory is typically a dependency of the repository, which delegates instantiation of aggregate instances to the aggregate factory.

type AggregateRoot

type AggregateRoot interface {
	AggregateID() string
	OriginalVersion() int
	CurrentVersion() int
	IncrementVersion()
	Apply(events EventMessage, isNew bool)
	TrackChange(EventMessage)
	GetChanges() []EventMessage
	ClearChanges()
}

AggregateRoot is the interface that all aggregates should implement

type CommandDescriptor

type CommandDescriptor struct {
	// contains filtered or unexported fields
}

CommandDescriptor is an implementation of the command message interface.

func NewCommandMessage

func NewCommandMessage(aggregateID string, command interface{}) *CommandDescriptor

NewCommandMessage returns a new command descriptor

func (*CommandDescriptor) AggregateID

func (c *CommandDescriptor) AggregateID() string

AggregateID returns the ID of the aggregate that the command relates to.

func (*CommandDescriptor) Command

func (c *CommandDescriptor) Command() interface{}

Command returns the actual command payload of the message.

func (*CommandDescriptor) CommandType

func (c *CommandDescriptor) CommandType() string

CommandType returns the command type name as a string

func (*CommandDescriptor) Headers

func (c *CommandDescriptor) Headers() map[string]interface{}

Headers returns the collection of headers for the command.

func (*CommandDescriptor) SetHeader

func (c *CommandDescriptor) SetHeader(key string, value interface{})

SetHeader sets the value of the header with the specified key

type CommandHandler

type CommandHandler interface {
	Handle(CommandMessage) error
}

CommandHandler is the interface that all command handlers should implement.

type CommandHandlerBase

type CommandHandlerBase struct {
	// contains filtered or unexported fields
}

CommandHandlerBase is an embedded type that supports chaining of command handlers through provision of a next field that will hold a reference to the next handler in the chain.
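
The chaining idea can be sketched as follows. This is a standalone illustration with local types: handler is a simplified stand-in for the handler interface, chain plays the role of an embeddable base with a next field, and the forward method, validator and recorder are hypothetical names that are not taken from the package.

```go
package main

import (
	"errors"
	"fmt"
)

// handler is a simplified stand-in for a command handler interface.
type handler interface {
	Handle(cmd interface{}) error
}

// chain holds the next handler. Types that embed it call forward to
// pass a command along. The method name forward is an assumption.
type chain struct{ next handler }

func (c *chain) forward(cmd interface{}) error {
	if c.next == nil {
		return nil // end of the chain
	}
	return c.next.Handle(cmd)
}

// validator rejects empty string commands and passes everything else on.
type validator struct{ chain }

func (v *validator) Handle(cmd interface{}) error {
	if s, ok := cmd.(string); ok && s == "" {
		return errors.New("empty command")
	}
	return v.forward(cmd)
}

// recorder remembers each command it handles.
type recorder struct {
	chain
	seen []interface{}
}

func (r *recorder) Handle(cmd interface{}) error {
	r.seen = append(r.seen, cmd)
	return r.forward(cmd)
}

func main() {
	rec := &recorder{}
	first := &validator{chain: chain{next: rec}}
	errOK := first.Handle("create")
	errEmpty := first.Handle("")
	fmt.Println(errOK, errEmpty, len(rec.seen))
}
```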

type CommandMessage

type CommandMessage interface {

	// AggregateID returns the ID of the Aggregate that the command relates to
	AggregateID() string

	// Headers returns the key value collection of headers for the command.
	Headers() map[string]interface{}

	// SetHeader sets the value of the header specified by the key
	SetHeader(string, interface{})

	// Command returns the actual command which is the payload of the command message.
	Command() interface{}

	// CommandType returns a string descriptor of the command name
	CommandType() string
}

CommandMessage is the interface that a command message must implement.

type DelegateAggregateFactory

type DelegateAggregateFactory struct {
	// contains filtered or unexported fields
}

DelegateAggregateFactory is an implementation of the AggregateFactory interface that supports registration of delegate functions to perform aggregate instantiation.

func NewDelegateAggregateFactory

func NewDelegateAggregateFactory() *DelegateAggregateFactory

NewDelegateAggregateFactory contructs a new DelegateAggregateFactory

func (*DelegateAggregateFactory) GetAggregate

func (t *DelegateAggregateFactory) GetAggregate(typeName string, id string) AggregateRoot

GetAggregate calls the delegate for the type specified and returns the result.

func (*DelegateAggregateFactory) RegisterDelegate

func (t *DelegateAggregateFactory) RegisterDelegate(aggregate AggregateRoot, delegate func(string) AggregateRoot) error

RegisterDelegate is used to register a new function for instantiation of an aggregate instance.

func(id string) AggregateRoot { return NewMyAggregateType(id) }
func(id string) AggregateRoot { return &MyAggregateType{AggregateBase: NewAggregateBase(id)} }
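
A delegate-based factory of this kind can be sketched with a map from type name to constructor. The example below is standalone: aggregate is a reduced stand-in for the AggregateRoot interface, account is a hypothetical aggregate, and deriving the key by reflection and rejecting duplicate registrations are assumptions of this sketch rather than documented behaviour.

```go
package main

import (
	"fmt"
	"reflect"
)

// aggregate is a reduced stand-in for an aggregate root interface.
type aggregate interface{ AggregateID() string }

// account is a hypothetical aggregate type.
type account struct{ id string }

func (a *account) AggregateID() string { return a.id }

// factory maps a type name to a constructor delegate.
type factory struct {
	delegates map[string]func(string) aggregate
}

// typeName derives a name such as "account" from a pointer or a value.
func typeName(v interface{}) string {
	t := reflect.TypeOf(v)
	if t.Kind() == reflect.Ptr {
		t = t.Elem()
	}
	return t.Name()
}

// register stores the delegate under the aggregate's type name.
// Rejecting duplicates is an assumption of this sketch.
func (f *factory) register(a aggregate, d func(string) aggregate) error {
	name := typeName(a)
	if _, exists := f.delegates[name]; exists {
		return fmt.Errorf("delegate already registered for %q", name)
	}
	f.delegates[name] = d
	return nil
}

// get calls the delegate for name, or returns nil if none is registered.
func (f *factory) get(name, id string) aggregate {
	if d, ok := f.delegates[name]; ok {
		return d(id)
	}
	return nil
}

func main() {
	f := &factory{delegates: map[string]func(string) aggregate{}}
	err := f.register(&account{}, func(id string) aggregate { return &account{id: id} })
	fmt.Println(err, f.get("account", "acc-1").AggregateID())
}
```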

type DelegateEventFactory

type DelegateEventFactory struct {
	// contains filtered or unexported fields
}

DelegateEventFactory uses delegate functions to instantiate event instances given the name of the event type as a string.

func NewDelegateEventFactory

func NewDelegateEventFactory() *DelegateEventFactory

NewDelegateEventFactory constructs a new DelegateEventFactory

func (*DelegateEventFactory) GetEvent

func (t *DelegateEventFactory) GetEvent(typeName string) interface{}

GetEvent returns an event instance given an event type as a string.

An appropriate delegate must be registered for the event type. If an appropriate delegate is not registered, the method will return nil.

func (*DelegateEventFactory) RegisterDelegate

func (t *DelegateEventFactory) RegisterDelegate(event interface{}, delegate func() interface{}) error

RegisterDelegate registers a delegate that will return an event instance given an event type name as a string.

If an attempt is made to register multiple delegates for an event type, an error is returned.

type DelegateStreamNamer

type DelegateStreamNamer struct {
	// contains filtered or unexported fields
}

DelegateStreamNamer stores delegates per aggregate type allowing fine grained control of stream names for event streams.

func NewDelegateStreamNamer

func NewDelegateStreamNamer() *DelegateStreamNamer

NewDelegateStreamNamer constructs a delegate stream namer

func (*DelegateStreamNamer) GetStreamName

func (r *DelegateStreamNamer) GetStreamName(aggregateTypeName string, id string) (string, error)

GetStreamName gets the result of the stream name delegate registered for the aggregate type.

func (*DelegateStreamNamer) RegisterDelegate

func (r *DelegateStreamNamer) RegisterDelegate(delegate func(string, string) string, aggregates ...AggregateRoot) error

RegisterDelegate allows registration of a stream name delegate function for the aggregates specified in the variadic aggregates argument.

type Dispatcher

type Dispatcher interface {
	Dispatch(CommandMessage) error
	RegisterHandler(CommandHandler, ...interface{}) error
}

Dispatcher is the interface that should be implemented by a command dispatcher.

The dispatcher is the mechanism through which commands are distributed to the appropriate command handler.

Command handlers are registered with the dispatcher for a given command type. It is good practice in CQRS to have only one command handler for a given command. When a command is passed to the dispatcher it will look for the registered command handler and call that handler's Handle method passing the command message as an argument.

Commands contained in a CommandMessage envelope are passed to the Dispatcher via the Dispatch method.
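
The routing described above can be sketched as a map from command type to a single handler. This standalone example uses local stand-ins: commands are passed as plain values rather than CommandMessage envelopes, createItem is a hypothetical command, and keying the map by reflect.Type is an assumption of this sketch.

```go
package main

import (
	"fmt"
	"reflect"
)

// createItem is a hypothetical command: a plain Go struct.
type createItem struct{ Name string }

// handlerFunc is a stand-in for a command handler.
type handlerFunc func(cmd interface{}) error

// dispatcher routes each command to the one handler registered for its type.
type dispatcher struct {
	handlers map[reflect.Type]handlerFunc
}

func newDispatcher() *dispatcher {
	return &dispatcher{handlers: make(map[reflect.Type]handlerFunc)}
}

// register associates h with the type of every command given.
// It refuses a second handler for the same command type.
func (d *dispatcher) register(h handlerFunc, commands ...interface{}) error {
	for _, c := range commands {
		t := reflect.TypeOf(c)
		if _, exists := d.handlers[t]; exists {
			return fmt.Errorf("handler already registered for %v", t)
		}
		d.handlers[t] = h
	}
	return nil
}

// dispatch looks up the handler for the command's type and calls it.
func (d *dispatcher) dispatch(cmd interface{}) error {
	h, ok := d.handlers[reflect.TypeOf(cmd)]
	if !ok {
		return fmt.Errorf("no handler registered for %T", cmd)
	}
	return h(cmd)
}

func main() {
	d := newDispatcher()
	var created []string
	_ = d.register(func(cmd interface{}) error {
		created = append(created, cmd.(createItem).Name)
		return nil
	}, createItem{})

	err := d.dispatch(createItem{Name: "widget"})
	fmt.Println(err, created)
	fmt.Println(d.dispatch(42)) // no handler for int
}
```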

type DomainRepository

type DomainRepository interface {
	//Loads an aggregate of the given type and ID
	Load(aggregateTypeName string, aggregateID string) (AggregateRoot, error)

	//Saves the aggregate.
	Save(aggregate AggregateRoot, expectedVersion *int) error
}

DomainRepository is the interface that all domain repositories should implement.

type ErrAggregateNotFound

type ErrAggregateNotFound struct {
	AggregateID   string
	AggregateType string
}

ErrAggregateNotFound is the error returned when an aggregate was not found in the repository.

func (*ErrAggregateNotFound) Error

func (e *ErrAggregateNotFound) Error() string

type ErrCommandExecution

type ErrCommandExecution struct {
	Command CommandMessage
	Reason  string
}

ErrCommandExecution is the error returned in response to a failed command.

func (*ErrCommandExecution) Error

func (e *ErrCommandExecution) Error() string

Error fulfills the error interface.

type ErrConcurrencyViolation

type ErrConcurrencyViolation struct {
	Aggregate       AggregateRoot
	ExpectedVersion *int
	StreamName      string
}

ErrConcurrencyViolation is returned when a concurrency error is raised by the event store when events are persisted to a stream and the version of the stream does not match the expected version.

func (*ErrConcurrencyViolation) Error

func (e *ErrConcurrencyViolation) Error() string

type ErrRepositoryUnavailable

type ErrRepositoryUnavailable struct{}

ErrRepositoryUnavailable is returned when the eventstore is temporarily unavailable

func (*ErrRepositoryUnavailable) Error

func (e *ErrRepositoryUnavailable) Error() string

type ErrUnauthorized

type ErrUnauthorized struct {
}

ErrUnauthorized is returned when a request to the repository is not authorized

func (*ErrUnauthorized) Error

func (e *ErrUnauthorized) Error() string

type ErrUnexpected

type ErrUnexpected struct {
	Err error
}

ErrUnexpected is returned for all errors that are not otherwise represented explicitly.

The original error is available for inspection in the Err field.

func (*ErrUnexpected) Error

func (e *ErrUnexpected) Error() string

type EventBus

type EventBus interface {
	PublishEvent(EventMessage)
	AddHandler(EventHandler, ...interface{})
}

EventBus is the interface that an event bus must implement.

type EventDescriptor

type EventDescriptor struct {
	// contains filtered or unexported fields
}

EventDescriptor is an implementation of the event message interface.

func NewEventMessage

func NewEventMessage(aggregateID string, event interface{}, version *int) *EventDescriptor

NewEventMessage returns a new event descriptor

func (*EventDescriptor) AggregateID

func (c *EventDescriptor) AggregateID() string

AggregateID returns the ID of the Aggregate that the event relates to.

func (*EventDescriptor) Event

func (c *EventDescriptor) Event() interface{}

Event returns the event payload of the event message.

func (*EventDescriptor) EventType

func (c *EventDescriptor) EventType() string

EventType returns the name of the event type as a string.

func (*EventDescriptor) GetHeaders

func (c *EventDescriptor) GetHeaders() map[string]interface{}

GetHeaders returns the headers for the event.

func (*EventDescriptor) SetHeader

func (c *EventDescriptor) SetHeader(key string, value interface{})

SetHeader sets the value of the header specified by the key

func (*EventDescriptor) Version

func (c *EventDescriptor) Version() *int

Version returns the version of the event

type EventFactory

type EventFactory interface {
	GetEvent(string) interface{}
}

EventFactory is the interface that an event factory should implement.

An event factory returns instances of an event given the event type as a string. An event factory is required during deserialisation of events by the eventstore or repository depending on your implementation.

The eventstore will return a string describing the event type. To unmarshal the contents of the persisted event which will typically be in some serialised format such as JSON an instance of the event type will need to be created.
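
The deserialisation step described above can be sketched with encoding/json. This standalone example uses local stand-ins: eventFactory is a plain map of delegates, itemCreated is a hypothetical event, and decode is a hypothetical helper. Unlike GetEvent, which returns nil for an unknown type, decode returns an error.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// itemCreated is a hypothetical event type.
type itemCreated struct {
	Name string `json:"name"`
}

// eventFactory maps an event type name to a delegate that returns a
// fresh, empty instance of that event.
type eventFactory map[string]func() interface{}

// decode creates an instance for typeName and unmarshals data into it.
func decode(f eventFactory, typeName string, data []byte) (interface{}, error) {
	newEvent, ok := f[typeName]
	if !ok {
		return nil, fmt.Errorf("no delegate registered for %q", typeName)
	}
	event := newEvent() // must be a pointer so Unmarshal can fill it
	if err := json.Unmarshal(data, event); err != nil {
		return nil, err
	}
	return event, nil
}

func main() {
	f := eventFactory{
		"itemCreated": func() interface{} { return &itemCreated{} },
	}
	event, err := decode(f, "itemCreated", []byte(`{"name":"widget"}`))
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Println(event.(*itemCreated).Name)
}
```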

type EventHandler

type EventHandler interface {
	Handle(EventMessage)
}

type EventMessage

type EventMessage interface {

	// AggregateID returns the ID of the Aggregate that the event relates to
	AggregateID() string

	// GetHeaders returns the key value collection of headers for the event.
	//
	// Headers are metadata about the event that do not form part of the
	// actual event but are still required to be persisted alongside the event.
	GetHeaders() map[string]interface{}

	// SetHeader sets the value of the header specified by the key
	SetHeader(string, interface{})

	// Returns the actual event which is the payload of the event message.
	Event() interface{}

	// EventType returns a string descriptor of the event name
	EventType() string

	// Version returns the version of the event
	Version() *int
}

EventMessage is the interface that an event message must implement.

type GetEventStoreCommonDomainRepo

type GetEventStoreCommonDomainRepo struct {
	// contains filtered or unexported fields
}

GetEventStoreCommonDomainRepo is an implementation of the DomainRepository that uses GetEventStore for persistence

func NewCommonDomainRepository

func NewCommonDomainRepository(eventStore *goes.Client, eventBus EventBus) (*GetEventStoreCommonDomainRepo, error)

NewCommonDomainRepository constructs a new CommonDomainRepository

func (*GetEventStoreCommonDomainRepo) Load

func (r *GetEventStoreCommonDomainRepo) Load(aggregateType, id string) (AggregateRoot, error)

Load will load all events from a stream and apply those events to an aggregate of the type specified.

The aggregate type and id will be passed to the configured StreamNamer to get the stream name.

func (*GetEventStoreCommonDomainRepo) Save

func (r *GetEventStoreCommonDomainRepo) Save(aggregate AggregateRoot, expectedVersion *int) error

Save persists an aggregate

func (*GetEventStoreCommonDomainRepo) SetAggregateFactory

func (r *GetEventStoreCommonDomainRepo) SetAggregateFactory(factory AggregateFactory)

SetAggregateFactory sets the aggregate factory that should be used to instantiate aggregate instances.

Only one AggregateFactory can be registered at any one time. Any registration will overwrite the previous registration.

func (*GetEventStoreCommonDomainRepo) SetEventFactory

func (r *GetEventStoreCommonDomainRepo) SetEventFactory(factory EventFactory)

SetEventFactory sets the event factory that should be used to instantiate event instances.

Only one event factory can be set at a time. Any subsequent registration will overwrite the previous factory.

func (*GetEventStoreCommonDomainRepo) SetStreamNameDelegate

func (r *GetEventStoreCommonDomainRepo) SetStreamNameDelegate(delegate StreamNamer)

SetStreamNameDelegate sets the stream name delegate

type InMemoryDispatcher

type InMemoryDispatcher struct {
	// contains filtered or unexported fields
}

InMemoryDispatcher provides a lightweight and performant in process dispatcher

func NewInMemoryDispatcher

func NewInMemoryDispatcher() *InMemoryDispatcher

NewInMemoryDispatcher constructs a new in memory dispatcher

func (*InMemoryDispatcher) Dispatch

func (b *InMemoryDispatcher) Dispatch(command CommandMessage) error

Dispatch passes the CommandMessage on to all registered command handlers.

func (*InMemoryDispatcher) RegisterHandler

func (b *InMemoryDispatcher) RegisterHandler(handler CommandHandler, commands ...interface{}) error

RegisterHandler registers a command handler for the command types specified by the variadic commands parameter.

type InternalEventBus

type InternalEventBus struct {
	// contains filtered or unexported fields
}

InternalEventBus provides a lightweight in process event bus

func NewInternalEventBus

func NewInternalEventBus() *InternalEventBus

NewInternalEventBus constructs a new InternalEventBus

func (*InternalEventBus) AddHandler

func (b *InternalEventBus) AddHandler(handler EventHandler, events ...interface{})

AddHandler registers an event handler for all of the events specified in the variadic events parameter.

func (*InternalEventBus) PublishEvent

func (b *InternalEventBus) PublishEvent(event EventMessage)

PublishEvent publishes events to all registered event handlers
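
An in-process event bus of this shape can be sketched as a map from event type to a list of handlers. The example is standalone and uses local stand-ins: events are passed as plain values rather than EventMessage envelopes, itemCreated and itemRemoved are hypothetical events, and delivering only to handlers registered for the published event's type is an assumption of this sketch.

```go
package main

import (
	"fmt"
	"reflect"
)

// itemCreated and itemRemoved are hypothetical events.
type itemCreated struct{ Name string }
type itemRemoved struct{}

// handlerFunc is a stand-in for an event handler.
type handlerFunc func(event interface{})

// bus keeps a list of handlers per event type.
type bus struct {
	handlers map[reflect.Type][]handlerFunc
}

func newBus() *bus {
	return &bus{handlers: make(map[reflect.Type][]handlerFunc)}
}

// addHandler registers h for the type of every event given.
func (b *bus) addHandler(h handlerFunc, events ...interface{}) {
	for _, e := range events {
		t := reflect.TypeOf(e)
		b.handlers[t] = append(b.handlers[t], h)
	}
}

// publish calls, in registration order, every handler registered for
// the type of event.
func (b *bus) publish(event interface{}) {
	for _, h := range b.handlers[reflect.TypeOf(event)] {
		h(event)
	}
}

func main() {
	b := newBus()
	count := 0
	b.addHandler(func(interface{}) { count++ }, itemCreated{}, itemRemoved{})
	b.addHandler(func(e interface{}) { fmt.Println("created:", e.(itemCreated).Name) }, itemCreated{})

	b.publish(itemCreated{Name: "widget"})
	b.publish(itemRemoved{})
	fmt.Println(count)
}
```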

type StreamNamer

type StreamNamer interface {
	GetStreamName(string, string) (string, error)
}

StreamNamer is the interface that stream name delegates should implement.

Directories

Path Synopsis
examples
internal
uuid
Package uuid provides implementation of Universally Unique Identifier (UUID).
