cqrs

package module
v3.2.1+incompatible Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 3, 2018 License: MIT Imports: 8 Imported by: 0

README

go-cqrs

Build Status GoDoc GoCover GoReportCard

CQRS/es implementation in go

Aggregates

todo

Commands

todo

Events

todo

Event store and streams

todo

Factories

todo

Aggregate repository

todo

Projections

todo

Middleware for command handler

todo

Event publishing

todo

Examples

Examples are in the _example directory.

Inventory example This is the GO version of the C# Simple CQRS example from Gregory Young.

Passing events by value not by pointer reference

Events are always passed by value, never passed by pointer reference. This is to ensure immutability of the events data.

  • The Load method of the aggregate repository will always convert pointer events and pass them by value to the aggregates Apply function.
  • The Save method of the aggregate repository will convert any pointer referenced events and pass them by value to the aggregate Apply method and to the publish event hooks (PublishEventFunc)

Always refer to the class name of an event in your aggregates and projections. Never to the pointer variant, it will probably never be picked up

EventFactory

The event factory must always return a pointer to the newly created event. This is of the event stream needs to scan/unmarshal the data into the event instance and can only do this for pointer instances. Later on the newly created event will be passed by value to the aggregates and or projections.

Todo
  • Test the domain aggregate repository.
  • More real world examples.
  • See how it performs in my projects.
  • Documentation documentation documentation.
  • Write test for Snapshot repository
  • Create Mysql and Postgres variant for SnapshotStore

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrRepositoryNotFound = errors.New("repository not found")
)
View Source
var ErrUnknownCommand = errors.New("Cannot handle unknown Command")
View Source
var ErrUnknownEvent = errors.New("Cannot handle unknown Event")

Functions

func AggregateCommandHandler

func AggregateCommandHandler(repository AggregateRepository) commandbus.CommandHandler

AggregateCommandHandler is a command handler middleware who loads the aggregate calls the aggregate command handler to execute the business logic and saves the events to the aggregate store afterwards.

func AggregateCommandHandlerCallback

func AggregateCommandHandlerCallback(repository AggregateRepository, handler AggregateCommandHandlerFunc) commandbus.CommandHandler

Types

type Aggregate

type Aggregate interface {
	AggregateContext

	// AggregateName returns the type name of the aggregate.
	AggregateName() string

	// Apply applies an event to the aggregate by setting its values.
	Apply(Event) error
}

type AggregateBuilder

type AggregateBuilder func(aggregateId uuid.UUID) (Aggregate, error)

AggregateBuilder is the builder function to create new aggregate compositions. This could be used to introduce new strategies how to build a aggregate like the snapshot implementation

func DefaultAggregateBuilder

func DefaultAggregateBuilder(factory AggregateFactoryFunc) AggregateBuilder

func SnapshotAggregateBuilder

func SnapshotAggregateBuilder(factory AggregateFactoryFunc, snapshotStore SnapshotStore) AggregateBuilder

SnapshotAggregateBuilder

type AggregateCommand

type AggregateCommand interface {
	Command
	AggregateName() string
}

type AggregateCommandHandlerFunc

type AggregateCommandHandlerFunc func(aggregate Aggregate, command Command) error

type AggregateComposition

type AggregateComposition interface {
	Context() AggregateContext
	Aggregate() Aggregate
}

type AggregateContext

type AggregateContext interface {
	// AggregateId returns the id of the aggregate.
	AggregateId() uuid.UUID

	// Version returns the version of the aggregate.
	Version() int

	// StoreEvent stores an event as uncommitted event.
	StoreEvent(Event)
	// contains filtered or unexported methods
}

func NewAggregateContext

func NewAggregateContext(id uuid.UUID, version int) AggregateContext

type AggregateFactory

type AggregateFactory interface {
	//MakeAggregate will return a clean Aggregate based on the type provided
	MakeAggregate(string, AggregateContext) Aggregate
}

AggregateFactory returns aggregate instances of a specified type with the AggregateId set to the uuid provided.

type AggregateFactoryFunc

type AggregateFactoryFunc func(AggregateContext) Aggregate

type AggregateHandlesCommands

type AggregateHandlesCommands interface {
	Aggregate

	// HandleCommand handles a command and stores events.
	HandleCommand(Command) error
}

AggregateHandlesCommands indicates a aggregate can directly handle a command

type AggregateRepository

type AggregateRepository interface {
	//Loads an aggregate of the given type and ID
	Load(aggregateId uuid.UUID) (Aggregate, error)

	//Saves the aggregate.
	Save(aggregate Aggregate) error
}

AggregateRepository is the interface that a specific aggregate repositories should implement.

func NewAggregateRepository

func NewAggregateRepository(
	eventStore EventStore,
	aggregateBuilder AggregateBuilder,
	abstractEventFactory EventFactory,
	publishEventHooks ...PublishEventFunc) AggregateRepository

NewAggregateRepository is the constructor of the repository

publishEventHooks get called when a new event is successfully persisted to the eventstore. This is very useful to wire it to an eventbus for publishing the event to other listeners (projections)

func NewSnapshotAggregateRepository

func NewSnapshotAggregateRepository(
	eventStore EventStore,
	snapshotStore SnapshotStore,
	differenceOffset int,
	aggregateBuilder AggregateBuilder,
	abstractEventFactory EventFactory,
	publishEventHooks ...PublishEventFunc) AggregateRepository

NewSnapshotAggregateRepository is the constructor of the aggregate repository with snapshot functionality A snapshot will be created when the differenceOffset between the snapshot version and the current version is equal or greater than the `differenceOffset`

When the differenceOffset is set to 10 than: - aggregate version 7 (snapshot version 0) will not create a snapshot - aggregate version 10 (snapshot version 0) will create a snapshot for version 10 - aggregate version 13 (snapshot version 0) will create a snapshot for version 13 - aggregate version 21 (snapshot version 13) will not create a snapshot - aggregate version 54 (snapshot version 13) will create a snapshot for version 54

type AggregateRepositoryManager

type AggregateRepositoryManager interface {
	//RepositoryFor will return the repository for the specific named aggregate
	RepositoryFor(aggregateName string) AggregateRepository
}

AggregateRepositoryManager is the managing interface who provide aggregate repository access

type CallbackAggregateFactory

type CallbackAggregateFactory struct {
	// contains filtered or unexported fields
}

CallbackAggregateFactory is an implementation of the AggregateFactory interface that supports registration of delegate/callback functions to perform aggregate instantiation.

func NewCallbackAggregateFactory

func NewCallbackAggregateFactory() *CallbackAggregateFactory

NewCallbackAggregateFactory creates a new CallbackAggregateFactory

func (*CallbackAggregateFactory) MakeAggregate

func (t *CallbackAggregateFactory) MakeAggregate(typeName string, ctx AggregateContext) Aggregate

MakeAggregate calls the callback for the specified type and returns the result.

func (*CallbackAggregateFactory) RegisterCallback

func (t *CallbackAggregateFactory) RegisterCallback(callback AggregateFactoryFunc) error

RegisterCallback is used to register a new function for instantiation of an aggregate instance.

type CallbackEventFactory

type CallbackEventFactory struct {
	// contains filtered or unexported fields
}

CallbackEventFactory uses callback/delegate functions to instantiate event instances given the name of the event type as a string.

func NewCallbackEventFactory

func NewCallbackEventFactory() *CallbackEventFactory

NewCallbackEventFactory constructs a new CallbackEventFactory

func (*CallbackEventFactory) MakeEvent

func (t *CallbackEventFactory) MakeEvent(typeName string, aggregateId uuid.UUID, version int, occurredAt time.Time) Event

MakeEvent returns an event instance given an event type as a string.

An appropriate delegate must be registered for the event type. If an appropriate delegate is not registered, the method will return nil.

func (*CallbackEventFactory) RegisterCallback

func (t *CallbackEventFactory) RegisterCallback(callback EventFactoryFunc) error

RegisterCallback registers a delegate that will return an event instance given an event type name as a string.

type Command

type Command interface {
	commandbus.Command
	AggregateId() uuid.UUID
}

type DomainAggregateRepository

type DomainAggregateRepository struct {
	// contains filtered or unexported fields
}

func NewCommonDomainRepository

func NewCommonDomainRepository(eventStore EventStore, eventFactory EventFactory, aggregateFactory AggregateFactory) *DomainAggregateRepository

NewRepository instantiates a new repository resolver who accepts a stream resolver

func (*DomainAggregateRepository) Load

func (r *DomainAggregateRepository) Load(aggregateType string, aggregateId uuid.UUID) (Aggregate, error)

Loads an aggregate of the given type and ID

func (*DomainAggregateRepository) RepositoryFor

func (r *DomainAggregateRepository) RepositoryFor(aggregateName string) AggregateRepository

func (*DomainAggregateRepository) Save

func (r *DomainAggregateRepository) Save(aggregate Aggregate) error

Save will save all the events to the event store.

func (*DomainAggregateRepository) SetEventBus

func (r *DomainAggregateRepository) SetEventBus(eventBus eventbus.EventBus)

SetEventBus will set which eventbus for publishing new events if set to nil no events will be published during a save action

type ErrorAggregateCannotHandleCommand

type ErrorAggregateCannotHandleCommand string

func (ErrorAggregateCannotHandleCommand) Error

type ErrorAggregateFactoryAlreadyRegistered

type ErrorAggregateFactoryAlreadyRegistered string

func (ErrorAggregateFactoryAlreadyRegistered) Error

type ErrorAggregateNotFound

type ErrorAggregateNotFound string

func (ErrorAggregateNotFound) Error

func (e ErrorAggregateNotFound) Error() string

type ErrorEventFactoryAlreadyRegistered

type ErrorEventFactoryAlreadyRegistered string

func (ErrorEventFactoryAlreadyRegistered) Error

type ErrorEventFactoryNotReturningPointer

type ErrorEventFactoryNotReturningPointer string

func (ErrorEventFactoryNotReturningPointer) Error

type ErrorNotAnAggregateCommand

type ErrorNotAnAggregateCommand string

func (ErrorNotAnAggregateCommand) Error

type Event

type Event interface {
	eventbus.Event
	EventBase
}

Event is the interface of an event what an aggregate needs

type EventBase

type EventBase interface {
	AggregateId() uuid.UUID
	Version() int
	OccurredAt() time.Time
}

func NewEventBase

func NewEventBase(id uuid.UUID, version int, occurredAt time.Time) EventBase

NewEventBase constructor with plain version

func NewEventBaseFromAggregate

func NewEventBaseFromAggregate(aggregate AggregateContext) EventBase

NewEventBaseFromAggregate constructor will create a new eventbase based on the latest aggregate state

type EventFactory

type EventFactory interface {
	MakeEvent(string, uuid.UUID, int, time.Time) Event
}

EventFactory is the interface that an event store should implement. An event factory returns instances of an event given the event type as a string.

type EventFactoryFunc

type EventFactoryFunc func(uuid.UUID, int, time.Time) Event

EventFactoryFunc should create an Event and return the pointer to the instance.

type EventStore

type EventStore interface {
	LoadStream(aggregateName string, aggregateId uuid.UUID, version int) (EventStream, error)
	WriteEvent(string, ...Event) error
}

type EventStream

type EventStream interface {
	EventName() string
	AggregateId() uuid.UUID
	Version() int
	OccurredAt() time.Time

	Next() bool
	Error() error
	Scan(Event) error
}

type PublishEventFunc

type PublishEventFunc func(event Event)

PublishEventFunc is a callback function that is getting called once the eventstore has successfully stored the new events gereated by the aggregate

type SnapshotStore

type SnapshotStore interface {
	Load(aggregateId uuid.UUID, aggregate Aggregate) (int, error)
	Write(aggregate Aggregate) error
}

type Validate

type Validate interface {
	Validate() error
}

Validate is the interface an aggregate command can implement to perform validation prior to executing domain logic

Directories

Path Synopsis
_example

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL