db

package
v0.0.0-...-28c5b4c

This package is not in the latest version of its module.

Published: May 5, 2024 License: Apache-2.0 Imports: 19 Imported by: 0

Documentation

Overview

Package db contains all communication between ppacer and the database.

Introduction

Supported databases

  • SQLite - the default database. It is also used as an in-memory database and as a temp-file database (under /tmp) for unit and integration tests.
  • Postgres
  • ... (More in the future)

Constants

const (
	DagRunTaskStatusScheduled = "SCHEDULED"
)

Variables

var TableNames []string = []string{
	"dags",
	"dagtasks",
	"dagruns",
	"dagruntasks",
}

TableNames is a list of ppacer database table names.

Functions

func CleanUpSqliteTmp

func CleanUpSqliteTmp(c *Client, t *testing.T)

CleanUpSqliteTmp deletes the SQLite database source file if all tests in the scope passed. If at least one test failed, the database is not deleted, to enable further debugging. Even though this function takes a generic *Client, it is mainly meant for SQLite-based database clients used in testing.

func SchemaStatements

func SchemaStatements(dbDriver string) ([]string, error)

SchemaStatements returns a list of SQL statements that set up a new instance of the scheduler's internal database. The statements can differ slightly between SQL databases, so the exact list is prepared based on the given database driver name. If the given database driver is not supported, a non-nil error is returned.
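The driver-based selection described above can be pictured with a small sketch. It is not the package's implementation: the function name schemaFor, the driver name strings and the SQL text are all assumptions made for illustration, since this page does not list the accepted driver names or the real schema.

```go
package main

import "fmt"

// Hypothetical driver names. The real values accepted by SchemaStatements
// are not documented on this page.
const (
	driverSqlite   = "sqlite"
	driverPostgres = "postgres"
)

// schemaFor returns illustrative schema statements for the given driver,
// or a non-nil error when the driver is not supported.
func schemaFor(driver string) ([]string, error) {
	switch driver {
	case driverSqlite:
		return []string{
			"CREATE TABLE IF NOT EXISTS dags (DagId TEXT NOT NULL PRIMARY KEY)",
		}, nil
	case driverPostgres:
		return []string{
			"CREATE TABLE IF NOT EXISTS dags (DagId VARCHAR(250) NOT NULL PRIMARY KEY)",
		}, nil
	default:
		return nil, fmt.Errorf("unsupported database driver: %q", driver)
	}
}

func main() {
	for _, d := range []string{driverSqlite, driverPostgres, "oracle"} {
		stmts, err := schemaFor(d)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Printf("%s: %d statement(s)\n", d, len(stmts))
	}
}
```

A caller would typically execute the returned statements one by one and stop at the first error.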

func SchemaStatementsForLogs

func SchemaStatementsForLogs(dbDriver string) ([]string, error)

SchemaStatementsForLogs is like SchemaStatements, but for the logs database.

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client represents the main database client.

func NewInMemoryClient

func NewInMemoryClient(schemaScriptPath string) (*Client, error)

Produces new Client using in-memory SQLite database with schema created based on given script.

func NewPostgresClient

func NewPostgresClient(dbConn *sql.DB, dbName string, logger *slog.Logger) (*Client, error)

Produces new Client based on given connection to PostgreSQL database. If given database doesn't contain required ppacer schema, it will be created on the client initialization.

func NewPostgresClientForLogs

func NewPostgresClientForLogs(dbConn *sql.DB, dbName string, logger *slog.Logger) (*Client, error)

Produces new Client for logs based on given connection to PostgreSQL database. If given database doesn't contain required logs schema, it will be created on the client initialization.

func NewSqliteClient

func NewSqliteClient(dbFilePath string, logger *slog.Logger) (*Client, error)

Produces new Client based on the given path to a SQLite database file. If the database file does not exist in the given location, an empty SQLite database with the schema set up will be created.

func NewSqliteClientForLogs

func NewSqliteClientForLogs(dbFilePath string, logger *slog.Logger) (*Client, error)

Produces new Client for logs based on the given path to a SQLite database file. If the database file does not exist in the given location, an empty SQLite database with the schema set up will be created.

func NewSqliteInMemoryClient

func NewSqliteInMemoryClient(logger *slog.Logger) (*Client, error)

Produces new Client based on SQLite in-memory database. It's not persisted anywhere else. It is mostly useful for tests and small examples.

func NewSqliteTmpClient

func NewSqliteTmpClient(logger *slog.Logger) (*Client, error)

Produces new Client using SQLite database created as temp file. It's mainly for testing and ad hoc use.

func NewSqliteTmpClientForLogs

func NewSqliteTmpClientForLogs(logger *slog.Logger) (*Client, error)

Produces new Client for logs using SQLite database created as temp file. It's mainly for testing and ad hoc use.

func (*Client) Count

func (c *Client) Count(table string) int

Count returns the number of rows in the given table. In case of an error, -1 is returned and the error is logged.

func (*Client) CountWhere

func (c *Client) CountWhere(table, where string) int

CountWhere returns the number of rows in the given table filtered by the given where condition. In case of an error, -1 is returned and the error is logged.

func (*Client) DagRunAlreadyScheduled

func (c *Client) DagRunAlreadyScheduled(
	ctx context.Context, dagId, execTs string,
) (bool, error)

DagRunAlreadyScheduled checks whether a dag run already exists for the given DAG ID and schedule timestamp.

func (*Client) InsertDagRun

func (c *Client) InsertDagRun(ctx context.Context, dagId, execTs string) (int64, error)

InsertDagRun inserts a new row into the dagruns table for the given DagId and execution timestamp. Initial status is set to DagRunStatusScheduled. The RunId of the newly inserted dag run is returned, or -1 when the error is not nil.

func (*Client) InsertDagRunTask

func (c *Client) InsertDagRunTask(ctx context.Context, dagId, execTs, taskId, status string) error

Inserts new DagRunTask with default status SCHEDULED.

func (*Client) InsertDagTasks

func (c *Client) InsertDagTasks(ctx context.Context, d dag.Dag) error

InsertDagTasks inserts the tasks of the given DAG into the dagtasks table and sets them as the current version. Previous versions remain in the dagtasks table, but with IsCurrent=0. If inserting any of the DAG's tasks fails, the whole insertion is rolled back (in terms of SQL transactions).
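The all-or-nothing behavior described above follows the usual database/sql transaction pattern: execute every insert inside one transaction, roll back on the first failure, and commit only when all inserts succeeded. The sketch below illustrates that pattern and is not the package's code. The txLike interface, insertAll, fakeTx and the SQL text are assumptions; fakeTx replaces a real *sql.Tx so the example runs without a database driver.

```go
package main

import (
	"database/sql"
	"errors"
	"fmt"
)

// txLike is the subset of *sql.Tx methods used by this sketch.
type txLike interface {
	Exec(query string, args ...any) (sql.Result, error)
	Commit() error
	Rollback() error
}

// Compile-time check that a real transaction fits the interface.
var _ txLike = (*sql.Tx)(nil)

// insertAll inserts every task or none of them. The SQL is illustrative.
func insertAll(tx txLike, dagId string, taskIds []string) error {
	const q = "INSERT INTO dagtasks (DagId, TaskId, IsCurrent) VALUES (?, ?, 1)"
	for _, id := range taskIds {
		if _, err := tx.Exec(q, dagId, id); err != nil {
			if rbErr := tx.Rollback(); rbErr != nil {
				return errors.Join(err, rbErr)
			}
			return err
		}
	}
	return tx.Commit()
}

// fakeTx is a hypothetical in-memory transaction that fails on one task ID.
type fakeTx struct {
	failOn     string
	pending    []string
	committed  bool
	rolledBack bool
}

func (f *fakeTx) Exec(query string, args ...any) (sql.Result, error) {
	if len(args) < 2 {
		return nil, errors.New("expected dagId and taskId arguments")
	}
	id, _ := args[1].(string)
	if id == f.failOn {
		return nil, fmt.Errorf("cannot insert task %q", id)
	}
	f.pending = append(f.pending, id)
	return nil, nil
}

func (f *fakeTx) Commit() error { f.committed = true; return nil }

func (f *fakeTx) Rollback() error {
	f.pending = nil // discard everything inserted so far
	f.rolledBack = true
	return nil
}

func main() {
	tx := &fakeTx{failOn: "load"}
	err := insertAll(tx, "sample_dag", []string{"extract", "load"})
	fmt.Println("error:", err)
	fmt.Println("rolled back:", tx.rolledBack, "committed:", tx.committed)
}
```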

func (*Client) InsertTaskLog

func (c *Client) InsertTaskLog(tlr TaskLogRecord) error

InsertTaskLog inserts single log record into tasklogs table.

func (*Client) ReadDag

func (c *Client) ReadDag(ctx context.Context, dagId string) (Dag, error)

ReadDag reads metadata about DAG from dags table for given dagId.

func (*Client) ReadDagRunLogs

func (c *Client) ReadDagRunLogs(ctx context.Context, dagId, execTs string) ([]TaskLogRecord, error)

ReadDagRunLogs reads all task logs for given DAG run in chronological order.

func (*Client) ReadDagRunTask

func (c *Client) ReadDagRunTask(ctx context.Context, dagId, execTs, taskId string) (DagRunTask, error)

ReadDagRunTask reads information about given taskId in given dag run.

func (*Client) ReadDagRunTaskLogs

func (c *Client) ReadDagRunTaskLogs(ctx context.Context, dagId, execTs, taskId string) ([]TaskLogRecord, error)

ReadDagRunTaskLogs reads all logs for given DAG run task in chronological order.

func (*Client) ReadDagRunTasks

func (c *Client) ReadDagRunTasks(ctx context.Context, dagId, execTs string) ([]DagRunTask, error)

Reads DAG run tasks information from dagruntasks table for given DAG run.

func (*Client) ReadDagRunTasksNotFinished

func (c *Client) ReadDagRunTasksNotFinished(ctx context.Context) ([]DagRunTask, error)

ReadDagRunTasksNotFinished reads tasks from dagruntasks table which are not in terminal state (success or failed). Tasks are sorted from oldest to newest based on execTs and insertTs.
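The filtering and ordering rule above can be shown on an in-memory slice. This sketch makes several assumptions that are not confirmed by this page: the terminal statuses are spelled "SUCCESS" and "FAILED", timestamps are fixed-width strings that sort lexicographically in chronological order, and runTask is a trimmed, hypothetical copy of DagRunTask. In the package itself this logic presumably lives in a SQL query.

```go
package main

import (
	"fmt"
	"sort"
)

// runTask is a hypothetical, trimmed copy of DagRunTask.
type runTask struct {
	TaskId   string
	ExecTs   string
	InsertTs string
	Status   string
}

// notFinished drops tasks in an (assumed) terminal status and sorts the rest
// from oldest to newest by ExecTs, then by InsertTs.
func notFinished(tasks []runTask) []runTask {
	out := make([]runTask, 0, len(tasks))
	for _, t := range tasks {
		if t.Status == "SUCCESS" || t.Status == "FAILED" {
			continue
		}
		out = append(out, t)
	}
	sort.SliceStable(out, func(i, j int) bool {
		if out[i].ExecTs != out[j].ExecTs {
			return out[i].ExecTs < out[j].ExecTs
		}
		return out[i].InsertTs < out[j].InsertTs
	})
	return out
}

func main() {
	tasks := []runTask{
		{"c", "2024-05-02T00:00:00", "2024-05-02T00:00:01", "RUNNING"},
		{"a", "2024-05-01T00:00:00", "2024-05-01T00:00:02", "SCHEDULED"},
		{"x", "2024-05-01T00:00:00", "2024-05-01T00:00:00", "SUCCESS"},
		{"b", "2024-05-01T00:00:00", "2024-05-01T00:00:01", "RUNNING"},
	}
	for _, t := range notFinished(tasks) {
		fmt.Println(t.TaskId, t.ExecTs, t.InsertTs, t.Status)
	}
}
```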

func (*Client) ReadDagRuns

func (c *Client) ReadDagRuns(ctx context.Context, dagId string, topN int) ([]DagRun, error)

ReadDagRuns reads topN latest dag runs for given DAG ID.

func (*Client) ReadDagRunsNotFinished

func (c *Client) ReadDagRunsNotFinished(ctx context.Context) ([]DagRun, error)

Reads dag runs from the dagruns table which are not in terminal states.

func (*Client) ReadDagTask

func (c *Client) ReadDagTask(ctx context.Context, dagId, taskId string) (DagTask, error)

ReadDagTask reads single row (current version) from dagtasks table for given DAG ID and task ID.

func (*Client) ReadDagTasks

func (c *Client) ReadDagTasks(ctx context.Context, dagId string) ([]DagTask, error)

ReadDagTasks reads all tasks for given dagId in the current version from dagtasks table.

func (*Client) ReadLatestDagRuns

func (c *Client) ReadLatestDagRuns(ctx context.Context) (map[string]DagRun, error)

ReadLatestDagRuns reads latest dag run for each Dag. Returns map from DagId to DagRun.

func (*Client) RunningTasksNum

func (c *Client) RunningTasksNum(ctx context.Context) (int, error)

RunningTasksNum returns number of currently running tasks. That means rows in dagruntasks table with status 'RUNNING'.

func (*Client) UpdateDagRunStatus

func (c *Client) UpdateDagRunStatus(
	ctx context.Context, runId int64, status string,
) error

Updates dagrun status for given runId.

func (*Client) UpdateDagRunStatusByExecTs

func (c *Client) UpdateDagRunStatusByExecTs(
	ctx context.Context, dagId, execTs, status string,
) error

Updates dagrun status for given dagId and execTs (when runId is not available). The pair (dagId, execTs) is unique in the dagruns table.

func (*Client) UpdateDagRunTaskStatus

func (c *Client) UpdateDagRunTaskStatus(ctx context.Context, dagId, execTs, taskId, status string) error

Updates dagruntask status for given dag run task.

func (*Client) UpsertDag

func (c *Client) UpsertDag(ctx context.Context, d dag.Dag) error

UpsertDag inserts or updates DAG details in the dags table. TODO(dskrzypiec): Perhaps we should always insert a new DAG into dags and keep an IsCurrent flag, similarly to what we do in dagtasks? Not really needed for now, but something to consider in the future.

type DB

type DB interface {
	Begin() (*sql.Tx, error)
	Exec(query string, args ...any) (sql.Result, error)
	ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error)
	Close() error
	DataSource() string
	Query(query string, args ...any) (*sql.Rows, error)
	QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error)
	QueryRow(query string, args ...any) *sql.Row
	QueryRowContext(ctx context.Context, query string, args ...any) *sql.Row
}

DB defines a set of operations required from a database. Most of the methods are identical to those of the standard `*sql.DB` type.
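To make the relation to `*sql.DB` concrete, the sketch below copies the interface from the listing above and satisfies it by embedding `*sql.DB` and adding only DataSource, the one method the standard type does not provide. The sourcedDB type and its source field are hypothetical and are not how the package's own implementations (PostgresDB, SqliteDB, SqliteDBInMemory) are necessarily built.

```go
package main

import (
	"context"
	"database/sql"
	"fmt"
)

// DB is copied from the interface listing above.
type DB interface {
	Begin() (*sql.Tx, error)
	Exec(query string, args ...any) (sql.Result, error)
	ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error)
	Close() error
	DataSource() string
	Query(query string, args ...any) (*sql.Rows, error)
	QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error)
	QueryRow(query string, args ...any) *sql.Row
	QueryRowContext(ctx context.Context, query string, args ...any) *sql.Row
}

// sourcedDB is a hypothetical wrapper. All methods except DataSource are
// promoted from the embedded *sql.DB.
type sourcedDB struct {
	*sql.DB
	source string
}

// DataSource returns a description of where the data lives.
func (s *sourcedDB) DataSource() string { return s.source }

// Compile-time check that the wrapper satisfies the interface.
var _ DB = (*sourcedDB)(nil)

func main() {
	// The embedded *sql.DB is left nil here, so only DataSource is safe to
	// call. A real program would set it from sql.Open with a driver.
	var d DB = &sourcedDB{source: "/tmp/example.db"}
	fmt.Println(d.DataSource())
}
```

Depending on an interface rather than on `*sql.DB` directly lets the client swap in implementations that add locking or other behavior around the same method set.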

type Dag

type Dag struct {
	DagId               string
	StartTs             *string
	Schedule            *string
	CreateTs            string
	LatestUpdateTs      *string
	CreateVersion       string
	LatestUpdateVersion *string
	HashDagMeta         string
	HashTasks           string
	Attributes          string // serialized dag.Dag.Attr
}

func (Dag) Equals

func (d Dag) Equals(e Dag) bool

type DagRun

type DagRun struct {
	RunId          int64
	DagId          string
	ExecTs         string
	InsertTs       string
	Status         string
	StatusUpdateTs string
	Version        string
}

type DagRunTask

type DagRunTask struct {
	DagId          string
	ExecTs         string
	TaskId         string
	InsertTs       string
	Status         string
	StatusUpdateTs string
	Version        string
}

type DagTask

type DagTask struct {
	DagId          string
	TaskId         string
	IsCurrent      bool
	InsertTs       string
	Version        string
	TaskTypeName   string
	TaskBodyHash   string
	TaskBodySource string
}

DagTask represents single row in dagtasks table in the database.

type PostgresDB

type PostgresDB struct {
	// contains filtered or unexported fields
}

PostgresDB represents Client for PostgreSQL database.

func (*PostgresDB) Begin

func (s *PostgresDB) Begin() (*sql.Tx, error)

func (*PostgresDB) Close

func (s *PostgresDB) Close() error

func (*PostgresDB) DataSource

func (s *PostgresDB) DataSource() string

func (*PostgresDB) Exec

func (s *PostgresDB) Exec(query string, args ...any) (sql.Result, error)

func (*PostgresDB) ExecContext

func (s *PostgresDB) ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error)

func (*PostgresDB) Query

func (s *PostgresDB) Query(query string, args ...any) (*sql.Rows, error)

func (*PostgresDB) QueryContext

func (s *PostgresDB) QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error)

func (*PostgresDB) QueryRow

func (s *PostgresDB) QueryRow(query string, args ...any) *sql.Row

func (*PostgresDB) QueryRowContext

func (s *PostgresDB) QueryRowContext(ctx context.Context, query string, args ...any) *sql.Row

type SqliteDB

type SqliteDB struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func (*SqliteDB) Begin

func (s *SqliteDB) Begin() (*sql.Tx, error)

func (*SqliteDB) Close

func (s *SqliteDB) Close() error

func (*SqliteDB) DataSource

func (s *SqliteDB) DataSource() string

func (*SqliteDB) Exec

func (s *SqliteDB) Exec(query string, args ...any) (sql.Result, error)

func (*SqliteDB) ExecContext

func (s *SqliteDB) ExecContext(
	ctx context.Context, query string, args ...any,
) (sql.Result, error)

func (*SqliteDB) Query

func (s *SqliteDB) Query(query string, args ...any) (*sql.Rows, error)

func (*SqliteDB) QueryContext

func (s *SqliteDB) QueryContext(
	ctx context.Context, query string, args ...any,
) (*sql.Rows, error)

func (*SqliteDB) QueryRow

func (s *SqliteDB) QueryRow(query string, args ...any) *sql.Row

func (*SqliteDB) QueryRowContext

func (s *SqliteDB) QueryRowContext(
	ctx context.Context, query string, args ...any,
) *sql.Row

type SqliteDBInMemory

type SqliteDBInMemory struct {
	sync.Mutex
	// contains filtered or unexported fields
}

SqliteDBInMemory is a SQLite database whose data is stored in memory rather than in a file on disk. It needs an additional level of isolation for concurrent access.
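The struct above embeds sync.Mutex, which suggests that calls are serialized with a lock, although this page does not show how the lock is used. The sketch below illustrates that general idea under this assumption. guardedStore, its rows field and insertConcurrently are hypothetical; a plain slice stands in for the database handle so the example runs without a driver.

```go
package main

import (
	"fmt"
	"sync"
)

// guardedStore is a hypothetical stand-in for a handle that is not safe for
// concurrent use. Embedding sync.Mutex mirrors the struct listing above.
type guardedStore struct {
	sync.Mutex
	rows []string // pretend table
}

// Insert appends a row while holding the lock, so concurrent callers are
// serialized.
func (g *guardedStore) Insert(row string) {
	g.Lock()
	defer g.Unlock()
	g.rows = append(g.rows, row)
}

// Count returns the number of rows while holding the lock.
func (g *guardedStore) Count() int {
	g.Lock()
	defer g.Unlock()
	return len(g.rows)
}

// insertConcurrently inserts n rows from n goroutines and waits for them.
func insertConcurrently(g *guardedStore, n int) {
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			g.Insert(fmt.Sprintf("row-%d", i))
		}(i)
	}
	wg.Wait()
}

func main() {
	g := &guardedStore{}
	insertConcurrently(g, 100)
	fmt.Println(g.Count())
}
```

Without the lock, the concurrent appends would race and rows could be lost.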

func (*SqliteDBInMemory) Begin

func (s *SqliteDBInMemory) Begin() (*sql.Tx, error)

func (*SqliteDBInMemory) Close

func (s *SqliteDBInMemory) Close() error

func (*SqliteDBInMemory) DataSource

func (s *SqliteDBInMemory) DataSource() string

func (*SqliteDBInMemory) Exec

func (s *SqliteDBInMemory) Exec(query string, args ...any) (sql.Result, error)

func (*SqliteDBInMemory) ExecContext

func (s *SqliteDBInMemory) ExecContext(
	ctx context.Context, query string, args ...any,
) (sql.Result, error)

func (*SqliteDBInMemory) Query

func (s *SqliteDBInMemory) Query(query string, args ...any) (*sql.Rows, error)

func (*SqliteDBInMemory) QueryContext

func (s *SqliteDBInMemory) QueryContext(
	ctx context.Context, query string, args ...any,
) (*sql.Rows, error)

func (*SqliteDBInMemory) QueryRow

func (s *SqliteDBInMemory) QueryRow(query string, args ...any) *sql.Row

func (*SqliteDBInMemory) QueryRowContext

func (s *SqliteDBInMemory) QueryRowContext(
	ctx context.Context, query string, args ...any,
) *sql.Row

type TaskLogRecord

type TaskLogRecord struct {
	DagId      string
	ExecTs     string
	TaskId     string
	InsertTs   string
	Level      string
	Message    string
	Attributes string
}

TaskLogRecord represents a single row in the tasklogs table.
