性能優(yōu)化落地之串行異步化

性能是系統(tǒng)的重要維護(hù)指標(biāo)风罩,性能優(yōu)化的方法論很多,今天結(jié)合我的日常開發(fā)燃观,聊一下提升性能的一個(gè)重要方法:串行處理異步化(并行化)

最近項(xiàng)目中有批處理的需求:調(diào)度中心會(huì)定時(shí)觸發(fā)job敢课,系統(tǒng)會(huì)批量(分頁(yè))獲取表中數(shù)據(jù),然后調(diào)用外部系統(tǒng)圆仔,根據(jù)外部系統(tǒng)返回的結(jié)果,將待處理數(shù)據(jù)打標(biāo)蔫劣,落庫(kù)坪郭;如下圖:


業(yè)務(wù)簡(jiǎn)圖.png

調(diào)用risk系統(tǒng)是feign調(diào)用,開發(fā)完成后脉幢,進(jìn)行測(cè)試歪沃,正常的串行處理下,20萬(wàn)數(shù)據(jù)處理完成耗時(shí)2個(gè)小時(shí)左右嫌松,性能嚴(yán)重堪憂沪曙,所以優(yōu)化是必然的。分析下可以發(fā)現(xiàn)萎羔,該場(chǎng)景下性能可以撈取收益的部分就是將調(diào)用risk系統(tǒng)改為并發(fā)處理液走。

常規(guī)串行方式

串行的偽代碼如下:

@Component
@Slf4j
class FooOriginDataProcessor {

    @Autowired
    private FooOriginDataService fooOriginDataService;
    @Autowired
    private FooHitDataService hitDataService;
    @Autowired
    private DecisionRemoteService decisionRemoteService;


    @Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRES_NEW)
    public void doProcess(final List<FooOriginDataPO> originDataList) {
        if (CollectionUtils.isEmpty(originDataList)) {
            return;
        }

        List<FooHitDataPO> hitDataList = new ArrayList<>();

        originDataList.forEach(e -> {
            AfterSaleAbnormalReturnDto returnDto = FooOriginDataConverter.instance.convertToReturnDto(e);
            DecisionVo decisionVo = decisionRemoteService.decideReturnRisk(returnDto);
            if (EnumMerchantReturnRiskType.isAbnormal(decisionVo.getCode())) {
                //命中風(fēng)險(xiǎn)
                hitDataList.add(FooOriginDataConverter.instance.convertToHitDataPo(e));
            }
        });

        hitDataService.batchInsert(hitDataList);
        List<Integer> originDataIds = originDataList.stream().map(m -> m.getId()).collect(Collectors.toList());
        FooOriginDataService.updateToProcessSucceed(originDataIds);
    }

}

串行異步化

改為并發(fā)處理后,代碼如下:

@Component
@RefreshScope
@Slf4j
class FooOriginDataProcessor implements ApplicationListener<ContextClosedEvent> {

    private static final String NAME_PREFIX = "MR_OriginDataProcessor_Thread_";
    private static final int CAPACITY = 200;
    private static final int N_CPU = Runtime.getRuntime().availableProcessors();
    /** I/O密集型任務(wù)贾陷,線程數(shù)量不要超過cpu核數(shù) */
    private static final ExecutorService EXECUTOR = new ThreadPoolExecutor(N_CPU, N_CPU, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>(CAPACITY),
            new ThreadFactoryImpl(NAME_PREFIX), new ThreadPoolExecutor.CallerRunsPolicy());

    /**
     * 并發(fā)處理的大小缘眶,防止觸發(fā)風(fēng)控系統(tǒng)的流控
     */
    @Value("${concurrent_size_of_invocation_risk_api:120}")
    private Integer batchSize;
    @Autowired
    private FooOriginDataService fooOriginDataService;
    @Autowired
    private FooHitDataService hitDataService;
    @Autowired
    private DecisionRemoteService decisionRemoteService;


