sqlxes

package module
v0.0.14 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 8, 2021 License: Apache-2.0 Imports: 16 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Migrate

func Migrate(config *Config, conn *sqlx.DB) error

Migrate executes table and types migration for the event store and snapshot. The table names are taken from the config.

Types

type Config

type Config struct {
	EventTable     string
	SnapshotTable  string
	SchemaName     string // Optional
	AggregateTable string
	WorkersCount   int
}

Config is the configuration for the event storage.

func DefaultConfig

func DefaultConfig() *Config

DefaultConfig creates a new default config.

func (*Config) Validate

func (c *Config) Validate() error

Validate checks if the config is valid to use.

type Storage

type Storage struct {
	// contains filtered or unexported fields
}

Storage is the implementation of the eventsource.Storage interface for the sqlx driver.

func New

func New(conn *sqlx.DB, cfg *Config, d xservice.Driver) (*Storage, error)

New creates a new event storage based on provided sqlx connection.

func (*Storage) As added in v0.0.9

func (s *Storage) As(dst interface{}) error

As exposes driver specific implementation.

func (*Storage) BeginTx added in v0.0.9

func (s *Storage) BeginTx(ctx context.Context) (*Transaction, error)

BeginTx creates and begins a new transaction, which exposes *sqlx.Tx and allows atomic commits.

func (*Storage) Err

func (s *Storage) Err(err error) error

Err handles error message with given driver.

func (*Storage) GetSnapshot

func (s *Storage) GetSnapshot(ctx context.Context, aggId string, aggType string, aggVersion int64) (*eventsource.Snapshot, error)

GetSnapshot gets the latest snapshot for given aggregate. Implements eventsource.Storage interface.

func (*Storage) ListEvents added in v0.0.9

func (s *Storage) ListEvents(ctx context.Context, aggId, aggType string) ([]*eventsource.Event, error)

ListEvents gets the event stream for provided aggregate. Implements eventsource.Storage interface.

func (*Storage) ListEventsFromRevision added in v0.0.9

func (s *Storage) ListEventsFromRevision(ctx context.Context, aggId string, aggType string, from int64) ([]*eventsource.Event, error)

ListEventsFromRevision gets the event stream for given aggregate where the revision is subsequent from provided.

func (*Storage) NewCursor

func (s *Storage) NewCursor(ctx context.Context, aggType string, aggVersion int64) (eventsource.Cursor, error)

NewCursor creates a new cursor.

func (*Storage) SaveEvents

func (s *Storage) SaveEvents(ctx context.Context, es []*eventsource.Event) error

SaveEvents stores provided events in the database. Implements eventsource.Storage interface.

func (*Storage) SaveSnapshot

func (s *Storage) SaveSnapshot(ctx context.Context, snap *eventsource.Snapshot) error

SaveSnapshot stores the snapshot in the database. Implements eventsource.Storage interface.

func (*Storage) StreamEvents

func (s *Storage) StreamEvents(ctx context.Context, req *eventsource.StreamEventsRequest) (<-chan *eventsource.Event, error)

StreamEvents opens the channel of the events stream that matches given request. Implements eventsource.Storage.

type Transaction

type Transaction struct {
	// contains filtered or unexported fields
}

Transaction is the implementation of the

func (*Transaction) As added in v0.0.9

func (t *Transaction) As(dst interface{}) error

As sets the destination with the *sqlx.Tx implementation.

func (*Transaction) Commit

func (t *Transaction) Commit() error

Commit commits the transaction.

func (*Transaction) Done added in v0.0.11

func (t *Transaction) Done() bool

Done checks if the transaction is already done.

func (*Transaction) Err

func (s *Transaction) Err(err error) error

Err handles error message with given driver.

func (*Transaction) GetSnapshot

func (s *Transaction) GetSnapshot(ctx context.Context, aggId string, aggType string, aggVersion int64) (*eventsource.Snapshot, error)

GetSnapshot gets the latest snapshot for given aggregate. Implements eventsource.Storage interface.

func (*Transaction) ListEvents added in v0.0.9

func (s *Transaction) ListEvents(ctx context.Context, aggId, aggType string) ([]*eventsource.Event, error)

ListEvents gets the event stream for provided aggregate. Implements eventsource.Storage interface.

func (*Transaction) ListEventsFromRevision added in v0.0.9

func (s *Transaction) ListEventsFromRevision(ctx context.Context, aggId string, aggType string, from int64) ([]*eventsource.Event, error)

ListEventsFromRevision gets the event stream for given aggregate where the revision is subsequent from provided.

func (*Transaction) NewCursor

func (s *Transaction) NewCursor(ctx context.Context, aggType string, aggVersion int64) (eventsource.Cursor, error)

NewCursor creates a new cursor.

func (*Transaction) Rollback added in v0.0.11

func (t *Transaction) Rollback() error

Rollback the transaction.

func (*Transaction) SaveEvents

func (s *Transaction) SaveEvents(ctx context.Context, es []*eventsource.Event) error

SaveEvents stores provided events in the database. Implements eventsource.Storage interface.

func (*Transaction) SaveSnapshot

func (s *Transaction) SaveSnapshot(ctx context.Context, snap *eventsource.Snapshot) error

SaveSnapshot stores the snapshot in the database. Implements eventsource.Storage interface.

func (*Transaction) StreamEvents

func (s *Transaction) StreamEvents(ctx context.Context, req *eventsource.StreamEventsRequest) (<-chan *eventsource.Event, error)

StreamEvents opens the channel of the events stream that matches given request. Implements eventsource.Storage.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL