簡(jiǎn)介
? Kafka起初是由LinkedIn公司采用Scala語(yǔ)言開(kāi)發(fā)的一個(gè)多分區(qū)泼诱、多副本且基于Zookeeper協(xié)調(diào)的分布式消息系統(tǒng)坛掠,現(xiàn)已捐獻(xiàn)給Apache基金會(huì)。目前Kafka已經(jīng)被定為為一個(gè)分布式流式處理平臺(tái)坷檩,它以高吞吐却音、可持久化、可水平拓展矢炼、支持流數(shù)據(jù)處理等多種特性而被廣泛使用。
? Kafka之所以受到越來(lái)越多的青睞阿纤,與它所”扮演“的三大角色是分不開(kāi)的:
? 消息系統(tǒng):作為消息中間件句灌,具備系統(tǒng)解耦、冗余存儲(chǔ)欠拾、流量削峰胰锌、緩沖、異步通信藐窄、擴(kuò)展性资昧、可恢復(fù)性等功能。與此同時(shí)荆忍,Kafka還提供了大多數(shù)消息系統(tǒng)難以實(shí)現(xiàn)的消息順序性保障及回溯消費(fèi)等功能格带。
? 存儲(chǔ)系統(tǒng):Kafka把消息持久化到磁盤,有效降低了數(shù)據(jù)丟失的風(fēng)險(xiǎn)刹枉。也正是得益于Kafka的消息持久化功能和多副本機(jī)制叽唱,我們可以把Kafka作為長(zhǎng)期的數(shù)據(jù)存儲(chǔ)系統(tǒng)來(lái)使用,只需要把對(duì)應(yīng)的數(shù)據(jù)保留策略設(shè)置為“永久”或啟用主題的日志壓縮功能即可微宝。
? 流式處理平臺(tái):
消息中間件MQ使用場(chǎng)景
· 解耦:
未使用MQ的耦合場(chǎng)景:
? 現(xiàn)有A服務(wù)在自己代碼中調(diào)用B服務(wù)的接口和C服務(wù)的接口發(fā)送數(shù)據(jù)
? 此時(shí)新增D服務(wù)也需要A服務(wù)發(fā)送數(shù)據(jù)棺亭,則需要A服務(wù)在自己代碼里修改,發(fā)送數(shù)據(jù)給D服務(wù)蟋软,緊接著C服務(wù)又說(shuō)不需要A服務(wù)給自己發(fā)送數(shù)據(jù)了
? 負(fù)責(zé)A服務(wù)的人還得考慮镶摘,如果調(diào)用的B服務(wù)掛了怎么辦?如果D服務(wù)訪問(wèn)超時(shí)怎么辦岳守?由于A服務(wù)產(chǎn)生了比較關(guān)鍵的數(shù)據(jù)凄敢,許多服務(wù)需要A服務(wù)發(fā)送該數(shù)據(jù)過(guò)來(lái),這也導(dǎo)致了A服務(wù)與其他服務(wù)的嚴(yán)重耦合棺耍。
使用MQ解耦場(chǎng)景:
我們自己使用的場(chǎng)景
· 異步:
未使用MQ的同步高延時(shí)請(qǐng)求場(chǎng)景:
? 現(xiàn)有一用戶請(qǐng)求贡未,調(diào)用服務(wù)A接口
? 我們來(lái)計(jì)算一下,服務(wù)A先是在自己本地執(zhí)行SQL,然后調(diào)用了服務(wù)B俊卤、服務(wù)C和服務(wù)D的接口嫩挤,4個(gè)步驟下來(lái),需要耗時(shí)的總時(shí)長(zhǎng)為970ms消恍。用戶通過(guò)瀏覽器發(fā)起請(qǐng)求岂昭,等待1秒才得到響應(yīng),體驗(yàn)比較差狠怨。一般對(duì)于用戶的直接的操作约啊,要求是每個(gè)請(qǐng)求都必須在200ms內(nèi)完成,對(duì)用戶幾乎是無(wú)感知的佣赖。
使用MQ進(jìn)行異步化:
[圖片上傳失敗...(image-dc15f-1571550202627)]
? 使用MQ進(jìn)行異步化之后恰矩,此時(shí)用戶發(fā)起請(qǐng)求調(diào)用服務(wù)A的總耗時(shí)變成了20+5=25ms。
· 削峰:
未使用MQ削峰大量用戶請(qǐng)求場(chǎng)景:
使用MQ進(jìn)行削峰場(chǎng)景:
? MQ中每秒有2000個(gè)請(qǐng)求進(jìn)來(lái)憎蛤,就只有1000個(gè)請(qǐng)求出去外傅,結(jié)果就是導(dǎo)致在高峰期(假設(shè)1個(gè)小時(shí))可能有幾十萬(wàn)甚至上百萬(wàn)的請(qǐng)求積壓在MQ中,但是高峰期過(guò)后俩檬,每秒鐘只有20個(gè)請(qǐng)求萎胰,系統(tǒng)還是會(huì)按照每秒1000個(gè)請(qǐng)求的速度處理,差不多1個(gè)多小時(shí)就可以把積壓的上百萬(wàn)條消息給處理掉棚辽,就沒(méi)有積壓了技竟。
引入MQ后可能存在的一些問(wèn)題
系統(tǒng)可用性降低:系統(tǒng)引入的外部依賴越多,越容易掛掉屈藐。拿上圖舉例榔组,MQ一旦故障,A服務(wù)沒(méi)發(fā)發(fā)送消息到MQ了估盘,然后BCD服務(wù)也沒(méi)發(fā)消費(fèi)到消息了瓷患,整個(gè)系統(tǒng)就崩潰了,沒(méi)法運(yùn)轉(zhuǎn)了遣妥。
系統(tǒng)復(fù)雜性提高:消息丟失擅编,消息重復(fù),消息順序性問(wèn)題如何保證箫踩?例如A服務(wù)本來(lái)只需要給B服務(wù)發(fā)送一條數(shù)據(jù)就可以了爱态,結(jié)果因?yàn)锳服務(wù)和MQ之間協(xié)調(diào)出現(xiàn)問(wèn)題,A服務(wù)不小心把同一條數(shù)據(jù)發(fā)了兩次到MQ中給B服務(wù)消費(fèi)境钟,導(dǎo)致B服務(wù)插入兩條一模一樣的數(shù)據(jù)锦担。
一致性問(wèn)題:如A服務(wù)處理完了直接返回成功了,都認(rèn)為這個(gè)請(qǐng)求成功了慨削,但是要BCD服務(wù)都寫(xiě)庫(kù)成功才是真正的成功洞渔,如果其中有一個(gè)寫(xiě)庫(kù)失敗了套媚,這樣數(shù)據(jù)就不一致了。
典型的Kafka體系架構(gòu)
先簡(jiǎn)單介紹下Kafka中的術(shù)語(yǔ):
(1)Producer:生產(chǎn)者磁椒,也就是發(fā)送消息的一方堤瘤。生產(chǎn)者負(fù)責(zé)創(chuàng)建消息,然后將其投遞到Kafka中浆熔。
(2)Consumer:消費(fèi)者本辐,也就是接收消息的一方。消費(fèi)者連接到Kafka上并接收消息医增,進(jìn)而進(jìn)行相應(yīng)的業(yè)務(wù)邏輯處理慎皱。
(3)Broker:服務(wù)代理節(jié)點(diǎn)∫豆牵可以將其看做一臺(tái)服務(wù)器上部署的一臺(tái)Kafka服務(wù)器茫多,前提是這臺(tái)服務(wù)器上只部署了一個(gè)Kafka實(shí)例。一個(gè)或多個(gè)Broker組成了一個(gè)Kafka集群邓萨。
(4)Topic:主題地梨。Kafka中的消息以主題為單位進(jìn)行歸類,生產(chǎn)者負(fù)責(zé)將消息發(fā)送到特定的主題缔恳,而消費(fèi)者負(fù)責(zé)訂閱主題并進(jìn)行消費(fèi)。
(5)Partition:分區(qū)洁闰。一個(gè)topic可以分為多個(gè)partition歉甚,每個(gè)partition是一個(gè)有序的隊(duì)列。
(6)offset:偏移量扑眉。同一個(gè)topic下的不同partition包含的消息是不同的纸泄,partition在存儲(chǔ)層面可以看作一個(gè)可追加的日志文件,消息在被追加到分區(qū)日志的時(shí)候都會(huì)分配一個(gè)特定的偏移量(offset)腰素。offset是消息在分區(qū)中的唯一標(biāo)識(shí)聘裁,Kafka通過(guò)它來(lái)保證消息在分區(qū)中的順序性,不過(guò)offset并不跨越分區(qū)弓千,也就是說(shuō)衡便,Kafka保證的是分區(qū)有序而不是主題有序。
如圖洋访,某個(gè)主題中有3個(gè)分區(qū)镣陕,消息被順序追加到每個(gè)分區(qū)日志文件的尾部。Kafka中的分區(qū)可以分布在不同的broker上姻政,也就是說(shuō)呆抑,一個(gè)topic的數(shù)據(jù)可以分布在多個(gè)broker上
? Kafka之所以將topic分成多個(gè)分區(qū),分布在不同的broker上汁展,就是提供負(fù)載均衡的能力鹊碍。
Kafka多副本機(jī)制
? Kafka每個(gè)主題可劃分為多個(gè)分區(qū)厌殉,每個(gè)分區(qū)又配置有多個(gè)副本(Replica)。Kafka為分區(qū)引入了多副本機(jī)制侈咕,通過(guò)增加副本數(shù)量可以提升容災(zāi)能力公罕。同一分區(qū)的不同副本中保存的是相同的消息(在同一時(shí)刻,副本之間并非完全一樣)乎完,副本之間是“一主多從”的關(guān)系熏兄,其中l(wèi)eader副本負(fù)責(zé)處理讀寫(xiě)請(qǐng)求,follower副本只負(fù)責(zé)與leader副本的消息同步树姨。副本處于不同的broker中摩桶,當(dāng)leader副本出現(xiàn)故障時(shí),從follower副本中從新選舉新的leader副本對(duì)外提供服務(wù)帽揪。Kafka通過(guò)多副本機(jī)制實(shí)現(xiàn) 了故障的自動(dòng)轉(zhuǎn)移硝清,當(dāng)Kafka集群中某個(gè)新的broker失效時(shí),仍然能保證服務(wù)可用转晰。
? 如圖所示芦拿,Kafka集群中有3個(gè)broker,某個(gè)topic中有3個(gè)分區(qū)查邢,且副本因子(即副本個(gè)數(shù))也為3蔗崎,如此每個(gè)分區(qū)便有1個(gè)leader副本和2個(gè)follower副本。生產(chǎn)者和消費(fèi)者只與leader副本進(jìn)行交互扰藕,而follower副本只負(fù)責(zé)消息的同步缓苛,很多時(shí)候follower副本中的消息相對(duì)于leader而言會(huì)有一定的滯后。
? 分區(qū)中的所有副本統(tǒng)稱為 AR(Assigned Replicas)邓深。所有與leader副本保持一定程度的同步的副本(包括leader副本在內(nèi))組成 ISR(In-Sync Replicas)未桥,ISR集合是AR集合的一個(gè)子集。消息會(huì)先發(fā)送到leader副本芥备,然后follower副本才能從leader副本中拉取消息進(jìn)行同步冬耿,同步期間內(nèi)follower副本相對(duì)于leader副本會(huì)有一定程度的滯后。與leader副本同步滯后過(guò)多的副本(不包括leader副本)組成OSR(Out-of-Sync Replicas)萌壳,由此可見(jiàn)亦镶,AR = ISR + OSR。在正常情況下讶凉,所有的follower副本都應(yīng)該與leader副本保持一定程度的同步染乌,即 AR = ISR,OSR集合為空懂讯。
? leader副本負(fù)責(zé)維護(hù)和跟蹤ISR集合中所有follower副本的滯后狀態(tài)荷憋,當(dāng)follower副本落后太多或失效時(shí),leader副本會(huì)把它從ISR集合中剔除褐望。這里的落后程度并不是指follower副本與leader副本相差的消息數(shù)勒庄,而是指follower副本寫(xiě)入消息的速度慢于leader副本持續(xù)超過(guò)10s(默認(rèn)參數(shù))串前,則認(rèn)為follower副本落后太多。如果OSR集合中所有的follower副本“追上”了leader副本的進(jìn)度实蔽,那么leader副本會(huì)把它從OSR集合轉(zhuǎn)移至ISR集合荡碾。默認(rèn)情況下,當(dāng)leader副本發(fā)生故障時(shí)局装,只有在ISR集合中的副本才有資格被選舉為新的leader坛吁。
? ISR與HW和LEO也有緊密的關(guān)系。HW是High Watermark的縮寫(xiě)铐尚,俗稱高水位拨脉,它標(biāo)識(shí)了一個(gè)特定的消息偏移量(offset),消息只能拉取到這個(gè)offset之前的消息宣增。
? 如上圖所示玫膀,表示一個(gè)分區(qū)中各種偏移量的說(shuō)明。它代表一個(gè)日志文件爹脾,這個(gè)日志文件中有9條消息帖旨,第一條消息的offset為0,最后一條消息的offset為8,offset為9代表下一條待寫(xiě)入的消息的位置灵妨。日志文件的HW為6解阅,表示消費(fèi)者只能拉取到offset在0至5之間的消息,而offset為6的消息對(duì)消費(fèi)者而言是不可見(jiàn)的泌霍。LEO是Log End Offset的縮寫(xiě)瓮钥,標(biāo)識(shí)當(dāng)前日志文件下一條待寫(xiě)入的消息的offset。分區(qū)ISR集合中的每個(gè)副本都會(huì)維護(hù)自身的的LEO烹吵,而集合中最小的LEO即為分區(qū)的HW,對(duì)消費(fèi)者而言桨武,只能消費(fèi)HW之前的消息肋拔。下面舉個(gè)例子來(lái)更好的說(shuō)明ISR集合與HW和LEO之間的關(guān)系:
? Leader副本接收到生產(chǎn)者發(fā)送的消息,寫(xiě)入本地磁盤后呀酸,會(huì)更新其LEO值凉蜂。
? 在同步過(guò)程中,不同的follower副本的同步效率也不盡相同性誉。
? 在某一時(shí)刻窿吩,follower1完全跟上了leader副本而follower2只同步到了消息3,如此leader副本的LEO為5错览,follower1的LEO為5纫雁,follower2的LEO為4,那么當(dāng)前分區(qū)的HW取最小值4倾哺,此時(shí)消費(fèi)者可以消費(fèi)到offset為0至3之間的消息轧邪。
? 所有的消息都成功寫(xiě)入了消息3和消息4刽脖,整個(gè)分區(qū)的HW和LEO都變?yōu)?,因此消費(fèi)者可以消費(fèi)到offset為4的消息了忌愚。由此可見(jiàn)曲管,Kafka的復(fù)制機(jī)制既不是完全的同步復(fù)制,也不是單純的異步復(fù)制硕糊。事實(shí)上院水,同步復(fù)制要求所有能工作的follower副本都復(fù)制完,這條消息才會(huì)被確認(rèn)為已成功提交简十,這種復(fù)制方式極大地影響了性能檬某。而在異步復(fù)制方式下,follower副本異步地從leader副本中復(fù)制數(shù)據(jù)勺远,數(shù)據(jù)只要被leader副本寫(xiě)入就被認(rèn)為已經(jīng)成功提交了橙喘。在這種情況下,如果follower副本都還沒(méi)有復(fù)制完而落后于leader副本胶逢,突然leader副本宕機(jī)厅瞎,則會(huì)造成數(shù)據(jù)丟失。Kafka使用的這種ISR的方式則有效地權(quán)衡了數(shù)據(jù)可靠性和性能之間的關(guān)系初坠。
生產(chǎn)者
? 一個(gè)正常的生產(chǎn)邏輯為以下幾個(gè)步驟:配置客戶端參數(shù)和簸,創(chuàng)建相應(yīng)的生產(chǎn)者實(shí)例,構(gòu)建待發(fā)送的消息碟刺,發(fā)送消息锁保。
? 客戶端參數(shù)配置
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
? bootstrap.servers:該參數(shù)用來(lái)指定生產(chǎn)者客戶端連接Kafka集群所需的broker地址清單,格式為host1:port1,host2:port2,這里不一定需要配置所以的broker地址半沽,因?yàn)樯a(chǎn)者會(huì)從給定的broker里找到其他broker信息爽柒。但至少配置2個(gè)以上,當(dāng)其中一個(gè)宕機(jī)了者填,能夠保證生產(chǎn)者仍然能連接到kafka集群上浩村。key.serializer和value.serializer指定序列化操作的序列化器。這三個(gè)參數(shù)都沒(méi)有默認(rèn)值占哟,所以在配置生產(chǎn)者客戶端時(shí)是必填的心墅。
properties.put("acks","0");
? acks,可取值0榨乎,1怎燥,-1,這個(gè)參數(shù)是用來(lái)指定分區(qū)中必須要有多少個(gè)副本接收到這條消息蜜暑,之后生產(chǎn)者才會(huì)認(rèn)為這條消息寫(xiě)入成功铐姚。默認(rèn)值為1,生產(chǎn)者發(fā)送消息之后史煎,只要分區(qū)的leader副本成功寫(xiě)入消息谦屑,那么它就會(huì)收到來(lái)自服務(wù)端的成功響應(yīng)驳糯。如果消息寫(xiě)入leader副本并返回成功響應(yīng)給生產(chǎn)者,且在被其他follower副本拉取前l(fā)eader副本崩潰了氢橙,那么此時(shí)消息還是會(huì)丟失酝枢,因?yàn)樾逻x舉的leader副本中并沒(méi)有這條對(duì)應(yīng)的消息。acks=0悍手,生產(chǎn)者發(fā)送消息之后不需要等待任何服務(wù)端的響應(yīng)帘睦。這樣可以達(dá)到最大的吞吐量,但是也存在問(wèn)題坦康,如果在消息發(fā)送到寫(xiě)入Kafka的過(guò)程中出現(xiàn)某些異常竣付,導(dǎo)致Kafka沒(méi)有接收到這條消息,那么生產(chǎn)者也不知道滞欠,消息也就丟失了古胆。acks=-1或acks=all,生產(chǎn)者在消息發(fā)送之后筛璧,需要等待ISR中的所有副本都成功寫(xiě)入消息之后才能夠收到來(lái)自服務(wù)端的成功響應(yīng)逸绎。設(shè)置成-1可以達(dá)到最強(qiáng)的可靠性,但這并不意味著消息就一定可靠夭谤,因?yàn)槿绻鸌SR中可能只有l(wèi)eader副本棺牧,這樣就退化成acks=1的情況了。所以acks默認(rèn)為1朗儒,是消息可靠性和吞吐量之間的一個(gè)折中方案颊乘。
? 構(gòu)建消息,即創(chuàng)建ProducerRecord對(duì)象醉锄。
public class ProducerRecord<K,V> {
private final String topic; //主題
private final Integer partition; //分區(qū)號(hào)
private final K key; //鍵
private final V value; //值
private final Long timestamp; //消息的時(shí)間戳
...
}
? 其中topic和partition字段分別指代消息要發(fā)往的主題和分區(qū)號(hào)乏悄。value是指消息體,即你要發(fā)送的內(nèi)容恳不。key是用來(lái)指定消息的鍵纲爸,它不僅是消息的附加信息,還可以用來(lái)計(jì)算分區(qū)號(hào)進(jìn)而可以讓消息發(fā)往特定的分區(qū)妆够。消息以主題為單位進(jìn)行歸類,而這個(gè)key可以讓消息再進(jìn)行二次歸類负蚊,同一個(gè)key的消息會(huì)被劃分到同一個(gè)分區(qū)中神妹。說(shuō)到key,這里如果要保證消息的順序性家妆,可以把需要保證消息消費(fèi)順序的指定同一個(gè)key鸵荠。消息在通過(guò)send()方法發(fā)往broker的過(guò)程中,有可能需要經(jīng)過(guò)攔截器伤极、序列化器和分區(qū)器蛹找。攔截器一般不是必需的姨伤,但序列化器是必需的。生產(chǎn)者需要用序列化器把對(duì)象轉(zhuǎn)換成字節(jié)數(shù)組才能通過(guò)網(wǎng)絡(luò)發(fā)送給Kafka庸疾。
? 如果在構(gòu)造消息時(shí)在ProducerRecord中指定了partition字段乍楚,那么就不需要分區(qū)器的作用,如果沒(méi)有指定届慈,那么就需要依賴分區(qū)器根據(jù)key這個(gè)字段來(lái)計(jì)算partition的值徒溪。在默認(rèn)分區(qū)器的方法中,如果key部位null金顿,那么默認(rèn)的分區(qū)器會(huì)對(duì)key進(jìn)行哈希臊泌,最終根據(jù)等到的哈希值來(lái)計(jì)算分區(qū)號(hào),有相同key的消息會(huì)被寫(xiě)入同一個(gè)分區(qū)揍拆。如果key為null渠概,那么消息將會(huì)以輪詢的方式發(fā)往主題內(nèi)的各個(gè)可用分區(qū)。
消費(fèi)者
消費(fèi)者(Consumer):負(fù)責(zé)訂閱Kafka中的主題(topic)嫂拴,并且從訂閱的主題上拉取消息播揪。
消費(fèi)組(Consumer Group):每個(gè)消費(fèi)者都有一個(gè)對(duì)應(yīng)的消費(fèi)組,消息發(fā)布到主題后顷牌,只會(huì)被投遞給訂閱它的每個(gè)消費(fèi)組中的一個(gè)消費(fèi)者剪芍。
? 如上圖所示,某個(gè)主題共有3個(gè)分區(qū)窟蓝,有兩個(gè)消費(fèi)組A和B都訂閱了這個(gè)主題罪裹。按照Kafka默認(rèn)的規(guī)則,消費(fèi)組A中每個(gè)消費(fèi)者分配到1個(gè)分區(qū)运挫,消費(fèi)組B中C3分配到兩個(gè)分區(qū)状共,C4分配到1個(gè)分區(qū)。兩個(gè)消費(fèi)組之間互不影響谁帕,每個(gè)消費(fèi)組只能消費(fèi)所分配到的分區(qū)中的消息峡继,換言之,每一個(gè)分區(qū)只能被一個(gè)消費(fèi)組中的一個(gè)消費(fèi)者所消費(fèi)匈挖。消費(fèi)組是一個(gè)邏輯上的概念碾牌,它將屬于同一組的消費(fèi)者歸為一類,每一個(gè)消費(fèi)者只隸屬于一個(gè)消費(fèi)組儡循,課通過(guò)消費(fèi)者客戶端參數(shù)group.id來(lái)配置消費(fèi)組舶吗。
對(duì)于消息中間件一般有兩種消息投遞模式:
點(diǎn)對(duì)點(diǎn)(P2P,Point-to-Point)模式:基于隊(duì)列择膝,生產(chǎn)者發(fā)送消息到隊(duì)列誓琼,消費(fèi)者從隊(duì)列中接收消息人断。一般是一對(duì)一晦闰。
發(fā)布/訂閱(Pub/Sub)模式:主題作為消息傳遞的中介核蘸,生產(chǎn)者將消息發(fā)布到某個(gè)主題撤奸,消費(fèi)者可主題中訂閱消息。該模式在消息的一對(duì)多廣播時(shí)采用傲隶。
? Kafka同時(shí)支持兩種消息投遞模式饺律,而這正得益于它的消費(fèi)者與消費(fèi)組模型設(shè)計(jì):
- 如果所有的消費(fèi)者都隸屬于同一個(gè)消費(fèi)組,那么所有的消息都會(huì)被均勻的投遞給每一個(gè)消費(fèi)者伦籍,即每條消息只會(huì)被一個(gè)消費(fèi)者處理蓝晒,這就相當(dāng)于點(diǎn)對(duì)點(diǎn)模式。
- 如果所有的消費(fèi)者都隸屬于不同的消費(fèi)組帖鸦,那么所有的消息都會(huì)被廣播給所有的消費(fèi)者芝薇,即每條消息會(huì)被所有的消費(fèi)者處理,這就相當(dāng)于發(fā)布/訂閱模式作儿。
? 一個(gè)正常的消費(fèi)邏輯步驟:配置消費(fèi)者客戶端參數(shù)洛二,創(chuàng)建消費(fèi)者實(shí)例,訂閱主題攻锰,拉取消息并消費(fèi)晾嘶,提交消費(fèi)位移等。
? 配置必要的消費(fèi)者客戶端參數(shù)娶吞,有4個(gè)參數(shù)是必填的垒迂。同生產(chǎn)者一樣,bootstrap.servers妒蛇、key.deserializer和value.deserializer三個(gè)參數(shù)是必配的机断,只不過(guò)key、value的變成了反序列化器绣夺,還有一個(gè)group.id配置消費(fèi)者隸屬的消費(fèi)組名稱吏奸。
props.put(ConsumerConfig.GROUP_ID_CONFIG, "goupA");
? 消息的消費(fèi)一般有兩種模式:推模式——是服務(wù)端主動(dòng)將消息推送給消費(fèi)者,拉模式——是消費(fèi)者主動(dòng)向服務(wù)端發(fā)起請(qǐng)求來(lái)拉取消息陶耍。Kafka中的消費(fèi)基于拉模式的奋蔚。Kafka中的消息消費(fèi)是一個(gè)不斷輪詢的過(guò)程,消費(fèi)者所要做的就是重復(fù)地調(diào)用poll()方法烈钞,返回所訂閱的主題(分區(qū))上的一組消息泊碑。
? 消費(fèi)者消費(fèi)到的每條消息類型為ConsumerRecord,這個(gè)和生產(chǎn)者發(fā)送的消息類型ProducerRecord對(duì)應(yīng)毯欣,不過(guò)字段更豐富:
public class ConsumerRecord<K,V> {
private final String topic; //消息所屬主題名稱
private final int partition; //消息所屬分區(qū)編號(hào)
private final long offset; //消息所屬分區(qū)偏移量
private final long timestamp; //時(shí)間戳
private final TimestampType timestampType; //兩種類型蛾狗,CreateTime消息創(chuàng)建的時(shí)間戳, //LogAppendTime消息追加到日志的時(shí)間戳
private final K key;
private final V value; //一般業(yè)務(wù)應(yīng)用所要讀取的值
...
}
位移提交
? Kafka中每條消息都有唯一的offset仪媒,表示該消息處在的partition中的位置,叫作“偏移量”。消費(fèi)者中也有一個(gè)offset概念算吩,表示消費(fèi)者消費(fèi)到分區(qū)中某個(gè)消息所在的位置留凭,我們把它與消息的區(qū)分開(kāi),可叫作“位移”偎巢。在舊消費(fèi)者客戶端(用Scala編寫(xiě)的客戶端版本)中蔼夜,消費(fèi)位移是保存在ZooKeeper中的,而在新消費(fèi)者客戶端(用Java編寫(xiě)的客戶端)中压昼,消費(fèi)位移存儲(chǔ)在Kafka內(nèi)部的主題_consumer_offsets中求冷。這里將消費(fèi)位移存儲(chǔ)起來(lái)(持久化)的動(dòng)作稱為“提交”。
? 當(dāng)前消費(fèi)者消費(fèi)的位移為X窍霞,但它需要提交的消費(fèi)位移不是X匠题,而是X+1,它表示下一條需要拉取的消息的位置但金。在Kafka中默認(rèn)的消費(fèi)位移提交方式是自動(dòng)提交韭山,提交時(shí)間默認(rèn)為5秒钱磅,可通過(guò)auto.commit.interval.ms配置盖淡。
? 自動(dòng)提交位移的方式非常簡(jiǎn)便褪迟,但是也會(huì)帶來(lái)重復(fù)消費(fèi)和消息丟失的問(wèn)題牵咙。
? 假設(shè)剛剛提交完一次消費(fèi)位移洁桌,然后拉取一批消息進(jìn)行消費(fèi)另凌,在下一次自動(dòng)位移提交之前,消費(fèi)者崩了诗茎,那么等消費(fèi)者恢復(fù)再來(lái)消費(fèi)消息的時(shí)候又得從上一次位移提交的地方重新開(kāi)始王污,這樣便發(fā)生了重復(fù)消費(fèi)的現(xiàn)象昭齐。我們可以通過(guò)減小位移提交時(shí)間間隔來(lái)減小重復(fù)消息的窗口就谜,但這樣并不能避免重復(fù)消費(fèi)的發(fā)送丧荐,而且也會(huì)使得位移提交更加頻繁篮奄。這里我們可以在拿數(shù)據(jù)寫(xiě)庫(kù)前窟却,根據(jù)主鍵去庫(kù)中查詢夸赫,如果已有茬腿,就update一下好了切平,若是寫(xiě)入redis,用set存儲(chǔ)简烘,去重届氢。
?