写点什么

k8s statefulset controller 源码分析

作者:良凯尔
  • 2021 年 11 月 28 日
  • 本文字数:15529 字

    阅读完需:约 51 分钟

k8s statefulset controller源码分析

statefulset 简介

statefulset 是 Kubernetes 提供的管理有状态应用的对象,而 deployment 用于管理无状态应用。


有状态的 pod 与无状态的 pod 不一样的是,有状态的 pod 有时候需要通过其主机名来定位,而无状态的不需要,因为无状态的 pod 每个都是一样的,随机选一个就行,但对于有状态的来说,每一个 pod 都不一样,通常希望操作的是特定的某一个。


statefulset 适用于需要以下特点的应用:


(1)稳定的网络标志(主机名):Pod 重新调度后其 PodName 和 HostName 不变;


(2)稳定的持久化存储:基于 PVC,Pod 重新调度后仍能访问到相同的持久化数据;


(3)稳定的创建与扩容次序:有序创建或扩容,从序号小到大的顺序对 pod 进行创建(即从 0 到 N-1),且在下一个 Pod 创建运行之前,所有之前的 Pod 必须都是 Running 和 Ready 状态;


(4)稳定的删除与缩容次序:有序删除或缩容,从序号大到小的顺序对 pod 进行删除(即从 N-1 到 0),且在下一个 Pod 终止与删除之前,所有之前的 Pod 必须都已经被删除;


(5)稳定的滚动更新次序:从序号大到小的顺序对 pod 进行更新(即从 N-1 到 0),先删除后创建,且需等待当前序号的 pod 再次创建完成且状态为 Ready 时才能进行下一个 pod 的更新处理。

statefulset controller 简介

statefulset controller 是 kube-controller-manager 组件中众多控制器中的一个,是 statefulset 资源对象的控制器,其通过对 statefulset、pod 资源的监听,当资源发生变化时会触发 statefulset controller 对相应的 statefulset 资源对象进行调谐操作,从而完成 statefulset 对于 pod 的创建、删除、更新、扩缩容、statefulset 的滚动更新、statefulset 状态 status 更新、旧版本 statefulset 清理等操作。

statefulset controller 架构图

statefulset controller 的大致组成和处理流程如下图,statefulset controller 对 statefulset、pod 对象注册了 event handler,当有事件时,会 watch 到然后将对应的 statefulset 对象放入到 queue 中,然后syncStatefulSet方法为 statefulset controller 调谐 statefulset 对象的核心处理逻辑所在,从 queue 中取出 statefulset 对象,做调谐处理。



statefulset pod 的命名规则、pod 创建与删除

如果创建一个名称为 web、replicas 为 3 的 statefulset 对象,则其 pod 名称分别为 web-0、web-1、web-2。


statefulset pod 的创建按 0 - n 的顺序创建,且在创建下一个 pod 之前,需要等待前一个 pod 创建完成并处于 ready 状态。


同样拿上面的例子来说明,在 web statefulset 创建后,3 个 Pod 将按顺序创建 web-0,web-1,web-2。在 web-0 处于 ready 状态之前,web-1 将不会被创建,同样当 web-1 处于 ready 状态之前 web-2 也不会被创建。如果在 web-1 ready 后,web-2 创建之前, web-0 不处于 ready 状态了,这个时候 web-2 将不会被创建,直到 web-0 再次回到 ready 状态。


statefulset 滚动更新或缩容过程中 pod 的删除按 n - 0 的顺序删除,且在删除下一个 pod 之前,需要等待前一个 pod 删除完成。


另外,当 statefulset.Spec.VolumeClaimTemplates 中定义了 pod 所需的 pvc 时,statefulset controller 在创建 pod 时,会同时创建对应的 pvc 出来,但删除 pod 时,不会做对应 pvc 的删除操作,这些 pvc 需要人工额外做删除操作。

statefulset 更新策略

(1)OnDelete:使用 OnDelete 更新策略时,在更新 statefulset pod 模板后,只有当你手动删除老的 statefulset pods 之后,新的 statefulset Pod 才会被自动创建。


(2)RollingUpdate:使用 RollingUpdate 更新策略时,在更新 statefulset pod 模板后, 老的 statefulset pods 将被删除,并且将根据滚动更新配置自动创建新的 statefulset pods。滚动更新期间,每个序号的 statefulset pod 最多只能有一个,且滚动更新下一个 pod 之前,需等待前一个 pod 更新完成并处于 ready 状态。与 statefulset pod 按 0 - n 的顺序创建不同,滚动更新时 Pod 按逆序的方式(即 n - 0)删除并创建。


statefulset 的滚动升级中还有一个 Partition 配置,在设置 partition 后,滚动更新过程中,statefulset 的 Pod 中序号大于或等于 partition 的 Pod 会进行滚动升级,而其余的 Pod 保持不变,不会进行滚动更新。


statefulset controller 分析将分为两大块进行,分别是:


(1)statefulset controller 初始化与启动分析;


(2)statefulset controller 处理逻辑分析。

1.statefulset controller 初始化与启动分析

基于 tag v1.17.4

https://github.com/kubernetes/kubernetes/releases/tag/v1.17.4


直接看到 startStatefulSetController 函数,作为 statefulset controller 初始化与启动分析的入口。

startStatefulSetController

startStatefulSetController 主要逻辑:


(1)调用 statefulset.NewStatefulSetController 新建并初始化 StatefulSetController;


(2)拉起一个 goroutine,跑 StatefulSetController 的 Run 方法。


// cmd/kube-controller-manager/app/apps.gofunc startStatefulSetController(ctx ControllerContext) (http.Handler, bool, error) {  if !ctx.AvailableResources[schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "statefulsets"}] {    return nil, false, nil  }  go statefulset.NewStatefulSetController(    ctx.InformerFactory.Core().V1().Pods(),    ctx.InformerFactory.Apps().V1().StatefulSets(),    ctx.InformerFactory.Core().V1().PersistentVolumeClaims(),    ctx.InformerFactory.Apps().V1().ControllerRevisions(),    ctx.ClientBuilder.ClientOrDie("statefulset-controller"),  ).Run(int(ctx.ComponentConfig.StatefulSetController.ConcurrentStatefulSetSyncs), ctx.Stop)  return nil, true, nil}
复制代码

1.1 statefulset.NewStatefulSetController

statefulset.NewStatefulSetController函数代码中可以看到,statefulset controller 注册了 statefulset、pod 对象的 EventHandler,也即对这几个对象的 event 进行监听,把 event 放入事件队列并做处理,对 statefulset controller 做了初始化。


// pkg/controller/statefulset/stateful_set.gofunc NewStatefulSetController(  podInformer coreinformers.PodInformer,  setInformer appsinformers.StatefulSetInformer,  pvcInformer coreinformers.PersistentVolumeClaimInformer,  revInformer appsinformers.ControllerRevisionInformer,  kubeClient clientset.Interface,) *StatefulSetController {  eventBroadcaster := record.NewBroadcaster()  eventBroadcaster.StartLogging(klog.Infof)  eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})  recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "statefulset-controller"})
ssc := &StatefulSetController{ kubeClient: kubeClient, control: NewDefaultStatefulSetControl( NewRealStatefulPodControl( kubeClient, setInformer.Lister(), podInformer.Lister(), pvcInformer.Lister(), recorder), NewRealStatefulSetStatusUpdater(kubeClient, setInformer.Lister()), history.NewHistory(kubeClient, revInformer.Lister()), recorder, ), pvcListerSynced: pvcInformer.Informer().HasSynced, queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "statefulset"), podControl: controller.RealPodControl{KubeClient: kubeClient, Recorder: recorder},
revListerSynced: revInformer.Informer().HasSynced, }
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ // lookup the statefulset and enqueue AddFunc: ssc.addPod, // lookup current and old statefulset if labels changed UpdateFunc: ssc.updatePod, // lookup statefulset accounting for deletion tombstones DeleteFunc: ssc.deletePod, }) ssc.podLister = podInformer.Lister() ssc.podListerSynced = podInformer.Informer().HasSynced
setInformer.Informer().AddEventHandler( cache.ResourceEventHandlerFuncs{ AddFunc: ssc.enqueueStatefulSet, UpdateFunc: func(old, cur interface{}) { oldPS := old.(*apps.StatefulSet) curPS := cur.(*apps.StatefulSet) if oldPS.Status.Replicas != curPS.Status.Replicas { klog.V(4).Infof("Observed updated replica count for StatefulSet: %v, %d->%d", curPS.Name, oldPS.Status.Replicas, curPS.Status.Replicas) } ssc.enqueueStatefulSet(cur) }, DeleteFunc: ssc.enqueueStatefulSet, }, ) ssc.setLister = setInformer.Lister() ssc.setListerSynced = setInformer.Informer().HasSynced
// TODO: Watch volumes return ssc}
复制代码

