SPARK-21444踩坑記錄: DAGSchedulerEventProcessLoop failed when executor preempted

原文

前段時間工作中踩到SPARK-21444的坑纷闺,這里做個記錄安接。

1 場景描述

我們的一個spark app在正常運(yùn)行幾個月后經(jīng)常出現(xiàn)driver端hang住的情況挽拔,用yarn container -list查看發(fā)現(xiàn)只有driver的container還在矛纹,executors已全部退出抵栈。然后查看driver端的日志,發(fā)現(xiàn)DAGSchedulerEventProcessLoop failed昙读,SparkContext已shut down召调,錯誤棧如下:

19/03/27 13:39:37 ERROR scheduler.DAGSchedulerEventProcessLoop: DAGSchedulerEventProcessLoop failed; shutting down SparkContext

org.apache.spark.SparkException: Exception thrown in awaitResult:

? ? ? ? at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)

? ? ? ? at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)

? ? ? ? at org.apache.spark.storage.BlockManagerMaster.removeBroadcast(BlockManagerMaster.scala:152)

? ? ? ? at org.apache.spark.broadcast.TorrentBroadcast$.unpersist(TorrentBroadcast.scala:306)

? ? ? ? at org.apache.spark.broadcast.TorrentBroadcast.doDestroy(TorrentBroadcast.scala:197)

? ? ? ? at org.apache.spark.broadcast.Broadcast.destroy(Broadcast.scala:111)

? ? ? ? at org.apache.spark.broadcast.Broadcast.destroy(Broadcast.scala:98)

? ? ? ? at org.apache.spark.ShuffleStatus.invalidateSerializedMapOutputStatusCache(MapOutputTracker.scala:180)

? ? ? ? at org.apache.spark.ShuffleStatus$$anonfun$removeOutputsOnExecutor$1.apply$mcVI$sp(MapOutputTracker.scala:118)

? ? ? ? at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)

? ? ? ? at org.apache.spark.ShuffleStatus.removeOutputsOnExecutor(MapOutputTracker.scala:114)

? ? ? ? at org.apache.spark.MapOutputTrackerMaster$$anonfun$removeOutputsOnExecutor$2.apply(MapOutputTracker.scala:424)

? ? ? ? at org.apache.spark.MapOutputTrackerMaster$$anonfun$removeOutputsOnExecutor$2.apply(MapOutputTracker.scala:424)

? ? ? ? at scala.collection.Iterator$class.foreach(Iterator.scala:893)

? ? ? ? at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)

? ? ? ? at org.apache.spark.MapOutputTrackerMaster.removeOutputsOnExecutor(MapOutputTracker.scala:424)

? ? ? ? at org.apache.spark.scheduler.DAGScheduler.handleExecutorLost(DAGScheduler.scala:1471)

? ? ? ? at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1428)

? ? ? ? at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1787)

? ? ? ? at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1745)

? ? ? ? at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1734)

? ? ? ? at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

Caused by: java.io.IOException: Connection reset by peer

? ? ? ? at sun.nio.ch.FileDispatcherImpl.read0(Native Method)

? ? ? ? at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)

? ? ? ? at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)

? ? ? ? at sun.nio.ch.IOUtil.read(IOUtil.java:192)

? ? ? ? at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)

? ? ? ? at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:221)

? ? ? ? at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:899)

? ? ? ? at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:275)

? ? ? ? at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)

? ? ? ? at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:643)

? ? ? ? at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:566)

? ? ? ? at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:480)

? ? ? ? at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:442)

? ? ? ? at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)

? ? ? ? at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)

? ? ? ? at java.lang.Thread.run(Thread.java:748)

19/03/27 13:39:37 INFO scheduler.DAGScheduler: Executor lost: 388 (epoch 5955)

2 源碼分析

2.1 為何調(diào)用handleExecutorLost

從錯誤棧看到dagscheduler在處理CompletionEvent時(handleTaskCompletion方法中)調(diào)用了handleExecutorLost蛮浑,我們看下handleTaskCompletion的源碼:

/**

? * Responds to a task finishing. This is called inside the event loop so it assumes that it can

? * modify the scheduler's internal state. Use taskEnded() to post a task end event from outside.

? */

? private[scheduler] def handleTaskCompletion(event: CompletionEvent) {

//省略無關(guān)代碼 (下面以。只嚣。沮稚。代替)

? ? event.reason match {

? ? //。册舞。蕴掏。

? ? case FetchFailed(bmAddress, shuffleId, mapId, reduceId, failureMessage) =>

? ? ? //。调鲸。盛杰。

? ? ? ? ? // TODO: mark the executor as failed only if there were lots of fetch failures on it

? ? ? ? ? if (bmAddress != null) {

? ? ? ? ? ? handleExecutorLost(bmAddress.executorId, filesLost = true, Some(task.epoch))

? ? ? ? ? }

? ? ? ? }

? ? //...

? ? }

可以看到,當(dāng)task complete的原因是FetchFailed并且FetchFailed傳回的blockManager地址不為空藐石,則DAGScheduler會認(rèn)為對應(yīng)的executor掛了即供,然后調(diào)用handleExecutorLost進(jìn)行處理。

注意:這里有標(biāo)注 //TODO: mark the executor as failed only if there were lots of fetch failures on it

但是當(dāng)前版本(spark 2.2.3)中于微,只要一個executor上有一次fetchFailed就會調(diào)用handleExecutorLost處理逗嫡。

2.2 為何會有FetchFailed

為什么會有FetchFailed產(chǎn)生呢?我們在driver日志中發(fā)現(xiàn):

19/03/26 00:42:32 ERROR cluster.YarnClusterScheduler: Lost executor 674 on hostA: Container container_e131_1552474836332_972818_01_000882 on host: hostA was preempted.

還發(fā)現(xiàn):

org.apache.spark.shuffle.FetchFailedException: Failed to connect to hostA/xxxxxx:xxx

這說明FetchFailed是由于hostA上的executor 674的yarn container被preempted了株依。這是因?yàn)殡S著我們集群用戶增多驱证,hadoop集群負(fù)載逐漸增加,導(dǎo)致出現(xiàn)資源搶占恋腕。

下面我們看看handleExecutorLost做了什么抹锄。

2.3 handleExecutorLost做了什么

? private[scheduler] def handleExecutorLost(

? ? ? execId: String,

? ? ? filesLost: Boolean,

? ? ? maybeEpoch: Option[Long] = None) {

? ? val currentEpoch = maybeEpoch.getOrElse(mapOutputTracker.getEpoch)

? ? if (!failedEpoch.contains(execId) || failedEpoch(execId) < currentEpoch) {

? ? ? failedEpoch(execId) = currentEpoch

? ? ? logInfo("Executor lost: %s (epoch %d)".format(execId, currentEpoch))

? ? ? blockManagerMaster.removeExecutor(execId)

? ? ? if (filesLost || !env.blockManager.externalShuffleServiceEnabled) {

? ? ? ? logInfo("Shuffle files lost for executor: %s (epoch %d)".format(execId, currentEpoch))

? ? ? ? mapOutputTracker.removeOutputsOnExecutor(execId)

? ? ? ? clearCacheLocs()

? ? ? }

? ? } else {

? ? ? logDebug("Additional executor lost message for " + execId +

? ? ? ? ? ? ? "(epoch " + currentEpoch + ")")

? ? }

? }

handleExecutorLost除了調(diào)用blockManagerMaster.removeExecutor移除lost executor外,還調(diào)用了mapOutputTracker.removeOutputsOnExecutor方法 (關(guān)于MapOutputTracker, 讀者可參考[Spark MapOutputTracker淺析](http://www.reibang.com/p/1409dbc78a15)):

? def removeOutputsOnExecutor(execId: String): Unit = {

? ? shuffleStatuses.valuesIterator.foreach { _.removeOutputsOnExecutor(execId) }

? ? incrementEpoch()

? }

shuffleStatuses記錄了所有shuffle的shuffleStatus對象,對每個shuffleStatus調(diào)用其removeOutputsOnExecutor方法移除指定executor上運(yùn)行的所有shuffle map tasks的mapStatus :

? def removeOutputsOnExecutor(execId: String): Unit = synchronized {

? ? for (mapId <- 0 until mapStatuses.length) {

? ? ? if (mapStatuses(mapId) != null && mapStatuses(mapId).location.executorId == execId) {

? ? ? ? _numAvailableOutputs -= 1

? ? ? ? mapStatuses(mapId) = null

? ? ? ? invalidateSerializedMapOutputStatusCache()

? ? ? }

? ? }

? }

2.4 removeOutputsOnExecutor做了什么

shuffleStatus的removeOutputsOnExecutor方法最后會調(diào)用invalidateSerializedMapOutputStatusCache方法伙单,這個方法會將mapStatus的cache清理掉获高,清理cache的步驟之一就是銷毀廣播變量cachedSerializedBroadcast(這個廣播變量包含了序列化后的所有shuffle map tasks的output狀態(tài)信息,即mapStatus對象)车份。

? /**

? * Clears the cached serialized map output statuses.

? */

? def invalidateSerializedMapOutputStatusCache(): Unit = synchronized {

? ? if (cachedSerializedBroadcast != null) {

? ? ? cachedSerializedBroadcast.destroy()

? ? ? cachedSerializedBroadcast = null

? ? }

? ? cachedSerializedMapStatus = null

? }

這里需要簡單介紹一下destroy廣播變量的過程:

銷毀一個broadcast變量其實(shí)就是刪除所有節(jié)點(diǎn)上對應(yīng)的broacast block谋减,大致流程是:

1. 向blockManagerMasterEndpoint發(fā)送RemoveBroadcast消息。

2. blockManagerMasterEndpoint接收到RemoveBroadcast消息后會將此消息轉(zhuǎn)發(fā)給所有executors和driver上的blockManagerSlaveEndpoint.

3. blockManagerSlaveEndpoint接收到RemoveBroadcast消息后會調(diào)用本地blockManager的removeBroadcast方法移除指定broadcast的所有blocks.

2.5 DAGSchedulerEventProcessLoop是如何fail的

通過上面的介紹扫沼,我們知道:

1. preemption導(dǎo)致FetchFailed異常出爹;

2. FetchFailed異常導(dǎo)致dagScheduler.handleExecutorLost方法被調(diào)用;

3. handleExecutorLost方法會調(diào)用shuffleStatus.invalidateSerializedMapOutputStatusCache方法清理map outputs狀態(tài)信息緩存缎除;

4. invalidateSerializedMapOutputStatusCache方法調(diào)用broadcast.destroy方法銷毀map outputs狀態(tài)信息緩存的廣播變量严就;

5. destroy方法最終會通過blockManagerMasterEndpoint向所有的blockManagerSlaveEndpoint發(fā)送RemoveBroadcast消息并等待返回。

現(xiàn)在我們來分析DAGSchedulerEventProcessLoop failed的原因:

FetchFailed是preemption產(chǎn)生的器罐,并且當(dāng)時我們的spark application有大量executor被preempted梢为。在這個時候,當(dāng)blockManagerMasterEndpoint向所有blockManagerSlaveEndpoint發(fā)送RemoveBroadcast消息時轰坊,很大概率會發(fā)向某個已經(jīng)被preempted但還沒來得及在blockManagerMasterEndpoint中被移除的executor铸董,這種情況下就會出現(xiàn)connection exception。然而肴沫,在spark 2.2.3版本中粟害,這樣的connection exception在invalidateSerializedMapOutputStatusCache方法及其外層調(diào)用方法中都沒有捕獲,最終被拋到DAGSchedulerEventProcessLoop中并導(dǎo)致其failed.

3 解決方案

這個bug已經(jīng)在spark 2.3.0中被fix了颤芬,但是為了避免引入新的bug悲幅,我們沒有直接升級到2.3版本,而是在2.2.3版本上加上了2.3.0中針對這個bug的fix站蝠,fix的代碼很簡單汰具,只是修改了一下invalidateSerializedMapOutputStatusCache方法:

? /**

? * Clears the cached serialized map output statuses.

? */

? def invalidateSerializedMapOutputStatusCache(): Unit = synchronized {

? ? if (cachedSerializedBroadcast != null) {

? ? ? // Prevent errors during broadcast cleanup from crashing the DAGScheduler (see SPARK-21444)

? ? ? Utils.tryLogNonFatalError{

? ? ? ? // Use `blocking = false` so that this operation doesn't hang while trying to send cleanup

? ? ? ? // RPCs to dead executors.

? ? ? ? cachedSerializedBroadcast.destroy(blocking = false)

? ? ? }

? ? ? cachedSerializedBroadcast = null

? ? }

? ? cachedSerializedMapStatus = null

? }

4 總結(jié)

本文從報(bào)錯棧開始,通過源碼分析一步步找出DAGSchedulerEventProcessLoop在executor被preempted的情況下fail掉的root cause菱魔,并展示了spark官方的fix方法留荔。

5 說明

本文源碼版本:2.2.3

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市豌习,隨后出現(xiàn)的幾起案子存谎,更是在濱河造成了極大的恐慌,老刑警劉巖肥隆,帶你破解...
    沈念sama閱讀 217,406評論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件既荚,死亡現(xiàn)場離奇詭異,居然都是意外死亡栋艳,警方通過查閱死者的電腦和手機(jī)恰聘,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,732評論 3 393
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人晴叨,你說我怎么就攤上這事凿宾。” “怎么了兼蕊?”我有些...
    開封第一講書人閱讀 163,711評論 0 353
  • 文/不壞的土叔 我叫張陵初厚,是天一觀的道長。 經(jīng)常有香客問我孙技,道長产禾,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,380評論 1 293
  • 正文 為了忘掉前任牵啦,我火速辦了婚禮亚情,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘哈雏。我一直安慰自己楞件,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,432評論 6 392
  • 文/花漫 我一把揭開白布裳瘪。 她就那樣靜靜地躺著土浸,像睡著了一般。 火紅的嫁衣襯著肌膚如雪彭羹。 梳的紋絲不亂的頭發(fā)上栅迄,一...
    開封第一講書人閱讀 51,301評論 1 301
  • 那天,我揣著相機(jī)與錄音皆怕,去河邊找鬼。 笑死西篓,一個胖子當(dāng)著我的面吹牛愈腾,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播岂津,決...
    沈念sama閱讀 40,145評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼虱黄,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了吮成?” 一聲冷哼從身側(cè)響起橱乱,我...
    開封第一講書人閱讀 39,008評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎粱甫,沒想到半個月后泳叠,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,443評論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡茶宵,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,649評論 3 334
  • 正文 我和宋清朗相戀三年危纫,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,795評論 1 347
  • 序言:一個原本活蹦亂跳的男人離奇死亡种蝶,死狀恐怖契耿,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情螃征,我是刑警寧澤搪桂,帶...
    沈念sama閱讀 35,501評論 5 345
  • 正文 年R本政府宣布,位于F島的核電站盯滚,受9級特大地震影響踢械,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜淌山,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,119評論 3 328
  • 文/蒙蒙 一裸燎、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧泼疑,春花似錦德绿、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,731評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至会油,卻和暖如春个粱,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背翻翩。 一陣腳步聲響...
    開封第一講書人閱讀 32,865評論 1 269
  • 我被黑心中介騙來泰國打工都许, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人嫂冻。 一個月前我還...
    沈念sama閱讀 47,899評論 2 370
  • 正文 我出身青樓胶征,卻偏偏與公主長得像,于是被迫代替她去往敵國和親桨仿。 傳聞我的和親對象是個殘疾皇子睛低,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,724評論 2 354

推薦閱讀更多精彩內(nèi)容