autotee

package
v0.4.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 22, 2016 License: BSD-3-Clause Imports: 31 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var UseDefaults = func(interface{}) error { return nil }
View Source
var Version = "devel"

Overridden by build.sh

Functions

func Catch

func Catch(err error) error

func IsExit added in v0.4.0

func IsExit(errFromWait error) bool

IsExit takes an error returned from Wait() and reports whether the program has exited.

func Main

func Main()

func ShowConfigMain

func ShowConfigMain(config *Config) error

func ShowIdleness

func ShowIdleness(d time.Duration)

func ShowStreamsMain

func ShowStreamsMain(config *Config) error

func TickNow

func TickNow(d time.Duration) <-chan time.Time

Like time.Tick, but the first tick comes immediately.

func WatchChannel

func WatchChannel(in <-chan *BufPoolElem, timeout time.Duration, notify func()) <-chan *BufPoolElem

Types

type App

type App struct {
	Config *Config
	Flows  map[string][]*Flow
	// contains filtered or unexported fields
}

func NewApp

func NewApp(ctx context.Context, config *Config) *App

func (*App) Run

func (app *App) Run() error

type BaseScreenService

type BaseScreenService struct {
	// contains filtered or unexported fields
}

type BufPool

type BufPool struct {

	// After receiving a buffer from the channel, you *must* call AcquireFirst().
	C <-chan *BufPoolElem
	// contains filtered or unexported fields
}

A fixed-size pool of fixed-size byte buffers.

func NewBufPool

func NewBufPool(nbuf, bufsize int) *BufPool

Create a new BufPool of `nbuf` elements of `bufsize` bytes each.

func (*BufPool) IsFull

func (bp *BufPool) IsFull() bool

Returns true if all buffers are currently in the pool.

type BufPoolElem

type BufPoolElem struct {
	// contains filtered or unexported fields
}

A reusable fixed-size byte buffer that is a member of BufPool.

func (*BufPoolElem) Acquire

func (elem *BufPoolElem) Acquire(refs int32)

Increase reference count by `refs`.

Passing a negative value causes panic. Calling when the reference count has already reached 0 causes panic.

Fully thread-safe.

func (*BufPoolElem) AcquireFirst

func (elem *BufPoolElem) AcquireFirst()

Must be called after the buffer has been received from the pool.

Unlike Acquire, AcquireFirst takes no argument, so there is no negative-value case. Calling when the reference count has already reached 0 causes panic.

Fully thread-safe.

func (*BufPoolElem) Free

func (elem *BufPoolElem) Free()

Decrease reference count.

When the reference count reaches 0, the buffer is considered fully freed and it will be returned into the pool, so it must not be used anymore and all references to it must be relinquished.

Calling when the reference count has already reached 0 causes panic.

Fully thread-safe.

func (*BufPoolElem) GetBuffer

func (elem *BufPoolElem) GetBuffer() []byte

Returns a slice of bytes that can be read and written to.

The size of the slice can be set via SetSize(). Initially, the size will be the maximum size, see GetMaxSize().

Thread-safe if the "current size" isn't changed at the same time.

func (*BufPoolElem) GetMaxSize

func (elem *BufPoolElem) GetMaxSize() int

Gets the maximum size of the buffer.

It is the same for all pool elements and does not change.

func (*BufPoolElem) SetSize

func (elem *BufPoolElem) SetSize(n int)

Sets the size of the buffer.

GetBuffer() will return a slice of this size.

Trying to set a size larger than the maximum size (see GetMaxSize()) causes panic.

type BufferConfig

type BufferConfig struct {
	BufferCount int `yaml:"buffer_count"`
}

type BufferPoolConfig

type BufferPoolConfig struct {
	BufferCount int `yaml:"buffer_count"`
	BufferSize  int `yaml:"buffer_size"`
}

type Cmd

type Cmd struct {
	// contains filtered or unexported fields
}

func Command

func Command(name string, args ...string) *Cmd

func (*Cmd) End

func (c *Cmd) End() error

Graceful termination. Blocking!

func (*Cmd) EndWith

func (c *Cmd) EndWith(otherCmd *Cmd)

func (*Cmd) KillGroup

func (c *Cmd) KillGroup() error

func (*Cmd) SetStderr

func (c *Cmd) SetStderr(w io.Writer)

func (*Cmd) SetStdin

func (c *Cmd) SetStdin(r io.Reader)

func (*Cmd) Start

func (c *Cmd) Start() error

func (*Cmd) StdinPipe

func (c *Cmd) StdinPipe() (io.WriteCloser, error)

func (*Cmd) StdoutPipe

func (c *Cmd) StdoutPipe() (io.ReadCloser, error)

func (*Cmd) WaitChannel

func (c *Cmd) WaitChannel() <-chan error

type CmdData

type CmdData struct {
	Name string
	Args []string
}

func NewCmdData

func NewCmdData(line string) (cd CmdData, err error)

func (*CmdData) Equals

func (cd *CmdData) Equals(other CmdData) bool

func (*CmdData) NewCmd

func (cd *CmdData) NewCmd() *Cmd

func (*CmdData) Replace

func (cd *CmdData) Replace(replacements map[string]string) (result CmdData)

func (*CmdData) String

func (cd *CmdData) String() string

type Config

type Config struct {
	Debug        bool
	Server       ServerConfig
	Metrics      MetricsConfig
	SourceBuffer BufferPoolConfig
	SinkBuffer   BufferConfig
	Flows        map[string]FlowConfig
	Times        TimeConfig
	Misc         MiscConfig
}

func LoadConfig

func LoadConfig(path string) (*Config, error)

func (*Config) Dump

func (c *Config) Dump() ([]byte, error)

func (*Config) UnmarshalYAML

func (tc *Config) UnmarshalYAML(unmarshal func(interface{}) error) error

type ExclusiveScreenService

type ExclusiveScreenService struct {
	BaseScreenService
}

func (*ExclusiveScreenService) Done

func (s *ExclusiveScreenService) Done() error

func (*ExclusiveScreenService) Screen

func (s *ExclusiveScreenService) Screen() (*Screen, error)

func (*ExclusiveScreenService) Stop

func (s *ExclusiveScreenService) Stop()

type Flow

type Flow struct {
	// contains filtered or unexported fields
}

func NewFlow

func NewFlow(ctx context.Context, name string, config *Config, sourceCmd CmdData, sinkCmds map[string]CmdData, entry *log.Entry) *Flow

func (*Flow) Start

func (f *Flow) Start()

Must only be called once. Does not block.

func (*Flow) Stop

func (f *Flow) Stop()

Stop ends all processes and goroutines. Idempotent. Blocks.

type FlowCmdData

type FlowCmdData struct {
	CmdData
	// contains filtered or unexported fields
}

type FlowConfig

type FlowConfig struct {
	Regexp *regexp.Regexp
	Source CmdData
	Sinks  map[string]CmdData
}

func (*FlowConfig) UnmarshalYAML

func (fc *FlowConfig) UnmarshalYAML(unmarshal func(interface{}) error) (err error)

type Icecast

type Icecast struct {
	// contains filtered or unexported fields
}

func (*Icecast) GetActiveStreams

func (nr *Icecast) GetActiveStreams() (mapset.Set, error)

GetActiveStreams returns the set of active streams of a specific app from an Icecast server.

type IcecastUrls

type IcecastUrls struct {
	mapset.Set
}

func (*IcecastUrls) UnmarshalJSON

func (iu *IcecastUrls) UnmarshalJSON(bytes []byte) error

type InfluxConfig

type InfluxConfig struct {
	Host     string `yaml:"host"`
	Database string `yaml:"database"`
	Username string `yaml:"username"`
	Password string `yaml:"password"`
}

type MetricsConfig

type MetricsConfig struct {
	Influx *InfluxConfig `yaml:"influx"`
}

type MiscConfig

type MiscConfig struct {
	ReuseScreens        bool
	RestartWhenSinkDies bool
}

func (*MiscConfig) UnmarshalYAML

func (mc *MiscConfig) UnmarshalYAML(unmarshal func(interface{}) error) error

type NginxRtmp

type NginxRtmp struct {
	// contains filtered or unexported fields
}

func (*NginxRtmp) GetActiveStreams

func (nr *NginxRtmp) GetActiveStreams() (mapset.Set, error)

GetActiveStreams returns the set of active streams of a specific app from an nginx-rtmp server.

type RestartableTimer

type RestartableTimer struct {
	C <-chan time.Time
	// contains filtered or unexported fields
}

func NewRestartableTimer

func NewRestartableTimer(d time.Duration) RestartableTimer

func (*RestartableTimer) Restart

func (st *RestartableTimer) Restart()

func (*RestartableTimer) Start

func (st *RestartableTimer) Start()

func (*RestartableTimer) Stop

func (st *RestartableTimer) Stop()

type Screen

type Screen struct {
	Name string
	File *os.File
}

type ScreenService

type ScreenService interface {
	Screen() (*Screen, error)
	Done() error
	Stop()
}

func NewExclusiveScreenService

func NewExclusiveScreenService(name string) ScreenService

func NewSharedScreenService

func NewSharedScreenService(name string) ScreenService

type Server

type Server interface {
	GetActiveStreams() (mapset.Set, error)
}

func NewIcecast

func NewIcecast(config *Config) Server

func NewNginxRtmp

func NewNginxRtmp(config *Config) Server

func NewStaticStreamList

func NewStaticStreamList(config *Config) Server

type ServerConfig

type ServerConfig struct {
	NewServer ServerFactory
	Url       string
	App       string
	XPath     *xmlpath.Path
	Streams   mapset.Set
}

func (*ServerConfig) UnmarshalYAML

func (sc *ServerConfig) UnmarshalYAML(unmarshal func(interface{}) error) error

type ServerFactory

type ServerFactory func(config *Config) Server

type SharedScreenService

type SharedScreenService struct {
	BaseScreenService
}

func (*SharedScreenService) Done

func (s *SharedScreenService) Done() error

func (*SharedScreenService) Screen

func (s *SharedScreenService) Screen() (*Screen, error)

func (*SharedScreenService) Stop

func (s *SharedScreenService) Stop()

type Sink

type Sink struct {
	// contains filtered or unexported fields
}

func NewSink

func NewSink(ctx context.Context, entry *log.Entry, name string, command CmdData, config *BufferConfig, screen *Screen) *Sink

func (*Sink) Channel

func (s *Sink) Channel() chan<- *BufPoolElem

func (*Sink) DeathBarrier

func (s *Sink) DeathBarrier() <-chan struct{}

func (*Sink) Kill

func (s *Sink) Kill()

Doesn't block.

func (*Sink) Start

func (s *Sink) Start() (err error)

Must only be called once. Blocks.

func (*Sink) Stop

func (s *Sink) Stop()

Blocks.

type SinkCmdData

type SinkCmdData struct {
	Screens ScreenService
	Command CmdData
}

type SinkSet

type SinkSet struct {
	// contains filtered or unexported fields
}

SinkSet starts and supervises multiple sinks.

func NewSinkSet

func NewSinkSet(ctx context.Context, commands map[string]SinkCmdData, buffers <-chan *BufPoolElem, config *Config, entry *log.Entry) *SinkSet

func (*SinkSet) AnySinkDied

func (ss *SinkSet) AnySinkDied() <-chan struct{}

func (*SinkSet) Start

func (ss *SinkSet) Start()

Start makes the SinkSet begin managing its sinks and forwarding data to them.

Must only be called once. Does not block.

func (*SinkSet) Stop

func (ss *SinkSet) Stop()

Stop ends all processes and goroutines.

Idempotent. Blocks until all processes have been killed and goroutines are exiting.

type Source

type Source struct {
	// contains filtered or unexported fields
}

func NewSource

func NewSource(ctx context.Context, name string, command CmdData, config *Config, entry *log.Entry, bufpool *BufPool, screen *Screen) *Source

func (*Source) Channel

func (s *Source) Channel() <-chan *BufPoolElem

func (*Source) DeathBarrier

func (s *Source) DeathBarrier() <-chan struct{}

func (*Source) Kill

func (s *Source) Kill()

Doesn't block.

func (*Source) Start

func (s *Source) Start() (err error)

func (*Source) Stop

func (s *Source) Stop()

Blocks.

type StaticStreamList

type StaticStreamList struct {
	// contains filtered or unexported fields
}

func (*StaticStreamList) GetActiveStreams

func (nr *StaticStreamList) GetActiveStreams() (mapset.Set, error)

type TimeConfig

type TimeConfig struct {
	SourceRestartDelay   time.Duration
	SourceTimeout        time.Duration
	SinkRestartDelay     time.Duration
	ServerPollInterval   time.Duration
	ServerRequestTimeout time.Duration
	ServerTimeout        time.Duration
	IdleTime             time.Duration
}

func (*TimeConfig) UnmarshalYAML

func (tc *TimeConfig) UnmarshalYAML(unmarshal func(interface{}) error) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL