Kafka - 新消費(fèi)者

Kafka - 新消費(fèi)者


一付燥、數(shù)據(jù)來源

數(shù)據(jù)使用上一個(gè)博文所配置的 Flume欺劳,將文本數(shù)據(jù)寫入到 Kafka中。不過這次有所改變航夺,數(shù)據(jù)的監(jiān)控目錄 有所改變,寫入的Kafka的主題名也變更為A25崔涂。

flume寫入數(shù)據(jù).png-36kB
flume寫入數(shù)據(jù).png-36kB

這里我們可以看到 Flume 對于新傳上去的 A91 數(shù)據(jù)已經(jīng)完成消費(fèi)阳掐。

二、消費(fèi)者代碼

2.1 創(chuàng)建消費(fèi)者

創(chuàng)建消費(fèi)者所使用的屬性和生產(chǎn)者使用的屬性差距不是很大:

  1. bootstrap.servers:指定了 Kafka 集群的連接字符串。
  2. key.deserializer 和 value.deserializer 與生產(chǎn)者的 serializer 定義也很類似缭保,不過它們不是使用指定的類把 Java 對象轉(zhuǎn)成字節(jié)數(shù)組汛闸,而是使用指定的類把字節(jié)數(shù)組轉(zhuǎn)成 Java 對象。
  3. group.id:非必需涮俄,指定了 KafkaConsumer 屬于哪一個(gè)消費(fèi)者群組蛉拙。

創(chuàng)建消費(fèi)者的代碼如下:

Properties props = new Properties();
props.put("bootstrap.servers", "master:9092");
props.put("group.id", "TestConsumer");
props.put("key.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

2.2 訂閱主題

consumer.subscribe(Collections.singletonList("A25"));

因?yàn)槲覀兪褂肍lume對數(shù)據(jù)進(jìn)行 Sinks 消費(fèi)的時(shí)候,指定的主題為A25彻亲,因此我們這里在對數(shù)據(jù)進(jìn)行訂閱的時(shí)候孕锄,也是A25。

同時(shí)苞尝,可以對訂閱的主題傳遞正則表達(dá)式進(jìn)行匹配畸肆,一次訂閱多個(gè)主題。

2.3 輪詢消費(fèi)

try {
    while(true) {
        ConsumerRecords<String, String> records = consumer.poll(100);

        logger.info("records length = {}", records.count());

        for (ConsumerRecord record : records) {
            logger.info("topic = {}, partition = {}, offset = {}, key = {}, value = {}\n",
                    record.topic(), record.partition(), record.offset(),
                    record.key(), record.value());
        }
    }
} finally {
    consumer.close();
}

這是一個(gè)無限循環(huán)宙址。消費(fèi)者實(shí)際上是一個(gè)長期運(yùn)行的應(yīng)用程序轴脐,它通過持續(xù)輪詢向 Kafka 請求數(shù)據(jù)。

消費(fèi)者必須持續(xù)對 Kafka 進(jìn)行輪詢抡砂,否則會被認(rèn)為已經(jīng)死亡大咱,它的分區(qū)會被移交給群組里的其他消費(fèi)者。傳給 poll() 方法的參數(shù)是一個(gè)超時(shí)時(shí)間注益,用于控制 poll() 方法的阻塞時(shí)間(在消費(fèi)者的緩沖區(qū)里沒有可用數(shù)據(jù)時(shí)會發(fā)生阻塞)碴巾。如果該參數(shù)被設(shè)為 0,poll() 會立即返回丑搔,否則它會在指定的毫秒數(shù)內(nèi)一直等待 broker 返回?cái)?shù)據(jù)厦瓢。

poll() 方法返回一個(gè)記錄列表。每條記錄都包含了記錄所屬主題的信息啤月、記錄所在分區(qū)的信息煮仇、記錄在分區(qū)里的偏移量,以及記錄的鍵值對谎仲。我們一般會遍歷這個(gè)列表浙垫,逐條處理這些記錄。

在退出應(yīng)用程序之前使用 close() 方法關(guān)閉消費(fèi)者郑诺。網(wǎng)絡(luò)連接和 socket 也會隨之關(guān)閉夹姥,并立即觸發(fā)一次再均衡,而不是等待群組協(xié)調(diào)器發(fā)現(xiàn)它不再發(fā)送心跳并認(rèn)定它已死亡间景,因?yàn)槟菢有枰L的時(shí)間佃声,導(dǎo)致整個(gè)群組在一段時(shí)間內(nèi)無法讀取消息。

運(yùn)行結(jié)果如下:

<center>
kafka消費(fèi)結(jié)果.png-30kB
kafka消費(fèi)結(jié)果.png-30kB

</center>

三倘要、其他配置

3.1 pom文件

 <properties>
    <java.version>1.8</java.version>
    <kafka.version>1.1.0</kafka.version>
</properties>


<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>${kafka.version}</version>
    </dependency>

    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.21</version>
    </dependency>

    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.21</version>
    </dependency>
</dependencies>

3.2 log4j.properties

log4j.rootLogger=INFO,console
log4j.additivity.org.apache=true

# 控制臺(console)
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.Threshold=DEBUG
log4j.appender.console.ImmediateFlush=true
log4j.appender.console.Target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} [%p] %c  -  %m%n
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末圾亏,一起剝皮案震驚了整個(gè)濱河市十拣,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌志鹃,老刑警劉巖夭问,帶你破解...
    沈念sama閱讀 219,110評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異曹铃,居然都是意外死亡缰趋,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,443評論 3 395
  • 文/潘曉璐 我一進(jìn)店門陕见,熙熙樓的掌柜王于貴愁眉苦臉地迎上來秘血,“玉大人,你說我怎么就攤上這事评甜』伊福” “怎么了?”我有些...
    開封第一講書人閱讀 165,474評論 0 356
  • 文/不壞的土叔 我叫張陵忍坷,是天一觀的道長粘舟。 經(jīng)常有香客問我,道長佩研,這世上最難降的妖魔是什么柑肴? 我笑而不...
    開封第一講書人閱讀 58,881評論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮旬薯,結(jié)果婚禮上晰骑,老公的妹妹穿的比我還像新娘。我一直安慰自己袍暴,他們只是感情好些侍,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,902評論 6 392
  • 文/花漫 我一把揭開白布隶症。 她就那樣靜靜地躺著政模,像睡著了一般。 火紅的嫁衣襯著肌膚如雪蚂会。 梳的紋絲不亂的頭發(fā)上淋样,一...
    開封第一講書人閱讀 51,698評論 1 305
  • 那天,我揣著相機(jī)與錄音胁住,去河邊找鬼趁猴。 笑死,一個(gè)胖子當(dāng)著我的面吹牛彪见,可吹牛的內(nèi)容都是我干的儡司。 我是一名探鬼主播,決...
    沈念sama閱讀 40,418評論 3 419
  • 文/蒼蘭香墨 我猛地睜開眼余指,長吁一口氣:“原來是場噩夢啊……” “哼捕犬!你這毒婦竟也來了跷坝?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,332評論 0 276
  • 序言:老撾萬榮一對情侶失蹤碉碉,失蹤者是張志新(化名)和其女友劉穎柴钻,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體垢粮,經(jīng)...
    沈念sama閱讀 45,796評論 1 316
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡贴届,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,968評論 3 337
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了蜡吧。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片毫蚓。...
    茶點(diǎn)故事閱讀 40,110評論 1 351
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖昔善,靈堂內(nèi)的尸體忽然破棺而出绍些,到底是詐尸還是另有隱情,我是刑警寧澤耀鸦,帶...
    沈念sama閱讀 35,792評論 5 346
  • 正文 年R本政府宣布柬批,位于F島的核電站,受9級特大地震影響袖订,放射性物質(zhì)發(fā)生泄漏氮帐。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,455評論 3 331
  • 文/蒙蒙 一洛姑、第九天 我趴在偏房一處隱蔽的房頂上張望上沐。 院中可真熱鬧,春花似錦楞艾、人聲如沸参咙。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,003評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽蕴侧。三九已至,卻和暖如春两入,著一層夾襖步出監(jiān)牢的瞬間净宵,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,130評論 1 272
  • 我被黑心中介騙來泰國打工裹纳, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留择葡,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,348評論 3 373
  • 正文 我出身青樓剃氧,卻偏偏與公主長得像敏储,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個(gè)殘疾皇子朋鞍,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,047評論 2 355

推薦閱讀更多精彩內(nèi)容