—————?—————?—————?—————?—————?—————
Spark Streaming概述
Spark Streaming 初始化過(guò)程
Spark Streaming Receiver啟動(dòng)過(guò)程分析
Spark Streaming 數(shù)據(jù)準(zhǔn)備階段分析(Receiver方式)
Spark Streaming 數(shù)據(jù)計(jì)算階段分析
SparkStreaming Backpressure分析
Spark Streaming Executor DynamicAllocation 機(jī)制分析
—————?—————?—————?—————?—————?—————
1黍判、 引入Streaming Executor DynamicAllocation 機(jī)制的原因
在Spark Streaming喳瓣,作業(yè)的執(zhí)行是以批處理的方式進(jìn)行的醇份,批處理間隔內(nèi)(batch interval)要完成對(duì)批量作業(yè)的執(zhí)行吝沫,這就要求作業(yè)的執(zhí)行時(shí)間(process time)不大于設(shè)定的批處理間隔须蜗。在計(jì)算資源一定的情況下垢村,執(zhí)行時(shí)間與待處理的數(shù)據(jù)規(guī)模成正比躏结。在大數(shù)據(jù)流式計(jì)算環(huán)境中劫狠,數(shù)據(jù)的產(chǎn)生完全由數(shù)據(jù)源決定,由于不同的數(shù)據(jù)源在不同時(shí)空范圍內(nèi)的狀態(tài)不統(tǒng)一且發(fā)生動(dòng)態(tài)變化潜的,導(dǎo)致數(shù)據(jù)流的速率呈現(xiàn)出了突發(fā)性的特征骚揍。前一時(shí)刻數(shù)據(jù)速率和后一時(shí)刻數(shù)據(jù)速率可能會(huì)有巨大的差異, 因此不同批次接收的數(shù)據(jù)總量存在差異啰挪,不同批次的執(zhí)行時(shí)間也就會(huì)存在差異信不。這種差異是由于計(jì)算資源與數(shù)據(jù)規(guī)模不匹配造成的。在Spark 2.0以前亡呵, Streaming的資源分配都采取資源預(yù)先分配的策略抽活,資源管理器依據(jù)應(yīng)用的申請(qǐng)量予以提前分配,在應(yīng)用執(zhí)行期間不能依據(jù)應(yīng)用實(shí)際的計(jì)算資源需求量進(jìn)行調(diào)整锰什,執(zhí)行作業(yè)時(shí)會(huì)存在資源過(guò)氏滤叮或資源不足的情況丁逝。
2、Spark Streaming 的計(jì)算特征
Spark Streaming應(yīng)用的執(zhí)行過(guò)程可以分成數(shù)據(jù)準(zhǔn)備和數(shù)據(jù)計(jì)算兩個(gè)階段卵牍,兩個(gè)階段分別由不同的作業(yè)進(jìn)行處理果港,分別是數(shù)據(jù)接收作業(yè)(提交Receiver時(shí)的Job)和數(shù)據(jù)處理作業(yè)(Batch Job)。在大數(shù)據(jù)流式計(jì)算中糊昙,數(shù)據(jù)是實(shí)時(shí)產(chǎn)生辛掠、動(dòng)態(tài)增加的,只要數(shù)據(jù)源處于活動(dòng)狀態(tài),數(shù)據(jù)就會(huì)一直產(chǎn)生释牺,因此要求數(shù)據(jù)接收作業(yè)不間斷的接收數(shù)據(jù)萝衩,并將接收的數(shù)據(jù)分割成批,供數(shù)據(jù)處理作業(yè)消費(fèi)没咙。數(shù)據(jù)處理作業(yè)以批處理方式運(yùn)行猩谊。一般情況下,批處理的執(zhí)行時(shí)間不大于其批處理間隔祭刚,一個(gè)批次可以分成作業(yè)實(shí)際處理階段和等待下一個(gè)批次階段牌捷。綜上所述,Spark Streaming的計(jì)算特征如下圖所示(實(shí)際處理時(shí)間<批處理間隔):
另外涡驮,特殊情況下暗甥,會(huì)存在實(shí)際處理階段所用的時(shí)間 > 批處理時(shí)間。這種情況下捉捅,一般是由于作業(yè)處理能力弱引起的撤防。
3、Streaming Executor DynamicAllocation機(jī)制的評(píng)價(jià)指標(biāo)
經(jīng)過(guò)上述分析棒口,作業(yè)的實(shí)際處理時(shí)間與設(shè)定的批處理間隔之間存在的關(guān)系如下:
- 作業(yè)的實(shí)際處理時(shí)間 遠(yuǎn)小于 批處理間隔
此時(shí)寄月,計(jì)算資源大部分時(shí)間處于空閑,造成資源浪費(fèi) - 作業(yè)的實(shí)際處理時(shí)間 約等于 批處理間隔
此時(shí)无牵,能滿足處理漾肮,但數(shù)據(jù)具有波動(dòng)性,可能下一個(gè)時(shí)間不能滿足茎毁,造成延遲克懊。計(jì)算資源以初顯不足。 - 介于以上兩者之間
系統(tǒng)資源能正常滿足計(jì)算充岛,又不至于造成過(guò)多浪費(fèi)保檐,此時(shí)系統(tǒng)穩(wěn)定性良好耕蝉。 - 作業(yè)的實(shí)際處理時(shí)間 大于 批處理間隔
批處理間隔內(nèi)不能完成批作業(yè)崔梗,說(shuō)明計(jì)算資源不足。
因此垒在,采用 有效處理時(shí)間占比【有效處理時(shí)間占比 = 實(shí)際處理時(shí)間 / 批處理間隔】來(lái)評(píng)價(jià)當(dāng)前計(jì)算資源的過(guò)仕馄牵或不足扔亥。
4 、Streaming Executor DynamicAllocation機(jī)制的工作流程
- 通過(guò)監(jiān)控組件谈为,獲取已完成批作業(yè)的實(shí)際執(zhí)行時(shí)間
- 計(jì)算有效處理時(shí)間占比旅挤,然后與設(shè)置的閾值進(jìn)行比較,如果小于下限down,則結(jié)束一個(gè)Executor; 如果大于上限up伞鲫, 則增加Executor(個(gè)數(shù)由比率決定).
該功能啟用與否由參數(shù)spark.streaming.dynamicAllocation.enabled參數(shù)決定粘茄,默認(rèn)為false不開(kāi)啟。
5秕脓、源碼分析
與Spark Streaming Backpressure機(jī)制相同柒瓣,Spark Streaming Executor動(dòng)態(tài)伸縮機(jī)制也是以事件驅(qū)動(dòng)的形式工作的。其負(fù)責(zé)動(dòng)態(tài)伸縮的類(lèi)為ExecutorAllocationManager吠架,其繼承自StreamingListener芙贫。 類(lèi)定義結(jié)構(gòu)如下:
private[streaming] class ExecutorAllocationManager(
client: ExecutorAllocationClient,
receiverTracker: ReceiverTracker,
conf: SparkConf,
batchDurationMs: Long,
clock: Clock) extends StreamingListener with Logging {
......
......
}
ExecutorAllocationManager 負(fù)責(zé)管理分配給StreamingContext(Streaming應(yīng)用)的Executor.其通過(guò)分析Streaming作業(yè)的監(jiān)控信息,動(dòng)態(tài)的伸請(qǐng)或釋放executor.
5.1 ExecutorAllocationManager 注冊(cè)與啟動(dòng)
JobScheduler啟動(dòng)時(shí)會(huì)創(chuàng)建ExecutorAllocationManager 并向ListenerBus注冊(cè)并開(kāi)啟監(jiān)聽(tīng)傍药。
def start(): Unit = synchronized {
......
val executorAllocClient: ExecutorAllocationClient = ssc.sparkContext.schedulerBackend match {
case b: ExecutorAllocationClient => b.asInstanceOf[ExecutorAllocationClient]
case _ => null
}
......
executorAllocationManager = ExecutorAllocationManager.createIfEnabled(
executorAllocClient,
executorAllocationManager.foreach(ssc.addStreamingListener)
......
executorAllocationManager.foreach(_.start())
logInfo("Started JobScheduler")
}
其中executorAllocationManager的start方法定義如下:
def start(): Unit = {
timer.start()
logInfo(s"ExecutorAllocationManager started with " +
s"ratios = [$scalingUpRatio, $scalingDownRatio] and interval = $scalingIntervalSecs sec")
}
private val timer = new RecurringTimer(clock, scalingIntervalSecs * 1000,
_ => manageAllocation(), "streaming-executor-allocation-manager")
其將開(kāi)啟定時(shí)器磺平,周期性的執(zhí)行manageAllocation方法,其時(shí)間周期由參數(shù) “spark.streaming.dynamicAllocation.scalingInterval”決定拐辽,默認(rèn)為60s.
即:默認(rèn)情況下每隔60s 執(zhí)行一次manageAllocation. managerAllocation會(huì)使用歷史作業(yè)執(zhí)行信息計(jì)算出有效處理時(shí)間占比ratio, 并依據(jù)占比與預(yù)設(shè)閾值的關(guān)系決定增減資源拣挪。預(yù)設(shè)的閾值信息及增減策略為:
- scalingUpRatio
閾值上限,由參數(shù)“park.streaming.dynamicAllocation.scalingUpRatio”控制薛训,默認(rèn)值為0.9媒吗。當(dāng)計(jì)算出的ratio大于scalingUpRatio 時(shí),將按如下算式計(jì)算出的值乙埃,增加若干Executor闸英。
math.max(math.round(ratio).toInt, 1)
- scalingDownRatio
閾值下限, 由參數(shù)“spark.streaming.dynamicAllocation.scalingDownRatio”介袜,默認(rèn)值為0.3甫何。 當(dāng)計(jì)算出的ratio小于scalingDownRatio時(shí),將減少一個(gè)Executor.
manageAllocation的實(shí)現(xiàn)如下:
/**
* Manage executor allocation by requesting or killing executors based on the collected
* batch statistics.
*/
private def manageAllocation(): Unit = synchronized {
logInfo(s"Managing executor allocation with ratios = [$scalingUpRatio, $scalingDownRatio]")
if (batchProcTimeCount > 0) {
val averageBatchProcTime = batchProcTimeSum / batchProcTimeCount
val ratio = averageBatchProcTime.toDouble / batchDurationMs
logInfo(s"Average: $averageBatchProcTime, ratio = $ratio" )
if (ratio >= scalingUpRatio) {
logDebug("Requesting executors")
val numNewExecutors = math.max(math.round(ratio).toInt, 1)
requestExecutors(numNewExecutors)
} else if (ratio <= scalingDownRatio) {
logDebug("Killing executors")
killExecutor()
}
}
batchProcTimeSum = 0
batchProcTimeCount = 0
}
經(jīng)過(guò)分析代碼可知遇伞,初始情況下辙喂,因if條件batchProcTimeCount > 0不滿足,上述機(jī)制并不會(huì)執(zhí)行鸠珠。只有onBatchCompleted事件觸發(fā)之后巍耗,即有作業(yè)執(zhí)行完成,可以做為判斷依據(jù)之后渐排,manageAllocation()才會(huì)正式生效炬太。
5.2 事件觸發(fā)
Executor dynamicAllocation機(jī)制是依據(jù)完成的批Job的執(zhí)行信息進(jìn)行決策,其在批Job執(zhí)行完成時(shí)會(huì)收集作業(yè)驯耻。
其事件觸發(fā)過(guò)程亲族,同“SparkStreaming Backpressure分析”中"3.3.1 BatchCompleted觸發(fā)過(guò)程"一節(jié)過(guò)程相同炒考,不再贅述。
5.3 事件處理
onBatchCompleted的事件處理定義如下:
override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
logDebug("onBatchCompleted called: " + batchCompleted)
if (!batchCompleted.batchInfo.outputOperationInfos.values.exists(_.failureReason.nonEmpty)) {
batchCompleted.batchInfo.processingDelay.foreach(addBatchProcTime)
}
}
當(dāng)onBatchCompleted事件處發(fā)時(shí)霎迫,將執(zhí)行addBatchProcTime, 為batchProcTimeSum及batchProcTimeCount 增值斋枢。
private def addBatchProcTime(timeMs: Long): Unit = synchronized {
batchProcTimeSum += timeMs
batchProcTimeCount += 1
logDebug(
s"Added batch processing time $timeMs, sum = $batchProcTimeSum, count = $batchProcTimeCount")
}
5.4 生效時(shí)機(jī)
當(dāng)Timer 再次觸發(fā)時(shí),manageAllocation 的if條件將滿足知给,其會(huì)依據(jù)決策信息對(duì)Executor進(jìn)行增減操作瓤帚。 同時(shí)重置batchProcTimeSum及batchProcTimeCount的值,開(kāi)始下一輪次的準(zhǔn)備工作涩赢。
6缘滥、與Spark Core中Executor DynamicAllocation 機(jī)制的區(qū)別
Spark Core 中的DynamicAllocation機(jī)制同Spark Streaming中一樣,也是周期性調(diào)度評(píng)估機(jī)制確定是否要進(jìn)行Executor增減谒主。不同的是朝扼,Spark Core中應(yīng)用是批處理應(yīng)用,執(zhí)行完成之后霎肯,即可以結(jié)束擎颖,因此其使用idle策略來(lái)評(píng)估Executor的增減,具體策略為:
- 減少Executor
應(yīng)用擁有的Executor數(shù)量在處理當(dāng)前負(fù)載時(shí)綽綽有余观游,通過(guò)縮減Executor仍然能夠一次性執(zhí)行所有任務(wù)(running + pending task)時(shí)搂捧,會(huì)通過(guò)一定的策略結(jié)束掉部分Executor,達(dá)到節(jié)省資源的目的懂缕。Spark Core中的策略為如果監(jiān)測(cè)到一個(gè)Executor空閑了K 秒允跑,這意味著其在未來(lái)不再執(zhí)行任務(wù),則可以將其移除搪柑。其中參數(shù)K由“spark.dynamicAllocation.executorIdleTimeout ”配制聋丝,默認(rèn)值為60s. - 增加Executor
應(yīng)用擁有的Executor數(shù)量不足以及時(shí)處理當(dāng)前負(fù)載,存在任務(wù)長(zhǎng)時(shí)間堆集工碾,則要增加Executor弱睦。Spark Core中的策略為如果監(jiān)測(cè)到任務(wù)調(diào)度隊(duì)列中的任務(wù)N秒內(nèi)沒(méi)有被調(diào)度,則會(huì)增加新的Executor渊额,如果再過(guò)M秒還未進(jìn)行調(diào)度况木,則以指數(shù)方式繼續(xù)增加,直到上限旬迹。
由于Spark Streaming應(yīng)用是長(zhǎng)期存在的火惊、微批處理應(yīng)用。其每隔一個(gè)小的時(shí)間間隔(batchInterval)就會(huì)提交微批處理應(yīng)用奔垦。其會(huì)使Spark Core中的Executor DynamicAllocation 機(jī)制的idle 策略受阻屹耐,因此Spark Streaming 采用有效處理時(shí)間占比radio來(lái)進(jìn)行決策。