cockroach: github.com/cockroachdb/cockroach/pkg/jobs

package jobs

import "github.com/cockroachdb/cockroach/pkg/jobs"

Package Files

helpers.go jobs.go metrics.go progress.go registry.go update.go

Variables

var DefaultAdoptInterval = 30 * time.Second

DefaultAdoptInterval is a reasonable interval at which to poll system.jobs for jobs with expired leases.

DefaultAdoptInterval is mutable for testing. NB: Updates to this value after Registry.Start has been called will not have any effect.

var DefaultCancelInterval = base.DefaultTxnHeartbeatInterval

DefaultCancelInterval is a reasonable interval at which to poll this node for liveness failures and cancel running jobs.

var FakeNodeID = func() *base.NodeIDContainer {
    nodeID := base.NodeIDContainer{}
    nodeID.Reset(1)
    return &nodeID
}()

FakeNodeID is a dummy node ID for use in tests. It always stores 1.

var (

    // LeniencySetting is the amount of time to defer any attempts to
    // reschedule a job.  Visible for testing.
    LeniencySetting = settings.RegisterDurationSetting(
        "jobs.registry.leniency",
        "the amount of time to defer any attempts to reschedule a job",
        defaultLeniencySetting)
)

var MakeChangefeedMetricsHook func(time.Duration) metric.Struct

MakeChangefeedMetricsHook allows for registration of changefeed metrics from ccl code.

var NoopFn = func(context.Context, *client.Txn) error { return nil }

NoopFn is an empty function that can be used for Failed and Succeeded. It indicates no transactional callback should be made during these operations.

var ProgressUpdateOnly func(context.Context, jobspb.ProgressDetails)

ProgressUpdateOnly is for use with NewChunkProgressLogger when only the job's progress fraction needs updating (i.e., when a custom function with side effects is not needed).

func NewRetryJobError

func NewRetryJobError(s string) error

NewRetryJobError creates a new error that, if returned by a Resumer, indicates to the jobs registry that the job should be restarted in the background.

func RegisterConstructor

func RegisterConstructor(typ jobspb.Type, fn Constructor)

RegisterConstructor registers a Resumer constructor for a certain job type.
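A constructor registry of this kind is commonly a map from job type to factory function, filled in from `init` functions. The sketch below assumes that design; `jobType`, `resumer`, `constructor`, and `createResumer` are simplified, hypothetical stand-ins for jobspb.Type, Resumer, and Constructor. It also shows the documented StartJob behavior of returning an error for an unregistered type.

```go
package main

import "fmt"

// jobType, resumer, and constructor are hypothetical simplifications of
// jobspb.Type, jobs.Resumer, and jobs.Constructor.
type jobType int

const (
	typeBackup jobType = iota
	typeRestore
)

type resumer interface {
	Resume() error
}

type constructor func() resumer

// constructors maps each job type to the function that builds its Resumer.
var constructors = map[jobType]constructor{}

func registerConstructor(typ jobType, fn constructor) {
	constructors[typ] = fn
}

// createResumer builds a fresh Resumer each time a job starts or resumes,
// failing if the job type was never registered.
func createResumer(typ jobType) (resumer, error) {
	fn, ok := constructors[typ]
	if !ok {
		return nil, fmt.Errorf("no resumer is available for job type %d", typ)
	}
	return fn(), nil
}

type backupResumer struct{}

func (backupResumer) Resume() error { return nil }

func main() {
	registerConstructor(typeBackup, func() resumer { return backupResumer{} })
	if _, err := createResumer(typeRestore); err != nil {
		fmt.Println(err) // no resumer is available for job type 1
	}
}
```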

func SimplifyInvalidStatusError

func SimplifyInvalidStatusError(err error) error

SimplifyInvalidStatusError unwraps an *InvalidStatusError into an error message suitable for users. Other errors are returned as passed.

func TestingSetProgressThresholds

func TestingSetProgressThresholds() func()

TestingSetProgressThresholds overrides batching limits to update more often.

func UnmarshalPayload

func UnmarshalPayload(datum tree.Datum) (*jobspb.Payload, error)

UnmarshalPayload unmarshals and returns the Payload encoded in the input datum, which should be a tree.DBytes.

func UnmarshalProgress

func UnmarshalProgress(datum tree.Datum) (*jobspb.Progress, error)

UnmarshalProgress unmarshals and returns the Progress encoded in the input datum, which should be a tree.DBytes.

type ChunkProgressLogger

type ChunkProgressLogger struct {
    // contains filtered or unexported fields
}

ChunkProgressLogger is a helper for managing the progress state on a job. For a given job, it assumes there are some number of chunks of work to do and tracks the completion progress as chunks are reported as done (via Loop). It then updates the actual job periodically using a ProgressUpdateBatcher.

func NewChunkProgressLogger

func NewChunkProgressLogger(
    j *Job,
    expectedChunks int,
    startFraction float32,
    progressedFn func(context.Context, jobspb.ProgressDetails),
) *ChunkProgressLogger

NewChunkProgressLogger returns a ChunkProgressLogger.

func (*ChunkProgressLogger) Loop

func (jpl *ChunkProgressLogger) Loop(ctx context.Context, chunkCh <-chan struct{}) error

Loop calls chunkFinished for every message received over chunkCh. It exits when chunkCh is closed, when totalChunks messages have been received, or when the context is canceled.

type Constructor

type Constructor func(job *Job, settings *cluster.Settings) Resumer

Constructor creates a resumable job of a certain type. The Resumer is created on the coordinator each time the job is started or resumed, so it can hold state. The Resume method is always run, and it can set state on the Resumer for use by the other methods.

type DescriptionUpdateFn

type DescriptionUpdateFn func(ctx context.Context, description string) (string, error)

DescriptionUpdateFn is a callback that computes a job's description given its current one.

type FakeNodeLiveness

type FakeNodeLiveness struct {

    // A non-blocking send is performed over these channels when the corresponding
    // method is called.
    SelfCalledCh          chan struct{}
    GetLivenessesCalledCh chan struct{}
    // contains filtered or unexported fields
}

FakeNodeLiveness allows simulating liveness failures without the full storage.NodeLiveness machinery.
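The "non-blocking send" mentioned in the struct comment is the standard Go `select` with a `default` case: the notification is delivered if a receiver or buffer slot is ready and silently dropped otherwise, so the faked method never blocks. The `fakeLiveness` type and `notify` helper below are hypothetical; note that in this sketch a nil channel also counts as "nobody listening".

```go
package main

import "fmt"

// fakeLiveness is a hypothetical, trimmed-down stand-in for
// jobs.FakeNodeLiveness that keeps only one notification channel.
type fakeLiveness struct {
	SelfCalledCh chan struct{}
}

// notify performs a non-blocking send: if no receiver is ready and the
// buffer is full (or the channel is nil), the notification is dropped.
func notify(ch chan struct{}) bool {
	select {
	case ch <- struct{}{}:
		return true
	default:
		return false
	}
}

func (nl *fakeLiveness) Self() {
	notify(nl.SelfCalledCh)
}

func main() {
	nl := &fakeLiveness{SelfCalledCh: make(chan struct{}, 1)}
	nl.Self() // fills the single buffer slot
	nl.Self() // dropped: buffer full, but the call does not block
	<-nl.SelfCalledCh
	fmt.Println("test observed a Self call")
}
```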

func NewFakeNodeLiveness

func NewFakeNodeLiveness(nodeCount int) *FakeNodeLiveness

NewFakeNodeLiveness initializes a new FakeNodeLiveness with nodeCount live nodes.

func (*FakeNodeLiveness) FakeIncrementEpoch

func (nl *FakeNodeLiveness) FakeIncrementEpoch(id roachpb.NodeID)

FakeIncrementEpoch increments the epoch for the node with the specified ID.

func (*FakeNodeLiveness) FakeSetExpiration

func (nl *FakeNodeLiveness) FakeSetExpiration(id roachpb.NodeID, ts hlc.Timestamp)

FakeSetExpiration sets the expiration time of the liveness for the node with the specified ID to ts.

func (*FakeNodeLiveness) GetLivenesses

func (nl *FakeNodeLiveness) GetLivenesses() (out []storagepb.Liveness)

GetLivenesses implements the implicit storage.NodeLiveness interface.

func (*FakeNodeLiveness) ModuleTestingKnobs

func (*FakeNodeLiveness) ModuleTestingKnobs()

ModuleTestingKnobs implements base.ModuleTestingKnobs.

func (*FakeNodeLiveness) Self

func (nl *FakeNodeLiveness) Self() (*storagepb.Liveness, error)

Self implements the implicit storage.NodeLiveness interface. It uses FakeNodeID as the node ID. On every call, a non-blocking send is performed over nl.SelfCalledCh to allow tests to execute a callback.

type FractionProgressedFn

type FractionProgressedFn func(ctx context.Context, details jobspb.ProgressDetails) float32

FractionProgressedFn is a callback that computes a job's completion fraction given its details. It is safe to modify details in the callback; those modifications will be automatically persisted to the database record.

func FractionUpdater

func FractionUpdater(f float32) FractionProgressedFn

FractionUpdater returns a FractionProgressedFn that returns its argument.

type HighWaterProgressedFn

type HighWaterProgressedFn func(ctx context.Context, details jobspb.ProgressDetails) hlc.Timestamp

HighWaterProgressedFn is a callback that computes a job's high-water mark given its details. It is safe to modify details in the callback; those modifications will be automatically persisted to the database record.

type InvalidStatusError

type InvalidStatusError struct {
    // contains filtered or unexported fields
}

InvalidStatusError is the error returned when the desired operation is invalid given the job's current status.

func (*InvalidStatusError) Error

func (e *InvalidStatusError) Error() string

type Job

type Job struct {
    // contains filtered or unexported fields
}

Job manages logging the progress of long-running system processes, like backups and restores, to the system.jobs table.

func (*Job) CheckStatus

func (j *Job) CheckStatus(ctx context.Context) error

CheckStatus verifies the status of the job and returns an error if the job's status isn't Running.

func (*Job) CheckTerminalStatus

func (j *Job) CheckTerminalStatus(ctx context.Context) bool

CheckTerminalStatus returns true if the job is in a terminal status.

func (*Job) Created

func (j *Job) Created(ctx context.Context) error

Created records the creation of a new job in the system.jobs table and remembers the assigned ID of the job in the Job. The job information is read from the Record field at the time Created is called.

func (*Job) Details

func (j *Job) Details() jobspb.Details

Details returns the details from the most recently sent Payload for this Job.

func (*Job) Failed

func (j *Job) Failed(
    ctx context.Context, err error, fn func(context.Context, *client.Txn) error,
) error

Failed marks the tracked job as having failed with the given error.

func (*Job) FractionCompleted

func (j *Job) FractionCompleted() float32

FractionCompleted returns completion according to the in-memory job state.

func (*Job) FractionProgressed

func (j *Job) FractionProgressed(ctx context.Context, progressedFn FractionProgressedFn) error

FractionProgressed updates the progress of the tracked job. It sets the job's FractionCompleted field to the value returned by progressedFn and persists progressedFn's modifications to the job's progress details, if any.

Jobs for which progress computations do not depend on their details can use the FractionUpdater helper to construct a FractionProgressedFn.

func (*Job) HighWaterProgressed

func (j *Job) HighWaterProgressed(ctx context.Context, progressedFn HighWaterProgressedFn) error

HighWaterProgressed updates the progress of the tracked job. It sets the job's HighWater field to the value returned by progressedFn and persists progressedFn's modifications to the job's progress details, if any.

func (*Job) ID

func (j *Job) ID() *int64

ID returns the ID of the job that this Job is currently tracking. This will be nil if Created has not yet been called.

func (*Job) Payload

func (j *Job) Payload() jobspb.Payload

Payload returns the most recently sent Payload for this Job.

func (*Job) Progress

func (j *Job) Progress() jobspb.Progress

Progress returns the most recently sent Progress for this Job.

func (*Job) RunningStatus

func (j *Job) RunningStatus(ctx context.Context, runningStatusFn RunningStatusFn) error

RunningStatus updates the detailed status of a job currently in progress. It sets the job's RunningStatus field to the value returned by runningStatusFn and persists runningStatusFn's modifications to the job's details, if any.

func (*Job) SetDescription

func (j *Job) SetDescription(ctx context.Context, updateFn DescriptionUpdateFn) error

SetDescription updates the description of a created job.

func (*Job) SetDetails

func (j *Job) SetDetails(ctx context.Context, details interface{}) error

SetDetails sets the details field of the currently running tracked job.

func (*Job) SetProgress

func (j *Job) SetProgress(ctx context.Context, details interface{}) error

SetProgress sets the progress details field of the currently running tracked job.

func (*Job) Started

func (j *Job) Started(ctx context.Context) error

Started marks the tracked job as started.

func (*Job) Succeeded

func (j *Job) Succeeded(ctx context.Context, fn func(context.Context, *client.Txn) error) error

Succeeded marks the tracked job as having succeeded and sets its fraction completed to 1.0.

func (*Job) Update

func (j *Job) Update(ctx context.Context, updateFn UpdateFn) error

Update is used to read the metadata for a job and potentially update it.

The updateFn is called in the context of a transaction and is passed the current metadata for the job. It can choose to update parts of the metadata using the JobUpdater, causing them to be updated within the same transaction.

Sample usage:

err := j.Update(ctx, func(_ *client.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater) error {
  if md.Status != jobs.StatusRunning {
    return errors.New("job no longer running")
  }
  // Changes must go through the JobUpdater to be persisted.
  ju.UpdateStatus(jobs.StatusPaused)
  // <modify md.Payload>
  ju.UpdatePayload(md.Payload)
  return nil
})

Note that there are various convenience wrappers (like FractionProgressed) defined in jobs.go.

func (*Job) WithTxn

func (j *Job) WithTxn(txn *client.Txn) *Job

WithTxn sets the transaction that this Job will use for its next operation. If the transaction is nil, the Job will create a one-off transaction instead. If you use WithTxn, this Job will no longer be threadsafe.

type JobMetadata

type JobMetadata struct {
    ID       int64
    Status   Status
    Payload  *jobspb.Payload
    Progress *jobspb.Progress
}

JobMetadata groups the job metadata values passed to UpdateFn.

func (*JobMetadata) CheckRunning

func (md *JobMetadata) CheckRunning() error

CheckRunning returns an InvalidStatusError if md.Status is not StatusRunning.

type JobUpdater

type JobUpdater struct {
    // contains filtered or unexported fields
}

JobUpdater accumulates changes to job metadata that are to be persisted.

func (*JobUpdater) UpdatePayload

func (ju *JobUpdater) UpdatePayload(payload *jobspb.Payload)

UpdatePayload sets a new Payload (to be persisted).

WARNING: the payload can be large (resulting in a large KV for each version); it shouldn't be updated frequently.

func (*JobUpdater) UpdateProgress

func (ju *JobUpdater) UpdateProgress(progress *jobspb.Progress)

UpdateProgress sets a new Progress (to be persisted).

func (*JobUpdater) UpdateStatus

func (ju *JobUpdater) UpdateStatus(status Status)

UpdateStatus sets a new status (to be persisted).

type Metrics

type Metrics struct {
    Changefeed metric.Struct
}

Metrics are for production monitoring of each job type.

func (*Metrics) InitHooks

func (m *Metrics) InitHooks(histogramWindowInterval time.Duration)

InitHooks initializes the metrics for job monitoring.

func (Metrics) MetricStruct

func (Metrics) MetricStruct()

MetricStruct implements the metric.Struct interface.

type NodeLiveness

type NodeLiveness interface {
    Self() (*storagepb.Liveness, error)
    GetLivenesses() []storagepb.Liveness
}

NodeLiveness is the subset of storage.NodeLiveness's interface needed by Registry.

type ProgressUpdateBatcher

type ProgressUpdateBatcher struct {
    // Report is the function called to record progress
    Report func(context.Context, float32) error

    syncutil.Mutex
    // contains filtered or unexported fields
}

ProgressUpdateBatcher is a helper for tracking progress as it is made and calling a progress update function when progress has meaningfully advanced (e.g., by more than 5%), while ensuring that updates are not made too often (by default, no less than 30s apart).

func (*ProgressUpdateBatcher) Add

func (p *ProgressUpdateBatcher) Add(ctx context.Context, delta float32) error

Add records some additional progress and checks whether the completed progress has changed enough (and enough time has passed) to report the new progress amount.

func (*ProgressUpdateBatcher) Done

func (p *ProgressUpdateBatcher) Done(ctx context.Context) error

Done reports any meaningful unreported progress, ignoring the update-frequency limit since no further progress is expected.

type Record

type Record struct {
    Description   string
    Statement     string
    Username      string
    DescriptorIDs sqlbase.IDs
    Details       jobspb.Details
    Progress      jobspb.ProgressDetails
    RunningStatus RunningStatus
}

Record bundles together the user-managed fields in jobspb.Payload.

type Registry

type Registry struct {
    TestingResumerCreationKnobs map[jobspb.Type]func(Resumer) Resumer
    // contains filtered or unexported fields
}

Registry creates Jobs and manages their leases and cancelation.

Job information is stored in the `system.jobs` table. Each node will poll this table and establish a lease on any claimed job. Registry calculates its own liveness for a node based on the expiration time of the underlying node-liveness lease. This is because we want to allow jobs assigned to temporarily non-live (i.e. saturated) nodes to continue without being canceled.

When a lease has been determined to be stale, a node may attempt to claim the relevant job. Thus, a Registry must occasionally re-validate its own leases to ensure that another node has not stolen the work and cancel the local job if so.

Prior versions of Registry used the node's epoch value to determine whether or not a job should be stolen. The current implementation uses a time-based approach, where a node's last reported expiration timestamp is used to calculate a liveness value for the purpose of job scheduling.

Mixed-version operation between epoch- and time-based nodes works since we still publish epoch information in the leases for time-based nodes. From the perspective of a time-based node, an epoch-based node simply behaves as though its leniency period is 0. Epoch-based nodes will see time-based nodes delay the act of stealing a job.

func MakeRegistry

func MakeRegistry(
    ac log.AmbientContext,
    stopper *stop.Stopper,
    clock *hlc.Clock,
    db *client.DB,
    ex sqlutil.InternalExecutor,
    nodeID *base.NodeIDContainer,
    settings *cluster.Settings,
    histogramWindowInterval time.Duration,
    planFn planHookMaker,
) *Registry

MakeRegistry creates a new Registry. planFn is a wrapper around sql.newInternalPlanner. Its result is a sql.PlanHookState, but it must be type-asserted to that type in the Resumer functions.

func (*Registry) Cancel

func (r *Registry) Cancel(ctx context.Context, txn *client.Txn, id int64) error

Cancel marks the job with id as canceled using the specified txn (may be nil).

func (*Registry) LoadJob

func (r *Registry) LoadJob(ctx context.Context, jobID int64) (*Job, error)

LoadJob loads an existing job with the given jobID from the system.jobs table.

func (*Registry) LoadJobWithTxn

func (r *Registry) LoadJobWithTxn(ctx context.Context, jobID int64, txn *client.Txn) (*Job, error)

LoadJobWithTxn does the same as LoadJob, but uses the transaction passed in the txn argument. Passing a nil transaction is equivalent to calling LoadJob, in that a transaction will be created automatically.

func (*Registry) MetricsStruct

func (r *Registry) MetricsStruct() *Metrics

MetricsStruct returns the metrics for production monitoring of each job type. They're all stored as the `metric.Struct` interface because of dependency cycles.

func (*Registry) NewJob

func (r *Registry) NewJob(record Record) *Job

NewJob creates a new Job.

func (*Registry) Pause

func (r *Registry) Pause(ctx context.Context, txn *client.Txn, id int64) error

Pause marks the job with id as paused using the specified txn (may be nil).

func (*Registry) Resume

func (r *Registry) Resume(ctx context.Context, txn *client.Txn, id int64) error

Resume resumes the paused job with id using the specified txn (may be nil).

func (*Registry) Start

func (r *Registry) Start(
    ctx context.Context,
    stopper *stop.Stopper,
    nl NodeLiveness,
    cancelInterval, adoptInterval time.Duration,
) error

Start polls the current node for liveness failures and cancels all registered jobs if it observes a failure.

func (*Registry) StartJob

func (r *Registry) StartJob(
    ctx context.Context, resultsCh chan<- tree.Datums, record Record,
) (*Job, <-chan error, error)

StartJob creates and asynchronously starts a job from record. An error is returned if the job type has not been registered with RegisterConstructor. The ctx passed to this function is not the context the job will be started with (canceling ctx will not cause the job to be canceled).

type Resumer

type Resumer interface {
    // Resume is called when a job is started or resumed. Sending results on the
    // chan will return them to a user, if a user's session is connected. phs
    // is a sql.PlanHookState.
    Resume(ctx context.Context, phs interface{}, resultsCh chan<- tree.Datums) error

    // OnSuccess is called when a job has completed successfully, and is called
    // with the same txn that will mark the job as successful. The txn will
    // only be committed if this doesn't return an error and the job state was
    // successfully changed to successful. If OnSuccess returns an error, the
    // job will be marked as failed.
    //
    // Any work this function does must still be correct if the txn is aborted at
    // a later time.
    OnSuccess(ctx context.Context, txn *client.Txn) error

    // OnTerminal is called after a job has successfully been marked as
    // terminal. It should be used to perform optional cleanup and return final
    // results to the user. There is no guarantee that this function is ever run
    // (for example, if a node died immediately after Success commits).
    OnTerminal(ctx context.Context, status Status, resultsCh chan<- tree.Datums)

    // OnFailOrCancel is called when a job fails or is canceled, and is called
    // with the same txn that will mark the job as failed or canceled. The txn
    // will only be committed if this doesn't return an error and the job state
    // was successfully changed to failed or canceled. This is done so that
    // transactional cleanup can be guaranteed to have happened.
    //
    // This method can be called during cancellation, which is not guaranteed to
    // run on the node where the job is running. So it cannot assume that any
    // other methods have been called on this Resumer object.
    OnFailOrCancel(ctx context.Context, txn *client.Txn) error
}

Resumer is a resumable job, and is associated with a Job object. Jobs can be paused or canceled at any time. Jobs should periodically call CheckStatus() or a progress method such as FractionProgressed(), which return an error if the job has been paused or canceled.

Resumers are created through registered Constructor functions.

type RunningStatus

type RunningStatus string

RunningStatus represents the more detailed status of a running job in the system.jobs table.

type RunningStatusFn

type RunningStatusFn func(ctx context.Context, details jobspb.Details) (RunningStatus, error)

RunningStatusFn is a callback that computes a job's running status given its details. It is safe to modify details in the callback; those modifications will be automatically persisted to the database record.

type Status

type Status string

Status represents the status of a job in the system.jobs table.

const (
    // StatusPending is for jobs that have been created but on which work has
    // not yet started.
    StatusPending Status = "pending"
    // StatusRunning is for jobs that are currently in progress.
    StatusRunning Status = "running"
    // StatusPaused is for jobs that are not currently performing work, but have
    // saved their state and can be resumed by the user later.
    StatusPaused Status = "paused"
    // StatusFailed is for jobs that failed.
    StatusFailed Status = "failed"
    // StatusSucceeded is for jobs that have successfully completed.
    StatusSucceeded Status = "succeeded"
    // StatusCanceled is for jobs that were explicitly canceled by the user and
    // cannot be resumed.
    StatusCanceled Status = "canceled"
)

func (Status) Terminal

func (s Status) Terminal() bool

Terminal returns whether this status represents a "terminal" state: a state after which the job should never be updated again.
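Reading the constant comments above, a paused job can be resumed and a canceled job cannot, which suggests the terminal set is {failed, succeeded, canceled}. The sketch below encodes that reading; it is an assumption drawn from the comments, not a copy of the package's implementation.

```go
package main

import "fmt"

type Status string

const (
	StatusPending   Status = "pending"
	StatusRunning   Status = "running"
	StatusPaused    Status = "paused"
	StatusFailed    Status = "failed"
	StatusSucceeded Status = "succeeded"
	StatusCanceled  Status = "canceled"
)

// Terminal reports whether s is a state after which the job should never be
// updated again. Assumed: failed, succeeded, and canceled are terminal;
// paused is not, because a paused job can be resumed.
func (s Status) Terminal() bool {
	return s == StatusFailed || s == StatusSucceeded || s == StatusCanceled
}

func main() {
	for _, s := range []Status{StatusPending, StatusRunning, StatusPaused, StatusFailed, StatusSucceeded, StatusCanceled} {
		fmt.Printf("%-9s terminal=%t\n", s, s.Terminal())
	}
}
```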

type UpdateFn

type UpdateFn func(txn *client.Txn, md JobMetadata, ju *JobUpdater) error

UpdateFn is the callback passed to Job.Update. It is called from the context of a transaction and is passed the current metadata for the job. The callback can modify metadata using the JobUpdater and the changes will be persisted within the same transaction.

The function is free to modify contents of JobMetadata in place (but the changes will be ignored unless JobUpdater is used).

Directories

Path Synopsis
jobspb

Package jobs imports 25 packages and is imported by 15 packages. Updated 2019-09-14.