worker

package
v0.0.0-...-a514ca2
Published: Apr 27, 2024 | License: MIT | Imports: 26 | Imported by: 2

Documentation

Overview

Package worker contains code which executes a task.

Constants

This section is empty.

Variables

This section is empty.

Functions

func DownloadInputs

func DownloadInputs(pctx context.Context, inputs []*tes.Input, store storage.Storage, ev *events.TaskWriter, parallelLimit int) error

DownloadInputs downloads the given inputs.

func FlattenInputs

func FlattenInputs(ctx context.Context, inputs []*tes.Input, store storage.Storage, ev *events.TaskWriter) ([]*tes.Input, error)

FlattenInputs flattens any directory inputs into a list of file inputs. A warning event will be generated if an input directory is empty.

func FlattenOutputs

func FlattenOutputs(ctx context.Context, outputs []*tes.Output, store storage.Storage, ev *events.TaskWriter) ([]*tes.Output, error)

FlattenOutputs flattens output directories into a list of files. A warning event will be generated if an output directory is empty.

func SyncDockerAPIVersion

func SyncDockerAPIVersion() error

SyncDockerAPIVersion ensures that the client uses the same API version as the server.

func UploadOutputs

func UploadOutputs(ctx context.Context, outputs []*tes.Output, store storage.Storage, ev *events.TaskWriter, parallelLimit int) ([]*tes.OutputFileLog, error)

UploadOutputs uploads the outputs.

Types

type Base64TaskReader

type Base64TaskReader struct {
	// contains filtered or unexported fields
}

Base64TaskReader reads a task from a base64 encoded string.

func NewBase64TaskReader

func NewBase64TaskReader(raw string) (*Base64TaskReader, error)

NewBase64TaskReader creates a new Base64TaskReader.

func (*Base64TaskReader) Close

func (f *Base64TaskReader) Close()

Close closes the Base64TaskReader.

func (*Base64TaskReader) State

func (f *Base64TaskReader) State(ctx context.Context) (tes.State, error)

State returns the task state. Due to some quirks in the implementation of this reader, and since there is no online database to connect to, this will always return QUEUED.

func (*Base64TaskReader) Task

func (f *Base64TaskReader) Task(ctx context.Context) (*tes.Task, error)

Task returns the task. A random ID will be generated.

type DefaultWorker

type DefaultWorker struct {
	Conf        config.Worker
	Store       storage.Storage
	TaskReader  TaskReader
	EventWriter events.Writer
}

DefaultWorker is the default task worker, which follows a basic, sequential process of task initialization, execution, finalization, and logging.

func (*DefaultWorker) Close

func (r *DefaultWorker) Close()

func (*DefaultWorker) Run

func (r *DefaultWorker) Run(pctx context.Context) (runerr error)

Run runs the Worker.

type DockerCommand

type DockerCommand struct {
	Image           string
	Command         []string
	Volumes         []Volume
	Workdir         string
	ContainerName   string
	RemoveContainer bool
	Env             map[string]string
	Stdin           io.Reader
	Stdout          io.Writer
	Stderr          io.Writer
	Event           *events.ExecutorWriter
}

DockerCommand is responsible for configuring and running a docker container.

func (DockerCommand) Run

func (dcmd DockerCommand) Run(ctx context.Context) error

Run runs the Docker command and blocks until done.

func (DockerCommand) Stop

func (dcmd DockerCommand) Stop() error

Stop stops the container.

type FileMapper

type FileMapper struct {
	Volumes []Volume
	Inputs  []*tes.Input
	Outputs []*tes.Output
	WorkDir string
}

FileMapper is responsible for mapping paths into a working directory on the worker's host file system.

Every task needs its own directory to work in. When a file is downloaded for a task, it needs to be stored in the task's working directory. The same applies to task outputs, uploads, stdin/stdout/stderr, and so on. FileMapper helps the worker engine manage all these paths.

func NewFileMapper

func NewFileMapper(dir string) *FileMapper

NewFileMapper returns a new FileMapper, which maps files into the given base directory.

func (*FileMapper) AddInput

func (mapper *FileMapper) AddInput(input *tes.Input) error

AddInput adds an input to the mapped files for the given tes.Input. A copy of the tes.Input will be added to mapper.Inputs, with the "Path" field updated to the mapped host path.

If the path can't be mapped, an error is returned.

func (*FileMapper) AddOutput

func (mapper *FileMapper) AddOutput(output *tes.Output) error

AddOutput adds an output to the mapped files for the given tes.Output. A copy of the tes.Output will be added to mapper.Outputs, with the "Path" field updated to the mapped host path.

If the path can't be mapped, an error is returned.

func (*FileMapper) AddTmpVolume

func (mapper *FileMapper) AddTmpVolume(mountPoint string) error

AddTmpVolume creates a directory on the host based on the declared path in the container and adds it to mapper.Volumes.

If the path can't be mapped, an error is returned.

func (*FileMapper) AddVolume

func (mapper *FileMapper) AddVolume(hostPath string, mountPoint string, readonly bool) error

AddVolume adds a mapped volume to the mapper. A corresponding Volume record is added to mapper.Volumes.

If the volume paths are invalid or can't be mapped, an error is returned.

func (*FileMapper) Cleanup

func (mapper *FileMapper) Cleanup() error

Cleanup deletes the working directory.

func (*FileMapper) ContainerPath

func (mapper *FileMapper) ContainerPath(src string) string

ContainerPath returns an unmapped path.

The mapper's base dir is stripped from the path. For example, if the mapper is configured with a base dir of "/tmp/mapped_files", then mapper.ContainerPath("/tmp/mapped_files/home/ubuntu/myfile") will return "/home/ubuntu/myfile".
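
A minimal sketch of this prefix-stripping behavior, using only the standard library. The function containerPath is a hypothetical stand-in, not the package's code. How the real method treats paths outside the base dir is not documented; returning them unchanged is an assumption made here.

```go
package main

import (
	"fmt"
	"path/filepath"
	"strings"
)

// containerPath is a hypothetical stand-in for FileMapper.ContainerPath.
// It strips base from the front of src. Paths that are not under base are
// returned unchanged (an assumption, not documented behavior).
func containerPath(base, src string) string {
	base = filepath.Clean(base)
	src = filepath.Clean(src)
	sep := string(filepath.Separator)
	if src == base {
		return sep
	}
	if strings.HasPrefix(src, base+sep) {
		return strings.TrimPrefix(src, base)
	}
	return src
}

func main() {
	fmt.Println(containerPath("/tmp/mapped_files", "/tmp/mapped_files/home/ubuntu/myfile"))
}
```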

func (*FileMapper) CreateHostFile

func (mapper *FileMapper) CreateHostFile(src string) (*os.File, error)

CreateHostFile creates a file on the host file system at a mapped path. "src" is an unmapped path. This function will handle mapping the path.

This function calls os.Create.

If the path can't be mapped or the file can't be created, an error is returned.

func (*FileMapper) HostPath

func (mapper *FileMapper) HostPath(src string) (string, error)

HostPath returns a mapped path.

The path is concatenated to the mapper's base dir. For example, if the mapper is configured with a base dir of "/tmp/mapped_files", then mapper.HostPath("/home/ubuntu/myfile") will return "/tmp/mapped_files/home/ubuntu/myfile".

The mapped path is required to be a subpath of the mapper's base directory. For example, mapper.HostPath("../../foo") should fail with an error.
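
The join-then-validate rule can be sketched with path/filepath. This is an illustration under assumptions, not the package's implementation: hostPath is a hypothetical stand-in that folds a subpath check (in the spirit of IsSubpath) into one function, and the error message is made up.

```go
package main

import (
	"fmt"
	"path/filepath"
	"strings"
)

// hostPath is a hypothetical stand-in for FileMapper.HostPath. It joins src
// onto base and rejects any result that escapes base.
func hostPath(base, src string) (string, error) {
	// Join cleans the result, so ".." segments are resolved here.
	mapped := filepath.Join(base, src)
	rel, err := filepath.Rel(base, mapped)
	if err != nil {
		return "", err
	}
	if rel == ".." || strings.HasPrefix(rel, ".."+string(filepath.Separator)) {
		return "", fmt.Errorf("path %q is outside of base dir %q", src, base)
	}
	return mapped, nil
}

func main() {
	p, err := hostPath("/tmp/mapped_files", "/home/ubuntu/myfile")
	fmt.Println(p, err)

	_, err = hostPath("/tmp/mapped_files", "../../foo")
	fmt.Println(err)
}
```

Validating the cleaned result, rather than scanning the input for "..", also catches inputs such as "a/../../b" that only escape the base dir after normalization.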

func (*FileMapper) IsSubpath

func (mapper *FileMapper) IsSubpath(p string, base string) bool

IsSubpath returns true if the given path "p" is a subpath of "base".

func (*FileMapper) MapTask

func (mapper *FileMapper) MapTask(task *tes.Task) error

MapTask adds all the volumes, inputs, and outputs in the given Task to the FileMapper.

func (*FileMapper) OpenHostFile

func (mapper *FileMapper) OpenHostFile(src string) (*os.File, error)

OpenHostFile opens a file on the host file system at a mapped path. "src" is an unmapped path. This function will handle mapping the path.

This function calls os.Open.

If the path can't be mapped or the file can't be opened, an error is returned.

type FileTaskReader

type FileTaskReader struct {
	// contains filtered or unexported fields
}

FileTaskReader provides a TaskReader implementation from a task file.

func NewFileTaskReader

func NewFileTaskReader(path string) (*FileTaskReader, error)

NewFileTaskReader creates a new FileTaskReader.

func (*FileTaskReader) Close

func (f *FileTaskReader) Close()

Close closes the FileTaskReader.

func (*FileTaskReader) State

func (f *FileTaskReader) State(ctx context.Context) (tes.State, error)

State returns the task state. Due to some quirks in the implementation of this reader, and since there is no online database to connect to, this will always return QUEUED.

func (*FileTaskReader) Task

func (f *FileTaskReader) Task(ctx context.Context) (*tes.Task, error)

Task returns the task. A random ID will be generated.

type GenericTaskReader

type GenericTaskReader struct {
	// contains filtered or unexported fields
}

GenericTaskReader provides read access to tasks.

func NewGenericTaskReader

func NewGenericTaskReader(get func(ctx context.Context, in *tes.GetTaskRequest) (*tes.Task, error), taskID string, close func()) *GenericTaskReader

NewGenericTaskReader returns a new generic task reader.

func (*GenericTaskReader) Close

func (r *GenericTaskReader) Close()

func (*GenericTaskReader) State

func (r *GenericTaskReader) State(ctx context.Context) (tes.State, error)

State returns the current state of the task.

func (*GenericTaskReader) Task

func (r *GenericTaskReader) Task(ctx context.Context) (*tes.Task, error)

Task returns the task descriptor.

type RPCTaskReader

type RPCTaskReader struct {
	// contains filtered or unexported fields
}

RPCTaskReader provides read access to tasks from the funnel server over gRPC.

func NewRPCTaskReader

func NewRPCTaskReader(ctx context.Context, conf config.RPCClient, taskID string) (*RPCTaskReader, error)

NewRPCTaskReader returns a new RPC-based task reader.

func (*RPCTaskReader) Close

func (r *RPCTaskReader) Close()

Close closes the connection.

func (*RPCTaskReader) State

func (r *RPCTaskReader) State(ctx context.Context) (tes.State, error)

State returns the current state of the task.

func (*RPCTaskReader) Task

func (r *RPCTaskReader) Task(ctx context.Context) (*tes.Task, error)

Task returns the task descriptor.

type TaskReader

type TaskReader interface {
	Task(ctx context.Context) (*tes.Task, error)
	State(ctx context.Context) (tes.State, error)
	Close()
}

TaskReader is a type which reads task information during task execution.

type Volume

type Volume struct {
	// The path in tes worker.
	HostPath string
	// The path in Docker.
	ContainerPath string
	Readonly      bool
}

Volume represents a volume mounted into a docker container. It holds HostPath (the path on the host file system), ContainerPath (the path on the container file system), and Readonly (whether the volume is mounted read-only).
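
The three fields line up with the host:container[:ro] form accepted by the docker CLI's -v flag. The sketch below shows that correspondence. It declares a local copy of the struct so it stays self-contained, and bindSpec is a hypothetical helper. How DockerCommand actually passes volumes to Docker is not documented here, so this is an assumption about one plausible rendering.

```go
package main

import "fmt"

// Volume is a local copy of the struct documented above.
type Volume struct {
	HostPath      string
	ContainerPath string
	Readonly      bool
}

// bindSpec is a hypothetical helper (not part of package worker). It renders
// a Volume in the host:container[:ro] form used by `docker run -v`.
func bindSpec(v Volume) string {
	spec := v.HostPath + ":" + v.ContainerPath
	if v.Readonly {
		spec += ":ro"
	}
	return spec
}

func main() {
	v := Volume{
		HostPath:      "/tmp/mapped_files/inputs",
		ContainerPath: "/inputs",
		Readonly:      true,
	}
	fmt.Println(bindSpec(v))
}
```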
