store

package
v0.0.0-...-e8ec4e4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 25, 2019 License: MIT Imports: 20 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// EventSourceQueryClientName constant to be used as name of the event source query client connection
	EventSourceQueryClientName string = "event_source_query"

	// EventSourceCommandClientName constant to be used as name of the event source command client connection
	EventSourceCommandClientName string = "event_source_command"

	// AggregatorIDQueryKey constant to be used as the key in Query Params
	AggregatorIDQueryKey string = "AID"

	// HighestVersionQueryKey constant to be used as the key in Query Params
	HighestVersionQueryKey string = "HV"

	// LowestVersionQueryKey constant to be used as the key in Query Params
	LowestVersionQueryKey string = "LV"
)
View Source
const (
	// SnapshotCreated topic
	SnapshotCreated = "snapshot_created"
)

Variables

View Source
var (
	ErrEventWithoutAggregate = errors.New("event can not contain nil as aggregate")
	ErrInvalidAggregateId    = errors.New("event can not have an empty string as aggregateID")
	ErrInvalidVersion        = errors.New("event can not have 0 as version")
)
View Source
var ErrConcurrencyException = status.Error(codes.Aborted, "concurrency exception")

Functions

func DeleteSubscription

func DeleteSubscription(ctx context.Context, client *pubsub.Client, name string) error

DeleteSubscription deletes a Cloud PubSub subscription

func FromContextAny

func FromContextAny(ctx context.Context, key string) interface{}

FromContextAny returns from context the interface value to which the key is associated.

func GetOrCreateSubscription

func GetOrCreateSubscription(ctx context.Context, client *pubsub.Client, name string, topic *pubsub.Topic) (*pubsub.Subscription, error)

GetOrCreateSubscription gets a reference or creates a Cloud PubSub subscription for the input topic

func GetOrCreateTopic

func GetOrCreateTopic(ctx context.Context, client *pubsub.Client, topic string) (*pubsub.Topic, error)

GetOrCreateTopic create a Cloud PubSub topic if not exists

func IsSnapshotTime

func IsSnapshotTime(e *pb.Event, factor int64) bool

IsSnapshotTime validate if event version is valid to be used as snapshot based on input factor

func NewEventSourceCommandClient

func NewEventSourceCommandClient(target string) *plutoClt.Client

NewEventSourceCommandClient wrapper for a event source command grpc pluto client

func NewEventSourceQueryClient

func NewEventSourceQueryClient(target string) *plutoClt.Client

NewEventSourceQueryClient wrapper for a event source query grpc pluto client

func Subscribe

func Subscribe(client *pubsub.Client, name string, topics Topics) pluto.HookFunc

Subscribe for topics available from a redis configuration

func TakeSnapshot

func TakeSnapshot(ctx context.Context, e *pb.Event, aggregator proto.Message, aFn ApplyFn) error

TakeSnapshot loads an agregator up to current state and triggers a snapshot

func UnmarshalEventData

func UnmarshalEventData(e *pb.Event, out interface{}) error

UnmarshalEventData gets the unserialized data from the event

func UpdateTopic

func UpdateTopic(ctx context.Context, t *pubsub.Topic) (pubsub.TopicConfig, error)

UpdateTopic updates topic environment labels

func WithContextAny

func WithContextAny(ctx context.Context, key string, val interface{}) context.Context

WithContextAny returns a copy of parent ctx in which the value associated with key is val.

Types

type Action

type Action func(context.Context, *pb.Event) error

Action signature of a action func

func ActionWrapper

func ActionWrapper(aggregator interface{}, aFn ApplyFn, hFn ...HookFn) Action

ActionWrapper loads an agregator current state and previous. Hook functions should be used to trigger any subsequent business rules Or just simple cache the state of the aggregator

func SnapshotActionWrapper

func SnapshotActionWrapper(aggregator proto.Message, aFn ApplyFn, nEvents int64) Action

SnapshotActionWrapper loads an agregator current state. Takes a snapshot every number events (nEvents)

type ApplyFn

type ApplyFn func(e *pb.Event, state interface{}) (interface{}, error)

ApplyFn defines type for apply functions

type HookFn

type HookFn func(ctx context.Context, e *pb.Event, prevState, nextState interface{}) error

HookFn type

type Store

type Store struct {
	State          interface{}
	Version        int64
	HighestVersion int64
	LowestVersion  int64
}

Store holds aggregator state and version

func Aggregate

func Aggregate(ctx context.Context, aggregator interface{}, id string, in proto.Message, topic string, metadata map[string]string, apply ApplyFn, validations ...Validate) (*Store, error)

Aggregate proceess all aggregate steps

func NewStore

func NewStore(aggregator interface{}) *Store

NewStore returns new store

func (*Store) Dispatch

func (s *Store) Dispatch(ctx context.Context, e *pb.Event) (*pb.Ack, error)

Dispatch triggeres an event to be created

func (*Store) LoadEvents

func (s *Store) LoadEvents(ctx context.Context, id string, fn ApplyFn) error

LoadEvents stream events by aggregator id and apply the required changes TODO: use snapshots

func (*Store) Marshal

func (s *Store) Marshal() ([]byte, error)

Marshal takes a protocol buffer message and encodes it into the wire format, returning the data.

func (*Store) Snapit

func (s *Store) Snapit(ctx context.Context, e *pb.Event) (*pb.Ack, error)

Snapit triggeres an snapshot to be created

type Topics

type Topics map[string][]Action

Topics map between a topic and respective action

type Validate

type Validate func(*Store) error

Validate helper functions to validate the aggregate

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL