Spark Streaming - Receiver啟動(dòng)流程

在Spark Streaming整個(gè)架構(gòu)體系里面翠勉,數(shù)據(jù)接收其實(shí)最為關(guān)鍵的一個(gè)流程洋侨,在數(shù)據(jù)接收完之后夜牡,后面的數(shù)據(jù)處理上面就是復(fù)用Spark Core的數(shù)據(jù)處理架構(gòu)(包括BlockManager來(lái)管理數(shù)據(jù))窗宦,之后就是”水到渠成”的工作酬姆。所以作為Spark Streaming第一個(gè)章節(jié)勤庐,我們這里先從Receiver的啟動(dòng)流程介紹起走示惊。

Receiver 架構(gòu)

目前整個(gè)Spark Streaming架構(gòu)里面,支持兩種數(shù)據(jù)接收的方式: Receiver, Direct. 區(qū)別在[Spark-kafka-integration]里面有了詳細(xì)介紹愉镰,后續(xù)會(huì)詳細(xì)解讀兩者的區(qū)別米罚。回到Receiver, 我們首要介紹下目前Receiver的架構(gòu)丈探。

Receiver.png

在我們自己實(shí)現(xiàn)Receiver里面主要關(guān)注幾個(gè)點(diǎn):

  1. onStart() 里面實(shí)現(xiàn)接受數(shù)據(jù)的邏輯录择,這里面只需要關(guān)心數(shù)據(jù)如何接收,無(wú)需關(guān)于SS里面的周期性等,而且必須是No-blocking的隘竭,所以這里一般會(huì)啟動(dòng)一個(gè)Thread塘秦,然后在里面不停地接受數(shù)據(jù)。
  2. Reliability保證动看,每個(gè)Receiver接受了數(shù)據(jù)之后尊剔,就需要存儲(chǔ)到Spark的內(nèi)存中區(qū),這里就是調(diào)用Store方法菱皆,為了確保Reliability, 需要在Store成功之后進(jìn)行ACK返回须误,甚至在這里需要進(jìn)行WAL保證。
  3. 當(dāng)寫(xiě)完成Receiver仇轻,會(huì)把這個(gè)Receiver封裝到DStream里面京痢,每個(gè)DStream都會(huì)有自己compute方法來(lái)觸發(fā)從Receiver接受的數(shù)據(jù)進(jìn)行轉(zhuǎn)換轉(zhuǎn)換為BlockRDD進(jìn)而利用Spark Core的計(jì)算流程中

Receiver 啟動(dòng)流程

啟動(dòng)Receiver是一趟修煉的道路,會(huì)學(xué)習(xí)一種利用Spark分布式的環(huán)境啟動(dòng)自己的Process的方法拯田。首先我們看下大致畫(huà)的流程圖如下:

Receiver啟動(dòng)流程.png
  1. Streaming的開(kāi)始都是源于StreamingContext, 當(dāng)初始化的時(shí)候历造,會(huì)產(chǎn)生JobScheduler(這貨是驅(qū)動(dòng)Streaming Job的關(guān)鍵),在StreamingContext.start()的時(shí)候船庇,整個(gè)JobScheduler就開(kāi)始Start啦。
  2. ReceiverTracker, 他會(huì)負(fù)責(zé)整個(gè)StreamContexts里面的所有ReceiverInputDStream里面的Receivers的管理(略繞)侣监。
  3. 那我們直接到StartAllReceivers()里面鸭轮,這里面會(huì)先做schedule, 根據(jù)目前我們SS里面的Executors 和 所有需要啟動(dòng)的Receivers 統(tǒng)一考慮,來(lái)安排一下那些Receiver需要在哪個(gè)Executor上面進(jìn)行啟動(dòng)橄霉。
val scheduledLocations = schedulingPolicy.scheduleReceivers(receivers, getExecutors)
  1. 最巧妙的地方來(lái)了窃爷,全部精華就在ReceiverTracker里面:
      // Function to start the receiver on the worker node
      val startReceiverFunc: Iterator[Receiver[_]] => Unit =
        (iterator: Iterator[Receiver[_]]) => {
          if (!iterator.hasNext) {
            throw new SparkException(
              "Could not start receiver as object not found.")
          }
          if (TaskContext.get().attemptNumber() == 0) {
            val receiver = iterator.next()
            assert(iterator.hasNext == false)
            val supervisor = new ReceiverSupervisorImpl(
              receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
            supervisor.start()
            supervisor.awaitTermination()
          } else {
            // It's restarted by TaskScheduler, but we want to reschedule it again. So exit it.
          }
        }

      // Create the RDD using the scheduledLocations to run the receiver in a Spark job
      val receiverRDD: RDD[Receiver[_]] =
        if (scheduledLocations.isEmpty) {
          ssc.sc.makeRDD(Seq(receiver), 1)
        } else {
          val preferredLocations = scheduledLocations.map(_.toString).distinct
          ssc.sc.makeRDD(Seq(receiver -> preferredLocations))
        }
      receiverRDD.setName(s"Receiver $receiverId")
      ssc.sparkContext.setJobDescription(s"Streaming job running receiver $receiverId")
      ssc.sparkContext.setCallSite(Option(ssc.getStartSite()).getOrElse(Utils.getCallSite()))

      val future = ssc.sparkContext.submitJob[Receiver[_], Unit, Unit](
        receiverRDD, startReceiverFunc, Seq(0), (_, _) => Unit, ())
      // We will keep restarting the receiver job until ReceiverTracker is stopped
      future.onComplete {
        case Success(_) =>
          if (!shouldStartReceiver) {
            onReceiverJobFinish(receiverId)
          } else {
            logInfo(s"Restarting Receiver $receiverId")
            self.send(RestartReceiver(receiver))
          }
        case Failure(e) =>
          if (!shouldStartReceiver) {
            onReceiverJobFinish(receiverId)
          } else {
            logError("Receiver has been stopped. Try to restart it.", e)
            logInfo(s"Restarting Receiver $receiverId")
            self.send(RestartReceiver(receiver))
          }
      }(ThreadUtils.sameThread)
      logInfo(s"Receiver ${receiver.streamId} started")
  • 首先是startReceiverFunc,注意這里只是一個(gè)function, 它把傳入的Receiver封裝成ReceiverSupervisorImpl, 給每個(gè)Receiver創(chuàng)建監(jiān)督人姓蜂,監(jiān)督人主要是負(fù)責(zé)啟動(dòng)Receiver按厘,存儲(chǔ)Receiver接受的數(shù)據(jù) 以及 同Driver保持通信
  • 這時(shí)候創(chuàng)建出一個(gè)ReceiverRDD, 根據(jù)之前Receiver和Executor的安排, 這個(gè)RDD的內(nèi)容就是 receiver -> 它安排的executor的host.
  • 然后利用ssc.sparkContext.submitJob把這個(gè)RDD以及剛剛startReceiverFunc一起提交到集群上面去。這時(shí)候就利用Spark Core的分布式計(jì)算框架钱慢,讓receiver 分散到自己prefer的host上面進(jìn)行了啟動(dòng)逮京。
    這里就交了我們一招如何利用Spark分布式計(jì)算環(huán)境,啟動(dòng)我們自己進(jìn)程的任務(wù)束莫。關(guān)鍵點(diǎn): 構(gòu)造出自己數(shù)據(jù)對(duì)于的運(yùn)行Executor, 然后把這批數(shù)據(jù)啟動(dòng)流程function構(gòu)造出來(lái)懒棉,最后利用ssc.sparkContext.submitJob()
  1. 在ReceiverSupervisor, 用于處理Receiver收到的數(shù)據(jù),并且利用BlockGenerator 來(lái)產(chǎn)生最終數(shù)據(jù)的Block.

總結(jié)

在Spark Streaming里面數(shù)據(jù)接收是關(guān)鍵览绿,我們從上面看到我們的Receiver的啟動(dòng)流程策严,也學(xué)到如何利用Spark這個(gè)分布式平臺(tái)進(jìn)行提交自己的Job. 后面我們會(huì)看到Spark Streaming里面Job啟動(dòng)的流程。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末饿敲,一起剝皮案震驚了整個(gè)濱河市妻导,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌,老刑警劉巖倔韭,帶你破解...
    沈念sama閱讀 211,423評(píng)論 6 491
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件术浪,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡狐肢,警方通過(guò)查閱死者的電腦和手機(jī)添吗,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,147評(píng)論 2 385
  • 文/潘曉璐 我一進(jìn)店門(mén),熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)份名,“玉大人碟联,你說(shuō)我怎么就攤上這事〗┫伲” “怎么了鲤孵?”我有些...
    開(kāi)封第一講書(shū)人閱讀 157,019評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)辰如。 經(jīng)常有香客問(wèn)我普监,道長(zhǎng),這世上最難降的妖魔是什么琉兜? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 56,443評(píng)論 1 283
  • 正文 為了忘掉前任凯正,我火速辦了婚禮,結(jié)果婚禮上豌蟋,老公的妹妹穿的比我還像新娘廊散。我一直安慰自己,他們只是感情好梧疲,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,535評(píng)論 6 385
  • 文/花漫 我一把揭開(kāi)白布允睹。 她就那樣靜靜地躺著,像睡著了一般幌氮。 火紅的嫁衣襯著肌膚如雪缭受。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書(shū)人閱讀 49,798評(píng)論 1 290
  • 那天该互,我揣著相機(jī)與錄音米者,去河邊找鬼。 笑死慢洋,一個(gè)胖子當(dāng)著我的面吹牛塘雳,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播普筹,決...
    沈念sama閱讀 38,941評(píng)論 3 407
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼败明,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了太防?” 一聲冷哼從身側(cè)響起妻顶,我...
    開(kāi)封第一講書(shū)人閱讀 37,704評(píng)論 0 266
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤酸员,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后讳嘱,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體幔嗦,經(jīng)...
    沈念sama閱讀 44,152評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,494評(píng)論 2 327
  • 正文 我和宋清朗相戀三年沥潭,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了邀泉。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,629評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡钝鸽,死狀恐怖汇恤,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情拔恰,我是刑警寧澤因谎,帶...
    沈念sama閱讀 34,295評(píng)論 4 329
  • 正文 年R本政府宣布,位于F島的核電站颜懊,受9級(jí)特大地震影響财岔,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜河爹,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,901評(píng)論 3 313
  • 文/蒙蒙 一匠璧、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧咸这,春花似錦患朱、人聲如沸。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,742評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)冰沙。三九已至侨艾,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間拓挥,已是汗流浹背唠梨。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 31,978評(píng)論 1 266
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留侥啤,地道東北人当叭。 一個(gè)月前我還...
    沈念sama閱讀 46,333評(píng)論 2 360
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像盖灸,于是被迫代替她去往敵國(guó)和親蚁鳖。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,499評(píng)論 2 348

推薦閱讀更多精彩內(nèi)容