1. Background
Articles in this series:
- Kafka Series (1) -- Producer flow and partitioning explained
- Kafka Series (2) -- Producer requests and source code explained
- Kafka Series (3) -- Idempotence in the producer
- Kafka Series (4) -- Transactions in the producer
- Kafka Series (5) -- Consumer flow overview
- Kafka Series (6) -- The consumer's partition assignment algorithms
- Kafka Series (7) -- Rebalancing in the consumer
- Kafka Series (8) -- The consumer's consumption process
- Kafka Series (9) -- Consumer sessions and transactions
The previous article, Kafka Series (1) -- Producer flow and partitioning explained, gave an overview of the producer's whole flow; this article digs further into the requests the producer sends and the source code behind them. With that article as a foundation, most of what follows should feel like it could hardly be otherwise.
2. Producer requests and their responses
2.1 The API_VERSIONS request
The API_VERSIONS request negotiates the API versions used between the producer and the broker. The initial request is sent with the highest version the client knows:
Sending API_VERSIONS request with header RequestHeader(apiKey=API_VERSIONS, apiVersion=4, clientId=consumer-group1-1, correlationId=1, headerVersion=2) and timeout 30000 to node -1: ApiVersionsRequestData(clientSoftwareName='apache-kafka-java', clientSoftwareVersion='3.9.0')
If the broker does not support that latest version, it returns an error: the response carries errorCode=35, which maps to UNSUPPORTED_VERSION. Because the broker here does not report its own highest supported version, the producer falls back and reissues the API_VERSIONS request at the lowest version, 0:
Received API_VERSIONS response from node -1 for request with header RequestHeader(apiKey=API_VERSIONS, apiVersion=4, clientId=consumer-group1-1, correlationId=1, headerVersion=2): ApiVersionsResponseData(errorCode=35, apiKeys=[], throttleTimeMs=0, supportedFeatures=[], finalizedFeaturesEpoch=-1, finalizedFeatures=[], zkMigrationReady=false)
The broker then returns the supported version range for every API. For example, apiKey=18 is the API_VERSIONS request itself, supported from version 0 through 2; apiKey=3 is the METADATA request, supported from version 0 through 8:
ApiVersionsResponseData(errorCode=0, apiKeys=[ApiVersion(apiKey=0, minVersion=0, maxVersion=7), ApiVersion(apiKey=3, minVersion=0, maxVersion=8), ApiVersion(apiKey=18, minVersion=0, maxVersion=2)])
Once these version ranges are known, the producer sends every subsequent request at the highest version the broker supports.
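To make the negotiation rule concrete, here is a minimal sketch with hypothetical names (VersionRange and usableVersion are illustrative, not real client classes): pick the highest version both sides support, or fail the way UNSUPPORTED_VERSION does.

// Hypothetical sketch: intersect the client's supported range with the range
// the broker advertised for the same apiKey.
public final class VersionNegotiationSketch {
    record VersionRange(short min, short max) {}

    // Highest version both sides support, or -1 if the ranges don't overlap
    // (which is what surfaces as UNSUPPORTED_VERSION, errorCode=35).
    static short usableVersion(VersionRange client, VersionRange broker) {
        short floor = (short) Math.max(client.min(), broker.min());
        short ceiling = (short) Math.min(client.max(), broker.max());
        return ceiling >= floor ? ceiling : -1;
    }

    public static void main(String[] args) {
        // METADATA (apiKey=3): client supports 0..12, broker advertised 0..8 -> use 8
        VersionRange client = new VersionRange((short) 0, (short) 12);
        VersionRange broker = new VersionRange((short) 0, (short) 8);
        System.out.println(usableVersion(client, broker)); // prints 8
    }
}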
2.2 The METADATA request
The METADATA request refreshes cluster metadata. It is sent at the highest negotiated version, 8, and names the topic whose metadata is needed, kafka-k8s-test; this is the topic we pass to the send method:
Sending METADATA request with header RequestHeader(apiKey=METADATA, apiVersion=8, clientId=consumer-group1-1, correlationId=3, headerVersion=1) and timeout 30000 to node -1: MetadataRequestData(topics=[MetadataRequestTopic(topicId=AAAAAAAAAAAAAAAAAAAAAA, name='kafka-k8s-test')], allowAutoTopicCreation=true, includeClusterAuthorizedOperations=false, includeTopicAuthorizedOperations=false)
Under normal conditions the broker returns the broker list and the topic's partition information:
MetadataResponseData(throttleTimeMs=0, brokers=[MetadataResponseBroker(nodeId=4, host='1.1.1.1', port=9092, rack='b1'), MetadataResponseBroker(nodeId=1, host='2.2.2.2', port=9092, rack='b2'), MetadataResponseBroker(nodeId=2, host='3.3.3.3', port=9092, rack='b3'), MetadataResponseBroker(nodeId=3, host='4.4.4.4', port=9092, rack='b4')], clusterId='Yxxxxxxxxx_xxxxxxw', controllerId=1, topics=[MetadataResponseTopic(errorCode=0, name='kafka-k8s-test', topicId=AAAAAAAAAAAAAAAAAAAAAA, isInternal=false, partitions=[MetadataResponsePartition(errorCode=0, partitionIndex=0, leaderId=3, leaderEpoch=1, replicaNodes=[3, 2, 1], isrNodes=[3, 1, 2], offlineReplicas=[]), MetadataResponsePartition(errorCode=0, partitionIndex=2, leaderId=1, leaderEpoch=1, replicaNodes=[1, 3, 2], isrNodes=[1, 3, 2], offlineReplicas=[]), MetadataResponsePartition(errorCode=0, partitionIndex=1, leaderId=2, leaderEpoch=3, replicaNodes=[2, 1, 3], isrNodes=[1, 3, 2], offlineReplicas=[])], topicAuthorizedOperations=0)], clusterAuthorizedOperations=0)
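As a rough illustration of what the client does with this response, here is a small sketch (Broker, Partition, and leadersByPartition are hypothetical stand-ins for the client's internal Cluster bookkeeping) that joins the broker list with each partition's leaderId:

import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: a record destined for partition p is routed to the
// broker returned by leaders.get(p).
public final class LeaderLookupSketch {
    record Broker(int nodeId, String host, int port) {}
    record Partition(int index, int leaderId) {}

    static Map<Integer, Broker> leadersByPartition(List<Broker> brokers, List<Partition> partitions) {
        Map<Integer, Broker> byId = new HashMap<>();
        for (Broker b : brokers)
            byId.put(b.nodeId(), b);
        Map<Integer, Broker> leaders = new HashMap<>();
        for (Partition p : partitions)
            leaders.put(p.index(), byId.get(p.leaderId())); // e.g. partition 0 -> broker 3 in the response above
        return leaders;
    }
}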
2.3 The PRODUCE request
The PRODUCE request ships the producer's record batches; the highest supported version here is 7:
Sending PRODUCE request with header RequestHeader(apiKey=PRODUCE, apiVersion=7, clientId=producer-1, correlationId=8, headerVersion=1) and timeout 30000 to node 1: {acks=-1,timeout=30000,partitionSizes=[kafka-k8s-test-2=82]}
A normal broker response looks like this:
ProduceResponseData(responses=[TopicProduceResponse(name='kafka-k8s-test', partitionResponses=[PartitionProduceResponse(index=2, errorCode=0, baseOffset=32, logAppendTimeMs=-1, logStartOffset=28, recordErrors=[], errorMessage=null, currentLeader=LeaderIdAndEpoch(leaderId=-1, leaderEpoch=-1))])], throttleTimeMs=0, nodeEndpoints=[])
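For reference, the acks=-1 in the request above is what the producer sends when configured with acks=all, i.e. wait for all in-sync replicas. A minimal usage sketch with the standard client API (the bootstrap address is a placeholder):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

// Minimal producer usage; everything below triggers exactly the request
// sequence described in this section: API_VERSIONS, METADATA, then PRODUCE.
public final class ProduceExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder address
        props.put("acks", "all"); // shows up on the wire as acks=-1
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("kafka-k8s-test", "key", "value"));
        }
    }
}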
3. Code that takes a few reads to understand
3.1 Updating metadata
The producer caches broker connection state locally in a Map whose key is the broker id and whose value is that broker's connection state; initially it is empty:
private final Map<String, NodeConnectionState> nodeState;
During metadata initialization, a broker's entry walks through these states:
null -> CONNECTING -> CHECKING_API_VERSIONS -> READY;
Metadata updates are driven by the NIO thread:
- When a broker's state is still empty, canConnect holds, so the client initiates the connection, registers the channel and OP_CONNECT interest with the selector, and moves the broker's state to CONNECTING.
- The NIO thread then enters select and waits for OP_CONNECT. If the connection is slow, select times out, the thread wakes up and starts the next round of checks; this time isAnyNodeConnecting holds (some broker is still CONNECTING), so it simply re-enters select and keeps waiting for OP_CONNECT.
- When OP_CONNECT fires, an API_VERSIONS request still has to be sent, so the broker's state becomes CHECKING_API_VERSIONS; only after that request returns successfully does the state move to READY.
- Only once the broker's state is READY does canSendRequest hold, and only under that condition is the METADATA request actually sent:
private long maybeUpdate(long now, Node node) {
    String nodeConnectionId = node.idString();

    // the broker's state is READY: send the METADATA request
    if (canSendRequest(nodeConnectionId, now)) {
        Metadata.MetadataRequestAndVersion requestAndVersion = metadata.newMetadataRequestAndVersion(now);
        MetadataRequest.Builder metadataRequest = requestAndVersion.requestBuilder;
        sendInternalMetadataRequest(metadataRequest, nodeConnectionId, now);
        inProgress = new InProgressData(requestAndVersion.requestVersion, requestAndVersion.isPartialUpdate);
        return defaultRequestTimeoutMs;
    }

    // some broker is still in the CONNECTING state: just keep waiting
    if (isAnyNodeConnecting()) {
        return reconnectBackoffMs;
    }

    // no connection state yet for this broker: initiate a connection
    if (connectionStates.canConnect(nodeConnectionId, now)) {
        initiateConnect(node, now);
        return reconnectBackoffMs;
    }

    return Long.MAX_VALUE;
}
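Note the three return values: they are wait hints, not just status codes. A sketch, assuming a poll loop along the lines of NetworkClient.poll (the types below are simplified stand-ins, not the real client), of how the NIO thread can use them to cap its select timeout:

import java.io.IOException;
import java.nio.channels.Selector;

// Sketch: a CONNECTING broker yields reconnectBackoffMs, so select() wakes up
// soon for the next round of checks, while Long.MAX_VALUE means there is no
// metadata work worth waking up for.
public final class PollLoopSketch {
    interface MetadataUpdater {
        long maybeUpdate(long nowMs);
    }

    static void pollOnce(MetadataUpdater updater, Selector selector, long defaultTimeoutMs) throws IOException {
        long metadataTimeout = updater.maybeUpdate(System.currentTimeMillis());
        long timeout = Math.min(metadataTimeout, defaultTimeoutMs);
        // Selector.select(0) would block indefinitely, so clamp to at least 1 ms.
        selector.select(Math.max(1L, timeout));
    }
}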
3.2 How send writes messages into the buffer
Calling send boils down to appending a batch to the per-partition queues in batches. Notice that every queue operation takes a synchronized lock, because the send threads and the NIO thread may operate on the same queue concurrently; and on a retriable send failure, the NIO thread puts the batch straight back at the head of the queue to await its retry.
ByteBuffer allocation, however, deliberately sits outside the synchronized block, because allocation may have to wait: the producer's total buffer memory defaults to 32 MB, and a ByteBuffer that cannot be allocated immediately must wait for memory to be released. If allocation happened inside the lock, the NIO thread would never get the queue lock, and so would never get the chance to take batches off the queue, send them, and free their memory. That is a deadlock: the send thread holds the queue lock while waiting for memory, and the NIO thread wants to release memory but can no longer acquire the queue lock (the toy sketch after the snippet below reduces this to its essence).
Deque<ProducerBatch> dq = topicInfo.batches.computeIfAbsent(effectivePartition, k -> new ArrayDeque<>());
synchronized (dq) {
    // After taking the lock, validate that the partition hasn't changed and retry.
    if (partitionChanged(topic, topicInfo, partitionInfo, dq, nowMs, cluster))
        continue;
    RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callbacks, dq, nowMs);
    if (appendResult != null) {
        // If queue has incomplete batches we disable switch (see comments in updatePartitionInfo).
        boolean enableSwitch = allBatchesFull(dq);
        topicInfo.builtInPartitioner.updatePartitionInfo(partitionInfo, appendResult.appendedBytes, cluster, enableSwitch);
        return appendResult;
    }
}

if (buffer == null) {
    byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
    int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression.type(), key, value, headers));
    buffer = free.allocate(size, maxTimeToBlock);
    nowMs = time.milliseconds();
}

synchronized (dq) {
    // After taking the lock, validate that the partition hasn't changed and retry.
    if (partitionChanged(topic, topicInfo, partitionInfo, dq, nowMs, cluster))
        continue;
    RecordAppendResult appendResult = appendNewBatch(topic, effectivePartition, dq, timestamp, key, value, headers, callbacks, buffer, nowMs);
    // Set buffer to null, so that deallocate doesn't return it back to free pool, since it's used in the batch.
    if (appendResult.newBatchCreated)
        buffer = null;
    // If queue has incomplete batches we disable switch (see comments in updatePartitionInfo).
    boolean enableSwitch = allBatchesFull(dq);
    topicInfo.builtInPartitioner.updatePartitionInfo(partitionInfo, appendResult.appendedBytes, cluster, enableSwitch);
    return appendResult;
}
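The deadlock risk described above can be boiled down to the following toy sketch; SimplePool is a stand-in for the producer's BufferPool, not the real class:

import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

// If allocate() were called while still holding the queue lock, the NIO thread
// could never lock the queue to drain batches and give memory back via
// deallocate() -- the deadlock the paragraph above describes.
public final class AllocateOutsideLockSketch {
    private final Deque<ByteBuffer> queue = new ArrayDeque<>();
    private final SimplePool pool = new SimplePool();

    void append(int size) throws InterruptedException {
        synchronized (queue) {
            // fast path: room left in the last batch, no allocation needed
            if (!queue.isEmpty() && queue.peekLast().remaining() >= size)
                return;
        } // release the lock BEFORE potentially blocking on memory
        ByteBuffer buffer = pool.allocate(size); // may wait for the NIO thread to free memory
        synchronized (queue) {
            queue.addLast(buffer); // re-acquire the lock only for the quick enqueue
        }
    }

    // toy stand-in for BufferPool: blocks until enough memory is available
    static final class SimplePool {
        private long available = 32L * 1024 * 1024; // the 32 MB producer default

        synchronized ByteBuffer allocate(int size) throws InterruptedException {
            while (available < size)
                wait(); // woken up by deallocate()
            available -= size;
            return ByteBuffer.allocate(size);
        }

        synchronized void deallocate(ByteBuffer buffer) {
            available += buffer.capacity();
            notifyAll();
        }
    }
}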
3.3 How NIO handles write events
Every request the producer sends to a broker goes through the doSend method, and every request is first turned into a Send object; this step is crucial for understanding how NIO read events are handled later.
private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now, AbstractRequest request) {
    String destination = clientRequest.destination();
    RequestHeader header = clientRequest.makeHeader(request.version());
    Send send = request.toSend(header);
    InFlightRequest inFlightRequest = new InFlightRequest(
            clientRequest,
            header,
            isInternalRequest,
            request,
            send,
            now);
    this.inFlightRequests.add(inFlightRequest);
    selector.send(new NetworkSend(clientRequest.destination(), send));
}
Building the Send object reveals the wire layout: 4 leading bytes, then the request header (request type, version, and so on), then the request body (the message payload). The leading 4 bytes carry the size of the whole message, written by builder.writeInt(messageSize.totalSize()), an int being exactly 4 bytes; only after that are the header and body written:
private static Send buildSend(
    Message header,
    short headerVersion,
    Message apiMessage,
    short apiVersion
) {
    ObjectSerializationCache serializationCache = new ObjectSerializationCache();
    MessageSizeAccumulator messageSize = new MessageSizeAccumulator();
    header.addSize(messageSize, serializationCache, headerVersion);
    apiMessage.addSize(messageSize, serializationCache, apiVersion);

    SendBuilder builder = new SendBuilder(messageSize.sizeExcludingZeroCopy() + 4);
    builder.writeInt(messageSize.totalSize());
    header.write(builder, serializationCache, headerVersion);
    apiMessage.write(builder, serializationCache, apiVersion);

    return builder.build();
}
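Stripped of the zero-copy machinery that SendBuilder also handles, the framing above amounts to a 4-byte big-endian length prefix followed by the header and body bytes; a minimal sketch:

import java.nio.ByteBuffer;

// Minimal sketch of the frame layout: [4-byte length][header][body], where the
// length counts everything after itself, just like messageSize.totalSize().
public final class FramingSketch {
    static ByteBuffer frame(byte[] header, byte[] body) {
        ByteBuffer buf = ByteBuffer.allocate(4 + header.length + body.length);
        buf.putInt(header.length + body.length); // the leading 4 bytes: total message size
        buf.put(header);
        buf.put(body);
        buf.flip(); // ready for channel.write(buf)
        return buf;
    }
}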
Keep this detail in mind, and the way NIO handles read events becomes self-evident.
3.4 How NIO handles read events
When the producer receives a response from the broker, it has to read the data from the channel. The ByteBuffer.allocate(4) here puzzles many readers: why 4 and not some other number? It matches the write path above exactly. Both sites use a magic number, which makes the connection hard to spot, and is a good reminder not to use magic numbers in our own code.
public NetworkReceive(int maxSize, String source, MemoryPool memoryPool) {
    this.source = source;
    this.size = ByteBuffer.allocate(4);
    this.buffer = null;
    this.maxSize = maxSize;
    this.memoryPool = memoryPool;
}
When reading from the channel, the size buffer must be filled first, i.e. exactly 4 bytes must be read. Only once !size.hasRemaining() holds does the code allocate the ByteBuffer that receives the actual response payload, because, mirroring the write path where the leading 4 bytes carry the whole request's size, the Kafka protocol requires the first 4 bytes of a response to be the response's size.
public long readFrom(ScatteringByteChannel channel) throws IOException {
    int read = 0;
    if (size.hasRemaining()) {
        int bytesRead = channel.read(size);
        if (bytesRead < 0)
            throw new EOFException();
        read += bytesRead;
        if (!size.hasRemaining()) {
            size.rewind();
            int receiveSize = size.getInt();
            if (receiveSize < 0)
                throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + ")");
            if (maxSize != UNLIMITED && receiveSize > maxSize)
                throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + " larger than " + maxSize + ")");
            requestedBufferSize = receiveSize; // may be 0 for some payloads (SASL)
            if (receiveSize == 0) {
                buffer = EMPTY_BUFFER;
            }
        }
    }
    if (buffer == null && requestedBufferSize != -1) { // we know the size we want but haven't been able to allocate it yet
        buffer = memoryPool.tryAllocate(requestedBufferSize);
        if (buffer == null)
            log.trace("Broker low on memory - could not allocate buffer of size {} for source {}", requestedBufferSize, source);
    }
    if (buffer != null) {
        int bytesRead = channel.read(buffer);
        if (bytesRead < 0)
            throw new EOFException();
        read += bytesRead;
    }
    return read;
}
Once the response size is known, a ByteBuffer of exactly that size is allocated for the payload, and TCP sticky packets and split packets stop being a concern:
- Sticky packets: several responses may arrive together, but the producer now knows the exact size of the current response and reads no further, because the allocated ByteBuffer is exactly that size; the surplus bytes simply wait for the next read event, when a fresh receive starts from the beginning.
- Split packets: a response may be spread over several TCP segments. If the ByteBuffer cannot be filled in one pass, buffer.hasRemaining() stays true, so complete() returns false and this receive is treated as unfinished; when the read event fires again, reading continues into the same receive until the ByteBuffer is full.
public NetworkReceive maybeCompleteReceive() {
    if (receive != null && receive.complete()) {
        receive.payload().rewind();
        NetworkReceive result = receive;
        receive = null;
        return result;
    }
    return null;
}

@Override
public boolean complete() {
    return !size.hasRemaining() && buffer != null && !buffer.hasRemaining();
}
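To watch the split-packet path work, here is a self-contained sketch that mirrors the NetworkReceive logic (a simplified reader, not the real class) and feeds one frame through it a single byte at a time; complete() stays false until both the size prefix and the payload are fully read:

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;

// First fill the 4-byte size buffer, then allocate exactly that many bytes for
// the payload; a partial read simply leaves hasRemaining() true until the next
// readable event.
public final class LengthPrefixedReader {
    private final ByteBuffer size = ByteBuffer.allocate(4);
    private ByteBuffer payload;

    boolean complete() {
        return !size.hasRemaining() && payload != null && !payload.hasRemaining();
    }

    void readFrom(ReadableByteChannel channel) throws IOException {
        if (size.hasRemaining()) {
            channel.read(size);
            if (!size.hasRemaining()) {
                size.rewind();
                payload = ByteBuffer.allocate(size.getInt()); // exactly one frame, no sticky bytes
            }
        }
        if (payload != null && payload.hasRemaining()) {
            channel.read(payload);
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] frame = {0, 0, 0, 2, 'o', 'k'}; // size=2, body="ok"
        LengthPrefixedReader reader = new LengthPrefixedReader();
        for (int i = 0; i < frame.length && !reader.complete(); i++) {
            // deliver one byte per "read event" to simulate a split packet
            ReadableByteChannel oneByte = Channels.newChannel(new ByteArrayInputStream(frame, i, 1));
            reader.readFrom(oneByte);
        }
        System.out.println(reader.complete()); // true only after all 6 bytes
    }
}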
Kafka's handling of sticky and split packets here is textbook quality; does it spark anything for your own network code?
Which other producer code do you find especially well written, or especially hard to understand? Share it in the comments and let's learn together~