cqrs

v1.0.0
Published: Jan 20, 2019 License: Apache-2.0 Imports: 9 Imported by: 6

README

CQRS framework in Go

Project Summary

The package provides a framework for quickly implementing a CQRS-style application. The framework attempts to provide helpful functions to facilitate:

  • Event Sourcing
  • Command issuing and processing
  • Event publishing
  • Read model generation from published events

Example code

  • Example test scenario (inmemory)
  • Example test scenario (couchbase, rabbitmq)
  • Example CQRS scaleout/concurrent test

Test Scenario

The example test scenario is a simple bank account that uses event sourcing to track a customer's balance and login password.

There are two main areas of concern at the application level: the write model and the read model. The read model aims to facilitate fast reads (read model projections). The write model is where the business logic gets executed; it asynchronously notifies the read models.

Write model - Using Event Sourcing

Account
type Account struct {
  cqrs.EventSourceBased

  FirstName    string
  LastName     string
  EmailAddress string
  PasswordHash []byte
  Balance      float64
}

To compensate for Go's lack of inheritance, a combination of type embedding and a calling convention is used.

func NewAccount(firstName string, lastName string, emailAddress string, passwordHash []byte, initialBalance float64) *Account {
  account := new(Account)
  account.EventSourceBased = cqrs.NewEventSourceBased(account)

  event := AccountCreatedEvent{firstName, lastName, emailAddress, passwordHash, initialBalance}
  account.Update(event)
  return account
}

The 'attached' Update function called above provides the infrastructure for routing events to event handlers. A function whose name is 'Handle' followed by the name of the expected event will be called by the infrastructure.

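func (account *Account) HandleAccountCreatedEvent(event AccountCreatedEvent) {
  account.EmailAddress = event.EmailAddress
  account.FirstName = event.FirstName
  account.LastName = event.LastName
  account.PasswordHash = event.PasswordHash
  // Carry the opening balance over so the write model starts in sync with the event
  account.Balance = event.InitialBalance
}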

The above code results in an account object being created with a single pending event, namely AccountCreatedEvent. Pending events are persisted when the account is saved to an event sourcing repository. If the repository was created with an event publisher, events saved for the purposes of event sourcing will also be published.

persistance := cqrs.NewInMemoryEventStreamRepository()
bus := cqrs.NewInMemoryEventBus()
repository := cqrs.NewRepositoryWithPublisher(persistance, bus)
...
repository.Save(account)

Account Events
type AccountCreatedEvent struct {
  FirstName      string
  LastName       string
  EmailAddress   string
  PasswordHash   []byte
  InitialBalance float64
}

type EmailAddressChangedEvent struct {
  PreviousEmailAddress string
  NewEmailAddress      string
}

type PasswordChangedEvent struct {
  NewPasswordHash []byte
}

type AccountCreditedEvent struct {
  Amount float64
}

type AccountDebitedEvent struct {
  Amount float64
}

Event sourcing events are raised using the embedded Update function. These events will eventually be published to the read models indirectly via an event bus.

func (account *Account) ChangePassword(newPassword string) error {
  if len(newPassword) < 1 {
    return errors.New("invalid newPassword length")
  }

  hashedPassword, err := GetHashForPassword(newPassword)
  if err != nil {
    // Report the failure to the caller instead of panicking
    return err
  }

  account.Update(PasswordChangedEvent{hashedPassword})

  return nil
}

func (account *Account) HandlePasswordChangedEvent(event PasswordChangedEvent) {
  account.PasswordHash = event.NewPasswordHash
}

Again, the calling convention routes our PasswordChangedEvent to the corresponding HandlePasswordChangedEvent method.
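The naming convention can be pictured with a small reflection sketch. This is not the library's implementation: routeEvent is a hypothetical helper, and the Account below is a stand-in without the cqrs embedding. The only assumption is that a handler is located by prefixing the event's type name with Handle.

```go
package main

import (
	"fmt"
	"reflect"
)

// PasswordChangedEvent mirrors the event type used in the README.
type PasswordChangedEvent struct {
	NewPasswordHash []byte
}

// Account is a stand-in aggregate with no dependency on the cqrs package.
type Account struct {
	PasswordHash []byte
}

// HandlePasswordChangedEvent follows the "Handle" + event type name convention.
func (a *Account) HandlePasswordChangedEvent(e PasswordChangedEvent) {
	a.PasswordHash = e.NewPasswordHash
}

// routeEvent (hypothetical) looks up Handle<EventTypeName> on source via
// reflection and calls it. It reports whether a matching handler was found.
func routeEvent(source interface{}, event interface{}) bool {
	name := "Handle" + reflect.TypeOf(event).Name()
	method := reflect.ValueOf(source).MethodByName(name)
	if !method.IsValid() {
		return false
	}
	method.Call([]reflect.Value{reflect.ValueOf(event)})
	return true
}

func main() {
	account := &Account{}
	handled := routeEvent(account, PasswordChangedEvent{NewPasswordHash: []byte("hash")})
	fmt.Println(handled, string(account.PasswordHash))
}
```

Events without a matching method are simply reported as unhandled here; how the real infrastructure treats that case is not stated in this README.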

Read Model

Accounts projection
type ReadModelAccounts struct {
  Accounts map[string]*AccountReadModel
}

type AccountReadModel struct {
  ID           string
  FirstName    string
  LastName     string
  EmailAddress string
  Balance      float64
}

Users projection
type UsersModel struct {
  Users    map[string]*User
}

type User struct {
  ID           string
  FirstName    string
  LastName     string
  EmailAddress string
  PasswordHash []byte
}

Infrastructure

There are a number of key elements to the CQRS infrastructure.

  • Event sourcing repository (a repository for event sourcing based business objects)
  • Event publisher (publishes new events to an event bus)
  • Event handler (dispatches received events to call handlers)
  • Command publisher (publishes new commands to a command bus)
  • Command handler (dispatches received commands to call handlers)

Event sourcing and integration events

Nested packages within this repository show example implementations using Couchbase Server and RabbitMQ. The core library includes in-memory implementations for testing and quick prototyping

persistance := cqrs.NewInMemoryEventStreamRepository()
bus := cqrs.NewInMemoryEventBus()
repository := cqrs.NewRepositoryWithPublisher(persistance, bus)

With the infrastructure implementations instantiated, a stock event dispatcher is provided to route received events to call handlers.

readModel := NewReadModelAccounts()
usersModel := NewUsersModel()

eventDispatcher := cqrs.NewVersionedEventDispatchManager(bus)
eventDispatcher.RegisterEventHandler(AccountCreatedEvent{}, func(event cqrs.VersionedEvent) error {
  readModel.UpdateViewModel([]cqrs.VersionedEvent{event})
  usersModel.UpdateViewModel([]cqrs.VersionedEvent{event})
  return nil
})

We can also register a global handler to be called for all events. This is useful for logging system-wide events, and when our read models are smart enough to filter out irrelevant events.

integrationEventsLog := cqrs.NewInMemoryEventStreamRepository()
eventDispatcher.RegisterGlobalHandler(func(event cqrs.VersionedEvent) error {
  integrationEventsLog.SaveIntegrationEvent(event)
  readModel.UpdateViewModel([]cqrs.VersionedEvent{event})
  usersModel.UpdateViewModel([]cqrs.VersionedEvent{event})
  return nil
})

Within your read models, the idea is that you update your prepared read model based on the incoming event notifications.
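As an illustration, a read model update could be written as a type switch over the event body. This is a sketch under assumptions, not the example project's code: VersionedEvent is a trimmed local stand-in for cqrs.VersionedEvent, the event structs carry only the fields the projection reads, and the body of UpdateViewModel is hypothetical.

```go
package main

import "fmt"

// VersionedEvent is a trimmed stand-in for cqrs.VersionedEvent (assumption:
// only SourceID and Event are needed by this projection).
type VersionedEvent struct {
	SourceID string
	Event    interface{}
}

// Event types from the README, reduced to the fields this projection reads.
type AccountCreatedEvent struct {
	FirstName      string
	LastName       string
	EmailAddress   string
	InitialBalance float64
}

type AccountCreditedEvent struct{ Amount float64 }

type AccountDebitedEvent struct{ Amount float64 }

type AccountReadModel struct {
	ID           string
	FirstName    string
	LastName     string
	EmailAddress string
	Balance      float64
}

type ReadModelAccounts struct {
	Accounts map[string]*AccountReadModel
}

func NewReadModelAccounts() *ReadModelAccounts {
	return &ReadModelAccounts{Accounts: make(map[string]*AccountReadModel)}
}

// UpdateViewModel applies each event to the projection and skips event types
// it does not care about, so it can also be fed from a global handler.
func (m *ReadModelAccounts) UpdateViewModel(events []VersionedEvent) {
	for _, e := range events {
		switch ev := e.Event.(type) {
		case AccountCreatedEvent:
			m.Accounts[e.SourceID] = &AccountReadModel{
				ID:           e.SourceID,
				FirstName:    ev.FirstName,
				LastName:     ev.LastName,
				EmailAddress: ev.EmailAddress,
				Balance:      ev.InitialBalance,
			}
		case AccountCreditedEvent:
			if acc, ok := m.Accounts[e.SourceID]; ok {
				acc.Balance += ev.Amount
			}
		case AccountDebitedEvent:
			if acc, ok := m.Accounts[e.SourceID]; ok {
				acc.Balance -= ev.Amount
			}
		}
	}
}

func main() {
	model := NewReadModelAccounts()
	model.UpdateViewModel([]VersionedEvent{
		{SourceID: "a1", Event: AccountCreatedEvent{FirstName: "Ada", InitialBalance: 100}},
		{SourceID: "a1", Event: AccountCreditedEvent{Amount: 50}},
		{SourceID: "a1", Event: AccountDebitedEvent{Amount: 30}},
	})
	fmt.Println(model.Accounts["a1"].Balance)
}
```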

Commands

Commands are processed by command handlers similar to event handlers. We can make direct changes to our write model and indirect changes to our read models by correctly processing commands and then raising integration events upon command completion.

commandBus := cqrs.NewInMemoryCommandBus()
commandDispatcher := cqrs.NewCommandDispatchManager(commandBus)
RegisterCommandHandlers(commandDispatcher, repository)

Commands can be issued using a command bus. Typically a command is a simple struct. The application layer command struct is then wrapped within a cqrs.Command using the cqrs.CreateCommand helper function

changePasswordCommand := cqrs.CreateCommand(
  ChangePasswordCommand{accountID, "$ThisIsANOTHERPassword"})
commandBus.PublishCommands([]cqrs.Command{changePasswordCommand})

The corresponding command handler for the ChangePassword command plays the role of a DDD aggregate root: it is responsible for the consistency and lifetime of aggregates and entities within the system.

commandDispatcher.RegisterCommandHandler(ChangePasswordCommand{}, func(command cqrs.Command) error {
  changePasswordCommand := command.Body.(ChangePasswordCommand)
  // Load account from storage
  account, err := NewAccountFromHistory(changePasswordCommand.AccountID, repository)
  if err != nil {
    return err
  }

  // Reject the command if the business rule fails; nothing is persisted
  if err := account.ChangePassword(changePasswordCommand.NewPassword); err != nil {
    return err
  }

  // Persist new events
  repository.Save(account)
  return nil
})

Because the read models only become consistent eventually, the tests check at the end whether everything is in sync.

if account.EmailAddress != lastEmailAddress {
  t.Fatal("Expected emailaddress to be ", lastEmailAddress)
}

if account.Balance != readModel.Accounts[accountID].Balance {
  t.Fatal("Expected readmodel to be synced with write model")
}

Documentation

Overview

Package cqrs provides a CQRS and Event Sourcing framework written in Go, influenced by the CQRS Journey guide.

For a full guide visit http://gitlab.brainloop.com/pkg/cqrs

import "gitlab.brainloop.com/pkg/cqrs"

func NewAccount(firstName string, lastName string, emailAddress string, passwordHash []byte, initialBalance float64) *Account {
  account := new(Account)
  account.EventSourceBased = cqrs.NewEventSourceBased(account)

  event := AccountCreatedEvent{firstName, lastName, emailAddress, passwordHash, initialBalance}
  account.Update(event)
  return account
}

Constants

const CQRSErrorEventType = "cqrs.ErrorEvent"

CQRSErrorEventType ...

Variables

var ErrConcurrencyWhenSavingEvents = errors.New("concurrency error saving event")

ErrConcurrencyWhenSavingEvents is raised when a concurrency error has occurred when saving events

var ErrNonePendingWhenSavingEvents = errors.New("no events pending error saving event")

ErrNonePendingWhenSavingEvents is raised when a save is issued but no events are pending for the eventsourced entity.
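To show where two sentinel errors like these typically arise, here is a minimal sketch of an optimistic version check on an in-memory stream. How the package itself detects conflicts is not documented here, so the stream type, the save function and the rule "the first new event must carry the next version" are all assumptions made for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for the package's sentinel errors.
var (
	errConcurrency = errors.New("concurrency error saving event")
	errNonePending = errors.New("no events pending error saving event")
)

// event is a minimal stand-in carrying only a version number.
type event struct{ Version int }

// stream is a hypothetical in-memory event stream keyed by aggregate ID.
type stream map[string][]event

// save appends newEvents only if they continue the stored version sequence.
func (s stream) save(id string, newEvents []event) error {
	if len(newEvents) == 0 {
		return errNonePending
	}
	if newEvents[0].Version != len(s[id])+1 {
		return errConcurrency
	}
	s[id] = append(s[id], newEvents...)
	return nil
}

func main() {
	s := stream{}
	fmt.Println(s.save("acc-1", []event{{Version: 1}}))

	// A second writer that also started from version 0 now conflicts.
	err := s.save("acc-1", []event{{Version: 1}})
	fmt.Println(errors.Is(err, errConcurrency))

	// Saving with nothing pending is reported separately.
	fmt.Println(errors.Is(s.save("acc-1", nil), errNonePending))
}
```

Callers can compare against the sentinel values with errors.Is and decide whether to reload the aggregate and retry.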

var PackageLogger func() Logger = defaultLogger()

Functions

func DeliverCQRSError

func DeliverCQRSError(correlationID string, err error, repo EventSourcingRepository)

DeliverCQRSError will deliver a CQRS error

func NewUUIDString added in v1.0.0

func NewUUIDString() string

NewUUIDString returns a new UUID

Types

type ByCreated

type ByCreated []VersionedEvent

ByCreated is an alias for sorting VersionedEvents by the Created field

func (ByCreated) Len

func (c ByCreated) Len() int

func (ByCreated) Less

func (c ByCreated) Less(i, j int) bool

func (ByCreated) Swap

func (c ByCreated) Swap(i, j int)
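Len, Less and Swap together satisfy sort.Interface, so a slice of events can be handed to sort.Sort. The sketch below re-declares a trimmed VersionedEvent and a ByCreated type locally; the Less body (oldest first, using time.Time.Before) is an assumption about the ordering, and sortedIDs and sampleEvents are hypothetical helpers.

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

// VersionedEvent is a trimmed local stand-in with only the fields used here.
type VersionedEvent struct {
	ID      string
	Created time.Time
}

// ByCreated orders events by their Created timestamp.
type ByCreated []VersionedEvent

func (c ByCreated) Len() int           { return len(c) }
func (c ByCreated) Less(i, j int) bool { return c[i].Created.Before(c[j].Created) }
func (c ByCreated) Swap(i, j int)      { c[i], c[j] = c[j], c[i] }

// sortedIDs sorts events oldest first, in place, and returns their IDs.
func sortedIDs(events []VersionedEvent) []string {
	sort.Sort(ByCreated(events))
	ids := make([]string, 0, len(events))
	for _, e := range events {
		ids = append(ids, e.ID)
	}
	return ids
}

// sampleEvents returns three events in a deliberately shuffled order.
func sampleEvents() []VersionedEvent {
	base := time.Date(2019, time.January, 20, 12, 0, 0, 0, time.UTC)
	return []VersionedEvent{
		{ID: "third", Created: base.Add(2 * time.Minute)},
		{ID: "first", Created: base},
		{ID: "second", Created: base.Add(time.Minute)},
	}
}

func main() {
	fmt.Println(sortedIDs(sampleEvents()))
}
```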

type Command

type Command struct {
	MessageID     string    `json:"messageID"`
	CorrelationID string    `json:"correlationID"`
	CommandType   string    `json:"commandType"`
	Created       time.Time `json:"time"`
	Body          interface{}
}

Command represents an actor intention to alter the state of the system

func CreateCommand

func CreateCommand(body interface{}) Command

CreateCommand is a helper for creating a new command object with populated default properties

func CreateCommandWithCorrelationID

func CreateCommandWithCorrelationID(body interface{}, correlationID string) Command

CreateCommandWithCorrelationID is a helper for creating a new command object with populated default properties

type CommandBus added in v1.0.0

type CommandBus interface {
	CommandReceiver
	CommandPublisher
}

CommandBus ...

type CommandDispatchManager

type CommandDispatchManager struct {
	// contains filtered or unexported fields
}

CommandDispatchManager is responsible for coordinating receiving messages from command receivers and dispatching them to the command dispatcher.

func NewCommandDispatchManager

func NewCommandDispatchManager(receiver CommandReceiver, registry TypeRegistry) *CommandDispatchManager

NewCommandDispatchManager is a constructor for the CommandDispatchManager

func (*CommandDispatchManager) CommandDispatcher added in v1.0.0

func (m *CommandDispatchManager) CommandDispatcher() CommandDispatcher

CommandDispatcher returns the internal command dispatcher

func (*CommandDispatchManager) Listen

func (m *CommandDispatchManager) Listen(stop <-chan bool, exclusive bool, listenerCount int) error

Listen starts a listen loop processing channels related to new incoming commands, errors and stop listening requests
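A listen loop of this kind is usually a select over several channels. The following sketch is not the manager's implementation: listen and runDemo are hypothetical, messages are plain strings, and the exclusive and listenerCount parameters are left out because their semantics are not described here.

```go
package main

import "fmt"

// listen is a hypothetical loop: it handles messages until an error arrives
// or a stop request is received.
func listen(incoming <-chan string, errs <-chan error, stop <-chan bool, handle func(string)) error {
	for {
		select {
		case msg := <-incoming:
			handle(msg)
		case err := <-errs:
			return err
		case <-stop:
			return nil
		}
	}
}

// runDemo feeds two messages through listen and then asks it to stop.
func runDemo() ([]string, error) {
	incoming := make(chan string)
	errs := make(chan error)
	stop := make(chan bool)
	done := make(chan error, 1)

	var handled []string
	go func() {
		done <- listen(incoming, errs, stop, func(msg string) {
			handled = append(handled, msg)
		})
	}()

	incoming <- "first"
	incoming <- "second"
	// The stop channel is unbuffered, so this send is accepted only once the
	// loop has finished handling the previous message.
	stop <- true
	err := <-done
	return handled, err
}

func main() {
	handled, err := runDemo()
	fmt.Println(handled, err)
}
```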

func (*CommandDispatchManager) RegisterCommandHandler

func (m *CommandDispatchManager) RegisterCommandHandler(command interface{}, handler CommandHandler)

RegisterCommandHandler allows a caller to register a command handler given a command of the specified type being received

func (*CommandDispatchManager) RegisterGlobalHandler

func (m *CommandDispatchManager) RegisterGlobalHandler(handler CommandHandler)

RegisterGlobalHandler allows a caller to register a wildcard command handler that is called on any command received

type CommandDispatcher

type CommandDispatcher interface {
	DispatchCommand(Command) error
	RegisterCommandHandler(event interface{}, handler CommandHandler)
	RegisterGlobalHandler(handler CommandHandler)
}

CommandDispatcher is responsible for routing commands from the command manager to call handlers responsible for processing received commands

type CommandHandler

type CommandHandler func(Command) error

CommandHandler is a function that takes a command

type CommandPublisher

type CommandPublisher interface {
	PublishCommands([]Command) error
}

CommandPublisher is responsible for publishing commands

type CommandReceiver

type CommandReceiver interface {
	ReceiveCommands(CommandReceiverOptions) error
}

CommandReceiver is responsible for receiving commands

type CommandReceiverOptions

type CommandReceiverOptions struct {
	TypeRegistry   TypeRegistry
	Close          chan chan error
	Error          chan error
	ReceiveCommand CommandHandler
	Exclusive      bool
	ListenerCount  int
}

CommandReceiverOptions is an initialization structure used to communicate to and from a command receiver goroutine

type CommandTransactedAccept

type CommandTransactedAccept struct {
	Command               Command
	ProcessedSuccessfully chan bool
}

CommandTransactedAccept is the message routed from a command receiver to the command manager. Sometimes command receivers designed with reliable delivery require acknowledgements after a message has been received. The success channel here allows for such acknowledgements
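The acknowledgement flow can be sketched with plain channels. The types below are local copies of the documented structs (Command is trimmed), while manage, deliver and rejectEmpty are hypothetical helpers standing in for the command manager, a reliable receiver and a command handler.

```go
package main

import (
	"errors"
	"fmt"
)

// Command is a trimmed local copy of the documented struct.
type Command struct {
	CommandType string
	Body        interface{}
}

// CommandTransactedAccept pairs a command with its acknowledgement channel.
type CommandTransactedAccept struct {
	Command               Command
	ProcessedSuccessfully chan bool
}

var errRejected = errors.New("command rejected")

// manage plays the command manager: it runs the handler for every accepted
// command and reports the outcome on that command's acknowledgement channel.
func manage(accepts <-chan CommandTransactedAccept, handler func(Command) error) {
	for accept := range accepts {
		accept.ProcessedSuccessfully <- handler(accept.Command) == nil
	}
}

// deliver plays a reliable receiver: it hands the command over and waits for
// the outcome before it would acknowledge the message to its broker.
func deliver(accepts chan<- CommandTransactedAccept, cmd Command) bool {
	accept := CommandTransactedAccept{Command: cmd, ProcessedSuccessfully: make(chan bool)}
	accepts <- accept
	return <-accept.ProcessedSuccessfully
}

// rejectEmpty is a sample handler that fails for commands without a body.
func rejectEmpty(cmd Command) error {
	if cmd.Body == nil {
		return errRejected
	}
	return nil
}

func main() {
	accepts := make(chan CommandTransactedAccept)
	go manage(accepts, rejectEmpty)
	fmt.Println(deliver(accepts, Command{CommandType: "ChangePasswordCommand", Body: "secret"}))
	fmt.Println(deliver(accepts, Command{CommandType: "ChangePasswordCommand"}))
	close(accepts)
}
```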

type ErrorEvent added in v1.0.0

type ErrorEvent struct {
	Message string
}

ErrorEvent is a generic event raised within the CQRS framework

type EventBus added in v1.0.0

type EventBus interface {
	VersionedEventPublisher
	VersionedEventReceiver
}

EventBus ...

type EventSourceBased

type EventSourceBased struct {
	// contains filtered or unexported fields
}

EventSourceBased provides a base type for aggregate types that want basic helper functionality for event sourcing

func NewEventSourceBased

func NewEventSourceBased(source interface{}) EventSourceBased

NewEventSourceBased constructor

func NewEventSourceBasedWithID

func NewEventSourceBasedWithID(source interface{}, id string) EventSourceBased

NewEventSourceBasedWithID constructor

func (*EventSourceBased) CallEventHandler

func (s *EventSourceBased) CallEventHandler(event interface{})

CallEventHandler routes an event to an aggregate's event handler

func (*EventSourceBased) Events

func (s *EventSourceBased) Events() []interface{}

Events returns a slice of newly created events since last deserialization

func (*EventSourceBased) ID

func (s *EventSourceBased) ID() string

ID provides the aggregate's ID

func (*EventSourceBased) SetID

func (s *EventSourceBased) SetID(id string)

SetID sets the aggregate's ID

func (*EventSourceBased) SetSource added in v1.0.0

func (s *EventSourceBased) SetSource(source interface{})

SetSource ...

func (*EventSourceBased) SetVersion

func (s *EventSourceBased) SetVersion(version int)

SetVersion sets the aggregate's Version

func (*EventSourceBased) SuggestSaveSnapshot added in v1.0.0

func (s *EventSourceBased) SuggestSaveSnapshot()

SuggestSaveSnapshot records that the aggregate suggests a save of the snapshot upon the next save.

func (*EventSourceBased) Update

func (s *EventSourceBased) Update(versionedEvent interface{})

Update should be called to change the state of an aggregate type

func (*EventSourceBased) Version

func (s *EventSourceBased) Version() int

Version provides the aggregate's Version

func (*EventSourceBased) WantsToSaveSnapshot added in v1.0.0

func (s *EventSourceBased) WantsToSaveSnapshot() bool

WantsToSaveSnapshot returns whether the aggregate suggests to persist a snapshot upon the next save.

type EventSourced

type EventSourced interface {
	ID() string
	SetID(string)
	Version() int
	SetVersion(int)
	Events() []interface{}
	CallEventHandler(event interface{})
	SetSource(interface{})
	WantsToSaveSnapshot() bool
	SuggestSaveSnapshot()
}

EventSourced provides an interface for event sourced aggregate types

type EventSourcingRepository

type EventSourcingRepository interface {
	GetEventStreamRepository() EventStreamRepository
	GetTypeRegistry() TypeRegistry
	Save(EventSourced, string) ([]VersionedEvent, error)
	Get(string, EventSourced) error
	GetSnapshot(id string) (EventSourced, error)
}

EventSourcingRepository is a repository for event source based aggregates

func NewRepository

func NewRepository(eventStreamRepository EventStreamRepository, registry TypeRegistry) EventSourcingRepository

NewRepository constructs an EventSourcingRepository

func NewRepositoryWithPublisher

func NewRepositoryWithPublisher(eventStreamRepository EventStreamRepository, publisher VersionedEventPublisher, registry TypeRegistry) EventSourcingRepository

NewRepositoryWithPublisher constructs an EventSourcingRepository with a VersionedEventPublisher to dispatch events once persisted to the EventStreamRepository
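The "publish once persisted" ordering can be sketched as follows. Everything in this block is hypothetical and independent of the package: events are plain strings, eventStore and eventPublisher are reduced interfaces, and saveAndPublish only illustrates the order of the two steps.

```go
package main

import "fmt"

// eventStore and eventPublisher are reduced, hypothetical interfaces.
type eventStore interface {
	Save(id string, events []string) error
}

type eventPublisher interface {
	PublishEvents(events []string) error
}

// memoryStore keeps saved events per aggregate ID.
type memoryStore struct{ saved map[string][]string }

func (s *memoryStore) Save(id string, events []string) error {
	s.saved[id] = append(s.saved[id], events...)
	return nil
}

// memoryBus records everything that was published.
type memoryBus struct{ published []string }

func (b *memoryBus) PublishEvents(events []string) error {
	b.published = append(b.published, events...)
	return nil
}

// saveAndPublish persists first and publishes only after the save succeeded.
// A nil publisher means the events are stored but not published.
func saveAndPublish(store eventStore, publisher eventPublisher, id string, events []string) error {
	if err := store.Save(id, events); err != nil {
		return err
	}
	if publisher == nil {
		return nil
	}
	return publisher.PublishEvents(events)
}

func main() {
	store := &memoryStore{saved: map[string][]string{}}
	bus := &memoryBus{}
	err := saveAndPublish(store, bus, "acc-1", []string{"AccountCreatedEvent"})
	fmt.Println(err, store.saved["acc-1"], bus.published)
}
```

Persisting before publishing means a failed save never leaks events to the read models; a failed publish after a successful save still has to be handled by the caller.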

type EventStreamRepository

type EventStreamRepository interface {
	VersionedEventPublicationLogger
	Save(string, []VersionedEvent) error
	Get(string, int) ([]VersionedEvent, error)
	SaveSnapshot(EventSourced) error
	GetSnapshot(string) (EventSourced, error)
}

EventStreamRepository is a persistence layer for events associated with aggregates by ID

type HandlersCache

type HandlersCache map[reflect.Type]func(source interface{}, event interface{})

HandlersCache is a map of types to functions that will be used to route event sourcing events

type InMemoryCommandBus

type InMemoryCommandBus struct {
	// contains filtered or unexported fields
}

InMemoryCommandBus provides an in-memory implementation of the CommandPublisher and CommandReceiver interfaces

func NewInMemoryCommandBus

func NewInMemoryCommandBus() *InMemoryCommandBus

NewInMemoryCommandBus constructor

func (*InMemoryCommandBus) PublishCommands

func (bus *InMemoryCommandBus) PublishCommands(commands []Command) error

PublishCommands publishes Commands to the Command bus

func (*InMemoryCommandBus) ReceiveCommands

func (bus *InMemoryCommandBus) ReceiveCommands(options CommandReceiverOptions) error

ReceiveCommands starts a goroutine that monitors incoming commands and routes them to a receiver channel specified within the options

type InMemoryEventBus

type InMemoryEventBus struct {
	// contains filtered or unexported fields
}

InMemoryEventBus provides an in-memory implementation of the VersionedEventPublisher and VersionedEventReceiver interfaces

func NewInMemoryEventBus

func NewInMemoryEventBus() *InMemoryEventBus

NewInMemoryEventBus constructor

func (*InMemoryEventBus) PublishEvents

func (bus *InMemoryEventBus) PublishEvents(events []VersionedEvent) error

PublishEvents publishes events to the event bus

func (*InMemoryEventBus) ReceiveEvents

func (bus *InMemoryEventBus) ReceiveEvents(options VersionedEventReceiverOptions) error

ReceiveEvents starts a goroutine that monitors incoming events and routes them to a receiver channel specified within the options

type InMemoryEventStreamRepository

type InMemoryEventStreamRepository struct {
	// contains filtered or unexported fields
}

InMemoryEventStreamRepository provides an in-memory event sourcing repository

func NewInMemoryEventStreamRepository

func NewInMemoryEventStreamRepository() *InMemoryEventStreamRepository

NewInMemoryEventStreamRepository constructor

func (*InMemoryEventStreamRepository) AllIntegrationEventsEverPublished

func (r *InMemoryEventStreamRepository) AllIntegrationEventsEverPublished() ([]VersionedEvent, error)

AllIntegrationEventsEverPublished returns all events ever published

func (*InMemoryEventStreamRepository) Get

func (r *InMemoryEventStreamRepository) Get(id string, fromVersion int) ([]VersionedEvent, error)

Get retrieves events associated with an event sourced object by ID

func (*InMemoryEventStreamRepository) GetIntegrationEventsByCorrelationID

func (r *InMemoryEventStreamRepository) GetIntegrationEventsByCorrelationID(correlationID string) ([]VersionedEvent, error)

GetIntegrationEventsByCorrelationID returns all integration events with a matching correlationID

func (*InMemoryEventStreamRepository) GetSnapshot added in v1.0.0

GetSnapshot ...

func (*InMemoryEventStreamRepository) Save

func (r *InMemoryEventStreamRepository) Save(id string, newEvents []VersionedEvent) error

Save persists an event sourced object into the repository

func (*InMemoryEventStreamRepository) SaveIntegrationEvent

func (r *InMemoryEventStreamRepository) SaveIntegrationEvent(event VersionedEvent) error

SaveIntegrationEvent persists an integration event

func (*InMemoryEventStreamRepository) SaveSnapshot added in v1.0.0

func (r *InMemoryEventStreamRepository) SaveSnapshot(eventsourced EventSourced) error

SaveSnapshot ...

type Logger added in v1.0.0

type Logger interface {
	Debugf(v ...interface{})
	Println(v ...interface{})
	Printf(format string, v ...interface{})
}

type MapBasedCommandDispatcher

type MapBasedCommandDispatcher struct {
	// contains filtered or unexported fields
}

MapBasedCommandDispatcher is a simple implementation of the command dispatcher. It uses a map to register command handlers against command types

func NewMapBasedCommandDispatcher

func NewMapBasedCommandDispatcher() *MapBasedCommandDispatcher

NewMapBasedCommandDispatcher is a constructor for the MapBasedCommandDispatcher

func (*MapBasedCommandDispatcher) DispatchCommand

func (m *MapBasedCommandDispatcher) DispatchCommand(command Command) error

DispatchCommand executes all command handlers registered for the given command type

func (*MapBasedCommandDispatcher) RegisterCommandHandler

func (m *MapBasedCommandDispatcher) RegisterCommandHandler(command interface{}, handler CommandHandler)

RegisterCommandHandler allows a caller to register a command handler given a command of the specified type being received

func (*MapBasedCommandDispatcher) RegisterGlobalHandler

func (m *MapBasedCommandDispatcher) RegisterGlobalHandler(handler CommandHandler)

RegisterGlobalHandler allows a caller to register a wildcard command handler that is called on any command received
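A map-based dispatcher of this shape can be sketched in a few lines. The dispatcher type below is hypothetical and only mirrors the documented method names; keying the map by reflect.Type, calling typed handlers before global ones, and stopping at the first error are assumptions.

```go
package main

import (
	"fmt"
	"reflect"
)

// Command and CommandHandler are trimmed local copies of the documented types.
type Command struct{ Body interface{} }

type CommandHandler func(Command) error

// ChangePasswordCommand is the sample command body from the README.
type ChangePasswordCommand struct {
	AccountID   string
	NewPassword string
}

// dispatcher (hypothetical) maps the type of a command body to its handlers.
type dispatcher struct {
	handlers map[reflect.Type][]CommandHandler
	global   []CommandHandler
}

func newDispatcher() *dispatcher {
	return &dispatcher{handlers: make(map[reflect.Type][]CommandHandler)}
}

// register associates a handler with the type of the given sample value.
func (d *dispatcher) register(sample interface{}, h CommandHandler) {
	t := reflect.TypeOf(sample)
	d.handlers[t] = append(d.handlers[t], h)
}

// registerGlobal adds a handler that sees every command.
func (d *dispatcher) registerGlobal(h CommandHandler) {
	d.global = append(d.global, h)
}

// dispatch calls the typed handlers, then the global ones, stopping at the
// first error.
func (d *dispatcher) dispatch(c Command) error {
	for _, h := range d.handlers[reflect.TypeOf(c.Body)] {
		if err := h(c); err != nil {
			return err
		}
	}
	for _, h := range d.global {
		if err := h(c); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	d := newDispatcher()
	d.register(ChangePasswordCommand{}, func(c Command) error {
		fmt.Println("typed handler:", c.Body.(ChangePasswordCommand).AccountID)
		return nil
	})
	d.registerGlobal(func(c Command) error {
		fmt.Printf("global handler: %T\n", c.Body)
		return nil
	})
	if err := d.dispatch(Command{Body: ChangePasswordCommand{AccountID: "acc-1"}}); err != nil {
		fmt.Println("dispatch failed:", err)
	}
}
```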

type MapBasedVersionedEventDispatcher

type MapBasedVersionedEventDispatcher struct {
	// contains filtered or unexported fields
}

MapBasedVersionedEventDispatcher is a simple implementation of the versioned event dispatcher. It uses a map to register event handlers against event types

func NewVersionedEventDispatcher

func NewVersionedEventDispatcher() *MapBasedVersionedEventDispatcher

NewVersionedEventDispatcher is a constructor for the MapBasedVersionedEventDispatcher

func (*MapBasedVersionedEventDispatcher) DispatchEvent

func (m *MapBasedVersionedEventDispatcher) DispatchEvent(event VersionedEvent) error

DispatchEvent executes all event handlers registered for the given event type

func (*MapBasedVersionedEventDispatcher) RegisterEventHandler

func (m *MapBasedVersionedEventDispatcher) RegisterEventHandler(event interface{}, handler VersionedEventHandler)

RegisterEventHandler allows a caller to register an event handler given an event of the specified type being received

func (*MapBasedVersionedEventDispatcher) RegisterGlobalHandler

func (m *MapBasedVersionedEventDispatcher) RegisterGlobalHandler(handler VersionedEventHandler)

RegisterGlobalHandler allows a caller to register a wildcard event handler that is called on any event received

type TypeCache

type TypeCache map[string]reflect.Type

TypeCache is a map of strings to reflect.Type structures

type TypeRegistry

type TypeRegistry interface {
	GetHandlers(interface{}) HandlersCache
	GetTypeByName(string) (reflect.Type, bool)
	RegisterAggregate(aggregate interface{}, events ...interface{})
	RegisterEvents(events ...interface{})
	RegisterType(interface{})
}

TypeRegistry provides a helper registry for mapping event types and handlers after performing JSON serialization

func NewTypeRegistry

func NewTypeRegistry() TypeRegistry

NewTypeRegistry constructs a new TypeRegistry
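The reason a registry of this kind is useful: once an event has been serialized to JSON, only its type name is left to say which Go struct it should be decoded into. The sketch below shows that lookup with a local TypeCache. The register, typeName and decode helpers are hypothetical, and using the package-qualified type name as the key is an assumption.

```go
package main

import (
	"encoding/json"
	"fmt"
	"reflect"
)

// TypeCache maps a type name to its reflect.Type, as in the package.
type TypeCache map[string]reflect.Type

// AccountCreditedEvent is the sample event from the README.
type AccountCreditedEvent struct{ Amount float64 }

// typeName is the key stored alongside the JSON (assumption: the
// package-qualified Go type name, e.g. "main.AccountCreditedEvent").
func typeName(sample interface{}) string {
	return reflect.TypeOf(sample).String()
}

// register records the type of sample under its name.
func register(cache TypeCache, sample interface{}) {
	cache[typeName(sample)] = reflect.TypeOf(sample)
}

// decode allocates a value of the registered type and unmarshals raw into it.
func decode(cache TypeCache, name string, raw []byte) (interface{}, error) {
	t, ok := cache[name]
	if !ok {
		return nil, fmt.Errorf("type %q is not registered", name)
	}
	ptr := reflect.New(t)
	if err := json.Unmarshal(raw, ptr.Interface()); err != nil {
		return nil, err
	}
	return ptr.Elem().Interface(), nil
}

func main() {
	cache := TypeCache{}
	register(cache, AccountCreditedEvent{})

	name := typeName(AccountCreditedEvent{})
	event, err := decode(cache, name, []byte(`{"Amount":12.5}`))
	fmt.Printf("%#v %v\n", event, err)
}
```

Because decode returns the concrete struct value rather than a generic map, the result can be passed straight to a type switch or a reflection-based handler lookup.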

type VersionedEvent

type VersionedEvent struct {
	ID            string    `json:"id"`
	CorrelationID string    `json:"correlationID"`
	SourceID      string    `json:"sourceID"`
	Actor         string    `json:"actor"`
	OnBehalfOf    string    `json:"onbehalfof"`
	Version       int       `json:"version"`
	EventType     string    `json:"eventType"`
	Created       time.Time `json:"time"`
	Event         interface{}
}

VersionedEvent represents an event in the past for an aggregate

type VersionedEventDispatchManager

type VersionedEventDispatchManager struct {
	// contains filtered or unexported fields
}

VersionedEventDispatchManager is responsible for coordinating receiving messages from event receivers and dispatching them to the event dispatcher.

func NewVersionedEventDispatchManager

func NewVersionedEventDispatchManager(receiver VersionedEventReceiver, registry TypeRegistry) *VersionedEventDispatchManager

NewVersionedEventDispatchManager is a constructor for the VersionedEventDispatchManager

func (*VersionedEventDispatchManager) Listen

func (m *VersionedEventDispatchManager) Listen(stop <-chan bool, exclusive bool, listenerCount int) error

Listen starts a listen loop processing channels related to new incoming events, errors and stop listening requests

func (*VersionedEventDispatchManager) RegisterEventHandler

func (m *VersionedEventDispatchManager) RegisterEventHandler(event interface{}, handler VersionedEventHandler)

RegisterEventHandler allows a caller to register an event handler given an event of the specified type being received

func (*VersionedEventDispatchManager) RegisterGlobalHandler

func (m *VersionedEventDispatchManager) RegisterGlobalHandler(handler VersionedEventHandler)

RegisterGlobalHandler allows a caller to register a wildcard event handler that is called on any event received

func (*VersionedEventDispatchManager) VersionedEventDispatcher added in v1.0.0

func (m *VersionedEventDispatchManager) VersionedEventDispatcher() VersionedEventDispatcher

VersionedEventDispatcher returns the internal versioned event dispatcher

type VersionedEventDispatcher

type VersionedEventDispatcher interface {
	DispatchEvent(VersionedEvent) error
	RegisterEventHandler(event interface{}, handler VersionedEventHandler)
	RegisterGlobalHandler(handler VersionedEventHandler)
}

VersionedEventDispatcher is responsible for routing events from the event manager to call handlers responsible for processing received events

type VersionedEventHandler

type VersionedEventHandler func(VersionedEvent) error

VersionedEventHandler is a function that takes a versioned event

type VersionedEventPublicationLogger

type VersionedEventPublicationLogger interface {
	SaveIntegrationEvent(VersionedEvent) error
	AllIntegrationEventsEverPublished() ([]VersionedEvent, error)
	GetIntegrationEventsByCorrelationID(correlationID string) ([]VersionedEvent, error)
}

VersionedEventPublicationLogger is responsible for retrieving all events ever published, to facilitate read model reconstruction

type VersionedEventPublisher

type VersionedEventPublisher interface {
	PublishEvents([]VersionedEvent) error
}

VersionedEventPublisher is responsible for publishing events that have been saved to the event store/repository

type VersionedEventReceiver

type VersionedEventReceiver interface {
	ReceiveEvents(VersionedEventReceiverOptions) error
}

VersionedEventReceiver is responsible for receiving globally published events

type VersionedEventReceiverOptions

type VersionedEventReceiverOptions struct {
	TypeRegistry  TypeRegistry
	Close         chan chan error
	Error         chan error
	ReceiveEvent  VersionedEventHandler
	Exclusive     bool
	ListenerCount int
}

VersionedEventReceiverOptions is an initialization structure used to communicate to and from an event receiver goroutine

type VersionedEventTransactedAccept

type VersionedEventTransactedAccept struct {
	Event                 VersionedEvent
	ProcessedSuccessfully chan bool
}

VersionedEventTransactedAccept is the message routed from an event receiver to the event manager. Sometimes event receivers designed with reliable delivery require acknowledgements after a message has been received. The success channel here allows for such acknowledgements

Directories

  • Package couchbase provides an event sourcing implementation in Couchbase for the CQRS and Event Sourcing framework. Current version: experimental
  • Package rabbit provides an event and command bus for the CQRS and Event Sourcing framework. Current version: experimental
