springboot项目自定义设置执行定时任务与异步任务的线程池

阅读: 评论:0

springboot项目自定义设置执行定时任务与异步任务的线程池

springboot项目自定义设置执行定时任务与异步任务的线程池

我们知道在springboot项目执行定时任务和异步任务还是比较简单的,因为springboot框架为我们做的太多了,封装好了太多的东西

如执行定时任务,只需要在项目启动类上,加上@EnableScheduling,表示启用调度任务

然后再创建一个类,用于声明需要执行的任务job即可

例如创建一个定时任务类,在类上加上@Component注解,声明为被spring管理的bean,可以被spring容器扫描到

在需要执行的具体方法上加上@Scheduled注解,注解里边可以设置固定执行,延迟执行等方法,不过平时用的最多的还是使用cron表达式方式灵活设置执行频率,cron表达式在线生成可以参考这个网站:在线Cron表达式生成器


import dulemon.service.BusinessService ;
import dulemon.service.ConsumerService ;/*** @author xiaomifeng1010* @version 1.0* @date: * @Description*/
@Component
@Slf4j
public class GetAndSaveTaxAndLawDataTask {@Autowiredprivate BusinessService businessService;@Autowiredprivate ConsumerService consumerService;/*** cron表达式需要为6位* @param* @description: * @author: xiaomifeng1010* @date: * @return: void**/@Scheduled(cron = "0 30 23 * * ?")public void getData() {// 你的业务逻辑businessService.selectData();log.info("添加数据成功");}/*** cron表达式需要为6位* @param* @description: * @author: xiaomifeng1010* @date: 2022/3/16* @return: void**/@Scheduled(cron = "0 30 23 * * ?")public void getConsumerData() {Data();}}
我这里写了两个需要执行的定时任务,spring task的调度任务线程默认是单线程的,定时任务需要排队,需要一个任务完成之后才能再执行另一个定时任务,如果先开始的任务在另一个任务需要开始的时候,还没执行完,那么后边的这个任务是没有执行的,如果需要异步执行这两个定时任务,就需要自定义设置一下执行定时任务的线程池

如下:

创建一个配置类,用于配置执行定时任务和异步任务的线程池

slf4j.Slf4j;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import t.annotation.Bean;
import t.annotation.Configuration;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.SchedulingConfigurer;
import org.urrent.ThreadPoolTaskExecutor;
import org.urrent.ThreadPoolTaskScheduler;
import org.fig.ScheduledTaskRegistrar;
import org.fig.Task;import urrent.Executor;
import urrent.ThreadPoolExecutor;/*** @author xiaomifeng1010* @version 1.0* @date: 2022/3/13 20:33* @Description 线程池配置*/
@Configuration
@Slf4j
public class ThreadPoolConfig implements AsyncConfigurer,SchedulingConfigurer {/*** 异步任务执行线程池参数*/private static final Integer CORE_POOL_SIZE = 5;private static final Integer MAX_POOL_SIZE = 200;private static final Integer QUEUE_CAPACITY = 2000;private static final String THREAD_NAME_PREFIX = "async-thread-";private static final Integer KEEP_ALIVE_SECONDS = 60;/*** 定时任务线程池线程名前缀*/private static final String SCHEDULER_THEREAD_NAME_PREFIX = "task-";/*** @description: 创建执行spring task定时任务的线程池,调用@scheduled注解的定时任务* @author: xiaomifeng1010* @date: 2022/3/13* @param* @return: TaskScheduler**/@Beanpublic ThreadPoolTaskScheduler threadPoolTaskScheduler() {ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler();threadPoolTaskScheduler.setPoolSize(10);threadPoolTaskScheduler.setThreadNamePrefix(SCHEDULER_THEREAD_NAME_PREFIX);threadPoolTaskScheduler.setWaitForTasksToCompleteOnShutdown(true);return threadPoolTaskScheduler;}/*** @description: 创建执行异步任务的线程池,用于调用 @async注解的方法* @author: xiaomifeng1010* @date: 2022/3/13* @param* @return: ThreadPoolTaskExecutor**/@Bean("asyncThreadPoolTaskExecutor")public ThreadPoolTaskExecutor asyncThreadPoolTaskExecutor() {ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
//        核心线程数量threadPoolTaskExecutor.setCorePoolSize(CORE_POOL_SIZE);
//        最大线程数量threadPoolTaskExecutor.setMaxPoolSize(MAX_POOL_SIZE);
//        队列中最大任务数threadPoolTaskExecutor.setQueueCapacity(QUEUE_CAPACITY);
//        线程名称前缀threadPoolTaskExecutor.setThreadNamePrefix(THREAD_NAME_PREFIX);
//        当达到最大线程数时如何处理新任务(拒绝策略)threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//        线程空闲后最大存活时间threadPoolTaskExecutor.setKeepAliveSeconds(KEEP_ALIVE_SECONDS);
//        初始化线程池threadPoolTaskExecutor.initialize();
//        关闭线程池threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true);return threadPoolTaskExecutor;}/*** Callback allowing a {@link TaskScheduler* TaskScheduler} and specific {@link Task Task}* instances to be registered against the given the {@link ScheduledTaskRegistrar}.** @param taskRegistrar the registrar to be configured.*/@Overridepublic void configureTasks(ScheduledTaskRegistrar taskRegistrar) {taskRegistrar.setTaskScheduler(threadPoolTaskScheduler());}/*** The {@link Executor} instance to be used when processing async* method invocations.*/@Overridepublic Executor getAsyncExecutor() {return asyncThreadPoolTaskExecutor();}/*** The {@link AsyncUncaughtExceptionHandler} instance to be used* when an exception is thrown during an asynchronous method execution* with {@code void} return type.*/@Overridepublic AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {return (throwable, method, objects) -> {("异步任务执行出现异常, message {}, method {}, params {}", throwable, method, objects);};}
}

虽然在执行@Async注解的异步方法时,异步不用自定义线程池,因为springboot会创建一个默认的执行异步任务的线程池,不过一般推荐最好在执行异步任务的时候自定义一下线程池,同时可以声明线程的名称前缀,最好与业务相关,在日志排查的时候是有用的

由于最近的项目中需要执行的异步任务是需要调用第三方服务商的服务接口,所以有可能调用失败,所以还添加了重试机制,而重试机制,spring框架也提供了解决方案,只需要在项目中引入spring-retry

pom文件中加入:

 <dependency><groupId></groupId><artifactId>spring-retry</artifactId></dependency>

需要异步执行的方法以及需要重试的方法 代码如下:

import dulemon.service.BusinessService ;
lemon.base.Stopwatch;
slf4j.Slf4j;
import llections4.CollectionUtils;
import org.apachemons.lang3.BooleanUtils;
import org.apachemons.lang3.StringUtils;
import org.apachemons.lang3.math.NumberUtils;
import annotation.Backoff;
import annotation.Recover;
import annotation.Retryable;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.stereotype.Service;
import org.springframework.util.StopWatch;import java.util.List;
import java.util.Objects;
import urrent.Future;
import urrent.TimeUnit;/*** @author xiaomifeng1010* @version 1.0* @date: 2022/3/9 13:53* @Description*/
@Service
@Slf4j
public class BusinessServiceImpl implements BusinessService {/*** @param enterpriseName 纳税人名称* @param uscc           统一社会信用代码* @description: * @author: xiaomifeng1010* @date: 2022/3/9* @return: **/@Override
//   默认重试3次,时间间隔2秒@Retryable(value = Exception.class, backoff = @Backoff(delay = 2000))public JSONObject obtainData( String enterpriseName, String uscc) {try {//执行http调用第三方服务接口} catch (Exception e) {("出错了,开始重试", e);throw new Message());}}/*** @param resultData     调用第三方服务接口响应的数据* @param enterpriseName* @param uscc           统一社会信用代码* @description: )* @author: xiaomifeng1010* @date: 2022/3/10* @return: Boolean**/@Override@Async("asyncThreadPoolTaskExecutor")public Future<Boolean> explainData( JSONObject resultData, String interfaceCode, String enterpriseName, String uscc) {Integer code = Integer("code");Boolean isSuccess = Boolean("isSuccess");//        不处理基本信息if (xxx) {return new AsyncResult<>(Boolean.TRUE);}
//       状态码code为1,并且success参数为true时,才是响应正常,返回数据if (BooleanUtils.isFalse(isSuccess) || !Objects.(NumberUtils.INTEGER_ONE, code)) {return new AsyncResult<>(Boolean.FALSE);}JSONObject dataJSONObject = JSONObject(DATA);
//        可能返回的数据是空的if (Objects.isNull(dataJSONObject)) {return new AsyncResult<>(Boolean.TRUE);}//        更新之前先清理旧数据deleteOldData(enterpriseName,uscc);if (ainsKey("containList")) {JSONArray dataArray = JSONArray("containList");if (dataArray.size() > 0) {StopWatch stopWatch = new StopWatch();//处理数据...
//                记录耗时stopWatch.start("处理xxx业务数据,并保存");//  处理具体的业务逻辑stopWatch.stop();log.info(stopWatch.prettyPrint());log.info("执行完[{}]任务,耗时{}毫秒,执行了{}项任务", LastTaskName(), TotalTimeMillis(), TaskCount());return new AsyncResult<>(Boolean.TRUE);}return new AsyncResult<>(Boolean.FALSE);}return new AsyncResult<>(Boolean.FALSE);}/*** @description: 批量更新新数据时,先删除旧数据* @author: xiaomifeng1010* @date: 2022/3/14* @param enterpriseName* @param uscc* @return: void**/
public void deleteOldTaxData(String enterpriseName,String uscc){//具体的判断逻辑及删除操作}/*** 重试3次后仍然失败后,执行的方法* @param e* @return*/@Recoverpublic JSONObject recover(Exception e){("重试3次,依然无法获取信息,出错信息:{}",e.getMessage());return new JSONObject();}}

在需要重试的方法上加上 @Retryable(value = Exception.class, backoff = @Backoff(delay = 2000))

那么该方法在执行出现exception的时候,会再次执行,执行了执行的重试次数后,依然失败,则会调用被@Recover注解的方法,注意重试的方法中捕获到了异常后,需要重新抛出,这样才能被

@Recover注解的方法捕获到,进行后续处理 

最后别忘了在启动类上加上需要@EnableAsync  和@EnableRetry注解,表示启用异步执行与重试机制。

还有一个注意点就是使用shedlock,在定时任务的地方加上

@SchedulerLock(name = "recordAndStatistics", lockAtMostFor = "10m", lockAtLeastFor = "5m")

也会使异步执行定时任务失效

    <!-- shedlock: 分布式定时任务锁 --><!-- .javacrumbs.shedlock/shedlock-spring --><dependency><groupId>net.javacrumbs.shedlock</groupId><artifactId>shedlock-spring</artifactId><version>4.39.0</version></dependency><!-- 使用redis做分布式任务 --><!-- .javacrumbs.shedlock/shedlock-provider-redis-spring --><dependency><groupId>net.javacrumbs.shedlock</groupId><artifactId>shedlock-provider-redis-spring</artifactId><version>4.39.0</version></dependency>

 例如这个截图的任务需要在另一个任务开始10分钟后,执行这个任务,先开始执行的任务需要几个小时才能执行完,那么这个截图的任务也是执行不到的,相当于,加了@SchedulerLock(name = "recordAndStatistics", lockAtMostFor = "10m", lockAtLeastFor = "5m")这个注解,定时任务线程调度又变成了单线程,所以这是个坑点。

本文发布于:2024-02-01 00:58:29,感谢您对本站的认可!

本文链接:https://www.4u4v.net/it/170672031132674.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

标签:自定义   线程   项目   springboot
留言与评论(共有 0 条评论)
   
验证码:

Copyright ©2019-2022 Comsenz Inc.Powered by ©

网站地图1 网站地图2 网站地图3 网站地图4 网站地图5 网站地图6 网站地图7 网站地图8 网站地图9 网站地图10 网站地图11 网站地图12 网站地图13 网站地图14 网站地图15 网站地图16 网站地图17 网站地图18 网站地图19 网站地图20 网站地图21 网站地图22/a> 网站地图23