icingadb

package
v1.2.0 Latest
Published: Apr 11, 2024 License: GPL-2.0 Imports: 34 Imported by: 0

Documentation

Constants

const (
	MySQL      string = "mysql"
	PostgreSQL string = "postgres"
)

Driver names under which the respective drivers register themselves in the database/sql package.

Variables

This section is empty.

Functions

This section is empty.

Types

type CleanupStmt

type CleanupStmt struct {
	Table  string
	PK     string
	Column string
}

CleanupStmt defines information needed to compose cleanup statements.

func (*CleanupStmt) Build

func (stmt *CleanupStmt) Build(driverName string, limit uint64) string

Build assembles the cleanup statement for the specified database driver with the given limit.

type DB

type DB struct {
	*sqlx.DB

	Options *Options
	// contains filtered or unexported fields
}

DB is a wrapper around sqlx.DB with bulk execution, statement building, streaming and logging capabilities.

func NewDb

func NewDb(db *sqlx.DB, logger *logging.Logger, options *Options) *DB

NewDb returns a new icingadb.DB wrapper for a pre-existing *sqlx.DB.
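
A minimal construction sketch, assuming the package lives under github.com/icinga/icingadb/pkg/icingadb with its sibling logging package, a go-sql-driver/mysql DSN of your own, and a *logging.Logger obtained from your logging setup (its construction is elided here):

package main

import (
	_ "github.com/go-sql-driver/mysql"
	"github.com/icinga/icingadb/pkg/icingadb"
	"github.com/icinga/icingadb/pkg/logging"
	"github.com/jmoiron/sqlx"
)

func main() {
	// Open a plain *sqlx.DB first; the DSN is a placeholder.
	sqlxDb, err := sqlx.Open(icingadb.MySQL, "icingadb:secret@tcp(localhost:3306)/icingadb")
	if err != nil {
		panic(err)
	}

	var logger *logging.Logger // from your logging setup

	// Wrap it with bulk execution, statement building, streaming and logging capabilities.
	db := icingadb.NewDb(sqlxDb, logger, &icingadb.Options{
		MaxConnections:              16,
		MaxConnectionsPerTable:      8,
		MaxPlaceholdersPerStatement: 8192,
		MaxRowsPerTransaction:       8192,
		WsrepSyncWait:               7,
	})
	_ = db
}

The later sketches in this section reuse db, logger and these imports (plus context, time and the other packages they name) rather than repeating the setup.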

func (*DB) BatchSizeByPlaceholders

func (db *DB) BatchSizeByPlaceholders(n int) int

BatchSizeByPlaceholders returns how often the specified number of placeholders fits into Options.MaxPlaceholdersPerStatement, but at least 1.

func (*DB) BuildColumns

func (db *DB) BuildColumns(subject interface{}) []string

BuildColumns returns all columns of the given struct.

func (*DB) BuildDeleteStmt

func (db *DB) BuildDeleteStmt(from interface{}) string

BuildDeleteStmt returns a DELETE statement for the given struct.

func (*DB) BuildInsertIgnoreStmt

func (db *DB) BuildInsertIgnoreStmt(into interface{}) (string, int)

BuildInsertIgnoreStmt returns an INSERT statement for the specified struct for which the database ignores rows that have already been inserted.

func (*DB) BuildInsertStmt

func (db *DB) BuildInsertStmt(into interface{}) (string, int)

BuildInsertStmt returns an INSERT INTO statement for the given struct.

func (*DB) BuildSelectStmt

func (db *DB) BuildSelectStmt(table interface{}, columns interface{}) string

BuildSelectStmt returns a SELECT query that creates the FROM part from the given table struct and the column list from the specified columns struct.

func (*DB) BuildUpdateStmt

func (db *DB) BuildUpdateStmt(update interface{}) (string, int)

BuildUpdateStmt returns an UPDATE statement for the given struct.

func (*DB) BuildUpsertStmt

func (db *DB) BuildUpsertStmt(subject interface{}) (stmt string, placeholders int)

BuildUpsertStmt returns an upsert statement for the given struct.

func (*DB) BuildWhere

func (db *DB) BuildWhere(subject interface{}) (string, int)

BuildWhere returns a WHERE clause with named placeholder conditions built from the specified struct combined with the AND operator.
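
A small sketch of the statement builders, assuming sqlx-style db struct tags (and the struct or table name) drive the generated column and table names; CommentRow is a hypothetical row type, and the exact SQL text produced is driver-specific and not reproduced here:

// CommentRow is a hypothetical table row, not a type of this package.
type CommentRow struct {
	Id     []byte `db:"id"`
	Author string `db:"author"`
	Text   string `db:"text"`
}

func buildStatements(db *icingadb.DB) {
	columns := db.BuildColumns(&CommentRow{}) // all tagged columns
	selectStmt := db.BuildSelectStmt(&CommentRow{}, &CommentRow{})
	insertStmt, placeholders := db.BuildInsertStmt(&CommentRow{})
	upsertStmt, _ := db.BuildUpsertStmt(&CommentRow{})
	where, _ := db.BuildWhere(&CommentRow{})

	_ = columns
	_, _, _, _, _ = selectStmt, insertStmt, placeholders, upsertStmt, where
}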

func (*DB) BulkExec

func (db *DB) BulkExec(
	ctx context.Context, query string, count int, sem *semaphore.Weighted, arg <-chan any, onSuccess ...OnSuccess[any],
) error

BulkExec bulk executes queries with a single slice placeholder in the form of `IN (?)`. Takes in up to the number of arguments specified in count from the arg stream, derives and expands a query and executes it with this set of arguments until the arg stream has been processed. The derived queries are executed in a separate goroutine with a weighting of 1 and can be executed concurrently to the extent allowed by the semaphore passed in sem. Arguments for which the query ran successfully will be passed to onSuccess.
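
A hedged usage sketch: deleting rows by ID with a single IN (?) placeholder, sized via BatchSizeByPlaceholders and throttled by the per-table semaphore; the comment table and its id column are assumptions:

func bulkDeleteByIds(ctx context.Context, db *icingadb.DB, ids []any) error {
	arg := make(chan any, len(ids))
	for _, id := range ids {
		arg <- id
	}
	close(arg)

	// Each derived statement expands IN (?) with up to count arguments.
	return db.BulkExec(
		ctx, "DELETE FROM comment WHERE id IN (?)", db.BatchSizeByPlaceholders(1),
		db.GetSemaphoreForTable("comment"), arg,
	)
}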

func (*DB) CheckSchema added in v1.1.0

func (db *DB) CheckSchema(ctx context.Context) error

CheckSchema asserts that the database schema of the expected version is present.

func (*DB) CleanupOlderThan

func (db *DB) CleanupOlderThan(
	ctx context.Context, stmt CleanupStmt, envId types.Binary,
	count uint64, olderThan time.Time, onSuccess ...OnSuccess[struct{}],
) (uint64, error)

CleanupOlderThan uses the specified statement to delete all rows that are older than the given time, removing at most count rows per round. Actually deleted rows will be passed to onSuccess. Returns the total number of rows deleted.
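
A history cleanup sketch; the notification_history table, its id primary key and send_time column, and the types.Binary environment ID (from this module's types package) are assumptions about the surrounding schema:

func cleanupNotificationHistory(ctx context.Context, db *icingadb.DB, envId types.Binary) (uint64, error) {
	stmt := icingadb.CleanupStmt{
		Table:  "notification_history",
		PK:     "id",
		Column: "send_time",
	}

	// Delete rows older than one week, at most 5000 per round.
	return db.CleanupOlderThan(ctx, stmt, envId, 5000, time.Now().AddDate(0, 0, -7))
}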

func (*DB) CreateIgnoreStreamed added in v1.1.0

func (db *DB) CreateIgnoreStreamed(
	ctx context.Context, entities <-chan contracts.Entity, onSuccess ...OnSuccess[contracts.Entity],
) error

CreateIgnoreStreamed bulk creates the specified entities via NamedBulkExec. The insert statement is created using BuildInsertIgnoreStmt with the first entity from the entities stream. Bulk size is controlled via Options.MaxPlaceholdersPerStatement and concurrency is controlled via Options.MaxConnectionsPerTable. Entities for which the query ran successfully will be passed to onSuccess.

func (*DB) CreateStreamed

func (db *DB) CreateStreamed(
	ctx context.Context, entities <-chan contracts.Entity, onSuccess ...OnSuccess[contracts.Entity],
) error

CreateStreamed bulk creates the specified entities via NamedBulkExec. The insert statement is created using BuildInsertStmt with the first entity from the entities stream. Bulk size is controlled via Options.MaxPlaceholdersPerStatement and concurrency is controlled via Options.MaxConnectionsPerTable. Entities for which the query ran successfully will be passed to onSuccess.
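
For the streamed writers, a sketch that feeds pre-built entities through a channel; contracts.Entity implementations come from elsewhere in this module (for example the v1 subpackage) and are not defined here:

func insertEntities(ctx context.Context, db *icingadb.DB, toInsert []contracts.Entity) error {
	entities := make(chan contracts.Entity, len(toInsert))
	for _, e := range toInsert {
		entities <- e
	}
	close(entities)

	// Bulk INSERT; swap in CreateIgnoreStreamed to silently skip rows that already exist.
	return db.CreateStreamed(ctx, entities)
}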

func (*DB) Delete

func (db *DB) Delete(
	ctx context.Context, entityType contracts.Entity, ids []interface{}, onSuccess ...OnSuccess[any],
) error

Delete creates a channel from the specified ids and bulk deletes them by passing the channel along with the entityType to DeleteStreamed. IDs for which the query ran successfully will be passed to onSuccess.

func (*DB) DeleteStreamed

func (db *DB) DeleteStreamed(
	ctx context.Context, entityType contracts.Entity, ids <-chan interface{}, onSuccess ...OnSuccess[any],
) error

DeleteStreamed bulk deletes the specified ids via BulkExec. The delete statement is created using BuildDeleteStmt with the passed entityType. Bulk size is controlled via Options.MaxPlaceholdersPerStatement and concurrency is controlled via Options.MaxConnectionsPerTable. IDs for which the query ran successfully will be passed to onSuccess.

func (*DB) GetSemaphoreForTable

func (db *DB) GetSemaphoreForTable(table string) *semaphore.Weighted

func (*DB) NamedBulkExec

func (db *DB) NamedBulkExec(
	ctx context.Context, query string, count int, sem *semaphore.Weighted, arg <-chan contracts.Entity,
	splitPolicyFactory com.BulkChunkSplitPolicyFactory[contracts.Entity], onSuccess ...OnSuccess[contracts.Entity],
) error

NamedBulkExec bulk executes queries with named placeholders in a VALUES clause most likely in the format INSERT ... VALUES. Takes in up to the number of entities specified in count from the arg stream, derives and executes a new query with the VALUES clause expanded to this set of arguments, until the arg stream has been processed. The queries are executed in a separate goroutine with a weighting of 1 and can be executed concurrently to the extent allowed by the semaphore passed in sem. Entities for which the query ran successfully will be passed to onSuccess.

func (*DB) NamedBulkExecTx

func (db *DB) NamedBulkExecTx(
	ctx context.Context, query string, count int, sem *semaphore.Weighted, arg <-chan contracts.Entity,
) error

NamedBulkExecTx bulk executes queries with named placeholders in separate transactions. Takes in up to the number of entities specified in count from the arg stream and executes a new transaction that runs a new query for each entity in this set of arguments, until the arg stream has been processed. The transactions are executed in a separate goroutine with a weighting of 1 and can be executed concurrently to the extent allowed by the semaphore passed in sem.

func (*DB) UpdateStreamed

func (db *DB) UpdateStreamed(ctx context.Context, entities <-chan contracts.Entity) error

UpdateStreamed bulk updates the specified entities via NamedBulkExecTx. The update statement is created using BuildUpdateStmt with the first entity from the entities stream. Bulk size is controlled via Options.MaxRowsPerTransaction and concurrency is controlled via Options.MaxConnectionsPerTable.

func (*DB) UpsertStreamed

func (db *DB) UpsertStreamed(
	ctx context.Context, entities <-chan contracts.Entity, onSuccess ...OnSuccess[contracts.Entity],
) error

UpsertStreamed bulk upserts the specified entities via NamedBulkExec. The upsert statement is created using BuildUpsertStmt with the first entity from the entities stream. Bulk size is controlled via Options.MaxPlaceholdersPerStatement and concurrency is controlled via Options.MaxConnectionsPerTable. Entities for which the query ran successfully will be passed to onSuccess.

func (*DB) YieldAll

func (db *DB) YieldAll(ctx context.Context, factoryFunc contracts.EntityFactoryFunc, query string, scope interface{}) (<-chan contracts.Entity, <-chan error)

YieldAll executes the query with the supplied scope, scans each resulting row into an entity returned by the factory function, and streams them into a returned channel.
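
A read-side sketch; it assumes contracts.EntityFactoryFunc is a plain constructor returning a fresh contracts.Entity and that the caller drains both returned channels:

func readAll(ctx context.Context, db *icingadb.DB, factory contracts.EntityFactoryFunc, query string, scope interface{}) error {
	entities, errs := db.YieldAll(ctx, factory, query, scope)

	for {
		select {
		case entity, ok := <-entities:
			if !ok {
				return nil // stream drained without error
			}
			_ = entity // process the row
		case err := <-errs:
			if err != nil {
				return err
			}
			errs = nil // error channel closed cleanly; stop selecting on it
		}
	}
}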

type Delta

type Delta struct {
	Create  EntitiesById
	Update  EntitiesById
	Delete  EntitiesById
	Subject *common.SyncSubject
	// contains filtered or unexported fields
}

Delta calculates the delta of actual and desired entities, and stores which entities need to be created, updated, and deleted.

func NewDelta

func NewDelta(ctx context.Context, actual, desired <-chan contracts.Entity, subject *common.SyncSubject, logger *logging.Logger) *Delta

NewDelta creates a new Delta and starts calculating it. The caller must ensure that no duplicate entities are sent to the same stream.

func (*Delta) Wait

func (delta *Delta) Wait() error

Wait waits for the delta calculation to complete and returns an error, if any.
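
Putting Delta together, a sketch with two entity streams and a *common.SyncSubject obtained elsewhere; how actual and desired are produced (database vs. Redis) is up to the caller:

func computeDelta(ctx context.Context, actual, desired <-chan contracts.Entity, subject *common.SyncSubject, logger *logging.Logger) error {
	delta := icingadb.NewDelta(ctx, actual, desired, subject, logger)
	if err := delta.Wait(); err != nil {
		return err
	}

	// After Wait, the three maps say what needs to be written where.
	for id, entity := range delta.Create {
		_, _ = id, entity // INSERT these
	}
	_ = delta.Update // UPDATE these
	_ = delta.Delete // DELETE these
	return nil
}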

type DumpSignals

type DumpSignals struct {
	// contains filtered or unexported fields
}

DumpSignals reads dump signals from a Redis stream via Listen. Dump-done signals are passed on via Done channels, while InProgress must be checked for dump-wip signals.

func NewDumpSignals

func NewDumpSignals(redis *icingaredis.Client, logger *logging.Logger) *DumpSignals

NewDumpSignals returns a new DumpSignals.

func (*DumpSignals) Done

func (s *DumpSignals) Done(key string) <-chan struct{}

Done returns a channel that is closed when the given key receives a done dump signal.

func (*DumpSignals) InProgress

func (s *DumpSignals) InProgress() <-chan struct{}

InProgress returns a channel that is closed when a new dump is in progress after done signals were sent to channels returned by Done.

func (*DumpSignals) Listen

func (s *DumpSignals) Listen(ctx context.Context) error

Listen starts listening for dump signals in the icinga:dump Redis stream. When a done signal is received, this is signaled via the channels returned from the Done function.

If a wip signal is received after a done signal was passed on via the Done function, this is signaled via the InProgress function and this function returns with err == nil. In this case, all other signals are invalidated. It is up to the caller to pass on this information, for example by cancelling derived contexts.

This function may only be called once for each DumpSignals object. To listen for a new iteration of dump signals, a new DumpSignals instance must be created.
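
A wiring sketch, assuming a connected *icingaredis.Client; the key "host" stands in for whichever dump key you are waiting on:

func waitForHostDump(ctx context.Context, rc *icingaredis.Client, logger *logging.Logger) error {
	signals := icingadb.NewDumpSignals(rc, logger)

	go func() {
		// Listen returns when a new dump starts after a done signal, or on error.
		_ = signals.Listen(ctx)
	}()

	select {
	case <-signals.Done("host"):
		return nil // the host dump finished; start syncing
	case <-signals.InProgress():
		return nil // a new dump started; previously received done signals are invalid
	case <-ctx.Done():
		return ctx.Err()
	}
}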

type EntitiesById

type EntitiesById map[string]contracts.Entity

EntitiesById is a map of key-contracts.Entity pairs.

func (EntitiesById) Entities

func (ebi EntitiesById) Entities(ctx context.Context) <-chan contracts.Entity

Entities streams the entities on a returned channel.

func (EntitiesById) IDs

func (ebi EntitiesById) IDs() []interface{}

IDs returns the contracts.ID values of the entities.

func (EntitiesById) Keys

func (ebi EntitiesById) Keys() []string

Keys returns the keys.

type HA

type HA struct {
	// contains filtered or unexported fields
}

HA provides high availability and indicates whether a Takeover or Handover must be made.

func NewHA

func NewHA(ctx context.Context, db *DB, heartbeat *icingaredis.Heartbeat, logger *logging.Logger) *HA

NewHA returns a new HA and starts the controller loop.

func (*HA) Close

func (h *HA) Close(ctx context.Context) error

Close shuts h down.

func (*HA) Done

func (h *HA) Done() <-chan struct{}

Done returns a channel that's closed when the HA controller loop has ended.

func (*HA) Environment

func (h *HA) Environment() *v1.Environment

Environment returns the current environment.

func (*HA) Err

func (h *HA) Err() error

Err returns an error if Done has been closed and there is an error. Otherwise returns nil.

func (*HA) Handover

func (h *HA) Handover() chan string

Handover returns a channel with which handovers and their reasons are signaled.

func (*HA) State

func (h *HA) State() (responsibleTsMilli int64, responsible, otherResponsible bool)

State returns the status quo.

func (*HA) Takeover

func (h *HA) Takeover() chan string

Takeover returns a channel with which takeovers and their reasons are signaled.
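
A control-loop sketch; the *icingaredis.Heartbeat is obtained elsewhere, and only the reaction to the HA signals is shown:

func runHA(ctx context.Context, db *icingadb.DB, heartbeat *icingaredis.Heartbeat, logger *logging.Logger) error {
	ha := icingadb.NewHA(ctx, db, heartbeat, logger)
	defer func() { _ = ha.Close(context.Background()) }()

	for {
		select {
		case reason := <-ha.Takeover():
			_ = reason // we are now responsible; start syncing
		case reason := <-ha.Handover():
			_ = reason // another instance took over; pause work
		case <-ha.Done():
			return ha.Err()
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}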

type InitConnFunc added in v1.2.0

type InitConnFunc func(context.Context, driver.Conn) error

type MysqlFuncLogger added in v1.2.0

type MysqlFuncLogger func(v ...interface{})

MysqlFuncLogger is an adapter that allows ordinary functions to be used as a logger for mysql.SetLogger.

func (MysqlFuncLogger) Print added in v1.2.0

func (log MysqlFuncLogger) Print(v ...interface{})

Print implements the mysql.Logger interface.
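
A short adapter sketch, assuming the go-sql-driver/mysql package and that logger exposes a print-style Debug method (as zap's SugaredLogger does); both are assumptions, not guarantees of this package:

func redirectMysqlLogs(logger *logging.Logger) error {
	// Route the MySQL driver's own log output through our logger.
	return mysql.SetLogger(icingadb.MysqlFuncLogger(logger.Debug))
}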

type OnSuccess

type OnSuccess[T any] func(ctx context.Context, affectedRows []T) (err error)

OnSuccess is a callback for successful (bulk) DML operations.

func OnSuccessIncrement

func OnSuccessIncrement[T any](counter *com.Counter) OnSuccess[T]

func OnSuccessSendTo

func OnSuccessSendTo[T any](ch chan<- T) OnSuccess[T]
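
The helpers compose with any of the bulk methods above; a sketch that deletes by ID while counting successes and forwarding the deleted IDs, assuming this module's com package provides the Counter used here:

func deleteAndTrack(ctx context.Context, db *icingadb.DB, entityType contracts.Entity, ids []interface{}) error {
	var deleted com.Counter
	deletedIds := make(chan any, len(ids))

	err := db.Delete(
		ctx, entityType, ids,
		icingadb.OnSuccessIncrement[any](&deleted),
		icingadb.OnSuccessSendTo[any](deletedIds),
	)
	close(deletedIds)

	// deleted and the drained deletedIds channel now reflect what was actually removed.
	return err
}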

type Options

type Options struct {
	// Maximum number of open connections to the database.
	MaxConnections int `yaml:"max_connections" default:"16"`

	// Maximum number of connections per table,
	// regardless of what the connection is actually doing,
	// e.g. INSERT, UPDATE, DELETE.
	MaxConnectionsPerTable int `yaml:"max_connections_per_table" default:"8"`

	// MaxPlaceholdersPerStatement defines the maximum number of placeholders in an
	// INSERT, UPDATE or DELETE statement. Theoretically, MySQL can handle up to 2^16-1 placeholders,
	// but this increases the execution time of queries and thus reduces the number of queries
	// that can be executed in parallel in a given time.
	// The default is 2^13, which in our tests showed the best performance in terms of execution time and parallelism.
	MaxPlaceholdersPerStatement int `yaml:"max_placeholders_per_statement" default:"8192"`

	// MaxRowsPerTransaction defines the maximum number of rows per transaction.
	// The default is 2^13, which in our tests showed the best performance in terms of execution time and parallelism.
	MaxRowsPerTransaction int `yaml:"max_rows_per_transaction" default:"8192"`

	// WsrepSyncWait enforces Galera cluster nodes to perform strict cluster-wide causality checks
	// before executing specific SQL queries determined by the number you provided.
	// Please refer to the below link for a detailed description.
	// https://icinga.com/docs/icinga-db/latest/doc/03-Configuration/#galera-cluster
	WsrepSyncWait int `yaml:"wsrep_sync_wait" default:"7"`
}

Options defines the user-configurable database options.

func (*Options) Validate

func (o *Options) Validate() error

Validate checks constraints in the supplied database options and returns an error if they are violated.

type RetryConnector added in v1.2.0

type RetryConnector struct {
	driver.Connector
	// contains filtered or unexported fields
}

RetryConnector wraps driver.Connector with retry logic.

func NewConnector added in v1.2.0

func NewConnector(c driver.Connector, logger *logging.Logger, init InitConnFunc) *RetryConnector

NewConnector creates a fully initialized RetryConnector from the given args.

func (RetryConnector) Connect added in v1.2.0

func (c RetryConnector) Connect(ctx context.Context) (driver.Conn, error)

Connect implements part of the driver.Connector interface.

func (RetryConnector) Driver added in v1.2.0

func (c RetryConnector) Driver() driver.Driver

Driver implements part of the driver.Connector interface.
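
A connector-path sketch, assuming go-sql-driver/mysql's NewConnector and the standard library's sql.OpenDB; the retry wrapper sits between the two, and a no-op InitConnFunc stands in for real per-connection setup:

func openWithRetries(cfg *mysql.Config, logger *logging.Logger, options *icingadb.Options) (*icingadb.DB, error) {
	base, err := mysql.NewConnector(cfg)
	if err != nil {
		return nil, err
	}

	// Wrap the driver's connector with retry logic before building the pool.
	noopInit := icingadb.InitConnFunc(func(context.Context, driver.Conn) error { return nil })
	connector := icingadb.NewConnector(base, logger, noopInit)

	db := sqlx.NewDb(sql.OpenDB(connector), icingadb.MySQL)
	return icingadb.NewDb(db, logger, options), nil
}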

type RuntimeUpdates

type RuntimeUpdates struct {
	// contains filtered or unexported fields
}

RuntimeUpdates specifies the source and destination of runtime updates.

func NewRuntimeUpdates

func NewRuntimeUpdates(db *DB, redis *icingaredis.Client, logger *logging.Logger) *RuntimeUpdates

NewRuntimeUpdates creates a new RuntimeUpdates.

func (*RuntimeUpdates) ClearStreams

func (r *RuntimeUpdates) ClearStreams(ctx context.Context) (config, state icingaredis.Streams, err error)

ClearStreams returns the stream key to ID mapping of the runtime update streams for later use in Sync and clears the streams themselves.

func (*RuntimeUpdates) Sync

func (r *RuntimeUpdates) Sync(
	ctx context.Context, factoryFuncs []contracts.EntityFactoryFunc, streams icingaredis.Streams, allowParallel bool,
) error

Sync synchronizes runtime update streams from r.redis to r.db and deletes the original data on success. Note that Sync must only be called after the configuration synchronization has been completed. allowParallel allows synchronizing out of order (not FIFO).

type ScopedEntity

type ScopedEntity struct {
	contracts.Entity
	// contains filtered or unexported fields
}

ScopedEntity combines an entity and a scope that specifies the WHERE conditions that entities of the enclosed entity type must satisfy in order to be SELECTed.

func NewScopedEntity

func NewScopedEntity(entity contracts.Entity, scope interface{}) *ScopedEntity

NewScopedEntity returns a new ScopedEntity.

func (ScopedEntity) Scope

func (e ScopedEntity) Scope() interface{}

Scope implements the contracts.Scoper interface.

func (ScopedEntity) TableName

func (e ScopedEntity) TableName() string

TableName implements the contracts.TableNamer interface.
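
A scoping sketch; the environment_id column and the anonymous scope struct are assumptions about how the scope is later turned into a WHERE clause (compare BuildWhere above):

func scopeByEnvironment(entity contracts.Entity, envId types.Binary) *icingadb.ScopedEntity {
	scope := struct {
		EnvironmentId types.Binary `db:"environment_id"`
	}{EnvironmentId: envId}

	// The scoped entity keeps its table name but adds the WHERE conditions of the scope.
	return icingadb.NewScopedEntity(entity, scope)
}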

type Sync

type Sync struct {
	// contains filtered or unexported fields
}

Sync implements a rendezvous point for Icinga DB and Redis to synchronize their entities.

func NewSync

func NewSync(db *DB, redis *icingaredis.Client, logger *logging.Logger) *Sync

NewSync returns a new Sync.

func (Sync) ApplyDelta

func (s Sync) ApplyDelta(ctx context.Context, delta *Delta) error

ApplyDelta applies all changes from Delta to the database.

func (Sync) Sync

func (s Sync) Sync(ctx context.Context, subject *common.SyncSubject) error

Sync synchronizes entities between Icinga DB and Redis created with the specified sync subject. This function does not respect dump signals. For this, use SyncAfterDump.

func (Sync) SyncAfterDump

func (s Sync) SyncAfterDump(ctx context.Context, subject *common.SyncSubject, dump *DumpSignals) error

SyncAfterDump waits for a config dump to finish (using the dump parameter) and then starts a sync for the given sync subject using the Sync function.

func (Sync) SyncCustomvars

func (s Sync) SyncCustomvars(ctx context.Context) error

SyncCustomvars synchronizes customvar and customvar_flat.
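
Finally, a sketch of the top-level synchronization flow with a dump-aware sync; the *common.SyncSubject and the DumpSignals are created elsewhere (see DumpSignals above):

func syncSubjectAfterDump(
	ctx context.Context, db *icingadb.DB, rc *icingaredis.Client, logger *logging.Logger,
	subject *common.SyncSubject, dump *icingadb.DumpSignals,
) error {
	s := icingadb.NewSync(db, rc, logger)

	// Waits for the subject's config dump to finish, then runs a full sync for it.
	return s.SyncAfterDump(ctx, subject, dump)
}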

Directories

Path Synopsis
v1
