sql

package module
v0.0.1-alpha.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 24, 2023 License: MIT Imports: 9 Imported by: 0

README

Streams Driver for SQL

The stream driver for SQL offers a Writer implementation to be used by systems implementing the transactional outbox messaging pattern.

Moreover, the Message Egress Proxy (aka. log trailing) component could be used along this driver to publish the messages to the message broker / stream.

Finally, depending on the underlying database engine, the Message Egress Proxy component could be combined with a Change-Data-Capture (CDC) listener sidecar daemon.

This sidecar daemon could listen directly to CDC logs issued by the database engine (e.g. WAL in PSQL, binlog in MySQL).

Another way avoid constant database polling is by implementing an agnostic sidecar which is deployed along the main container; it would accept connections through OSI Level 4 sockets (e.g. TCP, UDP, UNIX). The application container would send signals (heartbeats) to the sidecar daemon to trigger a data polling process. Lastly, the daemon would use Message Egress Proxy to forward message batches to actual data-in-motion infrastructure.

Requirements

In order for this driver to work, the SQL database MUST have an outbox table with the following schema (called streams_egress by default).

-- Using KSUID as primary key, hence the CHAR(27) type.
--
-- NOTE: BYTEA type is a Postgres type used for binary array types.
CREATE TABLE IF NOT EXISTS streams_egress(
    batch_id CHAR(27) PRIMARY KEY,
    in_flight BOOLEAN DEFAULT FALSE,
    message_count INTEGER DEFAULT 0,
    write_retries INTEGER DEFAULT 0,
    last_write_error VARCHAR(255),
    raw_data BYTEA NOT NULL
);

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrUnableToWriteRows = errors.New("streams.sql: unable to write rows")
)

Functions

This section is empty.

Types

type EgressStorage

type EgressStorage struct {
	// contains filtered or unexported fields
}

A EgressStorage is a SQL implementation of proxy.EgressStorage. Enables interaction with a stream egress table (aka. outbox).

func NewEgressStorage

func NewEgressStorage(db *sql.DB, opts ...egress.StorageOption) EgressStorage

NewEgressStorage allocates a new EgressStorage instance with default configuration but open to apply any proxy.EgressStorageOption(s).

func NewEgressStorageWithConfig

func NewEgressStorageWithConfig(db *sql.DB, cfg egress.StorageConfig) EgressStorage

NewEgressStorageWithConfig allocates a new EgressStorage instance with a specific proxy.EgressStorageConfig.

func (EgressStorage) Commit

func (e EgressStorage) Commit(ctx context.Context, batchID string) error

func (EgressStorage) GetBatch

func (e EgressStorage) GetBatch(ctx context.Context, batchID string) (egress.Batch, error)

type Writer

type Writer struct {
	// contains filtered or unexported fields
}

A Writer is a SQL database writer. This specific kind of streams.Writer is used by systems implementing the transactional outbox messaging pattern. More in depth, a Writer instance will attempt to execute a transactional write into an <<egress table>> where all messages generated by the system will be stored, so they may be later publish by an <<egress proxy agent>> (aka. log trailing).

Writer instances MUST be used along transaction context functions (persistence.SetTransactionContext, persistence.GetTransactionContext). This is because Writer instances obtain the sql.Tx instance from the context. If no context is found, then Writer.Write will fail.

Finally, the main reason to apply the transactional outbox pattern is to obtain write atomicity between a database and an external message stream (i.e. message broker, event bus).

Transactional outbox pattern reference: https://microservices.io/patterns/data/transactional-outbox.html

func NewWriter

func NewWriter(opts ...WriterOption) Writer

NewWriter allocates a new Writer instance with default configuration but open to apply any WriterOption(s).

func NewWriterWithConfig

func NewWriterWithConfig(cfg WriterConfig) Writer

NewWriterWithConfig allocates a new Writer instance with passed configuration.

func (Writer) Write

func (w Writer) Write(ctx context.Context, msgBatch []streams.Message) (err error)

Write append a new batch of messages into the egress table.

A transaction context (persistence.SetTransactionContext) MUST be set before calling this routine. This is because Writer instances obtain the sql.Tx instance from the context. If no context is found, then Writer.Write will fail.

Batch identifier will be taken from TransactionContext.TransactionID.

type WriterConfig

type WriterConfig struct {
	Codec             codec.Codec // used to encode message batches, so it can be stored on the database (default codec.ProtocolBuffers).
	WriterEgressTable string      // table to write message batches to be later published.
}

A WriterConfig is the Writer configuration. Writer uses streams.IdentifierFactory to generate batch identifiers.

type WriterOption

type WriterOption interface {
	// contains filtered or unexported methods
}

A WriterOption is used to configure a Writer instance in an idiomatic & fine-grained way.

func WithCodec

func WithCodec(c codec.Codec) WriterOption

WithCodec sets the codec.Codec to be used by Writer to encode message batches, so data may be stored into a database efficiently.

func WithEgressTable

func WithEgressTable(table string) WriterOption

WithEgressTable sets the name of the table to be used as <<message egress table>>. A <<message egress table>> is a system database table used by `streams` mechanisms to write batch of messages to be published into a message stream.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL