更多 ceph-csi 其他源码分析,请查看下面这篇博文:kubernetes ceph-csi 分析 - 目录导航https://xie.infoq.cn/article/4b1d3e32f124307a49cd9c1e3
摘要
ceph-csi 分析-external-attacher 源码分析。external-attacher 属于 external plugin 中的一个,辅助 csi plugin 组件,共同完成了存储相关操作。external-attacher watch volumeAttachment 对象,然后调用 csi plugin 来做 attach/dettach 操作,并修改 volumeAttachment 对象与 pv 对象。
基于 tag v2.1.1
https://github.com/kubernetes-csi/external-attacher/releases/tag/v2.1.1
external-attacher
external-attacher 属于 external plugin 中的一个。下面我们先来回顾一下 external plugin 以及 csi 系统结构。
external plugin
external plugin 包括了 external-provisioner、external-attacher、external-resizer、external-snapshotter 等,external plugin 辅助 csi plugin 组件,共同完成了存储相关操作。external plugin 负责 watch pvc、volumeAttachment 等对象,然后调用 volume plugin 来完成存储的相关操作。如 external-provisioner watch pvc 对象,然后调用 csi plugin 来创建存储,最后创建 pv 对象;external-attacher watch volumeAttachment 对象,然后调用 csi plugin 来做 attach/dettach 操作,并修改 volumeAttachment 对象与 pv 对象;external-resizer watch pvc 对象,然后调用 csi plugin 来做存储的扩容操作等。
csi 系统结构
external-attacher 作用分析
根据 CSI plugin 是否支持 ControllerPublish/ControllerUnpublish 操作,external-attacher 的作用分为如下两种:
(1)当 CSI plugin 不支持 ControllerPublish/ControllerUnpublish 操作时,AD controller(或 kubelet 的 volume manager)创建 VolumeAttachment 对象后,external-attacher 仅参与 VolumeAttachment 对象的修改,将 attached 属性值 patch 为 true;而 external-attacher 对 pv 对象无任何同步处理操作。
(2)当 CSI plugin 支持 ControllerPublish/ControllerUnpublish 操作时,external-attacher 调用 csi plugin(ControllerPublishVolume)进行存储的 attach 操作,然后更改 VolumeAttachment 对象,将 attached 属性值 patch 为 true,并 patch pv 对象,增加该 external-attacher 相关的 Finalizer;对于 pv 对象,external-attacher 负责处理 pv 对象的 finalizer,patch pv 对象,去除该 external-attacher 相关的 finalizer(该 external-attacher 执行 attach 操作时添加的 finalizer)。
源码分析
external-attacher 的源码分析分为两部分:
(1)main 方法以及启动参数分析;
(2)核心处理逻辑分析。
这篇博客先对 external-attacher 的 main 方法以及启动参数做分析,下篇博客将对 external-attacher 的核心处理逻辑进行源码分析。
main
main 方法主要逻辑:
(1)解析启动参数;
(2)校验 worker-threads 配置;
(3)根据配置建立 clientset;
(4)建立与 csi plugin 的 grpcclient;
(5)调用 csi plugin 的探测方法(探测 cephcsi-rbd 服务是否准备好),直至探测成功;
(6)获取 driver 名称;
(7)获取 csi plugin 提供的能力,判断是否提供 Controller 服务,当不支持时实例化 TrivialHandler,当支持时进入下一步;
(8)获取 csi plugin 提供的能力,判断是否支持 ControllerPublish/ControllerUnpublish 操作,支持则实例化 CSIHandler,不支持则实例化 TrivialHandler(CSIHandler 与 TrivialHandler 在下一篇博客再做具体介绍,这里只要知道有这么两个东西就行);
(9)ceph-csi 不提供 Controller attach 服务,所以走 NewTrivialHandler;
(10)获取 csi plugin 提供的能力,判断是否提供 ListVolumesPublishedNodes 服务;
(11)新建 CSIAttachController;
(12)定义 run 方法,方法中调用 CSIAttachController 的 run 方法;
(13)进行高可用选主相关操作,并运行 run 方法。
func main() {
// 解析启动参数
klog.InitFlags(nil)
flag.Set("logtostderr", "true")
flag.Parse()
// --version输出
if *showVersion {
fmt.Println(os.Args[0], version)
return
}
klog.Infof("Version: %s", version)
// 根据配置建立clientset
// Create the client config. Use kubeconfig if given, otherwise assume in-cluster.
config, err := buildConfig(*kubeconfig)
if err != nil {
klog.Error(err.Error())
os.Exit(1)
}
// 校验worker-threads配置
if *workerThreads == 0 {
klog.Error("option -worker-threads must be greater than zero")
os.Exit(1)
}
// 根据配置建立clientset
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
klog.Error(err.Error())
os.Exit(1)
}
factory := informers.NewSharedInformerFactory(clientset, *resync)
var handler controller.Handler
metricsManager := metrics.NewCSIMetricsManager("" /* driverName */)
// 建立grpcclient
// Connect to CSI.
csiConn, err := connection.Connect(*csiAddress, metricsManager, connection.OnConnectionLoss(connection.ExitOnConnectionLoss()))
if err != nil {
klog.Error(err.Error())
os.Exit(1)
}
// 进行grpc探测(探测cephcsi-rbd服务是否准备好),直至探测成功
err = rpc.ProbeForever(csiConn, *timeout)
if err != nil {
klog.Error(err.Error())
os.Exit(1)
}
// 获取driver名称
// Find driver name.
ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
defer cancel()
csiAttacher, err := rpc.GetDriverName(ctx, csiConn)
if err != nil {
klog.Error(err.Error())
os.Exit(1)
}
klog.V(2).Infof("CSI driver name: %q", csiAttacher)
metricsManager.SetDriverName(csiAttacher)
metricsManager.StartMetricsEndpoint(*metricsAddress, *metricsPath)
// 获取csi plugin提供的能力,判断是否提供Controller服务
supportsService, err := supportsPluginControllerService(ctx, csiConn)
if err != nil {
klog.Error(err.Error())
os.Exit(1)
}
if !supportsService {
// 实例化TrivialHandler
handler = controller.NewTrivialHandler(clientset)
klog.V(2).Infof("CSI driver does not support Plugin Controller Service, using trivial handler")
} else {
// 获取csi plugin提供的能力,判断是否提供Controller attach服务
// Find out if the driver supports attach/detach.
supportsAttach, supportsReadOnly, err := supportsControllerPublish(ctx, csiConn)
if err != nil {
klog.Error(err.Error())
os.Exit(1)
}
if supportsAttach {
pvLister := factory.Core().V1().PersistentVolumes().Lister()
nodeLister := factory.Core().V1().Nodes().Lister()
vaLister := factory.Storage().V1beta1().VolumeAttachments().Lister()
csiNodeLister := factory.Storage().V1beta1().CSINodes().Lister()
volAttacher := attacher.NewAttacher(csiConn)
CSIVolumeLister := attacher.NewVolumeLister(csiConn)
// 实例化CSIHandler
handler = controller.NewCSIHandler(clientset, csiAttacher, volAttacher, CSIVolumeLister, pvLister, nodeLister, csiNodeLister, vaLister, timeout, supportsReadOnly, csitrans.New())
klog.V(2).Infof("CSI driver supports ControllerPublishUnpublish, using real CSI handler")
} else {
// 实例化TrivialHandler
handler = controller.NewTrivialHandler(clientset)
klog.V(2).Infof("CSI driver does not support ControllerPublishUnpublish, using trivial handler")
}
}
// 获取csi plugin提供的能力,判断是否提供ListVolumesPublishedNodes服务
slvpn, err := supportsListVolumesPublishedNodes(ctx, csiConn)
if err != nil {
klog.Errorf("Failed to check if driver supports ListVolumesPublishedNodes, assuming it does not: %v", err)
}
if slvpn {
klog.V(2).Infof("CSI driver supports list volumes published nodes. Using capability to reconcile volume attachment objects with actual backend state")
}
// 新建CSIAttachController
ctrl := controller.NewCSIAttachController(
clientset,
csiAttacher,
handler,
factory.Storage().V1beta1().VolumeAttachments(),
factory.Core().V1().PersistentVolumes(),
workqueue.NewItemExponentialFailureRateLimiter(*retryIntervalStart, *retryIntervalMax),
workqueue.NewItemExponentialFailureRateLimiter(*retryIntervalStart, *retryIntervalMax),
slvpn,
*reconcileSync,
)
// 定义run方法,方法中调用CSIAttachController的run方法
run := func(ctx context.Context) {
stopCh := ctx.Done()
factory.Start(stopCh)
ctrl.Run(int(*workerThreads), stopCh)
}
// 进行高可用选主相关操作,并运行run方法
if !*enableLeaderElection {
run(context.TODO())
} else {
// Name of config map with leader election lock
lockName := "external-attacher-leader-" + csiAttacher
le := leaderelection.NewLeaderElection(clientset, lockName, run)
if *leaderElectionNamespace != "" {
le.WithNamespace(*leaderElectionNamespace)
}
if err := le.Run(); err != nil {
klog.Fatalf("failed to initialize leader election: %v", err)
}
}
}
复制代码
启动参数分析
启动参数列表
具体参考https://github.com/kubernetes-csi/external-attacher#command-line-options
// Command line flags
var (
kubeconfig = flag.String("kubeconfig", "", "Absolute path to the kubeconfig file. Required only when running out of cluster.")
resync = flag.Duration("resync", 10*time.Minute, "Resync interval of the controller.")
csiAddress = flag.String("csi-address", "/run/csi/socket", "Address of the CSI driver socket.")
showVersion = flag.Bool("version", false, "Show version.")
timeout = flag.Duration("timeout", 15*time.Second, "Timeout for waiting for attaching or detaching the volume.")
workerThreads = flag.Uint("worker-threads", 10, "Number of attacher worker threads")
retryIntervalStart = flag.Duration("retry-interval-start", time.Second, "Initial retry interval of failed create volume or deletion. It doubles with each failure, up to retry-interval-max.")
retryIntervalMax = flag.Duration("retry-interval-max", 5*time.Minute, "Maximum retry interval of failed create volume or deletion.")
enableLeaderElection = flag.Bool("leader-election", false, "Enable leader election.")
leaderElectionNamespace = flag.String("leader-election-namespace", "", "Namespace where the leader election resource lives. Defaults to the pod namespace if not set.")
reconcileSync = flag.Duration("reconcile-sync", 1*time.Minute, "Resync interval of the VolumeAttachment reconciler.")
metricsAddress = flag.String("metrics-address", "", "The TCP network address where the prometheus metrics endpoint will listen (example: `:8080`). The default is empty string, which means metrics endpoint is disabled.")
metricsPath = flag.String("metrics-path", "/metrics", "The HTTP path where prometheus metrics will be exposed. Default is `/metrics`.")
)
func main() {
klog.InitFlags(nil)
flag.Set("logtostderr", "true")
flag.Parse()
......
复制代码
external-attacher 容器启动参数
- args:
- --v=5
- --csi-address=$(ADDRESS)
- --leader-election=true
- --retry-interval-start=500ms
env:
- name: ADDRESS
value: /csi/csi-provisioner.sock
volumeMounts:
- mountPath: /csi
name: socket-dir
volumes:
- emptyDir:
medium: Memory
name: socket-dir
复制代码
下面是部分参数解析:
v
日志打印等级。
csi-address
csi plugin(controller server)暴露服务的 socket 地址。
leader-election
是否开启高可用选主(使用 Leases)。
retry-interval-start
VA/PV 对象同步失败后的重试时间间隔。
retry-interval-max
VA/PV 对象同步失败后的最大重试时间间隔。
timeout
与 CSI driver 通信的超时时间,默认 15 秒。
worker-threads
同步 VA/PV 对象的工作线程数量,默认 10。
总结
external-attacher 属于 external plugin 中的一个。
external-attacher 作用分析
根据 CSI plugin 是否支持 ControllerPublish/ControllerUnpublish 操作,external-attacher 的作用分为如下两种:
(1)当 CSI plugin 不支持 ControllerPublish/ControllerUnpublish 操作时,AD controller(或 kubelet 的 volume manager)创建 VolumeAttachment 对象后,external-attacher 仅参与 VolumeAttachment 对象的修改,将 attached 属性值 patch 为 true;而 external-attacher 对 pv 对象无任何同步处理操作。
(2)当 CSI plugin 支持 ControllerPublish/ControllerUnpublish 操作时,external-attacher 调用 csi plugin(ControllerPublishVolume)进行存储的 attach 操作,然后更改 VolumeAttachment 对象,将 attached 属性值 patch 为 true,并 patch pv 对象,增加该 external-attacher 相关的 Finalizer;对于 pv 对象,external-attacher 负责处理 pv 对象的 finalizer,patch pv 对象,去除该 external-attacher 相关的 finalizer(该 external-attacher 执行 attach 操作时添加的 finalizer)。
external-attacher 与 ceph-csi 结合使用
ceph-csi 不支持 ControllerPublish/ControllerUnpublish 操作,所以 external-attacher 与 ceph-csi 结合使用,external-attacher 仅参与 VolumeAttachment 对象的修改,将 attached 属性值 patch 为 true。
评论