rsql

package
v0.0.0-...-c9f981a Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 7, 2024 License: MIT Imports: 17 Imported by: 19

Documentation

Overview

Package rsql provides reflex event stream and cursor table implementations for mysql.

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrConsecEvent occurs when the difference between the ids of two consecutive events is not 1.
	ErrConsecEvent = errors.New("non-consecutive event ids", j.C("ERR_bc3dcacb92b9761f"))
	// ErrInvalidIntID occurs when a non-int value is specified for an integer id
	ErrInvalidIntID = errors.New("invalid id, only int supported", j.C("ERR_82d0368b5478d378"))
)

Functions

func FillGaps

func FillGaps(dbc *sql.DB, gapTable gapTable)

FillGaps registers the default gap filler with the events table. It inserts noops into the events table when gaps are detected. Both EventsTable and EventsTableInt satisfy the gapTable internal interface.

Usage:
var events = rsql.NewEventsTable()
...
rsql.FillGaps(dbc, events)

func GetLatestIDForTesting

func GetLatestIDForTesting(ctx context.Context, _ *testing.T, dbc *sql.DB, eventTable, idField string) (int64, error)

GetLatestIDForTesting fetches the latest event id from the event table

func GetNextEventsForTesting

func GetNextEventsForTesting(ctx context.Context, _ *testing.T, dbc *sql.DB, table *EventsTable, after int64, lag time.Duration) ([]*reflex.Event, error)

GetNextEventsForTesting fetches a bunch of events from the event table

func IsDuplicateErrorInsertion

func IsDuplicateErrorInsertion(err error) bool

func TestCursorsTable

func TestCursorsTable(t *testing.T, dbc *sql.DB, table CursorsTable)

TestCursorsTable runs assertions on the cursor table to check it has a valid schema

func TestEventsTable

func TestEventsTable(t *testing.T, dbc *sql.DB, table *EventsTable)

TestEventsTable provides a helper function to test event tables.

func TestEventsTableInt

func TestEventsTableInt(t *testing.T, dbc *sql.DB, table *EventsTableInt)

TestEventsTableInt provides a helper function to test event tables with int foreign id columns.

func TestEventsTableWithID

func TestEventsTableWithID(t *testing.T, dbc *sql.DB, table *EventsTable, foreignID string)

TestEventsTableWithID provides a helper function to test event tables.

Types

type CursorType

type CursorType int

CursorType is either int or string

func (CursorType) Cast

func (t CursorType) Cast(cursor string) (interface{}, error)

Cast returns cursor cast to the type t.

type CursorsOption

type CursorsOption func(*ctable)

CursorsOption are the configurations for the cursor table

func WithCursorAsyncDisabled

func WithCursorAsyncDisabled() CursorsOption

WithCursorAsyncDisabled provides an option to disable async writes.

func WithCursorAsyncPeriod

func WithCursorAsyncPeriod(d time.Duration) CursorsOption

WithCursorAsyncPeriod provides an option to configure the async write period. It defaults to 5 seconds.

func WithCursorCursorField

func WithCursorCursorField(field string) CursorsOption

WithCursorCursorField provides an option to configure the cursor field. It defaults to 'last_event_id'.

func WithCursorIDField

func WithCursorIDField(field string) CursorsOption

WithCursorIDField provides an option to configure the cursor ID field. It defaults to 'id'.

func WithCursorSetCounter

func WithCursorSetCounter(f func()) CursorsOption

WithCursorSetCounter provides an option to set the counter function for the cursor DB "set cursor" metric. It defaults to prometheus metrics.

func WithCursorStrings

func WithCursorStrings() CursorsOption

WithCursorStrings provides an option to configure the cursor type to string. It defaults to int.

func WithCursorTimeField

func WithCursorTimeField(field string) CursorsOption

WithCursorTimeField provides an option to configure the cursor time field. It defaults to 'updated_at'.

func WithTestCursorSleep

func WithTestCursorSleep(_ testing.TB, f func(time.Duration)) CursorsOption

WithTestCursorSleep replaces the sleep function for testing.

type CursorsTable

type CursorsTable interface {
	GetCursor(ctx context.Context, dbc *sql.DB, consumerID string) (string, error)
	SetCursor(ctx context.Context, dbc *sql.DB, consumerID string, cursor string) error
	Flush(ctx context.Context) error
	Clone(ol ...CursorsOption) CursorsTable
	ToStore(dbc *sql.DB, ol ...CursorsOption) reflex.CursorStore
}

CursorsTable provides an interface to an event consumer cursors db table.

func NewCursorsTable

func NewCursorsTable(name string, options ...CursorsOption) CursorsTable

NewCursorsTable returns a new CursorsTable implementation.

type ErrorEventInserter

type ErrorEventInserter func(ctx context.Context, tx *sql.Tx, foreignID string, typ reflex.EventType, metadata []byte) (NotifyFunc, error)

ErrorEventInserter abstracts the insertion of an event into a sql table including providing a notification capability.

type ErrorInserter

type ErrorInserter func(ctx context.Context, tx *sql.Tx, consumer, eventID, errMsg string, errStatus reflex.ErrorStatus) (string, error)

ErrorInserter abstracts the insertion of an error into a sql table.

type ErrorsOption

type ErrorsOption func(*ErrorsTable)

ErrorsOption defines a functional option to configure new error tables.

func WithErrorCounter

func WithErrorCounter(counter func(consumer string)) ErrorsOption

WithErrorCounter provides an option to set the error counter, which counts the errors successfully recorded in the error table.

func WithErrorCreatedAtField

func WithErrorCreatedAtField(field string) ErrorsOption

WithErrorCreatedAtField provides an option to set the error DB created at timestamp field. It defaults to 'created_at'.

func WithErrorEventConsumerField

func WithErrorEventConsumerField(field string) ErrorsOption

WithErrorEventConsumerField provides an option to set the error DB consumer field. It defaults to 'consumer'.

func WithErrorEventIDField

func WithErrorEventIDField(field string) ErrorsOption

WithErrorEventIDField provides an option to set the event DB eventID field. It defaults to 'event_id'.

func WithErrorEventInserter

func WithErrorEventInserter(inserter ErrorEventInserter) ErrorsOption

WithErrorEventInserter provides an option to set the error event inserter, which inserts an error event into a sql table. The default inserter is the EventsTable.InsertWithMetadata function of a given EventsTable instance.

func WithErrorIDField

func WithErrorIDField(field string) ErrorsOption

WithErrorIDField provides an option to set the event DB ID field. This is useful for tables which implement custom error loaders. It defaults to 'id'.

func WithErrorInserter

func WithErrorInserter(inserter ErrorInserter) ErrorsOption

WithErrorInserter provides an option to set the error inserter, which inserts an error into a sql table. The default inserter is generated from the rsql.makeDefaultErrorInserter function parameterised with the errTableSchema of the given ErrorsTable.

func WithErrorMsgField

func WithErrorMsgField(field string) ErrorsOption

WithErrorMsgField provides an option to set the error DB msg field. It defaults to 'msg'.

func WithErrorRecordOnly

func WithErrorRecordOnly() ErrorsOption

WithErrorRecordOnly provides an option to set that we only record errors and not enable streaming of those errors.

func WithErrorStatusField

func WithErrorStatusField(field string) ErrorsOption

WithErrorStatusField provides an option to set the error status field. It defaults to 'status'.

func WithErrorTableName

func WithErrorTableName(name string) ErrorsOption

WithErrorTableName provides an option to set the name of the consumer error table. It defaults to 'consumer_errors'.

func WithErrorUpdatedAtField

func WithErrorUpdatedAtField(field string) ErrorsOption

WithErrorUpdatedAtField provides an option to set the error DB updated at timestamp field. It defaults to 'updated_at'.

type ErrorsTable

type ErrorsTable struct {
	// contains filtered or unexported fields
}

ErrorsTable provides reflex consumer event errors insertion and streaming for a sql db table.

func NewErrorsTable

func NewErrorsTable(opts ...ErrorsOption) *ErrorsTable

NewErrorsTable returns a new event consumer errors table.

func (*ErrorsTable) ToErrorInsertFunc

func (t *ErrorsTable) ToErrorInsertFunc(dbc *sql.DB) reflex.ErrorInsertFunc

type EventsNotifier

type EventsNotifier interface {
	// StreamWatcher is passed as the default StreamWatcher every time stream()
	// is called on the EventsTable.
	StreamWatcher

	// Notify is called by reflex every time an event is inserted into the
	// EventsTable.
	Notify()
}

EventsNotifier provides a way to receive notifications when an event is inserted in an EventsTable, and a way to trigger an EventsTable's StreamClients when there are new events available.

type EventsOption

type EventsOption func(*EventsTable)

EventsOption defines a functional option to configure new event tables.

func WithEventForeignIDField

func WithEventForeignIDField(field string) EventsOption

WithEventForeignIDField provides an option to set the event DB foreignID field. It defaults to 'foreign_id'.

func WithEventIDField

func WithEventIDField(field string) EventsOption

WithEventIDField provides an option to set the event DB ID field. This is useful for tables which implement custom event loaders. It defaults to 'id'.

func WithEventMetadataField

func WithEventMetadataField(field string) EventsOption

WithEventMetadataField provides an option to set the event DB metadata field. It is disabled by default; i.e. ''.

func WithEventTimeField

func WithEventTimeField(field string) EventsOption

WithEventTimeField provides an option to set the event DB timestamp field. It defaults to 'timestamp'.

func WithEventTraceField

func WithEventTraceField(field string) EventsOption

WithEventTraceField provides an option to persist an opentelemetry trace through the events stream.

func WithEventTypeField

func WithEventTypeField(field string) EventsOption

WithEventTypeField provides an option to set the event DB type field. It defaults to 'type'.

func WithEventsBackoff

func WithEventsBackoff(d time.Duration) EventsOption

WithEventsBackoff provides an option to set the backoff period between polling the DB for new events. It defaults to 10s.

func WithEventsCacheEnabled deprecated

func WithEventsCacheEnabled() EventsOption

WithEventsCacheEnabled provides an option to enable the read-through cache on the events table.

Deprecated: Cache enabled by default.

func WithEventsInMemNotifier

func WithEventsInMemNotifier() EventsOption

WithEventsInMemNotifier provides an option that enables an in-memory notifier.

Note: This can have a significant impact on database load if the cache is disabled since all consumers might query the database on every event.

func WithEventsInserter

func WithEventsInserter(inserter inserter) EventsOption

WithEventsInserter provides an option to set the event inserter which inserts event into a sql table. The default inserter is configured with the WithEventsXField options.

func WithEventsLoader

func WithEventsLoader(loader loader) EventsOption

WithEventsLoader provides an option to set the base event loader function. The base event loader returns the next available events and the associated next cursor after the previous cursor, or an error. The default loader is configured with the WithEventsXField options.

func WithEventsNotifier

func WithEventsNotifier(notifier EventsNotifier) EventsOption

WithEventsNotifier provides an option to receive event notifications and trigger StreamClients when new events are available.

func WithIncludeNoopEvents

func WithIncludeNoopEvents() EventsOption

WithIncludeNoopEvents provides an option to enable the streaming of noop events. Noop events are not streamed by default.

func WithoutEventsCache

func WithoutEventsCache() EventsOption

WithoutEventsCache provides an option to disable the read-through cache on the events table.

type EventsTable

type EventsTable struct {
	// contains filtered or unexported fields
}

EventsTable provides reflex event insertion and streaming for a sql db table.

func NewEventsTable

func NewEventsTable(name string, opts ...EventsOption) *EventsTable

NewEventsTable returns a new events table.

func (*EventsTable) Clone

func (t *EventsTable) Clone(opts ...EventsOption) *EventsTable

Clone returns a new events table generated from the config of t with the new options applied. Note that non-config fields are not copied, so things like the cache and inmemnotifier are not shared.

func (*EventsTable) Insert

func (t *EventsTable) Insert(ctx context.Context, tx *sql.Tx, foreignID string,
	typ reflex.EventType,
) (NotifyFunc, error)

Insert inserts an event into the EventsTable and returns a function that can be optionally called to notify the table's EventsNotifier of the change. The intended pattern for this function is:

notify, err := eTable.Insert(ctx, tx, ...)
if err != nil {
  return err
}
defer notify()
return doWorkAndCommit(tx)

func (*EventsTable) InsertWithMetadata

func (t *EventsTable) InsertWithMetadata(ctx context.Context, tx *sql.Tx, foreignID string,
	typ reflex.EventType, metadata []byte,
) (NotifyFunc, error)

InsertWithMetadata inserts an event with metadata into the EventsTable. Note metadata is disabled by default, enable with WithEventMetadataField option.

func (*EventsTable) ListenGaps

func (t *EventsTable) ListenGaps(f func(Gap))

ListenGaps adds f to a slice of functions that are called when a gap is detected. On first call, it starts a goroutine that serves these functions.

func (*EventsTable) Stream

func (t *EventsTable) Stream(ctx context.Context, dbc *sql.DB, after string,
	opts ...reflex.StreamOption,
) reflex.StreamClient

Stream implements reflex.StreamFunc and returns a StreamClient that streams events from the db. It is only safe for a single goroutine to use.

func (*EventsTable) ToStream

func (t *EventsTable) ToStream(dbc *sql.DB, opts1 ...reflex.StreamOption) reflex.StreamFunc

ToStream returns a reflex StreamFunc interface of this EventsTable.

type EventsTableInt

type EventsTableInt struct {
	*EventsTable
}

EventsTableInt wraps reflex EventsTable and provides typed int64 foreign id inserts.

func NewEventsTableInt

func NewEventsTableInt(name string, options ...EventsOption) *EventsTableInt

NewEventsTableInt returns an event table which uses integers for the foreign IDs

func (*EventsTableInt) Clone

func (e *EventsTableInt) Clone(opts ...EventsOption) *EventsTableInt

Clone works as EventsTable.Clone.

func (*EventsTableInt) Insert

func (e *EventsTableInt) Insert(ctx context.Context, tx *sql.Tx, foreignID int64,
	typ reflex.EventType,
) (NotifyFunc, error)

Insert works as EventsTable.Insert except that foreign id is an int64.

func (*EventsTableInt) InsertWithMetadata

func (e *EventsTableInt) InsertWithMetadata(ctx context.Context, tx *sql.Tx, foreignID int64,
	typ reflex.EventType, metadata []byte,
) (NotifyFunc, error)

InsertWithMetadata works as EventsTable.InsertWithMetadata except that foreign id is an int64.

type Gap

type Gap struct {
	// Prev(ious) event ID.
	Prev int64

	// Next event ID.
	Next int64
}

Gap represents a gap in monotonically incrementing event IDs. The gap lies after Prev and before Next, so if Prev+1==Next, then there is no gap.

type NotifyFunc

type NotifyFunc func()

NotifyFunc notifies an events table's underlying EventsNotifier.

type StreamWatcher

type StreamWatcher interface {
	// C returns a channel that blocks until the next event is available in the
	// StreamWatcher's EventsTable. C will be called every time a StreamClient
	// reaches the head of an events table.
	C() <-chan struct{}
}

StreamWatcher provides the ability to trigger the streamer when new events are available.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL