StructedStreaming 流程分析
導言
Spark在2.*版本后加入StructedStreaming模塊鲫骗,與流處理引擎Sparkstreaming一樣,用于處理流數(shù)據(jù)于游。但二者又有許多不同之處杂拨。
Sparkstreaming首次引入在0.*版本葬馋,其核心思想是利用spark批處理框架砍艾,以microbatch(以一段時間的流作為一個batch)的方式蒂教,完成對流數(shù)據(jù)的處理。
StructedStreaming誕生于2.*版本脆荷,主要用于處理結(jié)構化流數(shù)據(jù)凝垛,與Sparkstreaming不同的是StructedStrreaming不再是microbatch的處理方式,而是可以"不停的"循環(huán)從數(shù)據(jù)源獲取數(shù)據(jù)蜓谋。從而實現(xiàn)真正的流處理梦皮。以dataset為代表的帶有結(jié)構化(schema信息)的數(shù)據(jù)處理由于鎢絲計劃的完成,表現(xiàn)出更優(yōu)越的性能桃焕。同時Structedstreaming可以從數(shù)據(jù)中獲取時間(eventTime)剑肯,從而可以針對流數(shù)據(jù)的生產(chǎn)時間而非收到數(shù)據(jù)的時間進行處理。
StructedStreaming的相關介紹可參考(http://spark.apache.org/docs/2.3.0/structured-streaming-programming-guide.html)观堂。本文對StructedStreaming的流程/機制進行分析
開發(fā)structedStreaming應用
StructedStreaming應用開發(fā)流程
從官網(wǎng)/源碼中可以看到structedstreaming應用的開發(fā)
除了spark的初始化工作让网,通常有三步與業(yè)務相關的操作:
-
獲取輸入數(shù)據(jù)源(可以理解為source)
val lines = spark .readStream .format("kafka") .option("kafka.bootstrap.servers", bootstrapServers) .option(subscribeType, topics) .load() .selectExpr("CAST(value AS STRING)") .as[String]
根據(jù)業(yè)務邏輯對數(shù)據(jù)進行轉(zhuǎn)換處理 (業(yè)務處理)
wordCounts = lines.flatMap(_.split(" ")).groupBy("value").count()
- 將處理結(jié)果寫入第三方數(shù)據(jù)源,整個流應用通過query.start啟動(可以理解為sink)
query = wordCounts.writeStream
.outputMode("complete")
.format("console")
.option("checkpointLocation", checkpointLocation)
.start()
query.awaitTermination()
流數(shù)據(jù)的讀取
通過DataStreamReader類完成應用層與不同的流source源的reader隔離。load方法會為應用獲取數(shù)據(jù)的邏輯
在處理數(shù)據(jù)源時框架使用serviceload機制型将,將所有集成DataSourceRegister的類加載如內(nèi)存寂祥,判斷對應source的shortName是否與設置的一致,如果一致七兜,則實例化此類。并根據(jù)此類屬性生成對應的dataframe福扬。
當前支持的source源有如下:
Source名 | Source源 |
---|---|
MemorySource | 測試用 |
TextSocketSource | 用于展示使用 |
FileStreamSource | 從固定目下下讀文件 |
KafkaSource | kafka作為數(shù)據(jù)源 |
RateStreamSource | 固定速率的消息生成器腕铸,自增長的long型和時間戳 |
流數(shù)據(jù)的寫出
數(shù)據(jù)的寫出需要選擇寫出模式以及寫出的sink源
寫出模式:append,update,complete。 Structed streaming對寫出模式的支持與數(shù)據(jù)處理時使用到的算子有關铛碑。需要根據(jù)需求狠裹,處理邏輯選合適的寫出模式∑常可參考(http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-modes). Structed streaming對一些輸出模式和算子的支持情況的校驗可參考org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker
sink源的寫出:
在處理sink源時框架依然使用serviceload機制涛菠,將所有集成DataSourceRegister的類加載如內(nèi)存,判斷對應source的shortName是否與設置的一致,如果一致俗冻,則實例化此類
當前實現(xiàn)的sink
Sink名 | sink目的地 |
---|---|
memorysink | 測試用 |
foreachSink | 需要實現(xiàn)foreachwriter礁叔,用于定制化sink |
kafkaSink | 寫出數(shù)據(jù)到kafka |
fileformatSink | 寫出數(shù)據(jù)到hdfs。支持ORC溜徙,parquet等 |
StructedStreaming深入理解
對于structed streaming有如上理解即可開發(fā)相關應用诗眨。但structedstreaming的實現(xiàn)機制依然值得深究至朗,尤其是structedstreaming是job是如何觸發(fā)機制,watermark是如何實現(xiàn)的涣易,狀態(tài)數(shù)據(jù)是如何保存并用戶應用恢復的。如下對這三個“問題”進行分析
About Trigger
與sparkstreaming基于定時器產(chǎn)生job然后調(diào)度的機制不同冶伞,structedstreaming實現(xiàn)了一套新的job觸發(fā)機制(trigger)新症。類似于flink這就是trigger機制。
trigger的設置
通過DataStreamWriter.trigger()完成對trigger設置响禽。默認的trigger為ProcessingTime(interval)账劲,interval默認為0
trigger的分類
trigger有三種,OneTimeTrigger只會觸發(fā)一次計算金抡。在流應用中一般使用ProcessingTime和ContinuousTrigger兩種瀑焦,下面對著兩種trigger進行對比
Trigger類 | ProcessingTime | Continuous |
---|---|---|
對應execution | MicroBatchExecution | ContinuousExecution |
工作模式 | 以一定間隔(interval)調(diào)度計算邏輯,間隔為0時梗肝,上批次調(diào)用完成后榛瓮,立即進入下一批次調(diào)用一直調(diào)用,退化為類似sparkstreaming的micro batch的流處理 | 以一定間隔(interval)查看流計算狀態(tài) |
支持API | 支持API豐富巫击,如匯聚禀晓,關聯(lián)等操作 | 僅簡單的projection類(map,select等) |
備注 | total-cores個數(shù)大于partition數(shù)坝锰,task長時運行 |
ProcessingTime
在使用ProcessingTime Trigger時粹懒,對應的執(zhí)行引擎為MicrobatchExecution。
Trigger調(diào)度機制如下:
override def execute(triggerHandler: () => Boolean): Unit = {
while (true) {
val triggerTimeMs = clock.getTimeMillis
val nextTriggerTimeMs = nextBatchTime(triggerTimeMs)
val terminated = !triggerHandler()
if (intervalMs > 0) {
val batchElapsedTimeMs = clock.getTimeMillis - triggerTimeMs
if (batchElapsedTimeMs > intervalMs) {
notifyBatchFallingBehind(batchElapsedTimeMs)
}
if (terminated) {
return
}
clock.waitTillTime(nextTriggerTimeMs)
} else {
if (terminated) {
return
}
}
}
}
ProcessingTime Trigger循環(huán)調(diào)度每執(zhí)行邏輯:
triggerExecutor.execute(() => {
startTrigger()
if (isActive) {
reportTimeTaken("triggerExecution") {
if (currentBatchId < 0) {
// We'll do this initialization only once
populateStartOffsets(sparkSessionForStream)
...
} else {
constructNextBatch()
}
if (dataAvailable) {
currentStatus = currentStatus.copy(isDataAvailable = true)
updateStatusMessage("Processing new data")
runBatch(sparkSessionForStream)
}
}
// Report trigger as finished and construct progress object.
finishTrigger(dataAvailable)
if (dataAvailable) {
// Update committed offsets.
commitLog.add(currentBatchId)
committedOffsets ++= availableOffsets
currentBatchId += 1
sparkSession.sparkContext.setJobDescription(getBatchDescriptionString)
} else {
currentStatus = currentStatus.copy(isDataAvailable = false)
updateStatusMessage("Waiting for data to arrive")
Thread.sleep(pollingDelayMs)
}
}
updateStatusMessage("Waiting for next trigger")
isActive
})
ContinuousTrigger
在使用ContinuousTrigger時顷级,對應的執(zhí)行邏輯為continuousExecution凫乖。在調(diào)度時,Trigger退化為ProcessingTime Trigger弓颈。僅僅對執(zhí)行狀態(tài)查詢帽芽,記錄
Continuous執(zhí)行邏輯
triggerExecutor.execute(() => {
startTrigger()
if (reader.needsReconfiguration() && state.compareAndSet(ACTIVE, RECONFIGURING)) {
stopSources()
if (queryExecutionThread.isAlive) {
sparkSession.sparkContext.cancelJobGroup(runId.toString)
queryExecutionThread.interrupt()
}
false
} else if (isActive) {
currentBatchId = epochEndpoint.askSync[Long](IncrementAndGetEpoch)
logInfo(s"New epoch $currentBatchId is starting.")
true
} else {
false
}
})
在ContinuousDataSourceRDD的compute方法中可以看出,其計算邏輯如下:
* 通過一個名為**continuous-reader--${context.partitionId()}--" +
s"${context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY)}** 的線程實時獲取數(shù)據(jù)翔冀,放入名為queue的隊列中导街。
* worker線程則長時間運行,在計算時則是從queue中實時獲取消息處理纤子。
About waternark
StructedStreaming的與sparkstreaming相比一大特性就是支持基于數(shù)據(jù)中的時間戳的數(shù)據(jù)處理搬瑰。也就是在處理數(shù)據(jù)時款票,可以對記錄中的字段的時間進行考慮。eventTime更好的代表數(shù)據(jù)本身的信息泽论。
可以獲取消息本身的時間戳之后艾少,就可以根據(jù)該時間戳來判斷消息的到達是否延遲(亂序)以及延遲的時間是否在容忍的范圍內(nèi)。該判斷方法是根據(jù)watermark機制來設置和判斷消息的有效性(延遲是否在可容忍范圍內(nèi))
watermark的設置
通過dataset.withWatermark()完成對watermark的設置
watermark的生成/更新
在driver內(nèi)注冊一個累加器eventTimeStats佩厚;
-
在一個批次計算內(nèi)姆钉,executor的各task根據(jù)各自分區(qū)內(nèi)的消息的時間戳,來更新累加器
executor中各task獲取分區(qū)的eventtime信息方式如下: 在EventTimeWatermarkExec中的doExecute方法中 iter.map { row => eventTimeStats.add(getEventTime(row).getLong(0) / 1000) row } def add(eventTime: Long): Unit = { this.max = math.max(this.max, eventTime) this.min = math.min(this.min, eventTime) this.count += 1 this.avg += (eventTime - avg) / count }
-
在driver端生成batch時抄瓦,獲取各個操作/plan的watermark潮瓶,找出操作的最小的watermark時間點,寫入offsetSeqMetadata,同時寫入offsetlog
// 計算各plan的watermark lastExecution.executedPlan.collect { case e: EventTimeWatermarkExec => e }.zipWithIndex.foreach { case (e, index) if e.eventTimeStats.value.count > 0 => logDebug(s"Observed event time stats $index: ${e.eventTimeStats.value}") val newWatermarkMs = e.eventTimeStats.value.max - e.delayMs val prevWatermarkMs = watermarkMsMap.get(index) if (prevWatermarkMs.isEmpty || newWatermarkMs > prevWatermarkMs.get) { watermarkMsMap.put(index, newWatermarkMs) } //找出watermark中最小值 if(!watermarkMsMap.isEmpty) { val newWatermarkMs = watermarkMsMap.minBy(_._2)._2 if (newWatermarkMs > batchWatermarkMs) { logInfo(s"Updating eventTime watermark to: $newWatermarkMs ms") batchWatermarkMs = newWatermarkMs } //寫入offsetSeqMetadata offsetSeqMetadata = offsetSeqMetadata.copy( batchWatermarkMs = batchWatermarkMs, batchTimestampMs = triggerClock.getTimeMillis()) //寫入offsetlog offsetLog.add( currentBatchId, availableOffsets.toOffsetSeq(sources, offsetSeqMetadata)
-
根據(jù)watermark在讀消息時過濾數(shù)據(jù)
StreamingSymmetricHashJoinExec -> doExecutor -> processPartitions -> StoreAndJoinWithOtherSide中有如下操作: val nonLateRows = WatermarkSupport.watermarkExpression(watermarkAttribute, eventTimeWatermark) match { case Some(watermarkExpr) => val predicate = newPredicate(watermarkExpr, inputAttributes) inputIter.filter { row => !predicate.eval(row) } case None => inputIter }
About state:
流應用中钙姊,如果有狀態(tài)相關的如匯聚毯辅,關聯(lián)等操作,需要再應用中將部分數(shù)據(jù)進行緩存煞额,structedstreaming中通過statestore來對數(shù)據(jù)緩存以備后續(xù)計算及異乘伎郑恢復使用
當前的statestore的實現(xiàn)僅HDFSBackedStateStore,由HDFSBackedStateStoreProvider生成和管理; 每個HDFSBackedStateStoreProvider對應一個目錄。該目錄為{storeName}.
其中checkpointLocation是query中設置的路徑膊毁,storeName是store分類胀莹,在關聯(lián)中有如如下storeType(如left-keyToNumValues)
每個statestore對應一個versionId.delta文件 {storeName}/versionId.delta。
狀態(tài)數(shù)據(jù)的寫入:
在在一些有狀態(tài)的操作如關聯(lián)匯聚等婚温,部分數(shù)據(jù)需要保存以備后續(xù)計算使用描焰,
store的put操作:
只有需要存儲部分狀態(tài)的操作/算子需要對狀態(tài)數(shù)據(jù)進行緩存。從源碼中查看栅螟,有如下算子:
StateStoreSaveExec
FlatMapGroupsWithStateExec
SymmetricHashJoinStateManager
以流關聯(lián)操作為例荆秦,介紹SymmetricHashJoinStateManager中的state寫流程如下:
1) 將數(shù)據(jù)寫入state文件:在StreamingSymmetricHashJoinExec的doExecute方法中,調(diào)用到processPartitions力图,會調(diào)用到OneSideHashJoiner的storeAndJoinWithOtherSide方法步绸,會根據(jù)條件判斷該記錄是否寫入臨時文件的輸出流中。判斷條件condition ( !stateKeyWatermarkPredicateFunc(key) && !stateValueWatermarkPredicateFunc(thisRow))
2) 在計算節(jié)結(jié)束后吃媒,將statestore數(shù)據(jù)寫入磁盤
StreamingSymmetricHashJoinExec -> onOutputCompletion -> leftSideJoiner.commitStateAndGetMetrics -> joinStateManager.commit -> keyToNumValues.commit -> StateStoreHandler.commit -> HDFSBackedStateStore.commit
狀態(tài)數(shù)據(jù)的讀热拷椤:
在一些有狀態(tài)的操作如關聯(lián)匯聚等,需要對“歷史/之前批次”數(shù)據(jù)進行“緩存”晓折,以備下次計算時惑朦,讀取使用。
有兩處讀取store的邏輯
1) statestoreRdd的compute方法
2)StreamingSymmetricHashJoinExec -> doExecutor -> processPartitions -> OneSideHashJoiner.init -> SymmetricHashJoinStateManager.init -> KeyToNumValuesStore.init -> getStateStore -> stateStore.get ->storeProvider.getStore
狀態(tài)數(shù)據(jù)的管理/maintain
在executor內(nèi)部漓概,對于每一個分片啟動一個線程定期“compact”中間數(shù)據(jù),周期由spark.sql.streaming.stateStore.maintenanceInterval參數(shù)控制病梢,默認為60s胃珍,線程名 : state-store-maintenance-task 主要工作是掃描delta文件梁肿,生成snapshot文件,清理陳舊文件觅彰。
生成snapshot文件具體邏輯:
1) 掃描目錄下的文件吩蔑,找出delta文件當前最大的版本號Max(d)(delta文件的命名方式Int.delta,版本號為Int值填抬,如10.delta烛芬,則版本號為10)
2) 找出當前最大的snapshot文件的版本號Max(s)(delta文件的命名方式Int.snapshot,版本號為Int值飒责,如10.snapshot赘娄,則版本號為10)
3) 當Max(d) - Max(s) 大于spark.sql.streaming.stateStore.minDeltasForSnapshot(默認為10)時,進行打快照操作宏蛉。否則遣臼,跳過。
陳舊文件清理:
1) 找出當前文件的最大版本號Max(v)
2) MaxversionToRetain = Max(v) - spark.sql.streaming.minBatchesToRetain(默認100)時拾并,當MaxversionToRetain > 0 時清理所有版本號小于MaxversionToRetain的文件揍堰。否則,跳過