eventstreams

package
v1.4.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 29, 2024 License: Apache-2.0 Imports: 23 Imported by: 0

README

Event streams

This package provides a utility for exposing an event stream over WebSockets and Webhooks.

  • Connectivity:
    • WebSockets support for inbound connections
    • Webhooks support for outbound connections
  • Reliability:
    • Workload managed mode: at-least-once delivery
    • Broadcast mode: at-most-once delivery
    • Batching for performance
    • Checkpointing for the at-least-once delivery assurance
  • Convenience for packaging into apps:
    • Plug-in persistence (including allowing you multiple streams with CRUD.Scoped())
    • Out-of-the-box CRUD on event streams, using DB backed storage
    • Server-side topicFilter event filtering (regular expression)
  • Semi-opinionated:
    • How batches are spelled
    • How WebSocket flow control payloads are spelled (start,ack,nack,batch)
  • Flexibility:
    • Bring your own message payload (note topic and sequenceId always added)
    • Bring your own configuration type (must implement DB Scan & Value functions)

Example

A simple in-memory command line pub/sub example is provided:

Documentation

Index

Constants

View Source
const (
	ConfigTLSConfigName = "name"

	ConfigCheckpointsAsynchronous            = "asynchronous"
	ConfigCheckpointsUnmatchedEventThreshold = "unmatchedEventThreshold"

	ConfigDisablePrivateIPs = "disablePrivateIPs"

	ConfigWebhooksDefaultTLSConfig = "tlsConfigName"

	ConfigWebSocketsDistributionMode = "distributionMode"

	ConfigDefaultsErrorHandling     = "errorHandling"
	ConfigDefaultsBatchSize         = "batchSize"
	ConfigDefaultsBatchTimeout      = "batchTimeout"
	ConfigDefaultsRetryTimeout      = "retryTimeout"
	ConfigDefaultsBlockedRetryDelay = "blockedRetryDelay"
)
View Source
const MessageTypeEventBatch = "event_batch"

Variables

View Source
var (
	EventStreamTypeWebhook   = fftypes.FFEnumValue("estype", "webhook")
	EventStreamTypeWebSocket = fftypes.FFEnumValue("estype", "websocket")
)
View Source
var (
	ErrorHandlingTypeBlock = fftypes.FFEnumValue("ehtype", "block")
	ErrorHandlingTypeSkip  = fftypes.FFEnumValue("ehtype", "skip")
)
View Source
var (
	DispatchStatusDispatching = fftypes.FFEnumValue("edstatus", "dispatching")
	DispatchStatusRetrying    = fftypes.FFEnumValue("edstatus", "retrying")
	DispatchStatusBlocked     = fftypes.FFEnumValue("edstatus", "blocked")
	DispatchStatusComplete    = fftypes.FFEnumValue("edstatus", "complete")
	DispatchStatusSkipped     = fftypes.FFEnumValue("edstatus", "skipped")
)
View Source
var (
	EventStreamStatusStarted         = fftypes.FFEnumValue("esstatus", "started")
	EventStreamStatusStopped         = fftypes.FFEnumValue("esstatus", "stopped")
	EventStreamStatusDeleted         = fftypes.FFEnumValue("esstatus", "deleted")
	EventStreamStatusStopping        = fftypes.FFEnumValue("esstatus", "stopping")         // not persisted
	EventStreamStatusStoppingDeleted = fftypes.FFEnumValue("esstatus", "stopping_deleted") // not persisted
	EventStreamStatusUnknown         = fftypes.FFEnumValue("esstatus", "unknown")          // not persisted
)
View Source
var (
	DistributionModeBroadcast   = fftypes.FFEnumValue("distmode", "broadcast")
	DistributionModeLoadBalance = fftypes.FFEnumValue("distmode", "load_balance")
)
View Source
var CheckpointFilters = &ffapi.QueryFields{
	"id":         &ffapi.StringField{},
	"created":    &ffapi.TimeField{},
	"updated":    &ffapi.TimeField{},
	"sequenceid": &ffapi.StringField{},
}
View Source
var CheckpointsConfig config.Section
View Source
var DefaultsConfig config.Section
View Source
var GenericEventStreamFilters = &ffapi.QueryFields{
	"id":                &ffapi.StringField{},
	"created":           &ffapi.TimeField{},
	"updated":           &ffapi.TimeField{},
	"name":              &ffapi.StringField{},
	"status":            &ffapi.StringField{},
	"type":              &ffapi.StringField{},
	"topicfilter":       &ffapi.StringField{},
	"initialsequenceid": &ffapi.StringField{},
}
View Source
var RetrySection config.Section
View Source
var RootConfig config.Section
View Source
var WebSocketsDefaultsConfig config.Section
View Source
var WebhookDefaultsConfig config.Section

Functions

func InitConfig

func InitConfig(conf config.Section)

Due to how arrays work currently in the config system, this can only be initialized in one section for the whole process.

Types

type CheckpointsTuningConfig

type CheckpointsTuningConfig struct {
	Asynchronous            bool  `ffstruct:"CheckpointsConfig" json:"asynchronous"`
	UnmatchedEventThreshold int64 `ffstruct:"CheckpointsConfig" json:"unmatchedEventThreshold"`
}

type Config

type Config[CT EventStreamSpec, DT any] struct {
	TLSConfigs        map[string]*fftls.Config `ffstruct:"EventStreamConfig" json:"tlsConfigs,omitempty"`
	Retry             *retry.Retry             `ffstruct:"EventStreamConfig" json:"retry,omitempty"`
	DisablePrivateIPs bool                     `ffstruct:"EventStreamConfig" json:"disabledPrivateIPs"`
	Checkpoints       CheckpointsTuningConfig  `ffstruct:"EventStreamConfig" json:"checkpoints"`
	Defaults          EventStreamDefaults      `ffstruct:"EventStreamConfig" json:"defaults,omitempty"`

	// Allow plugging in additional types (important that the embedding code adds the FFEnum doc entry for the EventStreamType)
	AdditionalDispatchers map[EventStreamType]DispatcherFactory[CT, DT] `json:"-"`
}

func GenerateConfig

func GenerateConfig[CT EventStreamSpec, DT any](ctx context.Context) *Config[CT, DT]

Optional function to generate config directly from YAML configuration using the config package. You can also generate the configuration programmatically

type ConfigWebhookDefaults

type ConfigWebhookDefaults struct {
	ffresty.HTTPConfig
}

type ConfigWebsocketDefaults

type ConfigWebsocketDefaults struct {
	DefaultDistributionMode DistributionMode `ffstruct:"EventStreamConfig" json:"distributionMode"`
}

type DBSerializable

type DBSerializable interface {
	sql.Scanner
	driver.Valuer
}

Let's us check that the config serializes

type Deliver

type Deliver[DT any] func(events []*Event[DT]) SourceInstruction

type DispatchStatus

type DispatchStatus = fftypes.FFEnum

type Dispatcher added in v1.4.3

type Dispatcher[DT any] interface {
	AttemptDispatch(ctx context.Context, attempt int, events *EventBatch[DT]) error
}

type DispatcherFactory added in v1.4.3

type DispatcherFactory[CT EventStreamSpec, DT any] interface {
	Validate(ctx context.Context, conf *Config[CT, DT], spec CT, tlsConfigs map[string]*tls.Config, phase LifecyclePhase) error
	NewDispatcher(ctx context.Context, conf *Config[CT, DT], spec CT) Dispatcher[DT]
}

DispatcherFactory is the interface to plug in a custom dispatcher, for example to provide local in-process processing of events (in addition to remote WebSocket/Webhook consumption). Generics: - CT is the Configuration Type - the custom extensions to the configuration schema - DT is the Data Type - the payload type that will be delivered to the application

type DistributionMode

type DistributionMode = fftypes.FFEnum

type ErrorHandlingType

type ErrorHandlingType = fftypes.FFEnum

type Event

type Event[DataType any] struct {
	EventCommon
	// Data can be anything to deliver for the event - must be JSON marshalable.
	// Will be flattened into the struct.
	// Can define topic and/or sequenceId, but these will overridden with EventCommon strings in the JSON serialization.
	Data *DataType `json:"-"`
}

func (Event[DataType]) MarshalJSON

func (e Event[DataType]) MarshalJSON() ([]byte, error)

func (*Event[DataType]) UnmarshalJSON

func (e *Event[DataType]) UnmarshalJSON(b []byte) error

type EventBatch

type EventBatch[DataType any] struct {
	wsserver.BatchHeader
	Type   string             `json:"type"`   // always MessageTypeEventBatch (for consistent WebSocket flow control)
	Events []*Event[DataType] `json:"events"` // an array of events allows efficient batch acknowledgment
}

func (*EventBatch[DataType]) GetBatchHeader added in v1.4.6

func (eb *EventBatch[DataType]) GetBatchHeader() *wsserver.BatchHeader

type EventCommon

type EventCommon struct {
	Topic      string `json:"topic,omitempty"` // describes the sub-stream of events (optional) allowing sever-side event filtering (regexp)
	SequenceID string `json:"sequenceId"`      // deterministic ID for the event, that must be alpha-numerically orderable within the stream (numbers must be left-padded hex/decimal strings for ordering)
}

type EventStreamActions

type EventStreamActions[CT any] interface {
	Start(ctx context.Context) error
	Stop(ctx context.Context) error
	Status(ctx context.Context) CT
}

type EventStreamCheckpoint

type EventStreamCheckpoint struct {
	ID         *string         `ffstruct:"EventStreamCheckpoint" json:"id"`
	Created    *fftypes.FFTime `ffstruct:"EventStreamCheckpoint" json:"created"`
	Updated    *fftypes.FFTime `ffstruct:"EventStreamCheckpoint" json:"updated"`
	SequenceID *string         `ffstruct:"EventStreamCheckpoint" json:"sequenceId,omitempty"`
}

func (*EventStreamCheckpoint) GetID added in v1.4.1

func (esc *EventStreamCheckpoint) GetID() string

func (*EventStreamCheckpoint) SetCreated added in v1.4.1

func (esc *EventStreamCheckpoint) SetCreated(t *fftypes.FFTime)

func (*EventStreamCheckpoint) SetUpdated added in v1.4.1

func (esc *EventStreamCheckpoint) SetUpdated(t *fftypes.FFTime)

type EventStreamDefaults

type EventStreamDefaults struct {
	ErrorHandling     ErrorHandlingType       `ffstruct:"EventStreamDefaults" json:"errorHandling"`
	BatchSize         int                     `ffstruct:"EventStreamDefaults" json:"batchSize"`
	BatchTimeout      fftypes.FFDuration      `ffstruct:"EventStreamDefaults" json:"batchTimeout"`
	RetryTimeout      fftypes.FFDuration      `ffstruct:"EventStreamDefaults" json:"retryTimeout"`
	BlockedRetryDelay fftypes.FFDuration      `ffstruct:"EventStreamDefaults" json:"blockedRetryDelay"`
	WebSocketDefaults ConfigWebsocketDefaults `ffstruct:"EventStreamDefaults" json:"webSockets,omitempty"`
	WebhookDefaults   ConfigWebhookDefaults   `ffstruct:"EventStreamDefaults" json:"webhooks,omitempty"`
}

type EventStreamSpec

type EventStreamSpec interface {
	dbsql.Resource
	SetID(s string)
	ESFields() *EventStreamSpecFields
	ESType() EventStreamType         // separated from fields to allow choice on restrictions
	WebhookConf() *WebhookConfig     // can return nil if Webhooks not supported
	WebSocketConf() *WebSocketConfig // can return nil if WebSockets not supported
	IsNil() bool                     // needed as quirk of using interfaces with generics
}

type EventStreamSpecFields added in v1.4.4

type EventStreamSpecFields struct {
	Name              *string            `ffstruct:"eventstream" json:"name,omitempty"`
	Status            *EventStreamStatus `ffstruct:"eventstream" json:"status,omitempty"`
	InitialSequenceID *string            `ffstruct:"eventstream" json:"initialSequenceId,omitempty"`
	TopicFilter       *string            `ffstruct:"eventstream" json:"topicFilter,omitempty"`

	ErrorHandling     *ErrorHandlingType  `ffstruct:"eventstream" json:"errorHandling"`
	BatchSize         *int                `ffstruct:"eventstream" json:"batchSize"`
	BatchTimeout      *fftypes.FFDuration `ffstruct:"eventstream" json:"batchTimeout"`
	RetryTimeout      *fftypes.FFDuration `ffstruct:"eventstream" json:"retryTimeout"`
	BlockedRetryDelay *fftypes.FFDuration `ffstruct:"eventstream" json:"blockedRetryDelay"`
	// contains filtered or unexported fields
}

type EventStreamStatistics

type EventStreamStatistics struct {
	StartTime            *fftypes.FFTime `ffstruct:"EventStreamStatistics" json:"startTime"`
	LastDispatchTime     *fftypes.FFTime `ffstruct:"EventStreamStatistics" json:"lastDispatchTime"`
	LastDispatchAttempts int             `ffstruct:"EventStreamStatistics" json:"lastDispatchAttempts,omitempty"`
	LastDispatchFailure  string          `ffstruct:"EventStreamStatistics" json:"lastDispatchFailure,omitempty"`
	LastDispatchStatus   DispatchStatus  `ffstruct:"EventStreamStatistics" json:"lastDispatchComplete"`
	HighestDetected      string          `ffstruct:"EventStreamStatistics" json:"highestDetected"`
	HighestDispatched    string          `ffstruct:"EventStreamStatistics" json:"highestDispatched"`
	Checkpoint           string          `ffstruct:"EventStreamStatistics" json:"checkpoint"`
}

type EventStreamStatus

type EventStreamStatus = fftypes.FFEnum

type EventStreamType

type EventStreamType = fftypes.FFEnum

type GenericEventStream added in v1.4.4

type GenericEventStream struct {
	dbsql.ResourceBase
	Type *EventStreamType `ffstruct:"eventstream" json:"type,omitempty" ffenum:"estype"`
	EventStreamSpecFields
	Webhook    *WebhookConfig         `ffstruct:"eventstream" json:"webhook,omitempty"`
	WebSocket  *WebSocketConfig       `ffstruct:"eventstream" json:"websocket,omitempty"`
	Statistics *EventStreamStatistics `ffstruct:"EventStream" json:"statistics,omitempty"`
}

This is a base object, and set of filters, that you can use if: - You are happy exposing all the built-in types of consumer (webhooks/websockets) - You do not need to extend the configuration in any way - You are happy using UUIDs for your IDs per dbsql.ResourceBase semantics

A pre-built persistence library is provided, and sample migrations, that work with this structure.

The design of the generic is such that you can start with the generic structure, and then move to your own structure later if you want to add more fields.

When you are ready to extend, you need to:

  1. Copy the GenericEventStream source into your own repo, and rename it appropriately. Then you can add your extra configuration fields to it.
  2. Copy the EventStreams() and Checkpoints() CRUD factories into your own repo, and extend them with additional columns etc. as you see fit.

func (*GenericEventStream) ESFields added in v1.4.4

func (ges *GenericEventStream) ESFields() *EventStreamSpecFields

func (*GenericEventStream) ESType added in v1.4.4

func (ges *GenericEventStream) ESType() EventStreamType

func (*GenericEventStream) IsNil added in v1.4.4

func (ges *GenericEventStream) IsNil() bool

func (*GenericEventStream) SetID added in v1.4.4

func (ges *GenericEventStream) SetID(s string)

func (*GenericEventStream) WebSocketConf added in v1.4.4

func (ges *GenericEventStream) WebSocketConf() *WebSocketConfig

func (*GenericEventStream) WebhookConf added in v1.4.4

func (ges *GenericEventStream) WebhookConf() *WebhookConfig

func (*GenericEventStream) WithRuntimeStatus added in v1.4.4

func (ges *GenericEventStream) WithRuntimeStatus(status EventStreamStatus, stats *EventStreamStatistics) *GenericEventStream

type IDValidator added in v1.4.1

type IDValidator func(ctx context.Context, idStr string) error

type LifecyclePhase added in v1.4.3

type LifecyclePhase int
const (
	LifecyclePhasePreInsertValidation LifecyclePhase = iota // on user-supplied context, prior to inserting to DB
	LifecyclePhaseStarting                                  // while initializing for startup (so all defaults should be resolved)
)

type Manager

type Manager[CT EventStreamSpec] interface {
	UpsertStream(ctx context.Context, nameOrID string, esSpec CT) (bool, error)
	GetStreamByID(ctx context.Context, id string, opts ...dbsql.GetOption) (CT, error)
	GetStreamByNameOrID(ctx context.Context, nameOrID string, opts ...dbsql.GetOption) (CT, error)
	ListStreams(ctx context.Context, filter ffapi.Filter) ([]CT, *ffapi.FilterResult, error)
	StopStream(ctx context.Context, nameOrID string) error
	StartStream(ctx context.Context, nameOrID string) error
	ResetStream(ctx context.Context, nameOrID string, sequenceID *string, preStartCallbacks ...func(ctx context.Context, spec CT) error) error
	DeleteStream(ctx context.Context, nameOrID string) error
	Close(ctx context.Context)
}

func NewEventStreamManager

func NewEventStreamManager[CT EventStreamSpec, DT any](ctx context.Context, config *Config[CT, DT], p Persistence[CT], wsProtocol wsserver.Protocol, source Runtime[CT, DT]) (es Manager[CT], err error)

type Persistence

type Persistence[CT EventStreamSpec] interface {
	EventStreams() dbsql.CRUD[CT]
	Checkpoints() dbsql.CRUD[*EventStreamCheckpoint]
	IDValidator() IDValidator
	Close()
}

func NewGenericEventStreamPersistence added in v1.4.4

func NewGenericEventStreamPersistence(db *dbsql.Database, idValidator IDValidator) Persistence[*GenericEventStream]

NewGenericEventStreamPersistence is a helper that builds persistence with no extra config Users of this package can use this in cases where they do not have any additional configuration that needs to be persisted, and are happy using dbsql.ResourceBase for IDs.

type Runtime

type Runtime[ConfigType EventStreamSpec, DataType any] interface {
	// Generate a new unique resource ID (such as a UUID)
	NewID() string
	// Return a COPY of the config object with runtime status and statistics (runtime enrichment)
	WithRuntimeStatus(spec ConfigType, status EventStreamStatus, stats *EventStreamStatistics) ConfigType
	// Type specific config validation goes here
	Validate(ctx context.Context, config ConfigType) error
	// The run function should execute in a loop detecting events until instructed to stop:
	// - The Run function should block when no events are available
	//   - Must detect if the context is closed (see below)
	// - The Deliver function will block if the stream is blocked:
	//   - Blocked means the previous batch is being processed, and the current batch is full
	// - If the stream stops, the Exit instruction will be returned from deliver
	// - The supplied context will be cancelled as well on exit, so should be used:
	//   1. In any blocking i/o functions
	//   2. To wake any sleeps early, such as batch polling scenarios
	// - If the function returns without an Exit instruction, it will be restarted from the last checkpoint
	Run(ctx context.Context, spec ConfigType, checkpointSequenceID string, deliver Deliver[DataType]) error
}

Runtime is the required implementation extension for the EventStream common utility Generics: - ConfigType is the Configuration Type - the custom extensions to the configuration schema - DataType is the Data Type - the payload type that will be delivered to the application

type SourceInstruction

type SourceInstruction int
const (
	Continue SourceInstruction = iota
	Exit
)

type WebSocketConfig

type WebSocketConfig struct {
	DistributionMode *DistributionMode `ffstruct:"wsconfig" json:"distributionMode,omitempty"`
}

func (*WebSocketConfig) Scan

func (wc *WebSocketConfig) Scan(src interface{}) error

Store in DB as JSON

func (*WebSocketConfig) Value

func (wc *WebSocketConfig) Value() (driver.Value, error)

Store in DB as JSON

type WebhookConfig

type WebhookConfig struct {
	URL           *string             `ffstruct:"whconfig" json:"url,omitempty"`
	Method        *string             `ffstruct:"whconfig" json:"method,omitempty"`
	TLSConfigName *string             `ffstruct:"whconfig" json:"tlsConfigName,omitempty"`
	HTTP          *ffresty.HTTPConfig `ffstruct:"whconfig" json:"http,omitempty"`
	// contains filtered or unexported fields
}

func (*WebhookConfig) Scan

func (wc *WebhookConfig) Scan(src interface{}) error

Store in DB as JSON

func (*WebhookConfig) Value

func (wc *WebhookConfig) Value() (driver.Value, error)

Store in DB as JSON

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL