輸入文件:
image.png
sensor_1,1547718199,35.8
sensor_6,1547718201,15.4
sensor_7,1547718202,6.7
sensor_10,1547718205,38.1
sensor_1,1547718206,32
sensor_1,1547718208,36.2
sensor_1,1547718210,29.7
sensor_1,1547718213,30.9
min算子
image.png
輸入輸出之對照:
image.png
Reduce算子
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
/**
 * One temperature measurement parsed from a line of the sensor input file.
 *
 * @param id          sensor identifier, e.g. "sensor_1"
 * @param timestamp   timestamp field of the measurement, as read from the file
 * @param temperature measured temperature
 */
case class SensorReading(
  id: String,
  timestamp: Long,
  temperature: Double
)
/**
 * Flink streaming job demonstrating the `reduce` operator.
 *
 * Reads sensor readings from a text file, keys them by sensor id and, for every incoming
 * reading, prints an aggregate that carries the newest timestamp and the lowest temperature
 * seen so far for that sensor.
 */
object ReduceTest {

  /**
   * Builds the job graph and executes it.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // Run with a single parallel task so that parallelism does not affect the printed output.
    env.setParallelism(1)

    val inputPath = "D:\\jacky\\flink\\src\\main\\resources\\sensor.txt"
    val inputStream = env.readTextFile(inputPath)

    // Brings DataStream and the implicit TypeInformation instances into scope.
    import org.apache.flink.streaming.api.scala._

    // Every line is expected to have the form "<id>,<timestamp>,<temperature>".
    val stream: DataStream[SensorReading] = inputStream.map(data => {
      val arr: Array[String] = data.split(",")
      SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
    })

    // Alternative using the built-in minBy aggregation:
    // val aggStream: DataStream[SensorReading] = stream.keyBy(_.id).minBy("temperature")

    // Key with a typed selector instead of the deprecated string field-name overload.
    val ans: DataStream[SensorReading] = stream
      .keyBy(_.id)
      .reduce((currState, newState) => {
        // Keep the key, take the newest timestamp and the running minimum temperature.
        SensorReading(currState.id, newState.timestamp, currState.temperature.min(newState.temperature))
      })

    ans.print()
    env.execute()
  }
}
為了排除并行度帶來的影響,先把并行度設置為1:
image.png
如下兩圖,分別是 關鍵邏輯 和 輸出結果與輸入文件的對比:
image.png
image.png
另外,也可以用這種等價寫法:
image.png
注意,在KeyedStream類中,才有min等轉換算子:
image.png
而真正對每一條數據進行處理的算子,是aggregate算子:
image.png
split算子
import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
/**
 * A single sensor measurement as read from the input file.
 *
 * @param id          identifier of the sensor that produced the measurement
 * @param timestamp   timestamp field of the measurement, as read from the file
 * @param temperature measured temperature
 */
case class SensorReading(
  id: String,
  timestamp: Long,
  temperature: Double
)
/**
 * Flink streaming job demonstrating the `split` / `select` operators.
 *
 * Readings are tagged "high" when their temperature is above 30.0 and "low" otherwise;
 * the job then prints the "high" stream, the "low" stream and the union of both.
 */
object ReduceTest {

  /**
   * Builds the job graph and executes it.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val lines = env.readTextFile("D:\\jacky\\flink\\src\\main\\resources\\sensor.txt")

    // Brings DataStream, SplitStream and the implicit TypeInformation instances into scope.
    import org.apache.flink.streaming.api.scala._

    // Parse "<id>,<timestamp>,<temperature>" lines into SensorReading records.
    val readings: DataStream[SensorReading] = lines.map(line => {
      val fields: Array[String] = line.split(",")
      SensorReading(fields(0), fields(1).toLong, fields(2).toDouble)
    })

    // Attach exactly one tag to every reading, depending on its temperature.
    val tagged: SplitStream[SensorReading] =
      readings.split(reading => Seq(if (reading.temperature > 30.0) "high" else "low"))

    val hot: DataStream[SensorReading] = tagged.select("high")
    val cold: DataStream[SensorReading] = tagged.select("low")
    val everything: DataStream[SensorReading] = tagged.select("high", "low")

    // The argument to print is the prefix put in front of each emitted record.
    hot.print("high")
    cold.print("low")
    everything.print("all")

    env.execute()
  }
}
/**
 * Reduce function that merges two readings into one carrying the id of the first reading,
 * the timestamp of the second and the lower of the two temperatures.
 */
class MyReduceFunction extends ReduceFunction[SensorReading] {

  /**
   * @param a the first reading passed in by the reduce operator
   * @param b the second reading passed in by the reduce operator
   * @return a reading with `a`'s id, `b`'s timestamp and the minimum temperature of both
   */
  override def reduce(a: SensorReading, b: SensorReading): SensorReading = {
    val lowest = math.min(a.temperature, b.temperature)
    a.copy(timestamp = b.timestamp, temperature = lowest)
  }
}
image.png
image.png