(基于最新的Kafka version 0.10.2 new consumer API )想要Spark Streaming精確一次消費Topic蘸劈?拿去不謝,記得點贊和分享尊沸!

本文基于Spark2.1.0威沫、Kafka 0.10.2、Scala 2.11.8版本

背景:

Kafka做為一款流行的分布式發(fā)布訂閱消息系統(tǒng)洼专,以高吞吐棒掠、低延時、高可靠的特點著稱屁商,已經(jīng)成為Spark Streaming常用的流數(shù)據(jù)來源烟很。

常用的ETL架構(gòu)


1,Kafka Topic的消費保證:

流數(shù)據(jù)蜡镶,顧名思義雾袱,就是無界的、流動的官还、快速的芹橡、連續(xù)到達的數(shù)據(jù)序列,所以它不像可靠的文件系統(tǒng)(如HDFS)在計算出現(xiàn)故障時望伦,可以隨時恢復(fù)數(shù)據(jù)來重新計算林说。

那么,如何保證流數(shù)據(jù)可靠的傳遞呢屯伞?我們先了解下面的概念:

Producer通過Broker傳遞消息給Consumer腿箩,Consumer消費消息,

P-B-C 3者之間的傳輸劣摇,主要有以下幾種可能的場景:

At most once(最多傳輸一次): 消息可能會丟珠移,但絕不會重復(fù)傳輸;

At least one ?(至少傳輸一次): ?消息絕不會丟,但可能會重復(fù)傳輸;

Exactly once(精確傳輸一次): ?每條消息肯定會被傳輸一次且僅傳輸一次饵撑,不會重復(fù).

3種場景剑梳,適合不同生產(chǎn)環(huán)境的需要,相關(guān)介紹網(wǎng)上很多滑潘,這里就不多說了垢乙。

本文的重點是,如何用Spark 2.1.0语卤、Kafka 0.10.2追逮、spark-streaming-kafka-0-10_2.11-2.1.0.jar酪刀、HBase 1.3.0來配合實現(xiàn)消息Exactly once(精確一次)的傳遞和消費。網(wǎng)上相關(guān)的scala或者java代碼钮孵,都是基于老版本的API骂倘,目前沒有發(fā)現(xiàn)基于new Kafka consumer API的實現(xiàn),所以看到本文覺得有收獲的同學(xué)巴席,就給點個贊吧历涝。

2,Kafka 0.10.2版本介紹:

Kafka 0.10.2版本漾唉,為了和Zookeeper的解耦荧库,較之前的版本有了很大的變化,老版本的高級API和簡單API的說法不見了赵刑,取而代之的是New Consumer API及New Consumer Configs分衫,相關(guān)接口的參數(shù)及P-B-C 3者的配置參數(shù)有了很多改動。

Spark官方與之配合的工具包spark-streaming-kafka-0-10_2.11-2.1.0.jar 也做了相應(yīng)的改變般此,取消了KafkaCluster類蚪战、取消了ZkUtils.updatePersistentPath等多個方法,也都是為了不在將Topic offset由zookeeper自動保存铐懊,而由用戶靈活的選擇Kafka和Spark 2.1.0官方提供的幾種方法來保存offset邀桑,最好的使用情況下,端到端的業(yè)務(wù)可以達到精確一次的消費保證居扒。

(為了美觀概漱,本文相關(guān)的java代碼都用貼圖方式展現(xiàn)了丑慎,最終實現(xiàn)的端到端精確一次消費消息的源碼見文末的鏈接)

3喜喂,Kafka官方提供的多種消費保證:

Consumer的3個重要的配置,需要配合使用竿裂,來達到Broker到Consumer之間精確一次的消費保證玉吁。

請看這些參數(shù)的組合(有點繞,請仔細看)

(enable.auto.commit:false) + (auto.offset.reset:earliest):

