rtmp

package module
v0.0.0-...-5278192 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 30, 2021 License: MIT Imports: 16 Imported by: 0

README

RTMP Server

RTMP server written in Go (Golang) that allows stream publishing.

Install

go get github.com/ZachGill/rtmp

How to start your RTMP server

Start up a server for the ingestion/playback of an RTMP stream (default port is 1935):

package main

import (
	"github.com/ZachGill/rtmp"
	"log"
)

// main starts an RTMP server using the zero-value configuration.
// Per the documentation of Server.Listen, ":1935" is used when no Addr has
// been assigned. Whatever error Listen returns is logged and the process exits.
func main() {
	server := &rtmp.Server{}
	// Listen is the method the Server type exposes for starting the server.
	log.Fatal(server.Listen())
}

You can also create a client to listen for events on a stream (e.g., audio, video, and metadata events) so that you can do further processing on the media being streamed:

package main

import (
	"fmt"
	"github.com/ZachGill/rtmp"
	"github.com/ZachGill/rtmp/audio"
	"github.com/ZachGill/rtmp/video"
	"log"
)

// OnAudio is the example's audio callback. It ignores every argument and
// only prints a line to standard output each time it is invoked.
func OnAudio(format audio.Format, sampleRate audio.SampleRate, sampleSize audio.SampleSize, channels audio.Channel, payload []byte, timestamp uint32) {
	const notice = "client: on audio"
	fmt.Println(notice)
}

// OnVideo is the example's video callback. It ignores every argument and
// only prints a line to standard output each time it is invoked.
// NOTE(review): the Client struct documented for this version lists only the
// OnAudio and OnMetadata fields — confirm that a video callback is supported.
func OnVideo(frameType video.FrameType, codec video.Codec, payload []byte, timestamp uint32) {
	const notice = "client: on video"
	fmt.Println(notice)
}

// OnMetadata is the example's metadata callback. It prints the received
// metadata map, including its keys, to standard output.
func OnMetadata(metadata map[string]interface{}) {
	// Terminate the line so that consecutive callbacks do not run together
	// on a single line of output.
	fmt.Printf("client: on metadata: %+v\n", metadata)
}

// main builds an RTMP client with the example callbacks and connects it to a
// stream. Whatever error Connect returns is logged and the process exits.
func main() {
	// Specify audio and metadata callbacks. The Client struct documented for
	// this version of the package exposes only the OnAudio and OnMetadata
	// fields, so an OnVideo entry in this literal would not compile.
	client := &rtmp.Client{
		OnAudio:    OnAudio,
		OnMetadata: OnMetadata,
	}

	log.Fatal(client.Connect("rtmp://192.168.1.2/app/publish"))
}

To view other options accepted by the Server and Client structs, look at the examples directory.

Additional notes

  • This is a work in progress.

Documentation

Index

Constants

View Source
// Chunk types. These share the uint8 type of ChunkBasicHeader.FMT, which is
// documented as the chunk type. According to the ChunkMessageHeader
// documentation, a type 0 chunk carries an absolute timestamp while the other
// types carry a timestamp delta.
const (
	ChunkType0 uint8 = 0
	ChunkType1 uint8 = 1
	ChunkType2 uint8 = 2
	ChunkType3 uint8 = 3
)

Chunk types

View Source
// Chunk stream IDs (csid) used by this package.
const (
	// Only ProtocolChannel (csid = 2) is defined in the spec. The other
	// channels are this package's own convention, chosen so that the same type
	// of data is consistently sent through the same chunk stream ID.
	ProtocolChannel uint8 = 2
	AudioChannel    uint8 = 4
)
View Source
// Limit types.
// NOTE(review): presumably these are the values accepted by the limitType
// argument of SetBandwidth (also a uint8) — confirm against the implementation.
const (
	LimitHard    uint8 = 0
	LimitSoft    uint8 = 1
	LimitDynamic uint8 = 2
	// LimitNotSet is not part of the spec; it is for internal use when a
	// LimitDynamic message comes in.
	LimitNotSet uint8 = 3
)
View Source
// Message type IDs of the control messages.
const (
	// Control messages MUST have message stream ID 0 and be sent in chunk stream ID 2.
	SetChunkSize     = 1
	AbortMessage     = 2
	Ack              = 3
	WindowAckSize    = 5
	SetPeerBandwidth = 6

	// UserControlMessage is declared apart from the group above; its ID (4)
	// falls between Ack and WindowAckSize.
	UserControlMessage = 4
)

