1.1 Receiver-based Approach
這種方式利用接收器(Receiver)來接收kafka中的數(shù)據(jù),其最基本是使用Kafka高階用戶API接口往枣。對于所有的接收器,從kafka接收來的數(shù)據(jù)會存儲在spark的executor中,之后spark streaming提交的job會處理這些數(shù)據(jù)。
Receiver-based的Kafka讀取方式是基于Kafka高階(high-level) api來實現(xiàn)對Kafka數(shù)據(jù)的消費柿菩。在提交Spark Streaming任務后,Spark集群會劃出指定的Receivers來專門雨涛、持續(xù)不斷枢舶、異步讀取Kafka的數(shù)據(jù),讀取時間間隔以及每次讀取offsets范圍可以由參數(shù)來配置替久。讀取的數(shù)據(jù)保存在Receiver中凉泄,具體StorageLevel方式由用戶指定,諸如MEMORY_ONLY等侣肄。當driver 觸發(fā)batch任務的時候旧困,Receivers中的數(shù)據(jù)會轉(zhuǎn)移到剩余的Executors中去執(zhí)行醇份。在執(zhí)行完之后稼锅,Receivers會相應更新ZooKeeper的offsets。如要確保at least once的讀取方式僚纷,可以設置spark.streaming.receiver.writeAheadLog.enable為true矩距。具體Receiver執(zhí)行流程見下圖:
import org.apache.spark.streaming.kafka._
val kafkaStream = KafkaUtils.createStream(streamingContext,
[ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])
還有幾個需要注意的點:
- 在Receiver的方式中,Spark中的partition和kafka中的partition并不是相關的怖竭,所以如果我們加大每個topic的partition數(shù)量锥债,僅僅是增加線程來處理由單一Receiver消費的主題。但是這并沒有增加Spark在處理數(shù)據(jù)上的并行度。
- 對于不同的Group和topic我們可以使用多個Receiver創(chuàng)建不同的Dstream來并行接收數(shù)據(jù)哮肚,之后可以利用union來統(tǒng)一成一個Dstream登夫。
- 如果我們啟用了Write Ahead Logs復制到文件系統(tǒng)如HDFS,那么storage level需要設置成StorageLevel.MEMORY_AND_DISK_SER允趟,也就
是KafkaUtils.createStream(...,StorageLevel.MEMORY_AND_DISK_SER)
4.1.1 源碼分析
需要查看
currentBuffer
在哪里被處理恼策,搜索代碼,發(fā)現(xiàn)而
updateCurrentBuffer
方法又在這里被回調(diào)period
就是上面?zhèn)魅氲模?code>blockIntervalMs,默認200ms切一個block潮剪,這也是一個優(yōu)化參數(shù)這里參考博客涣楷,暫時未找到從哪里找到的
BlockRDD
類中getPartitions
方法是說將這個batch的blocks作為partitions。Compute
方法則按照入?yún)?code>BlockRDDPartition的blockId
抗碰,從blockManager
中獲取該block作為partition的數(shù)據(jù)狮斗。getPreferredLocations
則是將BlockRDDPartition
所在的host作為partition的首選位置,移動計算,不移動數(shù)據(jù)原則弧蝇。
1.2 Direct Approach (No Receivers)
Direct方式采用Kafka簡單的consumer api方式來讀取數(shù)據(jù)碳褒,無需經(jīng)由ZooKeeper,此種方式不再需要專門Receiver來持續(xù)不斷讀取數(shù)據(jù)看疗。當batch任務觸發(fā)時骤视,由Executor讀取數(shù)據(jù),并參與到其他Executor的數(shù)據(jù)計算過程中去鹃觉。driver來決定讀取多少offsets专酗,并將offsets交由checkpoints來維護。將觸發(fā)下次batch任務盗扇,再由Executor讀取Kafka數(shù)據(jù)并計算祷肯。從此過程我們可以發(fā)現(xiàn)Direct方式無需Receiver讀取數(shù)據(jù),而是需要計算時再讀取數(shù)據(jù)疗隶,所以Direct方式的數(shù)據(jù)消費對內(nèi)存的要求不高佑笋,只需要考慮批量計算所需要的內(nèi)存即可;另外batch任務堆積時斑鼻,也不會影響數(shù)據(jù)堆積蒋纬。其具體讀取方式如下圖:
這種方法相較于Receiver方式的優(yōu)勢在于:
- 簡化的并行:在Receiver的方式中我們提到創(chuàng)建多個Receiver之后利用union來合并成一個Dstream的方式提高數(shù)據(jù)傳輸并行度。而在Direct方式中坚弱,Kafka中的partition與RDD中的partition是一一對應的并行讀取Kafka數(shù)據(jù)蜀备,這種映射關系也更利于理解和優(yōu)化。
- 高效:在Receiver的方式中荒叶,為了達到0數(shù)據(jù)丟失需要將數(shù)據(jù)存入Write Ahead Log中碾阁,這樣在Kafka和日志中就保存了兩份數(shù)據(jù),浪費些楣!而第二種方式不存在這個問題脂凶,只要我們Kafka的數(shù)據(jù)保留時間足夠長宪睹,我們都能夠從Kafka進行數(shù)據(jù)恢復。
- 精確一次:在Receiver的方式中蚕钦,使用的是Kafka的高階API接口從Zookeeper中獲取offset值亭病,這也是傳統(tǒng)的從Kafka中讀取數(shù)據(jù)的方式,但由于Spark Streaming消費的數(shù)據(jù)和Zookeeper中記錄的offset不同步嘶居,這種方式偶爾會造成數(shù)據(jù)重復消費命贴。而第二種方式,直接使用了簡單的低階Kafka API食听,Offsets則利用Spark Streaming的checkpoints進行記錄胸蛛,消除了這種不一致性。
4.2.1 源碼分析
val stream = KafkaUtils.createDirectStream()
這里兩個優(yōu)化參數(shù):
val refreshLeaderBackoffMs = props.getInt("refresh.leader.backoff.ms", RefreshMetadataBackoffMs)
val maxRetries = context.sparkContext.getConf.getInt("spark.streaming.kafka.maxRetries", 1)
優(yōu)化參數(shù) "spark.streaming.kafka.maxRatePerPartition"
在getPartitions方法中可以看到樱报,KafkaRDD的partition個數(shù)就是topic的partition個數(shù)之和葬项。
compute方法是RDD用來構建一個partition的數(shù)據(jù)的。