func main() {
var config *rest.Config
var err error
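// Register the --feature-gates flag, fold the standard-library flags (including klog's) into this flag set, and force logging to stderr before parsing.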
flag.Var(utilflag.NewMapStringBool(&featureGates), "feature-gates", "A set of key=value pairs that describe feature gates for alpha/experimental features. "+
"Options are:\n"+strings.Join(utilfeature.DefaultFeatureGate.KnownFeatures(), "\n"))
klog.InitFlags(nil)
flag.CommandLine.AddGoFlagSet(goflag.CommandLine)
flag.Set("logtostderr", "true")
flag.Parse()
if err := utilfeature.DefaultMutableFeatureGate.SetFromMap(featureGates); err != nil {
klog.Fatal(err)
}
if *showVersion {
fmt.Println(os.Args[0], version)
os.Exit(0)
}
klog.Infof("Version: %s", version)
// get the KUBECONFIG from env if specified (useful for local/debug cluster)
kubeconfigEnv := os.Getenv("KUBECONFIG")
if kubeconfigEnv != "" {
klog.Infof("Found KUBECONFIG environment variable set, using that..")
kubeconfig = &kubeconfigEnv
}
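// Build the REST config: prefer an explicit --master/--kubeconfig (or KUBECONFIG) for out-of-cluster use, otherwise fall back to the in-cluster service-account config.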
if *master != "" || *kubeconfig != "" {
klog.Infof("Either master or kubeconfig specified. building kube config from that..")
config, err = clientcmd.BuildConfigFromFlags(*master, *kubeconfig)
} else {
klog.Infof("Building kube configs for running in cluster...")
config, err = rest.InClusterConfig()
}
if err != nil {
klog.Fatalf("Failed to create config: %v", err)
}
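// Create the core Kubernetes clientset from the REST config.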
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
klog.Fatalf("Failed to create client: %v", err)
}
// snapclientset.NewForConfig creates a new Clientset for the VolumesnapshotV1beta1Client
snapClient, err := snapclientset.NewForConfig(config)
if err != nil {
klog.Fatalf("Failed to create snapshot client: %v", err)
}
// The controller needs to know what the server version is because out-of-tree
// provisioners aren't officially supported until 1.5
serverVersion, err := clientset.Discovery().ServerVersion()
if err != nil {
klog.Fatalf("Error getting server version: %v", err)
}
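// Connect to the CSI driver's gRPC endpoint; metricsManager records metrics for CSI calls made on this connection.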
metricsManager := metrics.NewCSIMetricsManager("" /* driverName */)
grpcClient, err := ctrl.Connect(*csiEndpoint, metricsManager)
if err != nil {
klog.Error(err.Error())
os.Exit(1)
}
// Probe the CSI driver (here the cephcsi-rbd service) in a loop until it reports ready
err = ctrl.Probe(grpcClient, *operationTimeout)
if err != nil {
klog.Error(err.Error())
os.Exit(1)
}
// Query the driver name from the ceph-csi plugin
provisionerName, err := ctrl.GetDriverName(grpcClient, *operationTimeout)
if err != nil {
klog.Fatalf("Error getting CSI driver name: %s", err)
}
klog.V(2).Infof("Detected CSI driver %s", provisionerName)
metricsManager.SetDriverName(provisionerName)
metricsManager.StartMetricsEndpoint(*metricsAddress, *metricsPath)
// Query the plugin and controller capabilities from the ceph-csi plugin
pluginCapabilities, controllerCapabilities, err := ctrl.GetDriverCapabilities(grpcClient, *operationTimeout)
if err != nil {
klog.Fatalf("Error getting CSI driver capabilities: %s", err)
}
// Generate a unique ID for this provisioner
timeStamp := time.Now().UnixNano() / int64(time.Millisecond)
identity := strconv.FormatInt(timeStamp, 10) + "-" + strconv.Itoa(rand.Intn(10000)) + "-" + provisionerName
// Start building the informers
factory := informers.NewSharedInformerFactory(clientset, ctrl.ResyncPeriodOfCsiNodeInformer)
// -------------------------------
// Listers
// Create informers so resource reads go through local caches instead of hitting the API server for every request
scLister := factory.Storage().V1().StorageClasses().Lister()
claimLister := factory.Core().V1().PersistentVolumeClaims().Lister()
var csiNodeLister storagelistersv1beta1.CSINodeLister
var nodeLister v1.NodeLister
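// The CSINode and Node listers are only needed when the driver supports topology; they feed topology-aware provisioning.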
if ctrl.SupportsTopology(pluginCapabilities) {
csiNodeLister = factory.Storage().V1beta1().CSINodes().Lister()
nodeLister = factory.Core().V1().Nodes().Lister()
}
// -------------------------------
// PersistentVolumeClaims informer
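// Failed claims are retried with exponential backoff between retryIntervalStart and retryIntervalMax.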
rateLimiter := workqueue.NewItemExponentialFailureRateLimiter(*retryIntervalStart, *retryIntervalMax)
claimQueue := workqueue.NewNamedRateLimitingQueue(rateLimiter, "claims")
claimInformer := factory.Core().V1().PersistentVolumeClaims().Informer()
// Setup options
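// Configure the library-level controller: thresholds of 0 remove the failure-count cut-off for provision/delete retries, and Threadiness sets the number of worker goroutines.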
provisionerOptions := []func(*controller.ProvisionController) error{
controller.LeaderElection(false), // Always disable leader election in provisioner lib. Leader election should be done here in the CSI provisioner level instead.
controller.FailedProvisionThreshold(0),
controller.FailedDeleteThreshold(0),
controller.RateLimiter(rateLimiter),
controller.Threadiness(int(*workerThreads)),
controller.CreateProvisionedPVLimiter(workqueue.DefaultControllerRateLimiter()),
controller.ClaimsInformer(claimInformer),
}
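// CSI migration: when the driver name corresponds to a migrated in-tree plugin, also accept claims that still carry the in-tree provisioner name.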
translator := csitrans.New()
supportsMigrationFromInTreePluginName := ""
if translator.IsMigratedCSIDriverByName(provisionerName) {
supportsMigrationFromInTreePluginName, err = translator.GetInTreeNameFromCSIName(provisionerName)
if err != nil {
klog.Fatalf("Failed to get InTree plugin name for migrated CSI plugin %s: %v", provisionerName, err)
}
klog.V(2).Infof("Supports migration from in-tree plugin: %s", supportsMigrationFromInTreePluginName)
provisionerOptions = append(provisionerOptions, controller.AdditionalProvisionerNames([]string{supportsMigrationFromInTreePluginName}))
}
// Create the provisioner: it implements the Provisioner interface expected by
// the controller
csiProvisioner := ctrl.NewCSIProvisioner(
clientset,
*operationTimeout,
identity,
*volumeNamePrefix,
*volumeNameUUIDLength,
grpcClient,
snapClient,
provisionerName,
pluginCapabilities,
controllerCapabilities,
supportsMigrationFromInTreePluginName,
*strictTopology,
translator,
scLister,
csiNodeLister,
nodeLister,
claimLister,
*extraCreateMetadata,
)
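// The ProvisionController from sig-storage-lib-external-provisioner watches PVCs/PVs and delegates Provision/Delete calls to csiProvisioner.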
provisionController = controller.NewProvisionController(
clientset,
provisionerName,
csiProvisioner,
serverVersion.GitVersion,
provisionerOptions...,
)
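// The cloning protection controller clears the cloning-protection finalizer from source PVCs once volume cloning has finished.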
csiClaimController := ctrl.NewCloningProtectionController(
clientset,
claimLister,
claimInformer,
claimQueue,
)
// Main run loop
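// Start the shared informers, wait for their caches to sync, then run both controllers.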
run := func(context.Context) {
stopCh := context.Background().Done()
factory.Start(stopCh)
cacheSyncResult := factory.WaitForCacheSync(stopCh)
for _, v := range cacheSyncResult {
if !v {
klog.Fatalf("Failed to sync Informers!")
}
}
// Run both controllers; the analysis below focuses on provisionController
go csiClaimController.Run(int(*finalizerThreads), stopCh)
provisionController.Run(wait.NeverStop)
}
// Leader election
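// With leader election enabled, only the elected instance executes run(); the others block inside le.Run().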
if !*enableLeaderElection {
run(context.TODO())
} else {
// this lock name pattern is also copied from sigs.k8s.io/sig-storage-lib-external-provisioner/v5/controller
// to preserve backwards compatibility
lockName := strings.Replace(provisionerName, "/", "-", -1)
// Use either the endpoints or the leases resource as the leader election lock
var le leaderElection
if *leaderElectionType == "endpoints" {
klog.Warning("The 'endpoints' leader election type is deprecated and will be removed in a future release. Use '--leader-election-type=leases' instead.")
le = leaderelection.NewLeaderElectionWithEndpoints(clientset, lockName, run)
} else if *leaderElectionType == "leases" {
le = leaderelection.NewLeaderElection(clientset, lockName, run)
} else {
klog.Error("--leader-election-type must be either 'endpoints' or 'leases'")
os.Exit(1)
}
if *leaderElectionNamespace != "" {
le.WithNamespace(*leaderElectionNamespace)
}
// le.Run handles the leader election logic
if err := le.Run(); err != nil {
klog.Fatalf("failed to initialize leader election: %v", err)
}
}
}