- Spark Streaming Job架構(gòu)與運(yùn)行機(jī)制
- Spark Streaming 容錯(cuò)架構(gòu)與運(yùn)行機(jī)制
Spark Streaming是一個(gè)流處理架構(gòu)儿捧,隨著時(shí)間的推移,根據(jù)時(shí)間分片不斷的產(chǎn)生Job戳鹅,一直不停的運(yùn)行。從Job的產(chǎn)生上看昏兆,根據(jù)沒有流處理枫虏,只是看起來像流一樣而已。
下面以一個(gè)例子說明爬虱,代碼如下
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* 使用Scala開發(fā)集群運(yùn)行的Spark 在線黑名單過濾程序
* 背景描述:在廣告點(diǎn)擊計(jì)費(fèi)系統(tǒng)中隶债,我們?cè)诰€過濾掉黑名單的點(diǎn)擊,進(jìn)而保護(hù)廣告商的利益跑筝,只進(jìn)行有效的廣告點(diǎn)擊計(jì)費(fèi)
* 或者在防刷評(píng)分(或者流量)系統(tǒng)死讹,過濾掉無效的投票或者評(píng)分或者流量;
* 實(shí)現(xiàn)技術(shù):使用transform Api直接基于RDD編程曲梗,進(jìn)行join操作
*/
object OnlineForeachRDD2DB {
def main(args: Array[String]){
/**
* 創(chuàng)建Spark的配置對(duì)象SparkConf赞警,設(shè)置Spark程序的運(yùn)行時(shí)的配置信息,
* 例如說通過setMaster來設(shè)置程序要鏈接的Spark集群的Master的URL,如果設(shè)置
* 為local虏两,則代表Spark程序在本地運(yùn)行愧旦,特別適合于機(jī)器配置條件非常差(例如
* 只有1G的內(nèi)存)的初學(xué)者 *
*/
val conf = new SparkConf() //創(chuàng)建SparkConf對(duì)象
conf.setAppName("OnlineForeachRDD") //設(shè)置應(yīng)用程序的名稱,在程序運(yùn)行的監(jiān)控界面可以看到名稱
//conf.setMaster("spark://Master:7077") //此時(shí)碘举,程序在Spark集群
conf.setMaster("local[6]")
//設(shè)置batchDuration時(shí)間間隔來控制Job生成的頻率并且創(chuàng)建Spark Streaming執(zhí)行的入口
val ssc = new StreamingContext(conf, Seconds(5))
val lines = ssc.socketTextStream("Master", 9999)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords => {
val connection = ConnectionPool.getConnection()
partitionOfRecords.foreach(record => {
val sql = "insert into streaming_itemcount(item,count) values('" + record._1 + "'," + record._2 + ")"
val stmt = connection.createStatement();
stmt.executeUpdate(sql);
})
ConnectionPool.returnConnection(connection) // return to the pool for future reuse
}
}
}
ssc.start()
ssc.awaitTermination()
}
}
-
通過案例代碼透視Job的執(zhí)行過程忘瓦,解析Spark Streaming運(yùn)行機(jī)制搁廓,代碼運(yùn)行剖析如下:
首先通過StreamingContext調(diào)用start方法引颈,其內(nèi)部再啟動(dòng)JobScheduler的Start方法耕皮,進(jìn)行消息循環(huán)
在JobScheduler的start內(nèi)部會(huì)構(gòu)造JobGenerator和ReceiverTacker
-
然后調(diào)用JobGenerator和ReceiverTacker的start方法執(zhí)行以下操作
- JobGenerator啟動(dòng)后會(huì)不斷的根據(jù)batchDuration生成一個(gè)個(gè)的Job
- ReceiverTracker啟動(dòng)后首先在Spark Cluster中啟動(dòng)Receiver(其實(shí)是在Executor中先啟動(dòng)ReceiverSupervisor)
在Receiver收到數(shù)據(jù)后會(huì)通過ReceiverSupervisor存儲(chǔ)到Executor
同時(shí)把數(shù)據(jù)的Metadata信息發(fā)送給Driver中的ReceiverTracker,在ReceiverTracker內(nèi)部會(huì)通過ReceivedBlockTracker來管理接受到的元數(shù)據(jù)信息
每個(gè)BatchInterval會(huì)產(chǎn)生一個(gè)具體的Job蝙场,其實(shí)這里的Job不是Spark Core中所指的Job凌停,它只是基于DStream Graph而生成的RDD的DAG而已
要想運(yùn)行Job需要提交給JobScheduler,在JobScheduler中通過線程池的方式找到一個(gè)單獨(dú)的線程來提交Job到集群運(yùn)行售滤,在線程中基于RDD的Action觸發(fā)作業(yè)的運(yùn)行
由于流處理過程中作業(yè)不斷生成罚拟,為了提升效率,可以使用線程池完箩。同時(shí)有可能設(shè)置了Job的FAIR公平調(diào)度的方式赐俗,也需要多線程的支持
-
從容錯(cuò)架構(gòu)的角度透視Spark Streaming 運(yùn)行機(jī)制
Spark Streaming是基于DStream的容錯(cuò)機(jī)制,DStream是隨著時(shí)間流逝不斷的產(chǎn)生RDD弊知,也就是說DStream是在固定的時(shí)間上操作RDD阻逮,容錯(cuò)會(huì)劃分到每一次所形成的RDD。Spark Streaming的容錯(cuò)包括 Executor與Driver兩方面的容錯(cuò)機(jī)制-
Executor 容錯(cuò):
- 數(shù)據(jù)接收:分布式方式秩彤、wal方式叔扼,先寫日志再保存數(shù)據(jù)到Executor
- 任務(wù)執(zhí)行安全性 Job基于RDD容錯(cuò)
-
Driver容錯(cuò) : checkpoint
基于RDD的特性,它的容錯(cuò)機(jī)制主要就是兩種:- 基于checkpoint漫雷,在stage之間是寬依賴瓜富,產(chǎn)生了shuffle操作,lineage鏈條過于復(fù)雜和冗長(zhǎng)降盹,這時(shí)候就需要做checkpoint与柑。
- 基于lineage(血統(tǒng))的容錯(cuò):
一般而言,spark選擇血統(tǒng)容錯(cuò)澎现,因?yàn)閷?duì)于大規(guī)模的數(shù)據(jù)集仅胞,做檢查點(diǎn)的成本很高〗1瑁考慮到RDD的依賴關(guān)系干旧,每個(gè)stage內(nèi)部都是窄依賴,此時(shí)一般基于lineage容錯(cuò)妹蔽,方便高效椎眯。
-
總結(jié): stage內(nèi)部做lineage,stage之間做checkpoint胳岂。