Kafka Consumer Client API Explained (Part 1): The Basics

Maven dependency
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.0.0</version>
</dependency>
The simplest example
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class DemoConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "10.20.0.139:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("foo", "bar"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100); // wait up to 100 ms for records
            for(ConsumerRecord<String, String> record : records){
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
        }
    }
}

Program walkthrough

  • Configuration options
    • bootstrap.servers: any single broker in the Kafka cluster will do, as long as the client can reach it
    • group.id: the consumer group. Every consumer must belong to a group. Each group keeps its own offsets for the same topic, so groups do not affect one another; consumers within the same group split the messages between them and do not receive duplicates.
    • enable.auto.commit: offsets are committed automatically at the configured interval. Under this policy, a message counts as successfully consumed as soon as it has been read from the topic and its offset auto-committed, regardless of whether the client actually processed it correctly, so the client must design its own recovery strategy for failed processing to avoid losing messages.
    • auto.commit.interval.ms: how often offsets are auto-committed
    • key.deserializer: deserializer class for message keys; here, keys are parsed as strings
    • value.deserializer: deserializer class for message values; here, values are parsed as strings
  • Create the consumer from the configuration
  • Subscribe the consumer to one or more topics
  • Loop forever over the following tasks:
    • Poll the subscribed topics for data from the last committed offset (inclusive) up to the latest
    • Process the records returned
Extension 1: disable automatic offset commits and commit manually once messages are processed
public class ManualConsumerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "10.20.0.139:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "false");                                 // change 1
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("foo", "bar"));
        final int minBatchSize = 200;                                             // change 2
        List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.key() + ", " + record.value());
                buffer.add(record);
            }
            if (buffer.size() >= minBatchSize) {
                // process the whole batch here (e.g. write it to a database), then commit
                consumer.commitSync();                                            // change 3
                buffer.clear();
            }
        }
    }
}

Program walkthrough

  • Change 1: switch offset committing from automatic to manual
  • Change 2: to illustrate that processing a batch of messages takes time, records are buffered until the buffer reaches a given size and then handled in one go, for example written to a database
  • Change 3: commit the offsets manually
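The buffer-then-flush pattern of changes 2 and 3 can be sketched without a broker. The class name and record counts below are made up for illustration; `minBatchSize = 200` matches the demo above:

```java
import java.util.ArrayList;
import java.util.List;

public class BatchFlushSketch {
    // Counts how many times the buffer above would be flushed (i.e. how many
    // times the real code would persist the batch and call commitSync())
    // for a given number of polled records.
    static int flushes(int recordCount, int minBatchSize) {
        List<Integer> buffer = new ArrayList<>();
        int flushes = 0;
        for (int i = 0; i < recordCount; i++) {
            buffer.add(i);
            if (buffer.size() >= minBatchSize) {
                flushes++;          // stand-in for: persist batch, then commitSync()
                buffer.clear();
            }
        }
        return flushes;
    }

    public static void main(String[] args) {
        // 450 records with minBatchSize = 200 (the value used in the demo):
        // two full batches are flushed, 50 records stay buffered and uncommitted.
        System.out.println(flushes(450, 200)); // prints 2
    }
}
```

Note the trade-off this makes visible: records left in the buffer when the process dies were read but never committed, so they will be delivered again, which is exactly the at-least-once behavior manual commits are chosen for.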
Extension 2: set the committed offset to an arbitrary value instead of simply committing the latest
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class ResetOffsetDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "10.20.0.139:9092");
        props.put("group.id", "test2");                            // change 1
        props.put("auto.offset.reset", "earliest");                // change 2
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("foo", "bar"));
        while (true) {                                             // change 3
            // fetch everything from the last committed offset up to the latest
            ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
            // iterate over the partitions represented in this batch
            for (TopicPartition partition : records.partitions()) {
                // the records fetched from this partition
                List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                for (ConsumerRecord<String, String> record : partitionRecords) {
                    // print the offset and value of each record from this partition
                    System.out.println(record.offset() + ", " + record.value());
                }
                // offset of the last record this consumer group received from the partition
                long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                // commit the next position to consume for this partition
                consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
            }
        }
    }
}

Program walkthrough

  • Change 1: the consumer group is changed to make the test easier to follow. As noted earlier, the group is the unit of consumption, so switching to a new group amounts to re-consuming the topic from scratch. However, on its first request the new group has no committed offset yet, and the default auto.offset.reset policy is latest: a brand-new group starts consuming from the current end of the log and therefore misses all the historical data. To avoid that, the policy is overridden in change 2.
  • Change 2: set the new group's initial-offset policy to earliest, i.e. start from the beginning of the log
  • Change 3: see the comments in the code. This requires understanding Kafka's design. Every topic has one or more partitions, which raises parallelism and avoids a single-machine bottleneck. When sending to a topic, a producer can use the default strategy, in which case Kafka balances messages across partitions automatically, or it can pick the partition itself by some rule (for example, hashing certain fields of the message). Partitions are independent of one another; each partition maintains a separate offset per consumer group, and each has a replication factor to avoid a single point of failure. The committed offset is always the position of the next message to consume.
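That last point, that a committed offset names the next record to read rather than the last one processed, is why the demo commits lastOffset + 1. A broker-free sketch (hypothetical class name, made-up log contents):

```java
import java.util.Arrays;
import java.util.List;

public class CommitPositionSketch {
    // A committed offset is the position of the NEXT record to consume, which
    // is why the demo commits new OffsetAndMetadata(lastOffset + 1).
    static long offsetToCommit(long lastProcessedOffset) {
        return lastProcessedOffset + 1;
    }

    public static void main(String[] args) {
        List<String> partitionLog = Arrays.asList("m0", "m1", "m2", "m3");
        long lastProcessed = 1;                          // m0 and m1 were handled
        long committed = offsetToCommit(lastProcessed);  // commit 2, not 1
        // after a restart, consumption resumes at the committed position
        System.out.println(partitionLog.get((int) committed)); // prints m2
    }
}
```

Committing lastOffset itself would cause the last processed record to be delivered again after a restart.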
Extension 3: subscribe to specific partitions instead of a topic
public class PartitionAssignDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "10.20.0.139:9092");
        props.put("group.id", "test3");
        props.put("auto.offset.reset", "earliest");
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        String topic = "foo";
        TopicPartition partition0 = new TopicPartition(topic, 0);
        //TopicPartition partition1 = new TopicPartition(topic, 1);
        consumer.assign(Arrays.asList(partition0));
        while (true){
            ConsumerRecords<String, String> records = consumer.poll(2000);
            for(ConsumerRecord<String, String> record : records){
                System.out.printf("offset = %d, partition = %d, value = %s%n", record.offset(), record.partition(), record.value());
            }
        }
    }
}

The topic "foo" here has only one partition, with id 0, so if a nonexistent partition is assigned to the consumer, poll will block.

Program walkthrough

  • Build a TopicPartition from the topic name and partition number; the partition must actually exist
  • Assign the consumer the specific partitions it should listen on
  • Subscribing this way does not take part in group balancing: the topic's partitions are neither distributed evenly across the group nor rebalanced
Extension 4: what happens when a consumer listens on only some partitions of a topic

Create a new topic "muti_part" with 2 partitions, then send it a few messages, making sure both partitions hold data

# create the topic
kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic muti_part
# send messages
kafka-console-producer.sh --broker-list localhost:9092 --topic muti_part
>abc
>def
>ghi
>ijk
>jkl
>lmn
>123
>

Reading the messages

public class PartitionAssignDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "10.20.0.139:9092");
        props.put("group.id", "test");
        props.put("auto.offset.reset", "earliest");
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        String topic = "muti_part";
        TopicPartition partition0 = new TopicPartition(topic, 0);
        //TopicPartition partition1 = new TopicPartition(topic, 1);
        consumer.assign(Arrays.asList(partition0));
        while (true){
            ConsumerRecords<String, String> records = consumer.poll(100);
            for(ConsumerRecord<String, String> record : records){
                System.out.printf("offset = %d, partition = %d, value = %s%n", record.offset(), record.partition(), record.value());
            }
        }
    }
}

The output:

15:46:17,064 [ main ] [ INFO ]:109 - Kafka version : 1.0.0
15:46:17,064 [ main ] [ INFO ]:110 - Kafka commitId : aaa7af6d4a11b29d
15:46:17,172 [ main ] [ INFO ]:341 - [Consumer clientId=consumer-1, groupId=test] Discovered coordinator 10.20.0.139:9092 (id: 2147483647 rack: null)
offset = 0, partition = 0, value = def
offset = 1, partition = 0, value = ijk
offset = 2, partition = 0, value = lmn

Conclusion:

Kafka spreads messages evenly across the partitions, and this consumer only receives the messages in its assigned partition.

Extension 5: multiple consumers reading in parallel

Case 1: the topic has only one partition, using topic "foo" as the example

public class MultiConsumerSinglePartition {
    static class MyConsumer implements Runnable{
        KafkaConsumer<String, String> consumer;
        int buffer;
        String name;

        MyConsumer(KafkaConsumer<String, String> consumer, String name){
            this.consumer = consumer;
            this.buffer = 0;
            this.name = name;
        }

        @Override
        public void run() {
            while (true){
                ConsumerRecords<String, String> records = consumer.poll(100);
                for(ConsumerRecord<String, String> record : records){
                    System.out.printf("name = %s, partition = %d, offset = %d, key = %s, value = %s%n", name,
                            record.partition(), record.offset(), record.key(), record.value());
                    buffer ++;
                }
                if(buffer >= 5){
                    consumer.commitSync();
                    buffer = 0;
                    System.out.println(name  + " commit");
                }
            }
        }
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "10.20.0.139:9092");
        props.put("group.id", "mcsp");
        props.put("enable.auto.commit", "false");
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("foo"));
        new Thread(new MyConsumer(consumer, "consumer1")).start();
    }
}

Program notes

  • The consumer commits offsets once it has accumulated 5 messages. It runs in a separate thread to make it easier to test what happens when a thread later disconnects.

The output:

16:30:51,033 [ Thread-1 ] [ INFO ]:341 - [Consumer clientId=consumer-1, groupId=mcsp3] Discovered coordinator 10.20.0.139:9092 (id: 2147483647 rack: null)
16:30:51,036 [ Thread-1 ] [ INFO ]:341 - [Consumer clientId=consumer-1, groupId=mcsp3] Revoking previously assigned partitions []
16:30:51,036 [ Thread-1 ] [ INFO ]:336 - [Consumer clientId=consumer-1, groupId=mcsp3] (Re-)joining group
16:30:51,049 [ Thread-1 ] [ INFO ]:341 - [Consumer clientId=consumer-1, groupId=mcsp3] Successfully joined group with generation 1
16:30:51,050 [ Thread-1 ] [ INFO ]:341 - [Consumer clientId=consumer-1, groupId=mcsp3] Setting newly assigned partitions [foo-0]
name = consumer1, partition = 0, offset = 0, key = null, value = hello
name = consumer1, partition = 0, offset = 1, key = null, value = baby
name = consumer1, partition = 0, offset = 2, key = null, value = so
...
name = consumer1, partition = 0, offset = 16, key = null, value = 8
consumer1 commit

As shown, a group assignment took place: partition 0 was assigned to consumer1, which read all the messages and then updated the offset. Now open another terminal, change name to consumer2, and start a second process. The output:

Output on consumer1:
16:31:18,052 [ Thread-1 ] [ INFO ]:341 - [Consumer clientId=consumer-1, groupId=mcsp3] Revoking previously assigned partitions [foo-0]
16:31:18,053 [ Thread-1 ] [ INFO ]:336 - [Consumer clientId=consumer-1, groupId=mcsp3] (Re-)joining group
16:31:18,068 [ Thread-1 ] [ INFO ]:341 - [Consumer clientId=consumer-1, groupId=mcsp3] Successfully joined group with generation 2
16:31:18,069 [ Thread-1 ] [ INFO ]:341 - [Consumer clientId=consumer-1, groupId=mcsp3] Setting newly assigned partitions [foo-0]

Output on consumer2:
16:31:15,567 [ main ] [ INFO ]:109 - Kafka version : 1.0.0
16:31:15,567 [ main ] [ INFO ]:110 - Kafka commitId : aaa7af6d4a11b29d
16:31:15,669 [ Thread-1 ] [ INFO ]:341 - [Consumer clientId=consumer-1, groupId=mcsp3] Discovered coordinator 10.20.0.139:9092 (id: 2147483647 rack: null)
16:31:15,672 [ Thread-1 ] [ INFO ]:341 - [Consumer clientId=consumer-1, groupId=mcsp3] Revoking previously assigned partitions []
16:31:15,672 [ Thread-1 ] [ INFO ]:336 - [Consumer clientId=consumer-1, groupId=mcsp3] (Re-)joining group
16:31:18,066 [ Thread-1 ] [ INFO ]:341 - [Consumer clientId=consumer-1, groupId=mcsp3] Successfully joined group with generation 2
16:31:18,069 [ Thread-1 ] [ INFO ]:341 - [Consumer clientId=consumer-1, groupId=mcsp3] Setting newly assigned partitions []

As shown, consumer2's joining caused the group's partition balance to be recomputed. Now send 2 messages to foo. The result: only consumer1 produces output and consumer2 prints nothing, i.e. partition 0 was assigned to consumer1. Since there are only 2 messages, consumer1 has not committed the offset. Now kill the consumer1 process, and consumer2 prints:

16:32:27,074 [ Thread-1 ] [ INFO ]:341 - [Consumer clientId=consumer-1, groupId=mcsp3] Revoking previously assigned partitions []
16:32:27,074 [ Thread-1 ] [ INFO ]:336 - [Consumer clientId=consumer-1, groupId=mcsp3] (Re-)joining group
16:32:27,081 [ Thread-1 ] [ INFO ]:341 - [Consumer clientId=consumer-1, groupId=mcsp3] Successfully joined group with generation 3
16:32:27,082 [ Thread-1 ] [ INFO ]:341 - [Consumer clientId=consumer-1, groupId=mcsp3] Setting newly assigned partitions [foo-0]
name = consumer2, partition = 0, offset = 17, key = null, value = a
name = consumer2, partition = 0, offset = 18, key = null, value = b

Sure enough, the group was rebalanced once again and consumer2 printed the 2 messages just sent. This exposes a problem: because consumer1 shut down abnormally without committing the latest offset, those 2 messages were consumed twice. The fix is shown in Extension 6: register a ConsumerRebalanceListener when subscribing to the topic.
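This failure mode can be simulated without a broker. The class below is hypothetical; the offsets mirror the log output above. Both consumers start from the same committed offset because consumer1 died before committing, so the same records are delivered twice:

```java
import java.util.ArrayList;
import java.util.List;

public class DuplicateDeliverySketch {
    // Offsets a consumer receives when it starts from the group's committed
    // offset and reads up to the end of the partition.
    static List<Long> poll(long committedOffset, long logEndOffset) {
        List<Long> delivered = new ArrayList<>();
        for (long o = committedOffset; o < logEndOffset; o++) {
            delivered.add(o);
        }
        return delivered;
    }

    public static void main(String[] args) {
        long committed = 17;                        // consumer1 read 17 and 18 but never committed
        List<Long> consumer1 = poll(committed, 19); // consumer1 reads [17, 18], then crashes
        // the rebalance hands the partition to consumer2, which also starts
        // from offset 17 because nothing newer was ever committed
        List<Long> consumer2 = poll(committed, 19);
        System.out.println(consumer1.equals(consumer2)); // prints true: duplicate delivery
    }
}
```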

Case 2: the topic has multiple partitions, using topic "muti_part" as the example

The code and procedure are the same as above; just change the topic name to muti_part

After the first consumer has drained the topic, a new consumer joining triggers a rebalance. Sending messages to the topic afterwards produces:

consumer1:
17:12:32,879 [ Thread-1 ] [ INFO ]:341 - [Consumer clientId=consumer-1, groupId=mcsp] Revoking previously assigned partitions [muti_part-0, muti_part-1]
17:12:32,879 [ Thread-1 ] [ INFO ]:336 - [Consumer clientId=consumer-1, groupId=mcsp] (Re-)joining group
17:12:32,890 [ Thread-1 ] [ INFO ]:341 - [Consumer clientId=consumer-1, groupId=mcsp] Successfully joined group with generation 4
17:12:32,892 [ Thread-1 ] [ INFO ]:341 - [Consumer clientId=consumer-1, groupId=mcsp] Setting newly assigned partitions [muti_part-0]
name = consumer1, partition = 0, offset = 3, key = null, value = b
name = consumer1, partition = 0, offset = 4, key = null, value = d
name = consumer1, partition = 0, offset = 5, key = null, value = f
name = consumer1, partition = 0, offset = 6, key = null, value = h
  
consumer2:
name = consumer2, partition = 1, offset = 4, key = null, value = a
name = consumer2, partition = 1, offset = 5, key = null, value = c
name = consumer2, partition = 1, offset = 6, key = null, value = e
name = consumer2, partition = 1, offset = 7, key = null, value = g

As shown, each consumer is responsible for one partition
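The one-partition-per-consumer split follows from how partitions are divided among group members. Below is a simplified, hypothetical sketch of an even, contiguous (range-style) assignment; it is not Kafka's actual assignor code:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RangeAssignSketch {
    // Splits partitions 0..numPartitions-1 into contiguous, as-even-as-possible
    // ranges, one range per consumer; earlier consumers absorb any remainder.
    static Map<String, List<Integer>> assign(List<String> consumers, int numPartitions) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        int base = numPartitions / consumers.size();
        int extra = numPartitions % consumers.size();
        int next = 0;
        for (int i = 0; i < consumers.size(); i++) {
            int count = base + (i < extra ? 1 : 0);
            List<Integer> parts = new ArrayList<>();
            for (int j = 0; j < count; j++) {
                parts.add(next++);
            }
            result.put(consumers.get(i), parts);
        }
        return result;
    }

    public static void main(String[] args) {
        // two consumers over the two partitions of muti_part
        System.out.println(assign(Arrays.asList("consumer1", "consumer2"), 2));
        // prints {consumer1=[0], consumer2=[1]}
    }
}
```

The same logic explains case 1: with one partition and two consumers, one consumer gets the partition and the other gets nothing.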

Extension 6: a rebalance triggered by a new consumer joining the group before offsets are committed causes messages to be consumed twice; add a ConsumerRebalanceListener
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class MultiConsumerSinglePartition {
    static class MyConsumer implements Runnable{
        KafkaConsumer<String, String> consumer;
        int buffer;
        String name;

        MyConsumer(KafkaConsumer<String, String> consumer, String name){
            this.consumer = consumer;
            this.buffer = 0;
            this.name = name;
        }

        @Override
        public void run() {
            while (true){
                ConsumerRecords<String, String> records = consumer.poll(100);
                for(ConsumerRecord<String, String> record : records){
                    System.out.printf("name = %s, partition = %d, offset = %d, key = %s, value = %s%n", name,
                            record.partition(), record.offset(), record.key(), record.value());
                    buffer ++;
                }
                if(buffer >= 5){
                    consumer.commitSync();
                    buffer = 0;
                    System.out.println(name  + " commit");
                }
            }
        }
    }


    static class MyListener implements ConsumerRebalanceListener{

        KafkaConsumer<String, String> consumer;
        String name;

        MyListener(KafkaConsumer<String, String> consumer, String name) {
            this.consumer = consumer;
            this.name = name;
        }

        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            Map<TopicPartition, OffsetAndMetadata> map = new HashMap<>();
            for(TopicPartition partition : partitions){
                System.out.println("revoke " + name + " from partition " + partition.partition());
                System.out.println("commit partition " + partition.partition() + " offset " +consumer.position(partition));
                map.put(partition, new OffsetAndMetadata(consumer.position(partition)));
            }
            consumer.commitSync(map);
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            for(TopicPartition partition : partitions){
                System.out.println("assign partition " + partition.partition() + " to " + name);
            }
        }
    }


    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "47.100.49.129:9092");
        props.put("group.id", "mcsp2");
        props.put("enable.auto.commit", "false");
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        String name = "consumer2";
        consumer.subscribe(Arrays.asList("muti_part"), new MyListener(consumer, name));
        new Thread(new MyConsumer(consumer, name)).start();
    }
}

Program notes

The custom MyListener class implements the ConsumerRebalanceListener interface. Its onPartitionsRevoked method defines what to do when partitions are about to be taken away from this consumer; here it commits the latest offset of every revoked partition, so that no messages are re-consumed once the rebalance completes.

First start one consumer, then start a second one. The first consumer prints:

22:26:49,555 [ Thread-1 ] [ INFO ]:341 - [Consumer clientId=consumer-1, groupId=mcsp2] Discovered coordinator 47.100.49.129:9092 (id: 2147483647 rack: null)
22:26:49,565 [ Thread-1 ] [ INFO ]:341 - [Consumer clientId=consumer-1, groupId=mcsp2] Revoking previously assigned partitions []
22:26:49,565 [ Thread-1 ] [ INFO ]:336 - [Consumer clientId=consumer-1, groupId=mcsp2] (Re-)joining group
assign partition 0 to consumer1
assign partition 1 to consumer1
22:26:49,645 [ Thread-1 ] [ INFO ]:341 - [Consumer clientId=consumer-1, groupId=mcsp2] Successfully joined group with generation 3
22:26:49,645 [ Thread-1 ] [ INFO ]:341 - [Consumer clientId=consumer-1, groupId=mcsp2] Setting newly assigned partitions [muti_part-0, muti_part-1]
name = consumer1, partition = 0, offset = 0, key = null, value = 2
name = consumer1, partition = 1, offset = 0, key = null, value = 1
name = consumer1, partition = 1, offset = 1, key = null, value = 3
revoke consumer1 from partition 0
commit partition 0 offset 1
22:27:49,735 [ Thread-1 ] [ INFO ]:341 - [Consumer clientId=consumer-1, groupId=mcsp2] Revoking previously assigned partitions [muti_part-0, muti_part-1]
revoke consumer1 from partition 1
commit partition 1 offset 2
22:27:49,765 [ Thread-1 ] [ INFO ]:336 - [Consumer clientId=consumer-1, groupId=mcsp2] (Re-)joining group
22:27:49,795 [ Thread-1 ] [ INFO ]:341 - [Consumer clientId=consumer-1, groupId=mcsp2] Successfully joined group with generation 4
assign partition 0 to consumer1
22:27:49,795 [ Thread-1 ] [ INFO ]:341 - [Consumer clientId=consumer-1, groupId=mcsp2] Setting newly assigned partitions [muti_part-0]

As shown, before the second consumer started, both partitions were assigned to consumer1, which read 3 messages without committing

consumer2 prints:

22:27:48,488 [ Thread-1 ] [ INFO ]:341 - [Consumer clientId=consumer-1, groupId=mcsp2] Discovered coordinator 47.100.49.129:9092 (id: 2147483647 rack: null)
22:27:48,488 [ Thread-1 ] [ INFO ]:341 - [Consumer clientId=consumer-1, groupId=mcsp2] Revoking previously assigned partitions []
22:27:48,488 [ Thread-1 ] [ INFO ]:336 - [Consumer clientId=consumer-1, groupId=mcsp2] (Re-)joining group
assign partition 1 to consumer2
22:27:49,795 [ Thread-1 ] [ INFO ]:341 - [Consumer clientId=consumer-1, groupId=mcsp2] Successfully joined group with generation 4
22:27:49,795 [ Thread-1 ] [ INFO ]:341 - [Consumer clientId=consumer-1, groupId=mcsp2] Setting newly assigned partitions [muti_part-1]

As shown, when consumer2 starts, Kafka revokes all the partitions and reassigns them. The revocation triggers consumer1's listener, which commits the latest offsets, so consumer2 does not read any data twice.

Note: this technique only helps when a new consumer joins the group. If a consumer disconnects abnormally, its offsets are still left uncommitted. To guarantee no duplicate consumption when a consumer dies, listen by explicit partition assignment and store the offsets yourself; the principle is to keep Kafka from rebalancing at all.
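Storing offsets outside Kafka can be as simple as a map keyed by topic and partition. Below is a minimal in-memory sketch (hypothetical class); a real implementation would persist the map to a file or database, and after assign() would call consumer.seek() with the loaded value:

```java
import java.util.HashMap;
import java.util.Map;

public class ExternalOffsetStore {
    // In-memory stand-in for durable storage, keyed by "topic-partition".
    private final Map<String, Long> store = new HashMap<>();

    // Record the offset of the next message to process for this partition.
    void save(String topic, int partition, long nextOffset) {
        store.put(topic + "-" + partition, nextOffset);
    }

    // Where to resume; 0 (the start of the partition) if nothing was saved.
    long load(String topic, int partition) {
        return store.getOrDefault(topic + "-" + partition, 0L);
    }

    public static void main(String[] args) {
        ExternalOffsetStore offsets = new ExternalOffsetStore();
        offsets.save("foo", 0, 19);                 // processed records up to offset 18
        // on restart: assign() the partition, then seek() to the loaded position
        System.out.println(offsets.load("foo", 0)); // prints 19
    }
}
```

If the save happens in the same transaction as the processing of the record (e.g. both written to one database), this gives exactly-once processing even across crashes, which is precisely what group-managed offsets cannot guarantee here.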

Extension 7: moving the offset to an arbitrary position

The relevant methods

public void seek(TopicPartition partition, long offset);
public void seekToBeginning(Collection<TopicPartition> partitions);
public void seekToEnd(Collection<TopicPartition> partitions);

Demo program

public class ManualSeekDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "47.100.49.129:9092");
        props.put("group.id", "mcsp5");
        props.put("enable.auto.commit", "false");
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("foo"));
        int buffer = 0;
        long lastOffset = 0;
        int part = 0;
        while (true){
            ConsumerRecords<String, String> records = consumer.poll(100);
            for(ConsumerRecord<String, String> record : records){
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                buffer ++;
                lastOffset = record.offset();
                part = record.partition();
            }
            if(buffer >= 3){
                buffer = 0;
                System.out.println("seek to " + (lastOffset - 1));
                consumer.seek(new TopicPartition("foo", part), (lastOffset - 1));
            }
        }
    }
}

Program notes

  • The program is written to rewind 2 records after every 3 it reads
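The net effect of "read 3, rewind 2" is that the consumer's position advances by only one offset per cycle. A broker-free sketch of the arithmetic (hypothetical class):

```java
public class SeekRewindSketch {
    // One cycle of the demo: read 3 records starting at startOffset, then
    // seek to lastOffset - 1, so the last two records are re-delivered.
    static long positionAfterCycle(long startOffset) {
        long lastOffset = startOffset + 2; // offsets read: start, start+1, start+2
        return lastOffset - 1;             // the seek target
    }

    public static void main(String[] args) {
        long pos = 0;
        for (int cycle = 1; cycle <= 3; cycle++) {
            pos = positionAfterCycle(pos);
        }
        // net progress is one offset per cycle despite reading three each time
        System.out.println(pos); // prints 3
    }
}
```

Note that seek() only moves the in-memory fetch position; unless the offset is also committed, a restart still resumes from the last committed offset.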