Kafka相關面試題


title: Kafka常見問題
date: 2020-04-01 16:25:49
update: 2020-04-01 20:31:30
excerpt: Kafka 面試中常見問題
toc_min_depth: 3
tags:

  • Kafka
  • 大數(shù)據(jù)框架
    categories:
  • [Kafka]
  • [大數(shù)據(jù)框架]

kafka

kafka的定義

Kafka是一個分布式的基于發(fā)布/訂閱模式的消息隊列(Message Queue)压彭,主要應用于大數(shù)據(jù)實時處理領域血淌。

消息隊列有什么好處

1)解耦
允許你獨立的擴展或修改兩邊的處理過程沦补,只要確保它們遵守同樣的接口約束。
2)可恢復性
系統(tǒng)的一部分組件失效時赠叼,不會影響到整個系統(tǒng)涧郊。消息隊列降低了進程間的耦合度,所以即使一個處理消息的進程掛掉,加入隊列中的消息仍然可以在系統(tǒng)恢復后被處理瑟俭。
3)緩沖
有助于控制和優(yōu)化數(shù)據(jù)流經(jīng)過系統(tǒng)的速度逗扒,解決生產消息和消費消息的處理速度不一致的情況叉袍。
4)靈活性 & 峰值處理能力
在訪問量劇增的情況下,應用仍然需要繼續(xù)發(fā)揮作用骏掀,但是這樣的突發(fā)流量并不常見。如果為以能處理這類峰值訪問為標準來投入資源隨時待命無疑是巨大的浪費妹笆。使用消息隊列能夠使關鍵組件頂住突發(fā)的訪問壓力海渊,而不會因為突發(fā)的超負荷的請求而完全崩潰缺狠。
5)異步通信
很多時候薇正,用戶不想也不需要立即處理消息肥哎。消息隊列提供了異步處理機制篡诽,允許用戶把一個消息放入隊列崖飘,但并不立即處理它。想向隊列中放入多少消息就放多少杈女,然后在需要的時候再去處理它們芝囤。

消費隊列的兩種模式

(1)點對點模式(一對一改淑,消費者主動拉取數(shù)據(jù),消息收到后消息清除)
消息生產者生產消息發(fā)送到Queue中,然后消息消費者從Queue中取出并且消費消息澄惊。
消息被消費以后,queue中不再有存儲蔬将,所以消息消費者不可能消費到已經(jīng)被消費的消息坪稽。Queue支持存在多個消費者颁井,但是對一個消息而言厅贪,只會有一個消費者可以消費。
(2)發(fā)布/訂閱模式(一對多雅宾,消費者消費數(shù)據(jù)之后不會清除消息)
消息生產者(發(fā)布)將消息發(fā)布到topic中养涮,同時有多個消息消費者(訂閱)消費該消息。和點對點方式不同,發(fā)布到topic的消息會被所有訂閱者消費贯吓。

kafka中的相關概念

1)Producer :消息生產者懈凹,就是向kafka broker發(fā)消息的客戶端;
2)Consumer :消息消費者悄谐,向kafka broker取消息的客戶端介评;
3)Consumer Group (CG):消費者組,由多個consumer組成爬舰。消費者組內每個消費者負責消費不同分區(qū)的數(shù)據(jù)们陆,一個分區(qū)只能由一個組內消費者消費;消費者組之間互不影響情屹。所有的消費者都屬于某個消費者組坪仇,即消費者組是邏輯上的一個訂閱者。
4)Broker :一臺kafka服務器就是一個broker垃你。一個集群由多個broker組成椅文。一個broker可以容納多個topic。
5)Topic :可以理解為一個隊列惜颇,生產者和消費者面向的都是一個topic雾袱;
6)Partition:為了實現(xiàn)擴展性,一個非常大的topic可以分布到多個broker(即服務器)上官还,一個topic可以分為多個partition芹橡,每個partition是一個有序的隊列;
7)Replica:副本望伦,為保證集群中的某個節(jié)點發(fā)生故障時林说,該節(jié)點上的partition數(shù)據(jù)不丟失,且kafka仍然能夠繼續(xù)工作屯伞,kafka提供了副本機制腿箩,一個topic的每個分區(qū)都有若干個副本,一個leader和若干個follower劣摇。
8)leader:每個分區(qū)多個副本的“主”珠移,生產者發(fā)送數(shù)據(jù)的對象,以及消費者消費數(shù)據(jù)的對象都是leader末融。
9)follower:每個分區(qū)多個副本中的“從”钧惧,實時從leader中同步數(shù)據(jù),保持和leader數(shù)據(jù)的同步勾习。leader發(fā)生故障時浓瞪,某個follower會成為新的leader。

kafka配置文件

位置

[atguigu@hadoop102 kafka]$ cd config/
[atguigu@hadoop102 config]$ vi server.properties

內容

#broker的全局唯一編號巧婶,不能重復
broker.id=0
#刪除topic功能使能
delete.topic.enable=true
#處理網(wǎng)絡請求的線程數(shù)量
num.network.threads=3
#用來處理磁盤IO的線程數(shù)量
num.io.threads=8
#發(fā)送套接字的緩沖區(qū)大小
socket.send.buffer.bytes=102400
#接收套接字的緩沖區(qū)大小
socket.receive.buffer.bytes=102400
#請求套接字的緩沖區(qū)大小
socket.request.max.bytes=104857600
#kafka運行日志存放的路徑
log.dirs=/opt/module/kafka/logs
#topic在當前broker上的分區(qū)個數(shù)
num.partitions=1
#用來恢復和清理data下數(shù)據(jù)的線程數(shù)量
num.recovery.threads.per.data.dir=1
#segment文件保留的最長時間乾颁,超時將被刪除
log.retention.hours=168
#配置連接Zookeeper集群地址
zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181

kafka分布式的broker.id配置

修改配置文件/opt/module/kafka/config/server.properties中的broker.id=1涂乌、broker.id=2
注:broker.id不得重復

kafka的群起腳本

for i in hadoop102 hadoop103 hadoop104
do
echo "========== $i ==========" 
ssh $i '/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties'
done

kafka的命令行操作命令

啟動
[atguigu@hadoop102 kafka]$ bin/kafka-server-start.sh -daemon config/server.properties

查看當前服務器中的所有topic
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --list

創(chuàng)建topic
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --create --replication-factor 3 --partitions 1 --topic first
選項說明:
--topic 定義topic名
--replication-factor  定義副本數(shù)
--partitions  定義分區(qū)數(shù)


刪除topic
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --delete --topic first
需要server.properties中設置delete.topic.enable=true否則只是標記刪除。

發(fā)送消息
[atguigu@hadoop102 kafka]$ bin/kafka-console-producer.sh --broker-list hadoop102:9092 --topic first
>hello world
>atguigu  atguigu


消費消息
[atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh \
--zookeeper hadoop102:2181 --topic first
[atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh \
--zookeeper hadoop102:2181 --topic first --consumer.config config/consumer.properties 指定消費者的配置文件(可將多個消費者放置在一個組內)
[atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh \
--bootstrap-server hadoop102:9092 --topic first
[atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh \
--bootstrap-server hadoop102:9092 --from-beginning --topic first
注 : --from-beginning:會把主題中以往所有的數(shù)據(jù)都讀取出來英岭。

查看某個Topic的詳情
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --describe --topic first

修改分區(qū)數(shù)
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --alter --topic first --partitions 6

kafka工作流程

Kafka中消息是以topic進行分類的湾盒,生產者生產消息,消費者消費消息诅妹,都是面向topic的历涝。

topic是邏輯上的概念,而partition是物理上的概念漾唉,每個partition對應于一個log文件荧库,該log文件中存儲的就是producer生產的數(shù)據(jù)。Producer生產的數(shù)據(jù)會被不斷追加到該log文件末端赵刑,且每條數(shù)據(jù)都有自己的offset分衫。消費者組中的每個消費者,都會實時記錄自己消費到了哪個offset般此,以便出錯恢復時蚪战,從上次的位置繼續(xù)消費。

由于生產者生產的消息會不斷追加到log文件末尾铐懊,為防止log文件過大導致數(shù)據(jù)定位效率低下邀桑,Kafka采取了分片和索引機制,將每個partition分為多個segment科乎。每個segment對應兩個文件——“.index”文件和“.log”文件壁畸。這些文件位于一個文件夾下,該文件夾的命名規(guī)則為:topic名稱+分區(qū)序號茅茂。例如捏萍,first這個topic有三個分區(qū),則其對應的文件夾為first-0,first-1,first-2空闲。
如下
00000000000000000000.index
00000000000000000000.log
00000000000000170410.index
00000000000000170410.log
00000000000000239430.index
00000000000000239430.log

index和log文件以當前segment的第一條消息的offset命名令杈。

“.index”文件存儲大量的索引信息,“.log”文件存儲大量的數(shù)據(jù)碴倾,索引文件中的元數(shù)據(jù)指向對應數(shù)據(jù)文件中message的物理偏移地址逗噩。

kafka生產者的分區(qū)分配策略

1)分區(qū)的原因
(1)方便在集群中擴展,每個Partition可以通過調整以適應它所在的機器跌榔,而一個topic又可以有多個Partition組成异雁,因此整個集群就可以適應任意大小的數(shù)據(jù)了;
(2)可以提高并發(fā)矫户,因為可以以Partition為單位讀寫了片迅。

2)分區(qū)的原則
我們需要將producer發(fā)送的數(shù)據(jù)封裝成一個ProducerRecord對象残邀。
(1)指明 partition 的情況下皆辽,直接將指明的值直接作為 partiton 值柑蛇;
(2)沒有指明 partition 值但有 key 的情況下,將 key 的 hash 值與 topic 的 partition 數(shù)進行取余得到 partition 值驱闷;
(3)既沒有 partition 值又沒有 key 值的情況下耻台,第一次調用時隨機生成一個整數(shù)(后面每次調用在這個整數(shù)上自增),將這個值與 topic 可用的 partition 總數(shù)取余得到 partition 值空另,也就是常說的 round-robin 算法盆耽。

kafka如何保證數(shù)據(jù)可靠性

為保證producer發(fā)送的數(shù)據(jù)能可靠的發(fā)送到指定的topic,
topic的每個partition收到producer發(fā)送的數(shù)據(jù)后扼菠,都需要向producer發(fā)送ack(acknowledgement確認收到)摄杂,
如果producer收到ack,就會進行下一輪的發(fā)送循榆,否則重新發(fā)送數(shù)據(jù)析恢。

都有哪些副本數(shù)據(jù)同步策略 優(yōu)缺點是什么

方案 優(yōu)點 缺點
半數(shù)以上完成同步,就發(fā)送ack 延遲低 選舉新的leader時秧饮,容忍n臺節(jié)點的故障映挂,需要2n+1個副本
全部完成同步,才發(fā)送ack 選舉新的leader時盗尸,容忍n臺節(jié)點的故障柑船,需要n+1個副本 延遲高

kafka的副本同步策略是什么 這個策略會出現(xiàn)什么問題

Kafka選擇了第二種方案靡馁,原因如下:
1.同樣為了容忍n臺節(jié)點的故障陕见,第一種方案需要2n+1個副本瞒御,而第二種方案只需要n+1個副本鹃栽,而Kafka的每個分區(qū)都有大量的數(shù)據(jù)损晤,第一種方案會造成大量數(shù)據(jù)的冗余旺上。
2.雖然第二種方案的網(wǎng)絡延遲會比較高乖酬,但網(wǎng)絡延遲對Kafka的影響較小于未。

采用第二種方案之后弱贼,設想以下情景:leader收到數(shù)據(jù)蒸苇,所有follower都開始同步數(shù)據(jù),但有一個follower吮旅,因為某種故障溪烤,遲遲不能與leader進行同步,那leader就要一直等下去庇勃,直到它完成同步檬嘀,才能發(fā)送ack。這個問題怎么解決呢责嚷?

kafka中的ISR是什么

Leader維護了一個動態(tài)的in-sync replica set (ISR)鸳兽,意為和leader保持同步的follower集合。

當ISR中的follower完成數(shù)據(jù)的同步之后罕拂,leader就會給producer發(fā)送ack揍异。

如果follower長時間未向leader同步數(shù)據(jù)全陨,則該follower將被踢出ISR,

該時間閾值由replica.lag.time.max.ms參數(shù)設定衷掷。

Leader發(fā)生故障之后辱姨,就會從ISR中選舉新的leader。

kafka中的ack應答機制是什么

對于某些不太重要的數(shù)據(jù)戚嗅,對數(shù)據(jù)的可靠性要求不是很高雨涛,能夠容忍數(shù)據(jù)的少量丟失,所以沒必要等ISR中的follower全部接收成功懦胞。
所以Kafka為用戶提供了三種可靠性級別替久,用戶根據(jù)對可靠性和延遲的要求進行權衡,選擇以下的配置躏尉。
acks參數(shù)配置:
acks:
0:producer不等待broker的ack侣肄,這一操作提供了一個最低的延遲,broker一接收到還沒有寫入磁盤就已經(jīng)返回醇份,當broker故障時有可能丟失數(shù)據(jù)稼锅;
1:producer等待broker的ack,partition的leader落盤成功后返回ack僚纷,如果在follower同步成功之前l(fā)eader故障矩距,那么將會丟失數(shù)據(jù);
-1(all):producer等待broker的ack怖竭,partition的leader和follower全部落盤成功后才返回ack锥债。但是如果在follower同步完成后,broker發(fā)送ack之前痊臭,leader發(fā)生故障哮肚,那么會造成數(shù)據(jù)重復。

kafka如何進行故障處理

LEO:指的是每個副本最大的offset广匙;
HW:指的是消費者能見到的最大的offset允趟,ISR隊列中最小的LEO。
(1)follower故障
follower發(fā)生故障后會被臨時踢出ISR鸦致,待該follower恢復后潮剪,follower會讀取本地磁盤記錄的上次的HW,并將log文件高于HW的部分截取掉分唾,從HW開始向leader進行同步抗碰。等該follower的LEO大于等于該Partition的HW,即follower追上leader之后绽乔,就可以重新加入ISR了弧蝇。
(2)leader故障
leader發(fā)生故障之后,會從ISR中選出一個新的leader,之后看疗,為保證多個副本之間的數(shù)據(jù)一致性沙峻,其余的follower會先將各自的log文件高于HW的部分截掉,然后從新的leader同步數(shù)據(jù)鹃觉。
注意:這只能保證副本之間的數(shù)據(jù)一致性专酗,并不能保證數(shù)據(jù)不丟失或者不重復睹逃。

kafka消費者的消費方式

consumer采用pull(拉)模式從broker中讀取數(shù)據(jù)盗扇。
push(推)模式很難適應消費速率不同的消費者,因為消息發(fā)送速率是由broker決定的沉填。它的目標是盡可能以最快速度傳遞消息疗隶,但是這樣很容易造成consumer來不及處理消息,典型的表現(xiàn)就是拒絕服務以及網(wǎng)絡擁塞翼闹。而pull模式則可以根據(jù)consumer的消費能力以適當?shù)乃俾氏M消息斑鼻。
pull模式不足之處是,如果kafka沒有數(shù)據(jù)猎荠,消費者可能會陷入循環(huán)中坚弱,一直返回空數(shù)據(jù)。針對這一點关摇,Kafka的消費者在消費數(shù)據(jù)時會傳入一個時長參數(shù)timeout荒叶,如果當前沒有數(shù)據(jù)可供消費,consumer會等待一段時間之后再返回输虱,這段時長即為timeout些楣。

kafka消費者的分區(qū)分配策略

一個consumer group中有多個consumer,一個 topic有多個partition宪睹,所以必然會涉及到partition的分配問題愁茁,即確定那個partition由哪個consumer來消費。
Kafka有兩種分配策略亭病,一是RoundRobin鹅很,一是Range。

kafka消費者如何維護offset

由于consumer在消費過程中可能會出現(xiàn)斷電宕機等故障罪帖,consumer恢復后道宅,需要從故障前的位置的繼續(xù)消費,所以consumer需要實時記錄自己消費到了哪個offset胸蛛,以便故障恢復后繼續(xù)消費污茵。

Kafka 0.9版本之前,consumer默認將offset保存在Zookeeper中葬项,從0.9版本開始泞当,consumer默認將offset保存在Kafka一個內置的topic中,該topic為__consumer_offsets民珍。

1)修改配置文件consumer.properties
exclude.internal.topics=false
2)讀取offset
0.11.0.0之前版本:
bin/kafka-console-consumer.sh --topic __consumer_offsets --zookeeper hadoop102:2181 --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config config/consumer.properties --from-beginning
0.11.0.0之后版本(含):
bin/kafka-console-consumer.sh --topic __consumer_offsets --zookeeper hadoop102:2181 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config config/consumer.properties --from-beginning

