上篇文章介紹了 kubelet 的啟動流程担忧,本篇文章主要介紹 kubelet 創(chuàng)建 pod 的流程熟嫩。
kubernetes 版本: v1.12
kubelet 的工作核心就是在圍繞著不同的生產(chǎn)者生產(chǎn)出來的不同的有關 pod 的消息來調用相應的消費者(不同的子模塊)完成不同的行為(創(chuàng)建和刪除 pod 等),即圖中的控制循環(huán)(SyncLoop),通過不同的事件驅動這個控制循環(huán)運行。
本文僅分析新建 pod 的流程消玄,當一個 pod 完成調度,與一個 node 綁定起來之后丢胚,這個 pod 就會觸發(fā) kubelet 在循環(huán)控制里注冊的 handler翩瓜,上圖中的 HandlePods 部分。此時携龟,通過檢查 pod 在 kubelet 內存中的狀態(tài)兔跌,kubelet 就能判斷出這是一個新調度過來的 pod,從而觸發(fā) Handler 里的 ADD 事件對應的邏輯處理峡蟋。然后 kubelet 會為這個 pod 生成對應的 podStatus坟桅,接著檢查 pod 所聲明的 volume 是不是準備好了,然后調用下層的容器運行時蕊蝗。如果是 update 事件的話仅乓,kubelet 就會根據(jù) pod 對象具體的變更情況,調用下層的容器運行時進行容器的重建蓬戚。
kubelet 創(chuàng)建 pod 的流程
1夸楣、kubelet 的控制循環(huán)(syncLoop)
syncLoop 中首先定義了一個 syncTicker 和 housekeepingTicker,即使沒有需要更新的 pod 配置碌更,kubelet 也會定時去做同步和清理 pod 的工作裕偿。然后在 for 循環(huán)中一直調用 syncLoopIteration洞慎,如果在每次循環(huán)過程中出現(xiàn)比較嚴重的錯誤痛单,kubelet 會記錄到 runtimeState 中,遇到錯誤就等待 5 秒中繼續(xù)循環(huán)劲腿。
func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
glog.Info("Starting kubelet main sync loop.")
// syncTicker 每秒檢測一次是否有需要同步的 pod workers
syncTicker := time.NewTicker(time.Second)
defer syncTicker.Stop()
// 每兩秒檢測一次是否有需要清理的 pod
housekeepingTicker := time.NewTicker(housekeepingPeriod)
defer housekeepingTicker.Stop()
// pod 的生命周期變化
plegCh := kl.pleg.Watch()
const (
base = 100 * time.Millisecond
max = 5 * time.Second
factor = 2
)
duration := base
for {
if rs := kl.runtimeState.runtimeErrors(); len(rs) != 0 {
time.Sleep(duration)
duration = time.Duration(math.Min(float64(max), factor*float64(duration)))
continue
}
...
kl.syncLoopMonitor.Store(kl.clock.Now())
// 第二個參數(shù)為 SyncHandler 類型旭绒,SyncHandler 是一個 interface,
// 在該文件開頭處定義
if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
break
}
kl.syncLoopMonitor.Store(kl.clock.Now())
}
}
2、監(jiān)聽 pod 變化(syncLoopIteration)
syncLoopIteration 這個方法就會對多個管道進行遍歷挥吵,發(fā)現(xiàn)任何一個管道有消息就交給 handler 去處理重父。它會從以下管道中獲取消息:
- configCh:該信息源由 kubeDeps 對象中的 PodConfig 子模塊提供,該模塊將同時 watch 3 個不同來源的 pod 信息的變化(file忽匈,http房午,apiserver),一旦某個來源的 pod 信息發(fā)生了更新(創(chuàng)建/更新/刪除)丹允,這個 channel 中就會出現(xiàn)被更新的 pod 信息和更新的具體操作郭厌。
- syncCh:定時器管道,每隔一秒去同步最新保存的 pod 狀態(tài)
- houseKeepingCh:housekeeping 事件的管道雕蔽,做 pod 清理工作
- plegCh:該信息源由 kubelet 對象中的 pleg 子模塊提供折柠,該模塊主要用于周期性地向 container runtime 查詢當前所有容器的狀態(tài),如果狀態(tài)發(fā)生變化批狐,則這個 channel 產(chǎn)生事件扇售。
- livenessManager.Updates():健康檢查發(fā)現(xiàn)某個 pod 不可用,kubelet 將根據(jù) Pod 的restartPolicy 自動執(zhí)行正確的操作
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
select {
case u, open := <-configCh:
if !open {
glog.Errorf("Update channel is closed. Exiting the sync loop.")
return false
}
switch u.Op {
case kubetypes.ADD:
...
case kubetypes.UPDATE:
...
case kubetypes.REMOVE:
...
case kubetypes.RECONCILE:
...
case kubetypes.DELETE:
...
case kubetypes.RESTORE:
...
case kubetypes.SET:
...
}
...
case e := <-plegCh:
...
case <-syncCh:
...
case update := <-kl.livenessManager.Updates():
...
case <-housekeepingCh:
...
}
return true
}
3嚣艇、處理新增 pod(HandlePodAddtions)
對于事件中的每個 pod承冰,執(zhí)行以下操作:
- 1、把所有的 pod 按照創(chuàng)建日期進行排序食零,保證最先創(chuàng)建的 pod 會最先被處理
- 2巷懈、把它加入到 podManager 中,podManager 子模塊負責管理這臺機器上的 pod 的信息慌洪,pod 和 mirrorPod 之間的對應關系等等顶燕。所有被管理的 pod 都要出現(xiàn)在里面,如果 podManager 中找不到某個 pod冈爹,就認為這個 pod 被刪除了
- 3涌攻、如果是 mirror pod 調用其單獨的方法
- 4、驗證 pod 是否能在該節(jié)點運行频伤,如果不可以直接拒絕
- 5恳谎、通過 dispatchWork 把創(chuàng)建 pod 的工作下發(fā)給 podWorkers 子模塊做異步處理
- 6、在 probeManager 中添加 pod憋肖,如果 pod 中定義了 readiness 和 liveness 健康檢查因痛,啟動 goroutine 定期進行檢測
func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
start := kl.clock.Now()
// 對所有 pod 按照日期排序,保證最先創(chuàng)建的 pod 優(yōu)先被處理
sort.Sort(sliceutils.PodsByCreationTime(pods))
for _, pod := range pods {
if kl.dnsConfigurer != nil && kl.dnsConfigurer.ResolverConfig != "" {
kl.dnsConfigurer.CheckLimitsForResolvConf()
}
existingPods := kl.podManager.GetPods()
// 把 pod 加入到 podManager 中
kl.podManager.AddPod(pod)
// 判斷是否是 mirror pod(即 static pod)
if kubepod.IsMirrorPod(pod) {
kl.handleMirrorPod(pod, start)
continue
}
if !kl.podIsTerminated(pod) {
activePods := kl.filterOutTerminatedPods(existingPods)
// 通過 canAdmitPod 方法校驗Pod能否在該計算節(jié)點創(chuàng)建(如:磁盤空間)
// Check if we can admit the pod; if not, reject it.
if ok, reason, message := kl.canAdmitPod(activePods, pod); !ok {
kl.rejectPod(pod, reason, message)
continue
}
}
mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
// 通過 dispatchWork 分發(fā) pod 做異步處理岸更,dispatchWork 主要工作就是把接收到的參數(shù)封裝成 UpdatePodOptions鸵膏,調用 UpdatePod 方法.
kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)
// 在 probeManager 中添加 pod,如果 pod 中定義了 readiness 和 liveness 健康檢查怎炊,啟動 goroutine 定期進行檢測
kl.probeManager.AddPod(pod)
}
}
static pod 是由 kubelet 直接管理的谭企,k8s apiserver 并不會感知到 static pod 的存在廓译,當然也不會和任何一個 rs 關聯(lián)上,完全是由 kubelet 進程來監(jiān)管债查,并在它異常時負責重啟非区。Kubelet 會通過 apiserver 為每一個 static pod 創(chuàng)建一個對應的 mirror pod,如此以來就可以可以通過 kubectl 命令查看對應的 pod,并且可以通過 kubectl logs 命令直接查看到static pod 的日志信息盹廷。
4征绸、下發(fā)任務(dispatchWork)
dispatchWorker 的主要作用是把某個對 Pod 的操作(創(chuàng)建/更新/刪除)下發(fā)給 podWorkers。
func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) {
if kl.podIsTerminated(pod) {
if pod.DeletionTimestamp != nil {
kl.statusManager.TerminatePod(pod)
}
return
}
// 落實在 podWorkers 中
kl.podWorkers.UpdatePod(&UpdatePodOptions{
Pod: pod,
MirrorPod: mirrorPod,
UpdateType: syncType,
OnCompleteFunc: func(err error) {
if err != nil {
metrics.PodWorkerLatency.WithLabelValues(syncType.String()).Observe(metrics.SinceInMicroseconds(start))
}
},
})
if syncType == kubetypes.SyncPodCreate {
metrics.ContainersPerPodCount.Observe(float64(len(pod.Spec.Containers)))
}
}
5俄占、更新事件的 channel(UpdatePod)
podWorkers 子模塊主要的作用就是處理針對每一個的 Pod 的更新事件歹垫,比如 Pod 的創(chuàng)建,刪除颠放,更新排惨。而 podWorkers 采取的基本思路是:為每一個 Pod 都單獨創(chuàng)建一個 goroutine 和更新事件的 channel,goroutine 會阻塞式的等待 channel 中的事件碰凶,并且對獲取的事件進行處理暮芭。而 podWorkers 對象自身則主要負責對更新事件進行下發(fā)。
func (p *podWorkers) UpdatePod(options *UpdatePodOptions) {
pod := options.Pod
uid := pod.UID
var podUpdates chan UpdatePodOptions
var exists bool
p.podLock.Lock()
defer p.podLock.Unlock()
// 如果當前 pod 還沒有啟動過 goroutine 欲低,則啟動 goroutine辕宏,并且創(chuàng)建 channel
if podUpdates, exists = p.podUpdates[uid]; !exists {
// 創(chuàng)建 channel
podUpdates = make(chan UpdatePodOptions, 1)
p.podUpdates[uid] = podUpdates
// 啟動 goroutine
go func() {
defer runtime.HandleCrash()
p.managePodLoop(podUpdates)
}()
}
// 下發(fā)更新事件
if !p.isWorking[pod.UID] {
p.isWorking[pod.UID] = true
podUpdates <- *options
} else {
update, found := p.lastUndeliveredWorkUpdate[pod.UID]
if !found || update.UpdateType != kubetypes.SyncPodKill {
p.lastUndeliveredWorkUpdate[pod.UID] = *options
}
}
}
6、調用 syncPodFn 方法同步 pod(managePodLoop)
managePodLoop 調用 syncPodFn 方法去同步 pod砾莱,syncPodFn 實際上就是kubelet.SyncPod瑞筐。在完成這次 sync 動作之后,會調用 wrapUp 函數(shù)腊瑟,這個函數(shù)將會做幾件事情:
- 將這個 pod 信息插入 kubelet 的 workQueue 隊列中聚假,等待下一次周期性的對這個 pod 的狀態(tài)進行 sync
- 將在這次 sync 期間堆積的沒有能夠來得及處理的最近一次 update 操作加入 goroutine 的事件 channel 中,立即處理闰非。
func (p *podWorkers) managePodLoop(podUpdates <-chan UpdatePodOptions) {
var lastSyncTime time.Time
for update := range podUpdates {
err := func() error {
podUID := update.Pod.UID
status, err := p.podCache.GetNewerThan(podUID, lastSyncTime)
if err != nil {
...
}
err = p.syncPodFn(syncPodOptions{
mirrorPod: update.MirrorPod,
pod: update.Pod,
podStatus: status,
killPodOptions: update.KillPodOptions,
updateType: update.UpdateType,
})
lastSyncTime = time.Now()
return err
}()
if update.OnCompleteFunc != nil {
update.OnCompleteFunc(err)
}
if err != nil {
...
}
p.wrapUp(update.Pod.UID, err)
}
}
7膘格、完成創(chuàng)建容器前的準備工作(SyncPod)
在這個方法中,主要完成以下幾件事情:
- 如果是刪除 pod财松,立即執(zhí)行并返回
- 同步 podStatus 到 kubelet.statusManager
- 檢查 pod 是否能運行在本節(jié)點瘪贱,主要是權限檢查(是否能使用主機網(wǎng)絡模式,是否可以以 privileged 權限運行等)辆毡。如果沒有權限菜秦,就刪除本地舊的 pod 并返回錯誤信息
- 創(chuàng)建 containerManagar 對象,并且創(chuàng)建 pod level cgroup舶掖,更新 Qos level cgroup
- 如果是 static Pod球昨,就創(chuàng)建或者更新對應的 mirrorPod
- 創(chuàng)建 pod 的數(shù)據(jù)目錄,存放 volume 和 plugin 信息,如果定義了 pv访锻,等待所有的 volume mount 完成(volumeManager 會在后臺做這些事情),如果有 image secrets褪尝,去 apiserver 獲取對應的 secrets 數(shù)據(jù)
- 然后調用 kubelet.volumeManager 組件闹获,等待它將 pod 所需要的所有外掛的 volume 都準備好期犬。
- 調用 container runtime 的 SyncPod 方法河哑,去實現(xiàn)真正的容器創(chuàng)建邏輯
這里所有的事情都和具體的容器沒有關系,可以看到該方法是創(chuàng)建 pod 實體(即容器)之前需要完成的準備工作龟虎。
func (kl *Kubelet) syncPod(o syncPodOptions) error {
// pull out the required options
pod := o.pod
mirrorPod := o.mirrorPod
podStatus := o.podStatus
updateType := o.updateType
// 是否為 刪除 pod
if updateType == kubetypes.SyncPodKill {
...
}
...
// 檢查 pod 是否能運行在本節(jié)點
runnable := kl.canRunPod(pod)
if !runnable.Admit {
...
}
// 更新 pod 狀態(tài)
kl.statusManager.SetPodStatus(pod, apiPodStatus)
// 如果 pod 非 running 狀態(tài)則直接 kill 掉
if !runnable.Admit || pod.DeletionTimestamp != nil || apiPodStatus.Phase == v1.PodFailed {
...
}
// 加載網(wǎng)絡插件
if rs := kl.runtimeState.networkErrors(); len(rs) != 0 && !kubecontainer.IsHostNetworkPod(pod) {
...
}
pcm := kl.containerManager.NewPodContainerManager()
if !kl.podIsTerminated(pod) {
...
// 創(chuàng)建并更新 pod 的 cgroups
if !(podKilled && pod.Spec.RestartPolicy == v1.RestartPolicyNever) {
if !pcm.Exists(pod) {
...
}
}
}
// 為 static pod 創(chuàng)建對應的 mirror pod
if kubepod.IsStaticPod(pod) {
...
}
// 創(chuàng)建數(shù)據(jù)目錄
if err := kl.makePodDataDirs(pod); err != nil {
...
}
// 掛載 volume
if !kl.podIsTerminated(pod) {
if err := kl.volumeManager.WaitForAttachAndMount(pod); err != nil {
...
}
}
// 獲取 secret 信息
pullSecrets := kl.getPullSecretsForPod(pod)
// 調用 containerRuntime 的 SyncPod 方法開始創(chuàng)建容器
result := kl.containerRuntime.SyncPod(pod, apiPodStatus, podStatus, pullSecrets, kl.backOff)
kl.reasonCache.Update(pod.UID, result)
if err := result.Error(); err != nil {
...
}
return nil
}
8璃谨、創(chuàng)建容器
containerRuntime(pkg/kubelet/kuberuntime)子模塊的 SyncPod 函數(shù)才是真正完成 pod 內容器實體的創(chuàng)建。
syncPod 主要執(zhí)行以下幾個操作:
- 1鲤妥、計算 sandbox 和 container 是否發(fā)生變化
- 2佳吞、創(chuàng)建 sandbox 容器
- 3、啟動 init 容器
- 4棉安、啟動業(yè)務容器
initContainers 可以有多個底扳,多個 container 嚴格按照順序啟動,只有當前一個 container 退出了以后贡耽,才開始啟動下一個 container衷模。
func (m *kubeGenericRuntimeManager) SyncPod(pod *v1.Pod, _ v1.PodStatus, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) {
// 1、計算 sandbox 和 container 是否發(fā)生變化
podContainerChanges := m.computePodActions(pod, podStatus)
if podContainerChanges.CreateSandbox {
ref, err := ref.GetReference(legacyscheme.Scheme, pod)
if err != nil {
glog.Errorf("Couldn't make a ref to pod %q: '%v'", format.Pod(pod), err)
}
...
}
// 2蒲赂、kill 掉 sandbox 已經(jīng)改變的 pod
if podContainerChanges.KillPod {
...
} else {
// 3阱冶、kill 掉非 running 狀態(tài)的 containers
...
for containerID, containerInfo := range podContainerChanges.ContainersToKill {
...
if err := m.killContainer(pod, containerID, containerInfo.name, containerInfo.message, nil); err != nil {
...
}
}
}
m.pruneInitContainersBeforeStart(pod, podStatus)
podIP := ""
if podStatus != nil {
podIP = podStatus.IP
}
// 4、創(chuàng)建 sandbox
podSandboxID := podContainerChanges.SandboxID
if podContainerChanges.CreateSandbox {
podSandboxID, msg, err = m.createPodSandbox(pod, podContainerChanges.Attempt)
if err != nil {
...
}
...
podSandboxStatus, err := m.runtimeService.PodSandboxStatus(podSandboxID)
if err != nil {
...
}
// 如果 pod 網(wǎng)絡是 host 模式滥嘴,容器也相同木蹬;其他情況下,容器會使用 None 網(wǎng)絡模式若皱,讓 kubelet 的網(wǎng)絡插件自己進行網(wǎng)絡配置
if !kubecontainer.IsHostNetworkPod(pod) {
podIP = m.determinePodSandboxIP(pod.Namespace, pod.Name, podSandboxStatus)
glog.V(4).Infof("Determined the ip %q for pod %q after sandbox changed", podIP, format.Pod(pod))
}
}
configPodSandboxResult := kubecontainer.NewSyncResult(kubecontainer.ConfigPodSandbox, podSandboxID)
result.AddSyncResult(configPodSandboxResult)
// 獲取 PodSandbox 的配置(如:metadata,clusterDNS,容器的端口映射等)
podSandboxConfig, err := m.generatePodSandboxConfig(pod, podContainerChanges.Attempt)
...
// 5镊叁、啟動 init container
if container := podContainerChanges.NextInitContainerToStart; container != nil {
...
if msg, err := m.startContainer(podSandboxID, podSandboxConfig, container, pod, podStatus, pullSecrets, podIP, kubecontainer.ContainerTypeInit); err != nil {
...
}
}
// 6、啟動業(yè)務容器
for _, idx := range podContainerChanges.ContainersToStart {
...
if msg, err := m.startContainer(podSandboxID, podSandboxConfig, container, pod, podStatus, pullSecrets, podIP, kubecontainer.ContainerTypeRegular); err != nil {
...
}
}
return
}
9走触、啟動容器
最終由 startContainer 完成容器的啟動意系,其主要有以下幾個步驟:
- 1、拉取鏡像
- 2饺汹、生成業(yè)務容器的配置信息
- 3蛔添、調用 docker api 創(chuàng)建容器
- 4、啟動容器
- 5兜辞、執(zhí)行 post start hook
func (m *kubeGenericRuntimeManager) startContainer(podSandboxID string, podSandboxConfig *runtimeapi.PodSandboxConfig, container *v1.Container, pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, podIP string, containerType kubecontainer.ContainerType) (string, error) {
// 1迎瞧、檢查業(yè)務鏡像是否存在,不存在則到 Docker Registry 或是 Private Registry 拉取鏡像逸吵。
imageRef, msg, err := m.imagePuller.EnsureImageExists(pod, container, pullSecrets)
if err != nil {
...
}
ref, err := kubecontainer.GenerateContainerRef(pod, container)
if err != nil {
...
}
// 設置 RestartCount
restartCount := 0
containerStatus := podStatus.FindContainerStatusByName(container.Name)
if containerStatus != nil {
restartCount = containerStatus.RestartCount + 1
}
// 2凶硅、生成業(yè)務容器的配置信息
containerConfig, cleanupAction, err := m.generateContainerConfig(container, pod, restartCount, podIP, imageRef, containerType)
if cleanupAction != nil {
defer cleanupAction()
}
...
// 3、通過 client.CreateContainer 調用 docker api 創(chuàng)建業(yè)務容器
containerID, err := m.runtimeService.CreateContainer(podSandboxID, containerConfig, podSandboxConfig)
if err != nil {
...
}
err = m.internalLifecycle.PreStartContainer(pod, container, containerID)
if err != nil {
...
}
...
// 3扫皱、啟動業(yè)務容器
err = m.runtimeService.StartContainer(containerID)
if err != nil {
...
}
containerMeta := containerConfig.GetMetadata()
sandboxMeta := podSandboxConfig.GetMetadata()
legacySymlink := legacyLogSymlink(containerID, containerMeta.Name, sandboxMeta.Name,
sandboxMeta.Namespace)
containerLog := filepath.Join(podSandboxConfig.LogDirectory, containerConfig.LogPath)
if _, err := m.osInterface.Stat(containerLog); !os.IsNotExist(err) {
if err := m.osInterface.Symlink(containerLog, legacySymlink); err != nil {
glog.Errorf("Failed to create legacy symbolic link %q to container %q log %q: %v",
legacySymlink, containerID, containerLog, err)
}
}
// 4足绅、執(zhí)行 post start hook
if container.Lifecycle != nil && container.Lifecycle.PostStart != nil {
kubeContainerID := kubecontainer.ContainerID{
Type: m.runtimeName,
ID: containerID,
}
// runner.Run 這個方法的主要作用就是在業(yè)務容器起來的時候捷绑,
// 首先會執(zhí)行一個 container hook(PostStart 和 PreStop),做一些預處理工作。
// 只有 container hook 執(zhí)行成功才會運行具體的業(yè)務服務氢妈,否則容器異常粹污。
msg, handlerErr := m.runner.Run(kubeContainerID, pod, container, container.Lifecycle.PostStart)
if handlerErr != nil {
...
}
}
return "", nil
}
總結
本文主要講述了 kubelet 從監(jiān)聽到有容器調度至本節(jié)點再到容器創(chuàng)建的一個過程,kubelet 最終調用 docker api 來創(chuàng)建容器的首量。結合上篇文章壮吩,可以看出 kubelet 從啟動到創(chuàng)建 pod 的一個清晰過程。
參考:
k8s源碼分析-kubelet
Kubelet源碼分析(一):啟動流程分析
kubelet 源碼分析:pod 新建流程
kubelet創(chuàng)建Pod流程解析
Kubelet: Pod Lifecycle Event Generator (PLEG) Design- proposals