volcano中的task

阅读: 评论:0

volcano中的task

volcano中的task

最近在支持CPU/GPU混布资源任务的时候,在我们自己开发的训练框架上,遇到volcano无法成功调度的情况,为此给volcano提了个issue。

最终发现是因为我们的kubeflow/common版本太低,导致没有给PodTemplate添加annotation volcano.sh/task-spec=replecaType

背景

我们的crd类似于pytorch/tensorflow,需要创建一个master和多个worker,其中master只用到CPU资源,worker需要GPU资源。

为了保证集群中的GPU资源能够最大化使用,我们往集群里添加了CPU机器。将master节点调度到CPU机器上,worker调度到GPU机器上。

避免因master调度到GPU机器,导致GPU机器的卡分不完(极端情况下,如果有多个任务,master可能占用某个GPU机器的大量CPU资源,导致GPU卡分不完)。

这里有两种方式实现: nodeSelector或者nodeAffinity。

nodeSelector

nodeSelector可以定义要匹配拥有哪些标签的node。

这里可以为CPU的node增加对应的标签,比如resType=CPU,并在master的PodSpec增加nodeSelector。

worker可以不需要nodeSelector,因为CPU机器没有GPU卡,肯定不满足资源需要。

下面借用tensorflow的crd作为例子:

apiVersion: kubeflow/v1
kind: TFJob
metadata:name: tf-test
spec:tfReplicaSpecs:Master:replicas: 1template:spec:containers:- name: tensorflowimage: docker.io/kubeflowkatib/tf-mnist-with-summaries:latestcommand:- sh- -c- sleep 10nodeSelector:resType: CPU           -- 此处增加nodeSelectorWorker:replicas: 1template:spec:containers:- name: tensorflowimage: docker.io/kubeflowkatib/tf-mnist-with-summaries:latestcommand:- sh- -c- sleep 10resources:limits:nvidia/gpu: "1"requests:nvidia/gpu: "1"

nodeAffinity

nodeAffinity的表述能力更强,可以对label做In/NotIn等操作。

对于GPU机器,因为用的是nvidia的卡,为保证nvidia-device-plugin/dcgm-exporter这些组件正常工作,会给GPU机器打上label: nvidia-device-enable=enable

所以可以通过nodeAffinity的NotIn来实现,也无需单独为CPU机器增加label。

    apiVersion: kubeflow/v1kind: TFJobmetadata:name: tf-testspec:tfReplicaSpecs:Master:replicas: 1template:spec:affinity:nodeAffinity:                  -- 此处增加nodeAffnitiy排除某些label的机器requiredDuringSchedulingIgnoredDuringExecution:nodeSelectorTerms:- matchExpressions:- key: nvidia-device-enableoperator: NotInvalues:- enablecontainers:- name: tensorflowimage: docker.io/kubeflowkatib/tf-mnist-with-summaries:latestcommand:- sh- -c- sleep 10Worker:replicas: 1template:spec:containers:- name: tensorflowimage: docker.io/kubeflowkatib/tf-mnist-with-summaries:latestcommand:- sh- -c- sleep 10resources:limits:nvidia/gpu: "1"requests:nvidia/gpu: "1"

问题表现

在为master增加nodeSelector/nodeAffinity之后,发现volcano一直无法选择出合适的node。具体的日志可以参考文章开头的issue。

分析volcano-scheduler的日志发现,master和worker会依次尝试分配node。顺序可能是master先分配,也可能worker先分配。分配过程中,可选的node集合会被不断缩小。

比如master先分配,那么worker可选的node就只能是满足master条件的那些node,而worker又有自己的一些条件,所以worker可选的node集合就变成master条件和worker条件的交集,在这里的场景下交集为空(master需要cpu机器,worker需要GPU机器),就无法调度起来。

源码阅读

volcano调度任务的时候,会参考worker和master对node节点的条件。这里的条件有几部分,分别对应predicates plugin的一些参数。

/********************************************************* pkg/scheduler/plugins/ 155行 *********************************************************/
predicate := predicateEnable{nodeAffinityEnable:      true,     -- nodeAffinity/nodeSelectornodePortEnable:          true,taintTolerationEnable:   true,podAffinityEnable:       true,nodeVolumeLimitsEnable:  true,volumeZoneEnable:        true,podTopologySpreadEnable: true,cacheEnable:             false,proportionalEnable:      false,}

predicate.NodeAffinityEnable

此选项默认打开,打开后会考虑PodSpec里定义的nodeSelector和affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution进行节点选择。

GetRequiredNodeAffinity函数定义了如何从PodSpec里取出对应的条件构造过滤器。Match过程也很简单,逐个条件判断即可。

/***************************************************************************************** vendor/k8s.io/component-helpers/scheduling/corev1/ 296行 *****************************************************************************************/
// GetRequiredNodeAffinity returns the parsing result of pod's nodeSelector and nodeAffinity.
func GetRequiredNodeAffinity(pod *v1.Pod) RequiredNodeAffinity {var selector labels.Selectorif len(pod.Spec.NodeSelector) > 0 {selector = labels.SelectorFromSet(pod.Spec.NodeSelector)}// Use LazyErrorNodeSelector for backwards compatibility of parsing errors.var affinity *LazyErrorNodeSelectorif pod.Spec.Affinity != nil &&pod.Spec.Affinity.NodeAffinity != nil &&pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution != nil {affinity = NewLazyErrorNodeSelector(pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution)}return RequiredNodeAffinity{labelSelector: selector, nodeSelector: affinity}
}// Match checks whether the pod is schedulable onto nodes according to
// the requirements in both nodeSelector and nodeAffinity.
func (s RequiredNodeAffinity) Match(node *v1.Node) (bool, error) {if s.labelSelector != nil {if !s.labelSelector.Matches(labels.Set(node.Labels)) {return false, nil}}deSelector != nil {deSelector.Match(node)}return true, nil
}

nodeSelector和nodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution都会转成labels.Requirement结构(这个结构和corev1.NodeSelectorRequirement差不多,只是operator更丰富些)。

/************************************************************ vendor/k8s.io/apimachinery/pkg/ 135行 ************************************************************/
// Requirement contains values, a key, and an operator that relates the key and values.
// The zero value of Requirement is invalid.
// Requirement implements both set based match and exact match
// Requirement should be initialized via NewRequirement constructor for creating a valid Requirement.
// +k8s:deepcopy-gen=true
type Requirement struct {key      stringoperator selection.Operator// In huge majority of cases we have at most one value here.// It is generally faster to operate on a single-element slice// than on a single-element map, so we have a slice here.strValues []string
}/********************************************************* vendor/k8s.io/apimachinery/pkg/ *********************************************************/
// Operator represents a key/field's relationship to value(s).
// See labels.Requirement and fields.Requirement for more details.
type Operator stringconst (DoesNotExist Operator = "!"Equals       Operator = "="DoubleEquals Operator = "=="In           Operator = "in"NotEquals    Operator = "!="NotIn        Operator = "notin"Exists       Operator = "exists"GreaterThan  Operator = "gt"LessThan     Operator = "lt"
)

研究完这里的逻辑,发现并没有什么疑点,还是得从master分配后,worker为何要从满足master条件的node中进一步筛选,而不是从全局筛选,这一角度来排查。

通过对比我们自己的operator和tf-operator,对开启Gang调度的任务,tf-operator里多设置了一个annotation:podTemplate.Annotations[volcanoTaskSpecKey] = rt

这里的volcanoTaskSpecKey=“volcano.sh/task-spec”,在volcano里搜索相关逻辑,可以找到如下函数:

/**************************************** pkg/scheduler/api/ 145行 ****************************************/
func getTaskID(pod *v1.Pod) TaskID {if ts, found := pod.Annotations[batch.TaskSpecKey]; found && len(ts) != 0 {return TaskID(ts)}return ""
}/**************************************** pkg/scheduler/api/ 262行 ****************************************/
func (ti *TaskInfo) GetTaskSpecKey() TaskID {if ti.Pod == nil {return ""}return getTaskID(ti.Pod)
}/************************************************* pkg/scheduler/util/ 103行 *************************************************/
func taskGroupID(task *api.TaskInfo) string {return fmt.Sprintf("%s/%s", task.Job, task.GetTaskSpecKey())
}

最后这个taskGroupID函数会被predicateHelper.PredicateNodes调用,其作用是对一个volcano job里的task进行分组,同一组task共享node校验失败的信息。

/************************************************ pkg/scheduler/util/ 23行 ************************************************/
// PredicateNodes returns the specified number of nodes that fit a task
func (ph *predicateHelper) PredicateNodes(task *api.TaskInfo, nodes []*api.NodeInfo, fn api.PredicateFn, enableErrorCache bool) ([]*api.NodeInfo, *api.FitErrors) {// ... 省略若干行代码 ...taskGroupid := taskGroupID(task)nodeErrorCache, taskFailedBefore := ph.taskPredicateErrorCache[taskGroupid]   // 同一个taskGroupid共享nodeErrorCacheif nodeErrorCache == nil {nodeErrorCache = map[string]error{}}//create a context with cancellationctx, cancel := context.WithCancel(context.Background())checkNode := func(index int) {// ... 省略若干行代码 ...// Check if the task had "predicate" failure before.// And then check if the task failed to predict on this node before.if enableErrorCache && taskFailedBefore {errorLock.RLock()errC, ok := nodeErrorCache[node.Name]errorLock.RUnlock()if ok {                               // node不满足前面task的条件,直接判定当前task不能分配到这个nodeerrorLock.Lock()fe.SetNodeError(node.Name, errC)errorLock.Unlock()return}}// TODO (k82cn): Enable eCache for performance improvement.if err := fn(task, node); err != nil {klog.V(3).Infof("Predicates failed for task <%s/%s> on node <%s>: %v",task.Namespace, task.Name, node.Name, err)errorLock.Lock()nodeErrorCache[node.Name] = errph.taskPredicateErrorCache[taskGroupid] = nodeErrorCache  // node不满足当前task的条件,记录到缓存里fe.SetNodeError(node.Name, err)errorLock.Unlock()return}// ... 省略若干行代码 ...}// ... 省略若干行代码 ...
}

到此就可以明白为什么tf-operator要设置volcano.sh/task-spec这个annotation了,通过对master和worker设置不同的annotation值,让其归属到不同的taskGroupid,从而避免共用nodeErrorCache,master和worker都可以各自从全局查找合适的node。

解决方案

在自己的operator里加上类似于tf-operator的处理逻辑即可,即增加annotation: podTemplate.Annotations[volcanoTaskSpecKey] = rt

如果依赖的是kubeflow/common里面的ReconcilePod逻辑,需要升级kubeflow/common版本(至少0.4.7),社区的这个pull requests顺带修复了这里的问题。

本文发布于:2024-02-02 04:14:32,感谢您对本站的认可!

本文链接:https://www.4u4v.net/it/170681847941286.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

标签:volcano   task
留言与评论(共有 0 条评论)
   
验证码:

Copyright ©2019-2022 Comsenz Inc.Powered by ©

网站地图1 网站地图2 网站地图3 网站地图4 网站地图5 网站地图6 网站地图7 网站地图8 网站地图9 网站地图10 网站地图11 网站地图12 网站地图13 网站地图14 网站地图15 网站地图16 网站地图17 网站地图18 网站地图19 网站地图20 网站地图21 网站地图22/a> 网站地图23