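Performance is one of the key metrics to maintain in any system, and there are many approaches to optimizing it. Drawing on my day-to-day development work, today I want to talk about one important way to improve performance: turning serial processing into asynchronous (parallel) processing.
A recent project had a batch-processing requirement: a scheduling center triggers a job on a timer, the system fetches rows from a table in batches (page by page), calls an external system, and then, based on the result the external system returns, marks the pending rows and persists them, as shown in the figure below:
The call to the risk system is a Feign call. After development was finished I ran a test: with plain serial processing, 200,000 records took about 2 hours. That is far too slow, so optimization was unavoidable. A quick analysis shows that the place where performance can be gained in this scenario is the call to the risk system, which can be made concurrent.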
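To make the figures above concrete, here is a minimal back-of-the-envelope sketch. The class and method names are hypothetical and only the two numbers quoted above (about 2 hours, 200,000 records) come from the test run. It works out to roughly 36 ms per record, which is the budget consumed almost entirely by waiting on one remote call at a time.

```java
import java.time.Duration;

// Back-of-the-envelope check using the figures quoted in the text.
// SerialCostEstimate and millisPerRecord are illustrative names, not project code.
class SerialCostEstimate {

    /** Average wall-clock time spent per record, in milliseconds. */
    static double millisPerRecord(Duration total, long records) {
        return (double) total.toMillis() / records;
    }

    public static void main(String[] args) {
        // About 2 hours for 200,000 records in the serial run
        double perRecord = millisPerRecord(Duration.ofHours(2), 200_000L);
        System.out.println("average time per record (ms): " + perRecord);
    }
}
```

If each record spends most of those milliseconds blocked on the network, issuing several calls at once is the obvious lever, as long as the downstream system can take the load.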
The conventional serial approach
The serial pseudocode looks like this:
@Component
@Slf4j
class FooOriginDataProcessor {
@Autowired
private FooOriginDataService fooOriginDataService;
@Autowired
private FooHitDataService hitDataService;
@Autowired
private DecisionRemoteService decisionRemoteService;
@Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRES_NEW)
public void doProcess(final List<FooOriginDataPO> originDataList) {
if (CollectionUtils.isEmpty(originDataList)) {
return;
}
List<FooHitDataPO> hitDataList = new ArrayList<>();
originDataList.forEach(e -> {
AfterSaleAbnormalReturnDto returnDto = FooOriginDataConverter.instance.convertToReturnDto(e);
DecisionVo decisionVo = decisionRemoteService.decideReturnRisk(returnDto);
if (EnumMerchantReturnRiskType.isAbnormal(decisionVo.getCode())) {
// risk rule hit
hitDataList.add(FooOriginDataConverter.instance.convertToHitDataPo(e));
}
});
hitDataService.batchInsert(hitDataList);
List<Integer> originDataIds = originDataList.stream().map(m -> m.getId()).collect(Collectors.toList());
// call the injected instance, not the class
fooOriginDataService.updateToProcessSucceed(originDataIds);
}
}
Making the serial processing asynchronous
After switching to concurrent processing, the code looks like this:
@Component
@RefreshScope
@Slf4j
class FooOriginDataProcessor implements ApplicationListener<ContextClosedEvent> {
private static final String NAME_PREFIX = "MR_OriginDataProcessor_Thread_";
private static final int CAPACITY = 200;
private static final int N_CPU = Runtime.getRuntime().availableProcessors();
/** I/O-bound tasks: cap the thread count at the number of CPU cores */
private static final ExecutorService EXECUTOR = new ThreadPoolExecutor(N_CPU, N_CPU, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>(CAPACITY),
new ThreadFactoryImpl(NAME_PREFIX), new ThreadPoolExecutor.CallerRunsPolicy());
/**
* Number of concurrent calls per batch, kept low enough not to trip the risk system's rate limit
*/
@Value("${concurrent_size_of_invocation_risk_api:120}")
private Integer batchSize;
@Autowired
private FooOriginDataService fooOriginDataService;
@Autowired
private FooHitDataService hitDataService;
@Autowired
private DecisionRemoteService decisionRemoteService;
@Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRES_NEW)
public void doProcess(final List<FooOriginDataPO> originDataList) {
log.info("doProcess with cpu numbers: {}", N_CPU);
if (CollectionUtils.isEmpty(originDataList)) {
return;
}
List<FooHitDataPO> hitDataList = new ArrayList<>();
List<CompletableFuture<FooHitDataPO>> hitDataFutureList = new ArrayList<>(originDataList.size());
originDataList.forEach(e -> {
CompletableFuture<FooHitDataPO> itemFuture = CompletableFuture.supplyAsync(() -> {
AfterSaleAbnormalReturnDto returnDto = FooOriginDataConverter.instance.convertToReturnDto(e);
DecisionVo decisionVo = decisionRemoteService.decideReturnRisk(returnDto);
if (EnumMerchantReturnRiskType.isAbnormal(decisionVo.getCode())) {
// risk rule hit
return FooOriginDataConverter.instance.convertToHitDataPo(e);
}
return null;
}, EXECUTOR);
hitDataFutureList.add(itemFuture);
});
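// allOf takes an array (varargs), so convert the list first; join() rethrows a task failure as an unchecked CompletionException
CompletableFuture.allOf(hitDataFutureList.toArray(new CompletableFuture[0])).join();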
AtomicBoolean hasFail = new AtomicBoolean(false);
hitDataFutureList.forEach(fu -> {
try {
FooHitDataPO hitDataPO = fu.get();
if (Objects.nonNull(hitDataPO)) {
hitDataList.add(hitDataPO);
}
} catch (InterruptedException e) {
log.error("fail to get result from future occurs InterruptedException", e);
Thread.currentThread().interrupt();
hasFail.set(true);
} catch (ExecutionException e) {
log.error("fail to get result from future occurs ExecutionException", e);
hasFail.set(true);
}
});
if (hasFail.get()) {
throw new ServiceResponseException("failed to fetch xxx information");
}
hitDataService.batchInsert(hitDataList);
List<Integer> originDataIds = originDataList.stream().map(m -> m.getId()).collect(Collectors.toList());
// call the injected instance, not the class
fooOriginDataService.updateToProcessSucceed(originDataIds);
}
@Override
public void onApplicationEvent(ContextClosedEvent event) {
EXECUTOR.shutdown();
}
}
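The fan-out and fan-in step of the class above can be tried in isolation with a small runnable sketch. Everything in it is an assumption made for illustration: `FanOutSketch`, `isRisky` (a stand-in for the remote risk call that just sleeps and flags even ids) and `findRisky` are hypothetical names, not part of the project.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

class FanOutSketch {

    /** Hypothetical stand-in for the remote risk call: sleeps, then flags even ids. */
    static boolean isRisky(int id) {
        try {
            Thread.sleep(50); // simulated network latency
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return id % 2 == 0;
    }

    /** Submits one task per id, waits for all of them, and keeps the ids that were flagged. */
    static List<Integer> findRisky(List<Integer> ids, ExecutorService executor) {
        List<CompletableFuture<Integer>> futures = new ArrayList<>(ids.size());
        for (Integer id : ids) {
            futures.add(CompletableFuture.supplyAsync(() -> isRisky(id) ? id : null, executor));
        }
        // allOf takes an array (varargs), so the list has to be converted first
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        return futures.stream()
                .map(CompletableFuture::join)   // every future is already complete here
                .filter(Objects::nonNull)       // null means "no risk hit"
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            System.out.println(findRisky(Arrays.asList(1, 2, 3, 4, 5, 6), executor)); // prints [2, 4, 6]
        } finally {
            executor.shutdown();
        }
    }
}
```

The results come back in input order because the futures are read in the order they were submitted. Like the class it mirrors, this sketch does nothing to limit the request rate.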
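The concurrent code above still has problems. With large data volumes it causes the following:
- 1. If the external system enforces rate limiting (most production services do, as a self-protection measure), you will trip its limit and your requests will fail.
- 2. If the external system has no rate limiting, a large data volume can overwhelm it, and can bring down your own system as well: the I/O consumes so many resources that other business requests cannot be served.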
In short, the concurrent part needs further optimization, and the core idea is to control the rate of concurrent requests. The code is as follows:
@Component
@RefreshScope
@Slf4j
class FooOriginDataProcessor implements ApplicationListener<ContextClosedEvent> {
private static final long ONE_THOUSAND = 1000L;
private static final String NAME_PREFIX = "MR_OriginDataProcessor_Thread_";
private static final int CAPACITY = 200;
private static final int N_CPU = Runtime.getRuntime().availableProcessors();
/** I/O-bound tasks: cap the thread count at the number of CPU cores */
private static final ExecutorService EXECUTOR = new ThreadPoolExecutor(N_CPU, N_CPU, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>(CAPACITY),
new ThreadFactoryImpl(NAME_PREFIX), new ThreadPoolExecutor.CallerRunsPolicy());
/**
* Number of concurrent calls per batch, kept low enough not to trip the risk system's rate limit
*/
@Value("${concurrent_size_of_invocation_risk_api:120}")
private Integer batchSize;
@Autowired
private FooOriginDataService fooOriginDataService;
@Autowired
private FooHitDataService hitDataService;
@Autowired
private DecisionRemoteService decisionRemoteService;
@Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRES_NEW)
public void doProcess(final List<FooOriginDataPO> originDataList) throws ExecutionException, InterruptedException {
log.info("doProcess with cpu numbers: {}, batchSize: {}", N_CPU, batchSize);
if (CollectionUtils.isEmpty(originDataList)) {
return;
}
List<FooHitDataPO> hitDataList = new ArrayList<>();
List<CompletableFuture<FooHitDataPO>> hitDataFutureList = new ArrayList<>(originDataList.size());
List<List<FooOriginDataPO>> allSubListOfList = Lists.partition(originDataList, batchSize);
for (List<FooOriginDataPO> originDataSubList : allSubListOfList) {
CompletableFuture<FooHitDataPO>[] hitDataFutureArray = new CompletableFuture[originDataSubList.size()];
long start = System.currentTimeMillis();
for (int i = 0; i < originDataSubList.size(); i++) {
FooOriginDataPO originDataPO = originDataSubList.get(i);
CompletableFuture<FooHitDataPO> itemFuture = CompletableFuture.supplyAsync(() -> {
AfterSaleAbnormalReturnDto returnDto = FooOriginDataConverter.instance.convertToReturnDto(originDataPO);
DecisionVo decisionVo = decisionRemoteService.decideReturnRisk(returnDto);
if (EnumMerchantReturnRiskType.isAbnormal(decisionVo.getCode())) {
// risk rule hit
return FooOriginDataConverter.instance.convertToHitDataPo(originDataPO);
}
return null;
}, EXECUTOR);
hitDataFutureArray[i] = itemFuture;
}
// wait for the current batch to return before starting the next one
CompletableFuture.allOf(hitDataFutureArray).get();
long concurrentReqCosts = System.currentTimeMillis() - start;
log.info("concurrent request end, costs: {}", concurrentReqCosts);
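// if the batch took less than one second, the main thread waits out the remainder to avoid tripping the API rate limit
// (TimeUnit.sleep does not sleep at all when the value is zero or negative)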
long needSleepTimeMillis = ONE_THOUSAND - concurrentReqCosts;
TimeUnit.MILLISECONDS.sleep(needSleepTimeMillis);
hitDataFutureList.addAll(Arrays.asList(hitDataFutureArray));
}
AtomicBoolean hasFail = new AtomicBoolean(false);
hitDataFutureList.forEach(fu -> {
try {
FooHitDataPO hitDataPO = fu.get();
if (Objects.nonNull(hitDataPO)) {
hitDataList.add(hitDataPO);
}
} catch (InterruptedException e) {
log.error("fail to get result from future occurs InterruptedException", e);
Thread.currentThread().interrupt();
hasFail.set(true);
} catch (ExecutionException e) {
log.error("fail to get result from future occurs ExecutionException", e);
hasFail.set(true);
}
});
if (hasFail.get()) {
throw new ServiceResponseException("failed to fetch information");
}
hitDataService.batchInsert(hitDataList);
List<Integer> originDataIds = originDataList.stream().map(m -> m.getId()).collect(Collectors.toList());
// call the injected instance, not the class
fooOriginDataService.updateToProcessSucceed(originDataIds);
}
@Override
public void onApplicationEvent(ContextClosedEvent event) {
EXECUTOR.shutdown();
}
}
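The pacing loop can also be tried on its own. The following is a minimal sketch under stated assumptions, not the project's code: `PacedBatchSketch` and `processPaced` are hypothetical names, the remote call is replaced by an arbitrary function, and the partitioning is done with `subList` instead of Guava's `Lists.partition` so the example has no dependencies. Unlike the class above, it skips the wait after the last batch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

class PacedBatchSketch {

    /**
     * Applies call to every item, starting at most batchSize calls per windowMillis.
     * Results are returned in input order.
     */
    static <T, R> List<R> processPaced(List<T> items, int batchSize, long windowMillis,
                                       Function<T, R> call, ExecutorService executor) {
        List<R> results = new ArrayList<>(items.size());
        for (int from = 0; from < items.size(); from += batchSize) {
            List<T> batch = items.subList(from, Math.min(from + batchSize, items.size()));
            long start = System.nanoTime();

            List<CompletableFuture<R>> futures = new ArrayList<>(batch.size());
            for (T item : batch) {
                futures.add(CompletableFuture.supplyAsync(() -> call.apply(item), executor));
            }
            // Wait for the whole batch before starting the next one
            for (CompletableFuture<R> future : futures) {
                results.add(future.join());
            }

            // Sleep out the rest of the window, unless this was the last batch
            long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
            boolean hasMore = from + batchSize < items.size();
            if (hasMore && elapsedMillis < windowMillis) {
                try {
                    Thread.sleep(windowMillis - elapsedMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while pacing batches", e);
                }
            }
        }
        return results;
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            List<Integer> input = new ArrayList<>();
            for (int i = 1; i <= 10; i++) {
                input.add(i);
            }
            long start = System.nanoTime();
            // 10 items, 4 per 200 ms window: three batches, two pauses
            List<Integer> doubled = processPaced(input, 4, 200L, n -> n * 2, executor);
            long tookMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
            System.out.println(doubled + " in about " + tookMillis + " ms");
        } finally {
            executor.shutdown();
        }
    }
}
```

The batch size is what bounds the request rate here: with a one-second window, a batch of 120 can never send more than 120 calls per second, whatever the thread pool size.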
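After the final optimization, processing and persisting 200,000 records took 25 minutes. Keep in mind that this is bounded by the external system's rate limit (here, the risk system limits the interface it exposes to me to 150 requests/s), so I can process at most 150 records per second. Optimization always comes with constraints like this, which is why each problem has to be analyzed on its own terms.
Some takeaways:
- Serial processing can be sped up by making it asynchronous (parallel).
- When I/O-bound tasks (network or file I/O) are processed asynchronously, keep the number of worker threads bounded; this article caps it at the number of CPU cores, because an oversized pool can lower throughput instead of raising it.
- When processing asynchronously, think about the level of concurrency, so that excessive concurrency does not drag down your own system or the systems it depends on.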
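The ceiling imposed by the rate limit is easy to work out. A minimal sketch, with hypothetical names (`ThroughputBound`, `minSeconds`) and only the two figures from the text as inputs:

```java
// Lower bound on the run time under a per-second rate limit.
// ThroughputBound and minSeconds are illustrative names, not project code.
class ThroughputBound {

    /** Minimum number of seconds needed to push records through a limit of perSecond calls. */
    static long minSeconds(long records, long perSecond) {
        return (records + perSecond - 1) / perSecond; // ceiling division
    }

    public static void main(String[] args) {
        long seconds = minSeconds(200_000L, 150L);
        System.out.println(seconds + " s, about " + (seconds / 60) + " min"); // prints "1334 s, about 22 min"
    }
}
```

At 150 calls per second the job cannot finish in less than about 22 minutes, so the measured 25 minutes is already close to the floor, and further tuning on the caller side has little left to gain.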