es

package module
v0.0.0-...-3b037d6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 2, 2020 License: MIT Imports: 8 Imported by: 0

README

Event Stream / CQRS toolkit for Go

NOT ready for production. Work in progress...

Documentation

Overview

Package es is an event stream / CQRS toolkit for Go. It also contains generated GoMock mocks for the package's interfaces.

Index

Constants

View Source
const (
	ErrorReplyType   = "Error"
	SuccessReplyType = "Success"
	UnknownReplyType = "Unknown"
)
View Source
const (
	CorrelationIDKey = "_correlationID"
	ReplyToKey       = "_replyTo"
)
View Source
const (
	SourceKey = "sourceID"
	ActorKey  = "actor"
)
View Source
const (
	TxnKey key = 1
)

Variables

View Source
var (
	ErrTakeSnapshot         = errors.New("es: take snapshot")
	ErrNoEvents             = errors.New("es: events not found")
	ErrNoSnapshot           = errors.New("es: snapshot not found")
	ErrVersionInconsistency = errors.New("es: version inconsistency")
	ErrVersionIdentical     = errors.New("es: expected version and new version are identical")
)

Functions

func Raise

func Raise(es Stream, payload interface{}, eventContext EventContext)

func RaiseEvent

func RaiseEvent(
	es Stream,
	payload interface{},
	eventType string,
	eventContext EventContext,
)

func TypeName

func TypeName(source interface{}) string

Types

type Command

type Command struct {
	Header Header
	// contains filtered or unexported fields
}

func NewCommand

func NewCommand(
	streamType string,
	streamID uuid.UUID,
	commandType string,
	payload interface{},
) *Command

func NewCommandWithReply

func NewCommandWithReply(
	replyTo string,
	streamType string,
	streamID uuid.UUID,
	commandType string,
	payload interface{},
	cid CorrelationID,
) *Command

func (*Command) Payload

func (c *Command) Payload() interface{}

func (*Command) Reply

func (c *Command) Reply(reply interface{}, version int) *Reply

func (*Command) ReplyOk

func (c *Command) ReplyOk() *Reply

func (*Command) ReplyWith

func (c *Command) ReplyWith(replyType string, reply interface{}, version int) *Reply

func (*Command) ReplyWithError

func (c *Command) ReplyWithError(err error) *Reply

func (*Command) ReplyWithSuccess

func (c *Command) ReplyWithSuccess(version int) *Reply

func (*Command) StreamID

func (c *Command) StreamID() uuid.UUID

func (*Command) StreamType

func (c *Command) StreamType() string

func (*Command) Type

func (c *Command) Type() string

type CommandCoder

type CommandCoder interface {
	EncodeCommand(ctx context.Context, streamType string, commandType string, command interface{}) (bytes []byte, err error)
	DecodeReply(ctx context.Context, streamType string, replyType string, bytes []byte) (reply interface{}, err error)
	DecodeCommand(ctx context.Context, streamType string, commandType string, bytes []byte) (command interface{}, err error)
	EncodeReply(ctx context.Context, streamType string, replyType string, reply interface{}) (bytes []byte, err error)
}

type CommandReceiverFunc

type CommandReceiverFunc func(ctx context.Context, command *Command) (*Reply, error)

type CommandSendReplier

type CommandSendReplier interface {
	SendCommand(ctx context.Context, c *Command) error
	ReceiveReply(ctx context.Context, fn ReplyReceiverFunc)
	SendReply(ctx context.Context, r *Reply) error
	SendWithReply(ctx context.Context, c *Command, fn ReplyReceiverFunc) error
	SendAndWaitReply(ctx context.Context, c *Command) (*Reply, error)
	RemoveReplier(ctx context.Context, cid CorrelationID)
	HasReplier(ctx context.Context, cid CorrelationID) bool
	Close() error
}

type Consumer

type Consumer interface {
	Consume(ctx context.Context) error
	Close() error
}

type ConsumerHandler

type ConsumerHandler func(ctx context.Context, channel string, message interface{}) error

type CorrelationID

type CorrelationID string

func NewCorrelationID

func NewCorrelationID() CorrelationID

func (CorrelationID) Equal

func (cid CorrelationID) Equal(correlationID string) bool

func (CorrelationID) String

func (cid CorrelationID) String() string

type DecodeFunc

type DecodeFunc func(ctx context.Context, payloadType string, bytes []byte) (payload interface{}, err error)

type DefaultEvent

type DefaultEvent struct {
	// contains filtered or unexported fields
}

func (*DefaultEvent) Context

func (e *DefaultEvent) Context() EventContext

func (*DefaultEvent) CreatedAt

func (e *DefaultEvent) CreatedAt() time.Time

func (*DefaultEvent) EventType

func (e *DefaultEvent) EventType() string

func (*DefaultEvent) Payload

func (e *DefaultEvent) Payload() interface{}

func (*DefaultEvent) SetStreamID

func (e *DefaultEvent) SetStreamID(id uuid.UUID)

func (*DefaultEvent) Snapshot

func (e *DefaultEvent) Snapshot() EventSnapshot

func (*DefaultEvent) StreamID

func (e *DefaultEvent) StreamID() uuid.UUID

func (*DefaultEvent) StreamType

func (e *DefaultEvent) StreamType() string

func (*DefaultEvent) Version

func (e *DefaultEvent) Version() int

type DefaultStream

type DefaultStream struct {
	// contains filtered or unexported fields
}

func (*DefaultStream) AddChange

func (a *DefaultStream) AddChange(e Event)

func (*DefaultStream) Apply

func (a *DefaultStream) Apply(e Event, isNew bool)

func (*DefaultStream) Changes

func (a *DefaultStream) Changes() []Event

func (*DefaultStream) ClearChanges

func (a *DefaultStream) ClearChanges()

func (*DefaultStream) IncrementVersion

func (a *DefaultStream) IncrementVersion()

func (*DefaultStream) PreviousVersion

func (a *DefaultStream) PreviousVersion() int

func (*DefaultStream) SetStreamID

func (a *DefaultStream) SetStreamID(id uuid.UUID)

func (*DefaultStream) StreamID

func (a *DefaultStream) StreamID() uuid.UUID

func (*DefaultStream) StreamType

func (a *DefaultStream) StreamType() string

func (*DefaultStream) Version

func (a *DefaultStream) Version() int

type EncodeFunc

type EncodeFunc func(ctx context.Context, payload interface{}) (bytes []byte, err error)

type Endpoint

type Endpoint = func(ctx context.Context, req interface{}) (resp interface{}, err error)

Endpoint is compatible with go-kit's endpoint.Endpoint type.

type Event

type Event interface {
	StreamID() uuid.UUID
	SetStreamID(id uuid.UUID)
	StreamType() string
	Version() int
	Context() EventContext
	Payload() interface{}
	EventType() string
	CreatedAt() time.Time
	Snapshot() EventSnapshot
}

func NewEvent

func NewEvent(
	streamID uuid.UUID,
	streamType string,
	version int,
	payload interface{},
	eventType string,
) Event

func NewEventWithContext

func NewEventWithContext(
	streamID uuid.UUID,
	streamType string,
	version int,
	payload interface{},
	eventType string,
	eventContext EventContext,
	createdAt time.Time,
) Event

type EventBus

type EventBus interface {
	Publisher
	Subscriber
}

type EventCoder

type EventCoder interface {
	Encode(ctx context.Context, streamType string, eventType string, event interface{}) (bytes []byte, err error)
	Decode(ctx context.Context, streamType string, eventType string, bytes []byte) (event interface{}, err error)
}

type EventContext

type EventContext map[string]string

func NewEventContext

func NewEventContext() EventContext

func (EventContext) Actor

func (c EventContext) Actor() string

func (EventContext) Add

func (c EventContext) Add(key, value string) EventContext

func (EventContext) SetActor

func (c EventContext) SetActor(value string) EventContext

func (EventContext) SetSource

func (c EventContext) SetSource(value string) EventContext

func (EventContext) Source

func (c EventContext) Source() string

type EventHandler

type EventHandler func(ctx context.Context, event Event) error

type EventSnapshot

type EventSnapshot struct {
	StreamID   string
	StreamType string
	Version    int
	Context    map[string]string
	EventType  string
	CreatedAt  time.Time
}

type Filter

type Filter func(event Event) bool

func FilterByEventType

func FilterByEventType(eventType ...string) Filter

func FilterByStreamType

func FilterByStreamType(streamType ...string) Filter

type Header

type Header map[string]string

func (Header) Add

func (h Header) Add(k, v string)

func (Header) CorrelationID

func (h Header) CorrelationID() CorrelationID

func (Header) Get

func (h Header) Get(k string) string

func (Header) ReplyTo

func (h Header) ReplyTo() string

func (Header) SetCorrelationID

func (h Header) SetCorrelationID(cid CorrelationID)

func (Header) SetReplyTo

func (h Header) SetReplyTo(reply string)

func (Header) ShouldReply

func (h Header) ShouldReply() bool

type Identifier

type Identifier interface {
	ID() uuid.UUID
	SetID(id uuid.UUID)
}

type MockCommandCoder

type MockCommandCoder struct {
	// contains filtered or unexported fields
}

MockCommandCoder is a mock of CommandCoder interface

func NewMockCommandCoder

func NewMockCommandCoder(ctrl *gomock.Controller) *MockCommandCoder

NewMockCommandCoder creates a new mock instance

func (*MockCommandCoder) DecodeCommand

func (m *MockCommandCoder) DecodeCommand(ctx context.Context, streamType, commandType string, bytes []byte) (interface{}, error)

DecodeCommand mocks base method

func (*MockCommandCoder) DecodeReply

func (m *MockCommandCoder) DecodeReply(ctx context.Context, streamType, replyType string, bytes []byte) (interface{}, error)

DecodeReply mocks base method

func (*MockCommandCoder) EXPECT

func (m *MockCommandCoder) EXPECT() *MockCommandCoderMockRecorder

EXPECT returns an object that allows the caller to indicate expected use

func (*MockCommandCoder) EncodeCommand

func (m *MockCommandCoder) EncodeCommand(ctx context.Context, streamType, commandType string, command interface{}) ([]byte, error)

EncodeCommand mocks base method

func (*MockCommandCoder) EncodeReply

func (m *MockCommandCoder) EncodeReply(ctx context.Context, streamType, replyType string, reply interface{}) ([]byte, error)

EncodeReply mocks base method

type MockCommandCoderMockRecorder

type MockCommandCoderMockRecorder struct {
	// contains filtered or unexported fields
}

MockCommandCoderMockRecorder is the mock recorder for MockCommandCoder

func (*MockCommandCoderMockRecorder) DecodeCommand

func (mr *MockCommandCoderMockRecorder) DecodeCommand(ctx, streamType, commandType, bytes interface{}) *gomock.Call

DecodeCommand indicates an expected call of DecodeCommand

func (*MockCommandCoderMockRecorder) DecodeReply

func (mr *MockCommandCoderMockRecorder) DecodeReply(ctx, streamType, replyType, bytes interface{}) *gomock.Call

DecodeReply indicates an expected call of DecodeReply

func (*MockCommandCoderMockRecorder) EncodeCommand

func (mr *MockCommandCoderMockRecorder) EncodeCommand(ctx, streamType, commandType, command interface{}) *gomock.Call

EncodeCommand indicates an expected call of EncodeCommand

func (*MockCommandCoderMockRecorder) EncodeReply

func (mr *MockCommandCoderMockRecorder) EncodeReply(ctx, streamType, replyType, reply interface{}) *gomock.Call

EncodeReply indicates an expected call of EncodeReply

type MockCommandSendReplier

type MockCommandSendReplier struct {
	// contains filtered or unexported fields
}

MockCommandSendReplier is a mock of CommandSendReplier interface

func NewMockCommandSendReplier

func NewMockCommandSendReplier(ctrl *gomock.Controller) *MockCommandSendReplier

NewMockCommandSendReplier creates a new mock instance

func (*MockCommandSendReplier) Close

func (m *MockCommandSendReplier) Close() error

Close mocks base method

func (*MockCommandSendReplier) EXPECT

func (m *MockCommandSendReplier) EXPECT() *MockCommandSendReplierMockRecorder

EXPECT returns an object that allows the caller to indicate expected use

func (*MockCommandSendReplier) HasReplier

func (m *MockCommandSendReplier) HasReplier(ctx context.Context, cid CorrelationID) bool

HasReplier mocks base method

func (*MockCommandSendReplier) ReceiveReply

func (m *MockCommandSendReplier) ReceiveReply(ctx context.Context, fn ReplyReceiverFunc)

ReceiveReply mocks base method

func (*MockCommandSendReplier) RemoveReplier

func (m *MockCommandSendReplier) RemoveReplier(ctx context.Context, cid CorrelationID)

RemoveReplier mocks base method

func (*MockCommandSendReplier) SendAndWaitReply

func (m *MockCommandSendReplier) SendAndWaitReply(ctx context.Context, c *Command) (*Reply, error)

SendAndWaitReply mocks base method

func (*MockCommandSendReplier) SendCommand

func (m *MockCommandSendReplier) SendCommand(ctx context.Context, c *Command) error

SendCommand mocks base method

func (*MockCommandSendReplier) SendReply

func (m *MockCommandSendReplier) SendReply(ctx context.Context, r *Reply) error

SendReply mocks base method

func (*MockCommandSendReplier) SendWithReply

func (m *MockCommandSendReplier) SendWithReply(ctx context.Context, c *Command, fn ReplyReceiverFunc) error

SendWithReply mocks base method

type MockCommandSendReplierMockRecorder

type MockCommandSendReplierMockRecorder struct {
	// contains filtered or unexported fields
}

MockCommandSendReplierMockRecorder is the mock recorder for MockCommandSendReplier

func (*MockCommandSendReplierMockRecorder) Close

func (mr *MockCommandSendReplierMockRecorder) Close() *gomock.Call

Close indicates an expected call of Close

func (*MockCommandSendReplierMockRecorder) HasReplier

func (mr *MockCommandSendReplierMockRecorder) HasReplier(ctx, cid interface{}) *gomock.Call

HasReplier indicates an expected call of HasReplier

func (*MockCommandSendReplierMockRecorder) ReceiveReply

func (mr *MockCommandSendReplierMockRecorder) ReceiveReply(ctx, fn interface{}) *gomock.Call

ReceiveReply indicates an expected call of ReceiveReply

func (*MockCommandSendReplierMockRecorder) RemoveReplier

func (mr *MockCommandSendReplierMockRecorder) RemoveReplier(ctx, cid interface{}) *gomock.Call

RemoveReplier indicates an expected call of RemoveReplier

func (*MockCommandSendReplierMockRecorder) SendAndWaitReply

func (mr *MockCommandSendReplierMockRecorder) SendAndWaitReply(ctx, c interface{}) *gomock.Call

SendAndWaitReply indicates an expected call of SendAndWaitReply

func (*MockCommandSendReplierMockRecorder) SendCommand

func (mr *MockCommandSendReplierMockRecorder) SendCommand(ctx, c interface{}) *gomock.Call

SendCommand indicates an expected call of SendCommand

func (*MockCommandSendReplierMockRecorder) SendReply

func (mr *MockCommandSendReplierMockRecorder) SendReply(ctx, r interface{}) *gomock.Call

SendReply indicates an expected call of SendReply

func (*MockCommandSendReplierMockRecorder) SendWithReply

func (mr *MockCommandSendReplierMockRecorder) SendWithReply(ctx, c, fn interface{}) *gomock.Call

SendWithReply indicates an expected call of SendWithReply

type MockConsumer

type MockConsumer struct {
	// contains filtered or unexported fields
}

MockConsumer is a mock of Consumer interface

func NewMockConsumer

func NewMockConsumer(ctrl *gomock.Controller) *MockConsumer

NewMockConsumer creates a new mock instance

func (*MockConsumer) Close

func (m *MockConsumer) Close() error

Close mocks base method

func (*MockConsumer) Consume

func (m *MockConsumer) Consume(ctx context.Context) error

Consume mocks base method

func (*MockConsumer) EXPECT

func (m *MockConsumer) EXPECT() *MockConsumerMockRecorder

EXPECT returns an object that allows the caller to indicate expected use

type MockConsumerMockRecorder

type MockConsumerMockRecorder struct {
	// contains filtered or unexported fields
}

MockConsumerMockRecorder is the mock recorder for MockConsumer

func (*MockConsumerMockRecorder) Close

func (mr *MockConsumerMockRecorder) Close() *gomock.Call

Close indicates an expected call of Close

func (*MockConsumerMockRecorder) Consume

func (mr *MockConsumerMockRecorder) Consume(ctx interface{}) *gomock.Call

Consume indicates an expected call of Consume

type MockEventBus

type MockEventBus struct {
	// contains filtered or unexported fields
}

MockEventBus is a mock of EventBus interface

func NewMockEventBus

func NewMockEventBus(ctrl *gomock.Controller) *MockEventBus

NewMockEventBus creates a new mock instance

func (*MockEventBus) EXPECT

func (m *MockEventBus) EXPECT() *MockEventBusMockRecorder

EXPECT returns an object that allows the caller to indicate expected use

func (*MockEventBus) Filter

func (m *MockEventBus) Filter(f ...Filter)

Filter mocks base method

func (*MockEventBus) Publish

func (m *MockEventBus) Publish(arg0 context.Context, arg1 Event) error

Publish mocks base method

func (*MockEventBus) Subscribe

func (m *MockEventBus) Subscribe(ctx context.Context, streamType string, handler EventHandler) error

Subscribe mocks base method

func (*MockEventBus) SubscribeToEvent

func (m *MockEventBus) SubscribeToEvent(ctx context.Context, streamType, eventType string, handler EventHandler) error

SubscribeToEvent mocks base method

type MockEventBusMockRecorder

type MockEventBusMockRecorder struct {
	// contains filtered or unexported fields
}

MockEventBusMockRecorder is the mock recorder for MockEventBus

func (*MockEventBusMockRecorder) Filter

func (mr *MockEventBusMockRecorder) Filter(f ...interface{}) *gomock.Call

Filter indicates an expected call of Filter

func (*MockEventBusMockRecorder) Publish

func (mr *MockEventBusMockRecorder) Publish(arg0, arg1 interface{}) *gomock.Call

Publish indicates an expected call of Publish

func (*MockEventBusMockRecorder) Subscribe

func (mr *MockEventBusMockRecorder) Subscribe(ctx, streamType, handler interface{}) *gomock.Call

Subscribe indicates an expected call of Subscribe

func (*MockEventBusMockRecorder) SubscribeToEvent

func (mr *MockEventBusMockRecorder) SubscribeToEvent(ctx, streamType, eventType, handler interface{}) *gomock.Call

SubscribeToEvent indicates an expected call of SubscribeToEvent

type MockEventCoder

type MockEventCoder struct {
	// contains filtered or unexported fields
}

MockEventCoder is a mock of EventCoder interface

func NewMockEventCoder

func NewMockEventCoder(ctrl *gomock.Controller) *MockEventCoder

NewMockEventCoder creates a new mock instance

func (*MockEventCoder) Decode

func (m *MockEventCoder) Decode(ctx context.Context, streamType, eventType string, bytes []byte) (interface{}, error)

Decode mocks base method

func (*MockEventCoder) EXPECT

func (m *MockEventCoder) EXPECT() *MockEventCoderMockRecorder

EXPECT returns an object that allows the caller to indicate expected use

func (*MockEventCoder) Encode

func (m *MockEventCoder) Encode(ctx context.Context, streamType, eventType string, event interface{}) ([]byte, error)

Encode mocks base method

type MockEventCoderMockRecorder

type MockEventCoderMockRecorder struct {
	// contains filtered or unexported fields
}

MockEventCoderMockRecorder is the mock recorder for MockEventCoder

func (*MockEventCoderMockRecorder) Decode

func (mr *MockEventCoderMockRecorder) Decode(ctx, streamType, eventType, bytes interface{}) *gomock.Call

Decode indicates an expected call of Decode

func (*MockEventCoderMockRecorder) Encode

func (mr *MockEventCoderMockRecorder) Encode(ctx, streamType, eventType, event interface{}) *gomock.Call

Encode indicates an expected call of Encode

type MockProducer

type MockProducer struct {
	// contains filtered or unexported fields
}

MockProducer is a mock of Producer interface

func NewMockProducer

func NewMockProducer(ctrl *gomock.Controller) *MockProducer

NewMockProducer creates a new mock instance

func (*MockProducer) Close

func (m *MockProducer) Close() error

Close mocks base method

func (*MockProducer) EXPECT

func (m *MockProducer) EXPECT() *MockProducerMockRecorder

EXPECT returns an object that allows the caller to indicate expected use

func (*MockProducer) Send

func (m *MockProducer) Send(ctx context.Context, message interface{}) error

Send mocks base method

func (*MockProducer) SendWithRouteKey

func (m *MockProducer) SendWithRouteKey(ctx context.Context, routeKey string, message interface{}) error

SendWithRouteKey mocks base method

type MockProducerMockRecorder

type MockProducerMockRecorder struct {
	// contains filtered or unexported fields
}

MockProducerMockRecorder is the mock recorder for MockProducer

func (*MockProducerMockRecorder) Close

func (mr *MockProducerMockRecorder) Close() *gomock.Call

Close indicates an expected call of Close

func (*MockProducerMockRecorder) Send

func (mr *MockProducerMockRecorder) Send(ctx, message interface{}) *gomock.Call

Send indicates an expected call of Send

func (*MockProducerMockRecorder) SendWithRouteKey

func (mr *MockProducerMockRecorder) SendWithRouteKey(ctx, routeKey, message interface{}) *gomock.Call

SendWithRouteKey indicates an expected call of SendWithRouteKey

type MockProjection

type MockProjection struct {
	// contains filtered or unexported fields
}

MockProjection is a mock of Projection interface

func NewMockProjection

func NewMockProjection(ctrl *gomock.Controller) *MockProjection

NewMockProjection creates a new mock instance

func (*MockProjection) EXPECT

func (m *MockProjection) EXPECT() *MockProjectionMockRecorder

EXPECT returns an object that allows the caller to indicate expected use

func (*MockProjection) IsEventProcessed

func (m *MockProjection) IsEventProcessed(ctx context.Context, e Event) bool

IsEventProcessed mocks base method

func (*MockProjection) LatestVersion

func (m *MockProjection) LatestVersion(ctx context.Context, streamType string) (int, error)

LatestVersion mocks base method

func (*MockProjection) Process

func (m *MockProjection) Process(ctx context.Context, e Event) error

Process mocks base method

func (*MockProjection) Remove

func (m *MockProjection) Remove(ctx context.Context, streamType string, streamID uuid.UUID) error

Remove mocks base method

func (*MockProjection) Reset

func (m *MockProjection) Reset(ctx context.Context, streamType string) error

Reset mocks base method

type MockProjectionMockRecorder

type MockProjectionMockRecorder struct {
	// contains filtered or unexported fields
}

MockProjectionMockRecorder is the mock recorder for MockProjection

func (*MockProjectionMockRecorder) IsEventProcessed

func (mr *MockProjectionMockRecorder) IsEventProcessed(ctx, e interface{}) *gomock.Call

IsEventProcessed indicates an expected call of IsEventProcessed

func (*MockProjectionMockRecorder) LatestVersion

func (mr *MockProjectionMockRecorder) LatestVersion(ctx, streamType interface{}) *gomock.Call

LatestVersion indicates an expected call of LatestVersion

func (*MockProjectionMockRecorder) Process