Control message types

View Source
// Message type IDs of command, data, shared object, audio, video and
// aggregate messages.
const (
	// Command messages, one ID per AMF version.
	CommandMessageAMF0 = 20
	CommandMessageAMF3 = 17

	// Data messages, one ID per AMF version.
	DataMessageAMF0 = 18
	DataMessageAMF3 = 15

	// Shared object messages, one ID per AMF version.
	SharedObjectMessageAMF0 = 19
	SharedObjectMessageAMF3 = 16

	// Media and aggregate messages.
	AudioMessage     = 8
	VideoMessage     = 9
	AggregateMessage = 22
)

Types of messages and commands

View Source
const DefaultMaximumChunkSize = 128
View Source
const (
	EventStreamBegin uint16 = 0
)
View Source
// NetConnectionSucces holds the status code string "NetConnection.Connect.Success".
// NOTE(review): the identifier is misspelled ("Succes"); it is exported, so
// renaming it would break existing callers.
const NetConnectionSucces = "NetConnection.Connect.Success"
View Source
const RtmpVersion3 = 3

Variables

View Source
var ErrHandshakeAlreadyCompleted error = errors.New("invalid call to perform handshake, attempted to perform " +
	"handshake more than once")
View Source
var ErrInvalidScheme error = errors.New("invalid scheme in URL")
View Source
var ErrUnsupportedRTMPVersion error = errors.New("The version of RTMP is not supported")
View Source
var ErrWrongC2Message error = errors.New("server handshake: s1 and c2 handshake messages do not match")
View Source
var ErrWrongS2Message error = errors.New("client handshake: c1 and s2 handshake messages do not match")
View Source
// InvalidChunkType is the sentinel error for an unknown chunk type; its message
// attributes it to the chunk handler.
// NOTE(review): unlike the sibling sentinels (ErrInvalidScheme,
// ErrWrongC2Message, ...) this name lacks the Err prefix; it is exported, so
// renaming it would break existing callers.
var InvalidChunkType error = errors.New("chunk handler: unknown chunk type")
View Source
// StreamNotFound is the sentinel error whose message is "StreamNotFound".
// NOTE(review): presumably reported when a stream key is unknown — confirm
// against the code that returns it. The name lacks the Err prefix used by the
// sibling sentinels; it is exported, so renaming it would break callers.
var StreamNotFound error = errors.New("StreamNotFound")

Functions

This section is empty.

Types

type AudioCallback

type AudioCallback func(format audio.Format, sampleRate audio.SampleRate, sampleSize audio.SampleSize, channels audio.Channel, payload []byte, timestamp uint32)

type Broadcaster

type Broadcaster struct {
	// contains filtered or unexported fields
}

func NewBroadcaster

func NewBroadcaster(context ContextStore) *Broadcaster

func (*Broadcaster) BroadcastAudio

func (b *Broadcaster) BroadcastAudio(streamKey string, audio []byte, timestamp uint32) error

func (*Broadcaster) BroadcastEndOfStream

func (b *Broadcaster) BroadcastEndOfStream(streamKey string)

func (*Broadcaster) BroadcastMetadata

func (b *Broadcaster) BroadcastMetadata(streamKey string, metadata map[string]interface{}) error

func (*Broadcaster) DestroyPublisher

func (b *Broadcaster) DestroyPublisher(streamKey string) error

func (*Broadcaster) DestroySubscriber

func (b *Broadcaster) DestroySubscriber(streamKey string, sessionID string) error

func (*Broadcaster) GetAacSequenceHeaderForPublisher

func (b *Broadcaster) GetAacSequenceHeaderForPublisher(streamKey string) []byte

