話(huà)說(shuō)上回中缕贡,KafkaProducer已經(jīng)將生產(chǎn)的記錄追加到了RecordAccumulator中。那么接下來(lái)的事情,就是怎么樣把這些記錄提交到服務(wù)端了崭倘。
是否還記得在KafkaProducer.doSend方法一下代碼段:
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, interceptCallback, remainingWaitMs);
// 5.如果 batch已經(jīng)滿(mǎn)了或者是新建立的Batch,喚醒 sender線(xiàn)程發(fā)送數(shù)據(jù)
if (result.batchIsFull || result.newBatchCreated) {
this.sender.wakeup();
}
在一個(gè)RecordBatch已經(jīng)滿(mǎn)了或是新建立了一個(gè)RecordBatch(之所以新建是因?yàn)榕f的放不下消息了问芬。因此意味著舊的就可以發(fā)送了)悦析。就喚醒發(fā)送線(xiàn)程,準(zhǔn)備提交記錄到服務(wù)端此衅。
this.sender.wakeup(); // 將Sender線(xiàn)程從阻塞中喚醒
Sender
實(shí)現(xiàn)Runnable接口的對(duì)象强戴。一個(gè)KafkaProducer持有一個(gè)Sender實(shí)例。Sender線(xiàn)程迭代RecordAccumulator中batches變量的每個(gè)分區(qū)(tp)挡鞍,獲取分區(qū)對(duì)應(yīng)的主副本節(jié)點(diǎn)骑歹,然后取出分區(qū)對(duì)應(yīng)隊(duì)列中的RecordBatch,提交到服務(wù)端墨微。(追加消息到記錄收集器中(RecordAccumulator)都是按照分區(qū)分好組了道媚,所以每個(gè)分區(qū)隊(duì)列都是保存的即將發(fā)送到這個(gè)分區(qū)主副本對(duì)應(yīng)的節(jié)點(diǎn)上的記錄)。
Sender.run
void run(long now) {
// 從元數(shù)據(jù)對(duì)象中獲取集群信息
Cluster cluster = metadata.fetch();
// 遍歷所有的topic-partition,如果其對(duì)應(yīng)的RecordBatch可以發(fā)送(大小達(dá)到 batch.size或時(shí)間達(dá)到 linger.ms) 就取出其對(duì)應(yīng)的leader最域。
// 返回ReadyCheckResult實(shí)例谴分,包含:可以發(fā)送的RecordBatch對(duì)應(yīng)的節(jié)點(diǎn)(leader)等信息
RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
// 如果有topic-partition的leader是未知的,就強(qiáng)制metadata更新
if (!result.unknownLeaderTopics.isEmpty()) {
// ready()方法中遇到?jīng)]有l(wèi)eader的tp就將其加入ReadyCheckResult.unknownLeaderTopics的set集合中
// 然后會(huì)去請(qǐng)求這些tp的的meta
for (String topic : result.unknownLeaderTopics)
this.metadata.add(topic);
this.metadata.requestUpdate();
}
// 如果與node沒(méi)有連接(如果允許連接但還沒(méi)連接,就初始化連接),就證明該node暫時(shí)不能接收數(shù)據(jù),暫時(shí)移除該 node
// 建立到主節(jié)點(diǎn)的網(wǎng)絡(luò)連接镀脂,移除還沒(méi)有準(zhǔn)備好的節(jié)點(diǎn)(leader還沒(méi)有選擇出來(lái)的節(jié)點(diǎn)) Iterator iter = result.readyNodes.iterator();
long notReadyTimeout = Long.MAX_VALUE;
while (iter.hasNext()) {
Node node = iter.next();
if (!this.client.ready(node, now)) {
iter.remove();
notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
}
}
// 返回該 node 對(duì)應(yīng)的所有可以發(fā)送的 RecordBatch 組成的 batches(key 是 node.id),并將 RecordBatch 從對(duì)應(yīng)的 queue 中移除
// 讀取記錄收集器牺蹄,返回組合好的在同一個(gè)節(jié)點(diǎn)上的所有主副本對(duì)應(yīng)的分區(qū)的RecordBatch
// Map<nodeID,要準(zhǔn)備發(fā)送到該節(jié)點(diǎn)的所有RecordBatch(包括不同的分區(qū))>
Map<Integer,List<RecordBatch>> batches = this.accumulator.drain(cluster,result.readyNodes,this.maxRequestSize,now);
if (guaranteeMessageOrder) {
// 記錄將要發(fā)送的 topicPartition
for (List batchList : batches.values()) {
for (RecordBatch batch : batchList)
this.accumulator.mutePartition(batch.topicPartition);
}
}
// 將由于元數(shù)據(jù)不可用等情況而導(dǎo)致不能發(fā)送的 RecordBatch移除
List expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, now);
for (RecordBatch expiredBatch : expiredBatches) this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount); sensors.updateProduceRequestMetrics(batches);
// 構(gòu)建以節(jié)點(diǎn)為級(jí)別的生產(chǎn)請(qǐng)求列表,既每個(gè)節(jié)點(diǎn)只有一個(gè)客戶(hù)端請(qǐng)求
// 減少客戶(hù)端到服務(wù)端的請(qǐng)求次數(shù)
List requests = createProduceRequests(batches, now);
long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
if (result.readyNodes.size() > 0) {
pollTimeout = 0;
}
for (ClientRequest request : requests)
client.send(request, now); // 保存要發(fā)送的客戶(hù)端請(qǐng)求薄翅,這里沒(méi)有真正的發(fā)送
// 執(zhí)行真正的網(wǎng)絡(luò)讀寫(xiě)請(qǐng)求钞馁,將上面的客戶(hù)端請(qǐng)求真正發(fā)送出去
this.client.poll(pollTimeout, now);
}
在發(fā)送線(xiàn)程發(fā)送消息時(shí),記錄收集器會(huì)按照節(jié)點(diǎn)維度將RecordBatch重新組裝(Map<nodeID,要準(zhǔn)備發(fā)送到該節(jié)點(diǎn)的所有RecordBatch>),返回給發(fā)送線(xiàn)程匿刮,再由發(fā)送線(xiàn)程為每一個(gè)節(jié)點(diǎn)創(chuàng)建一個(gè)客戶(hù)端請(qǐng)求僧凰。
細(xì)看一下run中的方法:
RecordAccumulator.ready
public ReadyCheckResult ready(Cluster cluster, long nowMs) {
Set readyNodes = new HashSet<>();
long nextReadyCheckDelayMs = Long.MAX_VALUE;
Set unknownLeaderTopics = new HashSet<>();
boolean exhausted = this.free.queued() > 0;
for (Map.Entry> entry : this.batches.entrySet()) {
TopicPartition part = entry.getKey();
Deque deque = entry.getValue();
Node leader = cluster.leaderFor(part); // 查詢(xún)tp的leader對(duì)應(yīng)的節(jié)點(diǎn)信息
synchronized (deque) {
if (leader == null && !deque.isEmpty()) {
unknownLeaderTopics.add(part.topic());
} else if (!readyNodes.contains(leader) && !muted.contains(part)) {
// 如果 muted 集合包含這個(gè) tp,那么在遍歷時(shí)將不會(huì)處理它對(duì)應(yīng)的 deque熟丸,
// 也就是說(shuō)训措,如果一個(gè) tp在muted集合中,說(shuō)明它還有RecordBatch正在處理中(沒(méi)有收到響應(yīng))
// 那么即使它對(duì)應(yīng)的RecordBatch可以發(fā)送了光羞,也不會(huì)處理
RecordBatch batch = deque.peekFirst();
if (batch != null) {
boolean backingOff = batch.attempts > 0 && batch.lastAttemptMs + retryBackoffMs > nowMs;
long waitedTimeMs = nowMs - batch.lastAttemptMs;
long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
boolean full = deque.size() > 1 || batch.records.isFull(); // batch滿(mǎn)了
boolean expired = waitedTimeMs >= timeToWaitMs; // batch超時(shí)
boolean sendable = full || expired || exhausted || closed || flushInProgress();
if (sendable && !backingOff) {
readyNodes.add(leader); // 將可以發(fā)送的leader添加到集合中
} else {
nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
}
}
}
}
}
return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeaderTopics);
}
ready方法返回的ReadyCheckResult對(duì)象包括:可以發(fā)送的RecordBatch對(duì)應(yīng)的節(jié)點(diǎn)(leader)信息绩鸣,下一次就緒檢查點(diǎn)的時(shí)間,分區(qū)的leader未知的topic信息纱兑。發(fā)現(xiàn)有分區(qū)的leader未知的topic信息那么就會(huì)去強(qiáng)制更新元數(shù)據(jù)里面的集群信息呀闻。
RecordAccumulator.drain
public Map<Integer,List<RecordBatch>> drain(Cluster cluster, Set nodes, int maxSize, long now) {
if (nodes.isEmpty())
return Collections.emptyMap();
Map<Integer,List<RecordBatch>> batches = new HashMap<>();
for (Node node : nodes) {
int size = 0;
List<PartitionInfo> parts = cluster.partitionsForNode(node.id());
List<RecordBatch> ready = new ArrayList<>();
int start = drainIndex = drainIndex % parts.size();
do {
PartitionInfo part = parts.get(drainIndex);
TopicPartition tp = new TopicPartition(part.topic(), part.partition());
if (!muted.contains(tp)) {
// 被 mute 的 tp 依然不會(huì)被遍歷
Deque<RecordBatch> deque = getDeque(new TopicPartition(part.topic(), part.partition()));
if (deque != null) {
// tp有對(duì)應(yīng)的隊(duì)列有數(shù)據(jù),會(huì)選擇出來(lái)潜慎,加上已經(jīng)被選擇出來(lái)的RecordBatch捡多,直到達(dá)到最大的請(qǐng)求長(zhǎng)度,才停止
// 這樣一個(gè)RecordBatch及時(shí)沒(méi)有達(dá)到發(fā)送條件(沒(méi)有裝滿(mǎn))铐炫,為了保證每個(gè)請(qǐng)求盡可能多的發(fā)送數(shù)據(jù)垒手,也會(huì)被發(fā)送出去。
synchronized (deque) {
RecordBatch first = deque.peekFirst();
if (first != null) {
boolean backoff = first.attempts > 0 && first.lastAttemptMs + retryBackoffMs > now;
if (!backoff) {
if (size + first.records.sizeInBytes() > maxSize && !ready.isEmpty()) {
break;
} else {
RecordBatch batch = deque.pollFirst();
batch.records.close();
size += batch.records.sizeInBytes();
ready.add(batch);
batch.drainedMs = now;
}
}
}
}
}
}
this.drainIndex = (this.drainIndex + 1) % parts.size();
} while (start != drainIndex);
batches.put(node.id(), ready);
}
return batches;
}
drain方法倒信,在max.request.size的范圍內(nèi)發(fā)送盡可能多的RecordBatch科贬。并且重新按照節(jié)點(diǎn)維度重新整合記錄。
在記錄收集器中的存儲(chǔ)數(shù)據(jù)格式為:batches-->Map<TopicPartition,Deque<RecordBatch>>鳖悠。發(fā)送線(xiàn)程獲取數(shù)據(jù)時(shí)記錄收集器返回的數(shù)據(jù)格式為:batches-->Map<nodeId,List<RecordBatch>>
記錄收集器(RecordAccumulator)榜掌,發(fā)送線(xiàn)程(Sender),服務(wù)端(Broker)
參考了《Kafka技術(shù)內(nèi)幕:圖文詳解Kafka源碼設(shè)計(jì)與實(shí)現(xiàn)》中的圖
wakeup方法把發(fā)送線(xiàn)程喚醒乘综,但是Sender并不負(fù)責(zé)真正發(fā)送客戶(hù)端請(qǐng)求到服務(wù)端憎账,它做的事情只是從記錄收集器(RecordAccumulator)中,取出可以發(fā)送的記錄瘾带,封裝成Map<nodeId,List<RecordBatch>>結(jié)構(gòu)鼠哥,創(chuàng)建好客戶(hù)端請(qǐng)求熟菲,然后把請(qǐng)求交給NetworkClient(客戶(hù)端網(wǎng)絡(luò)對(duì)象)去發(fā)送看政。
NetworkClient
Kafka客戶(hù)端發(fā)送是基于NIO構(gòu)建自己的通信層NetworkClient朴恳。它管理了客戶(hù)端和服務(wù)端的網(wǎng)絡(luò)通信。
以上是NetworkClient關(guān)于通信層方面的生態(tài)類(lèi)允蚣。
NetworkClient重要的幾個(gè)方法:
ready():連接所有可以連接的節(jié)點(diǎn)于颖。如果服務(wù)器不能連接偏塞,就把節(jié)點(diǎn)移除掉收班。
send():在當(dāng)前節(jié)點(diǎn)可以發(fā)送新的請(qǐng)求的情況下(這里的可以發(fā)是在能正常連接的情況下芭挽,同一個(gè)節(jié)點(diǎn)私杜,一個(gè)客戶(hù)端請(qǐng)求還沒(méi)有完成時(shí)骂删,就不能發(fā)送新的客戶(hù)端請(qǐng)求)廷没,把Sender發(fā)送線(xiàn)程創(chuàng)建的客戶(hù)端請(qǐng)求抬闷,存到節(jié)點(diǎn)對(duì)應(yīng)的通道中(KafkaChannel)缸逃,并緩存到“沒(méi)有收到響應(yīng)的隊(duì)列”中(InFlightRequests)壶运。
poll():輪詢(xún)耐齐,真正的執(zhí)行網(wǎng)絡(luò)請(qǐng)求,發(fā)送請(qǐng)求到節(jié)點(diǎn)蒋情,讀取響應(yīng)埠况。此方法中要調(diào)用org.apache.kafka.common.network.Selector.poll()方法。在一次poll之后會(huì)對(duì)這次poll數(shù)據(jù)進(jìn)行相關(guān)的處理:
1棵癣,處理已經(jīng)完成的Send辕翰,包括那些發(fā)送完成后不需要響應(yīng)的Send-->handleCompletedSends。
2狈谊,處理從服務(wù)端接收到響應(yīng)-->handleCompletedReceives喜命。
3,處理連接失敗那些連接-->handleDisconnections河劝。
4渊抄,處理新建立的那些連接-->handleConnections。
5丧裁,超時(shí)的請(qǐng)求-->handleTimedOutRequests护桦。
6,調(diào)用請(qǐng)求的回調(diào)函數(shù)煎娇。
Selector(org.apache.kafka.common.network)
來(lái)回顧一下java NIO中的一些概念:以下描述參考:《Kafka技術(shù)內(nèi)幕:圖文詳解Kafka源碼設(shè)計(jì)與實(shí)現(xiàn)》
SocketChannel:客戶(hù)端網(wǎng)絡(luò)連接通道二庵,底層的字節(jié)數(shù)據(jù)讀寫(xiě)都發(fā)生在通道上(從通道中讀取數(shù)據(jù),將數(shù)據(jù)寫(xiě)入通道中)缓呛,通道會(huì)和字節(jié)緩沖區(qū)一起使用催享,從通道中讀取數(shù)據(jù)時(shí)需要構(gòu)造一個(gè)緩沖區(qū),調(diào)用channel.read(buffer)就會(huì)將通道中的數(shù)據(jù)添加到緩沖區(qū)中哟绊。將數(shù)據(jù)寫(xiě)入通道時(shí)因妙。要先將數(shù)據(jù)寫(xiě)到緩存區(qū)中,調(diào)用channel.write(buffer)將緩沖區(qū)中的每個(gè)字節(jié)寫(xiě)入到通道中。
Selector:發(fā)生在通道上的事件有讀和寫(xiě)攀涵,選擇器會(huì)通過(guò)選擇鍵的方式監(jiān)聽(tīng)讀寫(xiě)事件的發(fā)生铣耘。
SelectionKey:將通道注冊(cè)到選擇器上:channel.register(selector)會(huì)返回選擇鍵。選擇鍵將通道和選擇器關(guān)聯(lián)起來(lái)以故。讀和寫(xiě)事件發(fā)生時(shí)蜗细,通過(guò)選擇鍵可以得到對(duì)應(yīng)的通道,從而在通道上進(jìn)行讀寫(xiě)操作怒详。
Sender炉媒,NetworkClient,Selector
KafkaChannel
id:NodeId
TransportLayer:負(fù)責(zé)字節(jié)操作的傳輸層昆烁,KafkaChannel要操作SockerChannel時(shí)吊骤,都交給TransportLayer傳輸層去做。
NetworkReceive:接收的數(shù)據(jù)静尼。
Send:發(fā)送的請(qǐng)求數(shù)據(jù)白粉,一個(gè)KafkaChannel一次只存放一個(gè)請(qǐng)求數(shù)據(jù)。等著數(shù)據(jù)發(fā)送完成后茅郎,才能發(fā)送下一個(gè)請(qǐng)求數(shù)據(jù)蜗元。
TransportLayer
傳輸層對(duì)SockerChannel做了簡(jiǎn)單的封裝(都實(shí)現(xiàn)了ScatteringByteChannel和GatheringByteChannel接口),選擇器Selector在調(diào)用KafkaChannel.write和read方法時(shí)系冗,實(shí)際是調(diào)用Send.writeTo和NetworkReceive.readFrom奕扣,再調(diào)用底層SockerChannel.write和read方法
選擇器監(jiān)聽(tīng)到了客戶(hù)端的讀寫(xiě)事件,會(huì)獲取綁定到選擇鍵(SelectionKey)上的KafkaChannel掌敬,KafkaChannel會(huì)將讀寫(xiě)操作交給傳輸層(TransportLayer)惯豆,TransportLayer再使用底層的SocketChannel完成數(shù)據(jù)的操作。
NetworkClient.ready
在確認(rèn)節(jié)點(diǎn)是否可以發(fā)送的時(shí)奔害,允許連接但是沒(méi)有連接的情況下會(huì)初始化連接楷兽,調(diào)用org.apache.kafka.common.network.Selector.connect,連接動(dòng)作使用java原生的SocketChannel完成华临。在此方法中會(huì)構(gòu)建KafkaChannel芯杀,讓KafkaChannel和SelectionKey關(guān)聯(lián)起來(lái)。還維護(hù)了節(jié)點(diǎn)和KafkaChannel的映射關(guān)系(<nodeId,KafkaChannel>)雅潭。
SocketChannel socketChannel = SocketChannel.open(); socketChannel.configureBlocking(false); Socket socket = socketChannel.socket(); socket.setKeepAlive(true);
boolean connected = socketChannel.connect(address); // 發(fā)起連接請(qǐng)求
SelectionKey key = socketChannel.register(java.nio.channels.Selector, SelectionKey.OP_CONNECT);
KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize); // 構(gòu)建KafkaChannel對(duì)象
key.attach(channel); // 將KafkaChannel注冊(cè)到選擇鍵上
this.channels.put(id, channel); //?節(jié)點(diǎn)和KafkaChannel的映射關(guān)系
NetworkClient.send
客戶(hù)端發(fā)送的ClientRequest請(qǐng)求揭厚,經(jīng)過(guò)NetworkClient.send()--->org.apache.kafka.common.network.Selector.send()--->KafkaChannel.setSend()。保存到對(duì)應(yīng)的KafkaChannel中扶供,但在KafkaChannel還有未發(fā)送成功的Send請(qǐng)求筛圆,則后面的請(qǐng)求則不能發(fā)送(在一個(gè)KafkaChannel中,一次只能發(fā)送一個(gè)客戶(hù)端請(qǐng)求)椿浓。
KafkaChannel一次只處理一個(gè)Send太援,每次都會(huì)注冊(cè)寫(xiě)事件闽晦,當(dāng)Send發(fā)送成功后,就注銷(xiāo)寫(xiě)事件提岔。這里的發(fā)送完成是整個(gè)Send請(qǐng)求發(fā)送完成仙蛉,如果調(diào)用一次底層的write方法沒(méi)有完成寫(xiě)完,那么寫(xiě)事件不會(huì)被注銷(xiāo)唧垦,會(huì)繼續(xù)監(jiān)聽(tīng)寫(xiě)事件捅儒,直到整個(gè)Send請(qǐng)求發(fā)送完成液样。
注冊(cè)寫(xiě)事件振亮,當(dāng)Selector輪詢(xún)后,寫(xiě)事件準(zhǔn)備就緒鞭莽,就會(huì)從KafkaChannel取出客戶(hù)端請(qǐng)求坊秸,調(diào)用底層的write方法進(jìn)行發(fā)送。
NetworkClient.poll
NetworkClient的輪詢(xún)會(huì)調(diào)用Selector.poll()澎怒,在選擇鍵上處理讀寫(xiě)事件褒搔,當(dāng)事件發(fā)生時(shí),調(diào)用KafkaChannel上的read和write會(huì)得到返回值NetworkReceive和Send對(duì)象喷面,加入到List<Send>:completedSends(發(fā)送完成的客戶(hù)端請(qǐng)求對(duì)象集合)和Map<KafkaChannel,Deque<NetworkReceive>>:stagedReceives(完全接收完服務(wù)端響應(yīng)保存到KafkaChannel對(duì)應(yīng)的隊(duì)列中)星瘾。最后這些集合中的數(shù)據(jù)服務(wù)于poll方法后續(xù)的handle開(kāi)頭的方法中。
private void pollSelectionKeys(Iterable selectionKeys, boolean isImmediatelyConnected, long currentTimeNanos) {
Iterator iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
KafkaChannel channel = channel(key); // 獲取綁定到選擇鍵中的KafkaChannel sensors.maybeRegisterConnectionMetrics(channel.id());
if (idleExpiryManager != null)
idleExpiryManager.update(channel.id(), currentTimeNanos);
try { // 處理一些剛建立 tcp 連接的 channel
if (isImmediatelyConnected || key.isConnectable()) { if (channel.finishConnect()) { // 連接已經(jīng)建立
this.connected.add(channel.id());
this.sensors.connectionCreated.record();
SocketChannel socketChannel = (SocketChannel) key.channel();
}
else continue;
}
if (channel.isConnected() && !channel.ready())
channel.prepare();
// 在讀取一個(gè)響應(yīng)的時(shí)候惧辈,可能會(huì)調(diào)用很多次的read琳状,所以需要循環(huán)讀取
if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
NetworkReceive networkReceive;
while ((networkReceive = channel.read()) != null) // 循環(huán)接收,直到讀取到一個(gè)完整的 Receive,才退出循環(huán)
addToStagedReceives(channel, networkReceive); // 讀取完成后將響應(yīng)數(shù)據(jù)添加到集合中
}
// 底層發(fā)送的時(shí)候盒齿,并不定一次可以完全發(fā)送念逞,所以會(huì)調(diào)用很多次的write,才會(huì)完成一個(gè)Send的發(fā)送
// write是非阻塞的,不會(huì)等到全部發(fā)送才返回
// 所以在沒(méi)有全部發(fā)送的時(shí)候边翁,不會(huì)注銷(xiāo)寫(xiě)事件
//在epoll的缺省模式下(LT(水平觸發(fā))):寫(xiě)緩沖區(qū)只要不滿(mǎn)翎承,就一直會(huì)觸發(fā)寫(xiě)事件。所以只要不注銷(xiāo)寫(xiě)事件符匾,那么就會(huì)觸發(fā)寫(xiě)事件叨咖,直到把一個(gè)完整的Send發(fā)送完成
// 在LT模式下,寫(xiě)緩沖區(qū)為滿(mǎn)的概率很小啊胶,所以寫(xiě)完Send后甸各,要注銷(xiāo)寫(xiě)事件,否則會(huì)出現(xiàn)一直觸發(fā)寫(xiě)事件
if (channel.ready() && key.isWritable()) {
Send send = channel.write(); // send不為空创淡,表示完全發(fā)送出去了痴晦,返回此send對(duì)象,如果沒(méi)有完全發(fā)送出去琳彩,就返回NULL
if (send != null) {
this.completedSends.add(send); // 將完成的 send添加到list中 this.sensors.recordBytesSent(channel.id(), send.size());
}
}
if (!key.isValid()) { // 關(guān)閉斷開(kāi)的連接
close(channel);
this.disconnected.add(channel.id());
}
} catch (Exception e) {
String desc = channel.socketDescription();
close(channel); this.disconnected.add(channel.id());
}
}
}
以上就是生產(chǎn)者的產(chǎn)出客戶(hù)端請(qǐng)求通過(guò)Sender-->NetworkClient-->Selector-->KafkaChannel-->Send/NetworkReceive-->TransportLayer-->SocketChannel誊酌。這個(gè)鏈條進(jìn)行發(fā)送和消息接收部凑。