node

package
v1.5.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 17, 2024 License: MIT Imports: 18 Imported by: 1

Documentation

Index

Constants

View Source
const (
	DISCONNECT_MODE_ALWAYS = "always"
	DISCONNECT_MODE_AUTO   = "auto"
	DISCONNECT_MODE_NEVER  = "never"
)

Variables

Functions

This section is empty.

Types

type AppNode added in v1.0.1

type AppNode interface {
	HandlePubSub(msg []byte)
	LookupSession(id string) *Session
	Authenticate(s *Session, opts ...AuthOption) (*common.ConnectResult, error)
	Authenticated(s *Session, identifiers string)
	Subscribe(s *Session, msg *common.Message) (*common.CommandResult, error)
	Unsubscribe(s *Session, msg *common.Message) (*common.CommandResult, error)
	Perform(s *Session, msg *common.Message) (*common.CommandResult, error)
	Disconnect(s *Session) error
}

AppNode describes a basic node interface

type AuthOption added in v1.4.0

type AuthOption = func(*AuthOptions)

func WithDisconnectOnFailure added in v1.4.0

func WithDisconnectOnFailure(disconnect bool) AuthOption

type AuthOptions added in v1.4.0

type AuthOptions struct {
	DisconnectOnFailure bool
}

type Config added in v1.0.5

type Config struct {
	// Define when to invoke Disconnect callback
	DisconnectMode string
	// The number of goroutines to use for disconnect calls on shutdown
	ShutdownDisconnectPoolSize int
	// How often server should send Action Cable ping messages (seconds)
	PingInterval int
	// How often to refresh node stats (seconds)
	StatsRefreshInterval int
	// The max size of the Go routines pool for hub
	HubGopoolSize int
	// How should ping message timestamp be formatted? ('s' => seconds, 'ms' => milliseconds, 'ns' => nanoseconds)
	PingTimestampPrecision string
	// For how long to wait for pong message before disconnecting (seconds)
	PongTimeout int
	// For how long to wait for disconnect callbacks to be processed before exiting (seconds)
	ShutdownTimeout int
}

Config contains general application/node settings

func NewConfig added in v1.0.5

func NewConfig() Config

NewConfig builds a new config

type Connection added in v1.1.0

type Connection interface {
	Write(msg []byte, deadline time.Time) error
	WriteBinary(msg []byte, deadline time.Time) error
	Read() ([]byte, error)
	Close(code int, reason string)
}

Connection represents underlying connection

type Controller

type Controller interface {
	Start() error
	Shutdown() error
	Authenticate(sid string, env *common.SessionEnv) (*common.ConnectResult, error)
	Subscribe(sid string, env *common.SessionEnv, ids string, channel string) (*common.CommandResult, error)
	Unsubscribe(sid string, env *common.SessionEnv, ids string, channel string) (*common.CommandResult, error)
	Perform(sid string, env *common.SessionEnv, ids string, channel string, data string) (*common.CommandResult, error)
	Disconnect(sid string, env *common.SessionEnv, ids string, subscriptions []string) error
}

Controller is an interface describing business-logic handler (e.g. RPC)

type DisconnectQueue

type DisconnectQueue struct {
	// contains filtered or unexported fields
}

DisconnectQueue is a rate-limited executor

func NewDisconnectQueue

func NewDisconnectQueue(node *Node, config *DisconnectQueueConfig, l *slog.Logger) *DisconnectQueue

NewDisconnectQueue builds new queue with a specified rate (max calls per second)

func (*DisconnectQueue) Enqueue

func (d *DisconnectQueue) Enqueue(s *Session) error

Enqueue adds session to the disconnect queue

func (*DisconnectQueue) Run

func (d *DisconnectQueue) Run() error

Run starts queue

func (*DisconnectQueue) Shutdown

func (d *DisconnectQueue) Shutdown(ctx context.Context) error

Shutdown stops throttling and makes requests one by one

func (*DisconnectQueue) Size

func (d *DisconnectQueue) Size() int

Size returns the number of enqueued tasks

type DisconnectQueueConfig added in v1.0.1

type DisconnectQueueConfig struct {
	// Limit the number of Disconnect RPC calls per second
	Rate int
	// The size of the channel's buffer for disconnect requests
	Backlog int
	// How much time to wait to perform all enqueued calls at exit (in seconds) [DEPRECATED]
	ShutdownTimeout int
}

DisconnectQueueConfig contains DisconnectQueue configuration

func NewDisconnectQueueConfig added in v1.0.1

func NewDisconnectQueueConfig() DisconnectQueueConfig

NewDisconnectQueueConfig builds a new config

type Disconnector added in v1.0.1

type Disconnector interface {
	Run() error
	Shutdown(ctx context.Context) error
	Enqueue(*Session) error
	Size() int
}

Disconnector is an interface for disconnect queue implementation

type Executor added in v1.1.0

type Executor interface {
	HandleCommand(*Session, *common.Message) error
	Disconnect(*Session) error
}

Executor handles incoming commands (messages)

type InlineDisconnector added in v1.5.0

type InlineDisconnector struct {
	// contains filtered or unexported fields
}

InlineDisconnector performs Disconnect calls synchronously

func NewInlineDisconnector added in v1.5.0

func NewInlineDisconnector(n *Node) *InlineDisconnector

NewInlineDisconnector returns new InlineDisconnector

func (*InlineDisconnector) Enqueue added in v1.5.0

func (d *InlineDisconnector) Enqueue(s *Session) error

Enqueue disconnects session immediately

func (*InlineDisconnector) Run added in v1.5.0

func (d *InlineDisconnector) Run() error

Run does nothing

func (*InlineDisconnector) Shutdown added in v1.5.0

func (d *InlineDisconnector) Shutdown(ctx context.Context) error

Shutdown does nothing

func (*InlineDisconnector) Size added in v1.5.0

func (d *InlineDisconnector) Size() int

Size returns 0

type Node

type Node struct {
	// contains filtered or unexported fields
}

Node represents the whole application

func NewNode

func NewNode(config *Config, opts ...NodeOption) *Node

NewNode builds new node struct

func (*Node) Authenticate

func (n *Node) Authenticate(s *Session, options ...AuthOption) (*common.ConnectResult, error)

Authenticate calls controller to perform authentication. If authentication is successful, session is registered with a hub.

func (*Node) Authenticated added in v1.4.0

func (n *Node) Authenticated(s *Session, ids string)

Authenticated marks session as authenticated and registers it with a hub. Useful when you perform authentication manually, not using a controller.

func (*Node) Broadcast

func (n *Node) Broadcast(msg *common.StreamMessage)

Broadcast message to stream (locally)

func (*Node) Disconnect

func (n *Node) Disconnect(s *Session) error

Disconnect adds session to disconnector queue and unregisters session from hub

func (*Node) DisconnectNow

func (n *Node) DisconnectNow(s *Session) error

DisconnectNow executes disconnect on controller

func (*Node) ExecuteRemoteCommand added in v1.4.0

func (n *Node) ExecuteRemoteCommand(msg *common.RemoteCommandMessage)

ExecuteRemoteCommand executes remote command (locally)

func (*Node) HandleBroadcast added in v1.4.0

func (n *Node) HandleBroadcast(raw []byte)

HandleBroadcast parses incoming broadcast message, records it and re-transmits it to other nodes

func (*Node) HandleCommand

func (n *Node) HandleCommand(s *Session, msg *common.Message) (err error)

HandleCommand parses incoming message from client and executes the command (if recognized)

func (*Node) HandlePubSub added in v1.0.1

func (n *Node) HandlePubSub(raw []byte)

HandlePubSub parses incoming pubsub message and broadcasts it to all clients (w/o using a broker)

func (*Node) History added in v1.4.0

func (n *Node) History(s *Session, msg *common.Message) error

History fetches the stream history for the specified identifier

func (*Node) ID added in v1.5.0

func (n *Node) ID() string

ID returns node identifier

func (*Node) Instrumenter added in v1.4.0

func (n *Node) Instrumenter() metrics.Instrumenter

Instrumenter returns the current instrumenter for the node

func (*Node) IsShuttingDown added in v1.4.2

func (n *Node) IsShuttingDown() bool

func (*Node) LookupSession added in v1.1.4

func (n *Node) LookupSession(id string) *Session

func (*Node) Perform

func (n *Node) Perform(s *Session, msg *common.Message) (*common.CommandResult, error)

Perform executes client command

func (*Node) RemoteDisconnect added in v1.0.1

func (n *Node) RemoteDisconnect(msg *common.RemoteDisconnectMessage)

RemoteDisconnect finds a session by identifier and closes it

func (*Node) SetBroker added in v1.4.0

func (n *Node) SetBroker(b broker.Broker)

func (*Node) SetDisconnector added in v1.0.1

func (n *Node) SetDisconnector(d Disconnector)

SetDisconnector sets disconnector for the node

func (*Node) Shutdown

func (n *Node) Shutdown(ctx context.Context) (err error)

Shutdown stops all services (hub, controller)

func (*Node) Size added in v1.4.4

func (n *Node) Size() int

func (*Node) Start added in v1.0.1

func (n *Node) Start() error

Start runs all the required goroutines

func (*Node) Subscribe

func (n *Node) Subscribe(s *Session, msg *common.Message) (*common.CommandResult, error)

Subscribe subscribes session to a channel

func (*Node) TryRestoreSession added in v1.4.0

func (n *Node) TryRestoreSession(s *Session) (restored bool)

func (*Node) Unsubscribe

func (n *Node) Unsubscribe(s *Session, msg *common.Message) (*common.CommandResult, error)

Unsubscribe unsubscribes session from a channel

func (*Node) Whisper added in v1.5.0

func (n *Node) Whisper(s *Session, msg *common.Message) error

Whisper broadcasts the message to the specified whispering stream to all clients except the sender

type NodeOption added in v1.5.0

type NodeOption = func(*Node)

func WithController added in v1.5.0

func WithController(c Controller) NodeOption

func WithID added in v1.5.0

func WithID(id string) NodeOption

func WithInstrumenter added in v1.5.0

func WithInstrumenter(i metrics.Instrumenter) NodeOption

func WithLogger added in v1.5.0

func WithLogger(l *slog.Logger) NodeOption

type NoopDisconnectQueue added in v1.0.1

type NoopDisconnectQueue struct{}

NoopDisconnectQueue is non-operational disconnect queue implementation

func NewNoopDisconnector added in v1.0.1

func NewNoopDisconnector() *NoopDisconnectQueue

NewNoopDisconnector returns new NoopDisconnectQueue

func (*NoopDisconnectQueue) Enqueue added in v1.0.1

func (d *NoopDisconnectQueue) Enqueue(s *Session) error

Enqueue does nothing

func (*NoopDisconnectQueue) Run added in v1.0.1

func (d *NoopDisconnectQueue) Run() error

Run does nothing

func (*NoopDisconnectQueue) Shutdown added in v1.0.1

func (d *NoopDisconnectQueue) Shutdown(ctx context.Context) error

Shutdown does nothing

func (*NoopDisconnectQueue) Size added in v1.0.1

func (d *NoopDisconnectQueue) Size() int

Size returns 0

type NullController added in v1.5.0

type NullController struct {
	// contains filtered or unexported fields
}

func NewNullController added in v1.5.0

func NewNullController(l *slog.Logger) *NullController

func (*NullController) Authenticate added in v1.5.0

func (c *NullController) Authenticate(sid string, env *common.SessionEnv) (*common.ConnectResult, error)

func (*NullController) Disconnect added in v1.5.0

func (c *NullController) Disconnect(sid string, env *common.SessionEnv, ids string, subscriptions []string) error

func (*NullController) Perform added in v1.5.0

func (c *NullController) Perform(sid string, env *common.SessionEnv, ids string, channel string, data string) (*common.CommandResult, error)

func (*NullController) Shutdown added in v1.5.0

func (c *NullController) Shutdown() (err error)

func (*NullController) Start added in v1.5.0

func (c *NullController) Start() (err error)

func (*NullController) Subscribe added in v1.5.0

func (c *NullController) Subscribe(sid string, env *common.SessionEnv, ids string, channel string) (*common.CommandResult, error)

func (*NullController) Unsubscribe added in v1.5.0

func (c *NullController) Unsubscribe(sid string, env *common.SessionEnv, ids string, channel string) (*common.CommandResult, error)

type Session

type Session struct {
	Connected bool
	// Could be used to store arbitrary data within a session
	InternalState map[string]interface{}
	Log           *slog.Logger
	// contains filtered or unexported fields
}

Session represents active client

func NewSession

func NewSession(node *Node, conn Connection, url string, headers *map[string]string, uid string, opts ...SessionOption) *Session

NewSession builds a new Session struct from ws connection and http request

func (*Session) AuthenticateOnConnect added in v1.4.3

func (s *Session) AuthenticateOnConnect() bool

func (*Session) Disconnect

func (s *Session) Disconnect(reason string, code int)

Disconnect schedules connection disconnect

func (*Session) DisconnectNow added in v1.4.4

func (s *Session) DisconnectNow(reason string, code int)

func (*Session) DisconnectWithMessage added in v1.2.3

func (s *Session) DisconnectWithMessage(msg encoders.EncodedMessage, code string)

func (*Session) GetEnv added in v1.1.2

func (s *Session) GetEnv() *common.SessionEnv

func (*Session) GetID added in v1.2.3

func (s *Session) GetID() string

func (*Session) GetIdentifiers added in v1.2.3

func (s *Session) GetIdentifiers() string

func (*Session) IsClosed added in v1.4.4

func (s *Session) IsClosed() bool

func (*Session) IsConnected added in v1.4.0

func (s *Session) IsConnected() bool

func (*Session) IsDisconnectable added in v1.4.0

func (s *Session) IsDisconnectable() bool

func (*Session) IsResumeable added in v1.4.3

func (s *Session) IsResumeable() bool

func (*Session) MarkDisconnectable added in v1.4.0

func (s *Session) MarkDisconnectable(val bool)

func (*Session) MergeEnv added in v1.1.4

func (s *Session) MergeEnv(env *common.SessionEnv)

MergeEnv merges connection and channel states into current env. This method locks the state for writing (so, goroutine-safe)

func (*Session) PrevSid added in v1.4.0

func (s *Session) PrevSid() string

func (*Session) ReadInternalState added in v1.4.0

func (s *Session) ReadInternalState(key string) (interface{}, bool)

ReadInternalState reads internal state value by key

func (*Session) ReadMessage added in v1.1.0

func (s *Session) ReadMessage(message []byte) error

ReadMessage reads messages from ws connection and sends them to node

func (*Session) RestoreFromCache added in v1.4.0

func (s *Session) RestoreFromCache(cached []byte) error

func (*Session) Send

func (s *Session) Send(msg encoders.EncodedMessage)

Send schedules a data transmission

func (*Session) SendJSONTransmission added in v1.1.0

func (s *Session) SendJSONTransmission(msg string)

SendJSONTransmission is used to propagate the direct transmission to the client (from RPC call result)

func (*Session) SendMessages

func (s *Session) SendMessages()

SendMessages waits for incoming messages and sends them to the client connection

func (*Session) Serve added in v1.1.0

