kafka發(fā)送者線程一

kafka生產(chǎn)者線程負(fù)責(zé)生產(chǎn)消息顶别,而將消息發(fā)送給broker是有一個(gè)專門的發(fā)送者線程來(lái)處理的谷徙,也稱之為IO Thread,實(shí)現(xiàn)了消息的生產(chǎn)與發(fā)送解耦驯绎,提高吞吐量完慧。

IO Thread是隨著生產(chǎn)者的產(chǎn)生而啟動(dòng)的,在啟動(dòng)發(fā)送者線程之前會(huì)先初始化一個(gè)消息累加器

this.accumulator = new RecordAccumulator(logContext,
                    config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
                    this.compressionType,
                    lingerMs(config),
                    retryBackoffMs,
                    deliveryTimeoutMs,
                    metrics,
                    PRODUCER_METRIC_GROUP_NAME,
                    time,
                    apiVersions,
                    transactionManager,
                    new BufferPool(this.totalMemorySize, config.getInt(ProducerConfig.BATCH_SIZE_CONFIG), metrics, time, PRODUCER_METRIC_GROUP_NAME));

這里有幾個(gè)參數(shù)需要注意下:
kafka會(huì)將發(fā)往同一個(gè)分區(qū)的消息累積到同一個(gè)批次中剩失,就會(huì)涉及到兩個(gè)維度屈尼,空間和時(shí)間,不然會(huì)陷入無(wú)限等待批次消息的累積
batchSize拴孤,該參數(shù)就是指的空間脾歧,也就是每個(gè)批次累積多少消息,或者每個(gè)批次的分配的內(nèi)存大小演熟,太大會(huì)造成內(nèi)存浪費(fèi)鞭执,
太小的話,當(dāng)消息一多時(shí),會(huì)有多個(gè)批次需要發(fā)送兄纺,降低了吞吐量
lingerMs免猾,該參數(shù)就是指的時(shí)間,就是每個(gè)批次延遲時(shí)間的上限囤热,但是假如批次信息達(dá)到batchSize的值后,lingerMs就會(huì)失效
當(dāng)批次的大小還未達(dá)到batchSize時(shí)获三,發(fā)送消息會(huì)延遲旁蔼,即使當(dāng)前kafka沒(méi)有負(fù)載壓力的情況下
deliveryTimeoutMs,調(diào)send方法到return的時(shí)間限制疙教,包含消息的延遲發(fā)送時(shí)間棺聊、收到broker響應(yīng)的時(shí)間、發(fā)送失敗重試的時(shí)間三者總和的時(shí)間限制贞谓。
該值應(yīng)該大于或等于request.timeout.ms+linger.ms的時(shí)間

當(dāng)對(duì)生產(chǎn)者的相關(guān)屬性值和累加器都初始化后限佩,kafka會(huì)自動(dòng)啟動(dòng)一個(gè)io thread

this.sender = newSender(logContext, kafkaClient, this.metadata);
String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
this.ioThread.start();

其中,KafkaClient是實(shí)際負(fù)責(zé)發(fā)送消息的客戶端裸弦,底層基于nio實(shí)現(xiàn)的祟同,與broker進(jìn)行網(wǎng)絡(luò)通信;KafkaThread繼承了Thread理疙;設(shè)置了發(fā)送者線程的名稱晕城、運(yùn)行的任務(wù)runnable、以及線程模式為daemon窖贤,這里為true砖顷,因此發(fā)送者線程的主要邏輯在于runnable的run方法,也就是Sender的run

public void run() {
    log.debug("Starting Kafka producer I/O thread.");

    // main loop, runs until close is called
    while (running) {
        try {
            runOnce();
        } catch (Exception e) {
            log.error("Uncaught error in kafka producer I/O thread: ", e);
        }
    }

    log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");

 省略....
}

從run方法可看出赃梧,就是不斷執(zhí)行runOnce方法滤蝠,因此來(lái)看下runOnce方法的邏輯

void runOnce() {
    if (transactionManager != null) {
        //忽略....
        //處理事務(wù)相關(guān)的
    }

    long currentTimeMs = time.milliseconds();
//建立與broker的連接,準(zhǔn)備待發(fā)送消息的數(shù)據(jù)
    long pollTimeout = sendProducerData(currentTimeMs);
//處理連接上發(fā)生的各種IO事件授嘀,包含獲取來(lái)自broker的數(shù)據(jù)物咳,發(fā)送實(shí)際的消息對(duì)象
    client.poll(pollTimeout, currentTimeMs);
}

在sendProducerData的方法中,看下以下幾個(gè)關(guān)鍵的邏輯

Cluster cluster = metadata.fetch();
// 通過(guò)累加器獲取到準(zhǔn)備發(fā)送數(shù)據(jù)的分區(qū)
RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

//和這些分區(qū)的節(jié)點(diǎn)建立連接粤攒,底層是通過(guò)nio的方式建立連接的
Iterator<Node> iter = result.readyNodes.iterator();
long notReadyTimeout = Long.MAX_VALUE;
while (iter.hasNext()) {
    Node node = iter.next();
    if (!this.client.ready(node, now)) {
        iter.remove();
        notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now));
    }
}
//獲取待發(fā)送的批次信息
Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
//省略...

//將批次信息轉(zhuǎn)化為具體的網(wǎng)絡(luò)請(qǐng)求信息所森,看如下重載的方法
sendProduceRequests(batches, now);
return pollTimeout;


//簡(jiǎn)單的說(shuō),這個(gè)方法就是將與節(jié)點(diǎn)連接的channel的監(jiān)聽(tīng)事件設(shè)置為OP_WRITE表示可寫的夯接,
//然后構(gòu)建相應(yīng)的請(qǐng)求消息體焕济,放置到ByteBuffer中
private void sendProduceRequest(long now, int destination, short acks, int timeout, List<ProducerBatch> batches) {
    if (batches.isEmpty())
        return;

    Map<TopicPartition, MemoryRecords> produceRecordsByPartition = new HashMap<>(batches.size());
    final Map<TopicPartition, ProducerBatch> recordsByPartition = new HashMap<>(batches.size());

  //
    ProduceRequest.Builder requestBuilder = ProduceRequest.Builder.forMagic(minUsedMagic, acks, timeout,
            produceRecordsByPartition, transactionalId);
    RequestCompletionHandler callback = new RequestCompletionHandler() {
        public void onComplete(ClientResponse response) {
            handleProduceResponse(response, recordsByPartition, time.milliseconds());
        }
    };

    String nodeId = Integer.toString(destination);
    ClientRequest clientRequest = client.newClientRequest(nodeId, requestBuilder, now, acks != 0,
            requestTimeoutMs, callback);
    client.send(clientRequest, now);
}

sendProducerData方法就是建立與broker的連接,注冊(cè)到selector盔几,設(shè)置監(jiān)聽(tīng)事件為op_write晴弃,準(zhǔn)備好相應(yīng)的請(qǐng)求數(shù)據(jù)(待發(fā)送的消息)。需要注意的是,此時(shí)上鞠,消息還沒(méi)有真正的發(fā)送出去际邻。真正發(fā)送的消息在runOnce方法內(nèi)部調(diào)用client.poll方法

public List<ClientResponse> poll(long timeout, long now) {
    ensureActive();
   //省略.....
    long metadataTimeout = metadataUpdater.maybeUpdate(now);
    try {
//這里底層就是調(diào)用selector的select方法,獲取到可以處理IO事件的channel芍阎,根據(jù)selectionKey的類型進(jìn)行相對(duì)應(yīng)的處理世曾,
//分別為isConnectable、isReadable谴咸、isWritable
        this.selector.poll(Utils.min(timeout, metadataTimeout, defaultRequestTimeoutMs));
    } catch (IOException e) {
        log.error("Unexpected error during I/O", e);
    }
    // process completed actions
    long updatedNow = this.time.milliseconds();
    List<ClientResponse> responses = new ArrayList<>();
    handleCompletedSends(responses, updatedNow);
    handleCompletedReceives(responses, updatedNow);
    handleDisconnections(responses, updatedNow);
    handleConnections();
    handleInitiateApiVersionRequests(updatedNow);
    handleTimedOutRequests(responses, updatedNow);
    completeResponses(responses);

    return responses;
}

下面的兩個(gè)方法為與broker節(jié)點(diǎn)建立連接的底層nio實(shí)現(xiàn)

private void initiateConnect(Node node, long now) {
    String nodeConnectionId = node.idString();
    try {
        connectionStates.connecting(nodeConnectionId, now, node.host(), clientDnsLookup);
        InetAddress address = connectionStates.currentAddress(nodeConnectionId);
        log.debug("Initiating connection to node {} using address {}", node, address);
        selector.connect(nodeConnectionId,
                new InetSocketAddress(address, node.port()),
                this.socketSendBuffer,
                this.socketReceiveBuffer);
    } catch (IOException e) {
        //省略
}

public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException {
    ensureNotRegistered(id);
    SocketChannel socketChannel = SocketChannel.open();
    SelectionKey key = null;
    try {
//配置channel為非阻塞模式轮听,并設(shè)置channel的發(fā)送和接收緩沖區(qū)大小
        configureSocketChannel(socketChannel, sendBufferSize, receiveBufferSize);
//建立連接
        boolean connected = doConnect(socketChannel, address);
//將channel注冊(cè)到selector上
        key = registerChannel(id, socketChannel, SelectionKey.OP_CONNECT);

        if (connected) {
            immediatelyConnectedKeys.add(key);
//若連接成功了,改變channel的監(jiān)聽(tīng)事件
            key.interestOps(0);
        }
    } catch (IOException | RuntimeException e) {
        if (key != null)
            immediatelyConnectedKeys.remove(key);
        channels.remove(id);
        socketChannel.close();
        throw e;
    }
}

總結(jié)岭佳,kafka的發(fā)送者線程底層使用nio來(lái)與broker建立連接與數(shù)據(jù)通信血巍,因此涉及到如何構(gòu)造發(fā)送消息的ByteBuffer對(duì)象,處理來(lái)自broker的響應(yīng)數(shù)據(jù)等珊随,但是本文只是介紹個(gè)大體的方向述寡,并沒(méi)有對(duì)細(xì)節(jié)進(jìn)行詳情的說(shuō)明

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市叶洞,隨后出現(xiàn)的幾起案子鲫凶,更是在濱河造成了極大的恐慌,老刑警劉巖京办,帶你破解...
    沈念sama閱讀 217,277評(píng)論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件掀序,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡惭婿,警方通過(guò)查閱死者的電腦和手機(jī)不恭,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,689評(píng)論 3 393
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)财饥,“玉大人换吧,你說(shuō)我怎么就攤上這事≡啃牵” “怎么了沾瓦?”我有些...
    開(kāi)封第一講書人閱讀 163,624評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)谦炒。 經(jīng)常有香客問(wèn)我贯莺,道長(zhǎng),這世上最難降的妖魔是什么宁改? 我笑而不...
    開(kāi)封第一講書人閱讀 58,356評(píng)論 1 293
  • 正文 為了忘掉前任缕探,我火速辦了婚禮,結(jié)果婚禮上还蹲,老公的妹妹穿的比我還像新娘爹耗。我一直安慰自己耙考,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,402評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布潭兽。 她就那樣靜靜地躺著倦始,像睡著了一般。 火紅的嫁衣襯著肌膚如雪山卦。 梳的紋絲不亂的頭發(fā)上鞋邑,一...
    開(kāi)封第一講書人閱讀 51,292評(píng)論 1 301
  • 那天,我揣著相機(jī)與錄音账蓉,去河邊找鬼炫狱。 笑死,一個(gè)胖子當(dāng)著我的面吹牛剔猿,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播嬉荆,決...
    沈念sama閱讀 40,135評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼归敬,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了鄙早?” 一聲冷哼從身側(cè)響起汪茧,我...
    開(kāi)封第一講書人閱讀 38,992評(píng)論 0 275
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎限番,沒(méi)想到半個(gè)月后舱污,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,429評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡弥虐,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,636評(píng)論 3 334
  • 正文 我和宋清朗相戀三年扩灯,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片霜瘪。...
    茶點(diǎn)故事閱讀 39,785評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡珠插,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出颖对,到底是詐尸還是另有隱情捻撑,我是刑警寧澤,帶...
    沈念sama閱讀 35,492評(píng)論 5 345
  • 正文 年R本政府宣布缤底,位于F島的核電站顾患,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏个唧。R本人自食惡果不足惜江解,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,092評(píng)論 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望坑鱼。 院中可真熱鬧膘流,春花似錦絮缅、人聲如沸。這莊子的主人今日做“春日...
    開(kāi)封第一講書人閱讀 31,723評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至彭谁,卻和暖如春吸奴,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背缠局。 一陣腳步聲響...
    開(kāi)封第一講書人閱讀 32,858評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工则奥, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人狭园。 一個(gè)月前我還...
    沈念sama閱讀 47,891評(píng)論 2 370
  • 正文 我出身青樓读处,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親唱矛。 傳聞我的和親對(duì)象是個(gè)殘疾皇子罚舱,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,713評(píng)論 2 354