Documentation ¶
Overview ¶
Package worker contains code which executes a task.
Index ¶
- func DownloadInputs(pctx context.Context, inputs []*tes.Input, store storage.Storage, ...) error
- func FlattenInputs(ctx context.Context, inputs []*tes.Input, store storage.Storage, ...) ([]*tes.Input, error)
- func FlattenOutputs(ctx context.Context, outputs []*tes.Output, store storage.Storage, ...) ([]*tes.Output, error)
- func SyncDockerAPIVersion() error
- func UploadOutputs(ctx context.Context, outputs []*tes.Output, store storage.Storage, ...) ([]*tes.OutputFileLog, error)
- type Base64TaskReader
- type DefaultWorker
- type DockerCommand
- type FileMapper
- func (mapper *FileMapper) AddInput(input *tes.Input) error
- func (mapper *FileMapper) AddOutput(output *tes.Output) error
- func (mapper *FileMapper) AddTmpVolume(mountPoint string) error
- func (mapper *FileMapper) AddVolume(hostPath string, mountPoint string, readonly bool) error
- func (mapper *FileMapper) Cleanup() error
- func (mapper *FileMapper) ContainerPath(src string) string
- func (mapper *FileMapper) CreateHostFile(src string) (*os.File, error)
- func (mapper *FileMapper) HostPath(src string) (string, error)
- func (mapper *FileMapper) IsSubpath(p string, base string) bool
- func (mapper *FileMapper) MapTask(task *tes.Task) error
- func (mapper *FileMapper) OpenHostFile(src string) (*os.File, error)
- type FileTaskReader
- type GenericTaskReader
- type RPCTaskReader
- type TaskReader
- type Volume
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func DownloadInputs ¶
func DownloadInputs(pctx context.Context, inputs []*tes.Input, store storage.Storage, ev *events.TaskWriter, parallelLimit int) error
DownloadInputs downloads the given inputs.
func FlattenInputs ¶
func FlattenInputs(ctx context.Context, inputs []*tes.Input, store storage.Storage, ev *events.TaskWriter) ([]*tes.Input, error)
FlattenInputs flattens any directory inputs into a list of file inputs. A warning event will be generated if an input directory is empty.
func FlattenOutputs ¶
func FlattenOutputs(ctx context.Context, outputs []*tes.Output, store storage.Storage, ev *events.TaskWriter) ([]*tes.Output, error)
FlattenOutputs flattens output directories into a list of files. A warning event will be generated if an output directory is empty.
func SyncDockerAPIVersion ¶
func SyncDockerAPIVersion() error
SyncDockerAPIVersion ensures that the client uses the same API version as the server.
func UploadOutputs ¶
func UploadOutputs(ctx context.Context, outputs []*tes.Output, store storage.Storage, ev *events.TaskWriter, parallelLimit int) ([]*tes.OutputFileLog, error)
UploadOutputs uploads the outputs.
Types ¶
type Base64TaskReader ¶
type Base64TaskReader struct {
// contains filtered or unexported fields
}
Base64TaskReader reads a task from a base64 encoded string.
func NewBase64TaskReader ¶
func NewBase64TaskReader(raw string) (*Base64TaskReader, error)
NewBase64TaskReader creates a new Base64TaskReader.
type DefaultWorker ¶
type DefaultWorker struct { Conf config.Worker Store storage.Storage TaskReader TaskReader EventWriter events.Writer }
DefaultWorker is the default task worker, which follows a basic, sequential process of task initialization, execution, finalization, and logging.
func (*DefaultWorker) Close ¶
func (r *DefaultWorker) Close()
type DockerCommand ¶
type DockerCommand struct { Image string Command []string Volumes []Volume Workdir string ContainerName string RemoveContainer bool Env map[string]string Stdin io.Reader Stdout io.Writer Stderr io.Writer Event *events.ExecutorWriter }
DockerCommand is responsible for configuring and running a docker container.
type FileMapper ¶
type FileMapper struct { Volumes []Volume Inputs []*tes.Input Outputs []*tes.Output WorkDir string }
FileMapper is responsible for mapping paths into a working directory on the worker's host file system.
Every task needs it's own directory to work in. When a file is downloaded for a task, it needs to be stored in the task's working directory. Similar for task outputs, uploads, stdin/out/err, etc. FileMapper helps the worker engine manage all these paths.
func NewFileMapper ¶
func NewFileMapper(dir string) *FileMapper
NewFileMapper returns a new FileMapper, which maps files into the given base directory.
func (*FileMapper) AddInput ¶
func (mapper *FileMapper) AddInput(input *tes.Input) error
AddInput adds an input to the mapped files for the given tes.Input. A copy of the tes.Input will be added to mapper.Inputs, with the "Path" field updated to the mapped host path.
If the path can't be mapped an error is returned.
func (*FileMapper) AddOutput ¶
func (mapper *FileMapper) AddOutput(output *tes.Output) error
AddOutput adds an output to the mapped files for the given tes.Output. A copy of the tes.Output will be added to mapper.Outputs, with the "Path" field updated to the mapped host path.
If the path can't be mapped, an error is returned.
func (*FileMapper) AddTmpVolume ¶
func (mapper *FileMapper) AddTmpVolume(mountPoint string) error
AddTmpVolume creates a directory on the host based on the declared path in the container and adds it to mapper.Volumes.
If the path can't be mapped, an error is returned.
func (*FileMapper) AddVolume ¶
func (mapper *FileMapper) AddVolume(hostPath string, mountPoint string, readonly bool) error
AddVolume adds a mapped volume to the mapper. A corresponding Volume record is added to mapper.Volumes.
If the volume paths are invalid or can't be mapped, an error is returned.
func (*FileMapper) Cleanup ¶
func (mapper *FileMapper) Cleanup() error
Cleanup deletes the working directory.
func (*FileMapper) ContainerPath ¶
func (mapper *FileMapper) ContainerPath(src string) string
ContainerPath returns an unmapped path.
The mapper's base dir is stripped from the path. e.g. If the mapper is configured with a base dir of "/tmp/mapped_files", then mapper.ContainerPath("/tmp/mapped_files/home/ubuntu/myfile") will return "/home/ubuntu/myfile".
func (*FileMapper) CreateHostFile ¶
func (mapper *FileMapper) CreateHostFile(src string) (*os.File, error)
CreateHostFile creates a file on the host file system at a mapped path. "src" is an unmapped path. This function will handle mapping the path.
This function calls os.Create ¶
If the path can't be mapped or the file can't be created, an error is returned.
func (*FileMapper) HostPath ¶
func (mapper *FileMapper) HostPath(src string) (string, error)
HostPath returns a mapped path.
The path is concatenated to the mapper's base dir. e.g. If the mapper is configured with a base dir of "/tmp/mapped_files", then mapper.HostPath("/home/ubuntu/myfile") will return "/tmp/mapped_files/home/ubuntu/myfile".
The mapped path is required to be a subpath of the mapper's base directory. e.g. mapper.HostPath("../../foo") should fail with an error.
func (*FileMapper) IsSubpath ¶
func (mapper *FileMapper) IsSubpath(p string, base string) bool
IsSubpath returns true if the given path "p" is a subpath of "base".
func (*FileMapper) MapTask ¶
func (mapper *FileMapper) MapTask(task *tes.Task) error
MapTask adds all the volumes, inputs, and outputs in the given Task to the FileMapper.
func (*FileMapper) OpenHostFile ¶
func (mapper *FileMapper) OpenHostFile(src string) (*os.File, error)
OpenHostFile opens a file on the host file system at a mapped path. "src" is an unmapped path. This function will handle mapping the path.
This function calls os.Open ¶
If the path can't be mapped or the file can't be opened, an error is returned.
type FileTaskReader ¶
type FileTaskReader struct {
// contains filtered or unexported fields
}
FileTaskReader provides a TaskReader implementation from a task file.
func NewFileTaskReader ¶
func NewFileTaskReader(path string) (*FileTaskReader, error)
NewFileTaskReader creates a new FileTaskReader.
type GenericTaskReader ¶
type GenericTaskReader struct {
// contains filtered or unexported fields
}
GenericTaskReader provides read access to tasks.
func NewGenericTaskReader ¶
func NewGenericTaskReader(get func(ctx context.Context, in *tes.GetTaskRequest) (*tes.Task, error), taskID string, close func()) *GenericTaskReader
NewGenericTaskReader returns a new generic task reader.
func (*GenericTaskReader) Close ¶
func (r *GenericTaskReader) Close()
type RPCTaskReader ¶
type RPCTaskReader struct {
// contains filtered or unexported fields
}
RPCTaskReader provides read access to tasks from the funnel server over gRPC.
func NewRPCTaskReader ¶
func NewRPCTaskReader(ctx context.Context, conf config.RPCClient, taskID string) (*RPCTaskReader, error)
NewRPCTaskReader returns a new RPC-based task reader.
type TaskReader ¶
type TaskReader interface { Task(ctx context.Context) (*tes.Task, error) State(ctx context.Context) (tes.State, error) Close() }
TaskReader is a type which reads task information during task execution.
type Volume ¶
type Volume struct { // The path in tes worker. HostPath string // The path in Docker. ContainerPath string Readonly bool }
Volume represents a volume mounted into a docker container. This includes a HostPath, the path on the host file system, and a ContainerPath, the path on the container file system, and whether the volume is read-only.