1. 前言
轉載請說明原文出處, 尊重他人勞動成果!
本文將分析
kubernetes/pkg/scheduler/internal/queue
中的文件, 其中包括scheduling_queue.go
源碼位置: https://github.com/nicktming/kubernetes/blob/tming-v1.13/pkg/scheduler/internal/queue/scheduling_queue.go
分支: tming-v1.13 (基于v1.13版本)
2. SchedulingQueue (以PriorityQueue為例)
SchedulingQueue
是一個存著等待被調度的所有pods的數據結構, 也就是說所有沒有nodeName
的pod
都會被SchedulingQueue
管理.
SchedulingQueue
主要涉及三個重要的模塊:activeQ: 存著即將要被調度的
pods
unschedulableQ: 存著已經試著調度但是沒有成功的pods
nominatedPods: 存著搶占資源而成功的pods
(因為搶占資源, 意味著正常調度失敗, 然后如果殺死某些優先級低的pods
可以使得該pod
可以調度在某個節點上, 因此該pod
還沒有真正調度到那個被選中的節點上, 因此殺死那些優先級低的pods
需要時間)
注意: 一個
pod
是不能同時出現在activeQ
和unschedulableQ
中的.
2.1 activeQ
首先介紹一下
activeQ
, 它是一個heap
組成的數據結構,heap
中的數據按照優先級從高到低排序, 也就是說第一個出隊列的是優先級最高的pod
.
// Heap is a producer/consumer queue that implements a heap data structure.
// It can be used to implement priority queues and similar data structures.
// The type itself holds nothing but a pointer to heapData, which carries the
// actual storage and ordering logic.
type Heap struct {
// data stores objects and has a queue that keeps their ordering according
// to the heap invariant.
data *heapData
}
Heap
是一個用堆實現的類似于生產者/消費者的隊列.
2.1.1 heapData
heapData
是一個實現heap
接口的數據結構, 就是一個真正的heap
.
// Function types and storage records used by the heap implementation.
type (
	// LessFunc compares two stored objects and returns a bool describing
	// their relative order.
	LessFunc func(interface{}, interface{}) bool

	// KeyFunc derives the string key of an object, or returns an error when
	// no key can be produced.
	KeyFunc func(obj interface{}) (string, error)

	// heapItem is the value kept in heapData.items for every stored object.
	// Field order matters: the struct is built with unkeyed literals.
	heapItem struct {
		obj   interface{} // The stored object itself.
		index int         // Position of the object's key inside heapData.queue.
	}

	// itemKeyValue bundles an object with its precomputed key.
	// Field order matters: the struct is built with unkeyed literals.
	itemKeyValue struct {
		key string
		obj interface{}
	}

	// heapData holds the contents of the heap and implements the standard
	// heap interface on top of them.
	heapData struct {
		// items maps each object's key to the object plus its index in queue.
		// Every key in items is expected to be in queue and vice versa.
		items map[string]*heapItem
		// queue is the slice that is kept in heap order; it contains the keys
		// of the objects stored in items, not the objects themselves.
		queue []string
		// keyFunc produces the key used for insertion and lookup. It should
		// be deterministic.
		keyFunc KeyFunc
		// lessFunc orders two objects held in the heap.
		lessFunc LessFunc
	}
)
queue
數組是一個真正實現heap
的數組, 里面的數據是根據keyFunc
生成的key
.
// Less compares the objects at queue positions i and j and returns true if the
// first one should go in front of the second one in the heap.
//
// It returns false, rather than panicking, when either index is outside
// [0, len(h.queue)) or when a queued key has no entry in h.items.
func (h *heapData) Less(i, j int) bool {
	// Valid indexes are 0..n-1. The comparison must be >= n: an index equal to
	// len(h.queue) is already out of range and would panic on h.queue[i].
	n := len(h.queue)
	if i < 0 || j < 0 || i >= n || j >= n {
		return false
	}
	itemi, ok := h.items[h.queue[i]]
	if !ok {
		return false
	}
	itemj, ok := h.items[h.queue[j]]
	if !ok {
		return false
	}
	return h.lessFunc(itemi.obj, itemj.obj)
}
// Len reports how many keys are currently held in the heap's queue.
func (h *heapData) Len() int {
	return len(h.queue)
}
// Swap exchanges the keys at queue positions i and j and then rewrites the
// index recorded on both affected items so that items and queue stay in sync.
// It is part of the standard heap interface and should never be called
// directly.
func (h *heapData) Swap(i, j int) {
	q := h.queue
	q[i], q[j] = q[j], q[i]
	// After the exchange, q[i] and q[j] name the items that now live at i and j.
	h.items[q[i]].index = i
	h.items[q[j]].index = j
}
// Push appends a new entry at the end of the queue and records it in items
// with that position as its index. kv must be an *itemKeyValue; any other
// dynamic type makes the type assertion panic.
// Push is supposed to be called by heap.Push only.
func (h *heapData) Push(kv interface{}) {
	entry := kv.(*itemKeyValue)
	position := len(h.queue)
	h.items[entry.key] = &heapItem{obj: entry.obj, index: position}
	h.queue = append(h.queue, entry.key)
}
// Pop removes the last key from the queue and returns the object stored under
// it, deleting that object's entry from items. If the key has no entry in
// items, the queue is still shortened and nil is returned.
// Pop is supposed to be called by heap.Pop only.
func (h *heapData) Pop() interface{} {
	last := len(h.queue) - 1
	key := h.queue[last]
	h.queue = h.queue[:last]
	if item, found := h.items[key]; found {
		delete(h.items, key)
		return item.obj
	}
	// A queued key without a matching item is an error condition.
	return nil
}
都是一些基本的操作, 就不多說了.
2.1.2 Heap
上面已經(jīng)說了
Heap
是一個用堆實現的類似于生產者/消費者的隊列. 所以就看一下它的Add
和pop
方法就行.
// Add inserts obj into the heap, or updates it in place when an object with
// the same key is already stored.
//
//  1. Compute the key with keyFunc; a failure is returned as cache.KeyError.
//  2. Look the key up in the items map.
//  3. If it is absent, push the object as a new entry.
//  4. If it is present, replace the stored object and re-establish the heap
//     order around its current index.
func (h *Heap) Add(obj interface{}) error {
	key, err := h.data.keyFunc(obj)
	if err != nil {
		return cache.KeyError{Obj: obj, Err: err}
	}
	item, found := h.data.items[key]
	if !found {
		heap.Push(h.data, &itemKeyValue{key, obj})
		return nil
	}
	item.obj = obj
	heap.Fix(h.data, item.index)
	return nil
}
- 計算key
- 根據item的map結構檢查該obj是否存在
- 如果存在 則更新該obj 并重新調整堆結構
- 如果不存在 則添加該obj
// Pop removes and returns the head of the heap.
//
// It returns an error when the heap is empty, and also when the underlying
// heapData.Pop yields nil (a queued key that had no stored object).
func (h *Heap) Pop() (interface{}, error) {
	// heap.Pop on an empty heap calls Swap(0, -1) and panics with an
	// index-out-of-range error, so report emptiness as an ordinary error.
	if h.data.Len() == 0 {
		return nil, fmt.Errorf("heap is empty")
	}
	obj := heap.Pop(h.data)
	if obj == nil {
		return nil, fmt.Errorf("object was removed from heap data")
	}
	return obj, nil
}
Pop
則直接返回堆的頭
2.1.3 總結
所以
activeQ
是這樣的一個數據結構.
// NewPriorityQueue builds a PriorityQueue wired to the given stop channel.
// The active queue is a heap keyed by cache.MetaNamespaceKeyFunc and ordered
// by activeQComp; the unschedulable and nominated pod stores start empty.
// Before returning, the queue's condition variable is bound to its own lock
// and run() is invoked.
func NewPriorityQueue(stop <-chan struct{}) *PriorityQueue {
	queue := new(PriorityQueue)
	queue.clock = util.RealClock{}
	queue.stop = stop
	queue.activeQ = newHeap(cache.MetaNamespaceKeyFunc, activeQComp)
	queue.unschedulableQ = newUnschedulablePodsMap()
	queue.nominatedPods = newNominatedPodMap()
	// NOTE(review): -1 presumably means "no move request seen yet" — confirm
	// against the code that reads moveRequestCycle.
	queue.moveRequestCycle = -1

	queue.cond.L = &queue.lock
	// NOTE(review): run() is defined elsewhere; presumably it starts the
	// queue's background work using the stop channel — confirm.
	queue.run()
	return queue
}
// newHeap creates an empty Heap that derives keys with keyFn and orders its
// objects with lessFn. Both the item map and the key queue are allocated up
// front, so the result is ready for use.
func newHeap(keyFn KeyFunc, lessFn LessFunc) *Heap {
	data := &heapData{
		items:    make(map[string]*heapItem),
		queue:    make([]string, 0),
		keyFunc:  keyFn,
		lessFunc: lessFn,
	}
	return &Heap{data: data}
}
MetaNamespaceKeyFunc
基本上就是pod_namespace/pod_name
, 而activeQComp
就是heap
的比較方法. 按照優先級大小進行比較, 優先級高的排在前面; 如果優先級一樣, 就比較podTimestamp (最后一次嘗試調度的時間, 如果該pod還沒有PodScheduled condition則使用創建時間), 時間早的排在前面.
// podTimestamp picks the timestamp used to order pods of equal priority.
// It looks up the pod's PodScheduled condition and returns, in order of
// preference:
//   - the pod's CreationTimestamp when there is no such condition,
//   - the condition's LastTransitionTime when its LastProbeTime is zero,
//   - otherwise the condition's LastProbeTime.
//
// The returned pointer refers into the pod (or the condition found for it),
// not to a copy.
func podTimestamp(pod *v1.Pod) *metav1.Time {
	_, scheduled := podutil.GetPodCondition(&pod.Status, v1.PodScheduled)
	switch {
	case scheduled == nil:
		return &pod.CreationTimestamp
	case scheduled.LastProbeTime.IsZero():
		return &scheduled.LastTransitionTime
	default:
		return &scheduled.LastProbeTime
	}
}
// activeQComp is the ordering function of the activeQ heap. It reports whether
// pod1 should come before pod2: a higher priority wins, and between pods of
// equal priority the one with the earlier podTimestamp wins.
// Both arguments must be *v1.Pod; any other dynamic type makes the type
// assertion panic.
func activeQComp(pod1, pod2 interface{}) bool {
	first := pod1.(*v1.Pod)
	second := pod2.(*v1.Pod)
	firstPrio := util.GetPodPriority(first)
	secondPrio := util.GetPodPriority(second)
	if firstPrio != secondPrio {
		return firstPrio > secondPrio
	}
	// Same priority: fall back to the timestamps.
	return podTimestamp(first).Before(podTimestamp(second))
}
3. unschedulableQ
接下來看一下
unschedulableQ
的數據結構以及它的一些方法.
// UnschedulablePodsMap is simply a map-based structure:
// the key is the string computed by keyFunc,
// and the value is the corresponding pod.
type UnschedulablePodsMap struct {
// pods is a map key by a pod's full-name and the value is a pointer to the pod.
pods map[string]*v1.Pod
// keyFunc computes the key under which a pod is stored in pods.
keyFunc func(*v1.Pod) string
}
可以看到
UnschedulablePodsMap
是一個標準的Map
結構,key
是由keyFunc
方法計算出來的key
,value
為對應(yīng)的pod
.
3.1 方法
由于此結構比較簡單, 所以它的方法包括
addOrUpdate
,delete
等等都是對map
進行直接操作.
// addOrUpdate stores pod under the key computed by keyFunc, replacing any pod
// that was already stored under that key.
func (u *UnschedulablePodsMap) addOrUpdate(pod *v1.Pod) {
	key := u.keyFunc(pod)
	u.pods[key] = pod
}
// delete removes the entry stored under the key that keyFunc computes for pod.
// It is a no-op when no such entry exists.
func (u *UnschedulablePodsMap) delete(pod *v1.Pod) {
	key := u.keyFunc(pod)
	delete(u.pods, key)
}
4. nominatedPodMap
nominatedPodMap
用于存儲那些搶占成功的pods
. 下面這段話的大致意思就是該pod
現在的nominatedNodeName
是有可能與最后真正運行到的節點不一樣的, 因此需要有結構存儲并且可以進行操作.
// nominatedPodMap is a structure that stores pods nominated to run on nodes.
// It exists because nominatedNodeName of pod objects stored in the structure
// may be different than what scheduler has here. We should be able to find pods
// by their UID and update/delete them.
nominatedPodMap的結構如下:
// nominatedPodMap keeps two indexes over the same set of nominated pods:
// one from node name to pods and one from pod UID to node name.
type nominatedPodMap struct {
// These pods may be in activeQ or in unschedulableQ.
// nominatedPods maps nodeName -> pods nominated to that node.
nominatedPods map[string][]*v1.Pod
// nominatedPodToNode maps pod UID -> nodeName.
nominatedPodToNode map[ktypes.UID]string
}
// newNominatedPodMap returns a nominatedPodMap whose two indexes are both
// allocated and empty.
func newNominatedPodMap() *nominatedPodMap {
	npm := new(nominatedPodMap)
	npm.nominatedPods = map[string][]*v1.Pod{}
	npm.nominatedPodToNode = map[ktypes.UID]string{}
	return npm
}
4.1 對應的方法
// add records p as nominated to a node.
//
// Any existing entry for p is removed first. The target node is nodeName when
// it is non-empty, otherwise whatever NominatedNodeName(p) returns
// (presumably the pod's status.nominatedNodeName — confirm against that
// helper). If both are empty nothing is stored.
func (npm *nominatedPodMap) add(p *v1.Pod, nodeName string) {
	// Always start from a clean state, whether or not p was present.
	npm.delete(p)

	target := nodeName
	if target == "" {
		target = NominatedNodeName(p)
	}
	if target == "" {
		return
	}

	npm.nominatedPodToNode[p.UID] = target
	existing := npm.nominatedPods[target]
	for i := range existing {
		if existing[i].UID == p.UID {
			klog.V(4).Infof("Pod %v/%v already exists in the nominated map!", p.Namespace, p.Name)
			return
		}
	}
	npm.nominatedPods[target] = append(existing, p)
}
// delete removes p from both indexes if it is present, and returns without
// doing anything when p's UID is unknown. A node whose pod list becomes empty
// is dropped from nominatedPods entirely.
func (npm *nominatedPodMap) delete(p *v1.Pod) {
	node, known := npm.nominatedPodToNode[p.UID]
	if !known {
		return
	}
	pods := npm.nominatedPods[node]
	for idx := range pods {
		if pods[idx].UID != p.UID {
			continue
		}
		// Remove the element in place; the backing array is reused.
		remaining := append(pods[:idx], pods[idx+1:]...)
		if len(remaining) == 0 {
			delete(npm.nominatedPods, node)
		} else {
			npm.nominatedPods[node] = remaining
		}
		break
	}
	delete(npm.nominatedPodToNode, p.UID)
}
// update replaces the entry of oldPod with newPod. It deletes oldPod and then
// adds newPod with an empty nodeName, so add takes the node from
// NominatedNodeName(newPod).
func (npm *nominatedPodMap) update(oldPod, newPod *v1.Pod) {
// We update irrespective of the nominatedNodeName changed or not, to ensure
// that pod pointer is updated.
npm.delete(oldPod)
npm.add(newPod, "")
}
// podsForNode returns all pods nominated to nodeName, or nil when the node has
// no entry. The returned slice is the one stored in the map, not a copy.
func (npm *nominatedPodMap) podsForNode(nodeName string) []*v1.Pod {
	// Indexing a map with a missing key yields the zero value, a nil slice.
	return npm.nominatedPods[nodeName]
}