Kafka系列《二》-- 生產(chǎn)者Producer中的請求及源碼詳解

1. 背景

系列文章

在前一篇系列文章Kafka系列《一》-- 生產(chǎn)者Producer流程及Partition詳解中,相信已經(jīng)對Kafka的整體流程有所了解了徒溪,在這篇文章中在進一步看看producer中的那些請求及相關(guān)源碼解析忿偷;

有了前一篇的基礎(chǔ),再看這一片相信就覺得理所當(dāng)然了臊泌。

2. Producer中的請求及其響應(yīng)

2.1 API_VERSIONS請求

API_VERSIONS請求用于協(xié)商producer和broken之間的API版本鲤桥,在初始請求時會使用最大的版本號發(fā)送請求

Sending API_VERSIONS request with header RequestHeader(apiKey=API_VERSIONS, apiVersion=4, clientId=consumer-group1-1, correlationId=1, headerVersion=2) and timeout 30000 to node -1: ApiVersionsRequestData(clientSoftwareName='apache-kafka-java', clientSoftwareVersion='3.9.0')

當(dāng)broken不支持這個最新的版本號時,會返回錯誤渠概,響應(yīng)中errorCode=35對應(yīng)的是UNSUPPORTED_VERSION錯誤茶凳,由于broken沒有返回自己支持的最大版本號,因此producer會使用最低的版本號0重新發(fā)起一次API_VERSIONS請求

Received API_VERSIONS response from node -1 for request with header RequestHeader(apiKey=API_VERSIONS, apiVersion=4, clientId=consumer-group1-1, correlationId=1, headerVersion=2): ApiVersionsResponseData(errorCode=35, apiKeys=[], throttleTimeMs=0, supportedFeatures=[], finalizedFeaturesEpoch=-1, finalizedFeatures=[], zkMigrationReady=false)

此時broken就會返回所有API支持的版本號播揪;比如apiKey=18對應(yīng)的是API_VERSIONS 請求贮喧,支持的最低版本號是0,最高版本號是2猪狈;再如apiKey=3對應(yīng)的是METADATA請求塞淹,支持的最低版本號是0,最高版本號是8

ApiVersionsResponseData(errorCode=0, apiKeys=[ApiVersion(apiKey=0, minVersion=0, maxVersion=7), ApiVersion(apiKey=3, minVersion=0, maxVersion=8),ApiVersion(apiKey=18, minVersion=0, maxVersion=2))

獲取到API對應(yīng)的版本后罪裹,后續(xù)producer發(fā)送請求時就按照broken支持的最高版本號發(fā)送消息了饱普。

2.2 METADATA請求

METADATA請求用于更新原數(shù)據(jù)信息,其中使用API版本號就是最大的8状共,并且指定了需要獲取kafka-k8s-test這個topic的原數(shù)據(jù)信息套耕;這里的topic就是我們通過send方法發(fā)送的topic

Sending METADATA request with header RequestHeader(apiKey=METADATA, apiVersion=8, clientId=consumer-group1-1, correlationId=3, headerVersion=1) and timeout 30000 to node -1: MetadataRequestData(topics=[MetadataRequestTopic(topicId=AAAAAAAAAAAAAAAAAAAAAA, name='kafka-k8s-test')], allowAutoTopicCreation=true, includeClusterAuthorizedOperations=false, includeTopicAuthorizedOperations=false)

正常情況下,broken會返回broken list和topic信息

MetadataResponseData(throttleTimeMs=0, brokers=[MetadataResponseBroker(nodeId=4, host='1.1.1.1', port=9092, rack='b1'), MetadataResponseBroker(nodeId=1, host='2.2.2.2', port=9092, rack='b2'), MetadataResponseBroker(nodeId=2, host='3.3.3.3', port=9092, rack='b3'), MetadataResponseBroker(nodeId=3, host='4.4.4.4', port=9092, rack='b4')], clusterId='Yxxxxxxxxx_xxxxxxw', controllerId=1, topics=[MetadataResponseTopic(errorCode=0, name='kafka-k8s-test', topicId=AAAAAAAAAAAAAAAAAAAAAA, isInternal=false, partitions=[MetadataResponsePartition(errorCode=0, partitionIndex=0, leaderId=3, leaderEpoch=1, replicaNodes=[3, 2, 1], isrNodes=[3, 1, 2], offlineReplicas=[]), MetadataResponsePartition(errorCode=0, partitionIndex=2, leaderId=1, leaderEpoch=1, replicaNodes=[1, 3, 2], isrNodes=[1, 3, 2], offlineReplicas=[]), MetadataResponsePartition(errorCode=0, partitionIndex=1, leaderId=2, leaderEpoch=3, replicaNodes=[2, 1, 3], isrNodes=[1, 3, 2], offlineReplicas=[])], topicAuthorizedOperations=0)], clusterAuthorizedOperations=0)

2.3 PRODUCE請求

PRODUCE請求用于發(fā)送producer的數(shù)據(jù)峡继,最大的支持版本是7

Sending PRODUCE request with header RequestHeader(apiKey=PRODUCE, apiVersion=7, clientId=producer-1, correlationId=8, headerVersion=1) and timeout 30000 to node 1: {acks=-1,timeout=30000,partitionSizes=[kafka-k8s-test-2=82]}

正常情況下broken的響應(yīng)

ProduceResponseData(responses=[TopicProduceResponse(name='kafka-k8s-test', partitionResponses=[PartitionProduceResponse(index=2, errorCode=0, baseOffset=32, logAppendTimeMs=-1, logStartOffset=28, recordErrors=[], errorMessage=null, currentLeader=LeaderIdAndEpoch(leaderId=-1, leaderEpoch=-1))])], throttleTimeMs=0, nodeEndpoints=[])

3. 一些需要反復(fù)看才能懂的代碼

3.1 metadata的更新

producer會緩存broken的狀態(tài)在本地冯袍,對應(yīng)的是一個Map集合,key是broken id,value是這個broken的狀態(tài)康愤,初始為空

private final Map<String, NodeConnectionState> nodeState;

伴隨著metadata的初始化會經(jīng)歷的狀態(tài)變化

null -> CONNECTING -> CHECKING_API_VERSIONS -> READY;

metadata的更新是由NIO線程完成的:

  • 初始為空時儡循,會先判斷到canConnect,然后開始連接broken征冷,注冊通道择膝、OP_CONNECT事件到NIO;并將broken狀態(tài)修改為CONNECTING

  • 然后再進入NIO的select方法检激,等待OP_CONNECT事件到來肴捉;如果連接較慢的時候,NIO超時從select方法醒來叔收,然后繼續(xù)開始下一輪的檢查更新齿穗,此時會判斷到isAnyNodeConnecting,存在CONNECTING狀態(tài)的broken饺律,因此直接再進入NIO的select方法窃页,等待OP_CONNECT事件到來;

  • 監(jiān)聽到OP_CONNECT事件后复濒,還需要發(fā)送API_VERSIONS請求腮出,此時broken狀態(tài)會更新為CHECKING_API_VERSIONS狀態(tài),等待API_VERSIONS請求成功返回后芝薇,才會將broken狀態(tài)改為READY

  • 直達broken狀態(tài)改為READY后,才會滿足canSendRequest這個條件作儿,在這個條件下才會發(fā)送METADATA請求

private long maybeUpdate(long now, Node node) {
            String nodeConnectionId = node.idString();
            // broken狀態(tài)為READY
            if (canSendRequest(nodeConnectionId, now)) {
                Metadata.MetadataRequestAndVersion requestAndVersion = metadata.newMetadataRequestAndVersion(now);
                MetadataRequest.Builder metadataRequest = requestAndVersion.requestBuilder;
                sendInternalMetadataRequest(metadataRequest, nodeConnectionId, now);
                inProgress = new InProgressData(requestAndVersion.requestVersion, requestAndVersion.isPartialUpdate);
                return defaultRequestTimeoutMs;
            }
            // 存在CONNECTING 狀態(tài)broken
            if (isAnyNodeConnecting()) {
                return reconnectBackoffMs;
            }
            // broken狀態(tài)為空
            if (connectionStates.canConnect(nodeConnectionId, now)) {
               
                initiateConnect(node, now);
                return reconnectBackoffMs;
            }

            return Long.MAX_VALUE;
        }

3.2 send消息存入緩存

send方法發(fā)送消息洛二,本質(zhì)上就是往batches數(shù)組追加批次數(shù)據(jù);可以看到這里隊列的操作都是需要同步鎖的攻锰,因為send線程和NIO線程是可能同時操作隊列的晾嘶;而且NIO發(fā)送失敗的話,對于可以重試的錯誤也會直接將批次重新加入到隊首娶吞,等待重試

但是對于ByteBuffer的分配卻是放在同步代碼塊外面的垒迂,因為ByteBuffer分配是可能需要等待的,因為producer可以分配的最大內(nèi)存默認是32M妒蛇,無法立即分配的ByteBuffer則需要wait內(nèi)存釋放机断;如果分配ByteBuffer也放在同步鎖內(nèi),那NIO線程就沒機會獲取到隊列的鎖了绣夺,進而就沒機會從隊列中移除批次發(fā)送后釋放內(nèi)存吏奸;這就形成了死鎖了;send線程占用了隊列的鎖等著釋放內(nèi)存陶耍,NIO線程想要釋放內(nèi)存但是獲取不到隊列的鎖了奋蔚。

Deque<ProducerBatch> dq = topicInfo.batches.computeIfAbsent(effectivePartition, k -> new ArrayDeque<>());
                synchronized (dq) {
                    // After taking the lock, validate that the partition hasn't changed and retry.
                    if (partitionChanged(topic, topicInfo, partitionInfo, dq, nowMs, cluster))
                        continue;

                    RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callbacks, dq, nowMs);
                    if (appendResult != null) {
                        // If queue has incomplete batches we disable switch (see comments in updatePartitionInfo).
                        boolean enableSwitch = allBatchesFull(dq);
                        topicInfo.builtInPartitioner.updatePartitionInfo(partitionInfo, appendResult.appendedBytes, cluster, enableSwitch);
                        return appendResult;
                    }
                }

                if (buffer == null) {
                    byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
                    int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression.type(), key, value, headers));
                    
                    buffer = free.allocate(size, maxTimeToBlock);
                    
                    nowMs = time.milliseconds();
                }

                synchronized (dq) {
                    // After taking the lock, validate that the partition hasn't changed and retry.
                    if (partitionChanged(topic, topicInfo, partitionInfo, dq, nowMs, cluster))
                        continue;

                    RecordAppendResult appendResult = appendNewBatch(topic, effectivePartition, dq, timestamp, key, value, headers, callbacks, buffer, nowMs);
                    // Set buffer to null, so that deallocate doesn't return it back to free pool, since it's used in the batch.
                    if (appendResult.newBatchCreated)
                        buffer = null;
                    // If queue has incomplete batches we disable switch (see comments in updatePartitionInfo).
                    boolean enableSwitch = allBatchesFull(dq);
                    topicInfo.builtInPartitioner.updatePartitionInfo(partitionInfo, appendResult.appendedBytes, cluster, enableSwitch);
                    return appendResult;
                }

3.3 NIO處理寫事件

producer請求broken都是通過doSend方法完成的,而請求都會先轉(zhuǎn)為Send對象,這個步驟對于理解NIO處理讀事件至關(guān)重要

private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now, AbstractRequest request) {
        String destination = clientRequest.destination();
        RequestHeader header = clientRequest.makeHeader(request.version());
       
        Send send = request.toSend(header);
        InFlightRequest inFlightRequest = new InFlightRequest(
                clientRequest,
                header,
                isInternalRequest,
                request,
                send,
                now);
        this.inFlightRequests.add(inFlightRequest);
        selector.send(new NetworkSend(clientRequest.destination(), send));
    }

在構(gòu)建Send對象的時候泊碑,可以看到其中主要包括開始的4字節(jié)坤按、請求Header(包括請求類型、版本號信息)馒过、請求體(消息主體)臭脓;而開始的4字節(jié)寫入的是整個message的大小builder.writeInt(messageSize.totalSize());,因為int對應(yīng)的就是4字節(jié)沉桌;接著才是寫入請求頭信息谢鹊、請求體信息

private static Send buildSend(
        Message header,
        short headerVersion,
        Message apiMessage,
        short apiVersion
    ) {
        ObjectSerializationCache serializationCache = new ObjectSerializationCache();

        MessageSizeAccumulator messageSize = new MessageSizeAccumulator();
        header.addSize(messageSize, serializationCache, headerVersion);
        apiMessage.addSize(messageSize, serializationCache, apiVersion);

        SendBuilder builder = new SendBuilder(messageSize.sizeExcludingZeroCopy() + 4);
        builder.writeInt(messageSize.totalSize());
        header.write(builder, serializationCache, headerVersion);
        apiMessage.write(builder, serializationCache, apiVersion);

        return builder.build();
    }

注意到了這個細節(jié),再來看看NIO是怎么處理讀事件的就一目了然了留凭。

3.4 NIO處理讀事件

當(dāng)producer收到broken的響應(yīng)后佃扼,就需要從通道讀取數(shù)據(jù),相信很多人看到這里的ByteBuffer.allocate(4)都會很迷惑蔼夜,為什么是4兼耀?而不是其它的數(shù)字呢?

這就和上面的寫數(shù)據(jù)剛好對應(yīng)上了求冷,因為這兩處代碼里使用的是魔法值瘤运,所以有點難直接聯(lián)想到;這也告誡我們代碼里盡量不要使用魔法值~~

public NetworkReceive(int maxSize, String source, MemoryPool memoryPool) {
        this.source = source;
        this.size = ByteBuffer.allocate(4);
        this.buffer = null;
        this.maxSize = maxSize;
        this.memoryPool = memoryPool;
    }

然后開始從通道讀取數(shù)據(jù)的時候匠题,也是要先把size讀滿拯坟,也就是一定要先讀4字節(jié);只有讀完4字節(jié)后!size.hasRemaining()才會開始分配ByteBuffer來接受實際的響應(yīng)數(shù)據(jù)

因為寫數(shù)據(jù)的時候最開始寫入的4字節(jié)是整條請求數(shù)據(jù)的大小韭山,按照Kafka的協(xié)議郁季,響應(yīng)數(shù)據(jù)的最開始4字節(jié)也必須是響應(yīng)數(shù)據(jù)的大小。

public long readFrom(ScatteringByteChannel channel) throws IOException {
        int read = 0;
        if (size.hasRemaining()) {
            int bytesRead = channel.read(size);
            if (bytesRead < 0)
                throw new EOFException();
            read += bytesRead;
            if (!size.hasRemaining()) {
                size.rewind();
                int receiveSize = size.getInt();
                if (receiveSize < 0)
                    throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + ")");
                if (maxSize != UNLIMITED && receiveSize > maxSize)
                    throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + " larger than " + maxSize + ")");
                requestedBufferSize = receiveSize; // may be 0 for some payloads (SASL)
                if (receiveSize == 0) {
                    buffer = EMPTY_BUFFER;
                }
            }
        }
        if (buffer == null && requestedBufferSize != -1) { // we know the size we want but haven't been able to allocate it yet
            buffer = memoryPool.tryAllocate(requestedBufferSize);
            if (buffer == null)
                log.trace("Broker low on memory - could not allocate buffer of size {} for source {}", requestedBufferSize, source);
        }
        if (buffer != null) {
            int bytesRead = channel.read(buffer);
            if (bytesRead < 0)
                throw new EOFException();
            read += bytesRead;
        }

        return read;
    }

獲取到響應(yīng)數(shù)據(jù)的大小后钱磅,就可以為其分配ByteBuffer來接受實際的響應(yīng)數(shù)據(jù)了梦裂;而且也不用擔(dān)心TCP的粘包和拆包問題了:

  • 粘包問題:多個響應(yīng)數(shù)據(jù)同時返回了,但是現(xiàn)在producer已經(jīng)有了響應(yīng)數(shù)據(jù)的實際大小了盖淡,多的數(shù)據(jù)也不會讀取了年柠,因為分配的ByteBuffer的大小就是實際響應(yīng)數(shù)據(jù)的大小褪迟;多余的數(shù)據(jù)直接等待下一次處理讀事件時再重頭開始接收即可

  • 拆包問題:響應(yīng)數(shù)據(jù)包被拆成多個了冗恨,但是現(xiàn)在producer已經(jīng)有了響應(yīng)數(shù)據(jù)的實際大小了,如果沒能讀到完整的ByteBuffer味赃,那么這里的buffer.hasRemaining()就是true了派近,也就是本次select的complete是無法完成的,那么本次接受就會按照未完成的接收處理洁桌,等待下一次讀事件再次觸發(fā)時渴丸,繼續(xù)往這次接收里寫入數(shù)據(jù),直到寫滿ByteBuffer

public NetworkReceive maybeCompleteReceive() {
        if (receive != null && receive.complete()) {
            receive.payload().rewind();
            NetworkReceive result = receive;
            receive = null;
            return result;
        }
        return null;
    }

@Override
    public boolean complete() {
        return !size.hasRemaining() && buffer != null && !buffer.hasRemaining();
    }

Kafka中教科書級別的處理粘包/拆包問題,有沒有給你一些觸動呢谱轨?

繼續(xù)閱讀:

你覺得producer里還有哪些寫的很好的代碼或者很難理解的代碼戒幔,歡迎評論區(qū)一起學(xué)習(xí)~

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市土童,隨后出現(xiàn)的幾起案子诗茎,更是在濱河造成了極大的恐慌,老刑警劉巖献汗,帶你破解...
    沈念sama閱讀 216,372評論 6 498
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件敢订,死亡現(xiàn)場離奇詭異,居然都是意外死亡罢吃,警方通過查閱死者的電腦和手機楚午,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,368評論 3 392
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來尿招,“玉大人矾柜,你說我怎么就攤上這事【兔眨” “怎么了怪蔑?”我有些...
    開封第一講書人閱讀 162,415評論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長丧荐。 經(jīng)常有香客問我缆瓣,道長,這世上最難降的妖魔是什么虹统? 我笑而不...
    開封第一講書人閱讀 58,157評論 1 292
  • 正文 為了忘掉前任弓坞,我火速辦了婚禮,結(jié)果婚禮上窟却,老公的妹妹穿的比我還像新娘。我一直安慰自己呻逆,他們只是感情好夸赫,可當(dāng)我...
    茶點故事閱讀 67,171評論 6 388
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著咖城,像睡著了一般茬腿。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上宜雀,一...
    開封第一講書人閱讀 51,125評論 1 297
  • 那天切平,我揣著相機與錄音,去河邊找鬼辐董。 笑死悴品,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播苔严,決...
    沈念sama閱讀 40,028評論 3 417
  • 文/蒼蘭香墨 我猛地睜開眼定枷,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了届氢?” 一聲冷哼從身側(cè)響起欠窒,我...
    開封第一講書人閱讀 38,887評論 0 274
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎退子,沒想到半個月后岖妄,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,310評論 1 310
  • 正文 獨居荒郊野嶺守林人離奇死亡寂祥,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,533評論 2 332
  • 正文 我和宋清朗相戀三年荐虐,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片壤靶。...
    茶點故事閱讀 39,690評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡缚俏,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出贮乳,到底是詐尸還是另有隱情忧换,我是刑警寧澤,帶...
    沈念sama閱讀 35,411評論 5 343
  • 正文 年R本政府宣布向拆,位于F島的核電站亚茬,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏浓恳。R本人自食惡果不足惜刹缝,卻給世界環(huán)境...
    茶點故事閱讀 41,004評論 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望颈将。 院中可真熱鬧梢夯,春花似錦、人聲如沸晴圾。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,659評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽死姚。三九已至人乓,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間都毒,已是汗流浹背色罚。 一陣腳步聲響...
    開封第一講書人閱讀 32,812評論 1 268
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留账劲,地道東北人戳护。 一個月前我還...
    沈念sama閱讀 47,693評論 2 368
  • 正文 我出身青樓金抡,卻偏偏與公主長得像,于是被迫代替她去往敵國和親姑尺。 傳聞我的和親對象是個殘疾皇子竟终,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,577評論 2 353

推薦閱讀更多精彩內(nèi)容