replicaset controller 分析
replicaset controller 简介
replicaset controller 是 kube-controller-manager 组件中众多控制器中的一个,是 replicaset 资源对象的控制器,其通过对 replicaset、pod 2 种资源的监听,当这 2 种资源发生变化时会触发 replicaset controller 对相应的 replicaset 对象进行调谐操作,从而完成 replicaset 期望副本数的调谐,当实际 pod 的数量未达到预期时创建 pod,当实际 pod 的数量超过预期时删除 pod。
replicaset controller 主要作用是根据 replicaset 对象所期望的 pod 数量与现存 pod 数量做比较,然后根据比较结果创建/删除 pod,最终使得 replicaset 对象所期望的 pod 数量与现存 pod 数量相等。
replicaset controller 架构图
replicaset controller 的大致组成和处理流程如下图,replicaset controller 对 pod 和 replicaset 对象注册了 event handler,当有事件时,会 watch 到然后将对应的 replicaset 对象放入到 queue 中,然后syncReplicaSet
方法为 replicaset controller 调谐 replicaset 对象的核心处理逻辑所在,从 queue 中取出 replicaset 对象,做调谐处理。
replicaset controller 分析分为 3 大块进行,分别是:
(1)replicaset controller 初始化和启动分析;
(2)replicaset controller 核心处理逻辑分析;
(3)replicaset controller expectations 机制分析。
本篇博客进行 replicaset controller expectations 机制分析。
expectations 机制概述
expectations 记录了 replicaset 对象在某一次调谐中期望创建/删除的 pod 数量,pod 创建/删除完成后,该期望数会相应的减少,当期望创建/删除的 pod 数量小于等于 0 时,说明上一次调谐中期望创建/删除的 pod 数量已经达到,调用rsc.expectations.SatisfiedExpectations
方法返回 true。
根据前面的分析,在 replicaset controller 对 replicaset 对象进行调谐操作时,首先会调用rsc.expectations.SatisfiedExpectations
方法,返回 true 且 replicaset 对象的 deletetimestamp 为空,才会调用rsc.manageReplicas
方法进行期望副本数的调谐操作,也即 pod 的创建/删除操作。
replicaset controller expectations 机制分析
这个 expectations 机制的作用是什么?下面来分析一下。
以创建 1000 个副本的 replicaset 为例,分析下 expectations 的作用。根据前面对 replicaset controller 的核心处理分析可以得知,1000 个 pod 将通过两次对 replicaset 对象的调谐,每次 500 个进行创建。
直接看到 replicaset controller 的核心处理逻辑方法syncReplicaSet
。
syncReplicaSet
每次调用 rsc.manageReplicas 方法前,都会调用rsc.expectations.SatisfiedExpectations
来判断是否可以进行 replicaset 期望副本的调谐操作(pod 的创建删除操作),返回 true 时才会调用rsc.manageReplicas
方法。
// pkg/controller/replicaset/replica_set.go
func (rsc *ReplicaSetController) syncReplicaSet(key string) error {
startTime := time.Now()
defer func() {
klog.V(4).Infof("Finished syncing %v %q (%v)", rsc.Kind, key, time.Since(startTime))
}()
...
rsNeedsSync := rsc.expectations.SatisfiedExpectations(key)
selector, err := metav1.LabelSelectorAsSelector(rs.Spec.Selector)
if err != nil {
utilruntime.HandleError(fmt.Errorf("error converting pod selector to selector: %v", err))
return nil
}
...
var manageReplicasErr error
if rsNeedsSync && rs.DeletionTimestamp == nil {
manageReplicasErr = rsc.manageReplicas(filteredPods, rs)
}
...
}
复制代码
rsc.expectations.SatisfiedExpectations
接下来看到 rsc.expectations.SatisfiedExpectations 方法,主要是用于判断是否需要在 syncReplicaSet 核心处理方法中调用 rsc.manageReplicas 方法来进行 pod 的创建删除操作。
(1)第一次进来(首次创建 replicaset)时 r.GetExpectations 找不到该 rs 对象对应的 expectations,exists 的值为 false,所以 rsc.expectations.SatisfiedExpectations 方法返回 true,也就是说 syncReplicaSet 方法中会调用 rsc.manageReplicas 方法来进行 pod 的创建操作,并在 rsc.manageReplicas 方法中设置 expectations 为期望创建 500 个 pod;
(2)在第一次创建 500 个 pod 的操作没有完成之前,以及第一次创建 500 个 pod 的操作开始后的 5 分钟之内,exp.Fulfilled 与 exp.isExpired 都返回 false,所以 rsc.expectations.SatisfiedExpectations 方法返回 false,也就是说 syncReplicaSet 方法中不会调用 rsc.manageReplicas 方法来进行 pod 的创建操作;
(3)在第一次创建 500 个 pod 的操作完成之后,或者第一次创建 500 个 pod 操作进行了 5 分钟有余,则 exp.Fulfilled 或 exp.isExpired 会返回 true,所以 rsc.expectations.SatisfiedExpectations 方法返回 true,也就是说 syncReplicaSet 方法中会调用 rsc.manageReplicas 方法来进行第二次 500 个 pod 的创建操作,并在 rsc.manageReplicas 方法中再次设置 expectations 为期望创建 500 个 pod。
// pkg/controller/controller_utils.go
// SatisfiedExpectations returns true if the required adds/dels for the given controller have been observed.
// Add/del counts are established by the controller at sync time, and updated as controllees are observed by the controller
// manager.
func (r *ControllerExpectations) SatisfiedExpectations(controllerKey string) bool {
if exp, exists, err := r.GetExpectations(controllerKey); exists {
if exp.Fulfilled() {
klog.V(4).Infof("Controller expectations fulfilled %#v", exp)
return true
} else if exp.isExpired() {
klog.V(4).Infof("Controller expectations expired %#v", exp)
return true
} else {
klog.V(4).Infof("Controller still waiting on expectations %#v", exp)
return false
}
} else if err != nil {
klog.V(2).Infof("Error encountered while checking expectations %#v, forcing sync", err)
} else {
// When a new controller is created, it doesn't have expectations.
// When it doesn't see expected watch events for > TTL, the expectations expire.
// - In this case it wakes up, creates/deletes controllees, and sets expectations again.
// When it has satisfied expectations and no controllees need to be created/destroyed > TTL, the expectations expire.
// - In this case it continues without setting expectations till it needs to create/delete controllees.
klog.V(4).Infof("Controller %v either never recorded expectations, or the ttl expired.", controllerKey)
}
// Trigger a sync if we either encountered and error (which shouldn't happen since we're
// getting from local store) or this controller hasn't established expectations.
return true
}
复制代码
exp.Fulfilled
判断 replicaset 对象的 expectations 里的期望创建 pod 数量以及期望删除 pod 数量,都小于等于 0 时返回 true。
// Fulfilled returns true if this expectation has been fulfilled.
func (e *ControlleeExpectations) Fulfilled() bool {
// TODO: think about why this line being atomic doesn't matter
return atomic.LoadInt64(&e.add) <= 0 && atomic.LoadInt64(&e.del) <= 0
}
复制代码
exp.isExpired
判断 replicaset 对象上次设置 expectations 时的时间距离现在的时间是否已经超过 5 分钟,是则返回 true。
func (exp *ControlleeExpectations) isExpired() bool {
return clock.RealClock{}.Since(exp.timestamp) > ExpectationsTimeout
}
复制代码
rsc.manageReplicas
核心处理方法,主要是根据 replicaset 所期望的 pod 数量与现存 pod 数量做比较,然后根据比较结果创建/删除 pod,最终使得 replicaset 对象所期望的 pod 数量与现存 pod 数量相等。
(1)创建 pod 之前,会调用 rsc.expectations.ExpectCreations 来设置 Expectations:(key,add:500,del:0);
(2)调用 slowStartBatch 来执行 pod 的创建;
(3)创建完 pod 之后,判断是否有创建失败的 pod,并根据创建失败的 pod 数量,调用 rsc.expectations.CreationObserved 减去 Expectations 中相应的 add 的值。
// pkg/controller/replicaset/replica_set.go
func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *apps.ReplicaSet) error {
diff := len(filteredPods) - int(*(rs.Spec.Replicas))
...
if diff > rsc.burstReplicas {
diff = rsc.burstReplicas
}
rsc.expectations.ExpectCreations(rsKey, diff)
klog.V(2).Infof("Too few replicas for %v %s/%s, need %d, creating %d", rsc.Kind, rs.Namespace, rs.Name, *(rs.Spec.Replicas), diff)
successfulCreations, err := slowStartBatch(diff, controller.SlowStartInitialBatchSize, func() error {
...
})
if skippedPods := diff - successfulCreations; skippedPods > 0 {
klog.V(2).Infof("Slow-start failure. Skipping creation of %d pods, decrementing expectations for %v %v/%v", skippedPods, rsc.Kind, rs.Namespace, rs.Name)
for i := 0; i < skippedPods; i++ {
// Decrement the expected number of creates because the informer won't observe this pod
rsc.expectations.CreationObserved(rsKey)
}
}
...
复制代码
rsc.expectations.ExpectCreations
设置 replicaset 对象的 expectations。
// pkg/controller/controller_utils.go
func (r *ControllerExpectations) ExpectCreations(controllerKey string, adds int) error {
return r.SetExpectations(controllerKey, adds, 0)
}
// SetExpectations registers new expectations for the given controller. Forgets existing expectations.
func (r *ControllerExpectations) SetExpectations(controllerKey string, add, del int) error {
exp := &ControlleeExpectations{add: int64(add), del: int64(del), key: controllerKey, timestamp: clock.RealClock{}.Now()}
klog.V(4).Infof("Setting expectations %#v", exp)
return r.Add(exp)
}
复制代码
rsc.expectations.CreationObserved
将 replicaset 对象 expectations 中期望创建的 pod 数量减 1.
// pkg/controller/controller_utils.go
// CreationObserved atomically decrements the `add` expectation count of the given controller.
func (r *ControllerExpectations) CreationObserved(controllerKey string) {
r.LowerExpectations(controllerKey, 1, 0)
}
// Decrements the expectation counts of the given controller.
func (r *ControllerExpectations) LowerExpectations(controllerKey string, add, del int) {
if exp, exists, err := r.GetExpectations(controllerKey); err == nil && exists {
exp.Add(int64(-add), int64(-del))
// The expectations might've been modified since the update on the previous line.
klog.V(4).Infof("Lowered expectations %#v", exp)
}
}
复制代码
那正常情况下(即没有 pod 创建异常)Expectations 在什么时候会更新为(key,add:0,del:0)呢,继续看下面的分析。
pod add event handlerFunc-addPod
replicaset controller 会监听 pod 的新增事件,每成功创建出一个 pod,会调用 addPod 方法。在 addPod 方法中,同样会调用一次 rsc.expectations.CreationObserved,将 Expectations 中期望创建的 pod 数量减 1。
// pkg/controller/replicaset/replica_set.go
// When a pod is created, enqueue the replica set that manages it and update its expectations.
func (rsc *ReplicaSetController) addPod(obj interface{}) {
pod := obj.(*v1.Pod)
...
// If it has a ControllerRef, that's all that matters.
if controllerRef := metav1.GetControllerOf(pod); controllerRef != nil {
rs := rsc.resolveControllerRef(pod.Namespace, controllerRef)
if rs == nil {
return
}
rsKey, err := controller.KeyFunc(rs)
if err != nil {
return
}
klog.V(4).Infof("Pod %s created: %#v.", pod.Name, pod)
rsc.expectations.CreationObserved(rsKey)
rsc.queue.Add(rsKey)
return
}
...
}
复制代码
replicaset controller 第一次创建了 500 个 pod 之后,通过 replicaset controller 对 pod 新增事件的 watch,然后调用 rsc.expectations.CreationObserved 方法将 Expectations 中期望创建的 pod 数量减 1,以及 rsc.manageReplicas 方法中对创建失败的 pod 数量,调用相应次数的 rsc.expectations.CreationObserved 方法将 Expectations 中期望创建的 pod 数量相应减少,最终使该 replicaset 对象的 Expectations 的值将变为:(key,add:0,del:0),这样在下次对该 replicaset 对象的调谐操作中,即可进行下一批次的 500 个 pod 的创建。
关于 replicaset controller 删除 pod 时的 expectations 机制,与上述创建 pod 时分析的 expectations 机制差不多,可以自己去分析下,这里不再展开分析。
总结
上面以 replicaset controller 创建 pod 为例分析了 expectations 的作用,删除 pod 的逻辑中 expectations 起到了类似的作用,此处不再分析。下面来总结一下 replicaset controller 中 expectations 机制的作用。
expectations 机制作用总结
expectations 的过期时间机制解决了某一批次创建/删除 pod 因某些原因一直卡住不能完成而导致的 replicaset 期望副本数永远达不到预期的问题。
expectations.SatisfiedExpectations 返回 true,则进入核心处理方法 rsc.manageReplicas,根据 replicaset 所期望的 pod 数量与现存 pod 数量做比较,判断是否需要进行下一批次的创建/删除 pod 的任务。
综上可以看出,expectations 主要用于控制让多个创建/删除 pod 批次串行执行,不让其并行执行,防止了并发执行所可能产生的重复删除 pod、创建出 replicaset 所期望的 pod 数量以外的多余的 pod 等问题(当 replicaset 对象的某一创建/删除 pod 的批次还在进行中,这时再次进行 pod 的创建删除操作,如果没有 expectations 的判断控制,就会再次进行 pod 的批量创建/删除时,从而导致该问题的发生)。
评论