Kubernetes (k8s) CSI plugin attach/detach flow

  1. Introduction to Kubernetes (k8s) CSI plugin development http://www.reibang.com/p/88ec8cba7507

  2. Kubernetes (k8s) CSI plugin attach/detach flow http://www.reibang.com/p/5c6e78b6b320

Introduction

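Because a k8s CSI plugin works out-of-tree, k8s relies on additional helper containers to communicate with the k8s components. This article walks through the code paths that are invoked when a volume is mounted and unmounted.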

   CreateVolume +------------+ DeleteVolume
 +------------->|  CREATED   +--------------+
 |              +---+----+---+              |
 |       Controller |    | Controller       v
+++         Publish |    | Unpublish       +++
|X|          Volume |    | Volume          | |
+-+             +---v----+---+             +-+
                | NODE_READY |
                +---+----^---+
               Node |    | Node
            Publish |    | Unpublish
             Volume |    | Volume
                +---v----+---+
                | PUBLISHED  |
                +------------+

Figure 5: The lifecycle of a dynamically provisioned volume, from
creation to destruction.

Taking this lifecycle diagram as an example: CreateVolume creates the volume, ControllerPublishVolume attaches the volume to the node where the kubelet runs, NodePublishVolume mounts the volume at the target directory, NodeUnpublishVolume is the unmount step, and ControllerUnpublishVolume is the detach step. Note that attach and detach work per node; they cannot be scoped to an individual pod.
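The lifecycle in the figure can be read as a small state machine. The following is a minimal sketch in Go, not code from Kubernetes or the CSI spec: the `State` type, the `Absent` state (standing in for the two end boxes of the figure) and the `Next` helper are names invented here for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// State is a volume lifecycle state taken from the figure.
type State string

const (
	Absent    State = "ABSENT" // hypothetical name for "not created yet / deleted"
	Created   State = "CREATED"
	NodeReady State = "NODE_READY"
	Published State = "PUBLISHED"
)

// ErrInvalidTransition is returned when an RPC is not allowed in a state.
var ErrInvalidTransition = errors.New("invalid transition")

// transitions maps a state and a CSI RPC name to the resulting state.
var transitions = map[State]map[string]State{
	Absent:    {"CreateVolume": Created},
	Created:   {"ControllerPublishVolume": NodeReady, "DeleteVolume": Absent},
	NodeReady: {"NodePublishVolume": Published, "ControllerUnpublishVolume": Created},
	Published: {"NodeUnpublishVolume": NodeReady},
}

// Next returns the state reached by applying rpc in state s.
func Next(s State, rpc string) (State, error) {
	if next, ok := transitions[s][rpc]; ok {
		return next, nil
	}
	return s, fmt.Errorf("%w: %s in state %s", ErrInvalidTransition, rpc, s)
}

func main() {
	s := Absent
	for _, rpc := range []string{"CreateVolume", "ControllerPublishVolume", "NodePublishVolume"} {
		next, err := Next(s, rpc)
		if err != nil {
			fmt.Println("error:", err)
			return
		}
		fmt.Printf("%s --%s--> %s\n", s, rpc, next)
		s = next
	}
}
```

The table makes the ordering constraint explicit: a volume in CREATED cannot be node-published until it has been controller-published (attached) first.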

Kubernetes source code

All of the code below lives in the k8s source tree:

https://github.com/kubernetes/kubernetes.git

CSI Plugin

First, let's look at the CSI-related code in the Kubernetes tree.

path             usage
pkg/volume/csi   manages the CSI plugin

csiPlugin source code

  • The csiPlugin struct
type csiPlugin struct {
    // host is the interface to kubelet
    host              volume.VolumeHost
    blockEnabled      bool
    // lists the CSIDriver objects for the node
    csiDriverLister   csilister.CSIDriverLister
    // gives access to the informer and the CSIDriverLister
    csiDriverInformer csiinformer.CSIDriverInformer
}
  • Loading csiPlugin
    ProbeVolumePlugins is the function k8s components call to load the csiPlugin. It returns an empty csiPlugin (host is nil until Init is called).
// ProbeVolumePlugins returns implemented plugins
func ProbeVolumePlugins() []volume.VolumePlugin {
    p := &csiPlugin{
        host:         nil,
        blockEnabled: utilfeature.DefaultFeatureGate.Enabled(features.CSIBlockVolume),
    }
    return []volume.VolumePlugin{p}
}
  • Initializing csiPlugin
// Init receives the VolumeHost for this component
func (p *csiPlugin) Init(host volume.VolumeHost) error {
    p.host = host

    if utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) {
        csiClient := host.GetCSIClient()
        if csiClient == nil {
            klog.Warning("The client for CSI Custom Resources is not available, skipping informer initialization")
        } else {
            // Start informer for CSIDrivers.
            factory := csiapiinformer.NewSharedInformerFactory(csiClient, csiResyncPeriod)
            p.csiDriverInformer = factory.Csi().V1alpha1().CSIDrivers()
            p.csiDriverLister = p.csiDriverInformer.Lister()
            go factory.Start(wait.NeverStop)
        }
    }

    // Initializing csiDrivers map and label management channels
    csiDrivers = csiDriversStore{driversMap: map[string]csiDriver{}}
    nim = nodeinfomanager.NewNodeInfoManager(host.GetNodeName(), host)

    // TODO(#70514) Init CSINodeInfo object if the CRD exists and create Driver
    // objects for migrated drivers.

    return nil
}
  • The NewAttacher function

NewAttacher returns a csiAttacher instance.

func (p *csiPlugin) NewAttacher() (volume.Attacher, error) {
    k8s := p.host.GetKubeClient()
    if k8s == nil {
        klog.Error(log("unable to get kubernetes client from host"))
        return nil, errors.New("unable to get Kubernetes client")
    }

    return &csiAttacher{
        plugin:        p,
        k8s:           k8s,
        waitSleepTime: 1 * time.Second,
    }, nil
}
  • The NewDetacher function

Like NewAttacher, it returns a csiAttacher instance.

  • The NewMounter function

NewMounter returns a csiMountMgr instance.

func (p *csiPlugin) NewMounter(
    spec *volume.Spec,
    pod *api.Pod,
    _ volume.VolumeOptions) (volume.Mounter, error) {
    ...

    mounter := &csiMountMgr{
        plugin:       p,
        k8s:          k8s,
        spec:         spec,
        pod:          pod,
        podUID:       pod.UID,
        driverName:   csiDriverName(pvSource.Driver),
        volumeID:     pvSource.VolumeHandle,
        specVolumeID: spec.Name(),
        csiClient:    csi,
        readOnly:     readOnly,
    }
  ...

    return mounter, nil
}
  • The NewUnmounter method

As above, it also returns a csiMountMgr instance.

csiAttacher

  • Attach

The Attach method is responsible for creating the VolumeAttachment object.

func (c *csiAttacher) Attach(spec *volume.Spec, nodeName types.NodeName) (string, error) {
  ...

    // Create the VolumeAttachment
    node := string(nodeName)
    pvName := spec.PersistentVolume.GetName()
    // Note that attachID depends only on the volume handle, the driver name and
    // the node name, so attach works per node and cannot be scoped to a pod.
    attachID := getAttachmentName(csiSource.VolumeHandle, csiSource.Driver, node)

    attachment := &storage.VolumeAttachment{
        ObjectMeta: meta.ObjectMeta{
            Name: attachID,
        },
        Spec: storage.VolumeAttachmentSpec{
            NodeName: node,
            Attacher: csiSource.Driver,
            Source: storage.VolumeAttachmentSource{
                PersistentVolumeName: &pvName,
            },
        },
    }
  ...

    // The attach is considered successful once attachment.Status.Attached is true
    if _, err := c.waitForVolumeAttachment(csiSource.VolumeHandle, attachID, csiTimeout); err != nil {
        return "", err
    }

    klog.V(4).Info(log("attacher.Attach finished OK with VolumeAttachment object [%s]", attachID))

    // TODO(71164): In 1.15, return empty devicePath
    return attachID, nil
}
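Attach blocks until the VolumeAttachment reports Status.Attached, or the timeout expires. The wait can be pictured as the polling loop below. This is a simplified sketch, not the body of waitForVolumeAttachment: the `attachmentStatus` type, the `get` callback and the `runDemo` helper are stand-ins invented for this example.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// attachmentStatus is a hypothetical stand-in for VolumeAttachment.Status.
type attachmentStatus struct {
	Attached    bool
	AttachError string
}

// waitForAttached polls get until the volume is attached, an attach error is
// reported, or ctx expires.
func waitForAttached(ctx context.Context, get func() (attachmentStatus, error), interval time.Duration) error {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		st, err := get()
		if err != nil {
			return err
		}
		if st.Attached {
			return nil
		}
		if st.AttachError != "" {
			return errors.New(st.AttachError)
		}
		select {
		case <-ctx.Done():
			return fmt.Errorf("timed out waiting for attachment: %w", ctx.Err())
		case <-ticker.C:
		}
	}
}

// runDemo polls a fake status source that reports Attached on the third read.
func runDemo() (int, error) {
	reads := 0
	get := func() (attachmentStatus, error) {
		reads++
		return attachmentStatus{Attached: reads >= 3}, nil
	}
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	err := waitForAttached(ctx, get, 5*time.Millisecond)
	return reads, err
}

func main() {
	reads, err := runDemo()
	fmt.Println("reads:", reads, "err:", err)
}
```

The point to take away is that Attach itself never talks to the CSI driver. It only creates the object and waits for some other component to flip the status.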
  • Detach

The Detach method is responsible for deleting the VolumeAttachment object.

func (c *csiAttacher) Detach(volumeName string, nodeName types.NodeName) error {
    // volumeName in format driverName<SEP>volumeHandle generated by plugin.GetVolumeName()
    if volumeName == "" {
        klog.Error(log("detacher.Detach missing value for parameter volumeName"))
        return errors.New("missing expected parameter volumeName")
    }
    parts := strings.Split(volumeName, volNameSep)
    if len(parts) != 2 {
        klog.Error(log("detacher.Detach insufficient info encoded in volumeName"))
        return errors.New("volumeName missing expected data")
    }

    driverName := parts[0]
    volID := parts[1]
    attachID := getAttachmentName(volID, driverName, string(nodeName))
    // Delete the VolumeAttachment
    if err := c.k8s.StorageV1beta1().VolumeAttachments().Delete(attachID, nil); err != nil {
        if apierrs.IsNotFound(err) {
            // object deleted or never existed, done
            klog.V(4).Info(log("VolumeAttachment object [%v] for volume [%v] not found, object deleted", attachID, volID))
            return nil
        }
        klog.Error(log("detacher.Detach failed to delete VolumeAttachment [%s]: %v", attachID, err))
        return err
    }

    klog.V(4).Info(log("detacher deleted ok VolumeAttachment.ID=%s", attachID))
    // Wait until the VolumeAttachment is gone
    return c.waitForVolumeDetachment(volID, attachID)
}
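Detach recovers the driver name and volume handle by splitting volumeName on a separator. A minimal sketch of that parsing step follows. The separator value "^" is an assumption made for this example (check the volNameSep constant in pkg/volume/csi for your version), and `splitVolumeName` is a hypothetical helper, not a function in the Kubernetes tree.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// volNameSep is assumed to be "^" for this sketch.
const volNameSep = "^"

// splitVolumeName splits "driverName<SEP>volumeHandle" into its two parts.
func splitVolumeName(volumeName string) (driver, handle string, err error) {
	if volumeName == "" {
		return "", "", errors.New("missing expected parameter volumeName")
	}
	parts := strings.Split(volumeName, volNameSep)
	if len(parts) != 2 {
		return "", "", fmt.Errorf("volumeName %q is not of the form driver%shandle", volumeName, volNameSep)
	}
	return parts[0], parts[1], nil
}

func main() {
	// The driver and handle names are made up for illustration.
	fmt.Println(splitVolumeName("example.csi.driver^vol-123"))
	fmt.Println(splitVolumeName("vol-without-driver"))
}
```

One consequence of the `len(parts) != 2` check is that a volume handle containing the separator itself would be rejected by this kind of parsing.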

The name of the VolumeAttachment depends only on the volume name, the driver name and the node name. So when implementing a CSI driver, keep in mind that ControllerPublishVolume and ControllerUnpublishVolume are tied to the node only, never to a pod.

func getAttachmentName(volName, csiDriverName, nodeName string) string {
    result := sha256.Sum256([]byte(fmt.Sprintf("%s%s%s", volName, csiDriverName, nodeName)))
    return fmt.Sprintf("csi-%x", result)
}
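To see why the attachment is per node rather than per pod, the function above can be run on its own. The sketch below copies getAttachmentName as quoted and calls it with made-up volume, driver and node names; no pod identity goes into the hash, so two pods on the same node that use the same volume map to the same VolumeAttachment name.

```go
package main

import (
	"crypto/sha256"
	"fmt"
)

// getAttachmentName is reproduced from the excerpt above.
func getAttachmentName(volName, csiDriverName, nodeName string) string {
	result := sha256.Sum256([]byte(fmt.Sprintf("%s%s%s", volName, csiDriverName, nodeName)))
	return fmt.Sprintf("csi-%x", result)
}

func main() {
	// Hypothetical names, chosen only for illustration.
	onNode1 := getAttachmentName("vol-123", "example.csi.driver", "node-1")
	onNode1Again := getAttachmentName("vol-123", "example.csi.driver", "node-1")
	onNode2 := getAttachmentName("vol-123", "example.csi.driver", "node-2")

	fmt.Println("same node, same name:      ", onNode1 == onNode1Again)
	fmt.Println("different node, same name: ", onNode1 == onNode2)
	fmt.Println("name length:               ", len(onNode1))
}
```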

csiMountMgr

  • SetUp

SetUp calls SetUpAt, which in turn calls the CSI driver's NodePublishVolume.

func (c *csiMountMgr) SetUp(fsGroup *int64) error {
    return c.SetUpAt(c.GetPath(), fsGroup)
}

SetUpAt does some preprocessing and then passes the parameters on to NodePublishVolume.

func (c *csiMountMgr) SetUpAt(dir string, fsGroup *int64) error {
  klog.V(4).Infof(log("Mounter.SetUpAt(%s)", dir))
    // Check whether the directory is already mounted
  mounted, err := isDirMounted(c.plugin, dir)
    if err != nil {
        klog.Error(log("mounter.SetUpAt failed while checking mount status for dir [%s]", dir))
        return err
    }
    // If it is already mounted, return
    if mounted {
        klog.V(4).Info(log("mounter.SetUpAt skipping mount, dir already mounted [%s]", dir))
        return nil
    }
    // Read spec.PersistentVolume.Spec.CSI from the volume
    csiSource, err := getCSISourceFromSpec(c.spec)
    if err != nil {
        klog.Error(log("mounter.SetupAt failed to get CSI persistent source: %v", err))
        return err
    }

    csi := c.csiClient
    ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
    defer cancel()

    // Check for STAGE_UNSTAGE_VOLUME set and populate deviceMountPath if so
    ...
    // search for attachment by VolumeAttachment.Spec.Source.PersistentVolumeName
    // If the publish context has not been populated yet, look up the volume's
    // VolumeAttachment and read attachment.Status.AttachmentMetadata from it
    if c.publishContext == nil {
        nodeName := string(c.plugin.host.GetNodeName())
        c.publishContext, err = c.plugin.getPublishContext(c.k8s, c.volumeID, string(c.driverName), nodeName)
        if err != nil {
            return err
        }
    }

    // csiSource.VolumeAttributes corresponds to the VolumeContext in the
    // csi.CreateVolumeResponse returned by CreateVolume
    attribs := csiSource.VolumeAttributes

    nodePublishSecrets := map[string]string{}
    if csiSource.NodePublishSecretRef != nil {
        nodePublishSecrets, err = getCredentialsFromSecret(c.k8s, csiSource.NodePublishSecretRef)
        if err != nil {
            return fmt.Errorf("fetching NodePublishSecretRef %s/%s failed: %v",
                csiSource.NodePublishSecretRef.Namespace, csiSource.NodePublishSecretRef.Name, err)
        }
    }

    // create target_dir before call to NodePublish
    // Create the mount target directory
    if err := os.MkdirAll(dir, 0750); err != nil {
        klog.Error(log("mouter.SetUpAt failed to create dir %#v:  %v", dir, err))
        return err
    }
    klog.V(4).Info(log("created target path successfully [%s]", dir))

    //TODO (vladimirvivien) implement better AccessModes mapping between k8s and CSI
    // The accessMode here is taken from the volume definition
    accessMode := api.ReadWriteOnce
    if c.spec.PersistentVolume.Spec.AccessModes != nil {
        accessMode = c.spec.PersistentVolume.Spec.AccessModes[0]
    }

    // Inject pod information into volume_attributes
    podAttrs, err := c.podAttributes()
    ...

    fsType := csiSource.FSType
    err = csi.NodePublishVolume(
        ctx,
        c.volumeID,
        c.readOnly,
        deviceMountPath,
        dir,
        accessMode,
        c.publishContext,
        attribs,
        nodePublishSecrets,
        fsType,
        c.spec.PersistentVolume.Spec.MountOptions,
    )

    if err != nil {
        klog.Errorf(log("mounter.SetupAt failed: %v", err))
        if removeMountDirErr := removeMountDir(c.plugin, dir); removeMountDirErr != nil {
            klog.Error(log("mounter.SetupAt failed to remove mount dir after a NodePublish() error [%s]: %v", dir, removeMountDirErr))
        }
        return err
    }

    // apply volume ownership
    // The following logic is derived from https://github.com/kubernetes/kubernetes/issues/66323
    // if fstype is "", then skip fsgroup (could be indication of non-block filesystem)
    // if fstype is provided and pv.AccessMode == ReadWriteOnly, then apply fsgroup

    err = c.applyFSGroup(fsType, fsGroup)
    if err != nil {
        // attempt to rollback mount.
        fsGrpErr := fmt.Errorf("applyFSGroup failed for vol %s: %v", c.volumeID, err)
        if unpubErr := csi.NodeUnpublishVolume(ctx, c.volumeID, dir); unpubErr != nil {
            klog.Error(log("NodeUnpublishVolume failed for [%s]: %v", c.volumeID, unpubErr))
            return fsGrpErr
        }

        if unmountErr := removeMountDir(c.plugin, dir); unmountErr != nil {
            klog.Error(log("removeMountDir failed for [%s]: %v", dir, unmountErr))
            return fsGrpErr
        }
        return fsGrpErr
    }

    klog.V(4).Infof(log("mounter.SetUp successfully requested NodePublish [%s]", dir))
    return nil
}

As it stands, k8s does not enforce any relationship between a volume's AccessMode and the VolumeCapability access modes the CSI driver supports. It only does a simple conversion from the AccessMode in the volume definition to the corresponding CSI type, so any real checking has to be implemented in the CSI driver itself.
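A sketch of what that conversion plus a driver-side check might look like is below. The string values mirror the Kubernetes access mode names and the CSI access mode enum names, but the types here are local stand-ins rather than the real API types, and `validateAccessMode` is a hypothetical helper a driver author would write, not something k8s provides.

```go
package main

import "fmt"

// Local stand-ins for the Kubernetes and CSI access mode types.
type k8sAccessMode string
type csiAccessMode string

const (
	ReadWriteOnce k8sAccessMode = "ReadWriteOnce"
	ReadOnlyMany  k8sAccessMode = "ReadOnlyMany"
	ReadWriteMany k8sAccessMode = "ReadWriteMany"
)

// toCSIAccessMode is the "simple conversion": a one-to-one mapping with no
// knowledge of what the driver actually supports.
func toCSIAccessMode(am k8sAccessMode) csiAccessMode {
	switch am {
	case ReadWriteOnce:
		return "SINGLE_NODE_WRITER"
	case ReadOnlyMany:
		return "MULTI_NODE_READER_ONLY"
	case ReadWriteMany:
		return "MULTI_NODE_MULTI_WRITER"
	}
	return "UNKNOWN"
}

// validateAccessMode is the check the driver has to do itself (hypothetical).
func validateAccessMode(requested csiAccessMode, supported []csiAccessMode) error {
	for _, s := range supported {
		if s == requested {
			return nil
		}
	}
	return fmt.Errorf("access mode %s is not supported by this driver", requested)
}

func main() {
	// Assume a block-storage style driver that only supports single-node writers.
	supported := []csiAccessMode{"SINGLE_NODE_WRITER"}
	for _, am := range []k8sAccessMode{ReadWriteOnce, ReadWriteMany} {
		fmt.Println(am, "->", toCSIAccessMode(am), "err:", validateAccessMode(toCSIAccessMode(am), supported))
	}
}
```

In a real driver this kind of check would typically sit at the top of the publish RPCs and return an error to the caller when the requested capability is unsupported.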

  • TearDown

TearDown calls TearDownAt, which in turn calls the CSI driver's NodeUnpublishVolume.

func (c *csiMountMgr) TearDown() error {
    return c.TearDownAt(c.GetPath())
}

kube-controller-manager source code

path             usage
pkg/controller   contains code for controllers

  • attachDetachController

kube-controller-manager uses the attachDetachController to manage attaching and detaching volumes. The main logic is in the following functions:

path                                                          usage
pkg/controller/volume/attachdetach/reconciler/reconciler.go   manages volume attach/detach
func (rc *reconciler) reconcile() {
    // Detaches are triggered before attaches so that volumes referenced by
    // pods that are rescheduled to a different node are detached first.

    // Ensure volumes that should be detached are detached.
    // Iterate over the volumes that are already attached to nodes.
    // actualStateOfWorld holds the actual volume-to-node relationships.
    // desiredStateOfWorld holds the declared volume-to-node (and pod) relationships.
    for _, attachedVolume := range rc.actualStateOfWorld.GetAttachedVolumes() {

        // If the volume is no longer needed, detach it
        if !rc.desiredStateOfWorld.VolumeExists(
            attachedVolume.VolumeName, attachedVolume.NodeName) {

            ...
            // DetachVolume eventually calls the plugin's NewDetacher and then
            // the Detach method of the returned csiAttacher
            err = rc.attacherDetacher.DetachVolume(attachedVolume.AttachedVolume, verifySafeToDetach, rc.actualStateOfWorld)
      ...
            }
        }
    }

    // Attach the volumes that should be attached
    rc.attachDesiredVolumes()

    // Update Node Status
    err := rc.nodeStatusUpdater.UpdateNodeStatuses()
    if err != nil {
        klog.Warningf("UpdateNodeStatuses failed with: %v", err)
    }
}

// attachDesiredVolumes attaches volumes
func (rc *reconciler) attachDesiredVolumes() {
    // Ensure volumes that should be attached are attached.
    // GetVolumesToAttach returns the volumes that need to be attached to nodes
    for _, volumeToAttach := range rc.desiredStateOfWorld.GetVolumesToAttach() {
        // Check whether the desired volume/node pair already exists in the actual state
        if rc.actualStateOfWorld.VolumeNodeExists(volumeToAttach.VolumeName, volumeToAttach.NodeName) {
            // Volume/Node exists, touch it to reset detachRequestedTime
            if klog.V(5) {
                klog.Infof(volumeToAttach.GenerateMsgDetailed("Volume attached--touching", ""))
            }
            rc.actualStateOfWorld.ResetDetachRequestTime(volumeToAttach.VolumeName, volumeToAttach.NodeName)
            continue
        }
        // Don't even try to start an operation if there is already one running
        // If an operation on this volume is pending, it is already in progress, so skip it this round
        if rc.attacherDetacher.IsOperationPending(volumeToAttach.VolumeName, "") {
            if klog.V(10) {
                klog.Infof("Operation for volume %q is already running. Can't start attach for %q", volumeToAttach.VolumeName, volumeToAttach.NodeName)
            }
            continue
        }
        // Use the volume's accessModes to decide whether it may be attached to several nodes.
        // For example, a ReadWriteOnce volume already attached to one node cannot be attached to another.
        if rc.isMultiAttachForbidden(volumeToAttach.VolumeSpec) {
            nodes := rc.actualStateOfWorld.GetNodesForVolume(volumeToAttach.VolumeName)
            if len(nodes) > 0 {
                if !volumeToAttach.MultiAttachErrorReported {
                    rc.reportMultiAttachError(volumeToAttach, nodes)
                    rc.desiredStateOfWorld.SetMultiAttachError(volumeToAttach.VolumeName, volumeToAttach.NodeName)
                }
                continue
            }
        }

        // Volume/Node doesn't exist, spawn a goroutine to attach it
        if klog.V(5) {
            klog.Infof(volumeToAttach.GenerateMsgDetailed("Starting attacherDetacher.AttachVolume", ""))
        }
        // AttachVolume eventually calls the plugin's NewAttacher and then
        // the Attach method of the returned csiAttacher
        err := rc.attacherDetacher.AttachVolume(volumeToAttach.VolumeToAttach, rc.actualStateOfWorld)
        if err == nil {
            klog.Infof(volumeToAttach.GenerateMsgDetailed("attacherDetacher.AttachVolume started", ""))
        }
        if err != nil && !exponentialbackoff.IsExponentialBackoff(err) {
            // Ignore exponentialbackoff.IsExponentialBackoff errors, they are expected.
            // Log all other errors.
            klog.Errorf(volumeToAttach.GenerateErrorDetailed("attacherDetacher.AttachVolume failed to start", err).Error())
        }
    }
}
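Stripped of logging and backoff handling, one pass of the controller's reconciler compares two sets of volume/node pairs. The sketch below models that comparison with plain maps. It is a simplification written for this article: the `attachment` type, `reconcile` signature and the string "operations" it returns are all invented here, and the real code starts asynchronous operations instead of returning a list.

```go
package main

import (
	"fmt"
	"sort"
)

// attachment is a volume/node pair (hypothetical type for this sketch).
type attachment struct{ Volume, Node string }

// sortedKeys returns the pairs present in m in a stable order.
func sortedKeys(m map[attachment]bool) []attachment {
	keys := make([]attachment, 0, len(m))
	for k, present := range m {
		if present {
			keys = append(keys, k)
		}
	}
	sort.Slice(keys, func(i, j int) bool {
		if keys[i].Volume != keys[j].Volume {
			return keys[i].Volume < keys[j].Volume
		}
		return keys[i].Node < keys[j].Node
	})
	return keys
}

// attachedAnywhere reports whether volume is attached to any node in actual.
func attachedAnywhere(actual map[attachment]bool, volume string) bool {
	for a, present := range actual {
		if present && a.Volume == volume {
			return true
		}
	}
	return false
}

// reconcile lists the operations one pass would start: detaches first, then attaches.
func reconcile(desired, actual map[attachment]bool, multiAttachForbidden map[string]bool) []string {
	var ops []string
	for _, a := range sortedKeys(actual) {
		if !desired[a] {
			ops = append(ops, "detach "+a.Volume+" from "+a.Node)
		}
	}
	for _, d := range sortedKeys(desired) {
		if actual[d] {
			continue // already attached where it should be
		}
		if multiAttachForbidden[d.Volume] && attachedAnywhere(actual, d.Volume) {
			ops = append(ops, "multi-attach error: "+d.Volume+" on "+d.Node)
			continue
		}
		ops = append(ops, "attach "+d.Volume+" to "+d.Node)
	}
	return ops
}

func main() {
	// A pod using a ReadWriteOnce volume was rescheduled from node-1 to node-2.
	desired := map[attachment]bool{{"vol-1", "node-2"}: true}
	actual := map[attachment]bool{{"vol-1", "node-1"}: true}
	rwo := map[string]bool{"vol-1": true}

	fmt.Println(reconcile(desired, actual, rwo)) // detach is still in flight
	delete(actual, attachment{"vol-1", "node-1"})
	fmt.Println(reconcile(desired, actual, rwo)) // detach finished
}
```

Because the actual state is only updated once a detach completes, the first pass starts the detach and reports a multi-attach error for the new node; the attach to node-2 only happens on a later pass.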

kubelet source code

path          usage
pkg/kubelet   contains the libraries that drive the Kubelet binary

  • volumemanager

kubelet also handles volumes in a reconcile function. Note that kubelet and kube-controller-manager each have their own reconciler, actualStateOfWorld and desiredStateOfWorld definitions; do not confuse the two.

path                                                  usage
pkg/kubelet/volumemanager/reconciler/reconciler.go    mainly used to manage the mounting and unmounting of volumes
func (rc *reconciler) reconcile() {
    // Unmounts are triggered before mounts so that a volume that was
    // referenced by a pod that was deleted and is now referenced by another
    // pod is unmounted from the first pod before being mounted to the new
    // pod.

    // Ensure volumes that should be unmounted are unmounted.
    // GetMountedVolumes returns the volumes that are successfully mounted to pods
    for _, mountedVolume := range rc.actualStateOfWorld.GetMountedVolumes() {
        // If a mounted volume no longer needs to be mounted, unmount it
        if !rc.desiredStateOfWorld.PodExistsInVolume(mountedVolume.PodName, mountedVolume.VolumeName) {
            // Volume is mounted, unmount it
            klog.V(5).Infof(mountedVolume.GenerateMsgDetailed("Starting operationExecutor.UnmountVolume", ""))
            // UnmountVolume calls the plugin's NewUnmounter to create an instance and then calls TearDown
            err := rc.operationExecutor.UnmountVolume(
                mountedVolume.MountedVolume, rc.actualStateOfWorld, rc.kubeletPodsDir)
            ...
        }
    }

    // GetVolumesToMount returns the volumes that need to be attached to the node and mounted to pods
    // Ensure volumes that should be attached/mounted are attached/mounted.
    for _, volumeToMount := range rc.desiredStateOfWorld.GetVolumesToMount() {
        volMounted, devicePath, err := rc.actualStateOfWorld.PodExistsInVolume(volumeToMount.PodName, volumeToMount.VolumeName)
        volumeToMount.DevicePath = devicePath
        // The volume that should be mounted has not been attached yet
        if cache.IsVolumeNotAttachedError(err) {
            // controllerAttachDetachEnabled is normally true
            if rc.controllerAttachDetachEnabled || !volumeToMount.PluginIsAttachable {
                // Volume is not attached (or doesn't implement attacher), kubelet attach is disabled, wait
                // for controller to finish attaching volume.
                klog.V(5).Infof(volumeToMount.GenerateMsgDetailed("Starting operationExecutor.VerifyControllerAttachedVolume", ""))
                // Check whether the volume is reported as attached to the node
                err := rc.operationExecutor.VerifyControllerAttachedVolume(
                    volumeToMount.VolumeToMount,
                    rc.nodeName,
                    rc.actualStateOfWorld)
        ...
            } else {
                // The attach/detach controller is not used; kubelet calls the plugin to attach directly
                // Volume is not attached to node, kubelet attach is enabled, volume implements an attacher,
                // so attach it
                volumeToAttach := operationexecutor.VolumeToAttach{
                    VolumeName: volumeToMount.VolumeName,
                    VolumeSpec: volumeToMount.VolumeSpec,
                    NodeName:   rc.nodeName,
                }
                klog.V(5).Infof(volumeToAttach.GenerateMsgDetailed("Starting operationExecutor.AttachVolume", ""))
                err := rc.operationExecutor.AttachVolume(volumeToAttach, rc.actualStateOfWorld)
                ...
            }
        } else if !volMounted || cache.IsRemountRequiredError(err) {
            // A volume that has been attached to the node enters this branch on the
            // next loop iteration and gets mounted (or remounted)
            // Volume is not mounted, or is already mounted, but requires remounting
            remountingLogStr := ""
            isRemount := cache.IsRemountRequiredError(err)
            if isRemount {
                remountingLogStr = "Volume is already mounted to pod, but remount was requested."
            }
            klog.V(4).Infof(volumeToMount.GenerateMsgDetailed("Starting operationExecutor.MountVolume", remountingLogStr))
            err := rc.operationExecutor.MountVolume(
                rc.waitForAttachTimeout,
                volumeToMount.VolumeToMount,
                rc.actualStateOfWorld,
                isRemount)
            ...
        } else if cache.IsFSResizeRequiredError(err) &&
            // The volume needs a filesystem resize
            utilfeature.DefaultFeatureGate.Enabled(features.ExpandInUsePersistentVolumes) {
            klog.V(4).Infof(volumeToMount.GenerateMsgDetailed("Starting operationExecutor.ExpandVolumeFSWithoutUnmounting", ""))
            err := rc.operationExecutor.ExpandVolumeFSWithoutUnmounting(
                volumeToMount.VolumeToMount,
                rc.actualStateOfWorld)
            ...
        }
    }

    // GetUnmountedVolumes returns volumes that are attached but not mounted to any pod
  // Ensure devices that should be detached/unmounted are detached/unmounted.
    for _, attachedVolume := range rc.actualStateOfWorld.GetUnmountedVolumes() {
        // Check IsOperationPending to avoid marking a volume as detached if it's in the process of mounting.
        if !rc.desiredStateOfWorld.VolumeExists(attachedVolume.VolumeName) &&
            !rc.operationExecutor.IsOperationPending(attachedVolume.VolumeName, nestedpendingoperations.EmptyUniquePodName) {
            if attachedVolume.GloballyMounted {
                // Volume is globally mounted to device, unmount it
                klog.V(5).Infof(attachedVolume.GenerateMsgDetailed("Starting operationExecutor.UnmountDevice", ""))
                err := rc.operationExecutor.UnmountDevice(
                    attachedVolume.AttachedVolume, rc.actualStateOfWorld, rc.mounter)
                ...
            } else {
                // Volume is attached to node, detach it
                // Kubelet not responsible for detaching or this volume has a non-attachable volume plugin.
                // Leave the detach to the controller
                if rc.controllerAttachDetachEnabled || !attachedVolume.PluginIsAttachable {
                    rc.actualStateOfWorld.MarkVolumeAsDetached(attachedVolume.VolumeName, attachedVolume.NodeName)
                    klog.Infof(attachedVolume.GenerateMsgDetailed("Volume detached", fmt.Sprintf("DevicePath %q", attachedVolume.DevicePath)))
                } else {
                    // Only detach if kubelet detach is enabled
                    klog.V(5).Infof(attachedVolume.GenerateMsgDetailed("Starting operationExecutor.DetachVolume", ""))
                    err := rc.operationExecutor.DetachVolume(
                        attachedVolume.AttachedVolume, false /* verifySafeToDetach */, rc.actualStateOfWorld)
                    ...
                }
            }
        }
    }
}
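The mount half of the kubelet loop boils down to a decision per volume. The sketch below captures that decision as a pure function so the branches are easier to compare. It is a simplified model written for this article: `volumeState` and `nextAction` are invented names, the returned strings just echo the operationExecutor method names in the excerpt, and the resize branch is left out.

```go
package main

import "fmt"

// volumeState is a hypothetical summary of what actualStateOfWorld reports
// for one volume/pod pair.
type volumeState struct {
	Attached        bool // the volume is known to be attached to this node
	Mounted         bool // the volume is mounted for the pod
	RemountRequired bool // mounted, but a remount was requested
}

// nextAction returns the operation the reconciler would start for the volume.
func nextAction(st volumeState, controllerAttachDetachEnabled, pluginIsAttachable bool) string {
	switch {
	case !st.Attached:
		if controllerAttachDetachEnabled || !pluginIsAttachable {
			// Wait for the controller: only verify that it has attached the volume.
			return "VerifyControllerAttachedVolume"
		}
		// kubelet is responsible for attaching.
		return "AttachVolume"
	case !st.Mounted || st.RemountRequired:
		return "MountVolume"
	default:
		return "none"
	}
}

func main() {
	// Typical CSI case: the controller attaches, kubelet verifies and then mounts.
	fmt.Println(nextAction(volumeState{}, true, true))
	fmt.Println(nextAction(volumeState{Attached: true}, true, true))
	fmt.Println(nextAction(volumeState{Attached: true, Mounted: true}, true, true))
}
```

Read this way, a CSI volume needs at least two loop iterations on the kubelet side: one that verifies the attach and one that mounts.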

external-attacher source code

https://github.com/kubernetes-csi/external-attacher.git

During attach/detach, external-attacher is mainly responsible for watching VolumeAttachment objects and calling the corresponding methods of the CSI driver.

  • CSIAttachController

path                           usage
pkg/controller/controller.go   attaches / detaches CSI volumes using provided Handler interface

This controller watches VolumeAttachment objects and calls the corresponding methods of the CSI driver.

// CSIAttachController is a controller that attaches / detaches CSI volumes using provided Handler interface
type CSIAttachController struct {
    client        kubernetes.Interface
    attacherName  string
    handler       Handler
    eventRecorder record.EventRecorder
    vaQueue       workqueue.RateLimitingInterface
    pvQueue       workqueue.RateLimitingInterface

    vaLister       storagelisters.VolumeAttachmentLister
    vaListerSynced cache.InformerSynced
    pvLister       corelisters.PersistentVolumeLister
    pvListerSynced cache.InformerSynced
}
  • SyncNewOrUpdatedVolumeAttachment

It calls the CSI driver according to the state of the VolumeAttachment and then updates the VolumeAttachment's status.

func (h *csiHandler) SyncNewOrUpdatedVolumeAttachment(va *storage.VolumeAttachment) {
    glog.V(4).Infof("CSIHandler: processing VA %q", va.Name)

    var err error
    if va.DeletionTimestamp == nil {
        err = h.syncAttach(va)
    } else {
        err = h.syncDetach(va)
    }
  ...
}
  • syncAttach

The attach operation calls the CSI driver's ControllerPublishVolume method.

func (h *csiHandler) syncAttach(va *storage.VolumeAttachment) error {
    if va.Status.Attached {
        // Volume is attached, there is nothing to be done.
        glog.V(4).Infof("%q is already attached", va.Name)
        return nil
    }

    // Attach and report any error
    glog.V(2).Infof("Attaching %q", va.Name)
    // csiAttach eventually calls the CSI driver's ControllerPublishVolume
    va, metadata, err := h.csiAttach(va)
    ...
    glog.V(2).Infof("Attached %q", va.Name)

    // Mark as attached
    if _, err := markAsAttached(h.client, va, metadata); err != nil {
        return fmt.Errorf("failed to mark as attached: %s", err)
    }
    glog.V(4).Infof("Fully attached %q", va.Name)
    return nil
}
  • syncDetach

syncDetach calls the CSI driver's ControllerUnpublishVolume and then marks the VolumeAttachment as detached.
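Putting the sync functions together, the handler's control flow looks roughly like the sketch below. This is an illustration, not external-attacher code: `volumeAttachment`, `driver`, `syncVA` and `fakeDriver` are stand-ins defined here, and the real handler also deals with finalizers, status patches and retries that are omitted.

```go
package main

import "fmt"

// volumeAttachment is a pared-down, hypothetical stand-in for storage.VolumeAttachment.
type volumeAttachment struct {
	Name     string
	Deleting bool // stands in for DeletionTimestamp != nil
	Attached bool
	Metadata map[string]string
}

// driver is a hypothetical interface covering the two controller RPCs.
type driver interface {
	ControllerPublishVolume(va *volumeAttachment) (map[string]string, error)
	ControllerUnpublishVolume(va *volumeAttachment) error
}

// syncVA dispatches to attach or detach depending on whether va is being deleted.
func syncVA(d driver, va *volumeAttachment) error {
	if !va.Deleting {
		if va.Attached {
			return nil // already attached, nothing to do
		}
		md, err := d.ControllerPublishVolume(va)
		if err != nil {
			return fmt.Errorf("attach %q: %w", va.Name, err)
		}
		va.Attached, va.Metadata = true, md // "mark as attached"
		return nil
	}
	if err := d.ControllerUnpublishVolume(va); err != nil {
		return fmt.Errorf("detach %q: %w", va.Name, err)
	}
	va.Attached, va.Metadata = false, nil // "mark as detached"
	return nil
}

// fakeDriver counts calls so the demo can show that attach is idempotent.
type fakeDriver struct{ publishes, unpublishes int }

func (f *fakeDriver) ControllerPublishVolume(va *volumeAttachment) (map[string]string, error) {
	f.publishes++
	return map[string]string{"devicePath": "/dev/example"}, nil
}

func (f *fakeDriver) ControllerUnpublishVolume(va *volumeAttachment) error {
	f.unpublishes++
	return nil
}

// runDemo syncs a new object twice, then once more after it is marked for deletion.
func runDemo() (publishes, unpublishes int, attached bool, err error) {
	d := &fakeDriver{}
	va := &volumeAttachment{Name: "csi-example"}
	for i := 0; i < 2; i++ {
		if err = syncVA(d, va); err != nil {
			return
		}
	}
	va.Deleting = true
	err = syncVA(d, va)
	return d.publishes, d.unpublishes, va.Attached, err
}

func main() {
	fmt.Println(runDemo())
}
```

The metadata returned by the publish call is what the kubelet later reads back as the publish context before calling NodePublishVolume.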

Conclusion

Using the volume lifecycle below, here is how the pieces work together:

   CreateVolume +------------+ DeleteVolume
 +------------->|  CREATED   +--------------+
 |              +---+----+---+              |
 |       Controller |    | Controller       v
+++         Publish |    | Unpublish       +++
|X|          Volume |    | Volume          | |
+-+             +---v----+---+             +-+
                | NODE_READY |
                +---+----^---+
               Node |    | Node
            Publish |    | Unpublish
             Volume |    | Volume
                +---v----+---+
                | PUBLISHED  |
                +------------+

  • Mounting a volume:
  1. kube-controller-manager calls the in-tree CSI plugin to create the VolumeAttachment.
  2. external-attacher watches the VolumeAttachment, calls the CSI driver's ControllerPublishVolume and, based on the result, sets the VolumeAttachment's status to attached.
  3. kubelet checks whether the volume is attached; if it is, kubelet calls the CSI driver's NodePublishVolume to mount it.
  • Unmounting a volume:
  1. kubelet calls the CSI driver's NodeUnpublishVolume for volumes that are no longer mounted to a pod (the pod was deleted).
  2. kube-controller-manager detaches volumes that are attached to a node but no longer used.
  3. external-attacher calls the CSI driver's ControllerUnpublishVolume to perform the detach.

P.S.

1. One VolumeAttachment corresponds to one attachment relationship, so for a ReadWriteOnce volume a second VolumeAttachment will not be created. kube-controller-manager makes this decision at attach time from the persistent volume's accessModes and the number of nodes the volume is already attached to.

2. The VolumeAttachment ID depends only on the volume name, the CSI driver name and the node name. Make sure your attach and detach logic depends only on the node, not on the pod.

3. When handling a volume's accessModes, k8s always goes by the accessModes on the volume and does not check them against the accessModes the CSI driver supports, so the driver code has to check and handle this itself.

4. When the kubelet service on a node runs into trouble, k8s schedules the pods on that node for deletion, but the deletion gets stuck because kubelet is not working.

At that point the volume is still present in kube-controller-manager's desiredStateOfWorld, so the corresponding VolumeAttachment is not deleted and the volume is not mounted anywhere else.

When kubelet restarts, the pod is deleted for good and kube-controller-manager calls detach for the volume, which means the detach completes first.

Meanwhile, the restarted kubelet rebuilds its actualStateOfWorld and desiredStateOfWorld. Because the pod has been deleted, kubelet's actualStateOfWorld has no mount information for it, so kubelet never touches the volume again and the previous mount is left behind.