queue

package
v0.0.0-...-a134451 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 28, 2024 License: MIT Imports: 12 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// DefaultStageInitialBackoffDuration is the default value for the initial backoff duration
	// for unschedulable stages. To change the default stageInitialBackoffDurationSeconds used by the
	// scheduler, update the ComponentConfig value in defaults.go
	DefaultStageInitialBackoffDuration = 1 * time.Second
	// DefaultStageMaxBackoffDuration is the default value for the max backoff duration
	// for unschedulable stages. To change the default stageMaxBackoffDurationSeconds used by the
	// scheduler, update the ComponentConfig value in defaults.go
	DefaultStageMaxBackoffDuration = 10 * time.Second
)
View Source
const (
	// DefaultStageMaxInUnschedulableStagesDuration is the default value for the maximum
	// time a stage can stay in unschedulableStages. If a stage stays in unschedulableStages
	// for longer than this value, the stage will be moved from unschedulableStages to
	// backoffQ or activeQ. If this value is empty, the default value (5min)
	// will be used.
	DefaultStageMaxInUnschedulableStagesDuration = 5 * time.Minute
)

Variables

View Source
var (
	// AssignedStageAdd is the event when a stage is added that causes stages with matching affinity terms
	// to be more schedulable.
	AssignedStageAdd = framework.ClusterEvent{Resource: framework.Stage, ActionType: framework.Add, Label: "AssignedStageAdd"}
	// WorkerAdd is the event when a new worker is added to the cluster.
	WorkerAdd = framework.ClusterEvent{Resource: framework.Worker, ActionType: framework.Add, Label: "WorkerAdd"}
	// AssignedStageUpdate is the event when a stage is updated that causes stages with matching affinity
	// terms to be more schedulable.
	AssignedStageUpdate = framework.ClusterEvent{Resource: framework.Stage, ActionType: framework.Update, Label: "AssignedStageUpdate"}
	// AssignedStageDelete is the event when a stage is deleted that causes stages with matching affinity
	// terms to be more schedulable.
	AssignedStageDelete = framework.ClusterEvent{Resource: framework.Stage, ActionType: framework.Delete, Label: "AssignedStageDelete"}
	// UnschedulableTimeout is the event when a stage stays in unschedulableStages for longer than the timeout.
	UnschedulableTimeout = framework.ClusterEvent{Resource: framework.WildCard, ActionType: framework.All, Label: "UnschedulableTimeout"}
	// WorkerStateChange is the event when a worker's label is changed.
	WorkerStateChange = framework.ClusterEvent{Resource: framework.Worker, ActionType: framework.Update, Label: "WorkerStateChange"}
)

Functions

func GetStageFullName

func GetStageFullName(stage *meta.Stage) string

func MakeNextStageFunc

func MakeNextStageFunc(queue SchedulingQueue) func() *framework.QueuedStageInfo

func MetaNamespaceKeyFunc

func MetaNamespaceKeyFunc(obj interface{}) (string, error)

func NewStageNominator

func NewStageNominator(stageLister v1.StageLister) framework.StageNominator

NewStageNominator creates a nominator that backs framework.StageNominator. A stageLister is passed in to check whether the stage exists before adding its nominatedWorker info.

func NominatedWorkerName

func NominatedWorkerName(stage *meta.Stage) string

Types

type ExplicitKey

type ExplicitKey string

type Option

type Option func(*priorityQueueOptions)

Option configures a PriorityQueue.

type PreEnqueueCheck

type PreEnqueueCheck func(stage *meta.Stage) bool

PreEnqueueCheck is a function type. It is used to build functions that run against a Stage; the caller can choose to enqueue or skip the Stage based on the result of the check.

type PriorityQueue

type PriorityQueue struct {
	// StageNominator abstracts the operations to maintain nominated Stages.
	framework.StageNominator
	// contains filtered or unexported fields
}

PriorityQueue implements a scheduling queue. The head of PriorityQueue is the highest priority pending stage. This structure has two sub-queues and an additional data structure, namely: activeQ, backoffQ and unschedulableStages.

  • activeQ holds stages that are being considered for scheduling.
  • backoffQ holds stages that moved from unschedulableStages and will move to activeQ when their backoff periods complete.
  • unschedulableStages holds stages that were already attempted for scheduling and are currently determined to be unschedulable.

