
This post continues the scheduler-plugin topic by adding watch support for our GPUTask CRD. This is implemented with a custom controller, and the end result is:

  1. When a new GPUTask resource appears in the cluster, the controller creates a corresponding Pod, which kube-scheduler, together with the Score plugin written earlier, assigns to a suitable node;
  2. The Pods corresponding to GPUTasks are rescheduled periodically.

Code generation tools

The first thing to do is to use the code-generation tools that ship with Kubernetes to generate the clientset, Informer, and Lister code for GPUTask. Changes to GPUTask resources in the cluster are pushed by the Informer into a WorkQueue; our controller consumes the WorkQueue to learn about those changes and then adjusts the cluster state according to the predefined logic until the desired state is reached.

The first step is to lay out the project structure according to the convention:

/artifacts
/hack
/pkg
    /apis
        /gpucontroller
            /v1
                doc.go
                register.go
                types.go
            register.go
gpu_controller.go
main.go

The hack directory holds the code-generation script (update-codegen.sh), which can be taken from the standard project template mentioned earlier. Next, each of the following files needs to provide the information required for code generation:

gpucontroller/register.go

go
package gpucontroller

const (
	GroupName = "falldio.com"
	Version   = "v1"
)

v1/doc.go

go
// +k8s:deepcopy-gen=package

// +groupName=falldio.com
package v1

v1/register.go

go
package v1

import (
	"falldio.github.io/gpu-controller/pkg/apis/gpucontroller"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/schema"
)

var SchemeGroupVersion = schema.GroupVersion{
	Group:   gpucontroller.GroupName,
	Version: gpucontroller.Version,
}

var (
	SchemeBuilder = runtime.NewSchemeBuilder(addKnownTypes)
	AddToScheme   = SchemeBuilder.AddToScheme
)

func Resource(resource string) schema.GroupResource {
	return SchemeGroupVersion.WithResource(resource).GroupResource()
}

func Kind(kind string) schema.GroupKind {
	return SchemeGroupVersion.WithKind(kind).GroupKind()
}

func addKnownTypes(scheme *runtime.Scheme) error {
	scheme.AddKnownTypes(
		SchemeGroupVersion,
		&GPUTask{},
		&GPUTaskList{},
	)

	metav1.AddToGroupVersion(scheme, SchemeGroupVersion)
	return nil
}

v1/types.go

go
package v1

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// +genclient
// +genclient:noStatus
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object

type GPUTask struct {
	metav1.TypeMeta   `json:",inline"`
	metav1.ObjectMeta `json:"metadata,omitempty"`
	Spec              GPUTaskSpec `json:"spec"`
}

type GPUTaskSpec struct {
	Memory    string `json:"memory"`
	GPU       string `json:"gpu"`
	GPUAmount string `json:"gpu-amount"`
	CPU       string `json:"cpu"`
	Priority  string `json:"priority"`
}

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object

type GPUTaskList struct {
	metav1.TypeMeta `json:",inline"`
	metav1.ListMeta `json:"metadata"`

	Items []GPUTask `json:"items"`
}

Running the script then generates the corresponding deepcopy and client-related files.
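
For reference, the deepcopy code that deepcopy-gen emits for GPUTask (in zz_generated.deepcopy.go) looks roughly like the sketch below; the exact output can vary between code-generator versions:

go
// DeepCopyInto copies the receiver into out; in must be non-nil.
func (in *GPUTask) DeepCopyInto(out *GPUTask) {
	*out = *in
	out.TypeMeta = in.TypeMeta
	in.ObjectMeta.DeepCopyInto(&out.ObjectMeta)
	out.Spec = in.Spec // GPUTaskSpec only contains strings, so a value copy is enough
}

// DeepCopy creates a new deep copy of the GPUTask.
func (in *GPUTask) DeepCopy() *GPUTask {
	if in == nil {
		return nil
	}
	out := new(GPUTask)
	in.DeepCopyInto(out)
	return out
}

// DeepCopyObject implements runtime.Object, which is what the
// +k8s:deepcopy-gen:interfaces marker asks for.
func (in *GPUTask) DeepCopyObject() runtime.Object {
	if c := in.DeepCopy(); c != nil {
		return c
	}
	return nil
}

GPUTaskList gets the same three methods, and the clientset, informer, and lister packages are generated alongside (their exact paths depend on how update-codegen.sh is configured).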

Writing the controller logic

The GPUController struct and its factory function look like this:

go
type GPUController struct {
	kubeClientset    kubernetes.Interface
	gpuTaskClientset clientset.Interface

	gpuTasksLister listers.GPUTaskLister
	gpuTasksSynced cache.InformerSynced

	workqueue workqueue.RateLimitingInterface

	recorder record.EventRecorder
}

func NewController(
	kubeClientset kubernetes.Interface,
	gpuTaskClientset clientset.Interface,
	gpuTaskInformer informers.GPUTaskInformer,
	rescheduleDuration time.Duration,
) *GPUController {
	utilruntime.Must(gputaskscheme.AddToScheme(scheme.Scheme))
	klog.V(4).Info("Creating event broadcaster")
	eventBroadcaster := record.NewBroadcaster()
	eventBroadcaster.StartLogging(klog.Infof)
	eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeClientset.CoreV1().Events("")})
	recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: controllerAgentName})

	controller := &GPUController{
		kubeClientset:    kubeClientset,
		gpuTaskClientset: gpuTaskClientset,
		gpuTasksLister:   gpuTaskInformer.Lister(),
		gpuTasksSynced:   gpuTaskInformer.Informer().HasSynced,
		workqueue: workqueue.NewRateLimitingQueueWithConfig(
			workqueue.DefaultControllerRateLimiter(),
			workqueue.RateLimitingQueueConfig{
				Name: "GPUTasks",
			},
		),
		recorder: recorder,
	}

	klog.Info("Setting up event handlers")

	gpuTaskInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    controller.addGPUTask,
		UpdateFunc: controller.updateGPUTask,
		DeleteFunc: controller.deleteGPUTask,
	})

	if rescheduleDuration.Seconds() == 0 {
		klog.Info("Reschduling is disabled")
	} else {
		klog.Info("Rescheduling is enabled")
		ticker := time.NewTicker(rescheduleDuration)
		go func() {
			for range ticker.C {
				controller.ReschedulePods()
			}
		}()
	}

	return controller
}

As you can see, the controller holds a Kubernetes clientset, a GPUTask clientset, the GPUTask lister and informer-synced state, a workqueue, and an event recorder. In addition, to support periodic rescheduling, a ticker is started that periodically invokes ReschedulePods.

On the Informer we register handlers for the add, update, and delete events of the resource; all they really do is put the key of the affected object into the workqueue:

go
func (c *GPUController) addGPUTask(obj any) {
	var key string
	var err error
	if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
		runtime.HandleError(err)
		return
	}

	c.workqueue.AddRateLimited(key)
}

func (c *GPUController) updateGPUTask(old any, new any) {
	oldGPUTask := old.(*gpucontrollerv1.GPUTask)
	newGPUTask := new.(*gpucontrollerv1.GPUTask)
	if oldGPUTask.ResourceVersion == newGPUTask.ResourceVersion {
		return
	}
	c.addGPUTask(new)
}

func (c *GPUController) deleteGPUTask(obj any) {
	var key string
	var err error
	if key, err = cache.DeletionHandlingMetaNamespaceKeyFunc(obj); err != nil {
		runtime.HandleError(err)
		return
	}
	c.workqueue.AddRateLimited(key)
}

The Run function keeps consuming the workqueue:

go
func (c *GPUController) Run(ctx context.Context, workers int) error {
	defer runtime.HandleCrash()
	defer c.workqueue.ShutDown()
	logger := klog.FromContext(ctx)
	logger.Info("Starting GPU Controller")

	if !cache.WaitForCacheSync(ctx.Done(), c.gpuTasksSynced) {
		return fmt.Errorf("timed out waiting for caches to sync")
	}

	klog.Info("Starting workers", "count", workers)
	for i := 0; i < workers; i++ {
		go wait.UntilWithContext(ctx, c.runWorker, time.Second)
	}

	klog.Info("GPU Controller started")
	<-ctx.Done()
	klog.Info("Stopping GPU Controller")

	return nil
}

func (c *GPUController) runWorker(ctx context.Context) {
	for c.processNextItem(ctx) {
	}
}

func (c *GPUController) processNextItem(ctx context.Context) bool {
	obj, shutdown := c.workqueue.Get()
	logger := klog.FromContext(ctx)
	if shutdown {
		return false
	}

	err := func(obj any) error {
		defer c.workqueue.Done(obj)
		var key string
		var ok bool

		if key, ok = obj.(string); !ok {
			c.workqueue.Forget(obj)
			utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
			return nil
		}

		if err := c.syncHandler(key); err != nil {
			return fmt.Errorf("syncing %q: %v", key, err)
		}

		c.workqueue.Forget(obj)
		logger.Info("Successfully synced " + key)
		return nil
	}(obj)

	if err != nil {
		runtime.HandleError(err)
	}
	return true
}

func (c *GPUController) syncHandler(key string) error {
	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		runtime.HandleError(fmt.Errorf("invalid resource key: %s", key))
		return nil
	}

	klog.Infof("namespace: %s, name: %s", namespace, name)
	gpuTask, err := c.gpuTasksLister.GPUTasks(namespace).Get(name)
	if err != nil {
		if errors.IsNotFound(err) {
			klog.Infof("GPUTask '%s' in work queue no longer exists, deleting the Pod.", key)
			c.kubeClientset.CoreV1().Pods(namespace).Delete(context.Background(), name, metav1.DeleteOptions{})
			return nil
		}

		runtime.HandleError(fmt.Errorf("failed to list GPUTask by: %s/%s", namespace, name))

		return err
	}

	klog.Infof("Syncing GPUTask '%+#v'", gpuTask)
	newPod := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:      gpuTask.Name,
			Namespace: gpuTask.Namespace,
			Labels:    map[string]string{"app": "gpu-task"},
		},
		Spec: v1.PodSpec{
			NodeSelector: map[string]string{
				"falldio.com/gpu": gpuTask.Spec.GPU,
			},
			Containers: []v1.Container{
				{
					Name:  "gpu-task",
					Image: "nvidia/cuda:11.0-base",
					Resources: v1.ResourceRequirements{
						Limits: v1.ResourceList{
							v1.ResourceCPU:    resource.MustParse(gpuTask.Spec.CPU),
							v1.ResourceMemory: resource.MustParse(gpuTask.Spec.Memory),
							"falldio.com/gpu": resource.MustParse(gpuTask.Spec.GPUAmount),
						},
					},
				},
			},
			PriorityClassName: string(gpuTask.Spec.Priority),
		},
	}
	pod, err := c.kubeClientset.CoreV1().Pods(gpuTask.Namespace).Create(context.Background(), newPod, metav1.CreateOptions{})
	if err != nil {
		return err
	}
	klog.Infof("Created Pod: %s", pod.Name)

	c.recorder.Event(gpuTask, v1.EventTypeNormal, SuccessSynced, MessageResourceSynced)
	return nil
}

For a GPUTask in the normal case, we take an event from the workqueue to get the GPUTask's key, fetch the resource from the gpuTasksLister, and create a Pod through the Kubernetes client based on its spec. If the event was a deletion, the GPUTask CRD object no longer exists; in that case we simply delete the corresponding Pod.
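
To exercise this logic end to end, a GPUTask can be created through the generated clientset. A minimal sketch, assuming the typed accessor is named FalldioV1() and using hypothetical field values:

go
// Sketch only: the FalldioV1() accessor name and every field value below are
// assumptions, not code taken from the generated packages.
func createDemoGPUTask(cs clientset.Interface) error {
	task := &gpucontrollerv1.GPUTask{
		ObjectMeta: metav1.ObjectMeta{Name: "demo-task", Namespace: "default"},
		Spec: gpucontrollerv1.GPUTaskSpec{
			Memory:    "1Gi",
			GPU:       "a100", // hypothetical value matched against the falldio.com/gpu node label
			GPUAmount: "1",
			CPU:       "500m",
			Priority:  "gpu-high", // hypothetical PriorityClass name
		},
	}
	_, err := cs.FalldioV1().GPUTasks(task.Namespace).Create(context.Background(), task, metav1.CreateOptions{})
	return err
}

Once such an object exists, the add event lands in the workqueue and syncHandler creates the Pod shown above.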

For periodic rescheduling:

go
func (c *GPUController) ReschedulePods() {
	labelSelector := metav1.LabelSelector{
		MatchLabels: map[string]string{"app": "gpu-task"},
	}

	podList, err := c.kubeClientset.CoreV1().Pods("").List(context.Background(), metav1.ListOptions{
		LabelSelector: metav1.FormatLabelSelector(&labelSelector),
	})
	if err != nil {
		runtime.HandleError(err)
		return
	}

	for i := range podList.Items {
		pod := podList.Items[i]
		// Note: a fully robust version would wait for the old Pod to be gone
		// before creating a new one with the same name.
		c.kubeClientset.CoreV1().Pods(pod.Namespace).Delete(context.Background(), pod.Name, metav1.DeleteOptions{})

		// Clear the server-populated fields and the previous node assignment so
		// that the recreated Pod goes through scheduling again.
		newPod := pod.DeepCopy()
		newPod.ResourceVersion = ""
		newPod.UID = ""
		newPod.Spec.NodeName = ""
		newPod.Status = v1.PodStatus{}
		c.kubeClientset.CoreV1().Pods(pod.Namespace).Create(context.Background(), newPod, metav1.CreateOptions{})
	}
}

All we do here is find the Pods that were created for GPUTasks, delete them, and create them again so that they go through scheduling once more.

Writing the entry point

go
func main() {
	klog.InitFlags(nil)
	flag.Parse()

	ctx := signals.SetupSignalHandler()
	logger := klog.FromContext(ctx)

	cfg, err := clientcmd.BuildConfigFromFlags(masterURL, kubeconfig)
	if err != nil {
		logger.Error(err, "Error building kubeconfig")
		klog.FlushAndExit(klog.ExitFlushTimeout, 1)
	}

	logger.Info("Setting up client for kubernetes")
	kubeClient, err := kubernetes.NewForConfig(cfg)
	if err != nil {
		logger.Error(err, "Error building kubernetes clientset")
		klog.FlushAndExit(klog.ExitFlushTimeout, 1)
	}

	logger.Info("Setting up client for gputask")
	gpuTaskClientset, err := clientset.NewForConfig(cfg)
	if err != nil {
		logger.Error(err, "Error building gputask clientset")
		klog.FlushAndExit(klog.ExitFlushTimeout, 1)
	}

	gpuTaskInformerFactory := informers.NewSharedInformerFactory(gpuTaskClientset, time.Second*30)

	controller := NewController(kubeClient, gpuTaskClientset, gpuTaskInformerFactory.Falldio().V1().GPUTasks(), rescheduleDuration)

	gpuTaskInformerFactory.Start(ctx.Done())

	if err = controller.Run(ctx, 2); err != nil {
		logger.Error(err, "Error running controller")
		klog.FlushAndExit(klog.ExitFlushTimeout, 1)
	}
}

In main, we use the command-line arguments supplied by the user (the flag declarations and parsing are omitted here) to initialize the clients, build the Informer, and construct the controller, and then call the Run function above to enter the event loop.
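
For completeness, the omitted flag declarations referenced above (masterURL, kubeconfig, and rescheduleDuration) might look like the following sketch; the flag names and defaults are assumptions:

go
var (
	masterURL          string
	kubeconfig         string
	rescheduleDuration time.Duration
)

func init() {
	// Flag names and help strings are illustrative; only the variable names are
	// taken from the main function above.
	flag.StringVar(&kubeconfig, "kubeconfig", "", "Path to a kubeconfig file; only needed when running out of cluster.")
	flag.StringVar(&masterURL, "master", "", "Address of the Kubernetes API server; overrides the value in kubeconfig.")
	flag.DurationVar(&rescheduleDuration, "reschedule-duration", 0, "Interval between rescheduling rounds for GPUTask Pods; 0 disables rescheduling.")
}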

With that, the basic logic of the whole GPU scheduling flow is in place.