1.2 Run

主要看到 for 循环处,根据 workers 的值(可通过 kube-controller-manager 组件启动参数concurrent-statefulset-syncs来设置,默认值为 5),启动相应数量的 goroutine,跑ssc.worker方法,调用 daemonset controller 核心处理方法ssc.sync来调谐 statefulset 对象。


// pkg/controller/statefulset/stateful_set.gofunc (ssc *StatefulSetController) Run(workers int, stopCh <-chan struct{}) {  defer utilruntime.HandleCrash()  defer ssc.queue.ShutDown()
klog.Infof("Starting stateful set controller") defer klog.Infof("Shutting down statefulset controller")
if !cache.WaitForNamedCacheSync("stateful set", stopCh, ssc.podListerSynced, ssc.setListerSynced, ssc.pvcListerSynced, ssc.revListerSynced) { return }
for i := 0; i < workers; i++ { go wait.Until(ssc.worker, time.Second, stopCh) }
<-stopCh}
复制代码

1.2.1 ssc.worker

从 queue 队列中取出事件 key,并调用ssc.sync(关于 ssc.sync 方法会在后面做详细分析)对 statefulset 对象做调谐处理。queue 队列里的事件来源前面讲过,是 statefulset controller 注册的 statefulset、pod 对象的 EventHandler,它们的变化 event 会被监听到然后放入 queue 中。


// pkg/controller/daemon/daemon_controller.gofunc (ssc *StatefulSetController) worker() {  for ssc.processNextWorkItem() {  }}
func (ssc *StatefulSetController) processNextWorkItem() bool { key, quit := ssc.queue.Get() if quit { return false } defer ssc.queue.Done(key) if err := ssc.sync(key.(string)); err != nil { utilruntime.HandleError(fmt.Errorf("Error syncing StatefulSet %v, requeuing: %v", key.(string), err)) ssc.queue.AddRateLimited(key) } else { ssc.queue.Forget(key) } return true}
复制代码

2.statefulset controller 核心处理逻辑分析

sync

直接看到 statefulset controller 核心处理入口 sync 方法。


主要逻辑:


(1)获取执行方法时的当前时间,并定义defer函数,用于计算该方法总执行时间,也即统计对一个 statefulset 进行同步调谐操作的耗时;


(2)根据 statefulset 对象的命名空间与名称,获取 statefulset 对象;


(3)调用 ssc.adoptOrphanRevisions,检查是否有孤儿 controllerrevisions 对象(即.spec.ownerReferences 中无 controller 属性定义或其属性值为 false),若有且其与 statefulset 对象的 selector 匹配 的则添加 ownerReferences 进行关联;


(4)调用 ssc.getPodsForStatefulSet,根据 statefulset 对象的 selector 去查找 pod 列表,且若有孤儿 pod 的 label 与 statefulset 的 selector 能匹配的则进行关联,若已关联的 pod 的 label 不再与 statefulset 的 selector 匹配,则更新解除它们的关联关系;


(5)调用 ssc.syncStatefulSet,对 statefulset 对象做调谐处理。


// pkg/controller/statefulset/stateful_set.gofunc (ssc *StatefulSetController) sync(key string) error {  startTime := time.Now()  defer func() {    klog.V(4).Infof("Finished syncing statefulset %q (%v)", key, time.Since(startTime))  }()
namespace, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { return err } set, err := ssc.setLister.StatefulSets(namespace).Get(name) if errors.IsNotFound(err) { klog.Infof("StatefulSet has been deleted %v", key) return nil } if err != nil { utilruntime.HandleError(fmt.Errorf("unable to retrieve StatefulSet %v from store: %v", key, err)) return err }
selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector) if err != nil { utilruntime.HandleError(fmt.Errorf("error converting StatefulSet %v selector: %v", key, err)) // This is a non-transient error, so don't retry. return nil }
if err := ssc.adoptOrphanRevisions(set); err != nil { return err }
pods, err := ssc.getPodsForStatefulSet(set, selector) if err != nil { return err }
return ssc.syncStatefulSet(set, pods)}
复制代码

2.1 ssc.getPodsForStatefulSet

ssc.getPodsForStatefulSet 方法主要作用是获取属于 statefulset 对象的 pod 列表并返回,并检查孤儿 pod 与已匹配的 pod,看是否需要更新 statefulset 与 pod 的匹配。


主要逻辑:


(1)获取 statefulset 所在命名空间下的所有 pod;


(2)定义过滤出属于 statefulset 对象的 pod 的函数,即 isMemberOf 函数(根据 pod 的名称与 statefulset 名称匹配来过滤属于 statefulset 的 pod);


(3)调用 cm.ClaimPods,过滤出属于该 statefulset 对象的 pod,且若有孤儿 pod 的 label 与 statefulset 的 selector 能匹配的则进行关联,若已关联的 pod 的 label 不再与 statefulset 的 selector 匹配,则更新解除它们的关联关系。


// pkg/controller/statefulset/stateful_set.gofunc (ssc *StatefulSetController) getPodsForStatefulSet(set *apps.StatefulSet, selector labels.Selector) ([]*v1.Pod, error) {  // List all pods to include the pods that don't match the selector anymore but  // has a ControllerRef pointing to this StatefulSet.  pods, err := ssc.podLister.Pods(set.Namespace).List(labels.Everything())  if err != nil {    return nil, err  }
filter := func(pod *v1.Pod) bool { // Only claim if it matches our StatefulSet name. Otherwise release/ignore. return isMemberOf(set, pod) }
// If any adoptions are attempted, we should first recheck for deletion with // an uncached quorum read sometime after listing Pods (see #42639). canAdoptFunc := controller.RecheckDeletionTimestamp(func() (metav1.Object, error) { fresh, err := ssc.kubeClient.AppsV1().StatefulSets(set.Namespace).Get(set.Name, metav1.GetOptions{}) if err != nil { return nil, err } if fresh.UID != set.UID { return nil, fmt.Errorf("original StatefulSet %v/%v is gone: got uid %v, wanted %v", set.Namespace, set.Name, fresh.UID, set.UID) } return fresh, nil })
cm := controller.NewPodControllerRefManager(ssc.podControl, set, selector, controllerKind, canAdoptFunc) return cm.ClaimPods(pods, filter)}
复制代码

2.2 ssc.syncStatefulSet

ssc.syncStatefulSet 方法可以说是 statefulset controller 的核心处理逻辑所在了,主要看到ssc.control.UpdateStatefulSet方法。


// pkg/controller/statefulset/stateful_set.gofunc (ssc *StatefulSetController) syncStatefulSet(set *apps.StatefulSet, pods []*v1.Pod) error {  klog.V(4).Infof("Syncing StatefulSet %v/%v with %d pods", set.Namespace, set.Name, len(pods))  // TODO: investigate where we mutate the set during the update as it is not obvious.  if err := ssc.control.UpdateStatefulSet(set.DeepCopy(), pods); err != nil {    return err  }  klog.V(4).Infof("Successfully synced StatefulSet %s/%s successful", set.Namespace, set.Name)  return nil}
复制代码


ssc.control.UpdateStatefulSet 方法主要逻辑:


(1)获取 statefulset 的所有 ControllerRevision 并根据版本新老顺序排序;


(2)调用 ssc.getStatefulSetRevisions,获取现存最新的 statefulset 版本以及计算出一个新的版本;


(3)调用 ssc.updateStatefulSet,完成 statefulset 对象对于 pod 的创建、删除、更新、扩缩容等操作;


(4)调用 ssc.updateStatefulSetStatus,更新 statefulset 对象的 status 状态;


(5)调用 ssc.truncateHistory,根据 statefulset 对象配置的历史版本数量限制,按之前的排序顺序清理掉没有 pod 的 statefulset 历史版本。


// pkg/controller/statefulset/stateful_set_control.gofunc (ssc *defaultStatefulSetControl) UpdateStatefulSet(set *apps.StatefulSet, pods []*v1.Pod) error {
// list all revisions and sort them revisions, err := ssc.ListRevisions(set) if err != nil { return err } history.SortControllerRevisions(revisions)
// get the current, and update revisions currentRevision, updateRevision, collisionCount, err := ssc.getStatefulSetRevisions(set, revisions) if err != nil { return err }
// perform the main update function and get the status status, err := ssc.updateStatefulSet(set, currentRevision, updateRevision, collisionCount, pods) if err != nil { return err }
// update the set's status err = ssc.updateStatefulSetStatus(set, status) if err != nil { return err }
klog.V(4).Infof("StatefulSet %s/%s pod status replicas=%d ready=%d current=%d updated=%d", set.Namespace, set.Name, status.Replicas, status.ReadyReplicas, status.CurrentReplicas, status.UpdatedReplicas)
klog.V(4).Infof("StatefulSet %s/%s revisions current=%s update=%s", set.Namespace, set.Name, status.CurrentRevision, status.UpdateRevision)
// maintain the set's revision history limit return ssc.truncateHistory(set, pods, revisions, currentRevision, updateRevision)}
复制代码

