stream

package
v0.0.0-...-5fa8032 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 27, 2021 | License: MIT | Imports: 16 | Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrInvalidInputData = errors.New("stream: unmarshal error: invalid data input")

Functions

func RestoreFromEvent

func RestoreFromEvent(s *Stream, event *event.Event)

Types

type CommandController

type CommandController interface {
	CommandSink(context.Context, *Stream, *command.Command) (*command.Reply, error)
}

type CommandControllerOption

type CommandControllerOption func(*commandController)

func WithCommandControllerCreateIfNotExists

func WithCommandControllerCreateIfNotExists() CommandControllerOption

func WithCommandControllerDropStream

func WithCommandControllerDropStream() CommandControllerOption

type CommandSinker

type CommandSinker interface {
	CommandSink(ctx context.Context, cmd *command.Command) (*command.Reply, error)
}

func WithCommandSinkerInterceptor

func WithCommandSinkerInterceptor(s CommandSinker, other ...CommandSinkerInterceptor) CommandSinker

type CommandSinkerInterceptor

type CommandSinkerInterceptor func(CommandSinker) CommandSinker

type ConnectorPublisher

type ConnectorPublisher struct {
}

func (ConnectorPublisher) Publish

func (ConnectorPublisher) Publish(_ []*event.Event) error

type ControllerFunc

type ControllerFunc func(context.Context, *Stream, *command.Command) (*command.Reply, error)

func (ControllerFunc) CommandSink

func (fn ControllerFunc) CommandSink(ctx context.Context, s *Stream, c *command.Command) (*command.Reply, error)

type EventController

type EventController interface {
	PickStream(*event.Event) Picker
	EventSink(context.Context, *Stream, *event.Event) error
}

func EventControllerFunc

func EventControllerFunc(
	pickStream func(*event.Event) Picker,
	sink func(context.Context, *Stream, *event.Event) error,
) EventController

type EventControllerOption

type EventControllerOption func(*eventController)

func WithEventControllerCreateIfNotExists

func WithEventControllerCreateIfNotExists() EventControllerOption

func WithEventControllerDropStream

func WithEventControllerDropStream() EventControllerOption

type EventErrorHandler

type EventErrorHandler interface {
	HandleError(ctx context.Context, e *event.Event, err error)
}

type EventHandler

type EventHandler interface {
	Match(eventName string) bool
	Handle(context.Context, *event.Event) error
	Rollback(context.Context, *event.Event) error
}

func WithEventHandlerInterceptor

func WithEventHandlerInterceptor(eh EventHandler, other ...EventHandlerInterceptor) EventHandler

type EventHandlerFunc

type EventHandlerFunc func(ctx context.Context, e *event.Event) error

type EventHandlerInterceptor

type EventHandlerInterceptor func(handler EventHandler) EventHandler

type EventSinker

type EventSinker interface {
	EventSink(ctx context.Context, e *event.Event) error
}

func WithEventSinkerInterceptor

func WithEventSinkerInterceptor(s EventSinker, other ...EventSinkerInterceptor) EventSinker

type EventSinkerInterceptor

type EventSinkerInterceptor func(EventSinker) EventSinker

type Mutator

type Mutator struct {
	// contains filtered or unexported fields
}

func NewMutator

func NewMutator(
	storage Storage,
	publisher Publisher,
	opts ...MutatorOption,
) *Mutator

func (*Mutator) AddCommandController

func (m *Mutator) AddCommandController(
	commandName string,
	ctrl CommandController,
	opts ...CommandControllerOption,
)

func (*Mutator) AddEventController

func (m *Mutator) AddEventController(
	eventName string,
	ctrl EventController,
	opts ...EventControllerOption,
)

func (*Mutator) CommandSink

func (m *Mutator) CommandSink(ctx context.Context, cmd *command.Command) (*command.Reply, error)

func (*Mutator) EventSink

func (m *Mutator) EventSink(ctx context.Context, e *event.Event) (err error)

func (*Mutator) SetBlacklistOfEvents

func (m *Mutator) SetBlacklistOfEvents(eventNames ...string)

type MutatorOption

type MutatorOption func(*Mutator)

func WithMutatorStrictMode

func WithMutatorStrictMode() MutatorOption

type Picker

type Picker struct {
	StreamID  uuid.UUID
	StreamIDs []uuid.UUID
}

type Projection

type Projection struct {
	// contains filtered or unexported fields
}

func NewProjection

func NewProjection() *Projection

func (*Projection) AddEventController

func (p *Projection) AddEventController(eventName string, fn EventHandlerFunc)

func (*Projection) AddEventControllerWithRollback

func (p *Projection) AddEventControllerWithRollback(eventName string, fn, rollback EventHandlerFunc)

func (*Projection) Handle

func (p *Projection) Handle(ctx context.Context, e *event.Event) error

func (*Projection) Match

func (p *Projection) Match(eventName string) bool

func (*Projection) Rollback

func (p *Projection) Rollback(ctx context.Context, e *event.Event) error

func (*Projection) SkipUnhandledEvent

func (p *Projection) SkipUnhandledEvent()

type Publisher

type Publisher interface {
	Publish(event []*event.Event) error
}

func NewConnectorPublisher

func NewConnectorPublisher() Publisher

type State

type State interface {
	Mutate(*event.Event)
	encoding.BinaryUnmarshaler
	encoding.BinaryMarshaler
}

type Storage

type Storage interface {
	StreamName() string
	NewStream() *Stream
	Persist(ctx context.Context, s *Stream) error
	Load(ctx context.Context, streamID uuid.UUID) (*Stream, error)
	Drop(ctx context.Context, streamID uuid.UUID) error
}

func NewStorage

func NewStorage(streamName string, newStream func() *Stream) Storage

type Stream

type Stream struct {
	// contains filtered or unexported fields
}

func Blank

func Blank(name string, initState State) *Stream

func New

func New(name string, id uuid.UUID, initState State) *Stream

func (*Stream) Changes

func (s *Stream) Changes() []*event.Event

func (*Stream) ClearChanges

func (s *Stream) ClearChanges()

func (*Stream) ID

func (s *Stream) ID() uuid.UUID

func (*Stream) MarshalBinary

func (s *Stream) MarshalBinary() (data []byte, err error)

func (*Stream) Mutate

func (s *Stream) Mutate(eventName string, payload codec.Codec)

func (*Stream) Name

func (s *Stream) Name() string

func (*Stream) PreviousVersion

func (s *Stream) PreviousVersion() int

func (*Stream) State

func (s *Stream) State() State

func (*Stream) String

func (s *Stream) String() string

func (*Stream) Unix

func (s *Stream) Unix() int64

func (*Stream) UnmarshalBinary

func (s *Stream) UnmarshalBinary(data []byte) error

func (*Stream) Version

func (s *Stream) Version() int

type Subscriber

type Subscriber interface {
	Subscribe(streamName string, h ...EventHandler)
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL