上篇文章(kubelet 架構(gòu)淺析)已經(jīng)介紹過 kubelet 在整個(gè)集群架構(gòu)中的功能以及自身各模塊的用途，本篇文章主要介紹 kubelet 的啟動流程。
kubernetes 版本: v1.12
kubelet 啟動流程
kubelet 代碼結(jié)構(gòu):
? kubernetes git:(release-1.12) ? tree cmd/kubelet
cmd/kubelet
├── BUILD
├── OWNERS
├── app
│ ├── BUILD
│ ├── OWNERS
│ ├── auth.go
│ ├── init_others.go
│ ├── init_windows.go
│ ├── options
│ │ ├── BUILD
│ │ ├── container_runtime.go
│ │ ├── globalflags.go
│ │ ├── globalflags_linux.go
│ │ ├── globalflags_other.go
│ │ ├── options.go
│ │ ├── options_test.go
│ │ ├── osflags_others.go
│ │ └── osflags_windows.go
│ ├── plugins.go
│ ├── server.go
│ ├── server_linux.go
│ ├── server_test.go
│ └── server_unsupported.go
└── kubelet.go
2 directories, 22 files
1、kubelet 入口函數(shù) main(cmd/kubelet/kubelet.go)
// main is the kubelet entry point: it seeds the RNG, builds the cobra
// command wired to a signal-driven stop channel, initializes logging,
// and executes the command, exiting non-zero on any error.
func main() {
rand.Seed(time.Now().UTC().UnixNano())
command := app.NewKubeletCommand(server.SetupSignalHandler())
logs.InitLogs()
defer logs.FlushLogs()
if err := command.Execute(); err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
os.Exit(1)
}
}
2、初始化 kubelet 配置(cmd/kubelet/app/server.go)
NewKubeletCommand() 函數(shù)主要負(fù)責(zé)獲取配置文件中的參數(shù)，校驗(yàn)參數(shù)以及為參數(shù)設(shè)置默認(rèn)值。
// NewKubeletCommand creates a *cobra.Command object with default parameters
func NewKubeletCommand(stopCh <-chan struct{}) *cobra.Command {
cleanFlagSet := pflag.NewFlagSet(componentKubelet, pflag.ContinueOnError)
cleanFlagSet.SetNormalizeFunc(flag.WordSepNormalizeFunc)
// Kubelet configuration is split into two parts:
// KubeletFlag: settings that must not be modified while the kubelet is
// running, or that cannot be shared between the nodes of the cluster.
// KubeletConfiguration: settings that can be shared across nodes and
// may be updated via dynamic configuration.
kubeletFlags := options.NewKubeletFlags()
kubeletConfig, err := options.NewKubeletConfiguration()
...
cmd := &cobra.Command{
...
Run: func(cmd *cobra.Command, args []string) {
// Load the kubelet config file, if one was specified.
if configFile := kubeletFlags.KubeletConfigFile; len(configFile) > 0 {
kubeletConfig, err = loadConfigFile(configFile)
if err != nil {
glog.Fatal(err)
}
...
}
// Validate the resulting kubelet configuration.
if err := kubeletconfigvalidation.ValidateKubeletConfiguration(kubeletConfig); err != nil {
glog.Fatal(err)
}
...
// kubeletDeps (the kubelet's injected dependencies) is initialized here.
kubeletDeps, err := UnsecuredDependencies(kubeletServer)
if err != nil {
glog.Fatal(err)
}
...
// Start the kubelet.
if err := Run(kubeletServer, kubeletDeps, stopCh); err != nil {
glog.Fatal(err)
}
},
}
...
return cmd
}
kubeletDeps 包含 kubelet 運(yùn)行所必須的配置，是為了實(shí)現(xiàn) dependency injection，其目的是為了把 kubelet 依賴的組件對象作為參數(shù)傳進(jìn)來，這樣可以控制 kubelet 的行為。主要包括監(jiān)控功能(cadvisor)，cgroup 管理功能(containerManager)等。
NewKubeletCommand() 會調(diào)用 Run() 函數(shù)，Run() 中主要調(diào)用 run() 函數(shù)進(jìn)行一些準(zhǔn)備事項(xiàng)。
3、創(chuàng)建和 apiserver 通信的對象(cmd/kubelet/app/server.go)
run() 函數(shù)的主要功能:
- 1、創(chuàng)建 kubeClient，eventClient 用來和 apiserver 通信。創(chuàng)建 heartbeatClient 向 apiserver 上報(bào)心跳狀態(tài)。
- 2、為 kubeDeps 設(shè)定一些默認(rèn)值。
- 3、啟動監(jiān)聽 Healthz 端口的 http server，默認(rèn)端口是 10248。
// run prepares everything the kubelet needs before starting: it builds
// the clients used to talk to the apiserver, fills in defaults for
// dependencies not injected by the caller, hands off to RunKubelet, and
// serves the healthz endpoint.
func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, stopCh <-chan struct{}) (err error) {
...
// Determine the kubelet's startup mode (standalone vs. apiserver-backed).
if standaloneMode {
...
} else if kubeDeps.KubeClient == nil || kubeDeps.EventClient == nil || kubeDeps.HeartbeatClient == nil || kubeDeps.DynamicKubeClient == nil {
...
// Client for general apiserver communication.
kubeClient, err = clientset.NewForConfig(clientConfig)
...
// Client used to publish events to the apiserver.
eventClient, err = v1core.NewForConfig(&eventClientConfig)
...
// heartbeatClient reports node heartbeat status.
heartbeatClient, err = clientset.NewForConfig(&heartbeatClientConfig)
...
}
// Fill in defaults for dependencies the caller did not inject.
if kubeDeps.Auth == nil {
auth, err := BuildAuth(nodeName, kubeDeps.KubeClient, s.KubeletConfiguration)
if err != nil {
return err
}
kubeDeps.Auth = auth
}
if kubeDeps.CAdvisorInterface == nil {
imageFsInfoProvider := cadvisor.NewImageFsInfoProvider(s.ContainerRuntime, s.RemoteRuntimeEndpoint)
kubeDeps.CAdvisorInterface, err = cadvisor.New(imageFsInfoProvider, s.RootDirectory, cadvisor.UsingLegacyCadvisorStats(s.ContainerRuntime, s.RemoteRuntimeEndpoint))
if err != nil {
return err
}
}
}
// Hand off to RunKubelet, which creates and starts the kubelet proper.
if err := RunKubelet(s, kubeDeps, s.RunOnce); err != nil {
return err
}
...
// Start the HTTP server answering health checks on the Healthz port
// (default 10248); retry every 5 seconds if it fails.
if s.HealthzPort > 0 {
healthz.DefaultHealthz()
go wait.Until(func() {
err := http.ListenAndServe(net.JoinHostPort(s.HealthzBindAddress, strconv.Itoa(int(s.HealthzPort))), nil)
if err != nil {
glog.Errorf("Starting health server failed: %v", err)
}
}, 5*time.Second, wait.NeverStop)
}
...
}
kubelet 對 pod 資源的獲取方式有三種：第一種是通過文件獲得，文件一般放在 /etc/kubernetes/manifests 目錄下面；第二種也是通過文件獲得，只不過文件是通過 URL 獲取的；第三種是通過 watch kube-apiserver 獲取。其中前兩種模式下，我們稱 kubelet 運(yùn)行在 standalone 模式下，運(yùn)行在 standalone 模式下的 kubelet 一般用於調(diào)試某些功能。
run() 中調(diào)用 RunKubelet() 函數(shù)進(jìn)行後續(xù)操作。
4、初始化 kubelet 組件內(nèi)部的模塊(cmd/kubelet/app/server.go)
RunKubelet() 主要功能:
- 1、初始化 kubelet 組件中的各個(gè)模塊，創(chuàng)建出 kubelet 對象。
- 2、啟動垃圾回收服務(wù)。
// RunKubelet builds the kubelet object — initializing all of its internal
// modules via CreateAndInitKubelet — then either runs it once (runOnce
// mode) or starts it as a long-running daemon via startKubelet.
func RunKubelet(kubeServer *options.KubeletServer, kubeDeps *kubelet.Dependencies, runOnce bool) error {
...
// Create the kubelet and initialize its internal modules.
k, err := CreateAndInitKubelet(&kubeServer.KubeletConfiguration,
kubeDeps,
&kubeServer.ContainerRuntimeOptions,
kubeServer.ContainerRuntime,
kubeServer.RuntimeCgroups,
kubeServer.HostnameOverride,
kubeServer.NodeIP,
kubeServer.ProviderID,
kubeServer.CloudProvider,
kubeServer.CertDirectory,
kubeServer.RootDirectory,
kubeServer.RegisterNode,
kubeServer.RegisterWithTaints,
kubeServer.AllowedUnsafeSysctls,
kubeServer.RemoteRuntimeEndpoint,
kubeServer.RemoteImageEndpoint,
kubeServer.ExperimentalMounterPath,
kubeServer.ExperimentalKernelMemcgNotification,
kubeServer.ExperimentalCheckNodeCapabilitiesBeforeMount,
kubeServer.ExperimentalNodeAllocatableIgnoreEvictionThreshold,
kubeServer.MinimumGCAge,
kubeServer.MaxPerPodContainerCount,
kubeServer.MaxContainerCount,
kubeServer.MasterServiceNamespace,
kubeServer.RegisterSchedulable,
kubeServer.NonMasqueradeCIDR,
kubeServer.KeepTerminatedPodVolumes,
kubeServer.NodeLabels,
kubeServer.SeccompProfileRoot,
kubeServer.BootstrapCheckpointPath,
kubeServer.NodeStatusMaxImages)
if err != nil {
return fmt.Errorf("failed to create kubelet: %v", err)
}
...
if runOnce {
// runOnce mode: process the current pod set once, then exit.
if _, err := k.RunOnce(podCfg.Updates()); err != nil {
return fmt.Errorf("runonce failed: %v", err)
}
glog.Infof("Started kubelet as runonce")
} else {
// Daemon mode: start the kubelet's main loops and HTTP servers.
startKubelet(k, podCfg, &kubeServer.KubeletConfiguration, kubeDeps, kubeServer.EnableServer)
glog.Infof("Started kubelet")
}
}
// CreateAndInitKubelet instantiates the kubelet via NewMainKubelet and
// performs the first-start side effects: announcing the kubelet to the
// apiserver and starting garbage collection.
func CreateAndInitKubelet(...){
// NewMainKubelet instantiates a kubelet object and initializes each of
// the kubelet's internal modules.
k, err = kubelet.NewMainKubelet(kubeCfg,
kubeDeps,
crOptions,
containerRuntime,
runtimeCgroups,
hostnameOverride,
nodeIP,
providerID,
cloudProvider,
certDirectory,
rootDirectory,
registerNode,
registerWithTaints,
allowedUnsafeSysctls,
remoteRuntimeEndpoint,
remoteImageEndpoint,
experimentalMounterPath,
experimentalKernelMemcgNotification,
experimentalCheckNodeCapabilitiesBeforeMount,
experimentalNodeAllocatableIgnoreEvictionThreshold,
minimumGCAge,
maxPerPodContainerCount,
maxContainerCount,
masterServiceNamespace,
registerSchedulable,
nonMasqueradeCIDR,
keepTerminatedPodVolumes,
nodeLabels,
seccompProfileRoot,
bootstrapCheckpointPath,
nodeStatusMaxImages)
if err != nil {
return nil, err
}
// Emit an event telling the apiserver that the kubelet has started.
k.BirthCry()
// Start the container/image garbage-collection loops.
k.StartGarbageCollection()
return k, nil
}
// NewMainKubelet assembles the Kubelet object: it wires up the pod config
// sources, apiserver-backed caches, eviction policy, and every internal
// manager (pods, runtime, PLEG, GC, images, status, probes, volumes, ...).
func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,...){
...
if kubeDeps.PodConfig == nil {
var err error
// Initialize makePodSourceConfig, which watches the pod metadata
// sources (FILE, URL, apiserver) and merges pod configuration from
// the different sources into a single structure.
kubeDeps.PodConfig, err = makePodSourceConfig(kubeCfg, kubeDeps, nodeName, bootstrapCheckpointPath)
if err != nil {
return nil, err
}
}
// Kubelet serving port, default 10250.
daemonEndpoints := &v1.NodeDaemonEndpoints{
KubeletEndpoint: v1.DaemonEndpoint{Port: kubeCfg.Port},
}
// Use a reflector to keep the services obtained via ListWatch synced
// into the service store in real time.
serviceIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
if kubeDeps.KubeClient != nil {
serviceLW := cache.NewListWatchFromClient(kubeDeps.KubeClient.CoreV1().RESTClient(), "services", metav1.NamespaceAll, fields.Everything())
r := cache.NewReflector(serviceLW, &v1.Service{}, serviceIndexer, 0)
go r.Run(wait.NeverStop)
}
serviceLister := corelisters.NewServiceLister(serviceIndexer)
// Use a reflector to keep this node's object obtained via ListWatch
// synced into the node store in real time.
nodeIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
if kubeDeps.KubeClient != nil {
fieldSelector := fields.Set{api.ObjectNameField: string(nodeName)}.AsSelector()
nodeLW := cache.NewListWatchFromClient(kubeDeps.KubeClient.CoreV1().RESTClient(), "nodes", metav1.NamespaceAll, fieldSelector)
r := cache.NewReflector(nodeLW, &v1.Node{}, nodeIndexer, 0)
go r.Run(wait.NeverStop)
}
nodeInfo := &predicates.CachedNodeInfo{NodeLister: corelisters.NewNodeLister(nodeIndexer)}
...
// Configure the eviction policy applied when node resources run low.
thresholds, err := eviction.ParseThresholdConfig(enforceNodeAllocatable, kubeCfg.EvictionHard, kubeCfg.EvictionSoft, kubeCfg.EvictionSoftGracePeriod, kubeCfg.EvictionMinimumReclaim)
if err != nil {
return nil, err
}
evictionConfig := eviction.Config{
PressureTransitionPeriod: kubeCfg.EvictionPressureTransitionPeriod.Duration,
MaxPodGracePeriodSeconds: int64(kubeCfg.EvictionMaxPodGracePeriod),
Thresholds: thresholds,
KernelMemcgNotification: experimentalKernelMemcgNotification,
PodCgroupRoot: kubeDeps.ContainerManager.GetPodCgroupRoot(),
}
...
// Manages references to containers.
containerRefManager := kubecontainer.NewRefManager()
// OOM monitoring.
oomWatcher := NewOOMWatcher(kubeDeps.CAdvisorInterface, kubeDeps.Recorder)
// Create the Kubelet instance from the configuration and the objects
// built above.
klet := &Kubelet{
hostname: hostname,
hostnameOverridden: len(hostnameOverride) > 0,
nodeName: nodeName,
...
}
// Fetch the current machine's info from cAdvisor.
machineInfo, err := klet.cadvisor.MachineInfo()
// podManager handles pod bookkeeping (add/delete/update, etc.).
klet.podManager = kubepod.NewBasicPodManager(kubepod.NewBasicMirrorClient(klet.kubeClient), secretManager, configMapManager, checkpointManager)
// Container runtime management.
runtime, err := kuberuntime.NewKubeGenericRuntimeManager(...)
// pleg: the pod lifecycle event generator.
klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, plegChannelCapacity, plegRelistPeriod, klet.podCache, clock.RealClock{})
// Create the containerGC object, which periodically cleans up containers.
containerGC, err := kubecontainer.NewContainerGC(klet.containerRuntime, containerGCPolicy, klet.sourcesReady)
// Create the imageManager, which manages (and garbage-collects) images.
imageManager, err := images.NewImageGCManager(klet.containerRuntime, klet.StatsProvider, kubeDeps.Recorder, nodeRef, imageGCPolicy, crOptions.PodSandboxImage)
// statusManager watches pod status on this node and syncs it to the
// corresponding pod objects in the apiserver.
klet.statusManager = status.NewManager(klet.kubeClient, klet.podManager, klet)
// Probe (liveness/readiness) management.
klet.probeManager = prober.NewManager(...)
// Service-account token management.
tokenManager := token.NewManager(kubeDeps.KubeClient)
// Volume management.
klet.volumeManager = volumemanager.NewVolumeManager()
// Inject syncPod() into the pod workers.
klet.podWorkers = newPodWorkers(klet.syncPod, kubeDeps.Recorder, klet.workQueue, klet.resyncInterval, backOffPeriod, klet.podCache)
// Eviction manager enforces the container eviction policy.
evictionManager, evictionAdmitHandler := eviction.NewManager(klet.resourceAnalyzer, evictionConfig, killPodNow(klet.podWorkers, kubeDeps.Recorder), klet.imageManager, klet.containerGC, kubeDeps.Recorder, nodeRef, klet.clock)
...
}
RunKubelet 最后會調(diào)用 startKubelet() 進(jìn)行后續(xù)的操作。
5、啟動 kubelet 內(nèi)部的模塊及服務(wù)(cmd/kubelet/app/server.go)
startKubelet() 的主要功能:
- 1、以 goroutine 方式啟動 kubelet 中的各個(gè)模塊。
- 2、啟動 kubelet http server。
// startKubelet launches the kubelet's internal modules and its HTTP servers.
func startKubelet(k kubelet.Bootstrap, podCfg *config.PodConfig, kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *kubelet.Dependencies, enableServer bool) {
go wait.Until(func() {
// Start each of the kubelet's modules in its own goroutine.
k.Run(podCfg.Updates())
}, 0, wait.NeverStop)
// Start the kubelet HTTP server.
if enableServer {
go k.ListenAndServe(net.ParseIP(kubeCfg.Address), uint(kubeCfg.Port), kubeDeps.TLSOptions, kubeDeps.Auth, kubeCfg.EnableDebuggingHandlers, kubeCfg.EnableContentionProfiling)
}
// Optionally expose the read-only (unauthenticated) port as well.
if kubeCfg.ReadOnlyPort > 0 {
go k.ListenAndServeReadOnly(net.ParseIP(kubeCfg.Address), uint(kubeCfg.ReadOnlyPort))
}
}
// Run starts the kubelet reacting to config updates
func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
// Serve /logs/ from the host's /var/log directory by default.
if kl.logServer == nil {
kl.logServer = http.StripPrefix("/logs/", http.FileServer(http.Dir("/var/log/")))
}
if kl.kubeClient == nil {
glog.Warning("No api server defined - no node status update will be sent.")
}
// Start the cloud provider sync manager
if kl.cloudResourceSyncManager != nil {
go kl.cloudResourceSyncManager.Run(wait.NeverStop)
}
// Initialize internal modules that don't need the container runtime;
// a failure here is fatal.
if err := kl.initializeModules(); err != nil {
kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.KubeletSetupFailed, err.Error())
glog.Fatal(err)
}
// Start volume manager
go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop)
if kl.kubeClient != nil {
// Start syncing node status immediately, this may set up things the runtime needs to run.
go wait.Until(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, wait.NeverStop)
go kl.fastStatusUpdateOnce()
// start syncing lease
if utilfeature.DefaultFeatureGate.Enabled(features.NodeLease) {
go kl.nodeLeaseController.Run(wait.NeverStop)
}
}
// Periodically check that the container runtime is up.
go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop)
// Start loop to sync iptables util rules
if kl.makeIPTablesUtilChains {
go wait.Until(kl.syncNetworkUtil, 1*time.Minute, wait.NeverStop)
}
// Start a goroutine responsible for killing pods (that are not properly
// handled by pod workers).
go wait.Until(kl.podKiller, 1*time.Second, wait.NeverStop)
// Start component sync loops.
kl.statusManager.Start()
kl.probeManager.Start()
// Start syncing RuntimeClasses if enabled.
if kl.runtimeClassManager != nil {
go kl.runtimeClassManager.Run(wait.NeverStop)
}
// Start the pod lifecycle event generator.
kl.pleg.Start()
// syncLoop is the kubelet's main loop; it does not return.
kl.syncLoop(updates, kl)
}
syncLoop 是 kubelet 的主循環(huán)方法，它從不同的管道(FILE, URL, API-SERVER)監(jiān)聽 pod 的變化，並把它們匯聚起來。當(dāng)有新的變化發(fā)生時(shí)，它會調(diào)用對應(yīng)的函數(shù)，保證 Pod 處於期望的狀態(tài)。
// syncLoop is the kubelet's main loop: it merges pod changes from every
// source (file, URL, apiserver) and drives pods toward their desired
// state, backing off exponentially while the runtime reports errors.
func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
glog.Info("Starting kubelet main sync loop.")
// syncTicker wakes every second to check whether any pod workers need
// to be synced.
syncTicker := time.NewTicker(time.Second)
defer syncTicker.Stop()
housekeepingTicker := time.NewTicker(housekeepingPeriod)
defer housekeepingTicker.Stop()
plegCh := kl.pleg.Watch()
const (
base = 100 * time.Millisecond
max = 5 * time.Second
factor = 2
)
duration := base
for {
if rs := kl.runtimeState.runtimeErrors(); len(rs) != 0 {
glog.Infof("skipping pod synchronization - %v", rs)
// exponential backoff
time.Sleep(duration)
duration = time.Duration(math.Min(float64(max), factor*float64(duration)))
continue
}
// reset backoff if we have a success
duration = base
kl.syncLoopMonitor.Store(kl.clock.Now())
// Process one batch of updates; a false return ends the loop.
if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
break
}
kl.syncLoopMonitor.Store(kl.clock.Now())
}
}
syncLoopIteration() 方法對多個(gè)管道進(jìn)行遍歷，如果 pod 發(fā)生變化，則會調(diào)用相應(yīng)的 Handler，在 Handler 中通過調(diào)用 dispatchWork 分發(fā)任務(wù)。
總結(jié)
本篇文章主要講述了 kubelet 組件從加載配置到初始化內(nèi)部的各個(gè)模塊再到啟動 kubelet 服務(wù)的整個(gè)流程，上面的時(shí)序圖能清楚的看到函數(shù)之間的調(diào)用關(guān)係，但是其中每個(gè)組件具體的工作方式以及組件之間的交互方式還不得而知，後面會一探究竟。
參考:
kubernetes node components – kubelet
Kubelet 源碼分析(一):啟動流程分析
kubelet 源碼分析:啟動流程
kubernetes 的 kubelet 的工作過程
kubelet 內(nèi)部實(shí)現(xiàn)解析