eventsource

package module
v0.8.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 15, 2023 License: MIT Imports: 10 Imported by: 0

README

Go Report

Eventsource

Definition

Event sourcing pattern implemented with a postgresql/coackcroachdb compatible eventstore.

Events, aggregates, snapshots and the full aggregate's current state (last projection based on the last transaction) are persisted using the same transaction.

Commands are not persisted but you still can persist them if needed in another layer.

The package also implement an outbox pattern also persisted in the same transaction.

The ids of the entities are generated using a K-Sortable Unique IDentifier (1 second resolution). The migrations to maintain the postgresql eventstore will be added in a future version. The snapshots does not yet have a proper identifier, this should be added at a later stage; snapshots can be fecthed using the aggregate id and the version, subject to a unique index formed by both.

This version is subject to change and will possibly cause breaking changes.

Postgresql/CoackcroachDB schema definition

The SQL schema is as follow (a migration script will be included at a later stage):

create schema if not exists es;
create schema if not exists projection;

drop table if exists es.snapshots;
drop table if exists es.events;
drop table if exists projection.organizations;

create table if not exists es.events
(
    id                varchar primary key,
    type              varchar,
    occurred_at       timestamptz,
    registered_at     timestamptz,
    aggregate_id      varchar,
    aggregate_type    varchar,
    aggregate_version bigint,
    data              jsonb,
    metadata          jsonb,
    unique (aggregate_id, aggregate_version)
);

create table if not exists es.snapshots
(
    aggregate_id      varchar,
    aggregate_type    varchar,
    aggregate_version bigint,
    taken_at          timestamptz,
    registered_at     timestamptz,
    data              jsonb,
    primary key (aggregate_id, aggregate_version)
);

create table if not exists projection.organizations
(
    id                  varchar primary key,
    name                varchar,
    registration_number varchar,
    vat_number          varchar,
    vat_is_intra_com    varchar,
    country             varchar,
    created_by          varchar,
    updated_at          timestamptz,
    created_at          timestamptz,
    version             bigint,
    registered_at       timestamptz
)

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrNoEventsToStore       = errors.New("no events to store")
	ErrNoSnapshotFound       = errors.New("no snapshot found")
	ErrTransactionIsRequired = errors.New("transaction is required")
	ErrAggregateDoNotExist   = errors.New("aggregate do not exist")
)
View Source
var (
	ErrAssertionFailed = errors.New("assertion failed")
)

Functions

func AssertAndGet added in v0.3.0

func AssertAndGet[H Aggregate, W Aggregate](have H, want W) (W, error)

func ErrIsSnapshotNotFound

func ErrIsSnapshotNotFound(err error) bool

func FromSnapshot added in v0.3.0

func FromSnapshot(snapshot *Snapshot, a Aggregate)

func MarshalES added in v0.3.0

func MarshalES(object any) ([]byte, error)

func On

func On(ctx context.Context, a Aggregate, event Event, new bool)

func Raise

func Raise(ctx context.Context, aggregate Aggregate, changes ...Event)

func Sort

func Sort(ee []Event)

func UnmarshalES added in v0.3.0

func UnmarshalES(b []byte, object any) error

Types

type Aggregate added in v0.3.0

type Aggregate interface {
	ID() AggregateID
	Type() AggregateType
	Version() AggregateVersion
	Changes() []Event
	StackChange(change Event)
	StackSnapshot(snapshot *Snapshot)
	StackedSnapshots() []*Snapshot
	SnapshotsWithFrequency(frequency int) []*Snapshot
	SetVersion(version AggregateVersion)
	IncrementVersion()
	PrepareForLoading()
	ParseEvents(context.Context, ...EventReadModel) []Event
}

func Replay added in v0.3.0

func Replay(ctx context.Context, a Aggregate, snapshot *Snapshot, ee ...Event) (Aggregate, error)

type AggregateID

type AggregateID string

func (AggregateID) IsZero

func (id AggregateID) IsZero() bool

func (AggregateID) String

func (id AggregateID) String() string

type AggregateType

type AggregateType string

func (AggregateType) IsZero

func (t AggregateType) IsZero() bool

func (AggregateType) String

func (t AggregateType) String() string

type AggregateVersion

type AggregateVersion int

func (AggregateVersion) Int added in v0.3.0

func (v AggregateVersion) Int() int

func (AggregateVersion) Int64

func (v AggregateVersion) Int64() int64

func (AggregateVersion) IsZero

func (v AggregateVersion) IsZero() bool

func (AggregateVersion) Next added in v0.3.0

type BaseAggregate

type BaseAggregate struct {
	// contains filtered or unexported fields
}

func InitAggregate added in v0.3.0

func InitAggregate(id string, t AggregateType) *BaseAggregate

func (*BaseAggregate) Changes

func (a *BaseAggregate) Changes() []Event

func (*BaseAggregate) ID

func (a *BaseAggregate) ID() AggregateID

func (*BaseAggregate) IncrementVersion

func (a *BaseAggregate) IncrementVersion()

func (*BaseAggregate) PrepareForLoading added in v0.4.0

func (a *BaseAggregate) PrepareForLoading()

func (*BaseAggregate) SetVersion

func (a *BaseAggregate) SetVersion(version AggregateVersion)

func (*BaseAggregate) SnapshotsWithFrequency

func (a *BaseAggregate) SnapshotsWithFrequency(frequency int) []*Snapshot

func (*BaseAggregate) StackChange

func (a *BaseAggregate) StackChange(change Event)

func (*BaseAggregate) StackSnapshot

func (a *BaseAggregate) StackSnapshot(snapshot *Snapshot)

func (*BaseAggregate) StackedSnapshots

func (a *BaseAggregate) StackedSnapshots() []*Snapshot

func (*BaseAggregate) Type added in v0.3.0

func (a *BaseAggregate) Type() AggregateType

func (*BaseAggregate) Version

func (a *BaseAggregate) Version() AggregateVersion

type BaseEvent

type BaseEvent struct {
	// contains filtered or unexported fields
}

func NewBaseEvent added in v0.3.0

func NewBaseEvent(from Aggregate, metadata Metadata) *BaseEvent

func (*BaseEvent) AggregateID

func (e *BaseEvent) AggregateID() AggregateID

func (*BaseEvent) AggregateType

func (e *BaseEvent) AggregateType() AggregateType

func (*BaseEvent) AggregateVersion

func (e *BaseEvent) AggregateVersion() AggregateVersion

func (*BaseEvent) ID

func (e *BaseEvent) ID() EventID

func (*BaseEvent) Metadata

func (e *BaseEvent) Metadata() Metadata

func (*BaseEvent) OccurredAt

func (e *BaseEvent) OccurredAt() time.Time

func (*BaseEvent) SetVersion

func (e *BaseEvent) SetVersion(version AggregateVersion)

func (*BaseEvent) String

func (e *BaseEvent) String() string

type Event

type Event interface {
	fmt.Stringer
	ApplyTo(ctx context.Context, aggregate Aggregate) // ApplyTo applies the event to the aggregate
	ID() EventID                                      // ID returns the id of the event.
	Type() EventType                                  // Type returns the type of the event.
	OccurredAt() time.Time                            // OccurredAt of when the event was created.
	AggregateID() AggregateID                         // AggregateID is the id of the aggregate that the event belongs to.
	AggregateType() AggregateType                     // AggregateType is the type of the aggregate that the event can be applied to.
	AggregateVersion() AggregateVersion               // AggregateVersion is the version of the aggregate after the event has been applied.
	SetVersion(AggregateVersion)                      // SetVersion sets the aggregate version of the event
	Metadata() Metadata                               // Metadata is app-specific metadata such as request AggregateID, originating user etc.
}

type EventID

type EventID string

func NewEventID added in v0.3.0

func NewEventID() EventID

func (EventID) IsZero

func (id EventID) IsZero() bool

func (EventID) String

func (id EventID) String() string

type EventReadModel

type EventReadModel struct {
	ID               EventID                `json:"id"`
	Type             EventType              `json:"type"`
	OccurredAt       time.Time              `json:"occurred_at"`
	AggregateID      AggregateID            `json:"aggregate_id"`
	AggregateType    AggregateType          `json:"aggregate_type"`
	AggregateVersion AggregateVersion       `json:"aggregate_version"`
	Metadata         map[string]interface{} `json:"metadata"`
	Data             json.RawMessage        `json:"data"`
}

func (*EventReadModel) InitBaseEvent

func (r *EventReadModel) InitBaseEvent() *BaseEvent

type EventStore

type EventStore interface {
	Save(ctx context.Context, tx Transaction, a Aggregate, opts ...SaveOption) error
	Load(ctx context.Context, tx Transaction, a Aggregate) (Aggregate, error)
	EventsHistory(ctx context.Context, tx Transaction, aggregateID, aggregateType string, fromVersion int, limit int) ([]EventReadModel, error)
}

type EventType

type EventType string

func (EventType) IsZero

func (t EventType) IsZero() bool

func (EventType) String

func (t EventType) String() string

type Marshaler added in v0.3.0

type Marshaler interface {
	MarshalES() ([]byte, error)
}

type Metadata

type Metadata map[string]interface{}

func NewMetadata added in v0.3.0

func NewMetadata() Metadata

func (Metadata) Add added in v0.3.0

func (m Metadata) Add(key string, value interface{}) Metadata

type SaveOption added in v0.3.0

type SaveOption func(*SaveOptions)

func WithSnapshot added in v0.3.0

func WithSnapshot(frequency int) SaveOption

type SaveOptions

type SaveOptions struct {
	WithSnapshot          bool
	WithSnapshotFrequency int
}

func NewSaveOptions added in v0.3.0

func NewSaveOptions(opts ...SaveOption) *SaveOptions

type Snapshot

type Snapshot struct {
	AggregateID      AggregateID
	AggregateType    AggregateType
	AggregateVersion AggregateVersion
	TakenAt          time.Time
	Data             []byte
}

func NewSnapshot added in v0.3.0

func NewSnapshot(a Aggregate) (*Snapshot, error)

type Transaction

type Transaction interface {
	Commit() error
	Rollback() error
}

type Unmarshaler added in v0.8.0

type Unmarshaler interface {
	UnmarshalES(b []byte, object any) error
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL