Documentation ¶
Index ¶
- func OutboxInsertHandler[K eventsourcing.ID](tableName string) store.InTxHandler[K]
- func TxFromContext(ctx context.Context) *sql.Tx
- type EsRepository
- func (r *EsRepository[K, PK]) Forget(ctx context.Context, request eventsourcing.ForgetRequest[K], ...) error
- func (r *EsRepository[K, PK]) GetAggregateEvents(ctx context.Context, aggregateID K, snapVersion int) ([]*eventsourcing.Event[K], error)
- func (r *EsRepository[K, PK]) GetEvents(ctx context.Context, after, until eventid.EventID, batchSize int, ...) ([]*eventsourcing.Event[K], error)
- func (r *EsRepository[K, PK]) GetEventsByRawIDs(ctx context.Context, ids []string) ([]*eventsourcing.Event[K], error)
- func (r *EsRepository[K, PK]) GetSnapshot(ctx context.Context, aggregateID K) (eventsourcing.Snapshot[K], error)
- func (r *EsRepository[K, PK]) HasIdempotencyKey(ctx context.Context, idempotencyKey string) (bool, error)
- func (r *EsRepository[K, PK]) MigrateConsistentProjection(ctx context.Context, logger *slog.Logger, locker lock.WaitLocker, ...) error
- func (r *EsRepository[K, PK]) MigrateInPlaceCopyReplace(ctx context.Context, revision int, snapshotThreshold uint32, ...) error
- func (r *EsRepository[K, PK]) SaveEvent(ctx context.Context, eRec *eventsourcing.EventRecord[K]) (eventid.EventID, uint32, error)
- func (r *EsRepository[K, PK]) SaveSnapshot(ctx context.Context, snapshot *eventsourcing.Snapshot[K]) error
- type Event
- type EventsRepository
- type Feed
- type FeedOption
- func WithBackoffMaxElapsedTime[K eventsourcing.ID, PK eventsourcing.IDPt[K]](duration time.Duration) FeedOption[K, PK]
- func WithFilter[K eventsourcing.ID, PK eventsourcing.IDPt[K]](filter *store.Filter) FeedOption[K, PK]
- func WithPublication[K eventsourcing.ID, PK eventsourcing.IDPt[K]](publicationName string) FeedOption[K, PK]
- type KVStore
- type Option
- func WithEventsTable[K eventsourcing.ID, PK eventsourcing.IDPt[K]](table string) Option[K, PK]
- func WithMetadata[K eventsourcing.ID, PK eventsourcing.IDPt[K]](metadata eventsourcing.Metadata) Option[K, PK]
- func WithMetadataHook[K eventsourcing.ID, PK eventsourcing.IDPt[K]](fn store.MetadataHook[K]) Option[K, PK]
- func WithSnapshotsTable[K eventsourcing.ID, PK eventsourcing.IDPt[K]](table string) Option[K, PK]
- func WithTxHandler[K eventsourcing.ID, PK eventsourcing.IDPt[K]](txHandler store.InTxHandler[K]) Option[K, PK]
- type OutboxRepository
- type ProjectionMigrater
- type ProjectionMigrationStep
- type Repository
- type Snapshot
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func OutboxInsertHandler ¶ added in v0.34.0
func OutboxInsertHandler[K eventsourcing.ID](tableName string) store.InTxHandler[K]
Types ¶
type EsRepository ¶
type EsRepository[K eventsourcing.ID, PK eventsourcing.IDPt[K]] struct { Repository // contains filtered or unexported fields }
func NewStore ¶
func NewStore[K eventsourcing.ID, PK eventsourcing.IDPt[K]](db *sql.DB, options ...Option[K, PK]) *EsRepository[K, PK]
func NewStoreWithURL ¶ added in v0.33.0
func NewStoreWithURL[K eventsourcing.ID, PK eventsourcing.IDPt[K]](connString string, options ...Option[K, PK]) (*EsRepository[K, PK], error)
func (*EsRepository[K, PK]) Forget ¶
func (r *EsRepository[K, PK]) Forget(ctx context.Context, request eventsourcing.ForgetRequest[K], forget func(kind eventsourcing.Kind, body []byte) ([]byte, error)) error
func (*EsRepository[K, PK]) GetAggregateEvents ¶
func (r *EsRepository[K, PK]) GetAggregateEvents(ctx context.Context, aggregateID K, snapVersion int) ([]*eventsourcing.Event[K], error)
func (*EsRepository[K, PK]) GetEventsByRawIDs ¶ added in v0.35.0
func (r *EsRepository[K, PK]) GetEventsByRawIDs(ctx context.Context, ids []string) ([]*eventsourcing.Event[K], error)
func (*EsRepository[K, PK]) GetSnapshot ¶
func (r *EsRepository[K, PK]) GetSnapshot(ctx context.Context, aggregateID K) (eventsourcing.Snapshot[K], error)
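func (*EsRepository[K, PK]) HasIdempotencyKey ¶
func (r *EsRepository[K, PK]) HasIdempotencyKey(ctx context.Context, idempotencyKey string) (bool, error)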
func (*EsRepository[K, PK]) MigrateConsistentProjection ¶ added in v0.28.0
func (r *EsRepository[K, PK]) MigrateConsistentProjection( ctx context.Context, logger *slog.Logger, locker lock.WaitLocker, migrater ProjectionMigrater[K], getByID getByIDFunc[K], ) error
MigrateConsistentProjection migrates a consistent projection by creating a new one
func (*EsRepository[K, PK]) MigrateInPlaceCopyReplace ¶ added in v0.21.0
func (r *EsRepository[K, PK]) MigrateInPlaceCopyReplace( ctx context.Context, revision int, snapshotThreshold uint32, rehydrateFunc func(eventsourcing.Aggregater[K], *eventsourcing.Event[K]) error, codec eventsourcing.Codec[K], handler eventsourcing.MigrationHandler[K], targetAggregateKind eventsourcing.Kind, originalAggregateKind eventsourcing.Kind, originalEventTypeCriteria ...eventsourcing.Kind, ) error
func (*EsRepository[K, PK]) SaveEvent ¶
func (r *EsRepository[K, PK]) SaveEvent(ctx context.Context, eRec *eventsourcing.EventRecord[K]) (eventid.EventID, uint32, error)
func (*EsRepository[K, PK]) SaveSnapshot ¶
func (r *EsRepository[K, PK]) SaveSnapshot(ctx context.Context, snapshot *eventsourcing.Snapshot[K]) error
type Event ¶
type Event struct { ID eventid.EventID AggregateID string AggregateIDHash int32 AggregateVersion uint32 AggregateKind eventsourcing.Kind Kind eventsourcing.Kind Body []byte IdempotencyKey store.NilString CreatedAt time.Time Migration int Migrated bool Metadata eventsourcing.Metadata }
Event is the event data stored in the database
type EventsRepository ¶ added in v0.34.0
type EventsRepository[K eventsourcing.ID] interface { GetEventsByRawIDs(context.Context, []string) ([]*eventsourcing.Event[K], error) }
type Feed ¶
type Feed[K eventsourcing.ID, PK eventsourcing.IDPt[K]] struct { // contains filtered or unexported fields }
func NewFeed ¶
func NewFeed[K eventsourcing.ID, PK eventsourcing.IDPt[K]](connString string, slotIndex, totalSlots int, sinker sink.Sinker[K], options ...FeedOption[K, PK]) (Feed[K, PK], error)
NewFeed creates a new PostgreSQL 10+ logical replication feed. slotIndex is the index of this feed in a group of feeds. Its value should be between 1 and totalSlots. slotIndex=1 has a special maintenance behaviour of dropping any slot above totalSlots.
type FeedOption ¶
type FeedOption[K eventsourcing.ID, PK eventsourcing.IDPt[K]] func(*Feed[K, PK])
func WithBackoffMaxElapsedTime ¶ added in v0.18.0
func WithBackoffMaxElapsedTime[K eventsourcing.ID, PK eventsourcing.IDPt[K]](duration time.Duration) FeedOption[K, PK]
func WithFilter ¶ added in v0.36.0
func WithFilter[K eventsourcing.ID, PK eventsourcing.IDPt[K]](filter *store.Filter) FeedOption[K, PK]
func WithPublication ¶
func WithPublication[K eventsourcing.ID, PK eventsourcing.IDPt[K]](publicationName string) FeedOption[K, PK]
type KVStore ¶ added in v0.34.0
type KVStore struct { Repository // contains filtered or unexported fields }
func NewKVStoreWithURL ¶ added in v0.34.0
type Option ¶ added in v0.28.0
type Option[K eventsourcing.ID, PK eventsourcing.IDPt[K]] func(*EsRepository[K, PK])
func WithEventsTable ¶ added in v0.30.0
func WithEventsTable[K eventsourcing.ID, PK eventsourcing.IDPt[K]](table string) Option[K, PK]
func WithMetadata ¶ added in v0.36.0
func WithMetadata[K eventsourcing.ID, PK eventsourcing.IDPt[K]](metadata eventsourcing.Metadata) Option[K, PK]
WithMetadata defines the metadata to be saved on every event. Data keys will be converted to lower case.
func WithMetadataHook ¶ added in v0.36.0
func WithMetadataHook[K eventsourcing.ID, PK eventsourcing.IDPt[K]](fn store.MetadataHook[K]) Option[K, PK]
WithMetadataHook defines the hook that will return the metadata. This metadata will override any metadata defined at the repository level
func WithSnapshotsTable ¶ added in v0.36.0
func WithSnapshotsTable[K eventsourcing.ID, PK eventsourcing.IDPt[K]](table string) Option[K, PK]
func WithTxHandler ¶ added in v0.34.0
func WithTxHandler[K eventsourcing.ID, PK eventsourcing.IDPt[K]](txHandler store.InTxHandler[K]) Option[K, PK]
type OutboxRepository ¶ added in v0.34.0
type OutboxRepository[K eventsourcing.ID] struct { Repository // contains filtered or unexported fields }
func NewOutboxStore ¶ added in v0.34.0
func NewOutboxStore[K eventsourcing.ID](db *sql.DB, tableName string, eventsRepo EventsRepository[K]) *OutboxRepository[K]
func (*OutboxRepository[K]) PendingEvents ¶ added in v0.34.0
func (r *OutboxRepository[K]) PendingEvents(ctx context.Context, batchSize int, filter store.Filter) ([]*eventsourcing.Event[K], error)
type ProjectionMigrater ¶ added in v0.28.0
type ProjectionMigrater[K eventsourcing.ID] interface { // Name returns the name of the new projection. It is used to track if the projection was fully processed Name() string // Steps returns the order of the aggregate types to process to recreate the projection Steps() []ProjectionMigrationStep[K] // Flush is called for each aggregate with the current state Flush(context.Context, store.AggregateMetadata[K], eventsourcing.Aggregater[K]) error }
ProjectionMigrater is the interface whose implementation a projection must return when asked how to rebuild itself.
type ProjectionMigrationStep ¶ added in v0.28.0
type ProjectionMigrationStep[K eventsourcing.ID] struct { AggregateKind eventsourcing.Kind // Factory creates a new aggregate instance Factory func() eventsourcing.Aggregater[K] }
type Repository ¶ added in v0.33.0
type Repository struct {
// contains filtered or unexported fields
}
func (Repository) Connection ¶ added in v0.34.0
func (r Repository) Connection() *sqlx.DB