
k8s daemonset controller source code analysis

Author: 良凯尔
  • December 23, 2021


daemonset controller analysis

daemonset controller overview

The daemonset controller is one of the many controllers in the kube-controller-manager component and is the controller for the daemonset resource object. It watches four kinds of resources: daemonset, pod, node, and ControllerRevision. When any of these resources changes, the daemonset controller is triggered to reconcile the corresponding daemonset object, which covers creating pods on suitable nodes, deleting pods from unsuitable nodes, rolling updates of the daemonset, updating the daemonset's status, cleaning up old daemonset revisions, and so on.

daemonset controller architecture diagram

The rough composition and processing flow of the daemonset controller is shown in the figure below. The daemonset controller registers event handlers for the daemonset, pod, node, and ControllerRevision objects; when an event occurs, it is watched and the corresponding daemonset object is put into the queue. The syncDaemonSet method is where the daemonset controller's core logic for reconciling daemonset objects lives: it takes daemonset objects out of the queue and reconciles them.



daemonset update strategies

(1) OnDelete: with the OnDelete update strategy, after the DaemonSet pod template is updated, new DaemonSet pods are created automatically only after you manually delete the old DaemonSet pods.

(2) RollingUpdate: the default update strategy. With the RollingUpdate update strategy, after the DaemonSet pod template is updated, old DaemonSet pods are deleted and new DaemonSet pods are created automatically according to the rolling update configuration. During a rolling update, at most one DaemonSet pod runs on each node. A short sketch of both strategy configurations follows.
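As a concrete illustration, here is a minimal Go sketch (not controller code; it only assumes the public k8s.io/api/apps/v1 and k8s.io/apimachinery types) showing how the two updateStrategy variants are expressed on a DaemonSet object:

package main

import (
	"fmt"

	apps "k8s.io/api/apps/v1"
	"k8s.io/apimachinery/pkg/util/intstr"
)

func main() {
	// OnDelete: new pods are created only after the old DaemonSet pods are deleted manually.
	onDelete := apps.DaemonSetUpdateStrategy{
		Type: apps.OnDeleteDaemonSetStrategyType,
	}

	// RollingUpdate (the default): old pods are deleted automatically, bounded by maxUnavailable.
	maxUnavailable := intstr.FromInt(1)
	rollingUpdate := apps.DaemonSetUpdateStrategy{
		Type: apps.RollingUpdateDaemonSetStrategyType,
		RollingUpdate: &apps.RollingUpdateDaemonSet{
			MaxUnavailable: &maxUnavailable,
		},
	}

	fmt.Println(onDelete.Type, rollingUpdate.Type)
}

With maxUnavailable set to 1, at most one node's daemon pod is unavailable at any time during the rolling update, which matches the behaviour described above.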


The daemonset controller analysis is split into two parts:

(1) daemonset controller initialization and startup analysis;

(2) daemonset controller processing logic analysis.

1. daemonset controller initialization and startup analysis

Based on tag v1.17.4:

https://github.com/kubernetes/kubernetes/releases/tag/v1.17.4


Let's go straight to the startDaemonSetController function, the entry point for the analysis of the daemonset controller's initialization and startup.

startDaemonSetController

Main logic of startDaemonSetController:

(1) call daemon.NewDaemonSetsController to create and initialize a DaemonSetsController;

(2) start a goroutine that runs the DaemonSetsController's Run method.


// cmd/kube-controller-manager/app/apps.go
func startDaemonSetController(ctx ControllerContext) (http.Handler, bool, error) {
	if !ctx.AvailableResources[schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "daemonsets"}] {
		return nil, false, nil
	}
	dsc, err := daemon.NewDaemonSetsController(
		ctx.InformerFactory.Apps().V1().DaemonSets(),
		ctx.InformerFactory.Apps().V1().ControllerRevisions(),
		ctx.InformerFactory.Core().V1().Pods(),
		ctx.InformerFactory.Core().V1().Nodes(),
		ctx.ClientBuilder.ClientOrDie("daemon-set-controller"),
		flowcontrol.NewBackOff(1*time.Second, 15*time.Minute),
	)
	if err != nil {
		return nil, true, fmt.Errorf("error creating DaemonSets controller: %v", err)
	}
	go dsc.Run(int(ctx.ComponentConfig.DaemonSetController.ConcurrentDaemonSetSyncs), ctx.Stop)
	return nil, true, nil
}

1.1 daemon.NewDaemonSetsController

From the code of the daemon.NewDaemonSetsController function you can see that the daemonset controller registers EventHandlers for the daemonset, node, pod, and ControllerRevision objects, i.e. it listens for events on these objects, puts them into the event queue, and processes them. It also assigns the dsc.syncDaemonSet method to dsc.syncHandler, i.e. registers it as the core processing method; dsc.Run later calls this core processing method to reconcile daemonset objects (the core processing method is analyzed in detail later).


// pkg/controller/daemon/daemon_controller.go
func NewDaemonSetsController(
	daemonSetInformer appsinformers.DaemonSetInformer,
	historyInformer appsinformers.ControllerRevisionInformer,
	podInformer coreinformers.PodInformer,
	nodeInformer coreinformers.NodeInformer,
	kubeClient clientset.Interface,
	failedPodsBackoff *flowcontrol.Backoff,
) (*DaemonSetsController, error) {
	eventBroadcaster := record.NewBroadcaster()
	eventBroadcaster.StartLogging(klog.Infof)
	eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})

	if kubeClient != nil && kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil {
		if err := ratelimiter.RegisterMetricAndTrackRateLimiterUsage("daemon_controller", kubeClient.CoreV1().RESTClient().GetRateLimiter()); err != nil {
			return nil, err
		}
	}
	dsc := &DaemonSetsController{
		kubeClient:    kubeClient,
		eventRecorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "daemonset-controller"}),
		podControl: controller.RealPodControl{
			KubeClient: kubeClient,
			Recorder:   eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "daemonset-controller"}),
		},
		crControl: controller.RealControllerRevisionControl{
			KubeClient: kubeClient,
		},
		burstReplicas: BurstReplicas,
		expectations:  controller.NewControllerExpectations(),
		queue:         workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "daemonset"),
	}

	daemonSetInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			ds := obj.(*apps.DaemonSet)
			klog.V(4).Infof("Adding daemon set %s", ds.Name)
			dsc.enqueueDaemonSet(ds)
		},
		UpdateFunc: func(old, cur interface{}) {
			oldDS := old.(*apps.DaemonSet)
			curDS := cur.(*apps.DaemonSet)
			klog.V(4).Infof("Updating daemon set %s", oldDS.Name)
			dsc.enqueueDaemonSet(curDS)
		},
		DeleteFunc: dsc.deleteDaemonset,
	})
	dsc.dsLister = daemonSetInformer.Lister()
	dsc.dsStoreSynced = daemonSetInformer.Informer().HasSynced

	historyInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    dsc.addHistory,
		UpdateFunc: dsc.updateHistory,
		DeleteFunc: dsc.deleteHistory,
	})
	dsc.historyLister = historyInformer.Lister()
	dsc.historyStoreSynced = historyInformer.Informer().HasSynced

	// Watch for creation/deletion of pods. The reason we watch is that we don't want a daemon set to create/delete
	// more pods until all the effects (expectations) of a daemon set's create/delete have been observed.
	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    dsc.addPod,
		UpdateFunc: dsc.updatePod,
		DeleteFunc: dsc.deletePod,
	})
	dsc.podLister = podInformer.Lister()

	// This custom indexer will index pods based on their NodeName which will decrease the amount of pods we need to get in simulate() call.
	podInformer.Informer().GetIndexer().AddIndexers(cache.Indexers{
		"nodeName": indexByPodNodeName,
	})
	dsc.podNodeIndex = podInformer.Informer().GetIndexer()
	dsc.podStoreSynced = podInformer.Informer().HasSynced

	nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    dsc.addNode,
		UpdateFunc: dsc.updateNode,
	},
	)
	dsc.nodeStoreSynced = nodeInformer.Informer().HasSynced
	dsc.nodeLister = nodeInformer.Lister()

	dsc.syncHandler = dsc.syncDaemonSet
	dsc.enqueueDaemonSet = dsc.enqueue

	dsc.failedPodsBackoff = failedPodsBackoff

	return dsc, nil
}

1.2 dsc.Run

Look at the for loop: based on the value of workers (default 2), it starts that many goroutines running the dsc.runWorker method, which mainly calls the daemonset controller's core processing method dsc.syncDaemonSet described earlier.


// pkg/controller/daemon/daemon_controller.go
func (dsc *DaemonSetsController) Run(workers int, stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	defer dsc.queue.ShutDown()

	klog.Infof("Starting daemon sets controller")
	defer klog.Infof("Shutting down daemon sets controller")

	if !cache.WaitForNamedCacheSync("daemon sets", stopCh, dsc.podStoreSynced, dsc.nodeStoreSynced, dsc.historyStoreSynced, dsc.dsStoreSynced) {
		return
	}

	for i := 0; i < workers; i++ {
		go wait.Until(dsc.runWorker, time.Second, stopCh)
	}

	go wait.Until(dsc.failedPodsBackoff.GC, BackoffGCInterval, stopCh)

	<-stopCh
}

1.2.1 dsc.runWorker

It takes an event key out of the queue and calls dsc.syncHandler (i.e. dsc.syncDaemonSet) to reconcile it. As described earlier, the events in the queue come from the EventHandlers the daemonset controller registered for the daemonset, node, pod, and ControllerRevision objects; their change events are watched and then put into the queue.


// pkg/controller/daemon/daemon_controller.go
func (dsc *DaemonSetsController) runWorker() {
	for dsc.processNextWorkItem() {
	}
}

func (dsc *DaemonSetsController) processNextWorkItem() bool {
	dsKey, quit := dsc.queue.Get()
	if quit {
		return false
	}
	defer dsc.queue.Done(dsKey)

	err := dsc.syncHandler(dsKey.(string))
	if err == nil {
		dsc.queue.Forget(dsKey)
		return true
	}

	utilruntime.HandleError(fmt.Errorf("%v failed with : %v", dsKey, err))
	dsc.queue.AddRateLimited(dsKey)

	return true
}

2. daemonset controller core processing logic analysis

syncDaemonSet

Let's go straight to the daemonset controller's core processing method, syncDaemonSet.

Main logic:

(1) record the current time when the method starts and define a defer function that computes the method's total execution time, i.e. how long one reconciliation of a daemonset takes;

(2) get the daemonset object by its namespace and name;

(3) get the list of all node objects;

(4) check whether the daemonset object's DeletionTimestamp is set; if it is, return immediately, because the daemonset is being deleted and no further reconciliation is needed;

(5) call dsc.constructHistory to get the daemonset's history revisions;

(6) call dsc.expectations.SatisfiedExpectations to check whether the daemonset object satisfies the expectations mechanism (the expectations mechanism serves the same purpose as in the replicaset controller analysis and is not expanded on here); if it is not satisfied, call dsc.updateDaemonSetStatus to update the daemonset's status and return;

(7) call dsc.manage; the dsc.manage method does not distinguish between pods of new and old daemonset revisions, it only ensures that the daemonset's pods run on every node that meets the conditions, creating a pod on a suitable node that has no daemonset pod yet and deleting daemonset pods from nodes that do not meet the conditions;

(8) call dsc.expectations.SatisfiedExpectations again to check whether the expectations mechanism is satisfied; if it is, check the daemonset's configured update strategy, and if it is a rolling update, call dsc.rollingUpdate, which handles the daemonset's rolling update by deleting old pods according to the configured rolling update settings (pod creation happens in the dsc.manage method);

When the daemonset's update strategy is OnDelete, nothing extra is done here, because new DaemonSet pods are created automatically only after the old DaemonSet pods have been deleted manually; once the old pods are deleted by hand, pods of the new revision are created in the dsc.manage method;

(9) call dsc.cleanupHistory, which cleans up the daemonset's history revisions that no longer have any pods, according to the daemonset's spec.revisionHistoryLimit configuration and revision age (the oldest revisions are cleaned up first);

(10) finally, call dsc.updateDaemonSetStatus, which updates the daemonset's status based on the deployment situation of the existing daemonset pods, the pods' states, whether nodes satisfy the pods' running conditions, and so on.


// pkg/controller/daemon/daemon_controller.go
func (dsc *DaemonSetsController) syncDaemonSet(key string) error {
	startTime := time.Now()
	defer func() {
		klog.V(4).Infof("Finished syncing daemon set %q (%v)", key, time.Since(startTime))
	}()

	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		return err
	}
	ds, err := dsc.dsLister.DaemonSets(namespace).Get(name)
	if errors.IsNotFound(err) {
		klog.V(3).Infof("daemon set has been deleted %v", key)
		dsc.expectations.DeleteExpectations(key)
		return nil
	}
	if err != nil {
		return fmt.Errorf("unable to retrieve ds %v from store: %v", key, err)
	}

	nodeList, err := dsc.nodeLister.List(labels.Everything())
	if err != nil {
		return fmt.Errorf("couldn't get list of nodes when syncing daemon set %#v: %v", ds, err)
	}

	everything := metav1.LabelSelector{}
	if reflect.DeepEqual(ds.Spec.Selector, &everything) {
		dsc.eventRecorder.Eventf(ds, v1.EventTypeWarning, SelectingAllReason, "This daemon set is selecting all pods. A non-empty selector is required.")
		return nil
	}

	// Don't process a daemon set until all its creations and deletions have been processed.
	// For example if daemon set foo asked for 3 new daemon pods in the previous call to manage,
	// then we do not want to call manage on foo until the daemon pods have been created.
	dsKey, err := controller.KeyFunc(ds)
	if err != nil {
		return fmt.Errorf("couldn't get key for object %#v: %v", ds, err)
	}

	// If the DaemonSet is being deleted (either by foreground deletion or
	// orphan deletion), we cannot be sure if the DaemonSet history objects
	// it owned still exist -- those history objects can either be deleted
	// or orphaned. Garbage collector doesn't guarantee that it will delete
	// DaemonSet pods before deleting DaemonSet history objects, because
	// DaemonSet history doesn't own DaemonSet pods. We cannot reliably
	// calculate the status of a DaemonSet being deleted. Therefore, return
	// here without updating status for the DaemonSet being deleted.
	if ds.DeletionTimestamp != nil {
		return nil
	}

	// Construct histories of the DaemonSet, and get the hash of current history
	cur, old, err := dsc.constructHistory(ds)
	if err != nil {
		return fmt.Errorf("failed to construct revisions of DaemonSet: %v", err)
	}
	hash := cur.Labels[apps.DefaultDaemonSetUniqueLabelKey]

	if !dsc.expectations.SatisfiedExpectations(dsKey) {
		// Only update status. Don't raise observedGeneration since controller didn't process object of that generation.
		return dsc.updateDaemonSetStatus(ds, nodeList, hash, false)
	}

	err = dsc.manage(ds, nodeList, hash)
	if err != nil {
		return err
	}

	// Process rolling updates if we're ready.
	if dsc.expectations.SatisfiedExpectations(dsKey) {
		switch ds.Spec.UpdateStrategy.Type {
		case apps.OnDeleteDaemonSetStrategyType:
		case apps.RollingUpdateDaemonSetStrategyType:
			err = dsc.rollingUpdate(ds, nodeList, hash)
		}
		if err != nil {
			return err
		}
	}

	err = dsc.cleanupHistory(ds, old)
	if err != nil {
		return fmt.Errorf("failed to clean up revisions of DaemonSet: %v", err)
	}

	return dsc.updateDaemonSetStatus(ds, nodeList, hash, true)
}

2.1 dsc.manage

The dsc.manage method does not distinguish between pods of new and old daemonset revisions. It is mainly responsible for ensuring that the daemonset's pods run on every node that meets the conditions: it creates pods on suitable nodes that do not yet have a daemonset pod, and deletes daemonset pods from nodes that do not meet the conditions.

Main logic:

(1) call dsc.getNodesToDaemonPods, which uses the daemonset's Selector to get all of the daemonset's pods and returns a map describing which pods are on which node;

(2) iterate over the node list obtained earlier and run dsc.podsShouldBeOnNode, which compares whether the pod specifies a nodeName or nodeSelector, whether it tolerates the node's taints, and so on against the node object's information, in order to determine whether a pod of this daemonset already exists on the node and whether a pod should be created or deleted there;

(3) call getUnscheduledPodsWithoutNode, which compares each pod's nodeName against the node list obtained earlier and adds pods whose nodeName no longer exists to the list of pods to delete;

(4) call dsc.syncNodes, which, based on the list of nodes that need a pod created and the list of pods to delete obtained above, performs the corresponding pod creations and deletions.


// pkg/controller/daemon/daemon_controller.go
func (dsc *DaemonSetsController) manage(ds *apps.DaemonSet, nodeList []*v1.Node, hash string) error {
	// Find out the pods which are created for the nodes by DaemonSet.
	nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ds)
	if err != nil {
		return fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err)
	}

	// For each node, if the node is running the daemon pod but isn't supposed to, kill the daemon
	// pod. If the node is supposed to run the daemon pod, but isn't, create the daemon pod on the node.
	var nodesNeedingDaemonPods, podsToDelete []string
	for _, node := range nodeList {
		nodesNeedingDaemonPodsOnNode, podsToDeleteOnNode, err := dsc.podsShouldBeOnNode(
			node, nodeToDaemonPods, ds)

		if err != nil {
			continue
		}

		nodesNeedingDaemonPods = append(nodesNeedingDaemonPods, nodesNeedingDaemonPodsOnNode...)
		podsToDelete = append(podsToDelete, podsToDeleteOnNode...)
	}

	// Remove unscheduled pods assigned to not existing nodes when daemonset pods are scheduled by scheduler.
	// If node doesn't exist then pods are never scheduled and can't be deleted by PodGCController.
	podsToDelete = append(podsToDelete, getUnscheduledPodsWithoutNode(nodeList, nodeToDaemonPods)...)

	// Label new pods using the hash label value of the current history when creating them
	if err = dsc.syncNodes(ds, podsToDelete, nodesNeedingDaemonPods, hash); err != nil {
		return err
	}

	return nil
}

2.1.1 dsc.podsShouldBeOnNode

The dsc.podsShouldBeOnNode method decides whether a daemonset pod needs to run on a given node. It returns nodesNeedingDaemonPods and podsToDelete, which are respectively the nodes that need a daemonset pod and the list of pods to delete.

Main logic:

(1) call dsc.nodeShouldRunDaemonPod, which returns shouldSchedule and shouldContinueRunning, indicating respectively whether the daemonset pod should be scheduled onto this node and whether a daemonset pod on this node may continue running;

(2) when shouldSchedule is true, i.e. the pod should be scheduled onto the node but does not exist there yet, add the node to nodesNeedingDaemonPods;

(3) when shouldContinueRunning is true, find the daemonset pods on the node that are still running and have not exited, sort them by creation time so that only the oldest pod is kept, and add the rest to podsToDelete;

(4) when shouldContinueRunning is false, i.e. the daemonset pod should not keep running on the node but daemonset pods already exist there, add all of the daemonset's pods on that node to podsToDelete;

(5) return nodesNeedingDaemonPods and podsToDelete, respectively the nodes that need a daemonset pod and the list of pods to delete.


// pkg/controller/daemon/daemon_controller.go
func (dsc *DaemonSetsController) podsShouldBeOnNode(
	node *v1.Node,
	nodeToDaemonPods map[string][]*v1.Pod,
	ds *apps.DaemonSet,
) (nodesNeedingDaemonPods, podsToDelete []string, err error) {

	_, shouldSchedule, shouldContinueRunning, err := dsc.nodeShouldRunDaemonPod(node, ds)
	if err != nil {
		return
	}

	daemonPods, exists := nodeToDaemonPods[node.Name]

	switch {
	case shouldSchedule && !exists:
		// If daemon pod is supposed to be running on node, but isn't, create daemon pod.
		nodesNeedingDaemonPods = append(nodesNeedingDaemonPods, node.Name)
	case shouldContinueRunning:
		// If a daemon pod failed, delete it
		// If there's non-daemon pods left on this node, we will create it in the next sync loop
		var daemonPodsRunning []*v1.Pod
		for _, pod := range daemonPods {
			if pod.DeletionTimestamp != nil {
				continue
			}
			if pod.Status.Phase == v1.PodFailed {
				// This is a critical place where DS is often fighting with kubelet that rejects pods.
				// We need to avoid hot looping and backoff.
				backoffKey := failedPodsBackoffKey(ds, node.Name)

				now := dsc.failedPodsBackoff.Clock.Now()
				inBackoff := dsc.failedPodsBackoff.IsInBackOffSinceUpdate(backoffKey, now)
				if inBackoff {
					delay := dsc.failedPodsBackoff.Get(backoffKey)
					klog.V(4).Infof("Deleting failed pod %s/%s on node %s has been limited by backoff - %v remaining",
						pod.Namespace, pod.Name, node.Name, delay)
					dsc.enqueueDaemonSetAfter(ds, delay)
					continue
				}

				dsc.failedPodsBackoff.Next(backoffKey, now)

				msg := fmt.Sprintf("Found failed daemon pod %s/%s on node %s, will try to kill it", pod.Namespace, pod.Name, node.Name)
				klog.V(2).Infof(msg)
				// Emit an event so that it's discoverable to users.
				dsc.eventRecorder.Eventf(ds, v1.EventTypeWarning, FailedDaemonPodReason, msg)
				podsToDelete = append(podsToDelete, pod.Name)
			} else {
				daemonPodsRunning = append(daemonPodsRunning, pod)
			}
		}
		// If daemon pod is supposed to be running on node, but more than 1 daemon pod is running, delete the excess daemon pods.
		// Sort the daemon pods by creation time, so the oldest is preserved.
		if len(daemonPodsRunning) > 1 {
			sort.Sort(podByCreationTimestampAndPhase(daemonPodsRunning))
			for i := 1; i < len(daemonPodsRunning); i++ {
				podsToDelete = append(podsToDelete, daemonPodsRunning[i].Name)
			}
		}
	case !shouldContinueRunning && exists:
		// If daemon pod isn't supposed to run on node, but it is, delete all daemon pods on node.
		for _, pod := range daemonPods {
			if pod.DeletionTimestamp != nil {
				continue
			}
			podsToDelete = append(podsToDelete, pod.Name)
		}
	}

	return nodesNeedingDaemonPods, podsToDelete, nil
}


dsc.nodeShouldRunDaemonPod


The dsc.nodeShouldRunDaemonPod method is not analyzed in depth here. It mainly calls dsc.simulate to run the Predicates algorithms and check whether a node satisfies the pod's running conditions. If predicate checking fails, then based on the failure information it returns wantToRun, shouldSchedule, and shouldContinueRunning, which respectively indicate whether the node matches the pod's selector, taints, and so on (without considering whether the node has sufficient resources), whether the daemonset pod should be scheduled onto this node, and whether a daemonset pod on this node may continue running; if predicate checking succeeds, all three are true.

2.1.2 dsc.syncNodes

dsc.syncNodes is the method in which the daemonset controller actually creates and deletes pods.

This method also involves the expectations mechanism, which serves the same purpose and is used in essentially the same way as in the replicaset controller; if you have forgotten it, look back at the analysis of the expectations mechanism in the replicaset controller analysis. It is not expanded on again here.

Main logic:

(1) compute the number of pods to create and to delete, capped at dsc.burstReplicas (250); that is, in one sync of a daemonset object at most 250 pods can be created/deleted, and anything beyond that has to wait for the next sync;

(2) call dsc.expectations.SetExpectations to set the expectations;

(3) call util.CreatePodTemplate to compute the podTemplate to create;

(4) perform the pod creations first: as in the replicaset controller, creation uses a slow-start algorithm and is done in batches; the first batch creates 1 pod, the second 2, the third 4, doubling each time until the desired number is reached. Each batch starts as many goroutines as there are pods to create in it, each goroutine creating one pod, and a WaitGroup is used to wait for all creation tasks of the batch to finish before the next batch starts;

(5) then perform the pod deletions: for each pod to delete, a goroutine is started to do the deletion, and a WaitGroup is used to wait for all the goroutines to finish.


// pkg/controller/daemon/daemon_controller.go
func (dsc *DaemonSetsController) syncNodes(ds *apps.DaemonSet, podsToDelete, nodesNeedingDaemonPods []string, hash string) error {
	// We need to set expectations before creating/deleting pods to avoid race conditions.
	dsKey, err := controller.KeyFunc(ds)
	if err != nil {
		return fmt.Errorf("couldn't get key for object %#v: %v", ds, err)
	}

	createDiff := len(nodesNeedingDaemonPods)
	deleteDiff := len(podsToDelete)

	if createDiff > dsc.burstReplicas {
		createDiff = dsc.burstReplicas
	}
	if deleteDiff > dsc.burstReplicas {
		deleteDiff = dsc.burstReplicas
	}

	dsc.expectations.SetExpectations(dsKey, createDiff, deleteDiff)

	// error channel to communicate back failures. make the buffer big enough to avoid any blocking
	errCh := make(chan error, createDiff+deleteDiff)

	klog.V(4).Infof("Nodes needing daemon pods for daemon set %s: %+v, creating %d", ds.Name, nodesNeedingDaemonPods, createDiff)
	createWait := sync.WaitGroup{}
	// If the returned error is not nil we have a parse error.
	// The controller handles this via the hash.
	generation, err := util.GetTemplateGeneration(ds)
	if err != nil {
		generation = nil
	}
	template := util.CreatePodTemplate(ds.Spec.Template, generation, hash)
	// Batch the pod creates. Batch sizes start at SlowStartInitialBatchSize
	// and double with each successful iteration in a kind of "slow start".
	// This handles attempts to start large numbers of pods that would
	// likely all fail with the same error. For example a project with a
	// low quota that attempts to create a large number of pods will be
	// prevented from spamming the API service with the pod create requests
	// after one of its pods fails. Conveniently, this also prevents the
	// event spam that those failures would generate.
	batchSize := integer.IntMin(createDiff, controller.SlowStartInitialBatchSize)
	for pos := 0; createDiff > pos; batchSize, pos = integer.IntMin(2*batchSize, createDiff-(pos+batchSize)), pos+batchSize {
		errorCount := len(errCh)
		createWait.Add(batchSize)
		for i := pos; i < pos+batchSize; i++ {
			go func(ix int) {
				defer createWait.Done()

				podTemplate := template.DeepCopy()
				// The pod's NodeAffinity will be updated to make sure the Pod is bound
				// to the target node by default scheduler. It is safe to do so because there
				// should be no conflicting node affinity with the target node.
				podTemplate.Spec.Affinity = util.ReplaceDaemonSetPodNodeNameNodeAffinity(
					podTemplate.Spec.Affinity, nodesNeedingDaemonPods[ix])

				err := dsc.podControl.CreatePodsWithControllerRef(ds.Namespace, podTemplate,
					ds, metav1.NewControllerRef(ds, controllerKind))

				if err != nil {
					if errors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
						// If the namespace is being torn down, we can safely ignore
						// this error since all subsequent creations will fail.
						return
					}
				}
				if err != nil {
					klog.V(2).Infof("Failed creation, decrementing expectations for set %q/%q", ds.Namespace, ds.Name)
					dsc.expectations.CreationObserved(dsKey)
					errCh <- err
					utilruntime.HandleError(err)
				}
			}(i)
		}
		createWait.Wait()
		// any skipped pods that we never attempted to start shouldn't be expected.
		skippedPods := createDiff - (batchSize + pos)
		if errorCount < len(errCh) && skippedPods > 0 {
			klog.V(2).Infof("Slow-start failure. Skipping creation of %d pods, decrementing expectations for set %q/%q", skippedPods, ds.Namespace, ds.Name)
			dsc.expectations.LowerExpectations(dsKey, skippedPods, 0)
			// The skipped pods will be retried later. The next controller resync will
			// retry the slow start process.
			break
		}
	}

	klog.V(4).Infof("Pods to delete for daemon set %s: %+v, deleting %d", ds.Name, podsToDelete, deleteDiff)
	deleteWait := sync.WaitGroup{}
	deleteWait.Add(deleteDiff)
	for i := 0; i < deleteDiff; i++ {
		go func(ix int) {
			defer deleteWait.Done()
			if err := dsc.podControl.DeletePod(ds.Namespace, podsToDelete[ix], ds); err != nil {
				klog.V(2).Infof("Failed deletion, decrementing expectations for set %q/%q", ds.Namespace, ds.Name)
				dsc.expectations.DeletionObserved(dsKey)
				errCh <- err
				utilruntime.HandleError(err)
			}
		}(i)
	}
	deleteWait.Wait()

	// collect errors if any for proper reporting/retry logic in the controller
	errors := []error{}
	close(errCh)
	for err := range errCh {
		errors = append(errors, err)
	}
	return utilerrors.NewAggregate(errors)
}

2.2 dsc.rollingUpdate

The dsc.rollingUpdate method handles the rolling update of a daemonset object: according to the configured rolling update settings, it deletes old pods (pod creation is done in the dsc.manage method).

Main logic:

(1) call dsc.getNodesToDaemonPods to get the map of nodes to the daemonset's pods;

(2) call dsc.getAllDaemonSetPods to get all pods belonging to old daemonset revisions;

(3) call dsc.getUnavailableNumbers, which reads the maxUnavailable value from the daemonset's rolling update configuration and computes numUnavailable, the number of eligible nodes that either have no pod of the daemonset or whose pod is in the Unavailable state;

(4) call util.SplitByAvailablePods to split all old-revision daemonset pods into an oldAvailablePods list and an oldUnavailablePods list;

(5) define a string slice oldPodsToDelete to hold the pods that are to be deleted;

(6) add all oldUnavailablePods to the oldPodsToDelete slice;

(7) iterate over the oldAvailablePods list; while numUnavailable is less than maxUnavailable, add the pod to the oldPodsToDelete slice and increment numUnavailable by one;

(8) call dsc.syncNodes to delete the pods in the oldPodsToDelete slice.


// pkg/controller/daemon/update.go
func (dsc *DaemonSetsController) rollingUpdate(ds *apps.DaemonSet, nodeList []*v1.Node, hash string) error {
	nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ds)
	if err != nil {
		return fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err)
	}

	_, oldPods := dsc.getAllDaemonSetPods(ds, nodeToDaemonPods, hash)
	maxUnavailable, numUnavailable, err := dsc.getUnavailableNumbers(ds, nodeList, nodeToDaemonPods)
	if err != nil {
		return fmt.Errorf("couldn't get unavailable numbers: %v", err)
	}
	oldAvailablePods, oldUnavailablePods := util.SplitByAvailablePods(ds.Spec.MinReadySeconds, oldPods)

	// for oldPods delete all not running pods
	var oldPodsToDelete []string
	klog.V(4).Infof("Marking all unavailable old pods for deletion")
	for _, pod := range oldUnavailablePods {
		// Skip terminating pods. We won't delete them again
		if pod.DeletionTimestamp != nil {
			continue
		}
		klog.V(4).Infof("Marking pod %s/%s for deletion", ds.Name, pod.Name)
		oldPodsToDelete = append(oldPodsToDelete, pod.Name)
	}

	klog.V(4).Infof("Marking old pods for deletion")
	for _, pod := range oldAvailablePods {
		if numUnavailable >= maxUnavailable {
			klog.V(4).Infof("Number of unavailable DaemonSet pods: %d, is equal to or exceeds allowed maximum: %d", numUnavailable, maxUnavailable)
			break
		}
		klog.V(4).Infof("Marking pod %s/%s for deletion", ds.Name, pod.Name)
		oldPodsToDelete = append(oldPodsToDelete, pod.Name)
		numUnavailable++
	}
	return dsc.syncNodes(ds, oldPodsToDelete, []string{}, hash)
}

2.3 dsc.updateDaemonSetStatus

The dsc.updateDaemonSetStatus method is responsible for updating the daemonset's status based on the deployment situation of the existing daemonset pods, the pods' states, whether nodes satisfy the pods' running conditions, and so on. The code is not analyzed here; instead, the meaning of each field in the daemonset status is listed (a small illustrative snippet follows the list).

(1) currentNumberScheduled: the number of nodes on which a daemonset pod has already been scheduled;

(2) desiredNumberScheduled: the number of nodes on which a daemonset pod should be scheduled;

(3) numberMisscheduled: the number of nodes that should not run a daemonset pod but on which one has nevertheless been scheduled;

(4) numberAvailable: the number of pods that have reached the Available state (a pod is considered Available once it has been Ready for minReadySeconds);

(5) numberReady: the number of pods that have reached the Ready state;

(6) numberUnavailable: desiredNumberScheduled - numberAvailable;

(7) updatedNumberScheduled: the number of nodes on which a pod of the latest daemonset revision has already been scheduled.
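As a hypothetical illustration of how these fields relate to each other (this helper is not part of the controller; it only assumes the public k8s.io/api/apps/v1 types):

package main

import (
	"fmt"

	apps "k8s.io/api/apps/v1"
)

// numberUnavailable mirrors the relationship described above:
// numberUnavailable = desiredNumberScheduled - numberAvailable.
func numberUnavailable(status apps.DaemonSetStatus) int32 {
	return status.DesiredNumberScheduled - status.NumberAvailable
}

func main() {
	status := apps.DaemonSetStatus{
		CurrentNumberScheduled: 3,
		DesiredNumberScheduled: 3,
		NumberMisscheduled:     0,
		NumberReady:            3,
		NumberAvailable:        2, // one pod has been Ready for less than minReadySeconds
		UpdatedNumberScheduled: 3,
	}
	fmt.Println(numberUnavailable(status)) // 1
}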

Summary

The flow the daemonset controller uses to create pods is similar to that of the replicaset controller: both use the expectations mechanism and cap the number of pods that can be created or deleted in a single reconciliation. Like statefulsets, daemonsets support two update strategies, OnDelete and RollingUpdate. With OnDelete, the corresponding pods must be deleted manually before the daemonset controller creates new ones. The RollingUpdate strategy differs from statefulset and deployment: during an update the old pod is deleted first and then the new pod is created; unlike a deployment, it cannot create the new pod first and delete the old one afterwards.

daemonset controller architecture

The rough composition and processing flow of the daemonset controller is shown in the figure below. The daemonset controller registers event handlers for the daemonset, pod, node, and ControllerRevision objects; when an event occurs, it is watched and the corresponding daemonset object is put into the queue. The syncDaemonSet method is where the daemonset controller's core logic for reconciling daemonset objects lives: it takes daemonset objects out of the queue and reconciles them.



daemonset controller core processing logic

The core processing logic of the daemonset controller is to reconcile daemonset objects: creating the daemonset's pods on suitable nodes and deleting them from unsuitable nodes; when a rolling update is triggered, deleting old pods and creating new pods according to the configured rolling update strategy; cleaning up the daemonset's history revisions according to the revision history limit configuration; and finally updating the daemonset object's status.



daemonset controller pod creation algorithm

The algorithm the daemonset controller uses to create pods is almost identical to that of the replicaset controller: pods are created in batches that grow as 1, 2, 4, 8, ... (with an upper limit of 250 pods created per reconciliation; anything beyond the limit is created in the next reconciliation). If any creation in a batch fails (for example because the apiserver throttles and drops requests; note that timeouts are excluded, since initialization handling may time out), no further batches are attempted; the creation algorithm runs again at the daemonset object's next reconciliation, until every node that meets the conditions has a pod of the daemonset. A minimal sketch of this slow-start batching follows.
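To make the batch-size progression concrete, here is a minimal, self-contained Go sketch of the slow-start batching described above (slowStartCreate, createPod and the constants are illustrative stand-ins, not the controller's own identifiers):

package main

import (
	"fmt"
	"sync"
)

const (
	slowStartInitialBatchSize = 1   // first batch creates a single pod
	burstReplicas             = 250 // per-sync cap on creations
)

func min(a, b int) int {
	if a < b {
		return a
	}
	return b
}

// slowStartCreate creates "diff" pods in batches of 1, 2, 4, 8, ... and stops
// scheduling further batches as soon as a batch reports an error, mirroring the
// slow-start behaviour described above. createPod stands in for the real
// pod-creation call.
func slowStartCreate(diff int, createPod func(i int) error) {
	diff = min(diff, burstReplicas)
	errCh := make(chan error, diff)
	batchSize := min(diff, slowStartInitialBatchSize)
	for pos := 0; pos < diff; batchSize, pos = min(2*batchSize, diff-(pos+batchSize)), pos+batchSize {
		errorCount := len(errCh)
		var wg sync.WaitGroup
		wg.Add(batchSize)
		for i := pos; i < pos+batchSize; i++ {
			go func(ix int) {
				defer wg.Done()
				if err := createPod(ix); err != nil {
					errCh <- err
				}
			}(i)
		}
		wg.Wait()
		if errorCount < len(errCh) {
			// A creation in this batch failed: leave the remaining pods for the next sync.
			break
		}
	}
}

func main() {
	slowStartCreate(10, func(i int) error {
		fmt.Println("creating pod", i)
		return nil
	})
}

With 10 pods to create, the batches are 1, 2, 4 and then the remaining 3; as soon as any batch reports an error, the remaining pods are left for the next reconciliation.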

daemonset controller pod deletion algorithm

The daemonset controller's pod deletion algorithm starts as many goroutines as there are pods to delete (with an upper limit of 250 pods deleted per reconciliation) and waits for all of them to finish. If some deletions fail (for example because the apiserver throttles and drops requests), or some pods fall beyond the 250 cap, the deletion algorithm runs again at the daemonset object's next reconciliation, until all pods expected to be deleted are gone. A sketch of this pattern follows.
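Similarly, a hedged sketch of the deletion side: one goroutine per pod to delete, synchronized with a WaitGroup (deleteAll and deletePod are illustrative stand-ins for dsc.syncNodes and the pod-control API call):

package main

import (
	"fmt"
	"sync"
)

// deleteAll deletes every named pod concurrently (one goroutine per pod, capped
// upstream at 250 per sync) and waits for all deletions to finish, collecting errors.
func deleteAll(podsToDelete []string, deletePod func(name string) error) []error {
	errCh := make(chan error, len(podsToDelete))
	var wg sync.WaitGroup
	wg.Add(len(podsToDelete))
	for _, name := range podsToDelete {
		go func(podName string) {
			defer wg.Done()
			if err := deletePod(podName); err != nil {
				errCh <- err
			}
		}(name)
	}
	wg.Wait()
	close(errCh)
	var errs []error
	for err := range errCh {
		errs = append(errs, err)
	}
	return errs
}

func main() {
	pods := []string{"ds-pod-a", "ds-pod-b"}
	errs := deleteAll(pods, func(name string) error {
		fmt.Println("deleting", name)
		return nil
	})
	fmt.Println("errors:", len(errs))
}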
