hub

v0.1.10
Published: Feb 21, 2024 License: MIT Imports: 12 Imported by: 0

README

hub

hub is a multi-user WebSocket hub based on nhooyr.io/websocket intended for use in chat-like/collaborative programs that need to support connections from multiple clients, and handle message/connection events in a straightforward manner.

hub provides a serialised view of server activity, exposing this through a simple channel-oriented interface. A basic integration can be achieved with a single loop:

for {
    select {
    case conn := <-server.Connections():
        // handle new connection
    case conn := <-server.Disconnections():
        // handle disconnection
    case msg := <-server.Incoming():
        // handle incoming message
    }
}

Behind the scenes hub manages all per-connection workflow and state, including authentication, active roster maintenance, incoming message parsing, and outgoing message queueing/encoding.
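The loop above can be fleshed out into a small simulation. The following is a minimal sketch that uses only the standard library: `conn`, `message`, `events` and `state` are hypothetical stand-ins for the hub's own types and channel accessors, and the `ctx.Done()` case is an addition for clean shutdown. It illustrates why a serialised view is convenient: all state is owned by the loop goroutine, so no locking is needed.

```go
package main

import (
	"context"
	"fmt"
)

// conn and message are hypothetical stand-ins for *hub.Conn and
// hub.IncomingMessage; only the fields this sketch needs are modelled.
type conn struct{ clientID string }

type message struct {
	from *conn
	text string
}

// events groups three channels that play the role of server.Connections(),
// server.Disconnections() and server.Incoming().
type events struct {
	connections    chan *conn
	disconnections chan *conn
	incoming       chan message
}

// state is touched only by the loop goroutine, so it needs no mutex.
type state struct {
	online  map[string]int // client ID -> number of live connections
	history []string       // received messages, in arrival order
}

// run handles one event at a time until ctx is cancelled.
func run(ctx context.Context, ev events, st *state) {
	for {
		select {
		case c := <-ev.connections:
			st.online[c.clientID]++
		case c := <-ev.disconnections:
			st.online[c.clientID]--
			if st.online[c.clientID] <= 0 {
				delete(st.online, c.clientID)
			}
		case m := <-ev.incoming:
			st.history = append(st.history, m.from.clientID+": "+m.text)
		case <-ctx.Done():
			return
		}
	}
}

// simulate feeds a fixed sequence of events through run and returns the
// final state once the loop has exited.
func simulate() *state {
	ctx, cancel := context.WithCancel(context.Background())
	ev := events{make(chan *conn), make(chan *conn), make(chan message)}
	st := &state{online: map[string]int{}}
	done := make(chan struct{})
	go func() { run(ctx, ev, st); close(done) }()

	alice, bob := &conn{"alice"}, &conn{"bob"}
	ev.connections <- alice
	ev.connections <- bob
	ev.incoming <- message{alice, "hello"}
	ev.disconnections <- bob
	cancel()
	<-done
	return st
}

func main() {
	st := simulate()
	fmt.Println(len(st.online), st.history)
}
```

In a real integration the three channels would come from the hub, and the context would typically be the one passed to New.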

hub features:

  • channel-based notification of connections, disconnections, and received messages
  • flexible message send operations capable of targeting both logical clients and physical connections
  • pluggable authentication
  • pluggable policies for controlling multiple connections from the same client ID
  • net/http compatible interface
  • full context.Context support

Status

Very much alpha. I'm building hub in parallel with a couple of my own projects; robustness will improve over time.

Example

A fully commented chat example can be found at demo/main.go.

To run:

go build -o demo/run github.com/jaz303/hub/demo
./demo/run

You should now be able to use the demo by opening http://localhost:8080 in your browser. To auto-connect, append the username query param (e.g. http://localhost:8080/?username=jason).

TODO

  • Rate limiting for outgoing messages?
  • If a receiver can't keep up, is it worth having a policy option to close it, instead of stalling the program?
  • Callback/notification when outgoing message written to socket (buffered channel)
  • Customisable filtering for outgoing message targets

© 2023-2024 Jason Frame

Documentation

Index

Constants

const (
	EncodeOutgoingMessageFailed = 1 + iota
	WriteOutgoingMessageFailed
	DecodeIncomingMessageFailed
	ReadIncomingMessageFailed
	AcceptPolicyDenied
	ClosedByAcceptPolicy
	HubShuttingDown
	PingFailed
)

Variables

var ErrSkipMessage = errors.New("skip")

Functions

func Binary added in v0.1.10

func Binary(v any) (websocket.MessageType, error)

Binary indicates that every outgoing message is encoded as binary

func FirstWins

func FirstWins[ID comparable, IM any]() func(conn *Conn[ID, IM], roster *Roster[ID, IM]) ([]*Conn[ID, IM], error)

FirstWins creates a policy that permits a single connection per unique client ID. Any additional connections with the same client ID will be immediately disconnected.

func LastWins

func LastWins[ID comparable, IM any]() func(conn *Conn[ID, IM], roster *Roster[ID, IM]) ([]*Conn[ID, IM], error)

LastWins creates a policy that permits a single connection per unique client ID. Each additional connection with the same client ID will cause its predecessor to be disconnected.

func MultipleClientConnectionsAllowed

func MultipleClientConnectionsAllowed[ID comparable, IM any]() func(conn *Conn[ID, IM], roster *Roster[ID, IM]) ([]*Conn[ID, IM], error)

MultipleClientConnectionsAllowed creates a policy that permits multiple simultaneous connections per unique client ID.

func NullLogger

func NullLogger(v ...any)

NullLogger discards its input

func Text added in v0.1.10

func Text(v any) (websocket.MessageType, error)

Text indicates that every outgoing message is encoded as text

func WriteJSON added in v0.1.10

func WriteJSON(dst io.Writer, msg any) error

WriteJSON encodes each outgoing message using a *json.Encoder
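For readers who want a custom encoder, the following sketch shows a function with the same signature as WriteJSON, built on `json.Encoder` from the standard library. The body is an assumption about what such an encoder could look like, not a copy of the package's implementation; `chatMessage` and `encode` are hypothetical helpers.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
)

// chatMessage is a hypothetical outgoing message type.
type chatMessage struct {
	Type string `json:"type"`
	Text string `json:"text"`
}

// writeJSON has the same signature as hub.WriteJSON. Note that
// json.Encoder.Encode appends a trailing newline to each value.
func writeJSON(dst io.Writer, msg any) error {
	return json.NewEncoder(dst).Encode(msg)
}

// encode runs writeJSON against an in-memory buffer so the result can be
// inspected without a socket.
func encode(msg any) (string, error) {
	var buf bytes.Buffer
	err := writeJSON(&buf, msg)
	return buf.String(), err
}

func main() {
	out, err := encode(chatMessage{Type: "chat", Text: "hi"})
	fmt.Printf("%q %v\n", out, err)
}
```

Values that encoding/json cannot represent, such as channels, make the function return an error rather than write partial output.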

Types

type BufferedChannelQueue added in v0.1.10

type BufferedChannelQueue struct {
	// contains filtered or unexported fields
}

func NewBufferedChannelQueue added in v0.1.10

func NewBufferedChannelQueue(bufferSize int) *BufferedChannelQueue

func (*BufferedChannelQueue) Envelopes added in v0.1.10

func (q *BufferedChannelQueue) Envelopes() <-chan Envelope

func (*BufferedChannelQueue) Put added in v0.1.10

func (q *BufferedChannelQueue) Put(env Envelope) bool

type CloseStatus

type CloseStatus struct {
	StatusCode websocket.StatusCode
	Reason     string
}

CloseStatus represents a WebSocket close status code and reason

func DefaultCloseStatus

func DefaultCloseStatus(cause int, err error) CloseStatus

DefaultCloseStatus provides a basic default mapping of cause to close status.

func MakeCloseStatus

func MakeCloseStatus(sc websocket.StatusCode, r string) CloseStatus

MakeCloseStatus is a helper function for creating a CloseStatus

type Config

type Config[ID comparable, IM any] struct {
	// Tag is an arbitrary string used to identify this hub in log messages.
	// Useful if the application creates multiple hubs.
	// If unspecified, defaults to "hub"
	Tag string

	AcceptOptions *websocket.AcceptOptions

	// Set to a non-zero value to enable periodic pings from server -> client
	PingInterval time.Duration

	// Callback that returns a status code and reason based on the given
	// cause of closure.
	GetCloseStatus func(cause int, err error) CloseStatus

	SendQueue     SendQueue
	SendQueueSize int

	// Maximum number of outgoing messages that can be queued for
	// any connection. Currently this must be set; an unbounded
	// queue is not supported. This may change in the future.
	PerConnectionSendBufferSize int

	// Authentication callback; this function should use the provided
	// websocket and HTTP request to authenticate the client, returning
	// the user's credentials on success, and a non-zero close status/reason
	// on failure.
	//
	// The first argument is the server/hub context; the callback should abort
	// if this context is cancelled before completion. In this situation it
	// is not necessary to return a meaningful close status as one will be
	// selected by the hub.
	//
	// If the HTTP request's context is cancelled it is the responsibility of
	// the callback to provide a meaningful close status.
	Authenticate func(context.Context, *websocket.Conn, *http.Request) (ID, any, CloseStatus)

	// Callback to determine if a new, authenticated connection should be
	// accepted for registration. Use this function to implement connection
	// policies for multiple instances of a single client ID, based on
	// inspection of the provided Roster.
	//
	// First return value is a list of pre-existing connections
	// that should be cancelled - this can be used, for example, to terminate
	// any existing connections with a matching client ID.
	//
	// Second return value indicates whether conn should be registered with
	// the server; nil indicates that the connection is accepted, non-nil
	// that it is rejected. The default close status generator uses the
	// error's string representation as the socket close reason.
	//
	// This function is called from the server's main loop so should complete
	// quickly. The provided Roster instance is valid for this invocation only
	// and must not be retained for use elsewhere as it is not threadsafe.
	Accept func(conn *Conn[ID, IM], roster *Roster[ID, IM]) ([]*Conn[ID, IM], error)

	// Decode an incoming message into an instance of IM
	ReadIncomingMessage func(websocket.MessageType, io.Reader) (IM, error)

	// Get the websocket message type for the outgoing message
	OutgoingMessageType func(any) (websocket.MessageType, error)

	// Write an outgoing message
	WriteOutgoingMessage func(io.Writer, any) error

	// Logger - defaults to log.Println(). Use hub.NullLogger to silence output.
	Logger func(...any)
}

Config defines a Hub's configuration.

type Conn

type Conn[ID comparable, IM any] struct {
	// contains filtered or unexported fields
}

Conn represents a connection to the Hub

func (*Conn[ID, IM]) ClientID

func (c *Conn[ID, IM]) ClientID() ID

ClientID returns the client ID associated with this connection. Depending on the Hub's multiple-client policy, multiple connections with the same client ID may be allowed.

func (*Conn[ID, IM]) ClientInfo

func (c *Conn[ID, IM]) ClientInfo() any

ClientInfo returns the client info associated with the connection, as returned by the Authenticate callback. This value is not used by Hub in any way.

func (*Conn[ID, IM]) ConnectionID

func (c *Conn[ID, IM]) ConnectionID() uint64

ConnectionID returns the connection's unique ID

func (*Conn[ID, IM]) Context added in v0.1.2

func (c *Conn[ID, IM]) Context() context.Context

Context returns the context associated with this connection. The context is valid until the connection terminates, at which point it will be cancelled.

func (*Conn[ID, IM]) String

func (c *Conn[ID, IM]) String() string

type Envelope added in v0.1.10

type Envelope struct {
	// contains filtered or unexported fields
}

Envelope represents an outgoing message along with its destination(s)

type Hub

type Hub[ID comparable, IM any] struct {
	// contains filtered or unexported fields
}

func New

func New[ID comparable, IM any](ctx context.Context, cfg *Config[ID, IM]) *Hub[ID, IM]

New creates a new Hub with the given config, and bound to the given context.

func (*Hub[ID, IM]) Broadcast added in v0.1.10

func (s *Hub[ID, IM]) Broadcast(msg any) bool

Broadcast enqueues a message to be sent to all connections. Returns true on success, false if the message could not be enqueued.

func (*Hub[ID, IM]) Connections

func (s *Hub[ID, IM]) Connections() <-chan *Conn[ID, IM]

Connections returns a channel that will receive each new Hub connection

func (*Hub[ID, IM]) Disconnections

func (s *Hub[ID, IM]) Disconnections() <-chan *Conn[ID, IM]

Disconnections returns a channel that will receive each Hub disconnection

func (*Hub[ID, IM]) HandleConnection

func (s *Hub[ID, IM]) HandleConnection(w http.ResponseWriter, r *http.Request)

HandleConnection is an http.HandlerFunc; integrations should route their WebSocket endpoint to this method.

func (*Hub[ID, IM]) Incoming

func (s *Hub[ID, IM]) Incoming() <-chan IncomingMessage[ID, IM]

Incoming returns a channel that will receive messages received from the Hub's connections

func (*Hub[ID, IM]) SendToClient

func (s *Hub[ID, IM]) SendToClient(clientID ID, msg any) bool

SendToClient enqueues a message for sending to a single client. If there are multiple active connections for the given client ID, the message will be relayed to each of them. Returns true on success, false if the message could not be enqueued.

func (*Hub[ID, IM]) SendToClients

func (s *Hub[ID, IM]) SendToClients(clientIDs []ID, msg any) bool

SendToClients enqueues a message for sending to multiple clients. If there are multiple active connections for any given client ID, the message will be relayed to each of them. Returns true on success, false if the message could not be enqueued.

func (*Hub[ID, IM]) SendToConnection

func (s *Hub[ID, IM]) SendToConnection(conn *Conn[ID, IM], msg any) bool

SendToConnection enqueues a message for sending to a single connection. Returns true on success, false if the message could not be enqueued.

func (*Hub[ID, IM]) SendToConnections

func (s *Hub[ID, IM]) SendToConnections(conns []*Conn[ID, IM], msg any) bool

SendToConnections enqueues a message for sending to multiple connections. Returns true on success, false if the message could not be enqueued.

func (*Hub[ID, IM]) Start

func (s *Hub[ID, IM]) Start()

Start starts the Hub in the background; the Hub will continue running until the context that was passed to New is cancelled.

Once started, programs should read continuously from the Connections(), Disconnections(), and Incoming() channels in order to prevent stalling. Additionally, Start should be called before starting the HTTP server that delegates connections to the Hub.

type IncomingMessage

type IncomingMessage[ID comparable, IM any] struct {
	ReceivedAt time.Time     // Time message was received
	Conn       *Conn[ID, IM] // Connection from which message was received
	Msg        IM            // Decoded message
}

IncomingMessage represents a message received from a Hub connection.

type Roster

type Roster[ID comparable, IM any] struct {
	// contains filtered or unexported fields
}

Roster is an index of all active connections to a Hub. Rosters are not threadsafe and must only be used by integrating programs in the callbacks to which they are provided.

func NewRoster

func NewRoster[ID comparable, IM any]() *Roster[ID, IM]

NewRoster creates a new Roster.

func (*Roster[ID, IM]) Add

func (r *Roster[ID, IM]) Add(conn *Conn[ID, IM])

Add adds a connection to the Roster

func (*Roster[ID, IM]) ClientConnections

func (r *Roster[ID, IM]) ClientConnections(id ID) []*Conn[ID, IM]

ClientConnections returns a slice containing all connections associated with a given client ID

func (*Roster[ID, IM]) Remove

func (r *Roster[ID, IM]) Remove(conn *Conn[ID, IM])

Remove removes a connection from the Roster

type RunnableSendQueue added in v0.1.10

type RunnableSendQueue interface {
	SendQueue
	Run(ctx context.Context)
}

type SendQueue added in v0.1.10

type SendQueue interface {
	Envelopes() <-chan Envelope
	Put(env Envelope) bool
}
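
A custom queue only needs to satisfy the two methods above. The following is a minimal sketch of one possible implementation backed by a buffered channel, where Put refuses new envelopes when the buffer is full instead of blocking. `Envelope` is a local stand-in (the real type has unexported fields), `dropWhenFullQueue` is a hypothetical name, and whether BufferedChannelQueue behaves the same way is not stated in the documentation.

```go
package main

import "fmt"

// Envelope is a local stand-in for hub.Envelope.
type Envelope struct{ payload string }

// SendQueue mirrors the interface documented above.
type SendQueue interface {
	Envelopes() <-chan Envelope
	Put(env Envelope) bool
}

// dropWhenFullQueue (hypothetical) is a fixed-capacity queue whose Put never
// blocks: it reports false when there is no room.
type dropWhenFullQueue struct {
	ch chan Envelope
}

func newDropWhenFullQueue(size int) *dropWhenFullQueue {
	return &dropWhenFullQueue{ch: make(chan Envelope, size)}
}

func (q *dropWhenFullQueue) Envelopes() <-chan Envelope { return q.ch }

func (q *dropWhenFullQueue) Put(env Envelope) bool {
	select {
	case q.ch <- env:
		return true
	default:
		return false // buffer full; the caller decides how to react
	}
}

// Compile-time check that the sketch satisfies the interface.
var _ SendQueue = (*dropWhenFullQueue)(nil)

func main() {
	q := newDropWhenFullQueue(1)
	fmt.Println(q.Put(Envelope{"a"}), q.Put(Envelope{"b"}))
	fmt.Println((<-q.Envelopes()).payload)
}
```

A false return from Put corresponds to the "could not be enqueued" result reported by the Hub's send methods.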
