OkHttp源碼解析

OkHttp優(yōu)點

OkHttp是一個高效的Http客戶端窄锅,有如下的特點:

  • 支持HTTP2/SPDY黑科技

  • socket自動選擇最好路線,并支持自動重連

  • 擁有自動維護的socket連接池尝胆,減少握手次數(shù)

  • 擁有隊列線程池,輕松寫并發(fā)

  • 擁有Interceptors輕松處理請求與響應(yīng)(比如透明GZIP壓縮,LOGGING)

  • 基于Headers的緩存策略(不僅可以緩存數(shù)據(jù),就連響應(yīng)頭都給緩存了)


源碼涉及的主要幾個對象
  • Call:對請求的封裝,有異步請求和同步請求众弓。

  • Dispatcher:任務(wù)調(diào)度器

  • Connection:是RealConnection的父類接口,表示對JDK中的物理socket進行了引用計數(shù)封裝隔箍,用來控制socket連接

  • HttpCodec:對Http請求進行編碼谓娃,對Http響應(yīng)進行解碼,由于Http協(xié)議有基于HTTP1.0和Http2.0的兩種情況蜒滩,Http1Code代表基于Http1.0協(xié)議的方式滨达,Http2Code代表基于Http2.0協(xié)議的方式。

  • StreamAllocation: 用來控制Connections/Streams的資源分配與釋放

  • RouteDatabase:用來保存連接的錯誤路徑俯艰,以便能提升連接的效率捡遍。

  • RetryAndFollowUpInterceptor 負責失敗重試以及重定向的攔截器

  • BridgeInterceptor: 負責把用戶構(gòu)造的請求轉(zhuǎn)換為發(fā)送到服務(wù)器的請求、把服務(wù)器返回的響應(yīng)轉(zhuǎn)為用戶友好的響應(yīng)的

  • CacheInterceptor: 負責讀取緩存直接返回竹握、更新緩存

  • ConnectInterceptor: 負責和服務(wù)器建立連接的

  • CallServerInterceptor:負責向服務(wù)器發(fā)送請求數(shù)據(jù)画株、從服務(wù)器讀取響應(yīng)數(shù)據(jù)

源碼解析

源碼開始之前我先貼一段OkHttp請求網(wǎng)絡(luò)的實例

   OkHttpClient mOkHttpClient = new OkHttpClient();

        final Request request = new Request.Builder()
                .url("http://www.reibang.com/u/b4e69e85aef6")
                .addHeader("user_agent","22222")
                .build();
          Call call = mOkHttpClient.newCall(request);
          call.enqueue(new Callback() {
            @Override
            public void onFailure(Call call, IOException e) {

            }

            @Override
            public void onResponse(Call call, Response response) throws IOException {
                if(response != null )
                Log.i(TAG, "返回服務(wù)端數(shù)據(jù):"+ String.valueOf(response.body().string()));
            }
        });

(1)OkHttp網(wǎng)絡(luò)請求流程

Call call = mOkHttpClient.newCall(request);

首先會new一個Call對象出來,但其實真正new出來的對象是NewCall對象

 static RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
    // Safely publish the Call instance to the EventListener.
    RealCall call = new RealCall(client, originalRequest, forWebSocket);
    call.eventListener = client.eventListenerFactory().create(call);
    return call;
  }

然后會執(zhí)行call的enqueue方法

 @Override public void enqueue(Callback responseCallback) {
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    captureCallStackTrace();
    eventListener.callStart(this);
    client.dispatcher().enqueue(new AsyncCall(responseCallback));
  }

該方法中首先判斷請求有沒有被執(zhí)行涩搓,如果請求已經(jīng)執(zhí)行污秆,那么直接拋出異常,如果請求沒有執(zhí)行昧甘,就會執(zhí)行Dispatcher對象的enqueue方法,Dispatcher的enqueue方法的源碼如下所示:

synchronized void enqueue(AsyncCall call) {
    if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
      runningAsyncCalls.add(call);
      executorService().execute(call);
    } else {
      readyAsyncCalls.add(call);
    }
  }

如果正在運行的異步請求數(shù)量小于最大的并發(fā)數(shù)战得,且正在運行的客戶端實際數(shù)量請求小于規(guī)定的每個主機最大請求數(shù)量充边,那么就把該請求放進正在運行的異步請求隊列中,否則就把該請求放進將要執(zhí)行的異步請求隊列中常侦。


(2) Dispatcher任務(wù)調(diào)度

Dispatcher的各個參數(shù)的說明如下:

  //支持的最大并發(fā)請求數(shù)量
  private int maxRequests = 64;
 //每個主機的最大請求數(shù)量
  private int maxRequestsPerHost = 5;

  //請求線程池
  private @Nullable ExecutorService executorService;

  //將要運行的異步請求隊列
  private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();

  //正在運行的異步請求隊列
  private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();

  //正在運行的同步請求隊列
  private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();

  • maxRequests :OkHttp支持的最大并發(fā)請求數(shù)量

  • maxRequestsPerHost :每個主機的最大請求數(shù)量

  • readyAsyncCalls :將要運行的異步請求隊列

  • runningAsyncCalls :正在運行的異步請求隊列

  • runningSyncCalls :正在運行的同步請求隊列

繼續(xù)看看Dispatcher的executorService方法浇冰,如下:

 public synchronized ExecutorService executorService() {
    if (executorService == null) {
      executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
          new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
    }
    return executorService;
  }

Dispatcher初始化了一個線程池,核心線程的數(shù)量為0 聋亡,最大的線程數(shù)量為Integer.MAX_VALUE肘习,空閑線程存在的最大時間為60秒,這個線程類似于CacheThreadPool坡倔,比較適合執(zhí)行大量的耗時比較少的任務(wù)漂佩。同時我們Dispatcher也可以來設(shè)置自己線程池脖含。

Dispatcher我們大概了解之后,回到之前說的投蝉,call的enqueue方法其實執(zhí)行的使Dispatcher的enqueue方法养葵,Dispatcher之后會把call放進請求隊列中,最終執(zhí)行由線程池來執(zhí)行請求任務(wù)瘩缆。下面來看看RealCall里究竟執(zhí)行了什么任務(wù)关拒。

  @Override protected void execute() {
      boolean signalledCallback = false;
      try {
        Response response = getResponseWithInterceptorChain();
        if (retryAndFollowUpInterceptor.isCanceled()) {
          signalledCallback = true;
          responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
        } else {
          signalledCallback = true;
          responseCallback.onResponse(RealCall.this, response);
        }
      } catch (IOException e) {
       ...
      } finally {
        client.dispatcher().finished(this);
      }
    }
  }

RealCall通過執(zhí)行g(shù)etResponseWithInterceptorChain()返回Response,如果請求被取消則在進行OnFailue回調(diào)庸娱,如果請求成功則進行onResponse的回調(diào)着绊。
這里要注意兩點:

  • 請求如果被取消,其回調(diào)實在onFailue中進行回調(diào)的

  • enqueue方法的回調(diào)是在子線程中完成的


(3) 攔截器

那么RealCall 的getResponseWithInterceptorChain方法中究竟干了些什么呢熟尉,它是如何返回Response的呢畔柔?

 Response getResponseWithInterceptorChain() throws IOException {
    // Build a full stack of interceptors.
    List<Interceptor> interceptors = new ArrayList<>();
    interceptors.addAll(client.interceptors());//1
    interceptors.add(retryAndFollowUpInterceptor);//2
    interceptors.add(new BridgeInterceptor(client.cookieJar()));//3
    interceptors.add(new CacheInterceptor(client.internalCache()));//4
    interceptors.add(new ConnectInterceptor(client));//5
    if (!forWebSocket) {
      interceptors.addAll(client.networkInterceptors());//6 
    }
    interceptors.add(new CallServerInterceptor(forWebSocket));//7

    Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
        originalRequest, this, eventListener, client.connectTimeoutMillis(),
        client.readTimeoutMillis(), client.writeTimeoutMillis());

    return chain.proceed(originalRequest);//8
  }

  1. 在配置 OkHttpClient 時設(shè)置的 interceptors ()

  2. 負責失敗重試以及重定向的RetryAndFollowUpInterceptor

  3. 負責把用戶構(gòu)造的請求轉(zhuǎn)換為發(fā)送到服務(wù)器的請求、把服務(wù)器返回的響應(yīng)轉(zhuǎn)為用戶友好的響應(yīng)的 BridgeInterceptor

  4. 負責讀取緩存直接返回臣樱、更新緩存的 CacheInterceptor

  5. 負責和服務(wù)器建立連接的 ConnectInterceptor

  6. 配置 OkHttpClient 時設(shè)置的 networkInterceptors

  7. 負責向服務(wù)器發(fā)送請求數(shù)據(jù)靶擦、從服務(wù)器讀取響應(yīng)數(shù)據(jù)的 CallServerInterceptor

  8. 在 return chain.proceed(originalRequest),中開啟鏈式調(diào)用

RealInterceptorChain的proceed方法源碼如下:

  public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
      RealConnection connection) throws IOException {
    if (index >= interceptors.size()) throw new AssertionError();

    calls++;

    // If we already have a stream, confirm that the incoming request will use it.
    if (this.httpCodec != null && !this.connection.supportsUrl(request.url())) {
      throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
          + " must retain the same host and port");
    }

    // If we already have a stream, confirm that this is the only call to chain.proceed().
    if (this.httpCodec != null && calls > 1) {
      throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
          + " must call proceed() exactly once");
    }

    // Call the next interceptor in the chain.
    RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
        connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,
        writeTimeout);
    Interceptor interceptor = interceptors.get(index);
    Response response = interceptor.intercept(next);

    // Confirm that the next interceptor made its required call to chain.proceed().
    if (httpCodec != null && index + 1 < interceptors.size() && next.calls != 1) {
      throw new IllegalStateException("network interceptor " + interceptor
          + " must call proceed() exactly once");
    }

    // Confirm that the intercepted response isn't null.
    if (response == null) {
      throw new NullPointerException("interceptor " + interceptor + " returned null");
    }

    if (response.body() == null) {
      throw new IllegalStateException(
          "interceptor " + interceptor + " returned a response with no body");
    }

    return response;
  }

理解了這段代碼其實整個OkHttp核心流程你就基本掌握了雇毫,開始看的時候大家可能頭都大了玄捕,可是當你debug一下你就豁然開朗了。這段代碼核心在下面這部分:

 // Call the next interceptor in the chain.
    RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
        connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,
        writeTimeout);
    Interceptor interceptor = interceptors.get(index);
    Response response = interceptor.intercept(ne
xt);

首先來了解一下攔截器吧棚放,攔截器是一種能夠監(jiān)控枚粘、重寫,重試調(diào)用的機制飘蚯。通常情況下馍迄,攔截器用來添加、移除局骤、轉(zhuǎn)換請求和響應(yīng)的頭部信息攀圈。比如將域名替換為IP地址,在請求頭中移除添加host屬性峦甩;也可以添加我們應(yīng)用中的一些公共參數(shù)赘来,比如設(shè)備id、版本號凯傲,等等犬辰。

攔截器的基本代碼結(jié)構(gòu)如下:

public interface Interceptor {
  Response intercept(Chain chain) throws IOException;

  interface Chain {
    Request request();

    Response proceed(Request request) throws IOException;

}
}
  • 攔截器的intercept方法中持有一個Chain對象,上面的RealInterceptorChain其實就是一個Chain的實現(xiàn)類冰单,然后chain對象的request方法可以拿到Request對象幌缝,proceed方法可以拿到Response對象,也就是說我們可以通過實現(xiàn)Interceptor诫欠,定義一個攔截器對象涵卵,然后拿到請求和Response對象浴栽,對Request和Response進行修改。

  • 事實上OkHttp就是通過定義許多攔截器一步一步地對Request進行攔截處理(從頭至尾)缘厢,直到請求返回網(wǎng)絡(luò)數(shù)據(jù)吃度,后面又倒過來,一步一步地對Response進行攔截處理贴硫,最后攔截的結(jié)果就是回調(diào)的最終Response椿每。(從尾至頭)

  • 回頭再看RealInterceptorChain的proceed方法,通過順序地傳入一個攔截器的集合英遭,創(chuàng)建一個RealInterceptorChain间护,然后拿到之前OkHttp創(chuàng)建的各種攔截器,并調(diào)用其interrupt方法挖诸,并返回Response對象汁尺。其調(diào)用順序如下:
okhttp攔截器.png

再來看看各個攔截器的源碼:

  1. 在配置 OkHttpClient 時設(shè)置的 interceptors ()

  2. 負責失敗重試以及重定向的RetryAndFollowUpInterceptor

  3. 負責把用戶構(gòu)造的請求轉(zhuǎn)換為發(fā)送到服務(wù)器的請求、把服務(wù)器返回的響應(yīng)轉(zhuǎn)為用戶友好的響應(yīng)的 BridgeInterceptor

  4. 負責讀取緩存直接返回多律、更新緩存的 CacheInterceptor

  5. 負責和服務(wù)器建立連接的 ConnectInterceptor

  6. 配置 OkHttpClient 時設(shè)置的 networkInterceptors

  7. 負責向服務(wù)器發(fā)送請求數(shù)據(jù)痴突、從服務(wù)器讀取響應(yīng)數(shù)據(jù)的 CallServerInterceptor


  • RetryAndFollowUpInterceptor:進行連接失敗重新連接,以及重定向
 @Override public Response intercept(Chain chain) throws IOException {
  Request request = chain.request();
  RealInterceptorChain realChain = (RealInterceptorChain) chain;
  Call call = realChain.call();
...

  followUpCount = 0;
  Response priorResponse = null;
 while (true) {
    if (canceled) {
      streamAllocation.release();
 throw new IOException("Canceled");
  }

    Response response;
 boolean releaseConnection = true;
 try {
      response = realChain.proceed(request, streamAllocation, null, null);
 ...

   Request followUp = followUpRequest(response);   
if (followUp == null) {
      if (!forWebSocket) {
        streamAllocation.release();
  }
      return response;
  }

...

 if (++followUpCount > MAX_FOLLOW_UPS) {
  streamAllocation.release();
 throw new ProtocolException("Too many follow-up requests: " + followUpCount); }
...
    
   request = followUp;
   priorResponse = response;
  }
}

整段代碼就是在一個死循環(huán)

  • 可以看出重連接的次數(shù)最多為20次

  • 重定向功能的邏輯在followUpRequest方法中狼荞,這個方法會根據(jù)響應(yīng)頭中的location字段獲取重定向的url辽装,并通過requestBuilder重新new一個Request對象,并改變request的response的值相味,然后重新進行攔截拾积。

  • BridgeInterceptor:對請求頭和響應(yīng)頭進行修改
  @Override public Response intercept(Chain chain) throws IOException {
    Request userRequest = chain.request();
    Request.Builder requestBuilder = userRequest.newBuilder();

    RequestBody body = userRequest.body();
    if (body != null) {
      MediaType contentType = body.contentType();
      if (contentType != null) {
        requestBuilder.header("Content-Type", contentType.toString());
      }

      long contentLength = body.contentLength();
      if (contentLength != -1) {
        requestBuilder.header("Content-Length", Long.toString(contentLength));
        requestBuilder.removeHeader("Transfer-Encoding");
      } else {
        requestBuilder.header("Transfer-Encoding", "chunked");
        requestBuilder.removeHeader("Content-Length");
      }
    }

    if (userRequest.header("Host") == null) {
      requestBuilder.header("Host", hostHeader(userRequest.url(), false));
    }

    if (userRequest.header("Connection") == null) {
      requestBuilder.header("Connection", "Keep-Alive");
    }

    // If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing
    // the transfer stream.
    boolean transparentGzip = false;
    if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
      transparentGzip = true;
      requestBuilder.header("Accept-Encoding", "gzip");
    }

    List<Cookie> cookies = cookieJar.loadForRequest(userRequest.url());
    if (!cookies.isEmpty()) {
      requestBuilder.header("Cookie", cookieHeader(cookies));
    }

    if (userRequest.header("User-Agent") == null) {
      requestBuilder.header("User-Agent", Version.userAgent());
    }

    Response networkResponse = chain.proceed(requestBuilder.build());

    HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers());

    Response.Builder responseBuilder = networkResponse.newBuilder()
        .request(userRequest);

    if (transparentGzip
        && "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding"))
        && HttpHeaders.hasBody(networkResponse)) {
      GzipSource responseBody = new GzipSource(networkResponse.body().source());
      Headers strippedHeaders = networkResponse.headers().newBuilder()
          .removeAll("Content-Encoding")
          .removeAll("Content-Length")
          .build();
      responseBuilder.headers(strippedHeaders);
      String contentType = networkResponse.header("Content-Type");
      responseBuilder.body(new RealResponseBody(contentType, -1L, Okio.buffer(responseBody)));
    }

    return responseBuilder.build();
  }
  • CacheInterceptor:讀取緩存和更新緩存的操作

攔截request并讀取緩存,該操作在proceed方法之前執(zhí)行丰涉,也就是在請求的時候進行緩存判斷拓巧。

 @Override public Response intercept(Chain chain) throws IOException {
    Response cacheCandidate = cache != null
        ? cache.get(chain.request())
        : null;

    long now = System.currentTimeMillis();

    CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
    Request networkRequest = strategy.networkRequest;
    Response cacheResponse = strategy.cacheResponse;

    if (cache != null) {
      cache.trackResponse(strategy);
    }

    if (cacheCandidate != null && cacheResponse == null) {
      closeQuietly(cacheCandidate.body()); // The cache candidate wasn't applicable. Close it.
    }

    // If we're forbidden from using the network and the cache is insufficient, fail.
    if (networkRequest == null && cacheResponse == null) {
      return new Response.Builder()
          .request(chain.request())
          .protocol(Protocol.HTTP_1_1)
          .code(504)
          .message("Unsatisfiable Request (only-if-cached)")
          .body(Util.EMPTY_RESPONSE)
          .sentRequestAtMillis(-1L)
          .receivedResponseAtMillis(System.currentTimeMillis())
          .build();
    }

    // If we don't need the network, we're done.
    if (networkRequest == null) {
      return cacheResponse.newBuilder()
          .cacheResponse(stripBody(cacheResponse))
          .build();
    }

    Response networkResponse = null;
    try {
      networkResponse = chain.proceed(networkRequest);
    } finally {
      // If we're crashing on I/O or otherwise, don't leak the cache body.
      if (networkResponse == null && cacheCandidate != null) {
        closeQuietly(cacheCandidate.body());
      }
    }

判斷是否應(yīng)該更新緩存

 // If we have a cache response too, then we're doing a conditional get.
    if (cacheResponse != null) {
      if (networkResponse.code() == HTTP_NOT_MODIFIED) {
        Response response = cacheResponse.newBuilder()
            .headers(combine(cacheResponse.headers(), networkResponse.headers()))
            .sentRequestAtMillis(networkResponse.sentRequestAtMillis())
            .receivedResponseAtMillis(networkResponse.receivedResponseAtMillis())
            .cacheResponse(stripBody(cacheResponse))
            .networkResponse(stripBody(networkResponse))
            .build();
        networkResponse.body().close();

        // Update the cache after combining headers but before stripping the
        // Content-Encoding header (as performed by initContentStream()).
        cache.trackConditionalCacheHit();
        cache.update(cacheResponse, response);
        return response;
      } else {
        closeQuietly(cacheResponse.body());
      }
    }

    Response response = networkResponse.newBuilder()
        .cacheResponse(stripBody(cacheResponse))
        .networkResponse(stripBody(networkResponse))
        .build();

    if (cache != null) {
      if (HttpHeaders.hasBody(response) && CacheStrategy.isCacheable(response, networkRequest)) {
        // Offer this request to the cache.
        CacheRequest cacheRequest = cache.put(response);
        return cacheWritingResponse(cacheRequest, response);
      }

      if (HttpMethod.invalidatesCache(networkRequest.method())) {
        try {
          cache.remove(networkRequest);
        } catch (IOException ignored) {
          // The cache cannot be written.
        }
      }
    }

    return response;
  }
  • ConnectInterceptor:與服務(wù)器進行連接
 @Override public Response intercept(Chain chain) throws IOException {
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    Request request = realChain.request();
    StreamAllocation streamAllocation = realChain.streamAllocation();

    // We need the network to satisfy this request. Possibly for validating a conditional GET.
    boolean doExtensiveHealthChecks = !request.method().equals("GET");
    HttpCodec httpCodec = streamAllocation.newStream(client, chain, doExtensiveHealthChecks);
    RealConnection connection = streamAllocation.connection();

    return realChain.proceed(request, streamAllocation, httpCodec, connection);
  }