kafka中的消費者組是什么

配置config/consumer.properties文件中的group.id
然后在啟動消費者時候使用同一個配置文件 就可以讓消費者在一個組內

同一個消費者組中的消費者襟士,同一時刻只能有一個消費者消費盗飒。

如果消費者組中的消費者多于當前的分區(qū)數(shù) 會有警告提醒
No broker partitions consumed by consumer thread ...

如果停止了所有的消費者 那么offset會維護在我們選擇的地方(zk中或者是本地) 
再次啟動消費者會根據(jù)選擇的GTP(group topic partition所維護的offset位置進行繼續(xù)消費)

下圖為zk中維護的信息
image

kafka為什么能夠高效讀寫數(shù)據(jù)

  • 分布式框架
  • 分區(qū)
  • 順序寫磁盤
    • Kafka的producer生產數(shù)據(jù),要寫入到log文件中陋桂,寫的過程是一直追加到文件末端逆趣,為順序寫。官網(wǎng)有數(shù)據(jù)表明嗜历,同樣的磁盤宣渗,順序寫能到600M/s,而隨機寫只有100K/s梨州。這與磁盤的機械機構有關痕囱,順序寫之所以快,是因為其省去了大量磁頭尋址的時間暴匠。
  • 零復制技術
    • image

kafka的零拷貝技術如何實現(xiàn)

kafka中的消費者在讀取服務端的數(shù)據(jù)時鞍恢,需要將服務端的磁盤文件通過網(wǎng)絡發(fā)送到消費者進程,網(wǎng)絡發(fā)送需要經(jīng)過幾種網(wǎng)絡節(jié)點每窖。如下圖所示:

image

傳統(tǒng)的讀取文件數(shù)據(jù)并發(fā)送到網(wǎng)絡的步驟如下:
(1)操作系統(tǒng)將數(shù)據(jù)從磁盤文件中讀取到內核空間的頁面緩存帮掉;
(2)應用程序將數(shù)據(jù)從內核空間讀入用戶空間緩沖區(qū);
(3)應用程序將讀到數(shù)據(jù)寫回內核空間并放入socket緩沖區(qū)窒典;
(4)操作系統(tǒng)將數(shù)據(jù)從socket緩沖區(qū)復制到網(wǎng)卡接口蟆炊,此時數(shù)據(jù)才能通過網(wǎng)絡發(fā)送。

通常情況下崇败,Kafka的消息會有多個訂閱者盅称,生產者發(fā)布的消息會被不同的消費者多次消費,為了優(yōu)化這個流程后室,Kafka使用了“零拷貝技術”缩膝,如下圖所示:

image

“零拷貝技術”只用將磁盤文件的數(shù)據(jù)復制到頁面緩存中一次,然后將數(shù)據(jù)從頁面緩存直接發(fā)送到網(wǎng)絡中(發(fā)送給不同的訂閱者時岸霹,都可以使用同一個頁面緩存)疾层,避免了重復復制操作。

如果有10個消費者贡避,傳統(tǒng)方式下痛黎,數(shù)據(jù)復制次數(shù)為4*10=40次,而使用“零拷貝技術”只需要1+10=11次刮吧,一次為從磁盤復制到頁面緩存湖饱,10次表示10個消費者各自讀取一次頁面緩存。


傳統(tǒng)的文件拷貝通常需要從用戶態(tài)去轉到核心態(tài)杀捻,經(jīng)過read buffer井厌,然后再返回到用戶態(tài)的應用層buffer,然后再從用戶態(tài)把數(shù)據(jù)拷貝到核心態(tài)的socket buffer,然后發(fā)送到網(wǎng)卡仅仆。

image

傳統(tǒng)的數(shù)據(jù)傳輸需要多次的用戶態(tài)和核心態(tài)之間的切換器赞,而且還要把數(shù)據(jù)復制多次,最終才打到網(wǎng)卡墓拜。

如果減少了用戶態(tài)與核心態(tài)之間的切換港柜,是不是就會更快了呢?

image

此時我們會發(fā)現(xiàn)用戶態(tài)“空空如也”咳榜。數(shù)據(jù)沒有來到用戶態(tài)夏醉,而是直接在核心態(tài)就進行了傳輸,但這樣依然還是有多次復制贿衍。首先數(shù)據(jù)被讀取到read buffer中授舟,然后發(fā)到socket buffer救恨,最后才發(fā)到網(wǎng)卡贸辈。雖然減少了用戶態(tài)和核心態(tài)的切換,但依然存在多次數(shù)據(jù)復制肠槽。

如果可以進一步減少數(shù)據(jù)復制的次數(shù)擎淤,甚至沒有數(shù)據(jù)復制是不是就會做到最快呢?

DMA

別急秸仙,這里我們先介紹一個新的武器:DMA嘴拢。

DMA,全稱叫Direct Memory Access寂纪,一種可讓某些硬件子系統(tǒng)去直接訪問系統(tǒng)主內存席吴,而不用依賴CPU的計算機系統(tǒng)的功能。聽著是不是很厲害捞蛋,跳過CPU孝冒,直接訪問主內存。傳統(tǒng)的內存訪問都需要通過CPU的調度來完成拟杉。如下圖:

image

而DMA庄涡,則可以繞過CPU,硬件自己去直接訪問系統(tǒng)主內存搬设。如下圖:

image

很多硬件都支持DMA穴店,這其中就包括網(wǎng)卡。

image

零拷貝

回到本文中的文件傳輸拿穴,有了DMA后泣洞,就可以實現(xiàn)絕對的零拷貝了,因為網(wǎng)卡是直接去訪問系統(tǒng)主內存的默色。如下圖:

image

Java的零拷貝實現(xiàn)

在Java中的零拷貝實現(xiàn)是在FileChannel中球凰,其中有個方法transferTo(position,fsize,src)。

傳統(tǒng)的文件傳輸是通過java.io.DataOutputStream,java.io.FileInputStream來實現(xiàn)的弟蚀,然后通過while循環(huán)來讀取input蚤霞,然后寫入到output中。

image

零拷貝則是通過java.nio.channels.FileChannel中的transferTo方法來實現(xiàn)的义钉。transferTo方法底層是基于操作系統(tǒng)的sendfile這個system call來實現(xiàn)的(不再需要拷貝到用戶態(tài)了)昧绣,sendfile負責把數(shù)據(jù)從某個fd(file descriptor)傳輸?shù)搅硪粋€fd。

sendfile:

image
image

Java的transferTo:

image

傳統(tǒng)方式與零拷貝性能對比

image

可以看出速度快出至少三倍多捶闸。Kafka在文件傳輸?shù)倪^程中正是使用了零拷貝技術對文件進行拷貝夜畴。建議以后多用FileChannel的transferTo吧。

總結

  • 傳統(tǒng)的文件傳輸有多次用戶態(tài)和內核態(tài)之間的切換删壮,而且文件在多個buffer之間要復制多次最終才被發(fā)送到網(wǎng)卡贪绘。
  • DMA是一種硬件直接訪問系統(tǒng)主內存的技術。
  • 多種硬件都已使用了DMA技術央碟,其中就包括網(wǎng)卡(NIC)碉碉。
  • DMA技術讓CPU得到解放,讓CPU可以不用一直守著來完成文件傳輸沃于。
  • 零拷貝技術減少了用戶態(tài)與內核態(tài)之間的切換它呀,讓拷貝次數(shù)降到最低,從而實現(xiàn)高性能洛勉。
  • Kafka使用零拷貝技術來進行文件的傳輸粘秆。

zk在kafka中的作用

Kafka集群中有一個broker會被選舉為Controller,負責管理集群broker的上下線收毫,所有topic的分區(qū)副本分配和leader選舉等工作攻走。
Controller的管理工作都是依賴于Zookeeper的。
人話:
每個broker都會在zk進行注冊
然后KafkaController會實時監(jiān)聽zk中的/brokers/ids下的節(jié)點情況[0,1,2]
如果broker0宕機 ids中的節(jié)點會實時變化為[1,2]
KafkaController會更新topic中的leader和isr隊列
KafkaController會獲取當前可用的isr并從中選出新的leader

kafka的消息發(fā)送流程是什么樣的

Kafka的Producer發(fā)送消息采用的是異步發(fā)送的方式此再。
在消息發(fā)送的過程中昔搂,涉及到了兩個線程——main線程和Sender線程,
以及一個線程共享變量——RecordAccumulator(這個里面有分區(qū))

main線程將消息發(fā)送給RecordAccumulator引润,
Sender線程不斷從RecordAccumulator中拉取消息發(fā)送到Kafka broker巩趁。

注意這里面 先走攔截器 再走序列化器 再走分區(qū)器
達到batch.size大小或者是linger.ms時間就發(fā)到RecordAccumulator中
sender線程去拉取
image
相關參數(shù):
batch.size:只有數(shù)據(jù)積累到batch.size之后,sender才會發(fā)送數(shù)據(jù)淳附。(默認16kb)
linger.ms:如果數(shù)據(jù)遲遲未達到batch.size议慰,sender等待linger.time之后就會發(fā)送數(shù)據(jù)。

如何使用kafka API 實現(xiàn)異步消息發(fā)送

準備知識

需要用到的類:

KafkaProducer:需要創(chuàng)建一個生產者對象奴曙,用來發(fā)送數(shù)據(jù)

ProducerConfig:獲取所需的一系列配置參數(shù)

ProducerRecord:每條數(shù)據(jù)都要封裝成一個ProducerRecord對象

幾個比較重要的配置項

//kafka集群别凹,broker-list
props.put("bootstrap.servers", "hadoop102:9092");

    props.put("acks", "all");

    //重試次數(shù)
    props.put("retries", 1); 

    //批次大小
    props.put("batch.size", 16384); 

    //等待時間
    props.put("linger.ms", 1); 

    //RecordAccumulator緩沖區(qū)大小
    props.put("buffer.memory", 33554432);

    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  • kafka集群位置
  • 批次大小
  • 批次等待時間
  • 重試次數(shù)
  • 緩沖區(qū)大小
  • 序列化器(org\apache\kafka\common\serialization\Serializer.java)
image
image
org\apache\kafka\clients\producer\ProducerConfig.java

public static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers";
public static final String METADATA_MAX_AGE_CONFIG = "metadata.max.age.ms";
public static final String BATCH_SIZE_CONFIG = "batch.size";
public static final String ACKS_CONFIG = "acks";
public static final String LINGER_MS_CONFIG = "linger.ms";
public static final String CLIENT_ID_CONFIG = "client.id";
public static final String SEND_BUFFER_CONFIG = "send.buffer.bytes";
public static final String RECEIVE_BUFFER_CONFIG = "receive.buffer.bytes";
public static final String MAX_REQUEST_SIZE_CONFIG = "max.request.size";
public static final String RECONNECT_BACKOFF_MS_CONFIG = "reconnect.backoff.ms";
public static final String RECONNECT_BACKOFF_MAX_MS_CONFIG = "reconnect.backoff.max.ms";
public static final String MAX_BLOCK_MS_CONFIG = "max.block.ms";
public static final String BUFFER_MEMORY_CONFIG = "buffer.memory";
public static final String RETRY_BACKOFF_MS_CONFIG = "retry.backoff.ms";
public static final String COMPRESSION_TYPE_CONFIG = "compression.type";
public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = "metrics.sample.window.ms";
public static final String METRICS_NUM_SAMPLES_CONFIG = "metrics.num.samples";
public static final String METRICS_RECORDING_LEVEL_CONFIG = "metrics.recording.level";
public static final String METRIC_REPORTER_CLASSES_CONFIG = "metric.reporters";
public static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION = "max.in.flight.requests.per.connection";
public static final String RETRIES_CONFIG = "retries";
public static final String KEY_SERIALIZER_CLASS_CONFIG = "key.serializer";

public static final String VALUE_SERIALIZER_CLASS_CONFIG = "value.serializer";

public static final String CONNECTIONS_MAX_IDLE_MS_CONFIG = "connections.max.idle.ms";
public static final String PARTITIONER_CLASS_CONFIG = "partitioner.class";
public static final String REQUEST_TIMEOUT_MS_CONFIG = "request.timeout.ms";
public static final String INTERCEPTOR_CLASSES_CONFIG = "interceptor.classes";
public static final String ENABLE_IDEMPOTENCE_CONFIG = "enable.idempotence";
public static final String TRANSACTION_TIMEOUT_CONFIG = "transaction.timeout.ms";
public static final String TRANSACTIONAL_ID_CONFIG = "transactional.id";
org\apache\kafka\clients\consumer\ConsumerConfig.java

public static final String GROUP_ID_CONFIG = "group.id";
public static final String MAX_POLL_RECORDS_CONFIG = "max.poll.records";
public static final String MAX_POLL_INTERVAL_MS_CONFIG = "max.poll.interval.ms";
public static final String SESSION_TIMEOUT_MS_CONFIG = "session.timeout.ms";
public static final String HEARTBEAT_INTERVAL_MS_CONFIG = "heartbeat.interval.ms";
public static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers";
public static final String ENABLE_AUTO_COMMIT_CONFIG = "enable.auto.commit";
public static final String AUTO_COMMIT_INTERVAL_MS_CONFIG = "auto.commit.interval.ms";
public static final String PARTITION_ASSIGNMENT_STRATEGY_CONFIG = "partition.assignment.strategy";
public static final String AUTO_OFFSET_RESET_CONFIG = "auto.offset.reset";
public static final String FETCH_MIN_BYTES_CONFIG = "fetch.min.bytes";
public static final String FETCH_MAX_BYTES_CONFIG = "fetch.max.bytes";
public static final int DEFAULT_FETCH_MAX_BYTES = 52428800;
public static final String FETCH_MAX_WAIT_MS_CONFIG = "fetch.max.wait.ms";
public static final String METADATA_MAX_AGE_CONFIG = "metadata.max.age.ms";
public static final String MAX_PARTITION_FETCH_BYTES_CONFIG = "max.partition.fetch.bytes";
public static final int DEFAULT_MAX_PARTITION_FETCH_BYTES = 1048576;
public static final String SEND_BUFFER_CONFIG = "send.buffer.bytes";
public static final String RECEIVE_BUFFER_CONFIG = "receive.buffer.bytes";
public static final String CLIENT_ID_CONFIG = "client.id";
public static final String RECONNECT_BACKOFF_MS_CONFIG = "reconnect.backoff.ms";
public static final String RECONNECT_BACKOFF_MAX_MS_CONFIG = "reconnect.backoff.max.ms";
public static final String RETRY_BACKOFF_MS_CONFIG = "retry.backoff.ms";
public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = "metrics.sample.window.ms";
public static final String METRICS_NUM_SAMPLES_CONFIG = "metrics.num.samples";
public static final String METRICS_RECORDING_LEVEL_CONFIG = "metrics.recording.level";
public static final String METRIC_REPORTER_CLASSES_CONFIG = "metric.reporters";
public static final String CHECK_CRCS_CONFIG = "check.crcs";
public static final String KEY_DESERIALIZER_CLASS_CONFIG = "key.deserializer";
public static final String VALUE_DESERIALIZER_CLASS_CONFIG = "value.deserializer";
public static final String CONNECTIONS_MAX_IDLE_MS_CONFIG = "connections.max.idle.ms";
public static final String REQUEST_TIMEOUT_MS_CONFIG = "request.timeout.ms";
public static final String INTERCEPTOR_CLASSES_CONFIG = "interceptor.classes";
public static final String EXCLUDE_INTERNAL_TOPICS_CONFIG = "exclude.internal.topics";
public static final boolean DEFAULT_EXCLUDE_INTERNAL_TOPICS = true;
public static final String ISOLATION_LEVEL_CONFIG = "isolation.level";
public static final String DEFAULT_ISOLATION_LEVEL;
org\apache\kafka\clients\CommonClientConfigs.java

public static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers";
public static final String METADATA_MAX_AGE_CONFIG = "metadata.max.age.ms";
public static final String SEND_BUFFER_CONFIG = "send.buffer.bytes";
public static final String RECEIVE_BUFFER_CONFIG = "receive.buffer.bytes";
public static final String CLIENT_ID_CONFIG = "client.id";
public static final String RECONNECT_BACKOFF_MS_CONFIG = "reconnect.backoff.ms";
public static final String RECONNECT_BACKOFF_MAX_MS_CONFIG = "reconnect.backoff.max.ms";
public static final String RETRY_BACKOFF_MS_CONFIG = "retry.backoff.ms";
public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = "metrics.sample.window.ms";
public static final String METRICS_NUM_SAMPLES_CONFIG = "metrics.num.samples";
public static final String METRICS_RECORDING_LEVEL_CONFIG = "metrics.recording.level";
public static final String METRIC_REPORTER_CLASSES_CONFIG = "metric.reporters";
public static final String SECURITY_PROTOCOL_CONFIG = "security.protocol";
public static final String DEFAULT_SECURITY_PROTOCOL = "PLAINTEXT";
public static final String CONNECTIONS_MAX_IDLE_MS_CONFIG = "connections.max.idle.ms";
public static final String REQUEST_TIMEOUT_MS_CONFIG = "request.timeout.ms";

不帶回調的API

image
package com.atguigu.kafka;

import org.apache.kafka.clients.producer.*;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class CustomProducer {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();

        // 整個配置中的key可以使用ProducerConfig中定義的常量
        //kafka集群,broker-list
        props.put("bootstrap.servers", "hadoop102:9092");

        props.put("acks", "all");

        //重試次數(shù)
        props.put("retries", 1); 

        //批次大小
        props.put("batch.size", 16384); 

        //等待時間
        props.put("linger.ms", 1); 

        //RecordAccumulator緩沖區(qū)大小
        props.put("buffer.memory", 33554432);

        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);

        for (int i = 0; i < 100; i++) {
            producer.send(new ProducerRecord<String, String>("first", Integer.toString(i), Integer.toString(i)));
            // 輪循 這個會用到分區(qū)器 
            producer.send(new ProducerRecord("second","value++>"+i));
            
            // 根據(jù)給的key進行hash 然后放在不同的分區(qū) 這個會使用到分區(qū)器
            producer.send(new ProducerRecord("second","key"+i,"value==>"+i));
            
            // 具體指定了分區(qū)號 就不再使用到key 這個不會用到分區(qū)器
            if(i<5){
                producer.send(new ProducerRecord("second","key"+i,"value**>"+i));
            }else{
                producer.send(new ProducerRecord("second","key"+i,"value^^>"+i));
            }
            
            
        }
               

        producer.close();
    }
}
// 分區(qū)器源碼解讀
org\apache\kafka\clients\producer\Partitioner.java

public interface Partitioner extends Configurable, Closeable {
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);
    public void close();
}
//唯一實現(xiàn)類
org\apache\kafka\clients\producer\internals\DefaultPartitioner.java
public class DefaultPartitioner implements Partitioner {
    // 傳進來的是topic key 還有序列化后的key value 序列化后的value
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        //如果key是空的 后面的邏輯用了自增然后對分區(qū)取余 其實就是輪循
        if (keyBytes == null) {
            int nextValue = nextValue(topic);
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() > 0) {
                int part = Utils.toPositive(nextValue) % availablePartitions.size();
                return availablePartitions.get(part).partition();
            } else {
                // no partitions are available, give a non-available partition
                return Utils.toPositive(nextValue) % numPartitions;
            }
        } else {
            // 如果key不是空的 將keyBytes傳進去然后做hash murmur2是一種哈希算法
            // hash the keyBytes to choose a partition
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }
}

帶回調的API

image
跟上面不同的就是在使用send方法時候 帶上一個回調函數(shù)
    // 回調方法:當前消息發(fā)出后 不管是消息成功發(fā)送還是發(fā)送失敗 都會執(zhí)行該回調方法
    // metadata 當前消息的元數(shù)據(jù)
    // metadata能拿到當前分區(qū)的各種數(shù)據(jù) 如下圖所示
    // 偏移量 分區(qū) 主題 時間戳 等等
    // exception 當消息發(fā)送失敗 會返回該異常
image
// org\apache\kafka\clients\producer\Callback.java
public interface Callback {
    public void onCompletion(RecordMetadata metadata, Exception exception);
}
// 這是一個接口 里面有一個方法

它有兩個實現(xiàn)類
image
// 示例 帶回調的API
package com.atguigu.kafka.producer;

import org.apache.kafka.clients.producer.*;

import java.util.Properties;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class MyCallBackProducer {
    public static void main(String[] args) throws Exception {
        //1. 創(chuàng)建配置對象
        Properties props  = new Properties();
        //kafka集群的位置
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");
        //ack級別
        props.put(ProducerConfig.ACKS_CONFIG,"all");
        //重試次數(shù)
        props.put(ProducerConfig.RETRIES_CONFIG,3);
        //批次大小
        props.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);
        //等待時間
        props.put(ProducerConfig.LINGER_MS_CONFIG,1);
        //緩沖區(qū)大小
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);
        //k v 序列化器
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");

        //2. 創(chuàng)建生產者對象
        KafkaProducer<String,String> producer = new KafkaProducer<String, String>(props);

        //3.生產數(shù)據(jù)
        for (int i = 0; i < 10000 ; i++) {
            producer.send(new ProducerRecord<>("second", "atguigu@@@@@" + i), new Callback() {
                /**
                 * 回調方法: 當前的消息發(fā)送出去以后洽糟,會執(zhí)行回調方法炉菲。
                 * @param metadata  當前消息的元數(shù)據(jù)信息堕战。
                 * @param exception 當發(fā)送失敗,會返回異常拍霜。
                 */
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception == null) {
                        //發(fā)送成功
                        System.out.println(metadata.topic() + " -- " + metadata.partition() + " -- " + metadata.offset());
                    }
                }
            });
        }


       // TimeUnit.MILLISECONDS.sleep(100);

        //關閉
        producer.close();
    }
}

kafka API中沒有寫producer.close()為什么讀不到數(shù)據(jù) 也沒有回調方法

這是因為異步發(fā)送消息的原因

main線程在發(fā)送完數(shù)據(jù)之后就結束了 這個時間小于了批次拉取設置的時間1ms 

sender線程去拉取數(shù)據(jù)的同時需要執(zhí)行main線程中的回調方法 
但是現(xiàn)在main線程已經(jīng)關閉 所以無法執(zhí)行回調方法

如果我們不寫close方法 而是讓main線程休眠100ms 這時sender就能在這個時間內拉取到數(shù)據(jù)并執(zhí)行回調方法

所以close方法肯定會等待sender線程拉取數(shù)據(jù)完成后再進行關閉
具體實現(xiàn)可以看close()方法的源碼 如下
// org\apache\kafka\clients\producer\KafkaProducer.java

    /**
     * Close this producer. This method blocks until all previously sent requests complete.
     * This method is equivalent to <code>close(Long.MAX_VALUE, TimeUnit.MILLISECONDS)</code>.
     * <p>
     * <strong>If close() is called from {@link Callback}, a warning message will be logged and close(0, TimeUnit.MILLISECONDS)
     * will be called instead. We do this because the sender thread would otherwise try to join itself and
     * block forever.</strong>
     * <p>
     *
     * @throws InterruptException If the thread is interrupted while blocked
     */
//關閉此生產者嘱丢。 此方法一直阻塞所有以前發(fā)送的請求完成。 此方法等效于close(Long.MAX_VALUE, TimeUnit.MILLISECONDS) 如果關閉()被從調用Callback 祠饺,警告消息將被記錄并關閉(0越驻,TimeUnit.MILLISECONDS)將被代替調用。 我們這樣做是因為發(fā)件人線程否則將嘗試加入自己和永遠阻塞道偷。
    @Override
    public void close() {
        close(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
    }

    /**
     * This method waits up to <code>timeout</code> for the producer to complete the sending of all incomplete requests.
     * <p>
     * If the producer is unable to complete all requests before the timeout expires, this method will fail
     * any unsent and unacknowledged records immediately.
     * <p>
     * If invoked from within a {@link Callback} this method will not block and will be equivalent to
     * <code>close(0, TimeUnit.MILLISECONDS)</code>. This is done since no further sending will happen while
     * blocking the I/O thread of the producer.
     *
     * @param timeout The maximum time to wait for producer to complete any pending requests. The value should be
     *                non-negative. Specifying a timeout of zero means do not wait for pending send requests to complete.
     * @param timeUnit The time unit for the <code>timeout</code>
     * @throws InterruptException If the thread is interrupted while blocked
     * @throws IllegalArgumentException If the <code>timeout</code> is negative.
     */
// 這種方法最多等待timeout的生產者完成所有未完成的請求的發(fā)送缀旁。
// 如果生產者是無法完成所有請求超時到期之前,此方法將立即失敗任何未發(fā)送和未確認的記錄勺鸦。
// 如果從內調用Callback此方法不會阻止和將等效于close(0, TimeUnit.MILLISECONDS) 這樣做是因為同時阻斷生產者的I/O線程沒有進一步的發(fā)送會發(fā)生
    @Override
    public void close(long timeout, TimeUnit timeUnit) {
        close(timeout, timeUnit, false);
    }

    private void close(long timeout, TimeUnit timeUnit, boolean swallowException) {
        if (timeout < 0)
            throw new IllegalArgumentException("The timeout cannot be negative.");

        log.info("Closing the Kafka producer with timeoutMillis = {} ms.", timeUnit.toMillis(timeout));
        // this will keep track of the first encountered exception
        AtomicReference<Throwable> firstException = new AtomicReference<>();
        boolean invokedFromCallback = Thread.currentThread() == this.ioThread;
        if (timeout > 0) {
            if (invokedFromCallback) {
                log.warn("Overriding close timeout {} ms to 0 ms in order to prevent useless blocking due to self-join. " +
                        "This means you have incorrectly invoked close with a non-zero timeout from the producer call-back.", timeout);
            } else {
                // Try to close gracefully.
                if (this.sender != null)
                    this.sender.initiateClose();
                if (this.ioThread != null) {
                    try {
                        this.ioThread.join(timeUnit.toMillis(timeout));
                    } catch (InterruptedException t) {
                        firstException.compareAndSet(null, t);
                        log.error("Interrupted while joining ioThread", t);
                    }
                }
            }
        }

        if (this.sender != null && this.ioThread != null && this.ioThread.isAlive()) {
            log.info("Proceeding to force close the producer since pending requests could not be completed " +
                    "within timeout {} ms.", timeout);
            this.sender.forceClose();
            // Only join the sender thread when not calling from callback.
            // 僅當不從回調調用時才加入發(fā)送者線程并巍。
            if (!invokedFromCallback) {
                try {
                    this.ioThread.join();
                } catch (InterruptedException e) {
                    firstException.compareAndSet(null, e);
                }
            }
        }

        ClientUtils.closeQuietly(interceptors, "producer interceptors", firstException);
        ClientUtils.closeQuietly(metrics, "producer metrics", firstException);
        ClientUtils.closeQuietly(keySerializer, "producer keySerializer", firstException);
        ClientUtils.closeQuietly(valueSerializer, "producer valueSerializer", firstException);
        ClientUtils.closeQuietly(partitioner, "producer partitioner", firstException);
        AppInfoParser.unregisterAppInfo(JMX_PREFIX, clientId);
        log.debug("The Kafka producer has closed.");
        if (firstException.get() != null && !swallowException)
            throw new KafkaException("Failed to close kafka producer", firstException.get());
    }

如何使用kafka API 實現(xiàn)同步消息發(fā)送

同步發(fā)送的意思就是,一條消息發(fā)送之后换途,會阻塞當前線程懊渡,直至返回ack。
由于send方法返回的是一個Future對象怀跛,根據(jù)Futrue對象的特點距贷,我們也可以實現(xiàn)同步發(fā)送的效果柄冲,只需在調用Future對象的get方法即可吻谋。
區(qū)別就在于在send方法處拿到返回值future
然后調用future中的get方法
調用此方法就會阻塞當前線程 一直等到結果返回
java\util\concurrent\Future.java
image
package com.atguigu.kafka.producer;

import org.apache.kafka.clients.producer.*;

import java.util.Properties;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class MyCallBackProducer {
    public static void main(String[] args) throws Exception {
        //1. 創(chuàng)建配置對象
        Properties props  = new Properties();
        //kafka集群的位置
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");
        //ack級別
        props.put(ProducerConfig.ACKS_CONFIG,"all");
        //重試次數(shù)
        props.put(ProducerConfig.RETRIES_CONFIG,3);
        //批次大小
        props.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);
        //等待時間
        props.put(ProducerConfig.LINGER_MS_CONFIG,1);
        //緩沖區(qū)大小
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);
        //k v 序列化器
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");

        //2. 創(chuàng)建生產者對象
        KafkaProducer<String,String> producer = new KafkaProducer<String, String>(props);

        //3.生產數(shù)據(jù)
        for (int i = 0; i < 10000 ; i++) {
            Future<RecordMetadata> future = producer.send(new ProducerRecord<>("second", "atguigu@@@@@" + i), new Callback() {
                /**
                 * 回調方法: 當前的消息發(fā)送出去以后,會執(zhí)行回調方法现横。
                 * @param metadata  當前消息的元數(shù)據(jù)信息漓拾。
                 * @param exception 當發(fā)送失敗,會返回異常戒祠。
                 */
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception == null) {
                        //發(fā)送成功
                        System.out.println(metadata.topic() + " -- " + metadata.partition() + " -- " + metadata.offset());
                    }
                }
            });
            // 發(fā)送一個之后阻塞線程等待返回結果才繼續(xù)發(fā)送下一個
            // 阻塞等待 骇两, 同步發(fā)送
            // 此時會發(fā)現(xiàn)結果嚴格按照發(fā)送的順序
            RecordMetadata recordMetadata = future.get();
        }


        //關閉
        producer.close();
    }
}

kafka的分區(qū)器怎么寫 如何自定義分區(qū)器

  • 繼承Partitioner

  • 重寫三個方法configure() partition() close()

可以根據(jù)傳進的key分區(qū) 也可根據(jù)value分區(qū)

在定義好自己的分區(qū)器之后 還要再配置中添加分區(qū)器的全類名 否則會走默認的分區(qū)器

系統(tǒng)默認分區(qū)器
public class DefaultPartitioner implements Partitioner {

    private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();

    public void configure(Map<String, ?> configs) {}

    /**
     * Compute the partition for the given record.
     *
     * @param topic The topic name
     * @param key The key to partition on (or null if no key)
     * @param keyBytes serialized key to partition on (or null if no key)
     * @param value The value to partition on or null
     * @param valueBytes serialized value to partition on or null
     * @param cluster The current cluster metadata
     */
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (keyBytes == null) {
            int nextValue = nextValue(topic);
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() > 0) {
                int part = Utils.toPositive(nextValue) % availablePartitions.size();
                return availablePartitions.get(part).partition();
            } else {
                // no partitions are available, give a non-available partition
                return Utils.toPositive(nextValue) % numPartitions;
            }
        } else {
            // hash the keyBytes to choose a partition
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }

    private int nextValue(String topic) {
        AtomicInteger counter = topicCounterMap.get(topic);
        if (null == counter) {
            counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
            AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
            if (currentCounter != null) {
                counter = currentCounter;
            }
        }
        return counter.getAndIncrement();
    }

    public void close() {}

}
// 簡單實現(xiàn)一個分區(qū)器
public class MyPartitioner implements Partitioner {
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        if (key == null) { // key為空 到0號分區(qū)
            return 0;
        } else { // key不為空 到1號分區(qū)
            return 1;
        }
    }

    public void close() {

    }

    public void configure(Map<String, ?> configs) {

    }
}
// 如果要使用自己定義的分區(qū)器 要在配置中指定分區(qū)器并傳入分區(qū)器的全類名
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.atguigu.kafka.partitioner.MyPartitioner");

kafka的消費者需要注意的主要問題是什么

Consumer消費數(shù)據(jù)時的可靠性是很容易保證的,因為數(shù)據(jù)在Kafka中是持久化的姜盈,故不用擔心數(shù)據(jù)丟失問題低千。

由于consumer在消費過程中可能會出現(xiàn)斷電宕機等故障,consumer恢復后馏颂,需要從故障前的位置的繼續(xù)消費示血,所以consumer需要實時記錄自己消費到了哪個offset,以便故障恢復后繼續(xù)消費救拉。

所以offset的維護是Consumer消費數(shù)據(jù)是必須考慮的問題难审。

如何使用kafka API 實現(xiàn)消息接收(消費者)

準備知識

需要用到的類:

KafkaConsumer:需要創(chuàng)建一個消費者對象,用來消費數(shù)據(jù)

ConsumerConfig:獲取所需的一系列配置參數(shù)

ConsuemrRecord:每條數(shù)據(jù)都要封裝成一個ConsumerRecord對象

為了使我們能夠專注于自己的業(yè)務邏輯亿絮,Kafka提供了自動提交offset的功能告喊。

自動提交offset的相關參數(shù):

enable.auto.commit:是否開啟自動提交offset功能

auto.commit.interval.ms:自動提交offset的時間間隔


幾個比較重要的配置項

  • 自動提交offset功能
  • 自動提交時間間隔
  • 消費者組
  • 反序列化器(對應生產者端的序列化org\apache\kafka\common\serialization\Deserializer.java)
image

自動提交offset

package fun.hoffee.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

/**
 * 消費者
 */
public class MyConsumer {
    public static void main(String[] args) {
        //1. 創(chuàng)建配置對象
        Properties props = new Properties();
        //指定kafka集群的位置
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");

        //開啟自動提交offset
        //props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);
        
        //自動提交offset的間隔
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1);

        //指定消費者組
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "atguigu");

        //指定kv的反序列化器
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        //2. 創(chuàng)建消費者對象
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

        //3. 訂閱主題
        consumer.subscribe(Arrays.asList("first", "second", "third"));

        //4. 消費數(shù)據(jù)
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);

            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.topic() + " -- " + record.partition() + " -- " + record.offset() + " -- " + record.key() + " -- " + record.value());
            }
        }
    }
}

// 此時創(chuàng)建的是新組 不能消費到之前的數(shù)據(jù)

// 如果想要消費之前的數(shù)據(jù) 需要重置offset
// 由auto.offset.rest參數(shù)(ConsumerConfig中的AUTO_OFFSET_RESET_CONFIG = "auto.offset.reset";)控制  默認值為latest
// 可以配置為 earliest | latest | none
---
// 文檔說明如下 :
// What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted): 
// earliest: automatically reset the offset to the earliest offset 
// latest: automatically reset the offset to the latest offset 
// none: throw exception to the consumer if no previous offset is found for the consumer's group 
// anything else: throw exception to the consumer.
// 當Kafka中沒有初始偏移量或服務器上不再存在當前偏移量時(例如麸拄,因為該數(shù)據(jù)已被刪除),該怎么辦:
// 最早:自動將偏移量重置為最早的偏移量 
// 最新:自動將偏移量重置為最新偏移量 
// 無:如果未找到消費者組的先前偏移量黔姜,則向消費者拋出異常 
// 其他:向消費者拋出異常

// 人話: 如果這個是一個新的組 或者是 這個組拿了一個kafka中不存在的偏移量去消費數(shù)據(jù)時候 kafka就會自動幫忙重置offset 如果配置過這個參數(shù) 就按這個參數(shù)配置的來 如果沒有配置過 默認重置為latest

重置offset

具體說明見上一節(jié)代碼末尾

package com.atguigu.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

/**
 * 消費者
 */
public class MyConsumer {
    public static void main(String[] args) {
        //1. 創(chuàng)建配置對象
        Properties props = new Properties();
        //指定kafka集群的位置
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");

        //開啟自動提交offset
        //props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);
        //關閉自動提交offset
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
        //自動提交offset的間隔
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1);

        //重置offset :   earliest(最早)   latest(最后)
        //滿足兩個條件: 
        // 1. 當前的消費者組在kafka沒有消費過所訂閱的主題   
        // 2.當前消費者組使用的offset在kafka集群中已經(jīng)被刪除
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");

        //指定消費者組
        props.put(ConsumerConfig.GROUP_ID_CONFIG,"atguigu111");

        //指定kv的反序列化器
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");

        //2. 創(chuàng)建消費者對象
        KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(props);

        //3. 訂閱主題
        consumer.subscribe(Arrays.asList("first","second","third"));

        //4. 消費數(shù)據(jù)
        while(true){
            // 此處是拉取數(shù)據(jù)方法 poll中傳遞的參數(shù)是超時時間 當主題中沒有數(shù)據(jù)時候 等待超時時間之后再進行拉取數(shù)據(jù)
            // 假如某一次沒有消費到數(shù)據(jù) 會等待響應的時間之后再進行拉取 單位是ms
            ConsumerRecords<String, String> records  = consumer.poll(1000);

            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.topic() + " -- " + record.partition() + " -- " + record.offset() +" -- " +
                    record.key() +" -- " + record.value());
            }
        }
    }
}

手動提交offset的兩種方式

雖然自動提交offset十分簡介便利拢切,但由于其是基于時間提交的,開發(fā)人員難以把握offset提交的時機秆吵。因此Kafka還提供了手動提交offset的API失球。

手動提交offset的方法有兩種:分別是commitSync(同步提交)和commitAsync(異步提交)。

兩者的相同點是帮毁,都會將本次poll的一批數(shù)據(jù)最高的偏移量提交实苞;

不同點是,commitSync阻塞當前線程烈疚,一直到提交成功黔牵,并且會自動失敗重試(由不可控因素導致,也會出現(xiàn)提交失斠巍)猾浦;而commitAsync則沒有失敗重試機制,故有可能提交失敗灯抛。


由于同步提交offset有失敗重試機制金赦,故更加可靠

雖然同步提交offset更可靠一些,但是由于其會阻塞當前線程对嚼,直到提交成功夹抗。因此吞吐量會收到很大的影響。因此更多的情況下纵竖,會選用異步提交offset的方式漠烧。
如果關閉了提交offset 在一直沒有關閉consumer的情況下 consumer能正常消費數(shù)據(jù) 
因為consumer從kafka中拿到offset后會一直將offset維護在內存中

但是一旦關閉 因為沒有向kafka提交過offset 則offset還是之前的
那么這段時間生產的數(shù)據(jù)將被重復消費
package com.atguigu.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

/**
 * 消費者
 */
public class MyConsumer {
    public static void main(String[] args) {
        //1. 創(chuàng)建配置對象
        Properties props = new Properties();
        //指定kafka集群的位置
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");

        //開啟自動提交offset
        //props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);
        //關閉自動提交offset
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
        //自動提交offset的間隔
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1);

        //重置offset :   earliest(最早)   latest(最后)
        //滿足兩個條件: 1. 當前的消費者組在kafka沒有消費過所訂閱的主題   2.當前消費者組使用的offset在kafka集群中已經(jīng)被刪除
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");

        //指定消費者組
        props.put(ConsumerConfig.GROUP_ID_CONFIG,"atguigu111");

        //指定kv的反序列化器
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");

        //2. 創(chuàng)建消費者對象
        KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(props);

        //3. 訂閱主題
        consumer.subscribe(Arrays.asList("first","second","third"));

        //4. 消費數(shù)據(jù)
        while(true){
            // 此處是拉取數(shù)據(jù)方法 poll中傳遞的參數(shù)是超時時間 當主題中沒有數(shù)據(jù)時候 等待超時時間之后再進行拉取數(shù)據(jù)
            // 假如某一次沒有消費到數(shù)據(jù) 會等待響應的時間之后再進行拉取 單位是ms
            ConsumerRecords<String, String> records  = consumer.poll(1000);

            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.topic() + " -- " + record.partition() + " -- " + record.offset() +" -- " +
                    record.key() +" -- " + record.value());
        }
            //手動提交offset
            //同步提交 代碼會阻塞 直到提交offset成功 才開始消費下一條數(shù)據(jù)
            consumer.commitSync();  //阻塞
            //異步提交 會觸發(fā)提交offset的操作 但是會繼續(xù)消費數(shù)據(jù) 不管offset是否提交成功
            //consumer.commitAsync();
        }
    }
}

kafka中重復消費數(shù)據(jù)和漏消費數(shù)據(jù)的情況

無論是同步提交還是異步提交offset,都有可能會造成數(shù)據(jù)的漏消費或者重復消費靡砌。

先提交offset后消費已脓,有可能造成數(shù)據(jù)的漏消費;

而先消費后提交offset通殃,有可能會造成數(shù)據(jù)的重復消費度液。
這是offset的提交  和 消費數(shù)據(jù) 這兩件事之間的先后順序問題

例1 : 
消費者poll進100條數(shù)據(jù) 但是在消費到第60條時候宕機 但是offset已經(jīng)提交 這時候 offset超前
則后40條出現(xiàn)漏消費

例2 :
消費者poll進100條數(shù)據(jù) 但是offset在提交時候失敗 但此時是先消費后提交offset的情況 這時候 offset滯后
則這100條數(shù)據(jù)在下次啟動時候會被重復消費
如何解決這個問題?

將兩件事情綁定在一起 如果失敗則同時失敗 如果成功則同時成功
不允許出現(xiàn)一個失敗一個成功的情況

將兩件事綁定為事務

kafka API 如何實現(xiàn)自定義存儲offset

Kafka 0.9版本之前,offset存儲在zookeeper画舌,0.9版本及之后堕担,默認將offset存儲在Kafka的一個內置的topic中。除此之外骗炉,Kafka還可以選擇自定義存儲offset照宝。

offset的維護是相當繁瑣的,因為需要考慮到消費者的Rebalace句葵。

當有新的消費者加入消費者組厕鹃、已有的消費者推出消費者組或者所訂閱的主題的分區(qū)發(fā)生變化兢仰,就會觸發(fā)到分區(qū)的重新分配,重新分配的過程叫做Rebalance剂碴。

消費者發(fā)生Rebalance之后把将,每個消費者消費的分區(qū)就會發(fā)生變化。因此消費者要首先獲取到自己被重新分配到的分區(qū)忆矛,并且定位到每個分區(qū)最近提交的offset位置繼續(xù)消費察蹲。

要實現(xiàn)自定義存儲offset,需要借助ConsumerRebalanceListener催训,以下為示例代碼洽议,其中提交和獲取offset的方法,需要根據(jù)所選的offset存儲系統(tǒng)自行實現(xiàn)漫拭。
package com.atguigu.kafka.consumer;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;

import java.util.*;

public class CustomConsumer {

    private static Map<TopicPartition, Long> currentOffset = new HashMap<>();

public static void main(String[] args) {

        // 創(chuàng)建配置信息
        Properties props = new Properties();

        // Kafka集群
        props.put("bootstrap.servers", "hadoop102:9092"); 

        // 消費者組亚兄,只要group.id相同,就屬于同一個消費者組
        props.put("group.id", "test"); 

        // 關閉自動提交offset
        props.put("enable.auto.commit", "false");

        // Key和Value的反序列化類
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // 創(chuàng)建一個消費者
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        // 消費者訂閱主題 在訂閱時候創(chuàng)建一個ConsumerRebalanceListener的對象實時監(jiān)聽
        // 并重寫兩個方法onPartitionsRevoked 和 onPartitionsAssigned
        consumer.subscribe(Arrays.asList("first"), new ConsumerRebalanceListener() {
            
            //該方法會在Rebalance之前調用
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                commitOffset(currentOffset);
            }

            //該方法會在Rebalance之后調用
            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                currentOffset.clear();
                for (TopicPartition partition : partitions) {
                    consumer.seek(partition, getOffset(partition));
                    //定位到最近提交的offset位置繼續(xù)消費
                }
            }
        });

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            //消費者拉取數(shù)據(jù)
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                currentOffset.put(new TopicPartition(record.topic(), record.partition()), record.offset());
            }
            commitOffset(currentOffset);//異步提交
        }
    }

    //獲取某分區(qū)的最新offset
    private static long getOffset(TopicPartition partition) {
        return 0;// 這里是偽代碼 需要根據(jù)具體存儲的系統(tǒng)來實現(xiàn)
    }

    //提交該消費者所有分區(qū)的offset
    private static void commitOffset(Map<TopicPartition, Long> currentOffset) {
    // 這里是偽代碼 需要根據(jù)具體存儲的系統(tǒng)來實現(xiàn)
    }
}

kafka中的攔截器是如何實現(xiàn)的 原理是什么

Producer攔截器(interceptor)是在Kafka 0.10版本被引入的采驻,主要用于實現(xiàn)clients端的定制化控制邏輯审胚。
對于producer而言,interceptor使得用戶在消息發(fā)送前以及producer回調邏輯前有機會對消息做一些定制化需求礼旅,比如修改消息等膳叨。同時,producer允許用戶指定多個interceptor按序作用于同一條消息從而形成一個攔截鏈(interceptor chain)痘系。Intercetpor的實現(xiàn)接口是org.apache.kafka.clients.producer.ProducerInterceptor菲嘴,其定義的方法包括:
(1)configure(configs)
獲取配置信息和初始化數(shù)據(jù)時調用。

(2)onSend(ProducerRecord):
該方法封裝進KafkaProducer.send方法中碎浇,即它運行在用戶主線程中临谱。Producer確保在消息被序列化以及計算分區(qū)前調用該方法。用戶可以在該方法中對消息做任何操作奴璃,但最好保證不要修改消息所屬的topic和分區(qū),否則會影響目標分區(qū)的計算城豁。

(3)onAcknowledgement(RecordMetadata, Exception):
該方法會在消息從RecordAccumulator成功發(fā)送到Kafka Broker之后苟穆,或者在發(fā)送過程中失敗時調用。并且通常都是在producer回調邏輯觸發(fā)之前唱星。onAcknowledgement運行在producer的IO線程中雳旅,因此不要在該方法中放入很重的邏輯,否則會拖慢producer的消息發(fā)送效率间聊。

(4)close:
關閉interceptor攒盈,主要用于執(zhí)行一些資源清理工作
如前所述,interceptor可能被運行在多個線程中哎榴,因此在具體實現(xiàn)時用戶需要自行確保線程安全型豁。另外倘若指定了多個interceptor僵蛛,則producer將按照指定順序調用它們,并僅僅是捕獲每個interceptor可能拋出的異常記錄到錯誤日志中而非在向上傳遞迎变。這在使用過程中要特別留意充尉。

請實現(xiàn)一個kafka的攔截器

需求:

實現(xiàn)一個簡單的雙interceptor組成的攔截鏈。第一個interceptor會在消息發(fā)送前將時間戳信息加到消息value的最前部衣形;第二個interceptor會在消息發(fā)送后更新成功發(fā)送消息數(shù)或失敗發(fā)送消息數(shù)驼侠。

分析:

image

時間攔截器

package fun.hoffee.kafka.interceptor;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Map;

/**
 * 在所有的消息內容前面加上時間戳
 */
public class TimeInterceptor implements ProducerInterceptor<String, String> {

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        //獲取當前消息的value
        String value = record.value();
        value = System.currentTimeMillis() + " -- " + value;

        //構造一個producerRecord
        ProducerRecord<String, String> resultRecord =
                new ProducerRecord<>(record.topic(), record.partition(), record.key(), value);
        return resultRecord;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {

    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}

計數(shù)攔截器

package fun.hoffee.kafka.interceptor;


import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Map;

/**
 * 統(tǒng)計發(fā)送成功或失敗的消息個數(shù)
 */
public class CountInterceptor implements ProducerInterceptor<String, String> {

    private Integer success = 0;

    private Integer fail = 0;

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        // 相當于原路返回沒有做處理
        return record;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        if (exception == null) {
            success++;
        } else {
            fail++;
        }
    }

    @Override
    public void close() {
        // 整個攔截器走完之后 調用該方法
        System.out.println("Success : " + success);
        System.out.println("Fail :" + fail);
    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}

在生產者的配置文件中配置攔截器(可設置多個 設置為一個list)

package fun.hoffee.kafka.interceptor;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class InterceptorProducer {
    public static void main(String[] args) {
        //1. 創(chuàng)建配置對象
        Properties props = new Properties();
        //指定kafka集群的位置,broker-list
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
        //指定ack的應答級別  0  1  -1(all)
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        //重試次數(shù)
        props.put(ProducerConfig.RETRIES_CONFIG, 5);
        //批次大小
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);  // 16kb
        //等待時間
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        //RecordAccumulator緩沖區(qū)大小
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);  // 32M

        //指定kv的序列化器
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        //指定攔截器
        // "A list of classes to use as interceptors. Implementing the <code>ProducerInterceptor</code> interface allows you to intercept (and possibly mutate) the records received by the producer before they are published to the Kafka cluster. By default, there are no interceptors.";
        List<String> interceptors = new ArrayList<>();
        interceptors.add("com.atguigu.kafka.interceptor.TimeInterceptor");
        interceptors.add("com.atguigu.kafka.interceptor.CountInterceptor");
        props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);

        //2. 創(chuàng)建生產者對象
        KafkaProducer producer = new KafkaProducer<String, String>(props);

        //3. 生產數(shù)據(jù)

        for (int i = 0; i < 10; i++) {
            producer.send(new ProducerRecord("second", "shangguigu==>" + i));
        }

        //4. 關閉
        producer.close();
    }
}

flume如何對接kafka

使用kafkasink

此時kafkasink相當于kafka的生產者 它可以根據(jù)消息的標記發(fā)送給kafka中不同的topic

flume官網(wǎng)關于kafka sink的介紹如下

這是一個Flume Sink實現(xiàn)谆吴,可以將數(shù)據(jù)發(fā)布到 Kafka主題倒源。目標之一是將Flume與Kafka集成在一起,以便基于拉式的處理系統(tǒng)可以處理來自各種Flume來源的數(shù)據(jù)句狼。目前相速,該版本支持Kafka 0.9.x系列發(fā)行版。

此版本的Flume不再支持Kafka的舊版本(0.8.x)鲜锚。

必需的屬性以粗體標記突诬。

Property Name Default Description
type Must be set to org.apache.flume.sink.kafka.KafkaSink
kafka.bootstrap.servers List of brokers Kafka-Sink will connect to, to get the list of topic partitions This can be a partial list of brokers, but we recommend at least two for HA. The format is comma separated list of hostname:port
Kafka-Sink將連接到的代理列表,以獲取主題分區(qū)列表芜繁。這可以是部分代理列表旺隙,但是對于HA,我們建議至少兩個骏令。格式是用逗號分隔的主機名:端口列表
kafka.topic default-flume-topic The topic in Kafka to which the messages will be published. If this parameter is configured, messages will be published to this topic. If the event header contains a “topic” field, the event will be published to that topic overriding the topic configured here.
Kafka中將發(fā)布消息的主題蔬捷。如果配置了此參數(shù),則消息將發(fā)布到該主題榔袋。如果事件標題包含“主題”字段周拐,則事件將發(fā)布到該主題,并覆蓋此處配置的主題凰兑。
flumeBatchSize 100 How many messages to process in one batch. Larger batches improve throughput while adding latency.
一批中要處理多少條消息妥粟。較大的批次可提高吞吐量,同時增加延遲吏够。
kafka.producer.acks 1 How many replicas must acknowledge a message before its considered successfully written. Accepted values are 0 (Never wait for acknowledgement), 1 (wait for leader only), -1 (wait for all replicas) Set this to -1 to avoid data loss in some cases of leader failure.
在成功考慮一條消息之前勾给,有多少個副本必須確認一條消息。接受的值為0(永遠不等待確認)锅知,1(僅等待領導者)播急,-1(等待所有副本)將其設置為-1,以避免在某些領導者失敗的情況下丟失數(shù)據(jù)售睹。
useFlumeEventFormat false By default events are put as bytes onto the Kafka topic directly from the event body. Set to true to store events as the Flume Avro binary format. Used in conjunction with the same property on the KafkaSource or with the parseAsFlumeEvent property on the Kafka Channel this will preserve any Flume headers for the producing side.
默認情況下桩警,事件直接從事件主體作為字節(jié)放入Kafka主題。設置為true可將事件存儲為Flume Avro二進制格式昌妹。與KafkaSource上的相同屬性或Kafka Channel上的parseAsFlumeEvent屬性結合使用捶枢,將為生產方保留任何Flume標頭握截。
defaultPartitionId Specifies a Kafka partition ID (integer) for all events in this channel to be sent to, unless overriden by partitionIdHeader. By default, if this property is not set, events will be distributed by the Kafka Producer’s partitioner - including by key if specified (or by a partitioner specified by kafka.partitioner.class).
partitionIdHeader When set, the sink will take the value of the field named using the value of this property from the event header and send the message to the specified partition of the topic. If the value represents an invalid partition, an EventDeliveryException will be thrown. If the header value is present then this setting overrides defaultPartitionId.
kafka.producer.security.protocol PLAINTEXT Set to SASL_PLAINTEXT, SASL_SSL or SSL if writing to Kafka using some level of security. See below for additional info on secure setup.
more producer security props If using SASL_PLAINTEXT, SASL_SSL or SSL refer to Kafka security for additional properties that need to be set on producer.
Other Kafka Producer Properties These properties are used to configure the Kafka Producer. Any producer property supported by Kafka can be used. The only requirement is to prepend the property name with the prefix kafka.producer. For example: kafka.producer.linger.ms

The Kafka sink also provides defaults for the key.serializer(org.apache.kafka.common.serialization.StringSerializer) and value.serializer(org.apache.kafka.common.serialization.ByteArraySerializer). Modification of these parameters is not recommended.

An example configuration of a Kafka sink is given below. Properties starting with the prefix kafka.producer the Kafka producer. The properties that are passed when creating the Kafka producer are not limited to the properties given in this example. Also it is possible to include your custom properties here and access them inside the preprocessor through the Flume Context object passed in as a method argument.

示例配置如下:

a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = mytopic // 指定寫入topic
a1.sinks.k1.kafka.bootstrap.servers = localhost:9092 // kafka位置
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.ki.kafka.producer.compression.type = snappy

實現(xiàn)flume中不同的event發(fā)往kafka中不同的topic

image

如何監(jiān)控kafka

kafka面試題總結

1.Kafka中的ISR、OSR柱蟀、AR又代表什么川蒙?

ISR:與leader保持同步的follower集合
AR:分區(qū)的所有副本

2.Kafka中的HW、LEO等分別代表什么长已?

LEO:沒個副本的最后條消息的offset
HW:一個分區(qū)中所有副本最小的offset 控制整個分區(qū)中哪些數(shù)據(jù)能夠暴露給消費者

3.Kafka中是怎么體現(xiàn)消息順序性的畜眨?

