1、 createDirectStream
自 Spark-1.3.0 起今豆,提供了不需要 Receiver 的方法灯蝴。替代了使用 receivers 來接收數(shù)據(jù),該方法定期查詢每個(gè) topic+partition 的 lastest offset聂渊,并據(jù)此決定每個(gè) batch 要接收的 offsets 范圍
? 簡(jiǎn)化并行:不再需要?jiǎng)?chuàng)建多個(gè) kafka input DStream 然后再 union 這些 input DStream差购。使用 directStream,Spark Streaming會(huì)創(chuàng)建與 Kafka partitions 相同數(shù)量的 paritions 的 RDD汉嗽,RDD 的 partition與 Kafka 的 partition 一一對(duì)應(yīng)欲逃,這樣更易于理解及調(diào)優(yōu)
? 高效:在方式一中要保證數(shù)據(jù)零丟失需要啟用 WAL(預(yù)寫日志),這會(huì)占用更多空間饼暑。而在方式二中稳析,可以直接從 Kafka 指定的 topic 的指定 offsets 處恢復(fù)數(shù)據(jù),不需要使用 WAL
? 恰好一次語(yǔ)義保證:基于Receiver方式使用了 Kafka 的 high level API 來在 Zookeeper 中存儲(chǔ)已消費(fèi)的 offsets弓叛。這在某些情況下會(huì)導(dǎo)致一些數(shù)據(jù)被消費(fèi)兩次彰居,比如 streaming app 在處理某個(gè) batch 內(nèi)已接受到的數(shù)據(jù)的過程中掛掉,但是數(shù)據(jù)已經(jīng)處理了一部分邪码,但這種情況下無(wú)法將已處理數(shù)據(jù)的 offsets 更新到 Zookeeper 中裕菠,下次重啟時(shí),這批數(shù)據(jù)將再次被消費(fèi)且處理闭专∨耍基于direct的方式,使用kafka的簡(jiǎn)單api影钉,Spark Streaming自己就負(fù)責(zé)追蹤消費(fèi)的offset画髓,并保存在checkpoint中。Spark自己一定是同步的平委,因此可以保證數(shù)據(jù)是消費(fèi)一次且僅消費(fèi)一次奈虾。這種方式中,只要將 output 操作和保存 offsets 操作封裝成一個(gè)原子操作就能避免失敗后的重復(fù)消費(fèi)和處理廉赔,從而達(dá)到恰好一次的語(yǔ)義(Exactly-once)
2肉微、 createStream
這種方法使用一個(gè) Receiver 來接收數(shù)據(jù)徘郭。在該 Receiver 的實(shí)現(xiàn)中使用了 Kafka high-level consumer API鼓黔。Receiver 從 kafka 接收的數(shù)據(jù)將被存儲(chǔ)到 Spark executor 中徐绑,隨后啟動(dòng)的 job 將處理這些數(shù)據(jù)做鹰。
在默認(rèn)配置下眶痰,該方法失敗后會(huì)丟失數(shù)據(jù)(保存在 executor 內(nèi)存里的數(shù)據(jù)在 application 失敗后就沒了)诊杆,若要保證數(shù)據(jù)不丟失船侧,需要啟用 WAL(即預(yù)寫日志至 HDFS蔬螟、S3等),這樣再失敗后可以從日志文件中恢復(fù)數(shù)據(jù)铁孵。
在該函數(shù)中锭硼,會(huì)新建一個(gè) KafkaInputDStream對(duì)象,KafkaInputDStream繼承于 ReceiverInputDStream蜕劝。KafkaInputDStream實(shí)現(xiàn)了getReceiver方法檀头,返回接收器的實(shí)例
def getReceiver(): Receiver[(K, V)] = {
if (!useReliableReceiver) {
//< 不啟用 WAL
new KafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
} else {
//< 啟用 WAL
new ReliableKafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
}
}
需要注意的點(diǎn):
? Kafka Topic 的 partitions 與RDD 的 partitions 沒有直接關(guān)系,不能一一對(duì)應(yīng)岖沛。如果增加 topic 的 partition 個(gè)數(shù)的話僅僅會(huì)增加單個(gè) Receiver 接收數(shù)據(jù)的線程數(shù)鳖擒。事實(shí)上,使用這種方法只會(huì)在一個(gè) executor 上啟用一個(gè) Receiver烫止,該 Receiver 包含一個(gè)線程池蒋荚,線程池的線程個(gè)數(shù)與所有 topics 的 partitions 個(gè)數(shù)總和一致,每條線程接收一個(gè) topic 的一個(gè) partition 的數(shù)據(jù)馆蠕。而并不會(huì)增加處理數(shù)據(jù)時(shí)的并行度期升。
? 對(duì)于一個(gè) topic,可以使用多個(gè) groupid 相同的 input DStream 來使用多個(gè) Receivers 來增加并行度互躬,然后 union 他們播赁;對(duì)于多個(gè) topics,除了可以用上個(gè)辦法增加并行度外吼渡,還可以對(duì)不同的 topic 使用不同的 input DStream 然后 union 他們來增加并行度
? 如果你啟用了 WAL容为,為能將接收到的數(shù)據(jù)將以 log 的方式在指定的存儲(chǔ)系統(tǒng)備份一份,需要指定輸入數(shù)據(jù)的存儲(chǔ)等級(jí)為 StorageLevel.MEMORY_AND_DISK_SER 或 StorageLevel.MEMORY_AND_DISK_SER_2