DirectKafkaInputDStream 只在 driver 端接收數(shù)據(jù),所以繼承了 InputDStream抬纸,是沒有 receivers 的
在結(jié)合 Spark Streaming 及 Kafka 的實(shí)時(shí)應(yīng)用中宵呛,我們通常使用以下兩個(gè) API 來獲取最初的 DStream(這里不關(guān)心這兩個(gè) API 的重載):
KafkaUtils#createDirectStream
及
KafkaUtils#createStream
這兩個(gè) API 除了要傳入的參數(shù)不同外藕坯,接收 kafka 數(shù)據(jù)的節(jié)點(diǎn)伍纫、拉取數(shù)據(jù)的時(shí)機(jī)也完全不同隆夯。本文將分別就兩者進(jìn)行詳細(xì)分析钳恕。
KafkaUtils#createStream
先來分析 createStream
,在該函數(shù)中蹄衷,會(huì)新建一個(gè) KafkaInputDStream
對(duì)象忧额,KafkaInputDStream
繼承于 ReceiverInputDStream
。我們?cè)谖恼?a href="http://www.reibang.com/p/3195fb3c4191" target="_blank">揭開Spark Streaming神秘面紗② - ReceiverTracker 與數(shù)據(jù)導(dǎo)入分析過
- 繼承ReceiverInputDStream的類需要重載 getReceiver 函數(shù)以提供用于接收數(shù)據(jù)的 receiver
- recever 會(huì)調(diào)度到某個(gè) executor 上并啟動(dòng)愧口,不間斷的接收數(shù)據(jù)并將收到的數(shù)據(jù)交由 ReceiverSupervisor 存成 block 作為 RDD 輸入數(shù)據(jù)
KafkaInputDStream當(dāng)然也實(shí)現(xiàn)了getReceiver方法睦番,如下:
def getReceiver(): Receiver[(K, V)] = {
if (!useReliableReceiver) {
//< 不啟用 WAL
new KafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
} else {
//< 啟用 WAL
new ReliableKafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
}
}
根據(jù)是否啟用 WAL,receiver 分為 KafkaReceiver 和 ReliableKafkaReceiver耍属。揭開Spark Streaming神秘面紗②-ReceiverTracker 與數(shù)據(jù)導(dǎo)入一文中詳細(xì)地介紹了
- receiver 是如何被分發(fā)啟動(dòng)的
- receiver 接受數(shù)據(jù)后數(shù)據(jù)的流轉(zhuǎn)過程
并在 揭開Spark Streaming神秘面紗③ - 動(dòng)態(tài)生成 job 一文中詳細(xì)介紹了 - receiver 接受的數(shù)據(jù)存儲(chǔ)為 block 后托嚣,如何將 blocks 作為 RDD 的輸入數(shù)據(jù)
- 動(dòng)態(tài)生成 job
以上兩篇文章并沒有具體介紹 receiver 是如何接收數(shù)據(jù)的,當(dāng)然每個(gè)重載了 ReceiverInputDStream 的類的 receiver 接收數(shù)據(jù)方式都不相同厚骗。下圖描述了 KafkaReceiver 接收數(shù)據(jù)的具體流程:
KafkaUtils#createDirectStream
在揭開Spark Streaming神秘面紗③ - 動(dòng)態(tài)生成 job中示启,介紹了在生成每個(gè) batch 的過程中,會(huì)去取這個(gè) batch 對(duì)應(yīng)的 RDD领舰,若未生成該 RDD夫嗓,則會(huì)取該 RDD 對(duì)應(yīng)的 blocks 數(shù)據(jù)來生成 RDD,最終會(huì)調(diào)用到DStream#compute(validTime: Time)
函數(shù)冲秽,在KafkaUtils#createDirectStream
調(diào)用中舍咖,會(huì)新建DirectKafkaInputDStream
,DirectKafkaInputDStream#compute(validTime: Time)
會(huì)從 kafka 拉取數(shù)據(jù)并生成 RDD锉桑,流程如下:
如上圖所示谎仲,該函數(shù)主要做了以下三個(gè)事情:
- 確定要接收的 partitions 的 offsetRange,以作為第2步創(chuàng)建的 RDD 的數(shù)據(jù)來源
- 創(chuàng)建 RDD 并執(zhí)行 count 操作刨仑,使 RDD 真實(shí)具有數(shù)據(jù)
- 以 streamId郑诺、數(shù)據(jù)條數(shù)夹姥,offsetRanges 信息初始化 inputInfo 并添加到 JobScheduler 中
進(jìn)一步看 KafkaRDD 的 getPartitions 實(shí)現(xiàn):
override def getPartitions: Array[Partition] = {
offsetRanges.zipWithIndex.map { case (o, i) =>
val (host, port) = leaders(TopicAndPartition(o.topic, o.partition))
new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, o.untilOffset, host, port)
}.toArray
}
從上面的代碼可以很明顯看到,KafkaRDD 的 partition 數(shù)據(jù)與 Kafka topic 的某個(gè) partition 的 o.fromOffset 至 o.untilOffset 數(shù)據(jù)是相對(duì)應(yīng)的辙诞,也就是說 KafkaRDD 的 partition 與 Kafka partition 是一一對(duì)應(yīng)的
通過以上分析辙售,我們可以對(duì)這兩種方式的區(qū)別做一個(gè)總結(jié):
- createStream會(huì)使用 Receiver;而createDirectStream不會(huì)
- createStream使用的 Receiver 會(huì)分發(fā)到某個(gè) executor 上去啟動(dòng)并接受數(shù)據(jù)飞涂;而createDirectStream直接在 driver 上接收數(shù)據(jù)
- createStream使用 Receiver 源源不斷的接收數(shù)據(jù)并把數(shù)據(jù)交給 ReceiverSupervisor 處理最終存儲(chǔ)為 blocks 作為 RDD 的輸入旦部,從 kafka 拉取數(shù)據(jù)與計(jì)算消費(fèi)數(shù)據(jù)相互獨(dú)立;而createDirectStream會(huì)在每個(gè) batch 拉取數(shù)據(jù)并就地消費(fèi)较店,到下個(gè) batch 再次拉取消費(fèi)士八,周而復(fù)始,從 kafka 拉取數(shù)據(jù)與計(jì)算消費(fèi)數(shù)據(jù)是連續(xù)的梁呈,沒有獨(dú)立開
- createStream中創(chuàng)建的KafkaInputDStream 每個(gè) batch 所對(duì)應(yīng)的 RDD 的 partition 不與 Kafka partition 一一對(duì)應(yīng)婚度;而createDirectStream中創(chuàng)建的 DirectKafkaInputDStream 每個(gè) batch 所對(duì)應(yīng)的 RDD 的 partition 與 Kafka partition 一一對(duì)應(yīng)