Spark Streaming(六):緩存與持久化

1复局、概述

與RDD類似马篮,Spark Streaming也可以讓開發(fā)人員手動控制映之,將數(shù)據(jù)流中的數(shù)據(jù)持久化到內(nèi)存中拦焚。
對DStream調(diào)用persist()方法,就可以讓Spark Streaming自動將該數(shù)據(jù)流中的所有產(chǎn)生的RDD杠输,都持久化到內(nèi)存中耕漱。
如果要對一個DStream多次執(zhí)行操作,那么抬伺,對DStream持久化是非常有用的螟够。因為多次操作,可以共享使用內(nèi)存中的一份緩存數(shù)據(jù)。

對于基于窗口的操作妓笙,比如reduceByWindow若河、reduceByKeyAndWindow,以及基于狀態(tài)的操作寞宫,比如updateStateByKey萧福,默認(rèn)就隱式開啟了持久化機(jī)制。
即Spark Streaming默認(rèn)就會將上述操作產(chǎn)生的Dstream中的數(shù)據(jù)辈赋,緩存到內(nèi)存中鲫忍,不需要開發(fā)人員手動調(diào)用persist()方法。

對于通過網(wǎng)絡(luò)接收數(shù)據(jù)的輸入流钥屈,比如socket悟民、Kafka、Flume等篷就,默認(rèn)的持久化級別射亏,是將數(shù)據(jù)復(fù)制一份,以便于容錯竭业。
相當(dāng)于是智润,用的是類似MEMORY_ONLY_SER_2。

與RDD不同的是未辆,默認(rèn)的持久化級別窟绷,統(tǒng)一都是要序列化的。

2咐柜、Checkpoint 機(jī)制

每一個Spark Streaming應(yīng)用钾麸,正常來說,都是要7 * 24小時運轉(zhuǎn)的炕桨,這就是實時計算程序的特點饭尝。
因為要持續(xù)不斷的對數(shù)據(jù)進(jìn)行計算,所以對實時計算應(yīng)用的要求献宫,應(yīng)該是必須要能夠?qū)εc應(yīng)用程序邏輯無關(guān)的失敗钥平,進(jìn)行容錯。

如果要實現(xiàn)這個目標(biāo)姊途,Spark Streaming程序就必須將足夠的信息checkpoint到容錯的存儲系統(tǒng)上涉瘾,從而讓它能夠從失敗中進(jìn)行恢復(fù)。
有兩種數(shù)據(jù)需要被進(jìn)行checkpoint:

  • 1捷兰、元數(shù)據(jù)checkpoint——將定義了流式計算邏輯的信息立叛,保存到容錯的存儲系統(tǒng)上,比如HDFS贡茅。當(dāng)運行Spark Streaming應(yīng)用程序的Driver進(jìn)程所在節(jié)點失敗時秘蛇,該信息可以用于進(jìn)行恢復(fù)其做。
    元數(shù)據(jù)信息包括了:

    • 1.1 配置信息——創(chuàng)建Spark Streaming應(yīng)用程序的配置信息,比如SparkConf中的信息赁还。
    • 1.2 DStream的操作信息——定義了Spark Stream應(yīng)用程序的計算邏輯的DStream操作信息妖泄。
    • 1.3 未處理的batch信息——哪些job正在排隊,還沒處理的batch信息艘策。
  • 2蹈胡、數(shù)據(jù)checkpoint——將實時計算過程中產(chǎn)生的RDD的數(shù)據(jù)保存到可靠的存儲系統(tǒng)中。

    對于一些將多個batch的數(shù)據(jù)進(jìn)行聚合的朋蔫,有狀態(tài)的transformation操作罚渐,這是非常有用的。在這種transformation操作中驯妄,生成的RDD是依賴于之前的batch的RDD的荷并,這會導(dǎo)致隨著時間的推移,RDD的依賴鏈條變得越來越長富玷。

    要避免由于依賴鏈條越來越長,導(dǎo)致的一起變得越來越長的失敗恢復(fù)時間既穆,有狀態(tài)的transformation操作執(zhí)行過程中間產(chǎn)生的RDD赎懦,會定期地被checkpoint到可靠的存儲系統(tǒng)上,比如HDFS幻工。從而削減RDD的依賴鏈條励两,進(jìn)而縮短失敗恢復(fù)時,RDD的恢復(fù)時間囊颅。

一句話概括当悔,元數(shù)據(jù)checkpoint主要是為了從driver失敗中進(jìn)行恢復(fù);而RDD checkpoint主要是為了踢代,使用到有狀態(tài)的transformation操作時盲憎,能夠在其生產(chǎn)出的數(shù)據(jù)丟失時,進(jìn)行快速的失敗恢復(fù)胳挎。

3饼疙、啟用Checkpoint機(jī)制

  • 何時啟用Checkpoint機(jī)制
    • 1、使用了有狀態(tài)的transformation操作——比如updateStateByKey慕爬,或者reduceByKeyAndWindow操作窑眯,被使用了,那么checkpoint目錄要求是必須提供的医窿,也就是必須開啟checkpoint機(jī)制磅甩,從而進(jìn)行周期性的RDD checkpoint。

    • 2姥卢、要保證可以從Driver失敗中進(jìn)行恢復(fù)——元數(shù)據(jù)checkpoint需要啟用卷要,來進(jìn)行這種情況的恢復(fù)渣聚。

要注意的是,并不是說却妨,所有的Spark Streaming應(yīng)用程序饵逐,都要啟用checkpoint機(jī)制,如果即不強制要求從Driver失敗中自動進(jìn)行恢復(fù)彪标,又沒使用有狀態(tài)的transformation操作倍权,那么就不需要啟用checkpoint。事實上捞烟,這么做反而是有助于提升性能的薄声。

  • 如何啟用Checkpoint機(jī)制
    • 1、對于有狀態(tài)的transformation操作题画,啟用checkpoint機(jī)制默辨,定期將其生產(chǎn)的RDD數(shù)據(jù)checkpoint,是比較簡單的苍息。

      可以通過配置一個容錯的缩幸、可靠的文件系統(tǒng)(比如HDFS)的目錄,來啟用checkpoint機(jī)制竞思,checkpoint數(shù)據(jù)就會寫入該目錄表谊。使用StreamingContext的checkpoint()方法即可。然后盖喷,你就可以放心使用有狀態(tài)的transformation操作了爆办。

    • 2、如果為了要從Driver失敗中進(jìn)行恢復(fù)课梳,那么啟用checkpoint機(jī)制距辆,是比較復(fù)雜的,需要改寫Spark Streaming應(yīng)用程序暮刃。

      當(dāng)應(yīng)用程序第一次啟動的時候跨算,需要創(chuàng)建一個新的StreamingContext,并且調(diào)用其start()方法椭懊,進(jìn)行啟動漂彤。當(dāng)Driver從失敗中恢復(fù)過來時,需要從checkpoint目錄中記錄的元數(shù)據(jù)中灾搏,恢復(fù)出來一個StreamingContext挫望。

    //為Driver失敗的恢復(fù)機(jī)制重寫程序
JavaStreamingContextFactory contextFactory = new JavaStreamingContextFactory() {
  @Override 
  public JavaStreamingContext create() {
    JavaStreamingContext jssc = new JavaStreamingContext(...);  
    JavaDStream<String> lines = jssc.socketTextStream(...);     
    jssc.checkpoint(checkpointDirectory);                       
    return jssc;
  }
};

JavaStreamingContext context = JavaStreamingContext.getOrCreate(checkpointDirectory, contextFactory);
context.start();
context.awaitTermination();
def functionToCreateContext(): StreamingContext = {
    val ssc = new StreamingContext(...)  
    val lines = ssc.socketTextStream(...) 
    ssc.checkpoint(checkpointDirectory)   
    ssc
}

val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
context.start()
context.awaitTermination()

4、配置spark-submit提交參數(shù)

按照上述方法狂窑,進(jìn)行Spark Streaming應(yīng)用程序的重寫后媳板,當(dāng)?shù)谝淮芜\行程序時,如果發(fā)現(xiàn)checkpoint目錄不存在泉哈,那么就使用定義的函數(shù)來第一次創(chuàng)建一個StreamingContext蛉幸,并將其元數(shù)據(jù)寫入checkpoint目錄破讨;當(dāng)從Driver失敗中恢復(fù)過來時,發(fā)現(xiàn)checkpoint目錄已經(jīng)存在了奕纫,那么會使用該目錄中的元數(shù)據(jù)創(chuàng)建一個StreamingContext提陶。

但是上面的重寫應(yīng)用程序的過程,只是實現(xiàn)Driver失敗自動恢復(fù)的第一步匹层。第二步是隙笆,必須確保Driver可以在失敗時,自動被重啟升筏。

要能夠自動從Driver失敗中恢復(fù)過來撑柔,運行Spark Streaming應(yīng)用程序的集群,就必須監(jiān)控Driver運行的過程您访,并且在它失敗時將它重啟铅忿。對于Spark自身的standalone模式,需要進(jìn)行一些配置去supervise driver灵汪,在它失敗時將其重啟檀训。

首先,要在spark-submit中享言,添加--deploy-mode參數(shù)峻凫,默認(rèn)其值為client,即在提交應(yīng)用的機(jī)器上啟動Driver担锤;但是蔚晨,要能夠自動重啟Driver乍钻,就必須將其值設(shè)置為cluster肛循;此外,需要添加--supervise參數(shù)银择。

使用上述第二步驟提交應(yīng)用之后多糠,就可以讓driver在失敗時自動被重啟,并且通過checkpoint目錄的元數(shù)據(jù)恢復(fù)StreamingContext浩考。

5夹孔、補充說明

將RDD checkpoint到可靠的存儲系統(tǒng)上,會耗費很多性能析孽。當(dāng)RDD被checkpoint時搭伤,會導(dǎo)致這些batch的處理時間增加。因此袜瞬,checkpoint的間隔怜俐,需要謹(jǐn)慎的設(shè)置。對于那些間隔很多的batch邓尤,比如1秒拍鲤,如果還要執(zhí)行checkpoint操作贴谎,則會大幅度削減吞吐量。而另外一方面季稳,如果checkpoint操作執(zhí)行的不太頻繁擅这,那就會導(dǎo)致RDD的lineage變長,又會有失敗恢復(fù)時間過長的風(fēng)險景鼠。

對于那些要求checkpoint的有狀態(tài)的transformation操作仲翎,默認(rèn)的checkpoint間隔通常是batch間隔的數(shù)倍,至少是10秒莲蜘。使用DStream的checkpoint()方法谭确,可以設(shè)置這個DStream的checkpoint的間隔時長。
通常來說票渠,將checkpoint間隔設(shè)置為窗口操作的滑動間隔的5~10倍逐哈,是個不錯的選擇。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末问顷,一起剝皮案震驚了整個濱河市昂秃,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌杜窄,老刑警劉巖肠骆,帶你破解...
    沈念sama閱讀 219,110評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異塞耕,居然都是意外死亡蚀腿,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,443評論 3 395
  • 文/潘曉璐 我一進(jìn)店門扫外,熙熙樓的掌柜王于貴愁眉苦臉地迎上來莉钙,“玉大人,你說我怎么就攤上這事筛谚〈庞瘢” “怎么了?”我有些...
    開封第一講書人閱讀 165,474評論 0 356
  • 文/不壞的土叔 我叫張陵驾讲,是天一觀的道長蚊伞。 經(jīng)常有香客問我,道長吮铭,這世上最難降的妖魔是什么时迫? 我笑而不...
    開封第一講書人閱讀 58,881評論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮谓晌,結(jié)果婚禮上掠拳,老公的妹妹穿的比我還像新娘。我一直安慰自己扎谎,他們只是感情好碳想,可當(dāng)我...
    茶點故事閱讀 67,902評論 6 392
  • 文/花漫 我一把揭開白布烧董。 她就那樣靜靜地躺著,像睡著了一般胧奔。 火紅的嫁衣襯著肌膚如雪逊移。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,698評論 1 305
  • 那天龙填,我揣著相機(jī)與錄音胳泉,去河邊找鬼。 笑死岩遗,一個胖子當(dāng)著我的面吹牛扇商,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播宿礁,決...
    沈念sama閱讀 40,418評論 3 419
  • 文/蒼蘭香墨 我猛地睜開眼案铺,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了梆靖?” 一聲冷哼從身側(cè)響起控汉,我...
    開封第一講書人閱讀 39,332評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎返吻,沒想到半個月后姑子,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,796評論 1 316
  • 正文 獨居荒郊野嶺守林人離奇死亡测僵,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,968評論 3 337
  • 正文 我和宋清朗相戀三年街佑,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片捍靠。...
    茶點故事閱讀 40,110評論 1 351
  • 序言:一個原本活蹦亂跳的男人離奇死亡沐旨,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出剂公,到底是詐尸還是另有隱情希俩,我是刑警寧澤吊宋,帶...
    沈念sama閱讀 35,792評論 5 346
  • 正文 年R本政府宣布纲辽,位于F島的核電站,受9級特大地震影響璃搜,放射性物質(zhì)發(fā)生泄漏拖吼。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,455評論 3 331
  • 文/蒙蒙 一这吻、第九天 我趴在偏房一處隱蔽的房頂上張望吊档。 院中可真熱鬧,春花似錦唾糯、人聲如沸怠硼。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,003評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽香璃。三九已至这难,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間葡秒,已是汗流浹背姻乓。 一陣腳步聲響...
    開封第一講書人閱讀 33,130評論 1 272
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留眯牧,地道東北人蹋岩。 一個月前我還...
    沈念sama閱讀 48,348評論 3 373
  • 正文 我出身青樓,卻偏偏與公主長得像学少,于是被迫代替她去往敵國和親剪个。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,047評論 2 355

推薦閱讀更多精彩內(nèi)容