Structured Streaming 分析

StructedStreaming 流程分析

導言

Spark在2.*版本后加入StructedStreaming模塊鲫骗,與流處理引擎Sparkstreaming一樣,用于處理流數(shù)據(jù)于游。但二者又有許多不同之處杂拨。

Sparkstreaming首次引入在0.*版本葬馋,其核心思想是利用spark批處理框架砍艾,以microbatch(以一段時間的流作為一個batch)的方式蒂教,完成對流數(shù)據(jù)的處理。

StructedStreaming誕生于2.*版本脆荷,主要用于處理結(jié)構化流數(shù)據(jù)凝垛,與Sparkstreaming不同的是StructedStrreaming不再是microbatch的處理方式,而是可以"不停的"循環(huán)從數(shù)據(jù)源獲取數(shù)據(jù)蜓谋。從而實現(xiàn)真正的流處理梦皮。以dataset為代表的帶有結(jié)構化(schema信息)的數(shù)據(jù)處理由于鎢絲計劃的完成,表現(xiàn)出更優(yōu)越的性能桃焕。同時Structedstreaming可以從數(shù)據(jù)中獲取時間(eventTime)剑肯,從而可以針對流數(shù)據(jù)的生產(chǎn)時間而非收到數(shù)據(jù)的時間進行處理。

StructedStreaming的相關介紹可參考(http://spark.apache.org/docs/2.3.0/structured-streaming-programming-guide.html)观堂。本文對StructedStreaming的流程/機制進行分析

開發(fā)structedStreaming應用

StructedStreaming應用開發(fā)流程

從官網(wǎng)/源碼中可以看到structedstreaming應用的開發(fā)
除了spark的初始化工作让网,通常有三步與業(yè)務相關的操作:

  1. 獲取輸入數(shù)據(jù)源(可以理解為source)

     val lines = spark
     .readStream
     .format("kafka")
     .option("kafka.bootstrap.servers", bootstrapServers)
     .option(subscribeType, topics)
     .load()
     .selectExpr("CAST(value AS STRING)")
     .as[String]
    
  2. 根據(jù)業(yè)務邏輯對數(shù)據(jù)進行轉(zhuǎn)換處理 (業(yè)務處理)

 wordCounts = lines.flatMap(_.split(" ")).groupBy("value").count()
  1. 將處理結(jié)果寫入第三方數(shù)據(jù)源,整個流應用通過query.start啟動(可以理解為sink)
query = wordCounts.writeStream
      .outputMode("complete")
      .format("console")
      .option("checkpointLocation", checkpointLocation)
      .start()
      query.awaitTermination()

流數(shù)據(jù)的讀取

通過DataStreamReader類完成應用層與不同的流source源的reader隔離。load方法會為應用獲取數(shù)據(jù)的邏輯

在處理數(shù)據(jù)源時框架使用serviceload機制型将,將所有集成DataSourceRegister的類加載如內(nèi)存寂祥,判斷對應source的shortName是否與設置的一致,如果一致七兜,則實例化此類。并根據(jù)此類屬性生成對應的dataframe福扬。

當前支持的source源有如下:

Source名 Source源
MemorySource 測試用
TextSocketSource 用于展示使用
FileStreamSource 從固定目下下讀文件
KafkaSource kafka作為數(shù)據(jù)源
RateStreamSource 固定速率的消息生成器腕铸,自增長的long型和時間戳

流數(shù)據(jù)的寫出

數(shù)據(jù)的寫出需要選擇寫出模式以及寫出的sink源

寫出模式:append,update,complete。 Structed streaming對寫出模式的支持與數(shù)據(jù)處理時使用到的算子有關铛碑。需要根據(jù)需求狠裹,處理邏輯選合適的寫出模式∑常可參考(http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-modes). Structed streaming對一些輸出模式和算子的支持情況的校驗可參考org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker

sink源的寫出:

在處理sink源時框架依然使用serviceload機制涛菠,將所有集成DataSourceRegister的類加載如內(nèi)存,判斷對應source的shortName是否與設置的一致,如果一致俗冻,則實例化此類

當前實現(xiàn)的sink

Sink名 sink目的地
memorysink 測試用
foreachSink 需要實現(xiàn)foreachwriter礁叔,用于定制化sink
kafkaSink 寫出數(shù)據(jù)到kafka
fileformatSink 寫出數(shù)據(jù)到hdfs。支持ORC溜徙,parquet等

StructedStreaming深入理解

對于structed streaming有如上理解即可開發(fā)相關應用诗眨。但structedstreaming的實現(xiàn)機制依然值得深究至朗,尤其是structedstreaming是job是如何觸發(fā)機制,watermark是如何實現(xiàn)的涣易,狀態(tài)數(shù)據(jù)是如何保存并用戶應用恢復的。如下對這三個“問題”進行分析

About Trigger

與sparkstreaming基于定時器產(chǎn)生job然后調(diào)度的機制不同冶伞,structedstreaming實現(xiàn)了一套新的job觸發(fā)機制(trigger)新症。類似于flink這就是trigger機制。

trigger的設置

通過DataStreamWriter.trigger()完成對trigger設置响禽。默認的trigger為ProcessingTime(interval)账劲,interval默認為0

trigger的分類

trigger有三種,OneTimeTrigger只會觸發(fā)一次計算金抡。在流應用中一般使用ProcessingTime和ContinuousTrigger兩種瀑焦,下面對著兩種trigger進行對比

Trigger類 ProcessingTime Continuous
對應execution MicroBatchExecution ContinuousExecution
工作模式 以一定間隔(interval)調(diào)度計算邏輯,間隔為0時梗肝,上批次調(diào)用完成后榛瓮,立即進入下一批次調(diào)用一直調(diào)用,退化為類似sparkstreaming的micro batch的流處理 以一定間隔(interval)查看流計算狀態(tài)
支持API 支持API豐富巫击,如匯聚禀晓,關聯(lián)等操作 僅簡單的projection類(map,select等)
備注 total-cores個數(shù)大于partition數(shù)坝锰,task長時運行

ProcessingTime

在使用ProcessingTime Trigger時粹懒,對應的執(zhí)行引擎為MicrobatchExecution。

Trigger調(diào)度機制如下:

override def execute(triggerHandler: () => Boolean): Unit = {
 while (true) {
  val triggerTimeMs = clock.getTimeMillis
  val nextTriggerTimeMs = nextBatchTime(triggerTimeMs)
  val terminated = !triggerHandler()
  if (intervalMs > 0) {
    val batchElapsedTimeMs = clock.getTimeMillis - triggerTimeMs
    if (batchElapsedTimeMs > intervalMs) {
      notifyBatchFallingBehind(batchElapsedTimeMs)
    }
    if (terminated) {
      return
    }
    clock.waitTillTime(nextTriggerTimeMs)
  } else {
    if (terminated) {
      return
    }
  }
}
}

ProcessingTime Trigger循環(huán)調(diào)度每執(zhí)行邏輯:

triggerExecutor.execute(() => {
  startTrigger()
  if (isActive) {
    reportTimeTaken("triggerExecution") {
      if (currentBatchId < 0) {
        // We'll do this initialization only once
        populateStartOffsets(sparkSessionForStream)
        ...
      } else {
        constructNextBatch()
      }
      if (dataAvailable) {
        currentStatus = currentStatus.copy(isDataAvailable = true)
        updateStatusMessage("Processing new data")
        runBatch(sparkSessionForStream)
      }
    }
    // Report trigger as finished and construct progress object.
    finishTrigger(dataAvailable)
    if (dataAvailable) {
      // Update committed offsets.
      commitLog.add(currentBatchId)
      committedOffsets ++= availableOffsets
      currentBatchId += 1
      sparkSession.sparkContext.setJobDescription(getBatchDescriptionString)
    } else {
      currentStatus = currentStatus.copy(isDataAvailable = false)
      updateStatusMessage("Waiting for data to arrive")
      Thread.sleep(pollingDelayMs)
    }
  }
  updateStatusMessage("Waiting for next trigger")
  isActive
})

ContinuousTrigger

在使用ContinuousTrigger時顷级,對應的執(zhí)行邏輯為continuousExecution凫乖。在調(diào)度時,Trigger退化為ProcessingTime Trigger弓颈。僅僅對執(zhí)行狀態(tài)查詢帽芽,記錄

Continuous執(zhí)行邏輯

    triggerExecutor.execute(() => {
        startTrigger()

        if (reader.needsReconfiguration() && state.compareAndSet(ACTIVE, RECONFIGURING)) {
          stopSources()
          if (queryExecutionThread.isAlive) {
            sparkSession.sparkContext.cancelJobGroup(runId.toString)
            queryExecutionThread.interrupt()
          }
          false
        } else if (isActive) {
          currentBatchId = epochEndpoint.askSync[Long](IncrementAndGetEpoch)
          logInfo(s"New epoch $currentBatchId is starting.")
          true
        } else {
          false
        }
      })

在ContinuousDataSourceRDD的compute方法中可以看出,其計算邏輯如下:

* 通過一個名為**continuous-reader--${context.partitionId()}--" +
    s"${context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY)}** 的線程實時獲取數(shù)據(jù)翔冀,放入名為queue的隊列中导街。
* worker線程則長時間運行,在計算時則是從queue中實時獲取消息處理纤子。

About waternark

StructedStreaming的與sparkstreaming相比一大特性就是支持基于數(shù)據(jù)中的時間戳的數(shù)據(jù)處理搬瑰。也就是在處理數(shù)據(jù)時款票,可以對記錄中的字段的時間進行考慮。eventTime更好的代表數(shù)據(jù)本身的信息泽论。
可以獲取消息本身的時間戳之后艾少,就可以根據(jù)該時間戳來判斷消息的到達是否延遲(亂序)以及延遲的時間是否在容忍的范圍內(nèi)。該判斷方法是根據(jù)watermark機制來設置和判斷消息的有效性(延遲是否在可容忍范圍內(nèi))

屏幕快照 2018-06-12 下午10.15.59.png

watermark的設置

通過dataset.withWatermark()完成對watermark的設置

watermark的生成/更新

  1. 在driver內(nèi)注冊一個累加器eventTimeStats佩厚;

  2. 在一個批次計算內(nèi)姆钉,executor的各task根據(jù)各自分區(qū)內(nèi)的消息的時間戳,來更新累加器

     executor中各task獲取分區(qū)的eventtime信息方式如下:
     在EventTimeWatermarkExec中的doExecute方法中
     iter.map { row =>
         eventTimeStats.add(getEventTime(row).getLong(0) / 1000)
         row
       }
      def add(eventTime: Long): Unit = {
         this.max = math.max(this.max, eventTime)
         this.min = math.min(this.min, eventTime)
         this.count += 1
         this.avg += (eventTime - avg) / count
     }
    
  3. 在driver端生成batch時抄瓦,獲取各個操作/plan的watermark潮瓶,找出操作的最小的watermark時間點,寫入offsetSeqMetadata,同時寫入offsetlog

     // 計算各plan的watermark
     lastExecution.executedPlan.collect {
               case e: EventTimeWatermarkExec => e
             }.zipWithIndex.foreach {
               case (e, index) if e.eventTimeStats.value.count > 0 =>
                 logDebug(s"Observed event time stats $index: ${e.eventTimeStats.value}")
                 val newWatermarkMs = e.eventTimeStats.value.max - e.delayMs
                 val prevWatermarkMs = watermarkMsMap.get(index)
                 if (prevWatermarkMs.isEmpty || newWatermarkMs > prevWatermarkMs.get) {
                   watermarkMsMap.put(index, newWatermarkMs)
                 }
           //找出watermark中最小值      
         if(!watermarkMsMap.isEmpty) {
           val newWatermarkMs = watermarkMsMap.minBy(_._2)._2
           if (newWatermarkMs > batchWatermarkMs) {
             logInfo(s"Updating eventTime watermark to: $newWatermarkMs ms")
             batchWatermarkMs = newWatermarkMs
           }
           //寫入offsetSeqMetadata
           offsetSeqMetadata = offsetSeqMetadata.copy(
         batchWatermarkMs = batchWatermarkMs,
         batchTimestampMs = triggerClock.getTimeMillis())
         //寫入offsetlog
         offsetLog.add(
       currentBatchId,
       availableOffsets.toOffsetSeq(sources, offsetSeqMetadata)
    
  4. 根據(jù)watermark在讀消息時過濾數(shù)據(jù)

     StreamingSymmetricHashJoinExec -> doExecutor -> processPartitions -> StoreAndJoinWithOtherSide中有如下操作:
     
     val nonLateRows =
     WatermarkSupport.watermarkExpression(watermarkAttribute, eventTimeWatermark) match {
       case Some(watermarkExpr) =>
         val predicate = newPredicate(watermarkExpr, inputAttributes)
         inputIter.filter { row => !predicate.eval(row) }
       case None =>
         inputIter
     }
    

About state:

流應用中钙姊,如果有狀態(tài)相關的如匯聚毯辅,關聯(lián)等操作,需要再應用中將部分數(shù)據(jù)進行緩存煞额,structedstreaming中通過statestore來對數(shù)據(jù)緩存以備后續(xù)計算及異乘伎郑恢復使用

當前的statestore的實現(xiàn)僅HDFSBackedStateStore,由HDFSBackedStateStoreProvider生成和管理; 每個HDFSBackedStateStoreProvider對應一個目錄。該目錄為{checkpointLocation}/state/operatorId/partitionId/{storeName}.
其中checkpointLocation是query中設置的路徑膊毁,storeName是store分類胀莹,在關聯(lián)中有如如下joinSide-storeType(如left-keyToNumValues)
每個statestore對應一個versionId.delta文件 {checkpointLocation}/state/operatorId/partitionId/{storeName}/versionId.delta。

狀態(tài)數(shù)據(jù)的寫入:

在在一些有狀態(tài)的操作如關聯(lián)匯聚等婚温,部分數(shù)據(jù)需要保存以備后續(xù)計算使用描焰,

store的put操作:
只有需要存儲部分狀態(tài)的操作/算子需要對狀態(tài)數(shù)據(jù)進行緩存。從源碼中查看栅螟,有如下算子:

StateStoreSaveExec
FlatMapGroupsWithStateExec
SymmetricHashJoinStateManager

以流關聯(lián)操作為例荆秦,介紹SymmetricHashJoinStateManager中的state寫流程如下:

1) 將數(shù)據(jù)寫入state文件:在StreamingSymmetricHashJoinExec的doExecute方法中,調(diào)用到processPartitions力图,會調(diào)用到OneSideHashJoiner的storeAndJoinWithOtherSide方法步绸,會根據(jù)條件判斷該記錄是否寫入臨時文件的輸出流中。判斷條件condition ( !stateKeyWatermarkPredicateFunc(key) && !stateValueWatermarkPredicateFunc(thisRow))
屏幕快照 2018-06-12 下午10.16.14.png
2) 在計算節(jié)結(jié)束后吃媒,將statestore數(shù)據(jù)寫入磁盤
    StreamingSymmetricHashJoinExec -> onOutputCompletion -> leftSideJoiner.commitStateAndGetMetrics -> joinStateManager.commit -> keyToNumValues.commit -> StateStoreHandler.commit -> HDFSBackedStateStore.commit

狀態(tài)數(shù)據(jù)的讀热拷椤:

在一些有狀態(tài)的操作如關聯(lián)匯聚等,需要對“歷史/之前批次”數(shù)據(jù)進行“緩存”晓折,以備下次計算時惑朦,讀取使用。
有兩處讀取store的邏輯

1) statestoreRdd的compute方法
2)StreamingSymmetricHashJoinExec -> doExecutor -> processPartitions -> OneSideHashJoiner.init -> SymmetricHashJoinStateManager.init -> KeyToNumValuesStore.init -> getStateStore -> stateStore.get ->storeProvider.getStore

狀態(tài)數(shù)據(jù)的管理/maintain
在executor內(nèi)部漓概,對于每一個分片啟動一個線程定期“compact”中間數(shù)據(jù),周期由spark.sql.streaming.stateStore.maintenanceInterval參數(shù)控制病梢,默認為60s胃珍,線程名 : state-store-maintenance-task 主要工作是掃描delta文件梁肿,生成snapshot文件,清理陳舊文件觅彰。

生成snapshot文件具體邏輯:
    1) 掃描目錄下的文件吩蔑,找出delta文件當前最大的版本號Max(d)(delta文件的命名方式Int.delta,版本號為Int值填抬,如10.delta烛芬,則版本號為10)
    2) 找出當前最大的snapshot文件的版本號Max(s)(delta文件的命名方式Int.snapshot,版本號為Int值飒责,如10.snapshot赘娄,則版本號為10)
    3) 當Max(d) - Max(s) 大于spark.sql.streaming.stateStore.minDeltasForSnapshot(默認為10)時,進行打快照操作宏蛉。否則遣臼,跳過。
陳舊文件清理:
    1) 找出當前文件的最大版本號Max(v)
    2) MaxversionToRetain =  Max(v) - spark.sql.streaming.minBatchesToRetain(默認100)時拾并,當MaxversionToRetain > 0 時清理所有版本號小于MaxversionToRetain的文件揍堰。否則,跳過
最后編輯于
?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末嗅义,一起剝皮案震驚了整個濱河市屏歹,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌之碗,老刑警劉巖蝙眶,帶你破解...
    沈念sama閱讀 211,376評論 6 491
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異继控,居然都是意外死亡械馆,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,126評論 2 385
  • 文/潘曉璐 我一進店門武通,熙熙樓的掌柜王于貴愁眉苦臉地迎上來霹崎,“玉大人,你說我怎么就攤上這事冶忱∥补剑” “怎么了?”我有些...
    開封第一講書人閱讀 156,966評論 0 347
  • 文/不壞的土叔 我叫張陵囚枪,是天一觀的道長派诬。 經(jīng)常有香客問我,道長链沼,這世上最難降的妖魔是什么默赂? 我笑而不...
    開封第一講書人閱讀 56,432評論 1 283
  • 正文 為了忘掉前任,我火速辦了婚禮括勺,結(jié)果婚禮上缆八,老公的妹妹穿的比我還像新娘曲掰。我一直安慰自己,他們只是感情好奈辰,可當我...
    茶點故事閱讀 65,519評論 6 385
  • 文/花漫 我一把揭開白布栏妖。 她就那樣靜靜地躺著,像睡著了一般奖恰。 火紅的嫁衣襯著肌膚如雪吊趾。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,792評論 1 290
  • 那天瑟啃,我揣著相機與錄音论泛,去河邊找鬼。 笑死翰守,一個胖子當著我的面吹牛孵奶,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播蜡峰,決...
    沈念sama閱讀 38,933評論 3 406
  • 文/蒼蘭香墨 我猛地睜開眼了袁,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了湿颅?” 一聲冷哼從身側(cè)響起载绿,我...
    開封第一講書人閱讀 37,701評論 0 266
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎油航,沒想到半個月后崭庸,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,143評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡谊囚,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,488評論 2 327
  • 正文 我和宋清朗相戀三年怕享,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片镰踏。...
    茶點故事閱讀 38,626評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡函筋,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出奠伪,到底是詐尸還是另有隱情跌帐,我是刑警寧澤,帶...
    沈念sama閱讀 34,292評論 4 329
  • 正文 年R本政府宣布绊率,位于F島的核電站谨敛,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏滤否。R本人自食惡果不足惜脸狸,卻給世界環(huán)境...
    茶點故事閱讀 39,896評論 3 313
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望藐俺。 院中可真熱鬧肥惭,春花似錦盯仪、人聲如沸紊搪。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,742評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽耀石。三九已至牵囤,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間滞伟,已是汗流浹背揭鳞。 一陣腳步聲響...
    開封第一講書人閱讀 31,977評論 1 265
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留梆奈,地道東北人野崇。 一個月前我還...
    沈念sama閱讀 46,324評論 2 360
  • 正文 我出身青樓,卻偏偏與公主長得像亩钟,于是被迫代替她去往敵國和親乓梨。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 43,494評論 2 348

推薦閱讀更多精彩內(nèi)容