Kafka消費(fèi)者

1、消費(fèi)者和消費(fèi)者組

????????消費(fèi)者負(fù)責(zé)訂閱Kafka中的主題,并從訂閱的主題中拉取消息。與其他消息中間件不同的是:Kafka中的消費(fèi)理念中還有一層消費(fèi)者組(consumer group),每個(gè)消費(fèi)者都屬于一個(gè)消費(fèi)者組徙菠。當(dāng)消息發(fā)布到主題后,只會(huì)投遞給訂閱它的每個(gè)消費(fèi)者組中的一個(gè)消費(fèi)者郁岩。換言之婿奔,每一個(gè)分區(qū)只能被一個(gè)消費(fèi)者組中的一個(gè)消費(fèi)者消費(fèi)。

????????消費(fèi)者與消費(fèi)者組的這種模式可以讓整體的消費(fèi)能力具有橫向伸縮性问慎,我們可以增加或減少消費(fèi)者的個(gè)數(shù)來(lái)提高或降低整體的消費(fèi)能力萍摊。但是,一味的增加消費(fèi)者并不會(huì)使消費(fèi)能力得到提升蝴乔,如果消費(fèi)者過(guò)多记餐,出現(xiàn)了消費(fèi)者的數(shù)量大于分區(qū)的數(shù)量,就會(huì)有消費(fèi)者分配不到任何分區(qū)薇正。

????????消息中間件片酝,一般有兩種消息投遞模式:點(diǎn)對(duì)點(diǎn)和發(fā)布訂閱。

? ? ? ? 1)如果所有的消費(fèi)這都屬于一個(gè)消費(fèi)者組挖腰,那么所有的消息都會(huì)被均衡地投遞給每一個(gè)消費(fèi)者雕沿,即每條消息只會(huì)被一個(gè)消費(fèi)者處理,這就相當(dāng)于點(diǎn)對(duì)點(diǎn)模式的應(yīng)用猴仑。

? ? ? ? 2)如果所有消費(fèi)者都屬于不同的消費(fèi)者組审轮,那么所有的消息都會(huì)被廣播被所有的消費(fèi)者肥哎,即每條消息都會(huì)被所有的消費(fèi)者處理,這就相當(dāng)于發(fā)布/訂閱模式的應(yīng)用疾渣。

2篡诽、客戶(hù)端開(kāi)發(fā)

????????一個(gè)正常的消費(fèi)邏輯應(yīng)該具備以下幾個(gè)步驟:

? ? ? ? 1)配置消費(fèi)者客戶(hù)端參數(shù)以及創(chuàng)建相關(guān)的消費(fèi)者實(shí)例

? ? ? ? 2)訂閱主題

? ? ? ? 3)拉取消息并消費(fèi)

? ? ? ? 4)提交消費(fèi)位移

? ? ? ? 5)關(guān)閉消費(fèi)者實(shí)例

2.1、消息消費(fèi)

????????消息的消費(fèi)一般有兩種模式:推模式和拉模式榴捡。Kafka的消費(fèi)是基于拉(poll)模式的杈女。Kafka的消息消費(fèi)是一個(gè)不斷輪詢(xún)的過(guò)程,消費(fèi)者要做的就是不斷地調(diào)用poll()方法吊圾。

????????消費(fèi)者消費(fèi)到的每條消息的類(lèi)型為ConsumerRecord达椰。

????????public class ConsumerRecord<K,V> (

? ? ????????? private final String topic;? // 主題

? ? ????????? private final int partition;? // 分區(qū)

? ? ? ????????private final long offset;? // 消息所在分區(qū)的偏移量

? ? ? ????????private final long timestamp; // 時(shí)間戳

? ? ????????? private final Timestamptype timestampType;? ? ? // 時(shí)間戳類(lèi)型

? ? ????????? private final int serialzedKeySize; // key的序列化器

? ? ????????? private final int serialzedValueSize;? ? // value的序列化器

? ? ? ????????private final Headers headers;? ? ? // 消息的頭部?jī)?nèi)容

? ? ????????? private final K key;? ? // 消息的鍵

? ? ? ????????private final V value;? // 消息的值

? ? ????????? private volatile Long checksum;? ?

? ? ????????? // 省略若干方法

????????)

