flink的鍵控流轉換算子

輸入文件:


image.png
sensor_1,1547718199,35.8
sensor_6,1547718201,15.4
sensor_7,1547718202,6.7
sensor_10,1547718205,38.1
sensor_1,1547718206,32
sensor_1,1547718208,36.2
sensor_1,1547718210,29.7
sensor_1,1547718213,30.9

min算子

image.png

輸入輸出之對照:


image.png

Reduce算子

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

case class SensorReading(id: String, timestamp: Long, temperature: Double)

object ReduceTest {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    
    val inputPath = "D:\\jacky\\flink\\src\\main\\resources\\sensor.txt"
    val inputStream = env.readTextFile(inputPath)

    import org.apache.flink.streaming.api.scala._
    val stream: DataStream[SensorReading] = inputStream.map(data => {
      val arr: Array[String] = data.split(",")
      SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
    })

    //    val aggStream: DataStream[SensorReading] = stream.keyBy("id").minBy("temperature")

    val ans: DataStream[SensorReading] = stream.keyBy("id")
      .reduce((currState, newState) => {
        SensorReading(currState.id, newState.timestamp, currState.temperature.min(newState.temperature))
      })

    ans.print()

    env.setParallelism(1);

    env.execute()
  }
}

為了排除并行度帶來的影響挂滓,先把并行度設置為1:


image.png

如下兩圖,分別是 關鍵邏輯 和 輸出結果與輸入文件的對比:

image.png
image.png

另外曹体,也可以用這種等價寫法:


image.png

注意包蓝,在KeyedStream類中厕隧,才有min等轉換算子:

image.png

而真正對每一條數據進行處理的算子,是aggregate算子:


image.png

split算子

import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

case class SensorReading(id: String, timestamp: Long, temperature: Double)

object ReduceTest {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val inputPath = "D:\\jacky\\flink\\src\\main\\resources\\sensor.txt"
    val inputStream = env.readTextFile(inputPath)

    import org.apache.flink.streaming.api.scala._
    val stream: DataStream[SensorReading] = inputStream.map(data => {
      val arr: Array[String] = data.split(",")
      SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
    })

    val splitStream: SplitStream[SensorReading] = stream.split(data => {
      if (data.temperature > 30.0) Seq("high") else Seq("low")
    })
    val high: DataStream[SensorReading] = splitStream.select("high")
    val low: DataStream[SensorReading] = splitStream.select("low")
    val all: DataStream[SensorReading] = splitStream.select("high", "low")

    high.print("high")
    low.print("low")
    all.print("all")

    env.execute()
  }
}

class MyReduceFunction extends ReduceFunction[SensorReading] {
  override def reduce(a: SensorReading, b: SensorReading): SensorReading =
    SensorReading(a.id, b.timestamp, a.temperature.min(b.temperature))
}
image.png
image.png

connect算子(同床異夢)

?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末炸站,一起剝皮案震驚了整個濱河市星澳,隨后出現的幾起案子,更是在濱河造成了極大的恐慌旱易,老刑警劉巖禁偎,帶你破解...
    沈念sama閱讀 222,000評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現場離奇詭異阀坏,居然都是意外死亡如暖,警方通過查閱死者的電腦和手機,發(fā)現死者居然都...
    沈念sama閱讀 94,745評論 3 399
  • 文/潘曉璐 我一進店門忌堂,熙熙樓的掌柜王于貴愁眉苦臉地迎上來盒至,“玉大人,你說我怎么就攤上這事士修〖纤欤” “怎么了?”我有些...
    開封第一講書人閱讀 168,561評論 0 360
  • 文/不壞的土叔 我叫張陵李命,是天一觀的道長登淘。 經常有香客問我,道長封字,這世上最難降的妖魔是什么黔州? 我笑而不...
    開封第一講書人閱讀 59,782評論 1 298
  • 正文 為了忘掉前任耍鬓,我火速辦了婚禮,結果婚禮上流妻,老公的妹妹穿的比我還像新娘牲蜀。我一直安慰自己,他們只是感情好绅这,可當我...
    茶點故事閱讀 68,798評論 6 397
  • 文/花漫 我一把揭開白布涣达。 她就那樣靜靜地躺著,像睡著了一般证薇。 火紅的嫁衣襯著肌膚如雪度苔。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 52,394評論 1 310
  • 那天浑度,我揣著相機與錄音寇窑,去河邊找鬼。 笑死箩张,一個胖子當著我的面吹牛甩骏,可吹牛的內容都是我干的。 我是一名探鬼主播先慷,決...
    沈念sama閱讀 40,952評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼饮笛,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了论熙?” 一聲冷哼從身側響起福青,我...
    開封第一講書人閱讀 39,852評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎赴肚,沒想到半個月后素跺,有當地人在樹林里發(fā)現了一具尸體,經...
    沈念sama閱讀 46,409評論 1 318
  • 正文 獨居荒郊野嶺守林人離奇死亡誉券,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 38,483評論 3 341
  • 正文 我和宋清朗相戀三年指厌,在試婚紗的時候發(fā)現自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片踊跟。...
    茶點故事閱讀 40,615評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡踩验,死狀恐怖,靈堂內的尸體忽然破棺而出商玫,到底是詐尸還是另有隱情箕憾,我是刑警寧澤,帶...
    沈念sama閱讀 36,303評論 5 350
  • 正文 年R本政府宣布拳昌,位于F島的核電站袭异,受9級特大地震影響,放射性物質發(fā)生泄漏炬藤。R本人自食惡果不足惜御铃,卻給世界環(huán)境...
    茶點故事閱讀 41,979評論 3 334
  • 文/蒙蒙 一碴里、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧上真,春花似錦咬腋、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,470評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至就珠,卻和暖如春唉匾,著一層夾襖步出監(jiān)牢的瞬間臊泌,已是汗流浹背负蠕。 一陣腳步聲響...
    開封第一講書人閱讀 33,571評論 1 272
  • 我被黑心中介騙來泰國打工哺徊, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人蹂季。 一個月前我還...
    沈念sama閱讀 49,041評論 3 377
  • 正文 我出身青樓,卻偏偏與公主長得像疏日,于是被迫代替她去往敵國和親偿洁。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 45,630評論 2 359

推薦閱讀更多精彩內容