workceptor

package
v0.0.0-...-b2ae87a Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 7, 2023 License: Apache-2.0 Imports: 47 Imported by: 0

Documentation

Index

Constants

View Source
const (
	SuccessWorkSleep = 1 * time.Second // Normal time to wait between checks
	MaxWorkSleep     = 1 * time.Minute // Max time to ever wait between checks
)

Work sleep constants.

View Source
const (
	WorkStatePending   = 0
	WorkStateRunning   = 1
	WorkStateSucceeded = 2
	WorkStateFailed    = 3
	WorkStateCanceled  = 4
)

Work state constants.

Variables

View Source
var ErrImagePullBackOff = fmt.Errorf("container failed to start")

ErrImagePullBackOff is returned when the image for the container in the Pod cannot be pulled.

View Source
var ErrPending = fmt.Errorf("operation pending")

ErrPending is returned when an operation hasn't succeeded or failed yet.

View Source
var ErrPodCompleted = fmt.Errorf("pod ran to completion")

ErrPodCompleted is returned when pod has already completed before we could attach.

View Source
var ErrPodFailed = fmt.Errorf("pod failed to start")

ErrPodFailed is returned when pod has failed before we could attach.

Functions

func IsComplete

func IsComplete(workState int) bool

IsComplete returns true if a given WorkState indicates the job is finished.

func IsPending

func IsPending(err error) bool

IsPending returns true if the error is an ErrPending.

func WorkStateToString

func WorkStateToString(workState int) string

WorkStateToString returns a string representation of a WorkState.

Types

type BaseWorkUnit

type BaseWorkUnit struct {
	// contains filtered or unexported fields
}

BaseWorkUnit includes data common to all work units, and partially implements the WorkUnit interface.

func (*BaseWorkUnit) CancelContext

func (bwu *BaseWorkUnit) CancelContext()

func (*BaseWorkUnit) Debug

func (bwu *BaseWorkUnit) Debug(format string, v ...interface{})

Debug logs message with unitID prepended.

func (*BaseWorkUnit) Error

func (bwu *BaseWorkUnit) Error(format string, v ...interface{})

Error logs message with unitID prepended.

func (*BaseWorkUnit) ID

func (bwu *BaseWorkUnit) ID() string

ID returns the unique identifier of this work unit.

func (*BaseWorkUnit) Info

func (bwu *BaseWorkUnit) Info(format string, v ...interface{})

Info logs message with unitID prepended.

func (*BaseWorkUnit) Init

func (bwu *BaseWorkUnit) Init(w *Workceptor, unitID string, workType string, fs FileSystemer, watcher WatcherWrapper)

Init initializes the basic work unit data, in memory only.

func (*BaseWorkUnit) LastUpdateError

func (bwu *BaseWorkUnit) LastUpdateError() error

LastUpdateError returns the last error (including nil) resulting from an UpdateBasicStatus or UpdateFullStatus.

func (*BaseWorkUnit) Load

func (bwu *BaseWorkUnit) Load() error

Load loads status from a file.

func (*BaseWorkUnit) MonitorLocalStatus

func (bwu *BaseWorkUnit) MonitorLocalStatus()

MonitorLocalStatus watches a unit dir and keeps the in-memory workUnit up to date with status changes.

func (*BaseWorkUnit) Release

func (bwu *BaseWorkUnit) Release(force bool) error

Release releases this unit of work, deleting its files.

func (*BaseWorkUnit) Save

func (bwu *BaseWorkUnit) Save() error

Save saves status to a file.

func (*BaseWorkUnit) SetFromParams

func (bwu *BaseWorkUnit) SetFromParams(_ map[string]string) error

SetFromParams sets the in-memory state from parameters.

func (*BaseWorkUnit) Status

func (bwu *BaseWorkUnit) Status() *StatusFileData

Status returns a copy of the status currently loaded in memory (use Load to get it from disk).

func (*BaseWorkUnit) StatusFileName

func (bwu *BaseWorkUnit) StatusFileName() string

StatusFileName returns the full path to the status file in the unit dir.

func (*BaseWorkUnit) StdoutFileName

func (bwu *BaseWorkUnit) StdoutFileName() string

StdoutFileName returns the full path to the stdout file in the unit dir.

func (*BaseWorkUnit) UnitDir

func (bwu *BaseWorkUnit) UnitDir() string

UnitDir returns the unit directory of this work unit.

func (*BaseWorkUnit) UnredactedStatus

func (bwu *BaseWorkUnit) UnredactedStatus() *StatusFileData

UnredactedStatus returns a copy of the status currently loaded in memory, including secrets.

func (*BaseWorkUnit) UpdateBasicStatus

func (bwu *BaseWorkUnit) UpdateBasicStatus(state int, detail string, stdoutSize int64)

UpdateBasicStatus atomically updates key fields in the status metadata file. Errors are logged rather than returned. Passing -1 as stdoutSize leaves it unchanged.

func (*BaseWorkUnit) UpdateFullStatus

func (bwu *BaseWorkUnit) UpdateFullStatus(statusFunc func(*StatusFileData))

UpdateFullStatus atomically updates the whole status record. Changes should be made in the callback function. Errors are logged rather than returned.

func (*BaseWorkUnit) Warning

func (bwu *BaseWorkUnit) Warning(format string, v ...interface{})

Warning logs message with unitID prepended.

type CommandWorkerCfg

type CommandWorkerCfg struct {
	WorkType           string `required:"true" description:"Name for this worker type"`
	Command            string `required:"true" description:"Command to run to process units of work"`
	Params             string `description:"Command-line parameters"`
	AllowRuntimeParams bool   `description:"Allow users to add more parameters" default:"false"`
	VerifySignature    bool   `description:"Verify a signed work submission" default:"false"`
}

CommandWorkerCfg is the cmdline configuration object for a worker that runs a command.

func (CommandWorkerCfg) GetVerifySignature

func (cfg CommandWorkerCfg) GetVerifySignature() bool

func (CommandWorkerCfg) GetWorkType

func (cfg CommandWorkerCfg) GetWorkType() string

func (CommandWorkerCfg) NewWorker

func (cfg CommandWorkerCfg) NewWorker(w *Workceptor, unitID string, workType string) WorkUnit

func (CommandWorkerCfg) Run

func (cfg CommandWorkerCfg) Run() error

Run runs the action.

type FileReadCloser

type FileReadCloser interface {
	io.ReadCloser
}

FileReadCloser wraps io.ReadCloser.

type FileSystem

type FileSystem struct{}

FileSystem represents the real filesystem.

func (FileSystem) Open

func (FileSystem) Open(name string) (*os.File, error)

Open opens a file.

func (FileSystem) OpenFile

func (FileSystem) OpenFile(name string, flag int, perm os.FileMode) (*os.File, error)

OpenFile opens a file on the filesystem.

func (FileSystem) RemoveAll

func (FileSystem) RemoveAll(path string) error

RemoveAll removes path and any children it contains.

func (FileSystem) Stat

func (FileSystem) Stat(name string) (os.FileInfo, error)

Stat retrieves the FileInfo for a given file name.

type FileSystemer

type FileSystemer interface {
	OpenFile(name string, flag int, perm os.FileMode) (*os.File, error)
	Stat(name string) (os.FileInfo, error)
	Open(name string) (*os.File, error)
	RemoveAll(path string) error
}

FileSystemer represents a filesystem.

type FileWriteCloser

type FileWriteCloser interface {
	io.WriteCloser
}

FileWriteCloser wraps io.WriteCloser.

type KubeWorkerCfg

type KubeWorkerCfg struct {
	WorkType            string `required:"true" description:"Name for this worker type"`
	Namespace           string `description:"Kubernetes namespace to create pods in"`
	Image               string `description:"Container image to use for the worker pod"`
	Command             string `description:"Command to run in the container (overrides entrypoint)"`
	Params              string `description:"Command-line parameters to pass to the entrypoint"`
	AuthMethod          string `description:"One of: kubeconfig, incluster" default:"incluster"`
	KubeConfig          string `description:"Kubeconfig filename (for authmethod=kubeconfig)"`
	Pod                 string `description:"Pod definition filename, in json or yaml format"`
	AllowRuntimeAuth    bool   `description:"Allow passing API parameters at runtime" default:"false"`
	AllowRuntimeCommand bool   `description:"Allow specifying image & command at runtime" default:"false"`
	AllowRuntimeParams  bool   `description:"Allow adding command parameters at runtime" default:"false"`
	AllowRuntimePod     bool   `description:"Allow passing Pod at runtime" default:"false"`
	DeletePodOnRestart  bool   `description:"On restart, delete the pod if in pending state" default:"true"`
	StreamMethod        string `description:"Method for connecting to worker pods: logger or tcp" default:"logger"`
	VerifySignature     bool   `description:"Verify a signed work submission" default:"false"`
}

KubeWorkerCfg is the cmdline configuration object for a Kubernetes worker plugin.

func (KubeWorkerCfg) GetVerifySignature

func (cfg KubeWorkerCfg) GetVerifySignature() bool

func (KubeWorkerCfg) GetWorkType

func (cfg KubeWorkerCfg) GetWorkType() string

func (KubeWorkerCfg) NewWorker

func (cfg KubeWorkerCfg) NewWorker(w *Workceptor, unitID string, workType string) WorkUnit

NewWorker is a factory to produce worker instances.

func (KubeWorkerCfg) Prepare

func (cfg KubeWorkerCfg) Prepare() error

Prepare inspects the configuration for validity.

func (KubeWorkerCfg) Run

func (cfg KubeWorkerCfg) Run() error

Run runs the action.

type NetceptorForWorkceptor

type NetceptorForWorkceptor interface {
	NodeID() string
	AddWorkCommand(typeName string, verifySignature bool) error
	GetClientTLSConfig(name string, expectedHostName string, expectedHostNameType netceptor.ExpectedHostnameType) (*tls.Config, error) // have a common pkg for types
	GetLogger() *logger.ReceptorLogger
	DialContext(ctx context.Context, node string, service string, tlscfg *tls.Config) (*netceptor.Conn, error) // create an interface for Conn
}

NetceptorForWorkceptor is a interface to decouple workceptor from netceptor. it includes only the functions that workceptor uses.

type NewWorkerFunc

type NewWorkerFunc func(w *Workceptor, unitID string, workType string) WorkUnit

NewWorkerFunc represents a factory of WorkUnit instances.

type RealWatcher

type RealWatcher struct {
	// contains filtered or unexported fields
}

func (*RealWatcher) Add

func (rw *RealWatcher) Add(name string) error

func (*RealWatcher) Close

func (rw *RealWatcher) Close() error

func (*RealWatcher) EventChannel

func (rw *RealWatcher) EventChannel() chan fsnotify.Event

type STDinReader

type STDinReader struct {
	// contains filtered or unexported fields
}

STDinReader reads from a stdin file and provides a Done function.

func NewStdinReader

func NewStdinReader(fs FileSystemer, unitdir string) (*STDinReader, error)

NewStdinReader allocates a new stdinReader, which reads from a stdin file and provides a Done function.

func (*STDinReader) Done

func (sr *STDinReader) Done() <-chan struct{}

Done returns a channel that will be closed on error (including EOF) in the reader.

func (*STDinReader) Error

func (sr *STDinReader) Error() error

Error returns the most recent error encountered in the reader.

func (*STDinReader) Read

func (sr *STDinReader) Read(p []byte) (n int, err error)

Read reads data from the stdout file, implementing io.Reader.

func (*STDinReader) SetReader

func (sr *STDinReader) SetReader(reader FileReadCloser)

SetReader sets the reader var.

type STDoutWriter

type STDoutWriter struct {
	// contains filtered or unexported fields
}

STDoutWriter writes to a stdout file while also updating the status file.

func NewStdoutWriter

func NewStdoutWriter(fs FileSystemer, unitdir string) (*STDoutWriter, error)

NewStdoutWriter allocates a new stdoutWriter, which writes to both the stdout and status files.

func (*STDoutWriter) SetWriter

func (sw *STDoutWriter) SetWriter(writer FileWriteCloser)

SetWriter sets the writer var.

func (*STDoutWriter) Size

func (sw *STDoutWriter) Size() int64

Size returns the current size of the stdout file.

func (*STDoutWriter) Write

func (sw *STDoutWriter) Write(p []byte) (n int, err error)

Write writes data to the stdout file and status file, implementing io.Writer.

type SigningKeyPrivateCfg

type SigningKeyPrivateCfg struct {
	PrivateKey      string `description:"Private key to sign work submissions" barevalue:"yes" default:""`
	TokenExpiration string `description:"Expiration of the signed json web token, e.g. 3h or 3h30m" default:""`
}

func (SigningKeyPrivateCfg) Prepare

func (cfg SigningKeyPrivateCfg) Prepare() error

func (SigningKeyPrivateCfg) PrepareSigningKeyPrivateCfg

func (cfg SigningKeyPrivateCfg) PrepareSigningKeyPrivateCfg() (*time.Duration, error)

type StatusFileData

type StatusFileData struct {
	State      int
	Detail     string
	StdoutSize int64
	WorkType   string
	ExtraData  interface{}
}

StatusFileData is the structure of the JSON data saved to a status file. This struct should only contain value types, except for ExtraData.

func (*StatusFileData) Load

func (sfd *StatusFileData) Load(filename string) error

Load loads status from a file.

func (*StatusFileData) Save

func (sfd *StatusFileData) Save(filename string) error

Save saves status to a file.

func (*StatusFileData) UpdateBasicStatus

func (sfd *StatusFileData) UpdateBasicStatus(filename string, state int, detail string, stdoutSize int64) error

UpdateBasicStatus atomically updates key fields in the status metadata file. Errors are logged rather than returned. Passing -1 as stdoutSize leaves it unchanged.

func (*StatusFileData) UpdateFullStatus

func (sfd *StatusFileData) UpdateFullStatus(filename string, statusFunc func(*StatusFileData)) error

UpdateFullStatus atomically updates the status metadata file. Changes should be made in the callback function. Errors are logged rather than returned.

type VerifyingKeyPublicCfg

type VerifyingKeyPublicCfg struct {
	PublicKey string `description:"Public key to verify signed work submissions" barevalue:"yes" default:""`
}

func (VerifyingKeyPublicCfg) Prepare

func (cfg VerifyingKeyPublicCfg) Prepare() error

func (VerifyingKeyPublicCfg) PrepareVerifyingKeyPublicCfg

func (cfg VerifyingKeyPublicCfg) PrepareVerifyingKeyPublicCfg() error

type WatcherWrapper

type WatcherWrapper interface {
	Add(name string) error
	Close() error
	EventChannel() chan fsnotify.Event
}

WatcherWrapper is wrapping the fsnofity Watcher struct and exposing the Event chan within.

type WorkUnit

type WorkUnit interface {
	ID() string
	UnitDir() string
	StatusFileName() string
	StdoutFileName() string
	Save() error
	Load() error
	SetFromParams(params map[string]string) error
	UpdateBasicStatus(state int, detail string, stdoutSize int64)
	UpdateFullStatus(statusFunc func(*StatusFileData))
	LastUpdateError() error
	Status() *StatusFileData
	UnredactedStatus() *StatusFileData
	Start() error
	Restart() error
	Cancel() error
	Release(force bool) error
}

WorkUnit represents a local unit of work.

type Workceptor

type Workceptor struct {
	Cancel context.CancelFunc

	SigningKey        string
	SigningExpiration time.Duration
	VerifyingKey      string
	// contains filtered or unexported fields
}

Workceptor is the main object that handles unit-of-work management.

var MainInstance *Workceptor

MainInstance is the global instance of Workceptor instantiated by the command-line main() function.

func New

func New(ctx context.Context, nc NetceptorForWorkceptor, dataDir string) (*Workceptor, error)

New constructs a new Workceptor instance.

func (*Workceptor) AllocateRemoteUnit

func (w *Workceptor) AllocateRemoteUnit(remoteNode, remoteWorkType, tlsClient, ttl string, signWork bool, params map[string]string) (WorkUnit, error)

AllocateRemoteUnit creates a new remote work unit and generates a local identifier for it.

func (*Workceptor) AllocateUnit

func (w *Workceptor) AllocateUnit(workTypeName string, params map[string]string) (WorkUnit, error)

AllocateUnit creates a new local work unit and generates an identifier for it.

func (*Workceptor) CancelUnit

func (w *Workceptor) CancelUnit(unitID string) error

CancelUnit cancels a unit of work, killing any processes.

func (*Workceptor) GetResults

func (w *Workceptor) GetResults(ctx context.Context, unitID string, startPos int64) (chan []byte, error)

GetResults returns a live stream of the results of a unit.

func (*Workceptor) ListKnownUnitIDs

func (w *Workceptor) ListKnownUnitIDs() []string

ListKnownUnitIDs returns a slice containing the known unit IDs.

func (*Workceptor) RegisterWithControlService

func (w *Workceptor) RegisterWithControlService(cs *controlsvc.Server) error

RegisterWithControlService registers this workceptor instance with a control service instance.

func (*Workceptor) RegisterWorker

func (w *Workceptor) RegisterWorker(typeName string, newWorkerFunc NewWorkerFunc, verifySignature bool) error

RegisterWorker notifies the Workceptor of a new kind of work that can be done.

func (*Workceptor) ReleaseUnit

func (w *Workceptor) ReleaseUnit(unitID string, force bool) error

ReleaseUnit releases (deletes) resources from a unit of work, including stdout. Release implies Cancel.

func (*Workceptor) ShouldVerifySignature

func (w *Workceptor) ShouldVerifySignature(workType string, signWork bool) bool

func (*Workceptor) StartUnit

func (w *Workceptor) StartUnit(unitID string) error

StartUnit starts a unit of work.

func (*Workceptor) UnitStatus

func (w *Workceptor) UnitStatus(unitID string) (*StatusFileData, error)

UnitStatus returns the state of a unit.

func (*Workceptor) VerifySignature

func (w *Workceptor) VerifySignature(signature string) error

type WorkerConfig

type WorkerConfig interface {
	GetWorkType() string
	GetVerifySignature() bool
	NewWorker(w *Workceptor, unitID string, workType string) WorkUnit
}

Directories

Path Synopsis
Package mock_workceptor is a generated GoMock package.
Package mock_workceptor is a generated GoMock package.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL