Flink 側(cè)流輸出源碼解析

Flink 側(cè)流輸出源碼解析

Flink 的 side output 為我們提供了側(cè)流(分流)輸出的功能磅崭,根據(jù)條件可以把一條流分為多個(gè)不同的流,之后做不同的處理邏輯应闯,下面就來(lái)看下側(cè)流輸出相關(guān)的源碼蛀序。

先來(lái)看下面的一個(gè) Demo欢瞪,一個(gè)流被分成了 3 個(gè)流,一個(gè)主流徐裸,兩個(gè)側(cè)流輸出遣鼓。

SingleOutputStreamOperator<JasonLeePOJO> process =
        kafka_source1.process(
                new ProcessFunction<JasonLeePOJO, JasonLeePOJO>() {
                    @Override
                    public void processElement(
                            JasonLeePOJO value,
                            ProcessFunction<JasonLeePOJO, JasonLeePOJO>.Context ctx,
                            Collector<JasonLeePOJO> out)
                            throws Exception {
                        // 這個(gè)是主流輸出
                        if (value.getName().equals("flink")) {
                            out.collect(value);
                        // 下面兩個(gè)是測(cè)流輸出
                        } else if (value.getName().equals("spark")) {
                            ctx.output(test, value);
                        // 測(cè)流
                        } else if (value.getName().equals("hadoop")) {
                            ctx.output(test1, value);
                        }
                    }
                });

為了更加清楚的查看每一個(gè)算子,我禁用了 operator chain重贺,任務(wù)的 DAG 圖如下所示:

image-20220912183133701

這樣就比較清晰了骑祟,很明顯從 process 算子開(kāi)始回懦,1 個(gè)數(shù)據(jù)流分為了 3 個(gè)數(shù)據(jù)流,當(dāng)然曾我,在默認(rèn)情況下沒(méi)有禁止

operator chain 所有的算子都是 chain 在一起的粉怕。

源碼解析

我們先來(lái)看第一個(gè)主流輸出也就是 out.collect(value) 的源碼,這里的 out 實(shí)際上是 TimestampedCollector 對(duì)象抒巢。

TimestampedCollector#collect

@Override
public void collect(T record) {
    output.collect(reuse.replace(record));
}

在 collect 方法中持有一個(gè) output 對(duì)象贫贝,用來(lái)輸出數(shù)據(jù),在這里實(shí)際上是一個(gè) CountingOutput 它是一個(gè)包裝了 Output 的對(duì)象蛉谜,主要用于更新發(fā)送數(shù)據(jù)的 metric稚晚,并輸出數(shù)據(jù)。

CountingOutput#collect

@Override
public void collect(StreamRecord<OUT> record) {
    numRecordsOut.inc();
    output.collect(record);
}

在 CountingOutput 中也持有一個(gè) output 對(duì)象型诚,但是這里的 output 是 BroadcastingOutputCollector 對(duì)象客燕,從名字就可以看出它是往下游廣播數(shù)據(jù)的,這里就有一個(gè)疑問(wèn)狰贯?把數(shù)據(jù)廣播到下游也搓,那豈不是下游的每個(gè)數(shù)據(jù)流都有這條數(shù)據(jù)嗎?這樣的話是怎么實(shí)現(xiàn)分流的呢涵紊?帶著這個(gè)疑問(wèn)傍妒,我們來(lái)看 BroadcastingOutputCollector 的 collect 方法是怎么實(shí)現(xiàn)的。

BroadcastingOutputCollector#collect

@Override
public void collect(StreamRecord<T> record) {
    // 這里的 outputs 數(shù)組有三個(gè) output 分別對(duì)應(yīng)上面的三個(gè)輸出流
    for (Output<StreamRecord<T>> output : outputs) {
        output.collect(record);
    }
}

在 BroadcastingOutputCollector 對(duì)象里也持有一個(gè) output 對(duì)象摸柄,其實(shí)他們都實(shí)現(xiàn)了 Output 接口颤练,用來(lái)往下游發(fā)送數(shù)據(jù),這里的 outputs 是一個(gè) Output 數(shù)組驱负,代表了下游的所有 Output嗦玖,因?yàn)樯厦嬗腥齻€(gè)輸出流,所以數(shù)組里面就包含了 3 個(gè) Output 對(duì)象跃脊。循環(huán)的調(diào)用 output 的 collect 方法往下游發(fā)送數(shù)據(jù)宇挫,因?yàn)槲掖驍嗔?operator chain,所以 process 算子和下游的 Print 算子不在同一個(gè) operatorChain 內(nèi)匾乓,那么上下游算子之間數(shù)據(jù)傳輸用的就是 RecordWriterOutput捞稿,否則用的是 CopyingChainingOutput 或者 ChainingOutput,具體使用的是哪個(gè) Output 這里就不多介紹了拼缝,后面有時(shí)間的話會(huì)單獨(dú)介紹。

RecordWriterOutput#collect

@Override
public void collect(StreamRecord<OUT> record) {
    // 主流是沒(méi)有 outputTag 的彰亥,只有測(cè)流有 outputTag
    if (this.outputTag != null) {
        // we are not responsible for emitting to the main output.
        return;
    }

    pushToRecordWriter(record);
}

接著來(lái)看 RecordWriterOutput 的 collect 方法咧七,在 collect 方法里面會(huì)先判斷 outputTag 是否為空,如果不為空不做任何處理任斋,直接返回继阻,否則就把數(shù)據(jù)推送到下游算子,只有側(cè)流輸出才需要定義 outputTag,主流(正常流)是沒(méi)有 outputTag 的瘟檩,所以這里會(huì)走 pushToRecordWriter 方法把數(shù)據(jù)寫入到下游抹缕,也就是說(shuō)雖然會(huì)以廣播的形式把數(shù)據(jù)廣播到所有下游,但其實(shí)另外兩個(gè)側(cè)流是直接返回的墨辛,只有主流才會(huì)把數(shù)據(jù)推送到下游卓研,這也就解釋了上面的疑問(wèn)。

然后再來(lái)看第二個(gè)側(cè)流輸出 ctx.output(test, value) 的源碼睹簇,這里的 ctx 實(shí)際上是 ProcessOperator#ContextImpl 對(duì)象奏赘。

ProcessOperator#ContextImpl#output

@Override
public <X> void output(OutputTag<X> outputTag, X value) {
    if (outputTag == null) {
        throw new IllegalArgumentException("OutputTag must not be null.");
    }
    output.collect(outputTag, new StreamRecord<>(value, element.getTimestamp()));
}

如果 outputTag 是空,直接拋出異常太惠,因?yàn)檫@個(gè)是側(cè)流磨淌,所以必須要定義 OutputTag。這里的 output 實(shí)際上是父類 AbstractStreamOperator 所持有的變量凿渊,如果 outputTag 不為空梁只,就調(diào)用 output 的 collect 方法把數(shù)據(jù)發(fā)送到下游,這里的 output 和上面的一樣是 CountingOutput 但是 collect 方法是另外一個(gè)重載的方法埃脏。

CountingOutput#collect

@Override
public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
    numRecordsOut.inc();
    output.collect(outputTag, record);
}

可以發(fā)現(xiàn)搪锣,這個(gè) collect 方法比上面那個(gè)多了一個(gè) OutputTag 參數(shù),也就是使用側(cè)流輸出的時(shí)候定義的 OutputTag 對(duì)象剂癌,然后調(diào)用 output 的 collect 方法發(fā)送數(shù)據(jù)淤翔,這個(gè)也和上面的一樣,同樣是 BroadcastingOutputCollector 對(duì)象的另外一個(gè)重載方法佩谷,多了一個(gè) OutputTag 參數(shù)旁壮。

BroadcastingOutputCollector#collect

@Override
public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
    for (Output<StreamRecord<T>> output : outputs) {
        output.collect(outputTag, record);
    }
}

這里的邏輯和上面是一樣的,同樣的循環(huán)調(diào)用 collect 方法發(fā)送數(shù)據(jù)谐檀。

RecordWriterOutput#collect

@Override
public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
    // 先要判斷兩個(gè) OutputTag 是否一樣
    if (OutputTag.isResponsibleFor(this.outputTag, outputTag)) {
        pushToRecordWriter(record);
    }
}

在這個(gè) collect 方法中會(huì)先判斷傳入的 OutputTag 對(duì)象和成員變量 this.outputTag 是不是相等抡谐,如果是的話,就發(fā)送數(shù)據(jù)桐猬,否則不做任何處理麦撵,所以這里每次只會(huì)選擇一個(gè)下游側(cè)流輸出數(shù)據(jù),這樣就實(shí)現(xiàn)了所謂的分流溃肪。

OutputTag#isResponsibleFor

public static boolean isResponsibleFor(
        @Nullable OutputTag<?> owner, @Nonnull OutputTag<?> other) {
    return other.equals(owner);
}

可以看到在 isResponsibleFor 方法內(nèi)是直接調(diào)用 OutputTag 的 equals 方法判斷兩個(gè)對(duì)象是否相等的免胃。

第三個(gè)側(cè)流 test1 ctx.output(test1, value) 和第二個(gè)側(cè)流 test 是完全一樣的情況,這里就不在看代碼了惫撰。

上面是完成了分流操作羔沙,那怎么獲取到分流后結(jié)果呢(數(shù)據(jù)流)?我們可以通過(guò) getSideOutput 方法獲取厨钻。

DataStream<JasonLeePOJO> sideOutput = process.getSideOutput(test);
DataStream<JasonLeePOJO> sideOutput1 = process.getSideOutput(test1);

getSideOutput 源碼

public <X> DataStream<X> getSideOutput(OutputTag<X> sideOutputTag) {
    sideOutputTag = clean(requireNonNull(sideOutputTag));

    // make a defensive copy
    sideOutputTag = new OutputTag<X>(sideOutputTag.getId(), sideOutputTag.getTypeInfo());

    TypeInformation<?> type = requestedSideOutputs.get(sideOutputTag);
    if (type != null && !type.equals(sideOutputTag.getTypeInfo())) {
        throw new UnsupportedOperationException(
                "A side output with a matching id was "
                        + "already requested with a different type. This is not allowed, side output "
                        + "ids need to be unique.");
    }

    requestedSideOutputs.put(sideOutputTag, sideOutputTag.getTypeInfo());

    SideOutputTransformation<X> sideOutputTransformation =
            new SideOutputTransformation<>(this.getTransformation(), sideOutputTag);
    return new DataStream<>(this.getExecutionEnvironment(), sideOutputTransformation);
}

getSideOutput 方法里先是構(gòu)建了一個(gè) SideOutputTransformation 對(duì)象扼雏,然后又構(gòu)建了 DataStream 對(duì)象坚嗜,這樣我們就可以基于分流后的 DataStream 做不同的處理邏輯了,從而實(shí)現(xiàn)了把一個(gè) DataStream 分流成多個(gè) DataStream 功能诗充。

總結(jié)

通過(guò)對(duì)側(cè)流輸出的源碼進(jìn)行解析苍蔬,在分流的時(shí)候,數(shù)據(jù)是通過(guò)廣播的方式發(fā)送到下游算子的蝴蜓,對(duì)于主流的數(shù)據(jù)來(lái)說(shuō)碟绑,只有 OutputTag 為空的才會(huì)處理,側(cè)流因?yàn)?OutputTag 不為空励翼,所以直接返回蜈敢,不做任何處理,那對(duì)于側(cè)流的數(shù)據(jù)來(lái)說(shuō)汽抚,是通過(guò)判斷兩個(gè) OutputTag 是否相等抓狭,所以每次只會(huì)把數(shù)據(jù)發(fā)送到下游對(duì)應(yīng)的那一個(gè)側(cè)流上去,這樣即可實(shí)現(xiàn)分流邏輯造烁。

如果你覺(jué)得文章對(duì)你有幫助,麻煩點(diǎn)一下在看吧,你的支持是我創(chuàng)作的最大動(dòng)力.

本文由mdnice多平臺(tái)發(fā)布

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末否过,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子惭蟋,更是在濱河造成了極大的恐慌苗桂,老刑警劉巖,帶你破解...
    沈念sama閱讀 221,198評(píng)論 6 514
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件告组,死亡現(xiàn)場(chǎng)離奇詭異煤伟,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)木缝,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,334評(píng)論 3 398
  • 文/潘曉璐 我一進(jìn)店門便锨,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人我碟,你說(shuō)我怎么就攤上這事放案。” “怎么了矫俺?”我有些...
    開(kāi)封第一講書(shū)人閱讀 167,643評(píng)論 0 360
  • 文/不壞的土叔 我叫張陵吱殉,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我厘托,道長(zhǎng)友雳,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 59,495評(píng)論 1 296
  • 正文 為了忘掉前任铅匹,我火速辦了婚禮沥阱,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘伊群。我一直安慰自己考杉,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,502評(píng)論 6 397
  • 文/花漫 我一把揭開(kāi)白布舰始。 她就那樣靜靜地躺著崇棠,像睡著了一般。 火紅的嫁衣襯著肌膚如雪丸卷。 梳的紋絲不亂的頭發(fā)上枕稀,一...
    開(kāi)封第一講書(shū)人閱讀 52,156評(píng)論 1 308
  • 那天,我揣著相機(jī)與錄音谜嫉,去河邊找鬼萎坷。 笑死,一個(gè)胖子當(dāng)著我的面吹牛沐兰,可吹牛的內(nèi)容都是我干的哆档。 我是一名探鬼主播,決...
    沈念sama閱讀 40,743評(píng)論 3 421
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼住闯,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼瓜浸!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起比原,我...
    開(kāi)封第一講書(shū)人閱讀 39,659評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤插佛,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后量窘,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體雇寇,經(jīng)...
    沈念sama閱讀 46,200評(píng)論 1 319
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,282評(píng)論 3 340
  • 正文 我和宋清朗相戀三年蚌铜,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了锨侯。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,424評(píng)論 1 352
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡厘线,死狀恐怖识腿,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情造壮,我是刑警寧澤渡讼,帶...
    沈念sama閱讀 36,107評(píng)論 5 349
  • 正文 年R本政府宣布,位于F島的核電站耳璧,受9級(jí)特大地震影響成箫,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜旨枯,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,789評(píng)論 3 333
  • 文/蒙蒙 一蹬昌、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧攀隔,春花似錦皂贩、人聲如沸栖榨。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 32,264評(píng)論 0 23
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)婴栽。三九已至,卻和暖如春辈末,著一層夾襖步出監(jiān)牢的瞬間愚争,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,390評(píng)論 1 271
  • 我被黑心中介騙來(lái)泰國(guó)打工挤聘, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留轰枝,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,798評(píng)論 3 376
  • 正文 我出身青樓组去,卻偏偏與公主長(zhǎng)得像鞍陨,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子添怔,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,435評(píng)論 2 359

推薦閱讀更多精彩內(nèi)容