Kafka消費者:讀消息從Kafka

前言

讀完本文,你將了解到如下知識點:

  1. kafka 的消費者 和 消費者組
  2. 如何正確使用kafka consumer
  3. 常用的 kafka consumer 配置

消費者 和 消費者組

  1. 什么是消費者?
    顧名思義攀细,消費者就是從kafka集群消費數(shù)據(jù)的客戶端浊闪,如下圖刽沾,展示了一個消費者從一個topic中消費數(shù)據(jù)的模型

    image
  2. 單個消費者模型存在的問題读处?
    如果這個時候 kafka 上游生產(chǎn)的數(shù)據(jù)很快尘分,超過了這個消費者1 的消費速度猜惋,那么就會導(dǎo)致數(shù)據(jù)堆積,產(chǎn)生一些大家都知道的蛋疼事情了培愁,那么我們只能加強 消費者 的消費能力著摔,所以也就有了我們下面來說的 消費者組

  3. 什么是消費者組?
    所謂 消費者組定续,其實就是一組 消費者 的集合谍咆,當我們看到下面這張圖是不是就特別舒服了,我們采用了一個 消費組 來消費這個 topic私股,眾人拾柴火焰高摹察,其消費能力那是按倍數(shù)遞增的,所以這里我們一般來說都是采用 消費者組 來消費數(shù)據(jù)倡鲸,而不會是 單消費者 來消費數(shù)據(jù)的供嚎。這里值得我們注意的是:

    • 一個topic 可以被 多個 消費者組 消費,但是每個 消費者組 消費的數(shù)據(jù)是 互不干擾 的峭状,也就是說克滴,每個 消費組 消費的都是 完整的數(shù)據(jù)
    • 一個分區(qū)只能被 同一個消費組內(nèi) 的一個 消費者 消費优床,而 不能拆給多個消費者 消費
image
  1. 是不是一個 消費組 的 消費者 越多其消費能力就越強呢劝赔?
    圖3 我們就很好的可以回答這個問題了,我們可以看到 消費者4 是完全沒有消費任何的數(shù)據(jù)的胆敞,所以如果你想要加強 消費者組 的能力望忆,除了添加消費者罩阵,分區(qū)的數(shù)量也是需要跟著增加的,只有這樣他們的并行度才能上的去启摄,消費能力才會強。
image
  1. 為了提高 消費組 的 消費能力幽钢,我是不是可以隨便添加 分區(qū) 和 消費者 呢歉备?
    答案當然是否定的啦。匪燕。蕾羊。嘿嘿
    我們看到圖2,一般來說我們建議 消費者 數(shù)量 和 分區(qū) 數(shù)量是一致的帽驯,當我們的消費能力不夠時龟再,就必須通過調(diào)整分區(qū)的數(shù)量來提高并行度,但是尼变,我們應(yīng)該盡量來避免這種情況發(fā)生利凑,比如:
    現(xiàn)在我們需要在圖2的基礎(chǔ)上增加一個 分區(qū)4,那么這個 分區(qū)4 該由誰來消費呢嫌术?這個時候kafka會進行 分區(qū)再均衡哀澈,來為這個分區(qū)分配消費者,分區(qū)再均衡 期間該 消費組 是不可用的度气,并且作為一個 被消費者割按,分區(qū)數(shù)的改動將影響到每一個消費者組 ,所以在創(chuàng)建 topic 的時候磷籍,我們就應(yīng)該考慮好分區(qū)數(shù)适荣,來盡量避免這種情況發(fā)生

  2. 分區(qū)分配過程
    上面我們提到了為 分區(qū)分配消費者,那么我們現(xiàn)在就來看看分配過程是怎么樣的院领。

    1. 確定 群組協(xié)調(diào)器
      每當我們創(chuàng)建一個消費組弛矛,kafka 會為我們分配一個 broker 作為該消費組的 coordinator(協(xié)調(diào)器)
    2. 注冊消費者 并選出 leader consumer
      當我們的有了 coordinator 之后,消費者將會開始往該 coordinator上進行注冊栅盲,第一個注冊的 消費者將成為該消費組的 leader汪诉,后續(xù)的 作為 follower,
    3. 當 leader 選出來后,他會從coordinator那里實時獲取分區(qū) 和 consumer 信息,并根據(jù)分區(qū)策略給每個consumer 分配 分區(qū)搁凸,并將分配結(jié)果告訴 coordinator藕届。
    4. follower 消費者將從 coordinator 那里獲取到自己相關(guān)的分區(qū)信息進行消費,對于所有的 follower 消費者而言赠摇,他們只知道自己消費的分區(qū),并不知道其他消費者的存在。
    5. 至此课竣,消費者都知道自己的消費的分區(qū)嘉赎,分區(qū)過程結(jié)束,當發(fā)送分區(qū)再均衡的時候于樟,leader 將會重復(fù)分配過程

消費組與分區(qū)重平衡

可以看到公条,當新的消費者加入消費組,它會消費一個或多個分區(qū)迂曲,而這些分區(qū)之前是由其他消費者負責的靶橱;另外,當消費者離開消費組(比如重啟路捧、宕機等)時关霸,它所消費的分區(qū)會分配給其他分區(qū)。這種現(xiàn)象稱為重平衡(rebalance)杰扫。重平衡是Kafka一個很重要的性質(zhì)队寇,這個性質(zhì)保證了高可用和水平擴展。不過也需要注意到章姓,在重平衡期間佳遣,所有消費者都不能消費消息,因此會造成整個消費組短暫的不可用啤覆。而且苍日,將分區(qū)進行重平衡也會導(dǎo)致原來的消費者狀態(tài)過期,從而導(dǎo)致消費者需要重新更新狀態(tài)窗声,這段期間也會降低消費性能相恃。后面我們會討論如何安全的進行重平衡以及如何盡可能避免。

消費者通過定期發(fā)送心跳(hearbeat)到一個作為組協(xié)調(diào)者(group coordinator)的broker來保持在消費組內(nèi)存活笨觅。這個broker不是固定的拦耐,每個消費組都可能不同。當消費者拉取消息或者提交時见剩,便會發(fā)送心跳杀糯。

