vipshop saturn job shard执行过程

阅读: 评论:0

vipshop saturn job shard执行过程

vipshop saturn job shard执行过程

以java job为例,单个job的执行都是从public abstract class AbstractElasticJob
的execute函数开始,execute的触发是在public class SaturnWorker implements Runnable类的run中。

execute函数主要负责

  1. 各种前置条件(包括结束console re-shard 结果)通过后调用真实执行函数executeJobInternal(shardingContext)
  2. 判断是否要进行failover处理

关键代码如下:

executeJobInternal(shardingContext);if (isFailoverSupported() && configService.isFailover() && !stopped && !forceStopped && !aborted) {failoverService.failoverIfNecessary();}

executeJobInternal从名称就可以看出是负责内部执行,包括失败shard的failover
在finally代码中有如下处理:

					if (isFailoverSupported() && configService.isFailover()) {failoverService.updateFailoverComplete(item);}}}runDownStream(shardingContext);

即设置failover处理完成标注(其实其它节点会再次检测是否完成),调用下游passive 任务,注意是在finally代码,意味job shard本身无论执行成功还是失败,都会被执行,但实际有任何shard失败,都不会执行下游job,原因是
runDownStream有如下代码

if (!mayRunDownStream(shardingContext)) {
return
}

AbstractSaturnJob,通过override mayRunDownStream函数只要有一个失败,就不触发下游

job shard的执行并不是直接在线程内完成,而是通过sumbit 到service 队列完成
以下是submit的thread stack

Thread [executor-2_DemoJob-saturnWorker] (Suspended (breakpoint at line 128 in SaturnJavaJob))	owns: HashMap<K,V>  (id=397)	SaturnJavaJob.handleJob(SaturnExecutionContext) line: 128	SaturnJavaJob(AbstractSaturnJob).executeJob(JobExecutionMultipleShardingContext) line: 62	SaturnJavaJob(AbstractElasticJob).executeJobInternal(JobExecutionMultipleShardingContext) line: 203	SaturnJavaJob(AbstractElasticJob).execute(Triggered) line: 183	SaturnWorker.run() line: 173	Executors$RunnableAdapter<T>.call() line: 511	FutureTask<V>.run() line: 266	ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) line: 1149	ThreadPoolExecutor$Worker.run() line: 624	Thread.run() line: 748	

java job超时可以强杀,这个实现过程大致是在public class SaturnJavaJob extends AbstractSaturnJob类的
protected Map<Integer, SaturnJobReturn> handleJob中,在提交job shard执行task,同时启动一个timeout task,代码如下

if (timeoutSeconds > 0) {
TimeoutSchedulerExecutor.ExecutorName(), timeoutSeconds,
shardingItemFutureTask);
}

timeout 处理代码主要见private static class TimeoutHandleTask implements Runnable

而job failover 处理主要都在package com.vip.saturn.job.internal.failover下。
public class FailoverListenerManager 在启动(public void start())的时候,增加对job下execution的listener,代码如下:

zkCacheManager.addTreeCacheListener(new ExecutionPathListener(), executionPath, 1)

而这个listener在有分片变化时候对running和failover 子item增加两个listener 代码如下

switch (Type()) {case NODE_ADDED:zkCacheManager.addNodeCacheListener(new RunningPathListener(item), runningPath);runningAndFailoverPath.add(runningPath);zkCacheManager.addNodeCacheListener(new FailoverPathJobListener(item), failoverPath);runningAndFailoverPath.add(failoverPath);break;case NODE_REMOVED:zkCacheManager.closeNodeCache(runningPath);ve(runningPath);zkCacheManager.closeNodeCache(failoverPath);ve(failoverPath);break;default:}

class ExecutionPathListener extends AbstractJobListener 和class FailoverPathJobListener implements NodeCacheListener
的failover触发其实都是item被删除,具体代码如下

if (!executionService.isRunning(item)) {
failover(item);
}
if (!executionService.isFailover(item)) {
failover(item);
}

注意一下:所有executor节点都会触发所有shard的failover 检测

private synchronized void failover 函数负责创建failover待执行节点:leader/failover/items
同时无job 待执行情况下尝试重新执行失败job shard,代码如下:

failover 在进入正式执行前,为避免多个executor重复执行,还要进行leader选举,这段代码在public class FailoverService的public void failoverIfNecessary()函数中,代码如下:

public void failoverIfNecessary() {if (!needFailover()) {return;}// 通过leader/failover/latch 确保只有一个节点执行failover job shardgetJobNodeStorage().executeInLeader(FailoverNode.LATCH, new FailoverLeaderExecutionCallback(), 1, TimeUnit.MINUTES,new FailoverTimeoutLeaderExecutionCallback());}

注意上面代码的executeInLeader

本文发布于:2024-01-28 15:58:29,感谢您对本站的认可!

本文链接:https://www.4u4v.net/it/17064287178566.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

标签:过程   saturn   vipshop   shard   job
留言与评论(共有 0 条评论)
   
验证码:

Copyright ©2019-2022 Comsenz Inc.Powered by ©

网站地图1 网站地图2 网站地图3 网站地图4 网站地图5 网站地图6 网站地图7 网站地图8 网站地图9 网站地图10 网站地图11 网站地图12 网站地图13 网站地图14 网站地图15 网站地图16 网站地图17 网站地图18 网站地图19 网站地图20 网站地图21 网站地图22/a> 网站地图23