OKHttp源碼解析(九):OKHTTP連接中三個"核心"RealConnection谴垫、ConnectionPool蜓耻、StreamAllocation

本片文章終于講到OKHTTP中的核心了,復用連接池官还,本片文章的順序是

  • 1芹橡、RealConnection類
  • 2、ConnectionPool類
  • 3望伦、StreamAllocation類

一林说、RealConnection

RealConnection是Connection的實現(xiàn)類,代表著鏈接socket的鏈路屯伞,如果擁有了一個RealConnection就代表了我們已經(jīng)跟服務器有了一條通信鏈路述么,而且通過
RealConnection代表是連接socket鏈路,RealConnection對象意味著我們已經(jīng)跟服務端有了一條通信鏈路了愕掏。很多朋友這時候會想到度秘,有通信鏈路了,是不是與意味著在這個類實現(xiàn)的三次握手饵撑,你們猜對了剑梳,的確是在這個類里面實現(xiàn)的三次握手。在講握手的之前滑潘,看下它的屬性和構造函數(shù)垢乙,對他有個大概的了解。

  private final ConnectionPool connectionPool;
  private final Route route;

  // The fields below are initialized by connect() and never reassigned.
  //下面這些字段语卤,通過connect()方法開始初始化追逮,并且絕對不會再次賦值
  /** The low-level TCP socket. */
  private Socket rawSocket; //底層socket
  /**
   * The application layer socket. Either an {@link SSLSocket} layered over {@link #rawSocket}, or
   * {@link #rawSocket} itself if this connection does not use SSL.
   */
  private Socket socket;  //應用層socket
  //握手
  private Handshake handshake;
   //協(xié)議
  private Protocol protocol;
   // http2的鏈接
  private Http2Connection http2Connection;
  //通過source和sink酪刀,大家可以猜到是與服務器交互的輸入輸出流
  private BufferedSource source;
  private BufferedSink sink;

  // The fields below track connection state and are guarded by connectionPool.
  //下面這個字段是 屬于表示鏈接狀態(tài)的字段,并且有connectPool統(tǒng)一管理
  /** If true, no new streams can be created on this connection. Once true this is always true. */
  //如果noNewStreams被設為true钮孵,則noNewStreams一直為true骂倘,不會被改變,并且表示這個鏈接不會再創(chuàng)新的stream流
  public boolean noNewStreams;
  
  //成功的次數(shù)
  public int successCount;

  /**
   * The maximum number of concurrent streams that can be carried by this connection. If {@code
   * allocations.size() < allocationLimit} then new streams can be created on this connection.
   */
  //此鏈接可以承載最大并發(fā)流的限制巴席,如果不超過限制历涝,可以隨意增加
  public int allocationLimit = 1;

通過上面代碼,我們可以得出以下結論:

1漾唉、里面除了route 字段荧库,部分的字段都是在connect()方法里面賦值的,并且不會再次賦值
2赵刑、這里含有source和sink分衫,所以可以以流的形式對服務器進行交互
3、noNewStream可以簡單理解為它表示該連接不可用般此。這個值一旦被設為true,則這個conncetion則不會再創(chuàng)建stream丐箩。
4、allocationLimit是分配流的數(shù)量上限恤煞,一個connection最大只能支持一個1并發(fā)
5屎勘、allocations是關聯(lián)StreamAllocation,它用來統(tǒng)計在一個連接上建立了哪些流,通過StreamAllocation的acquire方法和release方法可以將一個allcation對方添加到鏈表或者移除鏈表居扒,

其實大家估計已經(jīng)猜到了connect()里面進行了三次握手概漱,大家也猜對了,那咱們就簡單的介紹下connect()方法:

public void connect( int connectTimeout, int readTimeout, int writeTimeout, boolean connectionRetryEnabled) {
    if (protocol != null) throw new IllegalStateException("already connected");
     // 線路的選擇
    RouteException routeException = null;
    List<ConnectionSpec> connectionSpecs = route.address().connectionSpecs();
    ConnectionSpecSelector connectionSpecSelector = new ConnectionSpecSelector(connectionSpecs);

    if (route.address().sslSocketFactory() == null) {
      if (!connectionSpecs.contains(ConnectionSpec.CLEARTEXT)) {
        throw new RouteException(new UnknownServiceException(
            "CLEARTEXT communication not enabled for client"));
      }
      String host = route.address().url().host();
      if (!Platform.get().isCleartextTrafficPermitted(host)) {
        throw new RouteException(new UnknownServiceException(
            "CLEARTEXT communication to " + host + " not permitted by network security policy"));
      }
    }
    // 連接開始
    while (true) {
      try {
        // 如果要求隧道模式喜喂,建立通道連接瓤摧,通常不是這種
        if (route.requiresTunnel()) {
          connectTunnel(connectTimeout, readTimeout, writeTimeout);
        } else {
           // 一般都走這條邏輯了,實際上很簡單就是socket的連接
          connectSocket(connectTimeout, readTimeout);
        }
        // https的建立
        establishProtocol(connectionSpecSelector);
        break;
      } catch (IOException e) {
        closeQuietly(socket);
        closeQuietly(rawSocket);
        socket = null;
        rawSocket = null;
        source = null;
        sink = null;
        handshake = null;
        protocol = null;
        http2Connection = null;

        if (routeException == null) {
          routeException = new RouteException(e);
        } else {
          routeException.addConnectException(e);
        }

        if (!connectionRetryEnabled || !connectionSpecSelector.connectionFailed(e)) {
          throw routeException;
        }
      }
    }

    if (http2Connection != null) {
      synchronized (connectionPool) {
        allocationLimit = http2Connection.maxConcurrentStreams();
      }
    }
  }

這里的執(zhí)行過程大體如下玉吁;

  • 1照弥、檢查連接是否已經(jīng)建立,若已經(jīng)建立进副,則拋出異常这揣,否則繼續(xù),連接的是否簡歷由protocol標示影斑,它表示在整個連接建立给赞,及可能的協(xié)商過程中選擇所有要用到的協(xié)議。
  • 2矫户、用 集合connnectionspecs構造ConnectionSpecSelector片迅。
  • 3、如果請求是不安全的請求皆辽,會對請求執(zhí)行一些額外的限制:
    3.1柑蛇、ConnectionSpec集合必須包含ConnectionSpec.CLEARTEXT芥挣。也就是說OkHttp用戶可以通過OkHttpClient設置不包含ConnectionSpec.CLEARTEXT的ConnectionSpec集合來禁用所有的明文要求。
    3.2耻台、平臺本身的安全策略允向相應的主機發(fā)送明文請求空免。對于Android平臺而言,這種安全策略主要由系統(tǒng)的組件android.security.NetworkSecurityPolicy執(zhí)行粘我。平臺的這種安全策略不是每個Android版本都有的。Android6.0之后存在這種控制痹换。
    (okhttp/okhttp/src/main/java/okhttp3/internal/platform/AndroidPlatform.java 里面的isCleartextTrafficPermitted()方法)
  • 4征字、根據(jù)請求判斷是否需要建立隧道連接,如果建立隧道連接則調(diào)用
    connectTunnel(connectTimeout, readTimeout, writeTimeout);
  • 5娇豫、如果不是隧道連接則調(diào)用connectSocket(connectTimeout, readTimeout);建立普通連接匙姜。
  • 6、通過調(diào)用establishProtocol建立協(xié)議
  • 7冯痢、如果是HTTP/2氮昧,則設置相關屬性。

整個流程已經(jīng)梳理完浦楣,咱們就摳一下具體的細節(jié)袖肥,首先來看下建立普通連接,因為隧道連接也會用到普通連接的代碼:
看下connectSocket()方法

/** Does all the work necessary to build a full HTTP or HTTPS connection on a raw socket. */
  private void connectSocket(int connectTimeout, int readTimeout) throws IOException {
    Proxy proxy = route.proxy();
    Address address = route.address();
     // 根據(jù)代理類型來選擇socket類型振劳,是代理還是直連
    rawSocket = proxy.type() == Proxy.Type.DIRECT || proxy.type() == Proxy.Type.HTTP
        ? address.socketFactory().createSocket()
        : new Socket(proxy);

    rawSocket.setSoTimeout(readTimeout);
    try {
      // 連接socket椎组,之所以這樣寫是因為支持不同的平臺
      //里面實際上是  socket.connect(address, connectTimeout);
      Platform.get().connectSocket(rawSocket, route.socketAddress(), connectTimeout);
    } catch (ConnectException e) {
      ConnectException ce = new ConnectException("Failed to connect to " + route.socketAddress());
      ce.initCause(e);
      throw ce;
    }
     // 得到輸入/輸出流
    source = Okio.buffer(Okio.source(rawSocket));
    sink = Okio.buffer(Okio.sink(rawSocket));
  }

有3種情況需要建立普通連接:

  • 無代理
  • 明文的HTTP代理
  • SOCKS代理

普通連接的建立過程為建立TCP連接,建立TCP連接的過程為:

  • 1历恐、創(chuàng)建Socket寸癌,非SOCKS代理的情況下,通過SocketFactory創(chuàng)建弱贼;在SOCKS代理則傳入proxy手動new一個出來蒸苇。
  • 2、為Socket設置超時
  • 3吮旅、完成特定于平臺的連接建立
  • 4溪烤、創(chuàng)建用于I/O的source和sink

下面我來看下connectSocket()的具體實現(xiàn),connectSocket()具體實現(xiàn)是AndroidPlatform.java里面的connectSocket()庇勃。
關于AndroidPlatform.java請看上一篇文章氛什。

設置了SOCKS代理的情況下,僅有的特別之處在于匪凉,是通過傳入proxy手動創(chuàng)建Socket枪眉。route的socketAddress包含目標HTTP服務器的域名。由此可見SOCKS協(xié)議的處理再层,主要是在Java標準庫的java.net.Socket中處理贸铜,對于外界而言堡纬,就好像是HTTP服務器直接建立連接一樣,因此連接時傳入的地址都是HTTP服務器的域名蒿秦。

而對于明文的HTTP代理的情況下烤镐,這里滅有任何特殊處理。route的socketAddress包含著代理服務器的IP地址棍鳖。HTTP代理自身會根據(jù)請求及相應的實際內(nèi)容炮叶,建立與HTTP服務器的TCP連接,并轉發(fā)數(shù)據(jù)渡处。

這時候我們再來看下建立隧道邏輯:

  /**
   * Does all the work to build an HTTPS connection over a proxy tunnel. The catch here is that a
   * proxy server can issue an auth challenge and then close the connection.
   */
  private void connectTunnel(int connectTimeout, int readTimeout, int writeTimeout)
      throws IOException {
    Request tunnelRequest = createTunnelRequest();
    HttpUrl url = tunnelRequest.url();
    int attemptedConnections = 0;
    int maxAttempts = 21;
    while (true) {
      if (++attemptedConnections > maxAttempts) {
        throw new ProtocolException("Too many tunnel connections attempted: " + maxAttempts);
      }

      connectSocket(connectTimeout, readTimeout);
      tunnelRequest = createTunnel(readTimeout, writeTimeout, tunnelRequest, url);

      if (tunnelRequest == null) break; // Tunnel successfully created.

      // The proxy decided to close the connection after an auth challenge. We need to create a new
      // connection, but this time with the auth credentials.
      closeQuietly(rawSocket);
      rawSocket = null;
      sink = null;
      source = null;
    }
  }

建立隧道連接的過程又分為幾個步驟:

  • 創(chuàng)建隧道請求
  • 建立Socket連接
  • 發(fā)送請求建立隧道

隧道請求是一個常規(guī)的HTTP請求镜悉,只是請求的內(nèi)容有點特殊医瘫。最初創(chuàng)建的隧道請求如:

  /**
   * Returns a request that creates a TLS tunnel via an HTTP proxy. Everything in the tunnel request
   * is sent unencrypted to the proxy server, so tunnels include only the minimum set of headers.
   * This avoids sending potentially sensitive data like HTTP cookies to the proxy unencrypted.
   */
  private Request createTunnelRequest() {
    return new Request.Builder()
        .url(route.address().url())
        .header("Host", Util.hostHeader(route.address().url(), true))
        .header("Proxy-Connection", "Keep-Alive") // For HTTP/1.0 proxies like Squid.
        .header("User-Agent", Version.userAgent())
        .build();
  }

一個隧道請求的例子如下:

隧道.png

請求的"Host" header中包含了目標HTTP服務器的域名莺禁。建立socket連接的過程這里就不細說了

創(chuàng)建隧道的過程是這樣子的:

/**
   * To make an HTTPS connection over an HTTP proxy, send an unencrypted CONNECT request to create
   * the proxy connection. This may need to be retried if the proxy requires authorization.
   */
  private Request createTunnel(int readTimeout, int writeTimeout, Request tunnelRequest,
      HttpUrl url) throws IOException {
    // Make an SSL Tunnel on the first message pair of each SSL + proxy connection.
    String requestLine = "CONNECT " + Util.hostHeader(url, true) + " HTTP/1.1";
    while (true) {
      Http1Codec tunnelConnection = new Http1Codec(null, null, source, sink);
      source.timeout().timeout(readTimeout, MILLISECONDS);
      sink.timeout().timeout(writeTimeout, MILLISECONDS);
      tunnelConnection.writeRequest(tunnelRequest.headers(), requestLine);
      tunnelConnection.finishRequest();
      Response response = tunnelConnection.readResponseHeaders(false)
          .request(tunnelRequest)
          .build();
      // The response body from a CONNECT should be empty, but if it is not then we should consume
      // it before proceeding.
      long contentLength = HttpHeaders.contentLength(response);
      if (contentLength == -1L) {
        contentLength = 0L;
      }
      Source body = tunnelConnection.newFixedLengthSource(contentLength);
      Util.skipAll(body, Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
      body.close();

      switch (response.code()) {
        case HTTP_OK:
          // Assume the server won't send a TLS ServerHello until we send a TLS ClientHello. If
          // that happens, then we will have buffered bytes that are needed by the SSLSocket!
          // This check is imperfect: it doesn't tell us whether a handshake will succeed, just
          // that it will almost certainly fail because the proxy has sent unexpected data.
          if (!source.buffer().exhausted() || !sink.buffer().exhausted()) {
            throw new IOException("TLS tunnel buffered too many bytes!");
          }
          return null;

        case HTTP_PROXY_AUTH:
          tunnelRequest = route.address().proxyAuthenticator().authenticate(route, response);
          if (tunnelRequest == null) throw new IOException("Failed to authenticate with proxy");

          if ("close".equalsIgnoreCase(response.header("Connection"))) {
            return tunnelRequest;
          }
          break;

        default:
          throw new IOException(
              "Unexpected response code for CONNECT: " + response.code());
      }
    }
  }

在前面創(chuàng)建的TCP連接值上斩例,完成代理服務器的HTTP請求/響應交互。請求的內(nèi)容類似下面這樣:

"CONNECT m.taobao.com:443 HTTP/1.1"

這里可能會根據(jù)HTTP代理是否需要認證而有多次HTTP請求/響應交互僚纷。
總結一下OkHttp3中代理相關的處理矩距;

  • 1、沒有設置代理的情況下怖竭,直接與HTTP服務器建立TCP連接剩晴,然后進行HTTP請求/響應的交互。
  • 2侵状、設置了SOCKS代理的情況下赞弥,創(chuàng)建Socket時,為其傳入proxy趣兄,連接時還是以HTTP服務器為目標绽左。在標準庫的Socket中完成SOCKS協(xié)議相關的處理。此時基本上感知不到代理的存在艇潭。
  • 3拼窥、設置了HTTP代理時的HTTP請求,與HTTP代理服務器建立TCP連接蹋凝。HTTP代理服務器解析HTTP請求/響應的內(nèi)容鲁纠,并根據(jù)其中的信息來完成數(shù)據(jù)的轉發(fā)。也就是說鳍寂,如果HTTP請求中不包含"Host"header改含,則有可能在設置了HTTP代理的情況下無法與HTTP服務器建立連接。
  • 4迄汛、HTTP代理時的HTTPS/HTTP2請求捍壤,與HTTP服務器建立通過HTTP代理的隧道連接骤视。HTTP代理不再解析傳輸?shù)臄?shù)據(jù),僅僅完成數(shù)據(jù)轉發(fā)的功能鹃觉。此時HTTP代理的功能退化為如同SOCKS代理類似专酗。
  • 5、設置了代理類時盗扇,HTTP的服務器的域名解析會交給代理服務器執(zhí)行祷肯。其中設置了HTTP代理時,會對HTTP代理的域名做域名解析疗隶。

上述流程弄明白后佑笋,來看下建立協(xié)議
不管是建立隧道連接,還是建立普通連接抽减,都少不了建立協(xié)議這一步驟允青,這一步是建立好TCP連接之后橄碾,而在該TCP能被拿來手法數(shù)據(jù)之前執(zhí)行的卵沉。它主要為了數(shù)據(jù)的加密傳輸做一些初始化,比如TCL握手法牲,HTTP/2的協(xié)商史汗。

  private void establishProtocol(ConnectionSpecSelector connectionSpecSelector) throws IOException {
    //如果不是ssl
    if (route.address().sslSocketFactory() == null) {
      protocol = Protocol.HTTP_1_1;
      socket = rawSocket;
      return;
    }
    //如果是sll
    connectTls(connectionSpecSelector);
   //如果是HTTP2
    if (protocol == Protocol.HTTP_2) {
      socket.setSoTimeout(0); // HTTP/2 connection timeouts are set per-stream.
      http2Connection = new Http2Connection.Builder(true)
          .socket(socket, route.address().url().host(), source, sink)
          .listener(this)
          .build();
      http2Connection.start();
    }
  }

上面的代碼大體上可以歸納為兩點

  • 1、對于加密的數(shù)據(jù)傳輸拒垃,創(chuàng)建TLS連接停撞。對于明文傳輸,則設置protocol和socket悼瓮。socket直接指向應用層戈毒,如HTTP或HTTP/2,交互的Socket。
    1.1對于明文傳輸沒有設置HTTP代理的HTTP請求横堡,它是與HTTP服務器之間的TCP socket埋市。
    1.2對于加密傳輸沒有設置HTTP代理服務器的HTTP或HTTP2請求,它是與HTTP服務器之間的SSLSocket命贴。
    1.3對于加密傳輸設置了HTTP代理服務器的HTTP或HTTP2請求道宅,它是與HTTP服務器之間經(jīng)過代理服務器的SSLSocket,一個隧道連接胸蛛;
    1.4對于加密傳輸設置了SOCKS代理的HTTP或HTTP2請求污茵,它是一條經(jīng)過了代理服務器的SSLSocket連接。
  • 2葬项、對于HTTP/2泞当,通過new 一個Http2Connection.Builder會建立HTTP/2連接 Http2Connection,然后執(zhí)行http2Connection.start()和服務器建立協(xié)議民珍。我們先來看下建立TLS連接的connectTls()方法
 private void connectTls(ConnectionSpecSelector connectionSpecSelector) throws IOException {
    Address address = route.address();
    SSLSocketFactory sslSocketFactory = address.sslSocketFactory();
    boolean success = false;
    SSLSocket sslSocket = null;
    try {
      // Create the wrapper over the connected socket.
      //在原來的Socket加一層ssl
      sslSocket = (SSLSocket) sslSocketFactory.createSocket(
          rawSocket, address.url().host(), address.url().port(), true /* autoClose */);

      // Configure the socket's ciphers, TLS versions, and extensions.
      ConnectionSpec connectionSpec = connectionSpecSelector.configureSecureSocket(sslSocket);
      if (connectionSpec.supportsTlsExtensions()) {
        Platform.get().configureTlsExtensions(
            sslSocket, address.url().host(), address.protocols());
      }

      // Force handshake. This can throw!
      sslSocket.startHandshake();
      Handshake unverifiedHandshake = Handshake.get(sslSocket.getSession());

      // Verify that the socket's certificates are acceptable for the target host.
      if (!address.hostnameVerifier().verify(address.url().host(), sslSocket.getSession())) {
        X509Certificate cert = (X509Certificate) unverifiedHandshake.peerCertificates().get(0);
        throw new SSLPeerUnverifiedException("Hostname " + address.url().host() + " not verified:"
            + "\n    certificate: " + CertificatePinner.pin(cert)
            + "\n    DN: " + cert.getSubjectDN().getName()
            + "\n    subjectAltNames: " + OkHostnameVerifier.allSubjectAltNames(cert));
      }

      // Check that the certificate pinner is satisfied by the certificates presented.
      address.certificatePinner().check(address.url().host(),
          unverifiedHandshake.peerCertificates());

      // Success! Save the handshake and the ALPN protocol.
      String maybeProtocol = connectionSpec.supportsTlsExtensions()
          ? Platform.get().getSelectedProtocol(sslSocket)
          : null;
      socket = sslSocket;
      source = Okio.buffer(Okio.source(socket));
      sink = Okio.buffer(Okio.sink(socket));
      handshake = unverifiedHandshake;
      protocol = maybeProtocol != null
          ? Protocol.get(maybeProtocol)
          : Protocol.HTTP_1_1;
      success = true;
    } catch (AssertionError e) {
      if (Util.isAndroidGetsocknameError(e)) throw new IOException(e);
      throw e;
    } finally {
      if (sslSocket != null) {
        Platform.get().afterHandshake(sslSocket);
      }
      if (!success) {
        closeQuietly(sslSocket);
      }
    }
  }

TLS連接是對原始TCP連接的一個封裝零蓉,以及聽過TLS握手笤受,及數(shù)據(jù)手法過程中的加解密等功能。在Java中敌蜂,用SSLSocket來描述箩兽。上面建立的TLS連接的過程大體為:

  • 1、用SSLSocketFactory基于原始的TCP Socket章喉,創(chuàng)建一個SSLSocket汗贫。
  • 2、并配置SSLSocket秸脱。
  • 3落包、在前面選擇的ConnectionSpec支持TLS擴展參數(shù)時,配置TLS擴展參數(shù)摊唇。
  • 4咐蝇、啟動TLS握手
  • 5、TLS握手完成之后巷查,獲取證書信息有序。
  • 6、對TLS握手過程中傳回來的證書進行驗證岛请。
  • 7旭寿、在前面選擇的ConnectionSpec支持TLS擴展參數(shù)時,獲取TLS握手過程中順便完成的協(xié)議協(xié)商過程所選擇的協(xié)議崇败。這個過程主要用于HTTP/2的ALPN擴展盅称。
  • 8、OkHttp主要使用Okio來做IO操作后室,這里會基于前面獲取到SSLSocket創(chuàng)建于執(zhí)行的IO的BufferedSource和BufferedSink等缩膝,并保存握手信息以及所選擇的協(xié)議。

至此連接已經(jīng)建立連接已經(jīng)結束了岸霹。

這里說一下isHealthy(boolean doExtensiveChecks)方法疾层,入?yún)⑹且粋€布爾類,表示是否需要額外的檢查松申。這里主要是檢查云芦,判斷這個連接是否是健康的連接,即是否可以重用贸桶。那我們來看下

