? ? 通過案例對spark streaming透徹理解三板斧之三:解密Spark Streaming運行機(jī)制和框架
? ? 首先我們運行以下的程序浑槽,然后通過這個程序的運行過程進(jìn)一步加深理解Spark?Streaming流處理的Job的執(zhí)行的過程占调,代碼如下:
object?OnlineForeachRDD2DB?{
def?main(args:?Array[String]){
/*
*?第1步:創(chuàng)建Spark的配置對象SparkConf,設(shè)置Spark程序的運行時的配置信息啥纸,
*?例如說通過setMaster來設(shè)置程序要鏈接的Spark集群的Master的URL,如果設(shè)置
*?為local,則代表Spark程序在本地運行婴氮,特別適合于機(jī)器配置條件非常差(例如
*?只有1G的內(nèi)存)的初學(xué)者???????*
*/
val?conf?=?new?SparkConf()?//創(chuàng)建SparkConf對象
conf.setAppName("OnlineForeachRDD")?//設(shè)置應(yīng)用程序的名稱斯棒,在程序運行的監(jiān)控界面可以看到名稱
conf.setMaster("spark://Master:7077")?//此時盾致,程序在Spark集群
conf.setMaster("local[6]")
//設(shè)置batchDuration時間間隔來控制Job生成的頻率并且創(chuàng)建Spark?Streaming執(zhí)行的入口
val?ssc?=?new?StreamingContext(conf,?Seconds(5))
val?lines?=?ssc.socketTextStream("Master",?9999)
val?words?=?lines.flatMap(_.split("?"))
val?wordCounts?=?words.map(x?=>?(x,?1)).reduceByKey(_?+?_)
wordCounts.foreachRDD?{?rdd?=>
rdd.foreachPartition?{?partitionOfRecords?=>?{
//?ConnectionPool?is?a?static,?lazily?initialized?pool?of?connections
val?connection?=?ConnectionPool.getConnection()
partitionOfRecords.foreach(record?=>?{
val?sql?=?"insert?into?streaming_itemcount(item,count)?values('"?+?record._1?+?"',"?+?record._2?+?")"
val?stmt?=?connection.createStatement();
stmt.executeUpdate(sql);
})
ConnectionPool.returnConnection(connection)??//?return?to?the?pool?for?future?reuse
}
}
}
/**
* ?在StreamingContext調(diào)用start方法的內(nèi)部其實是會啟動JobScheduler的Start方法,進(jìn)行消息循環(huán)荣暮,
*在JobScheduler的start內(nèi)部會構(gòu)造JobGenerator和ReceiverTacker庭惜,并且調(diào)用JobGenerator和
*ReceiverTacker的start方法:
* ?1,JobGenerator啟動后會不斷的根據(jù)batchDuration生成一個個的Job
* ?2穗酥,ReceiverTracker啟動后首先在Spark?Cluster中啟動Receiver(其實是在Executor中先啟動
*ReceiverSupervisor)护赊,在Receiver收到數(shù)據(jù)后會通過ReceiverSupervisor存儲到Executor并且把
*數(shù)據(jù)的Metadata信息發(fā)送給Driver中的ReceiverTracker,在ReceiverTracker內(nèi)部會通過
*ReceivedBlockTracker來管理接受到的元數(shù)據(jù)信息每個BatchInterval會產(chǎn)生一個具體的Job砾跃,
*其實這里的Job不是Spark?Core中所指的Job骏啰,它只是基于DStreamGraph而生成的RDD的DAG
*而已,從Java角度講蜓席,相當(dāng)于Runnable接口實例器一,此時要想運行Job需要提交給JobScheduler,
*在JobScheduler中通過線程池的方式找到一個單獨的線程來提交Job到集群運行(其實是在線程中
*基于RDD的Action觸發(fā)真正的作業(yè)的運行)厨内,
* ?為什么使用線程池呢祈秕?
* ?1,作業(yè)不斷生成雏胃,所以為了提升效率请毛,我們需要線程池;這和在Executor中通過線程池執(zhí)行Task
*有異曲同工之妙瞭亮;
* ?2方仿,有可能設(shè)置了Job的FAIR公平調(diào)度的方式,這個時候也需要多線程的支持统翩;
*/
ssc.start()
ssc.awaitTermination()
}
}
Spark Streaming容錯機(jī)制
? (1)driver級別容錯
(2)Executor級別容錯
? ? ? ? 1.接收數(shù)據(jù)的安全性
? ? ? ? 2.執(zhí)行的安全性
備注:
資料來源于:DT_大數(shù)據(jù)夢工廠(Spark發(fā)行版本定制)
更多私密內(nèi)容仙蚜,請關(guān)注微信公眾號:DT_Spark
如果您對大數(shù)據(jù)Spark感興趣,可以免費聽由王家林老師每天晚上20:00開設(shè)的Spark永久免費公開課厂汗,地址YY房間號:68917580