1. 整體架構(gòu)概述
Kafka 的整體架構(gòu)主要由 Producer(生產(chǎn)者)伐庭、Broker(代理服務(wù)器)、Consumer(消費(fèi)者)和 ZooKeeper 組成。生產(chǎn)者負(fù)責(zé)將消息發(fā)送到 Kafka 的主題(Topic)中,Broker 是 Kafka 的核心服務(wù)西设,負(fù)責(zé)存儲(chǔ)和管理消息,消費(fèi)者從主題中訂閱并消費(fèi)消息答朋,ZooKeeper 則用于集群的元數(shù)據(jù)管理和協(xié)調(diào)贷揽。
2. 網(wǎng)絡(luò)通信模塊
2.1 NIO 網(wǎng)絡(luò)模型
Kafka 使用 Java 的 NIO(Non - Blocking I/O)來實(shí)現(xiàn)高效的網(wǎng)絡(luò)通信。在 KafkaServer
啟動(dòng)時(shí)梦碗,會(huì)創(chuàng)建 SocketServer
來處理網(wǎng)絡(luò)連接禽绪。SocketServer
包含多個(gè) Acceptor
線程和多個(gè) Processor
線程。
-
Acceptor 線程:負(fù)責(zé)監(jiān)聽客戶端的連接請(qǐng)求洪规,當(dāng)有新的連接到來時(shí)印屁,將其分配給一個(gè)
Processor
線程。 -
Processor 線程:負(fù)責(zé)處理客戶端的請(qǐng)求和響應(yīng)斩例。每個(gè)
Processor
線程維護(hù)一個(gè)Selector
雄人,用于監(jiān)聽多個(gè)客戶端連接的讀寫事件。
以下是 Acceptor
線程的部分關(guān)鍵代碼:
class Acceptor extends AbstractServerThread {
private final ServerSocketChannel serverChannel;
private final List<Processor> processors;
private int currentProcessor = 0;
public Acceptor(String name, int port, List<Processor> processors) throws IOException {
super(name);
this.serverChannel = openServerSocket(port);
this.processors = processors;
}
@Override
public void run() {
serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT);
while (isRunning()) {
try {
nioSelector.select(500);
Set<SelectionKey> keys = nioSelector.selectedKeys();
Iterator<SelectionKey> iter = keys.iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
if (key.isAcceptable()) {
accept(key);
}
}
} catch (IOException e) {
// 處理異常
}
}
}
private void accept(SelectionKey key) throws IOException {
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
SocketChannel socketChannel = serverSocketChannel.accept();
try {
socketChannel.configureBlocking(false);
Processor processor = nextProcessor();
processor.accept(socketChannel);
} catch (Exception e) {
// 處理異常
}
}
private Processor nextProcessor() {
Processor processor = processors.get(currentProcessor);
currentProcessor = (currentProcessor + 1) % processors.size();
return processor;
}
}
2.2 請(qǐng)求處理流程
當(dāng) Processor
線程接收到客戶端的請(qǐng)求后,會(huì)將請(qǐng)求封裝成 RequestChannel.Request
對(duì)象础钠,并將其放入 RequestChannel
的請(qǐng)求隊(duì)列中恰力。KafkaRequestHandlerPool
中的多個(gè) KafkaRequestHandler
線程會(huì)從請(qǐng)求隊(duì)列中取出請(qǐng)求進(jìn)行處理。處理完成后旗吁,將響應(yīng)結(jié)果封裝成 RequestChannel.Response
對(duì)象踩萎,并返回給客戶端。
3. 存儲(chǔ)模塊
3.1 日志文件結(jié)構(gòu)
Kafka 的消息存儲(chǔ)在日志文件中很钓,每個(gè)主題的每個(gè)分區(qū)對(duì)應(yīng)一個(gè)日志目錄香府,日志目錄下包含多個(gè)日志段(Log Segment)文件。每個(gè)日志段文件由一個(gè) .log
文件和一個(gè) .index
文件組成码倦。
- .log 文件:存儲(chǔ)實(shí)際的消息數(shù)據(jù)企孩。
- .index 文件:存儲(chǔ)消息的偏移量和對(duì)應(yīng)的物理文件位置的索引信息,用于快速定位消息叹洲。
3.2 日志追加和讀取
當(dāng)生產(chǎn)者發(fā)送消息時(shí)柠硕,Kafka 會(huì)將消息追加到對(duì)應(yīng)的日志段文件中。在 Log
類中运提,有 append
方法用于實(shí)現(xiàn)消息的追加操作。當(dāng)消費(fèi)者讀取消息時(shí)闻葵,會(huì)根據(jù)偏移量和索引信息在日志文件中查找對(duì)應(yīng)的消息民泵。
以下是 Log
類的 append
方法的部分關(guān)鍵代碼:
public class Log {
private final List<LogSegment> segments;
public synchronized LogAppendInfo append(MemoryRecords records, boolean assignOffsets) {
LogSegment segment = maybeRoll(records.sizeInBytes());
return segment.append(records, assignOffsets);
}
private LogSegment maybeRoll(int size) {
LogSegment lastSegment = segments.get(segments.size() - 1);
if (lastSegment.size() + size > config.segmentSize) {
lastSegment = roll();
}
return lastSegment;
}
private LogSegment roll() {
// 創(chuàng)建新的日志段文件
LogSegment newSegment = LogSegment.open(/* 參數(shù) */);
segments.add(newSegment);
return newSegment;
}
}
4. 副本機(jī)制
4.1 副本的概念
Kafka 為了保證數(shù)據(jù)的可靠性和高可用性,采用了副本機(jī)制槽畔。每個(gè)分區(qū)可以有多個(gè)副本栈妆,其中一個(gè)副本作為領(lǐng)導(dǎo)者(Leader),負(fù)責(zé)處理客戶端的讀寫請(qǐng)求厢钧,其他副本作為追隨者(Follower)鳞尔,從領(lǐng)導(dǎo)者副本同步數(shù)據(jù)。
4.2 領(lǐng)導(dǎo)者選舉
當(dāng)領(lǐng)導(dǎo)者副本出現(xiàn)故障時(shí)早直,需要進(jìn)行領(lǐng)導(dǎo)者選舉寥假。Kafka 使用 ZooKeeper 來存儲(chǔ)分區(qū)的元數(shù)據(jù)信息,包括領(lǐng)導(dǎo)者副本的信息霞扬。當(dāng)領(lǐng)導(dǎo)者副本失效時(shí)糕韧,ZooKeeper 會(huì)觸發(fā)領(lǐng)導(dǎo)者選舉機(jī)制,從存活的追隨者副本中選舉出新的領(lǐng)導(dǎo)者喻圃。
4.3 數(shù)據(jù)同步
追隨者副本通過向領(lǐng)導(dǎo)者副本發(fā)送 Fetch 請(qǐng)求來同步數(shù)據(jù)萤彩。領(lǐng)導(dǎo)者副本接收到 Fetch 請(qǐng)求后,會(huì)將相應(yīng)的消息數(shù)據(jù)返回給追隨者副本斧拍。追隨者副本將接收到的數(shù)據(jù)追加到自己的日志文件中雀扶。
5. 消費(fèi)者組和偏移量管理
5.1 消費(fèi)者組
消費(fèi)者組是 Kafka 實(shí)現(xiàn)消息分發(fā)和負(fù)載均衡的重要概念。多個(gè)消費(fèi)者可以組成一個(gè)消費(fèi)者組肆汹,共同消費(fèi)一個(gè)主題的消息愚墓。Kafka 會(huì)將主題的分區(qū)均勻地分配給消費(fèi)者組中的消費(fèi)者予权。
5.2 偏移量管理
消費(fèi)者在消費(fèi)消息時(shí),需要記錄自己消費(fèi)到的位置转绷,即偏移量伟件。Kafka 將消費(fèi)者的偏移量信息存儲(chǔ)在一個(gè)特殊的主題 __consumer_offsets
中。消費(fèi)者在提交偏移量時(shí)议经,會(huì)將偏移量信息發(fā)送到 __consumer_offsets
主題中進(jìn)行存儲(chǔ)斧账。
6. 總結(jié)
Kafka 的源碼設(shè)計(jì)非常復(fù)雜且精妙,通過高效的網(wǎng)絡(luò)通信煞肾、合理的存儲(chǔ)管理咧织、可靠的副本機(jī)制和靈活的消費(fèi)者組管理,實(shí)現(xiàn)了高性能籍救、高可用性和分布式的消息隊(duì)列系統(tǒng)习绢。深入理解 Kafka 源碼有助于我們更好地使用和優(yōu)化 Kafka,同時(shí)也能為我們?cè)O(shè)計(jì)和實(shí)現(xiàn)其他分布式系統(tǒng)提供寶貴的經(jīng)驗(yàn)蝙昙。