
kubelet volume manager source code analysis

良凯尔
Published: June 6, 2021

Kubernetes ceph-csi analysis series - table of contents:

https://xie.infoq.cn/article/4b1d3e32f124307a49cd9c1e3

Based on tag v1.17.4

https://github.com/kubernetes/kubernetes/releases/tag/v1.17.4

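Overview

The volume manager lives in kubelet. Its main job is to manage volume attach/detach and mount/unmount operations. Attach/detach is the same job the AD controller does, and a kubelet startup flag decides which component performs it (covered in detail later).

Introduction

Mounting storage into a container takes two major steps:

(1) attach;

(2) mount.

Removing a container's storage mount also takes two major steps:

(1) umount;

(2) detach.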


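Attach/detach can be performed either by kube-controller-manager (the AD controller) or by the volume manager in kubelet. The kubelet startup flag --enable-controller-attach-detach decides which one does it. Mount/unmount is performed only by the volume manager in kubelet.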
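The ordering above (attach before mount, umount before detach) can be illustrated with a toy state machine. This is a minimal sketch under our own assumptions. The names volume, attach, mount, umount and detach are hypothetical and do not exist in kubelet. The real volume manager also splits "mount" into a device (global) mount plus a per-pod mount. The sketch only shows that out-of-order operations are refused, such as mounting an unattached volume or detaching a volume that is still mounted.

```go
package main

import (
	"errors"
	"fmt"
)

// volumeState is a hypothetical, simplified state of one volume on one node.
type volumeState int

const (
	detached volumeState = iota
	attached
	mounted
)

var errWrongOrder = errors.New("operation not allowed in the current state")

// volume enforces attach -> mount when setting up and umount -> detach when tearing down.
type volume struct{ state volumeState }

func (v *volume) attach() error { return v.move(detached, attached) }
func (v *volume) mount() error  { return v.move(attached, mounted) }
func (v *volume) umount() error { return v.move(mounted, attached) }
func (v *volume) detach() error { return v.move(attached, detached) }

// move performs the transition only if the volume is currently in state from.
func (v *volume) move(from, to volumeState) error {
	if v.state != from {
		return errWrongOrder
	}
	v.state = to
	return nil
}

func main() {
	v := &volume{}
	fmt.Println(v.mount() == errWrongOrder)  // true: cannot mount before attach
	fmt.Println(v.attach(), v.mount())       // <nil> <nil>
	fmt.Println(v.detach() == errWrongOrder) // true: cannot detach while mounted
	fmt.Println(v.umount(), v.detach())      // <nil> <nil>
}
```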

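VolumeManager interface

(1) It is the component inside kubelet that gets storage ready for pods. Its main job is mount/unmount; attach/detach is optional.

(2) Volume operations only happen after a pod has been scheduled to this node, so kubelet triggers them. Strictly speaking, the trigger is the pod manager inside kubelet: the storage declared in the pod specs held by the pod manager starts the mount operations.

(3) Kubelet watches the pods scheduled to this node and caches them in the pod manager. VolumeManager reads these pods from the pod manager and resolves the PV/PVC objects they reference. It then works out the concrete attach/detach and mount/unmount operations and calls the volume plugin to carry them out.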


// pkg/kubelet/volumemanager/volume_manager.go
// VolumeManager runs a set of asynchronous loops that figure out which volumes
// need to be attached/mounted/unmounted/detached based on the pods scheduled on
// this node and makes it so.
type VolumeManager interface {
	// Starts the volume manager and all the asynchronous loops that it controls
	Run(sourcesReady config.SourcesReady, stopCh <-chan struct{})

	// WaitForAttachAndMount processes the volumes referenced in the specified
	// pod and blocks until they are all attached and mounted (reflected in
	// actual state of the world).
	// An error is returned if all volumes are not attached and mounted within
	// the duration defined in podAttachAndMountTimeout.
	WaitForAttachAndMount(pod *v1.Pod) error

	// GetMountedVolumesForPod returns a VolumeMap containing the volumes
	// referenced by the specified pod that are successfully attached and
	// mounted. The key in the map is the OuterVolumeSpecName (i.e.
	// pod.Spec.Volumes[x].Name). It returns an empty VolumeMap if pod has no
	// volumes.
	GetMountedVolumesForPod(podName types.UniquePodName) container.VolumeMap

	// GetExtraSupplementalGroupsForPod returns a list of the extra
	// supplemental groups for the Pod. These extra supplemental groups come
	// from annotations on persistent volumes that the pod depends on.
	GetExtraSupplementalGroupsForPod(pod *v1.Pod) []int64

	// GetVolumesInUse returns a list of all volumes that implement the volume.Attacher
	// interface and are currently in use according to the actual and desired
	// state of the world caches. A volume is considered "in use" as soon as it
	// is added to the desired state of world, indicating it *should* be
	// attached to this node and remains "in use" until it is removed from both
	// the desired state of the world and the actual state of the world, or it
	// has been unmounted (as indicated in actual state of world).
	GetVolumesInUse() []v1.UniqueVolumeName

	// ReconcilerStatesHasBeenSynced returns true only after the actual states in reconciler
	// has been synced at least once after kubelet starts so that it is safe to update mounted
	// volume list retrieved from actual state.
	ReconcilerStatesHasBeenSynced() bool

	// VolumeIsAttached returns true if the given volume is attached to this
	// node.
	VolumeIsAttached(volumeName v1.UniqueVolumeName) bool

	// Marks the specified volume as having successfully been reported as "in
	// use" in the nodes's volume status.
	MarkVolumesAsReportedInUse(volumesReportedAsInUse []v1.UniqueVolumeName)
}

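Two key structs

(1) desiredStateOfWorld: the volume mount state that should be reached, DSW for short. Suppose a newly created pod that uses a volume is scheduled to node NodeA. From the AD controller's point of view, DSW then records that NodeA should have that volume attached, ready to be mounted by this pod.

(2) actualStateOfWorld: the volume mount state that actually exists, ASW for short. The actual state does not always match the desired state. For example, a pod may just have been scheduled to the node while the volume it needs is not attached yet.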

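actualStateOfWorld-related structs

actualStateOfWorld

The struct that holds the actual volume mount state.

actualStateOfWorld: the actual volume mount state, ASW for short. It contains the volumes that have been successfully attached to the node and, for each volume, the pods that have successfully mounted it.

Its key field is attachedVolumes. This map's key is the name of a volume successfully attached to the node, and its value holds information about that attached volume.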


// pkg/kubelet/volumemanager/cache/actual_state_of_world.go
type actualStateOfWorld struct {
	// nodeName is the name of this node. This value is passed to Attach/Detach
	nodeName types.NodeName

	// attachedVolumes is a map containing the set of volumes the kubelet volume
	// manager believes to be successfully attached to this node. Volume types
	// that do not implement an attacher interface are assumed to be in this
	// state by default.
	// The key in this map is the name of the volume and the value is an object
	// containing more information about the attached volume.
	attachedVolumes map[v1.UniqueVolumeName]attachedVolume

	// volumePluginMgr is the volume plugin manager used to create volume
	// plugin objects.
	volumePluginMgr *volume.VolumePluginMgr
	sync.RWMutex
}

attachedVolume

Its key field is mountedPods. This map's key is the pod name, and its value is a mountedPod object describing a pod that has successfully mounted this volume.


// pkg/kubelet/volumemanager/cache/actual_state_of_world.go

// attachedVolume represents a volume the kubelet volume manager believes to be
// successfully attached to a node it is managing. Volume types that do not
// implement an attacher are assumed to be in this state.
type attachedVolume struct {
	// volumeName contains the unique identifier for this volume.
	volumeName v1.UniqueVolumeName

	// mountedPods is a map containing the set of pods that this volume has been
	// successfully mounted to. The key in this map is the name of the pod and
	// the value is a mountedPod object containing more information about the
	// pod.
	mountedPods map[volumetypes.UniquePodName]mountedPod

	// spec is the volume spec containing the specification for this volume.
	// Used to generate the volume plugin object, and passed to plugin methods.
	// In particular, the Unmount method uses spec.Name() as the volumeSpecName
	// in the mount path:
	// /var/lib/kubelet/pods/{podUID}/volumes/{escapeQualifiedPluginName}/{volumeSpecName}/
	spec *volume.Spec

	// pluginName is the Unescaped Qualified name of the volume plugin used to
	// attach and mount this volume. It is stored separately in case the full
	// volume spec (everything except the name) can not be reconstructed for a
	// volume that should be unmounted (which would be the case for a mount path
	// read from disk without a full volume spec).
	pluginName string

	// pluginIsAttachable indicates the volume plugin used to attach and mount
	// this volume implements the volume.Attacher interface
	pluginIsAttachable bool

	// globallyMounted indicates that the volume is mounted to the underlying
	// device at a global mount point. This global mount point must be unmounted
	// prior to detach.
	globallyMounted bool

	// devicePath contains the path on the node where the volume is attached for
	// attachable volumes
	devicePath string

	// deviceMountPath contains the path on the node where the device should
	// be mounted after it is attached.
	deviceMountPath string
}

mountedPod

Information about a pod that has mounted the volume.


// pkg/kubelet/volumemanager/cache/actual_state_of_world.go

// The mountedPod object represents a pod for which the kubelet volume manager
// believes the underlying volume has been successfully been mounted.
type mountedPod struct {
	// the name of the pod
	podName volumetypes.UniquePodName

	// the UID of the pod
	podUID types.UID

	// mounter used to mount
	mounter volume.Mounter

	// mapper used to block volumes support
	blockVolumeMapper volume.BlockVolumeMapper

	// spec is the volume spec containing the specification for this volume.
	// Used to generate the volume plugin object, and passed to plugin methods.
	// In particular, the Unmount method uses spec.Name() as the volumeSpecName
	// in the mount path:
	// /var/lib/kubelet/pods/{podUID}/volumes/{escapeQualifiedPluginName}/{volumeSpecName}/
	volumeSpec *volume.Spec

	// outerVolumeSpecName is the volume.Spec.Name() of the volume as referenced
	// directly in the pod. If the volume was referenced through a persistent
	// volume claim, this contains the volume.Spec.Name() of the persistent
	// volume claim
	outerVolumeSpecName string

	// remountRequired indicates the underlying volume has been successfully
	// mounted to this pod but it should be remounted to reflect changes in the
	// referencing pod.
	// Atomically updating volumes depend on this to update the contents of the
	// volume. All volume mounting calls should be idempotent so a second mount
	// call for volumes that do not need to update contents should not fail.
	remountRequired bool

	// volumeGidValue contains the value of the GID annotation, if present.
	volumeGidValue string

	// fsResizeRequired indicates the underlying volume has been successfully
	// mounted to this pod but its size has been expanded after that.
	fsResizeRequired bool
}
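The nested maps fit together as volume name -> attachedVolume -> pod name -> mountedPod. A heavily simplified, hypothetical mirror of actualStateOfWorld shows this, guarded by an embedded sync.RWMutex like the real struct. The type simpleASW and the methods markAttached, markMounted and podExistsInVolume are illustrative names only. The real cache uses typed keys, and its PodExistsInVolume also returns a device path and typed errors.

```go
package main

import (
	"fmt"
	"sync"
)

// mountedPod and attachedVolume keep only a couple of the real fields.
type mountedPod struct {
	podName string
}

type attachedVolume struct {
	volumeName      string
	globallyMounted bool
	mountedPods     map[string]mountedPod // key: pod name
}

// simpleASW: volume name -> attachedVolume -> pod name -> mountedPod.
type simpleASW struct {
	sync.RWMutex
	attachedVolumes map[string]attachedVolume // key: volume name
}

// markAttached records that volume is attached to this node.
func (asw *simpleASW) markAttached(volume string) {
	asw.Lock()
	defer asw.Unlock()
	if _, ok := asw.attachedVolumes[volume]; !ok {
		asw.attachedVolumes[volume] = attachedVolume{
			volumeName:  volume,
			mountedPods: map[string]mountedPod{},
		}
	}
}

// markMounted records that pod has mounted volume; the volume must be attached first.
func (asw *simpleASW) markMounted(volume, pod string) error {
	asw.Lock()
	defer asw.Unlock()
	v, ok := asw.attachedVolumes[volume]
	if !ok {
		return fmt.Errorf("volume %q is not attached", volume)
	}
	v.mountedPods[pod] = mountedPod{podName: pod} // the map is shared, so the stored entry sees this
	return nil
}

// podExistsInVolume reports whether volume is mounted into pod.
func (asw *simpleASW) podExistsInVolume(volume, pod string) bool {
	asw.RLock()
	defer asw.RUnlock()
	_, ok := asw.attachedVolumes[volume].mountedPods[pod] // unknown volume -> nil map -> false
	return ok
}

func main() {
	asw := &simpleASW{attachedVolumes: map[string]attachedVolume{}}
	fmt.Println(asw.markMounted("pvc-1", "pod-a")) // error: the volume is not attached yet
	asw.markAttached("pvc-1")
	if err := asw.markMounted("pvc-1", "pod-a"); err != nil {
		panic(err)
	}
	fmt.Println(asw.podExistsInVolume("pvc-1", "pod-a"), asw.podExistsInVolume("pvc-1", "pod-b"))
}
```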

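desiredStateOfWorld-related structs

desiredStateOfWorld

The struct that holds the desired volume mount state.

desiredStateOfWorld: the desired volume mount state, DSW for short. It contains the volumes that should be attached to the node and, for each volume, the pods that should mount it.

Its key field is volumesToMount. This map's key is the name of a volume that should be attached to this node, and its value holds information about that volume.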


// pkg/kubelet/volumemanager/cache/desired_state_of_world.go
type desiredStateOfWorld struct {
	// volumesToMount is a map containing the set of volumes that should be
	// attached to this node and mounted to the pods referencing it. The key in
	// the map is the name of the volume and the value is a volume object
	// containing more information about the volume.
	volumesToMount map[v1.UniqueVolumeName]volumeToMount
	// volumePluginMgr is the volume plugin manager used to create volume
	// plugin objects.
	volumePluginMgr *volume.VolumePluginMgr
	// podErrors are errors caught by desiredStateOfWorldPopulator about volumes for a given pod.
	podErrors map[types.UniquePodName]sets.String

	sync.RWMutex
}

volumeToMount

Its key field is podsToMount. This map's key is the pod name, and its value is a podToMount object describing a pod that should mount this volume.


// pkg/kubelet/volumemanager/cache/desired_state_of_world.go

// The volume object represents a volume that should be attached to this node,
// and mounted to podsToMount.
type volumeToMount struct {
	// volumeName contains the unique identifier for this volume.
	volumeName v1.UniqueVolumeName

	// podsToMount is a map containing the set of pods that reference this
	// volume and should mount it once it is attached. The key in the map is
	// the name of the pod and the value is a pod object containing more
	// information about the pod.
	podsToMount map[types.UniquePodName]podToMount

	// pluginIsAttachable indicates that the plugin for this volume implements
	// the volume.Attacher interface
	pluginIsAttachable bool

	// pluginIsDeviceMountable indicates that the plugin for this volume implements
	// the volume.DeviceMounter interface
	pluginIsDeviceMountable bool

	// volumeGidValue contains the value of the GID annotation, if present.
	volumeGidValue string

	// reportedInUse indicates that the volume was successfully added to the
	// VolumesInUse field in the node's status.
	reportedInUse bool

	// desiredSizeLimit indicates the desired upper bound on the size of the volume
	// (if so implemented)
	desiredSizeLimit *resource.Quantity
}

podToMount

The podToMount struct mainly records pod information.


// pkg/kubelet/volumemanager/cache/desired_state_of_world.go

// The pod object represents a pod that references the underlying volume and
// should mount it once it is attached.
type podToMount struct {
	// podName contains the name of this pod.
	podName types.UniquePodName

	// Pod to mount the volume to. Used to create NewMounter.
	pod *v1.Pod

	// volume spec containing the specification for this volume. Used to
	// generate the volume plugin object, and passed to plugin methods.
	// For non-PVC volumes this is the same as defined in the pod object. For
	// PVC volumes it is from the dereferenced PV object.
	volumeSpec *volume.Spec

	// outerVolumeSpecName is the volume.Spec.Name() of the volume as referenced
	// directly in the pod. If the volume was referenced through a persistent
	// volume claim, this contains the volume.Spec.Name() of the persistent
	// volume claim
	outerVolumeSpecName string
}
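Below is a matching simplified sketch of desiredStateOfWorld. It assumes plain string keys and uses the hypothetical type simpleDSW. It models one behavior that matters for detach later, based on our reading of DeletePodFromVolume in the v1.17 source: once the last pod is removed from a volume, the volume itself is dropped from volumesToMount.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// podToMount and volumeToMount keep only the fields needed for the example.
type podToMount struct{ podName string }

type volumeToMount struct {
	volumeName  string
	podsToMount map[string]podToMount // key: pod name
}

// simpleDSW: volume name -> volumeToMount -> pod name -> podToMount.
type simpleDSW struct {
	sync.RWMutex
	volumesToMount map[string]volumeToMount // key: volume name
}

// addPodToVolume records that pod should mount volume.
func (dsw *simpleDSW) addPodToVolume(pod, volume string) {
	dsw.Lock()
	defer dsw.Unlock()
	v, ok := dsw.volumesToMount[volume]
	if !ok {
		v = volumeToMount{volumeName: volume, podsToMount: map[string]podToMount{}}
		dsw.volumesToMount[volume] = v
	}
	v.podsToMount[pod] = podToMount{podName: pod}
}

// deletePodFromVolume removes pod from volume and drops the volume once no pod wants it.
func (dsw *simpleDSW) deletePodFromVolume(pod, volume string) {
	dsw.Lock()
	defer dsw.Unlock()
	v, ok := dsw.volumesToMount[volume]
	if !ok {
		return
	}
	delete(v.podsToMount, pod)
	if len(v.podsToMount) == 0 {
		delete(dsw.volumesToMount, volume)
	}
}

// volumeNames lists the volumes still wanted on this node, sorted for stable output.
func (dsw *simpleDSW) volumeNames() []string {
	dsw.RLock()
	defer dsw.RUnlock()
	names := make([]string, 0, len(dsw.volumesToMount))
	for name := range dsw.volumesToMount {
		names = append(names, name)
	}
	sort.Strings(names)
	return names
}

func main() {
	dsw := &simpleDSW{volumesToMount: map[string]volumeToMount{}}
	dsw.addPodToVolume("pod-a", "pvc-1")
	dsw.addPodToVolume("pod-b", "pvc-1")
	dsw.deletePodFromVolume("pod-a", "pvc-1")
	fmt.Println(dsw.volumeNames()) // [pvc-1]: pod-b still needs it
	dsw.deletePodFromVolume("pod-b", "pvc-1")
	fmt.Println(dsw.volumeNames()) // []
}
```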

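Entry point analysis

kubelet manages volumes based on two different states:

(1) DesiredStateOfWorld: how volumes are expected to be mounted, the desired state for short. It is derived from the pod objects.

(2) ActualStateOfWorld: how volumes are actually mounted, the actual state for short. Attachments performed by the AD controller are confirmed from node.Status.VolumesAttached. The actual state is updated as the reconciler completes operations.

The Run method mainly starts two goroutines:

(1) vm.desiredStateOfWorldPopulator.Run

(2) vm.reconciler.Run

Each is analyzed below.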


// pkg/kubelet/volumemanager/volume_manager.go
func (vm *volumeManager) Run(sourcesReady config.SourcesReady, stopCh <-chan struct{}) {
	defer runtime.HandleCrash()

	// Update the DesiredStateOfWorld from the pods in the pod manager (synced from the apiserver)
	go vm.desiredStateOfWorldPopulator.Run(sourcesReady, stopCh)
	klog.V(2).Infof("The desired_state_of_world populator starts")

	klog.Infof("Starting Kubelet Volume Manager")
	// Reconciler between the desired and actual state; it drives the actual state toward the desired state
	go vm.reconciler.Run(stopCh)

	metrics.Register(vm.actualStateOfWorld, vm.desiredStateOfWorld, vm.volumePluginMgr)

	if vm.kubeClient != nil {
		// start informer for CSIDriver
		vm.volumePluginMgr.Run(stopCh)
	}

	<-stopCh
	klog.Infof("Shutting down Kubelet Volume Manager")
}

1 vm.desiredStateOfWorldPopulator.Run

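Its main logic is to call dswp.populatorLoop(), which updates DesiredStateOfWorld from the pods' volume information. It keeps calling this method in a loop so that DesiredStateOfWorld stays up to date. First, it runs populatorLoop() on every poll until all pod sources are ready (sourcesReady.AllReady()). It then sets hasAddedPods to true and from then on keeps calling populatorLoop() through wait.Until.

dswp.loopSleepDuration is 100ms.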


// pkg/kubelet/volumemanager/populator/desired_state_of_world_populator.go
func (dswp *desiredStateOfWorldPopulator) Run(sourcesReady config.SourcesReady, stopCh <-chan struct{}) {
	// Wait for the completion of a loop that started after sources are all ready, then set hasAddedPods accordingly
	klog.Infof("Desired state populator starts to run")
	wait.PollUntil(dswp.loopSleepDuration, func() (bool, error) {
		done := sourcesReady.AllReady()
		dswp.populatorLoop()
		return done, nil
	}, stopCh)
	dswp.hasAddedPodsLock.Lock()
	dswp.hasAddedPods = true
	dswp.hasAddedPodsLock.Unlock()
	wait.Until(dswp.populatorLoop, dswp.loopSleepDuration, stopCh)
}

1.1 dswp.populatorLoop()

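Two key methods:

dswp.findAndAddNewPods(): adds volumes to DesiredStateOfWorld;

dswp.findAndRemoveDeletedPods(): removes volumes from DesiredStateOfWorld. It queries the container runtime, which is expensive. It is therefore skipped until getPodStatusRetryDuration has passed since the last runtime query (timeOfLastGetPodStatus).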


// pkg/kubelet/volumemanager/populator/desired_state_of_world_populator.go
func (dswp *desiredStateOfWorldPopulator) populatorLoop() {
	dswp.findAndAddNewPods()

	// findAndRemoveDeletedPods() calls out to the container runtime to
	// determine if the containers for a given pod are terminated. This is
	// an expensive operation, therefore we limit the rate that
	// findAndRemoveDeletedPods() is called independently of the main
	// populator loop.
	if time.Since(dswp.timeOfLastGetPodStatus) < dswp.getPodStatusRetryDuration {
		klog.V(5).Infof(
			"Skipping findAndRemoveDeletedPods(). Not permitted until %v (getPodStatusRetryDuration %v).",
			dswp.timeOfLastGetPodStatus.Add(dswp.getPodStatusRetryDuration),
			dswp.getPodStatusRetryDuration)

		return
	}

	dswp.findAndRemoveDeletedPods()
}


1.1.1 dswp.findAndAddNewPods()


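Main logic:

(1) If kubelet has the features.ExpandInUsePersistentVolumes feature gate enabled, build the map mountedVolumesForPod (pod name -> outer volume name -> mounted volume) from actualStateOfWorld. It is used later to mark the volumes that need expansion.

(2) Iterate over the pods in the pod manager and skip pods that have terminated. For each remaining pod, call dswp.processPodVolumes to add its volumes to DesiredStateOfWorld. processPodVolumes also contains the logic that marks whether a volume needs expansion, which later triggers the expansion.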


// pkg/kubelet/volumemanager/populator/desired_state_of_world_populator.go

// Iterate through all pods and add to desired state of world if they don't
// exist but should
func (dswp *desiredStateOfWorldPopulator) findAndAddNewPods() {
	// Map unique pod name to outer volume name to MountedVolume.
	mountedVolumesForPod := make(map[volumetypes.UniquePodName]map[string]cache.MountedVolume)
	if utilfeature.DefaultFeatureGate.Enabled(features.ExpandInUsePersistentVolumes) {
		for _, mountedVolume := range dswp.actualStateOfWorld.GetMountedVolumes() {
			mountedVolumes, exist := mountedVolumesForPod[mountedVolume.PodName]
			if !exist {
				mountedVolumes = make(map[string]cache.MountedVolume)
				mountedVolumesForPod[mountedVolume.PodName] = mountedVolumes
			}
			mountedVolumes[mountedVolume.OuterVolumeSpecName] = mountedVolume
		}
	}

	processedVolumesForFSResize := sets.NewString()
	for _, pod := range dswp.podManager.GetPods() {
		// Skip the volumes of pods that have terminated
		if dswp.isPodTerminated(pod) {
			// Do not (re)add volumes for terminated pods
			continue
		}
		// Add the pod's volumes to the desired state of the world
		dswp.processPodVolumes(pod, mountedVolumesForPod, processedVolumesForFSResize)
	}
}

func (dswp *desiredStateOfWorldPopulator) isPodTerminated(pod *v1.Pod) bool {
	podStatus, found := dswp.podStatusProvider.GetPodStatus(pod.UID)
	if !found {
		podStatus = pod.Status
	}
	return util.IsPodTerminated(pod, podStatus)
}
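The mountedVolumesForPod map built at the top of findAndAddNewPods is a two-level grouping. A minimal sketch with a hypothetical trimmed-down mountedVolume type shows its shape (the real type is cache.MountedVolume). checkVolumeFSResize later looks a volume up by pod name and by the outer volume name from pod.Spec.Volumes.

```go
package main

import "fmt"

// mountedVolume is a hypothetical, trimmed-down cache.MountedVolume.
type mountedVolume struct {
	PodName             string
	OuterVolumeSpecName string
	VolumeName          string
}

// groupByPod builds podName -> outerVolumeSpecName -> mountedVolume,
// the same shape findAndAddNewPods builds from GetMountedVolumes().
func groupByPod(mounted []mountedVolume) map[string]map[string]mountedVolume {
	byPod := make(map[string]map[string]mountedVolume)
	for _, mv := range mounted {
		vols, ok := byPod[mv.PodName]
		if !ok {
			vols = make(map[string]mountedVolume)
			byPod[mv.PodName] = vols
		}
		vols[mv.OuterVolumeSpecName] = mv
	}
	return byPod
}

func main() {
	byPod := groupByPod([]mountedVolume{
		{"pod-a", "data", "pvc-1"},
		{"pod-a", "logs", "pvc-2"},
		{"pod-b", "data", "pvc-1"},
	})
	fmt.Println(len(byPod["pod-a"]), byPod["pod-b"]["data"].VolumeName) // 2 pvc-1
}
```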


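Whether a pod counts as terminated is decided by the code below. dswp.isPodTerminated passes in the status from podStatusProvider and falls back to pod.Status when none is found: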


// pkg/volume/util/util.go

// IsPodTerminated checks if pod is terminated
func IsPodTerminated(pod *v1.Pod, podStatus v1.PodStatus) bool {
	return podStatus.Phase == v1.PodFailed || podStatus.Phase == v1.PodSucceeded || (pod.DeletionTimestamp != nil && notRunning(podStatus.ContainerStatuses))
}

// notRunning returns true if every status is terminated or waiting, or the status list
// is empty.
func notRunning(statuses []v1.ContainerStatus) bool {
	for _, status := range statuses {
		if status.State.Terminated == nil && status.State.Waiting == nil {
			return false
		}
	}
	return true
}
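The terminated check can be tried out with simplified stand-in types. In this sketch, the beingDeleted flag represents pod.DeletionTimestamp != nil. podPhase, containerState and podStatus are hypothetical replacements for the v1 API types, so the logic runs without the Kubernetes libraries.

```go
package main

import "fmt"

// Hypothetical stand-ins for the v1 types used by util.IsPodTerminated.
type podPhase string

const (
	podRunning   podPhase = "Running"
	podSucceeded podPhase = "Succeeded"
	podFailed    podPhase = "Failed"
)

type containerState string

const (
	stateRunning    containerState = "running"
	stateWaiting    containerState = "waiting"
	stateTerminated containerState = "terminated"
)

type podStatus struct {
	phase      podPhase
	containers []containerState
}

// isPodTerminated: Failed or Succeeded phase, or the pod is being deleted
// and none of its containers is running.
func isPodTerminated(beingDeleted bool, s podStatus) bool {
	return s.phase == podFailed || s.phase == podSucceeded ||
		(beingDeleted && notRunning(s.containers))
}

// notRunning is true if every container is terminated or waiting, or the list is empty.
func notRunning(states []containerState) bool {
	for _, st := range states {
		if st != stateTerminated && st != stateWaiting {
			return false
		}
	}
	return true
}

func main() {
	fmt.Println(isPodTerminated(false, podStatus{phase: podSucceeded})) // true
	fmt.Println(isPodTerminated(true, podStatus{phase: podRunning,
		containers: []containerState{stateRunning}})) // false
	fmt.Println(isPodTerminated(true, podStatus{phase: podRunning,
		containers: []containerState{stateTerminated, stateWaiting}})) // true
}
```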


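Main logic of dswp.processPodVolumes():

(1) Call dswp.podPreviouslyProcessed to check whether the pod's volumes have already been processed. If so, return immediately.

(2) Handle each volume in pod.Spec.Volumes that some container actually uses; volumes not referenced by any volume mount or device are skipped. For each such volume, build its volume spec with dswp.createVolumeSpec. Then call dswp.desiredStateOfWorld.AddPodToVolume to add the pod's volume information to desiredStateOfWorld. Most failures are also recorded with AddErrorToPod.

(3) If features.ExpandInUsePersistentVolumes is enabled, call dswp.checkVolumeFSResize to mark the volumes that need expansion.

(4) Only if every volume was added successfully, call dswp.markPodProcessed to mark the pod's volumes as processed. The pod's volumes are then marked as remount-required in actualStateOfWorld (MarkRemountRequired), and its stored errors are cleared (PopPodErrors). Otherwise the pod is processed again in the next loop.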


// pkg/kubelet/volumemanager/populator/desired_state_of_world_populator.go

// processPodVolumes processes the volumes in the given pod and adds them to the
// desired state of the world.
func (dswp *desiredStateOfWorldPopulator) processPodVolumes(
	pod *v1.Pod,
	mountedVolumesForPod map[volumetypes.UniquePodName]map[string]cache.MountedVolume,
	processedVolumesForFSResize sets.String) {
	if pod == nil {
		return
	}

	uniquePodName := util.GetUniquePodName(pod)
	if dswp.podPreviouslyProcessed(uniquePodName) {
		return
	}

	allVolumesAdded := true
	mounts, devices := util.GetPodVolumeNames(pod)

	expandInUsePV := utilfeature.DefaultFeatureGate.Enabled(features.ExpandInUsePersistentVolumes)
	// Process volume spec for each volume defined in pod
	for _, podVolume := range pod.Spec.Volumes {
		if !mounts.Has(podVolume.Name) && !devices.Has(podVolume.Name) {
			// Volume is not used in the pod, ignore it.
			klog.V(4).Infof("Skipping unused volume %q for pod %q", podVolume.Name, format.Pod(pod))
			continue
		}

		pvc, volumeSpec, volumeGidValue, err :=
			dswp.createVolumeSpec(podVolume, pod.Name, pod.Namespace, mounts, devices)
		if err != nil {
			klog.Errorf(
				"Error processing volume %q for pod %q: %v",
				podVolume.Name, format.Pod(pod), err)
			dswp.desiredStateOfWorld.AddErrorToPod(uniquePodName, err.Error())
			allVolumesAdded = false
			continue
		}

		// for local volume
		err = dswp.checkLocalVolumePV(pod, volumeSpec)
		if err != nil {
			klog.Errorf(
				"Error processing volume %q for pod %q: %v",
				podVolume.Name, format.Pod(pod), err)
			allVolumesAdded = false
			continue
		}

		// Add volume to desired state of world
		_, err = dswp.desiredStateOfWorld.AddPodToVolume(
			uniquePodName, pod, volumeSpec, podVolume.Name, volumeGidValue)
		if err != nil {
			klog.Errorf(
				"Failed to add volume %s (specName: %s) for pod %q to desiredStateOfWorld: %v",
				podVolume.Name, volumeSpec.Name(), uniquePodName, err)
			dswp.desiredStateOfWorld.AddErrorToPod(uniquePodName, err.Error())
			allVolumesAdded = false
		} else {
			klog.V(4).Infof(
				"Added volume %q (volSpec=%q) for pod %q to desired state.",
				podVolume.Name, volumeSpec.Name(), uniquePodName)
		}

		if expandInUsePV {
			dswp.checkVolumeFSResize(pod, podVolume, pvc, volumeSpec,
				uniquePodName, mountedVolumesForPod, processedVolumesForFSResize)
		}
	}

	// some of the volume additions may have failed, should not mark this pod as fully processed
	if allVolumesAdded {
		dswp.markPodProcessed(uniquePodName)
		// New pod has been synced. Re-mount all volumes that need it
		// (e.g. DownwardAPI)
		dswp.actualStateOfWorld.MarkRemountRequired(uniquePodName)
		// Remove any stored errors for the pod, everything went well in this processPodVolumes
		dswp.desiredStateOfWorld.PopPodErrors(uniquePodName)
	}

}


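Next, another key method, checkVolumeFSResize():

It handles volume expansion. When pv.Spec.Capacity is larger than pvc.Status.Capacity, the volume is marked as requiring a filesystem resize (MarkFSResizeRequired in actualStateOfWorld). Only PVC volumes that are already mounted (present in actualStateOfWorld) and not mounted read-only are checked. Each volume is checked at most once per loop, even if several pods use it.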


// pkg/kubelet/volumemanager/populator/desired_state_of_world_populator.go

// checkVolumeFSResize checks whether a PVC mounted by the pod requires file
// system resize or not. If so, marks this volume as fsResizeRequired in ASW.
// - mountedVolumesForPod stores all mounted volumes in ASW, because online
//   volume resize only considers mounted volumes.
// - processedVolumesForFSResize stores all volumes we have checked in current loop,
//   because file system resize operation is a global operation for volume, so
//   we only need to check it once if more than one pod use it.
func (dswp *desiredStateOfWorldPopulator) checkVolumeFSResize(
	pod *v1.Pod,
	podVolume v1.Volume,
	pvc *v1.PersistentVolumeClaim,
	volumeSpec *volume.Spec,
	uniquePodName volumetypes.UniquePodName,
	mountedVolumesForPod map[volumetypes.UniquePodName]map[string]cache.MountedVolume,
	processedVolumesForFSResize sets.String) {
	if podVolume.PersistentVolumeClaim == nil {
		// Only PVC supports resize operation.
		return
	}
	uniqueVolumeName, exist := getUniqueVolumeName(uniquePodName, podVolume.Name, mountedVolumesForPod)
	if !exist {
		// Volume not exist in ASW, we assume it hasn't been mounted yet. If it needs resize,
		// it will be handled as offline resize(if it indeed hasn't been mounted yet),
		// or online resize in subsequent loop(after we confirm it has been mounted).
		return
	}
	if processedVolumesForFSResize.Has(string(uniqueVolumeName)) {
		// File system resize operation is a global operation for volume,
		// so we only need to check it once if more than one pod use it.
		return
	}
	// volumeSpec.ReadOnly is the value that determines if volume could be formatted when being mounted.
	// This is the same flag that determines filesystem resizing behaviour for offline resizing and hence
	// we should use it here. This value comes from Pod.spec.volumes.persistentVolumeClaim.readOnly.
	if volumeSpec.ReadOnly {
		// This volume is used as read only by this pod, we don't perform resize for read only volumes.
		klog.V(5).Infof("Skip file system resize check for volume %s in pod %s/%s "+
			"as the volume is mounted as readonly", podVolume.Name, pod.Namespace, pod.Name)
		return
	}
	if volumeRequiresFSResize(pvc, volumeSpec.PersistentVolume) {
		dswp.actualStateOfWorld.MarkFSResizeRequired(uniqueVolumeName, uniquePodName)
	}
	processedVolumesForFSResize.Insert(string(uniqueVolumeName))
}

// Returns true when pv.Spec.Capacity is larger than pvc.Status.Capacity
func volumeRequiresFSResize(pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume) bool {
	capacity := pvc.Status.Capacity[v1.ResourceStorage]
	requested := pv.Spec.Capacity[v1.ResourceStorage]
	return requested.Cmp(capacity) > 0
}
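The decision made by checkVolumeFSResize can be condensed into one predicate. The sketch assumes capacities are plain byte counts rather than resource.Quantity. The fsResizeInput fields are hypothetical flags that stand for the checks, listed in the order the real method performs them.

```go
package main

import "fmt"

// fsResizeInput: hypothetical flags for the checks in checkVolumeFSResize.
// Capacities are plain byte counts instead of resource.Quantity.
type fsResizeInput struct {
	isPVC          bool  // podVolume.PersistentVolumeClaim != nil
	mountedInASW   bool  // found through mountedVolumesForPod
	alreadyChecked bool  // processedVolumesForFSResize already has the volume
	readOnly       bool  // volumeSpec.ReadOnly
	pvcStatusBytes int64 // pvc.Status.Capacity[storage]
	pvSpecBytes    int64 // pv.Spec.Capacity[storage]
}

// needsFSResize applies the checks in order and ends with the
// volumeRequiresFSResize comparison: PV capacity > PVC status capacity.
func needsFSResize(in fsResizeInput) bool {
	if !in.isPVC || !in.mountedInASW || in.alreadyChecked || in.readOnly {
		return false
	}
	return in.pvSpecBytes > in.pvcStatusBytes
}

func main() {
	const gi = int64(1) << 30
	expanded := fsResizeInput{isPVC: true, mountedInASW: true, pvcStatusBytes: 10 * gi, pvSpecBytes: 20 * gi}
	fmt.Println(needsFSResize(expanded)) // true: PV grew to 20Gi, PVC status still 10Gi
	readOnly := expanded
	readOnly.readOnly = true
	fmt.Println(needsFSResize(readOnly)) // false: read-only mounts are skipped
}
```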


1.1.2 dswp.findAndRemoveDeletedPods


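Main logic:

(1) Iterate over the volumes to mount in dswp.desiredStateOfWorld (one entry per pod/volume pair) and apply these checks in order:

- If the pod still exists in podManager, skip it unless it has terminated. Also skip it when keepTerminatedPodVolumes is set.
- Otherwise, query the containerRuntime. If the pod still has containers that have not exited, skip it.
- If actualStateOfWorld does not yet record the volume as mounted for the pod while the pod still exists in podManager, skip it as well.

Only entries that pass all these checks continue.

(2) Call dswp.desiredStateOfWorld.DeletePodFromVolume to remove the pod from dsw.volumesToMount[volumeName].podsToMount[podName]. This removes the record that the volume should be mounted into the pod from desiredStateOfWorld. Afterwards, the stored errors of pods that no longer exist in podManager are cleared.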


// pkg/kubelet/volumemanager/populator/desired_state_of_world_populator.go

// Iterate through all pods in desired state of world, and remove if they no
// longer exist
func (dswp *desiredStateOfWorldPopulator) findAndRemoveDeletedPods() {
	var runningPods []*kubecontainer.Pod

	runningPodsFetched := false
	for _, volumeToMount := range dswp.desiredStateOfWorld.GetVolumesToMount() {
		pod, podExists := dswp.podManager.GetPodByUID(volumeToMount.Pod.UID)
		if podExists {
			// Skip running pods
			if !dswp.isPodTerminated(pod) {
				continue
			}
			if dswp.keepTerminatedPodVolumes {
				continue
			}
		}

		// Once a pod has been deleted from kubelet pod manager, do not delete
		// it immediately from volume manager. Instead, check the kubelet
		// containerRuntime to verify that all containers in the pod have been
		// terminated.
		if !runningPodsFetched {
			var getPodsErr error
			runningPods, getPodsErr = dswp.kubeContainerRuntime.GetPods(false)
			if getPodsErr != nil {
				klog.Errorf(
					"kubeContainerRuntime.findAndRemoveDeletedPods returned error %v.",
					getPodsErr)
				continue
			}

			runningPodsFetched = true
			dswp.timeOfLastGetPodStatus = time.Now()
		}

		runningContainers := false
		for _, runningPod := range runningPods {
			if runningPod.ID == volumeToMount.Pod.UID {
				if len(runningPod.Containers) > 0 {
					runningContainers = true
				}

				break
			}
		}

		if runningContainers {
			klog.V(4).Infof(
				"Pod %q still has one or more containers in the non-exited state. Therefore, it will not be removed from desired state.",
				format.Pod(volumeToMount.Pod))
			continue
		}
		exists, _, _ := dswp.actualStateOfWorld.PodExistsInVolume(volumeToMount.PodName, volumeToMount.VolumeName)
		if !exists && podExists {
			klog.V(4).Infof(
				volumeToMount.GenerateMsgDetailed(fmt.Sprintf("Actual state has not yet has this volume mounted information and pod (%q) still exists in pod manager, skip removing volume from desired state",
					format.Pod(volumeToMount.Pod)), ""))
			continue
		}
		klog.V(4).Infof(volumeToMount.GenerateMsgDetailed("Removing volume from desired state", ""))

		dswp.desiredStateOfWorld.DeletePodFromVolume(
			volumeToMount.PodName, volumeToMount.VolumeName)
		dswp.deleteProcessedPod(volumeToMount.PodName)
	}

	podsWithError := dswp.desiredStateOfWorld.GetPodsWithErrors()
	for _, podName := range podsWithError {
		if _, podExists := dswp.podManager.GetPodByUID(types.UID(podName)); !podExists {
			dswp.desiredStateOfWorld.PopPodErrors(podName)
		}
	}
}
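The conditions in findAndRemoveDeletedPods are easy to misread, so here they are as a single predicate over hypothetical boolean inputs. The names removalInput and shouldRemoveFromDSW are illustrative only. The predicate returns true exactly when the loop would reach DeletePodFromVolume, assuming the container runtime query succeeded.

```go
package main

import "fmt"

// removalInput gathers the facts findAndRemoveDeletedPods consults for one
// (pod, volume) entry of the desired state (hypothetical type).
type removalInput struct {
	podExists                bool // still in the kubelet pod manager
	podTerminated            bool // isPodTerminated(pod)
	keepTerminatedPodVolumes bool
	runtimeHasContainers     bool // the runtime still lists containers for the pod
	mountedInASW             bool // actualStateOfWorld.PodExistsInVolume
}

// shouldRemoveFromDSW returns true when DeletePodFromVolume would be called.
func shouldRemoveFromDSW(in removalInput) bool {
	if in.podExists && (!in.podTerminated || in.keepTerminatedPodVolumes) {
		return false // pod still running, or terminated-pod volumes are kept
	}
	if in.runtimeHasContainers {
		return false // wait until all containers are gone
	}
	if !in.mountedInASW && in.podExists {
		return false // ASW has not recorded the mount yet and the pod still exists
	}
	return true
}

func main() {
	// Pod gone from the pod manager and no containers left: remove it.
	fmt.Println(shouldRemoveFromDSW(removalInput{})) // true
	// Pod gone but a container is still shutting down: keep it for now.
	fmt.Println(shouldRemoveFromDSW(removalInput{runtimeHasContainers: true})) // false
	// Terminated pod whose volume is mounted, keepTerminatedPodVolumes off: remove it.
	fmt.Println(shouldRemoveFromDSW(removalInput{podExists: true, podTerminated: true, mountedInASW: true})) // true
}
```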

2 vm.reconciler.Run

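It mainly calls rc.reconcile(), which reconciles the desired and actual volume state and drives the actual state toward the desired state. Once the populator has added all existing pods (populatorHasAddedPods) and the states have not been synced yet, it also calls rc.sync() once. This syncs the state with what is actually on the node.

rc.loopSleepDuration is 100ms.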


// pkg/kubelet/volumemanager/reconciler/reconciler.go
func (rc *reconciler) Run(stopCh <-chan struct{}) {
	wait.Until(rc.reconciliationLoopFunc(), rc.loopSleepDuration, stopCh)
}

func (rc *reconciler) reconciliationLoopFunc() func() {
	return func() {
		rc.reconcile()

		// Sync the state with the reality once after all existing pods are added to the desired state from all sources.
		// Otherwise, the reconstruct process may clean up pods' volumes that are still in use because
		// desired state of world does not contain a complete list of pods.
		if rc.populatorHasAddedPods() && !rc.StatesHasBeenSynced() {
			klog.Infof("Reconciler: start to sync state")
			rc.sync()
		}
	}
}
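A small model can check the sync-once behavior of reconciliationLoopFunc. reconcilerSketch, its counters and the synced flag are hypothetical; the synced flag stands in for StatesHasBeenSynced(). In the sketch, a plain loop calls the closure where kubelet would use wait.Until every loopSleepDuration.

```go
package main

import "fmt"

// reconcilerSketch models only the control flow of reconciliationLoopFunc.
type reconcilerSketch struct {
	populatorHasAddedPods func() bool
	synced                bool // stands in for StatesHasBeenSynced()
	reconciles, syncs     int
}

func (rc *reconcilerSketch) reconcile() { rc.reconciles++ }

func (rc *reconcilerSketch) sync() {
	rc.syncs++
	rc.synced = true
}

// loopFunc returns the closure that would be called on every tick.
func (rc *reconcilerSketch) loopFunc() func() {
	return func() {
		rc.reconcile()
		// Sync once, and only after the populator has added all existing pods.
		if rc.populatorHasAddedPods() && !rc.synced {
			rc.sync()
		}
	}
}

func main() {
	ticks := 0
	rc := &reconcilerSketch{populatorHasAddedPods: func() bool { return ticks >= 2 }}
	f := rc.loopFunc()
	for ticks = 0; ticks < 5; ticks++ {
		f()
	}
	fmt.Println(rc.reconciles, rc.syncs) // 5 1
}
```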

2.1 rc.reconcile

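Main logic of rc.reconcile():

(1) Go through the volumes that are actually mounted. A volume must be unmounted from a pod if desiredStateOfWorld does not contain the volume, or if that pod is not in the volume's list of pods that should mount it. The unmount eventually calls csi.NodeUnpublishVolume.

(2) Get the list of volumes that should be mounted to pods from desiredStateOfWorld:

a. When the volume is not yet attached to the node, attach it first. Here kubelet checks whether it performs the attach itself. If it does, it creates a VolumeAttachment object and waits until the object's .status.attached becomes true. If not, it waits for the AD controller to attach the volume, and uses the node object's .Status.VolumesAttached field to tell whether the volume is attached to the node.

b. When the volume is attached but not yet mounted to the pod, or needs a remount, mount it. This eventually calls csi.NodeStageVolume and csi.NodePublishVolume.

c. When the volume needs expansion, expand it. This eventually calls csi.NodeExpandVolume.

(3) Go through the volumes in actualStateOfWorld that no pod has mounted, and compare them against desiredStateOfWorld to find those that should be detached. "Detach" means removing the volume from the node.

a. Suppose actualStateOfWorld shows the volume is not mounted by any pod and the volume does not exist in desiredStateOfWorld (no pod is expected to mount it). If attachedVolume.GloballyMounted is true, the device is still mounted at the global mount path. UnmountDevice is then called, which mainly calls csi.NodeUnstageVolume to unmount the volume from the global mount path on the node.

b. Suppose instead that actualStateOfWorld shows the volume is not mounted by any pod, the volume does not exist in desiredStateOfWorld, and attachedVolume.GloballyMounted is false. This means UnmountDevice has already run and the device is no longer mounted at the global mount path. DetachVolume is then called, which mainly deletes the VolumeAttachment object and waits until the deletion succeeds. kubelet only does this when it is responsible for detach; otherwise it just marks the volume as detached in actualStateOfWorld.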


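Main methods involved in reconcile():

(1) rc.operationExecutor.UnmountVolume: called when actualStateOfWorld shows that a pod has mounted a volume, but the pod is not in desiredStateOfWorld's list of pods expected to mount that volume. In other words, the volume is mounted to a pod that no longer exists, so the mount must be removed. It mainly calls csi.NodeUnpublishVolume to unmount the pod mount path.

(2) rc.operationExecutor.AttachVolume: called when a volume is missing from the volumes that actualStateOfWorld records as attached to the node, while desiredStateOfWorld expects it to be attached, and kubelet is responsible for attach. In other words, a volume that should be attached to the node has not been attached yet. It mainly creates a VolumeAttachment object and waits until its .status.attached becomes true.

(3) rc.operationExecutor.MountVolume: called when three conditions hold: desiredStateOfWorld expects a volume to be mounted to a pod, actualStateOfWorld shows it is not mounted to that pod, and the volume is already attached to the node. It is also called when the pod's volume needs a remount. It mainly calls csi.NodeStageVolume to mount the volume at the global mount path on the node. It then calls csi.NodePublishVolume to mount the volume from the global mount path into the pod mount path.

(4) rc.operationExecutor.ExpandInUseVolume: performs the node-side expansion after the controller-side expansion has finished. Volume expansion will be analyzed separately later.

(5) rc.operationExecutor.UnmountDevice: called when actualStateOfWorld shows a volume is not mounted by any pod, the volume does not exist in desiredStateOfWorld, and attachedVolume.GloballyMounted is true, meaning the device is still mounted at the global mount path. It mainly calls csi.NodeUnstageVolume to unmount the volume from the global mount path on the node.

(6) rc.operationExecutor.DetachVolume: called when actualStateOfWorld shows a volume is not mounted by any pod, the volume does not exist in desiredStateOfWorld, and attachedVolume.GloballyMounted is false. This means UnmountDevice has already run and the device is no longer mounted at the global mount path. It is only called if kubelet is responsible for detach. It mainly deletes the VolumeAttachment object and waits until the deletion succeeds.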


Call flow for mounting storage into a pod: AttachVolume(csi-attacher.Attach) --> MountVolume(csi-attacher.MountDevice --> csi-mounter.SetUp)

Call flow for removing a pod's storage mount: UnmountVolume(csi-mounter.TearDown) --> UnmountDevice(csi-attacher.UnmountDevice) --> DetachVolume(csi-attacher.Detach)

Note: the kubelet startup flag --enable-controller-attach-detach sets the value of controllerAttachDetachEnabled. Setting the flag to true lets the Attach/Detach controller perform attach/detach and disables attach/detach in kubelet. The default is true. For the CSI plugin, attach/detach on this side only creates or deletes the VolumeAttachment object.
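The flag's effect on the attach and detach branches can be summarized in two small functions. attachAction and detachAction are hypothetical helpers. The attach branch follows the reconcile() code shown in this article. The detach branch is an assumption based on our reading of the v1.17 reconciler: when kubelet is not responsible for detach, it only marks the volume as detached in actualStateOfWorld via MarkVolumeAsDetached.

```go
package main

import "fmt"

// attachAction returns the operation reconcile() starts for a volume that is
// not attached yet: kubelet attaches by itself only when the AD controller is
// disabled and the plugin is attachable.
func attachAction(controllerAttachDetachEnabled, pluginIsAttachable bool) string {
	if controllerAttachDetachEnabled || !pluginIsAttachable {
		return "VerifyControllerAttachedVolume"
	}
	return "AttachVolume"
}

// detachAction is the assumed mirror image for an attached volume that is no
// longer globally mounted and no longer wanted by any pod.
func detachAction(controllerAttachDetachEnabled, pluginIsAttachable bool) string {
	if controllerAttachDetachEnabled || !pluginIsAttachable {
		return "MarkVolumeAsDetached"
	}
	return "DetachVolume"
}

func main() {
	cases := []struct{ adController, attachable bool }{{true, true}, {false, true}, {false, false}}
	for _, c := range cases {
		fmt.Println(c.adController, c.attachable,
			attachAction(c.adController, c.attachable), detachAction(c.adController, c.attachable))
	}
}
```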


// pkg/kubelet/volumemanager/reconciler/reconciler.go
func (rc *reconciler) reconcile() {
	// Unmounts are triggered before mounts so that a volume that was
	// referenced by a pod that was deleted and is now referenced by another
	// pod is unmounted from the first pod before being mounted to the new
	// pod.
	// For each volume that is actually mounted: if the desired state does not contain the volume,
	// or the pod is not in the volume's list of pods to mount, unmount the volume from that pod.
	// Ensure volumes that should be unmounted are unmounted.
	// Read the mount information from the actual state of the world.
	for _, mountedVolume := range rc.actualStateOfWorld.GetMountedVolumes() {
		// If the desired state has no record of this volume being mounted to this pod,
		// call rc.operationExecutor.UnmountVolume to unmount it from the pod.
		if !rc.desiredStateOfWorld.PodExistsInVolume(mountedVolume.PodName, mountedVolume.VolumeName) {
			// Volume is mounted, unmount it
			klog.V(5).Infof(mountedVolume.GenerateMsgDetailed("Starting operationExecutor.UnmountVolume", ""))
			err := rc.operationExecutor.UnmountVolume(
				mountedVolume.MountedVolume, rc.actualStateOfWorld, rc.kubeletPodsDir)
			if err != nil &&
				!nestedpendingoperations.IsAlreadyExists(err) &&
				!exponentialbackoff.IsExponentialBackoff(err) {
				// Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
				// Log all other errors.
				klog.Errorf(mountedVolume.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.UnmountVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error())
			}
			if err == nil {
				klog.Infof(mountedVolume.GenerateMsgDetailed("operationExecutor.UnmountVolume started", ""))
			}
		}
	}
	// Get the list of volumes that should be mounted to pods from desiredStateOfWorld.
	// Ensure volumes that should be attached/mounted are attached/mounted.
	for _, volumeToMount := range rc.desiredStateOfWorld.GetVolumesToMount() {
		// Call rc.actualStateOfWorld.PodExistsInVolume to check whether the volume is already mounted to the pod
		volMounted, devicePath, err := rc.actualStateOfWorld.PodExistsInVolume(volumeToMount.PodName, volumeToMount.VolumeName)
		volumeToMount.DevicePath = devicePath
		// The volume is not attached to the node yet: attach it first
		if cache.IsVolumeNotAttachedError(err) {
			// Check whether the controller performs Attach/Detach
			if rc.controllerAttachDetachEnabled || !volumeToMount.PluginIsAttachable {
				// Volume is not attached (or doesn't implement attacher), kubelet attach is disabled, wait
				// for controller to finish attaching volume.
				klog.V(5).Infof(volumeToMount.GenerateMsgDetailed("Starting operationExecutor.VerifyControllerAttachedVolume", ""))
				// The controller performs Attach/Detach: call VerifyControllerAttachedVolume
				// to check whether the controller has finished attaching the volume
				err := rc.operationExecutor.VerifyControllerAttachedVolume(
					volumeToMount.VolumeToMount,
					rc.nodeName,
					rc.actualStateOfWorld)
				if err != nil &&
					!nestedpendingoperations.IsAlreadyExists(err) &&
					!exponentialbackoff.IsExponentialBackoff(err) {
					// Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
					// Log all other errors.
					klog.Errorf(volumeToMount.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.VerifyControllerAttachedVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error())
				}
				if err == nil {
					klog.Infof(volumeToMount.GenerateMsgDetailed("operationExecutor.VerifyControllerAttachedVolume started", ""))
				}
			} else {
				// Volume is not attached to node, kubelet attach is enabled, volume implements an attacher,
				// so attach it
				volumeToAttach := operationexecutor.VolumeToAttach{
					VolumeName: volumeToMount.VolumeName,
					VolumeSpec: volumeToMount.VolumeSpec,
					NodeName:   rc.nodeName,
				}
				klog.V(5).Infof(volumeToAttach.GenerateMsgDetailed("Starting operationExecutor.AttachVolume", ""))
				err := rc.operationExecutor.AttachVolume(volumeToAttach, rc.actualStateOfWorld)
				if err != nil &&
					!nestedpendingoperations.IsAlreadyExists(err) &&
					!exponentialbackoff.IsExponentialBackoff(err) {
					// Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
					// Log all other errors.
					klog.Errorf(volumeToMount.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.AttachVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error())
				}
				if err == nil {
					klog.Infof(volumeToMount.GenerateMsgDetailed("operationExecutor.AttachVolume started", ""))
				}
			}
			// The volume is attached but not yet mounted to the pod, or needs a remount: mount it
		} else if !volMounted || cache.IsRemountRequiredError(err) {
			// Volume is not mounted, or is already mounted, but requires remounting
			remountingLogStr := ""
			isRemount := cache.IsRemountRequiredError(err)
			if isRemount {
				remountingLogStr = "Volume is already mounted to pod, but remount was requested."
			}
			klog.V(4).Infof(volumeToMount.GenerateMsgDetailed("Starting operationExecutor.MountVolume", remountingLogStr))
			err := rc.operationExecutor.MountVolume(
				rc.waitForAttachTimeout,
				volumeToMount.VolumeToMount,
				rc.actualStateOfWorld,
				isRemount)
			if err != nil &&
				!nestedpendingoperations.IsAlreadyExists(err) &&
				!exponentialbackoff.IsExponentialBackoff(err) {
				// Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
				// Log all other errors.
				klog.Errorf(volumeToMount.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.MountVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error())
			}
			if err == nil {
				if remountingLogStr == "" {
					klog.V(1).Infof(volumeToMount.GenerateMsgDetailed("operationExecutor.MountVolume started", remountingLogStr))
				} else {
					klog.V(5).Infof(volumeToMount.GenerateMsgDetailed("operationExecutor.MountVolume started", remountingLogStr))
				}
			}
			// The volume needs expansion: expand it
		} else if cache.IsFSResizeRequiredError(err) &&
			utilfeature.DefaultFeatureGate.Enabled(features.ExpandInUsePersistentVolumes) {
			klog.V(4).Infof(volumeToMount.GenerateMsgDetailed("Starting operationExecutor.ExpandInUseVolume", ""))
			err := rc.operationExecutor.ExpandInUseVolume(
				volumeToMount.VolumeToMount,
				rc.actualStateOfWorld)
			if err != nil &&
				!nestedpendingoperations.IsAlreadyExists(err) &&
				!exponentialbackoff.IsExponentialBackoff(err) {
				// Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
				// Log all other errors.
				klog.Errorf(volumeToMount.GenerateErrorDetailed("operationExecutor.ExpandInUseVolume failed", err).Error())
			}
			if err == nil {
				klog.V(4).Infof(volumeToMount.GenerateMsgDetailed("operationExecutor.ExpandInUseVolume started", ""))
			}
		}
	}
	// Compare actualStateOfWorld with desiredStateOfWorld to find the volumes that must be
	// detached (detach means removing the volume from the node)
	// Ensure devices that should be detached/unmounted are detached/unmounted.
for _, attachedVolume := range rc.actualStateOfWorld.GetUnmountedVolumes() { // Check IsOperationPending to avoid marking a volume as detached if it's in the process of mounting. if !rc.desiredStateOfWorld.VolumeExists(attachedVolume.VolumeName) && !rc.operationExecutor.IsOperationPending(attachedVolume.VolumeName, nestedpendingoperations.EmptyUniquePodName) { // 当volume已经从node上解除挂载后,GloballyMounted的值被赋值为false if attachedVolume.GloballyMounted { // Volume is globally mounted to device, unmount it klog.V(5).Infof(attachedVolume.GenerateMsgDetailed("Starting operationExecutor.UnmountDevice", "")) err := rc.operationExecutor.UnmountDevice( attachedVolume.AttachedVolume, rc.actualStateOfWorld, rc.hostutil) if err != nil && !nestedpendingoperations.IsAlreadyExists(err) && !exponentialbackoff.IsExponentialBackoff(err) { // Ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected. // Log all other errors. klog.Errorf(attachedVolume.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.UnmountDevice failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error()) } if err == nil { klog.Infof(attachedVolume.GenerateMsgDetailed("operationExecutor.UnmountDevice started", "")) } } else { // Volume is attached to node, detach it // Kubelet not responsible for detaching or this volume has a non-attachable volume plugin. 
if rc.controllerAttachDetachEnabled || !attachedVolume.PluginIsAttachable { rc.actualStateOfWorld.MarkVolumeAsDetached(attachedVolume.VolumeName, attachedVolume.NodeName) klog.Infof(attachedVolume.GenerateMsgDetailed("Volume detached", fmt.Sprintf("DevicePath %q", attachedVolume.DevicePath))) } else { // Only detach if kubelet detach is enabled klog.V(5).Infof(attachedVolume.GenerateMsgDetailed("Starting operationExecutor.DetachVolume", "")) err := rc.operationExecutor.DetachVolume( attachedVolume.AttachedVolume, false /* verifySafeToDetach */, rc.actualStateOfWorld) if err != nil && !nestedpendingoperations.IsAlreadyExists(err) && !exponentialbackoff.IsExponentialBackoff(err) { // Ignore nestedpendingoperations.IsAlreadyExists && exponentialbackoff.IsExponentialBackoff errors, they are expected. // Log all other errors. klog.Errorf(attachedVolume.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.DetachVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error()) } if err == nil { klog.Infof(attachedVolume.GenerateMsgDetailed("operationExecutor.DetachVolume started", "")) } } } } }}
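The following is a heavily simplified, self-contained Go model of the three passes in reconcile() above, meant only to make their order and conditions easier to follow. It is a sketch under stated assumptions, not kubelet code: the podVolume and world types, the string keys and the returned operation names are hypothetical stand-ins for desiredStateOfWorld, actualStateOfWorld and the operationExecutor calls, and it ignores pending operations, exponential backoff, remounts, resizes and the PluginIsAttachable check.

```go
package main

import (
	"fmt"
	"sort"
)

// podVolume is a hypothetical (pod, volume) key; kubelet uses
// volumetypes.UniquePodName and v1.UniqueVolumeName instead.
type podVolume struct {
	pod, volume string
}

// world is a toy snapshot of the actual state of the world on one node.
type world struct {
	mounted       map[podVolume]bool // volume set up for a pod (SetUp done)
	attached      map[string]bool    // volume attached to the node
	globalMounted map[string]bool    // device (global) mount exists
}

// sorted returns a sorted copy so that map iteration order does not leak into the output.
func sorted(s []string) []string {
	out := append([]string(nil), s...)
	sort.Strings(out)
	return out
}

// reconcileOnce follows the pass order of reconcile() and returns the
// operations it would start instead of calling an operation executor.
func reconcileOnce(desired map[podVolume]bool, actual world, controllerAttachDetach bool) []string {
	// Pass 1: unmount pod mounts that are no longer desired.
	var unmounts []string
	for pv := range actual.mounted {
		if !desired[pv] {
			unmounts = append(unmounts, "UnmountVolume "+pv.pod+"/"+pv.volume)
		}
	}

	// Pass 2: attach (or wait for the AD controller), then mount.
	var mounts []string
	for pv := range desired {
		switch {
		case !actual.attached[pv.volume] && controllerAttachDetach:
			mounts = append(mounts, "VerifyControllerAttachedVolume "+pv.volume)
		case !actual.attached[pv.volume]:
			mounts = append(mounts, "AttachVolume "+pv.volume)
		case !actual.mounted[pv]:
			mounts = append(mounts, "MountVolume "+pv.pod+"/"+pv.volume)
		}
	}

	// Pass 3: for attached volumes that no pod has mounted and no pod wants,
	// unmount the device first; detach only once the device is unmounted.
	inUse, wanted := map[string]bool{}, map[string]bool{}
	for pv := range actual.mounted {
		inUse[pv.volume] = true
	}
	for pv := range desired {
		wanted[pv.volume] = true
	}
	var detaches []string
	for vol := range actual.attached {
		switch {
		case inUse[vol] || wanted[vol]:
			// Still mounted by some pod or still desired: leave it alone.
		case actual.globalMounted[vol]:
			detaches = append(detaches, "UnmountDevice "+vol)
		case controllerAttachDetach:
			// The AD controller detaches; kubelet only forgets the volume.
			detaches = append(detaches, "MarkVolumeAsDetached "+vol)
		default:
			detaches = append(detaches, "DetachVolume "+vol)
		}
	}

	out := append(sorted(unmounts), sorted(mounts)...)
	return append(out, sorted(detaches)...)
}

func main() {
	desired := map[podVolume]bool{{"pod-b", "vol-1"}: true}
	actual := world{
		mounted:       map[podVolume]bool{{"pod-a", "vol-1"}: true, {"pod-a", "vol-2"}: true},
		attached:      map[string]bool{"vol-1": true, "vol-2": true, "vol-3": true},
		globalMounted: map[string]bool{"vol-1": true, "vol-2": true},
	}
	for _, op := range reconcileOnce(desired, actual, false) {
		fmt.Println(op)
	}
}
```

With this sample data the sketch returns UnmountVolume for pod-a/vol-1 and pod-a/vol-2, then MountVolume for pod-b/vol-1, then DetachVolume for vol-3. vol-2 is not detached in the same pass because pod-a still has it mounted in the snapshot; in kubelet, actualStateOfWorld.GetUnmountedVolumes() likewise only returns volumes without pod mounts, so device unmount and detach happen on a later pass after the asynchronous unmount has finished.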


2.1.1 rc.operationExecutor.VerifyControllerAttachedVolume


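Since attach/detach is performed by the AD controller in this case, the volume manager can only find out whether a given volume has been attached by reading the node object; once it sees the volume attached, it updates actualStateOfWorld.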


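Main logic: if the volume plugin does not implement the attacher interface, the volume is simply marked as attached. Otherwise the check only proceeds once the volume has been reported in the node's VolumesInUse; it then fetches the node object from the API server and looks the volume up in .Status.VolumesAttached. If the volume is found, actualStateOfWorld is updated (MarkVolumeAsAttached with the reported device path); if not, an error is returned and the caller logs it and retries.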


```go
// pkg/volume/util/operationexecutor/operation_executor.go
func (oe *operationExecutor) VerifyControllerAttachedVolume(
	volumeToMount VolumeToMount,
	nodeName types.NodeName,
	actualStateOfWorld ActualStateOfWorldAttacherUpdater) error {
	generatedOperations, err :=
		oe.operationGenerator.GenerateVerifyControllerAttachedVolumeFunc(volumeToMount, nodeName, actualStateOfWorld)
	if err != nil {
		return err
	}

	return oe.pendingOperations.Run(
		volumeToMount.VolumeName, "" /* podName */, generatedOperations)
}

// pkg/volume/util/operationexecutor/operation_generator.go
func (og *operationGenerator) GenerateVerifyControllerAttachedVolumeFunc(
	volumeToMount VolumeToMount,
	nodeName types.NodeName,
	actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) {
	volumePlugin, err := og.volumePluginMgr.FindPluginBySpec(volumeToMount.VolumeSpec)
	if err != nil || volumePlugin == nil {
		return volumetypes.GeneratedOperations{}, volumeToMount.GenerateErrorDetailed("VerifyControllerAttachedVolume.FindPluginBySpec failed", err)
	}

	verifyControllerAttachedVolumeFunc := func() (error, error) {
		if !volumeToMount.PluginIsAttachable {
			// If the volume does not implement the attacher interface, it is
			// assumed to be attached and the actual state of the world is
			// updated accordingly.

			addVolumeNodeErr := actualStateOfWorld.MarkVolumeAsAttached(
				volumeToMount.VolumeName, volumeToMount.VolumeSpec, nodeName, "" /* devicePath */)
			if addVolumeNodeErr != nil {
				// On failure, return error. Caller will log and retry.
				return volumeToMount.GenerateError("VerifyControllerAttachedVolume.MarkVolumeAsAttachedByUniqueVolumeName failed", addVolumeNodeErr)
			}

			return nil, nil
		}

		if !volumeToMount.ReportedInUse {
			// If the given volume has not yet been added to the list of
			// VolumesInUse in the node's volume status, do not proceed, return
			// error. Caller will log and retry. The node status is updated
			// periodically by kubelet, so it may take as much as 10 seconds
			// before this clears.
			// Issue #28141 to enable on demand status updates.
			return volumeToMount.GenerateError("Volume has not been added to the list of VolumesInUse in the node's volume status", nil)
		}

		// Fetch current node object
		node, fetchErr := og.kubeClient.CoreV1().Nodes().Get(string(nodeName), metav1.GetOptions{})
		if fetchErr != nil {
			// On failure, return error. Caller will log and retry.
			return volumeToMount.GenerateError("VerifyControllerAttachedVolume failed fetching node from API server", fetchErr)
		}

		if node == nil {
			// On failure, return error. Caller will log and retry.
			return volumeToMount.GenerateError(
				"VerifyControllerAttachedVolume failed",
				fmt.Errorf("Node object retrieved from API server is nil"))
		}

		for _, attachedVolume := range node.Status.VolumesAttached {
			if attachedVolume.Name == volumeToMount.VolumeName {
				addVolumeNodeErr := actualStateOfWorld.MarkVolumeAsAttached(
					v1.UniqueVolumeName(""), volumeToMount.VolumeSpec, nodeName, attachedVolume.DevicePath)
				klog.Infof(volumeToMount.GenerateMsgDetailed("Controller attach succeeded", fmt.Sprintf("device path: %q", attachedVolume.DevicePath)))
				if addVolumeNodeErr != nil {
					// On failure, return error. Caller will log and retry.
					return volumeToMount.GenerateError("VerifyControllerAttachedVolume.MarkVolumeAsAttached failed", addVolumeNodeErr)
				}
				return nil, nil
			}
		}

		// Volume not attached, return error. Caller will log and retry.
		return volumeToMount.GenerateError("Volume not attached according to node status", nil)
	}

	return volumetypes.GeneratedOperations{
		OperationName:     "verify_controller_attached_volume",
		OperationFunc:     verifyControllerAttachedVolumeFunc,
		CompleteFunc:      util.OperationCompleteHook(util.GetFullQualifiedPluginNameForVolume(volumePlugin.GetPluginName(), volumeToMount.VolumeSpec), "verify_controller_attached_volume"),
		EventRecorderFunc: nil, // nil because we do not want to generate event on error
	}, nil

}
```

2.2 rc.sync()

When rc.sync() is called: once, after vm.desiredStateOfWorldPopulator.Run has added the volume information of all existing pods to desiredStateOfWorld (checked through rc.populatorHasAddedPods()).


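Main logic of rc.sync(): scan the volume directories under every pod directory on the node, reconstruct the volumes that actualStateOfWorld does not know about (for example, volumes left mounted across a kubelet restart), and update desiredStateOfWorld and actualStateOfWorld accordingly.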


```go
// pkg/kubelet/volumemanager/reconciler/reconciler.go
func (rc *reconciler) reconciliationLoopFunc() func() {
	return func() {
		rc.reconcile()

		// Sync the state with the reality once after all existing pods are added to the desired state from all sources.
		// Otherwise, the reconstruct process may clean up pods' volumes that are still in use because
		// desired state of world does not contain a complete list of pods.
		if rc.populatorHasAddedPods() && !rc.StatesHasBeenSynced() {
			klog.Infof("Reconciler: start to sync state")
			rc.sync()
		}
	}
}
```
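The gating in reconciliationLoopFunc() can be modelled as a tiny state machine: reconcile runs on every iteration, while sync runs once and only after the populator has added all existing pods. The loopState type and its counters are hypothetical, and the sketch assumes that StatesHasBeenSynced() becomes true once the deferred updateLastSyncTime() in sync() has recorded a time.

```go
package main

import "fmt"

// loopState is a hypothetical stand-in for the reconciler state consulted by
// rc.populatorHasAddedPods() and rc.StatesHasBeenSynced().
type loopState struct {
	populatorHasAddedPods bool // desiredStateOfWorldPopulator has added all existing pods
	synced                bool // set once sync() has run (updateLastSyncTime)
	reconciles, syncs     int  // counters for the demo
}

// tick models one call of the closure returned by reconciliationLoopFunc().
func (s *loopState) tick() {
	s.reconciles++ // rc.reconcile() runs on every iteration
	if s.populatorHasAddedPods && !s.synced {
		s.syncs++       // rc.sync() reconstructs state from the pod directories ...
		s.synced = true // ... and, given the assumption above, never runs again
	}
}

func main() {
	s := &loopState{}
	for i := 0; i < 5; i++ {
		if i == 2 {
			s.populatorHasAddedPods = true // the populator catches up on the third iteration
		}
		s.tick()
	}
	fmt.Printf("reconciles=%d syncs=%d\n", s.reconciles, s.syncs) // prints reconciles=5 syncs=1
}
```

Because updateLastSyncTime() is deferred in sync(), the sync counts as done even when syncStates() returns early after failing to read the pod directories.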


```go
// pkg/kubelet/volumemanager/reconciler/reconciler.go

// sync process tries to observe the real world by scanning all pods' volume directories from the disk.
// If the actual and desired state of worlds are not consistent with the observed world, it means that some
// mounted volumes are left out probably during kubelet restart. This process will reconstruct
// the volumes and update the actual and desired states. For the volumes that cannot support reconstruction,
// it will try to clean up the mount paths with operation executor.
func (rc *reconciler) sync() {
	defer rc.updateLastSyncTime()
	rc.syncStates()
}

// syncStates scans the volume directories under the given pod directory.
// If the volume is not in desired state of world, this function will reconstruct
// the volume related information and put it in both the actual and desired state of worlds.
// For some volume plugins that cannot support reconstruction, it will clean up the existing
// mount points since the volume is no long needed (removed from desired state)
func (rc *reconciler) syncStates() {
	// Get volumes information by reading the pod's directory
	podVolumes, err := getVolumesFromPodDir(rc.kubeletPodsDir)
	if err != nil {
		klog.Errorf("Cannot get volumes from disk %v", err)
		return
	}
	volumesNeedUpdate := make(map[v1.UniqueVolumeName]*reconstructedVolume)
	volumeNeedReport := []v1.UniqueVolumeName{}
	for _, volume := range podVolumes {
		if rc.actualStateOfWorld.VolumeExistsWithSpecName(volume.podName, volume.volumeSpecName) {
			klog.V(4).Infof("Volume exists in actual state (volume.SpecName %s, pod.UID %s), skip cleaning up mounts", volume.volumeSpecName, volume.podName)
			// There is nothing to reconstruct
			continue
		}
		volumeInDSW := rc.desiredStateOfWorld.VolumeExistsWithSpecName(volume.podName, volume.volumeSpecName)

		reconstructedVolume, err := rc.reconstructVolume(volume)
		if err != nil {
			if volumeInDSW {
				// Some pod needs the volume, don't clean it up and hope that
				// reconcile() calls SetUp and reconstructs the volume in ASW.
				klog.V(4).Infof("Volume exists in desired state (volume.SpecName %s, pod.UID %s), skip cleaning up mounts", volume.volumeSpecName, volume.podName)
				continue
			}
			// No pod needs the volume.
			klog.Warningf("Could not construct volume information, cleanup the mounts. (pod.UID %s, volume.SpecName %s): %v", volume.podName, volume.volumeSpecName, err)
			rc.cleanupMounts(volume)
			continue
		}
		if volumeInDSW {
			// Some pod needs the volume. And it exists on disk. Some previous
			// kubelet must have created the directory, therefore it must have
			// reported the volume as in use. Mark the volume as in use also in
			// this new kubelet so reconcile() calls SetUp and re-mounts the
			// volume if it's necessary.
			volumeNeedReport = append(volumeNeedReport, reconstructedVolume.volumeName)
			klog.V(4).Infof("Volume exists in desired state (volume.SpecName %s, pod.UID %s), marking as InUse", volume.volumeSpecName, volume.podName)
			continue
		}
		// There is no pod that uses the volume.
		if rc.operationExecutor.IsOperationPending(reconstructedVolume.volumeName, nestedpendingoperations.EmptyUniquePodName) {
			klog.Warning("Volume is in pending operation, skip cleaning up mounts")
		}
		klog.V(2).Infof(
			"Reconciler sync states: could not find pod information in desired state, update it in actual state: %+v",
			reconstructedVolume)
		volumesNeedUpdate[reconstructedVolume.volumeName] = reconstructedVolume
	}

	if len(volumesNeedUpdate) > 0 {
		if err = rc.updateStates(volumesNeedUpdate); err != nil {
			klog.Errorf("Error occurred during reconstruct volume from disk: %v", err)
		}
	}
	if len(volumeNeedReport) > 0 {
		rc.desiredStateOfWorld.MarkVolumesReportedInUse(volumeNeedReport)
	}
}
```
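The loop body of syncStates() boils down to a decision on three facts per on-disk volume: is it already in actualStateOfWorld (ASW), is it in desiredStateOfWorld (DSW), and could it be reconstructed. The classify function below restates that branch order as a pure function; the syncAction type and its labels are hypothetical, and the sketch leaves out the pending-operation check, which in the source only logs a warning and does not change the outcome.

```go
package main

import "fmt"

// syncAction is a hypothetical label for what syncStates() does with one volume found on disk.
type syncAction string

const (
	skipInASW       syncAction = "skip: already in actual state"
	skipNeededByPod syncAction = "skip: reconstruction failed but desired state needs it"
	cleanupMounts   syncAction = "cleanupMounts: reconstruction failed and no pod needs it"
	markInUse       syncAction = "MarkVolumesReportedInUse: reconstructed and in desired state"
	addToASW        syncAction = "updateStates: reconstructed, add to actual state"
)

// classify mirrors the branch order of the loop body in syncStates().
func classify(inASW, inDSW, reconstructOK bool) syncAction {
	switch {
	case inASW:
		return skipInASW
	case !reconstructOK && inDSW:
		return skipNeededByPod
	case !reconstructOK:
		return cleanupMounts
	case inDSW:
		return markInUse
	default:
		return addToASW
	}
}

func main() {
	cases := []struct{ asw, dsw, ok bool }{
		{true, false, false},
		{false, true, false},
		{false, false, false},
		{false, true, true},
		{false, false, true},
	}
	for _, c := range cases {
		fmt.Printf("inASW=%v inDSW=%v reconstructOK=%v -> %s\n", c.asw, c.dsw, c.ok, classify(c.asw, c.dsw, c.ok))
	}
}
```

Volumes in the last category are added to ASW through updateStates() while still absent from DSW, which presumably lets the next reconcile() pass unmount and detach them through its normal code paths.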

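Summary

Role of the volume manager

The volume manager lives in kubelet and mainly manages volume attach/detach (the same job as the AD controller; the kubelet flag enable-controller-attach-detach decides which of the two components performs it) as well as mount/umount operations.

Call flow in the volume manager for mounting a pod's volume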

AttachVolume(csi-attacher.Attach) --> MountVolume(csi-attacher.MountDevice --> csi-mounter.SetUp)

Call flow in the volume manager for unmounting a pod's volume

UnmountVolume(csi-mounter.TearDown) --> UnmountDevice(csi-attacher.UnmountDevice) --> DetachVolume(csi-attacher.Detach)
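The two call flows above imply an ordering per volume on a node: the device is staged once (MountDevice, NodeStageVolume) and then published per pod (SetUp, NodePublishVolume), and teardown runs in reverse, with UnmountDevice only after no pod still uses the volume and Detach only after the device is unstaged. The toy volumeLifecycle type below encodes just those ordering rules; its fields, methods and errOrder are hypothetical, and kubelet does not use such a type but derives the same ordering from actualStateOfWorld and the reconcile() passes.

```go
package main

import (
	"errors"
	"fmt"
)

// volumeLifecycle is a toy per-node model of one CSI volume.
type volumeLifecycle struct {
	attached      bool            // VolumeAttachment created and reported attached
	staged        bool            // MountDevice -> NodeStageVolume done (global mount)
	publishedPods map[string]bool // SetUp -> NodePublishVolume done, per pod
}

var errOrder = errors.New("operation out of order")

func (v *volumeLifecycle) Attach() error {
	v.attached = true
	return nil
}

func (v *volumeLifecycle) MountDevice() error {
	if !v.attached {
		return errOrder
	}
	v.staged = true
	return nil
}

func (v *volumeLifecycle) SetUp(pod string) error {
	if !v.staged {
		return errOrder
	}
	if v.publishedPods == nil {
		v.publishedPods = map[string]bool{}
	}
	v.publishedPods[pod] = true
	return nil
}

func (v *volumeLifecycle) TearDown(pod string) error {
	if !v.publishedPods[pod] {
		return errOrder
	}
	delete(v.publishedPods, pod)
	return nil
}

// UnmountDevice is only allowed once no pod still has the volume published.
func (v *volumeLifecycle) UnmountDevice() error {
	if !v.staged || len(v.publishedPods) > 0 {
		return errOrder
	}
	v.staged = false
	return nil
}

// Detach is only allowed once the device has been unstaged.
func (v *volumeLifecycle) Detach() error {
	if !v.attached || v.staged {
		return errOrder
	}
	v.attached = false
	return nil
}

func main() {
	v := &volumeLifecycle{}
	steps := []struct {
		name string
		run  func() error
	}{
		{"Attach", v.Attach},
		{"MountDevice (NodeStageVolume)", v.MountDevice},
		{"SetUp pod-a (NodePublishVolume)", func() error { return v.SetUp("pod-a") }},
		{"UnmountDevice too early", v.UnmountDevice},
		{"TearDown pod-a (NodeUnpublishVolume)", func() error { return v.TearDown("pod-a") }},
		{"UnmountDevice (NodeUnstageVolume)", v.UnmountDevice},
		{"Detach", v.Detach},
	}
	for _, s := range steps {
		fmt.Printf("%-40s err=%v\n", s.name, s.run())
	}
}
```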

Call chains of the individual operations triggered from the volume manager's vm.reconciler.Run

(1)AttachVolume


Volume is not attached to node, kubelet attach is enabled


vm.reconciler.Run --> rc.operationExecutor.AttachVolume --> oe.operationGenerator.GenerateAttachVolumeFunc --> csi-attacher.Attach(pkg/volume/csi/csi_attacher.go)--> create VolumeAttachment
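csi-attacher.Attach creates a VolumeAttachment object, and WaitForAttach later has to find the same object again, so its name must be computable from the inputs alone. To our understanding, pkg/volume/csi/csi_attacher.go derives it as "csi-" followed by the hex-encoded SHA-256 of the volume handle, driver name and node name concatenated; treat that formula as an assumption to check against the v1.17.4 source. The attachmentName function and the sample values (including the ceph-csi driver name rbd.csi.ceph.com) are for illustration only.

```go
package main

import (
	"crypto/sha256"
	"fmt"
)

// attachmentName builds a VolumeAttachment name as
// "csi-" + hex(sha256(volumeHandle + driverName + nodeName)).
// Assumption: this mirrors the naming helper in pkg/volume/csi/csi_attacher.go.
func attachmentName(volumeHandle, driverName, nodeName string) string {
	sum := sha256.Sum256([]byte(volumeHandle + driverName + nodeName))
	return fmt.Sprintf("csi-%x", sum[:])
}

func main() {
	// Same volume on two different nodes yields two different VolumeAttachment names.
	fmt.Println(attachmentName("vol-0001", "rbd.csi.ceph.com", "node-1"))
	fmt.Println(attachmentName("vol-0001", "rbd.csi.ceph.com", "node-2"))
}
```

Because the name is deterministic, a retried Attach for the same volume and node presumably lands on the same object rather than creating a duplicate, while the same volume on another node gets its own VolumeAttachment.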


(2)MountVolume


Volume is not mounted, or is already mounted, but requires remounting


vm.reconciler.Run --> rc.operationExecutor.MountVolume --> oe.operationGenerator.GenerateMountVolumeFunc --> 1. csi-attacher.WaitForAttach (waits until the VolumeAttachment's .status.attached field becomes true) 2. csi-attacher.MountDevice (--> csi.NodeStageVolume) 3. csi-mounter.SetUp (--> csi.NodePublishVolume)


(3)ExpandInUseVolume


Volume is mounted, but it needs to resize


vm.reconciler.Run --> rc.operationExecutor.ExpandInUseVolume --> oe.operationGenerator.GenerateExpandInUseVolumeFunc --> og.doOnlineExpansion --> og.nodeExpandVolume --> expander.NodeExpand (pkg/volume/csi/expander.go) --> csi.NodeExpandVolume


(4)UnmountVolume


Volume is mounted, unmount it


vm.reconciler.Run --> rc.operationExecutor.UnmountVolume --> oe.operationGenerator.GenerateUnmountVolumeFunc --> csi-mounter.TearDown(pkg/volume/csi/csi_mounter.go)--> csi.NodeUnpublishVolume


(5)UnmountDevice


Volume is globally mounted to device, unmount it


vm.reconciler.Run --> rc.operationExecutor.UnmountDevice --> oe.operationGenerator.GenerateUnmountDeviceFunc --> csi-attacher.UnmountDevice --> csi.NodeUnstageVolume


(6)DetachVolume


Volume is attached to node, detach it


vm.reconciler.Run --> rc.operationExecutor.DetachVolume --> oe.operationGenerator.GenerateDetachVolumeFunc --> csi-attacher.Detach --> delete VolumeAttachment
