首先订正下quickstart 的做法:saturn 的job 同时保存在数据库及zookeeper上,修改job不能只修改zookeeper上的数据,都应该通过console的rest api或者界面实现。
executor 不连接数据库,所有对job的修改都是通过zookeeper watcher触发的。
先上一个zookeeper 截图:
saturn对job的watcher分布在很多类中,跟踪分析要注意这一点;
对应新增加job是通过watch $jobs来实现的,具体代码在InitNewJobService的如下代码:
class InitNewJobListener implements TreeCacheListener {@Overridepublic void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {if (event == null) {return;}ChildData data = Data();if (data == null) {return;}String path = Path();if (path == null || path.equals(JobNodePath.ROOT)) {return;}TreeCacheEvent.Type type = Type();if (type == null || !type.equals(TreeCacheEvent.Type.NODE_ADDED)) {return;}String jobName = StringUtils.substringAfterLast(path, "/");String jobClassPath = NodeFullPath(jobName, ConfigurationNode.JOB_CLASS);// wait 5 seconds at most until jobClass created.for (int i = 0; i < WAIT_JOBCLASS_ADDED_COUNT; i++) {if (!regCenter.isExisted(jobClassPath)) {Thread.sleep(200L);continue;}LogUtils.info(log, jobName, "new job: {} 's jobClass created event received", jobName);if (!ains(jobName)) {if (canInitTheJob(jobName) && initJobScheduler(jobName)) {jobNames.add(jobName);LogUtils.info(log, jobName, "the job {} initialize successfully", jobName);}} else {LogUtils.warn(log, jobName,"the job {} is unnecessary to initialize, because it's already existing", jobName);}break;}}
同时可以注意如下代码实现了job group 管理:
/*** 如果Executor配置了groups,则只能初始化属于groups的作业;否则,可以初始化全部作业*/private boolean canInitTheJob(String jobName) {Set<String> executorGroups = SystemEnvProperties.VIP_SATURN_INIT_JOB_BY_GROUPS;if (executorGroups.isEmpty()) {return true;}String jobGroups = NodeFullPath(jobName, ConfigurationNode.GROUPS));if (StringUtils.isNotBlank(jobGroups)) {String[] split = jobGroups.split(",");for (String temp : split) {if (StringUtils.isBlank(temp)) {continue;}if (im())) {return true;}}}LogUtils.info(log, jobName, "the job {} wont be initialized, because it's not in the groups {}", jobName,executorGroups);return false;}
其它还有哪些watcher呢?通过在addTreeCacheListener增加日志
public void addTreeCacheListener(final TreeCacheListener listener, final String path, final int depth) {TreeCache tc = buildAndStartTreeCache(path, depth);if (tc != null) {tc.getListenable().addListener(listener);LogUtils.info(log, "addTreeCacheListener", "listener {} -path {} and depth is {}", listener, path,depth);}}
发现了如下的watcher及对应类:
[addTreeCacheListener] msg=listener com.vip.saturn.job.internal.server.JobOperationListenerManager$TriggerJobRunAtOnceListener@2480f1a6 -path /$Jobs/demo_DemoJavaJob/servers/executor-2/runOneTime and depth is 0[demo_DemoJavaJob] msg=executor-2 - demo_DemoJavaJob builds treeCache for path = /$Jobs/demo_DemoJavaJob/servers/executor-2/stopOneTime, depth = 0[addTreeCacheListener] msg=listener com.vip.saturn.job.internal.server.JobOperationListenerManager$JobForcedToStopListener@3e8ff3fe -path /$Jobs/demo_DemoJavaJob/servers/executor-2/stopOneTime and depth is 0[demo_DemoJavaJob] msg=executor-2 - demo_DemoJavaJob builds treeCache for path = /$Jobs/demo_DemoJavaJob/config/toDelete, depth = 0[addTreeCacheListener] msg=listener com.vip.saturn.job.internal.server.JobOperationListenerManager$JobDeleteListener@496c6125 -path /$Jobs/demo_DemoJavaJob/config/toDelete and depth is 0[demo_DemoJavaJob] msg=executor-2 - demo_DemoJavaJob builds treeCache for path = /$Jobs/demo_DemoJavaJob/config/cron, depth = 0[addTreeCacheListener] msg=listener com.vip.saturn.fig.ConfigurationListenerManager$CronPathListener@1fe7e5ef -path /$Jobs/demo_DemoJavaJob/config/cron and depth is 0[demo_DemoJavaJob] msg=executor-2 - demo_DemoJavaJob builds treeCache for path = /$Jobs/demo_DemoJavaJob/config/downStream, depth = 0[addTreeCacheListener] msg=listener com.vip.saturn.fig.ConfigurationListenerManager$DownStreamPathListener@1d26520a -path /$Jobs/demo_DemoJavaJob/config/downStream and depth is 0[demo_DemoJavaJob] msg=executor-2 - demo_DemoJavaJob builds treeCache for path = /$Jobs/demo_DemoJavaJob/config/enabled, depth = 0[addTreeCacheListener] msg=listener com.vip.saturn.fig.ConfigurationListenerManager$EnabledPathListener@531d0801 -path /$Jobs/demo_DemoJavaJob/config/enabled and depth is 0[demo_DemoJavaJob] msg=executor-2 - demo_DemoJavaJob builds treeCache for path = /$Jobs/demo_DemoJavaJob/analyse/reset, depth = 0[addTreeCacheListener] msg=listener com.vip.saturn.job.internal.analyse.AnalyseResetListenerManager$AnalyseResetPathListener@4dca3ed4 -path /$Jobs/demo_DemoJavaJob/analyse/reset and depth is 0[demo_DemoJavaJob] msg=executor-2 - demo_DemoJavaJob builds treeCache for path = /$Jobs/demo_DemoJavaJob/control/report, depth = 0[addTreeCacheListener] msg=listener com.vip.saturn.l.ControlListenerManager$ReportPathListener@13f18aaa -path /$Jobs/demo_DemoJavaJob/control/report and depth is 0
这些watcher其实也对应console 上的不同操作
jobs 的触发管理类public class SaturnWorker implements Runnable的
public void run(),包括了设置下次触发时间
附带public class SystemEnvProperties,基本上所有环境变量都在此定义
本文发布于:2024-01-28 15:58:50,感谢您对本站的认可!
本文链接:https://www.4u4v.net/it/17064287358568.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
留言与评论(共有 0 条评论) |