實際上建立連接就是創(chuàng)建了一個 HttpCodec 對象,它將在后面的步驟中被使用一死,那它又是何方神圣呢肛度?它是對 HTTP 協(xié)議操作的抽象,有兩個實現(xiàn):Http1Codec和 Http2Codec摘符,顧名思義贤斜,它們分別對應(yīng) HTTP/1.1 和 HTTP/2 版本的實現(xiàn)。

在 Http1Codec中逛裤,它利用 Okio 對 Socket 的讀寫操作進行封裝,它對 java.io和 java.nio 進行了封裝猴抹,讓我們更便捷高效的進行 IO 操作带族。

而創(chuàng)建 HttpCodec 對象的過程涉及到 StreamAllocation、RealConnection代碼較長蟀给,這個過程概括來說蝙砌,就是找到一個可用的 RealConnection阳堕,再利用 RealConnection 的輸入輸出(BufferedSource 和 BufferedSink)創(chuàng)建 HttpCodec 對象,供后續(xù)步驟使用择克。

  • CallServerInterceptor:發(fā)送請求和接收數(shù)據(jù)
 @Override public Response intercept(Chain chain) throws IOException {
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    HttpCodec httpCodec = realChain.httpStream();
    StreamAllocation streamAllocation = realChain.streamAllocation();
    RealConnection connection = (RealConnection) realChain.connection();
    Request request = realChain.request();

    long sentRequestMillis = System.currentTimeMillis();

    realChain.eventListener().requestHeadersStart(realChain.call());
    httpCodec.writeRequestHeaders(request);
    realChain.eventListener().requestHeadersEnd(realChain.call(), request);

    Response.Builder responseBuilder = null;
    if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
      // If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100
      // Continue" response before transmitting the request body. If we don't get that, return
      // what we did get (such as a 4xx response) without ever transmitting the request body.
      if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
        httpCodec.flushRequest();
        realChain.eventListener().responseHeadersStart(realChain.call());
        responseBuilder = httpCodec.readResponseHeaders(true);
      }

      if (responseBuilder == null) {
        // Write the request body if the "Expect: 100-continue" expectation was met.
        realChain.eventListener().requestBodyStart(realChain.call());
        long contentLength = request.body().contentLength();
        CountingSink requestBodyOut =
            new CountingSink(httpCodec.createRequestBody(request, contentLength));
        BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);

        request.body().writeTo(bufferedRequestBody);
        bufferedRequestBody.close();
        realChain.eventListener()
            .requestBodyEnd(realChain.call(), requestBodyOut.successfulCount);
      } else if (!connection.isMultiplexed()) {
        // If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection
        // from being reused. Otherwise we're still obligated to transmit the request body to
        // leave the connection in a consistent state.
        streamAllocation.noNewStreams();
      }
    }

    httpCodec.finishRequest();

    if (responseBuilder == null) {
      realChain.eventListener().responseHeadersStart(realChain.call());
      responseBuilder = httpCodec.readResponseHeaders(false);
    }

    Response response = responseBuilder
        .request(request)
        .handshake(streamAllocation.connection().handshake())
        .sentRequestAtMillis(sentRequestMillis)
        .receivedResponseAtMillis(System.currentTimeMillis())
        .build();

    realChain.eventListener()
        .responseHeadersEnd(realChain.call(), response);

    int code = response.code();
    if (forWebSocket && code == 101) {
      // Connection is upgrading, but we need to ensure interceptors see a non-null response body.
      response = response.newBuilder()
          .body(Util.EMPTY_RESPONSE)
          .build();
    } else {
      response = response.newBuilder()
          .body(httpCodec.openResponseBody(response))
          .build();
    }

    if ("close".equalsIgnoreCase(response.request().header("Connection"))
        || "close".equalsIgnoreCase(response.header("Connection"))) {
      streamAllocation.noNewStreams();
    }

    if ((code == 204 || code == 205) && response.body().contentLength() > 0) {
      throw new ProtocolException(
          "HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength());
    }

    return response;
  }
  1. 向服務(wù)器發(fā)送 request header恬总;

  2. 如果有 request body,就向服務(wù)器發(fā)送肚邢;

  3. 讀取 response header壹堰,先構(gòu)造一個 Response對象木羹;

  4. 如果有 response body奶甘,就在 3 的基礎(chǔ)上加上 body 構(gòu)造一個新的 Response對象生闲;

這里我們可以看到字币,核心工作都由 HttpCodec 對象完成绝页,而 HttpCodec 實際上利用的是 Okio轰驳,而 Okio 實際上還是用的 Socket蹲坷,所以沒什么神秘的坷剧,只不過一層套一層浦夷,層數(shù)有點多辖试。

其實 Interceptor的設(shè)計也是一種分層的思想,每個 Interceptor 就是一層劈狐。為什么要套這么多層呢罐孝?分層的思想在 TCP/IP 協(xié)議中就體現(xiàn)得淋漓盡致,分層簡化了每一層的邏輯懈息,每層只需要關(guān)注自己的責任(單一原則思想也在此體現(xiàn))肾档,而各層之間通過約定的接口/協(xié)議進行合作(面向接口編程思想),共同完成復雜的任務(wù)辫继,這是典型的責任鏈設(shè)計模式

責任鏈模式是一種對象的行為模式怒见。在責任鏈模式里,很多對象由每一個對象對其下家的引用而連接起來形成一條鏈姑宽。請求在這個鏈上傳遞遣耍,直到鏈上的某一個對象決定處理此請求。發(fā)出這個請求的客戶端并不知道鏈上的哪一個對象最終處理這個請求炮车,這使得系統(tǒng)可以在不影響客戶端的情況下動態(tài)地重新組織和分配責任舵变。

  • OkHttp的整個運行流程圖
okhttp_full_process.png

(4)OkHttp的復用連接池

Http有一種叫做keepalive connections的機制,而okHttp支持5個并發(fā)socket連接瘦穆,默認keepalive時間為5分鐘纪隙,接下來我們學習okHttp是如何復用連接的。

  • 主要變量與構(gòu)造方法
    連接池的類位于okHttp.ConnectionPool扛或,它的主要變量如下:
  private static final Executor executor = new ThreadPoolExecutor(0 /* corePoolSize */,
      Integer.MAX_VALUE /* maximumPoolSize */, 60L /* keepAliveTime */, TimeUnit.SECONDS,
      new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp ConnectionPool", true));

  /** The maximum number of idle connections for each address. */
  private final int maxIdleConnections;
  private final long keepAliveDurationNs;
  private final Runnable cleanupRunnable = new Runnable() {
    @Override public void run() {
      while (true) {
        long waitNanos = cleanup(System.nanoTime());
        if (waitNanos == -1) return;
        if (waitNanos > 0) {
          long waitMillis = waitNanos / 1000000L;
          waitNanos -= (waitMillis * 1000000L);
          synchronized (ConnectionPool.this) {
            try {
              ConnectionPool.this.wait(waitMillis, (int) waitNanos);
            } catch (InterruptedException ignored) {
            }
          }
        }
      }
    }
  };

  private final Deque<RealConnection> connections = new ArrayDeque<>();
  final RouteDatabase routeDatabase = new RouteDatabase();
  boolean cleanupRunning;

主要變量說明一下:

  • executor線程池:類似于CachedThreadPool绵咱,需要注意的是這種線程池的工作隊列采用了沒有容量的SynchronousQueue。

  • Deque 雙向隊列:雙端隊列同時具有隊列和棧的性質(zhì)熙兔,經(jīng)常在緩存中被使用悲伶,里面維護了RealConnection也就是Socket物理連接的包裝艾恼。

  • RouteDatabase :它用來記錄連接失敗的路線名單,當連接失敗時就會把失敗的路線加進去麸锉。

  • ConnectionPool的構(gòu)造方法如下所示:

 public ConnectionPool(int maxIdleConnections, long keepAliveDuration, TimeUnit timeUnit) {
    this.maxIdleConnections = maxIdleConnections;
    this.keepAliveDurationNs = timeUnit.toNanos(keepAliveDuration);

    // Put a floor on the keep alive duration, otherwise cleanup will spin loop.
    if (keepAliveDuration <= 0) {
      throw new IllegalArgumentException("keepAliveDuration <= 0: " + keepAliveDuration);
    }
  }

通過構(gòu)造方法可以看出CollectionPool默認空閑的socket最大連接數(shù)為5個钠绍,socket的keepalive時間為5分鐘。CollectionPool實在OkHttpClient實例化的時候創(chuàng)建的

  OkHttpClient(Builder builder) {
    this.dispatcher = builder.dispatcher;

    ...
    this.connectionPool = builder.connectionPool;
    ...
 
  }
  • 緩存操作

ConnectionPool提供對Deque<RealConnection>進行操作的方法分別為put花沉,get柳爽,connectionBecameIdle和evictAll
這幾個操作,分別對應(yīng)放入連接主穗,獲取連接泻拦,移除連接和移除所有連接操作。這里我們只舉例說明put和get操作忽媒。

  void put(RealConnection connection) {
    assert (Thread.holdsLock(this));
    if (!cleanupRunning) {
      cleanupRunning = true;
      executor.execute(cleanupRunnable);
    }
    connections.add(connection);
  }

再添加到Deque之前首先要清理空閑線程争拐,這個后面會講到。再來看看get操作:

  @Nullable RealConnection get(Address address, StreamAllocation streamAllocation, Route route) {
    assert (Thread.holdsLock(this));
    for (RealConnection connection : connections) {
      if (connection.isEligible(address, route)) {
        streamAllocation.acquire(connection, true);
        return connection;
      }
    }
    return null;
  }

遍歷connections緩存列表晦雨。當某個連接計數(shù)小于限制的大小架曹,并且request的地址和緩存列表中此連接的地址完全匹配時,則直接復用緩存列表中的connection作為request的連接闹瞧。

  • 自動回收連接
    OkHttp時根據(jù)StreamAllocation引用計數(shù)是否為0來實現(xiàn)自動回收連接的绑雄。我們在put操作前首先要調(diào)用executor.execute(cleanupRunnable)來清理閑置的線程。我們來查看cleanupRunnable到底做了什么奥邮?
private final Runnable cleanupRunnable = new Runnable() {
    @Override public void run() {
      while (true) {
        long waitNanos = cleanup(System.nanoTime());
        if (waitNanos == -1) return;
        if (waitNanos > 0) {
          long waitMillis = waitNanos / 1000000L;
          waitNanos -= (waitMillis * 1000000L);
          synchronized (ConnectionPool.this) {
            try {
              ConnectionPool.this.wait(waitMillis, (int) waitNanos);
            } catch (InterruptedException ignored) {
            }
          }
        }
      }
    }
  };

線程不斷地調(diào)用clearup方法進行清理万牺,并返回下次需要清理的間隔時間,然后調(diào)用wait方法進行等待以釋放鎖與時間片洽腺。當?shù)却龝r間到了后脚粟,再次進行清理,并返回下次需要清理的間隔時間蘸朋,如此循環(huán)下去核无。接下來看看clearup方法,如下所示:

  long cleanup(long now) {
    int inUseConnectionCount = 0;
    int idleConnectionCount = 0;
    RealConnection longestIdleConnection = null;
    long longestIdleDurationNs = Long.MIN_VALUE;

   
    synchronized (this) {
      for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
        RealConnection connection = i.next();

     
        if (pruneAndGetAllocationCount(connection, now) > 0) {//注釋<1>
          inUseConnectionCount++;
          continue;
        }

        idleConnectionCount++;

      
        long idleDurationNs = now - connection.idleAtNanos;
        if (idleDurationNs > longestIdleDurationNs) {
          longestIdleDurationNs = idleDurationNs;
          longestIdleConnection = connection;
        }
      }

      if (longestIdleDurationNs >= this.keepAliveDurationNs
          || idleConnectionCount > this.maxIdleConnections) {//注釋<2>
     
        connections.remove(longestIdleConnection);
      } else if (idleConnectionCount > 0) {
    
        return keepAliveDurationNs - longestIdleDurationNs;
      } else if (inUseConnectionCount > 0) {
    
        return keepAliveDurationNs;
      } else {
        // No connections, idle or in use.
        cleanupRunning = false;
        return -1;//注釋<3>
      }
    }

    closeQuietly(longestIdleConnection.socket());
    return 0;
  }

clearup方法所做的事情非常簡單總結(jié)就是藕坯,根據(jù)連接中的引用計數(shù)來計算空閑連接數(shù)和活躍連接數(shù)团南,然后標記空閑的連接。

  • 注釋<2>:如果空閑連接keepAlive時間超過5分鐘炼彪,或者空閑連接數(shù)超過5個吐根,則從Deque中移除此連接。接下來更具空閑連接或者活躍連接來返回下次需要清理的時間數(shù):
    如果空閑連接大于0辐马,則返回此連接即將到期的時間佑惠;
    如果都是活躍連接且大于0,則返回默認的keepAlive時間5分鐘齐疙;

  • 注釋<3>:如果沒有任何連接膜楷,則跳出循環(huán)并返回-1;

  • 注釋<1>:通過pruneAndGetAllocationCount方法來判斷連接是否閑置贞奋。如果pruneAndGetAllocationCount方法的返回值大于0則是活躍連接赌厅,否則就是空閑連接。接下來查看pruneAndGetAllocationCount方法轿塔,如下所示:

 private int pruneAndGetAllocationCount(RealConnection connection, long now) {
    List<Reference<StreamAllocation>> references = connection.allocations;
    for (int i = 0; i < references.size(); ) {
      Reference<StreamAllocation> reference = references.get(i);

      if (reference.get() != null) {
        i++;
        continue;
      }

      // We've discovered a leaked allocation. This is an application bug.
      StreamAllocation.StreamAllocationReference streamAllocRef =
          (StreamAllocation.StreamAllocationReference) reference;
      String message = "A connection to " + connection.route().address().url()
          + " was leaked. Did you forget to close a response body?";
      Platform.get().logCloseableLeak(message, streamAllocRef.callStackTrace);

      references.remove(i);
      connection.noNewStreams = true;

      // If this was the last allocation, the connection is eligible for immediate eviction.
      if (references.isEmpty()) {//注釋<1>
        connection.idleAtNanos = now - keepAliveDurationNs;
        return 0;
      }
    }

    return references.size();
  }

pruneAndGetAllocationCount方法首先遍歷傳進來的RealConnection的StreamAllocation特愿;如果StreamAllocation未被使用,則接下來遍歷下一個StreamAllocation勾缭;如果StreamAllocation未被使用揍障,則從列表中移除。在上面代碼注釋1處俩由,如果列表為空毒嫡,則說明此連接沒有引用了,這時返回0幻梯,表示此連接時空閑連接兜畸;否則就返回非0的數(shù),表示此連接時活躍連接碘梢。那么StreamAllocation是什么?怎么才能判斷StreamAllocation使用與否咬摇?接著往下看。

  • 引用計數(shù)
    在OkHttp的高層代碼調(diào)用中煞躬,使用了類似于引用計數(shù)的方式跟蹤socket流的調(diào)用肛鹏。這里的計數(shù)對象是StreamAllocation,它被反復執(zhí)行acquire和release操作恩沛,這兩個方法其實是在改變RealConnection中 List<Reference<StreamAllocation>>的大小在扰。acquire方法和release方法,如下所示:
 public void acquire(RealConnection connection, boolean reportedAcquired) {
    assert (Thread.holdsLock(connectionPool));
    if (this.connection != null) throw new IllegalStateException();

    this.connection = connection;
    this.reportedAcquired = reportedAcquired;
    connection.allocations.add(new StreamAllocationReference(this, callStackTrace));
  }

RealConnection是socket物理連接的包裝复唤,它里面維護了
List<Reference<StreamAllocation>>的引用健田。List中StreamAllocation的數(shù)量也是socket被引用的計數(shù)。如果計數(shù)為0佛纫,則說明此連接沒有被復用妓局,也就是空閑的,需要通過下文的算法實現(xiàn)回收呈宇;如果計數(shù)不為0好爬,則表示上層代碼仍然在引用,就無需關(guān)閉連接甥啄。

可以看出此連接池復用的核心就是用Deque<RealConnection>來存儲連接存炮,通過put,getconnectionBecameIdle和evictAll幾個操作來對Deque進行操作,另外通過判斷連接中的計數(shù)對象StreamAllocation來進行自動回收連接穆桂。

創(chuàng)作不易宫盔,如果本文對您有用的話,記得點一個贊哦


(1)參考文章:https://blog.piasy.com/2016/07/11/Understand-OkHttp/
(2)《Android進階之光》

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末享完,一起剝皮案震驚了整個濱河市灼芭,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌般又,老刑警劉巖彼绷,帶你破解...
    沈念sama閱讀 206,968評論 6 482
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異茴迁,居然都是意外死亡寄悯,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,601評論 2 382
  • 文/潘曉璐 我一進店門堕义,熙熙樓的掌柜王于貴愁眉苦臉地迎上來猜旬,“玉大人,你說我怎么就攤上這事胳螟∥舨觯” “怎么了?”我有些...
    開封第一講書人閱讀 153,220評論 0 344
  • 文/不壞的土叔 我叫張陵糖耸,是天一觀的道長秘遏。 經(jīng)常有香客問我,道長嘉竟,這世上最難降的妖魔是什么邦危? 我笑而不...
    開封第一講書人閱讀 55,416評論 1 279
  • 正文 為了忘掉前任,我火速辦了婚禮舍扰,結(jié)果婚禮上倦蚪,老公的妹妹穿的比我還像新娘。我一直安慰自己边苹,他們只是感情好陵且,可當我...
    茶點故事閱讀 64,425評論 5 374
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著个束,像睡著了一般慕购。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上茬底,一...
    開封第一講書人閱讀 49,144評論 1 285
  • 那天沪悲,我揣著相機與錄音,去河邊找鬼阱表。 笑死殿如,一個胖子當著我的面吹牛贡珊,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播涉馁,決...
    沈念sama閱讀 38,432評論 3 401
  • 文/蒼蘭香墨 我猛地睜開眼门岔,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了谨胞?” 一聲冷哼從身側(cè)響起固歪,我...
    開封第一講書人閱讀 37,088評論 0 261
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎胯努,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體逢防,經(jīng)...
    沈念sama閱讀 43,586評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡叶沛,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,028評論 2 325
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了忘朝。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片灰署。...
    茶點故事閱讀 38,137評論 1 334
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖局嘁,靈堂內(nèi)的尸體忽然破棺而出溉箕,到底是詐尸還是另有隱情,我是刑警寧澤悦昵,帶...
    沈念sama閱讀 33,783評論 4 324
  • 正文 年R本政府宣布肴茄,位于F島的核電站,受9級特大地震影響但指,放射性物質(zhì)發(fā)生泄漏寡痰。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,343評論 3 307
  • 文/蒙蒙 一棋凳、第九天 我趴在偏房一處隱蔽的房頂上張望拦坠。 院中可真熱鬧,春花似錦剩岳、人聲如沸贞滨。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,333評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽晓铆。三九已至,卻和暖如春莫湘,著一層夾襖步出監(jiān)牢的瞬間尤蒿,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,559評論 1 262
  • 我被黑心中介騙來泰國打工幅垮, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留腰池,地道東北人。 一個月前我還...
    沈念sama閱讀 45,595評論 2 355
  • 正文 我出身青樓,卻偏偏與公主長得像示弓,于是被迫代替她去往敵國和親讳侨。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 42,901評論 2 345