event

package module
v0.0.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 31, 2022 License: MIT Imports: 10 Imported by: 0

README

event

Event is a module that implements coponents necessary to create and run a distributed event based platform. It provides simple interface for implementing event-based distributed algorithms with reliable event delivery.

This project was developed as part of my master's thesis.

Documentation

Index

Constants

View Source
const (
	CtrlSubscribeResp   CtrlMsgType = CtrlSubscribe + CtrlRespOffset
	CtrlUnsubscribeResp             = CtrlUnsubscribe + CtrlRespOffset
	CtrlConnectToResp               = CtrlConnectTo + CtrlRespOffset
	CtrlRegisterResp                = CtrlRegister + CtrlRespOffset
)

Variables

This section is empty.

Functions

func Deserialize

func Deserialize[T any](data Event, target *T, ser *Serializer) error

Deserialize extracts the appData from data and deserializes it into target using the provided serializer

func Serialize

func Serialize[T any](data *T, target *Event, ser *Serializer) error

Serialize fills target's appData field with the serialized data using the provided serializer

Types

type AppHandlers

type AppHandlers struct {
	EventSource chan Event
	EventSink   chan targettedEvent
	// contains filtered or unexported fields
}

AppHandlers represent the algorithm in a form of event handlers. You can use the handlers separately and connect them to event source manually or use the cluster.Node type that automates the communication between cluster nodes.

Use the AppHandlers.Register method to register event handlers.

func NewAppHandlers added in v0.0.2

func NewAppHandlers(lf LoggerFactory, concurrent bool) *AppHandlers

NewAppHandlers returns empty Handlers object with the specified logger. The concurrent parameter specifies if the event handlers should be executed one by one or concurrently just after the event is received.

func (*AppHandlers) Activate

func (hs *AppHandlers) Activate() error

func (*AppHandlers) Plug

func (h *AppHandlers) Plug(l Listener, s Sender)

Plug connects the handlers to events passed from the Listener and passes the result of event handler invocations to the Sender

func (*AppHandlers) Register

func (hs *AppHandlers) Register(t EventType, h EventHandler)

func (*AppHandlers) Unregister added in v0.0.2

func (hs *AppHandlers) Unregister(t EventType)

type CtrlMsg

type CtrlMsg struct {
	// SerialNumber specifies an id unique for the message
	// sender. Together with the sender address it creates
	// an unique identifier for the message.
	SerialNumber uint64
	// Type specifies the type of the message and should be
	// one of Ctrl... constants defined in this file.
	Type CtrlMsgType
	// AppPort is the application protocol port of the CtrlMsg sender.
	// It is used to calculate the peer unique identifier when required.
	AppPort uint16
	// Addr represents a node in a form of the address and port.
	// This field is used in CtrlConnectTo message to indicate where
	// the receiver node should connect to.
	Addr netip.AddrPort
	// EventType indicates what event the message refers to.
	// It is used in CtrlSubscribe and CtrlUnsubscribe messages.
	EventType EventType
}

Represents a service protocol message

type CtrlMsgType

type CtrlMsgType = int

CtrlMsgType indicates how the message should be interpreted by the node. The CtrlMsgType defines the command and the CtrlMsg body holds the arguments for the command.

const (
	CtrlSubscribe CtrlMsgType = iota
	CtrlUnsubscribe
	CtrlConnectTo
	CtrlRegister
)

The set of available control message types.

const CtrlRespOffset CtrlMsgType = 10000

type DefaultLoggerFactory

type DefaultLoggerFactory struct {
}

func NewDefaultLoggerFactory

func NewDefaultLoggerFactory() *DefaultLoggerFactory

func (*DefaultLoggerFactory) NewLogger

func (lf *DefaultLoggerFactory) NewLogger() LoggerImpl

type Event

type Event struct {
	Type    EventType
	AppData []byte
}

func NewEvent

func NewEvent[T any](t EventType, data *T, ser *Serializer) *Event

type EventHandler

type EventHandler = func(e Event, publisher Publisher)

EventHandler represents a function that is launched as a response to the occouring events. EventHandler receives an Event to handle and a Publisher. The Publisher interface exposes functions that allow the user to publish events as part of event handling.

EventHandlers are registered in Handlers using the Handlers.Register() method.

type EventType

type EventType = int

type IdCtrlMsg

type IdCtrlMsg struct {
	Source   net.Addr
	SourceId uint32
	Msg      CtrlMsg
}

type Listener

type Listener interface {
	// Returns a channel that the events from
	// other nodes are passed to
	EventSink() chan Event
	// Runs the network listener and starts
	// passing events to the EventSink()
	Run()
	// Stops the network listener
	Stop()
}

Listener contains a network listener that accepts events from other nodes and passes them to a chanel

type LoggerFactory

type LoggerFactory interface {
	NewLogger() LoggerImpl
}

type LoggerImpl

type LoggerImpl = *zap.SugaredLogger

type PeerId

type PeerId = uint32

func Hash

func Hash(data encoding.BinaryMarshaler) PeerId

func HashAddr

func HashAddr(addr net.Addr) (PeerId, error)

func HashAddrStr

func HashAddrStr(addr string) (PeerId, error)

type Publisher added in v0.0.2

type Publisher interface {
	// ToAll publishes the event to the network. It guarantees
	// that the order in which the events are published is the
	// preserved upon delivery.
	ToAll(e Event)
	// ToSelf publishes the event only to the node that
	// executes the event where the Publisher is used.
	ToSelf(e Event)
	// BackgroundTask asynchronously executes the task
	// and publishes the notification event to self when
	// the task finishes. The task can be any function that
	// accepts no parameters and returns no value.
	//
	// The task is ran outside the event handlers path of
	// execution and is fully asynchronous. If the task needs
	// to access any variable that the event handler accesses,
	// this action must be protected with a mutual exclusion
	// mechanism.
	BackgroundTask(finishNotification Event, task func())
}

Publisher allows the user to publish events inside event handlers. The events are published after the handler exits

type Sender

type Sender interface {
	// EventSource returns the channel from which the sender
	// picks up events to publish
	EventSource() chan targettedEvent
	// Run activates a background worker that receives events
	// from the channel accesed by Sender.EventSource()
	// and sends them to the subscribed nodes.
	Run()
	// Stop halts the background worker and closes all
	// the established connections
	Stop()
	// Connect establishes a connection to other event system node
	// (specifically to the other node's Listener)
	Connect(addr string) error
	// RegisterSub registers a subscription for the
	// specified address. The cluster.Node structure uses the
	// register/unregister functionality to optimise the
	// number of messages passed through the network.
	RegisterSub(t EventType, addr net.Addr)
	// UnregisterSub removes the subscription on the
	// specified event. After unregistering a subscription
	// the node at addr will no longer get notified about
	// the event with type t.
	UnregisterSub(t EventType, addr net.Addr)
	GetPeers() []net.Addr
}

Sender is an interface for the event publishing element of the event system node. It broadcasts the events to other nodes in the cluster.

type SerializedConn

type SerializedConn struct {
	io.ReadWriteCloser
	// contains filtered or unexported fields
}

func NewSerializedConn

func NewSerializedConn(conn io.ReadWriteCloser) *SerializedConn

func (*SerializedConn) Read

func (c *SerializedConn) Read(e any) error

func (*SerializedConn) Write

func (c *SerializedConn) Write(e any) error

type Serializer

type Serializer struct {
	// contains filtered or unexported fields
}

Serializer allows to serialize structs into []byte It should be used to convert user defined structs into Event payloads and deserialize app data from received events. *Serializer is not thread safe

func NewSerializer

func NewSerializer() *Serializer

func (*Serializer) Register added in v0.0.3

func (s *Serializer) Register(values []any)

type TcpListener

type TcpListener struct {
	// contains filtered or unexported fields
}

TcpListener is an implementation of the event.Listener interface based on the TCP protocol. It includes network messages serialization using the event.SerializedConn struct.

func NewTcpListener

func NewTcpListener(appAddr net.Addr, lf LoggerFactory) *TcpListener

func (*TcpListener) EventSink

func (l *TcpListener) EventSink() chan Event

func (*TcpListener) Run

func (l *TcpListener) Run()

func (*TcpListener) Stop

func (l *TcpListener) Stop()

type TcpSender added in v0.0.2

type TcpSender struct {
	// contains filtered or unexported fields
}

func NewTcpSender added in v0.0.2

func NewTcpSender(lf LoggerFactory) *TcpSender

func (*TcpSender) Connect added in v0.0.2

func (s *TcpSender) Connect(appAddr string) error

func (*TcpSender) EventSource added in v0.0.2

func (s *TcpSender) EventSource() chan targettedEvent

func (*TcpSender) GetPeers added in v0.0.2

func (s *TcpSender) GetPeers() []net.Addr

func (*TcpSender) RegisterSub added in v0.0.2

func (s *TcpSender) RegisterSub(t EventType, addr net.Addr)

func (*TcpSender) Run added in v0.0.2

func (s *TcpSender) Run()

func (*TcpSender) Stop added in v0.0.2

func (s *TcpSender) Stop()

func (*TcpSender) UnregisterSub added in v0.0.2

func (s *TcpSender) UnregisterSub(t EventType, addr net.Addr)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL