spark streaming源碼分析之ReceiverTracker詳解

我們關(guān)注的問題是數(shù)據(jù)是怎么被接收的仰坦?又是怎么存儲的驳阎?

數(shù)據(jù)是被executor上的線程receiver接收的捂齐,接收之后交由executor上的線程ReceiverSupervisorImpl處理慧脱。

JobScheduler的重要成員之一登場3曷恕镜廉!ReceiverTrackerE濉!!
ReceiverTracker的簡單介紹齐遵?

ReceiverTracker的目的是為每個batch的RDD提供輸入數(shù)據(jù)寂玲。通過以下三步完成:

  1. 分發(fā)receiver到executor,啟動接收的線程梗摇。
  2. 分發(fā)ReceiverSupervisorImpl到executor拓哟,啟動處理數(shù)據(jù)的線程,并掌握數(shù)據(jù)的信息
  3. 一個job提交了伶授,它是怎么為其提供數(shù)據(jù)進行etl的断序?

++首先看下Receiver是怎么被分發(fā)到各個executor上的++

def start(): Unit = synchronized {
    //....

    if (!receiverInputStreams.isEmpty) {
      endpoint = ssc.env.rpcEnv.setupEndpoint(
        "ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv))//用來接收和處理來自 ReceiverTracker 和 receivers 發(fā)送的消息
      if (!skipReceiverLaunch) launchReceivers() //重要!考點C优搿Nナ!將receiver分發(fā)到executers
      //.....
    }
  }
//來景图!具體來看launchReceivers
private def launchReceivers(): Unit = {
    val receivers = receiverInputStreams.map {...}//DStreamGraph持有所有的inputDS较雕,獲取到這些inputDS的receiver

    
    endpoint.send(StartAllReceivers(receivers))//拿到receivers后分發(fā)的具體實現(xiàn)
}

override def receive: PartialFunction[Any, Unit] = {
      // 確定了每個 receiver 要分發(fā)到哪些 executors 
      case StartAllReceivers(receivers) =>
        val scheduledLocations = schedulingPolicy.scheduleReceivers(receivers, getExecutors)
        for (receiver <- receivers) {
          val executors = scheduledLocations(receiver.streamId)
          updateReceiverScheduledExecutors(receiver.streamId, executors)
          receiverPreferredLocations(receiver.streamId) = receiver.preferredLocation
          startReceiver(receiver, executors)
        }
      //.....  
}

private def startReceiver(
        receiver: Receiver[_],
        scheduledLocations: Seq[TaskLocation]): Unit = {

      // Function to start the receiver on the worker node
      //重點!考點V勘摇亮蒋!這個函數(shù)會和rdd一起提交,它new了一個ReceiverSupervisorImpl用來具體處理接收的數(shù)據(jù)妆毕,后面會具體講I骶痢!
      val startReceiverFunc: Iterator[Receiver[_]] => Unit =
        (iterator: Iterator[Receiver[_]]) => {
          
          if (TaskContext.get().attemptNumber() == 0) {
            val receiver = iterator.next()
            val supervisor = new ReceiverSupervisorImpl(
              receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)//真正處理接收到的數(shù)據(jù)
            supervisor.start()//啟動線程
            supervisor.awaitTermination()//重要笛粘!堵塞線程趁怔,源源不斷的從reciver處獲取數(shù)據(jù)!
          }
        }

      // Create the RDD using the scheduledLocations to run the receiver in a Spark job
      //重點薪前!考點H笈!這里把recever和location打包成一個rdd了示括,所以recevier可以在多個executor上運行F探健!垛膝!
      val receiverRDD: RDD[Receiver[_]] =
        if (scheduledLocations.isEmpty) {
          ssc.sc.makeRDD(Seq(receiver), 1)
        } else {
          val preferredLocations = scheduledLocations.map(_.toString).distinct
          ssc.sc.makeRDD(Seq(receiver -> preferredLocations))
        }
      
      //.....

     //提交啦鳍侣!?? 到這里recevier就被分發(fā)到具體的executor上了
      val future = ssc.sparkContext.submitJob[Receiver[_], Unit, Unit](
        receiverRDD, startReceiverFunc, Seq(0), (_, _) => Unit, ())
      
      //....
    }
  

++來,再看一下具體在executor上是怎么實現(xiàn)處理數(shù)據(jù)的吼拥?++

第一部分倚聚,怎么接收數(shù)據(jù)?

recevier被分發(fā)到具體的executor上之后會怎么實現(xiàn)數(shù)據(jù)的處理呢凿可?reciver會調(diào)用supervisor的put方法;笳邸!!也就是說recevier其實只關(guān)心從哪兒接數(shù)據(jù)以及數(shù)據(jù)接過來怎么解析唬复,而并不關(guān)心數(shù)據(jù)怎么存4;H埂敞咧!誰在用!9枷佟休建!

//先看下recevier怎么把數(shù)據(jù)給ReceiverSupervisorImpl,比如KafkaReceiver
class KafkaReceiver(....) extends Receiver[(K, V)](storageLevel) with Logging {

  def onStart() {

  
    //去哪兒接收數(shù)據(jù)
    // Kafka connection properties
    // Create the connection to the cluster

    //接收到的數(shù)據(jù)怎么解析
    val keyDecoder = ...
    val valueDecoder = ...


    //線程池接收數(shù)據(jù)
    val executorPool = ...
    topicMessageStreams.values.foreach { streams =>
        streams.foreach { stream => executorPool.submit(new MessageHandler(stream)) }
  }

  // 處理接收到的數(shù)據(jù),storeF懒啤2馍啊!這里會調(diào)用supervisor.pushSingle0俅摇F鲂!加匈!
  private class MessageHandler(stream: KafkaStream[K, V])
    extends Runnable {
    def run() {
      val streamIterator = stream.iterator()
        while (streamIterator.hasNext()) {
          val msgAndMetadata = streamIterator.next()
          store((msgAndMetadata.key, msgAndMetadata.message))
        }
    }
  }
}

第二部分存璃,那么數(shù)據(jù)接過來了,怎么存儲呢雕拼?這里是ReceiverSupervisorImpl實現(xiàn)的纵东,主要有三個方法:

//put類,會把一條條的數(shù)據(jù)交給BlockGenerator啥寇,匯聚成block
def pushSingle(data: Any) {
    defaultBlockGenerator.addData(data)
}


def pushAndReportBlock(
      receivedBlock: ReceivedBlock,
      metadataOption: Option[Any],
      blockIdOption: Option[StreamBlockId]
    ) {
    
    //存儲block的具體邏輯
    val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)
    
    //存儲成功之后偎球,發(fā)送新增的blockInfo到ReceiverTracker
    val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
    trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))
    
}

//把每個block通過blockManager存到內(nèi)存/硬盤,同rdd邏輯一致
private val receivedBlockHandler: ReceivedBlockHandler = {
    if (WriteAheadLogUtils.enableReceiverLog(env.conf)) {
     //wal辑甜,重點衰絮!預(yù)寫!磷醋!防丟數(shù)據(jù)
      new WriteAheadLogBasedBlockHandler(env.blockManager, env.serializerManager, receiver.streamId,
        receiver.storageLevel, env.conf, hadoopConf, checkpointDirOption.get)
    } else {
      new BlockManagerBasedBlockHandler(env.blockManager, receiver.storageLevel)
    }
  }

第三部分猫牡,數(shù)據(jù)怎么被用呢?數(shù)據(jù)被存儲之后告知了ReceiverTracker子檀,但是怎么用呢镊掖?

//ReceiverTracker自己是不管block的,它有一個成員receivedBlockTracker來處理褂痰!它是個老板D督!缩歪!
private def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {
    receivedBlockTracker.addBlock(receivedBlockInfo)
}


//注意??定時器JobGenerate在定時提交job的時候會調(diào)用ReceiverTracker的allocateBlocksToBatch方法來把block和batch對應(yīng)起來归薛,可以看到block怎么被分配到batch這個過程是receivedBlockTracker處理的!!
def allocateBlocksToBatch(batchTime: Time): Unit = {
    if (receiverInputStreams.nonEmpty) {
      receivedBlockTracker.allocateBlocksToBatch(batchTime)
    }
  }

關(guān)于數(shù)據(jù)被存儲之后主籍,是怎么和rdd關(guān)聯(lián)起來的习贫,更多的內(nèi)容在spark streaming源碼分析之job、rdd千元、blocks之間是如何對應(yīng)的苫昌?

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市幸海,隨后出現(xiàn)的幾起案子祟身,更是在濱河造成了極大的恐慌,老刑警劉巖物独,帶你破解...
    沈念sama閱讀 222,378評論 6 516
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件袜硫,死亡現(xiàn)場離奇詭異,居然都是意外死亡挡篓,警方通過查閱死者的電腦和手機婉陷,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,970評論 3 399
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來官研,“玉大人秽澳,你說我怎么就攤上這事》Р危” “怎么了肝集?”我有些...
    開封第一講書人閱讀 168,983評論 0 362
  • 文/不壞的土叔 我叫張陵,是天一觀的道長蛛壳。 經(jīng)常有香客問我杏瞻,道長,這世上最難降的妖魔是什么衙荐? 我笑而不...
    開封第一講書人閱讀 59,938評論 1 299
  • 正文 為了忘掉前任捞挥,我火速辦了婚禮,結(jié)果婚禮上忧吟,老公的妹妹穿的比我還像新娘砌函。我一直安慰自己,他們只是感情好溜族,可當我...
    茶點故事閱讀 68,955評論 6 398
  • 文/花漫 我一把揭開白布讹俊。 她就那樣靜靜地躺著,像睡著了一般煌抒。 火紅的嫁衣襯著肌膚如雪仍劈。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 52,549評論 1 312
  • 那天寡壮,我揣著相機與錄音贩疙,去河邊找鬼讹弯。 笑死,一個胖子當著我的面吹牛这溅,可吹牛的內(nèi)容都是我干的组民。 我是一名探鬼主播,決...
    沈念sama閱讀 41,063評論 3 422
  • 文/蒼蘭香墨 我猛地睜開眼悲靴,長吁一口氣:“原來是場噩夢啊……” “哼臭胜!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起对竣,我...
    開封第一講書人閱讀 39,991評論 0 277
  • 序言:老撾萬榮一對情侶失蹤庇楞,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后否纬,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,522評論 1 319
  • 正文 獨居荒郊野嶺守林人離奇死亡蛋褥,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,604評論 3 342
  • 正文 我和宋清朗相戀三年临燃,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片烙心。...
    茶點故事閱讀 40,742評論 1 353
  • 序言:一個原本活蹦亂跳的男人離奇死亡膜廊,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出淫茵,到底是詐尸還是另有隱情爪瓜,我是刑警寧澤,帶...
    沈念sama閱讀 36,413評論 5 351
  • 正文 年R本政府宣布匙瘪,位于F島的核電站铆铆,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏丹喻。R本人自食惡果不足惜薄货,卻給世界環(huán)境...
    茶點故事閱讀 42,094評論 3 335
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望碍论。 院中可真熱鬧谅猾,春花似錦、人聲如沸鳍悠。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,572評論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽藏研。三九已至敬矩,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間遥倦,已是汗流浹背谤绳。 一陣腳步聲響...
    開封第一講書人閱讀 33,671評論 1 274
  • 我被黑心中介騙來泰國打工占锯, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人缩筛。 一個月前我還...
    沈念sama閱讀 49,159評論 3 378
  • 正文 我出身青樓消略,卻偏偏與公主長得像,于是被迫代替她去往敵國和親瞎抛。 傳聞我的和親對象是個殘疾皇子艺演,可洞房花燭夜當晚...
    茶點故事閱讀 45,747評論 2 361

推薦閱讀更多精彩內(nèi)容