最近在支持CPU/GPU混布资源任务的时候,在我们自己开发的训练框架上,遇到volcano无法成功调度的情况,为此给volcano提了个issue。
最终发现是因为我们的kubeflow/common版本太低,导致没有给PodTemplate添加annotation volcano.sh/task-spec=replecaType
。
我们的crd类似于pytorch/tensorflow,需要创建一个master和多个worker,其中master只用到CPU资源,worker需要GPU资源。
为了保证集群中的GPU资源能够最大化使用,我们往集群里添加了CPU机器。将master节点调度到CPU机器上,worker调度到GPU机器上。
避免因master调度到GPU机器,导致GPU机器的卡分不完(极端情况下,如果有多个任务,master可能占用某个GPU机器的大量CPU资源,导致GPU卡分不完)。
这里有两种方式实现: nodeSelector或者nodeAffinity。
nodeSelector可以定义要匹配拥有哪些标签的node。
这里可以为CPU的node增加对应的标签,比如resType=CPU,并在master的PodSpec增加nodeSelector。
worker可以不需要nodeSelector,因为CPU机器没有GPU卡,肯定不满足资源需要。
下面借用tensorflow的crd作为例子:
apiVersion: kubeflow/v1
kind: TFJob
metadata:name: tf-test
spec:tfReplicaSpecs:Master:replicas: 1template:spec:containers:- name: tensorflowimage: docker.io/kubeflowkatib/tf-mnist-with-summaries:latestcommand:- sh- -c- sleep 10nodeSelector:resType: CPU -- 此处增加nodeSelectorWorker:replicas: 1template:spec:containers:- name: tensorflowimage: docker.io/kubeflowkatib/tf-mnist-with-summaries:latestcommand:- sh- -c- sleep 10resources:limits:nvidia/gpu: "1"requests:nvidia/gpu: "1"
nodeAffinity的表述能力更强,可以对label做In/NotIn等操作。
对于GPU机器,因为用的是nvidia的卡,为保证nvidia-device-plugin/dcgm-exporter这些组件正常工作,会给GPU机器打上label: nvidia-device-enable=enable
。
所以可以通过nodeAffinity的NotIn来实现,也无需单独为CPU机器增加label。
apiVersion: kubeflow/v1kind: TFJobmetadata:name: tf-testspec:tfReplicaSpecs:Master:replicas: 1template:spec:affinity:nodeAffinity: -- 此处增加nodeAffnitiy排除某些label的机器requiredDuringSchedulingIgnoredDuringExecution:nodeSelectorTerms:- matchExpressions:- key: nvidia-device-enableoperator: NotInvalues:- enablecontainers:- name: tensorflowimage: docker.io/kubeflowkatib/tf-mnist-with-summaries:latestcommand:- sh- -c- sleep 10Worker:replicas: 1template:spec:containers:- name: tensorflowimage: docker.io/kubeflowkatib/tf-mnist-with-summaries:latestcommand:- sh- -c- sleep 10resources:limits:nvidia/gpu: "1"requests:nvidia/gpu: "1"
在为master增加nodeSelector/nodeAffinity之后,发现volcano一直无法选择出合适的node。具体的日志可以参考文章开头的issue。
分析volcano-scheduler的日志发现,master和worker会依次尝试分配node。顺序可能是master先分配,也可能worker先分配。分配过程中,可选的node集合会被不断缩小。
比如master先分配,那么worker可选的node就只能是满足master条件的那些node,而worker又有自己的一些条件,所以worker可选的node集合就变成master条件和worker条件的交集,在这里的场景下交集为空(master需要cpu机器,worker需要GPU机器),就无法调度起来。
volcano调度任务的时候,会参考worker和master对node节点的条件。这里的条件有几部分,分别对应predicates plugin的一些参数。
/********************************************************* pkg/scheduler/plugins/ 155行 *********************************************************/
predicate := predicateEnable{nodeAffinityEnable: true, -- nodeAffinity/nodeSelectornodePortEnable: true,taintTolerationEnable: true,podAffinityEnable: true,nodeVolumeLimitsEnable: true,volumeZoneEnable: true,podTopologySpreadEnable: true,cacheEnable: false,proportionalEnable: false,}
此选项默认打开,打开后会考虑PodSpec里定义的nodeSelector和affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution进行节点选择。
GetRequiredNodeAffinity
函数定义了如何从PodSpec里取出对应的条件构造过滤器。Match过程也很简单,逐个条件判断即可。
/***************************************************************************************** vendor/k8s.io/component-helpers/scheduling/corev1/ 296行 *****************************************************************************************/
// GetRequiredNodeAffinity returns the parsing result of pod's nodeSelector and nodeAffinity.
func GetRequiredNodeAffinity(pod *v1.Pod) RequiredNodeAffinity {var selector labels.Selectorif len(pod.Spec.NodeSelector) > 0 {selector = labels.SelectorFromSet(pod.Spec.NodeSelector)}// Use LazyErrorNodeSelector for backwards compatibility of parsing errors.var affinity *LazyErrorNodeSelectorif pod.Spec.Affinity != nil &&pod.Spec.Affinity.NodeAffinity != nil &&pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution != nil {affinity = NewLazyErrorNodeSelector(pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution)}return RequiredNodeAffinity{labelSelector: selector, nodeSelector: affinity}
}// Match checks whether the pod is schedulable onto nodes according to
// the requirements in both nodeSelector and nodeAffinity.
func (s RequiredNodeAffinity) Match(node *v1.Node) (bool, error) {if s.labelSelector != nil {if !s.labelSelector.Matches(labels.Set(node.Labels)) {return false, nil}}deSelector != nil {deSelector.Match(node)}return true, nil
}
nodeSelector和nodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution都会转成labels.Requirement
结构(这个结构和corev1.NodeSelectorRequirement
差不多,只是operator更丰富些)。
/************************************************************ vendor/k8s.io/apimachinery/pkg/ 135行 ************************************************************/
// Requirement contains values, a key, and an operator that relates the key and values.
// The zero value of Requirement is invalid.
// Requirement implements both set based match and exact match
// Requirement should be initialized via NewRequirement constructor for creating a valid Requirement.
// +k8s:deepcopy-gen=true
type Requirement struct {key stringoperator selection.Operator// In huge majority of cases we have at most one value here.// It is generally faster to operate on a single-element slice// than on a single-element map, so we have a slice here.strValues []string
}/********************************************************* vendor/k8s.io/apimachinery/pkg/ *********************************************************/
// Operator represents a key/field's relationship to value(s).
// See labels.Requirement and fields.Requirement for more details.
type Operator stringconst (DoesNotExist Operator = "!"Equals Operator = "="DoubleEquals Operator = "=="In Operator = "in"NotEquals Operator = "!="NotIn Operator = "notin"Exists Operator = "exists"GreaterThan Operator = "gt"LessThan Operator = "lt"
)
研究完这里的逻辑,发现并没有什么疑点,还是得从master分配后,worker为何要从满足master条件的node中进一步筛选,而不是从全局筛选,这一角度来排查。
通过对比我们自己的operator和tf-operator,对开启Gang调度的任务,tf-operator里多设置了一个annotation:podTemplate.Annotations[
volcanoTaskSpecKey
] = rt
。
这里的volcanoTaskSpecKey=“volcano.sh/task-spec”,在volcano里搜索相关逻辑,可以找到如下函数:
/**************************************** pkg/scheduler/api/ 145行 ****************************************/
func getTaskID(pod *v1.Pod) TaskID {if ts, found := pod.Annotations[batch.TaskSpecKey]; found && len(ts) != 0 {return TaskID(ts)}return ""
}/**************************************** pkg/scheduler/api/ 262行 ****************************************/
func (ti *TaskInfo) GetTaskSpecKey() TaskID {if ti.Pod == nil {return ""}return getTaskID(ti.Pod)
}/************************************************* pkg/scheduler/util/ 103行 *************************************************/
func taskGroupID(task *api.TaskInfo) string {return fmt.Sprintf("%s/%s", task.Job, task.GetTaskSpecKey())
}
最后这个taskGroupID
函数会被predicateHelper.PredicateNodes
调用,其作用是对一个volcano job里的task进行分组,同一组task共享node校验失败的信息。
/************************************************ pkg/scheduler/util/ 23行 ************************************************/
// PredicateNodes returns the specified number of nodes that fit a task
func (ph *predicateHelper) PredicateNodes(task *api.TaskInfo, nodes []*api.NodeInfo, fn api.PredicateFn, enableErrorCache bool) ([]*api.NodeInfo, *api.FitErrors) {// ... 省略若干行代码 ...taskGroupid := taskGroupID(task)nodeErrorCache, taskFailedBefore := ph.taskPredicateErrorCache[taskGroupid] // 同一个taskGroupid共享nodeErrorCacheif nodeErrorCache == nil {nodeErrorCache = map[string]error{}}//create a context with cancellationctx, cancel := context.WithCancel(context.Background())checkNode := func(index int) {// ... 省略若干行代码 ...// Check if the task had "predicate" failure before.// And then check if the task failed to predict on this node before.if enableErrorCache && taskFailedBefore {errorLock.RLock()errC, ok := nodeErrorCache[node.Name]errorLock.RUnlock()if ok { // node不满足前面task的条件,直接判定当前task不能分配到这个nodeerrorLock.Lock()fe.SetNodeError(node.Name, errC)errorLock.Unlock()return}}// TODO (k82cn): Enable eCache for performance improvement.if err := fn(task, node); err != nil {klog.V(3).Infof("Predicates failed for task <%s/%s> on node <%s>: %v",task.Namespace, task.Name, node.Name, err)errorLock.Lock()nodeErrorCache[node.Name] = errph.taskPredicateErrorCache[taskGroupid] = nodeErrorCache // node不满足当前task的条件,记录到缓存里fe.SetNodeError(node.Name, err)errorLock.Unlock()return}// ... 省略若干行代码 ...}// ... 省略若干行代码 ...
}
到此就可以明白为什么tf-operator要设置volcano.sh/task-spec
这个annotation了,通过对master和worker设置不同的annotation值,让其归属到不同的taskGroupid,从而避免共用nodeErrorCache,master和worker都可以各自从全局查找合适的node。
在自己的operator里加上类似于tf-operator的处理逻辑即可,即增加annotation: podTemplate.Annotations[
volcanoTaskSpecKey
] = rt
。
如果依赖的是kubeflow/common里面的ReconcilePod逻辑,需要升级kubeflow/common版本(至少0.4.7),社区的这个pull requests顺带修复了这里的问题。
本文发布于:2024-02-02 04:14:32,感谢您对本站的认可!
本文链接:https://www.4u4v.net/it/170681847941286.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
留言与评论(共有 0 条评论) |