sqlpersistence

package
v0.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 26, 2024 License: MIT Imports: 12 Imported by: 0

Documentation

Overview

Package sqlpersistence is an SQL-based persistence provider with drivers for several popular SQL database systems.

Index

Constants

This section is empty.

Variables

View Source
var (
	// DefaultMaxIdleConns caps the number of idle connections kept in the
	// database pool when no explicit limit is configured.
	DefaultMaxIdleConns = runtime.GOMAXPROCS(0)

	// DefaultMaxOpenConns caps the number of open connections permitted in
	// the database pool when no explicit limit is configured. It is ten times
	// DefaultMaxIdleConns.
	DefaultMaxOpenConns = 10 * DefaultMaxIdleConns

	// DefaultMaxConnLifetime bounds how long a database connection may be
	// kept when no explicit lifetime is configured.
	DefaultMaxConnLifetime = time.Minute * 10
)

Functions

func CreateSchema

func CreateSchema(ctx context.Context, db *sql.DB) error

CreateSchema creates the schema elements necessary to use the given database.

It does not return an error if the schema already exists.

func DropSchema

func DropSchema(ctx context.Context, db *sql.DB) error

DropSchema drops the schema elements necessary to use the given database.

It does not return an error if the schema does not exist.

Types

type AggregateDriver

// AggregateDriver is the subset of the Driver interface that is concerned with
// aggregates.
//
// NOTE(review): ak appears to be the application's identity key, and hk/id the
// handler key and instance ID — confirm against the driver implementations.
type AggregateDriver interface {
	// InsertAggregateMetaData inserts meta-data for an aggregate instance
	// within the transaction tx.
	//
	// It returns false if the row already exists.
	InsertAggregateMetaData(
		ctx context.Context,
		tx *sql.Tx,
		ak string,
		md persistence.AggregateMetaData,
	) (bool, error)

	// UpdateAggregateMetaData updates meta-data for an aggregate instance
	// within the transaction tx.
	//
	// It returns false if the row does not exist or md.Revision is not current.
	UpdateAggregateMetaData(
		ctx context.Context,
		tx *sql.Tx,
		ak string,
		md persistence.AggregateMetaData,
	) (bool, error)

	// SelectAggregateMetaData selects an aggregate instance's meta-data. It
	// reads directly from db rather than from a transaction.
	SelectAggregateMetaData(
		ctx context.Context,
		db *sql.DB,
		ak, hk, id string,
	) (persistence.AggregateMetaData, error)
}

AggregateDriver is the subset of the Driver interface that is concerned with aggregates.

type DSNProvider

// DSNProvider is an implementation of provider.Provider for SQL that opens a
// database pool using a DSN.
type DSNProvider struct {

	// DriverName is the driver name to be passed to sql.Open().
	DriverName string

	// DSN is the data-source name to be passed to sql.Open().
	DSN string

	// Driver is the Verity SQL driver to use with this database. If it is nil,
	// it is chosen automatically from one of the built-in drivers.
	Driver Driver

	// MaxIdleConns is the maximum number of idle connections allowed in the
	// database pool.
	//
	// If it is zero, DefaultMaxIdleConns is used.
	MaxIdleConns int

	// MaxOpenConns is the maximum number of open connections allowed in the
	// database pool.
	//
	// If it is zero, DefaultMaxOpenConns is used.
	MaxOpenConns int

	// MaxConnLifetime is the maximum lifetime of database connections.
	//
	// If it is zero, DefaultMaxConnLifetime is used.
	MaxConnLifetime time.Duration
	// contains filtered or unexported fields
}

DSNProvider is an implementation of provider.Provider for SQL that opens a database pool using a DSN.

func (*DSNProvider) Open

Open returns a data-store for a specific application.

k is the identity key of the application.

Data stores are opened for exclusive use. If another engine instance has already opened this application's data-store, ErrDataStoreLocked is returned.

type Driver

// Driver is used to interface with the underlying SQL database.
//
// It combines the per-subsystem driver interfaces with the operations needed
// to check compatibility, start transactions and manage the schema.
type Driver interface {
	AggregateDriver
	EventDriver
	OffsetDriver
	ProcessDriver
	QueueDriver

	// IsCompatibleWith returns nil if this driver can be used with db.
	IsCompatibleWith(ctx context.Context, db *sql.DB) error

	// Begin starts a transaction for use in a persistence.Transaction.
	Begin(ctx context.Context, db *sql.DB) (*sql.Tx, error)

	// CreateSchema creates any SQL schema elements required by the driver.
	CreateSchema(ctx context.Context, db *sql.DB) error

	// DropSchema removes any SQL schema elements created by CreateSchema().
	DropSchema(ctx context.Context, db *sql.DB) error
}

Driver is used to interface with the underlying SQL database.

type EventDriver

// EventDriver is the subset of the Driver interface that is concerned with
// events.
//
// NOTE(review): ak appears to be the application's identity key, and hk/id the
// handler key and instance ID — confirm against the driver implementations.
type EventDriver interface {
	// UpdateNextOffset increments the next offset by one and returns the new
	// value.
	UpdateNextOffset(
		ctx context.Context,
		tx *sql.Tx,
		ak string,
	) (uint64, error)

	// InsertEvent saves an event at a specific offset.
	//
	// o is the offset at which the event is saved and env is the envelope
	// containing the event.
	InsertEvent(
		ctx context.Context,
		tx *sql.Tx,
		o uint64,
		env *envelopespec.Envelope,
	) error

	// InsertEventFilter inserts a filter that limits selected events to those
	// with a portable name in the given set.
	//
	// f is the set of portable names. It returns the filter's ID.
	InsertEventFilter(
		ctx context.Context,
		db *sql.DB,
		ak string,
		f map[string]struct{},
	) (int64, error)

	// DeleteEventFilter deletes an event filter.
	//
	// f is the filter ID, as returned by InsertEventFilter().
	DeleteEventFilter(
		ctx context.Context,
		db *sql.DB,
		f int64,
	) error

	// PurgeEventFilters deletes all event filters for the given application.
	PurgeEventFilters(
		ctx context.Context,
		db *sql.DB,
		ak string,
	) error

	// SelectNextEventOffset selects the next "unused" offset.
	SelectNextEventOffset(
		ctx context.Context,
		db *sql.DB,
		ak string,
	) (uint64, error)

	// SelectEventsByType selects events that match the given type filter.
	//
	// f is a filter ID, as returned by InsertEventFilter(). o is the minimum
	// offset to include in the results.
	SelectEventsByType(
		ctx context.Context,
		db *sql.DB,
		ak string,
		f int64,
		o uint64,
	) (*sql.Rows, error)

	// SelectEventsBySource selects events that were produced by a specific
	// handler.
	//
	// NOTE(review): o is presumably the minimum offset to include in the
	// results, as for SelectEventsByType() — confirm.
	SelectEventsBySource(
		ctx context.Context,
		db *sql.DB,
		ak, hk, id string,
		o uint64,
	) (*sql.Rows, error)

	// SelectOffsetByMessageID selects the offset of the message with the given
	// ID. It returns false as a second return value if the message cannot be
	// found.
	SelectOffsetByMessageID(
		ctx context.Context,
		db *sql.DB,
		id string,
	) (uint64, bool, error)

	// ScanEvent scans the next event from a row-set returned by
	// SelectEventsByType() or SelectEventsBySource() into ev.
	ScanEvent(
		rows *sql.Rows,
		ev *persistence.Event,
	) error
}

EventDriver is the subset of the Driver interface that is concerned with events.

type OffsetDriver

// OffsetDriver is the subset of the Driver interface that is concerned with
// persisting event stream offsets.
type OffsetDriver interface {
	// LoadOffset loads the last offset associated with the given source
	// application key sk. ak is the 'owner' application key.
	//
	// If there is no offset associated with the given source application key,
	// the offset is returned as zero and error as nil.
	LoadOffset(
		ctx context.Context,
		db *sql.DB,
		ak, sk string,
	) (uint64, error)

	// InsertOffset inserts a new offset associated with the given source
	// application key sk. ak is the 'owner' application key and n is the
	// offset to insert.
	//
	// It returns false if the row already exists.
	InsertOffset(
		ctx context.Context,
		tx *sql.Tx,
		ak, sk string,
		n uint64,
	) (bool, error)

	// UpdateOffset updates the offset associated with the given source
	// application key sk. ak is the 'owner' application key.
	//
	// It returns false if the row does not exist or c is not the current offset
	// associated with the given application key.
	//
	// NOTE(review): n is presumably the new offset that replaces c — confirm.
	UpdateOffset(
		ctx context.Context,
		tx *sql.Tx,
		ak, sk string,
		c, n uint64,
	) (bool, error)
}

OffsetDriver is the subset of the Driver interface that is concerned with persisting event stream offsets.

type ProcessDriver

// ProcessDriver is the subset of the Driver interface that is concerned with
// processes.
//
// NOTE(review): ak appears to be the application's identity key, and hk/id the
// handler key and instance ID — confirm against the driver implementations.
type ProcessDriver interface {
	// InsertProcessInstance inserts a process instance within the transaction
	// tx.
	//
	// It returns false if the row already exists.
	InsertProcessInstance(
		ctx context.Context,
		tx *sql.Tx,
		ak string,
		inst persistence.ProcessInstance,
	) (bool, error)

	// UpdateProcessInstance updates a process instance within the transaction
	// tx.
	//
	// It returns false if the row does not exist or inst.Revision is not
	// current.
	UpdateProcessInstance(
		ctx context.Context,
		tx *sql.Tx,
		ak string,
		inst persistence.ProcessInstance,
	) (bool, error)

	// DeleteProcessInstance deletes a process instance within the transaction
	// tx.
	//
	// It returns false if the row does not exist or inst.Revision is not
	// current.
	DeleteProcessInstance(
		ctx context.Context,
		tx *sql.Tx,
		ak string,
		inst persistence.ProcessInstance,
	) (bool, error)

	// SelectProcessInstance selects a process instance's data. It reads
	// directly from db rather than from a transaction.
	SelectProcessInstance(
		ctx context.Context,
		db *sql.DB,
		ak, hk, id string,
	) (persistence.ProcessInstance, error)
}

ProcessDriver is the subset of the Driver interface that is concerned with processes.

type Provider

// Provider is an implementation of provider.Provider for SQL that uses an
// existing open database pool.
type Provider struct {

	// DB is the SQL database to use.
	DB *sql.DB

	// Driver is the Verity SQL driver to use with this database. If it is nil,
	// it is chosen automatically from one of the built-in drivers.
	Driver Driver
	// contains filtered or unexported fields
}

Provider is an implementation of provider.Provider for SQL that uses an existing open database pool.

func (*Provider) Open

Open returns a data-store for a specific application.

k is the identity key of the application.

Data stores are opened for exclusive use. If another engine instance has already opened this application's data-store, ErrDataStoreLocked is returned.

type QueueDriver

// QueueDriver is the subset of the Driver interface that is concerned with the
// message queue subsystem.
//
// NOTE(review): ak appears to be the application's identity key — confirm
// against the driver implementations.
type QueueDriver interface {
	// InsertQueueMessage inserts a message in the queue within the
	// transaction tx.
	//
	// It returns false if the row already exists.
	InsertQueueMessage(
		ctx context.Context,
		tx *sql.Tx,
		ak string,
		m persistence.QueueMessage,
	) (bool, error)

	// UpdateQueueMessage updates meta-data about a message that is already on
	// the queue.
	//
	// It returns false if the row does not exist or m.Revision is not current.
	UpdateQueueMessage(
		ctx context.Context,
		tx *sql.Tx,
		ak string,
		m persistence.QueueMessage,
	) (bool, error)

	// DeleteQueueMessage deletes a message from the queue.
	//
	// It returns false if the row does not exist or m.Revision is not current.
	DeleteQueueMessage(
		ctx context.Context,
		tx *sql.Tx,
		ak string,
		m persistence.QueueMessage,
	) (bool, error)

	// DeleteQueueTimeoutMessagesByProcessInstance deletes timeout messages that
	// were produced by a specific process instance.
	//
	// inst identifies the process instance whose timeout messages are deleted.
	DeleteQueueTimeoutMessagesByProcessInstance(
		ctx context.Context,
		tx *sql.Tx,
		ak string,
		inst persistence.ProcessInstance,
	) error

	// SelectQueueMessages selects up to n messages from the queue.
	//
	// The returned rows are read with ScanQueueMessage().
	SelectQueueMessages(
		ctx context.Context,
		db *sql.DB,
		ak string,
		n int,
	) (*sql.Rows, error)

	// ScanQueueMessage scans the next message from a row-set returned by
	// SelectQueueMessages() into m.
	ScanQueueMessage(
		rows *sql.Rows,
		m *persistence.QueueMessage,
	) error
}

QueueDriver is the subset of the Driver interface that is concerned with the message queue subsystem.

Directories

Path Synopsis
Package mysql is a MySQL driver for the SQL persistence provider.
Package mysql is a MySQL driver for the SQL persistence provider.
Package postgres is a PostgreSQL driver for the SQL persistence provider.
Package postgres is a PostgreSQL driver for the SQL persistence provider.
Package sqlite is an SQLite v3 driver for the SQL persistence provider.
Package sqlite is an SQLite v3 driver for the SQL persistence provider.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL