routing

package
v1.10.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 13, 2021 License: Apache-2.0 Imports: 18 Imported by: 9

Documentation

Index

Constants

View Source
const (
	// hash of node_id => Node proto
	NodesKey = "nodes"

	// hash of room_name => node_id
	NodeRoomKey = "room_node_map"
)

Variables

View Source
var (
	ErrNotFound          = errors.New("could not find object")
	ErrIPNotSet          = errors.New("ip address is required and not set")
	ErrHandlerNotDefined = errors.New("handler not defined")
	ErrNoAvailableNodes  = errors.New("could not find any available nodes")
	ErrIncorrectRTCNode  = errors.New("current node isn't the RTC node for the room")
	ErrNodeNotFound      = errors.New("could not locate the node")

	ErrChannelClosed = errors.New("channel closed")
	ErrChannelFull   = errors.New("channel is full")
)

Functions

func GetAvailableNodes

func GetAvailableNodes(nodes []*livekit.Node) []*livekit.Node

func HashedID

func HashedID(id string) string

Creates a hashed ID from a unique string

func IsAvailable

func IsAvailable(node *livekit.Node) bool

checks if a node has been updated recently to be considered for selection

Types

type LocalNode

type LocalNode *livekit.Node

func NewLocalNode

func NewLocalNode(conf *config.Config) (LocalNode, error)

type LocalRouter

type LocalRouter struct {
	// contains filtered or unexported fields
}

a router of messages on the same node, basic implementation for local testing

func NewLocalRouter

func NewLocalRouter(currentNode LocalNode) *LocalRouter

func (*LocalRouter) ClearRoomState

func (r *LocalRouter) ClearRoomState(roomName string) error

func (*LocalRouter) CreateRTCSink

func (r *LocalRouter) CreateRTCSink(roomName, identity string) (MessageSink, error)

func (*LocalRouter) GetNode

func (r *LocalRouter) GetNode(nodeId string) (*livekit.Node, error)

func (*LocalRouter) GetNodeForRoom

func (r *LocalRouter) GetNodeForRoom(roomName string) (*livekit.Node, error)

func (*LocalRouter) ListNodes

func (r *LocalRouter) ListNodes() ([]*livekit.Node, error)

func (*LocalRouter) OnNewParticipantRTC

func (r *LocalRouter) OnNewParticipantRTC(callback NewParticipantCallback)

func (*LocalRouter) OnRTCMessage

func (r *LocalRouter) OnRTCMessage(callback RTCMessageCallback)

func (*LocalRouter) RegisterNode

func (r *LocalRouter) RegisterNode() error

func (*LocalRouter) RemoveDeadNodes

func (r *LocalRouter) RemoveDeadNodes() error

func (*LocalRouter) SetNodeForRoom

func (r *LocalRouter) SetNodeForRoom(roomName string, nodeId string) error

func (*LocalRouter) Start

func (r *LocalRouter) Start() error

func (*LocalRouter) StartParticipantSignal

func (r *LocalRouter) StartParticipantSignal(roomName string, pi ParticipantInit) (connectionId string, reqSink MessageSink, resSource MessageSource, err error)

func (*LocalRouter) Stop

func (r *LocalRouter) Stop()

func (*LocalRouter) UnregisterNode

func (r *LocalRouter) UnregisterNode() error

type MessageChannel

type MessageChannel struct {
	// contains filtered or unexported fields
}

func NewMessageChannel

func NewMessageChannel() *MessageChannel

func (*MessageChannel) Close

func (m *MessageChannel) Close()

func (*MessageChannel) OnClose

func (m *MessageChannel) OnClose(f func())

func (*MessageChannel) ReadChan

func (m *MessageChannel) ReadChan() <-chan proto.Message

func (*MessageChannel) WriteMessage

func (m *MessageChannel) WriteMessage(msg proto.Message) error

type MessageSink

type MessageSink interface {
	WriteMessage(msg proto.Message) error
	Close()
	OnClose(f func())
}

MessageSink is an abstraction for writing protobuf messages and having them read by a MessageSource, potentially on a different node via a transport

type MessageSource

type MessageSource interface {
	// ReadChan exposes a one way channel to make it easier to use with select
	ReadChan() <-chan proto.Message
}

type NewParticipantCallback

type NewParticipantCallback func(roomName string, pi ParticipantInit, requestSource MessageSource, responseSink MessageSink)

type NodeSelector

type NodeSelector interface {
	SelectNode(nodes []*livekit.Node, room *livekit.Room) (*livekit.Node, error)
}

NodeSelector selects an appropriate node to run the current session

type NodeStats

type NodeStats struct {
	NumRooms         int32
	NumClients       int32
	NumVideoChannels int32
	NumAudioChannels int32
	BytesPerMin      int64
}

type ParticipantInit

type ParticipantInit struct {
	Identity        string
	Metadata        string
	Reconnect       bool
	Permission      *livekit.ParticipantPermission
	ProtocolVersion int32
	UsePlanB        bool
	AutoSubscribe   bool
}

type RTCMessageCallback

type RTCMessageCallback func(roomName, identity string, msg *livekit.RTCNodeMessage)

type RTCNodeSink

type RTCNodeSink struct {
	// contains filtered or unexported fields
}

func NewRTCNodeSink

func NewRTCNodeSink(rc *redis.Client, nodeId, participantKey string) *RTCNodeSink

func (*RTCNodeSink) Close

func (s *RTCNodeSink) Close()

func (*RTCNodeSink) OnClose

func (s *RTCNodeSink) OnClose(f func())

func (*RTCNodeSink) WriteMessage

func (s *RTCNodeSink) WriteMessage(msg proto.Message) error

type RandomSelector

type RandomSelector struct {
}

func (*RandomSelector) SelectNode

func (s *RandomSelector) SelectNode(nodes []*livekit.Node, room *livekit.Room) (*livekit.Node, error)

type RedisRouter

type RedisRouter struct {
	LocalRouter
	// contains filtered or unexported fields
}

RedisRouter uses Redis pub/sub to route signaling messages across different nodes It relies on the RTC node to be the primary driver of the participant connection. Because

func NewRedisRouter

func NewRedisRouter(currentNode LocalNode, rc *redis.Client) *RedisRouter

func (*RedisRouter) ClearRoomState

func (r *RedisRouter) ClearRoomState(roomName string) error

func (*RedisRouter) CreateRTCSink

func (r *RedisRouter) CreateRTCSink(roomName, identity string) (MessageSink, error)

func (*RedisRouter) GetNode

func (r *RedisRouter) GetNode(nodeId string) (*livekit.Node, error)

func (*RedisRouter) GetNodeForRoom

func (r *RedisRouter) GetNodeForRoom(roomName string) (*livekit.Node, error)

func (*RedisRouter) ListNodes

func (r *RedisRouter) ListNodes() ([]*livekit.Node, error)

func (*RedisRouter) RegisterNode

func (r *RedisRouter) RegisterNode() error

func (*RedisRouter) RemoveDeadNodes

func (r *RedisRouter) RemoveDeadNodes() error

func (*RedisRouter) SetNodeForRoom

func (r *RedisRouter) SetNodeForRoom(roomName string, nodeId string) error

func (*RedisRouter) Start

func (r *RedisRouter) Start() error

func (*RedisRouter) StartParticipantSignal

func (r *RedisRouter) StartParticipantSignal(roomName string, pi ParticipantInit) (connectionId string, reqSink MessageSink, resSource MessageSource, err error)

StartParticipantSignal signal connection sets up paths to the RTC node, and starts to route messages to that message queue

func (*RedisRouter) Stop

func (r *RedisRouter) Stop()

func (*RedisRouter) UnregisterNode

func (r *RedisRouter) UnregisterNode() error

type Router

type Router interface {
	GetNodeForRoom(roomName string) (*livekit.Node, error)
	SetNodeForRoom(roomName string, nodeId string) error
	ClearRoomState(roomName string) error
	RegisterNode() error
	UnregisterNode() error
	RemoveDeadNodes() error
	GetNode(nodeId string) (*livekit.Node, error)
	ListNodes() ([]*livekit.Node, error)

	// StartParticipantSignal participant signal connection is ready to start
	StartParticipantSignal(roomName string, pi ParticipantInit) (connectionId string, reqSink MessageSink, resSource MessageSource, err error)

	// CreateRTCSink sends a message to RTC node
	CreateRTCSink(roomName, identity string) (MessageSink, error)

	// OnNewParticipantRTC is called to start a new participant's RTC connection
	OnNewParticipantRTC(callback NewParticipantCallback)

	// OnRTCMessage is called to execute actions on the RTC node
	OnRTCMessage(callback RTCMessageCallback)

	Start() error
	Stop()
}

Router allows multiple nodes to coordinate the participant session

type SignalNodeSink

type SignalNodeSink struct {
	// contains filtered or unexported fields
}

func NewSignalNodeSink

func NewSignalNodeSink(rc *redis.Client, nodeId, connectionId string) *SignalNodeSink

func (*SignalNodeSink) Close

func (s *SignalNodeSink) Close()

func (*SignalNodeSink) OnClose

func (s *SignalNodeSink) OnClose(f func())

func (*SignalNodeSink) WriteMessage

func (s *SignalNodeSink) WriteMessage(msg proto.Message) error

Directories

Path Synopsis
Code generated by counterfeiter.
Code generated by counterfeiter.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL