Having looked at Flink's data sources and sinks, we have covered the head and the tail: data flowing in and data flowing out. What is still missing is the processing in between, and that middle stage is exactly where the bulk of Flink's work happens, as the figure below shows:
The middle stage is fairly complex, and so far I have only looked at part of it. Let's start with its simplest and most commonly used operators: map, flatMap, and filter.
map
map on Flink's DataStream is very similar to map in Java 8 streams: both perform a transformation. Here is map's implementation:
public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper) {
    TypeInformation<R> outType = TypeExtractor.getMapReturnTypes((MapFunction) this.clean(mapper), this.getType(), Utils.getCallLocationName(), true);
    return this.transform("Map", outType, new StreamMap((MapFunction) this.clean(mapper)));
}
Two things stand out:
1. The return type is SingleOutputStreamOperator, a base type that many DataStream methods return, e.g. map, flatMap, filter, and process.
2. The work is ultimately delegated to the transform method. Here is transform's implementation:
@PublicEvolving
public <R> SingleOutputStreamOperator<R> transform(String operatorName, TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) {
    this.transformation.getOutputType();
    OneInputTransformation<T, R> resultTransform = new OneInputTransformation(this.transformation, operatorName, operator, outTypeInfo, this.environment.getParallelism());
    SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(this.environment, resultTransform);
    this.getExecutionEnvironment().addOperator(resultTransform);
    return returnStream;
}
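In plain words: transform wraps the user function in a stream operator (a StreamMap in map's case), registers the resulting OneInputTransformation with the execution environment, and returns the new stream. As a rough sketch grounded in the source above (source is a hypothetical DataStream<String>; StreamMap is an internal operator class, so you would not normally call this yourself):

// roughly what source.map(toUpper) expands to, per the source above:
// wrap the MapFunction in a StreamMap operator and hand it to transform()
MapFunction<String, String> toUpper = String::toUpperCase;
SingleOutputStreamOperator<String> mapped = source.transform(
        "Map",
        TypeInformation.of(String.class),
        new StreamMap<>(toUpper));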
Still, source code only goes so far; let's look at how map is actually used!
@Slf4j
public class KafkaUrlSinkJob {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("zookeeper.connect", "localhost:2181");
        properties.put("group.id", "metric-group");
        properties.put("auto.offset.reset", "latest");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        SingleOutputStreamOperator<UrlInfo> dataStreamSource = env.addSource(
                new FlinkKafkaConsumer010<String>(
                        "testjin", // topic
                        new SimpleStringSchema(),
                        properties
                )
        ).setParallelism(1)
                // map: convert one stream into another, here String --> UrlInfo
                .map(string -> JSON.parseObject(string, UrlInfo.class));

        env.execute("KafkaUrlSinkJob");
    }
}
As you can see, Kafka delivers String records; after the map they become a SingleOutputStreamOperator<UrlInfo> rather than a SingleOutputStreamOperator<String>.
map never drops records: however many records go in, exactly that many come out. It is purely a one-to-one transformation; an equivalent anonymous-class form is sketched below.
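If you prefer the non-lambda form, the same conversion can be written as an anonymous MapFunction. A minimal sketch equivalent to the lambda above (stringStream stands in for the Kafka source stream before the map); as a side benefit, an anonymous class keeps its generic type information, so Flink needs no extra type declaration:

SingleOutputStreamOperator<UrlInfo> parsed = stringStream.map(new MapFunction<String, UrlInfo>() {
    @Override
    public UrlInfo map(String value) throws Exception {
        // exactly one output record per input record
        return JSON.parseObject(value, UrlInfo.class);
    }
});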
flatmap
flatMap converts a nested collection into a flattened, non-nested one. Let's look at an example, still using the Kafka data source above:
// build nested data: each incoming UrlInfo becomes a List<UrlInfo> of two elements
SingleOutputStreamOperator<List<UrlInfo>> listDataStreamSource = dataStreamSource
        .map(urlInfo -> {
            List<UrlInfo> list = Lists.newArrayList();
            list.add(urlInfo);

            UrlInfo urlInfo1 = new UrlInfo();
            urlInfo1.setUrl(urlInfo.getUrl() + "-copy");
            urlInfo1.setHash(DigestUtils.md5Hex(urlInfo1.getUrl()));
            list.add(urlInfo1);
            return list;
        }).returns(new ListTypeInfo<>(UrlInfo.class));

listDataStreamSource.addSink(new PrintSinkFunction<>());
Notes:
1. Mind the returns call here: without it, the job fails at runtime. A Flink committer explained why on Stack Overflow:

"I think the short description of the error message is quite good, but let me expand it a bit. In order to execute a program, Flink needs to know the type of the values that are processed because it needs to serialize and deserialize them. Flink's type system is based on TypeInformation which describes a data type. When you specify a function, Flink tries to infer the return type of that function. In case of the FlatMapFunction of your example the type of the objects that are passed to the Collector. Unfortunately, some Lambda functions lose this information due to type erasure such that Flink cannot automatically infer the type. Therefore, you have to explicitly declare the return type."

If you do the conversion above without the returns call, the lambda loses part of its type information and you get:

org.apache.flink.api.common.functions.InvalidTypesException: The generic type parameters of 'Collector' are missing. In many cases lambda methods don't provide enough information for automatic type extraction when Java generics are involved. An easy workaround is to use an (anonymous) class instead that implements the 'org.apache.flink.api.common.functions.FlatMapFunction' interface. Otherwise the type has to be specified explicitly using type information.

Since the value returned here is a List<UrlInfo>, you cannot simply write List<Object>.class; there is no such syntax. Flink therefore provides richer options. The overload used here is

public SingleOutputStreamOperator<T> returns(TypeInformation<T> typeInfo) {}

and ListTypeInfo is the List implementation of the TypeInformation abstract class. (A TypeHint-based alternative is sketched below.)
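For completeness, the same type information can also be supplied through the TypeHint overload of returns: an anonymous TypeHint subclass captures the full generic type List<UrlInfo> that erasure strips from the lambda. A minimal sketch (the MapFunction cast only pins the lambda's generic type):

// equivalent to returns(new ListTypeInfo<>(UrlInfo.class))
SingleOutputStreamOperator<List<UrlInfo>> listStream = dataStreamSource
        .map((MapFunction<UrlInfo, List<UrlInfo>>) urlInfo -> Lists.newArrayList(urlInfo))
        .returns(new TypeHint<List<UrlInfo>>() {});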
Running this together with the KafkaSender from the previous article produces:
kafkaSender:
2019-01-15 20:21:46.650 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : e89bffd6b2eff799
2019-01-15 20:21:46.653 [main] INFO myflink.KafkaSender - send msg:{"domain":"so.com","id":0,"url":"http://so.com/1547554906650"}
KafkaUrlSinkJob
[UrlInfo(id=0, url=http://so.com/1547554906650, hash=null), UrlInfo(id=0, url=http://so.com/1547554906650-copy, hash=efb0862d481297743b08126b2cda602e)]
That is, a single UrlInfo has been expanded into a List<UrlInfo>.
Now let's see how flatMap flattens it back out:
...
SingleOutputStreamOperator<UrlInfo> flatSource = listDataStreamSource.flatMap(new FlatMapFunction<List<UrlInfo>, UrlInfo>() {
    @Override
    public void flatMap(List<UrlInfo> urlInfos, Collector<UrlInfo> collector) throws Exception {
        // Collector is not thread-safe, so emit sequentially rather than from a parallelStream()
        urlInfos.forEach(collector::collect);
    }
});
flatSource.addSink(new PrintSinkFunction<>());
...
This can of course be written as a lambda (note that the lambda needs an explicit return type):
SingleOutputStreamOperator<UrlInfo> flatSource = listDataStreamSource.flatMap(
        (FlatMapFunction<List<UrlInfo>, UrlInfo>) (urlInfos, collector) ->
                urlInfos.forEach(collector::collect)).returns(UrlInfo.class);
The printed result:
2> [UrlInfo(id=0, url=http://so.com/1547554906650, hash=null), UrlInfo(id=0, url=http://so.com/1547554906650-copy, hash=efb0862d481297743b08126b2cda602e)]
1> [UrlInfo(id=0, url=http://so.com/1547554903640, hash=null), UrlInfo(id=0, url=http://so.com/1547554903640-copy, hash=138f79ecc92744a65b03132959da2f73)]
1> UrlInfo(id=0, url=http://so.com/1547554903640-copy, hash=138f79ecc92744a65b03132959da2f73)
1> UrlInfo(id=0, url=http://so.com/1547554903640, hash=null)
2> UrlInfo(id=0, url=http://so.com/1547554906650, hash=null)
2> UrlInfo(id=0, url=http://so.com/1547554906650-copy, hash=efb0862d481297743b08126b2cda602e)
In other words, flatMap emits its output through a Collector, and everything collected lands in a single, flat output stream; when the input is nested, this flattens it.
flatMap is not limited to nested collections, though. Because it places no restriction on how many records each input may produce, it can also be used to expand data, for example:
dataStream.flatMap((FlatMapFunction<String, String>) (value, out) -> {
    // split each line on spaces and emit one record per word
    for (String word : value.split(" ")) {
        out.collect(word);
    }
}).returns(String.class); // as before, the lambda needs an explicit return type
This splits each string on spaces and turns the words into a new DataStream.
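Since flatMap may just as well emit nothing for a given input, it can drop records too. A minimal sketch under that assumption (the blank-line check is invented for illustration):

// emit words only for non-blank lines; a blank input yields zero records
dataStream.flatMap((FlatMapFunction<String, String>) (value, out) -> {
    if (!value.trim().isEmpty()) {
        for (String word : value.split(" ")) {
            out.collect(word);
        }
    }
}).returns(String.class);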
filter
As the name suggests, filter is used to filter data. Continuing the test on top of the code above: to avoid interference, comment out the two dataSourceStream.addSink calls and add the following:
// filter on the domain field, keeping only records whose domain is BAIDU
SingleOutputStreamOperator<UrlInfo> filterSource = flatSource.filter(urlInfo ->
        StringUtils.equals(UrlInfo.BAIDU, urlInfo.getDomain()));
filterSource.addSink(new PrintSinkFunction<>());
This drops data from every other domain and keeps only the BAIDU records; the run output is omitted here, but it confirms that filter behaves as expected. For completeness, the anonymous-class form is sketched below.
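As with map and flatMap, filter has an anonymous-class form as well. A minimal sketch equivalent to the lambda above:

SingleOutputStreamOperator<UrlInfo> filtered = flatSource.filter(new FilterFunction<UrlInfo>() {
    @Override
    public boolean filter(UrlInfo urlInfo) throws Exception {
        // a record is kept only when this returns true
        return StringUtils.equals(UrlInfo.BAIDU, urlInfo.getDomain());
    }
});

Unlike map, filter may shrink the stream, but every record that survives passes through unchanged.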