轉(zhuǎn)化算子(transform operator)將一個(gè)或多個(gè)DataStream轉(zhuǎn)換為新的DataStream,如此下去可以將多個(gè)轉(zhuǎn)換組合成復(fù)雜的數(shù)據(jù)流拓?fù)洹?/p>
本節(jié)介紹了基本轉(zhuǎn)換库倘,應(yīng)用這些轉(zhuǎn)換的有效物理分區(qū)(partition)临扮,以及對(duì)Flink轉(zhuǎn)換chain的深入介紹。
目錄
- DataStream轉(zhuǎn)換
- 物理分區(qū)
- 任務(wù)鏈和資源組
DataStream轉(zhuǎn)換
-
map
DataStream→DataStream |
讀取一個(gè)元素并生成一個(gè)元素教翩。例如杆勇,一個(gè)map函數(shù),它將輸入流的值加倍:
DataStream<Integer> dataStream = //...
dataStream.map(new MapFunction<Integer, Integer>() {
@Override
public Integer map(Integer value) throws Exception {
return 2 * value;
}
});
-
FlatMap
DataStream→DataStream
讀取一個(gè)元素饱亿,并生成零個(gè)蚜退、一個(gè)或多個(gè)元素。例如:將句子分割為單詞的flatmap函數(shù):
dataStream.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> out)
throws Exception {
for(String word: value.split(" ")){
out.collect(word);
}
}
});
-
Filter
DataStream→DataStream
將每個(gè)元素輸入的filter布爾函數(shù):僅保留filter函數(shù)返回true的那部分元素彪笼,filter返回false的元素會(huì)被過濾掉钻注。例如:過濾掉零值的過濾器:
dataStream.filter(new FilterFunction<Integer>() {
@Override
public boolean filter(Integer value) throws Exception {
return value != 0;
}
});
-
KeyBy
DataStream→KeyedStream |
在邏輯上將流分區(qū)為互不相交的分區(qū)。具有相同key的所有記錄會(huì)分配給到同一分區(qū)配猫。在內(nèi)部幅恋,keyBy()是使用hash分區(qū)實(shí)現(xiàn)。在flink中有多種指定鍵的方法
此轉(zhuǎn)換返回的是KeyedStream泵肄,其中包括key-state捆交。
dataStream.keyBy("someKey") // Key by field "someKey"
dataStream.keyBy(0) // Key by the first element of a Tuple
注意 :
- 它是POJO類型淑翼,但不覆蓋hashCode()方法,key內(nèi)部是hash分區(qū)依賴于hashCode()方法的實(shí)現(xiàn)品追。
- 任何類型的數(shù)組也不能成為key玄括。
-
reduce
KeyedStream→DataStream
注意reduce函數(shù)是將KeyedStream轉(zhuǎn)換為DataStream,也就是reduce調(diào)用前必須進(jìn)行分區(qū)肉瓦,即得先調(diào)用
keyBy()
函數(shù)
在分區(qū)的數(shù)據(jù)流上調(diào)用reduce函數(shù):將當(dāng)前元素與最后一個(gè)reduce的值合并生成新值遭京。
例如:一個(gè)求和的reduce函數(shù):
keyedStream.reduce(new ReduceFunction<Integer>() {
@Override
public Integer reduce(Integer value1, Integer value2)
throws Exception {
return value1 + value2;
}
});
-
Fold
KeyedStream→DataStream
注意Fold轉(zhuǎn)換必須是基于KeyedStream(比如先執(zhí)行keyBy操作)。
在一個(gè)初始值上進(jìn)行Fold操作:將當(dāng)前值和上一次Fold產(chǎn)生的值進(jìn)行合并產(chǎn)生一個(gè)新的值:
比如:將Fold函數(shù)應(yīng)用于(1,2,3,4,5)時(shí)泞莉,結(jié)果為:“start-1”哪雕,“start-1-2”,“start-1-2-3”,. ..
DataStream<String> result =
keyedStream.fold("start", new FoldFunction<Integer, String>() {
@Override
public String fold(String current, Integer value) {
return current + "-" + value;
}
});
-
Aggregations(聚合)
KeyedStream→DataStream
min和minBy之間的差異是min返回最小值戒财,而minBy返回該字段中具有最小值的元素(max和maxBy相同)热监。
keyedStream.sum(0);
keyedStream.sum("key");
keyedStream.min(0);
keyedStream.min("key");
keyedStream.max(0);
keyedStream.max("key");
keyedStream.minBy(0);
keyedStream.minBy("key");
keyedStream.maxBy(0);
keyedStream.maxBy("key");
-
Window
KeyedStream→WindowedStream
可以在已經(jīng)分區(qū)的KeyedStream上定義Windows捺弦。Windows根據(jù)某些特征(例如饮寞,在最后5秒內(nèi)到達(dá)的數(shù)據(jù))對(duì)每個(gè)key中的數(shù)據(jù)進(jìn)行分組。有關(guān)窗口的完整說明列吼,請參見windows幽崩。
dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data
-
WindowAll
DataStream→AllWindowedStream
Windows可以在常規(guī)DataStream上定義。Windows根據(jù)某些特征(例如寞钥,在最后5秒內(nèi)到達(dá)的數(shù)據(jù))對(duì)所有流事件進(jìn)行分組慌申。有關(guān)窗口的完整說明,請參見windows理郑。
警告:在許多情況下蹄溉,這是非并行轉(zhuǎn)換。對(duì)于indowAll運(yùn)算算子來說所有記錄將收集在一個(gè)任務(wù)中您炉。
dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data
-
Window Apply
WindowedStream→DataStream
AllWindowedStream→DataStream |
將一般性函數(shù)應(yīng)用于整個(gè)窗口柒爵。下面是一個(gè)求和窗口函數(shù)。
注意:如果您正在使用windowAll轉(zhuǎn)換赚爵,則需要使用AllWindowFunction棉胀。
windowedStream.apply (new WindowFunction<Tuple2<String,Integer>, Integer, Tuple, Window>() {
public void apply (Tuple tuple,
Window window,
Iterable<Tuple2<String, Integer>> values,
Collector<Integer> out) throws Exception {
int sum = 0;
for (value t: values) {
sum += t.f1;
}
out.collect (new Integer(sum));
}
});
// applying an AllWindowFunction on non-keyed window stream
allWindowedStream.apply (new AllWindowFunction<Tuple2<String,Integer>, Integer, Window>() {
public void apply (Window window,
Iterable<Tuple2<String, Integer>> values,
Collector<Integer> out) throws Exception {
int sum = 0;
for (value t: values) {
sum += t.f1;
}
out.collect (new Integer(sum));
}
});
-
Window Reduce
WindowedStream→DataStream |
將reduce函數(shù)應(yīng)用于窗口。返回reduce后的新結(jié)果冀膝。
windowedStream.reduce (new ReduceFunction<Tuple2<String,Integer>>() {
public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
return new Tuple2<String,Integer>(value1.f0, value1.f1 + value2.f1);
}
});
-
Window Fold
WindowedStream→DataStream
將fold函數(shù)應(yīng)用于窗口并返回新的值唁奢。示例函數(shù)應(yīng)用于序列(1,2,3,4,5)時(shí),fold函數(shù)的輸出為字符串“start-1-2-3-4-5”:
windowedStream.fold("start", new FoldFunction<Integer, String>() {
public String fold(String current, Integer value) {
return current + "-" + value;
}
});
-
Windows上的聚合
WindowedStream→DataStream
聚合窗口上的內(nèi)容窝剖。min和minBy之間的差異是:min返回最小值麻掸,而minBy返回該字段中具有最小值的元素(max和maxBy相同)。
windowedStream.sum(0);
windowedStream.sum("key");
windowedStream.min(0);
windowedStream.min("key");
windowedStream.max(0);
windowedStream.max("key");
windowedStream.minBy(0);
windowedStream.minBy("key");
windowedStream.maxBy(0);
windowedStream.maxBy("key");
-
Union
DataStream *→DataStream
兩個(gè)或多個(gè)數(shù)據(jù)流的聯(lián)合赐纱,創(chuàng)建包含來自所有流的所有元素的新流脊奋。注意:如果將數(shù)據(jù)流與其自身聯(lián)合采郎,則會(huì)在結(jié)果流中獲取兩次元素。
dataStream.union(otherStream1, otherStream2, ...);
-
Window join
DataStream狂魔,DataStream→DataStream |
在給定key和公共窗口上連接兩個(gè)數(shù)據(jù)流蒜埋。
dataStream.join(otherStream)
.where(<key selector>).equalTo(<key selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
.apply (new JoinFunction () {...});
-
Window CoGroup
DataStream,DataStream→DataStream |
在給定key和公共窗口上對(duì)兩個(gè)數(shù)據(jù)流進(jìn)行Cogroup最楷。
dataStream.coGroup(otherStream)
.where(0).equalTo(1)
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
.apply (new CoGroupFunction () {...});
-
Connect
DataStream整份,DataStream→ConnectedStreams |
“連接”兩個(gè)保留其類型的數(shù)據(jù)流。連接操作允許兩個(gè)流之間的共享狀態(tài)籽孙。
DataStream<Integer> someStream = //...
DataStream<String> otherStream = //...
ConnectedStreams<Integer, String> connectedStreams = someStream.connect(otherStream);
-
CoMap烈评,CoFlatMap
ConnectedStreams→DataStream
類似于ConnectedStreams上的map和flatMap
connectedStreams.map(new CoMapFunction<Integer, String, Boolean>() {
@Override
public Boolean map1(Integer value) {
return true;
}
@Override
public Boolean map2(String value) {
return false;
}
});
connectedStreams.flatMap(new CoFlatMapFunction<Integer, String, String>() {
@Override
public void flatMap1(Integer value, Collector<String> out) {
out.collect(value.toString());
}
@Override
public void flatMap2(String value, Collector<String> out) {
for (String word: value.split(" ")) {
out.collect(word);
}
}
});
-
Split
DataStream→SplitStream
根據(jù)某些標(biāo)準(zhǔn)將流拆分為兩個(gè)或更多個(gè)流。
SplitStream<Integer> split = someDataStream.split(new OutputSelector<Integer>() {
@Override
public Iterable<String> select(Integer value) {
List<String> output = new ArrayList<String>();
if (value % 2 == 0) {
output.add("even");
}
else {
output.add("odd");
}
return output;
}
});
-
Select
SplitStream→DataStream
從split流中選擇一個(gè)或多個(gè)流犯建。
SplitStream<Integer> split;
DataStream<Integer> even = split.select("even");
DataStream<Integer> odd = split.select("odd");
DataStream<Integer> all = split.select("even","odd");
-
迭代
DataStream→IterativeStream→DataStream |
通過將一個(gè)運(yùn)算符的輸出重定向到某個(gè)先前的運(yùn)算符讲冠,在流中創(chuàng)建“反饋”循環(huán)。這對(duì)于定義不斷更新模型的算法特別有用适瓦。以下代碼以流開頭并連續(xù)應(yīng)用迭代體竿开。大于0的元素將被發(fā)送回反饋通道,其余元素將向下游轉(zhuǎn)發(fā)玻熙。有關(guān)完整說明否彩,請參閱迭代。
IterativeStream<Long> iteration = initialStream.iterate();
DataStream<Long> iterationBody = iteration.map (/*do something*/);
DataStream<Long> feedback = iterationBody.filter(new FilterFunction<Long>(){
@Override
public boolean filter(Long value) throws Exception {
return value > 0;
}
});
iteration.closeWith(feedback);
DataStream<Long> output = iterationBody.filter(new FilterFunction<Long>(){
@Override
public boolean filter(Long value) throws Exception {
return value <= 0;
}
});
物理分區(qū)
Flink還通過以下函數(shù)對(duì)轉(zhuǎn)換后的stream進(jìn)行精確分區(qū)嗦随、進(jìn)行l(wèi)ow-level控制(如果需要)列荔。
-
自定義分區(qū)
DataStream→DataStream
使用用戶定義的分區(qū)程序(Partitioner )為每個(gè)元素選擇目標(biāo)分區(qū)。
dataStream.partitionCustom(partitioner, "someKey");
dataStream.partitionCustom(partitioner, 0);
-
隨機(jī)分區(qū)
DataStream→DataStream
均勻分布隨機(jī)分配元素:(均勻分布)
dataStream.shuffle();
-
重新平衡(循環(huán)分區(qū))
DataStream→DataStream
對(duì)元素循環(huán)分區(qū)枚尼,每個(gè)分區(qū)的負(fù)載相等贴浙。在存在數(shù)據(jù)偏斜時(shí),用于性能優(yōu)化署恍。
dataStream.rebalance();