2.2.1 ssc.updateStatefulSet

updateStatefulSet 方法是 statefulset 对象调谐操作中的核心方法,完成 statefulset 对象对于 pod 的创建、删除、更新、扩缩容等操作。此方法代码比较长,跟随我的节奏慢慢分析。


主要逻辑:


(1)看到第一个 for 循环,将 statefulset 的所有 pod 按 ord(ord 即为 pod name 中的序号)的值分到 replicas 和 condemned 两个数组中,序号小于 statefulset 对象期望副本数值的放到 replicas 数组(因为序号从 0 开始,所以是小于期望副本数值),大于等于期望副本数值的放到 condemned 数组,replicas 数组代表正常的可用的 pod 列表,condemned 数组中的是需要被删除的 pod 列表;在遍历 pod 时,同时根据 pod 的状态计算 statefulset 对象的 status 值;


(2)第二个 for 循环,当序号小于 statefulset 期望副本数值的 pod 未创建出来时,则根据 statefulset 对象中的 pod 模板,构建出相应序号值的 pod 对象(此时还并没有向 apiserver 发起创建 pod 的请求,只是构建好 pod 结构体);


(3)第三个和第四个 for 循环,遍历 replicas 和 condemned 两个数组,找到非 healthy 状态的最小序号的 pod 记录下来,并记录序号;


(4)当 statefulset 对象的 DeletionTimestamp 不为 nil 时,直接返回前面计算出来的 statefulset 的新 status 值,不再进行方法后续的逻辑处理;


(5)获取 monotonic 的值,当 statefulset.Spec.PodManagementPolicy 的值为 Parallel 时,monotonic 的值为 false,否则为 true(Parallel 代表 statefulset controller 可以并行的处理同一 statefulset 的 pod,串行则代表在启动和终止下一个 pod 之前需要等待前一个 pod 变成 ready 状态或 pod 对象被删除掉);


(6)第五个 for 循环,遍历 replicas 数组,处理 statefulset 的 pod,主要是做 pod 的创建(包括根据 statefulset.Spec.VolumeClaimTemplates 中定义的 pod 所需的 pvc 的创建):


(6.1)当 pod 处于 fail 状态(pod.Status.Phase 的值为 Failed)时,调用 apiserver 删除该 pod(pod 对应的 pvc 在这里不会做删除操作)并给 replicas 数组构建相应序号的新的 pod 结构体(用于下一步中重新创建该序号的 pod);


(6.2)如果相应序号的 pod 未创建时,调用 apiserver 创建该序号的 pod(包括创建 pvc),且当 monotonic 为 true 时(statefulset 没有配置 Parallel),直接 return,结束 updateStatefulSet 方法的执行;


(6.3)剩下的逻辑就是当没有配置 Parallel 时,将串行处理 pod,在启动和终止下一个 pod 之前需要等待前一个 pod 变成 ready 状态或 pod 对象被删除掉,不再展开分析;


(7)第六个 for 循环,逆序(pod 序号从大到小)遍历 condemned 数组,处理 statefulset 的 pod,主要是做多余 pod 的删除,删除逻辑也受 Parallel 影响,不展开分析。


(8)判断 statefulset 的更新策略,若为 OnDelete,则直接 return(使用了该更新策略,则需要人工删除 pod 后才会重建相应序号的 pod);(9)获取滚动更新配置中的 Partition 值,当 statefulset 进行滚动更新时,小于等于该序号的 pod 将不会被更新;


(10)第七个 for 循环,主要是处理更新策略为 RollingUpdate 的 statefulset 对象的更新。statefulset 的滚动更新,从序号大到小的顺序对 pod 进行更新,先删除后创建,且需等待当前序号的 pod 再次创建完成且状态为 Ready 时才能进行下一个 pod 的更新处理。


// pkg/controller/statefulset/stateful_set_control.gofunc (ssc *defaultStatefulSetControl) updateStatefulSet(  set *apps.StatefulSet,  currentRevision *apps.ControllerRevision,  updateRevision *apps.ControllerRevision,  collisionCount int32,  pods []*v1.Pod) (*apps.StatefulSetStatus, error) {  // get the current and update revisions of the set.  currentSet, err := ApplyRevision(set, currentRevision)  if err != nil {    return nil, err  }  updateSet, err := ApplyRevision(set, updateRevision)  if err != nil {    return nil, err  }
// set the generation, and revisions in the returned status status := apps.StatefulSetStatus{} status.ObservedGeneration = set.Generation status.CurrentRevision = currentRevision.Name status.UpdateRevision = updateRevision.Name status.CollisionCount = new(int32) *status.CollisionCount = collisionCount
replicaCount := int(*set.Spec.Replicas) // slice that will contain all Pods such that 0 <= getOrdinal(pod) < set.Spec.Replicas replicas := make([]*v1.Pod, replicaCount) // slice that will contain all Pods such that set.Spec.Replicas <= getOrdinal(pod) condemned := make([]*v1.Pod, 0, len(pods)) unhealthy := 0 firstUnhealthyOrdinal := math.MaxInt32 var firstUnhealthyPod *v1.Pod // 第一个for循环,将statefulset的pod分到replicas和condemned两个数组中,其中condemned数组中的pod代表需要被删除的 // First we partition pods into two lists valid replicas and condemned Pods for i := range pods { status.Replicas++
// count the number of running and ready replicas if isRunningAndReady(pods[i]) { status.ReadyReplicas++ }
// count the number of current and update replicas if isCreated(pods[i]) && !isTerminating(pods[i]) { if getPodRevision(pods[i]) == currentRevision.Name { status.CurrentReplicas++ } if getPodRevision(pods[i]) == updateRevision.Name { status.UpdatedReplicas++ } }
if ord := getOrdinal(pods[i]); 0 <= ord && ord < replicaCount { // if the ordinal of the pod is within the range of the current number of replicas, // insert it at the indirection of its ordinal replicas[ord] = pods[i]
} else if ord >= replicaCount { // if the ordinal is greater than the number of replicas add it to the condemned list condemned = append(condemned, pods[i]) } // If the ordinal could not be parsed (ord < 0), ignore the Pod. } // 第二个for循环,当序号小于statefulset期望副本数值的pod未创建出来时,则根据statefulset对象中的pod模板,构建出相应序号值的pod对象(此时还并没有向apiserver发起创建pod的请求,只是构建好pod结构体) // for any empty indices in the sequence [0,set.Spec.Replicas) create a new Pod at the correct revision for ord := 0; ord < replicaCount; ord++ { if replicas[ord] == nil { replicas[ord] = newVersionedStatefulSetPod( currentSet, updateSet, currentRevision.Name, updateRevision.Name, ord) } }
// sort the condemned Pods by their ordinals sort.Sort(ascendingOrdinal(condemned)) // 第三个和第四个for循环,遍历replicas和condemned两个数组,找到非healthy状态的最小序号的pod记录下来,并记录序号 // find the first unhealthy Pod for i := range replicas { if !isHealthy(replicas[i]) { unhealthy++ if ord := getOrdinal(replicas[i]); ord < firstUnhealthyOrdinal { firstUnhealthyOrdinal = ord firstUnhealthyPod = replicas[i] } } } for i := range condemned { if !isHealthy(condemned[i]) { unhealthy++ if ord := getOrdinal(condemned[i]); ord < firstUnhealthyOrdinal { firstUnhealthyOrdinal = ord firstUnhealthyPod = condemned[i] } } }
if unhealthy > 0 { klog.V(4).Infof("StatefulSet %s/%s has %d unhealthy Pods starting with %s", set.Namespace, set.Name, unhealthy, firstUnhealthyPod.Name) } // 当statefulset对象的DeletionTimestamp不为nil时,直接返回前面计算出来的statefulset的新status值,不再进行方法后续的逻辑处理 // If the StatefulSet is being deleted, don't do anything other than updating // status. if set.DeletionTimestamp != nil { return &status, nil } // 获取monotonic的值,当statefulset.Spec.PodManagementPolicy的值为Parallel时,monotonic的值为false,否则为true monotonic := !allowsBurst(set) // 第五个for循环,遍历replicas数组,处理statefulset的pod,主要是做pod的创建 // Examine each replica with respect to its ordinal for i := range replicas { // delete and recreate failed pods if isFailed(replicas[i]) { ssc.recorder.Eventf(set, v1.EventTypeWarning, "RecreatingFailedPod", "StatefulSet %s/%s is recreating failed Pod %s", set.Namespace, set.Name, replicas[i].Name) if err := ssc.podControl.DeleteStatefulPod(set, replicas[i]); err != nil { return &status, err } if getPodRevision(replicas[i]) == currentRevision.Name { status.CurrentReplicas-- } if getPodRevision(replicas[i]) == updateRevision.Name { status.UpdatedReplicas-- } status.Replicas-- replicas[i] = newVersionedStatefulSetPod( currentSet, updateSet, currentRevision.Name, updateRevision.Name, i) } // If we find a Pod that has not been created we create the Pod if !isCreated(replicas[i]) { if err := ssc.podControl.CreateStatefulPod(set, replicas[i]); err != nil { return &status, err } status.Replicas++ if getPodRevision(replicas[i]) == currentRevision.Name { status.CurrentReplicas++ } if getPodRevision(replicas[i]) == updateRevision.Name { status.UpdatedReplicas++ }
// if the set does not allow bursting, return immediately if monotonic { return &status, nil } // pod created, no more work possible for this round continue } // If we find a Pod that is currently terminating, we must wait until graceful deletion // completes before we continue to make progress. if isTerminating(replicas[i]) && monotonic { klog.V(4).Infof( "StatefulSet %s/%s is waiting for Pod %s to Terminate", set.Namespace, set.Name, replicas[i].Name) return &status, nil } // If we have a Pod that has been created but is not running and ready we can not make progress. // We must ensure that all for each Pod, when we create it, all of its predecessors, with respect to its // ordinal, are Running and Ready. if !isRunningAndReady(replicas[i]) && monotonic { klog.V(4).Infof( "StatefulSet %s/%s is waiting for Pod %s to be Running and Ready", set.Namespace, set.Name, replicas[i].Name) return &status, nil } // Enforce the StatefulSet invariants if identityMatches(set, replicas[i]) && storageMatches(set, replicas[i]) { continue } // Make a deep copy so we don't mutate the shared cache replica := replicas[i].DeepCopy() if err := ssc.podControl.UpdateStatefulPod(updateSet, replica); err != nil { return &status, err } } // 第六个for循环,逆序(pod序号从大到小)遍历condemned数组,处理statefulset的pod,主要是做多余pod的删除 // At this point, all of the current Replicas are Running and Ready, we can consider termination. // We will wait for all predecessors to be Running and Ready prior to attempting a deletion. // We will terminate Pods in a monotonically decreasing order over [len(pods),set.Spec.Replicas). // Note that we do not resurrect Pods in this interval. Also note that scaling will take precedence over // updates. for target := len(condemned) - 1; target >= 0; target-- { // wait for terminating pods to expire if isTerminating(condemned[target]) { klog.V(4).Infof( "StatefulSet %s/%s is waiting for Pod %s to Terminate prior to scale down", set.Namespace, set.Name, condemned[target].Name) // block if we are in monotonic mode if monotonic { return &status, nil } continue } // if we are in monotonic mode and the condemned target is not the first unhealthy Pod block if !isRunningAndReady(condemned[target]) && monotonic && condemned[target] != firstUnhealthyPod { klog.V(4).Infof( "StatefulSet %s/%s is waiting for Pod %s to be Running and Ready prior to scale down", set.Namespace, set.Name, firstUnhealthyPod.Name) return &status, nil } klog.V(2).Infof("StatefulSet %s/%s terminating Pod %s for scale down", set.Namespace, set.Name, condemned[target].Name)
if err := ssc.podControl.DeleteStatefulPod(set, condemned[target]); err != nil { return &status, err } if getPodRevision(condemned[target]) == currentRevision.Name { status.CurrentReplicas-- } if getPodRevision(condemned[target]) == updateRevision.Name { status.UpdatedReplicas-- } if monotonic { return &status, nil } } // 判断statefulset的更新策略,若为OnDelete,则直接return(使用了该更新策略,则需要人工删除pod后才会重建相应序号的pod) // for the OnDelete strategy we short circuit. Pods will be updated when they are manually deleted. if set.Spec.UpdateStrategy.Type == apps.OnDeleteStatefulSetStrategyType { return &status, nil } // 获取滚动更新配置中的Partition值,当statefulset进行滚动更新时,小于等于该序号的pod将不会被更新 // we compute the minimum ordinal of the target sequence for a destructive update based on the strategy. updateMin := 0 if set.Spec.UpdateStrategy.RollingUpdate != nil { updateMin = int(*set.Spec.UpdateStrategy.RollingUpdate.Partition) } // 第七个for循环,主要是处理更新策略为RollingUpdate的statefulset对象的更新 // we terminate the Pod with the largest ordinal that does not match the update revision. for target := len(replicas) - 1; target >= updateMin; target-- {
// delete the Pod if it is not already terminating and does not match the update revision. if getPodRevision(replicas[target]) != updateRevision.Name && !isTerminating(replicas[target]) { klog.V(2).Infof("StatefulSet %s/%s terminating Pod %s for update", set.Namespace, set.Name, replicas[target].Name) err := ssc.podControl.DeleteStatefulPod(set, replicas[target]) status.CurrentReplicas-- return &status, err }
// wait for unhealthy Pods on update if !isHealthy(replicas[target]) { klog.V(4).Infof( "StatefulSet %s/%s is waiting for Pod %s to update", set.Namespace, set.Name, replicas[target].Name) return &status, nil }
} return &status, nil}
复制代码


结合上面对该方法的分析,来总结下在此方法中都有哪些步骤涉及了 statefulset 对象对于 pod 的创建、删除、扩缩容、更新操作:


1.创建:主要是(6)第五个 for 循环;


2.删除:主要是(7)第六个 for 循环;


3.扩缩容: (1)~(7);


4.更新:主要是(8)(9)与(10)第七个 for 循环,其中(8)为 OnDelete 更新策略的处理,(9)(10)为滚动更新策略的处理。

总结

statefulset controller 架构图

statefulset controller 的大致组成和处理流程如下图,statefulset controller 对 statefulset、pod 对象注册了 event handler,当有事件时,会 watch 到然后将对应的 statefulset 对象放入到 queue 中,然后syncStatefulSet方法为 statefulset controller 调谐 statefulset 对象的核心处理逻辑所在,从 queue 中取出 statefulset 对象,做调谐处理。



statefulset controller 核心处理逻辑

statefulset controller 的核心处理逻辑是调谐 statefulset 对象,从而完成 statefulset 对于 pod 的创建、删除、更新、扩缩容、statefulset 的滚动更新、statefulset 状态 status 更新、旧版本 statefulset 清理等操作。



statefulset 更新策略

(1)OnDelete:使用 OnDelete 更新策略时,在更新 statefulset pod 模板后,只有当你手动删除老的 statefulset pods 之后,新的 statefulset Pod 才会被自动创建。


(2)RollingUpdate:使用 RollingUpdate 更新策略时,在更新 statefulset pod 模板后, 老的 statefulset pods 将被删除,并且将根据滚动更新配置自动创建新的 statefulset pods。滚动更新期间,每个序号的 statefulset pod 最多只能有一个,且滚动更新下一个 pod 之前,需等待前一个 pod 更新完成并处于 ready 状态。与 statefulset pod 按 0 - n 的顺序创建不同,滚动更新时 Pod 按逆序的方式(即 n - 0)删除并创建。


statefulset 的滚动升级中还有一个 Partition 配置,在设置 partition 后,滚动更新过程中,statefulset 的 Pod 中序号大于或等于 partition 的 Pod 会进行滚动升级,而其余的 Pod 保持不变,不会进行滚动更新。

statefulset pod 的命名规则、pod 创建与删除

如果创建一个名称为 web、replicas 为 3 的 statefulset 对象,则其 pod 名称分别为 web-0、web-1、web-2。


statefulset pod 的创建按 0 - n 的顺序创建,且在创建下一个 pod 之前,需要等待前一个 pod 创建完成并处于 ready 状态。


同样拿上面的例子来说明,在 web statefulset 创建后,3 个 Pod 将按顺序创建 web-0,web-1,web-2。在 web-0 处于 ready 状态之前,web-1 将不会被创建,同样当 web-1 处于 ready 状态之前 web-2 也不会被创建。如果在 web-1 ready 后,web-2 创建之前, web-0 不处于 ready 状态了,这个时候 web-2 将不会被创建,直到 web-0 再次回到 ready 状态。


statefulset 滚动更新或缩容过程中 pod 的删除按 n - 0 的顺序删除,且在删除下一个 pod 之前,需要等待前一个 pod 删除完成。


另外,当 statefulset.Spec.VolumeClaimTemplates 中定义了 pod 所需的 pvc 时,statefulset controller 在创建 pod 时,会同时创建对应的 pvc 出来,但删除 pod 时,不会做对应 pvc 的删除操作,这些 pvc 需要人工额外做删除操作。

发布于: 16 小时前阅读数: 7
用户头像

良凯尔

关注

热爱的力量 2020.01.10 加入

kubernetes开发者

评论

发布
暂无评论
k8s statefulset controller源码分析