engine

package module
v0.0.0-...-b128a35 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 19, 2024 License: MIT Imports: 40 Imported by: 0

README

m7s v4核心引擎

该项目为m7s的引擎部分,该部分逻辑是流媒体服务器的核心转发逻辑。包含了一个插件的引入机制,其他功能均由插件实现

引擎的基本功能

  • 提供插件机制,对插件的启动,配置解析,事件派发等进行统一管理
  • 提供H264、H265、AAC、G711格式的转发
  • 提供可复用的AVCC格式、RTP格式、AnnexB格式、ADTS格式等预封装机制
  • 提供多Track机制,支持大小流,加密流扩展
  • 提供DataTrack机制,可用于实现房间文字聊天等功能
  • 提供时间戳同步机制,限速机制
  • 提供RTP包乱序重排机制
  • 提供订阅者追帧跳帧机制
  • 提供发布订阅对外推拉的基础架构
  • 提供鉴权机制的底层架构支持
  • 提供内存复用机制
  • 提供发布者断线重连机制
  • 提供按需拉流机制
  • 提供HTTP服务端口公用机制
  • 提供HTTP API接口自动注册机制
  • 提供HTTP接口中间件机制
  • 提供结构化日志
  • 提供流信息统计和输出
  • 提供事件总线机制,可以对所有插件广播事件
  • 提供配置热更新机制

引擎自带HTTP接口

  • 获取某一个流的详情 /api/stream?streamPath=xxx
  • 终止某一个流 /api/closestream?streamPath=xxx
  • 获取engine信息 /api/sysInfo 返回值{Version:xxx,StartTime:xxx,IP:[xxx.xxx.xxx.xxx]}
  • 获取系统基本情况 /api/summary 返回值Summary数据
  • 获取所有插件信息 /api/plugins 返回值Plugin数据
  • 读取mp4文件再次发布为视频流 /api/replay/mp4?streamPath=xxx&dump=filepath filepath是文件路径
  • 读取ts文件再次发布为视频流 /api/replay/ts?streamPath=xxx&dump=filepath filepath是文件路径
  • 获取指定的配置信息 /api/getconfig?name=xxx 返回xxx插件的配置信息,如果不带参数或参数为空则返回全局配置
  • 修改并保存配置信息 /api/modifyconfig?name=xxx&yaml=1 修改xxx插件的配置信息,在请求的body中传入修改后的配置yaml字符串
  • 热更新配置信息 /api/updateconfig?name=xxx 热更新xxx插件的配置信息,如果不带参数或参数为空则热更新全局配置
  • 获取所有远端拉流信息 /api/list/pull 返回{RemoteURL:"",StreamPath:"",Type:"",StartTime:""}
  • 获取所有向远端推流信息 /api/list/push 返回{RemoteURL:"",StreamPath:"",Type:"",StartTime:""}
  • 停止推流 /api/stop/push?url=xxx 停止向xxx推流 ,成功返回ok
  • 停止某个订阅者 /api/stop/subscribe?streamPath=xxx&id=xxx 停止xxx流的xxx订阅者 ,成功返回ok
  • 插入SEI帧 /api/insertsei?streamPath=xxx&type=5 向xxx流内插入SEI帧 ,成功返回ok。type为SEI类型,可选,默认是5

引擎默认配置

global:
  disableall: false # 是否禁用所有插件
  loglang: zh # 日志语言,可选值:zh,en
  loglevel: info # 日志级别,可选值:debug,info,warn,error,panic,fatal
  http:
    listenaddr: :8080 # 网关地址,用于访问API
    listenaddrtls: :8443  # 用于HTTPS方式访问API的端口配置
    certfile: ""
    keyfile: ""
    cors: true  # 是否自动添加cors头
    username: ""  # 用户名和密码,用于API访问时的基本身份认证
    password: ""
    readtimeout: 0 # 读取超时时间,0为不限制
    writetimeout: 0 # 写入超时时间,0为不限制
    idletimeout: 0 # 空闲超时时间,0为不限制
  publish:
      pubaudio: true # 是否发布音频流
      pubvideo: true # 是否发布视频流
      kickexist: false # 剔出已经存在的发布者,用于顶替原有发布者
      insertsei: false # 是否开启插入SEI信息功能
      publishtimeout: 10s # 发布流默认过期时间,超过该时间发布者没有恢复流将被删除
      delayclosetimeout: 0 # 自动关闭触发后延迟的时间(期间内如果有新的订阅则取消触发关闭),0为关闭该功能,保持连接。
      waitclosetimeout: 0 # 发布者断开后等待时间,超过该时间发布者没有恢复流将被删除,0为关闭该功能,由订阅者决定是否删除
      buffertime: 0 # 缓存时间,用于时光回溯,0为关闭缓存
      idletimeout: 0 # 空闲超时时间,0为不限制
      speedlimit: 500ms # 限速超时时间 0为不限速,对于读取文件这类流需要限速,否则读取过快(如果流的时间戳不正确,则只能关闭该功能:设置为0)
      key:                      # 发布鉴权key
	    secretargname: secret     # 发布鉴权参数名
	    expireargname:   expire   # 发布鉴权失效时间参数名
  subscribe:
      subaudio: true # 是否订阅音频流
      subvideo: true # 是否订阅视频流
      subaudioargname: ats # 订阅音频轨道参数名
      subvideoargname: vts # 订阅视频轨道参数名
      subdataargname: dts # 订阅数据轨道参数名
      subaudiotracks: [] # 订阅音频轨道名称列表
      subvideotracks: [] # 订阅视频轨道名称列表
      submode: 0 # 订阅模式,0为跳帧追赶模式,1为不追赶(多用于录制),2为时光回溯模式
      syncmode: 0 # 音视频同步模式,0按照时间戳同步,1按照写入时间同步(在时间戳不正确的时候)
      iframeonly: false # 只订阅关键帧
      waittimeout: 10s # 等待发布者的超时时间,用于订阅尚未发布的流
      writebuffersize: 0 # 订阅者写缓存大小,用于减少io次数,但可能影响实时性
      key:                      # 订阅鉴权key
	    secretargname: secret     # 订阅鉴权参数名
	    expireargname:   expire   # 订阅鉴权失效时间参数名
      internal: false # 是否内部订阅,内部订阅不会触发发布者自动断开功能
  enableavcc : true  # 启用AVCC格式缓存,用于rtmp协议
  enablertp : true # 启用rtp格式缓存,用于rtsp、websocket、gb28181协议
  enableauth: true # 启用鉴权,详细查看鉴权机制
  enablesubevent: true # 启用订阅事件,用于订阅者上下线事件,关闭可以提高性能
  rtpreoderbufferlen: 50 # rtp乱序重排缓存长度
  eventbussize: 10 # 事件总线缓存大小,事件较多时容易堵阻塞线程,需要增大缓存
  poolsize: 0 # 内存池大小,0为不使用内存池
  pulseinterval: 5s # 心跳事件间隔时间
  console: 
    server : console.monibuca.com:44944 # 连接远程控制台的地址
    secret: "" # 远程控制台的秘钥
    publicaddr: "" # 实例公网地址,提供远程控制台访问的地址,不配置的话使用自动识别的地址
    publicaddrtls: "" # 实例公网地址,提供远程控制台访问的地址,不配置的话使用自动识别的地址(https)

配置覆盖机制

  • 如果不存在配置文件,将使用默认配置,该配置值为代码中写死的配置值
  • 如果存在配置文件,则使用配置文件中的值覆盖默认值
  • http、publish、subscribe三个配置遵循优先级顺序
  1. 如果发布流或者订阅流中包含对应的参数,则优先使用
  2. 其次,查找对应插件的配置项中是否包含配置项
  3. 最后,使用全局配置中的配置

流的状态图

stateDiagram-v2
    [*] --> ⌛等待发布者 : 创建
    ⌛等待发布者 --> 🟢正在发布 :发布
    ⌛等待发布者 --> 🔴已关闭 :关闭
    ⌛等待发布者 --> 🔴已关闭  :超时
    ⌛等待发布者 --> 🔴已关闭  :最后订阅者离开
    🟢正在发布 --> ⌛等待发布者: 发布者断开
    🟢正在发布 --> 🟡等待关闭: 最后订阅者离开
    🟢正在发布 --> 🔴已关闭  :关闭
    🟡等待关闭 --> 🟢正在发布 :第一个订阅者进入
    🟡等待关闭 --> 🔴已关闭  :关闭
    🟡等待关闭 --> 🔴已关闭  :超时
    🟡等待关闭 --> 🔴已关闭  :发布者断开

鉴权机制

默认鉴权

在publish 和 subscribe 中配置 key 引擎会自动进行鉴权, 推流或者拉流时需要在url中添加参数 secret=xxx&expire=xxx。

  • secret为鉴权前面,MD5(key+StreamPath+expire)
  • expire为鉴权失效时间,格式是十六进制 UNIX 时间戳
时间戳计算
设置时间:2018.12.01 08:30:00
十进制 UNIX 时间戳:1543624200
十六进制 UNIX 时间戳:5C01D608(云直播鉴权配置使用十六进制 UNIX 时间戳,十六进制不区分字母大小写)
鉴权签名计算
secret = MD5(key+StreamPath+expire) 
secret = MD5(ngoeiq03+test/01+5C01D608)
secret = MD5(ngoeiq03test/015C01D608)
secret = ce797dc6238156d548ef945e6ad1ea20

单独鉴权

如果需要自定义鉴权,可以在插件中实现鉴权接口, 引擎中定义如下两个接口,插件中的发布者或者订阅者可以实现这两个接口,引擎会在发布或者订阅时调用这两个接口进行鉴权

type AuthSub interface {
	OnAuth(*util.Promise[ISubscriber]) error
}

type AuthPub interface {
	OnAuth(*util.Promise[IPublisher]) error
}
  • OnAuth返回错误即鉴权失败
  • Promise方便异步鉴权,可以后续调用其Resolve或Reject方法进行鉴权结果的返回

全局鉴权

自定义鉴权也可以全局生效, 引擎中定义如下两个全局函数的变量,插件中可以对这两个变量进行赋值,引擎会在发布或者订阅时调用这两个接口进行鉴权

var OnAuthSub func(p *util.Promise[ISubscriber]) error
var OnAuthPub func(p *util.Promise[IPublisher]) error

** 注意:如果单独鉴权和全局鉴权同时存在,优先使用单独鉴权 ** ** 全局鉴权函数可以被多次覆盖,所以需要自己实现鉴权逻辑的合并 **

Http中间件

在HTTPConfig接口中增加了AddMiddleware方法,可以通过该方法添加中间件,中间件的定义如下

type Middleware func(string, http.Handler) http.Handler
type HTTPConfig interface {
	GetHTTPConfig() *HTTP
	Listen(ctx context.Context) error
	Handle(string, http.Handler)
	AddMiddleware(Middleware)
}

中间件的添加必须在FirstConfig之前,也就是在Listen之前 例如:

type MyMiddlewareConfig struct {
  	config.HTTP
}
var myMiddlewareConfig = &MyMiddlewareConfig{}
func init(){
  myMiddlewareConfig.AddMiddleware(func(pattern string, handler http.Handler) http.Handler {
    return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
      // do something
      handler.ServeHTTP(w, r)
    })
  })
}

Documentation

Index

Constants

View Source
const (
	NO_SUCH_CONIFG = "no such config"
	NO_SUCH_STREAM = "no such stream"
)
View Source
const (
	SUBTYPE_RAW = iota
	SUBTYPE_RTP
	SUBTYPE_FLV
)
View Source
const (
	SUBSTATE_INIT = iota
	SUBSTATE_FIRST
	SUBSTATE_NORMAL
)

Variables

View Source
var (
	ErrDuplicatePublish = errors.New("Duplicate Publish")
	ErrBadStreamName    = errors.New("StreamPath Illegal")
	ErrBadTrackName     = errors.New("Track Already Exist")
	ErrTrackMute        = errors.New("Track Mute")
	ErrStreamIsClosed   = errors.New("Stream Is Closed")
	ErrPublisherLost    = errors.New("Publisher Lost")
	ErrAuth             = errors.New("Auth Failed")
	OnAuthSub           func(p *util.Promise[ISubscriber]) error
	OnAuthPub           func(p *util.Promise[IPublisher]) error
)
View Source
var (
	SysInfo struct {
		StartTime time.Time //启动时间
		LocalIP   string
		Version   string
	}
	ExecPath = os.Args[0]
	ExecDir  = filepath.Dir(ExecPath)
	// ConfigRaw 配置信息的原始数据
	ConfigRaw []byte
	Plugins   = make(map[string]*Plugin) // Plugins 所有的插件配置

	EngineConfig = &GlobalConfig{}
	Engine       = InstallPlugin(EngineConfig)
	SettingDir   = filepath.Join(ExecDir, ".m7s")           //配置缓存目录,该目录按照插件名称作为文件名存储修改过的配置
	MergeConfigs = []string{"Publish", "Subscribe", "HTTP"} //需要合并配置的属性项,插件若没有配置则使用全局配置
	EventBus     chan any
)
View Source
var ActionNames = [...]string{"publish", "track available", "timeout", "publish close", "close", "last leave", "first enter", "no tracks"}
View Source
var ErrNoPullConfig = errors.New("no pull config")
View Source
var ErrNoPushConfig = errors.New("no push config")
View Source
var ErrStreamNotExist = errors.New("stream not exist")
View Source
var Pullers sync.Map
View Source
var Pushers sync.Map
View Source
var StateNames = [...]string{"⌛", "🟡", "🟢", "🟠", "🔴"}
View Source
var Streams util.Map[string, *Stream]

Streams 所有的流集合

Functions

func EmitEvent

func EmitEvent[T any](event T)

func InviteTrack

func InviteTrack(name string, suber ISubscriber)

func ListenEvent

func ListenEvent[T any](handler func(event T))

func Run

func Run(ctx context.Context, conf any) (err error)

Run 启动Monibuca引擎,传入总的Context,可用于关闭所有

func TryInvitePublish

func TryInvitePublish(streamPath string)

Types

type AddTrackEvent

type AddTrackEvent struct {
	Event[common.Track]
}

type AudioDeConf

type AudioDeConf []byte

AVCC 格式的序列帧

func (AudioDeConf) WithOutRTMP

func (a AudioDeConf) WithOutRTMP() []byte

type AudioFrame

type AudioFrame struct {
	*AVFrame
	*track.Audio
	AbsTime uint32
	PTS     uint32
	DTS     uint32
}

func (AudioFrame) GetADTS

func (a AudioFrame) GetADTS() (r net.Buffers)

func (AudioFrame) WriteRawTo

func (a AudioFrame) WriteRawTo(w io.Writer) (n int64, err error)

type AudioRTP

type AudioRTP RTPFrame

type AuthPub

type AuthPub interface {
	OnAuth(*util.Promise[IPublisher]) error
}

type AuthSub

type AuthSub interface {
	OnAuth(*util.Promise[ISubscriber]) error
}

type ClientConfig

type ClientConfig interface {
	config.Pull | config.Push
}

type ClientIO

type ClientIO[C ClientConfig] struct {
	Config         *C
	StreamPath     string // 本地流标识
	RemoteURL      string // 远程服务器地址(用于推拉)
	ReConnectCount int    //重连次数
}

ClientIO 作为Client角色(Puller,Pusher)的公共结构体

type DefaultYaml

type DefaultYaml string

type ErrorEvent

type ErrorEvent struct {
	Event[any]
	Error error
}

ErrorEvent 错误事件

type Event

type Event[T any] struct {
	Time   time.Time
	Target T `json:"-" yaml:"-"`
}

func CreateEvent

func CreateEvent[T any](target T) (event Event[T])

type FLVFrame

type FLVFrame net.Buffers

func (FLVFrame) IsAudio

func (f FLVFrame) IsAudio() bool

func (FLVFrame) IsVideo

func (f FLVFrame) IsVideo() bool

func (FLVFrame) WriteTo

func (f FLVFrame) WriteTo(w io.Writer) (int64, error)

type FirstConfig

type FirstConfig *config.Config

type GlobalConfig

type GlobalConfig struct {
	config.Engine
}

func (*GlobalConfig) API_closeStream

func (conf *GlobalConfig) API_closeStream(w http.ResponseWriter, r *http.Request)

func (*GlobalConfig) API_getConfig

func (conf *GlobalConfig) API_getConfig(w http.ResponseWriter, r *http.Request)

API_getConfig 获取指定的配置信息

func (*GlobalConfig) API_insertSEI

func (conf *GlobalConfig) API_insertSEI(w http.ResponseWriter, r *http.Request)

func (*GlobalConfig) API_list_pull

func (conf *GlobalConfig) API_list_pull(w http.ResponseWriter, r *http.Request)

func (*GlobalConfig) API_list_push

func (conf *GlobalConfig) API_list_push(w http.ResponseWriter, r *http.Request)

func (*GlobalConfig) API_modifyConfig

func (conf *GlobalConfig) API_modifyConfig(w http.ResponseWriter, r *http.Request)

API_modifyConfig 修改并保存配置

func (*GlobalConfig) API_plugins

func (conf *GlobalConfig) API_plugins(rw http.ResponseWriter, r *http.Request)

func (*GlobalConfig) API_replay_mp4

func (conf *GlobalConfig) API_replay_mp4(w http.ResponseWriter, r *http.Request)

func (*GlobalConfig) API_replay_rtpdump

func (conf *GlobalConfig) API_replay_rtpdump(w http.ResponseWriter, r *http.Request)

func (*GlobalConfig) API_replay_ts

func (conf *GlobalConfig) API_replay_ts(w http.ResponseWriter, r *http.Request)

func (*GlobalConfig) API_stop_push

func (conf *GlobalConfig) API_stop_push(w http.ResponseWriter, r *http.Request)

func (*GlobalConfig) API_stop_subscribe

func (conf *GlobalConfig) API_stop_subscribe(w http.ResponseWriter, r *http.Request)

func (*GlobalConfig) API_stream

func (conf *GlobalConfig) API_stream(rw http.ResponseWriter, r *http.Request)

func (*GlobalConfig) API_summary

func (conf *GlobalConfig) API_summary(rw http.ResponseWriter, r *http.Request)

func (*GlobalConfig) API_sysInfo

func (conf *GlobalConfig) API_sysInfo(rw http.ResponseWriter, r *http.Request)

func (*GlobalConfig) API_updateConfig

func (conf *GlobalConfig) API_updateConfig(w http.ResponseWriter, r *http.Request)

API_updateConfig 热更新配置

func (*GlobalConfig) ServeHTTP

func (conf *GlobalConfig) ServeHTTP(rw http.ResponseWriter, r *http.Request)

type HasAnnexB

type HasAnnexB interface {
	GetAnnexB() (r net.Buffers)
}

type IIO

type IIO interface {
	IsClosed() bool
	OnEvent(any)
	Stop(reason ...zapcore.Field)
	SetIO(any)
	SetParentCtx(context.Context)
	SetLogger(*log.Logger)
	IsShutdown() bool
	log.Zap
	// contains filtered or unexported methods
}

type IO

type IO struct {
	ID                      string
	Type                    string
	RemoteAddr              string
	context.Context         `json:"-" yaml:"-"` //不要直接设置,应当通过SetParentCtx传入父级Context
	context.CancelCauseFunc `json:"-" yaml:"-"` //流关闭是关闭发布者或者订阅者
	*log.Logger             `json:"-" yaml:"-"`
	StartTime               time.Time //创建时间
	Stream                  *Stream   `json:"-" yaml:"-"`
	io.Reader               `json:"-" yaml:"-"`
	io.Writer               `json:"-" yaml:"-"`
	io.Closer               `json:"-" yaml:"-"`
	Args                    url.Values
	Spesific                IIO `json:"-" yaml:"-"`
}

发布者或者订阅者的共用结构体

func (*IO) IsClosed

func (io *IO) IsClosed() bool

func (*IO) IsShutdown

func (io *IO) IsShutdown() bool

func (*IO) OnEvent

func (i *IO) OnEvent(event any)

func (*IO) SetIO

func (i *IO) SetIO(conn any)

SetIO(可选) 设置Writer、Reader、Closer

func (*IO) SetLogger

func (i *IO) SetLogger(logger *log.Logger)

func (*IO) SetParentCtx

func (i *IO) SetParentCtx(parent context.Context)

SetParentCtx(可选)

func (*IO) Stop

func (io *IO) Stop(reason ...zapcore.Field)

Stop 停止订阅或者发布,由订阅者或者发布者调用

type IOConfig

type IOConfig interface {
	config.Publish | config.Subscribe
}

type IPublisher

type IPublisher interface {
	IIO
	GetPublisher() *Publisher

	Publish(streamPath string, pub IPublisher) error
	// contains filtered or unexported methods
}

type IPuller

type IPuller interface {
	IPublisher
	Connect() error
	OnConnected()
	Disconnect()
	Pull() error
	Reconnect() bool
	// contains filtered or unexported methods
}

type IPusher

type IPusher interface {
	ISubscriber
	Push() error
	Connect() error
	Disconnect()

	Reconnect() bool
	// contains filtered or unexported methods
}

type ISubscriber

type ISubscriber interface {
	IIO
	GetSubscriber() *Subscriber
	IsPlaying() bool
	PlayRaw()
	PlayBlock(byte)
	PlayFLV()
	Stop(reason ...zapcore.Field)
	Subscribe(streamPath string, sub ISubscriber) error
}

type InvitePublish

type InvitePublish struct {
	Event[string]
}

InvitePublishEvent 邀请推流事件(按需拉流)

type InviteTrackEvent

type InviteTrackEvent struct {
	Event[string]
	ISubscriber
}

InviteTrackEvent 邀请推送指定 Track 事件(转码需要)

type MP4Publisher

type MP4Publisher struct {
	Publisher
	*mp4.MovDemuxer `json:"-" yaml:"-"`
}

func (*MP4Publisher) ReadMP4Data

func (p *MP4Publisher) ReadMP4Data(source io.ReadSeeker) error

Start reading the MP4 file

type MemoryTs

type MemoryTs struct {
	util.BytesPool
	PMT util.Buffer
	util.BLL
}

func (*MemoryTs) WriteAudioFrame

func (ts *MemoryTs) WriteAudioFrame(frame AudioFrame, pes *mpegts.MpegtsPESFrame) (err error)

func (*MemoryTs) WritePESPacket

func (ts *MemoryTs) WritePESPacket(frame *mpegts.MpegtsPESFrame, packet mpegts.MpegTsPESPacket) (err error)

func (*MemoryTs) WritePMTPacket

func (ts *MemoryTs) WritePMTPacket(audio codec.AudioCodecID, video codec.VideoCodecID)

func (*MemoryTs) WriteTo

func (ts *MemoryTs) WriteTo(w io.Writer) (int64, error)

func (*MemoryTs) WriteVideoFrame

func (ts *MemoryTs) WriteVideoFrame(frame VideoFrame, pes *mpegts.MpegtsPESFrame) (err error)

type NetWorkInfo

type NetWorkInfo struct {
	Name         string
	Receive      uint64
	Sent         uint64
	ReceiveSpeed uint64
	SentSpeed    uint64
}

NetWorkInfo 网速信息

type NoMoreTrack

type NoMoreTrack struct{}

type Plugin

type Plugin struct {
	context.Context    `json:"-" yaml:"-"`
	context.CancelFunc `json:"-" yaml:"-"`
	Name               string        //插件名称
	Config             config.Plugin `json:"-" yaml:"-"` //类型化的插件配置
	Version            string        //插件版本
	RawConfig          config.Config //最终合并后的配置的map形式方便查询

	*log.Logger `json:"-" yaml:"-"`

	Disabled bool
	// contains filtered or unexported fields
}

Plugin 插件信息

func InstallPlugin

func InstallPlugin(config config.Plugin, options ...any) *Plugin

InstallPlugin 安装插件,传入插件配置生成插件信息对象

func (*Plugin) AssignPubConfig

func (opt *Plugin) AssignPubConfig(puber *Publisher)

func (*Plugin) AssignSubConfig

func (opt *Plugin) AssignSubConfig(suber *Subscriber)

func (*Plugin) Publish

func (opt *Plugin) Publish(streamPath string, pub IPublisher) error

func (*Plugin) Pull

func (opt *Plugin) Pull(streamPath string, url string, puller IPuller, save int) (err error)

func (*Plugin) Push

func (opt *Plugin) Push(streamPath string, url string, pusher IPusher, save bool) (err error)

func (*Plugin) Save

func (opt *Plugin) Save() error

func (*Plugin) Subscribe

func (opt *Plugin) Subscribe(streamPath string, sub ISubscriber) error

Subscribe 订阅一个流,如果流不存在则创建一个等待流

func (*Plugin) SubscribeBlock

func (opt *Plugin) SubscribeBlock(streamPath string, sub ISubscriber, t byte) (err error)

SubscribeBlock 阻塞订阅一个流,直到订阅结束

func (*Plugin) SubscribeExist

func (opt *Plugin) SubscribeExist(streamPath string, sub ISubscriber) error

SubscribeExist 订阅已经存在的流

func (*Plugin) Update

func (opt *Plugin) Update(conf *config.Config)

Update 热更新配置

type Publisher

type Publisher struct {
	IO
	Config            *config.Publish
	common.AudioTrack `json:"-" yaml:"-"`
	common.VideoTrack `json:"-" yaml:"-"`
}

func (*Publisher) Equal

func (p *Publisher) Equal(p2 IPublisher) bool

func (*Publisher) GetPublisher

func (p *Publisher) GetPublisher() *Publisher

func (*Publisher) Publish

func (p *Publisher) Publish(streamPath string, pub IPublisher) error

func (*Publisher) WriteAVCCAudio

func (p *Publisher) WriteAVCCAudio(ts uint32, frame *util.BLL, pool util.BytesPool)

func (*Publisher) WriteAVCCVideo

func (p *Publisher) WriteAVCCVideo(ts uint32, frame *util.BLL, pool util.BytesPool)

type Puller

type Puller struct {
	ClientIO[config.Pull]
}

用于远程拉流的发布者

func (*Puller) OnConnected

func (pub *Puller) OnConnected()

func (*Puller) Reconnect

func (pub *Puller) Reconnect() (ok bool)

是否需要重连

type PulseEvent

type PulseEvent struct {
	Event[struct{}]
}

PulseEvent 心跳事件

type Pusher

type Pusher struct {
	ClientIO[config.Push]
}

func (*Pusher) Reconnect

func (pub *Pusher) Reconnect() (result bool)

是否需要重连

type RTPDumpPublisher

type RTPDumpPublisher struct {
	Publisher
	VCodec       codec.VideoCodecID
	ACodec       codec.AudioCodecID
	VPayloadType uint8
	APayloadType uint8

	sync.Mutex
	// contains filtered or unexported fields
}

func (*RTPDumpPublisher) Feed

func (t *RTPDumpPublisher) Feed(file *os.File)

func (*RTPDumpPublisher) WriteRTP

func (t *RTPDumpPublisher) WriteRTP(raw []byte)

type SEKick

type SEKick struct {
	Event[struct{}]
}

type SEclose

type SEclose struct {
	StateEvent
}

type SEcreate

type SEcreate struct {
	StreamEvent
}

type SEpublish

type SEpublish struct {
	StateEvent
}

type SErepublish

type SErepublish struct {
	StateEvent
}

type SEtrackAvaliable

type SEtrackAvaliable struct {
	StateEvent
}

type SEwaitClose

type SEwaitClose struct {
	StateEvent
}

type SEwaitPublish

type SEwaitPublish struct {
	StateEvent
	Publisher IPublisher
}

type StateEvent

type StateEvent struct {
	StreamEvent
	Action StreamAction
	From   StreamState
}

StateEvent 状态机事件

func (StateEvent) Next

func (se StateEvent) Next() (next StreamState, ok bool)

type StopError

type StopError []zapcore.Field

func (StopError) Error

func (s StopError) Error() string

type Stream

type Stream struct {
	ID uint32 // 流ID
	*log.Logger
	StartTime time.Time //创建时间
	StreamTimeoutConfig
	Path        string
	Publisher   IPublisher
	State       StreamState
	SEHistory   []StateEvent // 事件历史
	Subscribers Subscribers  // 订阅者
	Tracks      Tracks
	AppName     string
	StreamName  string
	IsPause     bool // 是否处于暂停状态
	// contains filtered or unexported fields
}

Stream 流定义

func FilterStreams

func FilterStreams[T IPublisher]() (ss []*Stream)

func (*Stream) AddTrack

func (s *Stream) AddTrack(t Track) (promise *util.Promise[Track])

func (*Stream) Close

func (r *Stream) Close()

func (*Stream) GetPublisherConfig

func (s *Stream) GetPublisherConfig() *config.Publish

func (*Stream) GetStartTime

func (s *Stream) GetStartTime() time.Time

func (*Stream) GetType

func (s *Stream) GetType() string

func (*Stream) IsClosed

func (r *Stream) IsClosed() bool

func (*Stream) IsShutdown

func (r *Stream) IsShutdown() bool

func (*Stream) Pause

func (s *Stream) Pause()

func (*Stream) Receive

func (s *Stream) Receive(event any) bool

func (*Stream) RemoveTrack

func (s *Stream) RemoveTrack(t Track)

func (*Stream) Resume

func (s *Stream) Resume()

func (*Stream) SSRC

func (s *Stream) SSRC() uint32

func (*Stream) SetIDR

func (s *Stream) SetIDR(video Track)

func (*Stream) Summary

func (s *Stream) Summary() (r StreamSummay)

Summary 返回流的简要信息

type StreamAction

type StreamAction byte
const (
	ACTION_PUBLISH        StreamAction = iota
	ACTION_TRACKAVAILABLE              // 音视频轨道激活
	ACTION_TIMEOUT                     // 发布流长时间没有数据/长时间没有发布者发布流/等待关闭时间到
	ACTION_PUBLISHCLOSE                // 发布者关闭
	ACTION_CLOSE                       // 主动关闭流
	ACTION_LASTLEAVE                   // 最后一个订阅者离开
	ACTION_FIRSTENTER                  // 第一个订阅者进入
	ACTION_NOTRACK                     // 没有音视频轨道
)

func (StreamAction) String

func (s StreamAction) String() string

type StreamEvent

type StreamEvent struct {
	Event[*Stream]
}

type StreamState

type StreamState byte
const (
	STATE_WAITPUBLISH StreamState = iota // 等待发布者状态
	STATE_WAITTRACK                      // 等待音视频轨道激活
	STATE_PUBLISHING                     // 正在发布流状态
	STATE_WAITCLOSE                      // 等待关闭状态(自动关闭延时开启)
	STATE_CLOSED                         // 流已关闭,不可使用
)

四状态机

func (StreamState) String

func (s StreamState) String() string

type StreamSummay

type StreamSummay struct {
	Path        string
	State       StreamState
	Subscribers int
	Tracks      []string
	StartTime   time.Time
	Type        string
	BPS         int
}

type StreamTimeoutConfig

type StreamTimeoutConfig struct {
	PublishTimeout    time.Duration //发布者无数据后超时
	DelayCloseTimeout time.Duration //无订阅者后超时,必须先有一次订阅才会激活
	IdleTimeout       time.Duration //无订阅者后超时,不需要订阅即可激活
	PauseTimeout      time.Duration //暂停后超时
	NeverTimeout      bool          // 永不超时
}

type SubPulse

type SubPulse struct {
	ISubscriber
}

type Subscriber

type Subscriber struct {
	IO
	Config *config.Subscribe

	TrackPlayer `json:"-" yaml:"-"`
	// contains filtered or unexported fields
}

Subscriber 订阅者实体定义

func (*Subscriber) AddTrack

func (s *Subscriber) AddTrack(t Track) bool

func (*Subscriber) CreateTrackReader

func (s *Subscriber) CreateTrackReader(t *track.Media) (result *track.AVRingReader)

func (*Subscriber) GetSubscriber

func (s *Subscriber) GetSubscriber() *Subscriber

func (*Subscriber) IsPlaying

func (s *Subscriber) IsPlaying() bool

func (*Subscriber) OnEvent

func (s *Subscriber) OnEvent(event any)

func (*Subscriber) PlayBlock

func (s *Subscriber) PlayBlock(subType byte)

PlayBlock 阻塞式读取数据

func (*Subscriber) PlayFLV

func (s *Subscriber) PlayFLV()

func (*Subscriber) PlayRTP

func (s *Subscriber) PlayRTP()

func (*Subscriber) PlayRaw

func (s *Subscriber) PlayRaw()

func (*Subscriber) SetIO

func (s *Subscriber) SetIO(i any)

func (*Subscriber) SubPulse

func (s *Subscriber) SubPulse()

func (*Subscriber) Subscribe

func (s *Subscriber) Subscribe(streamPath string, sub ISubscriber) error

type Subscribers

type Subscribers struct {
	// contains filtered or unexported fields
}

func (*Subscribers) AbortWait

func (s *Subscribers) AbortWait()

func (*Subscribers) Add

func (s *Subscribers) Add(suber ISubscriber, wait *waitTracks)

func (*Subscribers) Broadcast

func (s *Subscribers) Broadcast(event any)

func (*Subscribers) Delete

func (s *Subscribers) Delete(suber ISubscriber)

func (*Subscribers) Dispose

func (s *Subscribers) Dispose()

func (*Subscribers) Find

func (s *Subscribers) Find(id string) ISubscriber

func (*Subscribers) Init

func (s *Subscribers) Init()

func (*Subscribers) Len

func (s *Subscribers) Len() int

func (*Subscribers) MarshalJSON

func (s *Subscribers) MarshalJSON() ([]byte, error)

func (*Subscribers) OnPublisherLost

func (s *Subscribers) OnPublisherLost(event StateEvent)

func (*Subscribers) OnTrack

func (s *Subscribers) OnTrack(track common.Track)

func (*Subscribers) Pick

func (s *Subscribers) Pick() ISubscriber

func (*Subscribers) RangeAll

func (s *Subscribers) RangeAll(f func(sub ISubscriber))

func (*Subscribers) SendInviteTrack

func (s *Subscribers) SendInviteTrack(stream *Stream)

SendInviteTrack 广播需要的 Track(转码插件可以用到)

type Summary

type Summary struct {
	Address string
	Memory  struct {
		Total uint64
		Free  uint64
		Used  uint64
		Usage float64
	}
	CPUUsage float64
	HardDisk struct {
		Total uint64
		Free  uint64
		Used  uint64
		Usage float64
	}
	NetWork []NetWorkInfo
	Streams []StreamSummay
	// contains filtered or unexported fields
}

ServerSummary 系统摘要定义

func (*Summary) Report

func (s *Summary) Report(slave *Summary)

Report 上报数据

type SummaryUtil

type SummaryUtil Summary

func (*SummaryUtil) MarshalJSON

func (s *SummaryUtil) MarshalJSON() ([]byte, error)

func (*SummaryUtil) MarshalYAML

func (s *SummaryUtil) MarshalYAML() (any, error)

type TSPublisher

type TSPublisher struct {
	Publisher
	// contains filtered or unexported fields
}

func (*TSPublisher) OnEvent

func (t *TSPublisher) OnEvent(event any)

func (*TSPublisher) OnPmtStream

func (t *TSPublisher) OnPmtStream(s mpegts.MpegTsPmtStream)

type TSReader

type TSReader struct {
	*TSPublisher
	mpegts.MpegTsStream
}

func NewTSReader

func NewTSReader(pub *TSPublisher) (r *TSReader)

func (*TSReader) Close

func (t *TSReader) Close()

func (*TSReader) ReadPES

func (t *TSReader) ReadPES()

type TrackPlayer

type TrackPlayer struct {
	context.Context
	context.CancelFunc
	AudioReader, VideoReader *track.AVRingReader
	Audio                    *track.Audio
	Video                    *track.Video
}

type TrackRemoved

type TrackRemoved struct {
	Track
}

type Tracks

type Tracks struct {
	sync.Map
	MainVideo *track.Video
	MainAudio *track.Audio
	SEI       *track.Data[[]byte]
	// contains filtered or unexported fields
}

func (*Tracks) Add

func (tracks *Tracks) Add(name string, t Track) bool

func (*Tracks) AddSEI

func (tracks *Tracks) AddSEI(t byte, data []byte) bool

func (*Tracks) MarshalJSON

func (tracks *Tracks) MarshalJSON() ([]byte, error)

func (*Tracks) Range

func (tracks *Tracks) Range(f func(name string, t Track))

func (*Tracks) SetIDR

func (tracks *Tracks) SetIDR(video Track)

type Unsubscribe

type Unsubscribe ISubscriber

type UnsubscribeEvent

type UnsubscribeEvent struct {
	Event[ISubscriber]
}

type UpdateConfig

type UpdateConfig *config.Config

type VideoDeConf

type VideoDeConf []byte

AVCC 格式的序列帧

func (VideoDeConf) WithOutRTMP

func (v VideoDeConf) WithOutRTMP() []byte

type VideoFrame

type VideoFrame struct {
	*AVFrame
	*track.Video
	AbsTime uint32
	PTS     uint32
	DTS     uint32
}

func (VideoFrame) GetAnnexB

func (v VideoFrame) GetAnnexB() (r net.Buffers)

func (VideoFrame) WriteAnnexBTo

func (v VideoFrame) WriteAnnexBTo(w io.Writer) (n int64, err error)

type VideoRTP

type VideoRTP RTPFrame

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL