我們都知道SparkStreaming程序是一個(gè)長服務(wù),一旦運(yùn)轉(zhuǎn)起來不會(huì)輕易停掉衫樊,那么如果我們想要停掉正在運(yùn)行的程序應(yīng)該怎么做呢飒赃?
如果運(yùn)行的是spark on yarn模式直接使用
yarn application -kill taskId
暴力停掉sparkstreaming是有可能出現(xiàn)問題的,比如你的數(shù)據(jù)源是kafka科侈,已經(jīng)加載了一批數(shù)據(jù)到sparkstreaming中正在處理载佳,如果中途停掉,這個(gè)批次的數(shù)據(jù)很有可能沒有處理完,就被強(qiáng)制stop了藕漱,下次啟動(dòng)時(shí)候會(huì)重復(fù)消費(fèi)或者部分?jǐn)?shù)據(jù)丟失肋联。
如何優(yōu)雅的關(guān)閉spark streaming呢?方式主要有三種:
第一種:全人工介入
Spark 1.3及其前的版本
通過 Runtime.getRuntime().addShutdownHook
注冊關(guān)閉鉤子侮繁, JVM將在關(guān)閉之前執(zhí)行關(guān)閉鉤子中的 run
函數(shù)(不管是正常退出還是異常退出都會(huì)調(diào)用)彬祖,所以我們可以在 driver 代碼中加入以下代碼:
JAVA代碼:
Runtime.getRuntime().addShutdownHook(new Thread() {
override def run() {
log("Shutting down streaming app...")
ssc.stop(true, true)
log("Shutdown of streaming app complete.")
}
})
或
Scala方式:
sys.ShutdownHookThread
{
ssc.stop(true, true)
}
這樣就能保證即使 application 被強(qiáng)行 kill 掉圆恤,在 driver 結(jié)束前羽历,ssc.stop(true, true)
也會(huì)被調(diào)用,從而保證已接收的數(shù)據(jù)都會(huì)被處理跳夭。
Spark 1.4及其后的版本
上一小節(jié)介紹的方法僅適用于 1.3及以前的版本,在 1.4及其后的版本中不僅不能保證生效模狭,甚至?xí)鹚梨i等線程問題践磅。在 1.4及其后的版本中,只需要在SparkConf里面設(shè)置下面的參數(shù)即可:
sparkConf.set("spark.streaming.stopGracefullyOnShutdown","true")
然后按照下面的步驟依次操作:
- 通過Hadoop 8088頁面找到運(yùn)行的程序
- 打開spark ui的監(jiān)控頁面
- 打開executor的監(jiān)控頁面
- 登錄liunx找到驅(qū)動(dòng)節(jié)點(diǎn)所在的機(jī)器ip以及運(yùn)行的端口號
- 然后執(zhí)行一個(gè)封裝好的命令
sudo ss -tanlp | grep 5555 |awk '{print $6}'|awk -F, '{print $2}' | sudo xargs kill -15
注意上面的操作執(zhí)行后,sparkstreaming程序浑侥,并不會(huì)立即停止寓落,而是會(huì)把當(dāng)前的批處理里面的數(shù)據(jù)處理完畢后 才會(huì)停掉史飞,此間sparkstreaming不會(huì)再消費(fèi)kafka的數(shù)據(jù)祸憋,這樣以來就能保證結(jié)果不丟和重復(fù)蚯窥。
從上面的步驟可以看出,這樣停掉一個(gè)spark streaming程序是比較復(fù)雜的。那么有沒有更加優(yōu)雅的方式來停止它呢允乐?答案是有的
第二種:使用HDFS系統(tǒng)做消息通知
在驅(qū)動(dòng)程序中,加一段代碼鳞陨,這段代碼的作用每隔一段時(shí)間可以是10秒也可以是3秒歼狼,掃描HDFS上某一個(gè)文件趟咆,如果發(fā)現(xiàn)這個(gè)文件存在,就調(diào)用StreamContext對象stop方法计雌,自己優(yōu)雅的終止自己妈橄,其實(shí)這里HDFS可以換成redis反番,zk罢缸,hbase爵川,db都可以,這里唯一的問題就是依賴了外部的一個(gè)存儲系統(tǒng)來達(dá)到消息通知的目的圃泡,如果使用了這種方式后洞焙。停止流程序就比較簡單了褒链,登錄上有hdfs客戶端的機(jī)器惦费,然后touch一個(gè)空文件到指定目錄,然后等到間隔的掃描時(shí)間到之后鳍贾,發(fā)現(xiàn)有文件存在构拳,就知道需要關(guān)閉程序了。
第三種:內(nèi)部暴露一個(gè)socket或者h(yuǎn)ttp端口用來接收請求,等待觸發(fā)關(guān)閉流程序
這種方式,需要在driver啟動(dòng)一個(gè)socket線程县好,或者h(yuǎn)ttp服務(wù)拣播,這里推薦使用http服務(wù),因?yàn)閟ocket有點(diǎn)偏底層處理起來稍微復(fù)雜點(diǎn),如果使用http服務(wù),我們可以直接用內(nèi)嵌的jetty,對外暴露一個(gè)http接口辟拷,spark ui頁面用的也是內(nèi)嵌的jetty提供服務(wù)隅俘,所以我不需要在pom里面引入額外的依賴杀狡,在關(guān)閉的時(shí)候,找到驅(qū)動(dòng)所在ip,就可以直接通過curl或者瀏覽器就直接關(guān)閉流程序。
找到驅(qū)動(dòng)程序所在的ip,可以在程序啟動(dòng)的log中看到阐肤,也可以在spark master ui的頁面上找到。這種方式不依賴任何外部的存儲系統(tǒng)削罩,僅僅部署的時(shí)候需要一個(gè)額外的端口號用來暴露http服務(wù)愿阐。
至此辛孵,關(guān)于優(yōu)雅的停止spark streaming的主流方式已經(jīng)介紹完畢焚廊,推薦使用第二種或者第三種搞疗,如果想要最大程度減少對外部系統(tǒng)的依賴,推薦使用第三種方式。
關(guān)于具體第二種和第三種的樣例代碼,下篇文章會(huì)整理一下放在github中給大家參考。