「okhttp3 4.9.3 版本簡單解析」

「okhttp3 4.9.3 版本簡單解析」

一、寫在前面

關(guān)于okhttp3的解析網(wǎng)上已經(jīng)有非常多優(yōu)秀的博文了,每每看完都覺得醍醐灌頂遮精,豁然開朗。但等不了幾天再回頭看败潦,還是跟當(dāng)初一樣陌生本冲,究其根本原因准脂,我們不過是在享受著別人的成果跟著別人的思路云閱讀源碼了一遍。okhttp從早期的Java版本到Kotlin版本一直不斷優(yōu)化升級檬洞,實(shí)現(xiàn)細(xì)節(jié)上也作出了調(diào)整意狠。重讀源碼加上自身的思考能深刻的理解okhttp的實(shí)現(xiàn)原理。

二疮胖、從基本使用說起
  • 項(xiàng)目中引入依賴环戈,Github地址okhttp
dependencies {
  //...
  implementation("com.squareup.okhttp3:okhttp:4.9.3")
}
  • 從一個(gè)簡單的同步(execute)請求入手分析整體的加載流程:
private fun clientNetWork() {
     val request: Request = Request.Builder()
         .url("https://www.baidu.com")
         .build()
     OkHttpClient().newCall(request).execute()
}

execute()開始,發(fā)現(xiàn)其實(shí)是一個(gè)接口中的方法(Call)澎灸,這個(gè)很好理解根據(jù)官方的解釋院塞,Call其實(shí)是一個(gè)待執(zhí)行的請求,并且這個(gè)請求所要的參數(shù)已經(jīng)被準(zhǔn)備好性昭;當(dāng)然既然是請求拦止,那么它是可以被取消的。其次代表單個(gè)請求與響應(yīng)流糜颠,因此不能夠被再次執(zhí)行汹族。

A call is a request that has been prepared for execution. A call can be canceled. As this object represents a single request/response pair (stream), it cannot be executed twice.

Call接口具體的代碼實(shí)現(xiàn),重點(diǎn)關(guān)注同步執(zhí)行方法execute()與異步請求enqueue()

interface Call : Cloneable {
  fun request(): Request
  //同步請求
  @Throws(IOException::class)
  fun execute(): Response
  //起步請求
  fun enqueue(responseCallback: Callback)
  
  fun cancel()
  
  fun isExecuted(): Boolean
  
  fun isCanceled(): Boolean
  
  fun interface Factory {
   fun newCall(request: Request): Call
  }
}

Call作為接口其兴,那么具體的實(shí)現(xiàn)細(xì)節(jié)則需要看它的實(shí)現(xiàn)類顶瞒,RealCall作為Call的實(shí)現(xiàn)類,先找到對execute()的重寫元旬。

  • RealCall-是應(yīng)用程序與網(wǎng)絡(luò)之間的橋梁榴徐,看看具體有什么作用,首先分析同步請求execute()方法的實(shí)現(xiàn):
override fun execute(): Response {
  check(executed.compareAndSet(false, true)) { "Already Executed" } //#1
  timeout.enter()//#2
  callStart()//#3
  try {
    client.dispatcher.executed(this)//#4
    return getResponseWithInterceptorChain()//#5
  } finally {
    client.dispatcher.finished(this)//#6
  }
}

代碼很少匀归,逐步分析坑资,首先對同步請求進(jìn)行了檢查-判斷請求是否已經(jīng)被執(zhí)行過了;而這里使用的是并發(fā)包下的原子類CAS樂觀鎖穆端,這里使用CAS比較算法目的也是為提升效率袱贮。其次是超時(shí)時(shí)間的判斷,這個(gè)比較簡單体啰。在看callStart()的具體實(shí)現(xiàn)攒巍。上代碼:

  private fun callStart() {
    this.callStackTrace = Platform.get().getStackTraceForCloseable("response.body().close()")
    eventListener.callStart(this)
  }

看名稱猜測應(yīng)該是事件監(jiān)聽之類的,可能是包括一些信息的記錄與打印狡赐∫ひ担回到RealCall類中,看看這個(gè)eventListener的作用到底是什么:

internal val eventListener: EventListener = client.eventListenerFactory.create(this)

繼續(xù)向下可以知道這個(gè)EventListener是一個(gè)抽象類枕屉,而項(xiàng)目中其唯一實(shí)現(xiàn)類為LoggingEventListener常柄,猜測還是有依據(jù)的,繼續(xù)往下看:

abstract class EventListener {
    open fun callStart(
    call: Call
  ) {
  }
  //省略了其他的方法
}

實(shí)現(xiàn)類LoggingEventListener中對此方法的具體實(shí)現(xiàn):

class LoggingEventListener private constructor(
  private val logger: HttpLoggingInterceptor.Logger
) : EventListener() {
    override fun callStart(call: Call) {
    startNs = System.nanoTime()
    logWithTime("callStart: ${call.request()}")
  }
}

總結(jié)這個(gè)callStart()的作用,當(dāng)同步請求或者異步請求被加到隊(duì)列時(shí)西潘,callStart()會(huì)被立即執(zhí)行(在沒有達(dá)到線程限制的情況下)記錄請求開始的時(shí)間與請求的一些信息卷玉。如下:

  override fun callStart(call: Call) {
    startNs = System.nanoTime()
    logWithTime("callStart: ${call.request()}")
  }
  
    private fun logWithTime(message: String) {
    val timeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs)
    logger.log("[$timeMs ms] $message")
  }

代賣第四段#4 client.dispatcher.executed(this),看樣子是在這里開啟執(zhí)行的喷市,可實(shí)際真是如此嘛相种?回到OkHttpClient類中,看看這個(gè)分發(fā)器dispatcher到底是什么品姓。

@get:JvmName("dispatcher") val dispatcher: Dispatcher = builder.dispatcher

具體實(shí)現(xiàn)類Dispatcher代碼(保留重要代碼):

class Dispatcher constructor() {
  /** Ready async calls in the order they'll be run. */
  private val readyAsyncCalls = ArrayDeque<AsyncCall>()
  /** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
  private val runningAsyncCalls = ArrayDeque<AsyncCall>()
  /** Running synchronous calls. Includes canceled calls that haven't finished yet. */
  private val runningSyncCalls = ArrayDeque<RealCall>()

  constructor(executorService: ExecutorService) : this() {
    this.executorServiceOrNull = executorService
  }
    @get:Synchronized var maxRequests = 64
    set(maxRequests) {
      require(maxRequests >= 1) { "max < 1: $maxRequests" }
      synchronized(this) {
        field = maxRequests
      }
      promoteAndExecute()
    }
    @get:Synchronized var maxRequestsPerHost = 5
    set(maxRequestsPerHost) {
      require(maxRequestsPerHost >= 1) { "max < 1: $maxRequestsPerHost" }
      synchronized(this) {
        field = maxRequestsPerHost
      }
      promoteAndExecute()
    }
    //異步請求
    internal fun enqueue(call: AsyncCall) {
    synchronized(this) {
      readyAsyncCalls.add(call)
      //Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to the same host.
      if (!call.call.forWebSocket) {
        val existingCall = findExistingCallWithHost(call.host)
        if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
      }
    }
    promoteAndExecute()
  }
  //同步請求
  /** Used by [Call.execute] to signal it is in-flight. */
  @Synchronized internal fun executed(call: RealCall) {
    runningSyncCalls.add(call)
  }
    private fun promoteAndExecute(): Boolean {
    this.assertThreadDoesntHoldLock()
    val executableCalls = mutableListOf<AsyncCall>()
    val isRunning: Boolean
    synchronized(this) {
      val i = readyAsyncCalls.iterator()
      while (i.hasNext()) {
        val asyncCall = i.next()
        //Max capacity.
        if (runningAsyncCalls.size >= this.maxRequests) break 
        //Host max capacity.
        if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue 
        i.remove()
        asyncCall.callsPerHost.incrementAndGet()
        executableCalls.add(asyncCall)
        runningAsyncCalls.add(asyncCall)
      }
      isRunning = runningCallsCount() > 0
    }
    for (i in 0 until executableCalls.size) {
      val asyncCall = executableCalls[I]
      asyncCall.executeOn(executorService)
    }
    return isRunning
  }
}

根據(jù)注釋的信息可以知道寝并,Dispatcher是處理異步請求的執(zhí)行的策略,當(dāng)然開發(fā)可以實(shí)現(xiàn)自己的策略腹备。

Policy on when async requests are executed.
Each dispatcher uses an ExecutorService to run calls internally. If you supply your own executor, it should be able to run the configured maximum number of calls concurrently.

知道了Dispatcher的作用衬潦,再回到client.dispatcher.executed(this),也即:

  /** Used by [Call.execute] to signal it is in-flight. */
  @Synchronized internal fun executed(call: RealCall) {
    runningSyncCalls.add(call)
  }

結(jié)合execute()Dispatcher分析

  • Dispatcher主要是對異步請求的調(diào)度作用植酥,內(nèi)部實(shí)現(xiàn)了線程池镀岛,對請求進(jìn)行分發(fā)執(zhí)行。
  • 包含三個(gè)雙端隊(duì)列分別為等待的異步隊(duì)列readyAsyncCalls友驮、執(zhí)行中的隊(duì)列runningAsyncCalls漂羊、執(zhí)行中的同步隊(duì)列runningSyncCalls。這里使用ArrayDeque卸留,為什么不使用LinkedList有興趣的可以思考一下走越。
  • 當(dāng)請求為同步請求時(shí),內(nèi)部執(zhí)行execute()RealCall添加到runningSyncCalls隊(duì)列中艾猜。

到這里請求其實(shí)還沒有真正的執(zhí)行买喧,只是在做一些前期的工作捻悯,回到Call接口中看看對方法同步請求方法execute()的說明:同步請求可以立即執(zhí)行匆赃,阻塞直到返回正確的結(jié)果,或者報(bào)錯(cuò)結(jié)束今缚。

Invokes the request immediately, and blocks until the response can be processed or is in error.

#4步執(zhí)行后算柳,return getResponseWithInterceptorChain() //#5這個(gè)方法才是請求一步步推進(jìn)的核心。也是okhttp網(wǎng)絡(luò)請求責(zé)任鏈的核心模塊姓言。

三瞬项、鏈的開啟-getResponseWithInterceptorChain()
  • getResponseWithInterceptorChain()具體實(shí)現(xiàn)
  @Throws(IOException::class)
  internal fun getResponseWithInterceptorChain(): Response {
    //Build a full stack of interceptors.
    val interceptors = mutableListOf<Interceptor>()
    interceptors += client.interceptors
    interceptors += RetryAndFollowUpInterceptor(client)
    interceptors += BridgeInterceptor(client.cookieJar)
    interceptors += CacheInterceptor(client.cache)
    interceptors += ConnectInterceptor
    if (!forWebSocket) {
      interceptors += client.networkInterceptors
    }
    interceptors += CallServerInterceptor(forWebSocket)
    val chain = RealInterceptorChain(
        call = this,
        interceptors = interceptors,
        index = 0,
        exchange = null,
        request = originalRequest,
        connectTimeoutMillis = client.connectTimeoutMillis,
        readTimeoutMillis = client.readTimeoutMillis,
        writeTimeoutMillis = client.writeTimeoutMillis
    )
    var calledNoMoreExchanges = false
    try {
      val response = chain.proceed(originalRequest)
      if (isCanceled()) {
        response.closeQuietly()
        throw IOException("Canceled")
      }
      return response
    } catch (e: IOException) {
      calledNoMoreExchanges = true
      throw noMoreExchanges(e) as Throwable
    } finally {
      if (!calledNoMoreExchanges) {
        noMoreExchanges(null)
      }
    }
  }

分析getResponseWithInterceptorChain()方法之前有必要看看OkHttpClient的構(gòu)造參數(shù),使用的Builder模式何荚,參數(shù)很多囱淋,可配置化的東西很多,精簡一下主要關(guān)注幾個(gè)參數(shù):

class Builder constructor() {
  //異步請求任務(wù)調(diào)度器
  internal var dispatcher: Dispatcher = Dispatcher()
  //鏈接池
  internal var connectionPool: ConnectionPool = ConnectionPool()
  //自定義攔截器
  internal val interceptors: MutableList<Interceptor> = mutableListOf()
  //自定義網(wǎng)絡(luò)攔截器
  internal val networkInterceptors: MutableList<Interceptor> = mutableListOf()
  //緩存
  internal var cache: Cache? = null
  //代理
  internal var proxy: Proxy? = null
  //鏈接協(xié)議
  internal var protocols: List<Protocol> = DEFAULT_PROTOCOLS
  //.......
}
//添加自定義攔截器的方法
fun addInterceptor(interceptor: Interceptor) = apply {
  interceptors += interceptor
}
//添加自定義網(wǎng)絡(luò)攔截器的方法
fun addNetworkInterceptor(interceptor: Interceptor) = apply {
  networkInterceptors += interceptor
}

到這里有個(gè)疑問餐塘,這個(gè)添加自定義攔截器與添加自定義網(wǎng)絡(luò)攔截器有什么區(qū)別呢妥衣?方法看上去是差不多的,查看官方的說明可以發(fā)現(xiàn)一些細(xì)節(jié)。文檔中解釋了Application InterceptorNetwork Interceptors的細(xì)微差別税手。先回到RealCall中查看getResponseWithInterceptorChain()是如何對攔截器結(jié)合組裝的:

internal fun getResponseWithInterceptorChain(): Response {
  // Build a full stack of interceptors.
  val interceptors = mutableListOf<Interceptor>()
  interceptors += client.interceptors //#1
  interceptors += RetryAndFollowUpInterceptor(client)
  interceptors += BridgeInterceptor(client.cookieJar)
  interceptors += CacheInterceptor(client.cache)
  interceptors += ConnectInterceptor
  if (!forWebSocket) {
    interceptors += client.networkInterceptors//#2
  }
  interceptors += CallServerInterceptor(forWebSocket)
}

#1#2分別對應(yīng)添加的自定義攔截器與自定義網(wǎng)絡(luò)攔截器的位置蜂筹,自定義攔截器是攔截器鏈的鏈頭,而自定義網(wǎng)絡(luò)攔截器在ConnectInterceptor攔截器與CallServerInterceptor攔截器之間被添加芦倒∫张玻總結(jié)一下:

  • 自定義攔截器在整個(gè)攔截器鏈的頭部,做一些請求的之前的準(zhǔn)備工作兵扬,包括一些Log的信息打印麻裳,如LoggingInterceptor

  • 自定義網(wǎng)絡(luò)攔截器添加在ConnectInterceptorCallServerInterceptor之間器钟,可以執(zhí)行兩次掂器,并且獲得的信息更多,包括原網(wǎng)址的信息和重定向后網(wǎng)址的信息俱箱。

  • Application interceptors

Don’t need to worry about intermediate responses like redirects and retries.

Are always invoked once, even if the HTTP response is served from the cache.

Observe the application’s original intent. Unconcerned with OkHttp-injected headers like If-None-Match.

Permitted to short-circuit and not call Chain.proceed().

Permitted to retry and make multiple calls to Chain.proceed().

Can adjust Call timeouts using withConnectTimeout, withReadTimeout, withWriteTimeout.

Able to operate on intermediate responses like redirects and retries.

Not invoked for cached responses that short-circuit the network.

Observe the data just as it will be transmitted over the network.

Access to the Connection that carries the request.

WechatIMG59.jpeg

綜上可以得出整個(gè)鏈的順序結(jié)構(gòu)国瓮,如果都包含自定義攔截器與自定義網(wǎng)絡(luò)攔截器,則為自定義攔截器->RetryAndFollowUpInterceptor->BridgeInterceptor->CacheInterceptor->ConnectInterceptor->自定義網(wǎng)絡(luò)攔截器->CallServerInterceptor狞谱;那么鏈?zhǔn)侨绾伟凑枕樞蛞来螆?zhí)行的呢乃摹?okhttp在這里設(shè)計(jì)比較精妙,在構(gòu)造RealInterceptorChain對象時(shí)帶入index信息跟衅,這個(gè)index記錄的就是單個(gè)攔截器鏈的位置信息孵睬,而RealInterceptorChain.proceed(request: Request)通過index++自增一步步執(zhí)行責(zé)任鏈一直到鏈尾。

val chain = RealInterceptorChain(
  call = this,
  interceptors = interceptors,
  //記錄了攔截器對應(yīng)的index通過對
  index = 0,
  exchange = null,
  request = originalRequest,
  connectTimeoutMillis = client.connectTimeoutMillis,
  readTimeoutMillis = client.readTimeoutMillis,
  writeTimeoutMillis = client.writeTimeoutMillis
)
val response = chain.proceed(originalRequest)
四伶跷、proceed鏈的推進(jìn)
  • RealInterceptorChain中方法proceed()方法的具體實(shí)現(xiàn)
@Throws(IOException::class)
override fun proceed(request: Request): Response {
  check(index < interceptors.size)
  calls++
  if (exchange != null) {
   check(exchange.finder.sameHostAndPort(request.url)) {
    "network interceptor ${interceptors[index - 1]} must retain the same host and port"
   }
   check(calls == 1) {
    "network interceptor ${interceptors[index - 1]} must call proceed() exactly once"
   }
  }
  // Call the next interceptor in the chain.
  val next = copy(index = index + 1, request = request)
  val interceptor = interceptors[index]
  @Suppress("USELESS_ELVIS")
  val response = interceptor.intercept(next) ?: throw NullPointerException(
    "interceptor $interceptor returned null")
  //.....
  return response
}

簡單的分析推進(jìn)過程:

1.RealInterceptorChain的構(gòu)造參數(shù)中攜帶了index的信息掰读,index++自增通過proceed方法不斷執(zhí)行。

2.攔截器統(tǒng)一實(shí)現(xiàn)Interceptor接口叭莫,接口中fun proceed(request: Request): Response保證了鏈?zhǔn)芥溄拥讣.?dāng)然攔截器的順序是按照一定的規(guī)則排列的,逐個(gè)分析雇初。

五拢肆、RetryAndFollowUpInterceptor重試與重定向
companion object {
  private const val MAX_FOLLOW_UPS = 20
}
override fun intercept(chain: Interceptor.Chain): Response {
  val realChain = chain as RealInterceptorChain
  var request = chain.request
  val call = realChain.call
  var followUpCount = 0
  var priorResponse: Response? = null
  var newExchangeFinder = true
  var recoveredFailures = listOf<IOException>()
    while (true) {
    //....
    try {
      response = realChain.proceed(request)
      newExchangeFinder = true
    }
  }
  //....
}

1.重試攔截器規(guī)定默認(rèn)的重試次數(shù)為20次

2.以response = realChain.proceed(request)為分界點(diǎn),包括其他的攔截器靖诗,在責(zé)任鏈傳遞之前所做的工作都是前序工作郭怪,然后將request下發(fā)到下一個(gè)攔截器。

3.response = realChain.proceed(request)后的代碼邏輯為后續(xù)工作刊橘,即拿到上個(gè)攔截器的response結(jié)果鄙才,有點(diǎn)遞歸的意思,按照責(zé)任鏈的執(zhí)行一直到最后一個(gè)攔截器獲得的結(jié)果依次上拋每個(gè)攔截器處理這個(gè)response完成一些后序工作促绵。

4.當(dāng)然并不是每個(gè)請求都會(huì)走完整個(gè)鏈攒庵,如CacheInterceptor當(dāng)開啟了緩存(存在緩存)拿到了緩存的response那么之后的攔截器就不會(huì)在繼續(xù)傳遞据途。

六、BridgeInterceptor
override fun intercept(chain: Interceptor.Chain): Response {
  val userRequest = chain.request()
  val requestBuilder = userRequest.newBuilder()
  val body = userRequest.body
  if (body != null) {
    val contentType = body.contentType()
    if (contentType != null) {
      requestBuilder.header("Content-Type", contentType.toString())
    }
    val contentLength = body.contentLength()
    if (contentLength != -1L) {
      requestBuilder.header("Content-Length", contentLength.toString())
      requestBuilder.removeHeader("Transfer-Encoding")
    } else {
      requestBuilder.header("Transfer-Encoding", "chunked")
      requestBuilder.removeHeader("Content-Length")
    }
  }
  //.....
  //分界點(diǎn)叙甸,上部分為前序操作
  val networkResponse = chain.proceed(requestBuilder.build())
  //下部分代碼為拿到response的后序操作
  cookieJar.receiveHeaders(userRequest.url, networkResponse.headers)
}

1.橋接攔截器主要對請求的Hader的信息的補(bǔ)充颖医,包括內(nèi)容長度等。

2.傳遞請求到下一個(gè)鏈裆蒸,等待返回的response信息熔萧。

3.后序的操作包括Cookiegzip壓縮信息僚祷,User-Agent等信息的補(bǔ)充噪珊。

七凫乖、CacheInterceptor
override fun intercept(chain: Interceptor.Chain): Response {
  val call = chain.call()
  val cacheCandidate = cache?.get(chain.request())
  val now = System.currentTimeMillis()
  val strategy = CacheStrategy.Factory(now, chain.request(), cacheCandidate).compute()
  val networkRequest = strategy.networkRequest
  val cacheResponse = strategy.cacheResponse
  //......
  var networkResponse: Response? = null
  try {
    networkResponse = chain.proceed(networkRequest)
  } finally {
  // If we're crashing on I/O or otherwise, don't leak the cache body.
  if (networkResponse == null && cacheCandidate != null) {
    cacheCandidate.body?.closeQuietly()
   }
  }
  //.....
}
  • OkHttpClient的默認(rèn)構(gòu)造中是沒有開啟緩存的;internal var cache: Cache? = null,需要手動(dòng)添加年碘,但是有默認(rèn)實(shí)現(xiàn)澈缺,基于最近最少使用算法實(shí)現(xiàn)的DiskLruCache磁盤緩存:
private val client: OkHttpClient = OkHttpClient.Builder()
  .cache(Cache(
    directory = File(application.cacheDir, "http_cache"),
    // $0.05 worth of phone storage in 2020
    maxSize = 50L * 1024L * 1024L // 50 MiB
  ))
  .build()

1.緩存攔截器默認(rèn)沒有被開啟伐厌,需要在調(diào)用時(shí)指定緩存的目錄旗国,內(nèi)部基于DiskLruCache實(shí)現(xiàn)了磁盤緩存。

2.當(dāng)緩存開啟蜕琴,且命中緩存萍桌,那么鏈的調(diào)用不會(huì)再繼續(xù)向下傳遞(此時(shí)已經(jīng)拿到了response)直接進(jìn)行后序的操作。

3.如果未命中凌简,則會(huì)繼續(xù)傳遞到下一個(gè)鏈也即是ConnectInterceptor上炎。

八、ConnectInterceptor
/**
 * Opens a connection to the target server and proceeds to the next interceptor. 
 * The network might be used for the returned response, 
 * or to validate a cached response with a conditional GET.
 */
object ConnectInterceptor : Interceptor {
  @Throws(IOException::class)
  override fun intercept(chain: Interceptor.Chain): Response {
    val realChain = chain as RealInterceptorChain
    val exchange = realChain.call.initExchange(chain)
    val connectedChain = realChain.copy(exchange = exchange)
    return connectedChain.proceed(realChain.request)
  }
}
  • ConnectInterceptor的代碼量很少主要作用很清晰雏搂。

1.建立與目標(biāo)的服務(wù)器的TCP或者TCP-TLS的鏈接藕施。

2.與之前的攔截器不同,前面的攔截器的前序操作基于調(diào)用方法realChain.proceed()之前凸郑,但是ConnectInterceptor 沒有后序操作裳食,下發(fā)到下一個(gè)攔截器

九线椰、CallServerInterceptor
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
  val realChain = chain as RealInterceptorChain
  val exchange = realChain.exchange!!
  val request = realChain.request
  val requestBody = request.body
  val sentRequestMillis = System.currentTimeMillis()
  exchange.writeRequestHeaders(request)
  var invokeStartEvent = true
  var responseBuilder: Response.Builder? = null
  if (HttpMethod.permitsRequestBody(request.method) && requestBody != null) {
    if ("100-continue".equals(request.header("Expect"), ignoreCase = true)) {
      exchange.flushRequest()
      responseBuilder = exchange.readResponseHeaders(expectContinue = true)
      exchange.responseHeadersStart()
      invokeStartEvent = false
     }
     if (responseBuilder == null) {
       if (requestBody.isDuplex()) {
         exchange.flushRequest()
         val bufferedRequestBody = exchange.createRequestBody(request, true).buffer()
         requestBody.writeTo(bufferedRequestBody)
        } else {
         val bufferedRequestBody = exchange.createRequestBody(request, false).buffer()
         requestBody.writeTo(bufferedRequestBody)
         bufferedRequestBody.close()
        }
      } else {
        exchange.noRequestBody()
        if (!exchange.connection.isMultiplexed) {
          exchange.noNewExchangesOnConnection()
        }
      }
    } else {
      exchange.noRequestBody()
    }
    if (requestBody == null || !requestBody.isDuplex()) {
      exchange.finishRequest()
    }
    if (responseBuilder == null) {
      responseBuilder = exchange.readResponseHeaders(expectContinue = false)!!
      if (invokeStartEvent) {
        exchange.responseHeadersStart()
        invokeStartEvent = false
      }
    }
    var response = responseBuilder
        .request(request)
        .handshake(exchange.connection.handshake())
        .sentRequestAtMillis(sentRequestMillis)
        .receivedResponseAtMillis(System.currentTimeMillis())
        .build()
    var code = response.code
    if (code == 100) {
      responseBuilder = exchange.readResponseHeaders(expectContinue = false)!!
      if (invokeStartEvent) {
        exchange.responseHeadersStart()
      }
      response = responseBuilder
          .request(request)
          .handshake(exchange.connection.handshake())
          .sentRequestAtMillis(sentRequestMillis)
          .receivedResponseAtMillis(System.currentTimeMillis())
          .build()
      code = response.code
    }
    exchange.responseHeadersEnd(response)
    response = if (forWebSocket && code == 101) {
    response.newBuilder()
          .body(EMPTY_RESPONSE)
          .build()
    } else {
      response.newBuilder()
          .body(exchange.openResponseBody(response))
          .build()
    }
    if ("close".equals(response.request.header("Connection"), ignoreCase = true) ||
        "close".equals(response.header("Connection"), ignoreCase = true)) {
      exchange.noNewExchangesOnConnection()
    }
    if ((code == 204 || code == 205) && response.body?.contentLength() ?: -1L > 0L) {
      throw ProtocolException(
          "HTTP $code had non-zero Content-Length: ${response.body?.contentLength()}")
    }
    return response
  }
}
  • CallServerInterceptor是整個(gè)責(zé)任鏈的尾鏈:

1.實(shí)質(zhì)上是請求與I/O操作胞谈,將請求的數(shù)據(jù)寫入到Socket中。

2.從Socket讀取響應(yīng)的數(shù)據(jù)TCP/TCP-TLS對應(yīng)的端口憨愉,對于I/O操作基于的是okio,而okhttp的高效請求同樣離不開okio的支持卿捎。

3.拿到數(shù)據(jù)reponse返回到之前包含有后序操作的攔截器配紫,但ConnectInterceptor除外,ConnectInterceptor是沒有后續(xù)操作的午阵。

整個(gè)攔截器流程圖如下:

攔截器.png
十躺孝、ReaclCall.execute()
  • 回頭看同步請求的方法:
  override fun execute(): Response {
    check(executed.compareAndSet(false, true)) { "Already Executed" }
    timeout.enter()
    callStart()
    try {
      client.dispatcher.executed(this)
      return getResponseWithInterceptorChain()
    } finally {
      client.dispatcher.finished(this)
    }
  }
  • Dispatcher中對于同步的請求僅僅只是將享扔,RealCall的實(shí)例添加到runningSyncCalls.add(call)執(zhí)行中的隊(duì)列中。
  • RealCallCall的唯一實(shí)現(xiàn)類植袍,而對于execute()方法的定義明確了惧眠,立即執(zhí)行阻塞直到response結(jié)果的返回或者error信息打斷阻塞。
  • 經(jīng)過getResponseWithInterceptorChain()的調(diào)用于个,鏈的推進(jìn)此時(shí)已經(jīng)拿到結(jié)果氛魁,那么后序的操作是什么呢?先看一個(gè)Java基礎(chǔ)try{}finally{}結(jié)構(gòu)厅篓,以及dispatcher.finished(call: RealCall)秀存。
try{
  client.dispatcher.executed(this)
  return getResponseWithInterceptorChain()
} finally{
  client.dispatcher.finished(this)
}

/** Used by [Call.execute] to signal completion. */
internal fun finished(call: RealCall) {
  finished(runningSyncCalls, call)
}

1.排除極端的情況,System.exit()或者其他羽氮,finally 塊必然執(zhí)行或链,不論發(fā)生異常與否,也不論在 finally 之前是否有return档押。

2.不管在 try 塊中是否包含 return澳盐,finally 塊總是在 return 之前執(zhí)行。

3.如果 finally 塊中有return令宿,那么 try 塊和 catch 塊中的 return就沒有執(zhí)行機(jī)會(huì)了洞就。

Tip:第二條的結(jié)論很重要,回到execute()方法掀淘,dispatcher.finished(this)在結(jié)果response結(jié)果返回之前執(zhí)行旬蟋,看finished()具體實(shí)現(xiàn)。

  /** Used by [Call.execute] to signal completion. */
  internal fun finished(call: RealCall) {
    finished(runningSyncCalls, call)
  }
  private fun <T> finished(calls: Deque<T>, call: T) {
    val idleCallback: Runnable?
    synchronized(this) {
      //#1
      if(!calls.remove(call)) throw AssertionError("Call wasn't in-flight!")
      idleCallback = this.idleCallback
    }
    //#2
    val isRunning = promoteAndExecute()
    if (!isRunning && idleCallback != null) {
      idleCallback.run()
    }
  }

  private fun promoteAndExecute(): Boolean {
    this.assertThreadDoesntHoldLock()
    val executableCalls = mutableListOf<AsyncCall>()
    val isRunning: Boolean
    synchronized(this) {
      //#3
      val i = readyAsyncCalls.iterator()
      while (i.hasNext()) {
        val asyncCall = i.next()
                // Max capacity.
        if (runningAsyncCalls.size >= this.maxRequests) break 
        // Host max capacity.
        if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue 
        i.remove()
        asyncCall.callsPerHost.incrementAndGet()
        executableCalls.add(asyncCall)
        runningAsyncCalls.add(asyncCall)
      }
      //#4
      isRunning = runningCallsCount() > 0
    }
    for (i in 0 until executableCalls.size) {
      val asyncCall = executableCalls[I]
      asyncCall.executeOn(executorService)
    }
    //#5
    return isRunning
  }
    
 @Synchronized fun runningCallsCount(): Int = runningAsyncCalls.size + runningSyncCalls.size

1.#1方法一革娄,calls.remove(call)返回為true倾贰,也即是這個(gè)同步請求被從runningSyncCalls中移除釋放;所以idleCallback為空拦惋。

2.#3很顯然asyncCall的結(jié)果為空匆浙,沒有異步請求,在看#4具體實(shí)現(xiàn)厕妖,runningSyncCallssize為1首尼。則isRunning的結(jié)果為true。idleCallback.run()不會(huì)被執(zhí)行言秸,并且idleCallback其實(shí)也是為空软能。

  • 至此,開始的例子所構(gòu)建的同步請求就被執(zhí)行完畢了举畸,阻塞拿到了結(jié)果回調(diào)查排,用于頁面展示或者其他操作。
  • 梳理整個(gè)流程如下:

1.從OkHttpClient().newCall(request).execute()開啟同步請求任務(wù)抄沮。

2.得到的RealCall對象作為Call的唯一實(shí)現(xiàn)類跋核,其中同步方法execute()是阻塞的岖瑰,調(diào)用到會(huì)立即執(zhí)行阻塞到有結(jié)果返回,或者發(fā)生錯(cuò)誤error被打斷阻塞砂代。

3.RealCall中同步execute()請求方法被執(zhí)行蹋订,而此時(shí)OkHttpClient實(shí)例中的異步任務(wù)分發(fā)器Dispatcher會(huì)將請求的實(shí)例RealCall添加到雙端隊(duì)列runningSyncCalls中去。

4.通過RealCall中的方法getResponseWithInterceptorChain()開啟請求攔截器的責(zé)任鏈刻伊,將請求逐一下發(fā)露戒,通過持有index 并自增操作,其次除ConnectInterceptor與鏈尾CallServerInterceptor其余默認(rèn)攔截器均有以chain.proceed(request)為分界點(diǎn)的前序與后序操作娃圆,拿到response后依次處理后序操作玫锋。

5.最終返回結(jié)果response之前,對進(jìn)行中的同步任務(wù)做了移除隊(duì)列的操作也即finallyclient.dispatcher.finished(this)方法讼呢,最終得到的結(jié)果response返回到客戶端撩鹿,至此整個(gè)同步請求流程就結(jié)束了。

十一悦屏、異步請求的處理enqueue()
  • 異步請求的大致流程與同步請求是差不多的节沦,但是多了一個(gè)從等待隊(duì)列到執(zhí)行隊(duì)列轉(zhuǎn)化的一個(gè)過程。
   internal fun enqueue(call: AsyncCall) {
    synchronized(this) {
    //#1
    readyAsyncCalls.add(call)
   //Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
   //the same host.
      if (!call.call.forWebSocket) {
        val existingCall = findExistingCallWithHost(call.host)
        if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
      }
    }
    //#2
    promoteAndExecute()
  }

 private fun promoteAndExecute(): Boolean {
    this.assertThreadDoesntHoldLock()
    val executableCalls = mutableListOf<AsyncCall>()
    val isRunning: Boolean
    synchronized(this) {
      val i = readyAsyncCalls.iterator()
      while (i.hasNext()) {
        //#3
        val asyncCall = i.next()
                // Max capacity.
        if (runningAsyncCalls.size >= this.maxRequests) break 
        // Host max capacity.
        if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue 
        //#4
        i.remove()
        asyncCall.callsPerHost.incrementAndGet()
        executableCalls.add(asyncCall)
        //#5
        runningAsyncCalls.add(asyncCall)
      }
      isRunning = runningCallsCount() > 0
    }
    for (i in 0 until executableCalls.size) {
      val asyncCall = executableCalls[I]
      asyncCall.executeOn(executorService)
    }
    return isRunning
  }
  • #1方法中將異步請求任務(wù)加入到readyAsyncCalls等待隊(duì)列之中础爬,#2開啟執(zhí)行任務(wù)(當(dāng)然Dispatcher維護(hù)了線程池來處理這些請求)甫贯。
  • 執(zhí)行方法promoteAndExecute()#3將異步任務(wù)從等待隊(duì)列readyAsyncCalls中取出并移除操作,#5則添加到執(zhí)行中的異步任務(wù)隊(duì)列runningAsyncCalls看蚜,如此直到所有請求全部執(zhí)行完畢叫搁。
十二、小結(jié)
  • 這里僅僅是對大概的流程簡單的作出了分析供炎,忽略了一些其他的實(shí)現(xiàn)細(xì)節(jié)渴逻,像請求緩存池、請求的協(xié)議音诫。包括本地緩存DiskLruCache的實(shí)現(xiàn)細(xì)節(jié)惨奕,以及異步請求的一些閥值的判斷處理。在精力允許的情況下還應(yīng)該仔細(xì)研究一番竭钝,畢竟從Java到現(xiàn)在的Kotlin版本okhttp經(jīng)歷了很多的迭代也更加成熟梨撞,當(dāng)然網(wǎng)絡(luò)請求這塊okhttp的地位還是無法撼動(dòng)的??。

十三香罐、文檔

Github

Square

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
禁止轉(zhuǎn)載卧波,如需轉(zhuǎn)載請通過簡信或評論聯(lián)系作者。
  • 序言:七十年代末穴吹,一起剝皮案震驚了整個(gè)濱河市幽勒,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌港令,老刑警劉巖啥容,帶你破解...
    沈念sama閱讀 221,576評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異顷霹,居然都是意外死亡咪惠,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,515評論 3 399
  • 文/潘曉璐 我一進(jìn)店門淋淀,熙熙樓的掌柜王于貴愁眉苦臉地迎上來遥昧,“玉大人,你說我怎么就攤上這事朵纷√砍簦” “怎么了?”我有些...
    開封第一講書人閱讀 168,017評論 0 360
  • 文/不壞的土叔 我叫張陵袍辞,是天一觀的道長鞋仍。 經(jīng)常有香客問我,道長搅吁,這世上最難降的妖魔是什么威创? 我笑而不...
    開封第一講書人閱讀 59,626評論 1 296
  • 正文 為了忘掉前任,我火速辦了婚禮谎懦,結(jié)果婚禮上肚豺,老公的妹妹穿的比我還像新娘。我一直安慰自己界拦,他們只是感情好吸申,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,625評論 6 397
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著享甸,像睡著了一般截碴。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上枪萄,一...
    開封第一講書人閱讀 52,255評論 1 308
  • 那天隐岛,我揣著相機(jī)與錄音,去河邊找鬼瓷翻。 笑死聚凹,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的齐帚。 我是一名探鬼主播妒牙,決...
    沈念sama閱讀 40,825評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼对妄!你這毒婦竟也來了湘今?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,729評論 0 276
  • 序言:老撾萬榮一對情侶失蹤剪菱,失蹤者是張志新(化名)和其女友劉穎摩瞎,沒想到半個(gè)月后拴签,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,271評論 1 320
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡旗们,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,363評論 3 340
  • 正文 我和宋清朗相戀三年蚓哩,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片上渴。...
    茶點(diǎn)故事閱讀 40,498評論 1 352
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡岸梨,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出稠氮,到底是詐尸還是另有隱情曹阔,我是刑警寧澤,帶...
    沈念sama閱讀 36,183評論 5 350
  • 正文 年R本政府宣布隔披,位于F島的核電站赃份,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏锹锰。R本人自食惡果不足惜芥炭,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,867評論 3 333
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望恃慧。 院中可真熱鬧园蝠,春花似錦、人聲如沸痢士。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,338評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽怠蹂。三九已至善延,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間城侧,已是汗流浹背易遣。 一陣腳步聲響...
    開封第一講書人閱讀 33,458評論 1 272
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留嫌佑,地道東北人豆茫。 一個(gè)月前我還...
    沈念sama閱讀 48,906評論 3 376
  • 正文 我出身青樓,卻偏偏與公主長得像屋摇,于是被迫代替她去往敵國和親揩魂。 傳聞我的和親對象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,507評論 2 359

推薦閱讀更多精彩內(nèi)容