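Earlier posts walked through the OkHttp source piece by piece, following the order in which the library is used. This post looks at how synchronous and asynchronous requests differ.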
val mOk = OkHttpClient()
val request = Request.Builder()
  .url("request URL") // placeholder for the address to request
  .get() // request method
  .build()
val call = mOk.newCall(request)
Synchronous analysis:
call.execute()
call.execute() starts a synchronous request and returns a Response. Stepping into it:
override fun execute(): Response {
  check(executed.compareAndSet(false, true)) { "Already Executed" }
  timeout.enter()
  callStart()
  try {
    client.dispatcher.executed(this)
    return getResponseWithInterceptorChain()
  } finally {
    client.dispatcher.finished(this)
  }
}
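The first line of execute() is what makes a call single-use. As a minimal sketch of that guard, assuming a hypothetical OneShotTask class that stands in for the real call object (it is not part of OkHttp):

```kotlin
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical stand-in for a call object; not OkHttp's RealCall.
class OneShotTask(private val body: () -> String) {
  private val executed = AtomicBoolean(false)

  fun execute(): String {
    // compareAndSet flips false -> true for exactly one caller, even under contention;
    // check() throws IllegalStateException on every later attempt.
    check(executed.compareAndSet(false, true)) { "Already Executed" }
    return body()
  }
}
```

Calling execute() a second time on the same OneShotTask throws IllegalStateException with the message "Already Executed", which is the behaviour the check(...) line in the source suggests for a reused call.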
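- timeout.enter()
Starts the timeout for the call. Its code is below: if neither a timeout nor a deadline is set, it returns immediately; otherwise it schedules the timeout.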
fun enter() {
  val timeoutNanos = timeoutNanos()
  val hasDeadline = hasDeadline()
  if (timeoutNanos == 0L && !hasDeadline) {
    return
  }
  scheduleTimeout(this, timeoutNanos, hasDeadline)
}
- EventListener callback: callStart()
private fun callStart() {
  this.callStackTrace = Platform.get().getStackTraceForCloseable("response.body().close()")
  eventListener.callStart(this)
}
- Call client.dispatcher.executed(this)
The call is added to a queue that the Dispatcher manages; the Dispatcher's role was covered in an earlier analysis.
/** Used by [Call.execute] to signal it is in-flight. */
@Synchronized internal fun executed(call: RealCall) {
  runningSyncCalls.add(call)
}
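The pairing of executed(this) inside try and finished(this) inside finally can be sketched on its own. This is a simplified model under stated assumptions: SyncCallTracker, track and runningCount are hypothetical names, and the real Dispatcher does more than this bookkeeping.

```kotlin
import java.util.ArrayDeque

// Hypothetical, simplified bookkeeping; OkHttp's Dispatcher does more than this.
class SyncCallTracker<T : Any> {
  private val runningSyncCalls = ArrayDeque<T>()

  @Synchronized fun executed(call: T) {
    runningSyncCalls.add(call)
  }

  @Synchronized fun finished(call: T) {
    check(runningSyncCalls.remove(call)) { "Call wasn't in-flight!" }
  }

  @Synchronized fun runningCount(): Int = runningSyncCalls.size

  // Same try/finally shape as execute(): register, run, always unregister.
  fun <R> track(call: T, block: () -> R): R {
    executed(call)
    try {
      return block()
    } finally {
      finished(call)
    }
  }
}
```

Because the removal sits in finally, a call that throws is still taken out of the running set, so the count never leaks.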
- Build the interceptor chain with getResponseWithInterceptorChain(), which returns the Response
internal fun getResponseWithInterceptorChain(): Response {
  // Build a full stack of interceptors.
  val interceptors = mutableListOf<Interceptor>()
  interceptors += client.interceptors // application interceptors added by the user
  interceptors += RetryAndFollowUpInterceptor(client) // failure recovery and redirects
  interceptors += BridgeInterceptor(client.cookieJar) // bridge from application code to network code
  interceptors += CacheInterceptor(client.cache) // cache
  interceptors += ConnectInterceptor // connection
  if (!forWebSocket) {
    interceptors += client.networkInterceptors // network interceptors
  }
  interceptors += CallServerInterceptor(forWebSocket) // makes the network call to the server
  // The concrete chain that carries the whole interceptor list
  val chain = RealInterceptorChain(
    call = this,
    interceptors = interceptors,
    index = 0,
    exchange = null,
    request = originalRequest,
    connectTimeoutMillis = client.connectTimeoutMillis,
    readTimeoutMillis = client.readTimeoutMillis,
    writeTimeoutMillis = client.writeTimeoutMillis
  )
  // Run the chain and return the response
  var calledNoMoreExchanges = false
  try {
    val response = chain.proceed(originalRequest)
    if (isCanceled()) {
      response.closeQuietly()
      throw IOException("Canceled")
    }
    return response
  } catch (e: IOException) {
    calledNoMoreExchanges = true
    throw noMoreExchanges(e) as Throwable
  } finally {
    if (!calledNoMoreExchanges) {
      noMoreExchanges(null)
    }
  }
}
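How an index-based chain passes a request from one interceptor to the next can be shown with a miniature version. This is a sketch under assumptions, not OkHttp's implementation: MiniInterceptor, MiniChain and runChain are hypothetical names, and requests and responses are plain strings.

```kotlin
// Hypothetical miniature types; requests and responses are plain strings here.
fun interface MiniInterceptor {
  fun intercept(chain: MiniChain): String
}

class MiniChain(
  private val interceptors: List<MiniInterceptor>,
  private val index: Int,
  val request: String
) {
  // Hands the request to the interceptor at `index`, giving it a chain that points at index + 1.
  fun proceed(request: String): String {
    check(index < interceptors.size) { "no interceptor left to produce a response" }
    val next = MiniChain(interceptors, index + 1, request)
    return interceptors[index].intercept(next)
  }
}

// Starts the chain at index 0, as the RealInterceptorChain construction above does.
fun runChain(interceptors: List<MiniInterceptor>, request: String): String =
  MiniChain(interceptors, 0, request).proceed(request)
```

Each interceptor may change the request before calling proceed and may change the response after it returns. The last interceptor in the list must produce a response without calling proceed, which is the position CallServerInterceptor occupies in the list above.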
Asynchronous analysis:
call.enqueue(object : Callback {
  override fun onResponse(call: Call, response: Response) {
    Log.e(TAG, "Request succeeded")
  }
  override fun onFailure(call: Call, e: IOException) {
    Log.e(TAG, "Request failed")
  }
})
An asynchronous request is started with call.enqueue(responseCallback: Callback). Stepping into it:
override fun enqueue(responseCallback: Callback) {
  check(executed.compareAndSet(false, true)) { "Already Executed" }
  callStart()
  client.dispatcher.enqueue(AsyncCall(responseCallback))
}
- EventListener callback: callStart(), the same as in the synchronous request
- Call client.dispatcher.enqueue(AsyncCall(responseCallback)), passing in an AsyncCall instance
inner class AsyncCall(
  private val responseCallback: Callback
) : Runnable {
  @Volatile var callsPerHost = AtomicInteger(0)
    private set
  fun reuseCallsPerHostFrom(other: AsyncCall) {
    this.callsPerHost = other.callsPerHost
  }
  val host: String
    get() = originalRequest.url.host
  val request: Request
    get() = originalRequest
  val call: RealCall
    get() = this@RealCall
  /**
   * Attempt to enqueue this async call on [executorService]. This will attempt to clean up
   * if the executor has been shut down by reporting the call as failed.
   */
  fun executeOn(executorService: ExecutorService) {
    client.dispatcher.assertThreadDoesntHoldLock()
    var success = false
    try {
      executorService.execute(this)
      success = true
    } catch (e: RejectedExecutionException) {
      val ioException = InterruptedIOException("executor rejected")
      ioException.initCause(e)
      noMoreExchanges(ioException)
      responseCallback.onFailure(this@RealCall, ioException)
    } finally {
      if (!success) {
        client.dispatcher.finished(this) // This call is no longer running!
      }
    }
  }
  override fun run() {
    threadName("OkHttp ${redactedUrl()}") {
      var signalledCallback = false
      timeout.enter()
      try {
        val response = getResponseWithInterceptorChain()
        signalledCallback = true
        responseCallback.onResponse(this@RealCall, response)
      } catch (e: IOException) {
        if (signalledCallback) {
          // Do not signal the callback twice!
          Platform.get().log("Callback failure for ${toLoggableString()}", Platform.INFO, e)
        } else {
          responseCallback.onFailure(this@RealCall, e)
        }
      } catch (t: Throwable) {
        cancel()
        if (!signalledCallback) {
          val canceledException = IOException("canceled due to $t")
          canceledException.addSuppressed(t)
          responseCallback.onFailure(this@RealCall, canceledException)
        }
        throw t
      } finally {
        client.dispatcher.finished(this)
      }
    }
  }
}
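The signalledCallback flag in run() keeps the callback from being notified twice when onResponse itself throws. A minimal sketch of that pattern, assuming a hypothetical MiniCallback interface and runAndSignal function with strings in place of real responses:

```kotlin
import java.io.IOException

// Hypothetical callback type for illustration; not OkHttp's Callback.
interface MiniCallback {
  fun onResponse(response: String)
  fun onFailure(e: IOException)
}

// Runs `fetch`, then reports to the callback. An IOException thrown by onResponse
// itself is only logged, so the callback is never signalled twice.
fun runAndSignal(
  fetch: () -> String,
  callback: MiniCallback,
  log: (String) -> Unit = { println(it) }
) {
  var signalledCallback = false
  try {
    val response = fetch()
    signalledCallback = true
    callback.onResponse(response)
  } catch (e: IOException) {
    if (signalledCallback) {
      log("Callback failure: ${e.message}")
    } else {
      callback.onFailure(e)
    }
  }
}
```

The flag is set before onResponse is invoked, so any IOException raised afterwards is attributed to the callback rather than to the request.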
AsyncCall implements Runnable, so the actual request flow is handled in run(). As in the synchronous path, it calls timeout.enter() and then builds the interceptor chain with getResponseWithInterceptorChain() to obtain the Response. On success it calls onResponse(call: Call, response: Response); on failure it calls onFailure(call: Call, e: IOException). Now back to client.dispatcher.enqueue:
internal fun enqueue(call: AsyncCall) {
  synchronized(this) {
    readyAsyncCalls.add(call)
    // Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
    // the same host.
    if (!call.call.forWebSocket) {
      val existingCall = findExistingCallWithHost(call.host)
      if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
    }
  }
  promoteAndExecute()
}
Here the AsyncCall is added to the ready queue (readyAsyncCalls.add(call)). Next comes the logic inside the if, starting with the findExistingCallWithHost(host: String) method:
private fun findExistingCallWithHost(host: String): AsyncCall? {
  for (existingCall in runningAsyncCalls) {
    if (existingCall.host == host) return existingCall
  }
  for (existingCall in readyAsyncCalls) {
    if (existingCall.host == host) return existingCall
  }
  return null
}
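The point of the lookup above is that calls to the same host end up holding one shared counter object. A small sketch of that sharing, assuming a hypothetical HostCall class and register function (the lookup is simplified to a single list and happens before the call is added):

```kotlin
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical simplified call: just a host name plus a counter that can be shared.
class HostCall(val host: String) {
  var callsPerHost = AtomicInteger(0)
    private set

  fun reuseCallsPerHostFrom(other: HostCall) {
    callsPerHost = other.callsPerHost
  }
}

// Registers a new call; if the host is already known, the new call adopts that counter.
fun register(existing: MutableList<HostCall>, call: HostCall) {
  val sameHost = existing.firstOrNull { it.host == call.host }
  if (sameHost != null) call.reuseCallsPerHostFrom(sameHost)
  existing += call
}
```

After two calls to the same host are registered, incrementing the counter through one of them is visible through the other, because both fields reference the same AtomicInteger instance.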
This method looks through the running and ready queues for a call to the same host and returns it. enqueue then calls asyncCall.reuseCallsPerHostFrom so that the new call shares the AtomicInteger of the existing call to that host. Back in enqueue(call: AsyncCall), the last step calls promoteAndExecute():
private fun promoteAndExecute(): Boolean {
  this.assertThreadDoesntHoldLock()
  val executableCalls = mutableListOf<AsyncCall>()
  val isRunning: Boolean
  synchronized(this) {
    val i = readyAsyncCalls.iterator()
    while (i.hasNext()) {
      val asyncCall = i.next()
      if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
      if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue // Host max capacity.
      i.remove()
      asyncCall.callsPerHost.incrementAndGet()
      executableCalls.add(asyncCall)
      runningAsyncCalls.add(asyncCall)
    }
    isRunning = runningCallsCount() > 0
  }
  for (i in 0 until executableCalls.size) {
    val asyncCall = executableCalls[i]
    asyncCall.executeOn(executorService)
  }
  return isRunning
}
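Inside the synchronized block it iterates over readyAsyncCalls. Calls that stay within both limits (maxRequests and maxRequestsPerHost) are removed from that queue and added to runningAsyncCalls; the rest stay in readyAsyncCalls and wait. Finally, asyncCall.executeOn(executorService) hands each promoted call to the thread pool for execution.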
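The promotion step can be modelled without any networking. The sketch below makes several assumptions: PendingCall and MiniDispatcher are hypothetical names, the limits are constructor parameters, and promoted calls are returned to the caller instead of being submitted to an executor.

```kotlin
import java.util.ArrayDeque
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical simplified call: a host name plus the counter shared by calls to that host.
class PendingCall(val host: String, val callsPerHost: AtomicInteger)

// Hypothetical simplified model of the two queues.
class MiniDispatcher(private val maxRequests: Int, private val maxRequestsPerHost: Int) {
  val readyCalls = ArrayDeque<PendingCall>()
  val runningCalls = ArrayDeque<PendingCall>()

  // Moves eligible calls from ready to running and returns the ones just promoted.
  @Synchronized fun promote(): List<PendingCall> {
    val promoted = mutableListOf<PendingCall>()
    val i = readyCalls.iterator()
    while (i.hasNext()) {
      val call = i.next()
      if (runningCalls.size >= maxRequests) break // global limit reached, stop scanning
      if (call.callsPerHost.get() >= maxRequestsPerHost) continue // this host is saturated, try the next call
      i.remove()
      call.callsPerHost.incrementAndGet()
      promoted.add(call)
      runningCalls.add(call)
    }
    return promoted
  }
}
```

Note the difference between the two checks: hitting the global limit ends the scan, while a saturated host only skips that call, so a later call to a different host can still be promoted.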
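Summary:
- Synchronous request: execute() blocks the calling thread until the response returns, so that thread cannot send its next request until then.
- Asynchronous request: enqueue() returns without waiting for the response, so more requests can be sent right away. Internally there are two queues, waiting (readyAsyncCalls) and running (runningAsyncCalls), and the AtomicInteger counters together with a thread pool support high concurrency.
- Dispatcher: both synchronous and asynchronous calls are managed by the Dispatcher.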