postgresql

package
v0.38.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 19, 2023 License: MIT Imports: 31 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func OutboxInsertHandler added in v0.34.0

func OutboxInsertHandler[K eventsourcing.ID](tableName string) store.InTxHandler[K]

func TxFromContext added in v0.28.0

func TxFromContext(ctx context.Context) *sql.Tx

Types

type EsRepository

type EsRepository[K eventsourcing.ID, PK eventsourcing.IDPt[K]] struct {
	Repository
	// contains filtered or unexported fields
}

func NewStore

func NewStore[K eventsourcing.ID, PK eventsourcing.IDPt[K]](db *sql.DB, options ...Option[K, PK]) *EsRepository[K, PK]

func NewStoreWithURL added in v0.33.0

func NewStoreWithURL[K eventsourcing.ID, PK eventsourcing.IDPt[K]](connString string, options ...Option[K, PK]) (*EsRepository[K, PK], error)

func (*EsRepository[K, PK]) Forget

func (r *EsRepository[K, PK]) Forget(ctx context.Context, request eventsourcing.ForgetRequest[K], forget func(kind eventsourcing.Kind, body []byte) ([]byte, error)) error

func (*EsRepository[K, PK]) GetAggregateEvents

func (r *EsRepository[K, PK]) GetAggregateEvents(ctx context.Context, aggregateID K, snapVersion int) ([]*eventsourcing.Event[K], error)

func (*EsRepository[K, PK]) GetEvents

func (r *EsRepository[K, PK]) GetEvents(ctx context.Context, after, until eventid.EventID, batchSize int, filter store.Filter) ([]*eventsourcing.Event[K], error)

func (*EsRepository[K, PK]) GetEventsByRawIDs added in v0.35.0

func (r *EsRepository[K, PK]) GetEventsByRawIDs(ctx context.Context, ids []string) ([]*eventsourcing.Event[K], error)

func (*EsRepository[K, PK]) GetSnapshot

func (r *EsRepository[K, PK]) GetSnapshot(ctx context.Context, aggregateID K) (eventsourcing.Snapshot[K], error)

func (*EsRepository[K, PK]) HasIdempotencyKey

func (r *EsRepository[K, PK]) HasIdempotencyKey(ctx context.Context, idempotencyKey string) (bool, error)

func (*EsRepository[K, PK]) MigrateConsistentProjection added in v0.28.0

func (r *EsRepository[K, PK]) MigrateConsistentProjection(
	ctx context.Context,
	logger *slog.Logger,
	locker lock.WaitLocker,
	migrater ProjectionMigrater[K],
	getByID getByIDFunc[K],
) error

MigrateConsistentProjection migrates a consistent projection by creating a new one.

func (*EsRepository[K, PK]) MigrateInPlaceCopyReplace added in v0.21.0

func (r *EsRepository[K, PK]) MigrateInPlaceCopyReplace(
	ctx context.Context,
	revision int,
	snapshotThreshold uint32,
	rehydrateFunc func(eventsourcing.Aggregater[K], *eventsourcing.Event[K]) error,
	codec eventsourcing.Codec[K],
	handler eventsourcing.MigrationHandler[K],
	targetAggregateKind eventsourcing.Kind,
	originalAggregateKind eventsourcing.Kind,
	originalEventTypeCriteria ...eventsourcing.Kind,
) error

func (*EsRepository[K, PK]) SaveEvent

func (r *EsRepository[K, PK]) SaveEvent(ctx context.Context, eRec *eventsourcing.EventRecord[K]) (eventid.EventID, uint32, error)

func (*EsRepository[K, PK]) SaveSnapshot

func (r *EsRepository[K, PK]) SaveSnapshot(ctx context.Context, snapshot *eventsourcing.Snapshot[K]) error

type Event

type Event struct {
	ID               eventid.EventID
	AggregateID      string
	AggregateIDHash  int32
	AggregateVersion uint32
	AggregateKind    eventsourcing.Kind
	Kind             eventsourcing.Kind
	Body             []byte
	IdempotencyKey   store.NilString
	CreatedAt        time.Time
	Migration        int
	Migrated         bool
	Metadata         eventsourcing.Metadata
}

Event is the event data stored in the database

type EventsRepository added in v0.34.0

type EventsRepository[K eventsourcing.ID] interface {
	GetEventsByRawIDs(context.Context, []string) ([]*eventsourcing.Event[K], error)
}

type Feed

type Feed[K eventsourcing.ID, PK eventsourcing.IDPt[K]] struct {
	// contains filtered or unexported fields
}

func NewFeed

func NewFeed[K eventsourcing.ID, PK eventsourcing.IDPt[K]](connString string, slotIndex, totalSlots int, sinker sink.Sinker[K], options ...FeedOption[K, PK]) (Feed[K, PK], error)

NewFeed creates a new PostgreSQL 10+ logical replication feed. slotIndex is the index of this feed in a group of feeds. Its value should be between 1 and totalSlots. slotIndex=1 has a special maintenance behaviour of dropping any slot above totalSlots.

func (*Feed[K, PK]) Run added in v0.25.0

func (f *Feed[K, PK]) Run(ctx context.Context) error

Run listens to replication logs and pushes them to the sinker. See https://github.com/jackc/pglogrepl/blob/master/example/pglogrepl_demo/main.go

type FeedOption

type FeedOption[K eventsourcing.ID, PK eventsourcing.IDPt[K]] func(*Feed[K, PK])

func WithBackoffMaxElapsedTime added in v0.18.0

func WithBackoffMaxElapsedTime[K eventsourcing.ID, PK eventsourcing.IDPt[K]](duration time.Duration) FeedOption[K, PK]

func WithFilter added in v0.36.0

func WithFilter[K eventsourcing.ID, PK eventsourcing.IDPt[K]](filter *store.Filter) FeedOption[K, PK]

func WithPublication

func WithPublication[K eventsourcing.ID, PK eventsourcing.IDPt[K]](publicationName string) FeedOption[K, PK]

type KVStore added in v0.34.0

type KVStore struct {
	Repository
	// contains filtered or unexported fields
}

func NewKVStore added in v0.34.0

func NewKVStore(db *sql.DB, table string) KVStore

func NewKVStoreWithURL added in v0.34.0

func NewKVStoreWithURL(connString string, table string) (KVStore, error)

func (KVStore) Get added in v0.34.0

func (r KVStore) Get(ctx context.Context, key string) (string, error)

func (KVStore) Put added in v0.34.0

func (m KVStore) Put(ctx context.Context, key string, value string) error

type Option added in v0.28.0

type Option[K eventsourcing.ID, PK eventsourcing.IDPt[K]] func(*EsRepository[K, PK])

func WithEventsTable added in v0.30.0

func WithEventsTable[K eventsourcing.ID, PK eventsourcing.IDPt[K]](table string) Option[K, PK]

func WithMetadata added in v0.36.0

func WithMetadata[K eventsourcing.ID, PK eventsourcing.IDPt[K]](metadata eventsourcing.Metadata) Option[K, PK]

WithMetadata defines the metadata to be saved on every event. Data keys will be converted to lower case.

func WithMetadataHook added in v0.36.0

func WithMetadataHook[K eventsourcing.ID, PK eventsourcing.IDPt[K]](fn store.MetadataHook[K]) Option[K, PK]

WithMetadataHook defines the hook that will return the metadata. This metadata will override any metadata defined at the repository level.

func WithSnapshotsTable added in v0.36.0

func WithSnapshotsTable[K eventsourcing.ID, PK eventsourcing.IDPt[K]](table string) Option[K, PK]

func WithTxHandler added in v0.34.0

func WithTxHandler[K eventsourcing.ID, PK eventsourcing.IDPt[K]](txHandler store.InTxHandler[K]) Option[K, PK]

type OutboxRepository added in v0.34.0

type OutboxRepository[K eventsourcing.ID] struct {
	Repository
	// contains filtered or unexported fields
}

func NewOutboxStore added in v0.34.0

func NewOutboxStore[K eventsourcing.ID](db *sql.DB, tableName string, eventsRepo EventsRepository[K]) *OutboxRepository[K]

func (*OutboxRepository[K]) AfterSink added in v0.34.0

func (r *OutboxRepository[K]) AfterSink(ctx context.Context, eID eventid.EventID) error

func (*OutboxRepository[K]) PendingEvents added in v0.34.0

func (r *OutboxRepository[K]) PendingEvents(ctx context.Context, batchSize int, filter store.Filter) ([]*eventsourcing.Event[K], error)

type ProjectionMigrater added in v0.28.0

type ProjectionMigrater[K eventsourcing.ID] interface {
	// Name returns the name of the new projection. It is used to track if the projection was fully processed
	Name() string
	// Steps returns the order of the aggregate types to process to recreate the projection
	Steps() []ProjectionMigrationStep[K]
	// Flush is called for each aggregate with the current state
	Flush(context.Context, store.AggregateMetadata[K], eventsourcing.Aggregater[K]) error
}

ProjectionMigrater describes the implementation that a projection must return when asked how to rebuild itself.

type ProjectionMigrationStep added in v0.28.0

type ProjectionMigrationStep[K eventsourcing.ID] struct {
	AggregateKind eventsourcing.Kind
	// Factory creates a new aggregate instance
	Factory func() eventsourcing.Aggregater[K]
}

type Repository added in v0.33.0

type Repository struct {
	// contains filtered or unexported fields
}

func (Repository) Connection added in v0.34.0

func (r Repository) Connection() *sqlx.DB

func (Repository) TxRunner added in v0.33.0

func (r Repository) TxRunner() func(ctx context.Context, fn func(context.Context) error) error

func (*Repository) WithTx added in v0.33.0

func (r *Repository) WithTx(ctx context.Context, fn func(context.Context, *sql.Tx) error) error

type Snapshot

type Snapshot struct {
	ID               eventid.EventID
	AggregateID      string
	AggregateVersion uint32
	AggregateKind    eventsourcing.Kind
	Body             []byte
	CreatedAt        time.Time
	Metadata         eventsourcing.Metadata
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL