Spark 1.3及其前的版本
你的一個(gè) spark streaming application 已經(jīng)好好運(yùn)行了一段時(shí)間了,這個(gè)時(shí)候你因?yàn)槟撤N原因要停止它。你應(yīng)該怎么做冷溃?直接暴力 kill 該 application 嗎?這可能會(huì)導(dǎo)致數(shù)據(jù)丟失,因?yàn)?receivers 可能已經(jīng)接受到了數(shù)據(jù),但該數(shù)據(jù)還未被處理琳钉,當(dāng)你強(qiáng)行停止該 application,driver 就沒辦法處理這些本該處理的數(shù)據(jù)蛛倦。
所以歌懒,我們應(yīng)該使用一種避免數(shù)據(jù)丟失的方式,官方建議調(diào)用 StreamingContext#stop(stopSparkContext: Boolean, stopGracefully: Boolean)
溯壶,將 stopGracefully 設(shè)置為 true及皂,這樣可以保證在 driver 結(jié)束前處理完所有已經(jīng)接受的數(shù)據(jù)。
一個(gè) streaming application 往往是長(zhǎng)時(shí)間運(yùn)行的且改,所以存在兩個(gè)問題:
- 應(yīng)該在什么時(shí)候去調(diào)用
StreamingContext#stop
- 當(dāng) streaming application 已經(jīng)在運(yùn)行了該怎么去調(diào)用
StreamingContext#stop
how
通過 Runtime.getRuntime().addShutdownHook
注冊(cè)關(guān)閉鉤子验烧, JVM將在關(guān)閉之前執(zhí)行關(guān)閉鉤子中的 run
函數(shù)(不管是正常退出還是異常退出都會(huì)調(diào)用),所以我們可以在 driver 代碼中加入以下代碼:
Runtime.getRuntime().addShutdownHook(new Thread() {
override def run() {
log("Shutting down streaming app...")
streamingContext.stop(true, true)
log("Shutdown of streaming app complete.")
}
})
這樣就能保證即使 application 被強(qiáng)行 kill 掉又跛,在 driver 結(jié)束前碍拆,streamingContext.stop(true, true)
也會(huì)被調(diào)用,從而保證已接收的數(shù)據(jù)都會(huì)被處理慨蓝。
Spark 1.4及其后的版本
上一小節(jié)介紹的方法僅適用于 1.3及以前的版本感混,在 1.4及其后的版本中不僅不能保證生效,甚至?xí)鹚梨i等線程問題菌仁。在 1.4及其后的版本中浩习,我們只需設(shè)置 spark.streaming.stopGracefullyOnShutdown
為 true
即可達(dá)到上一小節(jié)相同的效果。
下面來分析為什么上一小節(jié)介紹的方法在 1.4其后的版本中不能用济丘。首先谱秽,需要明確的是:
- 當(dāng)我們注冊(cè)了多個(gè)關(guān)閉鉤子時(shí)洽蛀,JVM開始啟用其關(guān)閉序列時(shí),它會(huì)以某種未指定的順序啟動(dòng)所有已注冊(cè)的關(guān)閉鉤子疟赊,并讓它們同時(shí)運(yùn)行
- 萬(wàn)一不止一個(gè)關(guān)閉鉤子郊供,它們將并行地運(yùn)行,并容易引發(fā)線程問題近哟,例如死鎖
綜合以上兩點(diǎn)驮审,我們可以明確,如果除了我們注冊(cè)的關(guān)閉鉤子外吉执,driver 還有注冊(cè)了其他鉤子疯淫,將會(huì)引發(fā)上述兩個(gè)問題。
在 StreamingContext#start 中戳玫,會(huì)調(diào)用
ShutdownHookManager.addShutdownHook(StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)
該函數(shù)最終注冊(cè)一個(gè)關(guān)閉鉤子熙掺,并會(huì)在 run
方法中調(diào)用 stopOnShutdown
,
private def stopOnShutdown(): Unit = {
val stopGracefully = conf.getBoolean("spark.streaming.stopGracefullyOnShutdown", false)
logInfo(s"Invoking stop(stopGracefully=$stopGracefully) from shutdown hook")
// Do not stop SparkContext, let its own shutdown hook stop it
stop(stopSparkContext = false, stopGracefully = stopGracefully)
}
從 stopOnShutdown
中會(huì)根據(jù) stopGracefully
的值來決定是否以優(yōu)雅的方式結(jié)束 driver咕宿,而 stopGracefully
的值由 spark.streaming.stopGracefullyOnShutdown
決定币绩。結(jié)合上文,也就能說明為什么 spark.streaming.stopGracefullyOnShutdown
能決定是否優(yōu)雅的結(jié)束 application 和為什么上一小節(jié)的方法不適用與 1.4及其后版本府阀。