May 29, 2021
kube-controller-manager: AD Controller Source Code Analysis

In kube-controller-manager, two controllers are related to storage: the PV controller and the AD controller.

Based on tag v1.17.4.


AD Controller Analysis

What the AD Controller Does

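The AD Controller (Attachment/Detachment controller) has three main jobs. It creates and deletes VolumeAttachment objects. It calls the volume plugin to attach or detach storage devices, that is, to attach a volume to a specific node or detach it from that node. It also updates node.Status.VolumesAttached, among other fields.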

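Attach/Detach logic differs from one volume plugin to another. For example, when Ceph storage is consumed through ceph-csi (an out-of-tree volume plugin), the Attach/Detach operation only changes the state of VolumeAttachment objects. It does not actually attach the volume to the node or detach it from the node.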

The kubelet flag --enable-controller-attach-detach

The AD controller's logic is similar to that of the volume manager in kubelet, and both can perform Attach/Detach. However, only one of the two components, kube-controller-manager or kubelet, actually performs Attach/Detach. The kubelet flag --enable-controller-attach-detach decides which one. Setting it to true (the default) lets the AD controller in kube-controller-manager do Attach/Detach and disables Attach/Detach in kubelet.
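The sketch below shows how this handover can be observed on the Node object, under stated assumptions. When the flag is true, kubelet sets the `volumes.kubernetes.io/controller-managed-attach-detach` annotation on its Node; as far as I know, this is the value of `volumeutil.ControllerManagedAttachAnnotation`. The AD controller only manages nodes that carry this annotation (see addNodeToDswp later in this article). Annotations are modeled as a plain map, and `annotateNode` and `attachOwner` are hypothetical helpers, not Kubernetes APIs.

```go
package main

import "fmt"

// Assumed to match the value of volumeutil.ControllerManagedAttachAnnotation.
const controllerManagedAttachAnnotation = "volumes.kubernetes.io/controller-managed-attach-detach"

// annotateNode is a hypothetical stand-in for kubelet's node registration:
// with --enable-controller-attach-detach=true it hands attach/detach to the
// controller by setting the annotation, otherwise it removes the annotation.
func annotateNode(annotations map[string]string, enableControllerAttachDetach bool) {
	if enableControllerAttachDetach {
		annotations[controllerManagedAttachAnnotation] = "true"
		return
	}
	delete(annotations, controllerManagedAttachAnnotation)
}

// attachOwner reports which component attaches/detaches volumes for a node.
// Like addNodeToDswp, it only checks that the annotation key is present.
func attachOwner(annotations map[string]string) string {
	if _, ok := annotations[controllerManagedAttachAnnotation]; ok {
		return "kube-controller-manager"
	}
	return "kubelet"
}

func main() {
	node := map[string]string{}
	annotateNode(node, true)
	fmt.Println(attachOwner(node)) // the AD controller owns attach/detach
	annotateNode(node, false)
	fmt.Println(attachOwner(node)) // kubelet's volume manager owns attach/detach
}
```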


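When Kubernetes consumes Ceph storage through ceph-csi, the volume plugin is ceph-csi. Three components are involved:

- The AD controller's Attach/Detach only creates or deletes VolumeAttachment objects. It does not actually attach the volume to the node or detach it.
- The csi-attacher component does not attach or detach either. It only updates the VolumeAttachment object.
- The real attach/detach on the node is done by kubelet's volume manager, which calls the volume plugin (ceph-csi).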


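The AD controller keeps two key in-memory structures:

(1) desiredStateOfWorld (DSW): records the volumes that pods in the cluster expect to be attached to their nodes.

(2) actualStateOfWorld (ASW): records the volumes that are actually attached to nodes in the cluster.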
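The toy model below makes the two caches concrete. It assumes both are keyed by a (volume, node) pair. The DSW records which pods want a volume on a node. The ASW records which volumes are attached and whether the node reports them as mounted. The types and the `diff` helper are hypothetical simplifications, not the real `cache.DesiredStateOfWorld` / `cache.ActualStateOfWorld` interfaces.

```go
package main

import (
	"fmt"
	"sort"
)

// volumeNode identifies "volume V on node N", the key both caches use here.
type volumeNode struct {
	Volume string
	Node   string
}

// dsw maps each wanted volume/node pair to the set of pods that need it.
type dsw map[volumeNode]map[string]bool

// asw maps each attached volume/node pair to whether the node reports it mounted.
type asw map[volumeNode]bool

// diff lists what a reconciler would have to do: attach pairs that are wanted
// but not attached, detach pairs that are attached but no longer wanted.
func diff(desired dsw, actual asw) (toAttach, toDetach []volumeNode) {
	for vn := range desired {
		if _, attached := actual[vn]; !attached {
			toAttach = append(toAttach, vn)
		}
	}
	for vn := range actual {
		if _, wanted := desired[vn]; !wanted {
			toDetach = append(toDetach, vn)
		}
	}
	sortPairs(toAttach)
	sortPairs(toDetach)
	return toAttach, toDetach
}

// sortPairs gives a deterministic order, since map iteration order is random.
func sortPairs(p []volumeNode) {
	sort.Slice(p, func(i, j int) bool {
		if p[i].Volume != p[j].Volume {
			return p[i].Volume < p[j].Volume
		}
		return p[i].Node < p[j].Node
	})
}

func main() {
	desired := dsw{{"pv-a", "node1"}: {"default/web-0": true}}
	actual := asw{{"pv-b", "node1"}: false}
	a, d := diff(desired, actual)
	fmt.Println("attach:", a, "detach:", d)
}
```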

What node.Status.VolumesInUse is for

The node.Status.VolumesInUse field of the Node object records the volumes that are attached to this node and are mounted, or about to be mounted, by kubelet.

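This field is maintained by kubelet's volume manager, using kubelet's own dsw and asw. As soon as a volume enters the dsw, it is added to node.Status.VolumesInUse. It is removed only when the volume exists in neither the asw nor the dsw, or has reached the unmounted state.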

Before detaching a volume, the AD controller checks this field. If the volume is still listed, it is considered still in use and the detach fails with an error.
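The check described above can be sketched as follows. The `node` struct is a hypothetical stand-in for `v1.Node` that keeps only `VolumesInUse`. Treating a missing node as safe to detach is an assumption of this sketch.

```go
package main

import (
	"errors"
	"fmt"
)

// errStillInUse is a hypothetical error value for this sketch.
var errStillInUse = errors.New("volume is still in use by node")

// node is a pared-down stand-in for v1.Node.
type node struct {
	Name         string
	VolumesInUse []string
}

// verifySafeToDetach refuses a detach while the node still lists the volume
// in Status.VolumesInUse. A nil node is treated as safe (assumption).
func verifySafeToDetach(n *node, volume string) error {
	if n == nil {
		return nil
	}
	for _, inUse := range n.VolumesInUse {
		if inUse == volume {
			return fmt.Errorf("%s on %s: %w", volume, n.Name, errStillInUse)
		}
	}
	return nil
}

func main() {
	n := &node{Name: "node1", VolumesInUse: []string{"pv-a"}}
	fmt.Println(verifySafeToDetach(n, "pv-a")) // refused: still in use
	fmt.Println(verifySafeToDetach(n, "pv-b")) // allowed: <nil>
}
```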

What node.Status.VolumesAttached is for

The node.Status.VolumesAttached field of the Node object records the volumes that have been attached to this node.

This field is updated by the AD controller in kube-controller-manager, based on the contents of its asw.

attachDetachController.Run source analysis

Let's go straight to attachDetachController.Run() to analyze the main processing logic of attachDetachController.

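It mainly calls the following four methods, each analyzed below:

(1) adc.populateActualStateOfWorld

(2) adc.populateDesiredStateOfWorld

(3) adc.reconciler.Run

(4) adc.desiredStateOfWorldPopulator.Run
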
// pkg/controller/volume/attachdetach/attach_detach_controller.go
func (adc *attachDetachController) Run(stopCh <-chan struct{}) {
	defer runtime.HandleCrash()
	defer adc.pvcQueue.ShutDown()

	klog.Infof("Starting attach detach controller")
	defer klog.Infof("Shutting down attach detach controller")

	synced := []kcache.InformerSynced{adc.podsSynced, adc.nodesSynced, adc.pvcsSynced, adc.pvsSynced}
	if adc.csiNodeSynced != nil {
		synced = append(synced, adc.csiNodeSynced)
	}
	if adc.csiDriversSynced != nil {
		synced = append(synced, adc.csiDriversSynced)
	}

	if !kcache.WaitForNamedCacheSync("attach detach", stopCh, synced...) {
		return
	}

	err := adc.populateActualStateOfWorld()
	if err != nil {
		klog.Errorf("Error populating the actual state of world: %v", err)
	}
	err = adc.populateDesiredStateOfWorld()
	if err != nil {
		klog.Errorf("Error populating the desired state of world: %v", err)
	}
	go adc.reconciler.Run(stopCh)
	go adc.desiredStateOfWorldPopulator.Run(stopCh)
	go wait.Until(adc.pvcWorker, time.Second, stopCh)
	metrics.Register(adc.pvcLister,
		adc.pvLister,
		adc.podLister,
		adc.actualStateOfWorld,
		adc.desiredStateOfWorld,
		&adc.volumePluginMgr,
		adc.csiMigratedPluginManager,
		adc.intreeToCSITranslator)

	<-stopCh
}
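Run follows a fixed startup order. It waits for the informer caches, populates the ASW and DSW once, and then starts long-running loops with `wait.Until` from k8s.io/apimachinery. Below is a stdlib-only approximation of that helper. The sketch assumes only the basic contract matters: call f immediately, call it again every period, and return once stopCh is closed. The real helper also recovers from panics and supports jitter. `until` is a hypothetical name.

```go
package main

import (
	"fmt"
	"time"
)

// until approximates wait.Until: run f, wait period, repeat, and return as
// soon as stopCh is closed. stopCh is re-checked at the top of every loop
// because select picks randomly when the timer and stopCh are both ready.
func until(f func(), period time.Duration, stopCh <-chan struct{}) {
	for {
		select {
		case <-stopCh:
			return
		default:
		}
		f()
		select {
		case <-stopCh:
			return
		case <-time.After(period):
		}
	}
}

func main() {
	stopCh := make(chan struct{})
	done := make(chan struct{})
	reconciles := 0

	fmt.Println("populateActualStateOfWorld, then populateDesiredStateOfWorld")
	// Like `go adc.reconciler.Run(stopCh)`: the loop lives in its own goroutine.
	go func() {
		until(func() {
			reconciles++
			if reconciles == 3 {
				close(stopCh) // simulate controller shutdown after three passes
			}
		}, 10*time.Millisecond, stopCh)
		close(done)
	}()
	<-stopCh // Run blocks here until shutdown
	<-done
	fmt.Println("reconcile passes:", reconciles)
}
```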

1 adc.populateActualStateOfWorld

Purpose: initialize the actualStateOfWorld structure.

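Main logic: iterate over all Node objects and read node.Status.VolumesAttached and node.Status.VolumesInUse. Then call adc.actualStateOfWorld.MarkVolumeAsAttached and adc.processVolumesInUse to populate actualStateOfWorld. Finally, call adc.addNodeToDswp to register the node in desiredStateOfWorld.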

// pkg/controller/volume/attachdetach/attach_detach_controller.go
func (adc *attachDetachController) populateActualStateOfWorld() error {
	klog.V(5).Infof("Populating ActualStateOfworld")
	nodes, err := adc.nodeLister.List(labels.Everything())
	if err != nil {
		return err
	}

	for _, node := range nodes {
		nodeName := types.NodeName(node.Name)
		for _, attachedVolume := range node.Status.VolumesAttached {
			uniqueName := attachedVolume.Name
			// The nil VolumeSpec is safe only in the case the volume is not in use by any pod.
			// In such a case it should be detached in the first reconciliation cycle and the
			// volume spec is not needed to detach a volume. If the volume is used by a pod, it
			// its spec can be: this would happen during in the populateDesiredStateOfWorld which
			// scans the pods and updates their volumes in the ActualStateOfWorld too.
			err = adc.actualStateOfWorld.MarkVolumeAsAttached(uniqueName, nil /* VolumeSpec */, nodeName, attachedVolume.DevicePath)
			if err != nil {
				klog.Errorf("Failed to mark the volume as attached: %v", err)
				continue
			}
			adc.processVolumesInUse(nodeName, node.Status.VolumesInUse)
			adc.addNodeToDswp(node, types.NodeName(node.Name))
		}
	}
	return nil
}
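Below is a simplified Go model of what populateActualStateOfWorld produces, assuming volumes are identified by plain strings. Every entry in node.Status.VolumesAttached becomes an ASW record with a nil spec and its device path. The mounted flag is derived from node.Status.VolumesInUse, which is the job of processVolumesInUse in the real code. `backfillSpec` sketches the later step in populateDesiredStateOfWorld that replaces the nil spec for volumes a pod still uses. All types and helpers here are hypothetical.

```go
package main

import "fmt"

// attachedVolume mirrors the two fields of v1.AttachedVolume used here.
type attachedVolume struct {
	Name       string
	DevicePath string
}

// nodeStatus keeps only the Node status fields this step reads.
type nodeStatus struct {
	Name            string
	VolumesAttached []attachedVolume
	VolumesInUse    []string
}

// aswEntry is a hypothetical ASW record for one volume on one node.
type aswEntry struct {
	Spec          *string // nil until a pod using the volume is seen
	DevicePath    string
	MountedByNode bool
}

// populateASW rebuilds the ASW, keyed by [volume, node], from node statuses alone.
func populateASW(nodes []nodeStatus) map[[2]string]*aswEntry {
	asw := map[[2]string]*aswEntry{}
	for _, n := range nodes {
		inUse := map[string]bool{}
		for _, v := range n.VolumesInUse {
			inUse[v] = true
		}
		for _, av := range n.VolumesAttached {
			// A nil spec is enough to detach a volume that no pod uses.
			asw[[2]string{av.Name, n.Name}] = &aswEntry{
				DevicePath:    av.DevicePath,
				MountedByNode: inUse[av.Name],
			}
		}
	}
	return asw
}

// backfillSpec models populateDesiredStateOfWorld re-marking an attached
// volume with the spec built from the pod that uses it.
func backfillSpec(asw map[[2]string]*aswEntry, volume, node, spec string) {
	if e, ok := asw[[2]string{volume, node}]; ok {
		e.Spec = &spec
	}
}

func main() {
	asw := populateASW([]nodeStatus{{
		Name:            "node1",
		VolumesAttached: []attachedVolume{{Name: "pv-a", DevicePath: "/dev/rbd0"}},
		VolumesInUse:    []string{"pv-a"},
	}})
	backfillSpec(asw, "pv-a", "node1", "spec-from-pod")
	e := asw[[2]string{"pv-a", "node1"}]
	fmt.Println(*e.Spec, e.DevicePath, e.MountedByNode)
}
```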

2 adc.populateDesiredStateOfWorld

Purpose: initialize the desiredStateOfWorld structure.

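Main logic: iterate over the pod list and, for each pod, over its volumes:

(1) initialize desiredStateOfWorld from the pod's volumes (via adc.podAdd);

(2) look the volume up in actualStateOfWorld. If it is already attached to the pod's node, call MarkVolumeAsAttached again, so that the nil volume spec recorded by populateActualStateOfWorld is replaced with the real one.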

// pkg/controller/volume/attachdetach/attach_detach_controller.go
func (adc *attachDetachController) populateDesiredStateOfWorld() error {
	klog.V(5).Infof("Populating DesiredStateOfworld")

	pods, err := adc.podLister.List(labels.Everything())
	if err != nil {
		return err
	}
	for _, pod := range pods {
		podToAdd := pod
		adc.podAdd(podToAdd)
		for _, podVolume := range podToAdd.Spec.Volumes {
			nodeName := types.NodeName(podToAdd.Spec.NodeName)
			// The volume specs present in the ActualStateOfWorld are nil, let's replace those
			// with the correct ones found on pods. The present in the ASW with no corresponding
			// pod will be detached and the spec is irrelevant.
			volumeSpec, err := util.CreateVolumeSpec(podVolume, podToAdd.Namespace, nodeName, &adc.volumePluginMgr, adc.pvcLister, adc.pvLister, adc.csiMigratedPluginManager, adc.intreeToCSITranslator)
			if err != nil {
				klog.Errorf("Error creating spec for volume %q, pod %q/%q: %v", podVolume.Name, podToAdd.Namespace, podToAdd.Name, err)
				continue
			}
			plugin, err := adc.volumePluginMgr.FindAttachablePluginBySpec(volumeSpec)
			if err != nil || plugin == nil {
				klog.V(10).Infof("Skipping volume %q for pod %q/%q: it does not implement attacher interface. err=%v", podVolume.Name, podToAdd.Namespace, podToAdd.Name, err)
				continue
			}
			volumeName, err := volumeutil.GetUniqueVolumeNameFromSpec(plugin, volumeSpec)
			if err != nil {
				klog.Errorf("Failed to find unique name for volume %q, pod %q/%q: %v", podVolume.Name, podToAdd.Namespace, podToAdd.Name, err)
				continue
			}
			if adc.actualStateOfWorld.IsVolumeAttachedToNode(volumeName, nodeName) {
				devicePath, err := adc.getNodeVolumeDevicePath(volumeName, nodeName)
				if err != nil {
					klog.Errorf("Failed to find device path: %v", err)
					continue
				}
				err = adc.actualStateOfWorld.MarkVolumeAsAttached(volumeName, volumeSpec, nodeName, devicePath)
				if err != nil {
					klog.Errorf("Failed to update volume spec for node %s: %v", nodeName, err)
				}
			}
		}
	}

	return nil
}

3 adc.reconciler.Run

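It runs the reconciliation loop once every loopPeriod via wait.Until. Each pass calls rc.reconcile to reconcile desiredStateOfWorld with actualStateOfWorld. The reconcile step:

- compares the two structures;
- performs attach and detach operations;
- updates actualStateOfWorld;
- updates the node objects' .Status.VolumesAttached from actualStateOfWorld.

In addition, unless disabled, rc.sync runs at most once per syncDuration to verify that the volumes recorded in actualStateOfWorld are still attached.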

// pkg/controller/volume/attachdetach/reconciler/reconciler.go
func (rc *reconciler) Run(stopCh <-chan struct{}) {
	wait.Until(rc.reconciliationLoopFunc(), rc.loopPeriod, stopCh)
}

// reconciliationLoopFunc this can be disabled via cli option disableReconciliation.
// It periodically checks whether the attached volumes from actual state
// are still attached to the node and update the status if they are not.
func (rc *reconciler) reconciliationLoopFunc() func() {
	return func() {
		rc.reconcile()

		if rc.disableReconciliationSync {
			klog.V(5).Info("Skipping reconciling attached volumes still attached since it is disabled via the command line.")
		} else if rc.syncDuration < time.Second {
			klog.V(5).Info("Skipping reconciling attached volumes still attached since it is set to less than one second via the command line.")
		} else if time.Since(rc.timeOfLastSync) > rc.syncDuration {
			klog.V(5).Info("Starting reconciling attached volumes still attached")
			rc.sync()
		}
	}
}
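The gating around rc.sync() can be isolated into a pure function, shown here as a sketch whose parameter names are my own. As far as I know, these two settings correspond to the kube-controller-manager flags --disable-attach-detach-reconcile-sync and --attach-detach-reconcile-sync-period, but treat that mapping as an assumption.

```go
package main

import (
	"fmt"
	"time"
)

// shouldSync reproduces the branch order of reconciliationLoopFunc: the
// extra rc.sync() pass runs only when it is enabled, its period is at least
// one second, and more than one period has elapsed since the last sync.
func shouldSync(disabled bool, syncDuration, sinceLastSync time.Duration) bool {
	switch {
	case disabled:
		return false
	case syncDuration < time.Second:
		return false
	default:
		return sinceLastSync > syncDuration
	}
}

func main() {
	// Simulate one loop tick per 10s of virtual time with a 60s sync period.
	const tick, syncPeriod = 10 * time.Second, 60 * time.Second
	var sinceLastSync time.Duration
	for i := 1; i <= 12; i++ {
		sinceLastSync += tick
		if shouldSync(false, syncPeriod, sinceLastSync) {
			fmt.Println("tick", i, "runs rc.sync()")
			sinceLastSync = 0
		}
	}
}
```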

3.1 rc.reconcile


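Main logic (detaches are triggered before attaches):

(1) Iterate over the volumes that actualStateOfWorld records as attached, and check whether each one still exists in desiredStateOfWorld. If it does not, call rc.attacherDetacher.DetachVolume to detach it, subject to the pending-operation, mounted, and timeout checks in the code below.

(2) Call rc.attachDesiredVolumes. It iterates over the volumes that desiredStateOfWorld expects to be attached and checks whether actualStateOfWorld already has them attached to their node. If not, it first calls rc.isMultiAttachForbidden. If the volume's AccessModes forbid multi-node attachment and the volume is already attached to another node, it reports a Multi-Attach error and skips the volume. Otherwise it calls rc.attacherDetacher.AttachVolume to attach it.

(3) Call rc.nodeStatusUpdater.UpdateNodeStatuses(). Using the volumes that actualStateOfWorld reports as attached to each node, it updates node.Status.VolumesAttached.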

// pkg/controller/volume/attachdetach/reconciler/reconciler.go
func (rc *reconciler) reconcile() {
	// Detaches are triggered before attaches so that volumes referenced by
	// pods that are rescheduled to a different node are detached first.

	// Ensure volumes that should be detached are detached.
	for _, attachedVolume := range rc.actualStateOfWorld.GetAttachedVolumes() {
		if !rc.desiredStateOfWorld.VolumeExists(
			attachedVolume.VolumeName, attachedVolume.NodeName) {
			// Don't even try to start an operation if there is already one running
			// This check must be done before we do any other checks, as otherwise the other checks
			// may pass while at the same time the volume leaves the pending state, resulting in
			// double detach attempts
			if rc.attacherDetacher.IsOperationPending(attachedVolume.VolumeName, "") {
				klog.V(10).Infof("Operation for volume %q is already running. Can't start detach for %q", attachedVolume.VolumeName, attachedVolume.NodeName)
				continue
			}

			// Set the detach request time
			elapsedTime, err := rc.actualStateOfWorld.SetDetachRequestTime(attachedVolume.VolumeName, attachedVolume.NodeName)
			if err != nil {
				klog.Errorf("Cannot trigger detach because it fails to set detach request time with error %v", err)
				continue
			}
			// Check whether timeout has reached the maximum waiting time
			timeout := elapsedTime > rc.maxWaitForUnmountDuration
			// Check whether volume is still mounted. Skip detach if it is still mounted unless timeout
			if attachedVolume.MountedByNode && !timeout {
				klog.V(5).Infof(attachedVolume.GenerateMsgDetailed("Cannot detach volume because it is still mounted", ""))
				continue
			}

			// Before triggering volume detach, mark volume as detached and update the node status
			// If it fails to update node status, skip detach volume
			err = rc.actualStateOfWorld.RemoveVolumeFromReportAsAttached(attachedVolume.VolumeName, attachedVolume.NodeName)
			if err != nil {
				klog.V(5).Infof("RemoveVolumeFromReportAsAttached failed while removing volume %q from node %q with: %v", attachedVolume.VolumeName, attachedVolume.NodeName, err)
			}

			// Update Node Status to indicate volume is no longer safe to mount.
			err = rc.nodeStatusUpdater.UpdateNodeStatuses()
			if err != nil {
				// Skip detaching this volume if unable to update node status
				klog.Errorf(attachedVolume.GenerateErrorDetailed("UpdateNodeStatuses failed while attempting to report volume as attached", err).Error())
				continue
			}

			// Trigger detach volume which requires verifying safe to detach step
			// If timeout is true, skip verifySafeToDetach check
			klog.V(5).Infof(attachedVolume.GenerateMsgDetailed("Starting attacherDetacher.DetachVolume", ""))
			verifySafeToDetach := !timeout
			err = rc.attacherDetacher.DetachVolume(attachedVolume.AttachedVolume, verifySafeToDetach, rc.actualStateOfWorld)
			if err == nil {
				if !timeout {
					klog.Infof(attachedVolume.GenerateMsgDetailed("attacherDetacher.DetachVolume started", ""))
				} else {
					metrics.RecordForcedDetachMetric()
					klog.Warningf(attachedVolume.GenerateMsgDetailed("attacherDetacher.DetachVolume started", fmt.Sprintf("This volume is not safe to detach, but maxWaitForUnmountDuration %v expired, force detaching", rc.maxWaitForUnmountDuration)))
				}
			}
			if err != nil && !exponentialbackoff.IsExponentialBackoff(err) {
				// Ignore exponentialbackoff.IsExponentialBackoff errors, they are expected.
				// Log all other errors.
				klog.Errorf(attachedVolume.GenerateErrorDetailed("attacherDetacher.DetachVolume failed to start", err).Error())
			}
		}
	}

	rc.attachDesiredVolumes()

	// Update Node Status
	err := rc.nodeStatusUpdater.UpdateNodeStatuses()
	if err != nil {
		klog.Warningf("UpdateNodeStatuses failed with: %v", err)
	}
}
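The detach branch of reconcile boils down to one decision per volume, which the sketch below condenses. The `detachDecision` type and its values are hypothetical. The 6-minute value in main is my assumption for the default maxWaitForUnmountDuration in this release. Here, `elapsed` stands for what SetDetachRequestTime returns: the time since the first detach request for this volume/node pair.

```go
package main

import (
	"fmt"
	"time"
)

type detachDecision string

const (
	skipStillMounted detachDecision = "skip: still mounted"
	detachSafely     detachDecision = "detach, verifySafeToDetach=true"
	detachForced     detachDecision = "force detach, verifySafeToDetach=false"
)

// decideDetach condenses the checks reconcile applies to a volume that is in
// the ASW but no longer in the DSW (after the pending-operation check).
func decideDetach(mountedByNode bool, elapsed, maxWaitForUnmount time.Duration) detachDecision {
	timeout := elapsed > maxWaitForUnmount
	if mountedByNode && !timeout {
		return skipStillMounted // wait for kubelet to unmount first
	}
	if timeout {
		return detachForced // recorded by metrics.RecordForcedDetachMetric
	}
	return detachSafely
}

func main() {
	maxWait := 6 * time.Minute // assumed default of maxWaitForUnmountDuration
	for _, c := range []struct {
		mounted bool
		elapsed time.Duration
	}{{true, time.Minute}, {false, time.Minute}, {true, 7 * time.Minute}} {
		fmt.Printf("mounted=%v elapsed=%v -> %s\n", c.mounted, c.elapsed, decideDetach(c.mounted, c.elapsed, maxWait))
	}
}
```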


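3.1.1 rc.attachDesiredVolumes

Main logic: for each volume in desiredStateOfWorld that is not yet attached to its node, skip it if an operation is already pending or if the multi-attach check fails. Otherwise, call rc.attacherDetacher.AttachVolume to trigger the attach. For volumes that are already attached, it only resets the detach request time.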

// pkg/controller/volume/attachdetach/reconciler/reconciler.go
func (rc *reconciler) attachDesiredVolumes() {
	// Ensure volumes that should be attached are attached.
	for _, volumeToAttach := range rc.desiredStateOfWorld.GetVolumesToAttach() {
		if rc.actualStateOfWorld.IsVolumeAttachedToNode(volumeToAttach.VolumeName, volumeToAttach.NodeName) {
			// Volume/Node exists, touch it to reset detachRequestedTime
			if klog.V(5) {
				klog.Infof(volumeToAttach.GenerateMsgDetailed("Volume attached--touching", ""))
			}
			rc.actualStateOfWorld.ResetDetachRequestTime(volumeToAttach.VolumeName, volumeToAttach.NodeName)
			continue
		}
		// Don't even try to start an operation if there is already one running
		if rc.attacherDetacher.IsOperationPending(volumeToAttach.VolumeName, "") {
			if klog.V(10) {
				klog.Infof("Operation for volume %q is already running. Can't start attach for %q", volumeToAttach.VolumeName, volumeToAttach.NodeName)
			}
			continue
		}

		if rc.isMultiAttachForbidden(volumeToAttach.VolumeSpec) {
			nodes := rc.actualStateOfWorld.GetNodesForAttachedVolume(volumeToAttach.VolumeName)
			if len(nodes) > 0 {
				if !volumeToAttach.MultiAttachErrorReported {
					rc.reportMultiAttachError(volumeToAttach, nodes)
					rc.desiredStateOfWorld.SetMultiAttachError(volumeToAttach.VolumeName, volumeToAttach.NodeName)
				}
				continue
			}
		}

		// Volume/Node doesn't exist, spawn a goroutine to attach it
		if klog.V(5) {
			klog.Infof(volumeToAttach.GenerateMsgDetailed("Starting attacherDetacher.AttachVolume", ""))
		}
		err := rc.attacherDetacher.AttachVolume(volumeToAttach.VolumeToAttach, rc.actualStateOfWorld)
		if err == nil {
			klog.Infof(volumeToAttach.GenerateMsgDetailed("attacherDetacher.AttachVolume started", ""))
		}
		if err != nil && !exponentialbackoff.IsExponentialBackoff(err) {
			// Ignore exponentialbackoff.IsExponentialBackoff errors, they are expected.
			// Log all other errors.
			klog.Errorf(volumeToAttach.GenerateErrorDetailed("attacherDetacher.AttachVolume failed to start", err).Error())
		}
	}
}
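Here is a sketch of the multi-attach guard, based on my reading of rc.isMultiAttachForbidden. For a PersistentVolume, ReadWriteMany or ReadOnlyMany permits attaching to several nodes. An empty AccessModes list is left for the attacher to reject. Any other combination, such as ReadWriteOnce alone, forbids multi-attach. The real function also special-cases some in-tree inline volume types, which are omitted here. `accessMode`, `multiAttachForbidden`, and `canStartAttach` are hypothetical names.

```go
package main

import "fmt"

type accessMode string

const (
	readWriteOnce accessMode = "ReadWriteOnce"
	readOnlyMany  accessMode = "ReadOnlyMany"
	readWriteMany accessMode = "ReadWriteMany"
)

// multiAttachForbidden is a simplified, PV-only version of the access-mode rule.
func multiAttachForbidden(modes []accessMode) bool {
	if len(modes) == 0 {
		return false // unknown: let the attacher fail if it must
	}
	for _, m := range modes {
		if m == readWriteMany || m == readOnlyMany {
			return false
		}
	}
	return true
}

// canStartAttach mirrors the guard in attachDesiredVolumes. attachedNodes are
// the nodes the volume is already attached to; the target node was checked
// earlier, so they are all other nodes.
func canStartAttach(modes []accessMode, attachedNodes []string) bool {
	return !(multiAttachForbidden(modes) && len(attachedNodes) > 0)
}

func main() {
	fmt.Println(canStartAttach([]accessMode{readWriteOnce}, []string{"node1"})) // Multi-Attach error
	fmt.Println(canStartAttach([]accessMode{readWriteMany}, []string{"node1"})) // allowed
}
```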


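3.1.2 rc.nodeStatusUpdater.UpdateNodeStatuses

Main logic: for every node whose status needs an update, get from actualStateOfWorld the volumes attached to it, and update the node object's node.Status.VolumesAttached accordingly.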

// pkg/controller/volume/attachdetach/statusupdater/node_status_updater.go
func (nsu *nodeStatusUpdater) UpdateNodeStatuses() error {
	// TODO: investigate right behavior if nodeName is empty
	// kubernetes/kubernetes/issues/37777
	nodesToUpdate := nsu.actualStateOfWorld.GetVolumesToReportAttached()
	for nodeName, attachedVolumes := range nodesToUpdate {
		nodeObj, err := nsu.nodeLister.Get(string(nodeName))
		if errors.IsNotFound(err) {
			// If node does not exist, its status cannot be updated.
			// Do nothing so that there is no retry until node is created.
			klog.V(2).Infof(
				"Could not update node status. Failed to find node %q in NodeInformer cache. Error: '%v'",
				nodeName,
				err)
			continue
		} else if err != nil {
			// For all other errors, log error and reset flag statusUpdateNeeded
			// back to true to indicate this node status needs to be updated again.
			klog.V(2).Infof("Error retrieving nodes from node lister. Error: %v", err)
			nsu.actualStateOfWorld.SetNodeStatusUpdateNeeded(nodeName)
			continue
		}

		if err := nsu.updateNodeStatus(nodeName, nodeObj, attachedVolumes); err != nil {
			// If update node status fails, reset flag statusUpdateNeeded back to true
			// to indicate this node status needs to be updated again
			nsu.actualStateOfWorld.SetNodeStatusUpdateNeeded(nodeName)

			klog.V(2).Infof(
				"Could not update node status for %q; re-marking for update. %v",
				nodeName,
				err)

			// We currently always return immediately on error
			return err
		}
	}
	return nil
}

func (nsu *nodeStatusUpdater) updateNodeStatus(nodeName types.NodeName, nodeObj *v1.Node, attachedVolumes []v1.AttachedVolume) error {
	node := nodeObj.DeepCopy()
	node.Status.VolumesAttached = attachedVolumes
	_, patchBytes, err := nodeutil.PatchNodeStatus(nsu.kubeClient.CoreV1(), nodeName, nodeObj, node)
	if err != nil {
		return err
	}

	klog.V(4).Infof("Updating status %q for node %q succeeded. VolumesAttached: %v", patchBytes, nodeName, attachedVolumes)
	return nil
}
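The status-update bookkeeping can be modeled with a per-node flag. In this hypothetical sketch, `volumesToReport` plays the role of GetVolumesToReportAttached: it collects the volume list for flagged nodes and clears their flag. A failed patch sets the flag again so the next pass retries. There is one deliberate difference: the original returns on the first error, while this sketch keeps going and returns the first error it saw.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
)

var errPatchFailed = errors.New("patch failed") // hypothetical API error

// nodeState is a hypothetical per-node ASW record.
type nodeState struct {
	statusUpdateNeeded bool
	reported           map[string]bool // volumes to report as attached
}

type actualState map[string]*nodeState

// volumesToReport returns a sorted VolumesAttached list for every flagged
// node and clears the flag.
func (a actualState) volumesToReport() map[string][]string {
	out := map[string][]string{}
	for name, n := range a {
		if !n.statusUpdateNeeded {
			continue
		}
		vols := make([]string, 0, len(n.reported))
		for v := range n.reported {
			vols = append(vols, v)
		}
		sort.Strings(vols)
		out[name] = vols
		n.statusUpdateNeeded = false
	}
	return out
}

// updateNodeStatuses patches each flagged node and re-flags it on failure.
func updateNodeStatuses(a actualState, patch func(node string, vols []string) error) error {
	var firstErr error
	for name, vols := range a.volumesToReport() {
		if err := patch(name, vols); err != nil {
			a[name].statusUpdateNeeded = true // retry on the next pass
			if firstErr == nil {
				firstErr = err
			}
		}
	}
	return firstErr
}

func main() {
	a := actualState{"node1": {statusUpdateNeeded: true, reported: map[string]bool{"pv-a": true}}}
	err := updateNodeStatuses(a, func(string, []string) error { return errPatchFailed })
	fmt.Println(err, a["node1"].statusUpdateNeeded) // the failure re-flags node1
	err = updateNodeStatuses(a, func(node string, vols []string) error {
		fmt.Println("patched", node, vols)
		return nil
	})
	fmt.Println(err, a["node1"].statusUpdateNeeded)
}
```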

4 adc.desiredStateOfWorldPopulator.Run

Purpose: keep desiredStateOfWorld up to date, tracking the changes that happen after it has been initialized.


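Main logic:

(1) dswp.findAndRemoveDeletedPods: update desiredStateOfWorld by removing pods that no longer exist. It runs on every loop iteration.

(2) dswp.findAndAddActivePods: update desiredStateOfWorld by adding the volumes of new pods. It only runs if at least listPodsRetryDuration has passed since the last successful pod list.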

// pkg/controller/volume/attachdetach/populator/desired_state_of_world_populator.go
func (dswp *desiredStateOfWorldPopulator) Run(stopCh <-chan struct{}) {
	wait.Until(dswp.populatorLoopFunc(), dswp.loopSleepDuration, stopCh)
}

func (dswp *desiredStateOfWorldPopulator) populatorLoopFunc() func() {
	return func() {
		dswp.findAndRemoveDeletedPods()

		// findAndAddActivePods is called periodically, independently of the main
		// populator loop.
		if time.Since(dswp.timeOfLastListPods) < dswp.listPodsRetryDuration {
			klog.V(5).Infof(
				"Skipping findAndAddActivePods(). Not permitted until %v (listPodsRetryDuration %v).",
				dswp.timeOfLastListPods.Add(dswp.listPodsRetryDuration),
				dswp.listPodsRetryDuration)

			return
		}
		dswp.findAndAddActivePods()
	}
}

4.1 dswp.findAndRemoveDeletedPods


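Main logic:

(1) take the list of pods from desiredStateOfWorld;

(2) look each pod up in the pod informer cache (podLister), which mirrors what is stored in etcd;

(3) if the pod no longer exists, or its UID no longer matches, or util.DetermineVolumeAction says its volumes should be removed, call dswp.desiredStateOfWorld.DeletePod to remove it from desiredStateOfWorld.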

// pkg/controller/volume/attachdetach/populator/desired_state_of_world_populator.go
// Iterate through all pods in desired state of world, and remove if they no
// longer exist in the informer
func (dswp *desiredStateOfWorldPopulator) findAndRemoveDeletedPods() {
	for dswPodUID, dswPodToAdd := range dswp.desiredStateOfWorld.GetPodToAdd() {
		dswPodKey, err := kcache.MetaNamespaceKeyFunc(dswPodToAdd.Pod)
		if err != nil {
			klog.Errorf("MetaNamespaceKeyFunc failed for pod %q (UID %q) with: %v", dswPodKey, dswPodUID, err)
			continue
		}

		// Retrieve the pod object from pod informer with the namespace key
		namespace, name, err := kcache.SplitMetaNamespaceKey(dswPodKey)
		if err != nil {
			utilruntime.HandleError(fmt.Errorf("error splitting dswPodKey %q: %v", dswPodKey, err))
			continue
		}
		informerPod, err := dswp.podLister.Pods(namespace).Get(name)
		switch {
		case errors.IsNotFound(err):
			// if we can't find the pod, we need to delete it below
		case err != nil:
			klog.Errorf("podLister Get failed for pod %q (UID %q) with %v", dswPodKey, dswPodUID, err)
			continue
		default:
			volumeActionFlag := util.DetermineVolumeAction(
				informerPod,
				dswp.desiredStateOfWorld,
				true /* default volume action */)

			if volumeActionFlag {
				informerPodUID := volutil.GetUniquePodName(informerPod)
				// Check whether the unique identifier of the pod from dsw matches the one retrieved from pod informer
				if informerPodUID == dswPodUID {
					klog.V(10).Infof("Verified pod %q (UID %q) from dsw exists in pod informer.", dswPodKey, dswPodUID)
					continue
				}
			}
		}

		// the pod from dsw does not exist in pod informer, or it does not match the unique identifier retrieved
		// from the informer, delete it from dsw
		klog.V(1).Infof("Removing pod %q (UID %q) from dsw because it does not exist in pod informer.", dswPodKey, dswPodUID)
		dswp.desiredStateOfWorld.DeletePod(dswPodUID, dswPodToAdd.VolumeName, dswPodToAdd.NodeName)
	}
}
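The keep-or-remove rule in findAndRemoveDeletedPods is easy to get wrong, so here is a reduced Go sketch. Its assumptions are:

- the pod lister is a map keyed by namespace/name;
- the UID stands in for the unique pod name;
- `Terminated` approximates util.DetermineVolumeAction returning false (the real helper also consults the node's keepTerminatedPodVolumes setting).

The UID check matters when a pod is deleted and recreated under the same name, for example by a StatefulSet. All type and function names are hypothetical.

```go
package main

import "fmt"

// dswPod is a hypothetical DSW pod record.
type dswPod struct {
	UID       string
	Namespace string
	Name      string
}

// informerPod is what the (hypothetical) lister returns for namespace/name.
type informerPod struct {
	UID        string
	Terminated bool
}

// podsToRemove returns the UIDs that findAndRemoveDeletedPods would delete.
func podsToRemove(dsw []dswPod, lister map[string]informerPod) []string {
	var out []string
	for _, p := range dsw {
		cur, found := lister[p.Namespace+"/"+p.Name]
		// Keep the pod only if it still exists, still wants its volumes,
		// and is the very same pod object (same UID).
		if found && !cur.Terminated && cur.UID == p.UID {
			continue
		}
		out = append(out, p.UID)
	}
	return out
}

func main() {
	dsw := []dswPod{{UID: "uid-1", Namespace: "default", Name: "web-0"}}
	// web-0 was deleted and recreated by its StatefulSet, so it has a new UID.
	lister := map[string]informerPod{"default/web-0": {UID: "uid-2"}}
	fmt.Println(podsToRemove(dsw, lister))
}
```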

4.2 dswp.findAndAddActivePods()


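Main logic:

(1) list all pods from the pod informer cache (podLister), which mirrors what is stored in etcd, and skip pods that have terminated;

(2) call util.ProcessPodVolumes to add the new pods' volumes to desiredStateOfWorld.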

// pkg/controller/volume/attachdetach/populator/desired_state_of_world_populator.go
func (dswp *desiredStateOfWorldPopulator) findAndAddActivePods() {
	pods, err := dswp.podLister.List(labels.Everything())
	if err != nil {
		klog.Errorf("podLister List failed: %v", err)
		return
	}
	dswp.timeOfLastListPods = time.Now()

	for _, pod := range pods {
		if volutil.IsPodTerminated(pod, pod.Status) {
			// Do not add volumes for terminated pods
			continue
		}
		util.ProcessPodVolumes(pod, true,
			dswp.desiredStateOfWorld, dswp.volumePluginMgr, dswp.pvcLister, dswp.pvLister, dswp.csiMigratedPluginManager, dswp.intreeToCSITranslator)
	}
}


Summary: the roles of the four main methods called in attachDetachController.Run:

(1) adc.populateActualStateOfWorld(): initializes the actualStateOfWorld structure.

(2) adc.populateDesiredStateOfWorld(): initializes the desiredStateOfWorld structure.

(3) adc.reconciler.Run(): reconciles desiredStateOfWorld with actualStateOfWorld. It compares the two, performs attach and detach operations, updates actualStateOfWorld, and updates the node objects' .Status.VolumesAttached from actualStateOfWorld.

(4) adc.desiredStateOfWorldPopulator.Run(): keeps desiredStateOfWorld up to date, tracking changes after it has been initialized.

NewAttachDetachController source analysis

Next, let's look at NewAttachDetachController and briefly analyze its EventHandlers. As the code shows, it mainly registers EventHandlers for Pod and Node objects.

// pkg/controller/volume/attachdetach/attach_detach_controller.go
// NewAttachDetachController returns a new instance of AttachDetachController.
func NewAttachDetachController(...

	podInformer.Informer().AddEventHandler(kcache.ResourceEventHandlerFuncs{
		AddFunc:    adc.podAdd,
		UpdateFunc: adc.podUpdate,
		DeleteFunc: adc.podDelete,
	})

	nodeInformer.Informer().AddEventHandler(kcache.ResourceEventHandlerFuncs{
		AddFunc:    adc.nodeAdd,
		UpdateFunc: adc.nodeUpdate,
		DeleteFunc: adc.nodeDelete,
	})

1 adc.podAdd

Same logic as adc.podUpdate; see the adc.podUpdate analysis below.

2 adc.podUpdate

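Purpose: update the dsw. Normally the new pod's volumes are added to the dsw. Depending on util.DetermineVolumeAction, they may instead be removed, for example for a terminated pod whose node does not keep terminated pod volumes.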

The key function here is util.ProcessPodVolumes.

// pkg/controller/volume/attachdetach/attach_detach_controller.go
func (adc *attachDetachController) podUpdate(oldObj, newObj interface{}) {
	pod, ok := newObj.(*v1.Pod)
	if pod == nil || !ok {
		return
	}
	if pod.Spec.NodeName == "" {
		// Ignore pods without NodeName, indicating they are not scheduled.
		return
	}

	volumeActionFlag := util.DetermineVolumeAction(
		pod,
		adc.desiredStateOfWorld,
		true /* default volume action */)

	util.ProcessPodVolumes(pod, volumeActionFlag, /* addVolumes */
		adc.desiredStateOfWorld, &adc.volumePluginMgr, adc.pvcLister, adc.pvLister, adc.csiMigratedPluginManager, adc.intreeToCSITranslator)
}


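2.1 util.ProcessPodVolumes

It updates the dsw. When addVolumes is true, the pod's attachable volumes are added to the dsw; otherwise they are removed from it.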

// pkg/controller/volume/attachdetach/util/util.go
// ProcessPodVolumes processes the volumes in the given pod and adds them to the
// desired state of the world if addVolumes is true, otherwise it removes them.
func ProcessPodVolumes(pod *v1.Pod, addVolumes bool, desiredStateOfWorld cache.DesiredStateOfWorld, volumePluginMgr *volume.VolumePluginMgr, pvcLister corelisters.PersistentVolumeClaimLister, pvLister corelisters.PersistentVolumeLister, csiMigratedPluginManager csimigration.PluginManager, csiTranslator csimigration.InTreeToCSITranslator) {
	if pod == nil {
		return
	}

	if len(pod.Spec.Volumes) <= 0 {
		klog.V(10).Infof("Skipping processing of pod %q/%q: it has no volumes.", pod.Namespace, pod.Name)
		return
	}

	nodeName := types.NodeName(pod.Spec.NodeName)
	if nodeName == "" {
		klog.V(10).Infof("Skipping processing of pod %q/%q: it is not scheduled to a node.", pod.Namespace, pod.Name)
		return
	} else if !desiredStateOfWorld.NodeExists(nodeName) {
		// If the node the pod is scheduled to does not exist in the desired
		// state of the world data structure, that indicates the node is not
		// yet managed by the controller. Therefore, ignore the pod.
		klog.V(4).Infof("Skipping processing of pod %q/%q: it is scheduled to node %q which is not managed by the controller.", pod.Namespace, pod.Name, nodeName)
		return
	}

	// Process volume spec for each volume defined in pod
	for _, podVolume := range pod.Spec.Volumes {
		volumeSpec, err := CreateVolumeSpec(podVolume, pod.Namespace, nodeName, volumePluginMgr, pvcLister, pvLister, csiMigratedPluginManager, csiTranslator)
		if err != nil {
			klog.V(10).Infof("Error processing volume %q for pod %q/%q: %v", podVolume.Name, pod.Namespace, pod.Name, err)
			continue
		}

		attachableVolumePlugin, err := volumePluginMgr.FindAttachablePluginBySpec(volumeSpec)
		if err != nil || attachableVolumePlugin == nil {
			klog.V(10).Infof("Skipping volume %q for pod %q/%q: it does not implement attacher interface. err=%v", podVolume.Name, pod.Namespace, pod.Name, err)
			continue
		}

		uniquePodName := util.GetUniquePodName(pod)
		if addVolumes {
			// Add volume to desired state of world
			_, err := desiredStateOfWorld.AddPod(uniquePodName, pod, volumeSpec, nodeName)
			if err != nil {
				klog.V(10).Infof("Failed to add volume %q for pod %q/%q to desiredStateOfWorld. %v", podVolume.Name, pod.Namespace, pod.Name, err)
			}

		} else {
			// Remove volume from desired state of world
			uniqueVolumeName, err := util.GetUniqueVolumeNameFromSpec(attachableVolumePlugin, volumeSpec)
			if err != nil {
				klog.V(10).Infof("Failed to delete volume %q for pod %q/%q from desiredStateOfWorld. GetUniqueVolumeNameFromSpec failed with %v", podVolume.Name, pod.Namespace, pod.Name, err)
				continue
			}
			desiredStateOfWorld.DeletePod(uniquePodName, uniqueVolumeName, nodeName)
		}
	}
	return
}
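Below is a self-contained sketch of the add/remove flow in ProcessPodVolumes. Its assumptions are:

- the DSW is a nested map, node → volume → set of pod UIDs;
- a node key exists only if addNodeToDswp registered that node;
- `Attachable` stands in for FindAttachablePluginBySpec returning a plugin.

When the last pod that uses a volume is removed, the volume leaves the node's DSW entry. That is what later makes reconcile detach it. All names are hypothetical.

```go
package main

import "fmt"

// volume is a hypothetical pod volume; Attachable means an attacher plugin exists.
type volume struct {
	Name       string
	Attachable bool
}

// pod keeps only the fields ProcessPodVolumes looks at.
type pod struct {
	UID      string
	NodeName string
	Volumes  []volume
}

// desired maps node -> volume -> set of pod UIDs.
type desired map[string]map[string]map[string]bool

func processPodVolumes(p pod, addVolumes bool, d desired) {
	if len(p.Volumes) == 0 || p.NodeName == "" {
		return // no volumes, or not scheduled yet
	}
	vols, managed := d[p.NodeName]
	if !managed {
		return // node not managed by the AD controller
	}
	for _, v := range p.Volumes {
		if !v.Attachable {
			continue // e.g. configMap or emptyDir: nothing to attach
		}
		if addVolumes {
			if vols[v.Name] == nil {
				vols[v.Name] = map[string]bool{}
			}
			vols[v.Name][p.UID] = true
			continue
		}
		delete(vols[v.Name], p.UID)
		if len(vols[v.Name]) == 0 {
			delete(vols, v.Name) // last pod gone: the volume leaves the DSW
		}
	}
}

func main() {
	d := desired{"node1": {}} // node1 was registered via addNodeToDswp
	web := pod{UID: "uid-1", NodeName: "node1", Volumes: []volume{{Name: "pv-a", Attachable: true}, {Name: "config", Attachable: false}}}
	processPodVolumes(web, true, d) // podAdd / podUpdate
	fmt.Println(d)
	processPodVolumes(web, false, d) // podDelete
	fmt.Println(d)
}
```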

3 adc.podDelete

Purpose: remove the pod's volumes from the dsw.

// pkg/controller/volume/attachdetach/attach_detach_controller.go
func (adc *attachDetachController) podDelete(obj interface{}) {
	pod, ok := obj.(*v1.Pod)
	if pod == nil || !ok {
		return
	}

	util.ProcessPodVolumes(pod, false, /* addVolumes */
		adc.desiredStateOfWorld, &adc.volumePluginMgr, adc.pvcLister, adc.pvLister, adc.csiMigratedPluginManager, adc.intreeToCSITranslator)
}

4 adc.nodeAdd

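It mainly delegates to adc.nodeUpdate, so its effect is essentially the same as that of adc.nodeUpdate (see the analysis below). In addition, it marks the node as needing a status update, so that node.Status.VolumesAttached is re-synced with the actual state.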

// pkg/controller/volume/attachdetach/attach_detach_controller.go
func (adc *attachDetachController) nodeAdd(obj interface{}) {
	node, ok := obj.(*v1.Node)
	// TODO: investigate if nodeName is empty then if we can return
	// kubernetes/kubernetes/issues/37777
	if node == nil || !ok {
		return
	}
	nodeName := types.NodeName(node.Name)
	adc.nodeUpdate(nil, obj)
	// kubernetes/kubernetes/issues/37586
	// This is to workaround the case when a node add causes to wipe out
	// the attached volumes field. This function ensures that we sync with
	// the actual status.
	adc.actualStateOfWorld.SetNodeStatusUpdateNeeded(nodeName)
}

5 adc.nodeUpdate

Purpose: add the node to the dsw, and update the asw from node.Status.VolumesInUse.

// pkg/controller/volume/attachdetach/attach_detach_controller.go
func (adc *attachDetachController) nodeUpdate(oldObj, newObj interface{}) {
	node, ok := newObj.(*v1.Node)
	// TODO: investigate if nodeName is empty then if we can return
	if node == nil || !ok {
		return
	}

	nodeName := types.NodeName(node.Name)
	adc.addNodeToDswp(node, nodeName)
	adc.processVolumesInUse(nodeName, node.Status.VolumesInUse)
}

5.1 adc.addNodeToDswp

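Adds the node to the dsw, but only if the node carries the volumeutil.ControllerManagedAttachAnnotation annotation. The value of volumeutil.KeepTerminatedPodVolumesAnnotation is recorded alongside the node.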

func (adc *attachDetachController) addNodeToDswp(node *v1.Node, nodeName types.NodeName) {
	if _, exists := node.Annotations[volumeutil.ControllerManagedAttachAnnotation]; exists {
		keepTerminatedPodVolumes := false

		if t, ok := node.Annotations[volumeutil.KeepTerminatedPodVolumesAnnotation]; ok {
			keepTerminatedPodVolumes = (t == "true")
		}

		// Node specifies annotation indicating it should be managed by attach
		// detach controller. Add it to desired state of world.
		adc.desiredStateOfWorld.AddNode(nodeName, keepTerminatedPodVolumes)
	}
}

5.2 adc.processVolumesInUse

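Updates the asw from node.Status.VolumesInUse. Each volume that the asw records as attached to this node is marked mounted if it appears in VolumesInUse, and unmounted otherwise.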

// processVolumesInUse processes the list of volumes marked as "in-use"
// according to the specified Node's Status.VolumesInUse and updates the
// corresponding volume in the actual state of the world to indicate that it is
// mounted.
func (adc *attachDetachController) processVolumesInUse(
	nodeName types.NodeName, volumesInUse []v1.UniqueVolumeName) {
	klog.V(4).Infof("processVolumesInUse for node %q", nodeName)
	for _, attachedVolume := range adc.actualStateOfWorld.GetAttachedVolumesForNode(nodeName) {
		mounted := false
		for _, volumeInUse := range volumesInUse {
			if attachedVolume.VolumeName == volumeInUse {
				mounted = true
				break
			}
		}
		err := adc.actualStateOfWorld.SetVolumeMountedByNode(attachedVolume.VolumeName, nodeName, mounted)
		if err != nil {
			klog.Warningf(
				"SetVolumeMountedByNode(%q, %q, %v) returned an error: %v",
				attachedVolume.VolumeName, nodeName, mounted, err)
		}
	}
}
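processVolumesInUse boils down to recomputing one boolean per attached volume. In this hypothetical sketch, the asw for a single node is a map from volume name to its mounted flag. Using a set instead of the original nested loop is just a convenience. A volume that drops out of VolumesInUse becomes unmounted here. That lets the detach branch of reconcile proceed without waiting for maxWaitForUnmountDuration.

```go
package main

import "fmt"

// setMountedFromVolumesInUse marks every volume the asw holds for a node as
// mounted if and only if the node lists it in Status.VolumesInUse. Volumes
// that are in use but not attached are ignored, as in the original.
func setMountedFromVolumesInUse(attached map[string]bool, volumesInUse []string) {
	inUse := make(map[string]bool, len(volumesInUse))
	for _, v := range volumesInUse {
		inUse[v] = true
	}
	for v := range attached {
		attached[v] = inUse[v]
	}
}

func main() {
	attached := map[string]bool{"pv-a": true, "pv-b": true}
	setMountedFromVolumesInUse(attached, []string{"pv-a"})
	fmt.Println(attached["pv-a"], attached["pv-b"])
}
```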

6 adc.nodeDelete

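Purpose: delete the node and its associated volume information from the dsw, then refresh the asw's mounted flags from the node's last known node.Status.VolumesInUse.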

func (adc *attachDetachController) nodeDelete(obj interface{}) {
	node, ok := obj.(*v1.Node)
	if node == nil || !ok {
		return
	}

	nodeName := types.NodeName(node.Name)
	if err := adc.desiredStateOfWorld.DeleteNode(nodeName); err != nil {
		// This might happen during drain, but we still want it to appear in our logs
		klog.Infof("error removing node %q from desired-state-of-world: %v", nodeName, err)
	}

	adc.processVolumesInUse(nodeName, node.Status.VolumesInUse)
}


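Summary: the main roles of the Pod and Node EventHandlers are:

(1) adc.podAdd: updates the dsw, adding the new pod's volumes;

(2) adc.podUpdate: updates the dsw, adding the pod's volumes (or removing them, depending on util.DetermineVolumeAction);

(3) adc.podDelete: removes the pod's volumes from the dsw;

(4) adc.nodeAdd: adds the node to the dsw and updates the asw from node.Status.VolumesInUse;

(5) adc.nodeUpdate: adds the node to the dsw and updates the asw from node.Status.VolumesInUse;

(6) adc.nodeDelete: removes the node and its associated volume information from the dsw.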



In summary, the AD controller reconciles desiredStateOfWorld against actualStateOfWorld. It compares the two and attaches or detaches volumes accordingly.

