1、創建Client
Builder 是OkHttpClient的一個內部類，使用的是構建者模式
// Build an OkHttpClient through its Builder, overriding only the connect timeout (5000 ms).
val client = OkHttpClient.Builder()
    .connectTimeout(5000, TimeUnit.MILLISECONDS)
    .build()
// Builder
// Excerpt of OkHttpClient.Builder; members other than the dispatcher are elided below.
class Builder constructor() {
internal var dispatcher: Dispatcher = Dispatcher() // creates the Dispatcher instance
...
}
2、構建Request
也就是構建請求報文信息，請求的url、header等信息，也是用的Builder模式
// Describe the request via Request.Builder; only the URL is set here.
val request = Request.Builder()
    .url(url)
    .build()
3、創建Call 對象，可以把Call 對象當成是Request 和 Response 的中間橋梁。
Call 是一個接口，真正的實現類是RealCall。
// newCall() is declared to return the Call interface; the type is spelled out for clarity.
val call: Call = client.newCall(request)
// OkHttpClient.newCall
/** Creates a [Call] for [request] by delegating to [RealCall.newRealCall] with forWebSocket = false. */
override fun newCall(request: Request): Call =
    RealCall.newRealCall(this, request, forWebSocket = false)
// RealCall.newRealCall
/**
 * Factory for [RealCall]: constructs the call, then attaches a [Transmitter] that references
 * both the client and the newly created call.
 */
fun newRealCall(
    client: OkHttpClient,
    originalRequest: Request,
    forWebSocket: Boolean
): RealCall {
    // Safely publish the Call instance to the EventListener.
    val realCall = RealCall(client, originalRequest, forWebSocket)
    realCall.transmitter = Transmitter(client, realCall)
    return realCall
}
}
同步請求:
// Synchronous request: execute() returns the Response directly.
val response: Response = call.execute()
// RealCall.execute
/**
 * Runs this call synchronously and returns its [Response].
 *
 * Throws IllegalStateException ("Already Executed") via check() if this call was already started.
 */
override fun execute(): Response {
// A RealCall can only be executed once
synchronized(this) {
check(!executed) { "Already Executed" }
executed = true
}
transmitter.timeoutEnter()
transmitter.callStart()
try {
// Register this call with the dispatcher before doing the work
client.dispatcher.executed(this)
// Obtain the Response through the interceptor chain
return getResponseWithInterceptorChain()
} finally {
// Always tell the dispatcher this call is done, whether it returned or threw
client.dispatcher.finished(this)
}
}
// Dispatcher.executed
// 把call 添加到runningSyncCalls 隊列中
/** Records a synchronous [call] as running by adding it to runningSyncCalls under this dispatcher's lock. */
internal fun executed(call: RealCall) {
    synchronized(this) {
        runningSyncCalls.add(call)
    }
}
// Dispatcher.finished
/**
 * Marks an asynchronous [call] as finished: decrements its per-host counter and removes it
 * from runningAsyncCalls through the generic finished() helper.
 *
 * NOTE(review): this overload takes an AsyncCall, whereas RealCall.execute() passes a RealCall
 * to dispatcher.finished(); the synchronous path presumably uses a separate overload that is
 * not shown in this excerpt — confirm against the Dispatcher source.
 */
internal fun finished(call: AsyncCall) {
call.callsPerHost().decrementAndGet()
finished(runningAsyncCalls, call)
}
/**
 * Removes [call] from [calls], then tries to promote waiting calls; if nothing is running
 * afterwards and an idle callback is set, runs that callback.
 *
 * Throws AssertionError if [call] was not present in [calls].
 */
private fun <T> finished(calls: Deque<T>, call: T) {
val idleCallback: Runnable?
synchronized(this) {
// Remove the call from the deque that was passed in (the caller chooses which one)
if (!calls.remove(call)) throw AssertionError("Call wasn't in-flight!")
// Read the callback while holding the lock; it is invoked after the lock is released
idleCallback = this.idleCallback
}
// Check whether more requests are waiting and, if so, start them
val isRunning = promoteAndExecute()
if (!isRunning && idleCallback != null) {
idleCallback.run()
}
}
異步請求:
// Asynchronous request. enqueue() returns Unit: the outcome is delivered to the Callback rather
// than returned, so there is nothing meaningful to assign to a variable here.
call.enqueue(object : Callback {
    // Receives the IOException when the call fails.
    override fun onFailure(call: Call, e: IOException) {
        TODO("Not yet implemented")
    }

    // Receives the Response when the call produces one.
    override fun onResponse(call: Call, response: Response) {
        TODO("Not yet implemented")
    }
})
// RealCall.enqueue
/**
 * Schedules this call for asynchronous execution; the outcome is delivered to [responseCallback].
 *
 * Throws IllegalStateException ("Already Executed") via check() if this call was already started.
 */
override fun enqueue(responseCallback: Callback) {
// A call may only be started once; the flag is checked and set under the call's own lock
synchronized(this) {
check(!executed) { "Already Executed" }
executed = true
}
transmitter.callStart()
// Wrap the callback in a new AsyncCall (a Runnable inner class of RealCall) and hand it to the dispatcher
client.dispatcher.enqueue(AsyncCall(responseCallback))
}
// Dispatcher.enqueue
/**
 * Queues an asynchronous [call] and then attempts to start any calls that are ready to run.
 */
internal fun enqueue(call: AsyncCall) {
synchronized(this) {
// Add the AsyncCall to the waiting queue
readyAsyncCalls.add(call)
// For non-WebSocket calls, look for an existing call to the same host and reuse its
// per-host counter (presumably so calls to one host share a single count — confirm)
if (!call.get().forWebSocket) {
val existingCall = findExistingCallWithHost(call.host())
if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
}
}
// Invoked after the synchronized block; promoteAndExecute() asserts the lock is not held
promoteAndExecute()
}
// Dispatcher.promoteAndExecute
/**
 * Moves eligible calls from readyAsyncCalls to runningAsyncCalls and submits them to the
 * executor service. Must not be called while holding this dispatcher's lock (asserted below).
 *
 * @return true if runningCallsCount() is greater than zero after promotion.
 */
private fun promoteAndExecute(): Boolean {
assert(!Thread.holdsLock(this))
val executableCalls = mutableListOf<AsyncCall>()
val isRunning: Boolean
synchronized(this) {
val i = readyAsyncCalls.iterator()
while (i.hasNext()) {
val asyncCall = i.next()
// Stop once the number of running async calls reaches maxRequests; skip a call whose host
// already has maxRequestsPerHost calls (the original note cites 64 and 5 as these limits;
// the values are not set in this excerpt)
if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
if (asyncCall.callsPerHost().get() >= this.maxRequestsPerHost) continue // Host max capacity.
i.remove()
asyncCall.callsPerHost().incrementAndGet()
// Eligible: the call leaves readyAsyncCalls and is added to both executableCalls and runningAsyncCalls
executableCalls.add(asyncCall)
runningAsyncCalls.add(asyncCall)
}
isRunning = runningCallsCount() > 0
}
// Submit the calls just added to runningAsyncCalls to the thread pool, after leaving the lock
for (i in 0 until executableCalls.size) {
val asyncCall = executableCalls[i]
asyncCall.executeOn(executorService)
}
return isRunning
}
// Dispatcher.executorService
// 異步請求線程池
// 核心線程數為0，這樣沒有任務的時候，線程就可以全部關掉；最大線程數名義上是無限大（Int.MAX_VALUE），但實際上不是，
// 因為異步運行隊列的最大個數限制在了64
// 空閒線程的存活時間（keepAliveTime）是60秒
/**
 * Executor that runs asynchronous calls, created lazily on first access.
 *
 * The pool is built with zero core threads, an upper bound of Int.MAX_VALUE threads, a
 * 60-second keep-alive for idle threads, and a SynchronousQueue as its work queue.
 *
 * The getter is synchronized because promoteAndExecute() reads this property outside the
 * dispatcher lock; without it two threads could both observe null and each create a pool.
 */
