Kafka源碼分析(四)高吞吐核心——RecordAccumulator消息發(fā)送過(guò)程

上一篇文章講的是在主線(xiàn)程慎皱,消息在調(diào)用了send后楼雹,消息內(nèi)容和該消息關(guān)聯(lián)的future對(duì)象被一起放入了RecordAccumulator中,future對(duì)象最終被send方法返回闲礼。對(duì)于客戶(hù)端來(lái)說(shuō)寡具,send方法返回了秤茅,但是send方法返回并不代表消息已經(jīng)被成功發(fā)送到Broker了,如果接下去的任意行為都是需要確保消息成功發(fā)送的情況下進(jìn)行童叠,客戶(hù)端需要調(diào)用future.get()等待future的完成嫂伞。

這一節(jié)繼續(xù)接下去的工作。消息被主線(xiàn)程放入RecordAccumulator后拯钻,主線(xiàn)程早就撒手不管了帖努,這時(shí)一個(gè)叫做Sender線(xiàn)程會(huì)從RecordAccumulator把消息拉出來(lái),并且發(fā)送給Broker粪般。Sender線(xiàn)程早在構(gòu)造KafkaProducer的時(shí)候拼余,已經(jīng)被創(chuàng)建和啟動(dòng)。

KafkaProducer(ProducerConfig config,
                Serializer<K> keySerializer,
                Serializer<V> valueSerializer,
                Metadata metadata,
                KafkaClient kafkaClient) {
    try {
        // ...
        this.sender = new Sender(logContext,
                client,
                this.metadata,
                this.accumulator,
                maxInflightRequests == 1,
                config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
                acks,
                retries,
                metricsRegistry.senderMetrics,
                Time.SYSTEM,
                this.requestTimeoutMs,
                config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG),
                this.transactionManager,
                apiVersions);
        String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
        this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
        this.ioThread.start();
        // ...
    } catch (Throwable t) {
        // ...
    }
}

Sender線(xiàn)程是一個(gè)事件循環(huán)亩歹,總是在while循環(huán)中做一些事情匙监,接下來(lái)主要分析這兩個(gè)事情

// org.apache.kafka.clients.producer.internals.Sender#run(long)
void run(long now) {
    // ..

    long pollTimeout = sendProducerData(now);
    client.poll(pollTimeout, now);
}

sendProducerData方法中,簡(jiǎn)化下它的主要邏輯

private long sendProducerData(long now) {
    Cluster cluster = metadata.fetch();

    // get the list of partitions with data ready to send
    RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

    // code

    // remove any nodes we aren't ready to send to
    // code

    // create produce requests
    Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes,
            this.maxRequestSize, now);
    // code

    // code
    sendProduceRequests(batches, now);

    return pollTimeout;
}

首先調(diào)用this.accumulator.ready(cluster, now)找到哪一些Broker是已經(jīng)準(zhǔn)備好的小作。然后再調(diào)用this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now)將那些已經(jīng)準(zhǔn)備的Broker上的Batch進(jìn)行重新整理后亭姥,全部從RecordAccumulator的Deque中取出來(lái),發(fā)送出去顾稀。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末达罗,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子静秆,更是在濱河造成了極大的恐慌粮揉,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,204評(píng)論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件抚笔,死亡現(xiàn)場(chǎng)離奇詭異扶认,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)殊橙,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,091評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門(mén)辐宾,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)狱从,“玉大人,你說(shuō)我怎么就攤上這事叠纹〗煤唬” “怎么了?”我有些...
    開(kāi)封第一講書(shū)人閱讀 164,548評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵吊洼,是天一觀的道長(zhǎng)训貌。 經(jīng)常有香客問(wèn)我,道長(zhǎng)冒窍,這世上最難降的妖魔是什么递沪? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,657評(píng)論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮综液,結(jié)果婚禮上款慨,老公的妹妹穿的比我還像新娘。我一直安慰自己谬莹,他們只是感情好檩奠,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,689評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著附帽,像睡著了一般埠戳。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上蕉扮,一...
    開(kāi)封第一講書(shū)人閱讀 51,554評(píng)論 1 305
  • 那天整胃,我揣著相機(jī)與錄音,去河邊找鬼喳钟。 笑死屁使,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的奔则。 我是一名探鬼主播蛮寂,決...
    沈念sama閱讀 40,302評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼易茬!你這毒婦竟也來(lái)了酬蹋?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書(shū)人閱讀 39,216評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤疾呻,失蹤者是張志新(化名)和其女友劉穎除嘹,沒(méi)想到半個(gè)月后写半,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體岸蜗,經(jīng)...
    沈念sama閱讀 45,661評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,851評(píng)論 3 336
  • 正文 我和宋清朗相戀三年叠蝇,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了璃岳。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片年缎。...
    茶點(diǎn)故事閱讀 39,977評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖铃慷,靈堂內(nèi)的尸體忽然破棺而出单芜,到底是詐尸還是另有隱情,我是刑警寧澤犁柜,帶...
    沈念sama閱讀 35,697評(píng)論 5 347
  • 正文 年R本政府宣布洲鸠,位于F島的核電站,受9級(jí)特大地震影響馋缅,放射性物質(zhì)發(fā)生泄漏扒腕。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,306評(píng)論 3 330
  • 文/蒙蒙 一萤悴、第九天 我趴在偏房一處隱蔽的房頂上張望瘾腰。 院中可真熱鬧,春花似錦覆履、人聲如沸蹋盆。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,898評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)栖雾。三九已至,卻和暖如春伟众,著一層夾襖步出監(jiān)牢的瞬間岩灭,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,019評(píng)論 1 270
  • 我被黑心中介騙來(lái)泰國(guó)打工赂鲤, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留噪径,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,138評(píng)論 3 370
  • 正文 我出身青樓数初,卻偏偏與公主長(zhǎng)得像找爱,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子泡孩,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,927評(píng)論 2 355