如果消費者超過一定時間沒有發(fā)送心跳,那么它的會話(session)就會過期苍苞,組協(xié)調(diào)者會認為該消費者已經(jīng)宕機固翰,然后觸發(fā)重平衡「牵可以看到骂际,從消費者宕機到會話過期是有一定時間的,這段時間內(nèi)該消費者的分區(qū)都不能進行消息消費冈欢;通常情況下歉铝,我們可以進行優(yōu)雅關(guān)閉,這樣消費者會發(fā)送離開的消息到組協(xié)調(diào)者凑耻,這樣組協(xié)調(diào)者可以立即進行重平衡而不需要等待會話過期太示。

在0.10.1版本柠贤,Kafka對心跳機制進行了修改,將發(fā)送心跳與拉取消息進行分離类缤,這樣使得發(fā)送心跳的頻率不受拉取的頻率影響臼勉。另外更高版本的Kafka支持配置一個消費者多長時間不拉取消息但仍然保持存活,這個配置可以避免活鎖(livelock)呀非〖崴祝活鎖,是指應(yīng)用沒有故障但是由于某些原因不能進一步消費岸裙。

創(chuàng)建Kafka消費者
讀取Kafka消息只需要創(chuàng)建一個kafkaConsumer,創(chuàng)建過程與KafkaProducer非常相像速缆。我們需要使用四個基本屬性降允,bootstrap.servers、key.deserializer艺糜、value.deserializer和group.id剧董。其中,bootstrap.servers與創(chuàng)建KafkaProducer的含義一樣破停;key.deserializer和value.deserializer是用來做反序列化的翅楼,也就是將字節(jié)數(shù)組轉(zhuǎn)換成對象;group.id不是嚴格必須的真慢,但通常都會指定毅臊,這個參數(shù)是消費者的消費組。

下面是一個代碼樣例:

Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
props.put("group.id", "CountryCounter");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String,String>(props);

訂閱主題
創(chuàng)建完消費者后我們便可以訂閱主題了黑界,只需要通過調(diào)用subscribe()方法即可管嬉,這個方法接收一個主題列表,非常簡單:

consumer.subscribe(Collections.singletonList("customerCountries"));

這個例子中只訂閱了一個customerCountries主題朗鸠。另外蚯撩,我們也可以使用正則表達式來匹配多個主題,而且訂閱之后如果又有匹配的新主題烛占,那么這個消費組會立即對其進行消費胎挎。正則表達式在連接Kafka與其他系統(tǒng)時非常有用。比如訂閱所有的測試主題:

consumer.subscribe("test.*");

拉取循環(huán)
消費數(shù)據(jù)的API和處理方式很簡單忆家,我們只需要循環(huán)不斷拉取消息即可犹菇。Kafka對外暴露了一個非常簡潔的poll方法,其內(nèi)部實現(xiàn)了協(xié)作弦赖、分區(qū)重平衡项栏、心跳、數(shù)據(jù)拉取等功能蹬竖,但使用時這些細節(jié)都被隱藏了沼沈,我們也不需要關(guān)注這些流酬。下面是一個代碼樣例:

try {
   while (true) {  //1)
       ConsumerRecords<String, String> records = consumer.poll(100);  //2)
       for (ConsumerRecord<String, String> record : records)  //3)
       {
           log.debug("topic = %s, partition = %s, offset = %d,
              customer = %s, country = %s\n",
              record.topic(), record.partition(), record.offset(),
              record.key(), record.value());
           int updatedCount = 1;
           if (custCountryMap.countainsValue(record.value())) {
               updatedCount = custCountryMap.get(record.value()) + 1;
           }
           custCountryMap.put(record.value(), updatedCount)
           JSONObject json = new JSONObject(custCountryMap);
           System.out.println(json.toString(4))
       }
   }
} finally {
      consumer.close(); //4
}

其中,代碼中標注了幾點列另,說明如下:

1)這個例子使用無限循環(huán)消費并處理數(shù)據(jù)芽腾,這也是使用Kafka最多的一個場景,后面我們會討論如何更好的退出循環(huán)并關(guān)閉页衙。
2)這是上面代碼中最核心的一行代碼摊滔。我們不斷調(diào)用poll拉取數(shù)據(jù),如果停止拉取店乐,那么Kafka會認為此消費者已經(jīng)死亡并進行重平衡艰躺。參數(shù)值是一個超時時間,指明線程如果沒有數(shù)據(jù)時等待多長時間眨八,0表示不等待立即返回腺兴。
3)poll()方法返回記錄的列表,每條記錄包含key/value以及主題廉侧、分區(qū)页响、位移信息。
4)主動關(guān)閉可以使得Kafka立即進行重平衡而不需要等待會話過期段誊。
另外需要提醒的是闰蚕,消費者對象不是線程安全的,也就是不能夠多個線程同時使用一個消費者對象连舍;而且也不能夠一個線程有多個消費者對象没陡。簡而言之,一個線程一個消費者烟瞧,如果需要多個消費者那么請使用多線程來進行一一對應(yīng)诗鸭。

單線程版

package com.neuedu;

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class Consumer {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers",
                "hadoop03:9092,hadoop05:9092,hadoop06:9092");// 該地址是集群的子集,用來探測集群参滴。
        props.put("group.id", "payment");// cousumer的分組id
        props.put("enable.auto.commit", "true");// 自動提交offsets
        props.put("auto.commit.interval.ms", "1000");// 每隔1s强岸,自動提交offsets
        props.put("session.timeout.ms", "30000");// Consumer向集群發(fā)送自己的心跳,超時則認為Consumer已經(jīng)死了砾赔,kafka會把它的分區(qū)分配給其他進程
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");// 反序列化器
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        consumer.subscribe(Arrays.asList("payment"));// 訂閱的topic,可以多個
//        String topic = "payment";
//        TopicPartition partition0 = new TopicPartition(topic, 0);
//        TopicPartition partition1 = new TopicPartition(topic, 1);
//        consumer.assign(Arrays.asList(partition0));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("賭東道賭東道賭東道賭東道賭東道賭東道 offset = %d, key = %s, value = %s, partition = %s",
                        record.offset(), record.key(), record.value(),record.partition());
                System.out.println();

            }
        }
    }
}

多線程版

package com.neuedu;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

public class ConsumerRunnable implements Runnable {

    // 每個線程維護私有的KafkaConsumer實例
    private final KafkaConsumer<String, String> consumer;

    public ConsumerRunnable(String brokerList, String groupId, String topic) {
        Properties props = new Properties();
        props.put("bootstrap.servers", brokerList);
        props.put("group.id", groupId);
        props.put("enable.auto.commit", "true");        //本例使用自動提交位移
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
//分區(qū)分配策略
  props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.RangeAssignor");
        props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.RoundRobinAssignor");

        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        this.consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topic));   // 本例使用分區(qū)副本自動分配策略
    }

    @Override
    public void run() {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(200);   // 本例使用200ms作為獲取超時時間
            for (ConsumerRecord<String, String> record : records) {
                // 這里面寫處理消息的邏輯蝌箍,本例中只是簡單地打印消息
                System.out.println(Thread.currentThread().getName() + " consumed " + record.partition() +
                        "th message with offset: " + record.offset());
            }
        }
    }
}

package com.neuedu;

import java.util.ArrayList;
import java.util.List;

public class ConsumerGroup {

    private List<ConsumerRunnable> consumers;

    public ConsumerGroup(int consumerNum, String groupId, String topic, String brokerList) {
        consumers = new ArrayList<>(consumerNum);
        for (int i = 0; i < consumerNum; ++i) {
            ConsumerRunnable consumerThread = new ConsumerRunnable(brokerList, groupId, topic);
            consumers.add(consumerThread);
        }
    }

    public void execute() {
        for (ConsumerRunnable task : consumers) {
            new Thread(task).start();
        }
    }
}

package com.neuedu;

public class ConsumerMain {

    public static void main(String[] args) {
        String brokerList = "hadoop03:9092";
        String groupId = "testGroup1";
        String topic = "payment";
        int consumerNum = 2;

        ConsumerGroup consumerGroup = new ConsumerGroup(consumerNum, groupId, topic, brokerList);
        consumerGroup.execute();
    }
}

使用起來還是很簡單的,不過如果想要用好 consumer暴心,可能你還需要了解以下這些東西:

  1. 分區(qū)控制策略
  2. consumer 的一些常用配置
  3. offset 的控制

ok,那么我們接下來一個個來看吧妓盲。。专普。

分區(qū)控制策略

  1. 手動控制分區(qū)
    咱們先來說下最簡單的手動分區(qū)控制悯衬,代碼如下:
     String topic = "foo";
     TopicPartition partition0 = new TopicPartition(topic, 0);
     TopicPartition partition1 = new TopicPartition(topic, 1);
     consumer.assign(Arrays.asList(partition0, partition1));

看起來只是把普通的訂閱方式修改成了訂閱知道 topic 的分區(qū),其余的還是照常使用檀夹,不過這里也需要注意一下的是:

  • 一般只作為獨立消費者筋粗,也就是不能加入消費組策橘,或者說他本身就是作為一個消費組存在,要保證這一點娜亿,我們只需要保證其group id 是唯一的就可以了丽已。
  • 對于topic的分區(qū)變動不敏感,也就是說當 topic新增了分區(qū)买决,分區(qū)的數(shù)據(jù)將會發(fā)生改變沛婴,但該消費組對此確是不感知的,依然照常運行督赤,所以很多時候需要你手動consumer.partitionsFor()去查看topic的分區(qū)情況
  • 不要和 subscription混合使用
  1. 使用partition.assignment.strategy進行分區(qū)策略配置
  props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.RangeAssignor");
  props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.RoundRobinAssignor");

這里的話 kafka 是自帶兩種分區(qū)策略的嘁灯,為了方便理解,我們以如下場景為例來進行解釋:
已知:
TopicA 有 3 個 partition(分區(qū)):A-1躲舌,A-2旁仿,A-3;
TopicB 有 3 個 partition(分區(qū)):B-1孽糖,B-2,B-3混弥;
ConsumerA 和 ConsumerB 作為一個消費組 ConsumerGroup 同時消費 TopicA 和 TopicB

  • Range
    該方式最大的特點就是會將連續(xù)的分區(qū)分配給一個消費者实辑,根據(jù)示例铛碑,我們可以得出如下結(jié)論:

    ConsumerGroup 消費 TopicA 的時候:
    ConsumerA 會分配到 A-1,A-2
    ConsumerB 會分配到 A-3

    ConsumerGroup 消費 TopicB 的時候:
    ConsumerA 會分配到 B-1病蛉,B-2
    ConsumerB 會分配到 B-3

    所以:
    ConsumerA 分配到了4個分區(qū): A-1,A-2瑰煎,B-1铺然,B-2
    ConsumerB 分配到了2個分區(qū):A-3,B-3

  • RoundRobin
    該方式最大的特點就是會以輪詢的方式將分區(qū)分配給一個個消費者酒甸,根據(jù)示例魄健,我們可以得出如下結(jié)論:

    ConsumerGroup 消費 TopicA 的時候:
    ConsumerA 分配到 A-1
    ConsumerB 分配到 A-2
    ConsumerA 分配到 A-3

    ConsumerGroup 消費 TopicB 的時候,因為上次分配到了 ConsumerA,那么這次輪到 ConsumerB了 所以:
    ConsumerB 分配到 B-1
    ConsumerA 分配到 B-2
    ConsumerB 分配到 B-3

    所以:
    ConsumerA 分配到了4個分區(qū): A-1插勤,A-3沽瘦,B-2
    ConsumerB 分配到了2個分區(qū):A-2,B-1农尖,B-3

從上面我們也是可以看出這兩種策略的異同析恋,RoundRobin 相比較 Range 會使得分區(qū)分配的更加的均衡,但如果是消費單個 topic 盛卡,那么其均衡是差不多的助隧,而 Range 會比 RoundRobin 更具優(yōu)勢一點,至于這個優(yōu)勢滑沧,還得看你的具體業(yè)務(wù)了并村。

  • 自定義的分區(qū)策略
    上面兩種分區(qū)策略是 kafka 默認自帶的策略巍实,雖然大多數(shù)情況下夠用了,但是可能針對一些特殊需求橘霎,我們也可以定義自己的分區(qū)策略

    1. Range分區(qū)策略源碼
      如何自定義呢蔫浆?最好的方式莫過于看源碼是怎么實現(xiàn)的,然后自己依葫蘆畫瓢來一個姐叁,所以我們先來看看 Range分區(qū)策略源碼瓦盛,如下,我只做了簡單的注釋外潜,因為它本身也很簡單原环,重點看下 assign 的參數(shù)以及返回注釋就 ok了
    public class RangeAssignor extends AbstractPartitionAssignor{
      //省略部分代碼。处窥。嘱吗。。
     /**
       * 根據(jù)訂閱者 和 分區(qū)數(shù)量來進行分區(qū)
       * @param partitionsPerTopic: topic->分區(qū)數(shù)量
       * @param subscriptions: memberId 消費者id -> subscription 消費者信息
       * @return: memberId ->list<topic名稱 和 分區(qū)序號(id)>
       */
      @Override
      public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic, Map<String, Subscription> subscriptions) {
          //topic -> list<消費者>
          Map<String, List<String>> consumersPerTopic = consumersPerTopic(subscriptions);
    
          //初始化 返回結(jié)果
          Map<String, List<TopicPartition>> assignment = new HashMap<>();
          for (String memberId : subscriptions.keySet())
              assignment.put(memberId, new ArrayList<TopicPartition>());
    
          for (Map.Entry<String, List<String>> topicEntry : consumersPerTopic.entrySet()) {
              //topic
              String topic = topicEntry.getKey();
              // 消費該topic的 consumer-id
              List<String> consumersForTopic = topicEntry.getValue();
    
              //topic 的分區(qū)數(shù)量
              Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
              if (numPartitionsForTopic == null)
                  continue;
    
              Collections.sort(consumersForTopic);
    
              //平均每個消費者分配的 分區(qū)數(shù)量
              int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size();
              //平均之后剩下的 分區(qū)數(shù)
              int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size();
    
              //這里就是將連續(xù)分區(qū)切開然后分配給每個消費者
              List<TopicPartition> partitions = AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic);
              for (int i = 0, n = consumersForTopic.size(); i < n; i++) {
                  int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition);
                  int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1);
                  assignment.get(consumersForTopic.get(i)).addAll(partitions.subList(start, start + length));
              }
          }
          return assignment;
        }
      }
    
    
    1. 自定義一個 分區(qū)策略
      這里先緩緩把滔驾,太簡單把谒麦,沒什么用,太復(fù)雜把哆致,一時也想不出好的場景绕德,如果你有需求,歡迎留言摊阀,我們一起來實現(xiàn)

Consumer 常用配置

首先耻蛇,我們都應(yīng)該知道,最全最全的文檔應(yīng)該是來自官網(wǎng)(雖然有時候可能官網(wǎng)找不到):
http://kafka.apachecn.org/documentation.html#newconsumerconfigs
嗯胞此,以下內(nèi)容來自 kafka權(quán)威指南 臣咖,請原諒我的小懶惰。漱牵。夺蛇。后續(xù)有時間會把工作中的遇到的補充上

  1. fetch.min.bytes
    該屬性指定了消費者從服務(wù)器獲取記錄的最小字節(jié)數(shù)。broker 在收到消費者的數(shù)據(jù)請求時布疙,
    如果可用的數(shù)據(jù)量小于fetch.min.bytes 指定的大小蚊惯,那么它會等到有足夠的可用數(shù)據(jù)時才把它返回給消費者。這樣可以降低消費者和 broker 的工作負載灵临,因為它們在主題不是很活躍的時候(或者一天里的低谷時段)就不需要來來回回地處理消息截型。如果沒有很多可用數(shù)據(jù),但消費者的 CPU 使用率卻很高儒溉,那么就需要把該屬性的值設(shè)得比默認值大宦焦。如果消費者的數(shù)量比較多,把該屬性的值設(shè)置得大一點可以降低broker 的工作負載。

  2. fetch.max.wait.ms
    我們通過 fetch.min.bytes 告訴 Kafka波闹,等到有足夠的數(shù)據(jù)時才把它返回給消費者酝豪。而feth.max.wait.ms 則用于指定 broker 的等待時間,默認是 500ms精堕。如果沒有足夠的數(shù)據(jù)流入 Kafka孵淘,消費者獲取最小數(shù)據(jù)量的要求就得不到滿足,最終導(dǎo)致 500ms 的延遲歹篓。如果要降低潛在的延遲(為了滿足 SLA)瘫证,可以把該參數(shù)值設(shè)置得小一些。
    如果 fetch.max.wait.ms 被設(shè)為 100ms庄撮,并且fetch.min.bytes 被設(shè)為 1MB背捌,那么 Kafka 在收到消費者的請求后,要么返回 1MB 數(shù)據(jù)洞斯,要么在100ms 后返回所有可用的數(shù)據(jù)毡庆,就看哪個條件先得到滿足。

  3. max.partition.fetch.bytes
    該屬性指定了服務(wù)器從每個分區(qū)里返回給消費者的最大字節(jié)數(shù)烙如。它的默認值是 1MB么抗,也就是說,KafkaConsumer.poll() 方法從每個分區(qū)里返回的記錄最多不超過 max.partition.fetch.bytes指定的字節(jié)亚铁。如果一個主題有 20 個分區(qū)和 5 個消費者乖坠,那么每個消費者需要至少 4MB 的可用內(nèi)存來接收記錄。在為消費者分配內(nèi)存時刀闷,可以給它們多分配一些,因為如果群組里有消費者發(fā)生崩潰仰迁,剩下的消費者需要處理更多的分區(qū)甸昏。
    max.partition.fetch.bytes 的值必須比 broker 能夠接收的最大消息的字節(jié)數(shù)(通過 max.message.size 屬性配置)大,否則消費者可能無法讀取這些消息徐许,導(dǎo)致消費者一直掛起重試施蜜。在設(shè)置該屬性時,另一個需要考慮的因素是消費者處理數(shù)據(jù)的時間雌隅。消費者需要頻繁調(diào)用poll() 方法來避免會話過期和發(fā)生分區(qū)再均衡翻默,如果單次調(diào)用 poll() 返回的數(shù)據(jù)太多,消費者需要更多的時間來處理恰起,可能無法及時進行下一個輪詢來避免會話過期修械。
    如果出現(xiàn)這種情況,可以把max.partition.fetch.bytes 值改小检盼,或者延長會話過期時間肯污。

  4. session.timeout.ms
    該屬性指定了消費者在被認為死亡之前可以與服務(wù)器斷開連接的時間,默認是 3s。如果消費者沒有在session.timeout.ms 指定的時間內(nèi)發(fā)送心跳給群組協(xié)調(diào)器蹦渣,就被認為已經(jīng)死亡哄芜,協(xié)調(diào)器就會觸發(fā)再均衡,把它的分區(qū)分配給群組里的其他消費者柬唯。
    該屬性與 heartbeat.interval.ms 緊密相關(guān)认臊。heartbeat.interval.ms 指定了 poll() 方法向協(xié)調(diào)器發(fā)送心跳的頻率,session.timeout.ms則指定了消費者可以多久不發(fā)送心跳锄奢。
    所以失晴,一般需要同時修改這兩個屬性,heartbeat.interval.ms 必須比 session.timeout.ms 小斟薇,一般是session.timeout.ms 的三分之一师坎。如果 session.timeout.ms 是 3s,那么 heartbeat.interval.ms 應(yīng)該是 1s堪滨。把session.timeout.ms 值設(shè)得比默認值小胯陋,可以更快地檢測和恢復(fù)崩潰的節(jié)點,不過長時間的輪詢或垃圾收集可能導(dǎo)致非預(yù)期的再均衡袱箱。把該屬性的值設(shè)置得大一些遏乔,可以減少意外的再均衡,不過檢測節(jié)點崩潰需要更長的時間发笔。

  5. auto.offset.reset
    該屬性指定了消費者在讀取一個沒有偏移量的分區(qū)或者偏移量無效的情況下(因消費者長時間失效盟萨,包含偏移量的記錄已經(jīng)過時并被刪除)該作何處理。
    它的默認值是 latest了讨,意思是說捻激,在偏移量無效的情況下,消費者將從最新的記錄開始讀取數(shù)據(jù)(在消費者啟動之后生成的記錄)前计。另一個值是earliest胞谭,意思是說,在偏移量無效的情況下男杈,消費者將從起始位置讀取分區(qū)的記錄丈屹。

  6. enable.auto.commit
    我們稍后將介紹幾種不同的提交偏移量的方式。該屬性指定了消費者是否自動提交偏移量伶棒,默認值是true旺垒。為了盡量避免出現(xiàn)重復(fù)數(shù)據(jù)和數(shù)據(jù)丟失,可以把它設(shè)為 false肤无,由自己控制何時提交偏移量先蒋。如果把它設(shè)為true,還可以通過配置 auto.commit.interval.ms 屬性來控制提交的頻率宛渐。

  7. partition.assignment.strategy(這部分好像重復(fù)了 ~~~)
    我們知道鞭达,分區(qū)會被分配給群組里的消費者司忱。PartitionAssignor 根據(jù)給定的消費者和主題,決定哪些分區(qū)應(yīng)該被分配給哪個消費者畴蹭。Kafka 有兩個默認的分配策略坦仍。
    Range
      該策略會把主題的若干個連續(xù)的分區(qū)分配給消費者。假設(shè)消費者 C1 和消費者 C2 同時訂閱了主題T1 和 主題 T2叨襟,并且每個主題有 3 個分區(qū)繁扎。那么消費者 C1 有可能分配到這兩個主題的分區(qū) 0 和分區(qū) 1,而消費者 C2 分配到這兩個主題的分區(qū) 2糊闽。因為每個主題擁有奇數(shù)個分區(qū)梳玫,而分配是在主題內(nèi)獨立完成的,第一個消費者最后分配到比第二個消費者更多的分區(qū)右犹。只要使用了 Range 策略提澎,而且分區(qū)數(shù)量無法被消費者數(shù)量整除,就會出現(xiàn)這種情況念链。

    RoundRobin
      該策略把主題的所有分區(qū)逐個分配給消費者盼忌。如果使用 RoundRobin 策略來給消費者 C1 和消費者C2 分配分區(qū),那么消費者 C1 將分到主題 T1 的分區(qū) 0 和分區(qū) 2 以及主題 T2 的分區(qū) 1掂墓,消費者 C2 將分配到主題 T1 的分區(qū) 1 以及主題 T2 的分區(qū) 0 和分區(qū) 2谦纱。一般來說,如果所有消費者都訂閱相同的主題(這種情況很常見),RoundRobin 策略會給所有消費者分配相同數(shù)量的分區(qū)(或最多就差一個分區(qū))÷朔瘢可以通過設(shè)置 partition.assignment.strategy 來選擇分區(qū)策略。
    默認使用的是org.apache.kafka.clients.consumer.RangeAssignor祠乃,這個類實現(xiàn)了 Range 策略,不過也可以把它改成 org.apache.kafka.clients.consumer.RoundRobinAssignor兑燥。我們還可以使用自定義策略跳纳,在這種情況下,partition.assignment.strategy 屬性的值就是自定義類的名字贪嫂。

  8. client.id
    該屬性可以是任意字符串,broker 用它來標識從客戶端發(fā)送過來的消息艾蓝,通常被用在日志力崇、度量指標和配額里。

  9. max.poll.records
    該屬性用于控制單次調(diào)用 call() 方法能夠返回的記錄數(shù)量赢织,可以幫你控制在輪詢里需要處理的數(shù)據(jù)量亮靴。

  10. receive.buffer.bytes 和 send.buffer.bytes
    socket 在讀寫數(shù)據(jù)時用到的 TCP 緩沖區(qū)也可以設(shè)置大小。如果它們被設(shè)為 -1于置,就使用操作系統(tǒng)的默認值茧吊。
    如果生產(chǎn)者或消費者與 broker 處于不同的數(shù)據(jù)中心內(nèi),可以適當增大這些值,因為跨數(shù)據(jù)中心的網(wǎng)絡(luò)一般都有比較高的延遲和比較低的帶寬

提交(commit)與位移(offset)

當我們調(diào)用poll()時搓侄,該方法會返回我們沒有消費的消息瞄桨。當消息從broker返回消費者時,broker并不跟蹤這些消息是否被消費者接收到讶踪;Kafka讓消費者自身來管理消費的位移芯侥,并向消費者提供更新位移的接口,這種更新位移方式稱為提交(commit)乳讥。

在正常情況下柱查,消費者會發(fā)送分區(qū)的提交信息到Kafka,Kafka進行記錄云石。當消費者宕機或者新消費者加入時唉工,Kafka會進行重平衡,這會導(dǎo)致消費者負責之前并不屬于它的分區(qū)汹忠。重平衡完成后淋硝,消費者會重新獲取分區(qū)的位移,下面來看下兩種有意思的情況错维。

假如一個消費者在重平衡前后都負責某個分區(qū)奖地,如果提交位移比之前實際處理的消息位移要小,那么會導(dǎo)致消息重復(fù)消費赋焕,如下所示:

image

假如在重平衡前某個消費者拉取分區(qū)消息参歹,在進行消息處理前提交了位移,但還沒完成處理宕機了隆判,然后Kafka進行重平衡犬庇,新的消費者負責此分區(qū)并讀取提交位移,此時會“丟失”消息侨嘀,如下所示:

image

因此臭挽,提交位移的方式會對應(yīng)用有比較大的影響,下面來看下不同的提交方式咬腕。

自動提交

這種方式讓消費者來管理位移欢峰,應(yīng)用本身不需要顯式操作。當我們將enable.auto.commit設(shè)置為true涨共,那么消費者會在poll方法調(diào)用后每隔5秒(由auto.commit.interval.ms指定)提交一次位移纽帖。和很多其他操作一樣,自動提交也是由poll()方法來驅(qū)動的举反;在調(diào)用poll()時懊直,消費者判斷是否到達提交時間,如果是則提交上一次poll返回的最大位移火鼻。

需要注意到室囊,這種方式可能會導(dǎo)致消息重復(fù)消費雕崩。假如,某個消費者poll消息后融撞,應(yīng)用正在處理消息盼铁,在3秒后Kafka進行了重平衡,那么由于沒有更新位移導(dǎo)致重平衡后這部分消息重復(fù)消費懦铺。

提交當前位移

為了減少消息重復(fù)消費或者避免消息丟失捉貌,很多應(yīng)用選擇自己主動提交位移。設(shè)置auto.commit.offset為false冬念,那么應(yīng)用需要自己通過調(diào)用commitSync()來主動提交位移趁窃,該方法會提交poll返回的最后位移。

為了避免消息丟失急前,我們應(yīng)當在完成業(yè)務(wù)邏輯后才提交位移醒陆。而如果在處理消息時發(fā)生了重平衡,那么只有當前poll的消息會重復(fù)消費裆针。下面是一個自動提交的代碼樣例:

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records)
    {
        System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());
    }

    try {
        consumer.commitSync();
    } catch (CommitFailedException e) {
        log.error("commit failed", e)
    }
}

上面代碼poll消息刨摩,并進行簡單的打印(在實際中有更多的處理)世吨,最后完成處理后進行了位移提交澡刹。

異步提交

手動提交有一個缺點,那就是當發(fā)起提交調(diào)用時應(yīng)用會阻塞耘婚。當然我們可以減少手動提交的頻率罢浇,但這個會增加消息重復(fù)的概率(和自動提交一樣)。另外一個解決辦法是沐祷,使用異步提交的API嚷闭。以下為使用異步提交的方式,應(yīng)用發(fā)了一個提交請求然后立即返回:

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records)
    {
        System.out.printf("topic = %s, partition = %s,
        offset = %d, customer = %s, country = %s\n",
        record.topic(), record.partition(), record.offset(),
        record.key(), record.value());
    }

    consumer.commitAsync();
}

但是異步提交也有個缺點赖临,那就是如果服務(wù)器返回提交失敗胞锰,異步提交不會進行重試。相比較起來兢榨,同步提交會進行重試直到成功或者最后拋出異常給應(yīng)用嗅榕。異步提交沒有實現(xiàn)重試是因為,如果同時存在多個異步提交吵聪,進行重試可能會導(dǎo)致位移覆蓋凌那。舉個例子,假如我們發(fā)起了一個異步提交commitA暖璧,此時的提交位移為2000,隨后又發(fā)起了一個異步提交commitB且位移為3000君旦;commitA提交失敗但commitB提交成功澎办,此時commitA進行重試并成功的話嘲碱,會將實際上將已經(jīng)提交的位移從3000回滾到2000,導(dǎo)致消息重復(fù)消費局蚀。

因此麦锯,基于這種性質(zhì),一般情況下對于異步提交琅绅,我們可能會通過回調(diào)的方式記錄提交結(jié)果:

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("topic = %s, partition = %s,
        offset = %d, customer = %s, country = %s\n",
        record.topic(), record.partition(), record.offset(),
        record.key(), record.value());
    }
    consumer.commitAsync(new OffsetCommitCallback() {
        public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
            if (e != null)
                log.error("Commit failed for offsets {}", offsets, e);
        } 
    });
}

而如果想進行重試同時又保證提交順序的話扶欣,一種簡單的辦法是使用單調(diào)遞增的序號。每次發(fā)起異步提交時增加此序號千扶,并且將此時的序號作為參數(shù)傳給回調(diào)方法料祠;當消息提交失敗回調(diào)時,檢查參數(shù)中的序號值與全局的序號值澎羞,如果相等那么可以進行重試提交髓绽,否則放棄(因為已經(jīng)有更新的位移提交了)。

混合同步提交與異步提交

正常情況下妆绞,偶然的提交失敗并不是什么大問題顺呕,因為后續(xù)的提交成功就可以了。但是在某些情況下(例如程序退出括饶、重平衡)株茶,我們希望最后的提交成功,因此一種非常普遍的方式是混合異步提交和同步提交图焰,如下所示:

try {
    while (true) {
       ConsumerRecords<String, String> records = consumer.poll(100);
       for (ConsumerRecord<String, String> record : records) {
           System.out.printf("topic = %s, partition = %s, offset = %d,
           customer = %s, country = %s\n",
           record.topic(), record.partition(),
           record.offset(), record.key(), record.value());
       }

       consumer.commitAsync();
    }
} catch (Exception e) {
    log.error("Unexpected error", e);
} finally {
    try {
        consumer.commitSync();
    } finally {
        consumer.close();
    }
}

在正常處理流程中启盛,我們使用異步提交來提高性能,但最后使用同步提交來保證位移提交成功楞泼。

提交特定位移

commitSync()和commitAsync()會提交上一次poll()的最大位移驰徊,但如果poll()返回了批量消息,而且消息數(shù)量非常多堕阔,我們可能會希望在處理這些批量消息過程中提交位移棍厂,以免重平衡導(dǎo)致從頭開始消費和處理。幸運的是超陆,commitSync()和commitAsync()允許我們指定特定的位移參數(shù)牺弹,參數(shù)為一個分區(qū)與位移的map。由于一個消費者可能會消費多個分區(qū)时呀,所以這種方式會增加一定的代碼復(fù)雜度张漂,如下所示:

private Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
int count = 0;

....

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records)
    {
        System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());

        currentOffsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset()+1, "no metadata"));
        if (count % 1000 == 0)
            consumer.commitAsync(currentOffsets, null);
        count++;
} }

代碼中在處理poll()消息的過程中,不斷保存分區(qū)與位移的關(guān)系谨娜,每處理1000條消息就會異步提交(也可以使用同步提交)航攒。

重平衡監(jiān)聽器(Rebalance Listener)

變更分區(qū)命令

kafka-topics.sh --zookeeper hadoop03:2181 --alter --topic test --partitions 6

在分區(qū)重平衡前,如果消費者知道它即將不再負責某個分區(qū)趴梢,那么它可能需要將已經(jīng)處理過的消息位移進行提交漠畜。Kafka的API允許我們在消費者新增分區(qū)或者失去分區(qū)時進行處理币他,我們只需要在調(diào)用subscribe()方法時傳入ConsumerRebalanceListener對象,該對象有兩個方法:

  • public void onPartitionRevoked(Collection<topicpartition> partitions):此方法會在消費者停止消費消費后憔狞,在重平衡開始前調(diào)用蝴悉。</topicpartition>
  • public void onPartitionAssigned(Collection<topicpartition> partitions):此方法在分區(qū)分配給消費者后,在消費者開始讀取消息前調(diào)用瘾敢。</topicpartition>

下面來看一個onPartitionRevoked9)的例子拍冠,該例子在消費者失去某個分區(qū)時提交位移(以便其他消費者可以接著消費消息并處理):

private Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();

private class HandleRebalance implements ConsumerRebalanceListener {
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
    }

    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        System.out.println("Lost partitions in rebalance.
          Committing current
        offsets:" + currentOffsets);
        consumer.commitSync(currentOffsets);
    }
}

try {
    consumer.subscribe(topics, new HandleRebalance());
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records)
        {
             System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());
             currentOffsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset()+1, "no metadata"));
        }
        consumer.commitAsync(currentOffsets, null);
    }
} catch (WakeupException e) {
    // ignore, we're closing
} catch (Exception e) {
   log.error("Unexpected error", e);
} finally {
   try {
       consumer.commitSync(currentOffsets);
   } finally {
       consumer.close();
       System.out.println("Closed consumer and we are done");
   }
}

代碼中實現(xiàn)了onPartitionsRevoked()方法,當消費者失去某個分區(qū)時簇抵,會提交已經(jīng)處理的消息位移(而不是poll()的最大位移)庆杜。上面代碼會提交所有的分區(qū)位移,而不僅僅是失去分區(qū)的位移正压,但這種做法沒什么壞處欣福。

從指定位移開始消費

在此之前,我們使用poll()來從最后的提交位移開始消費焦履,但我們也可以從一個指定的位移開始消費拓劝。

如果想從分區(qū)開始端重新開始消費,那么可以使用seekToBeginning(TopicPartition tp)嘉裤;如果想從分區(qū)的最末端消費最新的消息郑临,那么可以使用seekToEnd(TopicPartition tp)。而且屑宠,Kafka還支持我們從指定位移開始消費厢洞。從指定位移開始消費的應(yīng)用場景有很多,其中最典型的一個是:位移存在其他系統(tǒng)(例如數(shù)據(jù)庫)中典奉,并且以其他系統(tǒng)的位移為準躺翻。

考慮這么個場景:我們從Kafka中讀取消費,然后進行處理卫玖,最后把結(jié)果寫入數(shù)據(jù)庫公你;我們既不想丟失消息,也不想數(shù)據(jù)庫中存在重復(fù)的消息數(shù)據(jù)假瞬。對于這樣的場景陕靠,我們可能會按如下邏輯處理:

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records)
    {
        currentOffsets.put(new TopicPartition(record.topic(), record.partition()), record.offset());
        processRecord(record);
        storeRecordInDB(record);
        consumer.commitAsync(currentOffsets);
    }
}

這個邏輯似乎沒什么問題,但是要注意到這么個事實脱茉,在持久化到數(shù)據(jù)庫成功后剪芥,提交位移到Kafka可能會失敗,那么這可能會導(dǎo)致消息會重復(fù)處理琴许。對于這種情況税肪,我們可以優(yōu)化方案,將持久化到數(shù)據(jù)庫與提交位移實現(xiàn)為原子性操作,也就是要么同時成功益兄,要么同時失敗签财。但這個是不可能的,因此我們可以在保存記錄到數(shù)據(jù)庫的同時偏塞,也保存位移,然后在消費者開始消費時使用數(shù)據(jù)庫的位移開始消費邦鲫。這個方案是可行的灸叼,我們只需要通過seek()來指定分區(qū)位移開始消費即可。下面是一個改進的樣例代碼:

public class SaveOffsetsOnRebalance implements ConsumerRebalanceListener {
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        //在消費者負責的分區(qū)被回收前提交數(shù)據(jù)庫事務(wù)庆捺,保存消費的記錄和位移
        commitDBTransaction();
    }

    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        //在開始消費前古今,從數(shù)據(jù)庫中獲取分區(qū)的位移,并使用seek()來指定開始消費的位移
        for(TopicPartition partition: partitions)
            consumer.seek(partition, getOffsetFromDB(partition));
    } 
}

    consumer.subscribe(topics, new SaveOffsetOnRebalance(consumer));
    //在subscribe()之后poll一次滔以,并從數(shù)據(jù)庫中獲取分區(qū)的位移捉腥,使用seek()來指定開始消費的位移
    consumer.poll(0);
    for (TopicPartition partition: consumer.assignment())
        consumer.seek(partition, getOffsetFromDB(partition));

    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records)
        {
            processRecord(record);
            //保存記錄結(jié)果
            storeRecordInDB(record);
            //保存位移
            storeOffsetInDB(record.topic(), record.partition(), record.offset());
        }
        //提交數(shù)據(jù)庫事務(wù),保存消費的記錄以及位移
        commitDBTransaction();
    }

具體邏輯見代碼注釋你画,此處不再贅述抵碟。另外注意的是,seek()只是指定了poll()拉取的開始位移坏匪,這并不影響在Kafka中保存的提交位移(當然我們可以在seek和poll之后提交位移覆蓋)拟逮。

package com.neuedu;

import java.util.*;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;

public class Consumer {
    private static Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
    static int count = 0;
    static KafkaConsumer<String, String> consumer;
    private class HandleRebalance implements ConsumerRebalanceListener {
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        }

        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            System.out.println("Lost partitions in rebalance.Committing current offsets:" + currentOffsets);
            consumer.commitSync(currentOffsets);
        }
    }
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers",
                "hadoop03:9092,hadoop05:9092,hadoop06:9092");// 該地址是集群的子集,用來探測集群适滓。
        props.put("group.id", "payment");// cousumer的分組id
        props.put("enable.auto.commit", "false");// 自動提交offsets
        props.put("auto.commit.interval.ms", "1000");// 每隔1s沟娱,自動提交offsets
        props.put("session.timeout.ms", "30000");// Consumer向集群發(fā)送自己的心跳团搞,超時則認為Consumer已經(jīng)死了,kafka會把它的分區(qū)分配給其他進程
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");// 反序列化器
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        consumer = new KafkaConsumer<>(props);

//        consumer.subscribe(Arrays.asList("payment"));// 訂閱的topic,可以多個
        String topic = "payment";
        TopicPartition partition0 = new TopicPartition(topic, 0);
//        TopicPartition partition1 = new TopicPartition(topic, 1);
        consumer.assign(Arrays.asList(partition0));

        Collection<TopicPartition> partitions = Arrays.asList(partition0);

        consumer.seekToBeginning(partitions);
//        consumer.seek(partition0,495);

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("賭東道賭東道賭東道賭東道賭東道賭東道 offset = %d, key = %s, value = %s, partition = %s",
                        record.offset(), record.key(), record.value(),record.partition());
                System.out.println();
                currentOffsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset()+1, "no metadata"));
                if (count % 1 == 0)
                    consumer.commitAsync(currentOffsets, null);
                count++;
            }

        }
    }
}

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市律杠,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌麻裁,老刑警劉巖断国,帶你破解...
    沈念sama閱讀 221,820評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異朽砰,居然都是意外死亡尖滚,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,648評論 3 399
  • 文/潘曉璐 我一進店門瞧柔,熙熙樓的掌柜王于貴愁眉苦臉地迎上來漆弄,“玉大人,你說我怎么就攤上這事造锅『惩伲” “怎么了?”我有些...
    開封第一講書人閱讀 168,324評論 0 360
  • 文/不壞的土叔 我叫張陵哥蔚,是天一觀的道長倒谷。 經(jīng)常有香客問我蛛蒙,道長,這世上最難降的妖魔是什么渤愁? 我笑而不...
    開封第一講書人閱讀 59,714評論 1 297
  • 正文 為了忘掉前任牵祟,我火速辦了婚禮,結(jié)果婚禮上抖格,老公的妹妹穿的比我還像新娘诺苹。我一直安慰自己,他們只是感情好雹拄,可當我...
    茶點故事閱讀 68,724評論 6 397
  • 文/花漫 我一把揭開白布收奔。 她就那樣靜靜地躺著,像睡著了一般滓玖。 火紅的嫁衣襯著肌膚如雪坪哄。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 52,328評論 1 310
  • 那天势篡,我揣著相機與錄音翩肌,去河邊找鬼。 笑死禁悠,一個胖子當著我的面吹牛摧阅,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播绷蹲,決...
    沈念sama閱讀 40,897評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼棒卷,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了祝钢?” 一聲冷哼從身側(cè)響起比规,我...
    開封第一講書人閱讀 39,804評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎拦英,沒想到半個月后蜒什,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,345評論 1 318
  • 正文 獨居荒郊野嶺守林人離奇死亡疤估,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,431評論 3 340
  • 正文 我和宋清朗相戀三年灾常,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片铃拇。...
    茶點故事閱讀 40,561評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡钞瀑,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出慷荔,到底是詐尸還是另有隱情雕什,我是刑警寧澤,帶...
    沈念sama閱讀 36,238評論 5 350
  • 正文 年R本政府宣布,位于F島的核電站贷岸,受9級特大地震影響壹士,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜偿警,卻給世界環(huán)境...
    茶點故事閱讀 41,928評論 3 334
  • 文/蒙蒙 一躏救、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧螟蒸,春花似錦落剪、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,417評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽呢堰。三九已至抄瑟,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間枉疼,已是汗流浹背皮假。 一陣腳步聲響...
    開封第一講書人閱讀 33,528評論 1 272
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留骂维,地道東北人惹资。 一個月前我還...
    沈念sama閱讀 48,983評論 3 376
  • 正文 我出身青樓,卻偏偏與公主長得像航闺,于是被迫代替她去往敵國和親褪测。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 45,573評論 2 359

推薦閱讀更多精彩內(nèi)容