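Problem and Background
First, some background. I have recently been working on a real-time data warehouse and set out to build a real-time wide table: the job reads data from Kafka, joins it against dimension tables in real time, and writes the result to Kafka and HDFS. Because the company's Hadoop version is 2.6, the HDFS writes use BucketingSink.
After the program had been running for a while, I noticed that the files written to HDFS stayed in the pending state.
After a lot of troubleshooting, it turned out that the job could never trigger a checkpoint or a savepoint, so the pending files were never moved to finished. The JobManager log reported the checkpoint being aborted, with the message examined below.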
Investigation
Following the JobManager log leads to CheckpointCoordinator, where a checkpoint is started through the triggerCheckpoint method. Reading further down that method, we find:
    // Collect the current execution attempt of every task that must be triggered.
    Execution[] executions = new Execution[tasksToTrigger.length];
    for (int i = 0; i < tasksToTrigger.length; i++) {
        Execution ee = tasksToTrigger[i].getCurrentExecutionAttempt();
        if (ee == null) {
            LOG.info("Checkpoint triggering task {} of job {} is not being executed at the moment. Aborting checkpoint.",
                    tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
                    job);
            return new CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
        } else if (ee.getState() == ExecutionState.RUNNING) {
            executions[i] = ee;
        } else {
            // Any state other than RUNNING (for example FINISHED) aborts the checkpoint.
            LOG.info("Checkpoint triggering task {} of job {} is not in state {} but {} instead. Aborting checkpoint.",
                    tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
                    job,
                    ExecutionState.RUNNING,
                    ee.getState());
            return new CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
        }
    }
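As the ee.getState() == ExecutionState.RUNNING branch shows, triggering a checkpoint checks the state of every Execution, and if any state is not RUNNING it logs
Checkpoint triggering task {} of job {} is not in state {} but {} instead. Aborting checkpoint.
and returns from triggerCheckpoint. This implies that some Execution was not running, and the Web UI confirmed that some Executions were indeed in the FINISHED state.
The cause: when broadcasting the dimension table, the full data and the incremental data were unioned together. The full data was read from HDFS with readTextFile, and once the HDFS files had been read completely, that Execution moved to FINISHED.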
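To make the effect of a finished source concrete, here is a minimal plain-Java model of the check quoted above. It is a sketch, not Flink's code: the class name CheckpointTriggerModel, the reduced State enum, and the method canTriggerCheckpoint are all made up for illustration, and only the "every task to trigger must be RUNNING" rule is taken from the excerpt.

```java
import java.util.Arrays;
import java.util.List;

public class CheckpointTriggerModel {

    // Hypothetical, reduced stand-in for Flink's ExecutionState.
    public enum State { CREATED, RUNNING, FINISHED, FAILED }

    // Mirrors the rule in the excerpt: the checkpoint is aborted as soon as
    // one task that has to be triggered is not in the RUNNING state.
    public static boolean canTriggerCheckpoint(List<State> tasksToTrigger) {
        for (State state : tasksToTrigger) {
            if (state != State.RUNNING) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Kafka source still running, file source still reading.
        List<State> whileReading = Arrays.asList(State.RUNNING, State.RUNNING);
        // Kafka source still running, file source has read everything.
        List<State> afterFileRead = Arrays.asList(State.RUNNING, State.FINISHED);

        System.out.println("while reading:   " + canTriggerCheckpoint(whileReading));
        System.out.println("after file read: " + canTriggerCheckpoint(afterFileRead));
    }
}
```

In this model a single FINISHED entry is enough to block every later checkpoint, which matches the behaviour seen in the job: the bounded file source finishes once and never returns to RUNNING.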
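Solution
Change how the full dimension table is loaded: load it in the open method of a RichFlatMapFunction or BroadcastProcessFunction instead, so that no Execution ends up in the FINISHED state.
Of course, if the job does not write to HDFS, or uses no state other than broadcast state, the original approach (readTextFile unioned with the incremental stream) is still workable.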