jobsdb

package
v0.0.0-...-0ade494 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 26, 2022 License: AGPL-3.0 Imports: 33 Imported by: 0

Documentation

Index

Constants

View Source
const (
	ExportOp          = "export"
	ImportOp          = "import"
	AcceptNewEventsOp = "acceptNewEvents"
)

ENUM Values for MigrationOp

View Source
const (
	SetupForExport = "setup_for_export"
	Exported       = "exported"
	Notified       = "notified"
	Completed      = "completed"

	SetupToAcceptNewEvents = "setup_to_accept_new_events"
	SetupForImport         = "setup_for_import"
	PreparedForImport      = "prepared_for_import"
	Imported               = "imported"
)

ENUM Values for Status

View Source
const (
	MigrationCheckpointSuffix = "migration_checkpoints"
	UniqueConstraintSuffix    = "unique_checkpoint"
)

MigrationCheckpointSuffix : Suffix for checkpoints table

View Source
const (
	RawDataDestUploadOperation = "S3_DEST_UPLOAD"
)

We keep a journal of all the operations. The journal helps recover from a crash that happens while an operation is in progress.

Variables

View Source
var (
	// Not valid, Not terminal
	NotProcessed = jobStateT{State: "not_picked_yet", /* contains filtered or unexported fields */}

	// Valid, Not terminal
	Failed       = jobStateT{State: "failed", /* contains filtered or unexported fields */}
	Executing    = jobStateT{State: "executing", /* contains filtered or unexported fields */}
	Waiting      = jobStateT{State: "waiting", /* contains filtered or unexported fields */}
	WaitingRetry = jobStateT{State: "waiting_retry", /* contains filtered or unexported fields */}
	Migrating    = jobStateT{State: "migrating", /* contains filtered or unexported fields */}
	Importing    = jobStateT{State: "importing", /* contains filtered or unexported fields */}

	// Valid, Terminal
	Succeeded   = jobStateT{State: "succeeded", /* contains filtered or unexported fields */}
	Aborted     = jobStateT{State: "aborted", /* contains filtered or unexported fields */}
	Migrated    = jobStateT{State: "migrated", /* contains filtered or unexported fields */}
	WontMigrate = jobStateT{State: "wont_migrate", /* contains filtered or unexported fields */}
)

State definitions

Functions

func GetConnectionString

func GetConnectionString() string

GetConnectionString Returns Jobs DB connection configuration

func Init

func Init()

func Init2

func Init2()

func Init3

func Init3()

func IsMasterBackupEnabled

func IsMasterBackupEnabled() bool

Types

type DSPair

type DSPair struct {
	JobTableName       string
	JobStatusTableName string
}

type ErrorCodeCountStats

type ErrorCodeCountStats struct {
	ErrorCodeCounts []ErrorCodeCountsByDestination
}

type ErrorCodeCountsByDestination

type ErrorCodeCountsByDestination struct {
	Count         int
	ErrorCode     string
	Destination   string
	DestinationID string
}

type EventStatusDetailed

type EventStatusDetailed struct {
	Status        string
	SourceID      string
	DestinationID string
	CustomVal     string
	Count         int
}

type EventStatusStats

type EventStatusStats struct {
	StatsNums []EventStatusDetailed
	DSList    string
}

type FailedJobs

type FailedJobs struct {
	JobID         int
	UserID        string
	CustomVal     string
	ExecTime      time.Time
	ErrorCode     string
	ErrorResponse string
}

type FailedJobsStats

type FailedJobsStats struct {
	FailedNums []FailedJobs
}

type FailedStatusStats

type FailedStatusStats struct {
	FailedStatusStats []JobStatusT
}

type GetQueryParamsT

