【容錯篇】Spark Streaming的還原藥水——Checkpoint

一個 Streaming Application 往往需要7*24不間斷的跑贺奠,所以需要有抵御意外的能力(比如機器或者系統(tǒng)掛掉己单,JVM crash等)赛不。為了讓這成為可能睬辐,Spark Streaming需要 checkpoint 足夠多信息至一個具有容錯設(shè)計的存儲系統(tǒng)才能讓 Application 從失敗中恢復(fù)疗垛。Spark Streaming 會 checkpoint 兩種類型的數(shù)據(jù)症汹。

  • Metadata(元數(shù)據(jù)) checkpointing - 保存定義了 Streaming 計算邏輯至類似 HDFS 的支持容錯的存儲系統(tǒng)。用來恢復(fù) driver贷腕,元數(shù)據(jù)包括:
    • 配置 - 用于創(chuàng)建該 streaming application 的所有配置
    • DStream 操作 - DStream 一些列的操作
    • 未完成的 batches - 那些提交了 job 但尚未執(zhí)行或未完成的 batches
  • Data checkpointing - 保存已生成的RDDs至可靠的存儲背镇。這在某些 stateful 轉(zhuǎn)換中是需要的,在這種轉(zhuǎn)換中泽裳,生成 RDD 需要依賴前面的 batches瞒斩,會導(dǎo)致依賴鏈隨著時間而變長。為了避免這種沒有盡頭的變長涮总,要定期將中間生成的 RDDs 保存到可靠存儲來切斷依賴鏈

總之胸囱,metadata checkpointing 主要用來恢復(fù) driver;而 RDD數(shù)據(jù)的 checkpointing 對于stateful 轉(zhuǎn)換操作是必要的瀑梗。

什么時候需要啟用 checkpoint烹笔?

什么時候該啟用 checkpoint 呢?滿足以下任一條件:

  • 使用了 stateful 轉(zhuǎn)換 - 如果 application 中使用了updateStateByKeyreduceByKeyAndWindow等 stateful 操作抛丽,必須提供 checkpoint 目錄來允許定時的 RDD checkpoint
  • 希望能從意外中恢復(fù) driver

如果 streaming app 沒有 stateful 操作谤职,也允許 driver 掛掉后再次重啟的進度丟失,就沒有啟用 checkpoint的必要了亿鲜。

如何使用 checkpoint柬帕?

啟用 checkpoint,需要設(shè)置一個支持容錯 的狡门、可靠的文件系統(tǒng)(如 HDFS陷寝、s3 等)目錄來保存 checkpoint 數(shù)據(jù)。通過調(diào)用 streamingContext.checkpoint(checkpointDirectory) 來完成其馏。另外凤跑,如果你想讓你的 application 能從 driver 失敗中恢復(fù),你的 application 要滿足:

  • 若 application 為首次重啟叛复,將創(chuàng)建一個新的 StreamContext 實例
  • 如果 application 是從失敗中重啟仔引,將會從 checkpoint 目錄導(dǎo)入 checkpoint 數(shù)據(jù)來重新創(chuàng)建 StreamingContext 實例

通過 StreamingContext.getOrCreate 可以達到目的:

// Function to create and setup a new StreamingContext
def functionToCreateContext(): StreamingContext = {
    val ssc = new StreamingContext(...)   // new context
    val lines = ssc.socketTextStream(...) // create DStreams
    ...
    ssc.checkpoint(checkpointDirectory)   // set checkpoint directory
    ssc
}

// Get StreamingContext from checkpoint data or create a new one
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)

// Do additional setup on context that needs to be done,
// irrespective of whether it is being started or restarted
context. ...

// Start the context
context.start()
context.awaitTermination()

如果 checkpointDirectory 存在扔仓,那么 context 將導(dǎo)入 checkpoint 數(shù)據(jù)。如果目錄不存在咖耘,函數(shù) functionToCreateContext 將被調(diào)用并創(chuàng)建新的 context

除調(diào)用 getOrCreate 外翘簇,還需要你的集群模式支持 driver 掛掉之后重啟之。例如儿倒,在 yarn 模式下版保,driver 是運行在 ApplicationMaster 中,若 ApplicationMaster 掛掉夫否,yarn 會自動在另一個節(jié)點上啟動一個新的 ApplicationMaster彻犁。

需要注意的是,隨著 streaming application 的持續(xù)運行凰慈,checkpoint 數(shù)據(jù)占用的存儲空間會不斷變大汞幢。因此,需要小心設(shè)置checkpoint 的時間間隔微谓。設(shè)置得越小森篷,checkpoint 次數(shù)會越多,占用空間會越大豺型;如果設(shè)置越大仲智,會導(dǎo)致恢復(fù)時丟失的數(shù)據(jù)和進度越多。一般推薦設(shè)置為 batch duration 的5~10倍触创。

導(dǎo)出 checkpoint 數(shù)據(jù)

上文提到坎藐,checkpoint 數(shù)據(jù)會定時導(dǎo)出到可靠的存儲系統(tǒng)为牍,那么

  1. 在什么時機進行 checkpoint
  2. checkpoint 的形式是怎么樣的

checkpoint 的時機

在 Spark Streaming 中哼绑,JobGenerator 用于生成每個 batch 對應(yīng)的 jobs,它有一個定時器碉咆,定時器的周期即初始化 StreamingContext 時設(shè)置的 batchDuration抖韩。這個周期一到,JobGenerator 將調(diào)用generateJobs方法來生成并提交 jobs疫铜,這之后調(diào)用 doCheckpoint 方法來進行 checkpoint茂浮。doCheckpoint 方法中,會判斷當(dāng)前時間與 streaming application start 的時間之差是否是 checkpoint duration 的倍數(shù)壳咕,只有在是的情況下才進行 checkpoint席揽。

checkpoint 的形式

最終 checkpoint 的形式是將類 Checkpoint的實例序列化后寫入外部存儲,值得一提的是谓厘,有專門的一條線程來做將序列化后的 checkpoint 寫入外部存儲幌羞。類 Checkpoint 包含以下數(shù)據(jù)

除了 Checkpoint 類,還有 CheckpointWriter 類用來導(dǎo)出 checkpoint竟稳,CheckpointReader 用來導(dǎo)入 checkpoint

Checkpoint 的局限

Spark Streaming 的 checkpoint 機制看起來很美好属桦,卻有一個硬傷熊痴。上文提到最終刷到外部存儲的是類 Checkpoint 對象序列化后的數(shù)據(jù)。那么在 Spark Streaming application 重新編譯后聂宾,再去反序列化 checkpoint 數(shù)據(jù)就會失敗果善。這個時候就必須新建 StreamingContext。

針對這種情況系谐,在我們結(jié)合 Spark Streaming + kafka 的應(yīng)用中巾陕,我們自行維護了消費的 offsets,這樣一來及時重新編譯 application蔚鸥,還是可以從需要的 offsets 來消費數(shù)據(jù)惜论,這里只是舉個例子,不詳細展開了止喷。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末馆类,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子弹谁,更是在濱河造成了極大的恐慌乾巧,老刑警劉巖,帶你破解...
    沈念sama閱讀 216,744評論 6 502
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件预愤,死亡現(xiàn)場離奇詭異沟于,居然都是意外死亡,警方通過查閱死者的電腦和手機植康,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,505評論 3 392
  • 文/潘曉璐 我一進店門旷太,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人销睁,你說我怎么就攤上這事供璧。” “怎么了冻记?”我有些...
    開封第一講書人閱讀 163,105評論 0 353
  • 文/不壞的土叔 我叫張陵睡毒,是天一觀的道長。 經(jīng)常有香客問我冗栗,道長演顾,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,242評論 1 292
  • 正文 為了忘掉前任隅居,我火速辦了婚禮钠至,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘胎源。我一直安慰自己棉钧,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 67,269評論 6 389
  • 文/花漫 我一把揭開白布乒融。 她就那樣靜靜地躺著掰盘,像睡著了一般摄悯。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上愧捕,一...
    開封第一講書人閱讀 51,215評論 1 299
  • 那天奢驯,我揣著相機與錄音,去河邊找鬼次绘。 笑死瘪阁,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的邮偎。 我是一名探鬼主播管跺,決...
    沈念sama閱讀 40,096評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼禾进!你這毒婦竟也來了豁跑?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 38,939評論 0 274
  • 序言:老撾萬榮一對情侶失蹤泻云,失蹤者是張志新(化名)和其女友劉穎艇拍,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體宠纯,經(jīng)...
    沈念sama閱讀 45,354評論 1 311
  • 正文 獨居荒郊野嶺守林人離奇死亡卸夕,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,573評論 2 333
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了婆瓜。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片快集。...
    茶點故事閱讀 39,745評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖廉白,靈堂內(nèi)的尸體忽然破棺而出个初,到底是詐尸還是另有隱情,我是刑警寧澤蒙秒,帶...
    沈念sama閱讀 35,448評論 5 344
  • 正文 年R本政府宣布勃黍,位于F島的核電站宵统,受9級特大地震影響晕讲,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜马澈,卻給世界環(huán)境...
    茶點故事閱讀 41,048評論 3 327
  • 文/蒙蒙 一瓢省、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧痊班,春花似錦勤婚、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,683評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽缨称。三九已至,卻和暖如春祝迂,著一層夾襖步出監(jiān)牢的瞬間睦尽,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,838評論 1 269
  • 我被黑心中介騙來泰國打工型雳, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留当凡,地道東北人。 一個月前我還...
    沈念sama閱讀 47,776評論 2 369
  • 正文 我出身青樓纠俭,卻偏偏與公主長得像沿量,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子冤荆,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,652評論 2 354

推薦閱讀更多精彩內(nèi)容