Spark Streaming Executor DynamicAllocation 機(jī)制分析

—————?—————?—————?—————?—————?—————
Spark Streaming概述
Spark Streaming 初始化過(guò)程
Spark Streaming Receiver啟動(dòng)過(guò)程分析
Spark Streaming 數(shù)據(jù)準(zhǔn)備階段分析(Receiver方式)
Spark Streaming 數(shù)據(jù)計(jì)算階段分析
SparkStreaming Backpressure分析
Spark Streaming Executor DynamicAllocation 機(jī)制分析

—————?—————?—————?—————?—————?—————

1黍判、 引入Streaming Executor DynamicAllocation 機(jī)制的原因

在Spark Streaming喳瓣,作業(yè)的執(zhí)行是以批處理的方式進(jìn)行的醇份,批處理間隔內(nèi)(batch interval)要完成對(duì)批量作業(yè)的執(zhí)行吝沫,這就要求作業(yè)的執(zhí)行時(shí)間(process time)不大于設(shè)定的批處理間隔须蜗。在計(jì)算資源一定的情況下垢村,執(zhí)行時(shí)間與待處理的數(shù)據(jù)規(guī)模成正比躏结。在大數(shù)據(jù)流式計(jì)算環(huán)境中劫狠,數(shù)據(jù)的產(chǎn)生完全由數(shù)據(jù)源決定,由于不同的數(shù)據(jù)源在不同時(shí)空范圍內(nèi)的狀態(tài)不統(tǒng)一且發(fā)生動(dòng)態(tài)變化潜的,導(dǎo)致數(shù)據(jù)流的速率呈現(xiàn)出了突發(fā)性的特征骚揍。前一時(shí)刻數(shù)據(jù)速率和后一時(shí)刻數(shù)據(jù)速率可能會(huì)有巨大的差異, 因此不同批次接收的數(shù)據(jù)總量存在差異啰挪,不同批次的執(zhí)行時(shí)間也就會(huì)存在差異信不。這種差異是由于計(jì)算資源與數(shù)據(jù)規(guī)模不匹配造成的。在Spark 2.0以前亡呵, Streaming的資源分配都采取資源預(yù)先分配的策略抽活,資源管理器依據(jù)應(yīng)用的申請(qǐng)量予以提前分配,在應(yīng)用執(zhí)行期間不能依據(jù)應(yīng)用實(shí)際的計(jì)算資源需求量進(jìn)行調(diào)整锰什,執(zhí)行作業(yè)時(shí)會(huì)存在資源過(guò)氏滤叮或資源不足的情況丁逝。

2、Spark Streaming 的計(jì)算特征

Spark Streaming應(yīng)用的執(zhí)行過(guò)程可以分成數(shù)據(jù)準(zhǔn)備和數(shù)據(jù)計(jì)算兩個(gè)階段卵牍,兩個(gè)階段分別由不同的作業(yè)進(jìn)行處理果港,分別是數(shù)據(jù)接收作業(yè)(提交Receiver時(shí)的Job)和數(shù)據(jù)處理作業(yè)(Batch Job)。在大數(shù)據(jù)流式計(jì)算中糊昙,數(shù)據(jù)是實(shí)時(shí)產(chǎn)生辛掠、動(dòng)態(tài)增加的,只要數(shù)據(jù)源處于活動(dòng)狀態(tài),數(shù)據(jù)就會(huì)一直產(chǎn)生释牺,因此要求數(shù)據(jù)接收作業(yè)不間斷的接收數(shù)據(jù)萝衩,并將接收的數(shù)據(jù)分割成批,供數(shù)據(jù)處理作業(yè)消費(fèi)没咙。數(shù)據(jù)處理作業(yè)以批處理方式運(yùn)行猩谊。一般情況下,批處理的執(zhí)行時(shí)間不大于其批處理間隔祭刚,一個(gè)批次可以分成作業(yè)實(shí)際處理階段和等待下一個(gè)批次階段牌捷。綜上所述,Spark Streaming的計(jì)算特征如下圖所示(實(shí)際處理時(shí)間<批處理間隔):


SparkStreaming 計(jì)算特征

另外涡驮,特殊情況下暗甥,會(huì)存在實(shí)際處理階段所用的時(shí)間 > 批處理時(shí)間。這種情況下捉捅,一般是由于作業(yè)處理能力弱引起的撤防。

3、Streaming Executor DynamicAllocation機(jī)制的評(píng)價(jià)指標(biāo)

經(jīng)過(guò)上述分析棒口,作業(yè)的實(shí)際處理時(shí)間與設(shè)定的批處理間隔之間存在的關(guān)系如下:

  • 作業(yè)的實(shí)際處理時(shí)間 遠(yuǎn)小于 批處理間隔
    此時(shí)寄月,計(jì)算資源大部分時(shí)間處于空閑,造成資源浪費(fèi)
  • 作業(yè)的實(shí)際處理時(shí)間 約等于 批處理間隔
    此時(shí)无牵,能滿足處理漾肮,但數(shù)據(jù)具有波動(dòng)性,可能下一個(gè)時(shí)間不能滿足茎毁,造成延遲克懊。計(jì)算資源以初顯不足。
  • 介于以上兩者之間
    系統(tǒng)資源能正常滿足計(jì)算充岛,又不至于造成過(guò)多浪費(fèi)保檐,此時(shí)系統(tǒng)穩(wěn)定性良好耕蝉。
  • 作業(yè)的實(shí)際處理時(shí)間 大于 批處理間隔
    批處理間隔內(nèi)不能完成批作業(yè)崔梗,說(shuō)明計(jì)算資源不足。

因此垒在,采用 有效處理時(shí)間占比【有效處理時(shí)間占比 = 實(shí)際處理時(shí)間 / 批處理間隔】來(lái)評(píng)價(jià)當(dāng)前計(jì)算資源的過(guò)仕馄牵或不足扔亥。

4 、Streaming Executor DynamicAllocation機(jī)制的工作流程

  • 通過(guò)監(jiān)控組件谈为,獲取已完成批作業(yè)的實(shí)際執(zhí)行時(shí)間
  • 計(jì)算有效處理時(shí)間占比旅挤,然后與設(shè)置的閾值進(jìn)行比較,如果小于下限down,則結(jié)束一個(gè)Executor; 如果大于上限up伞鲫, 則增加Executor(個(gè)數(shù)由比率決定).

該功能啟用與否由參數(shù)spark.streaming.dynamicAllocation.enabled參數(shù)決定粘茄,默認(rèn)為false不開(kāi)啟。

5秕脓、源碼分析

與Spark Streaming Backpressure機(jī)制相同柒瓣,Spark Streaming Executor動(dòng)態(tài)伸縮機(jī)制也是以事件驅(qū)動(dòng)的形式工作的。其負(fù)責(zé)動(dòng)態(tài)伸縮的類(lèi)為ExecutorAllocationManager吠架,其繼承自StreamingListener芙贫。 類(lèi)定義結(jié)構(gòu)如下:

private[streaming] class ExecutorAllocationManager(
    client: ExecutorAllocationClient,
    receiverTracker: ReceiverTracker,
    conf: SparkConf,
    batchDurationMs: Long,
    clock: Clock) extends StreamingListener with Logging {
    ......
    ......
}

ExecutorAllocationManager 負(fù)責(zé)管理分配給StreamingContext(Streaming應(yīng)用)的Executor.其通過(guò)分析Streaming作業(yè)的監(jiān)控信息,動(dòng)態(tài)的伸請(qǐng)或釋放executor.

5.1 ExecutorAllocationManager 注冊(cè)與啟動(dòng)

JobScheduler啟動(dòng)時(shí)會(huì)創(chuàng)建ExecutorAllocationManager 并向ListenerBus注冊(cè)并開(kāi)啟監(jiān)聽(tīng)傍药。


  def start(): Unit = synchronized {
    ......

    val executorAllocClient: ExecutorAllocationClient = ssc.sparkContext.schedulerBackend match {
      case b: ExecutorAllocationClient => b.asInstanceOf[ExecutorAllocationClient]
      case _ => null
    }
    ......
    executorAllocationManager = ExecutorAllocationManager.createIfEnabled(
      executorAllocClient,
    executorAllocationManager.foreach(ssc.addStreamingListener)
   ......
    executorAllocationManager.foreach(_.start())
    logInfo("Started JobScheduler")
  }

其中executorAllocationManager的start方法定義如下:

  def start(): Unit = {
    timer.start()
    logInfo(s"ExecutorAllocationManager started with " +
      s"ratios = [$scalingUpRatio, $scalingDownRatio] and interval = $scalingIntervalSecs sec")
  }

  private val timer = new RecurringTimer(clock, scalingIntervalSecs * 1000,
    _ => manageAllocation(), "streaming-executor-allocation-manager")

其將開(kāi)啟定時(shí)器磺平,周期性的執(zhí)行manageAllocation方法,其時(shí)間周期由參數(shù) “spark.streaming.dynamicAllocation.scalingInterval”決定拐辽,默認(rèn)為60s.
即:默認(rèn)情況下每隔60s 執(zhí)行一次manageAllocation. managerAllocation會(huì)使用歷史作業(yè)執(zhí)行信息計(jì)算出有效處理時(shí)間占比ratio, 并依據(jù)占比與預(yù)設(shè)閾值的關(guān)系決定增減資源拣挪。預(yù)設(shè)的閾值信息及增減策略為:

  • scalingUpRatio
    閾值上限,由參數(shù)“park.streaming.dynamicAllocation.scalingUpRatio”控制薛训,默認(rèn)值為0.9媒吗。當(dāng)計(jì)算出的ratio大于scalingUpRatio 時(shí),將按如下算式計(jì)算出的值乙埃,增加若干Executor闸英。
 math.max(math.round(ratio).toInt, 1)
  • scalingDownRatio
    閾值下限, 由參數(shù)“spark.streaming.dynamicAllocation.scalingDownRatio”介袜,默認(rèn)值為0.3甫何。 當(dāng)計(jì)算出的ratio小于scalingDownRatio時(shí),將減少一個(gè)Executor.

manageAllocation的實(shí)現(xiàn)如下:

/**
   * Manage executor allocation by requesting or killing executors based on the collected
   * batch statistics.
   */
  private def manageAllocation(): Unit = synchronized {
    logInfo(s"Managing executor allocation with ratios = [$scalingUpRatio, $scalingDownRatio]")
    if (batchProcTimeCount > 0) {
      val averageBatchProcTime = batchProcTimeSum / batchProcTimeCount
      val ratio = averageBatchProcTime.toDouble / batchDurationMs
      logInfo(s"Average: $averageBatchProcTime, ratio = $ratio" )
      if (ratio >= scalingUpRatio) {
        logDebug("Requesting executors")
        val numNewExecutors = math.max(math.round(ratio).toInt, 1)
        requestExecutors(numNewExecutors)
      } else if (ratio <= scalingDownRatio) {
        logDebug("Killing executors")
        killExecutor()
      }
    }
    batchProcTimeSum = 0
    batchProcTimeCount = 0
  }

經(jīng)過(guò)分析代碼可知遇伞,初始情況下辙喂,因if條件batchProcTimeCount > 0不滿足,上述機(jī)制并不會(huì)執(zhí)行鸠珠。只有onBatchCompleted事件觸發(fā)之后巍耗,即有作業(yè)執(zhí)行完成,可以做為判斷依據(jù)之后渐排,manageAllocation()才會(huì)正式生效炬太。

5.2 事件觸發(fā)

Executor dynamicAllocation機(jī)制是依據(jù)完成的批Job的執(zhí)行信息進(jìn)行決策,其在批Job執(zhí)行完成時(shí)會(huì)收集作業(yè)驯耻。
其事件觸發(fā)過(guò)程亲族,同“SparkStreaming Backpressure分析”中"3.3.1 BatchCompleted觸發(fā)過(guò)程"一節(jié)過(guò)程相同炒考,不再贅述。

5.3 事件處理

onBatchCompleted的事件處理定義如下:

override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    logDebug("onBatchCompleted called: " + batchCompleted)
    if (!batchCompleted.batchInfo.outputOperationInfos.values.exists(_.failureReason.nonEmpty)) {
      batchCompleted.batchInfo.processingDelay.foreach(addBatchProcTime)
    }
  }

當(dāng)onBatchCompleted事件處發(fā)時(shí)霎迫,將執(zhí)行addBatchProcTime, 為batchProcTimeSum及batchProcTimeCount 增值斋枢。

 private def addBatchProcTime(timeMs: Long): Unit = synchronized {
    batchProcTimeSum += timeMs
    batchProcTimeCount += 1
    logDebug(
      s"Added batch processing time $timeMs, sum = $batchProcTimeSum, count = $batchProcTimeCount")
  }

5.4 生效時(shí)機(jī)

當(dāng)Timer 再次觸發(fā)時(shí),manageAllocation 的if條件將滿足知给,其會(huì)依據(jù)決策信息對(duì)Executor進(jìn)行增減操作瓤帚。 同時(shí)重置batchProcTimeSum及batchProcTimeCount的值,開(kāi)始下一輪次的準(zhǔn)備工作涩赢。

6缘滥、與Spark Core中Executor DynamicAllocation 機(jī)制的區(qū)別

Spark Core 中的DynamicAllocation機(jī)制同Spark Streaming中一樣,也是周期性調(diào)度評(píng)估機(jī)制確定是否要進(jìn)行Executor增減谒主。不同的是朝扼,Spark Core中應(yīng)用是批處理應(yīng)用,執(zhí)行完成之后霎肯,即可以結(jié)束擎颖,因此其使用idle策略來(lái)評(píng)估Executor的增減,具體策略為:

  • 減少Executor
    應(yīng)用擁有的Executor數(shù)量在處理當(dāng)前負(fù)載時(shí)綽綽有余观游,通過(guò)縮減Executor仍然能夠一次性執(zhí)行所有任務(wù)(running + pending task)時(shí)搂捧,會(huì)通過(guò)一定的策略結(jié)束掉部分Executor,達(dá)到節(jié)省資源的目的懂缕。Spark Core中的策略為如果監(jiān)測(cè)到一個(gè)Executor空閑了K 秒允跑,這意味著其在未來(lái)不再執(zhí)行任務(wù),則可以將其移除搪柑。其中參數(shù)K由“spark.dynamicAllocation.executorIdleTimeout ”配制聋丝,默認(rèn)值為60s.
  • 增加Executor
    應(yīng)用擁有的Executor數(shù)量不足以及時(shí)處理當(dāng)前負(fù)載,存在任務(wù)長(zhǎng)時(shí)間堆集工碾,則要增加Executor弱睦。Spark Core中的策略為如果監(jiān)測(cè)到任務(wù)調(diào)度隊(duì)列中的任務(wù)N秒內(nèi)沒(méi)有被調(diào)度,則會(huì)增加新的Executor渊额,如果再過(guò)M秒還未進(jìn)行調(diào)度况木,則以指數(shù)方式繼續(xù)增加,直到上限旬迹。

由于Spark Streaming應(yīng)用是長(zhǎng)期存在的火惊、微批處理應(yīng)用。其每隔一個(gè)小的時(shí)間間隔(batchInterval)就會(huì)提交微批處理應(yīng)用奔垦。其會(huì)使Spark Core中的Executor DynamicAllocation 機(jī)制的idle 策略受阻屹耐,因此Spark Streaming 采用有效處理時(shí)間占比radio來(lái)進(jìn)行決策。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末宴倍,一起剝皮案震驚了整個(gè)濱河市张症,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌鸵贬,老刑警劉巖俗他,帶你破解...
    沈念sama閱讀 206,378評(píng)論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異阔逼,居然都是意外死亡兆衅,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,356評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門(mén)嗜浮,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)羡亩,“玉大人,你說(shuō)我怎么就攤上這事危融∥访” “怎么了?”我有些...
    開(kāi)封第一講書(shū)人閱讀 152,702評(píng)論 0 342
  • 文/不壞的土叔 我叫張陵吉殃,是天一觀的道長(zhǎng)辞居。 經(jīng)常有香客問(wèn)我,道長(zhǎng)蛋勺,這世上最難降的妖魔是什么瓦灶? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 55,259評(píng)論 1 279
  • 正文 為了忘掉前任,我火速辦了婚禮抱完,結(jié)果婚禮上贼陶,老公的妹妹穿的比我還像新娘。我一直安慰自己巧娱,他們只是感情好碉怔,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,263評(píng)論 5 371
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著禁添,像睡著了一般眨层。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上上荡,一...
    開(kāi)封第一講書(shū)人閱讀 49,036評(píng)論 1 285
  • 那天趴樱,我揣著相機(jī)與錄音,去河邊找鬼酪捡。 笑死叁征,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的逛薇。 我是一名探鬼主播捺疼,決...
    沈念sama閱讀 38,349評(píng)論 3 400
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼永罚!你這毒婦竟也來(lái)了啤呼?” 一聲冷哼從身側(cè)響起卧秘,我...
    開(kāi)封第一講書(shū)人閱讀 36,979評(píng)論 0 259
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎官扣,沒(méi)想到半個(gè)月后翅敌,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 43,469評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡惕蹄,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,938評(píng)論 2 323
  • 正文 我和宋清朗相戀三年蚯涮,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片卖陵。...
    茶點(diǎn)故事閱讀 38,059評(píng)論 1 333
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡遭顶,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出泪蔫,到底是詐尸還是另有隱情棒旗,我是刑警寧澤,帶...
    沈念sama閱讀 33,703評(píng)論 4 323
  • 正文 年R本政府宣布撩荣,位于F島的核電站嗦哆,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏婿滓。R本人自食惡果不足惜老速,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,257評(píng)論 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望凸主。 院中可真熱鬧橘券,春花似錦、人聲如沸卿吐。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,262評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)嗡官。三九已至箭窜,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間衍腥,已是汗流浹背磺樱。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 31,485評(píng)論 1 262
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留婆咸,地道東北人竹捉。 一個(gè)月前我還...
    沈念sama閱讀 45,501評(píng)論 2 354
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像尚骄,于是被迫代替她去往敵國(guó)和親块差。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,792評(píng)論 2 345

推薦閱讀更多精彩內(nèi)容