gcpbatchtracker

package module v0.2.3
Published: Sep 21, 2023 License: BSD-2-Clause Imports: 26 Imported by: 6

DRMAA2 JobTracker implementation for Google Batch

Experimental Google Batch support for DRMAA2os.

How gcpbatchtracker Works

This project is designed to be embedded as a backend in https://github.com/dgruber/drmaa2os

What gcpbatchtracker is

It is a basic DRMAA2 implementation for Google Batch, written in Go. The DRMAA2 JobTemplate is used for submitting Google Batch jobs, and the DRMAA2 JobInfo struct for retrieving job status. The Google Batch job state model is mapped to the DRMAA2 specification.

How to use it

See the examples directory, which uses the interface directly.

Converting a DRMAA2 Job Template to a Google Batch Job

| DRMAA2 JobTemplate | Google Batch Job |
| --- | --- |
| RemoteCommand | Command to execute in the container, or a script / script path |
| Args | For containers, the arguments of the command (if RemoteCommand is empty, the arguments of the entrypoint) |
| CandidateMachines[0] | Machine type; when prefixed with "template:", an instance template with that name is used |
| JobCategory | Container image, or $script$ / $scriptpath$ for other runnables, which interpret RemoteCommand as a script or script path |
| JobName | JobID |
| AccountingID | Sets a tag "accounting" |
| MinSlots | Specifies the parallelism (how many tasks run in parallel) |
| MaxSlots | Specifies the number of tasks to run. For MPI set MinSlots = MaxSlots. |
| MinPhysMemory | MB of memory to request; set this to grow from the default to the full machine size |
| ResourceLimits | Keys can be "cpumilli", "bootdiskmib", "runtime" -> runtime limit like "30m" for 30 minutes |

Override the "cpumilli" resource limit to get the full amount of resources when running just one task per machine (like 8000 for 8 cores)!

For StageInFiles and StageOutFiles see below.

When a container is used, the following paths are always mounted from the host:

    "/etc/cloudbatch-taskgroup-hosts:/etc/cloudbatch-taskgroup-hosts",
    "/etc/ssh:/etc/ssh",
    "/root/.ssh:/root/.ssh",

For a container the following runtime options are set:

  • "--network=host"

The default output destination is Cloud Logging. If "OutputPath" is set, the logs policy is changed to LogsPolicy_PATH with OutputPath as the destination.

JobTemplate Extensions
| DRMAA2 JobTemplate Extension Key | DRMAA2 JobTemplate Extension Value |
| --- | --- |
| ExtensionProlog / "prolog" | Prolog script executed at machine level before the job starts |
| ExtensionEpilog / "epilog" | Epilog script executed at machine level after the job ends successfully |
| ExtensionSpot / "spot" | "true"/"t"/... when the machine should be a spot instance |
| ExtensionAccelerators / "accelerators" | "Amount*Accelerator name" for the machine (like "1*nvidia-tesla-v100") |
| ExtensionTasksPerNode / "tasks_per_node" | Number of tasks per node |
| ExtensionDockerOptions / "docker_options" | Overrides docker run options when a container image is used |
| ExtensionGoogleSecretEnv / "secret_env" | Populates environment variables from Google Secret Manager. Please use SetSecretEnvironmentVariables() |

JobInfo Fields

| DRMAA2 JobInfo | Batch Job |
| --- | --- |
| Slots | Parallelism |

Job Control Mapping

There is currently no known way to hold, suspend, or release a job. Terminating a job deletes it.

Job State Mapping

| DRMAA2 State | Batch Job State |
| --- | --- |
| Done | JobStatus_SUCCEEDED |
| Failed | JobStatus_FAILED |
| Suspended | - |
| Running | JobStatus_RUNNING, JobStatus_DELETION_IN_PROGRESS |
| Queued | JobStatus_QUEUED, JobStatus_SCHEDULED |
| Undetermined | JobStatus_STATE_UNSPECIFIED |

File staging using the Job Template

NFS (Google Filestore) and GCS are supported.

For NFS in containers, files can be specified in addition to directories. In the file case, the containing directory is mounted on the host, and from there the file is mounted into the container at the path given by the key. In the directory case, a leading "/" is required.

    StageInFiles: map[string]string{
            "/etc/script.sh": "nfs:10.20.30.40:/filestore/user/dir/script.sh",
            "/mnt/dir": "nfs:10.20.30.40:/filestore/user/dir/",
            "/somedir": "gs://benchmarkfiles", // mount a bucket into container or host
        },

Before the job is submitted, StageOutFiles creates the target bucket if it does not exist. If that fails, the job submission call fails. Currently only gs:// destinations are evaluated in the StageOutFiles map.

    StageOutFiles: map[string]string{
            "/tmp/joboutput": "gs://outputbucket",
        },

Examples

See the examples directory.

Documentation

Index

Constants

const (
	// ExtensionJobInfoJobTemplate is the job template stored in the job info
	// extension list as base64 encoded string
	ExtensionJobInfoJobTemplate = "jobtemplate_base64"
	// ExtensionJobInfoJobUID is the Google Batch internal job UID
	ExtensionJobInfoJobUID = "uid"
)
const (

	// job categories (otherwise it is a container image)
	JobCategoryScriptPath = "$scriptpath$" // treats RemoteCommand as path to script and ignores args
	JobCategoryScript     = "$script$"     // treats RemoteCommand as script and ignores args
	// Env variable name container job template
	EnvJobTemplate = "DRMAA2_JOB_TEMPLATE"
)
const (
	// ResourceLimitRuntime is the key for the ResourceLimits
	// map which defines the maximum runtime of a job. The
	// value is a string which can be parsed by time.ParseDuration.
	// Like "1m30s" for 1 minute 30 seconds.
	ResourceLimitRuntime = "runtime"
	// ResourceLimitBootDisk is the key for the ResourceLimits
	// map which defines the boot disk size of a job. The
	// value is a string which can be parsed by strconv.ParseInt
	// with base 10. The unit is MiB.
	ResourceLimitBootDisk = "bootdiskmib"
	// ResourceLimitCPUMilli is the key for the ResourceLimits
	// map which defines the CPU milli of a job. The
	// value is a string which can be parsed by strconv.ParseInt
	// with base 10. The unit is milli cores. Like 8000 for 8 cores.
	ResourceLimitCPUMilli = "cpumilli"
)
const (
	ExtensionProlog          = "prolog"
	ExtensionEpilog          = "epilog"
	ExtensionSpot            = "spot"
	ExtensionAccelerators    = "accelerators"
	ExtensionTasksPerNode    = "tasks_per_node"
	ExtensionDockerOptions   = "docker_options"
	ExtensionGoogleSecretEnv = "secret_env"
)
const (
	BatchTaskLogs = "batch_task_logs"
)

Variables

This section is empty.

Functions

func BatchJobToJobInfo

func BatchJobToJobInfo(project string, job *batchpb.Job) (drmaa2interface.JobInfo, error)

func ConvertJobState

func ConvertJobState(job *batchpb.Job) (drmaa2interface.JobState, string, error)

func ConvertJobTemplateToJobRequest

func ConvertJobTemplateToJobRequest(session, project, location string, jt drmaa2interface.JobTemplate) (*batchpb.CreateJobRequest, error)

func CopyFileFromBucket added in v0.1.2

func CopyFileFromBucket(bucket string, file string, localFile string) error

CopyFileFromBucket reads the content of an object from a bucket and writes it to a local file. It expects the bucket name to be prefixed with gs:// and not contain any other slashes.

func CopyFileToBucket added in v0.1.2

func CopyFileToBucket(bucket string, file string, localFile string) error

CopyFileToBucket writes the content of a local file to a bucket. It expects the bucket name to be prefixed with gs:// and not contain any other slashes.

func CreateMissingStageOutBuckets

func CreateMissingStageOutBuckets(project string, stageOutFiles map[string]string) error

func CreateRunnables

func CreateRunnables(barriers bool, prolog string) []*batchpb.Runnable

func DefaultCPUMilli added in v0.2.3

func DefaultCPUMilli(machine string) int64

DefaultCPUMilli returns the CPU resource limit in milli cores which fits to the given machine type.

func DeleteBucket added in v0.1.2

func DeleteBucket(bucket string) error

DeleteBucket deletes a bucket. It expects the bucket name prefixed with gs://. The bucket must be empty to be deleted.

func DeleteFileInBucket added in v0.1.2

func DeleteFileInBucket(bucket, file string) error

DeleteFileInBucket deletes a file in a bucket. It expects the bucket name prefixed with gs://. The file is the name of the file in the bucket (could be like testpath/testfile.txt).

func GetAcceleratorsExtension

func GetAcceleratorsExtension(jt drmaa2interface.JobTemplate) (string, int64, bool)

func GetDockerOptionsExtension

func GetDockerOptionsExtension(jt drmaa2interface.JobTemplate) (string, bool)

func GetJobOutput added in v0.2.3

func GetJobOutput(projectID, jobUid string, limit int64) ([]string, error)

func GetJobTemplateExtensionFromJobInfo added in v0.2.3

func GetJobTemplateExtensionFromJobInfo(ji drmaa2interface.JobInfo) (drmaa2interface.JobTemplate, bool)

GetJobTemplateExtensionFromJobInfo returns the job template which is stored in the job info extension list. If the job info does not contain a job template extension it returns false.

func GetJobTemplateFromBase64 added in v0.1.1

func GetJobTemplateFromBase64(base64encondedJT string) (drmaa2interface.JobTemplate, error)

func GetMachineEpilogExtension

func GetMachineEpilogExtension(jt drmaa2interface.JobTemplate) (string, bool)

func GetMachinePrologExtension

func GetMachinePrologExtension(jt drmaa2interface.JobTemplate) (string, bool)

func GetSecretEnvironmentVariables added in v0.2.1

func GetSecretEnvironmentVariables(jt drmaa2interface.JobTemplate) (map[string]string, bool)

func GetSpotExtension

func GetSpotExtension(jt drmaa2interface.JobTemplate) (bool, bool)

func GetTasksPerNodeExtension

func GetTasksPerNodeExtension(jt drmaa2interface.JobTemplate) (int64, bool)

func GetUIDExtensionFromJobInfo added in v0.2.3

func GetUIDExtensionFromJobInfo(ji drmaa2interface.JobInfo) (string, bool)

GetUIDExtensionFromJobInfo returns the Google Batch Job UID which is stored in the job info extension list. If the job info does not contain a UID extension it returns false.

func IsInDRMAA2Session

func IsInDRMAA2Session(client *batch.Client, session string, jobID string) bool

func IsInJobSession

func IsInJobSession(session string, job *batchpb.Job) bool

func JobTemplateToEnv added in v0.1.1

func JobTemplateToEnv(jt drmaa2interface.JobTemplate) (string, error)

func MountBucket

func MountBucket(jobRequest *batchpb.CreateJobRequest, execPosition int, destination, source string) *batchpb.CreateJobRequest

MountBucket mounts a bucket into the job request. The source is the bucket name prefixed with gs:// and the destination is the mount path inside the host or container.

func NewAllocator

func NewAllocator() *allocator

func ReadFromBucket added in v0.1.1

func ReadFromBucket(bucket string, file string) ([]byte, error)

ReadFromBucket reads the content of an object from a bucket. This is a convenience function to read files, like output files from a bucket. The bucket name must be prefixed with gs:// and must not contain any other slashes.

func SetAcceleratorsExtension

func SetAcceleratorsExtension(jt drmaa2interface.JobTemplate, count int64, accelerator string) drmaa2interface.JobTemplate

func SetDockerOptionsExtension

func SetDockerOptionsExtension(jt drmaa2interface.JobTemplate, dockerOptions string) drmaa2interface.JobTemplate

func SetSecretEnvironmentVariables added in v0.2.1

func SetSecretEnvironmentVariables(jt drmaa2interface.JobTemplate, secretEnv map[string]string) (drmaa2interface.JobTemplate, error)

SetSecretEnvironmentVariables sets environment variables which are retrieved from Google Secret Manager as a JobTemplate extension. The map key is the environment variable name and the value is the path to the secret (like "projects/dev/secrets/secret_message/versions/1").

func TimesFromStatusEvents added in v0.2.3

func TimesFromStatusEvents(events []*batchpb.StatusEvent) (dispatchTime, finishTime time.Time)

func WriteToBucket added in v0.1.2

func WriteToBucket(bucket string, file string, content []byte) error

WriteToBucket writes the content of a file to a bucket. It expects the bucket name to be prefixed with gs:// and not contain any other slashes.

Types

type GCPBatchTracker

type GCPBatchTracker struct {
	// contains filtered or unexported fields
}

GCPBatchTracker implements the JobTracker interface so that it can be used as backend in drmaa2os project.

func NewGCPBatchTracker

func NewGCPBatchTracker(drmaa2session string, project, location string) (*GCPBatchTracker, error)

NewGCPBatchTracker returns a new GCPBatchTracker instance which is used for managing jobs in Google Batch. The project and location parameters define the Google Cloud project and the location (like "us-central1"). The drmaa2session parameter is optional and can be used to filter for jobs which are in the same job session. If the job session is "" then all jobs are made visible. GCPBatchTracker implements the JobTracker interface so that it can be used as backend in drmaa2os project and wfl.

func (*GCPBatchTracker) AddArrayJob

func (t *GCPBatchTracker) AddArrayJob(jt drmaa2interface.JobTemplate, begin int, end int, step int, maxParallel int) (string, error)

AddArrayJob performs a mass submission of jobs defined by the same job template. Many HPC workload managers support job arrays for submitting tens of thousands of similar jobs with one call. The additional parameters define how many jobs are submitted by specifying a TASK_ID range. begin is the first task ID (like 1), end is the last task ID (like 10), and step is a positive integer which defines the increment from one task ID to the next (like 1). maxParallel is an optional argument which instructs the backend to limit this job array to maxParallel tasks running in parallel. Note that jobs use the TASK_ID environment variable to identify which task they are and thereby determine what to do (like which data set is accessed).

With Google Batch, job arrays can be created using MinSlots and MaxSlots in AddJob(). MaxSlots defines the number of tasks in the job array; MinSlots defines the number of tasks which run in parallel (for MPI).

func (*GCPBatchTracker) AddJob

func (t *GCPBatchTracker) AddJob(jt drmaa2interface.JobTemplate) (string, error)

AddJob creates a Google Batch job defined by the DRMAA2 job template. Job names must be unique in Google Batch, hence the name is created automatically by the backend. The CandidateMachines field defines the machine type (like "n2-standard-2") to be used; exactly one machine type must be specified. The ResourceLimits field defines the CPU and runtime limits. On success the job ID (job name) is returned.

func (*GCPBatchTracker) CloseMonitoringSession

func (t *GCPBatchTracker) CloseMonitoringSession(name string) error

func (*GCPBatchTracker) DeleteJob

func (t *GCPBatchTracker) DeleteJob(jobID string) error

DeleteJob removes a job from a potential internal database. It does not stop a job. A job must be in an end state (terminated, failed) in order to call DeleteJob. If an error occurs or the job is not in an end state, an error must be returned. If the backend does not support cleaning up resources for a finished job, nil should be returned.

func (*GCPBatchTracker) GetAllJobIDs

func (t *GCPBatchTracker) GetAllJobIDs(filter *drmaa2interface.JobInfo) ([]string, error)

func (*GCPBatchTracker) GetAllMachines

