streaming

package
v0.10.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 5, 2024 License: GPL-3.0 Imports: 21 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrInvalidProtocol is thrown when an invalid protocol was specified.
	// See the docs and example config for a list of supported protocols.
	ErrInvalidProtocol = errors.New("restreamer: unsupported protocol")
	// ErrNoConnection is thrown when trying to read
	// from a stream that is not connected
	ErrNoConnection = errors.New("restreamer: socket not connected")
	// ErrAlreadyConnected is thrown when trying to
	// connect to an already established upstream socket
	ErrAlreadyConnected = errors.New("restreamer: socket is already connected")
	// ErrInvalidResponse is thrown when an unsupported
	// HTTP response code was received
	ErrInvalidResponse = errors.New("restreamer: unsupported response code")
	// ErrNoUrl is thrown when the list of upstream URLs was empty
	ErrNoUrl = errors.New("restreamer: no parseable upstream URL")
)
View Source
var (
	ErrNoLength      = errors.New("restreamer: Fetching of remote resource with unknown length not supported")
	ErrLimitExceeded = errors.New("restreamer: Resource too large for cache")
	ErrShortRead     = errors.New("restreamer: Short read, not all data was transferred in one go")
)
View Source
var (
	// ErrAlreadyRunning is thrown when trying to connect a stream that is already online.
	ErrAlreadyRunning = errors.New("restreamer: service is already active")
	// ErrNotRunning is thrown trying to shut down a stopped stream.
	ErrNotRunning = errors.New("restreamer: service is not running")
	// ErrOffline is thrown when receiving a connection while the stream is offline
	ErrOffline = errors.New("restreamer: refusing connection on an offline stream")
	// ErrSlowRead is logged (not thrown) when a client can not handle the bandwidth.
	ErrSlowRead = errors.New("restreamer: send buffer overrun, increase client bandwidth")
	// ErrPoolFull is logged when the connection pool is full.
	ErrPoolFull = errors.New("restreamer: maximum number of active connections exceeded")
)

Functions

func Etag

func Etag(data []byte) string

Etag calculates a hash value of data and returns it as a hex string. Suitable for HTTP Etags.

func Get

func Get(url *url.URL, timeout time.Duration) (reader io.Reader, header http.Header, status int, length int64, err error)

Get opens a remote or local resource specified by URL and returns a reader, upstream HTTP headers, an HTTP status code and the resource data length, or -1 if no length is available. Local resources contain guessed data. Supported protocols: file, http and https.

func ServeStreamError

func ServeStreamError(writer http.ResponseWriter, status int)

ServeStreamError returns an appropriate error response to the client.

Types

type AccessController

type AccessController struct {
	// contains filtered or unexported fields
}

AccessController implements a connection broker that limits the maximum number of concurrent connections.

func NewAccessController

func NewAccessController(maxconnections uint) *AccessController

NewAccessController creates a connection broker object that handles access control according to the number of connected clients.

func (*AccessController) Accept

func (control *AccessController) Accept(remoteaddr string, streamer *Streamer) bool

Accept accepts an incoming connection when the maximum number of open connections has not been reached yet.

func (*AccessController) Release

func (control *AccessController) Release(streamer *Streamer)

Release decrements the open connections count.

func (*AccessController) SetInhibit added in v0.6.0

func (control *AccessController) SetInhibit(inhibit bool)

SetInhibit allows setting and clearing the inhibit flag. If it is set, no further connections are accepted, irrespective of the maxconnections limit.

type Client

type Client struct {

	// Wait is the time before reconnecting a disconnected upstream.
	// This is a deadline: If a connection (or connection attempt) takes longer
	// than this duration, a reconnection is attempted immediately.
	Wait time.Duration
	// ReadTimeout is the timeout for individual packet reads
	ReadTimeout time.Duration
	// contains filtered or unexported fields
}

Client implements a streaming HTTP client with failover support.

Logging specification:

{
  "time": 1234 | unix timestamp in UTCS,
  "module": "client",
  "event": "" | error or upstream-connect or upstream-disconnect or upstream-loss or upstream-timeout or upstream-offline or client-streaming or client-stopped,
  when event=error:
    "error": "error-name"
    "error-specific key": "error-specific data"
  when event=retry:
    "retry: "99999" | seconds until retry
  when event=upstream-*:
    "url": "http://upstream/url" | upstream stream URL,
  when event=client-*:
    "client": "1.2.3.4:12" | client ip:port,
}

func NewClient

func NewClient(name string, uris []string, streamer *Streamer, timeout uint, reconnect uint, readtimeout uint, qsize uint, intf string, bufferSize uint, packetSize uint) (*Client, error)

NewClient constructs a new streaming HTTP client, without connecting the socket yet. You need to call Connect() to do that.

After a connection has been closed, the client will attempt to reconnect after a configurable delay. This delay is cumulative; if a connection has been up for longer, a reconnect will be attempted immediately.

Arguments:

name: a unique name for this streaming client, used for metrics and logging
uris: a list of upstream URIs, used in random order
queue: the outgoing packet queue
timeout: the connect timeout
reconnect: the minimal reconnect delay
readtimeout: the read timeout
qsize: the input queue size
intf: the network interface to create multicast connections on
bufferSize: the UDP socket receive buffer size
packetSize: the UDP packet size

func (*Client) Close

func (client *Client) Close() error

Close closes the active upstream connection.

This will cause the streaming thread to fail and try to reestablish a connection (unless reconnects are disabled).

func (*Client) Connect

func (client *Client) Connect()

Connect spawns the connection loop.

Do not call this method multiple times!

func (*Client) Connected

func (client *Client) Connected() bool

Connected returns true if the socket is connected.

func (*Client) SetCollector

func (client *Client) SetCollector(stats metrics.Collector)

SetCollector assigns a stats collector.

func (*Client) SetInhibit added in v0.6.0

func (client *Client) SetInhibit(inhibit bool)

SetInhibit calls the SetInhibit function on the attached streamer.

func (*Client) Status

func (client *Client) Status() string

Status returns the HTTP status message, or the empty string if not connected.

func (*Client) StatusCode

func (client *Client) StatusCode() int

StatusCode returns the HTTP status code, or 0 if not connected.

type Command

type Command int

Command is one of several possible constants. See StreamerCommandAdd for more information.

const (

	// StreamerCommandAdd signals a stream to add a connection.
	StreamerCommandAdd Command
	// StreamerCommandRemove signals a stream to remove a connection.
	StreamerCommandRemove
	// StreamerCommandInhibit signals that all connections should be closed
	// and not further connections should be allowed
	StreamerCommandInhibit
	// StreamerCommandAllow signals that new connections should be allowed
	StreamerCommandAllow
)

type Connection

type Connection struct {
	// Queue is the per-connection packet queue
	Queue chan protocol.MpegTsPacket
	// ClientAddress is the remote client address
	ClientAddress string

	// Closed is true if Serve was ended because of a closed channel.
	// This is simply there to avoid a double close.
	Closed bool
	// contains filtered or unexported fields
}

Connection is a single active client connection.

This is meant to be called directly from a ServeHTTP handler. No separate thread is created.

func NewConnection

func NewConnection(destination http.ResponseWriter, qsize int, clientaddr string, ctx context.Context) *Connection

NewConnection creates a new connection object. To start sending data to a client, call Serve().

clientaddr should point to the remote address of the connecting client and will be used for logging.

func (*Connection) Serve

func (conn *Connection) Serve(preamble []byte)

Serve starts serving data to a client, continuously feeding packets from the queue. An optional preamble buffer can be passed that will be sent before streaming the live payload (but after the HTTP response headers).

type ConnectionBroker

type ConnectionBroker interface {
	// Accept will be called on each incoming connection,
	// with the remote client address and the streamer that wants to accept the connection.
	Accept(remoteaddr string, streamer *Streamer) bool
	// Release will be called each time a client disconnects.
	// The streamer argument corresponds to a streamer that has previously called Accept().
	Release(streamer *Streamer)
}

ConnectionBroker represents a policy handler for new connections. It is used to determine if new connections can be accepted, based on arbitrary rules.

type ConnectionRequest

type ConnectionRequest struct {
	// Command is the command to execute
	Command Command
	// Address is the remote client address
	Address string
	// Connection is the connection to add (if this is an Add command)
	Connection *Connection
	// Waiter is a WaitGroup that can be used to track handling of the connection
	// in the streaming thread. If it is non-nil, the streamer will signal
	// Done once the request has been handled.
	Waiter *sync.WaitGroup
	// Ok tells the caller if a connection was handled without error.
	// You should always wait on the Waiter before checking it.
	Ok bool
}

ConnectionRequest encapsulates a request that new connection be added or removed.

type Proxy

type Proxy struct {
	// contains filtered or unexported fields
}

Proxy implements a caching HTTP proxy.

func NewProxy

func NewProxy(uri string, timeout uint, cache uint, auth auth.Authenticator) (*Proxy, error)

NewProxy constructs a new HTTP proxy. The upstream resource is not fetched until the first request. If cache is non-zero, the resource will be evicted from memory after these number of seconds. If it is zero, the resource will be fetched from upstream every time it is requested. timeout sets the upstream HTTP connection timeout.

func (*Proxy) ServeHTTP

func (proxy *Proxy) ServeHTTP(writer http.ResponseWriter, request *http.Request)

ServeHTTP handles an incoming connection. Satisfies the http.Handler interface, so it can be used in an HTTP server.

func (*Proxy) SetStatistics

func (proxy *Proxy) SetStatistics(stats metrics.Statistics)

SetStatistics assigns a stats collector.

func (*Proxy) Shutdown

func (proxy *Proxy) Shutdown()

Shutdown stops the fetcher thread.

func (*Proxy) Start

func (proxy *Proxy) Start()

Start launches the fetcher thread. This should only be called once.

type StateManager

type StateManager struct {
	// contains filtered or unexported fields
}

StateManager maintains a list of disconnectable objects, sending them a notification whenever state changes.

After connection closure has been notified, the list is cleared and further notifications have no effect.

func NewStateManager

func NewStateManager() *StateManager

NewStateManager creates a new state manager.

Register notification channels with Register(), and submit state changes with Notify(). Channels can be removed later with Unregister(). After notify has been called, the list of registered channels is cleared.

func (*StateManager) Notify

func (manager *StateManager) Notify()

Notify sends a state change to all registered notification channels and clears the list.

func (*StateManager) Register

func (manager *StateManager) Register(channel chan<- bool)

Register registers a new notification channel.

It is not possible to register a channel twice. Any additional registrations will be ignored.

func (*StateManager) Unregister

func (manager *StateManager) Unregister(channel chan<- bool)

Unregister removes a registered channel.

If this channel was not registered previously, no action is taken.

type Streamer

type Streamer struct {
	// contains filtered or unexported fields
}

Streamer implements a TS packet multiplier, distributing received packets on the input queue to the output queues. It also handles and manages HTTP connections when added to an HTTP server.

func NewStreamer

func NewStreamer(name string, qsize uint, broker ConnectionBroker, auth auth.Authenticator) *Streamer

NewStreamer creates a new packet streamer. queue is an input packet queue. qsize is the length of each connection's queue (in packets). broker handles policy enforcement stats is a statistics collector object.

func (*Streamer) ServeHTTP

func (streamer *Streamer) ServeHTTP(writer http.ResponseWriter, request *http.Request)

ServeHTTP handles an incoming HTTP connection. Satisfies the http.Handler interface, so it can be used in an HTTP server.

func (*Streamer) SetCollector

func (streamer *Streamer) SetCollector(stats metrics.Collector)

SetCollector assigns a stats collector

func (*Streamer) SetInhibit added in v0.6.0

func (streamer *Streamer) SetInhibit(inhibit bool)

func (*Streamer) SetNotifier added in v0.5.0

func (streamer *Streamer) SetNotifier(events event.Notifiable)

SetNotifier assigns an event notifier

func (*Streamer) SetPreamble added in v0.10.0

func (streamer *Streamer) SetPreamble(preamble []byte)

func (*Streamer) Stream

func (streamer *Streamer) Stream(queue <-chan protocol.MpegTsPacket) error

Stream is the main stream multiplier loop. It reads data from the input queue and distributes it to the connections.

This routine will block; you should run it asynchronously like this:

queue := make(chan protocol.MpegTsPacket, inputQueueSize)

go func() {
  log.Fatal(streamer.Stream(queue))
}

or simply:

go streamer.Stream(queue)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL