Saturn's sharding is handled by the Console, and sharding is controlled separately for each namespace.
The entry point of sharding is the creation of a NamespaceShardingManager, which happens in the createNamespaceShardingManager method of public class RegistryCenterServiceImpl, in the following fragment:
namespaceShardingManager = new NamespaceShardingManager(client, namespace,
        generateShardingLeadershipHostValue(), reportAlarmService, updateJobConfigService);
namespaceShardingManager.start();
if (namespaceShardingListenerManagerMap.putIfAbsent(nns, namespaceShardingManager) != null) {
    // already exists, so close the current client
    try {
        namespaceShardingManager.stopWithCurator();
    } catch (Exception e) {
        log.error(e.getMessage(), e);
    }
} else {
    log.info("Done starting NamespaceShardingManager {}", nns);
}
Every 5 minutes, the private synchronized void localRefresh() method of RegistryCenterServiceImpl checks whether a new NamespaceShardingManager needs to be created.
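This periodic-check pattern can be sketched with a ScheduledExecutorService. This is an illustration, not Saturn's source: Saturn uses a 5-minute interval, while a very short interval is used here so the demo terminates quickly.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class PeriodicRefreshDemo {
    // Returns true once the refresh task has run at least once within the timeout.
    static boolean refreshRanWithin(long timeoutMs) throws InterruptedException {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch ranOnce = new CountDownLatch(1);
        // In Saturn this body would check registered namespaces and create a
        // NamespaceShardingManager for any namespace that lacks one.
        scheduler.scheduleWithFixedDelay(ranOnce::countDown, 0, 10, TimeUnit.MILLISECONDS);
        try {
            return ranOnce.await(timeoutMs, TimeUnit.MILLISECONDS);
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("refresh ran: " + refreshRanWithin(5000));
    }
}
```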
Job sharding for each namespace is single-threaded: all sharding runs through the private ExecutorService executorService of public class NamespaceShardingService. The executor is created as follows:
private ExecutorService newSingleThreadExecutor() {
    return Executors.newSingleThreadExecutor(new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, namespace + "-" + r.getClass().getSimpleName());
        }
    });
}
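The point of a single-thread executor is that all sharding tasks for a namespace are serialized and can never race with each other. A minimal, self-contained sketch (the class and tasks are illustrative, not Saturn's):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SingleThreadShardingDemo {
    // Submits three "sharding tasks" to a single-thread executor and records their order.
    static String runTasks(String namespace) throws InterruptedException {
        ExecutorService executorService = Executors.newSingleThreadExecutor(
                r -> new Thread(r, namespace + "-" + r.getClass().getSimpleName()));
        StringBuilder order = new StringBuilder();
        executorService.submit(() -> order.append("A"));
        executorService.submit(() -> order.append("B"));
        executorService.submit(() -> order.append("C"));
        executorService.shutdown();
        executorService.awaitTermination(5, TimeUnit.SECONDS);
        return order.toString(); // tasks ran one at a time, in submit order
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runTasks("demo-ns")); // ABC
    }
}
```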
The core class of the sharding computation is public abstract class AbstractAsyncShardingTask. Searching the code shows that Saturn currently has the following subclasses responsible for sharding computation; the class names alone indicate the scenario in which each one triggers resharding:
- ExecuteAllShardingTask: reshard the whole namespace; remove all existing executors, re-fetch the executors, and re-fetch the job shards.
- ExecuteExtractTrafficShardingTask: extract an executor's traffic; mark its noTraffic as true and remove all of its job shards (only non-local-mode shards are extracted), setting totalLoadLevel to 0.
- ExecuteJobDisableShardingTask: a job is disabled; take that job's shards off every executor, decrementing loadLevel accordingly; the shards are not put back.
- ExecuteJobEnableShardingTask: a job is enabled; fetch the job's shards, filtering out the executors that cannot run the job.
- ExecuteJobForceShardShardingTask: force a reshard of a job; remove that job's shards from all executors, re-fetch the job's shards, and finally delete the forceShard node.
- ExecuteJobServerOfflineShardingTask: a job's executor goes offline for that job; take all of that job's shards off the executor; if the job is local-mode, remove them.
- ExecuteJobServerOnlineShardingTask: a job's executor comes online for that job; balance at the executor level, but only that job's shards may be taken; new shards are added.
- ExecuteOfflineShardingTask: an executor goes offline; take all non-local-mode job shards off that executor and remove the executor.
Going back to the run() method of AbstractAsyncShardingTask and reading the code of each subclass, the scenarios differ mainly in two overridden functions. customLastOnlineExecutorList() returns the executors currently considered online, but the key difference is what pick() returns.
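The shape described above is a template method with two hooks. The following is a simplified illustration, not Saturn's actual AbstractAsyncShardingTask; the hook names mirror the real ones, but the logic is reduced to the bare minimum:

```java
import java.util.List;

public class ShardingTemplateDemo {
    static abstract class ShardingTaskSketch {
        // Hook 1: which executors count as "online" for this scenario.
        protected abstract List<String> customLastOnlineExecutorList();

        // Hook 2: which shards are taken off executors for re-assignment.
        protected abstract List<Integer> pick(List<String> executors);

        // The template: collect the executors, pick shards, then balance them back.
        final String run() {
            List<String> executors = customLastOnlineExecutorList();
            List<Integer> shardsToPlace = pick(executors);
            return "placing " + shardsToPlace + " onto " + executors;
        }
    }

    // A scenario akin to ExecuteAllShardingTask: reshard everything across all executors.
    static String executeAllScenario() {
        ShardingTaskSketch executeAll = new ShardingTaskSketch() {
            protected List<String> customLastOnlineExecutorList() {
                return List.of("executor-1", "executor-2");
            }
            protected List<Integer> pick(List<String> executors) {
                return List.of(0, 1, 2); // every shard item of the job
            }
        };
        return executeAll.run();
    }

    public static void main(String[] args) {
        System.out.println(executeAllScenario());
    }
}
```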
The concrete mapping (assignment) of job shards to executors is done in the function protected void putBackBalancing, in the following code:
// 1. Put back local-mode shards.
// If a preferList is configured, pick an executor from the preferList;
// if every executor in the preferList is down, do not move the shard; otherwise,
// among the executors not yet running this job, pick the one with the smallest loadLevel.
// If no preferList is configured, pick, among the executors not yet running this job,
// the one with the smallest loadLevel.
putBackShardWithLocalMode(shardList, noDockerTrafficExecutorsMapByJob,
        lastOnlineTrafficExecutorListMapByJob, localModeMap, preferListIsConfiguredMap,
        preferListConfiguredMap);
// 2. Put back shards whose jobs have a preferList configured.
putBackShardWithPreferList(shardList, lastOnlineTrafficExecutorListMapByJob,
        preferListIsConfiguredMap, preferListConfiguredMap, useDispreferListMap);
// 3. Put back shards whose jobs have no preferList configured.
putBackShardWithoutPreferlist(shardList, noDockerTrafficExecutorsMapByJob);
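The "smallest loadLevel" rule used in steps 1 and 3 can be illustrated with a small sketch. The names here are hypothetical; Saturn's real selection additionally filters executors by preferList and by whether they can run the job:

```java
import java.util.HashMap;
import java.util.Map;

public class LowestLoadPickDemo {
    // Picks the executor with the smallest accumulated loadLevel.
    static String pickLowest(Map<String, Integer> loadByExecutor) {
        String best = null;
        for (Map.Entry<String, Integer> e : loadByExecutor.entrySet()) {
            if (best == null || e.getValue() < loadByExecutor.get(best)) {
                best = e.getKey();
            }
        }
        return best;
    }

    public static void main(String[] args) {
        Map<String, Integer> load = new HashMap<>();
        load.put("executor-1", 3);
        load.put("executor-2", 1);
        load.put("executor-3", 2);
        System.out.println(pickLowest(load)); // executor-2 carries the least load
    }
}
```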
An executor's loadLevel in Saturn is accumulated by the function private void putShardIntoExecutor(Shard shard, Executor executor): when a job shard is placed onto an executor, the shard's loadLevel is added to the executor's total. Note that multiple shards of the same job may be assigned to the same executor.
A job shard's loadLevel, on the other hand, must be estimated by the user; it can be seen under the advanced properties on the Console's job configuration page, and defaults to 1.
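The accumulation behavior can be modeled in a few lines. This is an illustrative sketch, not Saturn's source; the Shard and Executor classes below are minimal stand-ins:

```java
import java.util.ArrayList;
import java.util.List;

public class LoadLevelAccumulationDemo {
    static class Shard {
        final int item;
        final int loadLevel;
        Shard(int item, int loadLevel) { this.item = item; this.loadLevel = loadLevel; }
    }

    static class Executor {
        final String name;
        final List<Shard> shards = new ArrayList<>();
        int totalLoadLevel;
        Executor(String name) { this.name = name; }
    }

    // Placing a shard on an executor adds the shard's loadLevel to the executor's total.
    static void putShardIntoExecutor(Shard shard, Executor executor) {
        executor.shards.add(shard);
        executor.totalLoadLevel += shard.loadLevel;
    }

    public static void main(String[] args) {
        Executor e = new Executor("executor-1");
        putShardIntoExecutor(new Shard(0, 1), e); // default loadLevel is 1
        putShardIntoExecutor(new Shard(1, 2), e); // two shards of one job on one executor
        System.out.println(e.totalLoadLevel); // 3
    }
}
```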
Finally, when the sharding result is written to ZooKeeper, a transaction is used; see public void persistJobsNecessaryInTransaction in public class NamespaceShardingContentService, which uses public interface CuratorTransactionFinal extends CuratorTransaction.
The path the key sharding result is written to:
/**
 * Get the full path of $Jobs/xx/leader/sharding/necessary
 */
public static String getJobLeaderShardingNecessaryNodePath(String jobName) {
    return String.format("/%s/%s/%s/%s/%s", JOBS_NODE, jobName, "leader", "sharding", "necessary");
}
The content written is a byte[] necessaryContent obtained from the sharding result via getBytes("UTF-8").
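The path helper above can be exercised directly. Assuming JOBS_NODE is "$Jobs", as the Javadoc comment indicates:

```java
public class ShardingNecessaryPathDemo {
    static final String JOBS_NODE = "$Jobs"; // assumed value, per the Javadoc above

    static String getJobLeaderShardingNecessaryNodePath(String jobName) {
        return String.format("/%s/%s/%s/%s/%s", JOBS_NODE, jobName, "leader", "sharding", "necessary");
    }

    public static void main(String[] args) {
        // Builds the znode path for a hypothetical job named "demoJob".
        System.out.println(getJobLeaderShardingNecessaryNodePath("demoJob"));
        // /$Jobs/demoJob/leader/sharding/necessary
    }
}
```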
(Figure: the sharding result as stored under the necessary node.)
After the Console finishes sharding, the executor leader performs one more data-processing step, writing the shard assignment to server/%s/sharding under each job's node (also shown as a figure in the original post). This design is hard to understand: it feels redundant, and it additionally requires the executors to run leader election.
Published: 2024-01-28 15:59:00
Link: https://www.4u4v.net/it/17064287438569.html