eventstore

package module
v0.6.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 24, 2020 License: MIT Imports: 7 Imported by: 1

README

GO Event Store

Test Workflow

This Library is an EventStore heavily inspired by the prooph/event-store v7.0.

Provider

  • Postgres
  • InMemory

Implemented:

  • SingleStream Strategy: Create a Stream for each Aggregate
  • Loading and saving Aggregates
  • Persistent Projections
  • ReadModel Projections
  • Event Queries
Projections / Queries
  • You can query and process one or multiple Streams with the fromStream, fromStreams, fromAll API.
  • Fetch all or a subset of Events with an optional MetadataMatcher
  • Create persisted state with a Projector, or temporary state with a Query
  • Fetching multiple streams creates a merged stream and runs over the events in historical order

Examples

See the GO EventStore Example Repository for a basic integration in the Gin Framework

Initialize the EventStore
ctx := context.Background()

// open a connection pool; DB_URL is the connection string handed to pgxpool.Connect
pool, err := pgxpool.Connect(ctx, DB_URL)
if err != nil {
    fmt.Println(err.Error())
    return
}

// choose your persistence strategy
// at this time only postgres is supported
ps := pg.NewPersistenceStrategy(pool)

// create the event store
es := eventstore.NewEventStore(ps)

// initialize the event store
// creates the event_streams table to persist all created event streams
// creates the projection table to store all created persisted projections
err = es.Install(ctx)
if err != nil {
    fmt.Println(err.Error())
    return
}

// add a new event stream to the event_streams table
// creates a new table _{sha1 of streamName} for events
// NOTE: unlike the checks above, a failure here is only printed; there is no return
err = es.CreateStream(ctx, "foo-stream")
if err != nil {
    fmt.Println(err.Error())
}
// Names shared by the examples.
const (
	// FooStream is the stream name passed to CreateStream, NewFooRepository
	// and the FromStream calls of the query / projection examples.
	FooStream = "foo-stream"

	// Event names used as keys of the handler maps given to When.
	FooEventName = "FooEvent"
	BarEventName = "BarEvent"
)

// FooEvent is an example event.
// Each event is a serializable struct and will be included as Payload in the PersistedEvent.
type FooEvent struct {
	Foo string
}

// BarEvent is a second example event; the query and projection examples
// read its Bar field from event.Payload().
type BarEvent struct {
	Bar string
}

// FooAggregate is an example aggregate built with the help of the
// embedded BaseAggregate struct.
type FooAggregate struct {
	eventstore.BaseAggregate

	// custom fields that represent the latest state
	Foo string
}

// WhenFooEvent is the event handler for FooEvent.
// An aggregate should have an event handler following the schema When{EventName}.
// The handler is called automatically when an event is recorded.
// It copies the event's Foo value into the aggregate state; metadata is not used.
func (f *FooAggregate) WhenFooEvent(e FooEvent, metadata map[string]interface{}) {
	f.Foo = e.Foo
}

// NewFooAggregate creates an empty FooAggregate and wires it to a fresh
// BaseAggregate via the eventstore.NewAggregate helper constructor.
func NewFooAggregate() *FooAggregate {
	agg := &FooAggregate{}
	// the BaseAggregate receives the aggregate itself as its source
	agg.BaseAggregate = eventstore.NewAggregate(agg)

	return agg
}

// NewFooAggregateFromHistory reconstructs a FooAggregate from its event history.
func NewFooAggregateFromHistory(events eventstore.DomainEventIterator) *FooAggregate {
	agg := &FooAggregate{}
	agg.BaseAggregate = eventstore.NewAggregate(agg)

	// NOTE(review): FromHistory returns an error which this constructor has no
	// way to report through its current signature, so it is discarded here.
	// Consider a variant that returns (*FooAggregate, error).
	_ = agg.FromHistory(events)

	return agg
}
Load and save Aggregates with a Repository
// FooRepository loads and saves FooAggregates by delegating to a generic
// eventstore.Repository.
type FooRepository struct {
	// rootRepo is the underlying repository created by eventstore.NewRepository
	rootRepo eventstore.Repository
}

// Get loads an existing aggregate: it fetches the events recorded for fooID
// from the root repository and hands them to the history constructor.
// If fetching fails, the error is returned and the aggregate is nil.
func (r FooRepository) Get(ctx context.Context, fooID uuid.UUID) (*FooAggregate, error) {
	history, loadErr := r.rootRepo.GetAggregate(ctx, fooID)
	if loadErr != nil {
		return nil, loadErr
	}

	aggregate := NewFooAggregateFromHistory(history)

	return aggregate, nil
}

// Save persists all newly recorded events of foo by passing the aggregate to
// the root repository's SaveAggregate, and returns its error unchanged.
func (r FooRepository) Save(ctx context.Context, foo *FooAggregate) error {
	return r.rootRepo.SaveAggregate(ctx, foo)
}

// NewFooRepository builds a FooRepository on top of a generic repository bound
// to the given stream name and event store.
//
// NOTE(review): the package documentation lists NewEventStore as returning
// *EventStore, while this parameter is declared as a value; confirm which type
// eventstore.NewRepository expects.
func NewFooRepository(streamName string, eventStore eventstore.EventStore) FooRepository {
	root := eventstore.NewRepository(streamName, eventStore)

	return FooRepository{rootRepo: root}
}
Use it all together
// create the stream the aggregate's events are stored in
err = es.CreateStream(ctx, FooStream)
if err != nil {
    fmt.Println(err.Error())
    return
}