type GetQueryParamsT struct {

	// if IgnoreCustomValFiltersInQuery is true, CustomValFilters is not going to be used
	IgnoreCustomValFiltersInQuery bool
	CustomValFilters              []string
	ParameterFilters              []ParameterFilterT
	StateFilters                  []string

	// Limit the total number of jobs.
	// A value less than or equal to zero will return no results
	JobsLimit int
	// Limit the total number of events, 1 job contains 1+ event(s).
	// A value less than or equal to zero will disable this limit (no limit),
	// only values greater than zero are considered as valid limits.
	EventsLimit int
	// Limit the total job payload size
	// A value less than or equal to zero will disable this limit (no limit),
	// only values greater than zero are considered as valid limits.
	PayloadSizeLimit int64
}

GetQueryParamsT is a struct to hold jobsdb query params.

type HandleInspector

type HandleInspector struct {
	*HandleT
}

HandleInspector is only intended to be used by tests for verifying the handle's internal state

func (*HandleInspector) DSListSize

func (h *HandleInspector) DSListSize() int

DSListSize returns the current size of the handle's dsList

type HandleT

type HandleT struct {
	MinDSRetentionPeriod time.Duration
	MaxDSRetentionPeriod time.Duration

	BackupSettings *backupSettings

	MaxDSSize *int

	// TriggerAddNewDS, TriggerMigrateDS is useful for triggering addNewDS to run from tests.
	// TODO: Ideally we should refactor the code to not use this override.
	TriggerAddNewDS  func() <-chan time.Time
	TriggerMigrateDS func() <-chan time.Time
	// contains filtered or unexported fields
}

HandleT is the main type implementing the jobs database. The caller must call the Setup function on a HandleT object.

func NewForRead

func NewForRead(tablePrefix string, opts ...OptsFunc) *HandleT

func NewForReadWrite

func NewForReadWrite(tablePrefix string, opts ...OptsFunc) *HandleT

func NewForWrite

func NewForWrite(tablePrefix string, opts ...OptsFunc) *HandleT

func (*HandleT) Checkpoint

func (jd *HandleT) Checkpoint(migrationCheckpoint MigrationCheckpointT) int64

Checkpoint writes a migration event if id is passed as 0. Else it will update status and start_sequence

func (*HandleT) CheckpointInTxn

func (jd *HandleT) CheckpointInTxn(txHandler transactionHandler, migrationCheckpoint MigrationCheckpointT) (int64, error)

CheckpointInTxn writes a migration event if id is passed as 0. Else it will update status and start_sequence. If txn is passed, it will run the statement in that txn, otherwise it will execute without a transaction.

func (*HandleT) Close

func (jd *HandleT) Close()

Close closes the database connection.

Stop should be called before Close.

func (*HandleT) DeleteExecuting

func (jd *HandleT) DeleteExecuting()

DeleteExecuting deletes events whose latest job state is executing. This is only done during recovery, which happens during the server start.

func (*HandleT) GetCheckpoints

func (jd *HandleT) GetCheckpoints(migrationType MigrationOp, status string) []MigrationCheckpointT

GetCheckpoints gets all checkpoints for the given migrationType and status.

func (*HandleT) GetExecuting

func (jd *HandleT) GetExecuting(params GetQueryParamsT) JobsResult

func (*HandleT) GetImporting

func (jd *HandleT) GetImporting(params GetQueryParamsT) JobsResult

func (*HandleT) GetJournalEntries

func (jd *HandleT) GetJournalEntries(opType string) (entries []JournalEntryT)

func (*HandleT) GetLastJob

func (jd *HandleT) GetLastJob() *JobT

func (*HandleT) GetLastJobID

func (jd *HandleT) GetLastJobID() int64

func (*HandleT) GetLastJobIDBeforeImport

func (jd *HandleT) GetLastJobIDBeforeImport() int64

GetLastJobIDBeforeImport should return the largest job id stored so far

func (*HandleT) GetMaxDSIndex

func (jd *HandleT) GetMaxDSIndex() (maxDSIndex int64)

GetMaxDSIndex returns the max dataset index in the DB.

func (*HandleT) GetMaxIDForDs

func (jd *HandleT) GetMaxIDForDs(ds dataSetT) int64

func (*HandleT) GetNonMigratedAndMarkMigrating

func (jd *HandleT) GetNonMigratedAndMarkMigrating(count int) []*JobT

GetNonMigratedAndMarkMigrating gets all jobs with no filters.

func (*HandleT) GetPileUpCounts

func (jd *HandleT) GetPileUpCounts(statMap map[string]map[string]int)

func (*HandleT) GetProcessed

func (jd *HandleT) GetProcessed(params GetQueryParamsT) JobsResult

GetProcessed returns events of a given state. This does not update any state itself and relies on the caller to update it. That means that successive calls to GetProcessed("failed") can return the same set of events. It is the responsibility of the caller to call it from one thread, update the state (to "waiting") in the same thread and pass on to the processors.

func (*HandleT) GetSetupCheckpoint

func (jd *HandleT) GetSetupCheckpoint(migrationType MigrationOp) (MigrationCheckpointT, bool)

GetSetupCheckpoint gets all checkpoints and picks out the setup event for that type

func (*HandleT) GetTablePrefix

func (jd *HandleT) GetTablePrefix() string

GetTablePrefix returns the table prefix of the jobsdb.

func (*HandleT) GetToRetry

func (jd *HandleT) GetToRetry(params GetQueryParamsT) JobsResult

GetToRetry returns events which need to be retried. If enableReaderQueue is true, this goes through worker pool, else calls getUnprocessed directly.

func (*HandleT) GetUnprocessed

func (jd *HandleT) GetUnprocessed(params GetQueryParamsT) JobsResult

GetUnprocessed returns the unprocessed events. Unprocessed events are those whose state hasn't been marked in the DB. If enableReaderQueue is true, this goes through worker pool, else calls getUnprocessed directly.

func (*HandleT) GetUserID

func (*HandleT) GetUserID(job *JobT) string

GetUserID from job

func (*HandleT) GetWaiting

func (jd *HandleT) GetWaiting(params GetQueryParamsT) JobsResult

GetWaiting returns events which are under processing. If enableReaderQueue is true, this goes through worker pool, else calls getUnprocessed directly.

func (*HandleT) Identifier

func (jd *HandleT) Identifier() string

Identifier returns the identifier of the jobsdb. Here it is tablePrefix.

func (*HandleT) IsMigrating

func (jd *HandleT) IsMigrating() bool

IsMigrating returns true if there are non zero jobs with status = 'migrating'

func (*HandleT) JournalDeleteEntry

func (jd *HandleT) JournalDeleteEntry(opID int64)

func (*HandleT) JournalMarkDone

func (jd *HandleT) JournalMarkDone(opID int64)

JournalMarkDone marks the end of a journal action

func (*HandleT) JournalMarkStart

func (jd *HandleT) JournalMarkStart(opType string, opPayload json.RawMessage) int64

func (*HandleT) JournalMarkStartInTx

func (jd *HandleT) JournalMarkStartInTx(tx *sql.Tx, opType string, opPayload json.RawMessage) (int64, error)

func (*HandleT) Ping

func (jd *HandleT) Ping() error

Ping returns health check for pg database

func (*HandleT) PostExportCleanup

func (jd *HandleT) PostExportCleanup()

PostExportCleanup removes all the entries from job_status_tables that are of state 'wont_migrate' or 'migrating'

func (*HandleT) PreExportCleanup

func (jd *HandleT) PreExportCleanup()

PreExportCleanup removes all the entries from job_status_tables that are of state 'migrating'

func (*HandleT) RecoverFromMigrationJournal

func (jd *HandleT) RecoverFromMigrationJournal()

RecoverFromMigrationJournal is an exposed function for migrator package to handle journal crashes during migration

func (*HandleT) SchemaMigrationTable

func (jd *HandleT) SchemaMigrationTable() string

SchemaMigrationTable returns the table name used for storing current schema version.

func (*HandleT) Setup

func (jd *HandleT) Setup(
	ownerType OwnerType, clearAll bool, tablePrefix, migrationMode string,
	registerStatusHandler bool, queryFilterKeys QueryFiltersT, preBackupHandlers []prebackup.Handler,
) error

Setup is used to initialize the HandleT structure. clearAll = true means it will remove all existing tables. tablePrefix must be unique and is used to separate multiple users of JobsDB.

func (*HandleT) SetupCheckpointTable

func (jd *HandleT) SetupCheckpointTable()

SetupCheckpointTable creates a table

func (*HandleT) SetupForExport

func (jd *HandleT) SetupForExport()

SetupForExport is used to setup jobsdb for export or for import or for both

func (*HandleT) SetupForImport

func (jd *HandleT) SetupForImport()

SetupForImport is used to setup jobsdb for export or for import or for both

func (*HandleT) SetupForMigration

func (jd *HandleT) SetupForMigration(fromVersion, toVersion int)

SetupForMigration prepares jobsdb to start migrations

func (*HandleT) Start

func (jd *HandleT) Start() error

Start starts the jobsdb worker and housekeeping (migration, archive) threads. Start should be called before any other jobsdb methods are called.

func (*HandleT) Status

func (jd *HandleT) Status() interface{}

func (*HandleT) Stop

func (jd *HandleT) Stop()

Stop stops the background goroutines and waits until they finish. Stop should be called once only after Start. Only Start and Close can be called after Stop.

func (*HandleT) Store

func (jd *HandleT) Store(ctx context.Context, jobList []*JobT) error

Store stores new jobs to the jobsdb. If enableWriterQueue is true, this goes through writer worker pool.

func (*HandleT) StoreInTx

func (jd *HandleT) StoreInTx(ctx context.Context, tx StoreSafeTx, jobList []*JobT) error

StoreInTx stores new jobs to the jobsdb. If enableWriterQueue is true, this goes through writer worker pool.

func (*HandleT) StoreJobsAndCheckpoint

func (jd *HandleT) StoreJobsAndCheckpoint(jobList []*JobT, migrationCheckpoint MigrationCheckpointT)

StoreJobsAndCheckpoint is used to write the jobs to _tables

func (*HandleT) StoreWithRetryEach

func (jd *HandleT) StoreWithRetryEach(ctx context.Context, jobList []*JobT) map[uuid.UUID]string

func (*HandleT) StoreWithRetryEachInTx

func (jd *HandleT) StoreWithRetryEachInTx(ctx context.Context, tx StoreSafeTx, jobList []*JobT) map[uuid.UUID]string

func (*HandleT) TearDown

func (jd *HandleT) TearDown()

TearDown stops the background goroutines, waits until they finish and closes the database.

func (*HandleT) UpdateJobStatus

func (jd *HandleT) UpdateJobStatus(ctx context.Context, statusList []*JobStatusT, customValFilters []string, parameterFilters []ParameterFilterT) error

func (*HandleT) UpdateJobStatusAndCheckpoint

func (jd *HandleT) UpdateJobStatusAndCheckpoint(statusList []*JobStatusT, fromNodeID, toNodeID string, jobsCount int64, uploadLocation string)

UpdateJobStatusAndCheckpoint does update job status and checkpoint in a single transaction

func (*HandleT) UpdateJobStatusInTx

func (jd *HandleT) UpdateJobStatusInTx(ctx context.Context, tx UpdateSafeTx, statusList []*JobStatusT, customValFilters []string, parameterFilters []ParameterFilterT) error

UpdateJobStatusInTx updates the status of a batch of jobs in the passed transaction. customValFilters[] is passed so we can efficiently mark empty cache. Later we can move this to query. IMP NOTE: AcquireUpdateJobStatusLocks should be called before calling this function.

func (*HandleT) UpdateSequenceNumberOfLatestDS

func (jd *HandleT) UpdateSequenceNumberOfLatestDS(seqNoForNewDS int64)

func (*HandleT) WithStoreSafeTx

func (jd *HandleT) WithStoreSafeTx(f func(tx StoreSafeTx) error) error

func (*HandleT) WithTx

func (jd *HandleT) WithTx(f func(tx *sql.Tx) error) error

func (*HandleT) WithUpdateSafeTx

func (jd *HandleT) WithUpdateSafeTx(f func(tx UpdateSafeTx) error) error

type JobStatusT

type JobStatusT struct {
	JobID         int64           `json:"JobID"`
	JobState      string          `json:"JobState"` // ENUM waiting, executing, succeeded, waiting_retry,  failed, aborted, migrating, migrated, wont_migrate
	AttemptNum    int             `json:"AttemptNum"`
	ExecTime      time.Time       `json:"ExecTime"`
	RetryTime     time.Time       `json:"RetryTime"`
	ErrorCode     string          `json:"ErrorCode"`
	ErrorResponse json.RawMessage `json:"ErrorResponse"`
	Parameters    json.RawMessage `json:"Parameters"`
	WorkspaceId   string          `json:"WorkspaceId"`
}

JobStatusT is used for storing status of the job. It is the responsibility of the user of this module to set appropriate job status. State can be one of ENUM waiting, executing, succeeded, waiting_retry, failed, aborted, migrating, migrated, wont_migrate.

func BuildStatus

func BuildStatus(job *JobT, jobState string) *JobStatusT

BuildStatus generates a struct of type JobStatusT for a given job and jobState

type JobT

type JobT struct {
	UUID          uuid.UUID       `json:"UUID"`
	JobID         int64           `json:"JobID"`
	UserID        string          `json:"UserID"`
	CreatedAt     time.Time       `json:"CreatedAt"`
	ExpireAt      time.Time       `json:"ExpireAt"`
	CustomVal     string          `json:"CustomVal"`
	EventCount    int             `json:"EventCount"`
	EventPayload  json.RawMessage `json:"EventPayload"`
	PayloadSize   int64           `json:"PayloadSize"`
	LastJobStatus JobStatusT      `json:"LastJobStatus"`
	Parameters    json.RawMessage `json:"Parameters"`
	WorkspaceId   string          `json:"WorkspaceId"`
}

JobT is the basic type for creating jobs. The JobID is generated by the system and LastJobStatus is populated when reading a processed job while rest should be set by the user.

func (*JobT) String

func (job *JobT) String() string

type JobsDB

type JobsDB interface {
	// Identifier returns the jobsdb's identifier, a.k.a. table prefix
	Identifier() string

	// WithTx begins a new transaction that can be used by the provided function.
	// If the function returns an error, the transaction will be rolled back and the error returned,
	// otherwise the transaction will be committed and a nil error will be returned.
	WithTx(func(tx *sql.Tx) error) error

	// WithStoreSafeTx prepares a store-safe environment and then starts a transaction
	// that can be used by the provided function.
	WithStoreSafeTx(func(tx StoreSafeTx) error) error

	// Store stores the provided jobs to the database
	Store(ctx context.Context, jobList []*JobT) error

	// StoreInTx stores the provided jobs to the database using an existing transaction.
	// Please ensure that you are using a StoreSafeTx, e.g.
	//    jobsdb.WithStoreSafeTx(func(tx StoreSafeTx) error {
	//	      jobsdb.StoreInTx(ctx, tx, jobList)
	//    })
	StoreInTx(ctx context.Context, tx StoreSafeTx, jobList []*JobT) error

	// StoreWithRetryEach tries to store all the provided jobs to the database and returns the job uuids which failed
	StoreWithRetryEach(ctx context.Context, jobList []*JobT) map[uuid.UUID]string

	// StoreWithRetryEachInTx tries to store all the provided jobs to the database and returns the job uuids which failed, using an existing transaction.
	// Please ensure that you are using a StoreSafeTx, e.g.
	//    jobsdb.WithStoreSafeTx(func(tx StoreSafeTx) error {
	//	      jobsdb.StoreWithRetryEachInTx(ctx, tx, jobList)
	//    })
	StoreWithRetryEachInTx(ctx context.Context, tx StoreSafeTx, jobList []*JobT) map[uuid.UUID]string

	// WithUpdateSafeTx prepares an update-safe environment and then starts a transaction
	// that can be used by the provided function. An update-safe transaction shall be used if the provided function
	// needs to call UpdateJobStatusInTx.
	WithUpdateSafeTx(func(tx UpdateSafeTx) error) error

	// UpdateJobStatus updates the provided job statuses
	UpdateJobStatus(ctx context.Context, statusList []*JobStatusT, customValFilters []string, parameterFilters []ParameterFilterT) error

	// UpdateJobStatusInTx updates the provided job statuses in an existing transaction.
	// Please ensure that you are using an UpdateSafeTx, e.g.
	//    jobsdb.WithUpdateSafeTx(func(tx UpdateSafeTx) error {
	//	      jobsdb.UpdateJobStatusInTx(ctx, tx, statusList, customValFilters, parameterFilters)
	//    })
	UpdateJobStatusInTx(ctx context.Context, tx UpdateSafeTx, statusList []*JobStatusT, customValFilters []string, parameterFilters []ParameterFilterT) error

	// GetUnprocessed finds unprocessed jobs. Unprocessed are new
	// jobs whose state hasn't been marked in the database yet
	GetUnprocessed(params GetQueryParamsT) JobsResult

	// GetProcessed finds jobs in some state, i.e. not unprocessed
	GetProcessed(params GetQueryParamsT) JobsResult

	// GetToRetry finds jobs in failed state
	GetToRetry(params GetQueryParamsT) JobsResult

	// GetWaiting finds jobs in waiting state
	GetWaiting(params GetQueryParamsT) JobsResult

	// GetExecuting finds jobs in executing state
	GetExecuting(params GetQueryParamsT) JobsResult

	// GetImporting finds jobs in importing state
	GetImporting(params GetQueryParamsT) JobsResult

	// GetPileUpCounts returns statistics (counters) of incomplete jobs
	// grouped by workspaceId and destination type
	GetPileUpCounts(statMap map[string]map[string]int)

	Status() interface{}
	Ping() error
	DeleteExecuting()

	GetJournalEntries(opType string) (entries []JournalEntryT)
	JournalDeleteEntry(opID int64)
	JournalMarkStart(opType string, opPayload json.RawMessage) int64
}

JobsDB interface contains public methods to access JobsDB data

type JobsResult

type JobsResult struct {
	Jobs          []*JobT
	LimitsReached bool
	EventsCount   int
	PayloadSize   int64
}

type JobsdbUtilsHandler

type JobsdbUtilsHandler struct{}

Admin Handlers

func (*JobsdbUtilsHandler) RunSQLQuery

func (*JobsdbUtilsHandler) RunSQLQuery(argString string, reply *string) (err error)

type JournalEntryT

type JournalEntryT struct {
	OpID      int64
	OpType    string
	OpDone    bool
	OpPayload json.RawMessage
}

type MigrationCheckpointT

type MigrationCheckpointT struct {
	ID            int64           `json:"ID"`
	MigrationType MigrationOp     `json:"MigrationType"` // ENUM : export, import, acceptNewEvents
	FromNode      string          `json:"FromNode"`
	ToNode        string          `json:"ToNode"`
	JobsCount     int64           `json:"JobsCount"`
	FileLocation  string          `json:"FileLocation"`
	Status        string          `json:"Status"` // ENUM : Look up 'Values for Status'
	StartSeq      int64           `json:"StartSeq"`
	Payload       json.RawMessage `json:"Payload"`
	TimeStamp     time.Time       `json:"TimeStamp"`
}

MigrationCheckpointT captures an export/import event, so that it can be recovered from in case of a crash during migration.

func NewMigrationCheckpoint

func NewMigrationCheckpoint(migrationType MigrationOp, fromNode, toNode string, jobsCount int64, fileLocation, status string, startSeq int64) MigrationCheckpointT

NewMigrationCheckpoint is a constructor for MigrationCheckpoint struct

func NewSetupCheckpointEvent

func NewSetupCheckpointEvent(migrationType MigrationOp, node string) MigrationCheckpointT

NewSetupCheckpointEvent returns a new migration event that captures setup for export, import or new event acceptance.

type MigrationOp

type MigrationOp string

MigrationOp is a custom type for supported types in migrationCheckpoint

type MultiTenantHandleT

type MultiTenantHandleT struct {
	*HandleT
}

func (*MultiTenantHandleT) GetAllJobs

func (mj *MultiTenantHandleT) GetAllJobs(workspaceCount map[string]int, params GetQueryParamsT, maxDSQuerySize int) []*JobT

type MultiTenantJobsDB

type MultiTenantJobsDB interface {
	GetAllJobs(map[string]int, GetQueryParamsT, int) []*JobT

	WithUpdateSafeTx(func(tx UpdateSafeTx) error) error
	UpdateJobStatusInTx(ctx context.Context, tx UpdateSafeTx, statusList []*JobStatusT, customValFilters []string, parameterFilters []ParameterFilterT) error
	UpdateJobStatus(ctx context.Context, statusList []*JobStatusT, customValFilters []string, parameterFilters []ParameterFilterT) error

	DeleteExecuting()

	GetJournalEntries(opType string) (entries []JournalEntryT)
	JournalMarkStart(opType string, opPayload json.RawMessage) int64
	JournalDeleteEntry(opID int64)
	GetPileUpCounts(map[string]map[string]int)
}

type MultiTenantLegacy

type MultiTenantLegacy struct {
	*HandleT
}

func (*MultiTenantLegacy) GetAllJobs

func (mj *MultiTenantLegacy) GetAllJobs(workspaceCount map[string]int, params GetQueryParamsT, _ int) []*JobT

type OptsFunc

type OptsFunc func(jd *HandleT)

func WithClearDB

func WithClearDB(clearDB bool) OptsFunc

WithClearDB, if set to true it will remove all existing tables

func WithMigrationMode

func WithMigrationMode(mode string) OptsFunc

func WithPreBackupHandlers

func WithPreBackupHandlers(preBackupHandlers []prebackup.Handler) OptsFunc

WithPreBackupHandlers, sets pre-backup handlers

func WithQueryFilterKeys

func WithQueryFilterKeys(filters QueryFiltersT) OptsFunc

func WithStatusHandler

func WithStatusHandler() OptsFunc

type OwnerType

type OwnerType string

OwnerType for this jobsdb instance

const (
	// Read : Only Reader of this jobsdb instance
	Read OwnerType = "READ"
	// Write : Only Writer of this jobsdb instance
	Write OwnerType = "WRITE"
	// ReadWrite : Reader and Writer of this jobsdb instance
	ReadWrite OwnerType = ""
)

type ParameterFilterT

type ParameterFilterT struct {
	Name     string
	Value    string
	Optional bool
}

type QueryConditions

type QueryConditions struct {
	// if IgnoreCustomValFiltersInQuery is true, CustomValFilters is not going to be used
	IgnoreCustomValFiltersInQuery bool
	CustomValFilters              []string
	ParameterFilters              []ParameterFilterT
	StateFilters                  []string
}

QueryConditions holds jobsdb query conditions

type QueryFiltersT

type QueryFiltersT struct {
	CustomVal        bool
	ParameterFilters []string
}

type ReadonlyHandleT

type ReadonlyHandleT struct {
	DbHandle *sql.DB
	// contains filtered or unexported fields
}

func (*ReadonlyHandleT) GetDSListString

func (jd *ReadonlyHandleT) GetDSListString() (string, error)

func (*ReadonlyHandleT) GetFailedStatusErrorCodeCountsByDestination

func (jd *ReadonlyHandleT) GetFailedStatusErrorCodeCountsByDestination(args []string) (string, error)

func (*ReadonlyHandleT) GetJobByID

func (jd *ReadonlyHandleT) GetJobByID(job_id, prefix string) (string, error)

func (*ReadonlyHandleT) GetJobIDStatus

func (jd *ReadonlyHandleT) GetJobIDStatus(job_id, prefix string) (string, error)

func (*ReadonlyHandleT) GetJobIDsForUser

func (jd *ReadonlyHandleT) GetJobIDsForUser(args []string) (string, error)

func (*ReadonlyHandleT) GetJobSummaryCount

func (jd *ReadonlyHandleT) GetJobSummaryCount(arg, prefix string) (string, error)

func (*ReadonlyHandleT) GetLatestFailedJobs

func (jd *ReadonlyHandleT) GetLatestFailedJobs(arg, prefix string) (string, error)

func (*ReadonlyHandleT) HavePendingJobs

func (jd *ReadonlyHandleT) HavePendingJobs(ctx context.Context, customValFilters []string, count int, parameterFilters []ParameterFilterT) (bool, error)

Count queries

HavePendingJobs returns true if there are pending events, else false. Pending events are those whose jobs don't have a state or whose job status is neither succeeded nor aborted.

func (*ReadonlyHandleT) Setup

func (jd *ReadonlyHandleT) Setup(tablePrefix string)

Setup is used to initialize the ReadonlyHandleT structure.

func (*ReadonlyHandleT) TearDown

func (jd *ReadonlyHandleT) TearDown()

TearDown releases all the resources

type ReadonlyJobsDB

type ReadonlyJobsDB interface {
	HavePendingJobs(ctx context.Context, customValFilters []string, count int, parameterFilters []ParameterFilterT) (bool, error)
	GetJobSummaryCount(arg, prefix string) (string, error)
	GetLatestFailedJobs(arg, prefix string) (string, error)
	GetJobIDsForUser(args []string) (string, error)
	GetFailedStatusErrorCodeCountsByDestination(args []string) (string, error)
	GetDSListString() (string, error)
	GetJobIDStatus(job_id, prefix string) (string, error)
	GetJobByID(job_id, prefix string) (string, error)
}

ReadonlyJobsDB interface contains public methods to access JobsDB data

type SQLJobStatusT

type SQLJobStatusT struct {
	JobID         sql.NullInt64
	JobState      sql.NullString // ENUM waiting, executing, succeeded, waiting_retry,  failed, aborted, migrated
	AttemptNum    sql.NullInt64
	ExecTime      sql.NullTime
	RetryTime     sql.NullTime
	ErrorCode     sql.NullString
	ErrorResponse sql.NullString
}

SQLJobStatusT is a temporary struct to handle nulls from postgres query

type StoreSafeTx

type StoreSafeTx interface {
	Tx() *sql.Tx
	// contains filtered or unexported methods
}

StoreSafeTx sealed interface

func EmptyStoreSafeTx

func EmptyStoreSafeTx() StoreSafeTx

EmptyStoreSafeTx returns an empty interface usable only for tests

type UpdateSafeTx

type UpdateSafeTx interface {
	Tx() *sql.Tx
	// contains filtered or unexported methods
}

UpdateSafeTx sealed interface

func EmptyUpdateSafeTx

func EmptyUpdateSafeTx() UpdateSafeTx

EmptyUpdateSafeTx returns an empty interface usable only for tests

Directories

Path Synopsis
internal

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL