Spring Boot 下使用 Spring Batch

阅读: 评论:0

Spring Boot 下使用 Spring Batch

Spring Boot 下使用 Spring Batch

公司有个小需求,就是将老平台用户信息和用户的资产信息迁移到新平台上。功能是实现起来是很简单。大概流程:

1.读取老平台用户

2.将读取到老平台用户信息转成新平台用户信息(还有其他的基本信息)的bean。

3.将bean写入到新平台。

 

这时突然想到了Spring Batch 框架,之前了解过,但一直没实践过。上面的需求感觉很适合用这个轻量级框架。

下面记录下,方便以后类似需求使用时参考。

说明:整个项目很小,使用的技术有: Spring Boot + Mybatis + Spring Batch 

因为要读取老平台数据库,插入到新平台数据库,所以这里使用了双数据源配置。

数据库密码是不可逆的,所以密码随机给、用户登录时根据状态提示用户必须修改密码才可以登录。

 

直接上干货:

 

 

	<dependencies><dependency><groupId&batis.spring.boot</groupId><artifactId>mybatis-spring-boot-starter</artifactId><version>1.3.2</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-configuration-processor</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-batch</artifactId></dependency></dependencies>

 

application.properties:

 

 

spring.abled = false 是启动项目不自动执行批任务

 

Spring Boot 启动类:

inex;batis.spring.boot.autoconfigure.MybatisAutoConfiguration;
import org.onfiguration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.boot.autoconfigure.jdbc.DataSourceTransactionManagerAutoConfiguration;@SpringBootApplication
@EnableAutoConfiguration(exclude = {DataSourceAutoConfiguration.class,DataSourceTransactionManagerAutoConfiguration.class, MybatisAutoConfiguration.class})
@EnableBatchProcessing
public class ExUdtsApplication {public static void main(String[] args) {SpringApplication.run(ExUdtsApplication.class, args);}
}

@EnableAutoConfiguration(exclude = {DataSourceAutoConfiguration.class,DataSourceTransactionManagerAutoConfiguration.class, MybatisAutoConfiguration.class})

加这个是因为数据库是双数据源,自动注入时会冲突。所以这里给去掉。

@EnableBatchProcessing     这个别忘记加了

 

 

二个数据源注解配置:

 

fig;import javax.sql.DataSource;import org.apache.ibatis.session.SqlSessionFactory;
batis.spring.SqlSessionFactoryBean;
batis.spring.annotation.MapperScan;
batis.spring.boot.autoconfigure.SpringBootVFS;
import org.springframework.beans.factory.annotation.Qualifier;
import org.t.properties.ConfigurationProperties;
import org.springframework.boot.jdbc.DataSourceBuilder;
import t.annotation.Bean;
import t.annotation.Configuration;
import io.support.PathMatchingResourcePatternResolver;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import ansaction.PlatformTransactionManager;@Configuration
@MapperScan(basePackages= {&#ws.mapper"},sqlSessionFactoryRef="newSqlSessionFactory")
public class NewDataSourceConfig {@Bean(name = "newDataSource")@ConfigurationProperties(&#ws")public DataSource newDataSource() {ate().build();}@Bean(name = "newSqlSessionFactory")public SqlSessionFactory sqlSessionFactory(@Qualifier("newDataSource") DataSource dataSource) throws Exception {SqlSessionFactoryBean sessionFactoryBean = new SqlSessionFactoryBean();sessionFactoryBean.setDataSource(dataSource);sessionFactoryBean.setVfs(SpringBootVFS.class);sessionFactoryBean.setTypeAliasesPackage(&#ws.ity");sessionFactoryBean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources("classpath*:mapper/news/*.xml"));Object();}@Bean(name = "newTransactionManager")public PlatformTransactionManager prodTransactionManager(@Qualifier("newDataSource") DataSource dataSource) {return new DataSourceTransactionManager(dataSource);}
}
fig;import javax.sql.DataSource;import org.apache.ibatis.session.SqlSessionFactory;
batis.spring.SqlSessionFactoryBean;
batis.spring.annotation.MapperScan;
batis.spring.boot.autoconfigure.SpringBootVFS;
import org.springframework.beans.factory.annotation.Qualifier;
import org.t.properties.ConfigurationProperties;
import org.springframework.boot.jdbc.DataSourceBuilder;
import t.annotation.Bean;
import t.annotation.Configuration;
import t.annotation.Primary;
import io.support.PathMatchingResourcePatternResolver;@Configuration
@MapperScan(basePackages= {&#inex.olds.mapper"},sqlSessionFactoryRef="oldSqlSessionFactory")
public class OldDataSourceConfig {@Primary@Bean(name = "oldDataSource")@ConfigurationProperties("datasource.olds")public DataSource oldDataSource() {ate().build();}@Bean(name = "oldSqlSessionFactory")public SqlSessionFactory sqlSessionFactory(@Qualifier("oldDataSource") DataSource dataSource) throws Exception {SqlSessionFactoryBean sessionFactoryBean = new SqlSessionFactoryBean();sessionFactoryBean.setDataSource(dataSource);sessionFactoryBean.setVfs(SpringBootVFS.class);sessionFactoryBean.setTypeAliasesPackage(&#inex.olds.ity");sessionFactoryBean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources("classpath*:mapper/olds/*.xml"));Object();}
}

sessionFactoryBean.setVfs(SpringBootVFS.class);  如果不加这个 把项目打成jar包运行时,会报mybatis找不到定义的别名类。

 

newTransactionManager  定义这个事物是为了让写入新库时用的。操作老平台库 和 新平台库不可能在一个事物内进行。

 

 

下面是Spring Batch 相关的代码:

首先简单说明下Spring Batch的几个组成部分:

一个Job有1个或多个Step组成,Step有读、处理、写三部分操作组成;通过JobLauncher启动Job,启动时从JobRepository获取Job Execution;当前运行的Job及Step的结果及状态保存在JobRepository中。

 

 

BatchConfig

fig;import javax.batch.api.chunk.ItemReader;
import javax.sql.DataSource;import org.Job;
import org.JobExecutionListener;
import org.Step;
import org.onfiguration.annotation.JobBuilderFactory;
import org.onfiguration.annotation.StepBuilderFactory;
import org.launch.support.RunIdIncrementer;
import org.launch.support.SimpleJobLauncher;
import org.epository.JobRepository;
import org.epository.support.JobRepositoryFactoryBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import t.annotation.Bean;
import t.annotation.Configuration;
import ansaction.PlatformTransactionManager;listener.JobCompletionListener;
setp.NewUserWriter;
setp.OldUserReader;
setp.UserProcessor;
ws.domain.UserMemberDo;
inex.olds.domain.FuserDo;@Configuration
public class UserBatchConfig {@Autowiredpublic JobBuilderFactory jobBuilderFactory;@Autowiredpublic StepBuilderFactory stepBuilderFactory;/*** 创建JobRepository,用来注册Job的容器* @param dataSource* @param transactionManager* @return* @throws Exception*/
/*    @Bean(name="myJobRepository")public JobRepository jobRepository(DataSource dataSource, PlatformTransactionManager transactionManager) throws Exception {JobRepositoryFactoryBean jobRepositoryFactoryBean =new JobRepositoryFactoryBean();jobRepositoryFactoryBean.setDataSource(dataSource);jobRepositoryFactoryBean.setTransactionManager(transactionManager);jobRepositoryFactoryBean.setDatabaseType("mysql");Object();}*//*** 创建 JobLauncher,用来启动Job的接口* @param dataSource* @param transactionManager* @return* @throws Exception*/
/*    @Bean(name="myJobLauncher")public SimpleJobLauncher jobLauncher(DataSource dataSource,PlatformTransactionManager transactionManager)throws Exception{SimpleJobLauncher jobLauncher = new SimpleJobLauncher();jobLauncher.setJobRepository(jobRepository(dataSource, transactionManager));return jobLauncher;}*//*** 创建Job* @return*/@Beanpublic Job processJob() {("processJob").incrementer(new RunIdIncrementer()).listener(listener()).flow(userStep()).end().build();}/*** 创建Step* @return*/@Beanpublic Step userStep() {("userStep").<FuserDo,UserMemberDo>chunk(100).reader(reader()).processor(processor()).writer(writer()).build();}/*** 创建Reader* @return*/@Beanpublic OldUserReader reader() {return  new OldUserReader();}/*** 创建Processor* @return*/@Beanpublic UserProcessor processor() {return new UserProcessor();}/*** 创建Writer* @return*/@Beanpublic NewUserWriter writer() {return new NewUserWriter();}/*** 创建Listener* @return*/@Beanpublic JobExecutionListener listener() {return new JobCompletionListener();}}

 

 

Reader:

setp;import java.util.List;import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.NonTransientResourceException;
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.UnexpectedInputException;
import org.springframework.beans.factory.annotation.Autowired;inex.olds.domain.FuserDo;
ity.VritualWallet;
inex.olds.service.FuserService;public class OldUserReader implements ItemReader<FuserDo> {@AutowiredFuserService fuserService;@Overridepublic FuserDo read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {FuserDo fuserDo =  PendingUser();if(fuserDo != null) {//查询用户资产List<VritualWallet> wallets = fuserService.Fid());fuserDo.setWallets(wallets);fuserService.Fid());}return fuserDo;}}

 

 

Processor:

setp;import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;import org.springframework.batch.item.ItemProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;inexmon.UuidUtil;
ws.domain.UserMemberDo;
ity.CoinInfor;
ity.UserAssetInfo;
ity.UserMemberExtend;
ws.service.UserMemberService;
inex.olds.domain.FuserDo;
ity.VritualWallet;public class UserProcessor implements ItemProcessor<FuserDo, UserMemberDo> {@Value("${custom.houseId}")private Long houseId;@Value("${custom.levelId}")private Long levelId;@Autowiredprivate UserMemberService userMemberService;@Overridepublic UserMemberDo process(FuserDo oldUser) throws Exception {//将老用户有用的信息转转换到新用户实体上,密码随机UserMemberDo memberDo = new UserMemberDo();memberDo.Uuid());memberDo.FloginName());memberDo.FloginName());memberDo.setHouseId(houseId);memberDo.setApplyTime(new Date());memberDo.setEmailStatus(1);memberDo.setIdentifiStatus(0);memberDo.setLastUpdate(new Date());memberDo.setStatus(1);memberDo.setType(2);memberDo.setVersion(0);memberDo.setUserLevel(0);memberDo.setLevelId(levelId);memberDo.setFeeSwitch(0);//为新用户生成所有币种资产信息,并将老用户资产转换到新用户资产上List<CoinInfor> coinInfors = userMemberService.finCoinInfoByHouseId(houseId);List<UserAssetInfo> assetInfos = new ArrayList<UserAssetInfo>();List<VritualWallet> wallets = Wallets();//这里有一个问题,就是当老平台有的币、新平台确没有这个币,则会导致用户这个币的资产落空!!!coinInfors.forEach( coinInfor -> {UserAssetInfo assetInfo = new UserAssetInfo();assetInfo.Uuid());assetInfo.setHouseId(houseId);assetInfo.Id());assetInfo.setFrozen(new BigDecimal(0));assetInfo.setLastTime(new Date());assetInfo.Id());assetInfo.Username());assetInfo.setTotal(getOldTotal(SortName()));assetInfo.setVersion(0);assetInfo.SortName());assetInfos.add(assetInfo);});memberDo.setAssetInfos(assetInfos);//用户扩展表UserMemberExtend memberExtend = new UserMemberExtend();memberExtend.setHouseId(houseId);memberExtend.Id());memberExtend.setCreateTime(new Date());memberExtend.setDealStatus(1);memberExtend.setRechargeStatus(1);memberExtend.setWithdrawStatus(1);memberExtend.setGooleStauts(0);memberExtend.setLoginOpenGA(0);memberExtend.setWithdrawOpenGA(0);memberExtend.setNoteAuthStatus(0);memberExtend.Status());memberExtend.Type());memberExtend.setWithdrawOpenNote(0);memberExtend.Id()+"");memberExtend.setRegisterPath(2);memberExtend.setLastLogin(new Date());memberExtend.setInvalidPassword(1);memberDo.setMemberExtend(memberExtend);return memberDo;}/*** 获取老平台用户资产total* @param wallets* @param sortName* @return*/private BigDecimal getOldTotal(List<VritualWallet> wallets,String sortName) {for(VritualWallet wallet : wallets) {im().CoinName().trim())) {Total();}}return new BigDecimal(0);}}

 

Writer:

setp;import java.util.List;
import urrent.atomic.AtomicLong;import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Autowired;ws.domain.UserMemberDo;
ws.service.UserMemberService;public class NewUserWriter implements ItemWriter<UserMemberDo> {@Autowiredprivate UserMemberService userMemberService;private AtomicLong counter = new AtomicLong(0);  @Overridepublic void write(List<? extends UserMemberDo> userMemberDos) throws Exception {userMemberDos.forEach( user -> {userMemberService.save(user);counter.incrementAndGet();});System.out.println("一个同步了:"+counter);}}

 

 

这里的save方法上要用newTransactionManager 事物 如:@Transactional(propagation=Propagation.REQUIRES_NEW,transactionManager="newTransactionManager")。要不save方法体会没事物的。注意,如果在save这块报错,Reader 和 Processos整个一批操作都会被回滚掉,因为整个操作在在old事物中进行的(数据源那默认设置oldDataSource)。当然,如果save方法执行完后框架内部报错了,那就会出现Reader 被回滚,但save的数据还是插入了,这是因为用了双数据源导致的。如果整个处理业务在一个数据源环境下操作Spring Batch会保证一批处理在一个事物内完成。(注:这里的一批是 前面batch config配置的  chunk(100))

 

 

Listeners:

listener;import org.BatchStatus;
import org.JobExecution;
import org.listener.JobExecutionListenerSupport;public class JobCompletionListener extends JobExecutionListenerSupport {@Overridepublic void afterJob(JobExecution jobExecution) {if(BatchStatus.COMPLETED == Status()) {System.out.println("old2new 任务执行完毕");}}@Overridepublic void beforeJob(JobExecution jobExecution) {System.out.println("old2new 任务开始执行....");}}

 

 

最后还要创建几张Spring Beach 内部用的几张表:

sql文件在 spring-batch-core-4.0.1 REEASE.jar 包里。根据自己的数据库去执行就可以了。

 

 

测试执行:

 

inex;import org.junit.Test;
import org.junit.runner.RunWith;
import org.Job;
import org.JobParameters;
import org.JobParametersBuilder;
import org.JobParametersInvalidException;
import org.launch.JobLauncher;
import org.epository.JobExecutionAlreadyRunningException;
import org.epository.JobInstanceAlreadyCompleteException;
import org.epository.JobRestartException;
import org.springframework.beans.factory.annotation.Autowired;
import org.st.context.SpringBootTest;
import st.context.junit4.SpringJUnit4ClassRunner;@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest
public class BathTest {@AutowiredJobLauncher jobLauncher;@AutowiredJob processJob;@Testpublic void excutorTest() throws JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException, JobParametersInvalidException {JobParameters jobParameters = new JobParametersBuilder().addLong("time", System.currentTimeMillis()).toJobParameters();jobLauncher.run(processJob, jobParameters);}
}

 

本文发布于:2024-01-28 02:57:39,感谢您对本站的认可!

本文链接:https://www.4u4v.net/it/17063818634279.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

标签:Spring   Boot   Batch
留言与评论(共有 0 条评论)
   
验证码:

Copyright ©2019-2022 Comsenz Inc.Powered by ©

网站地图1 网站地图2 网站地图3 网站地图4 网站地图5 网站地图6 网站地图7 网站地图8 网站地图9 网站地图10 网站地图11 网站地图12 网站地图13 网站地图14 网站地图15 网站地图16 网站地图17 网站地图18 网站地图19 网站地图20 网站地图21 网站地图22/a> 网站地图23