Documentation ¶
Index ¶
- Constants
- func Deserialize[T any](data Event, target *T, ser *Serializer) error
- func Serialize[T any](data *T, target *Event, ser *Serializer) error
- type AppHandlers
- type CtrlMsg
- type CtrlMsgType
- type DefaultLoggerFactory
- type Event
- type EventHandler
- type EventType
- type IdCtrlMsg
- type Listener
- type LoggerFactory
- type LoggerImpl
- type PeerId
- type Publisher
- type Sender
- type SerializedConn
- type Serializer
- type TcpListener
- type TcpSender
- func (s *TcpSender) Connect(appAddr string) error
- func (s *TcpSender) EventSource() chan targettedEvent
- func (s *TcpSender) GetPeers() []net.Addr
- func (s *TcpSender) RegisterSub(t EventType, addr net.Addr)
- func (s *TcpSender) Run()
- func (s *TcpSender) Stop()
- func (s *TcpSender) UnregisterSub(t EventType, addr net.Addr)
Constants ¶
const ( CtrlSubscribeResp CtrlMsgType = CtrlSubscribe + CtrlRespOffset CtrlUnsubscribeResp = CtrlUnsubscribe + CtrlRespOffset CtrlConnectToResp = CtrlConnectTo + CtrlRespOffset CtrlRegisterResp = CtrlRegister + CtrlRespOffset )
Variables ¶
This section is empty.
Functions ¶
func Deserialize ¶
func Deserialize[T any](data Event, target *T, ser *Serializer) error
Deserialize extracts the appData from data and deserializes it into target using the provided serializer
Types ¶
type AppHandlers ¶
type AppHandlers struct { EventSource chan Event EventSink chan targettedEvent // contains filtered or unexported fields }
AppHandlers represent the algorithm in a form of event handlers. You can use the handlers separately and connect them to event source manually or use the cluster.Node type that automates the communication between cluster nodes.
Use the AppHandlers.Register method to register event handlers.
func NewAppHandlers ¶ added in v0.0.2
func NewAppHandlers(lf LoggerFactory, concurrent bool) *AppHandlers
NewAppHandlers returns empty Handlers object with the specified logger. The concurrent parameter specifies if the event handlers should be executed one by one or concurrently just after the event is received.
func (*AppHandlers) Activate ¶
func (hs *AppHandlers) Activate() error
func (*AppHandlers) Plug ¶
func (h *AppHandlers) Plug(l Listener, s Sender)
Plug connects the handlers to events passed from the Listener and passes the result of event handler invocations to the Sender
func (*AppHandlers) Register ¶
func (hs *AppHandlers) Register(t EventType, h EventHandler)
func (*AppHandlers) Unregister ¶ added in v0.0.2
func (hs *AppHandlers) Unregister(t EventType)
type CtrlMsg ¶
type CtrlMsg struct { // SerialNumber specifies an id unique for the message // sender. Together with the sender address it creates // an unique identifier for the message. SerialNumber uint64 // Type specifies the type of the message and should be // one of Ctrl... constants defined in this file. Type CtrlMsgType // AppPort is the application protocol port of the CtrlMsg sender. // It is used to calculate the peer unique identifier when required. AppPort uint16 // Addr represents a node in a form of the address and port. // This field is used in CtrlConnectTo message to indicate where // the receiver node should connect to. Addr netip.AddrPort // EventType indicates what event the message refers to. // It is used in CtrlSubscribe and CtrlUnsubscribe messages. EventType EventType }
Represents a service protocol message
type CtrlMsgType ¶
type CtrlMsgType = int
CtrlMsgType indicates how the message should be interpreted by the node. The CtrlMsgType defines the command and the CtrlMsg body holds the arguments for the command.
const ( CtrlSubscribe CtrlMsgType = iota CtrlUnsubscribe CtrlConnectTo CtrlRegister )
The set of available control message types.
const CtrlRespOffset CtrlMsgType = 10000
type DefaultLoggerFactory ¶
type DefaultLoggerFactory struct { }
func NewDefaultLoggerFactory ¶
func NewDefaultLoggerFactory() *DefaultLoggerFactory
func (*DefaultLoggerFactory) NewLogger ¶
func (lf *DefaultLoggerFactory) NewLogger() LoggerImpl
type EventHandler ¶
EventHandler represents a function that is launched as a response to the occouring events. EventHandler receives an Event to handle and a Publisher. The Publisher interface exposes functions that allow the user to publish events as part of event handling.
EventHandlers are registered in Handlers using the Handlers.Register() method.
type Listener ¶
type Listener interface { // Returns a channel that the events from // other nodes are passed to EventSink() chan Event // Runs the network listener and starts // passing events to the EventSink() Run() // Stops the network listener Stop() }
Listener contains a network listener that accepts events from other nodes and passes them to a chanel
type LoggerFactory ¶
type LoggerFactory interface {
NewLogger() LoggerImpl
}
type LoggerImpl ¶
type LoggerImpl = *zap.SugaredLogger
type PeerId ¶
type PeerId = uint32
func Hash ¶
func Hash(data encoding.BinaryMarshaler) PeerId
func HashAddrStr ¶
type Publisher ¶ added in v0.0.2
type Publisher interface { // ToAll publishes the event to the network. It guarantees // that the order in which the events are published is the // preserved upon delivery. ToAll(e Event) // ToSelf publishes the event only to the node that // executes the event where the Publisher is used. ToSelf(e Event) // BackgroundTask asynchronously executes the task // and publishes the notification event to self when // the task finishes. The task can be any function that // accepts no parameters and returns no value. // // The task is ran outside the event handlers path of // execution and is fully asynchronous. If the task needs // to access any variable that the event handler accesses, // this action must be protected with a mutual exclusion // mechanism. BackgroundTask(finishNotification Event, task func()) }
Publisher allows the user to publish events inside event handlers. The events are published after the handler exits
type Sender ¶
type Sender interface { // EventSource returns the channel from which the sender // picks up events to publish EventSource() chan targettedEvent // Run activates a background worker that receives events // from the channel accesed by Sender.EventSource() // and sends them to the subscribed nodes. Run() // Stop halts the background worker and closes all // the established connections Stop() // Connect establishes a connection to other event system node // (specifically to the other node's Listener) Connect(addr string) error // RegisterSub registers a subscription for the // specified address. The cluster.Node structure uses the // register/unregister functionality to optimise the // number of messages passed through the network. RegisterSub(t EventType, addr net.Addr) // UnregisterSub removes the subscription on the // specified event. After unregistering a subscription // the node at addr will no longer get notified about // the event with type t. UnregisterSub(t EventType, addr net.Addr) GetPeers() []net.Addr }
Sender is an interface for the event publishing element of the event system node. It broadcasts the events to other nodes in the cluster.
type SerializedConn ¶
type SerializedConn struct { io.ReadWriteCloser // contains filtered or unexported fields }
func NewSerializedConn ¶
func NewSerializedConn(conn io.ReadWriteCloser) *SerializedConn
func (*SerializedConn) Read ¶
func (c *SerializedConn) Read(e any) error
func (*SerializedConn) Write ¶
func (c *SerializedConn) Write(e any) error
type Serializer ¶
type Serializer struct {
// contains filtered or unexported fields
}
Serializer allows to serialize structs into []byte It should be used to convert user defined structs into Event payloads and deserialize app data from received events. *Serializer is not thread safe
func NewSerializer ¶
func NewSerializer() *Serializer
func (*Serializer) Register ¶ added in v0.0.3
func (s *Serializer) Register(values []any)
type TcpListener ¶
type TcpListener struct {
// contains filtered or unexported fields
}
TcpListener is an implementation of the event.Listener interface based on the TCP protocol. It includes network messages serialization using the event.SerializedConn struct.
func NewTcpListener ¶
func NewTcpListener(appAddr net.Addr, lf LoggerFactory) *TcpListener
func (*TcpListener) EventSink ¶
func (l *TcpListener) EventSink() chan Event
func (*TcpListener) Run ¶
func (l *TcpListener) Run()
func (*TcpListener) Stop ¶
func (l *TcpListener) Stop()
type TcpSender ¶ added in v0.0.2
type TcpSender struct {
// contains filtered or unexported fields
}
func NewTcpSender ¶ added in v0.0.2
func NewTcpSender(lf LoggerFactory) *TcpSender
func (*TcpSender) EventSource ¶ added in v0.0.2
func (s *TcpSender) EventSource() chan targettedEvent