一、背景
在Java开发中,经常需要对十万、百万乃至千万级别的List进行各种处理后再入库。使用普通单线程处理不仅耗时长,还经常出现GC问题。针对此问题,我查阅了网上很多资料,其中大多使用多线程来处理;但照着这些博客去实现,要么存在线程安全问题,要么速度根本提不上去。我针对自己项目中的使用场景,结合这些资料进行了修改,特撰此文,为有共同需求的小伙伴提供一点建议。
————————————————
开发过程中,合理地使用线程池可以带来3个好处:
降低资源消耗:通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
提高响应速度:当任务到达时,任务可以不需要等到线程创建就能立即执行。
提高线程的可管理性:线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控。
————————————————
二、代码如下:
package org.hzero.gdh.di.app.service.impl;import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.hscs.hsdi.app.service.handle.ItfDataAcceptingHandle;
import auth.DetailsHelper;
slf4j.Slf4j;
import org.hzero.boot.platform.ds.DatasourceHelper;
import org.hzero.boot.platform.ds.vo.DatasourceVO;
import org.hzero.boot.platform.lov.adapter.LovAdapter;
import org.hzero.boot.platform.lov.dto.LovValueDTO;
import org.hzero.boot.platform.profile.ProfileClient;
import base.BaseConstants;
import edis.RedisHelper;
import org.hzero.gdh.di.app.service.AccountsReceivableService;
import org.hzero.gdh.di.app.service.SchedulerLogService;
import org.hzero.gdh.ity.AccountsReceivable;
import org.hzero.gdh.pository.SchedulerLogRepository;
import org.hzero.gdh.di.infra.util.DatasourceUtils;
import org.hzero.jdbc.Query;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;SimpleDateFormat;
import java.util.*;
import urrent.LinkedBlockingQueue;
import urrent.ThreadPoolExecutor;
import urrent.TimeUnit;@Service
@Slf4j
public class AccountsReceivableServiceImpl implements AccountsReceivableService {protected static final Logger logger = Logger(AccountsReceivableServiceImpl.class);@Autowiredpublic SchedulerLogRepository schedulerLogRepository;@Autowiredpublic SchedulerLogService schedulerLogService;@Autowiredprivate ItfDataAcceptingHandle itfDataAcceptingHandle;@Autowiredprivate DatasourceHelper datasourceHelper;@Autowiredprivate ProfileClient profileClient;@Autowiredprivate LovAdapter lovAdapter;@Autowired@Qualifier("redisHelper")private RedisHelper redisHelper;@Overridepublic void incrementGrabData(Long tenantId, String scheduleCode) {// 获取当前系统用户id,用户姓名Long userId = UserDetails().getUserId();String username = UserDetails().getUsername();String sourceSystem = "YSXT";String interfaceCode = "BILL_INTERFACE";// 初始化同步总数int dataCount = 0;// 定义或初始化属性和对象JSONObject resultJson = new JSONObject();// 获取当前时间点Date date = new Date();SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");String currentTime = simpleDateFormat.format(date);// 获取上次调用时间点String lastInvokeTime = simpleDateFormat.LastInvokeTime(scheduleCode, tenantId));// 遍历数据源值集List<LovValueDTO> lovValueDTOList = lovAdapter.queryLovValue("GDH.GOURP_CODE_RESOURCE", BaseConstants.DEFAULT_TENANT_ID);// 获取最大数据量String batchCount = ProfileValueByOptions("HSCS.ACCOUNTS_RECEIVABLE_MAX");int maxCount = Integer.parseInt(batchCount);/** ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,long keepAliveTime, TimeUnit unit,BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler)corePoolSize:线程池维护线程的最少数量maximumPoolSize:线程池维护线程的最大数量keepAliveTime: 线程池维护线程所允许的空闲时间unit: 线程池维护线程所允许的空闲时间的单位workQueue: 线程池所使用的缓冲队列handler: 线程池对拒绝任务的处理策略**/// 获取cpu核数(IO密集型:核心线程数 = CPU核数 * 2; CPU密集型:CPU核数 + 1)int threadNum = Runtime().availableProcessors() + 1;// 构造一个线程池(用getTask()方法重复利用核心线程来执行任务)ThreadPoolExecutor executor = new ThreadPoolExecutor(threadNum, threadNum, 0L,TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new 
ThreadPoolExecutor.DiscardOldestPolicy());try {for (LovValueDTO lovValueDTO : lovValueDTOList) {// 获取平台数据源明细配置DatasourceVO datasourceVO = Datasource("DI", tenantId, Tag());// 调用工厂类构建查询器Query query = DatasourceUtils.obtainDatasourceDetail(datasourceVO);// 执行sql语句拼装String countSqlText = accountsReceivableCountSql(lastInvokeTime);// 获取当前数据源需要同步的数据总数Long queryCount = query.queryCount(countSqlText);int count = Integer.parseInt(String.valueOf(queryCount));if (count == 0) {continue;}// 获取需要并发遍历的次数int value = count / maxCount;// 记录多数据源总数dataCount = dataCount + count;// 遍历每一个数据源的数据,每次只查询导入固定大小的数据(batchCount)for (int i = 0; i <= value; i++) {// 执行sql语句拼装:intern()方法会把string写到常量池String infoSqlText = accountsReceivableInfoSql(lastInvokeTime, currentTime,maxCount * i, maxCount * (i + 1)).intern();// 放在要检测的代码段前,取开始前的时间戳Long startTime = System.currentTimeMillis();/*** new Runnable() {* @Override* public void run() {}* }* 相当于lambda表达式:() -> {}*/ute(() -> {synchronized (infoSqlText) {try {// 随机获取puuidString puuid = UUID.randomUUID().toString().replaceAll("-", "");// 根据上次调用时间和当前时间增量拉取oracle数据库单次的应收账单信息Map<String, Object> objectMap = query.query(infoSqlText);List<AccountsReceivable> accountsReceivableList = JSONObject.JSONString(objectMap)).getJSONArray("DATA").toJavaList(AccountsReceivable.class);// 把每一条数据根据最后更新时间的月份值设置一个分库字段值标识accountsReceivableList.forEach(accountsReceivable -> {Calendar calendar = Instance();calendar.BusinessDate());String month = String.(Calendar.MONTH) + 1);accountsReceivable.setPartionValue(month);});JSONArray jsonArray = JSONArray.JSONStringWithDateFormat(accountsReceivableList, "yyyy-MM-dd"));// 将数据封装成一个jsonArray,之后传入数据集成服务resultJson.put("context", jsonArray);resultJson.put("batchNum", puuid);resultJson.put("sourceSystem", sourceSystem);resultJson.put("interfaceCode", interfaceCode);try {if (!jsonArray.isEmpty()) {Long startTime2 = System.currentTimeMillis();// 接入数据集成服务平台itfDataAcceptingHandle.outDataImportJsonTartgetTable(resultJson, tenantId, userId, 
username);logger.info("数据导入运行时间: " + (System.currentTimeMillis() - startTime2) / 1000L + "秒");}} catch (InterruptedException e) {logger.info("数据导入异常", e.getMessage());e.printStackTrace();}} catch (Exception e) {("线程运行异常", e.getMessage());e.printStackTrace();}}// 打印出当前线程logger.info(Thread.currentThread().getName() + " - executor");});// 计算并打印耗时logger.info("当前线程运行时间: " + (System.currentTimeMillis() - startTime) + "ms");}}} catch (Exception e) {("数据库连接发生错误", e.getMessage());e.printStackTrace();}//关闭线程池executor.shutdown();logger.info("所有线程任务执行结束");while (true) {//等待所有任务都执行结束if (executor.isTerminated()) {//所有的子线程都结束了// 当前调度任务没有记录时,记录本次调度任务的详情。否则只更新上次调用时间schedulerLogService.updateLastInvokeTime(scheduleCode, tenantId, date,true, "dataCount:" + dataCount);break;}}}@Overridepublic String accountsReceivableCountSql(String lastInvokeTime) {// select count(BILL_ID) from WATER_FCT.gdh_water_bill where 1=1// and last_update_date >= to_date('lastInvokeTime','yyyy-mm-dd hh24:mi:ss')// order by BILL_ID;
// String sqlStr = "select BILL_ID from water_fct.gdh_water_bill where 1=1";String countSql = ProfileValueByOptions("GSDI.SELECTCOUNTSQL");StringBuilder value = new StringBuilder(countSql);value.append(" and last_update_date >= to_date").append("(").append("'").append(lastInvokeTime).append("'").append(",").append("'").append("yyyy-mm-dd hh24:mi:ss").append("'").append(")").append(" order by BILL_ID");String resultSql = String();return resultSql;}@Overridepublic String accountsReceivableInfoSql(String lastInvokeTime, String currentTime, int pageIndex, int pageSize) {// select * from (select gwb.*,rownum r from (select * from water_fct.gdh_water_bill)gwb where 1=1// last_update_date >= to_date('lastInvokeTime','yyyy-mm-dd hh24:mi:ss')// last_update_date >= to_date('currentTime','yyyy-mm-dd hh24:mi:ss')// and rownum <= pageSize) where r > pageIndex;
// String sqlStr = "select * from (select gwb.*,rownum r from (select * from water_fct.gdh_water_bill order by BILL_ID)gwb where 1=1";String infoSql = ProfileValueByOptions("GSDI.SELECTINFOSQL");StringBuilder value = new StringBuilder(infoSql);value.append(" and last_update_date >= to_date").append("(").append("'").append(lastInvokeTime).append("'").append(",").append("'").append("yyyy-mm-dd hh24:mi:ss").append("'").append(")").append(" and last_update_date <= to_date").append("(").append("'").append(currentTime).append("'").append(",").append("'").append("yyyy-mm-dd hh24:mi:ss").append("'").append(")").append(" and rownum <= ").append(pageSize).append(")").append(" where r > ").append(pageIndex);String resultSql = String();return resultSql;}}
————————————————
三、ThreadPoolExecutor执行过程
1.当线程数小于核心线程数时,创建线程。
2.当线程数大于等于核心线程数,且任务队列未满时,将任务放入任务队列。
3.当线程数大于等于核心线程数,且任务队列已满时:
(1)若线程数小于最大线程数,创建新线程执行任务。
(2)若线程数等于最大线程数,则执行拒绝策略(默认的AbortPolicy会抛出RejectedExecutionException,拒绝任务)。
本文发布于:2024-02-02 20:34:38,感谢您对本站的认可!
本文链接:https://www.4u4v.net/it/170687727646280.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
留言与评论(共有 0 条评论) |