// register all existing aggregates and events
typeRegistry := eventstore.NewTypeRegistry()
typeRegistry.RegisterAggregate(FooAggregate{})
typeRegistry.RegisterEvents(FooEvent{})

// create a new instance of an aggregate
fooAggregate := NewFooAggregate()

// record new events with the RecordThat method; the second argument is
// optional additional metadata for the event (nil for none)
fooAggregate.RecordThat(FooEvent{Foo: "Bar"}, nil)
fooAggregate.RecordThat(FooEvent{Foo: "Baz"}, map[string]interface{}{"meta":"data"})

// create a repository and save the aggregate
repo := NewFooRepository(FooStream, es)
err = repo.Save(ctx, fooAggregate)
if err != nil {
    fmt.Println(err.Error())
    return
}

// reload it from the database
result, err := repo.Get(ctx, fooAggregate.AggregateID())
if err != nil {
    fmt.Println(err.Error())
    return
}

// use the reloaded aggregate; without this the snippet does not compile,
// because Go rejects a local variable that is declared and never used
fmt.Println(result.Foo)

Creating and using Queries and persisted Projections

Query from an EventStream
// Register existing aggregates and events
typeRegistry := eventstore.NewTypeRegistry()
typeRegistry.RegisterAggregate(FooAggregate{})
typeRegistry.RegisterEvents(FooEvent{}, BarEvent{})

ps := pg.NewPersistenceStrategy(pool)
es := eventstore.NewEventStore(ps)

query := eventstore.NewQuery(es)
err = query.
    // read from a single stream, here with an empty MetadataMatcher
    FromStream(FooStream, []eventstore.MetadataMatch{}).
    // init your state; this query collects strings in a slice
    Init(func() interface{} {
        return []string{}
    }).
    // Define a callback for each possible event.
    // Events without a handler will be ignored.
    // With WhenAny you can also define a single handler for all events.
    // The key of the map is the event name without the package.
    // A handler receives the current state as first argument and the wrapped event as second argument.
    // It returns the new state, which will be the first argument of the next handler call.
    // Access your event struct with event.Payload(); the wrapper also gives access to
    // additional information such as:
    //  CreatedAt
    //  AggregateID
    //  AggregateType
    //  Unique EventID (UUID)
    //  Custom Metadata
    When(map[string]func(state interface{}, event eventstore.DomainEvent) interface{} {
        FooEventName: func(state interface{}, event eventstore.DomainEvent) interface{} {
            return append(state.([]string), event.Payload().(FooEvent).Foo)
        },
        BarEventName: func(state interface{}, event eventstore.DomainEvent) interface{} {
            return append(state.([]string), event.Payload().(BarEvent).Bar)
        },
    }).
    Run(ctx)

if err != nil {
    fmt.Println(err)
    return
}

// Access the result of the Query
fmt.Println(query.State())
ReadModels from an EventStream
Define a ReadModel

You can use the helper Client to execute DB Operations

// FooReadModel is an example read model backed by the pg helper client.
// Commands are collected with Stack and executed later by Persist.
type FooReadModel struct {
	// client is the helper used to run the DB operations
	client *pg.Client
	// stack holds the pending commands in the order they were stacked
	stack  []struct {
		// method is the command name; Persist handles "insert", "remove" and "update"
		method string
		// args are the maps handed to the matching client call
		args   []map[string]interface{}
	}
}

// Init creates the read model table named FooReadModelTable.
//
// The statement is executed directly on the client's underlying connection,
// which must be a *pgxpool.Pool. If the client exposes any other connection
// type, Init returns an error instead of panicking on the type assertion.
// Any error from executing the CREATE TABLE statement is returned unchanged.
func (f *FooReadModel) Init(ctx context.Context) error {
	conn := f.client.Conn()

	// Conn returns an untyped value, so verify it really is a pgx pool.
	pool, ok := conn.(*pgxpool.Pool)
	if !ok {
		return fmt.Errorf("init read model %s: unexpected connection type %T", FooReadModelTable, conn)
	}

	_, err := pool.Exec(ctx, fmt.Sprintf(`
		CREATE TABLE %s (
			id UUID NOT NULL,
			aggregate_id UUID NOT NULL,
			value VARCHAR(20) NOT NULL,
			PRIMARY KEY (id)
		)`, FooReadModelTable))

	return err
}

// IsInitialized reports whether the read model table exists, by asking the
// client whether FooReadModelTable exists.
func (f *FooReadModel) IsInitialized(ctx context.Context) (bool, error) {
	return f.client.Exists(ctx, FooReadModelTable)
}

// Reset resets the read model by delegating to the client's Reset for
// FooReadModelTable.
func (f *FooReadModel) Reset(ctx context.Context) error {
	return f.client.Reset(ctx, FooReadModelTable)
}

// Delete removes the read model by delegating to the client's Delete for
// FooReadModelTable.
func (f *FooReadModel) Delete(ctx context.Context) error {
	return f.client.Delete(ctx, FooReadModelTable)
}

// Stack queues a command for the next Persist call. Nothing is executed here;
// the method name and its argument maps are only appended to the stack.
func (f *FooReadModel) Stack(method string, args ...map[string]interface{}) {
	command := struct {
		method string
		args   []map[string]interface{}
	}{
		method: method,
		args:   args,
	}

	f.stack = append(f.stack, command)
}

// Persist executes every stacked command against FooReadModelTable, in the
// order the commands were stacked, and clears the stack once all of them
// succeeded.
//
// Supported methods and the arguments they need:
//   - "insert": args[0] is passed to client.Insert
//   - "remove": args[0] is passed to client.Remove
//   - "update": args[0] and args[1] are passed to client.Update, in that order
//
// Commands with any other method name are skipped. A command stacked with too
// few arguments produces an error rather than an index-out-of-range panic.
// On the first error Persist returns it and leaves the stack as it was.
func (f *FooReadModel) Persist(ctx context.Context) error {
	for i, command := range f.stack {
		// Determine how many argument maps the command needs before indexing.
		var required int
		switch command.method {
		case "insert", "remove":
			required = 1
		case "update":
			required = 2
		default:
			// unknown commands are ignored
			continue
		}

		if len(command.args) < required {
			return fmt.Errorf(
				"persist read model %s: command %d (%q) needs %d argument(s), got %d",
				FooReadModelTable, i, command.method, required, len(command.args),
			)
		}

		var err error
		switch command.method {
		case "insert":
			err = f.client.Insert(ctx, FooReadModelTable, command.args[0])
		case "remove":
			err = f.client.Remove(ctx, FooReadModelTable, command.args[0])
		case "update":
			err = f.client.Update(ctx, FooReadModelTable, command.args[0], command.args[1])
		}
		if err != nil {
			return err
		}
	}

	// All commands ran; drop them so they are not executed again.
	f.stack = nil

	return nil
}

// NewFooReadModel returns a pointer to a FooReadModel that uses the given
// client; the command stack starts out empty.
func NewFooReadModel(client *pg.Client) *FooReadModel {
	return &FooReadModel{client: client}
}
Create a ReadModel Projection

Create a new DB table "app_foo" and fill it with your event data

// register the aggregate and its events
typeRegistry := eventstore.NewTypeRegistry()
typeRegistry.RegisterAggregate(&FooAggregate{}, FooEvent{}, BarEvent{})

ps := pg.NewPersistenceStrategy(pool)
es := eventstore.NewEventStore(ps)
pm := pg.NewProjectionManager(pool)

client := pg.NewClient(pool)
rm := NewFooReadModel(client)

// NewReadModelProjector takes a ReadModel and a *EventStore.
// rm is already a *FooReadModel and es is already a *EventStore, so both are
// passed as they are; taking their address again would yield pointer-to-pointer
// values that do not match the parameter types.
// NOTE(review): &pm is kept from the original example; confirm against the
// return type of pg.NewProjectionManager whether pm should be passed directly.
projector := eventstore.NewReadModelProjector("foo_read_model_projection", rm, es, &pm)
err = projector.
    FromStream(FooStream, []eventstore.MetadataMatch{}).
    // this projection keeps no state of its own; all data goes to the ReadModel
    Init(func() interface{} {
        return struct{}{}
    }).
    When(map[string]func(state interface{}, event eventstore.DomainEvent) interface{}{
        FooEventName: func(state interface{}, event eventstore.DomainEvent) interface{} {
            // stack an insert of a new entry for your ReadModel
            projector.ReadModel.Stack(
                "insert",
                map[string]interface{}{
                    "id":           event.UUID().String(),
                    "aggregate_id": event.AggregateID().String(),
                    "value":        event.Payload().(FooEvent).Foo,
                },
            )

            return state
        },
        BarEventName: func(state interface{}, event eventstore.DomainEvent) interface{} {
            // same as above, using the Bar value of the BarEvent
            projector.ReadModel.Stack(
                "insert",
                map[string]interface{}{
                    "id":           event.UUID().String(),
                    "aggregate_id": event.AggregateID().String(),
                    "value":        event.Payload().(BarEvent).Bar,
                },
            )

            return state
        },
    }).
    // second argument is keepRunning
    Run(ctx, false)

For more information, check out the scripts in example or any *_test.go files

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func CopyMap

func CopyMap(m map[string]interface{}) map[string]interface{}

CopyMap is a helper function to copy a map to create immutable events with all methods

func FindStatusInSlice

func FindStatusInSlice(slice []Status, val Status) (int, bool)

FindStatusInSlice returns if the given status is in the given status slice

func ProjectorAlreadyInitialized

func ProjectorAlreadyInitialized() string

ProjectorAlreadyInitialized panics if you call Init on a Projection twice

func ProjectorFromWasAlreadyCalled

func ProjectorFromWasAlreadyCalled() string

ProjectorFromWasAlreadyCalled panics if you call more than one of the available From Methods (FromStream, FromAll, FromStreams)

func ProjectorNoHandler

func ProjectorNoHandler() string

ProjectorNoHandler panics if you run a Projection without defining a Handler

func ProjectorStateNotInitialised

func ProjectorStateNotInitialised() string

ProjectorStateNotInitialised panics if you don't call Init to initialise the ProjectionState

Types

type Aggregate

type Aggregate interface {
	PopEvents() []DomainEvent
	AggregateID() uuid.UUID
	FromHistory(events DomainEventIterator) error
}

Aggregate define the Basic Methods required by any Implementation

type BaseAggregate

type BaseAggregate struct {
	// contains filtered or unexported fields
}

BaseAggregate provides basic functionality to simplify Aggregate handling, like creation, recreation and loading of Aggregates

func NewAggregate

func NewAggregate(source interface{}) BaseAggregate

NewAggregate is a Constructor Function to create a new BaseAggregate

func (BaseAggregate) AggregateID

func (a BaseAggregate) AggregateID() uuid.UUID

AggregateID is a unique identifier for an Aggregate instance All Aggregate Events are grouped by this UUID

func (*BaseAggregate) Apply

func (a *BaseAggregate) Apply(event DomainEvent)

Apply adds an Event that is already wrapped in a DomainEvent to the Stack. With this Method you have more control over the wrapped DomainEvent

func (*BaseAggregate) CallEventHandler

func (a *BaseAggregate) CallEventHandler(event interface{}, metadata map[string]interface{})

CallEventHandler is an internal Method that calls the related Handler Method of the Aggregate after a new Event was recorded. The Method has to follow the schema When{EventName}(event EventStruct, metadata map[string]interface{}) -> an example: func (f *FooAggregate) WhenFooEvent(e FooEvent, metadata map[string]interface{})

func (*BaseAggregate) FromHistory

func (a *BaseAggregate) FromHistory(events DomainEventIterator) error

FromHistory recreate the latest state of an existing Aggregate by its recorded Events

func (*BaseAggregate) PopEvents

func (a *BaseAggregate) PopEvents() []DomainEvent

PopEvents return and clear all new and not persisted events of the stack

func (*BaseAggregate) RecordThat

func (a *BaseAggregate) RecordThat(event interface{}, metadata map[string]interface{})

RecordThat adds a new Event to the EventStream of an Aggregate instance. {event} represents a basic struct with the Event payload; {metadata} is a map with additional information and can be used to filter the EventStream in Projections or Queries

func (BaseAggregate) Version added in v0.2.1

func (a BaseAggregate) Version() int

Version returns the current Version of the Aggregate

type Client

type Client interface {
	// Conn is the underlying Storage Connection like pgx.Pool for Postgres
	Conn() interface{}

	// Exists the given collection (table)
	Exists(ctx context.Context, collection string) (bool, error)
	// Delete the given collection (table)
	Delete(ctx context.Context, collection string) error
	// Reset (truncate) the given collection (table)
	Reset(ctx context.Context, collection string) error

	// Insert a new item into the collection, the map key represent the storage column
	Insert(ctx context.Context, collection string, values map[string]interface{}) error
	// Remove all items from the collection by the given identifiers, the map key represent the storage column
	Remove(ctx context.Context, collection string, identifiers map[string]interface{}) error
	// Update all matching items from the collection with the new values, the map key represent the storage column
	Update(ctx context.Context, collection string, values map[string]interface{}, identifiers map[string]interface{}) error
}

Client is a Helper to execute simple commands on a ReadModel

type DomainEvent added in v0.1.3

type DomainEvent struct {
	// contains filtered or unexported fields
}

DomainEvent represent a Wrapper for DomainEvents to add meta informations It is used to simplify the handling of adding and recreate new DomainEvents

func NewDomainEvent added in v0.1.3

func NewDomainEvent(aggregateID uuid.UUID, payload interface{}, metadata map[string]interface{}, createdAt time.Time) DomainEvent

NewDomainEvent creates a new Event and sets default values like Metadata. It also creates the EventID

func (DomainEvent) AggregateID added in v0.1.3

func (e DomainEvent) AggregateID() uuid.UUID

AggregateID of the related Aggregate

func (DomainEvent) AggregateType added in v0.1.3

func (e DomainEvent) AggregateType() string

AggregateType of the Event

func (DomainEvent) CreatedAt added in v0.1.3

func (e DomainEvent) CreatedAt() time.Time

CreatedAt is the creation DateTime of the Event

func (DomainEvent) Metadata added in v0.1.3

func (e DomainEvent) Metadata() map[string]interface{}

Metadata of the Event like the related AggregateID

func (DomainEvent) Name added in v0.1.3

func (e DomainEvent) Name() string

Name of the DomainEvent

func (DomainEvent) Number added in v0.1.3

func (e DomainEvent) Number() int

Number of the event in the complete EventStream (over multiple aggregates)

func (DomainEvent) Payload added in v0.1.3

func (e DomainEvent) Payload() interface{}

Payload is the original recorded DomainEventStruct

func (DomainEvent) SingleMetadata added in v0.1.3

func (e DomainEvent) SingleMetadata(key string) (interface{}, bool)

SingleMetadata from the Metadata Map

func (DomainEvent) UUID added in v0.1.3

func (e DomainEvent) UUID() uuid.UUID

UUID to identify a single Event

func (DomainEvent) Version added in v0.1.3

func (e DomainEvent) Version() int

