Spark Streaming運行架構(gòu)分析

簡介

Spark StreamingSpark Core的擴展,是構(gòu)建于Spark Core之上的實時流處理系統(tǒng)监婶。相對于其他實時流處理系統(tǒng),Spark Streaming最大的優(yōu)勢在于其位于Spark技術(shù)棧中役拴,也即流處理引擎與數(shù)據(jù)處理引擎在同一個軟件棧中钝尸。在Spark Streaming中,數(shù)據(jù)的采集是以逐條方式部翘,而數(shù)據(jù)處理是按批進(jìn)行的硝训。因此,其系統(tǒng)吞吐量會比流行的純實時流處理引擎Storm高2~5倍新思。

Spark Streaming對流數(shù)據(jù)處理的過成大致可以分為:啟動流處理引擎窖梁、接收和存儲流數(shù)據(jù)、處理流數(shù)據(jù)和輸出處理結(jié)果等四個步驟夹囚。其運行架構(gòu)圖如下所示:

Step1 啟動流處理引擎

StreamingContextSpark StreamingDriver端的上下文纵刘,是spark streaming程序的入口。在該對象的啟 動過程中荸哟,會初始化其內(nèi)部的組件假哎,其中最為重要的是DStreamGraph以及JobScheduler組件的初始化。

class StreamingContext private[streaming] (
    _sc: SparkContext,
    _cp: Checkpoint,
    _batchDur: Duration
  ) extends Logging {
...
private[streaming] val conf = sc.conf

private[streaming] val env = sc.env

private[streaming] val graph: DStreamGraph = {
    if (isCheckpointPresent) {
      _cp.graph.setContext(this)
      _cp.graph.restoreCheckpointData()
      _cp.graph
    } else {
      require(_batchDur != null, "Batch duration for StreamingContext cannot be null")
      val newGraph = new DStreamGraph()
      newGraph.setBatchDuration(_batchDur)
      newGraph
    }
  }
...    
private[streaming] val scheduler = new JobScheduler(this)
...
}

Spark Streaming中作業(yè)的生成方式類似Spark核心鞍历,對DStream進(jìn)行的各種操作讓他們之間構(gòu)建起依賴關(guān)系舵抹,DStreamGraph記錄了DStream之間的依賴關(guān)系等信息。

JobSchedulerSpark StreamingJob總調(diào)度者劣砍。JobScheduler 有兩個非常重要的成員:JobGeneratorReceiverTracker惧蛹。JobGenerator維護一個定時器,定時為每個 batch 生成RDD DAG的實例;ReceiverTracker負(fù)責(zé)啟動香嗓、管理各個 receiver及管理各個receiver 接收到的數(shù)據(jù)爵政。

通過調(diào)用StreamingContext#start()方法啟動流處理引擎。在StreamingContext#start()中陶缺,調(diào)用StreamingContext#validate()方法對DStreamGraphcheckpoint等做有效性檢查钾挟,然后啟動新的線程設(shè)置SparkContext,并啟動JobScheduler饱岸。

 def start(): Unit = synchronized {
...
     validate()
     ThreadUtils.runInNewThread("streaming-start") {
         sparkContext.setCallSite(startSite.get)
         sparkContext.clearJobGroup()
         sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL,                  "false")      
         savedProperties.set(SerializationUtils.clone(sparkContext
                .localProperties.get())) 
         scheduler.start()
     }
     state = StreamingContextState.ACTIVE
     StreamingContext.setActiveContext(this)
...
  }

Step2 接收與存儲流數(shù)據(jù)

JobScheduler啟動時掺出,會創(chuàng)建一個新的 ReceiverTracker 實例 receiverTracker,并調(diào)用其start() 方法苫费。在ReceiverTracker #start()中會初始化一個endpoint: ReceiverTrackerEndpoint對象汤锨,該對象用于接收和處理ReceiverTrackerreceivers之間 發(fā)送的消息。此外百框,在ReceiverTracker#start()中還會調(diào)用 launchReceivers 將各個receivers 分發(fā)到 executors 上闲礼。

def start(): Unit = synchronized {
    if (isTrackerStarted) {
      throw new SparkException("ReceiverTracker already started")
    }
    if (!receiverInputStreams.isEmpty) {
      endpoint = ssc.env.rpcEnv.setupEndpoint(
        "ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv))
      if (!skipReceiverLaunch) launchReceivers()
      logInfo("ReceiverTracker started")
      trackerState = Started
    }
  }

ReceiverTracker#launchReceivers()會從DStreamGraph.inputStreams 中抽取出receivers,也即數(shù)據(jù)接收器铐维。得到receivers后柬泽,給消息接收處理器 endpoint 發(fā)送 StartAllReceivers(receivers)消息。

  private def launchReceivers(): Unit = {
    val receivers = receiverInputStreams.map { nis =>
      val rcvr = nis.getReceiver()
      rcvr.setReceiverId(nis.id)
      rcvr
    }
    runDummySparkJob()
    logInfo("Starting " + receivers.length + " receivers")
    endpoint.send(StartAllReceivers(receivers))
  }

endpoint在接收到消息后嫁蛇,首先會判別消息的類型锨并,對不同的消息執(zhí)行不同的處理操作。當(dāng)收到StartAllReceivers類型的消息時睬棚,首先會計算每一個receiver要發(fā)送的目的executors第煮,其計算主要遵循兩條原則:一是盡可能的使receiver分布均勻;二是如果receiver本身的preferredLocation不均勻抑党,則以preferredLocation為準(zhǔn)包警。然后遍歷每一個receiver,根據(jù)計算出的executors調(diào)用startReceiver方法來啟動receivers底靠。

case StartAllReceivers(receivers) =>
        val scheduledLocations = schedulingPolicy.scheduleReceivers(receivers, getExecutors)
        for (receiver <- receivers) {
          val executors = scheduledLocations(receiver.streamId)
          updateReceiverScheduledExecutors(receiver.streamId, executors)
          receiverPreferredLocations(receiver.streamId) = receiver.preferredLocation
          startReceiver(receiver, executors)
        }

由于ReceiverInputDStream實例只有一個receiver害晦,但receiver可能需要在多個worker上啟動線程來接收數(shù)據(jù),因此在startReceiver中需要將receiver及其對應(yīng)的目的excutors轉(zhuǎn)換成RDD苛骨。

val receiverRDD: RDD[Receiver[_]] =
        if (scheduledLocations.isEmpty) {
          ssc.sc.makeRDD(Seq(receiver), 1)
        } else {
          val preferredLocations = scheduledLocations.map(_.toString).distinct
          ssc.sc.makeRDD(Seq(receiver -> preferredLocations))
        }

轉(zhuǎn)換為RDD后篱瞎,需要把receiver所進(jìn)行的計算定義為startReceiverFunc函數(shù),該函數(shù)以receiver實例為參數(shù)構(gòu)造ReceiverSupervisorImpl實例supervisor痒芝,構(gòu)造完畢后使用新線程啟動該supervisor并阻塞該線程俐筋。

val supervisor = new ReceiverSupervisorImpl(
  receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
supervisor.start()
supervisor.awaitTermination()

最后,將receiverRDD以及要在receiverRDD上執(zhí)行的函數(shù)作為Job提交严衬,以真正在各個executors上啟動Receiver澄者。Job執(zhí)行后將會持續(xù)的進(jìn)行數(shù)據(jù)的接收。

val future = ssc.sparkContext.submitJob[Receiver[_], Unit, Unit](
        receiverRDD, startReceiverFunc, Seq(0), (_, _) => Unit, ())

Receiver源源不斷的接收到實時流數(shù)據(jù)后,根據(jù)接收數(shù)據(jù)的大小進(jìn)行判斷粱挡,若數(shù)據(jù)量很小赠幕,則會聚集多條數(shù)據(jù)成一塊,然后進(jìn)行塊存儲询筏;若數(shù)據(jù)量很大榕堰,則直接進(jìn)行塊存儲。對于這些數(shù)據(jù)嫌套,Receiver會直接交由ReceiverSupervisor逆屡,由其進(jìn)行數(shù)據(jù)的轉(zhuǎn)儲操作。配置參數(shù)spark.streaming.receiver.writeAheadLog.enable的值決定是否預(yù)寫日志踱讨。根據(jù)參數(shù)值會產(chǎn)生不同類型的存儲receivedBlockHandler對象魏蔗。

private val receivedBlockHandler: ReceivedBlockHandler = {
  if (WriteAheadLogUtils.enableReceiverLog(env.conf)) {
    //先寫 WAL,再存儲到 executor 的內(nèi)存或硬盤
    new WriteAheadLogBasedBlockHandler(env.blockManager, receiver.streamId,
      receiver.storageLevel, env.conf, hadoopConf, checkpointDirOption.get)
  } else {
    //直接存到 executor 的內(nèi)存或硬盤
    new BlockManagerBasedBlockHandler(env.blockManager, receiver.storageLevel)
  }
}

根據(jù)receivedBlockHandler進(jìn)行塊存儲痹筛。將 block 存儲之后莺治,會獲得 block 描述信息 blockInfo:ReceivedBlockInfo,這其中包含:streamId帚稠、數(shù)據(jù)位置谣旁、數(shù)據(jù)條數(shù)、數(shù)據(jù) size 等信息翁锡。接著蔓挖,封裝以 block 作為參數(shù)的 AddBlock(blockInfo) 消息并發(fā)送給 ReceiverTracker 以通知其有新增 block 數(shù)據(jù)塊。

//調(diào)用 receivedBlockHandler.storeBlock 方法存儲 block馆衔,并得到一個 blockStoreResult
val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)
//使用blockStoreResult初始化一個ReceivedBlockInfo實例
val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
//發(fā)送消息通知 ReceiverTracker 新增并存儲了 block
trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))

ReceiverTracker再把這些信息轉(zhuǎn)發(fā)給ReceivedBlockTracker,由其負(fù)責(zé)管理收到數(shù)據(jù)塊元信息怨绣。

private def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {
    receivedBlockTracker.addBlock(receivedBlockInfo)
  }

step3 處理流數(shù)據(jù)

JobScheduler中有兩個主要的成員角溃,一個是上文提到的ReceiverTracker,另一個則是JobGenerator 篮撑。在JobScheduler啟動時减细,會創(chuàng)建一個新的 JobGenerator 實例 jobGenerator,并調(diào)用其start() 方法赢笨。在 JobGenerator 的主構(gòu)造函數(shù)中未蝌,會創(chuàng)建一個定時器:

private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
    longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")

定時器中定義了批處理時間間隔ssc.graph.batchDuration.milliseconds。每當(dāng)批處理時間到來時茧妒,會執(zhí)行一次eventLoop.post(GenerateJobs(new Time(longTime)))方法來向 eventLoop 發(fā)送 GenerateJobs(new Time(longTime))消息萧吠,eventLoop收到消息后會基于當(dāng)前batch內(nèi)的數(shù)據(jù)進(jìn)行Job的生成及提交執(zhí)行。

private def generateJobs(time: Time) {
    // Checkpoint all RDDs marked for checkpointing to ensure their lineages are
    // truncated periodically. Otherwise, we may run into stack overflows (SPARK-6847).
    ssc.sparkContext.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS, "true")
    Try {
    // allocate received blocks to batch
    jobScheduler.receiverTracker.allocateBlocksToBatch(time)
    // generate jobs using allocated block
    graph.generateJobs(time)
} match {
    case Success(jobs) =>
    val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
    jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
    case Failure(e) =>
    jobScheduler.reportError("Error generating jobs for time " + time, e)
    PythonDStream.stopStreamingContextIfPythonProcessIsDead(e)
  }
    eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
 }

由源碼可知桐筏,eventLoop 在接收到 GenerateJobs(new Time(longTime))消息后首先調(diào)用了allocateBlocksToBatch()方法將已收到的blocks分配給batch纸型。緊接著調(diào)用DStreamGraph類中的generateJobs()方法來生成基于該batchJob序列。然后將批處理時間time、作業(yè)序列Seq[Job]和本批次數(shù)據(jù)的源信息包裝為JobSet狰腌,調(diào)用JobScheduler.submitJobSet(JobSet)提交給JobScheduler除破,JobScheduler將這些作業(yè)發(fā)送給Spark核心進(jìn)行處理。

Step4 輸出處理結(jié)果

由于數(shù)據(jù)的處理有Spark核心來完成琼腔,因此處理的結(jié)果會從Spark核心中直接輸出至外部系統(tǒng)瑰枫,如數(shù)據(jù)庫或者文件系統(tǒng)等,同時輸出的數(shù)據(jù)也可以直接被外部系統(tǒng)所使用丹莲。由于實時流數(shù)據(jù)的數(shù)據(jù)源源不斷的流入光坝,Spark會周而復(fù)始的進(jìn)行數(shù)據(jù)的計算,相應(yīng)也會持續(xù)輸出處理結(jié)果圾笨。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末教馆,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子擂达,更是在濱河造成了極大的恐慌土铺,老刑警劉巖,帶你破解...
    沈念sama閱讀 216,744評論 6 502
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件板鬓,死亡現(xiàn)場離奇詭異悲敷,居然都是意外死亡,警方通過查閱死者的電腦和手機俭令,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,505評論 3 392
  • 文/潘曉璐 我一進(jìn)店門后德,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人抄腔,你說我怎么就攤上這事瓢湃。” “怎么了赫蛇?”我有些...
    開封第一講書人閱讀 163,105評論 0 353
  • 文/不壞的土叔 我叫張陵绵患,是天一觀的道長。 經(jīng)常有香客問我悟耘,道長落蝙,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,242評論 1 292
  • 正文 為了忘掉前任暂幼,我火速辦了婚禮筏勒,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘旺嬉。我一直安慰自己管行,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 67,269評論 6 389
  • 文/花漫 我一把揭開白布鹰服。 她就那樣靜靜地躺著病瞳,像睡著了一般揽咕。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上套菜,一...
    開封第一講書人閱讀 51,215評論 1 299
  • 那天亲善,我揣著相機與錄音,去河邊找鬼逗柴。 笑死蛹头,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的戏溺。 我是一名探鬼主播渣蜗,決...
    沈念sama閱讀 40,096評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼旷祸!你這毒婦竟也來了耕拷?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 38,939評論 0 274
  • 序言:老撾萬榮一對情侶失蹤托享,失蹤者是張志新(化名)和其女友劉穎骚烧,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體闰围,經(jīng)...
    沈念sama閱讀 45,354評論 1 311
  • 正文 獨居荒郊野嶺守林人離奇死亡赃绊,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,573評論 2 333
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了羡榴。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片碧查。...
    茶點故事閱讀 39,745評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖校仑,靈堂內(nèi)的尸體忽然破棺而出忠售,到底是詐尸還是另有隱情,我是刑警寧澤迄沫,帶...
    沈念sama閱讀 35,448評論 5 344
  • 正文 年R本政府宣布档痪,位于F島的核電站,受9級特大地震影響邢滑,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜愿汰,卻給世界環(huán)境...
    茶點故事閱讀 41,048評論 3 327
  • 文/蒙蒙 一困后、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧衬廷,春花似錦摇予、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,683評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽宁昭。三九已至,卻和暖如春酗宋,著一層夾襖步出監(jiān)牢的瞬間积仗,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,838評論 1 269
  • 我被黑心中介騙來泰國打工蜕猫, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留寂曹,地道東北人。 一個月前我還...
    沈念sama閱讀 47,776評論 2 369
  • 正文 我出身青樓回右,卻偏偏與公主長得像隆圆,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子翔烁,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,652評論 2 354