1. CheckpointedFunction
Checkpointing is the action by which Flink persists state, and its weight within Flink speaks for itself. When we implement one of Flink's user-defined function interfaces, implementing the CheckpointedFunction interface alongside the business logic makes the custom implementation considerably more complete, and it puts Flink's powerful state handling to real use.
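For reference, checkpointing itself must be enabled on the job before any of this fires. A minimal, illustrative snippet, assuming the standard DataStream API (the interval value is arbitrary):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointingSetup {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // take a checkpoint every 60 seconds; the interval here is illustrative
        env.enableCheckpointing(60_000);
        // ... build the job and call env.execute(...) as usual
    }
}

The interface itself looks like this: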
public interface CheckpointedFunction {
/**
* This method is called when a snapshot for a checkpoint is requested. This acts as a hook to the function to
* ensure that all state is exposed by means previously offered through {@link FunctionInitializationContext} when
* the Function was initialized, or offered now by {@link FunctionSnapshotContext} itself.
*
* @param context the context for drawing a snapshot of the operator
* @throws Exception
*/
void snapshotState(FunctionSnapshotContext context) throws Exception;
/**
* This method is called when the parallel function instance is created during distributed
* execution. Functions typically set up their state storing data structures in this method.
*
* @param context the context for initializing the operator
* @throws Exception
*/
void initializeState(FunctionInitializationContext context) throws Exception;
}
#snapshotState: invoked every time the task triggers a checkpoint; it refreshes the saved state data.
#initializeState: invoked to set up the checkpoint state structures; two pieces of logic usually live here (see the sketch after this list):
- 1. Determine whether this start is a state restore after a restart and, if so, run the state-recovery logic.
- 2. Initialize the rules for how the checkpoint state is stored.
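To make the two callbacks concrete, here is a minimal sketch of a custom sink that buffers elements and survives failures through CheckpointedFunction. It is modeled on the BufferingSink example from the Flink documentation; the class name and the flush threshold are illustrative:

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>, CheckpointedFunction {

    private final int threshold;                                            // illustrative flush threshold
    private transient ListState<Tuple2<String, Integer>> checkpointedState; // managed operator state
    private List<Tuple2<String, Integer>> bufferedElements;                 // in-memory working buffer

    public BufferingSink(int threshold) {
        this.threshold = threshold;
        this.bufferedElements = new ArrayList<>();
    }

    @Override
    public void invoke(Tuple2<String, Integer> value, Context context) throws Exception {
        bufferedElements.add(value);
        if (bufferedElements.size() >= threshold) {
            // flush to the external system here, then forget the flushed elements
            bufferedElements.clear();
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // called on every checkpoint: replace the previous snapshot with the current buffer
        checkpointedState.clear();
        for (Tuple2<String, Integer> element : bufferedElements) {
            checkpointedState.add(element);
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // logic 2 from the list above: define how the state is stored
        ListStateDescriptor<Tuple2<String, Integer>> descriptor = new ListStateDescriptor<>(
                "buffered-elements",
                TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));
        checkpointedState = context.getOperatorStateStore().getListState(descriptor);

        // logic 1 from the list above: detect a restore and rebuild the in-memory buffer
        if (context.isRestored()) {
            for (Tuple2<String, Integer> element : checkpointedState.get()) {
                bufferedElements.add(element);
            }
        }
    }
}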
FlinkKafkaConsumerBase source code analysis
Below, the implementation is annotated with comments to make this part of the code easier to follow.
/** Data for pending but uncommitted offsets. Temporary holding map for new state snapshots. */
private final LinkedMap pendingOffsetsToCommit = new LinkedMap();
/** Accessor for state in the operator state backend.*/
// Definition of how the state is stored (in short: the storage format that FlinkKafkaConsumer uses).
private transient ListState<Tuple2<KafkaTopicPartition, Long>> unionOffsetStates;
/** State name of the consumer's partition offset states. */
private static final String OFFSETS_STATE_NAME = "topic-partition-offset-states";
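One detail worth calling out before the methods: the offsets are registered via getUnionListState rather than plain getListState. Both store a list in the operator state backend; they differ only in how the list is redistributed on restore or rescale. A minimal sketch of the contrast (the descriptor names and class here are illustrative, not what Flink registers):

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.OperatorStateStore;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;

class StateRedistributionDemo {
    static void register(OperatorStateStore stateStore) throws Exception {
        TypeInformation<Tuple2<KafkaTopicPartition, Long>> typeInfo =
                TypeInformation.of(new TypeHint<Tuple2<KafkaTopicPartition, Long>>() {});

        // split redistribution: on restore, each parallel subtask receives
        // only a slice of the saved entries
        ListState<Tuple2<KafkaTopicPartition, Long>> splitState =
                stateStore.getListState(new ListStateDescriptor<>("offsets-split-demo", typeInfo));

        // union redistribution: on restore, EVERY subtask receives the complete
        // list, so each Kafka consumer subtask can pick out the partitions it owns
        ListState<Tuple2<KafkaTopicPartition, Long>> unionState =
                stateStore.getUnionListState(new ListStateDescriptor<>("offsets-union-demo", typeInfo));
    }
}

With that distinction in mind, here is the annotated initializeState: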
@Override
public final void initializeState(FunctionInitializationContext context) throws Exception {
//Obtain the operator state store
OperatorStateStore stateStore = context.getOperatorStateStore();
//Look up the state list registered under the legacy default state name (on a fresh start it is always empty; this lookup exists to detect whether we are restoring old state)
ListState<Tuple2<KafkaTopicPartition, Long>> oldRoundRobinListState =
stateStore.getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME);
//***************************************************************************************************************************************
// Leaving the restore case aside for a moment, this next step on its own completes the entire initialization of the state storage
//***************************************************************************************************************************************
//Initialize the state storage collection
this.unionOffsetStates = stateStore.getUnionListState(new ListStateDescriptor<>(
OFFSETS_STATE_NAME, // string name under which the state is stored
/* Type information for the entries; this TypeHint construction is the standard idiom
   for capturing the generic type TypeInformation<Tuple2<KafkaTopicPartition, Long>> */
TypeInformation.of(new TypeHint<Tuple2<KafkaTopicPartition, Long>>() {})));
//****************************************************************************************************************************************
//****************************************************************************************************************************************
//****************************************************************************************************************************************
// Decide whether this is a restore: if so --> load the legacy collection #oldRoundRobinListState into #unionOffsetStates for safekeeping, then clear #oldRoundRobinListState
//****************************************************************************************************************************************
// Check whether this run is restoring state
if (context.isRestored() && !restoredFromOldState) {
restoredState = new TreeMap<>(new KafkaTopicPartition.Comparator());
// Copy the legacy state entries into the current state collection
for (Tuple2<KafkaTopicPartition, Long> kafkaOffset : oldRoundRobinListState.get()) {
restoredFromOldState = true;
unionOffsetStates.add(kafkaOffset);
}
//Clear the legacy state collection
oldRoundRobinListState.clear();
//Partition discovery cannot be combined with state restored from Flink 1.2.x
if (restoredFromOldState && discoveryIntervalMillis != PARTITION_DISCOVERY_DISABLED) {
throw new IllegalArgumentException(
"Topic / partition discovery cannot be enabled if the job is restored from a savepoint from Flink 1.2.x.");
}
//Keep a copy in restoredState; it is consulted later, e.g. by partition discovery (see the original English Javadoc on the restoredState field)
for (Tuple2<KafkaTopicPartition, Long> kafkaOffset : unionOffsetStates.get()) {
restoredState.put(kafkaOffset.f0, kafkaOffset.f1);
}
LOG.info("Setting restore state in the FlinkKafkaConsumer: {}", restoredState);
} else {
// Not a restore; just log that fact
LOG.info("No restore state for FlinkKafkaConsumer.");
}
//****************************************************************************************************************************************
//****************************************************************************************************************************************
}
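How is restoredState used afterwards? It is consulted during startup to seed each subscribed partition's start offset. The snippet below is a simplified sketch of that idea, not the verbatim Flink source: discoveredPartitions is a hypothetical stand-in for the result of the initial partition discovery, and the sentinel constant is Flink's marker for "start from earliest".

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;

class RestoreSeedingSketch {
    static Map<KafkaTopicPartition, Long> seedStartOffsets(
            List<KafkaTopicPartition> discoveredPartitions,   // hypothetical input
            Map<KafkaTopicPartition, Long> restoredState) {
        Map<KafkaTopicPartition, Long> startOffsets = new HashMap<>();
        for (KafkaTopicPartition partition : discoveredPartitions) {
            Long restoredOffset = restoredState.get(partition);
            if (restoredOffset != null) {
                // partition was in the snapshot: resume from the saved offset
                startOffsets.put(partition, restoredOffset);
            } else {
                // partition appeared after the snapshot (partition discovery):
                // no saved offset exists, so fall back to the earliest offset
                startOffsets.put(partition, KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET);
            }
        }
        return startOffsets;
    }
}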
@Override // invoked when a checkpoint snapshot is triggered
public final void snapshotState(FunctionSnapshotContext context) throws Exception {
//Check whether the consumer is still running
if (!running) {
LOG.debug("snapshotState() called on closed source");
} else {
// Clear the previously stored state data
unionOffsetStates.clear();
// kafkaFetcher is the Kafka access object; it is first created in FlinkKafkaConsumerBase#run and reflects every data exchange since
final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;
//*******************************************************************************************************************************
// In short: if the Kafka fetcher is null, run() has not executed yet, so the subscribed partitions and their start offsets are stored as the state instead
//*******************************************************************************************************************************
if (fetcher == null) {
// the fetcher has not yet been initialized, which means we need to return the
// originally restored offsets or the assigned partitions
for (Map.Entry<KafkaTopicPartition, Long> subscribedPartition : subscribedPartitionsToStartOffsets.entrySet()) {
unionOffsetStates.add(Tuple2.of(subscribedPartition.getKey(), subscribedPartition.getValue()));
}
if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
// the map cannot be asynchronously updated, because only one checkpoint call can happen
// on this function at a time: either snapshotState() or notifyCheckpointComplete()
pendingOffsetsToCommit.put(context.getCheckpointId(), restoredState);
}
//*******************************************************************************************************************************
//*******************************************************************************************************************************
//*******************************************************************************************************************************
// Read the current offsets out of the fetcher into #currentOffsets and save them as the latest state
//*******************************************************************************************************************************
} else {
HashMap<KafkaTopicPartition, Long> currentOffsets = fetcher.snapshotCurrentState();
if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
// the map cannot be asynchronously updated, because only one checkpoint call can happen
// on this function at a time: either snapshotState() or notifyCheckpointComplete()
pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets);
}
for (Map.Entry<KafkaTopicPartition, Long> kafkaTopicPartitionLongEntry : currentOffsets.entrySet()) {
unionOffsetStates.add(
Tuple2.of(kafkaTopicPartitionLongEntry.getKey(), kafkaTopicPartitionLongEntry.getValue()));
}
}
//*******************************************************************************************************************************
//*******************************************************************************************************************************
//*******************************************************************************************************************************
// Cap the pending-checkpoint map: evict the oldest entries so it never grows beyond MAX_NUM_PENDING_CHECKPOINTS
//*******************************************************************************************************************************
if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
// truncate the map of pending offsets to commit, to prevent infinite growth
while (pendingOffsetsToCommit.size() > MAX_NUM_PENDING_CHECKPOINTS) {
pendingOffsetsToCommit.remove(0);
}
}
//*******************************************************************************************************************************
//*******************************************************************************************************************************
}
}
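The counterpart of the ON_CHECKPOINTS branches above is notifyCheckpointComplete (from the CheckpointListener interface): only once the checkpoint has actually completed are the offsets parked in pendingOffsetsToCommit handed to the fetcher and committed back to Kafka. A condensed sketch of that flow, with error handling and edge cases trimmed, so treat it as an outline rather than the verbatim source:

@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
    // locate the completed checkpoint among the pending ones
    final int posInMap = pendingOffsetsToCommit.indexOf(checkpointId);
    if (posInMap == -1) {
        LOG.warn("Received confirmation for unknown checkpoint id {}", checkpointId);
        return;
    }
    @SuppressWarnings("unchecked")
    Map<KafkaTopicPartition, Long> offsets =
        (Map<KafkaTopicPartition, Long>) pendingOffsetsToCommit.remove(posInMap);
    // everything older than the confirmed checkpoint is superseded: drop it
    for (int i = 0; i < posInMap; i++) {
        pendingOffsetsToCommit.remove(0);
    }
    // the fetcher commits the offsets to Kafka asynchronously
    kafkaFetcher.commitInternalOffsetsToKafka(offsets, offsetCommitCallback);
}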