在Broker到Consumer之間實現(xiàn)了至少一次語義腻异,因為不使用Kafka提供的自動保存offset功能进副,每次應(yīng)用程序啟動時,都是從Topic的初始位置來獲取消息悔常。也就是說影斑,應(yīng)用程序因為故障失敗,或者是人為的停止机打,再次啟動應(yīng)用程序時矫户,都會從初始位置把指定的Topic所有的消息都消費一遍,這就導(dǎo)致了Consumer會重復(fù)消費残邀。

(enable.auto.commit:false) + (auto.offset.reset:latest):

在Broker到Consumer之間實現(xiàn)了至多一次語義皆辽,因為不使用Kafka提供的自動保存offset功能柑蛇,每次應(yīng)用程序啟動時,都是從Topic的末尾位置來獲取消息驱闷。也就是說耻台,應(yīng)用程序因為故障失敗,或者是人為的停止后空另,如果Producer向Broker發(fā)送新的消息盆耽,當再次啟動應(yīng)用程序時,Consumer從指定的Topic的末尾來開始消費扼菠,這就導(dǎo)致了這部分新產(chǎn)生的消息丟失征字。

(enable.auto.commit:true)+(auto.offset.reset:earliest)+(auto.commit.interval.ms) :

在Broker到Consumer之間實現(xiàn)了精確一次語義,因為使用了Kafka提供的自動保存offset功能娇豫,當應(yīng)用程序第一次啟動時匙姜,首先從Topic的初試位置來獲取消息,原有的消息一個都沒有丟失冯痢;緊接著氮昧,在auto.commit.interval.ms時間后,Kafka會使用coordinator協(xié)議commit當前的offset(topic的每個分區(qū)的offset)浦楣。當應(yīng)用程序因為故障失敗袖肥,或者是人為的停止,再次啟動應(yīng)用程序時振劳,都會從coordinator模塊獲取Topic的offset椎组,從上一次消費結(jié)束的位置繼續(xù)消費,所以不會重復(fù)消費已經(jīng)消費過的消息历恐,也不會丟失在應(yīng)用程序停止期間新產(chǎn)生的消息寸癌,做到了Broker到Consumer之間精確一次的傳遞。

下面是Kafka 0.10.2 ConsumerCoordinator.java的源碼片段弱贼,用戶配置enable.auto.commit:true對應(yīng)的代碼是autoCommitEnabled為true蒸苇,最終調(diào)用doAutoCommitOffsetsAsync,使用coordinator協(xié)議保存offset(注意吮旅,最新版本已經(jīng)和zookeeper解耦溪烤,不會把offset保存在zookeeper中,所以通過zkCli.sh是看不到相關(guān)topic的)

下面是實現(xiàn)的Spark Streaming代碼庇勃。

當然檬嘀,這還遠遠不夠,因為這樣的方式责嚷,會出現(xiàn)業(yè)務(wù)兩段性的后果:

1鸳兽,讀完消息先commit再處理消息。這種模式下再层,如果consumer在commit后還沒來得及處理消息就crash了贸铜,下次重新開始工作后就無法讀到剛剛已提交而未處理的消息堡纬,這就對應(yīng)于At most once;

2蒿秦,讀完消息先處理再commit烤镐。這種模式下,如果處理完了消息在commit之前consumer crash了棍鳖,下次重新開始工作時還會處理剛剛未commit的消息炮叶,實際上該消息已經(jīng)被處理過了。這就對應(yīng)于At least once渡处。

所以镜悉,要想實現(xiàn)端到端消息的精確一次消費,還需要耐心往后看医瘫。

3侣肄,Spark官方提供的多種消費保證:(基于spark-streaming-kafka-0-10_2.11-2.1.0.jar,相比前一個版本有很多改變)

CheckPoint:

通過設(shè)置Driver程序的checkpoint醇份,來保存topic offset稼锅。這種方法很簡單,但是缺陷也很大:應(yīng)用程序有改變時僚纷,無法使用原來的checkpoint來恢復(fù)offset矩距;只能滿足Broker到Consumer之間精確一次的傳遞。

