[k8s源碼分析][kube-scheduler]scheduler/internal/queue之優(yōu)先隊列scheduling_queue(1)

1. 前言

轉(zhuǎn)載請說明原文出處, 尊重他人勞動成果!

本文將分析kubernetes/pkg/scheduler/internal/queue中的文件, 其中包括scheduling_queue.go
源碼位置: https://github.com/nicktming/kubernetes/blob/tming-v1.13/pkg/scheduler/internal/queue/scheduling_queue.go
分支: tming-v1.13 (基于v1.13版本)

2. SchedulingQueue (以PriorityQueue為例)

SchedulingQueue 是一個存著等待被調(diào)度的所有pods的數(shù)據(jù)結(jié)構(gòu), 也就是說所有沒有nodeNamepod都會被SchedulingQueue管理.

SchedulingQueue 主要涉及三個重要的模塊:

activeQ: 存著即將要被調(diào)度的pods
unschedulableQ: 存著已經(jīng)試著調(diào)度但是沒有成功的
nominatedPods: 存著搶占資源而成功的pods (因為搶占資源, 意味著正常調(diào)度失敗, 然后如果殺死某些優(yōu)先級低的pods可以使得該pod可以調(diào)度在某個節(jié)點上, 因此該pod還沒有真正調(diào)度到那個被選中的節(jié)點上, 因此殺死那些優(yōu)先級低的pods需要時間)

注意: 一個pod是不能同時出現(xiàn)在activeQunschedulableQ中的.

2.1 activeQ

首先介紹一下activeQ, 它是一個heap組成的數(shù)據(jù)結(jié)構(gòu), heap中的數(shù)據(jù)按照優(yōu)先級從高到低排序, 也就是說第一個出隊列的是優(yōu)先級最高的pod.

// Heap is a producer/consumer queue that implements a heap data structure.
// It can be used to implement priority queues and similar data structures.
type Heap struct {
    // data stores objects and has a queue that keeps their ordering according
    // to the heap invariant.
    data *heapData
}

Heap是一個用堆實現(xiàn)的類似于生產(chǎn)者/消費者的隊列.

2.1.1 heapData

heapData是一個實現(xiàn)heap接口的數(shù)據(jù)結(jié)構(gòu), 就是一個真正的heap.

// LessFunc is a function type to compare two objects.
type LessFunc func(interface{}, interface{}) bool

// KeyFunc is a function type to get the key from an object.
type KeyFunc func(obj interface{}) (string, error)

type heapItem struct {
    obj   interface{} // The object which is stored in the heap.
    index int         // The index of the object's key in the Heap.queue.
}

type itemKeyValue struct {
    key string
    obj interface{}
}

// heapData is an internal struct that implements the standard heap interface
// and keeps the data stored in the heap.
type heapData struct {
    // items is a map from key of the objects to the objects and their index.
    // We depend on the property that items in the map are in the queue and vice versa.
    items map[string]*heapItem
    // queue implements a heap data structure and keeps the order of elements
    // according to the heap invariant. The queue keeps the keys of objects stored
    // in "items".
    queue []string

    // keyFunc is used to make the key used for queued item insertion and retrieval, and
    // should be deterministic.
    keyFunc KeyFunc
    // lessFunc is used to compare two objects in the heap.
    lessFunc LessFunc
}

queue數(shù)組是一個真正實現(xiàn)heap的數(shù)組, 里面的數(shù)據(jù)是根據(jù)keyFunc生成的key.

// Less compares two objects and returns true if the first one should go
// in front of the second one in the heap.
func (h *heapData) Less(i, j int) bool {
    if i > len(h.queue) || j > len(h.queue) {
        return false
    }
    itemi, ok := h.items[h.queue[i]]
    if !ok {
        return false
    }
    itemj, ok := h.items[h.queue[j]]
    if !ok {
        return false
    }
    return h.lessFunc(itemi.obj, itemj.obj)
}

// Len returns the number of items in the Heap.
func (h *heapData) Len() int { return len(h.queue) }

// Swap implements swapping of two elements in the heap. This is a part of standard
// heap interface and should never be called directly.
func (h *heapData) Swap(i, j int) {
    h.queue[i], h.queue[j] = h.queue[j], h.queue[i]
    item := h.items[h.queue[i]]
    item.index = i
    item = h.items[h.queue[j]]
    item.index = j
}

// Push is supposed to be called by heap.Push only.
func (h *heapData) Push(kv interface{}) {
    keyValue := kv.(*itemKeyValue)
    n := len(h.queue)
    h.items[keyValue.key] = &heapItem{keyValue.obj, n}
    h.queue = append(h.queue, keyValue.key)
}

// Pop is supposed to be called by heap.Pop only.
func (h *heapData) Pop() interface{} {
    key := h.queue[len(h.queue)-1]
    h.queue = h.queue[0 : len(h.queue)-1]
    item, ok := h.items[key]
    if !ok {
        // This is an error
        return nil
    }
    delete(h.items, key)
    return item.obj
}

都是一些基本的操作, 就不多說了.

2.1.2 Heap

上面已經(jīng)說了Heap是一個用堆實現(xiàn)的類似于生產(chǎn)者/消費者的隊列. 所以就看一下它的Addpop方法就行.

// 1. 計算key
// 2. 根據(jù)item的map結(jié)構(gòu)檢查該obj是否存在
// 3. 如果存在 則更新該obj 并重新調(diào)整堆結(jié)構(gòu)
// 4. 如果不存在 則添加該obj
func (h *Heap) Add(obj interface{}) error {
    key, err := h.data.keyFunc(obj)
    if err != nil {
        return cache.KeyError{Obj: obj, Err: err}
    }
    if _, exists := h.data.items[key]; exists {
        h.data.items[key].obj = obj
        heap.Fix(h.data, h.data.items[key].index)
    } else {
        heap.Push(h.data, &itemKeyValue{key, obj})
    }
    return nil
}
  1. 計算key
  2. 根據(jù)item的map結(jié)構(gòu)檢查該obj是否存在
  3. 如果存在 則更新該obj 并重新調(diào)整堆結(jié)構(gòu)
  4. 如果不存在 則添加該obj
// Pop returns the head of the heap.
func (h *Heap) Pop() (interface{}, error) {
    obj := heap.Pop(h.data)
    if obj != nil {
        return obj, nil
    }
    return nil, fmt.Errorf("object was removed from heap data")
}

Pop則直接返回堆的頭

2.1.3 總結(jié)

所以activeQ是這樣的一個數(shù)據(jù)結(jié)構(gòu).

// NewPriorityQueue creates a PriorityQueue object.
func NewPriorityQueue(stop <-chan struct{}) *PriorityQueue {
    pq := &PriorityQueue{
        clock:            util.RealClock{},
        stop:             stop,
        activeQ:          newHeap(cache.MetaNamespaceKeyFunc, activeQComp),
        unschedulableQ:   newUnschedulablePodsMap(),
        nominatedPods:    newNominatedPodMap(),
        moveRequestCycle: -1,
    }
    pq.cond.L = &pq.lock

    pq.run()
    return pq
}

// newHeap returns a Heap which can be used to queue up items to process.
func newHeap(keyFn KeyFunc, lessFn LessFunc) *Heap {
    return &Heap{
        data: &heapData{
            items:    map[string]*heapItem{},
            queue:    []string{},
            keyFunc:  keyFn,
            lessFunc: lessFn,
        },
    }
}

MetaNamespaceKeyFunc基本上就是pod_namespace/pod_name, 而activeQComp就是heap的比較方法. 按照優(yōu)先級大小進行比較, 如果優(yōu)先級一樣, 就按最后一次被調(diào)度的時間.

func podTimestamp(pod *v1.Pod) *metav1.Time {
    _, condition := podutil.GetPodCondition(&pod.Status, v1.PodScheduled)
    if condition == nil {
        return &pod.CreationTimestamp
    }
    if condition.LastProbeTime.IsZero() {
        return &condition.LastTransitionTime
    }
    return &condition.LastProbeTime
}

// activeQComp is the function used by the activeQ heap algorithm to sort pods.
// It sorts pods based on their priority. When priorities are equal, it uses
// podTimestamp.
func activeQComp(pod1, pod2 interface{}) bool {
    p1 := pod1.(*v1.Pod)
    p2 := pod2.(*v1.Pod)
    prio1 := util.GetPodPriority(p1)
    prio2 := util.GetPodPriority(p2)
    return (prio1 > prio2) || (prio1 == prio2 && podTimestamp(p1).Before(podTimestamp(p2)))
}

3. unschedulableQ

接下來看一下unschedulableQ的數(shù)據(jù)結(jié)構(gòu)以及它的一些方法.

// UnschedulablePodsMap 就是一個Map結(jié)構(gòu)
// key為keyFunc計算出來的key 
// value就是對應(yīng)的pod
type UnschedulablePodsMap struct {
    // pods is a map key by a pod's full-name and the value is a pointer to the pod.
    pods    map[string]*v1.Pod
    keyFunc func(*v1.Pod) string
}

可以看到UnschedulablePodsMap是一個標準的Map結(jié)構(gòu), key是由keyFunc方法計算出來的key, value為對應(yīng)的pod.

3.1 方法

由于此結(jié)構(gòu)比較簡單, 所以它的方法包括addOrUpdate, delete 等等都是對map進行直接操作.

// 添加或者更新
func (u *UnschedulablePodsMap) addOrUpdate(pod *v1.Pod) {
    u.pods[u.keyFunc(pod)] = pod
}

// Delete deletes a pod from the unschedulable pods.
// 刪除
func (u *UnschedulablePodsMap) delete(pod *v1.Pod) {
    delete(u.pods, u.keyFunc(pod))
}

4. nominatedPodMap

nominatedPodMap用于存儲那些搶占成功的pods. 下面這段話的大致意思就是該pod現(xiàn)在的nominatedNodeName是有可能與最后真正運行到的節(jié)點不一樣的, 因此需要有結(jié)構(gòu)存儲并且可以進行操作.

// nominatedPodMap is a structure that stores pods nominated to run on nodes.
// It exists because nominatedNodeName of pod objects stored in the structure
// may be different than what scheduler has here. We should be able to find pods
// by their UID and update/delete them.

nominatedPodMap的結(jié)構(gòu)如下:

type nominatedPodMap struct {
    // 這些pods可能在activeQ或者unschedulableQ中
    // nodeName -> pods
    nominatedPods map[string][]*v1.Pod
    // pod_UID -> nodeName
    nominatedPodToNode map[ktypes.UID]string
}

func newNominatedPodMap() *nominatedPodMap {
    return &nominatedPodMap{
        nominatedPods:      make(map[string][]*v1.Pod),
        nominatedPodToNode: make(map[ktypes.UID]string),
    }
}

4.1 對應(yīng)的方法

func (npm *nominatedPodMap) add(p *v1.Pod, nodeName string) {
    // 無論是否存在 先刪除
    npm.delete(p)

    nnn := nodeName
    if len(nnn) == 0 {
        nnn = NominatedNodeName(p)
        if len(nnn) == 0 {
            return
        }
    }
    // 1. 如果nodeName和pod.Status.NominatedNodeName都為空 直接返回
    // 2. 如果nodeName不為空 nnn = nodeName 否則 nnn = pod.Status.NominatedNodeName
    
    npm.nominatedPodToNode[p.UID] = nnn
    for _, np := range npm.nominatedPods[nnn] {
        if np.UID == p.UID {
            klog.V(4).Infof("Pod %v/%v already exists in the nominated map!", p.Namespace, p.Name)
            return
        }
    }
    npm.nominatedPods[nnn] = append(npm.nominatedPods[nnn], p)
}

// 如果該pod存在nominatedPodMap中 就刪除
// 不存在 就直接返回
func (npm *nominatedPodMap) delete(p *v1.Pod) {
    nnn, ok := npm.nominatedPodToNode[p.UID]
    if !ok {
        return
    }
    for i, np := range npm.nominatedPods[nnn] {
        if np.UID == p.UID {
            npm.nominatedPods[nnn] = append(npm.nominatedPods[nnn][:i], npm.nominatedPods[nnn][i+1:]...)
            if len(npm.nominatedPods[nnn]) == 0 {
                delete(npm.nominatedPods, nnn)
            }
            break
        }
    }
    delete(npm.nominatedPodToNode, p.UID)
}
func (npm *nominatedPodMap) update(oldPod, newPod *v1.Pod) {
    // We update irrespective of the nominatedNodeName changed or not, to ensure
    // that pod pointer is updated.
    npm.delete(oldPod)
    npm.add(newPod, "")
}
// 取該節(jié)點下所有nominated Pods
func (npm *nominatedPodMap) podsForNode(nodeName string) []*v1.Pod {
    if list, ok := npm.nominatedPods[nodeName]; ok {
        return list
    }
    return nil
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子虐块,更是在濱河造成了極大的恐慌,老刑警劉巖渗钉,帶你破解...
    沈念sama閱讀 217,734評論 6 505
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件赚哗,死亡現(xiàn)場離奇詭異蟋字,居然都是意外死亡壹若,警方通過查閱死者的電腦和手機嗅钻,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,931評論 3 394
  • 文/潘曉璐 我一進店門皂冰,熙熙樓的掌柜王于貴愁眉苦臉地迎上來店展,“玉大人,你說我怎么就攤上這事秃流÷冈蹋” “怎么了?”我有些...
    開封第一講書人閱讀 164,133評論 0 354
  • 文/不壞的土叔 我叫張陵舶胀,是天一觀的道長概说。 經(jīng)常有香客問我,道長嚣伐,這世上最難降的妖魔是什么糖赔? 我笑而不...
    開封第一講書人閱讀 58,532評論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮轩端,結(jié)果婚禮上放典,老公的妹妹穿的比我還像新娘。我一直安慰自己基茵,他們只是感情好奋构,可當我...
    茶點故事閱讀 67,585評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著拱层,像睡著了一般弥臼。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上根灯,一...
    開封第一講書人閱讀 51,462評論 1 302
  • 那天径缅,我揣著相機與錄音,去河邊找鬼烙肺。 笑死纳猪,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的茬高。 我是一名探鬼主播兆旬,決...
    沈念sama閱讀 40,262評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼怎栽!你這毒婦竟也來了丽猬?” 一聲冷哼從身側(cè)響起宿饱,我...
    開封第一講書人閱讀 39,153評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎脚祟,沒想到半個月后谬以,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,587評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡由桌,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,792評論 3 336
  • 正文 我和宋清朗相戀三年为黎,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片行您。...
    茶點故事閱讀 39,919評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡铭乾,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出娃循,到底是詐尸還是另有隱情炕檩,我是刑警寧澤,帶...
    沈念sama閱讀 35,635評論 5 345
  • 正文 年R本政府宣布捌斧,位于F島的核電站笛质,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏捞蚂。R本人自食惡果不足惜妇押,卻給世界環(huán)境...
    茶點故事閱讀 41,237評論 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望姓迅。 院中可真熱鬧敲霍,春花似錦、人聲如沸队贱。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,855評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽柱嫌。三九已至锋恬,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間编丘,已是汗流浹背与学。 一陣腳步聲響...
    開封第一講書人閱讀 32,983評論 1 269
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留嘉抓,地道東北人索守。 一個月前我還...
    沈念sama閱讀 48,048評論 3 370
  • 正文 我出身青樓,卻偏偏與公主長得像抑片,于是被迫代替她去往敵國和親卵佛。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 44,864評論 2 354

推薦閱讀更多精彩內(nèi)容