Kafka - 新消費(fèi)者
一付燥、數(shù)據(jù)來源
數(shù)據(jù)使用上一個(gè)博文所配置的 Flume欺劳,將文本數(shù)據(jù)寫入到 Kafka中。不過這次有所改變航夺,數(shù)據(jù)的監(jiān)控目錄 有所改變,寫入的Kafka的主題名也變更為A25
崔涂。

這里我們可以看到 Flume 對于新傳上去的 A91 數(shù)據(jù)已經(jīng)完成消費(fèi)阳掐。
二、消費(fèi)者代碼
2.1 創(chuàng)建消費(fèi)者
創(chuàng)建消費(fèi)者所使用的屬性和生產(chǎn)者使用的屬性差距不是很大:
- bootstrap.servers:指定了 Kafka 集群的連接字符串。
- key.deserializer 和 value.deserializer 與生產(chǎn)者的 serializer 定義也很類似缭保,不過它們不是使用指定的類把 Java 對象轉(zhuǎn)成字節(jié)數(shù)組汛闸,而是使用指定的類把字節(jié)數(shù)組轉(zhuǎn)成 Java 對象。
- group.id:非必需涮俄,指定了 KafkaConsumer 屬于哪一個(gè)消費(fèi)者群組蛉拙。
創(chuàng)建消費(fèi)者的代碼如下:
Properties props = new Properties();
props.put("bootstrap.servers", "master:9092");
props.put("group.id", "TestConsumer");
props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
2.2 訂閱主題
consumer.subscribe(Collections.singletonList("A25"));
因?yàn)槲覀兪褂肍lume對數(shù)據(jù)進(jìn)行 Sinks 消費(fèi)的時(shí)候,指定的主題為A25彻亲,因此我們這里在對數(shù)據(jù)進(jìn)行訂閱的時(shí)候孕锄,也是A25。
同時(shí)苞尝,可以對訂閱的主題傳遞正則表達(dá)式進(jìn)行匹配畸肆,一次訂閱多個(gè)主題。
2.3 輪詢消費(fèi)
try {
while(true) {
ConsumerRecords<String, String> records = consumer.poll(100);
logger.info("records length = {}", records.count());
for (ConsumerRecord record : records) {
logger.info("topic = {}, partition = {}, offset = {}, key = {}, value = {}\n",
record.topic(), record.partition(), record.offset(),
record.key(), record.value());
}
}
} finally {
consumer.close();
}
這是一個(gè)無限循環(huán)宙址。消費(fèi)者實(shí)際上是一個(gè)長期運(yùn)行的應(yīng)用程序轴脐,它通過持續(xù)輪詢向 Kafka 請求數(shù)據(jù)。
消費(fèi)者必須持續(xù)對 Kafka 進(jìn)行輪詢抡砂,否則會被認(rèn)為已經(jīng)死亡大咱,它的分區(qū)會被移交給群組里的其他消費(fèi)者。傳給 poll() 方法的參數(shù)是一個(gè)超時(shí)時(shí)間注益,用于控制 poll() 方法的阻塞時(shí)間(在消費(fèi)者的緩沖區(qū)里沒有可用數(shù)據(jù)時(shí)會發(fā)生阻塞)碴巾。如果該參數(shù)被設(shè)為 0,poll() 會立即返回丑搔,否則它會在指定的毫秒數(shù)內(nèi)一直等待 broker 返回?cái)?shù)據(jù)厦瓢。
poll() 方法返回一個(gè)記錄列表。每條記錄都包含了記錄所屬主題的信息啤月、記錄所在分區(qū)的信息煮仇、記錄在分區(qū)里的偏移量,以及記錄的鍵值對谎仲。我們一般會遍歷這個(gè)列表浙垫,逐條處理這些記錄。
在退出應(yīng)用程序之前使用 close() 方法關(guān)閉消費(fèi)者郑诺。網(wǎng)絡(luò)連接和 socket 也會隨之關(guān)閉夹姥,并立即觸發(fā)一次再均衡,而不是等待群組協(xié)調(diào)器發(fā)現(xiàn)它不再發(fā)送心跳并認(rèn)定它已死亡间景,因?yàn)槟菢有枰L的時(shí)間佃声,導(dǎo)致整個(gè)群組在一段時(shí)間內(nèi)無法讀取消息。
運(yùn)行結(jié)果如下:
<center>
</center>
三倘要、其他配置
3.1 pom文件
<properties>
<java.version>1.8</java.version>
<kafka.version>1.1.0</kafka.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.21</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.21</version>
</dependency>
</dependencies>
3.2 log4j.properties
log4j.rootLogger=INFO,console
log4j.additivity.org.apache=true
# 控制臺(console)
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.Threshold=DEBUG
log4j.appender.console.ImmediateFlush=true
log4j.appender.console.Target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} [%p] %c - %m%n