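The Kafka producer thread is responsible for producing messages, while sending them to the broker is handled by a dedicated sender thread, also called the IO thread. This decouples producing messages from sending them and improves throughput.
The IO thread is started when the producer is created. Before the sender thread starts, a record accumulator is initialized: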
this.accumulator = new RecordAccumulator(logContext,
        config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
        this.compressionType,
        lingerMs(config),
        retryBackoffMs,
        deliveryTimeoutMs,
        metrics,
        PRODUCER_METRIC_GROUP_NAME,
        time,
        apiVersions,
        transactionManager,
        new BufferPool(this.totalMemorySize, config.getInt(ProducerConfig.BATCH_SIZE_CONFIG), metrics, time, PRODUCER_METRIC_GROUP_NAME));
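A few of these parameters deserve attention:
Kafka accumulates messages bound for the same partition into the same batch. Two dimensions govern this, space and time; without a time bound, the producer could wait indefinitely for a batch to fill.
batchSize is the space dimension: the upper bound, in bytes, on the memory allocated to each batch, and therefore on how many messages a batch can hold. Too large a value wastes memory;
too small a value means that under heavy traffic many batches have to be sent, which lowers throughput.
lingerMs is the time dimension: the upper bound on how long a batch is delayed before it is sent. Once a batch reaches batchSize, however, it is sent without waiting for lingerMs to elapse.
While a batch is still smaller than batchSize, sending is delayed by up to lingerMs, even when Kafka is under no load.
deliveryTimeoutMs is the upper bound on the time to report success or failure after send() returns. It covers the sum of three things: the time a message is delayed before sending, the time spent waiting for the broker's response, and the time spent retrying failed sends.
This value should be greater than or equal to request.timeout.ms + linger.ms.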
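The interplay of the space and time dimensions, and the constraint on delivery.timeout.ms, can be sketched in a few lines of plain Java. This is an illustration only, not Kafka's implementation: the class BatchReadySketch and its methods are hypothetical, and the values in exampleConfig are illustrative. Only the configuration key names come from Kafka.

```java
import java.util.Properties;

public class BatchReadySketch {

    // Hypothetical helper: a batch may be sent once it is full (space)
    // or once it has waited at least lingerMs (time), whichever comes first.
    static boolean isSendable(int batchBytes, long waitedMs, int batchSize, long lingerMs) {
        boolean full = batchBytes >= batchSize;
        boolean expired = waitedMs >= lingerMs;
        return full || expired;
    }

    // Checks the rule stated above:
    // delivery.timeout.ms >= request.timeout.ms + linger.ms
    static boolean deliveryTimeoutIsValid(Properties props) {
        long linger = Long.parseLong(props.getProperty("linger.ms"));
        long request = Long.parseLong(props.getProperty("request.timeout.ms"));
        long delivery = Long.parseLong(props.getProperty("delivery.timeout.ms"));
        return delivery >= request + linger;
    }

    // Illustrative values only; tune them for the actual workload.
    static Properties exampleConfig() {
        Properties props = new Properties();
        props.setProperty("batch.size", "16384");
        props.setProperty("linger.ms", "5");
        props.setProperty("request.timeout.ms", "30000");
        props.setProperty("delivery.timeout.ms", "120000");
        return props;
    }
}
```

With these values, a 100-byte batch that has waited 2 ms is held back, while the same batch becomes sendable after 5 ms, and a full 16384-byte batch is sendable immediately.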
Once the producer's properties and the accumulator have been initialized, Kafka automatically starts the IO thread:
this.sender = newSender(logContext, kafkaClient, this.metadata);
String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
this.ioThread.start();
Here, KafkaClient is the client that actually sends the messages; it is built on NIO and handles network communication with the broker. KafkaThread extends Thread and sets the sender thread's name, the Runnable it executes, and its daemon flag (true here). The main logic of the sender thread is therefore in the Runnable's run method, that is, Sender.run:
public void run() {
    log.debug("Starting Kafka producer I/O thread.");
    // main loop, runs until close is called
    while (running) {
        try {
            runOnce();
        } catch (Exception e) {
            log.error("Uncaught error in kafka producer I/O thread: ", e);
        }
    }
    log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");
    // omitted ....
}
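The pattern in run, a daemon thread that loops on a volatile flag and flushes what is left when the flag is cleared, can be tried out in isolation. The following is a minimal sketch under simplifying assumptions, not Kafka code: SenderLoopSketch, its queue of strings, and the "sent" list are all hypothetical stand-ins for the accumulator and the network client.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class SenderLoopSketch implements Runnable {
    // Stand-in for the accumulator: records waiting to be sent.
    private final BlockingQueue<String> pending = new LinkedBlockingQueue<>();
    // Stand-in for the network: records that were "sent".
    private final List<String> sent = Collections.synchronizedList(new ArrayList<>());
    private volatile boolean running = true;

    // Called by the producing thread; returns immediately.
    public void append(String record) {
        pending.add(record);
    }

    @Override
    public void run() {
        // Main loop: runs until close() is called.
        while (running) {
            try {
                runOnce();
            } catch (Exception e) {
                System.err.println("Uncaught error in sketch I/O thread: " + e);
            }
        }
        // Shutdown: send whatever is still queued.
        String record;
        while ((record = pending.poll()) != null) {
            sent.add(record);
        }
    }

    private void runOnce() throws InterruptedException {
        // Wait briefly for a record, then "send" it.
        String record = pending.poll(10, TimeUnit.MILLISECONDS);
        if (record != null) {
            sent.add(record);
        }
    }

    public void close() {
        running = false;
    }

    // Starts a daemon I/O thread, produces two records, then shuts down.
    static List<String> demo() {
        SenderLoopSketch sender = new SenderLoopSketch();
        Thread ioThread = new Thread(sender, "sketch-io-thread");
        ioThread.setDaemon(true);
        ioThread.start();
        sender.append("a");
        sender.append("b");
        sender.close();
        try {
            ioThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(sender.sent);
    }
}
```

The producing thread only touches the queue, so it never blocks on I/O; that is the decoupling described at the start of the article.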
As the run method shows, it simply calls runOnce in a loop, so let's look at the logic of runOnce:
void runOnce() {
    if (transactionManager != null) {
        // omitted ....
        // transaction handling
    }
    long currentTimeMs = time.milliseconds();
    // Connect to the brokers and prepare the data for the messages waiting to be sent
    long pollTimeout = sendProducerData(currentTimeMs);
    // Handle the IO events on the connections: read data from the broker and write out the actual messages
    client.poll(pollTimeout, currentTimeMs);
}
In the sendProducerData method, the key steps are the following:
Cluster cluster = metadata.fetch();
// Ask the accumulator which partitions have data ready to send
RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
// Connect to the nodes hosting those partitions; the connections are established through NIO
Iterator<Node> iter = result.readyNodes.iterator();
long notReadyTimeout = Long.MAX_VALUE;
while (iter.hasNext()) {
    Node node = iter.next();
    if (!this.client.ready(node, now)) {
        iter.remove();
        notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now));
    }
}
// Get the batches waiting to be sent
Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
// omitted ...
// Turn the batches into concrete network requests; see the per-node method below
sendProduceRequests(batches, now);
return pollTimeout;
// In short, this method sets the interest op of the channel connected to the node to OP_WRITE (writable),
// then builds the request body and places it in a ByteBuffer
private void sendProduceRequest(long now, int destination, short acks, int timeout, List<ProducerBatch> batches) {
    if (batches.isEmpty())
        return;
    Map<TopicPartition, MemoryRecords> produceRecordsByPartition = new HashMap<>(batches.size());
    final Map<TopicPartition, ProducerBatch> recordsByPartition = new HashMap<>(batches.size());
    //
    ProduceRequest.Builder requestBuilder = ProduceRequest.Builder.forMagic(minUsedMagic, acks, timeout,
            produceRecordsByPartition, transactionalId);
    RequestCompletionHandler callback = new RequestCompletionHandler() {
        public void onComplete(ClientResponse response) {
            handleProduceResponse(response, recordsByPartition, time.milliseconds());
        }
    };
    String nodeId = Integer.toString(destination);
    ClientRequest clientRequest = client.newClientRequest(nodeId, requestBuilder, now, acks != 0,
            requestTimeoutMs, callback);
    client.send(clientRequest, now);
}
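The sendProducerData method thus establishes the connections to the brokers, registers them with the selector, sets the interest op to OP_WRITE, and prepares the request data (the messages waiting to be sent). Note that at this point the messages have not actually been sent. The actual sending happens in the client.poll call inside runOnce: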
public List<ClientResponse> poll(long timeout, long now) {
    ensureActive();
    // omitted .....
    long metadataTimeout = metadataUpdater.maybeUpdate(now);
    try {
        // Underneath, this calls the selector's select method to find the channels with IO events ready,
        // and handles each one according to its SelectionKey state: isConnectable, isReadable, isWritable
        this.selector.poll(Utils.min(timeout, metadataTimeout, defaultRequestTimeoutMs));
    } catch (IOException e) {
        log.error("Unexpected error during I/O", e);
    }
    // process completed actions
    long updatedNow = this.time.milliseconds();
    List<ClientResponse> responses = new ArrayList<>();
    handleCompletedSends(responses, updatedNow);
    handleCompletedReceives(responses, updatedNow);
    handleDisconnections(responses, updatedNow);
    handleConnections();
    handleInitiateApiVersionRequests(updatedNow);
    handleTimedOutRequests(responses, updatedNow);
    completeResponses(responses);
    return responses;
}
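The split between "stage the data and register OP_WRITE" and "write when the selector reports the channel writable" can be reproduced with plain java.nio. The sketch below is an assumption-laden illustration, not Kafka's Selector: QueuedWriteSketch is a hypothetical class, and it uses an in-process java.nio.channels.Pipe in place of a broker socket so that it runs without a network.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

public class QueuedWriteSketch {

    static String roundTrip(String message) {
        try {
            Pipe pipe = Pipe.open();
            try (Selector selector = Selector.open();
                 Pipe.SinkChannel sink = pipe.sink();
                 Pipe.SourceChannel source = pipe.source()) {
                sink.configureBlocking(false);

                // Step 1 (the "send" side): stage the bytes and declare interest
                // in OP_WRITE. Nothing has been written to the channel yet.
                ByteBuffer pending = ByteBuffer.wrap(message.getBytes(StandardCharsets.UTF_8));
                SelectionKey key = sink.register(selector, SelectionKey.OP_WRITE, pending);

                // Step 2 (the "poll" side): select, then write on writable keys.
                while (pending.hasRemaining()) {
                    selector.select(1000);
                    Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                    while (it.hasNext()) {
                        SelectionKey k = it.next();
                        it.remove();
                        if (k.isWritable()) {
                            ((Pipe.SinkChannel) k.channel()).write((ByteBuffer) k.attachment());
                        }
                    }
                }
                // All bytes written: stop listening for OP_WRITE.
                key.interestOps(0);

                // Read the bytes back from the other end of the pipe.
                ByteBuffer in = ByteBuffer.allocate(pending.capacity());
                while (in.hasRemaining()) {
                    if (source.read(in) < 0) {
                        break;
                    }
                }
                in.flip();
                return StandardCharsets.UTF_8.decode(in).toString();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Clearing the interest set after the last byte matters: a channel is writable most of the time, so leaving OP_WRITE registered would make every select call return immediately.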
The following two methods are the underlying NIO implementation of connecting to a broker node:
private void initiateConnect(Node node, long now) {
    String nodeConnectionId = node.idString();
    try {
        connectionStates.connecting(nodeConnectionId, now, node.host(), clientDnsLookup);
        InetAddress address = connectionStates.currentAddress(nodeConnectionId);
        log.debug("Initiating connection to node {} using address {}", node, address);
        selector.connect(nodeConnectionId,
                new InetSocketAddress(address, node.port()),
                this.socketSendBuffer,
                this.socketReceiveBuffer);
    } catch (IOException e) {
        // omitted
    }
}
public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException {
    ensureNotRegistered(id);
    SocketChannel socketChannel = SocketChannel.open();
    SelectionKey key = null;
    try {
        // Put the channel in non-blocking mode and set its send and receive buffer sizes
        configureSocketChannel(socketChannel, sendBufferSize, receiveBufferSize);
        // Initiate the connection
        boolean connected = doConnect(socketChannel, address);
        // Register the channel with the selector
        key = registerChannel(id, socketChannel, SelectionKey.OP_CONNECT);
        if (connected) {
            immediatelyConnectedKeys.add(key);
            // The connection completed immediately, so clear the channel's interest ops
            key.interestOps(0);
        }
    } catch (IOException | RuntimeException e) {
        if (key != null)
            immediatelyConnectedKeys.remove(key);
        channels.remove(id);
        socketChannel.close();
        throw e;
    }
}
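The same connect sequence (non-blocking mode, connect, register for OP_CONNECT, clear the interest set once connected) can be exercised with plain java.nio against a loopback listener. This is a hedged sketch, not Kafka's Selector: NonBlockingConnectSketch is a hypothetical class, the local ServerSocketChannel stands in for a broker, and the retry bound of 10 is arbitrary.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

public class NonBlockingConnectSketch {

    static boolean connectToLocalServer() {
        try (ServerSocketChannel server = ServerSocketChannel.open();
             Selector selector = Selector.open();
             SocketChannel channel = SocketChannel.open()) {
            // Stand-in for a broker: listen on an ephemeral loopback port.
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));

            // Non-blocking mode: connect() may return before the handshake completes.
            channel.configureBlocking(false);
            boolean connected = channel.connect(server.getLocalAddress());
            SelectionKey key = channel.register(selector, SelectionKey.OP_CONNECT);
            if (connected) {
                // Connected immediately: no OP_CONNECT event will follow.
                key.interestOps(0);
                return true;
            }

            // Otherwise wait for the selector to report the key connectable.
            for (int attempt = 0; attempt < 10 && !channel.isConnected(); attempt++) {
                selector.select(1000);
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey k = it.next();
                    it.remove();
                    if (k.isConnectable() && channel.finishConnect()) {
                        k.interestOps(0);
                    }
                }
            }
            return channel.isConnected();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

On loopback the connect call often completes immediately, which is the case the immediatelyConnectedKeys set in the Kafka code exists to handle: with the interest set cleared, no OP_CONNECT event would ever be delivered for that key.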
To summarize: Kafka's sender thread uses NIO underneath to connect to the broker and exchange data with it, which involves constructing the ByteBuffer for outgoing messages, handling the response data from the broker, and so on. This article only outlines the overall flow and does not go into the details.