The previous article covered what happens on the main thread: after send() is called, the message and the future object associated with it are placed together into the RecordAccumulator, and that future is what send() ultimately returns. From the client's point of view send() has returned, but that alone does not mean the message has been successfully delivered to the broker. If anything that follows must only happen once the message is confirmed sent, the client needs to call future.get() and wait for the future to complete.
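The pattern described above can be sketched with a plain CompletableFuture standing in for Kafka's RecordMetadata future (this is an illustration of the asynchronous-send pattern, not Kafka's actual code; AsyncSendDemo and its send() method are hypothetical names):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AsyncSendDemo {
    private static final ExecutorService ioPool = Executors.newSingleThreadExecutor();

    // Mimics KafkaProducer.send(): returns a future immediately;
    // the actual "delivery" completes later on a background I/O thread.
    static CompletableFuture<String> send(String message) {
        return CompletableFuture.supplyAsync(() -> "acked: " + message, ioPool);
    }

    public static void main(String[] args) throws Exception {
        CompletableFuture<String> future = send("hello");
        // send() has already returned here, but delivery may not be done yet.
        // Block until the background thread (the "broker") confirms.
        String result = future.get();
        System.out.println(result);
        ioPool.shutdown();
    }
}
```

With the real KafkaProducer the shape is the same: `producer.send(record)` returns a `Future<RecordMetadata>`, and calling `get()` on it turns the asynchronous send into a synchronous one.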
This article picks up from there. Once the main thread has put a message into the RecordAccumulator, it is done with it; a thread called Sender then pulls messages out of the RecordAccumulator and sends them to the broker. The Sender thread was created and started back when the KafkaProducer was constructed.
KafkaProducer(ProducerConfig config,
              Serializer&lt;K&gt; keySerializer,
              Serializer&lt;V&gt; valueSerializer,
              Metadata metadata,
              KafkaClient kafkaClient) {
    try {
        // ...
        this.sender = new Sender(logContext,
                client,
                this.metadata,
                this.accumulator,
                maxInflightRequests == 1,
                config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
                acks,
                retries,
                metricsRegistry.senderMetrics,
                Time.SYSTEM,
                this.requestTimeoutMs,
                config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG),
                this.transactionManager,
                apiVersions);
        String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
        this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
        this.ioThread.start();
        // ...
    } catch (Throwable t) {
        // ...
    }
}
The Sender thread is an event loop: it keeps doing work inside a while loop. We will focus on the two things it does on each iteration:
// org.apache.kafka.clients.producer.internals.Sender#run(long)
void run(long now) {
    // ...
    long pollTimeout = sendProducerData(now);
    client.poll(pollTimeout, now);
}
The main logic of the sendProducerData method, simplified:
private long sendProducerData(long now) {
    Cluster cluster = metadata.fetch();
    // get the list of partitions with data ready to send
    RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
    // ...
    // remove any nodes we aren't ready to send to
    // ...
    // create produce requests
    Map&lt;Integer, List&lt;ProducerBatch&gt;&gt; batches = this.accumulator.drain(cluster, result.readyNodes,
            this.maxRequestSize, now);
    // simplified; the real code also folds in a timeout for not-ready nodes
    long pollTimeout = result.nextReadyCheckDelayMs;
    // ...
    sendProduceRequests(batches, now);
    return pollTimeout;
}
It first calls this.accumulator.ready(cluster, now) to find out which brokers are ready to receive data. It then calls this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now) to regroup the batches destined for those ready brokers, pulling them all out of the RecordAccumulator's Deques so they can be sent out.
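The ready()/drain() split can be modeled with a toy accumulator (this is an illustration, not Kafka's implementation; DrainSketch is a hypothetical class, and batch sizes in bytes stand in for ProducerBatch objects): ready() picks the nodes that have batches queued, and drain() pulls batches off each ready node's deque until the per-request size budget is spent.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of RecordAccumulator.ready() + drain().
public class DrainSketch {
    // nodeId -> deque of queued batch sizes (stand-ins for ProducerBatch)
    final Map<Integer, Deque<Integer>> queues = new HashMap<>();

    void append(int nodeId, int batchSize) {
        queues.computeIfAbsent(nodeId, k -> new ArrayDeque<>()).add(batchSize);
    }

    // "ready": any node that has at least one batch queued
    List<Integer> readyNodes() {
        List<Integer> ready = new ArrayList<>();
        for (Map.Entry<Integer, Deque<Integer>> e : queues.entrySet())
            if (!e.getValue().isEmpty())
                ready.add(e.getKey());
        return ready;
    }

    // "drain": per ready node, take batches off the deque until the
    // next batch would exceed the request size budget
    Map<Integer, List<Integer>> drain(List<Integer> ready, int maxRequestSize) {
        Map<Integer, List<Integer>> batches = new HashMap<>();
        for (int node : ready) {
            Deque<Integer> dq = queues.get(node);
            List<Integer> drained = new ArrayList<>();
            int size = 0;
            while (!dq.isEmpty() && size + dq.peekFirst() <= maxRequestSize) {
                int b = dq.pollFirst();
                size += b;
                drained.add(b);
            }
            batches.put(node, drained);
        }
        return batches;
    }
}
```

For example, with batches of 40, 40, and 40 bytes queued for one node and a 100-byte budget, drain() takes the first two and leaves the third for the next loop iteration; the real drain() applies the same budget idea per node, keyed by broker id just like the Map&lt;Integer, List&lt;ProducerBatch&gt;&gt; above.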