????????poll()方法返回的類(lèi)型是ConsumerRecords,它用來(lái)表示一次拉取操作所獲得的消息集项乒,內(nèi)部包含了ConsumerRecord啰劲。

2.2、位移提交

????????每次調(diào)用poll()的時(shí)候檀何,它返回的是還沒(méi)有被消費(fèi)的消息集蝇裤,要做到這一點(diǎn),就需要記錄上一次消費(fèi)時(shí)的消費(fèi)位移埃碱。而且猖辫,這個(gè)消費(fèi)位移必須要持久化保存酥泞,不能僅僅保存在內(nèi)存中砚殿,否則消費(fèi)者重啟之后就無(wú)法知曉之前的消費(fèi)位移。再比如芝囤,如果消費(fèi)者組中加入了新的消費(fèi)者似炎,再均衡之后,分區(qū)有可能會(huì)被分配給新的消費(fèi)者悯姊,如果不持久化保存羡藐,新的消費(fèi)者會(huì)不知道之前的消費(fèi)位移。

????????舊的消費(fèi)位移是存儲(chǔ)在zookeeper中悯许,而新的消費(fèi)者客戶(hù)端中仆嗦,是保存在Kafka自己的topic中,即_consumer_offsets先壕。

????????對(duì)于位移提交的時(shí)機(jī)非常有講究瘩扼,有可能會(huì)造成重復(fù)消費(fèi)或者消息丟失的現(xiàn)象。

????????比如:這次我拉取的是[x+1,x+5]的消息垃僚,拉取到消息之后集绰,就提交了位移。但是當(dāng)消費(fèi)到x+3的時(shí)候遇到了異常谆棺,在故障恢復(fù)之后栽燕,重現(xiàn)拉取消息是從x+6開(kāi)始的。數(shù)據(jù)發(fā)生了丟失。x+3到x+5之間的消息丟失了碍岔。

????????再比如:這次我拉取的是[x+1,x+5]的消息浴讯,位移提交動(dòng)作是消費(fèi)完所有信息之后才執(zhí)行的,那么當(dāng)消費(fèi)到x+3的時(shí)候發(fā)生了異常蔼啦。在故障恢復(fù)之后兰珍,我們重新拉取的消息是從x+1開(kāi)始的,那么x+1到x+2之間的消息又重新消費(fèi)了询吴。

????????實(shí)際情況可能更復(fù)雜掠河。

????????Kafka的默認(rèn)消費(fèi)位移提交方式是自動(dòng)提交,由消費(fèi)者客戶(hù)端參數(shù)enable.auto.commit配置猛计,默認(rèn)值為true唠摹。自動(dòng)提交是定期提交,定期的周期由參數(shù)auto.commit.interval.ms配置奉瘤,默認(rèn)值為5秒勾拉。默認(rèn)情況下,消費(fèi)者每隔5秒會(huì)將拉取到的每個(gè)分區(qū)中最大的消息位移進(jìn)行提交盗温。自動(dòng)提交的動(dòng)作是在poll()方法的邏輯中完成的藕赞。

????????自動(dòng)提交雖然簡(jiǎn)便,但隨之而來(lái)的是重復(fù)消費(fèi)和消息丟失的問(wèn)題卖局。雖然可以通過(guò)減少提交的時(shí)間間隔來(lái)減少丟失的窗口大小斧蜕,但是并不能避免重復(fù)消費(fèi)的發(fā)生,而且會(huì)使位移提交更加頻繁砚偶。

????????自動(dòng)提交重復(fù)消費(fèi)很容易理解批销。數(shù)據(jù)丟失的情況:比如有兩個(gè)線程,線程A不斷地拉取消息并存入本地緩存染坯。線程B從本地緩存中讀取消息并進(jìn)行響應(yīng)的邏輯處理均芽。假設(shè) 線程A拉取到的消息過(guò)多,線程B還沒(méi)來(lái)得及消費(fèi)完单鹿,此時(shí)掛掉了掀宋。等重新恢復(fù)過(guò)來(lái)之后,線程B還沒(méi)有小得到消息就丟失了仲锄。

????????除此之外劲妙,自動(dòng)提交還無(wú)法做到精準(zhǔn)的位移管理。

????????很多情況下昼窗,并不是拉取完信息就算消費(fèi)完成是趴,而是需要將信息寫(xiě)入數(shù)據(jù)庫(kù)、寫(xiě)入本地緩存等等澄惊。在這些場(chǎng)景下唆途,所有的業(yè)務(wù)都被完成了富雅,才能認(rèn)為消息被成功消費(fèi),手動(dòng)的提交方式可以讓開(kāi)發(fā)人員在合適的地方進(jìn)行位移提交肛搬。

????????手動(dòng)提交分為同步提交和異步提交没佑。

2.2.1、同步提交

????????pubulic void commitSync()

????????代碼示例:

????????while (isRunning.get()){

? ? ? ????????ConsumerRecords<String,String> records = consumer.poll(100)

? ? ? ????????for (ConsumerRecords<String,String> record:records){

? ? ? ? ? ? ????????// do some logical processing

? ? ????????? }

? ? ? ????????consumer.commitSync()

????????}

????????同步提交會(huì)阻塞消費(fèi)者線程直至提交完成温赔。這個(gè)示例依然有重復(fù)消費(fèi)的問(wèn)題蛤奢,如果業(yè)務(wù)邏輯處理完之后,在位移提交之前陶贼,程序奔潰了啤贩,那么待恢復(fù)之后又要從上一次位移提交的地方拉取消息。

????????pubulic void commitSync(final Map<TopicPartition,OffsetAndMetadata> offsets)

????????上述方法提供了offset參數(shù)拜秧,用來(lái)提交指定分區(qū)的位移痹屹。

????????實(shí)際應(yīng)用中,很少會(huì)每消費(fèi)一條消息就提交一次枉氮。因?yàn)楸旧韈ommitSync就是同步執(zhí)行的志衍,會(huì)消耗一定的性能。

2.2.2聊替、異步提交

????????異步提交的方式在執(zhí)行的時(shí)候消費(fèi)者線程不會(huì)被阻塞楼肪,可能在提交消費(fèi)位移的結(jié)果還未返回之前就開(kāi)始了新一次的拉取操作。

????????commitAsync()提交的時(shí)候同樣會(huì)有失敗的情況發(fā)生惹悄。重試春叫?不可取,比如這次提交的X偏移量失敗了俘侠,但是另一次X+Y成功了象缀,假如重試成功了,那么此時(shí)位移偏移量又重新回到了X爷速。如果此時(shí)發(fā)生再均衡,那么恢復(fù)之后又從X開(kāi)始消費(fèi)霞怀。

????????為此惫东,我們可以設(shè)置一個(gè)遞增的序號(hào)來(lái)維持異步提交的順序。在遇到位移提交失敗需要重試的時(shí)候毙石,可以檢查所提交的位移和序號(hào)的值的大小廉沮。如果小了,就不要重試了徐矩。

????????實(shí)際情況下滞时,位移提交失敗的情況很少發(fā)生,不重試也沒(méi)關(guān)系滤灯,后面的提交總有成功的坪稽。重試會(huì)增加代碼的復(fù)雜度曼玩,不重試會(huì)增加代碼重復(fù)消費(fèi)的幾率。

????????如果消費(fèi)者異常退出窒百,其實(shí)重復(fù)消費(fèi)的情況很難避免黍判,更多可能是從業(yè)務(wù)處理側(cè)糾正。如果消費(fèi)者正常退出或者發(fā)生再均衡時(shí)篙梢,可以在退出或再均衡執(zhí)行之情使用同步提交的方式做最后的把關(guān)顷帖。

????????思考如何盡量避免重復(fù)消費(fèi)和消息丟失?

2.3渤滞、指定位移提交

????????在Kafka中每當(dāng)消費(fèi)者查找不到所記錄的消費(fèi)位移時(shí)贬墩,就會(huì)根據(jù)消費(fèi)者客戶(hù)端參數(shù)auto.offset.reset的配置來(lái)決定從何處開(kāi)始消費(fèi)。默認(rèn)值時(shí)"latest"妄呕,從分區(qū)末尾開(kāi)始消費(fèi)消息震糖。還有個(gè)值"earliest",從開(kāi)頭開(kāi)始消費(fèi)趴腋。

????????消息拉取的poll()方法其實(shí)對(duì)于我們來(lái)說(shuō)是一個(gè)黑盒吊说,普通開(kāi)發(fā)人員無(wú)法精準(zhǔn)的掌握消費(fèi)起始位置。提供的auto.offset.reset參數(shù)只有在找不到位移或位移越界的情況下才會(huì)粗略地從末尾或者從頭開(kāi)始消費(fèi)优炬。

????????KafkaConsumer中的seek()方法提供了一種功能颁井,可以讓我們呢追前消費(fèi)或者回溯消費(fèi)。

????????public void seek (TopicPartition partition, long offset)

????????partition表示分區(qū)蠢护,offset表示從分區(qū)的哪個(gè)位置開(kāi)始消費(fèi)雅宾。

????????seek方法為我們提供了從特定位置讀取消息的能力,我們可以通過(guò)這個(gè)方法向前跳過(guò)若干消息葵硕,也可以通過(guò)這個(gè)方法回溯若干消息眉抬,為消費(fèi)消息提供了很大的靈活性。

2.4懈凹、再均衡

????????再均衡是指分區(qū)的所屬權(quán)從一個(gè)消費(fèi)者轉(zhuǎn)移到另一個(gè)消費(fèi)者的行為蜀变,它為消費(fèi)組具備高可用性和伸縮性提供保障,讓我們可以既方便又安全地刪除和增加消費(fèi)者介评。不過(guò)库北,再均衡期間,消費(fèi)者組內(nèi)的消費(fèi)者是無(wú)法讀取消息的们陆。換言之寒瓦,消費(fèi)組會(huì)變得不可用。

????????除此之外坪仇,當(dāng)一個(gè)分區(qū)被分配到另一個(gè)消費(fèi)者時(shí)杂腰,消費(fèi)者之前的狀態(tài)會(huì)丟失,可能會(huì)造成重復(fù)消費(fèi)椅文,因此要避免不必要的再均衡喂很。

2.5惜颇、多線程實(shí)現(xiàn)

????????KafkaProducer是線程安全的,但是KafkaConsumer是非線程安全的恤筛。

????????KafkaConsumer非線程安全并不意味著我們?cè)谙M(fèi)消息的時(shí)候只能以單線程的方式執(zhí)行官还。我們可以通過(guò)多線程的方式來(lái)實(shí)現(xiàn)消息消費(fèi),多線程的目的就是為了提高整體的消費(fèi)能力毒坛。

????????第一種方式:線程封閉望伦,即為每一個(gè)線程實(shí)例化一個(gè)KafkaConsumer對(duì)象,一個(gè)線程隊(duì)形一個(gè)KafkaConsumer實(shí)例煎殷,我們可以稱(chēng)之為消費(fèi)線程屯伞。一個(gè)線程可以消費(fèi)一個(gè)或多個(gè)分區(qū)內(nèi)的消息,所有的消費(fèi)線程都隸屬于一個(gè)消費(fèi)者組豪直。

????????第二種方法:多個(gè)線程同時(shí)對(duì)應(yīng)一個(gè)分區(qū)劣摇,可以通過(guò)assign()、seek()方法實(shí)現(xiàn)弓乙,這樣可以打破原有的消費(fèi)線程的個(gè)數(shù)不能超過(guò)分區(qū)數(shù)的限制末融,進(jìn)一步提供消費(fèi)能力。不過(guò)這種方式非常復(fù)雜暇韧,實(shí)際很少用到勾习。

????????第三種方法:一般而言,poll()拉取消息的速度是相當(dāng)快的懈玻,而整體消費(fèi)瓶頸其實(shí)是現(xiàn)在處理消息這一塊巧婶。我們可以將處理消息模塊分為多線程處理。缺點(diǎn)就是對(duì)于消息的順序處理就比較困難了涂乌。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末艺栈,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子湾盒,更是在濱河造成了極大的恐慌湿右,老刑警劉巖,帶你破解...
    沈念sama閱讀 222,183評(píng)論 6 516
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件历涝,死亡現(xiàn)場(chǎng)離奇詭異诅需,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)荧库,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,850評(píng)論 3 399
  • 文/潘曉璐 我一進(jìn)店門(mén),熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)赵刑,“玉大人分衫,你說(shuō)我怎么就攤上這事“愦耍” “怎么了蚪战?”我有些...
    開(kāi)封第一講書(shū)人閱讀 168,766評(píng)論 0 361
  • 文/不壞的土叔 我叫張陵牵现,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我邀桑,道長(zhǎng)瞎疼,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 59,854評(píng)論 1 299
  • 正文 為了忘掉前任壁畸,我火速辦了婚禮贼急,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘捏萍。我一直安慰自己太抓,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,871評(píng)論 6 398
  • 文/花漫 我一把揭開(kāi)白布令杈。 她就那樣靜靜地躺著走敌,像睡著了一般。 火紅的嫁衣襯著肌膚如雪逗噩。 梳的紋絲不亂的頭發(fā)上掉丽,一...
    開(kāi)封第一講書(shū)人閱讀 52,457評(píng)論 1 311
  • 那天,我揣著相機(jī)與錄音异雁,去河邊找鬼捶障。 笑死,一個(gè)胖子當(dāng)著我的面吹牛片迅,可吹牛的內(nèi)容都是我干的残邀。 我是一名探鬼主播,決...
    沈念sama閱讀 40,999評(píng)論 3 422
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼柑蛇,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼芥挣!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起耻台,我...
    開(kāi)封第一講書(shū)人閱讀 39,914評(píng)論 0 277
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤空免,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后盆耽,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體蹋砚,經(jīng)...
    沈念sama閱讀 46,465評(píng)論 1 319
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,543評(píng)論 3 342
  • 正文 我和宋清朗相戀三年摄杂,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了坝咐。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,675評(píng)論 1 353
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡析恢,死狀恐怖墨坚,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情映挂,我是刑警寧澤泽篮,帶...
    沈念sama閱讀 36,354評(píng)論 5 351
  • 正文 年R本政府宣布盗尸,位于F島的核電站,受9級(jí)特大地震影響帽撑,放射性物質(zhì)發(fā)生泄漏泼各。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 42,029評(píng)論 3 335
  • 文/蒙蒙 一亏拉、第九天 我趴在偏房一處隱蔽的房頂上張望扣蜻。 院中可真熱鬧,春花似錦专筷、人聲如沸弱贼。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 32,514評(píng)論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)吮旅。三九已至,卻和暖如春味咳,著一層夾襖步出監(jiān)牢的瞬間庇勃,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,616評(píng)論 1 274
  • 我被黑心中介騙來(lái)泰國(guó)打工槽驶, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留责嚷,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 49,091評(píng)論 3 378
  • 正文 我出身青樓掂铐,卻偏偏與公主長(zhǎng)得像罕拂,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子全陨,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,685評(píng)論 2 360

推薦閱讀更多精彩內(nèi)容

  • 消費(fèi)者網(wǎng)絡(luò)客戶(hù)端輪詢(xún):ConsumerNetworkClient爆班。ConsumerNetworkClient是對(duì)N...
    紹圣閱讀 775評(píng)論 0 0
  • 此篇開(kāi)始進(jìn)入kafka的另外一側(cè):消費(fèi)者。kafka中的消費(fèi)者比生產(chǎn)者要復(fù)雜的多辱姨,里面涉及到的消費(fèi)組柿菩,偏移量等概念...
    紹圣閱讀 1,925評(píng)論 0 0
  • 1、消費(fèi)者組 消費(fèi)者(Consumer)負(fù)責(zé)訂閱 Kafka 中的主題( Topic)雨涛,并且從訂閱的主題上拉取消息...
    冰河winner閱讀 1,698評(píng)論 0 6
  • 消費(fèi)者能發(fā)送拉取請(qǐng)求的前提條件是:1枢舶,消費(fèi)者已經(jīng)連接上了服務(wù)端協(xié)調(diào)者所在的節(jié)點(diǎn);2替久,消費(fèi)者必須獲取到服務(wù)端協(xié)調(diào)者分...
    紹圣閱讀 685評(píng)論 0 0
  • 前言 讀完本文凉泄,你將了解到如下知識(shí)點(diǎn): kafka 的消費(fèi)者 和 消費(fèi)者組 如何正確使用kafka consume...
    zwb_jianshu閱讀 861評(píng)論 0 0