Documentation ¶
Index ¶
- Variables
- func QuoteIdentifier(name string) string
- func QuoteString(str string) string
- type AdvisoryLockAggregateProjectionStorage
- func (a *AdvisoryLockAggregateProjectionStorage) Acquire(ctx context.Context, conn *sql.Conn, ...) (driverSQL.ProjectorTransaction, int64, error)
- func (a *AdvisoryLockAggregateProjectionStorage) LoadOutOfSync(ctx context.Context, conn driverSQL.Queryer) (*sql.Rows, error)
- func (a *AdvisoryLockAggregateProjectionStorage) PersistFailure(conn driverSQL.Execer, notification *driverSQL.ProjectionNotification) error
- type AdvisoryLockStreamProjectionStorage
- type ConjoinedEventStore
- type ConjoinedMessageHandler
- type EventStore
- func (e *EventStore) AppendTo(ctx context.Context, streamName goengine.StreamName, ...) error
- func (e *EventStore) AppendToWithExecer(ctx context.Context, conn driverSQL.Execer, streamName goengine.StreamName, ...) error
- func (e *EventStore) Create(ctx context.Context, streamName goengine.StreamName) error
- func (e *EventStore) HasStream(ctx context.Context, streamName goengine.StreamName) bool
- func (e *EventStore) Load(ctx context.Context, streamName goengine.StreamName, fromNumber int64, ...) (goengine.EventStream, error)
- func (e *EventStore) LoadWithConnection(ctx context.Context, conn driverSQL.Queryer, streamName goengine.StreamName, ...) (goengine.EventStream, error)
Constants ¶
This section is empty.
Variables ¶
var ( // ErrNoCreateTableQueries occurs when table create queries are not presented in the strategy ErrNoCreateTableQueries = errors.New("goengine: create table queries are not provided") // ErrTableAlreadyExists occurs when table cannot be created as it exists already ErrTableAlreadyExists = errors.New("goengine: table already exists") // ErrTableNameEmpty occurs when table cannot be created because it has an empty name ErrTableNameEmpty = errors.New("goengine: table name could not be empty") )
Functions ¶
func QuoteIdentifier ¶
QuoteIdentifier quotes an "identifier" (e.g. a table or a column name) to be used as part of an SQL statement.
Types ¶
type AdvisoryLockAggregateProjectionStorage ¶
type AdvisoryLockAggregateProjectionStorage struct {
// contains filtered or unexported fields
}
AdvisoryLockAggregateProjectionStorage is a AggregateProjectorStorage that uses a advisory locks to lock a projection
func NewAdvisoryLockAggregateProjectionStorage ¶
func NewAdvisoryLockAggregateProjectionStorage( eventStoreTable, projectionTable string, projectionStateSerialization driverSQL.ProjectionStateSerialization, useLockField bool, logger goengine.Logger, ) (*AdvisoryLockAggregateProjectionStorage, error)
NewAdvisoryLockAggregateProjectionStorage returns a new AdvisoryLockAggregateProjectionStorage
func (*AdvisoryLockAggregateProjectionStorage) Acquire ¶
func (a *AdvisoryLockAggregateProjectionStorage) Acquire( ctx context.Context, conn *sql.Conn, notification *driverSQL.ProjectionNotification, ) (driverSQL.ProjectorTransaction, int64, error)
Acquire returns a driverSQL.ProjectorTransaction and the position of the projection within the event stream when a lock is acquired for the specified aggregate_id. Otherwise an error is returned indicating why the lock could not be acquired.
func (*AdvisoryLockAggregateProjectionStorage) LoadOutOfSync ¶
func (a *AdvisoryLockAggregateProjectionStorage) LoadOutOfSync(ctx context.Context, conn driverSQL.Queryer) (*sql.Rows, error)
LoadOutOfSync return a set of rows with the aggregate_id and number of the projection that are not in sync with the event store
func (*AdvisoryLockAggregateProjectionStorage) PersistFailure ¶
func (a *AdvisoryLockAggregateProjectionStorage) PersistFailure(conn driverSQL.Execer, notification *driverSQL.ProjectionNotification) error
PersistFailure marks the specified aggregate_id projection as failed
type AdvisoryLockStreamProjectionStorage ¶
type AdvisoryLockStreamProjectionStorage struct {
// contains filtered or unexported fields
}
AdvisoryLockStreamProjectionStorage is a StreamProjectorStorage that uses a advisory locks to lock a projection
func NewAdvisoryLockStreamProjectionStorage ¶
func NewAdvisoryLockStreamProjectionStorage( projectionName, projectionTable string, projectionStateSerialization driverSQL.ProjectionStateSerialization, useLockField bool, logger goengine.Logger, ) (*AdvisoryLockStreamProjectionStorage, error)
NewAdvisoryLockStreamProjectionStorage returns a new AdvisoryLockStreamProjectionStorage
func (*AdvisoryLockStreamProjectionStorage) Acquire ¶
func (s *AdvisoryLockStreamProjectionStorage) Acquire( ctx context.Context, conn *sql.Conn, notification *driverSQL.ProjectionNotification, ) (driverSQL.ProjectorTransaction, int64, error)
Acquire returns a driverSQL.ProjectorTransaction and the position of the projection within the event stream when a lock is acquire for the specified aggregate_id. Otherwise an error is returned indicating why the lock could not be acquired.
func (*AdvisoryLockStreamProjectionStorage) CreateProjection ¶
func (s *AdvisoryLockStreamProjectionStorage) CreateProjection(ctx context.Context, conn driverSQL.Execer) error
CreateProjection creates the row in the projection table for the stream projection
type ConjoinedEventStore ¶
type ConjoinedEventStore struct { *EventStore // contains filtered or unexported fields }
ConjoinedEventStore a in postgres event store implementation which includes projection logic.
func NewConjoinedEventStore ¶
func NewConjoinedEventStore( eventstore *EventStore, resolver goengine.MessagePayloadResolver, handlers map[string]ConjoinedMessageHandler, ) (*ConjoinedEventStore, error)
NewConjoinedEventStore return a new postgres.ConjoinedEventStore
func (*ConjoinedEventStore) AppendTo ¶
func (e *ConjoinedEventStore) AppendTo(ctx context.Context, streamName goengine.StreamName, streamEvents []goengine.Message) error
AppendTo batch inserts Messages into the event stream table
type ConjoinedMessageHandler ¶
ConjoinedMessageHandler is a message handler called by the ConjoinedEventStore when a message is appended
type EventStore ¶
type EventStore struct {
// contains filtered or unexported fields
}
EventStore a in postgres event store implementation
func NewEventStore ¶
func NewEventStore( persistenceStrategy driverSQL.PersistenceStrategy, db *sql.DB, messageFactory driverSQL.MessageFactory, logger goengine.Logger, ) (*EventStore, error)
NewEventStore return a new postgres.EventStore
func (*EventStore) AppendTo ¶
func (e *EventStore) AppendTo(ctx context.Context, streamName goengine.StreamName, streamEvents []goengine.Message) error
AppendTo batch inserts Messages into the event stream table
func (*EventStore) AppendToWithExecer ¶
func (e *EventStore) AppendToWithExecer(ctx context.Context, conn driverSQL.Execer, streamName goengine.StreamName, streamEvents []goengine.Message) error
AppendToWithExecer batch inserts Messages into the event stream table using the provided Connection/Execer
func (*EventStore) Create ¶
func (e *EventStore) Create(ctx context.Context, streamName goengine.StreamName) error
Create creates the database table, index etc needed for the event stream
func (*EventStore) HasStream ¶
func (e *EventStore) HasStream(ctx context.Context, streamName goengine.StreamName) bool
HasStream returns true if the table for the eventstream already exists
func (*EventStore) Load ¶
func (e *EventStore) Load( ctx context.Context, streamName goengine.StreamName, fromNumber int64, count *uint, matcher metadata.Matcher, ) (goengine.EventStream, error)
Load returns an eventstream based on the provided constraints
func (*EventStore) LoadWithConnection ¶
func (e *EventStore) LoadWithConnection( ctx context.Context, conn driverSQL.Queryer, streamName goengine.StreamName, fromNumber int64, count *uint, matcher metadata.Matcher, ) (goengine.EventStream, error)
LoadWithConnection returns an eventstream based on the provided constraints using the provided sql.Conn