紹圣--kafka之生產(chǎn)者(三)

話(huà)說(shuō)上回中缕贡,KafkaProducer已經(jīng)將生產(chǎn)的記錄追加到了RecordAccumulator中。那么接下來(lái)的事情,就是怎么樣把這些記錄提交到服務(wù)端了崭倘。

是否還記得在KafkaProducer.doSend方法一下代碼段:

RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, interceptCallback, remainingWaitMs);

// 5.如果 batch已經(jīng)滿(mǎn)了或者是新建立的Batch,喚醒 sender線(xiàn)程發(fā)送數(shù)據(jù)

if (result.batchIsFull || result.newBatchCreated) {

this.sender.wakeup();

}

在一個(gè)RecordBatch已經(jīng)滿(mǎn)了或是新建立了一個(gè)RecordBatch(之所以新建是因?yàn)榕f的放不下消息了问芬。因此意味著舊的就可以發(fā)送了)悦析。就喚醒發(fā)送線(xiàn)程,準(zhǔn)備提交記錄到服務(wù)端此衅。

this.sender.wakeup(); // 將Sender線(xiàn)程從阻塞中喚醒

Sender

實(shí)現(xiàn)Runnable接口的對(duì)象强戴。一個(gè)KafkaProducer持有一個(gè)Sender實(shí)例。Sender線(xiàn)程迭代RecordAccumulator中batches變量的每個(gè)分區(qū)(tp)挡鞍,獲取分區(qū)對(duì)應(yīng)的主副本節(jié)點(diǎn)骑歹,然后取出分區(qū)對(duì)應(yīng)隊(duì)列中的RecordBatch,提交到服務(wù)端墨微。(追加消息到記錄收集器中(RecordAccumulator)都是按照分區(qū)分好組了道媚,所以每個(gè)分區(qū)隊(duì)列都是保存的即將發(fā)送到這個(gè)分區(qū)主副本對(duì)應(yīng)的節(jié)點(diǎn)上的記錄)。

Sender.run

void run(long now) {

// 從元數(shù)據(jù)對(duì)象中獲取集群信息

Cluster cluster = metadata.fetch();

// 遍歷所有的topic-partition,如果其對(duì)應(yīng)的RecordBatch可以發(fā)送(大小達(dá)到 batch.size或時(shí)間達(dá)到 linger.ms) 就取出其對(duì)應(yīng)的leader最域。

// 返回ReadyCheckResult實(shí)例谴分,包含:可以發(fā)送的RecordBatch對(duì)應(yīng)的節(jié)點(diǎn)(leader)等信息

RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

// 如果有topic-partition的leader是未知的,就強(qiáng)制metadata更新

if (!result.unknownLeaderTopics.isEmpty()) {

// ready()方法中遇到?jīng)]有l(wèi)eader的tp就將其加入ReadyCheckResult.unknownLeaderTopics的set集合中

// 然后會(huì)去請(qǐng)求這些tp的的meta

for (String topic : result.unknownLeaderTopics)

this.metadata.add(topic);

this.metadata.requestUpdate();

}

// 如果與node沒(méi)有連接(如果允許連接但還沒(méi)連接,就初始化連接),就證明該node暫時(shí)不能接收數(shù)據(jù),暫時(shí)移除該 node

// 建立到主節(jié)點(diǎn)的網(wǎng)絡(luò)連接镀脂,移除還沒(méi)有準(zhǔn)備好的節(jié)點(diǎn)(leader還沒(méi)有選擇出來(lái)的節(jié)點(diǎn)) Iterator iter = result.readyNodes.iterator();

long notReadyTimeout = Long.MAX_VALUE;

while (iter.hasNext()) {

Node node = iter.next();

if (!this.client.ready(node, now)) {

iter.remove();

notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));

}

}

// 返回該 node 對(duì)應(yīng)的所有可以發(fā)送的 RecordBatch 組成的 batches(key 是 node.id),并將 RecordBatch 從對(duì)應(yīng)的 queue 中移除

