utils

package
v1.9.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 19, 2023 License: Apache-2.0 Imports: 42 Imported by: 0

Documentation

Index

Constants

View Source
const (
	RoomPrefix            = "RM_"
	NodePrefix            = "ND_"
	ParticipantPrefix     = "PA_"
	TrackPrefix           = "TR_"
	APIKeyPrefix          = "API"
	EgressPrefix          = "EG_"
	IngressPrefix         = "IN_"
	SIPTrunkPrefix        = "ST_"
	SIPDispatchRulePrefix = "SDR_"
	SIPParticipantPrefix  = "SP_"
	RPCPrefix             = "RPC_"
	WHIPResourcePrefix    = "WH_"
	RTMPResourcePrefix    = "RT_"
	URLResourcePrefix     = "UR_"
)
View Source
const GuidSize = 12

Variables

View Source
var (
	ErrAnachronousSample = errors.New("anachronous sample")
)
View Source
var (
	PromMessageBusCounter = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Namespace: "livekit",
			Subsystem: "messagebus",
			Name:      "messages",
		},
		[]string{"type", "status"},
	)
)

Functions

func AggregateRTPStats

func AggregateRTPStats(statsList []*livekit.RTPStats, gapHistogramSize int) *livekit.RTPStats

func EnableLockTracker

func EnableLockTracker()

EnableLockTracker enables the lock tracking background worker. This should be called during init.

func GetMimeTypeForAudioCodec

func GetMimeTypeForAudioCodec(codec livekit.AudioCodec) string

func GetMimeTypeForVideoCodec

func GetMimeTypeForVideoCodec(codec livekit.VideoCodec) string

func HashedID

func HashedID(id string) string

HashedID creates a hashed ID from a unique string.

func IsConnectionQualityLower

func IsConnectionQualityLower(prev livekit.ConnectionQuality, curr livekit.ConnectionQuality) bool

func Least

func Least[T Numeric](less func(a, b T) bool, vs ...T) T

func LocalNodeID

func LocalNodeID() (string, error)

func MarshalGuid

func MarshalGuid[T livekit.Guid](id T) livekit.GuidBlock

func Max

func Max[T Numeric](vs ...T) T

func Min

func Min[T Numeric](vs ...T) T

func Most

func Most[T Numeric](less func(a, b T) bool, vs ...T) T

func Must

func Must[T any](v T, err error) T

func NewGuid

func NewGuid(prefix string) string

func NumMutexes

func NumMutexes() int

func ParallelExec

func ParallelExec[T any](vals []T, parallelThreshold, step uint64, fn func(T))

ParallelExec executes the given function with each element of vals. If len(vals) >= parallelThreshold, the calls are executed in parallel with the given step size, so fn must be thread-safe.

func RandomSecret

func RandomSecret() string

func Redact

func Redact(s, name string) string

func RedactIdentifier

func RedactIdentifier(identifier string) string

func RedactStreamKey

func RedactStreamKey(url string) (string, bool)

func ToggleLockTrackerStackTraces

func ToggleLockTrackerStackTraces(enable bool)

func UnmarshalGuid

func UnmarshalGuid[T livekit.Guid](b livekit.GuidBlock) T

Types

type Bitmap

type Bitmap[T bitmapNumber] struct {
	// contains filtered or unexported fields
}

func NewBitmap

func NewBitmap[T bitmapNumber](size int) *Bitmap[T]

func (*Bitmap[T]) Clear

func (b *Bitmap[T]) Clear(val T)

func (*Bitmap[T]) ClearRange

func (b *Bitmap[T]) ClearRange(min, max T)

func (*Bitmap[T]) IsSet

func (b *Bitmap[T]) IsSet(val T) bool

func (*Bitmap[T]) Set

func (b *Bitmap[T]) Set(val T)

func (*Bitmap[T]) SetRange

func (b *Bitmap[T]) SetRange(min, max T)

type CPUStats

type CPUStats struct {
	// contains filtered or unexported fields
}

func NewCPUStats

func NewCPUStats(idleUpdateCallback func(idle float64)) (*CPUStats, error)

func NewProcCPUStats

func NewProcCPUStats(procUpdateCallback func(idle float64, usage map[int]float64)) (*CPUStats, error)

func (*CPUStats) GetCPUIdle

func (c *CPUStats) GetCPUIdle() float64

func (*CPUStats) NumCPU

func (c *CPUStats) NumCPU() float64

func (*CPUStats) Stop

func (c *CPUStats) Stop()

type DedupedSlice

type DedupedSlice[T comparable] struct {
	// contains filtered or unexported fields
}

func NewDedupedSlice

func NewDedupedSlice[T comparable](maxLen int) *DedupedSlice[T]

func (*DedupedSlice[T]) Add

func (d *DedupedSlice[T]) Add(val T) bool

func (*DedupedSlice[T]) Clear

func (d *DedupedSlice[T]) Clear()

func (*DedupedSlice[T]) Get

func (d *DedupedSlice[T]) Get() []T

func (*DedupedSlice[T]) Has

func (d *DedupedSlice[T]) Has(val T) bool

func (*DedupedSlice[T]) Len

func (d *DedupedSlice[T]) Len() int

type ErrArray

type ErrArray struct {
	// contains filtered or unexported fields
}

func (*ErrArray) AppendErr

func (e *ErrArray) AppendErr(err error)

func (*ErrArray) ToError

func (e *ErrArray) ToError() psrpc.Error

type EventEmitter

type EventEmitter[K comparable, V any] struct {
	// contains filtered or unexported fields
}

func NewDefaultEventEmitter

func NewDefaultEventEmitter[K comparable, V any]() *EventEmitter[K, V]

func NewEventEmitter

func NewEventEmitter[K comparable, V any](params EventEmitterParams) *EventEmitter[K, V]

func (*EventEmitter[K, V]) Emit

func (e *EventEmitter[K, V]) Emit(k K, v V)

func (*EventEmitter[K, V]) Observe

func (e *EventEmitter[K, V]) Observe(k K) *EventObserver[V]

func (*EventEmitter[K, V]) ObservedKeys

func (e *EventEmitter[K, V]) ObservedKeys() []K

type EventEmitterParams

type EventEmitterParams struct {
	QueueSize int
	Logger    logger.Logger
}

func DefaultEventEmitterParams

func DefaultEventEmitterParams() EventEmitterParams

type EventObserver

type EventObserver[V any] struct {
	// contains filtered or unexported fields
}

func NewEventObserver

func NewEventObserver[V any](stopFunc func()) (*EventObserver[V], func(v V))

func (*EventObserver[V]) Events

func (o *EventObserver[V]) Events() <-chan V

func (*EventObserver[V]) Stop

func (o *EventObserver[V]) Stop()

type EventObserverList

type EventObserverList[V any] struct {
	// contains filtered or unexported fields
}

func NewDefaultEventObserverList

func NewDefaultEventObserverList[V any]() *EventObserverList[V]

func NewEventObserverList

func NewEventObserverList[V any](params EventEmitterParams) *EventObserverList[V]

func (*EventObserverList[V]) Emit

func (l *EventObserverList[V]) Emit(v V)

func (*EventObserverList[V]) Len

func (l *EventObserverList[V]) Len() int

func (*EventObserverList[V]) Observe

func (l *EventObserverList[V]) Observe() *EventObserver[V]

type FlowGraph

type FlowGraph struct {
	// contains filtered or unexported fields
}

func NewFlowGraph

func NewFlowGraph(n int64) FlowGraph

func (*FlowGraph) AddEdge

func (g *FlowGraph) AddEdge(s, t, cap, cost int64)

type Graph

type Graph[K comparable, N GraphNodeProps[K], E GraphEdgeProps] struct {
	// contains filtered or unexported fields
}

func NewGraph

func NewGraph[K comparable, N GraphNodeProps[K], E GraphEdgeProps]() *Graph[K, N, E]

func (*Graph[K, N, E]) DeleteEdge

func (g *Graph[K, N, E]) DeleteEdge(src, dst K)

func (*Graph[K, N, E]) DeleteNode

func (g *Graph[K, N, E]) DeleteNode(id K)

