edatpgx

package
v1.2.19 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 24, 2024 License: MIT Imports: 17 Imported by: 0

Documentation

Index

Constants

View Source
const (
	DefaultEventTableName        = "events"
	DefaultMessageTableName      = "messages"
	DefaultSagaInstanceTableName = "saga_instances"
	DefaultSnapshotTableName     = "snapshots"

	CreateEventsTableSQL = `` /* 429-byte string literal not displayed */

	CreateMessagesTableSQL = `` /* 386-byte string literal not displayed */

	CreateMessagesUnpublishedIndexSQL = `CREATE INDEX unpublished_idx ON messages (created_at) WHERE not published`
	CreateMessagesPublishedIndexSQL   = `CREATE INDEX published_idx ON messages (modified_at) WHERE published`

	CreateSagaInstancesTableSQL = `` /* 410-byte string literal not displayed */

	CreateSnapshotsTableSQL = `` /* 344-byte string literal not displayed */

)

Variables

View Source
var ErrInvalidTxValue = errors.New("tx value is not a pgx.Tx type")
View Source
var ErrTxNotInContext = errors.New("pgx.Tx is not set for session")

Functions

func NewSnapshotStore added in v1.2.3

func NewSnapshotStore(client Client, options ...SnapshotStoreOption) es.AggregateRootStoreMiddleware

func ReceiverSessionMiddleware

func ReceiverSessionMiddleware(conn *pgxpool.Pool, logger edatlog.Logger) func(msg.MessageReceiver) msg.MessageReceiver

func RpcSessionStreamInterceptor

func RpcSessionStreamInterceptor(_ *pgxpool.Pool, logger edatlog.Logger) grpc.StreamServerInterceptor

func RpcSessionUnrayInterceptor

func RpcSessionUnrayInterceptor(conn *pgxpool.Pool, logger edatlog.Logger) grpc.UnaryServerInterceptor

func WebSessionMiddleware

func WebSessionMiddleware(conn *pgxpool.Pool, logger edatlog.Logger) func(http.Handler) http.Handler

Types

type Client

type Client interface {
	Exec(ctx context.Context, sql string, arguments ...interface{}) (pgconn.CommandTag, error)
	Query(ctx context.Context, sql string, args ...interface{}) (pgx.Rows, error)
	QueryRow(ctx context.Context, sql string, args ...interface{}) pgx.Row
	QueryFunc(ctx context.Context, sql string, args []interface{}, scans []interface{}, f func(pgx.QueryFuncRow) error) (pgconn.CommandTag, error)
	SendBatch(ctx context.Context, b *pgx.Batch) pgx.BatchResults
	Begin(ctx context.Context) (pgx.Tx, error)
	BeginTx(ctx context.Context, txOptions pgx.TxOptions) (pgx.Tx, error)
}

Client covers a subset of what both pgx.Conn and pgxpool.Pool provide.

func NewSessionClient

func NewSessionClient() Client

NewSessionClient returns a pgx.Conn- or pgxpool.Pool-compatible client that uses the active transaction from the context.

type EventStore

type EventStore struct {
	// contains filtered or unexported fields
}

func NewEventStore

func NewEventStore(cliet Client, options ...EventStoreOption) *EventStore

func (*EventStore) Load

func (s *EventStore) Load(ctx context.Context, root *es.AggregateRoot) error

func (*EventStore) Save

func (s *EventStore) Save(ctx context.Context, root *es.AggregateRoot) (err error)

type EventStoreOption

type EventStoreOption func(store *EventStore)

func WithEventStoreLogger

func WithEventStoreLogger(logger edatlog.Logger) EventStoreOption

func WithEventStoreTableName

func WithEventStoreTableName(tableName string) EventStoreOption

type MessageStore

type MessageStore struct {
	// contains filtered or unexported fields
}

func NewMessageStore

func NewMessageStore(client Client, options ...MessageStoreOption) *MessageStore

func (*MessageStore) Close

func (s *MessageStore) Close(ctx context.Context) error

func (*MessageStore) Fetch

func (s *MessageStore) Fetch(ctx context.Context, limit int) ([]outbox.Message, error)

func (*MessageStore) MarkPublished

func (s *MessageStore) MarkPublished(ctx context.Context, messageIDs []string) error

func (*MessageStore) PurgePublished

func (s *MessageStore) PurgePublished(ctx context.Context, olderThan time.Duration) error

func (*MessageStore) Save

func (s *MessageStore) Save(ctx context.Context, message outbox.Message) error

func (*MessageStore) Send

func (s *MessageStore) Send(ctx context.Context, channel string, message msg.Message) error

type MessageStoreOption

type MessageStoreOption func(store *MessageStore)

func WithMessageStoreLogger

func WithMessageStoreLogger(logger edatlog.Logger) MessageStoreOption

func WithMessageStoreTableName

func WithMessageStoreTableName(tableName string) MessageStoreOption

type SagaInstanceStore added in v1.2.14

type SagaInstanceStore struct {
	// contains filtered or unexported fields
}

func NewSagaInstanceStore added in v1.2.14

func NewSagaInstanceStore(client Client, options ...SagaInstanceStoreOption) *SagaInstanceStore

func (*SagaInstanceStore) Find added in v1.2.14

func (s *SagaInstanceStore) Find(ctx context.Context, sagaName, sagaID string) (*saga.Instance, error)

func (*SagaInstanceStore) Save added in v1.2.14

func (s *SagaInstanceStore) Save(ctx context.Context, sagaInstance *saga.Instance) error

func (*SagaInstanceStore) Update added in v1.2.14

func (s *SagaInstanceStore) Update(ctx context.Context, sagaInstance *saga.Instance) error

type SagaInstanceStoreOption added in v1.2.14

type SagaInstanceStoreOption func(store *SagaInstanceStore)

func WithSagaInstanceStoreLogger added in v1.2.14

func WithSagaInstanceStoreLogger(logger edatlog.Logger) SagaInstanceStoreOption

func WithSagaInstanceStoreTableName added in v1.2.14

func WithSagaInstanceStoreTableName(tableName string) SagaInstanceStoreOption

type SnapshotStore added in v1.2.3

type SnapshotStore struct {
	// contains filtered or unexported fields
}

func (*SnapshotStore) Load added in v1.2.3

func (s *SnapshotStore) Load(ctx context.Context, root *es.AggregateRoot) error

func (*SnapshotStore) Save added in v1.2.3

func (s *SnapshotStore) Save(ctx context.Context, root *es.AggregateRoot) error

type SnapshotStoreOption added in v1.2.3

type SnapshotStoreOption func(store *SnapshotStore)

func WithSnapshotStoreLogger added in v1.2.3

func WithSnapshotStoreLogger(logger edatlog.Logger) SnapshotStoreOption

func WithSnapshotStoreStrategy added in v1.2.3

func WithSnapshotStoreStrategy(strategy es.SnapshotStrategy) SnapshotStoreOption

func WithSnapshotStoreTableName added in v1.2.3

func WithSnapshotStoreTableName(tableName string) SnapshotStoreOption

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL