總結kafka的consumer消費能力很低的情況下的處理方案

簡介

由于項目中需要使用kafka作為消息隊列斜纪,并且項目是基于spring-boot來進行構建的瓶堕,所以項目采用了spring-kafka作為原生kafka的一個擴展庫進行使用掠河。先說明一下版本:

  • spring-boot 的版本是1.4.0.RELEASE
  • kafka 的版本是0.9.0.x 版本
  • spring-kafka 的版本是1.0.3.RELEASE

用過kafka的人都知道,對于使用kafka來說,producer的使用相對簡單一些奏甫,只需要把數(shù)據(jù)按照指定的格式發(fā)送給kafka中某一個topic就可以了尖飞。本文主要是針對spring-kafka的consumer端上的使用進行簡單一些分析和總結症副。

kafka的速度是很快店雅,所以一般來說producer的生產(chǎn)消息的邏輯速度都會比consumer的消費消息的邏輯速度快。

具體案例

之前在項目中遇到了一個案例是贞铣,consumer消費一條數(shù)據(jù)平均需要200ms的時間闹啦,并且在某個時刻,producer會在短時間內(nèi)產(chǎn)生大量的數(shù)據(jù)丟進kafka的broker里面(假設平均1s中內(nèi)丟入了5w條需要消費的消息辕坝,這個情況會持續(xù)幾分鐘)窍奋。

對于這種情況,kafka的consumer的行為會是:

  • kafka的consumer會從broker里面取出一批數(shù)據(jù)酱畅,?給消費線程進行消費琳袄。
  • 由于取出的一批消息數(shù)量太大,consumer在session.timeout.ms時間之內(nèi)沒有消費完成
  • consumer coordinator 會由于沒有接受到心跳而掛掉纺酸,并且出現(xiàn)一些日志
[rhllor] Tue Oct 18 21:39:16 CST 2016 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1] org.apache.kafka.clients.consumer.internals.AbstractCoordinator coordinatorDead 529: kafka-example|NTI|Marking the coordinator 2147483646 dead.
[rhllor] Tue Oct 18 21:39:16 CST 2016 DEBUG [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1] org.apache.kafka.clients.consumer.internals.AbstractCoordinator sendGroupMetadataRequest 465: kafka-example|NTI|Issuing group metadata request to broker 1
[rhllor] Tue Oct 18 21:39:16 CST 2016 ERROR [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1] org.apache.kafka.clients.consumer.internals.ConsumerCoordinator handle 550: kafka-example|NTI|Error ILLEGAL_GENERATION occurred while committing offsets for group new-message-1
[rhllor] Tue Oct 18 21:39:16 CST 2016 WARN [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1] org.apache.kafka.clients.consumer.internals.ConsumerCoordinator onComplete 424: kafka-example|NTI|Auto offset commit failed: Commit cannot be completed due to group rebalance
[rhllor] Tue Oct 18 21:39:16 CST 2016 DEBUG [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1] org.apache.kafka.clients.consumer.internals.ConsumerCoordinator run 408: kafka-example|NTI|Cannot auto-commit offsets now since the coordinator is unknown, will retry after backoff
[rhllor] Tue Oct 18 21:39:17 CST 2016 DEBUG [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1] org.apache.kafka.clients.consumer.internals.AbstractCoordinator handleGroupMetadataResponse 478: kafka-example|NTI|Group metadata response ClientResponse(receivedTimeMs=1476797957072, disconnected=false, request=ClientRequest(expectResponse=true, callback=org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler@1d3d7e6, request=RequestSend(header={api_key=10,api_version=0,correlation_id=20,client_id=consumer-1}, body={group_id=new-message-1}), createdTimeMs=1476797956485, sendTimeMs=1476797956485), responseBody={error_code=0,coordinator={node_id=1,host=10.10.44.124,port=9092}})
[rhllor] Tue Oct 18 21:39:17 CST 2016 ERROR [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1] org.apache.kafka.clients.consumer.internals.ConsumerCoordinator handle 550: kafka-example|NTI|Error ILLEGAL_GENERATION occurred while committing offsets for group new-message-1
[rhllor] Tue Oct 18 21:39:17 CST 2016 WARN [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1] org.apache.kafka.clients.consumer.internals.ConsumerCoordinator maybeAutoCommitOffsetsSync 445: kafka-example|NTI|Auto offset commit failed: 
[rhllor] Tue Oct 18 21:39:17 CST 2016 DEBUG [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1] org.apache.kafka.clients.consumer.internals.ConsumerCoordinator onJoinPrepare 247: kafka-example|NTI|Revoking previously assigned partitions [rhllor-log-0, rhllor-log-1, rhllor-log-2]
[rhllor] Tue Oct 18 21:39:17 CST 2016 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1] org.springframework.kafka.listener.KafkaMessageListenerContainer onPartitionsRevoked 244: kafka-example|NTI|partitions revoked:[rhllor-log-0, rhllor-log-1, rhllor-log-2]
[rhllor] Tue Oct 18 21:39:17 CST 2016 DEBUG [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1] org.apache.kafka.clients.consumer.internals.AbstractCoordinator performGroupJoin 309: kafka-example|NTI|(Re-)joining group new-message-1
[rhllor] Tue Oct 18 21:39:17 CST 2016 DEBUG [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1] org.apache.kafka.clients.consumer.internals.AbstractCoordinator performGroupJoin 318: kafka-example|NTI|Issuing request (JOIN_GROUP: {group_id=new-message-1,session_timeout=15000,member_id=consumer-1-64063d04-9d4e-45af-a927-17ccf31c6ec1,protocol_type=consumer,group_protocols=[{protocol_name=range,protocol_metadata=java.nio.HeapByteBuffer[pos=0 lim=22 cap=22]}]}) to coordinator 2147483646
[rhllor] Tue Oct 18 21:39:17 CST 2016 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1] org.apache.kafka.clients.consumer.internals.AbstractCoordinator handle 354: kafka-example|NTI|Attempt to join group new-message-1 failed due to unknown member id, resetting and retrying.
[rhllor] Tue Oct 18 21:39:17 CST 2016 DEBUG [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1] org.apache.kafka.clients.consumer.internals.AbstractCoordinator performGroupJoin 309: kafka-example|NTI|(Re-)joining group new-message-1
[rhllor] Tue Oct 18 21:39:17 CST 2016 DEBUG [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1] org.apache.kafka.clients.consumer.internals.AbstractCoordinator performGroupJoin 318: kafka-example|NTI|Issuing request (JOIN_GROUP: {group_id=new-message-1,session_timeout=15000,member_id=,protocol_type=consumer,group_protocols=[{protocol_name=range,protocol_metadata=java.nio.HeapByteBuffer[pos=0 lim=22 cap=22]}]}) to coordinator 2147483646
[rhllor] Tue Oct 18 21:39:17 CST 2016 DEBUG [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1] org.apache.kafka.clients.consumer.internals.AbstractCoordinator handle 336: kafka-example|NTI|Joined group: {error_code=0,generation_id=1,group_protocol=range,leader_id=consumer-1-d3f30611-5788-4b81-bf0d-e779a11093d2,member_id=consumer-1-d3f30611-5788-4b81-bf0d-e779a11093d2,members=[{member_id=consumer-1-d3f30611-5788-4b81-bf0d-e779a11093d2,member_metadata=java.nio.HeapByteBuffer[pos=0 lim=22 cap=22]}]}
[rhllor] Tue Oct 18 21:39:17 CST 2016 DEBUG [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1] org.apache.kafka.clients.consumer.internals.ConsumerCoordinator performAssignment 225: kafka-example|NTI|Performing range assignment for subscriptions {consumer-1-d3f30611-5788-4b81-bf0d-e779a11093d2=org.apache.kafka.clients.consumer.internals.PartitionAssignor$Subscription@1dbca7d4}
[rhllor] Tue Oct 18 21:39:17 CST 2016 DEBUG [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1] org.apache.kafka.clients.consumer.internals.ConsumerCoordinator performAssignment 229: kafka-example|NTI|Finished assignment: {consumer-1-d3f30611-5788-4b81-bf0d-e779a11093d2=org.apache.kafka.clients.consumer.internals.PartitionAssignor$Assignment@4826f394}
[rhllor] Tue Oct 18 21:39:17 CST 2016 DEBUG [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1] org.apache.kafka.clients.consumer.internals.AbstractCoordinator onJoinLeader 397: kafka-example|NTI|Issuing leader SyncGroup (SYNC_GROUP: {group_id=new-message-1,generation_id=1,member_id=consumer-1-d3f30611-5788-4b81-bf0d-e779a11093d2,group_assignment=[{member_id=consumer-1-d3f30611-5788-4b81-bf0d-e779a11093d2,member_assignment=java.nio.HeapByteBuffer[pos=0 lim=38 cap=38]}]}) to coordinator 2147483646
[rhllor] Tue Oct 18 21:39:17 CST 2016 DEBUG [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1] org.apache.kafka.clients.consumer.internals.AbstractCoordinator handle 423: kafka-example|NTI|Received successful sync group response for group new-message-1: {error_code=0,member_assignment=java.nio.HeapByteBuffer[pos=0 lim=38 cap=38]}
[rhllor] Tue Oct 18 21:39:17 CST 2016 DEBUG [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1] org.apache.kafka.clients.consumer.internals.ConsumerCoordinator onJoinComplete 191: kafka-example|NTI|Setting newly assigned partitions [rhllor-log-0, rhllor-log-1, rhllor-log-2]
[rhllor] Tue Oct 18 21:39:17 CST 2016 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1] org.springframework.kafka.listener.KafkaMessageListenerContainer onPartitionsAssigned 249: kafka-example|NTI|partitions assigned:[rhllor-log-0, rhllor-log-1, rhllor-log-2]
[rhllor] Tue Oct 18 21:39:17 CST 2016 DEBUG [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1] org.apache.kafka.clients.consumer.internals.ConsumerCoordinator sendOffsetFetchRequest 581: kafka-example|NTI|Fetching committed offsets for partitions: [rhllor-log-0, rhllor-log-1, rhllor-log-2]

日志的意思大概是coordinator掛掉了窖逗,然后自動提交offset失敗,然后重新分配partition給客戶端

  • 由于自動提交offset失敗餐蔬,導致重新分配了partition的客戶端又重新消費之前的一批數(shù)據(jù)
  • 接著consumer重新消費碎紊,又出現(xiàn)了消費超時,無限循環(huán)下去用含。

解決方案

遇到了這個問題之后矮慕, 我們做了一些步驟:

  • 提高了partition的數(shù)量,從而提高了consumer的并行能力啄骇,從而提高數(shù)據(jù)的消費能力
  • ?對于單partition的消費線程痴鳄,增加了一個固定長度的阻塞隊列和工作線程池進一步提高并行消費的能力
  • ?由于使用了spring-kafka,則把kafka-client的enable.auto.commit設置成了false缸夹,表示禁止kafka-client自動提交offset痪寻,因為就是之前的自動提交失敗,導致offset永遠沒更新虽惭,從而轉(zhuǎn)向使用spring-kafka的offset提交機制橡类。并且spring-kafka提供了多種提交策略:
/**
     * The ack mode to use when auto ack (in the configuration properties) is false.
     * <ul>
     * <li>RECORD: Ack after each record has been passed to the listener.</li>
     * <li>BATCH: Ack after each batch of records received from the consumer has been
     * passed to the listener</li>
     * <li>TIME: Ack after this number of milliseconds; (should be greater than
     * {@code #setPollTimeout(long) pollTimeout}.</li>
     * <li>COUNT: Ack after at least this number of records have been received</li>
     * <li>MANUAL: Listener is responsible for acking - use a
     * {@link AcknowledgingMessageListener}.
     * </ul>
     */
    private AbstractMessageListenerContainer.AckMode ackMode = AckMode.BATCH;

這些策略保證了在一批消息沒有完成消費的情況下,也能提交offset芽唇,從而避免了完全提交不上而導致永遠重復消費的問題顾画。

分析

那么問題來了,為什么spring-kafka的提交offset的策略能夠解決spring-kafka的auto-commit的帶來的重復消費的問題呢匆笤?下面通過分析spring-kafka的關鍵源碼來解析這個問題研侣。

  • 首先來看看spring-kafka的消費線程邏輯
if (isRunning() && this.definedPartitions != null) { 
      initPartitionsIfNeeded();      
 // we start the invoker here as there will be no rebalance calls to       
// trigger it, but only if the container is not set to autocommit       
// otherwise we will process records on a separate thread      
     if (!this.autoCommit) {        
            startInvoker();     
     }
 }

上面可以看到,如果auto.commit關掉的話炮捧,spring-kafka會啟動一個invoker庶诡,這個invoker的目的就是啟動一個線程去消費數(shù)據(jù),他消費的數(shù)據(jù)不是直接從kafka里面直接取的咆课,那么他消費的數(shù)據(jù)從哪里來呢末誓?他是從一個spring-kafka自己創(chuàng)建的阻塞隊列里面取的扯俱。

  • 然后會進入一個循環(huán),從源代碼中可以看到如果auto.commit被關掉的話喇澡, 他會先把之前處理過的數(shù)據(jù)先進行提交offset迅栅,然后再去從kafka里面取數(shù)據(jù)。

  • 然后把取到的數(shù)據(jù)丟給上面提到的阻塞列隊晴玖,由上面創(chuàng)建的線程去消費库继,并且如果阻塞隊列滿了導致取到的數(shù)據(jù)塞不進去的話,spring-kafka會調(diào)用kafka的pause方法窜醉,則consumer會停止從kafka里面繼續(xù)再拿數(shù)據(jù)宪萄。

  • 接著spring-kafka還會處理一些異常的情況,比如失敗之后是不是需要commit offset這樣的邏輯榨惰。

最后

  • spring-kafka是一個很好的用來操作kafka的庫拜英,并且可以和spring進行完美結合。
  • spring-kafka提供了一些kafka使用上功能的擴展琅催。
  • 相比于使用原生的kafka-client的api的話居凶,使用更加簡單,需要編寫的碼量更少藤抡。
  • 最好能夠使用最新的kafka(0.10.0)和spring-kafka(1.1.1.RELEASE)的版本
最后編輯于
?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末侠碧,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子缠黍,更是在濱河造成了極大的恐慌弄兜,老刑警劉巖,帶你破解...
    沈念sama閱讀 206,482評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件瓷式,死亡現(xiàn)場離奇詭異替饿,居然都是意外死亡,警方通過查閱死者的電腦和手機贸典,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,377評論 2 382
  • 文/潘曉璐 我一進店門视卢,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人廊驼,你說我怎么就攤上這事据过。” “怎么了妒挎?”我有些...
    開封第一講書人閱讀 152,762評論 0 342
  • 文/不壞的土叔 我叫張陵绳锅,是天一觀的道長。 經(jīng)常有香客問我饥漫,道長榨呆,這世上最難降的妖魔是什么罗标? 我笑而不...
    開封第一講書人閱讀 55,273評論 1 279
  • 正文 為了忘掉前任庸队,我火速辦了婚禮积蜻,結果婚禮上,老公的妹妹穿的比我還像新娘彻消。我一直安慰自己竿拆,他們只是感情好,可當我...
    茶點故事閱讀 64,289評論 5 373
  • 文/花漫 我一把揭開白布宾尚。 她就那樣靜靜地躺著丙笋,像睡著了一般。 火紅的嫁衣襯著肌膚如雪煌贴。 梳的紋絲不亂的頭發(fā)上御板,一...
    開封第一講書人閱讀 49,046評論 1 285
  • 那天,我揣著相機與錄音牛郑,去河邊找鬼怠肋。 笑死,一個胖子當著我的面吹牛淹朋,可吹牛的內(nèi)容都是我干的笙各。 我是一名探鬼主播,決...
    沈念sama閱讀 38,351評論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼础芍,長吁一口氣:“原來是場噩夢啊……” “哼杈抢!你這毒婦竟也來了?” 一聲冷哼從身側響起仑性,我...
    開封第一講書人閱讀 36,988評論 0 259
  • 序言:老撾萬榮一對情侶失蹤惶楼,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后诊杆,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體鲫懒,經(jīng)...
    沈念sama閱讀 43,476評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 35,948評論 2 324
  • 正文 我和宋清朗相戀三年刽辙,在試婚紗的時候發(fā)現(xiàn)自己被綠了窥岩。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,064評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡宰缤,死狀恐怖颂翼,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情慨灭,我是刑警寧澤朦乏,帶...
    沈念sama閱讀 33,712評論 4 323
  • 正文 年R本政府宣布,位于F島的核電站氧骤,受9級特大地震影響呻疹,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜筹陵,卻給世界環(huán)境...
    茶點故事閱讀 39,261評論 3 307
  • 文/蒙蒙 一刽锤、第九天 我趴在偏房一處隱蔽的房頂上張望镊尺。 院中可真熱鬧,春花似錦并思、人聲如沸庐氮。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,264評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽弄砍。三九已至,卻和暖如春输涕,著一層夾襖步出監(jiān)牢的瞬間音婶,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,486評論 1 262
  • 我被黑心中介騙來泰國打工莱坎, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留桃熄,地道東北人。 一個月前我還...
    沈念sama閱讀 45,511評論 2 354
  • 正文 我出身青樓型奥,卻偏偏與公主長得像瞳收,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子厢汹,可洞房花燭夜當晚...
    茶點故事閱讀 42,802評論 2 345

推薦閱讀更多精彩內(nèi)容