SparkStreaming 窗口操作

熱點搜索詞滑動統(tǒng)計,每隔10秒鐘龄减,統(tǒng)計最近60秒鐘的搜索詞的搜索頻次,并打印出排名最靠前的3個搜索詞以及出現(xiàn)次數(shù)

普通SparkStreaming處理方式,如果將時間間隔設(shè)置成60s页屠,無法每隔10s輸出一次結(jié)果;如果將時間間隔設(shè)置成60s蓖柔,同時使用updatebyKeyState辰企,那么統(tǒng)計的是持續(xù)的累加結(jié)果,無法做到統(tǒng)計60s之內(nèi)的結(jié)果况鸣,此時就需要使用滑動窗口來實現(xiàn)牢贸。

Streaming提供了滑動窗口操作的支持,從而讓我們可以對一個滑動窗口內(nèi)的數(shù)據(jù)執(zhí)行計算操作懒闷。每次掉落在窗口內(nèi)的RDD的數(shù)據(jù)十减,會被聚合起來執(zhí)行計算操作栈幸,然后生成的RDD,會作為window DStream的一個RDD帮辟。

網(wǎng)官圖中所示速址,就是對每三秒鐘的數(shù)據(jù)執(zhí)行一次滑動窗口計算,這3秒內(nèi)的3個RDD會被聚合起來進(jìn)行處理由驹,然后過了兩秒鐘芍锚,又會對最近三秒內(nèi)的數(shù)據(jù)執(zhí)行滑動窗口計算。所以每個滑動窗口操作蔓榄,都必須指定兩個參數(shù)并炮,窗口長度以及滑動間隔,而且這兩個參數(shù)值都必須是batch間隔的整數(shù)倍甥郑。

Spark Streaming對滑動窗口的支持逃魄,是比Storm更加完善和強大的。

image

SparkStreaming對滑動窗口支持的轉(zhuǎn)換操作:

image

示例講解:

maven

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <spark.version>2.2.1</spark.version>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
  </properties>

  <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>

基礎(chǔ)代碼

package com.neusoft
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WordCount {
    def main(args: Array[String]): Unit = {

    // Create a StreamingContext with a local master
    // Spark Streaming needs at least two working thread
    val sc = new StreamingContext("local[2]", "NetworkWordCount", Seconds(10) )
        sc.checkpoint("hdfs://192.168.121.128:8020/demo1")
    // Create a DStream that will connect to serverIP:serverPort, like localhost:9999
    val lines = sc.socketTextStream("192.168.121.128", 9999)
    // Split each line into words
    // 以空格把收到的每一行數(shù)據(jù)分割成單詞
    val words = lines.flatMap(_.split(" "))
    // 在本批次內(nèi)計單詞的數(shù)目
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    // 打印每個RDD中的前10個元素到控制臺
    wordCounts.print()
    sc.start()
    sc.awaitTermination()
    }
}

1澜搅、window(windowLength, slideInterval)

該操作由一個DStream對象調(diào)用伍俘,傳入一個窗口長度參數(shù),一個窗口移動速率參數(shù)勉躺,然后將當(dāng)前時刻當(dāng)前長度窗口中的元素取出形成一個新的DStream癌瘾。

下面的代碼以長度為3,移動速率為1截取源DStream中的元素形成新的DStream饵溅。

val windowWords = words.window(Seconds( 3 ), Seconds( 1))

image

基本上每秒輸入一個字母妨退,然后取出當(dāng)前時刻3秒這個長度中的所有元素,打印出來蜕企。從上面的截圖中可以看到咬荷,下一秒時已經(jīng)看不到a了,再下一秒糖赔,已經(jīng)看不到b和c了萍丐。表示a, b, c已經(jīng)不在當(dāng)前的窗口中。

2放典、 countByWindow(windowLength,slideInterval)

返回指定長度窗口中的元素個數(shù)逝变。

代碼如下,統(tǒng)計當(dāng)前3秒長度的時間窗口的DStream中元素的個數(shù):

val windowWords = words.countByWindow(Seconds( 3 ), Seconds( 1))

image

3奋构、 reduceByWindow(func, windowLength,slideInterval)

類似于上面的reduce操作壳影,只不過這里不再是對整個調(diào)用DStream進(jìn)行reduce操作,而是在調(diào)用DStream上首先取窗口函數(shù)的元素形成新的DStream弥臼,然后在窗口元素形成的DStream上進(jìn)行reduce宴咧。

val windowWords = words.reduceByWindow(_ + "-" + _, Seconds( 3) , Seconds( 1 ))

image

4、 reduceByKeyAndWindow(func,windowLength, slideInterval, [numTasks])

調(diào)用該操作的DStream中的元素格式為(k, v)径缅,整個操作類似于前面的reduceByKey掺栅,只不過對應(yīng)的數(shù)據(jù)源不同烙肺,reduceByKeyAndWindow的數(shù)據(jù)源是基于該DStream的窗口長度中的所有數(shù)據(jù)。該操作也有一個可選的并發(fā)數(shù)參數(shù)氧卧。

下面代碼中桃笙,將當(dāng)前長度為3的時間窗口中的所有數(shù)據(jù)元素根據(jù)key進(jìn)行合并,統(tǒng)計當(dāng)前3秒中內(nèi)不同單詞出現(xiàn)的次數(shù)沙绝。

val windowWords = pairs.reduceByKeyAndWindow((a:Int , b:Int) => (a + b) , Seconds(3 ) , Seconds( 1 ))

image

5搏明、 reduceByKeyAndWindow(func, invFunc,windowLength, slideInterval, [numTasks])

這個窗口操作和上一個的區(qū)別是多傳入一個函數(shù)invFunc。前面的func作用和上一個reduceByKeyAndWindow相同闪檬,后面的invFunc是用于處理流出rdd的星著。

在下面這個例子中,如果把3秒的時間窗口當(dāng)成一個池塘粗悯,池塘每一秒都會有魚游進(jìn)或者游出虚循,那么第一個函數(shù)表示每由進(jìn)來一條魚,就在該類魚的數(shù)量上累加为黎。而第二個函數(shù)是邮丰,每由出去一條魚行您,就將該魚的總數(shù)減去一铭乾。{在窗口長度長(比如10個批次),窗口間隔型扪(比如一個批次)的情況下計算更高效炕檩,只需減去離開窗口的數(shù)據(jù),和加上進(jìn)入窗口的數(shù)據(jù)即可捌斧,不用計算重復(fù)的部分(9個批次)}

 windowWords = pairs.reduceByKeyAndWindow((a: Int, b:Int ) => (a + b) , (a:Int, b: Int) => (a - b) , Seconds( 3 ), Seconds( 1 ))

下面是演示結(jié)果笛质,最終的結(jié)果是該3秒長度的窗口中歷史上出現(xiàn)過的所有不同單詞個數(shù)都為0。

image

一段時間不輸入任何信息捞蚂,看一下最終結(jié)果

image

6妇押、 countByValueAndWindow(windowLength,slideInterval, [numTasks])

類似于前面的countByValue操作,調(diào)用該操作的DStream數(shù)據(jù)格式為(K, v)姓迅,返回的DStream格式為(K, Long)敲霍。統(tǒng)計當(dāng)前時間窗口中元素值相同的元素的個數(shù)。

val windowWords = words.countByValueAndWindow(Seconds( 3 ), Seconds( 1))

image

示例二:熱點搜索詞滑動統(tǒng)計丁存,每隔10秒鐘肩杈,統(tǒng)計最近60秒鐘的搜索詞的搜索頻次,并打印出排名最靠前的3個搜索詞以及出現(xiàn)次數(shù)


package com.spark.streaming
import org.apache.spark.streaming.Seconds 
import org.apache.spark.streaming.StreamingContext 
import org.apache.spark.SparkConf

 /** * @author Ganymede */
 object WindowHotWordS {  
  def main(args: Array[String]): Unit = {  
    val conf = new SparkConf().setAppName("WindowHotWordS").setMaster("local[2]") //Scala中解寝,創(chuàng)建的是StreamingContext 
    val ssc = new StreamingContext(conf, Seconds(5))  

    val searchLogsDStream = ssc.socketTextStream("spark1", 9999)  

    val searchWordsDStream = searchLogsDStream.map { searchLog => searchLog.split(" ")(1) }  

    val searchWordPairDStream = searchWordsDStream.map { searchWord => (searchWord, 1) } 
// reduceByKeyAndWindow
 // 第二個參數(shù)扩然,是窗口長度,這是是60秒 
// 第三個參數(shù)聋伦,是滑動間隔夫偶,這里是10秒
 // 也就是說界睁,每隔10秒鐘,將最近60秒的數(shù)據(jù)兵拢,作為一個窗口晕窑,進(jìn)行內(nèi)部的RDD的聚合,然后統(tǒng)一對一個RDD進(jìn)行后續(xù)計算 
// 而是只是放在那里
 // 然后卵佛,等待我們的滑動間隔到了以后杨赤,10秒到了,會將之前60秒的RDD截汪,因為一個batch間隔是5秒疾牲,所以之前60秒,就有12個RDD衙解,給聚合起來阳柔,然后統(tǒng)一執(zhí)行reduceByKey操作
 // 所以這里的reduceByKeyAndWindow,是針對每個窗口執(zhí)行計算的蚓峦,而不是針對 某個DStream中的RDD // 每隔10秒鐘舌剂,出來 之前60秒的收集到的單詞的統(tǒng)計次數(shù) 
// 用法 不斷輸入 zhangsan java這樣的name word
    val searchWordCountsDStream = searchWordPairDStream.reduceByKeyAndWindow((v1: Int, v2: Int) => v1 + v2, Seconds(60), Seconds(10))  

    val finalDStream = searchWordCountsDStream.transform(searchWordCountsRDD => {  
      val countSearchWordsRDD = searchWordCountsRDD.map(tuple => (tuple._2, tuple._1))  
      val sortedCountSearchWordsRDD = countSearchWordsRDD.sortByKey(false)  
      val sortedSearchWordCountsRDD = sortedCountSearchWordsRDD.map(tuple => (tuple._1, tuple._2))  
      val top3SearchWordCounts = sortedSearchWordCountsRDD.take(3) for (tuple <- top3SearchWordCounts) {  
        println("result : " + tuple)  //打印前三名的詞
      }  

      searchWordCountsRDD  
    })  

    finalDStream.print()  //打印出窗口期全部單詞

    ssc.start()  
    ssc.awaitTermination()  
  }  
} 

補充 Transform操作

Transform操作允許任意RDD-to-RDD類型的函數(shù)被應(yīng)用在一個DStream上。通過它可以在DStream上使用任何沒有在DStream API中暴露的任意RDD操作暑椰。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末霍转,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子一汽,更是在濱河造成了極大的恐慌避消,老刑警劉巖,帶你破解...
    沈念sama閱讀 212,294評論 6 493
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件召夹,死亡現(xiàn)場離奇詭異岩喷,居然都是意外死亡,警方通過查閱死者的電腦和手機监憎,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,493評論 3 385
  • 文/潘曉璐 我一進(jìn)店門纱意,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人鲸阔,你說我怎么就攤上這事偷霉。” “怎么了隶债?”我有些...
    開封第一講書人閱讀 157,790評論 0 348
  • 文/不壞的土叔 我叫張陵腾它,是天一觀的道長。 經(jīng)常有香客問我死讹,道長瞒滴,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,595評論 1 284
  • 正文 為了忘掉前任,我火速辦了婚禮妓忍,結(jié)果婚禮上虏两,老公的妹妹穿的比我還像新娘。我一直安慰自己世剖,他們只是感情好定罢,可當(dāng)我...
    茶點故事閱讀 65,718評論 6 386
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著旁瘫,像睡著了一般祖凫。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上酬凳,一...
    開封第一講書人閱讀 49,906評論 1 290
  • 那天惠况,我揣著相機與錄音,去河邊找鬼宁仔。 笑死稠屠,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的翎苫。 我是一名探鬼主播权埠,決...
    沈念sama閱讀 39,053評論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼煎谍!你這毒婦竟也來了攘蔽?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,797評論 0 268
  • 序言:老撾萬榮一對情侶失蹤粱快,失蹤者是張志新(化名)和其女友劉穎秩彤,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體事哭,經(jīng)...
    沈念sama閱讀 44,250評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,570評論 2 327
  • 正文 我和宋清朗相戀三年瓜富,在試婚紗的時候發(fā)現(xiàn)自己被綠了鳍咱。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,711評論 1 341
  • 序言:一個原本活蹦亂跳的男人離奇死亡与柑,死狀恐怖谤辜,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情价捧,我是刑警寧澤丑念,帶...
    沈念sama閱讀 34,388評論 4 332
  • 正文 年R本政府宣布,位于F島的核電站结蟋,受9級特大地震影響脯倚,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 40,018評論 3 316
  • 文/蒙蒙 一推正、第九天 我趴在偏房一處隱蔽的房頂上張望恍涂。 院中可真熱鬧,春花似錦植榕、人聲如沸再沧。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,796評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽炒瘸。三九已至,卻和暖如春寝衫,著一層夾襖步出監(jiān)牢的瞬間什燕,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,023評論 1 266
  • 我被黑心中介騙來泰國打工竞端, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留屎即,地道東北人。 一個月前我還...
    沈念sama閱讀 46,461評論 2 360
  • 正文 我出身青樓事富,卻偏偏與公主長得像技俐,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子统台,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 43,595評論 2 350

推薦閱讀更多精彩內(nèi)容