job

package module
v0.0.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 15, 2021 License: CC0-1.0 Imports: 23 Imported by: 0

Documentation

Index

Constants

View Source
// Named priority levels. Priority is a uint8 and higher numbers represent
// higher priority; the levels are spaced at quarter intervals of
// math.MaxUint8 (255) using truncating integer constant division.
const (
	Lowest     = Priority(0)                     // 0: the lowest priority
	Background = Priority(math.MaxUint8 / 4)     // 63 (255/4, truncated)
	Default    = Priority(math.MaxUint8 / 2)     // 127 (255/2, truncated)
	High       = Priority(3 * math.MaxUint8 / 4) // 191 (765/4, truncated)
	Maximum    = Priority(math.MaxUint8)         // 255: the highest priority
)

Named priority levels.

Variables

View Source
// NilID is the special ID that represents a non-existent job. It is ulid.Nil
// converted to the ID type.
var NilID = ID(ulid.Nil)

NilID is the special ID that represents a non-existent job.

View Source
// NilTaskID is the special ID that represents a non-existent task. It is
// ulid.Nil converted to the TaskID type.
var NilTaskID = TaskID(ulid.Nil)

NilTaskID is the special ID that represents a non-existent task.

Functions

This section is empty.

Types

type CompilationError

// CompilationError wraps a compilation error.
type CompilationError struct {
	// Err is the wrapped error.
	Err error
}

CompilationError wraps a compilation error.

func (CompilationError) Error

func (e CompilationError) Error() string

Error returns the wrapped error.

type ID

// ID represents a unique identifier for a job. Its underlying type is
// ulid.ULID; NilID is the value reserved for a non-existent job.
type ID ulid.ULID

ID represents a unique identifier for a job.

func (ID) String

func (id ID) String() string

String returns a string representation of the ID.

type Info

// Info describes a task. It is the type returned by SubmitterStorage.Info for
// the task with a given value in a given job.
type Info interface {
	Value() int64        // the value for this task
	State() State        // the state of this task
	AppName() string     // the application name provided by the client to which the task was assigned, or the empty string if there is no such client
	Hostname() string    // the hostname provided by the client to which the task was assigned, or the empty string if there is no such client
	Start() time.Time    // the time the task was assigned to a client, or the zero time if there is no such client
	Deadline() time.Time // the time at which this task will go stale, or the zero time if the task has not been assigned to a client
	Failures() int       // the number of times this task has failed
}

Info describes a task.

type Manifest

// Manifest represents a collection of files and directories on a pcas fs
// server. Paths in a valid Manifest must be unique, and a valid Manifest
// contains at least one file. Construct one with NewManifest.
type Manifest struct {
	// contains filtered or unexported fields
}

Manifest represents a collection of files and directories on a pcas fs server. Paths in a valid Manifest must be unique. A valid Manifest contains at least one file.

func NewManifest

func NewManifest(files []string, directories []string) (Manifest, error)

NewManifest constructs a new Manifest containing the given files and directories.

func (Manifest) ContainsFile

func (M Manifest) ContainsFile(path string) bool

ContainsFile returns true if and only if M contains the file with the given path.

func (Manifest) Directories

func (M Manifest) Directories() []string

Directories returns the directories in M.

func (Manifest) Files

func (M Manifest) Files() []string

Files returns the files in M.

func (Manifest) IsEquivalentTo

func (M Manifest) IsEquivalentTo(N Manifest) bool

IsEquivalentTo returns true if and only if the paths and hashes in M and N agree after reordering.

func (Manifest) String

func (M Manifest) String() string

String returns a string representation of M.

type Metadata

// Metadata holds metadata about a worker. It is the value a worker passes to
// WorkerStorage.Next when requesting a task.
type Metadata struct {
	AppName  string // the name of the worker application
	Hostname string // the hostname of the machine on which the worker is running
}

Metadata holds metadata about a worker.

type Priority

// Priority represents the priority of a job. Higher numbers represent higher
// priority. Being a uint8, it ranges from 0 (Lowest) to 255 (Maximum).
type Priority uint8

Priority represents the priority of a job. Higher numbers represent higher priority.

type Reader

// Reader is an io.ReadCloser that additionally supports read deadlines. It is
// the type returned by WorkerStorage.Download.
type Reader interface {
	io.ReadCloser
	// SetReadDeadline sets the deadline for future Read calls and any
	// currently-blocked Read call. A zero value for t means Read will not
	// time out.
	SetReadDeadline(t time.Time) error
	// SetDeadline is an alias for SetReadDeadline.
	SetDeadline(t time.Time) error
}

Reader is the interface describing a closable reader that supports read deadlines.

type Specification

// Specification represents a job as submitted by a client. It is the value
// passed to SubmitterStorage.Submit and returned by SubmitterStorage.Describe.
type Specification struct {
	// Name is the name of the job.
	Name string
	// Manifest records the files and directories created on job submission.
	Manifest Manifest
	// Script is the file in Manifest that is run for each task in the job.
	Script string
	// WorkingDir is a directory in Manifest, either explicitly or implicitly (as a subpath).
	WorkingDir string
	// Metadata records user-provided metadata.
	Metadata map[string]string
	// Range is the range of tasks that make up the job.
	Range irange.Range
	// Priority is the priority level of the job.
	Priority Priority
	// Lifetime is the maximum lifetime of a task.
	Lifetime time.Duration
	// MaxRetries is the maximum number of times that a task is retried on error.
	MaxRetries int
	// MaxConcurrency is the maximum number of simultaneously-active tasks for this job.
	MaxConcurrency int
}

Specification represents a job as submitted by a client.

func (*Specification) IsEquivalentTo

func (S *Specification) IsEquivalentTo(T *Specification) (ok bool, err error)

IsEquivalentTo returns true if and only if S and T are equivalent.

func (*Specification) Validate

func (S *Specification) Validate() error

Validate validates the specification, returning an error if there is a problem.

type State

// State records the state of a task. The defined values are Uninitialised,
// Pending, Active, Succeeded and Failed.
type State uint8

State records the state of a task.

// States of tasks, numbered consecutively from zero by iota.
const (
	Uninitialised State = iota // 0, the zero value of State
	Pending                    // 1
	Active                     // 2
	Succeeded                  // 3
	Failed                     // 4
)

States of tasks.

type Status

// Status describes the status of a job as four ranges of tasks, one per
// task state. It is the type returned by SubmitterStorage.Status.
type Status interface {
	Pending() irange.Range   // The pending tasks
	Active() irange.Range    // The active tasks
	Succeeded() irange.Range // The tasks that succeeded
	Failed() irange.Range    // The tasks that failed
}

Status describes the status of a job.

type Storage

// Storage is the interface satisfied by job storage systems. It is the union
// of the submitter-facing and worker-facing interfaces.
type Storage interface {
	SubmitterStorage
	WorkerStorage
}

Storage is the interface satisfied by job storage systems.

type SubmitterStorage

// SubmitterStorage is the part of the interface satisfied by a job storage
// system that is needed by a submitter: submitting jobs, uploading their
// files, and querying, listing, describing and deleting jobs.
type SubmitterStorage interface {
	// Submit submits a job, returning an ID which identifies that job. The job will become active when all files in the specified manifest have been uploaded. The storage system may discard a job if all such files are not uploaded after a certain length of time.
	Submit(context.Context, *Specification) (ID, error)
	// Upload uploads a file with the given path and contents. The path should be present in the manifest for the job with the given ID.
	Upload(ctx context.Context, id ID, path string, contents io.Reader) error
	// Status returns the status of the job with the given ID, or the error errors.ErrUnknownJob if there is no such active job.
	Status(context.Context, ID) (Status, error)
	// Info returns information about the task with the given value for the job with the given ID, or the error errors.ErrUnknownJob if there is no such active job.
	Info(context.Context, ID, int64) (Info, error)
	// List returns a slice containing the IDs of all active jobs.
	List(context.Context) ([]ID, error)
	// ListWithPriority returns a slice containing the IDs of all active jobs with the given priority level.
	ListWithPriority(context.Context, Priority) ([]ID, error)
	// Delete deletes the job with the given ID, and returns the error errors.ErrUnknownJob if there is no such job.
	Delete(context.Context, ID) error
	// Describe returns the submission data for the job with the given ID, or the error errors.ErrUnknownJob if there is no such job.
	Describe(context.Context, ID) (*Specification, error)
}

SubmitterStorage is the part of the interface satisfied by a job storage system that is needed by a submitter.

type Task

// Task represents a single task belonging to a job. It is the type returned
// by WorkerStorage.Next and passed back to the storage system to report the
// task's outcome.
type Task interface {
	// Name is the name of the job to which this task belongs.
	Name() string
	// JobID is the ID of the job.
	JobID() ID
	// Manifest records the files and directories uploaded with the job.
	Manifest() Manifest
	// Script is the file in Manifest that is run for each task in the job.
	Script() string
	// WorkingDir is a directory in Manifest, either explicitly or implicitly (as a subpath).
	WorkingDir() string
	// Metadata records user-provided metadata.
	Metadata() map[string]string
	// Priority is the priority level of the job.
	Priority() Priority
	// ID is the ID of the task.
	ID() TaskID
	// Value is the value of the task.
	Value() int64
	// Deadline is the time at which the task will go stale.
	Deadline() time.Time
	// Failures is the number of times that this task has failed.
	Failures() int
}

Task represents a single task belonging to a job.

type TaskID

// TaskID represents a unique identifier for a task. Its underlying type is
// ulid.ULID; NilTaskID is the value reserved for a non-existent task.
type TaskID ulid.ULID

TaskID represents a unique identifier for a task.

func (TaskID) String

func (id TaskID) String() string

String returns a string representation of the TaskID.

type UserError

// UserError wraps an error generated by a task script.
type UserError struct {
	// Err is the wrapped error.
	Err error
}

UserError wraps an error generated by a task script.

func (UserError) Error

func (e UserError) Error() string

Error returns the wrapped error.

type Worker

// Worker is an interface satisfied by a jobdb worker. A Worker is created
// with NewWorker and processes tasks until it is stopped or its context is
// cancelled.
type Worker interface {
	log.Logable
	metrics.Metricsable
	// Run starts the worker. It will continue processing tasks until either Stop is called, in which case it finishes processing the current task and then exits, or the context ctx is cancelled, in which case it marks the current task as having errored and exits immediately.
	Run(context.Context) error
	// Stop causes the worker to exit once it finishes processing the current task.
	Stop()
}

Worker is an interface satisfied by a jobdb worker.

func NewWorker

func NewWorker(c WorkerStorage, S fs.Interface, cfg *WorkerConfig) Worker

NewWorker returns a new worker with underlying job storage system c, local file storage S, and configuration cfg.

type WorkerConfig

// WorkerConfig holds configuration data for a jobdb worker. It is passed to
// NewWorker.
type WorkerConfig struct {
	Timeout  time.Duration // the maximum length of time to wait for a new task before exiting
	AppName  string        // the application name by which the worker identifies itself to the job storage system
	Hostname string        // the hostname by which the worker identifies itself to the job storage system
}

WorkerConfig holds configuration data for a jobdb worker.

type WorkerStorage

// WorkerStorage is the part of the interface satisfied by a job storage
// system that is needed by a worker: fetching tasks, downloading their files,
// reporting task outcomes, and sending heartbeats.
type WorkerStorage interface {
	// Next returns a new task, if one is available. It provides the given metadata to the job server.
	Next(context.Context, Metadata) (Task, error)
	// Download downloads a file with the given path. This should be present in the manifest for the job with the given ID.
	Download(ctx context.Context, id ID, path string) (Reader, error)
	// Success indicates that the given task has succeeded.
	Success(context.Context, Task) error
	// Error indicates that the given task has failed and should be retried.
	Error(context.Context, Task) error
	// Requeue indicates that the given task should be requeued, without incrementing the number of failures.
	Requeue(context.Context, Task) error
	// Fatal indicates that the given task has failed and should not be requeued.
	Fatal(context.Context, Task) error
	// Heartbeat sends a message to the job server indicating that the worker is still alive and is processing the given task. A nil Task indicates that no task is being processed. Heartbeat returns true if task processing should continue and false if the current task should be cancelled.
	Heartbeat(context.Context, Task) (bool, error)
}

WorkerStorage is the part of the interface satisfied by a job storage system that is needed by a worker.

Directories

Path Synopsis
cmd
internal

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL