[TOC]
數(shù)據(jù)轉(zhuǎn)換將數(shù)據(jù)流從一種形式轉(zhuǎn)換為另一種形式熄守,也就是說輸入可以是一個或多個數(shù)據(jù)流蜈垮,輸出也可以是零,或一個或多個數(shù)據(jù)流柠横。Flink1.7對transform另起一個新的名字“Operators ”--Operators transform 窃款。程序可以將多個transform組合成復(fù)雜的數(shù)據(jù)流拓?fù)洹?/p>
1.Map
Map [DataStream->DataStream]
Map: 一對一轉(zhuǎn)換,即一條轉(zhuǎn)換成另一條。
輸入一個元素并生成一個元素牍氛。 一個map函數(shù),它將輸入流的值加倍:
dataStream.map { x => x * 2 }
package com.bigdata.flink.dataStreamMapOperator;
import com.bigdata.flink.beans.UserAction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Arrays;
/**
* Summary:
* Map: 一對一轉(zhuǎn)換
*/
public class DataStreamMapOperator {
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 輸入: 用戶行為烟阐。某個用戶在某個時刻點(diǎn)擊或?yàn)g覽了某個商品搬俊,以及商品的價格。
DataStreamSource<UserAction> source = env.fromCollection(Arrays.asList(
new UserAction("userID1", 1293984000, "click", "productID1", 10),
new UserAction("userID2", 1293984001, "browse", "productID2", 8),
new UserAction("userID1", 1293984002, "click", "productID1", 10)
));
// 轉(zhuǎn)換: 商品的價格乘以8
SingleOutputStreamOperator<UserAction> result = source.map(new MapFunction<UserAction, UserAction>() {
@Override
public UserAction map(UserAction value) throws Exception {
int newPrice = value.getProductPrice() * 8;
return new UserAction(value.getUserID(), value.getEventTime(), value.getEventType(), value.getProductID(), newPrice);
}
});
// 輸出: 輸出到控制臺
// UserAction(userID=userID1, eventTime=1293984002, eventType=click, productID=productID1, productPrice=80)
// UserAction(userID=userID1, eventTime=1293984000, eventType=click, productID=productID1, productPrice=80)
// UserAction(userID=userID2, eventTime=1293984001, eventType=browse, productID=productID2, productPrice=64)
result.print();
env.execute();
}
}
2.FlatMap
轉(zhuǎn)換:DataStream → DataStream
FlatMap [DataStream->DataStream]
FlatMap: 一行變零到多行蜒茄。如下唉擂,將一個句子(一行)分割成多個單詞(多行)。
輸入一個元素并生成零個檀葛,一個或多個元素玩祟。 將句子分割為單詞的flatmap函數(shù):
dataStream.flatMap { str => str.split(" ") }
package com.bigdata.flink.dataStreamFlatMapOperator;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
* Summary:
* FlatMap: 一行變?nèi)我庑?0~多行)
*/
public class DataStreamFlatMapOperator {
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 輸入: 英文電影臺詞
DataStreamSource<String> source = env
.fromElements(
"You jump I jump",
"Life was like a box of chocolates"
);
// 轉(zhuǎn)換: 將包含chocolates的句子轉(zhuǎn)換為每行一個單詞
SingleOutputStreamOperator<String> result = source.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> out) throws Exception {
if(value.contains("chocolates")){
String[] words = value.split(" ");
for (String word : words) {
out.collect(word);
}
}
}
});
// 輸出: 輸出到控制臺
// Life
// was
// like
// a
// box
// of
// chocolates
result.print();
env.execute();
}
}
Filter [DataStream->DataStream]
DataStream → DataStream
計(jì)算每個元素的布爾函數(shù),并保留函數(shù)返回true的元素屿聋。 過濾掉零值的過濾器空扎,通俗來講就是過濾掉等于0的元素,轉(zhuǎn)換成新的數(shù)據(jù)流
dataStream.filter { _ != 0 }
4.KeyBy
轉(zhuǎn)換:DataStream → KeyedStream
KeyBy [DataStream->KeyedStream]
KeyBy: 按指定的Key對數(shù)據(jù)重分區(qū)润讥。將同一Key的數(shù)據(jù)放到同一個分區(qū)转锈。
邏輯分區(qū)流分為不同的分區(qū)。 具有相同key的所有記錄都分配給同一分區(qū)楚殿。 在內(nèi)部撮慨,keyBy()是使用hash分區(qū)實(shí)現(xiàn)的。 指定key有不同的方法脆粥。此Transformations返回KeyedStream砌溺,
注意:
- 分區(qū)結(jié)果和KeyBy下游算子的并行度強(qiáng)相關(guān)。如下游算子只有一個并行度,不管怎么分变隔,都會分到一起规伐。
- 對于POJO類型,KeyBy可以通過keyBy(fieldName)指定字段進(jìn)行分區(qū)弟胀。
- 對于Tuple類型楷力,KeyBy可以通過keyBy(fieldPosition)指定字段進(jìn)行分區(qū)喊式。
- 對于一般類型,如上, KeyBy可以通過keyBy(new KeySelector {...})指定字段進(jìn)行分區(qū)萧朝。
Reduce [KeyedStream->DataStream]
Reduce: 基于ReduceFunction進(jìn)行滾動聚合岔留,并向下游算子輸出每次滾動聚合后的結(jié)果。
注意: Reduce會輸出每一次滾動聚合的結(jié)果检柬。