func (*Broadcaster) GetAvcSequenceHeaderForPublisher

func (b *Broadcaster) GetAvcSequenceHeaderForPublisher(streamKey string) []byte

func (*Broadcaster) RegisterPublisher

func (b *Broadcaster) RegisterPublisher(streamKey string) error

func (*Broadcaster) RegisterSubscriber

func (b *Broadcaster) RegisterSubscriber(streamKey string, subscriber Subscriber) error

func (*Broadcaster) SetAacSequenceHeaderForPublisher

func (b *Broadcaster) SetAacSequenceHeaderForPublisher(streamKey string, payload []byte)

func (*Broadcaster) SetAvcSequenceHeaderForPublisher

func (b *Broadcaster) SetAvcSequenceHeaderForPublisher(streamKey string, payload []byte)

func (*Broadcaster) StreamExists

func (b *Broadcaster) StreamExists(streamKey string) bool

type Chunk

type Chunk struct {
	Header *ChunkHeader
	Body   *ChunkData
}

type ChunkBasicHeader

// ChunkBasicHeader holds the chunk type (FMT) and the chunk stream ID.
type ChunkBasicHeader struct {
	// Chunk type.
	// NOTE(review): presumably one of ChunkType0..ChunkType3, which share this
	// field's uint8 type — confirm.
	FMT           uint8
	ChunkStreamID uint32
}

type ChunkData

type ChunkData struct {
	// contains filtered or unexported fields
}

type ChunkHandler

type ChunkHandler struct {
	// contains filtered or unexported fields
}

ChunkHandler is in charge of reading chunk headers and chunk data. It assembles a message from multiple chunks when necessary.

func NewChunkHandler

func NewChunkHandler(reader *bufio.Reader, writer *bufio.Writer) *ChunkHandler

func (*ChunkHandler) ReadChunkData

func (chunkHandler *ChunkHandler) ReadChunkData(header ChunkHeader) (payload []byte, n int, err error)

func (*ChunkHandler) ReadChunkHeader

func (chunkHandler *ChunkHandler) ReadChunkHeader() (ch ChunkHeader, n int, err error)

func (*ChunkHandler) SetBandwidth

func (chunkHandler *ChunkHandler) SetBandwidth(size uint32, limitType uint8)

func (*ChunkHandler) SetChunkSize

func (chunkHandler *ChunkHandler) SetChunkSize(size uint32)

func (*ChunkHandler) SetWindowAckSize

func (chunkHandler *ChunkHandler) SetWindowAckSize(size uint32)

Sets the window acknowledgement size to the new size

type ChunkHeader

// ChunkHeader groups the parts of a chunk header: the basic header, the
// message header and the extended timestamp, together with an elapsed-time
// total.
type ChunkHeader struct {
	BasicHeader       *ChunkBasicHeader
	MessageHeader     *ChunkMessageHeader
	ExtendedTimestamp uint32
	// Total elapsed time = timestamp + deltas
	ElapsedTime uint32
}

type ChunkMessageHeader

// ChunkMessageHeader holds the message-level fields of a chunk header: the
// timestamp, the message length, the message type ID and the message stream ID.
type ChunkMessageHeader struct {
	// Timestamp is the absolute timestamp of the message when
	// ChunkHeader.BasicHeader.FMT == 0 (chunk type 0), or the timestamp delta
	// for any other type of chunk.
	Timestamp       uint32
	MessageLength   uint32
	MessageTypeID   uint8
	MessageStreamID uint32
}

type Client

type Client struct {
	OnAudio    AudioCallback
	OnMetadata MetadataCallback
	// contains filtered or unexported fields
}

func (*Client) Connect

func (c *Client) Connect(addr string) error

type ContextStore

type ContextStore interface {
	RegisterPublisher(streamKey string) error
	DestroyPublisher(streamKey string) error
	RegisterSubscriber(streamKey string, subscriber Subscriber) error
	GetSubscribersForStream(streamKey string) ([]Subscriber, error)
	DestroySubscriber(streamKey string, sessionID string) error
	StreamExists(streamKey string) bool
	SetAvcSequenceHeaderForPublisher(streamKey string, payload []byte)
	GetAvcSequenceHeaderForPublisher(streamKey string) []byte
	SetAacSequenceHeaderForPublisher(streamKey string, payload []byte)
	GetAacSequenceHeaderForPublisher(streamKey string) []byte
	GetStreams() map[string][]Subscriber
}

type Handshaker

type Handshaker struct {
	// contains filtered or unexported fields
}

func NewHandshaker

func NewHandshaker(reader *bufio.Reader, writer *bufio.Writer) *Handshaker

func (*Handshaker) ClientHandshake

func (h *Handshaker) ClientHandshake() error

func (*Handshaker) Handshake

func (h *Handshaker) Handshake() error

type InMemoryContext

type InMemoryContext struct {
	ContextStore
	// contains filtered or unexported fields
}

func NewInMemoryContext

func NewInMemoryContext() *InMemoryContext

func (*InMemoryContext) DestroyPublisher

func (c *InMemoryContext) DestroyPublisher(streamKey string) error

func (*InMemoryContext) DestroySubscriber

func (c *InMemoryContext) DestroySubscriber(streamKey string, sessionID string) error

func (*InMemoryContext) GetAacSequenceHeaderForPublisher

func (c *InMemoryContext) GetAacSequenceHeaderForPublisher(streamKey string) []byte

func (*InMemoryContext) GetAvcSequenceHeaderForPublisher

func (c *InMemoryContext) GetAvcSequenceHeaderForPublisher(streamKey string) []byte

func (*InMemoryContext) GetStreams

func (c *InMemoryContext) GetStreams() map[string][]Subscriber

func (*InMemoryContext) GetSubscribersForStream

func (c *InMemoryContext) GetSubscribersForStream(streamKey string) ([]Subscriber, error)

func (*InMemoryContext) RegisterPublisher

func (c *InMemoryContext) RegisterPublisher(streamKey string) error

Registers the session in the broadcaster to keep a reference to all open subscribers

func (*InMemoryContext) RegisterSubscriber

func (c *InMemoryContext) RegisterSubscriber(streamKey string, subscriber Subscriber) error

func (*InMemoryContext) SetAacSequenceHeaderForPublisher

func (c *InMemoryContext) SetAacSequenceHeaderForPublisher(streamKey string, payload []byte)

func (*InMemoryContext) SetAvcSequenceHeaderForPublisher

func (c *InMemoryContext) SetAvcSequenceHeaderForPublisher(streamKey string, payload []byte)

func (*InMemoryContext) StreamExists

func (c *InMemoryContext) StreamExists(streamKey string) bool

type MediaServer

type MediaServer interface {
	// contains filtered or unexported methods
}

Media Server interface defines the callbacks that are called when a message is received by the server

type MessageManager

type MessageManager struct {
	// contains filtered or unexported fields
}

func NewMessageManager

func NewMessageManager(session MediaServer, handshaker *Handshaker, chunkHandler *ChunkHandler) *MessageManager

func (*MessageManager) Initialize

func (m *MessageManager) Initialize() error

Initialize performs the handshake with the client. It returns an error if the handshake was not successful. Initialize should not be called again for the remainder of the session. Calling Initialize more than once will result in an error. This method is used for servers only.

func (*MessageManager) InitializeClient

func (m *MessageManager) InitializeClient() error

InitializeClient performs the handshake with the server. It returns an error if the handshake was not successful. InitializeClient should not be called again for the remainder of the session. Calling InitializeClient more than once will result in an error. This method is used for clients only.

func (*MessageManager) SetBandwidth

func (m *MessageManager) SetBandwidth(size uint32, limitType uint8)

func (*MessageManager) SetChunkSize

func (m *MessageManager) SetChunkSize(size uint32)

func (*MessageManager) SetWindowAckSize

func (m *MessageManager) SetWindowAckSize(size uint32)

type MetadataCallback

type MetadataCallback func(metadata map[string]interface{})

type RTMPMessage

