【Flink 精選】Kafka Consumer 源碼詳解

本文首先進(jìn)行 Flink Kafka Consumer 原理分析,結(jié)合 SourceFunction 和 Kafka Client API 詳解源碼刃麸。


1.Flink Kafka Consumer 原理

本文基于 flink-1.11 分析 Kafka Consumer 原理。

FlinkKafkaConsumer 主要是繼承基類(lèi) RichParallelSourceFunction司浪,不但可以執(zhí)行 run(...) 方法讀取數(shù)據(jù)泊业,而且擁有狀態(tài)把沼、metric 和多并發(fā)等功能。

1.1 RichParallelSourceFunction 分析

RichParallelSourceFunction 與父類(lèi)的繼承關(guān)系吁伺,如下圖所示饮睬。一方面,RichParallelSourceFunction 間接實(shí)現(xiàn)接口 SourceFunction篮奄,可以執(zhí)行 run(...) 方法讀取數(shù)據(jù)捆愁;另一方面,RichParallelSourceFunction 間接實(shí)現(xiàn)接口 RichFunction窟却,擁有狀態(tài)昼丑、metric 和多并發(fā)等功能。因此夸赫,RichParallelSourceFunction 是有狀態(tài)的和多并發(fā)的 Source 基類(lèi)菩帝。

ParallelSourceFunction 是接口 SourceFunction 的子類(lèi)。共同點(diǎn)是 Source 的基類(lèi)茬腿,需要實(shí)現(xiàn) run() 讀取數(shù)據(jù)呼奢。不同點(diǎn)是前者提供多并發(fā)的能力,后者的并發(fā)度只能為 1切平;
AbstractRichFunction 是接口 RichFunction 的實(shí)現(xiàn)類(lèi)握础,可以提供 open() 方法獲取 RuntimeContext,而 RuntimeContext 擁有 metric揭绑、subtasks 信息弓候、accumulator、state 等功能他匪;

RichParallelSourceFunction繼承圖.jpg

1.2 Flink Kafka Consumer 流程分析

如下圖所示菇存,F(xiàn)link Kafka Consumer 流程主要分為 ①主線(xiàn)程循環(huán)獲取緩存數(shù)據(jù),發(fā)送到下游邦蜜;②消費(fèi)線(xiàn)程循環(huán)消費(fèi) Kafka 數(shù)據(jù)依鸥,保存到緩存

Handover.next:Handover 類(lèi)的 next 屬性悼沈,即 ConsumerRecords 類(lèi)型的緩存數(shù)據(jù)贱迟。Handover 的主要作用是協(xié)調(diào)主線(xiàn)程和消費(fèi)線(xiàn)程,有序地消費(fèi) Kafka 和發(fā)送數(shù)據(jù)到下游算子絮供。

Flink Kafka Consumer流程圖.JPG

(1)主線(xiàn)程

主線(xiàn)程獲取緩存的 Handover.next 對(duì)象即 ConsumerRecords衣吠,發(fā)送到下游算子。首先創(chuàng)建 KafkaFetcher壤靶,同時(shí)內(nèi)部創(chuàng)建消費(fèi)線(xiàn)程 KafkaConsumerThread缚俏。然后,調(diào)用 KafkaFetcher.runFetchLoop() 方法,啟動(dòng)消費(fèi)線(xiàn)程忧换、循環(huán)獲取緩存數(shù)據(jù)恬惯;最后,根據(jù)分區(qū)往下游發(fā)送數(shù)據(jù)亚茬。

(2)消費(fèi)線(xiàn)程

消費(fèi)線(xiàn)程 KafkaConsumerThread 主要循環(huán)消費(fèi) Kafka 數(shù)據(jù)酪耳,保存到緩存。首先刹缝,主線(xiàn)程啟動(dòng)消費(fèi)線(xiàn)程碗暗。接著,KafkaConsumer 從 Kafka Broker 循環(huán) poll 數(shù)據(jù)赞草,同時(shí)保持到緩存中讹堤。

2.Flink Kafka Consumer 源碼詳解

問(wèn)題1:如何使用 FlinkKafkaConsumer ?如何直接使用 KafkaClient API 厨疙?


/**
* 示例1:  Flink DataStream API 使用 FlinkKafkaConsumer 
**/
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//SimpleStringSchema為數(shù)據(jù)字段解析類(lèi)
env.addSource(new FlinkKafkaConsumer<>("eventTopic", new SimpleStringSchema(), properties)


/**
* 示例2:  KafkaClient API 直接使用 KafkaConsumer 
**/
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
consumer.poll(Duration.ofMillis(100));

問(wèn)題2:FlinkKafkaConsumer 內(nèi)部是如何使用 KafkaClient API 洲守?

① 初始化

執(zhí)行 env.addSource 的時(shí)候會(huì)創(chuàng)建 StreamSource 算子對(duì)象;StreamSource 構(gòu)造函數(shù)中將 function 即 FlinkKafkaConsumer 對(duì)象傳給父類(lèi) AbstractUdfStreamOperator 的 userFunction 變量沾凄;

StreamExecutionEnvironment源碼:

    public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function, String sourceName, TypeInformation<OUT> typeInfo) {
        // 省略...
        // function 即 FlinkKafkaConsumer 
        final StreamSource<OUT, ?> sourceOperator = new StreamSource<>(function);
        // 省略...
    }

AbstractUdfStreamOperator源碼:

    // userFunction 即 FlinkKafkaConsumer 
    public AbstractUdfStreamOperator(F userFunction) {
        this.userFunction = requireNonNull(userFunction);
        checkUdfCheckpointingPreconditions();
    }

② Task 啟動(dòng)和運(yùn)行

Task 實(shí)現(xiàn) Java多線(xiàn)程接口 Runnable梗醇。Task 啟動(dòng)后,函數(shù)調(diào)用鏈如下 Task.run() -> Task.doRun() -> StreamTask.invoke() -> StreamTask.runMailboxLoop() -> MailboxProcessor.runMailboxLoop() -> MailboxProcessor.runMailboxStep() -> SourceStreamTask .processInput()撒蟀。processInput() 方法里面啟動(dòng)線(xiàn)程 sourceThread.start()叙谨。上述的關(guān)鍵源碼,如下所示保屯。

StreamTask 源碼如下:

    @Override
    public final void invoke() throws Exception {
            // 省略...
            // 調(diào)用 MailboxProcessor.runMailboxLoop()
            runMailboxLoop();
            // 省略...
    }

MailboxProcessor 源碼如下:

    public void runMailboxLoop() throws Exception {
        // 省略...
        // 循環(huán)執(zhí)行 runMailboxStep
        while (runMailboxStep(localMailbox, defaultActionContext)) {
        }
    }

    private boolean runMailboxStep(TaskMailbox localMailbox, MailboxController defaultActionContext) throws Exception {
        if (processMail(localMailbox)) {
            // 執(zhí)行 mailboxDefaultAction.runDefaultAction手负,即執(zhí)行 SourceStreamTask .processInput()
            mailboxDefaultAction.runDefaultAction(defaultActionContext); // lock is acquired inside default action as needed
            return true;
        }
        return false;
    }

SourceStreamTask 源碼如下:

    @Override
    protected void processInput(MailboxDefaultAction.Controller controller) throws Exception {
        // 由于目前沒(méi)有輸入,TaskMailbox 先暫停 loop 主線(xiàn)程
        controller.suspendDefaultAction();

        sourceThread.setTaskDescription(getName());
        sourceThread.start();
        // 省略...
    }

    private class LegacySourceFunctionThread extends Thread {
        // 省略...
        @Override
        public void run() {
            try {
                // 執(zhí)行 source function 的 run() 方法
                mainOperator.run(lock, getStreamStatusMaintainer(), operatorChain);
                completionFuture.complete(null);
            } catch (Throwable t) {
                completionFuture.completeExceptionally(t);
            }
        }
        // 省略...
    }

③ 消費(fèi) Kafka

FlinkKafkaConsumerBase 間接實(shí)現(xiàn)了 SourceFunction 接口姑尺,主要實(shí)現(xiàn) run() 方法竟终。然后,在 run() 方法創(chuàng)建了一個(gè) KafkaFetcher 對(duì)象切蟋,并主要調(diào)用 KafkaFetcher.runFetchLoop()统捶。最終,運(yùn)行消費(fèi)線(xiàn)程 KafkaConsumerThread柄粹,并 while 循環(huán)地 poll Kafka 數(shù)據(jù)喘鸟。上述的關(guān)鍵源碼,如下所示驻右。

FlinkKafkaConsumerBase 源碼如下:

    @Override
    public void run(SourceContext<T> sourceContext) throws Exception {
        // 省略...
        // 創(chuàng)建 KafkaFetcher 對(duì)象 
        this.kafkaFetcher = createFetcher(
                sourceContext,
                subscribedPartitionsToStartOffsets,
                watermarkStrategy,
                (StreamingRuntimeContext) getRuntimeContext(),
                offsetCommitMode,
                getRuntimeContext().getMetricGroup().addGroup(KAFKA_CONSUMER_METRICS_GROUP),
                useMetrics);

        // 省略...
        // kafkaFetcher 執(zhí)行 runFetchLoop()什黑,即循環(huán)消費(fèi)數(shù)據(jù)
        kafkaFetcher.runFetchLoop();
        // 省略...
    }

KafkaFetcher 源碼如下:

    @Override
    public void runFetchLoop() throws Exception {
        try {
            // 啟動(dòng)消費(fèi)線(xiàn)程 KafkaConsumerThread 
            consumerThread.start();

            while (running) {
                // 獲取協(xié)調(diào)者 Handover 的 next 緩存值 
                final ConsumerRecords<byte[], byte[]> records = handover.pollNext();

                // 從partition 獲取 數(shù)據(jù)
                for (KafkaTopicPartitionState<T, TopicPartition> partition : subscribedPartitionStates()) {

                    List<ConsumerRecord<byte[], byte[]>> partitionRecords =
                        records.records(partition.getKafkaPartitionHandle());
                    // 向下游發(fā)送數(shù)據(jù)

                    partitionConsumerRecordsHandler(partitionRecords, partition);
                }
            }
        }
        finally {
            consumerThread.shutdown();
        }

KafkaConsumerThread 源碼如下,run() 方法中創(chuàng)建 KafkaClient API 的 KafkaConsumer堪夭,并使用 KafkaConsumer.poll() 消費(fèi)數(shù)據(jù)兑凿。

@Override
    public void run() {
        // 省略...
        // 從主線(xiàn)程獲取的 handover 賦值給本地變量...
        final Handover handover = this.handover;
        // 省略...
        try {
            // 創(chuàng)建 KafkaConsumer
            this.consumer = getConsumer(kafkaProperties);
        }
        catch (Throwable t) {
            handover.reportError(t);
            return;
        }
            // 省略...
            ConsumerRecords<byte[], byte[]> records = null;
            // while 循環(huán)消費(fèi) Kafka
            while (running) {
                // 省略...
                if (records == null) {
                    try {
                        // KafkaConsumer poll 數(shù)據(jù)凯力,即使用 KafkaClient API 的 KafkaConsumer 消費(fèi)數(shù)據(jù)
                        records = consumer.poll(pollTimeout);
                    }
                    catch (WakeupException we) {
                        continue;
                    }
                }

                try {
                        // 把 Kafka 的數(shù)據(jù)保存在 Handover 的緩存中
                    handover.produce(records);
                    records = null;
                }
                // 省略...
            }
    }

問(wèn)題3:Handover 是如何協(xié)調(diào)消費(fèi)線(xiàn)程和主線(xiàn)程,使得前者可以及時(shí)消費(fèi)和保存數(shù)據(jù)礼华,而后者也可以及時(shí)獲取數(shù)據(jù) ?

Handover 的關(guān)鍵方法是 produce() 保存緩存數(shù)據(jù) next拗秘、pollNext() 獲取緩存數(shù)據(jù) next圣絮,主要作用是在消費(fèi)線(xiàn)程和主線(xiàn)程下,保證同一個(gè)緩存數(shù)據(jù) next 雕旨,在同一時(shí)間內(nèi)是不能既更新(寫(xiě))扮匠,也輸出(讀),即保證原子性操作 next凡涩。

Handover 源碼如下:

    /**
    * consumer 線(xiàn)程把 Kafka 數(shù)據(jù)保存到 next 
    **/
    public void produce(final ConsumerRecords<byte[], byte[]> element)
            throws InterruptedException, WakeupException, ClosedException {

        checkNotNull(element);

        synchronized (lock) {
            // 循環(huán)判斷 next 是否為 null
            while (next != null && !wakeupProducer) {
                // lock 會(huì)釋放當(dāng)前的鎖棒搜,該 consumer 線(xiàn)程進(jìn)入 waiting 狀態(tài)
                lock.wait();
            }
            // 省略...
            else if (error == null) {
                // 寫(xiě) next
                next = element;
                // 喚醒 lock(使得處于 waiting 狀態(tài)的 main 線(xiàn)程能夠繼續(xù)執(zhí)行)
                lock.notifyAll();
            }
            // 省略...
        }
    }

    /**
    * main 線(xiàn)程讀取 next 
    **/
    public ConsumerRecords<byte[], byte[]> pollNext() throws Exception {
        synchronized (lock) {
            // 循環(huán)判斷 next 是否為 null
            while (next == null && error == null) {
                // lock 會(huì)釋放當(dāng)前的鎖,該 main 線(xiàn)程進(jìn)入 waiting 狀態(tài)
                lock.wait();
            }
            // 讀取 next
            ConsumerRecords<byte[], byte[]> n = next;
            if (n != null) {
                next = null;
                // 喚醒 lock(使得處于 waiting 狀態(tài)的 consumer 線(xiàn)程能夠繼續(xù)執(zhí)行)
                lock.notifyAll();
                return n;
            }
            // 省略...
        }
    }

Java 多線(xiàn)程的等待/通知機(jī)制:Object 的 wait()活箕、notify/notifyAll()
① 當(dāng)線(xiàn)程執(zhí)行 wait() 方法的時(shí)候力麸,會(huì)釋放當(dāng)前的鎖,然后讓出CPU育韩,進(jìn)入等待狀態(tài)克蚂。
② 當(dāng)線(xiàn)程執(zhí)行 notify/notifyAll() 方法的時(shí)候,會(huì)喚醒一個(gè)或多個(gè)正處于等待狀態(tài)的線(xiàn)程筋讨,然后繼續(xù)往下執(zhí)行埃叭,直到執(zhí)行完synchronized 代碼塊的代碼或是中途遇到 wait() ,再次釋放鎖悉罕。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末赤屋,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子壁袄,更是在濱河造成了極大的恐慌类早,老刑警劉巖,帶你破解...
    沈念sama閱讀 221,576評(píng)論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件然想,死亡現(xiàn)場(chǎng)離奇詭異莺奔,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)变泄,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,515評(píng)論 3 399
  • 文/潘曉璐 我一進(jìn)店門(mén)令哟,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人妨蛹,你說(shuō)我怎么就攤上這事屏富。” “怎么了蛙卤?”我有些...
    開(kāi)封第一講書(shū)人閱讀 168,017評(píng)論 0 360
  • 文/不壞的土叔 我叫張陵狠半,是天一觀(guān)的道長(zhǎng)噩死。 經(jīng)常有香客問(wèn)我,道長(zhǎng)神年,這世上最難降的妖魔是什么已维? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 59,626評(píng)論 1 296
  • 正文 為了忘掉前任,我火速辦了婚禮已日,結(jié)果婚禮上垛耳,老公的妹妹穿的比我還像新娘。我一直安慰自己飘千,他們只是感情好堂鲜,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,625評(píng)論 6 397
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著护奈,像睡著了一般缔莲。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上霉旗,一...
    開(kāi)封第一講書(shū)人閱讀 52,255評(píng)論 1 308
  • 那天痴奏,我揣著相機(jī)與錄音,去河邊找鬼奖慌。 笑死抛虫,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的简僧。 我是一名探鬼主播建椰,決...
    沈念sama閱讀 40,825評(píng)論 3 421
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼岛马!你這毒婦竟也來(lái)了棉姐?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書(shū)人閱讀 39,729評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤啦逆,失蹤者是張志新(化名)和其女友劉穎伞矩,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體夏志,經(jīng)...
    沈念sama閱讀 46,271評(píng)論 1 320
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡乃坤,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,363評(píng)論 3 340
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了沟蔑。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片湿诊。...
    茶點(diǎn)故事閱讀 40,498評(píng)論 1 352
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖瘦材,靈堂內(nèi)的尸體忽然破棺而出厅须,到底是詐尸還是另有隱情,我是刑警寧澤食棕,帶...
    沈念sama閱讀 36,183評(píng)論 5 350
  • 正文 年R本政府宣布朗和,位于F島的核電站错沽,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏眶拉。R本人自食惡果不足惜千埃,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,867評(píng)論 3 333
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望镀层。 院中可真熱鬧镰禾,春花似錦、人聲如沸唱逢。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 32,338評(píng)論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至馋吗,卻和暖如春脐恩,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背廉沮。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,458評(píng)論 1 272
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人奶陈。 一個(gè)月前我還...
    沈念sama閱讀 48,906評(píng)論 3 376
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像附较,于是被迫代替她去往敵國(guó)和親吃粒。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,507評(píng)論 2 359