OkHttp源碼流程分析(request篇)

OkHttp現(xiàn)在幾乎已經(jīng)占據(jù)了所有網(wǎng)絡(luò)請求 了解其內(nèi)部原理可以更好的進行擴展妖啥、封裝和優(yōu)化

我們今天分析一下OkHttp源碼 因為流程比較多 所以分為兩篇(請求和響應(yīng))來分析 Okio本文暫時不涉及 后面可能會更新一篇Okio的文章 版本基于4.5.0-RC1

請求方式

fun load() {
      //1.創(chuàng)建請求(包含url,method,headers,body)
      val request = Request
              .Builder()
              .url("")
              .build()
       //2.創(chuàng)建OkHttpClient (包含分發(fā)器磕谅、攔截器柒桑、DNS等)
       val okHttpClient = OkHttpClient.Builder().build()
       //3.創(chuàng)建Call(用于調(diào)用請求)
       val newCall = okHttpClient.newCall(request)
       //4.通過異步請求數(shù)據(jù)
       newCall.enqueue(object :Callback{
           override fun onFailure(call: Call, e: IOException) {}
           override fun onResponse(call: Call, response: Response) {}
       })
       //4.通過同步請求數(shù)據(jù)
       val response =  newCall.execute()
}

我們會按照順序來分析一下請求的流程

前面1,2,3步很多文章已經(jīng)分析過很多遍了 也比較簡單 同學(xué)們可以自己看一下 我們就不再贅述 我們直接看第四步進入今天的主要流程

Okhttp請求分為同步方式和異步方式 不過最終都是殊途同歸 我們以異步的方式分析一下請求流程

enqueue()

話不多說 先看一眼代碼

RealCall.enqueue()->
Dispatcher.enqueue()->
Dispatcher.promoteAndExecute()->
RealCall.executeOn()

override fun enqueue(responseCallback: Callback) {
    synchronized(this) {
        //檢查是否已經(jīng)開始運行
      check(!executed) { "Already Executed" }
      executed = true
    }
    callStart()
    //封裝AsyncCall對象 并放入隊列中等待執(zhí)行
    client.dispatcher.enqueue(AsyncCall(responseCallback))
  }
  
internal fun enqueue(call: AsyncCall) {
    synchronized(this) {
        //放入等待隊列
      readyAsyncCalls.add(call)

      // Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
      // the same host.
      if (!call.call.forWebSocket) {
        val existingCall = findExistingCallWithHost(call.host)
        if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
      }
    }
    //推進執(zhí)行
    promoteAndExecute()
  }
  
//將readyAsyncCalls中合格的請求過渡升級到runningAsyncCalls中
private fun promoteAndExecute(): Boolean {
    this.assertThreadDoesntHoldLock()

    val executableCalls = mutableListOf<AsyncCall>()
    val isRunning: Boolean
    synchronized(this) {
      val i = readyAsyncCalls.iterator()
      while (i.hasNext()) {
        val asyncCall = i.next()

        if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.//運行最大64
        if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue // Host max capacity.每個主機最大5

        i.remove()
        asyncCall.callsPerHost.incrementAndGet()
        executableCalls.add(asyncCall)
        runningAsyncCalls.add(asyncCall)
      }
      isRunning = runningCallsCount() > 0
    }

    for (i in 0 until executableCalls.size) {
      val asyncCall = executableCalls[i]
      asyncCall.executeOn(executorService)
    }

    return isRunning
  }

上面的流程主要是將我們的異步任務(wù)放入隊列中 并且將可以運行的任務(wù)開啟運行

RealCall.executeOn()

fun executeOn(executorService: ExecutorService) {
      client.dispatcher.assertThreadDoesntHoldLock()

      var success = false
      try {
            //將當(dāng)前Runnable放到線程池中運行
        executorService.execute(this)
        success = true
      } catch (e: RejectedExecutionException) {
        val ioException = InterruptedIOException("executor rejected")
        ioException.initCause(e)
        noMoreExchanges(ioException)
        responseCallback.onFailure(this@RealCall, ioException)
      } finally {
        if (!success) {
          client.dispatcher.finished(this) // This call is no longer running!
        }
      }
    }
 
override fun run() {
      threadName("OkHttp ${redactedUrl()}") {
        var signalledCallback = false
        timeout.enter()
        try {
        //終于到這篇文章的重頭戲了
          val response = getResponseWithInterceptorChain()
          signalledCallback = true
          responseCallback.onResponse(this@RealCall, response)
        } catch (e: IOException) {
          if (signalledCallback) {
            // Do not signal the callback twice!
            Platform.get().log("Callback failure for ${toLoggableString()}", Platform.INFO, e)
          } else {
            responseCallback.onFailure(this@RealCall, e)
          }
        } catch (t: Throwable) {
          cancel()
          if (!signalledCallback) {
            val canceledException = IOException("canceled due to $t")
            canceledException.addSuppressed(t)
            responseCallback.onFailure(this@RealCall, canceledException)
          }
          throw t
        } finally {
          client.dispatcher.finished(this)
        }
      }
    }
  }

我們知道OkHttp采用了攔截鏈模式 看一下何為攔截器模式 借用一下大佬的圖(懶得畫圖了??)

3631399-0626631d246373a4.png

我們可以看到 請求鏈會以鏈的形式調(diào)用下去 直到到鏈尾或者return response

getResponseWithInterceptorChain()

 @Throws(IOException::class)
  internal fun getResponseWithInterceptorChain(): Response {
    // Build a full stack of interceptors.
    val interceptors = mutableListOf<Interceptor>()
    //加入我們自己的攔截器
    interceptors += client.interceptors
    interceptors += RetryAndFollowUpInterceptor(client)
    interceptors += BridgeInterceptor(client.cookieJar)
    interceptors += CacheInterceptor(client.cache)
    interceptors += ConnectInterceptor
    if (!forWebSocket) {
        //如果不是websocket的話 加入網(wǎng)絡(luò)攔截器
      interceptors += client.networkInterceptors
    }
    interceptors += CallServerInterceptor(forWebSocket)
    
    val chain = RealInterceptorChain(
        call = this,
        interceptors = interceptors,
        index = 0,
        exchange = null,
        request = originalRequest,
        connectTimeoutMillis = client.connectTimeoutMillis,
        readTimeoutMillis = client.readTimeoutMillis,
        writeTimeoutMillis = client.writeTimeoutMillis
    )

    var calledNoMoreExchanges = false
    try {
        //開始攔截鏈調(diào)用
      val response = chain.proceed(originalRequest)
      if (isCanceled()) {
        response.closeQuietly()
        throw IOException("Canceled")
      }
      return response
    } catch (e: IOException) {
      calledNoMoreExchanges = true
      throw noMoreExchanges(e) as Throwable
    } finally {
      if (!calledNoMoreExchanges) {
        noMoreExchanges(null)
      }
    }
  }
  

@Throws(IOException::class)
  override fun proceed(request: Request): Response {
    check(index < interceptors.size)

    calls++

    if (exchange != null) {
      check(exchange.finder.sameHostAndPort(request.url)) {
        "network interceptor ${interceptors[index - 1]} must retain the same host and port"
      }
      check(calls == 1) {
        "network interceptor ${interceptors[index - 1]} must call proceed() exactly once"
      }
    }

    // Call the next interceptor in the chain.
    //循環(huán)拿取下一個攔截器
    val next = copy(index = index + 1, request = request)
    val interceptor = interceptors[index]

    @Suppress("USELESS_ELVIS")
    //這里會調(diào)用攔截器的方法  接下來我們會分析各個攔截器的作用
    val response = interceptor.intercept(next) ?: throw NullPointerException(
        "interceptor $interceptor returned null")

    if (exchange != null) {
      check(index + 1 >= interceptors.size || next.calls == 1) {
        "network interceptor $interceptor must call proceed() exactly once"
      }
    }

    check(response.body != null) { "interceptor $interceptor returned a response with no body" }

    return response
  }

我們看到攔截器的鏈路模式 其實就是遍歷調(diào)用各攔截器 對Request進行作用 直到return response

我們來逐個分析一下各個攔截器的作用 本文只分析處理Request,response的處理會在下文講解

RetryAndFollowUpInterceptor

作用:處理錯誤 重定向 所以在請求的過程中 沒有做太多事情 下文會分析如何處理錯誤 重定向

這個攔截器在請求的過程中幾乎沒做什么事情 只做了一件事 就是創(chuàng)建ExchangeFinder 這個對象在后面的ConnectInterceptor用來生成exchange對象

fun enterNetworkInterceptorExchange(request: Request, newExchangeFinder: Boolean) {
    check(interceptorScopedExchange == null)
    check(exchange == null) {
      "cannot make a new request because the previous response is still open: " +
          "please call response.close()"
    }

    if (newExchangeFinder) {
      this.exchangeFinder = ExchangeFinder(
          connectionPool,
          createAddress(request.url),
          this,
          eventListener
      )
    }
  }

BridgeInterceptor

作用:添加必要的請求頭信息呛踊、gzip處理等

@Throws(IOException::class)
  override fun intercept(chain: Interceptor.Chain): Response {
    val userRequest = chain.request()
    val requestBuilder = userRequest.newBuilder()

    val body = userRequest.body
    if (body != null) {
      val contentType = body.contentType()
      if (contentType != null) {
            //添加contentType
        requestBuilder.header("Content-Type", contentType.toString())
      }

      val contentLength = body.contentLength()
      if (contentLength != -1L) {
        requestBuilder.header("Content-Length", contentLength.toString())
        requestBuilder.removeHeader("Transfer-Encoding")
      } else {
        requestBuilder.header("Transfer-Encoding", "chunked")
        requestBuilder.removeHeader("Content-Length")
      }
    }

    if (userRequest.header("Host") == null) {
      requestBuilder.header("Host", userRequest.url.toHostHeader())
    }
    //默認keep-Alive
    if (userRequest.header("Connection") == null) {
      requestBuilder.header("Connection", "Keep-Alive")
    }

    // If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing
    // the transfer stream.
    //傳輸流的壓縮方式 默認gzip方式
    var transparentGzip = false
    if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
      transparentGzip = true
      requestBuilder.header("Accept-Encoding", "gzip")
    }

    val cookies = cookieJar.loadForRequest(userRequest.url)
    //添加cookie
    if (cookies.isNotEmpty()) {
      requestBuilder.header("Cookie", cookieHeader(cookies))
    }

    //默認UA
    if (userRequest.header("User-Agent") == null) {
      requestBuilder.header("User-Agent", userAgent)
    }

    val networkResponse = chain.proceed(requestBuilder.build())
    ......handle response
  }

CacheInterceptor

okhttp默認的緩存機制會加快響應(yīng)流程 我們看一下緩存策略 首先 我們要解釋幾個變量

//緩存策略類
class CacheStrategy{
    //如果我們需要請求網(wǎng)絡(luò) 則networkRequest不為null 否則為null
    val networkRequest: Request?
    //請求的返回或者請求的響應(yīng) 如果無法使用緩存(一般是過期或者無緩存 則為null)
    val cacheResponse: Response?
}

上面兩個變量是緩存策略中比較重要的兩個變量 我們會根據(jù)這兩個變量來選擇是否命中緩存
先看一下結(jié)論

networkRequest\cacheResponse cacheResponse is null cacheResponse is not null
networkRequest is null ① 返回HTTP_GATEWAY_TIMEOUT 504錯誤 ② 直接使用緩存
networkRequest is not null ③ 進行網(wǎng)絡(luò)請求 并且緩存新response ④ 先請求 根據(jù)code(304) 判斷是否需要重新request

再看一下intercept()方法 可以對照上面兩個變量的解釋和表格來觀看

override fun intercept(chain: Interceptor.Chain): Response {
    //獲取緩存 如果我們配置了緩存 那么會去查找是否存在cache 
    //這里需要注意的一點是 okhttp默認并不會配置緩存 只是規(guī)范了一套緩存策略 我們可以自己通過OkHttpClient.Builder 的 cache 方法設(shè)置
    val cacheCandidate = cache?.get(chain.request())

    val now = System.currentTimeMillis()

    //這里的策略 會自動判斷是否使用緩存 是否存在緩存
    val strategy = CacheStrategy.Factory(now, chain.request(), cacheCandidate).compute()
    //這里就是上面我們解釋過的兩個變量
    val networkRequest = strategy.networkRequest
    val cacheResponse = strategy.cacheResponse

    cache?.trackResponse(strategy)

    //LruCache沒有hit cache 并且網(wǎng)絡(luò)緩存不可用 
    if (cacheCandidate != null && cacheResponse == null) {
      // The cache candidate wasn't applicable. Close it.
      //關(guān)閉cacheCandidate.body
      cacheCandidate.body?.closeQuietly()
    }

    //按照我們上面對緩存的解釋 不允許使用網(wǎng)絡(luò)請求 并且當(dāng)前沒有緩存 對應(yīng)表格中①
    // If we're forbidden from using the network and the cache is insufficient, fail.
    if (networkRequest == null && cacheResponse == null) {
      return Response.Builder()
          .request(chain.request())
          .protocol(Protocol.HTTP_1_1)
          .code(HTTP_GATEWAY_TIMEOUT)
          .message("Unsatisfiable Request (only-if-cached)")
          .body(EMPTY_RESPONSE)
          .sentRequestAtMillis(-1L)
          .receivedResponseAtMillis(System.currentTimeMillis())
          .build()
    }

    //不允許使用網(wǎng)絡(luò) 僅直接使用緩存 對應(yīng)表格②
    // If we don't need the network, we're done.
    if (networkRequest == null) {
      return cacheResponse!!.newBuilder()
          .cacheResponse(stripBody(cacheResponse))
          .build()
    }

    var networkResponse: Response? = null
    try {
      //使用網(wǎng)絡(luò)
      //對應(yīng)表格③④
      networkResponse = chain.proceed(networkRequest)
    } finally {
      // If we're crashing on I/O or otherwise, don't leak the cache body.
      if (networkResponse == null && cacheCandidate != null) {
        cacheCandidate.body?.closeQuietly()
      }
    }
     ......緩存新 response
   }

我們看完上面的代碼 發(fā)現(xiàn)所有的緩存策略都是根據(jù)networkRequestcacheResponse兩個變量進行控制的 接下來我們看一下緩存策略的生成過程

fun compute(): CacheStrategy {

        class Factory(
            private val nowMillis: Long,
            internal val request: Request,
            private val cacheResponse: Response? //這邊的cacheResponse和我們上面講的還不太一樣 這邊完全是Cache類中緩存的對象 如果我們之前請求過 并且緩存 則不為null
        )
        
       //根據(jù)request的cache-control 和response 的cache-control判斷
      val candidate = computeCandidate()

      //使用網(wǎng)絡(luò)請求 但是請求的request的cache-control是onlyIfCached 表示僅使用緩存
      //這就是個悖論了 所以直接返回504 對應(yīng)表格①
      // We're forbidden from using the network and the cache is insufficient.
      if (candidate.networkRequest != null && request.cacheControl.onlyIfCached) {
        return CacheStrategy(null, null)
      }

      return candidate
    }
    
    
    private fun computeCandidate(): CacheStrategy {
      // No cached response.
      //這里的cacheResponse是緩存中命中的  所以如果為null 表示之前沒有緩存
      if (cacheResponse == null) {
        return CacheStrategy(request, null)
      }

      // Drop the cached response if it's missing a required handshake.
      //如果缺少tls握手 直接請求網(wǎng)絡(luò)
      if (request.isHttps && cacheResponse.handshake == null) {
        return CacheStrategy(request, null)
      }

      // If this response shouldn't have been stored, it should never be used as a response source.
      // This check should be redundant as long as the persistence store is well-behaved and the
      // rules are constant.
      //根據(jù)cacheResponse的code 判斷是否允許cache 
      //判斷expires是否過期 并且request和reponse的cache-control都是noStore 
      //這里就不往下追蹤了 感興趣的同學(xué)可以自己閱讀一下
      if (!isCacheable(cacheResponse, request)) {
        return CacheStrategy(request, null)
      }

        //如果是nocache或者根據(jù)If-Modified-Since來判斷
        //If-Modified-Since會重新請求服務(wù)器 然后獲取last-modified-since 判斷是否修改過
      val requestCaching = request.cacheControl
      if (requestCaching.noCache || hasConditions(request)) {
        return CacheStrategy(request, null)
      }

      val responseCaching = cacheResponse.cacheControl

      val ageMillis = cacheResponseAge()
      var freshMillis = computeFreshnessLifetime()

      if (requestCaching.maxAgeSeconds != -1) {
        freshMillis = minOf(freshMillis, SECONDS.toMillis(requestCaching.maxAgeSeconds.toLong()))
      }

      var minFreshMillis: Long = 0
      if (requestCaching.minFreshSeconds != -1) {
        minFreshMillis = SECONDS.toMillis(requestCaching.minFreshSeconds.toLong())
      }

      var maxStaleMillis: Long = 0
      if (!responseCaching.mustRevalidate && requestCaching.maxStaleSeconds != -1) {
        maxStaleMillis = SECONDS.toMillis(requestCaching.maxStaleSeconds.toLong())
      }
        
      if (!responseCaching.noCache && ageMillis + minFreshMillis < freshMillis + maxStaleMillis) {
        val builder = cacheResponse.newBuilder()
        if (ageMillis + minFreshMillis >= freshMillis) {
          builder.addHeader("Warning", "110 HttpURLConnection \"Response is stale\"")
        }
        val oneDayMillis = 24 * 60 * 60 * 1000L
        if (ageMillis > oneDayMillis && isFreshnessLifetimeHeuristic()) {
          builder.addHeader("Warning", "113 HttpURLConnection \"Heuristic expiration\"")
        }
        return CacheStrategy(null, builder.build())
      }

      // Find a condition to add to the request. If the condition is satisfied, the response body
      // will not be transmitted.
      val conditionName: String
      val conditionValue: String?
      when {
        etag != null -> {
          conditionName = "If-None-Match"
          conditionValue = etag
        }

        lastModified != null -> {
          conditionName = "If-Modified-Since"
          conditionValue = lastModifiedString
        }

        servedDate != null -> {
          conditionName = "If-Modified-Since"
          conditionValue = servedDateString
        }
         //這里會重新request 然后判斷modified-time
        else -> return CacheStrategy(request, null) // No condition! Make a regular request.
      }

      val conditionalRequestHeaders = request.headers.newBuilder()
      conditionalRequestHeaders.addLenient(conditionName, conditionValue!!)

      val conditionalRequest = request.newBuilder()
          .headers(conditionalRequestHeaders.build())
          .build()
          //直接使用緩存
      return CacheStrategy(conditionalRequest, cacheResponse)
    }

我們可以根據(jù)上面的判斷,確定緩存策略
大致是根據(jù)cache-control或者handshake是否過期來判斷是否需要重新request
例如:
noStore,noCache,If-Modified-Since等等 所有的 cache-control可以參考這篇

如果在CacheInterceptor中hit cache的話 就不會再往下面的攔截器傳遞 而是直接原路返回 return response

ConnectInterceptor

作用:負責(zé)與服務(wù)器連接 這個攔截器的過程分析其實相當(dāng)復(fù)雜
簡單來說流程是從連接池中查找連接 如果不存在 就創(chuàng)建連接 并完成TCP,TLS握手
然后等待下一個CallServerInterceptor進行數(shù)據(jù)的交互

我們分析一下源碼 攔截器里的代碼真的很少 不過不要被表象欺騙了?? 我第一次看OkHttp源碼時 看到這里直接就跳過了 然后分析了CallServerInterceptor源碼之后 發(fā)現(xiàn)沒有獲取連接過程

 override fun intercept(chain: Interceptor.Chain): Response {
    val realChain = chain as RealInterceptorChain
    //獲取exchange對象 exchange是我們用來和服務(wù)端交互的對象封裝 看一下initExchange方法
    val exchange = realChain.call.initExchange(chain)
    val connectedChain = realChain.copy(exchange = exchange)
    return connectedChain.proceed(realChain.request)
  }

initExchange()中主要會調(diào)用ExchangeFinder#find()然后根據(jù)下面的調(diào)用鏈
ExchangeFinder#find()->
ExchangeFinder#findHealthyConnection->
ExchangeFinder#findConnection
然后我們看一下findConnection()這個方法內(nèi)部就實現(xiàn)了connection的查找或創(chuàng)建
前方高能 下面代碼會又臭又長??

private fun findConnection(
    connectTimeout: Int,
    readTimeout: Int,
    writeTimeout: Int,
    pingIntervalMillis: Int,
    connectionRetryEnabled: Boolean
  ): RealConnection {
    var foundPooledConnection = false
    var result: RealConnection? = null
    var selectedRoute: Route? = null
    var releasedConnection: RealConnection?
    val toClose: Socket?
    synchronized(connectionPool) {
      if (call.isCanceled()) throw IOException("Canceled")

      val callConnection = call.connection // changes within this overall method
      releasedConnection = callConnection
      //如果url不一致或者callConnection為null 就斷開鏈接
      toClose = if (callConnection != null && (callConnection.noNewExchanges ||
              !sameHostAndPort(callConnection.route().address.url))) {
        call.releaseConnectionNoEvents()
      } else {
        null
      }

      if (call.connection != null) {
        // We had an already-allocated connection and it's good.
        //經(jīng)過判斷上面驗證 如果不為null 發(fā)現(xiàn)當(dāng)前connection可用 那么就會直接復(fù)用connection
        result = call.connection
        releasedConnection = null
      }

      if (result == null) {
        // The connection hasn't had any problems for this call.
        refusedStreamCount = 0
        connectionShutdownCount = 0
        otherFailureCount = 0

        // Attempt to get a connection from the pool.
        //第一次試從連接池獲取
        if (connectionPool.callAcquirePooledConnection(address, call, null, false)) {
          foundPooledConnection = true
          result = call.connection
        } else if (nextRouteToTry != null) {
          selectedRoute = nextRouteToTry
          nextRouteToTry = null
        }
      }
    }
    toClose?.closeQuietly()

    if (releasedConnection != null) {
      eventListener.connectionReleased(call, releasedConnection!!)
    }
    if (foundPooledConnection) {
      eventListener.connectionAcquired(call, result!!)
    }
    if (result != null) {
      // If we found an already-allocated or pooled connection, we're done.
      return result!!
    }

    // If we need a route selection, make one. This is a blocking operation.
    // 查看是否有新的路由信息
    var newRouteSelection = false
    if (selectedRoute == null && (routeSelection == null || !routeSelection!!.hasNext())) {
      var localRouteSelector = routeSelector
      if (localRouteSelector == null) {
        localRouteSelector = RouteSelector(address, call.client.routeDatabase, call, eventListener)
        this.routeSelector = localRouteSelector
      }
      newRouteSelection = true
      routeSelection = localRouteSelector.next()
    }

    var routes: List<Route>? = null
    synchronized(connectionPool) {
      if (call.isCanceled()) throw IOException("Canceled")

      if (newRouteSelection) {
        // Now that we have a set of IP addresses, make another attempt at getting a connection from
        // the pool. This could match due to connection coalescing.
        routes = routeSelection!!.routes
        // 如果有新的路由 繼續(xù)從連接池中查找試試
        if (connectionPool.callAcquirePooledConnection(address, call, routes, false)) {
          foundPooledConnection = true
          result = call.connection
        }
      }

      if (!foundPooledConnection) {
        if (selectedRoute == null) {
          selectedRoute = routeSelection!!.next()
        }

        // Create a connection and assign it to this allocation immediately. This makes it possible
        // for an asynchronous cancel() to interrupt the handshake we're about to do.
        result = RealConnection(connectionPool, selectedRoute!!)
        connectingConnection = result
      }
    }

    // If we found a pooled connection on the 2nd time around, we're done.
    if (foundPooledConnection) {
      eventListener.connectionAcquired(call, result!!)
      return result!!
    }

    // Do TCP + TLS handshakes. This is a blocking operation.
    //現(xiàn)在發(fā)現(xiàn)可能連接池中沒有我們要的connection
    //進行TCP和TLS連接
    result!!.connect(
        connectTimeout,
        readTimeout,
        writeTimeout,
        pingIntervalMillis,
        connectionRetryEnabled,
        call,
        eventListener
    )
    call.client.routeDatabase.connected(result!!.route())

    var socket: Socket? = null
    synchronized(connectionPool) {
      connectingConnection = null
      // Last attempt at connection coalescing, which only occurs if we attempted multiple
      // concurrent connections to the same host.
      //最后一次嘗試從連接池中獲取 如果能獲取到 就使用連接池中 否則使用連接的connection 并且放入連接池中
      if (connectionPool.callAcquirePooledConnection(address, call, routes, true)) {
        // We lost the race! Close the connection we created and return the pooled connection.
        result!!.noNewExchanges = true
        socket = result!!.socket()
        result = call.connection

        // It's possible for us to obtain a coalesced connection that is immediately unhealthy. In
        // that case we will retry the route we just successfully connected with.
        nextRouteToTry = selectedRoute
      } else {
        //放入連接池中
        connectionPool.put(result!!)
        call.acquireConnectionNoEvents(result!!)
      }
    }
    socket?.closeQuietly()

    eventListener.connectionAcquired(call, result!!)
    return result!!
  }

上面的注釋寫的也比較多 流程其實也比較清晰了 我們接下來分析一下 如何從連接池中查找以及如何建立連接

//從連接池中獲取
fun callAcquirePooledConnection(
    address: Address,
    call: RealCall,
    routes: List<Route>?,
    requireMultiplexed: Boolean
  ): Boolean {
    this.assertThreadHoldsLock()

    for (connection in connections) {
        //判斷connection是否支持多路復(fù)用
      if (requireMultiplexed && !connection.isMultiplexed) continue
      //判斷connection的host是否匹配
      if (!connection.isEligible(address, routes)) continue
      call.acquireConnectionNoEvents(connection)
      return true
    }
    return false
  }

還有一個TCP和TLS握手流程

fun connect(
    connectTimeout: Int,
    readTimeout: Int,
    writeTimeout: Int,
    pingIntervalMillis: Int,
    connectionRetryEnabled: Boolean,
    call: Call,
    eventListener: EventListener
  ) {
    check(protocol == null) { "already connected" }

    var routeException: RouteException? = null
    val connectionSpecs = route.address.connectionSpecs
    val connectionSpecSelector = ConnectionSpecSelector(connectionSpecs)
    ......

    while (true) {
      try {
          //注釋的意思是 如果是通過HTTP代理HTTPS 那么需要連接Tunnel  
          //這里我是在是沒看懂什么意思 告辭?? 有知道的大佬可以留言告訴我一下
        if (route.requiresTunnel()) {
          connectTunnel(connectTimeout, readTimeout, writeTimeout, call, eventListener)
          if (rawSocket == null) {
            // We were unable to connect the tunnel but properly closed down our resources.
            break
          }
        } else {
          //先建立socket連接 包括代理的配置
          connectSocket(connectTimeout, readTimeout, call, eventListener)
        }
        //如果是Http2協(xié)議還會創(chuàng)建Http2連接 或者 TLS握手 
        //TLS流程大家可以參考一下我之前寫的一篇文章
        establishProtocol(connectionSpecSelector, pingIntervalMillis, call, eventListener)
        eventListener.connectEnd(call, route.socketAddress, route.proxy, protocol)
        break
      } catch (e: IOException) {
        socket?.closeQuietly()
        rawSocket?.closeQuietly()
        socket = null
        rawSocket = null
        source = null
        sink = null
        handshake = null
        protocol = null
        http2Connection = null
        allocationLimit = 1

        eventListener.connectFailed(call, route.socketAddress, route.proxy, null, e)

        if (routeException == null) {
          routeException = RouteException(e)
        } else {
          routeException.addConnectException(e)
        }

        if (!connectionRetryEnabled || !connectionSpecSelector.connectionFailed(e)) {
          throw routeException
        }
      }
    }

    if (route.requiresTunnel() && rawSocket == null) {
      throw RouteException(ProtocolException(
          "Too many tunnel connections attempted: $MAX_TUNNEL_ATTEMPTS"))
    }

    idleAtNs = System.nanoTime()
  }

TLS握手流程有疑問的同學(xué) 可以看一下我之前寫的一篇文章吶 小飛機??覺得還可以可以點個贊哦

現(xiàn)在connection就獲取完成了 接下來就是與服務(wù)器的交互啦
還有創(chuàng)建了新connection之后會putConnectionPool中 其中還有一個clean操作 同學(xué)們可以自己看一下代碼吶

最后還有一個小點需要說明一下 因為關(guān)系到我們下一個攔截器的閱讀
在我們最上面的說到 我們會通過ExchangeFinder#find來生成ExchangeCodec對象

首先我們解釋一下ExchangeCodec作用okhttp會使用ExchangeCodec封裝了與服務(wù)器的IO操作
ExchangeCodec的實現(xiàn)類分別對應(yīng)協(xié)議是Http1ExchangeCodecHttp2ExchangeCodec

看一下find方法實現(xiàn)

fun find(
    client: OkHttpClient,
    chain: RealInterceptorChain
  ): ExchangeCodec {
    try {
      val resultConnection = findHealthyConnection(
          connectTimeout = chain.connectTimeoutMillis,
          readTimeout = chain.readTimeoutMillis,
          writeTimeout = chain.writeTimeoutMillis,
          pingIntervalMillis = client.pingIntervalMillis,
          connectionRetryEnabled = client.retryOnConnectionFailure,
          doExtensiveHealthChecks = chain.request.method != "GET"
      )
      //newCodec方法會對應(yīng)不同的HTTP協(xié)議生成ExchangeCodeC對象
      return resultConnection.newCodec(client, chain)
    } catch (e: RouteException) {
      trackFailure(e.lastConnectException)
      throw e
    } catch (e: IOException) {
      trackFailure(e)
      throw RouteException(e)
    }
  }

Http2

在閱讀CallServerInterceptor之前 我們有必要看一下Http2.0相關(guān)知識 因為在CallServerInterceptor中會根據(jù)不同的Http協(xié)議 使用不同的傳輸方式 我們看一下Http2.0發(fā)展的幾個階段

  • HTTP1.x 一個tcp連接只支持一個請求,單向,只能由客戶端發(fā)起
  • SPDY支持多路復(fù)用(Tcp連接復(fù)用),header壓縮,強制使用Https,服務(wù)端推送
  • HTTP2.0 支持明文和加密傳輸,優(yōu)化header壓縮算法,支持SPDY現(xiàn)有功能,全雙工
  • Quic基于UDP實現(xiàn)穩(wěn)定傳輸協(xié)議 弱網(wǎng)有優(yōu)化

CallServerInterceptor

作用:負責(zé)與服務(wù)器進行數(shù)據(jù)交互

在了解和服務(wù)器交互的流程之前 我想先介紹一下okio 這是Square公司開發(fā)的一款對java輸入輸出流的封裝框架
JAVA輸入輸出流真的是非常的復(fù)雜 子類繁多 而okio主要分為兩個接口SinkSource 分別對應(yīng)輸出和輸入相關(guān)

接下來我們看一下實現(xiàn)代碼 流程也比較簡單 就是發(fā)送請求+獲取響應(yīng)

 @Throws(IOException::class)
  override fun intercept(chain: Interceptor.Chain): Response {
    val realChain = chain as RealInterceptorChain
    val exchange = realChain.exchange!!
    val request = realChain.request
    val requestBody = request.body
    val sentRequestMillis = System.currentTimeMillis()
    //將header寫入socket
    exchange.writeRequestHeaders(request)

    var invokeStartEvent = true
    var responseBuilder: Response.Builder? = null
    if (HttpMethod.permitsRequestBody(request.method) && requestBody != null) {
      // If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100
      // Continue" response before transmitting the request body. If we don't get that, return
      // what we did get (such as a 4xx response) without ever transmitting the request body.
      if ("100-continue".equals(request.header("Expect"), ignoreCase = true)) {
        exchange.flushRequest()
        responseBuilder = exchange.readResponseHeaders(expectContinue = true)
        exchange.responseHeadersStart()
        invokeStartEvent = false
      }
      if (responseBuilder == null) {
        //如果支持復(fù)用 傳輸request body
        if (requestBody.isDuplex()) {
          // Prepare a duplex body so that the application can send a request body later.
          exchange.flushRequest()
          val bufferedRequestBody = exchange.createRequestBody(request, true).buffer()
          requestBody.writeTo(bufferedRequestBody)
        } else {
          // Write the request body if the "Expect: 100-continue" expectation was met.
          val bufferedRequestBody = exchange.createRequestBody(request, false).buffer()
          requestBody.writeTo(bufferedRequestBody)
          bufferedRequestBody.close()
        }
      } else {
        exchange.noRequestBody()
        if (!exchange.connection.isMultiplexed) {
          // If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection
          // from being reused. Otherwise we're still obligated to transmit the request body to
          // leave the connection in a consistent state.
          exchange.noNewExchangesOnConnection()
        }
      }
    } else {
      exchange.noRequestBody()
    }
    //request完成
    if (requestBody == null || !requestBody.isDuplex()) {
      exchange.finishRequest()
    }
    //獲取reponse header
    if (responseBuilder == null) {
      responseBuilder = exchange.readResponseHeaders(expectContinue = false)!!
      if (invokeStartEvent) {
        exchange.responseHeadersStart()
        invokeStartEvent = false
      }
    }
    var response = responseBuilder
        .request(request)
        .handshake(exchange.connection.handshake())
        .sentRequestAtMillis(sentRequestMillis)
        .receivedResponseAtMillis(System.currentTimeMillis())
        .build()
    var code = response.code
    if (code == 100) {
      // Server sent a 100-continue even though we did not request one. Try again to read the actual
      // response status.
      //100表示繼續(xù)獲取
      responseBuilder = exchange.readResponseHeaders(expectContinue = false)!!
      if (invokeStartEvent) {
        exchange.responseHeadersStart()
      }
      response = responseBuilder
          .request(request)
          .handshake(exchange.connection.handshake())
          .sentRequestAtMillis(sentRequestMillis)
          .receivedResponseAtMillis(System.currentTimeMillis())
          .build()
      code = response.code
    }

    exchange.responseHeadersEnd(response)

    response = if (forWebSocket && code == 101) {
      // Connection is upgrading, but we need to ensure interceptors see a non-null response body.
      response.newBuilder()
          .body(EMPTY_RESPONSE)
          .build()
    } else {
      //獲取response body
      response.newBuilder()
          .body(exchange.openResponseBody(response))
          .build()
    }
    if ("close".equals(response.request.header("Connection"), ignoreCase = true) ||
        "close".equals(response.header("Connection"), ignoreCase = true)) {
      exchange.noNewExchangesOnConnection()
    }
    if ((code == 204 || code == 205) && response.body?.contentLength() ?: -1L > 0L) {
      throw ProtocolException(
          "HTTP $code had non-zero Content-Length: ${response.body?.contentLength()}")
    }
    return response
  }

