SparkStreaming如何維護Kafka消息偏移量

SparkStreaming? ? 維護????Kafka? ? 消息偏移量據(jù)個人了解有兩種方式

一牺堰、利用????SparkStreaming? ? 自帶的????Checkpoint? ? 方法來維護

二雇毫、自己來編寫維護????Kafka? ? 消息偏移量的代碼

首先說明下集群中的各組件版本

Hadoop-version:????2.7.1

Spark-version? ? :????1.6.2

zookeeper-version:????3.4.6

jdk-version:? ? ? ? ? ? ? 1.8

maven-version:? ? ? ? 3.3.3

kafka-version:? ? ? ? ? ?0.10.0

集群搭建方式? ? HDP(ambari)

首先說第一種????SparkStreaming? ? 自帶的????Checkpoint? ? 方法,以下為代碼示例

object Test {

def main(args: Array[String]): Unit = {

//? ? TODO:創(chuàng)建檢查點的位置可以設(shè)置為Hdfs如果程序重新啟動spark程序會到此目錄中檢查并恢復(fù)

? ? val checkpointDirectory ="";

//? ? TODO:調(diào)用getOrCreate方法,這個方法入如果是第一次運行該作業(yè)

? ? //? ? TODO:沒有checkpointDirectory該文件時

? ? //? ? TODO:將會重新創(chuàng)建一個StreamingContext搬俊,并從最新或是最老的偏移量處開始消費

? ? val ssc: StreamingContext = StreamingContext.getOrCreate(checkpointDirectory, () => {

createStreaming(checkpointDirectory)

})

ssc.start()

ssc.awaitTermination()

}

//? TODO:你的業(yè)務(wù)邏輯不應(yīng)該寫在main函數(shù)中,而是應(yīng)該寫在創(chuàng)建

? //? TODO:StreamingContext的方法中

? def createStreaming(checkpointDirectory:String): StreamingContext = {

? ??????val brokerlist:String ="192.168.1.1:6667,192.168.1.1:6667"

? ?? ???val topic =Set("test")

? ??????val kafkaParams:Map[String,String] =Map[String,String]("metadata.broker.list" -> brokerlist)

? ??????val conf: SparkConf =new SparkConf()

? ??????????????????????????????????.setMaster("local")

? ??????????????????????????????????.setAppName(Test.getClass.getSimpleName)

? ??????val ssc: StreamingContext =new StreamingContext(conf,Seconds(3))

//? ? TODO:在這里創(chuàng)建檢查點曲饱,至于檢查點在哪里創(chuàng)建悠抹,具體需要看個人的業(yè)務(wù)需求

? ??? ??ssc.checkpoint(checkpointDirectory)

? ??????val data: InputDStream[(String,String)] =

? ??????????????????????KafkaUtils.createDirectStream[String,String, StringDecoder, StringDecoder](ssc, kafkaParams, topic)

//? TODO:以下代碼就是你的業(yè)務(wù)邏輯了,這里只是循環(huán)遍歷打印了Key扩淀,Value

? ??? ??data.foreachRDD(rdd => {

? ??????????????rdd.foreachPartition(partition => {

? ??????????????????????if (partition.isEmpty) {

? ??????????????????????????????for (tuple <- partition) {

? ??????????????????????????????????println(tuple._1 + tuple._2)

? ??????????????????????????}

? ??????????????????????}

? ??????????????????})

? ??????????????})

? ??????????ssc

? ??????????}

? ??????}

說一下這個方法的缺點:

這個方法不適合總是需要迭代升級的應(yīng)用楔敌,因為這個方法會在你建立檢查點時將你的jar包信息以序列化的方式存在此目錄中,

如果你的作業(yè)掛掉重新啟動時驻谆,這時候是沒有問題的卵凑,因為什么都沒有改變。

但是在你的應(yīng)用迭代升級時你的代碼發(fā)生了變化胜臊,這是程序會發(fā)現(xiàn)其中的變化勺卢,你迭代升級后的版本將無法運行,就算是啟動成功了象对,

運行的也還是迭代升級之前的代碼黑忱。所還是以失敗而告終!勒魔!

在Spark官方文檔中給出了兩個解決辦法

第一個:老的作業(yè)不停機甫煞,新作業(yè)個老作業(yè)同時運行一段時間,這樣是不安全的9诰睢8Х汀!

會導(dǎo)致數(shù)據(jù)重復(fù)消費弟胀,也有可能會發(fā)生數(shù)據(jù)丟失等問題

第二個:就是我要講的自己維護消息偏移量

以下是自己維護消息偏移量代碼示例

object Test{

? ??????def main(args: Array[String]): Unit = {

//? ? TODO: Zookeeper 集群地址和端口

??? ??????????? val zkHost:String ="192.168.1.1:2181,192.168.1.1:2181,192.168.1.1:2181"

? ? //? ? TODO:Kafka集群地址及端口

??? ??????????? val brokerlist:String ="192.168.1.1:6667,192.168.1.1:6667,192.168.1.1:6667"

? ? //? ? TODO:指定消費 Kafka 主題

??? ??????????? val topic:String ="test"

? ??? ??????????val kafkaParams:Map[String,String] =Map[String,String]("metadata.broker.list" -> brokerlist)

? ??? ??????????val conf: SparkConf =new SparkConf()

? ??????????????????????????????.setMaster("local[8]")

? ??????????????????????????????.setAppName(NGBoss_Dcc_Analysis.getClass.getSimpleName)

//? ? TODO:獲取一個StreamingContext對象

? ?? ???????????val ssc: StreamingContext =new StreamingContext(conf,Seconds(5))

//? ? TODO:將獲取的Zookeeper客戶端

? ??? ??????????val zkClient =new ZkClient(zkHost)

? ??????????????var offsetRanges: Array[OffsetRange] =Array[OffsetRange]()

//? ? TODO:設(shè)置在zookeeper中存儲offset的路徑

? ??? ??????????val topicDirs: ZKGroupTopicDirs =new ZKGroupTopicDirs("TEST_TOPIC_spark_streaming", topic)

? ??? ??????????val children = zkClient.countChildren(s"${topicDirs.consumerOffsetDir}")


? ??? ??????????var fromOffsets:Map[TopicAndPartition, Long] =Map()

? ??????????????var kafkaStream: InputDStream[(String, Array[Byte])] =null

? ??? ? ? ? ? ? ? ? ? ? ??if (children >0) {

//TODO:如果 zookeeper 中有保存 offset楷力,我們會利用這個 offset 作為 kafkaStream 的起始位置

//TODO:如果保存過 offset喊式,這里更好的做法,還應(yīng)該和? kafka 上最小的 offset 做對比萧朝,不然會報 OutOfRange 的錯誤

? ? ?? ?????????????????????????for (i <-0 until children) {

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??val partitionOffset = zkClient.readData[String](s"${topicDirs.consumerOffsetDir}/${i}")

? ??????????????????????????????????val tp =TopicAndPartition(topic, i)

//TODO:將不同 partition 對應(yīng)的 offset 增加到 fromOffsets 中

? ? ? ?? ???????????fromOffsets += (tp -> partitionOffset.toLong)

? ??????????????????}

//TODO:這個會將 kafka 的消息進行 transform岔留,最終 kafak 的數(shù)據(jù)都會變成 (topic_name, message) 這樣的 tuple

? ? ?? ???????val messageHandler: (MessageAndMetadata[String, Array[Byte]]) => (String, Array[Byte]) = (mmd:?? ??????????????????????????????MessageAndMetadata[String, Array[Byte]]) => (mmd.topic, mmd.message())

? ????????????kafkaStream = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, ByteDecoder,?? ??????????????????????????????(String, Array[Byte])](ssc, kafkaParams, fromOffsets, messageHandler)

? ??????????}else {

//TODO:如果未保存,根據(jù) kafkaParam 的配置使用最新或者最舊的 offset

? ? ?? ???????????kafkaStream = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, ByteDecoder]? ??????????????????????????????(ssc, kafkaParams,Set(topic))

? ??????????????}

? ??????????kafkaStream.foreachRDD(rdd => {

? ??????????????????if (!rdd.isEmpty()) {

? ??????????????????????rdd.foreachPartition(partition=> {

? ??????????????????????????????partition.foreach(tuple => {

? ??????????????????????????????????println(tuple._1 + tuple._2)

? ??????????????????????????????})

? ??????????????????????})

? ??????????????????offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

? ??????????????????????????????for (o <- offsetRanges) {

? ??????????????????????????????val zkPath:String =s"${topicDirs.consumerOffsetDir}/${o.partition}"

? ? ? ? ? //TODO:將該 partition 的 offset 保存到 zookeeper

? ? ? ? ?? ??????????????????????ZkUtils.updatePersistentPath(zkClient, zkPath, o.fromOffset.toString)

? ??????????????????????????}

? ??????????????????????}

? ??????????????????})

? ??????????ssc.start()

? ??????ssc.awaitTermination()

? ??????}

? ??}

第二種方法中offset可以存儲在zookeeper中也可以存儲在數(shù)據(jù)庫中

自己維護offset是不會發(fā)生Checkpoint?中的問題剪勿,是我目前知道的最好的一個解決方案贸诚。

但是也有其中的缺點,如果從失敗中恢復(fù)運行時不能獲取到Key值厕吉,默認它的Key就是主題。

還有就是不能同時消費維護多個主題中的偏移量械念。

如果你有跟好的解決方案头朱,還請不要吝嗇你的知識!A浼酢O钆ァ!希停!

謝謝你可以在百忙之中看完我寫的文章K肝住!3枘堋Q窍丁!违崇!

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末阿弃,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子羞延,更是在濱河造成了極大的恐慌渣淳,老刑警劉巖,帶你破解...
    沈念sama閱讀 206,482評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件伴箩,死亡現(xiàn)場離奇詭異入愧,居然都是意外死亡,警方通過查閱死者的電腦和手機嗤谚,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,377評論 2 382
  • 文/潘曉璐 我一進店門棺蛛,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人呵恢,你說我怎么就攤上這事鞠值。” “怎么了渗钉?”我有些...
    開封第一講書人閱讀 152,762評論 0 342
  • 文/不壞的土叔 我叫張陵彤恶,是天一觀的道長钞钙。 經(jīng)常有香客問我,道長声离,這世上最難降的妖魔是什么芒炼? 我笑而不...
    開封第一講書人閱讀 55,273評論 1 279
  • 正文 為了忘掉前任,我火速辦了婚禮术徊,結(jié)果婚禮上本刽,老公的妹妹穿的比我還像新娘。我一直安慰自己赠涮,他們只是感情好子寓,可當我...
    茶點故事閱讀 64,289評論 5 373
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著笋除,像睡著了一般斜友。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上垃它,一...
    開封第一講書人閱讀 49,046評論 1 285
  • 那天鲜屏,我揣著相機與錄音,去河邊找鬼国拇。 笑死洛史,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的酱吝。 我是一名探鬼主播也殖,決...
    沈念sama閱讀 38,351評論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼掉瞳!你這毒婦竟也來了毕源?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 36,988評論 0 259
  • 序言:老撾萬榮一對情侶失蹤陕习,失蹤者是張志新(化名)和其女友劉穎霎褐,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體该镣,經(jīng)...
    沈念sama閱讀 43,476評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡冻璃,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 35,948評論 2 324
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了损合。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片省艳。...
    茶點故事閱讀 38,064評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖嫁审,靈堂內(nèi)的尸體忽然破棺而出跋炕,到底是詐尸還是另有隱情,我是刑警寧澤律适,帶...
    沈念sama閱讀 33,712評論 4 323
  • 正文 年R本政府宣布辐烂,位于F島的核電站遏插,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏纠修。R本人自食惡果不足惜胳嘲,卻給世界環(huán)境...
    茶點故事閱讀 39,261評論 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望扣草。 院中可真熱鬧了牛,春花似錦、人聲如沸辰妙。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,264評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽密浑。三九已至福荸,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間肴掷,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,486評論 1 262
  • 我被黑心中介騙來泰國打工背传, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留呆瞻,地道東北人。 一個月前我還...
    沈念sama閱讀 45,511評論 2 354
  • 正文 我出身青樓径玖,卻偏偏與公主長得像痴脾,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子梳星,可洞房花燭夜當晚...
    茶點故事閱讀 42,802評論 2 345

推薦閱讀更多精彩內(nèi)容