Kafka源碼分析-序列4 -Producer -network層核心原理

在上一篇我們分析了Java NIO的原理和使用方式,本篇將進一步分析Kafka client是如何基于NIO構建自己的network層之斯。

network層的分層架構

下圖展示了從最上層的KafkaProducer到最底層的Java NIO的構建層次關系:
圖中淡紫色的方框表示接口或者抽象類光涂,白色方框是具體實現(xiàn)庞萍。

整個架構圖也體現(xiàn)了“面向接口編程”的思想:最底層Java NIO往上層全部以接口形式暴露,上面的3層忘闻,也都定義了相應的接口钝计,逐層往上暴露。

接口的實例化(包括KafkaClient, Selectable, ChannelBuilder),也都在最外層的容器類KafkaProducer的構造函數(shù)中完成私恬,KafkaProducer也就充當了一個“工廠”的角色债沮,裝配所有這些底層組件。


1.png

network層組件與NIO組件的映射關系

從上圖也可以看出:

KakfaChannel基本是對SocketChannel的封裝本鸣,只是這個中間多個一個間接層:TransportLayer疫衩,為了封裝普通和加密的Channel;
Send/NetworkReceive是對ByteBuffer的封裝荣德,表示一次請求的數(shù)據(jù)包闷煤;
Kafka的Selector封裝了NIO的Selector,內(nèi)含一個NIO Selector對象涮瞻。

Kafka Selector實現(xiàn)思路

1.從上圖可以看出鲤拿, Selector內(nèi)部包含一個Map, 也就是它維護了所有連接的連接池。這些KafkaChannel都由ChannelBuilder接口創(chuàng)建饲宛。

private final Map<String, KafkaChannel> channels;

2.所有的io操作:connect, read, write其實都是在poll這1個函數(shù)里面完成的皆愉。具體什么意思呢?

NetworkClient的send()函數(shù)艇抠,調(diào)用了selector.send(Send send)幕庐, 但這個時候數(shù)據(jù)并沒有真的發(fā)送出去,只是暫存在了selector內(nèi)部相對應的channel里面家淤。下面看代碼:

//Selector  
    public void send(Send send) {  
        KafkaChannel channel = channelOrFail(send.destination());  //找到數(shù)據(jù)包相對應的connection  
        try {  
            channel.setSend(send);  //暫存在這個connection(channel)里面  
        } catch (CancelledKeyException e) {  
            this.failedSends.add(send.destination());  
            close(channel);  
        }  
    }  
  
//KafkaChannel  
    public void setSend(Send send) {  
        if (this.send != null)  //關鍵點:當前的沒有發(fā)出去之前异剥,不能暫存下1個!P踔亍冤寿!關于這個,后面還要詳細分析  
            throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress.");  
        this.send = send;   //暫存這個數(shù)據(jù)包  
        this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);  
    }  
  
public class KafkaChannel {  
    private final String id;  
    private final TransportLayer transportLayer;  
    private final Authenticator authenticator;  
    private final int maxReceiveSize;  
    private NetworkReceive receive;  
    private Send send;   //關鍵點:1個channel一次只能存放1個數(shù)據(jù)包青伤,在當前的send數(shù)據(jù)包沒有完整發(fā)出去之前督怜,不能存放下一個  
    ...  
}

暫存在channel中之后,poll函數(shù)進行處理狠角,我們抽象出一個輸入-輸出模型如下:
輸入:暫存的send數(shù)據(jù)包
輸出:完成的sends, 完成的receive(針對上1次的send), 建立的連接号杠, 斷掉的連接。


2.png
@Override  
public void poll(long timeout) throws IOException {  
    if (timeout < 0)  
        throw new IllegalArgumentException("timeout should be >= 0");  
    clear();  //關鍵點:每次poll之前丰歌,會清空“輸出”  
    if (hasStagedReceives())  
        timeout = 0;  
    /* check ready keys */  
    long startSelect = time.nanoseconds();  
    int readyKeys = select(timeout);  
    long endSelect = time.nanoseconds();  
    currentTimeNanos = endSelect;  
    this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());  
  
    if (readyKeys > 0) {  
        Set<SelectionKey> keys = this.nioSelector.selectedKeys();  
        Iterator<SelectionKey> iter = keys.iterator();  
        while (iter.hasNext()) {  
            SelectionKey key = iter.next();  
            iter.remove();  
            KafkaChannel channel = channel(key);  
  
            // register all per-connection metrics at once  
            sensors.maybeRegisterConnectionMetrics(channel.id());  
            lruConnections.put(channel.id(), currentTimeNanos);  
  
            try {  
                /* complete any connections that have finished their handshake */  
                if (key.isConnectable()) {  
                    channel.finishConnect();    //把建立的連接姨蟋,加入輸出結果集合  
                    this.connected.add(channel.id());  
                    this.sensors.connectionCreated.record();  
                }  
  
                ...  
  
                if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {  
                    NetworkReceive networkReceive;  
                    while ((networkReceive = channel.read()) != null)  
                        addToStagedReceives(channel, networkReceive);  
                }  
  
                if (channel.ready() && key.isWritable()) {  
                    Send send = channel.write();  
                    if (send != null) {  
                        this.completedSends.add(send);  //把完成的發(fā)送,加入輸出結果集合  
                        this.sensors.recordBytesSent(channel.id(), send.size());  
                    }  
                }  
  
                if (!key.isValid()) {  
                    close(channel);  
                    this.disconnected.add(channel.id());  //把斷掉的連接立帖,加入輸出結果集合  
                }  
            } catch (Exception e) {  
                String desc = channel.socketDescription();  
                if (e instanceof IOException)  
                    log.debug("Connection with {} disconnected", desc, e);  
                else  
                    log.warn("Unexpected error from {}; closing connection", desc, e);  
                close(channel);  
                this.disconnected.add(channel.id()); //把斷掉的連接眼溶,加入輸出結果集合  
            }  
        }  
    }  
  
    addToCompletedReceives(); //把完成的接收,加入輸出結果集合  
  
    long endIo = time.nanoseconds();  
    this.sensors.ioTime.record(endIo - endSelect, time.milliseconds());  
    maybeCloseOldestConnection();  
}

核心原理之1 – 消息的分包

在上面的代碼中晓勇,為什么會有addToStagedReceives堂飞? 什么叫做staged receives呢灌旧? 這叫要從數(shù)據(jù)的分包說起:

在NetworkClient中,往下傳的是一個完整的ClientRequest酝静,進到Selector节榜,暫存到channel中的羡玛,也是一個完整的Send對象(1個數(shù)據(jù)包)别智。但這個Send對象,交由底層的channel.write(Bytebuffer b)的時候稼稿,并不一定一次可以完全發(fā)送薄榛,可能要調(diào)用多次write,才能把一個Send對象完全發(fā)出去让歼。這是因為write是非阻塞的敞恋,不是等到完全發(fā)出去,才會返回谋右。所以才有上面的代碼:

if (channel.ready() && key.isWritable()) {  
    Send send = channel.write(); //send不為空硬猫,表示完全發(fā)送出去,返回發(fā)出去的這個Send對象改执。如果沒完全發(fā)出去啸蜜,返回null  
    if (send != null) {    
        this.completedSends.add(send);  
        this.sensors.recordBytesSent(channel.id(), send.size());  
    }  
}

同樣,在接收的時候辈挂,channel.read(Bytebuffer b)衬横,一個response也可能要read多次,才能完全接收终蒂。所以就有了上面的while循環(huán)代碼:

if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {  
    NetworkReceive networkReceive;  
    while ((networkReceive = channel.read()) != null)  //循環(huán)接收蜂林,直到1個response完全接收到,才會從while循環(huán)退出  
        addToStagedReceives(channel, networkReceive);  
}

核心原理之2 – 消息的分界

從上面知道拇泣,底層數(shù)據(jù)的通信噪叙,是在每一個channel上面,2個源源不斷的byte流霉翔,一個send流睁蕾,一個receive流。
send的時候早龟,還好說惫霸,發(fā)送之前知道一個完整的消息的大小葱弟;
那接收的時候壹店,我怎么知道一個msg response什么時候結束,然后開始接收下一個response呢芝加?

這就需要一個小技巧:在所有request硅卢,response頭部射窒,首先是一個定長的,4字節(jié)的頭将塑,receive的時候脉顿,至少調(diào)用2次read,先讀取這4個字節(jié)点寥,獲取整個response的長度艾疟,接下來再讀取消息體。

public class NetworkReceive implements Receive {  
    private final String source;  
    private final ByteBuffer size;  //頭部4字節(jié)的buffer  
    private final int maxSize;  
    private ByteBuffer buffer;  //后面整個消息response的buffer  
  
    public NetworkReceive(String source) {  
        this.source = source;  
        this.size = ByteBuffer.allocate(4);   //先分配4字節(jié)的頭部  
        this.buffer = null;  
        this.maxSize = UNLIMITED;  
   }  
}

核心原理之3 - 消息時序保證

在InFlightRequests中敢辩,存放了所有發(fā)出去蔽莱,但是response還沒有回來的request。request發(fā)出去的時候戚长,入對盗冷;response回來,就把相對應的request出對同廉。

final class InFlightRequests {  
  
    private final int maxInFlightRequestsPerConnection;  
    private final Map<String, Deque<ClientRequest>> requests = new HashMap<String, Deque<ClientRequest>>();  
}

這個有個關鍵點:我們注意到request與response的配對仪糖,在這里是用隊列表達的,而不是Map迫肖。用隊列的入隊锅劝,出隊,完成2者的匹配咒程。要實現(xiàn)這個鸠天,服務器就必須要保證消息的時序:即在一個socket上面,假如發(fā)出去的reqeust是0, 1, 2帐姻,那返回的response的順序也必須是0, 1, 2稠集。

但是服務器是1 + N + M模型,所有的請求進入一個requestQueue饥瓷,然后是多線程并行處理的剥纷。那它如何保證消息的時序呢?

答案是mute/unmute機制:每當一個channel上面接收到一個request呢铆,這個channel就會被mute晦鞋,然后等response返回之后,才會再unmute棺克。這樣就保證了同1個連接上面悠垛,同時只會有1個請求被處理。

下面是服務端的代碼:

selector.completedReceives.asScala.foreach { receive =>  
      try {  
        val channel = selector.channel(receive.source)  
        val session = RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, channel.principal.getName),  
          channel.socketAddress)  
        val req = RequestChannel.Request(processor = id, connectionId = receive.source, session = session, buffer = receive.payload, startTimeMs = time.milliseconds, securityProtocol = protocol)  
        requestChannel.sendRequest(req)  
      } catch {  
        case e @ (_: InvalidRequestException | _: SchemaException) =>  
          // note that even though we got an exception, we can assume that receive.source is valid. Issues with constructing a valid receive object were handled earlier  
          error("Closing socket for " + receive.source + " because of error", e)  
          close(selector, receive.source)  
      }  
      selector.mute(receive.source)    //收到請求娜谊,把這個請求對應的channel, mute  
    }  
  
    selector.completedSends.asScala.foreach { send =>  
      val resp = inflightResponses.remove(send.destination).getOrElse {  
        throw new IllegalStateException(s"Send for ${send.destination} completed, but not in `inflightResponses`")  
      }  
      resp.request.updateRequestMetrics()  
      selector.unmute(send.destination)  //發(fā)送response之后确买,把這個responese對應的channel, unmute  
    }

NetworkClient實現(xiàn)思路

上面已經(jīng)講到:
(1)Selector維護了所有連接的連接池,所有連接上纱皆,消息的發(fā)送湾趾、接收都是通過poll函數(shù)進行的
(2)一個channel一次只能暫存1個Send對象芭商。

但如果這個Send對象,一次poll之后搀缠,沒有完全發(fā)送出去怎么辦呢铛楣?看上層NetworkClient怎么處理的:

關鍵的client.ready函數(shù)

先從Sender的run()函數(shù)看起:

public void run(long now) {  
    Cluster cluster = metadata.fetch();  
    // get the list of partitions with data ready to send  
    RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);  
  
    if (result.unknownLeadersExist)  
        this.metadata.requestUpdate();  
  
    // remove any nodes we aren't ready to send to  
    Iterator<Node> iter = result.readyNodes.iterator();  
    long notReadyTimeout = Long.MAX_VALUE;  
    while (iter.hasNext()) {  
        Node node = iter.next();  
        if (!this.client.ready(node, now)) {   //關鍵函數(shù)!R掌铡簸州!  
            iter.remove();  
            notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));  
        }  
    }  
  
    // create produce requests  
    Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster,  
                                                                     result.readyNodes,  
                                                                     this.maxRequestSize,  
                                                                     now);  
  
    List<RecordBatch> expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, cluster, now);  
    // update sensors  
    for (RecordBatch expiredBatch : expiredBatches)  
        this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount);  
  
    sensors.updateProduceRequestMetrics(batches);  
    List<ClientRequest> requests = createProduceRequests(batches, now);  
  
    long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);  
    if (result.readyNodes.size() > 0) {  
        log.trace("Nodes with data ready to send: {}", result.readyNodes);  
        log.trace("Created {} produce requests: {}", requests.size(), requests);  
        pollTimeout = 0;  
    }  
  
    for (ClientRequest request : requests)  //每個request分屬于不同的Node  
        client.send(request, now);   //client的send就是直接調(diào)用了selector.send,消息暫存在channel里面衷敌,沒有發(fā)送  
  
    this.client.poll(pollTimeout, now); //調(diào)用selector.poll勿侯,處理連接、發(fā)送缴罗、接收  
}

在上面的代碼中,有一個關鍵函數(shù):client.ready(Node n, ..)祭埂, 這個函數(shù)內(nèi)部會判斷這個node有沒有ready面氓,如果沒有ready,就會從readNodes里面移除蛆橡,接下來就不會往這個Node發(fā)送消息舌界。

那什么叫ready呢? 我們看一下代碼:

public boolean ready(Node node, long now) {  
    if (isReady(node, now))  
        return true;  
  
    if (connectionStates.canConnect(node.idString(), now))  
        initiateConnect(node, now);  
    return false;  
}  
  
public boolean isReady(Node node, long now) {  
    return !metadataUpdater.isUpdateDue(now) && canSendRequest(node.idString());  
}  
  
private boolean canSendRequest(String node) {  
    return connectionStates.isConnected(node) && selector.isChannelReady(node) && inFlightRequests.canSendMore(node);  
}  
  
public boolean canSendMore(String node) {  
    Deque<ClientRequest> queue = requests.get(node);  
    return queue == null || queue.isEmpty() ||  
           (queue.peekFirst().request().completed() && queue.size() < this.maxInFlightRequestsPerConnection);  
}  
  
public boolean completed() {  
    return remaining <= 0 && !pending;  
}

上面的代碼封了好幾層泰演,但總結下來呻拌,一個Node ready,可以向其發(fā)送請求睦焕,需要符合以下幾個條件:

  1. metadata正常藐握,不需要update: !metadataUpdater.isUpdateDue(now)
  2. 連接正常 connectionStates.isConnected(node)
  3. channel是ready狀態(tài):這個對于PlaintextChannel, 一直返回true
  4. 當前該channel中垃喊,沒有in flight request猾普,所有請求都處理完了
  5. 當前該channel中,隊列尾部的request已經(jīng)完全發(fā)送出去, request.completed()本谜,并且inflight request數(shù)目初家,沒有超過設定的最大值
    缺省為5,即允許在“天上飛”的request最多有5個乌助,所謂在“天上飛”溜在,就是發(fā)出去了,response還沒有回來)

而上面的第5個條件他托,正是解決了上面的問題:一個channel里面的Send對象要是只發(fā)送了部分掖肋,下1次就不會處于ready狀態(tài)了。

client.poll函數(shù)

下面看一下client.poll上祈,是如何封裝selector.poll的:

    public List<ClientResponse> poll(long timeout, long now) {  
        long metadataTimeout = metadataUpdater.maybeUpdate(now);  
        try {  
            this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));  
        } catch (IOException e) {  
            log.error("Unexpected error during I/O", e);  
        }  
  
        //上面說到培遵,selector.poll函數(shù)浙芙,會把處理結果,放到一堆的狀態(tài)變量里面(輸出結果集)籽腕,現(xiàn)在就是處理這堆輸出結果的時候了嗡呼。  
  
        long updatedNow = this.time.milliseconds();  
        List<ClientResponse> responses = new ArrayList<>();  
        handleCompletedSends(responses, updatedNow);  
        handleCompletedReceives(responses, updatedNow);  
        handleDisconnections(responses, updatedNow);  
        handleConnections();  
        handleTimedOutRequests(responses, updatedNow);  
  
        // invoke callbacks  
        for (ClientResponse response : responses) {  
            if (response.request().hasCallback()) {  
                try {  
                    response.request().callback().onComplete(response);  
                } catch (Exception e) {  
                    log.error("Uncaught error in request completion:", e);  
                }  
            }  
        }  
  
        return responses;  
   }  
  
//Selector中的那堆狀態(tài)變量,在每次poll之前,被clear情況掉皇耗,每次poll之后南窗,填充。  
//然后在client.poll里面郎楼,這堆輸出結果被處理  
public class Selector implements Selectable {  
    万伤。。呜袁。  
    private final List<Send> completedSends;  
    private final List<NetworkReceive> completedReceives;  
    private final Map<KafkaChannel, Deque<NetworkReceive>> stagedReceives;  
    private final List<String> disconnected;  
    private final List<String> connected;  
敌买。。阶界。  
}

連接檢測 & 自動重連機制

在所有tcp長鏈接的編程中虹钮,都有一個基本問題要解決:如何判斷1個連接是否斷開?客戶端需要維護所有連接的狀態(tài)(connecting, connected, disconnected)膘融,然后根據(jù)連接狀態(tài)做不同邏輯芙粱。

但在NIO中,并沒有一個函數(shù)氧映,可以直接告訴你一個連接是否斷開了春畔;在NetworkClient里面,也并沒有開一個線程岛都,不斷發(fā)送心跳消息律姨,來檢測連接。那它是如何處理的呢疗绣?

檢測連接斷開的手段

在networkClient的實現(xiàn)中线召,用了3種手段,來判斷一個連接是否斷開:

手段1:所有的IO函數(shù)多矮,connect, finishConnect, read, write都會拋IOException缓淹,因此任何時候,調(diào)用這些函數(shù)的時候塔逃,只要拋異常讯壶,就認為連接已經(jīng)斷開。

手段2:selectionKey.isValid()

手段3:inflightRequests湾盗,所有發(fā)出去的request伏蚊,都設置有一個response返回的時間。在這個時間內(nèi)格粪,response沒有回來躏吊,就認為連接斷了氛改。

前2種手段,都集中在Select.poll函數(shù)里面:

public void poll(long timeout) throws IOException {  
    if (timeout < 0)  
        throw new IllegalArgumentException("timeout should be >= 0");  
    clear();  
    if (hasStagedReceives())  
        timeout = 0;  
    /* check ready keys */  
    long startSelect = time.nanoseconds();  
    int readyKeys = select(timeout);  
    long endSelect = time.nanoseconds();  
    currentTimeNanos = endSelect;  
    this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());  
  
    if (readyKeys > 0) {  
        Set<SelectionKey> keys = this.nioSelector.selectedKeys();  
        Iterator<SelectionKey> iter = keys.iterator();  
        while (iter.hasNext()) {  
            SelectionKey key = iter.next();  
            iter.remove();  
            KafkaChannel channel = channel(key);  
  
            // register all per-connection metrics at once  
            sensors.maybeRegisterConnectionMetrics(channel.id());  
            lruConnections.put(channel.id(), currentTimeNanos);  
  
            try {  
                /* complete any connections that have finished their handshake */  
                if (key.isConnectable()) {  
                    channel.finishConnect();  
                    this.connected.add(channel.id());  
                    this.sensors.connectionCreated.record();  
                }  
  
                /* if channel is not ready finish prepare */  
                if (channel.isConnected() && !channel.ready())  
                    channel.prepare();  
  
                /* if channel is ready read from any connections that have readable data */  
                if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {  
                    NetworkReceive networkReceive;  
                    while ((networkReceive = channel.read()) != null)  
                        addToStagedReceives(channel, networkReceive);  
                }  
  
                /* if channel is ready write to any sockets that have space in their buffer and for which we have data */  
                if (channel.ready() && key.isWritable()) {  
                    Send send = channel.write();  
                    if (send != null) {  
                        this.completedSends.add(send);  
                        this.sensors.recordBytesSent(channel.id(), send.size());  
                    }  
                }  
  
                if (!key.isValid()) {   //手段2  
                    close(channel);  
                    this.disconnected.add(channel.id());  
                }  
            } catch (Exception e) {  //手段1:任何一個io函數(shù)比伏,只要拋錯胜卤,就認為連接斷了  
                String desc = channel.socketDescription();  
                if (e instanceof IOException)  
                    log.debug("Connection with {} disconnected", desc, e);  
                else  
                    log.warn("Unexpected error from {}; closing connection", desc, e);  
                close(channel);  
                this.disconnected.add(channel.id());  
            }  
        }  
    }  
  
    addToCompletedReceives();  
  
    long endIo = time.nanoseconds();  
    this.sensors.ioTime.record(endIo - endSelect, time.milliseconds());  
    maybeCloseOldestConnection();  
}

第3種手段,在NetworkClient里面:

public List<ClientResponse> poll(long timeout, long now) {  
    long metadataTimeout = metadataUpdater.maybeUpdate(now);  
    try {  
        this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));  
    } catch (IOException e) {  
        log.error("Unexpected error during I/O", e);  
    }  
  
    long updatedNow = this.time.milliseconds();  
    List<ClientResponse> responses = new ArrayList<>();  
    handleCompletedSends(responses, updatedNow);  
    handleCompletedReceives(responses, updatedNow);  
    handleDisconnections(responses, updatedNow);  
    handleConnections();  
    handleTimedOutRequests(responses, updatedNow); //手段3:處理所有TimeOutRequests  
  
    for (ClientResponse response : responses) {  
        if (response.request().hasCallback()) {  
            try {  
                response.request().callback().onComplete(response);  
            } catch (Exception e) {  
                log.error("Uncaught error in request completion:", e);  
            }  
        }  
    }  
  
    return responses;  
}  
  
private void processDisconnection(List<ClientResponse> responses, String nodeId, long now) {  
    connectionStates.disconnected(nodeId, now);  
    for (ClientRequest request : this.inFlightRequests.clearAll(nodeId)) {  
        log.trace("Cancelled request {} due to node {} being disconnected", request, nodeId);  
        if (!metadataUpdater.maybeHandleDisconnection(request)) //把MetaDataRequest排除在外赁项,其它所有請求葛躏,只要超時,就認為連接斷開  
            responses.add(new ClientResponse(request, now, true, null));  
    }  
}

除了上述的2個地方悠菜,還要一個地方舰攒,就是初始化的時候

private void initiateConnect(Node node, long now) {  
    String nodeConnectionId = node.idString();  
    try {  
        log.debug("Initiating connection to node {} at {}:{}.", node.id(), node.host(), node.port());  
        this.connectionStates.connecting(nodeConnectionId, now);  
        selector.connect(nodeConnectionId,  
                         new InetSocketAddress(node.host(), node.port()),  
                         this.socketSendBuffer,  
                         this.socketReceiveBuffer);  
    } catch (IOException e) { //檢測到連接斷開  
        connectionStates.disconnected(nodeConnectionId, now);  
        metadataUpdater.requestUpdate();  
        log.debug("Error connecting to node {} at {}:{}:", node.id(), node.host(), node.port(), e);  
    }  
}

檢測時機

從上面代碼我們可以看出,連接的檢測時機悔醋,有2個:
一個是初始建立連接的時候摩窃,一個就是每次poll循環(huán),每poll一次篙顺,就收集到一個斷開的連接集合偶芍。

下面分別是Selector和NetworkClient中,關于連接狀態(tài)的數(shù)據(jù)結構:

//Selector中的連接狀態(tài)  
public class Selector implements Selectable {  
    private final List<String> disconnected;  
    private final List<String> connected;  
    ..  
}
//NetworkClient中的連接狀態(tài)維護  
public class NetworkClient implements KafkaClient {  
    private final ClusterConnectionStates connectionStates;  
    ...  
}  
  
final class ClusterConnectionStates {  
    private final long reconnectBackoffMs; //重連的時間間隔  
    private final Map<String, NodeConnectionState> nodeState;  
}  
  
    private static class NodeConnectionState {  
        ConnectionState state;  
        long lastConnectAttemptMs;  //上1次發(fā)起重連的時間  
        ...  
    }  
  
public enum ConnectionState {  
    DISCONNECTED, CONNECTING, CONNECTED  
}

總結:

  1. Selector中的連接狀態(tài)德玫,在每次poll之前,會調(diào)用clear清空椎麦;在poll之后宰僧,收集。
  2. Selector中的連接狀態(tài)观挎,會傳給上層NetworkClient琴儿,用于它更新自己的連接狀態(tài)
  3. 出了來自Selctor,NetworkClient自己內(nèi)部的inflightRequests嘁捷,也就是上面的手段3造成, 也用于檢測連接狀態(tài)。

通過上面的機制雄嚣,就保證了NetworkClient可以實時準確維護所有connection的狀態(tài)晒屎。

自動重連 - ready函數(shù)

狀態(tài)知道了,那剩下的就是自動重連了缓升。這個發(fā)生在更上層的Send的run函數(shù)里面:

//Sender  
    public void run(long now) {  
        Cluster cluster = metadata.fetch();  
        RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);  
  
        if (result.unknownLeadersExist)  
            this.metadata.requestUpdate();  
  
        Iterator<Node> iter = result.readyNodes.iterator();  
        long notReadyTimeout = Long.MAX_VALUE;  
        while (iter.hasNext()) {  
            Node node = iter.next();  
            if (!this.client.ready(node, now)) {  //關鍵的ready函數(shù)  
                iter.remove();  
                notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));  
            }  
        }  
  
    public boolean ready(Node node, long now) {  
        if (isReady(node, now))  
            return true;  
  
        if (connectionStates.canConnect(node.idString(), now))  
            initiateConnect(node, now);   //發(fā)起重連  
  
        return false;  
    }  
  
    public boolean canConnect(String id, long now) {  
        NodeConnectionState state = nodeState.get(id);  
        if (state == null)  
            return true;  
        else  
            return state.state == ConnectionState.DISCONNECTED && now - state.lastConnectAttemptMs >= this.reconnectBackoffMs;  
    }

從上面函數(shù)可以看出鼓鲁,每次Send發(fā)數(shù)據(jù)之前,會先調(diào)用client.ready(node)判斷該node的連接是否可用港谊。

在ready內(nèi)部骇吭,如果連接不是connected狀態(tài),會再判斷是否可以發(fā)起自動重連歧寺,檢測條件有2個:

條件1: 它不能是connecting狀態(tài)燥狰,必須是disconnected
條件2: 重連不能太頻繁棘脐。當前時間距離上1次重連時間,要有一定的間隔龙致。如果broker掛了蛀缝,你太頻繁的重連也不起作用。

這里有個關鍵點:因為都是非阻塞調(diào)用净当,本次雖然檢測到連接斷了内斯,但只是發(fā)起連接,不會等到連接建立好了像啼,再執(zhí)行下面的代碼俘闯。
會在poll之后,判斷連接是否建立忽冻;在下1次或者下幾次poll之前真朗,可能連接才會建立好,ready才會返回true.

歡迎加入QQ群:104286694

最后編輯于
?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末僧诚,一起剝皮案震驚了整個濱河市遮婶,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌湖笨,老刑警劉巖旗扑,帶你破解...
    沈念sama閱讀 211,817評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異慈省,居然都是意外死亡臀防,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,329評論 3 385
  • 文/潘曉璐 我一進店門边败,熙熙樓的掌柜王于貴愁眉苦臉地迎上來袱衷,“玉大人,你說我怎么就攤上這事笑窜≈略铮” “怎么了?”我有些...
    開封第一講書人閱讀 157,354評論 0 348
  • 文/不壞的土叔 我叫張陵排截,是天一觀的道長嫌蚤。 經(jīng)常有香客問我,道長匾寝,這世上最難降的妖魔是什么搬葬? 我笑而不...
    開封第一講書人閱讀 56,498評論 1 284
  • 正文 為了忘掉前任,我火速辦了婚禮艳悔,結果婚禮上急凰,老公的妹妹穿的比我還像新娘。我一直安慰自己,他們只是感情好抡锈,可當我...
    茶點故事閱讀 65,600評論 6 386
  • 文/花漫 我一把揭開白布疾忍。 她就那樣靜靜地躺著,像睡著了一般床三。 火紅的嫁衣襯著肌膚如雪一罩。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,829評論 1 290
  • 那天撇簿,我揣著相機與錄音聂渊,去河邊找鬼。 笑死四瘫,一個胖子當著我的面吹牛汉嗽,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播找蜜,決...
    沈念sama閱讀 38,979評論 3 408
  • 文/蒼蘭香墨 我猛地睜開眼饼暑,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了洗做?” 一聲冷哼從身側響起弓叛,我...
    開封第一講書人閱讀 37,722評論 0 266
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎诚纸,沒想到半個月后撰筷,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,189評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡畦徘,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,519評論 2 327
  • 正文 我和宋清朗相戀三年闭专,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片旧烧。...
    茶點故事閱讀 38,654評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖画髓,靈堂內(nèi)的尸體忽然破棺而出掘剪,到底是詐尸還是另有隱情,我是刑警寧澤奈虾,帶...
    沈念sama閱讀 34,329評論 4 330
  • 正文 年R本政府宣布夺谁,位于F島的核電站,受9級特大地震影響肉微,放射性物質發(fā)生泄漏匾鸥。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,940評論 3 313
  • 文/蒙蒙 一碉纳、第九天 我趴在偏房一處隱蔽的房頂上張望勿负。 院中可真熱鬧,春花似錦劳曹、人聲如沸奴愉。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,762評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽锭硼。三九已至房资,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間檀头,已是汗流浹背轰异。 一陣腳步聲響...
    開封第一講書人閱讀 31,993評論 1 266
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人供常。 一個月前我還...
    沈念sama閱讀 46,382評論 2 360
  • 正文 我出身青樓宏赘,卻偏偏與公主長得像,于是被迫代替她去往敵國和親戳稽。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 43,543評論 2 349

推薦閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構建分布式系統(tǒng)中一些常見模式的工具(例如配置管理期升,服務發(fā)現(xiàn)惊奇,斷路器,智...
    卡卡羅2017閱讀 134,633評論 18 139
  • 這篇將是網(wǎng)絡層源碼分析的最后一篇 北京的天,無力吐槽啊~ 快年底了, 每個團隊都在旁邊錄制新年寄語,各種口號~ 對...
    掃帚的影子閱讀 2,166評論 0 0
  • 在上一篇我們分析了Metadata的更新機制播赁,其中涉及到一個問題颂郎,就是Sender如何跟服務器通信,也就是網(wǎng)絡層容为。...
    丸_子閱讀 622評論 0 2
  • 許久不寫字,發(fā)現(xiàn)寫作水平嚴重退步啊~~~ 以前也是個文藝青年,現(xiàn)在也要寫出詩意的代碼啊~ 沒找到以前寫的詩,咱們還...
    掃帚的影子閱讀 3,240評論 2 8
  • kafka的定義:是一個分布式消息系統(tǒng)乓序,由LinkedIn使用Scala編寫,用作LinkedIn的活動流(Act...
    時待吾閱讀 5,311評論 1 15