Version of the Event. It is set and modified as an integer; Postgres, as the persistence layer, reads the value as float64, so it has to be converted in these scenarios

func (DomainEvent) WithAddedMetadata added in v0.1.3

func (e DomainEvent) WithAddedMetadata(name string, value interface{}) DomainEvent

WithAddedMetadata create a copy of the event with the given additional Metadata

func (DomainEvent) WithAggregateType added in v0.1.3

func (e DomainEvent) WithAggregateType(aType string) DomainEvent

WithAggregateType create a copy of the event with the given AggregateType

func (DomainEvent) WithNumber added in v0.1.3

func (e DomainEvent) WithNumber(number int) DomainEvent

WithNumber create a copy of the event with the given number Is used to update the Number after adding to the EventStream or to recreate an existing Event

func (DomainEvent) WithUUID added in v0.1.3

func (e DomainEvent) WithUUID(uuid uuid.UUID) DomainEvent

WithUUID create a copy of the event with the given EventID Used to recreate a existing event from the underlying persistence storage

func (DomainEvent) WithVersion added in v0.1.3

func (e DomainEvent) WithVersion(v int) DomainEvent

WithVersion create a copy of the event with the given Version

type DomainEventAction added in v0.3.0

type DomainEventAction = string
const (
	PreAppend DomainEventAction = "PRE_APPEND"
	Appended  DomainEventAction = "APPENDED"
	Loaded    DomainEventAction = "LOADED"
)

type DomainEventIterator added in v0.2.0

type DomainEventIterator interface {
	// Next turns the cursor to the next DomainEvent
	Next() bool
	// Current returns the current selected DomainEvent in the List or a related error
	Current() (*DomainEvent, error)
	// Error returns the latest error
	Error() error
	// Rewind the iterator cursor
	Rewind()
	// Close removes all fetched events and resets the cursor
	Close()
	// IsEmpty checks for any DomainEvent in the Iterator
	IsEmpty() (bool, error)
	// Converts the Iterator to an list of DomainEvents
	ToList() ([]DomainEvent, error)
}

DomainEventIterator is a lazy loading Iterator. It fetches DomainEvents in steps of 1000 until all events are loaded

type DomainEventMiddleware added in v0.3.0

type DomainEventMiddleware = func(ctx context.Context, event DomainEvent) (DomainEvent, error)

type EventHandler

type EventHandler = func(state interface{}, event DomainEvent) interface{}

EventHandler process a single Event and returns the new ProjectionState The First argument is the current state of the projection The Second argument is the loaded event

type EventStore

type EventStore struct {
	// contains filtered or unexported fields
}

EventStore represent the connection to our EventStream with the selected PersistenceStrategy

func NewEventStore

func NewEventStore(strategy PersistenceStrategy) *EventStore

NewEventStore creates a BasicEventStore with the selected PersistenceStrategy

func (*EventStore) AppendMiddleware added in v0.3.0

func (es *EventStore) AppendMiddleware(action DomainEventAction, middleware DomainEventMiddleware)

Append a middleware to one of the existing Actions PreAppend, Appended, Loaded

func (*EventStore) AppendTo

func (es *EventStore) AppendTo(ctx context.Context, streamName string, events []DomainEvent) error

AppendTo appends a list of events to the given EventStream

func (*EventStore) CreateStream

func (es *EventStore) CreateStream(ctx context.Context, streamName string) error

CreateStream creates a new EventStream

func (*EventStore) DeleteStream

func (es *EventStore) DeleteStream(ctx context.Context, streamName string) error

DeleteStream deletes an existing EventStream

func (*EventStore) FetchAllStreamNames

func (es *EventStore) FetchAllStreamNames(ctx context.Context) ([]string, error)

FetchAllStreamNames returns a list of all existing EventStreams

func (*EventStore) HasStream

func (es *EventStore) HasStream(ctx context.Context, streamName string) (bool, error)

HasStream return if a EventStream with the given name exists

func (*EventStore) Install

func (es *EventStore) Install(ctx context.Context) error

Install creates the EventStreams and Projections Table if not Exists

func (*EventStore) Load

func (es *EventStore) Load(ctx context.Context, streamName string, fromNumber, count int, matcher MetadataMatcher) (DomainEventIterator, error)

Load Events from the given EventStream from the given Number, restrictable by a count (Limit) and custom filter (MetadataMatcher)

func (*EventStore) MergeAndLoad

func (es *EventStore) MergeAndLoad(ctx context.Context, count int, streams ...LoadStreamParameter) (DomainEventIterator, error)

MergeAndLoad can load Events from multiple Streams, merged and sorted in historical order. The Result can also be restricted by count and MetadataMatcher

type FieldType

type FieldType string

Enum for possible categories of fields to filter EventStreams

const (
	MetadataField        FieldType = "metadata"
	MessagePropertyField FieldType = "message_property"
)

type HandlersCache

type HandlersCache map[reflect.Type]func(source interface{}, event interface{}, metadata map[string]interface{})

type LoadStreamParameter

type LoadStreamParameter struct {
	StreamName string
	FromNumber int
	Matcher    MetadataMatcher
}

type MetadataMatch

type MetadataMatch struct {
	// Field name to filter
	// For MessagePropertyField possible values are "event_name", "created_at", "uuid"
	// For MetadataField its the name of the filtered metadata like "_aggregate_id"
	Field string
	// Value to filter
	Value interface{}
	// Operation to execute like EqualsOperator
	// Example for MetadataField and EqualsOperator checks if MetadataMatch.Value = event.metadata[MetadataMatch.Field]
	Operation MetadataOperator
	// FieldType to filter
	FieldType FieldType
}

MetadataMatch is a struct to filter an EventStream by Metadata or EventProperties Like EventName, AggregateID, Version, CreatedAt

type MetadataMatcher

type MetadataMatcher = []MetadataMatch

MetadataMatcher alias of a List of MetadataMatch

type MetadataOperator

type MetadataOperator string

Enum for possible Operators to filter EventStreams

const (
	EqualsOperator            MetadataOperator = "="
	NotEqualsOperator         MetadataOperator = "!="
	GreaterThanOperator       MetadataOperator = ">"
	GreaterThanEqualsOperator MetadataOperator = ">="
	InOperator                MetadataOperator = "in"
	NotInOperator             MetadataOperator = "nin"
	LowerThanOperator         MetadataOperator = "<"
	LowerThanEuqalsOperator   MetadataOperator = "<="
	RegexOperator             MetadataOperator = "regex"
)

type MiddlewareIterator added in v0.6.0

type MiddlewareIterator struct {
	// contains filtered or unexported fields
}

func NewMiddlewareIterator added in v0.6.0

func NewMiddlewareIterator(ctx context.Context, iterator DomainEventIterator, middleware []DomainEventMiddleware) MiddlewareIterator

func (MiddlewareIterator) Close added in v0.6.0

func (m MiddlewareIterator) Close()

func (MiddlewareIterator) Current added in v0.6.0

func (m MiddlewareIterator) Current() (*DomainEvent, error)

func (MiddlewareIterator) Error added in v0.6.0

func (m MiddlewareIterator) Error() error

func (MiddlewareIterator) IsEmpty added in v0.6.0

func (m MiddlewareIterator) IsEmpty() (bool, error)

func (MiddlewareIterator) Next added in v0.6.0

func (m MiddlewareIterator) Next() bool

func (MiddlewareIterator) Rewind added in v0.6.0

func (m MiddlewareIterator) Rewind()

func (MiddlewareIterator) ToList added in v0.6.0

func (m MiddlewareIterator) ToList() ([]DomainEvent, error)

type PersistenceStrategy

type PersistenceStrategy interface {
	// FetchAllStreamNames returns a list of all existing EventStreams
	FetchAllStreamNames(ctx context.Context) ([]string, error)
	// CreateEventStreamsTable creates a DB Table / Collection to manage all existing EventStreams
	CreateEventStreamsTable(context.Context) error
	// CreateProjectionsTable creates a DB Table / Collection to manage all existing Projections
	CreateProjectionsTable(context.Context) error
	// AddStreamToStreamsTable adds a new EventStream to the EventStreams Table / Collection
	AddStreamToStreamsTable(ctx context.Context, streamName string) error
	// RemoveStreamFromStreamsTable removes a EventStream from the EventStreams Table / Collection
	RemoveStreamFromStreamsTable(ctx context.Context, streamName string) error
	// DeleteStream deletes the EventStream from the EventStreams Table / Collection and deletes the related EventStream Table / Collection
	DeleteStream(ctx context.Context, streamName string) error
	// CreateSchema creates a new EventStream Table / Collection which is used to persist all related Events
	CreateSchema(ctx context.Context, streamName string) error
	// DropSchema removes a EventStream Table / Collection with all included Events
	DropSchema(ctx context.Context, streamName string) error
	// HasStream return if a EventStream with the given name exists
	HasStream(ctx context.Context, streamName string) (bool, error)
	// AppendTo appends multiple events to the given EventStream
	AppendTo(ctx context.Context, streamName string, events []DomainEvent) error
	// Load Events from the given EventStream from the given Number, restrictable by a count (Limit) and custom filter (MetadataMatcher)
	Load(ctx context.Context, streamName string, fromNumber int, count int, matcher MetadataMatcher) (DomainEventIterator, error)
	// MergeAndLoad can load Events from multiple Stream merged and sorted by the historical order
	// The Result could also be restricted by count and MetadataMatcher
	MergeAndLoad(ctx context.Context, count int, streams ...LoadStreamParameter) (DomainEventIterator, error)
}

PersistenceStrategy defines an Interface needed for an underlying PersistentStorage Current implementations are Postgres and InMemory

type ProjectionManager

type ProjectionManager interface {
	// FetchProjectionStatus returns the active status of the given projection
	FetchProjectionStatus(ctx context.Context, projectionName string) (Status, error)
	// CreateProjection creates a new projections entry in the projections table
	CreateProjection(ctx context.Context, projectionName string, state interface{}, status Status) error
	// DeleteProjection deletes a projection entry from the projections table
	DeleteProjection(ctx context.Context, projectionName string) error
	// ResetProjection resets state and positions from the given projection
	ResetProjection(ctx context.Context, projectionName string, state interface{}) error
	// PersistProjection persists the current state and position of the given projection
	PersistProjection(ctx context.Context, projectionName string, state interface{}, streamPositions map[string]int) error
	// LoadProjection loads latest state and positions of the given projection
	LoadProjection(ctx context.Context, projectionName string) (map[string]int, interface{}, error)
	// UpdateProjectionStatus updates the status of a given projection
	UpdateProjectionStatus(ctx context.Context, projectionName string, status Status) error
	// ProjectionExists returns if a projection with the given name exists
	ProjectionExists(ctx context.Context, projectionName string) (bool, error)
}

ProjectionManager manages the Projections Table / Collection and has multiple implementations for different persistence layers

type ProjectionNotFound

type ProjectionNotFound struct {
	Name string
}

ProjectionNotFound is returned if you try to delete, reset or load a non-existing Projection

func (ProjectionNotFound) Error

func (e ProjectionNotFound) Error() string

type Projector

type Projector struct {
	// contains filtered or unexported fields
}

Projector creates a persisted projection of one or multiple streams

func NewProjector

func NewProjector(name string, eventStore *EventStore, manager ProjectionManager) Projector

NewProjector creates a new Projector to configure and run a new projection. Define your preferred persistence storage with the ProjectionManager (at this time only Postgres is supported :-D)

func (*Projector) Delete

func (q *Projector) Delete(ctx context.Context, deleteEmittedEvents bool) error

Delete the Projection from the Projections table / collection and, if deleteEmittedEvents is true, also the related Emit-EventStream if it exists

func (*Projector) Emit

func (q *Projector) Emit(ctx context.Context, event DomainEvent) error

Emit creates a new EventStream with the name of the Projection And append the event to this new EventStream

func (*Projector) FromAll

func (q *Projector) FromAll() *Projector

FromAll read events from all existing EventStreams

func (*Projector) FromStream

func (q *Projector) FromStream(streamName string, matcher MetadataMatcher) *Projector

FromStream read events from a single EventStream

func (*Projector) FromStreams

func (q *Projector) FromStreams(streams ...StreamProjection) *Projector

FromStreams read events from multiple EventStreams

func (*Projector) Init

func (q *Projector) Init(handler func() interface{}) *Projector

Init the state, define the type and/or prefill it with data

func (*Projector) LinkTo

func (q *Projector) LinkTo(ctx context.Context, streamName string, event DomainEvent) error

LinkTo append the event to a given EventStream

func (Projector) Name

func (q Projector) Name() string

Name of the Projection

func (*Projector) Reset

func (q *Projector) Reset(ctx context.Context) error

Reset the Projection state and EventStream positions

func (*Projector) Run

func (q *Projector) Run(ctx context.Context, keepRunning bool) error

Run the Projection

func (Projector) State

func (q Projector) State() interface{}

State returns the current Projection State

func (Projector) Status

func (q Projector) Status() Status

Status of the Projection

func (*Projector) Stop

func (q *Projector) Stop(ctx context.Context) error

Stop the Projection and persist the current state and EventStream positions

func (*Projector) When

func (q *Projector) When(handlers map[string]EventHandler) *Projector

When defines multiple handlers; you can create one handler per event. Events without a handler will not be processed

func (*Projector) WhenAny

func (q *Projector) WhenAny(handler EventHandler) *Projector

WhenAny defines a single handler for all possible Events

type Query

type Query struct {
	Status Status
	// contains filtered or unexported fields
}

Query custom informations from your EventStream Queries are not persisted, they provide the latest state after running

func NewQuery

func NewQuery(eventStore *EventStore) Query

NewQuery for the given EventStore

func (*Query) FromAll

func (q *Query) FromAll() *Query

FromAll read events from all existing EventStreams

func (*Query) FromStream

func (q *Query) FromStream(streamName string, matcher MetadataMatcher) *Query

FromStream read events from a single EventStream

func (*Query) FromStreams

func (q *Query) FromStreams(streams ...StreamProjection) *Query

FromStreams read events from multiple EventStreams

func (*Query) Init

func (q *Query) Init(handler func() interface{}) *Query

Init the state, define the type and/or prefill it with data

func (*Query) Reset

func (q *Query) Reset()

Reset the query state and EventStream positions

func (*Query) Run

func (q *Query) Run(ctx context.Context) error

Run the Query

func (Query) State

func (q Query) State() interface{}

State returns the current query State

func (*Query) Stop

func (q *Query) Stop()

Stop the query

func (*Query) When

func (q *Query) When(handlers map[string]func(state interface{}, event DomainEvent) interface{}) *Query

When defines multiple handlers; you can create one handler per event. Events without a handler will not be processed

func (*Query) WhenAny

func (q *Query) WhenAny(handler func(state interface{}, event DomainEvent) interface{}) *Query

WhenAny defines a single handler for all possible Events

type ReadModel

type ReadModel interface {
	// Init your ReadModel, for example create the DB Table
	Init(ctx context.Context) error
	// Check if your ReadModel was already initialized, for example if DB Table already exists
	IsInitialized(ctx context.Context) (bool, error)
	// Reset your ReadModel
	Reset(ctx context.Context) error
	// Delete your ReadModel
	Delete(ctx context.Context) error
	// Stack adds a new command to your ReadModel
	Stack(method string, args ...map[string]interface{})
	// Persist the current State of your ReadModel, executes all stacked commands
	Persist(ctx context.Context) error
}

ReadModel is a custom ReadModel of your DomainEvents and could be represented and persisted in many different forms, for example as a DB table in your database or as cached files. See the example implementation in example/read_model.go.

type ReadModelProjector

type ReadModelProjector struct {
	ReadModel ReadModel
	// contains filtered or unexported fields
}

ReadModelProjector creates a custom ReadModel over one or multiple streams

func NewReadModelProjector

func NewReadModelProjector(name string, readModel ReadModel, eventStore *EventStore, manager ProjectionManager) ReadModelProjector

NewReadModelProjector for the given ReadModel implementation, EventStore and ProjectionManager. Find an example for a ReadModel in example/read_model.go.

func (*ReadModelProjector) Delete

func (q *ReadModelProjector) Delete(ctx context.Context, deleteProjection bool) error

Delete the ReadModelProjection from the Projections table / collection and if deleteProjection is true it also runs the Delete Method of your ReadModel

func (*ReadModelProjector) FromAll

func (q *ReadModelProjector) FromAll() *ReadModelProjector

FromAll read events from all existing EventStreams

func (*ReadModelProjector) FromStream

func (q *ReadModelProjector) FromStream(streamName string, matcher MetadataMatcher) *ReadModelProjector

FromStream read events from a single EventStream

func (*ReadModelProjector) FromStreams

func (q *ReadModelProjector) FromStreams(streams ...StreamProjection) *ReadModelProjector

FromStreams read events from multiple EventStreams

func (*ReadModelProjector) Init

func (q *ReadModelProjector) Init(handler func() interface{}) *ReadModelProjector

Init the state, define the type and/or prefill it with data

func (ReadModelProjector) Name

func (q ReadModelProjector) Name() string

Name returns the current ReadModelProjection Name

func (*ReadModelProjector) Reset

func (q *ReadModelProjector) Reset(ctx context.Context) error

Reset the ReadModelProjection state and EventStream positions. It also runs the Reset method of your ReadModel.

func (*ReadModelProjector) Run

func (q *ReadModelProjector) Run(ctx context.Context, keepRunning bool) error

Run the ReadModelProjection

func (ReadModelProjector) State

func (q ReadModelProjector) State() interface{}

State returns the current ReadModelProjection State

func (ReadModelProjector) Status

func (q ReadModelProjector) Status() Status

Status returns the current ReadModelProjection Status

func (*ReadModelProjector) Stop

func (q *ReadModelProjector) Stop(ctx context.Context) error

Stop the ReadModelProjection and persist the current state and EventStream positions

func (*ReadModelProjector) When

When defines multiple event handlers. You can create one handler for each event; events without a handler will not be processed.

func (*ReadModelProjector) WhenAny

func (q *ReadModelProjector) WhenAny(handler EventHandler) *ReadModelProjector

WhenAny defines a single handler for all possible Events

type Repository

type Repository struct {
	// Stream related to your AggregateType
	Stream string
	// contains filtered or unexported fields
}

Repository for an AggregateType

func NewRepository

func NewRepository(streamName string, eventStore *EventStore) Repository

NewRepository creates a Repository

func (Repository) GetAggregate

func (r Repository) GetAggregate(ctx context.Context, aggregateID uuid.UUID) (DomainEventIterator, error)

GetAggregate returns a list of all persisted events of a single Aggregate, grouped by the AggregateID, in historical order

func (Repository) SaveAggregate

func (r Repository) SaveAggregate(ctx context.Context, aggregate Aggregate) error

SaveAggregate persist all new Events to the EventStore

type Status

type Status string

Enum of all possible ProjectionStatus

const (
	StatusIdle                        Status = "idle"
	StatusRunning                     Status = "running"
	StatusStopping                    Status = "stopping"
	StatusResetting                   Status = "resetting"
	StatusDeleting                    Status = "deleting"
	StatusDeletingIinclEmittedEevents Status = "deleting incl emitted events"
)

type StreamAlreadyExist

type StreamAlreadyExist struct {
	Stream string
}

StreamAlreadyExist is returned when you create an already existing EventStream

func (StreamAlreadyExist) Error

func (e StreamAlreadyExist) Error() string

type StreamNotFound

type StreamNotFound struct {
	Stream string
}

StreamNotFound is returned if you try to delete, reset or load a non-existing EventStream

func (StreamNotFound) Error

func (e StreamNotFound) Error() string

type StreamProjection

type StreamProjection struct {
	// StreamName of the EventStream
	StreamName string
	// Matcher an optional list of custom filters
	Matcher MetadataMatcher
}

StreamProjection is used if you want to project over different Streams with different Filters

type TypeCache

type TypeCache map[string]reflect.Type

type TypeRegistry

type TypeRegistry interface {
	GetHandlers(interface{}) HandlersCache
	GetTypeByName(string) (reflect.Type, bool)
	RegisterAggregate(aggregate interface{}, events ...interface{})
	RegisterEvents(events ...interface{})
	RegisterType(interface{})
}

TypeRegistry registers all existing Aggregates and Events for dynamic Type Assertions and Conversions

func NewTypeRegistry

func NewTypeRegistry() TypeRegistry

NewTypeRegistry constructs a new TypeRegistry

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL