写点什么

external-attacher 源码分析 (2)- 核心处理逻辑分析

用户头像
良凯尔
关注
发布于: 1 小时前
external-attacher源码分析(2)-核心处理逻辑分析

更多 ceph-csi 其他源码分析,请查看下面这篇博文:kubernetes ceph-csi 分析 - 目录导航https://xie.infoq.cn/article/4b1d3e32f124307a49cd9c1e3


摘要

ceph-csi 分析-external-attacher 源码分析。external-attacher 属于 external plugin 中的一个,辅助 csi plugin 组件,共同完成了存储相关操作。external-attacher watch volumeAttachment 对象,然后调用 csi plugin 来做 attach/dettach 操作,并修改 volumeAttachment 对象与 pv 对象。


基于 tag v2.1.1

https://github.com/kubernetes-csi/external-attacher/releases/tag/v2.1.1

external-attacher

external-attacher 属于 external plugin 中的一个。下面我们先来回顾一下 external plugin 以及 csi 系统结构。

external plugin

external plugin 包括了 external-provisioner、external-attacher、external-resizer、external-snapshotter 等,external plugin 辅助 csi plugin 组件,共同完成了存储相关操作。external plugin 负责 watch pvc、volumeAttachment 等对象,然后调用 volume plugin 来完成存储的相关操作。如 external-provisioner watch pvc 对象,然后调用 csi plugin 来创建存储,最后创建 pv 对象;external-attacher watch volumeAttachment 对象,然后调用 csi plugin 来做 attach/dettach 操作,并修改 volumeAttachment 对象与 pv 对象;external-resizer watch pvc 对象,然后调用 csi plugin 来做存储的扩容操作等。

csi 系统结构


external-attacher 作用分析

根据 CSI plugin 是否支持 ControllerPublish/ControllerUnpublish 操作,external-attacher 的作用分为如下两种:


(1)当 CSI plugin 不支持 ControllerPublish/ControllerUnpublish 操作时,AD controller(或 kubelet 的 volume manager)创建 VolumeAttachment 对象后,external-attacher 仅参与 VolumeAttachment 对象的修改,将 attached 属性值 patch 为 true;而 external-attacher 对 pv 对象无任何同步处理操作。


(2)当 CSI plugin 支持 ControllerPublish/ControllerUnpublish 操作时,external-attacher 调用 csi plugin(ControllerPublishVolume)进行存储的 attach 操作,然后更改 VolumeAttachment 对象,将 attached 属性值 patch 为 true,并 patch pv 对象,增加该 external-attacher 相关的 finalizer;对于 pv 对象,external-attacher 负责处理 pv 对象的 finalizer,patch pv 对象,去除该 external-attacher 相关的 finalizer(该 external-attacher 执行 attach 操作时添加的 finalizer)。

源码分析

external-attacher 的源码分析将分为两部分:


(1)main 方法以及启动参数分析;


(2)核心处理逻辑分析。


前面一篇博客已经对 external-attacher 的 main 方法以及启动参数做了分析,这篇博客将对 external-attacher 的核心处理逻辑进行源码分析。

Run

核心逻辑为跑 goroutine 不停的调用 ctrl.syncVA 与 ctrl.syncPV 对 VolumeAttachment 对象以及 PV 对象进行同步处理。


//external-attcher/pkg/controller/controller.go
// Run starts CSI attacher and listens on channel eventsfunc (ctrl *CSIAttachController) Run(workers int, stopCh <-chan struct{}) { defer ctrl.vaQueue.ShutDown() defer ctrl.pvQueue.ShutDown()
klog.Infof("Starting CSI attacher") defer klog.Infof("Shutting CSI attacher")
if !cache.WaitForCacheSync(stopCh, ctrl.vaListerSynced, ctrl.pvListerSynced) { klog.Errorf("Cannot sync caches") return } for i := 0; i < workers; i++ { go wait.Until(ctrl.syncVA, 0, stopCh) go wait.Until(ctrl.syncPV, 0, stopCh) }
if ctrl.shouldReconcileVolumeAttachment { go wait.Until(func() { err := ctrl.handler.ReconcileVA() if err != nil { klog.Errorf("Failed to reconcile volume attachments: %v", err) } }, ctrl.reconcileSync, stopCh) }
<-stopCh}
复制代码

1.syncVA

syncVA 负责 VolumeAttachment 对象的处理,核心逻辑为:


(1)判断 VolumeAttachment 对象的.Spec.Attacher属性值,判断是否由本 attacher 组件来负责 VolumeAttachment 对象的同步处理;


(2)调用 ctrl.handler.SyncNewOrUpdatedVolumeAttachment(va)。


//external-attcher/pkg/controller/controller.go
func (ctrl *CSIAttachController) syncVA() { key, quit := ctrl.vaQueue.Get() if quit { return } defer ctrl.vaQueue.Done(key)
vaName := key.(string) klog.V(4).Infof("Started VA processing %q", vaName)
// get VolumeAttachment to process va, err := ctrl.vaLister.Get(vaName) if err != nil { if apierrs.IsNotFound(err) { // VolumeAttachment was deleted in the meantime, ignore. klog.V(3).Infof("VA %q deleted, ignoring", vaName) return } klog.Errorf("Error getting VolumeAttachment %q: %v", vaName, err) ctrl.vaQueue.AddRateLimited(vaName) return } if va.Spec.Attacher != ctrl.attacherName { klog.V(4).Infof("Skipping VolumeAttachment %s for attacher %s", va.Name, va.Spec.Attacher) return } ctrl.handler.SyncNewOrUpdatedVolumeAttachment(va)}
复制代码

1.1 SyncNewOrUpdatedVolumeAttachment

ctrl.handler.SyncNewOrUpdatedVolumeAttachment()包含两个实现,将根据 CSI plugin 是否支持 ControllerPublish/ControllerUnpublish 操作来调用不同的实现。(调用不同实现的判断逻辑在 external-attacher 的 main 方法里,前面分析 external-attacher 的 main 方法时已经分析过了,忘记的可以回去看下)


(1)trivialHandler.SyncNewOrUpdatedVolumeAttachment:当 CSI plugin 不支持 ControllerPublish/ControllerUnpublish 操作时,AD controller(或 kubelet 的 volume manager)创建 VolumeAttachment 对象后,external-attacher 仅参与 VolumeAttachment 对象的修改,将 attached 属性值 patch 为 true。


(2)csiHandler.SyncNewOrUpdatedVolumeAttachment:当 CSI plugin 支持 ControllerPublish/ControllerUnpublish 操作时,external-attacher 调用 csi plugin(ControllerPublishVolume)进行存储的 attach 操作,然后更改 VolumeAttachment 对象,将 attached 属性值 patch 为 true。

1.1.1 trivialHandler.SyncNewOrUpdatedVolumeAttachment

先来看 trivialHandler 的实现。


当 VolumeAttachment 的 attached 属性值为 false 时,调用 markAsAttached 做进一步处理。


//external-attcher/pkg/controller/trivial_handler.go
func (h *trivialHandler) SyncNewOrUpdatedVolumeAttachment(va *storage.VolumeAttachment) { klog.V(4).Infof("Trivial sync[%s] started", va.Name) if !va.Status.Attached { // mark as attached if _, err := markAsAttached(h.client, va, nil); err != nil { klog.Warningf("Error saving VolumeAttachment %s as attached: %s", va.Name, err) h.vaQueue.AddRateLimited(va.Name) return } klog.V(2).Infof("Marked VolumeAttachment %s as attached", va.Name) } h.vaQueue.Forget(va.Name)}
复制代码


markAsAttached


markAsAttached 主要是将 VolumeAttachment 的 attached 属性值 patch 为 true。


//external-attcher/pkg/controller/util.go
func markAsAttached(client kubernetes.Interface, va *storage.VolumeAttachment, metadata map[string]string) (*storage.VolumeAttachment, error) { klog.V(4).Infof("Marking as attached %q", va.Name) clone := va.DeepCopy() clone.Status.Attached = true clone.Status.AttachmentMetadata = metadata clone.Status.AttachError = nil patch, err := createMergePatch(va, clone) if err != nil { return va, err } newVA, err := client.StorageV1beta1().VolumeAttachments().Patch(va.Name, types.MergePatchType, patch) if err != nil { return va, err } klog.V(4).Infof("Marked as attached %q", va.Name) return newVA, nil}
复制代码

1.1.2 csiHandler.SyncNewOrUpdatedVolumeAttachment

先来看 csiHandler 的实现。主要逻辑如下:


(1)当 VolumeAttachment 对象的.DeletionTimestamp字段为空,调用 h.syncAttach(调用了 csi plugin 的 ControllerPublishVolume 方法来做存储的 attach 操作,并调用 markAsAttached 将 VolumeAttachment 的 attached 属性值 patch 为 true);


(2)当 VolumeAttachment 对象的.DeletionTimestamp字段不为空,调用 h.syncDetach(调用了 csi plugin 的 ControllerUnpublishVolume 方法来做存储的 dettach 操作,并调用 markAsDetached 将 VolumeAttachment 的 attached 属性值 patch 为 false)。


// pkg/controller/csi_handler.gofunc (h *csiHandler) SyncNewOrUpdatedVolumeAttachment(va *storage.VolumeAttachment) {  klog.V(4).Infof("CSIHandler: processing VA %q", va.Name)
var err error if va.DeletionTimestamp == nil { err = h.syncAttach(va) } else { err = h.syncDetach(va) } if err != nil { // Re-queue with exponential backoff klog.V(2).Infof("Error processing %q: %s", va.Name, err) h.vaQueue.AddRateLimited(va.Name) return } // The operation has finished successfully, reset exponential backoff h.vaQueue.Forget(va.Name) klog.V(4).Infof("CSIHandler: finished processing %q", va.Name)}
复制代码


syncAttach


syncAttach 不展开分析了,感兴趣的可以自己深入分析,从h.csiAttach调用入手。


主要逻辑为:


(1)调用h.csiAttach做 attach 操作(patch pv 对象,增加该 external-attacher 相关的 Finalizer,并最终调用了 csi plugin 的 ControllerPublishVolume 方法来做存储的 attach 操作);


(2)调用 markAsAttached 将 VolumeAttachment 的 attached 属性值 patch 为 true。


// pkg/controller/csi_handler.gofunc (h *csiHandler) syncAttach(va *storage.VolumeAttachment) error {  if !h.consumeForceSync(va.Name) && va.Status.Attached {    // Volume is attached and no force sync, there is nothing to be done.    klog.V(4).Infof("%q is already attached", va.Name)    return nil  }
// Attach and report any error klog.V(2).Infof("Attaching %q", va.Name) va, metadata, err := h.csiAttach(va) if err != nil { var saveErr error va, saveErr = h.saveAttachError(va, err) if saveErr != nil { // Just log it, propagate the attach error. klog.V(2).Infof("Failed to save attach error to %q: %s", va.Name, saveErr.Error()) } // Add context to the error for logging err := fmt.Errorf("failed to attach: %s", err) return err } klog.V(2).Infof("Attached %q", va.Name)
// Mark as attached if _, err := markAsAttached(h.client, va, metadata); err != nil { return fmt.Errorf("failed to mark as attached: %s", err) } klog.V(4).Infof("Fully attached %q", va.Name) return nil}
复制代码


syncDetach


同样的,syncDetach 不展开分析,这里直接给出结果,最终调用了 csi plugin 的 ControllerUnpublishVolume 方法来做存储的 dettach 操作,并调用 markAsDetached 将 VolumeAttachment 的 attached 属性值 patch 为 false,感兴趣的可以自己深入分析,从h.csiDetach调用入手。


// pkg/controller/csi_handler.gofunc (h *csiHandler) syncDetach(va *storage.VolumeAttachment) error {  klog.V(4).Infof("Starting detach operation for %q", va.Name)  if !h.consumeForceSync(va.Name) && !h.hasVAFinalizer(va) {    klog.V(4).Infof("%q is already detached", va.Name)    return nil  }
// Detach and report any error klog.V(2).Infof("Detaching %q", va.Name) va, err := h.csiDetach(va) if err != nil { var saveErr error va, saveErr = h.saveDetachError(va, err) if saveErr != nil { // Just log it, propagate the detach error. klog.V(2).Infof("Failed to save detach error to %q: %s", va.Name, saveErr.Error()) } // Add context to the error for logging err := fmt.Errorf("failed to detach: %s", err) return err } klog.V(4).Infof("Fully detached %q", va.Name) return nil}
复制代码

2.syncPV

syncPV 负责 PV 对象的处理,核心逻辑为调用 ctrl.handler.SyncNewOrUpdatedPersistentVolume(pv)来对 pv 对象做进一步处理。


//external-attcher/pkg/controller/controller.go
// syncPV deals with one key off the queue. It returns false when it's time to quit.func (ctrl *CSIAttachController) syncPV() { key, quit := ctrl.pvQueue.Get() if quit { return } defer ctrl.pvQueue.Done(key)
pvName := key.(string) klog.V(4).Infof("Started PV processing %q", pvName)
// get PV to process pv, err := ctrl.pvLister.Get(pvName) if err != nil { if apierrs.IsNotFound(err) { // PV was deleted in the meantime, ignore. klog.V(3).Infof("PV %q deleted, ignoring", pvName) return } klog.Errorf("Error getting PersistentVolume %q: %v", pvName, err) ctrl.pvQueue.AddRateLimited(pvName) return } ctrl.handler.SyncNewOrUpdatedPersistentVolume(pv)}
复制代码

2.1 SyncNewOrUpdatedPersistentVolume

跟上面分析的 syncVA 中的 SyncNewOrUpdatedVolumeAttachment 一样,ctrl.handler.SyncNewOrUpdatedPersistentVolume()也包含两个实现,将根据 CSI plugin 是否支持 ControllerPublish/ControllerUnpublish 操作来调用不同的实现。(调用不同实现的判断逻辑在 external-attacher 的 main 方法里,前面分析 external-attacher 的 main 方法时已经分析过了,忘记的可以回去看下)


(1)trivialHandler.SyncNewOrUpdatedPersistentVolume:当 CSI plugin 不支持 ControllerPublish/ControllerUnpublish 操作时,external-attacher 不对 pv 对象做任何同步处理操做。


(2)csiHandler.SyncNewOrUpdatedPersistentVolume:当 CSI plugin 支持 ControllerPublish/ControllerUnpublish 操作时,external-attacher 负责处理 pv 对象的 finalizer,patch pv 对象,去除该 external-attacher 相关的 finalizer(该 external-attacher 执行 attach 操作时添加的 finalizer)。

2.1.1 trivialHandler.SyncNewOrUpdatedPersistentVolume

trivialHandler.SyncNewOrUpdatedPersistentVolume 方法直接返回,可以看出对于 pv 对象,不做处理。


//external-attcher/pkg/controller/trivial_handler.go
func (h *trivialHandler) SyncNewOrUpdatedPersistentVolume(pv *v1.PersistentVolume) { return}
复制代码


与 ceph-csi 搭配使用的 external-attacher 相关日志


I0907 03:30:18.426009       1 main.go:166] CSI driver does not support ControllerPublishUnpublish, using trivial handler
I0907 08:13:38.148672 1 controller.go:198] Started VA processing "csi-706ace7b44a2f618280cba51951595f98893b34a38015e3c4ef7d1160be7f67b"I0907 08:13:38.148698 1 trivial_handler.go:53] Trivial sync[csi-706ace7b44a2f618280cba51951595f98893b34a38015e3c4ef7d1160be7f67b] startedI0907 08:13:38.148707 1 util.go:35] Marking as attached "csi-706ace7b44a2f618280cba51951595f98893b34a38015e3c4ef7d1160be7f67b"I0907 08:13:38.156486 1 util.go:48] Marked as attached "csi-706ace7b44a2f618280cba51951595f98893b34a38015e3c4ef7d1160be7f67b"I0907 08:13:38.156510 1 trivial_handler.go:61] Marked VolumeAttachment csi-706ace7b44a2f618280cba51951595f98893b34a38015e3c4ef7d1160be7f67b as attached
复制代码

2.1.2 csiHandler.SyncNewOrUpdatedPersistentVolume

csiHandler.SyncNewOrUpdatedPersistentVolume 主要是处理 pv 对象的 finalizer,patch pv 对象,去除该 external-attacher 相关的 finalizer(该 external-attacher 执行 attach 操作时添加的 finalizer)。


主要逻辑:


(1)判断 pv 对象的 DeletionTimestamp 是否为空,为空则直接返回;


(2)检查 pv 对象是否含有该 external-attacher 相关的 finalizer,没有则直接返回;


(3)查询 volumeAttachment 对象列表,遍历查询是否有 va 对象记录着该 pv,有则直接返回;


(4)去除 pv 对象中该 external-attacher 相关的 finalizer;


(5)patch pv 对象。


// pkg/controller/csi_handler.gofunc (h *csiHandler) SyncNewOrUpdatedPersistentVolume(pv *v1.PersistentVolume) {  klog.V(4).Infof("CSIHandler: processing PV %q", pv.Name)  // Sync and remove finalizer on given PV  if pv.DeletionTimestamp == nil {    // Don't process anything that has no deletion timestamp.    klog.V(4).Infof("CSIHandler: processing PV %q: no deletion timestamp, ignoring", pv.Name)    h.pvQueue.Forget(pv.Name)    return  }
// Check if the PV has finalizer finalizer := GetFinalizerName(h.attacherName) found := false for _, f := range pv.Finalizers { if f == finalizer { found = true break } } if !found { // No finalizer -> no action required klog.V(4).Infof("CSIHandler: processing PV %q: no finalizer, ignoring", pv.Name) h.pvQueue.Forget(pv.Name) return }
// Check that there is no VA that requires the PV vas, err := h.vaLister.List(labels.Everything()) if err != nil { // Failed listing VAs? Try again with exp. backoff klog.Errorf("Failed to list VolumeAttachments for PV %q: %s", pv.Name, err.Error()) h.pvQueue.AddRateLimited(pv.Name) return } for _, va := range vas { if va.Spec.Source.PersistentVolumeName != nil && *va.Spec.Source.PersistentVolumeName == pv.Name { // This PV is needed by this VA, don't remove finalizer klog.V(4).Infof("CSIHandler: processing PV %q: VA %q found", pv.Name, va.Name) h.pvQueue.Forget(pv.Name) return } } // No VA found -> remove finalizer klog.V(4).Infof("CSIHandler: processing PV %q: no VA found, removing finalizer", pv.Name) clone := pv.DeepCopy() newFinalizers := []string{} for _, f := range pv.Finalizers { if f == finalizer { continue } newFinalizers = append(newFinalizers, f) } if len(newFinalizers) == 0 { // Canonize empty finalizers for unit test (so we don't need to // distinguish nil and [] there) newFinalizers = nil } clone.Finalizers = newFinalizers
if _, err = h.patchPV(pv, clone); err != nil { klog.Errorf("Failed to remove finalizer from PV %q: %s", pv.Name, err.Error()) h.pvQueue.AddRateLimited(pv.Name) return }
klog.V(2).Infof("Removed finalizer from PV %q", pv.Name) h.pvQueue.Forget(pv.Name)
return}
复制代码

总结

external-attacher 属于 external plugin 中的一个。

external-attacher 作用分析

根据 CSI plugin 是否支持 ControllerPublish/ControllerUnpublish 操作,external-attacher 的作用分为如下两种:


(1)当 CSI plugin 不支持 ControllerPublish/ControllerUnpublish 操作时,AD controller(或 kubelet 的 volume manager)创建 VolumeAttachment 对象后,external-attacher 仅参与 VolumeAttachment 对象的修改,将 attached 属性值 patch 为 true;而 external-attacher 对 pv 对象无任何同步处理操作。


(2)当 CSI plugin 支持 ControllerPublish/ControllerUnpublish 操作时,external-attacher 调用 csi plugin(ControllerPublishVolume)进行存储的 attach 操作,然后更改 VolumeAttachment 对象,将 attached 属性值 patch 为 true,并 patch pv 对象,增加该 external-attacher 相关的 Finalizer;对于 pv 对象,external-attacher 负责处理 pv 对象的 finalizer,patch pv 对象,去除该 external-attacher 相关的 finalizer(该 external-attacher 执行 attach 操作时添加的 finalizer)。

external-attacher 与 ceph-csi rbd 结合使用

ceph-csi 不支持 ControllerPublish/ControllerUnpublish 操作,所以 external-attacher 与 ceph-csi rbd 结合使用,external-attacher 仅参与 VolumeAttachment 对象的修改,将 attached 属性值 patch 为 true。

发布于: 1 小时前阅读数: 2
用户头像

良凯尔

关注

热爱的力量 2020.01.10 加入

kubernetes开发者

评论

发布
暂无评论
external-attacher源码分析(2)-核心处理逻辑分析