func (s *Session) Serve(callback func()) error

Serve enters a loop to read incoming data

func (*Session) SetEnv added in v1.1.2

func (s *Session) SetEnv(env *common.SessionEnv)

func (*Session) SetID added in v1.3.0

func (s *Session) SetID(id string)

func (*Session) SetIdentifiers added in v1.2.3

func (s *Session) SetIdentifiers(ids string)

func (*Session) String added in v1.4.5

func (s *Session) String() string

String returns session string representation (for %v in Printf-like functions)

func (*Session) ToCacheEntry added in v1.4.0

func (s *Session) ToCacheEntry() ([]byte, error)

func (*Session) UnderlyingConn added in v1.4.0

func (s *Session) UnderlyingConn() Connection

func (*Session) WriteInternalState added in v1.4.0

func (s *Session) WriteInternalState(key string, val interface{})

WriteInternalState writes internal state value by key

type SessionOption added in v1.4.0

type SessionOption = func(*Session)

func WithEncoder added in v1.4.3

func WithEncoder(enc encoders.Encoder) SessionOption

WithEncoder allows setting a custom encoder for a session

func WithExecutor added in v1.4.3

func WithExecutor(ex Executor) SessionOption

WithExecutor allows setting a custom executor for a session

func WithHandshakeMessageDeadline added in v1.4.3

func WithHandshakeMessageDeadline(deadline time.Time) SessionOption

WithHandshakeMessageDeadline allows setting a custom deadline for handshake messages. This option also indicates that we MUST NOT perform Authenticate on connect.

func WithMetrics added in v1.4.3

func WithMetrics(m metrics.Instrumenter) SessionOption

WithMetrics allows setting a custom metrics instrumenter for a session

func WithPingInterval added in v1.4.0

func WithPingInterval(interval time.Duration) SessionOption

WithPingInterval allows setting a custom ping interval for a session or disabling pings entirely (by passing 0)

func WithPingPrecision added in v1.4.3

func WithPingPrecision(val string) SessionOption

WithPingPrecision allows configuring precision for timestamps attached to pings

func WithPongTimeout added in v1.4.3

func WithPongTimeout(timeout time.Duration) SessionOption

WithPongTimeout allows setting a custom pong timeout for a session

func WithPrevSID added in v1.4.3

func WithPrevSID(sid string) SessionOption

WithPrevSID allows providing the previous session ID to restore from

func WithResumable added in v1.4.3

func WithResumable(val bool) SessionOption

WithResumable allows marking session as resumable (so we store its state in cache)

type SubscriptionState added in v1.2.3

type SubscriptionState struct {
	// contains filtered or unexported fields
}

func NewSubscriptionState added in v1.2.3

func NewSubscriptionState() *SubscriptionState

func (*SubscriptionState) AddChannel added in v1.2.3

func (st *SubscriptionState) AddChannel(id string)

func (*SubscriptionState) AddChannelStream added in v1.2.3

func (st *SubscriptionState) AddChannelStream(id string, stream string)

func (*SubscriptionState) Channels added in v1.2.3

func (st *SubscriptionState) Channels() []string

func (*SubscriptionState) HasChannel added in v1.2.3

func (st *SubscriptionState) HasChannel(id string) bool

func (*SubscriptionState) RemoveChannel added in v1.2.3

func (st *SubscriptionState) RemoveChannel(id string)

func (*SubscriptionState) RemoveChannelStream added in v1.2.3

func (st *SubscriptionState) RemoveChannelStream(id string, stream string)

func (*SubscriptionState) RemoveChannelStreams added in v1.2.3

func (st *SubscriptionState) RemoveChannelStreams(id string) []string

func (*SubscriptionState) StreamsFor added in v1.2.3

func (st *SubscriptionState) StreamsFor(id string) []string

func (*SubscriptionState) ToMap added in v1.2.3

func (st *SubscriptionState) ToMap() map[string][]string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL