全栈工程师开发手册 (作者:栾鹏)
一站式云原生机器学习平台
Volcano是一个基于Kubernetes的云原生批量计算平台,也是CNCF的首个批量计算项目。
volcano是华为开源出的分布式训练架构,github官方网址:
有时候单台机器多进程也无法快速完成代码运行,这个时候就需要多机器实现:
1、单机器算力有限,核数不足
2、有些运行有机器白名单显示,需要多台机器ip增加并发处理
volcano主要为我们提供index job, 也就是启动多个pod,并为每个pod提供index,role,以及其他role的访问地址。这样我们就可以用这些信息来做事情。
为了方便的实现一个volcano多机分布式集群,这里直接使用
开源的云原生一站式机器学习平台。
使用volcano这个模板,填上自己的worker数量,每个worker的镜像和启动命令就可以了
部署分布式volcano集群 平台已经我们实现了,我们只需要编写分布式的代码。 要想针对实现并发操作
1、通过环境变量VC_WORKER_NUM 有多少个worker
2、通过环境变量VC_TASK_INDEX实现当前pod是第几个worker
3、每个worker里面都判别一遍总共需要处理的数据,和当前worker需要处理的数据。
4、代码根据当前是第几个worker处理自己该做的工作。
保留单机的代码,添加识别集群信息的代码(多少个worker,当前worker是第几个),添加分工(只处理归属于当前worker的任务),
import time, datetime, json, requests, io, os
from multiprocessing import Pool
from functools import partial
import os, random, sysWORLD_SIZE = v('VC_WORKER_NUM', '1')) # 总worker的数目
RANK = v("VC_TASK_INDEX", '0')) # 当前是第几个worker 从0开始print(WORLD_SIZE, RANK)# 子进程要执行的代码
def task(key):print(w(),'worker:', RANK, ', task:', key, flush=True)time.sleep(1)if __name__ == '__main__':# if ists('./success%s' % RANK):# os.remove('./success%s' % RANK)input = range(300) # 所有要处理的数据local_task = [] # 当前worker需要处理的任务for index in input:if index % WORLD_SIZE == RANK:local_task.append(index) # 要处理的数据均匀分配到每个worker# 每个worker内部还可以用多进程,线程池之类的并发操作。pool = Pool(10) # 开辟包含指定数目线程的线程池pool.map(partial(task), local_task) # 当前worker,只处理分配给当前worker的任务pool.close()pool.join()# 添加文件标识,当前worker结束# open('./success%s' % RANK, mode='w').close()# # rank0做聚合操作# while (RANK == 0):# success = [x for x in range(WORLD_SIZE) if ists('./success%s' % x)]# if len(success) != WORLD_SIZE:# time.sleep(5)# else:# # 所有worker全部结束,worker0开始聚合操作# print('begin reduce')# break
本文发布于:2024-02-02 04:13:30,感谢您对本站的认可!
本文链接:https://www.4u4v.net/it/170681841041279.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
留言与评论(共有 0 条评论) |