每個分區(qū)內,每條消息都有一個offset术瓮,故只能保證分區(qū)內有序康聂。

4.Kafka中的分區(qū)器、序列化器胞四、攔截器是否了解恬汁?它們之間的處理順序是什么?

攔截器 -> 序列化器 -> 分區(qū)器

5.Kafka生產者客戶端的整體結構是什么樣子的辜伟?使用了幾個線程來處理氓侧?分別是什么?

image

6.“消費組中的消費者個數(shù)如果超過topic的分區(qū)导狡,那么就會有消費者消費不到數(shù)據(jù)”這句話是否正確约巷?

正確

7.消費者提交消費位移時提交的是當前消費到的最新消息的offset還是offset+1?

offset+1 記錄下次消費的數(shù)據(jù)的offset

8.有哪些情形會造成重復消費旱捧?

image

9.有哪些情景會造成消息漏消費独郎?

先提交offset,后消費枚赡,有可能造成數(shù)據(jù)的重復

10.當你使用kafka-topics.sh創(chuàng)建(刪除)了一個topic之后氓癌,Kafka背后會執(zhí)行什么邏輯?

1)會在zookeeper中的/brokers/topics節(jié)點下創(chuàng)建一個新的topic節(jié)點贫橙,如:/brokers/topics/first

2)觸發(fā)Controller的監(jiān)聽程序

3)kafka Controller 負責topic的創(chuàng)建工作贪婉,并更新metadata cache

11.topic的分區(qū)數(shù)可不可以增加?如果可以怎么增加料皇?如果不可以谓松,那又是為什么?

可以增加

bin/kafka-topics.sh --zookeeper localhost:2181/kafka --alter --topic topic-config --partitions 3

12.topic的分區(qū)數(shù)可不可以減少践剂?如果可以怎么減少?如果不可以娜膘,那又是為什么逊脯?

不可以減少,現(xiàn)有的分區(qū)數(shù)據(jù)難以處理竣贪。

13.Kafka有內部的topic嗎军洼?如果有是什么巩螃?有什么所用?

__consumer_offsets, 共有50個分區(qū) 保存消費者offset

14.Kafka分區(qū)分配的概念匕争?

一個topic多個分區(qū)避乏,一個消費者組多個消費者,故需要將分區(qū)分配個消費者(roundrobin甘桑、range)

15.簡述Kafka的日志目錄結構拍皮?

每個分區(qū)對應一個文件夾,文件夾的命名為topic-0,topic-1,內部為.log和.index文件

16.如果我指定了一個offset箱季,Kafka Controller怎么查找到對應的消息当辐?

先通過offset比對log文件的名字 確定好后 再找到對應的index文件中offset對應的消息索引位置

最后在log文件中找到相應的消息

image

17.聊一聊Kafka Controller的作用?

負責管理集群broker的上下線冠摄,所有topic的分區(qū)副本分配和leader選舉等工作。

18.Kafka中有那些地方需要選舉?這些地方的選舉策略又有哪些愧驱?

partition leader(ISR),由Controller負責

Controller(先到先得)

19.失效副本是指什么椭盏?有那些應對措施组砚?

不能及時與leader同步,暫時踢出ISR庸汗,等其追上leader之后再重新加入

20.Kafka的那些設計讓它有如此高的性能惫确?

分區(qū),順序寫磁盤蚯舱,0-copy

其他kafka相關面試題搜集(一)

1改化、請說明什么是Apache Kafka?

Apache Kafka是由Apache開發(fā)的一種發(fā)布訂閱消息系統(tǒng),它是一個分布式的枉昏、分區(qū)的和可復制的提交日志服務陈肛。

2兄裂、說說Kafka的使用場景晰奖?

①異步處理
②應用解耦
③流量削峰
④日志處理
⑤消息通訊等。

3匾南、使用Kafka有什么優(yōu)點和缺點啃匿?

優(yōu)點:
①支持跨數(shù)據(jù)中心的消息復制;
②單機吞吐量:十萬級,最大的優(yōu)點溯乒,就是吞吐量高;
③topic數(shù)量都吞吐量的影響:topic從幾十個到幾百個的時候夹厌,吞吐量會大幅度下降。所以在同等機器下裆悄,kafka盡量保證topic數(shù)量不要過多矛纹。如果要支撐大規(guī)模topic,需要增加更多的機器資源;
④時效性:ms級;
⑤可用性:非常高光稼,kafka是分布式的或南,一個數(shù)據(jù)多個副本,少數(shù)機器宕機钟哥,不會丟失數(shù)據(jù)迎献,不會導致不可用;
⑥消息可靠性:經(jīng)過參數(shù)優(yōu)化配置,消息可以做到0丟失;
⑦功能支持:功能較為簡單腻贰,主要支持簡單的MQ功能吁恍,在大數(shù)據(jù)領域的實時計算以及日志采集被大規(guī)模使用写烤。

缺點:
①由于是批量發(fā)送感局,數(shù)據(jù)并非真正的實時; 僅支持統(tǒng)一分區(qū)內消息有序,無法實現(xiàn)全局消息有序藻雌;
②有可能消息重復消費受啥;
③依賴zookeeper進行元數(shù)據(jù)管理,等等膝但。

4丑孩、為什么說Kafka性能很好,體現(xiàn)在哪里逃延?

①順序讀寫
②零拷貝
③分區(qū)
④批量發(fā)送
⑤數(shù)據(jù)壓縮

5檩电、請說明什么是傳統(tǒng)的消息傳遞方法?

傳統(tǒng)的消息傳遞方法包括兩種:
排隊:在隊列中,一組用戶可以從服務器中讀取消息,每條消息都發(fā)送給其中一個人恢准。
發(fā)布-訂閱:在這個模型中坠非,消息被廣播給所有的用戶秋泳。

6、請說明Kafka相對傳統(tǒng)技術有什么優(yōu)勢?

①快速:單一的Kafka代理可以處理成千上萬的客戶端,每秒處理數(shù)兆字節(jié)的讀寫操作。
②可伸縮:在一組機器上對數(shù)據(jù)進行分區(qū)
③和簡化奕筐,以支持更大的數(shù)據(jù)
④持久:消息是持久性的笆怠,并在集群中進
⑤行復制,以防止數(shù)據(jù)丟失。
⑥設計:它提供了容錯保證和持久性

7、解釋Kafka的Zookeeper是什么?我們可以在沒有Zookeeper的情況下使用Kafka嗎?

Zookeeper是一個開放源碼的、高性能的協(xié)調服務,它用于Kafka的分布式應用螃壤。
不,不可能越過Zookeeper,直接聯(lián)系Kafka broker睦霎。一旦Zookeeper停止工作,它就不能服務客戶端請求。
Zookeeper主要用于在集群中不同節(jié)點之間進行通信
在Kafka中,它被用于提交偏移量裹赴,因此如果節(jié)點在任何情況下都失敗了睛竣,它都可以從之前提交的偏移量中獲取
除此之外,它還執(zhí)行其他活動射沟,如: leader檢測殊者、分布式同步、配置管理躏惋、識別新節(jié)點何時離開或連接幽污、集群刑然、節(jié)點實時狀態(tài)等等。

8、解釋Kafka的用戶如何消費信息?

在Kafka中傳遞消息是通過使用sendfile API完成的。它支持將字節(jié)從套接口轉移到磁盤,通過內核空間保存副本唇聘,并在內核用戶之間調用內核阱持。

9、解釋如何提高遠程用戶的吞吐量?

如果用戶位于與broker不同的數(shù)據(jù)中心蛆挫,則可能需要調優(yōu)套接口緩沖區(qū)大小,以對長網(wǎng)絡延遲進行攤銷。

10、解釋一下,在數(shù)據(jù)制作過程中咽斧,你如何能從Kafka得到準確的信息?

在數(shù)據(jù)中屎暇,為了精確地獲得Kafka的消息把介,你必須遵循兩件事:

在數(shù)據(jù)消耗期間避免重復勤讽,在數(shù)據(jù)生產過程中避免重復。

這里有兩種方法拗踢,可以在數(shù)據(jù)生成時準確地獲得一個語義:

每個分區(qū)使用一個單獨的寫入器脚牍,每當你發(fā)現(xiàn)一個網(wǎng)絡錯誤,檢查該分區(qū)中的最后一條消息巢墅,以查看您的最后一次寫入是否成功

在消息中包含一個主鍵(UUID或其他)诸狭,并在用戶中進行反復制

11、解釋如何減少ISR中的擾動?broker什么時候離開ISR?

ISR是一組與leaders完全同步的消息副本君纫,也就是說ISR中包含了所有提交的消息驯遇。ISR應該總是包含所有的副本,直到出現(xiàn)真正的故障蓄髓。如果一個副本從leader中脫離出來叉庐,將會從ISR中刪除。

12会喝、Kafka為什么需要復制?

Kafka的信息復制確保了任何已發(fā)布的消息不會丟失陡叠,并且可以在機器錯誤玩郊、程序錯誤或更常見些的軟件升級中使用。

13枉阵、如果副本在ISR中停留了很長時間表明什么?

如果一個副本在ISR中保留了很長一段時間译红,那么它就表明,跟蹤器無法像在leader收集數(shù)據(jù)那樣快速地獲取數(shù)據(jù)兴溜。

14侦厚、請說明如果首選的副本不在ISR中會發(fā)生什么?

如果首選的副本不在ISR中,控制器將無法將leadership轉移到首選的副本昵慌。

15假夺、有可能在生產后發(fā)生消息偏移嗎?

在大多數(shù)隊列系統(tǒng)中,作為生產者的類無法做到這一點斋攀,它的作用是觸發(fā)并忘記消息已卷。broker將完成剩下的工作,比如使用id進行適當?shù)脑獢?shù)據(jù)處理淳蔼、偏移量等侧蘸。

作為消息的用戶,你可以從Kafka broker中獲得補償鹉梨。如果你注視SimpleConsumer類讳癌,你會注意到它會獲取包括偏移量作為列表的MultiFetchResponse對象。此外存皂,當你對Kafka消息進行迭代時晌坤,你會擁有包括偏移量和消息發(fā)送的MessageAndOffset對象。

16旦袋、Kafka的設計時什么樣的呢骤菠?

Kafka將消息以topic為單位進行歸納

將向Kafka topic發(fā)布消息的程序成為producers. 將訂閱了topics并消費消息的程序成為consumer.

Kafka以集群的方式運行,可以由一個或多個服務組成疤孕,每個服務叫做一個broker.

producers通過網(wǎng)絡將消息發(fā)送到Kafka集群商乎,集群向消費者提供消息

17、數(shù)據(jù)傳輸?shù)氖聞斩x有哪三種祭阀?

(1)最多一次:
消息不會被重復發(fā)送鹉戚,最多被傳輸一次,但也有可能一次不傳輸
(2)最少一次: 消息不會被漏發(fā)送专控,最少被傳輸一次抹凳,但也有可能被重復傳輸.
(3)精確的一次(Exactly once): 不會漏傳輸也不會重復傳輸,每個消息都傳輸被一次而且僅僅被傳輸一次,這是大家所期望的

18踩官、Kafka判斷一個節(jié)點是否還活著有那兩個條件却桶?

(1)節(jié)點必須可以維護和ZooKeeper的連接,Zookeeper通過心跳機制檢查每個節(jié)點的連接
(2)如果節(jié)點是個follower,他必須能及時的同步leader的寫操作,延時不能太久

19颖系、producer是否直接將數(shù)據(jù)發(fā)送到broker的leader(主節(jié)點)嗅剖?

producer直接將數(shù)據(jù)發(fā)送到broker的leader(主節(jié)點),不需要在多個節(jié)點進行分發(fā)嘁扼,為了幫助producer做到這點信粮,所有的Kafka節(jié)點都可以及時的告知:哪些節(jié)點是活動的,目標topic目標分區(qū)的leader在哪趁啸。這樣producer就可以直接將消息發(fā)送到目的地了强缘。

20、Kafa consumer是否可以消費指定分區(qū)消息不傅?

Kafa consumer消費消息時旅掂,向broker發(fā)出"fetch"請求去消費特定分區(qū)的消息,consumer指定消息在日志中的偏移量(offset)访娶,就可以消費從這個位置開始的消息商虐,customer擁有了offset的控制權,可以向后回滾去重新消費之前的消息崖疤,這是很有意義的

21秘车、Kafka消息是采用Pull模式,還是Push模式劫哼?

Kafka最初考慮的問題是叮趴,customer應該從brokes拉取消息還是brokers將消息推送到consumer,也就是pull還push权烧。在這方面眯亦,Kafka遵循了一種大部分消息系統(tǒng)共同的傳統(tǒng)的設計:producer將消息推送到broker,consumer從broker拉取消息一些消息系統(tǒng)比如Scribe和Apache Flume采用了push模式般码,將消息推送到下游的consumer搔驼。這樣做有好處也有壞處:由broker決定消息推送的速率,對于不同消費速率的consumer就不太好處理了侈询。消息系統(tǒng)都致力于讓consumer以最大的速率最快速的消費消息,但不幸的是糯耍,push模式下扔字,當broker推送的速率遠大于consumer消費的速率時,consumer恐怕就要崩潰了温技。最終Kafka還是選取了傳統(tǒng)的pull模式

Pull模式的另外一個好處是consumer可以自主決定是否批量的從broker拉取數(shù)據(jù)革为。Push模式必須在不知道下游consumer消費能力和消費策略的情況下決定是立即推送每條消息還是緩存之后批量推送。如果為了避免consumer崩潰而采用較低的推送速率舵鳞,將可能導致一次只推送較少的消息而造成浪費震檩。Pull模式下,consumer就可以根據(jù)自己的消費能力去決定這些策略

Pull有個缺點是,如果broker沒有可供消費的消息抛虏,將導致consumer不斷在循環(huán)中輪詢博其,直到新消息到t達。為了避免這點迂猴,Kafka有個參數(shù)可以讓consumer阻塞知道新消息到達(當然也可以阻塞知道消息的數(shù)量達到某個特定的量這樣就可以批量發(fā)

22慕淡、Kafka存儲在硬盤上的消息格式是什么?

消息由一個固定長度的頭部和可變長度的字節(jié)數(shù)組組成沸毁。頭部包含了一個版本號和CRC32校驗碼峰髓。
消息長度: 4 bytes (value: 1+4+n)
版本號: 1 byte
CRC校驗碼: 4 bytes
具體的消息: n bytes

23、Kafka高效文件存儲設計特點:

(1).Kafka把topic中一個parition大文件分成多個小文件段息尺,通過多個小文件段携兵,就容易定期清除或刪除已經(jīng)消費完文件,減少磁盤占用搂誉。
(2).通過索引信息可以快速定位message和確定response的最大大小徐紧。
(3).通過index元數(shù)據(jù)全部映射到memory,可以避免segment file的IO磁盤操作勒葱。
(4).通過索引文件稀疏存儲浪汪,可以大幅降低index文件元數(shù)據(jù)占用空間大小。

24凛虽、Kafka 與傳統(tǒng)消息系統(tǒng)之間有三個關鍵區(qū)別

(1).Kafka 持久化日志死遭,這些日志可以被重復讀取和無限期保留
(2).Kafka 是一個分布式系統(tǒng):它以集群的方式運行,可以靈活伸縮凯旋,在內部通過復制數(shù)據(jù)提升容錯能力和高可用性
(3).Kafka 支持實時的流式處理

25呀潭、Kafka創(chuàng)建Topic時如何將分區(qū)放置到不同的Broker中

副本因子不能大于 Broker 的個數(shù);
第一個分區(qū)(編號為0)的第一個副本放置位置是隨機從 brokerList 選擇的至非;
其他分區(qū)的第一個副本放置位置相對于第0個分區(qū)依次往后移钠署。也就是如果我們有5個 Broker,5個分區(qū)荒椭,假設第一個分區(qū)放在第四個 Broker 上谐鼎,那么第二個分區(qū)將會放在第五個 Broker 上;第三個分區(qū)將會放在第一個 Broker 上趣惠;第四個分區(qū)將會放在第二個 Broker 上狸棍,依次類推;
剩余的副本相對于第一個副本放置位置其實是由 nextReplicaShift 決定的味悄,而這個數(shù)也是隨機產生的

26草戈、Kafka新建的分區(qū)會在哪個目錄下創(chuàng)建

在啟動 Kafka 集群之前,我們需要配置好 log.dirs 參數(shù)侍瑟,其值是 Kafka 數(shù)據(jù)的存放目錄唐片,這個參數(shù)可以配置多個目錄丙猬,目錄之間使用逗號分隔,通常這些目錄是分布在不同的磁盤上用于提高讀寫性能费韭。 當然我們也可以配置 log.dir 參數(shù)茧球,含義一樣。只需要設置其中一個即可揽思。 如果 log.dirs 參數(shù)只配置了一個目錄袜腥,那么分配到各個 Broker 上的分區(qū)肯定只能在這個目錄下創(chuàng)建文件夾用于存放數(shù)據(jù)。 但是如果 log.dirs 參數(shù)配置了多個目錄钉汗,那么 Kafka 會在哪個文件夾中創(chuàng)建分區(qū)目錄呢羹令?答案是:Kafka 會在含有分區(qū)目錄最少的文件夾中創(chuàng)建新的分區(qū)目錄,分區(qū)目錄名為 Topic名+分區(qū)ID损痰。注意福侈,是分區(qū)文件夾總數(shù)最少的目錄,而不是磁盤使用量最少的目錄卢未!也就是說肪凛,如果你給 log.dirs 參數(shù)新增了一個新的磁盤,新的分區(qū)目錄肯定是先在這個新的磁盤上創(chuàng)建直到這個新的磁盤目錄擁有的分區(qū)目錄不是最少為止辽社。

27伟墙、partition的數(shù)據(jù)如何保存到硬盤

topic中的多個partition以文件夾的形式保存到broker,每個分區(qū)序號從0遞增滴铅, 且消息有序 Partition文件下有多個segment(xxx.index戳葵,xxx.log) segment 文件里的 大小和配置文件大小一致可以根據(jù)要求修改 默認為1g 如果大小大于1g時,會滾動一個新的segment并且以上一個segment最后一條消息的偏移量命名

28汉匙、kafka的ack機制

request.required.acks有三個值 0 1 -1
0:生產者不會等待broker的ack拱烁,這個延遲最低但是存儲的保證最弱當server掛掉的時候就會丟數(shù)據(jù)
1:服務端會等待ack值 leader副本確認接收到消息后發(fā)送ack但是如果leader掛掉后他不確保是否復制完成新leader也會導致數(shù)據(jù)丟失
-1:同樣在1的基礎上 服務端會等所有的follower的副本受到數(shù)據(jù)后才會受到leader發(fā)出的ack,這樣數(shù)據(jù)不會丟失

29噩翠、Kafka的消費者如何消費數(shù)據(jù)

消費者每次消費數(shù)據(jù)的時候戏自,消費者都會記錄消費的物理偏移量(offset)的位置 等到下次消費時,他會接著上次位置繼續(xù)消費伤锚。同時也可以按照指定的offset進行重新消費擅笔。

30、消費者負載均衡策略

結合consumer的加入和退出進行再平衡策略屯援。

31剂娄、kafka消息數(shù)據(jù)是否有序?

消費者組里某具體分區(qū)是有序的玄呛,所以要保證有序只能建一個分區(qū),但是實際這樣會存在性能問題和二,具體業(yè)務具體分析后確認徘铝。

32、kafaka生產數(shù)據(jù)時數(shù)據(jù)的分組策略,生產者決定數(shù)據(jù)產生到集群的哪個partition中

每一條消息都是以(key,value)格式 Key是由生產者發(fā)送數(shù)據(jù)傳入 所以生產者(key)決定了數(shù)據(jù)產生到集群的哪個partition

33惕它、kafka consumer 什么情況會觸發(fā)再平衡reblance?

①一旦消費者加入或退出消費組怕午,導致消費組成員列表發(fā)生變化,消費組中的所有消費者都要執(zhí)行再平衡淹魄。
②訂閱主題分區(qū)發(fā)生變化郁惜,所有消費者也都要再平衡。

34甲锡、描述下kafka consumer 再平衡步驟?

①關閉數(shù)據(jù)拉取線程兆蕉,清空隊列和消息流,提交偏移量缤沦;
②釋放分區(qū)所有權虎韵,刪除zk中分區(qū)和消費者的所有者關系;
③將所有分區(qū)重新分配給每個消費者缸废,每個消費者都會分到不同分區(qū)包蓝;
④將分區(qū)對應的消費者所有關系寫入ZK,記錄分區(qū)的所有權信息企量;
⑤重啟消費者拉取線程管理器测萎,管理每個分區(qū)的拉取線程。

?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末届巩,一起剝皮案震驚了整個濱河市硅瞧,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌姆泻,老刑警劉巖零酪,帶你破解...
    沈念sama閱讀 217,185評論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異拇勃,居然都是意外死亡四苇,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,652評論 3 393
  • 文/潘曉璐 我一進店門方咆,熙熙樓的掌柜王于貴愁眉苦臉地迎上來月腋,“玉大人,你說我怎么就攤上這事瓣赂∮苌В” “怎么了?”我有些...
    開封第一講書人閱讀 163,524評論 0 353
  • 文/不壞的土叔 我叫張陵煌集,是天一觀的道長妓肢。 經(jīng)常有香客問我,道長苫纤,這世上最難降的妖魔是什么碉钠? 我笑而不...
    開封第一講書人閱讀 58,339評論 1 293
  • 正文 為了忘掉前任纲缓,我火速辦了婚禮,結果婚禮上喊废,老公的妹妹穿的比我還像新娘祝高。我一直安慰自己,他們只是感情好污筷,可當我...
    茶點故事閱讀 67,387評論 6 391
  • 文/花漫 我一把揭開白布工闺。 她就那樣靜靜地躺著,像睡著了一般瓣蛀。 火紅的嫁衣襯著肌膚如雪陆蟆。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,287評論 1 301
  • 那天揪惦,我揣著相機與錄音遍搞,去河邊找鬼。 笑死器腋,一個胖子當著我的面吹牛溪猿,可吹牛的內容都是我干的。 我是一名探鬼主播纫塌,決...
    沈念sama閱讀 40,130評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼诊县,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了措左?” 一聲冷哼從身側響起依痊,我...
    開封第一講書人閱讀 38,985評論 0 275
  • 序言:老撾萬榮一對情侶失蹤都伪,失蹤者是張志新(化名)和其女友劉穎炼团,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體凹蜂,經(jīng)...
    沈念sama閱讀 45,420評論 1 313
  • 正文 獨居荒郊野嶺守林人離奇死亡凉逛,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 37,617評論 3 334
  • 正文 我和宋清朗相戀三年性宏,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片状飞。...
    茶點故事閱讀 39,779評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡毫胜,死狀恐怖,靈堂內的尸體忽然破棺而出诬辈,到底是詐尸還是另有隱情酵使,我是刑警寧澤,帶...
    沈念sama閱讀 35,477評論 5 345
  • 正文 年R本政府宣布焙糟,位于F島的核電站口渔,受9級特大地震影響,放射性物質發(fā)生泄漏穿撮。R本人自食惡果不足惜缺脉,卻給世界環(huán)境...
    茶點故事閱讀 41,088評論 3 328
  • 文/蒙蒙 一瞧哟、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧枪向,春花似錦、人聲如沸咧党。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,716評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽傍衡。三九已至深员,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間蛙埂,已是汗流浹背倦畅。 一陣腳步聲響...
    開封第一講書人閱讀 32,857評論 1 269
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留绣的,地道東北人叠赐。 一個月前我還...
    沈念sama閱讀 47,876評論 2 370
  • 正文 我出身青樓,卻偏偏與公主長得像屡江,于是被迫代替她去往敵國和親芭概。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 44,700評論 2 354