func (*Graph[K, N, E]) Edge

func (g *Graph[K, N, E]) Edge(src, dst K) (p E)

func (*Graph[K, N, E]) HasEdge

func (g *Graph[K, N, E]) HasEdge(src, dst K) bool

func (*Graph[K, N, E]) HasNode

func (g *Graph[K, N, E]) HasNode(id K) bool

func (*Graph[K, N, E]) InEdges

func (g *Graph[K, N, E]) InEdges(dst K) map[K]E

func (*Graph[K, N, E]) InsertEdge

func (g *Graph[K, N, E]) InsertEdge(src, dst K, props E)

func (*Graph[K, N, E]) InsertNode

func (g *Graph[K, N, E]) InsertNode(props N)

func (*Graph[K, N, E]) Node

func (g *Graph[K, N, E]) Node(id K) (props N)

func (*Graph[K, N, E]) NodeIDs

func (g *Graph[K, N, E]) NodeIDs() []K

func (*Graph[K, N, E]) OutEdges

func (g *Graph[K, N, E]) OutEdges(src K) map[K]E

func (*Graph[K, N, E]) ShortestPath

func (g *Graph[K, N, E]) ShortestPath(src, dst K) ([]N, int64)

func (*Graph[K, N, E]) Size

func (g *Graph[K, N, E]) Size() int

func (*Graph[K, N, E]) TopologicalSort

func (g *Graph[K, N, E]) TopologicalSort() []N

type GraphEdge

type GraphEdge[N, E any] struct {
	// contains filtered or unexported fields
}

type GraphEdgeProps

type GraphEdgeProps interface {
	Length() int64
}

type GraphNode

type GraphNode[T any] struct {
	// contains filtered or unexported fields
}

type GraphNodeProps

type GraphNodeProps[K comparable] interface {
	ID() K
}

type KillableService

type KillableService interface {
	Kill()
}

type MessageBus

type MessageBus interface {
	Subscribe(ctx context.Context, channel string) (PubSub, error)
	// SubscribeQueue is like subscribe, but ensuring only a single instance gets to process the message
	SubscribeQueue(ctx context.Context, channel string) (PubSub, error)
	Publish(ctx context.Context, channel string, msg proto.Message) error
}

func NewRedisMessageBus

func NewRedisMessageBus(rc redis.UniversalClient) MessageBus

type MinCostMaxFlow

type MinCostMaxFlow struct {
	// contains filtered or unexported fields
}

func (*MinCostMaxFlow) ComputeMaxFlow

func (f *MinCostMaxFlow) ComputeMaxFlow(g FlowGraph, s, t int64) (flow, cost int64)

func (*MinCostMaxFlow) Flow

func (f *MinCostMaxFlow) Flow(s, t int64) int64

type MultitonService

type MultitonService[K comparable] struct {
	// contains filtered or unexported fields
}

func (*MultitonService[K]) Kill

func (s *MultitonService[K]) Kill()

func (*MultitonService[K]) Replace

func (s *MultitonService[K]) Replace(k K, v KillableService) func()

type Mutex

type Mutex struct {
	sync.Mutex
	// contains filtered or unexported fields
}

func (*Mutex) Lock

func (m *Mutex) Lock()

func (*Mutex) Unlock

func (m *Mutex) Unlock()

type Numeric

type Numeric interface {
	constraints.Signed | constraints.Unsigned | time.Duration
}

type ProtoProxy

type ProtoProxy[T proto.Message] struct {
	// contains filtered or unexported fields
}

ProtoProxy is a caching proxy for protobuf messages that may be expensive to compute. It is used to avoid unnecessary regeneration of protobufs.

func NewProtoProxy

func NewProtoProxy[T proto.Message](refreshInterval time.Duration, updateFn func() T) *ProtoProxy[T]

NewProtoProxy creates a new ProtoProxy that regenerates underlying values at a cadence of refreshInterval. This should be used for updates that should be sent periodically but do not have the urgency of immediate delivery. updateFn should provide the computations required to generate the protobuf. If refreshInterval is 0, the proxy will only update on MarkDirty(true).

func (*ProtoProxy[T]) Get

