網(wǎng)絡(luò)請(qǐng)求框架學(xué)習(xí) okhttp ----連接池

上一篇攔截器分析中,在ConnectInterceptor的intercept方法中鸥跟,有這樣一句代碼來(lái)獲得stream丢郊。

HttpCodec httpCodec = streamAllocation.newStream(client, doExtensiveHealthChecks);

點(diǎn)streamAllocation進(jìn)去之后看到newStream方法中調(diào)用了findHealthyConnection方法,實(shí)現(xiàn)如下:

private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
      int writeTimeout, boolean connectionRetryEnabled, boolean doExtensiveHealthChecks)
      throws IOException {
    while (true) {
      // 這一句代碼是實(shí)際獲得連接的
      RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
          connectionRetryEnabled);

      // If this is a brand new connection, we can skip the extensive health checks.
      synchronized (connectionPool) {
        if (candidate.successCount == 0) {
          return candidate;
        }
      }

      // Do a (potentially slow) check to confirm that the pooled connection is still good. If it
      // isn't, take it out of the pool and start again.
      if (!candidate.isHealthy(doExtensiveHealthChecks)) {
        noNewStreams();
        continue;
      }

      return candidate;
    }
  }

如果是新的連接就跳過(guò)健康檢查医咨,如果不是就查一下是否已經(jīng)斷開(kāi)啦枫匾,輸入輸出是否關(guān)閉啦。再進(jìn)去看findConnection方法

private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
      boolean connectionRetryEnabled) throws IOException {
    Route selectedRoute;
    // 在pool中取連接的話拟淮,就需要拿這個(gè)pool做同步鎖干茉,如果是第一次發(fā)起請(qǐng)求應(yīng)該是拿不到的,會(huì)走到下面
    synchronized (connectionPool) {
      if (released) throw new IllegalStateException("released");
      if (codec != null) throw new IllegalStateException("codec != null");
      if (canceled) throw new IOException("Canceled");

      // Attempt to use an already-allocated connection.
      RealConnection allocatedConnection = this.connection;
      // 如果allocatedConnection 不為空并且連接池還沒(méi)滿很泊,就直接使用這個(gè)連接
      if (allocatedConnection != null && !allocatedConnection.noNewStreams) {
        return allocatedConnection;
      }

      // Attempt to get a connection from the pool.
      // 調(diào)用了okhttpclient的get方法角虫,從connectionPool中根據(jù)address拿到連接。
      Internal.instance.get(connectionPool, address, this, null);
      if (connection != null) {
        return connection;
      }
      // 第一次進(jìn)來(lái)就是空
      selectedRoute = route;
    }

    // If we need a route, make one. This is a blocking operation.
    if (selectedRoute == null) {
      // 這里面先是查找內(nèi)存緩存委造,根據(jù)proxies的類(lèi)型在routeSelector的集合inetSocketAddresses中查找戳鹅,沒(méi)有的話就重設(shè)一個(gè)
      // 調(diào)用address.dns().lookup(socketHost)方法,通過(guò)DNS服務(wù)器查詢(xún)返回一組ip地址(一個(gè)域名可能對(duì)應(yīng)多個(gè)ip地址昏兆,可用于自動(dòng)重連)
      // 最后將得到的address 加入集合inetSocketAddresses中緩存起來(lái)粉楚。
      selectedRoute = routeSelector.next();
    }

    RealConnection result;
    synchronized (connectionPool) {
      if (canceled) throw new IOException("Canceled");

      // Now that we have an IP address, make another attempt at getting a connection from the pool.
      // This could match due to connection coalescing.
      // 再進(jìn)行一次嘗試,從連接池中拿連接
      Internal.instance.get(connectionPool, address, this, selectedRoute);
      if (connection != null) {
        route = selectedRoute;
        return connection;
      }

      // Create a connection and assign it to this allocation immediately. This makes it possible
      // for an asynchronous cancel() to interrupt the handshake we're about to do.
      route = selectedRoute;
      refusedStreamCount = 0;
      // 新搞一個(gè)連接
      result = new RealConnection(connectionPool, selectedRoute);
      // 將connection的引用交給streamAllocation亮垫,將streamAllocation的弱引用加入到connection的allocations集合中
      acquire(result);
    }

    // Do TCP + TLS handshakes. This is a blocking operation.
    // 這里調(diào)用RealConnection的connect方法模软,傳入ip 端口號(hào)進(jìn)行connect,RealConnection內(nèi)部的source和sink就是在這個(gè)方法中賦值的饮潦。
    result.connect(connectTimeout, readTimeout, writeTimeout, connectionRetryEnabled);
    // connectionPool中維護(hù)了一個(gè)鍵值對(duì)燃异,里面存了所有連接失敗的route,每個(gè)連接失敗的route都加入進(jìn)去继蜡。
    // 而這句話是把連接成功的從route黑名單中去除掉回俐。  
    routeDatabase().connected(result.route());

    Socket socket = null;
    synchronized (connectionPool) {
      // Pool the connection.
      // 用okhttpclient的代碼把當(dāng)前的連接放入連接池中逛腿,這種麻煩的寫(xiě)法估計(jì)是跟設(shè)計(jì)模式有關(guān)系
      // 注意放進(jìn)去的同時(shí)會(huì)觸發(fā)清理
      Internal.instance.put(connectionPool, result);

      // If another multiplexed connection to the same address was created concurrently, then
      // release this connection and acquire that one.
      // 處理多線程產(chǎn)生的問(wèn)題,如果產(chǎn)生了多個(gè)connection就release掉當(dāng)前的仅颇,用另一個(gè)線程創(chuàng)建的connection
      // 并且關(guān)閉掉多余的socket
      if (result.isMultiplexed()) {
        socket = Internal.instance.deduplicate(connectionPool, address, this);
        result = connection;
      }
    }
    closeQuietly(socket);

    return result;
  }

代碼很長(zhǎng)单默,我看到的是:
RouteSelector做準(zhǔn)備;
ConnectionPool管理連接忘瓦;
RealConnection做具體執(zhí)行搁廓;
以StreamAllocation為中心協(xié)調(diào)各個(gè)類(lèi);
將RealConnection生成的Http1Codec和Http2Codec這種面向協(xié)議(設(shè)置請(qǐng)求頭讀取回復(fù))進(jìn)行sink和read的類(lèi) 傳遞到攔截器中耕皮。

代碼執(zhí)行步驟大致分三部分:

  1. 獲得route境蜕、ip、port那些鬼凌停,由RouteSelector這個(gè)類(lèi)完成粱年,拿到address之后再?lài)L試連接池中拿connection。
  2. 實(shí)在從連接池中拿不到了罚拟,就新建connection台诗,用raw socket進(jìn)行三握手那些鬼,由RealConnection這個(gè)類(lèi)完成赐俗,拿到source和sink拉队;
    讓connection和streamAllocation相互引用(一個(gè)強(qiáng)引用一個(gè)弱引用),連接池里面有一個(gè)ArrayDeque來(lái)記錄所有的socket連接秃励。將新的connection放入連接池氏仗,觸發(fā)清理;
    將route從黑名單移除夺鲜。
  3. 檢查是否有多線程導(dǎo)致的問(wèn)題皆尔,如果有,就釋放當(dāng)前連接币励,用別的線程創(chuàng)建的連接慷蠕。

上面說(shuō)將連接加入連接池時(shí)會(huì)觸發(fā)清理操作,下面貼上代碼詳細(xì)說(shuō)明是如何清理的食呻。

在connectionPool中流炕,有個(gè)cleanup方法來(lái)執(zhí)行清理操作

 long cleanup(long now) {
    int inUseConnectionCount = 0;
    int idleConnectionCount = 0;
    RealConnection longestIdleConnection = null;
    long longestIdleDurationNs = Long.MIN_VALUE;

    // Find either a connection to evict, or the time that the next eviction is due.
    synchronized (this) {
      for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
        RealConnection connection = i.next();

        // If the connection is in use, keep searching.
        // 這里具體執(zhí)行streamAllocation的清理,具體代碼在下面
        if (pruneAndGetAllocationCount(connection, now) > 0) {
          inUseConnectionCount++;
          continue;
        }

        idleConnectionCount++;

        // If the connection is ready to be evicted, we're done.
        // 如果說(shuō):閑置的時(shí)間超過(guò)了設(shè)定值仅胞,或者最大限制連接數(shù)超過(guò)設(shè)定值每辟,就把connection從連接池中移除,并關(guān)掉connection干旧。
        long idleDurationNs = now - connection.idleAtNanos;
        if (idleDurationNs > longestIdleDurationNs) {
          longestIdleDurationNs = idleDurationNs;
          longestIdleConnection = connection;
        }
      }

      if (longestIdleDurationNs >= this.keepAliveDurationNs
          || idleConnectionCount > this.maxIdleConnections) {
        // We've found a connection to evict. Remove it from the list, then close it below (outside
        // of the synchronized block).
        connections.remove(longestIdleConnection);
      } else if (idleConnectionCount > 0) {
        // A connection will be ready to evict soon.
        return keepAliveDurationNs - longestIdleDurationNs;
      } else if (inUseConnectionCount > 0) {
        // All connections are in use. It'll be at least the keep alive duration 'til we run again.
        return keepAliveDurationNs;
      } else {
        // No connections, idle or in use.
        cleanupRunning = false;
        return -1;
      }
    }

    closeQuietly(longestIdleConnection.socket());

    // Cleanup again immediately.
    // 根據(jù)閑置時(shí)間和閑置連接數(shù)渠欺,還有可能立刻執(zhí)行下一次清理
    return 0;
  }
private int pruneAndGetAllocationCount(RealConnection connection, long now) {
    List<Reference<StreamAllocation>> references = connection.allocations;
    for (int i = 0; i < references.size(); ) {
      Reference<StreamAllocation> reference = references.get(i);

      if (reference.get() != null) {
        i++;
        continue;
      }

      // We've discovered a leaked allocation. This is an application bug.
      //遍歷每一個(gè)connection的streamAllocation弱引用集合,發(fā)現(xiàn)弱引用已被回收椎眯,就將其在弱引用集合中移除
      StreamAllocation.StreamAllocationReference streamAllocRef =
          (StreamAllocation.StreamAllocationReference) reference;
      String message = "A connection to " + connection.route().address().url()
          + " was leaked. Did you forget to close a response body?";
      // 打印警告挠将,告知程序員胳岂,你的使用有問(wèn)題
      Platform.get().logCloseableLeak(message, streamAllocRef.callStackTrace);

      references.remove(i);
      connection.noNewStreams = true;

      // If this was the last allocation, the connection is eligible for immediate eviction.
      if (references.isEmpty()) {
        // 如果所有的弱引用都被移除掉了,說(shuō)明這個(gè)connection是閑置的舔稀,記錄閑置的時(shí)間乳丰。將閑置最久的connection記錄下來(lái)。
        connection.idleAtNanos = now - keepAliveDurationNs;
        return 0;
      }
    }

    return references.size();
  }

總結(jié)一下内贮,清理策略就是:

  1. connection自身記錄streamAllocation的連接數(shù)产园,達(dá)到0的時(shí)候就標(biāo)記自己為閑置連接,記錄閑置時(shí)間等待清理
  2. 滿足閑置時(shí)間太長(zhǎng)或者閑置連接太多時(shí)贺归,ConnectionPool就執(zhí)行清理操作關(guān)掉連接(默認(rèn)空閑的socket最大連接數(shù)為5個(gè)淆两,socket的keepAlive時(shí)間為5秒)断箫。
  3. 正常情況下ConnectionPool會(huì)每隔一段時(shí)間就嘗試清理一次拂酣。看連接使用情況仲义,忙的話就一直嘗試清理婶熬,閑的時(shí)候加入任務(wù)也會(huì)觸發(fā)清理。

okhttp對(duì)socket的直接管理還是通過(guò)ConnectionPool來(lái)實(shí)現(xiàn)的埃撵。

回顧一下前面的攔截器的知識(shí)赵颅,結(jié)合一下:

  1. 在RealInterceptorChain中有一個(gè)streamAllocation成員變量
  2. 在RetryAndFollowUpInterceptor中初始化streamAllocation傳到RealInterceptorChain中,此時(shí)還是沒(méi)有任何連接和這個(gè)streamAllocation綁定的
  3. 到了ConnectInterceptor中暂刘,調(diào)用streamAllocation的newStream方法饺谬,內(nèi)部調(diào)用findConnection方法,獲得連接
  4. 連接的獲得是先嘗試從連接池中取谣拣,取不到就初始化一個(gè)連接募寨,將streamAllocation弱引用給connection(此時(shí)connection可能已經(jīng)有很多streamAllocation在用了),同時(shí)在連接池中嘗試清理森缠。
  5. 拿到連接之后拔鹰,返回給ConnectInterceptor一個(gè)HttpCodec,這是一個(gè)接口的實(shí)現(xiàn)類(lèi)贵涵,根據(jù)http協(xié)議是 1.x 還是2 內(nèi)部有不同的實(shí)現(xiàn)
  6. 回到CallServerInterceptor中列肢,拿HttpCodec來(lái)執(zhí)行寫(xiě)入請(qǐng)求頭、讀取返回信息宾茂、構(gòu)造responseBody等瓷马。
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市跨晴,隨后出現(xiàn)的幾起案子欧聘,更是在濱河造成了極大的恐慌餐茵,老刑警劉巖告私,帶你破解...
    沈念sama閱讀 221,576評(píng)論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡婉徘,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,515評(píng)論 3 399
  • 文/潘曉璐 我一進(jìn)店門(mén)委可,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)映皆,“玉大人,你說(shuō)我怎么就攤上這事凉敲∫露埽” “怎么了?”我有些...
    開(kāi)封第一講書(shū)人閱讀 168,017評(píng)論 0 360
  • 文/不壞的土叔 我叫張陵爷抓,是天一觀的道長(zhǎng)势决。 經(jīng)常有香客問(wèn)我,道長(zhǎng)蓝撇,這世上最難降的妖魔是什么果复? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 59,626評(píng)論 1 296
  • 正文 為了忘掉前任,我火速辦了婚禮渤昌,結(jié)果婚禮上虽抄,老公的妹妹穿的比我還像新娘。我一直安慰自己独柑,他們只是感情好迈窟,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,625評(píng)論 6 397
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著忌栅,像睡著了一般车酣。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上索绪,一...
    開(kāi)封第一講書(shū)人閱讀 52,255評(píng)論 1 308
  • 那天湖员,我揣著相機(jī)與錄音,去河邊找鬼者春。 笑死破衔,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的钱烟。 我是一名探鬼主播晰筛,決...
    沈念sama閱讀 40,825評(píng)論 3 421
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼拴袭!你這毒婦竟也來(lái)了读第?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書(shū)人閱讀 39,729評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤拥刻,失蹤者是張志新(化名)和其女友劉穎怜瞒,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,271評(píng)論 1 320
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡吴汪,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,363評(píng)論 3 340
  • 正文 我和宋清朗相戀三年惠窄,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片漾橙。...
    茶點(diǎn)故事閱讀 40,498評(píng)論 1 352
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡杆融,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出霜运,到底是詐尸還是另有隱情脾歇,我是刑警寧澤,帶...
    沈念sama閱讀 36,183評(píng)論 5 350
  • 正文 年R本政府宣布淘捡,位于F島的核電站藕各,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏焦除。R本人自食惡果不足惜激况,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,867評(píng)論 3 333
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望踢京。 院中可真熱鬧誉碴,春花似錦宦棺、人聲如沸瓣距。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 32,338評(píng)論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)蹈丸。三九已至,卻和暖如春呐芥,著一層夾襖步出監(jiān)牢的瞬間逻杖,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,458評(píng)論 1 272
  • 我被黑心中介騙來(lái)泰國(guó)打工思瘟, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留荸百,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,906評(píng)論 3 376
  • 正文 我出身青樓滨攻,卻偏偏與公主長(zhǎng)得像够话,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子光绕,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,507評(píng)論 2 359

推薦閱讀更多精彩內(nèi)容