流式計(jì)算系統(tǒng)的特點(diǎn)
低延遲
容錯性
數(shù)據(jù)不丟失
數(shù)據(jù)的送達(dá)
計(jì)算狀態(tài)的持久化
計(jì)算遷移
故障恢復(fù)擴(kuò)展能力
應(yīng)用的邏輯表達(dá)能力
流式計(jì)算任務(wù)都會部署成由多個計(jì)算節(jié)點(diǎn)和流經(jīng)這些節(jié)點(diǎn)的數(shù)據(jù)流構(gòu)成的有向無環(huán)圖(DAG)
系統(tǒng)架構(gòu)
主從
P2P
DAG拓?fù)浣Y(jié)構(gòu)
構(gòu)成DAG拓?fù)浣Y(jié)構(gòu)的元素是: 計(jì)算節(jié)點(diǎn)和流經(jīng)各個計(jì)算節(jié)點(diǎn)的實(shí)時數(shù)據(jù)流.
DAG結(jié)構(gòu)最常見的拓?fù)浣Y(jié)構(gòu)是: 流水線(最常見的方式), 亂序分組, 定向分組, 廣播模式.
送達(dá)保證
至少送達(dá)一次
至多送達(dá)一次
恰好送達(dá)一次
系統(tǒng)容錯
- 備用服務(wù)器, 熱備份, 檢查點(diǎn)機(jī)制.
Spark Streaming
Spark Streaming嚴(yán)格意義上說是一個micro-batch處理框架, 它把連續(xù)的數(shù)據(jù)流按照一定的時間間隔, 分成一系列連續(xù)的批數(shù)據(jù), 組成DStream. 依托于Spark的處理核心, Spark Stream只需把DStream轉(zhuǎn)化成RDD, 并把生成的RDD交給Spark核心處理就行了. 所以, spark-streaming類庫要做的事情就是: 1. 對接外部數(shù)據(jù)流服務(wù)器; 2. 將外部流轉(zhuǎn)化為DStream; 3. 定期向Spark-Core提交任務(wù).
所以, 依托于Spark Core的強(qiáng)大功能, Spark Streaming要做的事情就變得非常的簡單. 第一, 它不負(fù)責(zé)具體的計(jì)算; 第二, 數(shù)據(jù)存儲也是借助Spark Core的BlockManager. 它要做的事情接受數(shù)據(jù), 還有維護(hù)這幾個定時任務(wù).
幾個核心類及其其功能:
ReceiverInputDStream
這個類的最終要的作用是: 橋接外部數(shù)據(jù)源以及內(nèi)部DStream. 這是一個抽象類, 繼承的子類需要定制不同的Receiver. 比如SocketInputDStream的Receiver
是new SocketReceiver(host, port, bytesToObjects, storageLevel)
;KafkaInputDStream
的Receiver是KafkaReceiver
或者ReliableKafkaReceiver
.Receiver
這個類是負(fù)責(zé)接收外部數(shù)據(jù)的, 這也是個抽象類, 自定義Receiver要自己實(shí)現(xiàn)onStart()
,onStop()
,restart()
函數(shù). 我們可以看一下SocketReceiver的實(shí)現(xiàn), 非常簡單.
Receiver的另一個任務(wù)是數(shù)據(jù)存儲, 它內(nèi)部定義了store方法, store方法要ReceiverSupervisor的具體存儲邏輯. ReceiverSupervisor是Receiver的看護(hù)者, Receiver負(fù)責(zé)數(shù)據(jù)的接受邏輯, 它則負(fù)責(zé)數(shù)據(jù)的存儲邏輯. 它內(nèi)部有兩個定時任務(wù): blockIntervalTimer 和 blockPushingThread. ReceiverSupervisor接收到數(shù)據(jù)后, 先對數(shù)據(jù)作內(nèi)存緩存(currentBuffer), blockIntervalTimer定期的將內(nèi)存緩存currentBuffer中的數(shù)據(jù)封裝成Block, 然后blockPushingThread持續(xù)的把剛剛構(gòu)建的Blocks推送給BlockManager.同時還要把剛存儲的BlockInfo推送給ReceiverTracker.
ReceiverTracker接到AddBlock消息之后把BlockInfo交給ReceivedBlockTracker, ReceivedBlockTracker記錄這所有的接收到的Block. ReceivedBlockTracker在記錄之前可以先寫操作日志, 如果宕機(jī)可以從操作日志恢復(fù). ReceivedBlockTracker記錄的這些Block信息用于以后生成Job, 交給Spark-Core處理.
- JobGenerator
JobGenerator內(nèi)部啟動了一個定時任務(wù), 周期性的生成Job和執(zhí)行checkpoint. 生成Job之前先把從上次開始接收到的所有的Block取出來, 構(gòu)建一個AllocatedBlocks, 存到內(nèi)存中. 這一步是準(zhǔn)備數(shù)據(jù). 生成Job的邏輯在DStream的generateJob和compute函數(shù)中. compute函數(shù)最終會生成一個BlockRDD, 數(shù)據(jù)就是剛才我們Allocate的,receiverTracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty)
. generateJob函數(shù)在拿到compute生成的RDD之后把它提交個SparkContext,context.sparkContext.runJob(rdd, emptyFunc)
.
至此, Spark Streaming的大概是如何工作的解釋完了.