写点什么

改造 Kubernetes 自定义调度器

  • 2024-05-27
    福建
  • 本文字数:4546 字

    阅读完需:约 15 分钟

Overview


Kubernetes 默认调度器在调度 Pod 时并不关心特殊资源例如磁盘、GPU 等,因此突发奇想来改造调度器,在翻阅官方调度器框架[1]、调度器配置[2]和参考大佬的文章[3]后,自己也来尝试改写一下。


环境配置


相关软件版本:

  • Kubernetes 版本:v1.19.0

  • Docker 版本:v26.1.2

  • Prometheus 版本:v2.49

  • Node Exporter 版本:v1.7.0

集群内有 1 个 master 和 3 个 node。


实验部分


项目总览


项目结构如下:

.
├── Dockerfile
├── deployment.yaml
├── go.mod
├── go.sum
├── main.go
├── pkg
│   ├── cpu
│   │   └── cputraffic.go
│   ├── disk
│   │   └── disktraffic.go
│   ├── diskspace
│   │   └── diskspacetraffic.go
│   ├── memory
│   │   └── memorytraffic.go
│   ├── network
│   │   └── networktraffic.go
│   └── prometheus.go
├── scheduler
├── scheduler.conf
└── scheduler.yaml
复制代码


插件部分


下面以构建内存插件为例。


定义插件名称、变量和结构体

const MemoryPlugin = "MemoryTraffic"var _ = framework.ScorePlugin(&MemoryTraffic{})
type MemoryTraffic struct { prometheus *pkg.PrometheusHandle handle framework.FrameworkHandle}
复制代码


下面来实现 framework.ScorePlugin 接口(framework.FrameworkHandle 是插件初始化时由调度框架传入的句柄)。


先定义插件初始化入口

func New(plArgs runtime.Object, h framework.FrameworkHandle) (framework.Plugin, error) {    args := &MemoryTrafficArgs{}    if err := fruntime.DecodeInto(plArgs, args); err != nil {        return nil, err    }
klog.Infof("[MemoryTraffic] args received. Device: %s; TimeRange: %d, Address: %s", args.DeviceName, args.TimeRange, args.IP)
return &MemoryTraffic{ handle: h, prometheus: pkg.NewProme(args.IP, args.DeviceName, time.Minute*time.Duration(args.TimeRange)), }, nil}
复制代码


实现 Score 接口,Score 进行初步打分

func (n *MemoryTraffic) Score(ctx context.Context, state *framework.CycleState, p *corev1.Pod, nodeName string) (int64, *framework.Status) {    nodeBandwidth, err := n.prometheus.MemoryGetGauge(nodeName)    if err != nil {        return 0, framework.NewStatus(framework.Error, fmt.Sprintf("error getting node bandwidth measure: %s", err))    }    bandWidth := int64(nodeBandwidth.Value)    klog.Infof("[MemoryTraffic] node '%s' bandwidth: %v", nodeName, bandWidth)    return bandWidth, nil}
复制代码


实现 NormalizeScore,对上一步 Score 的打分进行修正

func (n *MemoryTraffic) NormalizeScore(ctx context.Context, state *framework.CycleState, pod *corev1.Pod, scores framework.NodeScoreList) *framework.Status {    var higherScore int64    for _, node := range scores {        if higherScore < node.Score {            higherScore = node.Score        }    }    // 计算公式为,满分 - (当前内存使用 / 总内存 * 100)    // 公式的计算结果为,内存使用率越大的节点,分数越低    for i, node := range scores {        scores[i].Score = node.Score * 100 / higherScore        klog.Infof("[MemoryTraffic] Nodes final score: %v", scores[i].Score)    }
klog.Infof("[MemoryTraffic] Nodes final score: %v", scores) return nil}
复制代码


配置插件名称和返回 ScoreExtension

// Name reports the plugin name, which is the MemoryPlugin constant.
func (n *MemoryTraffic) Name() string { return MemoryPlugin }
// 如果返回framework.ScoreExtensions 就需要实现framework.ScoreExtensionsfunc (n *MemoryTraffic) ScoreExtensions() framework.ScoreExtensions { return n}
复制代码


Prometheus 部分


首先来编写查询内存可用率的 PromQL

// memoryMeasureQueryTemplate is the PromQL template formatted by
// MemoryGetGauge. It divides the 30-minute average of
// node_memory_MemAvailable_bytes by the 30-minute average of
// node_memory_MemTotal_bytes, scales the ratio by 100, and multiplies the
// result with node_uname_info matched on the instance label and restricted to
// the node name substituted for %s.
// NOTE(review): the 30m window is hard-coded here; the timeRange stored in
// PrometheusHandle is not applied to this query — confirm whether intended.
const memoryMeasureQueryTemplate = ` (avg_over_time(node_memory_MemAvailable_bytes[30m]) / avg_over_time(node_memory_MemTotal_bytes[30m])) * 100 * on(instance) group_left(nodename) (node_uname_info{nodename="%s"})`
复制代码


然后来声明 PrometheusHandle

type PrometheusHandle struct {    deviceName string    timeRange  time.Duration    ip         string    client     v1.API}
复制代码


另外在插件部分也要声明查询 Prometheus 的参数结构体

// MemoryTrafficArgs is the plugin configuration decoded by New from the
// plugin's args in the scheduler configuration.
type MemoryTrafficArgs struct {
	// IP is the address handed to the Prometheus client.
	IP string `json:"ip"`
	// DeviceName is forwarded to the Prometheus handle.
	DeviceName string `json:"deviceName"`
	// TimeRange is converted to a duration in minutes by New.
	TimeRange int `json:"timeRange"`
}
复制代码


编写初始化 Prometheus 插件入口

func NewProme(ip, deviceName string, timeRace time.Duration) *PrometheusHandle {    client, err := api.NewClient(api.Config{Address: ip})    if err != nil {        klog.Fatalf("[Prometheus Plugin] FatalError creating prometheus client: %s", err.Error())    }    return &PrometheusHandle{        deviceName: deviceName,        ip:         ip,        timeRange:  timeRace,        client:     v1.NewAPI(client),    }}
复制代码


编写通用查询接口,可供其他类型资源查询

func (p *PrometheusHandle) query(promQL string) (model.Value, error) {    results, warnings, err := p.client.Query(context.Background(), promQL, time.Now())    if len(warnings) > 0 {        klog.Warningf("[Prometheus Query Plugin] Warnings: %v\n", warnings)    }
return results, err}
复制代码


获取内存可用率接口

func (p *PrometheusHandle) MemoryGetGauge(node string) (*model.Sample, error) {    value, err := p.query(fmt.Sprintf(memoryMeasureQueryTemplate, node))    fmt.Println(fmt.Sprintf(memoryMeasureQueryTemplate, node))    if err != nil {        return nil, fmt.Errorf("[MemoryTraffic Plugin] Error querying prometheus: %w", err)    }
nodeMeasure := value.(model.Vector) if len(nodeMeasure) != 1 { return nil, fmt.Errorf("[MemoryTraffic Plugin] Invalid response, expected 1 value, got %d", len(nodeMeasure)) } return nodeMeasure[0], nil
}
复制代码


然后在程序入口里启用插件并执行

func main() {    rand.Seed(time.Now().UnixNano())    command := app.NewSchedulerCommand(        app.WithPlugin(network.NetworkPlugin, network.New),        app.WithPlugin(disk.DiskPlugin, disk.New),        app.WithPlugin(diskspace.DiskSpacePlugin, diskspace.New),        app.WithPlugin(cpu.CPUPlugin, cpu.New),        app.WithPlugin(memory.MemoryPlugin, memory.New),    )    // 对于外部注册一个plugin    // command := app.NewSchedulerCommand(    // 	app.WithPlugin("example-plugin1", ExamplePlugin1.New))
if err := command.Execute(); err != nil { fmt.Fprintf(os.Stderr, "%v\n", err) os.Exit(1) }}
复制代码


配置部分


为方便观察,这里使用二进制方式运行,准备运行时的配置文件

# Scheduler configuration passed to the custom scheduler binary via --config.
apiVersion: kubescheduler.config.k8s.io/v1beta1
kind: KubeSchedulerConfiguration
clientConnection:
  kubeconfig: /etc/kubernetes/scheduler.conf
profiles:
# Pods opt in to this profile by setting spec.schedulerName to this value.
- schedulerName: custom-scheduler
  plugins:
    score:
      enabled:
      - name: "CPUTraffic"
        weight: 3
      - name: "MemoryTraffic"
        weight: 4
      - name: "DiskSpaceTraffic"
        weight: 3
      - name: "NetworkTraffic"
        weight: 2
      # Disable all default score plugins; only the ones listed above run.
      disabled:
      - name: "*"
  pluginConfig:
    - name: "NetworkTraffic"
      args:
        ip: "http://172.19.32.140:9090"
        deviceName: "eth0"
        timeRange: 60
    - name: "CPUTraffic"
      args:
        ip: "http://172.19.32.140:9090"
        deviceName: "eth0"
        timeRange: 0
    - name: "MemoryTraffic"
      args:
        ip: "http://172.19.32.140:9090"
        deviceName: "eth0"
        timeRange: 0
    - name: "DiskSpaceTraffic"
      args:
        ip: "http://172.19.32.140:9090"
        deviceName: "eth0"
        timeRange: 0
复制代码


kubeconfig 处为 master 节点的 scheduler.conf,以实际路径为准,内包含集群的证书哈希,ip 为部署 Prometheus 节点的 ip,端口为 Prometheus 配置中对外暴露的端口。


将二进制文件和 scheduler.yaml 放至 master 同一目录下运行:

# Run the custom scheduler binary with the configuration in the current
# directory. The last option must not end with a line-continuation backslash.
./scheduler --logtostderr=true \
	--address=127.0.0.1 \
	--v=6 \
	--config="$(pwd)/scheduler.yaml" \
	--kubeconfig="/etc/kubernetes/scheduler.conf"
复制代码


验证结果


准备一个要部署的 Deployment,并在 Pod 模板中指定要使用的调度器名称

apiVersion: apps/v1
kind: Deployment
metadata:
  name: gin
  namespace: default
  labels:
    app: gin
spec:
  replicas: 2
  selector:
    matchLabels:
      app: gin
  template:
    metadata:
      labels:
        app: gin
    spec:
      # Must match the schedulerName of the profile in the scheduler
      # configuration, otherwise the pods are never picked up by it.
      schedulerName: custom-scheduler
      containers:
      - name: gin
        image: jaydenchang/k8s_test:latest
        imagePullPolicy: Always
        command: ["./app"]
        ports:
        - containerPort: 9999
          protocol: TCP
复制代码


最后可以查看调度器的日志,部分日志如下:

I0808 17:32:35.138289   27131 memorytraffic.go:83] [MemoryTraffic] node 'node1' bandwidth: %!s(int64=2680340)I0808 17:32:35.138763   27131 memorytraffic.go:70] [MemoryTraffic] Nodes final score: [{node1 2680340} {node2 0}]I0808 17:32:35.138851   27131 memorytraffic.go:70] [MemoryTraffic] Nodes final score: [{node1 71} {node2 0}]I0808 17:32:35.138911   27131 memorytraffic.go:73] [MemoryTraffic] Nodes final score: [{node1 71} {node2 0}]I0808 17:32:35.139565   27131 default_binder.go:51] Attempting to bind default/go-deployment-66878c4885-b4b7k to node1I0808 17:32:35.141114   27131 eventhandlers.go:225] add event for scheduled pod default/go-deployment-66878c4885-b4b7kI0808 17:32:35.141714   27131 eventhandlers.go:205] delete event for unscheduled pod default/go-deployment-66878c4885-b4b7kI0808 17:32:35.143504   27131 scheduler.go:609] "Successfully bound pod to node" pod="default/go-deployment-66878c4885-b4b7k" node="node1" evaluatedNodes=2 feasibleNodes=2I0808 17:32:35.104540   27131 scheduler.go:609] "Successfully bound pod to node" pod="default/go-deployment-66878c4885-b4b7k" node="node1" evaluatedNodes=2 feasibleNodes=2
复制代码


文章转载自:JaydenChang

原文链接:https://www.cnblogs.com/jaydenchang/p/18213489

体验地址:http://www.jnpfsoft.com/?from=infoq


用户头像

还未添加个人签名 2023-06-19 加入

还未添加个人简介

评论

发布
暂无评论
改造 Kubernetes 自定义调度器_Kubernetes_快乐非自愿限量之名_InfoQ写作社区