rtmp

package
v0.0.0-...-928f415 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 1, 2022 License: MIT Imports: 20 Imported by: 0

Documentation

Index

Constants

View Source
const (
	Amf0TypeMarkerNumber     = uint8(0x00)
	Amf0TypeMarkerBoolean    = uint8(0x01)
	Amf0TypeMarkerString     = uint8(0x02)
	Amf0TypeMarkerObject     = uint8(0x03)
	Amf0TypeMarkerNull       = uint8(0x05)
	Amf0TypeMarkerEcmaArray  = uint8(0x08)
	Amf0TypeMarkerObjectEnd  = uint8(0x09)
	Amf0TypeMarkerLongString = uint8(0x0c)
)
View Source
const (
	CsidAmf   = 5
	CsidAudio = 6
	CsidVideo = 7
)
View Source
const (
	Msid1 = 1 // publish、play、onStatus 以及 音视频数据
)

Variables

View Source
var Amf0 amf0
View Source
var Amf0TypeMarkerObjectEndBytes = []byte{0, 0, Amf0TypeMarkerObjectEnd}
View Source
var (

	// LocalChunkSize
	//
	// 本端(包括Server Session和Client Session)设置的chunk size,本端发送数据时切割chunk包时使用
	// (对端发送数据时的chunk size由对端决定,和本变量没有关系)
	//
	// 注意,这个值不应该设置的太小,原因有两方面:
	// 1. 性能与带宽
	//    切割的chunk包过多,会消耗更多的CPU资源(包括本地和远端),另外还可能增加传输时的chunk header带宽消耗
	// 2. 兼容性
	//    理论上,信令也要参考chunk size切割成chunk包,而对端使用chunk包合成message的实现不一定标准。
	//    我就遇到过这样的case,对端认为rtmp握手后的几个信令,每个信令都只使用一个chunk。
	//    假如我们将一条信令切割成多个chunk,对端可能就解析错误了,这属于对端实现的问题。
	//    但为了更好的兼容性,我们不要将chunk size设置的太小。
	//
	LocalChunkSize = 4096
)

Functions

func BuildMetadata

func BuildMetadata(width int, height int, audiocodecid int, videocodecid int) ([]byte, error)

BuildMetadata spec-video_file_format_spec_v10.pdf onMetaData - duration DOUBLE, seconds - width DOUBLE - height DOUBLE - videodatarate DOUBLE - framerate DOUBLE - videocodecid DOUBLE - audiosamplerate DOUBLE - audiosamplesize DOUBLE - stereo BOOL - audiocodecid DOUBLE - filesize DOUBLE, bytes

目前包含的字段: - width - height - audiocodecid - videocodecid - version

@param width 如果为-1,则metadata中不写入该字段 @param height 如果为-1,则metadata中不写入该字段 @param audiocodecid 如果为-1,则metadata中不写入该字段

AAC 10

@param videocodecid 如果为-1,则metadata中不写入该字段

H264 7
H265 12

@return 返回的内存块为新申请的独立内存块

func Message2Chunks

func Message2Chunks(message []byte, header *base.RtmpHeader) []byte

Message2Chunks @return 返回的内存块由内部申请,不依赖参数<message>内存块

func MetadataEnsureWithSdf

func MetadataEnsureWithSdf(b []byte) ([]byte, error)

MetadataEnsureWithSdf

确保metadata中包含@setDataFrame

@return 返回的内存块为内部独立申请

func MetadataEnsureWithoutSdf

func MetadataEnsureWithoutSdf(b []byte) ([]byte, error)

MetadataEnsureWithoutSdf

确保metadata中不包含@setDataFrame

@return 返回的内存块为内部独立申请

Types

type AuthInfo

type AuthInfo struct {
	// contains filtered or unexported fields
}

type Buffer

type Buffer struct {
	// contains filtered or unexported fields
}

func NewBuffer

func NewBuffer(n int) *Buffer

func (*Buffer) Bytes

func (b *Buffer) Bytes() []byte

