This post continues the scheduler-plugin series by adding watch support for our GPUTask CRD. This is done mainly through a custom controller, and the end result is:
- when a new GPUTask resource appears in the cluster, the controller creates a corresponding Pod, which kube-scheduler, together with the Score plugin written earlier, places on a suitable node;
- Pods backed by GPUTasks are rescheduled periodically.
Code generation tools
The first thing to do is to use the Kubernetes code-generation tools to produce the clientset, informers, and listers for GPUTask. Changes to GPUTask resources in the cluster flow through the informer into a workqueue; our controller consumes the workqueue to learn about those changes and then adjusts the cluster state according to the predefined logic until the desired state is reached.
The first step is to lay out the project structure according to the usual conventions:
/artifacts
/hack
/pkg
    /apis
        /gpucontroller
            /v1
                doc.go
                register.go
                types.go
            register.go
gpu_controller.go
main.go
The hack directory holds the code-generation script (update-codegen.sh), which can be taken from the standard project template mentioned earlier. Next, each file has to provide the information the code generators need:
gpucontroller/register.go:
package gpucontroller

const (
	GroupName = "falldio.com"
	Version   = "v1"
)
v1/doc.go:
// +k8s:deepcopy-gen=package
// +groupName=falldio.com
package v1
v1/register.go:
package v1

import (
	"falldio.github.io/gpu-controller/pkg/apis/gpucontroller"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/schema"
)

var SchemeGroupVersion = schema.GroupVersion{
	Group:   gpucontroller.GroupName,
	Version: gpucontroller.Version,
}

var (
	SchemeBuilder = runtime.NewSchemeBuilder(addKnownTypes)
	AddToScheme   = SchemeBuilder.AddToScheme
)

func Resource(resource string) schema.GroupResource {
	return SchemeGroupVersion.WithResource(resource).GroupResource()
}

func Kind(kind string) schema.GroupKind {
	return SchemeGroupVersion.WithKind(kind).GroupKind()
}

func addKnownTypes(scheme *runtime.Scheme) error {
	scheme.AddKnownTypes(
		SchemeGroupVersion,
		&GPUTask{},
		&GPUTaskList{},
	)
	metav1.AddToGroupVersion(scheme, SchemeGroupVersion)
	return nil
}
v1/types.go:
package v1

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// GPUTask describes a GPU workload that the controller materializes as a Pod.
// The marker comments below drive deepcopy and client code generation.
// +genclient
// +genclient:noStatus
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
type GPUTask struct {
	metav1.TypeMeta   `json:",inline"`
	metav1.ObjectMeta `json:"metadata,omitempty"`
	Spec              GPUTaskSpec `json:"spec"`
}

type GPUTaskSpec struct {
	Memory    string `json:"memory"`
	GPU       string `json:"gpu"`
	GPUAmount string `json:"gpu-amount"`
	CPU       string `json:"cpu"`
	Priority  string `json:"priority"`
}

// GPUTaskList is a list of GPUTask resources.
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
type GPUTaskList struct {
	metav1.TypeMeta `json:",inline"`
	metav1.ListMeta `json:"metadata"`
	Items           []GPUTask `json:"items"`
}
Running the script then generates the deepcopy functions and the client-related code.
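Assuming the layout used by the common sample projects, the generated code lands roughly as shown below; the exact output paths depend on the arguments passed to update-codegen.sh:
/pkg
    /apis
        /gpucontroller
            /v1
                zz_generated.deepcopy.go
    /generated
        /clientset
        /informers
        /listers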
Writing the controller logic
The GPUController struct and its factory function look like this:
type GPUController struct {
	kubeClientset    kubernetes.Interface
	gpuTaskClientset clientset.Interface
	gpuTasksLister   listers.GPUTaskLister
	gpuTasksSynced   cache.InformerSynced
	workqueue        workqueue.RateLimitingInterface
	recorder         record.EventRecorder
}

func NewController(
	kubeClientset kubernetes.Interface,
	gpuTaskClientset clientset.Interface,
	gpuTaskInformer informers.GPUTaskInformer,
	rescheduleDuration time.Duration,
) *GPUController {
	utilruntime.Must(gputaskscheme.AddToScheme(scheme.Scheme))
	klog.V(4).Info("Creating event broadcaster")
	eventBroadcaster := record.NewBroadcaster()
	eventBroadcaster.StartLogging(klog.Infof)
	eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeClientset.CoreV1().Events("")})
	recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: controllerAgentName})
	controller := &GPUController{
		kubeClientset:    kubeClientset,
		gpuTaskClientset: gpuTaskClientset,
		gpuTasksLister:   gpuTaskInformer.Lister(),
		gpuTasksSynced:   gpuTaskInformer.Informer().HasSynced,
		workqueue: workqueue.NewRateLimitingQueueWithConfig(
			workqueue.DefaultControllerRateLimiter(),
			workqueue.RateLimitingQueueConfig{
				Name: "GPUTasks",
			},
		),
		recorder: recorder,
	}

	klog.Info("Setting up event handlers")
	gpuTaskInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    controller.addGPUTask,
		UpdateFunc: controller.updateGPUTask,
		DeleteFunc: controller.deleteGPUTask,
	})

	if rescheduleDuration.Seconds() == 0 {
		klog.Info("Rescheduling is disabled")
	} else {
		klog.Info("Rescheduling is enabled")
		ticker := time.NewTicker(rescheduleDuration)
		go func() {
			for range ticker.C {
				controller.ReschedulePods()
			}
		}()
	}
	return controller
}
As the code shows, the controller holds a Kubernetes clientset, a GPUTask clientset, the GPUTask informer and lister, a workqueue, and an event recorder. In addition, to support periodic rescheduling, a ticker is started that invokes ReschedulePods at the configured interval.
On the informer we register handlers for the add, update, and delete events; all they really do is put the affected object's key into the workqueue:
func (c *GPUController) addGPUTask(obj any) {
	key, err := cache.MetaNamespaceKeyFunc(obj)
	if err != nil {
		runtime.HandleError(err)
		return
	}
	c.workqueue.AddRateLimited(key)
}

func (c *GPUController) updateGPUTask(old any, new any) {
	oldGPUTask := old.(*gpucontrollerv1.GPUTask)
	newGPUTask := new.(*gpucontrollerv1.GPUTask)
	if oldGPUTask.ResourceVersion == newGPUTask.ResourceVersion {
		return
	}
	c.addGPUTask(new)
}

func (c *GPUController) deleteGPUTask(obj any) {
	key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
	if err != nil {
		runtime.HandleError(err)
		return
	}
	c.workqueue.AddRateLimited(key)
}
The Run function keeps consuming the workqueue:
func (c *GPUController) Run(ctx context.Context, workers int) error {
	defer runtime.HandleCrash()
	defer c.workqueue.ShutDown()
	logger := klog.FromContext(ctx)
	logger.Info("Starting GPU Controller")
	if !cache.WaitForCacheSync(ctx.Done(), c.gpuTasksSynced) {
		return fmt.Errorf("timed out waiting for caches to sync")
	}
	logger.Info("Starting workers", "count", workers)
	for i := 0; i < workers; i++ {
		go wait.UntilWithContext(ctx, c.runWorker, time.Second)
	}
	logger.Info("GPU Controller started")
	<-ctx.Done()
	logger.Info("Stopping GPU Controller")
	return nil
}

func (c *GPUController) runWorker(ctx context.Context) {
	for c.processNextItem(ctx) {
	}
}

func (c *GPUController) processNextItem(ctx context.Context) bool {
	obj, shutdown := c.workqueue.Get()
	logger := klog.FromContext(ctx)
	if shutdown {
		return false
	}
	err := func(obj any) error {
		defer c.workqueue.Done(obj)
		var key string
		var ok bool
		if key, ok = obj.(string); !ok {
			c.workqueue.Forget(obj)
			utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
			return nil
		}
		if err := c.syncHandler(key); err != nil {
			return fmt.Errorf("syncing %q: %v", key, err)
		}
		c.workqueue.Forget(obj)
		logger.Info("Successfully synced " + key)
		return nil
	}(obj)
	if err != nil {
		runtime.HandleError(err)
	}
	return true
}
func (c *GPUController) syncHandler(key string) error {
	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		runtime.HandleError(fmt.Errorf("invalid resource key: %s", key))
		return nil
	}
	klog.Infof("namespace: %s, name: %s", namespace, name)
	gpuTask, err := c.gpuTasksLister.GPUTasks(namespace).Get(name)
	if err != nil {
		if errors.IsNotFound(err) {
			klog.Infof("GPUTask '%s' in work queue no longer exists, deleting the Pod.", key)
			if err := c.kubeClientset.CoreV1().Pods(namespace).Delete(context.Background(), name, metav1.DeleteOptions{}); err != nil && !errors.IsNotFound(err) {
				return err
			}
			return nil
		}
		runtime.HandleError(fmt.Errorf("failed to list GPUTask by: %s/%s", namespace, name))
		return err
	}
	klog.Infof("Syncing GPUTask '%+#v'", gpuTask)
	newPod := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:      gpuTask.Name,
			Namespace: gpuTask.Namespace,
			Labels:    map[string]string{"app": "gpu-task"},
		},
		Spec: v1.PodSpec{
			NodeSelector: map[string]string{
				"falldio.com/gpu": gpuTask.Spec.GPU,
			},
			Containers: []v1.Container{
				{
					Name:  "gpu-task",
					Image: "nvidia/cuda:11.0-base",
					Resources: v1.ResourceRequirements{
						Limits: v1.ResourceList{
							v1.ResourceCPU:    resource.MustParse(gpuTask.Spec.CPU),
							v1.ResourceMemory: resource.MustParse(gpuTask.Spec.Memory),
							"falldio.com/gpu": resource.MustParse(gpuTask.Spec.GPUAmount),
						},
					},
				},
			},
			PriorityClassName: gpuTask.Spec.Priority,
		},
	}
	pod, err := c.kubeClientset.CoreV1().Pods(gpuTask.Namespace).Create(context.Background(), newPod, metav1.CreateOptions{})
	if err != nil {
		// The Pod may already exist from an earlier sync of the same GPUTask.
		if errors.IsAlreadyExists(err) {
			klog.Infof("Pod for GPUTask '%s' already exists", key)
			return nil
		}
		return err
	}
	klog.Infof("Created Pod: %s", pod.Name)
	c.recorder.Event(gpuTask, v1.EventTypeNormal, SuccessSynced, MessageResourceSynced)
	return nil
}
In the normal case we take an item from the workqueue, obtain the GPUTask's key, fetch the resource from the lister, and create a Pod based on its spec through the Kubernetes client. For a delete event the GPUTask CRD can no longer be fetched; in that case we simply delete the corresponding Pod.
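For reference, here is a minimal sketch (not part of the controller) of creating a GPUTask through the generated clientset; the FalldioV1 accessor name and the field values are assumptions and should be checked against the generated code:
// Illustrative helper: creates a GPUTask so the controller can pick it up.
// The FalldioV1() accessor name is an assumption derived from the group name.
func createDemoTask(ctx context.Context, cs clientset.Interface) error {
	task := &gpucontrollerv1.GPUTask{
		ObjectMeta: metav1.ObjectMeta{Name: "demo-task", Namespace: "default"},
		Spec: gpucontrollerv1.GPUTaskSpec{
			Memory:    "2Gi",
			GPU:       "a100",
			GPUAmount: "1",
			CPU:       "500m",
			Priority:  "high-priority",
		},
	}
	// The controller observes the resulting add event and creates the matching Pod.
	_, err := cs.FalldioV1().GPUTasks("default").Create(ctx, task, metav1.CreateOptions{})
	return err
}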
For periodic rescheduling:
func (c *GPUController) ReschedulePods() {
	labelSelector := metav1.LabelSelector{
		MatchLabels: map[string]string{"app": "gpu-task"},
	}
	podList, err := c.kubeClientset.CoreV1().Pods("").List(context.Background(), metav1.ListOptions{
		LabelSelector: metav1.FormatLabelSelector(&labelSelector),
	})
	if err != nil {
		runtime.HandleError(err)
		return
	}
	for i := range podList.Items {
		pod := podList.Items[i].DeepCopy()
		if err := c.kubeClientset.CoreV1().Pods(pod.Namespace).Delete(context.Background(), pod.Name, metav1.DeleteOptions{}); err != nil {
			runtime.HandleError(err)
			continue
		}
		// Clear the server-populated fields and the node binding so the recreated
		// Pod passes validation and goes through scheduling again. The delete above
		// is asynchronous, so in practice the create may need to be retried.
		pod.ResourceVersion = ""
		pod.UID = ""
		pod.Spec.NodeName = ""
		pod.Status = v1.PodStatus{}
		if _, err := c.kubeClientset.CoreV1().Pods(pod.Namespace).Create(context.Background(), pod, metav1.CreateOptions{}); err != nil {
			runtime.HandleError(err)
		}
	}
}
All we have to do here is find every Pod created from a GPUTask, delete it, and recreate it (after clearing the server-populated fields) so that it goes through scheduling again.
Writing the entry point
func main() {
	klog.InitFlags(nil)
	flag.Parse()
	ctx := signals.SetupSignalHandler()
	logger := klog.FromContext(ctx)

	cfg, err := clientcmd.BuildConfigFromFlags(masterURL, kubeconfig)
	if err != nil {
		logger.Error(err, "Error building kubeconfig")
		klog.FlushAndExit(klog.ExitFlushTimeout, 1)
	}

	logger.Info("Setting up client for kubernetes")
	kubeClient, err := kubernetes.NewForConfig(cfg)
	if err != nil {
		logger.Error(err, "Error building kubernetes clientset")
		klog.FlushAndExit(klog.ExitFlushTimeout, 1)
	}

	logger.Info("Setting up client for gputask")
	gpuTaskClientset, err := clientset.NewForConfig(cfg)
	if err != nil {
		logger.Error(err, "Error building gputask clientset")
		klog.FlushAndExit(klog.ExitFlushTimeout, 1)
	}

	gpuTaskInformerFactory := informers.NewSharedInformerFactory(gpuTaskClientset, time.Second*30)
	controller := NewController(kubeClient, gpuTaskClientset, gpuTaskInformerFactory.Falldio().V1().GPUTasks(), rescheduleDuration)

	gpuTaskInformerFactory.Start(ctx.Done())
	if err = controller.Run(ctx, 2); err != nil {
		logger.Error(err, "Error running controller")
		klog.FlushAndExit(klog.ExitFlushTimeout, 1)
	}
}
In main, based on the command-line flags (the flag declarations and parsing are omitted here), we initialize the clients, build the informer factory, construct the controller, and call the Run function shown earlier to enter the event loop.
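The omitted flag declarations might look roughly like the sketch below; the flag names and defaults are illustrative rather than taken from the original code:
// Illustrative only: flag names and defaults are assumptions.
var (
	masterURL          string
	kubeconfig         string
	rescheduleDuration time.Duration
)

func init() {
	flag.StringVar(&kubeconfig, "kubeconfig", "", "Path to a kubeconfig file; only needed when running outside the cluster.")
	flag.StringVar(&masterURL, "master", "", "Address of the Kubernetes API server; overrides any value in kubeconfig.")
	flag.DurationVar(&rescheduleDuration, "reschedule-duration", 0, "Interval between reschedules of GPUTask Pods; 0 disables rescheduling.")
}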
With that, the basic logic of GPU scheduling is complete.