本文基于Spark2.1.0威沫、Kafka 0.10.2、Scala 2.11.8版本
背景:
Kafka做為一款流行的分布式發(fā)布訂閱消息系統(tǒng)洼专,以高吞吐棒掠、低延時、高可靠的特點著稱屁商,已經(jīng)成為Spark Streaming常用的流數(shù)據(jù)來源烟很。
1,Kafka Topic的消費保證:
流數(shù)據(jù)蜡镶,顧名思義雾袱,就是無界的、流動的官还、快速的芹橡、連續(xù)到達的數(shù)據(jù)序列,所以它不像可靠的文件系統(tǒng)(如HDFS)在計算出現(xiàn)故障時望伦,可以隨時恢復(fù)數(shù)據(jù)來重新計算林说。
那么,如何保證流數(shù)據(jù)可靠的傳遞呢屯伞?我們先了解下面的概念:
Producer通過Broker傳遞消息給Consumer腿箩,Consumer消費消息,
P-B-C 3者之間的傳輸劣摇,主要有以下幾種可能的場景:
At most once(最多傳輸一次): 消息可能會丟珠移,但絕不會重復(fù)傳輸;
At least one ?(至少傳輸一次): ?消息絕不會丟,但可能會重復(fù)傳輸;
Exactly once(精確傳輸一次): ?每條消息肯定會被傳輸一次且僅傳輸一次饵撑,不會重復(fù).
3種場景剑梳,適合不同生產(chǎn)環(huán)境的需要,相關(guān)介紹網(wǎng)上很多滑潘,這里就不多說了垢乙。
本文的重點是,如何用Spark 2.1.0语卤、Kafka 0.10.2追逮、spark-streaming-kafka-0-10_2.11-2.1.0.jar酪刀、HBase 1.3.0來配合實現(xiàn)消息Exactly once(精確一次)的傳遞和消費。網(wǎng)上相關(guān)的scala或者java代碼钮孵,都是基于老版本的API骂倘,目前沒有發(fā)現(xiàn)基于new Kafka consumer API的實現(xiàn),所以看到本文覺得有收獲的同學(xué)巴席,就給點個贊吧历涝。
2,Kafka 0.10.2版本介紹:
Kafka 0.10.2版本漾唉,為了和Zookeeper的解耦荧库,較之前的版本有了很大的變化,老版本的高級API和簡單API的說法不見了赵刑,取而代之的是New Consumer API及New Consumer Configs分衫,相關(guān)接口的參數(shù)及P-B-C 3者的配置參數(shù)有了很多改動。
Spark官方與之配合的工具包spark-streaming-kafka-0-10_2.11-2.1.0.jar 也做了相應(yīng)的改變般此,取消了KafkaCluster類蚪战、取消了ZkUtils.updatePersistentPath等多個方法,也都是為了不在將Topic offset由zookeeper自動保存铐懊,而由用戶靈活的選擇Kafka和Spark 2.1.0官方提供的幾種方法來保存offset邀桑,最好的使用情況下,端到端的業(yè)務(wù)可以達到精確一次的消費保證居扒。
(為了美觀概漱,本文相關(guān)的java代碼都用貼圖方式展現(xiàn)了丑慎,最終實現(xiàn)的端到端精確一次消費消息的源碼見文末的鏈接)
3喜喂,Kafka官方提供的多種消費保證:
Consumer的3個重要的配置,需要配合使用竿裂,來達到Broker到Consumer之間精確一次的消費保證玉吁。
請看這些參數(shù)的組合(有點繞,請仔細看)
(enable.auto.commit:false) + (auto.offset.reset:earliest):
在Broker到Consumer之間實現(xiàn)了至少一次語義腻异,因為不使用Kafka提供的自動保存offset功能进副,每次應(yīng)用程序啟動時,都是從Topic的初始位置來獲取消息悔常。也就是說影斑,應(yīng)用程序因為故障失敗,或者是人為的停止机打,再次啟動應(yīng)用程序時矫户,都會從初始位置把指定的Topic所有的消息都消費一遍,這就導(dǎo)致了Consumer會重復(fù)消費残邀。
(enable.auto.commit:false) + (auto.offset.reset:latest):
在Broker到Consumer之間實現(xiàn)了至多一次語義皆辽,因為不使用Kafka提供的自動保存offset功能柑蛇,每次應(yīng)用程序啟動時,都是從Topic的末尾位置來獲取消息驱闷。也就是說耻台,應(yīng)用程序因為故障失敗,或者是人為的停止后空另,如果Producer向Broker發(fā)送新的消息盆耽,當再次啟動應(yīng)用程序時,Consumer從指定的Topic的末尾來開始消費扼菠,這就導(dǎo)致了這部分新產(chǎn)生的消息丟失征字。
(enable.auto.commit:true)+(auto.offset.reset:earliest)+(auto.commit.interval.ms) :
在Broker到Consumer之間實現(xiàn)了精確一次語義,因為使用了Kafka提供的自動保存offset功能娇豫,當應(yīng)用程序第一次啟動時匙姜,首先從Topic的初試位置來獲取消息,原有的消息一個都沒有丟失冯痢;緊接著氮昧,在auto.commit.interval.ms時間后,Kafka會使用coordinator協(xié)議commit當前的offset(topic的每個分區(qū)的offset)浦楣。當應(yīng)用程序因為故障失敗袖肥,或者是人為的停止,再次啟動應(yīng)用程序時振劳,都會從coordinator模塊獲取Topic的offset椎组,從上一次消費結(jié)束的位置繼續(xù)消費,所以不會重復(fù)消費已經(jīng)消費過的消息历恐,也不會丟失在應(yīng)用程序停止期間新產(chǎn)生的消息寸癌,做到了Broker到Consumer之間精確一次的傳遞。
下面是Kafka 0.10.2 ConsumerCoordinator.java的源碼片段弱贼,用戶配置enable.auto.commit:true對應(yīng)的代碼是autoCommitEnabled為true蒸苇,最終調(diào)用doAutoCommitOffsetsAsync,使用coordinator協(xié)議保存offset(注意吮旅,最新版本已經(jīng)和zookeeper解耦溪烤,不會把offset保存在zookeeper中,所以通過zkCli.sh是看不到相關(guān)topic的)
下面是實現(xiàn)的Spark Streaming代碼庇勃。
當然檬嘀,這還遠遠不夠,因為這樣的方式责嚷,會出現(xiàn)業(yè)務(wù)兩段性的后果:
1鸳兽,讀完消息先commit再處理消息。這種模式下再层,如果consumer在commit后還沒來得及處理消息就crash了贸铜,下次重新開始工作后就無法讀到剛剛已提交而未處理的消息堡纬,這就對應(yīng)于At most once;
2蒿秦,讀完消息先處理再commit烤镐。這種模式下,如果處理完了消息在commit之前consumer crash了棍鳖,下次重新開始工作時還會處理剛剛未commit的消息炮叶,實際上該消息已經(jīng)被處理過了。這就對應(yīng)于At least once渡处。
所以镜悉,要想實現(xiàn)端到端消息的精確一次消費,還需要耐心往后看医瘫。
3侣肄,Spark官方提供的多種消費保證:(基于spark-streaming-kafka-0-10_2.11-2.1.0.jar,相比前一個版本有很多改變)
CheckPoint:
通過設(shè)置Driver程序的checkpoint醇份,來保存topic offset稼锅。這種方法很簡單,但是缺陷也很大:應(yīng)用程序有改變時僚纷,無法使用原來的checkpoint來恢復(fù)offset矩距;只能滿足Broker到Consumer之間精確一次的傳遞。
當應(yīng)用程序第一次啟動時怖竭,首先從Topic的初試位置來獲取消息锥债,原有的消息一個都沒有丟失;緊接著痊臭,在batch時間到達后哮肚,Spark會使用checkpoint保存當前的offset(topic的每個分區(qū)的offset)。當應(yīng)用程序失敗或者人為停止后趣兄,再次啟動應(yīng)用程序時绽左,都會從checkpoint恢復(fù)Topic的offset,從上一次消費結(jié)束的位置繼續(xù)消費艇潭,所以不會重復(fù)消費已經(jīng)消費過的消息,也不會丟失在應(yīng)用程序停止期間新產(chǎn)生的消息戏蔑。
實現(xiàn)的Spark Streaming代碼如下(注意:Spark 1.6.3之后蹋凝,檢查checkpoint的實現(xiàn)已經(jīng)不在用JavaStreamingContextFactory工廠操作了,請細看我的代碼是怎么做的)
Kafka itself:
和前面提到的enable.auto.commit:true異曲同工总棵,不過這里用commitAsync方法異步的把offset提交給Kafka 鳍寂。當應(yīng)用程序第一次啟動時,首先從Topic的初試位置來獲取消息情龄,原有的消息一個都沒有丟失迄汛;緊接著捍壤,用commitAsync方法異步的把offset提交給Kafka(topic的每個分區(qū)的offset)。當應(yīng)用程序失敗或者人為停止后鞍爱,再次啟動應(yīng)用程序時鹃觉,都會從kafka恢復(fù)Topic的offset,從上一次消費結(jié)束的位置繼續(xù)消費睹逃,所以不會重復(fù)消費已經(jīng)消費過的消息盗扇,也不會丟失在應(yīng)用程序停止期間新產(chǎn)生的消息。
與checkpoint相比沉填,應(yīng)用程序代碼的更改不會影響offset的存儲和獲取疗隶。然而,這樣的操作不是事務(wù)性的翼闹,由于是異步提交offset斑鼻,當提交offset過程中應(yīng)用程序crash,則無法保存正確的offset猎荠,會導(dǎo)致消息丟失或者重復(fù)消費卵沉。
實現(xiàn)的Spark Streaming代碼如下:
Your own data store:(當當當當,好戲出場)
如果要做到消息端到端的Exactly once消費法牲,就需要事務(wù)性的處理offset和實際操作的輸出史汗。
經(jīng)典的做法讓offset和操作輸出存在同一個地方,會更簡潔和通用拒垃。比如停撞,consumer把最新的offset和加工后的數(shù)據(jù)一起寫到HBase中,那就可以保證數(shù)據(jù)的輸出和offset的更新要么都成功悼瓮,要么都失敗戈毒,間接實現(xiàn)事務(wù)性,最終做到消息的端到端的精確一次消費横堡。(新版本的官網(wǎng)中只字未提使用Zookeeper保存offset埋市,是有多嫌棄??)
實現(xiàn)的Spark Streaming代碼如下(ConsumerRecord類不能序列化,使用時要注意命贴,不要分發(fā)該類到其他工作節(jié)點上道宅,避免錯誤打印)
其實說白了胸蛛,官方提供的思路就是污茵,把JavaInputDStream轉(zhuǎn)換為OffsetRange對象,該對象具有topic對應(yīng)的分區(qū)的所有信息葬项,每次batch處理完悲关,Spark Streaming都會自動更新該對象侍匙,所以你只需要找個合適的地方保存該對象(比如HBase蒜撮、HDFS),就可以愉快的操縱offset了盗飒。
4,相關(guān)鏈接
Spark Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher)
(如需轉(zhuǎn)載陋桂,請標明作者和出處)