func (*Buffer) Len

func (b *Buffer) Len() int

func (*Buffer) ModWritePos

func (b *Buffer) ModWritePos(pos int)

func (*Buffer) Reset

func (b *Buffer) Reset()

func (*Buffer) Write

func (b *Buffer) Write(p []byte) (n int, err error)

func (*Buffer) WriteByte

func (b *Buffer) WriteByte(c byte) error

func (*Buffer) WriteTo

func (b *Buffer) WriteTo(w io.Writer) (n int64, err error)

type ChunkComposer

type ChunkComposer struct {
	// contains filtered or unexported fields
}

ChunkComposer

读取chunk,并合并chunk,生成message返回给上层

func NewChunkComposer

func NewChunkComposer() *ChunkComposer

func (*ChunkComposer) RunLoop

func (c *ChunkComposer) RunLoop(reader io.Reader, cb OnCompleteMessage) error

RunLoop 将rtmp chunk合并为message

@param cb: stream.msg: 注意,回调结束后,`msg`的内存块会被`ChunkComposer`重复使用

            也即多次回调的`msg`是复用的同一块内存块
            如果业务方需要在回调结束后,依然持有`msg`,那么需要对`msg`进行拷贝
            只在回调中使用`msg`,则不需要拷贝

cb return:  如果cb返回的error不为nil,则`RunLoop`停止阻塞,并返回这个错误

@return 阻塞直到发生错误

TODO chef: msglen支持最大阈值,超过可以认为对端是非法的

func (*ChunkComposer) SetPeerChunkSize

func (c *ChunkComposer) SetPeerChunkSize(val uint32)

func (*ChunkComposer) SetReuseBufferFlag

func (c *ChunkComposer) SetReuseBufferFlag(val bool)

type ChunkDivider

type ChunkDivider struct {
	// contains filtered or unexported fields
}

func (*ChunkDivider) Message2Chunks

func (d *ChunkDivider) Message2Chunks(message []byte, header *base.RtmpHeader) []byte

Message2Chunks TODO chef: 新的 message 的第一个 chunk 始终使用 fmt0 格式,没有参考前一个 message

type ClientSession

type ClientSession struct {
	// contains filtered or unexported fields
}

ClientSession rtmp 客户端类型连接的底层实现 package rtmp 的使用者应该优先使用基于 ClientSession 实现的 PushSession 和 PullSession

func NewClientSession

func NewClientSession(sessionType base.SessionType, modOptions ...ModClientSessionOption) *ClientSession

NewClientSession @param t: session的类型,只能是推或者拉

func (*ClientSession) AppName

func (s *ClientSession) AppName() string

func (*ClientSession) Dispose

func (s *ClientSession) Dispose() error

Dispose 文档请参考: IClientSessionLifecycle interface

func (*ClientSession) Do

func (s *ClientSession) Do(rawUrl string) error

Do 阻塞直到收到服务端返回的 publish / play 对应结果的信令或者发生错误

func (*ClientSession) Flush

func (s *ClientSession) Flush() error

func (*ClientSession) GetStat

func (s *ClientSession) GetStat() base.StatSession

func (*ClientSession) IsAlive

func (s *ClientSession) IsAlive() (readAlive, writeAlive bool)

func (*ClientSession) RawQuery

func (s *ClientSession) RawQuery() string

func (*ClientSession) StreamName

func (s *ClientSession) StreamName() string

func (*ClientSession) UniqueKey

func (s *ClientSession) UniqueKey() string

func (*ClientSession) UpdateStat

func (s *ClientSession) UpdateStat(intervalSec uint32)

func (*ClientSession) Url

func (s *ClientSession) Url() string

func (*ClientSession) WaitChan

func (s *ClientSession) WaitChan() <-chan error

WaitChan 文档请参考: IClientSessionLifecycle interface

func (*ClientSession) Write

func (s *ClientSession) Write(msg []byte) error

type ClientSessionOption

type ClientSessionOption struct {
	// 单位毫秒,如果为0,则没有超时
	DoTimeoutMs      int // 从发起连接(包含了建立连接的时间)到收到publish或play信令结果的超时
	ReadAvTimeoutMs  int // 读取音视频数据的超时
	WriteAvTimeoutMs int // 发送音视频数据的超时

	ReadBufSize   int // io层读取音视频数据时的缓冲大小,如果为0,则没有缓冲
	WriteBufSize  int // io层发送音视频数据的缓冲大小,如果为0,则没有缓冲
	WriteChanSize int // io层发送音视频数据的异步队列大小,如果为0,则同步发送

	HandshakeComplexFlag bool // 握手是否使用复杂模式

	PeerWinAckSize int

	ReuseReadMessageBufferFlag bool // 接收Message时,是否重用内存块
}

type HandshakeClientComplex

type HandshakeClientComplex struct {
	// contains filtered or unexported fields
}

func (*HandshakeClientComplex) ReadS0S1

func (c *HandshakeClientComplex) ReadS0S1(reader io.Reader) error

func (*HandshakeClientComplex) ReadS2

func (c *HandshakeClientComplex) ReadS2(reader io.Reader) error

func (*HandshakeClientComplex) WriteC0C1

func (c *HandshakeClientComplex) WriteC0C1(writer io.Writer) error

func (*HandshakeClientComplex) WriteC2

func (c *HandshakeClientComplex) WriteC2(writer io.Writer) error

type HandshakeClientSimple

type HandshakeClientSimple struct {
	// contains filtered or unexported fields
}

func (*HandshakeClientSimple) ReadS0S1

func (c *HandshakeClientSimple) ReadS0S1(reader io.Reader) error

func (*HandshakeClientSimple) ReadS2

func (c *HandshakeClientSimple) ReadS2(reader io.Reader) error

func (*HandshakeClientSimple) WriteC0C1

func (c *HandshakeClientSimple) WriteC0C1(writer io.Writer) error

func (*HandshakeClientSimple) WriteC2

func (c *HandshakeClientSimple) WriteC2(writer io.Writer) error

type HandshakeServer

type HandshakeServer struct {
	// contains filtered or unexported fields
}

func (*HandshakeServer) ReadC0C1

func (s *HandshakeServer) ReadC0C1(reader io.Reader) (err error)

func (*HandshakeServer) ReadC2

func (s *HandshakeServer) ReadC2(reader io.Reader) error

func (*HandshakeServer) WriteS0S1S2

func (s *HandshakeServer) WriteS0S1S2(writer io.Writer) error

type IHandshakeClient

type IHandshakeClient interface {
	WriteC0C1(writer io.Writer) error
	ReadS0S1(reader io.Reader) error
	WriteC2(writer io.Writer) error
	ReadS2(reader io.Reader) error
}

type IPubSessionObserver

type IPubSessionObserver interface {
	// OnReadRtmpAvMsg 注意,回调结束后,内部会复用Payload内存块
	OnReadRtmpAvMsg(msg base.RtmpMsg)
}

type IServerObserver

type IServerObserver interface {
	OnRtmpConnect(session *ServerSession, opa ObjectPairArray)

	// OnNewRtmpPubSession
	//
	// 上层代码应该在这个事件回调中注册音视频数据的监听
	//
	// @return 上层如果想关闭这个session,则回调中返回不为nil的error值
	//
	OnNewRtmpPubSession(session *ServerSession) error

	// OnDelRtmpPubSession
	//
	// 注意,如果session是上层通过 OnNewRtmpPubSession 回调的返回值关闭的,则该session不再触发这个逻辑
	//
	OnDelRtmpPubSession(session *ServerSession)

	OnNewRtmpSubSession(session *ServerSession) error
	OnDelRtmpSubSession(session *ServerSession)
}

type IServerSessionObserver

type IServerSessionObserver interface {
	OnRtmpConnect(session *ServerSession, opa ObjectPairArray)

	// OnNewRtmpPubSession
	//
	// 上层代码应该在这个事件回调中注册音视频数据的监听
	//
	// @return 上层如果想关闭这个session,则回调中返回不为nil的error值
	//
	OnNewRtmpPubSession(session *ServerSession) error

	OnNewRtmpSubSession(session *ServerSession) error
}

type MessagePacker

type MessagePacker struct {
	// contains filtered or unexported fields
}

MessagePacker 打包并发送 rtmp 信令

func NewMessagePacker

func NewMessagePacker() *MessagePacker

func (*MessagePacker) ChunkAndWrite

func (packer *MessagePacker) ChunkAndWrite(writer io.Writer, csid int, typeid uint8, streamid int) error

type ModClientSessionOption

type ModClientSessionOption func(option *ClientSessionOption)

type ModPullSessionOption

type ModPullSessionOption func(option *PullSessionOption)

type ModPushSessionOption

type ModPushSessionOption func(option *PushSessionOption)

type ObjectPair

type ObjectPair struct {
	Key   string
	Value interface{} // TODO(chef): [perf] 考虑换成泛型 202206
}

type ObjectPairArray

type ObjectPairArray []ObjectPair

func ParseMetadata

func ParseMetadata(b []byte) (ObjectPairArray, error)

func (ObjectPairArray) DebugString

func (o ObjectPairArray) DebugString() string

func (ObjectPairArray) Find

func (o ObjectPairArray) Find(key string) interface{}

func (ObjectPairArray) FindNumber

func (o ObjectPairArray) FindNumber(key string) (int, error)

func (ObjectPairArray) FindString

func (o ObjectPairArray) FindString(key string) (string, error)

type OnCompleteMessage

type OnCompleteMessage func(stream *Stream) error

type OnReadRtmpAvMsg

type OnReadRtmpAvMsg func(msg base.RtmpMsg)

type PullSession

type PullSession struct {
	// contains filtered or unexported fields
}

func NewPullSession

func NewPullSession(modOptions ...ModPullSessionOption) *PullSession

func (*PullSession) AppName

func (s *PullSession) AppName() string

AppName 文档请参考: interface ISessionUrlContext

func (*PullSession) Dispose

func (s *PullSession) Dispose() error

Dispose 文档请参考: IClientSessionLifecycle interface

func (*PullSession) GetStat

func (s *PullSession) GetStat() base.StatSession

GetStat 文档请参考: interface ISessionStat

func (*PullSession) IsAlive

func (s *PullSession) IsAlive() (readAlive, writeAlive bool)

IsAlive 文档请参考: interface ISessionStat

func (*PullSession) Pull

func (s *PullSession) Pull(rawUrl string) error

Pull 阻塞直到和对端完成拉流前的所有准备工作(也即收到RTMP Play response),或者发生错误

func (*PullSession) RawQuery

func (s *PullSession) RawQuery() string

RawQuery 文档请参考: interface ISessionUrlContext

func (*PullSession) StreamName

func (s *PullSession) StreamName() string

StreamName 文档请参考: interface ISessionUrlContext

func (*PullSession) UniqueKey

func (s *PullSession) UniqueKey() string

UniqueKey 文档请参考: interface IObject

func (*PullSession) UpdateStat

func (s *PullSession) UpdateStat(intervalSec uint32)

UpdateStat 文档请参考: interface ISessionStat

func (*PullSession) Url

func (s *PullSession) Url() string

Url 文档请参考: interface ISessionUrlContext

func (*PullSession) WaitChan

func (s *PullSession) WaitChan() <-chan error

WaitChan 文档请参考: IClientSessionLifecycle interface

func (*PullSession) WithOnPullSucc

func (s *PullSession) WithOnPullSucc(onPullResult func()) *PullSession

WithOnPullSucc Pull成功

如果你想保证绝对时序,在 WithOnReadRtmpAvMsg 回调音视频数据前,做一些操作,那么使用这个回调替代 Pull 返回成功

func (*PullSession) WithOnReadRtmpAvMsg

func (s *PullSession) WithOnReadRtmpAvMsg(onReadRtmpAvMsg OnReadRtmpAvMsg) *PullSession

WithOnReadRtmpAvMsg

@param onReadRtmpAvMsg:

msg: 关于内存块的说明:
  ReuseReadMessageBufferFlag 为true时:
    回调结束后,`msg`的内存块会被`PullSession`重复使用。
    也即多次回调的`msg`是复用的同一块内存块。
    如果业务方需要在回调结束后,依然持有`msg`,那么需要对`msg`进行拷贝,比如调用`msg.Clone()`。
    只在回调中使用`msg`,则不需要拷贝。
  ReuseReadMessageBufferFlag 为false时:
    回调接收后,`PullSession`不再使用该内存块。
    业务方可以自由持有释放该内存块。

type PullSessionOption

type PullSessionOption struct {
	// PullTimeoutMs
	//
	// 从调用Pull函数,到接收音视频数据的前一步,也即收到服务端返回的rtmp play对应结果的信令的超时时间
	// 如果为0,则没有超时时间
	//
	PullTimeoutMs int

	ReadAvTimeoutMs            int
	ReadBufSize                int // io层读取音视频数据时的缓冲大小,如果为0,则没有缓冲
	HandshakeComplexFlag       bool
	PeerWinAckSize             int
	ReuseReadMessageBufferFlag bool // 接收Message时,是否复用内存块
}

type PushSession

type PushSession struct {
	IsFresh bool
	// contains filtered or unexported fields
}

func NewPushSession

func NewPushSession(modOptions ...ModPushSessionOption) *PushSession

func (*PushSession) AppName

func (s *PushSession) AppName() string

AppName 文档请参考: interface ISessionUrlContext

func (*PushSession) Dispose

func (s *PushSession) Dispose() error

Dispose 文档请参考: IClientSessionLifecycle interface

func (*PushSession) Flush

func (s *PushSession) Flush() error

Flush 将缓存的数据立即刷新发送 是否有缓存策略,请参见配置及内部实现

func (*PushSession) GetStat

func (s *PushSession) GetStat() base.StatSession

GetStat 文档请参考: interface ISessionStat

func (*PushSession) IsAlive

func (s *PushSession) IsAlive() (readAlive, writeAlive bool)

IsAlive 文档请参考: interface ISessionStat

func (*PushSession) Push

func (s *PushSession) Push(rawUrl string) error

Push 阻塞直到和对端完成推流前,握手部分的工作(也即收到RTMP Publish response),或者发生错误

func (*PushSession) RawQuery

func (s *PushSession) RawQuery() string

RawQuery 文档请参考: interface ISessionUrlContext

func (*PushSession) StreamName

func (s *PushSession) StreamName() string

StreamName 文档请参考: interface ISessionUrlContext

func (*PushSession) UniqueKey

func (s *PushSession) UniqueKey() string

UniqueKey 文档请参考: interface IObject

func (*PushSession) UpdateStat

func (s *PushSession) UpdateStat(intervalSec uint32)

UpdateStat 文档请参考: interface ISessionStat

func (*PushSession) Url

func (s *PushSession) Url() string

Url 文档请参考: interface ISessionUrlContext

func (*PushSession) WaitChan

func (s *PushSession) WaitChan() <-chan error

WaitChan 文档请参考: IClientSessionLifecycle interface

func (*PushSession) Write

func (s *PushSession) Write(msg []byte) error

Write 发送数据

@param msg: 注意,`msg`数据应该是已经打包成rtmp chunk格式的数据。这里的数据就对应socket发送的数据,内部不会再修改数据内容。

type PushSessionOption

type PushSessionOption struct {
	// 从调用Push函数,到可以发送音视频数据的前一步,也即收到服务端返回的rtmp publish对应结果的信令的超时时间
	// 如果为0,则没有超时时间
	PushTimeoutMs int

	WriteAvTimeoutMs     int
	WriteBufSize         int // io层发送音视频数据的缓冲大小,如果为0,则没有缓冲
	WriteChanSize        int // io层发送音视频数据的异步队列大小,如果为0,则同步发送
	HandshakeComplexFlag bool
}

type Server

type Server struct {
	// contains filtered or unexported fields
}

func NewServer

func NewServer(addr string, observer IServerObserver) *Server

func (*Server) Dispose

func (server *Server) Dispose()

func (*Server) Listen

func (server *Server) Listen() (err error)

func (*Server) OnNewRtmpPubSession

func (server *Server) OnNewRtmpPubSession(session *ServerSession) error

func (*Server) OnNewRtmpSubSession

func (server *Server) OnNewRtmpSubSession(session *ServerSession) error

func (*Server) OnRtmpConnect

func (server *Server) OnRtmpConnect(session *ServerSession, opa ObjectPairArray)

func (*Server) RunLoop

func (server *Server) RunLoop() error

type ServerSession

type ServerSession struct {

	// IsFresh ShouldWaitVideoKeyFrame
	//
	// 只有sub类型需要
	//
	// IsFresh
	//  表示是新加入的session,需要新发送meta,vsh,ash以及gop等数据,再转发实时数据。
	//
	// ShouldWaitVideoKeyFrame
	//  表示是新加入的session,正在等待视频关键帧。
	//  注意,需要考虑没有纯音频流的场景。
	//
	IsFresh                 bool
	ShouldWaitVideoKeyFrame bool

	DisposeByObserverFlag bool
	// contains filtered or unexported fields
}

func NewServerSession

func NewServerSession(observer IServerSessionObserver, conn net.Conn) *ServerSession

func (*ServerSession) AppName

func (s *ServerSession) AppName() string

func (*ServerSession) Dispose

func (s *ServerSession) Dispose() error

func (*ServerSession) Flush

func (s *ServerSession) Flush() error

func (*ServerSession) GetStat

func (s *ServerSession) GetStat() base.StatSession

func (*ServerSession) IsAlive

func (s *ServerSession) IsAlive() (readAlive, writeAlive bool)

func (*ServerSession) RawQuery

func (s *ServerSession) RawQuery() string

func (*ServerSession) RunLoop

func (s *ServerSession) RunLoop() (err error)

func (*ServerSession) SetPubSessionObserver

func (s *ServerSession) SetPubSessionObserver(observer IPubSessionObserver)

func (*ServerSession) StreamName

func (s *ServerSession) StreamName() string

func (*ServerSession) UniqueKey

func (s *ServerSession) UniqueKey() string

func (*ServerSession) UpdateStat

func (s *ServerSession) UpdateStat(intervalSec uint32)

func (*ServerSession) Url

func (s *ServerSession) Url() string

func (*ServerSession) Write

func (s *ServerSession) Write(msg []byte) error

func (*ServerSession) Writev

func (s *ServerSession) Writev(msgs net.Buffers) error

type ServerSessionType

type ServerSessionType int
const (
	ServerSessionTypeUnknown ServerSessionType = iota // 收到客户端的publish或者play信令之前的类型状态
	ServerSessionTypePub
	ServerSessionTypeSub
)

type Stream

type Stream struct {
	// contains filtered or unexported fields
}

func NewStream

func NewStream() *Stream

type StreamMsg

type StreamMsg struct {
	// contains filtered or unexported fields
}

func (*StreamMsg) Flush

func (msg *StreamMsg) Flush(n uint32)

func (*StreamMsg) Grow

func (msg *StreamMsg) Grow(n uint32)

Grow 确保可写空间,如果不够会扩容

func (*StreamMsg) Len

func (msg *StreamMsg) Len() uint32

func (*StreamMsg) Reset

func (msg *StreamMsg) Reset()

func (*StreamMsg) ResetAndFree

func (msg *StreamMsg) ResetAndFree()

func (*StreamMsg) Skip

func (msg *StreamMsg) Skip(n uint32)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL