關(guān)注公眾號:“程序員成長軟技能” 酪术,日拱一卒,功不唐捐跃巡!????????Spark Streaming 應(yīng)用定位是長期執(zhí)行的。但如何優(yōu)雅的關(guān)閉它牧愁,使正在被處理的消息在作業(yè)停止前被妥善處理素邪?很多博文建議我們必須通過JVM關(guān)閉的鉤子,可在此 查看相關(guān)代碼递宅。但是娘香,這個方法在新的Spark版本(1.4版本之后)中不能正常工作,并且會引起死鎖情況办龄。
????????目前有兩種方式去優(yōu)雅的關(guān)閉Spark Streaming作業(yè)。第一種方法是設(shè)置spark.streaming.stopGracefullyOnShutdown參數(shù)值為true(默認是false)淋昭。這個參數(shù)在解決Spark優(yōu)雅關(guān)閉的issue中引入俐填。開發(fā)者不再需要去調(diào)用ssc.stop()函數(shù),只需要向Driver發(fā)送SIGTERM信號翔忽。在實踐中英融,我們需要如下操作:
- 在Spark UI上找到Driver進程運行在哪個節(jié)點。在Yarn Cluster部署模式下歇式,Driver進程和AM運行在同一個Container驶悟。
- 登陸運行Driver的節(jié)點,并且執(zhí)行ps -ef |grep java |grep ApplicationMaster 去找到進程ID材失。請注意痕鳍,你搜索的字符串可能會因為作業(yè)或者環(huán)境等原因不同。
- 執(zhí)行kill -SIGTERM <AM-PID> 命令,發(fā)送SIGTERM信號給進程笼呆。
在Spark Driver接收到SIGTERM信號后熊响,你會在日志中看到類似如下的消息:
17/02/02 01:31:35 ERROR yarn.ApplicationMaster: RECEIVED SIGNAL 15: SIGTERM*
17/02/02 01:31:35 INFO streaming.StreamingContext: Invoking stop(stopGracefully=true) from shutdown hook...
17/02/02 01:31:45 INFO streaming.StreamingContext: StreamingContext stopped successfully**
17/02/02 01:31:45 INFO spark.SparkContext: Invoking stop() from shutdown hook...
17/02/02 01:31:45 INFO spark.SparkContext: Successfully stopped SparkContext...
17/02/02 01:31:45 INFO util.ShutdownHookManager: Shutdown hook called*
????????需要注意,默 spark.yarn.maxAppAttempts默認使用Yarn的yarn.resourcemanager.am.max-attempts的值诗赌。而yarn.resourcemanager.am.max-attempts值默認為2汗茄。因此,在執(zhí)行kill命令A(yù)M第一次停止后铭若,Yarn將會自動啟動另一個AM/Driver洪碳。你需要第二次kill掉它。你可以在spark-submit設(shè)置--conf spark.yarn.maxAppAttempts=1 叼屠,但是你必須考慮清楚偶宫,因為如此配置后Driver失敗后將沒機會重試。
????????你不能使用yarn application -kill <applicationid>去kill作業(yè)环鲤。這個命令不會發(fā)送SIGTERM信號給container纯趋,而是幾乎同時發(fā)送SIGKILL信號。SIGTERM和SIGKILL之間的時間間隔可以使用yarn.nodemanager.sleep-delay-before-sigkill.ms (默認 250)去配置冷离。當(dāng)然吵冒,你可以增大該值,但是西剥, 在一定程度上痹栖,即使我調(diào)整到60000(1分鐘),它仍然不起作用瞭空。作業(yè)的containers幾乎是立即被kill掉揪阿,并且日志中僅包含如下內(nèi)容:
17/02/02 12:12:27 ERROR yarn.ApplicationMaster: RECEIVED SIGNAL 15: SIGTERM*
17/02/02 12:12:27 INFO streaming.StreamingContext: Invoking stop(stopGracefully=true) from shutdown hook*
????????所以,我不建議使用yarn application -kill <applicationid> 命令去發(fā)送SIGTERM信號咆畏。
????????第二個解決方案是以某種方式通知Spark Streaming應(yīng)用它需要優(yōu)雅的關(guān)閉南捂,而不是使用SIGTERM信號。一種方式是在HDFS上放一個標(biāo)識文件旧找,Spark Streaming應(yīng)用周期性的去檢測它溺健。如果標(biāo)識文件存在了,就調(diào)用scc.stop(true, true) 钮蛛。第一個true意思是Spark context需要被停止鞭缭。第二個true意思是需要優(yōu)雅的關(guān)閉,允許正在處理的消息完成魏颓。
????????至關(guān)重要的是岭辣,不要在micro-batch的代碼中調(diào)用ssc.stop(true, true),試想一下甸饱,如果你在微批代碼中調(diào)用ssc.stop(true, true)沦童,它將等待所有正在被處理的消息完成,包括當(dāng)前正在執(zhí)行的微批。但是搞动,當(dāng)前的微批不會結(jié)束躏精,直到ssc.stop(true, true)結(jié)束返回。這是一種死鎖的情況鹦肿。所以矗烛,你必須在另一個線程中執(zhí)行標(biāo)識文件檢測和調(diào)用ssc.stop(true, true)。我在github上放了一個簡單的樣例箩溃,此樣例里我在mian線程中在ssc.start()后執(zhí)行檢測和調(diào)用ssc.stop() 瞭吃。你可以在這里找到源碼。當(dāng)然涣旨,使用HDFS標(biāo)識文件僅僅是一種方法歪架,其他可選擇的方法有使用一個單獨的線程監(jiān)聽一個socket,啟動一個RESTful服務(wù)等等霹陡。
????????期待在將來的release中和蚪,Spark會考慮更優(yōu)雅的方案。比如烹棉,在Spark UI中可以增加一個按鈕攒霹,去優(yōu)雅的停止Spark Streaming作業(yè),這樣浆洗,我們就不需要憑借定制化的編碼或者使用PID和SIGTERM信號了催束。
翻譯:http://blog.parseconsulting.com/2017/02/how-to-shutdown-spark-streaming-job.html
關(guān)注公眾號:“程序員成長軟技能” ,日拱一卒伏社,功不唐捐抠刺!