1、消費(fèi)者和消費(fèi)者組
????????消費(fèi)者負(fù)責(zé)訂閱Kafka中的主題,并從訂閱的主題中拉取消息。與其他消息中間件不同的是:Kafka中的消費(fèi)理念中還有一層消費(fèi)者組(consumer group),每個(gè)消費(fèi)者都屬于一個(gè)消費(fèi)者組徙菠。當(dāng)消息發(fā)布到主題后,只會(huì)投遞給訂閱它的每個(gè)消費(fèi)者組中的一個(gè)消費(fèi)者郁岩。換言之婿奔,每一個(gè)分區(qū)只能被一個(gè)消費(fèi)者組中的一個(gè)消費(fèi)者消費(fèi)。
????????消費(fèi)者與消費(fèi)者組的這種模式可以讓整體的消費(fèi)能力具有橫向伸縮性问慎,我們可以增加或減少消費(fèi)者的個(gè)數(shù)來(lái)提高或降低整體的消費(fèi)能力萍摊。但是,一味的增加消費(fèi)者并不會(huì)使消費(fèi)能力得到提升蝴乔,如果消費(fèi)者過(guò)多记餐,出現(xiàn)了消費(fèi)者的數(shù)量大于分區(qū)的數(shù)量,就會(huì)有消費(fèi)者分配不到任何分區(qū)薇正。
????????消息中間件片酝,一般有兩種消息投遞模式:點(diǎn)對(duì)點(diǎn)和發(fā)布訂閱。
? ? ? ? 1)如果所有的消費(fèi)這都屬于一個(gè)消費(fèi)者組挖腰,那么所有的消息都會(huì)被均衡地投遞給每一個(gè)消費(fèi)者雕沿,即每條消息只會(huì)被一個(gè)消費(fèi)者處理,這就相當(dāng)于點(diǎn)對(duì)點(diǎn)模式的應(yīng)用猴仑。
? ? ? ? 2)如果所有消費(fèi)者都屬于不同的消費(fèi)者組审轮,那么所有的消息都會(huì)被廣播被所有的消費(fèi)者肥哎,即每條消息都會(huì)被所有的消費(fèi)者處理,這就相當(dāng)于發(fā)布/訂閱模式的應(yīng)用疾渣。
2篡诽、客戶(hù)端開(kāi)發(fā)
????????一個(gè)正常的消費(fèi)邏輯應(yīng)該具備以下幾個(gè)步驟:
? ? ? ? 1)配置消費(fèi)者客戶(hù)端參數(shù)以及創(chuàng)建相關(guān)的消費(fèi)者實(shí)例
? ? ? ? 2)訂閱主題
? ? ? ? 3)拉取消息并消費(fèi)
? ? ? ? 4)提交消費(fèi)位移
? ? ? ? 5)關(guān)閉消費(fèi)者實(shí)例
2.1、消息消費(fèi)
????????消息的消費(fèi)一般有兩種模式:推模式和拉模式榴捡。Kafka的消費(fèi)是基于拉(poll)模式的杈女。Kafka的消息消費(fèi)是一個(gè)不斷輪詢(xún)的過(guò)程,消費(fèi)者要做的就是不斷地調(diào)用poll()方法吊圾。
????????消費(fèi)者消費(fèi)到的每條消息的類(lèi)型為ConsumerRecord达椰。
????????public class ConsumerRecord<K,V> (
? ? ????????? private final String topic;? // 主題
? ? ????????? private final int partition;? // 分區(qū)
? ? ? ????????private final long offset;? // 消息所在分區(qū)的偏移量
? ? ? ????????private final long timestamp; // 時(shí)間戳
? ? ????????? private final Timestamptype timestampType;? ? ? // 時(shí)間戳類(lèi)型
? ? ????????? private final int serialzedKeySize; // key的序列化器
? ? ????????? private final int serialzedValueSize;? ? // value的序列化器
? ? ? ????????private final Headers headers;? ? ? // 消息的頭部?jī)?nèi)容
? ? ????????? private final K key;? ? // 消息的鍵
? ? ? ????????private final V value;? // 消息的值
? ? ????????? private volatile Long checksum;? ?
? ? ????????? // 省略若干方法
????????)
????????poll()方法返回的類(lèi)型是ConsumerRecords,它用來(lái)表示一次拉取操作所獲得的消息集项乒,內(nèi)部包含了ConsumerRecord啰劲。
2.2、位移提交
????????每次調(diào)用poll()的時(shí)候檀何,它返回的是還沒(méi)有被消費(fèi)的消息集蝇裤,要做到這一點(diǎn),就需要記錄上一次消費(fèi)時(shí)的消費(fèi)位移埃碱。而且猖辫,這個(gè)消費(fèi)位移必須要持久化保存酥泞,不能僅僅保存在內(nèi)存中砚殿,否則消費(fèi)者重啟之后就無(wú)法知曉之前的消費(fèi)位移。再比如芝囤,如果消費(fèi)者組中加入了新的消費(fèi)者似炎,再均衡之后,分區(qū)有可能會(huì)被分配給新的消費(fèi)者悯姊,如果不持久化保存羡藐,新的消費(fèi)者會(huì)不知道之前的消費(fèi)位移。
????????舊的消費(fèi)位移是存儲(chǔ)在zookeeper中悯许,而新的消費(fèi)者客戶(hù)端中仆嗦,是保存在Kafka自己的topic中,即_consumer_offsets先壕。
????????對(duì)于位移提交的時(shí)機(jī)非常有講究瘩扼,有可能會(huì)造成重復(fù)消費(fèi)或者消息丟失的現(xiàn)象。
????????比如:這次我拉取的是[x+1,x+5]的消息垃僚,拉取到消息之后集绰,就提交了位移。但是當(dāng)消費(fèi)到x+3的時(shí)候遇到了異常谆棺,在故障恢復(fù)之后栽燕,重現(xiàn)拉取消息是從x+6開(kāi)始的。數(shù)據(jù)發(fā)生了丟失。x+3到x+5之間的消息丟失了碍岔。
????????再比如:這次我拉取的是[x+1,x+5]的消息浴讯,位移提交動(dòng)作是消費(fèi)完所有信息之后才執(zhí)行的,那么當(dāng)消費(fèi)到x+3的時(shí)候發(fā)生了異常蔼啦。在故障恢復(fù)之后兰珍,我們重新拉取的消息是從x+1開(kāi)始的,那么x+1到x+2之間的消息又重新消費(fèi)了询吴。
????????實(shí)際情況可能更復(fù)雜掠河。
????????Kafka的默認(rèn)消費(fèi)位移提交方式是自動(dòng)提交,由消費(fèi)者客戶(hù)端參數(shù)enable.auto.commit配置猛计,默認(rèn)值為true唠摹。自動(dòng)提交是定期提交,定期的周期由參數(shù)auto.commit.interval.ms配置奉瘤,默認(rèn)值為5秒勾拉。默認(rèn)情況下,消費(fèi)者每隔5秒會(huì)將拉取到的每個(gè)分區(qū)中最大的消息位移進(jìn)行提交盗温。自動(dòng)提交的動(dòng)作是在poll()方法的邏輯中完成的藕赞。
????????自動(dòng)提交雖然簡(jiǎn)便,但隨之而來(lái)的是重復(fù)消費(fèi)和消息丟失的問(wèn)題卖局。雖然可以通過(guò)減少提交的時(shí)間間隔來(lái)減少丟失的窗口大小斧蜕,但是并不能避免重復(fù)消費(fèi)的發(fā)生,而且會(huì)使位移提交更加頻繁砚偶。
????????自動(dòng)提交重復(fù)消費(fèi)很容易理解批销。數(shù)據(jù)丟失的情況:比如有兩個(gè)線程,線程A不斷地拉取消息并存入本地緩存染坯。線程B從本地緩存中讀取消息并進(jìn)行響應(yīng)的邏輯處理均芽。假設(shè) 線程A拉取到的消息過(guò)多,線程B還沒(méi)來(lái)得及消費(fèi)完单鹿,此時(shí)掛掉了掀宋。等重新恢復(fù)過(guò)來(lái)之后,線程B還沒(méi)有小得到消息就丟失了仲锄。
????????除此之外劲妙,自動(dòng)提交還無(wú)法做到精準(zhǔn)的位移管理。
????????很多情況下昼窗,并不是拉取完信息就算消費(fèi)完成是趴,而是需要將信息寫(xiě)入數(shù)據(jù)庫(kù)、寫(xiě)入本地緩存等等澄惊。在這些場(chǎng)景下唆途,所有的業(yè)務(wù)都被完成了富雅,才能認(rèn)為消息被成功消費(fèi),手動(dòng)的提交方式可以讓開(kāi)發(fā)人員在合適的地方進(jìn)行位移提交肛搬。
????????手動(dòng)提交分為同步提交和異步提交没佑。
2.2.1、同步提交
????????pubulic void commitSync()
????????代碼示例:
????????while (isRunning.get()){
? ? ? ????????ConsumerRecords<String,String> records = consumer.poll(100)
? ? ? ????????for (ConsumerRecords<String,String> record:records){
? ? ? ? ? ? ????????// do some logical processing
? ? ????????? }
? ? ? ????????consumer.commitSync()
????????}
????????同步提交會(huì)阻塞消費(fèi)者線程直至提交完成温赔。這個(gè)示例依然有重復(fù)消費(fèi)的問(wèn)題蛤奢,如果業(yè)務(wù)邏輯處理完之后,在位移提交之前陶贼,程序奔潰了啤贩,那么待恢復(fù)之后又要從上一次位移提交的地方拉取消息。
????????pubulic void commitSync(final Map<TopicPartition,OffsetAndMetadata> offsets)
????????上述方法提供了offset參數(shù)拜秧,用來(lái)提交指定分區(qū)的位移痹屹。
????????實(shí)際應(yīng)用中,很少會(huì)每消費(fèi)一條消息就提交一次枉氮。因?yàn)楸旧韈ommitSync就是同步執(zhí)行的志衍,會(huì)消耗一定的性能。
2.2.2聊替、異步提交
????????異步提交的方式在執(zhí)行的時(shí)候消費(fèi)者線程不會(huì)被阻塞楼肪,可能在提交消費(fèi)位移的結(jié)果還未返回之前就開(kāi)始了新一次的拉取操作。
????????commitAsync()提交的時(shí)候同樣會(huì)有失敗的情況發(fā)生惹悄。重試春叫?不可取,比如這次提交的X偏移量失敗了俘侠,但是另一次X+Y成功了象缀,假如重試成功了,那么此時(shí)位移偏移量又重新回到了X爷速。如果此時(shí)發(fā)生再均衡,那么恢復(fù)之后又從X開(kāi)始消費(fèi)霞怀。
????????為此惫东,我們可以設(shè)置一個(gè)遞增的序號(hào)來(lái)維持異步提交的順序。在遇到位移提交失敗需要重試的時(shí)候毙石,可以檢查所提交的位移和序號(hào)的值的大小廉沮。如果小了,就不要重試了徐矩。
????????實(shí)際情況下滞时,位移提交失敗的情況很少發(fā)生,不重試也沒(méi)關(guān)系滤灯,后面的提交總有成功的坪稽。重試會(huì)增加代碼的復(fù)雜度曼玩,不重試會(huì)增加代碼重復(fù)消費(fèi)的幾率。
????????如果消費(fèi)者異常退出窒百,其實(shí)重復(fù)消費(fèi)的情況很難避免黍判,更多可能是從業(yè)務(wù)處理側(cè)糾正。如果消費(fèi)者正常退出或者發(fā)生再均衡時(shí)篙梢,可以在退出或再均衡執(zhí)行之情使用同步提交的方式做最后的把關(guān)顷帖。
????????思考如何盡量避免重復(fù)消費(fèi)和消息丟失?
2.3渤滞、指定位移提交
????????在Kafka中每當(dāng)消費(fèi)者查找不到所記錄的消費(fèi)位移時(shí)贬墩,就會(huì)根據(jù)消費(fèi)者客戶(hù)端參數(shù)auto.offset.reset的配置來(lái)決定從何處開(kāi)始消費(fèi)。默認(rèn)值時(shí)"latest"妄呕,從分區(qū)末尾開(kāi)始消費(fèi)消息震糖。還有個(gè)值"earliest",從開(kāi)頭開(kāi)始消費(fèi)趴腋。
????????消息拉取的poll()方法其實(shí)對(duì)于我們來(lái)說(shuō)是一個(gè)黑盒吊说,普通開(kāi)發(fā)人員無(wú)法精準(zhǔn)的掌握消費(fèi)起始位置。提供的auto.offset.reset參數(shù)只有在找不到位移或位移越界的情況下才會(huì)粗略地從末尾或者從頭開(kāi)始消費(fèi)优炬。
????????KafkaConsumer中的seek()方法提供了一種功能颁井,可以讓我們呢追前消費(fèi)或者回溯消費(fèi)。
????????public void seek (TopicPartition partition, long offset)
????????partition表示分區(qū)蠢护,offset表示從分區(qū)的哪個(gè)位置開(kāi)始消費(fèi)雅宾。
????????seek方法為我們提供了從特定位置讀取消息的能力,我們可以通過(guò)這個(gè)方法向前跳過(guò)若干消息葵硕,也可以通過(guò)這個(gè)方法回溯若干消息眉抬,為消費(fèi)消息提供了很大的靈活性。
2.4懈凹、再均衡
????????再均衡是指分區(qū)的所屬權(quán)從一個(gè)消費(fèi)者轉(zhuǎn)移到另一個(gè)消費(fèi)者的行為蜀变,它為消費(fèi)組具備高可用性和伸縮性提供保障,讓我們可以既方便又安全地刪除和增加消費(fèi)者介评。不過(guò)库北,再均衡期間,消費(fèi)者組內(nèi)的消費(fèi)者是無(wú)法讀取消息的们陆。換言之寒瓦,消費(fèi)組會(huì)變得不可用。
????????除此之外坪仇,當(dāng)一個(gè)分區(qū)被分配到另一個(gè)消費(fèi)者時(shí)杂腰,消費(fèi)者之前的狀態(tài)會(huì)丟失,可能會(huì)造成重復(fù)消費(fèi)椅文,因此要避免不必要的再均衡喂很。
2.5惜颇、多線程實(shí)現(xiàn)
????????KafkaProducer是線程安全的,但是KafkaConsumer是非線程安全的恤筛。
????????KafkaConsumer非線程安全并不意味著我們?cè)谙M(fèi)消息的時(shí)候只能以單線程的方式執(zhí)行官还。我們可以通過(guò)多線程的方式來(lái)實(shí)現(xiàn)消息消費(fèi),多線程的目的就是為了提高整體的消費(fèi)能力毒坛。
????????第一種方式:線程封閉望伦,即為每一個(gè)線程實(shí)例化一個(gè)KafkaConsumer對(duì)象,一個(gè)線程隊(duì)形一個(gè)KafkaConsumer實(shí)例煎殷,我們可以稱(chēng)之為消費(fèi)線程屯伞。一個(gè)線程可以消費(fèi)一個(gè)或多個(gè)分區(qū)內(nèi)的消息,所有的消費(fèi)線程都隸屬于一個(gè)消費(fèi)者組豪直。
????????第二種方法:多個(gè)線程同時(shí)對(duì)應(yīng)一個(gè)分區(qū)劣摇,可以通過(guò)assign()、seek()方法實(shí)現(xiàn)弓乙,這樣可以打破原有的消費(fèi)線程的個(gè)數(shù)不能超過(guò)分區(qū)數(shù)的限制末融,進(jìn)一步提供消費(fèi)能力。不過(guò)這種方式非常復(fù)雜暇韧,實(shí)際很少用到勾习。
????????第三種方法:一般而言,poll()拉取消息的速度是相當(dāng)快的懈玻,而整體消費(fèi)瓶頸其實(shí)是現(xiàn)在處理消息這一塊巧婶。我們可以將處理消息模塊分為多線程處理。缺點(diǎn)就是對(duì)于消息的順序處理就比較困難了涂乌。