本文用java來實(shí)現(xiàn)一個(gè)簡單的fink計(jì)數(shù)單詞脚乡。
本文寫了兩個(gè)flink版本哥蔚,一個(gè)普通版本嚼贡,一個(gè)lambda版本驮配。有以下注意點(diǎn):
- 寫法差異娘扩,lambda需要調(diào)用return方法,而普通版本是不需要的壮锻。
- 在導(dǎo)包時(shí)琐旁,注意不要導(dǎo)成Scala中包,很多類名在flink和Scala中都有同名猜绣。會(huì)衍生出不必要的錯(cuò)誤灰殴。比如Tuple2這個(gè)類。
在本地執(zhí)行命令:
curl https://flink.apache.org/q/quickstart.sh | bash
會(huì)下載一個(gè)官網(wǎng)的示例掰邢,主要看中了他的pom文件牺陶。可以將他的項(xiàng)目導(dǎo)入IDEA中辣之,編寫自己的示例程序掰伸。
package org.myorg.test;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import java.util.stream.Stream;
/**
* @author lingbao08
* @DESCRIPTION
* @create 2019-09-15 13:23
**/
public class WordCount {
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<String> text = env.fromElements("我是中國人,我愛中國");
//普通版本
DataSet<Tuple2<String, Integer>> counts =
text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
String[] split = s.split("");
for (String s1 : split) {
if (s1.length() > 0)
collector.collect(new Tuple2<String, Integer>(s1, 1));
}
}
}).groupBy(0)
.sum(1);
counts.print();
}
}
lambda版本:
DataSet<Tuple2<String, Integer>> counts =
text.flatMap((FlatMapFunction<String, Tuple2<String, Integer>>) (s, collector) -> {
Stream.of(s.split("")).forEach(v -> collector.collect(new Tuple2<>(v, 1)));
}).returns(Types.TUPLE(Types.STRING, Types.INT)).groupBy(0)
.sum(1);
counts.print();
參考:https://blog.csdn.net/RUIMENG061511332/article/details/91873570.