Documentation ¶
Index ¶
- Constants
- Variables
- type AudioCallback
- type Broadcaster
- func (b *Broadcaster) BroadcastAudio(streamKey string, audio []byte, timestamp uint32) error
- func (b *Broadcaster) BroadcastEndOfStream(streamKey string)
- func (b *Broadcaster) BroadcastMetadata(streamKey string, metadata map[string]interface{}) error
- func (b *Broadcaster) DestroyPublisher(streamKey string) error
- func (b *Broadcaster) DestroySubscriber(streamKey string, sessionID string) error
- func (b *Broadcaster) GetAacSequenceHeaderForPublisher(streamKey string) []byte
- func (b *Broadcaster) GetAvcSequenceHeaderForPublisher(streamKey string) []byte
- func (b *Broadcaster) RegisterPublisher(streamKey string) error
- func (b *Broadcaster) RegisterSubscriber(streamKey string, subscriber Subscriber) error
- func (b *Broadcaster) SetAacSequenceHeaderForPublisher(streamKey string, payload []byte)
- func (b *Broadcaster) SetAvcSequenceHeaderForPublisher(streamKey string, payload []byte)
- func (b *Broadcaster) StreamExists(streamKey string) bool
- type Chunk
- type ChunkBasicHeader
- type ChunkData
- type ChunkHandler
- func (chunkHandler *ChunkHandler) ReadChunkData(header ChunkHeader) (payload []byte, n int, err error)
- func (chunkHandler *ChunkHandler) ReadChunkHeader() (ch ChunkHeader, n int, err error)
- func (chunkHandler *ChunkHandler) SetBandwidth(size uint32, limitType uint8)
- func (chunkHandler *ChunkHandler) SetChunkSize(size uint32)
- func (chunkHandler *ChunkHandler) SetWindowAckSize(size uint32)
- type ChunkHeader
- type ChunkMessageHeader
- type Client
- type ContextStore
- type Handshaker
- type InMemoryContext
- func (c *InMemoryContext) DestroyPublisher(streamKey string) error
- func (c *InMemoryContext) DestroySubscriber(streamKey string, sessionID string) error
- func (c *InMemoryContext) GetAacSequenceHeaderForPublisher(streamKey string) []byte
- func (c *InMemoryContext) GetAvcSequenceHeaderForPublisher(streamKey string) []byte
- func (c *InMemoryContext) GetStreams() map[string][]Subscriber
- func (c *InMemoryContext) GetSubscribersForStream(streamKey string) ([]Subscriber, error)
- func (c *InMemoryContext) RegisterPublisher(streamKey string) error
- func (c *InMemoryContext) RegisterSubscriber(streamKey string, subscriber Subscriber) error
- func (c *InMemoryContext) SetAacSequenceHeaderForPublisher(streamKey string, payload []byte)
- func (c *InMemoryContext) SetAvcSequenceHeaderForPublisher(streamKey string, payload []byte)
- func (c *InMemoryContext) StreamExists(streamKey string) bool
- type MediaServer
- type MessageManager
- type MetadataCallback
- type RTMPMessage
- type RTMPMessageMarshaler
- type RTMPMessageUnmarshaler
- type Reader
- type Server
- type Session
- func (session *Session) GetID() string
- func (session *Session) SendAudio(audio []byte, timestamp uint32)
- func (session *Session) SendEndOfStream()
- func (session *Session) SendMetadata(metadata map[string]interface{})
- func (session *Session) Start() error
- func (session *Session) StartPlayback() error
- type Subscriber
- type Writer
Constants ¶
const ( ChunkType0 uint8 = 0 ChunkType1 uint8 = 1 ChunkType2 uint8 = 2 ChunkType3 uint8 = 3 )
Chunk types
const ( // Only Protocol Channel is defined in the spec (csid = 2), the others are defined by me with the idea of being // consistent in sending the same type of data through the same chunk stream id ProtocolChannel uint8 = 2 AudioChannel uint8 = 4 )
const ( LimitHard uint8 = 0 LimitSoft uint8 = 1 LimitDynamic uint8 = 2 // Not part of the spec, it's for our internal use when a LimitDynamic message comes in LimitNotSet uint8 = 3 )
const ( // Control messages MUST have message stream ID 0 and be sent in chunk stream ID 2 SetChunkSize = 1 AbortMessage = 2 Ack = 3 WindowAckSize = 5 SetPeerBandwidth = 6 UserControlMessage = 4 )
Control message types
const ( CommandMessageAMF0 = 20 CommandMessageAMF3 = 17 DataMessageAMF0 = 18 DataMessageAMF3 = 15 AudioMessage = 8 VideoMessage = 9 AggregateMessage = 22 )
Types of messages and commands
const DefaultMaximumChunkSize = 128
const (
EventStreamBegin uint16 = 0
)
const NetConnectionSucces = "NetConnection.Connect.Success"
const RtmpVersion3 = 3
Variables ¶
var ErrHandshakeAlreadyCompleted error = errors.New("invalid call to perform handshake, attempted to perform " +
"handshake more than once")
var ErrInvalidScheme error = errors.New("invalid scheme in URL")
var ErrUnsupportedRTMPVersion error = errors.New("The version of RTMP is not supported")
var ErrWrongC2Message error = errors.New("server handshake: s1 and c2 handshake messages do not match")
var ErrWrongS2Message error = errors.New("client handshake: c1 and s2 handshake messages do not match")
var InvalidChunkType error = errors.New("chunk handler: unknown chunk type")
var StreamNotFound error = errors.New("StreamNotFound")
Functions ¶
This section is empty.
Types ¶
type AudioCallback ¶
type AudioCallback func(format audio.Format, sampleRate audio.SampleRate, sampleSize audio.SampleSize, channels audio.Channel, payload []byte, timestamp uint32)
type Broadcaster ¶
type Broadcaster struct {
// contains filtered or unexported fields
}
func NewBroadcaster ¶
func NewBroadcaster(context ContextStore) *Broadcaster
func (*Broadcaster) BroadcastAudio ¶
func (b *Broadcaster) BroadcastAudio(streamKey string, audio []byte, timestamp uint32) error
func (*Broadcaster) BroadcastEndOfStream ¶
func (b *Broadcaster) BroadcastEndOfStream(streamKey string)
func (*Broadcaster) BroadcastMetadata ¶
func (b *Broadcaster) BroadcastMetadata(streamKey string, metadata map[string]interface{}) error
func (*Broadcaster) DestroyPublisher ¶
func (b *Broadcaster) DestroyPublisher(streamKey string) error
func (*Broadcaster) DestroySubscriber ¶
func (b *Broadcaster) DestroySubscriber(streamKey string, sessionID string) error
func (*Broadcaster) GetAacSequenceHeaderForPublisher ¶
func (b *Broadcaster) GetAacSequenceHeaderForPublisher(streamKey string) []byte
func (*Broadcaster) GetAvcSequenceHeaderForPublisher ¶
func (b *Broadcaster) GetAvcSequenceHeaderForPublisher(streamKey string) []byte
func (*Broadcaster) RegisterPublisher ¶
func (b *Broadcaster) RegisterPublisher(streamKey string) error
func (*Broadcaster) RegisterSubscriber ¶
func (b *Broadcaster) RegisterSubscriber(streamKey string, subscriber Subscriber) error
func (*Broadcaster) SetAacSequenceHeaderForPublisher ¶
func (b *Broadcaster) SetAacSequenceHeaderForPublisher(streamKey string, payload []byte)
func (*Broadcaster) SetAvcSequenceHeaderForPublisher ¶
func (b *Broadcaster) SetAvcSequenceHeaderForPublisher(streamKey string, payload []byte)
func (*Broadcaster) StreamExists ¶
func (b *Broadcaster) StreamExists(streamKey string) bool
type Chunk ¶
type Chunk struct { Header *ChunkHeader Body *ChunkData }
type ChunkBasicHeader ¶
type ChunkHandler ¶
type ChunkHandler struct {
// contains filtered or unexported fields
}
Chunk handler is in charge of reading chunk headers and data. It will assemble a message from multiple chunks if it has to.
func NewChunkHandler ¶
func NewChunkHandler(reader *bufio.Reader, writer *bufio.Writer) *ChunkHandler
func (*ChunkHandler) ReadChunkData ¶
func (chunkHandler *ChunkHandler) ReadChunkData(header ChunkHeader) (payload []byte, n int, err error)
func (*ChunkHandler) ReadChunkHeader ¶
func (chunkHandler *ChunkHandler) ReadChunkHeader() (ch ChunkHeader, n int, err error)
func (*ChunkHandler) SetBandwidth ¶
func (chunkHandler *ChunkHandler) SetBandwidth(size uint32, limitType uint8)
func (*ChunkHandler) SetChunkSize ¶
func (chunkHandler *ChunkHandler) SetChunkSize(size uint32)
func (*ChunkHandler) SetWindowAckSize ¶
func (chunkHandler *ChunkHandler) SetWindowAckSize(size uint32)
Sets the window acknowledgement size to the new size
type ChunkHeader ¶
type ChunkHeader struct { BasicHeader *ChunkBasicHeader MessageHeader *ChunkMessageHeader ExtendedTimestamp uint32 // Total elapsed time = timestamp + deltas ElapsedTime uint32 }
type ChunkMessageHeader ¶
type Client ¶
type Client struct { OnAudio AudioCallback OnMetadata MetadataCallback // contains filtered or unexported fields }
type ContextStore ¶
type ContextStore interface { RegisterPublisher(streamKey string) error DestroyPublisher(streamKey string) error RegisterSubscriber(streamKey string, subscriber Subscriber) error GetSubscribersForStream(streamKey string) ([]Subscriber, error) DestroySubscriber(streamKey string, sessionID string) error StreamExists(streamKey string) bool SetAvcSequenceHeaderForPublisher(streamKey string, payload []byte) GetAvcSequenceHeaderForPublisher(streamKey string) []byte SetAacSequenceHeaderForPublisher(streamKey string, payload []byte) GetAacSequenceHeaderForPublisher(streamKey string) []byte GetStreams() map[string][]Subscriber }
type Handshaker ¶
type Handshaker struct {
// contains filtered or unexported fields
}
func NewHandshaker ¶
func NewHandshaker(reader *bufio.Reader, writer *bufio.Writer) *Handshaker
func (*Handshaker) ClientHandshake ¶
func (h *Handshaker) ClientHandshake() error
func (*Handshaker) Handshake ¶
func (h *Handshaker) Handshake() error
type InMemoryContext ¶
type InMemoryContext struct { ContextStore // contains filtered or unexported fields }
func NewInMemoryContext ¶
func NewInMemoryContext() *InMemoryContext
func (*InMemoryContext) DestroyPublisher ¶
func (c *InMemoryContext) DestroyPublisher(streamKey string) error
func (*InMemoryContext) DestroySubscriber ¶
func (c *InMemoryContext) DestroySubscriber(streamKey string, sessionID string) error
func (*InMemoryContext) GetAacSequenceHeaderForPublisher ¶
func (c *InMemoryContext) GetAacSequenceHeaderForPublisher(streamKey string) []byte
func (*InMemoryContext) GetAvcSequenceHeaderForPublisher ¶
func (c *InMemoryContext) GetAvcSequenceHeaderForPublisher(streamKey string) []byte
func (*InMemoryContext) GetStreams ¶
func (c *InMemoryContext) GetStreams() map[string][]Subscriber
func (*InMemoryContext) GetSubscribersForStream ¶
func (c *InMemoryContext) GetSubscribersForStream(streamKey string) ([]Subscriber, error)
func (*InMemoryContext) RegisterPublisher ¶
func (c *InMemoryContext) RegisterPublisher(streamKey string) error
Registers the session in the broadcaster to keep a reference to all open subscribers
func (*InMemoryContext) RegisterSubscriber ¶
func (c *InMemoryContext) RegisterSubscriber(streamKey string, subscriber Subscriber) error
func (*InMemoryContext) SetAacSequenceHeaderForPublisher ¶
func (c *InMemoryContext) SetAacSequenceHeaderForPublisher(streamKey string, payload []byte)
func (*InMemoryContext) SetAvcSequenceHeaderForPublisher ¶
func (c *InMemoryContext) SetAvcSequenceHeaderForPublisher(streamKey string, payload []byte)
func (*InMemoryContext) StreamExists ¶
func (c *InMemoryContext) StreamExists(streamKey string) bool
type MediaServer ¶
type MediaServer interface {
// contains filtered or unexported methods
}
Media Server interface defines the callbacks that are called when a message is received by the server
type MessageManager ¶
type MessageManager struct {
// contains filtered or unexported fields
}
func NewMessageManager ¶
func NewMessageManager(session MediaServer, handshaker *Handshaker, chunkHandler *ChunkHandler) *MessageManager
func (*MessageManager) Initialize ¶
func (m *MessageManager) Initialize() error
Initialize performs the handshake with the client. It returns an error if the handshake was not successful. Initialize should not be called again for the remainder of the session. Calling Initialize more than once will result in an error. This method is used for servers only.
func (*MessageManager) InitializeClient ¶
func (m *MessageManager) InitializeClient() error
InitializeClient performs the handshake with the server. It returns an error if the handshake was not successful. InitializeClient should not be called again for the remainder of the session. Calling InitializeClient more than once will result in an error. This method is used for clients only.
func (*MessageManager) SetBandwidth ¶
func (m *MessageManager) SetBandwidth(size uint32, limitType uint8)
func (*MessageManager) SetChunkSize ¶
func (m *MessageManager) SetChunkSize(size uint32)
func (*MessageManager) SetWindowAckSize ¶
func (m *MessageManager) SetWindowAckSize(size uint32)
type MetadataCallback ¶
type MetadataCallback func(metadata map[string]interface{})
type RTMPMessage ¶
type RTMPMessage interface { RTMPMessageMarshaler RTMPMessageUnmarshaler }
type RTMPMessageMarshaler ¶
type RTMPMessageUnmarshaler ¶
type Reader ¶
func (*Reader) Read ¶
Read reads exactly len(p) bytes from the underlying bufio.Reader into p. It returns the number of bytes copied and an error if fewer bytes were read. The error is EOF only if no bytes were read. If an EOF happens after reading some but not all the bytes, Read returns ErrUnexpectedEOF. On return, n == len(buf) if and only if err == nil.
type Server ¶
type Server struct { Addr string // TODO: create Logger interface to not depend on zap directly Logger *zap.Logger Broadcaster *Broadcaster }
Server represents the RTMP server, where a client/app can stream media to. The server listens for incoming connections.
type Session ¶
type Session struct { MediaServer // Callbacks (for RTMP clients) OnAudio AudioCallback OnMetadata MetadataCallback // contains filtered or unexported fields }
Represents a connection made with the RTMP server where messages are exchanged between client/server.
func NewClientSession ¶
func NewClientSession(app string, tcUrl string, streamKey string, audioCallback AudioCallback, metadataCallback MetadataCallback) *Session
func NewSession ¶
func NewSession(logger *zap.Logger, b *Broadcaster) *Session
func (*Session) SendEndOfStream ¶
func (session *Session) SendEndOfStream()
func (*Session) SendMetadata ¶
func (*Session) Start ¶
Start performs the initial handshake and starts receiving streams of data. This is used for servers only. For clients, use StartPlayback().
func (*Session) StartPlayback ¶
type Subscriber ¶
type Subscriber interface { SendAudio(audio []byte, timestamp uint32) SendMetadata(metadata map[string]interface{}) GetID() string SendEndOfStream() }
A subscriber gets sent audio, video and data messages that flow in a particular stream (identified with streamKey)
type Writer ¶
func (*Writer) Write ¶
Write writes to the underlying bufio.Writer and calls its Flush method at the end. If an error occurs at the Write stage, the number of bytes written and the Write error is returned. If an error occurs at the Flush stage, the number of bytes written in the Write stage and the error that happened when flushing is returned.