daemonset controller 源碼分析

在前面的文章中已經(jīng)分析過(guò) deployment蔑担、statefulset 兩個(gè)重要對(duì)象了悍缠,本文會(huì)繼續(xù)分析 kubernetes 中另一個(gè)重要的對(duì)象 daemonset,在 kubernetes 中 daemonset 類似于 linux 上的守護(hù)進(jìn)程會(huì)運(yùn)行在每一個(gè) node 上颅围,在實(shí)際場(chǎng)景中进宝,一般會(huì)將日志采集或者網(wǎng)絡(luò)插件采用 daemonset 的方式部署。

DaemonSet 的基本操作

創(chuàng)建

daemonset 在創(chuàng)建后會(huì)在每個(gè) node 上都啟動(dòng)一個(gè) pod蝗茁。

$ kubectl create -f nginx-ds.yaml

擴(kuò)縮容

由于 daemonset 是在每個(gè) node 上啟動(dòng)一個(gè) pod秘血,其不存在擴(kuò)縮容操作,副本數(shù)量跟 node 數(shù)量保持一致评甜。

更新

daemonset 有兩種更新策略 OnDeleteRollingUpdate灰粮,默認(rèn)為 RollingUpdate。滾動(dòng)更新時(shí)忍坷,需要指定 .spec.updateStrategy.rollingUpdate.maxUnavailable(默認(rèn)為1)和 .spec.minReadySeconds(默認(rèn)為 0)粘舟。

// 更新鏡像
$ kubectl set image ds/nginx-ds nginx-ds=nginx:1.16

// 查看更新狀態(tài)
$ kubectl rollout status ds/nginx-ds

回滾

在 statefulset 源碼分析一節(jié)已經(jīng)提到過(guò) controllerRevision 這個(gè)對(duì)象了,其主要用來(lái)保存歷史版本信息佩研,在更新以及回滾操作時(shí)使用柑肴,daemonset controller 也是使用 controllerrevision 保存歷史版本信息,在回滾時(shí)會(huì)使用歷史 controllerrevision 中的信息替換 daemonset 中 Spec.Template旬薯。

// 查看 ds 歷史版本信息
$ kubectl get controllerrevision
NAME                  CONTROLLER                REVISION   AGE
nginx-ds-5c4b75bdbb   daemonset.apps/nginx-ds   2          122m
nginx-ds-7cd7798dcd   daemonset.apps/nginx-ds   1          133m

// 回滾到版本 1
$ kubectl rollout undo daemonset nginx-ds --to-revision=1

// 查看回滾狀態(tài)
$ kubectl rollout status ds/nginx-ds

暫停

daemonset 目前不支持暫停操作晰骑。

刪除

daemonset 也支持兩種刪除操作。

// 非級(jí)聯(lián)刪除
$ kubectl delete ds/nginx-ds --cascade=false

// 級(jí)聯(lián)刪除
$ kubectl delete ds/nginx-ds

DaemonSetController 源碼分析

kubernetes 版本:v1.16

首先還是看 startDaemonSetController 方法绊序,在此方法中會(huì)初始化 DaemonSetsController 對(duì)象并調(diào)用 Run方法啟動(dòng) daemonset controller硕舆,從該方法中可以看出 daemonset controller 會(huì)監(jiān)聽 daemonsetscontrollerRevision骤公、podnode 四種對(duì)象資源的變動(dòng)抚官。其中 ConcurrentDaemonSetSyncs的默認(rèn)值為 2。

k8s.io/kubernetes/cmd/kube-controller-manager/app/apps.go:36

func startDaemonSetController(ctx ControllerContext) (http.Handler, bool, error) {
    if !ctx.AvailableResources[schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "daemonsets"}] {
        return nil, false, nil
    }
    dsc, err := daemon.NewDaemonSetsController(
        ctx.InformerFactory.Apps().V1().DaemonSets(),
        ctx.InformerFactory.Apps().V1().ControllerRevisions(),
        ctx.InformerFactory.Core().V1().Pods(),
        ctx.InformerFactory.Core().V1().Nodes(),
        ctx.ClientBuilder.ClientOrDie("daemon-set-controller"),
        flowcontrol.NewBackOff(1*time.Second, 15*time.Minute),
    )
    if err != nil {
        return nil, true, fmt.Errorf("error creating DaemonSets controller: %v", err)
    }
    go dsc.Run(int(ctx.ComponentConfig.DaemonSetController.ConcurrentDaemonSetSyncs), ctx.Stop)
    return nil, true, nil
}

Run 方法中會(huì)啟動(dòng)兩個(gè)操作阶捆,一個(gè)就是 dsc.runWorker 執(zhí)行的 sync 操作凌节,另一個(gè)就是 dsc.failedPodsBackoff.GC 執(zhí)行的 gc 操作,主要邏輯為:

  • 1洒试、等待 informer 緩存同步完成倍奢;
  • 2、啟動(dòng)兩個(gè) goroutine 分別執(zhí)行 dsc.runWorker垒棋;
  • 3卒煞、啟動(dòng)一個(gè) goroutine 每分鐘執(zhí)行一次 dsc.failedPodsBackoff.GC,從 startDaemonSetController 方法中可以看到 failedPodsBackoff 的 duration為1s捕犬,max duration為15m跷坝,failedPodsBackoff 的主要作用是當(dāng)發(fā)現(xiàn) daemon pod 狀態(tài)為 failed 時(shí),會(huì)定時(shí)重啟該 pod碉碉;

k8s.io/kubernetes/pkg/controller/daemon/daemon_controller.go:263

func (dsc *DaemonSetsController) Run(workers int, stopCh <-chan struct{}) {
    defer utilruntime.HandleCrash()
    defer dsc.queue.ShutDown()

    defer klog.Infof("Shutting down daemon sets controller")

    if !cache.WaitForNamedCacheSync("daemon sets", stopCh, dsc.podStoreSynced, dsc.nodeStoreSynced, dsc.historyStoreSynced, dsc.dsStoreSynced) {
        return
    }

    for i := 0; i < workers; i++ {
        // sync 操作
        go wait.Until(dsc.runWorker, time.Second, stopCh)
    }
        
    // GC 操作
    go wait.Until(dsc.failedPodsBackoff.GC, BackoffGCInterval, stopCh)

    <-stopCh
}

syncDaemonSet

daemonset 中 pod 的創(chuàng)建與刪除是與 node 相關(guān)聯(lián)的柴钻,所以每次執(zhí)行 sync 操作時(shí)需要遍歷所有的 node 進(jìn)行判斷。syncDaemonSet 的主要邏輯為:

  • 1垢粮、通過(guò) key 獲取 ns 和 name贴届;
  • 2、從 dsLister 中獲取 ds 對(duì)象蜡吧;
  • 3毫蚓、從 nodeLister 獲取所有 node;
  • 4昔善、獲取 dsKey元潘;
  • 5、判斷 ds 是否處于刪除狀態(tài)君仆;
  • 6翩概、調(diào)用 constructHistory 獲取 current 和 old controllerRevision
  • 7返咱、調(diào)用 dsc.expectations.SatisfiedExpectations 判斷是否滿足 expectations 機(jī)制钥庇,expectations 機(jī)制的目的就是減少不必要的 sync 操作,關(guān)于 expectations 機(jī)制的詳細(xì)說(shuō)明可以參考筆者以前寫的 "replicaset controller 源碼分析"一文咖摹;
  • 8评姨、調(diào)用 dsc.manage 執(zhí)行實(shí)際的 sync 操作;
  • 9萤晴、判斷是否為更新操作吐句,并執(zhí)行對(duì)應(yīng)的更新操作邏輯;
  • 10店读、調(diào)用 dsc.cleanupHistory 根據(jù) spec.revisionHistoryLimit字段清理過(guò)期的 controllerrevision蕴侧;
  • 11、調(diào)用 dsc.updateDaemonSetStatus 更新 ds 狀態(tài)两入;

k8s.io/kubernetes/pkg/controller/daemon/daemon_controller.go:1212

func (dsc *DaemonSetsController) syncDaemonSet(key string) error {
    ......

    // 1净宵、通過(guò) key 獲取 ns 和 name
    namespace, name, err := cache.SplitMetaNamespaceKey(key)
    if err != nil {
        return err
    }
    
    // 2、從 dsLister 中獲取 ds 對(duì)象
    ds, err := dsc.dsLister.DaemonSets(namespace).Get(name)
    if errors.IsNotFound(err) {
        dsc.expectations.DeleteExpectations(key)
        return nil
    }
    ......

    // 3裹纳、從 nodeLister 獲取所有 node
    nodeList, err := dsc.nodeLister.List(labels.Everything())
        ......

    everything := metav1.LabelSelector{}
    if reflect.DeepEqual(ds.Spec.Selector, &everything) {
        dsc.eventRecorder.Eventf(ds, v1.EventTypeWarning, SelectingAllReason, "This daemon set is selecting all pods. A non-empty selector is required. ")
        return nil
    }
    
    // 4择葡、獲取 dsKey
    dsKey, err := controller.KeyFunc(ds)
    if err != nil {
        return fmt.Errorf("couldn't get key for object %#v: %v", ds, err)
    }

    // 5剃氧、判斷 ds 是否處于刪除狀態(tài)
    if ds.DeletionTimestamp != nil {
        return nil
    }

    // 6敏储、獲取 current 和 old controllerRevision
    cur, old, err := dsc.constructHistory(ds)
    if err != nil {
        return fmt.Errorf("failed to construct revisions of DaemonSet: %v", err)
    }
    hash := cur.Labels[apps.DefaultDaemonSetUniqueLabelKey]
    
    // 7、判斷是否滿足 expectations 機(jī)制
    if !dsc.expectations.SatisfiedExpectations(dsKey) {
        return dsc.updateDaemonSetStatus(ds, nodeList, hash, false)
    }

    // 8朋鞍、執(zhí)行實(shí)際的 sync 操作
    err = dsc.manage(ds, nodeList, hash)
    if err != nil {
        return err
    }

    // 9已添、判斷是否為更新操作妥箕,并執(zhí)行對(duì)應(yīng)的更新操作
    if dsc.expectations.SatisfiedExpectations(dsKey) {
        switch ds.Spec.UpdateStrategy.Type {
        case apps.OnDeleteDaemonSetStrategyType:
        case apps.RollingUpdateDaemonSetStrategyType:
            err = dsc.rollingUpdate(ds, nodeList, hash)
        }
        if err != nil {
            return err
        }
    }
    // 10、清理過(guò)期的 controllerrevision
    err = dsc.cleanupHistory(ds, old)
    if err != nil {
        return fmt.Errorf("failed to clean up revisions of DaemonSet: %v", err)
    }

    // 11更舞、更新 ds 狀態(tài)
    return dsc.updateDaemonSetStatus(ds, nodeList, hash, true)
}

syncDaemonSet 中主要有 manage畦幢、rollingUpdateupdateDaemonSetStatus 三個(gè)方法,分別對(duì)應(yīng)創(chuàng)建缆蝉、更新與狀態(tài)同步宇葱,下面主要來(lái)分析這三個(gè)方法。

manage

manage 主要是用來(lái)保證 ds 的 pod 數(shù)正常運(yùn)行在每一個(gè) node 上刊头,其主要邏輯為:

  • 1黍瞧、調(diào)用 dsc.getNodesToDaemonPods 獲取已存在 daemon pod 與 node 的映射關(guān)系;
  • 2原杂、遍歷所有 node印颤,調(diào)用 dsc.podsShouldBeOnNode 方法來(lái)確定在給定的節(jié)點(diǎn)上需要?jiǎng)?chuàng)建還是刪除 daemon pod;
  • 3穿肄、判斷是否啟動(dòng)了 ScheduleDaemonSetPodsfeature-gates 特性膀哲,若啟動(dòng)了則需要?jiǎng)h除通過(guò)默認(rèn)調(diào)度器已經(jīng)調(diào)度到不存在 node 上的 daemon pod;
  • 4被碗、調(diào)用 dsc.syncNodes 為對(duì)應(yīng)的 node 創(chuàng)建 daemon pod 以及刪除多余的 pods某宪;

k8s.io/kubernetes/pkg/controller/daemon/daemon_controller.go:952

func (dsc *DaemonSetsController) manage(ds *apps.DaemonSet, nodeList []*v1.Node, hash string) error {
    // 1、獲取已存在 daemon pod 與 node 的映射關(guān)系
    nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ds)
    ......

    // 2锐朴、判斷每一個(gè) node 是否需要運(yùn)行 daemon pod
    var nodesNeedingDaemonPods, podsToDelete []string
    for _, node := range nodeList {
        nodesNeedingDaemonPodsOnNode, podsToDeleteOnNode, err := dsc.podsShouldBeOnNode(
            node, nodeToDaemonPods, ds)

        if err != nil {
            continue
        }

        nodesNeedingDaemonPods = append(nodesNeedingDaemonPods, nodesNeedingDaemonPodsOnNode...)
        podsToDelete = append(podsToDelete, podsToDeleteOnNode...)
    }

    // 3兴喂、判斷是否啟動(dòng)了 ScheduleDaemonSetPods feature-gates 特性,若啟用了則對(duì)不存在 node 上的 
    // daemon pod 進(jìn)行刪除 
    if utilfeature.DefaultFeatureGate.Enabled(features.ScheduleDaemonSetPods) {
        podsToDelete = append(podsToDelete, getUnscheduledPodsWithoutNode(nodeList, nodeToDaemonPods)...)
    }

    // 4焚志、為對(duì)應(yīng)的 node 創(chuàng)建 daemon pod 以及刪除多余的 pods
    if err = dsc.syncNodes(ds, podsToDelete, nodesNeedingDaemonPods, hash); err != nil {
        return err
    }

    return nil
}

manage 方法中又調(diào)用了 getNodesToDaemonPods衣迷、podsShouldBeOnNodesyncNodes 三個(gè)方法,繼續(xù)來(lái)看這幾種方法的作用酱酬。

getNodesToDaemonPods

getNodesToDaemonPods 是用來(lái)獲取已存在 daemon pod 與 node 的映射關(guān)系壶谒,并且會(huì)通過(guò) adopt/orphan 方法關(guān)聯(lián)以及釋放對(duì)應(yīng)的 pod。

k8s.io/kubernetes/pkg/controller/daemon/daemon_controller.go:820

func (dsc *DaemonSetsController) getNodesToDaemonPods(ds *apps.DaemonSet) (map[string][]*v1.Pod, error) {
    claimedPods, err := dsc.getDaemonPods(ds)
    if err != nil {
        return nil, err
    }
    nodeToDaemonPods := make(map[string][]*v1.Pod)
    for _, pod := range claimedPods {
        nodeName, err := util.GetTargetNodeName(pod)
        if err != nil {
            klog.Warningf("Failed to get target node name of Pod %v/%v in DaemonSet %v/%v",
                pod.Namespace, pod.Name, ds.Namespace, ds.Name)
            continue
        }

        nodeToDaemonPods[nodeName] = append(nodeToDaemonPods[nodeName], pod)
    }

    return nodeToDaemonPods, nil
}
podsShouldBeOnNode

podsShouldBeOnNode 方法用來(lái)確定在給定的節(jié)點(diǎn)上需要?jiǎng)?chuàng)建還是刪除 daemon pod膳沽,主要邏輯為:

  • 1汗菜、調(diào)用 dsc.nodeShouldRunDaemonPod 判斷該 node 是否需要運(yùn)行 daemon pod 以及 pod 能不能調(diào)度成功,該方法返回三個(gè)值 wantToRun, shouldSchedule, shouldContinueRunning挑社;
  • 2陨界、通過(guò)判斷 wantToRun, shouldSchedule, shouldContinueRunning 將需要?jiǎng)?chuàng)建 daemon pod 的 node 列表以及需要?jiǎng)h除的 pod 列表獲取到, wantToRun主要檢查的是 selector痛阻、taints 等是否匹配菌瘪,shouldSchedule 主要檢查 node 上的資源是否充足,shouldContinueRunning 默認(rèn)為 true阱当;

k8s.io/kubernetes/pkg/controller/daemon/daemon_controller.go:866

func (dsc *DaemonSetsController) podsShouldBeOnNode(...) (nodesNeedingDaemonPods, podsToDelete []string, err error) {
    // 1俏扩、判斷該 node 是否需要運(yùn)行 daemon pod 以及能不能調(diào)度成功
    wantToRun, shouldSchedule, shouldContinueRunning, err := dsc.nodeShouldRunDaemonPod(node, ds)
    if err != nil {
        return
    }
    // 2糜工、獲取該節(jié)點(diǎn)上的指定ds的pod列表
    daemonPods, exists := nodeToDaemonPods[node.Name]
    dsKey, err := cache.MetaNamespaceKeyFunc(ds)
    if err != nil {
        utilruntime.HandleError(err)
        return
    }
        
    // 3、從 suspended list 中移除在該節(jié)點(diǎn)上 ds 的 pod
    dsc.removeSuspendedDaemonPods(node.Name, dsKey)

    switch {
    // 4录淡、對(duì)于需要?jiǎng)?chuàng)建 pod 但是不能調(diào)度 pod 的 node捌木,先把 pod 放入到 suspended 隊(duì)列中
    case wantToRun && !shouldSchedule:
        dsc.addSuspendedDaemonPods(node.Name, dsKey)
    // 5、需要?jiǎng)?chuàng)建 pod 且 pod 未運(yùn)行赁咙,則創(chuàng)建 pod
    case shouldSchedule && !exists:
        nodesNeedingDaemonPods = append(nodesNeedingDaemonPods, node.Name)
    // 6、需要 pod 一直運(yùn)行
    case shouldContinueRunning:
        var daemonPodsRunning []*v1.Pod
        for _, pod := range daemonPods {
            if pod.DeletionTimestamp != nil {
                continue
            }
            // 7免钻、如果 pod 運(yùn)行狀態(tài)為 failed彼水,則刪除該 pod
            if pod.Status.Phase == v1.PodFailed {
                backoffKey := failedPodsBackoffKey(ds, node.Name)

                now := dsc.failedPodsBackoff.Clock.Now()
                inBackoff := dsc.failedPodsBackoff.IsInBackOffSinceUpdate(backoffKey, now)
                if inBackoff {
                    delay := dsc.failedPodsBackoff.Get(backoffKey)
                    dsc.enqueueDaemonSetAfter(ds, delay)
                    continue
                }

                dsc.failedPodsBackoff.Next(backoffKey, now)
                podsToDelete = append(podsToDelete, pod.Name)
            } else {
                daemonPodsRunning = append(daemonPodsRunning, pod)
            }
        }
        // 8、如果節(jié)點(diǎn)上已經(jīng)運(yùn)行 daemon pod 數(shù) > 1极舔,保留運(yùn)行時(shí)間最長(zhǎng)的 pod凤覆,其余的刪除
        if len(daemonPodsRunning) > 1 {
            sort.Sort(podByCreationTimestampAndPhase(daemonPodsRunning))
            for i := 1; i < len(daemonPodsRunning); i++ {
                podsToDelete = append(podsToDelete, daemonPodsRunning[i].Name)
            }
        }
    // 9、如果 pod 不需要繼續(xù)運(yùn)行但 pod 已存在則需要?jiǎng)h除 pod
    case !shouldContinueRunning && exists:
        for _, pod := range daemonPods {
            if pod.DeletionTimestamp != nil {
                continue
            }
            podsToDelete = append(podsToDelete, pod.Name)
        }
    }

    return nodesNeedingDaemonPods, podsToDelete, nil
}

然后繼續(xù)看 nodeShouldRunDaemonPod 方法的主要邏輯:

  • 1拆魏、調(diào)用 NewPod 為該 node 構(gòu)建一個(gè) daemon pod object盯桦;
  • 2、判斷 ds 是否指定了 .Spec.Template.Spec.NodeName 字段渤刃;
  • 3拥峦、調(diào)用 dsc.simulate 執(zhí)行 GeneralPredicates 預(yù)選算法檢查該 node 是否能夠調(diào)度成功;
  • 4卖子、判斷 GeneralPredicates 預(yù)選算法執(zhí)行后的 reasons 確定 wantToRun, shouldSchedule, shouldContinueRunning 的值略号;

k8s.io/kubernetes/pkg/controller/daemon/daemon_controller.go:1337

func (dsc *DaemonSetsController) nodeShouldRunDaemonPod(node *v1.Node, ds *apps.DaemonSet) (wantToRun, shouldSchedule, shouldContinueRunning bool, err   error) {
    // 1、構(gòu)建 daemon pod object
    newPod := NewPod(ds, node.Name)

    wantToRun, shouldSchedule, shouldContinueRunning = true, true, true
    // 2洋闽、判斷 ds 是否指定了 node玄柠,若指定了且不為當(dāng)前 node 直接返回 false
    if !(ds.Spec.Template.Spec.NodeName == "" || ds.Spec.Template.Spec.NodeName == node.Name) {
        return false, false, false, nil
    }

    // 3、執(zhí)行 GeneralPredicates 預(yù)選算法
    reasons, nodeInfo, err := dsc.simulate(newPod, node, ds)
    if err != nil {
        ......
    }

    // 4诫舅、檢查預(yù)選算法執(zhí)行的結(jié)果
    var insufficientResourceErr error
    for _, r := range reasons {
        switch reason := r.(type) {
        case *predicates.InsufficientResourceError:
            insufficientResourceErr = reason
        case *predicates.PredicateFailureError:
            var emitEvent bool
            switch reason {
            case
                predicates.ErrNodeSelectorNotMatch,
                predicates.ErrPodNotMatchHostName,
                predicates.ErrNodeLabelPresenceViolated,

                predicates.ErrPodNotFitsHostPorts:
                return false, false, false, nil
            case predicates.ErrTaintsTolerationsNotMatch:
                fitsNoExecute, _, err := predicates.PodToleratesNodeNoExecuteTaints(newPod, nil, nodeInfo)
                if err != nil {
                    return false, false, false, err
                }
                if !fitsNoExecute {
                    return false, false, false, nil
                }
                wantToRun, shouldSchedule = false, false
            case
                predicates.ErrDiskConflict,
                predicates.ErrVolumeZoneConflict,
                predicates.ErrMaxVolumeCountExceeded,
                predicates.ErrNodeUnderMemoryPressure,
                predicates.ErrNodeUnderDiskPressure:
                shouldSchedule = false
                emitEvent = true
            case
                predicates.ErrPodAffinityNotMatch,
                predicates.ErrServiceAffinityViolated:
                
                return false, false, false, fmt.Errorf("unexpected reason: DaemonSet Predicates should not return reason %s", reason.GetReason())
            default:             
                wantToRun, shouldSchedule, shouldContinueRunning = false, false, false
                emitEvent = true
            }
            ......
        }
    }

    if shouldSchedule && insufficientResourceErr != nil {
        dsc.eventRecorder.Eventf(ds, v1.EventTypeWarning, FailedPlacementReason, "failed to place pod on %q: %s", node.ObjectMeta.Name,                  insufficientResourceErr.Error())
        shouldSchedule = false
    }
    return
}
syncNodes

syncNodes 方法主要是為需要 daemon pod 的 node 創(chuàng)建 pod 以及刪除多余的 pod羽利,其主要邏輯為:

  • 1、將 createDiffdeleteDiffburstReplicas 進(jìn)行比較刊懈,burstReplicas 默認(rèn)值為 250 即每個(gè) syncLoop 中創(chuàng)建或者刪除的 pod 數(shù)最多為 250 個(gè)这弧,若超過(guò)其值則剩余需要?jiǎng)?chuàng)建或者刪除的 pod 在下一個(gè) syncLoop 繼續(xù)操作;
  • 2虚汛、將 createDiffdeleteDiff 寫入到 expectations 中当宴;
  • 3、并發(fā)創(chuàng)建 pod泽疆,創(chuàng)建 pod 有兩種方法:(1)創(chuàng)建的 pod 不經(jīng)過(guò)默認(rèn)調(diào)度器户矢,直接指定了 pod 的運(yùn)行節(jié)點(diǎn)(即設(shè)定pod.Spec.NodeName);(2)若啟用了 ScheduleDaemonSetPods feature-gates 特性殉疼,則使用默認(rèn)調(diào)度器進(jìn)行創(chuàng)建 pod梯浪,通過(guò) nodeAffinity來(lái)保證每個(gè)節(jié)點(diǎn)都運(yùn)行一個(gè) pod捌年;
  • 4、并發(fā)刪除 deleteDiff 中的所有 pod挂洛;

ScheduleDaemonSetPods 是一個(gè) feature-gates 特性礼预,其出現(xiàn)在 v1.11 中,在 v1.12 中處于 Beta 版本虏劲,v1.17 為 GA 版托酸。最初 daemonset controller 只有一種創(chuàng)建 pod 的方法,即直接指定 pod 的 spec.NodeName 字段柒巫,但是目前這種方式已經(jīng)暴露了許多問(wèn)題励堡,在以后的發(fā)展中社區(qū)還是希望能通過(guò)默認(rèn)調(diào)度器進(jìn)行調(diào)度,所以才出現(xiàn)了第二種方式堡掏,原因主要有以下五點(diǎn):

  • 1应结、DaemonSet 無(wú)法感知 node 上資源的變化 (#46935, #58868):當(dāng) pod 第一次因資源不夠無(wú)法創(chuàng)建時(shí),若其他 pod 退出后資源足夠時(shí) DaemonSet 無(wú)法感知到泉唁;
  • 2鹅龄、Daemonset 無(wú)法支持 Pod Affinity 和 Pod AntiAffinity 的功能(#29276);
  • 3亭畜、在某些功能上需要實(shí)現(xiàn)和 scheduler 重復(fù)的代碼邏輯, 例如:critical pods (#42028), tolerant/taint扮休;
  • 4、當(dāng) DaemonSet 的 Pod 創(chuàng)建失敗時(shí)難以 debug拴鸵,例如:資源不足時(shí)肛炮,對(duì)于 pending pod 最好能打一個(gè) event 說(shuō)明;
  • 5宝踪、多個(gè)組件同時(shí)調(diào)度時(shí)難以實(shí)現(xiàn)搶占機(jī)制:這也是無(wú)法通過(guò)橫向擴(kuò)展調(diào)度器提高調(diào)度吞吐量的一個(gè)原因侨糟;

更詳細(xì)的原因可以參考社區(qū)的文檔:schedule-DS-pod-by-scheduler.md

k8s.io/kubernetes/pkg/controller/daemon/daemon_controller.go:990

func (dsc *DaemonSetsController) syncNodes(ds *apps.DaemonSet, podsToDelete, nodesNeedingDaemonPods []string, hash string) error {
    ......

    // 1瘩燥、設(shè)置 burstReplicas 
    createDiff := len(nodesNeedingDaemonPods)
    deleteDiff := len(podsToDelete)

    if createDiff > dsc.burstReplicas {
        createDiff = dsc.burstReplicas
    }
    if deleteDiff > dsc.burstReplicas {
        deleteDiff = dsc.burstReplicas
    }

    // 2秕重、寫入到 expectations 中
    dsc.expectations.SetExpectations(dsKey, createDiff, deleteDiff)

    errCh := make(chan error, createDiff+deleteDiff)
    createWait := sync.WaitGroup{}
    generation, err := util.GetTemplateGeneration(ds)
    if err != nil {
        generation = nil
    }
    template := util.CreatePodTemplate(ds.Spec.Template, generation, hash)

    // 3、并發(fā)創(chuàng)建 pod厉膀,創(chuàng)建的 pod 數(shù)依次為 1, 2, 4, 8, ...
    batchSize := integer.IntMin(createDiff, controller.SlowStartInitialBatchSize)
    for pos := 0; createDiff > pos; batchSize, pos = integer.IntMin(2*batchSize, createDiff-(pos+batchSize)), pos+batchSize {
        errorCount := len(errCh)
        createWait.Add(batchSize)
        for i := pos; i < pos+batchSize; i++ {
            go func(ix int) {
                defer createWait.Done()
                var err error

                podTemplate := template.DeepCopy()
                
                // 4溶耘、若啟動(dòng)了 ScheduleDaemonSetPods 功能,則通過(guò) kube-scheduler 創(chuàng)建 pod
                if utilfeature.DefaultFeatureGate.Enabled(features.ScheduleDaemonSetPods) {
                    podTemplate.Spec.Affinity = util.ReplaceDaemonSetPodNodeNameNodeAffinity(
                        podTemplate.Spec.Affinity, nodesNeedingDaemonPods[ix])

                    err = dsc.podControl.CreatePodsWithControllerRef(ds.Namespace, podTemplate,
                        ds, metav1.NewControllerRef(ds, controllerKind))
                } else {
                    // 5服鹅、否則直接設(shè)置 pod 的 .spec.NodeName 創(chuàng)建 pod
                    err = dsc.podControl.CreatePodsOnNode(nodesNeedingDaemonPods[ix], ds.Namespace, podTemplate,
                        ds, metav1.NewControllerRef(ds, controllerKind))
                }

                // 6凳兵、創(chuàng)建 pod 時(shí)忽略 timeout err
                if err != nil && errors.IsTimeout(err) {
                    return
                }
                if err != nil {
                    dsc.expectations.CreationObserved(dsKey)
                    errCh <- err
                    utilruntime.HandleError(err)
                }
            }(i)
        }
        createWait.Wait()

        // 7、將創(chuàng)建失敗的 pod 數(shù)記錄到 expectations 中
        skippedPods := createDiff - (batchSize + pos)
        if errorCount < len(errCh) && skippedPods > 0 {
            dsc.expectations.LowerExpectations(dsKey, skippedPods, 0)
            break
        }
    }

    // 8企软、并發(fā)刪除 deleteDiff 中的 pod
    deleteWait := sync.WaitGroup{}
    deleteWait.Add(deleteDiff)
    for i := 0; i < deleteDiff; i++ {
        go func(ix int) {
            defer deleteWait.Done()
            if err := dsc.podControl.DeletePod(ds.Namespace, podsToDelete[ix], ds); err != nil {
                dsc.expectations.DeletionObserved(dsKey)
                errCh <- err
                utilruntime.HandleError(err)
            }
        }(i)
    }
    deleteWait.Wait()
    errors := []error{}
    close(errCh)
    for err := range errCh {
        errors = append(errors, err)
    }
    return utilerrors.NewAggregate(errors)
}

RollingUpdate

daemonset update 的方式有兩種 OnDeleteRollingUpdate庐扫,當(dāng)為 OnDelete 時(shí)需要用戶手動(dòng)刪除每一個(gè) pod 后完成更新操作,當(dāng)為 RollingUpdate 時(shí),daemonset controller 會(huì)自動(dòng)控制升級(jí)進(jìn)度形庭。

當(dāng)為 RollingUpdate 時(shí)铅辞,主要邏輯為:

  • 1、獲取 daemonset pod 與 node 的映射關(guān)系萨醒;
  • 2斟珊、根據(jù) controllerrevision 的 hash 值獲取所有未更新的 pods;
  • 3富纸、獲取 maxUnavailable, numUnavailable 的 pod 數(shù)值囤踩,maxUnavailable 是從 ds 的 rollingUpdate 字段中獲取的默認(rèn)值為 1,numUnavailable 的值是通過(guò) daemonset pod 與 node 的映射關(guān)系計(jì)算每個(gè) node 下是否有 available pod 得到的晓褪;
  • 4堵漱、通過(guò) oldPods 獲取 oldAvailablePods, oldUnavailablePods 的 pod 列表;
  • 5辞州、遍歷 oldUnavailablePods 列表將需要?jiǎng)h除的 pod 追加到 oldPodsToDelete 數(shù)組中怔锌。oldUnavailablePods 列表中的 pod 分為兩種寥粹,一種處于更新中变过,即刪除狀態(tài),一種處于未更新且異常狀態(tài)涝涤,處于異常狀態(tài)的都需要被刪除媚狰;
  • 6、遍歷 oldAvailablePods 列表阔拳,此列表中的 pod 都處于正常運(yùn)行狀態(tài)崭孤,根據(jù) maxUnavailable 值確定是否需要?jiǎng)h除該 pod 并將需要?jiǎng)h除的 pod 追加到 oldPodsToDelete 數(shù)組中;
  • 7糊肠、調(diào)用 dsc.syncNodes 刪除 oldPodsToDelete 數(shù)組中的 pods辨宠,syncNodes 方法在 manage 階段已經(jīng)分析過(guò),此處不再詳述货裹;

rollingUpdate 的結(jié)果是找出需要?jiǎng)h除的 pods 并進(jìn)行刪除嗤形,被刪除的 pod 在下一個(gè) syncLoop 中會(huì)通過(guò) manage 方法使用最新版本的 daemonset template 進(jìn)行創(chuàng)建,整個(gè)滾動(dòng)更新的過(guò)程是通過(guò)先刪除再創(chuàng)建的方式一步步完成更新的弧圆,每次操作都是嚴(yán)格按照 maxUnavailable 的值確定需要?jiǎng)h除的 pod 數(shù)赋兵。

k8s.io/kubernetes/pkg/controller/daemon/update.go:43

func (dsc *DaemonSetsController) rollingUpdate(......) error {
    // 1、獲取 daemonset pod 與 node 的映射關(guān)系
    nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ds)
    ......

    // 2搔预、獲取所有未更新的 pods
    _, oldPods := dsc.getAllDaemonSetPods(ds, nodeToDaemonPods, hash)
    
    // 3霹期、計(jì)算 maxUnavailable, numUnavailable 的 pod 數(shù)值
    maxUnavailable, numUnavailable, err := dsc.getUnavailableNumbers(ds, nodeList, nodeToDaemonPods)
    if err != nil {
        return fmt.Errorf("couldn't get unavailable numbers: %v", err)
    }
    oldAvailablePods, oldUnavailablePods := util.SplitByAvailablePods(ds.Spec.MinReadySeconds, oldPods)

    // 4、將非 running 狀態(tài)的 pods 加入到 oldPodsToDelete 中
    var oldPodsToDelete []string
    for _, pod := range oldUnavailablePods {
        if pod.DeletionTimestamp != nil {
            continue
        }
        oldPodsToDelete = append(oldPodsToDelete, pod.Name)
    }
    // 5拯田、根據(jù) maxUnavailable 值確定是否需要?jiǎng)h除 pod
    for _, pod := range oldAvailablePods {
        if numUnavailable >= maxUnavailable {
            break
        }
        oldPodsToDelete = append(oldPodsToDelete, pod.Name)
        numUnavailable++
    }
    // 6历造、調(diào)用 syncNodes 方法刪除 oldPodsToDelete 數(shù)組中的 pods
    return dsc.syncNodes(ds, oldPodsToDelete, []string{}, hash)
}

總結(jié)一下,manage 方法中的主要流程為:

            |->  dsc.getNodesToDaemonPods
            |
            |
manage ---- |->  dsc.podsShouldBeOnNode  ---> dsc.nodeShouldRunDaemonPod
            |
            |
            |->  dsc.syncNodes

updateDaemonSetStatus

updateDaemonSetStatussyncDaemonSet 中最后執(zhí)行的方法,主要是用來(lái)計(jì)算 ds status subresource 中的值并更新其 status帕膜。status 如下所示:

status:
  currentNumberScheduled: 1  // 已經(jīng)運(yùn)行了 DaemonSet Pod的節(jié)點(diǎn)數(shù)量
  desiredNumberScheduled: 1  // 需要運(yùn)行該DaemonSet Pod的節(jié)點(diǎn)數(shù)量
  numberMisscheduled: 0      // 不需要運(yùn)行 DeamonSet Pod 但是已經(jīng)運(yùn)行了的節(jié)點(diǎn)數(shù)量
  numberReady: 0             // DaemonSet Pod狀態(tài)為Ready的節(jié)點(diǎn)數(shù)量
  numberAvailable: 1                 // DaemonSet Pod狀態(tài)為Ready且運(yùn)行時(shí)間超過(guò)                                                                                                     // Spec.MinReadySeconds 的節(jié)點(diǎn)數(shù)量
  numberUnavailable: 0           // desiredNumberScheduled - numberAvailable 的節(jié)點(diǎn)數(shù)量
  observedGeneration: 3
  updatedNumberScheduled: 1  // 已經(jīng)完成DaemonSet Pod更新的節(jié)點(diǎn)數(shù)量

updateDaemonSetStatus 主要邏輯為:

  • 1枣氧、調(diào)用 dsc.getNodesToDaemonPods 獲取已存在 daemon pod 與 node 的映射關(guān)系;
  • 2垮刹、遍歷所有 node达吞,調(diào)用 dsc.nodeShouldRunDaemonPod 判斷該 node 是否需要運(yùn)行 daemon pod,然后計(jì)算 status 中的部分字段值荒典;
  • 3酪劫、調(diào)用 storeDaemonSetStatus 更新 ds status subresource;
  • 4寺董、判斷 ds 是否需要 resync覆糟;

k8s.io/kubernetes/pkg/controller/daemon/daemon_controller.go:1152

func (dsc *DaemonSetsController) updateDaemonSetStatus(......) error {
    // 1、獲取已存在 daemon pod 與 node 的映射關(guān)系
    nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ds)
    ......

    var desiredNumberScheduled, currentNumberScheduled, numberMisscheduled, numberReady, updatedNumberScheduled, numberAvailable int
    for _, node := range nodeList {
        // 2遮咖、判斷該 node 是否需要運(yùn)行 daemon pod
        wantToRun, _, _, err := dsc.nodeShouldRunDaemonPod(node, ds)
        if err != nil {
            return err
        }

        scheduled := len(nodeToDaemonPods[node.Name]) > 0
        // 3滩字、計(jì)算 status 中的字段值
        if wantToRun {
            desiredNumberScheduled++
            if scheduled {
                currentNumberScheduled++
                daemonPods, _ := nodeToDaemonPods[node.Name]
                sort.Sort(podByCreationTimestampAndPhase(daemonPods))
                pod := daemonPods[0]
                if podutil.IsPodReady(pod) {
                    numberReady++
                    if podutil.IsPodAvailable(pod, ds.Spec.MinReadySeconds, metav1.Now()) {
                        numberAvailable++
                    }
                }

                generation, err := util.GetTemplateGeneration(ds)
                if err != nil {
                    generation = nil
                }
                if util.IsPodUpdated(pod, hash, generation) {
                    updatedNumberScheduled++
                }
            }
        } else {
            if scheduled {
                numberMisscheduled++
            }
        }
    }
    numberUnavailable := desiredNumberScheduled - numberAvailable
    // 4、更新 daemonset status subresource
    err = storeDaemonSetStatus(dsc.kubeClient.AppsV1().DaemonSets(ds.Namespace), ds, desiredNumberScheduled, currentNumberScheduled, numberMisscheduled, numberReady, updatedNumberScheduled, numberAvailable, numberUnavailable, updateObservedGen)
    if err != nil {
        return fmt.Errorf("error storing status for daemon set %#v: %v", ds, err)
    }

    // 5御吞、判斷 ds 是否需要 resync
    if ds.Spec.MinReadySeconds > 0 && numberReady != numberAvailable {
        dsc.enqueueDaemonSetAfter(ds, time.Duration(ds.Spec.MinReadySeconds)*time.Second)
    }
    return nil
}

最后麦箍,再總結(jié)一下 syncDaemonSet 方法的主要流程:

                                |-> dsc.getNodesToDaemonPods
                                |
                                |
                  |-> manage -->|-> dsc.podsShouldBeOnNode  ---> dsc.nodeShouldRunDaemonPod
                  |             |
                  |             |
syncDaemonSet --> |             |-> dsc.syncNodes
                  |
                  |-> rollingUpdate
                  |
                  |
                  |-> updateDaemonSetStatus

總結(jié)

在 daemonset controller 中可以看到許多功能都是 deployment 和 statefulset 已有的。在創(chuàng)建 pod 的流程與 replicaset controller 創(chuàng)建 pod 的流程是相似的陶珠,都使用了 expectations 機(jī)制并且限制了在一個(gè) syncLoop 中最多創(chuàng)建或刪除的 pod 數(shù)挟裂。更新方式與 statefulset 一樣都有 OnDeleteRollingUpdate 兩種, OnDelete 方式與 statefulset 相似揍诽,都需要手動(dòng)刪除對(duì)應(yīng)的 pod诀蓉,而 RollingUpdate 方式與 statefulset 和 deployment 都有點(diǎn)區(qū)別, RollingUpdate方式更新時(shí)不支持暫停操作并且 pod 是先刪除再創(chuàng)建的順序進(jìn)行暑脆。版本控制方式與 statefulset 的一樣都是使用 controllerRevision渠啤。最后要說(shuō)的一點(diǎn)是在 v1.12 及以后的版本中,使用 daemonset 創(chuàng)建的 pod 已不再使用直接指定 .spec.nodeName的方式繞過(guò)調(diào)度器進(jìn)行調(diào)度添吗,而是走默認(rèn)調(diào)度器通過(guò) nodeAffinity 的方式調(diào)度到每一個(gè)節(jié)點(diǎn)上沥曹。

參考:

https://yq.aliyun.com/articles/702305

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
禁止轉(zhuǎn)載,如需轉(zhuǎn)載請(qǐng)通過(guò)簡(jiǎn)信或評(píng)論聯(lián)系作者根资。
  • 序言:七十年代末架专,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子玄帕,更是在濱河造成了極大的恐慌部脚,老刑警劉巖,帶你破解...
    沈念sama閱讀 212,542評(píng)論 6 493
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件裤纹,死亡現(xiàn)場(chǎng)離奇詭異委刘,居然都是意外死亡丧没,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,596評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門锡移,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)呕童,“玉大人,你說(shuō)我怎么就攤上這事淆珊《崴牵” “怎么了?”我有些...
    開封第一講書人閱讀 158,021評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵施符,是天一觀的道長(zhǎng)往声。 經(jīng)常有香客問(wèn)我,道長(zhǎng)戳吝,這世上最難降的妖魔是什么浩销? 我笑而不...
    開封第一講書人閱讀 56,682評(píng)論 1 284
  • 正文 為了忘掉前任,我火速辦了婚禮听哭,結(jié)果婚禮上慢洋,老公的妹妹穿的比我還像新娘。我一直安慰自己陆盘,他們只是感情好普筹,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,792評(píng)論 6 386
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著礁遣,像睡著了一般斑芜。 火紅的嫁衣襯著肌膚如雪肩刃。 梳的紋絲不亂的頭發(fā)上祟霍,一...
    開封第一講書人閱讀 49,985評(píng)論 1 291
  • 那天,我揣著相機(jī)與錄音盈包,去河邊找鬼沸呐。 笑死,一個(gè)胖子當(dāng)著我的面吹牛呢燥,可吹牛的內(nèi)容都是我干的崭添。 我是一名探鬼主播,決...
    沈念sama閱讀 39,107評(píng)論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼叛氨,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼呼渣!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起寞埠,我...
    開封第一講書人閱讀 37,845評(píng)論 0 268
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤屁置,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后仁连,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體蓝角,經(jīng)...
    沈念sama閱讀 44,299評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,612評(píng)論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了使鹅。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片揪阶。...
    茶點(diǎn)故事閱讀 38,747評(píng)論 1 341
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖患朱,靈堂內(nèi)的尸體忽然破棺而出鲁僚,到底是詐尸還是另有隱情,我是刑警寧澤裁厅,帶...
    沈念sama閱讀 34,441評(píng)論 4 333
  • 正文 年R本政府宣布蕴茴,位于F島的核電站,受9級(jí)特大地震影響姐直,放射性物質(zhì)發(fā)生泄漏倦淀。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 40,072評(píng)論 3 317
  • 文/蒙蒙 一声畏、第九天 我趴在偏房一處隱蔽的房頂上張望撞叽。 院中可真熱鬧,春花似錦插龄、人聲如沸愿棋。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,828評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)糠雨。三九已至,卻和暖如春徘跪,著一層夾襖步出監(jiān)牢的瞬間甘邀,已是汗流浹背隧膘。 一陣腳步聲響...
    開封第一講書人閱讀 32,069評(píng)論 1 267
  • 我被黑心中介騙來(lái)泰國(guó)打工闲礼, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留何鸡,地道東北人骑歹。 一個(gè)月前我還...
    沈念sama閱讀 46,545評(píng)論 2 362
  • 正文 我出身青樓叁温,卻偏偏與公主長(zhǎng)得像回俐,于是被迫代替她去往敵國(guó)和親镊绪。 傳聞我的和親對(duì)象是個(gè)殘疾皇子夹攒,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,658評(píng)論 2 350

推薦閱讀更多精彩內(nèi)容