queue

package
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 25, 2019 License: Apache-2.0 Imports: 19 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var PriorityQueueClosedError = errors.New(queueClosed)

used to check error to stop scheduling goroutine

View Source
var PriorityQueueDeletedError = errors.New("queue deleted")

Functions

func MakeNextPodFunc

func MakeNextPodFunc(queues SchedulingPoolQueue, schedulerCache schedulerinternalcache.Cache) func(string) (*v1.Pod, error)

MakeNextPodFunc returns a function to retrieve the next pod from a given scheduling queue

func NominatedNodeName

func NominatedNodeName(pod *v1.Pod) string

NominatedNodeName returns nominated node name of a Pod.

Types

type FIFO

type FIFO struct {
	*cache.FIFO
}

FIFO is basically a simple wrapper around cache.FIFO to make it compatible with the SchedulingQueue interface.

func NewFIFO

func NewFIFO() *FIFO

NewFIFO creates a FIFO object.

func (*FIFO) Add

func (f *FIFO) Add(pod *v1.Pod) error

Add adds a pod to the FIFO.

func (*FIFO) AddIfNotPresent

func (f *FIFO) AddIfNotPresent(pod *v1.Pod) error

AddIfNotPresent adds a pod to the FIFO if it is absent in the FIFO.

func (*FIFO) AddUnschedulableIfNotPresent

func (f *FIFO) AddUnschedulableIfNotPresent(pod *v1.Pod, podSchedulingCycle int64) error

AddUnschedulableIfNotPresent adds an unschedulable pod back to the queue. In FIFO it is added to the end of the queue.

func (*FIFO) AssignedPodAdded

func (f *FIFO) AssignedPodAdded(pod *v1.Pod)

AssignedPodAdded does nothing here.

func (*FIFO) AssignedPodUpdated

func (f *FIFO) AssignedPodUpdated(pod *v1.Pod)

AssignedPodUpdated does nothing here.

func (*FIFO) Close

func (f *FIFO) Close()

Close closes the FIFO queue.

func (*FIFO) Delete

func (f *FIFO) Delete(pod *v1.Pod) error

Delete deletes a pod in the FIFO.

func (*FIFO) DeleteNominatedPodIfExists

func (f *FIFO) DeleteNominatedPodIfExists(pod *v1.Pod)

DeleteNominatedPodIfExists does nothing in FIFO.

func (*FIFO) MoveAllToActiveQueue

func (f *FIFO) MoveAllToActiveQueue()

MoveAllToActiveQueue does nothing in FIFO as all pods are always in the active queue.

func (*FIFO) NominatedPodsForNode

func (f *FIFO) NominatedPodsForNode(nodeName string) []*v1.Pod

NominatedPodsForNode returns pods that are nominated to run on the given node, but FIFO does not support it.

func (*FIFO) NumActivePods added in v0.3.0

func (f *FIFO) NumActivePods() int

NumActivePods returns the number of active pods exist in the SchedulingQueue.

func (*FIFO) NumBackoffPods added in v0.3.0

func (f *FIFO) NumBackoffPods() int

NumBackoffPods returns the number of backoff pods exist in the SchedulingQueue.

func (*FIFO) NumUnschedulablePods

func (f *FIFO) NumUnschedulablePods() int

NumUnschedulablePods returns the number of unschedulable pods exist in the SchedulingQueue.

func (*FIFO) PendingPods

func (f *FIFO) PendingPods() []*v1.Pod

PendingPods returns all the pods in the queue.

func (*FIFO) Pop

func (f *FIFO) Pop() (*v1.Pod, error)

Pop removes the head of FIFO and returns it. This is just a copy/paste of cache.Pop(queue Queue) from fifo.go that scheduler has always been using. There is a comment in that file saying that this method shouldn't be used in production code, but scheduler has always been using it. This function does minimal error checking.

func (*FIFO) SchedulingCycle

func (f *FIFO) SchedulingCycle() int64

SchedulingCycle implements SchedulingQueue.SchedulingCycle interface.

func (*FIFO) Update

func (f *FIFO) Update(oldPod, newPod *v1.Pod) error

Update updates a pod in the FIFO.

func (*FIFO) UpdateNominatedPodForNode

func (f *FIFO) UpdateNominatedPodForNode(pod *v1.Pod, nodeName string)

UpdateNominatedPodForNode does nothing in FIFO.

type PoolDeletedError

type PoolDeletedError error

type PoolQueue

type PoolQueue struct {
	// contains filtered or unexported fields
}

func NewPoolQueue

func NewPoolQueue(stopCh <-chan struct{}) *PoolQueue

NewPoolQueue

func (*PoolQueue) Add

func (pq *PoolQueue) Add(pod *v1.Pod) error

func (*PoolQueue) AddIfNotPresent

func (pq *PoolQueue) AddIfNotPresent(pod *v1.Pod) error

func (*PoolQueue) AddQueue

func (pq *PoolQueue) AddQueue(poolName string, stopCh <-chan struct{}) (SchedulingQueue, error)

AddPoolQ

func (*PoolQueue) AssignedPodAdded

func (pq *PoolQueue) AssignedPodAdded(pod *v1.Pod)

func (*PoolQueue) AssignedPodUpdated

func (pq *PoolQueue) AssignedPodUpdated(pod *v1.Pod)

func (*PoolQueue) Close

func (pq *PoolQueue) Close()

func (*PoolQueue) Delete

func (pq *PoolQueue) Delete(pod *v1.Pod) error

Delete as pod

func (*PoolQueue) GetPoolQueueNameIfNotPresent

func (pq *PoolQueue) GetPoolQueueNameIfNotPresent(pod *v1.Pod) string

getPoolQueueNameIfNotPresent return pool name by pod annotations, if not found return default name

func (*PoolQueue) GetQueue

func (pq *PoolQueue) GetQueue(poolName string) (SchedulingQueue, error)

func (*PoolQueue) HasSelfPoolPendingPods added in v0.2.3

func (pq *PoolQueue) HasSelfPoolPendingPods(poolName string) bool

HasSelfPoolPendingPods check pool has their self pod to schedule

func (*PoolQueue) Metrics added in v0.2.4

func (pq *PoolQueue) Metrics()

func (*PoolQueue) MoveAllBorrowingPodsToSelfQueue added in v0.2.0

func (pq *PoolQueue) MoveAllBorrowingPodsToSelfQueue(poolName string)

MoveAllBorrowingPodsToSelfQueue reject all pods that are borrowing pool

func (*PoolQueue) MoveAllToActiveQueue

func (pq *PoolQueue) MoveAllToActiveQueue()

func (*PoolQueue) MoveAllToActiveQueueIn

func (pq *PoolQueue) MoveAllToActiveQueueIn(poolName string)

func (*PoolQueue) NominatedPodsForNode

func (pq *PoolQueue) NominatedPodsForNode(nodeName string) []*v1.Pod

func (*PoolQueue) NumQueues

func (pq *PoolQueue) NumQueues() int

NumQueues return the len of queues

func (*PoolQueue) NumUnschedulablePods

func (pq *PoolQueue) NumUnschedulablePods() int

func (*PoolQueue) NumUnschedulablePodsIn

func (pq *PoolQueue) NumUnschedulablePodsIn(poolName string) int

func (*PoolQueue) PendingPods

func (pq *PoolQueue) PendingPods() []*v1.Pod

func (*PoolQueue) Queues

func (pq *PoolQueue) Queues() map[string]SchedulingQueue

func (*PoolQueue) RemoveQueue

func (pq *PoolQueue) RemoveQueue(poolName string)

func (*PoolQueue) SetQueue

func (pq *PoolQueue) SetQueue(poolName string, queue SchedulingQueue)

JUST for test

func (*PoolQueue) Update

func (pq *PoolQueue) Update(oldPod, newPod *v1.Pod) error

Update

type PriorityQueue

type PriorityQueue struct {
	// contains filtered or unexported fields
}

PriorityQueue implements a scheduling queue. It is an alternative to FIFO. The head of PriorityQueue is the highest priority pending pod. This structure has three sub queues. One sub-queue holds pods that are being considered for scheduling. This is called activeQ and is a Heap. Another queue holds pods that are already tried and are determined to be unschedulable. The latter is called unschedulableQ. The third queue holds pods that are moved from unschedulable queues and will be moved to active queue when backoff are completed.

func NewPriorityQueue

func NewPriorityQueue(stop <-chan struct{}) *PriorityQueue

NewPriorityQueue creates a PriorityQueue object.

func NewPriorityQueueWithClock

func NewPriorityQueueWithClock(stop <-chan struct{}, clock util.Clock) *PriorityQueue

func NewPriorityQueueWithLessFunc

func NewPriorityQueueWithLessFunc(lessFunc util.LessFunc, stop <-chan struct{}) *PriorityQueue

NewPriorityQueueWithLessFunc creates a PriorityQueue object.

func NewPriorityQueueWithLessFuncAndClock

func NewPriorityQueueWithLessFuncAndClock(lessFunc util.LessFunc, stop <-chan struct{}, clock util.Clock) *PriorityQueue

NewPriorityQueueWithClock creates a PriorityQueue which uses the passed clock for time.

func (*PriorityQueue) Add

func (p *PriorityQueue) Add(pod *v1.Pod) error

Add adds a pod to the active queue. It should be called only when a new pod is added so there is no chance the pod is already in active/unschedulable/backoff queues

func (*PriorityQueue) AddIfNotPresent

func (p *PriorityQueue) AddIfNotPresent(pod *v1.Pod) error

AddIfNotPresent adds a pod to the active queue if it is not present in any of the queues. If it is present in any, it doesn't do any thing.

func (*PriorityQueue) AddUnschedulableIfNotPresent

func (p *PriorityQueue) AddUnschedulableIfNotPresent(pod *v1.Pod, podSchedulingCycle int64) error

AddUnschedulableIfNotPresent inserts a pod that cannot be scheduled into the queue, unless it is already in the queue. Normally, PriorityQueue puts unschedulable pods in `unschedulableQ`. But if there has been a recent move request, then the pod is put in `podBackoffQ`.

func (*PriorityQueue) AssignedPodAdded

func (p *PriorityQueue) AssignedPodAdded(pod *v1.Pod)

AssignedPodAdded is called when a bound pod is added. Creation of this pod may make pending pods with matching affinity terms schedulable.

func (*PriorityQueue) AssignedPodUpdated

func (p *PriorityQueue) AssignedPodUpdated(pod *v1.Pod)

AssignedPodUpdated is called when a bound pod is updated. Change of labels may make pending pods with matching affinity terms schedulable.

func (*PriorityQueue) Close

func (p *PriorityQueue) Close()

Close closes the priority queue.

func (*PriorityQueue) Delete

func (p *PriorityQueue) Delete(pod *v1.Pod) error

Delete deletes the item from either of the two queues. It assumes the pod is only in one queue.

func (*PriorityQueue) DeleteNominatedPodIfExists

func (p *PriorityQueue) DeleteNominatedPodIfExists(pod *v1.Pod)

DeleteNominatedPodIfExists deletes pod nominatedPods.

func (*PriorityQueue) MoveAllToActiveQueue

func (p *PriorityQueue) MoveAllToActiveQueue()

MoveAllToActiveQueue moves all pods from unschedulableQ to activeQ. This function adds all pods and then signals the condition variable to ensure that if Pop() is waiting for an item, it receives it after all the pods are in the queue and the head is the highest priority pod.

func (*PriorityQueue) NominatedPodsForNode

func (p *PriorityQueue) NominatedPodsForNode(nodeName string) []*v1.Pod

NominatedPodsForNode returns pods that are nominated to run on the given node, but they are waiting for other pods to be removed from the node before they can be actually scheduled.

func (*PriorityQueue) NumActivePods added in v0.3.0

func (p *PriorityQueue) NumActivePods() int

NumActivePods returns the number of active pods exist in the SchedulingQueue.

func (*PriorityQueue) NumBackoffPods added in v0.3.0

func (p *PriorityQueue) NumBackoffPods() int

NumBackoffPods returns the number of backoff pods exist in the SchedulingQueue.

func (*PriorityQueue) NumUnschedulablePods

func (p *PriorityQueue) NumUnschedulablePods() int

NumUnschedulablePods returns the number of unschedulable pods exist in the SchedulingQueue.

func (*PriorityQueue) PendingPods

func (p *PriorityQueue) PendingPods() []*v1.Pod

PendingPods returns all the pending pods in the queue. This function is used for debugging purposes in the scheduler cache dumper and comparer.

func (*PriorityQueue) Pop

func (p *PriorityQueue) Pop() (*v1.Pod, error)

Pop removes the head of the active queue and returns it. It blocks if the activeQ is empty and waits until a new item is added to the queue. It increments scheduling cycle when a pod is popped.

func (*PriorityQueue) SchedulingCycle

func (p *PriorityQueue) SchedulingCycle() int64

SchedulingCycle returns current scheduling cycle.

func (*PriorityQueue) Update

func (p *PriorityQueue) Update(oldPod, newPod *v1.Pod) error

Update updates a pod in the active or backoff queue if present. Otherwise, it removes the item from the unschedulable queue if pod is updated in a way that it may become schedulable and adds the updated one to the active queue. If pod is not present in any of the queues, it is added to the active queue.

func (*PriorityQueue) UpdateNominatedPodForNode

func (p *PriorityQueue) UpdateNominatedPodForNode(pod *v1.Pod, nodeName string)

UpdateNominatedPodForNode adds a pod to the nominated pods of the given node. This is called during the preemption process after a node is nominated to run the pod. We update the structure before sending a request to update the pod object to avoid races with the following scheduling cycles.

type SchedulingPoolQueue

type SchedulingPoolQueue interface {
	GetQueue(poolName string) (SchedulingQueue, error)
	AddQueue(poolName string, stopCh <-chan struct{}) (SchedulingQueue, error)
	RemoveQueue(poolName string)
	Add(pod *v1.Pod) error
	AddIfNotPresent(pod *v1.Pod) error
	Delete(pod *v1.Pod) error
	Update(oldPod, newPod *v1.Pod) error
	NumQueues() int
	Queues() map[string]SchedulingQueue
	NumUnschedulablePods() int
	NumUnschedulablePodsIn(poolName string) int
	MoveAllToActiveQueue()
	MoveAllToActiveQueueIn(poolName string)
	AssignedPodAdded(pod *v1.Pod)
	AssignedPodUpdated(pod *v1.Pod)
	PendingPods() []*v1.Pod
	NominatedPodsForNode(nodeName string) []*v1.Pod

	GetPoolQueueNameIfNotPresent(pod *v1.Pod) string
	MoveAllBorrowingPodsToSelfQueue(poolName string)
	HasSelfPoolPendingPods(poolName string) bool
	Metrics()
	Close()
}

type SchedulingQueue

type SchedulingQueue interface {
	Add(pod *v1.Pod) error
	AddIfNotPresent(pod *v1.Pod) error
	// AddUnschedulableIfNotPresent adds an unschedulable pod back to scheduling queue.
	// The podSchedulingCycle represents the current scheduling cycle number which can be
	// returned by calling SchedulingCycle().
	AddUnschedulableIfNotPresent(pod *v1.Pod, podSchedulingCycle int64) error
	// SchedulingCycle returns the current number of scheduling cycle which is
	// cached by scheduling queue. Normally, incrementing this number whenever
	// a pod is popped (e.g. called Pop()) is enough.
	SchedulingCycle() int64
	// Pop removes the head of the queue and returns it. It blocks if the
	// queue is empty and waits until a new item is added to the queue.
	Pop() (*v1.Pod, error)
	Update(oldPod, newPod *v1.Pod) error
	Delete(pod *v1.Pod) error
	MoveAllToActiveQueue()
	AssignedPodAdded(pod *v1.Pod)
	AssignedPodUpdated(pod *v1.Pod)
	NominatedPodsForNode(nodeName string) []*v1.Pod
	PendingPods() []*v1.Pod
	// Close closes the SchedulingQueue so that the goroutine which is
	// waiting to pop items can exit gracefully.
	Close()
	// UpdateNominatedPodForNode adds the given pod to the nominated pod map or
	// updates it if it already exists.
	UpdateNominatedPodForNode(pod *v1.Pod, nodeName string)
	// DeleteNominatedPodIfExists deletes nominatedPod from internal cache
	DeleteNominatedPodIfExists(pod *v1.Pod)
	// NumUnschedulablePods returns the number of unschedulable pods exist in the SchedulingQueue.
	NumUnschedulablePods() int
	// NumActivePods returns the number of active pods exist in the SchedulingQueue.
	NumActivePods() int
	// NumBackoffPods returns the number of backoff pods exist in the SchedulingQueue.
	NumBackoffPods() int
}

SchedulingQueue is an interface for a queue to store pods waiting to be scheduled. The interface follows a pattern similar to cache.FIFO and cache.Heap and makes it easy to use those data structures as a SchedulingQueue.

func NewSchedulingQueue

func NewSchedulingQueue(stop <-chan struct{}) SchedulingQueue

func NewSchedulingQueueWithLessFunc

func NewSchedulingQueueWithLessFunc(activeQComp util.LessFunc, stop <-chan struct{}) SchedulingQueue

NewSchedulingQueue initializes a new scheduling queue. If pod priority is enabled a priority queue is returned. If it is disabled, a FIFO is returned.

type UnschedulablePodsMap

type UnschedulablePodsMap struct {
	// contains filtered or unexported fields
}

UnschedulablePodsMap holds pods that cannot be scheduled. This data structure is used to implement unschedulableQ.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL