janus

package module
v0.0.0-...-246116b Latest
This package is not in the latest version of its module.
Published: Dec 3, 2023 License: MIT Imports: 10 Imported by: 0

README

cameronelliott/janus-go

A Go package for controlling the Janus WebRTC gateway over WebSocket

Vision

I would like this library to become:

  • trusted
  • stable
  • reliable

For the purposes of controlling Janus in production environments.

Please join me by providing feedback and Pull Requests, praise, constructive criticism, whatever you like.

Principles guiding the effort to make this super trustworthy and stable

  1. Be humble. I make mistakes, and I invite and encourage your review and constructive criticism on how to improve this lib.
  2. Use a number of articles and blog posts as guides for the principles behind how this library works.
  3. Outline the techniques used to guide the code toward robustness and trustworthiness.
  4. Provide a real world example of usage.

Articles used to create the guidelines for stability and robustness

Why You Should Be Using errgroup... Blum

Managing Groups of Goroutines in Go. Block

Defer, Panic, and Recover. Gerrand

Notable packages

nhooyr/websocket: used for WebSockets. The big win here, IMHO, is its support for context.Context, which enables cancellation.

golang.org/x/sync/errgroup: may or may not be included in this package in the future, but it provides a very useful tool for propagating error values between goroutines and their creators.

Techniques used for the stability and robustness of this project

  1. Switch from github.com/gorilla/websocket to nhooyr.io/websocket, because nhooyr.io/websocket supports context.Context for many WebSocket operations. This is a big win for reliable concurrency in many situations. See the section below about the Google policy requiring use of context.Context.
  2. Propagate all non-nil errors up the call stack, AND up the goroutine chain. This should mean no ignored, lost, or swallowed errors.
  3. Support the use of the golang.org/x/sync/errgroup package. This is how error values can be returned from goroutines to their parents. See the articles listed above for more on structured concurrency.
  4. Writers to channels should use defer close(channelx) to close channels, especially where nested goroutine lifetimes occur. (Structured Concurrency)
  5. Use context.Context on call paths from HTTP requests (see the Google blog post below).
  6. Use a lint or static analysis tool to make sure there are no missed errors.
  7. Use best practices for logging levels. Dave Cheney has good articles on this.
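
As a rough illustration of techniques 2 and 4, the sketch below shows a channel writer that closes its channel with defer and hands its error back to the caller instead of dropping it. It uses only the standard library as a stand-in for errgroup, and the names produce, consume, and errBoom are hypothetical, not part of this package.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// errBoom is a hypothetical failure used only for this illustration.
var errBoom = errors.New("boom")

// produce sends 0..n-1 on the returned channel. The writer owns the channel,
// so it closes it with defer on every exit path. Any error is reported on the
// second channel rather than being swallowed inside the goroutine.
func produce(ctx context.Context, n, failAt int) (<-chan int, <-chan error) {
	out := make(chan int)
	errc := make(chan error, 1) // buffered: the goroutine never blocks on it
	go func() {
		defer close(errc)
		defer close(out)
		for i := 0; i < n; i++ {
			if i == failAt {
				errc <- fmt.Errorf("produce: item %d: %w", i, errBoom)
				return
			}
			select {
			case out <- i:
			case <-ctx.Done():
				errc <- ctx.Err()
				return
			}
		}
	}()
	return out, errc
}

// consume drains the channel and returns the sum plus the writer's error.
func consume(n, failAt int) (int, error) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	out, errc := produce(ctx, n, failAt)
	sum := 0
	for v := range out { // ends when the writer closes the channel
		sum += v
	}
	return sum, <-errc // nil if the writer finished cleanly
}

func main() {
	fmt.Println(consume(5, -1)) // 10 <nil>
	fmt.Println(consume(5, 3))  // 3 produce: item 3: boom
}
```

Because the reader loops with range, it cannot hang on a writer that exited early: the deferred close always ends the loop, and the error read afterwards tells the caller why.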

Techniques not used

  1. Share by communicating.
    This is a great approach for robust concurrency, but it requires more changes to the original code than I am comfortable making. It would also break the API for existing code, so we are going to live with the mutexes and locking in the code.

Google blog post about usage of context.Context inside request handlers

An old Google blog post described their policy on context.Context:

At Google, we require that Go programmers pass a Context parameter as the first argument to every function on the call path between incoming and outgoing requests. This allows Go code developed by many different teams to interoperate well. It provides simple control over timeouts and cancelation and ensures that critical values like security credentials transit Go programs properly. Go Google Blog
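
A minimal sketch of that convention, assuming nothing beyond the standard library: the context is the first parameter of every function on the call path, so a timeout set at the top reaches the slowest call at the bottom. The names slowCall, handle, and timedOut are hypothetical stand-ins for an outgoing request, a request handler, and a caller.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// slowCall stands in for an outgoing request. It returns early if ctx ends.
func slowCall(ctx context.Context, d time.Duration) error {
	select {
	case <-time.After(d):
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// handle stands in for an incoming request handler. It passes ctx down
// unchanged and wraps any error on the way back up.
func handle(ctx context.Context, work time.Duration) error {
	if err := slowCall(ctx, work); err != nil {
		return fmt.Errorf("handle: %w", err)
	}
	return nil
}

// timedOut runs handle under a time budget (both values in milliseconds) and
// reports whether the deadline was exceeded.
func timedOut(budgetMs, workMs int) bool {
	ctx, cancel := context.WithTimeout(context.Background(),
		time.Duration(budgetMs)*time.Millisecond)
	defer cancel()
	err := handle(ctx, time.Duration(workMs)*time.Millisecond)
	return errors.Is(err, context.DeadlineExceeded)
}

func main() {
	fmt.Println(timedOut(20, 2000)) // true: the budget expires first
	fmt.Println(timedOut(2000, 1))  // false: the work finishes in time
}
```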

Credits

Original work by https://github.com/notedit. In addition to this popular library, he has a lot of cool WebRTC-related software, including stuff for the Medooze media server, another awesome SFU like Janus. Check his stuff out! 😄

Documentation

Overview

Package janus is a Golang implementation of the Janus API, used to interact with the Janus WebRTC Gateway.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func WaitForGroup

func WaitForGroup(g *errgroup.Group) error

WaitForGroup is an example of how to wait for, and catch errors from, the goroutines started by the Connect() function.

Types

type AckMsg

type AckMsg struct{}

type BaseMsg

type BaseMsg struct {
	Type    string `json:"janus"`
	ID      string `json:"transaction"`
	Session uint64 `json:"session_id"`
	Handle  uint64 `json:"sender"`
}
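
To show how the struct tags above map onto a wire message, here is a small sketch that decodes a JSON message into a local copy of BaseMsg and branches on its type. The sample payload and the helpers decodeBase and describe are made up for illustration; how the package itself routes messages is not shown in this documentation.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// BaseMsg is a local copy of the struct documented above.
type BaseMsg struct {
	Type    string `json:"janus"`
	ID      string `json:"transaction"`
	Session uint64 `json:"session_id"`
	Handle  uint64 `json:"sender"`
}

// decodeBase reads only the common envelope fields of a message.
func decodeBase(raw string) (BaseMsg, error) {
	var m BaseMsg
	err := json.Unmarshal([]byte(raw), &m)
	return m, err
}

// describe branches on the "janus" field, which names the message type.
func describe(m BaseMsg) string {
	switch m.Type {
	case "webrtcup":
		return fmt.Sprintf("webrtcup on session %d, handle %d", m.Session, m.Handle)
	case "ack":
		return fmt.Sprintf("ack for transaction %q", m.ID)
	default:
		return "unhandled type " + m.Type
	}
}

func main() {
	m, err := decodeBase(`{"janus":"webrtcup","session_id":11,"sender":22}`)
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Println(describe(m)) // webrtcup on session 11, handle 22
}
```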

type DetachedMsg

type DetachedMsg struct{}

type ErrorData

type ErrorData struct {
	Code   int
	Reason string
}

type ErrorMsg

type ErrorMsg struct {
	Err ErrorData `json:"error"`
}

func (*ErrorMsg) Error

func (err *ErrorMsg) Error() string
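
Because *ErrorMsg implements the error interface, callers can recover the error code with errors.As even after the error has been wrapped. The sketch below uses local copies of the types; the body of Error(), the helpers parseReply and errorCode, and the sample payload are assumptions for illustration, not the package's actual implementation.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// Local copies of the documented types.
type ErrorData struct {
	Code   int
	Reason string
}

type ErrorMsg struct {
	Err ErrorData `json:"error"`
}

// Error is an assumed formatting; the real method may print something else.
func (e *ErrorMsg) Error() string {
	return fmt.Sprintf("janus error %d: %s", e.Err.Code, e.Err.Reason)
}

// parseReply returns an *ErrorMsg as an error when the reply type is "error".
func parseReply(raw string) error {
	var probe struct {
		Type string `json:"janus"`
	}
	if err := json.Unmarshal([]byte(raw), &probe); err != nil {
		return err
	}
	if probe.Type != "error" {
		return nil
	}
	em := &ErrorMsg{}
	if err := json.Unmarshal([]byte(raw), em); err != nil {
		return err
	}
	return em
}

// errorCode extracts the code from anywhere in the error chain, or 0.
func errorCode(err error) int {
	var em *ErrorMsg
	if errors.As(err, &em) {
		return em.Err.Code
	}
	return 0
}

func main() {
	err := parseReply(`{"janus":"error","error":{"code":458,"reason":"No such session"}}`)
	wrapped := fmt.Errorf("attach failed: %w", err)
	fmt.Println(wrapped)            // attach failed: janus error 458: No such session
	fmt.Println(errorCode(wrapped)) // 458
}
```

ErrorData has no JSON tags, so this relies on encoding/json matching the lowercase keys "code" and "reason" to the exported fields case-insensitively.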

type EventMsg

type EventMsg struct {
	Plugindata PluginData
	Jsep       map[string]interface{}
	Session    uint64 `json:"session_id"`
	Handle     uint64 `json:"sender"`
}

type Gateway

type Gateway struct {
	// Sessions is a map of the currently active sessions to the gateway.
	Sessions map[uint64]*Session

	// Access to the Sessions map should be synchronized with the Gateway.Lock()
	// and Gateway.Unlock() methods provided by the embedded sync.Mutex.
	sync.Mutex

	// LogJsonMessages enables logging of json rx/tx messages to stdout
	LogJsonMessages bool
	// contains filtered or unexported fields
}

Gateway represents a connection to an instance of the Janus Gateway.
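
The locking rule in the struct comments can be sketched as follows. The types here are trimmed local stand-ins for Gateway and Session, and sessionIDs and demo are hypothetical helpers; the point is only that every read or write of the Sessions map happens between Lock() and Unlock().

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// Trimmed local stand-ins for the documented types.
type Session struct{ ID uint64 }

type Gateway struct {
	Sessions map[uint64]*Session
	sync.Mutex
}

// sessionIDs takes a sorted snapshot of the session IDs while holding the lock.
func sessionIDs(g *Gateway) []uint64 {
	g.Lock()
	defer g.Unlock()
	ids := make([]uint64, 0, len(g.Sessions))
	for id := range g.Sessions {
		ids = append(ids, id)
	}
	sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] })
	return ids
}

// demo adds three sessions from concurrent goroutines, each under the lock,
// then reads the map back safely.
func demo() []uint64 {
	g := &Gateway{Sessions: map[uint64]*Session{}}
	var wg sync.WaitGroup
	for i := uint64(1); i <= 3; i++ {
		wg.Add(1)
		go func(id uint64) {
			defer wg.Done()
			g.Lock()
			g.Sessions[id] = &Session{ID: id}
			g.Unlock()
		}(i)
	}
	wg.Wait()
	return sessionIDs(g)
}

func main() {
	fmt.Println(demo()) // [1 2 3]
}
```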

func Connect

func Connect(ctx context.Context, wsURL string) (*Gateway, *errgroup.Group, error)

Connect initiates a WebSocket connection with the Janus Gateway.

It also spawns two goroutines to maintain the connection. One sends WebSocket ping messages periodically; the other reads incoming messages and passes each to the right channel.

These two goroutines are added to an errgroup.Group. You can ignore the group if you like (gateway, _, err := janus.Connect(ctx, wsURL)), or, even better, call its Wait() method to wait for the goroutines and catch any errors that occur inside them. The README has links to more information on errgroup.

func (*Gateway) Close

func (gateway *Gateway) Close(code websocket.StatusCode, reason string) error

Close closes the underlying connection to the Gateway.

func (*Gateway) Create

func (gateway *Gateway) Create(ctx context.Context, token string) (*Session, error)

Create sends a create request to the Gateway. On success, a new Session will be returned and error will be nil.

func (*Gateway) Info

func (gateway *Gateway) Info(ctx context.Context, token string) (*InfoMsg, error)

Info sends an info request to the Gateway. On success, an InfoMsg will be returned and error will be nil.

type Handle

type Handle struct {
	// ID is the handle_id of this plugin handle
	ID uint64

	// Type is the handle type: pub or sub
	Type string

	// User is the user ID
	User string

	// Events is a receive only channel that can be used to receive events
	// related to this handle from the gateway.
	Events chan interface{}
	// contains filtered or unexported fields
}

Handle represents a handle to a plugin instance on the Gateway.

func (*Handle) Detach

func (handle *Handle) Detach(ctx context.Context, token string) (*AckMsg, error)

Detach sends a detach request to the Gateway to remove this handle. On success, an AckMsg will be returned and error will be nil.

func (*Handle) Message

func (handle *Handle) Message(ctx context.Context, token string, body, jsep interface{}) (*EventMsg, error)

Message sends a message request to a plugin handle on the Gateway. body should be the plugin data to be passed to the plugin, and jsep should contain an optional SDP offer/answer to establish a WebRTC PeerConnection. On success, an EventMsg will be returned and error will be nil.

func (*Handle) Request

func (handle *Handle) Request(ctx context.Context, token string, body interface{}) (*SuccessMsg, error)

Request sends a synchronous request. On success, a SuccessMsg will be returned and error will be nil.

func (*Handle) Trickle

func (handle *Handle) Trickle(ctx context.Context, token string, candidate interface{}) (*AckMsg, error)

Trickle sends a trickle request to the Gateway as part of establishing a new PeerConnection with a plugin. candidate should be a single ICE candidate, or a completed object to signify that all candidates have been sent:

{
	"completed": true
}

On success, an AckMsg will be returned and error will be nil.
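
Since candidate is an interface{}, the caller decides its shape. The sketch below builds the two payloads described above and prints the JSON they would marshal to. The iceCandidate struct is a hypothetical helper whose field names follow the browser's RTCIceCandidateInit object, which is an assumption about what the gateway expects; the candidate string is a made-up example.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// iceCandidate is a hypothetical helper type, not part of this package.
type iceCandidate struct {
	Candidate     string `json:"candidate"`
	SDPMid        string `json:"sdpMid"`
	SDPMLineIndex int    `json:"sdpMLineIndex"`
}

// completedPayload builds the object that signals the end of candidates.
func completedPayload() (string, error) {
	b, err := json.Marshal(map[string]bool{"completed": true})
	return string(b), err
}

// candidatePayload builds the JSON for a single ICE candidate.
func candidatePayload(c iceCandidate) (string, error) {
	b, err := json.Marshal(c)
	return string(b), err
}

func main() {
	c := iceCandidate{
		Candidate: "candidate:1 1 udp 2122260223 192.0.2.1 54321 typ host",
		SDPMid:    "0",
	}
	one, _ := candidatePayload(c)
	done, _ := completedPayload()
	fmt.Println(one)
	fmt.Println(done) // {"completed":true}
}
```

With the real package, a value such as c or the map passed to json.Marshal here would be given directly as the candidate argument of Trickle.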

func (*Handle) TrickleMany

func (handle *Handle) TrickleMany(ctx context.Context, token string, candidates interface{}) (*AckMsg, error)

TrickleMany sends a trickle request to the Gateway as part of establishing a new PeerConnection with a plugin. candidates should be an array of ICE candidates. On success, an AckMsg will be returned and error will be nil.

type HangupMsg

type HangupMsg struct {
	Reason  string
	Session uint64 `json:"session_id"`
	Handle  uint64 `json:"sender"`
}

type InfoMsg

type InfoMsg struct {
	Name          string
	Version       int
	VersionString string `json:"version_string"`
	Author        string
	DataChannels  bool   `json:"data_channels"`
	IPv6          bool   `json:"ipv6"`
	LocalIP       string `json:"local-ip"`
	IceTCP        bool   `json:"ice-tcp"`
	Transports    map[string]PluginInfo
	Plugins       map[string]PluginInfo
}

type MediaMsg

type MediaMsg struct {
	Type      string
	Receiving bool
}

type PluginData

type PluginData struct {
	Plugin string
	Data   map[string]interface{}
}

type PluginInfo

type PluginInfo struct {
	Name          string
	Author        string
	Description   string
	Version       int
	VersionString string `json:"version_string"`
}

type Session

type Session struct {
	// ID is the session_id of this session
	ID uint64

	// Handles is a map of plugin handles within this session
	Handles map[uint64]*Handle

	Events chan interface{}

	// Access to the Handles map should be synchronized with the Session.Lock()
	// and Session.Unlock() methods provided by the embedded sync.Mutex.
	sync.Mutex
	// contains filtered or unexported fields
}

Session represents a session instance on the Janus Gateway.

func (*Session) Attach

func (session *Session) Attach(ctx context.Context, token, plugin string) (*Handle, error)

Attach sends an attach request to the Gateway within this session. plugin should be the unique string of the plugin to attach to. On success, a new Handle will be returned and error will be nil.

func (*Session) Destroy

func (session *Session) Destroy(ctx context.Context, token string) (*AckMsg, error)

Destroy sends a destroy request to the Gateway to tear down this session. On success, the Session will be removed from the Gateway.Sessions map, an AckMsg will be returned and error will be nil.

func (*Session) KeepAlive

func (session *Session) KeepAlive(ctx context.Context, token string) (*AckMsg, error)

KeepAlive sends a keep-alive request to the Gateway. On success, an AckMsg will be returned and error will be nil.

func (*Session) KeepAliveSender

func (session *Session) KeepAliveSender(ctx context.Context) error

KeepAliveSender sends keep-alive messages for the session every 20 seconds.
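
A periodic sender of this kind is commonly built from a time.Ticker and a select on the context. The sketch below shows that general pattern only; it is not the package's implementation. The names keepAliveLoop, runDemo, and errStop are hypothetical, and the demo uses a 5 ms interval instead of 20 seconds so that it finishes quickly.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// errStop is a hypothetical error used to end the demo deterministically.
var errStop = errors.New("stop")

// keepAliveLoop calls send once per interval until ctx ends or send fails.
// The send error is wrapped and returned to the caller, not dropped.
func keepAliveLoop(ctx context.Context, interval time.Duration,
	send func(context.Context) error) error {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
			if err := send(ctx); err != nil {
				return fmt.Errorf("keepalive: %w", err)
			}
		}
	}
}

// runDemo counts sends and makes the send numbered limit fail, which stops
// the loop. It returns the number of sends and whether errStop came back.
func runDemo(limit int) (int, bool) {
	calls := 0
	send := func(ctx context.Context) error {
		calls++
		if calls == limit {
			return errStop
		}
		return nil
	}
	err := keepAliveLoop(context.Background(), 5*time.Millisecond, send)
	return calls, errors.Is(err, errStop)
}

func main() {
	fmt.Println(runDemo(3)) // 3 true
}
```

Returning the error from the loop fits the errgroup approach described in the README: a function with this signature can be started inside a group so that a failed keep-alive is seen by whoever waits on the group.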

type SlowLinkMsg

type SlowLinkMsg struct {
	Uplink bool
	Lost   int64
}

type SuccessData

type SuccessData struct {
	ID uint64
}

type SuccessMsg

type SuccessMsg struct {
	Data       SuccessData
	PluginData PluginData
	Session    uint64 `json:"session_id"`
	Handle     uint64 `json:"sender"`
}

type TimeoutMsg

type TimeoutMsg struct {
	Session uint64 `json:"session_id"`
}

type WebRTCUpMsg

type WebRTCUpMsg struct {
	Session uint64 `json:"session_id"`
	Handle  uint64 `json:"sender"`
}
