
kube-controller-manager: PV Controller Source Code Analysis

良凯尔
Published: May 22, 2021

kubernetes ceph-csi analysis - table of contents:

https://xie.infoq.cn/article/4b1d3e32f124307a49cd9c1e3

Overview

In the kube-controller-manager component, two controllers are storage-related: the PV controller and the AD controller.


Based on tag v1.17.4


https://github.com/kubernetes/kubernetes/releases/tag/v1.17.4

PV Controller Analysis

This section analyzes the PV controller.

Main Objects Involved

(1) PersistentVolume (PV): a persistent storage volume; it defines the detailed parameters of the underlying storage.

(2) PersistentVolumeClaim (PVC): a claim for persistent storage, i.e., a declaration of what kind of storage is needed and how much of it.

(3) StorageClass: a template for creating PVs; it defines the template parameters used when provisioning storage.
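
To make these three objects concrete, here is a minimal sketch in Go using the same k8s.io/api/core/v1 types that the controller code below works with. The object names, the "csi-rbd-sc" StorageClass and the hostPath path are made-up examples, not values from this article.

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	scName := "csi-rbd-sc" // hypothetical StorageClass name
	fsMode := corev1.PersistentVolumeFilesystem

	// A PVC: "I need 1Gi of ReadWriteOnce filesystem storage from csi-rbd-sc".
	pvc := &corev1.PersistentVolumeClaim{
		ObjectMeta: metav1.ObjectMeta{Name: "data-pvc", Namespace: "default"},
		Spec: corev1.PersistentVolumeClaimSpec{
			AccessModes:      []corev1.PersistentVolumeAccessMode{corev1.ReadWriteOnce},
			VolumeMode:       &fsMode,
			StorageClassName: &scName,
			Resources: corev1.ResourceRequirements{
				Requests: corev1.ResourceList{corev1.ResourceStorage: resource.MustParse("1Gi")},
			},
		},
	}

	// A PV that could satisfy the claim: same StorageClass, volumeMode and
	// accessModes, and capacity >= the requested size.
	pv := &corev1.PersistentVolume{
		ObjectMeta: metav1.ObjectMeta{Name: "pv-demo"},
		Spec: corev1.PersistentVolumeSpec{
			Capacity:                      corev1.ResourceList{corev1.ResourceStorage: resource.MustParse("1Gi")},
			AccessModes:                   []corev1.PersistentVolumeAccessMode{corev1.ReadWriteOnce},
			VolumeMode:                    &fsMode,
			StorageClassName:              scName,
			PersistentVolumeReclaimPolicy: corev1.PersistentVolumeReclaimRetain,
			PersistentVolumeSource: corev1.PersistentVolumeSource{
				HostPath: &corev1.HostPathVolumeSource{Path: "/tmp/pv-demo"},
			},
		},
	}

	fmt.Printf("claim %s/%s could bind to volume %s\n", pvc.Namespace, pvc.Name, pv.Name)
}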


Main PV state transitions

(1) Available --> Bound: a newly created PV object is in the Available state. The PV controller finds a suitable PV for a PVC object and binds them, after which the PV's state changes to Bound.

(2) Bound --> Released: when the PVC bound to a PV is deleted and the reclaim policy is Retain, the PV's state changes to Released.


Main PVC state transitions

(1) Pending --> Bound: a newly created PVC object is in the Pending state. The PV controller finds a suitable PV for the PVC and binds them, after which the PVC's state changes to Bound.


How does a PVC select a suitable PV to bind to?

(1) volumeMode match: select a PV with the same volumeMode as the PVC (Block/Filesystem);

(2) storageClass match: select a PV with the same storageClass name as the PVC;

(3) accessMode match: select a PV with the same accessMode as the PVC;

(4) size check: among PVs whose size is greater than or equal to the PVC's requested size, select the one closest to that size.

Other: when a PVC cannot find a suitable PV, the corresponding volume plugin dynamically provisions a PV according to the parameters of the StorageClass object; when a suitable PV does exist, the PVC is bound directly to that existing PV and no dynamic provisioning takes place.
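
As a rough illustration of the four matching rules above, here is a simplified sketch. It is not the controller's actual findBestMatchForClaim/FindMatchingVolume implementation (which also handles pre-bound PVs, label selectors and delayed binding, and then prefers the smallest PV that still fits); the helper names are made up.

package match

import (
	corev1 "k8s.io/api/core/v1"
)

// pvMatchesClaim is an illustrative filter over the four rules above; the
// name is not from the Kubernetes source.
func pvMatchesClaim(pv *corev1.PersistentVolume, claim *corev1.PersistentVolumeClaim) bool {
	// (1) volumeMode must match (a nil volumeMode defaults to Filesystem).
	if volumeMode(pv.Spec.VolumeMode) != volumeMode(claim.Spec.VolumeMode) {
		return false
	}

	// (2) the StorageClass names must match.
	claimClass := ""
	if claim.Spec.StorageClassName != nil {
		claimClass = *claim.Spec.StorageClassName
	}
	if pv.Spec.StorageClassName != claimClass {
		return false
	}

	// (3) the PV must offer every accessMode the claim requests.
	for _, want := range claim.Spec.AccessModes {
		found := false
		for _, have := range pv.Spec.AccessModes {
			if want == have {
				found = true
				break
			}
		}
		if !found {
			return false
		}
	}

	// (4) PV capacity must be >= the requested size; among all candidates the
	// controller then prefers the smallest PV that still fits.
	requested := claim.Spec.Resources.Requests[corev1.ResourceStorage]
	capacity := pv.Spec.Capacity[corev1.ResourceStorage]
	return capacity.Cmp(requested) >= 0
}

func volumeMode(m *corev1.PersistentVolumeMode) corev1.PersistentVolumeMode {
	if m == nil {
		return corev1.PersistentVolumeFilesystem
	}
	return *m
}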

What the PV Controller Does

PV Controller is short for PersistentVolume controller. It is mainly responsible for:

(1) binding PV and PVC objects;

(2) lifecycle management of PV and PVC objects (creating/deleting the underlying storage, creating/deleting PV objects, and updating the state of PV and PVC objects).


Notes

(1) When a PVC is created, the PV controller first looks for an existing suitable PV to bind it to; only when no suitable PV is found does it create a new PV.

(2) As mentioned earlier, volume plugins are divided into in-tree and out-of-tree parts according to where their source code lives.

(3) Creating/deleting the underlying storage and creating/deleting PV objects is done by the PV controller calling a volume plugin (in-tree). If Kubernetes uses Ceph storage through ceph-csi (a CSI plugin), the volume plugin is ceph-csi, which is out-of-tree, so creating/deleting the underlying storage and creating/deleting PV objects is done by the external-provisioner.


The processing logic for PVs and PVCs lives in two files: kubernetes/pkg/controller/volume/persistentvolume/pv_controller_base.go and kubernetes/pkg/controller/volume/persistentvolume/pv_controller.go.

Source Code Entry Point

Start with the Run method of PersistentVolumeController. It essentially starts three goroutines, each running one of three methods, which are analyzed one by one below.


// kubernetes/pkg/controller/volume/persistentvolume/pv_controller_base.go
func (ctrl *PersistentVolumeController) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	defer ctrl.claimQueue.ShutDown()
	defer ctrl.volumeQueue.ShutDown()

	klog.Infof("Starting persistent volume controller")
	defer klog.Infof("Shutting down persistent volume controller")

	if !cache.WaitForNamedCacheSync("persistent volume", stopCh, ctrl.volumeListerSynced, ctrl.claimListerSynced, ctrl.classListerSynced, ctrl.podListerSynced, ctrl.NodeListerSynced) {
		return
	}

	ctrl.initializeCaches(ctrl.volumeLister, ctrl.claimLister)

	go wait.Until(ctrl.resync, ctrl.resyncPeriod, stopCh)
	go wait.Until(ctrl.volumeWorker, time.Second, stopCh)
	go wait.Until(ctrl.claimWorker, time.Second, stopCh)

	metrics.Register(ctrl.volumes.store, ctrl.claims)

	<-stopCh
}

1 ctrl.resync

The resync method is very simple: it periodically lists all PVs and PVCs and puts them into the volumeQueue and claimQueue queues, to be consumed by volumeWorker and claimWorker.


//kubernetes/pkg/controller/volume/persistentvolume/pv_controller_base.go
func (ctrl *PersistentVolumeController) resync() {
	klog.V(4).Infof("resyncing PV controller")

	pvcs, err := ctrl.claimLister.List(labels.NewSelector())
	if err != nil {
		klog.Warningf("cannot list claims: %s", err)
		return
	}
	for _, pvc := range pvcs {
		ctrl.enqueueWork(ctrl.claimQueue, pvc)
	}

	pvs, err := ctrl.volumeLister.List(labels.NewSelector())
	if err != nil {
		klog.Warningf("cannot list persistent volumes: %s", err)
		return
	}
	for _, pv := range pvs {
		ctrl.enqueueWork(ctrl.volumeQueue, pv)
	}
}

2 ctrl.volumeWorker

The volumeWorker method mainly maintains PV state, updating the PV object's status according to different conditions. When the PVC bound to a PV cannot be found, it calls the volume plugin to delete the underlying storage and delete the PV object (this covers both the in-tree and out-of-tree paths).

volumeWorker continuously consumes items from the volumeQueue, fetches the corresponding PV, and runs updateVolume on it.


//kubernetes/pkg/controller/volume/persistentvolume/pv_controller_base.go
func (ctrl *PersistentVolumeController) volumeWorker() {
	workFunc := func() bool {
		keyObj, quit := ctrl.volumeQueue.Get()
		if quit {
			return true
		}
		defer ctrl.volumeQueue.Done(keyObj)
		key := keyObj.(string)
		klog.V(5).Infof("volumeWorker[%s]", key)

		_, name, err := cache.SplitMetaNamespaceKey(key)
		if err != nil {
			klog.V(4).Infof("error getting name of volume %q to get volume from informer: %v", key, err)
			return false
		}
		volume, err := ctrl.volumeLister.Get(name)
		if err == nil {
			// The volume still exists in informer cache, the event must have
			// been add/update/sync
			ctrl.updateVolume(volume)
			return false
		}
		if !errors.IsNotFound(err) {
			klog.V(2).Infof("error getting volume %q from informer: %v", key, err)
			return false
		}

		// The volume is not in informer cache, the event must have been
		// "delete"
		volumeObj, found, err := ctrl.volumes.store.GetByKey(key)
		if err != nil {
			klog.V(2).Infof("error getting volume %q from cache: %v", key, err)
			return false
		}
		if !found {
			// The controller has already processed the delete event and
			// deleted the volume from its cache
			klog.V(2).Infof("deletion of volume %q was already processed", key)
			return false
		}
		volume, ok := volumeObj.(*v1.PersistentVolume)
		if !ok {
			klog.Errorf("expected volume, got %+v", volumeObj)
			return false
		}
		ctrl.deleteVolume(volume)
		return false
	}
	for {
		if quit := workFunc(); quit {
			klog.Infof("volume worker queue shutting down")
			return
		}
	}
}

2.1 ctrl.updateVolume

The updateVolume method calls the syncVolume method, which carries out the core flow.


func (ctrl *PersistentVolumeController) updateVolume(volume *v1.PersistentVolume) {
	// Store the new volume version in the cache and do not process it if this
	// is an old version.
	// Update the cache.
	new, err := ctrl.storeVolumeUpdate(volume)
	if err != nil {
		klog.Errorf("%v", err)
	}
	if !new {
		return
	}
	// Core method: bind or unbind the PV and PVC according to the current PV spec.
	err = ctrl.syncVolume(volume)
	if err != nil {
		if errors.IsConflict(err) {
			// Version conflict error happens quite often and the controller
			// recovers from it easily.
			klog.V(3).Infof("could not sync volume %q: %+v", volume.Name, err)
		} else {
			klog.Errorf("could not sync volume %q: %+v", volume.Name, err)
		}
	}
}


ctrl.syncVolume


syncVolume is the core method; it reconciles and updates the PV's state:

(1) If spec.claimRef is not set, the PV has never been used; updateVolumePhase is called to set the phase to Available;

(2) If spec.claimRef is set, the PV has been bound to a PVC before; if that PVC no longer exists, the PV's state is updated to Released;

(3) If the PVC corresponding to the PV was deleted, ctrl.reclaimVolume acts according to the PV's reclaim policy: for Retain, nothing is done; for Delete, the volume plugin is called to delete the underlying storage and the PV object (when the volume plugin is a CSI plugin, the out-of-tree path is taken: the PV controller does not delete the storage or the PV object; the external-provisioner component does).


func (ctrl *PersistentVolumeController) syncVolume(volume *v1.PersistentVolume) error {
	klog.V(4).Infof("synchronizing PersistentVolume[%s]: %s", volume.Name, getVolumeStatusForLogging(volume))
	...
	// If spec.claimRef is not set, the PV has never been used; call
	// updateVolumePhase to set the phase to Available.
	if volume.Spec.ClaimRef == nil {
		klog.V(4).Infof("synchronizing PersistentVolume[%s]: volume is unused", volume.Name)
		if _, err := ctrl.updateVolumePhase(volume, v1.VolumeAvailable, ""); err != nil {
			return err
		}
		return nil
	} else /* pv.Spec.ClaimRef != nil */ {
		// The volume is pre-bound (binding in progress); update the phase to Available.
		if volume.Spec.ClaimRef.UID == "" {
			klog.V(4).Infof("synchronizing PersistentVolume[%s]: volume is pre-bound to claim %s", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef))
			if _, err := ctrl.updateVolumePhase(volume, v1.VolumeAvailable, ""); err != nil {
				return err
			}
			return nil
		}
		klog.V(4).Infof("synchronizing PersistentVolume[%s]: volume is bound to claim %s", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef))
		// Get the PVC by _name_
		var claim *v1.PersistentVolumeClaim
		// Get the PVC from the PV's claimRef.
		claimName := claimrefToClaimKey(volume.Spec.ClaimRef)
		obj, found, err := ctrl.claims.GetByKey(claimName)
		if err != nil {
			return err
		}
		// If not found in the cache, the claim may have been deleted or the
		// cache may be stale; re-fetch the PVC.
		if !found && metav1.HasAnnotation(volume.ObjectMeta, pvutil.AnnBoundByController) {
			if volume.Status.Phase != v1.VolumeReleased && volume.Status.Phase != v1.VolumeFailed {
				obj, err = ctrl.claimLister.PersistentVolumeClaims(volume.Spec.ClaimRef.Namespace).Get(volume.Spec.ClaimRef.Name)
				if err != nil && !apierrors.IsNotFound(err) {
					return err
				}
				found = !apierrors.IsNotFound(err)
				if !found {
					obj, err = ctrl.kubeClient.CoreV1().PersistentVolumeClaims(volume.Spec.ClaimRef.Namespace).Get(context.TODO(), volume.Spec.ClaimRef.Name, metav1.GetOptions{})
					if err != nil && !apierrors.IsNotFound(err) {
						return err
					}
					found = !apierrors.IsNotFound(err)
				}
			}
		}
		if !found {
			klog.V(4).Infof("synchronizing PersistentVolume[%s]: claim %s not found", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef))
		} else {
			var ok bool
			claim, ok = obj.(*v1.PersistentVolumeClaim)
			if !ok {
				return fmt.Errorf("Cannot convert object from volume cache to volume %q!?: %#v", claim.Spec.VolumeName, obj)
			}
			klog.V(4).Infof("synchronizing PersistentVolume[%s]: claim %s found: %s", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef), getClaimStatusForLogging(claim))
		}
		if claim != nil && claim.UID != volume.Spec.ClaimRef.UID {
			klog.V(4).Infof("synchronizing PersistentVolume[%s]: claim %s has different UID, the old one must have been deleted", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef))
			// Treat the volume as bound to a missing claim.
			claim = nil
		}
		// The claim may have been deleted, or the PV may have been released.
		if claim == nil {
			if volume.Status.Phase != v1.VolumeReleased && volume.Status.Phase != v1.VolumeFailed {
				// Also, log this only once:
				klog.V(2).Infof("volume %q is released and reclaim policy %q will be executed", volume.Name, volume.Spec.PersistentVolumeReclaimPolicy)
				if volume, err = ctrl.updateVolumePhase(volume, v1.VolumeReleased, ""); err != nil {
					return err
				}
			}
			// Act according to the persistentVolumeReclaimPolicy: Retain / Delete / Recycle.
			if err = ctrl.reclaimVolume(volume); err != nil {
				return err
			}
			if volume.Spec.PersistentVolumeReclaimPolicy == v1.PersistentVolumeReclaimRetain {
				// volume is being retained, it references a claim that does not exist now.
				klog.V(4).Infof("PersistentVolume[%s] references a claim %q (%s) that is not found", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef), volume.Spec.ClaimRef.UID)
			}
			return nil
		} else if claim.Spec.VolumeName == "" {
			if pvutil.CheckVolumeModeMismatches(&claim.Spec, &volume.Spec) {
				volumeMsg := fmt.Sprintf("Cannot bind PersistentVolume to requested PersistentVolumeClaim %q due to incompatible volumeMode.", claim.Name)
				ctrl.eventRecorder.Event(volume, v1.EventTypeWarning, events.VolumeMismatch, volumeMsg)
				claimMsg := fmt.Sprintf("Cannot bind PersistentVolume %q to requested PersistentVolumeClaim due to incompatible volumeMode.", volume.Name)
				ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.VolumeMismatch, claimMsg)
				// Skipping syncClaim
				return nil
			}

			if metav1.HasAnnotation(volume.ObjectMeta, pvutil.AnnBoundByController) {
				klog.V(4).Infof("synchronizing PersistentVolume[%s]: volume not bound yet, waiting for syncClaim to fix it", volume.Name)
			} else {
				klog.V(4).Infof("synchronizing PersistentVolume[%s]: volume was bound and got unbound (by user?), waiting for syncClaim to fix it", volume.Name)
			}
			ctrl.claimQueue.Add(claimToClaimKey(claim))
			return nil
		} else if claim.Spec.VolumeName == volume.Name {
			// Already bound; update the status phase to Bound.
			// Volume is bound to a claim properly, update status if necessary
			klog.V(4).Infof("synchronizing PersistentVolume[%s]: all is bound", volume.Name)
			if _, err = ctrl.updateVolumePhase(volume, v1.VolumeBound, ""); err != nil {
				// Nothing was saved; we will fall back into the same
				// condition in the next call to this method
				return err
			}
			return nil
		} else {
			// The PV is bound to a PVC, but the PVC is bound to another PV; reset.
			// Volume is bound to a claim, but the claim is bound elsewhere
			if metav1.HasAnnotation(volume.ObjectMeta, pvutil.AnnDynamicallyProvisioned) && volume.Spec.PersistentVolumeReclaimPolicy == v1.PersistentVolumeReclaimDelete {
				if volume.Status.Phase != v1.VolumeReleased && volume.Status.Phase != v1.VolumeFailed {
					klog.V(2).Infof("dynamically volume %q is released and it will be deleted", volume.Name)
					if volume, err = ctrl.updateVolumePhase(volume, v1.VolumeReleased, ""); err != nil {
						return err
					}
				}
				if err = ctrl.reclaimVolume(volume); err != nil {
					return err
				}
				return nil
			} else {
				if metav1.HasAnnotation(volume.ObjectMeta, pvutil.AnnBoundByController) {
					klog.V(4).Infof("synchronizing PersistentVolume[%s]: volume is bound by controller to a claim that is bound to another volume, unbinding", volume.Name)
					if err = ctrl.unbindVolume(volume); err != nil {
						return err
					}
					return nil
				} else {
					klog.V(4).Infof("synchronizing PersistentVolume[%s]: volume is bound by user to a claim that is bound to another volume, waiting for the claim to get unbound", volume.Name)
					if err = ctrl.unbindVolume(volume); err != nil {
						return err
					}
					return nil
				}
			}
		}
	}
}


ctrl.reclaimVolume


(1) Based on the PV object's annotation pv.kubernetes.io/provisioned-by, decide whether the in-tree or out-of-tree path is taken to delete the storage and the PV object.

(2) out-of-tree path: the corresponding plugin deletes the storage and deletes the PV object (for example, when the volume plugin is ceph-csi, the external-provisioner does the deletion of the storage and the PV object).

(3) in-tree path: the storage is deleted in the ctrl.doDeleteVolume method, and the PV object is deleted in the ctrl.deleteVolumeOperation method.


// reclaimVolume implements volume.Spec.PersistentVolumeReclaimPolicy and
// starts appropriate reclaim action.
func (ctrl *PersistentVolumeController) reclaimVolume(volume *v1.PersistentVolume) error {
	if migrated := volume.Annotations[pvutil.AnnMigratedTo]; len(migrated) > 0 {
		// PV is Migrated. The PV controller should stand down and the external
		// provisioner will handle this PV
		return nil
	}
	switch volume.Spec.PersistentVolumeReclaimPolicy {
	case v1.PersistentVolumeReclaimRetain:
		klog.V(4).Infof("reclaimVolume[%s]: policy is Retain, nothing to do", volume.Name)

	case v1.PersistentVolumeReclaimRecycle:
		klog.V(4).Infof("reclaimVolume[%s]: policy is Recycle", volume.Name)
		opName := fmt.Sprintf("recycle-%s[%s]", volume.Name, string(volume.UID))
		ctrl.scheduleOperation(opName, func() error {
			ctrl.recycleVolumeOperation(volume)
			return nil
		})

	case v1.PersistentVolumeReclaimDelete:
		klog.V(4).Infof("reclaimVolume[%s]: policy is Delete", volume.Name)
		opName := fmt.Sprintf("delete-%s[%s]", volume.Name, string(volume.UID))
		// create a start timestamp entry in cache for deletion operation if no one exists with
		// key = volume.Name, pluginName = provisionerName, operation = "delete"
		ctrl.operationTimestamps.AddIfNotExist(volume.Name, ctrl.getProvisionerNameFromVolume(volume), "delete")
		ctrl.scheduleOperation(opName, func() error {
			_, err := ctrl.deleteVolumeOperation(volume)
			if err != nil {
				// only report error count to "volume_operation_total_errors"
				// latency reporting will happen when the volume get finally
				// deleted and a volume deleted event is captured
				metrics.RecordMetric(volume.Name, &ctrl.operationTimestamps, err)
			}
			return err
		})

	default:
		// Unknown PersistentVolumeReclaimPolicy
		if _, err := ctrl.updateVolumePhaseWithEvent(volume, v1.VolumeFailed, v1.EventTypeWarning, "VolumeUnknownReclaimPolicy", "Volume has unrecognized PersistentVolumeReclaimPolicy"); err != nil {
			return err
		}
	}
	return nil
}

// deleteVolumeOperation deletes a volume. This method is running in standalone
// goroutine and already has all necessary locks.
func (ctrl *PersistentVolumeController) deleteVolumeOperation(volume *v1.PersistentVolume) (string, error) {
	klog.V(4).Infof("deleteVolumeOperation [%s] started", volume.Name)

	// This method may have been waiting for a volume lock for some time.
	// Previous deleteVolumeOperation might just have saved an updated version, so
	// read current volume state now.
	newVolume, err := ctrl.kubeClient.CoreV1().PersistentVolumes().Get(volume.Name, metav1.GetOptions{})
	if err != nil {
		klog.V(3).Infof("error reading persistent volume %q: %v", volume.Name, err)
		return "", nil
	}
	needsReclaim, err := ctrl.isVolumeReleased(newVolume)
	if err != nil {
		klog.V(3).Infof("error reading claim for volume %q: %v", volume.Name, err)
		return "", nil
	}
	if !needsReclaim {
		klog.V(3).Infof("volume %q no longer needs deletion, skipping", volume.Name)
		return "", nil
	}

	pluginName, deleted, err := ctrl.doDeleteVolume(volume)
	if err != nil {
		// Delete failed, update the volume and emit an event.
		klog.V(3).Infof("deletion of volume %q failed: %v", volume.Name, err)
		if volerr.IsDeletedVolumeInUse(err) {
			// The plugin needs more time, don't mark the volume as Failed
			// and send Normal event only
			ctrl.eventRecorder.Event(volume, v1.EventTypeNormal, events.VolumeDelete, err.Error())
		} else {
			// The plugin failed, mark the volume as Failed and send Warning
			// event
			if _, err := ctrl.updateVolumePhaseWithEvent(volume, v1.VolumeFailed, v1.EventTypeWarning, events.VolumeFailedDelete, err.Error()); err != nil {
				klog.V(4).Infof("deleteVolumeOperation [%s]: failed to mark volume as failed: %v", volume.Name, err)
				// Save failed, retry on the next deletion attempt
				return pluginName, err
			}
		}

		// Despite the volume being Failed, the controller will retry deleting
		// the volume in every syncVolume() call.
		return pluginName, err
	}
	if !deleted {
		// The volume waits for deletion by an external plugin. Do nothing.
		return pluginName, nil
	}

	klog.V(4).Infof("deleteVolumeOperation [%s]: success", volume.Name)
	// Delete the volume
	if err = ctrl.kubeClient.CoreV1().PersistentVolumes().Delete(volume.Name, nil); err != nil {
		// Oops, could not delete the volume and therefore the controller will
		// try to delete the volume again on next update. We _could_ maintain a
		// cache of "recently deleted volumes" and avoid unnecessary deletion,
		// this is left out as future optimization.
		klog.V(3).Infof("failed to delete volume %q from database: %v", volume.Name, err)
		return pluginName, nil
	}
	return pluginName, nil
}

3 ctrl.claimWorker

claimWorker mainly maintains PVC state, updating the PVC object's status according to different conditions; its main job is to find a suitable PV for the PVC and perform the bind operation. When no suitable PV can be found, it calls the volume plugin to create the underlying storage and create the PV object (this covers both the in-tree and out-of-tree paths).

claimWorker continuously consumes items from the claimQueue, fetches the corresponding PVC, and runs the updateClaim method on it for further processing.


//kubernetes/pkg/controller/volume/persistentvolume/pv_controller_base.go
func (ctrl *PersistentVolumeController) claimWorker() {
	workFunc := func() bool {
		keyObj, quit := ctrl.claimQueue.Get()
		if quit {
			return true
		}
		defer ctrl.claimQueue.Done(keyObj)
		key := keyObj.(string)
		klog.V(5).Infof("claimWorker[%s]", key)

		namespace, name, err := cache.SplitMetaNamespaceKey(key)
		if err != nil {
			klog.V(4).Infof("error getting namespace & name of claim %q to get claim from informer: %v", key, err)
			return false
		}
		claim, err := ctrl.claimLister.PersistentVolumeClaims(namespace).Get(name)
		if err == nil {
			// The claim still exists in informer cache, the event must have
			// been add/update/sync
			ctrl.updateClaim(claim)
			return false
		}
		if !errors.IsNotFound(err) {
			klog.V(2).Infof("error getting claim %q from informer: %v", key, err)
			return false
		}

		// The claim is not in informer cache, the event must have been "delete"
		claimObj, found, err := ctrl.claims.GetByKey(key)
		if err != nil {
			klog.V(2).Infof("error getting claim %q from cache: %v", key, err)
			return false
		}
		if !found {
			// The controller has already processed the delete event and
			// deleted the claim from its cache
			klog.V(2).Infof("deletion of claim %q was already processed", key)
			return false
		}
		claim, ok := claimObj.(*v1.PersistentVolumeClaim)
		if !ok {
			klog.Errorf("expected claim, got %+v", claimObj)
			return false
		}
		ctrl.deleteClaim(claim)
		return false
	}
	for {
		if quit := workFunc(); quit {
			klog.Infof("claim worker queue shutting down")
			return
		}
	}
}

3.1 ctrl.updateClaim

It mainly calls ctrl.syncClaim for further processing.


//kubernetes/pkg/controller/volume/persistentvolume/pv_controller_base.go
// updateClaim runs in worker thread and handles "claim added",
// "claim updated" and "periodic sync" events.
func (ctrl *PersistentVolumeController) updateClaim(claim *v1.PersistentVolumeClaim) {
	// Store the new claim version in the cache and do not process it if this is
	// an old version.
	new, err := ctrl.storeClaimUpdate(claim)
	if err != nil {
		klog.Errorf("%v", err)
	}
	if !new {
		return
	}
	err = ctrl.syncClaim(claim)
	if err != nil {
		if errors.IsConflict(err) {
			// Version conflict error happens quite often and the controller
			// recovers from it easily.
			klog.V(3).Infof("could not sync claim %q: %+v", claimToClaimKey(claim), err)
		} else {
			klog.Errorf("could not sync volume %q: %+v", claimToClaimKey(claim), err)
		}
	}
}


3.1.1 ctrl.syncClaim


Main logic:

(1) Check the PVC for the annotation "pv.kubernetes.io/bind-completed"; if it is present, the PVC has already completed binding;

(2) If the annotation is absent, call ctrl.syncUnboundClaim to find a matching PV for the unbound PVC and perform the binding;

(3) If the annotation is present, call ctrl.syncBoundClaim to check whether any repair logic is needed.


//kubernetes/pkg/controller/volume/persistentvolume/pv_controller.go
func (ctrl *PersistentVolumeController) syncClaim(claim *v1.PersistentVolumeClaim) error {
	klog.V(4).Infof("synchronizing PersistentVolumeClaim[%s]: %s", claimToClaimKey(claim), getClaimStatusForLogging(claim))

	// Set correct "migrated-to" annotations on PVC and update in API server if
	// necessary
	newClaim, err := ctrl.updateClaimMigrationAnnotations(claim)
	if err != nil {
		// Nothing was saved; we will fall back into the same
		// condition in the next call to this method
		return err
	}
	claim = newClaim

	if !metav1.HasAnnotation(claim.ObjectMeta, pvutil.AnnBindCompleted) {
		return ctrl.syncUnboundClaim(claim)
	} else {
		return ctrl.syncBoundClaim(claim)
	}
}


Main logic of ctrl.syncUnboundClaim:

(1) For the unbound PVC, find a matching PV and call bind to bind them;

(2) When no suitable PV is found, call ctrl.provisionClaim for further processing.


Main logic of ctrl.provisionClaim:

(1) Get the StorageClass object corresponding to the PVC and, based on its volume plugin configuration, decide whether the in-tree or out-of-tree path is taken to create the storage and the PV object.

(2) out-of-tree path: call ctrl.provisionClaimOperationExternal to set the annotation volume.beta.kubernetes.io/storage-provisioner:{plugin name} on the PVC object, letting the corresponding plugin create the storage and create the PV object (for example, when the volume plugin is ceph-csi, the external-provisioner does the creation of the storage and the PV object).

(3) in-tree path: call ctrl.provisionClaimOperation to create the storage and create the PV object.


//kubernetes/pkg/controller/volume/persistentvolume/pv_controller.go
func (ctrl *PersistentVolumeController) syncUnboundClaim(claim *v1.PersistentVolumeClaim) error {
	// This is a new PVC that has not completed binding
	// OBSERVATION: pvc is "Pending"
	if claim.Spec.VolumeName == "" {
		// User did not care which PV they get.
		delayBinding, err := pvutil.IsDelayBindingMode(claim, ctrl.classLister)
		if err != nil {
			return err
		}

		// [Unit test set 1]
		volume, err := ctrl.volumes.findBestMatchForClaim(claim, delayBinding)
		if err != nil {
			klog.V(2).Infof("synchronizing unbound PersistentVolumeClaim[%s]: Error finding PV for claim: %v", claimToClaimKey(claim), err)
			return fmt.Errorf("Error finding PV for claim %q: %v", claimToClaimKey(claim), err)
		}
		if volume == nil {
			klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: no volume found", claimToClaimKey(claim))
			// No PV could be found
			// OBSERVATION: pvc is "Pending", will retry
			switch {
			case delayBinding && !pvutil.IsDelayBindingProvisioning(claim):
				ctrl.eventRecorder.Event(claim, v1.EventTypeNormal, events.WaitForFirstConsumer, "waiting for first consumer to be created before binding")
			case v1helper.GetPersistentVolumeClaimClass(claim) != "":
				if err = ctrl.provisionClaim(claim); err != nil {
					return err
				}
				return nil
			default:
				ctrl.eventRecorder.Event(claim, v1.EventTypeNormal, events.FailedBinding, "no persistent volumes available for this claim and no storage class is set")
			}

			// Mark the claim as Pending and try to find a match in the next
			// periodic syncClaim
			if _, err = ctrl.updateClaimStatus(claim, v1.ClaimPending, nil); err != nil {
				return err
			}
			return nil
		} else /* pv != nil */ {
			// Found a PV for this claim
			// OBSERVATION: pvc is "Pending", pv is "Available"
			claimKey := claimToClaimKey(claim)
			klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume %q found: %s", claimKey, volume.Name, getVolumeStatusForLogging(volume))
			if err = ctrl.bind(volume, claim); err != nil {
				// On any error saving the volume or the claim, subsequent
				// syncClaim will finish the binding.
				// record count error for provision if exists
				// timestamp entry will remain in cache until a success binding has happened
				metrics.RecordMetric(claimKey, &ctrl.operationTimestamps, err)
				return err
			}
			// OBSERVATION: claim is "Bound", pv is "Bound"
			// if exists a timestamp entry in cache, record end to end provision latency and clean up cache
			// End of the provision + binding operation lifecycle, cache will be cleaned by "RecordMetric"
			// [Unit test 12-1, 12-2, 12-4]
			metrics.RecordMetric(claimKey, &ctrl.operationTimestamps, nil)
			return nil
		}
	} else /* pvc.Spec.VolumeName != nil */ {
		// [Unit test set 2]
		// User asked for a specific PV.
		klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume %q requested", claimToClaimKey(claim), claim.Spec.VolumeName)
		obj, found, err := ctrl.volumes.store.GetByKey(claim.Spec.VolumeName)
		if err != nil {
			return err
		}
		if !found {
			// User asked for a PV that does not exist.
			// OBSERVATION: pvc is "Pending"
			// Retry later.
			klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume %q requested and not found, will try again next time", claimToClaimKey(claim), claim.Spec.VolumeName)
			if _, err = ctrl.updateClaimStatus(claim, v1.ClaimPending, nil); err != nil {
				return err
			}
			return nil
		} else {
			volume, ok := obj.(*v1.PersistentVolume)
			if !ok {
				return fmt.Errorf("Cannot convert object from volume cache to volume %q!?: %+v", claim.Spec.VolumeName, obj)
			}
			klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume %q requested and found: %s", claimToClaimKey(claim), claim.Spec.VolumeName, getVolumeStatusForLogging(volume))
			if volume.Spec.ClaimRef == nil {
				// User asked for a PV that is not claimed
				// OBSERVATION: pvc is "Pending", pv is "Available"
				klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume is unbound, binding", claimToClaimKey(claim))
				if err = checkVolumeSatisfyClaim(volume, claim); err != nil {
					klog.V(4).Infof("Can't bind the claim to volume %q: %v", volume.Name, err)
					// send an event
					msg := fmt.Sprintf("Cannot bind to requested volume %q: %s", volume.Name, err)
					ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.VolumeMismatch, msg)
					// volume does not satisfy the requirements of the claim
					if _, err = ctrl.updateClaimStatus(claim, v1.ClaimPending, nil); err != nil {
						return err
					}
				} else if err = ctrl.bind(volume, claim); err != nil {
					// On any error saving the volume or the claim, subsequent
					// syncClaim will finish the binding.
					return err
				}
				// OBSERVATION: pvc is "Bound", pv is "Bound"
				return nil
			} else if pvutil.IsVolumeBoundToClaim(volume, claim) {
				// User asked for a PV that is claimed by this PVC
				// OBSERVATION: pvc is "Pending", pv is "Bound"
				klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume already bound, finishing the binding", claimToClaimKey(claim))

				// Finish the volume binding by adding claim UID.
				if err = ctrl.bind(volume, claim); err != nil {
					return err
				}
				// OBSERVATION: pvc is "Bound", pv is "Bound"
				return nil
			} else {
				// User asked for a PV that is claimed by someone else
				// OBSERVATION: pvc is "Pending", pv is "Bound"
				if !metav1.HasAnnotation(claim.ObjectMeta, pvutil.AnnBoundByController) {
					klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume already bound to different claim by user, will retry later", claimToClaimKey(claim))
					// User asked for a specific PV, retry later
					if _, err = ctrl.updateClaimStatus(claim, v1.ClaimPending, nil); err != nil {
						return err
					}
					return nil
				} else {
					// This should never happen because someone had to remove
					// AnnBindCompleted annotation on the claim.
					klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume already bound to different claim %q by controller, THIS SHOULD NEVER HAPPEN", claimToClaimKey(claim), claimrefToClaimKey(volume.Spec.ClaimRef))
					return fmt.Errorf("Invalid binding of claim %q to volume %q: volume already claimed by %q", claimToClaimKey(claim), claim.Spec.VolumeName, claimrefToClaimKey(volume.Spec.ClaimRef))
				}
			}
		}
	}
}

// provisionClaim starts new asynchronous operation to provision a claim if
// provisioning is enabled.
func (ctrl *PersistentVolumeController) provisionClaim(claim *v1.PersistentVolumeClaim) error {
	if !ctrl.enableDynamicProvisioning {
		return nil
	}
	klog.V(4).Infof("provisionClaim[%s]: started", claimToClaimKey(claim))
	opName := fmt.Sprintf("provision-%s[%s]", claimToClaimKey(claim), string(claim.UID))
	plugin, storageClass, err := ctrl.findProvisionablePlugin(claim)
	// findProvisionablePlugin does not return err for external provisioners
	if err != nil {
		ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.ProvisioningFailed, err.Error())
		klog.Errorf("error finding provisioning plugin for claim %s: %v", claimToClaimKey(claim), err)
		// failed to find the requested provisioning plugin, directly return err for now.
		// controller will retry the provisioning in every syncUnboundClaim() call
		// retain the original behavior of returning nil from provisionClaim call
		return nil
	}
	ctrl.scheduleOperation(opName, func() error {
		// create a start timestamp entry in cache for provision operation if no one exists with
		// key = claimKey, pluginName = provisionerName, operation = "provision"
		claimKey := claimToClaimKey(claim)
		ctrl.operationTimestamps.AddIfNotExist(claimKey, ctrl.getProvisionerName(plugin, storageClass), "provision")
		var err error
		if plugin == nil {
			_, err = ctrl.provisionClaimOperationExternal(claim, storageClass)
		} else {
			_, err = ctrl.provisionClaimOperation(claim, plugin, storageClass)
		}
		// if error happened, record an error count metric
		// timestamp entry will remain in cache until a success binding has happened
		if err != nil {
			metrics.RecordMetric(claimKey, &ctrl.operationTimestamps, err)
		}
		return err
	})
	return nil
}


This completes the analysis of the PV controller; a brief summary follows.

Summary

PV Controller, short for PersistentVolume controller, is mainly responsible for binding PV and PVC objects and for managing their lifecycle (creating/deleting the underlying storage, creating/deleting PV objects, and updating the state of PV and PVC objects).

Creating storage and PV objects

When a PVC object is created, the PV controller looks for a suitable PV to bind it to; if no suitable PV can be found, the corresponding volume plugin dynamically provisions a PV according to the parameters of the StorageClass object.

Based on the volume plugin configured in the PVC's StorageClass object, either the in-tree or the out-of-tree path is taken to create the storage and the PV object:

(1) out-of-tree path: the PV controller sets the annotation volume.beta.kubernetes.io/storage-provisioner:{plugin name} on the PVC object, letting the corresponding plugin create the storage and create the PV object (for example, when the volume plugin is ceph-csi, the external-provisioner does the creation of the storage and the PV object); a short sketch follows this list.

(2) in-tree path: the built-in volume plugin is called to create the storage and create the PV object.
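
A minimal sketch of what this handoff looks like from the PVC's point of view. The annotation key is the beta key mentioned above; the "rbd.csi.ceph.com" provisioner name and the helper name are examples made up for illustration.

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
)

// Annotation the PV controller sets on a PVC to hand provisioning off to an
// out-of-tree provisioner (beta key used around v1.17).
const annStorageProvisioner = "volume.beta.kubernetes.io/storage-provisioner"

// externalProvisionerFor returns the provisioner name if the PVC has been
// handed off to an out-of-tree provisioner, or "" for the in-tree path.
func externalProvisionerFor(claim *corev1.PersistentVolumeClaim) string {
	return claim.Annotations[annStorageProvisioner]
}

func main() {
	claim := &corev1.PersistentVolumeClaim{}
	claim.Annotations = map[string]string{
		annStorageProvisioner: "rbd.csi.ceph.com", // example: ceph-csi RBD driver
	}
	fmt.Println(externalProvisionerFor(claim)) // prints: rbd.csi.ceph.com
}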

Deleting storage and PV objects

When the PVC bound to a PV is deleted: if the PV's reclaim policy is Retain, the PV's state changes to Released; if the reclaim policy is Delete, the volume plugin is called to delete the underlying storage and the PV object.

Based on the PV object's annotation pv.kubernetes.io/provisioned-by, either the in-tree or the out-of-tree path is taken to delete the storage and the PV object:

(1) out-of-tree path: the corresponding plugin deletes the storage and deletes the PV object (for example, when the volume plugin is ceph-csi, the external-provisioner does the deletion of the storage and the PV object); a short sketch follows this list.

(2) in-tree path: the PV controller calls the built-in volume plugin to delete the storage and delete the PV object.
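
Correspondingly, on the deletion side the provisioner can be read off the PV object itself. A minimal sketch using the annotation key quoted above; the helper name and the driver name are made up for illustration.

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
)

// provisionedBy is an illustrative helper (not from the Kubernetes source):
// it reads the pv.kubernetes.io/provisioned-by annotation, which records
// which provisioner created the PV.
func provisionedBy(pv *corev1.PersistentVolume) string {
	return pv.Annotations["pv.kubernetes.io/provisioned-by"]
}

func main() {
	pv := &corev1.PersistentVolume{}
	pv.Annotations = map[string]string{
		"pv.kubernetes.io/provisioned-by": "rbd.csi.ceph.com", // example CSI driver name
	}
	fmt.Println(provisionedBy(pv)) // prints: rbd.csi.ceph.com
}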
