routing

package
v0.0.0-...-6ec608e Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 27, 2024 License: Apache-2.0 Imports: 22 Imported by: 0

Documentation

Index

Constants

View Source
const (

	// NodesKey is the key of the hash mapping node_id => Node proto.
	// NOTE(review): presumably a Redis hash used by RedisRouter — confirm.
	NodesKey = "nodes"

	// NodeRoomKey is the key of the hash mapping room_name => node_id.
	NodeRoomKey = "room_node_map"
)
View Source
// DefaultMessageChannelSize is the default message channel size.
// NOTE(review): presumably the size NewDefaultMessageChannel passes to
// NewMessageChannel — confirm against the implementation.
const DefaultMessageChannelSize = 200

Variables

View Source
// Sentinel errors of the routing package. Each is created with errors.New,
// so callers can match them with errors.Is.
var (
	// general routing errors
	ErrNotFound             = errors.New("could not find object")
	ErrIPNotSet             = errors.New("ip address is required and not set")
	ErrHandlerNotDefined    = errors.New("handler not defined")
	ErrIncorrectRTCNode     = errors.New("current node isn't the RTC node for the room")
	ErrNodeNotFound         = errors.New("could not locate the node")
	ErrNodeLimitReached     = errors.New("reached configured limit for node")
	ErrInvalidRouterMessage = errors.New("invalid router message")
	ErrChannelClosed        = errors.New("channel closed")
	ErrChannelFull          = errors.New("channel is full")

	// errors when starting signal connection
	ErrRequestChannelClosed       = errors.New("request channel closed")
	ErrCouldNotMigrateParticipant = errors.New("could not migrate participant")
	ErrClientInfoNotSet           = errors.New("client info not set")
)
View Source
// ErrSignalMessageDropped is the sentinel error reporting that a signal
// message was dropped. The code that returns it is not visible here.
var ErrSignalMessageDropped = errors.New("signal message dropped")
View Source
// ErrSignalWriteFailed is the sentinel error reporting that a signal write
// failed. The code that returns it is not visible here.
var ErrSignalWriteFailed = errors.New("signal write failed")

Functions

func CopySignalStreamToMessageChannel

func CopySignalStreamToMessageChannel[SendType, RecvType RelaySignalMessage](
	stream psrpc.Stream[SendType, RecvType],
	ch *MessageChannel,
	reader SignalMessageReader[RecvType],
	config config.SignalRelayConfig,
) error

Types

type LocalNode

// LocalNode is a named type whose underlying type is *livekit.Node.
// NewLocalNode returns one, and NewLocalRouter and CreateRouter accept one.
// NOTE(review): presumably it denotes the node the current process runs on —
// confirm.
type LocalNode *livekit.Node

func NewLocalNode

func NewLocalNode(conf *config.Config) (LocalNode, error)

type LocalRouter

// LocalRouter has no exported fields; construct it with NewLocalRouter.
type LocalRouter struct {
	// contains filtered or unexported fields
}

LocalRouter is a router of messages on the same node; it is a basic implementation for local testing.

func NewLocalRouter

func NewLocalRouter(currentNode LocalNode, signalClient SignalClient) *LocalRouter

func (*LocalRouter) ClearRoomState

func (r *LocalRouter) ClearRoomState(_ context.Context, _ livekit.RoomName) error

func (*LocalRouter) Drain

func (r *LocalRouter) Drain()

func (*LocalRouter) GetNode

func (r *LocalRouter) GetNode(nodeID livekit.NodeID) (*livekit.Node, error)

func (*LocalRouter) GetNodeForRoom

func (r *LocalRouter) GetNodeForRoom(_ context.Context, _ livekit.RoomName) (*livekit.Node, error)

func (*LocalRouter) GetRegion

func (r *LocalRouter) GetRegion() string

func (*LocalRouter) ListNodes

func (r *LocalRouter) ListNodes() ([]*livekit.Node, error)

func (*LocalRouter) RegisterNode

func (r *LocalRouter) RegisterNode() error

func (*LocalRouter) RemoveDeadNodes

func (r *LocalRouter) RemoveDeadNodes() error

func (*LocalRouter) SetNodeForRoom

func (r *LocalRouter) SetNodeForRoom(_ context.Context, _ livekit.RoomName, _ livekit.NodeID) error

func (*LocalRouter) Start

func (r *LocalRouter) Start() error

func (*LocalRouter) StartParticipantSignal

func (r *LocalRouter) StartParticipantSignal(ctx context.Context, roomName livekit.RoomName, pi ParticipantInit) (res StartParticipantSignalResults, err error)

func (*LocalRouter) StartParticipantSignalWithNodeID

func (r *LocalRouter) StartParticipantSignalWithNodeID(ctx context.Context, roomName livekit.RoomName, pi ParticipantInit, nodeID livekit.NodeID) (res StartParticipantSignalResults, err error)

func (*LocalRouter) Stop

func (r *LocalRouter) Stop()

func (*LocalRouter) UnregisterNode

func (r *LocalRouter) UnregisterNode() error

type MessageChannel

// MessageChannel has no exported fields; construct it with NewMessageChannel
// or NewDefaultMessageChannel. Its pointer type declares WriteMessage,
// ReadChan, IsClosed, Close and ConnectionID, which together cover the method
// sets of both MessageSink and MessageSource, plus OnClose for registering a
// func().
type MessageChannel struct {
	// contains filtered or unexported fields
}

func NewDefaultMessageChannel

func NewDefaultMessageChannel(connectionID livekit.ConnectionID) *MessageChannel

func NewMessageChannel

func NewMessageChannel(connectionID livekit.ConnectionID, size int) *MessageChannel

func (*MessageChannel) Close

func (m *MessageChannel) Close()

func (*MessageChannel) ConnectionID

func (m *MessageChannel) ConnectionID() livekit.ConnectionID

func (*MessageChannel) IsClosed

func (m *MessageChannel) IsClosed() bool

func (*MessageChannel) OnClose

func (m *MessageChannel) OnClose(f func())

func (*MessageChannel) ReadChan

func (m *MessageChannel) ReadChan() <-chan proto.Message

func (*MessageChannel) WriteMessage

func (m *MessageChannel) WriteMessage(msg proto.Message) error

type MessageRouter

// MessageRouter is the single-method interface for starting a participant's
// signal connection. Router embeds it.
type MessageRouter interface {
	// StartParticipantSignal is for when a participant's signal connection is
	// ready to start. It takes the room name and the participant's init
	// parameters and returns a StartParticipantSignalResults or an error.
	StartParticipantSignal(ctx context.Context, roomName livekit.RoomName, pi ParticipantInit) (res StartParticipantSignalResults, err error)
}

type MessageSink

// MessageSink is the write side of a message connection.
type MessageSink interface {
	// WriteMessage writes one protobuf message to the sink.
	WriteMessage(msg proto.Message) error
	// IsClosed reports whether the sink is closed.
	IsClosed() bool
	// Close closes the sink.
	Close()
	// ConnectionID returns the connection ID associated with the sink.
	ConnectionID() livekit.ConnectionID
}

MessageSink is an abstraction for writing protobuf messages and having them read by a MessageSource, potentially on a different node via a transport.

func NewSignalMessageSink

func NewSignalMessageSink[SendType, RecvType RelaySignalMessage](params SignalSinkParams[SendType, RecvType]) MessageSink

type MessageSource

// MessageSource is the read side of a message connection.
type MessageSource interface {
	// ReadChan exposes a one way channel to make it easier to use with select
	ReadChan() <-chan proto.Message
	// IsClosed reports whether the source is closed.
	IsClosed() bool
	// Close closes the source.
	Close()
	// ConnectionID returns the connection ID associated with the source.
	ConnectionID() livekit.ConnectionID
}

type ParticipantInit

// ParticipantInit holds the per-participant parameters passed to
// StartParticipantSignal. It is converted to a *livekit.StartSession by
// ToStartSession and built from one by ParticipantInitFromStartSession.
//
// NOTE(review): field meanings below are not established by this view
// beyond their names and types — confirm against the implementation.
type ParticipantInit struct {
	Identity             livekit.ParticipantIdentity
	Name                 livekit.ParticipantName
	Reconnect            bool
	ReconnectReason      livekit.ReconnectReason
	AutoSubscribe        bool
	Client               *livekit.ClientInfo
	Grants               *auth.ClaimGrants
	Region               string
	AdaptiveStream       bool
	ID                   livekit.ParticipantID
	// SubscriberAllowPause is a pointer, so it can be nil
	// (presumably meaning "not specified" — TODO confirm).
	SubscriberAllowPause *bool
}

func ParticipantInitFromStartSession

func ParticipantInitFromStartSession(ss *livekit.StartSession, region string) (*ParticipantInit, error)

func (*ParticipantInit) ToStartSession

func (pi *ParticipantInit) ToStartSession(roomName livekit.RoomName, connectionID livekit.ConnectionID) (*livekit.StartSession, error)

type RedisRouter

// RedisRouter embeds *LocalRouter, so LocalRouter methods that RedisRouter
// does not declare itself are promoted. In the method list visible here that
// applies to GetRegion and StartParticipantSignalWithNodeID. Construct it
// with NewRedisRouter.
type RedisRouter struct {
	*LocalRouter
	// contains filtered or unexported fields
}

RedisRouter uses Redis pub/sub to route signaling messages across different nodes. It relies on the RTC node to be the primary driver of the participant connection.

func NewRedisRouter

func NewRedisRouter(lr *LocalRouter, rc redis.UniversalClient, kps rpc.KeepalivePubSub) *RedisRouter

func (*RedisRouter) ClearRoomState

func (r *RedisRouter) ClearRoomState(_ context.Context, roomName livekit.RoomName) error

func (*RedisRouter) Drain

func (r *RedisRouter) Drain()

func (*RedisRouter) GetNode

func (r *RedisRouter) GetNode(nodeID livekit.NodeID) (*livekit.Node, error)

func (*RedisRouter) GetNodeForRoom

func (r *RedisRouter) GetNodeForRoom(_ context.Context, roomName livekit.RoomName) (*livekit.Node, error)

func (*RedisRouter) ListNodes

func (r *RedisRouter) ListNodes() ([]*livekit.Node, error)

func (*RedisRouter) RegisterNode

func (r *RedisRouter) RegisterNode() error

func (*RedisRouter) RemoveDeadNodes

func (r *RedisRouter) RemoveDeadNodes() error

func (*RedisRouter) SetNodeForRoom

func (r *RedisRouter) SetNodeForRoom(_ context.Context, roomName livekit.RoomName, nodeID livekit.NodeID) error

func (*RedisRouter) Start

func (r *RedisRouter) Start() error

func (*RedisRouter) StartParticipantSignal

func (r *RedisRouter) StartParticipantSignal(ctx context.Context, roomName livekit.RoomName, pi ParticipantInit) (res StartParticipantSignalResults, err error)

StartParticipantSignal sets up the signal connection paths to the RTC node and starts routing messages to that message queue.

func (*RedisRouter) Stop

func (r *RedisRouter) Stop()

func (*RedisRouter) UnregisterNode

func (r *RedisRouter) UnregisterNode() error

type RelaySignalMessage

// RelaySignalMessage is the constraint on the SendType/RecvType type
// parameters used by the signal relay generics in this package
// (CopySignalStreamToMessageChannel, NewSignalMessageSink,
// SignalMessageReader, SignalMessageWriter, SignalSinkParams). A conforming
// type is a proto.Message that also exposes a sequence number and a close
// flag.
type RelaySignalMessage interface {
	proto.Message
	// GetSeq returns the message's uint64 sequence value.
	GetSeq() uint64
	// GetClose returns the message's close flag.
	GetClose() bool
}

type Router

// Router embeds MessageRouter and adds node registration, node listing,
// per-room node assignment, region lookup and lifecycle methods. Both
// LocalRouter and RedisRouter expose this method set, and CreateRouter
// returns a Router.
type Router interface {
	MessageRouter

	// Node membership. All three return an error on failure.
	RegisterNode() error
	UnregisterNode() error
	RemoveDeadNodes() error

	// ListNodes returns the nodes as []*livekit.Node.
	ListNodes() ([]*livekit.Node, error)

	// Per-room state, keyed by room name: look up the node for a room,
	// assign a node (by ID) to a room, and clear a room's state.
	GetNodeForRoom(ctx context.Context, roomName livekit.RoomName) (*livekit.Node, error)
	SetNodeForRoom(ctx context.Context, roomName livekit.RoomName, nodeID livekit.NodeID) error
	ClearRoomState(ctx context.Context, roomName livekit.RoomName) error

	// GetRegion returns the region as a string.
	GetRegion() string

	// Lifecycle. Only Start can report an error.
	Start() error
	Drain()
	Stop()
}

Router allows multiple nodes to coordinate the participant session.

func CreateRouter

func CreateRouter(rc redis.UniversalClient, node LocalNode, signalClient SignalClient, kps rpc.KeepalivePubSub) Router

type SignalClient

// SignalClient starts participant signal connections against a given node.
// NewSignalClient returns one, and NewLocalRouter and CreateRouter accept one.
type SignalClient interface {
	// ActiveCount returns an int count.
	// NOTE(review): presumably the number of active signal connections —
	// confirm.
	ActiveCount() int
	// StartParticipantSignal starts a signal connection for the participant
	// in roomName, targeting nodeID. It returns the connection ID, a sink for
	// requests, a source for responses, and an error.
	StartParticipantSignal(ctx context.Context, roomName livekit.RoomName, pi ParticipantInit, nodeID livekit.NodeID) (connectionID livekit.ConnectionID, reqSink MessageSink, resSource MessageSource, err error)
}

func NewSignalClient

func NewSignalClient(nodeID livekit.NodeID, bus psrpc.MessageBus, config config.SignalRelayConfig) (SignalClient, error)

type SignalMessageReader

// SignalMessageReader turns one received relay message into protobuf
// messages. CopySignalStreamToMessageChannel takes one as its reader argument.
type SignalMessageReader[RecvType RelaySignalMessage] interface {
	// Read returns the protobuf messages extracted from msg, or an error.
	Read(msg RecvType) ([]proto.Message, error)
}

type SignalMessageWriter

// SignalMessageWriter builds one relay message of type SendType from a
// sequence number, a close flag and a batch of protobuf messages.
// SignalSinkParams carries one in its Writer field.
type SignalMessageWriter[SendType RelaySignalMessage] interface {
	// Write returns a SendType built from seq, close and msgs. The close
	// parameter shadows the builtin of the same name within the signature
	// only.
	Write(seq uint64, close bool, msgs []proto.Message) SendType
}

type SignalSinkParams

// SignalSinkParams is the parameter struct accepted by NewSignalMessageSink.
//
// NOTE(review): the effect of each field is defined by the sink
// implementation, which is not visible here.
type SignalSinkParams[SendType, RecvType RelaySignalMessage] struct {
	Stream         psrpc.Stream[SendType, RecvType]
	Logger         logger.Logger
	Config         config.SignalRelayConfig
	Writer         SignalMessageWriter[SendType]
	// CloseOnFailure and BlockOnClose are boolean behavior switches;
	// semantics presumably match their names — TODO confirm.
	CloseOnFailure bool
	BlockOnClose   bool
	ConnectionID   livekit.ConnectionID
}

type StartParticipantSignalResults

// StartParticipantSignalResults is the result type of StartParticipantSignal
// and StartParticipantSignalWithNodeID. ConnectionID, RequestSink and
// ResponseSource have the same types as the values returned by
// SignalClient.StartParticipantSignal.
type StartParticipantSignalResults struct {
	ConnectionID        livekit.ConnectionID
	RequestSink         MessageSink
	ResponseSource      MessageSource
	// NodeID and NodeSelectionReason presumably identify the node chosen
	// for the participant and why it was chosen — TODO confirm.
	NodeID              livekit.NodeID
	NodeSelectionReason string
}

Directories

Path Synopsis
Code generated by counterfeiter.
Code generated by counterfeiter.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL