- kafka是一個
分布式
的基于發(fā)布/訂閱模式的消息隊列
(Message Queue)茎芭,主要應(yīng)用于大數(shù)據(jù)實時處理領(lǐng)域抖拦。
- 使用消息隊列的好處:
-
解耦
:允許你獨立地擴展或修改兩邊的處理過程堰塌,只要確保它們遵守同樣的接口約束且预。
-
可恢復(fù)性
:系統(tǒng)的一部分組件失效時蜈项,不會影響到整個系統(tǒng)定硝。消息隊列降低了進程間的耦合度,所以即使一個處理消息的進程掛掉件豌,加入隊列中的消息仍然可以在系統(tǒng)恢復(fù)后被處理疮方。
-
緩沖
:有助于控制和優(yōu)化數(shù)據(jù)流經(jīng)過系統(tǒng)的速度,解決生產(chǎn)消息和消費消息的處理速度不一致的情況茧彤。
-
削峰
:在訪問量劇增的情況下,使用消息隊列能夠使關(guān)鍵組件頂住突發(fā)的訪問壓力疆栏,而不會因為突發(fā)的超負荷的請求而完全崩潰曾掂。
-
異步通信
:很多時候用戶不需要立即處理消息。消息隊列提供了異步處理機制
壁顶,允許用戶把一個消息放入隊列珠洗,但并不立即處理它。想向隊列中放入多少消息就放多少若专,然后在需要的時候再去處理它們许蓖。
- 消息隊列的兩種模式:
-
點對點模式
:一對一,消費者主動拉取
消息,消費者消費消息后膊爪,消息就被清除自阱。隊列支持存在多個消費者,但是對一個消息而言米酬,只會有一個消費者可以消費沛豌。
-
發(fā)布/訂閱模式
:一對多,生產(chǎn)者生產(chǎn)消息放到隊列中赃额,隊列推送消息
加派,消費者消費消息后,消息不會被清除跳芳。
-
Producer
:消息生產(chǎn)者芍锦,就是向 kafka broker 發(fā)消息的客戶端;
-
Consumer
:消息消費者飞盆,向 kafka broker 取消息的客戶端醉旦;
-
Consumer Group(CG)
:消費者組,由多個 consumer 組成桨啃。消費者組內(nèi)每個消費者負責(zé)消費不同分區(qū)的數(shù)據(jù)车胡,一個分區(qū)只能由一個組內(nèi)某個消費者消費;消費者組之間互不影響照瘾。
所有的消費者都屬于某個消費者組匈棘,即消費者組是邏輯上的一個訂閱者。
-
Broker
:一臺 kafka 服務(wù)器就是一個 broker析命。一個集群由多個 broker 組成主卫。一個 broker可以容納多個 topic。
-
Topic
: 可以理解為一個隊列鹃愤, 生產(chǎn)者和消費者面向的都是一個 topic簇搅;
-
Partition
: 為了實現(xiàn)擴展性,一個非常大的 topic 可以分布到多個 broker(即服務(wù)器)上软吐,一個 topic 可以分為多個 partition瘩将,每個 partition 是一個有序的隊列;
-
Replication
:副本凹耙,為保證集群中的某個節(jié)點發(fā)生故障時姿现,該節(jié)點上的分區(qū)數(shù)據(jù)不丟失,且Kafka仍然能夠繼續(xù)工作肖抱, Kafka 提供了副本機制备典,一個 topic 的每個分區(qū)都有若干個副本,一個 leader 和若干個 follower意述。
-
Leader
: 每個分區(qū)多個副本的“主”提佣,生產(chǎn)者發(fā)送數(shù)據(jù)的對象吮蛹,以及消費者消費數(shù)據(jù)的對象都是 leader。
-
Follower
:每個分區(qū)多個副本中的“從”拌屏,實時從 leader 中同步數(shù)據(jù)潮针,保持和 leader 數(shù)據(jù)的同步。 leader 發(fā)生故障時槐壳,某個 Follower 會成為新的 leader然低。
依賴zookeeper集群搭建kafka集群
tar -zxvf kafka_2.12-2.5.0.tgz、mv kafka_2.12-2.5.0 kafka-1
tar -zxvf kafka_2.12-2.5.0.tgz枫笛、mv kafka_2.12-2.5.0 kafka-2
tar -zxvf kafka_2.12-2.5.0.tgz吨灭、mv kafka_2.12-2.5.0 kafka-3
- 修改各個節(jié)點中kafka服務(wù)器的配置文件
server.properties
、producer.properties
刑巧、consumer.properties
:
-
/opt/kafka-1/config/server.properties
:
broker.id=1 #服務(wù)器節(jié)點編號
listeners=PLAINTEXT://192.168.211.147:9092 #設(shè)置監(jiān)聽哪個端口
advertised.listeners=PLAINTEXT://192.168.211.147:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/opt/kafka-1/data #存放kafka數(shù)據(jù)的目錄
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
#log.retention.bytes=1073741824
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.211.147:2181,192.168.211.147:2182,192.168.211.147:2183
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
-
/opt/kafka-1/config/producer.properties
:
bootstrap.servers=192.168.211.147:9092
compression.type=none
-
/opt/kafka-1/config/consumer.properties
:
bootstrap.servers=192.168.211.147:9092
group.id=test
-
/opt/kafka-2/config/server.properties
:
broker.id=2 #服務(wù)器節(jié)點編號
listeners=PLAINTEXT://192.168.211.141:9092 #設(shè)置監(jiān)聽哪個端口
advertised.listeners=PLAINTEXT://192.168.211.141:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/opt/kafka-2/data #存放kafka數(shù)據(jù)的目錄
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
#log.retention.bytes=1073741824
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.211.147:2181,192.168.211.147:2182,192.168.211.147:2183
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
-
/opt/kafka-2/config/producer.properties
:
bootstrap.servers=192.168.211.141:9092
compression.type=none
-
/opt/kafka-2/config/consumer.properties
:
bootstrap.servers=192.168.211.141:9092
group.id=test
-
/opt/kafka-3/config/server.properties
:
broker.id=3 #服務(wù)器節(jié)點編號
listeners=PLAINTEXT://192.168.211.156:9092 #設(shè)置監(jiān)聽哪個端口
advertised.listeners=PLAINTEXT://192.168.211.156:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/opt/kafka-3/data #存放kafka數(shù)據(jù)的目錄
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
#log.retention.bytes=1073741824
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.211.147:2181,192.168.211.147:2182,192.168.211.147:2183
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
-
/opt/kafka-3/config/producer.properties
:
bootstrap.servers=192.168.211.156:9092
compression.type=none
-
/opt/kafka-3/config/consumer.properties
:
bootstrap.servers=192.168.211.156:9092
group.id=test
- 分別在各個節(jié)點的kafka安裝目錄下創(chuàng)建兩個文件夾:
mkdir data logs
- 分別在各個節(jié)點的kafka安裝目錄下的data文件夾中創(chuàng)建一個文件
meta.properties
:
-
/opt/kafka-1/data/meta.properties
:
version=0
broker.id=1 #注意和server.properties中的broker.id的值相同
-
/opt/kafka-2/data/meta.properties
:
version=0
broker.id=2
-
/opt/kafka-3/data/meta.properties
:
version=0
broker.id=3
- 先啟動zookeeper集群喧兄,再啟動kafka集群(可以單獨寫一個shell腳本來啟動和關(guān)閉集群),然后鍵入命令:
jps
查看kafka是否已啟動啊楚。
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
nohup bin/kafka-server-start.sh config/server.properties 1>/dev/null 2>&1 &
- 進入kafka安裝目錄下的logs文件夾吠冤,通過文件
server.log
可以查看該kafka節(jié)點的運行日志。
- 創(chuàng)建主題(
bigdata3
)恭理,指定分區(qū)數(shù)2個和副本數(shù)1個(注意:分區(qū)數(shù)可以超過節(jié)點數(shù)拯辙,副本數(shù)不能超過節(jié)點數(shù)):
bin/kafka-topics.sh --zookeeper 192.168.211.147:2181 --create --topic bigdata3 --partitions 2 --replication-factor 1
bin/kafka-topics.sh --zookeeper 192.168.211.147:2181 --list
- 刪除主題(
bigdata3
),刪除后會被標(biāo)記為刪除狀態(tài)颜价,等cpu空閑時再自動清除
./bin/kafka-topics.sh --zookeeper 192.168.211.147:2181 --delete --topic bigdata3
./bin/kafka-topics.sh --zookeeper 192.168.211.147:2182 --describe --topic bigdata3
- 往
bigdata3
主題上發(fā)送消息(啟動生產(chǎn)者):
./bin/kafka-console-producer.sh --broker-list 192.168.211.147:9092,192.168.211.141:9092,192.168.211.156:9092 --topic bigdata3
./bin/kafka-console-consumer.sh --bootstrap-server 192.168.211.147:9092,192.168.211.141:9092,192.168.211.156:9092 --topic bigdata3 --from-beginning
- kafka中消息是以
topic
進行分類的涯保,producer
生產(chǎn)消息,consumer
消費消息周伦,都是面向topic的夕春。(從命令行操作看出)
-
topic
是邏輯上的概念,而partition
是物理上的概念专挪,每個partition
對應(yīng)于一個log文件及志,該 log 文件中存儲的就是producer 生產(chǎn)的數(shù)據(jù)。(topic = N partition狈蚤,partition = log)
- 由于生產(chǎn)者生產(chǎn)的消息會不斷追加到
.log
文件末尾困肩, 為防止.log
文件過大導(dǎo)致數(shù)據(jù)定位效率低下,kafka采取了分片
和索引
機制脆侮,將每個partition
分為多個segment
。每個segment
對應(yīng)兩個文件——.index
文件和.log
文件勇劣。 這些文件位于一個文件夾下靖避,該文件夾的命名規(guī)則為:topic name+分區(qū)序號
潭枣。例如:first-topic
這個topic有三個分區(qū),則其對應(yīng)的文件夾為first-topic-0
幻捏,first-topic-1
盆犁,first-topic-2
。
00000000000000000000.index
00000000000000000000.log
00000000000000170410.index
00000000000000170410.log
00000000000000239430.index
00000000000000239430.log
.index文件和.log文件的結(jié)構(gòu)示意圖
-
.index
和.log
文件以當(dāng)前segment
的第一條消息的offset
命名篡九。.index
文件存儲大量的索引信息谐岁,.log
文件存儲大量的數(shù)據(jù),索引文件中的元數(shù)據(jù)指向?qū)?yīng)數(shù)據(jù)文件中message
的物理偏移地址榛臼。
- 主題分區(qū)的原因:
-
方便在集群中擴展
伊佃,每個partition
可以通過調(diào)整以適應(yīng)它所在的機器,而一個topic
又可以有多個partition
組成沛善,因此整個集群就可以適應(yīng)適合的數(shù)據(jù)了航揉;
-
可以提高并發(fā)
,因為可以以partition
為單位進行讀寫了金刁。
- 分區(qū)的原則:
- 指明
partition
的情況下帅涂,直接將指明的值作為partiton
值;
- 沒有指明
partition
值但有key
的情況下尤蛮,將key
的 hash 值與 topic 的 partition 總數(shù)進行取余得到的partition
值媳友;
- 既沒有
partition
值又沒有 key 值的情況下,第一次調(diào)用時隨機生成一個整數(shù)(后面每次調(diào)用在這個整數(shù)上自增)产捞,將這個值與 topic 可用的 partition 總數(shù)取余得到 partition值醇锚,即round-robin
算法。
生產(chǎn)者發(fā)送數(shù)據(jù)分裝為ProducerRecord對象的API
- 生產(chǎn)者數(shù)據(jù)可靠性保證:為保證
producer
發(fā)送的數(shù)據(jù)轧葛,能可靠地發(fā)送到指定的 topic搂抒, topic 的每個partition
收到producer 發(fā)送的數(shù)據(jù)后,都需要向 producer 發(fā)送ack
(acknowledgement:確認收到)尿扯,若producer收到 ack求晶,則會進行下一輪的發(fā)送,否則重新發(fā)送數(shù)據(jù)衷笋。
生產(chǎn)者數(shù)據(jù)可靠性保證機制
- 何時發(fā)送ack芳杏?確保有
follower
與leader
同步完成,leader
再發(fā)送ack
辟宗,這樣才能保證leader
掛掉之后爵赵,能在follower
中選舉出新的leader
。
- 多少個follower同步完成之后發(fā)送ack泊脐?①半數(shù)以上的follower同步完成空幻,即可發(fā)送ack;②全部的follower同步完成容客,才可以發(fā)送ack秕铛。
序號 |
方案 |
優(yōu)點 |
缺點 |
1 |
半數(shù)以上完成同步约郁,就發(fā)送ack |
延遲低 |
選舉新的 leader 時,容忍 n 臺節(jié)點的故障但两,需要 2n+1 個副本鬓梅。容錯率:1/2。 |
2 |
全部完成同步谨湘,才發(fā)送ack |
選舉新的 leader 時绽快, 容忍 n 臺節(jié)點的故障,需要 n+1 個副本紧阔。容錯率:1坊罢。 |
延遲高 |
- kafka選取了第二種方案,原因:①為了容忍 n 臺節(jié)點的故障寓辱,第一種方案需要 2n+1 個副本艘绍,而第二種方案只需要 n+1 個副本,又 kafka 的每個分區(qū)都有大量的數(shù)據(jù)秫筏, 因此诱鞠,第一種方案會造成大量數(shù)據(jù)的冗余。②雖然第二種方案的網(wǎng)絡(luò)延遲會比較高这敬,但網(wǎng)絡(luò)延遲對 kafka 的影響較小航夺。
- kafka采用第二種方案之后,設(shè)想以下情景: leader 收到數(shù)據(jù)崔涂,所有 follower 都開始同步數(shù)據(jù)阳掐,但有一個 follower,因為某種故障遲遲不能與 leader 進行同步冷蚂,那 leader 就要一直等下去缭保,直到它完成同步,才能發(fā)送 ack蝙茶。這個問題怎么解決呢艺骂?
- leader 維護了一個動態(tài)的
in-sync replica set(ISR)
,意為和 leader 保持同步的 follower 集合隆夯。當(dāng) ISR 中的 follower 完成數(shù)據(jù)的同步之后钳恕,就會給 leader 發(fā)送 ack。若follower長時間未向leader同步數(shù)據(jù)蹄衷,則該follower將被踢出ISR忧额,該時間閾值由replica.lag.time.max.ms
參數(shù)設(shè)定。當(dāng)leader 發(fā)生故障之后愧口,就會從 ISR 中選舉新的 leader睦番。
-
replica.lag.time.max.ms
:If a follower hasn't sent any fetch requests or hasn't consumed up to the leaders log end offset for at least this time, the leader will remove the follower from isr。TYPE: long耍属。DEFAULT: 10000抡砂。
- 若對數(shù)據(jù)的可靠性要求不是很高大咱,能夠容忍少量數(shù)據(jù)的丟失恬涧,則沒必要等 ISR 中的 follower 全部接收成功注益,因此,kafka 為用戶提供了三種可靠性級別溯捆,用戶根據(jù)對可靠性和延遲的要求進行權(quán)衡丑搔,acks可以選擇以下幾個值:
-
0
:producer
不等待 broker 的 ack,這一操作提供了一個最低的延遲提揍,broker
一接收到還沒有寫入磁盤就已經(jīng)返回啤月,當(dāng) broker 故障時有可能丟失數(shù)據(jù);
-
1
:producer
等待 broker 的 ack劳跃,partition
的 leader 落盤成功后返回才ack谎仲,若在 follower同步成功之前 leader 故障,則將會丟失數(shù)據(jù)刨仑;
-
-1
(all):producer
等待 broker 的 ack郑诺,partition
的 leader 和 ISR 的follower 全部落盤成功后才返回 ack。但若在 follower 同步完成后杉武,broker 發(fā)送 ack 之前辙诞, leader 發(fā)生故障,則會造成數(shù)據(jù)重復(fù)轻抱。
acks=-1飞涂,可能造成數(shù)據(jù)重復(fù)寫入
- 助記:返A(chǔ)CK前,0無落盤祈搜,1一落盤较店,-1全落盤,(落盤:消息存到本地)
- kafka數(shù)據(jù)一致性
-
LEO(Log End Offset)
:每個副本的最后一個offset容燕。
-
HW(High Watermark)
:高水位梁呈,指消費者能見到的最大的offset,ISR 隊列中最小的 LEO缰趋。
-
follower
故障:follower 發(fā)生故障后會被臨時踢出 ISR捧杉,待該 follower 恢復(fù)后, follower 會讀取本地磁盤記錄的上次的 HW秘血,并將 log 文件高于 HW 的部分截取掉味抖,從 HW 開始向 leader 進行同步;等該 follower 的 LEO 大于等于該 Partition 的 HW,即 follower 追上 leader 之后缔杉,就可以重新加入 ISR 了墩邀。
-
leader
故障:leader 發(fā)生故障之后深胳,會從 ISR 中選出一個新的 leader熔脂,之后為保證多個副本之間的數(shù)據(jù)一致性佩研, 其余的 follower 會先將各自的 log 文件高于 HW 的部分截掉,然后從新的 leader同步數(shù)據(jù)霞揉。
- 注意:這只能保證副本之間的數(shù)據(jù)一致性旬薯,并不能保證數(shù)據(jù)不丟失或者不重復(fù)。
- 當(dāng)服務(wù)器的 ACK 級別設(shè)置為-1(all)适秩,可以保證 Producer 到 Server 之間不會丟失數(shù)據(jù)绊序,即
At Least Once
語義。當(dāng)服務(wù)器的 ACK 級別設(shè)置為 0秽荞,可以保證生產(chǎn)者每條消息只會被發(fā)送一次骤公,即At Most Once
語義。
-
At Least Once
可以保證數(shù)據(jù)不丟失扬跋,但是不能保證數(shù)據(jù)不重復(fù)阶捆。At Most Once
可以保證數(shù)據(jù)不重復(fù),但是不能保證數(shù)據(jù)不丟失钦听。 對于一些非常重要的信息洒试,比如說交易數(shù)據(jù),下游數(shù)據(jù)消費者要求數(shù)據(jù)既不重復(fù)也不丟失彪见,即 Exactly Once
語義儡司。
- 在 0.11 版本以前的 Kafka,對此是無能為力的余指,只能保證數(shù)據(jù)不丟失捕犬,再在下游消費者對數(shù)據(jù)做全局去重。對于多個下游應(yīng)用的情況酵镜,每個都需要單獨做全局去重碉碉,這就對性能造成了很大影響。
- 0.11 版本的 Kafka淮韭,引入了一項重大特性:
冪等性
垢粮。所謂的冪等性就是指 Producer 不論向 Server 發(fā)送多少次重復(fù)數(shù)據(jù), Server 端都只會持久化一條靠粪。冪等性結(jié)合At Least Once
語義蜡吧,就構(gòu)成了 Kafka 的Exactly Once
語義。即:At Least Once + 冪等性 = Exactly Once
占键。
- 要啟用冪等性昔善,只需要將 Producer 的參數(shù)中
enable.idempotence
設(shè)置為 true 即可。 Kafka的冪等性實現(xiàn)其實就是將原來下游需要做的去重放在了數(shù)據(jù)上游畔乙。開啟冪等性的 Producer 在初始化的時候會被分配一個 PID君仆,發(fā)往同一 Partition 的消息會附帶 Sequence Number。而Broker 端會對<PID, Partition, SeqNumber>做緩存,當(dāng)具有相同主鍵的消息提交時返咱, Broker 只會持久化一條(單分區(qū)單會話)钥庇。但是生產(chǎn)者重啟時PID 就會變化,同時不同的 Partition 也具有不同主鍵咖摹,所以冪等性無法保證跨分區(qū)跨會話的 Exactly Once评姨。
- 消費者消費方式:consumer 采用 pull(拉) 模式從 broker 中讀取數(shù)據(jù)。
-
push(推)模式
很難適應(yīng)消費速率不同的消費者楞艾,因為消息發(fā)送速率是由 broker 決定的参咙。它的目標(biāo)是盡可能以最快的速度傳遞消息,但是這樣很容易造成 consumer 來不及處理消息硫眯,典型的表現(xiàn)就是拒絕服務(wù)以及網(wǎng)絡(luò)擁塞。而 pull 模式則可以根據(jù) consumer 的消費能力以適當(dāng)?shù)乃俾氏M消息择同。
-
pull (拉)模式
不足之處是两入,若 kafka 沒有數(shù)據(jù),則消費者可能會陷入循環(huán)中敲才,一直返回空數(shù)據(jù)裹纳。 針對這一點, Kafka 的消費者在消費數(shù)據(jù)時會傳入一個時長參數(shù) timeout紧武,若當(dāng)前沒有數(shù)據(jù)可供消費剃氧,則consumer 會等待一段時間之后再返回,這段時長即為 timeout阻星。
- 消費者分區(qū)分配策略:一個 consumer group 中有多個 consumer朋鞍,一個 topic 有多個 partition,所以會涉及到 partition 的分配問題妥箕,即確定哪個 partition 由哪個 consumer 來消費滥酥。Kafka 有兩種分配策略:①round-robin循環(huán);②range畦幢。
- 關(guān)于Roudn Robin重分配策略坎吻,其主要采用的是一種輪詢的方式來分配所有的分區(qū),主要實現(xiàn)的步驟如下:假設(shè)有三個topic:t0宇葱、t1和t2瘦真,這三個topic擁有的分區(qū)數(shù)分別為1、2和3黍瞧,則總共有六個分區(qū)诸尽,這六個分區(qū)分別為:t0-0、t1-0雷逆、t1-1弦讽、t2-0、t2-1和t2-2。又假設(shè)我們有三個consumer:C0往产、C1和C2被碗,它們訂閱的情況為:C0訂閱t0,C1訂閱t0和t1仿村,C2訂閱t0锐朴、t1和t2,則這些分區(qū)的分配步驟如下:
- 首先將所有的partition和consumer按照字典序進行排序蔼囊,所謂的字典序焚志,就是按照其名稱的字符串順序,那么上面的六個分區(qū)和三個consumer排序之后分別為:t0-0畏鼓、t1-0酱酬、t1-1、t2-0云矫、t2-1膳沽、t2-2和C0、C1让禀、C2挑社。
- 然后按順序輪詢的方式將這六個分區(qū)分配給三個consumer,若當(dāng)前consumer沒有訂閱當(dāng)前分區(qū)所在的topic巡揍,則輪詢地判斷下一個consumer:
- 嘗試將t0-0分配給C0痛阻,由于C0訂閱了t0,因而可以分配成功腮敌;
- 嘗試將t1-0分配給C1阱当,由于C1訂閱了t1,因而可以分配成功缀皱;
- 嘗試將t1-1分配給C2斗这,由于C2訂閱了t1,因而可以分配成功啤斗;
- 嘗試將t2-0分配給C0表箭,由于C0沒有訂閱t2,因而會輪詢下一個consumer钮莲;
- 嘗試將t2-0分配給C1免钻,由于C1沒有訂閱t2,因而會輪詢下一個consumer崔拥;
- 嘗試將t2-0分配給C2极舔,由于C2訂閱了t2,因而可以分配成功链瓦;
- 同理由于t2-1和t2-2所在的topic都沒有被C0和C1訂閱拆魏,因而都不會分配成功盯桦,最終都會分配給C2。
- 從上面的步驟分析可以看出渤刃,輪詢的策略就是簡單地將所有的partition和consumer按照字典序進行排序之后拥峦,然后依次將partition分配給各個consumer,若當(dāng)前的consumer沒有訂閱當(dāng)前的partition卖子,則會輪詢下一個consumer略号,直至最終將所有的分區(qū)都分配完畢。但是從上面的分配結(jié)果可以看出洋闽,輪詢的方式會導(dǎo)致每個consumer所承載的分區(qū)數(shù)量不一致玄柠,從而導(dǎo)致各個consumer壓力不均一。
- 按照上述的步驟將所有的分區(qū)都分配完畢之后诫舅,最終分區(qū)的訂閱情況如下:
- Range重分配策略:首先會計算各個consumer將會承載的分區(qū)數(shù)量羽利,然后將指定數(shù)量的分區(qū)分配給該consumer。假設(shè)有兩個consumer:C0和C1骚勘,兩個topic:t0和t1铐伴,這兩個topic分別都有三個分區(qū),總共的分區(qū)有六個:t0-0俏讹、t0-1、t0-2畜吊、t1-0泽疆、t1-1和t1-2,則這些分區(qū)的分配步驟如下:
- 注意:Range策略是按照topic進行分區(qū)分配的玲献。拿主題t0來分析殉疼,其首先會獲取t0的所有分區(qū):t0-0、t0-1和t0-2捌年,以及所有訂閱了該topic的consumer:C0和C1瓢娜,并且會將這些分區(qū)和consumer按照字典序進行排序;
- 然后按照平均分配的方式計算每個consumer會得到多少個分區(qū)礼预,若沒有除盡眠砾,則會將多出來的分區(qū)依次計算到前面幾個consumer。假設(shè)有3個分區(qū)和2個consumer托酸,則每個consumer至少會得到1個分區(qū)褒颈,因為3除以2后還余1,所以會將剩余的1個分區(qū)分配給第一個consumer励堡,最終C0從第0個分區(qū)開始分配2個分區(qū)谷丸,而C1從第2個分區(qū)開始分配1個分區(qū);
- 同理应结,按照上面的步驟依次進行后面topic的分配刨疼,最終上面六個分區(qū)的分配情況如下所示:
- 由于 consumer 在消費過程中可能會出現(xiàn)斷電宕機等故障, consumer 恢復(fù)后,需要從故障前的位置繼續(xù)消費揩慕,所以 consumer 需要實時記錄自己消費到了哪個 offset亭畜,以便故障恢復(fù)后繼續(xù)消費。
- Kafka 0.9 版本之前漩绵, consumer 默認將 offset 保存在 Zookeeper 中贱案,從 0.9 版本開始,consumer 默認將 offset 保存在 Kafka 一個內(nèi)置的 topic 中止吐,該 topic 為
__consumer_offsets
宝踪。
- 修改配置文件
consumer.properties
:exclude.internal.topics=false
。
- 0.11.0.0 之前版本:
bin/kafka-console-consumer.sh --topic __consumer_offsets --zookeeper hadoop102:2181 --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config config/consumer.properties --from-beginning
bin/kafka-console-consumer.sh --topic __consumer_offsets --zookeeper hadoop102:2181 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config config/consumer.properties --from-beginning
- 消費者組案例:測試同一個消費者組中的消費者碍扔, 同一時刻只能有一個消費者消費一個消息瘩燥。
- 修改consumer.properties文件中的 group.id 屬性:
group.id=zhangsan
- 創(chuàng)建主題:bigdata3
- 啟動生產(chǎn)者
- 分別啟動2個消費者
- 在生產(chǎn)者窗口輸入消息,觀察兩個消費者窗口不同,會發(fā)現(xiàn)兩個消費者窗口中厉膀,只有一個才會彈出消息。
bin/kafka-topics.sh --zookeeper 192.168.211.147:2181 --create --topic bigdata3 --partitions 2 --replication-factor 1
--------------------------------------------------------
./bin/kafka-console-producer.sh --broker-list 192.168.211.147:9092,192.168.211.141:9092,192.168.211.156:9092 --topic bigdata3
-------------------------------------------------------
./bin/kafka-console-consumer.sh --bootstrap-server 192.168.211.147:9092,192.168.211.141:9092,192.168.211.156:9092 --topic bigdata3 --consumer.config ./config/consumer.properties
- kafka高效讀寫數(shù)據(jù):
-
順序?qū)懘疟P
:Kafka 的 producer 生產(chǎn)數(shù)據(jù)二拐,要寫入到 log 文件中服鹅,寫的過程是一直追加到文件末端,為順序?qū)憽?官網(wǎng)有數(shù)據(jù)表明百新,同樣的磁盤企软,順序?qū)懩艿?600M/s,而隨機寫只有 100K/s饭望。這與磁盤的機械機構(gòu)有關(guān)仗哨,順序?qū)懼钥欤且驗槠涫∪チ舜罅看蓬^尋址的時間铅辞。
-
零復(fù)制技術(shù)
:NIC network interface controller 網(wǎng)絡(luò)接口控制器厌漂。
- zookeeper 在 kafka 中的作用:Kafka 集群中有一個 broker 會被選舉為
controller
,負責(zé)管理集群中 broker 的上下線斟珊,所有 topic 的分區(qū)副本分配和 leader 選舉等工作苇倡。controller 的管理工作都是依賴于 zookeeper 的。以下為 partition 的 leader 選舉過程:
- Kafka 從 0.11 版本開始引入了事務(wù)支持倍宾。事務(wù)可以保證 Kafka 在 Exactly Once 語義的基礎(chǔ)上雏节,生產(chǎn)和消費可以跨分區(qū)和跨會話,要么全部成功高职,要么全部失敗钩乍。
- Producer 事務(wù):
- 為了實現(xiàn)跨分區(qū)跨會話的事務(wù),需要引入一個全局唯一的 Transaction ID怔锌,并將 Producer 獲得的PID 和Transaction ID 綁定寥粹。這樣當(dāng)Producer 重啟后就可以通過正在進行的 TransactionID 獲得原來的 PID变过。
- 為了管理 Transaction, Kafka 引入了一個新的組件 Transaction Coordinator涝涤。 Producer 就是通過和 Transaction Coordinator 交互獲得 Transaction ID 對應(yīng)的任務(wù)狀態(tài)媚狰。 Transaction Coordinator 還負責(zé)將事務(wù)所有寫入 Kafka 的一個內(nèi)部 Topic,這樣即使整個服務(wù)重啟阔拳,由于事務(wù)狀態(tài)得到保存崭孤,進行中的事務(wù)狀態(tài)可以得到恢復(fù),從而繼續(xù)進行糊肠。
- Consumer 事務(wù):無法保證 Commit 的信息被精確消費一次辨宠,這是由于 Consumer 可以通過 offset 訪問任意信息,而且不同的 Segment File 生命周期不同货裹,同一事務(wù)的消息可能會出現(xiàn)重啟后被刪除的情況嗤形。
- 消息發(fā)送流程:Kafka 的 Producer 發(fā)送消息采用的是
異步發(fā)送
的方式。在消息發(fā)送的過程中弧圆,涉及到了兩個線程——main線程
和 Sender線程
赋兵,以及一個線程共享變量——RecordAccumulator
。 main 線程將消息發(fā)送給 RecordAccumulator
搔预, Sender 線程不斷從 RecordAccumulator
中拉取消息發(fā)送到 kafka broker
霹期。
- 相關(guān)參數(shù):①
batch.size
: 只有數(shù)據(jù)積累到 batch.size 之后, sender 才會發(fā)送數(shù)據(jù)拯田。②linger.ms
:如果數(shù)據(jù)遲遲未達到 batch.size经伙, sender 等待 linger.time 之后就會發(fā)送數(shù)據(jù)。
- kafka應(yīng)用實例:測試異步發(fā)送消息以及帶回調(diào)函數(shù)的生產(chǎn)者勿锅。
- 導(dǎo)入依賴:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.11.0.0</version>
</dependency>
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class MyProducer {
/**
* KafkaProducer:需要創(chuàng)建一個生產(chǎn)者對象,用來發(fā)送數(shù)據(jù)
* ProducerConfig:獲取所需的一系列配置參數(shù)
* ProducerRecord:每條數(shù)據(jù)都要封裝成一個 ProducerRecord 對象
*/
public static void main(String[] args) {
//1枣氧、創(chuàng)建kafka生產(chǎn)者的配置信息
Properties props = new Properties();
//2溢十、指定連接的kafka集群 broker-list
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.211.147:9092,192.168.211.141:9092,192.168.211.156:9092");
//3、ACK應(yīng)答級別
props.put(ProducerConfig.ACKS_CONFIG, "all");
//4达吞、重試次數(shù)
props.put(ProducerConfig.RETRIES_CONFIG, 1);
//5张弛、批次大小:16k
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
//6酪劫、等待時間:1ms
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
//7吞鸭、RecordAccumulator 緩沖區(qū)大小
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
//8、指定序列化的類
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
//9覆糟、創(chuàng)建生產(chǎn)者對象
Producer<String, String> producer = new KafkaProducer<>(props);
//10刻剥、發(fā)送數(shù)據(jù)
/*for (int i = 0; i < 10; i++) {
producer.send(new ProducerRecord<>("bigdata3", "test-" + i,
"test-" + i));
}*/
for (int i = 0; i < 10; i++) {
/**
* 回調(diào)函數(shù)會在 producer 收到 ack 時調(diào)用,且為異步調(diào)用滩字,
* 該方法有兩個參數(shù)造虏,分別是 RecordMetadata 和 Exception御吞,
* 若 Exception 為 null,則說明消息發(fā)送成功漓藕;否則說明消息發(fā)送失敗陶珠。
* 注意:消息發(fā)送失敗會自動重試,不需要我們在回調(diào)函數(shù)中手動重試享钞。
*/
producer.send(new ProducerRecord<>("bigdata3",
"test-" + i), (metadata, exception) -> {
if (exception == null) {
System.out.println(metadata.partition() + " - " + metadata.offset());
} else {
exception.printStackTrace();
}
});
}
//11揍诽、關(guān)閉資源
producer.close();
}
}
- kafka應(yīng)用實例:測試自定義分區(qū)器的生產(chǎn)者。
- MyPartitioner.java
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;
public class MyPartitioner implements Partitioner {
/**
* 具體內(nèi)容填寫可參考默認分區(qū)器:org.apache.kafka.clients.producer.internals.DefaultPartitioner
* 然后Producer配置中注冊使用:
* props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class);
*/
@Override
public void configure(Map<String, ?> configs) {
}
//自定義存儲在哪個分區(qū)
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
return 0;
}
@Override
public void close() {
}
}
- kafka應(yīng)用實例:測試同步發(fā)送消息的生產(chǎn)者栗竖。
- 同步發(fā)送的意思就是一條消息發(fā)送之后暑脆,會阻塞當(dāng)前線程,直至其返回 ack划滋。由于 send 方法返回的是一個 Future 對象饵筑,根據(jù) Futrue 對象的特點,只需再調(diào)用 Future 對象的 get 方法即可处坪。
- SyncProducer.java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class SyncProducer {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.211.147:9092,192.168.211.141:9092,192.168.211.156:9092");
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, 1);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 10; i++) {
producer.send(new ProducerRecord<>("bigdata3",
"test-" + i), (metadata, exception) -> {
if (exception == null) {
System.out.println(metadata.partition() + " - " + metadata.offset());
} else {
exception.printStackTrace();
}
}).get(); //調(diào)用future的get方法來實現(xiàn)同步發(fā)送消息
}
producer.close();
}
}
- kafka應(yīng)用實例:測試消費者消費消息根资。
- MyConsumer.java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Collections;
import java.util.Properties;
public class MyConsumer {
/**
* KafkaConsumer:需要創(chuàng)建一個消費者對象,用來消費數(shù)據(jù)
* ConsumerConfig:獲取所需的一系列配置參數(shù)
* ConsuemrRecord:每條數(shù)據(jù)都要封裝成一個 ConsumerRecord 對象
*/
public static void main(String[] args) {
//創(chuàng)建消費者的配置信息
Properties props = new Properties();
//連接的kafka集群同窘,--bootstrap-server
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.211.147:9092,192.168.211.141:9092,192.168.211.156:9092");
//設(shè)置消費者組
props.put(ConsumerConfig.GROUP_ID_CONFIG, "zhangsan");
//開啟自動提交offset
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
//自動提交offset的時間間隔
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
//重置消費者的offset玄帕,注意需要換組名才能出現(xiàn) --from-beginning 的效果
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");//默認值是 latest
//key,value的反序列化類
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
//訂閱主題
consumer.subscribe(Collections.singletonList("bigdata3"));
while (true) {
//死循環(huán)獲取所有記錄
ConsumerRecords<String, String> records = consumer.poll(100); //100ms
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d想邦,key = %s裤纹,value = %s\n", record.offset(), record.key(), record.value());
}
}
//關(guān)閉連接
//consumer.close();
}
}
- 雖然自動提交 offset 十分便利,但由于其是基于時間提交的丧没,開發(fā)人員難以把握offset 提交的時機鹰椒,因此 Kafka 還提供了手動提交 offset 的 API。手動提交 offset 的方法有兩種:①commitSync(同步提交)呕童;②commitAsync(異步提交)
- 相同點:都會將本次 poll 的一批數(shù)據(jù)最高的偏移量提交漆际;
- 不同點:commitSync 阻塞當(dāng)前線程,一直到提交成功夺饲,并且失敗會自動重試(由不可控因素導(dǎo)致奸汇,也會出現(xiàn)提交失敗)往声;而 commitAsync 則沒有失敗重試機制擂找,故有可能提交失敗。
- SyncCommitOffset.java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Collections;
import java.util.Properties;
public class SyncCommitOffset {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.211.147:9092,192.168.211.141:9092,192.168.211.156:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "zhangsan");
//關(guān)閉自動提交offset
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");//默認值是 latest
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("bigdata3"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100); //100ms
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d浩销,key = %s贯涎,value = %s\n", record.offset(), record.key(), record.value());
}
//同步提交,當(dāng)前線程會阻塞直到 offset 提交成功
consumer.commitSync();
}
//關(guān)閉連接
//consumer.close();
}
}
- 雖然同步提交 offset 更可靠一些撼嗓,但是由于其會阻塞當(dāng)前線程直到提交成功柬采,吞吐量會收到很大的影響欢唾,因此更多情況下,會選用異步提交 offset 的方式粉捻。
- AsyncCommitOffset.java
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
public class AsyncCommitOffset {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.211.147:9092,192.168.211.141:9092,192.168.211.156:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "zhangsan");
//關(guān)閉自動提交offset
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");//默認值是 latest
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("bigdata3"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100); //100ms
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d礁遣,key = %s,value = %s\n", record.offset(), record.key(), record.value());
}
// 異步提交
consumer.commitAsync((offsets, exception) -> {
if (exception != null) {
System.err.println("Commit failed for" + offsets);
}
});
}
//關(guān)閉連接
//consumer.close();
}
}
- 無論是同步提交還是異步提交 offset肩刃,都有可能會造成數(shù)據(jù)的漏消費或者重復(fù)消費祟霍。先提交 offset 后消費,有可能造成數(shù)據(jù)的漏消費盈包;而先消費后提交 offset沸呐,有可能會造成數(shù)據(jù)的重復(fù)消費。
- kafka 0.9 版本之前呢燥,offset 存儲在 zookeeper中崭添;0.9 版本及之后,默認將 offset 存儲在 kafka的一個內(nèi)置的 topic 中叛氨。除此之外呼渣, Kafka 還可以選擇自定義存儲 offset(需要借助
ConsumerRebalanceListener
接口)。offset 的維護相當(dāng)繁瑣寞埠, 因為需要考慮到消費者的 Rebalance屁置。(當(dāng)有新的消費者加入消費者組、已有的消費者推出消費者組或者所訂閱的主題的分區(qū)發(fā)生變化仁连,就會觸發(fā)到分區(qū)的重新分配蓝角,重新分配的過程叫做 Rebalance。)
- 消費者發(fā)生 Rebalance 之后饭冬,每個消費者消費的分區(qū)就會發(fā)生變化使鹅。因此消費者要首先獲取到自己被重新分配到的分區(qū),并且定位到每個分區(qū)最近提交的 offset 位置繼續(xù)消費昌抠。
- CustomSaveOffset.java
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.util.*;
public class CustomSaveOffset {
private static Map<TopicPartition, Long> currentOffset = new HashMap<>();
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.211.147:9092,192.168.211.141:9092,192.168.211.156:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "zhangsan");
//關(guān)閉自動提交offset
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");//默認值是 latest
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("bigdata3"),
new ConsumerRebalanceListener() {
// 該方法會在 Rebalance 之前調(diào)用
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
commitOffset(currentOffset);
}
// 該方法會在 Rebalance 之后調(diào)用
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
currentOffset.clear();
for (TopicPartition partition : partitions) {
//定位到最近提交的 offset 位置繼續(xù)消費
consumer.seek(partition, getOffset(partition));
}
}
});
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100); //100ms
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d并徘,key = %s,value = %s\n", record.offset(), record.key(), record.value());
}
//異步提交
commitOffset(currentOffset);
}
//關(guān)閉連接
//consumer.close();
}
// 獲取某分區(qū)的最新 offset
private static long getOffset(TopicPartition partition) {
return 0;
}
// 提交該消費者所有分區(qū)的 offset(可將offset存入MySQL數(shù)據(jù)庫中)
private static void commitOffset(Map<TopicPartition, Long> currentOffset) {
}
}
- 攔截器原理:Producer 攔截器(
interceptor
)是在 kafka 0.10 版本被引入的扰魂,主要用于實現(xiàn) clients 端的定制化控制邏輯。對于 producer 而言蕴茴, interceptor 使得用戶在消息發(fā)送前以及 producer 回調(diào)邏輯前有機會對消息做一些定制化需求劝评,比如修改消息等。同時倦淀,producer 允許用戶指定多個 interceptor按序作用于同一條消息從而形成一個攔截鏈(interceptor chain
)蒋畜。Intercetpor 的實現(xiàn)接口是org.apache.kafka.clients.producer.ProducerInterceptor
,其定義的方法包括:
-
configure(configs)
:獲取配置信息和初始化數(shù)據(jù)時調(diào)用撞叽。
-
onSend(ProducerRecord)
:該方法封裝進 KafkaProducer.send 方法中姻成,即它運行在用戶主線程中插龄。 Producer 確保在消息被序列化以及計算分區(qū)前調(diào)用該方法。 用戶可以在該方法中對消息做任何操作科展,但最好保證不要修改消息所屬的 topic 和分區(qū)
均牢, 否則會影響目標(biāo)分區(qū)的計算。
-
onAcknowledgement(RecordMetadata, Exception)
:該方法會在消息從 RecordAccumulator 成功發(fā)送到 Kafka Broker 之后才睹,或者在發(fā)送過程中失敗時調(diào)用徘跪,并且通常在 producer 回調(diào)邏輯觸發(fā)之前。 onAcknowledgement 運行在producer 的 IO 線程中琅攘,因此不要在該方法中放入很多的邏輯垮庐,否則會拖慢 producer 的消息發(fā)送效率。
-
close()
:關(guān)閉 interceptor坞琴,主要用于執(zhí)行一些資源清理工作哨查。
- 如上所述:interceptor 可能被運行在多個線程中,因此在具體實現(xiàn)時用戶需要自行確保線程安全剧辐。另外寒亥,倘若指定了多個 interceptor,則 producer 將按照指定順序調(diào)用它們浙于,并僅僅是捕獲每個 interceptor 可能拋出的異常記錄到錯誤日志中而非在向上傳遞护盈。
- kafka應(yīng)用實例:實現(xiàn)一個簡單的雙 interceptor 組成的攔截鏈。
- 需求:第一個 interceptor 會在消息發(fā)送前將時間戳信息加到消息 value 的最前部羞酗;第二個 interceptor 會在消息發(fā)送后更新成功發(fā)送消息數(shù)或失敗發(fā)送消息數(shù)腐宋。
- TimeInterceptor.java
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Map;
//時間戳攔截器
public class TimeInterceptor implements ProducerInterceptor<String, String> {
@Override
public void configure(Map<String, ?> configs) {
}
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
//創(chuàng)建一個新的 record,把時間戳寫入消息體的最前部
return new ProducerRecord<>(record.topic(), record.partition(), record.timestamp(), record.key(),
"TimeInterceptor: " + System.currentTimeMillis() + "," + record.value());
}
@Override
public void close() {
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
}
}
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Map;
//統(tǒng)計發(fā)送消息成功和發(fā)送失敗消息數(shù),并在 producer 關(guān)閉時打印這兩個計數(shù)器
public class CounterInterceptor implements ProducerInterceptor<String, String> {
private int errorCounter = 0;
private int successCounter = 0;
@Override
public void configure(Map<String, ?> configs) {
}
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
return record;
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
//統(tǒng)計成功和失敗的次數(shù)
if (exception == null) {
successCounter++;
} else {
errorCounter++;
}
}
@Override
public void close() {
//保存結(jié)果
System.out.println("Successful sent: " + successCounter);
System.out.println("Failed sent: " + errorCounter);
}
}
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
public class InterceptorProducer {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.211.147:9092,192.168.211.141:9092,192.168.211.156:9092");
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, 1);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
//構(gòu)建攔截鏈
List<String> interceptors = new ArrayList<>();
interceptors.add("com.zzw.interceptor.TimeInterceptor");
interceptors.add("com.zzw.interceptor.CounterInterceptor");
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 10; i++) {
ProducerRecord<String, String> record = new ProducerRecord<>("bigdata3", "zhangsan", "message-" + i);
producer.send(record);
}
//一定要關(guān)閉 producer柜候,這樣才會調(diào)用 interceptor 的 close 方法
producer.close();
}
}
tar -zxvf kafka-eagle-bin-2.0.3.tar.gz
cd kafka-eagle-bin-2.0.3/
tar -zxvf kafka-eagle-web-2.0.3-bin.tar.gz -C /opt/eagle
-----------------------------------------------------------------
sudo vim /etc/profile
-----------------------------------------------------------------
export JAVA_HOME=/opt/jdk1.8
export KE_HOME=/opt/eagle
export PATH=$PATH:$KE_HOME/bin:$JAVA_HOME/bin
-----------------------------------------------------------------
source /etc/profile
- 進入kafka-eagle的conf目錄下修改配置文件
system-config.properties
:
######################################
# multi zookeeper & kafka cluster list (zookeeper地址)
######################################
kafka.eagle.zk.cluster.alias=cluster1
cluster1.zk.list=192.168.211.147:2181,192.168.211.147:2182,192.168.211.147:2183
######################################
# zookeeper enable acl
######################################
cluster1.zk.acl.enable=false
cluster1.zk.acl.schema=digest
cluster1.zk.acl.username=test
cluster1.zk.acl.password=test123
######################################
# broker size online list
######################################
cluster1.kafka.eagle.broker.size=20
######################################
# zk client thread limit (zk線程數(shù))
######################################
kafka.zk.limit.size=25
######################################
# kafka eagle webui port (kafka eagle的端口)
######################################
kafka.eagle.webui.port=8048
######################################
# kafka jmx acl and ssl authenticate
######################################
cluster1.kafka.eagle.jmx.acl=false
cluster1.kafka.eagle.jmx.user=keadmin
cluster1.kafka.eagle.jmx.password=keadmin123
cluster1.kafka.eagle.jmx.ssl=false
cluster1.kafka.eagle.jmx.truststore.location=/Users/dengjie/workspace/ssl/certificates/kafka.truststore
cluster1.kafka.eagle.jmx.truststore.password=ke123456
######################################
# kafka offset storage (kafka offset保存的位置)
######################################
cluster1.kafka.eagle.offset.storage=kafka
######################################
# kafka metrics, 15 days by default (開啟圖表)
######################################
kafka.eagle.metrics.charts=true
kafka.eagle.metrics.retain=15
######################################
# kafka sql topic records max
######################################
kafka.eagle.sql.topic.records.max=5000
######################################
# delete kafka topic token
######################################
kafka.eagle.topic.token=keadmin
######################################
# kafka sasl authenticate
######################################
cluster1.kafka.eagle.sasl.enable=false
cluster1.kafka.eagle.sasl.protocol=SASL_PLAINTEXT
cluster1.kafka.eagle.sasl.mechanism=SCRAM-SHA-256
cluster1.kafka.eagle.sasl.jaas.co![20210109181652.png](https://upload-images.jianshu.io/upload_images/7810329-5a7c09b3311a2647.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
nfig=org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka" password="kafka-eagle";
cluster1.kafka.eagle.sasl.client.id=
cluster1.kafka.eagle.blacklist.topics=
cluster1.kafka.eagle.sasl.cgroup.enable=false
cluster1.kafka.eagle.sasl.cgroup.topics=
######################################
# kafka mysql jdbc driver address (不用預(yù)先創(chuàng)建數(shù)據(jù)庫)
######################################
kafka.eagle.driver=com.mysql.jdbc.Driver
kafka.eagle.url=jdbc:mysql://127.0.0.1:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
kafka.eagle.username=root
kafka.eagle.password=123456
- 打開Kafka broker JMX:Kafka Eagle獲取監(jiān)控數(shù)據(jù)是通過JMX(Java Managent ExtenSion)來實現(xiàn)的,所以需要打開Kafka broker 的 JMX怜奖,可以直接修改kafka-server-start.sh文件:
vi $KAFKA_HOME/bin/kafka-server-start.sh
:
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
fi
---------------------------------------------------------------------------
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
#export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"
export JMX_PORT="9999"
fi
- 接下來浑测,按順序進行以下操作:
- 運行zookeeper集群;
- 運行kafka集群之前歪玲,需要設(shè)置
JMX_PORT
迁央,否則Kafka Eagle 后臺提示連接失斨澜场;
- 啟動Kafka Eagle服務(wù)器岖圈;
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
nohup bin/kafka-server-start.sh config/server.properties 1>/dev/null 2>&1 &
./ke.sh start