如何理解 Flink 中的 算子(operator)與鏈接(chain)腿堤?

Operators

Operator 可翻譯成算子,即:將一個(gè)或多個(gè)數(shù)據(jù)流轉(zhuǎn)換成一個(gè)新的數(shù)據(jù)流的計(jì)算過(guò)程甚垦。用戶可以將多個(gè)算子組合使用來(lái)實(shí)現(xiàn)復(fù)雜數(shù)據(jù)流的轉(zhuǎn)換邏輯茶鹃。

常見(jiàn) Operators

官方支持的數(shù)據(jù)流轉(zhuǎn)換類型文檔

Map

DataStream -> DataStream
接受一個(gè)元素,然后生成一個(gè)元素艰亮。下面的代碼將源數(shù)據(jù)數(shù)值加倍生成一個(gè)新數(shù)據(jù):

DataStream<Integer> dataStream = //...
dataStream.map(new MapFunction<Integer, Integer>() {
    @Override
    public Integer map(Integer value) throws Exception {
        return 2 * value;
    }
});

Filter

DataStream -> DataStream
用一個(gè)布爾型的函數(shù)來(lái)評(píng)估數(shù)據(jù)流中的每個(gè)元素闭翩,如果評(píng)估結(jié)果為真則保留,否則丟棄迄埃。下面的代碼過(guò)濾出數(shù)值為0的元素:

dataStream.filter(new FilterFunction<Integer>() {
    @Override
    public boolean filter(Integer value) throws Exception {
        return value != 0;
    }
});

KeyBy

DataStream → KeyedStream
邏輯上將一個(gè)數(shù)據(jù)流拆成幾個(gè)互不相交的分區(qū)疗韵。擁有相同 key 的記錄被分配到同個(gè)分區(qū)內(nèi)。內(nèi)部通過(guò)哈希分區(qū)的方式實(shí)現(xiàn)侄非。區(qū)分 key 的方式有多種蕉汪。下面的代碼返回一個(gè) KeyedStream,這個(gè) KeyedStream 可以在將來(lái)某個(gè)場(chǎng)景提供 keyed state 屬性接口逞怨。

dataStream.keyBy(value -> value.getSomeKey()) // Key by field "someKey"
dataStream.keyBy(value -> value.f0) // Key by the first element of a Tuple

注意:以下類型不能被當(dāng)成 key

  • 本身是 POJO 類型但沒(méi)有重寫 hashCode() 方法者疤,并且依賴 Object.hashCode() 實(shí)現(xiàn)。
  • 是一個(gè)包含任意類型的數(shù)組

Aggregations

KeyedStream → DataStream
在 keyed data stream 上進(jìn)行聚合操作叠赦。其中 minminBy 的區(qū)別是驹马,前者返回具體的值,后者返回該元素除秀。如:

keyedStream.sum(0);
keyedStream.sum("key");
keyedStream.min(0);
keyedStream.min("key");
keyedStream.max(0);
keyedStream.max("key");
keyedStream.minBy(0);
keyedStream.minBy("key");
keyedStream.maxBy(0);
keyedStream.maxBy("key");

通過(guò)上面介紹糯累,想必對(duì) Operators 有了一定了解,就是 Flink 實(shí)現(xiàn)了的一系列轉(zhuǎn)換數(shù)據(jù)的接口册踩,各接口接收的數(shù)據(jù)源類型不同泳姐,處理邏輯不同,產(chǎn)出的數(shù)據(jù)類型也不同暂吉,但都能在數(shù)據(jù)源上執(zhí)行一定處理邏輯胖秒。
接下來(lái)聊一聊 Chaining。

Task chaining 和資源組

在 task 執(zhí)行過(guò)程中慕的,連續(xù)執(zhí)行的幾個(gè)算子往往會(huì)隨機(jī)分配到不同的線程處理扒怖,這增加了線程間交換與緩沖的開銷,通過(guò)調(diào)用鏈接接口业稼,可以把連續(xù)的算子強(qiáng)行安排到同一個(gè)線程上處理以提高 task 的執(zhí)行性能。默認(rèn)情況下蚂蕴,F(xiàn)link 會(huì)盡可能將多個(gè)算子連接起來(lái)(如兩個(gè)連續(xù)的 map 轉(zhuǎn)換)低散。

當(dāng)然俯邓,F(xiàn)link 還提供許多細(xì)粒度的鏈接控制 API,需要注意的是熔号,調(diào)用這些 API 時(shí)必須緊跟在某個(gè) Operator 之后稽鞭,而不能直接作用于一個(gè)數(shù)據(jù)流,原因是這些 API 都依賴于之前的轉(zhuǎn)換 Operator引镊,例如:

  • someStream.map(...).startNewChain():是允許的朦蕴,可以開啟一個(gè)新的鏈
  • someStream.startNewChain():是不允許的,該 API 未跟在某個(gè) Operator 后面

注意:用戶可以通過(guò)調(diào)用接口 StreamExecutionEnvironment.disableOperatorChaining() 來(lái)禁止整個(gè) job 的鏈接操作弟头。

Flink 中的 resource group 其實(shí)就是一個(gè) slot吩抓,是整個(gè)集群的最小調(diào)度單位,屬于 TaskManagers赴恨,每個(gè) TaskManager 所擁有的 slot 數(shù)默認(rèn)為1疹娶,在集群?jiǎn)?dòng)時(shí),可以通過(guò)改變配置 taskmanager.numberOfTaskSlots 來(lái)增加伦连,slot 越多雨饺,意味著該 TaskManager 能夠同時(shí)處理的 task 越多。

通過(guò)調(diào)用不同的鏈接接口惑淳,我們可以把不同的算子隔離分配到不同的 slots 中:

開啟新鏈

接口:startNewChain()
用例:someStream.filter(...).map(...).startNewChain().map(...);
解釋:開啟一個(gè)新的鏈额港,將接口前后的算子分派到一個(gè)獨(dú)立的 slot 上,這不包括 filter 這個(gè)算子歧焦,因?yàn)樗磁c startNewChain()直接相連移斩。

關(guān)閉鏈接

接口:disableChaining()
用例:someStream.map(...).disableChaining();
解釋:由于 Flink 會(huì)盡可能將多個(gè) Operator 鏈接起來(lái),即分配到同個(gè) slot 上處理倚舀,如果你想關(guān)閉這個(gè)機(jī)制叹哭,除了前面提到的調(diào)用StreamExecutionEnvironment.disableOperatorChaining()關(guān)閉整個(gè) job 的鏈接機(jī)制之外,還可以在該算子之后調(diào)用接口disableChaining()來(lái)僅取消鏈接這個(gè)算子痕貌。

設(shè)置 slot sharing group

接口:slotSharingGroup()
用例:someStream.filter(...).slotSharingGroup("name");
解釋:在 Operator 后調(diào)用此接口风罩,可該 Operator 進(jìn)行分組,同分組內(nèi)的 Operator 執(zhí)行時(shí)會(huì)被 Flink 安排到同一個(gè) slot 中舵稠,非本分組內(nèi)的其他 Operators 將會(huì)被分配到其他 slots 中超升。默認(rèn)的 slot sharing group 叫“deafult”。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末哺徊,一起剝皮案震驚了整個(gè)濱河市室琢,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌落追,老刑警劉巖盈滴,帶你破解...
    沈念sama閱讀 210,978評(píng)論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡巢钓,警方通過(guò)查閱死者的電腦和手機(jī)病苗,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 89,954評(píng)論 2 384
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)症汹,“玉大人硫朦,你說(shuō)我怎么就攤上這事”痴颍” “怎么了咬展?”我有些...
    開封第一講書人閱讀 156,623評(píng)論 0 345
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)瞒斩。 經(jīng)常有香客問(wèn)我破婆,道長(zhǎng),這世上最難降的妖魔是什么济瓢? 我笑而不...
    開封第一講書人閱讀 56,324評(píng)論 1 282
  • 正文 為了忘掉前任荠割,我火速辦了婚禮,結(jié)果婚禮上旺矾,老公的妹妹穿的比我還像新娘蔑鹦。我一直安慰自己,他們只是感情好箕宙,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,390評(píng)論 5 384
  • 文/花漫 我一把揭開白布嚎朽。 她就那樣靜靜地躺著,像睡著了一般柬帕。 火紅的嫁衣襯著肌膚如雪哟忍。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,741評(píng)論 1 289
  • 那天陷寝,我揣著相機(jī)與錄音锅很,去河邊找鬼。 笑死凤跑,一個(gè)胖子當(dāng)著我的面吹牛爆安,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播仔引,決...
    沈念sama閱讀 38,892評(píng)論 3 405
  • 文/蒼蘭香墨 我猛地睜開眼扔仓,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了咖耘?” 一聲冷哼從身側(cè)響起翘簇,我...
    開封第一講書人閱讀 37,655評(píng)論 0 266
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎儿倒,沒(méi)想到半個(gè)月后版保,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,104評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,451評(píng)論 2 325
  • 正文 我和宋清朗相戀三年找筝,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了蹈垢。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,569評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡袖裕,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出溉瓶,到底是詐尸還是另有隱情急鳄,我是刑警寧澤,帶...
    沈念sama閱讀 34,254評(píng)論 4 328
  • 正文 年R本政府宣布堰酿,位于F島的核電站疾宏,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏触创。R本人自食惡果不足惜坎藐,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,834評(píng)論 3 312
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望哼绑。 院中可真熱鬧岩馍,春花似錦、人聲如沸抖韩。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,725評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)茂浮。三九已至双谆,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間席揽,已是汗流浹背顽馋。 一陣腳步聲響...
    開封第一講書人閱讀 31,950評(píng)論 1 264
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留幌羞,地道東北人寸谜。 一個(gè)月前我還...
    沈念sama閱讀 46,260評(píng)論 2 360
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像新翎,于是被迫代替她去往敵國(guó)和親程帕。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,446評(píng)論 2 348

推薦閱讀更多精彩內(nèi)容