func main() { var config *rest.Config var err error
flag.Var(utilflag.NewMapStringBool(&featureGates), "feature-gates", "A set of key=value pairs that describe feature gates for alpha/experimental features. "+ "Options are:\n"+strings.Join(utilfeature.DefaultFeatureGate.KnownFeatures(), "\n"))
klog.InitFlags(nil) flag.CommandLine.AddGoFlagSet(goflag.CommandLine) flag.Set("logtostderr", "true") flag.Parse()
if err := utilfeature.DefaultMutableFeatureGate.SetFromMap(featureGates); err != nil { klog.Fatal(err) }
if *showVersion { fmt.Println(os.Args[0], version) os.Exit(0) } klog.Infof("Version: %s", version)
// get the KUBECONFIG from env if specified (useful for local/debug cluster) kubeconfigEnv := os.Getenv("KUBECONFIG")
if kubeconfigEnv != "" { klog.Infof("Found KUBECONFIG environment variable set, using that..") kubeconfig = &kubeconfigEnv }
if *master != "" || *kubeconfig != "" { klog.Infof("Either master or kubeconfig specified. building kube config from that..") config, err = clientcmd.BuildConfigFromFlags(*master, *kubeconfig) } else { klog.Infof("Building kube configs for running in cluster...") config, err = rest.InClusterConfig() } if err != nil { klog.Fatalf("Failed to create config: %v", err) } clientset, err := kubernetes.NewForConfig(config) if err != nil { klog.Fatalf("Failed to create client: %v", err) } // snapclientset.NewForConfig creates a new Clientset for VolumesnapshotV1beta1Client snapClient, err := snapclientset.NewForConfig(config) if err != nil { klog.Fatalf("Failed to create snapshot client: %v", err) }
// The controller needs to know what the server version is because out-of-tree // provisioners aren't officially supported until 1.5 serverVersion, err := clientset.Discovery().ServerVersion() if err != nil { klog.Fatalf("Error getting server version: %v", err) }
metricsManager := metrics.NewCSIMetricsManager("" /* driverName */)
grpcClient, err := ctrl.Connect(*csiEndpoint, metricsManager) if err != nil { klog.Error(err.Error()) os.Exit(1) } // 循环探测,直至CSI driver即cephcsi-rbd服务准备好 err = ctrl.Probe(grpcClient, *operationTimeout) if err != nil { klog.Error(err.Error()) os.Exit(1) }
// 从ceph-csi组件中获取driver name provisionerName, err := ctrl.GetDriverName(grpcClient, *operationTimeout) if err != nil { klog.Fatalf("Error getting CSI driver name: %s", err) } klog.V(2).Infof("Detected CSI driver %s", provisionerName) metricsManager.SetDriverName(provisionerName) metricsManager.StartMetricsEndpoint(*metricsAddress, *metricsPath) // 从ceph-csi组件中获取driver能力 pluginCapabilities, controllerCapabilities, err := ctrl.GetDriverCapabilities(grpcClient, *operationTimeout) if err != nil { klog.Fatalf("Error getting CSI driver capabilities: %s", err) }
// Generate a unique ID for this provisioner timeStamp := time.Now().UnixNano() / int64(time.Millisecond) identity := strconv.FormatInt(timeStamp, 10) + "-" + strconv.Itoa(rand.Intn(10000)) + "-" + provisionerName // 开始构建infomer factory := informers.NewSharedInformerFactory(clientset, ctrl.ResyncPeriodOfCsiNodeInformer)
// ------------------------------- // Listers // Create informer to prevent hit the API server for all resource request scLister := factory.Storage().V1().StorageClasses().Lister() claimLister := factory.Core().V1().PersistentVolumeClaims().Lister()
var csiNodeLister storagelistersv1beta1.CSINodeLister var nodeLister v1.NodeLister if ctrl.SupportsTopology(pluginCapabilities) { csiNodeLister = factory.Storage().V1beta1().CSINodes().Lister() nodeLister = factory.Core().V1().Nodes().Lister() }
// ------------------------------- // PersistentVolumeClaims informer rateLimiter := workqueue.NewItemExponentialFailureRateLimiter(*retryIntervalStart, *retryIntervalMax) claimQueue := workqueue.NewNamedRateLimitingQueue(rateLimiter, "claims") claimInformer := factory.Core().V1().PersistentVolumeClaims().Informer()
// Setup options provisionerOptions := []func(*controller.ProvisionController) error{ controller.LeaderElection(false), // Always disable leader election in provisioner lib. Leader election should be done here in the CSI provisioner level instead. controller.FailedProvisionThreshold(0), controller.FailedDeleteThreshold(0), controller.RateLimiter(rateLimiter), controller.Threadiness(int(*workerThreads)), controller.CreateProvisionedPVLimiter(workqueue.DefaultControllerRateLimiter()), controller.ClaimsInformer(claimInformer), }
translator := csitrans.New()
supportsMigrationFromInTreePluginName := "" if translator.IsMigratedCSIDriverByName(provisionerName) { supportsMigrationFromInTreePluginName, err = translator.GetInTreeNameFromCSIName(provisionerName) if err != nil { klog.Fatalf("Failed to get InTree plugin name for migrated CSI plugin %s: %v", provisionerName, err) } klog.V(2).Infof("Supports migration from in-tree plugin: %s", supportsMigrationFromInTreePluginName) provisionerOptions = append(provisionerOptions, controller.AdditionalProvisionerNames([]string{supportsMigrationFromInTreePluginName})) }
// Create the provisioner: it implements the Provisioner interface expected by // the controller csiProvisioner := ctrl.NewCSIProvisioner( clientset, *operationTimeout, identity, *volumeNamePrefix, *volumeNameUUIDLength, grpcClient, snapClient, provisionerName, pluginCapabilities, controllerCapabilities, supportsMigrationFromInTreePluginName, *strictTopology, translator, scLister, csiNodeLister, nodeLister, claimLister, *extraCreateMetadata, )
provisionController = controller.NewProvisionController( clientset, provisionerName, csiProvisioner, serverVersion.GitVersion, provisionerOptions..., )
csiClaimController := ctrl.NewCloningProtectionController( clientset, claimLister, claimInformer, claimQueue, ) // 主循环函数 run := func(context.Context) { stopCh := context.Background().Done() factory.Start(stopCh) cacheSyncResult := factory.WaitForCacheSync(stopCh) for _, v := range cacheSyncResult { if !v { klog.Fatalf("Failed to sync Informers!") } } // 跑两个controller,后面主要分析provisionController go csiClaimController.Run(int(*finalizerThreads), stopCh) provisionController.Run(wait.NeverStop) } // Leader 选举相关 if !*enableLeaderElection { run(context.TODO()) } else { // this lock name pattern is also copied from sigs.k8s.io/sig-storage-lib-external-provisioner/v5/controller // to preserve backwards compatibility lockName := strings.Replace(provisionerName, "/", "-", -1) // 使用endpoints或leases资源对象来选leader var le leaderElection if *leaderElectionType == "endpoints" { klog.Warning("The 'endpoints' leader election type is deprecated and will be removed in a future release. Use '--leader-election-type=leases' instead.") le = leaderelection.NewLeaderElectionWithEndpoints(clientset, lockName, run) } else if *leaderElectionType == "leases" { le = leaderelection.NewLeaderElection(clientset, lockName, run) } else { klog.Error("--leader-election-type must be either 'endpoints' or 'leases'") os.Exit(1) }
if *leaderElectionNamespace != "" { le.WithNamespace(*leaderElectionNamespace) } // 处理Leader 选举逻辑的方法 if err := le.Run(); err != nil { klog.Fatalf("failed to initialize leader election: %v", err) } }
}
评论