Kafka為一個分布式的消息隊列稠集,spark流操作kafka有兩種方式:
一種是利用接收器(receiver)和kafaka的高層API實(shí)現(xiàn)奶段。
一種是不利用接收器,直接用kafka底層的API來實(shí)現(xiàn)(spark1.3以后引入)剥纷。
1.Reveiver方式
基于Receiver方式實(shí)現(xiàn)會利用Kakfa的高層消費(fèi)API痹籍,和所有的其他Receivers一樣,接受到的數(shù)據(jù)會保存到excutors中晦鞋,然后由spark Streaming 來啟動Job進(jìn)行處理這些數(shù)據(jù)蹲缠。
在默認(rèn)的配置下,這種方式在失敗的情況下悠垛,會丟失數(shù)據(jù)线定,如果要保證零數(shù)據(jù)丟失,需要啟用WAL(Write Ahead Logs)确买。它同步將接受到數(shù)據(jù)保存到分布式文件系統(tǒng)上比如HDFS斤讥。 所以數(shù)據(jù)在出錯的情況下可以恢復(fù)出來。
使用兩個步驟:
1湾趾、添加依賴:spark-streaming-kafka_2.10-1.3.0
2芭商、編程:import org.apache.spark.streaming.kafka._
val kafkaStream = KafkaUtils.createStream(streamingContext,[ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])
注意:
1.kafka的分區(qū)數(shù)和Spark的RDD的分區(qū)不是一個概念派草。所以在上述函數(shù)中增加特定主題的分區(qū)數(shù),僅僅增加了一個receiver中消費(fèi)topic的線程數(shù)铛楣,并不難增加spark并行處理數(shù)據(jù)的數(shù)量近迁。
(那是不是多少個paratition最好對應(yīng)多少個receiver的消費(fèi)線程啊簸州?)
2.對于不同的group和topic鉴竭,可以使用多個recivers創(chuàng)建多個DStreams來并行處理數(shù)據(jù)(如果是同一個topic如何保證數(shù)據(jù)不被重復(fù)消費(fèi)?)岸浑。
3.如果啟用了WAL拓瞪,接收到的數(shù)據(jù)會被持久化一份到日志中,因此需要將storage_lever設(shè)置成StorgeLevel.MEMORY_AND_DISK_SER開啟:
val conf = new SparkConf()
conf.set("spark.streaming.receiver.writeAheadLog.enable","true")
val sc= new SparkContext(conf)
val ssc = new StreamingContext(sc,Seconds(5))
ssc.checkpoint("checkpoint")
val lines = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK_SER).map(_._2)
//開啟在強(qiáng)行終止的情況下助琐,數(shù)據(jù)仍然會丟失,解決辦法:
sys.addShutdownHook({
ssc.stop(true,true)
)})
3面氓、運(yùn)行
運(yùn)行提交代碼的時候兵钮,需要添加以下基本Jar包依賴:
--jars lib/spark-streaming-kafka_2.10-1.3.0.jar, lib/spark-streaming_2.10-1.3.0.jar, lib/kafka_2.10-0.8.1.1.jar,lib/zkclient-0.3.jar
4、例子
object KafkaWordCount {
def main(args: Array[String]) {
if (args.length < 4) {
System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>")
System.exit(1)
}
StreamingExamples.setStreamingLogLevels()
val Array(zkQuorum, group, topics, numThreads) = args
val sparkConf = new SparkConf().setAppName("KafkaWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(2))
//保證元數(shù)據(jù)恢復(fù)舌界,就是Driver端掛了之后數(shù)據(jù)仍然可以恢復(fù)
ssc.checkpoint("checkpoint")
val topicMap = topics.split(",").map((_,numThreads.toInt)).toMap
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1L))
.reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
}
}
5掘譬、圖示:

2.直接操作方式
不同于Receiver接收數(shù)據(jù)方式,這種方式定期從kafka的topic下對應(yīng)的partition中查詢最新偏移量呻拌,并在每個批次中根據(jù)相應(yīng)的定義的偏移范圍進(jìn)行處理葱轩。Spark通過調(diào)用kafka簡單的消費(fèi)者API讀取一定范圍的數(shù)據(jù)。
相比基于Receiver方式有幾個優(yōu)點(diǎn):
1藐握、簡單的并發(fā):
不需要創(chuàng)建多個kafka輸入流靴拱,然后Union他們,而使用DirectStream猾普,spark Streaming將會創(chuàng)建和kafka分區(qū)一樣的RDD的分區(qū)數(shù)袜炕,而且會從kafka并行讀取數(shù)據(jù),Spark的分區(qū)數(shù)和Kafka的分區(qū)數(shù)是一一對應(yīng)的關(guān)系初家。
2偎窘、高效
第一種實(shí)現(xiàn)數(shù)據(jù)的零丟失是將數(shù)據(jù)預(yù)先保存在WAL中,會復(fù)制一遍數(shù)據(jù)溜在,會導(dǎo)致數(shù)據(jù)被拷貝兩次:一次是被Kafka復(fù)制陌知;另一次是寫入到WAL中,沒有Receiver消除了這個問題掖肋。
3仆葡、僅一次語義:
Receiver方式讀取kafka,使用的是高層API將偏移量寫入ZK中培遵,雖然這種方法可以通過數(shù)據(jù)保存在WAL中保證數(shù)據(jù)的不對浙芙,但是可能會因為sparkStreaming和ZK中保存的偏移量不一致而導(dǎo)致數(shù)據(jù)被消費(fèi)了多次登刺,
第二種方式不采用ZK保存偏移量,消除了兩者的不一致嗡呼,保證每個記錄只被Spark Streaming操作一次纸俭,即使是在處理失敗的情況下。如果想更新ZK中的偏移量數(shù)據(jù)南窗,需要自己寫代碼來實(shí)現(xiàn)揍很。
步驟:
1、引入依賴
同第一種方式万伤。
2窒悔、編程
val directKafkaStream = KafkaUtils.createDirectStream[[key class], [value class], [key decoder class], [value decoder class] ](streamingContext, [map of Kafka parameters], [set of topics to consume])```
3、如果想獲得每個topic中每個分區(qū)的在spark streaming中的偏移量敌买,可以通過以下代碼:
directKafkaStream.foreachRDD { rdd =>
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges]
// offsetRanges.length = # of Kafka partitions being consumed
...
}
//例子:
val ssc = new StreamingContext(sc, Seconds(2))
val kafkaParams = Map("zookeeper.connect" -> zkConnect,
"group.id" -> kafkaGroupId,
"metadata.broker.list" -> "10.15.42.23:8092,10.15.42.22:8092",
"auto.offset.reset" -> "smallest"
)
val topics = Set(topic)
val directKafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topics)
//KafkaCluster 需要從源碼拷貝简珠,此類是私有類。
directKafkaStream.foreachRDD(
rdd => {
val offsetLists = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
val kc = new KafkaCluster(kafkaParams)
for (offsets <- offsetLists) {
val topicAndPartition = TopicAndPartition(offsets.topic, offsets.partition)
val o = kc.setConsumerOffsets(kafkaGroupId, Map((topicAndPartition, offsets.untilOffset)))
if (o.isLeft) {
println(s"Error updating the offset to Kafka cluster: ${o.left.get}")
}
}
}
)
3虹钮、部署:
同第一種方式聋庵。
4、圖示:

##說明
圖片均來自互聯(lián)網(wǎng)芙粱,根據(jù)Spark官網(wǎng)的文章總結(jié)翻譯而來祭玉。