scheduler

package
v0.32.2 Latest
Warning

This package is not in the latest version of its module.

Published: May 31, 2023 | License: GPL-3.0 | Imports: 38 | Imported by: 6

Documentation

Overview

Package scheduler lets the jobqueue server interact with the configured job scheduler (if any) to submit jobqueue runner clients and have them run on a compute cluster (or local machine).

Currently implemented schedulers are local, LSF, OpenStack and Kubernetes. The implementation of each supported scheduler type is in its own .go file.

It's a pseudo plug-in system: it is designed so that you can easily add a Go file that implements the methods of the scheduleri interface, to support a new job scheduler. However, there is no dynamic loading of these Go files; they all belong to the scheduler package and are all compiled in, and the correct one is used at run time. To "register" a new scheduleri implementation you must add a case for it to New() and rebuild.

import "github.com/VertebrateResequencing/wr/jobqueue/scheduler"
ctx := context.Background()
s, err := scheduler.New(ctx, "local", &scheduler.ConfigLocal{Shell: "bash"})
req := &scheduler.Requirements{RAM: 300, Time: 2 * time.Hour, Cores: 1}
err = s.Schedule(ctx, "myWRRunnerClient -args", req, 0, 24) // priority 0, count 24
// wait, and when s.Busy(ctx) returns false, your command has been run 24 times


Constants

This section is empty.

Variables

var (
	ErrBadScheduler = "unknown scheduler name"
	ErrImpossible   = "scheduler cannot accept the job, since its resource requirements are too high"
	ErrBadFlavor    = "unknown server flavor"
)

The Err* variables are found in returned Errors under err.Err, so you can type-assert a returned error to Error and compare its Err field to check whether it's a certain type of error.
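The check described above might look like the following sketch. The Error type and ErrImpossible variable are local stand-ins copied from this page so the example compiles on its own, the message format in Error() is illustrative only, and submit() is a hypothetical function. It is also an assumption (suggested by the value receiver on Error(), but not stated here) that the package returns Error values rather than pointers.

```go
package main

import (
	"errors"
	"fmt"
)

// Stand-ins mirroring the declarations documented on this page.
var ErrImpossible = "scheduler cannot accept the job, since its resource requirements are too high"

type Error struct {
	Scheduler string // the scheduler's Name
	Op        string // name of the method
	Err       string // one of the Err* vars
}

// Error implements the error interface; this message format is illustrative.
func (e Error) Error() string {
	return e.Scheduler + "(" + e.Op + "): " + e.Err
}

// submit is a hypothetical function that fails the way Schedule() might.
func submit() error {
	return Error{Scheduler: "local", Op: "Schedule", Err: ErrImpossible}
}

// isImpossible reports whether err is (or wraps) an Error whose Err field
// equals ErrImpossible.
func isImpossible(err error) bool {
	var serr Error
	return errors.As(err, &serr) && serr.Err == ErrImpossible
}

func main() {
	if err := submit(); isImpossible(err) {
		fmt.Println("requirements too high:", err)
	}
}
```

Using errors.As rather than a bare type assertion also handles the case where a caller has wrapped the error with fmt.Errorf and %w.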

Functions

This section is empty.

Types

type BadServerCallBack added in v0.10.0

type BadServerCallBack func(server *cloud.Server)

BadServerCallBack functions receive a server when a cloud scheduler discovers that a server it spawned no longer seems functional. It's possible that this was due to a temporary networking issue, in which case the callback will be called again with the same server when it is working fine again: check server.IsBad(). If it's bad, you'd probably call server.Destroy() after confirming the server is definitely unusable (eg. ask the end user to manually check).

type CloudConfig added in v0.12.0

type CloudConfig interface {
	// AddConfigFile takes a value like that of the ConfigFiles property of the
	// struct implementing this interface, and appends this value to what is
	// in ConfigFiles, or sets it if unset.
	AddConfigFile(spec string)

	// GetOSUser returns the default ssh login username for servers.
	GetOSUser() string

	// GetServerKeepTime returns the time to keep idle servers alive for.
	GetServerKeepTime() time.Duration
}

The CloudConfig interface is satisfied by the config structs taken by cloud schedulers, which have a ConfigFiles property, a property for configuring a default ssh login username, and a property for determining how long to keep idle servers.
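As a minimal sketch of what satisfying this interface involves, the following defines a hypothetical exampleConfig struct (not part of the package) with the three kinds of property mentioned above. The interface is copied locally so the example is self-contained, and joining entries with a comma is an assumption based on ConfigFiles being described as a comma separated list.

```go
package main

import (
	"fmt"
	"time"
)

// CloudConfig is copied from this page so the sketch compiles on its own.
type CloudConfig interface {
	AddConfigFile(spec string)
	GetOSUser() string
	GetServerKeepTime() time.Duration
}

// exampleConfig is a hypothetical config struct used only for illustration.
type exampleConfig struct {
	ConfigFiles    string
	OSUser         string
	ServerKeepTime time.Duration
}

// AddConfigFile appends spec to ConfigFiles, or sets it if unset.
func (c *exampleConfig) AddConfigFile(spec string) {
	if c.ConfigFiles == "" {
		c.ConfigFiles = spec
		return
	}
	c.ConfigFiles += "," + spec
}

// GetOSUser returns the default ssh login username.
func (c *exampleConfig) GetOSUser() string { return c.OSUser }

// GetServerKeepTime returns how long idle servers are kept alive.
func (c *exampleConfig) GetServerKeepTime() time.Duration { return c.ServerKeepTime }

// Compile-time check that *exampleConfig satisfies CloudConfig.
var _ CloudConfig = (*exampleConfig)(nil)

func main() {
	c := &exampleConfig{OSUser: "ubuntu", ServerKeepTime: 5 * time.Minute}
	c.AddConfigFile("~/.s3cfg")
	c.AddConfigFile("/etc/example.conf")
	fmt.Println(c.ConfigFiles, c.GetOSUser(), c.GetServerKeepTime())
}
```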

type CmdStatus

type CmdStatus struct {
	Count   int
	Running [][2]int // a slice of [id, index] tuples
	Pending [][2]int // ditto
	Other   [][2]int // ditto, for jobs in some strange state
}

CmdStatus lets you describe how many of a given cmd are already in the job scheduler, and gives the details of those jobs.

type ConfigKubernetes added in v0.16.0

type ConfigKubernetes struct {
	// The image name (Docker Hub) to pull to run wr Runners with. Defaults to
	// 'ubuntu:latest'
	Image string

	// By default, containers in pods run as root. To run as a different user,
	// specify here.
	User string

	// Requested RAM, a pod will default to 64m, and be allocated more up to a
	// limit
	RAM int

	// Requested Disk space, in GB
	Disk int

	// PostCreationScript is the []byte content of a script you want executed
	// after a server is Spawn()ed. (Overridden during Schedule() by a
	// Requirements.Other["cloud_script"] value.)
	PostCreationScript []byte

	// ConfigMap to use in place of PostCreationScript
	ConfigMap string

	// ConfigFiles is a comma separated list of paths to config files that
	// should be copied over to all spawned pods. To handle a config file
	// that should remain relative to the home directory (and where the spawned
	// server may have a different username and thus home directory path
	// compared to the current server), use the prefix ~/ to signify the home
	// directory. It silently ignores files that don't exist locally.
	ConfigFiles string

	// DNSNameServers specifies any additional DNS nameservers to use. By
	// default kubernetes uses kubedns, and those set at cluster deployment by
	// a cluster administrator. See
	// https://kubernetes.io/docs/concepts/services-networking/dns-pod-service/#pod-s-dns-config
	// for more details.
	DNSNameServers []string

	// Shell is the name of the shell to use, but only 'bash' is really
	// guaranteed to work.
	Shell string

	// StateUpdateFrequency is the frequency at which to check spawned servers
	// that are being used to run things, to see if they're still alive. 0
	// (default) is treated as 1 minute.
	StateUpdateFrequency time.Duration

	// Namespace to initialise clientsets to
	Namespace string

	// TempMountPath defines the path at which to copy the wr binary to in the
	// container. It points to an empty volume shared between the init container
	// and main container and is where copied files are stored. It should always
	// be the same as what's currently running on the manager otherwise the cmd
	// passed to runCmd() will be incorrect. Also defined as $HOME
	TempMountPath string

	// LocalBinaryPath points to where the wr binary will be accessed to copy to
	// each pod. It should be generated by the invoking command.
	LocalBinaryPath string

	// Manager Directory to log to
	ManagerDir string

	// Debug mode sets requested resources at 1/10th the integer value. Useful
	// for testing.
	Debug bool
}

ConfigKubernetes holds the configuration options for the kubernetes wr driver.

func (*ConfigKubernetes) AddConfigFile added in v0.16.0

func (c *ConfigKubernetes) AddConfigFile(configFile string)

AddConfigFile takes a value as per the ConfigFiles property, and appends it to the existing ConfigFiles value (or sets it if unset).

func (*ConfigKubernetes) GetOSUser added in v0.16.0

func (c *ConfigKubernetes) GetOSUser() string

GetOSUser returns User, to meet the CloudConfig interface.

func (*ConfigKubernetes) GetServerKeepTime added in v0.16.0

func (c *ConfigKubernetes) GetServerKeepTime() time.Duration

GetServerKeepTime exists to meet the CloudConfig interface, but is not relevant here, so it always returns the zero value.

type ConfigLSF

type ConfigLSF struct {
	// Deployment is one of "development" or "production".
	Deployment string

	// Shell is the shell to use to run the commands to interact with your job
	// scheduler; 'bash' is recommended.
	Shell string

	// PrivateKeyPath is the path to your private key that can be used to ssh
	// to LSF farm nodes to check on jobs if they become non-responsive.
	PrivateKeyPath string
}

ConfigLSF represents the configuration options required by the LSF scheduler. All are required with no usable defaults.

type ConfigLocal

type ConfigLocal struct {
	// Shell is the shell to use to run your commands with; 'bash' is
	// recommended.
	Shell string

	// StateUpdateFrequency is the frequency at which to re-check the queue to
	// see if anything can now run. 0 (default) is treated as 1 minute.
	StateUpdateFrequency time.Duration

	// MaxCores is the maximum number of CPU cores on the machine to use for
	// running jobs. Specifying more cores than the machine has results in using
	// as many cores as the machine has, which is also the default. Values
	// below 1 are treated as default.
	MaxCores int

	// MaxRAM is the maximum amount of machine memory to use for running jobs.
	// The unit is in MB, and defaults to all available memory. Specifying more
	// than this uses the default amount. Values below 1 are treated as default.
	MaxRAM int
}

ConfigLocal represents the configuration options required by the local scheduler. All are required with no usable defaults, unless otherwise noted.
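The MaxCores rule described above (values below 1, or above what the machine has, fall back to the machine's core count) can be sketched as follows. effectiveCores is a hypothetical helper written for illustration, not the package's implementation, and using runtime.NumCPU() as the machine's core count is an assumption.

```go
package main

import (
	"fmt"
	"runtime"
)

// effectiveCores returns the number of cores that would be used for a given
// MaxCores setting on a machine with `available` cores: values below 1 or
// above `available` are treated as "use all available cores".
func effectiveCores(maxCores, available int) int {
	if maxCores < 1 || maxCores > available {
		return available
	}
	return maxCores
}

func main() {
	n := runtime.NumCPU() // assumed source of the machine's core count
	fmt.Println(effectiveCores(0, n), effectiveCores(1, n), effectiveCores(n+10, n))
}
```

The same clamping shape would apply to MaxRAM, with available memory in MB in place of the core count.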

type ConfigOpenStack

type ConfigOpenStack struct {
	// ResourceName is the resource name prefix used to name any resources (such
	// as keys, security groups and servers) that need to be created.
	ResourceName string

	// OSPrefix is the prefix or full name of the Operating System image you
	// wish spawned servers to run by default (overridden during Schedule() by a
	// Requirements.Other["cloud_os"] value)
	OSPrefix string

	// OSUser is the login username of your chosen Operating System from
	// OSPrefix. (Overridden during Schedule() by a
	// Requirements.Other["cloud_user"] value.)
	OSUser string

	// OSRAM is the minimum RAM in MB needed to bring up a server instance that
	// runs your Operating System image. It defaults to 2048. (Overridden during
	// Schedule() by a Requirements.Other["cloud_os_ram"] value.)
	OSRAM int

	// OSDisk is the minimum disk in GB with which to bring up a server instance
	// that runs your Operating System image. It defaults to 1. (Overridden
	// during Schedule() by a Requirements.Disk value.)
	OSDisk int

	// FlavorRegex is a regular expression that you can use to limit what
	// flavors of server will be created to run commands on. The default of an
	// empty string means there is no limit, and any available flavor can be
	// used. (The flavor chosen for a command will be the flavor with the least
	// specifications (RAM, CPUs, Disk) capable of running the command, that
	// also satisfies this regex.)
	FlavorRegex string

	// FlavorSets is used to describe sets of flavors that will only run on
	// certain subsets of your available hardware. If a flavor in set 1 is
	// chosen, but OpenStack reports it isn't possible to create a server with
	// that flavor because there is no more available hardware to back it, then
	// the next best flavor in a different flavor set will be attempted. The
	// value here is a string in the form f1,f2;f3,f4 where f1 and f2 are in the
	// same set, and f3 and f4 are in a different set. The names of each flavor
	// are treated as regular expressions, so you may be able to describe all
	// the flavors in a set with a single entry.
	FlavorSets string

	// PostCreationScript is the []byte content of a script you want executed
	// after a server is Spawn()ed. (Overridden during Schedule() by a
	// Requirements.Other["cloud_script"] value.)
	PostCreationScript []byte

	// PostCreationForcedCommand is a command you want to always execute after
	// a server is Spawn()ed, regardless of any
	// Requirements.Other["cloud_script"] value. Unlike PostCreationScript, this
	// command will be run after the executable in the spawn cmd has been
	// uploaded to the server.
	PostCreationForcedCommand string

	// PreDestroyScript is the []byte content of a script you want executed
	// on a server before it is destroyed.
	PreDestroyScript []byte

	// ConfigFiles is a comma separated list of paths to config files that
	// should be copied over to all spawned servers. Absolute paths are copied
	// over to the same absolute path on the new server. To handle a config file
	// that should remain relative to the home directory (and where the spawned
	// server may have a different username and thus home directory path
	// compared to the current server), use the prefix ~/ to signify the home
	// directory. It silently ignores files that don't exist locally.
	// (Appended to during Schedule() by a
	// Requirements.Other["cloud_config_files"] value.)
	ConfigFiles string

	// SavePath is an absolute path to a file on disk where details of any
	// created resources can be read from and written to.
	SavePath string

	// ServerKeepTime is the time to wait before an idle server is destroyed.
	// Zero duration means "never destroy due to being idle".
	ServerKeepTime time.Duration

	// StateUpdateFrequency is the frequency at which to check spawned servers
	// that are being used to run things, to see if they're still alive.
	// 0 (default) is treated as 1 minute.
	StateUpdateFrequency time.Duration

	// MaxInstances is the maximum number of instances we are allowed to spawn.
	// -1 means we will be limited by your quota, if any. 0 (the default) means
	// no additional instances will be spawned (commands will run locally on the
	// same instance the manager is running on).
	MaxInstances int

	// SimultaneousSpawns is the maximum number of instances we are allowed to
	// try and spawn simultaneously. 0 (the default) means unlimited. 1 would
	// mean all spawns occur sequentially, which may be more reliable, but would
	// result in very slow scale up.
	SimultaneousSpawns int

	// MaxLocalCores is the maximum number of cores that can be used to run
	// commands on the same instance the manager is running on. -1 (the default)
	// means all cores can be used. 0 will only allow 0 core cmds to run on it.
	// To distinguish "not defined" from 0, the value is a reference to an int.
	MaxLocalCores *int

	// MaxLocalRAM is the maximum number of MB of memory that can be used to run
	// commands on the same instance the manager is running on. -1 (the default)
	// means all memory can be used. 0 disables running commands on the
	// manager's instance. To distinguish "not defined" from 0, the value is a
	// reference to an int.
	MaxLocalRAM *int

	// Shell is the shell to use to run your commands with; 'bash' is
	// recommended.
	Shell string

	// ServerPorts are the TCP port numbers you need to be open for
	// communication with any spawned servers. At a minimum you will need to
	// specify []int{22}, unless the network you use has all ports open and does
	// not support applying security groups to servers, in which case you must
	// supply an empty slice.
	ServerPorts []int

	// UseConfigDrive, if set to true (default false), will cause all newly
	// spawned servers to mount a configuration drive, which is typically needed
	// for a network without DHCP.
	UseConfigDrive bool

	// CIDR describes the range of network ips that can be used to spawn
	// OpenStack servers on which to run our commands. The default is
	// "192.168.64.0/18", which allows for 16384 servers to be spawned. This
	// range ends at 192.168.127.255. If already in OpenStack, this chooses
	// which existing network (that the current host is attached to) to use.
	// Otherwise, this results in the creation of an appropriately configured
	// network and subnet.
	CIDR string

	// GatewayIP is the gateway ip address for the subnet that will be created
	// with the given CIDR. It defaults to 192.168.64.1.
	GatewayIP string

	// DNSNameServers is a slice of DNS IP addresses to use for lookups on the
	// created subnet. It defaults to Google's: []string{"8.8.4.4", "8.8.8.8"}.
	DNSNameServers []string

	// Umask is an optional umask to run remote commands under, to control the
	// permissions of files created on spawned OpenStack servers. If not
	// supplied (0), the umask used will be the default umask of the OSUser
	// user. Note that setting this will result in scheduled commands being
	// executed like `(umask Umask && cmd)`, which may present cross-platform
	// compatibility issues. (But should work on most linux-like systems.)
	Umask int
}

ConfigOpenStack represents the configuration options required by the OpenStack scheduler. All are required with no usable defaults, unless otherwise noted. This struct implements the CloudConfig interface.
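To make the FlavorSets format concrete, here is a rough sketch of how a value in the form f1,f2;f3,f4 could be split into sets of regular expressions. parseFlavorSets and setIndex are hypothetical helpers, not the package's own parsing code, and the flavor names in main are made up for illustration.

```go
package main

import (
	"fmt"
	"regexp"
	"strings"
)

// parseFlavorSets splits a FlavorSets-style string on ';' into sets and on
// ',' into entries, compiling each entry as a regular expression.
func parseFlavorSets(spec string) ([][]*regexp.Regexp, error) {
	var sets [][]*regexp.Regexp
	if spec == "" {
		return sets, nil
	}
	for _, set := range strings.Split(spec, ";") {
		var res []*regexp.Regexp
		for _, name := range strings.Split(set, ",") {
			re, err := regexp.Compile(name)
			if err != nil {
				return nil, err
			}
			res = append(res, re)
		}
		sets = append(sets, res)
	}
	return sets, nil
}

// setIndex returns the index of the first set containing a pattern that
// matches flavor, or -1 if no set matches.
func setIndex(sets [][]*regexp.Regexp, flavor string) int {
	for i, set := range sets {
		for _, re := range set {
			if re.MatchString(flavor) {
				return i
			}
		}
	}
	return -1
}

func main() {
	sets, err := parseFlavorSets(`^m1\..*$;^c2\.small$,^c2\.large$`)
	if err != nil {
		fmt.Println("bad flavor set:", err)
		return
	}
	// prints "0 1 -1"
	fmt.Println(setIndex(sets, "m1.tiny"), setIndex(sets, "c2.large"), setIndex(sets, "x9.huge"))
}
```

Because entries are regular expressions, a single anchored pattern such as ^m1\..*$ can stand in for every flavor in a set.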

func (*ConfigOpenStack) AddConfigFile added in v0.12.0

func (c *ConfigOpenStack) AddConfigFile(configFile string)

AddConfigFile takes a value as per the ConfigFiles property, and appends it to the existing ConfigFiles value (or sets it if unset).

func (*ConfigOpenStack) GetOSUser added in v0.16.0

func (c *ConfigOpenStack) GetOSUser() string

GetOSUser returns OSUser, to meet the CloudConfig interface.

func (*ConfigOpenStack) GetServerKeepTime added in v0.16.0

func (c *ConfigOpenStack) GetServerKeepTime() time.Duration

GetServerKeepTime returns ServerKeepTime, to meet the CloudConfig interface.
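The figures quoted for the default ConfigOpenStack CIDR ("192.168.64.0/18" covering 16384 addresses and ending at 192.168.127.255) can be checked with a few lines of arithmetic. cidrRange is a hypothetical helper for illustration; it handles IPv4 prefixes only, assumes a 64-bit int, and counts every address in the range without subtracting any reserved for gateway or broadcast use.

```go
package main

import (
	"fmt"
	"net/netip"
)

// cidrRange returns the number of addresses in an IPv4 CIDR block and the
// last address in that block.
func cidrRange(cidr string) (count int, last netip.Addr, err error) {
	p, err := netip.ParsePrefix(cidr)
	if err != nil {
		return 0, netip.Addr{}, err
	}
	if !p.Addr().Is4() {
		return 0, netip.Addr{}, fmt.Errorf("%s is not an IPv4 prefix", cidr)
	}
	p = p.Masked() // zero the host bits so we start at the network address
	count = 1 << (32 - p.Bits())

	// Convert the network address to a uint32, add count-1, convert back.
	b := p.Addr().As4()
	v := uint32(b[0])<<24 | uint32(b[1])<<16 | uint32(b[2])<<8 | uint32(b[3])
	v += uint32(count - 1)
	last = netip.AddrFrom4([4]byte{byte(v >> 24), byte(v >> 16), byte(v >> 8), byte(v)})
	return count, last, nil
}

func main() {
	count, last, err := cidrRange("192.168.64.0/18")
	if err != nil {
		fmt.Println(err)
		return
	}
	// prints "16384 192.168.127.255"
	fmt.Println(count, last)
}
```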

type Error

type Error struct {
	Scheduler string // the scheduler's Name
	Op        string // name of the method
	Err       string // one of our Err* vars
}

Error records an error and the operation and scheduler that caused it.

func (Error) Error

func (e Error) Error() string

type Host added in v0.24.0

type Host interface {
	// RunCmd runs the given cmd on the host, optionally in the background,
	// cancellable with the context, returning stdout, stderr from the command,
	// or an error if running the command wasn't possible.
	RunCmd(ctx context.Context, cmd string, background bool) (stdout, stderr string, err error)
}

The Host interface lets us run a command on a local or remote host.
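For illustration only, a local implementation of this interface might look like the sketch below. localHost and runLocal are hypothetical names, running the command through "sh -c" is an assumption, and so is the interpretation of background as "start the command and return immediately without capturing output"; the package's real implementations may behave differently.

```go
package main

import (
	"bytes"
	"context"
	"fmt"
	"os/exec"
)

// Host is copied from this page so the sketch compiles on its own.
type Host interface {
	RunCmd(ctx context.Context, cmd string, background bool) (stdout, stderr string, err error)
}

// localHost is a hypothetical Host that runs commands on the current machine.
type localHost struct{}

// RunCmd runs cmd via "sh -c". In the foreground it waits and returns the
// captured output; in the background it only starts the command.
func (localHost) RunCmd(ctx context.Context, cmd string, background bool) (string, string, error) {
	c := exec.CommandContext(ctx, "sh", "-c", cmd)
	if background {
		if err := c.Start(); err != nil {
			return "", "", err
		}
		go c.Wait() // reap the process when it exits; output is discarded
		return "", "", nil
	}
	var out, errOut bytes.Buffer
	c.Stdout = &out
	c.Stderr = &errOut
	err := c.Run()
	return out.String(), errOut.String(), err
}

// runLocal is a convenience wrapper that runs cmd in the foreground with a
// background context.
func runLocal(cmd string) (string, string, error) {
	var h Host = localHost{}
	return h.RunCmd(context.Background(), cmd, false)
}

func main() {
	stdout, stderr, err := runLocal("echo hello")
	fmt.Printf("stdout=%q stderr=%q err=%v\n", stdout, stderr, err)
}
```

Because the command is created with exec.CommandContext, cancelling the context kills the process, which matches the "cancellable with the context" wording in the interface comment.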

type MessageCallBack added in v0.10.0

type MessageCallBack func(msg string)

MessageCallBack functions receive a message that would be good to display to end users, so they understand current error conditions related to the scheduler.

type RecoveredHostDetails added in v0.16.0

type RecoveredHostDetails struct {
	Host     string        // host's hostname
	UserName string        // username needed to ssh log in to host
	TTD      time.Duration // frequency to check if the host is idle, and if so destroy it
}

RecoveredHostDetails lets you describe a host for supplying to Recover(). Not all fields are relevant for all schedulers. Some might use none, so a nil RecoveredHostDetails might be valid. Cloud schedulers need all fields specified.

type Requirements

type Requirements struct {
	RAM      int               // the expected peak RAM in MB Cmd will use while running
	Time     time.Duration     // the expected time Cmd will take to run
	Cores    float64           // how many processor cores the Cmd will use
	Disk     int               // the required local disk space in GB the Cmd needs to run
	Other    map[string]string // a map that will be passed through to the job scheduler, defining further arbitrary resource requirements
	CoresSet bool              // to distinguish between you specifying 0 Cores and not specifying Cores at all
	DiskSet  bool              // to distinguish between you specifying 0 Disk and not specifying Disk at all
	OtherSet bool
}

Requirements describes the resource requirements of the commands you want to run, so that when provided to a scheduler it will be able to schedule things appropriately.

func (*Requirements) Clone added in v0.18.1

func (req *Requirements) Clone() *Requirements

Clone creates a copy of the Requirements.

func (*Requirements) Stringify added in v0.4.0

func (req *Requirements) Stringify() string

Stringify represents the contents of the Requirements as a string, sorting the keys of Other to ensure the same result is returned for the same content every time. Note that the data in Other undergoes a 1-way transformation, so you cannot recreate the Requirements from the output of this method.
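Sorting the keys matters because Go randomises map iteration order, so naive serialisation of Other could differ between calls. The sketch below shows only that sorting idea; stableString is a hypothetical helper, its key=value; output format is made up, and it does not reproduce the 1-way transformation that Stringify applies.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// stableString serialises a map deterministically by visiting its keys in
// sorted order.
func stableString(other map[string]string) string {
	keys := make([]string, 0, len(other))
	for k := range other {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	var sb strings.Builder
	for _, k := range keys {
		sb.WriteString(k)
		sb.WriteByte('=')
		sb.WriteString(other[k])
		sb.WriteByte(';')
	}
	return sb.String()
}

func main() {
	other := map[string]string{"cloud_user": "ubuntu", "cloud_os": "focal"}
	// prints "cloud_os=focal;cloud_user=ubuntu;"
	fmt.Println(stableString(other))
}
```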

type Scheduler

type Scheduler struct {
	Name string

	sync.Mutex
	// contains filtered or unexported fields
}

Scheduler gives you access to all of the methods you'll need to interact with a job scheduler.

func New

func New(ctx context.Context, name string, config interface{}) (*Scheduler, error)

New creates a new Scheduler to interact with the given job scheduler. Possible names so far are "lsf", "local", "openstack" and "kubernetes". You must also provide a config struct appropriate for your chosen scheduler, eg. for the local scheduler you will provide a ConfigLocal.

Providing a logger allows for debug messages to be logged somewhere, along with any "harmless" or unreturnable errors. If not supplied, we use a default logger that discards all log messages.

func (*Scheduler) Busy

func (s *Scheduler) Busy(ctx context.Context) bool

Busy reports true if there are any Schedule()d cmds still in the job scheduler's system. This is useful when testing, and in other situations where you want to avoid shutting down the server while there are still clients running or about to run.

func (*Scheduler) Cleanup

func (s *Scheduler) Cleanup(ctx context.Context)

Cleanup means you've finished using a scheduler and it can delete any remaining jobs in its system and clean up any other used resources.

func (*Scheduler) GetHost added in v0.32.0

func (s *Scheduler) GetHost(hostName string) Host

GetHost will return a Host with the given host name. For cloud-based schedulers, you can cast the return value as a *cloud.Server. Returns nil on error (if a host with the given name doesn't exist).

func (*Scheduler) HostToID added in v0.10.0

func (s *Scheduler) HostToID(host string) string

HostToID will return the server id of the server with the given host name, if the scheduler is cloud based. Otherwise this just returns an empty string.

func (*Scheduler) MaxQueueTime

func (s *Scheduler) MaxQueueTime(req *Requirements) time.Duration

MaxQueueTime returns the maximum amount of time that jobs with the given resource requirements are allowed to run for in the job scheduler's queue. If the job scheduler doesn't have a queue system, or if the queue allows jobs to run forever, then this returns req.Time + 15 mins.

func (*Scheduler) ProcessNotRunngingOnHost added in v0.24.0

func (s *Scheduler) ProcessNotRunngingOnHost(ctx context.Context, pid int, hostName string) bool

ProcessNotRunngingOnHost will ssh to the given host and check if the given process id is still running. Returns true if it isn't. Returns false if it is running, or if the ssh wasn't possible. This is to find out if a process is really dead, or if there might just be a temporary networking problem where ssh might fail. The ssh attempt can be cancelled using the supplied context.

func (*Scheduler) Recover added in v0.16.0

func (s *Scheduler) Recover(ctx context.Context, cmd string, req *Requirements, host *RecoveredHostDetails) error

Recover is used if you had Scheduled some cmds, then you crashed, and now you're starting up again and want the scheduler to take into account the fact that you still have some commands running on certain hosts. Doing this may allow us to avoid overcommitting resources, or to terminate unneeded hosts, if relevant for the scheduler in question. (For some schedulers, this does nothing.)

The cmd and req ought to exactly match those previously supplied to Schedule() before your crash.

func (*Scheduler) ReserveTimeout

func (s *Scheduler) ReserveTimeout(ctx context.Context, req *Requirements) int

ReserveTimeout returns the number of seconds that runners spawned in this scheduler should wait for new jobs to appear in the manager's queue.

func (*Scheduler) Schedule

func (s *Scheduler) Schedule(ctx context.Context, cmd string, req *Requirements, priority uint8, count int) error

Schedule gets your cmd scheduled in the job scheduler. You give it a command that you would like `count` identical instances of running via your job scheduler. If you already have `count` many scheduled, it will do nothing. If you have fewer than `count`, it will schedule more to run. If you have more than `count`, it will remove the appropriate number of scheduled (but not yet running) jobs that were previously scheduled for this same cmd. A count of 0 is legitimate: it will get rid of all non-running jobs for the cmd.

Typically schedulers will end up running cmds according to their "size" (cpu and memory needed as per the req), with larger cmds running first due to bin packing. Some schedulers will take the given priority into account and try to run cmds with higher priorities before those with lower ones. Equal priority jobs will use the normal approach.

If no error is returned, you know all `count` of your jobs are now scheduled and will eventually run unless you call Schedule() again with the same command and a lower count. NB: there is no guarantee that the jobs run successfully, and no feedback on their success or failure is given.
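The count semantics described for Schedule() can be summarised with a small piece of arithmetic. reconcile is a hypothetical helper, not the package's code, and it assumes that running jobs count towards the total being compared against `count`; the documentation above does not say so explicitly.

```go
package main

import "fmt"

// reconcile returns how many jobs would need to be added or removed to reach
// `want`, given `pending` (scheduled but not yet running) and `running` jobs
// for the same cmd. Only pending jobs are ever removed.
func reconcile(pending, running, want int) (add, remove int) {
	have := pending + running
	switch {
	case have < want:
		return want - have, 0
	case have > want:
		excess := have - want
		if excess > pending {
			excess = pending // running jobs are left alone
		}
		return 0, excess
	default:
		return 0, 0
	}
}

func main() {
	fmt.Println(reconcile(3, 2, 24)) // prints "19 0"
	fmt.Println(reconcile(10, 5, 8)) // prints "0 7"
	fmt.Println(reconcile(2, 10, 0)) // prints "0 2"
}
```

The last call shows the count of 0 case: every pending job is removed, while the 10 running jobs are untouched.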

func (*Scheduler) Scheduled added in v0.22.0

func (s *Scheduler) Scheduled(ctx context.Context, cmd string) (int, error)

Scheduled tells you how many of the given cmd are currently scheduled in the scheduler.

func (*Scheduler) SetBadServerCallBack added in v0.10.0

func (s *Scheduler) SetBadServerCallBack(ctx context.Context, cb BadServerCallBack)

SetBadServerCallBack sets the function that will be called when a cloud scheduler discovers that one of the servers it spawned seems to no longer be functional or reachable. Only relevant for cloud schedulers.

func (*Scheduler) SetMessageCallBack added in v0.10.0

func (s *Scheduler) SetMessageCallBack(ctx context.Context, cb MessageCallBack)

SetMessageCallBack sets the function that will be called when a scheduler has some message that could be informative to end users wondering why something is not getting scheduled. The message typically describes an error condition.
