Documentation ¶
Overview ¶
Package db contains all communication between ppacer and the database.
Introduction ¶
Supported databases ¶
- SQLite - used as the default database. It's also used as an in-memory database and as a database on /tmp files for unit and integration tests.
- Postgres
- ... (More in the future)
Index ¶
- Constants
- Variables
- func CleanUpSqliteTmp(c *Client, t *testing.T)
- func SchemaStatements(dbDriver string) ([]string, error)
- func SchemaStatementsForLogs(dbDriver string) ([]string, error)
- type Client
- func NewInMemoryClient(schemaScriptPath string) (*Client, error)
- func NewPostgresClient(dbConn *sql.DB, dbName string, logger *slog.Logger) (*Client, error)
- func NewPostgresClientForLogs(dbConn *sql.DB, dbName string, logger *slog.Logger) (*Client, error)
- func NewSqliteClient(dbFilePath string, logger *slog.Logger) (*Client, error)
- func NewSqliteClientForLogs(dbFilePath string, logger *slog.Logger) (*Client, error)
- func NewSqliteInMemoryClient(logger *slog.Logger) (*Client, error)
- func NewSqliteTmpClient(logger *slog.Logger) (*Client, error)
- func NewSqliteTmpClientForLogs(logger *slog.Logger) (*Client, error)
- func (c *Client) Count(table string) int
- func (c *Client) CountWhere(table, where string) int
- func (c *Client) DagRunAlreadyScheduled(ctx context.Context, dagId, execTs string) (bool, error)
- func (c *Client) InsertDagRun(ctx context.Context, dagId, execTs string) (int64, error)
- func (c *Client) InsertDagRunTask(ctx context.Context, dagId, execTs, taskId, status string) error
- func (c *Client) InsertDagTasks(ctx context.Context, d dag.Dag) error
- func (c *Client) InsertTaskLog(tlr TaskLogRecord) error
- func (c *Client) ReadDag(ctx context.Context, dagId string) (Dag, error)
- func (c *Client) ReadDagRunLogs(ctx context.Context, dagId, execTs string) ([]TaskLogRecord, error)
- func (c *Client) ReadDagRunTask(ctx context.Context, dagId, execTs, taskId string) (DagRunTask, error)
- func (c *Client) ReadDagRunTaskLogs(ctx context.Context, dagId, execTs, taskId string) ([]TaskLogRecord, error)
- func (c *Client) ReadDagRunTasks(ctx context.Context, dagId, execTs string) ([]DagRunTask, error)
- func (c *Client) ReadDagRunTasksNotFinished(ctx context.Context) ([]DagRunTask, error)
- func (c *Client) ReadDagRuns(ctx context.Context, dagId string, topN int) ([]DagRun, error)
- func (c *Client) ReadDagRunsNotFinished(ctx context.Context) ([]DagRun, error)
- func (c *Client) ReadDagTask(ctx context.Context, dagId, taskId string) (DagTask, error)
- func (c *Client) ReadDagTasks(ctx context.Context, dagId string) ([]DagTask, error)
- func (c *Client) ReadLatestDagRuns(ctx context.Context) (map[string]DagRun, error)
- func (c *Client) RunningTasksNum(ctx context.Context) (int, error)
- func (c *Client) UpdateDagRunStatus(ctx context.Context, runId int64, status string) error
- func (c *Client) UpdateDagRunStatusByExecTs(ctx context.Context, dagId, execTs, status string) error
- func (c *Client) UpdateDagRunTaskStatus(ctx context.Context, dagId, execTs, taskId, status string) error
- func (c *Client) UpsertDag(ctx context.Context, d dag.Dag) error
- type DB
- type Dag
- type DagRun
- type DagRunTask
- type DagTask
- type PostgresDB
- func (s *PostgresDB) Begin() (*sql.Tx, error)
- func (s *PostgresDB) Close() error
- func (s *PostgresDB) DataSource() string
- func (s *PostgresDB) Exec(query string, args ...any) (sql.Result, error)
- func (s *PostgresDB) ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error)
- func (s *PostgresDB) Query(query string, args ...any) (*sql.Rows, error)
- func (s *PostgresDB) QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error)
- func (s *PostgresDB) QueryRow(query string, args ...any) *sql.Row
- func (s *PostgresDB) QueryRowContext(ctx context.Context, query string, args ...any) *sql.Row
- type SqliteDB
- func (s *SqliteDB) Begin() (*sql.Tx, error)
- func (s *SqliteDB) Close() error
- func (s *SqliteDB) DataSource() string
- func (s *SqliteDB) Exec(query string, args ...any) (sql.Result, error)
- func (s *SqliteDB) ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error)
- func (s *SqliteDB) Query(query string, args ...any) (*sql.Rows, error)
- func (s *SqliteDB) QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error)
- func (s *SqliteDB) QueryRow(query string, args ...any) *sql.Row
- func (s *SqliteDB) QueryRowContext(ctx context.Context, query string, args ...any) *sql.Row
- type SqliteDBInMemory
- func (s *SqliteDBInMemory) Begin() (*sql.Tx, error)
- func (s *SqliteDBInMemory) Close() error
- func (s *SqliteDBInMemory) DataSource() string
- func (s *SqliteDBInMemory) Exec(query string, args ...any) (sql.Result, error)
- func (s *SqliteDBInMemory) ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error)
- func (s *SqliteDBInMemory) Query(query string, args ...any) (*sql.Rows, error)
- func (s *SqliteDBInMemory) QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error)
- func (s *SqliteDBInMemory) QueryRow(query string, args ...any) *sql.Row
- func (s *SqliteDBInMemory) QueryRowContext(ctx context.Context, query string, args ...any) *sql.Row
- type TaskLogRecord
Constants ¶
const (
DagRunTaskStatusScheduled = "SCHEDULED"
)
Variables ¶
var TableNames []string = []string{
"dags",
"dagtasks",
"dagruns",
"dagruntasks",
}
TableNames is a list of ppacer database table names.
Functions ¶
func CleanUpSqliteTmp ¶
CleanUpSqliteTmp deletes the SQLite database source file if all tests in the scope passed. If at least one test failed, the database will not be deleted, to enable further debugging. Even though this function takes a generic *Client, it's mainly meant for SQLite-based database clients which are used in testing.
func SchemaStatements ¶
SchemaStatements returns a list of SQL statements that set up a new instance of the scheduler's internal database. It can differ a little bit between SQL databases, so the exact list of statements is prepared based on the given database driver name. If the given database driver is not supported, then a non-nil error is returned.
func SchemaStatementsForLogs ¶
Like SchemaStatements but for database for logs.
Types ¶
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client represents the main database client.
func NewInMemoryClient ¶
Produces new Client using in-memory SQLite database with schema created based on given script.
func NewPostgresClient ¶
Produces new Client based on given connection to PostgreSQL database. If given database doesn't contain required ppacer schema, it will be created on the client initialization.
func NewPostgresClientForLogs ¶
Produces new Client for logs based on given connection to PostgreSQL database. If given database doesn't contain required logs schema, it will be created on the client initialization.
func NewSqliteClient ¶
Produces new Client based on given connection string to SQLite database. If database file does not exist in given location, then empty SQLite database with setup schema will be created.
func NewSqliteClientForLogs ¶
Produces new Client for logs based on given connection string to SQLite database. If database file does not exist in given location, then empty SQLite database with setup schema will be created.
func NewSqliteInMemoryClient ¶
Produces new Client based on SQLite in-memory database. It's not persisted anywhere else. Useful usually for tests and small examples.
func NewSqliteTmpClient ¶
Produces new Client using SQLite database created as temp file. It's mainly for testing and ad-hocs.
func NewSqliteTmpClientForLogs ¶
Produces new Client for logs using SQLite database created as temp file. It's mainly for testing and ad-hocs.
func (*Client) Count ¶
Count returns the count of rows for the given table. In case of errors, -1 is returned and the error is logged.
func (*Client) CountWhere ¶
CountWhere returns the count of rows for the given table filtered by the given where condition. In case of errors, -1 is returned and the error is logged.
func (*Client) DagRunAlreadyScheduled ¶
DagRunAlreadyScheduled checks whether a dagrun already exists for the given DAG ID and schedule timestamp.
func (*Client) InsertDagRun ¶
InsertDagRun inserts new row into dagruns table for given DagId and execution timestamp. Initial status is set to DagRunStatusScheduled. RunId for just inserted dag run is returned or -1 in case when error is not nil.
func (*Client) InsertDagRunTask ¶
Inserts new DagRunTask with default status SCHEDULED.
func (*Client) InsertDagTasks ¶
InsertDagTasks inserts the tasks of the given DAG into the dagtasks table and sets them as the current version. Previous versions would still be in the dagtasks table but with IsCurrent=0. In case inserting any of the DAG's tasks fails, the whole insertion would be rolled back (in terms of SQL transactions).
func (*Client) InsertTaskLog ¶
func (c *Client) InsertTaskLog(tlr TaskLogRecord) error
InsertTaskLog inserts single log record into tasklogs table.
func (*Client) ReadDagRunLogs ¶
ReadDagRunLogs reads all task logs for given DAG run in chronological order.
func (*Client) ReadDagRunTask ¶
func (c *Client) ReadDagRunTask(ctx context.Context, dagId, execTs, taskId string) (DagRunTask, error)
ReadDagRunTask reads information about given taskId in given dag run.
func (*Client) ReadDagRunTaskLogs ¶
func (c *Client) ReadDagRunTaskLogs(ctx context.Context, dagId, execTs, taskId string) ([]TaskLogRecord, error)
ReadDagRunTaskLogs reads all logs for given DAG run task in chronological order.
func (*Client) ReadDagRunTasks ¶
Reads DAG run tasks information from dagruntasks table for given DAG run.
func (*Client) ReadDagRunTasksNotFinished ¶
func (c *Client) ReadDagRunTasksNotFinished(ctx context.Context) ([]DagRunTask, error)
ReadDagRunTasksNotFinished reads tasks from dagruntasks table which are not in terminal state (success or failed). Tasks are sorted from oldest to newest based on execTs and insertTs.
func (*Client) ReadDagRuns ¶
ReadDagRuns reads topN latest dag runs for given DAG ID.
func (*Client) ReadDagRunsNotFinished ¶
Reads dag runs from the dagruns table which are not in terminal states.
func (*Client) ReadDagTask ¶
ReadDagTask reads single row (current version) from dagtasks table for given DAG ID and task ID.
func (*Client) ReadDagTasks ¶
ReadDagTasks reads all tasks for given dagId in the current version from dagtasks table.
func (*Client) ReadLatestDagRuns ¶
ReadLatestDagRuns reads latest dag run for each Dag. Returns map from DagId to DagRun.
func (*Client) RunningTasksNum ¶
RunningTasksNum returns number of currently running tasks. That means rows in dagruntasks table with status 'RUNNING'.
func (*Client) UpdateDagRunStatus ¶
Updates dagrun status for given runId.
func (*Client) UpdateDagRunStatusByExecTs ¶
func (c *Client) UpdateDagRunStatusByExecTs( ctx context.Context, dagId, execTs, status string, ) error
Updates dagrun status for the given dagId and execTs (when runId is not available). The pair (dagId, execTs) is unique in the dagruns table.
func (*Client) UpdateDagRunTaskStatus ¶
func (c *Client) UpdateDagRunTaskStatus(ctx context.Context, dagId, execTs, taskId, status string) error
Updates dagruntask status for given dag run task.
type DB ¶
type DB interface { Begin() (*sql.Tx, error) Exec(query string, args ...any) (sql.Result, error) ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error) Close() error DataSource() string Query(query string, args ...any) (*sql.Rows, error) QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error) QueryRow(query string, args ...any) *sql.Row QueryRowContext(ctx context.Context, query string, args ...any) *sql.Row }
DB defines a set of operations required from a database. Most of the methods are identical to those of the standard `*sql.DB` type.
type Dag ¶
type DagRunTask ¶
type DagTask ¶
type DagTask struct { DagId string TaskId string IsCurrent bool InsertTs string Version string TaskTypeName string TaskBodyHash string TaskBodySource string }
DagTask represents single row in dagtasks table in the database.
type PostgresDB ¶
type PostgresDB struct {
// contains filtered or unexported fields
}
PostgresDB represents Client for PostgreSQL database.
func (*PostgresDB) Close ¶
func (s *PostgresDB) Close() error
func (*PostgresDB) DataSource ¶
func (s *PostgresDB) DataSource() string
func (*PostgresDB) ExecContext ¶
func (*PostgresDB) QueryContext ¶
func (*PostgresDB) QueryRowContext ¶
type SqliteDB ¶
func (*SqliteDB) DataSource ¶
func (*SqliteDB) ExecContext ¶
func (*SqliteDB) QueryContext ¶
type SqliteDBInMemory ¶
SQLite database where data is stored in the memory rather than in a file on a disk. It needs additional level of isolation for concurrent access.
func (*SqliteDBInMemory) Close ¶
func (s *SqliteDBInMemory) Close() error
func (*SqliteDBInMemory) DataSource ¶
func (s *SqliteDBInMemory) DataSource() string
func (*SqliteDBInMemory) ExecContext ¶
func (*SqliteDBInMemory) QueryContext ¶
func (*SqliteDBInMemory) QueryRow ¶
func (s *SqliteDBInMemory) QueryRow(query string, args ...any) *sql.Row