edatpgx

package module
v0.0.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 18, 2021 License: MIT Imports: 17 Imported by: 5

README

edat-pgx - Postgres stores for edat

Installation

go get -u github.com/stackus/edat-pgx

Usage Example

import "github.com/stackus/edat-pgx"

conn, _ := pgxpool.Connect(ctx, "your-connection-string")

// Create a store for aggregate events using the pool connection
eventStore := edatpgx.NewEventStore(conn)

// Create a store using a session client that uses a pgx.Tx from context
client := edatpgx.NewSessionClient()
eventStore := edatpgx.NewEventStore(client)

Prerequisites

Go 1.16

Features

Stores accept *pgx.Conn, *pgxpool.Pool, pgx.Tx and edatpgx.Client for clients. Middleware will accept *pgxpool.Pool only.

  • Session Client NewSessionClient()
  • Aggregate Event Store NewEventStore(client, ...options)
  • Outbox Message Store and Producer NewMessageStore(client, ...options)
  • Saga Instance Store NewSagaInstanceStore(client, ...options)
  • Aggregate Snapshot Store NewSnapshotStore(client, ...options)
  • Message Receiver Session Middleware ReceiverSessionMiddleware(*pgxpool.Pool, log.Logger)
  • Web Request Session Middleware WebSessionMiddleware(*pgxpool.Pool, log.Logger)
  • gRPC Request Session (Unary) Interceptor RpcSessionUnaryInterceptor(*pgxpool.Pool, log.Logger)

TODOs

  • Documentation
  • Tests, tests, and more tests

Contributing

Pull requests are welcome. For major changes, please open an issue first to discuss what you would like to change.

Please make sure to update tests as appropriate.

License

MIT

Documentation

Index

Constants

View Source
const (
	DefaultEventTableName        = "events"
	DefaultMessageTableName      = "messages"
	DefaultSagaInstanceTableName = "saga_instances"
	DefaultSnapshotTableName     = "snapshots"

	CreateEventsTableSQL = `` /* 429-byte string literal not displayed */

	CreateMessagesTableSQL = `` /* 386-byte string literal not displayed */

	CreateMessagesUnpublishedIndexSQL = `CREATE INDEX unpublished_idx ON messages (created_at) WHERE not published`
	CreateMessagesPublishedIndexSQL   = `CREATE INDEX published_idx ON messages (modified_at) WHERE published`

	CreateSagaInstancesTableSQL = `` /* 410-byte string literal not displayed */

	CreateSnapshotsTableSQL = `` /* 344-byte string literal not displayed */

)

Variables

View Source
var ErrInvalidTxValue = errors.New("tx value is not a pgx.Tx type")
View Source
var ErrTxNotInContext = errors.New("pgx.Tx is not set for session")

Functions

func NewSnapshotStore

func NewSnapshotStore(client Client, options ...SnapshotStoreOption) es.AggregateRootStoreMiddleware

func ReceiverSessionMiddleware

func ReceiverSessionMiddleware(conn *pgxpool.Pool, logger log.Logger) func(msg.MessageReceiver) msg.MessageReceiver

func RpcSessionStreamInterceptor added in v0.0.2

func RpcSessionStreamInterceptor(_ *pgxpool.Pool, logger log.Logger) grpc.StreamServerInterceptor

func RpcSessionUnaryInterceptor added in v0.0.2

func RpcSessionUnaryInterceptor(conn *pgxpool.Pool, logger log.Logger) grpc.UnaryServerInterceptor

func WebSessionMiddleware

func WebSessionMiddleware(conn *pgxpool.Pool, logger log.Logger) func(http.Handler) http.Handler

Types

type Client

type Client interface {
	Exec(ctx context.Context, sql string, arguments ...interface{}) (pgconn.CommandTag, error)
	Query(ctx context.Context, sql string, args ...interface{}) (pgx.Rows, error)
	QueryRow(ctx context.Context, sql string, args ...interface{}) pgx.Row
	QueryFunc(ctx context.Context, sql string, args []interface{}, scans []interface{}, f func(pgx.QueryFuncRow) error) (pgconn.CommandTag, error)
	SendBatch(ctx context.Context, b *pgx.Batch) pgx.BatchResults
	Begin(ctx context.Context) (pgx.Tx, error)
	BeginTx(ctx context.Context, txOptions pgx.TxOptions) (pgx.Tx, error)
}

