KafkaController分析2-NetworkClient分析

  • NetworkClient: 顧名思義哈, 用于網(wǎng)絡(luò)連接,消息發(fā)送的客戶端封裝, 源碼中的注釋:

A network client for asynchronous request/response network i/o. This is an internal class used to implement the user-facing producer and consumer clients.

  • 用在何處:
1. kafka本身實(shí)現(xiàn)了java版的producer和consumer,里面的網(wǎng)絡(luò)連接,請(qǐng)求發(fā)送均使用NetworkClient實(shí)現(xiàn);
2. KafkaController中controller與其他broker的通訊,使用NetworkClient實(shí)現(xiàn);

InFlightRequests類

  • 所在文件: clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java
  • 實(shí)現(xiàn)了request的集合, 包括正在發(fā)送的和已經(jīng)發(fā)送的但還沒(méi)有接收到response的request;
  • 主要成員變量: private final Map<String, Deque<ClientRequest>> requests = new HashMap<String, Deque<ClientRequest>>();
    針對(duì)每個(gè)連接使用Deque<ClientRequest>數(shù)據(jù)結(jié)構(gòu)來(lái)保存所有的request;Deque<ClientRequest> 是個(gè)雙端隊(duì)列;
  • 添加新的request, 新的reqeust總是通過(guò)addFirst放到隊(duì)首
public void add(ClientRequest request) {
        Deque<ClientRequest> reqs = this.requests.get(request.request().destination());
        if (reqs == null) {
            reqs = new ArrayDeque<>();
            this.requests.put(request.request().destination(), reqs);
        }
        reqs.addFirst(request);
    }
  • 取出最早發(fā)送的request, 通過(guò)pollLast()取出
public ClientRequest completeNext(String node) {
        return requestQueue(node).pollLast();
    }
  • public boolean canSendMore(String node)決定是否可以通過(guò)NetworkClient來(lái)發(fā)送請(qǐng)求
    對(duì)于通過(guò)NetworkClient來(lái)發(fā)送的request, 如果之前發(fā)送的請(qǐng)求并沒(méi)有通過(guò)底層socket實(shí)際發(fā)送完成, 是不允許發(fā)送新的request的
public boolean canSendMore(String node) {
        Deque<ClientRequest> queue = requests.get(node);
        return queue == null || queue.isEmpty() ||
               (queue.peekFirst().request().completed() && queue.size() < this.maxInFlightRequestsPerConnection);
    }
```

# ClusterConnectionStates
* 所在文件:clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
* 記錄到各個(gè)broker node的連接狀態(tài):
`private final Map<String, NodeConnectionState> nodeState`
* 對(duì)同一node的兩次連接有一定的時(shí)間間隔限制, 即采用延遲連接:
`private final long reconnectBackoffMs`
* 連接狀態(tài)有如下三種:
```
ConnectionState.DISCONNECTED -- 未連接
ConnectionState.DISCONNECTING -- 正在連接
ConnectionState.CONNECTED -- 已連接
```
* `canConnect`: 判斷是否允許連接到node:如果從未連接過(guò)或者連接當(dāng)前是斷開(kāi)的并且距離上次連接的間隔大于`reconnectBackoffMs`藐不, 則允許連接铣减;
```
public boolean canConnect(String id, long now) {
        NodeConnectionState state = nodeState.get(id);
        if (state == null)
            return true;
        else
            return state.state == ConnectionState.DISCONNECTED && now - state.lastConnectAttemptMs >= this.reconnectBackoffMs;
    }
```

# NetworkClien類
* 所在文件: clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
* 非線程安全
* 繼承自 `KafkaClient`
* 使用了 `org.apache.kafka.common.network.Selector`來(lái)處理網(wǎng)絡(luò)IO, [詳情點(diǎn)這里 => Kafka源碼分析-網(wǎng)絡(luò)層](http://www.reibang.com/p/8cbc7618abcb)
* 簡(jiǎn)單講這個(gè)類用來(lái)管理一個(gè)到broker node的連接曼验,請(qǐng)求發(fā)送和響應(yīng)接收:
>A network client for asynchronous request/response network i/o. This is an internal class used to implement the user-facing producer and consumer clients.
* 核心函數(shù) `poll`
使用`selector.poll`來(lái)處理實(shí)現(xiàn)的socket讀寫事件;
```
        long metadataTimeout = metadataUpdater.maybeUpdate(now);
        try {
            this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
        } catch (IOException e) {
            log.error("Unexpected error during I/O", e);
        }
```
經(jīng)過(guò)`selector.poll`的調(diào)用,所有**發(fā)送完成的requet**, **接收完成的response**, **所有斷開(kāi)的連接**椿浓, **所有新建成功的連接**都將放到`selector`中相應(yīng)的隊(duì)列里充择;
* 處理發(fā)送完成的request
```
private void handleCompletedSends(List<ClientResponse> responses, long now) {
        // if no response is expected then when the send is completed, return it
        for (Send send : this.selector.completedSends()) {
            ClientRequest request = this.inFlightRequests.lastSent(send.destination());
            if (!request.expectResponse()) {
                this.inFlightRequests.completeLastSent(send.destination());
                responses.add(new ClientResponse(request, now, false, null));
            }
        }
    }
```
對(duì)于不需要回應(yīng)response的請(qǐng)求,將從`ifFlightRequests`中刪除;
* 處理接收到的response
```
private void handleCompletedReceives(List<ClientResponse> responses, long now) {
        for (NetworkReceive receive : this.selector.completedReceives()) {
            String source = receive.source();
            ClientRequest req = inFlightRequests.completeNext(source);
            ResponseHeader header = ResponseHeader.parse(receive.payload());
            // Always expect the response version id to be the same as the request version id
            short apiKey = req.request().header().apiKey();
            short apiVer = req.request().header().apiVersion();
            Struct body = ProtoUtils.responseSchema(apiKey, apiVer).read(receive.payload());
            correlate(req.request().header(), header);
            if (!metadataUpdater.maybeHandleCompletedReceive(req, now, body))
                responses.add(new ClientResponse(req, now, false, body));
        }
    }
```
如果是`metadata`的更新response,則調(diào)用`metadataUpdater.maybeHandleCompletedReceive` 處理metadata的更新;
* 處理新建的連接
```
 private void handleConnections() {
        for (String node : this.selector.connected()) {
            log.debug("Completed connection to node {}", node);
            this.connectionStates.connected(node);
        }
    }
```
* 處理所有的`handle***`函數(shù)返回的responses
```
        List<ClientResponse> responses = new ArrayList<>();
        handleCompletedSends(responses, updatedNow);
        handleCompletedReceives(responses, updatedNow);
        handleDisconnections(responses, updatedNow);
        handleConnections();
        handleTimedOutRequests(responses, updatedNow);

        // invoke callbacks
        for (ClientResponse response : responses) {
            if (response.request().hasCallback()) {
                try {
                    response.request().callback().onComplete(response);
                } catch (Exception e) {
                    log.error("Uncaught error in request completion:", e);
                }
            }
        }
```

# NetworkClientBlockingOps
* 所在文件: core/src/main/scala/kafka/utils/NetworkClientBlockingOps.scala
* 利用非阻塞的`NetworkClient`的方法, 實(shí)現(xiàn)了阻塞的方法;
* 阻塞直到`Client.ready`
```
def blockingReady(node: Node, timeout: Long)(implicit time: JTime): Boolean = {
    client.ready(node, time.milliseconds()) || pollUntil(timeout) { (_, now) =>
      if (client.isReady(node, now))
        true
      else if (client.connectionFailed(node))
        throw new IOException(s"Connection to $node failed")
      else false
    }
  }
```
* 阻塞發(fā)送request直到收到response
```
def blockingSendAndReceive(request: ClientRequest, timeout: Long)(implicit time: JTime): Option[ClientResponse] = {
    client.send(request, time.milliseconds())

    pollUntilFound(timeout) { case (responses, _) =>
      val response = responses.find { response =>
        response.request.request.header.correlationId == request.request.header.correlationId
      }
      response.foreach { r =>
        if (r.wasDisconnected) {
          val destination = request.request.destination
          throw new IOException(s"Connection to $destination was disconnected before the response was read")
        }
      }
      response
    }
  }
```

##### [Kafka源碼分析-匯總](http://www.reibang.com/p/aa274f8fe00f)
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末展哭,一起剝皮案震驚了整個(gè)濱河市湃窍,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌匪傍,老刑警劉巖您市,帶你破解...
    沈念sama閱讀 217,406評(píng)論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異役衡,居然都是意外死亡茵休,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,732評(píng)論 3 393
  • 文/潘曉璐 我一進(jìn)店門手蝎,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)榕莺,“玉大人,你說(shuō)我怎么就攤上這事柑船∶背牛” “怎么了?”我有些...
    開(kāi)封第一講書(shū)人閱讀 163,711評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵鞍时,是天一觀的道長(zhǎng)亏拉。 經(jīng)常有香客問(wèn)我,道長(zhǎng)逆巍,這世上最難降的妖魔是什么及塘? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,380評(píng)論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮锐极,結(jié)果婚禮上笙僚,老公的妹妹穿的比我還像新娘。我一直安慰自己灵再,他們只是感情好肋层,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,432評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布亿笤。 她就那樣靜靜地躺著,像睡著了一般栋猖。 火紅的嫁衣襯著肌膚如雪净薛。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書(shū)人閱讀 51,301評(píng)論 1 301
  • 那天蒲拉,我揣著相機(jī)與錄音肃拜,去河邊找鬼。 笑死雌团,一個(gè)胖子當(dāng)著我的面吹牛燃领,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播锦援,決...
    沈念sama閱讀 40,145評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼猛蔽,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了雨涛?” 一聲冷哼從身側(cè)響起枢舶,我...
    開(kāi)封第一講書(shū)人閱讀 39,008評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎替久,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體躏尉,經(jīng)...
    沈念sama閱讀 45,443評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡蚯根,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,649評(píng)論 3 334
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了胀糜。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片颅拦。...
    茶點(diǎn)故事閱讀 39,795評(píng)論 1 347
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖教藻,靈堂內(nèi)的尸體忽然破棺而出距帅,到底是詐尸還是另有隱情,我是刑警寧澤括堤,帶...
    沈念sama閱讀 35,501評(píng)論 5 345
  • 正文 年R本政府宣布碌秸,位于F島的核電站,受9級(jí)特大地震影響悄窃,放射性物質(zhì)發(fā)生泄漏讥电。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,119評(píng)論 3 328
  • 文/蒙蒙 一轧抗、第九天 我趴在偏房一處隱蔽的房頂上張望恩敌。 院中可真熱鬧,春花似錦横媚、人聲如沸纠炮。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,731評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)恢口。三九已至狮斗,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間弧蝇,已是汗流浹背碳褒。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 32,865評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留看疗,地道東北人沙峻。 一個(gè)月前我還...
    沈念sama閱讀 47,899評(píng)論 2 370
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像两芳,于是被迫代替她去往敵國(guó)和親摔寨。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,724評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容