Kafka Java Consumer設計
Kafka Java Consumer采用的是單線程的設計吏饿。其入口類KafkaConsumer是一個雙線程的設計,即用戶主線程和心跳線程笆怠。
用戶主線程铝耻,指的是啟動Consumer應用程序main方法的線程,新引入的心跳線程(Heartbeat Thread)只負責定期給對應的Broker機器發(fā)送心跳請求,以表示消費者應用的存活性瓢捉。
多線程方案
KafkaConsumer不是線程安全的频丘,在使用中必須確保線程安全。
線程安全:多線程訪問時泡态,采用加鎖機制搂漠,當一個線程訪問該類的某個數(shù)據(jù)時,進行保護某弦,其他線程不能進行訪問直到該線程讀取完桐汤,
其他線程才可使用。不會出現(xiàn)數(shù)據(jù)不一致或數(shù)據(jù)污染問題刀崖。
線程不安全:不提供數(shù)據(jù)訪問保護惊科,有可能出現(xiàn)多個線程先后更改數(shù)據(jù)造成所得到的數(shù)據(jù)時臟數(shù)據(jù)。
鑒于以上事實亮钦,我們可以指定兩套多線程方案:
1.消費者程序啟動多個線程,每個線程維護專屬的KafkaConsumer充活,負責完整的消息獲取蜂莉、消息處理流程。
2.消費者程序使用單或多線程獲取消息混卵,同時創(chuàng)建多個消費線程執(zhí)行消息處理邏輯映穗。獲取消息的線程可以是一個或多個,每個維護專屬KafkaConsumer實例幕随,處理消息交由特定線程池來做蚁滋,從而實現(xiàn)消息獲取與消息處理的真正解耦。
以下為兩種方案的優(yōu)缺點:
實例代碼
1.方案一:
public class KafkaConsumerRunner implements Runnable {
private final AtomicBoolean closed = new AtomicBoolean(false);
private final KafkaConsumer consumer;
public void run() {
try{
consumer.subscribe(Arrays.asList("topic"));
while(!closed.get()){
ConsumerRecords records = consumer.poll(Duration.ofMillis(10000));
}
}catch (WakeupException e){
if (!closed.get()) throw e;
}finally {
consumer.close();
}
}
public void shutdown() {
closed.set(true);
consumer.wakeup();
}
}
2.方案二:
private final KafkaConsumer<String, String> consumer;
private ExecutorService executors;
...
private int workerNum = ...;
executors = new ThreadPoolExecutor(
workerNum, workerNum, 0L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(1000),
new ThreadPoolExecutor.CallerRunsPolicy());
...
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (final ConsumerRecord record : records) {
executors.submit(new Worker(record));
}
}