store

package
v0.0.0-...-61cf9d3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 28, 2022 License: AGPL-3.0 Imports: 13 Imported by: 2

Documentation

Index

Constants

View Source
const (
	// ErrFieldRequired is an error message if a value is missing
	ErrFieldRequired = "this field is required"
)

Variables

View Source
var (
	// ErrNotInitialized will be returned if the
	// database pool is accessed before initializing using
	// Connect.
	ErrNotInitialized = errors.New("store not initialized, pool not ready")

	// ErrMaxConnsUnconfigured will be returned if the
	// maximum connections are zero.
	ErrMaxConnsUnconfigured = errors.New("MaxConns not configured")
)
View Source
var (
	ErrFrontendRequired = errors.New("meeting requires a frontend state")
)

Errors

View Source
var (
	ErrNoBackend = errors.New("no backend associated with meeting")
)

MeetingStateErrors

View Source
var (
	// ErrNoConnectionInConfig will occur when the connection
	// is not available in the context.
	ErrNoConnectionInConfig = errors.New("connection missing in context")
)

Errors

Functions

func Acquire

func Acquire(ctx context.Context) (*pgxpool.Conn, error)

Acquire tries to get a database connection from the pool

func AssertDatabaseVersion

func AssertDatabaseVersion(pool *pgxpool.Pool, version int) error

AssertDatabaseVersion tests if the current version of the database is equal to a required version

func Connect

func Connect(opts *ConnectOpts) error

Connect initializes the connection pool and checks the schema version of the database.

func ConnectTest

func ConnectTest() error

ConnectTest connects to the pgx db pool. It uses the b3scale defaults if the environment variable is not set.

func ConnectionFromContext

func ConnectionFromContext(ctx context.Context) *pgxpool.Conn

ConnectionFromContext will retrieve the connection.

func ContextWithConnection

func ContextWithConnection(
	ctx context.Context,
	conn *pgxpool.Conn,
) context.Context

ContextWithConnection will return a child context with a value for connection.

func CountCommandsError

func CountCommandsError(ctx context.Context, tx pgx.Tx) (int, error)

CountCommandsError returns the number of commands in the queue that were processed with an error.

func CountCommandsRequested

func CountCommandsRequested(ctx context.Context, tx pgx.Tx) (int, error)

CountCommandsRequested returns the number of unprocessed commands in the queue.

func CountCommandsSuccess

func CountCommandsSuccess(ctx context.Context, tx pgx.Tx) (int, error)

CountCommandsSuccess returns the number of successfully processed commands in the queue.

func CountCommandsWithState

func CountCommandsWithState(
	ctx context.Context,
	tx pgx.Tx,
	state string,
) (int, error)

CountCommandsWithState retrieves the number of commands in the queue with a given state, e.g. requested, error, etc.

func DeleteMeetingStateByID

func DeleteMeetingStateByID(
	ctx context.Context,
	tx pgx.Tx,
	id string,
) error

DeleteMeetingStateByID will remove a meeting state. It will succeed, even if no such meeting was present. TODO: merge with DeleteMeetingStateByInternalID

func DeleteMeetingStateByInternalID

func DeleteMeetingStateByInternalID(
	ctx context.Context,
	tx pgx.Tx,
	id string,
) error

DeleteMeetingStateByInternalID will remove a meeting state. It will succeed, even if no such meeting was present.

func DeleteOrphanMeetings

func DeleteOrphanMeetings(
	ctx context.Context,
	tx pgx.Tx,
	backendID string,
	backendMeetings []string,
) (int64, error)

DeleteOrphanMeetings will remove all meetings not in a list of (internal) meeting IDs, but associated with a backend

func NewDelete

func NewDelete() sq.DeleteBuilder

NewDelete creates a new deletion query

func NewQuery

func NewQuery() sq.SelectBuilder

NewQuery creates a new query

func NextDeadline

func NextDeadline(dt time.Duration) time.Time

NextDeadline calculates the deadline for a newly requested command

func Q

func Q() sq.SelectBuilder

Q is an alias for NewQuery

func QueueCommand

func QueueCommand(ctx context.Context, tx pgx.Tx, cmd *Command) error

QueueCommand adds a new command to the queue

Types

type BackendSettings

type BackendSettings struct {
	Tags Tags `json:"tags,omitempty"`
}

BackendSettings hold per backend runtime configuration.

type BackendState

type BackendState struct {
	ID string `json:"id"`

	NodeState  string `json:"node_state"`
	AdminState string `json:"admin_state"`

	AgentHeartbeat time.Time `json:"agent_heartbeat"`

	LastError *string `json:"last_error"`

	Latency        time.Duration `json:"latency"`
	MeetingsCount  uint          `json:"meetings_count"`
	AttendeesCount uint          `json:"attendees_count"`

	LoadFactor float64 `json:"load_factor"`

	Backend *bbb.Backend `json:"bbb"`

	Settings BackendSettings `json:"settings"`

	CreatedAt time.Time `json:"created_at"`
	UpdatedAt time.Time `json:"updated_at"`
	SyncedAt  time.Time `json:"synced_at"`
}

The BackendState is shared across b3scale instances and encapsulates the list of meetings and recordings. The backend.ID should be used as identifier.

func GetBackendState

func GetBackendState(
	ctx context.Context,
	tx pgx.Tx,
	q sq.SelectBuilder,
) (*BackendState, error)

GetBackendState tries to retrieve a single backend state

func GetBackendStates

func GetBackendStates(
	ctx context.Context,
	tx pgx.Tx,
	q sq.SelectBuilder,
) ([]*BackendState, error)

GetBackendStates retrieves all backends

func InitBackendState

func InitBackendState(init *BackendState) *BackendState

InitBackendState initializes a new backend state with an initial state.

func (*BackendState) ClearMeetings

func (s *BackendState) ClearMeetings(
	ctx context.Context,
	tx pgx.Tx,
) error

ClearMeetings will remove all meetings in the current state

func (*BackendState) CreateMeetingState

func (s *BackendState) CreateMeetingState(
	ctx context.Context,
	tx pgx.Tx,
	frontend *bbb.Frontend,
	meeting *bbb.Meeting,
) (*MeetingState, error)

CreateMeetingState will create a new state for the current backend state. A frontend is attached if present.

func (*BackendState) CreateOrUpdateMeetingState

func (s *BackendState) CreateOrUpdateMeetingState(
	ctx context.Context,
	tx pgx.Tx,
	meeting *bbb.Meeting,
) error

CreateOrUpdateMeetingState will try to update the meeting or will create a new meeting state if the meeting does not exist. The new meeting will not be associated with a frontend state; however, the meeting can later be claimed by a frontend.

func (*BackendState) Delete

func (s *BackendState) Delete(
	ctx context.Context,
	tx pgx.Tx,
) error

Delete will remove the backend from the store

func (*BackendState) IsAgentAlive

func (s *BackendState) IsAgentAlive() bool

IsAgentAlive checks if the heartbeat is older than the threshold

func (*BackendState) IsNodeReady

func (s *BackendState) IsNodeReady() bool

IsNodeReady checks if the agent is alive and the node state is ready

func (*BackendState) Refresh

func (s *BackendState) Refresh(
	ctx context.Context,
	tx pgx.Tx,
) error

Refresh the backend state from the database

func (*BackendState) Save

func (s *BackendState) Save(
	ctx context.Context,
	tx pgx.Tx,
) error

Save persists the backend state in the database store

func (*BackendState) UpdateAgentHeartbeat

func (s *BackendState) UpdateAgentHeartbeat(
	ctx context.Context,
	tx pgx.Tx,
) error

UpdateAgentHeartbeat will set the attribute to the current timestamp

func (*BackendState) UpdateStatCounters

func (s *BackendState) UpdateStatCounters(
	ctx context.Context,
	tx pgx.Tx,
) error

UpdateStatCounters counts meetings and attendees and updates the properties

func (*BackendState) Validate

func (s *BackendState) Validate() ValidationError

Validate the backend state

type Command

type Command struct {
	ID  string `json:"id"`
	Seq int    `json:"seq"`

	State string `json:"state"`

	Action string      `json:"action"`
	Params interface{} `json:"params"`
	Result interface{} `json:"result"`

	Deadline  time.Time  `json:"deadline"`
	StartedAt *time.Time `json:"started_at"`
	StoppedAt *time.Time `json:"stopped_at"`
	CreatedAt time.Time  `json:"created_at"`
	// contains filtered or unexported fields
}

A Command is a representation of an operation

func (*Command) FetchParams

func (cmd *Command) FetchParams(
	ctx context.Context,
	req interface{},
) error

FetchParams loads the parameters and decodes them

type CommandHandler

type CommandHandler func(context.Context, *Command) (interface{}, error)

CommandHandler is a callback function for handling commands. The command was successful if no error was returned.

type CommandQueue

type CommandQueue struct {
	// contains filtered or unexported fields
}

The CommandQueue is connected to the database and provides methods for queuing and dequeuing commands.

func NewCommandQueue

func NewCommandQueue() *CommandQueue

NewCommandQueue initializes a new command queue

func (*CommandQueue) Receive

func (q *CommandQueue) Receive(handler CommandHandler) error

Receive will await a command and will block until a command can be processed. If the handler responds with an error, the error will be returned.

type ConnectOpts

type ConnectOpts struct {
	URL      string
	MaxConns int32
	MinConns int32
}

ConnectOpts database connection options

type DefaultPresentationSettings

type DefaultPresentationSettings struct {
	URL   string `json:"url"`
	Force bool   `json:"force"`
}

DefaultPresentationSettings configure a per frontend default presentation.

type FrontendSettings

type FrontendSettings struct {
	RequiredTags        Tags                         `json:"required_tags,omitempty"`
	DefaultPresentation *DefaultPresentationSettings `json:"default_presentation,omitempty"`
}

FrontendSettings hold all well known settings for a frontend.

type FrontendState

type FrontendState struct {
	ID string `json:"id"`

	Active   bool          `json:"active"`
	Frontend *bbb.Frontend `json:"bbb"`

	Settings FrontendSettings `json:"settings"`

	AccountRef *string `json:"account_ref"`

	CreatedAt time.Time `json:"created_at"`
	UpdatedAt time.Time `json:"updated_at"`
}

The FrontendState holds shared information about a frontend.

func GetFrontendState

func GetFrontendState(
	ctx context.Context,
	tx pgx.Tx,
	q sq.SelectBuilder,
) (*FrontendState, error)

GetFrontendState gets a single row from the store. This may return nil without an error.

func GetFrontendStates

func GetFrontendStates(
	ctx context.Context,
	tx pgx.Tx,
	q sq.SelectBuilder,
) ([]*FrontendState, error)

GetFrontendStates retrieves all frontend states from the database.

func InitFrontendState

func InitFrontendState(init *FrontendState) *FrontendState

InitFrontendState initializes the state with a database pool and default values where required.

func (*FrontendState) Delete

func (s *FrontendState) Delete(ctx context.Context, tx pgx.Tx) error

Delete will remove a frontend state from the store

func (*FrontendState) Save

func (s *FrontendState) Save(
	ctx context.Context,
	tx pgx.Tx,
) error

Save will create or update a frontend state

func (*FrontendState) Validate

func (s *FrontendState) Validate() ValidationError

Validate checks for presence of required fields.

type MeetingState

type MeetingState struct {
	ID         string
	InternalID string

	Meeting *bbb.Meeting

	FrontendID *string

	BackendID *string

	CreatedAt time.Time
	UpdatedAt time.Time
	SyncedAt  time.Time
	// contains filtered or unexported fields
}

The MeetingState holds a meeting and its relations to a backend and frontend.

func GetMeetingState

func GetMeetingState(
	ctx context.Context,
	tx pgx.Tx,
	q sq.SelectBuilder,
) (*MeetingState, error)

GetMeetingState tries to retrieve a single meeting state

func GetMeetingStateByID

func GetMeetingStateByID(
	ctx context.Context,
	tx pgx.Tx,
	id string,
) (*MeetingState, error)

GetMeetingStateByID is a convenience wrapper around GetMeetingState.

func GetMeetingStates

func GetMeetingStates(
	ctx context.Context,
	tx pgx.Tx,
	q sq.SelectBuilder,
) ([]*MeetingState, error)

GetMeetingStates retrieves all meeting states

func InitMeetingState

func InitMeetingState(
	init *MeetingState,
) *MeetingState

InitMeetingState initializes meeting state with defaults and a connection

func (*MeetingState) BindFrontendID

func (s *MeetingState) BindFrontendID(
	ctx context.Context,
	tx pgx.Tx,
	id string,
) error

BindFrontendID associates an unclaimed meeting with a frontend

func (*MeetingState) GetBackendState

func (s *MeetingState) GetBackendState(
	ctx context.Context,
	tx pgx.Tx,
) (*BackendState, error)

GetBackendState loads the backend state

func (*MeetingState) GetFrontendState

func (s *MeetingState) GetFrontendState(
	ctx context.Context,
	tx pgx.Tx,
) (*FrontendState, error)

GetFrontendState loads the frontend state for the meeting

func (*MeetingState) IsStale

func (s *MeetingState) IsStale(threshold time.Duration) bool

IsStale checks if the last sync is longer ago than a given threshold.

func (*MeetingState) MarkSynced

func (s *MeetingState) MarkSynced()

MarkSynced sets the synced at timestamp

func (*MeetingState) Refresh

func (s *MeetingState) Refresh(ctx context.Context, tx pgx.Tx) error

Refresh the meeting state from the database

func (*MeetingState) Save

func (s *MeetingState) Save(ctx context.Context, tx pgx.Tx) error

Save updates or inserts a meeting state into our cluster state.

func (*MeetingState) SetBackendID

func (s *MeetingState) SetBackendID(
	ctx context.Context,
	tx pgx.Tx,
	id string,
) error

SetBackendID associates a meeting with a backend

func (*MeetingState) Upsert

func (s *MeetingState) Upsert(ctx context.Context, tx pgx.Tx) (string, error)

Upsert meeting state will create the meeting state or will fall back to a state update.

type Tags

type Tags []string

Tags are a list of strings with labels to declare for example backend capabilities

type ValidationError

type ValidationError map[string][]string

A ValidationError is a mapping between the field name (same as json field name), and a list of error strings.

func (ValidationError) Add

func (e ValidationError) Add(field, err string)

Add a validation error to the collection

func (ValidationError) Error

func (e ValidationError) Error() string

Error implements the error interface

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL