SparkStreaming 窗口操作

Streaming提供了滑動(dòng)窗口操作的支持,從而讓我們可以對(duì)一個(gè)滑動(dòng)窗口內(nèi)的數(shù)據(jù)執(zhí)行計(jì)算操作颂碘。每次掉落在窗口內(nèi)的RDD的數(shù)據(jù),會(huì)被聚合起來(lái)執(zhí)行計(jì)算操作微饥,然后生成的RDD,會(huì)作為window DStream的一個(gè)RDD古戴。

網(wǎng)官圖中所示欠橘,就是對(duì)每三秒鐘的數(shù)據(jù)執(zhí)行一次滑動(dòng)窗口計(jì)算,這3秒內(nèi)的3個(gè)RDD會(huì)被聚合起來(lái)進(jìn)行處理允瞧,然后過(guò)了兩秒鐘,又會(huì)對(duì)最近三秒內(nèi)的數(shù)據(jù)執(zhí)行滑動(dòng)窗口計(jì)算蛮拔。所以每個(gè)滑動(dòng)窗口操作述暂,都必須指定兩個(gè)參數(shù),窗口長(zhǎng)度以及滑動(dòng)間隔建炫,而且這兩個(gè)參數(shù)值都必須是batch間隔的整數(shù)倍畦韭。

Spark Streaming對(duì)滑動(dòng)窗口的支持,是比Storm更加完善和強(qiáng)大的肛跌。

image.png

SparkStreaming對(duì)滑動(dòng)窗口支持的轉(zhuǎn)換操作:

image.png

示例講解:

maven

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <spark.version>2.2.1</spark.version>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
  </properties>

  <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>

基礎(chǔ)代碼

package com.neusoft
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WordCount {
    def main(args: Array[String]): Unit = {

    // Create a StreamingContext with a local master
    // Spark Streaming needs at least two working thread
    val sc = new StreamingContext("local[2]", "NetworkWordCount", Seconds(10) )
        sc.checkpoint("hdfs://192.168.121.128:8020/demo1")
    // Create a DStream that will connect to serverIP:serverPort, like localhost:9999
    val lines = sc.socketTextStream("192.168.121.128", 9999)
    // Split each line into words
    // 以空格把收到的每一行數(shù)據(jù)分割成單詞
    val words = lines.flatMap(_.split(" "))
    // 在本批次內(nèi)計(jì)單詞的數(shù)目
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    // 打印每個(gè)RDD中的前10個(gè)元素到控制臺(tái)
    wordCounts.print()
    sc.start()
    sc.awaitTermination()
    }
}

1艺配、window(windowLength, slideInterval)

該操作由一個(gè)DStream對(duì)象調(diào)用,傳入一個(gè)窗口長(zhǎng)度參數(shù)衍慎,一個(gè)窗口移動(dòng)速率參數(shù)转唉,然后將當(dāng)前時(shí)刻當(dāng)前長(zhǎng)度窗口中的元素取出形成一個(gè)新的DStream。

下面的代碼以長(zhǎng)度為3稳捆,移動(dòng)速率為1截取源DStream中的元素形成新的DStream赠法。

val windowWords = words.window(Seconds( 3 ), Seconds( 1))
image

基本上每秒輸入一個(gè)字母,然后取出當(dāng)前時(shí)刻3秒這個(gè)長(zhǎng)度中的所有元素乔夯,打印出來(lái)砖织。從上面的截圖中可以看到,下一秒時(shí)已經(jīng)看不到a了末荐,再下一秒侧纯,已經(jīng)看不到b和c了。表示a, b, c已經(jīng)不在當(dāng)前的窗口中甲脏。

2眶熬、 countByWindow(windowLength,slideInterval)

返回指定長(zhǎng)度窗口中的元素個(gè)數(shù)。

代碼如下块请,統(tǒng)計(jì)當(dāng)前3秒長(zhǎng)度的時(shí)間窗口的DStream中元素的個(gè)數(shù):

val windowWords = words.countByWindow(Seconds( 3 ), Seconds( 1))
image

3聋涨、 reduceByWindow(func, windowLength,slideInterval)

類似于上面的reduce操作,只不過(guò)這里不再是對(duì)整個(gè)調(diào)用DStream進(jìn)行reduce操作,而是在調(diào)用DStream上首先取窗口函數(shù)的元素形成新的DStream溶握,然后在窗口元素形成的DStream上進(jìn)行reduce戒洼。

val windowWords = words.reduceByWindow(_ + "-" + _, Seconds( 3) , Seconds( 1 ))
image

4、 reduceByKeyAndWindow(func,windowLength, slideInterval, [numTasks])

調(diào)用該操作的DStream中的元素格式為(k, v)茂腥,整個(gè)操作類似于前面的reduceByKey狸涌,只不過(guò)對(duì)應(yīng)的數(shù)據(jù)源不同,reduceByKeyAndWindow的數(shù)據(jù)源是基于該DStream的窗口長(zhǎng)度中的所有數(shù)據(jù)最岗。該操作也有一個(gè)可選的并發(fā)數(shù)參數(shù)帕胆。

下面代碼中,將當(dāng)前長(zhǎng)度為3的時(shí)間窗口中的所有數(shù)據(jù)元素根據(jù)key進(jìn)行合并般渡,統(tǒng)計(jì)當(dāng)前3秒中內(nèi)不同單詞出現(xiàn)的次數(shù)懒豹。

val windowWords = pairs.reduceByKeyAndWindow((a:Int , b:Int) => (a + b) , Seconds(3 ) , Seconds( 1 ))
image

5、 reduceByKeyAndWindow(func, invFunc,windowLength, slideInterval, [numTasks])

這個(gè)窗口操作和上一個(gè)的區(qū)別是多傳入一個(gè)函數(shù)invFunc驯用。前面的func作用和上一個(gè)reduceByKeyAndWindow相同脸秽,后面的invFunc是用于處理流出rdd的。

在下面這個(gè)例子中蝴乔,如果把3秒的時(shí)間窗口當(dāng)成一個(gè)池塘记餐,池塘每一秒都會(huì)有魚游進(jìn)或者游出,那么第一個(gè)函數(shù)表示每由進(jìn)來(lái)一條魚薇正,就在該類魚的數(shù)量上累加片酝。而第二個(gè)函數(shù)是,每由出去一條魚挖腰,就將該魚的總數(shù)減去一雕沿。{在窗口長(zhǎng)度長(zhǎng)(比如10個(gè)批次),窗口間隔泻锫亍(比如一個(gè)批次)的情況下計(jì)算更高效晦炊,只需減去離開窗口的數(shù)據(jù),和加上進(jìn)入窗口的數(shù)據(jù)即可宁脊,不用計(jì)算重復(fù)的部分(9個(gè)批次)}

 windowWords = pairs.reduceByKeyAndWindow((a: Int, b:Int ) => (a + b) , (a:Int, b: Int) => (a - b) , Seconds( 3 ), Seconds( 1 ))

下面是演示結(jié)果断国,最終的結(jié)果是該3秒長(zhǎng)度的窗口中歷史上出現(xiàn)過(guò)的所有不同單詞個(gè)數(shù)都為0。
image

一段時(shí)間不輸入任何信息榆苞,看一下最終結(jié)果

image

6稳衬、 countByValueAndWindow(windowLength,slideInterval, [numTasks])

類似于前面的countByValue操作,調(diào)用該操作的DStream數(shù)據(jù)格式為(K, v)坐漏,返回的DStream格式為(K, Long)薄疚。統(tǒng)計(jì)當(dāng)前時(shí)間窗口中元素值相同的元素的個(gè)數(shù)。

val windowWords = words.countByValueAndWindow(Seconds( 3 ), Seconds( 1))
image

示例二:熱點(diǎn)搜索詞滑動(dòng)統(tǒng)計(jì)赊琳,每隔10秒鐘街夭,統(tǒng)計(jì)最近60秒鐘的搜索詞的搜索頻次,并打印出排名最靠前的3個(gè)搜索詞以及出現(xiàn)次數(shù)


package com.spark.streaming
import org.apache.spark.streaming.Seconds 
import org.apache.spark.streaming.StreamingContext 
import org.apache.spark.SparkConf

 /** * @author Ganymede */
 object WindowHotWordS {  
  def main(args: Array[String]): Unit = {  
    val conf = new SparkConf().setAppName("WindowHotWordS").setMaster("local[2]") //Scala中躏筏,創(chuàng)建的是StreamingContext 
    val ssc = new StreamingContext(conf, Seconds(5))  

    val searchLogsDStream = ssc.socketTextStream("spark1", 9999)  

    val searchWordsDStream = searchLogsDStream.map { searchLog => searchLog.split(" ")(1) }  

    val searchWordPairDStream = searchWordsDStream.map { searchWord => (searchWord, 1) } 
// reduceByKeyAndWindow
 // 第二個(gè)參數(shù)板丽,是窗口長(zhǎng)度,這是是60秒 
// 第三個(gè)參數(shù),是滑動(dòng)間隔埃碱,這里是10秒
 // 也就是說(shuō)猖辫,每隔10秒鐘,將最近60秒的數(shù)據(jù)砚殿,作為一個(gè)窗口啃憎,進(jìn)行內(nèi)部的RDD的聚合,然后統(tǒng)一對(duì)一個(gè)RDD進(jìn)行后續(xù)計(jì)算 
// 而是只是放在那里
 // 然后似炎,等待我們的滑動(dòng)間隔到了以后辛萍,10秒到了,會(huì)將之前60秒的RDD羡藐,因?yàn)橐粋€(gè)batch間隔是5秒贩毕,所以之前60秒,就有12個(gè)RDD传睹,給聚合起來(lái)耳幢,然后統(tǒng)一執(zhí)行reduceByKey操作
 // 所以這里的reduceByKeyAndWindow岸晦,是針對(duì)每個(gè)窗口執(zhí)行計(jì)算的欧啤,而不是針對(duì) 某個(gè)DStream中的RDD // 每隔10秒鐘,出來(lái) 之前60秒的收集到的單詞的統(tǒng)計(jì)次數(shù) 
// 用法 不斷輸入 zhangsan java這樣的name word
    val searchWordCountsDStream = searchWordPairDStream.reduceByKeyAndWindow((v1: Int, v2: Int) => v1 + v2, Seconds(60), Seconds(10))  

    val finalDStream = searchWordCountsDStream.transform(searchWordCountsRDD => {  
      val countSearchWordsRDD = searchWordCountsRDD.map(tuple => (tuple._2, tuple._1))  
      val sortedCountSearchWordsRDD = countSearchWordsRDD.sortByKey(false)  
      val sortedSearchWordCountsRDD = sortedCountSearchWordsRDD.map(tuple => (tuple._1, tuple._2))  
      val top3SearchWordCounts = sortedSearchWordCountsRDD.take(3) for (tuple <- top3SearchWordCounts) {  
        println("result : " + tuple)  //打印前三名的詞
      }  

      searchWordCountsRDD  
    })  

    finalDStream.print()  //打印出窗口期全部單詞

    ssc.start()  
    ssc.awaitTermination()  
  }  
} 

補(bǔ)充 Transform操作

Transform操作允許任意RDD-to-RDD類型的函數(shù)被應(yīng)用在一個(gè)DStream上启上。通過(guò)它可以在DStream上使用任何沒(méi)有在DStream API中暴露的任意RDD操作邢隧。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市冈在,隨后出現(xiàn)的幾起案子倒慧,更是在濱河造成了極大的恐慌,老刑警劉巖包券,帶你破解...
    沈念sama閱讀 212,332評(píng)論 6 493
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件纫谅,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡溅固,警方通過(guò)查閱死者的電腦和手機(jī)付秕,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,508評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)侍郭,“玉大人询吴,你說(shuō)我怎么就攤上這事×猎” “怎么了猛计?”我有些...
    開封第一講書人閱讀 157,812評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)爆捞。 經(jīng)常有香客問(wèn)我奉瘤,道長(zhǎng),這世上最難降的妖魔是什么煮甥? 我笑而不...
    開封第一講書人閱讀 56,607評(píng)論 1 284
  • 正文 為了忘掉前任毛好,我火速辦了婚禮望艺,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘肌访。我一直安慰自己找默,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,728評(píng)論 6 386
  • 文/花漫 我一把揭開白布吼驶。 她就那樣靜靜地躺著惩激,像睡著了一般。 火紅的嫁衣襯著肌膚如雪蟹演。 梳的紋絲不亂的頭發(fā)上风钻,一...
    開封第一講書人閱讀 49,919評(píng)論 1 290
  • 那天,我揣著相機(jī)與錄音酒请,去河邊找鬼骡技。 笑死,一個(gè)胖子當(dāng)著我的面吹牛羞反,可吹牛的內(nèi)容都是我干的布朦。 我是一名探鬼主播,決...
    沈念sama閱讀 39,071評(píng)論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼昼窗,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼是趴!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起澄惊,我...
    開封第一講書人閱讀 37,802評(píng)論 0 268
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤唆途,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后掸驱,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體肛搬,經(jīng)...
    沈念sama閱讀 44,256評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,576評(píng)論 2 327
  • 正文 我和宋清朗相戀三年毕贼,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了温赔。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,712評(píng)論 1 341
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡帅刀,死狀恐怖让腹,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情扣溺,我是刑警寧澤骇窍,帶...
    沈念sama閱讀 34,389評(píng)論 4 332
  • 正文 年R本政府宣布,位于F島的核電站锥余,受9級(jí)特大地震影響腹纳,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 40,032評(píng)論 3 316
  • 文/蒙蒙 一嘲恍、第九天 我趴在偏房一處隱蔽的房頂上張望足画。 院中可真熱鬧,春花似錦佃牛、人聲如沸淹辞。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,798評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)象缀。三九已至,卻和暖如春爷速,著一層夾襖步出監(jiān)牢的瞬間央星,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,026評(píng)論 1 266
  • 我被黑心中介騙來(lái)泰國(guó)打工惫东, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留莉给,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 46,473評(píng)論 2 360
  • 正文 我出身青樓廉沮,卻偏偏與公主長(zhǎng)得像颓遏,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子废封,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,606評(píng)論 2 350

推薦閱讀更多精彩內(nèi)容