Streaming提供了滑動(dòng)窗口操作的支持,從而讓我們可以對(duì)一個(gè)滑動(dòng)窗口內(nèi)的數(shù)據(jù)執(zhí)行計(jì)算操作颂碘。每次掉落在窗口內(nèi)的RDD的數(shù)據(jù),會(huì)被聚合起來(lái)執(zhí)行計(jì)算操作微饥,然后生成的RDD,會(huì)作為window DStream的一個(gè)RDD古戴。
網(wǎng)官圖中所示欠橘,就是對(duì)每三秒鐘的數(shù)據(jù)執(zhí)行一次滑動(dòng)窗口計(jì)算,這3秒內(nèi)的3個(gè)RDD會(huì)被聚合起來(lái)進(jìn)行處理允瞧,然后過(guò)了兩秒鐘,又會(huì)對(duì)最近三秒內(nèi)的數(shù)據(jù)執(zhí)行滑動(dòng)窗口計(jì)算蛮拔。所以每個(gè)滑動(dòng)窗口操作述暂,都必須指定兩個(gè)參數(shù),窗口長(zhǎng)度以及滑動(dòng)間隔建炫,而且這兩個(gè)參數(shù)值都必須是batch間隔的整數(shù)倍畦韭。
Spark Streaming對(duì)滑動(dòng)窗口的支持,是比Storm更加完善和強(qiáng)大的肛跌。
SparkStreaming對(duì)滑動(dòng)窗口支持的轉(zhuǎn)換操作:
示例講解:
maven
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<spark.version>2.2.1</spark.version>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
基礎(chǔ)代碼
package com.neusoft
import org.apache.spark.streaming.{Seconds, StreamingContext}
object WordCount {
def main(args: Array[String]): Unit = {
// Create a StreamingContext with a local master
// Spark Streaming needs at least two working thread
val sc = new StreamingContext("local[2]", "NetworkWordCount", Seconds(10) )
sc.checkpoint("hdfs://192.168.121.128:8020/demo1")
// Create a DStream that will connect to serverIP:serverPort, like localhost:9999
val lines = sc.socketTextStream("192.168.121.128", 9999)
// Split each line into words
// 以空格把收到的每一行數(shù)據(jù)分割成單詞
val words = lines.flatMap(_.split(" "))
// 在本批次內(nèi)計(jì)單詞的數(shù)目
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
// 打印每個(gè)RDD中的前10個(gè)元素到控制臺(tái)
wordCounts.print()
sc.start()
sc.awaitTermination()
}
}
1艺配、window(windowLength, slideInterval)
該操作由一個(gè)DStream對(duì)象調(diào)用,傳入一個(gè)窗口長(zhǎng)度參數(shù)衍慎,一個(gè)窗口移動(dòng)速率參數(shù)转唉,然后將當(dāng)前時(shí)刻當(dāng)前長(zhǎng)度窗口中的元素取出形成一個(gè)新的DStream。
下面的代碼以長(zhǎng)度為3稳捆,移動(dòng)速率為1截取源DStream中的元素形成新的DStream赠法。
val windowWords = words.window(Seconds( 3 ), Seconds( 1))
基本上每秒輸入一個(gè)字母,然后取出當(dāng)前時(shí)刻3秒這個(gè)長(zhǎng)度中的所有元素乔夯,打印出來(lái)砖织。從上面的截圖中可以看到,下一秒時(shí)已經(jīng)看不到a了末荐,再下一秒侧纯,已經(jīng)看不到b和c了。表示a, b, c已經(jīng)不在當(dāng)前的窗口中甲脏。
2眶熬、 countByWindow(windowLength,slideInterval)
返回指定長(zhǎng)度窗口中的元素個(gè)數(shù)。
代碼如下块请,統(tǒng)計(jì)當(dāng)前3秒長(zhǎng)度的時(shí)間窗口的DStream中元素的個(gè)數(shù):
val windowWords = words.countByWindow(Seconds( 3 ), Seconds( 1))
3聋涨、 reduceByWindow(func, windowLength,slideInterval)
類似于上面的reduce操作,只不過(guò)這里不再是對(duì)整個(gè)調(diào)用DStream進(jìn)行reduce操作,而是在調(diào)用DStream上首先取窗口函數(shù)的元素形成新的DStream溶握,然后在窗口元素形成的DStream上進(jìn)行reduce戒洼。
val windowWords = words.reduceByWindow(_ + "-" + _, Seconds( 3) , Seconds( 1 ))
4、 reduceByKeyAndWindow(func,windowLength, slideInterval, [numTasks])
調(diào)用該操作的DStream中的元素格式為(k, v)茂腥,整個(gè)操作類似于前面的reduceByKey狸涌,只不過(guò)對(duì)應(yīng)的數(shù)據(jù)源不同,reduceByKeyAndWindow的數(shù)據(jù)源是基于該DStream的窗口長(zhǎng)度中的所有數(shù)據(jù)最岗。該操作也有一個(gè)可選的并發(fā)數(shù)參數(shù)帕胆。
下面代碼中,將當(dāng)前長(zhǎng)度為3的時(shí)間窗口中的所有數(shù)據(jù)元素根據(jù)key進(jìn)行合并般渡,統(tǒng)計(jì)當(dāng)前3秒中內(nèi)不同單詞出現(xiàn)的次數(shù)懒豹。
val windowWords = pairs.reduceByKeyAndWindow((a:Int , b:Int) => (a + b) , Seconds(3 ) , Seconds( 1 ))
5、 reduceByKeyAndWindow(func, invFunc,windowLength, slideInterval, [numTasks])
這個(gè)窗口操作和上一個(gè)的區(qū)別是多傳入一個(gè)函數(shù)invFunc驯用。前面的func作用和上一個(gè)reduceByKeyAndWindow相同脸秽,后面的invFunc是用于處理流出rdd的。
在下面這個(gè)例子中蝴乔,如果把3秒的時(shí)間窗口當(dāng)成一個(gè)池塘记餐,池塘每一秒都會(huì)有魚游進(jìn)或者游出,那么第一個(gè)函數(shù)表示每由進(jìn)來(lái)一條魚薇正,就在該類魚的數(shù)量上累加片酝。而第二個(gè)函數(shù)是,每由出去一條魚挖腰,就將該魚的總數(shù)減去一雕沿。{在窗口長(zhǎng)度長(zhǎng)(比如10個(gè)批次),窗口間隔泻锫亍(比如一個(gè)批次)的情況下計(jì)算更高效晦炊,只需減去離開窗口的數(shù)據(jù),和加上進(jìn)入窗口的數(shù)據(jù)即可宁脊,不用計(jì)算重復(fù)的部分(9個(gè)批次)}
windowWords = pairs.reduceByKeyAndWindow((a: Int, b:Int ) => (a + b) , (a:Int, b: Int) => (a - b) , Seconds( 3 ), Seconds( 1 ))
下面是演示結(jié)果断国,最終的結(jié)果是該3秒長(zhǎng)度的窗口中歷史上出現(xiàn)過(guò)的所有不同單詞個(gè)數(shù)都為0。一段時(shí)間不輸入任何信息榆苞,看一下最終結(jié)果
6稳衬、 countByValueAndWindow(windowLength,slideInterval, [numTasks])
類似于前面的countByValue操作,調(diào)用該操作的DStream數(shù)據(jù)格式為(K, v)坐漏,返回的DStream格式為(K, Long)薄疚。統(tǒng)計(jì)當(dāng)前時(shí)間窗口中元素值相同的元素的個(gè)數(shù)。
val windowWords = words.countByValueAndWindow(Seconds( 3 ), Seconds( 1))
示例二:熱點(diǎn)搜索詞滑動(dòng)統(tǒng)計(jì)赊琳,每隔10秒鐘街夭,統(tǒng)計(jì)最近60秒鐘的搜索詞的搜索頻次,并打印出排名最靠前的3個(gè)搜索詞以及出現(xiàn)次數(shù)
package com.spark.streaming
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.SparkConf
/** * @author Ganymede */
object WindowHotWordS {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("WindowHotWordS").setMaster("local[2]") //Scala中躏筏,創(chuàng)建的是StreamingContext
val ssc = new StreamingContext(conf, Seconds(5))
val searchLogsDStream = ssc.socketTextStream("spark1", 9999)
val searchWordsDStream = searchLogsDStream.map { searchLog => searchLog.split(" ")(1) }
val searchWordPairDStream = searchWordsDStream.map { searchWord => (searchWord, 1) }
// reduceByKeyAndWindow
// 第二個(gè)參數(shù)板丽,是窗口長(zhǎng)度,這是是60秒
// 第三個(gè)參數(shù),是滑動(dòng)間隔埃碱,這里是10秒
// 也就是說(shuō)猖辫,每隔10秒鐘,將最近60秒的數(shù)據(jù)砚殿,作為一個(gè)窗口啃憎,進(jìn)行內(nèi)部的RDD的聚合,然后統(tǒng)一對(duì)一個(gè)RDD進(jìn)行后續(xù)計(jì)算
// 而是只是放在那里
// 然后似炎,等待我們的滑動(dòng)間隔到了以后辛萍,10秒到了,會(huì)將之前60秒的RDD羡藐,因?yàn)橐粋€(gè)batch間隔是5秒贩毕,所以之前60秒,就有12個(gè)RDD传睹,給聚合起來(lái)耳幢,然后統(tǒng)一執(zhí)行reduceByKey操作
// 所以這里的reduceByKeyAndWindow岸晦,是針對(duì)每個(gè)窗口執(zhí)行計(jì)算的欧啤,而不是針對(duì) 某個(gè)DStream中的RDD // 每隔10秒鐘,出來(lái) 之前60秒的收集到的單詞的統(tǒng)計(jì)次數(shù)
// 用法 不斷輸入 zhangsan java這樣的name word
val searchWordCountsDStream = searchWordPairDStream.reduceByKeyAndWindow((v1: Int, v2: Int) => v1 + v2, Seconds(60), Seconds(10))
val finalDStream = searchWordCountsDStream.transform(searchWordCountsRDD => {
val countSearchWordsRDD = searchWordCountsRDD.map(tuple => (tuple._2, tuple._1))
val sortedCountSearchWordsRDD = countSearchWordsRDD.sortByKey(false)
val sortedSearchWordCountsRDD = sortedCountSearchWordsRDD.map(tuple => (tuple._1, tuple._2))
val top3SearchWordCounts = sortedSearchWordCountsRDD.take(3) for (tuple <- top3SearchWordCounts) {
println("result : " + tuple) //打印前三名的詞
}
searchWordCountsRDD
})
finalDStream.print() //打印出窗口期全部單詞
ssc.start()
ssc.awaitTermination()
}
}
補(bǔ)充 Transform操作
Transform操作允許任意RDD-to-RDD類型的函數(shù)被應(yīng)用在一個(gè)DStream上启上。通過(guò)它可以在DStream上使用任何沒(méi)有在DStream API中暴露的任意RDD操作邢隧。