當應(yīng)用程序第一次啟動時怖竭,首先從Topic的初試位置來獲取消息锥债,原有的消息一個都沒有丟失;緊接著痊臭,在batch時間到達后哮肚,Spark會使用checkpoint保存當前的offset(topic的每個分區(qū)的offset)。當應(yīng)用程序失敗或者人為停止后趣兄,再次啟動應(yīng)用程序時绽左,都會從checkpoint恢復(fù)Topic的offset,從上一次消費結(jié)束的位置繼續(xù)消費艇潭,所以不會重復(fù)消費已經(jīng)消費過的消息,也不會丟失在應(yīng)用程序停止期間新產(chǎn)生的消息戏蔑。

實現(xiàn)的Spark Streaming代碼如下(注意:Spark 1.6.3之后蹋凝,檢查checkpoint的實現(xiàn)已經(jīng)不在用JavaStreamingContextFactory工廠操作了,請細看我的代碼是怎么做的)

Kafka itself:

和前面提到的enable.auto.commit:true異曲同工总棵,不過這里用commitAsync方法異步的把offset提交給Kafka 鳍寂。當應(yīng)用程序第一次啟動時,首先從Topic的初試位置來獲取消息情龄,原有的消息一個都沒有丟失迄汛;緊接著捍壤,用commitAsync方法異步的把offset提交給Kafka(topic的每個分區(qū)的offset)。當應(yīng)用程序失敗或者人為停止后鞍爱,再次啟動應(yīng)用程序時鹃觉,都會從kafka恢復(fù)Topic的offset,從上一次消費結(jié)束的位置繼續(xù)消費睹逃,所以不會重復(fù)消費已經(jīng)消費過的消息盗扇,也不會丟失在應(yīng)用程序停止期間新產(chǎn)生的消息。

與checkpoint相比沉填,應(yīng)用程序代碼的更改不會影響offset的存儲和獲取疗隶。然而,這樣的操作不是事務(wù)性的翼闹,由于是異步提交offset斑鼻,當提交offset過程中應(yīng)用程序crash,則無法保存正確的offset猎荠,會導(dǎo)致消息丟失或者重復(fù)消費卵沉。

實現(xiàn)的Spark Streaming代碼如下:

Your own data store:(當當當當,好戲出場)

如果要做到消息端到端的Exactly once消費法牲,就需要事務(wù)性的處理offset和實際操作的輸出史汗。

經(jīng)典的做法讓offset和操作輸出存在同一個地方,會更簡潔和通用拒垃。比如停撞,consumer把最新的offset和加工后的數(shù)據(jù)一起寫到HBase中,那就可以保證數(shù)據(jù)的輸出和offset的更新要么都成功悼瓮,要么都失敗戈毒,間接實現(xiàn)事務(wù)性,最終做到消息的端到端的精確一次消費横堡。(新版本的官網(wǎng)中只字未提使用Zookeeper保存offset埋市,是有多嫌棄??)

實現(xiàn)的Spark Streaming代碼如下(ConsumerRecord類不能序列化,使用時要注意命贴,不要分發(fā)該類到其他工作節(jié)點上道宅,避免錯誤打印)

其實說白了胸蛛,官方提供的思路就是污茵,把JavaInputDStream轉(zhuǎn)換為OffsetRange對象,該對象具有topic對應(yīng)的分區(qū)的所有信息葬项,每次batch處理完悲关,Spark Streaming都會自動更新該對象侍匙,所以你只需要找個合適的地方保存該對象(比如HBase蒜撮、HDFS),就可以愉快的操縱offset了盗飒。

4,相關(guān)鏈接

本文實現(xiàn)的精確一次消費的Java源代碼

Spark Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher)

Kafka 0.10.2 Documentation

(如需轉(zhuǎn)載陋桂,請標明作者和出處)

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末逆趣,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子章喉,更是在濱河造成了極大的恐慌汗贫,老刑警劉巖,帶你破解...
    沈念sama閱讀 206,378評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件秸脱,死亡現(xiàn)場離奇詭異落包,居然都是意外死亡,警方通過查閱死者的電腦和手機摊唇,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,356評論 2 382
  • 文/潘曉璐 我一進店門咐蝇,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人巷查,你說我怎么就攤上這事有序。” “怎么了岛请?”我有些...
    開封第一講書人閱讀 152,702評論 0 342
  • 文/不壞的土叔 我叫張陵旭寿,是天一觀的道長。 經(jīng)常有香客問我崇败,道長盅称,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 55,259評論 1 279
  • 正文 為了忘掉前任后室,我火速辦了婚禮缩膝,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘岸霹。我一直安慰自己疾层,他們只是感情好,可當我...
    茶點故事閱讀 64,263評論 5 371
  • 文/花漫 我一把揭開白布贡避。 她就那樣靜靜地躺著痛黎,像睡著了一般。 火紅的嫁衣襯著肌膚如雪贸桶。 梳的紋絲不亂的頭發(fā)上舅逸,一...
    開封第一講書人閱讀 49,036評論 1 285
  • 那天,我揣著相機與錄音皇筛,去河邊找鬼。 笑死坠七,一個胖子當著我的面吹牛水醋,可吹牛的內(nèi)容都是我干的旗笔。 我是一名探鬼主播,決...
    沈念sama閱讀 38,349評論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼拄踪,長吁一口氣:“原來是場噩夢啊……” “哼蝇恶!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起惶桐,我...
    開封第一講書人閱讀 36,979評論 0 259
  • 序言:老撾萬榮一對情侶失蹤撮弧,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后姚糊,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體贿衍,經(jīng)...
    沈念sama閱讀 43,469評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 35,938評論 2 323
  • 正文 我和宋清朗相戀三年救恨,在試婚紗的時候發(fā)現(xiàn)自己被綠了贸辈。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,059評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡肠槽,死狀恐怖擎淤,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情秸仙,我是刑警寧澤嘴拢,帶...
    沈念sama閱讀 33,703評論 4 323
  • 正文 年R本政府宣布,位于F島的核電站寂纪,受9級特大地震影響席吴,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜弊攘,卻給世界環(huán)境...
    茶點故事閱讀 39,257評論 3 307
  • 文/蒙蒙 一抢腐、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧襟交,春花似錦迈倍、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,262評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至焕梅,卻和暖如春迹鹅,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背贞言。 一陣腳步聲響...
    開封第一講書人閱讀 31,485評論 1 262
  • 我被黑心中介騙來泰國打工斜棚, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人。 一個月前我還...
    沈念sama閱讀 45,501評論 2 354
  • 正文 我出身青樓弟蚀,卻偏偏與公主長得像蚤霞,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子义钉,可洞房花燭夜當晚...
    茶點故事閱讀 42,792評論 2 345

推薦閱讀更多精彩內(nèi)容

  • 背景介紹 Kafka簡介 Kafka是一種分布式的昧绣,基于發(fā)布/訂閱的消息系統(tǒng)。主要設(shè)計目標如下: 以時間復(fù)雜度為O...
    高廣超閱讀 12,818評論 8 167
  • 原文鏈接 翻譯者:倪辰皓 最后翻譯時間:2017/06/18 目前狀態(tài):已完成 轉(zhuǎn)載請附上本文鏈接: http:/...
    即墨燈火閱讀 1,168評論 0 4
  • Kafka入門經(jīng)典教程-Kafka-about云開發(fā) http://www.aboutyun.com/threa...
    葡萄喃喃囈語閱讀 10,810評論 4 54
  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理捶闸,服務(wù)發(fā)現(xiàn)夜畴,斷路器,智...
    卡卡羅2017閱讀 134,599評論 18 139
  • 本文轉(zhuǎn)載自http://dataunion.org/?p=9307 背景介紹Kafka簡介Kafka是一種分布式的...
    Bottle丶Fish閱讀 5,431評論 0 34