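1. Introduction
By default, a Spring Batch job fails on any error raised during its execution. Sometimes, however, we want to make the application more resilient by handling intermittent failures.
In this short article, we'll explore how to configure retry logic in the Spring Batch framework.
2. A Simple Example
Suppose we have a batch job that reads a CSV file as input: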
username, userid, transaction_date, transaction_amount
sammy, 1234, 31/10/2015, 10000
john, 9999, 3/12/2015, 12321
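The article does not show how each line becomes a Transaction, so the following is only a minimal plain-Java sketch of that mapping, not the Spring Batch reader the job actually uses. The nested Transaction class, its field names, and the parse helper are assumptions made for illustration. The date pattern d/M/yyyy is chosen because the sample rows contain both 31/10/2015 and 3/12/2015.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

class CsvLineSketch {

    // Hypothetical stand-in for the article's Transaction type.
    static final class Transaction {
        final String username;
        final int userId;
        final LocalDate transactionDate;
        final double amount;

        Transaction(String username, int userId, LocalDate transactionDate, double amount) {
            this.username = username;
            this.userId = userId;
            this.transactionDate = transactionDate;
            this.amount = amount;
        }
    }

    // Day-first pattern; single letters accept one- or two-digit day and month.
    private static final DateTimeFormatter DATE = DateTimeFormatter.ofPattern("d/M/yyyy");

    // Splits one data line on commas and trims the padding around each field.
    static Transaction parse(String line) {
        String[] fields = line.split(",");
        if (fields.length != 4) {
            throw new IllegalArgumentException("Expected 4 fields but got: " + line);
        }
        return new Transaction(
                fields[0].trim(),
                Integer.parseInt(fields[1].trim()),
                LocalDate.parse(fields[2].trim(), DATE),
                Double.parseDouble(fields[3].trim()));
    }

    public static void main(String[] args) {
        Transaction t = parse("john, 9999, 3/12/2015, 12321");
        System.out.println(t.username + " " + t.userId + " " + t.transactionDate + " " + t.amount);
    }
}
```

The header line would need to be skipped before calling parse, since its second field is not a number.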
It then processes each record by calling a REST endpoint to fetch the user's age and postCode attributes:
public class RetryItemProcessor implements ItemProcessor<Transaction, Transaction> {

    @Override
    public Transaction process(Transaction transaction) throws IOException {
        log.info("RetryItemProcessor, attempting to process: {}", transaction);
        HttpResponse response = fetchMoreUserDetails(transaction.getUserId());
        // parse user's age and postCode from response and update transaction
        ...
        return transaction;
    }
    ...
}
Finally, it generates a consolidated output XML:
<transactionRecord>
    <transactionRecord>
        <amount>10000.0</amount>
        <transactionDate>2015-10-31 00:00:00</transactionDate>
        <userId>1234</userId>
        <username>sammy</username>
        <age>10</age>
        <postCode>430222</postCode>
    </transactionRecord>
    ...
</transactionRecord>
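3. Adding Retries to the ItemProcessor
Now, what if the connection to the REST endpoint times out because of a slow network? If that happens, our batch job fails.
In such cases, we'd like the failed item to be processed again a few times. So let's configure the batch job to make at most three attempts per item when a failure occurs: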
@Bean
public Step retryStep(
  ItemProcessor<Transaction, Transaction> processor,
  ItemWriter<Transaction> writer) throws ParseException {
    return stepBuilderFactory
      .get("retryStep")
      .<Transaction, Transaction>chunk(10)
      .reader(itemReader(inputCsv))
      .processor(processor)
      .writer(writer)
      .faultTolerant()
      .retryLimit(3)
      .retry(ConnectTimeoutException.class)
      .retry(DeadlockLoserDataAccessException.class)
      .build();
}
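Here, the call to faultTolerant() enables the retry functionality. In addition, retry and retryLimit define, respectively, the exceptions that qualify for a retry and the maximum number of attempts for an item.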
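To make the interplay of the attempt limit and the retryable exception types concrete, here is a minimal plain-Java sketch of the idea. It is not Spring Batch's implementation: callWithRetry, TimeoutFailure, and the class name are hypothetical, and the sketch uses an unchecked exception to stay short, whereas ConnectTimeoutException is a checked IOException.

```java
import java.util.List;
import java.util.function.Supplier;

class RetryLimitSketch {

    // Hypothetical stand-in for a retryable failure such as a connection timeout.
    static class TimeoutFailure extends RuntimeException {
        TimeoutFailure(String message) {
            super(message);
        }
    }

    /**
     * Calls the action up to maxAttempts times. Only exceptions that are instances
     * of one of the retryable types lead to another attempt; any other exception,
     * or a retryable one on the last allowed attempt, is rethrown to the caller.
     */
    static <T> T callWithRetry(Supplier<T> action, int maxAttempts,
                               List<Class<? extends RuntimeException>> retryable) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        for (int attempt = 1; ; attempt++) {
            try {
                return action.get();
            } catch (RuntimeException e) {
                boolean canRetry = retryable.stream().anyMatch(type -> type.isInstance(e));
                if (!canRetry || attempt >= maxAttempts) {
                    throw e;
                }
            }
        }
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Fails on the first two calls and succeeds on the third.
        String result = callWithRetry(() -> {
            if (++calls[0] < 3) {
                throw new TimeoutFailure("Timeout count " + calls[0]);
            }
            return "ok";
        }, 3, List.of(TimeoutFailure.class));
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

With a limit of 3, an action that fails twice still succeeds, while one that fails a third time surfaces the exception, which in the batch job corresponds to the step failing.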
4. Testing the Retries
Suppose we have a test scenario in which the REST endpoint that returns age and postCode is down for a while. In this scenario, only the first two API calls throw a ConnectTimeoutException, and the third call succeeds:
@Test
public void whenEndpointFailsTwicePasses3rdTime_thenSuccess() throws Exception {
    FileSystemResource expectedResult = new FileSystemResource(EXPECTED_OUTPUT);
    FileSystemResource actualResult = new FileSystemResource(TEST_OUTPUT);

    when(httpResponse.getEntity())
      .thenReturn(new StringEntity("{ \"age\":10, \"postCode\":\"430222\" }"));

    // fails for first two calls and passes third time onwards
    when(httpClient.execute(any()))
      .thenThrow(new ConnectTimeoutException("Timeout count 1"))
      .thenThrow(new ConnectTimeoutException("Timeout count 2"))
      .thenReturn(httpResponse);

    JobExecution jobExecution = jobLauncherTestUtils
      .launchJob(defaultJobParameters());
    JobInstance actualJobInstance = jobExecution.getJobInstance();
    ExitStatus actualJobExitStatus = jobExecution.getExitStatus();

    assertThat(actualJobInstance.getJobName(), is("retryBatchJob"));
    assertThat(actualJobExitStatus.getExitCode(), is("COMPLETED"));
    AssertFile.assertFileEquals(expectedResult, actualResult);
}
Here, our job completes successfully. In addition, the logs show that the first record, with id=1234, failed twice and finally succeeded on the third attempt:
19:06:57.742 [main] INFO o.s.batch.core.job.SimpleStepHandler - Executing step: [retryStep]
19:06:57.758 [main] INFO o.b.batch.service.RetryItemProcessor - Attempting to process user with id=1234
19:06:57.758 [main] INFO o.b.batch.service.RetryItemProcessor - Attempting to process user with id=1234
19:06:57.758 [main] INFO o.b.batch.service.RetryItemProcessor - Attempting to process user with id=1234
19:06:57.758 [main] INFO o.b.batch.service.RetryItemProcessor - Attempting to process user with id=9999
19:06:57.773 [main] INFO o.s.batch.core.step.AbstractStep - Step: [retryStep] executed in 31ms
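The attempt order seen in the log can be reproduced without Mockito or Spring by a small hand-rolled stub. The sketch below is a simplification under stated assumptions: FlakyEndpoint and attemptTrace are hypothetical names, an unchecked exception stands in for the timeout, and the chunk rollback and re-processing that Spring Batch performs around a retry are not modeled.

```java
import java.util.ArrayList;
import java.util.List;

class FlakyEndpointSketch {

    // Hypothetical stub: the first failuresBeforeSuccess calls time out, later calls succeed.
    static class FlakyEndpoint {
        private int remainingFailures;

        FlakyEndpoint(int failuresBeforeSuccess) {
            this.remainingFailures = failuresBeforeSuccess;
        }

        String fetch(int userId) {
            if (remainingFailures > 0) {
                remainingFailures--;
                throw new IllegalStateException("Timeout for user " + userId);
            }
            return "{ \"age\":10, \"postCode\":\"430222\" }";
        }
    }

    /** Processes each id with at most maxAttempts tries and records every attempt made. */
    static List<Integer> attemptTrace(List<Integer> userIds, FlakyEndpoint endpoint, int maxAttempts) {
        List<Integer> trace = new ArrayList<>();
        for (int userId : userIds) {
            for (int attempt = 1; ; attempt++) {
                trace.add(userId);
                try {
                    endpoint.fetch(userId);
                    break; // success: move on to the next id
                } catch (IllegalStateException e) {
                    if (attempt >= maxAttempts) {
                        throw e; // attempts exhausted: give up on the whole run
                    }
                }
            }
        }
        return trace;
    }

    public static void main(String[] args) {
        // prints [1234, 1234, 1234, 9999]
        System.out.println(attemptTrace(List.of(1234, 9999), new FlakyEndpoint(2), 3));
    }
}
```

Because the stub's failure budget is shared across calls, only the first id absorbs the two timeouts, which matches the three log lines for id=1234 followed by a single line for id=9999.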
Similarly, let's look at another test case to see what happens when all attempts are exhausted:
@Test
public void whenEndpointAlwaysFail_thenJobFails() throws Exception {
    when(httpClient.execute(any()))
      .thenThrow(new ConnectTimeoutException("Endpoint is down"));

    JobExecution jobExecution = jobLauncherTestUtils
      .launchJob(defaultJobParameters());
    JobInstance actualJobInstance = jobExecution.getJobInstance();
    ExitStatus actualJobExitStatus = jobExecution.getExitStatus();

    assertThat(actualJobInstance.getJobName(), is("retryBatchJob"));
    assertThat(actualJobExitStatus.getExitCode(), is("FAILED"));
    assertThat(actualJobExitStatus.getExitDescription(),
      containsString("org.apache.http.conn.ConnectTimeoutException"));
}
In this test case, three attempts are made for the first record before the job fails with a ConnectTimeoutException.
5. Configuring Retries Using XML
Finally, let's look at the XML equivalent of the configuration above:
<batch:job id="retryBatchJob">
    <batch:step id="retryStep">
        <batch:tasklet>
            <batch:chunk reader="itemReader" writer="itemWriter"
              processor="retryItemProcessor" commit-interval="10"
              retry-limit="3">
                <batch:retryable-exception-classes>
                    <batch:include class="org.apache.http.conn.ConnectTimeoutException"/>
                    <batch:include class="org.springframework.dao.DeadlockLoserDataAccessException"/>
                </batch:retryable-exception-classes>
            </batch:chunk>
        </batch:tasklet>
    </batch:step>
</batch:job>
6. Conclusion
In this article, we learned how to configure retry logic in Spring Batch, using both Java and XML configuration. We also used unit tests to see how retries work in practice.