eventstore

package
v0.0.0-...-064cf75 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 20, 2024 License: Apache-2.0 Imports: 11 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// Package-level error values.
var (
	// ErrInvalidEventType is a zero-value InvalidEventTypeError (its EventType
	// field is empty), not an errors.New sentinel like the entries below.
	ErrInvalidEventType    = InvalidEventTypeError{}
	ErrAggregateNotFound   = errors.New("aggregate not found")
	ErrInvalidCommandType  = errors.New("invalid command type")
	ErrInvalidRequestType  = errors.New("invalid request type")
	ErrInvalidAggregate    = errors.New("invalid aggregate")
	ErrInvalidAggregateID  = errors.New("invalid aggregate id")
	ErrInvalidEventVersion = errors.New("invalid event version")
)

Functions

func IsAggregateNotFound

func IsAggregateNotFound(aggregate Aggregate) bool

func IsEventStoreErrorCodeResourceAlreadyExists

func IsEventStoreErrorCodeResourceAlreadyExists(err error) bool

func IsEventStoreErrorCodeResourceNotFound

func IsEventStoreErrorCodeResourceNotFound(err error) bool

func IsEventStoreErrorCodeWrongExpectedVersion

func IsEventStoreErrorCodeWrongExpectedVersion(err error) bool

Types

type Aggregate

// Aggregate is the full aggregate contract: the When event handler, the
// AggregateRoot method set, and a gRPC request entry point.
type Aggregate interface {
	// When is embedded and contributes When(event Event) error.
	When
	// AggregateRoot is embedded and contributes the accessor, event and
	// stream-metadata methods.
	AggregateRoot
	// HandleGRPCRequest receives an untyped request together with a params
	// map and returns an untyped result or an error.
	// NOTE(review): the concrete request and result types are presumably
	// chosen by each implementation — confirm.
	HandleGRPCRequest(ctx context.Context, request any, params map[string]any) (any, error)
}

type AggregateBase

type AggregateBase struct {
	// ID is the aggregate ID (see GetID and SetID).
	ID                string
	// Tenant is the aggregate tenant (see GetTenant).
	Tenant            string
	// Version is the aggregate version (see GetVersion and SetVersion).
	Version           int64
	// AppliedEvents are the applied events (see GetAppliedEvents,
	// SetAppliedEvents and RaiseEvent).
	AppliedEvents     []Event
	// UncommittedEvents are the events pushed by Apply that have not yet been
	// cleared by ClearUncommittedEvents.
	UncommittedEvents []Event
	// Type is the aggregate type (see GetType and SetType).
	Type              AggregateType
	// contains filtered or unexported fields
}

AggregateBase is the base aggregate; it contains all the main necessary fields.

func NewAggregateBase

func NewAggregateBase(when when) *AggregateBase

func (*AggregateBase) Apply

func (a *AggregateBase) Apply(event Event) error

Apply pushes the event onto the aggregate's uncommitted events using the When method.

func (*AggregateBase) ApplyAll

func (a *AggregateBase) ApplyAll(events []Event) error

func (*AggregateBase) ClearUncommittedEvents

func (a *AggregateBase) ClearUncommittedEvents()

ClearUncommittedEvents clears the AggregateBase's uncommitted Events.

func (*AggregateBase) GetAppliedEvents

func (a *AggregateBase) GetAppliedEvents() []Event

GetAppliedEvents returns the AggregateBase's applied Events.

func (*AggregateBase) GetID

func (a *AggregateBase) GetID() string

GetID returns the AggregateBase ID.

func (*AggregateBase) GetStreamMetadata

func (a *AggregateBase) GetStreamMetadata() *esdb.StreamMetadata

func (*AggregateBase) GetTenant

func (a *AggregateBase) GetTenant() string

GetTenant returns the AggregateBase Tenant.

func (*AggregateBase) GetType

func (a *AggregateBase) GetType() AggregateType

GetType returns the AggregateBase AggregateType.

func (*AggregateBase) GetUncommittedEvents

func (a *AggregateBase) GetUncommittedEvents() []Event

GetUncommittedEvents returns the AggregateBase's uncommitted Events.

func (*AggregateBase) GetVersion

func (a *AggregateBase) GetVersion() int64

GetVersion returns the AggregateBase version.

func (*AggregateBase) IsTemporal

func (a *AggregateBase) IsTemporal() bool

func (*AggregateBase) IsWithAppliedEvents

func (a *AggregateBase) IsWithAppliedEvents() bool

func (*AggregateBase) Load

func (a *AggregateBase) Load(events []Event) error

Load adds existing events from the event store to the aggregate using the When interface method.

func (*AggregateBase) PrepareStreamMetadata

func (a *AggregateBase) PrepareStreamMetadata() esdb.StreamMetadata

func (*AggregateBase) RaiseEvent

func (a *AggregateBase) RaiseEvent(event Event) error

RaiseEvent pushes the event onto the aggregate's applied events using the When method; it is used when loading directly from the event store.

func (*AggregateBase) SetAppliedEvents

func (a *AggregateBase) SetAppliedEvents(events []Event)

SetAppliedEvents sets the AggregateBase's applied Events.

func (*AggregateBase) SetID

func (a *AggregateBase) SetID(id string) *AggregateBase

SetID sets the AggregateBase ID.

func (*AggregateBase) SetStreamMetadata

func (a *AggregateBase) SetStreamMetadata(streamMetadata *esdb.StreamMetadata)

func (*AggregateBase) SetType

func (a *AggregateBase) SetType(aggregateType AggregateType)

SetType sets the AggregateBase AggregateType.

func (*AggregateBase) SetVersion

func (a *AggregateBase) SetVersion(version int64)

SetVersion sets the AggregateBase version.

func (*AggregateBase) String

func (a *AggregateBase) String() string

func (*AggregateBase) ToSnapshot

func (a *AggregateBase) ToSnapshot()

ToSnapshot prepares the AggregateBase for saving a Snapshot.

func (*AggregateBase) WithAppliedEvents

func (a *AggregateBase) WithAppliedEvents()

type AggregateRoot

type AggregateRoot interface {
	GetUncommittedEvents() []Event
	GetTenant() string
	GetID() string
	// SetID returns the concrete *AggregateBase, so this interface is tied to
	// the AggregateBase type rather than to an arbitrary implementation.
	SetID(id string) *AggregateBase
	GetVersion() int64
	SetVersion(version int64)
	ClearUncommittedEvents()
	ToSnapshot()
	SetType(aggregateType AggregateType)
	GetType() AggregateType
	SetAppliedEvents(events []Event)
	GetAppliedEvents() []Event
	RaiseEvent(event Event) error
	String() string
	IsWithAppliedEvents() bool
	IsTemporal() bool
	SetStreamMetadata(streamMetadata *esdb.StreamMetadata)
	GetStreamMetadata() *esdb.StreamMetadata
	// PrepareStreamMetadata returns esdb.StreamMetadata by value, whereas the
	// getter and setter above use a pointer.
	PrepareStreamMetadata() esdb.StreamMetadata
	// Embedded interfaces: Load contributes Load(events); Apply contributes
	// Apply(event) and ApplyAll(events).
	Load
	Apply
}

AggregateRoot contains all methods of AggregateBase

type AggregateStore

type AggregateStore interface {
	// Load loads the most recent version of an aggregate into the aggregate
	// provided in the params, which carries the type and id to load.
	Load(ctx context.Context, aggregate Aggregate) error

	// LoadVersion is documented upstream with exactly the same wording as Load.
	// NOTE(review): presumably it loads only the aggregate's version rather
	// than its full state — confirm against the implementation.
	LoadVersion(ctx context.Context, aggregate Aggregate) error

	// Save saves the uncommitted events for an aggregate.
	Save(ctx context.Context, aggregate Aggregate) error

	// Exists checks whether an aggregate exists for the given stream ID.
	// NOTE(review): it returns only an error, so presumably a nil error means
	// the stream exists — confirm against the implementation.
	Exists(ctx context.Context, streamID string) error

	// UpdateStreamMetadata updates the stream metadata for the aggregate.
	UpdateStreamMetadata(ctx context.Context, streamID string, streamMetadata esdb.StreamMetadata) error
}

AggregateStore is responsible for loading and saving aggregates.

type AggregateType

// AggregateType is a string-based type used by AggregateBase.Type,
// Event.AggregateType and Snapshot.Type.
type AggregateType string

AggregateType is the type of the Aggregate.

type Apply

type Apply interface {
	// Apply processes a single Event and reports any failure as an error.
	Apply(event Event) error
	// ApplyAll takes a slice of Events.
	// NOTE(review): presumably it calls Apply for each event in order — confirm.
	ApplyAll(events []Event) error
}

Apply processes Aggregate Events.

type BaseCommand

// BaseCommand holds the fields shared by commands. Its GetObjectID and
// GetTenant methods match the method set of the Command interface.
type BaseCommand struct {
	// ObjectID is serialized as "objectID" and tagged validate:"required".
	ObjectID       string `json:"objectID" validate:"required"`
	// Tenant is serialized as "tenant" and tagged validate:"required".
	Tenant         string `json:"tenant" validate:"required"`
	// LoggedInUserId is serialized as "loggedInUserId"; it carries no validate tag.
	LoggedInUserId string `json:"loggedInUserId"`
	// AppSource is not a NewBaseCommand parameter; WithAppSource accepts it.
	AppSource      string `json:"appSource"`
}

func NewBaseCommand

func NewBaseCommand(objectID, tenant, loggedInUserId string) BaseCommand

func (BaseCommand) GetAppSource

func (c BaseCommand) GetAppSource() string

func (BaseCommand) GetLoggedInUserId

func (c BaseCommand) GetLoggedInUserId() string

func (BaseCommand) GetObjectID

func (c BaseCommand) GetObjectID() string

func (BaseCommand) GetTenant

func (c BaseCommand) GetTenant() string

func (BaseCommand) WithAppSource

func (c BaseCommand) WithAppSource(appSource string) BaseCommand

type BaseRequest

// BaseRequest has the same fields as BaseCommand plus SourceFields.
//
// Deprecated: the upstream documentation marks this type as deprecated
// without naming a replacement.
type BaseRequest struct {
	// ObjectID is serialized as "objectID" and tagged validate:"required".
	ObjectID       string             `json:"objectID" validate:"required"`
	// Tenant is serialized as "tenant" and tagged validate:"required".
	Tenant         string             `json:"tenant" validate:"required"`
	// LoggedInUserId is serialized as "loggedInUserId"; it carries no validate tag.
	LoggedInUserId string             `json:"loggedInUserId"`
	// AppSource is not a NewBaseRequest parameter.
	AppSource      string             `json:"appSource"`
	// SourceFields is supplied through NewBaseRequest's sourceFields parameter.
	SourceFields   commonmodel.Source `json:"sourceFields"`
}

Deprecated

func NewBaseRequest

func NewBaseRequest(objectID, tenant, loggedInUserId string, sourceFields commonmodel.Source) BaseRequest

type Command

// Command is the minimal command contract: accessors for the object ID and
// the tenant. BaseCommand declares both methods.
type Command interface {
	GetObjectID() string
	GetTenant() string
}

type Event

type Event struct {
	// EventID is the ID of the Event (see GetEventID).
	EventID       string
	// EventType is the type of the event (see GetEventType); it is a plain
	// string, not the EventType named type.
	EventType     string
	// Data is the data attached to the Event, serialized to bytes.
	Data          []byte
	// Timestamp is the timestamp of the Event (see GetTimeStamp).
	Timestamp     time.Time
	// AggregateType is the AggregateType that the Event can be applied to.
	AggregateType AggregateType
	// AggregateID is the ID of the Aggregate that the Event belongs to.
	AggregateID   string
	// Version is the version of the Aggregate after the Event has been applied.
	Version       int64
	// Metadata is app-specific metadata such as request ID or originating user.
	Metadata      []byte
}

Event is an internal representation of an event, returned when the Aggregate uses NewEvent to create a new event. The events loaded from the db are represented by each DB's internal event type, implementing Event.

func EventFromEventData

func EventFromEventData(recordedEvent esdb.RecordedEvent) (Event, error)

func NewBaseEvent

func NewBaseEvent(aggregate Aggregate, eventType string) Event

NewBaseEvent constructs a new base Event with a configured EventID, Aggregate properties, and Timestamp.

func NewEventFromEventData

func NewEventFromEventData(event esdb.EventData) Event

func NewEventFromRecorded

func NewEventFromRecorded(event *esdb.RecordedEvent) Event

func (*Event) GetAggregateID

func (e *Event) GetAggregateID() string

GetAggregateID is the ID of the Aggregate that the Event belongs to

func (*Event) GetAggregateType

func (e *Event) GetAggregateType() AggregateType

GetAggregateType is the AggregateType that the Event can be applied to.

func (*Event) GetData

func (e *Event) GetData() []byte

GetData returns the data attached to the Event, serialized to bytes.

func (*Event) GetEventID

func (e *Event) GetEventID() string

GetEventID returns the EventID of the Event.

func (*Event) GetEventType

func (e *Event) GetEventType() string

GetEventType returns the EventType of the event.

func (*Event) GetJsonData

func (e *Event) GetJsonData(data interface{}) error

GetJsonData unmarshals the JSON data attached to the Event into data.

func (*Event) GetJsonMetadata

func (e *Event) GetJsonMetadata(metaData interface{}) error

GetJsonMetadata unmarshals the Event's app-specific metadata, serialized as JSON, into metaData.

func (*Event) GetMetadata

func (e *Event) GetMetadata() []byte

GetMetadata is app-specific metadata such as request ID, originating user etc.

func (*Event) GetString

func (e *Event) GetString() string

GetString returns a string representation of the Event.

func (*Event) GetTimeStamp

func (e *Event) GetTimeStamp() time.Time

GetTimeStamp returns the timestamp of the Event.

func (*Event) GetVersion

func (e *Event) GetVersion() int64

GetVersion is the version of the Aggregate after the Event has been applied.

func (*Event) SetAggregateType

func (e *Event) SetAggregateType(aggregateType AggregateType)

SetAggregateType sets the AggregateType that the Event can be applied to.

func (*Event) SetData

func (e *Event) SetData(data []byte) *Event

SetData sets the data attached to the Event, serialized to bytes.

func (*Event) SetJsonData

func (e *Event) SetJsonData(data interface{}) error

SetJsonData serializes data to JSON and sets it as the data attached to the Event.

func (*Event) SetMetadata

func (e *Event) SetMetadata(metaData interface{}) error

SetMetadata sets app-specific metadata, serialized as JSON, for the Event.

func (*Event) SetVersion

func (e *Event) SetVersion(aggregateVersion int64)

SetVersion sets the version of the Aggregate.

func (*Event) String

func (e *Event) String() string

func (*Event) ToEventData

func (e *Event) ToEventData() esdb.EventData

type EventBufferService

// EventBufferService is built by NewEventBufferService from a
// postgresRepository.EventBufferRepository; all of its state is unexported.
type EventBufferService struct {
	// contains filtered or unexported fields
}

func NewEventBufferService

func NewEventBufferService(eventBufferRepository postgresRepository.EventBufferRepository) *EventBufferService

func (*EventBufferService) Delete

func (eb *EventBufferService) Delete(eventBuffer *postgresEntity.EventBuffer) error

func (*EventBufferService) GetById

func (*EventBufferService) Park

func (eb *EventBufferService) Park(
	evt Event,
	tenant string,
	uuid string,
	expiryTimestamp time.Time,
) error

func (*EventBufferService) Update

func (eb *EventBufferService) Update(eventBuffer *postgresEntity.EventBuffer) error

type EventStore

type EventStore interface {
	// SaveEvents appends all the given events to the stream identified by
	// streamID in the store.
	SaveEvents(ctx context.Context, streamID string, events []Event) error

	// LoadEvents loads all events for the given stream ID from the store.
	LoadEvents(ctx context.Context, streamID string) ([]Event, error)
}

EventStore is an interface for an event sourcing event store.

type EventType

// EventType is a string-based event type identifier. The Event.EventType
// field and Event.GetEventType use plain string rather than this type.
type EventType string

EventType is the type of any event, used as its unique identifier.

type InvalidEventTypeError

// InvalidEventTypeError is an error type (it has an Error method); the
// package-level ErrInvalidEventType is its zero value.
type InvalidEventTypeError struct {
	// EventType is the event type carried by the error.
	// NOTE(review): presumably the type name that was rejected — confirm.
	EventType string
}

func (InvalidEventTypeError) Error

func (e InvalidEventTypeError) Error() string

type Load

type Load interface {
	// Load takes a slice of Events and reports any failure as an error.
	Load(events []Event) error
}

Load creates Aggregate state from Events.

type LoadAggregateOptions

// LoadAggregateOptions holds two boolean switches for loading an aggregate.
type LoadAggregateOptions struct {
	// Required: NOTE(review): presumably set by
	// NewLoadAggregateOptionsWithRequired and used to require that the
	// aggregate exists — confirm.
	Required       bool
	// SkipLoadEvents: NOTE(review): presumably set by WithSkipLoadEvents to
	// skip loading the aggregate's events — confirm.
	SkipLoadEvents bool
}

func NewLoadAggregateOptions

func NewLoadAggregateOptions() *LoadAggregateOptions

func NewLoadAggregateOptionsWithRequired

func NewLoadAggregateOptionsWithRequired() *LoadAggregateOptions

func (*LoadAggregateOptions) WithSkipLoadEvents

func (o *LoadAggregateOptions) WithSkipLoadEvents() *LoadAggregateOptions

type Projection

type Projection interface {
	// When processes one Event. Unlike the When interface's method, it also
	// takes a context.
	When(ctx context.Context, evt Event) error
}

Projection's When method processes Events the way an Aggregate's does, for interacting with the read database.

type RecordedBaseEvent

// RecordedBaseEvent holds the base fields of a recorded event. It is built
// from an *esdb.RecordedEvent by NewRecordedBaseEventFromRecorded.
type RecordedBaseEvent struct {
	// Event's id.
	EventID string
	// Event's type.
	EventType string
	// Event's content type.
	ContentType string
	// The stream that event belongs to.
	StreamID string
	// The event's revision number.
	EventNumber uint64
	// The event's transaction log position.
	Position esdb.Position
	// When the event was created.
	CreatedDate time.Time
}

func NewRecordedBaseEventFromRecorded

func NewRecordedBaseEventFromRecorded(recorded *esdb.RecordedEvent) RecordedBaseEvent

type Snapshot

type Snapshot struct {
	// ID is serialized as "id".
	// NOTE(review): presumably the aggregate ID — confirm.
	ID      string        `json:"id"`
	// Type is the AggregateType, serialized as "type".
	Type    AggregateType `json:"type"`
	// State is a byte slice serialized as "state".
	// NOTE(review): presumably the serialized aggregate state — confirm.
	State   []byte        `json:"state"`
	// Version is a uint64 here, whereas AggregateBase.Version and
	// Event.Version are int64.
	Version uint64        `json:"version"`
}

Snapshot: Event Sourcing snapshotting is an optimisation that reduces the time spent reading events from an event store.

func NewSnapshotFromAggregate

func NewSnapshotFromAggregate(aggregate Aggregate) (*Snapshot, error)

NewSnapshotFromAggregate creates a new snapshot from the Aggregate state.

type SnapshotStore

type SnapshotStore interface {
	// SaveSnapshot saves a snapshot of the given aggregate.
	SaveSnapshot(ctx context.Context, aggregate Aggregate) error

	// GetSnapshot loads the aggregate snapshot with the given id.
	GetSnapshot(ctx context.Context, id string) (*Snapshot, error)
}

SnapshotStore is an interface for an event sourcing snapshot store.

type When

// When is the single-method event handler interface embedded in Aggregate.
// The documentation for AggregateBase's Apply, Load and RaiseEvent says they
// work through this method.
type When interface {
	// When handles one Event and reports any failure as an error.
	When(event Event) error
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL