job

package
v0.0.0-...-ebdf71b Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 5, 2022 License: Apache-2.0 Imports: 17 Imported by: 0

Documentation

Index

Constants

View Source
const (
	JobStatePreStart = iota
	JobStateRunning
	JobStateCompleted
)
View Source
const JobberCG = "/sys/fs/cgroup/jobber"

Variables

View Source
var (
	ErrUnauthorized = errors.New("unauthorized")
	ErrMissingID    = errors.New("missing job ID")
	ErrNoCommand    = errors.New("missing job command")
	ErrNotStarted   = errors.New("could not start job")
	ErrShutdown     = errors.New("service is shut down")
	ErrUnknown      = errors.New("unknown job")
)
View Source
var (
	ErrAlreadyStarted = errors.New("job already started")
)

Functions

func AddUserToContext

func AddUserToContext(ctx context.Context, user string) context.Context

func GetUserFromContext

func GetUserFromContext(ctx context.Context) (string, bool)

func InitCgroups

func InitCgroups() error

Types

type ArgMaker

type ArgMaker func(JobDescription) (string, []string)

type DiskIOLimits

type DiskIOLimits struct {
	Device    string
	Major     uint32
	Minor     uint32
	ReadBPS   uint64
	WriteBPS  uint64
	ReadIOPS  uint32
	WriteIOPS uint32
}

func (*DiskIOLimits) ResolveDevice

func (d *DiskIOLimits) ResolveDevice() error

func (*DiskIOLimits) String

func (d *DiskIOLimits) String() string

func (*DiskIOLimits) UnmarshalText

func (d *DiskIOLimits) UnmarshalText(b []byte) (err error)

UnmarshalText unmarshals a string ([]byte) into a DiskIOLimits. It is used by kong to unmarshal the command line argument into a structured value.

The format of the input string is 5 or 6 colon separated values. If 5 values, the first should be a block device filesystem path that can be stat'ed to get its major and minor number. If 6 values, they are directly the major and minor number.

The remaining 4 values are the disk IO limits for that block device. A field may be empty which is parsed as zero, which means no setting for that throttle.

type Job

type Job struct {
	ID     string
	Spec   JobSpec
	Status JobStatus
	// contains filtered or unexported fields
}

func NewJob

func NewJob(id string, spec JobSpec, argMaker ArgMaker) *Job

func (*Job) AttachOutfeed

func (j *Job) AttachOutfeed(follow bool, done <-chan struct{}) <-chan Log

func (*Job) Cleanup

func (j *Job) Cleanup()

func (*Job) Description

func (j *Job) Description() JobDescription

func (*Job) ExecPart1

func (j *Job) ExecPart1() (io.ReadCloser, error)

ExecPart1 starts the execution of a job's command, ensuring it runs in new namespaces where appropriate, attaching pipes to capture the output of the command and any errors that come from not being able to run the command. It uses an ArgMaker to construct the command line as we do not know anything about the program we are embedded in and what command line args it takes. The ArgMaker abstracts that for us and allows the user of this package to define how to propagate Job parameters into a Job for ExecPart2 in a child process.

If successful, it returns an io.ReadCloser that can be read for the command's combined stdout/stderr stream. Once that has closed, Job.cmd.Wait() should be called on the job to capture the exit code of the process and reap it.

func (*Job) ExecPart2

func (j *Job) ExecPart2()

ExecPart2 runs the job in a cgroup configured from the job's parameters and configures the namespaces it is already in. It is expected that the process is already running in "empty" namespaces based on the job's configuration.

It is expected that the standard io streams are set up as follows:

  • stdin: /dev/null
  • stdout: where the process's stdout and stderr are sent
  • stderr: where error messages due to the inability to run the program are sent - e.g. errors setting up the cgroup, being unable to exec the program (not found), etc.

When the command is executed, it will have the stderr stream it received closed and will instead have the stdout stream on stderr too.

It does not return an error, instead writing errors to stderr to be captured by the parent process in ExecPart1().

func (*Job) Start

func (j *Job) Start(owner string) error

Start runs the job.

func (*Job) Stop

func (j *Job) Stop(ctx context.Context)

Stop terminates the job (with extreme prejudice - SIGKILL).

type JobDescription

type JobDescription struct {
	ID     string
	Spec   JobSpec
	Status JobStatus
}

type JobSpec

type JobSpec struct {
	Command string   `arg:"" help:"Command for jobber server to run"`
	Args    []string `arg:"" optional:"" help:"Arguments to command"`

	Root           string `help:"run in isolated root directory"`
	IsolateNetwork bool   `help:"run in isolated network namespace"`

	Resources ResourceLimits `embed:""`
}

type JobState

type JobState int

type JobStatus

type JobStatus struct {
	StartTime time.Time
	Owner     string
	State     JobState
	ExitCode  uint32
	ExitError error
}

type Log

type Log struct {
	Timestamp time.Time
	Line      []byte
}

type ResourceLimits

type ResourceLimits struct {
	MaxProcesses uint32         `help:"maximum number of processes"`
	Memory       uint64         `help:"maximum memory (bytes)"`
	CPU          uint32         `help:"maximum CPU (milliCPU)"`
	IO           []DiskIOLimits `name:"io" help:"disk io limits (dev:rbps:wbps:riops:wiops)"`
}

type Tracker

type Tracker struct {
	// contains filtered or unexported fields
}

Tracker maintains a set of Jobs that are either running or have completed. Jobs can be added (started), stopped (including removed via cleanup if desired), listed and attached to for log output.

func NewTracker

func NewTracker(argMaker ArgMaker, admins []string) *Tracker

func (*Tracker) Get

func (t *Tracker) Get(ctx context.Context, id string) (JobDescription, error)

Get returns a copy of the job identified by id if it exists in the tracker, otherwise an error. The copy returned is not an active job that can be manipulated - it is just for the data.

func (*Tracker) GetLogChannel

func (t *Tracker) GetLogChannel(id string, follow bool, ctx context.Context) (<-chan Log, error)

GetLogChannel returns a channel that streams the logs of the job identified by id. If follow is set, the stream will continue until the job terminates. Regardless of the follow flag, if the context is closed, then the returned log channel is detached from the log feeder and is closed.

func (*Tracker) List

func (t *Tracker) List(ctx context.Context, completed, all bool) []JobDescription

List returns a copy of all the jobs for a owner, or all jobs if the given owner is empty. Only running jobs are returned, unless completed is true.

func (*Tracker) Shutdown

func (t *Tracker) Shutdown(ctx context.Context) (int, error)

func (*Tracker) Start

func (t *Tracker) Start(ctx context.Context, spec JobSpec) (string, error)

Start runs the given job. If it starts, the job will be tracked and can be operated upon. If it does not start, an error is returned and the job is not tracked.

func (*Tracker) Stop

func (t *Tracker) Stop(ctx context.Context, id string, cleanup bool) error

Stop kills the job identified by id. It waits until the job exits before returning, unless the context is cancelled.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL