engine

package
v0.29.6
Warning

This package is not in the latest version of its module.

Published: Jan 19, 2023 License: AGPL-3.0 Imports: 9 Imported by: 0

README

Notifier

The Notifier implements the following state machine:

[Figure: Notifier State Machine]

The intended usage pattern is:

  • there are goroutines, aka Producers, that append work to a queue pendingWorkQueue
  • there is a number of goroutines, aka Consumers, that pull work from the pendingWorkQueue
    • they consume work until they have drained the pendingWorkQueue
    • when they find that the pendingWorkQueue contains no more work, they go back to the notifier and await notification

[Figure: Notifier Usage Pattern]

Note that the consumer / producer interact in a different order with the pendingWorkQueue vs the notifier:

  • the producer first drops its work into the queue and subsequently sends the notification
  • the consumer first processes elements from the queue and subsequently checks for a notification

Thereby, it is guaranteed that at least one consumer routine will be notified when work is added.
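The ordering described above can be sketched with the standard library alone. This is a minimal illustration, not the package's implementation: `workQueue`, `notify`, and `runPipeline` are hypothetical names, and the capacity-1 channel with a non-blocking send is an assumption about how notifications coalesce.

```go
package main

import (
	"fmt"
	"sync"
)

// workQueue is a hypothetical mutex-guarded FIFO standing in for pendingWorkQueue.
type workQueue struct {
	mu    sync.Mutex
	items []int
}

func (q *workQueue) push(v int) {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.items = append(q.items, v)
}

// drain removes every queued item and returns the sum of the removed items.
func (q *workQueue) drain() int {
	q.mu.Lock()
	defer q.mu.Unlock()
	sum := 0
	for _, v := range q.items {
		sum += v
	}
	q.items = nil
	return sum
}

// notify performs a non-blocking send, so repeated notifications coalesce
// into at most one pending signal (assumed behaviour, not the package's code).
func notify(ch chan<- struct{}) {
	select {
	case ch <- struct{}{}:
	default:
	}
}

// runPipeline pushes 1..n from a producer and returns the sum seen by the consumer.
func runPipeline(n int) int {
	q := &workQueue{}
	signal := make(chan struct{}, 1)
	stop := make(chan struct{})
	result := make(chan int)

	go func() { // consumer
		total := 0
		for {
			total += q.drain() // 1) process everything currently queued
			select {           // 2) only then wait for the next notification
			case <-signal:
			case <-stop:
				total += q.drain() // pick up work queued before stop
				result <- total
				return
			}
		}
	}()

	for i := 1; i <= n; i++ { // producer
		q.push(i)      // 1) queue the work first
		notify(signal) // 2) then notify
	}
	close(stop)
	return <-result
}

func main() {
	fmt.Println(runPipeline(100))
}
```

Because the producer notifies only after queuing and the consumer waits only after draining, a dropped (coalesced) notification never leaves work stranded: some pending signal always postdates the oldest unprocessed item.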

Documentation

Constants

This section is empty.

Variables

var (
	// IncompatibleInputTypeError indicates that the input has an incompatible type
	IncompatibleInputTypeError = errors.New("incompatible input type")
)

Functions

func IsDuplicatedEntryError

func IsDuplicatedEntryError(err error) bool

func IsIncompatibleInputTypeError

func IsIncompatibleInputTypeError(err error) bool

func IsInvalidInputError

func IsInvalidInputError(err error) bool

IsInvalidInputError returns whether the given error is an InvalidInputError error

func IsNetworkTransmissionError

func IsNetworkTransmissionError(err error) bool

IsNetworkTransmissionError returns whether the given error is a NetworkTransmissionError error

func IsOutdatedInputError

func IsOutdatedInputError(err error) bool

func IsUnverifiableInputError

func IsUnverifiableInputError(err error) bool

func LogError

func LogError(log zerolog.Logger, err error)

LogError logs the engine processing error

func LogErrorWithMsg

func LogErrorWithMsg(log zerolog.Logger, msg string, err error)

func NewDuplicatedEntryErrorf

func NewDuplicatedEntryErrorf(msg string, args ...interface{}) error

func NewInvalidInputError

func NewInvalidInputError(msg string) error

func NewInvalidInputErrorf

func NewInvalidInputErrorf(msg string, args ...interface{}) error

func NewNetworkTransmissionError

func NewNetworkTransmissionError(msg string) error

func NewNetworkTransmissionErrorf

func NewNetworkTransmissionErrorf(msg string, args ...interface{}) error

func NewOutdatedInputErrorf

func NewOutdatedInputErrorf(msg string, args ...interface{}) error

func NewUnverifiableInputError

func NewUnverifiableInputError(msg string, args ...interface{}) error

Types

type DuplicatedEntryError

type DuplicatedEntryError struct {
	// contains filtered or unexported fields
}

func (DuplicatedEntryError) Error

func (e DuplicatedEntryError) Error() string

func (DuplicatedEntryError) Unwrap

func (e DuplicatedEntryError) Unwrap() error

type FifoMessageStore

type FifoMessageStore struct {
	*fifoqueue.FifoQueue
}

FifoMessageStore wraps a FiFo Queue to implement the MessageStore interface.

func (*FifoMessageStore) Get

func (s *FifoMessageStore) Get() (*Message, bool)

func (*FifoMessageStore) Put

func (s *FifoMessageStore) Put(msg *Message) bool

type FilterFunc

type FilterFunc func(*Message) bool

type InvalidInputError

type InvalidInputError struct {
	// contains filtered or unexported fields
}

InvalidInputError is an error caused by invalid inputs. It is useful to distinguish these known errors from exceptions, because the two can then be logged differently: for instance, an InvalidInputError can be logged at warn level and any other error at error level.

func (InvalidInputError) Error

func (e InvalidInputError) Error() string

func (InvalidInputError) Unwrap

func (e InvalidInputError) Unwrap() error

type MapFunc

type MapFunc func(*Message) (*Message, bool)

type MatchFunc

type MatchFunc func(*Message) bool

type Message

type Message struct {
	OriginID flow.Identifier
	Payload  interface{}
}

type MessageHandler

type MessageHandler struct {
	// contains filtered or unexported fields
}

func NewMessageHandler

func NewMessageHandler(log zerolog.Logger, notifier Notifier, patterns ...Pattern) *MessageHandler

func (*MessageHandler) GetNotifier

func (e *MessageHandler) GetNotifier() <-chan struct{}

func (*MessageHandler) Process

func (e *MessageHandler) Process(originID flow.Identifier, payload interface{}) error

Process iterates over the internal processing patterns and determines if the payload matches. The _first_ matching pattern processes the payload. Returns

  • IncompatibleInputTypeError if no matching processor was found
  • All other errors are potential symptoms of internal state corruption or bugs (fatal).

type MessageStore

type MessageStore interface {
	Put(*Message) bool
	Get() (*Message, bool)
}

MessageStore is the interface to abstract how messages are buffered in memory before being handled by the engine
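A custom store only needs the two methods above. The sketch below is one possible bounded FIFO, written under two assumptions that the interface itself does not state: Put returns false when the message was not stored, and Get returns false when the store is empty. `boundedFifo` and the local `message` and `messageStore` types are hypothetical stand-ins.

```go
package main

import (
	"fmt"
	"sync"
)

// message is a simplified stand-in for Message.
type message struct {
	OriginID string
	Payload  interface{}
}

// messageStore repeats the two-method shape of MessageStore.
type messageStore interface {
	Put(*message) bool
	Get() (*message, bool)
}

// boundedFifo keeps at most limit messages and rejects further ones.
type boundedFifo struct {
	mu    sync.Mutex
	limit int
	msgs  []*message
}

func newBoundedFifo(limit int) *boundedFifo {
	return &boundedFifo{limit: limit}
}

// Put appends the message unless the store is full.
func (s *boundedFifo) Put(m *message) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if len(s.msgs) >= s.limit {
		return false
	}
	s.msgs = append(s.msgs, m)
	return true
}

// Get removes and returns the oldest message, if any.
func (s *boundedFifo) Get() (*message, bool) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if len(s.msgs) == 0 {
		return nil, false
	}
	m := s.msgs[0]
	s.msgs = s.msgs[1:]
	return m, true
}

// Compile-time check that boundedFifo satisfies the interface.
var _ messageStore = (*boundedFifo)(nil)

func main() {
	var store messageStore = newBoundedFifo(2)
	for _, p := range []string{"a", "b", "c"} {
		fmt.Println(p, store.Put(&message{Payload: p}))
	}
	for {
		m, ok := store.Get()
		if !ok {
			break
		}
		fmt.Println(m.Payload)
	}
}
```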

type NetworkTransmissionError

type NetworkTransmissionError struct {
	// contains filtered or unexported fields
}

NetworkTransmissionError captures the general sentinel errors upon network transmission. It is used to distinguish network transmission errors from other errors.

func (NetworkTransmissionError) Error

func (e NetworkTransmissionError) Error() string

func (NetworkTransmissionError) Unwrap

func (e NetworkTransmissionError) Unwrap() error

type Notifier

type Notifier struct {
	// contains filtered or unexported fields
}

Notifier is a concurrency primitive for informing worker routines about the arrival of new work unit(s). Notifiers essentially behave like channels in that they can be passed by value and still allow concurrent updates of the same internal state.

func NewNotifier

func NewNotifier() Notifier

NewNotifier instantiates a Notifier. Notifiers essentially behave like channels in that they can be passed by value and still allow concurrent updates of the same internal state.

func (Notifier) Channel

func (n Notifier) Channel() <-chan struct{}

Channel returns a channel for receiving notifications

func (Notifier) Notify

func (n Notifier) Notify()

Notify sends a notification
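The "passed by value" property follows naturally if the struct holds a channel, because every copy of the struct refers to the same channel. The sketch below assumes that design (a capacity-1 channel and a non-blocking send); `notifier`, `newNotifier`, and `pending` are hypothetical names, not the package's implementation.

```go
package main

import "fmt"

// notifier is a hypothetical stand-in for Notifier: a struct holding a
// channel with capacity 1. Copies of the struct share the same channel.
type notifier struct {
	ch chan struct{}
}

func newNotifier() notifier {
	return notifier{ch: make(chan struct{}, 1)}
}

// Notify never blocks; if a notification is already pending, this one is dropped.
func (n notifier) Notify() {
	select {
	case n.ch <- struct{}{}:
	default:
	}
}

// Channel exposes the receive side for consumers.
func (n notifier) Channel() <-chan struct{} {
	return n.ch
}

// pending counts how many notifications can be received without blocking.
func pending(ch <-chan struct{}) int {
	count := 0
	for {
		select {
		case <-ch:
			count++
		default:
			return count
		}
	}
}

func main() {
	original := newNotifier()
	copied := original // a copy, as created when passing by value

	copied.Notify()
	copied.Notify()
	original.Notify()

	// All three calls targeted the same channel and coalesced.
	fmt.Println(pending(original.Channel()))
}
```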

type OutdatedInputError

type OutdatedInputError struct {
	// contains filtered or unexported fields
}

OutdatedInputError is for inputs that are outdated. Flagging an input as outdated says nothing about whether the input was valid; determining that would take additional computation that isn't necessary. An outdated input may also be a duplicate of an input that was already processed.

func (OutdatedInputError) Error

func (e OutdatedInputError) Error() string

func (OutdatedInputError) Unwrap

func (e OutdatedInputError) Unwrap() error

type Pattern

type Pattern struct {
	// Match is a function to match a message to this pattern, typically by payload type.
	Match MatchFunc
	// Map is a function to apply to messages before storing them. If not provided, then the message is stored in its original form.
	Map MapFunc
	// Store is an abstract message store where we will store the message upon receipt.
	Store MessageStore
}

type Unit

type Unit struct {
	sync.Mutex // can be used to synchronize the engine
	// contains filtered or unexported fields
}

Unit handles synchronization management, startup, and shutdown for engines.

func NewUnit

func NewUnit() *Unit

NewUnit returns a new unit.

func (*Unit) Ctx

func (u *Unit) Ctx() context.Context

Ctx returns a context with the same lifecycle scope as the unit. In particular, it is cancelled when Done is called, so it can be used as the parent context for processes spawned by any engine whose lifecycle is managed by a unit.

func (*Unit) Do

func (u *Unit) Do(f func() error) error

Do synchronously executes the input function f unless the unit has shut down. It returns the result of f. If f is executed, the unit will not shut down until after f returns.

func (*Unit) Done

func (u *Unit) Done(actions ...func()) <-chan struct{}

Done returns a channel that is closed when the unit is done. A unit is done when (i) the series of "action" functions are executed and (ii) all pending functions invoked with `Do` or `Launch` have completed.

The engine using the unit is responsible for defining these action functions as required.

func (*Unit) Launch

func (u *Unit) Launch(f func())

Launch asynchronously executes the input function unless the unit has shut down. If f is executed, the unit will not shut down until after f returns.

func (*Unit) LaunchAfter

func (u *Unit) LaunchAfter(delay time.Duration, f func())

LaunchAfter asynchronously executes the input function after a certain delay unless the unit has shut down.

func (*Unit) LaunchPeriodically

func (u *Unit) LaunchPeriodically(f func(), interval time.Duration, delay time.Duration)

LaunchPeriodically asynchronously executes the input function on `interval` periods unless the unit has shut down. If f is executed, the unit will not shut down until after f returns.

func (*Unit) Quit

func (u *Unit) Quit() <-chan struct{}

Quit returns a channel that is closed when the unit begins to shut down.

func (*Unit) Ready

func (u *Unit) Ready(checks ...func()) <-chan struct{}

Ready returns a channel that is closed when the unit is ready. A unit is ready when the series of "check" functions are executed.

The engine using the unit is responsible for defining these check functions as required.
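The shutdown guarantees documented for Do, Launch, Done, and Ctx can be modelled with a mutex, a sync.WaitGroup, and a cancellable context. The `lifecycle` type below is a simplified, hypothetical stand-in, not the Unit implementation; in particular, returning nil from Do after shutdown and running the actions before waiting are assumptions.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// lifecycle is a hypothetical, simplified stand-in for Unit.
type lifecycle struct {
	mu       sync.Mutex
	wg       sync.WaitGroup
	ctx      context.Context
	cancel   context.CancelFunc
	stopping bool
}

func newLifecycle() *lifecycle {
	ctx, cancel := context.WithCancel(context.Background())
	return &lifecycle{ctx: ctx, cancel: cancel}
}

// Ctx returns a context that is cancelled once shutdown begins.
func (l *lifecycle) Ctx() context.Context { return l.ctx }

// admit registers one in-flight function unless shutdown has begun.
func (l *lifecycle) admit() bool {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.stopping {
		return false
	}
	l.wg.Add(1)
	return true
}

// Launch runs f asynchronously unless shutdown has begun.
func (l *lifecycle) Launch(f func()) {
	if !l.admit() {
		return
	}
	go func() {
		defer l.wg.Done()
		f()
	}()
}

// Do runs f synchronously unless shutdown has begun (then it returns nil).
func (l *lifecycle) Do(f func() error) error {
	if !l.admit() {
		return nil
	}
	defer l.wg.Done()
	return f()
}

// Done begins shutdown and returns a channel that is closed after the
// actions have run and every admitted function has returned.
func (l *lifecycle) Done(actions ...func()) <-chan struct{} {
	l.mu.Lock()
	l.stopping = true
	l.mu.Unlock()
	l.cancel()

	done := make(chan struct{})
	go func() {
		for _, action := range actions {
			action()
		}
		l.wg.Wait()
		close(done)
	}()
	return done
}

func main() {
	l := newLifecycle()
	l.Launch(func() {
		<-l.Ctx().Done() // worker exits once shutdown begins
		fmt.Println("worker stopped")
	})
	<-l.Done()
	fmt.Println("unit done")
}
```

Guarding both the shutdown flag and `wg.Add` with the same mutex is what makes the sketch safe: a function is either admitted before shutdown starts, and therefore waited for, or rejected.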

type UnverifiableInputError

type UnverifiableInputError struct {
	// contains filtered or unexported fields
}

UnverifiableInputError is for inputs that cannot be verified at this moment. Usually this means that we don't have enough data to verify the input. A good example is data missing from the DB that is needed to process the input.

func (UnverifiableInputError) Error

func (e UnverifiableInputError) Error() string

func (UnverifiableInputError) Unwrap

func (e UnverifiableInputError) Unwrap() error

Directories

Path Synopsis
access
rest/models
Access API. No description provided (generated by Swagger Codegen https://github.com/swagger-api/swagger-codegen). API version: 1.0.0. Generated by: Swagger Codegen (https://github.com/swagger-api/swagger-codegen.git)
rpc
collection
ingest
Package ingest implements an engine for receiving transactions that need to be packaged into a collection.
pusher
Package pusher implements an engine for providing access to resources held by the collection node, including collections, collection guarantees, and transactions.
rpc
Package rpc implements accepting transactions into the system.
common
rpc
dkg
rpc
ghost
verification
