tags: springbatch
1.引言
前面《數(shù)據(jù)批處理神器-Spring Batch(1)簡介及使用場景》已經(jīng)介紹了Spring Batch
是一個輕量級翔悠,完善的批處理框架峡迷,它使用起來簡單,方便票编,比較適合有點編程基礎(chǔ)(特別是使用Spring及SpringBoot框架)的開發(fā)人員,針對業(yè)務(wù)編程咽白,只需要關(guān)心具體的業(yè)務(wù)實現(xiàn)即可闺魏,把流程以及流程的控制交給Spring Batch
就好。常言道"talk is cheap, show me the code
",下面我們就通過一個簡單的hello world
蜕径,進(jìn)入Spring Batch
的世界怪蔑,通過這個示例,可以快速了解開發(fā)批處理的流程和Spring Batch
開發(fā)用到的組件丧荐,為后續(xù)的操作打下基礎(chǔ)。
2.開發(fā)環(huán)境
- JDK: jdk1.8
- Spring Boot: 2.1.4.RELEASE
- Spring Batch:4.1.2.RELEASE
- 開發(fā)IDE: IDEA
- 構(gòu)建工具M(jìn)aven: 3.3.9
- 日志組件logback:1.2.3
- lombok:1.18.6
3.helloworld開發(fā)
3.1 helloworld說明
本helloworld實現(xiàn)一個非常簡單的功能喧枷,就是從數(shù)據(jù)組中讀取字符串虹统,把字符串轉(zhuǎn)為大寫,然后輸出到控制臺隧甚。如圖:
整個過程就是一個批重任務(wù)(Job
)车荔,它只有一個步驟(Job Step
),步驟里分為三個階段戚扳,讀數(shù)據(jù)(ItemReader)忧便、處理數(shù)據(jù)(ItemProcessor)、寫數(shù)據(jù)(ItemWriter)帽借。
3.2 開發(fā)流程
開發(fā)的主要代碼如下:
總體來說就是珠增,通過Reader
,Processor
砍艾、Writer
完成任務(wù)蒂教,結(jié)束后通過Listener
進(jìn)行監(jiān)聽,整個任務(wù)通過配置(BatchConfig
)進(jìn)行配置脆荷。
3.2.1 創(chuàng)建Spring Boot
工程
直接使用Idea生成或在使用Spring Initializr
生成即可凝垛,此處不詳細(xì)說明。也可以直接使用我的代碼示例蜓谋。當(dāng)前使用的Spring Boot
版本是2.1.4.RELEASE
3.2.2 添加相關(guān)依賴
-
Spring Batch
依賴
在使用spring-boot-starter-parent
的情況下梦皮,直接添加以下依賴即可:
<!-- 批處理框架-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
引用后,會引用兩個jar包桃焕,一個是spring-batch-infrastructure
剑肯,一個是spring-batch-core
,版本是4.1.2.RELEASE
覆旭。分別對應(yīng)的是基礎(chǔ)框架層和核心層退子。
- 添加內(nèi)存數(shù)據(jù)庫H2
Spring Batch
是需要數(shù)據(jù)庫來存儲任務(wù)的基本信息以及運行狀態(tài)的,本例中不需要操作數(shù)據(jù)庫邏輯型将,直接使用內(nèi)存數(shù)據(jù)庫H2即可寂祥。添加以下依賴:
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
</dependency>
- 添加測試及工具類依賴
為了簡化開發(fā),使用lombok
進(jìn)行處理七兜。使用Spring Boot
進(jìn)行單元測試丸凭,添加依賴如下:
<!-- 工具包:lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.6</version>
</dependency>
<!-- 測試框架 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
3.2.3 開發(fā)讀數(shù)據(jù)組件ItemReader
添加完依賴后,就可以進(jìn)入業(yè)務(wù)邏輯編程了。按Spring Batch
的批處理流程惜犀,讀數(shù)據(jù)ItemReader
是第一步铛碑,當(dāng)前示例中,我們的任務(wù)是從數(shù)組中讀取數(shù)據(jù)虽界。ItemReader
是一個接口汽烦,開發(fā)人員直接實現(xiàn)此接口即可。此接口定義了核心方法read()
莉御,負(fù)責(zé)從給定的資源中讀取可用的數(shù)據(jù)撇吞。具體實現(xiàn)如下:
@Slf4j
public class StringReader implements ItemReader<String> {
private String[] messages = {"aaa1","aaa2","aaa3","aaa4"};
private int count = 0;
@Override
public String read() throws UnexpectedInputException, ParseException, NonTransientResourceException {
if(count < messages.length){
String message = messages[count++];
log.debug(LogConstants.LOG_TAG + "read data:"+message);
return message;
}else{
log.debug(LogConstants.LOG_TAG + "read data end.");
count = 0;
}
return null;
}
}
說明:
- (1)StringReader實現(xiàn)ItemReader接口;
- (2)messages是數(shù)據(jù)源礁叔;
- (3)count表示讀取數(shù)據(jù)的下標(biāo)牍颈,每讀一次,下標(biāo)自增琅关,讀取完后返回null表示結(jié)束煮岁。同時把count置為0,以方便下次讀取涣易。
- (4)日志輸出使用的是logback画机,結(jié)合lombok的
@Slf4j
注解,直接可使用log進(jìn)行輸出新症,簡化操作色罚。
3.2.4 開發(fā)處理數(shù)據(jù)組件ItemProcessor
讀取數(shù)據(jù)后,返回的數(shù)據(jù)會流到ItemProcessor
進(jìn)行處理账劲。同樣戳护,ItemProcessor
是一個接口,要實現(xiàn)自己的處理邏輯瀑焦,實現(xiàn)此接口即可腌且。當(dāng)然,如果沒有ItemProcessor
榛瓮,讀到的數(shù)據(jù)直接就到ItemWriter
流程也是可以的铺董。此處,Spring Batch
有一個Chunk
的概念禀晓,用于多次讀精续,直到chunk指定的數(shù)量后,再統(tǒng)一給到processor和writer粹懒,以提高效率重付。本示例對于ItemProcessor
的實現(xiàn)很簡單,即把字符串轉(zhuǎn)為大寫凫乖。如下:
@Slf4j
public class ConvertProcessor implements ItemProcessor<String,String> {
@Autowired
private ConsoleService consoleService;
@Override
public String process(String data) {
String dataProcessed = consoleService.convert2UpperCase(data);
log.debug(LogConstants.LOG_TAG + data +" process data --> " + dataProcessed);
return dataProcessed;
}
}
說明:
- 實現(xiàn)ItemProcessor接口确垫,它有兩個泛型弓颈,分別是I和O,I是讀階段獲取的數(shù)據(jù)删掀,O是提交給寫階段的數(shù)據(jù)翔冀。
- 使用ConsoleService服務(wù),對數(shù)據(jù)進(jìn)行大寫轉(zhuǎn)換披泪,里面的實現(xiàn)直接使用字符串的
toUpperCase()
方法
3.2.5 開發(fā)寫數(shù)據(jù)組件ItemWriter
數(shù)據(jù)處理完后纤子,會統(tǒng)一交給寫組件(ItemWriter
)進(jìn)行寫入。ItemWriter
也是一個接口款票,核心方法是write
方法计福,參數(shù)是數(shù)組。要實現(xiàn)自己的邏輯徽职,實現(xiàn)此接口即可。本示例中佩厚,直接把數(shù)據(jù)輸出到日志中即可姆钉。如下:
@Slf4j
public class ConsoleWriter implements ItemWriter<String> {
@Override
public void write(List<? extends String> list) {
for (String msg :list) {
log.debug(LogConstants.LOG_TAG + "write data: "+msg);
}
}
}
3.2.6 開發(fā)任務(wù)完成后的監(jiān)聽器JobExecutionListener
數(shù)據(jù)寫入到目標(biāo)后,任務(wù)即結(jié)束抄瓦,但有時候我們還需要在任務(wù)結(jié)束時去做一些其它工作潮瓶,如清理數(shù)據(jù),更新時間等钙姊,則需要在任務(wù)完成后進(jìn)行邏輯處理毯辅。Spring Batch
對于任務(wù)或步驟開始和結(jié)束都會提供監(jiān)聽,以便于開發(fā)人員實現(xiàn)監(jiān)聽邏輯煞额。如通過繼承JobExecutionListenerSupport
思恐,可以實現(xiàn)beforeJob
和afterJob
的監(jiān)聽,以實現(xiàn)開始任務(wù)前和結(jié)束任務(wù)后的處理膊毁。當(dāng)前示例中胀莹,僅輸出任務(wù)完成的日志。如下:
@Slf4j
public class ConsoleJobEndListener extends JobExecutionListenerSupport {
@Override
public void afterJob(JobExecution jobExecution) {
if(jobExecution.getStatus() == BatchStatus.COMPLETED){
log.debug("console batch job complete!");
}
}
}
3.2.7 配置完整任務(wù)
經(jīng)過上面的讀婚温、處理描焰、寫、任務(wù)完成后監(jiān)聽的操作栅螟,現(xiàn)在需要把它們組裝在一起荆秦,形成一個完成的任務(wù),使用Spring Boot
力图,簡單的使用幾個配置即可完成任務(wù)的組裝步绸。任務(wù)及其相關(guān)組件的關(guān)系如下:
創(chuàng)建配置文件ConsoleBatchConfig.java
,具體代碼如下:
@Configuration
@EnableBatchProcessing
public class ConsoleBatchConfig {
@Autowired
public JobBuilderFactory jobBuilderFactory;
@Autowired
public StepBuilderFactory stepBuilderFactory;
@Bean
public Job consoleJob(Step consoleStep,JobExecutionListener consoleListener){
String funcName = Thread.currentThread().getStackTrace()[1].getMethodName();
return jobBuilderFactory.get(funcName).listener(consoleListener).flow(consoleStep)
.end().build();
}
@Bean
public Step consoleStep(ItemReader stringReader,ItemProcessor convertProcessor
,ItemWriter consoleWriter, CommonStepListener commonStepListener){
String funcName = Thread.currentThread().getStackTrace()[1].getMethodName();
return stepBuilderFactory.get(funcName).listener(commonStepListener)
.<String,String>chunk(3).reader(stringReader).processor(convertProcessor)
.writer(consoleWriter).build();
}
@Bean
public ItemReader stringReader(){return new StringReader();}
@Bean
public ItemWriter consoleWriter(){return new ConsoleWriter();}
@Bean
public ItemProcessor convertProcessor(){return new ConvertProcessor();}
@Bean
public JobExecutionListener consoleListener(){return new ConsoleJobEndListener();}
}
說明:
- 添加注解
@Configuration及
和@EnableBatchProcessing
吃媒,標(biāo)識為配置及啟用Spring Batch
的配置(可以直接使用JobBuilderFactory
及StepBuilderFactory
分別用于創(chuàng)建Job和Step)靡努。 - 創(chuàng)建
ItemReader
坪圾、ItemWriter
、ItemProcessor
惑朦、Listener
對應(yīng)的Bean
兽泄,以供Step及Job的注入。 - 使用
stepBuilderFactory
創(chuàng)建作業(yè)Step漾月,其中chunk進(jìn)行面向塊的處理病梢,即多次讀取后再寫入,提高效率梁肿。當(dāng)前配置是3個為一個chunk蜓陌。 - 使用
jobBuilderFactory
添加step,創(chuàng)建任務(wù)吩蔑。 - 注意step和Job都需要有對應(yīng)的名稱(
get
方法確定)钮热,此處直接使用方法名作為Job和Step的名稱。
3.2.8 測試批處理
經(jīng)過上面的步驟烛芬,已經(jīng)完成Job的開發(fā)隧期,測試則可使用兩種方式,一個是編寫Controller
赘娄,以接口調(diào)用的方式運行job仆潮,一種編寫單元測試。
- Job的運行
通過JobLauncher
的run
方法來運行任務(wù)遣臼,run
方法參數(shù)分別是Job
和jobParameters
性置,即已配置的Job及job運行的參數(shù)。每個任務(wù)的區(qū)分是通過任務(wù)名(jobName
)和任務(wù)參數(shù)(jobParameters
)作為區(qū)別的揍堰,即如果jobName
和jobParameters
相同鹏浅,Spring Batch
會認(rèn)為是同一任務(wù),若任務(wù)已運行成功屏歹,同一任務(wù)不會再運行篡石。因此,一般來說西采,不同的任務(wù)凰萨,我們的jobParameters
可以直接以時間作為參數(shù),以便于區(qū)別械馆。生成jobParameters
胖眷。代碼如下:
JobParameters jobParameters = new JobParametersBuilder()
.addLong("time",System.currentTimeMillis())
.toJobParameters();
- 編寫單元測試
編寫ConsoleJobTest
,加載job霹崎,運行測試珊搀,如下所示:
@RunWith(SpringRunner.class)
@SpringBootTest(classes = {MainBootApplication.class,ConsoleBatchConfig.class})
@Slf4j
public class ConsoleJobTest {
@Autowired
private JobLauncher jobLauncher;
@Autowired
private Job consoleJob;
public void testConsoleJob2() throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {
//構(gòu)建參數(shù)
JobParameters jobParameters = new JobParametersBuilder()
.addLong("time",System.currentTimeMillis())
.toJobParameters();
//執(zhí)行任務(wù)
JobExecution run = jobLauncher.run(consoleJob, jobParameters);
ExitStatus exitStatus = run.getExitStatus();
log.debug(exitStatus.toString());
}
}
說明:引入SpringBootTest
注解時,需要把Spring Batch
任務(wù)也引入進(jìn)來尾菇。
-
執(zhí)行結(jié)果輸出
執(zhí)行結(jié)果如下圖所示:從輸出可知境析,由于設(shè)置的
chunk
是3囚枪,讀取3個數(shù)據(jù)后,就統(tǒng)一給ItemProcessor
進(jìn)行大寫轉(zhuǎn)換處理劳淆,然后統(tǒng)一交給ItemWriter
進(jìn)行寫入链沼。執(zhí)行完成后,Job的exitCode表示任務(wù)執(zhí)行的狀態(tài)沛鸵,如果正常則為COMPLETED
括勺,失敗則為FAILED
。
4.總結(jié)
經(jīng)過以上的操作步驟曲掰,即可完成批處理操作疾捍。關(guān)于任務(wù)的狀態(tài),流程的步驟(讀栏妖、處理乱豆、寫)均交給Spring Batch
來完成,開發(fā)人員所做的工作是根據(jù)自己的業(yè)務(wù)邏輯編寫具體的讀數(shù)據(jù)吊趾、處理數(shù)據(jù)和寫數(shù)據(jù)即可宛裕。希望通過本文,大家可以對Spring Batch
的組件有清晰的了解趾徽。