DStream 轉(zhuǎn)換操作包括:無狀態(tài)轉(zhuǎn)換、有狀態(tài)轉(zhuǎn)換杭抠。
無狀態(tài)轉(zhuǎn)換:每個批次的處理不依賴于之前批次的數(shù)據(jù)。
有狀態(tài)轉(zhuǎn)換:當(dāng)前批次的處理需要使用 之前批次的數(shù)據(jù)或者中間結(jié)果。有狀態(tài)轉(zhuǎn)換包括基于 滑動窗口的轉(zhuǎn)換 和 追蹤狀態(tài)變化的轉(zhuǎn)換(updateStateByKey)帮匾。
無狀態(tài)轉(zhuǎn)換操作
下面給出一些無狀態(tài)轉(zhuǎn)換操作的含義:
map(func) :對源DStream的每個元素,采用func函數(shù)進(jìn)行轉(zhuǎn)換痴鳄,得到一個新的DStream瘟斜;
flatMap(func): 與map相似,但是每個輸入項可用被映射為0個或者多個輸出項痪寻;
filter(func): 返回一個新的DStream螺句,僅包含源DStream中滿足函數(shù)func的項;
repartition(numPartitions): 通過創(chuàng)建更多或者更少的分區(qū)改變DStream的并行程度橡类;
union(otherStream): 返回一個新的DStream蛇尚,包含源DStream和其他DStream的元素;
rdd1.union(rdd2)
count():統(tǒng)計源DStream中每個RDD的元素數(shù)量顾画;
reduce(func):利用函數(shù)func聚集源DStream中每個RDD的元素取劫,返回一個包含單元素RDDs的新DStream匆笤;
countByValue():應(yīng)用于元素類型為K的DStream上,返回一個(K谱邪,V)鍵值對類型的新DStream炮捧,每個鍵的值是在原DStream的每個RDD中的出現(xiàn)次數(shù);
reduceByKey(func, [numTasks]):當(dāng)在一個由(K,V)鍵值對組成的DStream上執(zhí)行該操作時惦银,返回一個新的由(K,V)鍵值對組成的DStream咆课,每一個key的值均由給定的recuce函數(shù)(func)聚集起來;
join(otherStream, [numTasks]):當(dāng)應(yīng)用于兩個DStream(一個包含(K,V)鍵值對,一個包含(K,W)鍵值對)璧函,返回一個包含(K, (V, W))鍵值對的新DStream白筹;
-
cogroup(otherStream, [numTasks]):將多個RDD中同一個Key對應(yīng)的Value組合到一起亮曹。看一個例子:
val data1 = sc.parallelize(List((1, "www"), (2, "bbs"))) val data2 = sc.parallelize(List((1, "iteblog"), (2, "iteblog"), (3, "very"))) val data3 = sc.parallelize(List((1, "com"), (2, "com"), (3, "good"))) val result = data1.cogroup(data2, data3) result.collect //結(jié)果為: Array[(Int, (Iterable[String], Iterable[String], Iterable[String]))] = Array((1, (CompactBuffer(www), CompactBuffer(iteblog), CompactBuffer(com))), (2, (CompactBuffer(bbs), CompactBuffer(iteblog), CompactBuffer(com))), (3, (CompactBuffer(), CompactBuffer(very), CompactBuffer(good)))) 從上面的結(jié)果可以看到,data1中不存在Key為3的元素(自然就不存在Value了)例隆, 在組合的過程中將data1對應(yīng)的位置設(shè)置為CompactBuffer()了,而不是去掉了膏孟。
transform(func):通過對源DStream的每個RDD應(yīng)用RDD-to-RDD函數(shù)鹃锈,創(chuàng)建一個新的DStream。支持在新的DStream中做任何RDD操作宪萄。todo:transform 的使用例子 網(wǎng)上較少艺谆,貌似在 MLlib上用的較多,先留著
上面的無狀態(tài)轉(zhuǎn)換有一些在這篇文章做了簡要介紹并附上了例子:Spark 編程基礎(chǔ)
DStream有狀態(tài)轉(zhuǎn)換操作
對于DStream有狀態(tài)轉(zhuǎn)換操作而言拜英,當(dāng)前批次的處理需要使用之前批次的數(shù)據(jù)或者中間結(jié)果静汤。有狀態(tài)轉(zhuǎn)換包括 基于滑動窗口的轉(zhuǎn)換 和 追蹤狀態(tài)變化(updateStateByKey) 的轉(zhuǎn)換。
滑動窗口轉(zhuǎn)換操作
滑動窗口轉(zhuǎn)換操作的計算過程如下圖所示居凶,我們可以事先設(shè)定一個滑動窗口的長度:windowLength(也就是窗口的持續(xù)時間)虫给,并且設(shè)定滑動窗口的時間間隔:slideInterval(每隔多長時間執(zhí)行一次計算),然后侠碧,就可以讓窗口按照指定時間間隔在源DStream上滑動抹估,每次窗口停放的位置上,都會有一部分DStream被框入窗口內(nèi)弄兜,形成一個小段的DStream药蜻,這時,就可以啟動對這個小段DStream的計算替饿。
下面給給出一些窗口轉(zhuǎn)換操作的含義:
window(windowLength, slideInterval) 基于源DStream產(chǎn)生的窗口化的批數(shù)據(jù)语泽,計算得到一個新的DStream;
countByWindow(windowLength, slideInterval) 返回流中元素的一個滑動窗口數(shù)视卢;
reduceByWindow(func, windowLength, slideInterval) 返回一個單元素流湿弦。利用函數(shù)func聚集滑動時間間隔的流的元素創(chuàng)建這個單元素流。函數(shù)func必須滿足結(jié)合律腾夯,從而可以支持并行計算颊埃;
reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]) 應(yīng)用到一個 (K, V) 鍵值對組成的 DStream 上時蔬充,會返回一個由 (K, V) 鍵值對組成的新的DStream。每一個key的值均由給定的 reduce 函數(shù) (func 函數(shù)) 進(jìn)行聚合計算班利。注意:在默認(rèn)情況下饥漫,這個算子利用了 Spark 默認(rèn)的并發(fā)任務(wù)數(shù)去分組÷薇辏可以通過 numTasks 參數(shù)的設(shè)置來指定不同的任務(wù)數(shù)庸队;
reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]) 更加高效的reduceByKeyAndWindow,每個窗口的 reduce 值闯割,是基于先前窗口的 reduce 值進(jìn)行增量計算得到的彻消;它會對進(jìn)入滑動窗口的新數(shù)據(jù)進(jìn)行 reduce 操作,并對離開窗口的老數(shù)據(jù)進(jìn)行“逆向 reduce”操作宙拉。但是宾尚,只能用于“可逆 reduce 函數(shù)”,即那些reduce函數(shù)都有一個對應(yīng)的“逆向 reduce函數(shù)”(以InvFunc參數(shù)傳入)谢澈;
countByValueAndWindow(windowLength, slideInterval, [numTasks]) 當(dāng)應(yīng)用到一個(K,V)鍵值對組成的DStream上煌贴,返回一個由(K,V)鍵值對組成的新的DStream。每個key的值都是它們在滑動窗口中出現(xiàn)的頻率锥忿。
下面重點介紹下 reduceByKeyAndWindow:
reduceByKeyAndWindow
再看一下這個圖:
1牛郑、reduceByKeyAndWindow(_ + _, Seconds(3), Seconds(2))
此方法的窗口長度為3,兩秒劃分一次敬鬓,并對 key 相同的 value 進(jìn)行相加淹朋,即 WordCount。
2钉答、對于他的重載函數(shù) reduceByKeyAndWindow(_ + _, _ - _, Seconds(3s), seconds(2))
設(shè)計理念是瑞你,當(dāng) 滑動窗口的時間Seconds(2) < Seconds(3)(窗口大小)時希痴,兩個統(tǒng)計的部分會有重復(fù),那么我們就可以不用重新獲取或者計算春感,而是通過獲取舊信息來更新新的信息砌创,這樣即節(jié)省了空間又節(jié)省了內(nèi)容,并且效率也大幅提升鲫懒。
如上圖所示嫩实,2次統(tǒng)計重復(fù)的部分為 time3 對用的時間片內(nèi)的數(shù)據(jù),這樣對于window1窥岩,和window2的計算可以如下所示
win1 = time1 + time2 + time3
win2 = time3 + time4 + time5
更新為
win1 = time1 + time2 + time3
win2 = win1+ time4 + time5 - time1 - time2
這樣就理解了吧, _ + _ 是對新產(chǎn)生的時間分片(time4, time5 內(nèi) RDD)進(jìn)行統(tǒng)計甲献,而 _ - _ 是對上一個窗口中,過時的時間分片 (time1, time2) 進(jìn)行統(tǒng)計
updateStateByKey操作
我們統(tǒng)計單詞詞頻采用的是無狀態(tài)轉(zhuǎn)換操作颂翼,也就是說晃洒,只對本批次內(nèi)的單詞進(jìn)行詞頻統(tǒng)計慨灭,不會考慮之前到達(dá)的批次的單詞,所以球及,不同批次的單詞詞頻都是獨立統(tǒng)計的氧骤。
對于有狀態(tài)轉(zhuǎn)換操作而言,本批次的詞頻統(tǒng)計吃引,會在之前批次的詞頻統(tǒng)計結(jié)果的基礎(chǔ)上進(jìn)行不斷累加筹陵,所以,最終統(tǒng)計得到的詞頻镊尺,是所有批次的單詞的總的詞頻統(tǒng)計結(jié)果朦佩。
注意 updateStateByKey 操作要求開啟 checkPoint。
//設(shè)置檢查點庐氮,檢查點具有容錯機(jī)制
sc.checkpoint("file:///usr/local/spark/mycode/streaming/stateful/")
val words = lines.flatMap(_.split(" "))
val dStream = words.map(x => (x, 1))
val results = dStream.updateStateByKey[Int]((values: Seq[Int], state: Option[Int]) => {
val currentCount = values.sum
val previousCount = state.getOrElse(0)
Some(currentCount + previousCount)
})
results.print()
sc.start()
sc.awaitTermination()
函數(shù)中第一個參數(shù) values:
是當(dāng)前batch 某個key 即某個單詞的列表语稠,譬如:(x, 1) (x, 1) (x, 1) ...
values.sum
即統(tǒng)計 當(dāng)前 batch 某個單詞 所有值的總和:(x, 3)。第二個參數(shù) state:
某個key的歷史狀態(tài)信息旭愧,也就是某個單詞歷史批次的詞頻匯總結(jié)果颅筋,因為當(dāng)前批次(batch )可能是第一個 batch,上一batch可能沒有输枯,所以使用getOrElse(0)
议泵,沒有的話就是0。上一batch 和 當(dāng)前batch 累加
當(dāng)新的batch到來時桃熄,繼續(xù)進(jìn)行累加先口,依次統(tǒng)計當(dāng)前所有 batch 的值。