The job sharding process in VipShop Saturn

In Saturn, sharding is handled by the Console, and it is controlled separately for each namespace.

The entry point of sharding is the creation of a NamespaceShardingManager. This happens in the following snippet of the createNamespaceShardingManager method of public class RegistryCenterServiceImpl:

```java
namespaceShardingManager = new NamespaceShardingManager(client, namespace,
        generateShardingLeadershipHostValue(), reportAlarmService, updateJobConfigService);
namespaceShardingManager.start();
if (namespaceShardingListenerManagerMap.putIfAbsent(nns, namespaceShardingManager) != null) {
    // Already exists, so close the current client
    try {
        namespaceShardingManager.stopWithCurator();
    } catch (Exception e) {
        (e.getMessage(), e);
    }
} else {
    log.info("Done starting NamespaceShardingManager {}", nns);
}
```
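The snippet follows a start-then-register pattern: a manager is started, registered with an atomic putIfAbsent, and stopped again if another one was registered first. The sketch below reproduces only that pattern with a plain ConcurrentHashMap. The Manager class is a hypothetical stand-in for NamespaceShardingManager, and keying the map by namespace (instead of nns) is an assumption.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class ShardingManagerRegistry {

    // Hypothetical stand-in for NamespaceShardingManager that only records its lifecycle.
    static final class Manager {
        final String namespace;
        volatile boolean started;
        volatile boolean stopped;

        Manager(String namespace) {
            this.namespace = namespace;
        }

        void start() {
            started = true;
        }

        void stopWithCurator() {
            stopped = true;
        }
    }

    // Plays the role of namespaceShardingListenerManagerMap: at most one manager per key.
    static final ConcurrentMap<String, Manager> MANAGERS = new ConcurrentHashMap<>();

    // Start a candidate first, then try to register it atomically;
    // if another caller already registered one, stop the duplicate and reuse the winner.
    static Manager createIfAbsent(String namespace) {
        Manager candidate = new Manager(namespace);
        candidate.start();
        Manager existing = MANAGERS.putIfAbsent(namespace, candidate);
        if (existing != null) {
            candidate.stopWithCurator();
            return existing;
        }
        return candidate;
    }

    public static void main(String[] args) {
        Manager first = createIfAbsent("ns1");
        Manager second = createIfAbsent("ns1");
        System.out.println(first == second); // true
    }
}
```

Starting before registering means a losing candidate briefly runs and must be cleaned up. The upside is that the map only ever holds managers that are already started.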

Every 5 minutes, private synchronized void localRefresh() in RegistryCenterServiceImpl checks whether a new NamespaceShardingManager needs to be created.
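The article does not say how the 5-minute cadence is driven. As an assumption, the sketch below uses a JDK ScheduledExecutorService and a synchronized refresh method. PeriodicRefresher, register, and the "managed namespaces" bookkeeping are hypothetical names, not Saturn's code.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class PeriodicRefresher {
    // Namespaces known to the registry (hypothetical source of truth).
    private final Set<String> knownNamespaces = ConcurrentHashMap.newKeySet();
    // Namespaces that already have a sharding manager.
    private final Set<String> managedNamespaces = ConcurrentHashMap.newKeySet();

    void register(String namespace) {
        knownNamespaces.add(namespace);
    }

    // synchronized, like localRefresh(), so a scheduled run and a manual run never interleave.
    // Returns how many namespaces got a new manager in this pass.
    synchronized int localRefresh() {
        int created = 0;
        for (String namespace : knownNamespaces) {
            if (managedNamespaces.add(namespace)) {
                created++; // in Saturn, this is where a NamespaceShardingManager would be created
            }
        }
        return created;
    }

    // Runs localRefresh() repeatedly with a fixed delay, e.g. 5 minutes.
    ScheduledExecutorService startRefreshing(long period, TimeUnit unit) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleWithFixedDelay(this::localRefresh, period, period, unit);
        return scheduler;
    }

    public static void main(String[] args) {
        PeriodicRefresher refresher = new PeriodicRefresher();
        refresher.register("ns1");
        System.out.println(refresher.localRefresh()); // 1
        System.out.println(refresher.localRefresh()); // 0
        ScheduledExecutorService scheduler = refresher.startRefreshing(5, TimeUnit.MINUTES);
        scheduler.shutdownNow();
    }
}
```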

Job sharding within each namespace is single-threaded. All sharding runs on the private ExecutorService executorService of public class NamespaceShardingService, which is created by the following code:

```java
private ExecutorService newSingleThreadExecutor() {
    return Executors.newSingleThreadExecutor(new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            // Name the worker thread after the namespace
            return new Thread(r, namespace + "-" + r.getClass().getSimpleName());
        }
    });
}
```
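Each namespace gets its own single-thread executor. As a result, sharding tasks for one namespace run strictly one after another in submission order, while different namespaces can proceed in parallel. The sketch below submits a few tasks and records which thread ran each one. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class NamespaceSerialExecutor {

    // Same shape as the article's newSingleThreadExecutor(): one worker thread per namespace,
    // named after the namespace.
    static ExecutorService newSingleThreadExecutor(String namespace) {
        return Executors.newSingleThreadExecutor(
                r -> new Thread(r, namespace + "-" + r.getClass().getSimpleName()));
    }

    // Submits tasks in order and records "task@threadName" as each one runs.
    static List<String> runAll(String namespace, List<String> taskNames) {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        ExecutorService executor = newSingleThreadExecutor(namespace);
        for (String name : taskNames) {
            executor.execute(() -> log.add(name + "@" + Thread.currentThread().getName()));
        }
        executor.shutdown();
        try {
            executor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        List<String> tasks = Arrays.asList("ExecuteAllShardingTask", "ExecuteOfflineShardingTask");
        runAll("ns1", tasks).forEach(System.out::println);
    }
}
```

In the JDK's ThreadPoolExecutor, the Runnable passed to the ThreadFactory is the pool's internal worker object, not the submitted task. The name suffix therefore reflects that worker class rather than, say, ExecuteAllShardingTask.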

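The core class for sharding computation is public abstract class AbstractAsyncShardingTask. A code search shows that Saturn currently has the following classes responsible for sharding computation. The class names already indicate the scenario in which each one performs resharding: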

Searching 545 files for "extends AbstractAsyncShardingTask" finds these classes, all under saturn-job-sharding/src/main/java/com/vip/saturn/job/sharding/task/, each with its class comment:

  1. ExecuteAllShardingTask: reshard the whole domain. Remove all existing executors, re-fetch the executors, and re-fetch the job shards.
  2. ExecuteExtractTrafficShardingTask: extract an executor's traffic. Mark the executor's noTraffic as true and remove its job shards, extracting only the non-local job shards, then set totalLoadLevel to 0.
  3. ExecuteJobDisableShardingTask: job disabled. Take this job's shards off every executor running it, decreasing loadLevel accordingly; the shards are not put back.
  4. ExecuteJobEnableShardingTask: job enabled. Fetch this job's shards, filtering out executors that cannot run the job.
  5. ExecuteJobForceShardShardingTask: job resharding. Remove this job's shards from all executors, re-fetch the job's shards, and finally delete the forceShard node.
  6. ExecuteJobServerOfflineShardingTask: an executor of the job goes offline. Take off all of this job's shards running on that executor; if it is a local-mode job, remove them.
  7. ExecuteJobServerOnlineShardingTask: an executor of the job comes online. Take shards off for executor-level balancing, but only shards of this job; add the new shards.
  8. ExecuteOfflineShardingTask: an executor goes offline. Take off all non-local-mode jobs it runs and remove the executor.

Going back to the run method of AbstractAsyncShardingTask and analyzing each subclass shows that the scenarios differ mainly in how they override two methods:

  1. pick(allJobs, allEnableJobs, shardList, lastOnlineExecutorList, lastOnlineTrafficExecutorList)
  2. customLastOnlineExecutorList()

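customLastOnlineExecutorList returns the currently online executors, but the key is what pick produces:

  1. shardList: the job shards that need to be reassigned
  2. lastOnlineExecutorList: the online executors, i.e. those that take on job shards
  3. customLastOnlineExecutorList: the online executors that also carry load; a subset of the executors in item 2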
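The run/pick/customLastOnlineExecutorList split is a template-method design. The base class fixes the sequence, and each subclass only decides which shards to pick up and which executors take part. The following heavily simplified sketch illustrates that shape. The signatures, the map-based executor model, the order of the two hooks, and the "fewest shards wins" put-back are all assumptions, not Saturn's actual AbstractAsyncShardingTask.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class ShardingTaskSketch {

    // Hypothetical, simplified base class. Executors are modelled as "name -> shards it holds".
    abstract static class AsyncShardingTask {

        // Hook: which executors take part in this round (default: all last-online executors).
        protected Map<String, List<String>> customLastOnlineExecutorList(Map<String, List<String>> online) {
            return online;
        }

        // Hook: move the shards that must be reassigned into shardList; false means nothing to do.
        protected abstract boolean pick(List<String> shardList, Map<String, List<String>> online);

        // Fixed skeleton shared by every scenario: customise executors, pick, then put shards back.
        public final Map<String, List<String>> run(Map<String, List<String>> lastOnline) {
            Map<String, List<String>> online = customLastOnlineExecutorList(copy(lastOnline));
            List<String> shardList = new ArrayList<>();
            if (pick(shardList, online) && !online.isEmpty()) {
                putBack(shardList, online);
            }
            return online;
        }

        // Stand-in for putBackBalancing: each shard goes to the executor holding the fewest shards.
        private static void putBack(List<String> shardList, Map<String, List<String>> online) {
            for (String shard : shardList) {
                String target = null;
                for (Map.Entry<String, List<String>> e : online.entrySet()) {
                    if (target == null || e.getValue().size() < online.get(target).size()) {
                        target = e.getKey();
                    }
                }
                online.get(target).add(shard);
            }
        }

        private static Map<String, List<String>> copy(Map<String, List<String>> source) {
            Map<String, List<String>> result = new LinkedHashMap<>();
            source.forEach((name, shards) -> result.put(name, new ArrayList<>(shards)));
            return result;
        }
    }

    // Scenario in the spirit of ExecuteOfflineShardingTask (simplified: takes every shard).
    static final class OfflineTask extends AsyncShardingTask {
        private final String offlineExecutor;

        OfflineTask(String offlineExecutor) {
            this.offlineExecutor = offlineExecutor;
        }

        @Override
        protected boolean pick(List<String> shardList, Map<String, List<String>> online) {
            List<String> orphaned = online.remove(offlineExecutor);
            if (orphaned == null) {
                return false; // unknown executor: nothing to reassign
            }
            shardList.addAll(orphaned);
            return true;
        }
    }

    public static void main(String[] args) {
        Map<String, List<String>> before = new LinkedHashMap<>();
        before.put("exec-1", Arrays.asList("jobA-0", "jobA-1"));
        before.put("exec-2", Arrays.asList("jobB-0"));
        before.put("exec-3", new ArrayList<>());
        System.out.println(new OfflineTask("exec-1").run(before));
        // {exec-2=[jobB-0, jobA-1], exec-3=[jobA-0]}
    }
}
```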

The actual mapping (assignment) of job shards to executors is done by the following code in protected void putBackBalancing:

```java
// 1. Put back the localMode shards.
// If a preferList is configured, choose an executor from the preferList.
// If all executors in the preferList are down, do not transfer; otherwise, choose the executor
// with the lowest loadLevel among those that have not taken over this job.
// If no preferList is configured, choose the executor with the lowest loadLevel among those
// that have not taken over this job.
putBackShardWithLocalMode(shardList, noDockerTrafficExecutorsMapByJob,
        lastOnlineTrafficExecutorListMapByJob, localModeMap, preferListIsConfiguredMap,
        preferListConfiguredMap);
// 2. Put back the shards whose job has a preferList configured
putBackShardWithPreferList(shardList, lastOnlineTrafficExecutorListMapByJob, preferListIsConfiguredMap,
        preferListConfiguredMap, useDispreferListMap);
// 3. Put back the shards whose job has no preferList configured
putBackShardWithoutPreferlist(shardList, noDockerTrafficExecutorsMapByJob);
```
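The step-1 rule can be sketched as a pure selection function: use the preferList first, then pick the lowest loadLevel among executors not yet running the job. Exec and chooseExecutor are hypothetical names; the real putBackShardWithLocalMode works on Saturn's own maps. Returning no executor when every candidate already runs the job is an assumption of this sketch.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

class LocalModePutBack {

    // Hypothetical executor view: name, accumulated loadLevel, and the jobs it already runs.
    static final class Exec {
        final String name;
        final int loadLevel;
        final Set<String> jobs;

        Exec(String name, int loadLevel, Set<String> jobs) {
            this.name = name;
            this.loadLevel = loadLevel;
            this.jobs = jobs;
        }
    }

    // Picks the executor for one local-mode shard of jobName:
    // restrict to online preferList executors if a preferList is configured,
    // then take the lowest-loadLevel executor that does not already run the job.
    static Optional<Exec> chooseExecutor(String jobName, List<Exec> online, Collection<String> preferList) {
        boolean preferConfigured = preferList != null && !preferList.isEmpty();
        List<Exec> candidates = online.stream()
                .filter(e -> !preferConfigured || preferList.contains(e.name))
                .collect(Collectors.toList());
        if (candidates.isEmpty()) {
            return Optional.empty(); // no eligible executor online (e.g. all preferred ones are down): do not transfer
        }
        return candidates.stream()
                .filter(e -> !e.jobs.contains(jobName))
                .min(Comparator.comparingInt((Exec e) -> e.loadLevel));
    }

    public static void main(String[] args) {
        List<Exec> online = Arrays.asList(
                new Exec("e1", 3, new HashSet<>(Collections.singleton("jobA"))),
                new Exec("e2", 1, new HashSet<>()),
                new Exec("e3", 2, new HashSet<>()));
        System.out.println(chooseExecutor("jobA", online, null).map(e -> e.name).orElse("none")); // e2
        System.out.println(chooseExecutor("jobA", online, Arrays.asList("e3")).map(e -> e.name).orElse("none")); // e3
        System.out.println(chooseExecutor("jobA", online, Arrays.asList("e9")).map(e -> e.name).orElse("none")); // none
    }
}
```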

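A Saturn executor's loadLevel is accumulated by private void putShardIntoExecutor(Shard shard, Executor executor). Whenever a job shard is put into an executor, the shard's loadLevel is added to the executor's. Several shards of the same job may be assigned to the same executor.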

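The loadLevel of a job shard, in turn, has to be estimated by the user. It appears under the advanced properties on the Console's job configuration page and defaults to 1.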
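Accumulating loadLevel per executor is a running sum over the shards it receives. The minimal sketch below uses hypothetical Shard and Executor classes; the field name totalLoadLevel is borrowed from the ExecuteExtractTrafficShardingTask comment. It shows two shards of the same job landing on one executor, plus a job whose loadLevel was raised by the user.

```java
import java.util.ArrayList;
import java.util.List;

class LoadLevelAccumulation {

    // Hypothetical minimal shard: job name, shard item, and the job's configured loadLevel (default 1).
    static final class Shard {
        final String jobName;
        final int item;
        final int loadLevel;

        Shard(String jobName, int item, int loadLevel) {
            this.jobName = jobName;
            this.item = item;
            this.loadLevel = loadLevel;
        }
    }

    // Hypothetical minimal executor: the shards it holds plus their summed loadLevel.
    static final class Executor {
        final String name;
        final List<Shard> shards = new ArrayList<>();
        int totalLoadLevel;

        Executor(String name) {
            this.name = name;
        }
    }

    // The idea behind putShardIntoExecutor: place the shard and add its loadLevel to the executor.
    static void putShardIntoExecutor(Shard shard, Executor executor) {
        executor.shards.add(shard);
        executor.totalLoadLevel += shard.loadLevel;
    }

    public static void main(String[] args) {
        Executor executor = new Executor("exec-1");
        putShardIntoExecutor(new Shard("jobA", 0, 1), executor); // default loadLevel 1
        putShardIntoExecutor(new Shard("jobA", 1, 1), executor); // same job, second shard
        putShardIntoExecutor(new Shard("jobB", 0, 5), executor); // user-tuned loadLevel
        System.out.println(executor.totalLoadLevel); // 7
    }
}
```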


Finally, the sharding result is written to ZooKeeper in a transaction. See public void persistJobsNecessaryInTransaction in public class NamespaceShardingContentService, which uses public interface CuratorTransactionFinal extends CuratorTransaction.
The key sharding result is written to the following node:

```java
/**
 * Get the full path of $Jobs/xx/leader/sharding/necessary
 */
public static String getJobLeaderShardingNecessaryNodePath(String jobName) {
    return String.format("/%s/%s/%s/%s/%s", JOBS_NODE, jobName, "leader", "sharding", "necessary");
}
```
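To make the resulting ZooKeeper path concrete, the sketch below applies the same format string to a sample job name and encodes node content as UTF-8. JOBS_NODE = "$Jobs" is an assumption taken from the Javadoc, and demoJob is a made-up job name.

```java
import java.nio.charset.StandardCharsets;

class NecessaryNodePath {

    // Assumption: JOBS_NODE is "$Jobs", matching the "$Jobs/xx/leader/sharding/necessary" Javadoc.
    static final String JOBS_NODE = "$Jobs";

    // Same formatting logic as getJobLeaderShardingNecessaryNodePath.
    static String getJobLeaderShardingNecessaryNodePath(String jobName) {
        return String.format("/%s/%s/%s/%s/%s", JOBS_NODE, jobName, "leader", "sharding", "necessary");
    }

    // Node content is stored as UTF-8 bytes.
    static byte[] encode(String content) {
        return content.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(getJobLeaderShardingNecessaryNodePath("demoJob"));
        // /$Jobs/demoJob/leader/sharding/necessary
    }
}
```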

The content written is byte[] necessaryContent = Bytes("UTF-8");
The result is shown in the figure below:

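After the Console finishes sharding, the executor leader performs one more processing step that writes the shards to server/%s/sharding under the job in ZooKeeper. The result is shown in the figure below. (I don't quite understand this design: it feels somewhat redundant, and it also requires the executors to hold a leader election.)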
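The executor-side step can be pictured as expanding one job-level assignment into one node per executor. The sketch below is purely illustrative. The in-memory map of executor to shard items, the comma-separated node value, and the method name are assumptions. Only the server/%s/sharding layout comes from the text above, and the "$Jobs" root comes from the necessary-node Javadoc.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class ServerShardingNodes {

    // Assumption: jobs live under /$Jobs, as in the necessary-node path.
    static final String JOBS_NODE = "$Jobs";

    // Hypothetical: expand "executor -> shard items" into "node path -> node value",
    // using the server/%s/sharding layout and a comma-separated value (format assumed).
    static Map<String, String> toServerShardingNodes(String jobName, Map<String, List<Integer>> assignment) {
        Map<String, String> nodes = new LinkedHashMap<>();
        assignment.forEach((executor, items) -> nodes.put(
                String.format("/%s/%s/server/%s/sharding", JOBS_NODE, jobName, executor),
                items.stream().map(Object::toString).collect(Collectors.joining(","))));
        return nodes;
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        assignment.put("exec-1", Arrays.asList(0, 2));
        assignment.put("exec-2", Arrays.asList(1));
        toServerShardingNodes("demoJob", assignment)
                .forEach((path, value) -> System.out.println(path + " = " + value));
    }
}
```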

Published: 2024-01-28 15:59:00

Article link: https://www.4u4v.net/it/17064287438569.html
