「okhttp3 4.9.3 版本簡單解析」
一、寫在前面
關(guān)于okhttp3
的解析網(wǎng)上已經(jīng)有非常多優(yōu)秀的博文了,每每看完都覺得醍醐灌頂遮精,豁然開朗。但等不了幾天再回頭看败潦,還是跟當(dāng)初一樣陌生本冲,究其根本原因准脂,我們不過是在享受著別人的成果跟著別人的思路云閱讀源碼了一遍。okhttp
從早期的Java
版本到Kotlin
版本一直不斷優(yōu)化升級檬洞,實(shí)現(xiàn)細(xì)節(jié)上也作出了調(diào)整意狠。重讀源碼加上自身的思考能深刻的理解okhttp
的實(shí)現(xiàn)原理。
二疮胖、從基本使用說起
- 項(xiàng)目中引入依賴环戈,
Github
地址okhttp
dependencies {
//...
implementation("com.squareup.okhttp3:okhttp:4.9.3")
}
- 從一個(gè)簡單的同步
(execute)
請求入手分析整體的加載流程:
private fun clientNetWork() {
val request: Request = Request.Builder()
.url("https://www.baidu.com")
.build()
OkHttpClient().newCall(request).execute()
}
從execute()
開始,發(fā)現(xiàn)其實(shí)是一個(gè)接口中的方法(Call)
澎灸,這個(gè)很好理解根據(jù)官方的解釋院塞,Call
其實(shí)是一個(gè)待執(zhí)行的請求,并且這個(gè)請求所要的參數(shù)已經(jīng)被準(zhǔn)備好性昭;當(dāng)然既然是請求拦止,那么它是可以被取消的。其次代表單個(gè)請求與響應(yīng)流糜颠,因此不能夠被再次執(zhí)行汹族。
A call is a request that has been prepared for execution. A call can be canceled. As this object represents a single request/response pair (stream), it cannot be executed twice.
Call
接口具體的代碼實(shí)現(xiàn),重點(diǎn)關(guān)注同步執(zhí)行方法execute()
與異步請求enqueue()
:
interface Call : Cloneable {
fun request(): Request
//同步請求
@Throws(IOException::class)
fun execute(): Response
//起步請求
fun enqueue(responseCallback: Callback)
fun cancel()
fun isExecuted(): Boolean
fun isCanceled(): Boolean
fun interface Factory {
fun newCall(request: Request): Call
}
}
Call
作為接口其兴,那么具體的實(shí)現(xiàn)細(xì)節(jié)則需要看它的實(shí)現(xiàn)類顶瞒,RealCall
作為Call
的實(shí)現(xiàn)類,先找到對execute()
的重寫元旬。
-
RealCall
-是應(yīng)用程序與網(wǎng)絡(luò)之間的橋梁榴徐,看看具體有什么作用,首先分析同步請求execute()
方法的實(shí)現(xiàn):
override fun execute(): Response {
check(executed.compareAndSet(false, true)) { "Already Executed" } //#1
timeout.enter()//#2
callStart()//#3
try {
client.dispatcher.executed(this)//#4
return getResponseWithInterceptorChain()//#5
} finally {
client.dispatcher.finished(this)//#6
}
}
代碼很少匀归,逐步分析坑资,首先對同步請求進(jìn)行了檢查-判斷請求是否已經(jīng)被執(zhí)行過了;而這里使用的是并發(fā)包下的原子類CAS
樂觀鎖穆端,這里使用CAS
比較算法目的也是為提升效率袱贮。其次是超時(shí)時(shí)間的判斷,這個(gè)比較簡單体啰。在看callStart()
的具體實(shí)現(xiàn)攒巍。上代碼:
private fun callStart() {
this.callStackTrace = Platform.get().getStackTraceForCloseable("response.body().close()")
eventListener.callStart(this)
}
看名稱猜測應(yīng)該是事件
監(jiān)聽之類的,可能是包括一些信息的記錄與打印狡赐∫ひ担回到RealCall
類中,看看這個(gè)eventListener
的作用到底是什么:
internal val eventListener: EventListener = client.eventListenerFactory.create(this)
繼續(xù)向下可以知道這個(gè)EventListener
是一個(gè)抽象類枕屉,而項(xiàng)目中其唯一實(shí)現(xiàn)類為LoggingEventListener
常柄,猜測還是有依據(jù)的,繼續(xù)往下看:
abstract class EventListener {
open fun callStart(
call: Call
) {
}
//省略了其他的方法
}
實(shí)現(xiàn)類LoggingEventListener
中對此方法的具體實(shí)現(xiàn):
class LoggingEventListener private constructor(
private val logger: HttpLoggingInterceptor.Logger
) : EventListener() {
override fun callStart(call: Call) {
startNs = System.nanoTime()
logWithTime("callStart: ${call.request()}")
}
}
總結(jié)這個(gè)callStart()
的作用,當(dāng)同步請求或者異步請求被加到隊(duì)列時(shí)西潘,callStart()
會(huì)被立即執(zhí)行(在沒有達(dá)到線程限制的情況下)
記錄請求開始的時(shí)間與請求的一些信息卷玉。如下:
override fun callStart(call: Call) {
startNs = System.nanoTime()
logWithTime("callStart: ${call.request()}")
}
private fun logWithTime(message: String) {
val timeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs)
logger.log("[$timeMs ms] $message")
}
代賣第四段#4 client.dispatcher.executed(this)
,看樣子是在這里開啟執(zhí)行的喷市,可實(shí)際真是如此嘛相种?回到OkHttpClient
類中,看看這個(gè)分發(fā)器dispatcher
到底是什么品姓。
@get:JvmName("dispatcher") val dispatcher: Dispatcher = builder.dispatcher
具體實(shí)現(xiàn)類Dispatcher
代碼(保留重要代碼):
class Dispatcher constructor() {
/** Ready async calls in the order they'll be run. */
private val readyAsyncCalls = ArrayDeque<AsyncCall>()
/** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
private val runningAsyncCalls = ArrayDeque<AsyncCall>()
/** Running synchronous calls. Includes canceled calls that haven't finished yet. */
private val runningSyncCalls = ArrayDeque<RealCall>()
constructor(executorService: ExecutorService) : this() {
this.executorServiceOrNull = executorService
}
@get:Synchronized var maxRequests = 64
set(maxRequests) {
require(maxRequests >= 1) { "max < 1: $maxRequests" }
synchronized(this) {
field = maxRequests
}
promoteAndExecute()
}
@get:Synchronized var maxRequestsPerHost = 5
set(maxRequestsPerHost) {
require(maxRequestsPerHost >= 1) { "max < 1: $maxRequestsPerHost" }
synchronized(this) {
field = maxRequestsPerHost
}
promoteAndExecute()
}
//異步請求
internal fun enqueue(call: AsyncCall) {
synchronized(this) {
readyAsyncCalls.add(call)
//Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to the same host.
if (!call.call.forWebSocket) {
val existingCall = findExistingCallWithHost(call.host)
if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
}
}
promoteAndExecute()
}
//同步請求
/** Used by [Call.execute] to signal it is in-flight. */
@Synchronized internal fun executed(call: RealCall) {
runningSyncCalls.add(call)
}
private fun promoteAndExecute(): Boolean {
this.assertThreadDoesntHoldLock()
val executableCalls = mutableListOf<AsyncCall>()
val isRunning: Boolean
synchronized(this) {
val i = readyAsyncCalls.iterator()
while (i.hasNext()) {
val asyncCall = i.next()
//Max capacity.
if (runningAsyncCalls.size >= this.maxRequests) break
//Host max capacity.
if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue
i.remove()
asyncCall.callsPerHost.incrementAndGet()
executableCalls.add(asyncCall)
runningAsyncCalls.add(asyncCall)
}
isRunning = runningCallsCount() > 0
}
for (i in 0 until executableCalls.size) {
val asyncCall = executableCalls[I]
asyncCall.executeOn(executorService)
}
return isRunning
}
}
根據(jù)注釋的信息可以知道寝并,Dispatcher
是處理異步請求的執(zhí)行的策略,當(dāng)然開發(fā)可以實(shí)現(xiàn)自己的策略腹备。
Policy on when async requests are executed.
Each dispatcher uses an ExecutorService to run calls internally. If you supply your own executor, it should be able to run the configured maximum number of calls concurrently.
知道了Dispatcher
的作用衬潦,再回到client.dispatcher.executed(this)
,也即:
/** Used by [Call.execute] to signal it is in-flight. */
@Synchronized internal fun executed(call: RealCall) {
runningSyncCalls.add(call)
}
結(jié)合execute()
與Dispatcher
分析
-
Dispatcher
主要是對異步請求的調(diào)度作用植酥,內(nèi)部實(shí)現(xiàn)了線程池镀岛,對請求進(jìn)行分發(fā)執(zhí)行。 - 包含三個(gè)雙端隊(duì)列分別為等待的異步隊(duì)列
readyAsyncCalls
友驮、執(zhí)行中的隊(duì)列runningAsyncCalls
漂羊、執(zhí)行中的同步隊(duì)列runningSyncCalls
。這里使用ArrayDeque
卸留,為什么不使用LinkedList
有興趣的可以思考一下走越。 - 當(dāng)請求為同步請求時(shí),內(nèi)部執(zhí)行
execute()
將RealCall
添加到runningSyncCalls
隊(duì)列中艾猜。
到這里請求其實(shí)還沒有真正的執(zhí)行买喧,只是在做一些前期的工作捻悯,回到Call
接口中看看對方法同步請求方法execute()
的說明:同步請求可以立即執(zhí)行匆赃,阻塞直到返回正確的結(jié)果,或者報(bào)錯(cuò)結(jié)束今缚。
Invokes the request immediately, and blocks until the response can be processed or is in error.
到#4
步執(zhí)行后算柳,return getResponseWithInterceptorChain() //#5
這個(gè)方法才是請求一步步推進(jìn)的核心。也是okhttp
網(wǎng)絡(luò)請求責(zé)任鏈的核心模塊姓言。
三瞬项、鏈的開啟-getResponseWithInterceptorChain()
- getResponseWithInterceptorChain()具體實(shí)現(xiàn)
@Throws(IOException::class)
internal fun getResponseWithInterceptorChain(): Response {
//Build a full stack of interceptors.
val interceptors = mutableListOf<Interceptor>()
interceptors += client.interceptors
interceptors += RetryAndFollowUpInterceptor(client)
interceptors += BridgeInterceptor(client.cookieJar)
interceptors += CacheInterceptor(client.cache)
interceptors += ConnectInterceptor
if (!forWebSocket) {
interceptors += client.networkInterceptors
}
interceptors += CallServerInterceptor(forWebSocket)
val chain = RealInterceptorChain(
call = this,
interceptors = interceptors,
index = 0,
exchange = null,
request = originalRequest,
connectTimeoutMillis = client.connectTimeoutMillis,
readTimeoutMillis = client.readTimeoutMillis,
writeTimeoutMillis = client.writeTimeoutMillis
)
var calledNoMoreExchanges = false
try {
val response = chain.proceed(originalRequest)
if (isCanceled()) {
response.closeQuietly()
throw IOException("Canceled")
}
return response
} catch (e: IOException) {
calledNoMoreExchanges = true
throw noMoreExchanges(e) as Throwable
} finally {
if (!calledNoMoreExchanges) {
noMoreExchanges(null)
}
}
}
分析getResponseWithInterceptorChain()
方法之前有必要看看OkHttpClient
的構(gòu)造參數(shù),使用的Builder
模式何荚,參數(shù)很多囱淋,可配置化的東西很多,精簡一下主要關(guān)注幾個(gè)參數(shù):
class Builder constructor() {
//異步請求任務(wù)調(diào)度器
internal var dispatcher: Dispatcher = Dispatcher()
//鏈接池
internal var connectionPool: ConnectionPool = ConnectionPool()
//自定義攔截器
internal val interceptors: MutableList<Interceptor> = mutableListOf()
//自定義網(wǎng)絡(luò)攔截器
internal val networkInterceptors: MutableList<Interceptor> = mutableListOf()
//緩存
internal var cache: Cache? = null
//代理
internal var proxy: Proxy? = null
//鏈接協(xié)議
internal var protocols: List<Protocol> = DEFAULT_PROTOCOLS
//.......
}
//添加自定義攔截器的方法
fun addInterceptor(interceptor: Interceptor) = apply {
interceptors += interceptor
}
//添加自定義網(wǎng)絡(luò)攔截器的方法
fun addNetworkInterceptor(interceptor: Interceptor) = apply {
networkInterceptors += interceptor
}
到這里有個(gè)疑問餐塘,這個(gè)添加自定義攔截器與添加自定義網(wǎng)絡(luò)攔截器有什么區(qū)別呢妥衣?方法看上去是差不多的,查看官方的說明可以發(fā)現(xiàn)一些細(xì)節(jié)。文檔中解釋了Application Interceptor
與Network Interceptors
的細(xì)微差別税手。先回到RealCall
中查看getResponseWithInterceptorChain()
是如何對攔截器結(jié)合組裝的:
internal fun getResponseWithInterceptorChain(): Response {
// Build a full stack of interceptors.
val interceptors = mutableListOf<Interceptor>()
interceptors += client.interceptors //#1
interceptors += RetryAndFollowUpInterceptor(client)
interceptors += BridgeInterceptor(client.cookieJar)
interceptors += CacheInterceptor(client.cache)
interceptors += ConnectInterceptor
if (!forWebSocket) {
interceptors += client.networkInterceptors//#2
}
interceptors += CallServerInterceptor(forWebSocket)
}
看#1
與#2
分別對應(yīng)添加的自定義攔截器與自定義網(wǎng)絡(luò)攔截器的位置蜂筹,自定義攔截器是攔截器鏈的鏈頭,而自定義網(wǎng)絡(luò)攔截器在ConnectInterceptor
攔截器與CallServerInterceptor
攔截器之間被添加芦倒∫张玻總結(jié)一下:
自定義攔截器在整個(gè)攔截器鏈的頭部,做一些請求的之前的準(zhǔn)備工作兵扬,包括一些
Log
的信息打印麻裳,如LoggingInterceptor
。自定義網(wǎng)絡(luò)攔截器添加在
ConnectInterceptor
與CallServerInterceptor
之間器钟,可以執(zhí)行兩次掂器,并且獲得的信息更多,包括原網(wǎng)址的信息和重定向后網(wǎng)址的信息俱箱。
Don’t need to worry about intermediate responses like redirects and retries.
Are always invoked once, even if the HTTP response is served from the cache.
Observe the application’s original intent. Unconcerned with OkHttp-injected headers like
If-None-Match
.Permitted to short-circuit and not call
Chain.proceed()
.Permitted to retry and make multiple calls to
Chain.proceed()
.Can adjust Call timeouts using withConnectTimeout, withReadTimeout, withWriteTimeout.
Able to operate on intermediate responses like redirects and retries.
Not invoked for cached responses that short-circuit the network.
Observe the data just as it will be transmitted over the network.
Access to the
Connection
that carries the request.
綜上可以得出整個(gè)鏈的順序結(jié)構(gòu)国瓮,如果都包含自定義攔截器與自定義網(wǎng)絡(luò)攔截器,則為自定義攔截器
->RetryAndFollowUpInterceptor
->BridgeInterceptor
->CacheInterceptor
->ConnectInterceptor
->自定義網(wǎng)絡(luò)攔截器
->CallServerInterceptor
狞谱;那么鏈?zhǔn)侨绾伟凑枕樞蛞来螆?zhí)行的呢乃摹?okhttp
在這里設(shè)計(jì)比較精妙,在構(gòu)造RealInterceptorChain
對象時(shí)帶入index
信息跟衅,這個(gè)index
記錄的就是單個(gè)攔截器鏈的位置信息孵睬,而RealInterceptorChain.proceed(request: Request)
通過index++
自增一步步執(zhí)行責(zé)任鏈一直到鏈尾。
val chain = RealInterceptorChain(
call = this,
interceptors = interceptors,
//記錄了攔截器對應(yīng)的index通過對
index = 0,
exchange = null,
request = originalRequest,
connectTimeoutMillis = client.connectTimeoutMillis,
readTimeoutMillis = client.readTimeoutMillis,
writeTimeoutMillis = client.writeTimeoutMillis
)
val response = chain.proceed(originalRequest)
四伶跷、proceed鏈的推進(jìn)
-
RealInterceptorChain
中方法proceed()
方法的具體實(shí)現(xiàn)
@Throws(IOException::class)
override fun proceed(request: Request): Response {
check(index < interceptors.size)
calls++
if (exchange != null) {
check(exchange.finder.sameHostAndPort(request.url)) {
"network interceptor ${interceptors[index - 1]} must retain the same host and port"
}
check(calls == 1) {
"network interceptor ${interceptors[index - 1]} must call proceed() exactly once"
}
}
// Call the next interceptor in the chain.
val next = copy(index = index + 1, request = request)
val interceptor = interceptors[index]
@Suppress("USELESS_ELVIS")
val response = interceptor.intercept(next) ?: throw NullPointerException(
"interceptor $interceptor returned null")
//.....
return response
}
簡單的分析推進(jìn)過程:
1.RealInterceptorChain的構(gòu)造參數(shù)中攜帶了index的信息掰读,index++自增通過proceed方法不斷執(zhí)行。
2.攔截器統(tǒng)一實(shí)現(xiàn)Interceptor接口叭莫,接口中
fun proceed(request: Request): Response
保證了鏈?zhǔn)芥溄拥讣.?dāng)然攔截器的順序是按照一定的規(guī)則排列的,逐個(gè)分析雇初。
五拢肆、RetryAndFollowUpInterceptor重試與重定向
companion object {
private const val MAX_FOLLOW_UPS = 20
}
override fun intercept(chain: Interceptor.Chain): Response {
val realChain = chain as RealInterceptorChain
var request = chain.request
val call = realChain.call
var followUpCount = 0
var priorResponse: Response? = null
var newExchangeFinder = true
var recoveredFailures = listOf<IOException>()
while (true) {
//....
try {
response = realChain.proceed(request)
newExchangeFinder = true
}
}
//....
}
1.重試攔截器規(guī)定默認(rèn)的重試次數(shù)為20次
2.以
response = realChain.proceed(request)
為分界點(diǎn),包括其他的攔截器靖诗,在責(zé)任鏈傳遞之前所做的工作都是前序工作郭怪,然后將request下發(fā)到下一個(gè)攔截器。3.
response = realChain.proceed(request)
后的代碼邏輯為后續(xù)工作刊橘,即拿到上個(gè)攔截器的response
結(jié)果鄙才,有點(diǎn)遞歸的意思,按照責(zé)任鏈的執(zhí)行一直到最后一個(gè)攔截器獲得的結(jié)果依次上拋每個(gè)攔截器處理這個(gè)response
完成一些后序工作促绵。4.當(dāng)然并不是每個(gè)請求都會(huì)走完整個(gè)鏈攒庵,如
CacheInterceptor
當(dāng)開啟了緩存(存在緩存)拿到了緩存的response
那么之后的攔截器就不會(huì)在繼續(xù)傳遞据途。
六、BridgeInterceptor
override fun intercept(chain: Interceptor.Chain): Response {
val userRequest = chain.request()
val requestBuilder = userRequest.newBuilder()
val body = userRequest.body
if (body != null) {
val contentType = body.contentType()
if (contentType != null) {
requestBuilder.header("Content-Type", contentType.toString())
}
val contentLength = body.contentLength()
if (contentLength != -1L) {
requestBuilder.header("Content-Length", contentLength.toString())
requestBuilder.removeHeader("Transfer-Encoding")
} else {
requestBuilder.header("Transfer-Encoding", "chunked")
requestBuilder.removeHeader("Content-Length")
}
}
//.....
//分界點(diǎn)叙甸,上部分為前序操作
val networkResponse = chain.proceed(requestBuilder.build())
//下部分代碼為拿到response的后序操作
cookieJar.receiveHeaders(userRequest.url, networkResponse.headers)
}
1.橋接攔截器主要對請求的Hader的信息的補(bǔ)充颖医,包括內(nèi)容長度等。
2.傳遞請求到下一個(gè)鏈裆蒸,等待返回的
response
信息熔萧。3.后序的操作包括
Cookie
、gzip
壓縮信息僚祷,User-Agent
等信息的補(bǔ)充噪珊。
七凫乖、CacheInterceptor
override fun intercept(chain: Interceptor.Chain): Response {
val call = chain.call()
val cacheCandidate = cache?.get(chain.request())
val now = System.currentTimeMillis()
val strategy = CacheStrategy.Factory(now, chain.request(), cacheCandidate).compute()
val networkRequest = strategy.networkRequest
val cacheResponse = strategy.cacheResponse
//......
var networkResponse: Response? = null
try {
networkResponse = chain.proceed(networkRequest)
} finally {
// If we're crashing on I/O or otherwise, don't leak the cache body.
if (networkResponse == null && cacheCandidate != null) {
cacheCandidate.body?.closeQuietly()
}
}
//.....
}
-
OkHttpClient
的默認(rèn)構(gòu)造中是沒有開啟緩存的;internal var cache: Cache? = null
,需要手動(dòng)添加年碘,但是有默認(rèn)實(shí)現(xiàn)澈缺,基于最近最少使用算法實(shí)現(xiàn)的DiskLruCache
磁盤緩存:
private val client: OkHttpClient = OkHttpClient.Builder()
.cache(Cache(
directory = File(application.cacheDir, "http_cache"),
// $0.05 worth of phone storage in 2020
maxSize = 50L * 1024L * 1024L // 50 MiB
))
.build()
1.緩存攔截器默認(rèn)沒有被開啟伐厌,需要在調(diào)用時(shí)指定緩存的目錄旗国,內(nèi)部基于
DiskLruCache
實(shí)現(xiàn)了磁盤緩存。2.當(dāng)緩存開啟蜕琴,且命中緩存萍桌,那么鏈的調(diào)用不會(huì)再繼續(xù)向下傳遞(此時(shí)已經(jīng)拿到了response)直接進(jìn)行后序的操作。
3.如果未命中凌简,則會(huì)繼續(xù)傳遞到下一個(gè)鏈也即是
ConnectInterceptor
上炎。
八、ConnectInterceptor
/**
* Opens a connection to the target server and proceeds to the next interceptor.
* The network might be used for the returned response,
* or to validate a cached response with a conditional GET.
*/
object ConnectInterceptor : Interceptor {
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
val realChain = chain as RealInterceptorChain
val exchange = realChain.call.initExchange(chain)
val connectedChain = realChain.copy(exchange = exchange)
return connectedChain.proceed(realChain.request)
}
}
-
ConnectInterceptor
的代碼量很少主要作用很清晰雏搂。
1.建立與目標(biāo)的服務(wù)器的
TCP
或者TCP-TLS
的鏈接藕施。2.與之前的攔截器不同,前面的攔截器的前序操作基于調(diào)用方法
realChain.proceed()
之前凸郑,但是ConnectInterceptor
沒有后序操作裳食,下發(fā)到下一個(gè)攔截器。
九线椰、CallServerInterceptor
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
val realChain = chain as RealInterceptorChain
val exchange = realChain.exchange!!
val request = realChain.request
val requestBody = request.body
val sentRequestMillis = System.currentTimeMillis()
exchange.writeRequestHeaders(request)
var invokeStartEvent = true
var responseBuilder: Response.Builder? = null
if (HttpMethod.permitsRequestBody(request.method) && requestBody != null) {
if ("100-continue".equals(request.header("Expect"), ignoreCase = true)) {
exchange.flushRequest()
responseBuilder = exchange.readResponseHeaders(expectContinue = true)
exchange.responseHeadersStart()
invokeStartEvent = false
}
if (responseBuilder == null) {
if (requestBody.isDuplex()) {
exchange.flushRequest()
val bufferedRequestBody = exchange.createRequestBody(request, true).buffer()
requestBody.writeTo(bufferedRequestBody)
} else {
val bufferedRequestBody = exchange.createRequestBody(request, false).buffer()
requestBody.writeTo(bufferedRequestBody)
bufferedRequestBody.close()
}
} else {
exchange.noRequestBody()
if (!exchange.connection.isMultiplexed) {
exchange.noNewExchangesOnConnection()
}
}
} else {
exchange.noRequestBody()
}
if (requestBody == null || !requestBody.isDuplex()) {
exchange.finishRequest()
}
if (responseBuilder == null) {
responseBuilder = exchange.readResponseHeaders(expectContinue = false)!!
if (invokeStartEvent) {
exchange.responseHeadersStart()
invokeStartEvent = false
}
}
var response = responseBuilder
.request(request)
.handshake(exchange.connection.handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build()
var code = response.code
if (code == 100) {
responseBuilder = exchange.readResponseHeaders(expectContinue = false)!!
if (invokeStartEvent) {
exchange.responseHeadersStart()
}
response = responseBuilder
.request(request)
.handshake(exchange.connection.handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build()
code = response.code
}
exchange.responseHeadersEnd(response)
response = if (forWebSocket && code == 101) {
response.newBuilder()
.body(EMPTY_RESPONSE)
.build()
} else {
response.newBuilder()
.body(exchange.openResponseBody(response))
.build()
}
if ("close".equals(response.request.header("Connection"), ignoreCase = true) ||
"close".equals(response.header("Connection"), ignoreCase = true)) {
exchange.noNewExchangesOnConnection()
}
if ((code == 204 || code == 205) && response.body?.contentLength() ?: -1L > 0L) {
throw ProtocolException(
"HTTP $code had non-zero Content-Length: ${response.body?.contentLength()}")
}
return response
}
}
-
CallServerInterceptor
是整個(gè)責(zé)任鏈的尾鏈:
1.實(shí)質(zhì)上是請求與
I/O
操作胞谈,將請求的數(shù)據(jù)寫入到Socket
中。2.從
Socket
讀取響應(yīng)的數(shù)據(jù)TCP/TCP-TLS對應(yīng)的端口憨愉,對于I/O
操作基于的是okio
,而okhttp
的高效請求同樣離不開okio
的支持卿捎。3.拿到數(shù)據(jù)
reponse
返回到之前包含有后序操作的攔截器配紫,但ConnectInterceptor
除外,ConnectInterceptor
是沒有后續(xù)操作的午阵。
整個(gè)攔截器流程圖如下:
十躺孝、ReaclCall.execute()
- 回頭看同步請求的方法:
override fun execute(): Response {
check(executed.compareAndSet(false, true)) { "Already Executed" }
timeout.enter()
callStart()
try {
client.dispatcher.executed(this)
return getResponseWithInterceptorChain()
} finally {
client.dispatcher.finished(this)
}
}
- 在Dispatcher中對于同步的請求僅僅只是將享扔,
RealCall
的實(shí)例添加到runningSyncCalls.add(call)
執(zhí)行中的隊(duì)列中。 -
RealCall
是Call
的唯一實(shí)現(xiàn)類植袍,而對于execute()
方法的定義明確了惧眠,立即執(zhí)行并阻塞直到response
結(jié)果的返回或者error
信息打斷阻塞。 - 經(jīng)過getResponseWithInterceptorChain()的調(diào)用于个,鏈的推進(jìn)此時(shí)已經(jīng)拿到結(jié)果氛魁,那么后序的操作是什么呢?先看一個(gè)
Java
基礎(chǔ)try{}finally{}結(jié)構(gòu)厅篓,以及dispatcher.finished(call: RealCall)
秀存。
try{
client.dispatcher.executed(this)
return getResponseWithInterceptorChain()
} finally{
client.dispatcher.finished(this)
}
/** Used by [Call.execute] to signal completion. */
internal fun finished(call: RealCall) {
finished(runningSyncCalls, call)
}
1.排除極端的情況,
System.exit()
或者其他羽氮,finally 塊必然執(zhí)行或链,不論發(fā)生異常與否,也不論在 finally 之前是否有return
档押。2.不管在 try 塊中是否包含 return澳盐,finally 塊總是在 return 之前執(zhí)行。
3.如果 finally 塊中有return令宿,那么 try 塊和 catch 塊中的 return就沒有執(zhí)行機(jī)會(huì)了洞就。
Tip:第二條的結(jié)論很重要,回到
execute()
方法掀淘,dispatcher.finished(this)
在結(jié)果response
結(jié)果返回之前執(zhí)行旬蟋,看finished()
具體實(shí)現(xiàn)。
/** Used by [Call.execute] to signal completion. */
internal fun finished(call: RealCall) {
finished(runningSyncCalls, call)
}
private fun <T> finished(calls: Deque<T>, call: T) {
val idleCallback: Runnable?
synchronized(this) {
//#1
if(!calls.remove(call)) throw AssertionError("Call wasn't in-flight!")
idleCallback = this.idleCallback
}
//#2
val isRunning = promoteAndExecute()
if (!isRunning && idleCallback != null) {
idleCallback.run()
}
}
private fun promoteAndExecute(): Boolean {
this.assertThreadDoesntHoldLock()
val executableCalls = mutableListOf<AsyncCall>()
val isRunning: Boolean
synchronized(this) {
//#3
val i = readyAsyncCalls.iterator()
while (i.hasNext()) {
val asyncCall = i.next()
// Max capacity.
if (runningAsyncCalls.size >= this.maxRequests) break
// Host max capacity.
if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue
i.remove()
asyncCall.callsPerHost.incrementAndGet()
executableCalls.add(asyncCall)
runningAsyncCalls.add(asyncCall)
}
//#4
isRunning = runningCallsCount() > 0
}
for (i in 0 until executableCalls.size) {
val asyncCall = executableCalls[I]
asyncCall.executeOn(executorService)
}
//#5
return isRunning
}
@Synchronized fun runningCallsCount(): Int = runningAsyncCalls.size + runningSyncCalls.size
1.
#1
方法一革娄,calls.remove(call)
返回為true倾贰,也即是這個(gè)同步請求被從runningSyncCalls
中移除釋放;所以idleCallback
為空拦惋。2.
#3
很顯然asyncCall
的結(jié)果為空匆浙,沒有異步請求,在看#4
具體實(shí)現(xiàn)厕妖,runningSyncCalls
的size
為1首尼。則isRunning
的結(jié)果為true。idleCallback.run()不會(huì)被執(zhí)行言秸,并且idleCallback
其實(shí)也是為空软能。
- 至此,開始的例子所構(gòu)建的同步請求就被執(zhí)行完畢了举畸,阻塞拿到了結(jié)果回調(diào)查排,用于頁面展示或者其他操作。
- 梳理整個(gè)流程如下:
1.從OkHttpClient().newCall(request).execute()開啟同步請求任務(wù)抄沮。
2.得到的RealCall對象作為Call的唯一實(shí)現(xiàn)類跋核,其中同步方法execute()是阻塞的岖瑰,調(diào)用到會(huì)立即執(zhí)行阻塞到有結(jié)果返回,或者發(fā)生錯(cuò)誤error被打斷阻塞砂代。
3.RealCall中同步execute()請求方法被執(zhí)行蹋订,而此時(shí)OkHttpClient實(shí)例中的異步任務(wù)分發(fā)器Dispatcher會(huì)將請求的實(shí)例RealCall添加到雙端隊(duì)列runningSyncCalls中去。
4.通過RealCall中的方法getResponseWithInterceptorChain()開啟請求攔截器的責(zé)任鏈刻伊,將請求逐一下發(fā)露戒,通過持有index 并自增操作,其次除ConnectInterceptor與鏈尾CallServerInterceptor其余默認(rèn)攔截器均有以chain.proceed(request)為分界點(diǎn)的前序與后序操作娃圆,拿到response后依次處理后序操作玫锋。
5.最終返回結(jié)果response之前,對進(jìn)行中的同步任務(wù)做了移除隊(duì)列的操作也即finally中client.dispatcher.finished(this)方法讼呢,最終得到的結(jié)果response返回到客戶端撩鹿,至此整個(gè)同步請求流程就結(jié)束了。
十一悦屏、異步請求的處理enqueue()
- 異步請求的大致流程與同步請求是差不多的节沦,但是多了一個(gè)從等待隊(duì)列到執(zhí)行隊(duì)列轉(zhuǎn)化的一個(gè)過程。
internal fun enqueue(call: AsyncCall) {
synchronized(this) {
//#1
readyAsyncCalls.add(call)
//Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
//the same host.
if (!call.call.forWebSocket) {
val existingCall = findExistingCallWithHost(call.host)
if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
}
}
//#2
promoteAndExecute()
}
private fun promoteAndExecute(): Boolean {
this.assertThreadDoesntHoldLock()
val executableCalls = mutableListOf<AsyncCall>()
val isRunning: Boolean
synchronized(this) {
val i = readyAsyncCalls.iterator()
while (i.hasNext()) {
//#3
val asyncCall = i.next()
// Max capacity.
if (runningAsyncCalls.size >= this.maxRequests) break
// Host max capacity.
if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue
//#4
i.remove()
asyncCall.callsPerHost.incrementAndGet()
executableCalls.add(asyncCall)
//#5
runningAsyncCalls.add(asyncCall)
}
isRunning = runningCallsCount() > 0
}
for (i in 0 until executableCalls.size) {
val asyncCall = executableCalls[I]
asyncCall.executeOn(executorService)
}
return isRunning
}
- #1方法中將異步請求任務(wù)加入到readyAsyncCalls等待隊(duì)列之中础爬,#2開啟執(zhí)行任務(wù)(當(dāng)然Dispatcher維護(hù)了線程池來處理這些請求)甫贯。
- 執(zhí)行方法promoteAndExecute()中#3將異步任務(wù)從等待隊(duì)列readyAsyncCalls中取出并移除操作,#5則添加到執(zhí)行中的異步任務(wù)隊(duì)列runningAsyncCalls看蚜,如此直到所有請求全部執(zhí)行完畢叫搁。
十二、小結(jié)
- 這里僅僅是對大概的流程簡單的作出了分析供炎,忽略了一些其他的實(shí)現(xiàn)細(xì)節(jié)渴逻,像請求緩存池、請求的協(xié)議音诫。包括本地緩存DiskLruCache的實(shí)現(xiàn)細(xì)節(jié)惨奕,以及異步請求的一些閥值的判斷處理。在精力允許的情況下還應(yīng)該仔細(xì)研究一番竭钝,畢竟從Java到現(xiàn)在的Kotlin版本okhttp經(jīng)歷了很多的迭代也更加成熟梨撞,當(dāng)然網(wǎng)絡(luò)請求這塊okhttp的地位還是無法撼動(dòng)的??。