func (mr *MockProjectionMockRecorder) Process(ctx, e interface{}) *gomock.Call

Process indicates an expected call of Process

func (*MockProjectionMockRecorder) Remove

func (mr *MockProjectionMockRecorder) Remove(ctx, streamType, streamID interface{}) *gomock.Call

Remove indicates an expected call of Remove

func (*MockProjectionMockRecorder) Reset

func (mr *MockProjectionMockRecorder) Reset(ctx, streamType interface{}) *gomock.Call

Reset indicates an expected call of Reset

type MockPublisher

type MockPublisher struct {
	// contains filtered or unexported fields
}

MockPublisher is a mock of Publisher interface

func NewMockPublisher

func NewMockPublisher(ctrl *gomock.Controller) *MockPublisher

NewMockPublisher creates a new mock instance

func (*MockPublisher) EXPECT

func (m *MockPublisher) EXPECT() *MockPublisherMockRecorder

EXPECT returns an object that allows the caller to indicate expected use

func (*MockPublisher) Publish

func (m *MockPublisher) Publish(arg0 context.Context, arg1 Event) error

Publish mocks base method

type MockPublisherMockRecorder

type MockPublisherMockRecorder struct {
	// contains filtered or unexported fields
}

MockPublisherMockRecorder is the mock recorder for MockPublisher

func (*MockPublisherMockRecorder) Publish

func (mr *MockPublisherMockRecorder) Publish(arg0, arg1 interface{}) *gomock.Call

Publish indicates an expected call of Publish

type MockSnapshot

type MockSnapshot struct {
	// contains filtered or unexported fields
}

MockSnapshot is a mock of Snapshot interface

func NewMockSnapshot

func NewMockSnapshot(ctrl *gomock.Controller) *MockSnapshot

NewMockSnapshot creates a new mock instance

func (*MockSnapshot) EXPECT

func (m *MockSnapshot) EXPECT() *MockSnapshotMockRecorder

EXPECT returns an object that allows the caller to indicate expected use

func (*MockSnapshot) Load

func (m *MockSnapshot) Load(ctx context.Context, streamID uuid.UUID, streamType string) (Stream, error)

Load mocks base method

func (*MockSnapshot) Remove

func (m *MockSnapshot) Remove(ctx context.Context, streamType string, streamID uuid.UUID) error

Remove mocks base method

func (*MockSnapshot) Reset

func (m *MockSnapshot) Reset(ctx context.Context, streamType string) error

Reset mocks base method

func (*MockSnapshot) Should

func (m *MockSnapshot) Should(ctx context.Context, es Stream) bool

Should mocks base method

func (*MockSnapshot) Take

func (m *MockSnapshot) Take(ctx context.Context, es Stream) error

Take mocks base method

type MockSnapshotCoder

type MockSnapshotCoder struct {
	// contains filtered or unexported fields
}

MockSnapshotCoder is a mock of SnapshotCoder interface

func NewMockSnapshotCoder

func NewMockSnapshotCoder(ctrl *gomock.Controller) *MockSnapshotCoder

NewMockSnapshotCoder creates a new mock instance

func (*MockSnapshotCoder) Decode

func (m *MockSnapshotCoder) Decode(ctx context.Context, streamType string, stream []byte) (Stream, error)

Decode mocks base method

func (*MockSnapshotCoder) EXPECT

func (m *MockSnapshotCoder) EXPECT() *MockSnapshotCoderMockRecorder

EXPECT returns an object that allows the caller to indicate expected use

func (*MockSnapshotCoder) Encode

func (m *MockSnapshotCoder) Encode(ctx context.Context, s Stream) ([]byte, error)

Encode mocks base method

type MockSnapshotCoderMockRecorder

type MockSnapshotCoderMockRecorder struct {
	// contains filtered or unexported fields
}

MockSnapshotCoderMockRecorder is the mock recorder for MockSnapshotCoder

func (*MockSnapshotCoderMockRecorder) Decode

func (mr *MockSnapshotCoderMockRecorder) Decode(ctx, streamType, stream interface{}) *gomock.Call

Decode indicates an expected call of Decode

func (*MockSnapshotCoderMockRecorder) Encode

func (mr *MockSnapshotCoderMockRecorder) Encode(ctx, s interface{}) *gomock.Call

Encode indicates an expected call of Encode

type MockSnapshotMockRecorder

type MockSnapshotMockRecorder struct {
	// contains filtered or unexported fields
}

MockSnapshotMockRecorder is the mock recorder for MockSnapshot

func (*MockSnapshotMockRecorder) Load

func (mr *MockSnapshotMockRecorder) Load(ctx, streamID, streamType interface{}) *gomock.Call

Load indicates an expected call of Load

func (*MockSnapshotMockRecorder) Remove

func (mr *MockSnapshotMockRecorder) Remove(ctx, streamType, streamID interface{}) *gomock.Call

Remove indicates an expected call of Remove

func (*MockSnapshotMockRecorder) Reset

func (mr *MockSnapshotMockRecorder) Reset(ctx, streamType interface{}) *gomock.Call

Reset indicates an expected call of Reset

func (*MockSnapshotMockRecorder) Should

func (mr *MockSnapshotMockRecorder) Should(ctx, es interface{}) *gomock.Call

Should indicates an expected call of Should

func (*MockSnapshotMockRecorder) Take

func (mr *MockSnapshotMockRecorder) Take(ctx, es interface{}) *gomock.Call

Take indicates an expected call of Take

type MockStore

type MockStore struct {
	// contains filtered or unexported fields
}

MockStore is a mock of Store interface

func NewMockStore

func NewMockStore(ctrl *gomock.Controller) *MockStore

NewMockStore creates a new mock instance

func (*MockStore) Append

func (m *MockStore) Append(ctx context.Context, streamID uuid.UUID, streamType string, events []Event, expectedVersion int) error

Append mocks base method

func (*MockStore) EXPECT

func (m *MockStore) EXPECT() *MockStoreMockRecorder

EXPECT returns an object that allows the caller to indicate expected use

func (*MockStore) Iter

func (m *MockStore) Iter(ctx context.Context, streamType string, fn func(uuid.UUID, int, []Event) error) error

Iter mocks base method

func (*MockStore) Load

func (m *MockStore) Load(ctx context.Context, streamID uuid.UUID, streamType string, fromVersion int) ([]Event, error)

Load mocks base method

type MockStoreMockRecorder

type MockStoreMockRecorder struct {
	// contains filtered or unexported fields
}

MockStoreMockRecorder is the mock recorder for MockStore

func (*MockStoreMockRecorder) Append

func (mr *MockStoreMockRecorder) Append(ctx, streamID, streamType, events, expectedVersion interface{}) *gomock.Call

Append indicates an expected call of Append

func (*MockStoreMockRecorder) Iter

func (mr *MockStoreMockRecorder) Iter(ctx, streamType, fn interface{}) *gomock.Call

Iter indicates an expected call of Iter

func (*MockStoreMockRecorder) Load

func (mr *MockStoreMockRecorder) Load(ctx, streamID, streamType, fromVersion interface{}) *gomock.Call

Load indicates an expected call of Load

type MockSubscriber

type MockSubscriber struct {
	// contains filtered or unexported fields
}

MockSubscriber is a mock of Subscriber interface

func NewMockSubscriber

func NewMockSubscriber(ctrl *gomock.Controller) *MockSubscriber

NewMockSubscriber creates a new mock instance

func (*MockSubscriber) EXPECT

func (m *MockSubscriber) EXPECT() *MockSubscriberMockRecorder

EXPECT returns an object that allows the caller to indicate expected use

func (*MockSubscriber) Filter

func (m *MockSubscriber) Filter(f ...Filter)

Filter mocks base method

func (*MockSubscriber) Subscribe

func (m *MockSubscriber) Subscribe(ctx context.Context, streamType string, handler EventHandler) error

Subscribe mocks base method

func (*MockSubscriber) SubscribeToEvent

func (m *MockSubscriber) SubscribeToEvent(ctx context.Context, streamType, eventType string, handler EventHandler) error

SubscribeToEvent mocks base method

type MockSubscriberMockRecorder

type MockSubscriberMockRecorder struct {
	// contains filtered or unexported fields
}

MockSubscriberMockRecorder is the mock recorder for MockSubscriber

func (*MockSubscriberMockRecorder) Filter

func (mr *MockSubscriberMockRecorder) Filter(f ...interface{}) *gomock.Call

Filter indicates an expected call of Filter

func (*MockSubscriberMockRecorder) Subscribe

func (mr *MockSubscriberMockRecorder) Subscribe(ctx, streamType, handler interface{}) *gomock.Call

Subscribe indicates an expected call of Subscribe

func (*MockSubscriberMockRecorder) SubscribeToEvent

func (mr *MockSubscriberMockRecorder) SubscribeToEvent(ctx, streamType, eventType, handler interface{}) *gomock.Call

SubscribeToEvent indicates an expected call of SubscribeToEvent

type Producer

type Producer interface {
	SendWithRouteKey(ctx context.Context, routeKey string, message interface{}) error
	Send(ctx context.Context, message interface{}) error
	Close() error
}

type Projection

type Projection interface {
	LatestVersion(ctx context.Context, streamType string) (int, error)
	Reset(ctx context.Context, streamType string) error
	Remove(ctx context.Context, streamType string, streamID uuid.UUID) error
	IsEventProcessed(ctx context.Context, e Event) bool
	Process(ctx context.Context, e Event) error
}

type Publisher

type Publisher interface {
	Publish(context.Context, Event) error
}

type Replay

type Replay struct {
	// contains filtered or unexported fields
}

func NewReplay

func NewReplay(
	eventStore Store,
	publisher Publisher,
) *Replay

func (*Replay) Stream

func (r *Replay) Stream(ctx context.Context, streamID uuid.UUID, streamType string, withDelay time.Duration) error

func (*Replay) StreamFromVersion

func (r *Replay) StreamFromVersion(ctx context.Context, streamID uuid.UUID, streamType string, fromEvents int, withDelay time.Duration) error

func (*Replay) Streams

func (r *Replay) Streams(ctx context.Context, streamType string, withDelay time.Duration) error

type Reply

type Reply struct {
	// contains filtered or unexported fields
}

func NewReply

func NewReply(
	correlationID CorrelationID,
	streamID string,
	streamType string,
	replyType string,
	commandType string,
	version int,
	payload interface{},
	err error,
) *Reply

func (*Reply) CommandType

func (r *Reply) CommandType() string

func (*Reply) Copy

func (r *Reply) Copy() *Reply

func (*Reply) CorrelationID

func (r *Reply) CorrelationID() CorrelationID

func (*Reply) Err

func (r *Reply) Err() error

func (*Reply) IsErrorType

func (r *Reply) IsErrorType() bool

func (*Reply) IsSuccessType

func (r *Reply) IsSuccessType() bool

func (*Reply) Payload

func (r *Reply) Payload() interface{}

func (*Reply) StreamID

func (r *Reply) StreamID() string

func (*Reply) StreamType

func (r *Reply) StreamType() string

func (*Reply) Type

func (r *Reply) Type() string

func (*Reply) Version

func (r *Reply) Version() int

type ReplyReceiverFunc

type ReplyReceiverFunc func(ctx context.Context, r *Reply) error

type Repository

type Repository struct {
	// contains filtered or unexported fields
}

func NewRepository

func NewRepository(
	eventStore Store,
	publisher Publisher,
	snapshot Snapshot,
	options ...RepositoryOption,
) *Repository

func (*Repository) Load

func (r *Repository) Load(ctx context.Context, streamType string,
	streamID uuid.UUID, newStream func() Stream) (Stream, error)

func (*Repository) Save

func (r *Repository) Save(ctx context.Context, stream Stream, expectedVersion int) (err error)

type RepositoryOption

type RepositoryOption func(r *Repository)

func WithIgnoreSnapshotError

func WithIgnoreSnapshotError() RepositoryOption

func WithTxn

func WithTxn(tx Txn) RepositoryOption

type Snapshot

type Snapshot interface {
	Remove(ctx context.Context, streamType string, streamID uuid.UUID) error
	Reset(ctx context.Context, streamType string) error
	Should(ctx context.Context, es Stream) bool
	Take(ctx context.Context, es Stream) error
	Load(ctx context.Context, streamID uuid.UUID, streamType string) (Stream, error)
}

type SnapshotCoder

type SnapshotCoder interface {
	Decode(ctx context.Context, streamType string, stream []byte) (Stream, error)
	Encode(ctx context.Context, s Stream) ([]byte, error)
}

type Store

type Store interface {
	Iter(ctx context.Context, streamType string, fn func(streamID uuid.UUID, version int, events []Event) error) error
	Append(ctx context.Context, streamID uuid.UUID, streamType string, events []Event, expectedVersion int) error
	Load(ctx context.Context, streamID uuid.UUID, streamType string, fromVersion int) ([]Event, error)
}

type Stream

type Stream interface {
	StreamID() uuid.UUID
	SetStreamID(id uuid.UUID)
	StreamType() string
	Version() int
	PreviousVersion() int
	IncrementVersion()
	Changes() []Event
	ClearChanges()
	AddChange(e Event)
	Apply(e Event, isNew bool)
}

func New

func New() Stream

type Subscriber

type Subscriber interface {
	Filter(f ...Filter)
	Subscribe(ctx context.Context, streamType string, handler EventHandler) error
	SubscribeToEvent(ctx context.Context, streamType string, eventType string, handler EventHandler) error
}

type Txn

type Txn func(ctx context.Context, fn func(tx context.Context) error) error

type Versioner

type Versioner interface {
	Version() int
}

Directories

Path Synopsis
commandbus
kafka
Package kafka is a generated GoMock package.
Package kafka is a generated GoMock package.
Package envelope is a generated protocol buffer package.
Package envelope is a generated protocol buffer package.
eventbus
kafka
Package kafka is a generated GoMock package.
Package kafka is a generated GoMock package.
examples
components/messaging/kafka/proto
Package proto is a generated protocol buffer package.
Package proto is a generated protocol buffer package.
messaging
kafka
Package kafka is a generated GoMock package.
Package kafka is a generated GoMock package.
pm
Package pm is a generated GoMock package.
Package pm is a generated GoMock package.
projection
snapshot
store
txn

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL