event

package
v0.2.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 22, 2023 License: MIT Imports: 5 Imported by: 5

Documentation

Overview

Package event contains types and implementations for dealing with Domain Events.

Index

Constants

View Source
const DefaultRunnerBufferSize = 32

DefaultRunnerBufferSize is the default size for the buffered channels opened by a ProcessorRunner instance, if not specified.

Variables

This section is empty.

Functions

This section is empty.

Types

type Appender

type Appender interface {
	Append(ctx context.Context, id StreamID, expected version.Check, events ...Envelope) (version.Version, error)
}

Appender is an event.Store trait used to append new Domain Events in the Event Stream.

type Envelope

type Envelope message.GenericEnvelope

Envelope contains a Domain Event and possible metadata associated to it.

Due to lack of sum types (a.k.a enum types), Events cannot currently take advantage of the new generics feature introduced with Go 1.18.

func ToEnvelope

func ToEnvelope(event Event) Envelope

ToEnvelope returns an Envelope instance with the provided Event instance and no Metadata.

func ToEnvelopes

func ToEnvelopes(events ...Event) []Envelope

ToEnvelopes returns a list of Envelopes from a list of Events. The returned Envelopes have no Metadata.

type Event

type Event message.Message

Event is a Message representing some Domain information that has happened in the past, which is of vital information to the Domain itself.

Event type names should be phrased in the past tense, to enforce the notion of "information happened in the past".

type FusedStore

type FusedStore struct {
	Appender
	Streamer
}

FusedStore is a convenience type to fuse multiple Event Store interfaces where you might need to extend the functionality of the Store only partially.

E.g. You might want to extend the functionality of the Append() method, but keep the Streamer methods the same.

If the extension wrapper does not support the Streamer interface, you cannot use the extension wrapper instance as an Event Store in certain cases (e.g. the Aggregate Repository).

Using a FusedStore instance you can fuse both instances together, and use it with the rest of the library ecosystem.

type Persisted

type Persisted struct {
	StreamID
	version.Version
	Envelope
}

Persisted represents an Domain Event that has been persisted into the Event Store.

func StreamToSlice

func StreamToSlice(ctx context.Context, f func(ctx context.Context, stream StreamWrite) error) ([]Persisted, error)

StreamToSlice synchronously exhausts an EventStream to an event.Persisted slice, and returns an error if the EventStream origin, passed here as a closure, fails with an error.

type Processor

type Processor interface {
	Process(ctx context.Context, event Persisted) error
}

Processor represents a component that can process persisted Domain Events.

type ProcessorFunc

type ProcessorFunc func(ctx context.Context, event Persisted) error

ProcessorFunc is a functional implementation of the Processor interface.

func (ProcessorFunc) Process

func (pf ProcessorFunc) Process(ctx context.Context, event Persisted) error

Process implements the event.Processor interface.

type ProcessorRunner

type ProcessorRunner struct {
	Processor
	Subscription

	BufferSize int

	Logger func(string, ...any)
}

ProcessorRunner is an infrastructural component that orchestrates the processing of a Domain Event using the provided Event Processor and Subscription, to subscribe to incoming events from the Event Store.

func (ProcessorRunner) Run

func (r ProcessorRunner) Run(ctx context.Context) error

Run starts listening to Events from the provided Subscription and passing them to the Processor instance for event processing.

Run is a blocking call, that will exit when either the Processor returns an error, or the Subscription stops.

Run uses buffered channels to coordinate events communication between components, using the value specified in BufferSize, if any, or DefaultRunnerBufferSize otherwise.

To stop the Runner, cancel the provided context. If the error returned upon exit is context.Canceled, that usually represent a case of normal operation, so it could be treated as a non-error.

type Store

type Store interface {
	Appender
	Streamer
}

Store represents an Event Store, a stateful data source where Domain Events can be safely stored, and easily replayed.

type Stream

type Stream = chan Persisted

Stream represents a stream of persisted Domain Events coming from some stream-able source of data, like an Event Store.

func SliceToStream

func SliceToStream(events []Persisted) Stream

SliceToStream converts a slice of event.Persisted domain events to an event.Stream type.

The event.Stream channel has the same buffer size as the input slice.

The channel returned by the function contains all the original slice elements and is already closed.

type StreamID

type StreamID string

StreamID identifies an Event Stream, which is a log of ordered Domain Events.

type StreamRead

type StreamRead <-chan Persisted

StreamRead provides read-only access to an event.Stream object.

type StreamWrite

type StreamWrite chan<- Persisted

StreamWrite provides write-only access to an event.Stream object.

type Streamer

type Streamer interface {
	Stream(ctx context.Context, stream StreamWrite, id StreamID, selector version.Selector) error
}

Streamer is an event.Store trait used to open a specific Event Stream and stream it back in the application.

type Subscription

type Subscription interface {
	Name() string
	Start(ctx context.Context, eventStream StreamWrite) error
	Checkpoint(ctx context.Context, event Persisted) error
}

Subscription is used to open an Event Stream from a remote source.

Usually, these are stateful components, and their state can be updated using the Checkpoint method.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL