# -*- coding: UTF-8 -*- #单机单卡 #对于单机单卡,可以吧参数和计算都定义再GPU上,不过如果参数模型比较大,显存不足等,就得放在CPU上 import tensorflow as tf with tf.device("/cpu:0"): #也可以放在GPU上w = tf.get_variable('w', (2,2), tf.float32, initializerstant_initializer(2))b = tf.get_variable('b', (2,2), tf.float32, initializerstant_initializer(5))with tf.device("/gpu:0"):addwb = w+bmutwb = w*bini = tf.initialize_all_variables() with tf.Session() as sess:sess.run(ini)np1, np2 = sess.run([addwb, mutwb])print(np1)print(np2)
"""#数据并行 分布式训练中的数据并行方法在每一个worker machine 上都有一套完整的模型,单分别对训练数据集的不通子集进行处理 数据并行训练方法均需要一些整合结果和在各工作器(worker)间同步模型参数的方法1参数平均vs.基于更新(梯度)的方法:所有的参数加起来除以参数个数 1.1参数平均 (1)根据模型配置随机初始化网络参数 (2)将现有的参数的一个副本分配给每一个worker machine (3)在该数据的一个子集上对每一个worker进行训练 (4)从每一个worker的平均参数上设置一个全局参数 (5)当还需要处理更多数据时,回到第2步2同步VS异步的方法 同步:每次计算都需要等到所有GPU运行完成之后,问题:等待时间长影响GPU利用率,取决于运算最慢的机器运行时间,机器之间通信太浪费如果一台机器卡住/计算结果无穷大不会收敛,会导致所有参数没有上报Parameter Server服务器 异步:只要有一台计算完成,则会这台传输Parameter Server,会出现梯度失效问题。(开始参数都相同, 但是可能一个设备完成迭代后,发现模型参数被其它更新)只能实现梯度次优解,不能实现最优解 不需要所有运算完成再计算。 3集中式VS.分布式的同步模型并行(一般情况下不采用模型并行,) 把模型部署到很多机器上去运行,当神经网络模型很大的时候GPU显存会有限制,很难跑在单个GPU上,所以需要GPU并行 但实际上,层与层只有有约束,后面的层需要后面的层作为输入。 如果模型本身含有并行模块,则可以进行模型并行的训练需要有参数服务器:K/V格式 ,再加上参数更新(并行文件系统)。计算节点为worker节点 ps服务器一般node0, node1, worker节点node2,node3,分布式包含两种格式 in-graph模式:把计算节点从单机多GPU扩展到了多机多GPU, 不过数据分发还是在一个节点。这样的好处是配置简单,但是这样的坏处是训练数据的分发依然在一个节点上, 要把训练数据分发到不通的机器上,严重影响并发训练速度。在大数据训练的情况下,不推荐使用这种模式 between-graph模式下,训练的参数保存在参数服务器,数据不用分发,数据分片的保存在各个计算节点,各个计算节点自己算自己的,算完之后,把要更新的参数告诉参数服务器,参数服务器更新参数。这种模式的优点是不用训练数据的分发,尤其是在数据量在TB级的时候,所以大数据深度学习推荐使用between-graph模式。 """ # -*- coding: UTF-8 -*- #单机多卡 #各个GPU通过各自计算各自batch数据的梯度值,然后统一传到CPU上, 由CPU计算求取平均值,CPU更新参数 import tensorflow as tf with tf.device("/cpu:0"):w = tf.get_variable('w', (2,2), tf.float32, initializerstant_initializer(2))b = tf.get_variable('b', (2, 2), tf.float32, initializerstant_initializer(5)) with tf.device("/gpu:0"):addwb = w + b with tf.device("/gpu:1"):mutwb = w*b ini = tf.initialize_all_variables() with tf.Session() as sess:sess.run(ini)while 1:print(sess.run([addwb, mutwb]))
#多机单卡的关键点 "机"可以指什么? 物理机/docker容器 "多机"之间如何联通,如何交换信息? "多机单卡"相对于"单机多卡"性能比较 基本概念。Cluster。Job。task # -*- coding: UTF-8 -*- import tensorflow as tf #现在假设我们有A,B,C,D四台机器,每台机器上由一个GPU,首先需要在各台机器上写一份代码,跑起来, 各机器上的代码内容大部分相同 #除了开始定义的时候,需要各自指定该台机器的task之外,以机器A为离子, A机器上的代码如下 cluster = tf.train.ClusterSpec({"worker":["A_IP:2222", #IP地址:端口号,第一台机器A的IP地址,在代码中需要用这台机器计算的时候,就要定义:/job:worker/task:0for"B_IP:1234", #第二台机器的IP地址/job:worker/task:1"C_IP:2222"#第三台机器的IP地址/job:worker/task:2],"ps":["D_IP:2222",#第四台机器的IP地址对应到代码块:/job:ps/task:0]})# -*- coding: UTF-8 -*- #上面是因为worker计算内容各不相同,不过再深度学习中,一般每个worker的计算内容是一样的 #以为都是计算神经网络的每个batch前向传导,所以一般代码是重用的 import tensorflow as tf #现在假设我们有A,B台机器,首先需要在各台机器上写一份代码,并跑起来,各机器上的内容大部分相同 cluster = tf.train.ClusterSpec({"worker":["192.168.11.105:1234"], #省略所有IP都需要写入"ps":["192.168.11.130:2223"]})
#多机多卡 import tensorflow as tf var = tf.Variable(initial_value=0.0)#第一步,我们需要为每个进程创建自己的会话 sess1 = tf.Session() sess2 = tf.Session()sess1.run(tf.global_variables_initializer()) sess2.run(tf.global_variables_initializer())print("Initial value of var in session 1:", sess1.run(var)) print("Initial value of var in session 2:", sess2.run(var))sess1.run(var.assign_add(1.0)) sess2.run(var.assign_add(2.0)) print("Incremented var in session 1") print("Value of var in session 1: ", sess1.run(var)) print("Value of var in session 2: ", sess2.run(var))#Distributed TensorFlow import tensorflow as tf c = tf.constant("Hello, Distributed TensorFlow!") #创建一个本地TensorFlow集群 server = tf.ate_local_server() #在集群上创建一个会话 sess = tf.Session(server.target) print(sess.run(c))#第一步是定义集群的规模。我们从最简单的集群开始:即两台服务器(两个任务),它们都在同一台机器上,一个在 2222 端口,一个在 2223 端口。 tasks = ["localhost:2222", "localhost:2223"] #每个任务都与「工作」(job)相关联,该工作是相关任务的集合。我们将这两个任务与一个称为「local」的工作相关联。 jobs = {"local": tasks} #所有这些即定义为一个集群。 cluster = tf.train.ClusterSpec(jobs) server1 = tf.train.Server(cluster, job_name="local", task_index=0)server2 = tf.train.Server(cluster, job_name="local", task_index=1) #特性:任何具有相同名称的变量都将在所有服务器之间共享。 tf.reset_default_graph() var = tf.Variable(initial_value=0.0, name='var') sess1 = tf.Session(server1.target) sess2 = tf.Session(server2.target) sess1.run(tf.global_variables_initializer()) sess2.run(tf.global_variables_initializer())print("Initial value of var in session 1:", sess1.run(var)) print("Initial value of var in session 2:", sess2.run(var))sess1.run(var.assign_add(1.0)) print("Incremented var in session 1")print("Value of var in session 1:", sess1.run(var)) print("Value of var in session 2:", sess2.run(var)) print("OK------------------------------------") #存放 def run_with_location_trace(sess, op):run_options = tf.RunOptions(trace_level=tf.RunOptions.FULL_TRACE)run_metadata = tf.RunMetadata()sess.run(op, options=run_options, run_metadata=run_metadata)for device in run_metadata.step_stats.dev_stats:print(device.device)for node de_stats:print(" ", de_name) run_with_location_trace(sess1, var) run_with_location_trace(sess1, var.assign_add(1.0)) run_with_location_trace(sess2, var)with tf.device("/job:local/task:0"):var1 = tf.Variable(0.0, name='var1') with tf.device("/job:local/task:1"):var2 = tf.Variable(0.0, name='var2')# (This will initialize both variables) sess1.run(tf.global_variables_initializer()) run_with_location_trace(sess1, var1) run_with_location_trace(sess1, var2) run_with_location_trace(sess2, var2) run_with_location_trace(sess2, var1)#计算图 cluster = tf.train.ClusterSpec({"local": ["localhost:2224", "localhost:2225"]}) server1 = tf.train.Server(cluster, job_name="local", task_index=0) server2 = tf.train.Server(cluster, job_name="local", task_index=1) graph1 = tf.Graph() with graph1.as_default():var1 = tf.Variable(0.0, name='var') sess1 = tf.Session(target=server1.target, graph=graph1) _operations()) graph2 = tf.Graph() sess2 = tf.Session(target=server2.target, graph=graph2) _operations()) with graph2.as_default():var2 = tf.Variable(0.0, name='var') sess1.run(var1.assign(1.0)) sess2.run(var2)#实现细节 import tensorflow as tf #谁负责初始化共享变量 def s1():server1 = tf.train.Server(cluster,job_name="local",task_index=0)var = tf.Variable(0.0, name='var')sess1 = tf.Session(server1.target)print("Server 1: waiting ")sess1.port_uninitialized_variables())while len(sess1.port_uninitialized_variables())) > 0:print("Server 1: waiting ")sleep(1.0)print("Server 1: variables initialized!")def s2():server2 = tf.train.Server(cluster,job_name="local",task_index=1)var = tf.Variable(0.0, name='var')sess2 = tf.Session(server2.target)for i in range(3):print("Server 2: waiting %d seconds "% (3 - i))sleep(1.0)sess2.run(tf.global_variables_initializer())p1 = Process(target=s1, daemon=True) p2 = Process(target=s2, daemon=True) p1.start() p2.start()p1.terminate() p2.terminate()#示例 #我们会创建:#一个存储单个变量 var 的参数服务器。 #两个工作站任务(worker task),每个工作站将多次增加变量 var 的值。 我们将让参数服务器多输出几次 var 的值,以便查看其变化。 import tensorflow as tf from multiprocessing import Process from time import sleepcluster = tf.train.ClusterSpec({"worker": ["localhost:3333","localhost:3334",],"ps": ["localhost:3335"] })def parameter_server():with tf.device("/job:ps/task:0"):var = tf.Variable(0.0, name='var')server = tf.train.Server(cluster,job_name="ps",task_index=0)sess = tf.Session(target=server.target)print("Parameter server: waiting for ")sess.port_uninitialized_variables())print("Parameter server: cluster ready!")print("Parameter server: ")sess.run(tf.global_variables_initializer())print("Parameter server: variables initialized")for i in range(5):val = sess.run(var)print("Parameter server: var has value %.1f" % val)sleep(1.0)print("Parameter server: ")server.join()def worker(worker_n):with tf.device("/job:ps/task:0"):var = tf.Variable(0.0, name='var')server = tf.train.Server(cluster,job_name="worker",task_index=worker_n)sess = tf.Session(target=server.target)print("Worker %d: waiting for " % worker_n)sess.port_uninitialized_variables())print("Worker %d: cluster ready!" % worker_n)while sess.port_uninitialized_variables()):print("Worker %d: waiting for " % worker_n)sleep(1.0)print("Worker %d: variables initialized" % worker_n)for i in range(5):print("Worker %d: incrementing var" % worker_n)sess.run(var.assign_add(1.0))sleep(1.0)print("Worker %d: " % worker_n)server.join()ps_proc = Process(target=parameter_server, daemon=True) w1_proc = Process(target=worker, args=(0,), daemon=True) w2_proc = Process(target=worker, args=(1,), daemon=True) ps_proc.start() w1_proc.start() w2_proc.start() for proc in [w1_proc, w2_proc, ps_proc]:inate() #如何将多个 TensorFlow 执行引擎(运行在不同进程或不同机器上)集成为一个集群,以便共享变量。 #如何为变量或操作指定服务器。 #图内复制与图间复制。 #如何等待变量被集群中的另一个任务初始化。
本文发布于:2024-02-01 16:22:41,感谢您对本站的认可!
本文链接:https://www.4u4v.net/it/170677576337912.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
留言与评论(共有 0 条评论) |