kubelet
前言
本文沒有去列出細節(jié)邏輯實現(xiàn),只列出主干邏輯锌雀,代碼中有注解可以簡單閱讀以下蹂随,k8s源碼大多方法以interface層層包裝的形式調(diào)用琉预,一個interface會有較多實現(xiàn)(interface多態(tài)),代碼中的interface的具體實現(xiàn)可以參考《intrface實現(xiàn)分析》姐直,后續(xù)會就一處調(diào)用進行詳細分析
如何debug
我是利用dlv工具遠程調(diào)試的倦淀,遠端搭建了一個3master、3node的k8s集群声畏,停止了一個vm的kubelet撞叽,在vm上用以下命令啟動kubelet源碼,進行調(diào)試插龄。至于其他組件調(diào)試愿棋,也可以通過這種方式。
需要注意:啟動命令中的參數(shù)均牢,多去少補(通過觀察日志)
dlv啟動命令dlv debug --headless --listen ":2345" --log --api-version 2 -- --runtime-cgroups=/systemd/system.slice --kubelet-cgroups=/systemd/system.slice --kubeconfig=/etc/kubernetes/kubelet.conf --pod-infra-container-image=xxx/pause:3.1 --config=/var/lib/kubelet/config.yaml --cgroup-driver=cgroupfs --network-plugin=cni
Kubelet服務(wù)啟動流程
kubelet服務(wù)入口
cmd/kubelet/kubelet.go
,主要負責(zé)校驗參數(shù)糠雨,創(chuàng)建和 api-server 交互的 client 及對運行 kubelet 權(quán)限檢測,啟動 Kubelet 等等
func main() {
rand.Seed(time.Now().UnixNano())
command := app.NewKubeletCommand(server.SetupSignalHandler())
logs.InitLogs()
defer logs.FlushLogs()
...
}
具體實現(xiàn)cmd/kubelet/app/server.go
func NewKubeletCommand(stopCh <-chan struct{}) *cobra.Command {
cleanFlagSet := pflag.NewFlagSet(componentKubelet, pflag.ContinueOnError)
cleanFlagSet.SetNormalizeFunc(cliflag.WordSepNormalizeFunc)
//設(shè)置默認的KubeletFlags的值,包括docker,證書路徑徘跪,插件目錄甘邀,包括CIDR等等信息
kubeletFlags := options.NewKubeletFlags()
//生成kubelet默認配置文件
kubeletConfig, err := options.NewKubeletConfiguration()
// programmer error
if err != nil {
klog.Fatal(err)
}
...
Run: func(cmd *cobra.Command, args []string) {
// use dynamic kubelet config, if enabled
var kubeletConfigController *dynamickubeletconfig.Controller
if dynamicConfigDir := kubeletFlags.DynamicConfigDir.Value(); len(dynamicConfigDir) > 0 {
var dynamicKubeletConfig *kubeletconfiginternal.KubeletConfiguration
dynamicKubeletConfig, kubeletConfigController, err = BootstrapKubeletConfigController(dynamicConfigDir,
func(kc *kubeletconfiginternal.KubeletConfiguration) error {
// Here, we enforce flag precedence inside the controller, prior to the controller's validation sequence,
// so that we get a complete validation at the same point where we can decide to reject dynamic config.
// This fixes the flag-precedence component of issue #63305.
// See issue #56171 for general details on flag precedence.
return kubeletConfigFlagPrecedence(kc, args)
})
if err != nil {
klog.Fatal(err)
}
// If we should just use our existing, local config, the controller will return a nil config
if dynamicKubeletConfig != nil {
kubeletConfig = dynamicKubeletConfig
// Note: flag precedence was already enforced in the controller, prior to validation,
// by our above transform function. Now we simply update feature gates from the new config.
if err := utilfeature.DefaultMutableFeatureGate.SetFromMap(kubeletConfig.FeatureGates); err != nil {
klog.Fatal(err)
}
}
}
// construct a KubeletServer from kubeletFlags and kubeletConfig
kubeletServer := &options.KubeletServer{
KubeletFlags: *kubeletFlags,
KubeletConfiguration: *kubeletConfig,
}
// use kubeletServer to construct the default KubeletDeps
kubeletDeps, err := UnsecuredDependencies(kubeletServer)
if err != nil {
klog.Fatal(err)
}
// add the kubelet config controller to kubeletDeps
kubeletDeps.KubeletConfigController = kubeletConfigController
// start the experimental docker shim, if enabled
if kubeletServer.KubeletFlags.ExperimentalDockershim {
if err := RunDockershim(&kubeletServer.KubeletFlags, kubeletConfig, stopCh); err != nil {
klog.Fatal(err)
}
return
}
// run the kubelet
klog.V(5).Infof("KubeletConfiguration: %#v", kubeletServer.KubeletConfiguration)
if err := Run(kubeletServer, kubeletDeps, stopCh); err != nil {
klog.Fatal(err)
}
},
}
...
}
func NewKubeletCommand中
// use dynamic kubelet config, if enabled
var kubeletConfigController *dynamickubeletconfig.Controller
if dynamicConfigDir := kubeletFlags.DynamicConfigDir.Value(); len(dynamicConfigDir) > 0 {
var dynamicKubeletConfig *kubeletconfiginternal.KubeletConfiguration
dynamicKubeletConfig, kubeletConfigController, err = BootstrapKubeletConfigController(dynamicConfigDir,
func(kc *kubeletconfiginternal.KubeletConfiguration) error {
// Here, we enforce flag precedence inside the controller, prior to the controller's validation sequence,
// so that we get a complete validation at the same point where we can decide to reject dynamic config.
// This fixes the flag-precedence component of issue #63305.
// See issue #56171 for general details on flag precedence.
return kubeletConfigFlagPrecedence(kc, args)
})
if err != nil {
klog.Fatal(err)
}
// If we should just use our existing, local config, the controller will return a nil config
if dynamicKubeletConfig != nil {
kubeletConfig = dynamicKubeletConfig
// Note: flag precedence was already enforced in the controller, prior to validation,
// by our above transform function. Now we simply update feature gates from the new config.
if err := utilfeature.DefaultMutableFeatureGate.SetFromMap(kubeletConfig.FeatureGates); err != nil {
klog.Fatal(err)
}
}
}
該功能主要是新建一個watch的功能,主要是用來watch kubelet的配置文件是否改變真椿,如果已經(jīng)改變鹃答,那么就重新load kubelet的配置文件 用的是kubernetes常用到的Controller,也就是Informer的架構(gòu)乎澄,watch ConfigMap對象突硝,進一步查看BootstrapKubeletConfigController
// BootstrapKubeletConfigController constructs and bootstrap a configuration controller
func BootstrapKubeletConfigController(dynamicConfigDir string, transform dynamickubeletconfig.TransformFunc) (*kubeletconfiginternal.KubeletConfiguration, *dynamickubeletconfig.Controller, error) {
if !utilfeature.DefaultFeatureGate.Enabled(features.DynamicKubeletConfig) {
return nil, nil, fmt.Errorf("failed to bootstrap Kubelet config controller, you must enable the DynamicKubeletConfig feature gate")
}
if len(dynamicConfigDir) == 0 {
return nil, nil, fmt.Errorf("cannot bootstrap Kubelet config controller, --dynamic-config-dir was not provided")
}
// compute absolute path and bootstrap controller
dir, err := filepath.Abs(dynamicConfigDir)
if err != nil {
return nil, nil, fmt.Errorf("failed to get absolute path for --dynamic-config-dir=%s", dynamicConfigDir)
}
// get the latest KubeletConfiguration checkpoint from disk, or return the default config if no valid checkpoints exist
c := dynamickubeletconfig.NewController(dir, transform)
kc, err := c.Bootstrap()
if err != nil {
return nil, nil, fmt.Errorf("failed to determine a valid configuration, error: %v", err)
}
return kc, c, nil
}
-------------------------
// /pkg/kubelet/kubeletconfig/controller.go
// Bootstrap attempts to return a valid KubeletConfiguration based on the configuration of the Controller,
// or returns an error if no valid configuration could be produced. Bootstrap should be called synchronously before StartSync.
// If the pre-existing local configuration should be used, Bootstrap returns a nil config.
func (cc *Controller) Bootstrap() (*kubeletconfig.KubeletConfiguration, error)
graph LR
NewKubeletCommand-->RUN
RUN-->run
這個函數(shù)主要是用來啟動各種以來的服務(wù)以及kubelet的監(jiān)聽端口
func Run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, stopCh <-chan struct{}) error {
// To help debugging, immediately log version
klog.Infof("Version: %+v", version.Get())
if err := initForOS(s.KubeletFlags.WindowsService); err != nil {
return fmt.Errorf("failed OS init: %v", err)
}
//主要啟動函數(shù)
if err := run(s, kubeDeps, stopCh); err != nil {
return fmt.Errorf("failed to run Kubelet: %v", err)
}
return nil
}
func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, stopCh <-chan struct{}) (err error) {
...
//啟動前參數(shù)準備完備,進入啟動流程
if err := RunKubelet(s, kubeDeps, s.RunOnce); err != nil {
return err
}
}
graph LR
準備完成-->RunKubelet
RunKubelet-->CreateAndInitKubelet
RunKubelet-->startKubelet
在 RunKubelet 中主要做 CreateAndInitKubelet 和 startKubelet 兩件事:
func RunKubelet(kubeServer *options.KubeletServer, kubeDeps *kubelet.Dependencies, runOnce bool) error {
...
k, err := createAndInitKubelet(...)
...
// process pods and exit.
if runOnce {
if _, err := k.RunOnce(podCfg.Updates()); err != nil {
return fmt.Errorf("runonce failed: %v", err)
}
klog.Info("Started kubelet as runonce")
} else {
startKubelet(k, podCfg, &kubeServer.KubeletConfiguration, kubeDeps, kubeServer.EnableServer)
klog.Info("Started kubelet")
}
return nil
}
createAndInitKubelet
func createAndInitKubelet(...){
...
k, err = kubelet.NewMainKubelet(...)
if err != nil {
return nil, err
}
k.BirthCry()
k.StartGarbageCollection()
}
NewMainKubelet 實例化一個 kubelet 對象置济,并對 kubelet 內(nèi)部各個 component 進行初始化工作:
- containerGC // 容器的垃圾回收
- statusManager // pod 狀態(tài)的管理
- imageManager // 鏡像的管路
- probeManager // 容器健康檢測
- gpuManager // GPU 的支持
- PodCache // Pod 緩存的管理
- secretManager // secret 資源的管理
- configMapManager // configMap 資源的管理
- InitNetworkPlugin // 網(wǎng)絡(luò)插件的初始化
- PodManager // 對 pod 的管理, e.g., CRUD
- makePodSourceConfig // pod 元數(shù)據(jù)的來源 (FILE, URL, api-server)
- diskSpaceManager // 磁盤空間的管理
- ContainerRuntime // 容器運行時的選擇(docker 或 rkt)
- BirthCry // 通知 api-server 服務(wù) kubelet 啟動
- StartGarbageCollection // 開啟垃圾回收服務(wù)
startKubelet
func startKubelet(k kubelet.Bootstrap, podCfg *config.PodConfig, kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *kubelet.Dependencies, enableServer bool) {
// start the kubelet
go wait.Until(func() {
k.Run(podCfg.Updates())
}, 0, wait.NeverStop)
// start the kubelet server
//獲取 pod 及 node 的相關(guān)信息解恰,后續(xù)會更新到etcd
if enableServer {
go k.ListenAndServe(net.ParseIP(kubeCfg.Address), uint(kubeCfg.Port), kubeDeps.TLSOptions, kubeDeps.Auth, kubeCfg.EnableDebuggingHandlers, kubeCfg.EnableContentionProfiling)
}
if kubeCfg.ReadOnlyPort > 0 {
go k.ListenAndServeReadOnly(net.ParseIP(kubeCfg.Address), uint(kubeCfg.ReadOnlyPort))
}
if utilfeature.DefaultFeatureGate.Enabled(features.KubeletPodResources) {
go k.ListenAndServePodResources()
}
}
graph LR
startKubelet-->Run
pkg/kubelet/kubelet.go
func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) { //啟動日志服務(wù)
if kl.logServer == nil {
kl.logServer = http.StripPrefix("/logs/", http.FileServer(http.Dir("/var/log/")))
}
if kl.kubeClient == nil {
klog.Warning("No api server defined - no node status update will be sent.")
}
// Start the cloud provider sync manager
if kl.cloudResourceSyncManager != nil {
go kl.cloudResourceSyncManager.Run(wait.NeverStop)
}
//初始化模塊,包括volume 數(shù)據(jù)目錄 容器日志
//啟動鏡像管理 啟動證書管理 OOM管理
//啟動資源分析器
// Start volume manager
go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop)
// Start syncing node status immediately, this may set up things the runtime needs to run.
go wait.Until(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, wait.NeverStop)
go kl.fastStatusUpdateOnce()
// start syncing lease
go kl.nodeLeaseController.Run(wait.NeverStop)
go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop)
// Start loop to sync iptables util rules
go wait.Until(kl.syncNetworkUtil, 1*time.Minute, wait.NeverStop)
// Start a goroutine responsible for killing pods (that are not properly
// handled by pod workers).
go wait.Until(kl.podKiller, 1*time.Second, wait.NeverStop)
// Start component sync loops.
kl.statusManager.Start() //狀態(tài)管理
kl.probeManager.Start() //探針管理
// Start syncing RuntimeClasses if enabled.
kl.runtimeClassManager.Start(wait.NeverStop)
// Start the pod lifecycle event generator.
kl.pleg.Start() //啟動容器的生命周期
kl.syncLoop(updates, kl) //循環(huán)同步
}
==至此kubelet啟動完成==
graph LR
Run-->synLoop
syncLoop
syncLoop is the main loop for processing changes. It watches for changes from three channels (file, apiserver, and http) and creates a union of them. For any new change seen, will run a sync against desired state and running state. If no changes are seen to the configuration, will synchronize the last known desired state every sync-frequency seconds. Never returns.
func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
// 準備工作
for{
time.Sleep(duration)
kl.syncLoopIteration(...)
...
}
}
graph LR
syncLoop-->syncLoopIteration
syncLoopIteration 接收來自多個方向的消息,run a sync against desired state and running state
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
select {
case u, open := <-configCh:
case e := <-plegCh:...
case <-syncCh:...
case update := <-kl.livenessManager.Updates():...
case <-housekeepingCh:...
}
return true
}
syncLoopIteration reads from various channels and dispatches pods to the given handler. 以configCh 為例
switch u.Op {
case kubetypes.ADD:
handler.HandlePodAdditions(u.Pods)
case kubetypes.UPDATE:
handler.HandlePodUpdates(u.Pods)
case kubetypes.REMOVE:
handler.HandlePodRemoves(u.Pods)
case kubetypes.RECONCILE:
handler.HandlePodReconcile(u.Pods)
case kubetypes.DELETE:
// DELETE is treated as a UPDATE because of graceful deletion.
handler.HandlePodUpdates(u.Pods)
case kubetypes.RESTORE:
// These are pods restored from the checkpoint. Treat them as new pods.
handler.HandlePodAdditions(u.Pods)
}
最終的立足點還是 syncHandler(還是Kubelet 自己實現(xiàn)的)浙于,下面分析下 HandlePodAdditions
新建 pod開始
代碼中去掉了跟創(chuàng)建 無關(guān)的部分护盈,刪減了日志、錯誤校驗等
//file:/pkg/kubelet/kubelet.go---2026
func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
sort.Sort(sliceutils.PodsByCreationTime(pods))
for _, pod := range pods {
...
// Always add the pod to the pod manager. Kubelet relies on the pod manager as the source of truth for the desired state. If a pod does
// not exist in the pod manager, it means that it has been deleted in the apiserver and no action (other than cleanup) is required.
kl.podManager.AddPod(pod)
...
mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)
kl.probeManager.AddPod(pod)
}
}
kl.podManager.AddPod
和 kl.probeManager.AddPod(pod)
都只是將pod 納入跟蹤羞酗,真正創(chuàng)建pod的是dispatchWork腐宋,然后又轉(zhuǎn)回 kl.syncPod
//file:/pkg/kubelet/kubelet.go---1464
func (kl *Kubelet) syncPod(o syncPodOptions) error {
...
// Generate final API pod status with pod and status manager status
apiPodStatus := kl.generateAPIPodStatus(pod, podStatus)
existingStatus, ok := kl.statusManager.GetPodStatus(pod.UID)
if runnable := kl.canRunPod(pod); !runnable.Admit {...}
// Update status in the status manager
kl.statusManager.SetPodStatus(pod, apiPodStatus)
// Create Cgroups for the pod and apply resource parameters to them if cgroups-per-qos flag is enabled.
pcm := kl.containerManager.NewPodContainerManager()
// Make data directories for the pod
kl.makePodDataDirs(pod);
// Fetch the pull secrets for the pod
pullSecrets := kl.getPullSecretsForPod(pod)
// Call the container runtime's SyncPod callback
result := kl.containerRuntime.SyncPod(pod, apiPodStatus, podStatus, pullSecrets, kl.backOff)
...
}
kubeGenericRuntimeManager.syncPod
//file:/pkg/kubelet/kuberuntime/kuberuntime_manager.go---618
func (m *kubeGenericRuntimeManager) SyncPod(pod *v1.Pod, _ v1.PodStatus, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) {
// Step 1: Compute sandbox and container changes.
podContainerChanges := m.computePodActions(pod, podStatus)
...
// Step 4: Create a sandbox for the pod if necessary.
podSandboxID, msg, err = m.createPodSandbox(pod, podContainerChanges.Attempt)
// Get podSandboxConfig for containers to start.
podSandboxConfig, err := m.generatePodSandboxConfig(pod, podContainerChanges.Attempt)
// Step 5: start the init container.
if container := podContainerChanges.NextInitContainerToStart; container != nil {
// Start the next init container.
msg, err := m.startContainer(podSandboxID, podSandboxConfig, container, pod, podStatus, pullSecrets, podIP, kubecontainer.ContainerTypeInit);
}
// Step 6: start containers in podContainerChanges.ContainersToStart.
for _, idx := range podContainerChanges.ContainersToStart {
container := &pod.Spec.Containers[idx]
msg, err := m.startContainer(podSandboxID, podSandboxConfig, container, pod, podStatus, pullSecrets, podIP, kubecontainer.ContainerTypeRegular);
}
...
}
m.createPodSandbox 和 startContainer
pkg/kubelet/kuberuntime/包中,kuberuntime_manager.go 定義了 kubeGenericRuntimeManager struct 及其接口方法實現(xiàn)檀轨,但接口方法的內(nèi)部依賴方法 分散在 package 下的其它go文件中胸竞。其本質(zhì)是將 一個“類方法”分散在了多個go 文件中,多個文件合起來 組成了kubeGenericRuntimeManager 類實現(xiàn)参萄。
這個方法的內(nèi)容也非常多卫枝,它的主要邏輯是先比較傳遞過來的 pod 信息和實際運行的 pod(對于新建 pod 來說后者為空),計算出兩者的差別讹挎,也就是需要更新的地方校赤。然后先創(chuàng)建 infra 容器吆玖,配置好網(wǎng)絡(luò),然后再逐個創(chuàng)建應(yīng)用容器马篮。
func (m *kubeGenericRuntimeManager) startContainer(podSandboxID string, podSandboxConfig *runtimeapi.PodSandboxConfig, container *v1.Container, pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, podIP string, containerType kubecontainer.ContainerType) (string, error) {
// Step 1: pull the image.
imageRef, msg, err := m.imagePuller.EnsureImageExists(pod, container, pullSecrets)
// Step 2: create the container.
ref, err := kubecontainer.GenerateContainerRef(pod, container)
containerConfig, cleanupAction, err := m.generateContainerConfig(container, pod, restartCount, podIP, imageRef, containerType)
containerID, err := m.runtimeService.CreateContainer(podSandboxID, containerConfig, podSandboxConfig)
err = m.internalLifecycle.PreStartContainer(pod, container, containerID)
// Step 3: start the container.
err = m.runtimeService.StartContainer(containerID)
// Step 4: execute the post start hook.
msg, handlerErr := m.runner.Run(kubeContainerID, pod, container, container.Lifecycle.PostStart)
}
- 默認sandbox image為 gcr.io/google_containers/pause-amd64:3.0
- ensureImageExists 當(dāng)鏡像不存在是進行拉取工作
- CreateContainer最終通過docker API POST方法調(diào)用 /containers/create
- CreateCheckpoint寫入文件沾乘,文件名為容器ID
- StartContainer最終通過docker API POST方法調(diào)用 /containers/containerID/start
- 重新寫入resolv.conf由docker產(chǎn)生,pod里的容器共享
- InspectContainer最終通過docker API GET方法調(diào)用 /containers/containerID/json
- 為容器建立網(wǎng)絡(luò)浑测,通過CNI建立網(wǎng)絡(luò)意鲸,建立loopback接口,建立網(wǎng)絡(luò)設(shè)置為混雜模式(調(diào)用命令ip link show dev / ip set bridgeName promisc on)尽爆。
以下為debug怎顾,PodSandbox從create到start的過程,一直到請求發(fā)送docker結(jié)束漱贱,其他調(diào)用與本次調(diào)用相似槐雾,可以自行debug
func (m *kubeGenericRuntimeManager) createPodSandbox(pod *v1.Pod, attempt uint32) (string, string, error) {
podSandboxConfig, err := m.generatePodSandboxConfig(pod, attempt)
if err != nil {
message := fmt.Sprintf("GeneratePodSandboxConfig for pod %q failed: %v", format.Pod(pod), err)
klog.Error(message)
return "", message, err
}
// Create pod logs directory
err = m.osInterface.MkdirAll(podSandboxConfig.LogDirectory, 0755)
if err != nil {
message := fmt.Sprintf("Create pod log directory for pod %q failed: %v", format.Pod(pod), err)
klog.Errorf(message)
return "", message, err
}
runtimeHandler := ""
if utilfeature.DefaultFeatureGate.Enabled(features.RuntimeClass) && m.runtimeClassManager != nil {
runtimeHandler, err = m.runtimeClassManager.LookupRuntimeHandler(pod.Spec.RuntimeClassName)
if err != nil {
message := fmt.Sprintf("CreatePodSandbox for pod %q failed: %v", format.Pod(pod), err)
return "", message, err
}
if runtimeHandler != "" {
klog.V(2).Infof("Running pod %s with RuntimeHandler %q", format.Pod(pod), runtimeHandler)
}
}
//啟動RunPodSandbox
podSandBoxID, err := m.runtimeService.RunPodSandbox(podSandboxConfig, runtimeHandler)
if err != nil {
message := fmt.Sprintf("CreatePodSandbox for pod %q failed: %v", format.Pod(pod), err)
klog.Error(message)
return "", message, err
}
return podSandBoxID, "", nil
}
/workspace/goWorkspace/src/k8s.io/kubernetes/pkg/kubelet/kuberuntime/instrumented_services.go
func (in instrumentedRuntimeService) RunPodSandbox(config *runtimeapi.PodSandboxConfig, runtimeHandler string) (string, error) {
const operation = "run_podsandbox"
startTime := time.Now()
defer recordOperation(operation, startTime)
defer metrics.RunPodSandboxDuration.WithLabelValues(runtimeHandler).Observe(metrics.SinceInSeconds(startTime))
out, err := in.service.RunPodSandbox(config, runtimeHandler)
recordError(operation, err)
if err != nil {
metrics.RunPodSandboxErrors.WithLabelValues(runtimeHandler).Inc()
}
return out, err
}
/workspace/goWorkspace/src/k8s.io/kubernetes/pkg/kubelet/remote/remote_runtime.go
func (r *RemoteRuntimeService) RunPodSandbox(config *runtimeapi.PodSandboxConfig, runtimeHandler string) (string, error) {
// Use 2 times longer timeout for sandbox operation (4 mins by default)
// TODO: Make the pod sandbox timeout configurable.
ctx, cancel := getContextWithTimeout(r.timeout * 2)
defer cancel()
resp, err := r.runtimeClient.RunPodSandbox(ctx, &runtimeapi.RunPodSandboxRequest{
Config: config,
RuntimeHandler: runtimeHandler,
})
if err != nil {
klog.Errorf("RunPodSandbox from runtime service failed: %v", err)
return "", err
}
if resp.PodSandboxId == "" {
errorMessage := fmt.Sprintf("PodSandboxId is not set for sandbox %q", config.GetMetadata())
klog.Errorf("RunPodSandbox failed: %s", errorMessage)
return "", errors.New(errorMessage)
}
return resp.PodSandboxId, nil
}
kubelet cri grpc-client實現(xiàn)
/workspace/goWorkspace/src/k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2/api.pb.go
func (c *runtimeServiceClient) RunPodSandbox(ctx context.Context, in *RunPodSandboxRequest, opts ...grpc.CallOption) (*RunPodSandboxResponse, error) {
out := new(RunPodSandboxResponse)
err := grpc.Invoke(ctx, "/runtime.v1alpha2.RuntimeService/RunPodSandbox", in, out, c.cc, opts...)
if err != nil {
return nil, err
}
return out, nil
}
grpc-server實現(xiàn)在dockershim中,繼續(xù)
/workspace/goWorkspace/src/k8s.io/kubernetes/pkg/kubelet/dockershim/docker_sandbox.go
func (ds *dockerService) RunPodSandbox(ctx context.Context, r *runtimeapi.RunPodSandboxRequest) (*runtimeapi.RunPodSandboxResponse, error) {
config := r.GetConfig()
// Step 1: Pull the image for the sandbox.
image := defaultSandboxImage
podSandboxImage := ds.podSandboxImage
if len(podSandboxImage) != 0 {
image = podSandboxImage
}
// NOTE: To use a custom sandbox image in a private repository, users need to configure the nodes with credentials properly.
// see: http://kubernetes.io/docs/user-guide/images/#configuring-nodes-to-authenticate-to-a-private-repository
// Only pull sandbox image when it's not present - v1.PullIfNotPresent.
if err := ensureSandboxImageExists(ds.client, image); err != nil {
return nil, err
}
// Step 2: Create the sandbox container.
if r.GetRuntimeHandler() != "" {
return nil, fmt.Errorf("RuntimeHandler %q not supported", r.GetRuntimeHandler())
}
createConfig, err := ds.makeSandboxDockerConfig(config, image)
if err != nil {
return nil, fmt.Errorf("failed to make sandbox docker config for pod %q: %v", config.Metadata.Name, err)
}
createResp, err := ds.client.CreateContainer(*createConfig)
if err != nil {
createResp, err = recoverFromCreationConflictIfNeeded(ds.client, *createConfig, err)
}
if err != nil || createResp == nil {
return nil, fmt.Errorf("failed to create a sandbox for pod %q: %v", config.Metadata.Name, err)
}
resp := &runtimeapi.RunPodSandboxResponse{PodSandboxId: createResp.ID}
ds.setNetworkReady(createResp.ID, false)
defer func(e *error) {
// Set networking ready depending on the error return of
// the parent function
if *e == nil {
ds.setNetworkReady(createResp.ID, true)
}
}(&err)
// Step 3: Create Sandbox Checkpoint.
if err = ds.checkpointManager.CreateCheckpoint(createResp.ID, constructPodSandboxCheckpoint(config)); err != nil {
return nil, err
}
// Step 4: Start the sandbox container.
// Assume kubelet's garbage collector would remove the sandbox later, if
// startContainer failed.
err = ds.client.StartContainer(createResp.ID)
if err != nil {
return nil, fmt.Errorf("failed to start sandbox container for pod %q: %v", config.Metadata.Name, err)
}
// Rewrite resolv.conf file generated by docker.
// NOTE: cluster dns settings aren't passed anymore to docker api in all cases,
// not only for pods with host network: the resolver conf will be overwritten
// after sandbox creation to override docker's behaviour. This resolv.conf
// file is shared by all containers of the same pod, and needs to be modified
// only once per pod.
if dnsConfig := config.GetDnsConfig(); dnsConfig != nil {
containerInfo, err := ds.client.InspectContainer(createResp.ID)
if err != nil {
return nil, fmt.Errorf("failed to inspect sandbox container for pod %q: %v", config.Metadata.Name, err)
}
if err := rewriteResolvFile(containerInfo.ResolvConfPath, dnsConfig.Servers, dnsConfig.Searches, dnsConfig.Options); err != nil {
return nil, fmt.Errorf("rewrite resolv.conf failed for pod %q: %v", config.Metadata.Name, err)
}
}
// Do not invoke network plugins if in hostNetwork mode.
if config.GetLinux().GetSecurityContext().GetNamespaceOptions().GetNetwork() == runtimeapi.NamespaceMode_NODE {
return resp, nil
}
// Step 5: Setup networking for the sandbox.
// All pod networking is setup by a CNI plugin discovered at startup time.
// This plugin assigns the pod ip, sets up routes inside the sandbox,
// creates interfaces etc. In theory, its jurisdiction ends with pod
// sandbox networking, but it might insert iptables rules or open ports
// on the host as well, to satisfy parts of the pod spec that aren't
// recognized by the CNI standard yet.
cID := kubecontainer.BuildContainerID(runtimeName, createResp.ID)
networkOptions := make(map[string]string)
if dnsConfig := config.GetDnsConfig(); dnsConfig != nil {
// Build DNS options.
dnsOption, err := json.Marshal(dnsConfig)
if err != nil {
return nil, fmt.Errorf("failed to marshal dns config for pod %q: %v", config.Metadata.Name, err)
}
networkOptions["dns"] = string(dnsOption)
}
err = ds.network.SetUpPod(config.GetMetadata().Namespace, config.GetMetadata().Name, cID, config.Annotations, networkOptions)
if err != nil {
errList := []error{fmt.Errorf("failed to set up sandbox container %q network for pod %q: %v", createResp.ID, config.Metadata.Name, err)}
// Ensure network resources are cleaned up even if the plugin
// succeeded but an error happened between that success and here.
err = ds.network.TearDownPod(config.GetMetadata().Namespace, config.GetMetadata().Name, cID)
if err != nil {
errList = append(errList, fmt.Errorf("failed to clean up sandbox container %q network for pod %q: %v", createResp.ID, config.Metadata.Name, err))
}
err = ds.client.StopContainer(createResp.ID, defaultSandboxGracePeriod)
if err != nil {
errList = append(errList, fmt.Errorf("failed to stop sandbox container %q for pod %q: %v", createResp.ID, config.Metadata.Name, err))
}
return resp, utilerrors.NewAggregate(errList)
}
return resp, nil
}
/workspace/goWorkspace/src/k8s.io/kubernetes/pkg/kubelet/dockershim/libdocker/instrumented_client.go
func (in instrumentedInterface) StartContainer(id string) error {
const operation = "start_container"
defer recordOperation(operation, time.Now())
err := in.client.StartContainer(id)
recordError(operation, err)
return err
}
/workspace/goWorkspace/src/k8s.io/kubernetes/pkg/kubelet/dockershim/libdocker/kube_docker_client.go
func (d *kubeDockerClient) StartContainer(id string) error {
ctx, cancel := d.getTimeoutContext()
defer cancel()
err := d.client.ContainerStart(ctx, id, dockertypes.ContainerStartOptions{})
if ctxErr := contextError(ctx); ctxErr != nil {
return ctxErr
}
return err
}
grpc調(diào)用到docker至此結(jié)束
/workspace/goWorkspace/src/k8s.io/kubernetes/vendor/github.com/docker/docker/client/container_start.go
func (cli *Client) ContainerStart(ctx context.Context, containerID string, options types.ContainerStartOptions) error {
query := url.Values{}
if len(options.CheckpointID) != 0 {
query.Set("checkpoint", options.CheckpointID)
}
if len(options.CheckpointDir) != 0 {
query.Set("checkpoint-dir", options.CheckpointDir)
}
resp, err := cli.post(ctx, "/containers/"+containerID+"/start", query, nil, nil)
ensureReaderClosed(resp)
return err
}
參考資料:
http://qiankunli.github.io/2018/12/31/kubernetes_source_kubelet.html#%E6%96%B0%E5%BB%BA-pod
https://cizixs.com/2017/06/07/kubelet-source-code-analysis-part-2/
https://toutiao.io/posts/z2e88b/preview