drmaa2os

v0.2.0

This package is not in the latest version of its module.

Published: Jan 5, 2019 License: Apache-2.0 Imports: 15 Imported by: 0

README

drmaa2os - A Go API for OS Processes, Docker Containers, Cloud Foundry Tasks, Kubernetes Jobs, Grid Engine Jobs and more...

DRMAA2 for OS processes and more

This is a Go API based on an open standard (Open Grid Forum DRMAA2) for submitting and supervising workloads running in operating system processes, containers, PODs, tasks, or HPC batch jobs.

The API allows you to develop and run job workflows in OS processes and switch later to containers running in Kubernetes, as Cloud Foundry tasks, pure Docker, or Singularity without changing the application logic.

Its main purpose is to provide an abstraction layer on top of platforms, workload managers, and HPC cluster schedulers, so that you don't need to deal with the underlying details and differences.

An even simpler interface for creating job workflows without dealing with the DRMAA2 details is wfl, which is based on the Go DRMAA2 implementation.

For details about the mapping of job operations, please consult the platform-specific READMEs.

Feedback welcome!

For a Go DRMAA2 wrapper based on the C DRMAA2 library (libdrmaa2.so), such as the one for Univa Grid Engine, please check out drmaa2.

Basic Usage

The following example demonstrates how a job can be executed as an OS process. More examples can be found in the examples subdirectory.

Note that currently only JobSessions are implemented.

	sm, err := drmaa2os.NewDefaultSessionManager("testdb.db")
	if err != nil {
		panic(err)
	}

	js, err := sm.CreateJobSession("jobsession", "")
	if err != nil {
		panic(err)
	}

	jt := drmaa2interface.JobTemplate{
		RemoteCommand: "sleep",
		Args:          []string{"2"},
	}

	job, err := js.RunJob(jt)
	if err != nil {
		panic(err)
	}

	job.WaitTerminated(drmaa2interface.InfiniteTime)

	if job.GetState() == drmaa2interface.Done {
		// Check the submission error before using job2.
		job2, err := js.RunJob(jt)
		if err != nil {
			panic(err)
		}
		job2.WaitTerminated(drmaa2interface.InfiniteTime)
	} else {
		fmt.Println("Failed to execute job1 successfully")
	}

	js.Close()
	sm.DestroyJobSession("jobsession")

Using other Backends

Switching to another backend for workload management and execution only requires creating a different SessionManager. Different JobTemplate attributes might be necessary when switching the implementation. If the backend supports container images, it might be required to set the JobCategory to the container image name.

Docker

If Docker is installed locally, it is detected automatically. To point to a different host, environment variables need to be set before the SessionManager is created.
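
As a rough sketch of that ordering, the snippet below exports the Docker client variables before any SessionManager exists. The helpers `dockerEnv` and `applyEnv`, the placeholder address, and the value "1" for DOCKER_TLS_VERIFY are assumptions made for illustration; they are not part of drmaa2os.

```go
package main

import (
	"fmt"
	"os"
)

// dockerEnv builds the variables to export. Empty values are left out so
// that the Docker client falls back to its defaults.
func dockerEnv(host, apiVersion, certPath string, tlsVerify bool) map[string]string {
	env := map[string]string{}
	if host != "" {
		env["DOCKER_HOST"] = host
	}
	if apiVersion != "" {
		env["DOCKER_API_VERSION"] = apiVersion
	}
	if certPath != "" {
		env["DOCKER_CERT_PATH"] = certPath
	}
	if tlsVerify {
		// Assumption: "1" is the conventional value for enabling verification.
		env["DOCKER_TLS_VERIFY"] = "1"
	}
	return env
}

// applyEnv exports every entry into the current process environment.
func applyEnv(env map[string]string) error {
	for name, value := range env {
		if err := os.Setenv(name, value); err != nil {
			return fmt.Errorf("setting %s: %w", name, err)
		}
	}
	return nil
}

func main() {
	// The address is a placeholder from the documentation IP range.
	if err := applyEnv(dockerEnv("tcp://192.0.2.10:2376", "", "", false)); err != nil {
		panic(err)
	}
	fmt.Println("DOCKER_HOST =", os.Getenv("DOCKER_HOST"))
	// Only now create the SessionManager, for example with
	// drmaa2os.NewDockerSessionManager("testdb.db").
}
```

Building the map separately from exporting it keeps the selection logic easy to check without touching the real process environment.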

"Use DOCKER_HOST to set the url to the docker server. Use DOCKER_API_VERSION to set the version of the API to reach, leave empty for latest. Use DOCKER_CERT_PATH to load the TLS certificates from. Use DOCKER_TLS_VERIFY to enable or disable TLS verification, off by default."

	sm, err := drmaa2os.NewDockerSessionManager("testdb.db")
	if err != nil {
		panic(err)
	}

	js, err := sm.CreateJobSession("jobsession", "")
	if err != nil {
		panic(err)
	}

	jt := drmaa2interface.JobTemplate{
		RemoteCommand: "sleep",
		Args:          []string{"2"},
		JobCategory:   "busybox",
	}
	job, err := js.RunJob(jt)
	if err != nil {
		panic(err)
	}

	job.WaitTerminated(drmaa2interface.InfiniteTime)

	js.Close()
	sm.DestroyJobSession("jobsession")

Kubernetes

	sm, err := drmaa2os.NewKubernetesSessionManager("testdb.db")
	if err != nil {
		panic(err)
	}

	js, err := sm.CreateJobSession("jobsession", "")
	if err != nil {
		panic(err)
	}

	jt := drmaa2interface.JobTemplate{
		RemoteCommand: "sleep",
		Args:          []string{"2"},
		JobCategory:   "busybox",
	}
	job, err := js.RunJob(jt)
	if err != nil {
		panic(err)
	}

	job.WaitTerminated(drmaa2interface.InfiniteTime)

	js.Close()
	sm.DestroyJobSession("jobsession")

Cloud Foundry

The Cloud Foundry SessionManager requires details for connecting to the Cloud Foundry cloud controller API when being created. The JobCategory needs to be set to the application GUID which is the source of the container image of the task.

	sm, err := drmaa2os.NewCloudFoundrySessionManager("api.run.pivotal.io", "user", "password", "test.db")
	if err != nil {
		panic(err)
	}

	js, err := sm.CreateJobSession("jobsession", "")
	if err != nil {
		panic(err)
	}

	jt := drmaa2interface.JobTemplate{
		RemoteCommand: "dbbackup.sh",
		Args:          []string{"location"},
		JobCategory:   "123CFAPPGUID",
	}
	job, err := js.RunJob(jt)
	if err != nil {
		panic(err)
	}

	job.WaitTerminated(drmaa2interface.InfiniteTime)

	js.Close()
	sm.DestroyJobSession("jobsession")

Singularity

The Singularity SessionManager wraps the singularity command, which must be installed. The container images can be provided in any form (for example, as a file path or a shub reference) but must be set as the JobCategory of each job.

	sm, err := drmaa2os.NewSingularitySessionManager("testdb.db")
	if err != nil {
		panic(err)
	}

	js, err := sm.CreateJobSession("jobsession", "")
	if err != nil {
		panic(err)
	}

	jt := drmaa2interface.JobTemplate{
		RemoteCommand: "sleep",
		Args:          []string{"2"},
		JobCategory:   "shub://GodloveD/lolcow",
	}

	job, err := js.RunJob(jt)
	if err != nil {
		panic(err)
	}

	job.WaitTerminated(drmaa2interface.InfiniteTime)

	js.Close()
	sm.DestroyJobSession("jobsession")

Documentation

Index

Constants

This section is empty.

Variables

var (
	ErrorUnsupportedOperation = DRMAA2Error{"This optional function is not suppported."}
	ErrorJobNotExists         = DRMAA2Error{"The job does not exist."}
	ErrorInvalidState         = DRMAA2Error{"Invalid state."}
	ErrorInternal             = DRMAA2Error{"Internal error occurred."}
	ErrorInvalidSession       = DRMAA2Error{"The session used for the method call is not valid."}
)
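
A minimal sketch of how a caller might test for one of these sentinel values. To stay runnable without the library, DRMAA2Error is mirrored locally (the field name `message` is an assumption, since the real fields are unexported), and `lookupJob` and `isJobNotExists` are hypothetical helpers.

```go
package main

import (
	"errors"
	"fmt"
)

// DRMAA2Error mirrors the package's error type for illustration only.
// The field name is an assumption; the real fields are unexported.
type DRMAA2Error struct {
	message string
}

func (d DRMAA2Error) Error() string { return d.message }

// ErrorJobNotExists is a local copy of the package-level sentinel.
var ErrorJobNotExists = DRMAA2Error{"The job does not exist."}

// lookupJob is a hypothetical helper that fails for unknown job IDs and
// wraps the sentinel with extra context.
func lookupJob(known map[string]bool, id string) error {
	if !known[id] {
		return fmt.Errorf("lookup of job %q failed: %w", id, ErrorJobNotExists)
	}
	return nil
}

// isJobNotExists reports whether err is, or wraps, ErrorJobNotExists.
func isJobNotExists(err error) bool {
	return errors.Is(err, ErrorJobNotExists)
}

func main() {
	err := lookupJob(map[string]bool{"1": true}, "42")
	if isJobNotExists(err) {
		fmt.Println("job is unknown:", err)
	}
}
```

Because the struct holds only comparable fields, errors.Is can match the value with a plain equality check, even when it has been wrapped.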

Functions

func CloseMonitoringSession

func CloseMonitoringSession() error

func GetAllMachines

func GetAllMachines(names []string) ([]drmaa2interface.Machine, error)

func GetAllQueues

func GetAllQueues(names []string) ([]drmaa2interface.Queue, error)

Types

type ArrayJob

type ArrayJob struct {
	// contains filtered or unexported fields
}

ArrayJob represents a set of jobs created by one operation. In DRMAA, JobArray instances are only created by the RunBulkJobs method. JobArray instances differ from the JobList data structure due to their potential for representing a DRM system concept, while JobList is a DRMAA-only concept realized by language binding support.

func (*ArrayJob) GetID

func (aj *ArrayJob) GetID() string

GetID reports the job identifier assigned to the job array by the DRM system in text form.

func (*ArrayJob) GetJobTemplate

func (aj *ArrayJob) GetJobTemplate() drmaa2interface.JobTemplate

GetJobTemplate provides a reference to a JobTemplate instance that has equal values to the one that was used for the job submission creating this JobArray instance.

func (*ArrayJob) GetJobs

func (aj *ArrayJob) GetJobs() []drmaa2interface.Job

GetJobs provides the list of jobs that are part of the job array, regardless of their state.

func (*ArrayJob) GetSessionName

func (aj *ArrayJob) GetSessionName() string

GetSessionName states the name of the JobSession that was used to create the bulk job represented by this instance. If the session name cannot be determined, for example since the bulk job was created outside of a DRMAA session, the attribute SHOULD have an UNSET value (i.e. is "").

func (*ArrayJob) Hold

func (aj *ArrayJob) Hold() error

Hold triggers a transition from QUEUED to QUEUED_HELD, or from REQUEUED to REQUEUED_HELD state.

The job control functions allow modifying the status of the job array in the DRM system, with the same semantic as in the Job interface. If one of the jobs in the array is in an inappropriate state for the particular method, the method MAY raise an InvalidStateException.

The methods SHOULD return after the action has been acknowledged by the DRM system for all jobs in the array, but MAY return before the action has been completed for all of the jobs. Some DRMAA implementations MAY allow this method to be used to control job arrays created externally to the DRMAA session. This behavior is implementation-specific.

func (*ArrayJob) Release

func (aj *ArrayJob) Release() error

Release triggers a transition from QUEUED_HELD to QUEUED, or from REQUEUED_HELD to REQUEUED state.

The job control functions allow modifying the status of the job array in the DRM system, with the same semantic as in the Job interface. If one of the jobs in the array is in an inappropriate state for the particular method, the method MAY raise an InvalidStateException.

The methods SHOULD return after the action has been acknowledged by the DRM system for all jobs in the array, but MAY return before the action has been completed for all of the jobs. Some DRMAA implementations MAY allow this method to be used to control job arrays created externally to the DRMAA session. This behavior is implementation-specific.

func (*ArrayJob) Resume

func (aj *ArrayJob) Resume() error

Resume triggers a job state transition from SUSPENDED to RUNNING state.

The job control functions allow modifying the status of the job array in the DRM system, with the same semantic as in the Job interface. If one of the jobs in the array is in an inappropriate state for the particular method, the method MAY raise an InvalidStateException.

The methods SHOULD return after the action has been acknowledged by the DRM system for all jobs in the array, but MAY return before the action has been completed for all of the jobs. Some DRMAA implementations MAY allow this method to be used to control job arrays created externally to the DRMAA session. This behavior is implementation-specific.

func (*ArrayJob) Suspend

func (aj *ArrayJob) Suspend() error

Suspend triggers a job state transition from RUNNING to SUSPENDED state.

The job control functions allow modifying the status of the job array in the DRM system, with the same semantic as in the Job interface. If one of the jobs in the array is in an inappropriate state for the particular method, the method MAY raise an InvalidStateException.

The methods SHOULD return after the action has been acknowledged by the DRM system for all jobs in the array, but MAY return before the action has been completed for all of the jobs. Some DRMAA implementations MAY allow this method to be used to control job arrays created externally to the DRMAA session. This behavior is implementation-specific.

func (*ArrayJob) Terminate

func (aj *ArrayJob) Terminate() error

Terminate triggers a transition from any of the "Started" states to one of the "Terminated" states.

The job control functions allow modifying the status of the job array in the DRM system, with the same semantic as in the Job interface. If one of the jobs in the array is in an inappropriate state for the particular method, the method MAY raise an InvalidStateException.

The methods SHOULD return after the action has been acknowledged by the DRM system for all jobs in the array, but MAY return before the action has been completed for all of the jobs. Some DRMAA implementations MAY allow this method to be used to control job arrays created externally to the DRMAA session. This behavior is implementation-specific.

type DRMAA2Error

type DRMAA2Error struct {
	// contains filtered or unexported fields
}

func (DRMAA2Error) Error

func (d DRMAA2Error) Error() string

type Job

type Job struct {
	// contains filtered or unexported fields
}

Job represents a single computational activity that is executed by the DRM system. There are three relevant method sets for working with jobs: The JobSession interface represents all control and monitoring functions available for jobs. The Job interface represents the common control functionality for one existing job. Sets of jobs resulting from a bulk submission are controllable as a whole by the JobArray interface.

func GetAllJobs

func GetAllJobs(filter drmaa2interface.JobInfo) ([]Job, error)

func (*Job) GetID

func (j *Job) GetID() string

GetID returns the job identifier assigned by the DRM system in text form. This method is expected to be used as a fast alternative to the fetching of a complete JobInfo instance.

func (*Job) GetJobInfo

func (j *Job) GetJobInfo() (drmaa2interface.JobInfo, error)

GetJobInfo returns a JobInfo instance for the particular job.

func (*Job) GetJobTemplate

func (j *Job) GetJobTemplate() (drmaa2interface.JobTemplate, error)

GetJobTemplate returns a reference to a JobTemplate instance that has equal values to the one that was used for the job submission creating this Job instance. For jobs created outside of a DRMAA session, implementations MUST also return a JobTemplate instance here, which MAY be empty or only partially filled.

func (*Job) GetSessionName

func (j *Job) GetSessionName() string

GetSessionName reports the name of the JobSession that was used to create the job. If the session name cannot be determined, for example since the job was created outside of a DRMAA session, the attribute SHOULD be UNSET (i.e. equals "").

func (*Job) GetState

func (j *Job) GetState() drmaa2interface.JobState

GetState allows the application to get the current status of the job according to the DRMAA state model, together with an implementation specific sub state (see Section 8.1). It is intended as a fast alternative to the fetching of a complete JobInfo instance.

func (*Job) Hold

func (j *Job) Hold() error

Hold triggers a transition from QUEUED to QUEUED_HELD, or from REQUEUED to REQUEUED_HELD state.

func (*Job) Reap

func (j *Job) Reap() error

Reap is intended to let the DRMAA implementation clean up any data about this job. The motivating factor are long-running applications maintaining large amounts of jobs as part of a monitoring session. Using a reaped job in any subsequent activity MUST generate an InvalidArgumentException for the job parameter. This function MUST only work for jobs in "Terminated" states, so that the job is promised to not change its status while being reaped.
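
The reaping rule can be pictured with a small in-memory model. This is only a sketch: `jobStore` and `isTerminated` are hypothetical, the state names are plain strings rather than drmaa2interface values, and the real implementation may store and report things differently.

```go
package main

import (
	"errors"
	"fmt"
)

// jobStore is a hypothetical in-memory record of job states, keyed by job ID.
type jobStore struct {
	states map[string]string
}

// isTerminated reports whether the state is one of the "Terminated" states.
func isTerminated(state string) bool {
	return state == "DONE" || state == "FAILED"
}

// Reap forgets a job, but only if it can no longer change its state.
func (s *jobStore) Reap(id string) error {
	state, ok := s.states[id]
	if !ok {
		return errors.New("job is unknown or was already reaped")
	}
	if !isTerminated(state) {
		return fmt.Errorf("job %s is in state %s and cannot be reaped", id, state)
	}
	delete(s.states, id)
	return nil
}

func main() {
	store := &jobStore{states: map[string]string{"1": "DONE", "2": "RUNNING"}}
	fmt.Println(store.Reap("1")) // finished job, reaped
	fmt.Println(store.Reap("1")) // already gone, rejected
	fmt.Println(store.Reap("2")) // still running, rejected
}
```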

func (*Job) Release

func (j *Job) Release() error

Release triggers a transition from QUEUED_HELD to QUEUED, or from REQUEUED_HELD to REQUEUED state.

func (*Job) Resume

func (j *Job) Resume() error

Resume triggers a job state transition from SUSPENDED to RUNNING state.

func (*Job) Suspend

func (j *Job) Suspend() error

Suspend triggers a job state transition from RUNNING to SUSPENDED state.

func (*Job) Terminate

func (j *Job) Terminate() error

Terminate triggers a transition from any of the "Started" states to one of the "Terminated" states.
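
The Hold, Release, Suspend, and Resume transitions described above can be summarized as a lookup table. The sketch below uses a local `State` type and a hypothetical `apply` function instead of the library's types. Terminate is left out because the resulting "Terminated" state is not specified here.

```go
package main

import "fmt"

// State is a local stand-in for the job states named in the text.
type State string

const (
	Queued       State = "QUEUED"
	QueuedHeld   State = "QUEUED_HELD"
	Requeued     State = "REQUEUED"
	RequeuedHeld State = "REQUEUED_HELD"
	Running      State = "RUNNING"
	Suspended    State = "SUSPENDED"
)

// transitions maps an operation name to the state changes it permits.
var transitions = map[string]map[State]State{
	"Hold":    {Queued: QueuedHeld, Requeued: RequeuedHeld},
	"Release": {QueuedHeld: Queued, RequeuedHeld: Requeued},
	"Suspend": {Running: Suspended},
	"Resume":  {Suspended: Running},
}

// apply returns the state after op, or an error if op is not allowed
// in the current state.
func apply(op string, current State) (State, error) {
	next, ok := transitions[op][current]
	if !ok {
		return current, fmt.Errorf("%s is invalid in state %s", op, current)
	}
	return next, nil
}

func main() {
	state := Queued
	for _, op := range []string{"Hold", "Release", "Suspend"} {
		next, err := apply(op, state)
		if err != nil {
			fmt.Println("rejected:", err)
			continue
		}
		fmt.Printf("%s: %s -> %s\n", op, state, next)
		state = next
	}
}
```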

func (*Job) WaitStarted

func (j *Job) WaitStarted(timeout time.Duration) error

WaitStarted blocks until the job entered one of the "Started" states.

func (*Job) WaitTerminated

func (j *Job) WaitTerminated(timeout time.Duration) error

WaitTerminated blocks until the job entered one of the "Terminated" states.

type JobSession

type JobSession struct {
	// contains filtered or unexported fields
}

JobSession instance acts as container for job instances controlled through the DRMAA API. The session methods support the submission of new jobs and the monitoring of existing jobs.

func (*JobSession) Close

func (js *JobSession) Close() error

Close MUST perform the necessary action to disengage from the DRM system. It SHOULD be callable only once, by only one of the application threads. This SHOULD be ensured by the library implementation. Additional calls beyond the first one SHOULD lead to an InvalidSessionException error notification. The corresponding state information MUST be saved to some stable storage before the method returns. This method SHALL NOT affect any jobs or reservations in the session (e.g., queued and running jobs remain queued and running). (TODO)
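
One way to picture the "callable only once" requirement is a guarded flag. The sketch below is an assumption about how such a guard could look, not the library's actual code; the `session` type and `errInvalidSession` value are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errInvalidSession stands in for the library's invalid-session error.
var errInvalidSession = errors.New("session is not valid")

// session is a hypothetical type that tracks whether Close was called.
type session struct {
	mu     sync.Mutex
	closed bool
}

// Close succeeds exactly once. Later calls, from any goroutine, get
// errInvalidSession.
func (s *session) Close() error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.closed {
		return errInvalidSession
	}
	s.closed = true
	return nil
}

func main() {
	s := &session{}
	fmt.Println(s.Close()) // first call
	fmt.Println(s.Close()) // second call is rejected
}
```

The mutex makes the check and the update a single step, so two goroutines closing at the same time cannot both succeed.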

func (*JobSession) GetContact

func (js *JobSession) GetContact() (string, error)

GetContact method reports the contact value that was used in the SessionManager::createJobSession call for this instance. If no value was originally provided, the default contact string from the implementation MUST be returned.

func (*JobSession) GetJobArray

func (js *JobSession) GetJobArray(id string) (drmaa2interface.ArrayJob, error)

GetJobArray method returns the JobArray instance with the given ID. If the session does not, or no longer, contain the corresponding job array, InvalidArgumentException SHALL be thrown.

func (*JobSession) GetJobCategories

func (js *JobSession) GetJobCategories() ([]string, error)

GetJobCategories provides the list of valid job category names which can be used for the jobCategory attribute in a JobTemplate instance.

func (*JobSession) GetJobs

func (js *JobSession) GetJobs(filter drmaa2interface.JobInfo) ([]drmaa2interface.Job, error)

GetJobs returns the set of jobs that belong to the job session. The filter parameter allows choosing a subset of the session jobs as return value. If no job matches or the session has no jobs attached, the method MUST return an empty set. If filter is UNSET, all session jobs MUST be returned. Time-dependent effects of this method, such as jobs no longer matching the filter criteria at evaluation time, are implementation-specific. The purpose of the filter parameter is to keep scalability with a large number of jobs per session. Applications therefore must consider the possibly changed state of jobs during their evaluation of the method result.
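
The filter semantics can be sketched with a simplified structure. Here `jobInfo` is a hypothetical three-field stand-in for drmaa2interface.JobInfo, and treating a zero value as "unset" is an assumption about how an UNSET field would be represented in Go.

```go
package main

import "fmt"

// jobInfo is a simplified, hypothetical stand-in for drmaa2interface.JobInfo.
type jobInfo struct {
	ID        string
	State     string
	QueueName string
}

// matches treats every zero-valued filter field as unset, so it accepts
// any value in that position.
func matches(filter, job jobInfo) bool {
	if filter.ID != "" && filter.ID != job.ID {
		return false
	}
	if filter.State != "" && filter.State != job.State {
		return false
	}
	if filter.QueueName != "" && filter.QueueName != job.QueueName {
		return false
	}
	return true
}

// filterJobs returns the matching jobs; the result is empty, never nil.
func filterJobs(jobs []jobInfo, filter jobInfo) []jobInfo {
	result := []jobInfo{}
	for _, job := range jobs {
		if matches(filter, job) {
			result = append(result, job)
		}
	}
	return result
}

func main() {
	jobs := []jobInfo{
		{ID: "1", State: "RUNNING", QueueName: "batch"},
		{ID: "2", State: "DONE", QueueName: "batch"},
	}
	// An unset filter selects every job of the session.
	fmt.Println(len(filterJobs(jobs, jobInfo{})))
	// Setting one field narrows the result.
	fmt.Println(len(filterJobs(jobs, jobInfo{State: "DONE"})))
}
```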

func (*JobSession) GetSessionName

func (js *JobSession) GetSessionName() (string, error)

GetSessionName reports the session name, a value that resulted from the SessionManager::createJobSession or SessionManager::openJobSession call for this instance.

func (*JobSession) RunBulkJobs

func (js *JobSession) RunBulkJobs(jt drmaa2interface.JobTemplate, begin, end, step, maxParallel int) (drmaa2interface.ArrayJob, error)

RunBulkJobs method creates a set of parametric jobs, each with attributes as defined in the given job template instance.
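
To illustrate how begin, end, and step might translate into task indices, here is a small sketch. It assumes that end is inclusive and that step is positive, which follows the usual DRMAA2 convention but is not confirmed by this page. `bulkIndices` is a hypothetical helper, and maxParallel (which only limits concurrency) is not modeled.

```go
package main

import (
	"errors"
	"fmt"
)

// bulkIndices lists the task indices for a bulk submission.
// Assumption: end is inclusive and step is positive.
func bulkIndices(begin, end, step int) ([]int, error) {
	if step <= 0 {
		return nil, errors.New("step must be positive")
	}
	if begin > end {
		return nil, errors.New("begin must not exceed end")
	}
	indices := []int{}
	for i := begin; i <= end; i += step {
		indices = append(indices, i)
	}
	return indices, nil
}

func main() {
	indices, err := bulkIndices(1, 10, 3)
	if err != nil {
		panic(err)
	}
	fmt.Println(indices) // prints [1 4 7 10]
}
```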

func (*JobSession) RunJob

RunJob method submits a job with the attributes defined in the given job template instance. The method returns a Job object that represents the job in the underlying DRM system. Depending on the job template settings, submission attempts may be rejected with an InvalidArgumentException. The error details SHOULD provide further information about the attribute(s) responsible for the rejection. When this method returns a valid Job instance, the following conditions SHOULD be fulfilled:

  • The job is part of the persistent state of the job session.
  • All non-DRMAA and DRMAA interfaces to the DRM system report the job as being submitted to the DRM system.
  • The job has one of the DRMAA job states.

func (*JobSession) WaitAnyStarted

func (js *JobSession) WaitAnyStarted(jobs []drmaa2interface.Job, timeout time.Duration) (drmaa2interface.Job, error)

WaitAnyStarted method blocks until any of the jobs referenced in the jobs parameter entered one of the "Started" states.

The timeout argument specifies the desired waiting time for the state change. The constant value drmaa2interface.InfiniteTime MUST be supported to get an indefinite waiting time. The constant value drmaa2interface.ZeroTime MUST be supported to express that the method call SHALL return immediately. A time.Duration can be specified to indicate the maximum waiting time. If the method call returns because of timeout, a TimeoutException SHALL be raised.

func (*JobSession) WaitAnyTerminated

func (js *JobSession) WaitAnyTerminated(jobs []drmaa2interface.Job, timeout time.Duration) (drmaa2interface.Job, error)

WaitAnyTerminated method blocks until any of the jobs referenced in the jobs parameter entered one of the "Terminated" states.

The timeout argument specifies the desired waiting time for the state change. The constant value drmaa2interface.InfiniteTime MUST be supported to get an indefinite waiting time. The constant value drmaa2interface.ZeroTime MUST be supported to express that the method call SHALL return immediately. A time.Duration can be specified to indicate the maximum waiting time. If the method call returns because of timeout, a TimeoutException SHALL be raised.
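
The three timeout cases can be sketched with a channel and a select statement. The constants `zeroTime` and `infiniteTime` below are local, hypothetical stand-ins whose values are chosen for this example only; they are not the values of drmaa2interface.ZeroTime and drmaa2interface.InfiniteTime. `waitAny` and `errTimeout` are likewise illustrative.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Hypothetical stand-ins for the library's timeout constants.
const (
	zeroTime     = time.Duration(0)
	infiniteTime = time.Duration(-1)
)

var errTimeout = errors.New("timeout while waiting for a job state change")

// waitAny returns the first job ID delivered on events. It returns at once
// for zeroTime, blocks indefinitely for infiniteTime, and otherwise gives
// up after the given duration.
func waitAny(events <-chan string, timeout time.Duration) (string, error) {
	switch timeout {
	case zeroTime:
		select {
		case id := <-events:
			return id, nil
		default:
			return "", errTimeout
		}
	case infiniteTime:
		return <-events, nil
	}
	select {
	case id := <-events:
		return id, nil
	case <-time.After(timeout):
		return "", errTimeout
	}
}

func main() {
	events := make(chan string, 1)
	go func() {
		time.Sleep(10 * time.Millisecond)
		events <- "job-1"
	}()
	id, err := waitAny(events, time.Second)
	fmt.Println(id, err)
}
```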

type MonitoringSession

type MonitoringSession struct {
}

type Reservation

type Reservation struct {
}

func GetAllReservations

func GetAllReservations() ([]Reservation, error)

func (*Reservation) GetID

func (r *Reservation) GetID() (string, error)

func (*Reservation) GetInfo

func (*Reservation) GetSessionName

func (r *Reservation) GetSessionName() (string, error)

func (*Reservation) GetTemplate

func (*Reservation) Terminate

func (r *Reservation) Terminate() error

type ReservationSession

type ReservationSession struct {
}

func (*ReservationSession) Close

func (rs *ReservationSession) Close() error

func (*ReservationSession) GetContact

func (rs *ReservationSession) GetContact() (string, error)

func (*ReservationSession) GetReservation

func (*ReservationSession) GetReservations

func (rs *ReservationSession) GetReservations() ([]Reservation, error)

func (*ReservationSession) GetSessionName

func (rs *ReservationSession) GetSessionName() (string, error)

func (*ReservationSession) RequestReservation

type SessionManager

type SessionManager struct {
	// contains filtered or unexported fields
}

SessionManager allows creating, listing, and destroying job, reservation, and monitoring sessions. It also holds basic information about the resource manager and its capabilities.

func NewCloudFoundrySessionManager

func NewCloudFoundrySessionManager(addr, username, password, dbpath string) (*SessionManager, error)

NewCloudFoundrySessionManager creates a SessionManager which maintains jobs as Cloud Foundry tasks. addr needs to point to the cloud controller API, and username and password need to be set as well.

func NewDefaultSessionManager

func NewDefaultSessionManager(dbpath string) (*SessionManager, error)

NewDefaultSessionManager creates a SessionManager which starts jobs as processes.

func NewDockerSessionManager

func NewDockerSessionManager(dbpath string) (*SessionManager, error)

NewDockerSessionManager creates a SessionManager which maintains jobs as Docker containers.

func NewKubernetesSessionManager

func NewKubernetesSessionManager(dbpath string) (*SessionManager, error)

NewKubernetesSessionManager creates a new session manager which uses Kubernetes tasks as execution backend for jobs.

func NewSingularitySessionManager added in v0.2.0

func NewSingularitySessionManager(dbpath string) (*SessionManager, error)

NewSingularitySessionManager creates a new session manager creating and maintaining jobs as Singularity containers.

func (*SessionManager) CreateJobSession

func (sm *SessionManager) CreateJobSession(name, contact string) (drmaa2interface.JobSession, error)

CreateJobSession creates a new JobSession for managing jobs.

func (*SessionManager) CreateReservationSession

func (sm *SessionManager) CreateReservationSession(name, contact string) (drmaa2interface.ReservationSession, error)

CreateReservationSession creates a new ReservationSession.

func (*SessionManager) DestroyJobSession

func (sm *SessionManager) DestroyJobSession(name string) error

DestroyJobSession destroys a job session by name.

func (*SessionManager) DestroyReservationSession

func (sm *SessionManager) DestroyReservationSession(name string) error

DestroyReservationSession removes a reservation session.

func (*SessionManager) GetDrmsName

func (sm *SessionManager) GetDrmsName() (string, error)

GetDrmsName returns the name of the distributed resource manager.

func (*SessionManager) GetDrmsVersion

func (sm *SessionManager) GetDrmsVersion() (drmaa2interface.Version, error)

GetDrmsVersion returns the version of the distributed resource manager.

func (*SessionManager) GetJobSessionNames

func (sm *SessionManager) GetJobSessionNames() ([]string, error)

GetJobSessionNames returns a list of all job sessions.

func (*SessionManager) GetReservationSessionNames

func (sm *SessionManager) GetReservationSessionNames() ([]string, error)

GetReservationSessionNames returns a list of all reservation sessions.

func (*SessionManager) OpenJobSession

func (sm *SessionManager) OpenJobSession(name string) (drmaa2interface.JobSession, error)

OpenJobSession creates a new session for managing jobs. The semantic of a job session and the job session name depends on the resource manager.

func (*SessionManager) OpenMonitoringSession

func (sm *SessionManager) OpenMonitoringSession(sessionName string) (drmaa2interface.MonitoringSession, error)

OpenMonitoringSession opens a session for monitoring jobs.

func (*SessionManager) OpenReservationSession

func (sm *SessionManager) OpenReservationSession(name string) (drmaa2interface.ReservationSession, error)

OpenReservationSession opens a reservation session.

func (*SessionManager) RegisterEventNotification

func (sm *SessionManager) RegisterEventNotification() (drmaa2interface.EventChannel, error)

RegisterEventNotification creates an event channel which emits events when the conditions described in the given notification specification are met.

func (*SessionManager) Supports

func (sm *SessionManager) Supports(capability drmaa2interface.Capability) bool

Supports returns true or false depending on whether the given Capability is supported by DRMAA2OS.

type SessionType

type SessionType int

SessionType represents the selected resource manager.

const (
	DefaultSession      SessionType = iota // processes
	DockerSession                          // containers
	CloudFoundrySession                    // application tasks
	KubernetesSession                      // pods
	SingularitySession                     // Singularity containers
)
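
For logging or debugging, such iota-based constants are often paired with a String method. The sketch below copies the type and constants so that it compiles on its own; the String method and its labels are an illustrative addition and are not documented as part of the package.

```go
package main

import "fmt"

// SessionType and its constants are copied here so the snippet is
// self-contained.
type SessionType int

const (
	DefaultSession      SessionType = iota // processes
	DockerSession                          // containers
	CloudFoundrySession                    // application tasks
	KubernetesSession                      // pods
	SingularitySession                     // Singularity containers
)

// String is a hypothetical helper that returns a readable label.
func (s SessionType) String() string {
	switch s {
	case DefaultSession:
		return "Default"
	case DockerSession:
		return "Docker"
	case CloudFoundrySession:
		return "CloudFoundry"
	case KubernetesSession:
		return "Kubernetes"
	case SingularitySession:
		return "Singularity"
	}
	return "Unknown"
}

func main() {
	for s := DefaultSession; s <= SingularitySession; s++ {
		fmt.Printf("%d = %s\n", int(s), s)
	}
}
```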
