
external-provisioner source code analysis (1): main processing logic

良凯尔 · Published on April 17, 2021

kubernetes ceph-csi analysis - table of contents: https://xie.infoq.cn/article/4b1d3e32f124307a49cd9c1e3

Overview

This article starts the source code analysis of the external-provisioner component.


In the external-provisioner component, rbd and cephfs share the same processing logic: the same code path applies to both rbd storage and cephfs storage.


The source code analysis of external-provisioner is split into three parts:

(1) main processing logic analysis;

(2) main function and leader election analysis;

(3) component startup parameters analysis.


This analysis is based on tag v1.6.0:


https://github.com/kubernetes-csi/external-provisioner/releases/tag/v1.6.0

What external-provisioner does

(1) When a PVC is created, external-provisioner takes part in creating the storage resource and the PV object. After it observes the PVC creation event, it assembles the request and calls the CreateVolume method of the ceph-csi component to create the storage; once the storage is created successfully, it creates the PV object.


(2) When a PVC is deleted, external-provisioner takes part in deleting the storage resource and the PV object. When the PVC is deleted, the PV controller changes the status of the bound PV object from Bound to Released; after external-provisioner observes the PV update event, it calls the DeleteVolume method of ceph-csi to delete the storage and then deletes the PV object.
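To make the picture above concrete, here is a minimal, hedged sketch of what a CreateVolume gRPC call to a CSI driver looks like. The socket path, volume name, and parameters are illustrative assumptions, not values taken from the external-provisioner source; the real component issues this call through its csiClient, as shown later in this article.

// Minimal sketch of how a sidecar like external-provisioner invokes the CSI
// CreateVolume RPC over gRPC. Socket path and sizes are assumptions.
package main

import (
  "context"
  "log"
  "time"

  "github.com/container-storage-interface/spec/lib/go/csi"
  "google.golang.org/grpc"
)

func main() {
  // Connect to the CSI driver's controller service (path is hypothetical).
  conn, err := grpc.Dial("unix:///csi/csi.sock", grpc.WithInsecure())
  if err != nil {
    log.Fatalf("failed to connect to CSI driver: %v", err)
  }
  defer conn.Close()

  client := csi.NewControllerClient(conn)
  ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
  defer cancel()

  // Ask the driver to create a 1Gi volume named after the future PV.
  resp, err := client.CreateVolume(ctx, &csi.CreateVolumeRequest{
    Name:          "pvc-1234", // hypothetical PV name
    CapacityRange: &csi.CapacityRange{RequiredBytes: 1 << 30},
    Parameters:    map[string]string{"pool": "rbd"}, // from StorageClass.Parameters
  })
  if err != nil {
    log.Fatalf("CreateVolume failed: %v", err)
  }
  log.Printf("created volume %s", resp.GetVolume().GetVolumeId())
}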

external-provisioner source code analysis (1): main processing logic

In the external-provisioner component, the main business logic lives in provisionController, so the analysis of external-provisioner starts from provisionController.


provisionController is responsible for processing the claimQueue (i.e. handling PVC add and update events), calling the CreateVolume method of the ceph-csi component to create storage when needed and then creating the PV object; and for processing the volumeQueue (i.e. handling PV add and update events), deciding based on the PV's status and reclaim policy whether to call the DeleteVolume method of ceph-csi to delete the storage and then delete the PV object.


The claimQueue and the volumeQueue are analyzed in the following sections.
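Both queues are standard client-go rate-limited workqueues. As background only (this is not code from external-provisioner itself), here is a minimal sketch of the Get/Done/Forget/AddRateLimited pattern the controller relies on:

// Background sketch of the client-go workqueue pattern used by the controller.
// Illustrative only; the key and process function are hypothetical.
package main

import (
  "fmt"

  "k8s.io/client-go/util/workqueue"
)

func process(key string) error { return nil }

func main() {
  queue := workqueue.NewNamedRateLimitingQueue(
    workqueue.DefaultControllerRateLimiter(), "claims")

  queue.Add("default/my-pvc") // informer event handlers enqueue keys

  key, shutdown := queue.Get()
  if shutdown {
    return
  }
  defer queue.Done(key) // always mark the item as done

  if err := process(key.(string)); err != nil {
    // Failed: requeue with backoff computed by the rate limiter.
    queue.AddRateLimited(key)
    fmt.Printf("requeued %v, failures so far: %d\n", key, queue.NumRequeues(key))
    return
  }
  // Succeeded: clear the item's rate-limiter history.
  queue.Forget(key)
}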


The main function calls provisionController.Run(wait.NeverStop), which serves as the entry point for analyzing provisionController.
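For orientation before diving into Run, here is a hedged sketch of how a provisioner is typically wired into the library's ProvisionController, assuming the sig-storage-lib-external-provisioner v5 controller package vendored by this tag. The exampleProvisioner type and the driver name are hypothetical; the real main() of external-provisioner is covered in part (2) of this series.

// Hedged sketch: wiring a Provisioner implementation into ProvisionController.
package main

import (
  "errors"

  v1 "k8s.io/api/core/v1"
  "k8s.io/apimachinery/pkg/util/wait"
  "k8s.io/client-go/kubernetes"
  "k8s.io/client-go/rest"
  "sigs.k8s.io/sig-storage-lib-external-provisioner/v5/controller"
)

// exampleProvisioner stands in for csiProvisioner and only sketches the interface.
type exampleProvisioner struct{}

func (p *exampleProvisioner) Provision(options controller.ProvisionOptions) (*v1.PersistentVolume, error) {
  return nil, errors.New("not implemented: would call CSI CreateVolume and build a PV here")
}

func (p *exampleProvisioner) Delete(volume *v1.PersistentVolume) error {
  return errors.New("not implemented: would call CSI DeleteVolume here")
}

func main() {
  config, err := rest.InClusterConfig()
  if err != nil {
    panic(err)
  }
  client := kubernetes.NewForConfigOrDie(config)

  // The controller watches PVCs/PVs and drives the claimQueue/volumeQueue
  // described below; "example.csi.driver" is a hypothetical driver name.
  pc := controller.NewProvisionController(client, "example.csi.driver", &exampleProvisioner{}, "v1.18.0")
  pc.Run(wait.NeverStop)
}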

provisionController.Run()

provisionController.Run() defines a run function and executes it. The parts to focus on are ctrl.runClaimWorker and ctrl.runVolumeWorker inside run; these two methods carry the main processing logic.


// Run starts all of this controller's control loops
func (ctrl *ProvisionController) Run(_ <-chan struct{}) {
  // TODO: arg is as of 1.12 unused. Nothing can ever be cancelled. Should
  // accept a context instead and use it instead of context.TODO(), but would
  // break API. Not urgent: realistically, users are simply passing in
  // wait.NeverStop() anyway.
  run := func(ctx context.Context) {
    glog.Infof("Starting provisioner controller %s!", ctrl.component)
    defer utilruntime.HandleCrash()
    defer ctrl.claimQueue.ShutDown()
    defer ctrl.volumeQueue.ShutDown()

    ctrl.hasRunLock.Lock()
    ctrl.hasRun = true
    ctrl.hasRunLock.Unlock()
    if ctrl.metricsPort > 0 {
      prometheus.MustRegister([]prometheus.Collector{
        metrics.PersistentVolumeClaimProvisionTotal,
        metrics.PersistentVolumeClaimProvisionFailedTotal,
        metrics.PersistentVolumeClaimProvisionDurationSeconds,
        metrics.PersistentVolumeDeleteTotal,
        metrics.PersistentVolumeDeleteFailedTotal,
        metrics.PersistentVolumeDeleteDurationSeconds,
      }...)
      http.Handle(ctrl.metricsPath, promhttp.Handler())
      address := net.JoinHostPort(ctrl.metricsAddress, strconv.FormatInt(int64(ctrl.metricsPort), 10))
      glog.Infof("Starting metrics server at %s\n", address)
      go wait.Forever(func() {
        err := http.ListenAndServe(address, nil)
        if err != nil {
          glog.Errorf("Failed to listen on %s: %v", address, err)
        }
      }, 5*time.Second)
    }

    // If a external SharedInformer has been passed in, this controller
    // should not call Run again
    if !ctrl.customClaimInformer {
      go ctrl.claimInformer.Run(ctx.Done())
    }
    if !ctrl.customVolumeInformer {
      go ctrl.volumeInformer.Run(ctx.Done())
    }
    if !ctrl.customClassInformer {
      go ctrl.classInformer.Run(ctx.Done())
    }

    if !cache.WaitForCacheSync(ctx.Done(), ctrl.claimInformer.HasSynced, ctrl.volumeInformer.HasSynced, ctrl.classInformer.HasSynced) {
      return
    }

    // start multiple goroutines for each of the two workers
    for i := 0; i < ctrl.threadiness; i++ {
      go wait.Until(ctrl.runClaimWorker, time.Second, context.TODO().Done())
      go wait.Until(ctrl.runVolumeWorker, time.Second, context.TODO().Done())
    }

    glog.Infof("Started provisioner controller %s!", ctrl.component)

    select {}
  }

  go ctrl.volumeStore.Run(context.TODO(), DefaultThreadiness)

  // leader election
  if ctrl.leaderElection {
    rl, err := resourcelock.New("endpoints",
      ctrl.leaderElectionNamespace,
      strings.Replace(ctrl.provisionerName, "/", "-", -1),
      ctrl.client.CoreV1(),
      nil,
      resourcelock.ResourceLockConfig{
        Identity:      ctrl.id,
        EventRecorder: ctrl.eventRecorder,
      })
    if err != nil {
      glog.Fatalf("Error creating lock: %v", err)
    }

    leaderelection.RunOrDie(context.TODO(), leaderelection.LeaderElectionConfig{
      Lock:          rl,
      LeaseDuration: ctrl.leaseDuration,
      RenewDeadline: ctrl.renewDeadline,
      RetryPeriod:   ctrl.retryPeriod,
      Callbacks: leaderelection.LeaderCallbacks{
        OnStartedLeading: run,
        OnStoppedLeading: func() {
          glog.Fatalf("leaderelection lost")
        },
      },
    })
    panic("unreachable")
  } else {
    run(context.TODO())
  }
}


Next, ctrl.runClaimWorker and ctrl.runVolumeWorker in the run function are analyzed in turn.

1. ctrl.runClaimWorker

Based on the threadiness setting, the corresponding number of goroutines are started to run ctrl.runClaimWorker.


It is responsible for processing the claimQueue, i.e. handling PVC add and update events, calling the CreateVolume method of the CSI component to create storage when needed, and then creating the PV object.


    for i := 0; i < ctrl.threadiness; i++ {
      go wait.Until(ctrl.runClaimWorker, time.Second, context.TODO().Done())
      go wait.Until(ctrl.runVolumeWorker, time.Second, context.TODO().Done())
    }


// vendor/sigs.k8s.io/sig-storage-lib-external-provisioner/v5/controller/controller.go
func (ctrl *ProvisionController) runClaimWorker() {
  // loop forever calling processNextClaimWorkItem
  for ctrl.processNextClaimWorkItem() {
  }
}


Call chain: main() --> provisionController.Run() --> ctrl.runClaimWorker() --> ctrl.processNextClaimWorkItem() --> ctrl.syncClaimHandler() --> ctrl.syncClaim() --> ctrl.provisionClaimOperation() --> ctrl.provisioner.Provision()

1.1 ctrl.processNextClaimWorkItem

Main logic:

(1) get a PVC key from the claimQueue;

(2) call ctrl.syncClaimHandler for further processing;

(3) on success, clear the PVC's entry in the rateLimiter and remove the PVC from claimsInProgress;

(4) on failure, retry a limited number of times by re-adding the PVC to the queue through the rateLimiter;

(5) finally, whether ctrl.syncClaimHandler succeeds or not, mark the PVC as done so it is removed from the claimQueue.


// Map UID -> *PVC with all claims that may be provisioned in the background.
claimsInProgress sync.Map

// processNextClaimWorkItem processes items from claimQueue
func (ctrl *ProvisionController) processNextClaimWorkItem() bool {
  // get a PVC key from the claimQueue
  obj, shutdown := ctrl.claimQueue.Get()

  if shutdown {
    return false
  }

  err := func(obj interface{}) error {
    // finally, whether ctrl.syncClaimHandler succeeds or not, remove the PVC from the claimQueue
    defer ctrl.claimQueue.Done(obj)
    var key string
    var ok bool
    if key, ok = obj.(string); !ok {
      ctrl.claimQueue.Forget(obj)
      return fmt.Errorf("expected string in workqueue but got %#v", obj)
    }
    // call ctrl.syncClaimHandler for further processing
    if err := ctrl.syncClaimHandler(key); err != nil {
      // on failure, retry a limited number of times by re-adding the PVC through the rateLimiter
      if ctrl.failedProvisionThreshold == 0 {
        glog.Warningf("Retrying syncing claim %q, failure %v", key, ctrl.claimQueue.NumRequeues(obj))
        ctrl.claimQueue.AddRateLimited(obj)
      } else if ctrl.claimQueue.NumRequeues(obj) < ctrl.failedProvisionThreshold {
        glog.Warningf("Retrying syncing claim %q because failures %v < threshold %v", key, ctrl.claimQueue.NumRequeues(obj), ctrl.failedProvisionThreshold)
        ctrl.claimQueue.AddRateLimited(obj)
      } else {
        glog.Errorf("Giving up syncing claim %q because failures %v >= threshold %v", key, ctrl.claimQueue.NumRequeues(obj), ctrl.failedProvisionThreshold)
        glog.V(2).Infof("Removing PVC %s from claims in progress", key)
        ctrl.claimsInProgress.Delete(key) // This can leak a volume that's being provisioned in the background!
        // Done but do not Forget: it will not be in the queue but NumRequeues
        // will be saved until the obj is deleted from kubernetes
      }
      return fmt.Errorf("error syncing claim %q: %s", key, err.Error())
    }
    // on success, clear the PVC's entry in the rateLimiter and remove the PVC from claimsInProgress
    ctrl.claimQueue.Forget(obj)
    // Silently remove the PVC from list of volumes in progress. The provisioning either succeeded
    // or the PVC was ignored by this provisioner.
    ctrl.claimsInProgress.Delete(key)
    return nil
  }(obj)

  if err != nil {
    utilruntime.HandleError(err)
    return true
  }

  return true
}


Before going further, here is a quick look at the relevant claimQueue methods:


(1) Done: marks the item as done processing and removes it from the queue's processing set


// Done marks item as done processing, and if it has been marked as dirty again
// while it was being processed, it will be re-added to the queue for
// re-processing.
func (q *Type) Done(item interface{}) {
  q.cond.L.Lock()
  defer q.cond.L.Unlock()

  q.metrics.done(item)

  q.processing.delete(item)
  if q.dirty.has(item) {
    q.queue = append(q.queue, item)
    q.cond.Signal()
  }
}


(2) Forget: only clears the item's rateLimiter history


func (q *rateLimitingType) Forget(item interface{}) {
  q.rateLimiter.Forget(item)
}


(3) AddRateLimited: re-adds the item to the claimQueue after the delay computed by the rate limiter


// AddRateLimited AddAfter's the item based on the time when the rate limiter says it's ok
func (q *rateLimitingType) AddRateLimited(item interface{}) {
  q.DelayingInterface.AddAfter(item, q.rateLimiter.When(item))
}

// AddAfter adds the given item to the work queue after the given delay
func (q *delayingType) AddAfter(item interface{}, duration time.Duration) {
  // don't add if we're already shutting down
  if q.ShuttingDown() {
    return
  }

  q.metrics.retry()

  // immediately add things with no delay
  if duration <= 0 {
    q.Add(item)
    return
  }

  select {
  case <-q.stopCh:
    // unblock if ShutDown() is called
  case q.waitingForAddCh <- &waitFor{data: item, readyAt: q.clock.Now().Add(duration)}:
  }
}
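For intuition about the retry delays, the small example below shows how a workqueue rate limiter computes increasing delays for repeated failures of the same key. It is illustrative only: the base and maximum delays are assumptions for this sketch, not values taken from external-provisioner's configuration.

// Illustrative only: how an exponential-failure rate limiter backs off.
// The 5ms base delay and 1000s cap are assumptions for this sketch.
package main

import (
  "fmt"
  "time"

  "k8s.io/client-go/util/workqueue"
)

func main() {
  rl := workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second)

  // Each consecutive failure of the same key doubles the delay returned by When.
  for i := 0; i < 5; i++ {
    fmt.Printf("failure %d -> requeue after %v\n", rl.NumRequeues("default/my-pvc"), rl.When("default/my-pvc"))
  }

  // Forget resets the failure count, so the next delay starts from the base again.
  rl.Forget("default/my-pvc")
  fmt.Printf("after Forget -> requeue after %v\n", rl.When("default/my-pvc"))
}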

1.1.1 ctrl.syncClaimHandler

Its main job is to call ctrl.syncClaim.


// syncClaimHandler gets the claim from informer's cache then calls syncClaim. A non-nil error triggers requeuing of the claim.
func (ctrl *ProvisionController) syncClaimHandler(key string) error {
  objs, err := ctrl.claimsIndexer.ByIndex(uidIndex, key)
  if err != nil {
    return err
  }
  var claimObj interface{}
  if len(objs) > 0 {
    claimObj = objs[0]
  } else {
    obj, found := ctrl.claimsInProgress.Load(key)
    if !found {
      utilruntime.HandleError(fmt.Errorf("claim %q in work queue no longer exists", key))
      return nil
    }
    claimObj = obj
  }
  return ctrl.syncClaim(claimObj)
}


1.1.1.1 ctrl.syncClaim


Main logic:

(1) call ctrl.shouldProvision to decide whether a provision operation is needed;

(2) if so, call ctrl.provisionClaimOperation to do the actual work.


// syncClaim checks if the claim should have a volume provisioned for it and
// provisions one if so. Returns an error if the claim is to be requeued.
func (ctrl *ProvisionController) syncClaim(obj interface{}) error {
  claim, ok := obj.(*v1.PersistentVolumeClaim)
  if !ok {
    return fmt.Errorf("expected claim but got %+v", obj)
  }

  should, err := ctrl.shouldProvision(claim)
  if err != nil {
    ctrl.updateProvisionStats(claim, err, time.Time{})
    return err
  } else if should {
    startTime := time.Now()

    status, err := ctrl.provisionClaimOperation(claim)
    ctrl.updateProvisionStats(claim, err, startTime)
    if err == nil || status == ProvisioningFinished {
      // Provisioning is 100% finished / not in progress.
      switch err {
      case nil:
        glog.V(5).Infof("Claim processing succeeded, removing PVC %s from claims in progress", claim.UID)
      case errStopProvision:
        glog.V(5).Infof("Stop provisioning, removing PVC %s from claims in progress", claim.UID)
        // Our caller would requeue if we pass on this special error; return nil instead.
        err = nil
      default:
        glog.V(2).Infof("Final error received, removing PVC %s from claims in progress", claim.UID)
      }
      ctrl.claimsInProgress.Delete(string(claim.UID))
      return err
    }
    if status == ProvisioningInBackground {
      // Provisioning is in progress in background.
      glog.V(2).Infof("Temporary error received, adding PVC %s to claims in progress", claim.UID)
      ctrl.claimsInProgress.Store(string(claim.UID), claim)
    } else {
      // status == ProvisioningNoChange.
      // Don't change claimsInProgress:
      // - the claim is already there if previous status was ProvisioningInBackground.
      // - the claim is not there if previous status was ProvisioningFinished.
    }
    return err
  }
  return nil
}


ctrl.shouldProvision


This method decides whether a PVC object needs a provision operation. Main logic:


(1) when claim.Spec.VolumeName is not empty, no provision operation is needed, so return false;

(2) call qualifier.ShouldProvision to check whether the storage driver supports the provision operation;

(3) on Kubernetes 1.5 and above, read the provisioner (driver) name from the PVC's annStorageProvisioner annotation and validate it; for a StorageClass in WaitForFirstConsumer binding mode, provisioning only proceeds once the annSelectedNode annotation is set.


// shouldProvision returns whether a claim should have a volume provisioned for
// it, i.e. whether a Provision is "desired"
func (ctrl *ProvisionController) shouldProvision(claim *v1.PersistentVolumeClaim) (bool, error) {
  if claim.Spec.VolumeName != "" {
    return false, nil
  }

  if qualifier, ok := ctrl.provisioner.(Qualifier); ok {
    if !qualifier.ShouldProvision(claim) {
      return false, nil
    }
  }

  // Kubernetes 1.5 provisioning with annStorageProvisioner
  if ctrl.kubeVersion.AtLeast(utilversion.MustParseSemantic("v1.5.0")) {
    if provisioner, found := claim.Annotations[annStorageProvisioner]; found {
      if ctrl.knownProvisioner(provisioner) {
        claimClass := util.GetPersistentVolumeClaimClass(claim)
        class, err := ctrl.getStorageClass(claimClass)
        if err != nil {
          return false, err
        }
        if class.VolumeBindingMode != nil && *class.VolumeBindingMode == storage.VolumeBindingWaitForFirstConsumer {
          // When claim is in delay binding mode, annSelectedNode is
          // required to provision volume.
          // Though PV controller set annStorageProvisioner only when
          // annSelectedNode is set, but provisioner may remove
          // annSelectedNode to notify scheduler to reschedule again.
          if selectedNode, ok := claim.Annotations[annSelectedNode]; ok && selectedNode != "" {
            return true, nil
          }
          return false, nil
        }
        return true, nil
      }
    }
  } else {
    // Kubernetes 1.4 provisioning, evaluating class.Provisioner
    claimClass := util.GetPersistentVolumeClaimClass(claim)
    class, err := ctrl.getStorageClass(claimClass)
    if err != nil {
      glog.Errorf("Error getting claim %q's StorageClass's fields: %v", claimToClaimKey(claim), err)
      return false, err
    }
    if class.Provisioner != ctrl.provisionerName {
      return false, nil
    }

    return true, nil
  }

  return false, nil
}


ctrl.provisionClaimOperation


Main logic:

(1) get the StorageClass name from the PVC object;

(2) check whether the PV already exists;

(3) build a claimRef from the information in the PVC object;

(4) check whether dynamic provisioning is supported;

(5) get the StorageClass object;

(6) check that the provisioner named in the StorageClass object is registered;

(7) build the options struct;

(8) call the Provision method, which returns the PV object struct;

(9) add extra information to the PV object struct;

(10) create the PV in the apiserver.


// provisionClaimOperation attempts to provision a volume for the given claim.
// Returns nil error only when the volume was provisioned (in which case it also returns ProvisioningFinished),
// a normal error when the volume was not provisioned and provisioning should be retried (requeue the claim),
// or the special errStopProvision when provisioning was impossible and no further attempts to provision should be tried.
func (ctrl *ProvisionController) provisionClaimOperation(claim *v1.PersistentVolumeClaim) (ProvisioningState, error) {
  // Most code here is identical to that found in controller.go of kube's PV controller...

  // get the StorageClass name from the PVC object
  claimClass := util.GetPersistentVolumeClaimClass(claim)
  operation := fmt.Sprintf("provision %q class %q", claimToClaimKey(claim), claimClass)
  glog.Info(logOperation(operation, "started"))

  // check whether the PV already exists
  pvName := ctrl.getProvisionedVolumeNameForClaim(claim)
  volume, err := ctrl.client.CoreV1().PersistentVolumes().Get(pvName, metav1.GetOptions{})
  if err == nil && volume != nil {
    // Volume has been already provisioned, nothing to do.
    glog.Info(logOperation(operation, "persistentvolume %q already exists, skipping", pvName))
    return ProvisioningFinished, errStopProvision
  }

  // build a claimRef from the information in the PVC object
  claimRef, err := ref.GetReference(scheme.Scheme, claim)
  if err != nil {
    glog.Error(logOperation(operation, "unexpected error getting claim reference: %v", err))
    return ProvisioningNoChange, err
  }

  // check whether dynamic provisioning is supported
  if err = ctrl.canProvision(claim); err != nil {
    ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, "ProvisioningFailed", err.Error())
    glog.Error(logOperation(operation, "failed to provision volume: %v", err))
    return ProvisioningFinished, errStopProvision
  }

  // get the StorageClass object
  class, err := ctrl.getStorageClass(claimClass)
  if err != nil {
    glog.Error(logOperation(operation, "error getting claim's StorageClass's fields: %v", err))
    return ProvisioningFinished, err
  }
  // check that the provisioner named in the StorageClass object is registered
  if !ctrl.knownProvisioner(class.Provisioner) {
    // class.Provisioner has either changed since shouldProvision() or
    // annDynamicallyProvisioned contains different provisioner than
    // class.Provisioner.
    glog.Error(logOperation(operation, "unknown provisioner %q requested in claim's StorageClass", class.Provisioner))
    return ProvisioningFinished, errStopProvision
  }

  // selected-node handling
  var selectedNode *v1.Node
  if ctrl.kubeVersion.AtLeast(utilversion.MustParseSemantic("v1.11.0")) {
    // Get SelectedNode
    if nodeName, ok := getString(claim.Annotations, annSelectedNode, annAlphaSelectedNode); ok {
      selectedNode, err = ctrl.client.CoreV1().Nodes().Get(nodeName, metav1.GetOptions{}) // TODO (verult) cache Nodes
      if err != nil {
        err = fmt.Errorf("failed to get target node: %v", err)
        ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, "ProvisioningFailed", err.Error())
        return ProvisioningNoChange, err
      }
    }
  }

  // build the options struct
  options := ProvisionOptions{
    StorageClass: class,
    PVName:       pvName,
    PVC:          claim,
    SelectedNode: selectedNode,
  }

  ctrl.eventRecorder.Event(claim, v1.EventTypeNormal, "Provisioning", fmt.Sprintf("External provisioner is provisioning volume for claim %q", claimToClaimKey(claim)))

  // call the Provision method, which returns the PV object struct
  result := ProvisioningFinished
  if p, ok := ctrl.provisioner.(ProvisionerExt); ok {
    volume, result, err = p.ProvisionExt(options)
  } else {
    volume, err = ctrl.provisioner.Provision(options)
  }
  if err != nil {
    if ierr, ok := err.(*IgnoredError); ok {
      // Provision ignored, do nothing and hope another provisioner will provision it.
      glog.Info(logOperation(operation, "volume provision ignored: %v", ierr))
      return ProvisioningFinished, errStopProvision
    }
    err = fmt.Errorf("failed to provision volume with StorageClass %q: %v", claimClass, err)
    ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, "ProvisioningFailed", err.Error())
    if _, ok := claim.Annotations[annSelectedNode]; ok && result == ProvisioningReschedule {
      // For dynamic PV provisioning with delayed binding, the provisioner may fail
      // because the node is wrong (permanent error) or currently unusable (not enough
      // capacity). If the provisioner wants to give up scheduling with the currently
      // selected node, then it can ask for that by returning ProvisioningReschedule
      // as state.
      //
      // `selectedNode` must be removed to notify scheduler to schedule again.
      if errLabel := ctrl.rescheduleProvisioning(claim); errLabel != nil {
        glog.Info(logOperation(operation, "volume rescheduling failed: %v", errLabel))
        // If unsetting that label fails in ctrl.rescheduleProvisioning, we
        // keep the volume in the work queue as if the provisioner had
        // returned ProvisioningFinished and simply try again later.
        return ProvisioningFinished, err
      }
      // Label was removed, stop working on the volume.
      glog.Info(logOperation(operation, "volume rescheduled because: %v", err))
      return ProvisioningFinished, errStopProvision
    }

    // ProvisioningReschedule shouldn't have been returned for volumes without selected node,
    // but if we get it anyway, then treat it like ProvisioningFinished because we cannot
    // reschedule.
    if result == ProvisioningReschedule {
      result = ProvisioningFinished
    }
    return result, err
  }

  glog.Info(logOperation(operation, "volume %q provisioned", volume.Name))

  // add extra information to the PV object struct
  // Set ClaimRef and the PV controller will bind and set annBoundByController for us
  volume.Spec.ClaimRef = claimRef

  // Add external provisioner finalizer if it doesn't already have it
  if ctrl.addFinalizer && !ctrl.checkFinalizer(volume, finalizerPV) {
    volume.ObjectMeta.Finalizers = append(volume.ObjectMeta.Finalizers, finalizerPV)
  }

  metav1.SetMetaDataAnnotation(&volume.ObjectMeta, annDynamicallyProvisioned, ctrl.provisionerName)
  if ctrl.kubeVersion.AtLeast(utilversion.MustParseSemantic("v1.6.0")) {
    volume.Spec.StorageClassName = claimClass
  } else {
    metav1.SetMetaDataAnnotation(&volume.ObjectMeta, annClass, claimClass)
  }

  glog.Info(logOperation(operation, "succeeded"))

  // create the PV in the apiserver (via the volume store)
  if err := ctrl.volumeStore.StoreVolume(claim, volume); err != nil {
    return ProvisioningFinished, err
  }
  return ProvisioningFinished, nil
}


ctrl.provisioner.Provision:


The Provision method mainly calls the CSI driver's CreateVolume method to create the storage and returns the PV object struct.


Main logic:

(1) call p.checkDriverCapabilities to check the capabilities the driver advertises;

(2) build the PV object name;

(3) get fstype from StorageClass.Parameters and validate it;

(4) get the requested storage size from the PVC;

(5) build the CreateVolumeRequest struct;

(6) resolve the secret names and namespaces used for the provisioner, controllerPublish, nodeStage, nodePublish and controllerExpand operations (a hedged sketch of the StorageClass parameters involved follows the source code below);

(7) fetch the provisioner secret object from the apiserver and put it into the CreateVolumeRequest struct;

(8) strip some parameters from StorageClass.Parameters and put the rest into the CreateVolumeRequest struct;

(9) call p.csiClient.CreateVolume (i.e. the CreateVolume method of ceph-csi) to create the ceph storage;

(10) after the storage is created successfully, verify that the created size meets the request;

(11) build the PV object struct and return it.


func (p *csiProvisioner) Provision(options controller.ProvisionOptions) (*v1.PersistentVolume, error) {
  // The controller should call ProvisionExt() instead, but just in case...
  pv, _, err := p.ProvisionExt(options)
  return pv, err
}

func (p *csiProvisioner) ProvisionExt(options controller.ProvisionOptions) (*v1.PersistentVolume, controller.ProvisioningState, error) {
  if options.StorageClass == nil {
    return nil, controller.ProvisioningFinished, errors.New("storage class was nil")
  }

  if options.PVC.Annotations[annStorageProvisioner] != p.driverName && options.PVC.Annotations[annMigratedTo] != p.driverName {
    // The storage provisioner annotation may not equal driver name but the
    // PVC could have annotation "migrated-to" which is the new way to
    // signal a PVC is migrated (k8s v1.17+)
    return nil, controller.ProvisioningFinished, &controller.IgnoredError{
      Reason: fmt.Sprintf("PVC annotated with external-provisioner name %s does not match provisioner driver name %s. This could mean the PVC is not migrated",
        options.PVC.Annotations[annStorageProvisioner],
        p.driverName),
    }
  }

  migratedVolume := false
  if p.supportsMigrationFromInTreePluginName != "" {
    // NOTE: we cannot depend on PVC.Annotations[volume.beta.kubernetes.io/storage-provisioner] to get
    // the in-tree provisioner name in case of CSI migration scenarios. The annotation will be
    // set to the CSI provisioner name by PV controller for migration scenarios
    // so that external provisioner can correctly pick up the PVC pointing to an in-tree plugin
    if options.StorageClass.Provisioner == p.supportsMigrationFromInTreePluginName {
      klog.V(2).Infof("translating storage class for in-tree plugin %s to CSI", options.StorageClass.Provisioner)
      storageClass, err := p.translator.TranslateInTreeStorageClassToCSI(p.supportsMigrationFromInTreePluginName, options.StorageClass)
      if err != nil {
        return nil, controller.ProvisioningFinished, fmt.Errorf("failed to translate storage class: %v", err)
      }
      options.StorageClass = storageClass
      migratedVolume = true
    } else {
      klog.V(4).Infof("skip translation of storage class for plugin: %s", options.StorageClass.Provisioner)
    }
  }

  // Make sure the plugin is capable of fulfilling the requested options
  rc := &requiredCapabilities{}
  if options.PVC.Spec.DataSource != nil {
    // PVC.Spec.DataSource.Name is the name of the VolumeSnapshot API object
    if options.PVC.Spec.DataSource.Name == "" {
      return nil, controller.ProvisioningFinished, fmt.Errorf("the PVC source not found for PVC %s", options.PVC.Name)
    }

    switch options.PVC.Spec.DataSource.Kind {
    case snapshotKind:
      if *(options.PVC.Spec.DataSource.APIGroup) != snapshotAPIGroup {
        return nil, controller.ProvisioningFinished, fmt.Errorf("the PVC source does not belong to the right APIGroup. Expected %s, Got %s", snapshotAPIGroup, *(options.PVC.Spec.DataSource.APIGroup))
      }
      rc.snapshot = true
    case pvcKind:
      rc.clone = true
    default:
      klog.Infof("Unsupported DataSource specified (%s), the provisioner won't act on this request", options.PVC.Spec.DataSource.Kind)
    }
  }
  if err := p.checkDriverCapabilities(rc); err != nil {
    return nil, controller.ProvisioningFinished, err
  }

  if options.PVC.Spec.Selector != nil {
    return nil, controller.ProvisioningFinished, fmt.Errorf("claim Selector is not supported")
  }

  pvName, err := makeVolumeName(p.volumeNamePrefix, fmt.Sprintf("%s", options.PVC.ObjectMeta.UID), p.volumeNameUUIDLength)
  if err != nil {
    return nil, controller.ProvisioningFinished, err
  }

  fsTypesFound := 0
  fsType := ""
  for k, v := range options.StorageClass.Parameters {
    if strings.ToLower(k) == "fstype" || k == prefixedFsTypeKey {
      fsType = v
      fsTypesFound++
    }
    if strings.ToLower(k) == "fstype" {
      klog.Warningf(deprecationWarning("fstype", prefixedFsTypeKey, ""))
    }
  }
  if fsTypesFound > 1 {
    return nil, controller.ProvisioningFinished, fmt.Errorf("fstype specified in parameters with both \"fstype\" and \"%s\" keys", prefixedFsTypeKey)
  }
  if len(fsType) == 0 {
    fsType = defaultFSType
  }

  capacity := options.PVC.Spec.Resources.Requests[v1.ResourceName(v1.ResourceStorage)]
  volSizeBytes := capacity.Value()

  // add by zhongjialiang, if it is rbd request and fstype is xfs, we will check volume request size, it need to be equal or larger than 1G
  // because if the request size is less than 1G, it may occur an error when kubelet call NodeStageVolume(ceph-csi)
  // failed to run mkfs error: exit status 1, output: agsize (xxxx blocks) too small, need at least 4096 blocks
  //
  // issue: https://git-sa.nie.netease.com/venice/ceph-csi/issues/105
  klog.Infof("request volume size is %s (volSizeBytes: %d)", capacity.String(), volSizeBytes)
  if isRbdRequest(options.StorageClass.Parameters) && fsType == xfsFSType && volSizeBytes < oneGi {
    return nil, controller.ProvisioningFinished, fmt.Errorf("fstype xfs volume size request must be equal or larger than 1Gi, but your volume size request is %s ", capacity.String())
  }

  // Get access mode
  volumeCaps := make([]*csi.VolumeCapability, 0)
  for _, pvcAccessMode := range options.PVC.Spec.AccessModes {
    volumeCaps = append(volumeCaps, getVolumeCapability(options, pvcAccessMode, fsType))
  }

  // Create a CSI CreateVolumeRequest and Response
  req := csi.CreateVolumeRequest{
    Name:               pvName,
    Parameters:         options.StorageClass.Parameters,
    VolumeCapabilities: volumeCaps,
    CapacityRange: &csi.CapacityRange{
      RequiredBytes: int64(volSizeBytes),
    },
  }

  if options.PVC.Spec.DataSource != nil && (rc.clone || rc.snapshot) {
    volumeContentSource, err := p.getVolumeContentSource(options)
    if err != nil {
      return nil, controller.ProvisioningNoChange, fmt.Errorf("error getting handle for DataSource Type %s by Name %s: %v", options.PVC.Spec.DataSource.Kind, options.PVC.Spec.DataSource.Name, err)
    }
    req.VolumeContentSource = volumeContentSource
  }

  if options.PVC.Spec.DataSource != nil && rc.clone {
    err = p.setCloneFinalizer(options.PVC)
    if err != nil {
      return nil, controller.ProvisioningNoChange, err
    }
  }

  if p.supportsTopology() {
    requirements, err := GenerateAccessibilityRequirements(
      p.client,
      p.driverName,
      options.PVC.Name,
      options.StorageClass.AllowedTopologies,
      options.SelectedNode,
      p.strictTopology,
      p.csiNodeLister,
      p.nodeLister)
    if err != nil {
      return nil, controller.ProvisioningNoChange, fmt.Errorf("error generating accessibility requirements: %v", err)
    }
    req.AccessibilityRequirements = requirements
  }

  klog.V(5).Infof("CreateVolumeRequest %+v", req)

  rep := &csi.CreateVolumeResponse{}

  // Resolve provision secret credentials.
  provisionerSecretRef, err := getSecretReference(provisionerSecretParams, options.StorageClass.Parameters, pvName, options.PVC)
  if err != nil {
    return nil, controller.ProvisioningNoChange, err
  }
  provisionerCredentials, err := getCredentials(p.client, provisionerSecretRef)
  if err != nil {
    return nil, controller.ProvisioningNoChange, err
  }
  req.Secrets = provisionerCredentials

  // Resolve controller publish, node stage, node publish secret references
  controllerPublishSecretRef, err := getSecretReference(controllerPublishSecretParams, options.StorageClass.Parameters, pvName, options.PVC)
  if err != nil {
    return nil, controller.ProvisioningNoChange, err
  }
  nodeStageSecretRef, err := getSecretReference(nodeStageSecretParams, options.StorageClass.Parameters, pvName, options.PVC)
  if err != nil {
    return nil, controller.ProvisioningNoChange, err
  }
  nodePublishSecretRef, err := getSecretReference(nodePublishSecretParams, options.StorageClass.Parameters, pvName, options.PVC)
  if err != nil {
    return nil, controller.ProvisioningNoChange, err
  }
  controllerExpandSecretRef, err := getSecretReference(controllerExpandSecretParams, options.StorageClass.Parameters, pvName, options.PVC)
  if err != nil {
    return nil, controller.ProvisioningNoChange, err
  }

  req.Parameters, err = removePrefixedParameters(options.StorageClass.Parameters)
  if err != nil {
    return nil, controller.ProvisioningFinished, fmt.Errorf("failed to strip CSI Parameters of prefixed keys: %v", err)
  }

  if p.extraCreateMetadata {
    // add pvc and pv metadata to request for use by the plugin
    req.Parameters[pvcNameKey] = options.PVC.GetName()
    req.Parameters[pvcNamespaceKey] = options.PVC.GetNamespace()
    req.Parameters[pvNameKey] = pvName
  }

  ctx, cancel := context.WithTimeout(context.Background(), p.timeout)
  defer cancel()
  rep, err = p.csiClient.CreateVolume(ctx, &req)

  if err != nil {
    // Giving up after an error and telling the pod scheduler to retry with a different node
    // only makes sense if:
    // - The CSI driver supports topology: without that, the next CreateVolume call after
    //   rescheduling will be exactly the same.
    // - We are working on a volume with late binding: only in that case will
    //   provisioning be retried if we give up for now.
    // - The error is one where rescheduling is
    //   a) allowed (i.e. we don't have to keep calling CreateVolume because the operation might be running) and
    //   b) it makes sense (typically local resource exhausted).
    //   isFinalError is going to check this.
    //
    // We do this regardless whether the driver has asked for strict topology because
    // even drivers which did not ask for it explicitly might still only look at the first
    // topology entry and thus succeed after rescheduling.
    mayReschedule := p.supportsTopology() && options.SelectedNode != nil
    state := checkError(err, mayReschedule)
    klog.V(5).Infof("CreateVolume failed, supports topology = %v, node selected %v => may reschedule = %v => state = %v: %v",
      p.supportsTopology(),
      options.SelectedNode != nil,
      mayReschedule,
      state,
      err)
    return nil, state, err
  }

  if rep.Volume != nil {
    klog.V(3).Infof("create volume rep: %+v", *rep.Volume)
  }
  volumeAttributes := map[string]string{provisionerIDKey: p.identity}
  for k, v := range rep.Volume.VolumeContext {
    volumeAttributes[k] = v
  }
  respCap := rep.GetVolume().GetCapacityBytes()

  // According to CSI spec CreateVolume should be able to return capacity = 0, which means it is unknown. for example NFS/FTP
  if respCap == 0 {
    respCap = volSizeBytes
    klog.V(3).Infof("csiClient response volume with size 0, which is not supported by apiServer, will use claim size:%d", respCap)
  } else if respCap < volSizeBytes {
    capErr := fmt.Errorf("created volume capacity %v less than requested capacity %v", respCap, volSizeBytes)
    delReq := &csi.DeleteVolumeRequest{
      VolumeId: rep.GetVolume().GetVolumeId(),
    }
    err = cleanupVolume(p, delReq, provisionerCredentials)
    if err != nil {
      capErr = fmt.Errorf("%v. Cleanup of volume %s failed, volume is orphaned: %v", capErr, pvName, err)
    }
    // use InBackground to retry the call, hoping the volume is deleted correctly next time.
    return nil, controller.ProvisioningInBackground, capErr
  }

  if options.PVC.Spec.DataSource != nil {
    contentSource := rep.GetVolume().ContentSource
    if contentSource == nil {
      sourceErr := fmt.Errorf("volume content source missing")
      delReq := &csi.DeleteVolumeRequest{
        VolumeId: rep.GetVolume().GetVolumeId(),
      }
      err = cleanupVolume(p, delReq, provisionerCredentials)
      if err != nil {
        sourceErr = fmt.Errorf("%v. cleanup of volume %s failed, volume is orphaned: %v", sourceErr, pvName, err)
      }
      return nil, controller.ProvisioningInBackground, sourceErr
    }
  }

  pv := &v1.PersistentVolume{
    ObjectMeta: metav1.ObjectMeta{
      Name: pvName,
    },
    Spec: v1.PersistentVolumeSpec{
      AccessModes:  options.PVC.Spec.AccessModes,
      MountOptions: options.StorageClass.MountOptions,
      Capacity: v1.ResourceList{
        v1.ResourceName(v1.ResourceStorage): bytesToGiQuantity(respCap),
      },
      // TODO wait for CSI VolumeSource API
      PersistentVolumeSource: v1.PersistentVolumeSource{
        CSI: &v1.CSIPersistentVolumeSource{
          Driver:                     p.driverName,
          VolumeHandle:               p.volumeIdToHandle(rep.Volume.VolumeId),
          VolumeAttributes:           volumeAttributes,
          ControllerPublishSecretRef: controllerPublishSecretRef,
          NodeStageSecretRef:         nodeStageSecretRef,
          NodePublishSecretRef:       nodePublishSecretRef,
          ControllerExpandSecretRef:  controllerExpandSecretRef,
        },
      },
    },
  }

  if options.StorageClass.ReclaimPolicy != nil {
    pv.Spec.PersistentVolumeReclaimPolicy = *options.StorageClass.ReclaimPolicy
  }

  if p.supportsTopology() {
    pv.Spec.NodeAffinity = GenerateVolumeNodeAffinity(rep.Volume.AccessibleTopology)
  }

  // Set VolumeMode to PV if it is passed via PVC spec when Block feature is enabled
  if options.PVC.Spec.VolumeMode != nil {
    pv.Spec.VolumeMode = options.PVC.Spec.VolumeMode
  }
  // Set FSType if PV is not Block Volume
  if !util.CheckPersistentVolumeClaimModeBlock(options.PVC) {
    pv.Spec.PersistentVolumeSource.CSI.FSType = fsType
  }

  klog.V(2).Infof("successfully created PV %v for PVC %v and csi volume name %v", pv.Name, options.PVC.Name, pv.Spec.CSI.VolumeHandle)

  if migratedVolume {
    pv, err = p.translator.TranslateCSIPVToInTree(pv)
    if err != nil {
      klog.Warningf("failed to translate CSI PV to in-tree due to: %v. Deleting provisioned PV", err)
      deleteErr := p.Delete(pv)
      if deleteErr != nil {
        klog.Warningf("failed to delete partly provisioned PV: %v", deleteErr)
        // Retry the call again to clean up the orphan
        return nil, controller.ProvisioningInBackground, err
      }
      return nil, controller.ProvisioningFinished, err
    }
  }

  klog.V(5).Infof("successfully created PV %+v", pv.Spec.PersistentVolumeSource)
  return pv, controller.ProvisioningFinished, nil
}
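As a complement to steps (6) and (7) above, the sketch below shows the kind of StorageClass parameters that getSecretReference and getCredentials consume. The keys are the standard csi.storage.k8s.io secret parameters; the secret names, namespaces, and the ${pvc.namespace} template usage here are illustrative assumptions, and the exact template variables supported for each secret type should be checked against the external-provisioner documentation for this release.

// Illustrative StorageClass.Parameters as consumed by getSecretReference; the
// secret names/namespaces and the ${pvc.namespace} template are assumptions.
package main

import "fmt"

func main() {
  scParameters := map[string]string{
    // Parameters passed through to the CSI driver (ceph-csi), e.g. pool/cluster info.
    "pool":      "replicapool",      // hypothetical value
    "clusterID": "ceph-cluster-1",   // hypothetical value

    // Prefixed keys consumed by external-provisioner itself and stripped by
    // removePrefixedParameters before CreateVolume is called.
    "csi.storage.k8s.io/provisioner-secret-name":       "csi-rbd-secret",
    "csi.storage.k8s.io/provisioner-secret-namespace":  "default",
    "csi.storage.k8s.io/node-stage-secret-name":        "csi-rbd-secret",
    "csi.storage.k8s.io/node-stage-secret-namespace":   "default",
    "csi.storage.k8s.io/controller-expand-secret-name": "csi-rbd-secret",
    // Templates such as ${pvc.namespace} may be used where supported:
    "csi.storage.k8s.io/controller-expand-secret-namespace": "${pvc.namespace}",
  }
  fmt.Println(scParameters)
}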

2. ctrl.runVolumeWorker

Based on the threadiness setting, the corresponding number of goroutines are started to run ctrl.runVolumeWorker.


It is responsible for processing the volumeQueue, i.e. handling PV add and update events, calling the DeleteVolume method of the CSI component to delete storage when needed, and then deleting the PV object.


    for i := 0; i < ctrl.threadiness; i++ {
      go wait.Until(ctrl.runClaimWorker, time.Second, context.TODO().Done())
      go wait.Until(ctrl.runVolumeWorker, time.Second, context.TODO().Done())
    }


// vendor/sigs.k8s.io/sig-storage-lib-external-provisioner/v5/controller/controller.go
func (ctrl *ProvisionController) runVolumeWorker() {
  // loop forever calling processNextVolumeWorkItem
  for ctrl.processNextVolumeWorkItem() {
  }
}


Call chain: main() --> provisionController.Run() --> ctrl.runVolumeWorker() --> ctrl.processNextVolumeWorkItem() --> ctrl.syncVolumeHandler() --> ctrl.syncVolume() --> ctrl.deleteVolumeOperation() --> ctrl.provisioner.Delete()

2.1 ctrl.processNextVolumeWorkItem

Main logic:

(1) get a PV key from the volumeQueue;

(2) call ctrl.syncVolumeHandler for further processing;

(3) on success, clear the PV's entry in the rateLimiter;

(4) on failure, retry a limited number of times by re-adding the PV to the queue through the rateLimiter;

(5) finally, whether ctrl.syncVolumeHandler succeeds or not, mark the PV as done so it is removed from the volumeQueue.

The core of it is the call to ctrl.syncVolumeHandler.


// processNextVolumeWorkItem processes items from volumeQueue
func (ctrl *ProvisionController) processNextVolumeWorkItem() bool {
  // get a PV key from the volumeQueue
  obj, shutdown := ctrl.volumeQueue.Get()

  if shutdown {
    return false
  }

  err := func(obj interface{}) error {
    // finally, whether ctrl.syncVolumeHandler succeeds or not, remove the PV from the volumeQueue
    defer ctrl.volumeQueue.Done(obj)
    var key string
    var ok bool
    if key, ok = obj.(string); !ok {
      ctrl.volumeQueue.Forget(obj)
      return fmt.Errorf("expected string in workqueue but got %#v", obj)
    }
    // call ctrl.syncVolumeHandler for further processing
    if err := ctrl.syncVolumeHandler(key); err != nil {
      // on failure, retry a limited number of times by re-adding the PV through the rateLimiter
      if ctrl.failedDeleteThreshold == 0 {
        glog.Warningf("Retrying syncing volume %q, failure %v", key, ctrl.volumeQueue.NumRequeues(obj))
        ctrl.volumeQueue.AddRateLimited(obj)
      } else if ctrl.volumeQueue.NumRequeues(obj) < ctrl.failedDeleteThreshold {
        glog.Warningf("Retrying syncing volume %q because failures %v < threshold %v", key, ctrl.volumeQueue.NumRequeues(obj), ctrl.failedDeleteThreshold)
        ctrl.volumeQueue.AddRateLimited(obj)
      } else {
        glog.Errorf("Giving up syncing volume %q because failures %v >= threshold %v", key, ctrl.volumeQueue.NumRequeues(obj), ctrl.failedDeleteThreshold)
        // Done but do not Forget: it will not be in the queue but NumRequeues
        // will be saved until the obj is deleted from kubernetes
      }
      return fmt.Errorf("error syncing volume %q: %s", key, err.Error())
    }
    // on success, clear the PV's entry in the rateLimiter
    ctrl.volumeQueue.Forget(obj)
    return nil
  }(obj)

  if err != nil {
    utilruntime.HandleError(err)
    return true
  }

  return true
}

2.1.1 ctrl.syncVolumeHandler

Its main job is to call ctrl.syncVolume.


// syncVolumeHandler gets the volume from informer's cache then calls syncVolume
func (ctrl *ProvisionController) syncVolumeHandler(key string) error {
  volumeObj, exists, err := ctrl.volumes.GetByKey(key)
  if err != nil {
    return err
  }
  if !exists {
    utilruntime.HandleError(fmt.Errorf("volume %q in work queue no longer exists", key))
    return nil
  }

  return ctrl.syncVolume(volumeObj)
}


2.1.1.1 ctrl.syncVolume


Main logic:

(1) call ctrl.shouldDelete to decide whether a delete action is required;

(2) if so, call ctrl.deleteVolumeOperation.


// syncVolume checks if the volume should be deleted and deletes if so
func (ctrl *ProvisionController) syncVolume(obj interface{}) error {
  volume, ok := obj.(*v1.PersistentVolume)
  if !ok {
    return fmt.Errorf("expected volume but got %+v", obj)
  }

  if ctrl.shouldDelete(volume) {
    startTime := time.Now()
    err := ctrl.deleteVolumeOperation(volume)
    ctrl.updateDeleteStats(volume, err, startTime)
    return err
  }
  return nil
}


Main logic of ctrl.shouldDelete:

(1) check whether the provisioner (via the DeletionGuard interface) allows the delete action;

(2) on Kubernetes 1.9+, perform PV protection related checks (return false when the PV object's DeletionTimestamp is set, which means deletion of the underlying storage and of the PV object has already been triggered);

(3) on Kubernetes 1.5+, return false unless the PV's phase is Released (on 1.4 a Failed PV is also eligible for deletion);

(4) return false unless the PV's reclaim policy is Delete;

(5) return false unless the PV carries the annDynamicallyProvisioned annotation and its value (or annMigratedTo) matches this provisioner's name.


// shouldDelete returns whether a volume should have its backing volume
// deleted, i.e. whether a Delete is "desired"
func (ctrl *ProvisionController) shouldDelete(volume *v1.PersistentVolume) bool {
  // check whether the provisioner allows the delete action
  if deletionGuard, ok := ctrl.provisioner.(DeletionGuard); ok {
    if !deletionGuard.ShouldDelete(volume) {
      return false
    }
  }

  // In 1.9+ PV protection means the object will exist briefly with a
  // deletion timestamp even after our successful Delete. Ignore it.
  if ctrl.kubeVersion.AtLeast(utilversion.MustParseSemantic("v1.9.0")) {
    if ctrl.addFinalizer && !ctrl.checkFinalizer(volume, finalizerPV) && volume.ObjectMeta.DeletionTimestamp != nil {
      return false
    } else if volume.ObjectMeta.DeletionTimestamp != nil {
      return false
    }
  }

  // In 1.5+ we delete only if the volume is in state Released. In 1.4 we must
  // delete if the volume is in state Failed too.
  if ctrl.kubeVersion.AtLeast(utilversion.MustParseSemantic("v1.5.0")) {
    if volume.Status.Phase != v1.VolumeReleased {
      return false
    }
  } else {
    if volume.Status.Phase != v1.VolumeReleased && volume.Status.Phase != v1.VolumeFailed {
      return false
    }
  }

  // return false unless the reclaim policy is Delete
  if volume.Spec.PersistentVolumeReclaimPolicy != v1.PersistentVolumeReclaimDelete {
    return false
  }

  if !metav1.HasAnnotation(volume.ObjectMeta, annDynamicallyProvisioned) {
    return false
  }

  ann := volume.Annotations[annDynamicallyProvisioned]
  migratedTo := volume.Annotations[annMigratedTo]
  if ann != ctrl.provisionerName && migratedTo != ctrl.provisionerName {
    return false
  }

  return true
}


2.1.1.2 ctrl.deleteVolumeOperation


Main logic:

(1) call provisioner.Delete to delete the underlying storage (e.g. the rbd image);

(2) delete the PV object from the apiserver;

(3) handle the finalizer cleanup.


// deleteVolumeOperation attempts to delete the volume backing the given
// volume. Returns error, which indicates whether deletion should be retried
// (requeue the volume) or not
func (ctrl *ProvisionController) deleteVolumeOperation(volume *v1.PersistentVolume) error {
  operation := fmt.Sprintf("delete %q", volume.Name)
  glog.Info(logOperation(operation, "started"))

  // This method may have been waiting for a volume lock for some time.
  // Our check does not have to be as sophisticated as PV controller's, we can
  // trust that the PV controller has set the PV to Released/Failed and it's
  // ours to delete
  newVolume, err := ctrl.client.CoreV1().PersistentVolumes().Get(volume.Name, metav1.GetOptions{})
  if err != nil {
    return nil
  }
  if !ctrl.shouldDelete(newVolume) {
    glog.Info(logOperation(operation, "persistentvolume no longer needs deletion, skipping"))
    return nil
  }

  err = ctrl.provisioner.Delete(volume)
  if err != nil {
    if ierr, ok := err.(*IgnoredError); ok {
      // Delete ignored, do nothing and hope another provisioner will delete it.
      glog.Info(logOperation(operation, "volume deletion ignored: %v", ierr))
      return nil
    }
    // Delete failed, emit an event.
    glog.Error(logOperation(operation, "volume deletion failed: %v", err))
    ctrl.eventRecorder.Event(volume, v1.EventTypeWarning, "VolumeFailedDelete", err.Error())
    return err
  }

  glog.Info(logOperation(operation, "volume deleted"))

  // Delete the volume
  if err = ctrl.client.CoreV1().PersistentVolumes().Delete(volume.Name, nil); err != nil {
    // Oops, could not delete the volume and therefore the controller will
    // try to delete the volume again on next update.
    glog.Info(logOperation(operation, "failed to delete persistentvolume: %v", err))
    return err
  }

  if ctrl.addFinalizer {
    if len(newVolume.ObjectMeta.Finalizers) > 0 {
      // Remove external-provisioner finalizer

      // need to get the pv again because the delete has updated the object with a deletion timestamp
      newVolume, err := ctrl.client.CoreV1().PersistentVolumes().Get(volume.Name, metav1.GetOptions{})
      if err != nil {
        // If the volume is not found return, otherwise error
        if !apierrs.IsNotFound(err) {
          glog.Info(logOperation(operation, "failed to get persistentvolume to update finalizer: %v", err))
          return err
        }
        return nil
      }
      finalizers := make([]string, 0)
      for _, finalizer := range newVolume.ObjectMeta.Finalizers {
        if finalizer != finalizerPV {
          finalizers = append(finalizers, finalizer)
        }
      }

      // Only update the finalizers if we actually removed something
      if len(finalizers) != len(newVolume.ObjectMeta.Finalizers) {
        newVolume.ObjectMeta.Finalizers = finalizers
        if _, err = ctrl.client.CoreV1().PersistentVolumes().Update(newVolume); err != nil {
          if !apierrs.IsNotFound(err) {
            // Couldn't remove finalizer and the object still exists, the controller may
            // try to remove the finalizer again on the next update
            glog.Info(logOperation(operation, "failed to remove finalizer for persistentvolume: %v", err))
            return err
          }
        }
      }
    }
  }

  glog.Info(logOperation(operation, "persistentvolume deleted"))

  glog.Info(logOperation(operation, "succeeded"))
  return nil
}


ctrl.provisioner.Delete mainly calls the DeleteVolume method of the ceph-csi component to delete the storage.


Main logic:

(1) get the volumeId;

(2) build the DeleteVolumeRequest struct;

(3) fetch the secret object needed for deleting the storage from the apiserver and put it into the DeleteVolumeRequest struct;

(4) call p.csiClient.DeleteVolume (i.e. the DeleteVolume method of ceph-csi) to delete the storage.


func (p *csiProvisioner) Delete(volume *v1.PersistentVolume) error {
  if volume == nil {
    return fmt.Errorf("invalid CSI PV")
  }

  var err error
  if p.translator.IsPVMigratable(volume) {
    // we end up here only if CSI migration is enabled in-tree (both overall
    // and for the specific plugin that is migratable) causing in-tree PV
    // controller to yield deletion of PVs with in-tree source to external provisioner
    // based on AnnDynamicallyProvisioned annotation.
    volume, err = p.translator.TranslateInTreePVToCSI(volume)
    if err != nil {
      return err
    }
  }

  if volume.Spec.CSI == nil {
    return fmt.Errorf("invalid CSI PV")
  }

  volumeId := p.volumeHandleToId(volume.Spec.CSI.VolumeHandle)

  rc := &requiredCapabilities{}
  if err := p.checkDriverCapabilities(rc); err != nil {
    return err
  }

  req := csi.DeleteVolumeRequest{
    VolumeId: volumeId,
  }

  // get secrets if StorageClass specifies it
  storageClassName := util.GetPersistentVolumeClass(volume)
  if len(storageClassName) != 0 {
    if storageClass, err := p.scLister.Get(storageClassName); err == nil {

      // edit by zhongjialiang
      // issue: https://git-sa.nie.netease.com/venice/ceph-csi/issues/85
      secretParams := provisionerSecretParams

      if isRbdRequest(storageClass.Parameters) {
        secretParams = deleteSecretParams
      }

      // Resolve provision secret credentials.
      provisionerSecretRef, err := getSecretReference(secretParams, storageClass.Parameters, volume.Name, &v1.PersistentVolumeClaim{
        ObjectMeta: metav1.ObjectMeta{
          Name:      volume.Spec.ClaimRef.Name,
          Namespace: volume.Spec.ClaimRef.Namespace,
        },
      })
      if err != nil {
        return fmt.Errorf("failed to get secretreference for volume %s: %v", volume.Name, err)
      }

      credentials, err := getCredentials(p.client, provisionerSecretRef)
      if err != nil {
        // Continue with deletion, as the secret may have already been deleted.
        klog.Errorf("Failed to get credentials for volume %s: %s", volume.Name, err.Error())
      }
      req.Secrets = credentials
    } else {
      klog.Warningf("failed to get storageclass: %s, proceeding to delete without secrets. %v", storageClassName, err)
    }
  }
  ctx, cancel := context.WithTimeout(context.Background(), p.timeout)
  defer cancel()

  _, err = p.csiClient.DeleteVolume(ctx, &req)

  return err
}


This concludes the analysis of external-provisioner's provisionController; a brief summary follows.

Summary

provisionController is responsible for processing the claimQueue (i.e. handling PVC add and update events), calling the CreateVolume method of the ceph-csi component to create storage when needed and then creating the PV object; and for processing the volumeQueue (i.e. handling PV add and update events), deciding based on the PV's status and reclaim policy whether to call the DeleteVolume method of ceph-csi to delete the storage and then delete the PV object.
