Spark Streaming 的 Transformations

DStream 轉(zhuǎn)換操作包括:無狀態(tài)轉(zhuǎn)換、有狀態(tài)轉(zhuǎn)換杭抠。
無狀態(tài)轉(zhuǎn)換:每個批次的處理不依賴于之前批次的數(shù)據(jù)。
有狀態(tài)轉(zhuǎn)換:當(dāng)前批次的處理需要使用 之前批次的數(shù)據(jù)或者中間結(jié)果。有狀態(tài)轉(zhuǎn)換包括基于 滑動窗口的轉(zhuǎn)換 和 追蹤狀態(tài)變化的轉(zhuǎn)換(updateStateByKey)帮匾。

無狀態(tài)轉(zhuǎn)換操作

下面給出一些無狀態(tài)轉(zhuǎn)換操作的含義:

  • map(func) :對源DStream的每個元素,采用func函數(shù)進(jìn)行轉(zhuǎn)換痴鳄,得到一個新的DStream瘟斜;

  • flatMap(func): 與map相似,但是每個輸入項可用被映射為0個或者多個輸出項痪寻;

  • filter(func): 返回一個新的DStream螺句,僅包含源DStream中滿足函數(shù)func的項;

  • repartition(numPartitions): 通過創(chuàng)建更多或者更少的分區(qū)改變DStream的并行程度橡类;

  • union(otherStream): 返回一個新的DStream蛇尚,包含源DStream和其他DStream的元素;rdd1.union(rdd2)

  • count():統(tǒng)計源DStream中每個RDD的元素數(shù)量顾画;

  • reduce(func):利用函數(shù)func聚集源DStream中每個RDD的元素取劫,返回一個包含單元素RDDs的新DStream匆笤;

  • countByValue():應(yīng)用于元素類型為K的DStream上,返回一個(K谱邪,V)鍵值對類型的新DStream炮捧,每個鍵的值是在原DStream的每個RDD中的出現(xiàn)次數(shù);

  • reduceByKey(func, [numTasks]):當(dāng)在一個由(K,V)鍵值對組成的DStream上執(zhí)行該操作時惦银,返回一個新的由(K,V)鍵值對組成的DStream咆课,每一個key的值均由給定的recuce函數(shù)(func)聚集起來;

  • join(otherStream, [numTasks]):當(dāng)應(yīng)用于兩個DStream(一個包含(K,V)鍵值對,一個包含(K,W)鍵值對)璧函,返回一個包含(K, (V, W))鍵值對的新DStream白筹;

  • cogroup(otherStream, [numTasks]):將多個RDD中同一個Key對應(yīng)的Value組合到一起亮曹。看一個例子:

    val data1 = sc.parallelize(List((1, "www"), (2, "bbs")))
    val data2 = sc.parallelize(List((1, "iteblog"), (2, "iteblog"), (3, "very")))
    val data3 = sc.parallelize(List((1, "com"), (2, "com"), (3, "good")))
    val result = data1.cogroup(data2, data3)
    result.collect
    
    //結(jié)果為:
    Array[(Int, (Iterable[String], Iterable[String], Iterable[String]))] =
    Array((1, (CompactBuffer(www), CompactBuffer(iteblog), CompactBuffer(com))), 
    (2, (CompactBuffer(bbs), CompactBuffer(iteblog), CompactBuffer(com))), 
    (3, (CompactBuffer(), CompactBuffer(very), CompactBuffer(good))))
    
    從上面的結(jié)果可以看到,data1中不存在Key為3的元素(自然就不存在Value了)例隆,
    在組合的過程中將data1對應(yīng)的位置設(shè)置為CompactBuffer()了,而不是去掉了膏孟。
    
  • transform(func):通過對源DStream的每個RDD應(yīng)用RDD-to-RDD函數(shù)鹃锈,創(chuàng)建一個新的DStream。支持在新的DStream中做任何RDD操作宪萄。todo:transform 的使用例子 網(wǎng)上較少艺谆,貌似在 MLlib上用的較多,先留著

上面的無狀態(tài)轉(zhuǎn)換有一些在這篇文章做了簡要介紹并附上了例子:Spark 編程基礎(chǔ)

DStream有狀態(tài)轉(zhuǎn)換操作

對于DStream有狀態(tài)轉(zhuǎn)換操作而言拜英,當(dāng)前批次的處理需要使用之前批次的數(shù)據(jù)或者中間結(jié)果静汤。有狀態(tài)轉(zhuǎn)換包括 基于滑動窗口的轉(zhuǎn)換 和 追蹤狀態(tài)變化(updateStateByKey) 的轉(zhuǎn)換。

滑動窗口轉(zhuǎn)換操作

滑動窗口轉(zhuǎn)換操作的計算過程如下圖所示居凶,我們可以事先設(shè)定一個滑動窗口的長度:windowLength(也就是窗口的持續(xù)時間)虫给,并且設(shè)定滑動窗口的時間間隔:slideInterval(每隔多長時間執(zhí)行一次計算),然后侠碧,就可以讓窗口按照指定時間間隔在源DStream上滑動抹估,每次窗口停放的位置上,都會有一部分DStream被框入窗口內(nèi)弄兜,形成一個小段的DStream药蜻,這時,就可以啟動對這個小段DStream的計算替饿。

下面給給出一些窗口轉(zhuǎn)換操作的含義:

  • window(windowLength, slideInterval) 基于源DStream產(chǎn)生的窗口化的批數(shù)據(jù)语泽,計算得到一個新的DStream;

  • countByWindow(windowLength, slideInterval) 返回流中元素的一個滑動窗口數(shù)视卢;

  • reduceByWindow(func, windowLength, slideInterval) 返回一個單元素流湿弦。利用函數(shù)func聚集滑動時間間隔的流的元素創(chuàng)建這個單元素流。函數(shù)func必須滿足結(jié)合律腾夯,從而可以支持并行計算颊埃;

  • reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]) 應(yīng)用到一個 (K, V) 鍵值對組成的 DStream 上時蔬充,會返回一個由 (K, V) 鍵值對組成的新的DStream。每一個key的值均由給定的 reduce 函數(shù) (func 函數(shù)) 進(jìn)行聚合計算班利。注意:在默認(rèn)情況下饥漫,這個算子利用了 Spark 默認(rèn)的并發(fā)任務(wù)數(shù)去分組÷薇辏可以通過 numTasks 參數(shù)的設(shè)置來指定不同的任務(wù)數(shù)庸队;

  • reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]) 更加高效的reduceByKeyAndWindow,每個窗口的 reduce 值闯割,是基于先前窗口的 reduce 值進(jìn)行增量計算得到的彻消;它會對進(jìn)入滑動窗口的新數(shù)據(jù)進(jìn)行 reduce 操作,并對離開窗口的老數(shù)據(jù)進(jìn)行“逆向 reduce”操作宙拉。但是宾尚,只能用于“可逆 reduce 函數(shù)”,即那些reduce函數(shù)都有一個對應(yīng)的“逆向 reduce函數(shù)”(以InvFunc參數(shù)傳入)谢澈;

  • countByValueAndWindow(windowLength, slideInterval, [numTasks]) 當(dāng)應(yīng)用到一個(K,V)鍵值對組成的DStream上煌贴,返回一個由(K,V)鍵值對組成的新的DStream。每個key的值都是它們在滑動窗口中出現(xiàn)的頻率锥忿。

下面重點介紹下 reduceByKeyAndWindow:

reduceByKeyAndWindow

再看一下這個圖:


1牛郑、reduceByKeyAndWindow(_ + _, Seconds(3), Seconds(2))
此方法的窗口長度為3,兩秒劃分一次敬鬓,并對 key 相同的 value 進(jìn)行相加淹朋,即 WordCount。

2钉答、對于他的重載函數(shù) reduceByKeyAndWindow(_ + _, _ - _, Seconds(3s), seconds(2))
設(shè)計理念是瑞你,當(dāng) 滑動窗口的時間Seconds(2) < Seconds(3)(窗口大小)時希痴,兩個統(tǒng)計的部分會有重復(fù),那么我們就可以不用重新獲取或者計算春感,而是通過獲取舊信息來更新新的信息砌创,這樣即節(jié)省了空間又節(jié)省了內(nèi)容,并且效率也大幅提升鲫懒。

如上圖所示嫩实,2次統(tǒng)計重復(fù)的部分為 time3 對用的時間片內(nèi)的數(shù)據(jù),這樣對于window1窥岩,和window2的計算可以如下所示
win1 = time1 + time2 + time3
win2 = time3 + time4 + time5

更新為
win1 = time1 + time2 + time3
win2 = win1+ time4 + time5 - time1 - time2

這樣就理解了吧, _ + _ 是對新產(chǎn)生的時間分片(time4, time5 內(nèi) RDD)進(jìn)行統(tǒng)計甲献,而 _ - _ 是對上一個窗口中,過時的時間分片 (time1, time2) 進(jìn)行統(tǒng)計

updateStateByKey操作

我們統(tǒng)計單詞詞頻采用的是無狀態(tài)轉(zhuǎn)換操作颂翼,也就是說晃洒,只對本批次內(nèi)的單詞進(jìn)行詞頻統(tǒng)計慨灭,不會考慮之前到達(dá)的批次的單詞,所以球及,不同批次的單詞詞頻都是獨立統(tǒng)計的氧骤。

對于有狀態(tài)轉(zhuǎn)換操作而言,本批次的詞頻統(tǒng)計吃引,會在之前批次的詞頻統(tǒng)計結(jié)果的基礎(chǔ)上進(jìn)行不斷累加筹陵,所以,最終統(tǒng)計得到的詞頻镊尺,是所有批次的單詞的總的詞頻統(tǒng)計結(jié)果朦佩。

注意 updateStateByKey 操作要求開啟 checkPoint。

//設(shè)置檢查點庐氮,檢查點具有容錯機(jī)制
sc.checkpoint("file:///usr/local/spark/mycode/streaming/stateful/")

val words = lines.flatMap(_.split(" "))
val dStream = words.map(x => (x, 1))

val results = dStream.updateStateByKey[Int]((values: Seq[Int], state: Option[Int]) => {
    val currentCount = values.sum
    val previousCount = state.getOrElse(0)
    Some(currentCount + previousCount)
})

results.print()
sc.start()
sc.awaitTermination()
  • 函數(shù)中第一個參數(shù) values:
    是當(dāng)前batch 某個key 即某個單詞的列表语稠,譬如:(x, 1) (x, 1) (x, 1) ...
    values.sum 即統(tǒng)計 當(dāng)前 batch 某個單詞 所有值的總和:(x, 3)。

  • 第二個參數(shù) state:
    某個key的歷史狀態(tài)信息旭愧,也就是某個單詞歷史批次的詞頻匯總結(jié)果颅筋,因為當(dāng)前批次(batch )可能是第一個 batch,上一batch可能沒有输枯,所以使用 getOrElse(0)议泵,沒有的話就是0。

  • 上一batch 和 當(dāng)前batch 累加
    當(dāng)新的batch到來時桃熄,繼續(xù)進(jìn)行累加先口,依次統(tǒng)計當(dāng)前所有 batch 的值。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末瞳收,一起剝皮案震驚了整個濱河市碉京,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌螟深,老刑警劉巖谐宙,帶你破解...
    沈念sama閱讀 222,729評論 6 517
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異界弧,居然都是意外死亡凡蜻,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 95,226評論 3 399
  • 文/潘曉璐 我一進(jìn)店門垢箕,熙熙樓的掌柜王于貴愁眉苦臉地迎上來划栓,“玉大人,你說我怎么就攤上這事条获≈臆瘢” “怎么了?”我有些...
    開封第一講書人閱讀 169,461評論 0 362
  • 文/不壞的土叔 我叫張陵,是天一觀的道長委煤。 經(jīng)常有香客問我堂油,道長,這世上最難降的妖魔是什么素标? 我笑而不...
    開封第一講書人閱讀 60,135評論 1 300
  • 正文 為了忘掉前任称诗,我火速辦了婚禮,結(jié)果婚禮上头遭,老公的妹妹穿的比我還像新娘寓免。我一直安慰自己,他們只是感情好计维,可當(dāng)我...
    茶點故事閱讀 69,130評論 6 398
  • 文/花漫 我一把揭開白布袜香。 她就那樣靜靜地躺著,像睡著了一般鲫惶。 火紅的嫁衣襯著肌膚如雪蜈首。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 52,736評論 1 312
  • 那天欠母,我揣著相機(jī)與錄音欢策,去河邊找鬼。 笑死赏淌,一個胖子當(dāng)著我的面吹牛踩寇,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播六水,決...
    沈念sama閱讀 41,179評論 3 422
  • 文/蒼蘭香墨 我猛地睜開眼俺孙,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了掷贾?” 一聲冷哼從身側(cè)響起睛榄,我...
    開封第一講書人閱讀 40,124評論 0 277
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎想帅,沒想到半個月后场靴,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,657評論 1 320
  • 正文 獨居荒郊野嶺守林人離奇死亡港准,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,723評論 3 342
  • 正文 我和宋清朗相戀三年旨剥,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片叉趣。...
    茶點故事閱讀 40,872評論 1 353
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖该押,靈堂內(nèi)的尸體忽然破棺而出疗杉,到底是詐尸還是另有隱情,我是刑警寧澤,帶...
    沈念sama閱讀 36,533評論 5 351
  • 正文 年R本政府宣布烟具,位于F島的核電站梢什,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏朝聋。R本人自食惡果不足惜嗡午,卻給世界環(huán)境...
    茶點故事閱讀 42,213評論 3 336
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望冀痕。 院中可真熱鬧荔睹,春花似錦、人聲如沸言蛇。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,700評論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽腊尚。三九已至吨拗,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間婿斥,已是汗流浹背劝篷。 一陣腳步聲響...
    開封第一講書人閱讀 33,819評論 1 274
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留民宿,地道東北人娇妓。 一個月前我還...
    沈念sama閱讀 49,304評論 3 379
  • 正文 我出身青樓,卻偏偏與公主長得像勘高,于是被迫代替她去往敵國和親峡蟋。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,876評論 2 361

推薦閱讀更多精彩內(nèi)容