1廓推、一個簡單的開始
- 創(chuàng)建一個maven項目汽煮,pom里添加flink依賴硼补,可以參考 https://ashiamd.github.io/docsify-notes/#/study/BigData/Flink/%E5%B0%9A%E7%A1%85%E8%B0%B7Flink%E5%85%A5%E9%97%A8%E5%88%B0%E5%AE%9E%E6%88%98-%E5%AD%A6%E4%B9%A0%E7%AC%94%E8%AE%B0?id=_2-%e5%bf%ab%e9%80%9f%e4%b8%8a%e6%89%8b
- 創(chuàng)建如下代碼禾酱,并在命令行窗口輸入nc -lk 7777科侈,然后啟動程序,再在命令行窗口輸入一些詞即可開始統(tǒng)計
- StreamContextEnvironment.getExecutionEnvironment();創(chuàng)建執(zhí)行環(huán)境
- env.execute();有這個才會執(zhí)行
package BaseOperationTest;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamContextEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class WordCount {
public static void main(String[] args) throws Exception {
// 創(chuàng)建執(zhí)行環(huán)境
StreamExecutionEnvironment env = StreamContextEnvironment.getExecutionEnvironment();
// 從文件中讀取數(shù)據(jù)
// String inputPath = "/Users/kaiker/Documents/projects/flink_study/src/main/resources/hello.txt";
// DataStream<String> inputDataStream = env.readTextFile(inputPath);
DataStream<String> inputDataStream = env.socketTextStream("localhost", 7777);
// 對數(shù)據(jù)集進行處理辅辩,按空格分詞展開难礼,轉(zhuǎn)換成(word, 1)二元組進行統(tǒng)計
// 按照第一個位置的word分組
// 按照第二個位置上的數(shù)據(jù)求和
DataStream<Tuple2<String, Integer>> resultSet = inputDataStream.flatMap(new MyFlatMapper())
.keyBy(0)
.sum(1);
resultSet.print();
env.execute();
}
// 自定義類,實現(xiàn)FlatMapFunction接口
public static class MyFlatMapper implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String s, Collector<Tuple2<String, Integer>> out) throws Exception {
// 按空格分詞
String[] words = s.split(" ");
// 遍歷所有word玫锋,包成二元組輸出
for (String str : words) {
out.collect(new Tuple2<>(str, 1));
}
}
}
}
2蛾茉、轉(zhuǎn)換操作
2.2 基于KeyedStream的轉(zhuǎn)換
keyBy
通過指定鍵值的方式將一個DataStream轉(zhuǎn)化為KeyedStream。流中的事件會根據(jù)各自鍵值被分到不同的分區(qū)
DataStream<SensorReading> inputDataStream = env.socketTextStream("localhost", 7777);
keyStream = inputDataStream.keyBy(SensorReading::getId);
滾動聚合
sum() min() max() 不斷獲取數(shù)據(jù)景醇,更新獲取到的聚合值
minBy() maxBy() sumBy() 不斷獲取數(shù)據(jù)臀稚,更新獲取到的聚合值吝岭,非聚合的值會使用最新的那一條更新
Reduce
每個到來的事件都會和reduce結(jié)果進行一次組合三痰,reduce不會改變數(shù)據(jù)類型
接收兩個參數(shù)吧寺,一個記錄現(xiàn)有(或者說已經(jīng)聚合的)記錄,一個記錄新來的記錄散劫,然后對兩者進行聚合稚机。
DataStream<SensorReading> resultStream = keyedStream.reduce(
(curSensor,newSensor)->new SensorReading(curSensor.getId(),newSensor.getTimestamp(), Math.max(curSensor.getTemperature(), newSensor.getTemperature()))
);
2.3 多流轉(zhuǎn)換
Union
DataStream.union可以合并多條類型相同的DataStream,生成一個新的類型相同的DataStream
Connect
- connect會連接起兩條流获搏,默認情況下赖条,不會使兩條輸入流的事件之間產(chǎn)生任何關(guān)聯(lián)。但是可以結(jié)合keyBy等方法使用常熙,讓兩條流有一定關(guān)聯(lián)
- 有coMap\coFlatMap等方法可以分別對兩條流進行處理
3纬乍、類型
- 原始類型
- Java和Scala Tuple,F(xiàn)link提供了Java元組的高效實現(xiàn)裸卫,最多可包含25個字段
- Scala case類
- POJO類仿贬,如果一個類有公共類、有一個無參構(gòu)造器墓贿、每個字段都有g(shù)etter和setter茧泪、字段類型可以支持,則可以被分析為POJO
- 其他特殊類型
為數(shù)據(jù)類型創(chuàng)建類型信息
TypeInformation<Integer> intType = Types.INT
顯式提供類型信息
有時候一些必要的信息可能無法提取聋袋,比如Java的類型擦除
可以通過實現(xiàn)ResultTypeQueryable接口來提供類型队伟,也可以使用returns()方法來顯示指定某算子的返回類型
DataStream<Person> persons = tuples
.map(t -> new Person(t.f0, t.f1)).returns(Types..POJO(Person.class));
4、富函數(shù)
- DataStreamAPI中所有轉(zhuǎn)換函數(shù)都有對應(yīng)的富函數(shù)
- 富函數(shù)可以在處理第一條數(shù)據(jù)之前進行初始化操作幽勒,獲取到一些上下文信息
public static class MyMapFunction extends RichMapFunction<SensorReading, Tuple2<Integer, String>> {
@Override public Tuple2<Integer, String> map(SensorReading value) throws Exception {
return new Tuple2<>(getRuntimeContext().getIndexOfThisSubtask(), value.getId());
}
@Override public void open(Configuration parameters) throws Exception {
System.out.println(getRuntimeContext.getIndexOfThisSubtask); // 以下可以做一些初始化工作嗜侮,例如建立一個和HDFS的連接
}
@Override public void close() throws Exception {
System.out.println("my map close"); // 以下做一些清理工作,例如斷開和HDFS的連接
}
}