events

package
v2.1.7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 9, 2022 License: MIT Imports: 6 Imported by: 0

Documentation

Overview

Events has a default implementations of EventStream for mtglib.

Please see documentation for mtglib.EventStream interface to get an idea of such an abstraction. This package has implementations for the default event stream.

Default event stream has a list of its own concepts. First, all it does is a routing of messages to known observers. It takes an event, defines its type and pass this message to a method of the observer.

There might be many observers, but default event stream has a guarantee though. It uses StreamID as a sharding key and guarantees that a message with the same StreamID will be devlivered to the same observer instance. So, each producer is guarateed to get all relevant messages related to the same session. It is not possible that it will get EventFinish if it has not seen EventStart for that session yet.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewNoopStream

func NewNoopStream() mtglib.EventStream

NewNoopStream creates a stream which discards each message.

Types

type EventStream

type EventStream struct {
	// contains filtered or unexported fields
}

EventStream is a default implementation of the mtglib.EventStream interface.

EventStream manages a set of goroutines, observers. Main responsibility of the event stream is to route an event to relevant observer based on some hash so each observer will have all events which belong to some stream id.

Thus, EventStream can spawn many observers.

func NewEventStream

func NewEventStream(observerFactories []ObserverFactory) EventStream

NewEventStream builds a new default event stream.

If you give an empty array of observers, then NoopObserver is going to be used. If you give many observers, then they will process a message concurrently.

func (EventStream) Send

func (e EventStream) Send(ctx context.Context, evt mtglib.Event)

Send starts delivering of the message to observer with respect to a given context If context is closed, message could be not delivered.

func (EventStream) Shutdown

func (e EventStream) Shutdown()

Shutdown stops an event stream pipeline.

type Observer

type Observer interface {
	// EventStart reacts on incoming mtglib.EventStart event.
	EventStart(mtglib.EventStart)

	// EventFinish reacts on incoming mtglib.EventFinish event.
	EventFinish(mtglib.EventFinish)

	// EventConnectedToDC reacts on incoming mtglib.EventConnectedToDC
	// event.
	EventConnectedToDC(mtglib.EventConnectedToDC)

	// EventDomainFronting reacts on incoming mtglib.EventDomainFronting
	// event.
	EventDomainFronting(mtglib.EventDomainFronting)

	// EventTraffic reacts on incoming mtglib.EventTraffic event.
	EventTraffic(mtglib.EventTraffic)

	// EventConcurrencyLimited reacts on incoming
	// mtglib.EventConcurrencyLimited event.
	EventConcurrencyLimited(mtglib.EventConcurrencyLimited)

	// EventIPBlocklisted reacts on incoming mtglib.EventIPBlocklisted event.
	EventIPBlocklisted(mtglib.EventIPBlocklisted)

	// EventReplayAttack reacts on incoming mtglib.EventReplayAttack event.
	EventReplayAttack(mtglib.EventReplayAttack)

	// EventIPListSize reacts on incoming mtglib.EventIPListSize
	EventIPListSize(mtglib.EventIPListSize)

	// Shutdown stop observer. Default event stream guarantees:
	//   1. If shutdown is executed, it is executed only once
	//   2. Observer won't receieve any new message after this
	//      function call.
	Shutdown()
}

Observer is an instance that listens for the incoming events.

As it is said in the package description, the default event stream guarantees that all events with the same StreamID are going to be routed to the same instance of the observer. So, there is no need to synchronize information about streams between many observers instances, they can have their local storage.

func NewNoopObserver

func NewNoopObserver() Observer

NewNoopObserver creates an observer which discards each message.

type ObserverFactory

type ObserverFactory func() Observer

ObserverFactory creates a new instance of the observer.

Default event stream creates a small set of goroutines to manage incoming messages. Each message is routed to an appropriate observer based on a sharding key, stream id. So, it is possible that an instance of mtg will have many observer instances, not a single one.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL