Kubernetes (k8s) CSI plugin development: an introduction http://www.reibang.com/p/88ec8cba7507
Kubernetes (k8s) CSI plugin attach/detach flow http://www.reibang.com/p/5c6e78b6b320
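Introduction
Because the k8s CSI plugin workflow is out-of-tree, k8s relies on additional sidecar containers to communicate with the k8s components. This article walks through the code paths that run when a volume is mounted and unmounted.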
   CreateVolume +------------+ DeleteVolume
 +------------->|  CREATED   +--------------+
 |              +---+----+---+              |
 |       Controller |    | Controller       v
+++         Publish |    | Unpublish       +++
|X|          Volume |    | Volume          | |
+-+             +---v----+---+             +-+
                | NODE_READY |
                +---+----^---+
               Node |    | Node
            Publish |    | Unpublish
             Volume |    | Volume
                +---v----+---+
                | PUBLISHED  |
                +------------+
Figure 5: The lifecycle of a dynamically provisioned volume, from creation to destruction.
Taking this lifecycle diagram as an example: CreateVolume creates the volume, ControllerPublishVolume attaches the volume to the node where the kubelet runs, NodePublishVolume mounts the volume into the target directory, NodeUnpublishVolume unmounts it, and ControllerUnpublishVolume detaches it. Note that attach and detach both operate per node; they cannot be scoped to an individual pod.
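The lifecycle above can be read as a small state machine. The following is a minimal sketch under that reading, not code from Kubernetes or the CSI spec: the `State` type, the `transitions` table, and the `Next` function are all hypothetical names introduced for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// State is one of the volume lifecycle states shown in the diagram.
type State string

const (
	Created   State = "CREATED"
	NodeReady State = "NODE_READY"
	Published State = "PUBLISHED"
)

// transitions maps (current state, RPC name) to the next state.
// It is an illustrative table, not part of any real API.
var transitions = map[State]map[string]State{
	Created:   {"ControllerPublishVolume": NodeReady},
	NodeReady: {"NodePublishVolume": Published, "ControllerUnpublishVolume": Created},
	Published: {"NodeUnpublishVolume": NodeReady},
}

// Next returns the state reached by calling rpc in state s,
// or an error if the diagram has no such edge.
func Next(s State, rpc string) (State, error) {
	if next, ok := transitions[s][rpc]; ok {
		return next, nil
	}
	return s, errors.New("invalid transition: " + rpc + " in state " + string(s))
}

func main() {
	s := Created
	steps := []string{
		"ControllerPublishVolume",   // attach to the node
		"NodePublishVolume",         // mount into the target directory
		"NodeUnpublishVolume",       // unmount
		"ControllerUnpublishVolume", // detach from the node
	}
	for _, rpc := range steps {
		next, err := Next(s, rpc)
		if err != nil {
			fmt.Println("error:", err)
			return
		}
		fmt.Printf("%s --%s--> %s\n", s, rpc, next)
		s = next
	}
}
```

The table makes the ordering constraint explicit: a mount (NodePublishVolume) is only reachable after an attach (ControllerPublishVolume).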
Kubernetes source code
All of the following code lives in the k8s source tree:
https://github.com/kubernetes/kubernetes.git
CSI Plugin
First, look at the CSI-related code in the kubernetes tree.
path | usage |
---|---|
pkg/volume/csi | manages the CSI plugin |
csiPlugin source
- The csiPlugin struct
type csiPlugin struct {
// host is used to interface with the kubelet
host volume.VolumeHost
blockEnabled bool
// lists the CSIDriver objects for the node
csiDriverLister csilister.CSIDriverLister
// provides the informer and the CSIDriverLister
csiDriverInformer csiinformer.CSIDriverInformer
}
- Loading csiPlugin
ProbeVolumePlugins is the function the k8s components use to load the csiPlugin. Calling it returns an empty csiPlugin.
// ProbeVolumePlugins returns implemented plugins
func ProbeVolumePlugins() []volume.VolumePlugin {
p := &csiPlugin{
host: nil,
blockEnabled: utilfeature.DefaultFeatureGate.Enabled(features.CSIBlockVolume),
}
return []volume.VolumePlugin{p}
}
- Initializing csiPlugin
// Init receives the corresponding VolumeHost.
func (p *csiPlugin) Init(host volume.VolumeHost) error {
p.host = host
if utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) {
csiClient := host.GetCSIClient()
if csiClient == nil {
klog.Warning("The client for CSI Custom Resources is not available, skipping informer initialization")
} else {
// Start informer for CSIDrivers.
factory := csiapiinformer.NewSharedInformerFactory(csiClient, csiResyncPeriod)
p.csiDriverInformer = factory.Csi().V1alpha1().CSIDrivers()
p.csiDriverLister = p.csiDriverInformer.Lister()
go factory.Start(wait.NeverStop)
}
}
// Initializing csiDrivers map and label management channels
csiDrivers = csiDriversStore{driversMap: map[string]csiDriver{}}
nim = nodeinfomanager.NewNodeInfoManager(host.GetNodeName(), host)
// TODO(#70514) Init CSINodeInfo object if the CRD exists and create Driver
// objects for migrated drivers.
return nil
}
- The NewAttacher function
NewAttacher returns a csiAttacher instance.
func (p *csiPlugin) NewAttacher() (volume.Attacher, error) {
k8s := p.host.GetKubeClient()
if k8s == nil {
klog.Error(log("unable to get kubernetes client from host"))
return nil, errors.New("unable to get Kubernetes client")
}
return &csiAttacher{
plugin: p,
k8s: k8s,
waitSleepTime: 1 * time.Second,
}, nil
}
- The NewDetacher function
Same as NewAttacher; it also returns a csiAttacher instance.
- The NewMounter function
Returns a csiMountMgr instance.
func (p *csiPlugin) NewMounter(
spec *volume.Spec,
pod *api.Pod,
_ volume.VolumeOptions) (volume.Mounter, error) {
...
mounter := &csiMountMgr{
plugin: p,
k8s: k8s,
spec: spec,
pod: pod,
podUID: pod.UID,
driverName: csiDriverName(pvSource.Driver),
volumeID: pvSource.VolumeHandle,
specVolumeID: spec.Name(),
csiClient: csi,
readOnly: readOnly,
}
...
return mounter, nil
}
- The NewUnmounter method
Same as above; it also returns a csiMountMgr instance.
csiAttacher
- Attach
The Attach method is responsible for creating the VolumeAttachment.
func (c *csiAttacher) Attach(spec *volume.Spec, nodeName types.NodeName) (string, error) {
...
// Create the VolumeAttachment.
node := string(nodeName)
pvName := spec.PersistentVolume.GetName()
// Note that attachID depends only on the volume handle, the driver name, and the node name,
// so attach works per node, not per pod.
attachID := getAttachmentName(csiSource.VolumeHandle, csiSource.Driver, node)
attachment := &storage.VolumeAttachment{
ObjectMeta: meta.ObjectMeta{
Name: attachID,
},
Spec: storage.VolumeAttachmentSpec{
NodeName: node,
Attacher: csiSource.Driver,
Source: storage.VolumeAttachmentSource{
PersistentVolumeName: &pvName,
},
},
}
...
// The attach counts as successful once attachment.Status.Attached is true.
if _, err := c.waitForVolumeAttachment(csiSource.VolumeHandle, attachID, csiTimeout); err != nil {
return "", err
}
klog.V(4).Info(log("attacher.Attach finished OK with VolumeAttachment object [%s]", attachID))
// TODO(71164): In 1.15, return empty devicePath
return attachID, nil
}
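The wait at the end of Attach amounts to "block until the VolumeAttachment reports attached, or give up after a timeout". The sketch below shows that idea with plain polling; it is an assumption-laden illustration, not the actual `waitForVolumeAttachment` implementation (which may use a watch instead). `waitForAttached` and the `isAttached` callback are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// waitForAttached polls isAttached every interval until it reports true,
// returns an error, or the timeout elapses.
// isAttached stands in for reading VolumeAttachment.Status.Attached.
func waitForAttached(isAttached func() (bool, error), interval, timeout time.Duration) error {
	deadline := time.Now().Add(timeout)
	for {
		ok, err := isAttached()
		if err != nil {
			return err
		}
		if ok {
			return nil
		}
		if time.Now().After(deadline) {
			return errors.New("timed out waiting for attachment")
		}
		time.Sleep(interval)
	}
}

func main() {
	polls := 0
	// Hypothetical status check that reports "attached" on the third poll.
	check := func() (bool, error) {
		polls++
		return polls >= 3, nil
	}
	err := waitForAttached(check, 10*time.Millisecond, time.Second)
	fmt.Println("polls:", polls, "err:", err)
}
```

In this model the caller never talks to the CSI driver directly; it only observes the status that another component (here, the external-attacher) writes back.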
- Detach
The Detach method is responsible for deleting the VolumeAttachment.
func (c *csiAttacher) Detach(volumeName string, nodeName types.NodeName) error {
// volumeName in format driverName<SEP>volumeHandle generated by plugin.GetVolumeName()
if volumeName == "" {
klog.Error(log("detacher.Detach missing value for parameter volumeName"))
return errors.New("missing expected parameter volumeName")
}
parts := strings.Split(volumeName, volNameSep)
if len(parts) != 2 {
klog.Error(log("detacher.Detach insufficient info encoded in volumeName"))
return errors.New("volumeName missing expected data")
}
driverName := parts[0]
volID := parts[1]
attachID := getAttachmentName(volID, driverName, string(nodeName))
// Delete the VolumeAttachment.
if err := c.k8s.StorageV1beta1().VolumeAttachments().Delete(attachID, nil); err != nil {
if apierrs.IsNotFound(err) {
// object deleted or never existed, done
klog.V(4).Info(log("VolumeAttachment object [%v] for volume [%v] not found, object deleted", attachID, volID))
return nil
}
klog.Error(log("detacher.Detach failed to delete VolumeAttachment [%s]: %v", attachID, err))
return err
}
klog.V(4).Info(log("detacher deleted ok VolumeAttachment.ID=%s", attachID))
// Wait for the VolumeAttachment to be deleted.
return c.waitForVolumeDetachment(volID, attachID)
}
The VolumeAttachment name depends only on the volume handle, the driver name, and the node name. So when implementing a CSI driver, keep in mind that ControllerPublishVolume and ControllerUnpublishVolume are tied to the node only, never to a pod.
func getAttachmentName(volName, csiDriverName, nodeName string) string {
result := sha256.Sum256([]byte(fmt.Sprintf("%s%s%s", volName, csiDriverName, nodeName)))
return fmt.Sprintf("csi-%x", result)
}
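To see the per-node property concretely, the function above can be run on its own. The sketch below copies `getAttachmentName` as quoted in this article; the volume handle, driver name, and node names are made-up values for illustration.

```go
package main

import (
	"crypto/sha256"
	"fmt"
)

// getAttachmentName mirrors the function quoted above: the name is a
// SHA-256 hash over volume handle, driver name, and node name only.
func getAttachmentName(volName, csiDriverName, nodeName string) string {
	result := sha256.Sum256([]byte(fmt.Sprintf("%s%s%s", volName, csiDriverName, nodeName)))
	return fmt.Sprintf("csi-%x", result)
}

func main() {
	// Hypothetical inputs. No pod information is part of the name.
	a := getAttachmentName("vol-001", "example.csi.driver", "node-a")
	b := getAttachmentName("vol-001", "example.csi.driver", "node-a")
	c := getAttachmentName("vol-001", "example.csi.driver", "node-b")

	fmt.Println("same volume, same node:     ", a == b)
	fmt.Println("same volume, different node:", a == c)
	fmt.Println("name length:", len(a))
}
```

Two pods on the same node that use the same volume therefore share one VolumeAttachment, which is why the controller-side RPCs cannot distinguish between them.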
csiMountMgr
- SetUp
SetUp calls SetUpAt, which in turn calls the CSI driver's NodePublishVolume.
func (c *csiMountMgr) SetUp(fsGroup *int64) error {
return c.SetUpAt(c.GetPath(), fsGroup)
}
SetUpAt does some preprocessing and then passes the arguments on to NodePublishVolume.
func (c *csiMountMgr) SetUpAt(dir string, fsGroup *int64) error {
klog.V(4).Infof(log("Mounter.SetUpAt(%s)", dir))
// Check whether dir is already mounted.
mounted, err := isDirMounted(c.plugin, dir)
if err != nil {
klog.Error(log("mounter.SetUpAt failed while checking mount status for dir [%s]", dir))
return err
}
// If it is already mounted, return.
if mounted {
klog.V(4).Info(log("mounter.SetUpAt skipping mount, dir already mounted [%s]", dir))
return nil
}
// Get spec.PersistentVolume.Spec.CSI from the volume.
csiSource, err := getCSISourceFromSpec(c.spec)
if err != nil {
klog.Error(log("mounter.SetupAt failed to get CSI persistent source: %v", err))
return err
}
csi := c.csiClient
ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
defer cancel()
// Check for STAGE_UNSTAGE_VOLUME set and populate deviceMountPath if so
...
// search for attachment by VolumeAttachment.Spec.Source.PersistentVolumeName
// If the mounter has no publish context yet, look up the volume's VolumeAttachment
// and read attachment.Status.AttachmentMetadata from it.
if c.publishContext == nil {
nodeName := string(c.plugin.host.GetNodeName())
c.publishContext, err = c.plugin.getPublishContext(c.k8s, c.volumeID, string(c.driverName), nodeName)
if err != nil {
return err
}
}
// csiSource.VolumeAttributes corresponds to the VolumeContext in the
// csi.CreateVolumeResponse returned by CreateVolume.
attribs := csiSource.VolumeAttributes
nodePublishSecrets := map[string]string{}
if csiSource.NodePublishSecretRef != nil {
nodePublishSecrets, err = getCredentialsFromSecret(c.k8s, csiSource.NodePublishSecretRef)
if err != nil {
return fmt.Errorf("fetching NodePublishSecretRef %s/%s failed: %v",
csiSource.NodePublishSecretRef.Namespace, csiSource.NodePublishSecretRef.Name, err)
}
}
// create target_dir before call to NodePublish
// Create the mount target directory.
if err := os.MkdirAll(dir, 0750); err != nil {
klog.Error(log("mouter.SetUpAt failed to create dir %#v: %v", dir, err))
return err
}
klog.V(4).Info(log("created target path successfully [%s]", dir))
//TODO (vladimirvivien) implement better AccessModes mapping between k8s and CSI
// The accessMode here is taken from the volume definition.
accessMode := api.ReadWriteOnce
if c.spec.PersistentVolume.Spec.AccessModes != nil {
accessMode = c.spec.PersistentVolume.Spec.AccessModes[0]
}
// Inject pod information into volume_attributes
podAttrs, err := c.podAttributes()
...
fsType := csiSource.FSType
err = csi.NodePublishVolume(
ctx,
c.volumeID,
c.readOnly,
deviceMountPath,
dir,
accessMode,
c.publishContext,
attribs,
nodePublishSecrets,
fsType,
c.spec.PersistentVolume.Spec.MountOptions,
)
if err != nil {
klog.Errorf(log("mounter.SetupAt failed: %v", err))
if removeMountDirErr := removeMountDir(c.plugin, dir); removeMountDirErr != nil {
klog.Error(log("mounter.SetupAt failed to remove mount dir after a NodePublish() error [%s]: %v", dir, removeMountDirErr))
}
return err
}
// apply volume ownership
// The following logic is derived from https://github.com/kubernetes/kubernetes/issues/66323
// if fstype is "", then skip fsgroup (could be indication of non-block filesystem)
// if fstype is provided and pv.AccessMode == ReadWriteOnly, then apply fsgroup
err = c.applyFSGroup(fsType, fsGroup)
if err != nil {
// attempt to rollback mount.
fsGrpErr := fmt.Errorf("applyFSGroup failed for vol %s: %v", c.volumeID, err)
if unpubErr := csi.NodeUnpublishVolume(ctx, c.volumeID, dir); unpubErr != nil {
klog.Error(log("NodeUnpublishVolume failed for [%s]: %v", c.volumeID, unpubErr))
return fsGrpErr
}
if unmountErr := removeMountDir(c.plugin, dir); unmountErr != nil {
klog.Error(log("removeMountDir failed for [%s]: %v", dir, unmountErr))
return fsGrpErr
}
return fsGrpErr
}
klog.V(4).Infof(log("mounter.SetUp successfully requested NodePublish [%s]", dir))
return nil
}
As things stand, k8s does not relate the volume's AccessMode to the VolumeCapability access modes that the CSI driver supports. It only performs a simple conversion, turning the AccessMode from the volume definition into the corresponding CSI type, so the actual checks have to be implemented in the CSI driver.
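A driver-side check could look roughly like the sketch below. It is only an illustration: `toCSIAccessMode`, `validateAccessMode`, and the `supported` set are hypothetical names, modes are modeled as plain strings rather than the real API types, and the mapping shown (ReadWriteOnce to SINGLE_NODE_WRITER, ReadOnlyMany to MULTI_NODE_READER_ONLY, ReadWriteMany to MULTI_NODE_MULTI_WRITER) is my understanding of the simple conversion, stated here as an assumption.

```go
package main

import "fmt"

// Kubernetes PV access modes, modeled as plain strings for this sketch.
const (
	ReadWriteOnce = "ReadWriteOnce"
	ReadOnlyMany  = "ReadOnlyMany"
	ReadWriteMany = "ReadWriteMany"
)

// toCSIAccessMode converts a PV access mode to the name of a CSI
// VolumeCapability access mode. The mapping is an assumption of this sketch.
func toCSIAccessMode(k8sMode string) (string, error) {
	switch k8sMode {
	case ReadWriteOnce:
		return "SINGLE_NODE_WRITER", nil
	case ReadOnlyMany:
		return "MULTI_NODE_READER_ONLY", nil
	case ReadWriteMany:
		return "MULTI_NODE_MULTI_WRITER", nil
	}
	return "", fmt.Errorf("unsupported access mode %q", k8sMode)
}

// supported is the hypothetical set of CSI modes this example driver accepts.
var supported = map[string]bool{"SINGLE_NODE_WRITER": true}

// validateAccessMode is the kind of check a driver would run itself,
// for example at the start of NodePublishVolume.
func validateAccessMode(csiMode string) error {
	if !supported[csiMode] {
		return fmt.Errorf("access mode %s is not supported by this driver", csiMode)
	}
	return nil
}

func main() {
	for _, m := range []string{ReadWriteOnce, ReadWriteMany} {
		csiMode, err := toCSIAccessMode(m)
		if err != nil {
			fmt.Println(err)
			continue
		}
		if err := validateAccessMode(csiMode); err != nil {
			fmt.Printf("%s -> %s: rejected (%v)\n", m, csiMode, err)
			continue
		}
		fmt.Printf("%s -> %s: accepted\n", m, csiMode)
	}
}
```

The point of the design is that the rejection happens inside the driver: nothing upstream stops a ReadWriteMany request from reaching a driver that only supports a single writer.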
- TearDown
TearDown calls TearDownAt, which in turn calls the CSI driver's NodeUnpublishVolume.
func (c *csiMountMgr) TearDown() error {
return c.TearDownAt(c.GetPath())
}
kube-controller-manager source
path | usage |
---|---|
pkg/controller | contains code for controllers |
- attachDetachController
kube-controller-manager uses the attachDetachController to manage volume attach and detach. The main logic is in the following functions:
path | usage |
---|---|
pkg/controller/volume/attachdetach/reconciler/reconciler.go | manages volume attach/detach |
func (rc *reconciler) reconcile() {
// Detaches are triggered before attaches so that volumes referenced by
// pods that are rescheduled to a different node are detached first.
// Ensure volumes that should be detached are detached.
// Iterate over the volumes that are already attached to nodes.
// actualStateOfWorld holds the actual volume-to-node mapping.
// desiredStateOfWorld holds the declared mapping of volumes to nodes and pods.
for _, attachedVolume := range
rc.actualStateOfWorld.GetAttachedVolumes() {
// If the volume is no longer needed, detach it.
if !rc.desiredStateOfWorld.VolumeExists(
attachedVolume.VolumeName, attachedVolume.NodeName) {
...
// DetachVolume eventually calls the plugin's NewDetacher and then
// the Detach method of the returned csiAttacher.
err = rc.attacherDetacher.DetachVolume(attachedVolume.AttachedVolume, verifySafeToDetach, rc.actualStateOfWorld)
...
}
}
}
// Attach the desired volumes.
rc.attachDesiredVolumes()
// Update Node Status
err := rc.nodeStatusUpdater.UpdateNodeStatuses()
if err != nil {
klog.Warningf("UpdateNodeStatuses failed with: %v", err)
}
}
// attachDesiredVolumes attaches volumes.
func (rc *reconciler) attachDesiredVolumes() {
// Ensure volumes that should be attached are attached.
// GetVolumesToAttach returns the volumes that need to be attached to nodes.
for _, volumeToAttach := range rc.desiredStateOfWorld.GetVolumesToAttach() {
// Check whether the desired volume/node pair already exists in the actual state.
if rc.actualStateOfWorld.VolumeNodeExists(volumeToAttach.VolumeName, volumeToAttach.NodeName) {
// Volume/Node exists, touch it to reset detachRequestedTime
if klog.V(5) {
klog.Infof(volumeToAttach.GenerateMsgDetailed("Volume attached--touching", ""))
}
rc.actualStateOfWorld.ResetDetachRequestTime(volumeToAttach.VolumeName, volumeToAttach.NodeName)
continue
}
// Don't even try to start an operation if there is already one running
// If an operation is pending for the volume, one is already running; skip it in this pass.
if rc.attacherDetacher.IsOperationPending(volumeToAttach.VolumeName, "") {
if klog.V(10) {
klog.Infof("Operation for volume %q is already running. Can't start attach for %q", volumeToAttach.VolumeName, volumeToAttach.NodeName)
}
continue
}
// Decide from the volume's accessModes whether it may be attached to multiple nodes.
// For example, a ReadWriteOnce volume already attached to one node fails to attach to another.
if rc.isMultiAttachForbidden(volumeToAttach.VolumeSpec) {
nodes := rc.actualStateOfWorld.GetNodesForVolume(volumeToAttach.VolumeName)
if len(nodes) > 0 {
if !volumeToAttach.MultiAttachErrorReported {
rc.reportMultiAttachError(volumeToAttach, nodes)
rc.desiredStateOfWorld.SetMultiAttachError(volumeToAttach.VolumeName, volumeToAttach.NodeName)
}
continue
}
}
// Volume/Node doesn't exist, spawn a goroutine to attach it
if klog.V(5) {
klog.Infof(volumeToAttach.GenerateMsgDetailed("Starting attacherDetacher.AttachVolume", ""))
}
// AttachVolume eventually calls the plugin's NewAttacher and then
// the Attach method of the returned csiAttacher.
err := rc.attacherDetacher.AttachVolume(volumeToAttach.VolumeToAttach, rc.actualStateOfWorld)
if err == nil {
klog.Infof(volumeToAttach.GenerateMsgDetailed("attacherDetacher.AttachVolume started", ""))
}
if err != nil && !exponentialbackoff.IsExponentialBackoff(err) {
// Ignore exponentialbackoff.IsExponentialBackoff errors, they are expected.
// Log all other errors.
klog.Errorf(volumeToAttach.GenerateErrorDetailed("attacherDetacher.AttachVolume failed to start", err).Error())
}
}
}
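The two functions above follow a common pattern: compare the actual state with the desired state, detach what is no longer wanted, then attach what is missing, while refusing a second attach for volumes that forbid it. The following toy model sketches that pattern. The `pair` type and `reconcile` function are hypothetical, state is modeled as plain slices, and, as a simplifying assumption, a detach requested in a pass is treated as not yet complete within that same pass.

```go
package main

import "fmt"

// pair identifies one volume attached (or to be attached) to one node.
type pair struct{ volume, node string }

// reconcile returns the actions one pass would start.
// multiAttachForbidden marks volumes that may be attached to one node only
// (for example ReadWriteOnce volumes).
func reconcile(actual, desired []pair, multiAttachForbidden map[string]bool) []string {
	var actions []string

	want := map[pair]bool{}
	for _, p := range desired {
		want[p] = true
	}

	have := map[pair]bool{}
	attachedNodes := map[string]int{} // nodes each volume is attached to right now
	// Detaches are triggered before attaches.
	for _, p := range actual {
		have[p] = true
		attachedNodes[p.volume]++
		if !want[p] {
			actions = append(actions, "detach "+p.volume+" from "+p.node)
		}
	}

	for _, p := range desired {
		if have[p] {
			continue // already attached, nothing to do
		}
		if multiAttachForbidden[p.volume] && attachedNodes[p.volume] > 0 {
			actions = append(actions, "multi-attach error for "+p.volume+" on "+p.node)
			continue
		}
		actions = append(actions, "attach "+p.volume+" to "+p.node)
	}
	return actions
}

func main() {
	rwo := map[string]bool{"pv1": true}
	// Pass 1: the pod moved from node-a to node-b; pv1 is still attached to node-a.
	fmt.Println(reconcile([]pair{{"pv1", "node-a"}}, []pair{{"pv1", "node-b"}}, rwo))
	// Pass 2: the detach has completed, so the attach can start.
	fmt.Println(reconcile(nil, []pair{{"pv1", "node-b"}}, rwo))
}
```

Running the two passes shows why a rescheduled pod with a single-node volume waits: the attach to the new node only starts after the old attachment has disappeared from the actual state.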
kubelet source
path | usage |
---|---|
pkg/kubelet | contains the libraries that drive the Kubelet binary |
- volumemanager
The kubelet also handles volumes in a reconcile function. Note that kubelet and kube-controller-manager each have their own definitions of reconciler, actualStateOfWorld, and desiredStateOfWorld; do not confuse them.
path | usage |
---|---|
pkg/kubelet/volumemanager/reconciler/reconciler.go | Mainly used to manage the mounting and unmounting of volumes |
func (rc *reconciler) reconcile() {
// Unmounts are triggered before mounts so that a volume that was
// referenced by a pod that was deleted and is now referenced by another
// pod is unmounted from the first pod before being mounted to the new
// pod.
// Ensure volumes that should be unmounted are unmounted.
// GetMountedVolumes returns the volumes that are successfully mounted to pods.
for _, mountedVolume := range rc.actualStateOfWorld.GetMountedVolumes() {
// If a mounted volume no longer needs to be mounted, unmount it.
if !rc.desiredStateOfWorld.PodExistsInVolume(mountedVolume.PodName, mountedVolume.VolumeName) {
// Volume is mounted, unmount it
klog.V(5).Infof(mountedVolume.GenerateMsgDetailed("Starting operationExecutor.UnmountVolume", ""))
// UnmountVolume creates an instance via the plugin's NewUnmounter and calls its TearDown method.
err := rc.operationExecutor.UnmountVolume(
mountedVolume.MountedVolume, rc.actualStateOfWorld, rc.kubeletPodsDir)
...
}
}
// GetVolumesToMount returns the volumes that need to be attached to the node and mounted to pods.
// Ensure volumes that should be attached/mounted are attached/mounted.
for _, volumeToMount := range rc.desiredStateOfWorld.GetVolumesToMount() {
volMounted, devicePath, err := rc.actualStateOfWorld.PodExistsInVolume(volumeToMount.PodName, volumeToMount.VolumeName)
volumeToMount.DevicePath = devicePath
// The volume to mount has not been attached yet.
if cache.IsVolumeNotAttachedError(err) {
// controllerAttachDetachEnabled is usually true.
if rc.controllerAttachDetachEnabled || !volumeToMount.PluginIsAttachable {
// Volume is not attached (or doesn't implement attacher), kubelet attach is disabled, wait
// for controller to finish attaching volume.
klog.V(5).Infof(volumeToMount.GenerateMsgDetailed("Starting operationExecutor.VerifyControllerAttachedVolume", ""))
// Verify that the volume is reported as attached to the node.
err := rc.operationExecutor.VerifyControllerAttachedVolume(
volumeToMount.VolumeToMount,
rc.nodeName,
rc.actualStateOfWorld)
...
} else {
// The attach/detach controller is not used; the kubelet calls the plugin to attach directly.
// Volume is not attached to node, kubelet attach is enabled, volume implements an attacher,
// so attach it
volumeToAttach := operationexecutor.VolumeToAttach{
VolumeName: volumeToMount.VolumeName,
VolumeSpec: volumeToMount.VolumeSpec,
NodeName: rc.nodeName,
}
klog.V(5).Infof(volumeToAttach.GenerateMsgDetailed("Starting operationExecutor.AttachVolume", ""))
err := rc.operationExecutor.AttachVolume(volumeToAttach, rc.actualStateOfWorld)
...
}
} else if !volMounted || cache.IsRemountRequiredError(err) {
// A volume that is attached to the node enters this branch on the next loop
// iteration and is mounted (or remounted).
// Volume is not mounted, or is already mounted, but requires remounting
remountingLogStr := ""
isRemount := cache.IsRemountRequiredError(err)
if isRemount {
remountingLogStr = "Volume is already mounted to pod, but remount was requested."
}
klog.V(4).Infof(volumeToMount.GenerateMsgDetailed("Starting operationExecutor.MountVolume", remountingLogStr))
err := rc.operationExecutor.MountVolume(
rc.waitForAttachTimeout,
volumeToMount.VolumeToMount,
rc.actualStateOfWorld,
isRemount)
...
} else if cache.IsFSResizeRequiredError(err) &&
// The volume needs a filesystem resize.
utilfeature.DefaultFeatureGate.Enabled(features.ExpandInUsePersistentVolumes) {
klog.V(4).Infof(volumeToMount.GenerateMsgDetailed("Starting operationExecutor.ExpandVolumeFSWithoutUnmounting", ""))
err := rc.operationExecutor.ExpandVolumeFSWithoutUnmounting(
volumeToMount.VolumeToMount,
rc.actualStateOfWorld)
...
}
}
// GetUnmountedVolumes returns volumes that are attached but not mounted to any pod.
// Ensure devices that should be detached/unmounted are detached/unmounted.
for _, attachedVolume := range rc.actualStateOfWorld.GetUnmountedVolumes() {
// Check IsOperationPending to avoid marking a volume as detached if it's in the process of mounting.
if !rc.desiredStateOfWorld.VolumeExists(attachedVolume.VolumeName) &&
!rc.operationExecutor.IsOperationPending(attachedVolume.VolumeName, nestedpendingoperations.EmptyUniquePodName) {
if attachedVolume.GloballyMounted {
// Volume is globally mounted to device, unmount it
klog.V(5).Infof(attachedVolume.GenerateMsgDetailed("Starting operationExecutor.UnmountDevice", ""))
err := rc.operationExecutor.UnmountDevice(
attachedVolume.AttachedVolume, rc.actualStateOfWorld, rc.mounter)
...
} else {
// Volume is attached to node, detach it
// Kubelet not responsible for detaching or this volume has a non-attachable volume plugin.
// Wait for the controller to detach.
if rc.controllerAttachDetachEnabled || !attachedVolume.PluginIsAttachable {
rc.actualStateOfWorld.MarkVolumeAsDetached(attachedVolume.VolumeName, attachedVolume.NodeName)
klog.Infof(attachedVolume.GenerateMsgDetailed("Volume detached", fmt.Sprintf("DevicePath %q", attachedVolume.DevicePath)))
} else {
// Only detach if kubelet detach is enabled
klog.V(5).Infof(attachedVolume.GenerateMsgDetailed("Starting operationExecutor.DetachVolume", ""))
err := rc.operationExecutor.DetachVolume(
attachedVolume.AttachedVolume, false /* verifySafeToDetach */, rc.actualStateOfWorld)
...
}
}
}
}
}
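The mount half of the kubelet reconciler boils down to a decision per volume/pod pair. The sketch below condenses the branches shown above into one function; `volumeState` and `nextAction` are hypothetical names, the resize branch is left out, and the returned strings are simply the operationExecutor method names from the quoted code.

```go
package main

import "fmt"

// volumeState is a simplified, hypothetical view of one volume/pod pair on a node.
type volumeState struct {
	attached           bool // the volume is attached to this node
	mounted            bool // the volume is mounted for the pod
	remountRequired    bool // a remount was requested
	pluginIsAttachable bool // the volume plugin implements an attacher
}

// nextAction returns the operation the kubelet reconciler would start for v.
func nextAction(v volumeState, controllerAttachDetachEnabled bool) string {
	switch {
	case !v.attached:
		if controllerAttachDetachEnabled || !v.pluginIsAttachable {
			// Wait for the attach/detach controller to finish attaching.
			return "VerifyControllerAttachedVolume"
		}
		// The kubelet attaches the volume itself.
		return "AttachVolume"
	case !v.mounted || v.remountRequired:
		return "MountVolume"
	default:
		return "none"
	}
}

func main() {
	fresh := volumeState{pluginIsAttachable: true}
	fmt.Println(nextAction(fresh, true))  // controller handles attach
	fmt.Println(nextAction(fresh, false)) // kubelet handles attach

	attached := volumeState{attached: true, pluginIsAttachable: true}
	fmt.Println(nextAction(attached, true)) // ready to mount
}
```

With the usual setting (controllerAttachDetachEnabled is true) the kubelet never attaches a CSI volume itself; it only verifies the attach and then mounts.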
external-attacher source
https://github.com/kubernetes-csi/external-attacher.git
During attach/detach, external-attacher mainly watches VolumeAttachment objects and calls the corresponding methods on the CSI driver.
- CSIAttachController
path | usage |
---|---|
pkg/controller/controller.go | attaches / detaches CSI volumes using provided Handler interface |
This controller watches VolumeAttachment objects and calls the corresponding methods on the CSI driver.
// CSIAttachController is a controller that attaches / detaches CSI volumes using provided Handler interface
type CSIAttachController struct {
client kubernetes.Interface
attacherName string
handler Handler
eventRecorder record.EventRecorder
vaQueue workqueue.RateLimitingInterface
pvQueue workqueue.RateLimitingInterface
vaLister storagelisters.VolumeAttachmentLister
vaListerSynced cache.InformerSynced
pvLister corelisters.PersistentVolumeLister
pvListerSynced cache.InformerSynced
}
- SyncNewOrUpdatedVolumeAttachment
Calls the CSI driver according to the state of the VolumeAttachment and updates the VolumeAttachment's status.
func (h *csiHandler) SyncNewOrUpdatedVolumeAttachment(va *storage.VolumeAttachment) {
glog.V(4).Infof("CSIHandler: processing VA %q", va.Name)
var err error
if va.DeletionTimestamp == nil {
err = h.syncAttach(va)
} else {
err = h.syncDetach(va)
}
...
}
- syncAttach
The attach operation calls the CSI driver's ControllerPublishVolume method.
func (h *csiHandler) syncAttach(va *storage.VolumeAttachment) error {
if va.Status.Attached {
// Volume is attached, there is nothing to be done.
glog.V(4).Infof("%q is already attached", va.Name)
return nil
}
// Attach and report any error
glog.V(2).Infof("Attaching %q", va.Name)
// csiAttach eventually calls the CSI driver's ControllerPublishVolume.
va, metadata, err := h.csiAttach(va)
...
glog.V(2).Infof("Attached %q", va.Name)
// Mark as attached
if _, err := markAsAttached(h.client, va, metadata); err != nil {
return fmt.Errorf("failed to mark as attached: %s", err)
}
glog.V(4).Infof("Fully attached %q", va.Name)
return nil
}
- syncDetach
syncDetach calls the CSI driver's ControllerUnpublishVolume and then marks the VolumeAttachment as detached.
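The attach and detach handlers can be summarized as one dispatch on the deletion timestamp. The sketch below is a stripped-down model of that dispatch, not the external-attacher code: `attachment`, `driver`, `fakeDriver`, and `syncVolumeAttachment` are hypothetical stand-ins, and the real RPCs take far more parameters than a name.

```go
package main

import (
	"fmt"
	"time"
)

// attachment is a pared-down stand-in for storage.VolumeAttachment.
type attachment struct {
	Name              string
	DeletionTimestamp *time.Time
	Attached          bool
}

// driver is a hypothetical interface for the two CSI controller RPCs.
type driver interface {
	ControllerPublishVolume(name string) error
	ControllerUnpublishVolume(name string) error
}

// fakeDriver records the calls it receives instead of talking to storage.
type fakeDriver struct{ calls []string }

func (d *fakeDriver) ControllerPublishVolume(name string) error {
	d.calls = append(d.calls, "publish "+name)
	return nil
}

func (d *fakeDriver) ControllerUnpublishVolume(name string) error {
	d.calls = append(d.calls, "unpublish "+name)
	return nil
}

// syncVolumeAttachment attaches when the object is live and detaches when
// it is being deleted, then records the result on the object.
func syncVolumeAttachment(d driver, va *attachment) error {
	if va.DeletionTimestamp == nil {
		if va.Attached {
			return nil // already attached, nothing to do
		}
		if err := d.ControllerPublishVolume(va.Name); err != nil {
			return err
		}
		va.Attached = true
		return nil
	}
	if err := d.ControllerUnpublishVolume(va.Name); err != nil {
		return err
	}
	va.Attached = false
	return nil
}

func main() {
	d := &fakeDriver{}
	va := &attachment{Name: "csi-example"}

	_ = syncVolumeAttachment(d, va) // attach
	now := time.Now()
	va.DeletionTimestamp = &now     // the object is being deleted
	_ = syncVolumeAttachment(d, va) // detach

	fmt.Println(d.calls, "attached:", va.Attached)
}
```

Because the sync is driven by the object's state rather than by events, calling it again on an already attached object is a no-op, which keeps retries safe.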
Conclusion
The volume lifecycle below shows how the parts work together:
   CreateVolume +------------+ DeleteVolume
 +------------->|  CREATED   +--------------+
 |              +---+----+---+              |
 |       Controller |    | Controller       v
+++         Publish |    | Unpublish       +++
|X|          Volume |    | Volume          | |
+-+             +---v----+---+             +-+
                | NODE_READY |
                +---+----^---+
               Node |    | Node
            Publish |    | Unpublish
             Volume |    | Volume
                +---v----+---+
                | PUBLISHED  |
                +------------+
- Mounting a volume:
  - kube-controller-manager calls the CSI plugin (csiAttacher.Attach) to create the VolumeAttachment.
  - external-attacher watches the VolumeAttachment, calls the CSI driver's ControllerPublishVolume, and based on the result marks the VolumeAttachment as attached.
  - kubelet checks whether the volume is attached; if it is, kubelet calls the CSI driver's NodePublishVolume to mount it.
- Unmounting a volume:
  - kubelet calls the CSI driver's NodeUnpublishVolume for volumes that are no longer mounted to a pod (the pod was deleted).
  - kube-controller-manager detaches volumes that are attached to a node but no longer used.
  - external-attacher calls the CSI driver's ControllerUnpublishVolume to perform the detach.
PS:
1. One VolumeAttachment corresponds to one attachment relationship, so for a ReadWriteOnce volume, additional VolumeAttachments cannot be created. kube-controller-manager enforces this at attach time, based on the persistent volume's accessModes and the number of nodes the volume is already attached to.
2. The VolumeAttachment ID depends only on the volume handle, the CSI driver name, and the node name, so make sure attach and detach depend only on the node and not on the pod.
3. When handling a volume's accessModes, k8s always goes by the volume's own accessModes and does not check them against the accessModes the CSI driver supports, so the driver code has to check and handle this itself.
4. When the kubelet service fails, k8s schedules the pods on that node for deletion, but the deletion hangs because the kubelet is down. At that point the volume still exists in kube-controller-manager's desiredStateOfWorld, so the corresponding VolumeAttachment is not deleted and the volume is not remounted. When the kubelet restarts, the pod is fully deleted and kube-controller-manager detaches the volume, which means the detach completes first. Meanwhile, the restarted kubelet reloads its actualStateOfWorld and desiredStateOfWorld. Because the pod has been deleted, the kubelet's actualStateOfWorld has no mount record for the pod, so the kubelet takes no further action on the volume and the previous mount is left behind.