In the previous post, we gave a high-level description of message queues from the angle of usage patterns and strategy. Starting with this post, we go inside the source code and look closely at how Kafka actually implements a distributed message queue. Our analysis begins with the Producer side.
Starting with Kafka 0.8.2, a new set of Java client APIs, KafkaProducer/KafkaConsumer, was released to replace the earlier Scala APIs. This series analyzes only these Java APIs.
多線程異步發(fā)送模型
下圖是經(jīng)過源碼分析之后增淹,整理出來的Producer端的架構(gòu)圖:
在上一篇我們講過,Producer有同步發(fā)送和異步發(fā)送2種策略乌企。在以前的Kafka client api實(shí)現(xiàn)中虑润,同步和異步是分開實(shí)現(xiàn)的。而在0.9中加酵,同步發(fā)送其實(shí)是通過異步發(fā)送間接實(shí)現(xiàn)拳喻,其接口如下:
public class KafkaProducer<K, V> implements Producer<K, V> {
...
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) //the asynchronous send interface
{
...
}
}
要實(shí)現(xiàn)同步發(fā)送,只要在拿到返回的Future對(duì)象之后猪腕,直接調(diào)用get()就可以了冗澈。
The basic idea
As the diagram shows, the basic idea of asynchronous sending is: when send() is called, the KafkaProducer puts the message into a local message queue, the RecordAccumulator, and a background thread, the Sender, loops continuously and pushes the messages to the Kafka cluster.
For this to work there is one precondition: both the KafkaProducer and the Sender need the cluster's configuration information, the Metadata. The Metadata is, as described in the previous post, the mapping from Topic/Partition to brokers: for every partition of every topic, we need to know its broker list, which broker is the leader, and which are the followers.
2個(gè)數(shù)據(jù)流
所以在上圖中偿渡,有2個(gè)數(shù)據(jù)流:
Metadata流(A1,A2,A3):Sender從集群獲取信息臼寄,然后更新Metadata; KafkaProducer先讀取Metadata溜宽,然后把消息放入隊(duì)列。
消息流(B1, B2, B3):這個(gè)很好理解质帅,不再詳述适揉。
本篇著重講述Metadata流留攒,消息流,將在后續(xù)詳細(xì)講述嫉嘀。
Thread safety of Metadata
As the diagram shows, Metadata is read by multiple producer threads and updated by a single Sender thread, so it must be thread-safe.
Kafka's official documentation also states that KafkaProducer is thread-safe and can be called from multiple threads:
The producer is thread safe and sharing a single producer instance across threads will generally be faster than having multiple instances.
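A minimal sketch of that usage (illustrative only; the thread count and topic name are made up, and producer is assumed to be a single already-constructed KafkaProducer<String, String>): several application threads share one producer, while the lone Sender thread drains the shared RecordAccumulator in the background.
//hypothetical sketch: many application threads sharing one KafkaProducer instance
ExecutorService pool = Executors.newFixedThreadPool(4);
for (int i = 0; i < 4; i++) {
    final int id = i;
    pool.submit(new Runnable() {
        public void run() {
            //send() only appends to the RecordAccumulator, so calling it concurrently is safe
            producer.send(new ProducerRecord<String, String>("demo-topic", "message from thread " + id));
        }
    });
}
pool.shutdown();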
The code below also shows that all of Metadata's public methods are synchronized:
public final class Metadata {
。彩匕。腔剂。
public synchronized Cluster fetch() {
return this.cluster;
}
public synchronized long timeToNextUpdate(long nowMs) {
。驼仪。掸犬。
}
public synchronized int requestUpdate() {
。绪爸。湾碎。
}
。奠货。胜茧。
}
Metadata的數(shù)據(jù)結(jié)構(gòu)
The code below lists Metadata's main data structures: a Cluster object plus a set of state variables. The former records the cluster's configuration; the latter controls the Metadata update strategy.
public final class Metadata {
...
private final long refreshBackoffMs; //backoff before the next attempt after a failed update (this field plays only a minor role in the code)
private final long metadataExpireMs; //key value: how often to refresh; defaults to 300 * 1000 ms, i.e. every 5 minutes
private int version; //incremented by 1 on every successful update; mainly used as the loop condition while wait()-ing
private long lastRefreshMs; //time of the last update attempt (including failed ones)
private long lastSuccessfulRefreshMs; //time of the last successful update (equal to lastRefreshMs if every update succeeds; otherwise lastSuccessfulRefreshMs < lastRefreshMs)
private Cluster cluster; //the cluster configuration/topology
private boolean needUpdate; //whether a forced refresh has been requested
见咒、
...
}
public final class Cluster {
...
private final List<Node> nodes; //a Node is a broker
private final Map<TopicPartition, PartitionInfo> partitionsByTopicPartition; //the mapping from Topic/Partition to its broker list
private final Map<String, List<PartitionInfo>> partitionsByTopic;
private final Map<String, List<PartitionInfo>> availablePartitionsByTopic;
private final Map<Integer, List<PartitionInfo>> partitionsByNode;
private final Map<Integer, Node> nodesById;
}
public class PartitionInfo {
private final String topic;
private final int partition;
private final Node leader;
private final Node[] replicas;
private final Node[] inSyncReplicas;
}
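To make the mapping concrete, here is a small sketch of reading it back out (illustrative only: the topic name is made up, and the accessor names partitionsForTopic/leader/partition/replicas follow the structures listed above):
//hypothetical sketch: reading the Topic/Partition -> broker mapping from a Cluster snapshot
Cluster cluster = metadata.fetch(); //thread-safe snapshot, see the synchronized fetch() shown earlier
for (PartitionInfo p : cluster.partitionsForTopic("demo-topic")) {
    Node leader = p.leader(); //the broker that serves produce requests for this partition
    System.out.println("partition " + p.partition()
            + ", leader=" + (leader == null ? "unknown" : leader.host() + ":" + leader.port())
            + ", replicas=" + p.replicas().length);
}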
How the producer reads Metadata
Below is the source of the send function. Before sending, it first reads the metadata; if the metadata cannot be obtained, the call blocks until it times out and throws a TimeoutException.
//KafkaProducer
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
try {
long waitedOnMetadataMs = waitOnMetadata(record.topic(), this.maxBlockTimeMs); //blocks here until the topic's metadata is available, or throws on timeout
... //metadata obtained: continue with the actual send logic
} catch (Exception e) {
...
}
}
//KafkaProducer
private long waitOnMetadata(String topic, long maxWaitMs) throws InterruptedException {
if (!this.metadata.containsTopic(topic))
this.metadata.add(topic);
if (metadata.fetch().partitionsForTopic(topic) != null)
return 0; //topic metadata already available: return immediately
long begin = time.milliseconds();
long remainingWaitMs = maxWaitMs;
while (metadata.fetch().partitionsForTopic(topic) == null) { //no metadata for this topic yet: keep looping and waiting until it arrives or the call times out with a TimeoutException
log.trace("Requesting metadata update for topic {}.", topic);
int version = metadata.requestUpdate(); //sets needUpdate to true and returns the current version
sender.wakeup(); //wake up the sender
metadata.awaitUpdate(version, remainingWaitMs); //the key Metadata function
long elapsed = time.milliseconds() - begin;
if (elapsed >= maxWaitMs)
throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
if (metadata.fetch().unauthorizedTopics().contains(topic))
throw new TopicAuthorizationException(topic);
remainingWaitMs = maxWaitMs - elapsed;
}
return time.milliseconds() - begin;
}
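The maxBlockTimeMs passed to waitOnMetadata above comes from the producer configuration; in this client version it should be the max.block.ms setting (e.g. props.put("max.block.ms", "3000")). A hedged usage sketch follows (topic name made up; producer is assumed to be configured that way; depending on the exact client version the TimeoutException may be thrown directly by send() or surface from the returned Future):
//hypothetical sketch: producer is assumed to have been created with max.block.ms=3000
try {
    producer.send(new ProducerRecord<String, String>("not-yet-known-topic", "v")).get();
} catch (Exception e) {
    //if the topic's metadata cannot be fetched within 3 seconds, a TimeoutException shows up here,
    //either thrown directly by send() or wrapped in the ExecutionException thrown by get()
    e.printStackTrace();
}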
//Metadata
public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs) throws InterruptedException {
if (maxWaitMs < 0) {
throw new IllegalArgumentException("Max time to wait for metadata updates should not be < 0 milli seconds");
}
long begin = System.currentTimeMillis();
long remainingWaitMs = maxWaitMs;
while (this.version <= lastVersion) { //the Sender bumps version by 1 after each successful metadata update; until then, keep looping and waiting
if (remainingWaitMs != 0)
wait(remainingWaitMs); //plain Object.wait(), used together with the synchronized method
long elapsed = System.currentTimeMillis() - begin;
if (elapsed >= maxWaitMs) //the wait has exceeded the maximum allowed time
throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
remainingWaitMs = maxWaitMs - elapsed;
}
}
Summary: as the code above shows, the producer waits on metadata under two conditions:
(1) while (metadata.fetch().partitionsForTopic(topic) == null)
(2) while (this.version <= lastVersion)
Where there is a wait there must be a notify; the notify is issued when the Sender updates the Metadata. The sketch below distills the pattern.
Sender的創(chuàng)建
下面是KafkaProducer的構(gòu)造函數(shù),從代碼可以看出跌造,Sender就是KafkaProducer中創(chuàng)建的一個(gè)Thread.
private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
try {
...
this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG)); //construct the metadata
this.metadata.update(Cluster.bootstrap(addresses), time.milliseconds()); //seed the metadata with the initial, configured node list
ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config.values());
NetworkClient client = new NetworkClient(
new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), this.metrics, time, "producer", metricTags, channelBuilder),
this.metadata,
clientId,
config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION),
config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
config.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
...); //remaining NetworkClient constructor arguments omitted
this.sender = new Sender(client, //construct a Sender; Sender itself implements Runnable
this.metadata,
this.accumulator,
config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
(short) parseAcks(config.getString(ProducerConfig.ACKS_CONFIG)),
config.getInt(ProducerConfig.RETRIES_CONFIG),
this.metrics,
new SystemTime(),
clientId,
this.requestTimeoutMs);
String ioThreadName = "kafka-producer-network-thread" + (clientId.length() > 0 ? " | " + clientId : "");
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
this.ioThread.start(); //start the Sender on its own I/O thread
...
}
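In other words, merely constructing a KafkaProducer starts this background I/O thread. A minimal construction sketch (broker addresses and client id are placeholders); the bootstrap.servers list is what ends up in Cluster.bootstrap(addresses) above:
//hypothetical sketch: constructing the producer starts the "kafka-producer-network-thread" (the Sender)
Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092"); //initial node list fed into Cluster.bootstrap()
props.put("client.id", "demo-producer");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
//...producer.send(...) can now be called from any thread...
producer.close(); //waits for in-flight sends to complete, then stops the Sender thread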
Updating Metadata from Sender's poll()
public void run() {
// main loop, runs until close is called
while (running) {
try {
run(time.milliseconds());
} catch (Exception e) {
log.error("Uncaught error in kafka producer I/O thread: ", e);
}
}
。违施。互纯。
}
public void run(long now) {
Cluster cluster = metadata.fetch();
。磕蒲。留潦。
RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now); //遍歷消息隊(duì)列中所有的消息,找出對(duì)應(yīng)的辣往,已經(jīng)ready的Node
if (result.unknownLeadersExist) //如果一個(gè)ready的node都沒有兔院,請(qǐng)求更新metadata
this.metadata.requestUpdate();
。站削。坊萝。
//client的2個(gè)關(guān)鍵函數(shù),一個(gè)發(fā)送ClientRequest,一個(gè)接收ClientResponse屹堰。底層調(diào)用的是NIO的poll肛冶。關(guān)于nio, 后面會(huì)詳細(xì)介紹
for (ClientRequest request : requests)
client.send(request, now);
this.client.poll(pollTimeout, now);
}
//NetworkClient
public List<ClientResponse> poll(long timeout, long now) {
long metadataTimeout = metadataUpdater.maybeUpdate(now); //key point: on every poll, check whether the metadata needs updating
try {
this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
} catch (IOException e) {
log.error("Unexpected error during I/O", e);
}
// process completed actions
long updatedNow = this.time.milliseconds();
List<ClientResponse> responses = new ArrayList<>();
handleCompletedSends(responses, updatedNow);
handleCompletedReceives(responses, updatedNow); //the handler invoked here is where the metadata update gets processed
handleDisconnections(responses, updatedNow);
handleConnections();
handleTimedOutRequests(responses, updatedNow);
// invoke callbacks
for (ClientResponse response : responses) {
if (response.request().hasCallback()) {
try {
response.request().callback().onComplete(response);
} catch (Exception e) {
log.error("Uncaught error in request completion:", e);
}
}
}
return responses;
}
//DefaultMetadataUpdater
@Override
public long maybeUpdate(long now) {
// should we update our metadata?
long timeToNextMetadataUpdate = metadata.timeToNextUpdate(now);
long timeToNextReconnectAttempt = Math.max(this.lastNoNodeAvailableMs + metadata.refreshBackoff() - now, 0);
long waitForMetadataFetch = this.metadataFetchInProgress ? Integer.MAX_VALUE : 0;
// if there is no node available to connect, back off refreshing metadata
long metadataTimeout = Math.max(Math.max(timeToNextMetadataUpdate, timeToNextReconnectAttempt),
waitForMetadataFetch);
if (metadataTimeout == 0) {
// highly dependent on the behavior of leastLoadedNode.
Node node = leastLoadedNode(now); //pick the least loaded Node
maybeUpdate(now, node); //send the metadata update request to that Node
}
return metadataTimeout;
}
private void maybeUpdate(long now, Node node) {
if (node == null) {
log.debug("Give up sending metadata request since no node is available");
// mark the timestamp for no node available to connect
this.lastNoNodeAvailableMs = now;
return;
}
String nodeConnectionId = node.idString();
if (canSendRequest(nodeConnectionId)) {
Set<String> topics = metadata.needMetadataForAllTopics() ? new HashSet<String>() : metadata.topics();
this.metadataFetchInProgress = true;
ClientRequest metadataRequest = request(now, nodeConnectionId, topics); //key point: the request that will update the Metadata
log.debug("Sending metadata request {} to node {}", metadataRequest, node.id());
doSend(metadataRequest, now); //sent asynchronously; the response is handled in handleCompletedReceives above
} else if (connectionStates.canConnect(nodeConnectionId, now)) {
log.debug("Initialize connection to node {} for sending metadata request", node.id());
initiateConnect(node, now);
} else { // connected, but can't send more OR connecting
this.lastNoNodeAvailableMs = now;
}
}
private void handleCompletedReceives(List<ClientResponse> responses, long now) {
for (NetworkReceive receive : this.selector.completedReceives()) {
String source = receive.source();
ClientRequest req = inFlightRequests.completeNext(source);
ResponseHeader header = ResponseHeader.parse(receive.payload());
// Always expect the response version id to be the same as the request version id
short apiKey = req.request().header().apiKey();
short apiVer = req.request().header().apiVersion();
Struct body = (Struct) ProtoUtils.responseSchema(apiKey, apiVer).read(receive.payload());
correlate(req.request().header(), header);
if (!metadataUpdater.maybeHandleCompletedReceive(req, now, body))
responses.add(new ClientResponse(req, now, false, body));
}
}
@Override
public boolean maybeHandleCompletedReceive(ClientRequest req, long now, Struct body) {
short apiKey = req.request().header().apiKey();
if (apiKey == ApiKeys.METADATA.id && req.isInitiatedByNetworkClient()) {
handleResponse(req.request().header(), body, now);
return true;
}
return false;
}
//關(guān)鍵函數(shù)
private void handleResponse(RequestHeader header, Struct body, long now) {
this.metadataFetchInProgress = false;
MetadataResponse response = new MetadataResponse(body);
Cluster cluster = response.cluster(); //extract a brand-new Cluster object from the response
if (response.errors().size() > 0) {
log.warn("Error while fetching metadata with correlation id {} : {}", header.correlationId(), response.errors());
}
if (cluster.nodes().size() > 0) {
this.metadata.update(cluster, now); //update the metadata: the new cluster replaces the old one
} else {
log.trace("Ignoring empty metadata response with correlation id {}.", header.correlationId());
this.metadata.failedUpdate(now); //the metadata update failed: run the failure handling logic
}
}
//on a successful update: version + 1, and the other fields are refreshed as well
public synchronized void update(Cluster cluster, long now) {
this.needUpdate = false;
this.lastRefreshMs = now;
this.lastSuccessfulRefreshMs = now;
this.version += 1;
for (Listener listener: listeners)
listener.onMetadataUpdate(cluster); //notify anyone listening for metadata updates
this.cluster = this.needMetadataForAllTopics ? getClusterForCurrentTopics(cluster) : cluster; //the new cluster replaces the old one
notifyAll(); //wake up all blocked producer threads
log.debug("Updated cluster metadata version {} to {}", this.version, this.cluster);
}
//on a failed update, only lastRefreshMs is updated
public synchronized void failedUpdate(long now) {
this.lastRefreshMs = now;
}
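The listeners loop inside update() means other components can observe each new Cluster snapshot. As a hedged illustration (Metadata and its Listener are internal client classes, not public API; the addListener call is my assumption based on the listeners field used above), a listener that simply logs every update might look like this:
//hypothetical sketch: observing metadata updates via the internal Listener hook
metadata.addListener(new Metadata.Listener() { //addListener is assumed to exist alongside the listeners field
    @Override
    public void onMetadataUpdate(Cluster cluster) {
        System.out.println("metadata updated, " + cluster.nodes().size() + " brokers known");
    }
});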
As the code above shows, Metadata is updated inside the Sender's while loop, on every call to client.poll(). There are two update mechanisms, described below.
The two Metadata update mechanisms
(1) Periodic update: refresh once every so often, driven by Metadata's lastRefreshMs and lastSuccessfulRefreshMs fields.
The corresponding ProducerConfig setting is:
metadata.max.age.ms //default 300000 ms, i.e. once every 5 minutes
(2) Staleness detection, forced update: once the metadata is judged to be stale, metadata.requestUpdate() is called to force an update. requestUpdate() itself does almost nothing: it simply sets needUpdate to true.
Every poll checks both mechanisms, and an update is triggered as soon as either condition is met. A hedged sketch of the combined check follows.
So how is Metadata judged to be stale? That logic is scattered across the code; quite a few places can mark the metadata as stale.
Detecting stale Metadata
Condition 1: when initiating a connection (initiateConnect)
private void initiateConnect(Node node, long now) {
String nodeConnectionId = node.idString();
try {
log.debug("Initiating connection to node {} at {}:{}.", node.id(), node.host(), node.port());
this.connectionStates.connecting(nodeConnectionId, now);
selector.connect(nodeConnectionId,
new InetSocketAddress(node.host(), node.port()),
this.socketSendBuffer,
this.socketReceiveBuffer);
} catch (IOException e) {
connectionStates.disconnected(nodeConnectionId, now);
metadataUpdater.requestUpdate(); //mark the metadata as stale
log.debug("Error connecting to node {} at {}:{}:", node.id(), node.host(), node.port(), e);
}
}
Condition 2: a connection drops during I/O inside poll
private void handleDisconnections(List<ClientResponse> responses, long now) {
for (String node : this.selector.disconnected()) {
log.debug("Node {} disconnected.", node);
processDisconnection(responses, node, now);
}
if (this.selector.disconnected().size() > 0)
metadataUpdater.requestUpdate(); //mark the metadata as stale
}
Condition 3: a request times out
private void handleTimedOutRequests(List<ClientResponse> responses, long now) {
List<String> nodeIds = this.inFlightRequests.getNodesWithTimedOutRequests(now, this.requestTimeoutMs);
for (String nodeId : nodeIds) {
this.selector.close(nodeId);
log.debug("Disconnecting from node {} due to request timeout.", nodeId);
processDisconnection(responses, nodeId, now);
}
if (nodeIds.size() > 0)
metadataUpdater.requestUpdate(); //mark the metadata as stale
}
Condition 4: when sending, some partition's leader cannot be found
public void run(long now) {
Cluster cluster = metadata.fetch();
RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
if (result.unknownLeadersExist)
this.metadata.requestUpdate();
Condition 5: the returned response does not match up with the request
private void handleProduceResponse(ClientResponse response, Map<TopicPartition, RecordBatch> batches, long now) {
int correlationId = response.request().request().header().correlationId();
if (response.wasDisconnected()) {
log.trace("Cancelled request {} due to node {} being disconnected", response, response.request()
.request()
.destination());
for (RecordBatch batch : batches.values())
completeBatch(batch, Errors.NETWORK_EXCEPTION, -1L, correlationId, now);
In short: whenever any kind of exception or data inconsistency occurs, the metadata is assumed to be potentially out of date and an update is requested.
Other aspects of Metadata updates
Beyond the above, Metadata updates have a few more notable characteristics:
1.更新請(qǐng)求MetadataRequest是nio異步發(fā)送的,在poll的返回中,處理MetadataResponse的時(shí)候刚操,才真正更新Metadata闸翅。
這里有個(gè)關(guān)鍵點(diǎn):Metadata的cluster對(duì)象,每次是整個(gè)覆蓋的菊霜,而不是局部更新坚冀。所以cluster內(nèi)部不用加鎖。
2.更新的時(shí)候鉴逞,是從metadata保存的所有Node记某,或者說Broker中,選負(fù)載最小的那個(gè)构捡,也就是當(dāng)前接收請(qǐng)求最少的那個(gè)液南。向其發(fā)送MetadataRequest請(qǐng)求,獲取新的Cluster對(duì)象勾徽。