kubelet Source Code Walkthrough (Part 1)


Preface

This article does not walk through every detail of the logic; it only follows the main code path, and the snippets carry comments that can be read along the way. Most methods in the k8s source are invoked through layer upon layer of interface wrapping, and a single interface usually has several implementations (interface polymorphism). For how to locate the concrete implementation behind an interface, see the companion piece 《interface實現(xiàn)分析》; a later article will analyze one such call in detail.
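As a quick, hypothetical illustration of that layered-interface style (made-up names, not kubelet code), the sketch below defines one interface, a "real" implementation, and an instrumenting wrapper; the instrumented_* wrappers that show up later in this article follow the same shape:

package main

import "fmt"

// RuntimeService is a tiny stand-in for the kind of interface the kubelet calls through.
type RuntimeService interface {
    StartContainer(id string) error
}

// remoteService plays the role of the "real" implementation (e.g. a gRPC client).
type remoteService struct{}

func (remoteService) StartContainer(id string) error {
    fmt.Println("starting container", id)
    return nil
}

// instrumentedService wraps another RuntimeService and adds metrics/logging on top.
type instrumentedService struct {
    inner RuntimeService
}

func (s instrumentedService) StartContainer(id string) error {
    fmt.Println("recording metrics for start_container")
    return s.inner.StartContainer(id)
}

func main() {
    // The caller only ever sees the interface, never the concrete chain behind it.
    var rs RuntimeService = instrumentedService{inner: remoteService{}}
    _ = rs.StartContainer("abc123")
}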

How to debug

I debug remotely with the dlv tool. On the remote side I set up a k8s cluster with 3 masters and 3 nodes, stopped the kubelet on one VM, and then launched the kubelet source on that VM with the command below for debugging. The other components can be debugged the same way.
Note: for the flags in the start command, drop whatever is extra and add whatever is missing (judge by watching the logs).
dlv start command:
dlv debug --headless --listen ":2345" --log --api-version 2 -- --runtime-cgroups=/systemd/system.slice --kubelet-cgroups=/systemd/system.slice --kubeconfig=/etc/kubernetes/kubelet.conf --pod-infra-container-image=xxx/pause:3.1 --config=/var/lib/kubelet/config.yaml --cgroup-driver=cgroupfs --network-plugin=cni
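Once the headless server above is listening, you can attach to it from your workstation with dlv's client mode, for example (assuming port 2345 on the node is reachable from your machine):

dlv connect <node-ip>:2345

A remote-debug configuration in GoLand or the VS Code Go extension pointed at the same address also works.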

Kubelet service startup flow


kubelet service entry point

cmd/kubelet/kubelet.go is mainly responsible for validating parameters, creating the client that talks to the api-server, checking the permissions needed to run the kubelet, starting the Kubelet, and so on.

func main() {
    rand.Seed(time.Now().UnixNano())

    command := app.NewKubeletCommand(server.SetupSignalHandler())
    logs.InitLogs()
    defer logs.FlushLogs()
    ...
}

The concrete implementation is in cmd/kubelet/app/server.go:

func NewKubeletCommand(stopCh <-chan struct{}) *cobra.Command {
    cleanFlagSet := pflag.NewFlagSet(componentKubelet, pflag.ContinueOnError)
    cleanFlagSet.SetNormalizeFunc(cliflag.WordSepNormalizeFunc)
    //set the default KubeletFlags values: docker, certificate paths, plugin directories, CIDR, and so on
    kubeletFlags := options.NewKubeletFlags()
    //generate the default kubelet configuration
    kubeletConfig, err := options.NewKubeletConfiguration()
    // programmer error
    if err != nil {
        klog.Fatal(err)
    }

    ...
        Run: func(cmd *cobra.Command, args []string) {

            // use dynamic kubelet config, if enabled
            var kubeletConfigController *dynamickubeletconfig.Controller
            if dynamicConfigDir := kubeletFlags.DynamicConfigDir.Value(); len(dynamicConfigDir) > 0 {
                var dynamicKubeletConfig *kubeletconfiginternal.KubeletConfiguration
                dynamicKubeletConfig, kubeletConfigController, err = BootstrapKubeletConfigController(dynamicConfigDir,
                    func(kc *kubeletconfiginternal.KubeletConfiguration) error {
                        // Here, we enforce flag precedence inside the controller, prior to the controller's validation sequence,
                        // so that we get a complete validation at the same point where we can decide to reject dynamic config.
                        // This fixes the flag-precedence component of issue #63305.
                        // See issue #56171 for general details on flag precedence.
                        return kubeletConfigFlagPrecedence(kc, args)
                    })
                if err != nil {
                    klog.Fatal(err)
                }
                // If we should just use our existing, local config, the controller will return a nil config
                if dynamicKubeletConfig != nil {
                    kubeletConfig = dynamicKubeletConfig
                    // Note: flag precedence was already enforced in the controller, prior to validation,
                    // by our above transform function. Now we simply update feature gates from the new config.
                    if err := utilfeature.DefaultMutableFeatureGate.SetFromMap(kubeletConfig.FeatureGates); err != nil {
                        klog.Fatal(err)
                    }
                }
            }

            // construct a KubeletServer from kubeletFlags and kubeletConfig
            kubeletServer := &options.KubeletServer{
                KubeletFlags:         *kubeletFlags,
                KubeletConfiguration: *kubeletConfig,
            }

            // use kubeletServer to construct the default KubeletDeps
            kubeletDeps, err := UnsecuredDependencies(kubeletServer)
            if err != nil {
                klog.Fatal(err)
            }

            // add the kubelet config controller to kubeletDeps
            kubeletDeps.KubeletConfigController = kubeletConfigController

            // start the experimental docker shim, if enabled
            if kubeletServer.KubeletFlags.ExperimentalDockershim {
                if err := RunDockershim(&kubeletServer.KubeletFlags, kubeletConfig, stopCh); err != nil {
                    klog.Fatal(err)
                }
                return
            }

            // run the kubelet
            klog.V(5).Infof("KubeletConfiguration: %#v", kubeletServer.KubeletConfiguration)
            if err := Run(kubeletServer, kubeletDeps, stopCh); err != nil {
                klog.Fatal(err)
            }
        },
    }

...
}

Inside func NewKubeletCommand:

// use dynamic kubelet config, if enabled
            var kubeletConfigController *dynamickubeletconfig.Controller
            if dynamicConfigDir := kubeletFlags.DynamicConfigDir.Value(); len(dynamicConfigDir) > 0 {
                var dynamicKubeletConfig *kubeletconfiginternal.KubeletConfiguration
                dynamicKubeletConfig, kubeletConfigController, err = BootstrapKubeletConfigController(dynamicConfigDir,
                    func(kc *kubeletconfiginternal.KubeletConfiguration) error {
                        // Here, we enforce flag precedence inside the controller, prior to the controller's validation sequence,
                        // so that we get a complete validation at the same point where we can decide to reject dynamic config.
                        // This fixes the flag-precedence component of issue #63305.
                        // See issue #56171 for general details on flag precedence.
                        return kubeletConfigFlagPrecedence(kc, args)
                    })
                if err != nil {
                    klog.Fatal(err)
                }
                // If we should just use our existing, local config, the controller will return a nil config
                if dynamicKubeletConfig != nil {
                    kubeletConfig = dynamicKubeletConfig
                    // Note: flag precedence was already enforced in the controller, prior to validation,
                    // by our above transform function. Now we simply update feature gates from the new config.
                    if err := utilfeature.DefaultMutableFeatureGate.SetFromMap(kubeletConfig.FeatureGates); err != nil {
                        klog.Fatal(err)
                    }
                }
            }

This piece sets up a watch: it watches whether the kubelet's configuration has changed and, if it has, reloads the kubelet configuration. It uses the Controller pattern that Kubernetes relies on everywhere, i.e. the Informer architecture, watching ConfigMap objects. Let's look further into BootstrapKubeletConfigController.
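For intuition, here is a rough, self-contained sketch of the generic client-go informer pattern such a controller is built on, watching ConfigMaps (illustrative only, not the kubelet's dynamic config controller; the kubeconfig path is an assumption):

package main

import (
    "fmt"
    "time"

    v1 "k8s.io/api/core/v1"
    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/tools/clientcmd"
)

func main() {
    // Build a client from a kubeconfig (assumption: reuse the kubelet's kubeconfig path).
    cfg, err := clientcmd.BuildConfigFromFlags("", "/etc/kubernetes/kubelet.conf")
    if err != nil {
        panic(err)
    }
    cs, err := kubernetes.NewForConfig(cfg)
    if err != nil {
        panic(err)
    }

    stopCh := make(chan struct{})

    // Shared informer factory; for simplicity this watches ConfigMaps cluster-wide.
    factory := informers.NewSharedInformerFactory(cs, 30*time.Second)
    cmInformer := factory.Core().V1().ConfigMaps().Informer()

    cmInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
        UpdateFunc: func(oldObj, newObj interface{}) {
            cm := newObj.(*v1.ConfigMap)
            // A real controller would checkpoint the new config here and trigger a reload.
            fmt.Printf("ConfigMap %s/%s changed\n", cm.Namespace, cm.Name)
        },
    })

    factory.Start(stopCh)
    cache.WaitForCacheSync(stopCh, cmInformer.HasSynced)

    // Watch for a minute, then stop (a real controller would block until shutdown).
    time.Sleep(time.Minute)
    close(stopCh)
}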

// BootstrapKubeletConfigController constructs and bootstrap a configuration controller
func BootstrapKubeletConfigController(dynamicConfigDir string, transform dynamickubeletconfig.TransformFunc) (*kubeletconfiginternal.KubeletConfiguration, *dynamickubeletconfig.Controller, error) {
    if !utilfeature.DefaultFeatureGate.Enabled(features.DynamicKubeletConfig) {
        return nil, nil, fmt.Errorf("failed to bootstrap Kubelet config controller, you must enable the DynamicKubeletConfig feature gate")
    }
    if len(dynamicConfigDir) == 0 {
        return nil, nil, fmt.Errorf("cannot bootstrap Kubelet config controller, --dynamic-config-dir was not provided")
    }

    // compute absolute path and bootstrap controller
    dir, err := filepath.Abs(dynamicConfigDir)
    if err != nil {
        return nil, nil, fmt.Errorf("failed to get absolute path for --dynamic-config-dir=%s", dynamicConfigDir)
    }
    // get the latest KubeletConfiguration checkpoint from disk, or return the default config if no valid checkpoints exist
    c := dynamickubeletconfig.NewController(dir, transform)
    kc, err := c.Bootstrap()
    if err != nil {
        return nil, nil, fmt.Errorf("failed to determine a valid configuration, error: %v", err)
    }
    return kc, c, nil
}

-------------------------
// /pkg/kubelet/kubeletconfig/controller.go
// Bootstrap attempts to return a valid KubeletConfiguration based on the configuration of the Controller,
// or returns an error if no valid configuration could be produced. Bootstrap should be called synchronously before StartSync.
// If the pre-existing local configuration should be used, Bootstrap returns a nil config.
func (cc *Controller) Bootstrap() (*kubeletconfig.KubeletConfiguration, error)
graph LR
NewKubeletCommand-->RUN
RUN-->run

This function mainly starts the various dependent services as well as the kubelet's listening ports.

func Run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, stopCh <-chan struct{}) error {
    // To help debugging, immediately log version
    klog.Infof("Version: %+v", version.Get())
    if err := initForOS(s.KubeletFlags.WindowsService); err != nil {
        return fmt.Errorf("failed OS init: %v", err)
    }
    //the main startup function
    if err := run(s, kubeDeps, stopCh); err != nil {
        return fmt.Errorf("failed to run Kubelet: %v", err)
    }
    return nil
}
func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, stopCh <-chan struct{}) (err error) {
    ...
    //all pre-start parameters are ready; enter the startup flow
    if err := RunKubelet(s, kubeDeps, s.RunOnce); err != nil {
        return err
    }
    ...
}
graph LR
PreparationComplete-->RunKubelet
RunKubelet-->CreateAndInitKubelet
RunKubelet-->startKubelet

RunKubelet mainly does two things: createAndInitKubelet and startKubelet.

func RunKubelet(kubeServer *options.KubeletServer, kubeDeps *kubelet.Dependencies, runOnce bool) error {
    ...
    k, err := createAndInitKubelet(...)
    ...
    // process pods and exit.
    if runOnce {
        if _, err := k.RunOnce(podCfg.Updates()); err != nil {
            return fmt.Errorf("runonce failed: %v", err)
        }
        klog.Info("Started kubelet as runonce")
    } else {
        startKubelet(k, podCfg, &kubeServer.KubeletConfiguration, kubeDeps, kubeServer.EnableServer)
        klog.Info("Started kubelet")
    }
    return nil
}

createAndInitKubelet

func createAndInitKubelet(...){
    ...
    k, err = kubelet.NewMainKubelet(...)
    if err != nil {
        return nil, err
    }
    k.BirthCry()

    k.StartGarbageCollection()
}

NewMainKubelet instantiates a kubelet object and initializes each of its internal components:

  • containerGC // container garbage collection
  • statusManager // pod status management
  • imageManager // image management
  • probeManager // container health probing
  • gpuManager // GPU support
  • PodCache // pod cache management
  • secretManager // secret resource management
  • configMapManager // configMap resource management
  • InitNetworkPlugin // network plugin initialization
  • PodManager // pod management, e.g., CRUD
  • makePodSourceConfig // sources of pod metadata (file, URL, api-server)
  • diskSpaceManager // disk space management
  • ContainerRuntime // container runtime selection (docker or rkt)
  • BirthCry // notify the api-server that the kubelet has started
  • StartGarbageCollection // start the garbage collection service

startKubelet

func startKubelet(k kubelet.Bootstrap, podCfg *config.PodConfig, kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *kubelet.Dependencies, enableServer bool) {
    // start the kubelet
    go wait.Until(func() {
        k.Run(podCfg.Updates())
    }, 0, wait.NeverStop)

    // start the kubelet server
    //serves pod and node information; node status is later persisted to etcd via the api-server
    if enableServer {
        go k.ListenAndServe(net.ParseIP(kubeCfg.Address), uint(kubeCfg.Port), kubeDeps.TLSOptions, kubeDeps.Auth, kubeCfg.EnableDebuggingHandlers, kubeCfg.EnableContentionProfiling)

    }
    if kubeCfg.ReadOnlyPort > 0 {
        go k.ListenAndServeReadOnly(net.ParseIP(kubeCfg.Address), uint(kubeCfg.ReadOnlyPort))
    }
    if utilfeature.DefaultFeatureGate.Enabled(features.KubeletPodResources) {
        go k.ListenAndServePodResources()
    }
}
graph LR
startKubelet-->Run
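All of these long-running pieces are launched with wait.Until from k8s.io/apimachinery, which simply re-runs a function on a fixed period until the stop channel closes; a period of 0, as used for k.Run above, means "run again immediately if it ever returns". A minimal standalone sketch of the primitive:

package main

import (
    "fmt"
    "time"

    "k8s.io/apimachinery/pkg/util/wait"
)

func main() {
    stopCh := make(chan struct{})

    // Re-run the closure every 2 seconds until stopCh is closed.
    go wait.Until(func() {
        fmt.Println("tick:", time.Now().Format(time.RFC3339))
    }, 2*time.Second, stopCh)

    time.Sleep(7 * time.Second)
    close(stopCh) // wait.NeverStop is simply a channel that is never closed
}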

pkg/kubelet/kubelet.go

func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
    //start the log server
    if kl.logServer == nil {
        kl.logServer = http.StripPrefix("/logs/", http.FileServer(http.Dir("/var/log/")))
    }
    
    if kl.kubeClient == nil {
        klog.Warning("No api server defined - no node status update will be sent.")
    }

    // Start the cloud provider sync manager
    if kl.cloudResourceSyncManager != nil {
        go kl.cloudResourceSyncManager.Run(wait.NeverStop)
    }
    
    //initialize modules: volumes, data directories, container logs
    //start the image manager, certificate manager, OOM watcher
    //start the resource analyzer

    // Start volume manager
    go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop)

    // Start syncing node status immediately, this may set up things the runtime needs to run.
    go wait.Until(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, wait.NeverStop)
    go kl.fastStatusUpdateOnce()
    // start syncing lease
    go kl.nodeLeaseController.Run(wait.NeverStop)
    go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop)

    // Start loop to sync iptables util rules
    go wait.Until(kl.syncNetworkUtil, 1*time.Minute, wait.NeverStop)

    // Start a goroutine responsible for killing pods (that are not properly
    // handled by pod workers).
    go wait.Until(kl.podKiller, 1*time.Second, wait.NeverStop)

    // Start component sync loops.
    kl.statusManager.Start() //status management
    kl.probeManager.Start() //probe management

    // Start syncing RuntimeClasses if enabled.
    kl.runtimeClassManager.Start(wait.NeverStop)

    // Start the pod lifecycle event generator.
    kl.pleg.Start() //start the pod lifecycle event generator
    kl.syncLoop(updates, kl) //the main sync loop
}

==At this point the kubelet has finished starting up==

graph LR
Run-->syncLoop

syncLoop

syncLoop is the main loop for processing changes. It watches for changes from three channels (file, apiserver, and http) and creates a union of them. For any new change seen, will run a sync against desired state and running state. If no changes are seen to the configuration, will synchronize the last known desired state every sync-frequency seconds. Never returns.

func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
    // preparation
    for {
        time.Sleep(duration)
        kl.syncLoopIteration(...)
        ...
    }
}
graph LR
syncLoop-->syncLoopIteration

syncLoopIteration receives messages from several directions and runs a sync against the desired state and the running state.

func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
    syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
    select {
    case u, open := <-configCh:
    case e := <-plegCh:...
    case <-syncCh:...
    case update := <-kl.livenessManager.Updates():...
    case <-housekeepingCh:...
    }
    return true
}

syncLoopIteration reads from various channels and dispatches pods to the given handler. Take configCh as an example:

switch u.Op {
case kubetypes.ADD:
    handler.HandlePodAdditions(u.Pods)
case kubetypes.UPDATE:
    handler.HandlePodUpdates(u.Pods)
case kubetypes.REMOVE:
    handler.HandlePodRemoves(u.Pods)
case kubetypes.RECONCILE:
    handler.HandlePodReconcile(u.Pods)
case kubetypes.DELETE:
    // DELETE is treated as a UPDATE because of graceful deletion.
    handler.HandlePodUpdates(u.Pods)
case kubetypes.RESTORE:
    // These are pods restored from the checkpoint. Treat them as new pods.
    handler.HandlePodAdditions(u.Pods)
}

Everything ultimately lands in a SyncHandler (again implemented by Kubelet itself). Let's analyze HandlePodAdditions.

Creating a new pod


The code below drops everything unrelated to creation; logging and error checking have been trimmed.

//file:/pkg/kubelet/kubelet.go---2026

func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
    sort.Sort(sliceutils.PodsByCreationTime(pods))
    for _, pod := range pods {
        ...
        // Always add the pod to the pod manager. Kubelet relies on the pod manager as the source of truth for the desired state. If a pod does
        // not exist in the pod manager, it means that it has been deleted in the apiserver and no action (other than cleanup) is required.
        kl.podManager.AddPod(pod)
        ...
        mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
        kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)
        kl.probeManager.AddPod(pod)
    }
}

kl.podManager.AddPod and kl.probeManager.AddPod(pod) merely register the pod for tracking; the real creation work starts in dispatchWork, which eventually comes back to kl.syncPod.
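dispatchWork hands the pod to the pod workers, which keep one serialized worker goroutine per pod UID and feed it updates over a channel, so updates for the same pod are handled in order while different pods are synced in parallel. Below is a simplified, hypothetical sketch of that per-key worker pattern (not the real podWorkers code):

package main

import (
    "fmt"
    "sync"
    "time"
)

type podUpdate struct {
    uid string
    op  string
}

// podWorkers keeps one goroutine and one channel per pod UID.
type podWorkers struct {
    mu      sync.Mutex
    updates map[string]chan podUpdate
}

func (w *podWorkers) dispatch(u podUpdate) {
    w.mu.Lock()
    ch, ok := w.updates[u.uid]
    if !ok {
        ch = make(chan podUpdate, 1)
        w.updates[u.uid] = ch
        go func() {
            for up := range ch {
                // In the kubelet this is roughly where syncPod is invoked.
                fmt.Printf("syncing pod %s (%s)\n", up.uid, up.op)
                time.Sleep(100 * time.Millisecond)
            }
        }()
    }
    w.mu.Unlock()
    ch <- u
}

func main() {
    w := &podWorkers{updates: map[string]chan podUpdate{}}
    w.dispatch(podUpdate{uid: "pod-a", op: "create"})
    w.dispatch(podUpdate{uid: "pod-b", op: "create"})
    w.dispatch(podUpdate{uid: "pod-a", op: "update"}) // queued behind the first pod-a update
    time.Sleep(time.Second)
}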

//file:/pkg/kubelet/kubelet.go---1464

func (kl *Kubelet) syncPod(o syncPodOptions) error {
    ...
    // Generate final API pod status with pod and status manager status
    apiPodStatus := kl.generateAPIPodStatus(pod, podStatus)
    existingStatus, ok := kl.statusManager.GetPodStatus(pod.UID)
    if runnable := kl.canRunPod(pod); !runnable.Admit {...}
    // Update status in the status manager
    kl.statusManager.SetPodStatus(pod, apiPodStatus)
    // Create Cgroups for the pod and apply resource parameters to them if cgroups-per-qos flag is enabled.
    pcm := kl.containerManager.NewPodContainerManager()
    // Make data directories for the pod
    kl.makePodDataDirs(pod);
    // Fetch the pull secrets for the pod
    pullSecrets := kl.getPullSecretsForPod(pod)
    // Call the container runtime's SyncPod callback
    result := kl.containerRuntime.SyncPod(pod, apiPodStatus, podStatus, pullSecrets, kl.backOff)
    ...
}

kubeGenericRuntimeManager.SyncPod

//file:/pkg/kubelet/kuberuntime/kuberuntime_manager.go---618

func (m *kubeGenericRuntimeManager) SyncPod(pod *v1.Pod, _ v1.PodStatus, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) {
    // Step 1: Compute sandbox and container changes.
    podContainerChanges := m.computePodActions(pod, podStatus)
    ...
    // Step 4: Create a sandbox for the pod if necessary.
    podSandboxID, msg, err = m.createPodSandbox(pod, podContainerChanges.Attempt)       
    // Get podSandboxConfig for containers to start.
    podSandboxConfig, err := m.generatePodSandboxConfig(pod, podContainerChanges.Attempt)
    // Step 5: start the init container.
    if container := podContainerChanges.NextInitContainerToStart; container != nil {
        // Start the next init container.
        msg, err := m.startContainer(podSandboxID, podSandboxConfig, container, pod, podStatus, pullSecrets, podIP, kubecontainer.ContainerTypeInit); 
    }
    // Step 6: start containers in podContainerChanges.ContainersToStart.
    for _, idx := range podContainerChanges.ContainersToStart {
        container := &pod.Spec.Containers[idx]
        msg, err := m.startContainer(podSandboxID, podSandboxConfig, container, pod, podStatus, pullSecrets, podIP, kubecontainer.ContainerTypeRegular); 
    }
    ...
}

m.createPodSandbox and startContainer

In the pkg/kubelet/kuberuntime/ package, kuberuntime_manager.go defines the kubeGenericRuntimeManager struct and the implementations of its interface methods, but the internal helpers those methods depend on are spread across the other Go files in the package. In essence, the methods of one "class" are split across multiple Go files, which together form the kubeGenericRuntimeManager implementation.

This method is quite long. Its main logic is to compare the pod spec passed in with the pod that is actually running (empty for a newly created pod) and compute the difference, i.e. what needs to be updated. It then first creates the infra (sandbox) container and sets up networking, and finally creates the application containers one by one.

func (m *kubeGenericRuntimeManager) startContainer(podSandboxID string, podSandboxConfig *runtimeapi.PodSandboxConfig, container *v1.Container, pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, podIP string, containerType kubecontainer.ContainerType) (string, error) {
    // Step 1: pull the image.
    imageRef, msg, err := m.imagePuller.EnsureImageExists(pod, container, pullSecrets)
    // Step 2: create the container.
    ref, err := kubecontainer.GenerateContainerRef(pod, container)
    containerConfig, cleanupAction, err := m.generateContainerConfig(container, pod, restartCount, podIP, imageRef, containerType)
    containerID, err := m.runtimeService.CreateContainer(podSandboxID, containerConfig, podSandboxConfig)
    err = m.internalLifecycle.PreStartContainer(pod, container, containerID)
    // Step 3: start the container.
    err = m.runtimeService.StartContainer(containerID)
    // Step 4: execute the post start hook.
    msg, handlerErr := m.runner.Run(kubeContainerID, pod, container, container.Lifecycle.PostStart)
}
  • The default sandbox image is gcr.io/google_containers/pause-amd64:3.0
  • ensureImageExists pulls the image when it is not present
  • CreateContainer ultimately calls the docker API with POST /containers/create
  • CreateCheckpoint writes a checkpoint file named after the container ID
  • StartContainer ultimately calls the docker API with POST /containers/{containerID}/start
  • The resolv.conf generated by docker is rewritten and shared by all containers in the pod
  • InspectContainer ultimately calls the docker API with GET /containers/{containerID}/json
  • Networking for the container is set up through CNI: the loopback interface is created and the bridge is put into promiscuous mode (via ip link show dev ... / ip link set <bridgeName> promisc on); a sketch of the docker API calls follows this list
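To make those endpoints concrete, here is a rough standalone sketch that talks to the local docker daemon over its unix socket and issues the same two calls, POST /containers/create and POST /containers/{id}/start (assumptions: /var/run/docker.sock exists and a busybox image is already present locally):

package main

import (
    "bytes"
    "context"
    "encoding/json"
    "fmt"
    "net"
    "net/http"
)

func main() {
    // HTTP client that dials the docker unix socket instead of TCP.
    cli := &http.Client{
        Transport: &http.Transport{
            DialContext: func(ctx context.Context, _, _ string) (net.Conn, error) {
                return net.Dial("unix", "/var/run/docker.sock")
            },
        },
    }

    // POST /containers/create, the endpoint CreateContainer ends up calling.
    body, _ := json.Marshal(map[string]interface{}{
        "Image": "busybox",
        "Cmd":   []string{"sleep", "60"},
    })
    resp, err := cli.Post("http://docker/containers/create", "application/json", bytes.NewReader(body))
    if err != nil {
        panic(err)
    }
    defer resp.Body.Close()

    var created struct {
        Id string `json:"Id"`
    }
    if err := json.NewDecoder(resp.Body).Decode(&created); err != nil {
        panic(err)
    }
    fmt.Println("created container", created.Id)

    // POST /containers/{id}/start, the endpoint StartContainer ends up calling.
    resp2, err := cli.Post("http://docker/containers/"+created.Id+"/start", "application/json", nil)
    if err != nil {
        panic(err)
    }
    resp2.Body.Close()
    fmt.Println("start status:", resp2.Status)
}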

What follows is from a debug session: the PodSandbox path from create to start, all the way until the request is sent to docker. The other calls are similar to this one and can be debugged the same way.

func (m *kubeGenericRuntimeManager) createPodSandbox(pod *v1.Pod, attempt uint32) (string, string, error) {
    podSandboxConfig, err := m.generatePodSandboxConfig(pod, attempt)
    if err != nil {
        message := fmt.Sprintf("GeneratePodSandboxConfig for pod %q failed: %v", format.Pod(pod), err)
        klog.Error(message)
        return "", message, err
    }

    // Create pod logs directory
    err = m.osInterface.MkdirAll(podSandboxConfig.LogDirectory, 0755)
    if err != nil {
        message := fmt.Sprintf("Create pod log directory for pod %q failed: %v", format.Pod(pod), err)
        klog.Errorf(message)
        return "", message, err
    }

    runtimeHandler := ""
    if utilfeature.DefaultFeatureGate.Enabled(features.RuntimeClass) && m.runtimeClassManager != nil {
        runtimeHandler, err = m.runtimeClassManager.LookupRuntimeHandler(pod.Spec.RuntimeClassName)
        if err != nil {
            message := fmt.Sprintf("CreatePodSandbox for pod %q failed: %v", format.Pod(pod), err)
            return "", message, err
        }
        if runtimeHandler != "" {
            klog.V(2).Infof("Running pod %s with RuntimeHandler %q", format.Pod(pod), runtimeHandler)
        }
    }
    //launch the sandbox via RunPodSandbox
    podSandBoxID, err := m.runtimeService.RunPodSandbox(podSandboxConfig, runtimeHandler)
    if err != nil {
        message := fmt.Sprintf("CreatePodSandbox for pod %q failed: %v", format.Pod(pod), err)
        klog.Error(message)
        return "", message, err
    }

    return podSandBoxID, "", nil
}
/workspace/goWorkspace/src/k8s.io/kubernetes/pkg/kubelet/kuberuntime/instrumented_services.go

func (in instrumentedRuntimeService) RunPodSandbox(config *runtimeapi.PodSandboxConfig, runtimeHandler string) (string, error) {
    const operation = "run_podsandbox"
    startTime := time.Now()
    defer recordOperation(operation, startTime)
    defer metrics.RunPodSandboxDuration.WithLabelValues(runtimeHandler).Observe(metrics.SinceInSeconds(startTime))

    out, err := in.service.RunPodSandbox(config, runtimeHandler)
    recordError(operation, err)
    if err != nil {
        metrics.RunPodSandboxErrors.WithLabelValues(runtimeHandler).Inc()
    }
    return out, err
}
/workspace/goWorkspace/src/k8s.io/kubernetes/pkg/kubelet/remote/remote_runtime.go

func (r *RemoteRuntimeService) RunPodSandbox(config *runtimeapi.PodSandboxConfig, runtimeHandler string) (string, error) {
    // Use 2 times longer timeout for sandbox operation (4 mins by default)
    // TODO: Make the pod sandbox timeout configurable.
    ctx, cancel := getContextWithTimeout(r.timeout * 2)
    defer cancel()

    resp, err := r.runtimeClient.RunPodSandbox(ctx, &runtimeapi.RunPodSandboxRequest{
        Config:         config,
        RuntimeHandler: runtimeHandler,
    })
    if err != nil {
        klog.Errorf("RunPodSandbox from runtime service failed: %v", err)
        return "", err
    }

    if resp.PodSandboxId == "" {
        errorMessage := fmt.Sprintf("PodSandboxId is not set for sandbox %q", config.GetMetadata())
        klog.Errorf("RunPodSandbox failed: %s", errorMessage)
        return "", errors.New(errorMessage)
    }

    return resp.PodSandboxId, nil
}

kubelet CRI gRPC client implementation

/workspace/goWorkspace/src/k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2/api.pb.go

func (c *runtimeServiceClient) RunPodSandbox(ctx context.Context, in *RunPodSandboxRequest, opts ...grpc.CallOption) (*RunPodSandboxResponse, error) {
    out := new(RunPodSandboxResponse)
    err := grpc.Invoke(ctx, "/runtime.v1alpha2.RuntimeService/RunPodSandbox", in, out, c.cc, opts...)
    if err != nil {
        return nil, err
    }
    return out, nil
}
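For reference, the generated client above can also be driven by hand against the CRI socket. Here is a rough sketch under a few assumptions: dockershim is listening on /var/run/dockershim.sock, the runtimeapi import path matches this source tree (newer trees use k8s.io/cri-api instead), and the older grpc.WithInsecure/WithDialer options are available, similar to what the remote runtime client uses.

package main

import (
    "context"
    "fmt"
    "net"
    "time"

    "google.golang.org/grpc"

    runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2"
)

func main() {
    // Dial the CRI unix socket; the dialer treats the target as a socket path.
    conn, err := grpc.Dial("/var/run/dockershim.sock",
        grpc.WithInsecure(),
        grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) {
            return net.DialTimeout("unix", addr, timeout)
        }),
    )
    if err != nil {
        panic(err)
    }
    defer conn.Close()

    client := runtimeapi.NewRuntimeServiceClient(conn)
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()

    // Version is the cheapest RPC to prove the plumbing works; RunPodSandbox
    // goes through exactly the same generated client shown above.
    resp, err := client.Version(ctx, &runtimeapi.VersionRequest{})
    if err != nil {
        panic(err)
    }
    fmt.Println("runtime:", resp.RuntimeName, resp.RuntimeVersion)
}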

The gRPC server side is implemented in dockershim. Continuing:

/workspace/goWorkspace/src/k8s.io/kubernetes/pkg/kubelet/dockershim/docker_sandbox.go

func (ds *dockerService) RunPodSandbox(ctx context.Context, r *runtimeapi.RunPodSandboxRequest) (*runtimeapi.RunPodSandboxResponse, error) {
    config := r.GetConfig()

    // Step 1: Pull the image for the sandbox.
    image := defaultSandboxImage
    podSandboxImage := ds.podSandboxImage
    if len(podSandboxImage) != 0 {
        image = podSandboxImage
    }

    // NOTE: To use a custom sandbox image in a private repository, users need to configure the nodes with credentials properly.
    // see: http://kubernetes.io/docs/user-guide/images/#configuring-nodes-to-authenticate-to-a-private-repository
    // Only pull sandbox image when it's not present - v1.PullIfNotPresent.
    if err := ensureSandboxImageExists(ds.client, image); err != nil {
        return nil, err
    }

    // Step 2: Create the sandbox container.
    if r.GetRuntimeHandler() != "" {
        return nil, fmt.Errorf("RuntimeHandler %q not supported", r.GetRuntimeHandler())
    }
    createConfig, err := ds.makeSandboxDockerConfig(config, image)
    if err != nil {
        return nil, fmt.Errorf("failed to make sandbox docker config for pod %q: %v", config.Metadata.Name, err)
    }
    createResp, err := ds.client.CreateContainer(*createConfig)
    if err != nil {
        createResp, err = recoverFromCreationConflictIfNeeded(ds.client, *createConfig, err)
    }

    if err != nil || createResp == nil {
        return nil, fmt.Errorf("failed to create a sandbox for pod %q: %v", config.Metadata.Name, err)
    }
    resp := &runtimeapi.RunPodSandboxResponse{PodSandboxId: createResp.ID}

    ds.setNetworkReady(createResp.ID, false)
    defer func(e *error) {
        // Set networking ready depending on the error return of
        // the parent function
        if *e == nil {
            ds.setNetworkReady(createResp.ID, true)
        }
    }(&err)

    // Step 3: Create Sandbox Checkpoint.
    if err = ds.checkpointManager.CreateCheckpoint(createResp.ID, constructPodSandboxCheckpoint(config)); err != nil {
        return nil, err
    }

    // Step 4: Start the sandbox container.
    // Assume kubelet's garbage collector would remove the sandbox later, if
    // startContainer failed.
    err = ds.client.StartContainer(createResp.ID)
    if err != nil {
        return nil, fmt.Errorf("failed to start sandbox container for pod %q: %v", config.Metadata.Name, err)
    }

    // Rewrite resolv.conf file generated by docker.
    // NOTE: cluster dns settings aren't passed anymore to docker api in all cases,
    // not only for pods with host network: the resolver conf will be overwritten
    // after sandbox creation to override docker's behaviour. This resolv.conf
    // file is shared by all containers of the same pod, and needs to be modified
    // only once per pod.
    if dnsConfig := config.GetDnsConfig(); dnsConfig != nil {
        containerInfo, err := ds.client.InspectContainer(createResp.ID)
        if err != nil {
            return nil, fmt.Errorf("failed to inspect sandbox container for pod %q: %v", config.Metadata.Name, err)
        }

        if err := rewriteResolvFile(containerInfo.ResolvConfPath, dnsConfig.Servers, dnsConfig.Searches, dnsConfig.Options); err != nil {
            return nil, fmt.Errorf("rewrite resolv.conf failed for pod %q: %v", config.Metadata.Name, err)
        }
    }

    // Do not invoke network plugins if in hostNetwork mode.
    if config.GetLinux().GetSecurityContext().GetNamespaceOptions().GetNetwork() == runtimeapi.NamespaceMode_NODE {
        return resp, nil
    }

    // Step 5: Setup networking for the sandbox.
    // All pod networking is setup by a CNI plugin discovered at startup time.
    // This plugin assigns the pod ip, sets up routes inside the sandbox,
    // creates interfaces etc. In theory, its jurisdiction ends with pod
    // sandbox networking, but it might insert iptables rules or open ports
    // on the host as well, to satisfy parts of the pod spec that aren't
    // recognized by the CNI standard yet.
    cID := kubecontainer.BuildContainerID(runtimeName, createResp.ID)
    networkOptions := make(map[string]string)
    if dnsConfig := config.GetDnsConfig(); dnsConfig != nil {
        // Build DNS options.
        dnsOption, err := json.Marshal(dnsConfig)
        if err != nil {
            return nil, fmt.Errorf("failed to marshal dns config for pod %q: %v", config.Metadata.Name, err)
        }
        networkOptions["dns"] = string(dnsOption)
    }
    err = ds.network.SetUpPod(config.GetMetadata().Namespace, config.GetMetadata().Name, cID, config.Annotations, networkOptions)
    if err != nil {
        errList := []error{fmt.Errorf("failed to set up sandbox container %q network for pod %q: %v", createResp.ID, config.Metadata.Name, err)}

        // Ensure network resources are cleaned up even if the plugin
        // succeeded but an error happened between that success and here.
        err = ds.network.TearDownPod(config.GetMetadata().Namespace, config.GetMetadata().Name, cID)
        if err != nil {
            errList = append(errList, fmt.Errorf("failed to clean up sandbox container %q network for pod %q: %v", createResp.ID, config.Metadata.Name, err))
        }

        err = ds.client.StopContainer(createResp.ID, defaultSandboxGracePeriod)
        if err != nil {
            errList = append(errList, fmt.Errorf("failed to stop sandbox container %q for pod %q: %v", createResp.ID, config.Metadata.Name, err))
        }

        return resp, utilerrors.NewAggregate(errList)
    }

    return resp, nil
}

/workspace/goWorkspace/src/k8s.io/kubernetes/pkg/kubelet/dockershim/libdocker/instrumented_client.go

func (in instrumentedInterface) StartContainer(id string) error {
    const operation = "start_container"
    defer recordOperation(operation, time.Now())

    err := in.client.StartContainer(id)
    recordError(operation, err)
    return err
}
/workspace/goWorkspace/src/k8s.io/kubernetes/pkg/kubelet/dockershim/libdocker/kube_docker_client.go

func (d *kubeDockerClient) StartContainer(id string) error {
    ctx, cancel := d.getTimeoutContext()
    defer cancel()
    err := d.client.ContainerStart(ctx, id, dockertypes.ContainerStartOptions{})
    if ctxErr := contextError(ctx); ctxErr != nil {
        return ctxErr
    }
    return err
}

The chain from the gRPC call down to docker ends here:

/workspace/goWorkspace/src/k8s.io/kubernetes/vendor/github.com/docker/docker/client/container_start.go

func (cli *Client) ContainerStart(ctx context.Context, containerID string, options types.ContainerStartOptions) error {
    query := url.Values{}
    if len(options.CheckpointID) != 0 {
        query.Set("checkpoint", options.CheckpointID)
    }
    if len(options.CheckpointDir) != 0 {
        query.Set("checkpoint-dir", options.CheckpointDir)
    }

    resp, err := cli.post(ctx, "/containers/"+containerID+"/start", query, nil, nil)
    ensureReaderClosed(resp)
    return err
}

References:

http://qiankunli.github.io/2018/12/31/kubernetes_source_kubelet.html#%E6%96%B0%E5%BB%BA-pod
https://cizixs.com/2017/06/07/kubelet-source-code-analysis-part-2/
https://toutiao.io/posts/z2e88b/preview
