kubernetes ceph-csi analysis - table of contents:
https://xie.infoq.cn/article/4b1d3e32f124307a49cd9c1e3
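Storage expansion consists of two major steps: a controller-side operation and a node-side operation. The controller-side operation is carried out by external-resizer, which calls ceph through the csi plugin, while the node-side operation is carried out by kubelet. The following analyzes the storage expansion code in kubelet.
Based on tag v1.17.4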
https://github.com/kubernetes/kubernetes/releases/tag/v1.17.4
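What controller-side storage expansion does
It expands the underlying storage. For ceph rbd, for example, the rbd image in the ceph cluster is expanded.
What node-side storage expansion does
It performs the corresponding operations on the node where the pod runs, so that the node becomes aware that the storage has been expanded. For a ceph rbd filesystem volume, for example, the file system resize command is invoked on the node to grow the file system.
Some storage types, such as cephfs, do not need a node-side expansion step.
Rough storage expansion process
(1) Change pvc.Spec.Resources.Requests.storage to trigger the expansion.
(2) Controller-side expansion: external-resizer watches pvc objects. When it finds that pvc.Spec.Resources.Requests.storage is larger than pvc.Status.Capacity.storage, it calls the csi plugin's ControllerExpandVolume method to expand the underlying storage, and then updates pv.Spec.Capacity.storage.
(3) Node-side expansion: kubelet finds that pv.Spec.Capacity.storage is larger than pvc.Status.Capacity.storage, so it calls the csi node-side expansion to grow the file system on the node. On success, kubelet updates pvc.Status.Capacity.storage.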
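The comparisons in the steps above can be sketched as a small decision function. This is only an illustrative model: the volumeSizes type and the pendingPhase function are hypothetical names, sizes are plain byte counts instead of resource.Quantity, and the check that the pv has not yet reached the requested size is an assumption of this sketch, not code taken from external-resizer or kubelet.

```go
package main

import "fmt"

// volumeSizes holds the three storage values compared during expansion.
// Plain byte counts are used here; the real objects use resource.Quantity.
type volumeSizes struct {
	pvcRequested int64 // pvc.Spec.Resources.Requests.storage
	pvCapacity   int64 // pv.Spec.Capacity.storage
	pvcStatus    int64 // pvc.Status.Capacity.storage
}

// pendingPhase reports which expansion step is still outstanding
// in this simplified model: "controller", "node", or "none".
func pendingPhase(s volumeSizes) string {
	switch {
	case s.pvcRequested > s.pvcStatus && s.pvCapacity < s.pvcRequested:
		// The request grew and the pv has not been expanded yet.
		return "controller"
	case s.pvCapacity > s.pvcStatus:
		// The pv is larger than what the pvc status reports:
		// the node-side (file system) step is still pending.
		return "node"
	default:
		return "none"
	}
}

func main() {
	const gi = int64(1) << 30
	// Walk through the three states of a 1Gi -> 2Gi expansion.
	fmt.Println(pendingPhase(volumeSizes{pvcRequested: 2 * gi, pvCapacity: 1 * gi, pvcStatus: 1 * gi}))
	fmt.Println(pendingPhase(volumeSizes{pvcRequested: 2 * gi, pvCapacity: 2 * gi, pvcStatus: 1 * gi}))
	fmt.Println(pendingPhase(volumeSizes{pvcRequested: 2 * gi, pvCapacity: 2 * gi, pvcStatus: 2 * gi}))
}
```

The three calls in main correspond to the state right after the pvc is edited, the state after controller-side expansion, and the state after kubelet has updated the pvc status.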
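Detailed storage expansion flow
The following uses ceph rbd as an example to walk through the expansion process in detail.
(1) Modify the pvc object and change the requested storage size (pvc.spec.resources.requests.storage);
(2) after the modification succeeds, external-resizer receives the update event for the pvc, finds that pvc.Spec.Resources.Requests.storage is larger than pvc.Status.Capacity.storage, and calls the ceph-csi component to perform the controller-side expansion;
(3) the ceph-csi component calls the ceph storage to expand the underlying storage;
(4) after the underlying storage has been expanded, the pv object's .Spec.Capacity.storage is updated to the expanded size (by external-resizer, once the ControllerExpandVolume call returns);
(5) during reconcile(), kubelet's volume manager finds that pv.Spec.Capacity.storage is larger than pvc.Status.Capacity.storage and calls the ceph-csi component to perform the node-side expansion;
(6) the ceph-csi component grows the file system of that storage on the node;
(7) once the expansion is done, kubelet updates pvc.Status.Capacity.storage to the expanded size.
The rest of this article analyzes the storage expansion code in kubelet. Controller-side expansion will be covered later, in the analysis of external-resizer.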
volumeManager.Run
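For storage expansion, two methods matter here:
(1) vm.desiredStateOfWorldPopulator.Run: finds and marks the volumes that need to be expanded;
(2) vm.reconciler.Run: triggers the expansion operation for the volumes that need it.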
func (vm *volumeManager) Run(sourcesReady config.SourcesReady, stopCh <-chan struct{}) {
	defer runtime.HandleCrash()
	go vm.desiredStateOfWorldPopulator.Run(sourcesReady, stopCh)
	klog.V(2).Infof("The desired_state_of_world populator starts")
	klog.Infof("Starting Kubelet Volume Manager")
	go vm.reconciler.Run(stopCh)
	metrics.Register(vm.actualStateOfWorld, vm.desiredStateOfWorld, vm.volumePluginMgr)
	if vm.kubeClient != nil {
		// start informer for CSIDriver
		vm.volumePluginMgr.Run(stopCh)
	}
	<-stopCh
	klog.Infof("Shutting down Kubelet Volume Manager")
}
1. vm.desiredStateOfWorldPopulator.Run
This mainly calls dswp.populatorLoop.
func (dswp *desiredStateOfWorldPopulator) Run(sourcesReady config.SourcesReady, stopCh <-chan struct{}) {
	// Wait for the completion of a loop that started after sources are all ready, then set hasAddedPods accordingly
	klog.Infof("Desired state populator starts to run")
	wait.PollUntil(dswp.loopSleepDuration, func() (bool, error) {
		done := sourcesReady.AllReady()
		dswp.populatorLoop()
		return done, nil
	}, stopCh)
	dswp.hasAddedPodsLock.Lock()
	dswp.hasAddedPods = true
	dswp.hasAddedPodsLock.Unlock()
	wait.Until(dswp.populatorLoop, dswp.loopSleepDuration, stopCh)
}
populatorLoop calls dswp.findAndAddNewPods.
func (dswp *desiredStateOfWorldPopulator) populatorLoop() {
	dswp.findAndAddNewPods()
	// findAndRemoveDeletedPods() calls out to the container runtime to
	// determine if the containers for a given pod are terminated. This is
	// an expensive operation, therefore we limit the rate that
	// findAndRemoveDeletedPods() is called independently of the main
	// populator loop.
	if time.Since(dswp.timeOfLastGetPodStatus) < dswp.getPodStatusRetryDuration {
		klog.V(5).Infof(
			"Skipping findAndRemoveDeletedPods(). Not permitted until %v (getPodStatusRetryDuration %v).",
			dswp.timeOfLastGetPodStatus.Add(dswp.getPodStatusRetryDuration),
			dswp.getPodStatusRetryDuration)
		return
	}
	dswp.findAndRemoveDeletedPods()
}
In findAndAddNewPods, the call to look at is dswp.processPodVolumes.
// Iterate through all pods and add to desired state of world if they don't
// exist but should
func (dswp *desiredStateOfWorldPopulator) findAndAddNewPods() {
	// Map unique pod name to outer volume name to MountedVolume.
	mountedVolumesForPod := make(map[volumetypes.UniquePodName]map[string]cache.MountedVolume)
	if utilfeature.DefaultFeatureGate.Enabled(features.ExpandInUsePersistentVolumes) {
		for _, mountedVolume := range dswp.actualStateOfWorld.GetMountedVolumes() {
			mountedVolumes, exist := mountedVolumesForPod[mountedVolume.PodName]
			if !exist {
				mountedVolumes = make(map[string]cache.MountedVolume)
				mountedVolumesForPod[mountedVolume.PodName] = mountedVolumes
			}
			mountedVolumes[mountedVolume.OuterVolumeSpecName] = mountedVolume
		}
	}
	processedVolumesForFSResize := sets.NewString()
	for _, pod := range dswp.podManager.GetPods() {
		if dswp.isPodTerminated(pod) {
			// Do not (re)add volumes for terminated pods
			continue
		}
		dswp.processPodVolumes(pod, mountedVolumesForPod, processedVolumesForFSResize)
	}
}
processPodVolumes mainly calls dswp.checkVolumeFSResize to mark the volumes that need to be expanded.
// processPodVolumes processes the volumes in the given pod and adds them to the
// desired state of the world.
func (dswp *desiredStateOfWorldPopulator) processPodVolumes(
	pod *v1.Pod,
	mountedVolumesForPod map[volumetypes.UniquePodName]map[string]cache.MountedVolume,
	processedVolumesForFSResize sets.String) {
	......
	expandInUsePV := utilfeature.DefaultFeatureGate.Enabled(features.ExpandInUsePersistentVolumes)
	// Process volume spec for each volume defined in pod
	for _, podVolume := range pod.Spec.Volumes {
		......
		if expandInUsePV {
			dswp.checkVolumeFSResize(pod, podVolume, pvc, volumeSpec,
				uniquePodName, mountedVolumesForPod, processedVolumesForFSResize)
		}
	}
	......
}
1.1 checkVolumeFSResize
Main logic:
(1) call volumeRequiresFSResize to decide whether expansion is needed;
(2) call dswp.actualStateOfWorld.MarkFSResizeRequired to mark the volume.
// checkVolumeFSResize checks whether a PVC mounted by the pod requires file
// system resize or not. If so, marks this volume as fsResizeRequired in ASW.
// - mountedVolumesForPod stores all mounted volumes in ASW, because online
//   volume resize only considers mounted volumes.
// - processedVolumesForFSResize stores all volumes we have checked in current loop,
//   because file system resize operation is a global operation for volume, so
//   we only need to check it once if more than one pod use it.
func (dswp *desiredStateOfWorldPopulator) checkVolumeFSResize(
	pod *v1.Pod,
	podVolume v1.Volume,
	pvc *v1.PersistentVolumeClaim,
	volumeSpec *volume.Spec,
	uniquePodName volumetypes.UniquePodName,
	mountedVolumesForPod map[volumetypes.UniquePodName]map[string]cache.MountedVolume,
	processedVolumesForFSResize sets.String) {
	if podVolume.PersistentVolumeClaim == nil {
		// Only PVC supports resize operation.
		return
	}
	uniqueVolumeName, exist := getUniqueVolumeName(uniquePodName, podVolume.Name, mountedVolumesForPod)
	if !exist {
		// Volume not exist in ASW, we assume it hasn't been mounted yet. If it needs resize,
		// it will be handled as offline resize(if it indeed hasn't been mounted yet),
		// or online resize in subsequent loop(after we confirm it has been mounted).
		return
	}
	if processedVolumesForFSResize.Has(string(uniqueVolumeName)) {
		// File system resize operation is a global operation for volume,
		// so we only need to check it once if more than one pod use it.
		return
	}
	// volumeSpec.ReadOnly is the value that determines if volume could be formatted when being mounted.
	// This is the same flag that determines filesystem resizing behaviour for offline resizing and hence
	// we should use it here. This value comes from Pod.spec.volumes.persistentVolumeClaim.readOnly.
	if volumeSpec.ReadOnly {
		// This volume is used as read only by this pod, we don't perform resize for read only volumes.
		klog.V(5).Infof("Skip file system resize check for volume %s in pod %s/%s "+
			"as the volume is mounted as readonly", podVolume.Name, pod.Namespace, pod.Name)
		return
	}
	if volumeRequiresFSResize(pvc, volumeSpec.PersistentVolume) {
		dswp.actualStateOfWorld.MarkFSResizeRequired(uniqueVolumeName, uniquePodName)
	}
	processedVolumesForFSResize.Insert(string(uniqueVolumeName))
}
1.1.1 volumeRequiresFSResize
Returns true when pv.Spec.Capacity.storage is larger than pvc.Status.Capacity.storage.
func volumeRequiresFSResize(pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume) bool {
	capacity := pvc.Status.Capacity[v1.ResourceStorage]
	requested := pv.Spec.Capacity[v1.ResourceStorage]
	return requested.Cmp(capacity) > 0
}
1.1.2 MarkFSResizeRequired
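Main logic:
(1) get the volumePlugin for the volume;
(2) call volumePlugin.RequiresFSResize() to check whether the plugin supports resize;
(3) if the plugin supports it, set the fsResizeRequired field of podObj to true. (reconcile later triggers the node-side resize operation when it sees that fsResizeRequired is true on podObj.)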
func (asw *actualStateOfWorld) MarkFSResizeRequired(
	volumeName v1.UniqueVolumeName,
	podName volumetypes.UniquePodName) {
	asw.Lock()
	defer asw.Unlock()
	volumeObj, volumeExists := asw.attachedVolumes[volumeName]
	if !volumeExists {
		klog.Warningf("MarkFSResizeRequired for volume %s failed as volume not exist", volumeName)
		return
	}
	podObj, podExists := volumeObj.mountedPods[podName]
	if !podExists {
		klog.Warningf("MarkFSResizeRequired for volume %s failed "+
			"as pod(%s) not exist", volumeName, podName)
		return
	}
	volumePlugin, err :=
		asw.volumePluginMgr.FindNodeExpandablePluginBySpec(podObj.volumeSpec)
	if err != nil || volumePlugin == nil {
		// Log and continue processing
		klog.Errorf(
			"MarkFSResizeRequired failed to find expandable plugin for pod %q volume: %q (volSpecName: %q)",
			podObj.podName,
			volumeObj.volumeName,
			podObj.volumeSpec.Name())
		return
	}
	if volumePlugin.RequiresFSResize() {
		if !podObj.fsResizeRequired {
			klog.V(3).Infof("PVC volume %s(OuterVolumeSpecName %s) of pod %s requires file system resize",
				volumeName, podObj.outerVolumeSpecName, podName)
			podObj.fsResizeRequired = true
		}
		asw.attachedVolumes[volumeName].mountedPods[podName] = podObj
	}
}
2. vm.reconciler.Run
func (rc *reconciler) Run(stopCh <-chan struct{}) {
	wait.Until(rc.reconciliationLoopFunc(), rc.loopSleepDuration, stopCh)
}
func (rc *reconciler) reconciliationLoopFunc() func() {
	return func() {
		rc.reconcile()
		// Sync the state with the reality once after all existing pods are added to the desired state from all sources.
		// Otherwise, the reconstruct process may clean up pods' volumes that are still in use because
		// desired state of world does not contain a complete list of pods.
		if rc.populatorHasAddedPods() && !rc.StatesHasBeenSynced() {
			klog.Infof("Reconciler: start to sync state")
			rc.sync()
		}
	}
}
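Part of rc.reconcile is omitted; only the expansion-related code is listed below.
Main expansion-related logic:
(1) call rc.actualStateOfWorld.PodExistsInVolume;
(2) check whether the error returned by the previous step is an IsFSResizeRequiredError; if so, call rc.operationExecutor.ExpandInUseVolume to trigger the expansion.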
func (rc *reconciler) reconcile() {
	......
	// Ensure volumes that should be attached/mounted are attached/mounted.
	for _, volumeToMount := range rc.desiredStateOfWorld.GetVolumesToMount() {
		volMounted, devicePath, err := rc.actualStateOfWorld.PodExistsInVolume(volumeToMount.PodName, volumeToMount.VolumeName)
		volumeToMount.DevicePath = devicePath
		if cache.IsVolumeNotAttachedError(err) {
			......
		} else if !volMounted || cache.IsRemountRequiredError(err) {
			......
		} else if cache.IsFSResizeRequiredError(err) &&
			utilfeature.DefaultFeatureGate.Enabled(features.ExpandInUsePersistentVolumes) {
			klog.V(4).Infof(volumeToMount.GenerateMsgDetailed("Starting operationExecutor.ExpandInUseVolume", ""))
			err := rc.operationExecutor.ExpandInUseVolume(
				volumeToMount.VolumeToMount,
				rc.actualStateOfWorld)
			if err != nil &&
				!nestedpendingoperations.IsAlreadyExists(err) &&
				!exponentialbackoff.IsExponentialBackoff(err) {
				// Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
				// Log all other errors.
				klog.Errorf(volumeToMount.GenerateErrorDetailed("operationExecutor.ExpandInUseVolume failed", err).Error())
			}
			if err == nil {
				klog.V(4).Infof(volumeToMount.GenerateMsgDetailed("operationExecutor.ExpandInUseVolume started", ""))
			}
		}
	}
	......
}
2.1 rc.actualStateOfWorld.PodExistsInVolume
Main expansion-related logic:
(1) get volumeObj from actualStateOfWorld;
(2) get podObj from volumeObj;
(3) check the fsResizeRequired field of podObj; if it is true, return newFsResizeRequiredError.
func (asw *actualStateOfWorld) PodExistsInVolume(
	podName volumetypes.UniquePodName,
	volumeName v1.UniqueVolumeName) (bool, string, error) {
	asw.RLock()
	defer asw.RUnlock()
	volumeObj, volumeExists := asw.attachedVolumes[volumeName]
	if !volumeExists {
		return false, "", newVolumeNotAttachedError(volumeName)
	}
	podObj, podExists := volumeObj.mountedPods[podName]
	if podExists {
		if podObj.remountRequired {
			return true, volumeObj.devicePath, newRemountRequiredError(volumeObj.volumeName, podObj.podName)
		}
		if podObj.fsResizeRequired &&
			utilfeature.DefaultFeatureGate.Enabled(features.ExpandInUsePersistentVolumes) {
			return true, volumeObj.devicePath, newFsResizeRequiredError(volumeObj.volumeName, podObj.podName)
		}
	}
	return podExists, volumeObj.devicePath, nil
}
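The handoff between the populator and the reconciler is easier to see in a stripped-down model: one side sets a flag in the actual state, the other side reads that flag back as an error and triggers the expansion. The sketch below is a toy version under stated assumptions: mountKey, actualState, reconcileOnce and errFSResizeRequired are hypothetical names, locking and feature gates are left out, and the expansion itself is a caller-supplied function.

```go
package main

import (
	"errors"
	"fmt"
)

// errFSResizeRequired stands in for the error built by newFsResizeRequiredError.
var errFSResizeRequired = errors.New("file system resize required")

// mountKey identifies one pod's mount of one volume (hypothetical type).
type mountKey struct{ volume, pod string }

// actualState is a toy actual state of world: it only tracks whether each
// known (volume, pod) mount still needs a file system resize.
type actualState struct {
	fsResizeRequired map[mountKey]bool
}

func newActualState(mounted ...mountKey) *actualState {
	s := &actualState{fsResizeRequired: make(map[mountKey]bool)}
	for _, k := range mounted {
		s.fsResizeRequired[k] = false
	}
	return s
}

// markFSResizeRequired is the populator side: set the flag, but only for
// mounts that are already known.
func (s *actualState) markFSResizeRequired(k mountKey) {
	if _, ok := s.fsResizeRequired[k]; ok {
		s.fsResizeRequired[k] = true
	}
}

// podExistsInVolume reports whether the mount is known and surfaces a set
// flag as errFSResizeRequired.
func (s *actualState) podExistsInVolume(k mountKey) (bool, error) {
	required, ok := s.fsResizeRequired[k]
	if ok && required {
		return true, errFSResizeRequired
	}
	return ok, nil
}

// markVolumeAsResized clears the flag after a successful expansion.
func (s *actualState) markVolumeAsResized(k mountKey) {
	if _, ok := s.fsResizeRequired[k]; ok {
		s.fsResizeRequired[k] = false
	}
}

// reconcileOnce is the reconciler side: it triggers expand only when the
// flag is set, and reports whether an expansion was triggered.
func reconcileOnce(s *actualState, k mountKey, expand func(mountKey) error) bool {
	_, err := s.podExistsInVolume(k)
	if !errors.Is(err, errFSResizeRequired) {
		return false
	}
	if expand(k) == nil {
		s.markVolumeAsResized(k)
	}
	return true
}

func main() {
	k := mountKey{volume: "pvc-demo", pod: "pod-a"}
	s := newActualState(k)
	noop := func(mountKey) error { return nil }

	fmt.Println(reconcileOnce(s, k, noop)) // nothing has been marked yet
	s.markFSResizeRequired(k)
	fmt.Println(reconcileOnce(s, k, noop)) // flag is set, expansion is triggered
	fmt.Println(reconcileOnce(s, k, noop)) // flag was cleared by the previous pass
}
```

If expand fails, the flag stays set, so the next reconcile pass retries, which matches the retry behaviour described in the kubelet comments.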
2.2 rc.operationExecutor.ExpandInUseVolume
Calls oe.operationGenerator.GenerateExpandInUseVolumeFunc for further processing.
func (oe *operationExecutor) ExpandInUseVolume(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater) error {
	generatedOperations, err := oe.operationGenerator.GenerateExpandInUseVolumeFunc(volumeToMount, actualStateOfWorld)
	if err != nil {
		return err
	}
	return oe.pendingOperations.Run(volumeToMount.VolumeName, "", generatedOperations)
}
In GenerateExpandInUseVolumeFunc, the call to look at is og.doOnlineExpansion.
func (og *operationGenerator) GenerateExpandInUseVolumeFunc(
	volumeToMount VolumeToMount,
	actualStateOfWorld ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) {
	volumePlugin, err :=
		og.volumePluginMgr.FindPluginBySpec(volumeToMount.VolumeSpec)
	if err != nil || volumePlugin == nil {
		return volumetypes.GeneratedOperations{}, volumeToMount.GenerateErrorDetailed("NodeExpandVolume.FindPluginBySpec failed", err)
	}
	fsResizeFunc := func() (error, error) {
		var resizeDone bool
		var simpleErr, detailedErr error
		resizeOptions := volume.NodeResizeOptions{
			VolumeSpec: volumeToMount.VolumeSpec,
		}
		attachableVolumePlugin, _ :=
			og.volumePluginMgr.FindAttachablePluginBySpec(volumeToMount.VolumeSpec)
		if attachableVolumePlugin != nil {
			volumeAttacher, _ := attachableVolumePlugin.NewAttacher()
			if volumeAttacher != nil {
				resizeOptions.CSIVolumePhase = volume.CSIVolumeStaged
				resizeOptions.DevicePath = volumeToMount.DevicePath
				dmp, err := volumeAttacher.GetDeviceMountPath(volumeToMount.VolumeSpec)
				if err != nil {
					return volumeToMount.GenerateError("NodeExpandVolume.GetDeviceMountPath failed", err)
				}
				resizeOptions.DeviceMountPath = dmp
				resizeDone, simpleErr, detailedErr = og.doOnlineExpansion(volumeToMount, actualStateOfWorld, resizeOptions)
				if simpleErr != nil || detailedErr != nil {
					return simpleErr, detailedErr
				}
				if resizeDone {
					return nil, nil
				}
			}
		}
		// if we are here that means volume plugin does not support attach interface
		volumeMounter, newMounterErr := volumePlugin.NewMounter(
			volumeToMount.VolumeSpec,
			volumeToMount.Pod,
			volume.VolumeOptions{})
		if newMounterErr != nil {
			return volumeToMount.GenerateError("NodeExpandVolume.NewMounter initialization failed", newMounterErr)
		}
		resizeOptions.DeviceMountPath = volumeMounter.GetPath()
		resizeOptions.CSIVolumePhase = volume.CSIVolumePublished
		resizeDone, simpleErr, detailedErr = og.doOnlineExpansion(volumeToMount, actualStateOfWorld, resizeOptions)
		if simpleErr != nil || detailedErr != nil {
			return simpleErr, detailedErr
		}
		if resizeDone {
			return nil, nil
		}
		// This is a placeholder error - we should NEVER reach here.
		err = fmt.Errorf("volume resizing failed for unknown reason")
		return volumeToMount.GenerateError("NodeExpandVolume.NodeExpandVolume failed to resize volume", err)
	}
	eventRecorderFunc := func(err *error) {
		if *err != nil {
			og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeWarning, kevents.VolumeResizeFailed, (*err).Error())
		}
	}
	return volumetypes.GeneratedOperations{
		OperationName:     "volume_fs_resize",
		OperationFunc:     fsResizeFunc,
		EventRecorderFunc: eventRecorderFunc,
		CompleteFunc:      util.OperationCompleteHook(util.GetFullQualifiedPluginNameForVolume(volumePlugin.GetPluginName(), volumeToMount.VolumeSpec), "volume_fs_resize"),
	}, nil
}
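The control flow of fsResizeFunc above boils down to "try the staged path first if an attacher exists, then fall back to the published path". A minimal sketch of just that ordering follows. The names phase, expandFunc and expandInUse are hypothetical, the paths are placeholder strings, and the real function carries more state (device path, event recording, detailed errors) than is modelled here.

```go
package main

import (
	"errors"
	"fmt"
)

// phase mirrors the two CSI volume phases used by the resize options.
type phase string

const (
	staged    phase = "staged"    // volume.CSIVolumeStaged in the kubelet code
	published phase = "published" // volume.CSIVolumePublished in the kubelet code
)

// expandFunc is a stand-in for doOnlineExpansion: it reports whether the
// resize was completed at the given phase and path.
type expandFunc func(p phase, path string) (done bool, err error)

// expandInUse tries the staging path first when an attacher is available,
// then falls back to the publish path, in the same order as fsResizeFunc.
func expandInUse(hasAttacher bool, stagingPath, publishPath string, expand expandFunc) error {
	if hasAttacher {
		done, err := expand(staged, stagingPath)
		if err != nil {
			return err
		}
		if done {
			return nil
		}
	}
	done, err := expand(published, publishPath)
	if err != nil {
		return err
	}
	if done {
		return nil
	}
	// Same placeholder error as the original: not expected to be reached.
	return errors.New("volume resizing failed for unknown reason")
}

func main() {
	// A hypothetical driver without stage/unstage support: the staged attempt
	// reports "not done", so the published path is tried next.
	var calls []phase
	expand := func(p phase, path string) (bool, error) {
		calls = append(calls, p)
		return p == published, nil
	}
	err := expandInUse(true, "/staging/path", "/publish/path", expand)
	fmt.Println(calls, err)
}
```

The "not done, no error" result from the staged attempt is what lets a driver without stage/unstage capability defer the resize to the publish path.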
og.doOnlineExpansion
doOnlineExpansion mainly calls og.nodeExpandVolume.
func (og *operationGenerator) doOnlineExpansion(volumeToMount VolumeToMount,
	actualStateOfWorld ActualStateOfWorldMounterUpdater,
	resizeOptions volume.NodeResizeOptions) (bool, error, error) {
	resizeDone, err := og.nodeExpandVolume(volumeToMount, resizeOptions)
	if err != nil {
		klog.Errorf("NodeExpandVolume.NodeExpandVolume failed : %v", err)
		e1, e2 := volumeToMount.GenerateError("NodeExpandVolume.NodeExpandVolume failed", err)
		return false, e1, e2
	}
	if resizeDone {
		markFSResizedErr := actualStateOfWorld.MarkVolumeAsResized(volumeToMount.PodName, volumeToMount.VolumeName)
		if markFSResizedErr != nil {
			// On failure, return error. Caller will log and retry.
			e1, e2 := volumeToMount.GenerateError("NodeExpandVolume.MarkVolumeAsResized failed", markFSResizedErr)
			return false, e1, e2
		}
		return true, nil, nil
	}
	return false, nil, nil
}
og.nodeExpandVolume
Main logic of og.nodeExpandVolume:
(1) get the expandable plugin;
(2) get the pv and pvc objects;
(3) when pv.Spec.Capacity is larger than pvc.Status.Capacity, call expandableVolumePlugin.NodeExpand to perform the expansion;
(4) once the expansion is done, call util.MarkFSResizeFinished to update PVC.Status.Capacity.storage to the expanded size.
func (og *operationGenerator) nodeExpandVolume(volumeToMount VolumeToMount, rsOpts volume.NodeResizeOptions) (bool, error) {
	if !utilfeature.DefaultFeatureGate.Enabled(features.ExpandPersistentVolumes) {
		klog.V(4).Infof("Resizing is not enabled for this volume %s", volumeToMount.VolumeName)
		return true, nil
	}
	if volumeToMount.VolumeSpec != nil &&
		volumeToMount.VolumeSpec.InlineVolumeSpecForCSIMigration {
		klog.V(4).Infof("This volume %s is a migrated inline volume and is not resizable", volumeToMount.VolumeName)
		return true, nil
	}
	// Get expander, if possible
	expandableVolumePlugin, _ :=
		og.volumePluginMgr.FindNodeExpandablePluginBySpec(volumeToMount.VolumeSpec)
	if expandableVolumePlugin != nil &&
		expandableVolumePlugin.RequiresFSResize() &&
		volumeToMount.VolumeSpec.PersistentVolume != nil {
		pv := volumeToMount.VolumeSpec.PersistentVolume
		pvc, err := og.kubeClient.CoreV1().PersistentVolumeClaims(pv.Spec.ClaimRef.Namespace).Get(pv.Spec.ClaimRef.Name, metav1.GetOptions{})
		if err != nil {
			// Return error rather than leave the file system un-resized, caller will log and retry
			return false, fmt.Errorf("MountVolume.NodeExpandVolume get PVC failed : %v", err)
		}
		pvcStatusCap := pvc.Status.Capacity[v1.ResourceStorage]
		pvSpecCap := pv.Spec.Capacity[v1.ResourceStorage]
		if pvcStatusCap.Cmp(pvSpecCap) < 0 {
			// File system resize was requested, proceed
			klog.V(4).Infof(volumeToMount.GenerateMsgDetailed("MountVolume.NodeExpandVolume entering", fmt.Sprintf("DevicePath %q", volumeToMount.DevicePath)))
			if volumeToMount.VolumeSpec.ReadOnly {
				simpleMsg, detailedMsg := volumeToMount.GenerateMsg("MountVolume.NodeExpandVolume failed", "requested read-only file system")
				klog.Warningf(detailedMsg)
				og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeWarning, kevents.FileSystemResizeFailed, simpleMsg)
				og.recorder.Eventf(pvc, v1.EventTypeWarning, kevents.FileSystemResizeFailed, simpleMsg)
				return true, nil
			}
			rsOpts.VolumeSpec = volumeToMount.VolumeSpec
			rsOpts.NewSize = pvSpecCap
			rsOpts.OldSize = pvcStatusCap
			resizeDone, resizeErr := expandableVolumePlugin.NodeExpand(rsOpts)
			if resizeErr != nil {
				return false, fmt.Errorf("MountVolume.NodeExpandVolume failed : %v", resizeErr)
			}
			// Volume resizing is not done but it did not error out. This could happen if a CSI volume
			// does not have node stage_unstage capability but was asked to resize the volume before
			// node publish. In which case - we must retry resizing after node publish.
			if !resizeDone {
				return false, nil
			}
			simpleMsg, detailedMsg := volumeToMount.GenerateMsg("MountVolume.NodeExpandVolume succeeded", "")
			og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeNormal, kevents.FileSystemResizeSuccess, simpleMsg)
			og.recorder.Eventf(pvc, v1.EventTypeNormal, kevents.FileSystemResizeSuccess, simpleMsg)
			klog.Infof(detailedMsg)
			// File system resize succeeded, now update the PVC's Capacity to match the PV's
			err = util.MarkFSResizeFinished(pvc, pvSpecCap, og.kubeClient)
			if err != nil {
				// On retry, NodeExpandVolume will be called again but do nothing
				return false, fmt.Errorf("MountVolume.NodeExpandVolume update PVC status failed : %v", err)
			}
			return true, nil
		}
	}
	return true, nil
}
expandableVolumePlugin.NodeExpand
NodeExpand calls util.CheckVolumeModeFilesystem to check whether the volumeMode is block. If it is block, no node-side expansion is needed.
func (c *csiPlugin) NodeExpand(resizeOptions volume.NodeResizeOptions) (bool, error) {
	klog.V(4).Infof(log("Expander.NodeExpand(%s)", resizeOptions.DeviceMountPath))
	csiSource, err := getCSISourceFromSpec(resizeOptions.VolumeSpec)
	if err != nil {
		return false, errors.New(log("Expander.NodeExpand failed to get CSI persistent source: %v", err))
	}
	csClient, err := newCsiDriverClient(csiDriverName(csiSource.Driver))
	if err != nil {
		return false, err
	}
	fsVolume, err := util.CheckVolumeModeFilesystem(resizeOptions.VolumeSpec)
	if err != nil {
		return false, errors.New(log("Expander.NodeExpand failed to check VolumeMode of source: %v", err))
	}
	return c.nodeExpandWithClient(resizeOptions, csiSource, csClient, fsVolume)
}
MarkFSResizeFinished
Updates the PVC object, setting .Status.Capacity.storage to the expanded size.
// MarkFSResizeFinished marks file system resizing as done
func MarkFSResizeFinished(
	pvc *v1.PersistentVolumeClaim,
	newSize resource.Quantity,
	kubeClient clientset.Interface) error {
	newPVC := pvc.DeepCopy()
	newPVC.Status.Capacity[v1.ResourceStorage] = newSize
	newPVC = MergeResizeConditionOnPVC(newPVC, []v1.PersistentVolumeClaimCondition{})
	_, err := PatchPVCStatus(pvc /*oldPVC*/, newPVC, kubeClient)
	return err
}
// PatchPVCStatus updates PVC status using PATCH verb
// Don't use Update because this can be called from kubelet and if kubelet has an older client its
// Updates will overwrite new fields. And to avoid writing to a stale object, add ResourceVersion
// to the patch so that Patch will fail if the patch's RV != actual up-to-date RV like Update would
func PatchPVCStatus(
	oldPVC *v1.PersistentVolumeClaim,
	newPVC *v1.PersistentVolumeClaim,
	kubeClient clientset.Interface) (*v1.PersistentVolumeClaim, error) {
	patchBytes, err := createPVCPatch(oldPVC, newPVC)
	if err != nil {
		return nil, fmt.Errorf("patchPVCStatus failed to patch PVC %q: %v", oldPVC.Name, err)
	}
	updatedClaim, updateErr := kubeClient.CoreV1().PersistentVolumeClaims(oldPVC.Namespace).
		Patch(oldPVC.Name, types.StrategicMergePatchType, patchBytes, "status")
	if updateErr != nil {
		return nil, fmt.Errorf("patchPVCStatus failed to patch PVC %q: %v", oldPVC.Name, updateErr)
	}
	return updatedClaim, nil
}
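The comment on PatchPVCStatus explains why the patch carries a ResourceVersion: a patch computed from a stale object should be rejected rather than silently applied. To make the shape of such a patch concrete, here is a hand-built example using only encoding/json. It is a sketch, not what createPVCPatch does internally (that function derives the patch from the old and new objects); buildStatusPatch and its arguments are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// buildStatusPatch hand-builds a merge-style patch body that sets
// status.capacity.storage and pins metadata.resourceVersion, so the API
// server can reject it if the object has changed in the meantime.
func buildStatusPatch(resourceVersion, newSize string) ([]byte, error) {
	patch := map[string]interface{}{
		"metadata": map[string]interface{}{
			"resourceVersion": resourceVersion,
		},
		"status": map[string]interface{}{
			"capacity": map[string]interface{}{
				"storage": newSize,
			},
		},
	}
	return json.Marshal(patch)
}

func main() {
	// "12345" and "2Gi" are made-up example values.
	patch, err := buildStatusPatch("12345", "2Gi")
	if err != nil {
		fmt.Println("marshal failed:", err)
		return
	}
	fmt.Println(string(patch))
}
```

Only the fields that change are sent, which is the point of using PATCH here: fields unknown to an older kubelet client are left untouched.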
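Summary
Storage expansion consists of two major steps: a controller-side operation and a node-side operation. The controller-side operation is carried out by external-resizer, which calls ceph through the csi plugin, while the node-side operation is carried out by kubelet.
What controller-side storage expansion does
It expands the underlying storage. For ceph rbd, for example, the rbd image in the ceph cluster is expanded.
What node-side storage expansion does
It performs the corresponding operations on the node where the pod runs, so that the node becomes aware that the storage has been expanded. For a ceph rbd filesystem volume, for example, the file system resize command is invoked on the node to grow the file system.
Some storage types, such as cephfs, do not need a node-side expansion step.
Rough storage expansion process
(1) Change pvc.Spec.Resources.Requests.storage to trigger the expansion.
(2) Controller-side expansion: external-resizer watches pvc objects. When it finds that pvc.Spec.Resources.Requests.storage is larger than pvc.Status.Capacity.storage, it calls the csi plugin's ControllerExpandVolume method to expand the underlying storage, and then updates pv.Spec.Capacity.storage.
(3) Node-side expansion: kubelet finds that pv.Spec.Capacity.storage is larger than pvc.Status.Capacity.storage, so it calls the csi node-side expansion to grow the file system on the node. On success, kubelet updates pvc.Status.Capacity.storage.
Overall storage expansion flow
As shown in the figure, the overall storage expansion steps are as follows:
(1) Modify the pvc object and change the requested storage size (pvc.spec.resources.requests.storage);
(2) after the modification succeeds, external-resizer receives the update event for the pvc, finds that pvc.Spec.Resources.Requests.storage is larger than pvc.Status.Capacity.storage, and calls the ceph-csi component to perform the controller-side expansion;
(3) the ceph-csi component calls the ceph storage to expand the underlying storage;
(4) after the underlying storage has been expanded, the pv object's .Spec.Capacity.storage is updated to the expanded size (by external-resizer, once the ControllerExpandVolume call returns);
(5) during reconcile(), kubelet's volume manager finds that pv.Spec.Capacity.storage is larger than pvc.Status.Capacity.storage and calls the ceph-csi component to perform the node-side expansion;
(6) the ceph-csi component grows the file system of that storage on the node;
(7) once the expansion is done, kubelet updates pvc.Status.Capacity.storage to the expanded size.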
node-side (kubelet) storage expansion call chain
vm.reconciler.Run --> rc.operationExecutor.ExpandInUseVolume --> oe.operationGenerator.GenerateExpandInUseVolumeFunc --> og.doOnlineExpansion --> og.nodeExpandVolume --> expander.NodeExpand (pkg/volume/csi/expander.go) --> csClient.NodeExpandVolume