This article was originally going to continue covering the kubelet's main modules, but readers asked whether it could start with the kubelet startup flow instead. I wrote about the startup flow a long time ago based on v1.12, and comparing it with v1.16 shows few changes, but that earlier article was rather brief, so this one re-analyzes the kubelet startup flow in full.
Kubelet startup flow
Kubernetes version: v1.16
The kubelet startup is fairly involved. As a roadmap, the call-graph diagrams later in this article summarize the whole flow; keeping them in mind makes the calls described below easier to follow.
NewKubeletCommand
We start from the kubelet's main function, which calls the NewKubeletCommand method. It is responsible for reading parameters from the configuration file, validating them, and setting defaults. Its main logic is:
- 1戏羽、解析命令行參數(shù);
- 2楼吃、為 kubelet 初始化 feature gates 參數(shù)始花;
- 3妄讯、加載 kubelet 配置文件;
- 4衙荐、校驗(yàn)配置文件中的參數(shù)捞挥;
- 5、檢查 kubelet 是否啟用動態(tài)配置功能忧吟;
- 6砌函、初始化 kubeletDeps,kubeletDeps 包含 kubelet 運(yùn)行所必須的配置溜族,是為了實(shí)現(xiàn) dependency injection讹俊,其目的是為了把 kubelet 依賴的組件對象作為參數(shù)傳進(jìn)來,這樣可以控制 kubelet 的行為煌抒;
- 7仍劈、調(diào)用
Run
方法;
k8s.io/kubernetes/cmd/kubelet/app/server.go:111
func NewKubeletCommand() *cobra.Command {
cleanFlagSet := pflag.NewFlagSet(componentKubelet, pflag.ContinueOnError)
cleanFlagSet.SetNormalizeFunc(cliflag.WordSepNormalizeFunc)
// 1寡壮、kubelet配置分兩部分:
// KubeletFlag: 指那些不允許在 kubelet 運(yùn)行時進(jìn)行修改的配置集贩疙,或者不能在集群中各個 Nodes 之間共享的配置集。
// KubeletConfiguration: 指可以在集群中各個Nodes之間共享的配置集况既,可以進(jìn)行動態(tài)配置这溅。
kubeletFlags := options.NewKubeletFlags()
kubeletConfig, err := options.NewKubeletConfiguration()
if err != nil {
klog.Fatal(err)
}
cmd := &cobra.Command{
Use: componentKubelet,
DisableFlagParsing: true,
......
Run: func(cmd *cobra.Command, args []string) {
// 2. Parse the command-line flags
if err := cleanFlagSet.Parse(args); err != nil {
cmd.Usage()
klog.Fatal(err)
}
......
verflag.PrintAndExitIfRequested()
utilflag.PrintFlags(cleanFlagSet)
// 3棒仍、初始化 feature gates 配置
if err := utilfeature.DefaultMutableFeatureGate.SetFromMap(kubeletConfig.FeatureGates); err != nil {
klog.Fatal(err)
}
if err := options.ValidateKubeletFlags(kubeletFlags); err != nil {
klog.Fatal(err)
}
if kubeletFlags.ContainerRuntime == "remote" && cleanFlagSet.Changed("pod-infra-container-image") {
klog.Warning("Warning: For remote container runtime, --pod-infra-container-image is ignored in kubelet, which should be set in that remote runtime instead")
}
// 4悲靴、加載 kubelet 配置文件
if configFile := kubeletFlags.KubeletConfigFile; len(configFile) > 0 {
kubeletConfig, err = loadConfigFile(configFile)
......
}
// 5. Validate the parameters in the configuration file
if err := kubeletconfigvalidation.ValidateKubeletConfiguration(kubeletConfig); err != nil {
klog.Fatal(err)
}
// 6莫其、檢查 kubelet 是否啟用動態(tài)配置功能
var kubeletConfigController *dynamickubeletconfig.Controller
if dynamicConfigDir := kubeletFlags.DynamicConfigDir.Value(); len(dynamicConfigDir) > 0 {
var dynamicKubeletConfig *kubeletconfiginternal.KubeletConfiguration
dynamicKubeletConfig, kubeletConfigController, err = BootstrapKubeletConfigController(dynamicConfigDir,
func(kc *kubeletconfiginternal.KubeletConfiguration) error {
return kubeletConfigFlagPrecedence(kc, args)
})
if err != nil {
klog.Fatal(err)
}
if dynamicKubeletConfig != nil {
kubeletConfig = dynamicKubeletConfig
if err := utilfeature.DefaultMutableFeatureGate.SetFromMap(kubeletConfig.FeatureGates); err != nil {
klog.Fatal(err)
}
}
}
kubeletServer := &options.KubeletServer{
KubeletFlags: *kubeletFlags,
KubeletConfiguration: *kubeletConfig,
}
// 7癞尚、初始化 kubeletDeps
kubeletDeps, err := UnsecuredDependencies(kubeletServer)
if err != nil {
klog.Fatal(err)
}
kubeletDeps.KubeletConfigController = kubeletConfigController
stopCh := genericapiserver.SetupSignalHandler()
if kubeletServer.KubeletFlags.ExperimentalDockershim {
if err := RunDockershim(&kubeletServer.KubeletFlags, kubeletConfig, stopCh); err != nil {
klog.Fatal(err)
}
return
}
// 8. Call the Run method
if err := Run(kubeletServer, kubeletDeps, stopCh); err != nil {
klog.Fatal(err)
}
},
}
kubeletFlags.AddFlags(cleanFlagSet)
options.AddKubeletConfigFlags(cleanFlagSet, kubeletConfig)
options.AddGlobalFlags(cleanFlagSet)
......
return cmd
}
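One detail worth making concrete is the precedence enforced by kubeletConfigFlagPrecedence above: defaults are applied first, then values from the config file, and finally any flags explicitly set on the command line are re-parsed on top and win. The following sketch illustrates the idea with a hypothetical, trimmed-down config type; it is not the real kubelet implementation.

package main

import (
	"fmt"

	"github.com/spf13/pflag"
)

// simplifiedConfig is a hypothetical stand-in for KubeletConfiguration.
type simplifiedConfig struct {
	SyncFrequency string
	MaxPods       int
}

// applyFlagPrecedence mimics the idea behind kubeletConfigFlagPrecedence:
// bind the flags to the config struct and re-parse the original command
// line, so explicitly-set flags override values loaded from the config file.
func applyFlagPrecedence(cfg *simplifiedConfig, args []string) error {
	fs := pflag.NewFlagSet("kubelet-sketch", pflag.ContinueOnError)
	fs.StringVar(&cfg.SyncFrequency, "sync-frequency", cfg.SyncFrequency, "sync frequency")
	fs.IntVar(&cfg.MaxPods, "max-pods", cfg.MaxPods, "maximum number of pods")
	return fs.Parse(args)
}

func main() {
	// 1. defaults
	cfg := &simplifiedConfig{SyncFrequency: "1m", MaxPods: 110}
	// 2. pretend this value came from the --config file
	cfg.MaxPods = 200
	// 3. command-line flags are re-parsed last, so they take precedence
	if err := applyFlagPrecedence(cfg, []string{"--max-pods=250"}); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", cfg) // &{SyncFrequency:1m MaxPods:250}
}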
Run
This method simply calls run to execute the subsequent startup logic.
k8s.io/kubernetes/cmd/kubelet/app/server.go:408
func Run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, stopCh <-chan struct{}) error {
if err := initForOS(s.KubeletFlags.WindowsService); err != nil {
return fmt.Errorf("failed OS init: %v", err)
}
if err := run(s, kubeDeps, stopCh); err != nil {
return fmt.Errorf("failed to run Kubelet: %v", err)
}
return nil
}
run
The run method does the basic configuration and checks needed before the kubelet starts. Its main logic is:
- 1. Set the default FeatureGates for the kubelet. All of the kubelet's FeatureGates can be inspected via the command-line flags; FeatureGates in the Alpha state are disabled by default at component startup, while those in the Beta and GA states are enabled by default;
- 2. Validate the kubelet's parameters;
- 3. Try to acquire the kubelet's lock file; this requires starting the kubelet with --exit-on-lock-contention and --lock-file. The feature is in Alpha and disabled by default;
- 4. Register the current configuration with the http server's /configz URL;
- 5. Check whether the kubelet is running in standalone mode; in this mode it does not talk to the apiserver and is mainly used for debugging the kubelet;
- 6. Initialize kubeDeps; kubeDeps contains the kubelet's dependencies, mainly KubeClient, EventClient, HeartbeatClient, Auth, cadvisor and ContainerManager;
- 7. Check whether the kubelet is started as the root user;
- 8. Set the oom score for the process, -999 by default, within the range [-1000, 1000]; the lower the score, the less likely the process is to be killed (a short sketch of the mechanism follows the code below);
- 9. Call the RunKubelet method;
- 10. Check whether the kubelet has dynamic configuration enabled;
- 11. Start the Healthz http server;
- 12. If started via systemd, notify systemd that the kubelet has started;
k8s.io/kubernetes/cmd/kubelet/app/server.go:472
func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, stopCh <-chan struct{}) (err error) {
// 1. Set the default FeatureGates for the kubelet
err = utilfeature.DefaultMutableFeatureGate.SetFromMap(s.KubeletConfiguration.FeatureGates)
if err != nil {
return err
}
// 2介汹、校驗(yàn) kubelet 的參數(shù)
if err := options.ValidateKubeletServer(s); err != nil {
return err
}
// 3却嗡、嘗試獲取 kubelet 的 lock file
if s.ExitOnLockContention && s.LockFilePath == "" {
return errors.New("cannot exit on lock file contention: no lock file specified")
}
done := make(chan struct{})
if s.LockFilePath != "" {
klog.Infof("acquiring file lock on %q", s.LockFilePath)
if err := flock.Acquire(s.LockFilePath); err != nil {
return fmt.Errorf("unable to acquire file lock on %q: %v", s.LockFilePath, err)
}
if s.ExitOnLockContention {
klog.Infof("watching for inotify events for: %v", s.LockFilePath)
if err := watchForLockfileContention(s.LockFilePath, done); err != nil {
return err
}
}
}
// 4. Register the current configuration with the http server's /configz URL
err = initConfigz(&s.KubeletConfiguration)
if err != nil {
klog.Errorf("unable to register KubeletConfiguration with configz, error: %v", err)
}
// 5窗价、判斷是否為 standalone 模式
standaloneMode := true
if len(s.KubeConfig) > 0 {
standaloneMode = false
}
// 6. Initialize kubeDeps
if kubeDeps == nil {
kubeDeps, err = UnsecuredDependencies(s)
if err != nil {
return err
}
}
if kubeDeps.Cloud == nil {
if !cloudprovider.IsExternal(s.CloudProvider) {
cloud, err := cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile)
if err != nil {
return err
}
......
kubeDeps.Cloud = cloud
}
}
hostName, err := nodeutil.GetHostname(s.HostnameOverride)
if err != nil {
return err
}
nodeName, err := getNodeName(kubeDeps.Cloud, hostName)
if err != nil {
return err
}
// 7叹卷、如果是 standalone 模式將所有 client 設(shè)置為 nil
switch {
case standaloneMode:
kubeDeps.KubeClient = nil
kubeDeps.EventClient = nil
kubeDeps.HeartbeatClient = nil
// 8撼港、為 kubeDeps 初始化 KubeClient坪它、EventClient、HeartbeatClient 模塊
case kubeDeps.KubeClient == nil, kubeDeps.EventClient == nil, kubeDeps.HeartbeatClient == nil:
clientConfig, closeAllConns, err := buildKubeletClientConfig(s, nodeName)
if err != nil {
return err
}
if closeAllConns == nil {
return errors.New("closeAllConns must be a valid function other than nil")
}
kubeDeps.OnHeartbeatFailure = closeAllConns
kubeDeps.KubeClient, err = clientset.NewForConfig(clientConfig)
if err != nil {
return fmt.Errorf("failed to initialize kubelet client: %v", err)
}
eventClientConfig := *clientConfig
eventClientConfig.QPS = float32(s.EventRecordQPS)
eventClientConfig.Burst = int(s.EventBurst)
kubeDeps.EventClient, err = v1core.NewForConfig(&eventClientConfig)
if err != nil {
return fmt.Errorf("failed to initialize kubelet event client: %v", err)
}
heartbeatClientConfig := *clientConfig
heartbeatClientConfig.Timeout = s.KubeletConfiguration.NodeStatusUpdateFrequency.Duration
if utilfeature.DefaultFeatureGate.Enabled(features.NodeLease) {
leaseTimeout := time.Duration(s.KubeletConfiguration.NodeLeaseDurationSeconds) * time.Second
if heartbeatClientConfig.Timeout > leaseTimeout {
heartbeatClientConfig.Timeout = leaseTimeout
}
}
heartbeatClientConfig.QPS = float32(-1)
kubeDeps.HeartbeatClient, err = clientset.NewForConfig(&heartbeatClientConfig)
if err != nil {
return fmt.Errorf("failed to initialize kubelet heartbeat client: %v", err)
}
}
// 9帝牡、初始化 auth 模塊
if kubeDeps.Auth == nil {
auth, err := BuildAuth(nodeName, kubeDeps.KubeClient, s.KubeletConfiguration)
if err != nil {
return err
}
kubeDeps.Auth = auth
}
var cgroupRoots []string
// 10往毡、設(shè)置 cgroupRoot
cgroupRoots = append(cgroupRoots, cm.NodeAllocatableRoot(s.CgroupRoot, s.CgroupDriver))
kubeletCgroup, err := cm.GetKubeletContainer(s.KubeletCgroups)
if err != nil {
} else if kubeletCgroup != "" {
cgroupRoots = append(cgroupRoots, kubeletCgroup)
}
runtimeCgroup, err := cm.GetRuntimeContainer(s.ContainerRuntime, s.RuntimeCgroups)
if err != nil {
} else if runtimeCgroup != "" {
cgroupRoots = append(cgroupRoots, runtimeCgroup)
}
if s.SystemCgroups != "" {
cgroupRoots = append(cgroupRoots, s.SystemCgroups)
}
// 11. Initialize cadvisor
if kubeDeps.CAdvisorInterface == nil {
imageFsInfoProvider := cadvisor.NewImageFsInfoProvider(s.ContainerRuntime, s.RemoteRuntimeEndpoint)
kubeDeps.CAdvisorInterface, err = cadvisor.New(imageFsInfoProvider, s.RootDirectory, cgroupRoots, cadvisor.UsingLegacyCadvisorStats(s.ContainerRuntime, s.RemoteRuntimeEndpoint))
if err != nil {
return err
}
}
makeEventRecorder(kubeDeps, nodeName)
// 12靶溜、初始化 ContainerManager
if kubeDeps.ContainerManager == nil {
if s.CgroupsPerQOS && s.CgroupRoot == "" {
s.CgroupRoot = "/"
}
kubeReserved, err := parseResourceList(s.KubeReserved)
if err != nil {
return err
}
systemReserved, err := parseResourceList(s.SystemReserved)
if err != nil {
return err
}
var hardEvictionThresholds []evictionapi.Threshold
if !s.ExperimentalNodeAllocatableIgnoreEvictionThreshold {
hardEvictionThresholds, err = eviction.ParseThresholdConfig([]string{}, s.EvictionHard, nil, nil, nil)
if err != nil {
return err
}
}
experimentalQOSReserved, err := cm.ParseQOSReserved(s.QOSReserved)
if err != nil {
return err
}
devicePluginEnabled := utilfeature.DefaultFeatureGate.Enabled(features.DevicePlugins)
kubeDeps.ContainerManager, err = cm.NewContainerManager(
kubeDeps.Mounter,
kubeDeps.CAdvisorInterface,
cm.NodeConfig{
......
},
s.FailSwapOn,
devicePluginEnabled,
kubeDeps.Recorder)
if err != nil {
return err
}
}
// 13开瞭、檢查是否以 root 權(quán)限啟動
if err := checkPermissions(); err != nil {
klog.Error(err)
}
utilruntime.ReallyCrash = s.ReallyCrashForTesting
// 14. Set the oom score for the kubelet process
oomAdjuster := kubeDeps.OOMAdjuster
if err := oomAdjuster.ApplyOOMScoreAdj(0, int(s.OOMScoreAdj)); err != nil {
klog.Warning(err)
}
// 15墨技、調(diào)用 RunKubelet 方法執(zhí)行后續(xù)的啟動操作
if err := RunKubelet(s, kubeDeps, s.RunOnce); err != nil {
return err
}
if utilfeature.DefaultFeatureGate.Enabled(features.DynamicKubeletConfig) && len(s.DynamicConfigDir.Value()) > 0 &&
kubeDeps.KubeletConfigController != nil && !standaloneMode && !s.RunOnce {
if err := kubeDeps.KubeletConfigController.StartSync(kubeDeps.KubeClient, kubeDeps.EventClient, string(nodeName)); err != nil {
return err
}
}
// 16惩阶、啟動 Healthz http server
if s.HealthzPort > 0 {
mux := http.NewServeMux()
healthz.InstallHandler(mux)
go wait.Until(func() {
err := http.ListenAndServe(net.JoinHostPort(s.HealthzBindAddress, strconv.Itoa(int(s.HealthzPort))), mux)
if err != nil {
klog.Errorf("Starting healthz server failed: %v", err)
}
}, 5*time.Second, wait.NeverStop)
}
if s.RunOnce {
return nil
}
// 17. Send the ready signal to systemd
go daemon.SdNotify(false, "READY=1")
select {
case <-done:
break
case <-stopCh:
break
}
return nil
}
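For step 14 above, ApplyOOMScoreAdj(0, s.OOMScoreAdj) boils down to writing the score into procfs, with pid 0 meaning the current process. Below is a minimal hand-rolled sketch of that mechanism, not the actual k8s.io/kubernetes/pkg/util/oom implementation:

package main

import (
	"fmt"
	"os"
	"strconv"
)

// applyOOMScoreAdj writes score into /proc/<pid>/oom_score_adj; pid 0 is
// treated as the current process, mirroring the kubelet's call. Valid
// scores are in [-1000, 1000]; lower scores make the kernel OOM killer
// less likely to pick the process.
func applyOOMScoreAdj(pid, score int) error {
	if score < -1000 || score > 1000 {
		return fmt.Errorf("invalid oom_score_adj %d", score)
	}
	if pid == 0 {
		pid = os.Getpid()
	}
	path := fmt.Sprintf("/proc/%d/oom_score_adj", pid)
	return os.WriteFile(path, []byte(strconv.Itoa(score)), 0644)
}

func main() {
	// The kubelet defaults to -999 so it outlives pods under memory pressure.
	if err := applyOOMScoreAdj(0, -999); err != nil {
		fmt.Fprintln(os.Stderr, err)
	}
}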
RunKubelet
RunKubelet mainly calls createAndInitKubelet to initialize the kubelet components, and then calls startKubelet to start them.
k8s.io/kubernetes/cmd/kubelet/app/server.go:989
func RunKubelet(kubeServer *options.KubeletServer, kubeDeps *kubelet.Dependencies, runOnce bool) error {
hostname, err := nodeutil.GetHostname(kubeServer.HostnameOverride)
if err != nil {
return err
}
nodeName, err := getNodeName(kubeDeps.Cloud, hostname)
if err != nil {
return err
}
makeEventRecorder(kubeDeps, nodeName)
// 1. Privileged mode is enabled by default
capabilities.Initialize(capabilities.Capabilities{
AllowPrivileged: true,
})
credentialprovider.SetPreferredDockercfgPath(kubeServer.RootDirectory)
if kubeDeps.OSInterface == nil {
kubeDeps.OSInterface = kubecontainer.RealOS{}
}
// 2崭别、調(diào)用 createAndInitKubelet
k, err := createAndInitKubelet(&kubeServer.KubeletConfiguration,
......
kubeServer.NodeStatusMaxImages)
if err != nil {
return fmt.Errorf("failed to create kubelet: %v", err)
}
if kubeDeps.PodConfig == nil {
return fmt.Errorf("failed to create kubelet, pod source config was nil")
}
podCfg := kubeDeps.PodConfig
rlimit.RlimitNumFiles(uint64(kubeServer.MaxOpenFiles))
if runOnce {
if _, err := k.RunOnce(podCfg.Updates()); err != nil {
return fmt.Errorf("runonce failed: %v", err)
}
klog.Info("Started kubelet as runonce")
} else {
// 3冬筒、調(diào)用 startKubelet
startKubelet(k, podCfg, &kubeServer.KubeletConfiguration, kubeDeps, kubeServer.EnableCAdvisorJSONEndpoints, kubeServer.EnableServer)
klog.Info("Started kubelet")
}
return nil
}
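The rlimit.RlimitNumFiles call above raises the process's open-file limit (RLIMIT_NOFILE) before pods start churning through connections and files. On Linux it boils down to roughly the following, written here directly against golang.org/x/sys/unix as a sketch:

package main

import (
	"fmt"

	"golang.org/x/sys/unix"
)

// setMaxOpenFiles raises RLIMIT_NOFILE for the current process, which is
// essentially what rlimit.RlimitNumFiles does on Linux.
func setMaxOpenFiles(n uint64) error {
	limit := unix.Rlimit{Cur: n, Max: n}
	return unix.Setrlimit(unix.RLIMIT_NOFILE, &limit)
}

func main() {
	// The kubelet's --max-open-files flag defaults to 1000000.
	if err := setMaxOpenFiles(1000000); err != nil {
		fmt.Println("failed to set rlimit:", err)
	}
}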
createAndInitKubelet
createAndInitKubelet calls three methods to complete the kubelet's initialization:
- kubelet.NewMainKubelet: instantiate the kubelet object and initialize all the modules the kubelet depends on;
- k.BirthCry: send an event to the apiserver announcing that the kubelet has started (a self-contained sketch of the event-recording pattern follows the code below);
- k.StartGarbageCollection: start the garbage collection service, which reclaims containers and images;
k8s.io/kubernetes/cmd/kubelet/app/server.go:1089
func createAndInitKubelet(......) {
k, err = kubelet.NewMainKubelet(
......
)
if err != nil {
return nil, err
}
k.BirthCry()
k.StartGarbageCollection()
return k, nil
}
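BirthCry amounts to a single Eventf call against the node reference, going through the event recorder that makeEventRecorder wires into kubeDeps. Below is a self-contained sketch of that pattern using client-go; the reason string here is illustrative (the kubelet uses a constant from its events package), and events are simply logged to stdout instead of being sent to the apiserver.

package main

import (
	"fmt"
	"time"

	v1 "k8s.io/api/core/v1"
	"k8s.io/client-go/kubernetes/scheme"
	"k8s.io/client-go/tools/record"
)

func main() {
	// An event broadcaster/recorder pair, similar to what makeEventRecorder
	// sets up; here the sink is just a logging function.
	broadcaster := record.NewBroadcaster()
	broadcaster.StartLogging(func(format string, args ...interface{}) {
		fmt.Printf(format+"\n", args...)
	})
	recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "kubelet", Host: "node-1"})

	// The object reference the kubelet records node events against.
	nodeRef := &v1.ObjectReference{Kind: "Node", Name: "node-1", UID: "node-1"}

	// BirthCry boils down to one Eventf call of this shape.
	recorder.Eventf(nodeRef, v1.EventTypeNormal, "Starting", "Starting kubelet.")

	// The broadcaster delivers events asynchronously; give it time to flush.
	time.Sleep(time.Second)
}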
kubelet.NewMainKubelet
NewMainKubelet is the method that initializes the kubelet object. Its main logic is:
- 1. Initialize PodConfig, i.e. watch the sources of pod metadata (file, http, apiserver) and merge the pod configuration from the different sources into a single structure;
- 2. Initialize the containerGCPolicy, imageGCPolicy and evictionConfig configurations;
- 3. Start the serviceInformer and nodeInformer;
- 4. Initialize containerRefManager and oomWatcher;
- 5. Initialize the kubelet object;
- 6. Initialize secretManager and configMapManager;
- 7. Initialize livenessManager, podManager, statusManager and resourceAnalyzer;
- 8. Call kuberuntime.NewKubeGenericRuntimeManager to initialize containerRuntime;
- 9. Initialize pleg;
- 10. Initialize containerGC, containerDeletor, imageManager and containerLogManager;
- 11. Initialize serverCertificateManager, probeManager, tokenManager, volumePluginMgr, pluginManager and volumeManager;
- 12. Initialize workQueue, podWorkers and evictionManager;
- 13. Finally, register the relevant modules' handlers (a simplified model of the admit-handler chain follows the code below);
NewMainKubelet initializes all the modules the kubelet depends on. What each module does was introduced in the previous article, "A Brief Analysis of the kubelet Architecture"; the initialization flow and functionality of each module will be analyzed in detail in later articles.
k8s.io/kubernetes/pkg/kubelet/kubelet.go:335
func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration, ......) {
if rootDirectory == "" {
return nil, fmt.Errorf("invalid root directory %q", rootDirectory)
}
if kubeCfg.SyncFrequency.Duration <= 0 {
return nil, fmt.Errorf("invalid sync frequency %d", kubeCfg.SyncFrequency.Duration)
}
if kubeCfg.MakeIPTablesUtilChains {
......
}
hostname, err := nodeutil.GetHostname(hostnameOverride)
if err != nil {
return nil, err
}
nodeName := types.NodeName(hostname)
if kubeDeps.Cloud != nil {
......
}
// 1衬衬、初始化 PodConfig
if kubeDeps.PodConfig == nil {
var err error
kubeDeps.PodConfig, err = makePodSourceConfig(kubeCfg, kubeDeps, nodeName, bootstrapCheckpointPath)
if err != nil {
return nil, err
}
}
// 2樊零、初始化 containerGCPolicy、imageGCPolicy叠纹、evictionConfig
containerGCPolicy := kubecontainer.ContainerGCPolicy{
MinAge: minimumGCAge.Duration,
MaxPerPodContainer: int(maxPerPodContainerCount),
MaxContainers: int(maxContainerCount),
}
daemonEndpoints := &v1.NodeDaemonEndpoints{
KubeletEndpoint: v1.DaemonEndpoint{Port: kubeCfg.Port},
}
imageGCPolicy := images.ImageGCPolicy{
MinAge: kubeCfg.ImageMinimumGCAge.Duration,
HighThresholdPercent: int(kubeCfg.ImageGCHighThresholdPercent),
LowThresholdPercent: int(kubeCfg.ImageGCLowThresholdPercent),
}
enforceNodeAllocatable := kubeCfg.EnforceNodeAllocatable
if experimentalNodeAllocatableIgnoreEvictionThreshold {
enforceNodeAllocatable = []string{}
}
thresholds, err := eviction.ParseThresholdConfig(enforceNodeAllocatable, kubeCfg.EvictionHard, kubeCfg.EvictionSoft, kubeCfg.EvictionSoftGracePeriod, kubeCfg.EvictionMinimumReclaim)
if err != nil {
return nil, err
}
evictionConfig := eviction.Config{
PressureTransitionPeriod: kubeCfg.EvictionPressureTransitionPeriod.Duration,
MaxPodGracePeriodSeconds: int64(kubeCfg.EvictionMaxPodGracePeriod),
Thresholds: thresholds,
KernelMemcgNotification: experimentalKernelMemcgNotification,
PodCgroupRoot: kubeDeps.ContainerManager.GetPodCgroupRoot(),
}
// 3疲吸、啟動 serviceInformer 和 nodeInformer
serviceIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
if kubeDeps.KubeClient != nil {
serviceLW := cache.NewListWatchFromClient(kubeDeps.KubeClient.CoreV1().RESTClient(), "services", metav1.NamespaceAll, fields.Everything())
r := cache.NewReflector(serviceLW, &v1.Service{}, serviceIndexer, 0)
go r.Run(wait.NeverStop)
}
serviceLister := corelisters.NewServiceLister(serviceIndexer)
nodeIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
if kubeDeps.KubeClient != nil {
fieldSelector := fields.Set{api.ObjectNameField: string(nodeName)}.AsSelector()
nodeLW := cache.NewListWatchFromClient(kubeDeps.KubeClient.CoreV1().RESTClient(), "nodes", metav1.NamespaceAll, fieldSelector)
r := cache.NewReflector(nodeLW, &v1.Node{}, nodeIndexer, 0)
go r.Run(wait.NeverStop)
}
nodeInfo := &CachedNodeInfo{NodeLister: corelisters.NewNodeLister(nodeIndexer)}
......
// 4. Initialize containerRefManager and oomWatcher
containerRefManager := kubecontainer.NewRefManager()
oomWatcher := oomwatcher.NewWatcher(kubeDeps.Recorder)
clusterDNS := make([]net.IP, 0, len(kubeCfg.ClusterDNS))
for _, ipEntry := range kubeCfg.ClusterDNS {
ip := net.ParseIP(ipEntry)
if ip == nil {
klog.Warningf("Invalid clusterDNS ip '%q'", ipEntry)
} else {
clusterDNS = append(clusterDNS, ip)
}
}
httpClient := &http.Client{}
parsedNodeIP := net.ParseIP(nodeIP)
protocol := utilipt.ProtocolIpv4
if parsedNodeIP != nil && parsedNodeIP.To4() == nil {
protocol = utilipt.ProtocolIpv6
}
// 5高诺、初始化 kubelet 對象
klet := &Kubelet{......}
if klet.cloud != nil {
klet.cloudResourceSyncManager = cloudresource.NewSyncManager(klet.cloud, nodeName, klet.nodeStatusUpdateFrequency)
}
// 6碌识、初始化 secretManager、configMapManager
var secretManager secret.Manager
var configMapManager configmap.Manager
switch kubeCfg.ConfigMapAndSecretChangeDetectionStrategy {
case kubeletconfiginternal.WatchChangeDetectionStrategy:
secretManager = secret.NewWatchingSecretManager(kubeDeps.KubeClient)
configMapManager = configmap.NewWatchingConfigMapManager(kubeDeps.KubeClient)
case kubeletconfiginternal.TTLCacheChangeDetectionStrategy:
secretManager = secret.NewCachingSecretManager(
kubeDeps.KubeClient, manager.GetObjectTTLFromNodeFunc(klet.GetNode))
configMapManager = configmap.NewCachingConfigMapManager(
kubeDeps.KubeClient, manager.GetObjectTTLFromNodeFunc(klet.GetNode))
case kubeletconfiginternal.GetChangeDetectionStrategy:
secretManager = secret.NewSimpleSecretManager(kubeDeps.KubeClient)
configMapManager = configmap.NewSimpleConfigMapManager(kubeDeps.KubeClient)
default:
return nil, fmt.Errorf("unknown configmap and secret manager mode: %v", kubeCfg.ConfigMapAndSecretChangeDetectionStrategy)
}
klet.secretManager = secretManager
klet.configMapManager = configMapManager
if klet.experimentalHostUserNamespaceDefaulting {
klog.Infof("Experimental host user namespace defaulting is enabled.")
}
machineInfo, err := klet.cadvisor.MachineInfo()
if err != nil {
return nil, err
}
klet.machineInfo = machineInfo
imageBackOff := flowcontrol.NewBackOff(backOffPeriod, MaxContainerBackOff)
// 7虱而、初始化 livenessManager筏餐、podManager、statusManager牡拇、resourceAnalyzer
klet.livenessManager = proberesults.NewManager()
klet.podCache = kubecontainer.NewCache()
var checkpointManager checkpointmanager.CheckpointManager
if bootstrapCheckpointPath != "" {
checkpointManager, err = checkpointmanager.NewCheckpointManager(bootstrapCheckpointPath)
if err != nil {
return nil, fmt.Errorf("failed to initialize checkpoint manager: %+v", err)
}
}
klet.podManager = kubepod.NewBasicPodManager(kubepod.NewBasicMirrorClient(klet.kubeClient), secretManager, configMapManager, checkpointManager)
klet.statusManager = status.NewManager(klet.kubeClient, klet.podManager, klet)
if remoteRuntimeEndpoint != "" {
if remoteImageEndpoint == "" {
remoteImageEndpoint = remoteRuntimeEndpoint
}
}
pluginSettings := dockershim.NetworkPluginSettings{......}
klet.resourceAnalyzer = serverstats.NewResourceAnalyzer(klet, kubeCfg.VolumeStatsAggPeriod.Duration)
var legacyLogProvider kuberuntime.LegacyLogProvider
// 8魁瞪、調(diào)用 kuberuntime.NewKubeGenericRuntimeManager 初始化 containerRuntime
switch containerRuntime {
case kubetypes.DockerContainerRuntime:
streamingConfig := getStreamingConfig(kubeCfg, kubeDeps, crOptions)
ds, err := dockershim.NewDockerService(kubeDeps.DockerClientConfig, crOptions.PodSandboxImage, streamingConfig,
&pluginSettings, runtimeCgroups, kubeCfg.CgroupDriver, crOptions.DockershimRootDirectory, !crOptions.RedirectContainerStreaming)
if err != nil {
return nil, err
}
if crOptions.RedirectContainerStreaming {
klet.criHandler = ds
}
server := dockerremote.NewDockerServer(remoteRuntimeEndpoint, ds)
if err := server.Start(); err != nil {
return nil, err
}
supported, err := ds.IsCRISupportedLogDriver()
if err != nil {
return nil, err
}
if !supported {
klet.dockerLegacyService = ds
legacyLogProvider = ds
}
case kubetypes.RemoteContainerRuntime:
break
default:
return nil, fmt.Errorf("unsupported CRI runtime: %q", containerRuntime)
}
runtimeService, imageService, err := getRuntimeAndImageServices(remoteRuntimeEndpoint, remoteImageEndpoint, kubeCfg.RuntimeRequestTimeout)
if err != nil {
return nil, err
}
klet.runtimeService = runtimeService
if utilfeature.DefaultFeatureGate.Enabled(features.RuntimeClass) && kubeDeps.KubeClient != nil {
klet.runtimeClassManager = runtimeclass.NewManager(kubeDeps.KubeClient)
}
runtime, err := kuberuntime.NewKubeGenericRuntimeManager(......)
if err != nil {
return nil, err
}
klet.containerRuntime = runtime
klet.streamingRuntime = runtime
klet.runner = runtime
runtimeCache, err := kubecontainer.NewRuntimeCache(klet.containerRuntime)
if err != nil {
return nil, err
}
klet.runtimeCache = runtimeCache
if cadvisor.UsingLegacyCadvisorStats(containerRuntime, remoteRuntimeEndpoint) {
klet.StatsProvider = stats.NewCadvisorStatsProvider(......)
} else {
klet.StatsProvider = stats.NewCRIStatsProvider(......)
}
// 9. Initialize pleg
klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, plegChannelCapacity, plegRelistPeriod, klet.podCache, clock.RealClock{})
klet.runtimeState = newRuntimeState(maxWaitForContainerRuntime)
klet.runtimeState.addHealthCheck("PLEG", klet.pleg.Healthy)
if _, err := klet.updatePodCIDR(kubeCfg.PodCIDR); err != nil {
klog.Errorf("Pod CIDR update failed %v", err)
}
// 10惠呼、初始化 containerGC导俘、containerDeletor、imageManager剔蹋、containerLogManager
containerGC, err := kubecontainer.NewContainerGC(klet.containerRuntime, containerGCPolicy, klet.sourcesReady)
if err != nil {
return nil, err
}
klet.containerGC = containerGC
klet.containerDeletor = newPodContainerDeletor(klet.containerRuntime, integer.IntMax(containerGCPolicy.MaxPerPodContainer, minDeadContainerInPod))
imageManager, err := images.NewImageGCManager(klet.containerRuntime, klet.StatsProvider, kubeDeps.Recorder, nodeRef, imageGCPolicy, crOptions.PodSandboxImage)
if err != nil {
return nil, fmt.Errorf("failed to initialize image manager: %v", err)
}
klet.imageManager = imageManager
if containerRuntime == kubetypes.RemoteContainerRuntime && utilfeature.DefaultFeatureGate.Enabled(features.CRIContainerLogRotation) {
containerLogManager, err := logs.NewContainerLogManager(
klet.runtimeService,
kubeCfg.ContainerLogMaxSize,
int(kubeCfg.ContainerLogMaxFiles),
)
if err != nil {
return nil, fmt.Errorf("failed to initialize container log manager: %v", err)
}
klet.containerLogManager = containerLogManager
} else {
klet.containerLogManager = logs.NewStubContainerLogManager()
}
// 11旅薄、初始化 serverCertificateManager、probeManager滩租、tokenManager赋秀、volumePluginMgr、pluginManager律想、volumeManager
if kubeCfg.ServerTLSBootstrap && kubeDeps.TLSOptions != nil && utilfeature.DefaultFeatureGate.Enabled(features.RotateKubeletServerCertificate) {
klet.serverCertificateManager, err = kubeletcertificate.NewKubeletServerCertificateManager(klet.kubeClient, kubeCfg, klet.nodeName, klet.getLastObservedNodeAddresses, certDirectory)
if err != nil {
return nil, fmt.Errorf("failed to initialize certificate manager: %v", err)
}
kubeDeps.TLSOptions.Config.GetCertificate = func(*tls.ClientHelloInfo) (*tls.Certificate, error) {
cert := klet.serverCertificateManager.Current()
if cert == nil {
return nil, fmt.Errorf("no serving certificate available for the kubelet")
}
return cert, nil
}
}
klet.probeManager = prober.NewManager(......)
tokenManager := token.NewManager(kubeDeps.KubeClient)
klet.volumePluginMgr, err =
NewInitializedVolumePluginMgr(klet, secretManager, configMapManager, tokenManager, kubeDeps.VolumePlugins, kubeDeps.DynamicPluginProber)
if err != nil {
return nil, err
}
klet.pluginManager = pluginmanager.NewPluginManager(
klet.getPluginsRegistrationDir(), /* sockDir */
klet.getPluginsDir(), /* deprecatedSockDir */
kubeDeps.Recorder,
)
if len(experimentalMounterPath) != 0 {
experimentalCheckNodeCapabilitiesBeforeMount = false
klet.dnsConfigurer.SetupDNSinContainerizedMounter(experimentalMounterPath)
}
klet.volumeManager = volumemanager.NewVolumeManager(......)
// 12猎莲、初始化 workQueue、podWorkers技即、evictionManager
klet.reasonCache = NewReasonCache()
klet.workQueue = queue.NewBasicWorkQueue(klet.clock)
klet.podWorkers = newPodWorkers(klet.syncPod, kubeDeps.Recorder, klet.workQueue, klet.resyncInterval, backOffPeriod, klet.podCache)
klet.backOff = flowcontrol.NewBackOff(backOffPeriod, MaxContainerBackOff)
klet.podKillingCh = make(chan *kubecontainer.PodPair, podKillingChannelCapacity)
evictionManager, evictionAdmitHandler := eviction.NewManager(klet.resourceAnalyzer, evictionConfig, killPodNow(klet.podWorkers, kubeDeps.Recorder), klet.podManager.GetMirrorPodByPod, klet.imageManager, klet.containerGC, kubeDeps.Recorder, nodeRef, klet.clock)
klet.evictionManager = evictionManager
klet.admitHandlers.AddPodAdmitHandler(evictionAdmitHandler)
if utilfeature.DefaultFeatureGate.Enabled(features.Sysctls) {
runtimeSupport, err := sysctl.NewRuntimeAdmitHandler(klet.containerRuntime)
if err != nil {
return nil, err
}
safeAndUnsafeSysctls := append(sysctlwhitelist.SafeSysctlWhitelist(), allowedUnsafeSysctls...)
sysctlsWhitelist, err := sysctl.NewWhitelist(safeAndUnsafeSysctls)
if err != nil {
return nil, err
}
klet.admitHandlers.AddPodAdmitHandler(runtimeSupport)
klet.admitHandlers.AddPodAdmitHandler(sysctlsWhitelist)
}
// 13著洼、為 pod 注冊相關(guān)模塊的 handler
activeDeadlineHandler, err := newActiveDeadlineHandler(klet.statusManager, kubeDeps.Recorder, klet.clock)
if err != nil {
return nil, err
}
klet.AddPodSyncLoopHandler(activeDeadlineHandler)
klet.AddPodSyncHandler(activeDeadlineHandler)
if utilfeature.DefaultFeatureGate.Enabled(features.TopologyManager) {
klet.admitHandlers.AddPodAdmitHandler(klet.containerManager.GetTopologyPodAdmitHandler())
}
criticalPodAdmissionHandler := preemption.NewCriticalPodAdmissionHandler(klet.GetActivePods, killPodNow(klet.podWorkers, kubeDeps.Recorder), kubeDeps.Recorder)
klet.admitHandlers.AddPodAdmitHandler(lifecycle.NewPredicateAdmitHandler(klet.getNodeAnyWay, criticalPodAdmissionHandler, klet.containerManager.UpdatePluginResources))
for _, opt := range kubeDeps.Options {
opt(klet)
}
klet.appArmorValidator = apparmor.NewValidator(containerRuntime)
klet.softAdmitHandlers.AddPodAdmitHandler(lifecycle.NewAppArmorAdmitHandler(klet.appArmorValidator))
klet.softAdmitHandlers.AddPodAdmitHandler(lifecycle.NewNoNewPrivsAdmitHandler(klet.containerRuntime))
if utilfeature.DefaultFeatureGate.Enabled(features.NodeLease) {
klet.nodeLeaseController = nodelease.NewController(klet.clock, klet.heartbeatClient, string(klet.nodeName), kubeCfg.NodeLeaseDurationSeconds, klet.onRepeatedHeartbeatFailure)
}
klet.softAdmitHandlers.AddPodAdmitHandler(lifecycle.NewProcMountAdmitHandler(klet.containerRuntime))
klet.kubeletConfiguration = *kubeCfg
klet.setNodeStatusFuncs = klet.defaultNodeStatusFuncs()
return klet, nil
}
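Step 13 above keeps registering admit handlers; each registered handler gets a chance to reject a pod before the kubelet admits it onto the node, and the first rejection wins. The sketch below is a simplified model of that chain (the real interface lives in k8s.io/kubernetes/pkg/kubelet/lifecycle and carries richer types):

package main

import "fmt"

// podAdmitResult is a trimmed-down version of lifecycle.PodAdmitResult.
type podAdmitResult struct {
	Admit   bool
	Reason  string
	Message string
}

// podAdmitHandler mirrors the shape of lifecycle.PodAdmitHandler.
type podAdmitHandler interface {
	Admit(podName string) podAdmitResult
}

// admitHandlers is the chain: handlers run in order and the first
// rejection wins, mirroring how the kubelet decides pod admission.
type admitHandlers []podAdmitHandler

func (hs admitHandlers) canAdmitPod(podName string) podAdmitResult {
	for _, h := range hs {
		if r := h.Admit(podName); !r.Admit {
			return r
		}
	}
	return podAdmitResult{Admit: true}
}

// evictionAdmit stands in for the eviction manager's admit handler:
// reject new pods while the node is under memory pressure.
type evictionAdmit struct{ underPressure bool }

func (e evictionAdmit) Admit(podName string) podAdmitResult {
	if e.underPressure {
		return podAdmitResult{Admit: false, Reason: "Evicted", Message: "node is under memory pressure"}
	}
	return podAdmitResult{Admit: true}
}

func main() {
	chain := admitHandlers{evictionAdmit{underPressure: true}}
	fmt.Printf("%+v\n", chain.canAdmitPod("nginx"))
}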
startKubelet
startKubelet calls k.Run to start all of the kubelet's modules and its main loop, and then starts the http servers the kubelet needs. In v1.16, by default the kubelet only starts the health-check port 10248 and the kubelet server port 10250. Note that k.Run is kept alive with wait.Until (a short demonstration follows the code below).
k8s.io/kubernetes/cmd/kubelet/app/server.go:1070
func startKubelet(k kubelet.Bootstrap, podCfg *config.PodConfig, kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *kubelet.Dependencies, enableCAdvisorJSONEndpoints, enableServer bool) {
// start the kubelet
go wait.Until(func() {
k.Run(podCfg.Updates())
}, 0, wait.NeverStop)
// start the kubelet server
if enableServer {
go k.ListenAndServe(net.ParseIP(kubeCfg.Address), uint(kubeCfg.Port), kubeDeps.TLSOptions, kubeDeps.Auth, enableCAdvisorJSONEndpoints, kubeCfg. EnableDebuggingHandlers, kubeCfg.EnableContentionProfiling)
}
if kubeCfg.ReadOnlyPort > 0 {
go k.ListenAndServeReadOnly(net.ParseIP(kubeCfg.Address), uint(kubeCfg.ReadOnlyPort), enableCAdvisorJSONEndpoints)
}
if utilfeature.DefaultFeatureGate.Enabled(features.KubeletPodResources) {
go k.ListenAndServePodResources()
}
}
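Note the period of 0 passed to wait.Until above: if k.Run ever returns, it is re-invoked immediately, over and over, until the stop channel closes. A tiny self-contained demonstration of that semantic:

package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	stopCh := make(chan struct{})
	go func() {
		time.Sleep(300 * time.Millisecond)
		close(stopCh)
	}()

	runs := 0
	// With a period of 0, the function is restarted immediately whenever it
	// returns, which is how startKubelet keeps k.Run alive.
	wait.Until(func() {
		runs++
		time.Sleep(100 * time.Millisecond) // stand-in for k.Run's main loop
	}, 0, stopCh)
	fmt.Println("function was (re)started", runs, "times before stop")
}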
At this point the kubelet object and its dependent modules have been initialized by the methods above. Apart from the gc module, which is started separately, the remaining modules and the main logic are all started in the Run method, whose main logic is explained below. To wrap up this part, the call relationships in the kubelet startup logic are as follows:
|--> NewMainKubelet
|
|--> createAndInitKubelet --|--> BirthCry
| |
|--> RunKubelet --| |--> StartGarbageCollection
| |
| |--> startKubelet --> k.Run
|
NewKubeletCommand --> Run --> run --|--> http.ListenAndServe
|
|--> daemon.SdNotify
Run
The Run method is the core method for starting the kubelet; it starts the kubelet's dependent modules as well as the main loop. Its main logic is:
- 1. Register the logServer;
- 2. Check whether the cloud provider sync manager needs to be started;
- 3. Call kl.initializeModules to first start the modules that do not depend on the container runtime;
- 4. Start the volume manager;
- 5. Run kl.syncNodeStatus to periodically sync the Node status;
- 6. Call kl.fastStatusUpdateOnce to update the container runtime start time and perform the first status sync;
- 7. Check whether the NodeLease mechanism is enabled;
- 8. Run kl.updateRuntimeUp to periodically update the Runtime status;
- 9. Run kl.syncNetworkUtil to periodically sync iptables rules;
- 10. Run kl.podKiller to periodically clean up abnormal pods; when a pod has not been handled correctly by a podworker, a goroutine is started to kill it;
- 11. Start statusManager;
- 12. Start probeManager;
- 13. Start runtimeClassManager;
- 14. Start pleg;
- 15. Call kl.syncLoop to watch for pod changes;
The Run method mainly calls two methods, kl.initializeModules and kl.fastStatusUpdateOnce, to finish the initialization before startup; once all modules have been initialized, it starts the main loop.
k8s.io/kubernetes/pkg/kubelet/kubelet.go:1398
func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
// 1饿悬、注冊 logServer
if kl.logServer == nil {
kl.logServer = http.StripPrefix("/logs/", http.FileServer(http.Dir("/var/log/")))
}
if kl.kubeClient == nil {
klog.Warning("No api server defined - no node status update will be sent.")
}
// 2. Check whether the cloud provider sync manager needs to be started
if kl.cloudResourceSyncManager != nil {
go kl.cloudResourceSyncManager.Run(wait.NeverStop)
}
// 3聚霜、調(diào)用 kl.initializeModules 首先啟動不依賴 container runtime 的一些模塊
if err := kl.initializeModules(); err != nil {
kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.KubeletSetupFailed, err.Error())
klog.Fatal(err)
}
// 4狡恬、啟動 volume manager
go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop)
if kl.kubeClient != nil {
// 5. Run kl.syncNodeStatus to periodically sync the Node status
go wait.Until(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, wait.NeverStop)
// 6俯萎、調(diào)用 kl.fastStatusUpdateOnce 更新容器運(yùn)行時啟動時間以及執(zhí)行首次狀態(tài)同步
go kl.fastStatusUpdateOnce()
// 7傲宜、判斷是否啟用 NodeLease 機(jī)制
if utilfeature.DefaultFeatureGate.Enabled(features.NodeLease) {
go kl.nodeLeaseController.Run(wait.NeverStop)
}
}
// 8. Run kl.updateRuntimeUp to periodically update the Runtime status
go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop)
// 9夫啊、執(zhí)行 kl.syncNetworkUtil 定時同步 iptables 規(guī)則
if kl.makeIPTablesUtilChains {
go wait.Until(kl.syncNetworkUtil, 1*time.Minute, wait.NeverStop)
}
// 10函卒、執(zhí)行 kl.podKiller 定時清理異常 pod
go wait.Until(kl.podKiller, 1*time.Second, wait.NeverStop)
// 11. Start statusManager, probeManager and runtimeClassManager
kl.statusManager.Start()
kl.probeManager.Start()
if kl.runtimeClassManager != nil {
kl.runtimeClassManager.Start(wait.NeverStop)
}
// 12虱咧、啟動 pleg
kl.pleg.Start()
// 13. Call kl.syncLoop to watch for pod changes
kl.syncLoop(updates, kl)
}
initializeModules
The modules started in initializeModules depend neither on the container runtime nor on any module that has not been initialized yet. The main logic is:
- 1. Call kl.setupDataDirs to create the directories the kubelet needs (a sketch follows the code below);
- 2. Create the ContainerLogsDir /var/log/containers;
- 3. Start imageManager; the image gc functionality was already started in RunKubelet, so here it mainly watches for image changes;
- 4. Start certificateManager, which is responsible for rotating certificates;
- 5. Start oomWatcher, which watches for ooms and records events;
- 6. Start resourceAnalyzer;
k8s.io/kubernetes/pkg/kubelet/kubelet.go:1319
func (kl *Kubelet) initializeModules() error {
metrics.Register(
kl.runtimeCache,
collectors.NewVolumeStatsCollector(kl),
collectors.NewLogMetricsCollector(kl.StatsProvider.ListPodStats),
)
metrics.SetNodeName(kl.nodeName)
servermetrics.Register()
// 1翔怎、創(chuàng)建文件目錄
if err := kl.setupDataDirs(); err != nil {
return err
}
// 2窃诉、創(chuàng)建 ContainerLogsDir
if _, err := os.Stat(ContainerLogsDir); err != nil {
if err := kl.os.MkdirAll(ContainerLogsDir, 0755); err != nil {
klog.Errorf("Failed to create directory %q: %v", ContainerLogsDir, err)
}
}
// 3. Start imageManager
kl.imageManager.Start()
// 4赤套、啟動 certificate manager
if kl.serverCertificateManager != nil {
kl.serverCertificateManager.Start()
}
// 5飘痛、啟動 oomWatcher.
if err := kl.oomWatcher.Start(kl.nodeRef); err != nil {
return fmt.Errorf("failed to start OOM watcher %v", err)
}
// 6. Start the resource analyzer
kl.resourceAnalyzer.Start()
return nil
}
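setupDataDirs is essentially a sequence of MkdirAll calls under the kubelet root directory (/var/lib/kubelet by default). The sketch below conveys the idea; the exact set of subdirectories is version-specific, and the names used here are illustrative:

package main

import (
	"os"
	"path/filepath"
)

// setupDataDirs sketches the directory bootstrap the kubelet performs under
// its root directory; directory names are illustrative, not exhaustive.
func setupDataDirs(root string) error {
	for _, dir := range []string{"", "pods", "plugins", "pod-resources"} {
		if err := os.MkdirAll(filepath.Join(root, dir), 0750); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	if err := setupDataDirs("/tmp/kubelet-demo"); err != nil {
		panic(err)
	}
}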
fastStatusUpdateOnce
fastStatusUpdateOnce keeps trying to update the pod CIDR; as soon as the update succeeds it immediately runs updateRuntimeUp and syncNodeStatus to refresh the runtime state and the node status. This method runs only once, when the kubelet starts; by updating the pod CIDR early it reduces the latency for the node to reach the ready state, performing the runtime update and node status update as soon as possible.
k8s.io/kubernetes/pkg/kubelet/kubelet.go:2262
func (kl *Kubelet) fastStatusUpdateOnce() {
for {
time.Sleep(100 * time.Millisecond)
node, err := kl.GetNode()
if err != nil {
klog.Errorf(err.Error())
continue
}
if len(node.Spec.PodCIDRs) != 0 {
podCIDRs := strings.Join(node.Spec.PodCIDRs, ",")
if _, err := kl.updatePodCIDR(podCIDRs); err != nil {
klog.Errorf("Pod CIDR update to %v failed %v", podCIDRs, err)
continue
}
kl.updateRuntimeUp()
kl.syncNodeStatus()
return
}
}
}
updateRuntimeUp
updateRuntimeUp initializes the modules that depend on the container runtime when the runtime first comes up, and records the runtime's start time in the kubelet's runtimeState. updateRuntimeUp first checks whether the network and the runtime are in the ready state; if both are ready, it calls initializeRuntimeDependentModules to initialize the runtime's dependent modules, including cadvisor, containerManager, evictionManager, containerLogManager and pluginManager.
k8s.io/kubernetes/pkg/kubelet/kubelet.go:2168
func (kl *Kubelet) updateRuntimeUp() {
kl.updateRuntimeMux.Lock()
defer kl.updateRuntimeMux.Unlock()
// 1稽寒、獲取 containerRuntime Status
s, err := kl.containerRuntime.Status()
if err != nil {
klog.Errorf("Container runtime sanity check failed: %v", err)
return
}
if s == nil {
klog.Errorf("Container runtime status is nil")
return
}
// 2扮碧、檢查 network 和 runtime 是否處于 ready 狀態(tài)
networkReady := s.GetRuntimeCondition(kubecontainer.NetworkReady)
if networkReady == nil || !networkReady.Status {
kl.runtimeState.setNetworkState(fmt.Errorf("runtime network not ready: %v", networkReady))
} else {
kl.runtimeState.setNetworkState(nil)
}
runtimeReady := s.GetRuntimeCondition(kubecontainer.RuntimeReady)
if runtimeReady == nil || !runtimeReady.Status {
kl.runtimeState.setRuntimeState(err)
return
}
kl.runtimeState.setRuntimeState(nil)
// 3. Call kl.initializeRuntimeDependentModules to start the dependent modules
kl.oneTimeInitializer.Do(kl.initializeRuntimeDependentModules)
kl.runtimeState.setRuntimeSync(kl.clock.Now())
}
initializeRuntimeDependentModules
The main logic of this method is:
- 1. Start cadvisor;
- 2. Get the CgroupStats;
- 3. Start containerManager, evictionManager and containerLogManager;
- 4. Register the CSI Driver and the Device Manager with pluginManager, then start pluginManager;
k8s.io/kubernetes/pkg/kubelet/kubelet.go:1361
func (kl *Kubelet) initializeRuntimeDependentModules() {
// 1蠕嫁、啟動 cadvisor
if err := kl.cadvisor.Start(); err != nil {
......
}
// 2锨天、獲取 CgroupStats
kl.StatsProvider.GetCgroupStats("/", true)
node, err := kl.getNodeAnyWay()
if err != nil {
klog.Fatalf("Kubelet failed to get node info: %v", err)
}
// 3. Start containerManager, evictionManager and containerLogManager
if err := kl.containerManager.Start(node, kl.GetActivePods, kl.sourcesReady, kl.statusManager, kl.runtimeService); err != nil {
klog.Fatalf("Failed to start ContainerManager %v", err)
}
kl.evictionManager.Start(kl.StatsProvider, kl.GetActivePods, kl.podResourcesAreReclaimed, evictionMonitoringPeriod)
kl.containerLogManager.Start()
kl.pluginManager.AddHandler(pluginwatcherapi.CSIPlugin, plugincache.PluginHandler(csi.PluginHandler))
kl.pluginManager.AddHandler(pluginwatcherapi.DevicePlugin, kl.containerManager.GetPluginRegistrationHandler())
// 4. Start pluginManager
go kl.pluginManager.Run(kl.sourcesReady, wait.NeverStop)
}
Summary
As the Run method shows, kl.syncNodeStatus and kl.updateRuntimeUp are called directly there, but both are also called from kl.fastStatusUpdateOnce. In kl.fastStatusUpdateOnce they run only once, whereas the Run method runs them periodically. Calling them in kl.fastStatusUpdateOnce lets the kubelet perform the runtime update and the node status update as early as possible at first startup, reducing the latency for the node to reach the ready state. And kl.initializeRuntimeDependentModules, the method called from kl.updateRuntimeUp that initializes the runtime-dependent modules, is executed only once, via sync.Once.
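kl.oneTimeInitializer is a plain sync.Once, which is where that run-exactly-once guarantee comes from. Its semantics in isolation:

package main

import (
	"fmt"
	"sync"
)

func main() {
	var once sync.Once // kl.oneTimeInitializer is exactly this type
	initModules := func() { fmt.Println("initializing runtime-dependent modules") }

	// updateRuntimeUp runs every 5 seconds, but Do only invokes the function
	// on the first call; all later calls are no-ops.
	for i := 0; i < 3; i++ {
		once.Do(initModules)
	}
}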
syncLoop
syncLoop is the kubelet's main loop method. It listens for pod changes from the different channels (file, http, apiserver) and aggregates them; when a new change arrives, it calls the corresponding handler to keep the pods in their desired state.
syncLoop first defines a syncTicker and a housekeepingTicker: even when there is no pod configuration to update, the kubelet still periodically syncs and cleans up pods. It then calls syncLoopIteration in an endless for loop. Whenever an iteration hits an error, the kubelet records it in runtimeState and backs off before continuing; the wait starts at 100 ms and doubles on each consecutive error, capped at 5 seconds.
k8s.io/kubernetes/pkg/kubelet/kubelet.go:1821
func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
syncTicker := time.NewTicker(time.Second)
defer syncTicker.Stop()
housekeepingTicker := time.NewTicker(housekeepingPeriod)
defer housekeepingTicker.Stop()
plegCh := kl.pleg.Watch()
const (
base = 100 * time.Millisecond
max = 5 * time.Second
factor = 2
)
duration := base
for {
if err := kl.runtimeState.runtimeErrors(); err != nil {
time.Sleep(duration)
duration = time.Duration(math.Min(float64(max), factor*float64(duration)))
continue
}
duration = base
kl.syncLoopMonitor.Store(kl.clock.Now())
if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
break
}
kl.syncLoopMonitor.Store(kl.clock.Now())
}
}
syncLoopIteration
The syncLoopIteration method listens on several channels; as soon as any channel has data it hands it to the handler, which dispatches the work via dispatchWork. It receives messages from the following channels:
- 1. configCh: this source is provided by the PodConfig submodule of the kubeDeps object, which watches pod information from 3 different sources (file, http, apiserver) at the same time; whenever pod information from one of the sources is updated (created/updated/deleted), this channel emits the updated pod information together with the specific operation (a simplified mux sketch follows the code below);
- 2. syncCh: a timer that syncs the latest saved pod state every second;
- 3. houseKeepingCh: the channel for housekeeping events, used for pod cleanup;
- 4. plegCh: this source is provided by the pleg submodule of the kubelet object, which periodically queries the container runtime for the current state of all containers; when a state changes, this channel produces an event;
- 5. livenessManager: when the health-check module finds a pod unhealthy, the kubelet automatically takes the appropriate action according to the pod's restartPolicy;
k8s.io/kubernetes/pkg/kubelet/kubelet.go:1888
func (kl *Kubelet) syncLoopIteration(......) bool {
select {
case u, open := <-configCh:
if !open {
return false
}
switch u.Op {
case kubetypes.ADD:
handler.HandlePodAdditions(u.Pods)
case kubetypes.UPDATE:
handler.HandlePodUpdates(u.Pods)
case kubetypes.REMOVE:
handler.HandlePodRemoves(u.Pods)
case kubetypes.RECONCILE:
handler.HandlePodReconcile(u.Pods)
case kubetypes.DELETE:
handler.HandlePodUpdates(u.Pods)
case kubetypes.RESTORE:
handler.HandlePodAdditions(u.Pods)
case kubetypes.SET:
}
if u.Op != kubetypes.RESTORE {
kl.sourcesReady.AddSource(u.Source)
}
case e := <-plegCh:
if isSyncPodWorthy(e) {
if pod, ok := kl.podManager.GetPodByUID(e.ID); ok {
klog.V(2).Infof("SyncLoop (PLEG): %q, event: %#v", format.Pod(pod), e)
handler.HandlePodSyncs([]*v1.Pod{pod})
} else {
klog.V(4).Infof("SyncLoop (PLEG): ignore irrelevant event: %#v", e)
}
}
if e.Type == pleg.ContainerDied {
if containerID, ok := e.Data.(string); ok {
kl.cleanUpContainersInPod(e.ID, containerID)
}
}
case <-syncCh:
podsToSync := kl.getPodsToSync()
if len(podsToSync) == 0 {
break
}
handler.HandlePodSyncs(podsToSync)
case update := <-kl.livenessManager.Updates():
if update.Result == proberesults.Failure {
pod, ok := kl.podManager.GetPodByUID(update.PodUID)
if !ok {
break
}
handler.HandlePodSyncs([]*v1.Pod{pod})
}
case <-housekeepingCh:
if !kl.sourcesReady.AllReady() {
klog.V(4).Infof("SyncLoop (housekeeping, skipped): sources aren't ready yet.")
} else {
if err := handler.HandlePodCleanups(); err != nil {
klog.Errorf("Failed cleaning pods: %v", err)
}
}
}
return true
}
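The configCh merging described in item 1 above can be pictured as a small mux: each source (file, http, apiserver) pushes updates into its own channel, and goroutines funnel them into the single channel that syncLoopIteration selects on. The following is a stripped-down model with simplified types; the real implementation lives in pkg/kubelet/config.

package main

import "fmt"

// podUpdate is a simplified stand-in for kubetypes.PodUpdate.
type podUpdate struct {
	Op     string // ADD / UPDATE / REMOVE / ...
	Source string // "file", "http", or "api"
	Pods   []string
}

// mergeSources funnels per-source channels into one updates channel,
// tagging each update with its source, similar in spirit to PodConfig.
func mergeSources(sources map[string]<-chan podUpdate) <-chan podUpdate {
	out := make(chan podUpdate)
	for name, ch := range sources {
		go func(name string, ch <-chan podUpdate) {
			for u := range ch {
				u.Source = name
				out <- u
			}
		}(name, ch)
	}
	return out
}

func main() {
	file := make(chan podUpdate, 1)
	api := make(chan podUpdate, 1)
	updates := mergeSources(map[string]<-chan podUpdate{"file": file, "api": api})

	file <- podUpdate{Op: "ADD", Pods: []string{"static-web"}}
	api <- podUpdate{Op: "UPDATE", Pods: []string{"nginx"}}

	for i := 0; i < 2; i++ {
		u := <-updates
		fmt.Printf("%s from %s: %v\n", u.Op, u.Source, u.Pods)
	}
}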
Finally, here is a summary of the call flow of Run, the method that starts the kubelet and its dependent modules:
|--> kl.cloudResourceSyncManager.Run
|
| |--> kl.setupDataDirs
| |--> kl.imageManager.Start
Run --|--> kl.initializeModules ---|--> kl.serverCertificateManager.Start
| |--> kl.oomWatcher.Start
| |--> kl.resourceAnalyzer.Start
|
|--> kl.volumeManager.Run
| |--> kl.containerRuntime.Status
|--> kl.syncNodeStatus |
| |--> kl.updateRuntimeUp --| |--> kl.cadvisor.Start
| | | |
|--> kl.fastStatusUpdateOnce --| |--> kl.initializeRuntimeDependentModules --|--> kl.containerManager.Start
| | |
| |--> kl.syncNodeStatus |--> kl.evictionManager.Start
| |
|--> kl.updateRuntimeUp |--> kl.containerLogManager.Start
| |
|--> kl.syncNetworkUtil |--> kl.pluginManager.Run
|
|--> kl.podKiller
|
|--> kl.statusManager.Start
|
|--> kl.probeManager.Start
|
|--> kl.runtimeClassManager.Start
|
|--> kl.pleg.Start
|
|--> kl.syncLoop --> kl.syncLoopIteration
Conclusion
This article walked through the kubelet startup flow. As you can see, the startup involves a great many steps, and the kubelet itself contains a great many modules. Later articles on the kubelet source code will start from the modules launched in the Run method and tackle them one by one.