優(yōu)雅的使用Kafka Consumer

如何消費數(shù)據(jù)

我們已經(jīng)知道了如何發(fā)送數(shù)據(jù)到Kafka,既然有數(shù)據(jù)發(fā)送,那么肯定就有數(shù)據(jù)消費,消費者也是Kafka整個體系中不可缺少的一環(huán)

public class KafkaConsumerDemo {
public static void main(String[] args) throws InterruptedException {
    Properties props = new Properties();

    // 必須設置的屬性
    props.put("bootstrap.servers", "192.168.239.131:9092");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("group.id", "group1");
    
    // 可選設置屬性
    props.put("enable.auto.commit", "true");
    // 自動提交offset,每1s提交一次
    props.put("auto.commit.interval.ms", "1000");
    props.put("auto.offset.reset","earliest ");
    props.put("client.id", "zy_client_id");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    // 訂閱test1 topic
    consumer.subscribe(Collections.singletonList("test1"));

    while(true) {
        //  從服務器開始拉取數(shù)據(jù)
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        records.forEach(record -> {
            System.out.printf("topic = %s ,partition = %d,offset = %d, key = %s, value = %s%n", record.topic(), record.partition(),
                    record.offset(), record.key(), record.value());
        });
    }
  }
}

push 還是 pull

Kafka Consumer采用的是主動拉取broker數(shù)據(jù)進行消費的森瘪。一般消息中間件存在推送(server推送數(shù)據(jù)給consumer)和拉取(consumer主動取服務器取數(shù)據(jù))兩種方式铣耘,這兩種方式各有優(yōu)劣。

如果是選擇推送的方式最大的阻礙就是服務器不清楚consumer的消費速度攘宙,如果consumer中執(zhí)行的操作又是比較耗時的,那么consumer可能會不堪重負,甚至會導致系統(tǒng)掛掉唉堪。

而采用拉取的方式則可以解決這種情況模聋,consumer根據(jù)自己的狀態(tài)來拉取數(shù)據(jù),可以對服務器的數(shù)據(jù)進行延遲處理。但是這種方式也有一個劣勢就是服務器沒有數(shù)據(jù)的時候可能會一直輪詢唠亚,不過還好Kafka在poll()有參數(shù)允許消費者請求在“長輪詢”中阻塞链方,等待數(shù)據(jù)到達(并且可選地等待直到給定數(shù)量的字節(jié)可用以確保傳輸大小)。

必須屬性

上面代碼中消費者必須的屬性有4個,這里著重說一下group.id這個屬性,kafka Consumer和Producer不一樣,Consummer中有一個Consumer group(消費組)灶搜,由它來決定同一個Consumer group中的消費者具體拉取哪個partition的數(shù)據(jù),所以這里必須指定group.id屬性祟蚀。

  1. bootstrap.servers
    連接Kafka集群的地址,多個地址以逗號分隔
  2. key.deserializer
    消息中key反序列化類,需要和Producer中key序列化類相對應
  3. value.deserializer
    消息中value的反序列化類,需要和Producer中Value序列化類相對應
  4. group.id
    消費者所屬消費組的唯一標識

訂閱/取消主題

  1. 使用subscribe()方法訂閱主題
  2. 使用assign()方法訂閱確定主題和分區(qū)
List<PartitionInfo> partitionInfoList = consumer.partitionsFor("topic1");
if(null != partitionInfoList) {
  for(PartitionInfo partitionInfo : partitionInfoList) {
      consumer.assign(Collections.singletonList(
        new TopicPartition(partitionInfo.topic(), partitionInfo.partition())));
  }
}

通過subscribe()方法訂閱主題具有消費者自動再均衡(reblance)的功能割卖,存在多個消費者的情況下可以根據(jù)分區(qū)分配策略來自動分配各個消費者與分區(qū)的關系前酿。當組內(nèi)的消費者增加或者減少時,分區(qū)關系會自動調(diào)整鹏溯。實現(xiàn)消費負載均衡以及故障自動轉移罢维。使用assign()方法訂閱則不具有該功能。

  1. 取消主題
consumer.unsubscribe();
consumer.subscribe(new ArrayList<>());
consumer.assign(new ArrayList<TopicPartition>());

上面的三行代碼作用相同丙挽,都是取消訂閱肺孵,其中unsubscribe()方法即可以取消通過subscribe()方式實現(xiàn)的訂閱,還可以取消通過assign()方式實現(xiàn)的訂閱颜阐。

如何更好的消費數(shù)據(jù)

開頭處的代碼展示了我們是如何消費數(shù)據(jù)的,但是代碼未免過于簡單,我們測試的時候這樣寫沒有問題,但是實際開發(fā)過程中我們并不會這樣寫平窘,我們會選擇更加高效的方式,這里提供兩種方式供大家參考。

  1. 一個Consumer group,多個consumer,數(shù)量小于等于partition的數(shù)量
Kafka_multi_consumer.png
  1. 一個consumer,多線程處理事件
Kafka_multi_event_handler.png

第一種方式每個consumer都要維護一個獨立的TCP連接凳怨,如果分區(qū)數(shù)和創(chuàng)建consumer線程的數(shù)量過多瑰艘,會造成不小系統(tǒng)開銷。但是如果處理消息足夠快速肤舞,消費性能也會提升,如果慢的話就會導致消費性能降低紫新。

第二種方式是采用一個consumer,多個消息處理線程來處理消息李剖,其實在生產(chǎn)中芒率,瓶頸一般是集中在消息處理上的(可能會插入數(shù)據(jù)到數(shù)據(jù)庫,或者請求第三方API)杖爽,所以我們采用多個線程來處理這些消息敲董。

當然可以結合第一二種方式紫皇,采用多consumer+多個消息處理線程來消費Kafka中的數(shù)據(jù),核心代碼如下:

for (int i = 0; i < consumerNum; i++) {

  //根據(jù)屬性創(chuàng)建Consumer
  final Consumer<String, byte[]> consumer = consumerFactory.getConsumer(getServers(), groupId);
  consumerList.add(consumer);

  //訂閱主題
  consumer.subscribe(Arrays.asList(this.getTopic()));

  //consumer.poll()拉取數(shù)據(jù)
  BufferedConsumerRecords bufferedConsumerRecords = new BufferedConsumerRecords(consumer);

  getExecutor().scheduleWithFixedDelay(() -> {
      long startTime = System.currentTimeMillis();

      //進行消息處理
      consumeEvents(bufferedConsumerRecords);

      long sleepTime = intervalMillis - (System.currentTimeMillis() - startTime);
      if (sleepTime > 0) {
        Thread.sleep(sleepTime);
      }
  }, 0, 1000, TimeUnit.MILLISECONDS);
}

不過這種方式不能順序處理數(shù)據(jù),如果你的業(yè)務是順序處理腋寨,那么第一種方式可能更適合你聪铺。所以實際生產(chǎn)中請根據(jù)業(yè)務選擇最適合自己的方式。

消費數(shù)據(jù)考慮哪些問題?

在Kafka中無論是producer往topic中寫數(shù)據(jù),還是consumer從topic中讀數(shù)據(jù),都避免不了和offset打交道,關于offset主要有以下幾個概念萄窜。

kafka-partition-offset.png
  • Last Committed Offset:consumer group最新一次 commit 的 offset铃剔,表示這個 group 已經(jīng)把 Last Committed Offset 之前的數(shù)據(jù)都消費成功了。
  • Current Position:consumer group 當前消費數(shù)據(jù)的 offset查刻,也就是說键兜,Last Committed Offset 到 Current Position 之間的數(shù)據(jù)已經(jīng)拉取成功,可能正在處理穗泵,但是還未 commit普气。
  • Log End Offset(LEO):記錄底層日志(log)中的下一條消息的 offset。,對producer來說佃延,就是即將插入下一條消息的offset现诀。
  • High Watermark(HW):已經(jīng)成功備份到其他 replicas 中的最新一條數(shù)據(jù)的 offset,也就是說 Log End Offset 與 High Watermark 之間的數(shù)據(jù)已經(jīng)寫入到該 partition 的 leader 中履肃,但是還未完全備份到其他的 replicas 中仔沿,consumer是無法消費這部分消息(未提交消息)。

每個Kafka副本對象都有兩個重要的屬性:LEO和HW尺棋。注意是所有的副本封锉,而不只是leader副本。關于這兩者更詳細解釋膘螟,建議參考這篇文章成福。

對于消費者而言,我們更多時候關注的是消費完成之后如何和服務器進行消費確認萍鲸,告訴服務器這部分數(shù)據(jù)我已經(jīng)消費過了闷叉。

這里就涉及到了2個offset擦俐,一個是current position,一個是處理完畢向服務器確認的committed offset脊阴。顯然,異步模式下committed offset是落后于current position的。如果consumer掛掉了,那么下一次消費數(shù)據(jù)又只會從committed offset的位置拉取數(shù)據(jù)蚯瞧,就會導致數(shù)據(jù)被重復消費嘿期。

提交策略如何選擇

Kafka提供了3種提交offset的方式

  1. 自動提交
// 自動提交,默認true
props.put("enable.auto.commit", "true");
// 設置自動每1s提交一次
props.put("auto.commit.interval.ms", "1000");

  1. 手動同步提交offset
consumer.commitSync();
  1. 手動異步提交offset
consumer.commitAsync();

上面說了既然異步提交offset可能會重復消費,那么我使用同步提交是否就可以表明這個問題呢?我只能說too young,too sample埋合。

while(true) {
  ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
  records.forEach(record -> {
      insertIntoDB(record);
      consumer.commitSync();
  });
}

很明顯不行,因為insertIntoDB和commitSync()做不到原子操作,如果insertIntoDB()成功了备徐,但是提交offset的時候consumer掛掉了,然后服務器重啟甚颂,仍然會導致重復消費問題蜜猾。

是否需要做到不重復消費秀菱?

只要保證處理消息和提交offset得操作是原子操作,就可以做到不重復消費蹭睡。我們可以自己管理committed offset,而不讓kafka來進行管理衍菱。

比如如下使用方式:

  1. 如果消費的數(shù)據(jù)剛好需要存儲在數(shù)據(jù)庫,那么可以把offset也存在數(shù)據(jù)庫肩豁,就可以就可以在一個事物中提交這兩個結果脊串,保證原子操作。
  2. 借助搜索引擎清钥,把offset和數(shù)據(jù)一起放到索引里面琼锋,比如Elasticsearch

每條記錄都有自己的offset,所以如果要管理自己的offset還得要做下面事情

  1. 設置enable.auto.commit=false
  2. 使用每個ConsumerRecord提供的offset來保存消費的位置。
  3. 在重新啟動時使用seek(TopicPartition, long)恢復上次消費的位置祟昭。

通過上面的方式就可以在消費端實現(xiàn)"Exactly Once"的語義,即保證只消費一次缕坎。但是是否真的需要保證不重復消費呢?這個得看具體業(yè)務,重復消費數(shù)據(jù)對整體有什么影響在來決定是否需要做到不重復消費篡悟。

再均衡(reblance)怎么辦念赶?

再均衡是指分區(qū)的所屬權從一個消費者轉移到另一個消費者的行為,再均衡期間恰力,消費組內(nèi)的消費組無法讀取消息叉谜。為了更精確的控制消息的消費,我們可以再訂閱主題的時候踩萎,通過指定監(jiān)聽器的方式來設定發(fā)生再均衡動作前后的一些準備或者收尾的動作停局。

consumer.subscribe(Collections.singletonList("test3"), new ConsumerRebalanceListener() {
  @Override
  public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
      //再均衡之前和消費者停止讀取消息之后被調(diào)用
  }

  @Override
  public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
      //重新分配分區(qū)之后和消費者開始消費之前被調(diào)用
  }
});

具體如何做得根據(jù)具體的業(yè)務邏輯來實現(xiàn),如果消息比較重要,你可以在再均衡的時候處理offset,如果不夠重要香府,你可以什么都不做董栽。

無法消費的數(shù)據(jù)怎么辦?

可能由于你的業(yè)務邏輯有些數(shù)據(jù)沒法消費這個時候怎么辦?同樣的還是的看你認為這個數(shù)據(jù)有多重要或者多不重要企孩,如果重要可以記錄日志,把它存入文件或者數(shù)據(jù)庫锭碳,以便于稍候進行重試或者定向分析。如果不重要就當做什么事情都沒有發(fā)生好了勿璃。

實際開發(fā)中我的處理方式

我開發(fā)的項目中,用到kafka的其中一個地方是消息通知(誰給你發(fā)了消息,點贊,評論等),大概的流程就是用戶在client端做了某些操作擒抛,就會發(fā)送數(shù)據(jù)到kafka,然后把這些數(shù)據(jù)進行一定的處理之后插入到HBase中。

其中采用了 N consumer thread + N Event Handler的方式來消費數(shù)據(jù),并采用自動提交offset补疑。對于無法消費的數(shù)據(jù)往往只是簡單處理下歧沪,打印下日志以及消息體(無法消費的情況非常非常少)。

得益于HBase的多version控制,即使是重復消費了數(shù)據(jù)也無關緊要莲组。這樣做沒有去避免重復消費的問題主要是基于以下幾點考慮

  1. 重復消費的概率較低诊胞,服務器整體性能穩(wěn)定
  2. 即便是重復消費了數(shù)據(jù),入庫了HBase,獲取數(shù)據(jù)也是只有一條,不影響結果的正確性
  3. 有更高的吞吐量
  4. 編程簡單,不用單獨去處理以及保存offset

幾個重要的消費者參數(shù)

  • fetch.min.bytes

    配置poll()拉取請求過程種能從Kafka拉取的最小數(shù)據(jù)量锹杈,如果可用數(shù)據(jù)量小于它指定的大小會等到有足夠可用數(shù)據(jù)時才會返回給消費者撵孤,其默認值時1B

  • fetch.max.wait.ms

    和fetch.min.bytes有關,用于指定Kafka的等待時間迈着,默認時間500ms。如果fetch.min.bytes設置為1MB,fetch.max.wait.ms設置為100ms,Kafka收到消費者請求后,要么返回1MB數(shù)據(jù),要么在100ms后返回所有可用數(shù)據(jù),就看哪個提交得到滿足邪码。

  • max.poll.records

    用于控制單次調(diào)用poll()能返回的最大記錄數(shù)量寥假,默認為500條數(shù)據(jù)

  • partition.assignment.stragety

    分區(qū)會被分配給群組的消費者,這個參數(shù)用于指定分區(qū)分配策略。默認是RangeAssignore,可選的還有RoundRobinAssignor霞扬。同樣它還支持自定義

其他更多參數(shù)請參考官方文檔糕韧。

?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市喻圃,隨后出現(xiàn)的幾起案子萤彩,更是在濱河造成了極大的恐慌,老刑警劉巖斧拍,帶你破解...
    沈念sama閱讀 221,430評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件雀扶,死亡現(xiàn)場離奇詭異,居然都是意外死亡肆汹,警方通過查閱死者的電腦和手機愚墓,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,406評論 3 398
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來昂勉,“玉大人浪册,你說我怎么就攤上這事「谡眨” “怎么了村象?”我有些...
    開封第一講書人閱讀 167,834評論 0 360
  • 文/不壞的土叔 我叫張陵,是天一觀的道長攒至。 經(jīng)常有香客問我厚者,道長,這世上最難降的妖魔是什么迫吐? 我笑而不...
    開封第一講書人閱讀 59,543評論 1 296
  • 正文 為了忘掉前任库菲,我火速辦了婚禮,結果婚禮上志膀,老公的妹妹穿的比我還像新娘熙宇。我一直安慰自己,他們只是感情好梧却,可當我...
    茶點故事閱讀 68,547評論 6 397
  • 文/花漫 我一把揭開白布奇颠。 她就那樣靜靜地躺著败去,像睡著了一般。 火紅的嫁衣襯著肌膚如雪圆裕。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 52,196評論 1 308
  • 那天吨铸,我揣著相機與錄音诞吱,去河邊找鬼房维。 笑死抬纸,一個胖子當著我的面吹牛湿故,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播脖阵,決...
    沈念sama閱讀 40,776評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼独撇,長吁一口氣:“原來是場噩夢啊……” “哼纷铣!你這毒婦竟也來了搜立?” 一聲冷哼從身側響起槐秧,我...
    開封第一講書人閱讀 39,671評論 0 276
  • 序言:老撾萬榮一對情侶失蹤颠通,失蹤者是張志新(化名)和其女友劉穎膀懈,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體刘陶,經(jīng)...
    沈念sama閱讀 46,221評論 1 320
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,303評論 3 340
  • 正文 我和宋清朗相戀三年纷责,在試婚紗的時候發(fā)現(xiàn)自己被綠了碰逸。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片饵史。...
    茶點故事閱讀 40,444評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡胳喷,死狀恐怖吭露,靈堂內(nèi)的尸體忽然破棺而出尊惰,到底是詐尸還是另有隱情弄屡,我是刑警寧澤膀捷,帶...
    沈念sama閱讀 36,134評論 5 350
  • 正文 年R本政府宣布全庸,位于F島的核電站壶笼,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏保礼。R本人自食惡果不足惜氓英,卻給世界環(huán)境...
    茶點故事閱讀 41,810評論 3 333
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望铐拐。 院中可真熱鬧遍蟋,春花似錦虚青、人聲如沸棒厘。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,285評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽支救。三九已至各墨,卻和暖如春欲主,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背详恼。 一陣腳步聲響...
    開封第一講書人閱讀 33,399評論 1 272
  • 我被黑心中介騙來泰國打工挽铁, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留叽掘,地道東北人更扁。 一個月前我還...
    沈念sama閱讀 48,837評論 3 376
  • 正文 我出身青樓,卻偏偏與公主長得像膛薛,于是被迫代替她去往敵國和親哄啄。 傳聞我的和親對象是個殘疾皇子风范,可洞房花燭夜當晚...
    茶點故事閱讀 45,455評論 2 359

推薦閱讀更多精彩內(nèi)容

  • 目標 高吞吐量來支持高容量的事件流處理 支持從離線系統(tǒng)加載數(shù)據(jù) 低延遲的消息系統(tǒng) 持久化 依賴文件系統(tǒng)虑润,持久化到本...
    jiangmo閱讀 1,291評論 0 4
  • 4. 設計思想 4.1 動機 我們設計的 Kafka 能夠作為一個統(tǒng)一的平臺來處理大公司可能擁有的所有實時數(shù)據(jù)饋送...
    瘋狂的橙閱讀 1,085評論 1 4
  • 背景介紹 Kafka簡介 Kafka是一種分布式的冗澈,基于發(fā)布/訂閱的消息系統(tǒng)陋葡。主要設計目標如下: 以時間復雜度為O...
    高廣超閱讀 12,841評論 8 167
  • 本文轉載自:http://www.cnblogs.com/likehua/p/3999538.html捌归,作者做了一...
    shunyang閱讀 3,994評論 0 22
  • 一剃浇、入門1、簡介Kafka is a distributed,partitioned,replicated com...
    HxLiang閱讀 3,352評論 0 9