projection

package
v0.5.0 Latest
Warning: This package is not in the latest version of its module.
Published: May 3, 2022 License: MIT Imports: 7 Imported by: 0

Documentation

Overview

Package projection contains tools for projecting state from messages.

Types

type GlobalStoreConsumer

type GlobalStoreConsumer struct {
	Projector    Projector
	DataStore    persistence.DataStore
	MessageStore messagestore.GloballyOrderedStore
	Offsets      OffsetStore
	Logger       twelf.Logger
	// contains filtered or unexported fields
}

GlobalStoreConsumer reads messages from all streams in a message store and forwards them to an application-defined projector to produce a projection.

func (*GlobalStoreConsumer) Consume

func (c *GlobalStoreConsumer) Consume(ctx context.Context) error

Consume reads messages from the store and forwards them to the projector until an error occurs or ctx is canceled.
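
The loop that Consume describes can be sketched roughly as follows. This is an illustration under stated assumptions, not the package's implementation: the projector and offsetStore interfaces are simplified stand-ins (no persistence.DataStore or persistence.Tx, messages are plain strings), and consume, mapOffsets, recorder and runDemo are hypothetical names. The sketch also returns at the end of its in-memory log, whereas the real Consume runs until an error occurs or ctx is canceled.

```go
package main

import (
	"context"
	"fmt"
)

// projector and offsetStore are simplified, hypothetical stand-ins for the
// package's Projector and OffsetStore interfaces. The persistence.DataStore
// and persistence.Tx parameters are omitted and messages are plain strings.
type projector interface {
	PersistenceKey() string
	ApplyMessage(ctx context.Context, msg string) error
}

type offsetStore interface {
	LoadOffset(ctx context.Context, pk string) (uint64, error)
	IncrementOffset(ctx context.Context, pk string, c uint64) error
}

// consume applies each message in log to p, starting at the stored offset,
// and advances the offset after every message that is applied successfully.
// Unlike the real Consume, it returns when it reaches the end of the log.
func consume(ctx context.Context, log []string, p projector, offsets offsetStore) error {
	pk := p.PersistenceKey()

	offset, err := offsets.LoadOffset(ctx, pk)
	if err != nil {
		return err
	}

	for offset < uint64(len(log)) {
		if err := ctx.Err(); err != nil {
			return err // ctx was canceled or timed out
		}
		if err := p.ApplyMessage(ctx, log[offset]); err != nil {
			return err
		}
		if err := offsets.IncrementOffset(ctx, pk, offset); err != nil {
			return err
		}
		offset++
	}

	return nil
}

// mapOffsets is a trivial offsetStore backed by a map.
type mapOffsets map[string]uint64

func (m mapOffsets) LoadOffset(_ context.Context, pk string) (uint64, error) {
	return m[pk], nil
}

func (m mapOffsets) IncrementOffset(_ context.Context, pk string, c uint64) error {
	if m[pk] != c {
		return fmt.Errorf("offset mismatch for %q", pk)
	}
	m[pk] = c + 1
	return nil
}

// recorder is a projector that remembers every message it is given.
type recorder struct{ seen []string }

func (r *recorder) PersistenceKey() string { return "recorder" }

func (r *recorder) ApplyMessage(_ context.Context, msg string) error {
	r.seen = append(r.seen, msg)
	return nil
}

// runDemo consumes a two-message log, appends a third message, then consumes
// again to show that the second pass resumes where the first one stopped.
func runDemo() ([]string, uint64, error) {
	ctx := context.Background()
	offsets := mapOffsets{}
	r := &recorder{}

	log := []string{"a", "b"}
	if err := consume(ctx, log, r, offsets); err != nil {
		return nil, 0, err
	}

	log = append(log, "c")
	if err := consume(ctx, log, r, offsets); err != nil {
		return nil, 0, err
	}

	return r.seen, offsets["recorder"], nil
}

func main() {
	seen, offset, err := runDemo()
	fmt.Println(seen, offset, err)
}
```

Because the offset is stored under the projector's persistence key, the second call to consume skips the messages that were already applied. In the real package, IncrementOffset takes a persistence.Tx, which suggests that the offset is meant to be advanced in the same transaction as the projection update.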

type MessageHandler

type MessageHandler struct {
	Projector Projector
}

MessageHandler is an adaptor that presents a Projector as a routing.MessageHandler.

func (*MessageHandler) HandleMessage

func (a *MessageHandler) HandleMessage(
	ctx context.Context,
	_ ax.Sender,
	mctx ax.MessageContext,
) error

HandleMessage invokes application-defined logic that handles a message.

It may panic if the message carried by mctx is not one of the types described by MessageTypes().

func (*MessageHandler) MessageTypes

func (a *MessageHandler) MessageTypes() ax.MessageTypeSet

MessageTypes returns the set of messages that the handler intends to handle.

The return value should be constant as it may be cached by various independent stages in the message pipeline.
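
The adaptor pattern described above might look something like the following sketch. It assumes, without confirmation from the source, that HandleMessage simply delegates to the projector's ApplyMessage and that MessageTypes is forwarded unchanged. The sender, messageContext, projector, handlerAdaptor, recorder and handleAll names are hypothetical stand-ins so that the example compiles without the ax module.

```go
package main

import (
	"context"
	"fmt"
)

// sender and messageContext are hypothetical stand-ins for ax.Sender and
// ax.MessageContext. Message types are represented as plain strings.
type sender interface {
	Send(msg string) error
}

type messageContext struct {
	Message string
}

// projector is a reduced stand-in for the package's Projector interface.
type projector interface {
	MessageTypes() []string
	ApplyMessage(ctx context.Context, mctx messageContext) error
}

// handlerAdaptor presents a projector as a message handler.
type handlerAdaptor struct {
	Projector projector
}

// MessageTypes forwards the projector's message types unchanged.
func (a *handlerAdaptor) MessageTypes() []string {
	return a.Projector.MessageTypes()
}

// HandleMessage ignores the sender, since a projector only reads messages,
// and passes the message context on to the projector.
func (a *handlerAdaptor) HandleMessage(ctx context.Context, _ sender, mctx messageContext) error {
	return a.Projector.ApplyMessage(ctx, mctx)
}

// recorder is a projector that remembers every message it is given.
type recorder struct{ seen []string }

func (r *recorder) MessageTypes() []string { return []string{"greeting"} }

func (r *recorder) ApplyMessage(_ context.Context, mctx messageContext) error {
	r.seen = append(r.seen, mctx.Message)
	return nil
}

// handleAll routes each message through the adaptor and returns what the
// underlying projector received.
func handleAll(msgs ...string) ([]string, error) {
	r := &recorder{}
	h := &handlerAdaptor{Projector: r}

	for _, m := range msgs {
		mctx := messageContext{Message: m}
		if err := h.HandleMessage(context.Background(), nil, mctx); err != nil {
			return nil, err
		}
	}

	return r.seen, nil
}

func main() {
	seen, err := handleAll("hello", "world")
	fmt.Println(seen, err)
}
```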

type OffsetStore

type OffsetStore interface {
	// LoadOffset returns the offset at which a consumer should resume
	// reading from the stream.
	//
	// pk is the projector's persistence key.
	LoadOffset(
		ctx context.Context,
		ds persistence.DataStore,
		pk string,
	) (uint64, error)

	// IncrementOffset increments the offset at which a consumer should resume
	// reading from the stream by one.
	//
	// pk is the projector's persistence key. c is the offset that is currently
	// stored, as returned by LoadOffset(). If c is not the offset that is
	// currently stored, the increment fails and a non-nil error is returned.
	IncrementOffset(
		ctx context.Context,
		tx persistence.Tx,
		pk string,
		c uint64,
	) error
}

OffsetStore is an interface for persisting a consumer's current position in a message stream.
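
The compare-and-increment contract of IncrementOffset can be illustrated with a minimal in-memory sketch. The memoryOffsetStore type, its constructor and runDemo are hypothetical, and the method signatures are simplified: the real interface also takes a persistence.DataStore or persistence.Tx, which are left out here so that the example stands alone.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// memoryOffsetStore is a hypothetical in-memory offset store. It mirrors the
// OffsetStore methods without the persistence.DataStore and persistence.Tx
// parameters of the real interface.
type memoryOffsetStore struct {
	mu      sync.Mutex
	offsets map[string]uint64
}

func newMemoryOffsetStore() *memoryOffsetStore {
	return &memoryOffsetStore{offsets: map[string]uint64{}}
}

// LoadOffset returns the stored offset for pk, or zero if none is stored.
func (s *memoryOffsetStore) LoadOffset(_ context.Context, pk string) (uint64, error) {
	s.mu.Lock()
	defer s.mu.Unlock()

	return s.offsets[pk], nil
}

// IncrementOffset advances the offset for pk by one, but only if c matches
// the offset that is currently stored.
func (s *memoryOffsetStore) IncrementOffset(_ context.Context, pk string, c uint64) error {
	s.mu.Lock()
	defer s.mu.Unlock()

	if cur := s.offsets[pk]; cur != c {
		return fmt.Errorf("offset conflict for %q: stored %d, caller expected %d", pk, cur, c)
	}

	s.offsets[pk] = c + 1
	return nil
}

// runDemo performs one valid increment, then repeats it with the same (now
// stale) offset. It returns the final offset and the error from each attempt.
func runDemo() (offset uint64, firstErr, staleErr error) {
	ctx := context.Background()
	s := newMemoryOffsetStore()
	pk := "account-balances" // hypothetical persistence key

	c, _ := s.LoadOffset(ctx, pk)
	firstErr = s.IncrementOffset(ctx, pk, c)
	staleErr = s.IncrementOffset(ctx, pk, c) // c is no longer current

	offset, _ = s.LoadOffset(ctx, pk)
	return offset, firstErr, staleErr
}

func main() {
	offset, firstErr, staleErr := runDemo()
	fmt.Println("offset:", offset)
	fmt.Println("first increment error:", firstErr)
	fmt.Println("stale increment error:", staleErr)
}
```

Rejecting a stale value of c means that two consumers sharing a persistence key cannot both advance the offset for the same message: the second attempt fails instead of silently skipping a message.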

type Projector

type Projector interface {
	// PersistenceKey returns a unique name for the projector.
	//
	// The persistence key is used to relate persisted data with the projector
	// implementation that owns it. Persistence keys should not be changed once
	// a projection has been started.
	PersistenceKey() string

	// MessageTypes returns the set of messages that the projector intends
	// to handle.
	//
	// The return value should be constant as it may be cached.
	MessageTypes() ax.MessageTypeSet

	// ApplyMessage invokes application-defined logic that updates the
	// application state to reflect the occurrence of a message.
	//
	// It may panic if the message carried by mctx is not one of the types
	// described by MessageTypes().
	ApplyMessage(ctx context.Context, mctx ax.MessageContext) error
}

Projector is an interface for a specialized form of application-defined message handler which produces a "projection" of state from the messages it receives.
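
As an illustration, a projector that maintains account balances might look something like the sketch below. It is not taken from the package: the message types, balanceProjector and project are hypothetical, ApplyMessage receives the message directly rather than through an ax.MessageContext, and MessageTypes is omitted because building an ax.MessageTypeSet requires the ax module.

```go
package main

import (
	"context"
	"fmt"
)

// accountOpened and fundsDeposited are hypothetical application messages.
type accountOpened struct {
	AccountID string
}

type fundsDeposited struct {
	AccountID string
	Amount    int
}

// balanceProjector builds a projection of the current balance of each
// account from the messages it is given.
type balanceProjector struct {
	balances map[string]int
}

// PersistenceKey returns a stable, unique name for this projector. It should
// not change once the projection has been started.
func (p *balanceProjector) PersistenceKey() string {
	return "account-balances"
}

// ApplyMessage updates the projection to reflect msg. In this sketch msg is
// passed directly instead of being wrapped in an ax.MessageContext.
func (p *balanceProjector) ApplyMessage(_ context.Context, msg interface{}) error {
	switch m := msg.(type) {
	case accountOpened:
		p.balances[m.AccountID] = 0
	case fundsDeposited:
		p.balances[m.AccountID] += m.Amount
	default:
		// Mirrors the documented behavior: an unexpected type may panic.
		panic(fmt.Sprintf("unexpected message type: %T", msg))
	}

	return nil
}

// project applies msgs in order to a fresh projector and returns the
// resulting balances.
func project(msgs ...interface{}) (map[string]int, error) {
	p := &balanceProjector{balances: map[string]int{}}

	for _, m := range msgs {
		if err := p.ApplyMessage(context.Background(), m); err != nil {
			return nil, err
		}
	}

	return p.balances, nil
}

func main() {
	balances, err := project(
		accountOpened{AccountID: "acct-1"},
		fundsDeposited{AccountID: "acct-1", Amount: 5},
		fundsDeposited{AccountID: "acct-1", Amount: 7},
	)
	fmt.Println(balances, err)
}
```

The projection here lives in memory for brevity. A real projector would more likely write to the application's data store so that the projection survives restarts.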
