1 Kafka
概述
1.1 定義
Kafka
是一個(gè)分布式的基于發(fā)布/訂閱模式的消息隊(duì)列杰妓,主要應(yīng)用于大數(shù)據(jù)實(shí)時(shí)處理領(lǐng)域狈定。
應(yīng)用場(chǎng)景:
解耦
異步
削峰
1.2 消息隊(duì)列
1.2.1 傳統(tǒng)消息隊(duì)列的應(yīng)用場(chǎng)景
1.2.2 消息隊(duì)列的兩種模式
點(diǎn)對(duì)點(diǎn)模式:
消息生產(chǎn)者生產(chǎn)消息發(fā)送到Queue
中,然后消息消費(fèi)者從Queue
中取出并且消費(fèi)消息,消息被消費(fèi)以后税弃,Queue
中不再有存儲(chǔ),所以消息消費(fèi)者不可能消費(fèi)到已經(jīng)被消費(fèi)的消息讹语,Queue
支持存在多個(gè)消費(fèi)者钙皮,但是對(duì)一個(gè)消息而言,只會(huì)有一個(gè)消費(fèi)者可以消費(fèi)顽决。
發(fā)布訂閱模式:
消息生產(chǎn)者將消息發(fā)布到Topic
短条,同時(shí)有多個(gè)消息消費(fèi)者該消息,和點(diǎn)對(duì)點(diǎn)不同的是才菠,發(fā)布到Topic
中的消息會(huì)被所有訂閱者消費(fèi)茸时。
1.3 基礎(chǔ)架構(gòu)
Producer
:消息生產(chǎn)者,就是向Kafka Broker
發(fā)消息的客戶端
Consumer
:消息消費(fèi)者赋访,向Kafka Broker
取消息的客戶端
Consumer Group (CG)
:消費(fèi)者組可都,由多個(gè)Consumer
組成,消費(fèi)者組內(nèi)每個(gè)消費(fèi)者負(fù)責(zé)消費(fèi)不同分區(qū)的數(shù)據(jù)蚓耽,一個(gè)分區(qū)只能由一個(gè)消費(fèi)者消費(fèi)渠牲,消費(fèi)者組之間互不影響,所有的消費(fèi)者都屬于某個(gè)消費(fèi)者組步悠,即消費(fèi)者組是邏輯上的一個(gè)訂閱者
Broker
:一臺(tái)Kafka
服務(wù)器就是一個(gè)Broker
签杈,一個(gè)集群由多個(gè)Broker
組成,一個(gè)Broker
可以容納多個(gè)Topic
Topic
:可以理解為一個(gè)隊(duì)列鼎兽,生產(chǎn)者和消費(fèi)者面向的都是一個(gè)Topic
Partition
:為了實(shí)現(xiàn)擴(kuò)展性答姥,一個(gè)非常大的Topic
可以分布到多個(gè)Broker
(即服務(wù)器)上,一個(gè)Topic
可以分為多個(gè)Partition
谚咬,每個(gè)Partition
是一個(gè)有序的隊(duì)列
Replica
:副本鹦付,為保證集群中的某個(gè)節(jié)點(diǎn)發(fā)生故障時(shí),該節(jié)點(diǎn)上的Partition
數(shù)據(jù)不丟失择卦,且Kafka
仍然能夠繼續(xù)工作敲长,Kafka
提供了副本機(jī)制,一個(gè)Topic
的每個(gè)分區(qū)都有若干個(gè)副本秉继,一個(gè)leader
和若干個(gè)follower
leader
:每個(gè)分區(qū)多個(gè)副本的主潘明,生產(chǎn)者發(fā)送數(shù)據(jù)的對(duì)象,以及消費(fèi)者消費(fèi)數(shù)據(jù)的對(duì)象都是leader
follower
:每個(gè)分區(qū)多個(gè)副本中的從秕噪,實(shí)時(shí)從leader
中同步數(shù)據(jù),保持和leader
數(shù)據(jù)的同步厚宰,leader
發(fā)生故障時(shí)腌巾,某個(gè)follower
會(huì)成為新的follower
2 Kafka
快速入門
2.1 安裝部署
1遂填、解壓
[djm@hadoop102 software]$ tar -zxvf kafka_2.11-0.11.0.0.tgz -C /opt/module/
2、修改解壓后的文件夾名稱
[djm@hadoop102 module]$ mv kafka_2.11-0.11.0.0/ kafka
3澈蝙、在/opt/module/kafka
目錄下創(chuàng)建logs
文件夾
[djm@hadoop102 kafka]$ mkdir logs
4吓坚、修改配置文件
[djm@hadoop102 kafka]$ vi config/server.properties
修改以下內(nèi)容
#broker的全局唯一編號(hào),不能重復(fù)
broker.id=0
#刪除topic功能使能
delete.topic.enable=true
#處理網(wǎng)絡(luò)請(qǐng)求的線程數(shù)量
num.network.threads=3
#用來(lái)處理磁盤IO的現(xiàn)成數(shù)量
num.io.threads=8
#發(fā)送套接字的緩沖區(qū)大小
socket.send.buffer.bytes=102400
#接收套接字的緩沖區(qū)大小
socket.receive.buffer.bytes=102400
#請(qǐng)求套接字的緩沖區(qū)大小
socket.request.max.bytes=104857600
#kafka運(yùn)行日志存放的路徑
log.dirs=/opt/module/kafka/logs
#topic在當(dāng)前broker上的分區(qū)個(gè)數(shù)
num.partitions=1
#用來(lái)恢復(fù)和清理data下數(shù)據(jù)的線程數(shù)量
num.recovery.threads.per.data.dir=1
#segment文件保留的最長(zhǎng)時(shí)間灯荧,超時(shí)將被刪除
log.retention.hours=168
#配置連接Zookeeper集群地址
zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181
5礁击、分發(fā)
[djm@hadoop102 kafka]$ xsync kafka
6、修改其他Broker
的broker.id
7逗载、Kafka
群起腳本
[djm@hadoop102 kafka]$ vim start-kafka
for i in `cat /opt/module/hadoop-2.7.2/etc/hadoop/slaves`
do
echo "========== $i =========="
ssh $i 'source /etc/profile&&/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties'
echo $?
done
[djm@hadoop102 kafka]$ chmod 777 start-kafka
[djm@hadoop102 kafka]$ sudo mv start-kafka /bin
8哆窿、啟動(dòng)Kafka
集群
[djm@hadoop102 kafka]$ start-kafka
2.2 命令行操作
1、查看所有Topic
[djm@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --list
2厉斟、創(chuàng)建Topic
[djm@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --create --replication-factor 3 --partitions 1 --topic first
#--topic 定義topic名
#--replication-factor 定義副本數(shù)
#--partitions 定義分區(qū)數(shù)
--topic 定義topic名
--replication-factor 定義副本數(shù)
--partitions 定義分區(qū)數(shù)
3挚躯、刪除Topic
[djm@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --delete --topic first
4、發(fā)送消息
[djm@hadoop102 kafka]$ bin/kafka-console-producer.sh --broker-list hadoop102:9092 --topic first
5擦秽、消費(fèi)消息
[djm@hadoop103 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --from-beginning --topic first
--from-beginning 會(huì)把topic中以往所有的消息消費(fèi)出來(lái)
6码荔、查看Topic
詳細(xì)信息
[djm@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --describe --topic first
7、修改分區(qū)數(shù)
[djm@hadoop102 kafka]$bin/kafka-topics.sh --zookeeper hadoop102:2181 --alter --topic first --partitions 6
分區(qū)數(shù)只能增加感挥,不能減少
3 Kafka
架構(gòu)
3.1 Kafka
工作流程及文件存儲(chǔ)機(jī)制
Kafka
中消息是以Topic
進(jìn)行分類的缩搅,生產(chǎn)者生產(chǎn)消息,消費(fèi)者消費(fèi)消息触幼,都是面向Topic
的硼瓣;
Topic
是邏輯上的概念,而Partition
是物理上的概念域蜗,每個(gè)Partition
對(duì)應(yīng)于一個(gè)log
文件巨双,該log
文件中存儲(chǔ)的就是Producer
生產(chǎn)的數(shù)據(jù);
Producer
生產(chǎn)的數(shù)據(jù)會(huì)被不斷追加到該log
文件末端霉祸,且每條數(shù)據(jù)都有自己的offset
筑累,消費(fèi)者組中的每個(gè)消費(fèi)者,都會(huì)實(shí)時(shí)記錄自己消費(fèi)到了哪個(gè)offset
丝蹭,以便出錯(cuò)恢復(fù)時(shí)慢宗,從上次的位置繼續(xù)消費(fèi)。
由于生產(chǎn)者生產(chǎn)的消息會(huì)不斷的追加到log
文件末尾奔穿,為了防止文件過大而導(dǎo)致數(shù)據(jù)定位效率低下镜沽,Kafka
采取了分片和索引機(jī)制,將每個(gè)Partiton
分為多個(gè)segment
贱田,每個(gè)segment
對(duì)應(yīng)兩個(gè)文件缅茉,分別是.log
和.index
,這些文件位于同一個(gè)文件夾下男摧,文件夾的命名規(guī)則為Topic
名稱+Partiton
序號(hào)蔬墩,.log
和.index
文件以當(dāng)前segment
的第一條消息的offset
命名译打,index
存儲(chǔ)索引信息,.log
存儲(chǔ)數(shù)據(jù)信息拇颅,索引文件中的元數(shù)據(jù)指向?qū)?yīng)數(shù)據(jù)文件中message
的物理偏移地址奏司。
3.2 Producer
3.2.1 分區(qū)策略
為什么要進(jìn)行分區(qū)?
- 方便在群集中擴(kuò)展樟插,每個(gè)
Partition
可以通過調(diào)整以適應(yīng)它所在的機(jī)器韵洋,而一個(gè)Topic
又可以有多個(gè)Partition
組成,因此整個(gè)集群就可以適應(yīng)任意大小的數(shù)據(jù)了 - 可以提高并發(fā)
分區(qū)的原則是什么黄锤?
我們需要將Producer
發(fā)送的數(shù)據(jù)封裝成一個(gè)ProducerRecord
對(duì)象:
- 指明
Partition
的情況下搪缨,直接將指明的值直接作為Partition
值 - 沒有指明
Partition
值但有key
的情況下,將key
的hash
值與Topic
的Partition
數(shù)進(jìn)行取余得到Partition
值 - 既沒有
Partition
值又沒有key
值的情況下猜扮,第一次調(diào)用時(shí)隨機(jī)生成一個(gè)整數(shù)(后面每次調(diào)用在這個(gè)整數(shù)上自增)勉吻,將這個(gè)值與Topic
可用的Partition
總數(shù)取余得到Partition
值,也就是常說(shuō)的round-robin
算法
3.2.2 數(shù)據(jù)可靠性保證
為保證Partition
發(fā)送的數(shù)據(jù)旅赢,能可靠的發(fā)送到指定的Topic
齿桃,Topic
的每個(gè)Partition
收到Producer
發(fā)送的數(shù)據(jù)后,都需要向Producer
發(fā)送ack
(acknowledgement
確認(rèn)收到)煮盼,如果Producer
收到ack
短纵,就會(huì)進(jìn)行下一輪的發(fā)送,否則重新發(fā)送數(shù)據(jù)僵控。
副本數(shù)據(jù)同步策略:
方案 | 優(yōu)點(diǎn) | 缺點(diǎn) |
---|---|---|
半數(shù)以上完成同步香到,就發(fā)送ack
|
延遲低 | 選舉新的leader 時(shí),容忍n 臺(tái)節(jié)點(diǎn)的故障报破,需要2n+1 個(gè)副本 |
全部完成同步悠就,才發(fā)送ack
|
選舉新的leader 時(shí),容忍n 臺(tái)節(jié)點(diǎn)的故障充易,需要n+1 個(gè)副本 |
延遲高 |
Kafka
選擇了第二種方案梗脾,原因如下:
同樣為了容忍n
臺(tái)節(jié)點(diǎn)的故障,第一種方案需要2n+1
個(gè)副本盹靴,而第二種方案只需要n+1
個(gè)副本炸茧,而Kafka
的每個(gè)Partition
存儲(chǔ)大量的數(shù)據(jù),這樣會(huì)造成大量的數(shù)據(jù)冗余;
雖然第二種方案的延遲會(huì)比較高,但是相比而言延遲對(duì)Kafka
的影響較小空执。
采用第二種方案后,leader
收到數(shù)據(jù)控漠,所有的follower
都開始同步數(shù)據(jù),但是有一個(gè)follower
悬钳,因?yàn)槟撤N故障润脸,遲遲不能與leader
同步柬脸,那leader
就要一直等下去,直到它同步完才能發(fā)送ack
毙驯,這個(gè)問題怎么解決呢?
leader
維護(hù)了一個(gè)動(dòng)態(tài)的in-syncreplica set (ISR)
,意為和leader
保持同步的follower
集合灾测,當(dāng)ISR
中的follower
完成數(shù)據(jù)的同步之后爆价,leader
就會(huì)給follower
發(fā)送ack
,如果follower
長(zhǎng)時(shí)間未向leader
同步數(shù)據(jù)媳搪,則該follower
將被踢出ISR
铭段,該時(shí)間閾值由replica.lag.time.max.ms
參數(shù)設(shè)定,leader
發(fā)生故障之后秦爆,就會(huì)從ISR
中選舉新的leader
序愚。
ack
應(yīng)答機(jī)制:
對(duì)于某些不重要的數(shù)據(jù),能夠容忍少量數(shù)據(jù)的丟失等限,所以沒必要等ISR
中的所有follower
全部同步完成
所以Kafka
提供了三種可靠性級(jí)別爸吮,根據(jù)對(duì)可靠性和延遲的要求權(quán)衡,分別是:
- 0
Producer
不等待Broker
的ack
望门,這一操作提供了最低的延遲形娇,Broker
一接收到還沒有落盤就已經(jīng)返回,當(dāng)Broker
故障時(shí)可能會(huì)丟失數(shù)據(jù) - 1
Producer
等待Broker
的ack
筹误,Partition
的leader
落盤成功后返回ack
桐早,如果在follower
同步成功之前leader
故障,那么將會(huì)丟失數(shù)據(jù) - -1
Producer
等待Broker
的ack
厨剪,Partition
的leader
和follower
全部落盤成功后才返回ack
哄酝,但是如果在follower
同步完成后,Broker
發(fā)送ack
之前祷膳,leader
發(fā)生故障陶衅,那么會(huì)造成數(shù)據(jù)重復(fù)
故障處理:
follower
掛了被會(huì)暫時(shí)提出ISR
,等到follower
恢復(fù)后钾唬,follower
會(huì)讀取本地磁盤記錄上次的HW
万哪,并將log
文件中高于HW
的部分截取掉,從HW
開始向leader
進(jìn)行同步抡秆,等leader
的LEO
高于Partition
的HW
奕巍,就可以被重新加入ISR
leader
發(fā)生故障之后,會(huì)從ISR
中選出一個(gè)新的leader
儒士,為保證多個(gè)副本之間的數(shù)據(jù)一致性的止,每個(gè)leader
會(huì)將各自log
文件中高于HW
的數(shù)據(jù)切掉,然后從新的leader
同步數(shù)據(jù)
3.3.3 Exactly Once
語(yǔ)義
對(duì)于某些比較重要的消息着撩,我們需要保證Exactly Once
語(yǔ)義诅福,即保證每條消息被發(fā)送且僅被發(fā)送一次
在0.11
版本之后匾委,Kafka
引入了冪等性機(jī)制(idempotent
),配合acks = -1
時(shí)的at least once
語(yǔ)義氓润,實(shí)現(xiàn)了Producer
到Broker
的Exactly once
語(yǔ)義
idempotent + at least once = exactly once
使用時(shí)赂乐,只需將enable.idempotence
屬性設(shè)置為true
,Kafka
自動(dòng)將acks
屬性設(shè)為-1
3.3 Consumer
3.3.1 消費(fèi)方式
Consumer
采取pull
的方式從Broker
中讀取數(shù)據(jù)
為什么采用pull
方式呢咖气?
因?yàn)?code>push模式很難適應(yīng)不同速率的Consumer
挨措,因此發(fā)送速率是由Broker
決定的,它的目的就是盡可能快的傳遞消息崩溪,但是這樣容易造成Consumer
來(lái)不及處理消息浅役,典型的表現(xiàn)就是網(wǎng)絡(luò)擁堵以及拒絕服務(wù),而poll
模式則可以根據(jù)Consumer
的消費(fèi)能力消費(fèi)消息伶唯。
但是poll
也有不足觉既,就是如果隊(duì)列中沒有消息,Consumer
可能陷入循環(huán)中乳幸,一直返回空數(shù)據(jù)瞪讼,針對(duì)這個(gè)缺點(diǎn),Consumer
在消費(fèi)數(shù)據(jù)時(shí)會(huì)傳入一個(gè)timeout
反惕,如果當(dāng)前沒有消息可供消費(fèi)尝艘,Consumer
會(huì)等待一段時(shí)間再返回,這段時(shí)間就是timeout
姿染。
3.3.2 分區(qū)分配策略
Kafka
有兩種分配策略背亥,分別是:
3.3.3 offset
維護(hù)
由于Consumer
在消息過程中可能會(huì)出現(xiàn)斷電宕機(jī)等故障,Consumer
恢復(fù)后悬赏,需要從故障的位置繼續(xù)消費(fèi)狡汉,所以Consumer
需要實(shí)時(shí)記錄自己消費(fèi)到了哪個(gè)offset
0.9
以前,Consumer
默認(rèn)將offset
保存在ZK
中
0.9
以后闽颇,Consumer
默認(rèn)將offset
保存在Kafka
一個(gè)內(nèi)置的Topic
盾戴,該Topic
為__consumer_offsets
3.4 Kafka
高效讀取數(shù)據(jù)
順序?qū)懘疟P
Kafka
的Producer
生產(chǎn)數(shù)據(jù),要寫入到log
文件中兵多,寫的過程是一直追加到文件末端
零拷貝技術(shù)
3.5 Zookeeper
在Kafka
中的作用
Kafka集群中有一個(gè)broker會(huì)被選舉為Controller尖啡,負(fù)責(zé)管理集群broker的上下線,所有topic的分區(qū)副本分配和leader選舉等工作剩膘。
Controller的管理工作都是依賴于Zookeeper的衅斩。
以下為partition的leader選舉過程:
4 Kafka API
4.1 Producer API
4.1.1 消息發(fā)送流程
Kafka
的Producer
發(fā)送消息采用的是異步發(fā)送的方式,在消息發(fā)送的過程中怠褐,涉及到了兩個(gè)線程——main
線程和Sender
線程畏梆,以及一個(gè)線程共享變量——RecordAccumulator
,main
線程將消息發(fā)送給RecordAccumulator
,Sender
線程不斷從RecordAccumulator
中拉取消息發(fā)送到Kafka broker
奠涌。
相關(guān)參數(shù):
batch.size
:只有數(shù)據(jù)積累到batch.size
之后宪巨,sender
才會(huì)發(fā)送數(shù)據(jù)
linger.ms
:如果數(shù)據(jù)遲遲未達(dá)到batch.size
,sender
等待linger.time
之后就會(huì)發(fā)送數(shù)據(jù)
相關(guān)類:
KafkaProducer
:需要?jiǎng)?chuàng)建一個(gè)生產(chǎn)者對(duì)象溜畅,用來(lái)發(fā)送數(shù)據(jù)
ProducerConfig
:獲取所需的一系列配置參數(shù)
ProducerRecord
:每條數(shù)據(jù)都要封裝成一個(gè)ProducerRecord
對(duì)象
導(dǎo)入依賴:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.11.0.0</version>
</dependency>
4.1.2 異步發(fā)送
package com.djm.kafka;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class CustomProducer {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 1000; i++) {
producer.send(new ProducerRecord<>("first", i + "", "message-" + i), new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception == null) {
System.out.println("success -> " + metadata.offset());
} else {
exception.printStackTrace();
}
}
});
}
producer.close();
}
}
4.1.3 同步發(fā)送
package com.djm.kafka;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class CustomProducer {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
props.put(ProducerConfig.LINGER_MS_CONFIG, 100);
Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 1000; i++) {
producer.send(new ProducerRecord<>("first", i + "", "message-" + i), new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception == null) {
System.out.println("success -> " + metadata.offset());
} else {
exception.printStackTrace();
}
}
}).get();
}
producer.close();
}
}
4.2 Consumer API
Consumer
消費(fèi)數(shù)據(jù)時(shí)的可靠性是很容易保證的捏卓,因?yàn)閿?shù)據(jù)在Kafka
中是持久化的,故不用擔(dān)心數(shù)據(jù)丟失問題慈格。
由于Consumer
在消費(fèi)過程中可能會(huì)出現(xiàn)斷電宕機(jī)等故障天吓,Consumer
恢復(fù)后,需要從故障前的位置的繼續(xù)消費(fèi)峦椰,所以Consumer
需要實(shí)時(shí)記錄自己消費(fèi)到了哪個(gè)offset
,以便故障恢復(fù)后繼續(xù)消費(fèi)汰规。
所以offset
的維護(hù)是Consumer
消費(fèi)數(shù)據(jù)是必須考慮的問題汤功。
相關(guān)類:
KafkaConsumer
:需要?jiǎng)?chuàng)建一個(gè)消費(fèi)者對(duì)象,用來(lái)消費(fèi)數(shù)據(jù)
ConsumerConfig
:獲取所需的一系列配置參數(shù)
ConsuemrRecord
:每條數(shù)據(jù)都要封裝成一個(gè)ConsumerRecord
對(duì)象
導(dǎo)入依賴:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.11.0.0</version>
</dependency>
4.2.1 手動(dòng)提交offset
package com.djm.kafka;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.Arrays;
import java.util.Properties;
public class CustomConsumer {
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Arrays.asList("first"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
consumer.commitSync();
}
}
}
手動(dòng)提交offset的方法有兩種:
-
commitSync
(同步提交):將本次poll
的一批數(shù)據(jù)最高的偏移量提交溜哮,失敗重試滔金,一直到提交成功 -
commitAsync
(異步提交):將本次poll
的一批數(shù)據(jù)最高的偏移量提交,沒有失敗重試機(jī)制茂嗓,有可能提交失敗
4.2.2 自動(dòng)提交offset
自動(dòng)提交offset
的相關(guān)參數(shù):
enable.auto.commit
:是否開啟自動(dòng)提交offset
功能
auto.commit.interval.ms
:自動(dòng)提交offset
的時(shí)間間隔
package com.djm.kafka;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.Arrays;
import java.util.Properties;
public class CustomConsumer {
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Arrays.asList("first"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
}
}
4.3 自定義Interceptor
Interceptor
是在Kafka
0.10
版本被引入的餐茵,主要用于實(shí)現(xiàn)Client
端的定制化控制邏輯。
對(duì)于Producer
而言述吸,Interceptor
使得用戶在消息發(fā)送前以及Producer
回調(diào)邏輯前有機(jī)會(huì)對(duì)消息做一些定制化需求忿族,比如修改消息等,同時(shí)蝌矛,Producer
允許用戶指定多個(gè)Interceptor
按序作用于同一條消息從而形成一個(gè)Interceptorchain
道批。
Interceptor
的實(shí)現(xiàn)接口是org.apache.kafka.clients.producer.ProducerInterceptor
,其定義的方法包括:
-
configure(configs)
:獲取配置信息和初始化數(shù)據(jù)時(shí)調(diào)用 -
onSend(ProducerRecord)
:Producer
確保在消息被序列化以及計(jì)算分區(qū)前調(diào)用該方法入撒,用戶可以在該方法中對(duì)消息做任何操作隆豹,但最好保證不要修改消息所屬的Topic
和Partition
,否則會(huì)影響目標(biāo)分區(qū)的計(jì)算 -
onAcknowledgement(RecordMetadata, Exception)
:該方法會(huì)在消息從RecordAccumulator
成功發(fā)送到Kafka Broker
之后茅逮,或者在發(fā)送過程中失敗時(shí)調(diào)用 -
close
:關(guān)閉Interceptor
璃赡,主要用于執(zhí)行一些資源清理工作
攔截器案例
1、需求分析:
實(shí)現(xiàn)一個(gè)簡(jiǎn)單的雙Interceptor
組成的攔截鏈献雅,第一個(gè)Interceptor
會(huì)在消息發(fā)送前將時(shí)間戳信息加到消息value
的最前部碉考,第二個(gè)Interceptor
會(huì)在消息發(fā)送后更新成功發(fā)送消息數(shù)或失敗發(fā)送消息數(shù)
2、編寫TimeInterceptor
package com.djm.kafka.interceptor;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Map;
public class TimeInterceptor implements ProducerInterceptor<String, String> {
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
return new ProducerRecord<>(record.topic(), record.partition(), record.timestamp(), record.key(), System.currentTimeMillis() + "," + record.value());
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
}
}
3惩琉、編寫CounterInterceptor
package com.djm.kafka.interceptor;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Map;
public class CounterInterceptor implements ProducerInterceptor<String, String> {
private static long successCounter = 0L;
private static long errorCounter = 0L;
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
return record;
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
if (exception == null) {
successCounter++;
} else {
errorCounter++;
}
}
@Override
public void close() {
System.out.println("Successful sent: " + successCounter);
System.out.println("Failed sent: " + errorCounter);
}
@Override
public void configure(Map<String, ?> configs) {
}
}
4豆励、修改CustomProducer
package com.djm.kafka;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
public class CustomProducer {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
List<String> interceptors = new ArrayList<>();
interceptors.add("com.djm.kafka.interceptor.TimeInterceptor");
interceptors.add("com.djm.kafka.interceptor.CounterInterceptor");
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 1000; i++) {
producer.send(new ProducerRecord<>("first", i + "", "message-" + i), new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception == null) {
System.out.println("success -> " + metadata.offset());
} else {
exception.printStackTrace();
}
}
});
}
producer.close();
}
}
5 Flume
對(duì)接Kafka
1、配置Flume
編寫flume-kafka.conf
[djm@hadoop102 job]$ vim flume-kafka.conf
輸入一下內(nèi)容
# define
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F -c +0 /opt/module/datas/flume.log
a1.sources.r1.shell = /bin/bash -c
# sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sinks.k1.kafka.topic = first
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
2、啟動(dòng)消費(fèi)者
[djm@hadoop102 ~]$ kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --from-beginning --topic first
3良蒸、啟動(dòng)Flume
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a1 -f jobs/flume-kafka.conf
4技扼、向/opt/module/datas/flume.log
里追加數(shù)據(jù),查看Kafka
消費(fèi)情況
6 Kafka
監(jiān)控
6.1 Monitor
1嫩痰、上傳jar
包KafkaOffsetMonitor-assembly-0.4.6.jar
到集群
2剿吻、在/opt/module/
下創(chuàng)建kafka-offset-console
文件夾
3、將上傳的jar
包放入剛創(chuàng)建的目錄下
4串纺、在/opt/module/kafka-offset-console
目錄下創(chuàng)建啟動(dòng)腳本start.sh
丽旅,內(nèi)容如下:
#!/bin/bash
java -cp KafkaOffsetMonitor-assembly-0.4.6-SNAPSHOT.jar \
com.quantifind.kafka.offsetapp.OffsetGetterWeb \
--offsetStorage kafka \
--kafkaBrokers hadoop102:9092,hadoop103:9092,hadoop104:9092 \
--kafkaSecurityProtocol PLAINTEXT \
--zk hadoop102:2181,hadoop103:2181,hadoop104:2181 \
--port 8086 \
--refresh 10.seconds \
--retain 2.days \
--dbName offsetapp_kafka &
5、在/opt/module/kafka-offset-console
目錄下創(chuàng)建mobile-logs
文件夾
6纺棺、啟動(dòng)Monitor
./start.sh
6.2 Manager
1榄笙、上傳壓縮包kafka-manager-1.3.3.15.zip
到集群
2、解壓到/opt/module
3祷蝌、修改配置文件conf/application.conf
kafka-manager.zkhosts="kafka-manager-zookeeper:2181"
修改為:
kafka-manager.zkhosts="hadoop102:2181,hadoop103:2181,hadoop104:2181"
4茅撞、啟動(dòng)kafka-manager
[djm@hadoop102 kafka-manager-1.3.3.15]$ bin/kafka-manager
5、登錄hadoop102:9000
頁(yè)面查看詳細(xì)信息