背景
在后臺服務(wù)開發(fā)中, 經(jīng)常要用到多線程技術(shù)進(jìn)行加速執(zhí)行, 每家公司都有內(nèi)部多線程的框架, 這些框架不是文檔不規(guī)范, 就是只能適用特定場景.
基于這些原因, spring batch帶來了更易用, 性能更好的解決方案.
基本概念
JobRepository
job倉庫, 提供了JobLauncher, Job, Setp的CRUD實現(xiàn)
JobLauncher
job的啟動器, 可以傳入job所需參數(shù)
Job
一個任務(wù)概念, 可以包含多個step, 且對step的執(zhí)行順序進(jìn)行編排
Step
具體步驟, 基本包含reader, writer, reader后可選processor, 或者使用tesklet
下面用一個圖來說明他們之間的關(guān)系
概念還是挺簡單的, 就是框架有點(diǎn)復(fù)雜, 用起來坑不少
實踐代碼
我這里使用java config形式使用spring batch, 需要額外注意的是, 所有帶有@Bean的方法名不要重復(fù)
- build.gradle
buildscript {
ext {
springBootVersion = '1.5.2.RELEASE'
}
repositories {
mavenCentral()
}
dependencies {
classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
}
}
apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'org.springframework.boot'
apply plugin: 'war'
sourceCompatibility = 1.8
targetCompatibility = 1.8
repositories {
mavenCentral()
}
dependencies {
compile('org.springframework.boot:spring-boot-starter-batch')
}
這里引用了spring boot starter batch, 只是為了解決jar包依賴問題, 實際使用時沒有使用spring boot.
創(chuàng)建任務(wù)使用的obj, TestObj.java
public class TestObj {
private String id;
private int index;
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public int getIndex() {
return index;
}
public void setIndex(int index) {
this.index = index;
}
}
主邏輯BatchConfiguration.java
@EnableBatchProcessing
public class BatchConfiguration {
Object lock = new Object();
Logger logger = Logger.getRootLogger();
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Bean
public Step step1() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(10);
taskExecutor.setMaxPoolSize(10);
taskExecutor.setAllowCoreThreadTimeOut(true);
taskExecutor.afterPropertiesSet();
return stepBuilderFactory.get("step1")
.<TestObj, TestObj> chunk(10)
.reader(new ItemReader<TestObj>() {
private List<TestObj> list = null;
@Override
public synchronized TestObj read() throws Exception {
if (list == null) {
list = new ArrayList<TestObj>();
for (int i = 0; i < 10000; i++) {
TestObj obj = new TestObj();
obj.setId(UUID.randomUUID().toString());
obj.setIndex(i);
list.add(obj);
}
System.out.println("----------------"+list.size());
}
if (!this.list.isEmpty()) {
TestObj t = this.list.remove(0);
logger.info("step1==========read data:" + t.getIndex()));
return t;
}
return null;
}
})
.processor(new ItemProcessor<TestObj, TestObj>() {
public TestObj process(TestObj item) {
logger.debug("step1==============process: " + item.getIndex());
return item;
}
})
.writer(new ItemWriter<TestObj>() {
@Override
public void write(List<? extends TestObj> items) throws Exception {
logger.debug("step1=============write batch start: " + items.size());
for (TestObj item : items) {
logger.debug("step1=============write: " + item.getIndex());
}
logger.info("step1=============write batch end: " + items.size());
}
})
.taskExecutor(taskExecutor)
.build();
}
@Bean
public Step step2() {
return stepBuilderFactory.get("step2").tasklet(new Tasklet() {
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) {
logger.debug("step2========================Tasklet");
return RepeatStatus.FINISHED;
}
}).build();
}
@Bean
public Job job1(Step step1, Step step2) throws Exception {
return jobBuilderFactory.get("job1").incrementer(new RunIdIncrementer()).start(step1).next(step2).build();
}
}
最后是啟動類Main.java
public class Main {
public static void main(String[] args) {
Logger logger = Logger.getRootLogger();
logger.setLevel(Level.INFO);
AnnotationConfigApplicationContext ctx = new AnnotationConfigApplicationContext();
ctx.register(BatchConfiguration.class);
ctx.refresh();
JobLauncher jobLauncher = ctx.getBean(JobLauncher.class);
Job job = (Job)ctx.getBean("job1");
try {
jobLauncher.run(job, new JobParameters());
logger.debug("------job1 finished");
} catch (Exception e) {
e.printStackTrace();
}
}
}
其中需要注意以下幾點(diǎn)
- @EnableBatchProcessing 會默認(rèn)給出一些基礎(chǔ)配置
- JobRepository - bean name "jobRepository"
- JobLauncher - bean name "jobLauncher"
- JobRegistry - bean name "jobRegistry"
- PlatformTransactionManager - bean name "transactionManager"
- JobBuilderFactory - bean name "jobBuilders"
StepBuilderFactory - bean name "stepBuilders"
- 此處使用了多線程進(jìn)行執(zhí)行任務(wù), 其中taskExecutor.setAllowCoreThreadTimeOut(true);表示當(dāng)沒有任務(wù)時(默認(rèn)為60s), 線程池中線程會自動銷毀
- 自定義的ItemReader實現(xiàn)類中的read()方法需要加synchronized(多線程環(huán)境一定要加), 在官方文檔上有提過一嘴, 如果不是使用多線程可以不加, 在官方很多默認(rèn)實現(xiàn)中, 有一些是線程安全的, 有一些則不是, 如果非線程安全, 使用時都需要加上synchronized關(guān)鍵字
- 如果read()方法, 返回null, 則整個任務(wù)結(jié)束.
- chunk(10)表示當(dāng)每次傳入write的list的個數(shù)為10時, write執(zhí)行一次, 為主要的調(diào)優(yōu)方法
- 實際使用中, processor可以去掉
一些說明
spring batch本身有很多功能以及高級特性(比如監(jiān)聽, 任務(wù)流, spring batch admin), 本文中不做展開, 這里只針對最常用情況給出一個可用版本, 在我實際使用過程中, 發(fā)現(xiàn)大多數(shù)文章的例子基本都無法使用或者是使用xml或者不能單獨(dú)執(zhí)行.
很多時候還是要多看官方文檔, 只是官方文檔有點(diǎn)太平鋪直敘了