Supporting Volcano Scheduling for Locally Submitted Spark Jobs


1. Refactor the Volcano client vcctl

1.1 Add new subcommands

Register the new subcommands in vcctl:

rootCmd.AddCommand(buildSpark())
rootCmd.AddCommand(buildSparkOperator())
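For context, a minimal, self-contained sketch of where these registrations sit is shown below. The real vcctl entry point is organized differently, but the shape is the same: a cobra root command to which the two new builders are attached.

// Sketch only: a cobra root command that the two new subcommands hang off.
// buildSpark and buildSparkOperator are the builders referenced above.
package main

import (
	"os"

	"github.com/spf13/cobra"
)

func main() {
	rootCmd := &cobra.Command{
		Use:   "vcctl",
		Short: "Volcano command line client",
	}

	// The two registrations added by this change.
	rootCmd.AddCommand(buildSpark())
	rootCmd.AddCommand(buildSparkOperator())

	if err := rootCmd.Execute(); err != nil {
		os.Exit(1)
	}
}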

1.2 Create the command root and subcommands

func buildSparkOperator() *cobra.Command {
	// Create the root command
	sparkOperatorCmd := &cobra.Command{
		Use:   "spark-operator",
		Short: "spark-operator cmd",
	}
	sparkSubmitCmd := &cobra.Command{
		Use:   "spark-operator",
		Short: "spark operator",
		Run: func(cmd *cobra.Command, args []string) {
			checkError(cmd, spark_operator.RunSparkOperatorSubmit())
		},
	}
	// Initialize the flags
	spark_operator.InitSparkOperatorFlags(sparkSubmitCmd)
	sparkOperatorCmd.AddCommand(sparkSubmitCmd)
	return sparkOperatorCmd
}
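InitSparkOperatorFlags is not shown in this post; it binds the command-line flags onto the package-level cf struct that RunSparkOperatorSubmit reads below. A hypothetical sketch follows; the flag names and default values are illustrative, not the real ones.

// Hypothetical sketch of InitSparkOperatorFlags: bind flags to the shared cf struct.
func InitSparkOperatorFlags(cmd *cobra.Command) {
	flags := cmd.Flags()
	flags.StringVar(&cf.FilePath, "file", "", "local path of the jar to upload, e.g. /opt/spark-examples_2.11-2.4.4.jar")
	flags.StringVar(&cf.VolumeName, "volume-name", "spark-volume", "name of the hostPath volume")
	flags.StringVar(&cf.HostPath, "host-path", "/tmp", "hostPath directory mounted into driver and executor pods")
	flags.StringVar(&cf.HostPathType, "host-path-type", "Directory", "hostPath type")
	flags.StringVar(&cf.DriverVolumeMountName, "driver-volume-mount-name", "spark-volume", "volumeMount name for the driver")
	flags.StringVar(&cf.DriverVolumeMountPath, "driver-volume-mount-path", "/tmp", "mount path inside the driver container")
	// ...executor volumeMount flags and the driver/executor label key/value flags
	// are bound the same way.
}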

1.3 Build the YAML file and upload the local jar to the file server

func RunSparkOperatorSubmit() error {
	// Get the file name from the local path,
	// e.g. /opt/spark-examples_2.11-2.4.4.jar
	filePathSplit := strings.Split(cf.FilePath, "/")
	cf.FileName = filePathSplit[len(filePathSplit)-1]
	// Rewrite the application path to its location inside the image
	sf.Spec.MainApplicationFile = "local:///opt/spark/examples/target/scala-2.11/jars/" + cf.FileName
	sf.Spec.Volumes.Volume = []Volume{{Name: cf.VolumeName, HostPath: HostPath{cf.HostPath, cf.HostPathType}}}
	// Build the labels
	sf.Spec.Driver.Labels = map[string]string{
		cf.DriverLabel:           cf.DriverLabelValue,
		"odin.k8s.io/spark":      "true",
		"odin.io/filename":       cf.FileName,
		"odin.io/registry/addr":  "10.180.210.196",
	}
	sf.Spec.Driver.VolumeMounts.VolumeMount = []VolumeMount{{Name: cf.DriverVolumeMountName, MountPath: cf.DriverVolumeMountPath}}
	sf.Spec.Executor.Labels = map[string]string{
		cf.ExecutorLabel:         cf.ExecutorLabelValue,
		"odin.k8s.io/spark":      "true",
		"odin.io/filename":       cf.FileName,
		"odin.io/registry/addr":  "10.180.210.196",
	}
	sf.Spec.Executor.VolumeMounts.VolumeMount = []VolumeMount{{Name: cf.ExecutorVolumeMountName, MountPath: cf.ExecutorVolumeMountPath}}
	// Marshal the spec into a YAML byte stream
	fs, err := yaml.Marshal(&sf)
	if err != nil {
		println(err.Error())
	}
	// Create the YAML file
	f, err := os.Create(sf.Metadata.Name + ".yaml")
	if err != nil {
		fmt.Println(err)
	}
	// Strip the extra wrapper lines so the output matches the API
	rmVolume := regexp.MustCompile("volume:\n    ")
	rmVolumeMount := regexp.MustCompile("volumeMount:\n      ")
	yamlString := string(fs)
	yamlString = rmVolume.ReplaceAllString(yamlString, "")
	yamlString = rmVolumeMount.ReplaceAllString(yamlString, "")
	// Write to the file
	_, err = f.WriteString(yamlString)
	if err != nil {
		fmt.Println(err)
		f.Close()
	}
	// Upload the jar
	uploadFile(cf.FilePath, "10.180.210.37:33332/upload")
	// Apply the manifest
	cmd := exec.Command("/bin/bash", "-c", "kubectl apply -f "+f.Name())
	output, err := cmd.Output()
	if err != nil {
		return err
	}
	fmt.Printf("Execute Shell:%s finished with output:\n%s", cmd, string(output))
	return err
}
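The uploadFile helper called above is not shown in this post. A minimal sketch of what it might look like follows, assuming the file server accepts a multipart/form-data POST with a "file" form field; the field name, the http:// scheme prefix, and the error handling are all assumptions.

// Sketch of uploadFile: POST the local jar to the file server as multipart form data.
// Needed imports: bytes, fmt, io, mime/multipart, net/http, os, path/filepath.
func uploadFile(filePath, uploadURL string) error {
	file, err := os.Open(filePath)
	if err != nil {
		return err
	}
	defer file.Close()

	// Build the multipart body in memory.
	var body bytes.Buffer
	writer := multipart.NewWriter(&body)
	part, err := writer.CreateFormFile("file", filepath.Base(filePath))
	if err != nil {
		return err
	}
	if _, err := io.Copy(part, file); err != nil {
		return err
	}
	writer.Close()

	// The caller passes host:port/path only, so prepend the scheme here (assumed).
	resp, err := http.Post("http://"+uploadURL, writer.FormDataContentType(), &body)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	fmt.Println("upload status:", resp.Status)
	return nil
}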

The struct definitions backing the YAML file are as follows:

type sparkOperatorFlags struct {
	ApiVersion string `yaml:"apiVersion"`
	Kind       string `yaml:"kind"`
	Metadata   struct {
		Name      string `yaml:"name"`
		Namespace string `yaml:"namespace"`
	}
	Spec struct {
		Types               string `yaml:"type"`
		Mode                string `yaml:"mode"`
		Image               string `yaml:"image"`
		ImagePullPolicy     string `yaml:"imagePullPolicy"`
		MainClass           string `yaml:"mainClass"`
		MainApplicationFile string `yaml:"mainApplicationFile"`
		SparkVersion        string `yaml:"sparkVersion"`
		BatchScheduler      string `yaml:"batchScheduler"`
		RestartPolicy       struct {
			Types string `yaml:"type"`
		}
		Volumes struct {
			Volume []Volume `yaml:"volume"`
		}
		Driver struct {
			Cores          int               `yaml:"cores"`
			CoreLimit      string            `yaml:"coreLimit"`
			Memory         string            `yaml:"memory"`
			Labels         map[string]string `yaml:"labels"`
			ServiceAccount string            `yaml:"serviceAccount"`
			VolumeMounts   struct {
				VolumeMount []VolumeMount `yaml:"volumeMount"`
			}
		}
		Executor struct {
			Cores        int               `yaml:"cores"`
			Instances    int               `yaml:"instances"`
			Memory       string            `yaml:"memory"`
			Labels       map[string]string `yaml:"labels"`
			VolumeMounts struct {
				VolumeMount []VolumeMount `yaml:"volumeMount"`
			}
		}
	}
}

type VolumeMount struct {
	Name      string `yaml:"name"`
	MountPath string `yaml:"mountPath"`
}

type Volume struct {
	Name     string   `yaml:"name"`
	HostPath HostPath `yaml:"hostPath"`
}

type HostPath struct {
	Path  string `yaml:"path"`
	Types string `yaml:"type"`
}
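After yaml.Marshal and the two regexp replacements (which strip the volume:/volumeMount: wrapper keys introduced by the nested structs), the file handed to kubectl apply is intended to be a SparkApplication manifest shaped roughly like the one below. All concrete values are illustrative; the apiVersion/kind shown are the upstream spark-operator CRD values and are not spelled out in the code above.

# Illustrative only: values are examples, not output captured from the tool.
apiVersion: sparkoperator.k8s.io/v1beta2   # upstream spark-operator CRD (assumed)
kind: SparkApplication
metadata:
  name: spark-pi
  namespace: default
spec:
  type: Scala
  mode: cluster
  image: library/spark:2.4.4
  imagePullPolicy: IfNotPresent
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: local:///opt/spark/examples/target/scala-2.11/jars/spark-examples_2.11-2.4.4.jar
  sparkVersion: "2.4.4"
  batchScheduler: volcano            # hands pod scheduling to Volcano
  restartPolicy:
    type: Never
  volumes:                           # plain list: this is why the volume: wrapper line is stripped
    - name: spark-volume
      hostPath:
        path: /tmp
        type: Directory
  driver:
    cores: 1
    coreLimit: "1200m"
    memory: "512m"
    labels:
      odin.k8s.io/spark: "true"      # picked up by the webhook in 1.4
      odin.io/filename: spark-examples_2.11-2.4.4.jar
    serviceAccount: spark
    volumeMounts:                    # plain list: this is why the volumeMount: wrapper line is stripped
      - name: spark-volume
        mountPath: /tmp
  executor:
    cores: 1
    instances: 2
    memory: "512m"
    labels:
      odin.k8s.io/spark: "true"
      odin.io/filename: spark-examples_2.11-2.4.4.jar
    volumeMounts:
      - name: spark-volume
        mountPath: /tmp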

1.4 Modify the webhook so Volcano can intercept requests carrying the labels

const (
	// DefaultQueue constant stores the name of the queue as "default"
	DefaultQueue                 = "default"
	defaultSchedulerName         = "volcano"
	INIT_CONTAINER_NAME          = "spark-init"
	ODIN_FILE_SERVER_ADDR        = "10.180.210.37" // "odin-file-server"
	ODIN_FILE_SERVER_PORT        = 80
	ODIN_FILE_DOWNLOAD_KEY       = "odin.io/filename"
	ODIN_IMAGE_REGISTRY_ADDR_KEY = "odin.io/registry/addr"
	ODIN_CONFIRM_SPARK_APP_KEY   = "odin.k8s.io/spark"
	ODIN_APP_EXEC_PATH           = "/opt/spark/examples/target/scala-2.11/jars/"
	ODIN_BASE_IMAGE              = "library/centos-ssh:latest"
)

func init() {
	router.RegisterAdmission(service)
}

// Create the MutatingWebhookConfiguration object
var service = &router.AdmissionService{
	Path: "/pods/mutate",
	// Callback bound to the route
	Func: MutatePods,
	// Match rules for interception
	MutatingConfig: &whv1beta1.MutatingWebhookConfiguration{
		Webhooks: []whv1beta1.MutatingWebhook{{
			Name: "mutatepod.volcano.sh",
			Rules: []whv1beta1.RuleWithOperations{
				{
					Operations: []whv1beta1.OperationType{whv1beta1.Create},
					Rule: whv1beta1.Rule{
						APIGroups:   []string{""},
						APIVersions: []string{"v1"},
						Resources:   []string{"pods"},
					},
				},
			},
		}},
	},
}
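The MutatePods callback itself is not shown in this post. Inferring from the constants above, its job is to recognize pods carrying the vcctl labels, force them onto the Volcano scheduler, and prepend an init container that pulls the uploaded jar from the file server into the path that mainApplicationFile points at. The sketch below captures that patch logic as a plain helper on a Pod object, leaving out the wiring into Volcano's admission framework; the curl-based download command and the volume-mount reuse are assumptions.

// Sketch of the mutation MutatePods is meant to apply (assumed behaviour).
// Needed import: corev1 "k8s.io/api/core/v1".
func mutateSparkPod(pod *corev1.Pod) {
	// Only touch pods that carry the marker labels set by vcctl.
	if pod.Labels[ODIN_CONFIRM_SPARK_APP_KEY] != "true" {
		return
	}
	fileName, ok := pod.Labels[ODIN_FILE_DOWNLOAD_KEY]
	if !ok {
		return
	}

	// Hand the pod to the Volcano scheduler.
	pod.Spec.SchedulerName = defaultSchedulerName

	// Init container that downloads the jar from the file server into the
	// directory referenced by mainApplicationFile.
	downloadURL := fmt.Sprintf("http://%s:%d/%s", ODIN_FILE_SERVER_ADDR, ODIN_FILE_SERVER_PORT, fileName)
	initContainer := corev1.Container{
		Name:    INIT_CONTAINER_NAME,
		Image:   ODIN_BASE_IMAGE,
		Command: []string{"sh", "-c", "curl -o " + ODIN_APP_EXEC_PATH + fileName + " " + downloadURL},
	}
	if len(pod.Spec.Containers) > 0 {
		// Assumes the driver/executor container mounts a writable volume covering
		// ODIN_APP_EXEC_PATH, shared with the init container.
		initContainer.VolumeMounts = pod.Spec.Containers[0].VolumeMounts
	}
	pod.Spec.InitContainers = append([]corev1.Container{initContainer}, pod.Spec.InitContainers...)
}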
