定時(shí)作業(yè)數(shù)據(jù)加工如何寫(xiě)?

概述

生產(chǎn)開(kāi)發(fā)過(guò)程經(jīng)常遇到數(shù)據(jù)加工場(chǎng)景识啦,如果處理不好很容易引起各種問(wèn)題负蚊,比如:加工慢、漏加工等颓哮,本文針對(duì)常見(jiàn)的無(wú)序加工場(chǎng)景進(jìn)行分析介紹家妆。

案例

先看一段目前主流的加工代碼,以下代碼存在一些坑冕茅。
題外話:還有些童鞋使用了死循環(huán)取數(shù)加工伤极,取不到數(shù)據(jù)時(shí)跳出循環(huán)或者休眠指定時(shí)間,普通業(yè)務(wù)場(chǎng)景不推薦姨伤。

  public void doExecute() {
     ...
    try {
          long count = getBillRepository().count(store.getCode(), latestProcessTime,  executeTime);
          int loop = (int) Math.ceil(Double.valueOf(count) / pageSize);
          for (int i = 0; i < loop; i++) {
            //list方法一般會(huì)按照時(shí)間或某個(gè)字段排序哨坪,保證分頁(yè)取數(shù)不會(huì)亂
            List<Bill> list = getBillRepository().list(store.getCode(),
                latestProcessTime, executeTime, i, pageSize);
            for (Bill bill : list) {
              //  加工
              processOne(bill);
            }
          }
      } catch (Exception e) {
      log.error("加工XX流水發(fā)生錯(cuò)誤:", e);
    }
  }
      
     
 private void processOne(Bill bill) throws SwallowsServiceException {
        //實(shí)際加工內(nèi)容 TODO
 }

問(wèn)題分析

  1. processOne方法中如果拋出異常,那么會(huì)導(dǎo)致加工異常中斷乍楚,影響到下一批數(shù)據(jù)的加工当编,如果錯(cuò)誤數(shù)據(jù)得不到解決,那么加工程序就一起卡在這條錯(cuò)誤數(shù)據(jù)徒溪。
    • 影響級(jí)別:高忿偷,生產(chǎn)事故拧篮。
    • 解決方案:processOne內(nèi)部捕獲異常,不對(duì)外拋出牵舱,同時(shí)一般會(huì)提供記錄失敗重試次數(shù)以及失敗原因串绩,示例如下所示
private void processOne(Bill bill)  {
    try{
        //實(shí)際加工內(nèi)容 TODO
     } catch (Exception e) {
       log.error("加工單據(jù)XX發(fā)生錯(cuò)誤:", e);
       try{
           BillFailure failure = new BillFailure();
           failure.setBillId(bill.getId());
           //有童鞋不截取消息,導(dǎo)致數(shù)據(jù)庫(kù)存儲(chǔ)超長(zhǎng)報(bào)錯(cuò)芜壁,當(dāng)然有些童鞋已經(jīng)在fail方法中處理礁凡,此處特別列出坑。
           failure.setMessage(StringUtils.substring(e.getMessage), 0, 255);
           //fail中會(huì)累加重試次數(shù)update XX set retries=retries+1 where ...
           getBillRepository().fail(failure);//此處代碼還可以進(jìn)一步優(yōu)化慧妄,比如累積一批失敗記錄顷牌,批量更新記錄日志
       }catch(Exception e){
           log.error("", e);
       }
    }
 }
  1. 如果processOne會(huì)影響list查詢(xún)的數(shù)據(jù),比如刪除或修改了待加工數(shù)據(jù)為已加工塞淹,會(huì)導(dǎo)致查詢(xún)的數(shù)據(jù)范圍發(fā)生變化窟蓝,那么再取第二個(gè)分頁(yè)數(shù)據(jù)時(shí),實(shí)際上已經(jīng)不是未加工前的第二個(gè)分頁(yè)數(shù)據(jù)饱普;原第二個(gè)分頁(yè)數(shù)據(jù)部分?jǐn)?shù)據(jù)已跑到第一個(gè)分頁(yè)中运挫,如此導(dǎo)致本次加工作業(yè)漏加工數(shù)據(jù)。
    • 影響級(jí)別:中套耕,目前大部分是定時(shí)加工谁帕,錯(cuò)過(guò)的數(shù)據(jù),下一次定時(shí)作業(yè)還能加工到冯袍,但是加工處理過(guò)程會(huì)被拉長(zhǎng)匈挖。
    • 解決方案:如果不同數(shù)據(jù)間的加工無(wú)關(guān)聯(lián),無(wú)需保證順序的話康愤,那么可以從最后一個(gè)分頁(yè)往前加工儡循,如下所示
// doExecute方法中的代碼

         for (int i = loop-1; i >=0; i--) {
            //list方法一般會(huì)按照時(shí)間或某個(gè)字段排序,保證分頁(yè)取數(shù)不會(huì)亂
            List<Bill> list = getBillRepository().list(store.getCode(),
                latestProcessTime, executeTime, i, pageSize);
            for (Bill bill : list) {
              //  加工
              processOne(bill);
            }
          }
  1. 如果數(shù)據(jù)加工失敗沒(méi)得到及時(shí)修復(fù)征冷,那么這部分?jǐn)?shù)據(jù)會(huì)一直重復(fù)加工择膝,純粹浪費(fèi)資源。
    • 影響級(jí)別:低
    • 解決方案:
      • 增加最大重試次數(shù)资盅,超過(guò)最大重試次數(shù)之后不再進(jìn)行加工调榄;
      • list待加工數(shù)據(jù)時(shí)限制重試次數(shù)踊赠,processOne失敗增加重試次數(shù)呵扛。
  2. 如果查詢(xún)的待加工表數(shù)據(jù)量異常龐大,比如百萬(wàn)級(jí)別數(shù)據(jù)量以上筐带,那么使用count統(tǒng)計(jì)數(shù)據(jù)隨數(shù)據(jù)量的增加對(duì)應(yīng)查詢(xún)耗時(shí)增加今穿。
    • 影響級(jí)別:低,目前較少遇到大數(shù)據(jù)加工場(chǎng)景伦籍,如果有那么當(dāng)前的代碼框架也得換了蓝晒。
    • 解決方案:(只適用于待加工數(shù)據(jù)范圍不會(huì)變化的場(chǎng)景腮出,比如全量同步某種資料,該場(chǎng)景較少)去掉count語(yǔ)句芝薇,最外層for改用while
// doExecute方法中的代碼
        int page = 0;
        List<Bill> list=null;
        while(CollectionUtils.isNotEmpty(list=getBillRepository().list(store.getCode(),
                latestProcessTime, executeTime,page, pageSize)){ 
            for (Bill bill : list) {
              //  加工
              processOne(bill);
            }
            
            if (list.size() < pageSize) {   
                break;
            }
            page++;
          }
          

推薦寫(xiě)法

  public void doExecute() {
     ...
    try {
          long count = getBillRepository().count(store.getCode(), latestProcessTime,  executeTime);
          int loop = (int) Math.ceil(Double.valueOf(count) / pageSize);
         for (int i = loop-1; i >=0; i--) {
            //list方法一般會(huì)按照時(shí)間或某個(gè)字段排序胚嘲,保證分頁(yè)取數(shù)不會(huì)亂
            List<Bill> list = getBillRepository().list(store.getCode(),
                latestProcessTime, executeTime, i, pageSize);
            for (Bill bill : list) {
              //  加工
              processOne(bill);
            }
          }
      } catch (Exception e) {
      log.error("加工XX流水發(fā)生錯(cuò)誤:", e);
    }
  }
      
     
private void processOne(Bill bill)  {
    try{
        //實(shí)際加工內(nèi)容 TODO
     } catch (Exception e) {
       log.error("加工單據(jù)XX發(fā)生錯(cuò)誤:", e);
       try{
           BillFailure failure = new BillFailure();
           failure.setBillId(bill.getId());
           //有童鞋不截取消息,導(dǎo)致數(shù)據(jù)庫(kù)存儲(chǔ)超長(zhǎng)報(bào)錯(cuò)洛二,當(dāng)然有些童鞋已經(jīng)在fail方法中處理馋劈,此處特別列出坑。
           failure.setMessage(StringUtils.substring(e.getMessage), 0, 255);
           //fail中會(huì)累加重試次數(shù)update XX set retries=retries+1 where ...
           getBillRepository().fail(failure);//此處代碼還可以進(jìn)一步優(yōu)化晾嘶,比如累積一批失敗記錄妓雾,批量更新記錄日志
       }catch(Exception e){
           log.error("", e);
       }
    }
 }
 

基類(lèi)抽象寫(xiě)法

可將以上通用過(guò)程抽取基類(lèi),減少開(kāi)發(fā)踩坑垒迂,最終代碼可能如下所示:

//如果想一次取出所有數(shù)據(jù)械姻,total方法返回1,fetchData查詢(xún)返回所有數(shù)據(jù)即可
    @Slf4j
    @Component
    @ConditionalOnProperty(value = "demo.job.enabled", havingValue = "true")
    public class DemoDataProcessJob extends DataProcessAbstractJob<PChain> {
        public static final String CRON_EXPRESSION_KEY = "demo.job.cronExpression";
        public static final String CRON_EXPRESSION_DEFAULT_VALUE = "0 0 0,12 * * ?";
    
        @Autowired
        private ChainRepository repository;
    
        //方法說(shuō)明:待加工的總記錄數(shù)
        @Override
        protected long total() {
            return repository.count();
        }
    
        //方法說(shuō)明:獲取指定頁(yè)碼的一頁(yè)待加工數(shù)據(jù)
        @Override
        protected List<PChain> fetchData(int page, int pageSize) {
            //特別注意查詢(xún)方法中需要按照一定規(guī)則排序机断,一般是XX時(shí)間字段
            return repository.list(page, pageSize);
        }
    
        //方法說(shuō)明:加工一頁(yè)數(shù)據(jù)
        @Override
        protected void processData(List<PChain> data) {
            for (PChain chain : data) {
                processOne(chain);
            }
        }
    
        //方法說(shuō)明(實(shí)際代碼別拷貝我):加工一條數(shù)據(jù)
        private void processOne(PChain chain) {
            try {
                //TODO 此處做實(shí)際數(shù)據(jù)加工處理
    
            } catch (Exception e) {
            log.error("加工單據(jù)XX發(fā)生錯(cuò)誤:", e);
                //TODO 此處做數(shù)據(jù)加工異常處理楷拳,比如增加最大重試次數(shù)、記錄失敗日志
            }
        }
    
        //方法說(shuō)明(實(shí)際代碼別拷貝我):分頁(yè)大小吏奸,默認(rèn)為500唯竹,根據(jù)業(yè)務(wù)需要可重寫(xiě)父類(lèi)此方法
        // @Override
        // protected int pageSize() {
        //     return super.pageSize();
        // }
    
        //方法說(shuō)明(實(shí)際代碼別拷貝我):一頁(yè)數(shù)據(jù)加工過(guò)程發(fā)生異常是否忽略,繼續(xù)加工下一分頁(yè)數(shù)據(jù)苦丁;默認(rèn)為true浸颓。
        // @Override
        // protected boolean ignoreException() {
        //     return super.ignoreException();
        // }
    
        @Override
        public String getDescription() {
            return "數(shù)據(jù)加工作業(yè)示例";
        }
    
        @Override
        public String getCronExpression() {
            Environment env = ApplicationContextUtils.getBean(Environment.class);
            return env.getProperty(CRON_EXPRESSION_KEY, CRON_EXPRESSION_DEFAULT_VALUE);
        }
    }

其它思考

  1. 加工過(guò)程涉及的取數(shù)、更新等操作盡量使用批量旺拉。
  2. 針對(duì)頻繁失敗的加工數(shù)據(jù)产上,除了增加重試次數(shù)以外,還可將重試加工的時(shí)間往后延蛾狗,避免失敗數(shù)據(jù)積壓影響正常數(shù)據(jù)的加工速度晋涣。思路類(lèi)似Spring Retry中的重試等待策略。
  3. 集群場(chǎng)景沉桌,可考慮結(jié)合quartz谢鹊、ElasticJob等實(shí)現(xiàn)分布式定時(shí)作業(yè)加工。
  4. 如數(shù)據(jù)量較大留凭,加工時(shí)限要求高的佃扼,可引入線程池并發(fā)加工。(大部分場(chǎng)景無(wú)需引入并發(fā)處理)
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末蔼夜,一起剝皮案震驚了整個(gè)濱河市兼耀,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌,老刑警劉巖瘤运,帶你破解...
    沈念sama閱讀 218,858評(píng)論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件窍霞,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡拯坟,警方通過(guò)查閱死者的電腦和手機(jī)但金,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,372評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門(mén),熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)郁季,“玉大人傲绣,你說(shuō)我怎么就攤上這事」ぃ” “怎么了秃诵?”我有些...
    開(kāi)封第一講書(shū)人閱讀 165,282評(píng)論 0 356
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)塞琼。 經(jīng)常有香客問(wèn)我菠净,道長(zhǎng),這世上最難降的妖魔是什么彪杉? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,842評(píng)論 1 295
  • 正文 為了忘掉前任毅往,我火速辦了婚禮,結(jié)果婚禮上派近,老公的妹妹穿的比我還像新娘攀唯。我一直安慰自己,他們只是感情好渴丸,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,857評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布侯嘀。 她就那樣靜靜地躺著,像睡著了一般谱轨。 火紅的嫁衣襯著肌膚如雪戒幔。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書(shū)人閱讀 51,679評(píng)論 1 305
  • 那天土童,我揣著相機(jī)與錄音诗茎,去河邊找鬼。 笑死献汗,一個(gè)胖子當(dāng)著我的面吹牛敢订,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播罢吃,決...
    沈念sama閱讀 40,406評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼楚午,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了刃麸?” 一聲冷哼從身側(cè)響起醒叁,我...
    開(kāi)封第一講書(shū)人閱讀 39,311評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤司浪,失蹤者是張志新(化名)和其女友劉穎泊业,沒(méi)想到半個(gè)月后把沼,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,767評(píng)論 1 315
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡吁伺,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,945評(píng)論 3 336
  • 正文 我和宋清朗相戀三年饮睬,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片篮奄。...
    茶點(diǎn)故事閱讀 40,090評(píng)論 1 350
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡捆愁,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出窟却,到底是詐尸還是另有隱情昼丑,我是刑警寧澤,帶...
    沈念sama閱讀 35,785評(píng)論 5 346
  • 正文 年R本政府宣布夸赫,位于F島的核電站菩帝,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏茬腿。R本人自食惡果不足惜呼奢,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,420評(píng)論 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望切平。 院中可真熱鬧握础,春花似錦、人聲如沸悴品。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,988評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)苔严。三九已至菇存,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間邦蜜,已是汗流浹背依鸥。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,101評(píng)論 1 271
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留悼沈,地道東北人贱迟。 一個(gè)月前我還...
    沈念sama閱讀 48,298評(píng)論 3 372
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像絮供,于是被迫代替她去往敵國(guó)和親衣吠。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,033評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容