一破喻、概述
Kafka是最初由Linkedin公司開發(fā)盖灸,是一個(gè)分布式、支持分區(qū)的(partition)届囚、多副本的(replica)有梆,基于zookeeper協(xié)調(diào)的分布式消息系統(tǒng),它的最大的特性就是可以實(shí)時(shí)的處理大量數(shù)據(jù)以滿足各種需求場(chǎng)景:比如基于hadoop的批處理系統(tǒng)意系、低延遲的實(shí)時(shí)系統(tǒng)泥耀、Storm/Spark流式處理引擎,web/nginx日志蛔添、訪問日志痰催,消息服務(wù)等等,用scala語言編寫迎瞧, Linkedin于2010年貢獻(xiàn)給了Apache基金會(huì)并成為頂級(jí)開源項(xiàng)目夸溶。
Kafka的使用場(chǎng)景
- 日志收集:一個(gè)公司可以用Kafka收集各種服務(wù)的log,通過kafka以統(tǒng)一接口服務(wù)的方式開放給各種 consumer凶硅,例如hadoop缝裁、Hbase、Solr等足绅。
- 消息系統(tǒng):解耦和生產(chǎn)者和消費(fèi)者捷绑、緩存消息等。
- 用戶活動(dòng)跟蹤:Kafka經(jīng)常被用來記錄web用戶或者app用戶的各種活動(dòng)氢妈,如瀏覽網(wǎng)頁粹污、搜索、點(diǎn)擊等活動(dòng)允懂,這些活動(dòng)信息被各個(gè)服務(wù)器發(fā)布到kafka的topic中厕怜,然后訂閱者通過訂閱這些topic來做實(shí)時(shí)的監(jiān)控分析衩匣,或者裝載到 hadoop蕾总、數(shù)據(jù)倉庫中做離線分析和挖掘。
- 運(yùn)營指標(biāo):Kafka也經(jīng)常用來記錄運(yùn)營監(jiān)控?cái)?shù)據(jù)琅捏。包括收集各種分布式應(yīng)用的數(shù)據(jù)生百,生產(chǎn)各種操作的集中反饋,比如報(bào)警和報(bào)告柄延。
Kafka基本概念
kafka是一個(gè)分布式的蚀浆,分區(qū)的消息(官方稱之為commit log)服務(wù)缀程。它提供一個(gè)消息系統(tǒng)應(yīng)該具備的功能,但是確有著獨(dú) 特的設(shè)計(jì)市俊⊙畲眨可以這樣來說,Kafka借鑒了JMS規(guī)范的思想摆昧,卻并沒有完全遵循JMS規(guī)范撩满。
名稱 | 解釋 |
---|---|
Broker | 消息中間件處理節(jié)點(diǎn),一個(gè)Kafka節(jié)點(diǎn)就是一個(gè)broker绅你,一個(gè)或者多個(gè)Broker可以組成一個(gè)Kafka集群 |
Topic | Kafka根據(jù)topic對(duì)消息進(jìn)行歸類伺帘,發(fā)布到Kafka集群的每條消息都需要指定一個(gè)topic |
Producer | 消息生產(chǎn)者,向Broker發(fā)送消息的客戶端 |
Consumer | 消息消費(fèi)者忌锯,從Broker讀取消息的客戶端 |
ConsumerGroup | 每個(gè)Consumer屬于一個(gè)特定的Consumer Group伪嫁,一條消息可以被多個(gè)不同的Consumer Group消費(fèi),但是一個(gè)Consumer Group中只能有一個(gè)Consumer能夠消費(fèi)該消息 |
Partition | 物理上的概念偶垮,一個(gè)topic可以分為多個(gè)partition张咳,每個(gè)partition內(nèi)部消息是有序的 |
因此,從一個(gè)較高的層面上來看似舵,producer通過網(wǎng)絡(luò)發(fā)送消息到Kafka集群晶伦,然后consumer來進(jìn)行消費(fèi),如下圖:
服務(wù)端(brokers)和客戶端(producer啄枕、consumer)之間通信通過TCP協(xié)議來完成婚陪。
二、kafka基本使用
由于Kafka是用Scala語言開發(fā)的频祝,運(yùn)行在JVM上泌参,因此在安裝Kafka之前需要先安裝JDK。kafka依賴zookeeper常空,所以需要先安裝zookeeper沽一。
修改配置文件config/server.properties:
#broker.id屬性在kafka集群中必須要是唯一
broker.id=0
#kafka部署的機(jī)器ip和提供服務(wù)的端口號(hào)
listeners=PLAINTEXT://192.168.65.60:9092
#kafka的消息存儲(chǔ)文件
log.dir=/usr/local/data/kafka-logs
#kafka連接zookeeper的地址
zookeeper.connect=192.168.65.60:2181
Property | Default | Description |
---|---|---|
broker.id | 0 | 每個(gè)broker都可以用一個(gè)唯一的非負(fù)整數(shù)id進(jìn)行標(biāo)識(shí);這個(gè)id可以作為broker的“名字”,你可以選擇任意你喜歡的數(shù)字作為id吏祸,只要id是唯一的即可横殴。 |
log.dirs | /tmp/kafka-logs | kafka存放數(shù)據(jù)的路徑。這個(gè)路徑并不是唯一的蝗蛙,可以是多個(gè),路徑之間只需要使用逗號(hào)分隔即可醉鳖;每當(dāng)創(chuàng)建新partition時(shí)捡硅,都會(huì)選擇在包含最少partitions的路徑下進(jìn)行。 |
listeners | PLAINTEXT://192.168.65.60:9092 | server接受客戶端連接的端口盗棵,ip配置kafka本機(jī)ip即可 |
zookeeper.connect | localhost:2181 | zooKeeper連接字符串的格式為:hostname:port壮韭,此處hostname和port分別是ZooKeeper集群中某個(gè)節(jié)點(diǎn)的host和port北发;zookeeper如果是集群,連接方式為 hostname1:port1, hostname2:port2, hostname3:port3log.retention.hours |
log.retention.hours | 168 | 每個(gè)日志文件刪除之前保存的時(shí)間喷屋。默認(rèn)數(shù)據(jù)保存時(shí)間對(duì)所有topic都一樣琳拨。num.partitions |
num.partitions | 1 | 創(chuàng)建topic的默認(rèn)分區(qū)數(shù)default.replication.factor |
default.replication.factor | 1 | 自動(dòng)創(chuàng)建topic的默認(rèn)副本數(shù)量,建議設(shè)置為大于等于2min.insync.replicas |
min.insync.replicas | 1 | 當(dāng)producer設(shè)置acks為-1時(shí)屯曹,min.insync.replicas指定replicas的最小數(shù)目(必須確認(rèn)每一個(gè)repica的寫數(shù)據(jù)都是成功的)从绘,如果這個(gè)數(shù)目沒有達(dá)到,producer發(fā)送消息會(huì)產(chǎn)生異常delete.topic.enable |
delete.topic.enablefalse | false | 是否允許刪除主題 |
創(chuàng)建主題
現(xiàn)在我們來創(chuàng)建一個(gè)名字為“test”的Topic是牢,這個(gè)topic只有一個(gè)partition僵井,并且備份因子也設(shè)置為1.
bin/kafka-topics.sh --create --zookeeper 192.168.65.60:2181 --replication-factor 1 --partitions 1 --topic test
現(xiàn)在我們可以通過以下命令來查看kafka中目前存在的topic
bin/kafka-topics.sh --list --zookeeper 192.168.65.60:2181
除了我們通過手工的方式創(chuàng)建Topic,當(dāng)producer發(fā)布一個(gè)消息到某個(gè)指定的Topic驳棱,這個(gè)Topic如果不存在批什,就自動(dòng)創(chuàng)建。
刪除主題
bin/kafka-topics.sh --delete --topic test --zookeeper 192.168.65.60:2181
發(fā)送消息
kafka自帶了一個(gè)producer命令客戶端社搅,可以從本地文件中讀取內(nèi)容驻债,或者我們也可以以命令行中直接輸入內(nèi)容,并將這些內(nèi)容以消息的形式發(fā)送到kafka集群中形葬。在默認(rèn)情況下合呐,每一個(gè)行會(huì)被當(dāng)做成一個(gè)獨(dú)立的消息。
首先我們要運(yùn)行發(fā)布消息的腳本笙以,然后在命令中輸入要發(fā)送的消息的內(nèi)容:
bin/kafka-console-producer.sh --broker-list 192.168.65.60:9092 --topic test
>this is a msg
>this is a another msg
消費(fèi)消息
對(duì)于consumer淌实,kafka同樣也攜帶了一個(gè)命令行客戶端,會(huì)將獲取到內(nèi)容在命令中進(jìn)行輸出猖腕,默認(rèn)是消費(fèi)最新的消息:
bin/kafka-console-consumer.sh --bootstrap-server 192.168.65.60:9092 --topic test
如果想要消費(fèi)之前的消息可以通過--from-beginning參數(shù)指定拆祈,如下命令:
bin/kafka-console-consumer.sh --bootstrap-server 192.168.65.60:9092 --from-beginning --topic test
如果你是通過不同的終端窗口來運(yùn)行以上的命令,你將會(huì)看到在producer終端輸入的內(nèi)容倘感,很快就會(huì)在consumer的終端窗口上顯示出來放坏。
以上所有的命令都有一些附加的選項(xiàng);當(dāng)我們不攜帶任何參數(shù)運(yùn)行命令的時(shí)候老玛,將會(huì)顯示出這個(gè)命令的詳細(xì)用法淤年。
消費(fèi)多主題
bin/kafka-console-consumer.sh --bootstrap-server 192.168.65.60:9092 --whitelist "test|test-2"
單播消費(fèi)
一條消息只能被某一個(gè)消費(fèi)者消費(fèi)的模式,類似queue模式蜡豹,只需讓所有消費(fèi)者在同一個(gè)消費(fèi)組里即可麸粮,分別在兩個(gè)客戶端執(zhí)行如下消費(fèi)命令,然后往主題里發(fā)送消息余素,結(jié)果只有一個(gè)客戶端能收到消息
bin/kafka-console-consumer.sh --bootstrap-server 192.168.65.60:9092 --consumer-property group.id=testGroup --topic test
多播消費(fèi)
一條消息能被多個(gè)消費(fèi)者消費(fèi)的模式豹休,類似publish-subscribe模式費(fèi)炊昆,針對(duì)Kafka同一條消息只能被同一個(gè)消費(fèi)組下的某一個(gè)消費(fèi)者消費(fèi)的特性桨吊,要實(shí)現(xiàn)多播只要保證這些消費(fèi)者屬于不同的消費(fèi)組即可威根。我們?cè)僭黾右粋€(gè)消費(fèi)者,該消費(fèi)者屬于testGroup-2消費(fèi)組视乐,結(jié)果兩個(gè)客戶端都能收到消息
bin/kafka-console-consumer.sh --bootstrap-server 192.168.65.60:9092 --consumer-property group.id=testGroup-2 --topic test
- 查看消費(fèi)組名
bin/kafka-consumer-groups.sh --bootstrap-server 192.168.65.60:9092 --list
- 查看消費(fèi)組的消費(fèi)偏移量
bin/kafka-consumer-groups.sh --bootstrap-server 192.168.65.60:9092 --describe --group testGroup
current-offset:當(dāng)前消費(fèi)組的已消費(fèi)偏移量
log-end-offset:主題對(duì)應(yīng)分區(qū)消息的結(jié)束偏移量(HW)
lag:當(dāng)前消費(fèi)組未消費(fèi)的消息數(shù)
三洛搀、主題Topic和消息日志Log
可以理解Topic是一個(gè)類別的名稱,同類消息發(fā)送到同一個(gè)Topic下面佑淀。對(duì)于每一個(gè)Topic留美,下面可以有多個(gè)分區(qū)(Partition)日志文件:
Partition是一個(gè)有序的message序列,這些message按順序添加到一個(gè)叫做commit log的文件中伸刃。每個(gè)partition中的消息都有一個(gè)唯一的編號(hào)谎砾,稱之為offset,用來唯一標(biāo)示某個(gè)分區(qū)中的message捧颅。
每個(gè)partition景图,都對(duì)應(yīng)一個(gè)commit log文件。一個(gè)partition中的message的offset都是唯一的碉哑,但是不同的partition中的message的offset可能是相同的挚币。
kafka一般不會(huì)刪除消息,不管這些消息有沒有被消費(fèi)扣典。只會(huì)根據(jù)配置的日志保留時(shí)間(log.retention.hours)確認(rèn)消息多久被刪除妆毕,默認(rèn)保留最近一周的日志消息。kafka的性能與保留的消息數(shù)據(jù)量大小沒有關(guān)系贮尖,因此保存大量的數(shù)據(jù)消息日志信息不會(huì)有什么影響笛粘。
每個(gè)consumer是基于自己在commit log中的消費(fèi)進(jìn)度(offset)來進(jìn)行工作的。在kafka中湿硝,消費(fèi)offset由consumer自己來維護(hù)闰蛔;一般情況下我們按照順序逐條消費(fèi)commit log中的消息,當(dāng)然我可以通過指定offset來重復(fù)消費(fèi)某些消息图柏,或者跳過某些消息序六。
這意味kafka中的consumer對(duì)集群的影響是非常小的,添加一個(gè)或者減少一個(gè)consumer蚤吹,對(duì)于集群或者其他consumer來說例诀,都是沒有影響的,因?yàn)槊總€(gè)consumer維護(hù)各自的消費(fèi)offset裁着。
創(chuàng)建多個(gè)分區(qū)的主題:
bin/kafka-topics.sh --create --zookeeper 192.168.65.60:2181 --replication-factor 1 --partitions 2 --topic test1
查看下topic的情況
bin/kafka-topics.sh --describe --zookeeper 192.168.65.60:2181 --topic test1
以下是輸出內(nèi)容的解釋繁涂,第一行是所有分區(qū)的概要信息,之后的每一行表示每一個(gè)partition的信息二驰。
- leader節(jié)點(diǎn)負(fù)責(zé)給定partition的所有讀寫請(qǐng)求扔罪。
- replicas 表示某個(gè)partition在哪幾個(gè)broker上存在備份。不管這個(gè)幾點(diǎn)是不是”leader“桶雀,甚至這個(gè)節(jié)點(diǎn)掛了矿酵,也會(huì)列出唬复。
- isr 是replicas的一個(gè)子集,它只列出當(dāng)前還存活著的全肮,并且已同步備份了該partition的節(jié)點(diǎn)敞咧。
我們可以運(yùn)行相同的命令查看之前創(chuàng)建的名稱為”test“的topic
為什么要對(duì)Topic下數(shù)據(jù)進(jìn)行分區(qū)存儲(chǔ)?
- commit log文件會(huì)受到所在機(jī)器的文件系統(tǒng)大小的限制辜腺,分區(qū)之后可以將不同的分區(qū)放在不同的機(jī)器上休建,相當(dāng)于對(duì)數(shù)據(jù)做了分布式存儲(chǔ),理論上一個(gè)topic可以處理任意數(shù)量的數(shù)據(jù)评疗。
- 為了提高并行度测砂。
四、kafka集群
對(duì)于kafka來說百匆,一個(gè)單獨(dú)的broker意味著kafka集群中只有一個(gè)節(jié)點(diǎn)邑彪。要想增加kafka集群中的節(jié)點(diǎn)數(shù)量,只需要多啟動(dòng)幾個(gè)broker實(shí)例即可胧华。
配置文件的需要修改的內(nèi)容分別如下:
config/server-1.properties:
#broker.id屬性在kafka集群中必須要是唯一
broker.id=1
#kafka部署的機(jī)器ip和提供服務(wù)的端口號(hào)
listeners=PLAINTEXT://192.168.65.60:9093
log.dir=/usr/local/data/kafka-logs-1
#kafka連接zookeeper的地址寄症,要把多個(gè)kafka實(shí)例組成集群,對(duì)應(yīng)連接的zookeeper必須相同
zookeeper.connect=192.168.65.60:2181
config/server-2.properties:
broker.id=2
listeners=PLAINTEXT://192.168.65.60:9094
log.dir=/usr/local/data/kafka-logs-2
zookeeper.connect=192.168.65.60:2181
kafka將很多集群關(guān)鍵信息記錄在zookeeper里矩动,保證自己的無狀態(tài)有巧,從而在水平擴(kuò)容時(shí)非常方便。
五悲没、集群消費(fèi)
log的partitions分布在kafka集群中不同的broker上篮迎,每個(gè)broker可以請(qǐng)求備份其他broker上partition上的數(shù)據(jù)。kafka集群支持配置一個(gè)partition備份的數(shù)量示姿。
針對(duì)每個(gè)partition甜橱,都有一個(gè)broker起到“l(fā)eader”的作用,0個(gè)或多個(gè)其他的broker作為“follwers”的作用栈戳。leader處理所有的針對(duì)這個(gè)partition的讀寫請(qǐng)求岂傲,而followers被動(dòng)復(fù)制leader的結(jié)果,不提供讀寫(主要是為了保證多副本數(shù)據(jù)與消費(fèi)的一致性)子檀。如果這個(gè)leader失效了镊掖,其中的一個(gè)follower將會(huì)自動(dòng)的變成新的leader。
Producers
生產(chǎn)者將消息發(fā)送到topic中去褂痰,同時(shí)負(fù)責(zé)選擇將message發(fā)送到topic的哪一個(gè)partition中亩进。通過round-robin做簡單的負(fù)載均衡。也可以根據(jù)消息中的某一個(gè)關(guān)鍵字來進(jìn)行區(qū)分缩歪。通常第二種方式使用的更多归薛。
Consumers
傳統(tǒng)的消息傳遞模式有2種:隊(duì)列( queue) 和(publish-subscribe)
- queue模式:多個(gè)consumer從服務(wù)器中讀取數(shù)據(jù),消息只會(huì)到達(dá)一個(gè)consumer。
- publish-subscribe模式:消息會(huì)被廣播給所有的consumer主籍。
Kafka基于這2種模式提供了一種consumer的抽象概念:consumer group习贫。
- queue模式:所有的consumer都位于同一個(gè)consumer group 下。
- publish-subscribe模式:所有的consumer都有著自己唯一的consumer group崇猫。
上圖說明:由2個(gè)broker組成的kafka集群沈条,某個(gè)主題總共有4個(gè)partition(P0-P3)需忿,分別位于不同的broker上诅炉。這個(gè)集群由2個(gè)Consumer Group消費(fèi), A有2個(gè)consumer instances 屋厘,B有4個(gè)涕烧。
通常一個(gè)topic會(huì)有幾個(gè)consumer group,每個(gè)consumer group都是一個(gè)邏輯上的訂閱者( logical subscriber )汗洒。每個(gè)consumer group由多個(gè)consumer instance組成议纯,從而達(dá)到可擴(kuò)展和容災(zāi)的功能。
消費(fèi)順序
一個(gè)partition同一個(gè)時(shí)刻在一個(gè)consumer group中只能有一個(gè)consumer instance在消費(fèi)溢谤,從而保證消費(fèi)順序瞻凤。
consumer group中的consumer instance的數(shù)量不能比一個(gè)Topic中的partition的數(shù)量多,否則世杀,多出來的consumer消費(fèi)不到消息阀参。
Kafka只在partition的范圍內(nèi)保證消息消費(fèi)的局部順序性,不能在同一個(gè)topic中的多個(gè)partition中保證總的消費(fèi)順序性瞻坝。
如果有在總體上保證消費(fèi)順序的需求蛛壳,那么我們可以通過將topic的partition數(shù)量設(shè)置為1,將consumer group中的consumer instance數(shù)量也設(shè)置為1所刀,但是這樣會(huì)影響性能衙荐,所以kafka的順序消費(fèi)很少用。
六浮创、Java客戶端訪問Kafka
引入maven依賴
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.1</version>
</dependency>
package com.tuling.kafka.kafkaDemo;
import com.alibaba.fastjson.JSON;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
public class MsgProducer {
private final static String TOPIC_NAME = "my-replicated-topic";
public static void main(String[] args) throws InterruptedException, ExecutionException {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.65.60:9092,192.168.65.60:9093,192.168.65.60:9094");
/*
發(fā)出消息持久化機(jī)制參數(shù)
(1)acks=0: 表示producer不需要等待任何broker確認(rèn)收到消息的回復(fù)忧吟,就可以繼續(xù)發(fā)送下一條消息。性能最高斩披,但是最容易丟消息瀑罗。
(2)acks=1: 至少要等待leader已經(jīng)成功將數(shù)據(jù)寫入本地log,但是不需要等待所有follower是否成功寫入雏掠。就可以繼續(xù)發(fā)送下一
條消息斩祭。這種情況下,如果follower沒有成功備份數(shù)據(jù)乡话,而此時(shí)leader又掛掉摧玫,則消息會(huì)丟失。
(3)acks=-1或all: 需要等待 min.insync.replicas(默認(rèn)為1,推薦配置大于等于2) 這個(gè)參數(shù)配置的副本個(gè)數(shù)都成功寫入日志诬像,這種策略會(huì)保證
只要有一個(gè)備份存活就不會(huì)丟失數(shù)據(jù)屋群。這是最強(qiáng)的數(shù)據(jù)保證。一般除非是金融級(jí)別坏挠,或跟錢打交道的場(chǎng)景才會(huì)使用這種配置芍躏。
*/
props.put(ProducerConfig.ACKS_CONFIG, "1");
/*
發(fā)送失敗會(huì)重試,默認(rèn)重試間隔100ms降狠,重試能保證消息發(fā)送的可靠性对竣,但是也可能造成消息重復(fù)發(fā)送,比如網(wǎng)絡(luò)抖動(dòng)榜配,所以需要在
接收者那邊做好消息接收的冪等性處理
*/
props.put(ProducerConfig.RETRIES_CONFIG, 3);
//重試間隔設(shè)置
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 300);
//設(shè)置發(fā)送消息的本地緩沖區(qū)否纬,如果設(shè)置了該緩沖區(qū),消息會(huì)先發(fā)送到本地緩沖區(qū)蛋褥,可以提高消息發(fā)送性能临燃,默認(rèn)值是33554432,即32MB
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
/*
kafka本地線程會(huì)從緩沖區(qū)取數(shù)據(jù)烙心,批量發(fā)送到broker膜廊,
設(shè)置批量發(fā)送消息的大小,默認(rèn)值是16384淫茵,即16kb爪瓜,就是說一個(gè)batch滿了16kb就發(fā)送出去
*/
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
/*
默認(rèn)值是0,意思就是消息必須立即被發(fā)送痘昌,但這樣會(huì)影響性能
一般設(shè)置10毫秒左右钥勋,就是說這個(gè)消息發(fā)送完后會(huì)進(jìn)入本地的一個(gè)batch,如果10毫秒內(nèi)辆苔,這個(gè)batch滿了16kb就會(huì)隨batch一起被發(fā)送出去
如果10毫秒內(nèi)算灸,batch沒滿,那么也必須把消息發(fā)送出去驻啤,不能讓消息的發(fā)送延遲時(shí)間太長
*/
props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
//把發(fā)送的key從字符串序列化為字節(jié)數(shù)組
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
//把發(fā)送消息value從字符串序列化為字節(jié)數(shù)組
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
Producer<String, String> producer = new KafkaProducer<String, String>(props);
int msgNum = 5;
final CountDownLatch countDownLatch = new CountDownLatch(msgNum);
for (int i = 1; i <= msgNum; i++) {
Order order = new Order(i, 100 + i, 1, 1000.00);
//指定發(fā)送分區(qū)
/*ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(TOPIC_NAME
, 0, order.getOrderId().toString(), JSON.toJSONString(order));*/
//未指定發(fā)送分區(qū)菲驴,具體發(fā)送的分區(qū)計(jì)算公式:hash(key)%partitionNum
ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(TOPIC_NAME
, order.getOrderId().toString(), JSON.toJSONString(order));
//等待消息發(fā)送成功的同步阻塞方法
/*RecordMetadata metadata = producer.send(producerRecord).get();
System.out.println("同步方式發(fā)送消息結(jié)果:" + "topic-" + metadata.topic() + "|partition-"
+ metadata.partition() + "|offset-" + metadata.offset());*/
//異步回調(diào)方式發(fā)送消息
producer.send(producerRecord, new Callback() {
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
System.err.println("發(fā)送消息失敗:" + exception.getStackTrace());
}
if (metadata != null) {
System.out.println("異步方式發(fā)送消息結(jié)果:" + "topic-" + metadata.topic() + "|partition-"
+ metadata.partition() + "|offset-" + metadata.offset());
}
countDownLatch.countDown();
}
});
//送積分 TODO
}
countDownLatch.await(5, TimeUnit.SECONDS);
producer.close();
}
}
package com.tuling.kafka.kafkaDemo;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
public class MsgConsumer {
private final static String TOPIC_NAME = "my-replicated-topic";
private final static String CONSUMER_GROUP_NAME = "testGroup";
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.65.60:9092,192.168.65.60:9093,192.168.65.60:9094");
// 消費(fèi)分組名
props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_NAME);
// 是否自動(dòng)提交offset骑冗,默認(rèn)就是true
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
// 自動(dòng)提交offset的間隔時(shí)間
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
//props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
/*
當(dāng)消費(fèi)主題的是一個(gè)新的消費(fèi)組赊瞬,或者指定offset的消費(fèi)方式,offset不存在贼涩,那么應(yīng)該如何消費(fèi)
latest(默認(rèn)) :只消費(fèi)自己啟動(dòng)之后發(fā)送到主題的消息
earliest:第一次從頭開始消費(fèi)巧涧,以后按照消費(fèi)offset記錄繼續(xù)消費(fèi),這個(gè)需要區(qū)別于consumer.seekToBeginning(每次都從頭開始消費(fèi))
*/
//props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
/*
consumer給broker發(fā)送心跳的間隔時(shí)間遥倦,broker接收到心跳如果此時(shí)有rebalance發(fā)生會(huì)通過心跳響應(yīng)將
rebalance方案下發(fā)給consumer谤绳,這個(gè)時(shí)間可以稍微短一點(diǎn)
*/
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);
/*
服務(wù)端broker多久感知不到一個(gè)consumer心跳就認(rèn)為他故障了,會(huì)將其踢出消費(fèi)組,
對(duì)應(yīng)的Partition也會(huì)被重新分配給其他consumer缩筛,默認(rèn)是10秒
*/
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000);
//一次poll最大拉取消息的條數(shù)消略,如果消費(fèi)者處理速度很快,可以設(shè)置大點(diǎn)瞎抛,如果處理速度一般艺演,可以設(shè)置小點(diǎn)
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
/*
如果兩次poll操作間隔超過了這個(gè)時(shí)間,broker就會(huì)認(rèn)為這個(gè)consumer處理能力太弱桐臊,
會(huì)將其踢出消費(fèi)組胎撤,將分區(qū)分配給別的consumer消費(fèi)
*/
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList(TOPIC_NAME));
// 消費(fèi)指定分區(qū)
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
//消息回溯消費(fèi)
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
consumer.seekToBeginning(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
//指定offset消費(fèi)
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
consumer.seek(new TopicPartition(TOPIC_NAME, 0), 10);
//從指定時(shí)間點(diǎn)開始消費(fèi)
List<PartitionInfo> topicPartitions = consumer.partitionsFor(TOPIC_NAME);
//從1小時(shí)前開始消費(fèi)
long fetchDataTime = new Date().getTime() - 1000 * 60 * 60;
Map<TopicPartition, Long> map = new HashMap<>();
for (PartitionInfo par : topicPartitions) {
map.put(new TopicPartition(topicName, par.partition()), fetchDataTime);
}
Map<TopicPartition, OffsetAndTimestamp> parMap = consumer.offsetsForTimes(map);
for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : parMap.entrySet()) {
TopicPartition key = entry.getKey();
OffsetAndTimestamp value = entry.getValue();
if (key == null || value == null) continue;
Long offset = value.offset();
System.out.println("partition-" + key.partition() + "|offset-" + offset);
System.out.println();
//根據(jù)消費(fèi)里的timestamp確定offset
if (value != null) {
consumer.assign(Arrays.asList(key));
consumer.seek(key, offset);
}
}
while (true) {
/*
* poll() API 是拉取消息的長輪詢
*/
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("收到消息:partition = %d,offset = %d, key = %s, value = %s%n", record.partition(),
record.offset(), record.key(), record.value());
}
/*if (records.count() > 0) {
// 手動(dòng)同步提交offset,當(dāng)前線程會(huì)阻塞直到offset提交成功
// 一般使用同步提交豪硅,因?yàn)樘峤恢笠话阋矝]有什么邏輯代碼了
consumer.commitSync();
// 手動(dòng)異步提交offset哩照,當(dāng)前線程提交offset不會(huì)阻塞挺物,可以繼續(xù)處理后面的程序邏輯
consumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
if (exception != null) {
System.err.println("Commit failed for " + offsets);
System.err.println("Commit failed exception: " + exception.getStackTrace());
}
}
});
}*/
}
}
}
七懒浮、Spring Boot整合Kafka
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
application.yml配置如下:
server:
port: 8080
spring:
kafka:
bootstrap-servers: 192.168.65.60:9092,192.168.65.60:9093,192.168.65.60:9094
producer: # 生產(chǎn)者
retries: 3 # 設(shè)置大于0的值,則客戶端會(huì)將發(fā)送失敗的記錄重新發(fā)送
batch-size: 16384
buffer-memory: 33554432
acks: 1
# 指定消息key和消息體的編解碼方式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
group-id: default-group
enable-auto-commit: false
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
listener:
# 當(dāng)每一條記錄被消費(fèi)者監(jiān)聽器(ListenerConsumer)處理之后提交
# RECORD
# 當(dāng)每一批poll()的數(shù)據(jù)被消費(fèi)者監(jiān)聽器(ListenerConsumer)處理之后提交
# BATCH
# 當(dāng)每一批poll()的數(shù)據(jù)被消費(fèi)者監(jiān)聽器(ListenerConsumer)處理之后识藤,距離上次提交時(shí)間大于TIME時(shí)提交
# TIME
# 當(dāng)每一批poll()的數(shù)據(jù)被消費(fèi)者監(jiān)聽器(ListenerConsumer)處理之后砚著,被處理record數(shù)量大于等于COUNT時(shí)提交
# COUNT
# TIME | COUNT 有一個(gè)條件滿足時(shí)提交
# COUNT_TIME
# 當(dāng)每一批poll()的數(shù)據(jù)被消費(fèi)者監(jiān)聽器(ListenerConsumer)處理之后, 手動(dòng)調(diào)用Acknowledgment.acknowledge()后提交
# MANUAL
# 手動(dòng)調(diào)用Acknowledgment.acknowledge()后立即提交,一般使用這種
# MANUAL_IMMEDIATE
ack-mode: manual_immediate
package com.kafka;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class KafkaController {
private final static String TOPIC_NAME = "my-replicated-topic";
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@RequestMapping("/send")
public void send() {
kafkaTemplate.send(TOPIC_NAME, 0, "key", "this is a msg");
}
}
package com.kafka;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
@Component
public class MyConsumer {
/**
* @KafkaListener(groupId = "testGroup", topicPartitions = {
* @TopicPartition(topic = "topic1", partitions = {"0", "1"}),
* @TopicPartition(topic = "topic2", partitions = "0",
* partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
* },concurrency = "6")
* //concurrency就是同組下的消費(fèi)者個(gè)數(shù)痴昧,就是并發(fā)消費(fèi)數(shù)稽穆,必須小于等于分區(qū)總數(shù)
* @param record
*/
@KafkaListener(topics = "my-replicated-topic",groupId = "zhugeGroup")
public void listenZhugeGroup(ConsumerRecord<String, String> record, Acknowledgment ack) {
String value = record.value();
System.out.println(value);
System.out.println(record);
//手動(dòng)提交offset
ack.acknowledge();
}
/*//配置多個(gè)消費(fèi)組
@KafkaListener(topics = "my-replicated-topic",groupId = "tulingGroup")
public void listenTulingGroup(ConsumerRecord<String, String> record, Acknowledgment ack) {
String value = record.value();
System.out.println(value);
System.out.println(record);
ack.acknowledge();
}*/
}
八、Kafka核心總控制器Controller
在Kafka集群中會(huì)有一個(gè)或者多個(gè)broker赶撰,其中有一個(gè)broker會(huì)被選舉為控制器(Kafka Controller)舌镶,它負(fù)責(zé)管理整個(gè)集群中所有分區(qū)和副本的狀態(tài)。
- 當(dāng)某個(gè)分區(qū)的leader副本出現(xiàn)故障時(shí)豪娜,由控制器負(fù)責(zé)為該分區(qū)選舉新的leader副本餐胀。
- 當(dāng)檢測(cè)到某個(gè)分區(qū)的ISR集合發(fā)生變化時(shí),由控制器負(fù)責(zé)通知所有broker更新其元數(shù)據(jù)信息瘤载。
- 當(dāng)使用kafka-topics.sh腳本為某個(gè)topic增加分區(qū)數(shù)量時(shí)否灾,同樣還是由控制器負(fù)責(zé)讓新分區(qū)被其他節(jié)點(diǎn)感知到。
Controller選舉機(jī)制
在kafka集群啟動(dòng)的時(shí)候鸣奔,會(huì)自動(dòng)選舉一臺(tái)broker作為controller來管理整個(gè)集群墨技,選舉的過程是集群中每個(gè)broker都會(huì)嘗試在zookeeper上創(chuàng)建一個(gè) /controller 臨時(shí)節(jié)點(diǎn),zookeeper會(huì)保證有且僅有一個(gè)broker能創(chuàng)建成功挎狸,這個(gè)broker就會(huì)成為集群的總控器controller扣汪。
當(dāng)這個(gè)controller角色的broker宕機(jī)了,此時(shí)zookeeper臨時(shí)節(jié)點(diǎn)會(huì)消失锨匆,集群里其他broker會(huì)一直監(jiān)聽這個(gè)臨時(shí)節(jié)點(diǎn)崭别,發(fā)現(xiàn)臨時(shí)節(jié)點(diǎn)消失了,就競爭再次創(chuàng)建臨時(shí)節(jié)點(diǎn),就是我們上面說的選舉機(jī)制紊遵,zookeeper又會(huì)保證有一個(gè)broker成為新的controller账千。
具備控制器身份的broker需要比其他普通的broker多一份職責(zé),具體細(xì)節(jié)如下:
- 監(jiān)聽broker相關(guān)的變化暗膜。為Zookeeper中的/brokers/ids/節(jié)點(diǎn)添加BrokerChangeListener匀奏,用來處理broker增減的變化。
- 監(jiān)聽topic相關(guān)的變化学搜。為Zookeeper中的/brokers/topics節(jié)點(diǎn)添加TopicChangeListener娃善,用來處理topic增減的變化;為Zookeeper中的/admin/delete_topics節(jié)點(diǎn)添加TopicDeletionListener瑞佩,用來處理刪除topic的動(dòng)作聚磺。
- 從Zookeeper中讀取獲取當(dāng)前所有與topic、partition以及broker有關(guān)的信息并進(jìn)行相應(yīng)的管理炬丸。對(duì)于所有topic所對(duì)應(yīng)的Zookeeper中的/brokers/topics/[topic]節(jié)點(diǎn)添加PartitionModificationsListener瘫寝,用來監(jiān)聽topic中的分區(qū)分配變化。
- 更新集群的元數(shù)據(jù)信息稠炬,同步到其他普通的broker節(jié)點(diǎn)中焕阿。
Partition副本選舉Leader機(jī)制
controller感知到分區(qū)leader所在的broker掛了(controller監(jiān)聽了很多zk節(jié)點(diǎn)可以感知到broker存活),controller會(huì)從ISR列表(參數(shù)unclean.leader.election.enable=false的前提下)里挑第一個(gè)broker作為leader(第一個(gè)broker最先放進(jìn)ISR列表首启,可能是同步數(shù)據(jù)最多的副本)暮屡,如果參數(shù)unclean.leader.election.enable為true,代表在ISR列表里所有副本都掛了的時(shí)候可以在ISR列表以外的副本中選leader毅桃,這種設(shè)置褒纲,可以提高可用性,但是選出的新leader有可能數(shù)據(jù)少很多钥飞。
副本進(jìn)入ISR列表有兩個(gè)條件:
- 副本節(jié)點(diǎn)不能產(chǎn)生分區(qū)莺掠,必須能與zookeeper保持會(huì)話以及跟leader副本網(wǎng)絡(luò)連通
- 副本能復(fù)制leader上的所有寫操作,并且不能落后太多代承。(與leader副本同步滯后的副本汁蝶,是由 replica.lag.time.max.ms 配置決定的,超過這個(gè)時(shí)間都沒有跟leader同步過的一次的副本會(huì)被移出ISR列表)
消費(fèi)者消費(fèi)消息的offset記錄機(jī)制
- 每個(gè)consumer會(huì)定期將自己消費(fèi)分區(qū)的offset提交給kafka內(nèi)部topic:__consumer_offsets论悴,提交過去的時(shí)候掖棉,key是consumerGroupId+topic+分區(qū)號(hào),value就是當(dāng)前offset的值膀估,kafka會(huì)定期清理topic里的消息幔亥,最后就保留最新的那條數(shù)據(jù)
- 因?yàn)開_consumer_offsets可能會(huì)接收高并發(fā)的請(qǐng)求,kafka默認(rèn)給其分配50個(gè)分區(qū)(可以通過offsets.topic.num.partitions設(shè)置)察纯,這樣可以通過加機(jī)器的方式抗大并發(fā)帕棉。
- 通過如下公式可以選出consumer消費(fèi)的offset要提交到__consumer_offsets的哪個(gè)分區(qū)针肥,公式:hash(consumerGroupId) % __consumer_offsets主題的分區(qū)數(shù)。
消費(fèi)者Rebalance機(jī)制
rebalance就是說如果消費(fèi)組里的消費(fèi)者數(shù)量有變化或消費(fèi)的分區(qū)數(shù)有變化香伴,kafka會(huì)重新分配消費(fèi)者消費(fèi)分區(qū)的關(guān)系慰枕。比如consumer group中某個(gè)消費(fèi)者掛了,此時(shí)會(huì)自動(dòng)把分配給他的分區(qū)交給其他的消費(fèi)者即纲,如果他又重啟了具帮,那么又會(huì)把一些分區(qū)重新交還給他。
注意:rebalance只針對(duì)subscribe這種不指定分區(qū)消費(fèi)的情況低斋,如果通過assign這種消費(fèi)方式指定了分區(qū)蜂厅,kafka不會(huì)進(jìn)行rebanlance。
如下情況可能會(huì)觸發(fā)消費(fèi)者rebalance
- 消費(fèi)組里的consumer增加或減少了
- 動(dòng)態(tài)給topic增加了分區(qū)
- 消費(fèi)組訂閱了更多的topic
rebalance過程中膊畴,消費(fèi)者無法從kafka消費(fèi)消息掘猿,這對(duì)kafka的TPS會(huì)有影響,如果kafka集群內(nèi)節(jié)點(diǎn)較多唇跨,比如數(shù)百個(gè)稠通,那重平衡可能會(huì)耗時(shí)極多,所以應(yīng)盡量避免在系統(tǒng)高峰期的重平衡發(fā)生轻绞。
消費(fèi)者Rebalance分區(qū)分配策略
- 主要有三種rebalance的策略:range采记、round-robin佣耐、sticky政勃。
- Kafka 提供了消費(fèi)者客戶端參數(shù)partition.assignment.strategy 來設(shè)置消費(fèi)者與訂閱主題之間的分區(qū)分配策略。默認(rèn)情況為range分配策略兼砖。
- 假設(shè)一個(gè)主題有10個(gè)分區(qū)(0-9)奸远,現(xiàn)在有三個(gè)consumer消費(fèi):
range策略:就是按照分區(qū)序號(hào)排序,假設(shè) n=分區(qū)數(shù)/消費(fèi)者數(shù)量 = 3讽挟, m=分區(qū)數(shù)%消費(fèi)者數(shù)量 = 1懒叛,那么前 m 個(gè)消費(fèi)者每個(gè)分配 n+1 個(gè)分區(qū),后面的(消費(fèi)者數(shù)量-m )個(gè)消費(fèi)者每個(gè)分配 n 個(gè)分區(qū)耽梅。比如分區(qū)0-3給一個(gè)consumer薛窥,分區(qū)4-6給一個(gè)consumer,分區(qū)7~9給一個(gè)consumer眼姐。
round-robin策略:就是輪詢分配诅迷,比如分區(qū)0、3众旗、6罢杉、9給一個(gè)consumer,分區(qū)1贡歧、4滩租、7給一個(gè)consumer赋秀,分區(qū)2、5律想、8給一個(gè)consumer
sticky策略:初始時(shí)分配策略與round-robin類似猎莲,但是在rebalance的時(shí)候,需要保證如下兩個(gè)原則技即。
1)分區(qū)的分配要盡可能均勻 益眉。
2)分區(qū)的分配盡可能與上次分配的保持相同。
當(dāng)兩者發(fā)生沖突時(shí)姥份,第一個(gè)目標(biāo)優(yōu)先于第二個(gè)目標(biāo) 郭脂。這樣可以最大程度維持原來的分區(qū)分配的策略。
比如對(duì)于第一種range情況的分配澈歉,如果第三個(gè)consumer掛了展鸡,那么重新用sticky策略分配的結(jié)果如下:
consumer1除了原有的0~3,會(huì)再分配一個(gè)7
consumer2除了原有的4~6埃难,會(huì)再分配8和9
Rebalance過程如下
當(dāng)有消費(fèi)者加入消費(fèi)組時(shí)莹弊,消費(fèi)者、消費(fèi)組及組協(xié)調(diào)器之間會(huì)經(jīng)歷以下幾個(gè)階段涡尘。
第一階段:選擇組協(xié)調(diào)器
組協(xié)調(diào)器GroupCoordinator:每個(gè)consumer group都會(huì)選擇一個(gè)broker作為自己的組協(xié)調(diào)器coordinator忍弛,負(fù)責(zé)監(jiān)控這個(gè)消費(fèi)組里的所有消費(fèi)者的心跳,以及判斷是否宕機(jī)考抄,然后開啟消費(fèi)者rebalance细疚。
consumer group中的每個(gè)consumer啟動(dòng)時(shí)會(huì)向kafka集群中的某個(gè)節(jié)點(diǎn)發(fā)送 FindCoordinatorRequest 請(qǐng)求來查找對(duì)應(yīng)的組協(xié)調(diào)器GroupCoordinator,并跟其建立網(wǎng)絡(luò)連接川梅。
組協(xié)調(diào)器選擇方式:
consumer消費(fèi)的offset要提交到__consumer_offsets的哪個(gè)分區(qū)疯兼,這個(gè)分區(qū)leader對(duì)應(yīng)的broker就是這個(gè)consumer group的coordinator第二階段:加入消費(fèi)組JOIN GROUP
在成功找到消費(fèi)組所對(duì)應(yīng)的 GroupCoordinator 之后就進(jìn)入加入消費(fèi)組的階段,在此階段的消費(fèi)者會(huì)向 GroupCoordinator 發(fā)送 JoinGroupRequest 請(qǐng)求贫途,并處理響應(yīng)吧彪。然后GroupCoordinator 從一個(gè)consumer group中選擇第一個(gè)加入group的consumer作為leader(消費(fèi)組協(xié)調(diào)器),把consumer group情況發(fā)送給這個(gè)leader丢早,接著這個(gè)leader會(huì)負(fù)責(zé)制定分區(qū)方案姨裸。第三階段( SYNC GROUP)
consumer leader通過給GroupCoordinator發(fā)送SyncGroupRequest,接著GroupCoordinator就把分區(qū)方案下發(fā)給各個(gè)consumer怨酝,他們會(huì)根據(jù)指定分區(qū)的leader broker進(jìn)行網(wǎng)絡(luò)連接以及消息消費(fèi)傀缩。
producer發(fā)布消息機(jī)制剖析
寫入方式
producer 采用 push 模式將消息發(fā)布到 broker,每條消息都被 append 到 patition 中凫碌,屬于順序?qū)懘疟P(順序?qū)懘疟P效率比隨機(jī)寫內(nèi)存要高扑毡,保障 kafka 吞吐率)。
消息路由
producer 發(fā)送消息到 broker 時(shí)盛险,會(huì)根據(jù)分區(qū)算法選擇將其存儲(chǔ)到哪一個(gè) partition瞄摊。其路由機(jī)制為:
1.指定了 patition勋又,則直接使用;
2.未指定 patition 但指定 key换帜,通過對(duì) key 的 value 進(jìn)行hash 選出一個(gè) patition
3.patition 和 key 都未指定楔壤,使用輪詢選出一個(gè) patition。
寫入流程
- producer 先從 zookeeper 的 "/brokers/.../state" 節(jié)點(diǎn)找到該 partition 的 leader.
- producer 將消息發(fā)送給該 leader.
- leader 將消息寫入本地 log.
- followers 從 leader pull 消息惯驼,寫入本地 log 后 向leader 發(fā)送 ACK.
- leader 收到所有 ISR 中的 replica 的 ACK 后蹲嚣,增加 HW(high watermark,最后 commit 的 offset) 并向 producer 發(fā)送 ACK.
HW與LEO詳解
HW俗稱高水位祟牲,HighWatermark的縮寫隙畜,取一個(gè)partition對(duì)應(yīng)的ISR中最小的LEO(log-end-offset)作為HW,consumer最多只能消費(fèi)到HW所在的位置说贝。另外每個(gè)replica都有HW,leader和follower各自負(fù)責(zé)更新自己的HW的狀態(tài)议惰。對(duì)于leader新寫入的消息,consumer不能立刻消費(fèi)乡恕,leader會(huì)等待該消息被所有ISR中的replicas同步后更新HW言询,此時(shí)消息才能被consumer消費(fèi)。這樣就保證了如果leader所在的broker失效傲宜,該消息仍然可以從新選舉的leader中獲取运杭。對(duì)于來自內(nèi)部broker的讀取請(qǐng)求,沒有HW的限制函卒。
下圖詳細(xì)的說明了當(dāng)producer生產(chǎn)消息至broker后辆憔,ISR以及HW和LEO的流轉(zhuǎn)過程:
由此可見,Kafka的復(fù)制機(jī)制既不是完全的同步復(fù)制谆趾,也不是單純的異步復(fù)制躁愿。事實(shí)上,同步復(fù)制要求所有能工作的follower都復(fù)制完沪蓬,這條消息才會(huì)被commit,這種復(fù)制方式極大的影響了吞吐率来候。而異步復(fù)制方式下跷叉,follower異步的從leader復(fù)制數(shù)據(jù),數(shù)據(jù)只要被leader寫入log就被認(rèn)為已經(jīng)commit营搅,這種情況下如果follower都還沒有復(fù)制完云挟,落后于leader時(shí),突然leader宕機(jī),則會(huì)丟失數(shù)據(jù)。而Kafka的這種使用ISR的方式則很好的均衡了確保數(shù)據(jù)不丟失以及吞吐率浸卦。再回顧下消息發(fā)送端對(duì)發(fā)出消息持久化機(jī)制參數(shù)acks的設(shè)置轻抱,我們結(jié)合HW和LEO來看下acks=1的情況
日志分段存儲(chǔ)
Kafka 一個(gè)分區(qū)的消息數(shù)據(jù)對(duì)應(yīng)存儲(chǔ)在一個(gè)文件夾下,以topic名稱+分區(qū)號(hào)命名唤殴,消息在分區(qū)內(nèi)是分段(segment)存儲(chǔ)档痪,每個(gè)段的消息都存儲(chǔ)在不一樣的log文件里枢里,這種特性方便old segment file快速被刪除绑榴,kafka規(guī)定了一個(gè)段位的 log 文件最大為 1G哪轿,做這個(gè)限制目的是為了方便把 log 文件加載到內(nèi)存去操作:
# 部分消息的offset索引文件,kafka每次往分區(qū)發(fā)4K(可配置)消息就會(huì)記錄一條當(dāng)前消息的offset到index文件翔怎,
# 如果要定位消息的offset會(huì)先在這個(gè)文件里快速定位窃诉,再去log文件里找具體消息
00000000000000000000.index
# 消息存儲(chǔ)文件,主要存offset和消息體
00000000000000000000.log
# 消息的發(fā)送時(shí)間索引文件赤套,kafka每次往分區(qū)發(fā)4K(可配置)消息就會(huì)記錄一條當(dāng)前消息的發(fā)送時(shí)間戳與對(duì)應(yīng)的offset到timeindex文件飘痛,
# 如果需要按照時(shí)間來定位消息的offset,會(huì)先在這個(gè)文件里查找
00000000000000000000.timeindex
00000000000005367851.index
00000000000005367851.log
00000000000005367851.timeindex
00000000000009936472.index
00000000000009936472.log
00000000000009936472.timeindex
這個(gè) 9936472 之類的數(shù)字容握,就是代表了這個(gè)日志段文件里包含的起始 Offset敦冬,也就說明這個(gè)分區(qū)里至少都寫入了接近 1000 萬條數(shù)據(jù)了。
Kafka Broker 有一個(gè)參數(shù)唯沮,log.segment.bytes脖旱,限定了每個(gè)日志段文件的大小,最大就是 1GB介蛉。
一個(gè)日志段文件滿了萌庆,就自動(dòng)開一個(gè)新的日志段文件來寫入,避免單個(gè)文件過大币旧,影響文件的讀寫性能践险,這個(gè)過程叫做 log rolling,正在被寫入的那個(gè)日志段文件吹菱,叫做 active log segment巍虫。
九、Kafka可視化管理工具kafka-manager
安裝及基本使用可參考:https://www.cnblogs.com/dadonggg/p/8205302.html
線上環(huán)境規(guī)劃
JVM參數(shù)設(shè)置
kafka是scala語言開發(fā)鳍刷,運(yùn)行在JVM上占遥,需要對(duì)JVM參數(shù)合理設(shè)置,參看JVM調(diào)優(yōu)專題
修改bin/kafka-start-server.sh中的jvm設(shè)置输瓜,假設(shè)機(jī)器是32G內(nèi)存瓦胎,可以如下設(shè)置:
export KAFKA_HEAP_OPTS="-Xmx16G -Xms16G -Xmn10G -XX:MetaspaceSize=256M -XX:+UseG1GC -XX:MaxGCPauseMillis=50 -XX:G1HeapRegionSize=16M"
這種大內(nèi)存的情況一般都要用G1垃圾收集器,因?yàn)槟贻p代內(nèi)存比較大尤揣,用G1可以設(shè)置GC最大停頓時(shí)間搔啊,不至于一次minor gc就花費(fèi)太長時(shí)間,當(dāng)然北戏,因?yàn)橄駅afka负芋,rocketmq,es這些中間件嗜愈,寫數(shù)據(jù)到磁盤會(huì)用到操作系統(tǒng)的page cache旧蛾,所以JVM內(nèi)存不宜分配過大莽龟,需要給操作系統(tǒng)的緩存留出幾個(gè)G。
線上問題及優(yōu)化
消息丟失情況:
消息發(fā)送端:
- acks=0: 表示producer不需要等待任何broker確認(rèn)收到消息的回復(fù)蚜点,就可以繼續(xù)發(fā)送下一條消息轧房。性能最高,但是最容易丟消息绍绘。大數(shù)據(jù)統(tǒng)計(jì)報(bào)表場(chǎng)景奶镶,對(duì)性能要求很高,對(duì)數(shù)據(jù)丟失不敏感的情況可以用這種陪拘。
- acks=1: 至少要等待leader已經(jīng)成功將數(shù)據(jù)寫入本地log厂镇,但是不需要等待所有follower是否成功寫入。就可以繼續(xù)發(fā)送下一條消息左刽。這種情況下捺信,如果follower沒有成功備份數(shù)據(jù),而此時(shí)leader又掛掉欠痴,則消息會(huì)丟失迄靠。
- acks=-1或all: 這意味著leader需要等待所有備份(min.insync.replicas配置的備份個(gè)數(shù))都成功寫入日志,這種策略會(huì)保證只要有一個(gè)備份存活就不會(huì)丟失數(shù)據(jù)喇辽。這是最強(qiáng)的數(shù)據(jù)保證掌挚。一般除非是金融級(jí)別,或跟錢打交道的場(chǎng)景才會(huì)使用這種配置菩咨。當(dāng)然如果min.insync.replicas配置的是1則也可能丟消息吠式,跟acks=1情況類似。
消息消費(fèi)端:
如果消費(fèi)這邊配置的是自動(dòng)提交抽米,萬一消費(fèi)到數(shù)據(jù)還沒處理完特占,就自動(dòng)提交offset了,但是此時(shí)你consumer直接宕機(jī)了云茸,未處理完的數(shù)據(jù)丟失了是目,下次也消費(fèi)不到了。
消息重復(fù)消費(fèi)
消息發(fā)送端:
發(fā)送消息如果配置了重試機(jī)制查辩,比如網(wǎng)絡(luò)抖動(dòng)時(shí)間過長導(dǎo)致發(fā)送端發(fā)送超時(shí)胖笛,實(shí)際broker可能已經(jīng)接收到消息,但發(fā)送方會(huì)重新發(fā)送消息
消息消費(fèi)端:
如果消費(fèi)這邊配置的是自動(dòng)提交宜岛,剛拉取了一批數(shù)據(jù)處理了一部分,但還沒來得及提交功舀,服務(wù)掛了萍倡,下次重啟又會(huì)拉取相同的一批數(shù)據(jù)重復(fù)處理
一般消費(fèi)端都是要做消費(fèi)冪等處理的。
消息亂序
如果發(fā)送端配置了重試機(jī)制辟汰,kafka不會(huì)等之前那條消息完全發(fā)送成功才去發(fā)送下一條消息列敲,這樣可能會(huì)出現(xiàn)阱佛,發(fā)送了1,2戴而,3條消息凑术,第一條超時(shí)了,后面兩條發(fā)送成功所意,再重試發(fā)送第1條消息淮逊,這時(shí)消息在broker端的順序就是2,3扶踊,1了
所以泄鹏,是否一定要配置重試要根據(jù)業(yè)務(wù)情況而定。也可以用同步發(fā)送的模式去發(fā)消息秧耗,當(dāng)然acks不能設(shè)置為0备籽,這樣也能保證消息發(fā)送的有序。
kafka保證全鏈路消息順序消費(fèi)分井,需要從發(fā)送端開始车猬,將所有有序消息發(fā)送到同一個(gè)分區(qū),然后用一個(gè)消費(fèi)者去消費(fèi)尺锚,但是這種性能比較低珠闰,可以在消費(fèi)者端接收到消息后將需要保證順序消費(fèi)的幾條消費(fèi)發(fā)到內(nèi)存隊(duì)列(可以搞多個(gè)),一個(gè)內(nèi)存隊(duì)列開啟一個(gè)線程順序處理消息缩麸。
消息積壓
線上有時(shí)因?yàn)榘l(fā)送方發(fā)送消息速度過快铸磅,或者消費(fèi)方處理消息過慢,可能會(huì)導(dǎo)致broker積壓大量未消費(fèi)消息杭朱。
此種情況如果積壓了上百萬未消費(fèi)消息需要緊急處理阅仔,可以修改消費(fèi)端程序,讓其將收到的消息快速轉(zhuǎn)發(fā)到其他topic(可以設(shè)置很多分區(qū))弧械,然后再啟動(dòng)多個(gè)消費(fèi)者同時(shí)消費(fèi)新主題的不同分區(qū)八酒。由于消息數(shù)據(jù)格式變動(dòng)或消費(fèi)者程序有bug,導(dǎo)致消費(fèi)者一直消費(fèi)不成功刃唐,也可能導(dǎo)致broker積壓大量未消費(fèi)消息羞迷。
此種情況可以將這些消費(fèi)不成功的消息轉(zhuǎn)發(fā)到其它隊(duì)列里去(類似死信隊(duì)列),后面再慢慢分析死信隊(duì)列里的消息處理問題画饥。
延時(shí)隊(duì)列
延時(shí)隊(duì)列存儲(chǔ)的對(duì)象是延時(shí)消息衔瓮。所謂的“延時(shí)消息”是指消息被發(fā)送以后,并不想讓消費(fèi)者立刻獲取抖甘,而是等待特定的時(shí)間后热鞍,消費(fèi)者才能獲取這個(gè)消息進(jìn)行消費(fèi),延時(shí)隊(duì)列的使用場(chǎng)景有很多, 比如 :
- 在訂單系統(tǒng)中薇宠, 一個(gè)用戶下單之后通常有 30 分鐘的時(shí)間進(jìn)行支付偷办,如果 30 分鐘之內(nèi)沒有支付成功,那么這個(gè)訂單將進(jìn)行異常處理澄港,這時(shí)就可以使用延時(shí)隊(duì)列來處理這些訂單了椒涯。
- 訂單完成1小時(shí)后通知用戶進(jìn)行評(píng)價(jià)。
實(shí)現(xiàn)思路:發(fā)送延時(shí)消息時(shí)先把消息按照不同的延遲時(shí)間段發(fā)送到指定的隊(duì)列中(topic_1s回梧,topic_5s废岂,topic_10s,...topic_2h漂辐,這個(gè)一般不能支持任意時(shí)間段的延時(shí))泪喊,然后通過定時(shí)器進(jìn)行輪訓(xùn)消費(fèi)這些topic,查看消息是否到期髓涯,如果到期就把這個(gè)消息發(fā)送到具體業(yè)務(wù)處理的topic中袒啼,隊(duì)列中消息越靠前的到期時(shí)間越早,具體來說就是定時(shí)器在一次消費(fèi)過程中纬纪,對(duì)消息的發(fā)送時(shí)間做判斷蚓再,看下是否延遲到對(duì)應(yīng)時(shí)間了,如果到了就轉(zhuǎn)發(fā)包各,如果還沒到這一次定時(shí)任務(wù)就可以提前結(jié)束了摘仅。
消息回溯
如果某段時(shí)間對(duì)已消費(fèi)消息計(jì)算的結(jié)果覺得有問題,可能是由于程序bug導(dǎo)致的計(jì)算錯(cuò)誤问畅,當(dāng)程序bug修復(fù)后娃属,這時(shí)可能需要對(duì)之前已消費(fèi)的消息重新消費(fèi),可以指定從多久之前的消息回溯消費(fèi)护姆,這種可以用consumer的offsetsForTimes矾端、seek等方法指定從某個(gè)offset偏移的消息開始消費(fèi),參見上節(jié)課的內(nèi)容卵皂。
分區(qū)數(shù)越多吞吐量越高嗎
- 可以用kafka壓測(cè)工具自己測(cè)試分區(qū)數(shù)不同秩铆,各種情況下的吞吐量。
- 網(wǎng)絡(luò)上很多資料都說分區(qū)數(shù)越多吞吐量越高 灯变, 但從壓測(cè)結(jié)果來看殴玛,分區(qū)數(shù)到達(dá)某個(gè)值吞吐量反而開始下降,實(shí)際上很多事情都會(huì)有一個(gè)臨界值添祸,當(dāng)超過這個(gè)臨界值之后滚粟,很多原本符合既定邏輯的走向又會(huì)變得不同。一般情況分區(qū)數(shù)跟集群機(jī)器數(shù)量相當(dāng)就差不多了刃泌。
- 當(dāng)然吞吐量的數(shù)值和走勢(shì)還會(huì)和磁盤坦刀、文件系統(tǒng)愧沟、 I/O調(diào)度策略等因素相關(guān)蔬咬。
注意:如果分區(qū)數(shù)設(shè)置過大鲤遥,比如設(shè)置10000,可能會(huì)設(shè)置不成功林艘,后臺(tái)會(huì)報(bào)錯(cuò)"java.io.IOException : Too many open files"盖奈。
異常中最關(guān)鍵的信息是“ Too many open flies”,這是一種常見的 Linux 系統(tǒng)錯(cuò)誤狐援,通常意味著文件描述符不足钢坦,它一般發(fā)生在創(chuàng)建線程、創(chuàng)建 Socket啥酱、打開文件這些場(chǎng)景下 爹凹。 在 Linux系統(tǒng)的默認(rèn)設(shè)置下,這個(gè)文件描述符的個(gè)數(shù)不是很多 镶殷,通過 ulimit -n 命令可以查看:一般默認(rèn)是1024禾酱,可以將該值增大,比如:ulimit -n 65535
消息傳遞保障
at most once(消費(fèi)者最多收到一次消息绘趋,0--1次):acks = 0 可以實(shí)現(xiàn)颤陶。
at least once(消費(fèi)者至少收到一次消息,1--多次):ack = all 可以實(shí)現(xiàn)陷遮。
exactly once(消費(fèi)者剛好收到一次消息):at least once 加上消費(fèi)者冪等性可以實(shí)現(xiàn)滓走,還可以用kafka生產(chǎn)者的冪等性來實(shí)現(xiàn)。
kafka生產(chǎn)者的冪等性:因?yàn)榘l(fā)送端重試導(dǎo)致的消息重復(fù)發(fā)送問題帽馋,kafka的冪等性可以保證重復(fù)發(fā)送的消息只接收一次搅方,只需在生產(chǎn)者加上參數(shù) props.put(“enable.idempotence”, true) 即可,默認(rèn)是false不開啟绽族。
具體實(shí)現(xiàn)原理是姨涡,kafka每次發(fā)送消息會(huì)生成PID和Sequence Number,并將這兩個(gè)屬性一起發(fā)送給broker项秉,broker會(huì)將PID和Sequence Number跟消息綁定一起存起來绣溜,下次如果生產(chǎn)者重發(fā)相同消息,broker會(huì)檢查PID和Sequence Number娄蔼,如果相同不會(huì)再接收怖喻。
kafka的事務(wù)
Kafka的事務(wù)不同于Rocketmq,Rocketmq是保障本地事務(wù)(比如數(shù)據(jù)庫)與mq消息發(fā)送的事務(wù)一致性岁诉,Kafka的事務(wù)主要是保障一次發(fā)送多條消息的事務(wù)一致性(要么同時(shí)成功要么同時(shí)失敗)锚沸,一般在kafka的流式計(jì)算場(chǎng)景用得多一點(diǎn),比如涕癣,kafka需要對(duì)一個(gè)topic里的消息做不同的流式計(jì)算處理哗蜈,處理完分別發(fā)到不同的topic里,這些topic分別被不同的下游系統(tǒng)消費(fèi)(比如hbase,redis距潘,es等)炼列,這種我們肯定希望系統(tǒng)發(fā)送到多個(gè)topic的數(shù)據(jù)保持事務(wù)一致性。Kafka要實(shí)現(xiàn)類似Rocketmq的分布式事務(wù)需要額外開發(fā)功能音比。
kafka的事務(wù)處理可以參考官方文檔:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("transactional.id", "my-transactional-id");
Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
//初始化事務(wù)
producer.initTransactions();
try {
//開啟事務(wù)
producer.beginTransaction();
for (int i = 0; i < 100; i++){
//發(fā)到不同的主題的不同分區(qū)
producer.send(new ProducerRecord<>("hdfs-topic", Integer.toString(i), Integer.toString(i)));
producer.send(new ProducerRecord<>("es-topic", Integer.toString(i), Integer.toString(i)));
producer.send(new ProducerRecord<>("redis-topic", Integer.toString(i), Integer.toString(i)));
}
//提交事務(wù)
producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
// We can't recover from these exceptions, so our only option is to close the producer and exit.
producer.close();
} catch (KafkaException e) {
// For all other exceptions, just abort the transaction and try again.
//回滾事務(wù)
producer.abortTransaction();
}
producer.close();
kafka高性能的原因
磁盤順序讀寫:kafka消息不能修改以及不會(huì)從文件中間刪除保證了磁盤順序讀俭尖,kafka的消息寫入文件都是追加在文件末尾,不會(huì)寫入文件中的某個(gè)位置(隨機(jī)寫)保證了磁盤順序?qū)憽?br>
數(shù)據(jù)傳輸?shù)牧憧截?br>
讀寫數(shù)據(jù)的批量batch處理以及壓縮傳輸
數(shù)據(jù)傳輸零拷貝原理: