An Analysis of the kubelet Startup Flow

This article was originally going to move on to the kubelet's main modules, but readers asked whether we could start with the kubelet startup flow instead. I wrote about the startup flow quite a while ago based on v1.12; compared with v1.16 the flow has not changed much, but that earlier article was rather brief, so this one analyzes the kubelet startup flow again from scratch.

The Kubelet Startup Flow

Kubernetes version: v1.16

Starting the kubelet is fairly involved, so the startup flow diagram is placed here first to make the call chain in the rest of the article easier to follow:

NewKubeletCommand

We start from the kubelet's main function. The NewKubeletCommand method it calls is mainly responsible for reading the parameters from the configuration file, validating them, and setting their default values. Its main logic is:

  • 1戏羽、解析命令行參數(shù);
  • 2楼吃、為 kubelet 初始化 feature gates 參數(shù)始花;
  • 3妄讯、加載 kubelet 配置文件;
  • 4衙荐、校驗(yàn)配置文件中的參數(shù)捞挥;
  • 5、檢查 kubelet 是否啟用動態(tài)配置功能忧吟;
  • 6砌函、初始化 kubeletDeps,kubeletDeps 包含 kubelet 運(yùn)行所必須的配置溜族,是為了實(shí)現(xiàn) dependency injection讹俊,其目的是為了把 kubelet 依賴的組件對象作為參數(shù)傳進(jìn)來,這樣可以控制 kubelet 的行為煌抒;
  • 7仍劈、調(diào)用 Run 方法;

k8s.io/kubernetes/cmd/kubelet/app/server.go:111

func NewKubeletCommand() *cobra.Command {
    cleanFlagSet := pflag.NewFlagSet(componentKubelet, pflag.ContinueOnError)
    cleanFlagSet.SetNormalizeFunc(cliflag.WordSepNormalizeFunc)

    // 1寡壮、kubelet配置分兩部分:
    // KubeletFlag: 指那些不允許在 kubelet 運(yùn)行時進(jìn)行修改的配置集贩疙,或者不能在集群中各個 Nodes 之間共享的配置集。
    // KubeletConfiguration: 指可以在集群中各個Nodes之間共享的配置集况既,可以進(jìn)行動態(tài)配置这溅。
    kubeletFlags := options.NewKubeletFlags()
    kubeletConfig, err := options.NewKubeletConfiguration()
    if err != nil {
        klog.Fatal(err)
    }

    cmd := &cobra.Command{
        Use: componentKubelet,
        DisableFlagParsing: true,
        ......
        Run: func(cmd *cobra.Command, args []string) {
            // 2. parse the command-line arguments
            if err := cleanFlagSet.Parse(args); err != nil {
                cmd.Usage()
                klog.Fatal(err)
            }
            ......

            verflag.PrintAndExitIfRequested()
            utilflag.PrintFlags(cleanFlagSet)

            // 3棒仍、初始化 feature gates 配置
            if err := utilfeature.DefaultMutableFeatureGate.SetFromMap(kubeletConfig.FeatureGates); err != nil {
                klog.Fatal(err)
            }

            if err := options.ValidateKubeletFlags(kubeletFlags); err != nil {
                klog.Fatal(err)
            }

            if kubeletFlags.ContainerRuntime == "remote" && cleanFlagSet.Changed("pod-infra-container-image") {
                klog.Warning("Warning: For remote container runtime, --pod-infra-container-image is ignored in kubelet, which should be set in that remote runtime instead")
            }

            // 4悲靴、加載 kubelet 配置文件
            if configFile := kubeletFlags.KubeletConfigFile; len(configFile) > 0 {
                kubeletConfig, err = loadConfigFile(configFile)
                ......
            }
            // 5. validate the parameters in the config file
            if err := kubeletconfigvalidation.ValidateKubeletConfiguration(kubeletConfig); err != nil {
                klog.Fatal(err)
            }

            // 6莫其、檢查 kubelet 是否啟用動態(tài)配置功能
            var kubeletConfigController *dynamickubeletconfig.Controller
            if dynamicConfigDir := kubeletFlags.DynamicConfigDir.Value(); len(dynamicConfigDir) > 0 {
                var dynamicKubeletConfig *kubeletconfiginternal.KubeletConfiguration
                dynamicKubeletConfig, kubeletConfigController, err = BootstrapKubeletConfigController(dynamicConfigDir,
                    func(kc *kubeletconfiginternal.KubeletConfiguration) error {
                        return kubeletConfigFlagPrecedence(kc, args)
                    })
                if err != nil {
                    klog.Fatal(err)
                }
                if dynamicKubeletConfig != nil {
                    kubeletConfig = dynamicKubeletConfig
                    if err := utilfeature.DefaultMutableFeatureGate.SetFromMap(kubeletConfig.FeatureGates); err != nil {
                        klog.Fatal(err)
                    }
                }
            }
            kubeletServer := &options.KubeletServer{
                KubeletFlags:         *kubeletFlags,
                KubeletConfiguration: *kubeletConfig,
            }
            // 7癞尚、初始化 kubeletDeps
            kubeletDeps, err := UnsecuredDependencies(kubeletServer)
            if err != nil {
                klog.Fatal(err)
            }

            kubeletDeps.KubeletConfigController = kubeletConfigController
            stopCh := genericapiserver.SetupSignalHandler()
            if kubeletServer.KubeletFlags.ExperimentalDockershim {
                if err := RunDockershim(&kubeletServer.KubeletFlags, kubeletConfig, stopCh); err != nil {
                    klog.Fatal(err)
                }
                return
            }

            // 8. call the Run method
            if err := Run(kubeletServer, kubeletDeps, stopCh); err != nil {
                klog.Fatal(err)
            }
        },
    }
    kubeletFlags.AddFlags(cleanFlagSet)
    options.AddKubeletConfigFlags(cleanFlagSet, kubeletConfig)
    options.AddGlobalFlags(cleanFlagSet)
    ......

    return cmd
}
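Taken together, the order above (flags parsed first, then the config file loaded, then kubeletConfigFlagPrecedence re-applying the command line on top) amounts to a simple layering: defaults < config file < flags. A minimal sketch of that precedence, using illustrative maps rather than the kubelet's actual option types:

```go
package main

import "fmt"

// overlay applies each configuration layer in order; later layers win.
func overlay(layers ...map[string]string) map[string]string {
	merged := map[string]string{}
	for _, layer := range layers {
		for k, v := range layer {
			merged[k] = v
		}
	}
	return merged
}

func main() {
	defaults := map[string]string{"port": "10250", "healthz-port": "10248"}
	configFile := map[string]string{"port": "10255"}
	flags := map[string]string{"port": "10250"} // flags are re-applied last

	merged := overlay(defaults, configFile, flags)
	fmt.Println(merged["port"], merged["healthz-port"])
}
```

This is why an explicitly passed flag always beats the same field in the config file, even though the config file is loaded later.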

Run

This method simply calls run, which carries out the rest of the startup logic.

k8s.io/kubernetes/cmd/kubelet/app/server.go:408

func Run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, stopCh <-chan struct{}) error {
    if err := initForOS(s.KubeletFlags.WindowsService); err != nil {
        return fmt.Errorf("failed OS init: %v", err)
    }
    if err := run(s, kubeDeps, stopCh); err != nil {
        return fmt.Errorf("failed to run Kubelet: %v", err)
    }
    return nil
}

run

The run method does the basic configuration and sanity checks needed before the kubelet starts. Its main logic is:

  • 1. Set the default FeatureGates for the kubelet. All of the kubelet's FeatureGates can be listed via the command-line flags; FeatureGates in Alpha state are disabled by default at component startup, while those in Beta and GA state are enabled by default;
  • 2. Validate the kubelet's parameters;
  • 3. Try to acquire the kubelet's lock file; this requires starting the kubelet with --exit-on-lock-contention and --lock-file. The feature is in Alpha and disabled by default;
  • 4. Register the current configuration to the /configz URL of the HTTP server;
  • 5. Check whether the kubelet is running in standalone mode; in this mode it does not talk to the apiserver, which is mainly useful for debugging the kubelet;
  • 6. Initialize kubeDeps, which holds some of the kubelet's dependencies, mainly KubeClient, EventClient, HeartbeatClient, Auth, cadvisor and ContainerManager;
  • 7. Check whether the kubelet was started as the root user;
  • 8. Set the OOM score for the process; the default is -999 and the valid range is [-1000, 1000], where a lower score makes the process less likely to be killed;
  • 9. Call the RunKubelet method;
  • 10. Check whether the kubelet has dynamic configuration enabled;
  • 11. Start the healthz HTTP server;
  • 12. If started via systemd, notify systemd that the kubelet is up;

k8s.io/kubernetes/cmd/kubelet/app/server.go:472

func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, stopCh <-chan struct{}) (err error) {
    // 1. set the default FeatureGates for the kubelet
    err = utilfeature.DefaultMutableFeatureGate.SetFromMap(s.KubeletConfiguration.FeatureGates)
    if err != nil {
        return err
    }
    // 2介汹、校驗(yàn) kubelet 的參數(shù)
    if err := options.ValidateKubeletServer(s); err != nil {
        return err
    }

    // 3却嗡、嘗試獲取 kubelet 的 lock file
    if s.ExitOnLockContention && s.LockFilePath == "" {
        return errors.New("cannot exit on lock file contention: no lock file specified")
    }
    done := make(chan struct{})
    if s.LockFilePath != "" {
        klog.Infof("acquiring file lock on %q", s.LockFilePath)
        if err := flock.Acquire(s.LockFilePath); err != nil {
            return fmt.Errorf("unable to acquire file lock on %q: %v", s.LockFilePath, err)
        }
        if s.ExitOnLockContention {
            klog.Infof("watching for inotify events for: %v", s.LockFilePath)
            if err := watchForLockfileContention(s.LockFilePath, done); err != nil {
                return err
            }
        }
    }
    // 4. register the current configuration to the /configz URL of the HTTP server
    err = initConfigz(&s.KubeletConfiguration)
    if err != nil {
        klog.Errorf("unable to register KubeletConfiguration with configz, error: %v", err)
    }

    // 5窗价、判斷是否為 standalone 模式
    standaloneMode := true
    if len(s.KubeConfig) > 0 {
        standaloneMode = false
    }

    // 6. initialize kubeDeps
    if kubeDeps == nil {
        kubeDeps, err = UnsecuredDependencies(s)
        if err != nil {
            return err
        }
    }
    if kubeDeps.Cloud == nil {
        if !cloudprovider.IsExternal(s.CloudProvider) {
            cloud, err := cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile)
            if err != nil {
                return err
            }
            ......
            kubeDeps.Cloud = cloud
        }
    }

    hostName, err := nodeutil.GetHostname(s.HostnameOverride)
    if err != nil {
        return err
    }
    nodeName, err := getNodeName(kubeDeps.Cloud, hostName)
    if err != nil {
        return err
    }
    // 7叹卷、如果是 standalone 模式將所有 client 設(shè)置為 nil
    switch {
    case standaloneMode:
        kubeDeps.KubeClient = nil
        kubeDeps.EventClient = nil
        kubeDeps.HeartbeatClient = nil

    // 8撼港、為 kubeDeps 初始化 KubeClient坪它、EventClient、HeartbeatClient 模塊
    case kubeDeps.KubeClient == nil, kubeDeps.EventClient == nil, kubeDeps.HeartbeatClient == nil:
        clientConfig, closeAllConns, err := buildKubeletClientConfig(s, nodeName)
        if err != nil {
            return err
        }
        if closeAllConns == nil {
            return errors.New("closeAllConns must be a valid function other than nil")
        }
        kubeDeps.OnHeartbeatFailure = closeAllConns

        kubeDeps.KubeClient, err = clientset.NewForConfig(clientConfig)
        if err != nil {
            return fmt.Errorf("failed to initialize kubelet client: %v", err)
        }

        eventClientConfig := *clientConfig
        eventClientConfig.QPS = float32(s.EventRecordQPS)
        eventClientConfig.Burst = int(s.EventBurst)
        kubeDeps.EventClient, err = v1core.NewForConfig(&eventClientConfig)
        if err != nil {
            return fmt.Errorf("failed to initialize kubelet event client: %v", err)
        }

        heartbeatClientConfig := *clientConfig
        heartbeatClientConfig.Timeout = s.KubeletConfiguration.NodeStatusUpdateFrequency.Duration

        if utilfeature.DefaultFeatureGate.Enabled(features.NodeLease) {
            leaseTimeout := time.Duration(s.KubeletConfiguration.NodeLeaseDurationSeconds) * time.Second
            if heartbeatClientConfig.Timeout > leaseTimeout {
                heartbeatClientConfig.Timeout = leaseTimeout
            }
        }
        heartbeatClientConfig.QPS = float32(-1)
        kubeDeps.HeartbeatClient, err = clientset.NewForConfig(&heartbeatClientConfig)
        if err != nil {
            return fmt.Errorf("failed to initialize kubelet heartbeat client: %v", err)
        }
    }
    // 9帝牡、初始化 auth 模塊
    if kubeDeps.Auth == nil {
        auth, err := BuildAuth(nodeName, kubeDeps.KubeClient, s.KubeletConfiguration)
        if err != nil {
            return err
        }
        kubeDeps.Auth = auth
    }

    var cgroupRoots []string

    // 10往毡、設(shè)置 cgroupRoot
    cgroupRoots = append(cgroupRoots, cm.NodeAllocatableRoot(s.CgroupRoot, s.CgroupDriver))
    kubeletCgroup, err := cm.GetKubeletContainer(s.KubeletCgroups)
    if err != nil {
    } else if kubeletCgroup != "" {
        cgroupRoots = append(cgroupRoots, kubeletCgroup)
    }

    runtimeCgroup, err := cm.GetRuntimeContainer(s.ContainerRuntime, s.RuntimeCgroups)
    if err != nil {
    } else if runtimeCgroup != "" {
        cgroupRoots = append(cgroupRoots, runtimeCgroup)
    }
    if s.SystemCgroups != "" {
        cgroupRoots = append(cgroupRoots, s.SystemCgroups)
    }

    // 11. initialize cadvisor
    if kubeDeps.CAdvisorInterface == nil {
        imageFsInfoProvider := cadvisor.NewImageFsInfoProvider(s.ContainerRuntime, s.RemoteRuntimeEndpoint)
        kubeDeps.CAdvisorInterface, err = cadvisor.New(imageFsInfoProvider, s.RootDirectory, cgroupRoots, cadvisor.UsingLegacyCadvisorStats(s.ContainerRuntime, s.RemoteRuntimeEndpoint))
        if err != nil {
            return err
        }
    }

    makeEventRecorder(kubeDeps, nodeName)

    // 12靶溜、初始化 ContainerManager
    if kubeDeps.ContainerManager == nil {
        if s.CgroupsPerQOS && s.CgroupRoot == "" {
            s.CgroupRoot = "/"
        }
        kubeReserved, err := parseResourceList(s.KubeReserved)
        if err != nil {
            return err
        }
        systemReserved, err := parseResourceList(s.SystemReserved)
        if err != nil {
            return err
        }
        var hardEvictionThresholds []evictionapi.Threshold
        if !s.ExperimentalNodeAllocatableIgnoreEvictionThreshold {
            hardEvictionThresholds, err = eviction.ParseThresholdConfig([]string{}, s.EvictionHard, nil, nil, nil)
            if err != nil {
                return err
            }
        }
        experimentalQOSReserved, err := cm.ParseQOSReserved(s.QOSReserved)
        if err != nil {
            return err
        }

        devicePluginEnabled := utilfeature.DefaultFeatureGate.Enabled(features.DevicePlugins)
        kubeDeps.ContainerManager, err = cm.NewContainerManager(
            kubeDeps.Mounter,
            kubeDeps.CAdvisorInterface,
            cm.NodeConfig{
                ......
            },
            s.FailSwapOn,
            devicePluginEnabled,
            kubeDeps.Recorder)
        if err != nil {
            return err
        }
    }

    // 13开瞭、檢查是否以 root 權(quán)限啟動
    if err := checkPermissions(); err != nil {
        klog.Error(err)
    }

    utilruntime.ReallyCrash = s.ReallyCrashForTesting

    // 14. set the OOM score for the kubelet process
    oomAdjuster := kubeDeps.OOMAdjuster
    if err := oomAdjuster.ApplyOOMScoreAdj(0, int(s.OOMScoreAdj)); err != nil {
        klog.Warning(err)
    }

    // 15墨技、調(diào)用 RunKubelet 方法執(zhí)行后續(xù)的啟動操作
    if err := RunKubelet(s, kubeDeps, s.RunOnce); err != nil {
        return err
    }

    if utilfeature.DefaultFeatureGate.Enabled(features.DynamicKubeletConfig) && len(s.DynamicConfigDir.Value()) > 0 &&
        kubeDeps.KubeletConfigController != nil && !standaloneMode && !s.RunOnce {
        if err := kubeDeps.KubeletConfigController.StartSync(kubeDeps.KubeClient, kubeDeps.EventClient, string(nodeName)); err != nil {
            return err
        }
    }

    // 16惩阶、啟動 Healthz http server
    if s.HealthzPort > 0 {
        mux := http.NewServeMux()
        healthz.InstallHandler(mux)
        go wait.Until(func() {
            err := http.ListenAndServe(net.JoinHostPort(s.HealthzBindAddress, strconv.Itoa(int(s.HealthzPort))), mux)
            if err != nil {
                klog.Errorf("Starting healthz server failed: %v", err)
            }
        }, 5*time.Second, wait.NeverStop)
    }

    if s.RunOnce {
        return nil
    }

    // 17. notify systemd that startup is complete
    go daemon.SdNotify(false, "READY=1")

    select {
    case <-done:
        break
    case <-stopCh:
        break
    }
    return nil
}

RunKubelet

RunKubelet mainly calls createAndInitKubelet to initialize the kubelet components, and then calls startKubelet to start them.

k8s.io/kubernetes/cmd/kubelet/app/server.go:989

func RunKubelet(kubeServer *options.KubeletServer, kubeDeps *kubelet.Dependencies, runOnce bool) error {
    hostname, err := nodeutil.GetHostname(kubeServer.HostnameOverride)
    if err != nil {
        return err
    }
    nodeName, err := getNodeName(kubeDeps.Cloud, hostname)
    if err != nil {
        return err
    }
    makeEventRecorder(kubeDeps, nodeName)

    // 1. privileged mode is enabled by default
    capabilities.Initialize(capabilities.Capabilities{
        AllowPrivileged: true,
    })

    credentialprovider.SetPreferredDockercfgPath(kubeServer.RootDirectory)

    if kubeDeps.OSInterface == nil {
        kubeDeps.OSInterface = kubecontainer.RealOS{}
    }

    // 2崭别、調(diào)用 createAndInitKubelet
    k, err := createAndInitKubelet(&kubeServer.KubeletConfiguration,
        ......
        kubeServer.NodeStatusMaxImages)
    if err != nil {
        return fmt.Errorf("failed to create kubelet: %v", err)
    }

    if kubeDeps.PodConfig == nil {
        return fmt.Errorf("failed to create kubelet, pod source config was nil")
    }
    podCfg := kubeDeps.PodConfig

    rlimit.RlimitNumFiles(uint64(kubeServer.MaxOpenFiles))

    if runOnce {
        if _, err := k.RunOnce(podCfg.Updates()); err != nil {
            return fmt.Errorf("runonce failed: %v", err)
        }
        klog.Info("Started kubelet as runonce")
    } else {
        // 3冬筒、調(diào)用 startKubelet
        startKubelet(k, podCfg, &kubeServer.KubeletConfiguration, kubeDeps, kubeServer.EnableCAdvisorJSONEndpoints, kubeServer.EnableServer)
        klog.Info("Started kubelet")
    }
    return nil
}

createAndInitKubelet

createAndInitKubelet mainly calls three methods to complete the kubelet initialization:

  • kubelet.NewMainKubelet: instantiate the kubelet object and initialize all of the modules the kubelet depends on;
  • k.BirthCry: send an event to the apiserver announcing that the kubelet has started;
  • k.StartGarbageCollection: start the garbage collection service that reclaims containers and images;

k8s.io/kubernetes/cmd/kubelet/app/server.go:1089

func createAndInitKubelet(......) {
    k, err = kubelet.NewMainKubelet(
            ......
    )
    if err != nil {
        return nil, err
    }

    k.BirthCry()

    k.StartGarbageCollection()

    return k, nil
}

kubelet.NewMainKubelet

NewMainKubelet is the method that initializes the kubelet. Its main logic is:

  • 1. Initialize PodConfig, which watches the sources of pod metadata (file, http, apiserver) and merges the pod configurations from the different sources into a single structure;
  • 2. Initialize the containerGCPolicy, imageGCPolicy and evictionConfig configurations;
  • 3. Start the serviceInformer and nodeInformer;
  • 4. Initialize containerRefManager and oomWatcher;
  • 5. Initialize the Kubelet object;
  • 6. Initialize secretManager and configMapManager;
  • 7. Initialize livenessManager, podManager, statusManager and resourceAnalyzer;
  • 8. Call kuberuntime.NewKubeGenericRuntimeManager to initialize containerRuntime;
  • 9. Initialize pleg;
  • 10. Initialize containerGC, containerDeletor, imageManager and containerLogManager;
  • 11. Initialize serverCertificateManager, probeManager, tokenManager, volumePluginMgr, pluginManager and volumeManager;
  • 12. Initialize workQueue, podWorkers and evictionManager;
  • 13. Finally, register the handlers of the relevant modules;

NewMainKubelet initializes every module the kubelet depends on. What each module does was introduced in the previous article on the kubelet architecture; how each module is initialized and how it works will be analyzed in detail in later articles.

k8s.io/kubernetes/pkg/kubelet/kubelet.go:335

func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration, ......) (*Kubelet, error) {
    if rootDirectory == "" {
        return nil, fmt.Errorf("invalid root directory %q", rootDirectory)
    }
    if kubeCfg.SyncFrequency.Duration <= 0 {
        return nil, fmt.Errorf("invalid sync frequency %d", kubeCfg.SyncFrequency.Duration)
    }

    if kubeCfg.MakeIPTablesUtilChains {
        ......
    }

    hostname, err := nodeutil.GetHostname(hostnameOverride)
    if err != nil {
        return nil, err
    }

    nodeName := types.NodeName(hostname)
    if kubeDeps.Cloud != nil {
        ......
    }

    // 1衬衬、初始化 PodConfig
    if kubeDeps.PodConfig == nil {
        var err error
        kubeDeps.PodConfig, err = makePodSourceConfig(kubeCfg, kubeDeps, nodeName, bootstrapCheckpointPath)
        if err != nil {
            return nil, err
        }
    }

    // 2樊零、初始化 containerGCPolicy、imageGCPolicy叠纹、evictionConfig
    containerGCPolicy := kubecontainer.ContainerGCPolicy{
        MinAge:             minimumGCAge.Duration,
        MaxPerPodContainer: int(maxPerPodContainerCount),
        MaxContainers:      int(maxContainerCount),
    }
    daemonEndpoints := &v1.NodeDaemonEndpoints{
        KubeletEndpoint: v1.DaemonEndpoint{Port: kubeCfg.Port},
    }

    imageGCPolicy := images.ImageGCPolicy{
        MinAge:               kubeCfg.ImageMinimumGCAge.Duration,
        HighThresholdPercent: int(kubeCfg.ImageGCHighThresholdPercent),
        LowThresholdPercent:  int(kubeCfg.ImageGCLowThresholdPercent),
    }

    enforceNodeAllocatable := kubeCfg.EnforceNodeAllocatable
    if experimentalNodeAllocatableIgnoreEvictionThreshold {
        enforceNodeAllocatable = []string{}
    }
    thresholds, err := eviction.ParseThresholdConfig(enforceNodeAllocatable, kubeCfg.EvictionHard, kubeCfg.EvictionSoft, kubeCfg.EvictionSoftGracePeriod, kubeCfg.EvictionMinimumReclaim)
    if err != nil {
        return nil, err
    }
    evictionConfig := eviction.Config{
        PressureTransitionPeriod: kubeCfg.EvictionPressureTransitionPeriod.Duration,
        MaxPodGracePeriodSeconds: int64(kubeCfg.EvictionMaxPodGracePeriod),
        Thresholds:               thresholds,
        KernelMemcgNotification:  experimentalKernelMemcgNotification,
        PodCgroupRoot:            kubeDeps.ContainerManager.GetPodCgroupRoot(),
    }
    // 3疲吸、啟動 serviceInformer 和 nodeInformer
    serviceIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
    if kubeDeps.KubeClient != nil {
        serviceLW := cache.NewListWatchFromClient(kubeDeps.KubeClient.CoreV1().RESTClient(), "services", metav1.NamespaceAll, fields.Everything())
        r := cache.NewReflector(serviceLW, &v1.Service{}, serviceIndexer, 0)
        go r.Run(wait.NeverStop)
    }
    serviceLister := corelisters.NewServiceLister(serviceIndexer)

    nodeIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
    if kubeDeps.KubeClient != nil {
        fieldSelector := fields.Set{api.ObjectNameField: string(nodeName)}.AsSelector()
        nodeLW := cache.NewListWatchFromClient(kubeDeps.KubeClient.CoreV1().RESTClient(), "nodes", metav1.NamespaceAll, fieldSelector)
        r := cache.NewReflector(nodeLW, &v1.Node{}, nodeIndexer, 0)
        go r.Run(wait.NeverStop)
    }
    nodeInfo := &CachedNodeInfo{NodeLister: corelisters.NewNodeLister(nodeIndexer)}

    ......

    // 4. initialize containerRefManager and oomWatcher
    containerRefManager := kubecontainer.NewRefManager()

    oomWatcher := oomwatcher.NewWatcher(kubeDeps.Recorder)
    clusterDNS := make([]net.IP, 0, len(kubeCfg.ClusterDNS))
    for _, ipEntry := range kubeCfg.ClusterDNS {
        ip := net.ParseIP(ipEntry)
        if ip == nil {
            klog.Warningf("Invalid clusterDNS ip '%q'", ipEntry)
        } else {
            clusterDNS = append(clusterDNS, ip)
        }
    }
    httpClient := &http.Client{}
    parsedNodeIP := net.ParseIP(nodeIP)
    protocol := utilipt.ProtocolIpv4
    if parsedNodeIP != nil && parsedNodeIP.To4() == nil {
        protocol = utilipt.ProtocolIpv6
    }

    // 5高诺、初始化 kubelet 對象
    klet := &Kubelet{......}

    if klet.cloud != nil {
        klet.cloudResourceSyncManager = cloudresource.NewSyncManager(klet.cloud, nodeName, klet.nodeStatusUpdateFrequency)
    }

    // 6碌识、初始化 secretManager、configMapManager
    var secretManager secret.Manager
    var configMapManager configmap.Manager
    switch kubeCfg.ConfigMapAndSecretChangeDetectionStrategy {
    case kubeletconfiginternal.WatchChangeDetectionStrategy:
        secretManager = secret.NewWatchingSecretManager(kubeDeps.KubeClient)
        configMapManager = configmap.NewWatchingConfigMapManager(kubeDeps.KubeClient)
    case kubeletconfiginternal.TTLCacheChangeDetectionStrategy:
        secretManager = secret.NewCachingSecretManager(
            kubeDeps.KubeClient, manager.GetObjectTTLFromNodeFunc(klet.GetNode))
        configMapManager = configmap.NewCachingConfigMapManager(
            kubeDeps.KubeClient, manager.GetObjectTTLFromNodeFunc(klet.GetNode))
    case kubeletconfiginternal.GetChangeDetectionStrategy:
        secretManager = secret.NewSimpleSecretManager(kubeDeps.KubeClient)
        configMapManager = configmap.NewSimpleConfigMapManager(kubeDeps.KubeClient)
    default:
        return nil, fmt.Errorf("unknown configmap and secret manager mode: %v", kubeCfg.ConfigMapAndSecretChangeDetectionStrategy)
    }

    klet.secretManager = secretManager
    klet.configMapManager = configMapManager
    if klet.experimentalHostUserNamespaceDefaulting {
        klog.Infof("Experimental host user namespace defaulting is enabled.")
    }

    machineInfo, err := klet.cadvisor.MachineInfo()
    if err != nil {
        return nil, err
    }
    klet.machineInfo = machineInfo

    imageBackOff := flowcontrol.NewBackOff(backOffPeriod, MaxContainerBackOff)

    // 7虱而、初始化 livenessManager筏餐、podManager、statusManager牡拇、resourceAnalyzer
    klet.livenessManager = proberesults.NewManager()

    klet.podCache = kubecontainer.NewCache()
    var checkpointManager checkpointmanager.CheckpointManager
    if bootstrapCheckpointPath != "" {
        checkpointManager, err = checkpointmanager.NewCheckpointManager(bootstrapCheckpointPath)
        if err != nil {
            return nil, fmt.Errorf("failed to initialize checkpoint manager: %+v", err)
        }
    }

    klet.podManager = kubepod.NewBasicPodManager(kubepod.NewBasicMirrorClient(klet.kubeClient), secretManager, configMapManager, checkpointManager)

    klet.statusManager = status.NewManager(klet.kubeClient, klet.podManager, klet)

    if remoteRuntimeEndpoint != "" {
        if remoteImageEndpoint == "" {
            remoteImageEndpoint = remoteRuntimeEndpoint
        }
    }

    pluginSettings := dockershim.NetworkPluginSettings{......}

    klet.resourceAnalyzer = serverstats.NewResourceAnalyzer(klet, kubeCfg.VolumeStatsAggPeriod.Duration)

    var legacyLogProvider kuberuntime.LegacyLogProvider

    // 8魁瞪、調(diào)用 kuberuntime.NewKubeGenericRuntimeManager 初始化 containerRuntime
    switch containerRuntime {
    case kubetypes.DockerContainerRuntime:
        streamingConfig := getStreamingConfig(kubeCfg, kubeDeps, crOptions)
        ds, err := dockershim.NewDockerService(kubeDeps.DockerClientConfig, crOptions.PodSandboxImage, streamingConfig,
            &pluginSettings, runtimeCgroups, kubeCfg.CgroupDriver, crOptions.DockershimRootDirectory, !crOptions.RedirectContainerStreaming)
        if err != nil {
            return nil, err
        }
        if crOptions.RedirectContainerStreaming {
            klet.criHandler = ds
        }

        server := dockerremote.NewDockerServer(remoteRuntimeEndpoint, ds)
        if err := server.Start(); err != nil {
            return nil, err
        }

        supported, err := ds.IsCRISupportedLogDriver()
        if err != nil {
            return nil, err
        }
        if !supported {
            klet.dockerLegacyService = ds
            legacyLogProvider = ds
        }
    case kubetypes.RemoteContainerRuntime:
        break
    default:
        return nil, fmt.Errorf("unsupported CRI runtime: %q", containerRuntime)
    }
    runtimeService, imageService, err := getRuntimeAndImageServices(remoteRuntimeEndpoint, remoteImageEndpoint, kubeCfg.RuntimeRequestTimeout)
    if err != nil {
        return nil, err
    }
    klet.runtimeService = runtimeService
    if utilfeature.DefaultFeatureGate.Enabled(features.RuntimeClass) && kubeDeps.KubeClient != nil {
        klet.runtimeClassManager = runtimeclass.NewManager(kubeDeps.KubeClient)
    }

    runtime, err := kuberuntime.NewKubeGenericRuntimeManager(......)
    if err != nil {
        return nil, err
    }
    klet.containerRuntime = runtime
    klet.streamingRuntime = runtime
    klet.runner = runtime

    runtimeCache, err := kubecontainer.NewRuntimeCache(klet.containerRuntime)
    if err != nil {
        return nil, err
    }
    klet.runtimeCache = runtimeCache

    if cadvisor.UsingLegacyCadvisorStats(containerRuntime, remoteRuntimeEndpoint) {
        klet.StatsProvider = stats.NewCadvisorStatsProvider(......)
    } else {
        klet.StatsProvider = stats.NewCRIStatsProvider(......)
    }
    // 9. initialize the pleg
    klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, plegChannelCapacity, plegRelistPeriod, klet.podCache, clock.RealClock{})
    klet.runtimeState = newRuntimeState(maxWaitForContainerRuntime)
    klet.runtimeState.addHealthCheck("PLEG", klet.pleg.Healthy)
    if _, err := klet.updatePodCIDR(kubeCfg.PodCIDR); err != nil {
        klog.Errorf("Pod CIDR update failed %v", err)
    }

    // 10惠呼、初始化 containerGC导俘、containerDeletor、imageManager剔蹋、containerLogManager
    containerGC, err := kubecontainer.NewContainerGC(klet.containerRuntime, containerGCPolicy, klet.sourcesReady)
    if err != nil {
        return nil, err
    }
    klet.containerGC = containerGC
    klet.containerDeletor = newPodContainerDeletor(klet.containerRuntime, integer.IntMax(containerGCPolicy.MaxPerPodContainer, minDeadContainerInPod))

    imageManager, err := images.NewImageGCManager(klet.containerRuntime, klet.StatsProvider, kubeDeps.Recorder, nodeRef, imageGCPolicy, crOptions.PodSandboxImage)
    if err != nil {
        return nil, fmt.Errorf("failed to initialize image manager: %v", err)
    }
    klet.imageManager = imageManager

    if containerRuntime == kubetypes.RemoteContainerRuntime && utilfeature.DefaultFeatureGate.Enabled(features.CRIContainerLogRotation) {
        containerLogManager, err := logs.NewContainerLogManager(
            klet.runtimeService,
            kubeCfg.ContainerLogMaxSize,
            int(kubeCfg.ContainerLogMaxFiles),
        )
        if err != nil {
            return nil, fmt.Errorf("failed to initialize container log manager: %v", err)
        }
        klet.containerLogManager = containerLogManager
    } else {
        klet.containerLogManager = logs.NewStubContainerLogManager()
    }
    // 11旅薄、初始化 serverCertificateManager、probeManager滩租、tokenManager赋秀、volumePluginMgr、pluginManager律想、volumeManager
    if kubeCfg.ServerTLSBootstrap && kubeDeps.TLSOptions != nil && utilfeature.DefaultFeatureGate.Enabled(features.RotateKubeletServerCertificate) {
        klet.serverCertificateManager, err = kubeletcertificate.NewKubeletServerCertificateManager(klet.kubeClient, kubeCfg, klet.nodeName, klet.getLastObservedNodeAddresses, certDirectory)
        if err != nil {
            return nil, fmt.Errorf("failed to initialize certificate manager: %v", err)
        }
        kubeDeps.TLSOptions.Config.GetCertificate = func(*tls.ClientHelloInfo) (*tls.Certificate, error) {
            cert := klet.serverCertificateManager.Current()
            if cert == nil {
                return nil, fmt.Errorf("no serving certificate available for the kubelet")
            }
            return cert, nil
        }
    }

    klet.probeManager = prober.NewManager(......)
    tokenManager := token.NewManager(kubeDeps.KubeClient)

    klet.volumePluginMgr, err =
        NewInitializedVolumePluginMgr(klet, secretManager, configMapManager, tokenManager, kubeDeps.VolumePlugins, kubeDeps.DynamicPluginProber)
    if err != nil {
        return nil, err
    }
    klet.pluginManager = pluginmanager.NewPluginManager(
        klet.getPluginsRegistrationDir(), /* sockDir */
        klet.getPluginsDir(),             /* deprecatedSockDir */
        kubeDeps.Recorder,
    )

    if len(experimentalMounterPath) != 0 {
        experimentalCheckNodeCapabilitiesBeforeMount = false
        klet.dnsConfigurer.SetupDNSinContainerizedMounter(experimentalMounterPath)
    }
    klet.volumeManager = volumemanager.NewVolumeManager(......)

    // 12猎莲、初始化 workQueue、podWorkers技即、evictionManager
    klet.reasonCache = NewReasonCache()
    klet.workQueue = queue.NewBasicWorkQueue(klet.clock)
    klet.podWorkers = newPodWorkers(klet.syncPod, kubeDeps.Recorder, klet.workQueue, klet.resyncInterval, backOffPeriod, klet.podCache)

    klet.backOff = flowcontrol.NewBackOff(backOffPeriod, MaxContainerBackOff)
    klet.podKillingCh = make(chan *kubecontainer.PodPair, podKillingChannelCapacity)

    evictionManager, evictionAdmitHandler := eviction.NewManager(klet.resourceAnalyzer, evictionConfig, killPodNow(klet.podWorkers, kubeDeps.Recorder), klet.podManager.GetMirrorPodByPod, klet.imageManager, klet.containerGC, kubeDeps.Recorder, nodeRef, klet.clock)

    klet.evictionManager = evictionManager
    klet.admitHandlers.AddPodAdmitHandler(evictionAdmitHandler)
    if utilfeature.DefaultFeatureGate.Enabled(features.Sysctls) {
        runtimeSupport, err := sysctl.NewRuntimeAdmitHandler(klet.containerRuntime)
        if err != nil {
            return nil, err
        }

        safeAndUnsafeSysctls := append(sysctlwhitelist.SafeSysctlWhitelist(), allowedUnsafeSysctls...)
        sysctlsWhitelist, err := sysctl.NewWhitelist(safeAndUnsafeSysctls)
        if err != nil {
            return nil, err
        }
        klet.admitHandlers.AddPodAdmitHandler(runtimeSupport)
        klet.admitHandlers.AddPodAdmitHandler(sysctlsWhitelist)
    }

    // 13著洼、為 pod 注冊相關(guān)模塊的 handler
    activeDeadlineHandler, err := newActiveDeadlineHandler(klet.statusManager, kubeDeps.Recorder, klet.clock)
    if err != nil {
        return nil, err
    }
    klet.AddPodSyncLoopHandler(activeDeadlineHandler)
    klet.AddPodSyncHandler(activeDeadlineHandler)
    if utilfeature.DefaultFeatureGate.Enabled(features.TopologyManager) {
        klet.admitHandlers.AddPodAdmitHandler(klet.containerManager.GetTopologyPodAdmitHandler())
    }
    criticalPodAdmissionHandler := preemption.NewCriticalPodAdmissionHandler(klet.GetActivePods, killPodNow(klet.podWorkers, kubeDeps.Recorder), kubeDeps.Recorder)
    klet.admitHandlers.AddPodAdmitHandler(lifecycle.NewPredicateAdmitHandler(klet.getNodeAnyWay, criticalPodAdmissionHandler, klet.containerManager.UpdatePluginResources))
    for _, opt := range kubeDeps.Options {
        opt(klet)
    }

    klet.appArmorValidator = apparmor.NewValidator(containerRuntime)
    klet.softAdmitHandlers.AddPodAdmitHandler(lifecycle.NewAppArmorAdmitHandler(klet.appArmorValidator))
    klet.softAdmitHandlers.AddPodAdmitHandler(lifecycle.NewNoNewPrivsAdmitHandler(klet.containerRuntime))

    if utilfeature.DefaultFeatureGate.Enabled(features.NodeLease) {
        klet.nodeLeaseController = nodelease.NewController(klet.clock, klet.heartbeatClient, string(klet.nodeName), kubeCfg.NodeLeaseDurationSeconds, klet.onRepeatedHeartbeatFailure)
    }

    klet.softAdmitHandlers.AddPodAdmitHandler(lifecycle.NewProcMountAdmitHandler(klet.containerRuntime))

    klet.kubeletConfiguration = *kubeCfg

    klet.setNodeStatusFuncs = klet.defaultNodeStatusFuncs()

    return klet, nil
}

startKubelet

startKubelet starts all of kubelet's modules and the main flow by calling k.Run, and then starts the HTTP servers kubelet needs. In v1.16, kubelet by default only opens the health-check port 10248 and the kubelet server port 10250.

k8s.io/kubernetes/cmd/kubelet/app/server.go:1070

func startKubelet(k kubelet.Bootstrap, podCfg *config.PodConfig, kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *kubelet.Dependencies, enableCAdvisorJSONEndpoints, enableServer bool) {
    // start the kubelet
    go wait.Until(func() {
        k.Run(podCfg.Updates())
    }, 0, wait.NeverStop)

    // start the kubelet server
    if enableServer {
        go k.ListenAndServe(net.ParseIP(kubeCfg.Address), uint(kubeCfg.Port), kubeDeps.TLSOptions, kubeDeps.Auth, enableCAdvisorJSONEndpoints, kubeCfg.EnableDebuggingHandlers, kubeCfg.EnableContentionProfiling)
    }
    if kubeCfg.ReadOnlyPort > 0 {
        go k.ListenAndServeReadOnly(net.ParseIP(kubeCfg.Address), uint(kubeCfg.ReadOnlyPort), enableCAdvisorJSONEndpoints)
    }
    if utilfeature.DefaultFeatureGate.Enabled(features.KubeletPodResources) {
        go k.ListenAndServePodResources()
    }
}

At this point the kubelet object and the modules it depends on have been initialized by the methods above. Except for the GC module, which is started separately, all remaining modules and the main logic are started in the Run method, explained below. The call relationships in the kubelet startup path are summarized as follows:

                                                                                  |--> NewMainKubelet
                                                                                  |
                                                      |--> createAndInitKubelet --|--> BirthCry
                                                      |                           |
                                    |--> RunKubelet --|                           |--> StartGarbageCollection
                                    |                 |
                                    |                 |--> startKubelet --> k.Run
                                    |
NewKubeletCommand --> Run --> run --|--> http.ListenAndServe
                                    |
                                    |--> daemon.SdNotify

Run

Run is the core method that starts kubelet: it launches kubelet's dependent modules as well as the main-loop logic. Its main steps are:

  • 1绊困、注冊 logServer文搂;
  • 2、判斷是否需要啟動 cloud provider sync manager秤朗;
  • 3煤蹭、調(diào)用 kl.initializeModules 首先啟動不依賴 container runtime 的一些模塊;
  • 4、啟動 volume manager硝皂;
  • 5常挚、執(zhí)行 kl.syncNodeStatus 定時同步 Node 狀態(tài);
  • 6稽物、調(diào)用 kl.fastStatusUpdateOnce 更新容器運(yùn)行時啟動時間以及執(zhí)行首次狀態(tài)同步待侵;
  • 7、判斷是否啟用 NodeLease 機(jī)制姨裸;
  • 8、執(zhí)行 kl.updateRuntimeUp 定時更新 Runtime 狀態(tài)怨酝;
  • 9傀缩、執(zhí)行 kl.syncNetworkUtil 定時同步 iptables 規(guī)則;
  • 10农猬、執(zhí)行 kl.podKiller 定時清理異常 pod赡艰,當(dāng) pod 沒有被 podworker 正確處理的時候,啟動一個goroutine 負(fù)責(zé) kill 掉 pod斤葱;
  • 11慷垮、啟動 statusManager
  • 12揍堕、啟動 probeManager料身;
  • 13、啟動 runtimeClassManager衩茸;
  • 14芹血、啟動 pleg
  • 15楞慈、調(diào)用 kl.syncLoop 監(jiān)聽 pod 變化幔烛;

Run mainly calls two methods, kl.initializeModules and kl.fastStatusUpdateOnce, to finish the pre-start initialization; once all modules are initialized it enters the main loop.

k8s.io/kubernetes/pkg/kubelet/kubelet.go:1398

func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
    // 1. register the logServer
    if kl.logServer == nil {
        kl.logServer = http.StripPrefix("/logs/", http.FileServer(http.Dir("/var/log/")))
    }
    if kl.kubeClient == nil {
        klog.Warning("No api server defined - no node status update will be sent.")
    }

    // 2. start the cloud provider sync manager if one is configured
    if kl.cloudResourceSyncManager != nil {
        go kl.cloudResourceSyncManager.Run(wait.NeverStop)
    }

    // 3. call kl.initializeModules to first start the modules that do not depend on the container runtime
    if err := kl.initializeModules(); err != nil {
        kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.KubeletSetupFailed, err.Error())
        klog.Fatal(err)
    }

    // 4. start the volume manager
    go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop)

    if kl.kubeClient != nil {
        // 5. run kl.syncNodeStatus to periodically sync the node status
        go wait.Until(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, wait.NeverStop)

        // 6. call kl.fastStatusUpdateOnce to update the container runtime start time and perform the first status sync
        go kl.fastStatusUpdateOnce()

        // 7. start the node lease controller if the NodeLease feature is enabled
        if utilfeature.DefaultFeatureGate.Enabled(features.NodeLease) {
            go kl.nodeLeaseController.Run(wait.NeverStop)
        }
    }

    // 8. run kl.updateRuntimeUp to periodically refresh the runtime state
    go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop)

    // 9. run kl.syncNetworkUtil to periodically sync iptables rules
    if kl.makeIPTablesUtilChains {
        go wait.Until(kl.syncNetworkUtil, 1*time.Minute, wait.NeverStop)
    }

    // 10. run kl.podKiller to periodically clean up abnormal pods
    go wait.Until(kl.podKiller, 1*time.Second, wait.NeverStop)

    // 11. start statusManager, probeManager and runtimeClassManager
    kl.statusManager.Start()
    kl.probeManager.Start()

    if kl.runtimeClassManager != nil {
        kl.runtimeClassManager.Start(wait.NeverStop)
    }

    // 12. start pleg
    kl.pleg.Start()

    // 13. call kl.syncLoop to watch for pod changes
    kl.syncLoop(updates, kl)
}

initializeModules

The modules started in initializeModules depend neither on the container runtime nor on any module that has not been initialized yet. Its main logic is:

  • 1. call kl.setupDataDirs to create the directories kubelet needs;
  • 2. create ContainerLogsDir /var/log/containers;
  • 3. start the imageManager; image GC was already started in RunKubelet, so here it mainly watches for image changes;
  • 4. start the certificateManager, which is responsible for certificate rotation;
  • 5. start the oomWatcher, which listens for OOM events and records them;
  • 6. start the resourceAnalyzer;

k8s.io/kubernetes/pkg/kubelet/kubelet.go:1319

func (kl *Kubelet) initializeModules() error {
    metrics.Register(
        kl.runtimeCache,
        collectors.NewVolumeStatsCollector(kl),
        collectors.NewLogMetricsCollector(kl.StatsProvider.ListPodStats),
    )
    metrics.SetNodeName(kl.nodeName)
    servermetrics.Register()

    // 1. create the data directories kubelet needs
    if err := kl.setupDataDirs(); err != nil {
        return err
    }

    // 2. create ContainerLogsDir
    if _, err := os.Stat(ContainerLogsDir); err != nil {
        if err := kl.os.MkdirAll(ContainerLogsDir, 0755); err != nil {
            klog.Errorf("Failed to create directory %q: %v", ContainerLogsDir, err)
        }
    }

    // 3. start the imageManager
    kl.imageManager.Start()

    // 4. start the certificate manager
    if kl.serverCertificateManager != nil {
        kl.serverCertificateManager.Start()
    }
    // 5. start the oomWatcher
    if err := kl.oomWatcher.Start(kl.nodeRef); err != nil {
        return fmt.Errorf("failed to start OOM watcher %v", err)
    }

    // 6. start the resource analyzer
    kl.resourceAnalyzer.Start()

    return nil
}

fastStatusUpdateOnce

fastStatusUpdateOnce keeps trying to update the pod CIDR; as soon as the update succeeds it immediately calls updateRuntimeUp and syncNodeStatus to refresh the runtime state and the node status. This method runs only once, at kubelet startup; by updating the pod CIDR and then performing the runtime update and node status update as early as possible, it reduces the time the node needs to reach the ready state.

k8s.io/kubernetes/pkg/kubelet/kubelet.go:2262

func (kl *Kubelet) fastStatusUpdateOnce() {
    for {
        time.Sleep(100 * time.Millisecond)
        node, err := kl.GetNode()
        if err != nil {
            klog.Errorf(err.Error())
            continue
        }
        if len(node.Spec.PodCIDRs) != 0 {
            podCIDRs := strings.Join(node.Spec.PodCIDRs, ",")
            if _, err := kl.updatePodCIDR(podCIDRs); err != nil {
                klog.Errorf("Pod CIDR update to %v failed %v", podCIDRs, err)
                continue
            }
            kl.updateRuntimeUp()
            kl.syncNodeStatus()
            return
        }
    }
}

updateRuntimeUp

updateRuntimeUp initializes the runtime-dependent modules the first time the container runtime comes up, and records the runtime start time in kubelet's runtimeState. It first checks whether the network and the runtime are ready; if both are ready, it calls initializeRuntimeDependentModules to initialize the runtime-dependent modules, including cadvisor, containerManager, evictionManager, containerLogManager, pluginManager and so on.

k8s.io/kubernetes/pkg/kubelet/kubelet.go:2168

func (kl *Kubelet) updateRuntimeUp() {
    kl.updateRuntimeMux.Lock()
    defer kl.updateRuntimeMux.Unlock()

    // 1. fetch the container runtime status
    s, err := kl.containerRuntime.Status()
    if err != nil {
        klog.Errorf("Container runtime sanity check failed: %v", err)
        return
    }
    if s == nil {
        klog.Errorf("Container runtime status is nil")
        return
    }

    // 2. check whether the network and the runtime are ready
    networkReady := s.GetRuntimeCondition(kubecontainer.NetworkReady)
    if networkReady == nil || !networkReady.Status {
        kl.runtimeState.setNetworkState(fmt.Errorf("runtime network not ready: %v", networkReady))
    } else {
        kl.runtimeState.setNetworkState(nil)
    }

    runtimeReady := s.GetRuntimeCondition(kubecontainer.RuntimeReady)
    if runtimeReady == nil || !runtimeReady.Status {
        kl.runtimeState.setRuntimeState(err)
        return
    }
    kl.runtimeState.setRuntimeState(nil)
    // 3. call kl.initializeRuntimeDependentModules to start the runtime-dependent modules
    kl.oneTimeInitializer.Do(kl.initializeRuntimeDependentModules)
    kl.runtimeState.setRuntimeSync(kl.clock.Now())
}
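kl.oneTimeInitializer.Do(...) is a sync.Once, which is what guarantees that initializeRuntimeDependentModules runs at most once even though updateRuntimeUp fires every 5 seconds. A minimal sketch of that guard; the runtimeGuard type here is a made-up stand-in, not a kubelet type:

```go
package main

import (
	"fmt"
	"sync"
)

// runtimeGuard models the part of kubelet state relevant to the guard:
// a sync.Once plus a counter so we can observe how often init ran.
type runtimeGuard struct {
	once  sync.Once
	inits int
}

// updateRuntimeUp is called periodically; initialization only happens
// after the runtime reports ready, and then at most once, ever.
func (r *runtimeGuard) updateRuntimeUp(runtimeReady bool) {
	if !runtimeReady {
		return // runtime not ready: skip initialization entirely
	}
	r.once.Do(func() { r.inits++ }) // sync.Once: body runs a single time
}

func main() {
	r := &runtimeGuard{}
	r.updateRuntimeUp(false) // not ready: nothing happens
	for i := 0; i < 5; i++ {
		r.updateRuntimeUp(true) // ready: init runs only on the first call
	}
	fmt.Println(r.inits)
}
```

This is also why a runtime that becomes ready only minutes after kubelet starts is still initialized correctly: the periodic caller keeps retrying, and the Once fires on the first ready tick.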

initializeRuntimeDependentModules

The main logic of this method is:

  • 1杏糙、啟動 cadvisor慎王;
  • 2、獲取 CgroupStats宏侍;
  • 3柬祠、啟動 containerManagerevictionManager负芋、containerLogManager
  • 4嗜愈、將 CSI Driver 和 Device Manager 注冊到 pluginManager旧蛾,然后啟動 pluginManager

k8s.io/kubernetes/pkg/kubelet/kubelet.go:1361

func (kl *Kubelet) initializeRuntimeDependentModules() {
    // 1. start cadvisor
    if err := kl.cadvisor.Start(); err != nil {
        ......
    }

    // 2. fetch CgroupStats
    kl.StatsProvider.GetCgroupStats("/", true)

    node, err := kl.getNodeAnyWay()
    if err != nil {
        klog.Fatalf("Kubelet failed to get node info: %v", err)
    }

    // 3. start containerManager, evictionManager and containerLogManager
    if err := kl.containerManager.Start(node, kl.GetActivePods, kl.sourcesReady, kl.statusManager, kl.runtimeService); err != nil {
        klog.Fatalf("Failed to start ContainerManager %v", err)
    }

    kl.evictionManager.Start(kl.StatsProvider, kl.GetActivePods, kl.podResourcesAreReclaimed, evictionMonitoringPeriod)

    kl.containerLogManager.Start()

    kl.pluginManager.AddHandler(pluginwatcherapi.CSIPlugin, plugincache.PluginHandler(csi.PluginHandler))

    kl.pluginManager.AddHandler(pluginwatcherapi.DevicePlugin, kl.containerManager.GetPluginRegistrationHandler())
    // 4. start the pluginManager
    go kl.pluginManager.Run(kl.sourcesReady, wait.NeverStop)
}

Recap

Run 方法中可以看到赘阀,會直接調(diào)用 kl.syncNodeStatuskl.updateRuntimeUp益缠,但在 kl.fastStatusUpdateOnce 中也調(diào)用了這兩個方法,而在 kl.fastStatusUpdateOnce 中僅執(zhí)行一次基公,在 Run 方法中會定期執(zhí)行幅慌。在kl.fastStatusUpdateOnce 中調(diào)用的目的就是當(dāng) kubelet 首次啟動時盡可能快的進(jìn)行 runtime update 和 node status update,減少節(jié)點(diǎn)達(dá)到 ready 狀態(tài)的時延轰豆。而在 kl.updateRuntimeUp 中調(diào)用的初始化 runtime 依賴模塊的方法 kl.initializeRuntimeDependentModules 通過 sync.Once 調(diào)用僅僅會被執(zhí)行一次胰伍。

syncLoop

syncLoop is kubelet's main loop. It listens for pod changes from the different channels (file, http, apiserver) and aggregates them; when a new change arrives it calls the corresponding handler to bring the pod to its desired state.

syncLoop first defines a syncTicker and a housekeepingTicker: even when there is no pod configuration to update, kubelet still periodically synchronizes and cleans up pods. It then calls syncLoopIteration in an endless for loop; when an iteration hits an error, kubelet records it in runtimeState and retries after a sleep that doubles on consecutive errors, starting at 100ms and capped at 5s.

k8s.io/kubernetes/pkg/kubelet/kubelet.go:1821

func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
    syncTicker := time.NewTicker(time.Second)
    defer syncTicker.Stop()
    housekeepingTicker := time.NewTicker(housekeepingPeriod)
    defer housekeepingTicker.Stop()
    plegCh := kl.pleg.Watch()
    const (
        base   = 100 * time.Millisecond
        max    = 5 * time.Second
        factor = 2
    )
    duration := base
    for {
        if err := kl.runtimeState.runtimeErrors(); err != nil {
            time.Sleep(duration)
            duration = time.Duration(math.Min(float64(max), factor*float64(duration)))
            continue
        }
        duration = base
        kl.syncLoopMonitor.Store(kl.clock.Now())
        if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
            break
        }
        kl.syncLoopMonitor.Store(kl.clock.Now())
    }
}

syncLoopIteration

syncLoopIteration watches several channels; whenever one of them has data, the message is handed to the handler, which dispatches work via dispatchWork. It receives messages from the following channels:

  • 1嗤疯、configCh:該信息源由 kubeDeps 對象中的 PodConfig 子模塊提供,該模塊將同時 watch 3 個不同來源的 pod 信息的變化(file闺兢,http茂缚,apiserver),一旦某個來源的 pod 信息發(fā)生了更新(創(chuàng)建/更新/刪除)屋谭,這個 channel 中就會出現(xiàn)被更新的 pod 信息和更新的具體操作脚囊;
  • 2、syncCh:定時器桐磁,每隔一秒去同步最新保存的 pod 狀態(tài)悔耘;
  • 3、houseKeepingCh:housekeeping 事件的通道我擂,做 pod 清理工作衬以;
  • 4、plegCh:該信息源由 kubelet 對象中的 pleg 子模塊提供校摩,該模塊主要用于周期性地向 container runtime 查詢當(dāng)前所有容器的狀態(tài)看峻,如果狀態(tài)發(fā)生變化,則這個 channel 產(chǎn)生事件衙吩;
  • 5备籽、liveness Manager:健康檢查模塊發(fā)現(xiàn)某個 pod 異常時,kubelet 將根據(jù) pod 的 restartPolicy 自動執(zhí)行正確的操作分井;

k8s.io/kubernetes/pkg/kubelet/kubelet.go:1888

func (kl *Kubelet) syncLoopIteration(......) bool {
    select {
    case u, open := <-configCh:
        if !open {
            return false
        }

        switch u.Op {
        case kubetypes.ADD:
            handler.HandlePodAdditions(u.Pods)
        case kubetypes.UPDATE:
            handler.HandlePodUpdates(u.Pods)
        case kubetypes.REMOVE:
            handler.HandlePodRemoves(u.Pods)
        case kubetypes.RECONCILE:
            handler.HandlePodReconcile(u.Pods)
        case kubetypes.DELETE:
            handler.HandlePodUpdates(u.Pods)
        case kubetypes.RESTORE:
            handler.HandlePodAdditions(u.Pods)
        case kubetypes.SET:
        }

        if u.Op != kubetypes.RESTORE {
            kl.sourcesReady.AddSource(u.Source)
        }
    case e := <-plegCh:
        if isSyncPodWorthy(e) {
            if pod, ok := kl.podManager.GetPodByUID(e.ID); ok {
                klog.V(2).Infof("SyncLoop (PLEG): %q, event: %#v", format.Pod(pod), e)
                handler.HandlePodSyncs([]*v1.Pod{pod})
            } else {
                klog.V(4).Infof("SyncLoop (PLEG): ignore irrelevant event: %#v", e)
            }
        }

        if e.Type == pleg.ContainerDied {
            if containerID, ok := e.Data.(string); ok {
                kl.cleanUpContainersInPod(e.ID, containerID)
            }
        }
    case <-syncCh:
        podsToSync := kl.getPodsToSync()
        if len(podsToSync) == 0 {
            break
        }
        handler.HandlePodSyncs(podsToSync)
    case update := <-kl.livenessManager.Updates():
        if update.Result == proberesults.Failure {
            pod, ok := kl.podManager.GetPodByUID(update.PodUID)
            if !ok {
                break
            }
            handler.HandlePodSyncs([]*v1.Pod{pod})
        }
    case <-housekeepingCh:
        if !kl.sourcesReady.AllReady() {
            klog.V(4).Infof("SyncLoop (housekeeping, skipped): sources aren't ready yet.")
        } else {
            if err := handler.HandlePodCleanups(); err != nil {
                klog.Errorf("Failed cleaning pods: %v", err)
            }
        }
    }
    return true
}
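The control flow of syncLoopIteration is a single blocking select over several event sources, returning false only when the config channel closes. A stripped-down sketch of the same pattern; the channel names and the handler are illustrative, not kubelet's:

```go
package main

import "fmt"

// loopIteration multiplexes two event sources the way syncLoopIteration
// does: block in one select, dispatch whichever message arrives, and
// report false only when the config source has closed.
func loopIteration(configCh <-chan string, syncCh <-chan struct{}, handle func(string)) bool {
	select {
	case u, open := <-configCh:
		if !open {
			return false // config source closed: stop the loop
		}
		handle("config:" + u)
	case <-syncCh:
		handle("sync")
	}
	return true
}

func main() {
	configCh := make(chan string, 2)
	syncCh := make(chan struct{}, 1)
	configCh <- "ADD pod-a"
	syncCh <- struct{}{}

	events := 0
	handle := func(string) { events++ }

	// Drain both pending messages, then close configCh to end the loop,
	// mirroring syncLoop's `for { if !syncLoopIteration(...) { break } }`.
	for loopIteration(configCh, syncCh, handle) {
		if events == 2 {
			close(configCh)
		}
	}
	fmt.Println(events)
}
```

Because select picks randomly among ready cases, no single source can starve the others; each iteration handles exactly one event.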

Finally, here is a summary of the call flow inside the Run method, which starts kubelet and its dependent modules:

      |--> kl.cloudResourceSyncManager.Run
      |
      |                            |--> kl.setupDataDirs
      |                            |--> kl.imageManager.Start
Run --|--> kl.initializeModules ---|--> kl.serverCertificateManager.Start
      |                            |--> kl.oomWatcher.Start
      |                            |--> kl.resourceAnalyzer.Start
      |
      |--> kl.volumeManager.Run
      |                                                        |--> kl.containerRuntime.Status
      |--> kl.syncNodeStatus                                   |
      |                              |--> kl.updateRuntimeUp --|                                           |--> kl.cadvisor.Start
      |                              |                         |                                           |
      |--> kl.fastStatusUpdateOnce --|                         |--> kl.initializeRuntimeDependentModules --|--> kl.containerManager.Start
      |                              |                                                                     |
      |                              |--> kl.syncNodeStatus                                                |--> kl.evictionManager.Start
      |                                                                                                    |
      |--> kl.updateRuntimeUp                                                                              |--> kl.containerLogManager.Start
      |                                                                                                    |
      |--> kl.syncNetworkUtil                                                                              |--> kl.pluginManager.Run
      |
      |--> kl.podKiller
      |
      |--> kl.statusManager.Start
      |
      |--> kl.probeManager.Start
      |
      |--> kl.runtimeClassManager.Start
      |
      |--> kl.pleg.Start
      |
      |--> kl.syncLoop --> kl.syncLoopIteration

Summary

This article walked through kubelet's startup flow. As we have seen, the startup involves a large number of stages, and kubelet itself contains a great many modules; subsequent articles on the kubelet source code will focus first on the modules started in the Run method, tackling them one by one.
