database

package
v0.1.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 13, 2024 License: AGPL-3.0 Imports: 32 Imported by: 0

Documentation

Index

Constants

View Source
const MySQL = "icinga-mysql"
View Source
const PostgreSQL = "icinga-pgsql"

Variables

This section is empty.

Functions

func CantPerformQuery

func CantPerformQuery(err error, q string) error

CantPerformQuery wraps the given error with the specified query that cannot be executed.

func IsRetryable

func IsRetryable(err error) bool

IsRetryable checks whether the given error is retryable.

func IsStruct

func IsStruct(subject interface{}) bool

func IsUnixAddr

func IsUnixAddr(host string) bool

func JoinHostPort

func JoinHostPort(host string, port int) string

JoinHostPort is like its equivalent in the net package (net.JoinHostPort), but handles UNIX sockets as well.

func RegisterDrivers

func RegisterDrivers(logger logr.Logger)

RegisterDrivers makes our database Driver(s) available under the name "icinga-*sql".

func TableName

func TableName(t interface{}) string

TableName returns the table of t.

Types

type CleanupStmt

type CleanupStmt struct {
	Table  string
	PK     string
	Column string
}

CleanupStmt defines information needed to compose cleanup statements.

func (*CleanupStmt) Build

func (stmt *CleanupStmt) Build(driverName string, limit uint64) string

Build assembles the cleanup statement for the specified database driver with the given limit.

type ColumnMap

type ColumnMap struct {
	// contains filtered or unexported fields
}

func NewColumnMap

func NewColumnMap(mapper *reflectx.Mapper) *ColumnMap

func (*ColumnMap) Columns

func (m *ColumnMap) Columns(subject any) []string

type Config

type Config struct {
	Type     string  `yaml:"type" default:"mysql"`
	Host     string  `yaml:"host"`
	Port     int     `yaml:"port"`
	Database string  `yaml:"database"`
	User     string  `yaml:"user"`
	Password string  `yaml:"password"`
	Options  Options `yaml:"options"`
}

Config defines database client configuration.

func FromYAMLFile

func FromYAMLFile(file string) (*Config, error)

func (*Config) Validate

func (c *Config) Validate() error

Validate checks constraints in the supplied database configuration and returns an error if they are violated.

type Database

type Database struct {
	*sqlx.DB

	Options Options
	// contains filtered or unexported fields
}

Database is a wrapper around sqlx.DB with bulk execution, statement building, streaming and logging capabilities.

func NewFromConfig

func NewFromConfig(c *Config, log logr.Logger) (*Database, error)

NewFromConfig returns a new Database connection from the given Config.

func (*Database) BatchSizeByPlaceholders

func (db *Database) BatchSizeByPlaceholders(n int) int

BatchSizeByPlaceholders returns how often the specified number of placeholders fits into Options.MaxPlaceholdersPerStatement, but at least 1.

func (*Database) BuildDeleteStmt

func (db *Database) BuildDeleteStmt(from interface{}) string

BuildDeleteStmt returns a DELETE statement for the given struct.

func (*Database) BuildSelectStmt

func (db *Database) BuildSelectStmt(table interface{}, columns interface{}) string

BuildSelectStmt returns a SELECT query that creates the FROM part from the given table struct and the column list from the specified columns struct.

func (*Database) BuildUpsertStmt

func (db *Database) BuildUpsertStmt(subject interface{}) (stmt string, placeholders int)

BuildUpsertStmt returns an upsert statement for the given struct.

func (*Database) BulkExec

func (db *Database) BulkExec(
	ctx context.Context, query string, count int, sem *semaphore.Weighted, arg <-chan interface{}, features ...Feature,
) error

BulkExec bulk executes queries with a single slice placeholder in the form of `IN (?)`. Takes in up to the number of arguments specified in count from the arg stream, derives and expands a query and executes it with this set of arguments until the arg stream has been processed. The derived queries are executed in a separate goroutine with a weighting of 1 and can be executed concurrently to the extent allowed by the semaphore passed in sem. Arguments for which the query ran successfully will be passed to the onSuccess callback, which is supplied through features (see WithOnSuccess).

func (*Database) CleanupOlderThan

func (db *Database) CleanupOlderThan(
	ctx context.Context, stmt CleanupStmt,
	count uint64, olderThan time.Time, onSuccess ...OnSuccess[struct{}],
) (uint64, error)

CleanupOlderThan deletes all rows matched by the specified statement that are older than the given time. At most count rows are deleted per round. The rows actually deleted in each round are passed to onSuccess. Returns the total number of rows deleted.

func (*Database) Connect

func (db *Database) Connect() bool

func (*Database) DeleteStreamed

func (db *Database) DeleteStreamed(
	ctx context.Context, from interface{}, ids <-chan interface{}, features ...Feature,
) error

DeleteStreamed bulk deletes the specified ids via BulkExec. The delete statement is created using BuildDeleteStmt with the passed from value. Bulk size is controlled via Options.MaxPlaceholdersPerStatement and concurrency is controlled via Options.MaxConnectionsPerTable. IDs for which the query ran successfully will be passed to the onSuccess callback, which is supplied through features (see WithOnSuccess).

func (*Database) GetSemaphoreForTable

func (db *Database) GetSemaphoreForTable(table string) *semaphore.Weighted

func (*Database) NamedBulkExec

func (db *Database) NamedBulkExec(
	ctx context.Context, query string, count int, sem *semaphore.Weighted, arg <-chan interface{},
	splitPolicyFactory com.BulkChunkSplitPolicyFactory[interface{}], features ...Feature,
) error

NamedBulkExec bulk executes queries with named placeholders in a VALUES clause, most likely in the format INSERT ... VALUES. Takes in up to the number of entities specified in count from the arg stream, derives and executes a new query with the VALUES clause expanded to this set of arguments, until the arg stream has been processed. The queries are executed in a separate goroutine with a weighting of 1 and can be executed concurrently to the extent allowed by the semaphore passed in sem. Entities for which the query ran successfully will be passed to the onSuccess callback, which is supplied through features (see WithOnSuccess).

func (*Database) UpsertStreamed

func (db *Database) UpsertStreamed(
	ctx context.Context, entities <-chan interface{}, features ...Feature,
) error

UpsertStreamed bulk upserts the specified entities via NamedBulkExec. The upsert statement is created using BuildUpsertStmt with the first entity from the entities stream. Bulk size is controlled via Options.MaxPlaceholdersPerStatement and concurrency is controlled via Options.MaxConnectionsPerTable.

func (*Database) YieldAll

func (db *Database) YieldAll(ctx context.Context, factoryFunc func() (interface{}, error), query string, scope ...interface{}) (<-chan interface{}, <-chan error)

YieldAll executes the query with the supplied scope, scans each resulting row into an entity returned by the factory function, and streams them into a returned channel.

type Driver

type Driver struct {
	Logger logr.Logger
	// contains filtered or unexported fields
}

Driver wraps a driver.Driver, which must also implement driver.DriverContext, adding logging capabilities and providing our RetryConnector.

func (Driver) OpenConnector

func (d Driver) OpenConnector(name string) (driver.Connector, error)

OpenConnector implements the DriverContext interface.

type Feature

type Feature func(*Features)

func WithBlocking

func WithBlocking() Feature

func WithCascading

func WithCascading() Feature

func WithOnSuccess

func WithOnSuccess(fn com.ProcessBulk[any]) Feature

type Features

type Features struct {
	// contains filtered or unexported fields
}

func NewFeatures

func NewFeatures(features ...Feature) *Features

type HasRelations

type HasRelations interface {
	Relations() []Relation
}

type OnSuccess

type OnSuccess[T any] func(ctx context.Context, affectedRows []T) (err error)

OnSuccess is a callback for successful (bulk) DML operations.

type Options

type Options struct {
	// Maximum number of open connections to the database.
	MaxConnections int `yaml:"max_connections" default:"16"`

	// Maximum number of connections per table,
	// regardless of what the connection is actually doing,
	// e.g. INSERT, UPDATE, DELETE.
	MaxConnectionsPerTable int `yaml:"max_connections_per_table" default:"8"`

	// MaxPlaceholdersPerStatement defines the maximum number of placeholders in an
	// INSERT, UPDATE or DELETE statement. Theoretically, MySQL can handle up to 2^16-1 placeholders,
	// but this increases the execution time of queries and thus reduces the number of queries
	// that can be executed in parallel in a given time.
	// The default is 2^13, which in our tests showed the best performance in terms of execution time and parallelism.
	MaxPlaceholdersPerStatement int `yaml:"max_placeholders_per_statement" default:"8192"`

	// MaxRowsPerTransaction defines the maximum number of rows per transaction.
	// The default is 2^13, which in our tests showed the best performance in terms of execution time and parallelism.
	MaxRowsPerTransaction int `yaml:"max_rows_per_transaction" default:"8192"`
}

Options defines user-configurable database options.

func (*Options) Validate

func (o *Options) Validate() error

Validate checks constraints in the supplied database options and returns an error if they are violated.

type PgSQLDriver

type PgSQLDriver struct {
	pq.Driver
}

PgSQLDriver extends pq.Driver with driver.DriverContext compliance.

func (PgSQLDriver) OpenConnector

func (PgSQLDriver) OpenConnector(name string) (driver.Connector, error)

OpenConnector implements the driver.DriverContext interface.

type Quoter

type Quoter struct {
	// contains filtered or unexported fields
}

func NewQuoter

func NewQuoter(db *sqlx.DB) *Quoter

func (*Quoter) QuoteColumns

func (q *Quoter) QuoteColumns(columns []string) string

func (*Quoter) QuoteIdentifier

func (q *Quoter) QuoteIdentifier(identifier string) string

type Relation

type Relation interface {
	ForeignKey() string
	SetForeignKey(fk string)
	CascadeDelete() bool
	WithoutCascadeDelete()
	StreamInto(context.Context, chan interface{}) error
	TableName() string
}

func HasMany

func HasMany[T any](entities []T, options ...RelationOption) Relation

func HasOne

func HasOne[T any](entity T, options ...RelationOption) Relation

type RelationOption

type RelationOption func(r Relation)

func WithForeignKey

func WithForeignKey(fk string) RelationOption

func WithoutCascadeDelete

func WithoutCascadeDelete() RelationOption

type RetryConnector

type RetryConnector struct {
	driver.Connector
	// contains filtered or unexported fields
}

RetryConnector wraps driver.Connector with retry logic.

func (RetryConnector) Connect

func (c RetryConnector) Connect(ctx context.Context) (driver.Conn, error)

Connect implements part of the driver.Connector interface.

func (RetryConnector) Driver

func (c RetryConnector) Driver() driver.Driver

Driver implements part of the driver.Connector interface.

type TableNamer

type TableNamer interface {
	TableName() string // TableName tells the table.
}

TableNamer is implemented by types that provide a TableName method, which returns the table of the object.

type UUID

type UUID struct {
	uuid.UUID
}

UUID is like uuid.UUID, but marshals itself in binary form (rather than in the textual form xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx) in SQL context.

func (UUID) Value

func (uuid UUID) Value() (driver.Value, error)

Value implements driver.Valuer.

type Upserter

type Upserter interface {
	Upsert() interface{}
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL