熱點搜索詞滑動統(tǒng)計,每隔10秒鐘龄减,統(tǒng)計最近60秒鐘的搜索詞的搜索頻次,并打印出排名最靠前的3個搜索詞以及出現(xiàn)次數(shù)
普通SparkStreaming處理方式,如果將時間間隔設(shè)置成60s页屠,無法每隔10s輸出一次結(jié)果;如果將時間間隔設(shè)置成60s蓖柔,同時使用updatebyKeyState辰企,那么統(tǒng)計的是持續(xù)的累加結(jié)果,無法做到統(tǒng)計60s之內(nèi)的結(jié)果况鸣,此時就需要使用滑動窗口來實現(xiàn)牢贸。
Streaming提供了滑動窗口操作的支持,從而讓我們可以對一個滑動窗口內(nèi)的數(shù)據(jù)執(zhí)行計算操作懒闷。每次掉落在窗口內(nèi)的RDD的數(shù)據(jù)十减,會被聚合起來執(zhí)行計算操作栈幸,然后生成的RDD,會作為window DStream的一個RDD帮辟。
網(wǎng)官圖中所示速址,就是對每三秒鐘的數(shù)據(jù)執(zhí)行一次滑動窗口計算,這3秒內(nèi)的3個RDD會被聚合起來進(jìn)行處理由驹,然后過了兩秒鐘芍锚,又會對最近三秒內(nèi)的數(shù)據(jù)執(zhí)行滑動窗口計算。所以每個滑動窗口操作蔓榄,都必須指定兩個參數(shù)并炮,窗口長度以及滑動間隔,而且這兩個參數(shù)值都必須是batch間隔的整數(shù)倍甥郑。
Spark Streaming對滑動窗口的支持逃魄,是比Storm更加完善和強大的。
SparkStreaming對滑動窗口支持的轉(zhuǎn)換操作:
示例講解:
maven
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<spark.version>2.2.1</spark.version>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
基礎(chǔ)代碼
package com.neusoft
import org.apache.spark.streaming.{Seconds, StreamingContext}
object WordCount {
def main(args: Array[String]): Unit = {
// Create a StreamingContext with a local master
// Spark Streaming needs at least two working thread
val sc = new StreamingContext("local[2]", "NetworkWordCount", Seconds(10) )
sc.checkpoint("hdfs://192.168.121.128:8020/demo1")
// Create a DStream that will connect to serverIP:serverPort, like localhost:9999
val lines = sc.socketTextStream("192.168.121.128", 9999)
// Split each line into words
// 以空格把收到的每一行數(shù)據(jù)分割成單詞
val words = lines.flatMap(_.split(" "))
// 在本批次內(nèi)計單詞的數(shù)目
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
// 打印每個RDD中的前10個元素到控制臺
wordCounts.print()
sc.start()
sc.awaitTermination()
}
}
1澜搅、window(windowLength, slideInterval)
該操作由一個DStream對象調(diào)用伍俘,傳入一個窗口長度參數(shù),一個窗口移動速率參數(shù)勉躺,然后將當(dāng)前時刻當(dāng)前長度窗口中的元素取出形成一個新的DStream癌瘾。
下面的代碼以長度為3,移動速率為1截取源DStream中的元素形成新的DStream饵溅。
val windowWords = words.window(Seconds( 3 ), Seconds( 1))
基本上每秒輸入一個字母妨退,然后取出當(dāng)前時刻3秒這個長度中的所有元素,打印出來蜕企。從上面的截圖中可以看到咬荷,下一秒時已經(jīng)看不到a了,再下一秒糖赔,已經(jīng)看不到b和c了萍丐。表示a, b, c已經(jīng)不在當(dāng)前的窗口中。
2放典、 countByWindow(windowLength,slideInterval)
返回指定長度窗口中的元素個數(shù)逝变。
代碼如下,統(tǒng)計當(dāng)前3秒長度的時間窗口的DStream中元素的個數(shù):
val windowWords = words.countByWindow(Seconds( 3 ), Seconds( 1))
3奋构、 reduceByWindow(func, windowLength,slideInterval)
類似于上面的reduce操作壳影,只不過這里不再是對整個調(diào)用DStream進(jìn)行reduce操作,而是在調(diào)用DStream上首先取窗口函數(shù)的元素形成新的DStream弥臼,然后在窗口元素形成的DStream上進(jìn)行reduce宴咧。
val windowWords = words.reduceByWindow(_ + "-" + _, Seconds( 3) , Seconds( 1 ))
4、 reduceByKeyAndWindow(func,windowLength, slideInterval, [numTasks])
調(diào)用該操作的DStream中的元素格式為(k, v)径缅,整個操作類似于前面的reduceByKey掺栅,只不過對應(yīng)的數(shù)據(jù)源不同烙肺,reduceByKeyAndWindow的數(shù)據(jù)源是基于該DStream的窗口長度中的所有數(shù)據(jù)。該操作也有一個可選的并發(fā)數(shù)參數(shù)氧卧。
下面代碼中桃笙,將當(dāng)前長度為3的時間窗口中的所有數(shù)據(jù)元素根據(jù)key進(jìn)行合并,統(tǒng)計當(dāng)前3秒中內(nèi)不同單詞出現(xiàn)的次數(shù)沙绝。
val windowWords = pairs.reduceByKeyAndWindow((a:Int , b:Int) => (a + b) , Seconds(3 ) , Seconds( 1 ))
5搏明、 reduceByKeyAndWindow(func, invFunc,windowLength, slideInterval, [numTasks])
這個窗口操作和上一個的區(qū)別是多傳入一個函數(shù)invFunc。前面的func作用和上一個reduceByKeyAndWindow相同闪檬,后面的invFunc是用于處理流出rdd的星著。
在下面這個例子中,如果把3秒的時間窗口當(dāng)成一個池塘粗悯,池塘每一秒都會有魚游進(jìn)或者游出虚循,那么第一個函數(shù)表示每由進(jìn)來一條魚,就在該類魚的數(shù)量上累加为黎。而第二個函數(shù)是邮丰,每由出去一條魚行您,就將該魚的總數(shù)減去一铭乾。{在窗口長度長(比如10個批次),窗口間隔型扪(比如一個批次)的情況下計算更高效炕檩,只需減去離開窗口的數(shù)據(jù),和加上進(jìn)入窗口的數(shù)據(jù)即可捌斧,不用計算重復(fù)的部分(9個批次)}
windowWords = pairs.reduceByKeyAndWindow((a: Int, b:Int ) => (a + b) , (a:Int, b: Int) => (a - b) , Seconds( 3 ), Seconds( 1 ))
下面是演示結(jié)果笛质,最終的結(jié)果是該3秒長度的窗口中歷史上出現(xiàn)過的所有不同單詞個數(shù)都為0。
一段時間不輸入任何信息捞蚂,看一下最終結(jié)果
6妇押、 countByValueAndWindow(windowLength,slideInterval, [numTasks])
類似于前面的countByValue操作,調(diào)用該操作的DStream數(shù)據(jù)格式為(K, v)姓迅,返回的DStream格式為(K, Long)敲霍。統(tǒng)計當(dāng)前時間窗口中元素值相同的元素的個數(shù)。
val windowWords = words.countByValueAndWindow(Seconds( 3 ), Seconds( 1))
示例二:熱點搜索詞滑動統(tǒng)計丁存,每隔10秒鐘肩杈,統(tǒng)計最近60秒鐘的搜索詞的搜索頻次,并打印出排名最靠前的3個搜索詞以及出現(xiàn)次數(shù)
package com.spark.streaming
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.SparkConf
/** * @author Ganymede */
object WindowHotWordS {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("WindowHotWordS").setMaster("local[2]") //Scala中解寝,創(chuàng)建的是StreamingContext
val ssc = new StreamingContext(conf, Seconds(5))
val searchLogsDStream = ssc.socketTextStream("spark1", 9999)
val searchWordsDStream = searchLogsDStream.map { searchLog => searchLog.split(" ")(1) }
val searchWordPairDStream = searchWordsDStream.map { searchWord => (searchWord, 1) }
// reduceByKeyAndWindow
// 第二個參數(shù)扩然,是窗口長度,這是是60秒
// 第三個參數(shù)聋伦,是滑動間隔夫偶,這里是10秒
// 也就是說界睁,每隔10秒鐘,將最近60秒的數(shù)據(jù)兵拢,作為一個窗口晕窑,進(jìn)行內(nèi)部的RDD的聚合,然后統(tǒng)一對一個RDD進(jìn)行后續(xù)計算
// 而是只是放在那里
// 然后卵佛,等待我們的滑動間隔到了以后杨赤,10秒到了,會將之前60秒的RDD截汪,因為一個batch間隔是5秒疾牲,所以之前60秒,就有12個RDD衙解,給聚合起來阳柔,然后統(tǒng)一執(zhí)行reduceByKey操作
// 所以這里的reduceByKeyAndWindow,是針對每個窗口執(zhí)行計算的蚓峦,而不是針對 某個DStream中的RDD // 每隔10秒鐘舌剂,出來 之前60秒的收集到的單詞的統(tǒng)計次數(shù)
// 用法 不斷輸入 zhangsan java這樣的name word
val searchWordCountsDStream = searchWordPairDStream.reduceByKeyAndWindow((v1: Int, v2: Int) => v1 + v2, Seconds(60), Seconds(10))
val finalDStream = searchWordCountsDStream.transform(searchWordCountsRDD => {
val countSearchWordsRDD = searchWordCountsRDD.map(tuple => (tuple._2, tuple._1))
val sortedCountSearchWordsRDD = countSearchWordsRDD.sortByKey(false)
val sortedSearchWordCountsRDD = sortedCountSearchWordsRDD.map(tuple => (tuple._1, tuple._2))
val top3SearchWordCounts = sortedSearchWordCountsRDD.take(3) for (tuple <- top3SearchWordCounts) {
println("result : " + tuple) //打印前三名的詞
}
searchWordCountsRDD
})
finalDStream.print() //打印出窗口期全部單詞
ssc.start()
ssc.awaitTermination()
}
}
補充 Transform操作
Transform操作允許任意RDD-to-RDD類型的函數(shù)被應(yīng)用在一個DStream上。通過它可以在DStream上使用任何沒有在DStream API中暴露的任意RDD操作暑椰。