func NewPriorityQueue

func NewPriorityQueue(
	lessFn framework.LessFunc,
	informerFactory informers.SharedInformerFactory,
	opts ...Option,
) *PriorityQueue

NewPriorityQueue creates a PriorityQueue object.

func (*PriorityQueue) Activate

func (p *PriorityQueue) Activate(stages map[string]*meta.Stage)

func (*PriorityQueue) Add

func (p *PriorityQueue) Add(stage *meta.Stage) error

func (*PriorityQueue) AddUnschedulableIfNotPresent

func (p *PriorityQueue) AddUnschedulableIfNotPresent(pInfo *framework.QueuedStageInfo, stageSchedulingCycle int64) error

func (*PriorityQueue) AssignedStageAdded

func (p *PriorityQueue) AssignedStageAdded(stage *meta.Stage)

func (*PriorityQueue) AssignedStageUpdated

func (p *PriorityQueue) AssignedStageUpdated(stage *meta.Stage)

func (*PriorityQueue) Close

func (p *PriorityQueue) Close()

func (*PriorityQueue) Delete

func (p *PriorityQueue) Delete(stage *meta.Stage) error

func (*PriorityQueue) MoveAllToActiveOrBackoffQueue

func (p *PriorityQueue) MoveAllToActiveOrBackoffQueue(event framework.ClusterEvent, preCheck PreEnqueueCheck)

func (*PriorityQueue) PendingStages

func (p *PriorityQueue) PendingStages() []*meta.Stage

func (*PriorityQueue) Pop

func (*PriorityQueue) Run

func (p *PriorityQueue) Run()

func (*PriorityQueue) SchedulingCycle

func (p *PriorityQueue) SchedulingCycle() int64

func (*PriorityQueue) Update

func (p *PriorityQueue) Update(oldStage, newStage *meta.Stage) error

type SchedulingQueue

type SchedulingQueue interface {
	framework.StageNominator
	Add(stage *meta.Stage) error
	// Activate moves the given stages to activeQ iff they're in unschedulableStages or backoffQ.
	// The passed-in stages are originally compiled from plugins that want to activate Stages,
	// by injecting the stages through a reserved CycleState struct (StagesToActivate).
	Activate(stages map[string]*meta.Stage)
	// AddUnschedulableIfNotPresent adds an unschedulable stage back to scheduling queue.
	// The stageSchedulingCycle represents the current scheduling cycle number which can be
	// returned by calling SchedulingCycle().
	AddUnschedulableIfNotPresent(stage *framework.QueuedStageInfo, stageSchedulingCycle int64) error
	// SchedulingCycle returns the current scheduling cycle number, which is
	// cached by the scheduling queue. Normally, incrementing this number whenever
	// a stage is popped (e.g. when Pop() is called) is enough.
	SchedulingCycle() int64
	// Pop removes the head of the queue and returns it. It blocks if the
	// queue is empty and waits until a new item is added to the queue.
	Pop() (*framework.QueuedStageInfo, error)
	Update(oldStage, newStage *meta.Stage) error
	Delete(stage *meta.Stage) error
	MoveAllToActiveOrBackoffQueue(event framework.ClusterEvent, preCheck PreEnqueueCheck)
	AssignedStageAdded(stage *meta.Stage)
	AssignedStageUpdated(stage *meta.Stage)
	PendingStages() []*meta.Stage
	// Close closes the SchedulingQueue so that the goroutine which is
	// waiting to pop items can exit gracefully.
	Close()
	// Run starts the goroutines managing the queue.
	Run()
}

func NewSchedulingQueue

func NewSchedulingQueue(
	lessFn framework.LessFunc,
	informerFactory informers.SharedInformerFactory,
	opts ...Option) SchedulingQueue

NewSchedulingQueue initializes a priority queue as a new scheduling queue.

type UnschedulableStages

type UnschedulableStages struct {
	// contains filtered or unexported fields
}

UnschedulableStages holds stages that cannot be scheduled. This data structure is used to implement unschedulableStages.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL