ordered

package
v0.6.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 7, 2023 License: MIT Imports: 14 Imported by: 0

Documentation

Overview

Package ordered provides a projector that consumes messages from a an ordered stream of events and applies them to a projection.

Index

Constants

View Source
const (
	// DefaultTimeout is the default timeout to use when applying an event.
	DefaultTimeout = 3 * time.Second

	// DefaultCompactionInterval is the default interval at which a projector
	// will compact its projection.
	DefaultCompactionInterval = 24 * time.Hour

	// DefaultCompactionTimeout is the default timeout to use when compacting a
	// projection.
	DefaultCompactionTimeout = 5 * time.Minute
)

Variables

View Source
var ErrStreamSealed = errors.New("stream sealed")

ErrStreamSealed is returned by Stream.Open() and Cursor.Next() to indicate that a stream will never produce any more events.

Functions

This section is empty.

Types

type Cursor

type Cursor interface {
	// Next returns the next relevant event in the stream.
	//
	// If the end of the stream is reached it blocks until a relevant event is
	// appended to the stream, ctx is canceled or the stream is sealed. If the
	// stream is sealed, ErrStreamSealed is returned.
	Next(ctx context.Context) (Envelope, error)

	// Close stops the cursor.
	//
	// Any current or future calls to Next() return a non-nil error.
	Close() error
}

A Cursor reads events from a stream.

Cursors are not intended to be used by multiple goroutines concurrently.

type Envelope

type Envelope struct {
	// Offset is the zero-based offset of the message on the stream.
	Offset uint64

	// RecordedAt is the time at which the event occurred.
	RecordedAt time.Time

	// Message is the application-defined message.
	Message dogma.Message
}

Envelope is a container for an event on a stream.

type MemoryStream

type MemoryStream struct {
	// StreamID is a unique identifier for the stream, it must not be empty.
	// The tuple of stream ID and event offset must uniquely identify a message.
	StreamID string
	// contains filtered or unexported fields
}

MemoryStream is an implementation of Stream that stores messages in-memory.

It is intended primarily for testing.

func (*MemoryStream) Append

func (s *MemoryStream) Append(t time.Time, messages ...dogma.Message)

Append appends messages to the end of the stream.

It panics if the stream is sealed.

func (*MemoryStream) ID

func (s *MemoryStream) ID() string

ID returns a unique identifier for the stream.

The tuple of stream ID and event offset must uniquely identify a message.

func (*MemoryStream) Open

func (s *MemoryStream) Open(
	ctx context.Context,
	offset uint64,
	filter []dogma.Message,
) (Cursor, error)

Open returns a cursor used to read events from this stream.

offset is the position of the first event to read. The first event on a stream is always at offset 0. If the given offset is beyond the end of a sealed stream, ErrStreamSealed is returned.

filter is a set of zero-value event messages, the types of which indicate which event types are returned by Cursor.Next(). If filter is empty, all events types are returned.

func (*MemoryStream) Seal added in v0.2.1

func (s *MemoryStream) Seal()

Seal marks the stream as sealed, preventing new events from being appended.

func (*MemoryStream) Truncate added in v0.2.0

func (s *MemoryStream) Truncate(offset uint64) uint64

Truncate discards any events before the given offset.

It returns the number of truncated events.

It panics if the offset is greater than the total number of events appended to the stream.

type Projector

type Projector struct {
	// Stream is the stream used to obtain event messages.
	Stream Stream

	// Handler is the Dogma projection handler that the messages are applied to.
	Handler dogma.ProjectionMessageHandler

	// Logger is the target for log messages from the projector and the handler.
	// If it is nil, logging.DefaultLogger is used.
	Logger logging.Logger

	// DefaultTimeout is the timeout duration to use when hanlding an event if
	// the handler does not provide a timeout hint. If it is zero the global
	// DefaultTimeout constant is used.
	DefaultTimeout time.Duration

	// CompactionInterval is the interval at which the projector compacts the
	// projection. If it is zero the global DefaultCompactionInterval constant
	// is used.
	CompactionInterval time.Duration

	// CompactionTimeout is the default timeout to use when compacting the
	// projection. If it is zero the global DefaultCompactionTimeout is used.
	CompactionTimeout time.Duration
	// contains filtered or unexported fields
}

Projector reads events from a stream and applies them to a projection.

func (*Projector) Run

func (p *Projector) Run(ctx context.Context) (err error)

Run runs the projection until ctx is canceled or an error occurs.

Event messages are obtained from the stream and passed to the handler for handling as they become available. Projection compaction is performed at a fixed interval.

If message handling fails due to an optimistic concurrency conflict within the projection the consumer restarts automatically.

Run() returns if any other error occurs during handling or compaction, in which case it is the caller's responsibility to implement any retry logic.

Run() can safely be called again after exiting with an error.

type Stream

type Stream interface {
	// ID returns a unique identifier for the stream.
	//
	// The tuple of stream ID and event offset must uniquely identify a message.
	ID() string

	// Open returns a cursor used to read events from this stream.
	//
	// offset is the position of the first event to read. The first event on a
	// stream is always at offset 0. If the given offset is beyond the end of a
	// sealed stream, ErrStreamSealed is returned.
	//
	// filter is a set of zero-value event messages, the types of which indicate
	// which event types are returned by Cursor.Next(). If filter is empty, all
	// events types are returned.
	Open(ctx context.Context, offset uint64, filter []dogma.Message) (Cursor, error)
}

A Stream is an ordered sequence of event messages.

Stream implementations may optionally allow for streams to be marked as "sealed", indicating that no new messages will appear on the stream.

Directories

Path Synopsis
Package resource contains utilities for performing low-level manipulations of projection resource versions.
Package resource contains utilities for performing low-level manipulations of projection resource versions.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL