前言
在Flink中比如某些算子(join,coGroup,keyBy,groupBy)要求在數(shù)據(jù)元上定義key。另外有些算子操作扒披,例如reduce沸停,groupReduce良狈,Aggregate,Windows需要數(shù)據(jù)在處理之前根據(jù)key進(jìn)行分組黍特。
在Flink中數(shù)據(jù)模型不是基于Key,Value格式處理的蛙讥,因此不需將數(shù)據(jù)處理成鍵值對的格式,key是“虛擬的”灭衷,可以人為的來指定次慢,實(shí)際數(shù)據(jù)處理過程中根據(jù)指定的key來對數(shù)據(jù)進(jìn)行分組,DataSet中使用groupBy來指定key,DataStream中使用keyBy來指定key翔曲。那么如何指定keys呢?
一.使用Tuples來指定key
定義元組來指定key可以指定tuple中的第幾個(gè)元素當(dāng)做key迫像,或者指定tuple中的聯(lián)合元素當(dāng)做key。需要使用org.apache.flink.api.java.tuple.TupleXX
包下的tuple,最多支持25個(gè)元素且Tuple必須new創(chuàng)建瞳遍。
如果Tuple是嵌套的格式闻妓,例如:DataStream<Tuple3<Tuple2<Integer, Float>,String,Long>> ds,如果指定keyBy(0)則會(huì)使用內(nèi)部的整個(gè)Tuple2作為key掠械。如果想要使用內(nèi)部Tuple2中的Float格式當(dāng)做key由缆,可以使用keyBy("f0.f1")
這樣的形式指定。
這里需要注意猾蒂,在Flink的Tuple中指定的key的下標(biāo)從0開始算起均唉,這里不像Scala中的Tuple從1開始算起,同時(shí)一般需要指定key的函數(shù)中都可以有兩種寫法肚菠,一種是直接寫數(shù)字0舔箭,1,2等等蚊逢,還有一種是寫字符串的形式前面的0层扶,1,2對應(yīng)的字符串的表達(dá)形式為f1时捌,f2怒医,f3。
如果需要指定多個(gè)字段當(dāng)做聯(lián)合的Key奢讨,可以寫成keyBy(0,1)
稚叹,如果寫成字符串形式在字符串中指定多個(gè)key焰薄,還可以寫成keyBy("f0","f1")
的形式。
二.使用Field Expression來指定key
可以使用Field Expression來指定key,一般作用的對象可以是類對象扒袖,或者嵌套的Tuple格式的數(shù)據(jù)塞茅。
對于這種形式的使用,注意點(diǎn)如下:
1.對于類對象可以使用類中的字段來指定key季率,類對象定義需要注意:
- 類的訪問級別必須是public
- 必須寫出默認(rèn)的空的構(gòu)造函數(shù)
- 類中所有的字段必須是public的或者必須有g(shù)etter野瘦,setter方法。
- Flink必須支持字段的類型飒泻。
2.對于嵌套的Tuple類型的Tuple數(shù)據(jù)可以使用"xx.f0"表示嵌套tuple中第一個(gè)元素鞭光,也可以直接使用”xx.0”來表示第一個(gè)元素。
三.使用Key Selector Functions來指定key
使用key Selector這種方式選擇key泞遗,非常方便惰许,可以從數(shù)據(jù)類型中指定想要的key.
KeyedStream<String, String> keyBy = socketText.keyBy(new KeySelector<String, String>() {
@Override
public String getKey(String line) throws Exception {
return line.split("\t")[2];
}
});