/** Returns true if this connection is ready to host new streams. */
  public boolean isHealthy(boolean doExtensiveChecks) {
    if (socket.isClosed() || socket.isInputShutdown() || socket.isOutputShutdown()) {
      return false;
    }

    if (http2Connection != null) {
      return !http2Connection.isShutdown();
    }

    if (doExtensiveChecks) {
      try {
        int readTimeout = socket.getSoTimeout();
        try {
          socket.setSoTimeout(1);
          if (source.exhausted()) {
            return false; // Stream is exhausted; socket is closed.
          }
          return true;
        } finally {
          socket.setSoTimeout(readTimeout);
        }
      } catch (SocketTimeoutException ignored) {
        // Read timed out; socket is good.
      } catch (IOException e) {
        return false; // Couldn't read; socket is closed.
      }
    }
    return true;
  }

看上述代碼可知舅逸,同時滿足如下條件才是健康的連接,否則返回false

  • 1皇筛、socket已經(jīng)關閉
  • 2琉历、輸入流關閉
  • 3、輸出流關閉
  • 4、如果是HTTP/2連接旗笔,則HTTP/2連接也要關閉彪置。

讓我們再來看下isEligible(Address, Route)方法,這個方法主要是判斷面對給出的addres和route蝇恶,這個realConnetion是否可以重用拳魁。

/**
   * Returns true if this connection can carry a stream allocation to {@code address}. If non-null
   * {@code route} is the resolved route for a connection.
   */
  public boolean isEligible(Address address, Route route) {
    // If this connection is not accepting new streams, we're done.
    if (allocations.size() >= allocationLimit || noNewStreams) return false;

    // If the non-host fields of the address don't overlap, we're done.
    if (!Internal.instance.equalsNonHost(this.route.address(), address)) return false;

    // If the host exactly matches, we're done: this connection can carry the address.
    if (address.url().host().equals(this.route().address().url().host())) {
      return true; // This connection is a perfect match.
    }

    // At this point we don't have a hostname match. But we still be able to carry the request if
    // our connection coalescing requirements are met. See also:
    // https://hpbn.co/optimizing-application-delivery/#eliminate-domain-sharding
    // https://daniel.haxx.se/blog/2016/08/18/http2-connection-coalescing/

    // 1. This connection must be HTTP/2.
    if (http2Connection == null) return false;

    // 2. The routes must share an IP address. This requires us to have a DNS address for both
    // hosts, which only happens after route planning. We can't coalesce connections that use a
    // proxy, since proxies don't tell us the origin server's IP address.
    if (route == null) return false;
    if (route.proxy().type() != Proxy.Type.DIRECT) return false;
    if (this.route.proxy().type() != Proxy.Type.DIRECT) return false;
    if (!this.route.socketAddress().equals(route.socketAddress())) return false;

    // 3. This connection's server certificate's must cover the new host.
    if (route.address().hostnameVerifier() != OkHostnameVerifier.INSTANCE) return false;
    if (!supportsUrl(address.url())) return false;

    // 4. Certificate pinning must match the host.
    try {
      address.certificatePinner().check(address.url().host(), handshake().peerCertificates());
    } catch (SSLPeerUnverifiedException e) {
      return false;
    }

    return true; // The caller's address can be carried by this connection.
  }

判斷邏輯如下:

  • 如果連接達到共享上限,則不能重用
  • 非host域必須完全一樣撮弧,如果不一樣不能重用
  • 如果此時host域也相同潘懊,則符合條件,可以被復用
  • 如果host不相同贿衍,在HTTP/2的域名切片場景下一樣可以復用
    關于HTTP/2的可以參考下面的文章
    https://hpbn.co/optimizing-application-delivery/#eliminate-domain-sharding

最后再來看下newCodec(OkHttpClient, StreamAllocation)方法

    if (http2Connection != null) {
      return new Http2Codec(client, streamAllocation, http2Connection);
    } else {
      socket.setSoTimeout(client.readTimeoutMillis());
      source.timeout().timeout(client.readTimeoutMillis(), MILLISECONDS);
      sink.timeout().timeout(client.writeTimeoutMillis(), MILLISECONDS);
      return new Http1Codec(client, streamAllocation, source, sink);
    }

里面主要是判斷是否是HTTP/2,如果是HTTP/2則new一個Http2Codec授舟。如果不是HTTP/2則new一個Http1Codec。

上面提到了connection的跟蹤狀態(tài)由ConncetionPool來管理贸辈。

二释树、ConnectionPool

大家先來看下一個類的注釋

/**
 * Manages reuse of HTTP and HTTP/2 connections for reduced network latency. HTTP requests that
 * share the same {@link Address} may share a {@link Connection}. This class implements the policy
 * of which connections to keep open for future use.
 */

簡單的翻譯下,如下:
管理http和http/2的鏈接擎淤,以便減少網(wǎng)絡請求延遲奢啥。同一個address將共享同一個connection。該類實現(xiàn)了復用連接的目標揉燃。

然后看下這個類的字段:

/**
   * Background threads are used to cleanup expired connections. There will be at most a single
   * thread running per connection pool. The thread pool executor permits the pool itself to be
   * garbage collected.
   */
  //這是一個用于清楚過期鏈接的線程池扫尺,每個線程池最多只能運行一個線程筋栋,并且這個線程池允許被垃圾回收
  private static final Executor executor = new ThreadPoolExecutor(0 /* corePoolSize */,
      Integer.MAX_VALUE /* maximumPoolSize */, 60L /* keepAliveTime */, TimeUnit.SECONDS,
      new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp ConnectionPool", true));

  /** The maximum number of idle connections for each address. */
  //每個address的最大空閑連接數(shù)炊汤。
  private final int maxIdleConnections;
  private final long keepAliveDurationNs;
  //清理任務
  private final Runnable cleanupRunnable = new Runnable() {
    @Override public void run() {
      while (true) {
        long waitNanos = cleanup(System.nanoTime());
        if (waitNanos == -1) return;
        if (waitNanos > 0) {
          long waitMillis = waitNanos / 1000000L;
          waitNanos -= (waitMillis * 1000000L);
          synchronized (ConnectionPool.this) {
            try {
              ConnectionPool.this.wait(waitMillis, (int) waitNanos);
            } catch (InterruptedException ignored) {
            }
          }
        }
      }
    }
  };
  //鏈接的雙向隊列
  private final Deque<RealConnection> connections = new ArrayDeque<>();
  //路由的數(shù)據(jù)庫
  final RouteDatabase routeDatabase = new RouteDatabase();
   //清理任務正在執(zhí)行的標志
  boolean cleanupRunning;

來看下它的屬性,

  • 1弊攘、主要就是connections抢腐,可見ConnectionPool內(nèi)部以隊列方式存儲連接;
  • 2襟交、routDatabase是一個黑名單迈倍,用來記錄不可用的route,但是看代碼貌似ConnectionPool并沒有使用它捣域。所以此處不做分析啼染。
  • 3、剩下的就是和清理有關了焕梅,所以executor是清理任務的線程池迹鹅,cleanupRunning是清理任務的標志,cleanupRunnable是清理任務贞言。

再來看下他的構造函數(shù)

  /**
   * Create a new connection pool with tuning parameters appropriate for a single-user application.
   * The tuning parameters in this pool are subject to change in future OkHttp releases. Currently
   * this pool holds up to 5 idle connections which will be evicted after 5 minutes of inactivity.
   */
 //創(chuàng)建一個適用于單個應用程序的新連接池斜棚。
 //該連接池的參數(shù)將在未來的okhttp中發(fā)生改變
 //目前最多可容乃5個空閑的連接,存活期是5分鐘
  public ConnectionPool() {
    this(5, 5, TimeUnit.MINUTES);
  }

  public ConnectionPool(int maxIdleConnections, long keepAliveDuration, TimeUnit timeUnit) {
    this.maxIdleConnections = maxIdleConnections;
    this.keepAliveDurationNs = timeUnit.toNanos(keepAliveDuration);

    // Put a floor on the keep alive duration, otherwise cleanup will spin loop.
    //保持活著的時間,否則清理將旋轉循環(huán)
    if (keepAliveDuration <= 0) {
      throw new IllegalArgumentException("keepAliveDuration <= 0: " + keepAliveDuration);
    }
  }

通過這個構造器我們知道了這個連接池最多維持5個連接弟蚀,且每個鏈接最多活5分鐘蚤霞。并且包含一個線程池包含一個清理任務。
所以maxIdleConnections和keepAliveDurationNs則是清理中淘汰連接的的指標义钉,這里需要說明的是maxIdleConnections是值每個地址上最大的空閑連接數(shù)昧绣。所以OkHttp只是限制與同一個遠程服務器的空閑連接數(shù)量,對整體的空閑連接并沒有限制捶闸。

PS:

這時候說下ConnectionPool的實例化的過程滞乙,一個OkHttpClient只包含一個ConnectionPool,其實例化也是在OkHttpClient的過程鉴嗤。這里說一下ConnectionPool各個方法的調(diào)用并沒有直接對外暴露斩启,而是通過OkHttpClient的Internal接口統(tǒng)一對外暴露。

然后我們來看下他的get和put方法

  /**
   * Returns a recycled connection to {@code address}, or null if no such connection exists. The
   * route is null if the address has not yet been routed.
   */
  RealConnection get(Address address, StreamAllocation streamAllocation, Route route) {
    //斷言醉锅,判斷線程是不是被自己鎖住了
    assert (Thread.holdsLock(this));
    // 遍歷已有連接集合
    for (RealConnection connection : connections) { 
       //如果connection和需求中的"地址"和"路由"匹配
      if (connection.isEligible(address, route)) {
       //復用這個連接
        streamAllocation.acquire(connection);
        //返回這個連接
        return connection;
      }
    }
    return null;
  }

get() 方法遍歷 connections 中的所有 RealConnection 尋找同時滿足條件的RealConnection兔簇。

void put(RealConnection connection) {
    assert (Thread.holdsLock(this));
    if (!cleanupRunning) {
      cleanupRunning = true;
      executor.execute(cleanupRunnable);
    }
    connections.add(connection);
  }

put方法更為簡單,就是異步觸發(fā)清理任務硬耍,然后將連接添加到隊列中垄琐。那么下面開始重點分析他的清理任務。

  private final long keepAliveDurationNs;
  private final Runnable cleanupRunnable = new Runnable() {
    @Override public void run() {
      while (true) {
        long waitNanos = cleanup(System.nanoTime());
        if (waitNanos == -1) return;
        if (waitNanos > 0) {
          long waitMillis = waitNanos / 1000000L;
          waitNanos -= (waitMillis * 1000000L);
          synchronized (ConnectionPool.this) {
            try {
              ConnectionPool.this.wait(waitMillis, (int) waitNanos);
            } catch (InterruptedException ignored) {
            }
          }
        }
      }
    }
  };

這個邏輯也很簡單经柴,就是調(diào)用cleanup方法執(zhí)行清理狸窘,并等待一段時間,持續(xù)清理坯认,其中cleanup方法返回的值來來決定而等待的時間長度翻擒。那我們繼續(xù)來看下cleanup函數(shù):

  /**
   * Performs maintenance on this pool, evicting the connection that has been idle the longest if
   * either it has exceeded the keep alive limit or the idle connections limit.
   *
   * <p>Returns the duration in nanos to sleep until the next scheduled call to this method. Returns
   * -1 if no further cleanups are required.
   */
  long cleanup(long now) {
    int inUseConnectionCount = 0;
    int idleConnectionCount = 0;
    RealConnection longestIdleConnection = null;
    long longestIdleDurationNs = Long.MIN_VALUE;

    // Find either a connection to evict, or the time that the next eviction is due.
    synchronized (this) {
      for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
        RealConnection connection = i.next();

        // If the connection is in use, keep searching.
        if (pruneAndGetAllocationCount(connection, now) > 0) {
          inUseConnectionCount++;
          continue;
        }
        //統(tǒng)計空閑連接數(shù)量
        idleConnectionCount++;

        // If the connection is ready to be evicted, we're done.
        long idleDurationNs = now - connection.idleAtNanos;
        if (idleDurationNs > longestIdleDurationNs) {
          //找出空閑時間最長的連接以及對應的空閑時間
          longestIdleDurationNs = idleDurationNs;
          longestIdleConnection = connection;
        }
      }

      if (longestIdleDurationNs >= this.keepAliveDurationNs
          || idleConnectionCount > this.maxIdleConnections) {
        // We've found a connection to evict. Remove it from the list, then close it below (outside
        // of the synchronized block).
       //在符合清理條件下,清理空閑時間最長的連接
        connections.remove(longestIdleConnection);
      } else if (idleConnectionCount > 0) {
        // A connection will be ready to evict soon.
        //不符合清理條件牛哺,則返回下次需要執(zhí)行清理的等待時間陋气,也就是此連接即將到期的時間
        return keepAliveDurationNs - longestIdleDurationNs;
      } else if (inUseConnectionCount > 0) {
        // All connections are in use. It'll be at least the keep alive duration 'til we run again.
       //沒有空閑的連接,則隔keepAliveDuration(分鐘)之后再次執(zhí)行
        return keepAliveDurationNs;
      } else {
        // No connections, idle or in use.
       //清理結束
        cleanupRunning = false;
        return -1;
      }
    }
    //關閉socket資源
    closeQuietly(longestIdleConnection.socket());

    // Cleanup again immediately.
     //這里是在清理一個空閑時間最長的連接以后會執(zhí)行到這里引润,需要立即再次執(zhí)行清理
    return 0;
  }

  

這里的首先統(tǒng)計空閑連接數(shù)量巩趁,然后通過for循環(huán)查找最長空閑時間的連接以及對應空閑時長,然后判斷是否超出最大空閑連接數(shù)(maxIdleConnections)或者或者超過最大空閑時間(keepAliveDurationNs)淳附,滿足其一則清除最長空閑時長的連接议慰。如果不滿足清理條件,則返回一個對應等待時間奴曙。
這個對應等待的時間又分二種情況:

  • 1 有連接則等待下次需要清理的時間去清理:keepAliveDurationNs-longestIdleDurationNs;
  • 2 沒有空閑的連接别凹,則等下一個周期去清理:keepAliveDurationNs

如果清理完畢返回-1。
綜上所述缆毁,我們來梳理一下清理任務番川,清理任務就是異步執(zhí)行的,遵循兩個指標,最大空閑連接數(shù)量和最大空閑時長颁督,滿足其一則清理空閑時長最大的那個連接践啄,然后循環(huán)執(zhí)行,要么等待一段時間沉御,要么繼續(xù)清理下一個連接屿讽,知道清理所有連接,清理任務才結束吠裆,下一次put的時候伐谈,如果已經(jīng)停止的清理任務則會被再次觸發(fā)

/**
   * Prunes any leaked allocations and then returns the number of remaining live allocations on
   * {@code connection}. Allocations are leaked if the connection is tracking them but the
   * application code has abandoned them. Leak detection is imprecise and relies on garbage
   * collection.
   */
  private int pruneAndGetAllocationCount(RealConnection connection, long now) {
    List<Reference<StreamAllocation>> references = connection.allocations;
     //遍歷弱引用列表
    for (int i = 0; i < references.size(); ) {
      Reference<StreamAllocation> reference = references.get(i);
       //若StreamAllocation被使用則接著循環(huán)
      if (reference.get() != null) {
        i++;
        continue;
      }

      // We've discovered a leaked allocation. This is an application bug.
      StreamAllocation.StreamAllocationReference streamAllocRef =
          (StreamAllocation.StreamAllocationReference) reference;
      String message = "A connection to " + connection.route().address().url()
          + " was leaked. Did you forget to close a response body?";
      Platform.get().logCloseableLeak(message, streamAllocRef.callStackTrace);
       //若StreamAllocation未被使用則移除引用,這邊注釋為泄露
      references.remove(i);
      connection.noNewStreams = true;

      // If this was the last allocation, the connection is eligible for immediate eviction.
      //如果列表為空則說明此連接沒有被引用了试疙,則返回0诵棵,表示此連接是空閑連接
      if (references.isEmpty()) {
        connection.idleAtNanos = now - keepAliveDurationNs;
        return 0;
      }
    }
    return references.size();
  }

pruneAndGetAllocationCount主要是用來標記泄露連接的。內(nèi)部通過遍歷傳入進來的RealConnection的StreamAllocation列表祝旷,如果StreamAllocation被使用則接著遍歷下一個StreamAllocation履澳。如果StreamAllocation未被使用則從列表中移除,如果列表中為空則說明此連接連接沒有引用了怀跛,返回0距贷,表示此連接是空閑連接,否則就返回非0表示此連接是活躍連接吻谋。
接下來讓我看下ConnectionPool的connectionBecameIdle()方法忠蝗,就是當有連接空閑時,喚起cleanup線程清洗連接池

  /**
   * Notify this pool that {@code connection} has become idle. Returns true if the connection has
   * been removed from the pool and should be closed.
   */
  boolean connectionBecameIdle(RealConnection connection) {
    assert (Thread.holdsLock(this));
     //該連接已經(jīng)不可用
    if (connection.noNewStreams || maxIdleConnections == 0) {
      connections.remove(connection);
      return true;
    } else {
      //歡迎clean 線程
      notifyAll(); // Awake the cleanup thread: we may have exceeded the idle connection limit.
      return false;
    }
  }

connectionBecameIdle標示一個連接處于空閑狀態(tài)漓拾,即沒有流任務阁最,那么久需要調(diào)用該方法,由ConnectionPool來決定是否需要清理該連接晦攒。
再來看下deduplicate()方法

  /**
   * Replaces the connection held by {@code streamAllocation} with a shared connection if possible.
   * This recovers when multiple multiplexed connections are created concurrently.
   */
  Socket deduplicate(Address address, StreamAllocation streamAllocation) {
    assert (Thread.holdsLock(this));
    for (RealConnection connection : connections) {
      if (connection.isEligible(address, null)
          && connection.isMultiplexed()
          && connection != streamAllocation.connection()) {
        return streamAllocation.releaseAndAcquire(connection);
      }
    }
    return null;
  }

該方法主要是針對HTTP/2場景下多個多路復用連接清除的場景闽撤。如果是當前連接是HTTP/2得哆,那么所有指向該站點的請求都應該基于同一個TCP連接脯颜。這個方法比較簡單就不詳細說了,再說下另外一個方法

  /** Close and remove all idle connections in the pool. */
  public void evictAll() {
    List<RealConnection> evictedConnections = new ArrayList<>();
    synchronized (this) {
      for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
        RealConnection connection = i.next();
        if (connection.allocations.isEmpty()) {
          connection.noNewStreams = true;
          evictedConnections.add(connection);
          i.remove();
        }
      }
    }
    for (RealConnection connection : evictedConnections) {
      closeQuietly(connection.socket());
    }
  }

該方法是刪除所有空閑的連接贩据,比較簡單栋操,不說了

三、 StreamAllocation

這個類很重要饱亮,我們先來看下類的注釋

/**
 * This class coordinates the relationship between three entities:
 *
 * <ul>
 *     <li><strong>Connections:</strong> physical socket connections to remote servers. These are
 *         potentially slow to establish so it is necessary to be able to cancel a connection
 *         currently being connected.
 *     <li><strong>Streams:</strong> logical HTTP request/response pairs that are layered on
 *         connections. Each connection has its own allocation limit, which defines how many
 *         concurrent streams that connection can carry. HTTP/1.x connections can carry 1 stream
 *         at a time, HTTP/2 typically carry multiple.
 *     <li><strong>Calls:</strong> a logical sequence of streams, typically an initial request and
 *         its follow up requests. We prefer to keep all streams of a single call on the same
 *         connection for better behavior and locality.
 * </ul>
 *
 * <p>Instances of this class act on behalf of the call, using one or more streams over one or more
 * connections. This class has APIs to release each of the above resources:
 *
 * <ul>
 *     <li>{@link #noNewStreams()} prevents the connection from being used for new streams in the
 *         future. Use this after a {@code Connection: close} header, or when the connection may be
 *         inconsistent.
 *     <li>{@link #streamFinished streamFinished()} releases the active stream from this allocation.
 *         Note that only one stream may be active at a given time, so it is necessary to call
 *         {@link #streamFinished streamFinished()} before creating a subsequent stream with {@link
 *         #newStream newStream()}.
 *     <li>{@link #release()} removes the call's hold on the connection. Note that this won't
 *         immediately free the connection if there is a stream still lingering. That happens when a
 *         call is complete but its response body has yet to be fully consumed.
 * </ul>
 *
 * <p>This class supports {@linkplain #cancel asynchronous canceling}. This is intended to have the
 * smallest blast radius possible. If an HTTP/2 stream is active, canceling will cancel that stream
 * but not the other streams sharing its connection. But if the TLS handshake is still in progress
 * then canceling may break the entire connection.
 */

在講解這個類的時候不得不說下背景:
HTTP的版本:
HTTP的版本從最初的1.0版本矾芙,到后續(xù)的1.1版本,再到后續(xù)的google推出的SPDY,后來再推出2.0版本近上,http協(xié)議越來越完善剔宪。(ps:okhttp也是根據(jù)2.0和1.1/1.0作為區(qū)分,實現(xiàn)了兩種連接機制)這里要說下http2.0和http1.0,1.1的主要區(qū)別,2.0解決了老版本(1.1和1.0)最重要兩個問題:連接無法復用和head of line blocking (HOL)問題.2.0使用多路復用的技術葱绒,多個stream可以共用一個socket連接感帅,每個tcp連接都是通過一個socket來完成的,socket對應一個host和port地淀,如果有多個stream(也就是多個request)都是連接在一個host和port上失球,那么它們就可以共同使用同一個socket,這樣做的好處就是可以減少TCP的一個三次握手的時間。在OKHttp里面帮毁,記錄一次連接的是RealConnection实苞,這個負責連接,在這個類里面用socket來連接烈疚,用HandShake來處理握手黔牵。

在講解這個類的之前我們先熟悉3個概念:請求、連接爷肝、流荧止。我們要明白HTTP通信執(zhí)行網(wǎng)絡"請求"需要在"連接"上建立一個新的"流",我們將StreamAllocation稱之流的橋梁,它負責為一次"請求"尋找"連接"并建立"流"阶剑,從而完成遠程通信跃巡。所以說StreamAllocation與"請求"、"連接"牧愁、"流"都有關素邪。

從注釋我們看到。Connection是建立在Socket之上的物流通信信道猪半,而Stream則是代表邏輯的流兔朦,至于Call是對一次請求過程的封裝。之前也說過一個Call可能會涉及多個流(比如重定向或者auth認證等情況)磨确。所以我們想一下沽甥,如果StreamAllocation要想解決上述問題,需要兩個步驟乏奥,一是尋找連接摆舟,二是獲取流。所以StreamAllocation里面應該包含一個Stream(上文已經(jīng)說到了邓了,OKHttp里面的流是HttpCodec)恨诱;還應該包含連接Connection。如果想找到合適的劉姐骗炉,還需要一個連接池ConnectionPool屬性照宝。所以應該有一個獲取流的方法在StreamAllocation里面是newStream();找到合適的流的方法findConnection()句葵;還應該有完成請求任務的之后finish()的方法來關閉流對象厕鹃,還有終止和取消等方法兢仰,以及釋放資源的方法。

1剂碴、那咱們先就看下他的屬性

  public final Address address;//地址
  private Route route; //路由
  private final ConnectionPool connectionPool;  //連接池
  private final Object callStackTrace; //日志

  // State guarded by connectionPool.
  private final RouteSelector routeSelector; //路由選擇器
  private int refusedStreamCount;  //拒絕的次數(shù)
  private RealConnection connection;  //連接
  private boolean released;  //是否已經(jīng)被釋放
  private boolean canceled  //是否被取消了

看完屬性旨别,我們來看下構造函數(shù)


  public StreamAllocation(ConnectionPool connectionPool, Address address, Object callStackTrace) {
    this.connectionPool = connectionPool;
    this.address = address;
    this.routeSelector = new RouteSelector(address, routeDatabase());
    this.callStackTrace = callStackTrace;
  }
 

這時候我們再來看下他的一個比較重要的方法

  public HttpCodec newStream(OkHttpClient client, boolean doExtensiveHealthChecks) {
    int connectTimeout = client.connectTimeoutMillis();
    int readTimeout = client.readTimeoutMillis();
    int writeTimeout = client.writeTimeoutMillis();
    boolean connectionRetryEnabled = client.retryOnConnectionFailure();

    try {
      //獲取一個連接
      RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
          writeTimeout, connectionRetryEnabled, doExtensiveHealthChecks);
   //實例化HttpCodec,如果是HTTP/2則是Http2Codec否則是Http1Codec
      HttpCodec resultCodec = resultConnection.newCodec(client, this);

      synchronized (connectionPool) {
        codec = resultCodec;
        return resultCodec;
      }
    } catch (IOException e) {
      throw new RouteException(e);
    }
  }

這里面兩個重要方法
1是通過findHealthyConnection獲取一個連接、2是通過resultConnection.newCodec獲取流汗茄。
我們接著來看findHealthyConnection()方法

  /**
   * Finds a connection and returns it if it is healthy. If it is unhealthy the process is repeated
   * until a healthy connection is found.
   */
  private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
      int writeTimeout, boolean connectionRetryEnabled, boolean doExtensiveHealthChecks)
      throws IOException {
    while (true) {
      RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
          connectionRetryEnabled);

      // If this is a brand new connection, we can skip the extensive health checks.
      synchronized (connectionPool) {
        if (candidate.successCount == 0) {
          return candidate;
        }
      }

      // Do a (potentially slow) check to confirm that the pooled connection is still good. If it
      // isn't, take it out of the pool and start again.
      if (!candidate.isHealthy(doExtensiveHealthChecks)) {
        noNewStreams();
        continue;
      }

      return candidate;
    }
  }

我們看到里面調(diào)用findConnection來獲取一個RealConnection秸弛,然后通過RealConnection自己的方法isHealthy,去判斷是否是健康的連接洪碳,如果是健康的連接递览,則重用,否則就繼續(xù)查找瞳腌。那我們繼續(xù)看下findConnection()方法

 /**
   * Returns a connection to host a new stream. This prefers the existing connection if it exists,
   * then the pool, finally building a new connection.
   */
  private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
      boolean connectionRetryEnabled) throws IOException {
    Route selectedRoute;
    synchronized (connectionPool) {
      if (released) throw new IllegalStateException("released");
      if (codec != null) throw new IllegalStateException("codec != null");
      if (canceled) throw new IOException("Canceled");
      //獲取存在的連接
      // Attempt to use an already-allocated connection.
      RealConnection allocatedConnection = this.connection;
      if (allocatedConnection != null && !allocatedConnection.noNewStreams) {
          // 如果已經(jīng)存在的連接滿足要求绞铃,則使用已存在的連接
        return allocatedConnection;
      }
      //從緩存中去取
      // Attempt to get a connection from the pool.
      Internal.instance.get(connectionPool, address, this, null);
      if (connection != null) {
        return connection;
      }

      selectedRoute = route;
    }
       // 線路的選擇,多ip的支持
    // If we need a route, make one. This is a blocking operation.
    if (selectedRoute == null) {
      //里面是個遞歸
      selectedRoute = routeSelector.next();
    }

    RealConnection result;
    synchronized (connectionPool) {
      if (canceled) throw new IOException("Canceled");

      // Now that we have an IP address, make another attempt at getting a connection from the pool.
      // This could match due to connection coalescing.
      //更換路由再次嘗試
      Internal.instance.get(connectionPool, address, this, selectedRoute);
      if (connection != null) return connection;

      // Create a connection and assign it to this allocation immediately. This makes it possible
      // for an asynchronous cancel() to interrupt the handshake we're about to do.
      route = selectedRoute;
      refusedStreamCount = 0;
     // 以上都不符合嫂侍,創(chuàng)建一個連接
      result = new RealConnection(connectionPool, selectedRoute);
      acquire(result);
    }
     //連接并握手
    // Do TCP + TLS handshakes. This is a blocking operation.
    result.connect(connectTimeout, readTimeout, writeTimeout, connectionRetryEnabled);
    //更新本地數(shù)據(jù)庫
    routeDatabase().connected(result.route());

    Socket socket = null;
    synchronized (connectionPool) {
      // Pool the connection.
      //把連接放到連接池中
      Internal.instance.put(connectionPool, result);
      //如果這個連接是多路復用
      // If another multiplexed connection to the same address was created concurrently, then
      // release this connection and acquire that one.
      if (result.isMultiplexed()) {
        //調(diào)用connectionPool的deduplicate方法去重儿捧。
        socket = Internal.instance.deduplicate(connectionPool, address, this);
        result = connection;
      }
    }
    //如果是重復的socket則關閉socket,不是則socket為nul挑宠,什么也不做
    closeQuietly(socket);
    //返回整個連接
    return result;
  }

上面代碼大概的邏輯是:

  • 1菲盾、先找是否有已經(jīng)存在的連接,如果有已經(jīng)存在的連接各淀,并且可以使用(!noNewStreams)則直接返回懒鉴。
  • 2、根據(jù)已知的address在connectionPool里面找碎浇,如果有連接临谱,則返回
  • 3、更換路由奴璃,更換線路悉默,在connectionPool里面再次查找,如果有則返回苟穆。
  • 4抄课、如果以上條件都不滿足則直接new一個RealConnection出來
  • 5、new出來的RealConnection通過acquire關聯(lián)到connection.allocations上
  • 6鞭缭、做去重判斷剖膳,如果有重復的socket則關閉

里面涉及到的RealConnection的connect()方法,我們已經(jīng)在RealConnection里面講過岭辣,這里就不講了。不過這里說下acquire()方法

  /**
   * Use this allocation to hold {@code connection}. Each call to this must be paired with a call to
   * {@link #release} on the same connection.
   */
  public void acquire(RealConnection connection) {
    assert (Thread.holdsLock(connectionPool));
    if (this.connection != null) throw new IllegalStateException();

    this.connection = connection;
    connection.allocations.add(new StreamAllocationReference(this, callStackTrace));
  }

這里相當于給connection的引用計數(shù)器加1
這里說下StreamAllocationReference甸饱,StreamAllocationReference其實是弱引用的子類沦童。具體代碼如下:

  public static final class StreamAllocationReference extends WeakReference<StreamAllocation> {
    /**
     * Captures the stack trace at the time the Call is executed or enqueued. This is helpful for
     * identifying the origin of connection leaks.
     */
    public final Object callStackTrace;

    StreamAllocationReference(StreamAllocation referent, Object callStackTrace) {
      super(referent);
      this.callStackTrace = callStackTrace;
    }
  }

下面來看下他的他的其他方法streamFinished(boolean, HttpCodec)仑濒、release(RealConnection)和deallocate(boolean, boolean, boolean)方法。

  public void streamFinished(boolean noNewStreams, HttpCodec codec) {
    Socket socket;
    synchronized (connectionPool) {
      if (codec == null || codec != this.codec) {
        throw new IllegalStateException("expected " + this.codec + " but was " + codec);
      }
      if (!noNewStreams) {
        connection.successCount++;
      }
      socket = deallocate(noNewStreams, false, true);
    }
    closeQuietly(socket);
  }

/**
   * Releases resources held by this allocation. If sufficient resources are allocated, the
   * connection will be detached or closed. Callers must be synchronized on the connection pool.
   *
   * <p>Returns a closeable that the caller should pass to {@link Util#closeQuietly} upon completion
   * of the synchronized block. (We don't do I/O while synchronized on the connection pool.)
   */
  private Socket deallocate(boolean noNewStreams, boolean released, boolean streamFinished) {
    assert (Thread.holdsLock(connectionPool));

    if (streamFinished) {
      this.codec = null;
    }
    if (released) {
      this.released = true;
    }
    Socket socket = null;
    if (connection != null) {
      if (noNewStreams) {
        connection.noNewStreams = true;
      }
      if (this.codec == null && (this.released || connection.noNewStreams)) {
        release(connection);
        if (connection.allocations.isEmpty()) {
          connection.idleAtNanos = System.nanoTime();
          if (Internal.instance.connectionBecameIdle(connectionPool, connection)) {
            socket = connection.socket();
          }
        }
        connection = null;
      }
    }
    return socket;
  }

  /** Remove this allocation from the connection's list of allocations. */
  private void release(RealConnection connection) {
    for (int i = 0, size = connection.allocations.size(); i < size; i++) {
      Reference<StreamAllocation> reference = connection.allocations.get(i);
      if (reference.get() == this) {
        connection.allocations.remove(i);
        return;
      }
    }
    throw new IllegalStateException();
  }

其中deallocate(boolean, boolean, boolean)和release(RealConnection)方法都是private偷遗,而且均在streamFinished里面調(diào)用墩瞳。
release(RealConnection)方法比較簡單,主要是把RealConnection對應的allocations清除掉氏豌,把計數(shù)器歸零喉酌。
deallocate(boolean, boolean, boolean)方法也簡單,根據(jù)傳入的三個布爾類型的值進行操作泵喘,如果streamFinished為true則代表關閉流泪电,所以要通過連接池connectionPool把這個connection設置空閑連接,如果可以設為空閑連接則返回這個socket纪铺。不能則返回null相速。
streamFinished()主要做了一些異常判斷,然后調(diào)用deallocate()方法
綜上所述:streamFinished(boolean, HttpCodec)主要是關閉流鲜锚,release(RealConnection)主要是釋放connection的引用,deallocate(boolean, boolean, boolean)主要是根據(jù)參數(shù)做一些設置突诬。
上面說到了release(RealConnection),為了防止大家混淆概念芜繁,這里說一下另外一個方法release()這個是無參的方法旺隙。

  public void release() {
    Socket socket;
    synchronized (connectionPool) {
      socket = deallocate(false, true, false);
    }
    closeQuietly(socket);
  }

注意這個和上面的帶有RealConnection的參數(shù)release()的區(qū)別。
然后說一下noNewStreams()方法骏令,主要是設置防止別人在這個連接上開新的流催束。

  /** Forbid new streams from being created on the connection that hosts this allocation. */
  public void noNewStreams() {
    Socket socket;
    synchronized (connectionPool) {
      socket = deallocate(true, false, false);
    }
    closeQuietly(socket);
  }

還有一個方法,平時也是經(jīng)常有遇到的就是cancel()方法

  public void cancel() {
    HttpCodec codecToCancel;
    RealConnection connectionToCancel;
    synchronized (connectionPool) {
      canceled = true;
      codecToCancel = codec;
      connectionToCancel = connection;
    }
    if (codecToCancel != null) {
      codecToCancel.cancel();
    } else if (connectionToCancel != null) {
      connectionToCancel.cancel();
    }
  }

其實也比較簡單的就是調(diào)用RealConnection的Cancel方法伏社。
如果在連接中過程出現(xiàn)異常抠刺,會調(diào)用streamFailed(IOException)方法

public void streamFailed(IOException e) {
    Socket socket;
    boolean noNewStreams = false;

    synchronized (connectionPool) {
      if (e instanceof StreamResetException) {
        StreamResetException streamResetException = (StreamResetException) e;
        if (streamResetException.errorCode == ErrorCode.REFUSED_STREAM) {
          refusedStreamCount++;
        }
        // On HTTP/2 stream errors, retry REFUSED_STREAM errors once on the same connection. All
        // other errors must be retried on a new connection.
        if (streamResetException.errorCode != ErrorCode.REFUSED_STREAM || refusedStreamCount > 1) {
          noNewStreams = true;
          route = null;
        }
      } else if (connection != null
          && (!connection.isMultiplexed() || e instanceof ConnectionShutdownException)) {
        noNewStreams = true;

        // If this route hasn't completed a call, avoid it for new connections.
        if (connection.successCount == 0) {
          if (route != null && e != null) {
            routeSelector.connectFailed(route, e);
          }
          route = null;
        }
      }
      socket = deallocate(noNewStreams, false, true);
    }

    closeQuietly(socket);
  }

根據(jù)異常類型來采取不同的應對措施。注釋已經(jīng)比較清楚了摘昌,就不細說了速妖。
其他的方法比較簡單,我這里就不細說了聪黎。

最后編輯于
?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末罕容,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子稿饰,更是在濱河造成了極大的恐慌锦秒,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,755評論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件喉镰,死亡現(xiàn)場離奇詭異旅择,居然都是意外死亡,警方通過查閱死者的電腦和手機侣姆,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,305評論 3 395
  • 文/潘曉璐 我一進店門生真,熙熙樓的掌柜王于貴愁眉苦臉地迎上來沉噩,“玉大人,你說我怎么就攤上這事柱蟀〈桑” “怎么了?”我有些...
    開封第一講書人閱讀 165,138評論 0 355
  • 文/不壞的土叔 我叫張陵长已,是天一觀的道長畜眨。 經(jīng)常有香客問我,道長术瓮,這世上最難降的妖魔是什么康聂? 我笑而不...
    開封第一講書人閱讀 58,791評論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮斤斧,結果婚禮上早抠,老公的妹妹穿的比我還像新娘。我一直安慰自己撬讽,他們只是感情好蕊连,可當我...
    茶點故事閱讀 67,794評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著游昼,像睡著了一般甘苍。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上烘豌,一...
    開封第一講書人閱讀 51,631評論 1 305
  • 那天载庭,我揣著相機與錄音,去河邊找鬼廊佩。 笑死囚聚,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的标锄。 我是一名探鬼主播顽铸,決...
    沈念sama閱讀 40,362評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼料皇!你這毒婦竟也來了?” 一聲冷哼從身側響起践剂,我...
    開封第一講書人閱讀 39,264評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎逊脯,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體男窟,經(jīng)...
    沈念sama閱讀 45,724評論 1 315
  • 正文 獨居荒郊野嶺守林人離奇死亡盆赤,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,900評論 3 336
  • 正文 我和宋清朗相戀三年贾富,在試婚紗的時候發(fā)現(xiàn)自己被綠了牺六。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片汗捡。...
    茶點故事閱讀 40,040評論 1 350
  • 序言:一個原本活蹦亂跳的男人離奇死亡淑际,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出扇住,到底是詐尸還是另有隱情春缕,我是刑警寧澤,帶...
    沈念sama閱讀 35,742評論 5 346
  • 正文 年R本政府宣布锄贼,位于F島的核電站女阀,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏浸策。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,364評論 3 330
  • 文/蒙蒙 一庸汗、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧蚯舱,春花似錦、人聲如沸陈肛。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,944評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽懦窘。三九已至,卻和暖如春港华,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背立宜。 一陣腳步聲響...
    開封第一講書人閱讀 33,060評論 1 270
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留橙数,地道東北人。 一個月前我還...
    沈念sama閱讀 48,247評論 3 371
  • 正文 我出身青樓崖技,卻偏偏與公主長得像钟哥,于是被迫代替她去往敵國和親迎献。 傳聞我的和親對象是個殘疾皇子腻贰,可洞房花燭夜當晚...
    茶點故事閱讀 44,979評論 2 355

推薦閱讀更多精彩內(nèi)容