Documentation ¶
Overview ¶
Package server provides the main job scheduling interface for Scoot.
Package server is a generated GoMock package.
Index ¶
- Constants
- Variables
- func GetRequestorClass(requestor string, requestorToClassMap map[string]string) string
- func NewStatefulScheduler(nodesUpdatesCh chan []cc.NodeUpdate, sc saga.SagaCoordinator, rf RunnerFactory, ...) *statefulScheduler
- type LoadBasedAlg
- type LoadBasedAlgConfig
- type MockScheduler
- func (m *MockScheduler) EXPECT() *MockSchedulerMockRecorder
- func (m *MockScheduler) GetClassLoadPercents() (map[string]int32, error)
- func (m *MockScheduler) GetRebalanceMinimumDuration() (time.Duration, error)
- func (m *MockScheduler) GetRebalanceThreshold() (int32, error)
- func (m *MockScheduler) GetRequestorToClassMap() (map[string]string, error)
- func (m *MockScheduler) GetSagaCoord() saga.SagaCoordinator
- func (m *MockScheduler) GetSchedulerStatus() (int, int)
- func (m *MockScheduler) KillJob(jobId string) error
- func (m *MockScheduler) OfflineWorker(req domain.OfflineWorkerReq) error
- func (m *MockScheduler) ReinstateWorker(req domain.ReinstateWorkerReq) error
- func (m *MockScheduler) ScheduleJob(jobDef domain.JobDefinition) (string, error)
- func (m *MockScheduler) SetClassLoadPercents(classLoads map[string]int32) error
- func (m *MockScheduler) SetRebalanceMinimumDuration(durationMin time.Duration) error
- func (m *MockScheduler) SetRebalanceThreshold(durationMin int32) error
- func (m *MockScheduler) SetRequestorToClassMap(requestorToClassMap map[string]string) error
- func (m *MockScheduler) SetSchedulerStatus(maxTasks int) error
- type MockSchedulerMockRecorder
- func (mr *MockSchedulerMockRecorder) GetClassLoadPercents() *gomock.Call
- func (mr *MockSchedulerMockRecorder) GetRebalanceMinimumDuration() *gomock.Call
- func (mr *MockSchedulerMockRecorder) GetRebalanceThreshold() *gomock.Call
- func (mr *MockSchedulerMockRecorder) GetRequestorToClassMap() *gomock.Call
- func (mr *MockSchedulerMockRecorder) GetSagaCoord() *gomock.Call
- func (mr *MockSchedulerMockRecorder) GetSchedulerStatus() *gomock.Call
- func (mr *MockSchedulerMockRecorder) KillJob(jobId interface{}) *gomock.Call
- func (mr *MockSchedulerMockRecorder) OfflineWorker(req interface{}) *gomock.Call
- func (mr *MockSchedulerMockRecorder) ReinstateWorker(req interface{}) *gomock.Call
- func (mr *MockSchedulerMockRecorder) ScheduleJob(jobDef interface{}) *gomock.Call
- func (mr *MockSchedulerMockRecorder) SetClassLoadPercents(classLoads interface{}) *gomock.Call
- func (mr *MockSchedulerMockRecorder) SetRebalanceMinimumDuration(durationMin interface{}) *gomock.Call
- func (mr *MockSchedulerMockRecorder) SetRebalanceThreshold(durationMin interface{}) *gomock.Call
- func (mr *MockSchedulerMockRecorder) SetRequestorToClassMap(requestorToClassMap interface{}) *gomock.Call
- func (mr *MockSchedulerMockRecorder) SetSchedulerStatus(maxTasks interface{}) *gomock.Call
- type MockSchedulingAlgorithm
- type MockSchedulingAlgorithmMockRecorder
- type PersistedSettings
- type Persistor
- type ReadyFn
- type RunnerFactory
- type Scheduler
- type SchedulerConfiguration
- type SchedulingAlgorithm
Constants ¶
const (
	// Clients will check for this string to differentiate between scoot and user initiated actions.
	UserRequestedErrStr = "UserRequested"

	// Nothing should run forever by default, use this timeout as a fallback.
	DefaultDefaultTaskTimeout = 30 * time.Minute

	// Allow extra time when waiting for a task response.
	// This includes network time and the time to upload logs to bundlestore.
	DefaultTaskTimeoutOverhead = 15 * time.Second

	// Number of different requestors that can run jobs at any given time.
	DefaultMaxRequestors = 10

	// Number of jobs any single requestor can have (to prevent spamming, not for scheduler fairness).
	DefaultMaxJobsPerRequestor = 100

	// Set the maximum number of tasks we'd expect to queue to a nonzero value (it'll be overridden later).
	DefaultSoftMaxSchedulableTasks = 1

	// Threshold for jobs considered long running
	LongJobDuration = 4 * time.Hour

	// How often Scheduler step is called in loop
	TickRate = 250 * time.Millisecond

	// The max job priority we respect (higher priority is untested and disabled)
	MaxPriority = domain.P2

	// Max number of requestors to track tag history, and max number of tags per requestor to track
	DefaultMaxRequestorHistories = 1000000
	DefaultMaxHistoryTags = 100

	// Max number of task IDs to track durations for
	DefaultMaxTaskDurations = 1000000
)
const DeadLetterTrailer = " -> Error(s) encountered, canceling task."
const RebalanceRequestedErrStr = "RebalanceRequested"
Clients will check for this string to differentiate between scoot and user initiated actions.
Variables ¶
var (
	DefaultLoadBasedSchedulerClassPercents = map[string]int32{
		"land":       40,
		"diff":       25,
		"sandbox":    10,
		"regression": 17,
		"ktf":        3,
		"coverage":   2,
		"tryout":     2,
		"unknown":    1,
	}
	DefaultRequestorToClassMap = map[string]string{
		"land.*":       "land",
		"diff.*":       "diff",
		"sandbox.*":    "sandbox",
		"regression.*": "regression",
		"CI.*":         "regression",
		"jenkins.*":    "ktf",
		"ktf.*":        "ktf",
		"coverage.*":   "coverage",
		"tryout.*":     "tryout",
	}
	DefaultMinRebalanceTime = time.Duration(4 * time.Minute)
	MaxTaskDuration         = time.Duration(4 * time.Hour)
)
Defaults for the LoadBasedScheduler algorithm: the per-class load percents and the requestor patterns that map jobs to those classes.
Functions ¶
func GetRequestorClass ¶
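func GetRequestorClass(requestor string, requestorToClassMap map[string]string) string
GetRequestorClass finds the requestorToClassMap entry that matches requestor. The keys of requestorToClassMap are regular expressions. If no match is found, it returns "" for the class name.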
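A regular-expression lookup of this kind can be sketched as follows. This is not the package's implementation: `requestorClass` is a hypothetical stand-in, and the sorted pattern order, the unanchored matching, and the skipping of invalid patterns are all assumptions made so the sketch behaves the same way on every run.

```go
package main

import (
	"fmt"
	"regexp"
	"sort"
)

// requestorClass is a hypothetical stand-in for GetRequestorClass.
// It returns the class of the first pattern that matches requestor,
// or "" when no pattern matches.
func requestorClass(requestor string, requestorToClassMap map[string]string) string {
	patterns := make([]string, 0, len(requestorToClassMap))
	for p := range requestorToClassMap {
		patterns = append(patterns, p)
	}
	// Assumption: try patterns in sorted order, because Go map iteration
	// order is random and overlapping patterns would otherwise be ambiguous.
	sort.Strings(patterns)

	for _, p := range patterns {
		re, err := regexp.Compile(p)
		if err != nil {
			continue // assumption: ignore patterns that do not compile
		}
		// Assumption: unanchored match, as regexp.MatchString would do.
		if re.MatchString(requestor) {
			return requestorToClassMap[p]
		}
	}
	return ""
}

func main() {
	m := map[string]string{
		"land.*":    "land",
		"CI.*":      "regression",
		"jenkins.*": "ktf",
	}
	fmt.Println(requestorClass("land_queue", m))   // prints land
	fmt.Println(requestorClass("CI-nightly", m))   // prints regression
	fmt.Printf("%q\n", requestorClass("adhoc", m)) // prints ""
}
```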
func NewStatefulScheduler ¶
func NewStatefulScheduler(
	nodesUpdatesCh chan []cc.NodeUpdate,
	sc saga.SagaCoordinator,
	rf RunnerFactory,
	config SchedulerConfiguration,
	stat stats.StatsReceiver,
	persistor Persistor,
	durationKeyExtractorFn func(string) string) *statefulScheduler
NewStatefulScheduler creates a new StatefulScheduler that implements the Scheduler interface.
- cc.Cluster - cluster of worker nodes
- saga.SagaCoordinator - the Saga Coordinator to log to and recover from
- RunnerFactory - function which converts a node to a Runner
- SchedulerConfig - additional configuration settings for the scheduler
- StatsReceiver - stats receiver to log statistics to
If debugMode is true, the scheduler starts up but does not start the update loop. Instead, the loop must be advanced manually by calling step(). This mode is intended for debugging and test cases.
If recoverJobsOnStartup is true, active sagas in the saga log are recovered and rescheduled. Otherwise no recovery is done on startup.
Types ¶
type LoadBasedAlg ¶
type LoadBasedAlg struct {
// contains filtered or unexported fields
}
LoadBasedAlg is the scheduling algorithm that computes the list of tasks to start and stop for the current iteration of the scheduler loop.
The algorithm uses the classLoadPercents, the number of tasks currently running for each class, and the number of available workers when computing the tasks to start and stop.
The algorithm has 3 main phases: rebalancing, entitlement allocation, and loaning allocation.
- The rebalancing phase is only triggered when the number of tasks running for each class differs significantly from the class load percents. When rebalancing runs, it selects the set of tasks that need to be stopped to bring classes that are over their target load percents back to those targets, and the tasks that can be started to replace the stopped tasks. Note: there may still be idle workers after the rebalance tasks are started and stopped. The next scheduling iteration will find tasks for these workers.
- The entitlement phase identifies the number of tasks to start to bring the number of running tasks for each class closer to its entitlement, as defined in the classLoadPercents.
- The loan phase identifies the number of tasks over a class's entitlement that can be started to use up workers left unused because other classes are not using their entitlement.
See README.md for more details
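The entitlement and loan phases described above can be illustrated with a small sketch. It is not the package's algorithm: `classLoad` and `planStarts` are hypothetical names, the rebalancing phase is left out, entitlements are rounded down, and loans are handed out in a fixed order (largest percent first), all of which are simplifying assumptions.

```go
package main

import (
	"fmt"
	"sort"
)

// classLoad is a hypothetical, simplified view of one requestor class.
type classLoad struct {
	percent int // target share of all workers, e.g. 40 for "land"
	running int // tasks currently running
	waiting int // tasks waiting to be started
}

func minInt(a, b int) int {
	if a < b {
		return a
	}
	return b
}

// planStarts returns, per class, how many waiting tasks to start.
func planStarts(classes map[string]classLoad, totalWorkers int) map[string]int {
	names := make([]string, 0, len(classes))
	busy := 0
	for name, c := range classes {
		names = append(names, name)
		busy += c.running
	}
	// Assumption: fixed order (largest percent first, then by name).
	sort.Slice(names, func(i, j int) bool {
		a, b := classes[names[i]], classes[names[j]]
		if a.percent != b.percent {
			return a.percent > b.percent
		}
		return names[i] < names[j]
	})

	idle := totalWorkers - busy
	if idle < 0 {
		idle = 0
	}
	starts := make(map[string]int, len(classes))
	left := make(map[string]int, len(classes)) // waiting tasks not yet planned

	// Entitlement phase: bring each class up towards its share of the workers.
	for _, name := range names {
		c := classes[name]
		entitled := totalWorkers * c.percent / 100 // assumption: round down
		n := minInt(minInt(c.waiting, entitled-c.running), idle)
		if n < 0 {
			n = 0 // class is already at or over its entitlement
		}
		starts[name] = n
		left[name] = c.waiting - n
		idle -= n
	}

	// Loan phase: lend workers that other classes are not using.
	for _, name := range names {
		n := minInt(left[name], idle)
		starts[name] += n
		idle -= n
	}
	return starts
}

func main() {
	starts := planStarts(map[string]classLoad{
		"land":    {percent: 50, running: 10, waiting: 60},
		"diff":    {percent: 30, running: 20, waiting: 5},
		"sandbox": {percent: 20, running: 0, waiting: 0},
	}, 100)
	// land gets 40 entitled starts plus 20 loaned from idle diff and sandbox capacity.
	fmt.Println(starts["land"], starts["diff"], starts["sandbox"]) // prints 60 5 0
}
```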
func NewLoadBasedAlg ¶
func NewLoadBasedAlg(config *LoadBasedAlgConfig, tasksByJobClassAndStartTimeSec map[taskClassAndStartKey]taskStateByJobIDTaskID) *LoadBasedAlg
NewLoadBasedAlg allocates a new LoadBasedAlg object.
func (*LoadBasedAlg) GetDataStructureSizeStats ¶
func (lbs *LoadBasedAlg) GetDataStructureSizeStats() map[string]int
func (*LoadBasedAlg) GetTasksToBeAssigned ¶
func (lbs *LoadBasedAlg) GetTasksToBeAssigned(jobsNotUsed []*jobState, stat stats.StatsReceiver, cs *clusterState, jobsByRequestor map[string][]*jobState) ([]*taskState, []*taskState)
GetTasksToBeAssigned is the entry point to the load-based scheduling algorithm. The algorithm uses the classLoadPercents, the number of tasks currently running for each class, and the number of available workers when computing the tasks to start and stop. It has 3 main phases: rebalancing, entitlement allocation, and loaning allocation.
func (*LoadBasedAlg) LocalCopyClassLoadPercents ¶
func (lbs *LoadBasedAlg) LocalCopyClassLoadPercents() map[string]int
LocalCopyClassLoadPercents returns a copy of the ClassLoadPercents, leaving the values as int.
type LoadBasedAlgConfig ¶
type LoadBasedAlgConfig struct {
// contains filtered or unexported fields
}
type MockScheduler ¶
type MockScheduler struct {
// contains filtered or unexported fields
}
MockScheduler is a mock of Scheduler interface
func NewMockScheduler ¶
func NewMockScheduler(ctrl *gomock.Controller) *MockScheduler
NewMockScheduler creates a new mock instance
func (*MockScheduler) EXPECT ¶
func (m *MockScheduler) EXPECT() *MockSchedulerMockRecorder
EXPECT returns an object that allows the caller to indicate expected use
func (*MockScheduler) GetClassLoadPercents ¶
func (m *MockScheduler) GetClassLoadPercents() (map[string]int32, error)
GetClassLoadPercents mocks base method
func (*MockScheduler) GetRebalanceMinimumDuration ¶
func (m *MockScheduler) GetRebalanceMinimumDuration() (time.Duration, error)
GetRebalanceMinimumDuration mocks base method
func (*MockScheduler) GetRebalanceThreshold ¶
func (m *MockScheduler) GetRebalanceThreshold() (int32, error)
GetRebalanceThreshold mocks base method
func (*MockScheduler) GetRequestorToClassMap ¶
func (m *MockScheduler) GetRequestorToClassMap() (map[string]string, error)
GetRequestorToClassMap mocks base method
func (*MockScheduler) GetSagaCoord ¶
func (m *MockScheduler) GetSagaCoord() saga.SagaCoordinator
GetSagaCoord mocks base method
func (*MockScheduler) GetSchedulerStatus ¶
func (m *MockScheduler) GetSchedulerStatus() (int, int)
GetSchedulerStatus mocks base method
func (*MockScheduler) KillJob ¶
func (m *MockScheduler) KillJob(jobId string) error
KillJob mocks base method
func (*MockScheduler) OfflineWorker ¶
func (m *MockScheduler) OfflineWorker(req domain.OfflineWorkerReq) error
OfflineWorker mocks base method
func (*MockScheduler) ReinstateWorker ¶
func (m *MockScheduler) ReinstateWorker(req domain.ReinstateWorkerReq) error
ReinstateWorker mocks base method
func (*MockScheduler) ScheduleJob ¶
func (m *MockScheduler) ScheduleJob(jobDef domain.JobDefinition) (string, error)
ScheduleJob mocks base method
func (*MockScheduler) SetClassLoadPercents ¶
func (m *MockScheduler) SetClassLoadPercents(classLoads map[string]int32) error
SetClassLoadPercents mocks base method
func (*MockScheduler) SetRebalanceMinimumDuration ¶
func (m *MockScheduler) SetRebalanceMinimumDuration(durationMin time.Duration) error
SetRebalanceMinimumDuration mocks base method
func (*MockScheduler) SetRebalanceThreshold ¶
func (m *MockScheduler) SetRebalanceThreshold(durationMin int32) error
SetRebalanceThreshold mocks base method
func (*MockScheduler) SetRequestorToClassMap ¶
func (m *MockScheduler) SetRequestorToClassMap(requestorToClassMap map[string]string) error
SetRequestorToClassMap mocks base method
func (*MockScheduler) SetSchedulerStatus ¶
func (m *MockScheduler) SetSchedulerStatus(maxTasks int) error
SetSchedulerStatus mocks base method
type MockSchedulerMockRecorder ¶
type MockSchedulerMockRecorder struct {
// contains filtered or unexported fields
}
MockSchedulerMockRecorder is the mock recorder for MockScheduler
func (*MockSchedulerMockRecorder) GetClassLoadPercents ¶
func (mr *MockSchedulerMockRecorder) GetClassLoadPercents() *gomock.Call
GetClassLoadPercents indicates an expected call of GetClassLoadPercents
func (*MockSchedulerMockRecorder) GetRebalanceMinimumDuration ¶
func (mr *MockSchedulerMockRecorder) GetRebalanceMinimumDuration() *gomock.Call
GetRebalanceMinimumDuration indicates an expected call of GetRebalanceMinimumDuration
func (*MockSchedulerMockRecorder) GetRebalanceThreshold ¶
func (mr *MockSchedulerMockRecorder) GetRebalanceThreshold() *gomock.Call
GetRebalanceThreshold indicates an expected call of GetRebalanceThreshold
func (*MockSchedulerMockRecorder) GetRequestorToClassMap ¶
func (mr *MockSchedulerMockRecorder) GetRequestorToClassMap() *gomock.Call
GetRequestorToClassMap indicates an expected call of GetRequestorToClassMap
func (*MockSchedulerMockRecorder) GetSagaCoord ¶
func (mr *MockSchedulerMockRecorder) GetSagaCoord() *gomock.Call
GetSagaCoord indicates an expected call of GetSagaCoord
func (*MockSchedulerMockRecorder) GetSchedulerStatus ¶
func (mr *MockSchedulerMockRecorder) GetSchedulerStatus() *gomock.Call
GetSchedulerStatus indicates an expected call of GetSchedulerStatus
func (*MockSchedulerMockRecorder) KillJob ¶
func (mr *MockSchedulerMockRecorder) KillJob(jobId interface{}) *gomock.Call
KillJob indicates an expected call of KillJob
func (*MockSchedulerMockRecorder) OfflineWorker ¶
func (mr *MockSchedulerMockRecorder) OfflineWorker(req interface{}) *gomock.Call
OfflineWorker indicates an expected call of OfflineWorker
func (*MockSchedulerMockRecorder) ReinstateWorker ¶
func (mr *MockSchedulerMockRecorder) ReinstateWorker(req interface{}) *gomock.Call
ReinstateWorker indicates an expected call of ReinstateWorker
func (*MockSchedulerMockRecorder) ScheduleJob ¶
func (mr *MockSchedulerMockRecorder) ScheduleJob(jobDef interface{}) *gomock.Call
ScheduleJob indicates an expected call of ScheduleJob
func (*MockSchedulerMockRecorder) SetClassLoadPercents ¶
func (mr *MockSchedulerMockRecorder) SetClassLoadPercents(classLoads interface{}) *gomock.Call
SetClassLoadPercents indicates an expected call of SetClassLoadPercents
func (*MockSchedulerMockRecorder) SetRebalanceMinimumDuration ¶
func (mr *MockSchedulerMockRecorder) SetRebalanceMinimumDuration(durationMin interface{}) *gomock.Call
SetRebalanceMinimumDuration indicates an expected call of SetRebalanceMinimumDuration
func (*MockSchedulerMockRecorder) SetRebalanceThreshold ¶
func (mr *MockSchedulerMockRecorder) SetRebalanceThreshold(durationMin interface{}) *gomock.Call
SetRebalanceThreshold indicates an expected call of SetRebalanceThreshold
func (*MockSchedulerMockRecorder) SetRequestorToClassMap ¶
func (mr *MockSchedulerMockRecorder) SetRequestorToClassMap(requestorToClassMap interface{}) *gomock.Call
SetRequestorToClassMap indicates an expected call of SetRequestorToClassMap
func (*MockSchedulerMockRecorder) SetSchedulerStatus ¶
func (mr *MockSchedulerMockRecorder) SetSchedulerStatus(maxTasks interface{}) *gomock.Call
SetSchedulerStatus indicates an expected call of SetSchedulerStatus
type MockSchedulingAlgorithm ¶
type MockSchedulingAlgorithm struct {
// contains filtered or unexported fields
}
MockSchedulingAlgorithm is a mock of SchedulingAlgorithm interface
func NewMockSchedulingAlgorithm ¶
func NewMockSchedulingAlgorithm(ctrl *gomock.Controller) *MockSchedulingAlgorithm
NewMockSchedulingAlgorithm creates a new mock instance
func (*MockSchedulingAlgorithm) EXPECT ¶
func (m *MockSchedulingAlgorithm) EXPECT() *MockSchedulingAlgorithmMockRecorder
EXPECT returns an object that allows the caller to indicate expected use
func (*MockSchedulingAlgorithm) GetTasksToBeAssigned ¶
func (m *MockSchedulingAlgorithm) GetTasksToBeAssigned(jobs []*jobState, stat stats.StatsReceiver, cs *clusterState, requestors map[string][]*jobState) ([]*taskState, []*taskState)
GetTasksToBeAssigned mocks base method
type MockSchedulingAlgorithmMockRecorder ¶
type MockSchedulingAlgorithmMockRecorder struct {
// contains filtered or unexported fields
}
MockSchedulingAlgorithmMockRecorder is the mock recorder for MockSchedulingAlgorithm
func (*MockSchedulingAlgorithmMockRecorder) GetTasksToBeAssigned ¶
func (mr *MockSchedulingAlgorithmMockRecorder) GetTasksToBeAssigned(jobs, stat, cs, requestors interface{}) *gomock.Call
GetTasksToBeAssigned indicates an expected call of GetTasksToBeAssigned
type PersistedSettings ¶
type PersistedSettings struct {
	ClassLoadPercents               map[string]int32  `json:"classLoadPercents"`
	RequestorToClassMap             map[string]string `json:"requestorToClassMap"`
	RebalanceMinimumDurationMinutes int               `json:"rebalanceMinimumDurationMinutes"`
	RebalanceThreshold              int               `json:"rebalanceThreshold"`
	Throttle                        int               `json:"throttle"`
}
PersistedSettings is the persisted scheduler settings structure, used for encoding and decoding as JSON.
type Persistor ¶
type Persistor interface {
	PersistSettings(settings *PersistedSettings) error
	LoadSettings() (*PersistedSettings, error)
}
Persistor is the interface for persisting scheduler settings and for initializing the scheduler from its persisted settings.
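As an illustration of the interface, the sketch below keeps the JSON-encoded settings in memory. The PersistedSettings struct and Persistor interface are copied from the declarations on this page so the example is self-contained. `memPersistor` is a hypothetical type, and returning an error when nothing has been persisted yet is an assumption, not documented behavior.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// PersistedSettings and Persistor are copied from the declarations above.
type PersistedSettings struct {
	ClassLoadPercents               map[string]int32  `json:"classLoadPercents"`
	RequestorToClassMap             map[string]string `json:"requestorToClassMap"`
	RebalanceMinimumDurationMinutes int               `json:"rebalanceMinimumDurationMinutes"`
	RebalanceThreshold              int               `json:"rebalanceThreshold"`
	Throttle                        int               `json:"throttle"`
}

type Persistor interface {
	PersistSettings(settings *PersistedSettings) error
	LoadSettings() (*PersistedSettings, error)
}

// memPersistor is a hypothetical Persistor that stores the JSON bytes in memory.
type memPersistor struct {
	data []byte
}

func (p *memPersistor) PersistSettings(settings *PersistedSettings) error {
	b, err := json.Marshal(settings)
	if err != nil {
		return err
	}
	p.data = b
	return nil
}

func (p *memPersistor) LoadSettings() (*PersistedSettings, error) {
	if p.data == nil {
		// Assumption: loading before anything was persisted is an error.
		return nil, errors.New("no settings persisted")
	}
	var s PersistedSettings
	if err := json.Unmarshal(p.data, &s); err != nil {
		return nil, err
	}
	return &s, nil
}

func main() {
	var p Persistor = &memPersistor{}
	err := p.PersistSettings(&PersistedSettings{
		ClassLoadPercents:  map[string]int32{"land": 40},
		RebalanceThreshold: 50,
		Throttle:           1000,
	})
	if err != nil {
		fmt.Println("persist failed:", err)
		return
	}
	loaded, err := p.LoadSettings()
	if err != nil {
		fmt.Println("load failed:", err)
		return
	}
	fmt.Println(loaded.ClassLoadPercents["land"], loaded.Throttle) // prints 40 1000
}
```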
type ReadyFn ¶
Cluster will use this function to determine if newly added nodes are ready to be used.
type Scheduler ¶
type Scheduler interface {
	ScheduleJob(jobDef domain.JobDefinition) (string, error)

	KillJob(jobId string) error

	GetSagaCoord() saga.SagaCoordinator

	OfflineWorker(req domain.OfflineWorkerReq) error

	ReinstateWorker(req domain.ReinstateWorkerReq) error

	SetSchedulerStatus(maxTasks int) error

	GetSchedulerStatus() (int, int)

	GetClassLoadPercents() (map[string]int32, error)

	SetClassLoadPercents(classLoads map[string]int32) error

	GetRequestorToClassMap() (map[string]string, error)

	SetRequestorToClassMap(requestorToClassMap map[string]string) error

	GetRebalanceMinimumDuration() (time.Duration, error)

	SetRebalanceMinimumDuration(durationMin time.Duration) error

	GetRebalanceThreshold() (int32, error)

	SetRebalanceThreshold(durationMin int32) error
}
type SchedulerConfiguration ¶
type SchedulerConfiguration struct {
	MaxRetriesPerTask    int
	DebugMode            bool
	RecoverJobsOnStartup bool
	DefaultTaskTimeout   time.Duration
	TaskTimeoutOverhead  time.Duration
	RunnerRetryTimeout   time.Duration
	RunnerRetryInterval  time.Duration
	ReadyFnBackoff       time.Duration
	MaxRequestors        int
	MaxJobsPerRequestor  int
	TaskThrottle         int
	Admins               []string

	SchedAlgConfig interface{}
	SchedAlg       SchedulingAlgorithm
}
SchedulerConfiguration holds variables read at initialization.
- MaxRetriesPerTask - the number of times to retry a failing task before marking it as completed.
- DebugMode - if true, starts the scheduler up but does not start the update loop. Instead the loop must be advanced manually by calling step().
- RecoverJobsOnStartup - if true, the scheduler recovers active sagas from the sagalog and restarts them.
- DefaultTaskTimeout - default timeout for tasks.
- TaskTimeoutOverhead - how long to wait for a response after the task has timed out.
- RunnerRetryTimeout - how long to keep retrying a runner req.
- RunnerRetryInterval - how long to sleep between runner req retries.
- ReadyFnBackoff - how long to wait between runner status queries to determine [init] status.
- TaskThrottle - requestors will try not to schedule jobs that make the scheduler exceed the TaskThrottle. Note: Sickle may exceed it with retries.
func (*SchedulerConfiguration) String ¶
func (sc *SchedulerConfiguration) String() string
type SchedulingAlgorithm ¶
type SchedulingAlgorithm interface {
	GetTasksToBeAssigned(jobs []*jobState, stat stats.StatsReceiver, cs *clusterState,
		requestors map[string][]*jobState) (startTasks []*taskState, stopTasks []*taskState)
}
SchedulingAlgorithm is the interface for the scheduling algorithm. Implementations compute the list of tasks to start and stop.