// 讀取記錄收集器牺蹄,返回組合好的在同一個(gè)節(jié)點(diǎn)上的所有主副本對(duì)應(yīng)的分區(qū)的RecordBatch

// Map<nodeID,要準(zhǔn)備發(fā)送到該節(jié)點(diǎn)的所有RecordBatch(包括不同的分區(qū))>

Map<Integer,List<RecordBatch>> batches = this.accumulator.drain(cluster,result.readyNodes,this.maxRequestSize,now);

if (guaranteeMessageOrder) {

// 記錄將要發(fā)送的 topicPartition

for (List batchList : batches.values()) {

for (RecordBatch batch : batchList)

this.accumulator.mutePartition(batch.topicPartition);

}

}

// 將由于元數(shù)據(jù)不可用等情況而導(dǎo)致不能發(fā)送的 RecordBatch移除

List expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, now);

for (RecordBatch expiredBatch : expiredBatches) this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount); sensors.updateProduceRequestMetrics(batches);

// 構(gòu)建以節(jié)點(diǎn)為級(jí)別的生產(chǎn)請(qǐng)求列表,既每個(gè)節(jié)點(diǎn)只有一個(gè)客戶(hù)端請(qǐng)求

// 減少客戶(hù)端到服務(wù)端的請(qǐng)求次數(shù)

List requests = createProduceRequests(batches, now);

long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);

if (result.readyNodes.size() > 0) {

pollTimeout = 0;

}

for (ClientRequest request : requests)

client.send(request, now); // 保存要發(fā)送的客戶(hù)端請(qǐng)求薄翅,這里沒(méi)有真正的發(fā)送

// 執(zhí)行真正的網(wǎng)絡(luò)讀寫(xiě)請(qǐng)求钞馁,將上面的客戶(hù)端請(qǐng)求真正發(fā)送出去

this.client.poll(pollTimeout, now);

}

在發(fā)送線(xiàn)程發(fā)送消息時(shí),記錄收集器會(huì)按照節(jié)點(diǎn)維度將RecordBatch重新組裝(Map<nodeID,要準(zhǔn)備發(fā)送到該節(jié)點(diǎn)的所有RecordBatch>),返回給發(fā)送線(xiàn)程匿刮,再由發(fā)送線(xiàn)程為每一個(gè)節(jié)點(diǎn)創(chuàng)建一個(gè)客戶(hù)端請(qǐng)求僧凰。

細(xì)看一下run中的方法:

RecordAccumulator.ready

public ReadyCheckResult ready(Cluster cluster, long nowMs) {

Set readyNodes = new HashSet<>();

long nextReadyCheckDelayMs = Long.MAX_VALUE;

Set unknownLeaderTopics = new HashSet<>();

boolean exhausted = this.free.queued() > 0;

for (Map.Entry> entry : this.batches.entrySet()) {

TopicPartition part = entry.getKey();

Deque deque = entry.getValue();

Node leader = cluster.leaderFor(part); // 查詢(xún)tp的leader對(duì)應(yīng)的節(jié)點(diǎn)信息

synchronized (deque) {

if (leader == null && !deque.isEmpty()) {

unknownLeaderTopics.add(part.topic());

} else if (!readyNodes.contains(leader) && !muted.contains(part)) {

// 如果 muted 集合包含這個(gè) tp,那么在遍歷時(shí)將不會(huì)處理它對(duì)應(yīng)的 deque熟丸,

// 也就是說(shuō)训措,如果一個(gè) tp在muted集合中,說(shuō)明它還有RecordBatch正在處理中(沒(méi)有收到響應(yīng))

// 那么即使它對(duì)應(yīng)的RecordBatch可以發(fā)送了光羞,也不會(huì)處理

RecordBatch batch = deque.peekFirst();

if (batch != null) {

boolean backingOff = batch.attempts > 0 && batch.lastAttemptMs + retryBackoffMs > nowMs;

long waitedTimeMs = nowMs - batch.lastAttemptMs;

long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;

long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);

boolean full = deque.size() > 1 || batch.records.isFull(); // batch滿(mǎn)了

boolean expired = waitedTimeMs >= timeToWaitMs; // batch超時(shí)

boolean sendable = full || expired || exhausted || closed || flushInProgress();

if (sendable && !backingOff) {

readyNodes.add(leader); // 將可以發(fā)送的leader添加到集合中

} else {

nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);

}

}

}

}

}

return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeaderTopics);

}

ready方法返回的ReadyCheckResult對(duì)象包括:可以發(fā)送的RecordBatch對(duì)應(yīng)的節(jié)點(diǎn)(leader)信息绩鸣,下一次就緒檢查點(diǎn)的時(shí)間,分區(qū)的leader未知的topic信息纱兑。發(fā)現(xiàn)有分區(qū)的leader未知的topic信息那么就會(huì)去強(qiáng)制更新元數(shù)據(jù)里面的集群信息呀闻。

RecordAccumulator.drain

public Map<Integer,List<RecordBatch>> drain(Cluster cluster, Set nodes, int maxSize, long now) {

if (nodes.isEmpty())

return Collections.emptyMap();

Map<Integer,List<RecordBatch>> batches = new HashMap<>();

for (Node node : nodes) {

int size = 0;

List<PartitionInfo> parts = cluster.partitionsForNode(node.id());

List<RecordBatch> ready = new ArrayList<>();

int start = drainIndex = drainIndex % parts.size();

do {

PartitionInfo part = parts.get(drainIndex);

TopicPartition tp = new TopicPartition(part.topic(), part.partition());

if (!muted.contains(tp)) {

// 被 mute 的 tp 依然不會(huì)被遍歷

Deque<RecordBatch> deque = getDeque(new TopicPartition(part.topic(), part.partition()));

if (deque != null) {

// tp有對(duì)應(yīng)的隊(duì)列有數(shù)據(jù),會(huì)選擇出來(lái)潜慎,加上已經(jīng)被選擇出來(lái)的RecordBatch捡多,直到達(dá)到最大的請(qǐng)求長(zhǎng)度,才停止

// 這樣一個(gè)RecordBatch及時(shí)沒(méi)有達(dá)到發(fā)送條件(沒(méi)有裝滿(mǎn))铐炫,為了保證每個(gè)請(qǐng)求盡可能多的發(fā)送數(shù)據(jù)垒手,也會(huì)被發(fā)送出去。

synchronized (deque) {

RecordBatch first = deque.peekFirst();

if (first != null) {

boolean backoff = first.attempts > 0 && first.lastAttemptMs + retryBackoffMs > now;

if (!backoff) {

if (size + first.records.sizeInBytes() > maxSize && !ready.isEmpty()) {

break;

} else {

RecordBatch batch = deque.pollFirst();

batch.records.close();

size += batch.records.sizeInBytes();

ready.add(batch);

batch.drainedMs = now;

}

}

}

}

}

}

this.drainIndex = (this.drainIndex + 1) % parts.size();

} while (start != drainIndex);

batches.put(node.id(), ready);

}

return batches;

}

drain方法倒信,在max.request.size的范圍內(nèi)發(fā)送盡可能多的RecordBatch科贬。并且重新按照節(jié)點(diǎn)維度重新整合記錄。

在記錄收集器中的存儲(chǔ)數(shù)據(jù)格式為:batches-->Map<TopicPartition,Deque<RecordBatch>>鳖悠。發(fā)送線(xiàn)程獲取數(shù)據(jù)時(shí)記錄收集器返回的數(shù)據(jù)格式為:batches-->Map<nodeId,List<RecordBatch>>

記錄收集器(RecordAccumulator)榜掌,發(fā)送線(xiàn)程(Sender),服務(wù)端(Broker)

參考了《Kafka技術(shù)內(nèi)幕:圖文詳解Kafka源碼設(shè)計(jì)與實(shí)現(xiàn)》中的圖


wakeup方法把發(fā)送線(xiàn)程喚醒乘综,但是Sender并不負(fù)責(zé)真正發(fā)送客戶(hù)端請(qǐng)求到服務(wù)端憎账,它做的事情只是從記錄收集器(RecordAccumulator)中,取出可以發(fā)送的記錄瘾带,封裝成Map<nodeId,List<RecordBatch>>結(jié)構(gòu)鼠哥,創(chuàng)建好客戶(hù)端請(qǐng)求熟菲,然后把請(qǐng)求交給NetworkClient(客戶(hù)端網(wǎng)絡(luò)對(duì)象)去發(fā)送看政。

NetworkClient

Kafka客戶(hù)端發(fā)送是基于NIO構(gòu)建自己的通信層NetworkClient朴恳。它管理了客戶(hù)端和服務(wù)端的網(wǎng)絡(luò)通信。

以上是NetworkClient關(guān)于通信層方面的生態(tài)類(lèi)允蚣。

NetworkClient重要的幾個(gè)方法:

ready():連接所有可以連接的節(jié)點(diǎn)于颖。如果服務(wù)器不能連接偏塞,就把節(jié)點(diǎn)移除掉收班。

send():在當(dāng)前節(jié)點(diǎn)可以發(fā)送新的請(qǐng)求的情況下(這里的可以發(fā)是在能正常連接的情況下芭挽,同一個(gè)節(jié)點(diǎn)私杜,一個(gè)客戶(hù)端請(qǐng)求還沒(méi)有完成時(shí)骂删,就不能發(fā)送新的客戶(hù)端請(qǐng)求)廷没,把Sender發(fā)送線(xiàn)程創(chuàng)建的客戶(hù)端請(qǐng)求抬闷,存到節(jié)點(diǎn)對(duì)應(yīng)的通道中(KafkaChannel)缸逃,并緩存到“沒(méi)有收到響應(yīng)的隊(duì)列”中(InFlightRequests)壶运。

poll():輪詢(xún)耐齐,真正的執(zhí)行網(wǎng)絡(luò)請(qǐng)求,發(fā)送請(qǐng)求到節(jié)點(diǎn)蒋情,讀取響應(yīng)埠况。此方法中要調(diào)用org.apache.kafka.common.network.Selector.poll()方法。在一次poll之后會(huì)對(duì)這次poll數(shù)據(jù)進(jìn)行相關(guān)的處理:

1棵癣,處理已經(jīng)完成的Send辕翰,包括那些發(fā)送完成后不需要響應(yīng)的Send-->handleCompletedSends。

2狈谊,處理從服務(wù)端接收到響應(yīng)-->handleCompletedReceives喜命。

3,處理連接失敗那些連接-->handleDisconnections河劝。

4渊抄,處理新建立的那些連接-->handleConnections。

5丧裁,超時(shí)的請(qǐng)求-->handleTimedOutRequests护桦。

6,調(diào)用請(qǐng)求的回調(diào)函數(shù)煎娇。

Selector(org.apache.kafka.common.network)

來(lái)回顧一下java NIO中的一些概念:以下描述參考:《Kafka技術(shù)內(nèi)幕:圖文詳解Kafka源碼設(shè)計(jì)與實(shí)現(xiàn)》

SocketChannel:客戶(hù)端網(wǎng)絡(luò)連接通道二庵,底層的字節(jié)數(shù)據(jù)讀寫(xiě)都發(fā)生在通道上(從通道中讀取數(shù)據(jù),將數(shù)據(jù)寫(xiě)入通道中)缓呛,通道會(huì)和字節(jié)緩沖區(qū)一起使用催享,從通道中讀取數(shù)據(jù)時(shí)需要構(gòu)造一個(gè)緩沖區(qū),調(diào)用channel.read(buffer)就會(huì)將通道中的數(shù)據(jù)添加到緩沖區(qū)中哟绊。將數(shù)據(jù)寫(xiě)入通道時(shí)因妙。要先將數(shù)據(jù)寫(xiě)到緩存區(qū)中,調(diào)用channel.write(buffer)將緩沖區(qū)中的每個(gè)字節(jié)寫(xiě)入到通道中。

Selector:發(fā)生在通道上的事件有讀和寫(xiě)攀涵,選擇器會(huì)通過(guò)選擇鍵的方式監(jiān)聽(tīng)讀寫(xiě)事件的發(fā)生铣耘。

SelectionKey:將通道注冊(cè)到選擇器上:channel.register(selector)會(huì)返回選擇鍵。選擇鍵將通道和選擇器關(guān)聯(lián)起來(lái)以故。讀和寫(xiě)事件發(fā)生時(shí)蜗细,通過(guò)選擇鍵可以得到對(duì)應(yīng)的通道,從而在通道上進(jìn)行讀寫(xiě)操作怒详。

Sender炉媒,NetworkClient,Selector

KafkaChannel

id:NodeId

TransportLayer:負(fù)責(zé)字節(jié)操作的傳輸層昆烁,KafkaChannel要操作SockerChannel時(shí)吊骤,都交給TransportLayer傳輸層去做。

NetworkReceive:接收的數(shù)據(jù)静尼。

Send:發(fā)送的請(qǐng)求數(shù)據(jù)白粉,一個(gè)KafkaChannel一次只存放一個(gè)請(qǐng)求數(shù)據(jù)。等著數(shù)據(jù)發(fā)送完成后茅郎,才能發(fā)送下一個(gè)請(qǐng)求數(shù)據(jù)蜗元。

TransportLayer

傳輸層對(duì)SockerChannel做了簡(jiǎn)單的封裝(都實(shí)現(xiàn)了ScatteringByteChannel和GatheringByteChannel接口),選擇器Selector在調(diào)用KafkaChannel.write和read方法時(shí)系冗,實(shí)際是調(diào)用Send.writeTo和NetworkReceive.readFrom奕扣,再調(diào)用底層SockerChannel.write和read方法

Selector輪詢(xún)

選擇器監(jiān)聽(tīng)到了客戶(hù)端的讀寫(xiě)事件,會(huì)獲取綁定到選擇鍵(SelectionKey)上的KafkaChannel掌敬,KafkaChannel會(huì)將讀寫(xiě)操作交給傳輸層(TransportLayer)惯豆,TransportLayer再使用底層的SocketChannel完成數(shù)據(jù)的操作。

NetworkClient.ready

在確認(rèn)節(jié)點(diǎn)是否可以發(fā)送的時(shí)奔害,允許連接但是沒(méi)有連接的情況下會(huì)初始化連接楷兽,調(diào)用org.apache.kafka.common.network.Selector.connect,連接動(dòng)作使用java原生的SocketChannel完成华临。在此方法中會(huì)構(gòu)建KafkaChannel芯杀,讓KafkaChannel和SelectionKey關(guān)聯(lián)起來(lái)。還維護(hù)了節(jié)點(diǎn)和KafkaChannel的映射關(guān)系(<nodeId,KafkaChannel>)雅潭。

SocketChannel socketChannel = SocketChannel.open(); socketChannel.configureBlocking(false); Socket socket = socketChannel.socket(); socket.setKeepAlive(true);

boolean connected = socketChannel.connect(address); // 發(fā)起連接請(qǐng)求

SelectionKey key = socketChannel.register(java.nio.channels.Selector, SelectionKey.OP_CONNECT);

KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize); // 構(gòu)建KafkaChannel對(duì)象

key.attach(channel); // 將KafkaChannel注冊(cè)到選擇鍵上

this.channels.put(id, channel); //?節(jié)點(diǎn)和KafkaChannel的映射關(guān)系

NetworkClient.send

客戶(hù)端發(fā)送的ClientRequest請(qǐng)求揭厚,經(jīng)過(guò)NetworkClient.send()--->org.apache.kafka.common.network.Selector.send()--->KafkaChannel.setSend()。保存到對(duì)應(yīng)的KafkaChannel中扶供,但在KafkaChannel還有未發(fā)送成功的Send請(qǐng)求筛圆,則后面的請(qǐng)求則不能發(fā)送(在一個(gè)KafkaChannel中,一次只能發(fā)送一個(gè)客戶(hù)端請(qǐng)求)椿浓。

KafkaChannel一次只處理一個(gè)Send太援,每次都會(huì)注冊(cè)寫(xiě)事件闽晦,當(dāng)Send發(fā)送成功后,就注銷(xiāo)寫(xiě)事件提岔。這里的發(fā)送完成是整個(gè)Send請(qǐng)求發(fā)送完成仙蛉,如果調(diào)用一次底層的write方法沒(méi)有完成寫(xiě)完,那么寫(xiě)事件不會(huì)被注銷(xiāo)唧垦,會(huì)繼續(xù)監(jiān)聽(tīng)寫(xiě)事件捅儒,直到整個(gè)Send請(qǐng)求發(fā)送完成液样。

注冊(cè)寫(xiě)事件振亮,當(dāng)Selector輪詢(xún)后,寫(xiě)事件準(zhǔn)備就緒鞭莽,就會(huì)從KafkaChannel取出客戶(hù)端請(qǐng)求坊秸,調(diào)用底層的write方法進(jìn)行發(fā)送。

NetworkClient.poll

NetworkClient的輪詢(xún)會(huì)調(diào)用Selector.poll()澎怒,在選擇鍵上處理讀寫(xiě)事件褒搔,當(dāng)事件發(fā)生時(shí),調(diào)用KafkaChannel上的read和write會(huì)得到返回值NetworkReceive和Send對(duì)象喷面,加入到List<Send>:completedSends(發(fā)送完成的客戶(hù)端請(qǐng)求對(duì)象集合)和Map<KafkaChannel,Deque<NetworkReceive>>:stagedReceives(完全接收完服務(wù)端響應(yīng)保存到KafkaChannel對(duì)應(yīng)的隊(duì)列中)星瘾。最后這些集合中的數(shù)據(jù)服務(wù)于poll方法后續(xù)的handle開(kāi)頭的方法中。

private void pollSelectionKeys(Iterable selectionKeys, boolean isImmediatelyConnected, long currentTimeNanos) {

Iterator iterator = selectionKeys.iterator();

while (iterator.hasNext()) {

SelectionKey key = iterator.next();

iterator.remove();

KafkaChannel channel = channel(key); // 獲取綁定到選擇鍵中的KafkaChannel sensors.maybeRegisterConnectionMetrics(channel.id());

if (idleExpiryManager != null)

idleExpiryManager.update(channel.id(), currentTimeNanos);

try { // 處理一些剛建立 tcp 連接的 channel

if (isImmediatelyConnected || key.isConnectable()) { if (channel.finishConnect()) { // 連接已經(jīng)建立

this.connected.add(channel.id());

this.sensors.connectionCreated.record();

SocketChannel socketChannel = (SocketChannel) key.channel();

}

else continue;

}

if (channel.isConnected() && !channel.ready())

channel.prepare();

// 在讀取一個(gè)響應(yīng)的時(shí)候惧辈,可能會(huì)調(diào)用很多次的read琳状,所以需要循環(huán)讀取

if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {

NetworkReceive networkReceive;

while ((networkReceive = channel.read()) != null) // 循環(huán)接收,直到讀取到一個(gè)完整的 Receive,才退出循環(huán)

addToStagedReceives(channel, networkReceive); // 讀取完成后將響應(yīng)數(shù)據(jù)添加到集合中

}

// 底層發(fā)送的時(shí)候盒齿,并不定一次可以完全發(fā)送念逞,所以會(huì)調(diào)用很多次的write,才會(huì)完成一個(gè)Send的發(fā)送

// write是非阻塞的,不會(huì)等到全部發(fā)送才返回

// 所以在沒(méi)有全部發(fā)送的時(shí)候边翁,不會(huì)注銷(xiāo)寫(xiě)事件

//在epoll的缺省模式下(LT(水平觸發(fā))):寫(xiě)緩沖區(qū)只要不滿(mǎn)翎承,就一直會(huì)觸發(fā)寫(xiě)事件。所以只要不注銷(xiāo)寫(xiě)事件符匾,那么就會(huì)觸發(fā)寫(xiě)事件叨咖,直到把一個(gè)完整的Send發(fā)送完成

// 在LT模式下,寫(xiě)緩沖區(qū)為滿(mǎn)的概率很小啊胶,所以寫(xiě)完Send后甸各,要注銷(xiāo)寫(xiě)事件,否則會(huì)出現(xiàn)一直觸發(fā)寫(xiě)事件

if (channel.ready() && key.isWritable()) {

Send send = channel.write(); // send不為空创淡,表示完全發(fā)送出去了痴晦,返回此send對(duì)象,如果沒(méi)有完全發(fā)送出去琳彩,就返回NULL

if (send != null) {

this.completedSends.add(send); // 將完成的 send添加到list中 this.sensors.recordBytesSent(channel.id(), send.size());

}

}

if (!key.isValid()) { // 關(guān)閉斷開(kāi)的連接

close(channel);

this.disconnected.add(channel.id());

}

} catch (Exception e) {

String desc = channel.socketDescription();

close(channel); this.disconnected.add(channel.id());

}

}

}

以上就是生產(chǎn)者的產(chǎn)出客戶(hù)端請(qǐng)求通過(guò)Sender-->NetworkClient-->Selector-->KafkaChannel-->Send/NetworkReceive-->TransportLayer-->SocketChannel誊酌。這個(gè)鏈條進(jìn)行發(fā)送和消息接收部凑。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市碧浊,隨后出現(xiàn)的幾起案子涂邀,更是在濱河造成了極大的恐慌,老刑警劉巖箱锐,帶你破解...
    沈念sama閱讀 219,270評(píng)論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件比勉,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡驹止,警方通過(guò)查閱死者的電腦和手機(jī)浩聋,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,489評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門(mén),熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)臊恋,“玉大人衣洁,你說(shuō)我怎么就攤上這事《督觯” “怎么了坊夫?”我有些...
    開(kāi)封第一講書(shū)人閱讀 165,630評(píng)論 0 356
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)撤卢。 經(jīng)常有香客問(wèn)我环凿,道長(zhǎng),這世上最難降的妖魔是什么放吩? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,906評(píng)論 1 295
  • 正文 為了忘掉前任智听,我火速辦了婚禮,結(jié)果婚禮上屎慢,老公的妹妹穿的比我還像新娘瞭稼。我一直安慰自己,他們只是感情好腻惠,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,928評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布环肘。 她就那樣靜靜地躺著,像睡著了一般集灌。 火紅的嫁衣襯著肌膚如雪悔雹。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書(shū)人閱讀 51,718評(píng)論 1 305
  • 那天欣喧,我揣著相機(jī)與錄音腌零,去河邊找鬼。 笑死唆阿,一個(gè)胖子當(dāng)著我的面吹牛益涧,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播驯鳖,決...
    沈念sama閱讀 40,442評(píng)論 3 420
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼闲询,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼久免!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起扭弧,我...
    開(kāi)封第一講書(shū)人閱讀 39,345評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤阎姥,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后鸽捻,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體呼巴,經(jīng)...
    沈念sama閱讀 45,802評(píng)論 1 317
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,984評(píng)論 3 337
  • 正文 我和宋清朗相戀三年御蒲,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了衣赶。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,117評(píng)論 1 351
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡删咱,死狀恐怖屑埋,靈堂內(nèi)的尸體忽然破棺而出豪筝,到底是詐尸還是另有隱情痰滋,我是刑警寧澤,帶...
    沈念sama閱讀 35,810評(píng)論 5 346
  • 正文 年R本政府宣布续崖,位于F島的核電站敲街,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏严望。R本人自食惡果不足惜多艇,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,462評(píng)論 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望像吻。 院中可真熱鬧峻黍,春花似錦、人聲如沸拨匆。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 32,011評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)惭每。三九已至骨饿,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間台腥,已是汗流浹背宏赘。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,139評(píng)論 1 272
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留黎侈,地道東北人察署。 一個(gè)月前我還...
    沈念sama閱讀 48,377評(píng)論 3 373
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像峻汉,于是被迫代替她去往敵國(guó)和親贴汪。 傳聞我的和親對(duì)象是個(gè)殘疾皇子储藐,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,060評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容