Documentation ¶
Constants ¶
This section is empty.
Variables ¶
var (
ErrUnableToWriteRows = errors.New("streams.sql: unable to write rows")
)
Functions ¶
This section is empty.
Types ¶
type EgressStorage ¶
type EgressStorage struct {
// contains filtered or unexported fields
}
An EgressStorage is a SQL implementation of proxy.EgressStorage. It enables interaction with a stream egress table (a.k.a. outbox).
func NewEgressStorage ¶
func NewEgressStorage(db *sql.DB, opts ...egress.StorageOption) EgressStorage
NewEgressStorage allocates a new EgressStorage instance with the default configuration, then applies any egress.StorageOption values passed in.
func NewEgressStorageWithConfig ¶
func NewEgressStorageWithConfig(db *sql.DB, cfg egress.StorageConfig) EgressStorage
NewEgressStorageWithConfig allocates a new EgressStorage instance with the given egress.StorageConfig.
type Writer ¶
type Writer struct {
// contains filtered or unexported fields
}
A Writer is a SQL database writer. This specific kind of streams.Writer is used by systems implementing the transactional outbox messaging pattern. More specifically, a Writer instance attempts to execute a transactional write into an <<egress table>>, where all messages generated by the system are stored so they may later be published by an <<egress proxy agent>> (a.k.a. log tailing).
Writer instances MUST be used together with the transaction context functions (persistence.SetTransactionContext, persistence.GetTransactionContext). This is because Writer instances obtain the sql.Tx instance from the context. If no transaction context is found, Writer.Write fails.
Finally, the main reason to apply the transactional outbox pattern is to obtain write atomicity between a database and an external message stream (e.g. a message broker or an event bus).
Transactional outbox pattern reference: https://microservices.io/patterns/data/transactional-outbox.html
func NewWriter ¶
func NewWriter(opts ...WriterOption) Writer
NewWriter allocates a new Writer instance with the default configuration, then applies any WriterOption values passed in.
func NewWriterWithConfig ¶
func NewWriterWithConfig(cfg WriterConfig) Writer
NewWriterWithConfig allocates a new Writer instance with the given configuration.
func (Writer) Write ¶
Write appends a new batch of messages to the egress table.
A transaction context (persistence.SetTransactionContext) MUST be set before calling this routine. This is because Writer instances obtain the sql.Tx instance from the context. If no transaction context is found, Writer.Write fails.
The batch identifier is taken from TransactionContext.TransactionID.
type WriterConfig ¶
type WriterConfig struct {
	Codec             codec.Codec // used to encode message batches, so it can be stored on the database (default codec.ProtocolBuffers).
	WriterEgressTable string      // table to write message batches to be later published.
}
A WriterConfig is the Writer configuration. Writer uses streams.IdentifierFactory to generate batch identifiers.
type WriterOption ¶
type WriterOption interface {
// contains filtered or unexported methods
}
A WriterOption is used to configure a Writer instance in an idiomatic & fine-grained way.
func WithCodec ¶
func WithCodec(c codec.Codec) WriterOption
WithCodec sets the codec.Codec to be used by Writer to encode message batches, so data may be stored into a database efficiently.
func WithEgressTable ¶
func WithEgressTable(table string) WriterOption
WithEgressTable sets the name of the table to be used as the <<message egress table>>. A <<message egress table>> is a system database table used by `streams` mechanisms to write batches of messages that will later be published into a message stream.
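The option types above follow the general shape of the functional options pattern, where an interface with unexported methods can only be implemented inside its own package. The sketch below illustrates that pattern in isolation. It does not reproduce this package's code: `config`, `option`, `optionFunc`, `withCodecName`, `withTable`, and `newConfig` are hypothetical names, a string stands in for codec.Codec, and the default table name `streams_egress` is an assumption.

```go
package main

import "fmt"

// config is a hypothetical stand-in for WriterConfig.
type config struct {
	codecName string // stands in for codec.Codec
	table     string // stands in for WriterEgressTable
}

// option has an unexported method, so only this package can implement it.
type option interface {
	apply(*config)
}

// optionFunc adapts a plain function to the option interface.
type optionFunc func(*config)

func (f optionFunc) apply(c *config) { f(c) }

// withCodecName overrides the codec used to encode message batches.
func withCodecName(name string) option {
	return optionFunc(func(c *config) { c.codecName = name })
}

// withTable overrides the egress table name.
func withTable(table string) option {
	return optionFunc(func(c *config) { c.table = table })
}

// newConfig starts from defaults and applies the options in order,
// so later options win over earlier ones.
func newConfig(opts ...option) config {
	// The default table name is an assumption made for this sketch.
	c := config{codecName: "protobuf", table: "streams_egress"}
	for _, o := range opts {
		o.apply(&c)
	}
	return c
}

func main() {
	fmt.Printf("%+v\n", newConfig())
	fmt.Printf("%+v\n", newConfig(withTable("orders_outbox")))
}
```

Under this design, callers only name the settings they want to change, and new options can be added later without breaking existing call sites.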