    @Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRES_NEW)
    public void doProcess(final List<FooOriginDataPO> originDataList) {
        
        log.info("doProcess with cpu numbers: {}", N_CPU);
        
        if (CollectionUtils.isEmpty(originDataList)) {
            return;
        }

        List<FooHitDataPO> hitDataList = new ArrayList<>();
        List<CompletableFuture<FooHitDataPO>> hitDataFutureList = new ArrayList<>(originDataList.size());

        originDataList.forEach(e -> {
            CompletableFuture<FooHitDataPO> itemFuture = CompletableFuture.supplyAsync(() -> {
                AfterSaleAbnormalReturnDto returnDto = FooOriginDataConverter.instance.convertToReturnDto(e);
                DecisionVo decisionVo = decisionRemoteService.decideReturnRisk(returnDto);
                if (EnumMerchantReturnRiskType.isAbnormal(decisionVo.getCode())) {
                    //命中風(fēng)險(xiǎn)
                    return FooOriginDataConverter.instance.convertToHitDataPo(e);
                }
                return null;
            }, EXECUTOR);

            hitDataFutureList.add(itemFuture);
        });

        CompletableFuture.allOf((CompletableFuture<?>) hitDataFutureList).get();

        AtomicBoolean hasFail = new AtomicBoolean(false);
        hitDataFutureList.forEach(fu -> {
            try {
                FooHitDataPO hitDataPO = fu.get();
                if (Objects.nonNull(hitDataPO)) {
                    hitDataList.add(hitDataPO);
                }
            } catch (InterruptedException e) {
                log.error("fail to get result from future occurs InterruptedException", e);
                Thread.currentThread().interrupt();
                hasFail.set(true);
            } catch (ExecutionException e) {
                log.error("fail to get result from future occurs ExecutionException", e);
                hasFail.set(true);
            }
        });

        if (hasFail.get()) {
            throw new ServiceResponseException("獲取xxx信息發(fā)生異常");
        }
        
        hitDataService.batchInsert(hitDataList);
        List<Integer> originDataIds = originDataList.stream().map(m -> m.getId()).collect(Collectors.toList());
        FooOriginDataService.updateToProcessSucceed(originDataIds);
    }

    @Override
    public void onApplicationEvent(ContextClosedEvent event) {
        EXECUTOR.shutdown();
    }

}

上面的并發(fā)處理代碼也是有問題,大數(shù)據(jù)量下會(huì)造成如下問題:

  • 1.如果外部系統(tǒng)有流控(正常項(xiàng)目都會(huì)有流控髓废,是服務(wù)的自我保護(hù)措施)巷懈,你會(huì)觸發(fā)外部系統(tǒng)的流控,導(dǎo)致請(qǐng)求失敗
  • 2.如果外部系統(tǒng)沒有流控慌洪,大數(shù)據(jù)量下你會(huì)將它的系統(tǒng)沖垮顶燕,同時(shí)也會(huì)將自己系統(tǒng)打垮:I/O占用系統(tǒng)大量資源,導(dǎo)致其它業(yè)務(wù)無法被處理

綜上所述蒋譬,需要繼續(xù)對(duì)并發(fā)部分進(jìn)行優(yōu)化割岛,優(yōu)化的核心就是思想就是:控制并發(fā)請(qǐng)求的流量,代碼如下:

@Component
@RefreshScope
@Slf4j
class FooOriginDataProcessor implements ApplicationListener<ContextClosedEvent> {

    private static final long ONE_THOUSAND = 1000L;
    private static final String NAME_PREFIX = "MR_OriginDataProcessor_Thread_";
    private static final int CAPACITY = 200;
    private static final int N_CPU = Runtime.getRuntime().availableProcessors();
    /** I/O密集型任務(wù),線程數(shù)量不要超過cpu核數(shù) */
    private static final ExecutorService EXECUTOR = new ThreadPoolExecutor(N_CPU, N_CPU, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>(CAPACITY),
            new ThreadFactoryImpl(NAME_PREFIX), new ThreadPoolExecutor.CallerRunsPolicy());

    /**
     * 并發(fā)處理的大小犯助,防止觸發(fā)風(fēng)控系統(tǒng)的流控
     */
    @Value("${concurrent_size_of_invocation_risk_api:120}")
    private Integer batchSize;
    @Autowired
    private FooOriginDataService fooOriginDataService;
    @Autowired
    private FooHitDataService hitDataService;
    @Autowired
    private DecisionRemoteService decisionRemoteService;


    @Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRES_NEW)
    public void doProcess(final List<FooOriginDataPO> originDataList) throws ExecutionException, InterruptedException {

        log.info("doProcess with cpu numbers: {}, batchSize: {}", N_CPU, batchSize);

        if (CollectionUtils.isEmpty(originDataList)) {
            return;
        }

        List<FooHitDataPO> hitDataList = new ArrayList<>();
        List<CompletableFuture<FooHitDataPO>> hitDataFutureList = new ArrayList<>(originDataList.size());

        List<List<FooOriginDataPO>> allSubListOfList = Lists.partition(originDataList, batchSize);
        for (List<FooOriginDataPO> originDataSubList : allSubListOfList) {
            CompletableFuture<FooHitDataPO>[] hitDataFutureArray = new CompletableFuture[originDataSubList.size()];
            long start = System.currentTimeMillis();

            for (int i = 0; i < originDataSubList.size(); i++) {
                FooOriginDataPO originDataPO = originDataSubList.get(i);
                CompletableFuture<FooHitDataPO> itemFuture = CompletableFuture.supplyAsync(() -> {
                    AfterSaleAbnormalReturnDto returnDto = FooOriginDataConverter.instance.convertToReturnDto(originDataPO);
                    DecisionVo decisionVo = decisionRemoteService.decideReturnRisk(returnDto);
                    if (EnumMerchantReturnRiskType.isAbnormal(decisionVo.getCode())) {
                        //命中風(fēng)險(xiǎn)
                        return FooOriginDataConverter.instance.convertToHitDataPo(originDataPO);
                    }
                    return null;
                }, EXECUTOR);

                hitDataFutureArray[i] = itemFuture;
            }

            //等待當(dāng)前批次返回再處理下一批次
            CompletableFuture.allOf(hitDataFutureArray).get();

            long concurrentReqCosts = System.currentTimeMillis() - start;
            log.info("concurrent request end, costs: {}", concurrentReqCosts);

            //concurrentReqCosts不足一秒時(shí)癣漆,主線程需要等待,防止觸發(fā)接口流控
            long needSleepTimeMillis = ONE_THOUSAND - concurrentReqCosts;
            TimeUnit.MILLISECONDS.sleep(needSleepTimeMillis);

            hitDataFutureList.addAll(Arrays.asList(hitDataFutureArray));
        }

        AtomicBoolean hasFail = new AtomicBoolean(false);
        hitDataFutureList.forEach(fu -> {
            try {
                FooHitDataPO hitDataPO = fu.get();
                if (Objects.nonNull(hitDataPO)) {
                    hitDataList.add(hitDataPO);
                }
            } catch (InterruptedException e) {
                log.error("fail to get result from future occurs InterruptedException", e);
                Thread.currentThread().interrupt();
                hasFail.set(true);
            } catch (ExecutionException e) {
                log.error("fail to get result from future occurs ExecutionException", e);
                hasFail.set(true);
            }
        });

        if (hasFail.get()) {
            throw new ServiceResponseException("獲取信息發(fā)生異常");
        }

        hitDataService.batchInsert(hitDataList);
        List<Integer> originDataIds = originDataList.stream().map(m -> m.getId()).collect(Collectors.toList());
        FooOriginDataService.updateToProcessSucceed(originDataIds);
    }

    @Override
    public void onApplicationEvent(ContextClosedEvent event) {
        EXECUTOR.shutdown();
    }

}

最終優(yōu)化后剂买,20萬(wàn)數(shù)據(jù)處理落庫(kù)耗時(shí)25分鐘惠爽,需要知道的是此處受限于外部系統(tǒng)的流控(如這里risk系統(tǒng)對(duì)提供給我的這個(gè)接口的限流是150/s),也就是說我每秒最多能處理的數(shù)據(jù)量也就是150條瞬哼,所以你會(huì)發(fā)現(xiàn)婚肆,優(yōu)化是有很多限制條件的,也就是說大家需要具體問題具體分析

一些小結(jié):

  1. 串行處理可以通過異步化(并行化)提升處理速度
  2. I/O密集型任務(wù)(網(wǎng)絡(luò)IO或文件IO)異步處理時(shí)坐慰,處理線程的數(shù)量不要超過CPU核數(shù)较性,線程過多反而會(huì)降低處理性能
  3. 異步處理時(shí)要考慮并發(fā)量,防止因并發(fā)過高拖垮系統(tǒng)和其它相關(guān)系統(tǒng)
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市赞咙,隨后出現(xiàn)的幾起案子责循,更是在濱河造成了極大的恐慌,老刑警劉巖攀操,帶你破解...
    沈念sama閱讀 218,941評(píng)論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件院仿,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡速和,警方通過查閱死者的電腦和手機(jī)歹垫,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,397評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來颠放,“玉大人排惨,你說我怎么就攤上這事〈嚷酰” “怎么了若贮?”我有些...
    開封第一講書人閱讀 165,345評(píng)論 0 356
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)痒留。 經(jīng)常有香客問我谴麦,道長(zhǎng),這世上最難降的妖魔是什么伸头? 我笑而不...
    開封第一講書人閱讀 58,851評(píng)論 1 295
  • 正文 為了忘掉前任匾效,我火速辦了婚禮,結(jié)果婚禮上恤磷,老公的妹妹穿的比我還像新娘面哼。我一直安慰自己,他們只是感情好扫步,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,868評(píng)論 6 392
  • 文/花漫 我一把揭開白布魔策。 她就那樣靜靜地躺著,像睡著了一般河胎。 火紅的嫁衣襯著肌膚如雪闯袒。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,688評(píng)論 1 305
  • 那天游岳,我揣著相機(jī)與錄音政敢,去河邊找鬼。 笑死胚迫,一個(gè)胖子當(dāng)著我的面吹牛喷户,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播访锻,決...
    沈念sama閱讀 40,414評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼褪尝,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼闹获!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起恼五,我...
    開封第一講書人閱讀 39,319評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤昌罩,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后灾馒,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,775評(píng)論 1 315
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡遣总,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,945評(píng)論 3 336
  • 正文 我和宋清朗相戀三年睬罗,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片旭斥。...
    茶點(diǎn)故事閱讀 40,096評(píng)論 1 350
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡容达,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出垂券,到底是詐尸還是另有隱情花盐,我是刑警寧澤,帶...
    沈念sama閱讀 35,789評(píng)論 5 346
  • 正文 年R本政府宣布菇爪,位于F島的核電站算芯,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏凳宙。R本人自食惡果不足惜熙揍,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,437評(píng)論 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望氏涩。 院中可真熱鬧届囚,春花似錦、人聲如沸是尖。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,993評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)饺汹。三九已至蛔添,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間首繁,已是汗流浹背作郭。 一陣腳步聲響...
    開封第一講書人閱讀 33,107評(píng)論 1 271
  • 我被黑心中介騙來泰國(guó)打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留弦疮,地道東北人夹攒。 一個(gè)月前我還...
    沈念sama閱讀 48,308評(píng)論 3 372
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像胁塞,于是被迫代替她去往敵國(guó)和親咏尝。 傳聞我的和親對(duì)象是個(gè)殘疾皇子压语,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,037評(píng)論 2 355