在前面的文章中已經(jīng)分析過(guò) deployment蔑担、statefulset 兩個(gè)重要對(duì)象了悍缠,本文會(huì)繼續(xù)分析 kubernetes 中另一個(gè)重要的對(duì)象 daemonset,在 kubernetes 中 daemonset 類似于 linux 上的守護(hù)進(jìn)程會(huì)運(yùn)行在每一個(gè) node 上颅围,在實(shí)際場(chǎng)景中进宝,一般會(huì)將日志采集或者網(wǎng)絡(luò)插件采用 daemonset 的方式部署。
DaemonSet 的基本操作
創(chuàng)建
daemonset 在創(chuàng)建后會(huì)在每個(gè) node 上都啟動(dòng)一個(gè) pod蝗茁。
$ kubectl create -f nginx-ds.yaml
擴(kuò)縮容
由于 daemonset 是在每個(gè) node 上啟動(dòng)一個(gè) pod秘血,其不存在擴(kuò)縮容操作,副本數(shù)量跟 node 數(shù)量保持一致评甜。
更新
daemonset 有兩種更新策略 OnDelete
和 RollingUpdate
灰粮,默認(rèn)為 RollingUpdate
。滾動(dòng)更新時(shí)忍坷,需要指定 .spec.updateStrategy.rollingUpdate.maxUnavailable
(默認(rèn)為1)和 .spec.minReadySeconds
(默認(rèn)為 0)粘舟。
// 更新鏡像
$ kubectl set image ds/nginx-ds nginx-ds=nginx:1.16
// 查看更新狀態(tài)
$ kubectl rollout status ds/nginx-ds
回滾
在 statefulset 源碼分析一節(jié)已經(jīng)提到過(guò) controllerRevision
這個(gè)對(duì)象了,其主要用來(lái)保存歷史版本信息佩研,在更新以及回滾操作時(shí)使用柑肴,daemonset controller 也是使用 controllerrevision
保存歷史版本信息,在回滾時(shí)會(huì)使用歷史 controllerrevision
中的信息替換 daemonset 中 Spec.Template
旬薯。
// 查看 ds 歷史版本信息
$ kubectl get controllerrevision
NAME CONTROLLER REVISION AGE
nginx-ds-5c4b75bdbb daemonset.apps/nginx-ds 2 122m
nginx-ds-7cd7798dcd daemonset.apps/nginx-ds 1 133m
// 回滾到版本 1
$ kubectl rollout undo daemonset nginx-ds --to-revision=1
// 查看回滾狀態(tài)
$ kubectl rollout status ds/nginx-ds
暫停
daemonset 目前不支持暫停操作晰骑。
刪除
daemonset 也支持兩種刪除操作。
// 非級(jí)聯(lián)刪除
$ kubectl delete ds/nginx-ds --cascade=false
// 級(jí)聯(lián)刪除
$ kubectl delete ds/nginx-ds
DaemonSetController 源碼分析
kubernetes 版本:v1.16
首先還是看 startDaemonSetController
方法绊序,在此方法中會(huì)初始化 DaemonSetsController
對(duì)象并調(diào)用 Run
方法啟動(dòng) daemonset controller硕舆,從該方法中可以看出 daemonset controller 會(huì)監(jiān)聽 daemonsets
、controllerRevision
骤公、pod
和 node
四種對(duì)象資源的變動(dòng)抚官。其中 ConcurrentDaemonSetSyncs
的默認(rèn)值為 2。
k8s.io/kubernetes/cmd/kube-controller-manager/app/apps.go:36
func startDaemonSetController(ctx ControllerContext) (http.Handler, bool, error) {
if !ctx.AvailableResources[schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "daemonsets"}] {
return nil, false, nil
}
dsc, err := daemon.NewDaemonSetsController(
ctx.InformerFactory.Apps().V1().DaemonSets(),
ctx.InformerFactory.Apps().V1().ControllerRevisions(),
ctx.InformerFactory.Core().V1().Pods(),
ctx.InformerFactory.Core().V1().Nodes(),
ctx.ClientBuilder.ClientOrDie("daemon-set-controller"),
flowcontrol.NewBackOff(1*time.Second, 15*time.Minute),
)
if err != nil {
return nil, true, fmt.Errorf("error creating DaemonSets controller: %v", err)
}
go dsc.Run(int(ctx.ComponentConfig.DaemonSetController.ConcurrentDaemonSetSyncs), ctx.Stop)
return nil, true, nil
}
在 Run
方法中會(huì)啟動(dòng)兩個(gè)操作阶捆,一個(gè)就是 dsc.runWorker
執(zhí)行的 sync 操作凌节,另一個(gè)就是 dsc.failedPodsBackoff.GC
執(zhí)行的 gc 操作,主要邏輯為:
- 1洒试、等待 informer 緩存同步完成倍奢;
- 2、啟動(dòng)兩個(gè) goroutine 分別執(zhí)行
dsc.runWorker
垒棋; - 3卒煞、啟動(dòng)一個(gè) goroutine 每分鐘執(zhí)行一次
dsc.failedPodsBackoff.GC
,從startDaemonSetController
方法中可以看到failedPodsBackoff
的 duration為1s捕犬,max duration為15m跷坝,failedPodsBackoff
的主要作用是當(dāng)發(fā)現(xiàn) daemon pod 狀態(tài)為 failed 時(shí),會(huì)定時(shí)重啟該 pod碉碉;
k8s.io/kubernetes/pkg/controller/daemon/daemon_controller.go:263
func (dsc *DaemonSetsController) Run(workers int, stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
defer dsc.queue.ShutDown()
defer klog.Infof("Shutting down daemon sets controller")
if !cache.WaitForNamedCacheSync("daemon sets", stopCh, dsc.podStoreSynced, dsc.nodeStoreSynced, dsc.historyStoreSynced, dsc.dsStoreSynced) {
return
}
for i := 0; i < workers; i++ {
// sync 操作
go wait.Until(dsc.runWorker, time.Second, stopCh)
}
// GC 操作
go wait.Until(dsc.failedPodsBackoff.GC, BackoffGCInterval, stopCh)
<-stopCh
}
syncDaemonSet
daemonset 中 pod 的創(chuàng)建與刪除是與 node 相關(guān)聯(lián)的柴钻,所以每次執(zhí)行 sync 操作時(shí)需要遍歷所有的 node 進(jìn)行判斷。syncDaemonSet
的主要邏輯為:
- 1垢粮、通過(guò) key 獲取 ns 和 name贴届;
- 2、從 dsLister 中獲取 ds 對(duì)象蜡吧;
- 3毫蚓、從 nodeLister 獲取所有 node;
- 4昔善、獲取 dsKey元潘;
- 5、判斷 ds 是否處于刪除狀態(tài)君仆;
- 6翩概、調(diào)用
constructHistory
獲取 current 和 oldcontrollerRevision
; - 7返咱、調(diào)用
dsc.expectations.SatisfiedExpectations
判斷是否滿足expectations
機(jī)制钥庇,expectations
機(jī)制的目的就是減少不必要的 sync 操作,關(guān)于expectations
機(jī)制的詳細(xì)說(shuō)明可以參考筆者以前寫的 "replicaset controller 源碼分析"一文咖摹; - 8评姨、調(diào)用
dsc.manage
執(zhí)行實(shí)際的 sync 操作; - 9萤晴、判斷是否為更新操作吐句,并執(zhí)行對(duì)應(yīng)的更新操作邏輯;
- 10店读、調(diào)用
dsc.cleanupHistory
根據(jù)spec.revisionHistoryLimit
字段清理過(guò)期的controllerrevision
蕴侧; - 11、調(diào)用
dsc.updateDaemonSetStatus
更新 ds 狀態(tài)两入;
k8s.io/kubernetes/pkg/controller/daemon/daemon_controller.go:1212
func (dsc *DaemonSetsController) syncDaemonSet(key string) error {
......
// 1净宵、通過(guò) key 獲取 ns 和 name
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
return err
}
// 2、從 dsLister 中獲取 ds 對(duì)象
ds, err := dsc.dsLister.DaemonSets(namespace).Get(name)
if errors.IsNotFound(err) {
dsc.expectations.DeleteExpectations(key)
return nil
}
......
// 3裹纳、從 nodeLister 獲取所有 node
nodeList, err := dsc.nodeLister.List(labels.Everything())
......
everything := metav1.LabelSelector{}
if reflect.DeepEqual(ds.Spec.Selector, &everything) {
dsc.eventRecorder.Eventf(ds, v1.EventTypeWarning, SelectingAllReason, "This daemon set is selecting all pods. A non-empty selector is required. ")
return nil
}
// 4择葡、獲取 dsKey
dsKey, err := controller.KeyFunc(ds)
if err != nil {
return fmt.Errorf("couldn't get key for object %#v: %v", ds, err)
}
// 5剃氧、判斷 ds 是否處于刪除狀態(tài)
if ds.DeletionTimestamp != nil {
return nil
}
// 6敏储、獲取 current 和 old controllerRevision
cur, old, err := dsc.constructHistory(ds)
if err != nil {
return fmt.Errorf("failed to construct revisions of DaemonSet: %v", err)
}
hash := cur.Labels[apps.DefaultDaemonSetUniqueLabelKey]
// 7、判斷是否滿足 expectations 機(jī)制
if !dsc.expectations.SatisfiedExpectations(dsKey) {
return dsc.updateDaemonSetStatus(ds, nodeList, hash, false)
}
// 8朋鞍、執(zhí)行實(shí)際的 sync 操作
err = dsc.manage(ds, nodeList, hash)
if err != nil {
return err
}
// 9已添、判斷是否為更新操作妥箕,并執(zhí)行對(duì)應(yīng)的更新操作
if dsc.expectations.SatisfiedExpectations(dsKey) {
switch ds.Spec.UpdateStrategy.Type {
case apps.OnDeleteDaemonSetStrategyType:
case apps.RollingUpdateDaemonSetStrategyType:
err = dsc.rollingUpdate(ds, nodeList, hash)
}
if err != nil {
return err
}
}
// 10、清理過(guò)期的 controllerrevision
err = dsc.cleanupHistory(ds, old)
if err != nil {
return fmt.Errorf("failed to clean up revisions of DaemonSet: %v", err)
}
// 11更舞、更新 ds 狀態(tài)
return dsc.updateDaemonSetStatus(ds, nodeList, hash, true)
}
syncDaemonSet
中主要有 manage
畦幢、rollingUpdate
和updateDaemonSetStatus
三個(gè)方法,分別對(duì)應(yīng)創(chuàng)建缆蝉、更新與狀態(tài)同步宇葱,下面主要來(lái)分析這三個(gè)方法。
manage
manage
主要是用來(lái)保證 ds 的 pod 數(shù)正常運(yùn)行在每一個(gè) node 上刊头,其主要邏輯為:
- 1黍瞧、調(diào)用
dsc.getNodesToDaemonPods
獲取已存在 daemon pod 與 node 的映射關(guān)系; - 2原杂、遍歷所有 node印颤,調(diào)用
dsc.podsShouldBeOnNode
方法來(lái)確定在給定的節(jié)點(diǎn)上需要?jiǎng)?chuàng)建還是刪除 daemon pod; - 3穿肄、判斷是否啟動(dòng)了
ScheduleDaemonSetPods
feature-gates 特性膀哲,若啟動(dòng)了則需要?jiǎng)h除通過(guò)默認(rèn)調(diào)度器已經(jīng)調(diào)度到不存在 node 上的 daemon pod; - 4被碗、調(diào)用
dsc.syncNodes
為對(duì)應(yīng)的 node 創(chuàng)建 daemon pod 以及刪除多余的 pods某宪;
k8s.io/kubernetes/pkg/controller/daemon/daemon_controller.go:952
func (dsc *DaemonSetsController) manage(ds *apps.DaemonSet, nodeList []*v1.Node, hash string) error {
// 1、獲取已存在 daemon pod 與 node 的映射關(guān)系
nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ds)
......
// 2锐朴、判斷每一個(gè) node 是否需要運(yùn)行 daemon pod
var nodesNeedingDaemonPods, podsToDelete []string
for _, node := range nodeList {
nodesNeedingDaemonPodsOnNode, podsToDeleteOnNode, err := dsc.podsShouldBeOnNode(
node, nodeToDaemonPods, ds)
if err != nil {
continue
}
nodesNeedingDaemonPods = append(nodesNeedingDaemonPods, nodesNeedingDaemonPodsOnNode...)
podsToDelete = append(podsToDelete, podsToDeleteOnNode...)
}
// 3兴喂、判斷是否啟動(dòng)了 ScheduleDaemonSetPods feature-gates 特性,若啟用了則對(duì)不存在 node 上的
// daemon pod 進(jìn)行刪除
if utilfeature.DefaultFeatureGate.Enabled(features.ScheduleDaemonSetPods) {
podsToDelete = append(podsToDelete, getUnscheduledPodsWithoutNode(nodeList, nodeToDaemonPods)...)
}
// 4焚志、為對(duì)應(yīng)的 node 創(chuàng)建 daemon pod 以及刪除多余的 pods
if err = dsc.syncNodes(ds, podsToDelete, nodesNeedingDaemonPods, hash); err != nil {
return err
}
return nil
}
在 manage
方法中又調(diào)用了 getNodesToDaemonPods
衣迷、podsShouldBeOnNode
和 syncNodes
三個(gè)方法,繼續(xù)來(lái)看這幾種方法的作用酱酬。
getNodesToDaemonPods
getNodesToDaemonPods
是用來(lái)獲取已存在 daemon pod 與 node 的映射關(guān)系壶谒,并且會(huì)通過(guò) adopt/orphan
方法關(guān)聯(lián)以及釋放對(duì)應(yīng)的 pod。
k8s.io/kubernetes/pkg/controller/daemon/daemon_controller.go:820
func (dsc *DaemonSetsController) getNodesToDaemonPods(ds *apps.DaemonSet) (map[string][]*v1.Pod, error) {
claimedPods, err := dsc.getDaemonPods(ds)
if err != nil {
return nil, err
}
nodeToDaemonPods := make(map[string][]*v1.Pod)
for _, pod := range claimedPods {
nodeName, err := util.GetTargetNodeName(pod)
if err != nil {
klog.Warningf("Failed to get target node name of Pod %v/%v in DaemonSet %v/%v",
pod.Namespace, pod.Name, ds.Namespace, ds.Name)
continue
}
nodeToDaemonPods[nodeName] = append(nodeToDaemonPods[nodeName], pod)
}
return nodeToDaemonPods, nil
}
podsShouldBeOnNode
podsShouldBeOnNode
方法用來(lái)確定在給定的節(jié)點(diǎn)上需要?jiǎng)?chuàng)建還是刪除 daemon pod膳沽,主要邏輯為:
- 1汗菜、調(diào)用
dsc.nodeShouldRunDaemonPod
判斷該 node 是否需要運(yùn)行 daemon pod 以及 pod 能不能調(diào)度成功,該方法返回三個(gè)值wantToRun
,shouldSchedule
,shouldContinueRunning
挑社; - 2陨界、通過(guò)判斷
wantToRun
,shouldSchedule
,shouldContinueRunning
將需要?jiǎng)?chuàng)建 daemon pod 的 node 列表以及需要?jiǎng)h除的 pod 列表獲取到,wantToRun
主要檢查的是 selector痛阻、taints 等是否匹配菌瘪,shouldSchedule
主要檢查 node 上的資源是否充足,shouldContinueRunning
默認(rèn)為 true阱当;
k8s.io/kubernetes/pkg/controller/daemon/daemon_controller.go:866
func (dsc *DaemonSetsController) podsShouldBeOnNode(...) (nodesNeedingDaemonPods, podsToDelete []string, err error) {
// 1俏扩、判斷該 node 是否需要運(yùn)行 daemon pod 以及能不能調(diào)度成功
wantToRun, shouldSchedule, shouldContinueRunning, err := dsc.nodeShouldRunDaemonPod(node, ds)
if err != nil {
return
}
// 2糜工、獲取該節(jié)點(diǎn)上的指定ds的pod列表
daemonPods, exists := nodeToDaemonPods[node.Name]
dsKey, err := cache.MetaNamespaceKeyFunc(ds)
if err != nil {
utilruntime.HandleError(err)
return
}
// 3、從 suspended list 中移除在該節(jié)點(diǎn)上 ds 的 pod
dsc.removeSuspendedDaemonPods(node.Name, dsKey)
switch {
// 4录淡、對(duì)于需要?jiǎng)?chuàng)建 pod 但是不能調(diào)度 pod 的 node捌木,先把 pod 放入到 suspended 隊(duì)列中
case wantToRun && !shouldSchedule:
dsc.addSuspendedDaemonPods(node.Name, dsKey)
// 5、需要?jiǎng)?chuàng)建 pod 且 pod 未運(yùn)行赁咙,則創(chuàng)建 pod
case shouldSchedule && !exists:
nodesNeedingDaemonPods = append(nodesNeedingDaemonPods, node.Name)
// 6、需要 pod 一直運(yùn)行
case shouldContinueRunning:
var daemonPodsRunning []*v1.Pod
for _, pod := range daemonPods {
if pod.DeletionTimestamp != nil {
continue
}
// 7免钻、如果 pod 運(yùn)行狀態(tài)為 failed彼水,則刪除該 pod
if pod.Status.Phase == v1.PodFailed {
backoffKey := failedPodsBackoffKey(ds, node.Name)
now := dsc.failedPodsBackoff.Clock.Now()
inBackoff := dsc.failedPodsBackoff.IsInBackOffSinceUpdate(backoffKey, now)
if inBackoff {
delay := dsc.failedPodsBackoff.Get(backoffKey)
dsc.enqueueDaemonSetAfter(ds, delay)
continue
}
dsc.failedPodsBackoff.Next(backoffKey, now)
podsToDelete = append(podsToDelete, pod.Name)
} else {
daemonPodsRunning = append(daemonPodsRunning, pod)
}
}
// 8、如果節(jié)點(diǎn)上已經(jīng)運(yùn)行 daemon pod 數(shù) > 1极舔,保留運(yùn)行時(shí)間最長(zhǎng)的 pod凤覆,其余的刪除
if len(daemonPodsRunning) > 1 {
sort.Sort(podByCreationTimestampAndPhase(daemonPodsRunning))
for i := 1; i < len(daemonPodsRunning); i++ {
podsToDelete = append(podsToDelete, daemonPodsRunning[i].Name)
}
}
// 9、如果 pod 不需要繼續(xù)運(yùn)行但 pod 已存在則需要?jiǎng)h除 pod
case !shouldContinueRunning && exists:
for _, pod := range daemonPods {
if pod.DeletionTimestamp != nil {
continue
}
podsToDelete = append(podsToDelete, pod.Name)
}
}
return nodesNeedingDaemonPods, podsToDelete, nil
}
然后繼續(xù)看 nodeShouldRunDaemonPod
方法的主要邏輯:
- 1拆魏、調(diào)用
NewPod
為該 node 構(gòu)建一個(gè) daemon pod object盯桦; - 2、判斷 ds 是否指定了
.Spec.Template.Spec.NodeName
字段渤刃; - 3拥峦、調(diào)用
dsc.simulate
執(zhí)行GeneralPredicates
預(yù)選算法檢查該 node 是否能夠調(diào)度成功; - 4卖子、判斷
GeneralPredicates
預(yù)選算法執(zhí)行后的reasons
確定wantToRun
,shouldSchedule
,shouldContinueRunning
的值略号;
k8s.io/kubernetes/pkg/controller/daemon/daemon_controller.go:1337
func (dsc *DaemonSetsController) nodeShouldRunDaemonPod(node *v1.Node, ds *apps.DaemonSet) (wantToRun, shouldSchedule, shouldContinueRunning bool, err error) {
// 1、構(gòu)建 daemon pod object
newPod := NewPod(ds, node.Name)
wantToRun, shouldSchedule, shouldContinueRunning = true, true, true
// 2洋闽、判斷 ds 是否指定了 node玄柠,若指定了且不為當(dāng)前 node 直接返回 false
if !(ds.Spec.Template.Spec.NodeName == "" || ds.Spec.Template.Spec.NodeName == node.Name) {
return false, false, false, nil
}
// 3、執(zhí)行 GeneralPredicates 預(yù)選算法
reasons, nodeInfo, err := dsc.simulate(newPod, node, ds)
if err != nil {
......
}
// 4诫舅、檢查預(yù)選算法執(zhí)行的結(jié)果
var insufficientResourceErr error
for _, r := range reasons {
switch reason := r.(type) {
case *predicates.InsufficientResourceError:
insufficientResourceErr = reason
case *predicates.PredicateFailureError:
var emitEvent bool
switch reason {
case
predicates.ErrNodeSelectorNotMatch,
predicates.ErrPodNotMatchHostName,
predicates.ErrNodeLabelPresenceViolated,
predicates.ErrPodNotFitsHostPorts:
return false, false, false, nil
case predicates.ErrTaintsTolerationsNotMatch:
fitsNoExecute, _, err := predicates.PodToleratesNodeNoExecuteTaints(newPod, nil, nodeInfo)
if err != nil {
return false, false, false, err
}
if !fitsNoExecute {
return false, false, false, nil
}
wantToRun, shouldSchedule = false, false
case
predicates.ErrDiskConflict,
predicates.ErrVolumeZoneConflict,
predicates.ErrMaxVolumeCountExceeded,
predicates.ErrNodeUnderMemoryPressure,
predicates.ErrNodeUnderDiskPressure:
shouldSchedule = false
emitEvent = true
case
predicates.ErrPodAffinityNotMatch,
predicates.ErrServiceAffinityViolated:
return false, false, false, fmt.Errorf("unexpected reason: DaemonSet Predicates should not return reason %s", reason.GetReason())
default:
wantToRun, shouldSchedule, shouldContinueRunning = false, false, false
emitEvent = true
}
......
}
}
if shouldSchedule && insufficientResourceErr != nil {
dsc.eventRecorder.Eventf(ds, v1.EventTypeWarning, FailedPlacementReason, "failed to place pod on %q: %s", node.ObjectMeta.Name, insufficientResourceErr.Error())
shouldSchedule = false
}
return
}
syncNodes
syncNodes
方法主要是為需要 daemon pod 的 node 創(chuàng)建 pod 以及刪除多余的 pod羽利,其主要邏輯為:
- 1、將
createDiff
和deleteDiff
與burstReplicas
進(jìn)行比較刊懈,burstReplicas
默認(rèn)值為 250 即每個(gè) syncLoop 中創(chuàng)建或者刪除的 pod 數(shù)最多為 250 個(gè)这弧,若超過(guò)其值則剩余需要?jiǎng)?chuàng)建或者刪除的 pod 在下一個(gè) syncLoop 繼續(xù)操作; - 2虚汛、將
createDiff
和deleteDiff
寫入到expectations
中当宴; - 3、并發(fā)創(chuàng)建 pod泽疆,創(chuàng)建 pod 有兩種方法:(1)創(chuàng)建的 pod 不經(jīng)過(guò)默認(rèn)調(diào)度器户矢,直接指定了 pod 的運(yùn)行節(jié)點(diǎn)(即設(shè)定
pod.Spec.NodeName
);(2)若啟用了ScheduleDaemonSetPods
feature-gates 特性殉疼,則使用默認(rèn)調(diào)度器進(jìn)行創(chuàng)建 pod梯浪,通過(guò)nodeAffinity
來(lái)保證每個(gè)節(jié)點(diǎn)都運(yùn)行一個(gè) pod捌年; - 4、并發(fā)刪除
deleteDiff
中的所有 pod挂洛;
ScheduleDaemonSetPods
是一個(gè) feature-gates 特性礼预,其出現(xiàn)在 v1.11 中,在 v1.12 中處于 Beta 版本虏劲,v1.17 為 GA 版托酸。最初 daemonset controller 只有一種創(chuàng)建 pod 的方法,即直接指定 pod 的 spec.NodeName
字段柒巫,但是目前這種方式已經(jīng)暴露了許多問(wèn)題励堡,在以后的發(fā)展中社區(qū)還是希望能通過(guò)默認(rèn)調(diào)度器進(jìn)行調(diào)度,所以才出現(xiàn)了第二種方式堡掏,原因主要有以下五點(diǎn):
- 1应结、DaemonSet 無(wú)法感知 node 上資源的變化 (#46935, #58868):當(dāng) pod 第一次因資源不夠無(wú)法創(chuàng)建時(shí),若其他 pod 退出后資源足夠時(shí) DaemonSet 無(wú)法感知到泉唁;
- 2鹅龄、Daemonset 無(wú)法支持 Pod Affinity 和 Pod AntiAffinity 的功能(#29276);
- 3亭畜、在某些功能上需要實(shí)現(xiàn)和 scheduler 重復(fù)的代碼邏輯, 例如:critical pods (#42028), tolerant/taint扮休;
- 4、當(dāng) DaemonSet 的 Pod 創(chuàng)建失敗時(shí)難以 debug拴鸵,例如:資源不足時(shí)肛炮,對(duì)于 pending pod 最好能打一個(gè) event 說(shuō)明;
- 5宝踪、多個(gè)組件同時(shí)調(diào)度時(shí)難以實(shí)現(xiàn)搶占機(jī)制:這也是無(wú)法通過(guò)橫向擴(kuò)展調(diào)度器提高調(diào)度吞吐量的一個(gè)原因侨糟;
更詳細(xì)的原因可以參考社區(qū)的文檔:schedule-DS-pod-by-scheduler.md。
k8s.io/kubernetes/pkg/controller/daemon/daemon_controller.go:990
func (dsc *DaemonSetsController) syncNodes(ds *apps.DaemonSet, podsToDelete, nodesNeedingDaemonPods []string, hash string) error {
......
// 1瘩燥、設(shè)置 burstReplicas
createDiff := len(nodesNeedingDaemonPods)
deleteDiff := len(podsToDelete)
if createDiff > dsc.burstReplicas {
createDiff = dsc.burstReplicas
}
if deleteDiff > dsc.burstReplicas {
deleteDiff = dsc.burstReplicas
}
// 2秕重、寫入到 expectations 中
dsc.expectations.SetExpectations(dsKey, createDiff, deleteDiff)
errCh := make(chan error, createDiff+deleteDiff)
createWait := sync.WaitGroup{}
generation, err := util.GetTemplateGeneration(ds)
if err != nil {
generation = nil
}
template := util.CreatePodTemplate(ds.Spec.Template, generation, hash)
// 3、并發(fā)創(chuàng)建 pod厉膀,創(chuàng)建的 pod 數(shù)依次為 1, 2, 4, 8, ...
batchSize := integer.IntMin(createDiff, controller.SlowStartInitialBatchSize)
for pos := 0; createDiff > pos; batchSize, pos = integer.IntMin(2*batchSize, createDiff-(pos+batchSize)), pos+batchSize {
errorCount := len(errCh)
createWait.Add(batchSize)
for i := pos; i < pos+batchSize; i++ {
go func(ix int) {
defer createWait.Done()
var err error
podTemplate := template.DeepCopy()
// 4溶耘、若啟動(dòng)了 ScheduleDaemonSetPods 功能,則通過(guò) kube-scheduler 創(chuàng)建 pod
if utilfeature.DefaultFeatureGate.Enabled(features.ScheduleDaemonSetPods) {
podTemplate.Spec.Affinity = util.ReplaceDaemonSetPodNodeNameNodeAffinity(
podTemplate.Spec.Affinity, nodesNeedingDaemonPods[ix])
err = dsc.podControl.CreatePodsWithControllerRef(ds.Namespace, podTemplate,
ds, metav1.NewControllerRef(ds, controllerKind))
} else {
// 5服鹅、否則直接設(shè)置 pod 的 .spec.NodeName 創(chuàng)建 pod
err = dsc.podControl.CreatePodsOnNode(nodesNeedingDaemonPods[ix], ds.Namespace, podTemplate,
ds, metav1.NewControllerRef(ds, controllerKind))
}
// 6凳兵、創(chuàng)建 pod 時(shí)忽略 timeout err
if err != nil && errors.IsTimeout(err) {
return
}
if err != nil {
dsc.expectations.CreationObserved(dsKey)
errCh <- err
utilruntime.HandleError(err)
}
}(i)
}
createWait.Wait()
// 7、將創(chuàng)建失敗的 pod 數(shù)記錄到 expectations 中
skippedPods := createDiff - (batchSize + pos)
if errorCount < len(errCh) && skippedPods > 0 {
dsc.expectations.LowerExpectations(dsKey, skippedPods, 0)
break
}
}
// 8企软、并發(fā)刪除 deleteDiff 中的 pod
deleteWait := sync.WaitGroup{}
deleteWait.Add(deleteDiff)
for i := 0; i < deleteDiff; i++ {
go func(ix int) {
defer deleteWait.Done()
if err := dsc.podControl.DeletePod(ds.Namespace, podsToDelete[ix], ds); err != nil {
dsc.expectations.DeletionObserved(dsKey)
errCh <- err
utilruntime.HandleError(err)
}
}(i)
}
deleteWait.Wait()
errors := []error{}
close(errCh)
for err := range errCh {
errors = append(errors, err)
}
return utilerrors.NewAggregate(errors)
}
RollingUpdate
daemonset update 的方式有兩種 OnDelete
和 RollingUpdate
庐扫,當(dāng)為 OnDelete
時(shí)需要用戶手動(dòng)刪除每一個(gè) pod 后完成更新操作,當(dāng)為 RollingUpdate
時(shí),daemonset controller 會(huì)自動(dòng)控制升級(jí)進(jìn)度形庭。
當(dāng)為 RollingUpdate
時(shí)铅辞,主要邏輯為:
- 1、獲取 daemonset pod 與 node 的映射關(guān)系萨醒;
- 2斟珊、根據(jù)
controllerrevision
的 hash 值獲取所有未更新的 pods; - 3富纸、獲取
maxUnavailable
,numUnavailable
的 pod 數(shù)值囤踩,maxUnavailable
是從 ds 的rollingUpdate
字段中獲取的默認(rèn)值為 1,numUnavailable
的值是通過(guò) daemonset pod 與 node 的映射關(guān)系計(jì)算每個(gè) node 下是否有 available pod 得到的晓褪; - 4堵漱、通過(guò) oldPods 獲取
oldAvailablePods
,oldUnavailablePods
的 pod 列表; - 5辞州、遍歷
oldUnavailablePods
列表將需要?jiǎng)h除的 pod 追加到oldPodsToDelete
數(shù)組中怔锌。oldUnavailablePods
列表中的 pod 分為兩種寥粹,一種處于更新中变过,即刪除狀態(tài),一種處于未更新且異常狀態(tài)涝涤,處于異常狀態(tài)的都需要被刪除媚狰; - 6、遍歷
oldAvailablePods
列表阔拳,此列表中的 pod 都處于正常運(yùn)行狀態(tài)崭孤,根據(jù)maxUnavailable
值確定是否需要?jiǎng)h除該 pod 并將需要?jiǎng)h除的 pod 追加到oldPodsToDelete
數(shù)組中; - 7糊肠、調(diào)用
dsc.syncNodes
刪除oldPodsToDelete
數(shù)組中的 pods辨宠,syncNodes
方法在manage
階段已經(jīng)分析過(guò),此處不再詳述货裹;
rollingUpdate
的結(jié)果是找出需要?jiǎng)h除的 pods 并進(jìn)行刪除嗤形,被刪除的 pod 在下一個(gè) syncLoop 中會(huì)通過(guò) manage
方法使用最新版本的 daemonset template 進(jìn)行創(chuàng)建,整個(gè)滾動(dòng)更新的過(guò)程是通過(guò)先刪除再創(chuàng)建的方式一步步完成更新的弧圆,每次操作都是嚴(yán)格按照 maxUnavailable
的值確定需要?jiǎng)h除的 pod 數(shù)赋兵。
k8s.io/kubernetes/pkg/controller/daemon/update.go:43
func (dsc *DaemonSetsController) rollingUpdate(......) error {
// 1、獲取 daemonset pod 與 node 的映射關(guān)系
nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ds)
......
// 2搔预、獲取所有未更新的 pods
_, oldPods := dsc.getAllDaemonSetPods(ds, nodeToDaemonPods, hash)
// 3霹期、計(jì)算 maxUnavailable, numUnavailable 的 pod 數(shù)值
maxUnavailable, numUnavailable, err := dsc.getUnavailableNumbers(ds, nodeList, nodeToDaemonPods)
if err != nil {
return fmt.Errorf("couldn't get unavailable numbers: %v", err)
}
oldAvailablePods, oldUnavailablePods := util.SplitByAvailablePods(ds.Spec.MinReadySeconds, oldPods)
// 4、將非 running 狀態(tài)的 pods 加入到 oldPodsToDelete 中
var oldPodsToDelete []string
for _, pod := range oldUnavailablePods {
if pod.DeletionTimestamp != nil {
continue
}
oldPodsToDelete = append(oldPodsToDelete, pod.Name)
}
// 5拯田、根據(jù) maxUnavailable 值確定是否需要?jiǎng)h除 pod
for _, pod := range oldAvailablePods {
if numUnavailable >= maxUnavailable {
break
}
oldPodsToDelete = append(oldPodsToDelete, pod.Name)
numUnavailable++
}
// 6历造、調(diào)用 syncNodes 方法刪除 oldPodsToDelete 數(shù)組中的 pods
return dsc.syncNodes(ds, oldPodsToDelete, []string{}, hash)
}
總結(jié)一下,manage
方法中的主要流程為:
|-> dsc.getNodesToDaemonPods
|
|
manage ---- |-> dsc.podsShouldBeOnNode ---> dsc.nodeShouldRunDaemonPod
|
|
|-> dsc.syncNodes
updateDaemonSetStatus
updateDaemonSetStatus
是 syncDaemonSet
中最后執(zhí)行的方法,主要是用來(lái)計(jì)算 ds status subresource 中的值并更新其 status帕膜。status 如下所示:
status:
currentNumberScheduled: 1 // 已經(jīng)運(yùn)行了 DaemonSet Pod的節(jié)點(diǎn)數(shù)量
desiredNumberScheduled: 1 // 需要運(yùn)行該DaemonSet Pod的節(jié)點(diǎn)數(shù)量
numberMisscheduled: 0 // 不需要運(yùn)行 DeamonSet Pod 但是已經(jīng)運(yùn)行了的節(jié)點(diǎn)數(shù)量
numberReady: 0 // DaemonSet Pod狀態(tài)為Ready的節(jié)點(diǎn)數(shù)量
numberAvailable: 1 // DaemonSet Pod狀態(tài)為Ready且運(yùn)行時(shí)間超過(guò) // Spec.MinReadySeconds 的節(jié)點(diǎn)數(shù)量
numberUnavailable: 0 // desiredNumberScheduled - numberAvailable 的節(jié)點(diǎn)數(shù)量
observedGeneration: 3
updatedNumberScheduled: 1 // 已經(jīng)完成DaemonSet Pod更新的節(jié)點(diǎn)數(shù)量
updateDaemonSetStatus
主要邏輯為:
- 1枣氧、調(diào)用
dsc.getNodesToDaemonPods
獲取已存在 daemon pod 與 node 的映射關(guān)系; - 2垮刹、遍歷所有 node达吞,調(diào)用
dsc.nodeShouldRunDaemonPod
判斷該 node 是否需要運(yùn)行 daemon pod,然后計(jì)算 status 中的部分字段值荒典; - 3酪劫、調(diào)用
storeDaemonSetStatus
更新 ds status subresource; - 4寺董、判斷 ds 是否需要 resync覆糟;
k8s.io/kubernetes/pkg/controller/daemon/daemon_controller.go:1152
func (dsc *DaemonSetsController) updateDaemonSetStatus(......) error {
// 1、獲取已存在 daemon pod 與 node 的映射關(guān)系
nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ds)
......
var desiredNumberScheduled, currentNumberScheduled, numberMisscheduled, numberReady, updatedNumberScheduled, numberAvailable int
for _, node := range nodeList {
// 2遮咖、判斷該 node 是否需要運(yùn)行 daemon pod
wantToRun, _, _, err := dsc.nodeShouldRunDaemonPod(node, ds)
if err != nil {
return err
}
scheduled := len(nodeToDaemonPods[node.Name]) > 0
// 3滩字、計(jì)算 status 中的字段值
if wantToRun {
desiredNumberScheduled++
if scheduled {
currentNumberScheduled++
daemonPods, _ := nodeToDaemonPods[node.Name]
sort.Sort(podByCreationTimestampAndPhase(daemonPods))
pod := daemonPods[0]
if podutil.IsPodReady(pod) {
numberReady++
if podutil.IsPodAvailable(pod, ds.Spec.MinReadySeconds, metav1.Now()) {
numberAvailable++
}
}
generation, err := util.GetTemplateGeneration(ds)
if err != nil {
generation = nil
}
if util.IsPodUpdated(pod, hash, generation) {
updatedNumberScheduled++
}
}
} else {
if scheduled {
numberMisscheduled++
}
}
}
numberUnavailable := desiredNumberScheduled - numberAvailable
// 4、更新 daemonset status subresource
err = storeDaemonSetStatus(dsc.kubeClient.AppsV1().DaemonSets(ds.Namespace), ds, desiredNumberScheduled, currentNumberScheduled, numberMisscheduled, numberReady, updatedNumberScheduled, numberAvailable, numberUnavailable, updateObservedGen)
if err != nil {
return fmt.Errorf("error storing status for daemon set %#v: %v", ds, err)
}
// 5御吞、判斷 ds 是否需要 resync
if ds.Spec.MinReadySeconds > 0 && numberReady != numberAvailable {
dsc.enqueueDaemonSetAfter(ds, time.Duration(ds.Spec.MinReadySeconds)*time.Second)
}
return nil
}
最后麦箍,再總結(jié)一下 syncDaemonSet
方法的主要流程:
|-> dsc.getNodesToDaemonPods
|
|
|-> manage -->|-> dsc.podsShouldBeOnNode ---> dsc.nodeShouldRunDaemonPod
| |
| |
syncDaemonSet --> | |-> dsc.syncNodes
|
|-> rollingUpdate
|
|
|-> updateDaemonSetStatus
總結(jié)
在 daemonset controller 中可以看到許多功能都是 deployment 和 statefulset 已有的。在創(chuàng)建 pod 的流程與 replicaset controller 創(chuàng)建 pod 的流程是相似的陶珠,都使用了 expectations
機(jī)制并且限制了在一個(gè) syncLoop 中最多創(chuàng)建或刪除的 pod 數(shù)挟裂。更新方式與 statefulset 一樣都有 OnDelete
和 RollingUpdate
兩種, OnDelete
方式與 statefulset 相似揍诽,都需要手動(dòng)刪除對(duì)應(yīng)的 pod诀蓉,而 RollingUpdate
方式與 statefulset 和 deployment 都有點(diǎn)區(qū)別, RollingUpdate
方式更新時(shí)不支持暫停操作并且 pod 是先刪除再創(chuàng)建的順序進(jìn)行暑脆。版本控制方式與 statefulset 的一樣都是使用 controllerRevision
渠啤。最后要說(shuō)的一點(diǎn)是在 v1.12 及以后的版本中,使用 daemonset 創(chuàng)建的 pod 已不再使用直接指定 .spec.nodeName
的方式繞過(guò)調(diào)度器進(jìn)行調(diào)度添吗,而是走默認(rèn)調(diào)度器通過(guò) nodeAffinity
的方式調(diào)度到每一個(gè)節(jié)點(diǎn)上沥曹。
參考: