1复局、概述
與RDD類似马篮,Spark Streaming也可以讓開發(fā)人員手動控制映之,將數(shù)據(jù)流中的數(shù)據(jù)持久化到內(nèi)存中拦焚。
對DStream調(diào)用persist()方法,就可以讓Spark Streaming自動將該數(shù)據(jù)流中的所有產(chǎn)生的RDD杠输,都持久化到內(nèi)存中耕漱。
如果要對一個DStream多次執(zhí)行操作,那么抬伺,對DStream持久化是非常有用的螟够。因為多次操作,可以共享使用內(nèi)存中的一份緩存數(shù)據(jù)。
對于基于窗口的操作妓笙,比如reduceByWindow若河、reduceByKeyAndWindow,以及基于狀態(tài)的操作寞宫,比如updateStateByKey萧福,默認(rèn)就隱式開啟了持久化機(jī)制。
即Spark Streaming默認(rèn)就會將上述操作產(chǎn)生的Dstream中的數(shù)據(jù)辈赋,緩存到內(nèi)存中鲫忍,不需要開發(fā)人員手動調(diào)用persist()方法。
對于通過網(wǎng)絡(luò)接收數(shù)據(jù)的輸入流钥屈,比如socket悟民、Kafka、Flume等篷就,默認(rèn)的持久化級別射亏,是將數(shù)據(jù)復(fù)制一份,以便于容錯竭业。
相當(dāng)于是智润,用的是類似MEMORY_ONLY_SER_2。
與RDD不同的是未辆,默認(rèn)的持久化級別窟绷,統(tǒng)一都是要序列化的。
2咐柜、Checkpoint 機(jī)制
每一個Spark Streaming應(yīng)用钾麸,正常來說,都是要7 * 24小時運轉(zhuǎn)的炕桨,這就是實時計算程序的特點饭尝。
因為要持續(xù)不斷的對數(shù)據(jù)進(jìn)行計算,所以對實時計算應(yīng)用的要求献宫,應(yīng)該是必須要能夠?qū)εc應(yīng)用程序邏輯無關(guān)的失敗钥平,進(jìn)行容錯。
如果要實現(xiàn)這個目標(biāo)姊途,Spark Streaming程序就必須將足夠的信息checkpoint到容錯的存儲系統(tǒng)上涉瘾,從而讓它能夠從失敗中進(jìn)行恢復(fù)。
有兩種數(shù)據(jù)需要被進(jìn)行checkpoint:
-
1捷兰、元數(shù)據(jù)checkpoint——將定義了流式計算邏輯的信息立叛,保存到容錯的存儲系統(tǒng)上,比如HDFS贡茅。當(dāng)運行Spark Streaming應(yīng)用程序的Driver進(jìn)程所在節(jié)點失敗時秘蛇,該信息可以用于進(jìn)行恢復(fù)其做。
元數(shù)據(jù)信息包括了:- 1.1 配置信息——創(chuàng)建Spark Streaming應(yīng)用程序的配置信息,比如SparkConf中的信息赁还。
- 1.2 DStream的操作信息——定義了Spark Stream應(yīng)用程序的計算邏輯的DStream操作信息妖泄。
- 1.3 未處理的batch信息——哪些job正在排隊,還沒處理的batch信息艘策。
-
2蹈胡、數(shù)據(jù)checkpoint——將實時計算過程中產(chǎn)生的RDD的數(shù)據(jù)保存到可靠的存儲系統(tǒng)中。
對于一些將多個batch的數(shù)據(jù)進(jìn)行聚合的朋蔫,有狀態(tài)的transformation操作罚渐,這是非常有用的。在這種transformation操作中驯妄,生成的RDD是依賴于之前的batch的RDD的荷并,這會導(dǎo)致隨著時間的推移,RDD的依賴鏈條變得越來越長富玷。
要避免由于依賴鏈條越來越長,導(dǎo)致的一起變得越來越長的失敗恢復(fù)時間既穆,有狀態(tài)的transformation操作執(zhí)行過程中間產(chǎn)生的RDD赎懦,會定期地被checkpoint到可靠的存儲系統(tǒng)上,比如HDFS幻工。從而削減RDD的依賴鏈條励两,進(jìn)而縮短失敗恢復(fù)時,RDD的恢復(fù)時間囊颅。
一句話概括当悔,元數(shù)據(jù)checkpoint主要是為了從driver失敗中進(jìn)行恢復(fù);而RDD checkpoint主要是為了踢代,使用到有狀態(tài)的transformation操作時盲憎,能夠在其生產(chǎn)出的數(shù)據(jù)丟失時,進(jìn)行快速的失敗恢復(fù)胳挎。
3饼疙、啟用Checkpoint機(jī)制
- 何時啟用Checkpoint機(jī)制
1、使用了有狀態(tài)的transformation操作——比如updateStateByKey慕爬,或者reduceByKeyAndWindow操作窑眯,被使用了,那么checkpoint目錄要求是必須提供的医窿,也就是必須開啟checkpoint機(jī)制磅甩,從而進(jìn)行周期性的RDD checkpoint。
2姥卢、要保證可以從Driver失敗中進(jìn)行恢復(fù)——元數(shù)據(jù)checkpoint需要啟用卷要,來進(jìn)行這種情況的恢復(fù)渣聚。
要注意的是,并不是說却妨,所有的Spark Streaming應(yīng)用程序饵逐,都要啟用checkpoint機(jī)制,如果即不強制要求從Driver失敗中自動進(jìn)行恢復(fù)彪标,又沒使用有狀態(tài)的transformation操作倍权,那么就不需要啟用checkpoint。事實上捞烟,這么做反而是有助于提升性能的薄声。
- 如何啟用Checkpoint機(jī)制
-
1、對于有狀態(tài)的transformation操作题画,啟用checkpoint機(jī)制默辨,定期將其生產(chǎn)的RDD數(shù)據(jù)checkpoint,是比較簡單的苍息。
可以通過配置一個容錯的缩幸、可靠的文件系統(tǒng)(比如HDFS)的目錄,來啟用checkpoint機(jī)制竞思,checkpoint數(shù)據(jù)就會寫入該目錄表谊。使用StreamingContext的checkpoint()方法即可。然后盖喷,你就可以放心使用有狀態(tài)的transformation操作了爆办。
-
2、如果為了要從Driver失敗中進(jìn)行恢復(fù)课梳,那么啟用checkpoint機(jī)制距辆,是比較復(fù)雜的,需要改寫Spark Streaming應(yīng)用程序暮刃。
當(dāng)應(yīng)用程序第一次啟動的時候跨算,需要創(chuàng)建一個新的StreamingContext,并且調(diào)用其start()方法椭懊,進(jìn)行啟動漂彤。當(dāng)Driver從失敗中恢復(fù)過來時,需要從checkpoint目錄中記錄的元數(shù)據(jù)中灾搏,恢復(fù)出來一個StreamingContext挫望。
-
//為Driver失敗的恢復(fù)機(jī)制重寫程序
JavaStreamingContextFactory contextFactory = new JavaStreamingContextFactory() {
@Override
public JavaStreamingContext create() {
JavaStreamingContext jssc = new JavaStreamingContext(...);
JavaDStream<String> lines = jssc.socketTextStream(...);
jssc.checkpoint(checkpointDirectory);
return jssc;
}
};
JavaStreamingContext context = JavaStreamingContext.getOrCreate(checkpointDirectory, contextFactory);
context.start();
context.awaitTermination();
def functionToCreateContext(): StreamingContext = {
val ssc = new StreamingContext(...)
val lines = ssc.socketTextStream(...)
ssc.checkpoint(checkpointDirectory)
ssc
}
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
context.start()
context.awaitTermination()
4、配置spark-submit提交參數(shù)
按照上述方法狂窑,進(jìn)行Spark Streaming應(yīng)用程序的重寫后媳板,當(dāng)?shù)谝淮芜\行程序時,如果發(fā)現(xiàn)checkpoint目錄不存在泉哈,那么就使用定義的函數(shù)來第一次創(chuàng)建一個StreamingContext蛉幸,并將其元數(shù)據(jù)寫入checkpoint目錄破讨;當(dāng)從Driver失敗中恢復(fù)過來時,發(fā)現(xiàn)checkpoint目錄已經(jīng)存在了奕纫,那么會使用該目錄中的元數(shù)據(jù)創(chuàng)建一個StreamingContext提陶。
但是上面的重寫應(yīng)用程序的過程,只是實現(xiàn)Driver失敗自動恢復(fù)的第一步匹层。第二步是隙笆,必須確保Driver可以在失敗時,自動被重啟升筏。
要能夠自動從Driver失敗中恢復(fù)過來撑柔,運行Spark Streaming應(yīng)用程序的集群,就必須監(jiān)控Driver運行的過程您访,并且在它失敗時將它重啟铅忿。對于Spark自身的standalone模式,需要進(jìn)行一些配置去supervise driver灵汪,在它失敗時將其重啟檀训。
首先,要在spark-submit中享言,添加--deploy-mode參數(shù)峻凫,默認(rèn)其值為client,即在提交應(yīng)用的機(jī)器上啟動Driver担锤;但是蔚晨,要能夠自動重啟Driver乍钻,就必須將其值設(shè)置為cluster肛循;此外,需要添加--supervise參數(shù)银择。
使用上述第二步驟提交應(yīng)用之后多糠,就可以讓driver在失敗時自動被重啟,并且通過checkpoint目錄的元數(shù)據(jù)恢復(fù)StreamingContext浩考。
5夹孔、補充說明
將RDD checkpoint到可靠的存儲系統(tǒng)上,會耗費很多性能析孽。當(dāng)RDD被checkpoint時搭伤,會導(dǎo)致這些batch的處理時間增加。因此袜瞬,checkpoint的間隔怜俐,需要謹(jǐn)慎的設(shè)置。對于那些間隔很多的batch邓尤,比如1秒拍鲤,如果還要執(zhí)行checkpoint操作贴谎,則會大幅度削減吞吐量。而另外一方面季稳,如果checkpoint操作執(zhí)行的不太頻繁擅这,那就會導(dǎo)致RDD的lineage變長,又會有失敗恢復(fù)時間過長的風(fēng)險景鼠。
對于那些要求checkpoint的有狀態(tài)的transformation操作仲翎,默認(rèn)的checkpoint間隔通常是batch間隔的數(shù)倍,至少是10秒莲蜘。使用DStream的checkpoint()方法谭确,可以設(shè)置這個DStream的checkpoint的間隔時長。
通常來說票渠,將checkpoint間隔設(shè)置為窗口操作的滑動間隔的5~10倍逐哈,是個不錯的選擇。