Kafka 新版消費者 API(四):優(yōu)雅的退出消費者程序怠堪、多線程消費者以及獨立消費者

1. 優(yōu)雅的退出消費者程序

package com.bonc.rdpe.kafka110.consumer;

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

/**
 * @author YangYunhe
 * @date 2018-07-17 11:05:39
 * @description: 優(yōu)雅的退出消費者
 */
public class QuitConsumer {

    public static void main(String[] args) {
        
        Properties props = new Properties();
        props.put("bootstrap.servers", "rdpecore4:9092,rdpecore5:9092,rdpecore6:9092");
        props.put("group.id", "dev3-yangyunhe-topic001-group005");
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    
        consumer.subscribe(Arrays.asList("dev3-yangyunhe-topic001"));
        
        final Thread mainThread = Thread.currentThread();
        
        /*
         * 退出循環(huán)需要通過另一個線程調用consumer.wakeup()方法
         * 調用consumer.wakeup()可以退出poll(),并拋出WakeupException異常
         * 我們不需要處理 WakeupException,因為它只是用于跳出循環(huán)的一種方式
         * consumer.wakeup()是消費者唯一一個可以從其他線程里安全調用的方法
         * 如果循環(huán)運行在主線程里乌昔,可以在 ShutdownHook里調用該方法
         */ 
        Runtime.getRuntime().addShutdownHook(new Thread() {
             public void run() {
                 System.out.println("Starting exit...");
                 consumer.wakeup();
                 try {
                     // 主線程繼續(xù)執(zhí)行,以便可以關閉consumer尾组,提交偏移量
                     mainThread.join();
                 } catch (InterruptedException e) {
                     e.printStackTrace();
                 }
             }
        });
        
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("topic = " + record.topic() + ", partition = " + record.partition() 
                        + ", offset = " + record.offset());
                }
                consumer.commitAsync();
            }
        }catch (WakeupException e) {
            // 不處理異常
        } finally {
            // 在退出線程之前調用consumer.close()是很有必要的忙芒,它會提交任何還沒有提交的東西,并向組協(xié)調器發(fā)送消息讳侨,告知自己要離開群組呵萨。
            // 接下來就會觸發(fā)再均衡,而不需要等待會話超時跨跨。
            consumer.commitSync();
            consumer.close();
            System.out.println("Closed consumer and we are done");
        }   
    }
}

2. 多線程消費者

KafkaConsumer是非線程安全的潮峦,多線程需要處理好線程同步,多線程的實現(xiàn)方式有多種勇婴,這里介紹一種:每個線程各自實例化一個KakfaConsumer對象忱嘹,這種方式的缺點是:當這些線程屬于同一個消費組時,線程的數(shù)量受限于分區(qū)數(shù)耕渴,當消費者線程的數(shù)量大于分區(qū)數(shù)時拘悦,就有一部分消費線程一直處于空閑狀態(tài)

多線程消費者的線程實現(xiàn)類代碼如下:

package com.bonc.rdpe.kafka110.thread;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;

/**
 * @author YangYunhe
 * @date 2018-07-17 10:48:45
 * @description: 多線程消費者的線程實現(xiàn)類
 */
public class ConsumerLoop implements Runnable {

    private final KafkaConsumer<String, String> consumer;
    private final List<String> topics;
    private final int id;

    public ConsumerLoop(int id, String groupId, List<String> topics) {
        this.id = id;
        this.topics = topics;
        Properties props = new Properties();
        props.put("bootstrap.servers", "rdpecore4:9092,rdpecore5:9092,rdpecore6:9092");
        props.put("group.id", groupId);
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        this.consumer = new KafkaConsumer<>(props);
      }

    @Override
    public void run() {
        try {
            consumer.subscribe(topics);
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
                for (ConsumerRecord<String, String> record : records) {
                    Map<String, Object> data = new HashMap<>();
                    data.put("partition", record.partition());
                    data.put("offset", record.offset());
                    data.put("value", record.value());
                    System.out.println(this.id + ": " + data);
                }
            }
        } catch (WakeupException e) {
            // ignore for shutdown
        } finally {
            consumer.close();
        }
    }

    public void shutdown() {
        consumer.wakeup();
    }

}

多線程消費者主程序代碼如下:

package com.bonc.rdpe.kafka110.consumer;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import com.bonc.rdpe.kafka110.thread.ConsumerLoop;

/**
 * @author YangYunhe
 * @date 2018-07-17 10:39:25
 * @description: 多線程消費者主程序
 */
public class MultiThreadConsumer {

    public static void main(String[] args) { 
          
        int numConsumers = 3;
        String groupId = "dev3-yangyunhe-topic001-group004";
        List<String> topics = Arrays.asList("dev3-yangyunhe-topic001");
        ExecutorService executor = Executors.newFixedThreadPool(numConsumers);

        final List<ConsumerLoop> consumers = new ArrayList<>();
        for (int i = 0; i < numConsumers; i++) {
            ConsumerLoop consumer = new ConsumerLoop(i, groupId, topics);
            consumers.add(consumer);
            executor.submit(consumer);
        }

        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                for (ConsumerLoop consumer : consumers) {
                    consumer.shutdown();
                } 
                executor.shutdown();
                try {
                    executor.awaitTermination(5000, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
    }
}

運行結果:
2: {partition=2, offset=1216, value=...}
......
1: {partition=1, offset=1329, value=...}
......
0: {partition=0, offset=1292, value=...}
......

3. 獨立消費者

有時候你可能只需要一個消費者從一個主題的所有分區(qū)或者某個特定的分區(qū)讀取數(shù)據(jù)。這個時候就不需要消費者群組和再均衡了橱脸,只需要把主題或者分區(qū)分配給消費者础米,然后開始讀取消息并提交偏移量。如果是這樣的話添诉,就不需要訂閱主題屁桑,取而代之的是為自己分配分區(qū)。一個消費者可以訂閱主題(并加入消費者群組)吻商,或者為自己分配分區(qū)掏颊,但不能同時做這兩件事情糟红。以下是獨立消費者的示例代碼:

package com.bonc.rdpe.kafka110.consumer;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

/**
 * @author YangYunhe
 * @date 2018-07-17 12:44:50
 * @description: 獨立消費者
 */
public class AloneConsumer {
    
    public static void main(String[] args) {
        
        Properties props = new Properties();
        props.put("bootstrap.servers", "rdpecore4:9092,rdpecore5:9092,rdpecore6:9092");
        // 獨立消費者不需要設置消費組
        // props.put("group.id", "dev3-yangyunhe-topic001-group003");
        props.put("auto.offset.reset", "earliest");
        props.put("auto.commit.offset", false);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        
        /*
         * consumer.partitionsFor(topic)用于獲取topic的分區(qū)信息
         * 當有新的分區(qū)加入或者原有的分區(qū)被改變后艾帐,這個方法是不能動態(tài)感知的
         * 所以要么周期性的執(zhí)行這個方法,要么當分區(qū)數(shù)改變的時候盆偿,你需要重新執(zhí)行這個程序
         */
        List<PartitionInfo> partitionInfos = consumer.partitionsFor("dev3-yangyunhe-topic001");
        List<TopicPartition> partitions = new ArrayList<>();
        
        if(partitionInfos != null && partitionInfos.size() != 0) {
            for (PartitionInfo partition : partitionInfos) {
                partitions.add(new TopicPartition(partition.topic(), partition.partition()));
            }
            
            consumer.assign(partitions);
            
            try {
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(1000);
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.println("partition = " + record.partition() + ", offset = " + record.offset());
                    }
                    consumer.commitAsync();
                }
            } finally {
                consumer.commitSync();
                consumer.close();
            } 
        }
    }
}
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末柒爸,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子事扭,更是在濱河造成了極大的恐慌捎稚,老刑警劉巖,帶你破解...
    沈念sama閱讀 211,639評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異今野,居然都是意外死亡葡公,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,277評論 3 385
  • 文/潘曉璐 我一進店門条霜,熙熙樓的掌柜王于貴愁眉苦臉地迎上來催什,“玉大人,你說我怎么就攤上這事宰睡∑研祝” “怎么了?”我有些...
    開封第一講書人閱讀 157,221評論 0 348
  • 文/不壞的土叔 我叫張陵拆内,是天一觀的道長旋圆。 經常有香客問我,道長麸恍,這世上最難降的妖魔是什么灵巧? 我笑而不...
    開封第一講書人閱讀 56,474評論 1 283
  • 正文 為了忘掉前任,我火速辦了婚禮抹沪,結果婚禮上孩等,老公的妹妹穿的比我還像新娘。我一直安慰自己采够,他們只是感情好肄方,可當我...
    茶點故事閱讀 65,570評論 6 386
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著蹬癌,像睡著了一般权她。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上逝薪,一...
    開封第一講書人閱讀 49,816評論 1 290
  • 那天隅要,我揣著相機與錄音,去河邊找鬼董济。 笑死步清,一個胖子當著我的面吹牛,可吹牛的內容都是我干的虏肾。 我是一名探鬼主播廓啊,決...
    沈念sama閱讀 38,957評論 3 408
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼封豪!你這毒婦竟也來了谴轮?” 一聲冷哼從身側響起,我...
    開封第一講書人閱讀 37,718評論 0 266
  • 序言:老撾萬榮一對情侶失蹤吹埠,失蹤者是張志新(化名)和其女友劉穎第步,沒想到半個月后疮装,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經...
    沈念sama閱讀 44,176評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡粘都,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 36,511評論 2 327
  • 正文 我和宋清朗相戀三年廓推,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片翩隧。...
    茶點故事閱讀 38,646評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡受啥,死狀恐怖,靈堂內的尸體忽然破棺而出鸽心,到底是詐尸還是另有隱情滚局,我是刑警寧澤,帶...
    沈念sama閱讀 34,322評論 4 330
  • 正文 年R本政府宣布顽频,位于F島的核電站藤肢,受9級特大地震影響,放射性物質發(fā)生泄漏糯景。R本人自食惡果不足惜嘁圈,卻給世界環(huán)境...
    茶點故事閱讀 39,934評論 3 313
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望蟀淮。 院中可真熱鬧最住,春花似錦、人聲如沸怠惶。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,755評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽策治。三九已至脓魏,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間通惫,已是汗流浹背茂翔。 一陣腳步聲響...
    開封第一講書人閱讀 31,987評論 1 266
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留履腋,地道東北人珊燎。 一個月前我還...
    沈念sama閱讀 46,358評論 2 360
  • 正文 我出身青樓,卻偏偏與公主長得像遵湖,于是被迫代替她去往敵國和親悔政。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 43,514評論 2 348

推薦閱讀更多精彩內容

  • 原文地址 當kafka最初被創(chuàng)建的時候奄侠,它附帶一個Scala的生產者和消費者客戶端卓箫。隨著時間的推移载矿,我們逐漸意識到...
    明翼閱讀 6,612評論 0 9
  • Spring Cloud為開發(fā)人員提供了快速構建分布式系統(tǒng)中一些常見模式的工具(例如配置管理垄潮,服務發(fā)現(xiàn)烹卒,斷路器,智...
    卡卡羅2017閱讀 134,633評論 18 139
  • 1. 訂閱主題 (1)訂閱主題的全部分區(qū) (2) 用正則表達式來訂閱主題的全部分區(qū) (3) 訂閱指定的分區(qū) 2. ...
    CoderJed閱讀 3,028評論 0 2
  • 騎行中弯洗,無意間看到一棵樹旅急,瞬間有一種被震撼到的感覺。也許是落日相映牡整,也許是心有所悟藐吮,仿佛在蒼穹之中勃然英發(fā)。 獨自...
    牧戎云越閱讀 339評論 0 0
  • 也許一個人最好的狀態(tài)泥从,就是你的本事配的上你的情懷,你可以腳踏實地沪摄,又可以仰望星空躯嫉,從容不迫的與歲月相處。而你心中所...
    左眼殤暮光閱讀 251評論 5 12