上面的代碼 流程也比較簡單 就是request+response
我們分析一下分析一下寫入Request流程

ExchangeCodec#writeRequestHeaders(request)

傳輸頭部在HTTP1.x和HTTP2.0有點區(qū)別,HTTP1.x就直接將header通過寫入Sink Buffer,而HTTP2.0會先創(chuàng)建http2Connection.newStream()對象

@Synchronized @Throws(IOException::class)
  fun headers(
    outFinished: Boolean,
    streamId: Int,
    headerBlock: List<Header>
  ) {
    if (closed) throw IOException("closed")
    //Hpack壓縮算法 將壓縮后數(shù)據(jù)存入hpackBuffer
    hpackWriter.writeHeaders(headerBlock)

    val byteCount = hpackBuffer.size
    val length = minOf(maxFrameSize.toLong(), byteCount)
    var flags = if (byteCount == length) FLAG_END_HEADERS else 0
    if (outFinished) flags = flags or FLAG_END_STREAM
    //HTTP2.0特性 幀傳輸
    frameHeader(
        streamId = streamId,
        length = length.toInt(),
        type = TYPE_HEADERS,
        flags = flags
    )
    sink.write(hpackBuffer, length)

    if (byteCount > length) writeContinuationFrames(streamId, byteCount - length)
  }

這里有兩個可以HTTP2.0特性可以關(guān)注一下

  • HPACK壓縮
  • 傳輸幀

總結(jié)

OkHttp的Request流程分析就到此結(jié)束了 接下來會接著分析一下Response的處理 覺得有收獲的同學(xué)們歡迎點贊留言啊

參考

OKio - 重新定義了“短小精悍”的IO框架
淺析HTTPS握手流程

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末板祝,一起剝皮案震驚了整個濱河市献起,隨后出現(xiàn)的幾起案子口蝠,更是在濱河造成了極大的恐慌涨椒,老刑警劉巖摊鸡,帶你破解...
    沈念sama閱讀 216,372評論 6 498
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異蚕冬,居然都是意外死亡免猾,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,368評論 3 392
  • 文/潘曉璐 我一進店門囤热,熙熙樓的掌柜王于貴愁眉苦臉地迎上來猎提,“玉大人,你說我怎么就攤上這事旁蔼∠撬眨” “怎么了疙教?”我有些...
    開封第一講書人閱讀 162,415評論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長伞租。 經(jīng)常有香客問我贞谓,道長,這世上最難降的妖魔是什么葵诈? 我笑而不...
    開封第一講書人閱讀 58,157評論 1 292
  • 正文 為了忘掉前任裸弦,我火速辦了婚禮,結(jié)果婚禮上作喘,老公的妹妹穿的比我還像新娘理疙。我一直安慰自己,他們只是感情好泞坦,可當(dāng)我...
    茶點故事閱讀 67,171評論 6 388
  • 文/花漫 我一把揭開白布沪斟。 她就那樣靜靜地躺著,像睡著了一般暇矫。 火紅的嫁衣襯著肌膚如雪主之。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,125評論 1 297
  • 那天李根,我揣著相機與錄音槽奕,去河邊找鬼。 笑死房轿,一個胖子當(dāng)著我的面吹牛粤攒,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播囱持,決...
    沈念sama閱讀 40,028評論 3 417
  • 文/蒼蘭香墨 我猛地睜開眼夯接,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了纷妆?” 一聲冷哼從身側(cè)響起盔几,我...
    開封第一講書人閱讀 38,887評論 0 274
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎掩幢,沒想到半個月后逊拍,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,310評論 1 310
  • 正文 獨居荒郊野嶺守林人離奇死亡际邻,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,533評論 2 332
  • 正文 我和宋清朗相戀三年芯丧,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片世曾。...
    茶點故事閱讀 39,690評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡缨恒,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情骗露,我是刑警寧澤岭佳,帶...
    沈念sama閱讀 35,411評論 5 343
  • 正文 年R本政府宣布,位于F島的核電站椒袍,受9級特大地震影響驼唱,放射性物質(zhì)發(fā)生泄漏藻茂。R本人自食惡果不足惜驹暑,卻給世界環(huán)境...
    茶點故事閱讀 41,004評論 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望辨赐。 院中可真熱鬧优俘,春花似錦、人聲如沸掀序。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,659評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽不恭。三九已至叶雹,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間换吧,已是汗流浹背折晦。 一陣腳步聲響...
    開封第一講書人閱讀 32,812評論 1 268
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留沾瓦,地道東北人满着。 一個月前我還...
    沈念sama閱讀 47,693評論 2 368
  • 正文 我出身青樓,卻偏偏與公主長得像贯莺,于是被迫代替她去往敵國和親风喇。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,577評論 2 353

推薦閱讀更多精彩內(nèi)容