kubernetes ceph-csi 分析 - 目录导航:
https://xie.infoq.cn/article/4b1d3e32f124307a49cd9c1e3
当 ceph-csi 组件启动时指定的 driver type 为 cephfs 时,会启动 cephfs driver 相关的服务。然后再根据controllerserver
、nodeserver
的参数配置,决定启动ControllerServer
与IdentityServer
,或NodeServer
与IdentityServer
。
基于 tag v3.0.0
https://github.com/ceph/ceph-csi/releases/tag/v3.0.0
cephfs driver,与 rbd driver 类似,同样包括了 controllerserver、nodeserver 与 IdentityServer,且大部分方法逻辑一致,只是最后调用的 cli 命令稍有不同,所以大部分方法的分析可以参考 rbd driver 部分。
其中,controllerserver 主要包括了 CreateVolume(创建存储)、DeleteVolume(删除存储)、ControllerExpandVolume(存储扩容):
CreateVolume:调用 ceph 存储后端,创建存储(与 rbd 逻辑类似,不过 cephfs 这里创建的是目录,而不是 rbd image)。
DeleteVolume:调用 ceph 存储后端,删除存储(与 rbd 逻辑类似,不过 cephfs 这里删除的是目录,而不是 rbd image)。
ControllerExpandVolume:调用 ceph 存储后端,扩容存储(重新设置 cephfs 目录的 quota)。
nodeserver 主要包括了 NodeGetCapabilities(获取 driver 能力)、NodeGetVolumeStats(存储探测及 metrics 获取)、NodeStageVolume(mount stagingPath)、NodePublishVolume(mount targetPath)、NodeUnpublishVolume(umount targetPath)、NodeUnstageVolume(umount stagingPath):
NodeGetCapabilities:获取 ceph-csi driver 的能力。
NodeGetVolumeStats:探测挂载存储的状态,并返回该存储的相关 metrics 给 kubelet。
NodeStageVolume:将 cephfs 的远端目录挂载到 node 上的 staging path。
NodePublishVolume:将 NodeStageVolume 方法中的 staging path,mount 到 target path。
NodeUnpublishVolume:解除掉 stagingPath 到 targetPath 的挂载。
NodeUnstageVolume:将 cephfs 的远端目录到 node 上的 staging path 的挂载解除掉。
IdentityServer 主要包括了 GetPluginInfo(获取 driver 信息)、Probe(探测接口)、GetPluginCapabilities(获取 driver 能力)三个方法:
GetPluginInfo:用于获取该 ceph-csi driver 的信息,如 driver 名称、版本等。
Probe:一个探测接口,用于探测该 driver 是否启动。
GetPluginCapabilities:用于获取 driver 的能力。
cephfs 挂载知识讲解
cephfs 挂载分为 fuse 挂载和内核挂载。
一个 cephfs 存储挂载给 pod,一共分为 2 个步骤,分别如下:
1.kubelet 组件调用 cephfsType-nodeserver-ceph-csi 的 NodeStageVolume 方法,将 cephfs 的远端目录挂载到 node 上的 staging path;
2.kubelet 组件调用 cephfsType-nodeserver-ceph-csi 的 NodePublishVolume 方法,将上一步骤中的 staging path mount 到 target path。
可以看出,与 rbd image 挂载给 pod 相比,在 NodeStageVolume 方法中少了一个 map rbd/nbd device 的操作,同样的,在 NodeUnstageVolume 方法中也会少一个 unmap rbd/nbd device 的操作。
cephfs 解除挂载知识讲解
一个 cephfs 存储从 pod 中解除挂载,一共分为 2 个步骤,分别如下:
1.kubelet 组件调用 cephfsType-nodeserver-ceph-csi 的 NodeUnpublishVolume 方法,解除掉stagingPath
与targetPath
的挂载关系。
2.kubelet 组件调用 cephfsType-nodeserver-ceph-csi 的 NodeUnstageVolume 方法,先解除掉targetPath
到远端 cephfs 存储(目录)的挂载关系。
cephfs 存储挂载给 pod 后,node 上会出现 2 个 mount 关系,示例如下:
# mount | grep ceph-fuse
ceph-fuse on /home/cld/kubernetes/lib/kubelet/plugins/kubernetes.io/csi/pv/pvc-fa752c51-79d4-42f2-a3ff-9d7afe8767b5/globalmount type fuse.ceph-fuse (rw,nosuid,nodev,relatime,user_id=0,group_id=0,allow_other)
ceph-fuse on /home/cld/kubernetes/lib/kubelet/pods/87f7e220-8b2d-4cd3-8395-12794940fa2e/volumes/kubernetes.io~csi/pvc-fa752c51-79d4-42f2-a3ff-9d7afe8767b5/mount type fuse.ceph-fuse (rw,nosuid,nodev,relatime,user_id=0,group_id=0,allow_other,_netdev)
复制代码
其中/home/cld/kubernetes/lib/kubelet/plugins/kubernetes.io/csi/pv/pvc-fa752c51-79d4-42f2-a3ff-9d7afe8767b5/globalmount
为 staging path;而/home/cld/kubernetes/lib/kubelet/pods/87f7e220-8b2d-4cd3-8395-12794940fa2e/volumes/kubernetes.io~csi/pvc-fa752c51-79d4-42f2-a3ff-9d7afe8767b5/mount
为 target path。
cephfs driver 分析
下面将对 cephfs driver 中与 rbd driver 不一致的展开分析。
(1)cephfs 扩容逻辑
cephfs driver 没有 NodeExpandVolume(node 端存储扩容),与 rbd 存储扩容分为两大步骤不一样,cephfs 存储扩容只需一步,就是 csi 的 ControllerExpandVolume,主要负责将 cephfs 存储扩容(即重新设置 cephfs 目录的 quota)。
cephfs driver 的 NodeGetCapabilities 方法中,相比于 rbd driver,也少了 node 端存储扩容的能力。
// internal/cephfs/nodeserver.go
// NodeGetCapabilities returns the supported capabilities of the node server.
func (ns *NodeServer) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetCapabilitiesRequest) (*csi.NodeGetCapabilitiesResponse, error) {
return &csi.NodeGetCapabilitiesResponse{
Capabilities: []*csi.NodeServiceCapability{
{
Type: &csi.NodeServiceCapability_Rpc{
Rpc: &csi.NodeServiceCapability_RPC{
Type: csi.NodeServiceCapability_RPC_STAGE_UNSTAGE_VOLUME,
},
},
},
{
Type: &csi.NodeServiceCapability_Rpc{
Rpc: &csi.NodeServiceCapability_RPC{
Type: csi.NodeServiceCapability_RPC_GET_VOLUME_STATS,
},
},
},
},
}, nil
}
复制代码
(2)NodeStageVolume
将 cephfs 的远端目录挂载到 node 上的 staging path。
NodeStageVolume
主要逻辑:
(1)校验请求参数;
(2)构建 volOptions;
(3)检查 stagingPath 是否是挂载点;
(4)调用 ns.mount 进行挂载操作。
// internal/cephfs/nodeserver.go
// NodeStageVolume mounts the volume to a staging path on the node.
func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
var (
volOptions *volumeOptions
)
if err := util.ValidateNodeStageVolumeRequest(req); err != nil {
return nil, err
}
// Configuration
stagingTargetPath := req.GetStagingTargetPath()
volID := volumeID(req.GetVolumeId())
if acquired := ns.VolumeLocks.TryAcquire(req.GetVolumeId()); !acquired {
klog.Errorf(util.Log(ctx, util.VolumeOperationAlreadyExistsFmt), volID)
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, req.GetVolumeId())
}
defer ns.VolumeLocks.Release(req.GetVolumeId())
volOptions, _, err := newVolumeOptionsFromVolID(ctx, string(volID), req.GetVolumeContext(), req.GetSecrets())
if err != nil {
if !errors.Is(err, ErrInvalidVolID) {
return nil, status.Error(codes.Internal, err.Error())
}
// gets mon IPs from the supplied cluster info
volOptions, _, err = newVolumeOptionsFromStaticVolume(string(volID), req.GetVolumeContext())
if err != nil {
if !errors.Is(err, ErrNonStaticVolume) {
return nil, status.Error(codes.Internal, err.Error())
}
// get mon IPs from the volume context
volOptions, _, err = newVolumeOptionsFromMonitorList(string(volID), req.GetVolumeContext(),
req.GetSecrets())
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
}
}
// Check if the volume is already mounted
isMnt, err := util.IsMountPoint(stagingTargetPath)
if err != nil {
klog.Errorf(util.Log(ctx, "stat failed: %v"), err)
return nil, status.Error(codes.Internal, err.Error())
}
if isMnt {
util.DebugLog(ctx, "cephfs: volume %s is already mounted to %s, skipping", volID, stagingTargetPath)
return &csi.NodeStageVolumeResponse{}, nil
}
// It's not, mount now
if err = ns.mount(ctx, volOptions, req); err != nil {
return nil, err
}
util.DebugLog(ctx, "cephfs: successfully mounted volume %s to %s", volID, stagingTargetPath)
return &csi.NodeStageVolumeResponse{}, nil
}
复制代码
ns.mount
这里的挂载分为 fuse 挂载和内核挂载,不同的挂载调用不同的本地 command 来进行。
// internal/cephfs/nodeserver.go
func (*NodeServer) mount(ctx context.Context, volOptions *volumeOptions, req *csi.NodeStageVolumeRequest) error {
stagingTargetPath := req.GetStagingTargetPath()
volID := volumeID(req.GetVolumeId())
cr, err := getCredentialsForVolume(volOptions, req)
if err != nil {
klog.Errorf(util.Log(ctx, "failed to get ceph credentials for volume %s: %v"), volID, err)
return status.Error(codes.Internal, err.Error())
}
defer cr.DeleteCredentials()
m, err := newMounter(volOptions)
if err != nil {
klog.Errorf(util.Log(ctx, "failed to create mounter for volume %s: %v"), volID, err)
return status.Error(codes.Internal, err.Error())
}
util.DebugLog(ctx, "cephfs: mounting volume %s with %s", volID, m.name())
readOnly := "ro"
fuseMountOptions := strings.Split(volOptions.FuseMountOptions, ",")
kernelMountOptions := strings.Split(volOptions.KernelMountOptions, ",")
if req.VolumeCapability.AccessMode.Mode == csi.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY ||
req.VolumeCapability.AccessMode.Mode == csi.VolumeCapability_AccessMode_SINGLE_NODE_READER_ONLY {
switch m.(type) {
case *fuseMounter:
if !csicommon.MountOptionContains(strings.Split(volOptions.FuseMountOptions, ","), readOnly) {
volOptions.FuseMountOptions = util.MountOptionsAdd(volOptions.FuseMountOptions, readOnly)
fuseMountOptions = append(fuseMountOptions, readOnly)
}
case *kernelMounter:
if !csicommon.MountOptionContains(strings.Split(volOptions.KernelMountOptions, ","), readOnly) {
volOptions.KernelMountOptions = util.MountOptionsAdd(volOptions.KernelMountOptions, readOnly)
kernelMountOptions = append(kernelMountOptions, readOnly)
}
}
}
if err = m.mount(ctx, stagingTargetPath, cr, volOptions); err != nil {
klog.Errorf(util.Log(ctx,
"failed to mount volume %s: %v Check dmesg logs if required."),
volID,
err)
return status.Error(codes.Internal, err.Error())
}
if !csicommon.MountOptionContains(kernelMountOptions, readOnly) && !csicommon.MountOptionContains(fuseMountOptions, readOnly) {
// #nosec - allow anyone to write inside the stagingtarget path
err = os.Chmod(stagingTargetPath, 0777)
if err != nil {
klog.Errorf(util.Log(ctx, "failed to change stagingtarget path %s permission for volume %s: %v"), stagingTargetPath, volID, err)
uErr := unmountVolume(ctx, stagingTargetPath)
if uErr != nil {
klog.Errorf(util.Log(ctx, "failed to umount stagingtarget path %s for volume %s: %v"), stagingTargetPath, volID, uErr)
}
return status.Error(codes.Internal, err.Error())
}
}
return nil
}
复制代码
fuse 挂载
// internal/cephfs/volumemounter.go
func (m *fuseMounter) mount(ctx context.Context, mountPoint string, cr *util.Credentials, volOptions *volumeOptions) error {
if err := util.CreateMountPoint(mountPoint); err != nil {
return err
}
return mountFuse(ctx, mountPoint, cr, volOptions)
}
func mountFuse(ctx context.Context, mountPoint string, cr *util.Credentials, volOptions *volumeOptions) error {
args := []string{
mountPoint,
"-m", volOptions.Monitors,
"-c", util.CephConfigPath,
"-n", cephEntityClientPrefix + cr.ID, "--keyfile=" + cr.KeyFile,
"-r", volOptions.RootPath,
"-o", "nonempty",
}
if volOptions.FuseMountOptions != "" {
args = append(args, ","+volOptions.FuseMountOptions)
}
if volOptions.FsName != "" {
args = append(args, "--client_mds_namespace="+volOptions.FsName)
}
_, stderr, err := util.ExecCommand(ctx, "ceph-fuse", args[:]...)
if err != nil {
return err
}
// Parse the output:
// We need "starting fuse" meaning the mount is ok
// and PID of the ceph-fuse daemon for unmount
match := fusePidRx.FindSubmatch([]byte(stderr))
// validMatchLength is set to 2 as match is expected
// to have 2 items, starting fuse and PID of the fuse daemon
const validMatchLength = 2
if len(match) != validMatchLength {
return fmt.Errorf("ceph-fuse failed: %s", stderr)
}
pid, err := strconv.Atoi(string(match[1]))
if err != nil {
return fmt.Errorf("failed to parse FUSE daemon PID: %w", err)
}
fusePidMapMtx.Lock()
fusePidMap[mountPoint] = pid
fusePidMapMtx.Unlock()
return nil
}
复制代码
内核挂载
// internal/cephfs/volumemounter.go
func (m *kernelMounter) mount(ctx context.Context, mountPoint string, cr *util.Credentials, volOptions *volumeOptions) error {
if err := util.CreateMountPoint(mountPoint); err != nil {
return err
}
return mountKernel(ctx, mountPoint, cr, volOptions)
}
func mountKernel(ctx context.Context, mountPoint string, cr *util.Credentials, volOptions *volumeOptions) error {
if err := execCommandErr(ctx, "modprobe", "ceph"); err != nil {
return err
}
args := []string{
"-t", "ceph",
fmt.Sprintf("%s:%s", volOptions.Monitors, volOptions.RootPath),
mountPoint,
}
optionsStr := fmt.Sprintf("name=%s,secretfile=%s", cr.ID, cr.KeyFile)
mdsNamespace := ""
if volOptions.FsName != "" {
mdsNamespace = fmt.Sprintf("mds_namespace=%s", volOptions.FsName)
}
optionsStr = util.MountOptionsAdd(optionsStr, mdsNamespace, volOptions.KernelMountOptions, netDev)
args = append(args, "-o", optionsStr)
return execCommandErr(ctx, "mount", args[:]...)
}
复制代码
(3)NodeUnstageVolume
将 cephfs 的远端目录到 node 上的 staging path 的挂载解除掉。
NodeUnstageVolume
主要逻辑:
(1)校验请求参数;
(2)调用 unmountVolume 解除 cephfs 的远端目录到 node 上的 staging path 的挂载。
// internal/cephfs/nodeserver.go
// NodeUnstageVolume unstages the volume from the staging path.
func (ns *NodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) {
var err error
if err = util.ValidateNodeUnstageVolumeRequest(req); err != nil {
return nil, err
}
volID := req.GetVolumeId()
if acquired := ns.VolumeLocks.TryAcquire(volID); !acquired {
klog.Errorf(util.Log(ctx, util.VolumeOperationAlreadyExistsFmt), volID)
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volID)
}
defer ns.VolumeLocks.Release(volID)
stagingTargetPath := req.GetStagingTargetPath()
// Unmount the volume
if err = unmountVolume(ctx, stagingTargetPath); err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
util.DebugLog(ctx, "cephfs: successfully unmounted volume %s from %s", req.GetVolumeId(), stagingTargetPath)
return &csi.NodeUnstageVolumeResponse{}, nil
}
复制代码
unmountVolume
// internal/cephfs/volumemounter.go
func unmountVolume(ctx context.Context, mountPoint string) error {
if err := execCommandErr(ctx, "umount", mountPoint); err != nil {
if strings.Contains(err.Error(), fmt.Sprintf("exit status 32: umount: %s: not mounted", mountPoint)) ||
strings.Contains(err.Error(), "No such file or directory") {
return nil
}
return err
}
fusePidMapMtx.Lock()
pid, ok := fusePidMap[mountPoint]
if ok {
delete(fusePidMap, mountPoint)
}
fusePidMapMtx.Unlock()
if ok {
p, err := os.FindProcess(pid)
if err != nil {
klog.Warningf(util.Log(ctx, "failed to find process %d: %v"), pid, err)
} else {
if _, err = p.Wait(); err != nil {
klog.Warningf(util.Log(ctx, "%d is not a child process: %v"), pid, err)
}
}
}
return nil
}
复制代码
评论