Maven dependency
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.0.0</version>
</dependency>
The simplest example
public class DemoConsumer {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "10.20.0.139:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true){
ConsumerRecords<String, String> records = consumer.poll(100); // wait up to 100 ms; poll(0) returns immediately and may come back empty
for(ConsumerRecord<String, String> record : records){
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
}
}
Explanation
- Configuration options
- bootstrap.servers: any single broker in the Kafka cluster is sufficient, as long as the client can reach it
- group.id: the consumer group. Every consumer must belong to a group. Each group keeps its own offset for the same topic, so different groups do not affect one another, while consumers inside the same group split the messages between them and never receive the same message twice
- enable.auto.commit: commit the offset automatically at a fixed interval. Under this policy, a message counts as successfully consumed as soon as it has been read from the topic and its offset auto-committed, regardless of whether the client actually processed it correctly, so the client must design its own failure handling to avoid losing messages
- auto.commit.interval.ms: the interval for automatic offset commits
- key.deserializer: deserializer class for the message key; here the key is decoded as a string
- value.deserializer: deserializer class for the message value; here the value is decoded as a string
- Create the consumer from the configuration
- Subscribe the consumer to one or more topics
- Loop forever over the following:
- Pull everything from the subscribed topics between the last committed offset (inclusive) and the latest message
- Process the fetched records
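A side note: the raw string keys above are easy to mistype. kafka-clients ships constants for them, so the same configuration can also be written as below (a sketch assuming kafka-clients 1.0.0 on the classpath, with the broker address from the example):

```java
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

// Same settings as the example above, but using ConsumerConfig constants
// instead of raw strings, so a typo fails at compile time.
public class ConfigSketch {
    public static Properties consumerProps() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.20.0.139:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        return props;
    }
}
```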
Extension 1: disable automatic offset commits and commit manually once messages have been processed
public class ManualConsumerDemo {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "10.20.0.139:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "false"); // change 1
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
final int minBatchSize = 200; // change 2
List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.key() + ", " + record.value());
buffer.add(record);
}
if (buffer.size() >= minBatchSize) {
consumer.commitSync(); // change 3
buffer.clear();
}
}
}
}
Explanation
- Change 1: switch the offset commit from automatic to manual
- Change 2: to illustrate that processing a group of messages takes time, records are collected in a buffer; once the buffer reaches the target size, the whole batch is processed in one go, for example written to a database
- Change 3: commit the offset manually, only after the batch has been handled
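The buffer-then-commit flow above can be looked at in isolation from the Kafka API. The sketch below is a plain-Java model of that pattern (BatchBuffer and flush are illustrative names, not part of kafka-clients); flush() stands in for "write the batch to the database, then consumer.commitSync()":

```java
import java.util.ArrayList;
import java.util.List;

// Minimal model of the buffer-then-commit pattern: records accumulate until
// minBatchSize is reached, then the whole batch is handled at once.
public class BatchBuffer {
    private final int minBatchSize;
    private final List<String> buffer = new ArrayList<>();
    private int flushCount = 0;

    public BatchBuffer(int minBatchSize) {
        this.minBatchSize = minBatchSize;
    }

    // Called once per polled record.
    public void add(String record) {
        buffer.add(record);
        if (buffer.size() >= minBatchSize) {
            flush();
        }
    }

    private void flush() {
        // In the real consumer this is where the batch is written to the
        // database and consumer.commitSync() is called.
        flushCount++;
        buffer.clear();
    }

    public int getFlushCount() { return flushCount; }
    public int getBuffered()   { return buffer.size(); }
}
```

Keeping the commit inside flush() ties the committed offset to durably processed data, which is the whole point of disabling auto-commit.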
Extension 2: commit an arbitrary offset value instead of simply the latest one
public class ResetOffsetDemo {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "10.20.0.139:9092");
props.put("group.id", "test2"); // change 1
props.put("auto.offset.reset", "earliest"); // change 2
props.put("enable.auto.commit", "false");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true){ // change 3
// fetch all records from the subscribed topics, from the last committed offset up to the latest
ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
// iterate over the partitions these records came from
for(TopicPartition partition : records.partitions()){
// records fetched from this particular partition
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
for(ConsumerRecord<String, String> record : partitionRecords){
// print the offset and value of each record from this partition
System.out.println(record.offset() + ", " + record.value());
}
// offset of the last record this consumer group received on this partition
long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
// commit the next offset to consume (last processed + 1)
consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
}
}
}
}
}
Explanation
- Change 1: the consumer group is changed to make the test easier to follow. As noted earlier, the consumer group is the unit of consumption, so switching groups effectively re-consumes the topic. However, a brand-new group has no committed offset on its first read, and the official default for auto.offset.reset is latest: when a new group requests messages from a topic, its offset is set to the end of the log, so all historical data is skipped. To avoid that, the reset policy is changed, which is change 2
- Change 2: set the offset-reset policy for a group's first read of a topic to earliest, i.e. start from the beginning
- Change 3: see the comments in the program. This requires understanding Kafka's design: every topic has one or more partitions, which raises throughput and avoids a single-machine bottleneck. When producing to a topic, the sender can use the default strategy, in which case Kafka spreads messages across the partitions itself, or it can pick the target partition with its own strategy (for example by hashing some field of the message). Partitions are independent of each other; each partition maintains one offset per consumer group, and each partition has a replication factor to avoid a single point of failure. The committed offset is always the position of the next message to consume
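The "hash a field to pick a partition" strategy mentioned above can be sketched without a broker. Note this is only an illustration: the actual default partitioner in kafka-clients hashes the serialized key with murmur2, while this sketch uses String.hashCode():

```java
// Illustrative key-based partition selection: the same key always lands on
// the same partition, so per-key ordering is preserved.
public class PartitionChooser {
    public static int choosePartition(String key, int numPartitions) {
        // mask the sign bit so the result is always non-negative
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }
}
```

Because a key maps deterministically to one partition, all messages sharing that key are read in order by whichever consumer owns the partition.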
Extension 3: subscribe to specific partitions instead of a whole topic
public class PartitionAssignDemo {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "10.20.0.139:9092");
props.put("group.id", "test3");
props.put("auto.offset.reset", "earliest");
props.put("enable.auto.commit", "false");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
String topic = "foo";
TopicPartition partition0 = new TopicPartition(topic, 0);
//TopicPartition partition1 = new TopicPartition(topic, 1);
consumer.assign(Arrays.asList(partition0));
while (true){
ConsumerRecords<String, String> records = consumer.poll(2000);
for(ConsumerRecord<String, String> record : records){
System.out.printf("offset = %d, partition = %d, value = %s%n", record.offset(), record.partition(), record.value());
}
}
}
}
The topic "foo" here has only one partition, with id 0, so assigning a partition that does not exist to the consumer will make poll() block.
Explanation
- Build a TopicPartition from the topic name and partition number; the partition must actually exist
- Assign the consumer the specific partitions it should read
- Subscribing this way does not take part in the group's automatic partition assignment or rebalancing
Extension 4: what happens when a consumer listens to only some partitions of a topic
Create a new topic "muti_part" with 2 partitions and send a few messages to it, making sure both partitions contain data:
# create the topic
kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic muti_part
# send messages
kafka-console-producer.sh --broker-list localhost:9092 --topic muti_part
>abc
>def
>ghi
>ijk
>jkl
>lmn
>123
>
Reading the messages
public class PartitionAssignDemo {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "10.20.0.139:9092");
props.put("group.id", "test");
props.put("auto.offset.reset", "earliest");
props.put("enable.auto.commit", "false");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
String topic = "muti_part";
TopicPartition partition0 = new TopicPartition(topic, 0);
//TopicPartition partition1 = new TopicPartition(topic, 1);
consumer.assign(Arrays.asList(partition0));
while (true){
ConsumerRecords<String, String> records = consumer.poll(100);
for(ConsumerRecord<String, String> record : records){
System.out.printf("offset = %d, partition = %d, value = %s%n", record.offset(), record.partition(), record.value());
}
}
}
}
The output is as follows:
15:46:17,064 [ main ] [ INFO ]:109 - Kafka version : 1.0.0
15:46:17,064 [ main ] [ INFO ]:110 - Kafka commitId : aaa7af6d4a11b29d
15:46:17,172 [ main ] [ INFO ]:341 - [Consumer clientId=consumer-1, groupId=test] Discovered coordinator 10.20.0.139:9092 (id: 2147483647 rack: null)
offset = 0, partition = 0, value = def
offset = 1, partition = 0, value = ijk
offset = 2, partition = 0, value = lmn
Conclusion:
Kafka spreads the messages evenly across the partitions, and this consumer only receives messages from the partition it was assigned
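The even spread can be pictured as round-robin placement, which is roughly how keyless messages get distributed (a simplified sketch, not the actual kafka-clients partitioner):

```java
import java.util.ArrayList;
import java.util.List;

// Round-robin assignment of keyless messages: message i goes to partition
// i % numPartitions, so the partitions stay evenly loaded.
public class RoundRobinSketch {
    public static List<Integer> assign(int messageCount, int numPartitions) {
        List<Integer> partitions = new ArrayList<>();
        for (int i = 0; i < messageCount; i++) {
            partitions.add(i % numPartitions);
        }
        return partitions;
    }
}
```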
Extension 5: multiple consumers reading in parallel
Case 1: the topic has only one partition, using topic "foo" as the example
public class MultiConsumerSinglePartition {
static class MyConsumer implements Runnable{
KafkaConsumer<String, String> consumer;
int buffer;
String name;
MyConsumer(KafkaConsumer<String, String> consumer, String name){
this.consumer = consumer;
this.buffer = 0;
this.name = name;
}
@Override
public void run() {
while (true){
ConsumerRecords<String, String> records = consumer.poll(100);
for(ConsumerRecord<String, String> record : records){
System.out.printf("name = %s, partition = %d, offset = %d, key = %s, value = %s%n", name,
record.partition(), record.offset(), record.key(), record.value());
buffer ++;
}
if(buffer >= 5){
consumer.commitSync();
buffer = 0;
System.out.println(name + " commit");
}
}
}
}
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "10.20.0.139:9092");
props.put("group.id", "mcsp");
props.put("enable.auto.commit", "false");
props.put("auto.offset.reset", "earliest");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo"));
new Thread(new MyConsumer(consumer, "consumer1")).start();
}
}
Explanation
- The consumer commits the offset after every 5 accumulated messages; it runs in a thread to make it easier to test what happens when a consumer later disconnects
The output is as follows:
16:30:51,033 [ Thread-1 ] [ INFO ]:341 - [Consumer clientId=consumer-1, groupId=mcsp3] Discovered coordinator 10.20.0.139:9092 (id: 2147483647 rack: null)
16:30:51,036 [ Thread-1 ] [ INFO ]:341 - [Consumer clientId=consumer-1, groupId=mcsp3] Revoking previously assigned partitions []
16:30:51,036 [ Thread-1 ] [ INFO ]:336 - [Consumer clientId=consumer-1, groupId=mcsp3] (Re-)joining group
16:30:51,049 [ Thread-1 ] [ INFO ]:341 - [Consumer clientId=consumer-1, groupId=mcsp3] Successfully joined group with generation 1
16:30:51,050 [ Thread-1 ] [ INFO ]:341 - [Consumer clientId=consumer-1, groupId=mcsp3] Setting newly assigned partitions [foo-0]
name = consumer1, partition = 0, offset = 0, key = null, value = hello
name = consumer1, partition = 0, offset = 1, key = null, value = baby
name = consumer1, partition = 0, offset = 2, key = null, value = so
...
name = consumer1, partition = 0, offset = 16, key = null, value = 8
consumer1 commit
As can be seen, a group assignment took place and partition 0 was assigned to consumer1; after consumer1 had read all the messages, it committed the offset. Now open another terminal, change name to consumer2, and start a second process. The output is then:
Output on consumer1:
16:31:18,052 [ Thread-1 ] [ INFO ]:341 - [Consumer clientId=consumer-1, groupId=mcsp3] Revoking previously assigned partitions [foo-0]
16:31:18,053 [ Thread-1 ] [ INFO ]:336 - [Consumer clientId=consumer-1, groupId=mcsp3] (Re-)joining group
16:31:18,068 [ Thread-1 ] [ INFO ]:341 - [Consumer clientId=consumer-1, groupId=mcsp3] Successfully joined group with generation 2
16:31:18,069 [ Thread-1 ] [ INFO ]:341 - [Consumer clientId=consumer-1, groupId=mcsp3] Setting newly assigned partitions [foo-0]
Output on consumer2:
16:31:15,567 [ main ] [ INFO ]:109 - Kafka version : 1.0.0
16:31:15,567 [ main ] [ INFO ]:110 - Kafka commitId : aaa7af6d4a11b29d
16:31:15,669 [ Thread-1 ] [ INFO ]:341 - [Consumer clientId=consumer-1, groupId=mcsp3] Discovered coordinator 10.20.0.139:9092 (id: 2147483647 rack: null)
16:31:15,672 [ Thread-1 ] [ INFO ]:341 - [Consumer clientId=consumer-1, groupId=mcsp3] Revoking previously assigned partitions []
16:31:15,672 [ Thread-1 ] [ INFO ]:336 - [Consumer clientId=consumer-1, groupId=mcsp3] (Re-)joining group
16:31:18,066 [ Thread-1 ] [ INFO ]:341 - [Consumer clientId=consumer-1, groupId=mcsp3] Successfully joined group with generation 2
16:31:18,069 [ Thread-1 ] [ INFO ]:341 - [Consumer clientId=consumer-1, groupId=mcsp3] Setting newly assigned partitions []
As can be seen, consumer2 joining caused the group's assignment to be rebalanced. Now send 2 messages to foo; the result is that only consumer1 produces output and consumer2 prints nothing, i.e. partition 0 was assigned to consumer1. Because there were only 2 messages, consumer1 did not commit the offset. Now kill the consumer1 process, and consumer2 prints:
16:32:27,074 [ Thread-1 ] [ INFO ]:341 - [Consumer clientId=consumer-1, groupId=mcsp3] Revoking previously assigned partitions []
16:32:27,074 [ Thread-1 ] [ INFO ]:336 - [Consumer clientId=consumer-1, groupId=mcsp3] (Re-)joining group
16:32:27,081 [ Thread-1 ] [ INFO ]:341 - [Consumer clientId=consumer-1, groupId=mcsp3] Successfully joined group with generation 3
16:32:27,082 [ Thread-1 ] [ INFO ]:341 - [Consumer clientId=consumer-1, groupId=mcsp3] Setting newly assigned partitions [foo-0]
name = consumer2, partition = 0, offset = 17, key = null, value = a
name = consumer2, partition = 0, offset = 18, key = null, value = b
Sure enough, the group was rebalanced once more and consumer2 printed the 2 messages just sent. This exposes a problem: because consumer1 shut down abnormally without committing the latest offset, those 2 messages were consumed twice. A way to address this is shown in Extension 6: register a ConsumerRebalanceListener when subscribing to the topic
Case 2: the topic has multiple partitions, using topic "muti_part" as the example
The code and steps are the same as above; just change the topic name to muti_part
After the first consumer has consumed all the messages and a new consumer joins, a rebalance takes place. Producing to the topic again then gives:
consumer1:
17:12:32,879 [ Thread-1 ] [ INFO ]:341 - [Consumer clientId=consumer-1, groupId=mcsp] Revoking previously assigned partitions [muti_part-0, muti_part-1]
17:12:32,879 [ Thread-1 ] [ INFO ]:336 - [Consumer clientId=consumer-1, groupId=mcsp] (Re-)joining group
17:12:32,890 [ Thread-1 ] [ INFO ]:341 - [Consumer clientId=consumer-1, groupId=mcsp] Successfully joined group with generation 4
17:12:32,892 [ Thread-1 ] [ INFO ]:341 - [Consumer clientId=consumer-1, groupId=mcsp] Setting newly assigned partitions [muti_part-0]
name = consumer1, partition = 0, offset = 3, key = null, value = b
name = consumer1, partition = 0, offset = 4, key = null, value = d
name = consumer1, partition = 0, offset = 5, key = null, value = f
name = consumer1, partition = 0, offset = 6, key = null, value = h
consumer2:
name = consumer2, partition = 1, offset = 4, key = null, value = a
name = consumer2, partition = 1, offset = 5, key = null, value = c
name = consumer2, partition = 1, offset = 6, key = null, value = e
name = consumer2, partition = 1, offset = 7, key = null, value = g
As can be seen, each consumer is now responsible for one partition
Extension 6: when a new consumer joins the group before offsets have been committed, the rebalance makes records be consumed twice; fix this with a ConsumerRebalanceListener
public class MultiConsumerSinglePartition {
static class MyConsumer implements Runnable{
KafkaConsumer<String, String> consumer;
int buffer;
String name;
MyConsumer(KafkaConsumer<String, String> consumer, String name){
this.consumer = consumer;
this.buffer = 0;
this.name = name;
}
@Override
public void run() {
while (true){
ConsumerRecords<String, String> records = consumer.poll(100);
for(ConsumerRecord<String, String> record : records){
System.out.printf("name = %s, partition = %d, offset = %d, key = %s, value = %s%n", name,
record.partition(), record.offset(), record.key(), record.value());
buffer ++;
}
if(buffer >= 5){
consumer.commitSync();
buffer = 0;
System.out.println(name + " commit");
}
}
}
}
static class MyListener implements ConsumerRebalanceListener{
KafkaConsumer<String, String> consumer;
String name;
MyListener(KafkaConsumer<String, String> consumer, String name) {
this.consumer = consumer;
this.name = name;
}
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
Map<TopicPartition, OffsetAndMetadata> map = new HashMap<>();
for(TopicPartition partition : partitions){
System.out.println("revoke " + name + " from partition " + partition.partition());
System.out.println("commit partition " + partition.partition() + " offset " +consumer.position(partition));
map.put(partition, new OffsetAndMetadata(consumer.position(partition)));
}
consumer.commitSync(map);
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
for(TopicPartition partition : partitions){
System.out.println("assign partition " + partition.partition() + " to " + name);
}
}
}
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "47.100.49.129:9092");
props.put("group.id", "mcsp2");
props.put("enable.auto.commit", "false");
props.put("auto.offset.reset", "earliest");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
String name = "consumer2";
consumer.subscribe(Arrays.asList("muti_part"), new MyListener(consumer, name));
new Thread(new MyConsumer(consumer, name)).start();
}
}
Explanation
The custom MyListener class implements the ConsumerRebalanceListener interface. Its onPartitionsRevoked method defines what to do when partitions are taken away from a consumer; here it commits the latest offset of every revoked partition so that no messages are consumed twice after the rebalance completes
Start one consumer first, then start a second one. The first consumer prints:
22:26:49,555 [ Thread-1 ] [ INFO ]:341 - [Consumer clientId=consumer-1, groupId=mcsp2] Discovered coordinator 47.100.49.129:9092 (id: 2147483647 rack: null)
22:26:49,565 [ Thread-1 ] [ INFO ]:341 - [Consumer clientId=consumer-1, groupId=mcsp2] Revoking previously assigned partitions []
22:26:49,565 [ Thread-1 ] [ INFO ]:336 - [Consumer clientId=consumer-1, groupId=mcsp2] (Re-)joining group
assign partition 0 to consumer1
assign partition 1 to consumer1
22:26:49,645 [ Thread-1 ] [ INFO ]:341 - [Consumer clientId=consumer-1, groupId=mcsp2] Successfully joined group with generation 3
22:26:49,645 [ Thread-1 ] [ INFO ]:341 - [Consumer clientId=consumer-1, groupId=mcsp2] Setting newly assigned partitions [muti_part-0, muti_part-1]
name = consumer1, partition = 0, offset = 0, key = null, value = 2
name = consumer1, partition = 1, offset = 0, key = null, value = 1
name = consumer1, partition = 1, offset = 1, key = null, value = 3
revoke consumer1 from partition 0
commit partition 0 offset 1
22:27:49,735 [ Thread-1 ] [ INFO ]:341 - [Consumer clientId=consumer-1, groupId=mcsp2] Revoking previously assigned partitions [muti_part-0, muti_part-1]
revoke consumer1 from partition 1
commit partition 1 offset 2
22:27:49,765 [ Thread-1 ] [ INFO ]:336 - [Consumer clientId=consumer-1, groupId=mcsp2] (Re-)joining group
22:27:49,795 [ Thread-1 ] [ INFO ]:341 - [Consumer clientId=consumer-1, groupId=mcsp2] Successfully joined group with generation 4
assign partition 0 to consumer1
22:27:49,795 [ Thread-1 ] [ INFO ]:341 - [Consumer clientId=consumer-1, groupId=mcsp2] Setting newly assigned partitions [muti_part-0]
As can be seen, before the second consumer was started, both partitions were assigned to consumer1; consumer1 read 3 messages and did not commit them
consumer2 prints:
22:27:48,488 [ Thread-1 ] [ INFO ]:341 - [Consumer clientId=consumer-1, groupId=mcsp2] Discovered coordinator 47.100.49.129:9092 (id: 2147483647 rack: null)
22:27:48,488 [ Thread-1 ] [ INFO ]:341 - [Consumer clientId=consumer-1, groupId=mcsp2] Revoking previously assigned partitions []
22:27:48,488 [ Thread-1 ] [ INFO ]:336 - [Consumer clientId=consumer-1, groupId=mcsp2] (Re-)joining group
assign partition 1 to consumer2
22:27:49,795 [ Thread-1 ] [ INFO ]:341 - [Consumer clientId=consumer-1, groupId=mcsp2] Successfully joined group with generation 4
22:27:49,795 [ Thread-1 ] [ INFO ]:341 - [Consumer clientId=consumer-1, groupId=mcsp2] Setting newly assigned partitions [muti_part-1]
As can be seen, when consumer2 started, Kafka revoked all partitions and reassigned them. The revocation triggered consumer1's listener, which committed the latest offsets, so consumer2 does not read any data twice
Note: this approach only helps when a new consumer joins the group. If a consumer disconnects abnormally, the offset is still left uncommitted. To guarantee no duplicate consumption after a consumer crash, subscribe by explicitly assigning partitions and store the offsets yourself; the guiding principle is to keep Kafka from rebalancing at all
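The "store the offsets yourself" idea can be sketched as a small offset store. This uses an in-memory map purely for illustration; a real deployment would persist it to a database or file, atomically with the processed results. OffsetStore is an illustrative name, not a kafka-clients class:

```java
import java.util.HashMap;
import java.util.Map;

// Application-side offset bookkeeping for the assign() pattern: record the
// next offset to read per partition, independently of Kafka's committed
// offsets, and seek to it on restart.
public class OffsetStore {
    private final Map<String, Long> offsets = new HashMap<>();

    private static String key(String topic, int partition) {
        return topic + "-" + partition;
    }

    // Call after successfully processing the record at `offset`:
    // store offset + 1, the next position to consume.
    public void markProcessed(String topic, int partition, long offset) {
        offsets.put(key(topic, partition), offset + 1);
    }

    // Position to seek to on restart; 0 if nothing was processed yet.
    public long nextOffset(String topic, int partition) {
        return offsets.getOrDefault(key(topic, partition), 0L);
    }
}
```

On restart the application would call consumer.assign(...) and then consumer.seek(partition, store.nextOffset(topic, partition)) for each partition, so consumption resumes exactly where processing last succeeded.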
Extension 7: moving the offset to an arbitrary position
API overview
public void seek(TopicPartition partition, long offset);
public void seekToBeginning(Collection<TopicPartition> partitions);
public void seekToEnd(Collection<TopicPartition> partitions);
Demo program
public class ManualSeekDemo {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "47.100.49.129:9092");
props.put("group.id", "mcsp5");
props.put("enable.auto.commit", "false");
props.put("auto.offset.reset", "earliest");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo"));
int buffer = 0;
long lastOffset = 0;
int part = 0;
while (true){
ConsumerRecords<String, String> records = consumer.poll(100);
for(ConsumerRecord<String, String> record : records){
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
buffer ++;
lastOffset = record.offset();
part = record.partition();
}
if(buffer >= 3){
buffer = 0;
System.out.println("seek to " + (lastOffset - 1));
consumer.seek(new TopicPartition("foo", part), (lastOffset - 1));
}
}
}
}
Explanation
- The program is written so that after every 3 records read it seeks back 2 records, so it keeps re-reading the same stretch of the log
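The rewind behaviour can be traced without a broker. The sketch below simulates the loop above over a log of consecutive offsets (SeekSimulation is an illustrative name): after every 3 records it jumps back to lastOffset - 1, so two records are re-read on each pass.

```java
import java.util.ArrayList;
import java.util.List;

// Simulates the ManualSeekDemo loop: read one offset at a time, and after
// every 3 records seek back to (lastOffset - 1).
public class SeekSimulation {
    public static List<Long> simulate(long logEnd, int steps) {
        List<Long> processed = new ArrayList<>();
        long position = 0;   // next offset to read
        long lastOffset = 0; // offset of the most recently read record
        int buffer = 0;
        for (int i = 0; i < steps && position < logEnd; i++) {
            lastOffset = position;
            processed.add(position); // "poll" one record
            position++;
            buffer++;
            if (buffer >= 3) {
                buffer = 0;
                position = lastOffset - 1; // seek back two records
            }
        }
        return processed;
    }
}
```

Tracing the first nine steps shows the pattern 0,1,2 then 1,2,3 then 2,3,4: each batch advances the window by only one offset, which is exactly what the demo prints against a real broker.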