func (t *GCPBatchTracker) GetAllMachines(filter []string) ([]drmaa2interface.Machine, error)

func (*GCPBatchTracker) GetAllQueueNames

func (t *GCPBatchTracker) GetAllQueueNames(filter []string) ([]string, error)

func (*GCPBatchTracker) JobControl

func (t *GCPBatchTracker) JobControl(jobID string, action string) error

JobControl sends a request to the backend to either "terminate", "suspend", "resume", "hold", or "release" a job. The strings are fixed and are defined by the JobControl constants. This could change in the future to be limited only to constants representing the actions. When the request is not accepted by the system the function must return an error.

func (*GCPBatchTracker) JobInfo

func (t *GCPBatchTracker) JobInfo(jobID string) (drmaa2interface.JobInfo, error)

JobInfo returns the job status of a job in form of a JobInfo struct or an error.

func (*GCPBatchTracker) JobInfoFromMonitor

func (t *GCPBatchTracker) JobInfoFromMonitor(jobID string) (drmaa2interface.JobInfo, error)

JobInfoFromMonitor might collect job state and job info in a different way than a JobSession with persistent storage does.

func (*GCPBatchTracker) JobOutput added in v0.2.3

func (t *GCPBatchTracker) JobOutput(jobID string, lastNLines int64) ([]string, error)

JobOutput is not part of the JobTracker interface but could become a future JobOutputer interface extension. This would also be useful for k8s, Docker, and other JobTrackers which currently store the output as a JobInfo extension. If lastNLines is 0 then all lines are returned.

func (*GCPBatchTracker) JobState

func (t *GCPBatchTracker) JobState(jobID string) (drmaa2interface.JobState, string, error)

JobState returns the DRMAA2 state and substate (free form string) of the job.

func (*GCPBatchTracker) JobTemplate added in v0.1.1

func (t *GCPBatchTracker) JobTemplate(jobID string) (drmaa2interface.JobTemplate, error)

func (*GCPBatchTracker) ListArrayJobs

func (t *GCPBatchTracker) ListArrayJobs(arrayjobID string) ([]string, error)

ListArrayJobs returns all job IDs a job array ID (or array job ID) represents, or an error.

func (*GCPBatchTracker) ListJobCategories

func (t *GCPBatchTracker) ListJobCategories() ([]string, error)

ListJobCategories returns a list of job categories which can be used in the JobCategory field of the job template. The list is informational. An example is returning a list of supported container images. AddJob() and AddArrayJob() process a JobTemplate and hence also the JobCategory field.

JobCategories supported by Google Batch are all container images which can be used by the service. If "$script$" is used as JobCategory then the RemoteCommand field of the JobTemplate is used as a script. If "$scriptpath$" is used as JobCategory then the RemoteCommand field of the JobTemplate is used as the path to a script which is executed.

func (*GCPBatchTracker) ListJobs

func (t *GCPBatchTracker) ListJobs() ([]string, error)

ListJobs returns all visible job IDs or an error.

func (*GCPBatchTracker) OpenMonitoringSession

func (t *GCPBatchTracker) OpenMonitoringSession(name string) error

func (*GCPBatchTracker) Wait

func (t *GCPBatchTracker) Wait(jobID string, timeout time.Duration, state ...drmaa2interface.JobState) error

Wait blocks until the job is in one of the given states, the maximum waiting time (specified by timeout) is reached, or another internal error occurred (like the job was not found). In case of a timeout an error must also be returned.

type GoogleBatchTrackerParams

type GoogleBatchTrackerParams struct {
	GoogleProjectID string
	Region          string
}

GoogleBatchTrackerParams provides parameters which can be passed to the SessionManager in order to pass things like the Google project or region into the job tracker. This indirection keeps the tracker usable without being tightly integrated into the SessionManager of the DRMAA2OS project, so that not all dependencies have to be compiled in.

Directories

Path Synopsis
examples
drmaa2job Module
ssh Module
