Spark Streaming讀取Kafka數(shù)據(jù)

Kafka為一個分布式的消息隊列稠集,spark流操作kafka有兩種方式:
一種是利用接收器(receiver)和kafaka的高層API實(shí)現(xiàn)奶段。
一種是不利用接收器,直接用kafka底層的API來實(shí)現(xiàn)(spark1.3以后引入)剥纷。

1.Reveiver方式

基于Receiver方式實(shí)現(xiàn)會利用Kakfa的高層消費(fèi)API痹籍,和所有的其他Receivers一樣,接受到的數(shù)據(jù)會保存到excutors中晦鞋,然后由spark Streaming 來啟動Job進(jìn)行處理這些數(shù)據(jù)蹲缠。
在默認(rèn)的配置下,這種方式在失敗的情況下悠垛,會丟失數(shù)據(jù)线定,如果要保證零數(shù)據(jù)丟失,需要啟用WAL(Write Ahead Logs)确买。它同步將接受到數(shù)據(jù)保存到分布式文件系統(tǒng)上比如HDFS斤讥。 所以數(shù)據(jù)在出錯的情況下可以恢復(fù)出來。
使用兩個步驟:
1湾趾、添加依賴:spark-streaming-kafka_2.10-1.3.0
2芭商、編程:import org.apache.spark.streaming.kafka._

val kafkaStream = KafkaUtils.createStream(streamingContext,[ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])

注意:
1.kafka的分區(qū)數(shù)和Spark的RDD的分區(qū)不是一個概念派草。所以在上述函數(shù)中增加特定主題的分區(qū)數(shù),僅僅增加了一個receiver中消費(fèi)topic的線程數(shù)铛楣,并不難增加spark并行處理數(shù)據(jù)的數(shù)量近迁。
(那是不是多少個paratition最好對應(yīng)多少個receiver的消費(fèi)線程啊簸州?)
2.對于不同的group和topic鉴竭,可以使用多個recivers創(chuàng)建多個DStreams來并行處理數(shù)據(jù)(如果是同一個topic如何保證數(shù)據(jù)不被重復(fù)消費(fèi)?)岸浑。
3.如果啟用了WAL拓瞪,接收到的數(shù)據(jù)會被持久化一份到日志中,因此需要將storage_lever設(shè)置成StorgeLevel.MEMORY_AND_DISK_SER開啟:

val conf = new SparkConf()
conf.set("spark.streaming.receiver.writeAheadLog.enable","true")
val sc= new SparkContext(conf)
val ssc = new StreamingContext(sc,Seconds(5))
ssc.checkpoint("checkpoint")
val lines = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK_SER).map(_._2)
 
//開啟在強(qiáng)行終止的情況下助琐,數(shù)據(jù)仍然會丟失,解決辦法:
sys.addShutdownHook({
  ssc.stop(true,true)
)})

3面氓、運(yùn)行
運(yùn)行提交代碼的時候兵钮,需要添加以下基本Jar包依賴:
 --jars lib/spark-streaming-kafka_2.10-1.3.0.jar,     lib/spark-streaming_2.10-1.3.0.jar,     lib/kafka_2.10-0.8.1.1.jar,lib/zkclient-0.3.jar

4、例子

object KafkaWordCount {
  def main(args: Array[String]) {
    if (args.length < 4) {
      System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>")
      System.exit(1)
    }
  
    StreamingExamples.setStreamingLogLevels()
  
    val Array(zkQuorum, group, topics, numThreads) = args
    val sparkConf = new SparkConf().setAppName("KafkaWordCount")
    val ssc =  new StreamingContext(sparkConf, Seconds(2))
    //保證元數(shù)據(jù)恢復(fù)舌界,就是Driver端掛了之后數(shù)據(jù)仍然可以恢復(fù)
    ssc.checkpoint("checkpoint")
  
    val topicMap = topics.split(",").map((_,numThreads.toInt)).toMap
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1L))
      .reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)
    wordCounts.print()
  
    ssc.start()
    ssc.awaitTermination()
  }
}

5掘譬、圖示:

Receiver
<接收示意圖>
<接收示意圖>
數(shù)據(jù)恢復(fù)
Paste_Image.png

2.直接操作方式

不同于Receiver接收數(shù)據(jù)方式,這種方式定期從kafka的topic下對應(yīng)的partition中查詢最新偏移量呻拌,并在每個批次中根據(jù)相應(yīng)的定義的偏移范圍進(jìn)行處理葱轩。Spark通過調(diào)用kafka簡單的消費(fèi)者API讀取一定范圍的數(shù)據(jù)。
相比基于Receiver方式有幾個優(yōu)點(diǎn):
1藐握、簡單的并發(fā):
不需要創(chuàng)建多個kafka輸入流靴拱,然后Union他們,而使用DirectStream猾普,spark Streaming將會創(chuàng)建和kafka分區(qū)一樣的RDD的分區(qū)數(shù)袜炕,而且會從kafka并行讀取數(shù)據(jù),Spark的分區(qū)數(shù)和Kafka的分區(qū)數(shù)是一一對應(yīng)的關(guān)系初家。
2偎窘、高效
第一種實(shí)現(xiàn)數(shù)據(jù)的零丟失是將數(shù)據(jù)預(yù)先保存在WAL中,會復(fù)制一遍數(shù)據(jù)溜在,會導(dǎo)致數(shù)據(jù)被拷貝兩次:一次是被Kafka復(fù)制陌知;另一次是寫入到WAL中,沒有Receiver消除了這個問題掖肋。
3仆葡、僅一次語義:
Receiver方式讀取kafka,使用的是高層API將偏移量寫入ZK中培遵,雖然這種方法可以通過數(shù)據(jù)保存在WAL中保證數(shù)據(jù)的不對浙芙,但是可能會因為sparkStreaming和ZK中保存的偏移量不一致而導(dǎo)致數(shù)據(jù)被消費(fèi)了多次登刺,
第二種方式不采用ZK保存偏移量,消除了兩者的不一致嗡呼,保證每個記錄只被Spark Streaming操作一次纸俭,即使是在處理失敗的情況下。如果想更新ZK中的偏移量數(shù)據(jù)南窗,需要自己寫代碼來實(shí)現(xiàn)揍很。

直接操作

步驟:
1、引入依賴
同第一種方式万伤。
2窒悔、編程

val directKafkaStream = KafkaUtils.createDirectStream[[key class], [value class], [key decoder class], [value decoder class] ](streamingContext, [map of Kafka parameters], [set of topics to consume])```
3、如果想獲得每個topic中每個分區(qū)的在spark streaming中的偏移量敌买,可以通過以下代碼:

directKafkaStream.foreachRDD { rdd =>
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges]
// offsetRanges.length = # of Kafka partitions being consumed
...
}
//例子:
val ssc = new StreamingContext(sc, Seconds(2))
val kafkaParams = Map("zookeeper.connect" -> zkConnect,
"group.id" -> kafkaGroupId,
"metadata.broker.list" -> "10.15.42.23:8092,10.15.42.22:8092",
"auto.offset.reset" -> "smallest"
)
val topics = Set(topic)

val directKafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topics)

//KafkaCluster 需要從源碼拷貝简珠,此類是私有類。
directKafkaStream.foreachRDD(
rdd => {
val offsetLists = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
val kc = new KafkaCluster(kafkaParams)
for (offsets <- offsetLists) {
val topicAndPartition = TopicAndPartition(offsets.topic, offsets.partition)
val o = kc.setConsumerOffsets(kafkaGroupId, Map((topicAndPartition, offsets.untilOffset)))
if (o.isLeft) {
println(s"Error updating the offset to Kafka cluster: ${o.left.get}")
}
}
}
)

3虹钮、部署:
同第一種方式聋庵。
 
4、圖示:

![直接操作](http://upload-images.jianshu.io/upload_images/1737506-05b1de922c684896.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)


##說明
圖片均來自互聯(lián)網(wǎng)芙粱,根據(jù)Spark官網(wǎng)的文章總結(jié)翻譯而來祭玉。
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市春畔,隨后出現(xiàn)的幾起案子脱货,更是在濱河造成了極大的恐慌,老刑警劉巖律姨,帶你破解...
    沈念sama閱讀 211,376評論 6 491
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件振峻,死亡現(xiàn)場離奇詭異,居然都是意外死亡线召,警方通過查閱死者的電腦和手機(jī)铺韧,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,126評論 2 385
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來缓淹,“玉大人哈打,你說我怎么就攤上這事⊙逗” “怎么了料仗?”我有些...
    開封第一講書人閱讀 156,966評論 0 347
  • 文/不壞的土叔 我叫張陵,是天一觀的道長伏蚊。 經(jīng)常有香客問我立轧,道長,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,432評論 1 283
  • 正文 為了忘掉前任氛改,我火速辦了婚禮帐萎,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘胜卤。我一直安慰自己疆导,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,519評論 6 385
  • 文/花漫 我一把揭開白布葛躏。 她就那樣靜靜地躺著澈段,像睡著了一般饺蔑。 火紅的嫁衣襯著肌膚如雪肥照。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,792評論 1 290
  • 那天拢锹,我揣著相機(jī)與錄音摩窃,去河邊找鬼兽叮。 笑死,一個胖子當(dāng)著我的面吹牛猾愿,可吹牛的內(nèi)容都是我干的充择。 我是一名探鬼主播,決...
    沈念sama閱讀 38,933評論 3 406
  • 文/蒼蘭香墨 我猛地睜開眼匪蟀,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了宰僧?” 一聲冷哼從身側(cè)響起材彪,我...
    開封第一講書人閱讀 37,701評論 0 266
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎琴儿,沒想到半個月后段化,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,143評論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡造成,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,488評論 2 327
  • 正文 我和宋清朗相戀三年显熏,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片晒屎。...
    茶點(diǎn)故事閱讀 38,626評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡喘蟆,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出鼓鲁,到底是詐尸還是另有隱情蕴轨,我是刑警寧澤,帶...
    沈念sama閱讀 34,292評論 4 329
  • 正文 年R本政府宣布骇吭,位于F島的核電站橙弱,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜棘脐,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,896評論 3 313
  • 文/蒙蒙 一斜筐、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧蛀缝,春花似錦顷链、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,742評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至俘闯,卻和暖如春潭苞,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背真朗。 一陣腳步聲響...
    開封第一講書人閱讀 31,977評論 1 265
  • 我被黑心中介騙來泰國打工此疹, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人遮婶。 一個月前我還...
    沈念sama閱讀 46,324評論 2 360
  • 正文 我出身青樓蝗碎,卻偏偏與公主長得像,于是被迫代替她去往敵國和親旗扑。 傳聞我的和親對象是個殘疾皇子蹦骑,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,494評論 2 348

推薦閱讀更多精彩內(nèi)容