Flink#CheckpointFunction接口實(shí)現(xiàn)

1.CheckpointFunction

Flink 中持久化的動(dòng)作就是checkpoint ,其在Flink中的所占的分量不言而喻埃篓,當(dāng)我們使用Flink的一些自定義邏輯接口的時(shí)候如果在實(shí)現(xiàn)邏輯的同時(shí)還能實(shí)現(xiàn)其 CheckpointFunction接口邏輯泛领,無疑是我們的自定義實(shí)現(xiàn)更加趨于完美,同時(shí)也有效的體現(xiàn)了Flink 的state 計(jì)算的強(qiáng)大能力帚屉。

public interface CheckpointedFunction {

    /**
     * This method is called when a snapshot for a checkpoint is requested. This acts as a hook to the function to
     * ensure that all state is exposed by means previously offered through {@link FunctionInitializationContext} when
     * the Function was initialized, or offered now by {@link FunctionSnapshotContext} itself.
     *
     * @param context the context for drawing a snapshot of the operator
     * @throws Exception
     */
    void snapshotState(FunctionSnapshotContext context) throws Exception;

    /**
     * This method is called when the parallel function instance is created during distributed
     * execution. Functions typically set up their state storing data structures in this method.
     *
     * @param context the context for initializing the operator
     * @throws Exception
     */
    void initializeState(FunctionInitializationContext context) throws Exception;
}

#snapshotState方法:當(dāng)每次任務(wù)觸發(fā)checkpoint時(shí)執(zhí)行寺渗,更新保存狀態(tài)數(shù)據(jù)
#initializeState方法:初始化checkpoint 存儲(chǔ)結(jié)構(gòu)种柑,一般在這里我們會(huì)實(shí)現(xiàn)兩個(gè)邏輯:

  • 1.判斷checkpoint 是否是重啟狀態(tài)恢復(fù),并實(shí)現(xiàn)狀態(tài)恢復(fù)邏輯
  • 2.初始化checkpoint存儲(chǔ)邏輯規(guī)則拍谐。

FlinkKafkaConsumerBase 源碼分析解讀

下面將該實(shí)現(xiàn)加上相應(yīng)的注釋,以方便大家對(duì)這塊代碼的理解

      /** Data for pending but uncommitted offsets.  新的狀態(tài)快照暫時(shí)存儲(chǔ)的集合 */
    private final LinkedMap pendingOffsetsToCommit = new LinkedMap();
    /** Accessor for state in the operator state backend.*/
     // 狀態(tài)存儲(chǔ)的規(guī)則集合馏段。(簡(jiǎn)單來說也就是定義了:FlinkKafkaConsumer存儲(chǔ)的格式是什么 )
    private transient ListState<Tuple2<KafkaTopicPartition, Long>> unionOffsetStates;
      /** State name of the consumer's partition offset states. */
    private static final String OFFSETS_STATE_NAME = "topic-partition-offset-states";

@Override
    public final void initializeState(FunctionInitializationContext context) throws Exception {
          //獲取計(jì)算狀態(tài)存儲(chǔ)對(duì)象
        OperatorStateStore stateStore = context.getOperatorStateStore();

        //通過默認(rèn)的狀態(tài)存儲(chǔ)名稱轩拨,獲取對(duì)應(yīng)的存儲(chǔ)狀態(tài)集合(如果是初次啟動(dòng),則一定為空院喜,此操作是為了判斷是否為狀態(tài)恢復(fù)操作)
        ListState<Tuple2<KafkaTopicPartition, Long>> oldRoundRobinListState =
              stateStore.getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME);



//***************************************************************************************************************************************
// 在不考慮狀態(tài)恢復(fù)的情況下亡蓉,其實(shí)本步操作已經(jīng)完成了 狀態(tài)存儲(chǔ)的初始化的所有邏輯 
//***************************************************************************************************************************************
       //為狀態(tài)存儲(chǔ)集合初始化 
        this.unionOffsetStates = stateStore.getUnionListState(new ListStateDescriptor<>(
                OFFSETS_STATE_NAME, // 狀態(tài)存儲(chǔ)字符串名稱
                 /*類型狀態(tài)信息(該寫法是固定寫法,是由TypeHint 源碼示例提供 返回值:*/
                        TypeInformation<Tuple2<KafkaTopicPartition, Long>>)
                TypeInformation.of(new TypeHint<Tuple2<KafkaTopicPartition, Long>>() {})));
          
//****************************************************************************************************************************************
//****************************************************************************************************************************************



//****************************************************************************************************************************************
//  判斷是否為狀態(tài)恢復(fù)的操作:若是狀態(tài)恢復(fù)的操作-->讀取舊狀態(tài)集合#oldRoundRobinListState 加載到#unionOffsetStates 中保存喷舀,并清空#oldRoundRobinListState 
//****************************************************************************************************************************************
            
              // 判斷 是否為狀態(tài)恢復(fù)的操作
        if (context.isRestored() && !restoredFromOldState) {
            restoredState = new TreeMap<>(new KafkaTopicPartition.Comparator());

            // 讀取舊狀態(tài)集合加載到 狀態(tài)存儲(chǔ)集合中
            for (Tuple2<KafkaTopicPartition, Long> kafkaOffset : oldRoundRobinListState.get()) {
                restoredFromOldState = true;
                unionOffsetStates.add(kafkaOffset);
            }
           //清空舊狀態(tài)集合
            oldRoundRobinListState.clear();
            //判斷分區(qū)發(fā)現(xiàn)功能是否可用
            if (restoredFromOldState && discoveryIntervalMillis != PARTITION_DISCOVERY_DISABLED) {
                throw new IllegalArgumentException(
                    "Topic / partition discovery cannot be enabled if the job is restored from a savepoint from Flink 1.2.x.");
            }

            //此步操作是將restoredState存儲(chǔ)一份保存起來砍濒,用于分區(qū)發(fā)現(xiàn)功能時(shí)操作(可閱讀restoredState參數(shù)的原英文說明)
            for (Tuple2<KafkaTopicPartition, Long> kafkaOffset : unionOffsetStates.get()) {
                restoredState.put(kafkaOffset.f0, kafkaOffset.f1);
            }

            LOG.info("Setting restore state in the FlinkKafkaConsumer: {}", restoredState);
        } else {
           // 不是狀態(tài)恢復(fù)操作 給出日志提示
            LOG.info("No restore state for FlinkKafkaConsumer.");
        }
//****************************************************************************************************************************************
//****************************************************************************************************************************************

    }

    @Override   // 觸發(fā)快照
    public final void snapshotState(FunctionSnapshotContext context) throws Exception {
                //判斷消費(fèi)者是否還在運(yùn)行
        if (!running) {
            LOG.debug("snapshotState() called on closed source");
        } else {
                    // 清空狀態(tài)存儲(chǔ)數(shù)據(jù)
            unionOffsetStates.clear();
                     // kafkaFetcher kafka 訪問對(duì)象,該對(duì)象第一次初始化在 FlinkKafkaConsumerBase#run 方法中硫麻,每次的數(shù)據(jù)交互都會(huì)更新
            final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;
  //*******************************************************************************************************************************
  // 此處不在詳細(xì)贅述:kafka 訪問器為null,則說明數(shù)據(jù)run 還未執(zhí)行爸邢,則以訂閱分區(qū)信息作為狀態(tài)信息存儲(chǔ)
  //*******************************************************************************************************************************
            if (fetcher == null) {
                // the fetcher has not yet been initialized, which means we need to return the
                // originally restored offsets or the assigned partitions
                for (Map.Entry<KafkaTopicPartition, Long> subscribedPartition : subscribedPartitionsToStartOffsets.entrySet()) {
                    unionOffsetStates.add(Tuple2.of(subscribedPartition.getKey(), subscribedPartition.getValue()));
                }

                if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
                    // the map cannot be asynchronously updated, because only one checkpoint call can happen
                    // on this function at a time: either snapshotState() or notifyCheckpointComplete()
                    pendingOffsetsToCommit.put(context.getCheckpointId(), restoredState);
                }
  //******************************************************************************************************************************* 
  //*******************************************************************************************************************************




  //*******************************************************************************************************************************
  //  獲取 fetcher 訪問器中的狀態(tài)信息到#currentOffsets 中,并將其作為最新的狀態(tài)信息保存
  //*******************************************************************************************************************************
            } else {
                HashMap<KafkaTopicPartition, Long> currentOffsets = fetcher.snapshotCurrentState();

                if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
                    // the map cannot be asynchronously updated, because only one checkpoint call can happen
                    // on this function at a time: either snapshotState() or notifyCheckpointComplete()
                    pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets);
                }

                for (Map.Entry<KafkaTopicPartition, Long> kafkaTopicPartitionLongEntry : currentOffsets.entrySet()) {
                    unionOffsetStates.add(
                            Tuple2.of(kafkaTopicPartitionLongEntry.getKey(), kafkaTopicPartitionLongEntry.getValue()));
                }
            }
  //******************************************************************************************************************************* 
  //*******************************************************************************************************************************



  //******************************************************************************************************************************* 
  // 判斷 最大checkpoint 長(zhǎng)度庶香,將超出的記錄移除 保證記錄的最大長(zhǎng)度使用不超過用戶的配置長(zhǎng)度
  //*******************************************************************************************************************************
            if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
                // truncate the map of pending offsets to commit, to prevent infinite growth
                while (pendingOffsetsToCommit.size() > MAX_NUM_PENDING_CHECKPOINTS) {
                    pendingOffsetsToCommit.remove(0);
                }
            }
  //******************************************************************************************************************************* 
  //*******************************************************************************************************************************
        }
    }

本文在注釋中詳細(xì)的解讀了 FlinkKafkaConsumerBase 中CheckpointFunction的具體實(shí)現(xiàn)方式甲棍,以及每一步實(shí)現(xiàn)的意義。后期會(huì)進(jìn)一步向大家展示具體的應(yīng)用案例 赶掖。
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末感猛,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子奢赂,更是在濱河造成了極大的恐慌陪白,老刑警劉巖,帶你破解...
    沈念sama閱讀 212,542評(píng)論 6 493
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件膳灶,死亡現(xiàn)場(chǎng)離奇詭異咱士,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)轧钓,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,596評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門序厉,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人毕箍,你說我怎么就攤上這事弛房。” “怎么了而柑?”我有些...
    開封第一講書人閱讀 158,021評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵文捶,是天一觀的道長(zhǎng)荷逞。 經(jīng)常有香客問我,道長(zhǎng)粹排,這世上最難降的妖魔是什么种远? 我笑而不...
    開封第一講書人閱讀 56,682評(píng)論 1 284
  • 正文 為了忘掉前任,我火速辦了婚禮顽耳,結(jié)果婚禮上坠敷,老公的妹妹穿的比我還像新娘。我一直安慰自己斧抱,他們只是感情好常拓,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,792評(píng)論 6 386
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著辉浦,像睡著了一般弄抬。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上宪郊,一...
    開封第一講書人閱讀 49,985評(píng)論 1 291
  • 那天掂恕,我揣著相機(jī)與錄音,去河邊找鬼弛槐。 笑死懊亡,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的乎串。 我是一名探鬼主播店枣,決...
    沈念sama閱讀 39,107評(píng)論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼叹誉!你這毒婦竟也來了鸯两?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,845評(píng)論 0 268
  • 序言:老撾萬榮一對(duì)情侶失蹤长豁,失蹤者是張志新(化名)和其女友劉穎钧唐,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體匠襟,經(jīng)...
    沈念sama閱讀 44,299評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡钝侠,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,612評(píng)論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了酸舍。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片帅韧。...
    茶點(diǎn)故事閱讀 38,747評(píng)論 1 341
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖啃勉,靈堂內(nèi)的尸體忽然破棺而出忽舟,到底是詐尸還是另有隱情,我是刑警寧澤,帶...
    沈念sama閱讀 34,441評(píng)論 4 333
  • 正文 年R本政府宣布萧诫,位于F島的核電站,受9級(jí)特大地震影響枝嘶,放射性物質(zhì)發(fā)生泄漏帘饶。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 40,072評(píng)論 3 317
  • 文/蒙蒙 一群扶、第九天 我趴在偏房一處隱蔽的房頂上張望及刻。 院中可真熱鬧,春花似錦竞阐、人聲如沸缴饭。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,828評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽颗搂。三九已至,卻和暖如春幕垦,著一層夾襖步出監(jiān)牢的瞬間丢氢,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,069評(píng)論 1 267
  • 我被黑心中介騙來泰國(guó)打工先改, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留疚察,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 46,545評(píng)論 2 362
  • 正文 我出身青樓仇奶,卻偏偏與公主長(zhǎng)得像貌嫡,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子该溯,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,658評(píng)論 2 350

推薦閱讀更多精彩內(nèi)容