一、maven項目的pom.xml中的依賴
<!-- Single place to set the Flink version; every dependency below references it via ${flink.version}. -->
<properties>
<flink.version>1.9.1</flink.version>
</properties>
<!-- Flink dependencies -->
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- NOTE(review): no flink-cep class is imported by the StreamWordCount example in this document; confirm whether this dependency is needed. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
二、測試數據
input file path:./hello.txt
hello world
hello flink
hello spark
hello scala
how are you
fine thank you
and you
三、Flink WordCount Java版
package com.cn.wc;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class StreamWordCount {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 設置并行度
env.setParallelism(2);
String inputPath = "C:\\workstation\\maven_project\\flink_wordcount\\src\\main\\resources\\hello.txt";
// DataStreamSource 繼承SingleOutputStreamOperator,其繼承DataStream
// 從文件中讀取數(shù)據并模仿流式數(shù)據
DataStreamSource<String> inputDataStream = env.readTextFile(inputPath);
// 從socket(localhost:9000 可自己定義)文本流讀取數(shù)據
// DataStreamSource<String> inputDataStream2 = env.socketTextStream("localhost", 9000);
// 基于數(shù)據流進行轉換操作
SingleOutputStreamOperator<Tuple2<String, Integer>> result = inputDataStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
String[] words = s.split(" ");
for (String word : words) {
collector.collect(new Tuple2<String, Integer>(word, 1));
}
}
})
.keyBy(0) // 針對相同的word 合并,批處理groupby宇弛,流處理keyby
.sum(1);
result.print(); // 到這里不會輸出
// 觸發(fā)流執(zhí)行任務
env.execute();
}
}
四、運行結果
注:隨著流的不斷觸發,任務會不斷更新結果。
2> (how,1)
1> (hello,1)
2> (you,1)
2> (fine,1)
1> (hello,2)
2> (you,2)
1> (hello,3)
2> (and,1)
1> (spark,1)
1> (hello,4)
2> (you,3)
1> (scala,1)
2> (world,1)
1> (are,1)
1> (thank,1)
2> (flink,1)