Kafka 源碼深度解析

1. 整體架構(gòu)概述

Kafka 的整體架構(gòu)主要由 Producer(生產(chǎn)者)伐庭、Broker(代理服務(wù)器)、Consumer(消費(fèi)者)和 ZooKeeper 組成。生產(chǎn)者負(fù)責(zé)將消息發(fā)送到 Kafka 的主題(Topic)中,Broker 是 Kafka 的核心服務(wù)西设,負(fù)責(zé)存儲(chǔ)和管理消息,消費(fèi)者從主題中訂閱并消費(fèi)消息答朋,ZooKeeper 則用于集群的元數(shù)據(jù)管理和協(xié)調(diào)贷揽。

2. 網(wǎng)絡(luò)通信模塊

2.1 NIO 網(wǎng)絡(luò)模型

Kafka 使用 Java 的 NIO(Non - Blocking I/O)來實(shí)現(xiàn)高效的網(wǎng)絡(luò)通信。在 KafkaServer 啟動(dòng)時(shí)梦碗,會(huì)創(chuàng)建 SocketServer 來處理網(wǎng)絡(luò)連接禽绪。SocketServer 包含多個(gè) Acceptor 線程和多個(gè) Processor 線程。

  • Acceptor 線程:負(fù)責(zé)監(jiān)聽客戶端的連接請(qǐng)求洪规,當(dāng)有新的連接到來時(shí)印屁,將其分配給一個(gè) Processor 線程。
  • Processor 線程:負(fù)責(zé)處理客戶端的請(qǐng)求和響應(yīng)斩例。每個(gè) Processor 線程維護(hù)一個(gè) Selector雄人,用于監(jiān)聽多個(gè)客戶端連接的讀寫事件。

以下是 Acceptor 線程的部分關(guān)鍵代碼:

class Acceptor extends AbstractServerThread {
    private final ServerSocketChannel serverChannel;
    private final List<Processor> processors;
    private int currentProcessor = 0;

    public Acceptor(String name, int port, List<Processor> processors) throws IOException {
        super(name);
        this.serverChannel = openServerSocket(port);
        this.processors = processors;
    }

    @Override
    public void run() {
        serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT);
        while (isRunning()) {
            try {
                nioSelector.select(500);
                Set<SelectionKey> keys = nioSelector.selectedKeys();
                Iterator<SelectionKey> iter = keys.iterator();
                while (iter.hasNext()) {
                    SelectionKey key = iter.next();
                    iter.remove();
                    if (key.isAcceptable()) {
                        accept(key);
                    }
                }
            } catch (IOException e) {
                // 處理異常
            }
        }
    }

    private void accept(SelectionKey key) throws IOException {
        ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
        SocketChannel socketChannel = serverSocketChannel.accept();
        try {
            socketChannel.configureBlocking(false);
            Processor processor = nextProcessor();
            processor.accept(socketChannel);
        } catch (Exception e) {
            // 處理異常
        }
    }

    private Processor nextProcessor() {
        Processor processor = processors.get(currentProcessor);
        currentProcessor = (currentProcessor + 1) % processors.size();
        return processor;
    }
}

2.2 請(qǐng)求處理流程

當(dāng) Processor 線程接收到客戶端的請(qǐng)求后,會(huì)將請(qǐng)求封裝成 RequestChannel.Request 對(duì)象础钠,并將其放入 RequestChannel 的請(qǐng)求隊(duì)列中恰力。KafkaRequestHandlerPool 中的多個(gè) KafkaRequestHandler 線程會(huì)從請(qǐng)求隊(duì)列中取出請(qǐng)求進(jìn)行處理。處理完成后旗吁,將響應(yīng)結(jié)果封裝成 RequestChannel.Response 對(duì)象踩萎,并返回給客戶端。

3. 存儲(chǔ)模塊

3.1 日志文件結(jié)構(gòu)

Kafka 的消息存儲(chǔ)在日志文件中很钓,每個(gè)主題的每個(gè)分區(qū)對(duì)應(yīng)一個(gè)日志目錄香府,日志目錄下包含多個(gè)日志段(Log Segment)文件。每個(gè)日志段文件由一個(gè) .log 文件和一個(gè) .index 文件組成码倦。

  • .log 文件:存儲(chǔ)實(shí)際的消息數(shù)據(jù)企孩。
  • .index 文件:存儲(chǔ)消息的偏移量和對(duì)應(yīng)的物理文件位置的索引信息,用于快速定位消息叹洲。

3.2 日志追加和讀取

當(dāng)生產(chǎn)者發(fā)送消息時(shí)柠硕,Kafka 會(huì)將消息追加到對(duì)應(yīng)的日志段文件中。在 Log 類中运提,有 append 方法用于實(shí)現(xiàn)消息的追加操作。當(dāng)消費(fèi)者讀取消息時(shí)闻葵,會(huì)根據(jù)偏移量和索引信息在日志文件中查找對(duì)應(yīng)的消息民泵。

以下是 Log 類的 append 方法的部分關(guān)鍵代碼:

public class Log {
    private final List<LogSegment> segments;

    public synchronized LogAppendInfo append(MemoryRecords records, boolean assignOffsets) {
        LogSegment segment = maybeRoll(records.sizeInBytes());
        return segment.append(records, assignOffsets);
    }

    private LogSegment maybeRoll(int size) {
        LogSegment lastSegment = segments.get(segments.size() - 1);
        if (lastSegment.size() + size > config.segmentSize) {
            lastSegment = roll();
        }
        return lastSegment;
    }

    private LogSegment roll() {
        // 創(chuàng)建新的日志段文件
        LogSegment newSegment = LogSegment.open(/* 參數(shù) */);
        segments.add(newSegment);
        return newSegment;
    }
}

4. 副本機(jī)制

4.1 副本的概念

Kafka 為了保證數(shù)據(jù)的可靠性和高可用性,采用了副本機(jī)制槽畔。每個(gè)分區(qū)可以有多個(gè)副本栈妆,其中一個(gè)副本作為領(lǐng)導(dǎo)者(Leader),負(fù)責(zé)處理客戶端的讀寫請(qǐng)求厢钧,其他副本作為追隨者(Follower)鳞尔,從領(lǐng)導(dǎo)者副本同步數(shù)據(jù)。

4.2 領(lǐng)導(dǎo)者選舉

當(dāng)領(lǐng)導(dǎo)者副本出現(xiàn)故障時(shí)早直,需要進(jìn)行領(lǐng)導(dǎo)者選舉寥假。Kafka 使用 ZooKeeper 來存儲(chǔ)分區(qū)的元數(shù)據(jù)信息,包括領(lǐng)導(dǎo)者副本的信息霞扬。當(dāng)領(lǐng)導(dǎo)者副本失效時(shí)糕韧,ZooKeeper 會(huì)觸發(fā)領(lǐng)導(dǎo)者選舉機(jī)制,從存活的追隨者副本中選舉出新的領(lǐng)導(dǎo)者喻圃。

4.3 數(shù)據(jù)同步

追隨者副本通過向領(lǐng)導(dǎo)者副本發(fā)送 Fetch 請(qǐng)求來同步數(shù)據(jù)萤彩。領(lǐng)導(dǎo)者副本接收到 Fetch 請(qǐng)求后,會(huì)將相應(yīng)的消息數(shù)據(jù)返回給追隨者副本斧拍。追隨者副本將接收到的數(shù)據(jù)追加到自己的日志文件中雀扶。

5. 消費(fèi)者組和偏移量管理

5.1 消費(fèi)者組

消費(fèi)者組是 Kafka 實(shí)現(xiàn)消息分發(fā)和負(fù)載均衡的重要概念。多個(gè)消費(fèi)者可以組成一個(gè)消費(fèi)者組肆汹,共同消費(fèi)一個(gè)主題的消息愚墓。Kafka 會(huì)將主題的分區(qū)均勻地分配給消費(fèi)者組中的消費(fèi)者予权。

5.2 偏移量管理

消費(fèi)者在消費(fèi)消息時(shí),需要記錄自己消費(fèi)到的位置转绷,即偏移量伟件。Kafka 將消費(fèi)者的偏移量信息存儲(chǔ)在一個(gè)特殊的主題 __consumer_offsets 中。消費(fèi)者在提交偏移量時(shí)议经,會(huì)將偏移量信息發(fā)送到 __consumer_offsets 主題中進(jìn)行存儲(chǔ)斧账。

6. 總結(jié)

Kafka 的源碼設(shè)計(jì)非常復(fù)雜且精妙,通過高效的網(wǎng)絡(luò)通信煞肾、合理的存儲(chǔ)管理咧织、可靠的副本機(jī)制和靈活的消費(fèi)者組管理,實(shí)現(xiàn)了高性能籍救、高可用性和分布式的消息隊(duì)列系統(tǒng)习绢。深入理解 Kafka 源碼有助于我們更好地使用和優(yōu)化 Kafka,同時(shí)也能為我們?cè)O(shè)計(jì)和實(shí)現(xiàn)其他分布式系統(tǒng)提供寶貴的經(jīng)驗(yàn)蝙昙。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末闪萄,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子奇颠,更是在濱河造成了極大的恐慌败去,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,607評(píng)論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件烈拒,死亡現(xiàn)場(chǎng)離奇詭異圆裕,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)荆几,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,239評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門吓妆,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人吨铸,你說我怎么就攤上這事行拢。” “怎么了焊傅?”我有些...
    開封第一講書人閱讀 164,960評(píng)論 0 355
  • 文/不壞的土叔 我叫張陵剂陡,是天一觀的道長。 經(jīng)常有香客問我狐胎,道長鸭栖,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,750評(píng)論 1 294
  • 正文 為了忘掉前任握巢,我火速辦了婚禮晕鹊,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘。我一直安慰自己溅话,他們只是感情好晓锻,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,764評(píng)論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著飞几,像睡著了一般砚哆。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上屑墨,一...
    開封第一講書人閱讀 51,604評(píng)論 1 305
  • 那天躁锁,我揣著相機(jī)與錄音,去河邊找鬼卵史。 笑死战转,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的以躯。 我是一名探鬼主播槐秧,決...
    沈念sama閱讀 40,347評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼忧设!你這毒婦竟也來了刁标?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,253評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤址晕,失蹤者是張志新(化名)和其女友劉穎命雀,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體斩箫,經(jīng)...
    沈念sama閱讀 45,702評(píng)論 1 315
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,893評(píng)論 3 336
  • 正文 我和宋清朗相戀三年撵儿,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了乘客。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,015評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡淀歇,死狀恐怖易核,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情浪默,我是刑警寧澤牡直,帶...
    沈念sama閱讀 35,734評(píng)論 5 346
  • 正文 年R本政府宣布,位于F島的核電站纳决,受9級(jí)特大地震影響碰逸,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜阔加,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,352評(píng)論 3 330
  • 文/蒙蒙 一饵史、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧,春花似錦胳喷、人聲如沸湃番。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,934評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽吠撮。三九已至,卻和暖如春讲竿,著一層夾襖步出監(jiān)牢的瞬間泥兰,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,052評(píng)論 1 270
  • 我被黑心中介騙來泰國打工戴卜, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留逾条,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,216評(píng)論 3 371
  • 正文 我出身青樓投剥,卻偏偏與公主長得像师脂,于是被迫代替她去往敵國和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子江锨,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,969評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容