func (p *ProtoProxy[T]) Get() T

func (*ProtoProxy[T]) MarkDirty

func (p *ProtoProxy[T]) MarkDirty(immediate bool)

func (*ProtoProxy[T]) Stop

func (p *ProtoProxy[T]) Stop()

func (*ProtoProxy[T]) Updated

func (p *ProtoProxy[T]) Updated() <-chan struct{}

type PubSub

type PubSub interface {
	Channel() <-chan interface{}
	Payload(msg interface{}) []byte
	Close() error
}

type RWMutex

type RWMutex struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func (*RWMutex) Lock

func (m *RWMutex) Lock()

func (*RWMutex) RLock

func (m *RWMutex) RLock()

func (*RWMutex) RUnlock

func (m *RWMutex) RUnlock()

func (*RWMutex) Unlock

func (m *RWMutex) Unlock()

type RedisMessageBus

type RedisMessageBus struct {
	// contains filtered or unexported fields
}

func (*RedisMessageBus) Lock

func (r *RedisMessageBus) Lock(ctx context.Context, key string, expiration time.Duration) (bool, error)

func (*RedisMessageBus) Publish

func (r *RedisMessageBus) Publish(ctx context.Context, channel string, msg proto.Message) error

func (*RedisMessageBus) Subscribe

func (r *RedisMessageBus) Subscribe(ctx context.Context, channel string) (PubSub, error)

func (*RedisMessageBus) SubscribeQueue

func (r *RedisMessageBus) SubscribeQueue(ctx context.Context, channel string) (PubSub, error)

type RedisPubSub

type RedisPubSub struct {
	// contains filtered or unexported fields
}

func (*RedisPubSub) Channel

func (r *RedisPubSub) Channel() <-chan interface{}

func (*RedisPubSub) Close

func (r *RedisPubSub) Close() error

func (*RedisPubSub) Payload

func (r *RedisPubSub) Payload(msg interface{}) []byte

type SimpleGraphEdge

type SimpleGraphEdge struct{}

func (SimpleGraphEdge) Length

func (e SimpleGraphEdge) Length() int64

type StuckLock

type StuckLock struct {
	// contains filtered or unexported fields
}

func ScanTrackedLocks

func ScanTrackedLocks(threshold time.Duration) []*StuckLock

ScanTrackedLocks checks all lock trackers.

func ScanTrackedLocksI

func ScanTrackedLocksI(threshold time.Duration, n int) []*StuckLock

ScanTrackedLocksI checks lock trackers incrementally, n at a time.

func (*StuckLock) FirstLockedAtStack

func (d *StuckLock) FirstLockedAtStack() string

func (*StuckLock) HeldSince

func (d *StuckLock) HeldSince() time.Time

func (*StuckLock) NumGoroutineHeld

func (d *StuckLock) NumGoroutineHeld() int

func (*StuckLock) NumGoroutineWaiting

func (d *StuckLock) NumGoroutineWaiting() int

type TimedAggregator

type TimedAggregator[T timedAggregatorNumber] struct {
	// contains filtered or unexported fields
}

func NewTimedAggregator

func NewTimedAggregator[T timedAggregatorNumber](params TimedAggregatorParams) *TimedAggregator[T]

func (*TimedAggregator[T]) AddSample

func (t *TimedAggregator[T]) AddSample(val T) error

func (*TimedAggregator[T]) AddSampleAt

func (t *TimedAggregator[T]) AddSampleAt(val T, at time.Time) error

func (*TimedAggregator[T]) GetAggregate

func (t *TimedAggregator[T]) GetAggregate() (T, time.Duration)

func (*TimedAggregator[T]) GetAggregateAndRestartAt

func (t *TimedAggregator[T]) GetAggregateAndRestartAt(at time.Time) (T, time.Duration, error)

func (*TimedAggregator[T]) GetAggregateAt

func (t *TimedAggregator[T]) GetAggregateAt(at time.Time) (T, time.Duration, error)

func (*TimedAggregator[T]) GetAverage

func (t *TimedAggregator[T]) GetAverage() float64

func (*TimedAggregator[T]) GetAverageAndRestartAt

func (t *TimedAggregator[T]) GetAverageAndRestartAt(at time.Time) (float64, error)

func (*TimedAggregator[T]) GetAverageAt

func (t *TimedAggregator[T]) GetAverageAt(at time.Time) (float64, error)

func (*TimedAggregator[T]) Reset

func (t *TimedAggregator[T]) Reset()

func (*TimedAggregator[T]) Restart

func (t *TimedAggregator[T]) Restart()

func (*TimedAggregator[T]) RestartAt

func (t *TimedAggregator[T]) RestartAt(at time.Time)

type TimedAggregatorParams

type TimedAggregatorParams struct {
	CapNegativeValues bool
}

type TimedVersion

type TimedVersion struct {
	// contains filtered or unexported fields
}

func NewTimedVersionFromProto

func NewTimedVersionFromProto(proto *livekit.TimedVersion) *TimedVersion

func NewTimedVersionFromTime

func NewTimedVersionFromTime(t time.Time) *TimedVersion

func TimedVersionFromProto

func TimedVersionFromProto(proto *livekit.TimedVersion) TimedVersion

func TimedVersionFromTime

func TimedVersionFromTime(t time.Time) TimedVersion

func (*TimedVersion) After

func (t *TimedVersion) After(other *TimedVersion) bool

func (*TimedVersion) Compare

func (t *TimedVersion) Compare(other *TimedVersion) int

func (*TimedVersion) IsZero

func (t *TimedVersion) IsZero() bool

func (*TimedVersion) Load

func (t *TimedVersion) Load() TimedVersion

func (*TimedVersion) Store

func (t *TimedVersion) Store(other *TimedVersion)

func (*TimedVersion) String

func (t *TimedVersion) String() string

func (*TimedVersion) Time

func (t *TimedVersion) Time() time.Time

func (*TimedVersion) ToProto

func (t *TimedVersion) ToProto() *livekit.TimedVersion

func (*TimedVersion) Update

func (t *TimedVersion) Update(other *TimedVersion) bool

type TimedVersionGenerator

type TimedVersionGenerator interface {
	New() *TimedVersion
	Next() TimedVersion
}

func NewDefaultTimedVersionGenerator

func NewDefaultTimedVersionGenerator() TimedVersionGenerator

type TimeoutQueue

type TimeoutQueue[T any] struct {
	// contains filtered or unexported fields
}

func (*TimeoutQueue[T]) IterateAfter

func (q *TimeoutQueue[T]) IterateAfter(timeout time.Duration) *timeoutQueueIterator[T]

func (*TimeoutQueue[T]) IterateRemoveAfter

func (q *TimeoutQueue[T]) IterateRemoveAfter(timeout time.Duration) *timeoutQueueIterator[T]

func (*TimeoutQueue[T]) Remove

func (q *TimeoutQueue[T]) Remove(i *TimeoutQueueItem[T])

func (*TimeoutQueue[T]) Reset

func (q *TimeoutQueue[T]) Reset(i *TimeoutQueueItem[T]) bool

type TimeoutQueueItem

type TimeoutQueueItem[T any] struct {
	Value T
	// contains filtered or unexported fields
}

type Welford

type Welford struct {
	// contains filtered or unexported fields
}

Welford implements Welford's online algorithm for variance. See: https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Welford's_online_algorithm

func WelfordMerge

func WelfordMerge(ws ...Welford) Welford

func (Welford) Count

func (w Welford) Count() float64

func (Welford) Mean

func (w Welford) Mean() float64

func (*Welford) Reset

func (w *Welford) Reset()

func (Welford) StdDev

func (w Welford) StdDev() float64

func (*Welford) Update

func (w *Welford) Update(v float64)

func (Welford) Value

func (w Welford) Value() (mean, variance, sampleVariance float64)

func (Welford) Variance

func (w Welford) Variance() float64

type WorkerGroup

type WorkerGroup struct {
	// contains filtered or unexported fields
}

func (*WorkerGroup) Go

func (w *WorkerGroup) Go(fn func())

func (*WorkerGroup) Wait

func (w *WorkerGroup) Wait()

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL