eventsourcex

package
v0.0.0-...-bc6c5c1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 31, 2018 License: Apache-2.0 Imports: 16 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// DefaultCommitInterval the minimum amount of time that must pass between offset commits
	DefaultCommitInterval = time.Second * 3

	// DefaultPublishInterval the amount of time between checking the repository for updates
	DefaultPublishInterval = time.Minute
)
View Source
const (
	// ClusterID specifies the cluster-id of the nats streaming cluster that hosts events
	ClusterID = "events"

	// DefaultTimeout specifies how much time to wait for the eventual consistency
	DefaultTimeout = time.Second * 5
)

Variables

This section is empty.

Functions

func NoticesSubject

func NoticesSubject(env, boundedContext string, args ...string) string

NoticesSubject returns the subject for a specific bounded context

func PublishStreamSingleton

func PublishStreamSingleton(ctx context.Context, p Publisher, r eventsource.StreamReader, cp *checkpoint.CP, env, bc string, nc *nats.Conn) error

PublishStreamSingleton is similar to PublishStream except that there may be only one running in the environment

func StreamSubject

func StreamSubject(env, boundedContext string) string

StreamSubject returns the streaming subject for a specific bounded context

func SubscribeNotices

func SubscribeNotices(ctx context.Context, nc *nats.Conn, env, boundedContext string, fn func(id string)) error

SubscribeNotices listens for notices for a specific bounded context. Notices are published when the caller of a command would like a consistent read after writer. The notice provides eventually consistent services an opportunity to update the read model immediately.

func SubscribeStream

func SubscribeStream(ctx context.Context, nc *nats.Conn, cp Checkpointer, env, boundedContext string, h Handler) (<-chan struct{}, error)

SubscribeStream subscribes to a nats stream for the specified bounded context

Types

type Checkpointer

type Checkpointer interface {
	// Load retrieves the specified nats streaming offset
	Load(ctx context.Context, key string) (uint64, error)

	// Save persists the specified nats streaming offset
	Save(ctx context.Context, key string, offset uint64) error
}

Checkpointer persists and retrieves nats streaming offsets

type Handler

type Handler interface {
	// Handle receives the inbound message
	Receive(offset uint64, data []byte)
}

Handler provides the business end of the MessageHandler struct

type HandlerFunc

type HandlerFunc func(offset uint64, data []byte)

HandlerFunc provides a func wrapper for Handler

func WithLogging

func WithLogging(h Handler, logger interface {
	Info(string, ...log.Field)
}) HandlerFunc

func (HandlerFunc) Receive

func (fn HandlerFunc) Receive(offset uint64, data []byte)

Handle implements the Handle interface

type MemoryCP

type MemoryCP map[string]uint64

MemoryCP provides an in-memory non-thread-safe implementation of a checkpoint

func (MemoryCP) Load

func (m MemoryCP) Load(ctx context.Context, key string) (uint64, error)

Load implements Checkpointer.Save

func (MemoryCP) Save

func (m MemoryCP) Save(ctx context.Context, key string, offset uint64) error

Save implements Checkpointer.Save

type MessageHandler

type MessageHandler struct {
	// contains filtered or unexported fields
}

MessageHandler encapsulates a nats streaming processor that performs buffered processing

func NewMessageHandler

func NewMessageHandler(ctx context.Context, p Processor, u Unmarshaler, cp Checkpointer, cpKey string, opts ...MessageHandlerOption) *MessageHandler

NewMessageHandler constructs a new MessageHandler with the arguments provided

func (*MessageHandler) Close

func (m *MessageHandler) Close() error

Close releases resources associated with the MessageHandler

func (*MessageHandler) Done

func (m *MessageHandler) Done() <-chan struct{}

Done returns a chan that signals when all the resources used by MessageHandler have been released

func (*MessageHandler) Receive

func (m *MessageHandler) Receive(offset uint64, data []byte)

Handle the the specified stream record

type MessageHandlerOption

type MessageHandlerOption func(m *MessageHandler)

MessageHandlerOption allows options to be specified for NewMessageHandler

func WithBufferSize

func WithBufferSize(in int) MessageHandlerOption

WithBufferSize specifies the max number of messages to be called before callng the Processor

func WithInterval

func WithInterval(d time.Duration) MessageHandlerOption

WithInterval specifies the maximum amount of time that can elapse until we try to flush any received events

type Processor

type Processor func(ctx context.Context, events ...eventsource.Event) error

Processor performs operations on a stream of events, usually in conjunction with SubscribeStream

func WithSendNotices

func WithSendNotices(p Processor, nc *nats.Conn, env, boundedContext, source string) Processor

WithSendNotices publishes an event to the notices subject if the processor executes successfully

func (Processor) Do

func (fn Processor) Do(ctx context.Context, events ...eventsource.Event) error

Do is a helper method to invoke the Processor

type Publisher

type Publisher interface {
	Publish(record eventsource.StreamRecord) error
}

Publisher publishes the record to a event bus

type PublisherFunc

type PublisherFunc func(record eventsource.StreamRecord) error

PublisherFunc provides a func wrapper to Publisher

func PublishStan

func PublishStan(st stan.Conn, subject string) PublisherFunc

PublishStan publishes events to the nats stream identified with the env and boundedContext

func WithLogPublish

func WithLogPublish(publisher Publisher, logger interface {
	Info(string, ...log.Field)
}) PublisherFunc

WithLogPublish logs when events are published

func WithPublishEvents

func WithPublishEvents(fn Publisher, nc *nats.Conn, env, boundedContext string) PublisherFunc

WithPublishEvents publishes received events to nats

func (PublisherFunc) Publish

func (fn PublisherFunc) Publish(record eventsource.StreamRecord) error

Publish Implements the Publisher interface

type Repository

type Repository interface {
	// Apply executes the command against the current version of the aggregate and returns the updated version
	// of the aggregate (or the current version if no updates were made)
	Apply(ctx context.Context, cmd eventsource.Command) (int, error)
}

Repository provides an abstraction over *eventsource.Repository over the mutator function

func WithConsistentRead

func WithConsistentRead(repo Repository, nc *nats.Conn, env, boundedContext string) Repository

WithConsistentRead provides a faux consistent read. Should wrap WithNotifier to ensure that the NoticesSubject.{ID} is subscribed to prior to the command being executed.

func WithTrace

func WithTrace(repo Repository) Repository

WithTrace logs published events to the tracer

type RepositoryFunc

type RepositoryFunc func(ctx context.Context, cmd eventsource.Command) (int, error)

RepositoryFunc provides a func helper around Repository

func (RepositoryFunc) Apply

func (fn RepositoryFunc) Apply(ctx context.Context, cmd eventsource.Command) (int, error)

Apply implements Repository's Apply method

type Supervisor

type Supervisor interface {
	Check()
	Close() error
	Done() <-chan struct{}
}

Supervisor reads events from a StreamReader and supervisor them to a handler

func PublishStream

func PublishStream(ctx context.Context, h Publisher, r eventsource.StreamReader, cp Checkpointer, env, bc string) Supervisor

PublishStream reads from a stream and publishes

func WithReceiveNotifications

func WithReceiveNotifications(s Supervisor, nc *nats.Conn, env, boundedContext string) Supervisor

WithReceiveNotifications listens to nats for notices on the StreamSubject and prods the supervisor

func WithTraceReceiveNotices

func WithTraceReceiveNotices(s Supervisor, segment tracer.Segment) Supervisor

WithTraceReceiveNotices returns a Supervisor that ping when Check is invoked

type Unmarshaler

type Unmarshaler func([]byte) (eventsource.Event, error)

Unmarshaler accepts a []byte encoded event and returns an event

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL