1. 優(yōu)雅的退出消費者程序
package com.bonc.rdpe.kafka110.consumer;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
/**
* @author YangYunhe
* @date 2018-07-17 11:05:39
* @description: 優(yōu)雅的退出消費者
*/
/**
 * Demonstrates a graceful shutdown of a Kafka consumer: a JVM shutdown hook
 * calls {@code consumer.wakeup()}, which makes the blocked {@code poll()} in
 * the main thread throw {@link WakeupException}, breaking the loop so offsets
 * can be committed and the consumer closed cleanly.
 */
public class QuitConsumer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "rdpecore4:9092,rdpecore5:9092,rdpecore6:9092");
        props.put("group.id", "dev3-yangyunhe-topic001-group005");
        props.put("auto.offset.reset", "earliest");
        // Offsets are committed manually below (commitAsync/commitSync), so disable
        // auto-commit; otherwise the default auto-commit (enable.auto.commit=true)
        // races the manual commits.
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("dev3-yangyunhe-topic001"));
        final Thread mainThread = Thread.currentThread();

        /*
         * Exiting the poll loop requires another thread to call consumer.wakeup(),
         * which aborts poll() by throwing WakeupException. We do not need to handle
         * the WakeupException itself - it exists only as a way to break the loop.
         * wakeup() is the only KafkaConsumer method that is safe to call from a
         * different thread. Since the loop runs on the main thread, we trigger it
         * from a shutdown hook.
         */
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                System.out.println("Starting exit...");
                consumer.wakeup();
                try {
                    // Wait for the main thread to finish so it can close the
                    // consumer and commit the final offsets before the JVM exits.
                    mainThread.join();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("topic = " + record.topic() + ", partition = " + record.partition()
                            + ", offset = " + record.offset());
                }
                consumer.commitAsync();
            }
        } catch (WakeupException e) {
            // Expected: thrown by poll() after wakeup(); used only to exit the loop.
        } finally {
            // Closing the consumer before the thread exits is important: it commits
            // anything not yet committed and tells the group coordinator that this
            // member is leaving, triggering an immediate rebalance instead of
            // waiting for the session timeout.
            consumer.commitSync();
            consumer.close();
            System.out.println("Closed consumer and we are done");
        }
    }
}
2. 多線程消費者
KafkaConsumer是非線程安全的，多線程需要處理好線程同步，多線程的實現(xiàn)方式有多種，這里介紹一種：每個線程各自實例化一個KafkaConsumer對象。這種方式的缺點是：當這些線程屬于同一個消費組時，線程的數(shù)量受限于分區(qū)數(shù)，當消費者線程的數(shù)量大于分區(qū)數(shù)時，就有一部分消費線程一直處于空閑狀態(tài)。
多線程消費者的線程實現(xiàn)類代碼如下:
package com.bonc.rdpe.kafka110.thread;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;
/**
* @author YangYunhe
* @date 2018-07-17 10:48:45
* @description: 多線程消費者的線程實現(xiàn)類
*/
/**
 * Runnable worker for the one-KafkaConsumer-per-thread pattern.
 *
 * <p>Each instance owns its own {@link KafkaConsumer} (KafkaConsumer is not
 * thread-safe, so it is never shared). The loop runs until another thread
 * calls {@link #shutdown()}, which uses {@code wakeup()} - the only consumer
 * method safe to invoke from a foreign thread - to abort the blocking poll.
 */
public class ConsumerLoop implements Runnable {

    private final KafkaConsumer<String, String> consumer;
    private final List<String> topics;
    private final int id;

    /**
     * @param id      numeric label used to tag this worker's console output
     * @param groupId consumer group shared by all workers
     * @param topics  topics this worker subscribes to
     */
    public ConsumerLoop(int id, String groupId, List<String> topics) {
        this.id = id;
        this.topics = topics;

        Properties config = new Properties();
        config.put("bootstrap.servers", "rdpecore4:9092,rdpecore5:9092,rdpecore6:9092");
        config.put("group.id", groupId);
        config.put("auto.offset.reset", "earliest");
        config.put("key.deserializer", StringDeserializer.class.getName());
        config.put("value.deserializer", StringDeserializer.class.getName());
        this.consumer = new KafkaConsumer<>(config);
    }

    /** Polls indefinitely, printing a small summary map for every record. */
    @Override
    public void run() {
        try {
            consumer.subscribe(topics);
            while (true) {
                // Block until records arrive or wakeup() interrupts the poll.
                ConsumerRecords<String, String> batch = consumer.poll(Long.MAX_VALUE);
                for (ConsumerRecord<String, String> rec : batch) {
                    Map<String, Object> summary = new HashMap<>();
                    summary.put("partition", rec.partition());
                    summary.put("offset", rec.offset());
                    summary.put("value", rec.value());
                    System.out.println(this.id + ": " + summary);
                }
            }
        } catch (WakeupException e) {
            // Expected during shutdown(); nothing to do.
        } finally {
            consumer.close();
        }
    }

    /** Thread-safe request to stop this worker's poll loop. */
    public void shutdown() {
        consumer.wakeup();
    }
}
多線程消費者主程序代碼如下:
package com.bonc.rdpe.kafka110.consumer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import com.bonc.rdpe.kafka110.thread.ConsumerLoop;
/**
* @author YangYunhe
* @date 2018-07-17 10:39:25
* @description: 多線程消費者主程序
*/
/**
 * Driver for the multi-threaded consumer demo: starts one {@link ConsumerLoop}
 * worker per thread in a fixed pool, and installs a shutdown hook that asks
 * every worker to stop and waits briefly for the pool to drain.
 */
public class MultiThreadConsumer {

    public static void main(String[] args) {
        int numConsumers = 3;
        String groupId = "dev3-yangyunhe-topic001-group004";
        List<String> topics = Arrays.asList("dev3-yangyunhe-topic001");
        final ExecutorService executor = Executors.newFixedThreadPool(numConsumers);

        final List<ConsumerLoop> consumers = new ArrayList<>();
        for (int i = 0; i < numConsumers; i++) {
            ConsumerLoop consumer = new ConsumerLoop(i, groupId, topics);
            consumers.add(consumer);
            executor.submit(consumer);
        }

        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                // wakeup() is thread-safe, so the hook may call it on each worker.
                for (ConsumerLoop consumer : consumers) {
                    consumer.shutdown();
                }
                executor.shutdown();
                try {
                    // Give workers a moment to commit/close; force-stop any stragglers
                    // instead of silently leaving them running.
                    if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
                        executor.shutdownNow();
                    }
                } catch (InterruptedException e) {
                    // Restore the interrupt flag so callers of this hook thread
                    // can still observe the interruption.
                    Thread.currentThread().interrupt();
                    executor.shutdownNow();
                }
            }
        });
    }
}
運行結果:
2: {partition=2, offset=1216, value=...}
......
1: {partition=1, offset=1329, value=...}
......
0: {partition=0, offset=1292, value=...}
......
3. 獨立消費者
有時候你可能只需要一個消費者從一個主題的所有分區(qū)或者某個特定的分區(qū)讀取數(shù)據(jù)。這個時候就不需要消費者群組和再均衡了，只需要把主題或者分區(qū)分配給消費者，然后開始讀取消息并提交偏移量。如果是這樣的話，就不需要訂閱主題，取而代之的是為自己分配分區(qū)。一個消費者可以訂閱主題（并加入消費者群組），或者為自己分配分區(qū)，但不能同時做這兩件事情。以下是獨立消費者的示例代碼：
package com.bonc.rdpe.kafka110.consumer;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
/**
* @author YangYunhe
* @date 2018-07-17 12:44:50
* @description: 獨立消費者
*/
/**
 * Standalone consumer: instead of subscribing (and joining a consumer group),
 * it looks up the topic's partitions and assigns all of them to itself with
 * {@code assign()}. A consumer may subscribe OR self-assign, never both.
 */
public class AloneConsumer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "rdpecore4:9092,rdpecore5:9092,rdpecore6:9092");
        // A standalone (assign-based) consumer does not take part in group
        // rebalancing, so no group membership is needed for consuming.
        // NOTE(review): commitAsync/commitSync below store offsets under a group
        // id; without one, newer brokers/clients may reject the commit - confirm
        // against the target cluster, or keep a group.id purely for offsets.
        // props.put("group.id", "dev3-yangyunhe-topic001-group003");
        props.put("auto.offset.reset", "earliest");
        // Fixed: the original used the nonexistent key "auto.commit.offset",
        // which Kafka ignores - auto-commit silently stayed enabled. The real
        // config key is "enable.auto.commit".
        props.put("enable.auto.commit", false);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        /*
         * partitionsFor(topic) returns the topic's current partition metadata.
         * It does NOT track later partition changes: if partitions are added or
         * altered, either call it periodically or restart this program.
         */
        List<PartitionInfo> partitionInfos = consumer.partitionsFor("dev3-yangyunhe-topic001");
        List<TopicPartition> partitions = new ArrayList<>();
        if (partitionInfos != null && partitionInfos.size() != 0) {
            for (PartitionInfo partition : partitionInfos) {
                partitions.add(new TopicPartition(partition.topic(), partition.partition()));
            }
            consumer.assign(partitions);
            try {
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(1000);
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.println("partition = " + record.partition() + ", offset = " + record.offset());
                    }
                    consumer.commitAsync();
                }
            } finally {
                // Synchronously commit the final offsets, then release sockets
                // and buffers held by the consumer.
                consumer.commitSync();
                consumer.close();
            }
        }
    }
}