spring batch 之 step总结

阅读: 评论:0

spring batch 之 step总结

spring batch 之 step总结

在spring batch中 job是step运行的框架,而step则是运行具体业务的步骤,以下是step相关的一些总结。

step结构

一个Step通常涵盖三个部分:读数据(Reader)、处理数据(Processor)和写数据(Writer)。但是并不是所有的Step都需要自身来完成数据的处理,比如存储过程等方式是通过外部功能来完成,因此Spring Batch提供了2种Step的处理方式:
1)面向分片的ChunkStep,
2)面向过程的TaskletStep。
一般使用ChunkStep。

ChunkStep

在Step中数据是按记录(按行)处理的,但是每条记录处理完毕之后马上提交事物反而会导致IO的巨大压力。因此Spring Batch提供了数据处理的分片功能。设置了分片之后,一次工作会从Read开始,然后交由给Processor处理。处理完毕后会进行聚合,待聚合到一定的数量的数据之后一次性调用Write将数据提交到物理数据库。
如果在聚合数据期间出现任何错误,所有的这些数据都将不执行写入。

代码示例

@Bean
public Step testStep(PlatformTransactionManager transactionManager) {("testStep").transactionManager(transactionManager).<String, String>chunk(int) .reader(testReader()) .processor(testProcessor).writer(testWriter()) .build();
}

transactionManager:使用默认的 PlatformTransactionManager 对事物进行管理。当配置好事物之后Spring Batch会自动对事物进行管理,无需开发人员显示操作。
processor看具体需求可以不写

提交间隔

Step使用PlatformTransactionManager管理事物。每次事物提交的间隔根据chunk方法中配置的数据执行。提交间隔设置太小,那么会浪费需要多不必要的资源,提交间隔设置的太长,会导致事物链太长占用空间,并且出现失败会导致大量数据回滚。一般设为10到20。

step重启次数

某些Step可能用于处理一些先决的任务,所以当Job再次重启时这Step就没必要再执行,可以通过设置startLimit来限定某个Step重启的次数。当设置为1时候表示仅仅运行一次,而出现重启时将不再执行。

<("testStep").<String, String>chunk(int).reader(testReader()) .processor(testProcessor).writer(testWriter()) .startLimit(1).build();

step每次重启都运行

可以通过设置allow-start-if-complete为true告知框架每次重启该Step都要执行

<("testStep").<String, String>chunk(int).reader(testReader()) .processor(testProcessor).writer(testWriter()) .allowStartIfComplete(true).build();

step失败后跳过

<("testStep").<String, String>chunk(int).reader(testReader()) .processor(testProcessor).writer(testWriter()) .skipLimit(10).skip(Exception.class).noSkip(FileNotFoundException.class).build();

skip-limit(skipLimit方法)配置的参数表示当跳过的次数超过数值时则会导致整个Step失败,从而停止继续运行
skip表示要当捕捉到Exception异常就跳过。但是Exception有很多继承类,此时可以使用noSkip方法指定某些异常不能跳过。

step重试

<("testStep").<String, String>chunk(int).reader(testReader()) .processor(testProcessor).writer(testWriter()) .faultTolerant().retryLimit(3).retry(DeadlockLoserDataAccessException.class).build();

retry(DeadlockLoserDataAccessException.class)表示只有捕捉到该异常才会重试,retryLimit(3)表示最多重试3次,faultTolerant()表示启用对应的容错功能。

step控制不会滚

<("testStep").<String, String>chunk(int).reader(testReader()) .processor(testProcessor).writer(testWriter()) .faultTolerant().noRollback(ValidationException.class) //不必回滚的异常.build();

noRollback属性为Step提供了不必进行事物回滚的异常配置

step数据重读

<("testStep").<String, String>chunk(int).reader(testReader()) .processor(testProcessor).writer(testWriter()) .readerIsTransactionalQueue() //数据重读.build();

默认情况下如果错误不是发生在Reader阶段,那么没必要再去重新读取一次数据。但是某些场景下需要Reader部分也需要重新执行,比如Reader是从一个JMS队列中消费消息,当发生回滚的时候消息也会在队列上重放,因此也要将Reader纳入到回滚的事物中,根据这个场景可以使用readerIsTransactionalQueue来配置数据重读

step事务属性

	//配置事物属性DefaultTransactionAttribute attribute = new DefaultTransactionAttribute();attribute.setPropagationBehavior(Propagation.REQUIRED.value());attribute.setIsolationLevel(Isolation.DEFAULT.value());attribute.setTimeout(30);return ("testStep").<String, String>chunk(int).reader(testReader()) .processor(testProcessor).writer(testWriter()) .transactionAttribute(attribute) //设置事物属性.build();

Step注册 ItemStream

ItemStream是用于每一个阶段(Reader、Processor、Writer)的“生命周期回调数据处理器”。在4.×版本之后默认注入注册了通用的ItemStream。
一是使用stream方法

<("testStep").<String, String>chunk(int).reader(testReader()) .processor(testProcessor).writer(testWriter()) .stream(fileItemWriter1()).build();

二是使用相关方法的代理

	List<ItemWriter> writers = new ArrayList<>(2);writers.add(fileItemWriter1());writers.add(fileItemWriter2());CompositeItemWriter itemWriter = new CompositeItemWriter();itemWriter.setDelegates(writers);return itemWriter;

StepExecution拦截器

在Step执行的过程中会产生各种各样的事件,开发人员可以利用各种Listener接口对Step及Item进行监听。通常在创建一个Step的时候添加拦截器

<("testStep").<String, String>chunk(int).reader(testReader()) .processor(testProcessor).writer(testWriter()) .listener(chunkListener()) //添加拦截器.build();

StepExecutionListener

StepExecutionListener可以看做一个通用的Step拦截器,他的作用是在Step开始之前和结束之后进行拦截处理

public interface StepExecutionListener extends StepListener {void beforeStep(StepExecution stepExecution); //Step执行之前ExitStatus afterStep(StepExecution stepExecution); //Step执行完毕之后
}

ChunkListener

ChunkListener是在数据事物发生的两端被触发。chunk的配置决定了处理多少项记录才进行一次事物提交,ChunkListener的作用就是对一次事物开始之后或事物提交之后进行拦截

public interface ChunkListener extends StepListener {void beforeChunk(ChunkContext context); //事物开始之后,ItemReader调用之前void afterChunk(ChunkContext context); //事物提交之后void afterChunkError(ChunkContext context); //事物回滚之后
}

ItemReadListener

该接口用于对Reader相关的事件进行监控

public interface ItemReadListener<T> extends StepListener {void beforeRead();void afterRead(T item);void onReadError(Exception ex);
}

beforeRead在每次Reader调用之前被调用,afterRead在每次Reader成功返回之后被调用,而onReadError会在出现异常之后被调用,可以将其用于记录异常日志

ItemProcessListener

ItemProcessListener和ItemReadListener类似,是围绕着ItemProcessor进行处理的

public interface ItemProcessListener<T, S> extends StepListener {void beforeProcess(T item); //processor执行之前void afterProcess(T item, S result); //processor直线成功之后void onProcessError(T item, Exception e); //processor执行出现异常
}

ItemWriteListener

ItemWriteListener的功能和ItemReadListener、ItemReadListener类似,但是需要注意的是它接收和处理的数据对象是一个List。List的长度与chunk配置相关

public interface ItemWriteListener<S> extends StepListener {void beforeWrite(List<? extends S> items);void afterWrite(List<? extends S> items);void onWriteError(Exception exception, List<? extends S> items);
}

SkipListener

ItemReadListener、ItemProcessListener和ItemWriteListener都提供了错误拦截处理的机制,但是没有处理跳过(skip)的数据记录。因此框架提供了SkipListener来专门处理那么被跳过的记录

public interface SkipListener<T,S> extends StepListener {void onSkipInRead(Throwable t); //Read期间导致跳过的异常void onSkipInProcess(T item, Throwable t); //Process期间导致跳过的异常void onSkipInWrite(S item, Throwable t); //Write期间导致跳过的异常
}

SkipListener的价值是可以将那些未能成功处理的记录在某个位置保存下来,然后交给其他批处理进一步解决,或者人工来处理。Spring Batch保证以下2个特征:

  • 跳过的元素只会出现一次。
  • SkipListener始终在事物提交之前被调用,这样可以保证监听器使用的事物资源不会被业务事物影响。

TaskletStep

TaskletStep是一个非常简单的接口,仅有一个方法——execute。TaskletStep会反复的调用这个方法直到获取一个RepeatStatus.FINISHED返回或者抛出一个异常。所有的Tasklet调用都会包装在一个事物中。

<("step1").tasklet(myTasklet()) //注入Tasklet的实现.build();

step顺序执行

<("job").start(stepA()).next(stepB()) //顺序执行.next(stepC()).build();

step条件执行

<("job").start(stepA()) //启动时执行的("*").to(stepB()) //默认跳转到stepB.from(stepA()).on("FAILED").to(stepC()) //当返回的ExitStatus为"FAILED"时,执行。.end().build();

Step的停机退出机制

Spring Batch为Job提供了三种退出机制,这些机制为批处理的执行提供了丰富的控制方法。在介绍退出机制之前需要回顾一下 数据批处理概念一文中关于StepExecution的内容。在StepExecution中有2个表示状态的值,一个名为status,另外一个名为exitStatus。前者也被称为BatchStatus。前面以及介绍了ExitStatus的使用,他可以控制Step执行链条的条件执行过程。除此之外BatchStatus也会参与到过程的控制。

End退出

默认情况下(没有使用end、fail方法结束),Job要顺序执行直到退出,这个退出称为end。这个时候,BatchStatus=COMPLETED、ExitStatus=COMPLETED,表示成功执行。除了Step链式处理自然退出,也可以显示调用end来退出系统。

<("job").start(step1()) //启动.next(step2()) //顺序执行.on("FAILED").end().from(step2()).on("*").to(step3()) //条件执行.end().build();

step1到step2是顺序执行,当step2的exitStatus返回"FAILED"时则直接End退出。其他情况执行Step3。

Fail退出

除了end还可以使用fail退出,这个时候,BatchStatus=FAILED、ExitStatus=EARLY TERMINATION,表示执行失败。这个状态与End最大的区别是Job会尝试重启执行新的JobExecution。

<("job").start(step1()) //执行(step2()).on("FAILED").fail() //step2的ExitStatus=FAILED 执行fail.from(step2()).on("*").to(step3()) //否则执行d().build();

在指定的节点中断

Spring Batch还支持在指定的节点退出,退出后下次重启会从中断的点继续执行。中断的作用是某些批处理到某个步骤后需要人工干预,当干预完之后又接着处理

<("job")//如果step1的ExitStatus=COMPLETED则在step2中断.start(step1()).on("COMPLETED").stopAndRestart(step2())//否则直接退出批处理.end().build();

程序化流程的分支

可以直接进行编码来控制Step之间的扭转,Spring Batch提供了JobExecutionDecider接口来协助分支管理

public class MyDecider implements JobExecutionDecider {public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {String status;if (someCondition()) {status = "FAILED";}else {status = "COMPLETED";}return new FlowExecutionStatus(status);}
}

接着将MyDecider作为过滤器添加到配置过程中

<("job").start(step1()).next(decider()).on("FAILED").to(step2()).from(decider()).on("COMPLETED").to(step3()).end().build();

流程分裂

在线性处理过程中,流程都是一个接着一个执行的。但是为了满足某些特殊的需要,Spring Batch提供了执行的过程分裂并行Step的方法。

@Bean
public Job job() {Flow flow1 = new FlowBuilder<SimpleFlow>("flow1").start(step1()).next(step2()).build();//并行流程1Flow flow2 = new FlowBuilder<SimpleFlow>("flow2").start(step3()).build();//并行流程2return ("job").start(flow1).split(new SimpleAsyncTaskExecutor()) //创建一个异步执行任务.add(flow2).next(step4()) //2个分支执行完毕之后再执行step4。.end().build();
}

这里表示flow1和flow2会并行执行,待2者执行成功后执行step4。

数据绑定

在Job或Step的任何位置,都可以获取到统一配置的数据。比如使用标准的Spring Framework方式

@Bean
public FlatFileItemReader flatFileItemReader(@Value("${input.file.name}") String name) {return new FlatFileItemReaderBuilder<Foo>().name("flatFileItemReader").resource(new FileSystemResource(name))...
}

当我们通过配置文件(application.properties中 input.file.name=filepath)或者jvm参数(-Dinput.file.name=filepath)指定某些数据时,都可以通过这种方式获取到对应的配置参数。此外,也可以从JobParameters从获取到Job运行的上下文参数

@StepScope
@Bean
public FlatFileItemReader flatFileItemReader(@Value("#{jobParameters['input.file.name']}") String name) {return new FlatFileItemReaderBuilder<Foo>().name("flatFileItemReader").resource(new FileSystemResource(name))...
}

无论是JobExecution还是StepExecution,其中的内容都可以通过这种方式去获取参数

@StepScope
@Bean
public FlatFileItemReader flatFileItemReader(@Value("#{jobExecutionContext['input.file.name']}") String name) {return new FlatFileItemReaderBuilder<Foo>().name("flatFileItemReader").resource(new FileSystemResource(name))...
}@StepScope
@Bean
public FlatFileItemReader flatFileItemReader(@Value("#{stepExecutionContext['input.file.name']}") String name) {return new FlatFileItemReaderBuilder<Foo>().name("flatFileItemReader").resource(new FileSystemResource(name))...
}

Job Scope

Job Scope的概念和 Step Scope类似,都是用于标识在到了某个执行时间段再添加和注入Bean。@JobScope用于告知框架知道JobInstance存在时候才初始化对应的@Bean

@JobScope
@Bean
// 初始化获取 jobParameters中的参数
public FlatFileItemReader flatFileItemReader(@Value("#{jobParameters[input]}") String name) {return new FlatFileItemReaderBuilder<Foo>().name("flatFileItemReader").resource(new FileSystemResource(name))...
}@JobScope
@Bean
// 初始化获取jobExecutionContext中的参数
public FlatFileItemReader flatFileItemReader(@Value("#{jobExecutionContext中的参数['input.name']}") String name) {return new FlatFileItemReaderBuilder<Foo>().name("flatFileItemReader").resource(new FileSystemResource(name))...
}

本文发布于:2024-02-04 15:29:45,感谢您对本站的认可!

本文链接:https://www.4u4v.net/it/170710564056709.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

标签:spring   batch   step
留言与评论(共有 0 条评论)
   
验证码:

Copyright ©2019-2022 Comsenz Inc.Powered by ©

网站地图1 网站地图2 网站地图3 网站地图4 网站地图5 网站地图6 网站地图7 网站地图8 网站地图9 网站地图10 网站地图11 网站地图12 网站地图13 网站地图14 网站地图15 网站地图16 网站地图17 网站地图18 网站地图19 网站地图20 网站地图21 网站地图22/a> 网站地图23