编写k8s调度框架Score插件

Falldio深圳

前阵子遇到这样一个题目,要求设计一个分布式 GPU 任务调度系统,需求归纳起来有几点:

  • 支持任务启动、终止等常见操作;
  • 支持优先级配置,资源不足的情况下尽可能满足高优先级用户的需求;
  • 尽量提高系统的资源利用率;
  • 保证任务可以运行到指定型号的显卡上;
  • 利用 k8s 的任务调度机制扩展实现。

这算是我第一次思考如何解决一个应用侧的分布式问题,提出的解决方案多有不成熟,但迈出第一步还是很有成就感~

常见操作实现

按我理解每个用户请求都对应一个 GPUTask 的概念, 因此可以使用 k8s 的自定义资源(Custom Defined Resource,CDR)功能为集群拓展 API,模板文件大概如下:

yaml
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  name: gputasks.falldio.com
spec:
  group: falldio.com
  names:
    kind: GPUTask
    plural: gputasks
    singular: gputask
  scope: Namespaced
  versions:
    - name: v1
      served: true
      storage: true
      schema:
        openAPIV3Schema:
          type: object
          properties:
            spec:
              type: object
              properties:
                memory:
                  type: string
                cpu:
                  type: string
                gpu:
                  type: string
                gpu-amount:
                  type: string
                priority:
                  type: string

这是一个资源模板,主要属性包括:

  1. memory :计算任务的内存需求;
  2. cpu :任务的 CPU 核数需求;
  3. gpu :任务指定的显卡型号;
  4. gpu-amount :任务需要的显卡数量;
  5. priority :任务的优先级配置。

kubectl apply -f 即可注册资源到集群,然后可以创建具体的 GPUTask 实例,如:

yaml
apiversion: falldio.com/v1
kind: gputask
metadata:
  name: rtx4090
spec:
  memory: 16gi
  cpu: '8'
  gpu: rtx4090
  gpu-amount: '1'
  priority: gpu-low-priority

任务的启动和终止操作即可映射为对集群中的 GPUTask 资源的创建和删除操作,需要的话还可以进一步给资源增加 uid,再用 go-client 把这些常见功能封起来,使用自定义控制器动态生成和删除对应的 Pod

优先级配置

优先级配置主要通过 PodPriorityClass 机制实现,当出现 Pod 无法被调度的情况时,k8s 的调度程序会尝试抢占较低优先级的 Pod ,以满足悬决 Pod 的调度需求。

yaml
apiVersion: scheduling.k8s.io/v1
kind: PriorityClass
metadata:
  name: gpu-low-priority
value: 1
globalDefault: false
description: "低优先级GPU任务。"
---
apiVersion: scheduling.k8s.io/v1
kind: PriorityClass
metadata:
  name: gpu-medium-priority
value: 10
globalDefault: false
description: "中等优先级GPU任务。"
---
apiVersion: scheduling.k8s.io/v1
kind: PriorityClass
metadata:
  name: gpu-high-priority
value: 100
globalDefault: false
description: "高优先级GPU任务。"

这里我们同样定义三种简单的优先级,value 对应 k8s 中的优先等级。 而上一节的 CDR 中已经配置了对应的优先级内容。

Scheduler Score Plugin

为了提高资源利用率,我们需要了解 k8s 的调度框架。 简单地说,可以在如下时机对调度框架进行扩展:

调度框架

  • PreEnqueue: Pod 进入调度队列之前;
  • Sort:对调度队列中的 Pod 排序;
  • PreFilter:在对集群节点进行过滤之前;
  • Filter:这一阶段检查节点的有效性,只有满足某些条件的节点才能被用于配置 Pod ,比如本问题中特定的显卡型号;
  • PostFilter:过滤之后如果没有满足要求的节点,则进入该阶段;
  • PreScore:在下一阶段之前记录节点状态;
  • Score:按照一定规则对节点进行打分, Pod 后面会被分配到得分最高的节点上;
  • NormalizeScore:对得分进行微调,使其分布在特定区间;
  • Reserve:锁定选择的节点,避免竟态条件;
  • Permit:最终允许 Pod 被调度到指定节点上。

我们要做的是自定义打分插件,实现在 Score 和 NormalizeScore 阶段的自定义。

  1. clone scheduler-plugins 项目;
  2. 在 pkg 目录下创建自定义插件目录,实现 framework.ScorePlugin 接口。

对于 Score,我们主要根据节点 CPU、内存和显卡数量的余量进行打分:

go
func (gs *GPUScore) Score(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) {
	nodeInfo, err := gs.handle.SnapshotSharedLister().NodeInfos().Get(nodeName)
	if err != nil {
		return 0, framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v", nodeName, err))
	}
	return gs.score(pod, nodeInfo)
}

func (gs *GPUScore) score(pod *v1.Pod, nodeInfo *framework.NodeInfo) (int64, *framework.Status) {
	node := nodeInfo.Node()
	if node == nil {
		return 0, framework.NewStatus(framework.Error, "node not found")
	}
	if nodeInfo.Allocatable == nil {
		return 0, framework.NewStatus(framework.Error, "allocatable resources not found")
	}

	cpuScore := gs.calculateResourceScore(v1.ResourceCPU, pod, nodeInfo)
	memScore := gs.calculateResourceScore(v1.ResourceMemory, pod, nodeInfo)

	score := cpuScore + memScore
	return score, nil
}

func (gs *GPUScore) calculateResourceScore(resource v1.ResourceName, pod *v1.Pod, nodeInfo *framework.NodeInfo) int64 {
	podRequest := calculatePodResourceRequest(pod, resource)
	var allocatable int64
	switch resource {
	case v1.ResourceCPU:
		allocatable = nodeInfo.Allocatable.MilliCPU
	case v1.ResourceMemory:
		allocatable = nodeInfo.Allocatable.Memory
	case "falldio.com/gpu":
		allocatable = nodeInfo.Allocatable.ScalarResources["falldio.com/gpu"]
	}
	return allocatable - podRequest
}

至于 NomalizeScore 则更简单,我们使用常见的归一化公式即可:

go
func (gs *GPUScore) NormalizeScore(ctx context.Context, state *framework.CycleState, p *v1.Pod, scores framework.NodeScoreList) *framework.Status {
	var max int64 = math.MinInt64
	var min int64 = math.MaxInt64
	for _, score := range scores {
		if score.Score > max {
			max = score.Score
		}
		if score.Score < min {
			min = score.Score
		}
	}

	oldRange := max - min
	newRange := framework.MaxNodeScore - framework.MinNodeScore
	for i, score := range scores {
		if oldRange == 0 {
			scores[i].Score = framework.MinNodeScore
		} else {
			scores[i].Score = ((score.Score - min) * newRange / oldRange) + framework.MinNodeScore
		}
	}
	return nil
}

通过 nodeSelector 指定显卡

实现指定显卡功能的最简单的方法是给集群节点打 label,然后在 Pod 的 nodeSelector 字段指示显卡型号信息。在 go-client 中即按如下方式声明 Pod

go
newPod := &v1.Pod{
	ObjectMeta: metav1.ObjectMeta{
		Name:      gpuTask.Name,
		Namespace: gpuTask.Namespace,
		Labels:    map[string]string{"app": "gpu-task"},
	},
	Spec: v1.PodSpec{
		NodeSelector: map[string]string{
			"falldio.com/gpu": gpuTask.Spec.GPU,
		},
		Containers: []v1.Container{
			{
				Name:  "gpu-task",
				Image: "nvidia/cuda:11.0-base",
				Resources: v1.ResourceRequirements{
					Limits: v1.ResourceList{
						v1.ResourceCPU:    resource.MustParse(gpuTask.Spec.CPU),
						v1.ResourceMemory: resource.MustParse(gpuTask.Spec.Memory),
						"falldio.com/gpu": resource.MustParse(gpuTask.Spec.GPUAmount),
					},
				},
			},
		},
		PriorityClassName: string(gpuTask.Spec.Priority),
	},
}