Operators
Operator 可翻譯成算子,即:將一個(gè)或多個(gè)數(shù)據(jù)流轉(zhuǎn)換成一個(gè)新的數(shù)據(jù)流的計(jì)算過(guò)程甚垦。用戶可以將多個(gè)算子組合使用來(lái)實(shí)現(xiàn)復(fù)雜數(shù)據(jù)流的轉(zhuǎn)換邏輯茶鹃。
常見(jiàn) Operators
官方支持的數(shù)據(jù)流轉(zhuǎn)換類型文檔
Map
DataStream -> DataStream
接受一個(gè)元素,然后生成一個(gè)元素艰亮。下面的代碼將源數(shù)據(jù)數(shù)值加倍生成一個(gè)新數(shù)據(jù):
DataStream<Integer> dataStream = //...
dataStream.map(new MapFunction<Integer, Integer>() {
@Override
public Integer map(Integer value) throws Exception {
return 2 * value;
}
});
Filter
DataStream -> DataStream
用一個(gè)布爾型的函數(shù)來(lái)評(píng)估數(shù)據(jù)流中的每個(gè)元素闭翩,如果評(píng)估結(jié)果為真則保留,否則丟棄迄埃。下面的代碼過(guò)濾出數(shù)值為0的元素:
dataStream.filter(new FilterFunction<Integer>() {
@Override
public boolean filter(Integer value) throws Exception {
return value != 0;
}
});
KeyBy
DataStream → KeyedStream
邏輯上將一個(gè)數(shù)據(jù)流拆成幾個(gè)互不相交的分區(qū)疗韵。擁有相同 key 的記錄被分配到同個(gè)分區(qū)內(nèi)。內(nèi)部通過(guò)哈希分區(qū)的方式實(shí)現(xiàn)侄非。區(qū)分 key 的方式有多種蕉汪。下面的代碼返回一個(gè) KeyedStream,這個(gè) KeyedStream 可以在將來(lái)某個(gè)場(chǎng)景提供 keyed state 屬性接口逞怨。
dataStream.keyBy(value -> value.getSomeKey()) // Key by field "someKey"
dataStream.keyBy(value -> value.f0) // Key by the first element of a Tuple
注意:以下類型不能被當(dāng)成 key
- 本身是 POJO 類型但沒(méi)有重寫 hashCode() 方法者疤,并且依賴 Object.hashCode() 實(shí)現(xiàn)。
- 是一個(gè)包含任意類型的數(shù)組
Aggregations
KeyedStream → DataStream
在 keyed data stream 上進(jìn)行聚合操作叠赦。其中 min
與 minBy
的區(qū)別是驹马,前者返回具體的值,后者返回該元素除秀。如:
keyedStream.sum(0);
keyedStream.sum("key");
keyedStream.min(0);
keyedStream.min("key");
keyedStream.max(0);
keyedStream.max("key");
keyedStream.minBy(0);
keyedStream.minBy("key");
keyedStream.maxBy(0);
keyedStream.maxBy("key");
通過(guò)上面介紹糯累,想必對(duì) Operators 有了一定了解,就是 Flink 實(shí)現(xiàn)了的一系列轉(zhuǎn)換數(shù)據(jù)的接口册踩,各接口接收的數(shù)據(jù)源類型不同泳姐,處理邏輯不同,產(chǎn)出的數(shù)據(jù)類型也不同暂吉,但都能在數(shù)據(jù)源上執(zhí)行一定處理邏輯胖秒。
接下來(lái)聊一聊 Chaining。
Task chaining 和資源組
在 task 執(zhí)行過(guò)程中慕的,連續(xù)執(zhí)行的幾個(gè)算子往往會(huì)隨機(jī)分配到不同的線程處理扒怖,這增加了線程間交換與緩沖的開銷,通過(guò)調(diào)用鏈接接口业稼,可以把連續(xù)的算子強(qiáng)行安排到同一個(gè)線程上處理以提高 task 的執(zhí)行性能。默認(rèn)情況下蚂蕴,F(xiàn)link 會(huì)盡可能將多個(gè)算子連接起來(lái)(如兩個(gè)連續(xù)的 map 轉(zhuǎn)換)低散。
當(dāng)然俯邓,F(xiàn)link 還提供許多細(xì)粒度的鏈接控制 API,需要注意的是熔号,調(diào)用這些 API 時(shí)必須緊跟在某個(gè) Operator 之后稽鞭,而不能直接作用于一個(gè)數(shù)據(jù)流,原因是這些 API 都依賴于之前的轉(zhuǎn)換 Operator引镊,例如:
-
someStream.map(...).startNewChain()
:是允許的朦蕴,可以開啟一個(gè)新的鏈 -
someStream.startNewChain()
:是不允許的,該 API 未跟在某個(gè) Operator 后面
注意:用戶可以通過(guò)調(diào)用接口
StreamExecutionEnvironment.disableOperatorChaining()
來(lái)禁止整個(gè) job 的鏈接操作弟头。
Flink 中的 resource group
其實(shí)就是一個(gè) slot吩抓,是整個(gè)集群的最小調(diào)度單位,屬于 TaskManagers赴恨,每個(gè) TaskManager 所擁有的 slot 數(shù)默認(rèn)為1疹娶,在集群?jiǎn)?dòng)時(shí),可以通過(guò)改變配置 taskmanager.numberOfTaskSlots
來(lái)增加伦连,slot 越多雨饺,意味著該 TaskManager 能夠同時(shí)處理的 task 越多。
通過(guò)調(diào)用不同的鏈接接口惑淳,我們可以把不同的算子隔離分配到不同的 slots 中:
開啟新鏈
接口:startNewChain()
用例:someStream.filter(...).map(...).startNewChain().map(...);
解釋:開啟一個(gè)新的鏈额港,將接口前后的算子分派到一個(gè)獨(dú)立的 slot 上,這不包括 filter 這個(gè)算子歧焦,因?yàn)樗磁c startNewChain()
直接相連移斩。
關(guān)閉鏈接
接口:disableChaining()
用例:someStream.map(...).disableChaining();
解釋:由于 Flink 會(huì)盡可能將多個(gè) Operator 鏈接起來(lái),即分配到同個(gè) slot 上處理倚舀,如果你想關(guān)閉這個(gè)機(jī)制叹哭,除了前面提到的調(diào)用StreamExecutionEnvironment.disableOperatorChaining()
關(guān)閉整個(gè) job 的鏈接機(jī)制之外,還可以在該算子之后調(diào)用接口disableChaining()
來(lái)僅取消鏈接這個(gè)算子痕貌。
設(shè)置 slot sharing group
接口:slotSharingGroup()
用例:someStream.filter(...).slotSharingGroup("name");
解釋:在 Operator 后調(diào)用此接口风罩,可該 Operator 進(jìn)行分組,同分組內(nèi)的 Operator 執(zhí)行時(shí)會(huì)被 Flink 安排到同一個(gè) slot 中舵稠,非本分組內(nèi)的其他 Operators 將會(huì)被分配到其他 slots 中超升。默認(rèn)的 slot sharing group 叫“deafult”。