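In the previous article we analyzed how Java NIO works and how to use it. This article goes one step further and analyzes how the Kafka client builds its own network layer on top of NIO.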
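Layered architecture of the network layer
The figure below shows the layering from the top-level KafkaProducer down to the lowest level, Java NIO:
In the figure, light-purple boxes are interfaces or abstract classes, and white boxes are concrete implementations.
The architecture as a whole reflects the principle of "programming to interfaces": starting from Java NIO at the bottom, every layer is exposed upward through interfaces, and each of the 3 layers above it also defines its own interfaces and exposes them to the layer above.
The interfaces (including KafkaClient, Selectable and ChannelBuilder) are all instantiated in the constructor of the outermost container class, KafkaProducer, which therefore acts as a "factory" that assembles all of these lower-level components.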
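To make the "programming to interfaces" idea concrete, here is a minimal sketch of the layering. SketchChannelBuilder, SketchSelectable and SketchClient are hypothetical, heavily simplified stand-ins named after Kafka's ChannelBuilder, Selectable and KafkaClient; their methods are invented for illustration and are not Kafka's real signatures. The only point is that each layer holds a reference to the interface below it, and the outermost class (playing KafkaProducer's role) is the one place that names concrete classes.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, heavily simplified stand-ins for Kafka's ChannelBuilder, Selectable
// and KafkaClient. Names and signatures are invented for this sketch.
interface SketchChannelBuilder {
    Object buildChannel(String nodeId);
}

interface SketchSelectable {
    void connect(String nodeId);
    boolean isConnected(String nodeId);
}

interface SketchClient {
    boolean ready(String nodeId);
}

class SketchPlaintextBuilder implements SketchChannelBuilder {
    public Object buildChannel(String nodeId) { return "plaintext channel to " + nodeId; }
}

// Depends only on the ChannelBuilder interface, never on a concrete builder.
class SketchSelector implements SketchSelectable {
    private final SketchChannelBuilder builder;
    private final Map<String, Object> channels = new HashMap<>(); // the connection pool
    SketchSelector(SketchChannelBuilder builder) { this.builder = builder; }
    public void connect(String nodeId) { channels.put(nodeId, builder.buildChannel(nodeId)); }
    public boolean isConnected(String nodeId) { return channels.containsKey(nodeId); }
}

// Depends only on the Selectable interface.
class SketchNetworkClient implements SketchClient {
    private final SketchSelectable selector;
    SketchNetworkClient(SketchSelectable selector) { this.selector = selector; }
    public boolean ready(String nodeId) {
        if (selector.isConnected(nodeId)) return true;
        selector.connect(nodeId);   // start connecting, but report "not ready" for now
        return false;
    }
}

// The outermost class is the only place that names concrete implementations:
// it acts as the "factory" that assembles the layers.
class SketchProducer {
    final SketchClient client;
    SketchProducer() {
        SketchChannelBuilder builder = new SketchPlaintextBuilder();
        SketchSelectable selector = new SketchSelector(builder);
        this.client = new SketchNetworkClient(selector);
    }
}
```

Swapping in a different channel builder (for example one for encrypted connections) would then mean changing a single line in the SketchProducer constructor, without touching the selector or client classes.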
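Mapping between network-layer components and NIO components
The figure above also shows that:
KafkaChannel is essentially a wrapper around SocketChannel, with one extra layer of indirection in between, TransportLayer, which abstracts over plaintext and encrypted channels;
Send/NetworkReceive are wrappers around ByteBuffer, each representing the data packet of one request (or response);
Kafka's Selector wraps the NIO Selector and holds an NIO Selector object internally.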
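The TransportLayer indirection can be sketched in the same spirit. In this hypothetical example, SketchTransportLayer is an invented interface with only ready() and write(). PlaintextLayerSketch passes writes straight through to a WritableByteChannel, and HandshakeGatedLayer merely models "not ready until a handshake has finished" (it performs no real encryption). ChannelSketch, the KafkaChannel stand-in, works unchanged over either one.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;

// Hypothetical, simplified version of the TransportLayer indirection.
interface SketchTransportLayer {
    boolean ready();                                  // has the handshake (if any) finished?
    int write(ByteBuffer src) throws IOException;
}

// Plaintext: nothing to negotiate, so always ready; writes go straight through.
class PlaintextLayerSketch implements SketchTransportLayer {
    private final WritableByteChannel socket;
    PlaintextLayerSketch(WritableByteChannel socket) { this.socket = socket; }
    public boolean ready() { return true; }
    public int write(ByteBuffer src) throws IOException { return socket.write(src); }
}

// Stand-in for an encrypted layer: only models "not ready until the handshake is done".
// No real TLS or encryption happens here.
class HandshakeGatedLayer implements SketchTransportLayer {
    private final WritableByteChannel socket;
    private boolean handshakeDone = false;
    HandshakeGatedLayer(WritableByteChannel socket) { this.socket = socket; }
    void finishHandshake() { handshakeDone = true; }
    public boolean ready() { return handshakeDone; }
    public int write(ByteBuffer src) throws IOException {
        if (!handshakeDone) throw new IllegalStateException("handshake not finished");
        return socket.write(src);
    }
}

// The channel talks only to the interface, so it works over either layer.
class ChannelSketch {
    private final String id;
    private final SketchTransportLayer transportLayer;
    ChannelSketch(String id, SketchTransportLayer transportLayer) {
        this.id = id;
        this.transportLayer = transportLayer;
    }
    String id() { return id; }
    boolean ready() { return transportLayer.ready(); }
    int write(ByteBuffer src) throws IOException { return transportLayer.write(src); }
}

class TransportDemo {
    // Writes a message through a plaintext-backed channel; returns what reached the "socket".
    static String writePlaintext(String msg) {
        ByteArrayOutputStream wire = new ByteArrayOutputStream();
        ChannelSketch ch = new ChannelSketch("node-1", new PlaintextLayerSketch(Channels.newChannel(wire)));
        try {
            ByteBuffer buf = ByteBuffer.wrap(msg.getBytes(StandardCharsets.UTF_8));
            while (buf.hasRemaining()) ch.write(buf);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new String(wire.toByteArray(), StandardCharsets.UTF_8);
    }
}
```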
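How the Kafka Selector is implemented
1. As the figure above shows, the Selector contains a Map internally; in other words, it maintains a pool of all connections. These KafkaChannels are all created through the ChannelBuilder interface.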
private final Map<String, KafkaChannel> channels;
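2. All I/O operations (connect, read, write) are actually performed inside a single function, poll. What does that mean?
NetworkClient's send() calls selector.send(Send send), but at that point the data has not actually been sent; it is only staged in the corresponding channel inside the selector. Here is the code: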
//Selector
public void send(Send send) {
KafkaChannel channel = channelOrFail(send.destination()); // find the connection (channel) this packet is addressed to
try {
channel.setSend(send); // stage it in this connection (channel)
} catch (CancelledKeyException e) {
this.failedSends.add(send.destination());
close(channel);
}
}
//KafkaChannel
public void setSend(Send send) {
if (this.send != null) // key point: the next send cannot be staged until the current one has gone out!!! More on this later
throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress.");
this.send = send; // stage this packet
this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
}
public class KafkaChannel {
private final String id;
private final TransportLayer transportLayer;
private final Authenticator authenticator;
private final int maxReceiveSize;
private NetworkReceive receive;
private Send send; // key point: a channel holds only 1 packet at a time; the next one cannot be stored until the current send has been fully written out
...
}
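The staging behaviour of send() can be reproduced in a few lines. This is a simplified sketch, not Kafka's code: StagingSelector and StagingChannel are hypothetical names, packets are plain ByteBuffers, and the connection pool is a HashMap keyed by node id. It shows the two rules above: send() only stages data, and a channel refuses a second packet while one is still pending.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

// A channel holds at most one staged packet at a time.
class StagingChannel {
    private final String id;
    private ByteBuffer send;
    StagingChannel(String id) { this.id = id; }
    void setSend(ByteBuffer packet) {
        if (send != null)
            throw new IllegalStateException("Channel " + id + " still has a send in progress");
        send = packet;   // Kafka also adds OP_WRITE to the key's interest set at this point
    }
    boolean hasSend() { return send != null; }
}

// The selector keeps the connection pool; send() only stages, nothing is written yet.
class StagingSelector {
    private final Map<String, StagingChannel> channels = new HashMap<>();
    void addChannel(String id) { channels.put(id, new StagingChannel(id)); }
    void send(String destination, ByteBuffer packet) {
        StagingChannel channel = channels.get(destination);
        if (channel == null)
            throw new IllegalStateException("No open connection to " + destination);
        channel.setSend(packet);
    }
    boolean hasPendingSend(String id) {
        StagingChannel channel = channels.get(id);
        return channel != null && channel.hasSend();
    }
}
```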
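Once the data is staged in the channel, the poll function processes it. We can abstract this as an input/output model:
Input: the staged Send packets
Output: completed sends, completed receives (responses to earlier sends), newly established connections, and dropped connections.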
@Override
public void poll(long timeout) throws IOException {
if (timeout < 0)
throw new IllegalArgumentException("timeout should be >= 0");
clear(); // key point: the "outputs" are cleared before every poll
if (hasStagedReceives())
timeout = 0;
/* check ready keys */
long startSelect = time.nanoseconds();
int readyKeys = select(timeout);
long endSelect = time.nanoseconds();
currentTimeNanos = endSelect;
this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());
if (readyKeys > 0) {
Set<SelectionKey> keys = this.nioSelector.selectedKeys();
Iterator<SelectionKey> iter = keys.iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
KafkaChannel channel = channel(key);
// register all per-connection metrics at once
sensors.maybeRegisterConnectionMetrics(channel.id());
lruConnections.put(channel.id(), currentTimeNanos);
try {
/* complete any connections that have finished their handshake */
if (key.isConnectable()) {
channel.finishConnect(); // add the established connection to the output set
this.connected.add(channel.id());
this.sensors.connectionCreated.record();
}
...
if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
NetworkReceive networkReceive;
while ((networkReceive = channel.read()) != null)
addToStagedReceives(channel, networkReceive);
}
if (channel.ready() && key.isWritable()) {
Send send = channel.write();
if (send != null) {
this.completedSends.add(send); // add the completed send to the output set
this.sensors.recordBytesSent(channel.id(), send.size());
}
}
if (!key.isValid()) {
close(channel);
this.disconnected.add(channel.id()); // add the dropped connection to the output set
}
} catch (Exception e) {
String desc = channel.socketDescription();
if (e instanceof IOException)
log.debug("Connection with {} disconnected", desc, e);
else
log.warn("Unexpected error from {}; closing connection", desc, e);
close(channel);
this.disconnected.add(channel.id()); // add the dropped connection to the output set
}
}
}
addToCompletedReceives(); // add the completed receives to the output set
long endIo = time.nanoseconds();
this.sensors.ioTime.record(endIo - endSelect, time.milliseconds());
maybeCloseOldestConnection();
}
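The input/output shape of poll() can be tried out with plain java.nio. The sketch below is a hypothetical, much-reduced poll loop, not Kafka's Selector: MiniPollLoop, Endpoint and PollDemo are invented names, and a java.nio.channels.Pipe stands in for a socket so the example runs without a network. Like the code above, it clears its output lists at the start of every poll(), does all I/O inside poll(), and reports finished sends and receives through those lists.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Per-connection state attached to each SelectionKey.
class Endpoint {
    final String id;
    ByteBuffer staged;                                 // data waiting to be written, or null
    final ByteBuffer inbox = ByteBuffer.allocate(64);  // scratch buffer for reads
    Endpoint(String id, ByteBuffer staged) { this.id = id; this.staged = staged; }
}

class MiniPollLoop {
    private final Selector nioSelector;
    // The "outputs" of one poll(); cleared at the start of every poll().
    final List<String> completedSends = new ArrayList<>();
    final List<String> completedReceives = new ArrayList<>();

    MiniPollLoop() throws IOException { nioSelector = Selector.open(); }

    void register(SelectableChannel ch, int ops, Endpoint ep) throws IOException {
        ch.configureBlocking(false);
        ch.register(nioSelector, ops, ep);
    }

    void poll(long timeoutMs) throws IOException {
        completedSends.clear();
        completedReceives.clear();
        if (nioSelector.select(timeoutMs) == 0) return;
        Iterator<SelectionKey> it = nioSelector.selectedKeys().iterator();
        while (it.hasNext()) {
            SelectionKey key = it.next();
            it.remove();
            Endpoint ep = (Endpoint) key.attachment();
            if (key.isWritable() && ep.staged != null) {
                ((WritableByteChannel) key.channel()).write(ep.staged);
                if (!ep.staged.hasRemaining()) {       // fully written: report it, stop asking for OP_WRITE
                    completedSends.add(ep.id);
                    ep.staged = null;
                    key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
                }
            }
            if (key.isReadable()) {
                int n = ((ReadableByteChannel) key.channel()).read(ep.inbox);
                if (n > 0) {
                    ep.inbox.flip();
                    byte[] bytes = new byte[ep.inbox.remaining()];
                    ep.inbox.get(bytes);
                    ep.inbox.clear();
                    completedReceives.add(ep.id + ":" + new String(bytes, StandardCharsets.UTF_8));
                }
            }
        }
    }

    void close() throws IOException { nioSelector.close(); }
}

class PollDemo {
    // Returns {sends of the first poll, first receive seen, sends of that later poll}.
    static String[] run() {
        try {
            Pipe pipe = Pipe.open();
            MiniPollLoop loop = new MiniPollLoop();
            loop.register(pipe.sink(), SelectionKey.OP_WRITE,
                    new Endpoint("w", ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8))));
            loop.register(pipe.source(), SelectionKey.OP_READ, new Endpoint("r", null));
            loop.poll(100);
            String firstSends = loop.completedSends.toString();
            String received = null;
            for (int i = 0; i < 20 && received == null; i++) {
                loop.poll(100);
                if (!loop.completedReceives.isEmpty()) received = loop.completedReceives.get(0);
            }
            String laterSends = loop.completedSends.toString();
            loop.close();
            pipe.sink().close();
            pipe.source().close();
            return new String[] {firstSends, received, laterSends};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The first poll reports the write as a completed send; the data shows up as a receive on a later poll, and by then completedSends has been cleared again. This is the same "outputs belong to one poll" contract that the upper layer relies on.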
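Core principle 1: message fragmentation
Why does the code above call addToStagedReceives, and what are "staged receives"? To answer that, we have to start with how data gets fragmented:
In NetworkClient, what is passed down is a complete ClientRequest, and what enters the Selector and is staged in the channel is likewise a complete Send object (one packet). But when this Send object is handed to the underlying channel.write(ByteBuffer b), it is not necessarily sent in one go; it may take several write calls to get the whole Send out. This is because write is non-blocking: it returns without waiting for all of the data to be sent. Hence the code above: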
if (channel.ready() && key.isWritable()) {
Send send = channel.write(); // non-null means the Send was fully written and is returned; if it was not fully written, null is returned
if (send != null) {
this.completedSends.add(send);
this.sensors.recordBytesSent(channel.id(), send.size());
}
}
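Partial writes are easy to simulate. In this sketch, TrickleChannel is a made-up WritableByteChannel that accepts at most maxPerWrite bytes per call, imitating a non-blocking socket whose send buffer is nearly full. PartialSend and WriteChannel are hypothetical stand-ins for Send and KafkaChannel that follow the contract in the comment above: write() returns the Send once it has been fully written, and null otherwise.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;

// Simulates a non-blocking socket that accepts at most maxPerWrite bytes per call.
class TrickleChannel implements WritableByteChannel {
    private final int maxPerWrite;
    final ByteArrayOutputStream received = new ByteArrayOutputStream();
    TrickleChannel(int maxPerWrite) {
        if (maxPerWrite <= 0) throw new IllegalArgumentException("maxPerWrite must be > 0");
        this.maxPerWrite = maxPerWrite;
    }
    @Override public int write(ByteBuffer src) {
        int n = Math.min(maxPerWrite, src.remaining());
        byte[] chunk = new byte[n];
        src.get(chunk);
        received.write(chunk, 0, n);
        return n;
    }
    @Override public boolean isOpen() { return true; }
    @Override public void close() {}
}

// One packet; "completed" once every byte of its buffer has been written.
class PartialSend {
    final String destination;
    private final ByteBuffer buffer;
    PartialSend(String destination, ByteBuffer buffer) {
        this.destination = destination;
        this.buffer = buffer;
    }
    int writeTo(WritableByteChannel ch) throws IOException { return ch.write(buffer); }
    boolean completed() { return !buffer.hasRemaining(); }
}

// write() returns the Send once fully written, or null while bytes are still left over.
class WriteChannel {
    private PartialSend send;
    void setSend(PartialSend s) {
        if (send != null) throw new IllegalStateException("previous send still in progress");
        send = s;
    }
    PartialSend write(WritableByteChannel socket) throws IOException {
        if (send == null) return null;
        send.writeTo(socket);
        if (!send.completed()) return null;
        PartialSend done = send;
        send = null;
        return done;
    }
}

class PartialWriteDemo {
    // Number of write rounds (think: one per poll) needed to push payloadSize bytes out.
    static int roundsNeeded(int payloadSize, int maxPerWrite) {
        TrickleChannel socket = new TrickleChannel(maxPerWrite);
        WriteChannel channel = new WriteChannel();
        channel.setSend(new PartialSend("node-1", ByteBuffer.allocate(payloadSize)));
        int rounds = 0;
        try {
            PartialSend done = null;
            while (done == null) {
                rounds++;
                done = channel.write(socket);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return rounds;
    }
}
```

For example, a 10-byte packet over a channel that takes 3 bytes per call needs 4 rounds (3 + 3 + 3 + 1), so it would be reported as a completed send only on the 4th poll.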
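Likewise, on the receiving side, a response may need several channel.read(ByteBuffer b) calls before it has been fully received, so channel.read() returns a complete NetworkReceive only once a whole response is available and null otherwise. That is why the code above uses a while loop: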
if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
NetworkReceive networkReceive;
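while ((networkReceive = channel.read()) != null) // read() returns null while the current response is still incomplete (its partial data stays in the channel for a later poll), so the loop drains every response that is already complete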
addToStagedReceives(channel, networkReceive);
}
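Core principle 2: message framing
As we saw above, at the lowest level the communication on each channel consists of 2 continuous byte streams: a send stream and a receive stream.
Sending is the easy part: we know the size of the complete message before sending it;
but when receiving, how do we know where one response ends and the next one begins?
This takes a small trick: every request and response starts with a fixed-length, 4-byte header. On receive, read is called at least twice: first to read these 4 bytes, which give the size of the response, and then to read the message body.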
public class NetworkReceive implements Receive {
private final String source;
private final ByteBuffer size; // buffer for the 4-byte size header
private final int maxSize;
private ByteBuffer buffer; // buffer for the response body that follows
public NetworkReceive(String source) {
this.source = source;
this.size = ByteBuffer.allocate(4); // allocate only the 4-byte header up front
this.buffer = null;
this.maxSize = UNLIMITED;
}
}
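The size-prefixed framing, together with the partial reads discussed earlier, can be sketched as follows. SizedReceive mirrors the idea of NetworkReceive (a 4-byte size buffer first, then a body buffer allocated once the size is known) but is a simplified, hypothetical class. ArrivingSource is an invented ReadableByteChannel that only hands out bytes that have "arrived" and returns 0 otherwise, like a non-blocking socket. ReceivingChannel keeps an unfinished receive between polls and, like channel.read() in the loop shown earlier, returns null until a whole response is available.

```java
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Simulated non-blocking socket: read() returns 0 when no new bytes have arrived.
class ArrivingSource implements ReadableByteChannel {
    private final byte[] data;
    private int arrived = 0, pos = 0;
    ArrivingSource(byte[] data) { this.data = data; }
    void arrive(int n) { arrived = Math.min(data.length, arrived + n); }
    @Override public int read(ByteBuffer dst) {
        int n = Math.min(dst.remaining(), arrived - pos);
        dst.put(data, pos, n);
        pos += n;
        return n;
    }
    @Override public boolean isOpen() { return true; }
    @Override public void close() {}
}

// Size-prefixed receive: a 4-byte length header, then a body of that length.
class SizedReceive {
    private final ByteBuffer size = ByteBuffer.allocate(4);
    private ByteBuffer buffer;
    void readFrom(ReadableByteChannel ch) throws IOException {
        if (size.hasRemaining() && ch.read(size) < 0) throw new EOFException();
        if (!size.hasRemaining() && buffer == null) {
            size.rewind();
            int len = size.getInt();
            if (len < 0) throw new IOException("Invalid size " + len);
            buffer = ByteBuffer.allocate(len);
        }
        if (buffer != null && buffer.hasRemaining() && ch.read(buffer) < 0) throw new EOFException();
    }
    boolean complete() { return !size.hasRemaining() && buffer != null && !buffer.hasRemaining(); }
    String payload() { return new String(buffer.array(), StandardCharsets.UTF_8); }
}

class ReceivingChannel {
    private final ReadableByteChannel socket;
    private SizedReceive receive;   // partially read response, kept across polls
    ReceivingChannel(ReadableByteChannel socket) { this.socket = socket; }
    SizedReceive read() throws IOException {
        if (receive == null) receive = new SizedReceive();
        receive.readFrom(socket);
        if (!receive.complete()) return null;
        SizedReceive done = receive;
        receive = null;
        return done;
    }
}

class FramingDemo {
    static byte[] frame(String... messages) {
        List<byte[]> bodies = new ArrayList<>();
        int total = 0;
        for (String m : messages) {
            byte[] b = m.getBytes(StandardCharsets.UTF_8);
            bodies.add(b);
            total += 4 + b.length;
        }
        ByteBuffer out = ByteBuffer.allocate(total);
        for (byte[] b : bodies) out.putInt(b.length).put(b);   // big-endian size, then body
        return out.array();
    }

    // Delivers the bytes in two installments; returns what each "poll" completed.
    static List<List<String>> run() {
        byte[] wire = frame("ab", "cde");                      // 13 bytes on the wire
        ArrivingSource socket = new ArrivingSource(wire);
        ReceivingChannel channel = new ReceivingChannel(socket);
        List<List<String>> perPoll = new ArrayList<>();
        try {
            for (int n : new int[] {5, wire.length - 5}) {
                socket.arrive(n);
                List<String> completed = new ArrayList<>();
                SizedReceive r;
                while ((r = channel.read()) != null) completed.add(r.payload());
                perPoll.add(completed);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return perPoll;
    }
}
```

Here "ab" and "cde" are framed into 13 bytes and delivered as 5 + 8 bytes. The first poll completes nothing (only the 4-byte header and 1 byte of "ab" have arrived); the second finishes "ab" and then, in the same while loop, all of "cde".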
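Core principle 3: guaranteeing message order
InFlightRequests holds every request that has been sent but whose response has not come back yet. A request is enqueued when it is sent; when its response comes back, the corresponding request is dequeued.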
final class InFlightRequests {
private final int maxInFlightRequestsPerConnection;
private final Map<String, Deque<ClientRequest>> requests = new HashMap<String, Deque<ClientRequest>>();
}
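There is a key point here: request/response pairing is expressed with a queue, not a Map, and the two are matched simply by enqueueing and dequeueing. For that to work, the server must guarantee message order: on a given socket, if requests 0, 1, 2 are sent, the responses must come back in the order 0, 1, 2.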
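A deque is enough to pair responses with requests, as long as the server answers in order. This hypothetical sketch keeps one Deque per node, adds new requests at the head and pairs each response with the oldest request at the tail. The integer ids are an addition for illustration only; they are used here to expose what goes wrong when responses arrive out of order.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: per-node deque, newest request at the head, oldest at the tail.
class InFlightPairing {
    private final Map<String, Deque<Integer>> requests = new HashMap<>();

    void add(String node, int requestId) {
        requests.computeIfAbsent(node, k -> new ArrayDeque<>()).addFirst(requestId);
    }

    // Pairs a response with the oldest outstanding request purely by position;
    // the id comparison only exists to make misordering visible.
    int completeNext(String node, int responseId) {
        Deque<Integer> q = requests.get(node);
        if (q == null || q.isEmpty())
            throw new IllegalStateException("No request in flight to " + node);
        int expected = q.pollLast();
        if (expected != responseId)
            throw new IllegalStateException("Expected response " + expected + " but got " + responseId);
        return expected;
    }

    // Sends requests 0, 1, 2 to one node, then feeds responses in the given order.
    static List<Integer> match(int... responseOrder) {
        InFlightPairing inFlight = new InFlightPairing();
        for (int id = 0; id < 3; id++) inFlight.add("node-1", id);
        List<Integer> matched = new ArrayList<>();
        for (int id : responseOrder) matched.add(inFlight.completeNext("node-1", id));
        return matched;
    }
}
```

match(0, 1, 2) pairs every response correctly, while match(1, 0, 2) fails on the very first response: position-based pairing only works if the server preserves order.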
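But the server uses a 1 + N + M model (1 Acceptor thread, N Processor threads, M request-handler threads): all requests go into a single requestQueue and are then processed in parallel by multiple threads. How, then, does it guarantee ordering?
The answer is the mute/unmute mechanism: as soon as a request is received on a channel, that channel is muted, and it is unmuted only after the response has been sent back. This guarantees that on any single connection, only 1 request is being processed at a time.
Here is the server-side code: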
selector.completedReceives.asScala.foreach { receive =>
try {
val channel = selector.channel(receive.source)
val session = RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, channel.principal.getName),
channel.socketAddress)
val req = RequestChannel.Request(processor = id, connectionId = receive.source, session = session, buffer = receive.payload, startTimeMs = time.milliseconds, securityProtocol = protocol)
requestChannel.sendRequest(req)
} catch {
case e @ (_: InvalidRequestException | _: SchemaException) =>
// note that even though we got an exception, we can assume that receive.source is valid. Issues with constructing a valid receive object were handled earlier
error("Closing socket for " + receive.source + " because of error", e)
close(selector, receive.source)
}
selector.mute(receive.source) // request received: mute the channel it came in on
}
selector.completedSends.asScala.foreach { send =>
val resp = inflightResponses.remove(send.destination).getOrElse {
throw new IllegalStateException(s"Send for ${send.destination} completed, but not in `inflightResponses`")
}
resp.request.updateRequestMetrics()
selector.unmute(send.destination) // response sent: unmute that channel
}
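The effect of mute/unmute can be simulated without threads. In this sketch (all names hypothetical), a "processor" step reads requests from each connection's unread buffer into a shared request queue, and a "handler" step finishes the queued requests in reverse order to mimic worker threads completing out of order. With useMute set, a connection is muted as soon as one of its requests is read and unmuted once its response has been produced. In Kafka, muting a channel removes OP_READ from its interest set; the sketch approximates that by simply not reading from a muted connection.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

class MuteDemo {
    // Three requests pipelined on connection A, two on connection B.
    static Map<String, List<String>> sampleIncoming() {
        Map<String, List<String>> in = new LinkedHashMap<>();
        in.put("A", Arrays.asList("A-0", "A-1", "A-2"));
        in.put("B", Arrays.asList("B-0", "B-1"));
        return in;
    }

    // Returns, per connection, the order in which responses were sent back.
    static Map<String, List<String>> run(Map<String, List<String>> incoming, boolean useMute) {
        Map<String, Deque<String>> socketBuffers = new TreeMap<>();   // unread requests per connection
        incoming.forEach((conn, reqs) -> socketBuffers.put(conn, new ArrayDeque<>(reqs)));
        Set<String> muted = new HashSet<>();
        Map<String, List<String>> responses = new TreeMap<>();
        List<String[]> requestQueue = new ArrayList<>();              // {connectionId, request}
        while (true) {
            // Processor: read from every connection that is not muted.
            for (Map.Entry<String, Deque<String>> e : socketBuffers.entrySet()) {
                String conn = e.getKey();
                Deque<String> unread = e.getValue();
                while (!unread.isEmpty() && !muted.contains(conn)) {
                    requestQueue.add(new String[] {conn, unread.pollFirst()});
                    if (useMute) muted.add(conn);                   // stop reading this connection
                }
            }
            if (requestQueue.isEmpty()) break;
            // Handlers: finish queued requests in reverse order (out-of-order completion).
            Collections.reverse(requestQueue);
            for (String[] req : requestQueue) {
                responses.computeIfAbsent(req[0], k -> new ArrayList<>()).add(req[1]);
                muted.remove(req[0]);                               // response sent: unmute
            }
            requestQueue.clear();
        }
        return responses;
    }
}
```

With mute, connection A's responses come back as A-0, A-1, A-2 even though the handler step runs in reverse; without mute, all three are read at once and come back as A-2, A-1, A-0.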
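How NetworkClient is implemented
As discussed above:
(1) The Selector maintains a pool of all connections, and all sending and receiving on those connections is done through the poll function;
(2) A channel can stage only 1 Send object at a time.
But what if that Send object has not been fully written after one poll? Let's see how the upper layer, NetworkClient, deals with it:
The key function: client.ready
Let's start with Sender's run() function: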
public void run(long now) {
Cluster cluster = metadata.fetch();
// get the list of partitions with data ready to send
RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
if (result.unknownLeadersExist)
this.metadata.requestUpdate();
// remove any nodes we aren't ready to send to
Iterator<Node> iter = result.readyNodes.iterator();
long notReadyTimeout = Long.MAX_VALUE;
while (iter.hasNext()) {
Node node = iter.next();
if (!this.client.ready(node, now)) { // key function!!!
iter.remove();
notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
}
}
// create produce requests
Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster,
result.readyNodes,
this.maxRequestSize,
now);
List<RecordBatch> expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, cluster, now);
// update sensors
for (RecordBatch expiredBatch : expiredBatches)
this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount);
sensors.updateProduceRequestMetrics(batches);
List<ClientRequest> requests = createProduceRequests(batches, now);
long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
if (result.readyNodes.size() > 0) {
log.trace("Nodes with data ready to send: {}", result.readyNodes);
log.trace("Created {} produce requests: {}", requests.size(), requests);
pollTimeout = 0;
}
for (ClientRequest request : requests) // each request is addressed to a different Node
client.send(request, now); // client.send simply calls selector.send: the message is staged in the channel, not sent yet
this.client.poll(pollTimeout, now); // calls selector.poll, which handles connecting, sending and receiving
}
The code above contains a key function, client.ready(Node n, ..). It checks whether the node is ready; if it is not, the node is removed from readyNodes, and no message is sent to that Node in this round.
So what does "ready" mean? Let's look at the code:
public boolean ready(Node node, long now) {
if (isReady(node, now))
return true;
if (connectionStates.canConnect(node.idString(), now))
initiateConnect(node, now);
return false;
}
public boolean isReady(Node node, long now) {
return !metadataUpdater.isUpdateDue(now) && canSendRequest(node.idString());
}
private boolean canSendRequest(String node) {
return connectionStates.isConnected(node) && selector.isChannelReady(node) && inFlightRequests.canSendMore(node);
}
public boolean canSendMore(String node) {
Deque<ClientRequest> queue = requests.get(node);
return queue == null || queue.isEmpty() ||
(queue.peekFirst().request().completed() && queue.size() < this.maxInFlightRequestsPerConnection);
}
public boolean completed() {
return remaining <= 0 && !pending;
}
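The code above is wrapped in several layers, but in summary, a Node is ready (requests can be sent to it) only if all of the following hold:
- the metadata is fine and does not need an update: !metadataUpdater.isUpdateDue(now)
- the connection is established: connectionStates.isConnected(node)
- the channel is ready: selector.isChannelReady(node); for a plaintext channel this always returns true
- and either there are no in-flight requests on the channel (all requests have been handled), or
- the newest request in the channel's queue (queue.peekFirst()) has been completely written out, request.completed(), and the number of in-flight requests is below the configured maximum
(The maximum is set by max.in.flight.requests.per.connection and defaults to 5, i.e. at most 5 requests may be "in flight" at once, where "in flight" means sent but with no response back yet.)
The 5th condition is exactly what solves the problem raised above: if a channel's Send object has been only partially written, the node is not ready on the next round, so NetworkClient never tries to stage a second Send in that channel (which would make setSend throw IllegalStateException).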
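The readiness rule for in-flight requests can be checked in isolation. InFlightSketch below is a hypothetical reduction of InFlightRequests: SketchRequest.fullyWritten plays the part of request().completed(), new requests are added at the head of the deque, and canSendMore() uses the same condition as the code above. The scenario walks through a partly written request and the in-flight limit, set to 5 here to match the default.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class SketchRequest {
    boolean fullyWritten;   // stands in for request().completed()
}

class InFlightSketch {
    private final int maxInFlightRequestsPerConnection;
    private final Map<String, Deque<SketchRequest>> requests = new HashMap<>();

    InFlightSketch(int max) { this.maxInFlightRequestsPerConnection = max; }

    void add(String node, SketchRequest r) {
        requests.computeIfAbsent(node, k -> new ArrayDeque<>()).addFirst(r);
    }

    SketchRequest completeNext(String node) { return requests.get(node).pollLast(); }

    // Same condition as canSendMore() above.
    boolean canSendMore(String node) {
        Deque<SketchRequest> q = requests.get(node);
        return q == null || q.isEmpty()
                || (q.peekFirst().fullyWritten && q.size() < maxInFlightRequestsPerConnection);
    }

    static List<Boolean> scenario() {
        InFlightSketch inFlight = new InFlightSketch(5);
        List<Boolean> results = new ArrayList<>();
        results.add(inFlight.canSendMore("n"));   // nothing in flight
        SketchRequest first = new SketchRequest();
        inFlight.add("n", first);
        results.add(inFlight.canSendMore("n"));   // newest request only partly written
        first.fullyWritten = true;
        results.add(inFlight.canSendMore("n"));   // written, 1 of 5 in flight
        for (int i = 0; i < 4; i++) {
            SketchRequest r = new SketchRequest();
            r.fullyWritten = true;
            inFlight.add("n", r);
        }
        results.add(inFlight.canSendMore("n"));   // 5 of 5 in flight: limit reached
        inFlight.completeNext("n");               // a response came back
        results.add(inFlight.canSendMore("n"));   // 4 of 5 in flight
        return results;
    }
}
```

The scenario yields true, false, true, false, true: the partly written request blocks further sends just as the 5th condition describes, and so does hitting the in-flight limit.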
The client.poll function
Now let's look at how client.poll wraps selector.poll:
public List<ClientResponse> poll(long timeout, long now) {
long metadataTimeout = metadataUpdater.maybeUpdate(now);
try {
this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
} catch (IOException e) {
log.error("Unexpected error during I/O", e);
}
// As mentioned above, selector.poll puts its results into a set of state variables (the output sets); now it is time to process them.
long updatedNow = this.time.milliseconds();
List<ClientResponse> responses = new ArrayList<>();
handleCompletedSends(responses, updatedNow);
handleCompletedReceives(responses, updatedNow);
handleDisconnections(responses, updatedNow);
handleConnections();
handleTimedOutRequests(responses, updatedNow);
// invoke callbacks
for (ClientResponse response : responses) {
if (response.request().hasCallback()) {
try {
response.request().callback().onComplete(response);
} catch (Exception e) {
log.error("Uncaught error in request completion:", e);
}
}
}
return responses;
}
// The Selector's state variables are cleared before every poll and filled in by each poll.
// These output sets are then processed in client.poll.
public class Selector implements Selectable {
...
private final List<Send> completedSends;
private final List<NetworkReceive> completedReceives;
private final Map<KafkaChannel, Deque<NetworkReceive>> stagedReceives;
private final List<String> disconnected;
private final List<String> connected;
...
}
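Connection detection & automatic reconnection
Any program that uses long-lived TCP connections has to solve a basic problem: how to tell whether a connection has been broken. The client needs to maintain the state of every connection (connecting, connected, disconnected) and act differently depending on that state.
However, NIO has no function that tells you directly whether a connection is broken, and NetworkClient does not run a separate thread that keeps sending heartbeats to probe its connections either. So how does it handle this?
Ways of detecting a broken connection
NetworkClient uses 3 approaches to decide whether a connection is broken:
Approach 1: all the I/O functions (connect, finishConnect, read, write) can throw IOException, so whenever one of them throws, the connection is considered broken.
Approach 2: selectionKey.isValid()
Approach 3: inflightRequests. Every request that is sent has a deadline for its response; if the response does not come back within that time, the connection is considered broken.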
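The third approach boils down to remembering when each request was sent. TimeoutTracker is a hypothetical helper, not Kafka's API: it records send times per node in the same head-in, tail-out order as the in-flight queue and reports every node whose oldest unanswered request has been waiting longer than the request timeout. The caller would then treat those connections as broken.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper: remembers when each in-flight request was sent, per node.
class TimeoutTracker {
    private final Map<String, Deque<Long>> sendTimes = new TreeMap<>();

    void sent(String node, long nowMs) {
        sendTimes.computeIfAbsent(node, k -> new ArrayDeque<>()).addFirst(nowMs);
    }

    // A response arrived: drop the oldest outstanding request for that node.
    void responded(String node) {
        Deque<Long> q = sendTimes.get(node);
        if (q != null) q.pollLast();
    }

    // Nodes whose oldest unanswered request has waited longer than the timeout.
    List<String> nodesWithTimedOutRequests(long nowMs, long requestTimeoutMs) {
        List<String> nodes = new ArrayList<>();
        for (Map.Entry<String, Deque<Long>> e : sendTimes.entrySet()) {
            Long oldest = e.getValue().peekLast();
            if (oldest != null && nowMs - oldest > requestTimeoutMs) nodes.add(e.getKey());
        }
        return nodes;
    }
}
```

With a 1000 ms timeout, a request sent to node a at t = 0 marks a as timed out at t = 1200, while node b, whose request went out at t = 500, is still within its limit until after t = 1500.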
The first 2 approaches both live in the Selector.poll function:
public void poll(long timeout) throws IOException {
if (timeout < 0)
throw new IllegalArgumentException("timeout should be >= 0");
clear();
if (hasStagedReceives())
timeout = 0;
/* check ready keys */
long startSelect = time.nanoseconds();
int readyKeys = select(timeout);
long endSelect = time.nanoseconds();
currentTimeNanos = endSelect;
this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());
if (readyKeys > 0) {
Set<SelectionKey> keys = this.nioSelector.selectedKeys();
Iterator<SelectionKey> iter = keys.iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
KafkaChannel channel = channel(key);
// register all per-connection metrics at once
sensors.maybeRegisterConnectionMetrics(channel.id());
lruConnections.put(channel.id(), currentTimeNanos);
try {
/* complete any connections that have finished their handshake */
if (key.isConnectable()) {
channel.finishConnect();
this.connected.add(channel.id());
this.sensors.connectionCreated.record();
}
/* if channel is not ready finish prepare */
if (channel.isConnected() && !channel.ready())
channel.prepare();
/* if channel is ready read from any connections that have readable data */
if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
NetworkReceive networkReceive;
while ((networkReceive = channel.read()) != null)
addToStagedReceives(channel, networkReceive);
}
/* if channel is ready write to any sockets that have space in their buffer and for which we have data */
if (channel.ready() && key.isWritable()) {
Send send = channel.write();
if (send != null) {
this.completedSends.add(send);
this.sensors.recordBytesSent(channel.id(), send.size());
}
}
if (!key.isValid()) { // approach 2
close(channel);
this.disconnected.add(channel.id());
}
} catch (Exception e) { // approach 1: if any I/O call throws, the connection is considered broken
String desc = channel.socketDescription();
if (e instanceof IOException)
log.debug("Connection with {} disconnected", desc, e);
else
log.warn("Unexpected error from {}; closing connection", desc, e);
close(channel);
this.disconnected.add(channel.id());
}
}
}
addToCompletedReceives();
long endIo = time.nanoseconds();
this.sensors.ioTime.record(endIo - endSelect, time.milliseconds());
maybeCloseOldestConnection();
}
The 3rd approach lives in NetworkClient:
public List<ClientResponse> poll(long timeout, long now) {
long metadataTimeout = metadataUpdater.maybeUpdate(now);
try {
this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
} catch (IOException e) {
log.error("Unexpected error during I/O", e);
}
long updatedNow = this.time.milliseconds();
List<ClientResponse> responses = new ArrayList<>();
handleCompletedSends(responses, updatedNow);
handleCompletedReceives(responses, updatedNow);
handleDisconnections(responses, updatedNow);
handleConnections();
handleTimedOutRequests(responses, updatedNow); // approach 3: handle all timed-out requests
for (ClientResponse response : responses) {
if (response.request().hasCallback()) {
try {
response.request().callback().onComplete(response);
} catch (Exception e) {
log.error("Uncaught error in request completion:", e);
}
}
}
return responses;
}
private void processDisconnection(List<ClientResponse> responses, String nodeId, long now) {
connectionStates.disconnected(nodeId, now);
for (ClientRequest request : this.inFlightRequests.clearAll(nodeId)) {
log.trace("Cancelled request {} due to node {} being disconnected", request, nodeId);
if (!metadataUpdater.maybeHandleDisconnection(request)) // metadata requests are excluded; every other request is completed with a "disconnected" response
responses.add(new ClientResponse(request, now, true, null));
}
}
Besides the 2 places above, there is one more: when a connection is first initiated:
private void initiateConnect(Node node, long now) {
String nodeConnectionId = node.idString();
try {
log.debug("Initiating connection to node {} at {}:{}.", node.id(), node.host(), node.port());
this.connectionStates.connecting(nodeConnectionId, now);
selector.connect(nodeConnectionId,
new InetSocketAddress(node.host(), node.port()),
this.socketSendBuffer,
this.socketReceiveBuffer);
} catch (IOException e) { // connection failure detected
connectionStates.disconnected(nodeConnectionId, now);
metadataUpdater.requestUpdate();
log.debug("Error connecting to node {} at {}:{}:", node.id(), node.host(), node.port(), e);
}
}
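When detection happens
From the code above, connections are checked at 2 points:
one is when a connection is first initiated; the other is every round of the poll loop, where each poll collects a set of dropped connections.
Below are the data structures for connection state in Selector and NetworkClient respectively:
// connection state in Selector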
public class Selector implements Selectable {
private final List<String> disconnected;
private final List<String> connected;
..
}
// connection state maintained by NetworkClient
public class NetworkClient implements KafkaClient {
private final ClusterConnectionStates connectionStates;
...
}
final class ClusterConnectionStates {
private final long reconnectBackoffMs; // minimum interval between reconnection attempts
private final Map<String, NodeConnectionState> nodeState;
}
private static class NodeConnectionState {
ConnectionState state;
long lastConnectAttemptMs; // time of the last connection attempt
...
}
public enum ConnectionState {
DISCONNECTED, CONNECTING, CONNECTED
}
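Summary:
- The Selector's connection state is cleared by clear() before every poll and collected during the poll.
- The Selector's connection state is passed up to NetworkClient, which uses it to update its own connection state.
- Besides what comes from the Selector, NetworkClient's own inflightRequests (approach 3 above) are also used to detect connection state.
These mechanisms let NetworkClient keep an accurate, up-to-date view of the state of every connection.
Automatic reconnection: the ready function
Once the state is known, what remains is automatic reconnection. That happens in the run function of the higher-level Sender: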
//Sender
public void run(long now) {
Cluster cluster = metadata.fetch();
RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
if (result.unknownLeadersExist)
this.metadata.requestUpdate();
Iterator<Node> iter = result.readyNodes.iterator();
long notReadyTimeout = Long.MAX_VALUE;
while (iter.hasNext()) {
Node node = iter.next();
if (!this.client.ready(node, now)) { // the key ready function
iter.remove();
notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
}
}
...
}
public boolean ready(Node node, long now) {
if (isReady(node, now))
return true;
if (connectionStates.canConnect(node.idString(), now))
initiateConnect(node, now); // initiate reconnection
return false;
}
public boolean canConnect(String id, long now) {
NodeConnectionState state = nodeState.get(id);
if (state == null)
return true;
else
return state.state == ConnectionState.DISCONNECTED && now - state.lastConnectAttemptMs >= this.reconnectBackoffMs;
}
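The functions above show that before Sender sends data, it first calls client.ready(node) to check whether the connection to that node is usable.
Inside ready, if the node is not ready, it then checks whether an automatic reconnect may be initiated. There are 2 conditions:
Condition 1: the node must not be in the CONNECTING state; it must be DISCONNECTED (a node with no recorded state at all may always connect).
Condition 2: reconnects must not happen too often. At least reconnectBackoffMs must have passed since the last connection attempt; if the broker is down, reconnecting more frequently would not help anyway.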
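The two conditions can be exercised with a small model of the connection states. ConnectionStatesSketch is a simplified, hypothetical version of ClusterConnectionStates: canConnect() is the rule shown above, and, as an assumption of this sketch, disconnected() records the failure time so that the backoff is counted from it.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

enum ConnState { DISCONNECTED, CONNECTING, CONNECTED }

// Simplified, hypothetical per-node connection bookkeeping with reconnect backoff.
class ConnectionStatesSketch {
    private static final class NodeState {
        ConnState state;
        long lastConnectAttemptMs;
        NodeState(ConnState state, long lastConnectAttemptMs) {
            this.state = state;
            this.lastConnectAttemptMs = lastConnectAttemptMs;
        }
    }

    private final long reconnectBackoffMs;
    private final Map<String, NodeState> nodeState = new HashMap<>();

    ConnectionStatesSketch(long reconnectBackoffMs) { this.reconnectBackoffMs = reconnectBackoffMs; }

    // Same rule as canConnect() above.
    boolean canConnect(String id, long now) {
        NodeState s = nodeState.get(id);
        if (s == null) return true;
        return s.state == ConnState.DISCONNECTED && now - s.lastConnectAttemptMs >= reconnectBackoffMs;
    }

    void connecting(String id, long now) { nodeState.put(id, new NodeState(ConnState.CONNECTING, now)); }

    void connected(String id) { nodeState.get(id).state = ConnState.CONNECTED; }

    // Assumption of this sketch: the backoff is measured from the time of the failure.
    void disconnected(String id, long now) {
        NodeState s = nodeState.computeIfAbsent(id, k -> new NodeState(ConnState.DISCONNECTED, now));
        s.state = ConnState.DISCONNECTED;
        s.lastConnectAttemptMs = now;
    }

    // With a 100 ms backoff, asks canConnect() at several points in time.
    static List<Boolean> timeline() {
        ConnectionStatesSketch states = new ConnectionStatesSketch(100);
        List<Boolean> answers = new ArrayList<>();
        answers.add(states.canConnect("n", 0));     // never seen: allowed
        states.connecting("n", 0);
        answers.add(states.canConnect("n", 5));     // CONNECTING: condition 1 fails
        states.disconnected("n", 10);
        answers.add(states.canConnect("n", 50));    // only 40 ms since the failure: condition 2 fails
        answers.add(states.canConnect("n", 110));   // 100 ms have passed: allowed
        states.connecting("n", 110);
        states.connected("n");
        answers.add(states.canConnect("n", 500));   // CONNECTED: no reconnect needed
        return answers;
    }
}
```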
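A key point here: because every call is non-blocking, when this round detects a broken connection it only initiates a new connection; it does not wait for the connection to be established before running the code that follows.
Whether the connection has been established is determined after poll; it may only be established one or several polls later, and only then will ready return true.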
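The non-blocking connect itself can be observed with the standard java.nio API. In non-blocking mode, SocketChannel.connect() starts the connection and returns immediately, false meaning "still in progress" (it may return true right away for a local connection); the connection is completed later by calling finishConnect() once the selector reports OP_CONNECT. The sketch connects to a throwaway local ServerSocketChannel and counts how many poll rounds pass before the connection is usable; the 50-round cap and the 100 ms select timeout are arbitrary choices for this example.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

class NonBlockingConnectDemo {
    // Returns how many poll rounds passed before the connection was usable
    // (0 if connect() completed immediately, which can happen on loopback).
    static int pollsUntilConnected() {
        try (ServerSocketChannel server = ServerSocketChannel.open();
             Selector selector = Selector.open();
             SocketChannel client = SocketChannel.open()) {
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            client.configureBlocking(false);
            // Initiates the connection and returns at once; false means "still in progress".
            if (client.connect(server.getLocalAddress())) return 0;
            client.register(selector, SelectionKey.OP_CONNECT);
            for (int polls = 1; polls <= 50; polls++) {
                selector.select(100);                       // one poll round
                for (SelectionKey key : selector.selectedKeys()) {
                    // finishConnect() throws IOException if the attempt failed.
                    if (key.isConnectable() && client.finishConnect()) return polls;
                }
                selector.selectedKeys().clear();
            }
            throw new IOException("still not connected after 50 polls");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```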