1 kafka簡介
??Kafka是最初由Linkedin公司開發(fā),是一個分布式举畸、支持分區(qū)的(partition)查排、多副本的(replica),基于zookeeper協(xié)調(diào)的分布式消息系統(tǒng)抄沮,它的最大的特性就是可以實時的處理大量數(shù)據(jù)以滿足各種需求場景:比如基于hadoop的批處理系統(tǒng)跋核、低延遲的實時系統(tǒng)、storm/Spark流式處理引擎叛买,web/nginx日志砂代、訪問日志,消息服務(wù)等等聪全,用scala語言編寫泊藕,Linkedin于2010年貢獻給了Apache基金會并成為頂級開源 項目。
1.1 kafka特性
- 高吞吐量难礼、低延遲:kafka每秒可以處理幾十萬條消息娃圆,它的延遲最低只有幾毫秒,每個topic可以分多個partition, consumer group 對partition進行consume操作蛾茉。
- 可擴展性:kafka集群支持熱擴展
- 持久性讼呢、可靠性:消息被持久化到本地磁盤,并且支持?jǐn)?shù)據(jù)備份防止數(shù)據(jù)丟失
- 容錯性:允許集群中節(jié)點失斍妗(若副本數(shù)量為n,則允許n-1個節(jié)點失斣闷痢)
- 高并發(fā):支持?jǐn)?shù)千個客戶端同時讀寫
1.2 kafka應(yīng)用場景
- 日志收集:一個公司可以用Kafka可以收集各種服務(wù)的log,通過kafka以統(tǒng)一接口服務(wù)的方式開放給各種consumer键思,例如hadoop础爬、Hbase、Solr等吼鳞。
- 消息系統(tǒng):解耦和生產(chǎn)者和消費者看蚜、緩存消息等。
- 用戶活動跟蹤:Kafka經(jīng)常被用來記錄web用戶或者app用戶的各種活動赔桌,如瀏覽網(wǎng)頁供炎、搜索渴逻、點擊等活動,這些活動信息被各個服務(wù)器發(fā)布到kafka的topic中音诫,然后訂閱者通過訂閱這些topic來做實時的監(jiān)控分析惨奕,或者裝載到hadoop、數(shù)據(jù)倉庫中做離線分析和挖掘竭钝。
- 運營指標(biāo):Kafka也經(jīng)常用來記錄運營監(jiān)控數(shù)據(jù)梨撞。包括收集各種分布式應(yīng)用的數(shù)據(jù),生產(chǎn)各種操作的集中反饋蜓氨,比如報警和報告聋袋。
- 流式處理:比如spark streaming和storm
- 事件源
1.3 kafka核心組件
kafka結(jié)構(gòu):
- Producer:消息生產(chǎn)者,產(chǎn)生的消息將會被發(fā)送到某個topic
- Consumer:消息消費者穴吹,消費的消息內(nèi)容來自某個topic
- Topic:消息根據(jù)topic進行歸類幽勒,topic其本質(zhì)是一個目錄,即將同一主題消息歸類到同一個目錄
- Broker:每一個kafka實例(或者說每臺kafka服務(wù)器節(jié)點)就是一個broker港令,一個broker可以有多個topic
- Zookeeper:zookeeper集群不屬于kafka內(nèi)的組件啥容,但kafka依賴zookeeper集群保存meta信息,同時對broker分布式集群管理顷霹,所以在此做聲明其重要性咪惠。
1.4 kafka基本概念
kafka拓撲結(jié)構(gòu):
??一個典型的Kafka集群中包含若干Producer(可以是web前端產(chǎn)生的Page View,或者是服務(wù)器日志淋淀,系統(tǒng)CPU遥昧、Memory等),若干broker(Kafka支持水平擴展朵纷,一般broker數(shù)量越多炭臭,集群吞吐率越高),若干Consumer Group袍辞,以及一個Zookeeper集群鞋仍。Kafka通過Zookeeper管理集群配置,選舉leader搅吁,以及在Consumer Group發(fā)生變化時進行rebalance威创。Producer使用push模式將消息發(fā)布到broker,Consumer使用pull模式從broker訂閱并消費消息谎懦。
- Broker:一臺kafka服務(wù)器就是一個broker肚豺。一個集群由多個broker組成。一個broker可以容納多個topic界拦,這種服務(wù)器被稱為broker 代理详炬、中介者
- Topic:每條發(fā)布到Kafka集群的消息都有一個類別,這個類別被稱為 Topic 主題
物理上不同Topic的消息分開存儲,邏輯上一個Topic的消息雖然保存于一個或多個broker上但用戶只需指定消息的Topic即可生產(chǎn)或消費數(shù)據(jù)而不必關(guān)心數(shù)據(jù)存于何處
- Partition:Parition是物理上的概念呛谜,每個Topic包含一個或多個Partition 分割、分區(qū)
為了實現(xiàn)擴展性枪萄,一個非常大的topic可以分布到多個 broker(即服務(wù)器)上隐岛,一個topic可以分為多個partition,每個partition是一個有序的隊列瓷翻。
partition中的每條消息都會被分配一個有序的id(offset)聚凹。kafka只保證按一個partition中的順序?qū)⑾l(fā)給consumer,不保證一個topic的整體 (多個partition間)的順序齐帚。
- Producer:負責(zé)發(fā)布消息到Kafka broker
- Consumer:消息消費者妒牙,向Kafka broker讀取消息的客戶端。
- Consumer Group:每個Consumer屬于一個特定的Consumer Group
可為每個Consumer指定group name对妄,若不指定group name則屬于默認(rèn)的group
2. kafka設(shè)計思想
??kafka包括producer湘今、broker、consumer和zookeeper四部分剪菱,為了實現(xiàn)了消息的生產(chǎn)消費以及大吞吐量摩瞎、高性能等特點,kafka設(shè)計了一系列關(guān)于消息的生產(chǎn)孝常、備份旗们、持久化、負載均衡构灸、消費者組上渴、消費、集群管理的策略喜颁,這些就是kafka的設(shè)計思想稠氮。下面我們給大家一一介紹
2.1 ConsumerGroup
??消費組是一個邏輯上的概念,它將旗下的消費者歸為一類洛巢,每一個消費者只隸屬于一個消費組 每個消費組都會有一個固定的名稱括袒,消費者在進行消費前需要指定其所屬的消費組的名稱,可以在消費者客戶端通過 group.id 參數(shù)來配置稿茉,默認(rèn)為空字符串锹锰。
??各個consumer(consumer 線程)可以組成一個組(Consumer group ),同一partition中的每個message只能被組(Consumer group )中的一個consumer(consumer 線程)消費漓库,如果一個message可以被多個consumer(consumer 線程)消費的話恃慧,那么這些consumer必須在不同的組。
??如果同一組內(nèi)的consumer需要消費同一partition渺蒿,則需要對組內(nèi)消費者重新劃分痢士,將其分配到不同的consumer group內(nèi)。同一partition被多個consumer消費茂装,這些consumer都必須是順序讀取partition里面的message怠蹂,新啟動的consumer默認(rèn)從partition隊列最頭端最新的地方開始阻塞的讀message善延。
??當(dāng)啟動一個consumer group去消費一個topic的時候,無論topic里面有多個少個partition城侧,無論我們consumer group里面配置了多少個consumer thread易遣,這個consumer group下面的所有consumer thread一定會消費全部的partition;即便這個consumer group下只有一個consumer thread嫌佑,那么這個consumer thread也會去消費所有的partition豆茫。因此,最優(yōu)的設(shè)計就是屋摇,consumer group下的consumer thread的數(shù)量等于partition數(shù)量揩魂,這樣效率是最高的。
2.2 kafka支持的消費模式
push-and-pull : Kafka中的Producer和consumer采用的是push-and-pull模式炮温,即Producer只管向broker push消息火脉,consumer只管從broker pull消息,兩者對消息的生產(chǎn)和消費是異步的茅特。
??對于消息中間件而已忘分,一般有兩種消息投遞模式:點對點(P2P,Point-to-Point)模式和發(fā)布/訂閱(Pub/Sub)模式白修。點對點模式是基于隊列的妒峦,生產(chǎn)者將消息發(fā)送到隊列,消費者從隊列接收消息兵睛。發(fā)布/訂閱模式定義了如何向一個內(nèi)容節(jié)點發(fā)送和訂閱消息肯骇,這個內(nèi)容節(jié)點在Kafka中為主題(Topic)相當(dāng)于一個中介,生產(chǎn)者發(fā)布消息到主題祖很,而消費者消費所訂閱的主題笛丙。主題使消息的訂閱者和發(fā)布者互相保持獨立,不需要接觸就可以進行消息的傳遞假颇,發(fā)布/訂閱模式在消息一對多廣播時使用胚鸯。Kafka同時支持這兩種模式,正是得益于消費者和消費組模型的契合:
- 如果所有的消費者都屬于同一個消費組笨鸡,那么消息就會被均衡的投遞給每一個消費者姜钳,也就是一個消費組內(nèi)每條消息只會被一個消費者處理,這就相當(dāng)于點對點模式形耗。
- 如果所有的消費者都不屬于一個消費組哥桥,那么消息就會投遞給訂閱該主題的每個消費組中的一個消費者,這就相當(dāng)于發(fā)布/訂閱模式激涤。
2.3 kafka broker集群
??Kakfa Broker集群受Zookeeper管理拟糕,broker之間沒有主從關(guān)系,各個broker在集群中地位一樣,我們可以隨意的增加或刪除任何一個broker節(jié)點送滞,但broker之中會選舉出Controller侠草,其他的叫Kafka Broker follower,這個過程叫做Controller在Zookeeper注冊Watch犁嗅。
??所有的Kafka Broker節(jié)點一起去Zookeeper上注冊一個臨時節(jié)點梦抢,因為只有一個Kafka Broker會注冊成功,其他的都會失敗愧哟,所以這個成功在Zookeeper上注冊臨時節(jié)點的這個Kafka Broker會成為Kafka Broker Controller,其他的Kafka broker叫Kafka Broker follower哼蛆。這個Controller會監(jiān)聽其他的Kafka Broker的所有信息蕊梧,如果這個kafka broker controller宕機了,在zookeeper上面的那個臨時節(jié)點就會消失腮介,此時所有的kafka broker又會一起去Zookeeper上注冊一個臨時節(jié)點肥矢,因為只有一個Kafka Broker會注冊成功,其他的都會失敗叠洗。
Controller對宕機broker及其partition處理過程:
1.Controller在Zookeeper注冊Watch甘改,一旦有Broker宕機(這是用宕機代表任何讓系統(tǒng)認(rèn)為其die的情景,包括但不限于機器斷電灭抑,網(wǎng)絡(luò)不可用十艾,GC導(dǎo)致的Stop The World,進程crash等)腾节,其在Zookeeper對應(yīng)的znode會自動被刪除忘嫉,Zookeeper會fire Controller注冊的watch,Controller讀取最新的幸存的Broker
2.Controller決定set_p案腺,該集合包含了宕機的所有Broker上的所有Partition
3.對set_p中的每一個Partition
3.1 從/brokers/topics/[topic]/partitions/[partition]/state讀取該Partition當(dāng)前的ISR
3.2 決定該Partition的新Leader庆冕。如果當(dāng)前ISR中有至少一個Replica還幸存,則選擇其中一個作為新Leader劈榨,新的ISR則包含當(dāng)前ISR中所有幸存的Replica(選舉算法的實現(xiàn)類似于微軟的PacificA)访递。否則選擇該Partition中任意一個幸存的Replica作為新的Leader以及ISR(該場景下可能會有潛在的數(shù)據(jù)丟失)。如果該Partition的所有Replica都宕機了同辣,則將新的Leader設(shè)置為-1拷姿,等待恢復(fù)。
3.3 將新的Leader邑闺,ISR和新的leader_epoch及controller_epoch寫入/brokers/topics/[topic]/partitions/[partition]/state跌前。注意,該操作只有其version在3.1至3.3的過程中無變化時才會執(zhí)行陡舅,否則跳轉(zhuǎn)到3.1
- 直接通過RPC向set_p相關(guān)的Broker發(fā)送LeaderAndISRRequest命令抵乓。Controller可以在一個RPC操作中發(fā)送多個命令從而提高效率。
所有Replica都不工作即Partition的Leader為-1的處理策略:
??上文提到,在ISR中至少有一個follower時灾炭,Kafka可以確保已經(jīng)commit的數(shù)據(jù)不丟失茎芋,但如果某個Partition的所有Replica都宕機了,就無法保證數(shù)據(jù)不丟失了蜈出。這種情況下有兩種可行的方案:
1.等待ISR中的任一個Replica“活”過來田弥,并且選它作為Leader
2.選擇第一個“活”過來的Replica(不一定是ISR中的)作為Leader
??這就需要在可用性和一致性當(dāng)中作出一個簡單的折衷。如果一定要等待ISR中的Replica“活”過來铡原,那不可用的時間就可能會相對較長偷厦。而且如果ISR中的所有Replica都無法“活”過來了,或者數(shù)據(jù)都丟失了燕刻,這個Partition將永遠不可用只泼。選擇第一個“活”過來的Replica作為Leader,而這個Replica不是ISR中的Replica卵洗,那即使它并不保證已經(jīng)包含了所有已commit的消息请唱,它也會成為Leader而作為consumer的數(shù)據(jù)源(前文有說明,所有讀寫都由Leader完成)过蹂。Kafka0.8.*使用了第二種方式十绑。根據(jù)Kafka的文檔,在以后的版本中酷勺,Kafka支持用戶通過配置選擇這兩種方式中的一種本橙,從而根據(jù)不同的使用場景選擇高可用性還是強一致性。 unclean.leader.election.enable 參數(shù)決定使用哪種方案鸥印,默認(rèn)是true勋功,采用第二種方案。
2.4 kafka partition及replica(備份)
??partion可以看作一個有序的隊列库说,里面的數(shù)據(jù)是儲存在硬盤中的,追加式的潜的。partition的作用就是提供分布式的擴展,一個topic可以有許多partions啰挪,多個partition可以并行處理數(shù)據(jù),所以可以處理相當(dāng)量的數(shù)據(jù)亡呵。只有partition的leader才會進行讀寫操作,folower僅進行復(fù)制锰什,客戶端是感知不到的丁逝。下圖把kafka集群看成一個kakfa服務(wù)霜幼,僅顯示leader。
offset:
??每一條數(shù)據(jù)都有一個offset罪既,是每一條數(shù)據(jù)在該partition中的唯一標(biāo)識铡恕。各個consumer控制和設(shè)置其在該partition下消費到offset位置,這樣下次可以以該offset位置開始進行消費探熔。
??各個consumer的offset位置默認(rèn)是在某一個broker當(dāng)中的topic中保存的(為防止該broker宕掉無法獲取offset信息猩谊,可以配置在每個broker中都進行保存,配置文件中配置)
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3
2.5 replicas(備份)
??replica數(shù)其實就是partition的副本總數(shù)祭刚,其中包括一個leader,其他的就是copy副本墙牌。如果有N個replicas涡驮,其中一個replica為leader,其他都為follower喜滨,leader處理partition的所有讀寫請求捉捅,于此同時,follower會被動定期的去復(fù)制leader上的數(shù)據(jù)虽风。
Partition leader與follower:
??partition也有l(wèi)eader和follower之分棒口。leader(圖中標(biāo)紅)是主partition,producer寫kafka的時候先寫partition leader辜膝,再由partition leader push給其他的partition follower(圖中標(biāo)綠)无牵。partition leader與follower的信息受Zookeeper控制,一旦partition leader所在的broker節(jié)點宕機厂抖,zookeeper會沖其他的broker的partition follower上選擇follower變?yōu)閜arition leader茎毁。
Topic分配partition和partition replica的算法:
- 1.將Broker(size=n)和待分配的Partition排序。
- 2.將第i個Partition分配到第(i%n)個Broker上忱辅。
- 3.將第i個Partition的第j個Replica分配到第((i + j) % n)個Broker上七蜘。如圖所示,topic1-part0(紅色)即第0個partition的第1個replica分配在broker 1上墙懂,topic1-part0(broker 2上綠色)即第0個partition第2個replica橡卤。
消息投遞有關(guān)partition的機制:
- producer先把message發(fā)送到partition leader,再由leader發(fā)送給其他partition follower损搬。
- 向Producer發(fā)送ACK前需要保證有多少個Replica已經(jīng)收到該消息:根據(jù)ack配的個數(shù)而定碧库。
- 怎樣處理某個Replica不工作的情況:如果這個部工作的partition replica不在ack列表中柜与,就是producer在發(fā)送消息到partition leader上,partition leader向partition follower發(fā)送message沒有響應(yīng)而已谈为,這個不會影響整個系統(tǒng)旅挤,也不會有什么問題。如果這個不工作的partition replica在ack列表中的話伞鲫,producer發(fā)送的message的時候會等待這個不工作的partition replca寫message成功秕脓,但是會等到time out吠架,然后返回失敗因為某個ack列表中的partition replica沒有響應(yīng)傍药,此時kafka會自動的把這個部工作的partition replica從ack列表中移除拐辽,以后的producer發(fā)送message的時候就不會有這個ack列表下的這個部工作的partition replica了俱诸。
- 怎樣處理Failed Replica恢復(fù)回來的情況:如果這個partition replica之前不在ack列表中睁搭,那么啟動后重新受Zookeeper管理即可园骆,之后producer發(fā)送message的時候锌唾,partition leader會繼續(xù)發(fā)送message到這個partition follower上鸠珠。如果這個partition replica之前在ack列表中渐排,此時重啟后驯耻,需要把這個partition replica再手動加到ack列表中。(ack列表是手動添加的霎迫,出現(xiàn)某個部工作的partition replica的時候自動從ack列表中移除的)
2.6 ISR和AR
- 分區(qū)中的所有副本統(tǒng)稱為AR(Assigned Repllicas)知给。
- 所有與leader副本保持一定程度同步的副本(包括Leader)組成ISR(In-Sync Replicas),ISR集合是AR集合中的一個子集戈次。
- 消息會先發(fā)送到leader副本怯邪,然后follower副本才能從leader副本中拉取消息進行同步悬秉,同步期間內(nèi)follower副本相對于leader副本而言會有一定程度的滯后搂捧。前面所說的“一定程度”是指可以忍受的滯后范圍,這個范圍可以通過參數(shù)進行配置王凑。
- 與leader副本同步滯后過多的副本(不包括leader)副本索烹,組成OSR(Out-Sync Relipcas),由此可見:AR=ISR+OSR百姓。
- Leader副本負責(zé)維護和跟蹤ISR集合中所有的follower副本的滯后狀態(tài),當(dāng)follower副本落后太多或者失效時旬迹,leader副本會吧它從ISR集合中剔除(數(shù)量滯后和時間滯后兩個維度奔垦,replica.lag.time.max.ms和replica.lag.max.message可配置)椿猎。如果OSR集合中follower副本“追上”了Leader副本,之后再ISR集合中的副本才有資格被選舉為leader按灶,而在OSR集合中的副本則沒有機會(這個原則可以通過修改對應(yīng)的參數(shù)配置來改變)
ISR的伸縮:
??Kafka在啟動的時候會開啟兩個與ISR相關(guān)的定時任務(wù)鸯旁,名稱分別為“isr-expiration"和”isr-change-propagation".羡亩。isr-expiration任務(wù)會周期性的檢測每個分區(qū)是否需要縮減其ISR集合畏铆。這個周期和“replica.lag.time.max.ms”參數(shù)有關(guān)辞居。大小是這個參數(shù)一半蛋勺。默認(rèn)值為5000ms抱完,當(dāng)檢測到ISR中有是失效的副本的時候碉怔,就會縮減ISR集合禁添。如果某個分區(qū)的ISR集合發(fā)生變更老翘, 則會將變更后的數(shù)據(jù)記錄到ZooKerper對應(yīng)/brokers/topics//partition//state節(jié)點中铺峭。節(jié)點中數(shù)據(jù)示例如下:
{“controller_cpoch":26,“l(fā)eader”:0,“version”:1,“l(fā)eader_epoch”:2,“isr”:{0,1}}
? ??其中controller_epoch表示的是當(dāng)前的kafka控制器epoch.leader表示當(dāng)前分區(qū)的leader副本所在的broker的id編號卫键,version表示版本號永罚,(當(dāng)前半本固定位1),leader_epoch表示當(dāng)前分區(qū)的leader紀(jì)元翅敌,isr表示變更后的isr列表惕蹄。
kafka擴充ISR的機制:
??隨著follower副本不斷進行消息同步卖陵,follower副本LEO也會逐漸后移,并且最終趕上leader副本棒旗,此時follower副本就有資格進入ISR集合铣揉,追趕上leader副本的判定準(zhǔn)側(cè)是此副本的LEO是否小于leader副本HW逛拱,這里并不是和leader副本LEO相比朽合。ISR擴充之后同樣會更新ZooKeeper中的/broker/topics//partition//state節(jié)點和isrChangeSet饱狂,之后的步驟就和ISR收縮的時的相同。
???當(dāng)ISR集合發(fā)生增減時衍腥,或者ISR集合中任一副本LEO發(fā)生變化時纳猫,都會影響整個分區(qū)的HW尚骄。
???如下圖所示倔丈,leader副本的LEO為9,follower副本的LEO為7鹉动,而follower2副本的LEO為6泽示,如果判定這三個副本都處于ISR集合中械筛,那么分區(qū)的HW為6埋哟,如果follower3已經(jīng)判定失效副本被剝離出ISR集合定欧,那么此時分區(qū)HW為leader副本和follower副本中LEO的最小值砍鸠,即為7.
HW 是 High Watermark 的縮寫,俗稱高水位朦肘,水印弟断,它標(biāo)識了一個特定的消息偏移量(Offset ),消費者只能拉取到這個 Offset 之前的消息趴生。
LEO 是 Log End Offset 的縮寫刘急, 它標(biāo)識了當(dāng)前日志文件中 下一條待寫入消息的 Offset .
注意 :LEO 標(biāo)識的是下一條待寫入消息的 Offset
參考資料:
2.7 消息投遞可靠性
Kafka消息發(fā)送三種方式:
- 發(fā)送并忘記(不關(guān)心消息是否正常到達开睡,對返回結(jié)果不做任何判斷處理):
發(fā)送并忘記的方式本質(zhì)上也是一種異步的方式,只是它不會獲取消息發(fā)送的返回結(jié)果胁艰,這種方式的吞吐量是最高的腾么,但是無法保證消息的可靠性
- 發(fā)送并忘記(不關(guān)心消息是否正常到達开睡,對返回結(jié)果不做任何判斷處理):
- 同步發(fā)送(通過get方法等待Kafka的響應(yīng),判斷消息是否發(fā)送成功):
以同步的方式發(fā)送消息時殴泰,一條一條的發(fā)送悍汛,對每條消息返回的結(jié)果判斷, 可以明確地知道每條消息的發(fā)送情況奉件,但是由于同步的方式會阻塞糖埋,只有當(dāng)消息通過get返回future對象時,才會繼續(xù)下一條消息的發(fā)送
- 同步發(fā)送(通過get方法等待Kafka的響應(yīng),判斷消息是否發(fā)送成功):
- 異步發(fā)送+回調(diào)函數(shù)(消息以異步的方式發(fā)送杭攻,通過回調(diào)函數(shù)返回消息發(fā)送成功/失敗):
在調(diào)用send方法發(fā)送消息的同時馆铁,指定一個回調(diào)函數(shù)埠巨,服務(wù)器在返回響應(yīng)時會調(diào)用該回調(diào)函數(shù)辣垒,通過回調(diào)函數(shù)能夠?qū)Ξ惓G闆r進行處理,當(dāng)調(diào)用了回調(diào)函數(shù)時印蔬,只有回調(diào)函數(shù)執(zhí)行完畢生產(chǎn)者才會結(jié)束勋桶,否則一直會阻塞
- 異步發(fā)送+回調(diào)函數(shù)(消息以異步的方式發(fā)送杭攻,通過回調(diào)函數(shù)返回消息發(fā)送成功/失敗):
Kafka消息投遞提供了三種確認(rèn)模式:
- ack=0圾另,生產(chǎn)者在成功寫入消息之前不會等待任何來自服務(wù)器的響應(yīng)雨涛。
- 當(dāng)ack=1,表示producer寫partition leader成功后库继,broker就返回成功退唠,無論其他的partition follower是否寫成功鹃锈。
- 當(dāng)ack=2,表示producer寫partition leader和其他一個follower成功的時候铜邮,broker就返回成功松蒜,無論其他的partition follower是否寫成功。
- 當(dāng)ack=-1或all [parition的數(shù)量]的時候,表示只有producer全部寫成功的時候雅倒,才算成功裁良,kafka broker才返回成功信息侨把。這里需要注意的是华匾,如果ack=1的時候旭旭,一旦有個broker宕機導(dǎo)致partition的follower和leader切換稍味,會導(dǎo)致丟數(shù)據(jù)。
2.8 消息消費
???Kafka中的Producer和consumer采用的是push-and-pull模式疼燥,即Producer只管向broker push消息违寿,consumer只管從broker pull消息息罗,兩者對消息的生產(chǎn)和消費是異步的。
??consumer如何知道自己應(yīng)該拉取哪一個partition。cordinator(某一個Kafka的broker)在分配consumer的時候,會選舉consumer leader备绽,后者分配每一個consumer要連接的broker,topic,partition眠冈,然后上報cordinator雇盖。然后consumer會根據(jù)自己被分配的partion去拉取數(shù)據(jù)薛匪。批量讀取和單數(shù)據(jù)讀取娇跟,ack機制做裙。如果poll()時間超時,那么broker會認(rèn)為consumer掛掉了滤港,會踢掉該consumer添履。cordinator重新分配consumer。有時超時會拋異常,不過也會重新分配consumer珊泳。consumer的groupId機制。對于一個groupId中的consumer來說狮辽,一個partition只能由一個consumer來消費菱皆。即不可能多個consumer消費1個partition。
??消息傳輸一致性語義:
- at most once(至多一次): 消費者fetch消息,然后保存offset,然后處理消息;當(dāng)client保存offset之后,但是在消息處理過程中consumer進程失效(crash),導(dǎo)致部分消息未能繼續(xù)處理.那么此后可能其他consumer會接管,但是因為offset已經(jīng)提前保存,那么新的consumer將不能fetch到offset之前的消息(盡管它們尚沒有被處理),這就是"at most once".
設(shè)置enable.auto.commit為ture
設(shè)置 auto.commit.interval.ms為一個較小的時間間隔.
client不要調(diào)用commitSync()蹄殃,kafka在特定的時間間隔內(nèi)自動提交诅岩。
public void mostOnce(){
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-1");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-topic", "bar"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
process(record);
}
}
}
- at least once(至少一次): 消費者fetch消息,然后處理消息,然后保存offset.如果消息處理成功之后,但是在保存offset階段zookeeper異巢菝瑁或者consumer失效,導(dǎo)致保存offset操作未能執(zhí)行成功,這就導(dǎo)致接下來再次fetch時可能獲得上次已經(jīng)處理過的消息,這就是"at least once".
設(shè)置enable.auto.commit為false
client調(diào)用commitSync()倔韭,增加消息偏移;
- exactly once(恰好1次):最少1次+消費者的輸出中額外增加已處理消息最大編號:由于已處理消息最大編號的存在陕赃,不會出現(xiàn)重復(fù)處理消息的情況卵蛉。
如果要實現(xiàn)這種方式,必須自己控制消息的offset么库,自己記錄一下當(dāng)前的offset傻丝,對消息的處理和offset的移動必須保持在同一個事務(wù)中,例如在同一個事務(wù)中诉儒,把消息處理的結(jié)果存到mysql數(shù)據(jù)庫同時更新此時的消息的偏移葡缰。
設(shè)置enable.auto.commit為false
保存ConsumerRecord中的offset到數(shù)據(jù)庫
當(dāng)partition分區(qū)發(fā)生變化的時候需要rebalance,有以下幾個事件會觸發(fā)分區(qū)變化1 consumer訂閱的topic中的分區(qū)大小發(fā)生變化
2 topic被創(chuàng)建或者被刪除
3 consuer所在group中有個成員掛了
4 新的consumer通過調(diào)用join加入了group
此時 consumer通過實現(xiàn)ConsumerRebalanceListener接口允睹,捕捉這些事件运准,對偏移量進行處理幌氮。
consumer通過調(diào)用seek(TopicPartition, long)方法缭受,移動到指定的分區(qū)的偏移位置。
順序保證性:
由于kafka的producer的寫message與consumer去讀message都是順序的讀寫该互,保證了高效的性能米者。