handler

package
v0.0.0-...-860e413 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 21, 2022 License: Apache-2.0 Imports: 9 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrNoProjection    = errors.New("no projection")
	ErrNoValues        = errors.New("no values")
	ErrNoCondition     = errors.New("no condition")
	ErrSomeStmtsFailed = errors.New("some statements failed")
)

Functions

This section is empty.

Types

type AggregateReducer

type AggregateReducer struct {
	Aggregate     eventstore.AggregateType
	EventRedusers []EventReducer
}

AggregateReducer represents the required data to work with aggregates

type Check

type Check struct {
	Executes []func(ex Executer, projectionName string) (bool, error)
}

func (*Check) IsNoop

func (c *Check) IsNoop() bool

type Column

type Column struct {
	Name         string
	Value        interface{}
	ParameterOpt func(string) string
}

func NewCol

func NewCol(name string, value interface{}) Column

func NewJSONCol

func NewJSONCol(name string, value interface{}) Column

type Condition

type Condition Column

func NewCond

func NewCond(name string, value interface{}) Condition

type EventReducer

type EventReducer struct {
	Event  eventstore.EventType
	Reduce Reduce
}

EventReducer represents the required data to work with events

type Executer

type Executer interface {
	Exec(string, ...interface{}) (sql.Result, error)
}

type Handler

type Handler struct {
	Eventstore *eventstore.Eventstore
	Sub        *eventstore.Subscription
	EventQueue chan eventstore.Event
}

func NewHandler

func NewHandler(config HandlerConfig) Handler

func (*Handler) Subscribe

func (h *Handler) Subscribe(aggregates ...eventstore.AggregateType)

func (*Handler) SubscribeEvents

func (h *Handler) SubscribeEvents(types map[eventstore.AggregateType][]eventstore.EventType)

func (*Handler) Unsubscribe

func (h *Handler) Unsubscribe()

type HandlerConfig

type HandlerConfig struct {
	Eventstore *eventstore.Eventstore
}

type Init

type Init func(context.Context, *Check) error

Init initializes the projection with the given check

type Lock

type Lock func(context.Context, time.Duration, ...string) <-chan error

Lock is used for mutex handling on the projection, if needed

type ProjectionHandler

type ProjectionHandler struct {
	Handler
	ProjectionName string
	// contains filtered or unexported fields
}

func NewProjectionHandler

func NewProjectionHandler(
	ctx context.Context,
	config ProjectionHandlerConfig,
	reduce Reduce,
	update Update,
	query SearchQuery,
	lock Lock,
	unlock Unlock,
	initialized <-chan bool,
) *ProjectionHandler

func (*ProjectionHandler) FetchEvents

func (h *ProjectionHandler) FetchEvents(ctx context.Context, instances ...string) ([]eventstore.Event, bool, error)

FetchEvents checks the current sequences and filters for newer events

func (*ProjectionHandler) Process

func (h *ProjectionHandler) Process(ctx context.Context, events ...eventstore.Event) (index int, err error)

Process handles multiple events by reducing them to statements and updating the projection

func (*ProjectionHandler) Trigger

func (h *ProjectionHandler) Trigger(ctx context.Context, instances ...string) error

Trigger handles all events for the provided instances (or the current instance from the context if none are specified) by calling FetchEvents and Process until the number of events is smaller than the BulkLimit

type ProjectionHandlerConfig

type ProjectionHandlerConfig struct {
	HandlerConfig
	ProjectionName      string
	RequeueEvery        time.Duration
	RetryFailedAfter    time.Duration
	Retries             uint
	ConcurrentInstances uint
}

type Reduce

type Reduce func(eventstore.Event) (*Statement, error)

Reduce reduces the given event to a statement which is used to update the projection

type SearchQuery

type SearchQuery func(ctx context.Context, instanceIDs []string) (query *eventstore.SearchQueryBuilder, queryLimit uint64, err error)

SearchQuery generates the search query used to look up events

type Statement

type Statement struct {
	AggregateType    eventstore.AggregateType
	Sequence         uint64
	PreviousSequence uint64
	InstanceID       string

	Execute func(ex Executer, projectionName string) error
}

func (*Statement) IsNoop

func (s *Statement) IsNoop() bool

type Statements

type Statements []Statement

func (Statements) Len

func (stmts Statements) Len() int

func (Statements) Less

func (stmts Statements) Less(i, j int) bool

func (Statements) Swap

func (stmts Statements) Swap(i, j int)

type Unlock

type Unlock func(...string) error

Unlock releases the mutex of the projection

type Update

type Update func(context.Context, []*Statement, Reduce) (index int, err error)

Update updates the projection with the given statements

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL