kubelet Source Code Call Flow (Part 2)

http://www.reibang.com/p/95cd1556966a
The kubelet watches the apiserver for Pod changes; the resulting events are sent to listenChannel.

//pkg/util/config/config.go
func (m *Mux) listen(source string, listenChannel <-chan interface{}) {
    // iterate over every update coming from this source
    for update := range listenChannel {
        m.merger.Merge(source, update)
    }
}
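
To make the fan-in shape of this code easier to see, here is a minimal, self-contained sketch of the same pattern with made-up types (merger, mux and channelForSource are illustrative names, not the real kubelet API): each source gets its own channel, and a listen goroutine forwards every update to a shared merger keyed by the source name.

package main

import (
    "fmt"
    "sync"
)

// merger plays the role of podStorage in the excerpts around this sketch.
type merger interface {
    Merge(source string, update interface{}) error
}

type printMerger struct{}

func (printMerger) Merge(source string, update interface{}) error {
    fmt.Printf("merge from %q: %v\n", source, update)
    return nil
}

// mux fans several named sources into one merger.
type mux struct {
    merger merger
    wg     sync.WaitGroup
}

// channelForSource creates the per-source channel and starts its listen loop.
func (m *mux) channelForSource(source string) chan<- interface{} {
    ch := make(chan interface{})
    m.wg.Add(1)
    go func() {
        defer m.wg.Done()
        // same shape as Mux.listen: range over the channel and call Merge
        for update := range ch {
            m.merger.Merge(source, update)
        }
    }()
    return ch
}

func main() {
    m := &mux{merger: printMerger{}}
    api := m.channelForSource("api")
    file := m.channelForSource("file")

    api <- "pod-a added"
    file <- "pod-b added"

    close(api)
    close(file)
    m.wg.Wait()
}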

listen ranges over the events on the channel and calls Merge for every Pod change event it receives.

//pkg/kubelet/config/config.go
func (s *podStorage) Merge(source string, change interface{}) error {
    // group the change into adds, updates, deletes, removes and reconciles
    adds, updates, deletes, removes, reconciles := s.merge(source, change)
    switch s.mode {
    case PodConfigNotificationIncremental:
        ...
        s.updates <- *adds
        ...
    }
    ...
}

The merge method compares the change against the Pod information cached in memory and groups it into the following operation types:

//pkg/kubelet/types/pod_update.go
const (
    // SET is the current pod configuration.
    SET PodOperation = iota
    // ADD signifies pods that are new to this source.
    ADD
    // DELETE signifies pods that are gracefully deleted from this source.
    DELETE
    // REMOVE signifies pods that have been removed from this source.
    REMOVE
    // UPDATE signifies pods have been updated in this source.
    UPDATE
    // RECONCILE signifies pods that have unexpected status in this source,
    // kubelet should reconcile status with this source.
    RECONCILE
)

s.mode is currently PodConfigNotificationIncremental, so s.updates <- *adds sends the Pods to be added as an event on the updates channel.
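
To see how grouping against an in-memory cache and pushing onto an updates channel fit together, here is a simplified, runnable sketch; podStorage and the PodOperation values are borrowed from the excerpts above, but the logic is deliberately stripped down and is not the real implementation.

package main

import "fmt"

type PodOperation int

const (
    ADD PodOperation = iota
    UPDATE
    DELETE
)

type Pod struct {
    Name    string
    Version int
}

type PodUpdate struct {
    Op   PodOperation
    Pods []Pod
}

type podStorage struct {
    pods    map[string]map[string]Pod // source -> pod name -> pod
    updates chan PodUpdate
}

// merge classifies a full snapshot from one source against the cached state
// and emits one event per non-empty group, as in incremental mode.
func (s *podStorage) merge(source string, snapshot []Pod) {
    cached := s.pods[source]
    next := map[string]Pod{}

    var adds, updates []Pod
    for _, p := range snapshot {
        next[p.Name] = p
        old, ok := cached[p.Name]
        switch {
        case !ok:
            adds = append(adds, p)
        case old.Version != p.Version:
            updates = append(updates, p)
        }
    }
    var deletes []Pod
    for name, p := range cached {
        if _, ok := next[name]; !ok {
            deletes = append(deletes, p)
        }
    }
    s.pods[source] = next

    for _, u := range []PodUpdate{{ADD, adds}, {UPDATE, updates}, {DELETE, deletes}} {
        if len(u.Pods) > 0 {
            s.updates <- u
        }
    }
}

func main() {
    s := &podStorage{pods: map[string]map[string]Pod{}, updates: make(chan PodUpdate, 10)}
    s.merge("api", []Pod{{"nginx", 1}})
    s.merge("api", []Pod{{"nginx", 2}, {"redis", 1}})
    close(s.updates)
    for u := range s.updates {
        fmt.Printf("op=%d pods=%v\n", u.Op, u.Pods)
    }
}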

//pkg/kubelet/kubelet.go
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler, ...) bool {
    select {
    case u, open := <-configCh:
        ...
        switch u.Op {
        case kubetypes.ADD:
            handler.HandlePodAdditions(u.Pods)
        case kubetypes.UPDATE:
            handler.HandlePodUpdates(u.Pods)
        ...
        }
    ...
    }
}

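A minimal stand-alone version of this dispatch loop looks roughly like the following; the handler methods and the PodOperation values here are illustrative, and the real syncLoopIteration selects over several more channels (sync ticks, housekeeping, PLEG events).

package main

import "fmt"

type PodOperation int

const (
    ADD PodOperation = iota
    UPDATE
    REMOVE
)

type PodUpdate struct {
    Op   PodOperation
    Pods []string
}

// handler stands in for the kubelet's SyncHandler.
type handler struct{}

func (handler) HandlePodAdditions(pods []string) { fmt.Println("add:", pods) }
func (handler) HandlePodUpdates(pods []string)   { fmt.Println("update:", pods) }
func (handler) HandlePodRemoves(pods []string)   { fmt.Println("remove:", pods) }

// syncLoopIteration consumes one event and routes it by operation,
// mirroring the select/switch in the excerpt above.
func syncLoopIteration(configCh <-chan PodUpdate, h handler) bool {
    select {
    case u, open := <-configCh:
        if !open {
            return false // the channel was closed: stop the loop
        }
        switch u.Op {
        case ADD:
            h.HandlePodAdditions(u.Pods)
        case UPDATE:
            h.HandlePodUpdates(u.Pods)
        case REMOVE:
            h.HandlePodRemoves(u.Pods)
        }
    }
    return true
}

func main() {
    ch := make(chan PodUpdate, 2)
    ch <- PodUpdate{Op: ADD, Pods: []string{"nginx"}}
    ch <- PodUpdate{Op: UPDATE, Pods: []string{"nginx"}}
    close(ch)
    for syncLoopIteration(ch, handler{}) {
    }
}
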
configCh is that same updates channel; for an ADD event the flow reaches handler.HandlePodAdditions(u.Pods).

//pkg/kubelet/kubelet.go
func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
   kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)
}

dispatchWork dispatches the work:

//pkg/kubelet/kubelet.go
// dispatchWork starts the asynchronous sync of the pod in a pod worker.
// If the pod has completed termination, dispatchWork will perform no action.
func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) {
    // Run the sync in an async worker.
       // hand the pod off to the podWorkers
    kl.podWorkers.UpdatePod(UpdatePodOptions{
        Pod:        pod,
        MirrorPod:  mirrorPod,
        UpdateType: syncType,
        StartTime:  start,
    })
    // Note the number of containers for new pods.
    if syncType == kubetypes.SyncPodCreate {
        metrics.ContainersPerPodCount.Observe(float64(len(pod.Spec.Containers)))
    }
}

podWorkers.UpdatePod is called with an UpdatePodOptions:

//pkg/kubelet/pod_workers.go
func (p *podWorkers) UpdatePod(options UpdatePodOptions) {
    // look up the current sync status of this pod
    status, ok := p.podSyncStatuses[uid]
    if !ok {
        // not found: this pod is being synced for the first time
        klog.V(4).InfoS("Pod is being synced for the first time", "pod", klog.KObj(pod), "podUID", pod.UID)
        status = &podSyncStatus{
            syncedAt: now,
            fullname: kubecontainer.GetPodFullName(pod),
        }
        // if this pod is being synced for the first time, we need to make sure it is an active pod
        // if the pod already looks terminal (PodFailed or PodSucceeded), check the runtime cache
        // and, if it really is terminal, record in the podWorker that it is already terminated
        if !isRuntimePod && (pod.Status.Phase == v1.PodFailed || pod.Status.Phase == v1.PodSucceeded) {
            // check to see if the pod is not running and the pod is terminal.
            // If this succeeds then record in the podWorker that it is terminated.
            if statusCache, err := p.podCache.Get(pod.UID); err == nil {
                if isPodStatusCacheTerminal(statusCache) {
                    // mark the pod as terminated
                    status = &podSyncStatus{
                        terminatedAt:       now,
                        terminatingAt:      now,
                        syncedAt:           now,
                        startedTerminating: true,
                        finished:           true,
                        fullname:           kubecontainer.GetPodFullName(pod),
                    }
                }
            }
        }
        // record the sync status
        p.podSyncStatuses[uid] = status
    }

    if status.IsTerminationRequested() {
        if options.UpdateType == kubetypes.SyncPodCreate {
            // a create was requested while the pod is still terminating:
            // remember that a restart is wanted and reconcile it later
            status.restartRequested = true
            klog.V(4).InfoS("Pod is terminating but has been requested to restart with same UID, will be reconciled later", "pod", klog.KObj(pod), "podUID", pod.UID)
            return
        }
    }

    // once a pod is terminated by UID, it cannot reenter the pod worker (until the UID is purged by housekeeping)
    // a finished pod is never processed again
    if status.IsFinished() {
        klog.V(4).InfoS("Pod is finished processing, no further updates", "pod", klog.KObj(pod), "podUID", pod.UID)
        return
    }

    // the kind of work to perform (SyncPodWork in the normal case)
    workType = SyncPodWork

    // the desired work we want to be performing
    work := podWork{
        WorkType: workType,
        Options:  options,
    }

    // start the per-pod worker goroutine on first use
    podUpdates, exists := p.podUpdates[uid]
    if !exists {
        podUpdates = make(chan podWork, 1)
        p.podUpdates[uid] = podUpdates
        go func() {
            defer runtime.HandleCrash()
            p.managePodLoop(podUpdates)
        }()
    }

    // dispatch a request to the pod worker if none are running
    if !status.IsWorking() {
        status.working = true
        podUpdates <- work
        return
    }
    ...
}

managePodLoop keeps ranging over the per-pod podUpdates channel and calls syncPodFn for every podWork item.

//pkg/kubelet/pod_workers.go
func (p *podWorkers) managePodLoop(podUpdates <-chan podWork) {
    for update := range podUpdates {
        ...
        // wait for a pod status from the cache that is newer than lastSyncTime
        status, err = p.podCache.GetNewerThan(pod.UID, lastSyncTime)
        ...
        // syncPodFn is the syncPod function in pkg/kubelet/kubelet.go
        err = p.syncPodFn(ctx, update.Options.UpdateType, pod, update.Options.MirrorPod, status)
        ...
        // mark the work as complete and requeue the pod (with backoff on error)
        p.completeWork(pod, err)
    }
}
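
The essence of UpdatePod plus managePodLoop is "one channel and one goroutine per pod UID". The following condensed sketch shows that pattern on its own; the podWorkers, work and syncPodFn names are reused for readability, but this is not the real type and it omits the status bookkeeping and work coalescing the kubelet does.

package main

import (
    "fmt"
    "sync"
)

type work struct {
    podUID string
    update string
}

type podWorkers struct {
    mu         sync.Mutex
    podUpdates map[string]chan work
    wg         sync.WaitGroup
    syncPodFn  func(w work)
}

// UpdatePod hands work to the pod's dedicated worker, starting it on first use.
func (p *podWorkers) UpdatePod(w work) {
    p.mu.Lock()
    ch, exists := p.podUpdates[w.podUID]
    if !exists {
        ch = make(chan work, 1)
        p.podUpdates[w.podUID] = ch
        p.wg.Add(1)
        go func() {
            defer p.wg.Done()
            p.managePodLoop(ch) // one goroutine per pod UID
        }()
    }
    p.mu.Unlock()
    ch <- w
}

// managePodLoop mirrors the range-over-channel loop in the excerpt above.
func (p *podWorkers) managePodLoop(podUpdates <-chan work) {
    for update := range podUpdates {
        p.syncPodFn(update)
    }
}

func main() {
    p := &podWorkers{
        podUpdates: map[string]chan work{},
        syncPodFn:  func(w work) { fmt.Printf("sync %s: %s\n", w.podUID, w.update) },
    }
    p.UpdatePod(work{"uid-1", "create"})
    p.UpdatePod(work{"uid-1", "update"})
    p.UpdatePod(work{"uid-2", "create"})

    p.mu.Lock()
    for _, ch := range p.podUpdates {
        close(ch)
    }
    p.mu.Unlock()
    p.wg.Wait()
}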

syncPodFn is in fact the syncPod function in pkg/kubelet/kubelet.go:

//pkg/kubelet/kubelet.go
func (kl *Kubelet) syncPod(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) error {
    // check whether the pod is allowed to run on this node
    runnable := kl.canRunPod(pod)
    ...
    // update the pod status; handled by pkg/kubelet/status/status_manager.go
    kl.statusManager.SetPodStatus(pod, apiPodStatus)
    ...
    // create the pod data directories:
    //   /var/lib/kubelet/pods/{podUID}
    //   /var/lib/kubelet/pods/{podUID}/volumes
    //   /var/lib/kubelet/pods/{podUID}/plugins
    if err := kl.makePodDataDirs(pod); err != nil {
        ...
    }
    // fetch the image pull secrets for the pod
    pullSecrets := kl.getPullSecretsForPod(pod)
    // hand the pod over to the container runtime
    result := kl.containerRuntime.SyncPod(pod, podStatus, pullSecrets, kl.backOff)
    ...
}

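For reference, here is a small sketch of the pod data-directory layout mentioned in the comments above, written against a temporary root so it can run safely; makePodDataDirs is borrowed as a name only, and this is not the kubelet's actual implementation.

package main

import (
    "fmt"
    "os"
    "path/filepath"
)

// makePodDataDirs creates the per-pod directories the kubelet relies on:
//   {root}/pods/{podUID}
//   {root}/pods/{podUID}/volumes
//   {root}/pods/{podUID}/plugins
func makePodDataDirs(root, podUID string) error {
    podDir := filepath.Join(root, "pods", podUID)
    for _, dir := range []string{
        podDir,
        filepath.Join(podDir, "volumes"),
        filepath.Join(podDir, "plugins"),
    } {
        if err := os.MkdirAll(dir, 0750); err != nil {
            return fmt.Errorf("creating %s: %w", dir, err)
        }
    }
    return nil
}

func main() {
    // use a temporary root so the sketch does not touch /var/lib/kubelet
    root, err := os.MkdirTemp("", "kubelet-sketch")
    if err != nil {
        panic(err)
    }
    defer os.RemoveAll(root)

    if err := makePodDataDirs(root, "7c3f-example-uid"); err != nil {
        panic(err)
    }
    fmt.Println("created pod dirs under", filepath.Join(root, "pods", "7c3f-example-uid"))
}
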
syncPod updates the Pod status through the statusManager, creates the pod data directories, and then calls SyncPod in pkg/kubelet/kuberuntime/kuberuntime_manager.go. The source comment on SyncPod spells out its seven steps:

//pkg/kubelet/kuberuntime/kuberuntime_manager.go
//  1. Compute sandbox and container changes.
//  2. Kill pod sandbox if necessary.
//  3. Kill any containers that should not be running.
//  4. Create sandbox if necessary.
//  5. Create ephemeral containers.
//  6. Create init containers.
//  7. Create normal containers.
func (m *kubeGenericRuntimeManager) SyncPod(pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) {
    // Step 1
    ...
}
1. Compute sandbox and container changes.

    // Step 1: Compute sandbox and container changes.
    podContainerChanges := m.computePodActions(pod, podStatus)
    klog.V(3).InfoS("computePodActions got for pod", "podActions", podContainerChanges, "pod", klog.KObj(pod))
    if podContainerChanges.CreateSandbox {
        ref, err := ref.GetReference(legacyscheme.Scheme, pod)
        if err != nil {
            klog.ErrorS(err, "Couldn't make a ref to pod", "pod", klog.KObj(pod))
        }
        if podContainerChanges.SandboxID != "" {
            // the sandbox changed: the pod has to be killed and re-created
            m.recorder.Eventf(ref, v1.EventTypeNormal, events.SandboxChanged, "Pod sandbox changed, it will be killed and re-created.")
        } else {
            // a brand new pod: a sandbox has to be created for it
            klog.V(4).InfoS("SyncPod received new pod, will create a sandbox for it", "pod", klog.KObj(pod))
        }
    }
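
A heavily simplified sketch of the decision computed here may help: if there is no usable sandbox the pod is rebuilt from scratch, otherwise only the missing containers are started and unexpected ones are killed. The real computePodActions also looks at restart policy, init containers and container spec hashes; everything below is illustrative.

package main

import "fmt"

type podActions struct {
    CreateSandbox     bool
    KillPod           bool
    ContainersToStart []string
    ContainersToKill  []string
}

// computePodActions compares the desired container list with the set of
// containers currently running in the sandbox.
func computePodActions(sandboxReady bool, desired []string, running map[string]bool) podActions {
    actions := podActions{}
    if !sandboxReady {
        // no usable sandbox: kill whatever is left and start everything again
        actions.CreateSandbox = true
        actions.KillPod = true
        actions.ContainersToStart = append(actions.ContainersToStart, desired...)
        return actions
    }
    want := map[string]bool{}
    for _, name := range desired {
        want[name] = true
        if !running[name] {
            actions.ContainersToStart = append(actions.ContainersToStart, name)
        }
    }
    for name := range running {
        if !want[name] {
            actions.ContainersToKill = append(actions.ContainersToKill, name)
        }
    }
    return actions
}

func main() {
    // sandbox is healthy, "app" is missing, "old-sidecar" should not be running
    a := computePodActions(true, []string{"app", "sidecar"}, map[string]bool{"sidecar": true, "old-sidecar": true})
    fmt.Printf("%+v\n", a)
}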
2. Kill pod sandbox if necessary.
3. Kill any containers that should not be running.

    if podContainerChanges.KillPod {
        // Step 2: Kill the pod if the sandbox has changed.
        if podContainerChanges.CreateSandbox {
            klog.V(4).InfoS("Stopping PodSandbox for pod, will start new one", "pod", klog.KObj(pod))
        } else {
            klog.V(4).InfoS("Stopping PodSandbox for pod, because all other containers are dead", "pod", klog.KObj(pod))
        }
        // kill the pod and its containers through the CRI:
        // StopPodSandbox for the sandbox and StopContainer for each container
        killResult := m.killPodWithSyncResult(pod, kubecontainer.ConvertPodStatusToRunningPod(m.runtimeName, podStatus), nil)
        result.AddPodSyncResult(killResult)
        if killResult.Error() != nil {
            klog.ErrorS(killResult.Error(), "killPodWithSyncResult failed")
            return
        }

        if podContainerChanges.CreateSandbox {
            m.purgeInitContainers(pod, podStatus)
        }
    } else {
        // Step 3: kill any running containers in this pod which are not to keep.
        ...
    }

Step 2 and Step 3 are mutually exclusive: when the sandbox has to be recreated the whole pod is killed; otherwise only the individual containers that should no longer be running are killed.
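
The kill path itself boils down to stopping every running container and then the sandbox through the CRI. Here is a small sketch of that shape with a stand-in CRI client rather than the real killPodWithSyncResult, stopping containers in parallel and aggregating the per-container errors.

package main

import (
    "fmt"
    "sync"
)

type criClient interface {
    StopContainer(id string) error
    StopPodSandbox(id string) error
}

type fakeCRI struct{}

func (fakeCRI) StopContainer(id string) error  { fmt.Println("StopContainer", id); return nil }
func (fakeCRI) StopPodSandbox(id string) error { fmt.Println("StopPodSandbox", id); return nil }

// killPod stops all running containers concurrently, then stops the sandbox.
func killPod(cri criClient, sandboxID string, containerIDs []string) []error {
    var (
        wg   sync.WaitGroup
        mu   sync.Mutex
        errs []error
    )
    for _, id := range containerIDs {
        wg.Add(1)
        go func(id string) {
            defer wg.Done()
            if err := cri.StopContainer(id); err != nil {
                mu.Lock()
                errs = append(errs, err)
                mu.Unlock()
            }
        }(id)
    }
    wg.Wait()
    // the sandbox is only stopped once every container has been handled
    if err := cri.StopPodSandbox(sandboxID); err != nil {
        errs = append(errs, err)
    }
    return errs
}

func main() {
    errs := killPod(fakeCRI{}, "sandbox-1", []string{"ctr-a", "ctr-b"})
    fmt.Println("errors:", errs)
}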

4. Create sandbox if necessary.

//pkg/kubelet/kuberuntime/kuberuntime_manager.go
    podSandboxID, msg, err = m.createPodSandbox(pod, podContainerChanges.Attempt)

createPodSandbox creates the pod log directory, resolves the RuntimeClass handler, and creates the sandbox through the CRI:

//pkg/kubelet/kuberuntime/kuberuntime_sandbox.go
func (m *kubeGenericRuntimeManager) createPodSandbox(pod *v1.Pod, attempt uint32) (string, string, error) {
    // create the log directory /var/log/pods/{podNamespace}_{podName}_{podUID}
    err = m.osInterface.MkdirAll(podSandboxConfig.LogDirectory, 0755)
    // look up the concrete container runtime handler for the pod's RuntimeClass
    runtimeHandler, err = m.runtimeClassManager.LookupRuntimeHandler(pod.Spec.RuntimeClassName)
    // create the PodSandbox through the CRI
    podSandBoxID, err := m.runtimeService.RunPodSandbox(podSandboxConfig, runtimeHandler)
    ...
}

Steps 5, 6 and 7, back in SyncPod, start the ephemeral containers, the init containers and the regular containers respectively:
    
// Step 5: start ephemeral containers
    // These are started "prior" to init containers to allow running ephemeral containers even when there
    // are errors starting an init container. In practice init containers will start first since ephemeral
    // containers cannot be specified on pod creation.
    if utilfeature.DefaultFeatureGate.Enabled(features.EphemeralContainers) {
        for _, idx := range podContainerChanges.EphemeralContainersToStart {
            // start an ephemeral container
            start("ephemeral container", metrics.EphemeralContainer, ephemeralContainerStartSpec(&pod.Spec.EphemeralContainers[idx]))
        }
    }

    // Step 6: start the init container.
    if container := podContainerChanges.NextInitContainerToStart; container != nil {
        // Start the next init container.
        if err := start("init container", metrics.InitContainer, containerStartSpec(container)); err != nil {
            return
        }

        // Successfully started the container; clear the entry in the failure
        klog.V(4).InfoS("Completed init container for pod", "containerName", container.Name, "pod", klog.KObj(pod))
    }

    // Step 7: start containers in podContainerChanges.ContainersToStart.
    for _, idx := range podContainerChanges.ContainersToStart {
        // start a regular container
        start("container", metrics.Container, containerStartSpec(&pod.Spec.Containers[idx]))
    }
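
The ordering of Steps 5-7 can be summarised as: ephemeral containers never block the sync, an init-container failure aborts it, and regular-container failures are collected but do not stop the remaining starts. The sketch below compresses that into one function; note that for brevity it starts all init containers in one pass, whereas the real SyncPod starts only the next pending init container per sync iteration, and the start helper here is only a stand-in for the closure used inside SyncPod.

package main

import (
    "errors"
    "fmt"
)

type containerSpec struct {
    Name string
    Kind string // "ephemeral", "init" or "container"
}

func syncContainers(ephemerals, inits, regulars []containerSpec, startFn func(containerSpec) error) error {
    // Step 5: ephemeral containers never block the rest of the sync
    for _, c := range ephemerals {
        if err := startFn(c); err != nil {
            fmt.Println("ephemeral container failed, continuing:", err)
        }
    }
    // Step 6: init containers run strictly in order; a failure aborts the sync
    for _, c := range inits {
        if err := startFn(c); err != nil {
            return fmt.Errorf("init container %s: %w", c.Name, err)
        }
    }
    // Step 7: regular containers; collect errors but try to start all of them
    var errs []error
    for _, c := range regulars {
        if err := startFn(c); err != nil {
            errs = append(errs, err)
        }
    }
    return errors.Join(errs...)
}

func main() {
    start := func(c containerSpec) error {
        fmt.Printf("starting %s %q\n", c.Kind, c.Name)
        return nil
    }
    err := syncContainers(
        []containerSpec{{"debugger", "ephemeral"}},
        []containerSpec{{"init-db", "init"}},
        []containerSpec{{"app", "container"}, {"sidecar", "container"}},
        start,
    )
    fmt.Println("sync error:", err)
}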

The start helper ends up in startContainer, which launches a container in four steps: pull the image, create the container, start the container, and run the post-start lifecycle hook.

//pkg/kubelet/kuberuntime/kuberuntime_container.go
func (m *kubeGenericRuntimeManager) startContainer(podSandboxID string, podSandboxConfig *runtimeapi.PodSandboxConfig, spec *startSpec, pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, podIP string, podIPs []string) (string, error) {
    // Step 1: pull the image (handled by the image manager)
    imageRef, msg, err := m.imagePuller.EnsureImageExists(pod, container, pullSecrets, podSandboxConfig)
    // Step 2: create the container through the CRI CreateContainer call
    containerID, err := m.runtimeService.CreateContainer(podSandboxID, containerConfig, podSandboxConfig)
    // Step 3: start the container through the CRI StartContainer call
    err = m.runtimeService.StartContainer(containerID)
    // Step 4: execute the post start hook
    ...
}
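
Put together, the per-container start sequence is: pull the image, create the container in the sandbox, start it, then run the post-start hook. The sketch below shows that sequence against a stand-in runtime interface (the interface and method names are illustrative, not the real CRI client), with a fake implementation so it can run on its own.

package main

import "fmt"

type runtimeService interface {
    PullImage(image string) (imageRef string, err error)
    CreateContainer(sandboxID, name, imageRef string) (containerID string, err error)
    StartContainer(containerID string) error
}

type lifecycleRunner interface {
    RunPostStartHook(containerID string) error
}

// startContainer mirrors the three CRI steps from the excerpt above plus the
// post-start lifecycle hook mentioned in the prose.
func startContainer(rt runtimeService, hooks lifecycleRunner, sandboxID, name, image string) (string, error) {
    // Step 1: make sure the image is present
    imageRef, err := rt.PullImage(image)
    if err != nil {
        return "", fmt.Errorf("pull image: %w", err)
    }
    // Step 2: create the container inside the sandbox
    containerID, err := rt.CreateContainer(sandboxID, name, imageRef)
    if err != nil {
        return "", fmt.Errorf("create container: %w", err)
    }
    // Step 3: start it
    if err := rt.StartContainer(containerID); err != nil {
        return "", fmt.Errorf("start container: %w", err)
    }
    // Step 4: run the post-start hook; in the real kubelet a failure here
    // causes the container to be killed again
    if err := hooks.RunPostStartHook(containerID); err != nil {
        return containerID, fmt.Errorf("post-start hook: %w", err)
    }
    return containerID, nil
}

// fakeRuntime lets the sketch run without a real CRI endpoint.
type fakeRuntime struct{}

func (fakeRuntime) PullImage(image string) (string, error) { return image + "@sha256:deadbeef", nil }
func (fakeRuntime) CreateContainer(sandboxID, name, imageRef string) (string, error) {
    return "ctr-" + name, nil
}
func (fakeRuntime) StartContainer(id string) error   { return nil }
func (fakeRuntime) RunPostStartHook(id string) error { return nil }

func main() {
    rt := fakeRuntime{}
    id, err := startContainer(rt, rt, "sandbox-123", "nginx", "nginx:1.25")
    fmt.Println(id, err)
}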