Continued from http://www.reibang.com/p/95cd1556966a
The kubelet watches the apiserver for Pod changes; the resulting events are sent to listenChannel.
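The mechanism behind this is the standard watch pattern: the kubelet's apiserver source watches only the Pods scheduled onto its node and pushes each event onto a channel. Below is a minimal, self-contained sketch of that pattern using plain client-go; the nodeName, the kubeconfig-based clientset, and the channel wiring are illustrative assumptions, not the kubelet's actual source code.

package main

import (
    "context"
    "fmt"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/clientcmd"
)

func main() {
    // build a clientset from the local kubeconfig (assumption: running outside the cluster)
    config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
    if err != nil {
        panic(err)
    }
    clientset := kubernetes.NewForConfigOrDie(config)

    nodeName := "node-1" // hypothetical node name

    // watch only the Pods scheduled onto this node, as the kubelet's apiserver source does
    w, err := clientset.CoreV1().Pods(metav1.NamespaceAll).Watch(context.TODO(), metav1.ListOptions{
        FieldSelector: "spec.nodeName=" + nodeName,
    })
    if err != nil {
        panic(err)
    }

    // forward every watch event onto a channel, analogous to listenChannel
    listenChannel := make(chan interface{}, 16)
    go func() {
        for ev := range w.ResultChan() {
            listenChannel <- ev
        }
        close(listenChannel)
    }()

    for update := range listenChannel {
        fmt.Printf("got update: %T\n", update)
    }
}

Back inside the kubelet, the receiving side of such a channel is the Mux's listen method: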
//pkg/util/config/config.go
func (m *Mux) listen(source string, listenChannel <-chan interface{}) {
    // iterate over the events arriving from this source
    for update := range listenChannel {
        m.merger.Merge(source, update)
    }
}
The listen method ranges over the channel and calls Merge to continuously process the Pod change events.
//pkg/kubelet/config/config.go
func (s *podStorage) Merge(source string, change interface{}) error {
    // group the change into adds, updates, deletes, removes and reconciles
    adds, updates, deletes, removes, reconciles := s.merge(source, change)
    switch s.mode {
    case PodConfigNotificationIncremental:
        s.updates <- *adds
        ......
    }
    ......
}
The merge method compares the change against the Pod information held in memory and groups it into the following categories (a rough classification sketch follows the constants below):
//pkg/kubelet/types/pod_update.go
const (
    // SET is the current pod configuration.
    SET PodOperation = iota
    // ADD signifies pods that are new to this source.
    ADD
    // DELETE signifies pods that are gracefully deleted from this source.
    DELETE
    // REMOVE signifies pods that have been removed from this source.
    REMOVE
    // UPDATE signifies pods have been updated in this source.
    UPDATE
    // RECONCILE signifies pods that have unexpected status in this source,
    // kubelet should reconcile status with this source.
    RECONCILE
)
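As a rough illustration of that grouping, here is a hypothetical helper (not the real podStorage.merge, which also derives DELETE/REMOVE/RECONCILE and keeps per-source bookkeeping) that splits incoming pods into adds and updates against the pods already known for a source:

import (
    v1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/types"
)

// classify compares incoming pods with the pods already recorded for a source
// (keyed by UID): unknown UIDs become adds, known UIDs become updates.
func classify(known map[types.UID]*v1.Pod, incoming []*v1.Pod) (adds, updates []*v1.Pod) {
    for _, p := range incoming {
        if _, ok := known[p.UID]; ok {
            updates = append(updates, p)
        } else {
            adds = append(adds, p)
        }
        known[p.UID] = p
    }
    return adds, updates
}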
s.mode is currently PodConfigNotificationIncremental, so s.updates <- *adds sends the Pods to be added onto the updates chan.
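For reference, the value carried on that channel is a kubetypes.PodUpdate, which looks roughly like this (field set may differ slightly between versions):

//pkg/kubelet/types/pod_update.go (roughly)
type PodUpdate struct {
    Pods   []*v1.Pod
    Op     PodOperation
    Source string
}

The receiving end of the updates chan is syncLoopIteration: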
//pkg/kubelet/kubelet.go
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler, ......) bool {
    select {
    case u, open := <-configCh:
        ......
        switch u.Op {
        case kubetypes.ADD:
            handler.HandlePodAdditions(u.Pods)
        case kubetypes.UPDATE:
            ......
        }
    ......
    }
    return true
}
configCh is exactly the updates chan; for an ADD event the flow continues to handler.HandlePodAdditions(u.Pods).
//pkg/kubelet/kubelet.go
func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
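    // (abridged) the full method also sorts the pods by creation time, registers each
    // pod with the pod manager, and runs admission checks before dispatching; per pod: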
    kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)
}
dispatchWork dispatches the work:
//pkg/kubelet/kubelet.go
// dispatchWork starts the asynchronous sync of the pod in a pod worker.
// If the pod has completed termination, dispatchWork will perform no action.
func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) {
    // Run the sync in an async worker.
    // hand off to podWorkers for processing
    kl.podWorkers.UpdatePod(UpdatePodOptions{
        Pod:        pod,
        MirrorPod:  mirrorPod,
        UpdateType: syncType,
        StartTime:  start,
    })
    // Note the number of containers for new pods.
    if syncType == kubetypes.SyncPodCreate {
        metrics.ContainersPerPodCount.Observe(float64(len(pod.Spec.Containers)))
    }
}
podWorkers.UpdatePod is called with an UpdatePodOptions as its argument.
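UpdatePodOptions looks roughly like this (abridged; fields such as RunningPod and KillPodOptions are omitted here):

//pkg/kubelet/pod_workers.go (roughly)
type UpdatePodOptions struct {
    // the kind of sync being requested (create, update, kill, ...)
    UpdateType kubetypes.SyncPodType
    // the pod to sync and, for static pods, its mirror pod
    Pod       *v1.Pod
    MirrorPod *v1.Pod
    // when the request was first observed, used for latency metrics
    StartTime time.Time
}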
//pkg/kubelet/pod_workers.go
func (p *podWorkers) UpdatePod(options UpdatePodOptions) {
    pod := options.Pod
    uid := pod.UID
    ......
    now := time.Now()
    // look up the current sync status of this pod
    status, ok := p.podSyncStatuses[uid]
    if !ok {
        // not found, so this is the first time the pod is being synced
        klog.V(4).InfoS("Pod is being synced for the first time", "pod", klog.KObj(pod), "podUID", pod.UID)
        status = &podSyncStatus{
            syncedAt: now,
            fullname: kubecontainer.GetPodFullName(pod),
        }
        // if this pod is being synced for the first time, we need to make sure it is an active pod
        // i.e. if its phase is already terminal (PodFailed or PodSucceeded), record it as terminated instead
        if !isRuntimePod && (pod.Status.Phase == v1.PodFailed || pod.Status.Phase == v1.PodSucceeded) {
            // check to see if the pod is not running and the pod is terminal.
            // If this succeeds then record in the podWorker that it is terminated.
            if statusCache, err := p.podCache.Get(pod.UID); err == nil {
                if isPodStatusCacheTerminal(statusCache) {
                    // mark the pod as already terminated
                    status = &podSyncStatus{
                        terminatedAt:       now,
                        terminatingAt:      now,
                        syncedAt:           now,
                        startedTerminating: true,
                        finished:           true,
                        fullname:           kubecontainer.GetPodFullName(pod),
                    }
                }
            }
        }
        // store the status
        p.podSyncStatuses[uid] = status
    }
    if status.IsTerminationRequested() {
        if options.UpdateType == kubetypes.SyncPodCreate {
            // creation was requested while the pod is still terminating;
            // mark restartRequested so it can be reconciled later
            status.restartRequested = true
            klog.V(4).InfoS("Pod is terminating but has been requested to restart with same UID, will be reconciled later", "pod", klog.KObj(pod), "podUID", pod.UID)
            return
        }
    }
    // once a pod is terminated by UID, it cannot reenter the pod worker (until the UID is purged by housekeeping)
    // the pod has already finished; nothing more to do
    if status.IsFinished() {
        klog.V(4).InfoS("Pod is finished processing, no further updates", "pod", klog.KObj(pod), "podUID", pod.UID)
        return
    }
    // the type of work to perform
    workType = SyncPodWork
    ......
    // the desired work we want to be performing
    work := podWork{
        WorkType: workType,
        Options:  options,
    }
    // start the per-pod worker goroutine if it is not running yet
    podUpdates, exists := p.podUpdates[uid]
    if !exists {
        ......
        go func() {
            defer runtime.HandleCrash()
            p.managePodLoop(podUpdates)
        }()
    }
    // dispatch a request to the pod worker if none are running
    // the pod is not currently being synced, so hand it this work item
    if !status.IsWorking() {
        status.working = true
        podUpdates <- work
        return
    }
}
managePodLoop keeps ranging over the podUpdates chan and handles each podWork item by calling syncPodFn (a generic sketch of this per-pod worker pattern follows the excerpt below).
//pkg/kubelet/pod_workers.go
func (p *podWorkers) managePodLoop(podUpdates <-chan podWork) {
    for update := range podUpdates {
        ......
        // wait for a pod status in the cache that is newer than lastSyncTime
        status, err = p.podCache.GetNewerThan(pod.UID, lastSyncTime)
        // syncPodFn is the /pkg/kubelet/kubelet.go#syncPod function
        err = p.syncPodFn(ctx, update.Options.UpdateType, pod, update.Options.MirrorPod, status)
        // mark this work item complete; on error the pod is requeued with backoff
        p.completeWork(pod, err)
    }
}
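Stepping back, podWorkers is an instance of a common Go pattern: one serialized worker goroutine per key (here the pod UID), each fed by its own channel. A stripped-down, self-contained sketch of that pattern, with hypothetical names rather than the kubelet's types:

package main

import (
    "fmt"
    "sync"
    "time"
)

type work struct{ msg string }

type workers struct {
    mu      sync.Mutex
    updates map[string]chan work // one channel per pod UID
}

func (w *workers) UpdatePod(uid string, item work) {
    w.mu.Lock()
    ch, ok := w.updates[uid]
    if !ok {
        // first update for this UID: create its channel and start its worker goroutine
        ch = make(chan work, 1)
        w.updates[uid] = ch
        go func() {
            for item := range ch { // items for one UID are processed strictly in order
                fmt.Println(uid, "->", item.msg)
            }
        }()
    }
    w.mu.Unlock()
    ch <- item
}

func main() {
    w := &workers{updates: map[string]chan work{}}
    w.UpdatePod("pod-a", work{"create"})
    w.UpdatePod("pod-a", work{"update"})
    w.UpdatePod("pod-b", work{"create"})
    time.Sleep(100 * time.Millisecond) // let the workers drain (demo only)
}

Because each UID has exactly one goroutine draining its channel, updates for the same pod are handled one at a time and in order, while different pods proceed in parallel.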
syncPodFn is in fact the syncPod function in pkg/kubelet/kubelet.go:
//pkg/kubelet/kubelet.go
func (kl *Kubelet) syncPod(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) error {
    // check whether the pod is allowed to run on this node
    runnable := kl.canRunPod(pod)
    ......
    // update the pod status; handled by pkg/kubelet/status/status_manager.go
    kl.statusManager.SetPodStatus(pod, apiPodStatus)
    ......
    // create the pod data directories:
    //   /var/lib/kubelet/pods/{podUID}
    //   /var/lib/kubelet/pods/{podUID}/volumes
    //   /var/lib/kubelet/pods/{podUID}/plugins
    if err := kl.makePodDataDirs(pod); err != nil {
        ......
    }
    // get the secrets needed to pull the pod's images
    pullSecrets := kl.getPullSecretsForPod(pod)
    result := kl.containerRuntime.SyncPod(pod, podStatus, pullSecrets, kl.backOff)
    ......
}
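The data directories mentioned in the comments above boil down to a few MkdirAll calls; a hypothetical helper (makePodDirs is an illustrative name, not the real makePodDataDirs) sketching that layout:

import (
    "os"
    "path/filepath"
)

// makePodDirs creates {rootDir}/pods/{podUID} plus its volumes and plugins
// subdirectories, mirroring the layout described above (sketch only).
func makePodDirs(rootDir, podUID string) error {
    base := filepath.Join(rootDir, "pods", podUID)
    for _, dir := range []string{base, filepath.Join(base, "volumes"), filepath.Join(base, "plugins")} {
        if err := os.MkdirAll(dir, 0750); err != nil {
            return err
        }
    }
    return nil
}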
syncPod updates the Pod status through the statusManager, creates the pod directories, and then calls SyncPod in pkg/kubelet/kuberuntime/kuberuntime_manager.go. The official comments on that method already spell out its seven steps:
//pkg/kubelet/kuberuntime/kuberuntime_manager.go
// 1. Compute sandbox and container changes.
// 2. Kill pod sandbox if necessary.
// 3. Kill any containers that should not be running.
// 4. Create sandbox if necessary.
// 5. Create ephemeral containers.
// 6. Create init containers.
// 7. Create normal containers.
func (m *kubeGenericRuntimeManager) SyncPod(pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) {
    // Step 1
    ......
}
- Compute sandbox and container changes.
// Step 1: Compute sandbox and container changes.
podContainerChanges := m.computePodActions(pod, podStatus)
klog.V(3).InfoS("computePodActions got for pod", "podActions", podContainerChanges, "pod", klog.KObj(pod))
if podContainerChanges.CreateSandbox {
    ref, err := ref.GetReference(legacyscheme.Scheme, pod)
    if err != nil {
        klog.ErrorS(err, "Couldn't make a ref to pod", "pod", klog.KObj(pod))
    }
    if podContainerChanges.SandboxID != "" {
        // the sandbox has changed: the pod must be killed and re-created
        m.recorder.Eventf(ref, v1.EventTypeNormal, events.SandboxChanged, "Pod sandbox changed, it will be killed and re-created.")
    } else {
        // a brand new pod: a sandbox needs to be created for it
        klog.V(4).InfoS("SyncPod received new pod, will create a sandbox for it", "pod", klog.KObj(pod))
    }
}
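For reference, the podActions value returned by computePodActions carries roughly the following fields (abridged; the exact set varies by version):

//pkg/kubelet/kuberuntime/kuberuntime_manager.go (roughly)
type podActions struct {
    KillPod                    bool   // whether the whole pod (sandbox) must be killed
    CreateSandbox              bool   // whether a new sandbox must be created
    SandboxID                  string // ID of the existing sandbox, if any
    Attempt                    uint32 // sandbox attempt number
    NextInitContainerToStart   *v1.Container
    ContainersToStart          []int // indexes into pod.Spec.Containers
    ContainersToKill           map[kubecontainer.ContainerID]containerToKillInfo
    EphemeralContainersToStart []int // indexes into pod.Spec.EphemeralContainers
}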
- Kill pod sandbox if necessary.
- Kill any containers that should not be running.
if podContainerChanges.KillPod {
    // Step 2: Kill the pod if the sandbox has changed.
    if podContainerChanges.CreateSandbox {
        klog.V(4).InfoS("Stopping PodSandbox for pod, will start new one", "pod", klog.KObj(pod))
    } else {
        klog.V(4).InfoS("Stopping PodSandbox for pod, because all other containers are dead", "pod", klog.KObj(pod))
    }
    // kill the pod and its containers through the CRI, which maps to the StopPodSandbox and StopContainer calls
    killResult := m.killPodWithSyncResult(pod, kubecontainer.ConvertPodStatusToRunningPod(m.runtimeName, podStatus), nil)
    result.AddPodSyncResult(killResult)
    if killResult.Error() != nil {
        klog.ErrorS(killResult.Error(), "killPodWithSyncResult failed")
        return
    }
    if podContainerChanges.CreateSandbox {
        m.purgeInitContainers(pod, podStatus)
    }
} else {
    // Step 3: kill any running containers in this pod which are not to keep.
    ......
}
Step 2 and Step 3 are mutually exclusive: either the whole pod is killed, or only the containers that should no longer be running.
- Create sandbox if necessary.
// Step 4: Create a sandbox for the pod if necessary.
podSandboxID, msg, err = m.createPodSandbox(pod, podContainerChanges.Attempt)
Steps 5, 6, and 7, shown after the createPodSandbox excerpt below, start the ephemeral containers, the init containers, and the regular containers respectively. createPodSandbox itself does the following:
//pkg/kubelet/kuberuntime/kuberuntime_sandbox.go
func (m *kubeGenericRuntimeManager) createPodSandbox(pod *v1.Pod, attempt uint32) (string, string, error) {
    ......
    // create the pod log directory /var/log/pods/{podNamespace}_{podName}_{podUID}
    err = m.osInterface.MkdirAll(podSandboxConfig.LogDirectory, 0755)
    // look up the runtime handler (RuntimeClass) to use for this pod
    runtimeHandler, err = m.runtimeClassManager.LookupRuntimeHandler(pod.Spec.RuntimeClassName)
    // create the PodSandbox through the CRI
    podSandBoxID, err := m.runtimeService.RunPodSandbox(podSandboxConfig, runtimeHandler)
    ......
}
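Under the hood, m.runtimeService.RunPodSandbox issues a single CRI gRPC call to the container runtime (containerd, CRI-O, ...). A sketch of that call against the CRI v1alpha2 API; the helper function and its wiring are illustrative, not kubelet code:

import (
    "context"

    runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
)

// runSandbox shows the CRI request behind RunPodSandbox (sketch; error handling trimmed).
func runSandbox(ctx context.Context, client runtimeapi.RuntimeServiceClient, cfg *runtimeapi.PodSandboxConfig, handler string) (string, error) {
    resp, err := client.RunPodSandbox(ctx, &runtimeapi.RunPodSandboxRequest{
        Config:         cfg,     // metadata, hostname, log directory, DNS config, port mappings, ...
        RuntimeHandler: handler, // which RuntimeClass handler the runtime should use
    })
    if err != nil {
        return "", err
    }
    return resp.PodSandboxId, nil // the sandbox ID used for all subsequent container calls
}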
// Step 5: start ephemeral containers
// These are started "prior" to init containers to allow running ephemeral containers even when there
// are errors starting an init container. In practice init containers will start first since ephemeral
// containers cannot be specified on pod creation.
if utilfeature.DefaultFeatureGate.Enabled(features.EphemeralContainers) {
    for _, idx := range podContainerChanges.EphemeralContainersToStart {
        // start the ephemeral container
        start("ephemeral container", metrics.EphemeralContainer, ephemeralContainerStartSpec(&pod.Spec.EphemeralContainers[idx]))
    }
}
// Step 6: start the init container.
if container := podContainerChanges.NextInitContainerToStart; container != nil {
    // Start the next init container.
    if err := start("init container", metrics.InitContainer, containerStartSpec(container)); err != nil {
        return
    }
    // Successfully started the container; clear the entry in the failure
    klog.V(4).InfoS("Completed init container for pod", "containerName", container.Name, "pod", klog.KObj(pod))
}
// Step 7: start containers in podContainerChanges.ContainersToStart.
for _, idx := range podContainerChanges.ContainersToStart {
    // start the regular container
    start("container", metrics.Container, containerStartSpec(&pod.Spec.Containers[idx]))
}
The start helper launches a single container in several steps: pull the image, create the container, start it, and run the post-start lifecycle hook. It is backed by startContainer:
//pkg/kubelet/kuberuntime/kuberuntime_container.go
func (m *kubeGenericRuntimeManager) startContainer(podSandboxID string, podSandboxConfig *runtimeapi.PodSandboxConfig, spec *startSpec, pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, podIP string, podIPs []string) (string, error) {
    // Step 1: pull the image (handled by the imageManager).
    imageRef, msg, err := m.imagePuller.EnsureImageExists(pod, container, pullSecrets, podSandboxConfig)
    // Step 2: create the container, by calling CreateContainer on the CRI.
    containerID, err := m.runtimeService.CreateContainer(podSandboxID, containerConfig, podSandboxConfig)
    // Step 3: start the container, by calling StartContainer on the CRI.
    err = m.runtimeService.StartContainer(containerID)
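    // Step 4: execute the post-start lifecycle hook, if one is defined
    // (handled by the lifecycle handler runner; omitted here).
    ......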
}