es

package
v0.0.0-...-10ef84c Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 17, 2023 License: MIT Imports: 7 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrAlreadyExists       = errors.New("Already exists")
	ErrAggregateNotFound   = errors.New("aggregate not found")
	ErrInvalidEventType    = errors.New("invalid event type")
	ErrInvalidCommandType  = errors.New("invalid command type")
	ErrInvalidAggregate    = errors.New("invalid aggregate")
	ErrInvalidAggregateID  = errors.New("invalid aggregate id")
	ErrInvalidEventVersion = errors.New("invalid event version")
)

Functions

This section is empty.

Types

type Aggregate

type Aggregate interface {
	When
	AggregateRoot
}

type AggregateBase

type AggregateBase struct {
	ID                string
	Version           int64
	AppliedEvents     []Event
	UncommittedEvents []Event
	Type              AggregateType
	// contains filtered or unexported fields
}

AggregateBase base aggregate contains all main necessary fields

func NewAggregateBase

func NewAggregateBase(when when) *AggregateBase

NewAggregateBase AggregateBase constructor, contains all main fields and methods, main aggregate must realize When interface and pass as argument to constructor Example of recommended aggregate constructor method:

func NewOrderAggregate() *OrderAggregate {
	orderAggregate := &OrderAggregate{
		Order: models.NewOrder(),
	}
	base := es.NewAggregateBase(orderAggregate.When)
	base.SetType(OrderAggregateType)
	orderAggregate.AggregateBase = base
	return orderAggregate
}

func (*AggregateBase) Apply

func (a *AggregateBase) Apply(event Event) error

Apply push event to aggregate uncommitted events using When method

func (*AggregateBase) ClearUncommittedEvents

func (a *AggregateBase) ClearUncommittedEvents()

ClearUncommittedEvents clear AggregateBase uncommitted Event's

func (*AggregateBase) GetAppliedEvents

func (a *AggregateBase) GetAppliedEvents() []Event

GetAppliedEvents get AggregateBase applied Event's

func (*AggregateBase) GetID

func (a *AggregateBase) GetID() string

GetID get AggregateBase ID

func (*AggregateBase) GetType

func (a *AggregateBase) GetType() AggregateType

GetType get AggregateBase AggregateType

func (*AggregateBase) GetUncommittedEvents

func (a *AggregateBase) GetUncommittedEvents() []Event

GetUncommittedEvents get AggregateBase uncommitted Event's

func (*AggregateBase) GetVersion

func (a *AggregateBase) GetVersion() int64

GetVersion get AggregateBase version

func (*AggregateBase) Load

func (a *AggregateBase) Load(events []Event) error

Load add existing events from event store to aggregate using When interface method

func (*AggregateBase) RaiseEvent

func (a *AggregateBase) RaiseEvent(event Event) error

RaiseEvent push event to aggregate applied events using When method, used for load directly from eventstore

func (*AggregateBase) SetAppliedEvents

func (a *AggregateBase) SetAppliedEvents(events []Event)

SetAppliedEvents set AggregateBase applied Event's

func (*AggregateBase) SetID

func (a *AggregateBase) SetID(id string) *AggregateBase

SetID set AggregateBase ID

func (*AggregateBase) SetType

func (a *AggregateBase) SetType(aggregateType AggregateType)

SetType set AggregateBase AggregateType

func (*AggregateBase) String

func (a *AggregateBase) String() string

func (*AggregateBase) ToSnapshot

func (a *AggregateBase) ToSnapshot()

ToSnapshot prepare AggregateBase for saving Snapshot.

type AggregateRoot

type AggregateRoot interface {
	GetUncommittedEvents() []Event
	GetID() string
	SetID(id string) *AggregateBase
	GetVersion() int64
	ClearUncommittedEvents()
	ToSnapshot()
	SetType(aggregateType AggregateType)
	GetType() AggregateType
	SetAppliedEvents(events []Event)
	GetAppliedEvents() []Event
	RaiseEvent(event Event) error
	String() string
	Load
	Apply
}

AggregateRoot contains all methods of AggregateBase

type AggregateStore

type AggregateStore interface {
	// Load loads the most recent version of an aggregate to provided  into params aggregate with a type and id.
	Load(ctx context.Context, aggregate Aggregate) error

	// Save saves the uncommitted events for an aggregate.
	Save(ctx context.Context, aggregate Aggregate) error

	// Exists check aggregate exists by id.
	Exists(ctx context.Context, streamID string) error
}

AggregateStore is responsible for loading and saving aggregates.

type AggregateType

type AggregateType string

AggregateType type of the Aggregate

type Apply

type Apply interface {
	Apply(event Event) error
}

Apply process Aggregate Event

type BaseCommand

type BaseCommand struct {
	AggregateID string `json:"aggregateID" validate:"required,gte=0"`
}

func NewBaseCommand

func NewBaseCommand(aggregateID string) BaseCommand

func (*BaseCommand) GetAggregateID

func (c *BaseCommand) GetAggregateID() string

type Command

type Command interface {
	GetAggregateID() string
}

Command commands interface for event sourcing.

type Config

type Config struct {
	SnapshotFrequency int64 `json:"snapshotFrequency" validate:"required,gte=0"`
}

Config of es package.

type Event

type Event struct {
	EventID       string
	EventType     string
	Data          []byte
	Timestamp     time.Time
	AggregateType AggregateType
	AggregateID   string
	Version       int64
	Metadata      []byte
}

Event is an internal representation of an event, returned when the Aggregate uses NewEvent to create a new event. The events loaded from the db is represented by each DBs internal event type, implementing Event.

func EventFromEventData

func EventFromEventData(recordedEvent esdb.RecordedEvent) (Event, error)

func NewBaseEvent

func NewBaseEvent(aggregate Aggregate, eventType string) Event

NewBaseEvent new base Event constructor with configured EventID, Aggregate properties and Timestamp.

func NewEventFromEventData

func NewEventFromEventData(event esdb.EventData) Event

func NewEventFromRecorded

func NewEventFromRecorded(event *esdb.RecordedEvent) Event

func (*Event) GetAggregateID

func (e *Event) GetAggregateID() string

GetAggregateID is the ID of the Aggregate that the Event belongs to

func (*Event) GetAggregateType

func (e *Event) GetAggregateType() AggregateType

GetAggregateType is the AggregateType that the Event can be applied to.

func (*Event) GetData

func (e *Event) GetData() []byte

GetData The data attached to the Event serialized to bytes.

func (*Event) GetEventID

func (e *Event) GetEventID() string

GetEventID get EventID of the Event.

func (*Event) GetEventType

func (e *Event) GetEventType() string

GetEventType returns the EventType of the event.

func (*Event) GetJsonData

func (e *Event) GetJsonData(data interface{}) error

GetJsonData json unmarshal data attached to the Event.

func (*Event) GetJsonMetadata

func (e *Event) GetJsonMetadata(metaData interface{}) error

GetJsonMetadata unmarshal app-specific metadata serialized as json for the Event.

func (*Event) GetMetadata

func (e *Event) GetMetadata() []byte

GetMetadata is app-specific metadata such as request ID, originating user etc.

func (*Event) GetString

func (e *Event) GetString() string

GetString A string representation of the Event.

func (*Event) GetTimeStamp

func (e *Event) GetTimeStamp() time.Time

GetTimeStamp get timestamp of the Event.

func (*Event) GetVersion

func (e *Event) GetVersion() int64

GetVersion is the version of the Aggregate after the Event has been applied.

func (*Event) SetAggregateType

func (e *Event) SetAggregateType(aggregateType AggregateType)

SetAggregateType set the AggregateType that the Event can be applied to.

func (*Event) SetData

func (e *Event) SetData(data []byte) *Event

SetData add the data attached to the Event serialized to bytes.

func (*Event) SetJsonData

func (e *Event) SetJsonData(data interface{}) error

SetJsonData serialize to json and set data attached to the Event.

func (*Event) SetMetadata

func (e *Event) SetMetadata(metaData interface{}) error

SetMetadata add app-specific metadata serialized as json for the Event.

func (*Event) SetVersion

func (e *Event) SetVersion(aggregateVersion int64)

SetVersion set the version of the Aggregate.

func (*Event) String

func (e *Event) String() string

func (*Event) ToEventData

func (e *Event) ToEventData() esdb.EventData

type EventStore

type EventStore interface {
	// SaveEvents appends all events in the event stream to the store.
	SaveEvents(ctx context.Context, streamID string, events []Event) error

	// LoadEvents loads all events for the aggregate id from the store.
	LoadEvents(ctx context.Context, streamID string) ([]Event, error)
}

EventStore is an interface for an event sourcing event store.

type EventType

type EventType string

EventType is the type of any event, used as its unique identifier.

type HandleCommand

type HandleCommand interface {
	HandleCommand(ctx context.Context, command Command) error
}

HandleCommand Aggregate commands' handler method Example

func (a *OrderAggregate) HandleCommand(command interface{}) error {
	switch c := command.(type) {
	case *CreateOrderCommand:
		return a.handleCreateOrderCommand(c)
	case *OrderPaidCommand:
		return a.handleOrderPaidCommand(c)
	case *SubmitOrderCommand:
		return a.handleSubmitOrderCommand(c)
	default:
		return errors.New("invalid command type")
	}
}

type Load

type Load interface {
	Load(events []Event) error
}

Load create Aggregate state from Event's.

type Projection

type Projection interface {
	When(ctx context.Context, evt Event) error
}

Projection When method works and process Event's like Aggregate's for interacting with read database.

type Snapshot

type Snapshot struct {
	ID      string        `json:"id"`
	Type    AggregateType `json:"type"`
	State   []byte        `json:"state"`
	Version uint64        `json:"version"`
}

Snapshot Event Sourcing Snapshotting is an optimisation that reduces time spent on reading event from an event store.

func NewSnapshotFromAggregate

func NewSnapshotFromAggregate(aggregate Aggregate) (*Snapshot, error)

NewSnapshotFromAggregate create new snapshot from the Aggregate state.

type SnapshotStore

type SnapshotStore interface {
	// SaveSnapshot save aggregate snapshot.
	SaveSnapshot(ctx context.Context, aggregate Aggregate) error

	// GetSnapshot load aggregate snapshot.
	GetSnapshot(ctx context.Context, id string) (*Snapshot, error)
}

SnapshotStore is an interface for an event sourcing snapshot store.

type When

type When interface {
	When(event Event) error
}

When process and update aggregate state on specified es.Event type Example:

func (a *OrderAggregate) When(evt es.Event) error {

	switch evt.GetEventType() {

	case events.OrderCreated:
		var eventData events.OrderCreatedEvent
		if err := json.Unmarshal(evt.GetData(), &eventData); err != nil {
			return err
		}
		a.Order.ItemsIDs = eventData.ItemsIDs
		a.Order.Created = true
		return nil

	default:
		return errors.New("invalid event type")
	}
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL