keyBy
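Meaning: groups the stream by key.
DataStream -> KeyedStream: logically splits a stream into disjoint partitions, where each partition holds all elements that share the same key. Internally this is implemented by hashing the key.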
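The hash-based partitioning described above can be pictured with a small plain-Java model. This is only a sketch: `KeyPartitionSketch`, `partitionFor` and `partition` are hypothetical names, and the "hashCode modulo partition count" rule is an assumption chosen for illustration. Flink's own assignment is more involved, so treat this as a picture of the idea rather than the real implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of hash partitioning by key (not Flink code).
class KeyPartitionSketch {

    // Assumed rule for illustration: non-negative hash of the key, modulo the partition count.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Distributes keys over partitions; equal keys always land in the same partition.
    static Map<Integer, List<String>> partition(List<String> keys, int numPartitions) {
        Map<Integer, List<String>> result = new TreeMap<>();
        for (String key : keys) {
            int index = partitionFor(key, numPartitions);
            result.computeIfAbsent(index, p -> new ArrayList<>()).add(key);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> ids = Arrays.asList("sensor_1", "sensor_2", "sensor_1", "sensor_3");
        System.out.println(partition(ids, 2));
    }
}
```

The partitions are disjoint with respect to keys: one partition may hold several different keys, but a given key never appears in two partitions.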
Rolling Aggregation operators
- sum()
- min()
- max()
- minBy()
- maxBy()
- reduce()
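The difference between max() and maxBy() is easy to miss, so here is a plain-Java model of the two behaviours. It is a sketch, not Flink code: `MaxVsMaxBySketch` and `Reading` are hypothetical stand-ins. It assumes that max() only updates the aggregated field while the other fields stay as they were in the earlier record, and that maxBy() returns the whole record that holds the maximum.

```java
// Plain-Java model of max() versus maxBy() on one key (not Flink code).
class MaxVsMaxBySketch {

    // Minimal stand-in for the SensorReading POJO used in these notes.
    static final class Reading {
        final String id;
        final long timestamp;
        final double temperature;

        Reading(String id, long timestamp, double temperature) {
            this.id = id;
            this.timestamp = timestamp;
            this.temperature = temperature;
        }
    }

    // max-style (assumed semantics): only the temperature field is updated,
    // the remaining fields are taken from the current state.
    static Reading max(Reading state, Reading next) {
        return new Reading(state.id, state.timestamp, Math.max(state.temperature, next.temperature));
    }

    // maxBy-style: the complete record with the larger temperature wins;
    // on a tie the earlier record is kept.
    static Reading maxBy(Reading state, Reading next) {
        return next.temperature > state.temperature ? next : state;
    }

    public static void main(String[] args) {
        Reading first = new Reading("sensor_1", 100L, 35.8);
        Reading second = new Reading("sensor_1", 200L, 37.2);
        System.out.println(max(first, second).timestamp);
        System.out.println(maxBy(first, second).timestamp);
    }
}
```

In this model the two results carry the same temperature but different timestamps, which is the practical reason to prefer maxBy() when the other fields of the record matter.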
// Convert each input line into a SensorReading
DataStream<SensorReading> dataStream = inputStream.map(new MapFunction<String, SensorReading>() {
    @Override
    public SensorReading map(String value) throws Exception {
        // Expected line format: id,timestamp,temperature
        String[] fields = value.split(",");
        return new SensorReading(fields[0], Long.parseLong(fields[1]), Double.parseDouble(fields[2]));
    }
});
// Equivalent lambda form:
// DataStream<SensorReading> dataStream = inputStream.map(line -> {
//     String[] fields = line.split(",");
//     return new SensorReading(fields[0], Long.parseLong(fields[1]), Double.parseDouble(fields[2]));
// });
// Group by sensor id
KeyedStream<SensorReading, Tuple> keyedStream = dataStream.keyBy("id");
// max or maxBy
// SingleOutputStreamOperator<SensorReading> resultStream = keyedStream.maxBy("temperature");
// reduce aggregation: keep the highest temperature seen so far together with the latest timestamp
SingleOutputStreamOperator<SensorReading> resultStream = keyedStream.reduce(new ReduceFunction<SensorReading>() {
    @Override
    public SensorReading reduce(SensorReading value1, SensorReading value2) throws Exception {
        // value1 is the current aggregate, value2 is the newly arrived record
        return new SensorReading(value1.getId(), value2.getTimestamp(), Math.max(value1.getTemperature(), value2.getTemperature()));
    }
});
// Equivalent lambda form:
// keyedStream.reduce((curState, newData) -> {
//     return new SensorReading(curState.getId(), newData.getTimestamp(), Math.max(curState.getTemperature(), newData.getTemperature()));
// });
resultStream.print();
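To see what a rolling reduce emits, the following plain-Java model replays the reduce logic above over a small list of records. It is a sketch under stated assumptions: `RollingReduceSketch`, `Reading` and `rollingReduce` are hypothetical names, the per-key state is modelled with a `HashMap`, and the first record of each key is assumed to be emitted unchanged.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of a keyed rolling reduce (not Flink code).
class RollingReduceSketch {

    // Minimal stand-in for the SensorReading POJO used in these notes.
    static final class Reading {
        final String id;
        final long timestamp;
        final double temperature;

        Reading(String id, long timestamp, double temperature) {
            this.id = id;
            this.timestamp = timestamp;
            this.temperature = temperature;
        }
    }

    // Emits one updated aggregate per input record, keeping separate state per key.
    static List<Reading> rollingReduce(List<Reading> input) {
        Map<String, Reading> stateByKey = new HashMap<>();
        List<Reading> emitted = new ArrayList<>();
        for (Reading next : input) {
            Reading state = stateByKey.get(next.id);
            // Same rule as the reduce function above: latest timestamp, highest temperature so far.
            Reading updated = (state == null)
                    ? next
                    : new Reading(state.id, next.timestamp, Math.max(state.temperature, next.temperature));
            stateByKey.put(next.id, updated);
            emitted.add(updated);
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Reading> input = Arrays.asList(
                new Reading("sensor_1", 1L, 35.8),
                new Reading("sensor_2", 2L, 15.4),
                new Reading("sensor_1", 3L, 32.0),
                new Reading("sensor_1", 4L, 36.9));
        for (Reading r : rollingReduce(input)) {
            System.out.println(r.id + "," + r.timestamp + "," + r.temperature);
        }
    }
}
```

The output has as many records as the input, because a rolling aggregation produces an updated result for every arriving element instead of a single final value.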
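Multi-stream transformation operators
- Split and Select
Split: DataStream -> SplitStream: splits one DataStream into two or more DataStreams according to some characteristic of the elements.
- Connect and CoMap
DataStream, DataStream -> ConnectedStreams: connects two data streams while preserving their types. After connect, the two streams are merely placed in the same stream; internally each keeps its own data and form unchanged, and the two streams remain independent of each other.
To transform them afterwards, use CoMap or CoFlatMap, which actually turns them into a single stream.
Limitation: connect cannot join more than two streams; it works on exactly two.
- Union
DataStream -> DataStream: performs a union over two or more DataStreams, producing a new DataStream that contains all elements of all the input DataStreams.
Requirement: all streams being merged must have the same data type.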
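The type constraints of connect and union can be contrasted with a plain-Java model. This is a sketch, not Flink code: `ConnectVsUnionSketch`, `union` and `connectAndCoMap` are hypothetical names, lists stand in for streams, and elements are simply concatenated, whereas real streams interleave as records arrive.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Plain-Java model of union versus connect + CoMap (not Flink code).
class ConnectVsUnionSketch {

    // union-style: any number of inputs, all with the same element type T.
    @SafeVarargs
    static <T> List<T> union(List<T> first, List<T>... others) {
        List<T> merged = new ArrayList<>(first);
        for (List<T> other : others) {
            merged.addAll(other);
        }
        return merged;
    }

    // connect + CoMap style: exactly two inputs, possibly of different types.
    // Each side has its own map function; both produce the common output type R.
    static <A, B, R> List<R> connectAndCoMap(List<A> left, List<B> right,
                                             Function<A, R> map1, Function<B, R> map2) {
        List<R> result = new ArrayList<>();
        for (A a : left) {
            result.add(map1.apply(a));
        }
        for (B b : right) {
            result.add(map2.apply(b));
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> merged = union(Arrays.asList(1, 2), Arrays.asList(3), Arrays.asList(4, 5));
        System.out.println(merged);

        List<String> mapped = connectAndCoMap(
                Arrays.asList(36.5, 20.1), Arrays.asList("sensor_9"),
                temperature -> "temp:" + temperature, id -> "id:" + id);
        System.out.println(mapped);
    }
}
```

The signatures carry the point: `union` accepts a variable number of inputs but forces one element type, while `connectAndCoMap` accepts two different element types but only two inputs.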