前言
Kafka 是一款分布式消息發(fā)布和訂閱系統(tǒng),具有高性能存捺、高吞吐量的特點而被廣泛應用與大數(shù)據(jù)傳輸場景槐沼。它是由 LinkedIn 公司開發(fā),使用 Scala 語言編寫捌治,之后成為 Apache 基金會的一個頂級項目岗钩。kafka 提供了類似 JMS 的特性,但是在設計和實現(xiàn)上是完全不同的肖油,而且他也不是 JMS 規(guī)范的實現(xiàn)兼吓。
Kafka簡介
kafka產(chǎn)生背景
kafka 作為一個消息系統(tǒng),早起設計的目的是用作 LinkedIn 的活動流(Activity Stream)和運營數(shù)據(jù)處理管道(Pipeline)构韵≈懿洌活動流數(shù)據(jù)是所有的網(wǎng)站對用戶的使用情況做分析的時候要用到的最常規(guī)的部分,活動數(shù)據(jù)包括頁面的訪問量(PV)趋艘、被查看內(nèi)容方面的信息以及搜索內(nèi)容。這種數(shù)據(jù)通常的處理方式是先把各種活動以日志的形式寫入某種文件凶朗,然后周期性的對這些文件進行統(tǒng)計分析瓷胧。運營數(shù)據(jù)指的是服務器的性能數(shù)據(jù)(CPU、IO 使用率棚愤、請求時間搓萧、服務日志等)。
Kafka應用場景
由于 kafka 具有更好的吞吐量宛畦、內(nèi)置分區(qū)瘸洛、冗余及容錯性的優(yōu)點(kafka 每秒可以處理幾十萬消息),讓 kafka 成為了一個很好的大規(guī)模消息處理應用的解決方案次和。
日志收集:日志收集方面反肋,有很多比較優(yōu)秀的產(chǎn)品,比如 Apache Flume踏施,很多公司使用kafka 代理日志聚合石蔗。
`
kafka架構(gòu)
一個典型的 kafka 集群包含若干 Producer(可以是應用節(jié)點產(chǎn)生的消息养距,也可以是通過Flume 收集日志產(chǎn)生的事件),若干個 Broker(kafka 支持水平擴展)日熬、若干個 Consumer Group棍厌,以及一個 zookeeper 集群。kafka 通過 zookeeper 管理集群配置及服務協(xié)同竖席。
Producer 使用 push 模式將消息發(fā)布到 broker耘纱,consumer 通過監(jiān)聽使用 pull 模式從broker 訂閱并消費消息。多個 broker 協(xié)同工作怕敬,producer 和 consumer 部署在各個業(yè)務邏輯中揣炕。三者通過zookeeper 管理協(xié)調(diào)請求和轉(zhuǎn)發(fā)。這樣就組成了一個高性能的分布式消息發(fā)布和訂閱系統(tǒng)东跪。圖上有一個細節(jié)是和其他 mq 中間件不同的點畸陡,producer 發(fā)送消息到 broker的過程是 push,而 consumer 從 broker 消費消息的過程是 pull虽填,主動去拉數(shù)據(jù)丁恭。而不是 broker 把數(shù)據(jù)主動發(fā)送給 consumer
名詞解釋:
Topic:Kafka將消息分門別類牲览,每一類的消息稱之為一個主題(Topic)。
Producer:發(fā)布消息的對象稱之為主題生產(chǎn)者(Kafka topic producer)
Consumer:訂閱消息并處理發(fā)布的消息的對象稱之為主題消費者(consumers)
Broker:已發(fā)布的消息保存在一組服務器中恶守,稱之為Kafka集群第献。集群中的每一個服務器都是一個代理(Broker)贡必。 消費者可以訂閱一個或多個主題(topic),并從Broker拉數(shù)據(jù)庸毫,從而消費這些已發(fā)布的消息仔拟。
Topic和Log:Topic是發(fā)布的消息的類別名,一個topic可以有零個飒赃,一個或多個消費者訂閱該主題的消息利花。對于每個topic,Kafka集群都會維護一個分區(qū)log载佳,就像下圖中所示:
每一個分區(qū)都是一個順序的蔫慧、不可變的消息隊列挠乳, 并且可以持續(xù)的添加。分區(qū)中的消息都被分了一個序列號藕漱,稱之為偏移量(offset)欲侮,在每個分區(qū)中此偏移量都是唯一的。
Kafka集群保持所有的消息肋联,直到它們過期(無論消息是否被消費)〉蠹螅可以看到這種設計對消費者來說操作自如橄仍,一個消費者的操作不會影響其它消費者對此log的處理。
分布式:Log的分區(qū)被分布到集群中的多個服務器上。每個服務器處理它分到的分區(qū)如孝。 根據(jù)配置每個分區(qū)還可以復制到其它服務器作為備份容錯宪哩。 每個分區(qū)有一個leader,零或多個follower第晰。Leader處理此分區(qū)的所有的讀寫請求锁孟,而follower被動的復制數(shù)據(jù)。 這樣可以平衡負載茁瘦,避免所有的請求都只讓一臺或者某幾臺服務器處理品抽。
生產(chǎn)者:生產(chǎn)者往某個Topic上發(fā)布消息。生產(chǎn)者也負責選擇發(fā)布到Topic上的哪一個分區(qū)甜熔。最簡單的方式從分區(qū)列表中輪流選擇圆恤。也可以根據(jù)某種算法依照權(quán)重選擇分區(qū)。開發(fā)者負責如何選擇分區(qū)的算法腔稀。
消費者:通常來講盆昙,消息模型可以分為兩種羽历, 隊列和發(fā)布-訂閱式。 隊列的處理方式是 一組消費者從服務器讀取消息淡喜,一條消息只有其中的一個消費者來處理秕磷。在發(fā)布-訂閱模型中,消息被廣播給所有的消費者拆火,接收到消息的消費者都可以處理此消息跳夭。Kafka為這兩種模型提供了單一的消費者抽象模型: 消費者組 (consumer group)。 消費者用一個消費者組名標記自己们镜。 一個發(fā)布在Topic上消息被分發(fā)給此消費者組中的一個消費者币叹。 每個組包含數(shù)目不等的消費者, 一個組內(nèi)多個消費者可以用來擴展性能和容錯模狭。正如下圖所示:
2個kafka集群托管4個分區(qū)(P0-P3)嚼鹉,2個消費者組贩汉,消費組A有2個消費者實例,消費組B有4個锚赤。
Docker搭建kafka
下載以下三個鏡像
docker pull wurstmeister/zookeeper
docker pull wurstmeister/kafka
docker pull sheepkiller/kafka-manager
kafka-manager是kafka的可視化管理工具
啟動容器
docker run -d --name zookeeper --publish 2181:2181 \--volume /etc/localtime:/etc/localtime \--restart=always \wurstmeister/zookeeper
docker run -d --name kafka --publish 9082:9092 \--link zookeeper:zookeeper \--env KAFKA_BROKER_ID=100 \--env HOST_IP=127.0.0.1 \--env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \--env KAFKA_ADVERTISED_HOST_NAME=192.168.1.108 \--env KAFKA_ADVERTISED_PORT=9082 \--restart=always \--volume /etc/localtime:/etc/localtime \wurstmeister/kafka
docker run -d --name kafka-manager \--link zookeeper:zookeeper \--link kafka:kafka -p 9001:9000 \--restart=always \--env ZK_HOSTS=zookeeper:2181 \sheepkiller/kafka-manager
訪問
http://127.0.0.1:9001
添加Cluster
查看界面
搭建完畢,頁面其他功能自己摸索下
Kafka快速加入門
//以下Spring Boot應用程序?qū)⑷齻€消息發(fā)送到一個主題,接收它們浑侥,然后停止:
@SpringBootApplication
public class Application implements CommandLineRunner {
public static Logger logger = LoggerFactory.getLogger(Application.class);
public static void main(String[] args) {
SpringApplication.run(Application.class, args).close();
}
@Autowired
private KafkaTemplate<String, String> template;
private final CountDownLatch latch = new CountDownLatch(3);
@Override
public void run(String... args) throws Exception {
this.template.send("myTopic", "foo1");
this.template.send("myTopic", "foo2");
this.template.send("myTopic", "foo3");
latch.await(60, TimeUnit.SECONDS);
logger.info("All received");
}
@KafkaListener(topics = "myTopic")
public void listen(ConsumerRecord<?, ?> cr) throws Exception {
logger.info(cr.toString());
latch.countDown();
}
}
Kafka進階
通信原理
消息是 kafka 中最基本的數(shù)據(jù)單元姊舵,在 kafka 中,一條消息由 key寓落、 value 兩部分構(gòu)成括丁,在發(fā)送一條消息時,我們可以指定這個 key伶选,那么 producer 會根據(jù) key 和 partition 機制來判斷當前這條消息應該發(fā)送并存儲到哪個 partition 中史飞。我們可以根據(jù)需要進行擴展 producer 的 partition 機制。
消息默認的分發(fā)機制
默認情況下考蕾,kafka 采用的是 hash 取模的分區(qū)算法祸憋。如果Key 為 null,則會隨機分配一個分區(qū)肖卧。這個隨機是在這個參數(shù)”metadata.max.age.ms”的時間范圍內(nèi)隨機選擇一個蚯窥。對于這個時間段內(nèi),如果 key 為 null,則只會發(fā)送到唯一的分區(qū)拦赠。這個值值哦默認情況下是 10 分鐘更新一次巍沙。
關(guān)于 Metadata ,這個之前沒講過荷鼠,簡單理解就是T opic/Partition 和 broker 的映射關(guān)系句携,每一個 topic 的每一個 partition,需要知道對應的 broker 列表是什么允乐, leader是誰矮嫉、 follower 是誰。這些信息都是存儲在 Metadata 這個類里面牍疏。
消費端如何消費指定的分區(qū)
//通過下面的代碼蠢笋,就可以消費指定該 topic 下的 0 號分區(qū)。其他分區(qū)的數(shù)據(jù)就無法接收
//消費指定分區(qū)的時候鳞陨,不需要再訂閱
//kafkaConsumer.subscribe(Collections.singletonList(topic));
//消費指定的分區(qū)
TopicPartition topicPartition=new
TopicPartition(topic,0);
kafkaConsumer.assign(Arrays.asList(topicPartit
ion));
消費原理
在實際生產(chǎn)過程中昨寞,每個 topic 都會有多個 partitions,多個 partitions 的好處在于厦滤,一方面能夠?qū)?broker 上的數(shù)據(jù)進行分片有效減少了消息的容量從而提升 io 性能援岩。另外一方面,為了提高消費端的消費能力掏导,一般會通過多個consumer 去消費同一個 topic 享怀,也就是消費端的負載均衡機制,也就是我們接下來要了解的趟咆,在多個partition 以及多個 consumer 的情況下凹蜈,消費者是如何消費消息的同時,在上一節(jié)課忍啸,我們講了, kafka 存在 consumer group的 概 念 履植, 也 就是 group.id 一樣 的 consumer 计雌,這些consumer 屬于一個 consumer group,組內(nèi)的所有消費者協(xié)調(diào)在一起來消費訂閱主題的所有分區(qū)玫霎。當然每一個分區(qū)只能由同一個消費組內(nèi)的 consumer 來消費凿滤,那么同一個consumer group 里面的 consumer 是怎么去分配該消費哪個分區(qū)里的數(shù)據(jù)的呢?如下圖所示庶近, 3 個分區(qū)翁脆, 3 個消費者,那么哪個消費者消分哪個分區(qū)鼻种?
分區(qū)分配策略
在 kafka 中,存在兩種分區(qū)分配策略罢缸,一種是 Range(默認)篙贸、另 一 中 另 一 中 還 是 RoundRobin ( 輪 詢 )。 通過partition.assignment.strategy 這個參數(shù)來設置枫疆。
Range strategy(范圍分區(qū))
Range 策略是對每個主題而言的爵川,首先對同一個主題里面的分區(qū)按照序號進行排序,并對消費者按照字母順序進行排序息楔。假設我們有 10 個分區(qū)寝贡,3 個消費者,排完序的分區(qū)將會是 0, 1, 2, 3, 4, 5, 6, 7, 8, 9值依;消費者線程排完序?qū)荂1-0, C2-0, C3-0圃泡。然后將 partitions 的個數(shù)除于消費者線程的總數(shù)來決定每個消費者線程消費幾個分區(qū)。如果除不盡鳞滨,那么前面幾個消費者線程將會多消費一個分區(qū)洞焙。在我們的例子里面,我們有 10 個分區(qū)拯啦,3 個消費者線程澡匪, 10 / 3 = 3,而且除不盡褒链,那么消費者線程 C1-0 將會多消費一個分區(qū)唁情,所以最后分區(qū)分配的結(jié)果看起來是這樣的:
- C1-0 將消費 0, 1, 2, 3 分區(qū)
- C2-0 將消費 4, 5, 6 分區(qū)
- C3-0 將消費 7, 8, 9 分區(qū)
假如我們有 11 個分區(qū),那么最后分區(qū)分配的結(jié)果看起來是這樣的:
- C1-0 將消費 0, 1, 2, 3 分區(qū)
- C2-0 將消費 4, 5, 6, 7 分區(qū)
- C3-0 將消費 8, 9, 10 分區(qū)
假如我們有 2 個主題(T1 和 T2)甫匹,分別有 10 個分區(qū)甸鸟,那么最后分區(qū)分配的結(jié)果看起來是這樣的:
- C1-0 將消費 T1 主題的 0, 1, 2, 3 分區(qū)以及 T2 主題的 0, 1, 2, 3 分區(qū)
- C2-0 將消費 T1 主題的 4, 5, 6 分區(qū)以及 T2 主題的 4, 5, 6 分區(qū)
- C3-0 將消費 T1 主題的 7, 8, 9 分區(qū)以及 T2 主題的 7, 8, 9 分區(qū)
可以看出,C1-0 消費者線程比其他消費者線程多消費了 2 個分區(qū)兵迅,這就是 Range strategy 的一個很明顯的弊端
RoundRobin strategy(輪詢分區(qū))
輪詢分區(qū)策略是把所有 partition 和所有 consumer 線程都列出來抢韭,然后按照 hashcode 進行排序。最后通過輪詢算法分配 partition 給消費線程恍箭。如果所有 consumer 實例的訂閱是相同的刻恭,那么 partition 會均勻分布。
在我們的例子里面扯夭,假如按照 hashCode 排序完的 topicpartitions 組依次為 T1-5, T1-3, T1-0, T1-8, T1-2, T1-1, T1-4, T1-7, T1-6, T1-9鳍贾,我們的消費者線程排序為 C1-0, C1-1, C2-0, C2-1,最后分區(qū)分配的結(jié)果為:
- C1-0 將消費 T1-5, T1-2, T1-6 分區(qū)交洗;
- C1-1 將消費 T1-3, T1-1, T1-9 分區(qū)骑科;
- C2-0 將消費 T1-0, T1-4 分區(qū);
- C2-1 將消費 T1-8, T1-7 分區(qū)构拳;
使用輪詢分區(qū)策略必須滿足兩個條件
- 每個主題的消費者實例具有相同數(shù)量的流
- 每個消費者訂閱的主題必須是相同的
什么時候會觸發(fā)這個策略呢咆爽?
當出現(xiàn)以下幾種情況時梁棠,kafka 會進行一次分區(qū)分配操作,
也就是 kafka consumer 的 rebalance
- 同一個 consumer group 內(nèi)新增了消費者
- 消費者離開當前所屬的 consumer group伍掀,比如主動停機或者宕機
- topic 新增了分區(qū)(也就是分區(qū)數(shù)量發(fā)生了變化)
kafka consuemr 的 rebalance 機制規(guī)定了一個 consumergroup 下的所有 consumer 如何達成一致來分配訂閱 topic的每個分區(qū)掰茶。而具體如何執(zhí)行分區(qū)策略,就是前面提到過的兩種內(nèi)置的分區(qū)策略蜜笤。而 kafka 對于分配策略這塊濒蒋,提供了可插拔的實現(xiàn)方式, 也就是說把兔,除了這兩種之外沪伙,我們還可以創(chuàng)建自己的分配機制。
什么時候會觸發(fā)這個策略呢县好?
當出現(xiàn)以下幾種情況時围橡,kafka 會進行一次分區(qū)分配操作,也就是 kafka consumer 的 rebalance
- 同一個 consumer group 內(nèi)新增了消費者
- 消費者離開當前所屬的 consumer group缕贡,比如主動停機或者宕機
- topic 新增了分區(qū)(也就是分區(qū)數(shù)量發(fā)生了變化)kafka consuemr 的 rebalance 機制規(guī)定了一個 consumergroup 下的所有 consumer 如何達成一致來分配訂閱 topic的每個分區(qū)翁授。而具體如何執(zhí)行分區(qū)策略,就是前面提到過的兩種內(nèi)置的分區(qū)策略晾咪。而 kafka 對于分配策略這塊收擦,提供了可插拔的實現(xiàn)方式, 也就是說谍倦,除了這兩種之外塞赂,我們還可以創(chuàng)建自己的分配機制。
誰來執(zhí)行 Rebalance 以及管理 consumer 的 group 呢昼蛀?
Kafka 提供了一個角色: coordinator 來執(zhí)行對于 consumer group 的管理,Kafka 提供了一個角色:coordinator 來執(zhí)行對于 consumer group 的管理仇哆,當 consumer group 的第一個 consumer 啟動的時候夫植,它會去和 kafka server 確定誰是它們組的 coordinator。之后該 group 內(nèi)的所有成員都會和該 coordinator 進行協(xié)調(diào)通信
如何確定 coordinator
consumer group 如何確定自己的 coordinator 是誰呢, 消費 者 向 kafka 集 群 中 的 任 意 一 個 broker 發(fā) 送 一 個GroupCoordinatorRequest 請求,服務端會返回一個負載最 小 的 broker 節(jié) 點 的 id 撞羽, 并 將 該 broker 設 置 為coordinator
JoinGroup 的過程
在 rebalance 之前阐斜,需要保證 coordinator 是已經(jīng)確定好了的,整個 rebalance 的過程分為兩個步驟诀紊,Join 和 Syncjoin: 表示加入到 consumer group 中谒出,在這一步中,所有的成員都會向 coordinator 發(fā)送 joinGroup 的請求。一旦所有成員都發(fā)送了 joinGroup 請求笤喳,那么 coordinator 會選擇一個 consumer 擔任 leader 角色为居,并把組成員信息和訂閱信息發(fā)送消費者
protocol_metadata: 序列化后的消費者的訂閱信息
leader_id: 消費組中的消費者蒙畴,coordinator 會選擇一個座位 leader,對應的就是 member_id
member_metadata 對應消費者的訂閱信息
members:consumer group 中全部的消費者的訂閱信息
generation_id: 年代信息呜象,類似于之前講解 zookeeper 的時候的 epoch 是一樣的膳凝,對于每一輪 rebalance
generation_id 都會遞增。主要用來保護 consumer group恭陡。隔離無效的 offset 提交蹬音。也就是上一輪的 consumer 成員無法提交 offset 到新的 consumer group 中。
Synchronizing Group State 階段
完成分區(qū)分配之后休玩,就進入了 Synchronizing Group State階段著淆,主要邏輯是向 GroupCoordinator 發(fā)送SyncGroupRequest 請求,并且處理 SyncGroupResponse響應拴疤,簡單來說,就是 leader 將消費者對應的 partition 分配方案同步給 consumer group 中的所有 consumer
每個消費者都會向 coordinator 發(fā)送 syncgroup 請求晨炕,不過只有 leader 節(jié)點會發(fā)送分配方案,其他消費者只是打打醬油而已费奸。當 leader 把方案發(fā)給 coordinator 以后,coordinator 會把結(jié)果設置到 SyncGroupResponse 中缨历。這樣所有成員都知道自己應該消費哪個分區(qū)辛孵。
? consumer group 的分區(qū)分配方案是在客戶端執(zhí)行的宝与!Kafka 將這個權(quán)利下放給客戶端主要是因為這樣做可以有更好的靈活性
如何保存消費端的消費位置
什么是 offset
前面在講partition 的時候,提到過 offset榜聂, 每個 topic可以劃分多個分區(qū)(每個 Topic 至少有一個分區(qū)),同一topic 下的不同分區(qū)包含的消息是不同的豌汇。每個消息在被添加到分區(qū)時,都會被分配一個 offset(稱之為偏移量)逻澳,它是消息在此分區(qū)中的唯一編號, kafka 通過 offset 保證消息在分區(qū)內(nèi)的順序瓤逼, offset 的順序不跨分區(qū),即 kafka 只保證在同一個分區(qū)內(nèi)的消息是有序的定硝; 對于應用層的消費來說,每次消費一個消息并且提交以后箱蟆,會保存當前消費到的最近的一個 offset。那么 offset 保存在哪里辈毯?
offset 在哪里維護?
在 kafka 中饺蔑,提供了一個_consumer_offsets的一個topic 孔祸,把 offset 信息寫入到這個topic中。
_consumer_offsets——按保存了每個 consumer group某一時刻提交的 offset 信息一铅。 consumer_offsets 默認有50 個分區(qū)拇涤。
消息的存儲原理
消息的保存路徑
消息發(fā)送端發(fā)送消息到 broker 上以后慢哈,消息是如何持久化的呢卵贱?那么接下來去分析下消息的存儲
首先我們需要了解的是, kafka 是使用日志文件的方式來保存生產(chǎn)者和發(fā)送者的消息编振,每條消息都有一個 offset 值來表示它在分區(qū)中的偏移量。 Kafka 中存儲的一般都是海量的消息數(shù)據(jù)杯瞻,為了避免日志文件過大,Log 并不是直接對應在一個磁盤上的日志文件旗唁,而是對應磁盤上的一個目錄,這個目錄的明明規(guī)則是<topic_name>_<partition_id>比如創(chuàng)建一個名為 firstTopic 的 topic屎媳,其中有 3 個 partition,那么在 kafka 的數(shù)據(jù)目錄(/tmp/kafka-log)中就有 3 個目錄丹禀,firstTopic-0~3
多個分區(qū)在集群中的分配
如果我們對于一個 topic持搜,在集群中創(chuàng)建多個 partition朵诫,那么 partition 是如何分布的呢邓梅?
1.將所有 N Broker 和待分配的 i 個 Partition 排序
2.將第 i 個 Partition 分配到第(i mod n)個 Broker 上
了解到這里的時候,大家再結(jié)合前面講的消息分發(fā)策略,就應該能明白消息發(fā)送到 broker 上外里,消息會保存到哪個分區(qū)中,并且消費端應該消費哪些分區(qū)的數(shù)據(jù)了墩莫。
消息寫入的性能
我們現(xiàn)在大部分企業(yè)仍然用的是機械結(jié)構(gòu)的磁盤,如果把消息以隨機的方式寫入到磁盤,那么磁盤首先要做的就是尋址愕秫,也就是定位到數(shù)據(jù)所在的物理地址符喝,在磁盤上就要找到對應的柱面协饲、磁頭以及對應的扇區(qū)把夸;這個過程相對內(nèi)存來說會消耗大量時間膀篮,為了規(guī)避隨機讀寫帶來的時間消耗, kafka 采用順序?qū)懙姆绞酱鎯?shù)據(jù)烤黍。
頁緩存
順序?qū)懭胧荎afka高吞吐量的一個原因,當然即使采用的是磁盤的順序?qū)懭耄敲匆彩菦]有辦法和內(nèi)存相比的唉锌。因為為了再一次提高Kakfa的吞吐量泛啸,Kafka采用了Memory Mapped Files
(后面簡稱mmap)也被翻譯成內(nèi)存映射文件 ,它的工作原理是直接利用操作系統(tǒng)的page cache 來實現(xiàn)文件到物理內(nèi)存的直接映射,完成映射之后你對物理內(nèi)存的操作會被同步到硬盤上(操作系統(tǒng)在適當?shù)臅r候)吕粹。
操作系統(tǒng)本身有一層緩存种柑,叫做page cache,是在內(nèi)存里的緩存匹耕,我們也可以稱之為os cache聚请,意思就是操作系統(tǒng)自己管理的緩存。你在寫入磁盤文件的時候稳其,可以直接寫入這個os cache里,也就是僅僅寫入內(nèi)存中返帕,接下來由操作系統(tǒng)自己決定什么時候把os cache里的數(shù)據(jù)真的刷入磁
盤文件中(每5秒檢查一次是否需要將頁緩存數(shù)據(jù)同步到磁盤文件)踊谋。僅僅這一個步驟鞭呕,就可以將磁盤文件寫性能提升很多了,因為其實這里相當于是在寫內(nèi)存胡控,不是在寫磁盤.
零拷貝
消息從發(fā)送到落地保存凡傅,broker 維護的消息日志本身就是文件目錄,每個文件都是二進制保存懂鸵,生產(chǎn)者和消費者使用相同的格式來處理。在消費者獲取消息時摸航,服務器先從硬盤讀取數(shù)據(jù)到內(nèi)存,然后把內(nèi)存中的數(shù)據(jù)原封不動的通過 socket 發(fā)送給消費者。雖然這個操作描述起來很簡單,但實際上經(jīng)歷了很多步驟娃惯。
? 操作系統(tǒng)將數(shù)據(jù)從磁盤讀入到內(nèi)核空間的頁緩存
? 應用程序?qū)?shù)據(jù)從內(nèi)核空間讀入到用戶空間緩存中
? 應用程序?qū)?shù)據(jù)寫回到內(nèi)核空間到 socket 緩存中
? 操作系統(tǒng)將數(shù)據(jù)從 socket 緩沖區(qū)復制到網(wǎng)卡緩沖區(qū),以便將數(shù)據(jù)經(jīng)網(wǎng)絡發(fā)出
這個過程涉及到 4 次上下文切換以及 4 次數(shù)據(jù)復制鬼悠,并且有兩次復制操作是由 CPU 完成客给。但是這個過程中桩引,數(shù)據(jù)完全沒有進行變化忱详,僅僅是從磁盤復制到網(wǎng)卡緩沖區(qū)。
通過“ 零拷貝 ”技術(shù)澜术,可以去掉這些沒必要的數(shù)據(jù)復制操作,同時也會減少上下文切換次數(shù)♀現(xiàn)代的 unix 操作系統(tǒng)提供一個優(yōu)化的代碼路徑鸟废,用于將數(shù)據(jù)從頁緩存?zhèn)鬏數(shù)?socket;在 Linux 中漆诽,是通過 sendfile 系統(tǒng)調(diào)用來完成的侮攀。Java 提供了訪問這個系統(tǒng)調(diào)用的方法: FileChannel.transferTo API
使用 sendfile兰英,只需要一次拷貝就行,允許操作系統(tǒng)將數(shù)據(jù)直接從頁緩存發(fā)送到網(wǎng)絡上供鸠。所以在這個優(yōu)化的路徑中畦贸,只有最后一步將數(shù)據(jù)拷貝到網(wǎng)卡緩存中是需要的
消息的文件存儲機制
前面我們知道了一個 topic 的多個 partition 在物理磁盤上的保存路徑薄坏,那么我們再來分析日志的存儲方式。通過如下命令找到對應 partition 下的日志內(nèi)容
[root@localhost ~]# ls /tmp/kafka-logs/firstTopic-1/00000000000000000000.index 00000000000000000000.log 00000000000000000000.timeindex leader-epochcheckpoint
kafka 是通過分段的方式將 Log 分為多個 LogSegment寨闹,LogSegment 是一個邏輯上的概念胶坠,一個 LogSegment 對應磁盤上的一個日志文件和一個索引文件,其中日志文件是用來記錄消息的繁堡。索引文件是用來保存消息的索引沈善。那么這個 LogSegment 是什么呢?
LogSegment
假設 kafka 以 partition 為最小存儲單位椭蹄,那么我們可以想象當 kafka producer 不斷發(fā)送消息闻牡,必然會引起 partition文件的無線擴張,這樣對于消息文件的維護以及被消費的消息的清理帶來非常大的挑戰(zhàn)绳矩,所以 kafka 以 segment 為單位又把 partition 進行細分罩润。每個 partition 相當于一個巨型文件被平均分配到多個大小相等的 segment 數(shù)據(jù)文件中(每個 segment 文件中的消息不一定相等),這種特性方便已經(jīng)被消費的消息的清理翼馆,提高磁盤的利用率割以。
? log.segment.bytes=107370 ( 設置分段大小 ), 默認是1gb金度,我們把這個值調(diào)小以后,可以看到日志分段的效果
? 抽取其中 3 個分段來進行分析
segment file 由 2 大部分組成,分別為 index file 和 data file祝峻,此 2 個文件一一對應魔吐,成對出現(xiàn),后綴".index"和“.log”分別表示為 segment 索引文件莱找、數(shù)據(jù)文件.segment 文件命名規(guī)則:partion 全局的第一個 segment從 0 開始酬姆,后續(xù)每個 segment 文件名為上一個 segment文件最后一條消息的 offset 值進行遞增。數(shù)值最大為 64 位long 大小奥溺,20 位數(shù)字字符長度辞色,沒有數(shù)字用 0 填。
segment 中 index 和 log 的對應關(guān)系
從所有分段中浮定,找一個分段進行分析為了提高查找消息的性能相满,為每一個日志文件添加 2 個索引索引文件: OffsetIndex 和 TimeIndex,分別對應 .index以及 .timeindex, TimeIndex 索引文件格式:它是映射時間戳和相對offset
查 看 索 引 內(nèi) 容 : sh kafka-run-class.sh
kafka.tools.DumpLogSegments --files /tmp/kafkalogs/test-0/00000000000000000000.index --print-datalog
如圖所示,index 中存儲了索引以及物理偏移量方灾。 log 存儲了消息的內(nèi)容建蹄。索引文件的元數(shù)據(jù)執(zhí)行對應數(shù)據(jù)文件中
message 的物理偏移地址。舉個簡單的案例來說裕偿,以[4053,80899]為例洞慎,在 log 文件中,對應的是第 4053 條記錄嘿棘,物理偏移量( position )為 80899. position 是ByteBuffer 的指針位置
在 partition 中如何通過 offset 查找 message
- 根據(jù) offset 的值劲腿,查找 segment 段中的 index 索引文件。由于索引文件命名是以上一個文件的最后一個offset 進行命名的鸟妙,所以焦人,使用二分查找算法能夠根據(jù)offset 快速定位到指定的索引文件。
- 找到索引文件后圆仔,根據(jù) offset 進行定位垃瞧,找到索引文件中的符合范圍的索引蔫劣。(kafka 采用稀疏索引的方式來提高查找性能)
- 得到 position 以后坪郭,再到對應的 log 文件中,從 position出開始查找 offset 對應的消息脉幢,將每條消息的 offset 與目標 offset 進行比較歪沃,直到找到消息
比如說嗦锐,我們要查找 offset=2490 這條消息,那么先找到00000000000000000000.index, 然后找到[2487,49111]這個索引沪曙,再到 log 文件中奕污,根據(jù) 49111 這個 position 開始查找,比較每條消息的 offset 是否大于等于 2490液走。最后查找到對應的消息以后返回
日志清除策略
前面提到過碳默,日志的分段存儲,一方面能夠減少單個文件內(nèi)容的大小缘眶,另一方面嘱根,方便 kafka 進行日志清理。日志的清理策略有兩個
- 根據(jù)消息的保留時間巷懈,當消息在 kafka 中保存的時間超過了指定的時間该抒,就會觸發(fā)清理過程
- 根據(jù) topic 存儲的數(shù)據(jù)大小,當 topic 所占的日志文件大小大于一定的閥值顶燕,則可以開始刪除最舊的消息凑保。 kafka會啟動一個后臺線程,定期檢查是否存在可以刪除的消息
通過 log.retention.bytes 和 log.retention.hours 這兩個參數(shù)來設置涌攻,當其中任意一個達到要求欧引,都會執(zhí)行刪除。默認的保留時間是:7 天
日志壓縮策略
Kafka 還提供了“日志壓縮(Log Compaction)”功能癣漆,通過這個功能可以有效的減少日志文件的大小维咸,緩解磁盤緊張的情況,在很多實際場景中惠爽,消息的 key 和 value 的值之間的對應關(guān)系是不斷變化的癌蓖,就像數(shù)據(jù)庫中的數(shù)據(jù)會不斷被修改一樣,消費者只關(guān)心 key 對應的最新的 value婚肆。 因此租副,我們可以開啟 kafka 的日志壓縮功能,服務端會在后臺啟動啟動 Cleaner 線程池较性,定期將相同的 key 進行合并用僧,只保留最新的 value 值。日志的壓縮原理是
partition 的高可用副本機制
我們已經(jīng)知道 Kafka 的每個 topic 都可以分為多個 Partition,并且多個 partition 會均勻分布在集群的各個節(jié)點下攀操。雖然這種方式能夠有效的對數(shù)據(jù)進行分片院仿,但是對于每個partition 來說,都是單點的,當其中一個 partition 不可用的時候歹垫,那么這部分消息就沒辦法消費剥汤。所以 kafka 為了提高 partition 的可靠性而提供了副本的概念(Replica) ,通過副本機制來實現(xiàn)冗余備份。每個分區(qū)可以有多個副本排惨,并且在副本集合中會存在一個leader 的副本吭敢,所有的讀寫請求都是由 leader 副本來進行處理。剩余的其他副本都做為 follower 副本暮芭,follower 副本 會 從 leader 佛 本 同 步 笑 息 日 志 鹿驼。 這 個 有 點 類 似zookeeper 中 leader 和 follower 的概念,但是具體的時間方式還是有比較大的差異辕宏。所以我們可以認為蠢沿,副本集會存在一主多從的關(guān)系。
一般情況下匾效,同一個分區(qū)的多個副本會被均勻分配到集群中的不同 broker 上舷蟀,當 leader 副本所在的 broker 出現(xiàn)故障后,可以重新選舉新的 leader 副本繼續(xù)對外提供服務面哼。通過這樣的副本機制來提高 kafka 集群的可用性野宜。
副本分配算法
將所有 N Broker 和待分配的 i 個 Partition 排序.
將第 i 個 Partition 分配到第(i mod n)個 Broker 上.
將第 i 個 Partition 的第 j 個副本分配到第((i + j) mod n)個Broker 上.
kafka 副本機制中的幾個概念
Kafka 分區(qū)下有可能有很多個副本(replica)用于實現(xiàn)冗余,從而進一步實現(xiàn)高可用魔策。副本根據(jù)角色的不同可分為 3 類:
leader 副本:響應 clients 端讀寫請求的副本
follower 副本:被動的備份 leader 副本中的數(shù)據(jù)匈子,不能響應 clients 端讀寫請求。
ISR 副本:包含了 leader 副本和所有與 leader 副本保持同步的 follower 副本——如何判定是否與 leader 同步后面會提到每個 Kafka 副本對象都有兩個重要的屬性:LEO 和HW闯袒。注意是所有的副本虎敦,而不只是 leader 副本。
LEO:即日志末端位移(log end offset)政敢,記錄了該副本底層日志(log)中下一條消息的位移值其徙。注意是下一條消息!也就是說喷户,如果 LEO=10唾那,那么表示該副本保存了 10 條消息,位移值范圍是[0, 9]褪尝。另外闹获, leader LEO 和follower LEO 的更新是有區(qū)別的。我們后面會詳細說
HW:即上面提到的水位值河哑。對于同一個副本對象而言避诽,其
HW 值不會大于 LEO 值。小于等于 HW 值的所有消息都被認為是“ 已備份” 的(replicated )璃谨。同理沙庐, leader 副本和follower 副本的 HW 更新是有區(qū)別的
副本協(xié)同機制
剛剛提到了,消息的讀寫操作都只會由 leader 節(jié)點來接收和處理。follower 副本只負責同步數(shù)據(jù)以及當 leader 副本所在的 broker 掛了以后轨功,會從 follower 副本中選取新的leader。
請求首先由 Leader 副本處理,之后 follower 副本會從leader 上拉取寫入的消息花盐,這個過程會有一定的延遲羡滑,導致 follower 副本中保存的消息略少于 leader 副本,但是只要沒有超出閾值都可以容忍算芯。但是如果一個 follower 副本出現(xiàn)異常柒昏,比如宕機、網(wǎng)絡斷開等原因長時間沒有同步到消息熙揍,那這個時候职祷, leader 就會把它踢出去。 kafka 通過 ISR集合來維護一個分區(qū)副本信息
HW&LEO
關(guān)于 follower 副本同步的過程中届囚,還有兩個關(guān)鍵的概念有梆,HW(HighWatermark)和 LEO(Log End Offset). 這兩個參數(shù)跟 ISR 集合緊密關(guān)聯(lián)。 HW 標記了一個特殊的 offset意系,當消費者處理消息的時候泥耀,只能拉去到 HW 之前的消息, HW之后的消息對消費者來說是不可見的蛔添。也就是說痰催,取partition 對應 ISR 中最小的 LEO 作為 HW,consumer 最多只能消費到 HW 所在的位置迎瞧。每個 replica 都有 HW夸溶,leader 和 follower 各自維護更新自己的 HW 的狀態(tài)。一條消息只有被 ISR 里的所有 Follower 都從 Leader 復制過去才會被認為已提交凶硅。這樣就避免了部分數(shù)據(jù)被寫進了Leader蜘醋,還沒來得及被任何 Follower 復制就宕機了,而造成數(shù)據(jù)丟失(Consumer 無法消費這些數(shù)據(jù))咏尝。而對于Producer 而言压语,它可以選擇是否等待消息 commit,這可以通過 acks 來設置编检。這種機制確保了只要 ISR 有一個或以上的 Follower胎食,一條被 commit 的消息就不會丟失。
數(shù)據(jù)的同步過程
了解了副本的協(xié)同過程以后允懂,還有一個最重要的機制厕怜,就是數(shù)據(jù)的同步過程。它需要解決
- 怎么傳播消息
- 在向消息發(fā)送端返回 ack 之前需要保證多少個 Replica
已經(jīng)接收到這個消息
初始狀態(tài)
初始狀態(tài)下,leader 和 follower 的 HW 和 LEO 都是 0粥航,leader 副本會保存 remote LEO琅捏,表示所有 follower LEO,也會被初始化為 0递雀,這個時候柄延,producer 沒有發(fā)送消息。follower 會不斷地個 leader 發(fā)送 FETCH 請求缀程,但是因為沒有數(shù)據(jù)搜吧,這個請求會被 leader 寄存,當在指定的時間之后會 強 制 完 成 請 秋 杨凑, 這 個 時 見 配 致 是(replica.fetch.wait.max.ms)滤奈,如果在指定時間內(nèi) producer有消息發(fā)送過來,那么 kafka 會喚醒 fetch 請求撩满,讓 leader繼續(xù)處理
這里會分兩種情況伺帘,第一種是 leader 處理完 producer 請求之后搞糕,follower 發(fā)送一個 fetch 請求過來、第二種是follower 阻塞在 leader 指定時間之內(nèi)曼追,leader 副本收到producer 的請求窍仰。這兩種情況下處理方式是不一樣的。先來看第一種情況
一礼殊、follower 的 fetch 請求是當 leader 處理消息以后執(zhí)行的
leader 處理完 producer 請求之后驹吮,follower 發(fā)送一個fetch 請求過來 。狀態(tài)圖如下
leader 副本收到請求以后,會做幾件事情
- 把消息追加到 log 文件婚陪,同時更新 leader 副本的 LEO
- 嘗試更新 leader HW 值族沃。這個時候由于 follower 副本還沒有發(fā)送 fetch 請求,那么 leader 的 remote LEO 仍然是 0泌参。leader 會比較自己的 LEO 以及 remote LEO 的值發(fā)現(xiàn)最小值是 0脆淹,與 HW 的值相同,所以不會更新 HW
follower fetch 消息
follower 發(fā)送 fetch 請求,leader 副本的處理邏輯是:
- 讀取 log 數(shù)據(jù)铣缠、更新 remote LEO=0(follower 還沒有寫入這條消息烘嘱,這個值是根據(jù) follower 的 fetch 請求中的offset 來確定的)
- 嘗試更新 HW昆禽,因為這個時候 LEO 和 remoteLEO 還是不一致,所以仍然是 HW=0
- 把消息內(nèi)容和當前分區(qū)的 HW 只發(fā)送給 follower 副本follower 副本收到 response 以后
- 將消息寫入到本地 log蝇庭,同時更新 follower 的 LEO
- 更新 follower HW醉鳖,本地的 LEO 和 leader 返回的 HW進行比較取小的值,所以仍然是 0第一次交互結(jié)束以后哮内, HW 仍然還是 0盗棵,這個值會在下一次follower 發(fā)起 fetch 請求時被更新
follower 發(fā)第二次 fetch 請求,leader 收到請求以后
- 讀取 log 數(shù)據(jù)
- 更新 remote LEO=1泰涂, 因為這次 fetch 攜帶的 offset 是1.
- 更新當前分區(qū)的 HW鲫竞,這個時候 leader LEO 和 remoteLEO 都是 1,所以 HW 的值也更新為 1
- 把數(shù)據(jù)和當前分區(qū)的 HW 值返回給 follower 副本逼蒙,這個時候如果沒有數(shù)據(jù)从绘,則返回為空
follower 副本收到 response 以后
- 如果有數(shù)據(jù)則寫本地日志,并且更新 LEO
- 更新 follower 的 HW 值 到目前為止是牢,數(shù)據(jù)的同步就完成了僵井,意味著消費端能夠消費 offset=0 這條消息。
二驳棱、follower 的 fetch 請求是直接從阻塞過程中觸發(fā)
前面說過批什,由于 leader 副本暫時沒有數(shù)據(jù)過來,所以follower 的 fetch 會被阻塞社搅,直到等待超時或者 leader 接收到新的數(shù)據(jù)驻债。當 leader 收到請求以后會喚醒處于阻塞的fetch 請求。處理過程基本上和前面說的一直
- leader 將消息寫入本地日志形葬,更新 Leader 的 LEO
- 喚醒 follower 的 fetch 請求
- 更新 HWkafka 使用 HW 和 LEO 的方式來實現(xiàn)副本數(shù)據(jù)的同步合呐,本身是一個好的設計,但是在這個地方會存在一個數(shù)據(jù)丟失的問題笙以,當然這個丟失只出現(xiàn)在特定的背景下淌实。我們回想一下, HW 的值是在新的一輪 FETCH 中才會被更新猖腕。我們分析下這個過程為什么會出現(xiàn)數(shù)據(jù)丟失
由于篇幅限制的原因拆祈,只能將文章展示到這里,希望可以對大家學習Kafka有幫助倘感,喜歡的小伙伴可以幫忙轉(zhuǎn)發(fā)+關(guān)注缘屹,感謝大家~