概述
生產(chǎn)開(kāi)發(fā)過(guò)程經(jīng)常遇到數(shù)據(jù)加工場(chǎng)景识啦,如果處理不好很容易引起各種問(wèn)題负蚊,比如:加工慢、漏加工等颓哮,本文針對(duì)常見(jiàn)的無(wú)序加工場(chǎng)景進(jìn)行分析介紹家妆。
案例
先看一段目前主流的加工代碼,以下代碼存在一些坑冕茅。
題外話:還有些童鞋使用了死循環(huán)取數(shù)加工伤极,取不到數(shù)據(jù)時(shí)跳出循環(huán)或者休眠指定時(shí)間,普通業(yè)務(wù)場(chǎng)景不推薦姨伤。
public void doExecute() {
...
try {
long count = getBillRepository().count(store.getCode(), latestProcessTime, executeTime);
int loop = (int) Math.ceil(Double.valueOf(count) / pageSize);
for (int i = 0; i < loop; i++) {
//list方法一般會(huì)按照時(shí)間或某個(gè)字段排序哨坪,保證分頁(yè)取數(shù)不會(huì)亂
List<Bill> list = getBillRepository().list(store.getCode(),
latestProcessTime, executeTime, i, pageSize);
for (Bill bill : list) {
// 加工
processOne(bill);
}
}
} catch (Exception e) {
log.error("加工XX流水發(fā)生錯(cuò)誤:", e);
}
}
private void processOne(Bill bill) throws SwallowsServiceException {
//實(shí)際加工內(nèi)容 TODO
}
問(wèn)題分析
- processOne方法中如果拋出異常,那么會(huì)導(dǎo)致加工異常中斷乍楚,影響到下一批數(shù)據(jù)的加工当编,如果錯(cuò)誤數(shù)據(jù)得不到解決,那么加工程序就一起卡在這條錯(cuò)誤數(shù)據(jù)徒溪。
- 影響級(jí)別:高忿偷,生產(chǎn)事故拧篮。
- 解決方案:processOne內(nèi)部捕獲異常,不對(duì)外拋出牵舱,同時(shí)一般會(huì)提供記錄失敗重試次數(shù)以及失敗原因串绩,示例如下所示
private void processOne(Bill bill) {
try{
//實(shí)際加工內(nèi)容 TODO
} catch (Exception e) {
log.error("加工單據(jù)XX發(fā)生錯(cuò)誤:", e);
try{
BillFailure failure = new BillFailure();
failure.setBillId(bill.getId());
//有童鞋不截取消息,導(dǎo)致數(shù)據(jù)庫(kù)存儲(chǔ)超長(zhǎng)報(bào)錯(cuò)芜壁,當(dāng)然有些童鞋已經(jīng)在fail方法中處理礁凡,此處特別列出坑。
failure.setMessage(StringUtils.substring(e.getMessage), 0, 255);
//fail中會(huì)累加重試次數(shù)update XX set retries=retries+1 where ...
getBillRepository().fail(failure);//此處代碼還可以進(jìn)一步優(yōu)化慧妄,比如累積一批失敗記錄顷牌,批量更新記錄日志
}catch(Exception e){
log.error("", e);
}
}
}
- 如果processOne會(huì)影響list查詢(xún)的數(shù)據(jù),比如刪除或修改了待加工數(shù)據(jù)為已加工塞淹,會(huì)導(dǎo)致查詢(xún)的數(shù)據(jù)范圍發(fā)生變化窟蓝,那么再取第二個(gè)分頁(yè)數(shù)據(jù)時(shí),實(shí)際上已經(jīng)不是未加工前的第二個(gè)分頁(yè)數(shù)據(jù)饱普;原第二個(gè)分頁(yè)數(shù)據(jù)部分?jǐn)?shù)據(jù)已跑到第一個(gè)分頁(yè)中运挫,如此導(dǎo)致本次加工作業(yè)漏加工數(shù)據(jù)。
- 影響級(jí)別:中套耕,目前大部分是定時(shí)加工谁帕,錯(cuò)過(guò)的數(shù)據(jù),下一次定時(shí)作業(yè)還能加工到冯袍,但是加工處理過(guò)程會(huì)被拉長(zhǎng)匈挖。
- 解決方案:如果不同數(shù)據(jù)間的加工無(wú)關(guān)聯(lián),無(wú)需保證順序的話康愤,那么可以從最后一個(gè)分頁(yè)往前加工儡循,如下所示
// doExecute方法中的代碼
for (int i = loop-1; i >=0; i--) {
//list方法一般會(huì)按照時(shí)間或某個(gè)字段排序,保證分頁(yè)取數(shù)不會(huì)亂
List<Bill> list = getBillRepository().list(store.getCode(),
latestProcessTime, executeTime, i, pageSize);
for (Bill bill : list) {
// 加工
processOne(bill);
}
}
- 如果數(shù)據(jù)加工失敗沒(méi)得到及時(shí)修復(fù)征冷,那么這部分?jǐn)?shù)據(jù)會(huì)一直重復(fù)加工择膝,純粹浪費(fèi)資源。
- 影響級(jí)別:低
- 解決方案:
- 增加最大重試次數(shù)资盅,超過(guò)最大重試次數(shù)之后不再進(jìn)行加工调榄;
- list待加工數(shù)據(jù)時(shí)限制重試次數(shù)踊赠,processOne失敗增加重試次數(shù)呵扛。
- 如果查詢(xún)的待加工表數(shù)據(jù)量異常龐大,比如百萬(wàn)級(jí)別數(shù)據(jù)量以上筐带,那么使用count統(tǒng)計(jì)數(shù)據(jù)隨數(shù)據(jù)量的增加對(duì)應(yīng)查詢(xún)耗時(shí)增加今穿。
- 影響級(jí)別:低,目前較少遇到大數(shù)據(jù)加工場(chǎng)景伦籍,如果有那么當(dāng)前的代碼框架也得換了蓝晒。
- 解決方案:(只適用于待加工數(shù)據(jù)范圍不會(huì)變化的場(chǎng)景腮出,比如全量同步某種資料,該場(chǎng)景較少)去掉count語(yǔ)句芝薇,最外層for改用while
// doExecute方法中的代碼
int page = 0;
List<Bill> list=null;
while(CollectionUtils.isNotEmpty(list=getBillRepository().list(store.getCode(),
latestProcessTime, executeTime,page, pageSize)){
for (Bill bill : list) {
// 加工
processOne(bill);
}
if (list.size() < pageSize) {
break;
}
page++;
}
推薦寫(xiě)法
public void doExecute() {
...
try {
long count = getBillRepository().count(store.getCode(), latestProcessTime, executeTime);
int loop = (int) Math.ceil(Double.valueOf(count) / pageSize);
for (int i = loop-1; i >=0; i--) {
//list方法一般會(huì)按照時(shí)間或某個(gè)字段排序胚嘲,保證分頁(yè)取數(shù)不會(huì)亂
List<Bill> list = getBillRepository().list(store.getCode(),
latestProcessTime, executeTime, i, pageSize);
for (Bill bill : list) {
// 加工
processOne(bill);
}
}
} catch (Exception e) {
log.error("加工XX流水發(fā)生錯(cuò)誤:", e);
}
}
private void processOne(Bill bill) {
try{
//實(shí)際加工內(nèi)容 TODO
} catch (Exception e) {
log.error("加工單據(jù)XX發(fā)生錯(cuò)誤:", e);
try{
BillFailure failure = new BillFailure();
failure.setBillId(bill.getId());
//有童鞋不截取消息,導(dǎo)致數(shù)據(jù)庫(kù)存儲(chǔ)超長(zhǎng)報(bào)錯(cuò)洛二,當(dāng)然有些童鞋已經(jīng)在fail方法中處理馋劈,此處特別列出坑。
failure.setMessage(StringUtils.substring(e.getMessage), 0, 255);
//fail中會(huì)累加重試次數(shù)update XX set retries=retries+1 where ...
getBillRepository().fail(failure);//此處代碼還可以進(jìn)一步優(yōu)化晾嘶,比如累積一批失敗記錄妓雾,批量更新記錄日志
}catch(Exception e){
log.error("", e);
}
}
}
基類(lèi)抽象寫(xiě)法
可將以上通用過(guò)程抽取基類(lèi),減少開(kāi)發(fā)踩坑垒迂,最終代碼可能如下所示:
//如果想一次取出所有數(shù)據(jù)械姻,total方法返回1,fetchData查詢(xún)返回所有數(shù)據(jù)即可
@Slf4j
@Component
@ConditionalOnProperty(value = "demo.job.enabled", havingValue = "true")
public class DemoDataProcessJob extends DataProcessAbstractJob<PChain> {
public static final String CRON_EXPRESSION_KEY = "demo.job.cronExpression";
public static final String CRON_EXPRESSION_DEFAULT_VALUE = "0 0 0,12 * * ?";
@Autowired
private ChainRepository repository;
//方法說(shuō)明:待加工的總記錄數(shù)
@Override
protected long total() {
return repository.count();
}
//方法說(shuō)明:獲取指定頁(yè)碼的一頁(yè)待加工數(shù)據(jù)
@Override
protected List<PChain> fetchData(int page, int pageSize) {
//特別注意查詢(xún)方法中需要按照一定規(guī)則排序机断,一般是XX時(shí)間字段
return repository.list(page, pageSize);
}
//方法說(shuō)明:加工一頁(yè)數(shù)據(jù)
@Override
protected void processData(List<PChain> data) {
for (PChain chain : data) {
processOne(chain);
}
}
//方法說(shuō)明(實(shí)際代碼別拷貝我):加工一條數(shù)據(jù)
private void processOne(PChain chain) {
try {
//TODO 此處做實(shí)際數(shù)據(jù)加工處理
} catch (Exception e) {
log.error("加工單據(jù)XX發(fā)生錯(cuò)誤:", e);
//TODO 此處做數(shù)據(jù)加工異常處理楷拳,比如增加最大重試次數(shù)、記錄失敗日志
}
}
//方法說(shuō)明(實(shí)際代碼別拷貝我):分頁(yè)大小吏奸,默認(rèn)為500唯竹,根據(jù)業(yè)務(wù)需要可重寫(xiě)父類(lèi)此方法
// @Override
// protected int pageSize() {
// return super.pageSize();
// }
//方法說(shuō)明(實(shí)際代碼別拷貝我):一頁(yè)數(shù)據(jù)加工過(guò)程發(fā)生異常是否忽略,繼續(xù)加工下一分頁(yè)數(shù)據(jù)苦丁;默認(rèn)為true浸颓。
// @Override
// protected boolean ignoreException() {
// return super.ignoreException();
// }
@Override
public String getDescription() {
return "數(shù)據(jù)加工作業(yè)示例";
}
@Override
public String getCronExpression() {
Environment env = ApplicationContextUtils.getBean(Environment.class);
return env.getProperty(CRON_EXPRESSION_KEY, CRON_EXPRESSION_DEFAULT_VALUE);
}
}
其它思考
- 加工過(guò)程涉及的取數(shù)、更新等操作盡量使用批量旺拉。
- 針對(duì)頻繁失敗的加工數(shù)據(jù)产上,除了增加重試次數(shù)以外,還可將重試加工的時(shí)間往后延蛾狗,避免失敗數(shù)據(jù)積壓影響正常數(shù)據(jù)的加工速度晋涣。思路類(lèi)似Spring Retry中的重試等待策略。
- 集群場(chǎng)景沉桌,可考慮結(jié)合quartz谢鹊、ElasticJob等實(shí)現(xiàn)分布式定時(shí)作業(yè)加工。
- 如數(shù)據(jù)量較大留凭,加工時(shí)限要求高的佃扼,可引入線程池并發(fā)加工。(大部分場(chǎng)景無(wú)需引入并發(fā)處理)