以java job为例,单个job的执行都是从public abstract class AbstractElasticJob
的execute函数开始,execute的触发是在public class SaturnWorker implements Runnable类的run中。
execute函数主要负责
关键代码如下:
executeJobInternal(shardingContext);if (isFailoverSupported() && configService.isFailover() && !stopped && !forceStopped && !aborted) {failoverService.failoverIfNecessary();}
executeJobInternal从名称就可以看出是负责内部执行,包括失败shard的failover
在finally代码中有如下处理:
if (isFailoverSupported() && configService.isFailover()) {failoverService.updateFailoverComplete(item);}}}runDownStream(shardingContext);
即设置failover处理完成标注(其实其它节点会再次检测是否完成),调用下游passive 任务,注意是在finally代码,意味job shard本身无论执行成功还是失败,都会被执行,但实际有任何shard失败,都不会执行下游job,原因是
runDownStream有如下代码
if (!mayRunDownStream(shardingContext)) {
return
}
AbstractSaturnJob,通过override mayRunDownStream函数只要有一个失败,就不触发下游
job shard的执行并不是直接在线程内完成,而是通过sumbit 到service 队列完成
以下是submit的thread stack
Thread [executor-2_DemoJob-saturnWorker] (Suspended (breakpoint at line 128 in SaturnJavaJob)) owns: HashMap<K,V> (id=397) SaturnJavaJob.handleJob(SaturnExecutionContext) line: 128 SaturnJavaJob(AbstractSaturnJob).executeJob(JobExecutionMultipleShardingContext) line: 62 SaturnJavaJob(AbstractElasticJob).executeJobInternal(JobExecutionMultipleShardingContext) line: 203 SaturnJavaJob(AbstractElasticJob).execute(Triggered) line: 183 SaturnWorker.run() line: 173 Executors$RunnableAdapter<T>.call() line: 511 FutureTask<V>.run() line: 266 ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) line: 1149 ThreadPoolExecutor$Worker.run() line: 624 Thread.run() line: 748
java job超时可以强杀,这个实现过程大致是在public class SaturnJavaJob extends AbstractSaturnJob类的
protected Map<Integer, SaturnJobReturn> handleJob中,在提交job shard执行task,同时启动一个timeout task,代码如下
if (timeoutSeconds > 0) {
TimeoutSchedulerExecutor.ExecutorName(), timeoutSeconds,
shardingItemFutureTask);
}
timeout 处理代码主要见private static class TimeoutHandleTask implements Runnable
而job failover 处理主要都在package com.vip.saturn.job.internal.failover下。
public class FailoverListenerManager 在启动(public void start())的时候,增加对job下execution的listener,代码如下:
zkCacheManager.addTreeCacheListener(new ExecutionPathListener(), executionPath, 1)
而这个listener在有分片变化时候对running和failover 子item增加两个listener 代码如下
switch (Type()) {case NODE_ADDED:zkCacheManager.addNodeCacheListener(new RunningPathListener(item), runningPath);runningAndFailoverPath.add(runningPath);zkCacheManager.addNodeCacheListener(new FailoverPathJobListener(item), failoverPath);runningAndFailoverPath.add(failoverPath);break;case NODE_REMOVED:zkCacheManager.closeNodeCache(runningPath);ve(runningPath);zkCacheManager.closeNodeCache(failoverPath);ve(failoverPath);break;default:}
class ExecutionPathListener extends AbstractJobListener 和class FailoverPathJobListener implements NodeCacheListener
的failover触发其实都是item被删除,具体代码如下
if (!executionService.isRunning(item)) {
failover(item);
}
if (!executionService.isFailover(item)) {
failover(item);
}
注意一下:所有executor节点都会触发所有shard的failover 检测
private synchronized void failover 函数负责创建failover待执行节点:leader/failover/items
同时无job 待执行情况下尝试重新执行失败job shard,代码如下:
failover 在进入正式执行前,为避免多个executor重复执行,还要进行leader选举,这段代码在public class FailoverService的public void failoverIfNecessary()函数中,代码如下:
public void failoverIfNecessary() {if (!needFailover()) {return;}// 通过leader/failover/latch 确保只有一个节点执行failover job shardgetJobNodeStorage().executeInLeader(FailoverNode.LATCH, new FailoverLeaderExecutionCallback(), 1, TimeUnit.MINUTES,new FailoverTimeoutLeaderExecutionCallback());}
注意上面代码的executeInLeader
本文发布于:2024-01-28 15:58:29,感谢您对本站的认可!
本文链接:https://www.4u4v.net/it/17064287178566.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
留言与评论(共有 0 条评论) |