Spark-streaming kafka數(shù)據(jù)接收兩種方式對比

1.1 Receiver-based Approach

這種方式利用接收器(Receiver)來接收kafka中的數(shù)據(jù),其最基本是使用Kafka高階用戶API接口往枣。對于所有的接收器,從kafka接收來的數(shù)據(jù)會存儲在spark的executor中,之后spark streaming提交的job會處理這些數(shù)據(jù)。
Receiver-based的Kafka讀取方式是基于Kafka高階(high-level) api來實現(xiàn)對Kafka數(shù)據(jù)的消費柿菩。在提交Spark Streaming任務后,Spark集群會劃出指定的Receivers來專門雨涛、持續(xù)不斷枢舶、異步讀取Kafka的數(shù)據(jù),讀取時間間隔以及每次讀取offsets范圍可以由參數(shù)來配置替久。讀取的數(shù)據(jù)保存在Receiver中凉泄,具體StorageLevel方式由用戶指定,諸如MEMORY_ONLY等侣肄。當driver 觸發(fā)batch任務的時候旧困,Receivers中的數(shù)據(jù)會轉(zhuǎn)移到剩余的Executors中去執(zhí)行醇份。在執(zhí)行完之后稼锅,Receivers會相應更新ZooKeeper的offsets。如要確保at least once的讀取方式僚纷,可以設置spark.streaming.receiver.writeAheadLog.enable為true矩距。具體Receiver執(zhí)行流程見下圖:


image.png
import org.apache.spark.streaming.kafka._

 val kafkaStream = KafkaUtils.createStream(streamingContext, 
     [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])

還有幾個需要注意的點:

  • 在Receiver的方式中,Spark中的partition和kafka中的partition并不是相關的怖竭,所以如果我們加大每個topic的partition數(shù)量锥债,僅僅是增加線程來處理由單一Receiver消費的主題。但是這并沒有增加Spark在處理數(shù)據(jù)上的并行度。
  • 對于不同的Group和topic我們可以使用多個Receiver創(chuàng)建不同的Dstream來并行接收數(shù)據(jù)哮肚,之后可以利用union來統(tǒng)一成一個Dstream登夫。
  • 如果我們啟用了Write Ahead Logs復制到文件系統(tǒng)如HDFS,那么storage level需要設置成StorageLevel.MEMORY_AND_DISK_SER允趟,也就
    是KafkaUtils.createStream(...,StorageLevel.MEMORY_AND_DISK_SER)

4.1.1 源碼分析

image.png

image.png

image.png

image.png

image.png

image.png

image.png

image.png

image.png

image.png

image.png

image.png

需要查看currentBuffer在哪里被處理恼策,搜索代碼,發(fā)現(xiàn)
image.png

updateCurrentBuffer方法又在這里被回調(diào)
image.png

image.png

image.png

image.png

period就是上面?zhèn)魅氲模?code>blockIntervalMs,默認200ms切一個block潮剪,這也是一個優(yōu)化參數(shù)
image.png

image.png

這里參考博客涣楷,暫時未找到從哪里找到的

image.png

image.png

image.png

BlockRDD類中getPartitions方法是說將這個batch的blocks作為partitions。Compute方法則按照入?yún)?code>BlockRDDPartition的blockId抗碰,從blockManager中獲取該block作為partition的數(shù)據(jù)狮斗。getPreferredLocations則是將BlockRDDPartition所在的host作為partition的首選位置,移動計算,不移動數(shù)據(jù)原則弧蝇。

image.png

1.2 Direct Approach (No Receivers)

Direct方式采用Kafka簡單的consumer api方式來讀取數(shù)據(jù)碳褒,無需經(jīng)由ZooKeeper,此種方式不再需要專門Receiver來持續(xù)不斷讀取數(shù)據(jù)看疗。當batch任務觸發(fā)時骤视,由Executor讀取數(shù)據(jù),并參與到其他Executor的數(shù)據(jù)計算過程中去鹃觉。driver來決定讀取多少offsets专酗,并將offsets交由checkpoints來維護。將觸發(fā)下次batch任務盗扇,再由Executor讀取Kafka數(shù)據(jù)并計算祷肯。從此過程我們可以發(fā)現(xiàn)Direct方式無需Receiver讀取數(shù)據(jù),而是需要計算時再讀取數(shù)據(jù)疗隶,所以Direct方式的數(shù)據(jù)消費對內(nèi)存的要求不高佑笋,只需要考慮批量計算所需要的內(nèi)存即可;另外batch任務堆積時斑鼻,也不會影響數(shù)據(jù)堆積蒋纬。其具體讀取方式如下圖:


image.png

這種方法相較于Receiver方式的優(yōu)勢在于:

  • 簡化的并行:在Receiver的方式中我們提到創(chuàng)建多個Receiver之后利用union來合并成一個Dstream的方式提高數(shù)據(jù)傳輸并行度。而在Direct方式中坚弱,Kafka中的partition與RDD中的partition是一一對應的并行讀取Kafka數(shù)據(jù)蜀备,這種映射關系也更利于理解和優(yōu)化。
  • 高效:在Receiver的方式中荒叶,為了達到0數(shù)據(jù)丟失需要將數(shù)據(jù)存入Write Ahead Log中碾阁,這樣在Kafka和日志中就保存了兩份數(shù)據(jù),浪費些楣!而第二種方式不存在這個問題脂凶,只要我們Kafka的數(shù)據(jù)保留時間足夠長宪睹,我們都能夠從Kafka進行數(shù)據(jù)恢復。
  • 精確一次:在Receiver的方式中蚕钦,使用的是Kafka的高階API接口從Zookeeper中獲取offset值亭病,這也是傳統(tǒng)的從Kafka中讀取數(shù)據(jù)的方式,但由于Spark Streaming消費的數(shù)據(jù)和Zookeeper中記錄的offset不同步嘶居,這種方式偶爾會造成數(shù)據(jù)重復消費命贴。而第二種方式,直接使用了簡單的低階Kafka API食听,Offsets則利用Spark Streaming的checkpoints進行記錄胸蛛,消除了這種不一致性。

4.2.1 源碼分析

val stream = KafkaUtils.createDirectStream()
image.png

這里兩個優(yōu)化參數(shù):

 val refreshLeaderBackoffMs = props.getInt("refresh.leader.backoff.ms", RefreshMetadataBackoffMs)
 val maxRetries = context.sparkContext.getConf.getInt("spark.streaming.kafka.maxRetries", 1)

image.png

image.png

image.png

優(yōu)化參數(shù) "spark.streaming.kafka.maxRatePerPartition"
image.png

image.png

在getPartitions方法中可以看到樱报,KafkaRDD的partition個數(shù)就是topic的partition個數(shù)之和葬项。
image.png

compute方法是RDD用來構建一個partition的數(shù)據(jù)的。
image.png

image.png

image.png

image.png

image.png

?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末迹蛤,一起剝皮案震驚了整個濱河市民珍,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌盗飒,老刑警劉巖嚷量,帶你破解...
    沈念sama閱讀 221,635評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異逆趣,居然都是意外死亡蝶溶,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,543評論 3 399
  • 文/潘曉璐 我一進店門宣渗,熙熙樓的掌柜王于貴愁眉苦臉地迎上來抖所,“玉大人,你說我怎么就攤上這事痕囱√镌” “怎么了?”我有些...
    開封第一講書人閱讀 168,083評論 0 360
  • 文/不壞的土叔 我叫張陵鞍恢,是天一觀的道長傻粘。 經(jīng)常有香客問我,道長帮掉,這世上最難降的妖魔是什么弦悉? 我笑而不...
    開封第一講書人閱讀 59,640評論 1 296
  • 正文 為了忘掉前任,我火速辦了婚禮旭寿,結(jié)果婚禮上警绩,老公的妹妹穿的比我還像新娘崇败。我一直安慰自己盅称,他們只是感情好肩祥,可當我...
    茶點故事閱讀 68,640評論 6 397
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著缩膝,像睡著了一般混狠。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上疾层,一...
    開封第一講書人閱讀 52,262評論 1 308
  • 那天将饺,我揣著相機與錄音,去河邊找鬼痛黎。 笑死予弧,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的湖饱。 我是一名探鬼主播掖蛤,決...
    沈念sama閱讀 40,833評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼井厌!你這毒婦竟也來了蚓庭?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,736評論 0 276
  • 序言:老撾萬榮一對情侶失蹤仅仆,失蹤者是張志新(化名)和其女友劉穎器赞,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體墓拜,經(jīng)...
    沈念sama閱讀 46,280評論 1 319
  • 正文 獨居荒郊野嶺守林人離奇死亡港柜,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,369評論 3 340
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了咳榜。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片潘懊。...
    茶點故事閱讀 40,503評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖贿衍,靈堂內(nèi)的尸體忽然破棺而出授舟,到底是詐尸還是另有隱情,我是刑警寧澤贸辈,帶...
    沈念sama閱讀 36,185評論 5 350
  • 正文 年R本政府宣布释树,位于F島的核電站,受9級特大地震影響擎淤,放射性物質(zhì)發(fā)生泄漏奢啥。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,870評論 3 333
  • 文/蒙蒙 一嘴拢、第九天 我趴在偏房一處隱蔽的房頂上張望桩盲。 院中可真熱鬧,春花似錦席吴、人聲如沸赌结。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,340評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽柬姚。三九已至拟杉,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間量承,已是汗流浹背搬设。 一陣腳步聲響...
    開封第一講書人閱讀 33,460評論 1 272
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留撕捍,地道東北人拿穴。 一個月前我還...
    沈念sama閱讀 48,909評論 3 376
  • 正文 我出身青樓,卻偏偏與公主長得像忧风,于是被迫代替她去往敵國和親贞言。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 45,512評論 2 359

推薦閱讀更多精彩內(nèi)容