Spark Streaming
Spark Streaming 執(zhí)行過程
Spark 內(nèi)部實(shí)現(xiàn)遵循以下步驟:
- 主要部件的初始化過程
- 網(wǎng)絡(luò)側(cè)接收到的數(shù)據(jù)如何存儲(chǔ)到內(nèi)存
- 如何根據(jù)存儲(chǔ)下來的數(shù)據(jù)生成相應(yīng)的spark job
下面我們以wordcount為例說明孵班。
1. streamingContext初始化過程
val ssc= new StreamingContext("local[2]","Networkwordcount",Seconds(1))
Streamingcontext的參數(shù):
- sparkcontext洞慎,任務(wù)最重通過sparkcontext借口提交到spark cluster運(yùn)行
- checkpoint 檢查點(diǎn)
- duration 根據(jù)多久時(shí)長常見一個(gè)batch
利用初始化的ssc生成dstream症脂。
val lines=ssc.sockettextStream("localhost",9999)
sockettextstream返回值是dstream
socketinputdstream的實(shí)現(xiàn)中伞租,最主要就是定義getreceiver函數(shù)氯庆,在getreceiver函數(shù)中制作了一件事情命咐,即產(chǎn)生一個(gè)新的socketreceiver景殷。
創(chuàng)建完socketreceiver之后隙赁,接下來的工作就是對dstream進(jìn)行一系列的操作轉(zhuǎn)換仇祭。對Streaming的實(shí)際應(yīng)用開發(fā)也集中在這樣的一個(gè)階段披蕉。
val words=lines.flatmap(_.split(" "))
import org.apache.spark.streaming.streamingContext._
val pairs=words.map(word=>(word,1))
val wordcounts=pairs.reduceByKey(_+_)
wordcounts.print()
在上述的轉(zhuǎn)換過程中,print屬于輸出操作乌奇。
共有如下輸出操作
- foreachrdd
- saveasobjectfiles
- saveastextfiles
- saveashadoopfiles
上述設(shè)計(jì)的輸出操作其實(shí)最后都會(huì)調(diào)用到foreachdstaream没讲,foreachdstream不同于dstream的地方在于沖在了generatejob方法。
最后就是提交礁苗。
ssc.start()
ssc.awaitTermination()
2. 數(shù)據(jù)接收
ssc.start觸及的運(yùn)行邏輯爬凑。調(diào)用jobscheduler.start,由job scheduler一次啟動(dòng)一下三大功能模塊试伙。
- 監(jiān)控
- 數(shù)據(jù)接收
- 定期生成spark job的jobgenerator
3. 數(shù)據(jù)處理
如何將輸出和輸入綁定一起嘁信,依賴于dstreamgraph,dstreamgraph記錄輸入的stream和輸出的stream疏叨。
窗口操作
滑動(dòng)窗口:
在任何基于窗口的操作都需要制定兩個(gè)參數(shù)潘靖,一個(gè)是窗口總的長度,另一個(gè)是滑動(dòng)窗口的間隔蚤蔓。需要注意的是這兩個(gè)參數(shù)的值必須是批量處理時(shí)間間隔的倍數(shù)卦溢。
比如想知道過去30s某個(gè)單詞出現(xiàn)的次數(shù),每10s更新一次結(jié)果,可以使用如下代碼:
val windowedwordcounts=pairs.reduceBykeyandWindow(a:int,b:int)=>(a+b),seconds(30),seconds(10))