對(duì)于長(zhǎng)時(shí)間運(yùn)行的Spark Streaming作業(yè)苛秕,一旦提交到Y(jié)ARN群集便需要永久運(yùn)行颖杏,直到有意停止姜凄。任何中斷都會(huì)引起嚴(yán)重的處理延遲政溃,并可能導(dǎo)致數(shù)據(jù)丟失或重復(fù)。YARN和Apache Spark都不是為了執(zhí)行長(zhǎng)時(shí)間運(yùn)行的服務(wù)而設(shè)計(jì)的态秧。但是董虱,它們已經(jīng)成功地滿(mǎn)足了近實(shí)時(shí)數(shù)據(jù)處理作業(yè)的常駐需求。成功并不一定意味著沒(méi)有技術(shù)挑戰(zhàn)申鱼。
這篇博客總結(jié)了在安全的YARN集群上空扎,運(yùn)行一個(gè)關(guān)鍵任務(wù)且長(zhǎng)時(shí)間的Spark Streaming作業(yè)的經(jīng)驗(yàn)。您將學(xué)習(xí)如何將Spark Streaming應(yīng)用程序提交到Y(jié)ARN群集润讥,以避免在值班時(shí)候的不眠之夜转锈。
Fault tolerance
在YARN集群模式下,Spark驅(qū)動(dòng)程序與Application Master(應(yīng)用程序分配的第一個(gè)YARN容器)在同一容器中運(yùn)行楚殿。此過(guò)程負(fù)責(zé)從YARN 驅(qū)動(dòng)應(yīng)用程序和請(qǐng)求資源(Spark執(zhí)行程序)撮慨。重要的是竿痰,Application Master消除了在應(yīng)用程序生命周期中運(yùn)行的任何其他進(jìn)程的需要。即使一個(gè)提交Spark Streaming作業(yè)的邊緣Hadoop節(jié)點(diǎn)失敗砌溺,應(yīng)用程序也不會(huì)受到影響影涉。
要以集群模式運(yùn)行Spark Streaming應(yīng)用程序,請(qǐng)確保為spark-submit命令提供以下參數(shù):
spark-submit --master yarn --deploy-mode cluster</pre>
由于Spark驅(qū)動(dòng)程序和Application Master共享一個(gè)JVM规伐,Spark驅(qū)動(dòng)程序中的任何錯(cuò)誤都會(huì)阻止我們長(zhǎng)期運(yùn)行的工作蟹倾。幸運(yùn)的是,可以配置重新運(yùn)行應(yīng)用程序的最大嘗試次數(shù)猖闪。設(shè)置比默認(rèn)值2更高的值是合理的(從YARN集群屬性yarn.resourcemanager.am.max嘗試中導(dǎo)出)鲜棠。對(duì)我來(lái)說(shuō),4工作相當(dāng)好培慌,即使失敗的原因是永久性的豁陆,較高的值也可能導(dǎo)致不必要的重新啟動(dòng)。
spark-submit --master yarn --deploy-mode cluster --conf spark.yarn.maxAppAttempts=4
如果應(yīng)用程序運(yùn)行數(shù)天或數(shù)周吵护,而不重新啟動(dòng)或重新部署在高度使用的群集上盒音,則可能在幾個(gè)小時(shí)內(nèi)耗盡4次嘗試。為了避免這種情況馅而,嘗試計(jì)數(shù)器應(yīng)該在每個(gè)小時(shí)都重置祥诽。
spark-submit --master yarn --deploy-mode cluster --conf spark.yarn.maxAppAttempts=4 --conf spark.yarn.am.attemptFailuresValidityInterval=1h
另一個(gè)重要的設(shè)置是在應(yīng)用程序發(fā)生故障之前executor失敗的最大數(shù)量。默認(rèn)情況下是max(2 * num executors瓮恭,3)雄坪,非常適合批處理作業(yè),但不適用于長(zhǎng)時(shí)間運(yùn)行的作業(yè)偎血。該屬性具有相應(yīng)的有效期間,也應(yīng)設(shè)置盯漂。
spark-submit --master yarn --deploy-mode cluster --conf spark.yarn.maxAppAttempts=4 --conf spark.yarn.am.attemptFailuresValidityInterval=1h --conf spark.yarn.max.executor.failures={8 * num_executors} --conf spark.yarn.executor.failuresValidityInterval=1h
對(duì)于長(zhǎng)時(shí)間運(yùn)行的作業(yè)颇玷,您也可以考慮在放棄作業(yè)之前提高任務(wù)失敗的最大數(shù)量。默認(rèn)情況下就缆,任務(wù)將重試4次帖渠,然后作業(yè)失敗。
spark-submit --master yarn --deploy-mode cluster --conf spark.yarn.maxAppAttempts=4 --conf spark.yarn.am.attemptFailuresValidityInterval=1h --conf spark.yarn.max.executor.failures={8 * num_executors} --conf spark.yarn.executor.failuresValidityInterval=1h --conf spark.task.maxFailures=8
Performance
當(dāng)Spark Streaming應(yīng)用程序提交到集群時(shí)竭宰,必須定義運(yùn)行作業(yè)的YARN隊(duì)列空郊。我強(qiáng)烈建議使用YARN Capacity Scheduler并將長(zhǎng)時(shí)間運(yùn)行的作業(yè)提交到單獨(dú)的隊(duì)列。沒(méi)有一個(gè)單獨(dú)的YARN隊(duì)列切揭,您的長(zhǎng)時(shí)間運(yùn)行的工作遲早將被的大量Hive查詢(xún)搶占狞甚。
spark-submit --master yarn --deploy-mode cluster --conf spark.yarn.maxAppAttempts=4 --conf spark.yarn.am.attemptFailuresValidityInterval=1h --conf spark.yarn.max.executor.failures={8 * num_executors} --conf spark.yarn.executor.failuresValidityInterval=1h --conf spark.task.maxFailures=8 --queue realtime_queue
Spark Streaming工作的另一個(gè)重要問(wèn)題是保持處理時(shí)間的穩(wěn)定性和高度可預(yù)測(cè)性。處理時(shí)間應(yīng)保持在批次持續(xù)時(shí)間以下以避免延誤廓旬。我發(fā)現(xiàn)Spark的推測(cè)執(zhí)行有很多幫助哼审,特別是在繁忙的群集中。當(dāng)啟用推測(cè)性執(zhí)行時(shí),批處理時(shí)間更加穩(wěn)定涩盾。只有當(dāng)Spark操作是冪等時(shí)十气,才能啟用推測(cè)模式。
spark-submit --master yarn --deploy-mode cluster --conf spark.yarn.maxAppAttempts=4 --conf spark.yarn.am.attemptFailuresValidityInterval=1h --conf spark.yarn.max.executor.failures={8 * num_executors} --conf spark.yarn.executor.failuresValidityInterval=1h --conf spark.task.maxFailures=8 --queue realtime_queue --conf spark.speculation=true
Security
在安全的HDFS群集上春霍,長(zhǎng)時(shí)間運(yùn)行的Spark Streaming作業(yè)由于Kerberos票據(jù)到期而失敗砸西。沒(méi)有其他設(shè)置,當(dāng)Spark Streaming作業(yè)提交到集群時(shí)址儒,會(huì)發(fā)布Kerberos票證芹枷。當(dāng)票證到期時(shí)Spark Streaming作業(yè)不能再?gòu)腍DFS寫(xiě)入或讀取數(shù)據(jù)。
在理論上(基于文檔)离福,應(yīng)該將Kerberos主體和keytab作為spark-submit命令傳遞:
spark-submit --master yarn --deploy-mode cluster --conf spark.yarn.maxAppAttempts=4 --conf spark.yarn.am.attemptFailuresValidityInterval=1h --conf spark.yarn.max.executor.failures={8 * num_executors} --conf spark.yarn.executor.failuresValidityInterval=1h --conf spark.task.maxFailures=8 --queue realtime_queue --conf spark.speculation=true --principal user/hostname@domain --keytab /path/to/foo.keytab
實(shí)際上杖狼,由于幾個(gè)錯(cuò)誤(HDFS-9276, SPARK-11182)必須禁用HDFS緩存。如果沒(méi)有妖爷,Spark將無(wú)法從HDFS上的文件讀取更新的令牌蝶涩。
spark-submit --master yarn --deploy-mode cluster --conf spark.yarn.maxAppAttempts=4 --conf spark.yarn.am.attemptFailuresValidityInterval=1h --conf spark.yarn.max.executor.failures={8 * num_executors} --conf spark.yarn.executor.failuresValidityInterval=1h --conf spark.task.maxFailures=8 --queue realtime_queue --conf spark.speculation=true --principal user/hostname@domain --keytab /path/to/foo.keytab --conf spark.hadoop.fs.hdfs.impl.disable.cache=true
Mark Grover指出,這些錯(cuò)誤只影響在HA模式下配置了NameNodes的HDFS集群絮识。謝謝绿聘,馬克
Logging
訪問(wèn)Spark應(yīng)用程序日志的最簡(jiǎn)單方法是配置Log4j控制臺(tái)追加程序,等待應(yīng)用程序終止并使用yarn logs -applicationId [applicationId]命令次舌。不幸的是終止長(zhǎng)時(shí)間運(yùn)行的Spark Streaming作業(yè)來(lái)訪問(wèn)日志是不可行的熄攘。
我建議安裝和配置Elastic,Logstash和Kibana(ELK套裝)彼念。ELK的安裝和配置是超出了這篇博客的范圍挪圾,但請(qǐng)記住記錄以下上下文字段:
- YARN application id
- YARN container hostname
- Executor id (Spark driver is always 000001, Spark executors start from 000002)
- YARN attempt (to check how many times Spark driver has been restarted)
Log4j配置使用Logstash特定的appender和布局定義應(yīng)該傳遞給spark-submit命令:
spark-submit --master yarn --deploy-mode cluster --conf spark.yarn.maxAppAttempts=4 --conf spark.yarn.am.attemptFailuresValidityInterval=1h --conf spark.yarn.max.executor.failures={8 * num_executors} --conf spark.yarn.executor.failuresValidityInterval=1h --conf spark.task.maxFailures=8 --queue realtime_queue --conf spark.speculation=true --principal user/hostname@domain --keytab /path/to/foo.keytab --conf spark.hadoop.fs.hdfs.impl.disable.cache=true --conf spark.driver.extraJavaOptions=-Dlog4j.configuration=file:log4j.properties --conf spark.executor.extraJavaOptions=-Dlog4j.configuration=file:log4j.properties --files /path/to/log4j.properties
最后,Spark Job的Kibana儀表板可能如下所示:
Monitoring
長(zhǎng)時(shí)間運(yùn)行的工作全天候運(yùn)行逐沙,所以了解歷史指標(biāo)很重要哲思。Spark UI僅在有限數(shù)量的批次中保留統(tǒng)計(jì)信息,并且在重新啟動(dòng)后吩案,所有度量標(biāo)準(zhǔn)都消失了棚赔。再次,需要外部工具徘郭。我建議安裝Graphite用于收集指標(biāo)和Grafana來(lái)建立儀表板靠益。
首先,Spark需要配置為將指標(biāo)報(bào)告給Graphite残揉,準(zhǔn)備metrics.properties文件:
*.sink.graphite.class=org.apache.spark.metrics.sink.GraphiteSink
*.sink.graphite.host=[hostname]
*.sink.graphite.port=[port] *.sink.graphite.prefix=some_meaningful_name
driver.source.jvm.class=org.apache.spark.metrics.source.JvmSource
executor.source.jvm.class=org.apache.spark.metrics.source.JvmSource
Graceful stop
最后一個(gè)難題是如何以?xún)?yōu)雅的方式停止部署在YARN上的Spark Streaming應(yīng)用程序胧后。停止(甚至殺死)YARN應(yīng)用程序的標(biāo)準(zhǔn)方法是使用命令yarn application -kill [applicationId]。這個(gè)命令會(huì)停止Spark Streaming應(yīng)用程序抱环,但這可能發(fā)生在批處理中绩卤。因此途样,如果該作業(yè)是從Kafka讀取數(shù)據(jù)然后在HDFS上保存處理結(jié)果,并最終提交Kafka偏移量濒憋,當(dāng)作業(yè)在提交偏移之前停止工作時(shí)何暇,您應(yīng)該預(yù)見(jiàn)到HDFS會(huì)有重復(fù)的數(shù)據(jù)。
解決優(yōu)雅關(guān)機(jī)問(wèn)題的第一個(gè)嘗試是在關(guān)閉程序時(shí)回調(diào)Spark Streaming Context的停止方法凛驮。
sys.addShutdownHook {
streamingContext.stop(stopSparkContext = true, stopGracefully = true)
}
令人失望的是裆站,由于Spark應(yīng)用程序幾乎立即被殺死,一個(gè)退出回調(diào)函數(shù)來(lái)不及完成已啟動(dòng)的批處理任務(wù)黔夭。此外宏胯,不能保證JVM會(huì)調(diào)用shutdown hook。
在撰寫(xiě)本博客文章時(shí)本姥,唯一確認(rèn)的YARN Spark Streaming應(yīng)用程序的確切方法是通知應(yīng)用程序關(guān)于計(jì)劃關(guān)閉肩袍,然后以編程方式停止流式傳輸(但不是關(guān)閉掛鉤)。命令yarn application -kill 如果通知應(yīng)用程序在定義的超時(shí)后沒(méi)有停止婚惫,則應(yīng)該僅用作最后手段氛赐。
可以使用HDFS上的標(biāo)記文件(最簡(jiǎn)單的方法)或使用驅(qū)動(dòng)程序上公開(kāi)的簡(jiǎn)單Socket / HTTP端點(diǎn)(復(fù)雜方式)通知應(yīng)用程序。
因?yàn)槲蚁矚gKISS原理先舷,下面你可以找到shell腳本偽代碼艰管,用于啟動(dòng)/停止Spark Streaming應(yīng)用程序使用標(biāo)記文件:
start() {
hdfs dfs -touchz /path/to/marker/my_job_unique_name
spark-submit ...
}
stop() {
hdfs dfs -rm /path/to/marker/my_job_unique_name
force_kill=true application_id=$(yarn application -list | grep -oe "application_[0-9]*_[0-9]*"`) for i in `seq 1 10`; do application_status=$(yarn application -status ${application_id} | grep "State : \(RUNNING\|ACCEPTED\)") if [ -n "$application_status" ]; then
sleep 60s else force_kill=false
break fi
done
$force_kill && yarn application -kill ${application_id}
}
在Spark Streaming應(yīng)用程序中,后臺(tái)線程應(yīng)該監(jiān)視標(biāo)記文件蒋川,當(dāng)文件消失時(shí)停止上下文調(diào)用
streamingContext.stop(stopSparkContext = true, stopGracefully = true).
Summary
可以看到牲芋,部署在YARN上的關(guān)鍵任務(wù)Spark Streaming應(yīng)用程序的配置相當(dāng)復(fù)雜。以上提出的技術(shù)捺球,由一些非常聰明的開(kāi)發(fā)人員經(jīng)過(guò)漫長(zhǎng)而冗長(zhǎng)乏味的迭代學(xué)習(xí)缸浦。最終,部署在高可用的YARN集群上的長(zhǎng)期運(yùn)行的Spark Streaming應(yīng)用非常穩(wěn)定氮兵。
翻譯:http://mkuthan.github.io/blog/2016/09/30/spark-streaming-on-yarn/
關(guān)注公眾號(hào):“程序員成長(zhǎng)軟技能” 裂逐,日拱一卒,功不唐捐胆剧!