Client covers the subset of methods that both pgx.Conn and pgxpool.Pool provide.

func NewSessionClient

func NewSessionClient() Client

NewSessionClient returns a Client, compatible with pgx.Conn and pgxpool.Pool, that runs its operations on the active transaction stored in the context.

type EventStore

type EventStore struct {
	// contains filtered or unexported fields
}

func NewEventStore

func NewEventStore(client Client, options ...EventStoreOption) *EventStore

func (*EventStore) Load

func (s *EventStore) Load(ctx context.Context, root *es.AggregateRoot) error

func (*EventStore) Save

func (s *EventStore) Save(ctx context.Context, root *es.AggregateRoot) (err error)

type EventStoreOption

type EventStoreOption func(*EventStore)

func WithEventStoreLogger

func WithEventStoreLogger(logger log.Logger) EventStoreOption

func WithEventStoreTableName

func WithEventStoreTableName(tableName string) EventStoreOption

type MessageStore

type MessageStore struct {
	// contains filtered or unexported fields
}

func NewMessageStore

func NewMessageStore(client Client, options ...MessageStoreOption) *MessageStore

func (*MessageStore) Close

func (s *MessageStore) Close(ctx context.Context) error

func (*MessageStore) Fetch

func (s *MessageStore) Fetch(ctx context.Context, limit int) ([]outbox.Message, error)

func (*MessageStore) MarkPublished

func (s *MessageStore) MarkPublished(ctx context.Context, messageIDs []string) error

func (*MessageStore) PurgePublished

func (s *MessageStore) PurgePublished(ctx context.Context, olderThan time.Duration) error

func (*MessageStore) Save

func (s *MessageStore) Save(ctx context.Context, message outbox.Message) error

func (*MessageStore) Send

func (s *MessageStore) Send(ctx context.Context, channel string, message msg.Message) error

type MessageStoreOption

type MessageStoreOption func(*MessageStore)

func WithMessageStoreLogger

func WithMessageStoreLogger(logger log.Logger) MessageStoreOption

func WithMessageStoreTableName

func WithMessageStoreTableName(tableName string) MessageStoreOption

type SagaInstanceStore

type SagaInstanceStore struct {
	// contains filtered or unexported fields
}

func NewSagaInstanceStore

func NewSagaInstanceStore(client Client, options ...SagaInstanceStoreOption) *SagaInstanceStore

func (*SagaInstanceStore) Find

func (s *SagaInstanceStore) Find(ctx context.Context, sagaName, sagaID string) (*saga.Instance, error)

func (*SagaInstanceStore) Save

func (s *SagaInstanceStore) Save(ctx context.Context, sagaInstance *saga.Instance) error

func (*SagaInstanceStore) Update

func (s *SagaInstanceStore) Update(ctx context.Context, sagaInstance *saga.Instance) error

type SagaInstanceStoreOption

type SagaInstanceStoreOption func(*SagaInstanceStore)

func WithSagaInstanceStoreLogger

func WithSagaInstanceStoreLogger(logger log.Logger) SagaInstanceStoreOption

func WithSagaInstanceStoreTableName

func WithSagaInstanceStoreTableName(tableName string) SagaInstanceStoreOption

type SnapshotStore

type SnapshotStore struct {
	// contains filtered or unexported fields
}

func (*SnapshotStore) Load

func (s *SnapshotStore) Load(ctx context.Context, root *es.AggregateRoot) error

func (*SnapshotStore) Save

func (s *SnapshotStore) Save(ctx context.Context, root *es.AggregateRoot) error

type SnapshotStoreOption

type SnapshotStoreOption func(*SnapshotStore)

func WithSnapshotStoreLogger

func WithSnapshotStoreLogger(logger log.Logger) SnapshotStoreOption

func WithSnapshotStoreStrategy

func WithSnapshotStoreStrategy(strategy es.SnapshotStrategy) SnapshotStoreOption

func WithSnapshotStoreTableName

func WithSnapshotStoreTableName(tableName string) SnapshotStoreOption

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL