kube-scheduler Source Code Analysis (3): Preemption Scheduling
Introduction to kube-scheduler
kube-scheduler is one of the core components of Kubernetes. It is responsible for scheduling pod resource objects: using its scheduling algorithms (the predicate/filtering phase and the priority/scoring phase), it binds each unscheduled pod to the most suitable node.
kube-scheduler Architecture
The overall structure and processing flow of kube-scheduler are shown in the diagram below. kube-scheduler list/watches objects such as pods and nodes; through informers it places unscheduled pods into the pending-pod queue and builds the scheduler cache (used for fast access to nodes and other objects). The sched.scheduleOne method contains the core logic for scheduling a pod: it takes one pod from the pending queue, runs the predicate and priority algorithms, and selects the best node. If all of these steps succeed, it updates the cache and asynchronously performs the bind operation, i.e. sets the pod's nodeName field; if they fail, it enters the preemption logic. This completes the scheduling of one pod.
Overview of kube-scheduler Preemption
The priority and preemption mechanism answers the question of what to do when a pod fails to schedule.
Normally, when a pod fails to schedule it is temporarily set aside in the Pending state; the scheduler only retries it when the pod is updated or the cluster state changes.
Sometimes, however, we want to rank pods, i.e. assign them priorities. When a high-priority pod fails to schedule, instead of being set aside it can evict some lower-priority pods from a node, guaranteeing that the high-priority pod is scheduled first.
For details on pod priority, see: https://kubernetes.io/zh/docs/concepts/scheduling-eviction/pod-priority-preemption/
Preemption always starts with a high-priority pod failing to schedule. We call this pod the "preemptor", and the evicted pods the "victims".
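As a concrete illustration (all names and values below are made up for the example, not taken from this article), a preemptor is simply a pod that references a PriorityClass with a high value:

```yaml
# Hypothetical high-priority class; name and value are illustrative.
apiVersion: scheduling.k8s.io/v1
kind: PriorityClass
metadata:
  name: high-priority
value: 1000000
globalDefault: false
description: "Pods of this class may preempt lower-priority pods."
---
# A pod referencing the class; if it fails to schedule, it becomes a preemptor.
apiVersion: v1
kind: Pod
metadata:
  name: important-app
spec:
  priorityClassName: high-priority
  containers:
  - name: app
    image: nginx
```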
PDB Overview
PDB stands for PodDisruptionBudget, a Kubernetes object that guarantees a minimum number of available replicas for workloads such as Deployments and StatefulSets in the cluster.
For details, see:
https://kubernetes.io/zh/docs/concepts/workloads/pods/disruptions/
https://kubernetes.io/zh/docs/tasks/run-application/configure-pdb/
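As a sketch (the names below are illustrative, not from this article), a minimal PodDisruptionBudget requiring at least 2 pods of a workload to stay available looks like this; during preemption, victims covered by such a PDB are evicted only as a last resort:

```yaml
apiVersion: policy/v1beta1     # policy/v1beta1 in v1.17; policy/v1 in newer clusters
kind: PodDisruptionBudget
metadata:
  name: myapp-pdb              # illustrative name
spec:
  minAvailable: 2              # keep at least 2 matching pods available
  selector:
    matchLabels:
      app: myapp               # must match the pods of the protected workload
```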
Enabling and Disabling Preemption
Preemption is enabled by default in kube-scheduler.
In Kubernetes 1.15+, if the NonPreemptingPriority feature gate is enabled (kube-scheduler startup flag --feature-gates=NonPreemptingPriority=true), a PriorityClass can set preemptionPolicy: Never; pods of that PriorityClass will then not trigger preemption when they fail to schedule.
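Such a non-preempting PriorityClass might look like the following sketch (the name is illustrative): pods using it keep their high priority for queue ordering, but never evict other pods.

```yaml
apiVersion: scheduling.k8s.io/v1
kind: PriorityClass
metadata:
  name: high-priority-nonpreempting   # illustrative name
value: 1000000
preemptionPolicy: Never               # do not evict other pods when scheduling fails
globalDefault: false
description: "High priority, but never preempts running pods."
```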
In addition, in Kubernetes 1.11+ preemption can be disabled via the kube-scheduler configuration file (note: it cannot be disabled via a command-line flag):
apiVersion: kubescheduler.config.k8s.io/v1alpha1
kind: KubeSchedulerConfiguration
...
disablePreemption: true
The configuration file is specified via the kube-scheduler startup flag --config.
kube-scheduler startup flags reference: https://kubernetes.io/zh/docs/reference/command-line-tools-reference/kube-scheduler/
kube-scheduler configuration file reference: https://kubernetes.io/zh/docs/reference/scheduling/config/
The analysis of the kube-scheduler component is divided into three parts:
(1) kube-scheduler initialization and startup;
(2) kube-scheduler core processing logic;
(3) kube-scheduler preemption scheduling logic.
This article covers the preemption scheduling logic.
3. kube-scheduler Preemption Scheduling Logic
Based on tag v1.17.4
https://github.com/kubernetes/kubernetes/releases/tag/v1.17.4
Entry Point: scheduleOne
We take the scheduleOne method as the entry point for analyzing preemption; here we only look at the preemption-related logic inside scheduleOne:
(1) call sched.Algorithm.Schedule to schedule the pod;
(2) if scheduling fails, check sched.DisablePreemption to see whether preemption is disabled in the kube-scheduler configuration;
(3) if preemption is not disabled, call sched.preempt to run the preemption logic.
// pkg/scheduler/scheduler.go
func (sched *Scheduler) scheduleOne(ctx context.Context) {
...
// schedule the pod
scheduleResult, err := sched.Algorithm.Schedule(schedulingCycleCtx, state, pod)
if err != nil {
...
if fitError, ok := err.(*core.FitError); ok {
// check whether preemption is disabled
if sched.DisablePreemption {
klog.V(3).Infof("Pod priority feature is not enabled or preemption is disabled by scheduler configuration." +
" No preemption is performed.")
} else {
// preemption logic
preemptionStartTime := time.Now()
sched.preempt(schedulingCycleCtx, state, fwk, pod, fitError)
...
}
...
}
...
sched.preempt
sched.preempt contains kube-scheduler's preemption handling. Its main logic:
(1) call sched.Algorithm.Preempt to simulate the preemption, returning the node the pod can preempt, the list of victim pods, and the list of pods whose NominatedNodeName must be cleared;
(2) call sched.podPreemptor.setNominatedNodeName to ask the apiserver to set the chosen node's name in the pod's NominatedNodeName field; the pod then re-enters the pending-pod queue to be scheduled again;
(3) iterate over the victim list and ask the apiserver to delete each victim pod;
(4) iterate over the pods whose NominatedNodeName must be cleared and ask the apiserver to update each pod, removing its NominatedNodeName field.
Note: the preemption logic does not immediately schedule the failed pod onto the node. Instead, based on the simulation result, it deletes the victim pods to free up resources and leaves the failed pod to be handled in a subsequent scheduling cycle.
// pkg/scheduler/scheduler.go
func (sched *Scheduler) preempt(ctx context.Context, state *framework.CycleState, fwk framework.Framework, preemptor *v1.Pod, scheduleErr error) (string, error) {
...
// (1) simulate the preemption: return the node the pod can preempt, the victim pod list, and the list of pods whose NominatedNodeName must be cleared
node, victims, nominatedPodsToClear, err := sched.Algorithm.Preempt(ctx, state, preemptor, scheduleErr)
if err != nil {
klog.Errorf("Error preempting victims to make room for %v/%v: %v", preemptor.Namespace, preemptor.Name, err)
return "", err
}
var nodeName = ""
if node != nil {
nodeName = node.Name
sched.SchedulingQueue.UpdateNominatedPodForNode(preemptor, nodeName)
// (2) ask the apiserver to set the chosen node's name in the pod's NominatedNodeName field; the pod then re-enters the pending-pod queue to be scheduled again
err = sched.podPreemptor.setNominatedNodeName(preemptor, nodeName)
if err != nil {
klog.Errorf("Error in preemption process. Cannot set 'NominatedPod' on pod %v/%v: %v", preemptor.Namespace, preemptor.Name, err)
sched.SchedulingQueue.DeleteNominatedPodIfExists(preemptor)
return "", err
}
// (3) iterate over the victim list and ask the apiserver to delete each victim pod
for _, victim := range victims {
if err := sched.podPreemptor.deletePod(victim); err != nil {
klog.Errorf("Error preempting pod %v/%v: %v", victim.Namespace, victim.Name, err)
return "", err
}
// If the victim is a WaitingPod, send a reject message to the PermitPlugin
if waitingPod := fwk.GetWaitingPod(victim.UID); waitingPod != nil {
waitingPod.Reject("preempted")
}
sched.Recorder.Eventf(victim, preemptor, v1.EventTypeNormal, "Preempted", "Preempting", "Preempted by %v/%v on node %v", preemptor.Namespace, preemptor.Name, nodeName)
}
metrics.PreemptionVictims.Observe(float64(len(victims)))
}
// (4) iterate over the pods whose NominatedNodeName must be cleared and ask the apiserver to update each, removing the NominatedNodeName field
for _, p := range nominatedPodsToClear {
rErr := sched.podPreemptor.removeNominatedNodeName(p)
if rErr != nil {
klog.Errorf("Cannot remove 'NominatedPod' field of pod: %v", rErr)
// We do not return as this error is not critical.
}
}
return nodeName, err
}
sched.Algorithm.Preempt
The sched.Algorithm.Preempt method simulates the preemption and returns the node the pod can preempt, the victim pod list, and the list of pods whose NominatedNodeName must be cleared. Its main logic:
(1) call nodesWherePreemptionMightHelp to get the nodes that failed the predicates but might become schedulable after some pods are removed;
(2) list the PodDisruptionBudget objects, used later when selecting preemptible nodes;
(3) call g.selectNodesForPreemption to filter the preemptible nodes and compute the minimal victim set on each node;
(4) iterate over the scheduler-extenders (a webhook extension mechanism of kube-scheduler) and run each extender's preemption logic, further filtering the preemptible node list;
(5) call pickOneNodeForPreemption to pick one node from the preemptible node list;
(6) call g.getLowerPriorityNominatedPods to get the pods on the chosen node whose NominatedNodeName is set and whose priority is lower than the preemptor's.
// pkg/scheduler/core/generic_scheduler.go
func (g *genericScheduler) Preempt(ctx context.Context, state *framework.CycleState, pod *v1.Pod, scheduleErr error) (*v1.Node, []*v1.Pod, []*v1.Pod, error) {
// Scheduler may return various types of errors. Consider preemption only if
// the error is of type FitError.
fitError, ok := scheduleErr.(*FitError)
if !ok || fitError == nil {
return nil, nil, nil, nil
}
if !podEligibleToPreemptOthers(pod, g.nodeInfoSnapshot.NodeInfoMap, g.enableNonPreempting) {
klog.V(5).Infof("Pod %v/%v is not eligible for more preemption.", pod.Namespace, pod.Name)
return nil, nil, nil, nil
}
if len(g.nodeInfoSnapshot.NodeInfoMap) == 0 {
return nil, nil, nil, ErrNoNodesAvailable
}
// (1) get the nodes that failed the predicates but might become schedulable after some pods are removed
potentialNodes := nodesWherePreemptionMightHelp(g.nodeInfoSnapshot.NodeInfoMap, fitError)
if len(potentialNodes) == 0 {
klog.V(3).Infof("Preemption will not help schedule pod %v/%v on any node.", pod.Namespace, pod.Name)
// In this case, we should clean-up any existing nominated node name of the pod.
return nil, nil, []*v1.Pod{pod}, nil
}
var (
pdbs []*policy.PodDisruptionBudget
err error
)
// (2) list the PodDisruptionBudget objects, used later when selecting preemptible nodes
if g.pdbLister != nil {
pdbs, err = g.pdbLister.List(labels.Everything())
if err != nil {
return nil, nil, nil, err
}
}
// (3) get the list of preemptible nodes
nodeToVictims, err := g.selectNodesForPreemption(ctx, state, pod, potentialNodes, pdbs)
if err != nil {
return nil, nil, nil, err
}
// (4) run each scheduler-extender's preemption logic, further filtering the preemptible node list
// We will only check nodeToVictims with extenders that support preemption.
// Extenders which do not support preemption may later prevent preemptor from being scheduled on the nominated
// node. In that case, scheduler will find a different host for the preemptor in subsequent scheduling cycles.
nodeToVictims, err = g.processPreemptionWithExtenders(pod, nodeToVictims)
if err != nil {
return nil, nil, nil, err
}
// (5) pick one node from the preemptible node list
candidateNode := pickOneNodeForPreemption(nodeToVictims)
if candidateNode == nil {
return nil, nil, nil, nil
}
// (6) get the pods on the chosen node whose NominatedNodeName is set and whose priority is lower than the preemptor's
// Lower priority pods nominated to run on this node, may no longer fit on
// this node. So, we should remove their nomination. Removing their
// nomination updates these pods and moves them to the active queue. It
// lets scheduler find another place for them.
nominatedPods := g.getLowerPriorityNominatedPods(pod, candidateNode.Name)
if nodeInfo, ok := g.nodeInfoSnapshot.NodeInfoMap[candidateNode.Name]; ok {
return nodeInfo.Node(), nodeToVictims[candidateNode].Pods, nominatedPods, nil
}
return nil, nil, nil, fmt.Errorf(
"preemption failed: the target node %s has been deleted from scheduler cache",
candidateNode.Name)
}
3.1 nodesWherePreemptionMightHelp
The nodesWherePreemptionMightHelp function returns the nodes that failed the predicates but might become schedulable after some pods are removed.
How do we decide that a node which failed the predicates might become schedulable after removing some pods? The key logic is in the predicates.UnresolvablePredicateExists function.
// pkg/scheduler/core/generic_scheduler.go
func nodesWherePreemptionMightHelp(nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo, fitErr *FitError) []*v1.Node {
potentialNodes := []*v1.Node{}
for name, node := range nodeNameToInfo {
if fitErr.FilteredNodesStatuses[name].Code() == framework.UnschedulableAndUnresolvable {
continue
}
failedPredicates := fitErr.FailedPredicates[name]
// If we assume that scheduler looks at all nodes and populates the failedPredicateMap
// (which is the case today), the !found case should never happen, but we'd prefer
// to rely less on such assumptions in the code when checking does not impose
// significant overhead.
// Also, we currently assume all failures returned by extender as resolvable.
if predicates.UnresolvablePredicateExists(failedPredicates) == nil {
klog.V(3).Infof("Node %v is a potential node for preemption.", name)
potentialNodes = append(potentialNodes, node.Node())
}
}
return potentialNodes
}
复制代码
3.1.1 predicates.UnresolvablePredicateExists
A node that failed the predicates is considered potentially schedulable after removing some pods only if none of its failure reasons appears in unresolvablePredicateFailureErrors.
unresolvablePredicateFailureErrors includes failures that evicting pods cannot resolve: node selector mismatch, pod affinity rules not matching, taints not tolerated, node NotReady, node memory pressure, and so on.
// pkg/scheduler/algorithm/predicates/error.go
var unresolvablePredicateFailureErrors = map[PredicateFailureReason]struct{}{
ErrNodeSelectorNotMatch: {},
ErrPodAffinityRulesNotMatch: {},
ErrPodNotMatchHostName: {},
ErrTaintsTolerationsNotMatch: {},
ErrNodeLabelPresenceViolated: {},
// Node conditions won't change when scheduler simulates removal of preemption victims.
// So, it is pointless to try nodes that have not been able to host the pod due to node
// conditions. These include ErrNodeNotReady, ErrNodeUnderPIDPressure, ErrNodeUnderMemoryPressure, ....
ErrNodeNotReady: {},
ErrNodeNetworkUnavailable: {},
ErrNodeUnderDiskPressure: {},
ErrNodeUnderPIDPressure: {},
ErrNodeUnderMemoryPressure: {},
ErrNodeUnschedulable: {},
ErrNodeUnknownCondition: {},
ErrVolumeZoneConflict: {},
ErrVolumeNodeConflict: {},
ErrVolumeBindConflict: {},
}
// UnresolvablePredicateExists checks if there is at least one unresolvable predicate failure reason, if true
// returns the first one in the list.
func UnresolvablePredicateExists(reasons []PredicateFailureReason) PredicateFailureReason {
for _, r := range reasons {
if _, ok := unresolvablePredicateFailureErrors[r]; ok {
return r
}
}
return nil
}
3.2 g.selectNodesForPreemption
The g.selectNodesForPreemption method finds the nodes that can be preempted and returns the minimal victim set on each node. Main logic:
(1) define the checkNode function, which calls g.selectVictimsOnNode to decide whether a node is preemptible and, if so, returns the minimal victim set on that node and the number of victims whose eviction would violate a PDB;
(2) launch 16 goroutines to run checkNode concurrently over the candidate node list.
// pkg/scheduler/core/generic_scheduler.go
// selectNodesForPreemption finds all the nodes with possible victims for
// preemption in parallel.
func (g *genericScheduler) selectNodesForPreemption(
ctx context.Context,
state *framework.CycleState,
pod *v1.Pod,
potentialNodes []*v1.Node,
pdbs []*policy.PodDisruptionBudget,
) (map[*v1.Node]*extenderv1.Victims, error) {
nodeToVictims := map[*v1.Node]*extenderv1.Victims{}
var resultLock sync.Mutex
// (1) define the checkNode function
// We can use the same metadata producer for all nodes.
meta := g.predicateMetaProducer(pod, g.nodeInfoSnapshot)
checkNode := func(i int) {
nodeName := potentialNodes[i].Name
if g.nodeInfoSnapshot.NodeInfoMap[nodeName] == nil {
return
}
nodeInfoCopy := g.nodeInfoSnapshot.NodeInfoMap[nodeName].Clone()
var metaCopy predicates.Metadata
if meta != nil {
metaCopy = meta.ShallowCopy()
}
stateCopy := state.Clone()
stateCopy.Write(migration.PredicatesStateKey, &migration.PredicatesStateData{Reference: metaCopy})
// call g.selectVictimsOnNode to decide whether this node is preemptible, returning the minimal victim set and the number of PDB-violating victims
pods, numPDBViolations, fits := g.selectVictimsOnNode(ctx, stateCopy, pod, metaCopy, nodeInfoCopy, pdbs)
if fits {
resultLock.Lock()
victims := extenderv1.Victims{
Pods: pods,
NumPDBViolations: int64(numPDBViolations),
}
nodeToVictims[potentialNodes[i]] = &victims
resultLock.Unlock()
}
}
// (2) launch 16 goroutines to run checkNode concurrently over the candidate node list
workqueue.ParallelizeUntil(context.TODO(), 16, len(potentialNodes), checkNode)
return nodeToVictims, nil
}
3.2.1 g.selectVictimsOnNode
The g.selectVictimsOnNode method decides whether a node can be preempted and returns the minimal victim set on that node and the number of victims whose eviction would violate a PDB.
Main logic:
(1) first, simulate removing every pod on the node with lower priority than the preemptor, then run the predicates; if the preemptor still does not fit, the node is not preemptible and the method returns immediately;
(2) sort all the lower-priority pods by importance, highest priority first;
(3) split the sorted pod list into two lists by whether evicting the pod would violate a PDB;
(4) iterate over the PDB-violating list first and try to add each pod back ("reprieve" it); if the preemptor still fits with the pod added back, the pod is spared; otherwise it is removed again, recorded as a victim, and the PDB-violation counter is incremented;
(5) then reprieve the non-violating list in the same way: add each pod back, keep it only if the preemptor still fits, and otherwise record it as a victim;
(6) finally, return the victim list, the number of PDB-violating victims, and true (the node is preemptible).
Note: the "removal" above is only a simulation to test whether the preemptor would fit. The victims are actually deleted later, after the target node has been chosen.
// pkg/scheduler/core/generic_scheduler.go
func (g *genericScheduler) selectVictimsOnNode(
ctx context.Context,
state *framework.CycleState,
pod *v1.Pod,
meta predicates.Metadata,
nodeInfo *schedulernodeinfo.NodeInfo,
pdbs []*policy.PodDisruptionBudget,
) ([]*v1.Pod, int, bool) {
var potentialVictims []*v1.Pod
removePod := func(rp *v1.Pod) error {
if err := nodeInfo.RemovePod(rp); err != nil {
return err
}
if meta != nil {
if err := meta.RemovePod(rp, nodeInfo.Node()); err != nil {
return err
}
}
status := g.framework.RunPreFilterExtensionRemovePod(ctx, state, pod, rp, nodeInfo)
if !status.IsSuccess() {
return status.AsError()
}
return nil
}
addPod := func(ap *v1.Pod) error {
nodeInfo.AddPod(ap)
if meta != nil {
if err := meta.AddPod(ap, nodeInfo.Node()); err != nil {
return err
}
}
status := g.framework.RunPreFilterExtensionAddPod(ctx, state, pod, ap, nodeInfo)
if !status.IsSuccess() {
return status.AsError()
}
return nil
}
// (1) first, simulate removing every pod on the node with lower priority than the preemptor, then run the predicates; if the preemptor still does not fit, the node is not preemptible and we return immediately
// As the first step, remove all the lower priority pods from the node and
// check if the given pod can be scheduled.
podPriority := podutil.GetPodPriority(pod)
for _, p := range nodeInfo.Pods() {
if podutil.GetPodPriority(p) < podPriority {
potentialVictims = append(potentialVictims, p)
if err := removePod(p); err != nil {
return nil, 0, false
}
}
}
// If the new pod does not fit after removing all the lower priority pods,
// we are almost done and this node is not suitable for preemption. The only
// condition that we could check is if the "pod" is failing to schedule due to
// inter-pod affinity to one or more victims, but we have decided not to
// support this case for performance reasons. Having affinity to lower
// priority pods is not a recommended configuration anyway.
if fits, _, _, err := g.podFitsOnNode(ctx, state, pod, meta, nodeInfo, false); !fits {
if err != nil {
klog.Warningf("Encountered error while selecting victims on node %v: %v", nodeInfo.Node().Name, err)
}
return nil, 0, false
}
var victims []*v1.Pod
numViolatingVictim := 0
// (2) sort all the lower-priority pods by importance, highest priority first
sort.Slice(potentialVictims, func(i, j int) bool { return util.MoreImportantPod(potentialVictims[i], potentialVictims[j]) })
// Try to reprieve as many pods as possible. We first try to reprieve the PDB
// violating victims and then other non-violating ones. In both cases, we start
// from the highest priority victims.
// (3) split the sorted pod list into two lists by whether evicting the pod would violate a PDB
violatingVictims, nonViolatingVictims := filterPodsWithPDBViolation(potentialVictims, pdbs)
reprievePod := func(p *v1.Pod) (bool, error) {
if err := addPod(p); err != nil {
return false, err
}
fits, _, _, _ := g.podFitsOnNode(ctx, state, pod, meta, nodeInfo, false)
if !fits {
if err := removePod(p); err != nil {
return false, err
}
victims = append(victims, p)
klog.V(5).Infof("Pod %v/%v is a potential preemption victim on node %v.", p.Namespace, p.Name, nodeInfo.Node().Name)
}
return fits, nil
}
// (4) try to add each PDB-violating pod back; if the preemptor still fits, the pod is spared, otherwise it is removed again, recorded as a victim, and the PDB-violation counter is incremented
for _, p := range violatingVictims {
if fits, err := reprievePod(p); err != nil {
klog.Warningf("Failed to reprieve pod %q: %v", p.Name, err)
return nil, 0, false
} else if !fits {
numViolatingVictim++
}
}
// (5) then reprieve the non-violating pods in the same way
// Now we try to reprieve non-violating victims.
for _, p := range nonViolatingVictims {
if _, err := reprievePod(p); err != nil {
klog.Warningf("Failed to reprieve pod %q: %v", p.Name, err)
return nil, 0, false
}
}
// (6) return the victim list, the number of PDB-violating victims, and true (the node is preemptible)
return victims, numViolatingVictim, true
}
3.3 pickOneNodeForPreemption
The pickOneNodeForPreemption function picks one node from the preemptible node list. It applies the following rules in order, stopping as soon as a rule yields a unique node:
(1) a node with no victims is chosen immediately;
(2) the node with the fewest victims whose eviction violates a PDB;
(3) the node whose highest victim priority is the lowest;
(4) the node with the smallest sum of victim priorities;
(5) the node with the fewest victims;
(6) the node whose victims have been running for the shortest time (i.e. whose earliest victim start time is the latest);
(7) if several nodes still remain, return the first of them.
// pkg/scheduler/core/generic_scheduler.go
func pickOneNodeForPreemption(nodesToVictims map[*v1.Node]*extenderv1.Victims) *v1.Node {
if len(nodesToVictims) == 0 {
return nil
}
minNumPDBViolatingPods := int64(math.MaxInt32)
var minNodes1 []*v1.Node
lenNodes1 := 0
for node, victims := range nodesToVictims {
// (1) a node with no victims is chosen immediately
if len(victims.Pods) == 0 {
// We found a node that doesn't need any preemption. Return it!
// This should happen rarely when one or more pods are terminated between
// the time that scheduler tries to schedule the pod and the time that
// preemption logic tries to find nodes for preemption.
return node
}
// (2) the node with the fewest PDB-violating victims
numPDBViolatingPods := victims.NumPDBViolations
if numPDBViolatingPods < minNumPDBViolatingPods {
minNumPDBViolatingPods = numPDBViolatingPods
minNodes1 = nil
lenNodes1 = 0
}
if numPDBViolatingPods == minNumPDBViolatingPods {
minNodes1 = append(minNodes1, node)
lenNodes1++
}
}
if lenNodes1 == 1 {
return minNodes1[0]
}
// (3) the node whose highest victim priority is the lowest
// There are more than one node with minimum number PDB violating pods. Find
// the one with minimum highest priority victim.
minHighestPriority := int32(math.MaxInt32)
var minNodes2 = make([]*v1.Node, lenNodes1)
lenNodes2 := 0
for i := 0; i < lenNodes1; i++ {
node := minNodes1[i]
victims := nodesToVictims[node]
// highestPodPriority is the highest priority among the victims on this node.
highestPodPriority := podutil.GetPodPriority(victims.Pods[0])
if highestPodPriority < minHighestPriority {
minHighestPriority = highestPodPriority
lenNodes2 = 0
}
if highestPodPriority == minHighestPriority {
minNodes2[lenNodes2] = node
lenNodes2++
}
}
if lenNodes2 == 1 {
return minNodes2[0]
}
// (4) the node with the smallest sum of victim priorities
// There are a few nodes with minimum highest priority victim. Find the
// smallest sum of priorities.
minSumPriorities := int64(math.MaxInt64)
lenNodes1 = 0
for i := 0; i < lenNodes2; i++ {
var sumPriorities int64
node := minNodes2[i]
for _, pod := range nodesToVictims[node].Pods {
// We add MaxInt32+1 to all priorities to make all of them >= 0. This is
// needed so that a node with a few pods with negative priority is not
// picked over a node with a smaller number of pods with the same negative
// priority (and similar scenarios).
sumPriorities += int64(podutil.GetPodPriority(pod)) + int64(math.MaxInt32+1)
}
if sumPriorities < minSumPriorities {
minSumPriorities = sumPriorities
lenNodes1 = 0
}
if sumPriorities == minSumPriorities {
minNodes1[lenNodes1] = node
lenNodes1++
}
}
if lenNodes1 == 1 {
return minNodes1[0]
}
// (5) the node with the fewest victims
// There are a few nodes with minimum highest priority victim and sum of priorities.
// Find one with the minimum number of pods.
minNumPods := math.MaxInt32
lenNodes2 = 0
for i := 0; i < lenNodes1; i++ {
node := minNodes1[i]
numPods := len(nodesToVictims[node].Pods)
if numPods < minNumPods {
minNumPods = numPods
lenNodes2 = 0
}
if numPods == minNumPods {
minNodes2[lenNodes2] = node
lenNodes2++
}
}
if lenNodes2 == 1 {
return minNodes2[0]
}
// (6) the node whose victims have been running for the shortest time
// There are a few nodes with same number of pods.
// Find the node that satisfies latest(earliestStartTime(all highest-priority pods on node))
latestStartTime := util.GetEarliestPodStartTime(nodesToVictims[minNodes2[0]])
if latestStartTime == nil {
// If the earliest start time of all pods on the 1st node is nil, just return it,
// which is not expected to happen.
klog.Errorf("earliestStartTime is nil for node %s. Should not reach here.", minNodes2[0])
return minNodes2[0]
}
nodeToReturn := minNodes2[0]
for i := 1; i < lenNodes2; i++ {
node := minNodes2[i]
// Get earliest start time of all pods on the current node.
earliestStartTimeOnNode := util.GetEarliestPodStartTime(nodesToVictims[node])
if earliestStartTimeOnNode == nil {
klog.Errorf("earliestStartTime is nil for node %s. Should not reach here.", node)
continue
}
if earliestStartTimeOnNode.After(latestStartTime.Time) {
latestStartTime = earliestStartTimeOnNode
nodeToReturn = node
}
}
// (7) if several nodes still remain, return the first of them
return nodeToReturn
}
Summary
Introduction to kube-scheduler
kube-scheduler is one of the core components of Kubernetes. It is responsible for scheduling pod resource objects: using its scheduling algorithms (the predicate/filtering phase and the priority/scoring phase), it binds each unscheduled pod to the most suitable node.
kube-scheduler Architecture
kube-scheduler list/watches objects such as pods and nodes; through informers it places unscheduled pods into the pending-pod queue and builds the scheduler cache. The sched.scheduleOne method contains the core scheduling logic: it takes one pod from the pending queue, runs the predicate and priority algorithms, and selects the best node. On success it updates the cache and asynchronously binds the pod (sets its nodeName field); on failure it enters the preemption logic.
Overview of kube-scheduler Preemption
The priority and preemption mechanism answers the question of what to do when a pod fails to schedule.
Normally, when a pod fails to schedule it is temporarily set aside in the Pending state; the scheduler only retries it when the pod is updated or the cluster state changes.
Sometimes, however, we want to rank pods by priority. When a high-priority pod fails to schedule, instead of being set aside it can evict some lower-priority pods from a node, guaranteeing that the high-priority pod is scheduled first.
Preemption always starts with a high-priority pod failing to schedule. We call this pod the "preemptor", and the evicted pods the "victims".
kube-scheduler Preemption Flow Chart
The flow chart below shows the core steps of kube-scheduler's preemption logic; before the preemption logic runs, the scheduler first checks whether preemption is enabled.