type RTMPMessage interface {
	RTMPMessageMarshaler
	RTMPMessageUnmarshaler
}

type RTMPMessageMarshaler

type RTMPMessageMarshaler interface {
	MarshalRTMPMessage() (message []byte, err error)
}

type RTMPMessageUnmarshaler

type RTMPMessageUnmarshaler interface {
	UnmarshalRTMPMessage(message []byte) error
}

type Reader

type Reader struct {
	io.Reader
	// contains filtered or unexported fields
}

func (*Reader) Read

func (r *Reader) Read(p []byte) (n int, err error)

Read reads exactly len(p) bytes from the underlying bufio.Reader into p. It returns the number of bytes copied and an error if fewer bytes were read. The error is EOF only if no bytes were read. If an EOF happens after reading some but not all the bytes, Read returns ErrUnexpectedEOF. On return, n == len(p) if and only if err == nil.

func (*Reader) ReadByte

func (r *Reader) ReadByte() (byte, error)

ReadByte reads and returns a single byte from the underlying bufio.Reader. If no byte is available, returns an error.

type Server

// Server holds the configuration of the RTMP server: the address to listen
// on, a logger and a broadcaster.
type Server struct {
	// Addr is the host:port address of the server. The documentation of Listen
	// states that ":1935" is used when no Addr has been assigned.
	Addr string
	// TODO: create Logger interface to not depend on zap directly
	Logger      *zap.Logger
	Broadcaster *Broadcaster
}

Server represents the RTMP server to which a client/app can stream media. The server listens for incoming connections.

func (*Server) Listen

func (s *Server) Listen() error

Listen starts the server and listens for any incoming connections. If no Addr (host:port) has been assigned to the server, ":1935" is used.

type Session

type Session struct {
	MediaServer

	// Callbacks (for RTMP clients)
	OnAudio    AudioCallback
	OnMetadata MetadataCallback
	// contains filtered or unexported fields
}

Represents a connection made with the RTMP server where messages are exchanged between client/server.

func NewClientSession

func NewClientSession(app string, tcUrl string, streamKey string, audioCallback AudioCallback, metadataCallback MetadataCallback) *Session

func NewSession

func NewSession(logger *zap.Logger, b *Broadcaster) *Session

func (*Session) GetID

func (session *Session) GetID() string

func (*Session) SendAudio

func (session *Session) SendAudio(audio []byte, timestamp uint32)

func (*Session) SendEndOfStream

func (session *Session) SendEndOfStream()

func (*Session) SendMetadata

func (session *Session) SendMetadata(metadata map[string]interface{})

func (*Session) Start

func (session *Session) Start() error

Start performs the initial handshake and starts receiving streams of data. This is used for servers only. For clients, use StartPlayback().

func (*Session) StartPlayback

func (session *Session) StartPlayback() error

type Subscriber

// Subscriber is the set of methods through which audio, metadata and an
// end-of-stream notification are delivered to a receiver identified by an ID.
type Subscriber interface {
	// SendAudio passes an audio payload and its timestamp to the subscriber.
	SendAudio(audio []byte, timestamp uint32)
	// SendMetadata passes a metadata map to the subscriber.
	SendMetadata(metadata map[string]interface{})
	// GetID returns the subscriber's identifier.
	// NOTE(review): presumably the sessionID expected by DestroySubscriber — confirm.
	GetID() string
	// SendEndOfStream signals the end of the stream to the subscriber.
	SendEndOfStream()
}

A subscriber is sent the audio and metadata messages that flow in a particular stream (identified by its streamKey) and is notified when the stream ends. In this version the interface declares no method for video.

type Writer

type Writer struct {
	io.Writer
	// contains filtered or unexported fields
}

func (*Writer) Write

func (w *Writer) Write(p []byte) (n int, err error)

Write writes to the underlying bufio.Writer and calls its Flush method at the end. If an error occurs at the Write stage, the number of bytes written and the Write error is returned. If an error occurs at the Flush stage, the number of bytes written in the Write stage and the error that happened when flushing is returned.

Directories

Path Synopsis
amf

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL