揭開Spark Streaming神秘面紗⑥ - Spark Streaming結(jié)合 Kafka 兩種不同的數(shù)據(jù)接收方式比較

DirectKafkaInputDStream 只在 driver 端接收數(shù)據(jù),所以繼承了 InputDStream抬纸,是沒有 receivers 的


在結(jié)合 Spark Streaming 及 Kafka 的實(shí)時(shí)應(yīng)用中宵呛,我們通常使用以下兩個(gè) API 來獲取最初的 DStream(這里不關(guān)心這兩個(gè) API 的重載):

KafkaUtils#createDirectStream

KafkaUtils#createStream

這兩個(gè) API 除了要傳入的參數(shù)不同外藕坯,接收 kafka 數(shù)據(jù)的節(jié)點(diǎn)伍纫、拉取數(shù)據(jù)的時(shí)機(jī)也完全不同隆夯。本文將分別就兩者進(jìn)行詳細(xì)分析钳恕。

KafkaUtils#createStream

先來分析 createStream,在該函數(shù)中蹄衷,會(huì)新建一個(gè) KafkaInputDStream對(duì)象忧额,KafkaInputDStream繼承于 ReceiverInputDStream。我們?cè)谖恼?a href="http://www.reibang.com/p/3195fb3c4191" target="_blank">揭開Spark Streaming神秘面紗② - ReceiverTracker 與數(shù)據(jù)導(dǎo)入分析過

  1. 繼承ReceiverInputDStream的類需要重載 getReceiver 函數(shù)以提供用于接收數(shù)據(jù)的 receiver
  2. recever 會(huì)調(diào)度到某個(gè) executor 上并啟動(dòng)愧口,不間斷的接收數(shù)據(jù)并將收到的數(shù)據(jù)交由 ReceiverSupervisor 存成 block 作為 RDD 輸入數(shù)據(jù)

KafkaInputDStream當(dāng)然也實(shí)現(xiàn)了getReceiver方法睦番,如下:

  def getReceiver(): Receiver[(K, V)] = {
    if (!useReliableReceiver) {
      //< 不啟用 WAL
      new KafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
    } else {
      //< 啟用 WAL
      new ReliableKafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
    }
  }

根據(jù)是否啟用 WAL,receiver 分為 KafkaReceiver 和 ReliableKafkaReceiver耍属。揭開Spark Streaming神秘面紗②-ReceiverTracker 與數(shù)據(jù)導(dǎo)入一文中詳細(xì)地介紹了

  1. receiver 是如何被分發(fā)啟動(dòng)的
  2. receiver 接受數(shù)據(jù)后數(shù)據(jù)的流轉(zhuǎn)過程
    并在 揭開Spark Streaming神秘面紗③ - 動(dòng)態(tài)生成 job 一文中詳細(xì)介紹了
  3. receiver 接受的數(shù)據(jù)存儲(chǔ)為 block 后托嚣,如何將 blocks 作為 RDD 的輸入數(shù)據(jù)
  4. 動(dòng)態(tài)生成 job

以上兩篇文章并沒有具體介紹 receiver 是如何接收數(shù)據(jù)的,當(dāng)然每個(gè)重載了 ReceiverInputDStream 的類的 receiver 接收數(shù)據(jù)方式都不相同厚骗。下圖描述了 KafkaReceiver 接收數(shù)據(jù)的具體流程:

KafkaUtils#createDirectStream

揭開Spark Streaming神秘面紗③ - 動(dòng)態(tài)生成 job中示启,介紹了在生成每個(gè) batch 的過程中,會(huì)去取這個(gè) batch 對(duì)應(yīng)的 RDD领舰,若未生成該 RDD夫嗓,則會(huì)取該 RDD 對(duì)應(yīng)的 blocks 數(shù)據(jù)來生成 RDD,最終會(huì)調(diào)用到DStream#compute(validTime: Time)函數(shù)冲秽,在KafkaUtils#createDirectStream調(diào)用中舍咖,會(huì)新建DirectKafkaInputDStreamDirectKafkaInputDStream#compute(validTime: Time)會(huì)從 kafka 拉取數(shù)據(jù)并生成 RDD锉桑,流程如下:

如上圖所示谎仲,該函數(shù)主要做了以下三個(gè)事情:

  1. 確定要接收的 partitions 的 offsetRange,以作為第2步創(chuàng)建的 RDD 的數(shù)據(jù)來源
  2. 創(chuàng)建 RDD 并執(zhí)行 count 操作刨仑,使 RDD 真實(shí)具有數(shù)據(jù)
  3. 以 streamId郑诺、數(shù)據(jù)條數(shù)夹姥,offsetRanges 信息初始化 inputInfo 并添加到 JobScheduler 中

進(jìn)一步看 KafkaRDD 的 getPartitions 實(shí)現(xiàn):

  override def getPartitions: Array[Partition] = {
    offsetRanges.zipWithIndex.map { case (o, i) =>
        val (host, port) = leaders(TopicAndPartition(o.topic, o.partition))
        new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, o.untilOffset, host, port)
    }.toArray
  }

從上面的代碼可以很明顯看到,KafkaRDD 的 partition 數(shù)據(jù)與 Kafka topic 的某個(gè) partition 的 o.fromOffset 至 o.untilOffset 數(shù)據(jù)是相對(duì)應(yīng)的辙诞,也就是說 KafkaRDD 的 partition 與 Kafka partition 是一一對(duì)應(yīng)的


通過以上分析辙售,我們可以對(duì)這兩種方式的區(qū)別做一個(gè)總結(jié):

  1. createStream會(huì)使用 Receiver;而createDirectStream不會(huì)
  2. createStream使用的 Receiver 會(huì)分發(fā)到某個(gè) executor 上去啟動(dòng)并接受數(shù)據(jù)飞涂;而createDirectStream直接在 driver 上接收數(shù)據(jù)
  3. createStream使用 Receiver 源源不斷的接收數(shù)據(jù)并把數(shù)據(jù)交給 ReceiverSupervisor 處理最終存儲(chǔ)為 blocks 作為 RDD 的輸入旦部,從 kafka 拉取數(shù)據(jù)與計(jì)算消費(fèi)數(shù)據(jù)相互獨(dú)立;而createDirectStream會(huì)在每個(gè) batch 拉取數(shù)據(jù)并就地消費(fèi)较店,到下個(gè) batch 再次拉取消費(fèi)士八,周而復(fù)始,從 kafka 拉取數(shù)據(jù)與計(jì)算消費(fèi)數(shù)據(jù)是連續(xù)的梁呈,沒有獨(dú)立開
  4. createStream中創(chuàng)建的KafkaInputDStream 每個(gè) batch 所對(duì)應(yīng)的 RDD 的 partition 不與 Kafka partition 一一對(duì)應(yīng)婚度;而createDirectStream中創(chuàng)建的 DirectKafkaInputDStream 每個(gè) batch 所對(duì)應(yīng)的 RDD 的 partition 與 Kafka partition 一一對(duì)應(yīng)
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市官卡,隨后出現(xiàn)的幾起案子蝗茁,更是在濱河造成了極大的恐慌,老刑警劉巖寻咒,帶你破解...
    沈念sama閱讀 211,743評(píng)論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件哮翘,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡毛秘,警方通過查閱死者的電腦和手機(jī)饭寺,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,296評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來叫挟,“玉大人艰匙,你說我怎么就攤上這事∠既啵” “怎么了旬薯?”我有些...
    開封第一講書人閱讀 157,285評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵晰骑,是天一觀的道長(zhǎng)适秩。 經(jīng)常有香客問我,道長(zhǎng)硕舆,這世上最難降的妖魔是什么秽荞? 我笑而不...
    開封第一講書人閱讀 56,485評(píng)論 1 283
  • 正文 為了忘掉前任,我火速辦了婚禮抚官,結(jié)果婚禮上扬跋,老公的妹妹穿的比我還像新娘。我一直安慰自己凌节,他們只是感情好钦听,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,581評(píng)論 6 386
  • 文/花漫 我一把揭開白布洒试。 她就那樣靜靜地躺著,像睡著了一般朴上。 火紅的嫁衣襯著肌膚如雪垒棋。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,821評(píng)論 1 290
  • 那天痪宰,我揣著相機(jī)與錄音叼架,去河邊找鬼。 笑死衣撬,一個(gè)胖子當(dāng)著我的面吹牛乖订,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播具练,決...
    沈念sama閱讀 38,960評(píng)論 3 408
  • 文/蒼蘭香墨 我猛地睜開眼乍构,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來了靠粪?” 一聲冷哼從身側(cè)響起蜡吧,我...
    開封第一講書人閱讀 37,719評(píng)論 0 266
  • 序言:老撾萬榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎占键,沒想到半個(gè)月后昔善,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,186評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡畔乙,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,516評(píng)論 2 327
  • 正文 我和宋清朗相戀三年君仆,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片牲距。...
    茶點(diǎn)故事閱讀 38,650評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡返咱,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出牍鞠,到底是詐尸還是另有隱情咖摹,我是刑警寧澤,帶...
    沈念sama閱讀 34,329評(píng)論 4 330
  • 正文 年R本政府宣布难述,位于F島的核電站萤晴,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏胁后。R本人自食惡果不足惜店读,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,936評(píng)論 3 313
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望攀芯。 院中可真熱鬧屯断,春花似錦、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,757評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至趴久,卻和暖如春敏储,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背朋鞍。 一陣腳步聲響...
    開封第一講書人閱讀 31,991評(píng)論 1 266
  • 我被黑心中介騙來泰國(guó)打工已添, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人滥酥。 一個(gè)月前我還...
    沈念sama閱讀 46,370評(píng)論 2 360
  • 正文 我出身青樓更舞,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親坎吻。 傳聞我的和親對(duì)象是個(gè)殘疾皇子缆蝉,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,527評(píng)論 2 349

推薦閱讀更多精彩內(nèi)容