kafka學(xué)習(xí)筆記

  • kafka是一個分布式的基于發(fā)布/訂閱模式的消息隊列(Message Queue)茎芭,主要應(yīng)用于大數(shù)據(jù)實時處理領(lǐng)域抖拦。
傳統(tǒng)消息隊列的應(yīng)用場景
  • 使用消息隊列的好處:
    • 解耦:允許你獨立地擴展或修改兩邊的處理過程堰塌,只要確保它們遵守同樣的接口約束且预。
    • 可恢復(fù)性:系統(tǒng)的一部分組件失效時蜈项,不會影響到整個系統(tǒng)定硝。消息隊列降低了進程間的耦合度,所以即使一個處理消息的進程掛掉件豌,加入隊列中的消息仍然可以在系統(tǒng)恢復(fù)后被處理疮方。
    • 緩沖:有助于控制和優(yōu)化數(shù)據(jù)流經(jīng)過系統(tǒng)的速度,解決生產(chǎn)消息和消費消息的處理速度不一致的情況茧彤。
    • 削峰:在訪問量劇增的情況下,使用消息隊列能夠使關(guān)鍵組件頂住突發(fā)的訪問壓力疆栏,而不會因為突發(fā)的超負荷的請求而完全崩潰曾掂。
    • 異步通信:很多時候用戶不需要立即處理消息。消息隊列提供了異步處理機制壁顶,允許用戶把一個消息放入隊列珠洗,但并不立即處理它。想向隊列中放入多少消息就放多少若专,然后在需要的時候再去處理它們许蓖。
  • 消息隊列的兩種模式:
    • 點對點模式:一對一,消費者主動拉取消息,消費者消費消息后膊爪,消息就被清除自阱。隊列支持存在多個消費者,但是對一個消息而言米酬,只會有一個消費者可以消費沛豌。
    • 發(fā)布/訂閱模式:一對多,生產(chǎn)者生產(chǎn)消息放到隊列中赃额,隊列推送消息加派,消費者消費消息后,消息不會被清除跳芳。
點對點模式
發(fā)布/訂閱模式(缺點:消費者長輪詢)
kafka基礎(chǔ)架構(gòu)
  • Producer:消息生產(chǎn)者芍锦,就是向 kafka broker 發(fā)消息的客戶端;
  • Consumer:消息消費者飞盆,向 kafka broker 取消息的客戶端醉旦;
  • Consumer Group(CG):消費者組,由多個 consumer 組成桨啃。消費者組內(nèi)每個消費者負責(zé)消費不同分區(qū)的數(shù)據(jù)车胡,一個分區(qū)只能由一個組內(nèi)某個消費者消費;消費者組之間互不影響照瘾。 所有的消費者都屬于某個消費者組匈棘,即消費者組是邏輯上的一個訂閱者。
  • Broker:一臺 kafka 服務(wù)器就是一個 broker析命。一個集群由多個 broker 組成主卫。一個 broker可以容納多個 topic。
  • Topic: 可以理解為一個隊列鹃愤, 生產(chǎn)者和消費者面向的都是一個 topic簇搅;
  • Partition: 為了實現(xiàn)擴展性,一個非常大的 topic 可以分布到多個 broker(即服務(wù)器)上软吐,一個 topic 可以分為多個 partition瘩将,每個 partition 是一個有序的隊列;
  • Replication:副本凹耙,為保證集群中的某個節(jié)點發(fā)生故障時姿现,該節(jié)點上的分區(qū)數(shù)據(jù)不丟失,且Kafka仍然能夠繼續(xù)工作肖抱, Kafka 提供了副本機制备典,一個 topic 的每個分區(qū)都有若干個副本,一個 leader 和若干個 follower意述。
  • Leader: 每個分區(qū)多個副本的“主”提佣,生產(chǎn)者發(fā)送數(shù)據(jù)的對象吮蛹,以及消費者消費數(shù)據(jù)的對象都是 leader。
  • Follower:每個分區(qū)多個副本中的“從”拌屏,實時從 leader 中同步數(shù)據(jù)潮针,保持和 leader 數(shù)據(jù)的同步。 leader 發(fā)生故障時槐壳,某個 Follower 會成為新的 leader然低。

依賴zookeeper集群搭建kafka集群

tar -zxvf kafka_2.12-2.5.0.tgz、mv kafka_2.12-2.5.0 kafka-1
tar -zxvf kafka_2.12-2.5.0.tgz枫笛、mv kafka_2.12-2.5.0 kafka-2
tar -zxvf kafka_2.12-2.5.0.tgz吨灭、mv kafka_2.12-2.5.0 kafka-3
  • 修改各個節(jié)點中kafka服務(wù)器的配置文件server.propertiesproducer.properties刑巧、consumer.properties
  • /opt/kafka-1/config/server.properties
broker.id=1 #服務(wù)器節(jié)點編號
listeners=PLAINTEXT://192.168.211.147:9092 #設(shè)置監(jiān)聽哪個端口
advertised.listeners=PLAINTEXT://192.168.211.147:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/opt/kafka-1/data #存放kafka數(shù)據(jù)的目錄
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
#log.retention.bytes=1073741824
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.211.147:2181,192.168.211.147:2182,192.168.211.147:2183
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
  • /opt/kafka-1/config/producer.properties
bootstrap.servers=192.168.211.147:9092
compression.type=none
  • /opt/kafka-1/config/consumer.properties
bootstrap.servers=192.168.211.147:9092
group.id=test
  • /opt/kafka-2/config/server.properties
broker.id=2 #服務(wù)器節(jié)點編號
listeners=PLAINTEXT://192.168.211.141:9092 #設(shè)置監(jiān)聽哪個端口
advertised.listeners=PLAINTEXT://192.168.211.141:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/opt/kafka-2/data #存放kafka數(shù)據(jù)的目錄
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
#log.retention.bytes=1073741824
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.211.147:2181,192.168.211.147:2182,192.168.211.147:2183
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
  • /opt/kafka-2/config/producer.properties
bootstrap.servers=192.168.211.141:9092
compression.type=none
  • /opt/kafka-2/config/consumer.properties
bootstrap.servers=192.168.211.141:9092
group.id=test
  • /opt/kafka-3/config/server.properties
broker.id=3 #服務(wù)器節(jié)點編號
listeners=PLAINTEXT://192.168.211.156:9092 #設(shè)置監(jiān)聽哪個端口
advertised.listeners=PLAINTEXT://192.168.211.156:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/opt/kafka-3/data #存放kafka數(shù)據(jù)的目錄
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
#log.retention.bytes=1073741824
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.211.147:2181,192.168.211.147:2182,192.168.211.147:2183
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
  • /opt/kafka-3/config/producer.properties
bootstrap.servers=192.168.211.156:9092
compression.type=none
  • /opt/kafka-3/config/consumer.properties
bootstrap.servers=192.168.211.156:9092
group.id=test
  • 分別在各個節(jié)點的kafka安裝目錄下創(chuàng)建兩個文件夾:mkdir data logs
  • 分別在各個節(jié)點的kafka安裝目錄下的data文件夾中創(chuàng)建一個文件meta.properties
  • /opt/kafka-1/data/meta.properties
version=0
broker.id=1 #注意和server.properties中的broker.id的值相同
  • /opt/kafka-2/data/meta.properties
version=0
broker.id=2
  • /opt/kafka-3/data/meta.properties
version=0
broker.id=3
  • 先啟動zookeeper集群喧兄,再啟動kafka集群(可以單獨寫一個shell腳本來啟動和關(guān)閉集群),然后鍵入命令:jps查看kafka是否已啟動啊楚。
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
nohup bin/kafka-server-start.sh config/server.properties 1>/dev/null 2>&1 &
  • 進入kafka安裝目錄下的logs文件夾吠冤,通過文件server.log可以查看該kafka節(jié)點的運行日志。
  • 創(chuàng)建主題(bigdata3)恭理,指定分區(qū)數(shù)2個和副本數(shù)1個(注意:分區(qū)數(shù)可以超過節(jié)點數(shù)拯辙,副本數(shù)不能超過節(jié)點數(shù)):
bin/kafka-topics.sh --zookeeper 192.168.211.147:2181 --create --topic bigdata3 --partitions 2 --replication-factor 1
  • 列舉主題(存放在zk中)
bin/kafka-topics.sh --zookeeper 192.168.211.147:2181 --list
  • 刪除主題(bigdata3),刪除后會被標(biāo)記為刪除狀態(tài)颜价,等cpu空閑時再自動清除
./bin/kafka-topics.sh --zookeeper 192.168.211.147:2181 --delete --topic bigdata3
  • 主題分布詳情:
./bin/kafka-topics.sh --zookeeper 192.168.211.147:2182 --describe --topic bigdata3
  • bigdata3主題上發(fā)送消息(啟動生產(chǎn)者):
./bin/kafka-console-producer.sh --broker-list 192.168.211.147:9092,192.168.211.141:9092,192.168.211.156:9092 --topic bigdata3
  • bigdata3主題上讀取消息(啟動消費者):
./bin/kafka-console-consumer.sh --bootstrap-server 192.168.211.147:9092,192.168.211.141:9092,192.168.211.156:9092 --topic bigdata3 --from-beginning
kafka工作流程
  • kafka中消息是以topic進行分類的涯保,producer生產(chǎn)消息,consumer消費消息周伦,都是面向topic的夕春。(從命令行操作看出)
  • topic是邏輯上的概念,而partition是物理上的概念专挪,每個partition對應(yīng)于一個log文件及志,該 log 文件中存儲的就是producer 生產(chǎn)的數(shù)據(jù)。(topic = N partition狈蚤,partition = log)
kafka文件存儲機制
  • 由于生產(chǎn)者生產(chǎn)的消息會不斷追加到.log文件末尾困肩, 為防止.log文件過大導(dǎo)致數(shù)據(jù)定位效率低下,kafka采取了分片索引機制脆侮,將每個partition分為多個segment。每個segment對應(yīng)兩個文件——.index文件和.log文件勇劣。 這些文件位于一個文件夾下靖避,該文件夾的命名規(guī)則為:topic name+分區(qū)序號潭枣。例如:first-topic這個topic有三個分區(qū),則其對應(yīng)的文件夾為first-topic-0幻捏,first-topic-1盆犁,first-topic-2
00000000000000000000.index
00000000000000000000.log
00000000000000170410.index
00000000000000170410.log
00000000000000239430.index
00000000000000239430.log
.index文件和.log文件的結(jié)構(gòu)示意圖
  • .index.log文件以當(dāng)前segment的第一條消息的offset命名篡九。.index文件存儲大量的索引信息谐岁,.log文件存儲大量的數(shù)據(jù),索引文件中的元數(shù)據(jù)指向?qū)?yīng)數(shù)據(jù)文件中message的物理偏移地址榛臼。
  • 主題分區(qū)的原因:
    • 方便在集群中擴展伊佃,每個partition可以通過調(diào)整以適應(yīng)它所在的機器,而一個topic又可以有多個partition組成沛善,因此整個集群就可以適應(yīng)適合的數(shù)據(jù)了航揉;
    • 可以提高并發(fā),因為可以以partition為單位進行讀寫了金刁。
  • 分區(qū)的原則:
    • 指明partition的情況下帅涂,直接將指明的值作為partiton值;
    • 沒有指明partition值但有key的情況下尤蛮,將key的 hash 值與 topic 的 partition 總數(shù)進行取余得到的partition值媳友;
    • 既沒有partition值又沒有 key 值的情況下,第一次調(diào)用時隨機生成一個整數(shù)(后面每次調(diào)用在這個整數(shù)上自增)产捞,將這個值與 topic 可用的 partition 總數(shù)取余得到 partition值醇锚,即round-robin算法。
生產(chǎn)者發(fā)送數(shù)據(jù)分裝為ProducerRecord對象的API
  • 生產(chǎn)者數(shù)據(jù)可靠性保證:為保證producer發(fā)送的數(shù)據(jù)轧葛,能可靠地發(fā)送到指定的 topic搂抒, topic 的每個partition收到producer 發(fā)送的數(shù)據(jù)后,都需要向 producer 發(fā)送ack(acknowledgement:確認收到)尿扯,若producer收到 ack求晶,則會進行下一輪的發(fā)送,否則重新發(fā)送數(shù)據(jù)衷笋。
生產(chǎn)者數(shù)據(jù)可靠性保證機制
  • 何時發(fā)送ack芳杏?確保有followerleader同步完成,leader再發(fā)送ack辟宗,這樣才能保證leader掛掉之后爵赵,能在follower中選舉出新的leader
  • 多少個follower同步完成之后發(fā)送ack泊脐?①半數(shù)以上的follower同步完成空幻,即可發(fā)送ack;②全部的follower同步完成容客,才可以發(fā)送ack秕铛。
序號 方案 優(yōu)點 缺點
1 半數(shù)以上完成同步约郁,就發(fā)送ack 延遲低 選舉新的 leader 時,容忍 n 臺節(jié)點的故障但两,需要 2n+1 個副本鬓梅。容錯率:1/2。
2 全部完成同步谨湘,才發(fā)送ack 選舉新的 leader 時绽快, 容忍 n 臺節(jié)點的故障,需要 n+1 個副本紧阔。容錯率:1坊罢。 延遲高
  • kafka選取了第二種方案,原因:①為了容忍 n 臺節(jié)點的故障寓辱,第一種方案需要 2n+1 個副本艘绍,而第二種方案只需要 n+1 個副本,又 kafka 的每個分區(qū)都有大量的數(shù)據(jù)秫筏, 因此诱鞠,第一種方案會造成大量數(shù)據(jù)的冗余。②雖然第二種方案的網(wǎng)絡(luò)延遲會比較高这敬,但網(wǎng)絡(luò)延遲對 kafka 的影響較小航夺。
  • kafka采用第二種方案之后,設(shè)想以下情景: leader 收到數(shù)據(jù)崔涂,所有 follower 都開始同步數(shù)據(jù)阳掐,但有一個 follower,因為某種故障遲遲不能與 leader 進行同步冷蚂,那 leader 就要一直等下去缭保,直到它完成同步,才能發(fā)送 ack蝙茶。這個問題怎么解決呢艺骂?
    • leader 維護了一個動態(tài)的in-sync replica set(ISR),意為和 leader 保持同步的 follower 集合隆夯。當(dāng) ISR 中的 follower 完成數(shù)據(jù)的同步之后钳恕,就會給 leader 發(fā)送 ack。若follower長時間未向leader同步數(shù)據(jù)蹄衷,則該follower將被踢出ISR忧额,該時間閾值由replica.lag.time.max.ms參數(shù)設(shè)定。當(dāng)leader 發(fā)生故障之后愧口,就會從 ISR 中選舉新的 leader睦番。
    • replica.lag.time.max.ms:If a follower hasn't sent any fetch requests or hasn't consumed up to the leaders log end offset for at least this time, the leader will remove the follower from isr。TYPE: long耍属。DEFAULT: 10000抡砂。
  • 若對數(shù)據(jù)的可靠性要求不是很高大咱,能夠容忍少量數(shù)據(jù)的丟失恬涧,則沒必要等 ISR 中的 follower 全部接收成功注益,因此,kafka 為用戶提供了三種可靠性級別溯捆,用戶根據(jù)對可靠性和延遲的要求進行權(quán)衡丑搔,acks可以選擇以下幾個值:
    • 0producer不等待 broker 的 ack,這一操作提供了一個最低的延遲提揍,broker一接收到還沒有寫入磁盤就已經(jīng)返回啤月,當(dāng) broker 故障時有可能丟失數(shù)據(jù);
    • 1producer等待 broker 的 ack劳跃,partition的 leader 落盤成功后返回才ack谎仲,若在 follower同步成功之前 leader 故障,則將會丟失數(shù)據(jù)刨仑;
    • -1(all):producer等待 broker 的 ack郑诺,partition的 leader 和 ISR 的follower 全部落盤成功后才返回 ack。但若在 follower 同步完成后杉武,broker 發(fā)送 ack 之前辙诞, leader 發(fā)生故障,則會造成數(shù)據(jù)重復(fù)轻抱。
acks=1
acks=-1飞涂,可能造成數(shù)據(jù)重復(fù)寫入
  • 助記:返A(chǔ)CK前,0無落盤祈搜,1一落盤较店,-1全落盤,(落盤:消息存到本地)
  • kafka數(shù)據(jù)一致性
  • LEO(Log End Offset):每個副本的最后一個offset容燕。
  • HW(High Watermark):高水位梁呈,指消費者能見到的最大的offset,ISR 隊列中最小的 LEO缰趋。
  • follower故障:follower 發(fā)生故障后會被臨時踢出 ISR捧杉,待該 follower 恢復(fù)后, follower 會讀取本地磁盤記錄的上次的 HW秘血,并將 log 文件高于 HW 的部分截取掉味抖,從 HW 開始向 leader 進行同步;等該 follower 的 LEO 大于等于該 Partition 的 HW,即 follower 追上 leader 之后缔杉,就可以重新加入 ISR 了墩邀。
  • leader故障:leader 發(fā)生故障之后深胳,會從 ISR 中選出一個新的 leader熔脂,之后為保證多個副本之間的數(shù)據(jù)一致性佩研, 其余的 follower 會先將各自的 log 文件高于 HW 的部分截掉,然后從新的 leader同步數(shù)據(jù)霞揉。
  • 注意:這只能保證副本之間的數(shù)據(jù)一致性旬薯,并不能保證數(shù)據(jù)不丟失或者不重復(fù)。
  • 當(dāng)服務(wù)器的 ACK 級別設(shè)置為-1(all)适秩,可以保證 Producer 到 Server 之間不會丟失數(shù)據(jù)绊序,即At Least Once語義。當(dāng)服務(wù)器的 ACK 級別設(shè)置為 0秽荞,可以保證生產(chǎn)者每條消息只會被發(fā)送一次骤公,即At Most Once語義。
  • At Least Once可以保證數(shù)據(jù)不丟失扬跋,但是不能保證數(shù)據(jù)不重復(fù)阶捆。At Most Once可以保證數(shù)據(jù)不重復(fù),但是不能保證數(shù)據(jù)不丟失钦听。 對于一些非常重要的信息洒试,比如說交易數(shù)據(jù),下游數(shù)據(jù)消費者要求數(shù)據(jù)既不重復(fù)也不丟失彪见,即 Exactly Once語義儡司。
    • 在 0.11 版本以前的 Kafka,對此是無能為力的余指,只能保證數(shù)據(jù)不丟失捕犬,再在下游消費者對數(shù)據(jù)做全局去重。對于多個下游應(yīng)用的情況酵镜,每個都需要單獨做全局去重碉碉,這就對性能造成了很大影響。
    • 0.11 版本的 Kafka淮韭,引入了一項重大特性:冪等性垢粮。所謂的冪等性就是指 Producer 不論向 Server 發(fā)送多少次重復(fù)數(shù)據(jù), Server 端都只會持久化一條靠粪。冪等性結(jié)合At Least Once語義蜡吧,就構(gòu)成了 Kafka 的Exactly Once語義。即:At Least Once + 冪等性 = Exactly Once占键。
  • 要啟用冪等性昔善,只需要將 Producer 的參數(shù)中enable.idempotence設(shè)置為 true 即可。 Kafka的冪等性實現(xiàn)其實就是將原來下游需要做的去重放在了數(shù)據(jù)上游畔乙。開啟冪等性的 Producer 在初始化的時候會被分配一個 PID君仆,發(fā)往同一 Partition 的消息會附帶 Sequence Number。而Broker 端會對<PID, Partition, SeqNumber>做緩存,當(dāng)具有相同主鍵的消息提交時返咱, Broker 只會持久化一條(單分區(qū)單會話)钥庇。但是生產(chǎn)者重啟時PID 就會變化,同時不同的 Partition 也具有不同主鍵咖摹,所以冪等性無法保證跨分區(qū)跨會話的 Exactly Once评姨。
  • 消費者消費方式:consumer 采用 pull(拉) 模式從 broker 中讀取數(shù)據(jù)。
    • push(推)模式很難適應(yīng)消費速率不同的消費者楞艾,因為消息發(fā)送速率是由 broker 決定的参咙。它的目標(biāo)是盡可能以最快的速度傳遞消息,但是這樣很容易造成 consumer 來不及處理消息硫眯,典型的表現(xiàn)就是拒絕服務(wù)以及網(wǎng)絡(luò)擁塞。而 pull 模式則可以根據(jù) consumer 的消費能力以適當(dāng)?shù)乃俾氏M消息择同。
    • pull (拉)模式不足之處是两入,若 kafka 沒有數(shù)據(jù),則消費者可能會陷入循環(huán)中敲才,一直返回空數(shù)據(jù)裹纳。 針對這一點, Kafka 的消費者在消費數(shù)據(jù)時會傳入一個時長參數(shù) timeout紧武,若當(dāng)前沒有數(shù)據(jù)可供消費剃氧,則consumer 會等待一段時間之后再返回,這段時長即為 timeout阻星。
  • 消費者分區(qū)分配策略:一個 consumer group 中有多個 consumer朋鞍,一個 topic 有多個 partition,所以會涉及到 partition 的分配問題妥箕,即確定哪個 partition 由哪個 consumer 來消費滥酥。Kafka 有兩種分配策略:①round-robin循環(huán);②range畦幢。
  • 關(guān)于Roudn Robin重分配策略坎吻,其主要采用的是一種輪詢的方式來分配所有的分區(qū),主要實現(xiàn)的步驟如下:假設(shè)有三個topic:t0宇葱、t1和t2瘦真,這三個topic擁有的分區(qū)數(shù)分別為1、2和3黍瞧,則總共有六個分區(qū)诸尽,這六個分區(qū)分別為:t0-0、t1-0雷逆、t1-1弦讽、t2-0、t2-1和t2-2。又假設(shè)我們有三個consumer:C0往产、C1和C2被碗,它們訂閱的情況為:C0訂閱t0,C1訂閱t0和t1仿村,C2訂閱t0锐朴、t1和t2,則這些分區(qū)的分配步驟如下:
    • 首先將所有的partition和consumer按照字典序進行排序蔼囊,所謂的字典序焚志,就是按照其名稱的字符串順序,那么上面的六個分區(qū)和三個consumer排序之后分別為:t0-0畏鼓、t1-0酱酬、t1-1、t2-0云矫、t2-1膳沽、t2-2和C0、C1让禀、C2挑社。
    • 然后按順序輪詢的方式將這六個分區(qū)分配給三個consumer,若當(dāng)前consumer沒有訂閱當(dāng)前分區(qū)所在的topic巡揍,則輪詢地判斷下一個consumer:
      • 嘗試將t0-0分配給C0痛阻,由于C0訂閱了t0,因而可以分配成功腮敌;
      • 嘗試將t1-0分配給C1阱当,由于C1訂閱了t1,因而可以分配成功缀皱;
      • 嘗試將t1-1分配給C2斗这,由于C2訂閱了t1,因而可以分配成功啤斗;
      • 嘗試將t2-0分配給C0表箭,由于C0沒有訂閱t2,因而會輪詢下一個consumer钮莲;
      • 嘗試將t2-0分配給C1免钻,由于C1沒有訂閱t2,因而會輪詢下一個consumer崔拥;
      • 嘗試將t2-0分配給C2极舔,由于C2訂閱了t2,因而可以分配成功链瓦;
      • 同理由于t2-1和t2-2所在的topic都沒有被C0和C1訂閱拆魏,因而都不會分配成功盯桦,最終都會分配給C2。
      • 從上面的步驟分析可以看出渤刃,輪詢的策略就是簡單地將所有的partition和consumer按照字典序進行排序之后拥峦,然后依次將partition分配給各個consumer,若當(dāng)前的consumer沒有訂閱當(dāng)前的partition卖子,則會輪詢下一個consumer略号,直至最終將所有的分區(qū)都分配完畢。但是從上面的分配結(jié)果可以看出洋闽,輪詢的方式會導(dǎo)致每個consumer所承載的分區(qū)數(shù)量不一致玄柠,從而導(dǎo)致各個consumer壓力不均一。
      • 按照上述的步驟將所有的分區(qū)都分配完畢之后诫舅,最終分區(qū)的訂閱情況如下:
  • Range重分配策略:首先會計算各個consumer將會承載的分區(qū)數(shù)量羽利,然后將指定數(shù)量的分區(qū)分配給該consumer。假設(shè)有兩個consumer:C0和C1骚勘,兩個topic:t0和t1铐伴,這兩個topic分別都有三個分區(qū),總共的分區(qū)有六個:t0-0俏讹、t0-1、t0-2畜吊、t1-0泽疆、t1-1和t1-2,則這些分區(qū)的分配步驟如下:
    • 注意:Range策略是按照topic進行分區(qū)分配的玲献。拿主題t0來分析殉疼,其首先會獲取t0的所有分區(qū):t0-0、t0-1和t0-2捌年,以及所有訂閱了該topic的consumer:C0和C1瓢娜,并且會將這些分區(qū)和consumer按照字典序進行排序;
    • 然后按照平均分配的方式計算每個consumer會得到多少個分區(qū)礼预,若沒有除盡眠砾,則會將多出來的分區(qū)依次計算到前面幾個consumer。假設(shè)有3個分區(qū)和2個consumer托酸,則每個consumer至少會得到1個分區(qū)褒颈,因為3除以2后還余1,所以會將剩余的1個分區(qū)分配給第一個consumer励堡,最終C0從第0個分區(qū)開始分配2個分區(qū)谷丸,而C1從第2個分區(qū)開始分配1個分區(qū);
    • 同理应结,按照上面的步驟依次進行后面topic的分配刨疼,最終上面六個分區(qū)的分配情況如下所示:
  • 由于 consumer 在消費過程中可能會出現(xiàn)斷電宕機等故障, consumer 恢復(fù)后,需要從故障前的位置繼續(xù)消費揩慕,所以 consumer 需要實時記錄自己消費到了哪個 offset亭畜,以便故障恢復(fù)后繼續(xù)消費。
  • Kafka 0.9 版本之前漩绵, consumer 默認將 offset 保存在 Zookeeper 中贱案,從 0.9 版本開始,consumer 默認將 offset 保存在 Kafka 一個內(nèi)置的 topic 中止吐,該 topic 為__consumer_offsets宝踪。
  • 修改配置文件 consumer.propertiesexclude.internal.topics=false
  • 0.11.0.0 之前版本:
bin/kafka-console-consumer.sh --topic __consumer_offsets --zookeeper hadoop102:2181 --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config config/consumer.properties --from-beginning
  • 0.11.0.0 及之后版本:
bin/kafka-console-consumer.sh --topic __consumer_offsets --zookeeper hadoop102:2181 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config config/consumer.properties --from-beginning
  • 消費者組案例:測試同一個消費者組中的消費者碍扔, 同一時刻只能有一個消費者消費一個消息瘩燥。
    • 修改consumer.properties文件中的 group.id 屬性:group.id=zhangsan
    • 創(chuàng)建主題:bigdata3
    • 啟動生產(chǎn)者
    • 分別啟動2個消費者
    • 在生產(chǎn)者窗口輸入消息,觀察兩個消費者窗口不同,會發(fā)現(xiàn)兩個消費者窗口中厉膀,只有一個才會彈出消息。
bin/kafka-topics.sh --zookeeper 192.168.211.147:2181 --create --topic bigdata3 --partitions 2 --replication-factor 1
--------------------------------------------------------
./bin/kafka-console-producer.sh --broker-list 192.168.211.147:9092,192.168.211.141:9092,192.168.211.156:9092 --topic bigdata3
-------------------------------------------------------
./bin/kafka-console-consumer.sh --bootstrap-server 192.168.211.147:9092,192.168.211.141:9092,192.168.211.156:9092 --topic bigdata3 --consumer.config ./config/consumer.properties
  • kafka高效讀寫數(shù)據(jù):
    • 順序?qū)懘疟P:Kafka 的 producer 生產(chǎn)數(shù)據(jù)二拐,要寫入到 log 文件中服鹅,寫的過程是一直追加到文件末端,為順序?qū)憽?官網(wǎng)有數(shù)據(jù)表明百新,同樣的磁盤企软,順序?qū)懩艿?600M/s,而隨機寫只有 100K/s饭望。這與磁盤的機械機構(gòu)有關(guān)仗哨,順序?qū)懼钥欤且驗槠涫∪チ舜罅看蓬^尋址的時間铅辞。
    • 零復(fù)制技術(shù):NIC network interface controller 網(wǎng)絡(luò)接口控制器厌漂。
  • zookeeper 在 kafka 中的作用:Kafka 集群中有一個 broker 會被選舉為controller,負責(zé)管理集群中 broker 的上下線斟珊,所有 topic 的分區(qū)副本分配和 leader 選舉等工作苇倡。controller 的管理工作都是依賴于 zookeeper 的。以下為 partition 的 leader 選舉過程:
  • Kafka 從 0.11 版本開始引入了事務(wù)支持倍宾。事務(wù)可以保證 Kafka 在 Exactly Once 語義的基礎(chǔ)上雏节,生產(chǎn)和消費可以跨分區(qū)和跨會話,要么全部成功高职,要么全部失敗钩乍。
  • Producer 事務(wù):
    • 為了實現(xiàn)跨分區(qū)跨會話的事務(wù),需要引入一個全局唯一的 Transaction ID怔锌,并將 Producer 獲得的PID 和Transaction ID 綁定寥粹。這樣當(dāng)Producer 重啟后就可以通過正在進行的 TransactionID 獲得原來的 PID变过。
    • 為了管理 Transaction, Kafka 引入了一個新的組件 Transaction Coordinator涝涤。 Producer 就是通過和 Transaction Coordinator 交互獲得 Transaction ID 對應(yīng)的任務(wù)狀態(tài)媚狰。 Transaction Coordinator 還負責(zé)將事務(wù)所有寫入 Kafka 的一個內(nèi)部 Topic,這樣即使整個服務(wù)重啟阔拳,由于事務(wù)狀態(tài)得到保存崭孤,進行中的事務(wù)狀態(tài)可以得到恢復(fù),從而繼續(xù)進行糊肠。
  • Consumer 事務(wù):無法保證 Commit 的信息被精確消費一次辨宠,這是由于 Consumer 可以通過 offset 訪問任意信息,而且不同的 Segment File 生命周期不同货裹,同一事務(wù)的消息可能會出現(xiàn)重啟后被刪除的情況嗤形。
  • 消息發(fā)送流程:Kafka 的 Producer 發(fā)送消息采用的是異步發(fā)送的方式。在消息發(fā)送的過程中弧圆,涉及到了兩個線程——main線程Sender線程赋兵,以及一個線程共享變量——RecordAccumulator。 main 線程將消息發(fā)送給 RecordAccumulator搔预, Sender 線程不斷從 RecordAccumulator 中拉取消息發(fā)送到 kafka broker霹期。
  • 相關(guān)參數(shù):①batch.size: 只有數(shù)據(jù)積累到 batch.size 之后, sender 才會發(fā)送數(shù)據(jù)拯田。②linger.ms:如果數(shù)據(jù)遲遲未達到 batch.size经伙, sender 等待 linger.time 之后就會發(fā)送數(shù)據(jù)。
  • kafka應(yīng)用實例:測試異步發(fā)送消息以及帶回調(diào)函數(shù)的生產(chǎn)者勿锅。
  • 導(dǎo)入依賴:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.11.0.0</version>
</dependency>
  • MyProducer.java
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class MyProducer {
    /**
     * KafkaProducer:需要創(chuàng)建一個生產(chǎn)者對象,用來發(fā)送數(shù)據(jù)
     * ProducerConfig:獲取所需的一系列配置參數(shù)
     * ProducerRecord:每條數(shù)據(jù)都要封裝成一個 ProducerRecord 對象
     */
    public static void main(String[] args) {
        //1枣氧、創(chuàng)建kafka生產(chǎn)者的配置信息
        Properties props = new Properties();
        //2溢十、指定連接的kafka集群 broker-list
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.211.147:9092,192.168.211.141:9092,192.168.211.156:9092");
        //3、ACK應(yīng)答級別
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        //4达吞、重試次數(shù)
        props.put(ProducerConfig.RETRIES_CONFIG, 1);
        //5张弛、批次大小:16k
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        //6酪劫、等待時間:1ms
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        //7吞鸭、RecordAccumulator 緩沖區(qū)大小
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        //8、指定序列化的類
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        //9覆糟、創(chuàng)建生產(chǎn)者對象
        Producer<String, String> producer = new KafkaProducer<>(props);
        //10刻剥、發(fā)送數(shù)據(jù)
        /*for (int i = 0; i < 10; i++) {
            producer.send(new ProducerRecord<>("bigdata3", "test-" + i,
                    "test-" + i));
        }*/
        for (int i = 0; i < 10; i++) {
            /**
             * 回調(diào)函數(shù)會在 producer 收到 ack 時調(diào)用,且為異步調(diào)用滩字,
             * 該方法有兩個參數(shù)造虏,分別是 RecordMetadata 和 Exception御吞,
             * 若 Exception 為 null,則說明消息發(fā)送成功漓藕;否則說明消息發(fā)送失敗陶珠。
             * 注意:消息發(fā)送失敗會自動重試,不需要我們在回調(diào)函數(shù)中手動重試享钞。
             */
            producer.send(new ProducerRecord<>("bigdata3",
                    "test-" + i), (metadata, exception) -> {
                if (exception == null) {
                    System.out.println(metadata.partition() + " - " + metadata.offset());
                } else {
                    exception.printStackTrace();
                }
            });
        }
        //11揍诽、關(guān)閉資源
        producer.close();
    }
}
  • kafka應(yīng)用實例:測試自定義分區(qū)器的生產(chǎn)者。
  • MyPartitioner.java
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;
public class MyPartitioner implements Partitioner {
    /**
     * 具體內(nèi)容填寫可參考默認分區(qū)器:org.apache.kafka.clients.producer.internals.DefaultPartitioner
     * 然后Producer配置中注冊使用:
     * props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class);
     */
    @Override
    public void configure(Map<String, ?> configs) {

    }
    //自定義存儲在哪個分區(qū)
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        return 0;
    }
    @Override
    public void close() {
    }
}
  • kafka應(yīng)用實例:測試同步發(fā)送消息的生產(chǎn)者栗竖。
  • 同步發(fā)送的意思就是一條消息發(fā)送之后暑脆,會阻塞當(dāng)前線程,直至其返回 ack划滋。由于 send 方法返回的是一個 Future 對象饵筑,根據(jù) Futrue 對象的特點,只需再調(diào)用 Future 對象的 get 方法即可处坪。
  • SyncProducer.java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class SyncProducer {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.211.147:9092,192.168.211.141:9092,192.168.211.156:9092");
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, 1);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 10; i++) {
            producer.send(new ProducerRecord<>("bigdata3",
                    "test-" + i), (metadata, exception) -> {
                if (exception == null) {
                    System.out.println(metadata.partition() + " - " + metadata.offset());
                } else {
                    exception.printStackTrace();
                }
            }).get(); //調(diào)用future的get方法來實現(xiàn)同步發(fā)送消息
        }
        producer.close();
    }
}
  • kafka應(yīng)用實例:測試消費者消費消息根资。
  • MyConsumer.java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Collections;
import java.util.Properties;
public class MyConsumer {
    /**
     * KafkaConsumer:需要創(chuàng)建一個消費者對象,用來消費數(shù)據(jù)
     * ConsumerConfig:獲取所需的一系列配置參數(shù)
     * ConsuemrRecord:每條數(shù)據(jù)都要封裝成一個 ConsumerRecord 對象
     */
    public static void main(String[] args) {
        //創(chuàng)建消費者的配置信息
        Properties props = new Properties();
        //連接的kafka集群同窘,--bootstrap-server
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.211.147:9092,192.168.211.141:9092,192.168.211.156:9092");
        //設(shè)置消費者組
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "zhangsan");
        //開啟自動提交offset
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        //自動提交offset的時間間隔
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        //重置消費者的offset玄帕,注意需要換組名才能出現(xiàn) --from-beginning 的效果
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");//默認值是 latest
        //key,value的反序列化類
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        //訂閱主題
        consumer.subscribe(Collections.singletonList("bigdata3"));
        while (true) {
            //死循環(huán)獲取所有記錄
            ConsumerRecords<String, String> records = consumer.poll(100); //100ms
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d想邦,key = %s裤纹,value = %s\n", record.offset(), record.key(), record.value());
            }
        }
        //關(guān)閉連接
        //consumer.close();
    }
}
  • 雖然自動提交 offset 十分便利,但由于其是基于時間提交的丧没,開發(fā)人員難以把握offset 提交的時機鹰椒,因此 Kafka 還提供了手動提交 offset 的 API。手動提交 offset 的方法有兩種:①commitSync(同步提交)呕童;②commitAsync(異步提交)
    • 相同點:都會將本次 poll 的一批數(shù)據(jù)最高的偏移量提交漆际;
    • 不同點:commitSync 阻塞當(dāng)前線程,一直到提交成功夺饲,并且失敗會自動重試(由不可控因素導(dǎo)致奸汇,也會出現(xiàn)提交失敗)往声;而 commitAsync 則沒有失敗重試機制擂找,故有可能提交失敗。
  • SyncCommitOffset.java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Collections;
import java.util.Properties;
public class SyncCommitOffset {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.211.147:9092,192.168.211.141:9092,192.168.211.156:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "zhangsan");
        //關(guān)閉自動提交offset
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");//默認值是 latest
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("bigdata3"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100); //100ms
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d浩销,key = %s贯涎,value = %s\n", record.offset(), record.key(), record.value());
            }
            //同步提交,當(dāng)前線程會阻塞直到 offset 提交成功
            consumer.commitSync();
        }
        //關(guān)閉連接
        //consumer.close();
    }
}
  • 雖然同步提交 offset 更可靠一些撼嗓,但是由于其會阻塞當(dāng)前線程直到提交成功柬采,吞吐量會收到很大的影響欢唾,因此更多情況下,會選用異步提交 offset 的方式粉捻。
  • AsyncCommitOffset.java
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
public class AsyncCommitOffset {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.211.147:9092,192.168.211.141:9092,192.168.211.156:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "zhangsan");
        //關(guān)閉自動提交offset
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");//默認值是 latest
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("bigdata3"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100); //100ms
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d礁遣,key = %s,value = %s\n", record.offset(), record.key(), record.value());
            }
            // 異步提交
            consumer.commitAsync((offsets, exception) -> {
                if (exception != null) {
                    System.err.println("Commit failed for" + offsets);
                }
            });
        }
        //關(guān)閉連接
        //consumer.close();
    }
}
  • 無論是同步提交還是異步提交 offset肩刃,都有可能會造成數(shù)據(jù)的漏消費或者重復(fù)消費祟霍。先提交 offset 后消費,有可能造成數(shù)據(jù)的漏消費盈包;而先消費后提交 offset沸呐,有可能會造成數(shù)據(jù)的重復(fù)消費。
  • kafka 0.9 版本之前呢燥,offset 存儲在 zookeeper中崭添;0.9 版本及之后,默認將 offset 存儲在 kafka的一個內(nèi)置的 topic 中叛氨。除此之外呼渣, Kafka 還可以選擇自定義存儲 offset(需要借助 ConsumerRebalanceListener接口)。offset 的維護相當(dāng)繁瑣寞埠, 因為需要考慮到消費者的 Rebalance屁置。(當(dāng)有新的消費者加入消費者組、已有的消費者推出消費者組或者所訂閱的主題的分區(qū)發(fā)生變化仁连,就會觸發(fā)到分區(qū)的重新分配蓝角,重新分配的過程叫做 Rebalance。)
  • 消費者發(fā)生 Rebalance 之后饭冬,每個消費者消費的分區(qū)就會發(fā)生變化使鹅。因此消費者要首先獲取到自己被重新分配到的分區(qū),并且定位到每個分區(qū)最近提交的 offset 位置繼續(xù)消費昌抠。
  • CustomSaveOffset.java
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.util.*;
public class CustomSaveOffset {
    private static Map<TopicPartition, Long> currentOffset = new HashMap<>();
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.211.147:9092,192.168.211.141:9092,192.168.211.156:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "zhangsan");
        //關(guān)閉自動提交offset
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");//默認值是 latest
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("bigdata3"),
                new ConsumerRebalanceListener() {
                    // 該方法會在 Rebalance 之前調(diào)用
                    @Override
                    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                        commitOffset(currentOffset);
                    }

                    // 該方法會在 Rebalance 之后調(diào)用
                    @Override
                    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                        currentOffset.clear();
                        for (TopicPartition partition : partitions) {
                            //定位到最近提交的 offset 位置繼續(xù)消費
                            consumer.seek(partition, getOffset(partition));
                        }
                    }
                });
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100); //100ms
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d并徘,key = %s,value = %s\n", record.offset(), record.key(), record.value());
            }
            //異步提交
            commitOffset(currentOffset);
        }
        //關(guān)閉連接
        //consumer.close();
    }
    // 獲取某分區(qū)的最新 offset
    private static long getOffset(TopicPartition partition) {
        return 0;
    }
    // 提交該消費者所有分區(qū)的 offset(可將offset存入MySQL數(shù)據(jù)庫中)
    private static void commitOffset(Map<TopicPartition, Long> currentOffset) {
    }
}
  • 攔截器原理:Producer 攔截器(interceptor)是在 kafka 0.10 版本被引入的扰魂,主要用于實現(xiàn) clients 端的定制化控制邏輯。對于 producer 而言蕴茴, interceptor 使得用戶在消息發(fā)送前以及 producer 回調(diào)邏輯前有機會對消息做一些定制化需求劝评,比如修改消息等。同時倦淀,producer 允許用戶指定多個 interceptor按序作用于同一條消息從而形成一個攔截鏈(interceptor chain)蒋畜。Intercetpor 的實現(xiàn)接口是org.apache.kafka.clients.producer.ProducerInterceptor,其定義的方法包括:
    • configure(configs):獲取配置信息和初始化數(shù)據(jù)時調(diào)用撞叽。
    • onSend(ProducerRecord):該方法封裝進 KafkaProducer.send 方法中姻成,即它運行在用戶主線程中插龄。 Producer 確保在消息被序列化以及計算分區(qū)前調(diào)用該方法。 用戶可以在該方法中對消息做任何操作科展,但最好保證不要修改消息所屬的 topic 和分區(qū)均牢, 否則會影響目標(biāo)分區(qū)的計算。
    • onAcknowledgement(RecordMetadata, Exception):該方法會在消息從 RecordAccumulator 成功發(fā)送到 Kafka Broker 之后才睹,或者在發(fā)送過程中失敗時調(diào)用徘跪,并且通常在 producer 回調(diào)邏輯觸發(fā)之前。 onAcknowledgement 運行在producer 的 IO 線程中琅攘,因此不要在該方法中放入很多的邏輯垮庐,否則會拖慢 producer 的消息發(fā)送效率。
    • close():關(guān)閉 interceptor坞琴,主要用于執(zhí)行一些資源清理工作哨查。
  • 如上所述:interceptor 可能被運行在多個線程中,因此在具體實現(xiàn)時用戶需要自行確保線程安全剧辐。另外寒亥,倘若指定了多個 interceptor,則 producer 將按照指定順序調(diào)用它們浙于,并僅僅是捕獲每個 interceptor 可能拋出的異常記錄到錯誤日志中而非在向上傳遞护盈。
  • kafka應(yīng)用實例:實現(xiàn)一個簡單的雙 interceptor 組成的攔截鏈。
  • 需求:第一個 interceptor 會在消息發(fā)送前將時間戳信息加到消息 value 的最前部羞酗;第二個 interceptor 會在消息發(fā)送后更新成功發(fā)送消息數(shù)或失敗發(fā)送消息數(shù)腐宋。
  • TimeInterceptor.java
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Map;
//時間戳攔截器
public class TimeInterceptor implements ProducerInterceptor<String, String> {
    @Override
    public void configure(Map<String, ?> configs) {
    }
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        //創(chuàng)建一個新的 record,把時間戳寫入消息體的最前部
        return new ProducerRecord<>(record.topic(), record.partition(), record.timestamp(), record.key(),
                "TimeInterceptor: " + System.currentTimeMillis() + "," + record.value());
    }
    @Override
    public void close() {
    }
    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {

    }
}
  • CounterInterceptor.java
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Map;
//統(tǒng)計發(fā)送消息成功和發(fā)送失敗消息數(shù),并在 producer 關(guān)閉時打印這兩個計數(shù)器
public class CounterInterceptor implements ProducerInterceptor<String, String> {
    private int errorCounter = 0;
    private int successCounter = 0;
    @Override
    public void configure(Map<String, ?> configs) {
    }
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        return record;
    }
    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        //統(tǒng)計成功和失敗的次數(shù)
        if (exception == null) {
            successCounter++;
        } else {
            errorCounter++;
        }
    }
    @Override
    public void close() {
        //保存結(jié)果
        System.out.println("Successful sent: " + successCounter);
        System.out.println("Failed sent: " + errorCounter);
    }
}
  • InterceptorProducer.java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
public class InterceptorProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.211.147:9092,192.168.211.141:9092,192.168.211.156:9092");
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, 1);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        //構(gòu)建攔截鏈
        List<String> interceptors = new ArrayList<>();
        interceptors.add("com.zzw.interceptor.TimeInterceptor");
        interceptors.add("com.zzw.interceptor.CounterInterceptor");
        props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 10; i++) {
            ProducerRecord<String, String> record = new ProducerRecord<>("bigdata3", "zhangsan", "message-" + i);
            producer.send(record);
        }
        //一定要關(guān)閉 producer柜候,這樣才會調(diào)用 interceptor 的 close 方法
        producer.close();
    }
}
  • Kafka Eagle是開源可視化和管理軟件锦溪。它允許您查詢、可視化卫枝、提醒和探索您的指標(biāo),無論它們存儲在哪里讹挎。簡單地說校赤,它為您提供了將kafka集群數(shù)據(jù)轉(zhuǎn)換為漂亮的圖形和可視化的工具。
  • Kafka Eagle官方主頁筒溃、 Kafka Eagle下載頁面马篮、Kafka Eagle官方文檔
  • 安裝并配置Kafka Eagle的環(huán)境變量:
tar -zxvf kafka-eagle-bin-2.0.3.tar.gz
cd kafka-eagle-bin-2.0.3/
tar -zxvf kafka-eagle-web-2.0.3-bin.tar.gz -C /opt/eagle
-----------------------------------------------------------------
sudo vim /etc/profile
-----------------------------------------------------------------
export JAVA_HOME=/opt/jdk1.8
export KE_HOME=/opt/eagle
export PATH=$PATH:$KE_HOME/bin:$JAVA_HOME/bin
-----------------------------------------------------------------
source /etc/profile
  • 進入kafka-eagle的conf目錄下修改配置文件system-config.properties
######################################
# multi zookeeper & kafka cluster list (zookeeper地址)
######################################
kafka.eagle.zk.cluster.alias=cluster1
cluster1.zk.list=192.168.211.147:2181,192.168.211.147:2182,192.168.211.147:2183

######################################
# zookeeper enable acl
######################################
cluster1.zk.acl.enable=false
cluster1.zk.acl.schema=digest
cluster1.zk.acl.username=test
cluster1.zk.acl.password=test123

######################################
# broker size online list
######################################
cluster1.kafka.eagle.broker.size=20

######################################
# zk client thread limit  (zk線程數(shù))
######################################
kafka.zk.limit.size=25

######################################
# kafka eagle webui port  (kafka eagle的端口)
######################################
kafka.eagle.webui.port=8048

######################################
# kafka jmx acl and ssl authenticate
######################################
cluster1.kafka.eagle.jmx.acl=false
cluster1.kafka.eagle.jmx.user=keadmin
cluster1.kafka.eagle.jmx.password=keadmin123
cluster1.kafka.eagle.jmx.ssl=false
cluster1.kafka.eagle.jmx.truststore.location=/Users/dengjie/workspace/ssl/certificates/kafka.truststore
cluster1.kafka.eagle.jmx.truststore.password=ke123456

######################################
# kafka offset storage (kafka offset保存的位置)
######################################
cluster1.kafka.eagle.offset.storage=kafka

######################################
# kafka metrics, 15 days by default (開啟圖表)
######################################
kafka.eagle.metrics.charts=true
kafka.eagle.metrics.retain=15

######################################
# kafka sql topic records max
######################################
kafka.eagle.sql.topic.records.max=5000

######################################
# delete kafka topic token
######################################
kafka.eagle.topic.token=keadmin

######################################
# kafka sasl authenticate
######################################
cluster1.kafka.eagle.sasl.enable=false
cluster1.kafka.eagle.sasl.protocol=SASL_PLAINTEXT
cluster1.kafka.eagle.sasl.mechanism=SCRAM-SHA-256
cluster1.kafka.eagle.sasl.jaas.co![20210109181652.png](https://upload-images.jianshu.io/upload_images/7810329-5a7c09b3311a2647.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
nfig=org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka" password="kafka-eagle";
cluster1.kafka.eagle.sasl.client.id=
cluster1.kafka.eagle.blacklist.topics=
cluster1.kafka.eagle.sasl.cgroup.enable=false
cluster1.kafka.eagle.sasl.cgroup.topics=

######################################
# kafka mysql jdbc driver address (不用預(yù)先創(chuàng)建數(shù)據(jù)庫)
######################################
kafka.eagle.driver=com.mysql.jdbc.Driver
kafka.eagle.url=jdbc:mysql://127.0.0.1:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
kafka.eagle.username=root
kafka.eagle.password=123456
  • 打開Kafka broker JMX:Kafka Eagle獲取監(jiān)控數(shù)據(jù)是通過JMX(Java Managent ExtenSion)來實現(xiàn)的,所以需要打開Kafka broker 的 JMX怜奖,可以直接修改kafka-server-start.sh文件:vi $KAFKA_HOME/bin/kafka-server-start.sh
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
fi
---------------------------------------------------------------------------
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    #export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
    export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"
    export JMX_PORT="9999"
fi
  • 接下來浑测,按順序進行以下操作:
    • 運行zookeeper集群;
    • 運行kafka集群之前歪玲,需要設(shè)置JMX_PORT迁央,否則Kafka Eagle 后臺提示連接失斨澜场;
    • 啟動Kafka Eagle服務(wù)器岖圈;
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
nohup bin/kafka-server-start.sh config/server.properties 1>/dev/null 2>&1 &
./ke.sh start
啟動成功
登錄頁面
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末讹语,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子幅狮,更是在濱河造成了極大的恐慌募强,老刑警劉巖,帶你破解...
    沈念sama閱讀 206,839評論 6 482
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件崇摄,死亡現(xiàn)場離奇詭異擎值,居然都是意外死亡,警方通過查閱死者的電腦和手機逐抑,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,543評論 2 382
  • 文/潘曉璐 我一進店門鸠儿,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人厕氨,你說我怎么就攤上這事进每。” “怎么了命斧?”我有些...
    開封第一講書人閱讀 153,116評論 0 344
  • 文/不壞的土叔 我叫張陵田晚,是天一觀的道長。 經(jīng)常有香客問我国葬,道長贤徒,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 55,371評論 1 279
  • 正文 為了忘掉前任汇四,我火速辦了婚禮接奈,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘通孽。我一直安慰自己序宦,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 64,384評論 5 374
  • 文/花漫 我一把揭開白布背苦。 她就那樣靜靜地躺著互捌,像睡著了一般。 火紅的嫁衣襯著肌膚如雪行剂。 梳的紋絲不亂的頭發(fā)上疫剃,一...
    開封第一講書人閱讀 49,111評論 1 285
  • 那天,我揣著相機與錄音硼讽,去河邊找鬼。 笑死牲阁,一個胖子當(dāng)著我的面吹牛固阁,可吹牛的內(nèi)容都是我干的壤躲。 我是一名探鬼主播,決...
    沈念sama閱讀 38,416評論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼备燃,長吁一口氣:“原來是場噩夢啊……” “哼碉克!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起并齐,我...
    開封第一講書人閱讀 37,053評論 0 259
  • 序言:老撾萬榮一對情侶失蹤漏麦,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后况褪,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體撕贞,經(jīng)...
    沈念sama閱讀 43,558評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,007評論 2 325
  • 正文 我和宋清朗相戀三年测垛,在試婚紗的時候發(fā)現(xiàn)自己被綠了捏膨。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,117評論 1 334
  • 序言:一個原本活蹦亂跳的男人離奇死亡食侮,死狀恐怖号涯,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情锯七,我是刑警寧澤链快,帶...
    沈念sama閱讀 33,756評論 4 324
  • 正文 年R本政府宣布,位于F島的核電站眉尸,受9級特大地震影響域蜗,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜效五,卻給世界環(huán)境...
    茶點故事閱讀 39,324評論 3 307
  • 文/蒙蒙 一地消、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧畏妖,春花似錦脉执、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,315評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至迅细,卻和暖如春巫橄,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背茵典。 一陣腳步聲響...
    開封第一講書人閱讀 31,539評論 1 262
  • 我被黑心中介騙來泰國打工湘换, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人。 一個月前我還...
    沈念sama閱讀 45,578評論 2 355
  • 正文 我出身青樓彩倚,卻偏偏與公主長得像筹我,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子帆离,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 42,877評論 2 345

推薦閱讀更多精彩內(nèi)容