replicaset controller analysis
Introduction to the replicaset controller
The replicaset controller is one of the many controllers in the kube-controller-manager component. It is the controller for the replicaset resource object: it watches two kinds of resources, replicasets and pods, and whenever either changes it reconciles the affected replicaset object toward its desired replica count, creating pods when the actual pod count falls short of the desired count and deleting pods when it exceeds it.
In short, the replicaset controller compares the number of pods a replicaset object desires with the number of pods that currently exist, then creates or deletes pods based on that comparison until the two counts are equal.
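To make that comparison concrete before diving into the source, here is a minimal sketch of the decision (illustrative only, not the controller's actual code; the real logic lives in syncReplicaSet and is analyzed in the next post):

package main

import "fmt"

// reconcile sketches the replicaset controller's core decision: compare the
// desired replica count with the number of pods that actually exist, then
// create or delete pods to close the gap.
func reconcile(desired, actual int) {
    switch diff := actual - desired; {
    case diff < 0:
        fmt.Printf("create %d pod(s)\n", -diff)
    case diff > 0:
        fmt.Printf("delete %d pod(s)\n", diff)
    default:
        fmt.Println("already at the desired count, nothing to do")
    }
}

func main() {
    reconcile(3, 1) // create 2 pod(s)
    reconcile(3, 5) // delete 2 pod(s)
    reconcile(3, 3) // already at the desired count, nothing to do
}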
replicaset controller architecture
The rough composition and processing flow of the replicaset controller is as follows: the controller registers event handlers for pod and replicaset objects; when an event is watched, the corresponding replicaset object is put into a queue. The syncReplicaSet method, home of the controller's core reconciliation logic, then takes replicaset objects off the queue and reconciles them.
The analysis of the replicaset controller is split into three parts:
(1) replicaset controller initialization and startup;
(2) the core reconciliation logic;
(3) the expectations mechanism.
This post covers the first part: initialization and startup.
Initialization and startup of the ReplicaSetController
Based on tag v1.17.4:
https://github.com/kubernetes/kubernetes/releases/tag/v1.17.4
We take the startReplicaSetController function as the entry point for analyzing the replicaset controller's initialization and startup. startReplicaSetController calls replicaset.NewReplicaSetController to initialize the ReplicaSetController and, once initialization completes, calls Run to start it. Note the parameter passed to Run, ctx.ComponentConfig.ReplicaSetController.ConcurrentRSSyncs; it is analyzed in detail later.
// cmd/kube-controller-manager/app/apps.go
func startReplicaSetController(ctx ControllerContext) (http.Handler, bool, error) {
    if !ctx.AvailableResources[schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "replicasets"}] {
        return nil, false, nil
    }
    go replicaset.NewReplicaSetController(
        ctx.InformerFactory.Apps().V1().ReplicaSets(),
        ctx.InformerFactory.Core().V1().Pods(),
        ctx.ClientBuilder.ClientOrDie("replicaset-controller"),
        replicaset.BurstReplicas,
    ).Run(int(ctx.ComponentConfig.ReplicaSetController.ConcurrentRSSyncs), ctx.Stop)
    return nil, true, nil
}
Initialization analysis
Entry point: NewReplicaSetController
NewReplicaSetController initializes the ReplicaSetController: it takes the informers for replicaset and pod objects and registers the event handlers (AddFunc, UpdateFunc and DeleteFunc) that watch for changes to those two object kinds.
// pkg/controller/replicaset/replica_set.go
// NewReplicaSetController configures a replica set controller with the specified event recorder
func NewReplicaSetController(rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, kubeClient clientset.Interface, burstReplicas int) *ReplicaSetController {
    eventBroadcaster := record.NewBroadcaster()
    eventBroadcaster.StartLogging(klog.Infof)
    eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
    return NewBaseController(rsInformer, podInformer, kubeClient, burstReplicas,
        apps.SchemeGroupVersion.WithKind("ReplicaSet"),
        "replicaset_controller",
        "replicaset",
        controller.RealPodControl{
            KubeClient: kubeClient,
            Recorder:   eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "replicaset-controller"}),
        },
    )
}

// NewBaseController is the implementation of NewReplicaSetController with additional injected
// parameters so that it can also serve as the implementation of NewReplicationController.
func NewBaseController(rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, kubeClient clientset.Interface, burstReplicas int,
    gvk schema.GroupVersionKind, metricOwnerName, queueName string, podControl controller.PodControlInterface) *ReplicaSetController {
    if kubeClient != nil && kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil {
        ratelimiter.RegisterMetricAndTrackRateLimiterUsage(metricOwnerName, kubeClient.CoreV1().RESTClient().GetRateLimiter())
    }

    rsc := &ReplicaSetController{
        GroupVersionKind: gvk,
        kubeClient:       kubeClient,
        podControl:       podControl,
        burstReplicas:    burstReplicas,
        expectations:     controller.NewUIDTrackingControllerExpectations(controller.NewControllerExpectations()),
        queue:            workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), queueName),
    }

    rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    rsc.addRS,
        UpdateFunc: rsc.updateRS,
        DeleteFunc: rsc.deleteRS,
    })
    rsc.rsLister = rsInformer.Lister()
    rsc.rsListerSynced = rsInformer.Informer().HasSynced

    podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: rsc.addPod,
        // This invokes the ReplicaSet for every pod change, eg: host assignment. Though this might seem like
        // overkill the most frequent pod update is status, and the associated ReplicaSet will only list from
        // local storage, so it should be ok.
        UpdateFunc: rsc.updatePod,
        DeleteFunc: rsc.deletePod,
    })
    rsc.podLister = podInformer.Lister()
    rsc.podListerSynced = podInformer.Informer().HasSynced

    rsc.syncHandler = rsc.syncReplicaSet

    return rsc
}
queue
The queue is the key to the replicaset controller's sync operation. When a replicaset or pod object changes, the corresponding event handler adds the owning replicaset object to the queue, and rsc.worker (invoked from the controller's Run method and analyzed later) takes objects off the queue and reconciles them.
Objects are stored in the queue as keys in the format namespace/name.
type ReplicaSetController struct {
    ...
    // Controllers that need to be synced
    queue workqueue.RateLimitingInterface
}
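As an aside, controller.KeyFunc is an alias for cache.DeletionHandlingMetaNamespaceKeyFunc, which for a live object simply produces this namespace/name string. A small standalone illustration (the ReplicaSet below is made up):

package main

import (
    "fmt"

    appsv1 "k8s.io/api/apps/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/tools/cache"
)

func main() {
    rs := &appsv1.ReplicaSet{
        ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "nginx-rs"},
    }
    // cache.MetaNamespaceKeyFunc is what controller.KeyFunc delegates to for
    // objects that are not deletion tombstones.
    key, err := cache.MetaNamespaceKeyFunc(rs)
    if err != nil {
        panic(err)
    }
    fmt.Println(key) // default/nginx-rs
}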
The queue is fed by the event handlers of the replicaset and pod objects; let's go through them one by one.
1 rsc.addRS
This method is called when a new replicaset object is observed.
Main logic: call rsc.enqueueRS to add the object to the queue.
// pkg/controller/replicaset/replica_set.go
func (rsc *ReplicaSetController) addRS(obj interface{}) {
    rs := obj.(*apps.ReplicaSet)
    klog.V(4).Infof("Adding %s %s/%s", rsc.Kind, rs.Namespace, rs.Name)
    rsc.enqueueRS(rs)
}
rsc.enqueueRS
Builds the object's key and adds it to the queue.
func (rsc *ReplicaSetController) enqueueRS(rs *apps.ReplicaSet) {
    key, err := controller.KeyFunc(rs)
    if err != nil {
        utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", rs, err))
        return
    }

    rsc.queue.Add(key)
}
2 rsc.updateRS
This method is called when a replicaset object is updated.
Main logic:
(1) if the new and old replicaset objects have different UIDs, call rsc.deleteRS (analyzed below) for the old object;
(2) call rsc.enqueueRS to build the key and add it to the queue.
// pkg/controller/replicaset/replica_set.go
func (rsc *ReplicaSetController) updateRS(old, cur interface{}) {
    oldRS := old.(*apps.ReplicaSet)
    curRS := cur.(*apps.ReplicaSet)

    // TODO: make a KEP and fix informers to always call the delete event handler on re-create
    if curRS.UID != oldRS.UID {
        key, err := controller.KeyFunc(oldRS)
        if err != nil {
            utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", oldRS, err))
            return
        }
        rsc.deleteRS(cache.DeletedFinalStateUnknown{
            Key: key,
            Obj: oldRS,
        })
    }

    // You might imagine that we only really need to enqueue the
    // replica set when Spec changes, but it is safer to sync any
    // time this function is triggered. That way a full informer
    // resync can requeue any replica set that don't yet have pods
    // but whose last attempts at creating a pod have failed (since
    // we don't block on creation of pods) instead of those
    // replica sets stalling indefinitely. Enqueueing every time
    // does result in some spurious syncs (like when Status.Replica
    // is updated and the watch notification from it retriggers
    // this function), but in general extra resyncs shouldn't be
    // that bad as ReplicaSets that haven't met expectations yet won't
    // sync, and all the listing is done using local stores.
    if *(oldRS.Spec.Replicas) != *(curRS.Spec.Replicas) {
        klog.V(4).Infof("%v %v updated. Desired pod count change: %d->%d", rsc.Kind, curRS.Name, *(oldRS.Spec.Replicas), *(curRS.Spec.Replicas))
    }
    rsc.enqueueRS(curRS)
}
3 rsc.deleteRS
This method is called when a replicaset object is deleted.
Main logic:
(1) call rsc.expectations.DeleteExpectations to delete the expectations record for this rs (the expectations mechanism is analyzed separately later; for now just note that it exists);
(2) build the key and put it into the queue.
// pkg/controller/replicaset/replica_set.go
func (rsc *ReplicaSetController) deleteRS(obj interface{}) {
    rs, ok := obj.(*apps.ReplicaSet)
    if !ok {
        tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
        if !ok {
            utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj))
            return
        }
        rs, ok = tombstone.Obj.(*apps.ReplicaSet)
        if !ok {
            utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a ReplicaSet %#v", obj))
            return
        }
    }

    key, err := controller.KeyFunc(rs)
    if err != nil {
        utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", rs, err))
        return
    }

    klog.V(4).Infof("Deleting %s %q", rsc.Kind, key)

    // Delete expectations for the ReplicaSet so if we create a new one with the same name it starts clean
    rsc.expectations.DeleteExpectations(key)

    rsc.queue.Add(key)
}
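The DeletedFinalStateUnknown branch above handles tombstones: when the informer misses a delete event and only notices the object's absence during a relist, the handler receives a tombstone wrapping the last known state instead of the object itself. Note that controller.KeyFunc (cache.DeletionHandlingMetaNamespaceKeyFunc) also understands tombstones directly, returning the key recorded at deletion time; a small illustration:

package main

import (
    "fmt"

    "k8s.io/client-go/tools/cache"
)

func main() {
    // A tombstone carries the last known key (and possibly stale object) of a
    // resource whose deletion was only observed via a relist.
    tombstone := cache.DeletedFinalStateUnknown{Key: "default/nginx-rs"}
    key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(tombstone)
    if err != nil {
        panic(err)
    }
    fmt.Println(key) // default/nginx-rs
}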
4 rsc.addPod
This method is called when a new pod object is observed.
Main logic:
(1) if the pod's DeletionTimestamp is set, call rsc.deletePod (analyzed below) and return;
(2) call metav1.GetControllerOf to get the pod's OwnerReference and check whether the pod has an owning controller; if it does, call rsc.resolveControllerRef to look up the replicaset that owns the pod, returning immediately if it does not exist;
(3) call rsc.expectations.CreationObserved to decrement the rs's expected pod creation count by 1 (again, the expectations mechanism is analyzed later);
(4) build the key and put it into the queue.
Note: the pod event handlers still enqueue the pod's owning replicaset object, not the pod itself.
// pkg/controller/replicaset/replica_set.go
func (rsc *ReplicaSetController) addPod(obj interface{}) {
    pod := obj.(*v1.Pod)

    if pod.DeletionTimestamp != nil {
        // on a restart of the controller manager, it's possible a new pod shows up in a state that
        // is already pending deletion. Prevent the pod from being a creation observation.
        rsc.deletePod(pod)
        return
    }

    // If it has a ControllerRef, that's all that matters.
    if controllerRef := metav1.GetControllerOf(pod); controllerRef != nil {
        rs := rsc.resolveControllerRef(pod.Namespace, controllerRef)
        if rs == nil {
            return
        }
        rsKey, err := controller.KeyFunc(rs)
        if err != nil {
            return
        }
        klog.V(4).Infof("Pod %s created: %#v.", pod.Name, pod)
        rsc.expectations.CreationObserved(rsKey)
        rsc.queue.Add(rsKey)
        return
    }

    // Otherwise, it's an orphan. Get a list of all matching ReplicaSets and sync
    // them to see if anyone wants to adopt it.
    // DO NOT observe creation because no controller should be waiting for an
    // orphan.
    rss := rsc.getPodReplicaSets(pod)
    if len(rss) == 0 {
        return
    }
    klog.V(4).Infof("Orphan Pod %s created: %#v.", pod.Name, pod)
    for _, rs := range rss {
        rsc.enqueueRS(rs)
    }
}
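rsc.resolveControllerRef, used above, deserves a quick look. Paraphrased from the same file (pkg/controller/replicaset/replica_set.go), it resolves an OwnerReference to a ReplicaSet only when the reference's Kind matches and the UID of the listed object agrees with the reference, which guards against a same-name ReplicaSet that was deleted and re-created:

// pkg/controller/replicaset/replica_set.go (paraphrased)
func (rsc *ReplicaSetController) resolveControllerRef(namespace string, controllerRef *metav1.OwnerReference) *apps.ReplicaSet {
    // A lookup by name is only meaningful when the reference is of our Kind.
    if controllerRef.Kind != rsc.Kind {
        return nil
    }
    rs, err := rsc.rsLister.ReplicaSets(namespace).Get(controllerRef.Name)
    if err != nil {
        return nil
    }
    if rs.UID != controllerRef.UID {
        // Same name but different UID: the reference points at a deleted and
        // re-created ReplicaSet, not the one we just listed.
        return nil
    }
    return rs
}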
5 rsc.updatePod
This method is called when a pod object is updated.
Main logic:
(1) compare the ResourceVersion of the new and old pods; if they are identical, nothing changed, so return;
(2) if the new pod's DeletionTimestamp is set, call rsc.deletePod for it (and for the old pod as well when the labels changed), then return;
(3) if the pod's ControllerRef changed, enqueue the old owning replicaset, if any, so it can notice the pod is gone;
(4) if the new pod has a ControllerRef, look up the owning replicaset via rsc.resolveControllerRef and enqueue it; additionally, if the pod just became ready and the rs has MinReadySeconds > 0, re-enqueue the rs after MinReadySeconds (plus one second) for an availability check, then return;
(5) otherwise the pod is an orphan: if its labels or ControllerRef changed, enqueue every matching replicaset so one of them may adopt it.
// pkg/controller/replicaset/replica_set.go
func (rsc *ReplicaSetController) updatePod(old, cur interface{}) {
    curPod := cur.(*v1.Pod)
    oldPod := old.(*v1.Pod)
    if curPod.ResourceVersion == oldPod.ResourceVersion {
        // Periodic resync will send update events for all known pods.
        // Two different versions of the same pod will always have different RVs.
        return
    }

    labelChanged := !reflect.DeepEqual(curPod.Labels, oldPod.Labels)
    if curPod.DeletionTimestamp != nil {
        // when a pod is deleted gracefully it's deletion timestamp is first modified to reflect a grace period,
        // and after such time has passed, the kubelet actually deletes it from the store. We receive an update
        // for modification of the deletion timestamp and expect an rs to create more replicas asap, not wait
        // until the kubelet actually deletes the pod. This is different from the Phase of a pod changing, because
        // an rs never initiates a phase change, and so is never asleep waiting for the same.
        rsc.deletePod(curPod)
        if labelChanged {
            // we don't need to check the oldPod.DeletionTimestamp because DeletionTimestamp cannot be unset.
            rsc.deletePod(oldPod)
        }
        return
    }

    curControllerRef := metav1.GetControllerOf(curPod)
    oldControllerRef := metav1.GetControllerOf(oldPod)
    controllerRefChanged := !reflect.DeepEqual(curControllerRef, oldControllerRef)
    if controllerRefChanged && oldControllerRef != nil {
        // The ControllerRef was changed. Sync the old controller, if any.
        if rs := rsc.resolveControllerRef(oldPod.Namespace, oldControllerRef); rs != nil {
            rsc.enqueueRS(rs)
        }
    }

    // If it has a ControllerRef, that's all that matters.
    if curControllerRef != nil {
        rs := rsc.resolveControllerRef(curPod.Namespace, curControllerRef)
        if rs == nil {
            return
        }
        klog.V(4).Infof("Pod %s updated, objectMeta %+v -> %+v.", curPod.Name, oldPod.ObjectMeta, curPod.ObjectMeta)
        rsc.enqueueRS(rs)
        // TODO: MinReadySeconds in the Pod will generate an Available condition to be added in
        // the Pod status which in turn will trigger a requeue of the owning replica set thus
        // having its status updated with the newly available replica. For now, we can fake the
        // update by resyncing the controller MinReadySeconds after the it is requeued because
        // a Pod transitioned to Ready.
        // Note that this still suffers from #29229, we are just moving the problem one level
        // "closer" to kubelet (from the deployment to the replica set controller).
        if !podutil.IsPodReady(oldPod) && podutil.IsPodReady(curPod) && rs.Spec.MinReadySeconds > 0 {
            klog.V(2).Infof("%v %q will be enqueued after %ds for availability check", rsc.Kind, rs.Name, rs.Spec.MinReadySeconds)
            // Add a second to avoid milliseconds skew in AddAfter.
            // See https://github.com/kubernetes/kubernetes/issues/39785#issuecomment-279959133 for more info.
            rsc.enqueueRSAfter(rs, (time.Duration(rs.Spec.MinReadySeconds)*time.Second)+time.Second)
        }
        return
    }

    // Otherwise, it's an orphan. If anything changed, sync matching controllers
    // to see if anyone wants to adopt it now.
    if labelChanged || controllerRefChanged {
        rss := rsc.getPodReplicaSets(curPod)
        if len(rss) == 0 {
            return
        }
        klog.V(4).Infof("Orphan Pod %s updated, objectMeta %+v -> %+v.", curPod.Name, oldPod.ObjectMeta, curPod.ObjectMeta)
        for _, rs := range rss {
            rsc.enqueueRS(rs)
        }
    }
}
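The delayed re-enqueue for the MinReadySeconds availability check rests on the queue's AddAfter method (from the workqueue's delaying interface). A minimal standalone sketch (the queue name and key below are made up):

package main

import (
    "fmt"
    "time"

    "k8s.io/client-go/util/workqueue"
)

func main() {
    // The same constructor NewBaseController uses for the controller's queue.
    q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "demo")
    defer q.ShutDown()

    // enqueueRSAfter boils down to q.AddAfter(key, duration): the key only
    // becomes visible to workers once the delay has elapsed.
    start := time.Now()
    q.AddAfter("default/nginx-rs", 2*time.Second)

    key, _ := q.Get() // blocks until the delayed key is ready
    fmt.Printf("got %v after %v\n", key, time.Since(start).Round(time.Second))
    q.Done(key)
}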
6 rsc.deletePod
This method is called when a pod object is deleted.
Main logic:
(1) call metav1.GetControllerOf to get the pod's OwnerReference and check that it is an owning controller reference; if so, call rsc.resolveControllerRef to look up the replicaset that owns the pod, returning immediately if it does not exist;
(2) call rsc.expectations.DeletionObserved to decrement the rs's expected pod deletion count by 1 (again, the expectations mechanism is analyzed later);
(3) build the key and put it into the queue.
// pkg/controller/replicaset/replica_set.go
func (rsc *ReplicaSetController) deletePod(obj interface{}) {
    pod, ok := obj.(*v1.Pod)

    // When a delete is dropped, the relist will notice a pod in the store not
    // in the list, leading to the insertion of a tombstone object which contains
    // the deleted key/value. Note that this value might be stale. If the pod
    // changed labels the new ReplicaSet will not be woken up till the periodic resync.
    if !ok {
        tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
        if !ok {
            utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %+v", obj))
            return
        }
        pod, ok = tombstone.Obj.(*v1.Pod)
        if !ok {
            utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a pod %#v", obj))
            return
        }
    }

    controllerRef := metav1.GetControllerOf(pod)
    if controllerRef == nil {
        // No controller should care about orphans being deleted.
        return
    }
    rs := rsc.resolveControllerRef(pod.Namespace, controllerRef)
    if rs == nil {
        return
    }
    rsKey, err := controller.KeyFunc(rs)
    if err != nil {
        utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", rs, err))
        return
    }
    klog.V(4).Infof("Pod %s/%s deleted through %v, timestamp %+v: %#v.", pod.Namespace, pod.Name, utilruntime.GetCaller(), pod.DeletionTimestamp, pod)
    rsc.expectations.DeletionObserved(rsKey, controller.PodKey(pod))
    rsc.queue.Add(rsKey)
}
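CreationObserved and DeletionObserved get a full treatment in the expectations post; as a rough preview, expectations records, per ReplicaSet key, how many pod creations and deletions the controller still expects to observe, and a ReplicaSet is not synced again until those counts drop to zero (the real UIDTrackingControllerExpectations additionally remembers the keys of the pods it expects to see deleted). A toy sketch of the counting idea, not the real implementation:

package main

import (
    "fmt"
    "sync"
)

// toyExpectations mimics the counting idea behind controller.ControllerExpectations:
// per ReplicaSet key, how many pod add/delete events are still expected.
type toyExpectations struct {
    mu   sync.Mutex
    adds map[string]int
    dels map[string]int
}

func newToyExpectations() *toyExpectations {
    return &toyExpectations{adds: map[string]int{}, dels: map[string]int{}}
}

// ExpectDeletions is recorded by the sync loop right before it deletes pods.
func (e *toyExpectations) ExpectDeletions(key string, n int) {
    e.mu.Lock()
    defer e.mu.Unlock()
    e.dels[key] += n
}

// DeletionObserved is called from the pod informer's DeleteFunc: one expected
// deletion has now actually been observed.
func (e *toyExpectations) DeletionObserved(key string) {
    e.mu.Lock()
    defer e.mu.Unlock()
    e.dels[key]--
}

// Satisfied reports whether the sync loop may act on this key again.
func (e *toyExpectations) Satisfied(key string) bool {
    e.mu.Lock()
    defer e.mu.Unlock()
    return e.adds[key] <= 0 && e.dels[key] <= 0
}

func main() {
    e := newToyExpectations()
    e.ExpectDeletions("default/nginx-rs", 2)
    fmt.Println(e.Satisfied("default/nginx-rs")) // false: still waiting for 2 deletes
    e.DeletionObserved("default/nginx-rs")
    e.DeletionObserved("default/nginx-rs")
    fmt.Println(e.Satisfied("default/nginx-rs")) // true: safe to sync again
}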
Startup analysis
Entry point: Run
Run starts as many goroutines as the workers parameter specifies; each loops over rsc.worker, taking a key off the queue and reconciling the corresponding replicaset object.
// pkg/controller/replicaset/replica_set.go
// Run begins watching and syncing.
func (rsc *ReplicaSetController) Run(workers int, stopCh <-chan struct{}) {
    defer utilruntime.HandleCrash()
    defer rsc.queue.ShutDown()

    controllerName := strings.ToLower(rsc.Kind)
    klog.Infof("Starting %v controller", controllerName)
    defer klog.Infof("Shutting down %v controller", controllerName)

    if !controller.WaitForCacheSync(rsc.Kind, stopCh, rsc.podListerSynced, rsc.rsListerSynced) {
        return
    }

    for i := 0; i < workers; i++ {
        go wait.Until(rsc.worker, time.Second, stopCh)
    }

    <-stopCh
}
The workers parameter here is passed in by startReplicaSetController as ctx.ComponentConfig.ReplicaSetController.ConcurrentRSSyncs. Its value is set by the kube-controller-manager flag concurrent-replicaset-syncs and defaults to 5 when the flag is not given, meaning 5 goroutines process and reconcile replicaset objects from the queue in parallel.
Let's take a look at the concurrent-replicaset-syncs flag of the kube-controller-manager component.
ReplicaSetControllerOptions
// cmd/kube-controller-manager/app/options/replicasetcontroller.go
// ReplicaSetControllerOptions holds the ReplicaSetController options.
type ReplicaSetControllerOptions struct {
    *replicasetconfig.ReplicaSetControllerConfiguration
}

// AddFlags adds flags related to ReplicaSetController for controller manager to the specified FlagSet.
func (o *ReplicaSetControllerOptions) AddFlags(fs *pflag.FlagSet) {
    if o == nil {
        return
    }

    fs.Int32Var(&o.ConcurrentRSSyncs, "concurrent-replicaset-syncs", o.ConcurrentRSSyncs, "The number of replica sets that are allowed to sync concurrently. Larger number = more responsive replica management, but more CPU (and network) load")
}

// ApplyTo fills up ReplicaSetController config with options.
func (o *ReplicaSetControllerOptions) ApplyTo(cfg *replicasetconfig.ReplicaSetControllerConfiguration) error {
    if o == nil {
        return nil
    }

    cfg.ConcurrentRSSyncs = o.ConcurrentRSSyncs

    return nil
}
Default value
The concurrent-replicaset-syncs flag defaults to 5. The defaulting chain is wired up as follows.
// pkg/controller/apis/config/v1alpha1/register.go
func init() {
    // We only register manually written functions here. The registration of the
    // generated functions takes place in the generated files. The separation
    // makes the code compile even when the generated files are missing.
    localSchemeBuilder.Register(addDefaultingFuncs)
}
// pkg/controller/apis/config/v1alpha1/defaults.go
func addDefaultingFuncs(scheme *kruntime.Scheme) error {
    return RegisterDefaults(scheme)
}
// pkg/controller/apis/config/v1alpha1/zz_generated.defaults.go
func RegisterDefaults(scheme *runtime.Scheme) error {
    scheme.AddTypeDefaultingFunc(&v1alpha1.KubeControllerManagerConfiguration{}, func(obj interface{}) {
        SetObjectDefaults_KubeControllerManagerConfiguration(obj.(*v1alpha1.KubeControllerManagerConfiguration))
    })
    return nil
}

func SetObjectDefaults_KubeControllerManagerConfiguration(in *v1alpha1.KubeControllerManagerConfiguration) {
    SetDefaults_KubeControllerManagerConfiguration(in)
    SetDefaults_KubeCloudSharedConfiguration(&in.KubeCloudShared)
}
// pkg/controller/apis/config/v1alpha1/defaults.go
func SetDefaults_KubeControllerManagerConfiguration(obj *kubectrlmgrconfigv1alpha1.KubeControllerManagerConfiguration) {
    ...
    // Use the default RecommendedDefaultReplicaSetControllerConfiguration options
    replicasetconfigv1alpha1.RecommendedDefaultReplicaSetControllerConfiguration(&obj.ReplicaSetController)
    ...
}
// pkg/controller/replicaset/config/v1alpha1/defaults.go
func RecommendedDefaultReplicaSetControllerConfiguration(obj *kubectrlmgrconfigv1alpha1.ReplicaSetControllerConfiguration) {
    if obj.ConcurrentRSSyncs == 0 {
        obj.ConcurrentRSSyncs = 5
    }
}
Having covered the startup flag, let's look at the core processing method invoked after startup.
1 rsc.worker
As mentioned above, the Run method starts workers goroutines, each looping over rsc.worker to take a key off the queue and reconcile the corresponding replicaset object.
Main logic of rsc.worker:
(1) get a key from the queue;
(2) call rsc.syncHandler to process the key;
(3) remove the key from the queue: Forget it on success, or AddRateLimited it back for a retry on failure.
// worker runs a worker thread that just dequeues items, processes them, and marks them done.
// It enforces that the syncHandler is never invoked concurrently with the same key.
func (rsc *ReplicaSetController) worker() {
    for rsc.processNextWorkItem() {
    }
}

func (rsc *ReplicaSetController) processNextWorkItem() bool {
    key, quit := rsc.queue.Get()
    if quit {
        return false
    }
    defer rsc.queue.Done(key)

    err := rsc.syncHandler(key.(string))
    if err == nil {
        rsc.queue.Forget(key)
        return true
    }

    utilruntime.HandleError(fmt.Errorf("Sync %q failed with %v", key, err))
    rsc.queue.AddRateLimited(key)

    return true
}
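The Forget/AddRateLimited pair is what gives repeatedly failing keys exponential backoff. workqueue.DefaultControllerRateLimiter, the limiter NewBaseController passes in, combines a per-item exponential limiter (5ms base delay, capped at 1000s) with an overall token bucket (10 qps, burst 100). A quick illustration of the per-item backoff:

package main

import (
    "fmt"

    "k8s.io/client-go/util/workqueue"
)

func main() {
    rl := workqueue.DefaultControllerRateLimiter()

    // Each AddRateLimited on a still-failing key asks When() for the next
    // delay, which doubles per failure: 5ms, 10ms, 20ms, 40ms, ...
    for i := 0; i < 4; i++ {
        fmt.Println(rl.When("default/nginx-rs"))
    }

    // Forget, called after a successful sync, resets the key's failure count.
    rl.Forget("default/nginx-rs")
    fmt.Println(rl.When("default/nginx-rs")) // back to 5ms
}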
1.1 rsc.syncHandler
Calling rsc.syncHandler actually invokes rsc.syncReplicaSet: rsc.syncHandler is assigned rsc.syncReplicaSet in NewBaseController. As it is the core reconciliation logic, rsc.syncHandler is analyzed in detail in the next post; we won't go deeper here.
// NewBaseController is the implementation of NewReplicaSetController with additional injected
// parameters so that it can also serve as the implementation of NewReplicationController.
func NewBaseController(rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, kubeClient clientset.Interface, burstReplicas int,
    gvk schema.GroupVersionKind, metricOwnerName, queueName string, podControl controller.PodControlInterface) *ReplicaSetController {
    ...
    rsc.syncHandler = rsc.syncReplicaSet
    return rsc
}
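Why a function field rather than a direct call to rsc.syncReplicaSet inside processNextWorkItem? It decouples the dequeue loop from the sync logic, so tests can swap in a stub and observe which keys would be synced. A self-contained sketch of the pattern (all names here are made up, not the controller's code):

package main

import "fmt"

// miniController shows why syncHandler is a swappable function field: the
// dequeue loop only depends on the function value, not on the real sync logic.
type miniController struct {
    syncHandler func(key string) error
}

func (c *miniController) processOne(key string) {
    if err := c.syncHandler(key); err != nil {
        fmt.Println("would requeue:", key)
    }
}

func main() {
    c := &miniController{}

    // Production wiring: the real sync logic.
    c.syncHandler = func(key string) error {
        fmt.Println("syncing", key)
        return nil
    }
    c.processOne("default/nginx-rs")

    // Test wiring: a stub that just records the keys it sees.
    var seen []string
    c.syncHandler = func(key string) error {
        seen = append(seen, key)
        return nil
    }
    c.processOne("default/nginx-rs")
    fmt.Println("stub saw:", seen)
}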
Summary
The replicaset controller is one of the many controllers in the kube-controller-manager component. By watching replicaset and pod objects, it reconciles each replicaset toward its desired replica count, creating pods when the actual count falls short and deleting pods when it exceeds the desired count.
This post analyzed the initialization and startup of the replicaset controller: the event handlers it registers for pod and replicaset objects, how the controller starts, and which method is registered as its core processing logic.
The next two posts analyze the replicaset controller's core reconciliation logic and its expectations mechanism. Stay tuned.