@get:Synchronized
@get:JvmName("executorService") val executorService: ExecutorService
    get() {
        // Reuse the existing executor when one has already been created or supplied.
        val existing = executorServiceOrNull
        if (existing != null) return existing

        val created = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
            SynchronousQueue(), threadFactory("OkHttp Dispatcher", false))
        executorServiceOrNull = created
        return created
    }
// AsyncCall.executeOn
/**
 * Attempts to submit this AsyncCall to [executorService]. If the executor rejects it, the
 * failure is reported to the callback and the dispatcher is told the call has finished.
 */
fun executeOn(executorService: ExecutorService) {
assert(!Thread.holdsLock(client.dispatcher))
var success = false
try {
// Submitting this Runnable makes the executor invoke AsyncCall.run()
executorService.execute(this)
success = true
} catch (e: RejectedExecutionException) {
// Report the rejection as an InterruptedIOException that carries the original cause
val ioException = InterruptedIOException("executor rejected")
ioException.initCause(e)
transmitter.noMoreExchanges(ioException)
responseCallback.onFailure(this@RealCall, ioException)
} finally {
// Only when submission failed: on success, run() notifies the dispatcher itself
if (!success) {
client.dispatcher.finished(this) // This call is no longer running!
}
}
}
// AsyncCall.run
/**
 * Executes the call: obtains the response through the interceptor chain and delivers it, or an
 * IOException, to the callback; the dispatcher is always notified at the end.
 */
override fun run() {
threadName("OkHttp ${redactedUrl()}") {
var signalledCallback = false
transmitter.timeoutEnter()
try {
// Finally, call RealCall.getResponseWithInterceptorChain() to obtain the result
val response = getResponseWithInterceptorChain()
// Set before onResponse so an IOException thrown by the callback is logged, not re-signalled
signalledCallback = true
responseCallback.onResponse(this@RealCall, response)
} catch (e: IOException) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for ${toLoggableString()}", e)
} else {
responseCallback.onFailure(this@RealCall, e)
}
} finally {
// Runs on both success and failure so the dispatcher can drop this call
client.dispatcher.finished(this)
}
}
}
同步請求和異步請求都是從攔截器鏈中獲取結果，采用的是責任鏈模式：
/**
 * Builds the interceptor list and runs the original request through it to produce the [Response].
 *
 * Interceptors are added in this order: client.interceptors, RetryAndFollowUpInterceptor,
 * BridgeInterceptor, CacheInterceptor, ConnectInterceptor, client.networkInterceptors
 * (skipped when forWebSocket is true), CallServerInterceptor.
 *
 * @throws IOException if the chain fails, or with message "Canceled" if the transmitter reports
 * cancellation after the chain returns.
 */
@Throws(IOException::class)
fun getResponseWithInterceptorChain(): Response {
// Build a full stack of interceptors.
val interceptors = mutableListOf<Interceptor>()
interceptors += client.interceptors
interceptors += RetryAndFollowUpInterceptor(client)
interceptors += BridgeInterceptor(client.cookieJar)
interceptors += CacheInterceptor(client.cache)
interceptors += ConnectInterceptor
if (!forWebSocket) {
interceptors += client.networkInterceptors
}
interceptors += CallServerInterceptor(forWebSocket)
// The chain starts at index 0 with the original request and the client's three timeouts
val chain = RealInterceptorChain(interceptors, transmitter, null, 0, originalRequest, this,
client.connectTimeoutMillis, client.readTimeoutMillis, client.writeTimeoutMillis)
// Tracks whether noMoreExchanges() was already called in the catch block, so the finally
// block does not call it a second time
var calledNoMoreExchanges = false
try {
val response = chain.proceed(originalRequest)
// A call canceled while in flight must not hand out its response: close it and fail
if (transmitter.isCanceled) {
response.closeQuietly()
throw IOException("Canceled")
}
return response
} catch (e: IOException) {
calledNoMoreExchanges = true
throw transmitter.noMoreExchanges(e) as Throwable
} finally {
if (!calledNoMoreExchanges) {
transmitter.noMoreExchanges(null)
}
}
}