OkHttp現(xiàn)在幾乎已經(jīng)占據(jù)了所有網(wǎng)絡(luò)請求 了解其內(nèi)部原理可以更好的進行擴展妖啥、封裝和優(yōu)化
我們今天分析一下OkHttp源碼 因為流程比較多 所以分為兩篇(請求和響應(yīng))來分析 Okio本文暫時不涉及 后面可能會更新一篇Okio的文章 版本基于
4.5.0-RC1
請求方式
fun load() {
//1.創(chuàng)建請求(包含url,method,headers,body)
val request = Request
.Builder()
.url("")
.build()
//2.創(chuàng)建OkHttpClient (包含分發(fā)器磕谅、攔截器柒桑、DNS等)
val okHttpClient = OkHttpClient.Builder().build()
//3.創(chuàng)建Call(用于調(diào)用請求)
val newCall = okHttpClient.newCall(request)
//4.通過異步請求數(shù)據(jù)
newCall.enqueue(object :Callback{
override fun onFailure(call: Call, e: IOException) {}
override fun onResponse(call: Call, response: Response) {}
})
//4.通過同步請求數(shù)據(jù)
val response = newCall.execute()
}
我們會按照順序來分析一下請求的流程
前面1,2,3步很多文章已經(jīng)分析過很多遍了 也比較簡單 同學(xué)們可以自己看一下 我們就不再贅述 我們直接看第四步進入今天的主要流程
Okhttp請求分為同步方式和異步方式 不過最終都是殊途同歸 我們以異步的方式分析一下請求流程
enqueue()
話不多說 先看一眼代碼
RealCall.enqueue()->
Dispatcher.enqueue()->
Dispatcher.promoteAndExecute()->
RealCall.executeOn()
override fun enqueue(responseCallback: Callback) {
synchronized(this) {
//檢查是否已經(jīng)開始運行
check(!executed) { "Already Executed" }
executed = true
}
callStart()
//封裝AsyncCall對象 并放入隊列中等待執(zhí)行
client.dispatcher.enqueue(AsyncCall(responseCallback))
}
internal fun enqueue(call: AsyncCall) {
synchronized(this) {
//放入等待隊列
readyAsyncCalls.add(call)
// Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
// the same host.
if (!call.call.forWebSocket) {
val existingCall = findExistingCallWithHost(call.host)
if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
}
}
//推進執(zhí)行
promoteAndExecute()
}
//將readyAsyncCalls中合格的請求過渡升級到runningAsyncCalls中
private fun promoteAndExecute(): Boolean {
this.assertThreadDoesntHoldLock()
val executableCalls = mutableListOf<AsyncCall>()
val isRunning: Boolean
synchronized(this) {
val i = readyAsyncCalls.iterator()
while (i.hasNext()) {
val asyncCall = i.next()
if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.//運行最大64
if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue // Host max capacity.每個主機最大5
i.remove()
asyncCall.callsPerHost.incrementAndGet()
executableCalls.add(asyncCall)
runningAsyncCalls.add(asyncCall)
}
isRunning = runningCallsCount() > 0
}
for (i in 0 until executableCalls.size) {
val asyncCall = executableCalls[i]
asyncCall.executeOn(executorService)
}
return isRunning
}
上面的流程主要是將我們的異步任務(wù)放入隊列中 并且將可以運行的任務(wù)開啟運行
RealCall.executeOn()
fun executeOn(executorService: ExecutorService) {
client.dispatcher.assertThreadDoesntHoldLock()
var success = false
try {
//將當(dāng)前Runnable放到線程池中運行
executorService.execute(this)
success = true
} catch (e: RejectedExecutionException) {
val ioException = InterruptedIOException("executor rejected")
ioException.initCause(e)
noMoreExchanges(ioException)
responseCallback.onFailure(this@RealCall, ioException)
} finally {
if (!success) {
client.dispatcher.finished(this) // This call is no longer running!
}
}
}
override fun run() {
threadName("OkHttp ${redactedUrl()}") {
var signalledCallback = false
timeout.enter()
try {
//終于到這篇文章的重頭戲了
val response = getResponseWithInterceptorChain()
signalledCallback = true
responseCallback.onResponse(this@RealCall, response)
} catch (e: IOException) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log("Callback failure for ${toLoggableString()}", Platform.INFO, e)
} else {
responseCallback.onFailure(this@RealCall, e)
}
} catch (t: Throwable) {
cancel()
if (!signalledCallback) {
val canceledException = IOException("canceled due to $t")
canceledException.addSuppressed(t)
responseCallback.onFailure(this@RealCall, canceledException)
}
throw t
} finally {
client.dispatcher.finished(this)
}
}
}
}
我們知道OkHttp采用了攔截鏈模式 看一下何為攔截器模式 借用一下大佬的圖(懶得畫圖了??)
我們可以看到 請求鏈會以鏈的形式調(diào)用下去 直到到鏈尾或者return response
getResponseWithInterceptorChain()
@Throws(IOException::class)
internal fun getResponseWithInterceptorChain(): Response {
// Build a full stack of interceptors.
val interceptors = mutableListOf<Interceptor>()
//加入我們自己的攔截器
interceptors += client.interceptors
interceptors += RetryAndFollowUpInterceptor(client)
interceptors += BridgeInterceptor(client.cookieJar)
interceptors += CacheInterceptor(client.cache)
interceptors += ConnectInterceptor
if (!forWebSocket) {
//如果不是websocket的話 加入網(wǎng)絡(luò)攔截器
interceptors += client.networkInterceptors
}
interceptors += CallServerInterceptor(forWebSocket)
val chain = RealInterceptorChain(
call = this,
interceptors = interceptors,
index = 0,
exchange = null,
request = originalRequest,
connectTimeoutMillis = client.connectTimeoutMillis,
readTimeoutMillis = client.readTimeoutMillis,
writeTimeoutMillis = client.writeTimeoutMillis
)
var calledNoMoreExchanges = false
try {
//開始攔截鏈調(diào)用
val response = chain.proceed(originalRequest)
if (isCanceled()) {
response.closeQuietly()
throw IOException("Canceled")
}
return response
} catch (e: IOException) {
calledNoMoreExchanges = true
throw noMoreExchanges(e) as Throwable
} finally {
if (!calledNoMoreExchanges) {
noMoreExchanges(null)
}
}
}
@Throws(IOException::class)
override fun proceed(request: Request): Response {
check(index < interceptors.size)
calls++
if (exchange != null) {
check(exchange.finder.sameHostAndPort(request.url)) {
"network interceptor ${interceptors[index - 1]} must retain the same host and port"
}
check(calls == 1) {
"network interceptor ${interceptors[index - 1]} must call proceed() exactly once"
}
}
// Call the next interceptor in the chain.
//循環(huán)拿取下一個攔截器
val next = copy(index = index + 1, request = request)
val interceptor = interceptors[index]
@Suppress("USELESS_ELVIS")
//這里會調(diào)用攔截器的方法 接下來我們會分析各個攔截器的作用
val response = interceptor.intercept(next) ?: throw NullPointerException(
"interceptor $interceptor returned null")
if (exchange != null) {
check(index + 1 >= interceptors.size || next.calls == 1) {
"network interceptor $interceptor must call proceed() exactly once"
}
}
check(response.body != null) { "interceptor $interceptor returned a response with no body" }
return response
}
我們看到攔截器的鏈路模式 其實就是遍歷調(diào)用各攔截器 對Request進行作用 直到return response
我們來逐個分析一下各個攔截器的作用 本文只分析處理Request,response的處理會在下文講解
RetryAndFollowUpInterceptor
作用:處理錯誤 重定向 所以在請求的過程中 沒有做太多事情 下文會分析如何處理錯誤 重定向
這個攔截器在請求的過程中幾乎沒做什么事情 只做了一件事 就是創(chuàng)建ExchangeFinder 這個對象在后面的ConnectInterceptor用來生成exchange對象
fun enterNetworkInterceptorExchange(request: Request, newExchangeFinder: Boolean) {
check(interceptorScopedExchange == null)
check(exchange == null) {
"cannot make a new request because the previous response is still open: " +
"please call response.close()"
}
if (newExchangeFinder) {
this.exchangeFinder = ExchangeFinder(
connectionPool,
createAddress(request.url),
this,
eventListener
)
}
}
BridgeInterceptor
作用:添加必要的請求頭信息呛踊、gzip處理等
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
val userRequest = chain.request()
val requestBuilder = userRequest.newBuilder()
val body = userRequest.body
if (body != null) {
val contentType = body.contentType()
if (contentType != null) {
//添加contentType
requestBuilder.header("Content-Type", contentType.toString())
}
val contentLength = body.contentLength()
if (contentLength != -1L) {
requestBuilder.header("Content-Length", contentLength.toString())
requestBuilder.removeHeader("Transfer-Encoding")
} else {
requestBuilder.header("Transfer-Encoding", "chunked")
requestBuilder.removeHeader("Content-Length")
}
}
if (userRequest.header("Host") == null) {
requestBuilder.header("Host", userRequest.url.toHostHeader())
}
//默認keep-Alive
if (userRequest.header("Connection") == null) {
requestBuilder.header("Connection", "Keep-Alive")
}
// If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing
// the transfer stream.
//傳輸流的壓縮方式 默認gzip方式
var transparentGzip = false
if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
transparentGzip = true
requestBuilder.header("Accept-Encoding", "gzip")
}
val cookies = cookieJar.loadForRequest(userRequest.url)
//添加cookie
if (cookies.isNotEmpty()) {
requestBuilder.header("Cookie", cookieHeader(cookies))
}
//默認UA
if (userRequest.header("User-Agent") == null) {
requestBuilder.header("User-Agent", userAgent)
}
val networkResponse = chain.proceed(requestBuilder.build())
......handle response
}
CacheInterceptor
okhttp默認的緩存機制會加快響應(yīng)流程 我們看一下緩存策略 首先 我們要解釋幾個變量
//緩存策略類
class CacheStrategy{
//如果我們需要請求網(wǎng)絡(luò) 則networkRequest不為null 否則為null
val networkRequest: Request?
//請求的返回或者請求的響應(yīng) 如果無法使用緩存(一般是過期或者無緩存 則為null)
val cacheResponse: Response?
}
上面兩個變量是緩存策略中比較重要的兩個變量 我們會根據(jù)這兩個變量來選擇是否命中緩存
先看一下結(jié)論
networkRequest\cacheResponse | cacheResponse is null | cacheResponse is not null |
---|---|---|
networkRequest is null | ① 返回HTTP_GATEWAY_TIMEOUT 504錯誤 | ② 直接使用緩存 |
networkRequest is not null | ③ 進行網(wǎng)絡(luò)請求 并且緩存新response | ④ 先請求 根據(jù)code(304) 判斷是否需要重新request |
再看一下intercept()
方法 可以對照上面兩個變量的解釋和表格來觀看
override fun intercept(chain: Interceptor.Chain): Response {
//獲取緩存 如果我們配置了緩存 那么會去查找是否存在cache
//這里需要注意的一點是 okhttp默認并不會配置緩存 只是規(guī)范了一套緩存策略 我們可以自己通過OkHttpClient.Builder 的 cache 方法設(shè)置
val cacheCandidate = cache?.get(chain.request())
val now = System.currentTimeMillis()
//這里的策略 會自動判斷是否使用緩存 是否存在緩存
val strategy = CacheStrategy.Factory(now, chain.request(), cacheCandidate).compute()
//這里就是上面我們解釋過的兩個變量
val networkRequest = strategy.networkRequest
val cacheResponse = strategy.cacheResponse
cache?.trackResponse(strategy)
//LruCache沒有hit cache 并且網(wǎng)絡(luò)緩存不可用
if (cacheCandidate != null && cacheResponse == null) {
// The cache candidate wasn't applicable. Close it.
//關(guān)閉cacheCandidate.body
cacheCandidate.body?.closeQuietly()
}
//按照我們上面對緩存的解釋 不允許使用網(wǎng)絡(luò)請求 并且當(dāng)前沒有緩存 對應(yīng)表格中①
// If we're forbidden from using the network and the cache is insufficient, fail.
if (networkRequest == null && cacheResponse == null) {
return Response.Builder()
.request(chain.request())
.protocol(Protocol.HTTP_1_1)
.code(HTTP_GATEWAY_TIMEOUT)
.message("Unsatisfiable Request (only-if-cached)")
.body(EMPTY_RESPONSE)
.sentRequestAtMillis(-1L)
.receivedResponseAtMillis(System.currentTimeMillis())
.build()
}
//不允許使用網(wǎng)絡(luò) 僅直接使用緩存 對應(yīng)表格②
// If we don't need the network, we're done.
if (networkRequest == null) {
return cacheResponse!!.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.build()
}
var networkResponse: Response? = null
try {
//使用網(wǎng)絡(luò)
//對應(yīng)表格③④
networkResponse = chain.proceed(networkRequest)
} finally {
// If we're crashing on I/O or otherwise, don't leak the cache body.
if (networkResponse == null && cacheCandidate != null) {
cacheCandidate.body?.closeQuietly()
}
}
......緩存新 response
}
我們看完上面的代碼 發(fā)現(xiàn)所有的緩存策略都是根據(jù)networkRequest
和cacheResponse
兩個變量進行控制的 接下來我們看一下緩存策略的生成過程
fun compute(): CacheStrategy {
class Factory(
private val nowMillis: Long,
internal val request: Request,
private val cacheResponse: Response? //這邊的cacheResponse和我們上面講的還不太一樣 這邊完全是Cache類中緩存的對象 如果我們之前請求過 并且緩存 則不為null
)
//根據(jù)request的cache-control 和response 的cache-control判斷
val candidate = computeCandidate()
//使用網(wǎng)絡(luò)請求 但是請求的request的cache-control是onlyIfCached 表示僅使用緩存
//這就是個悖論了 所以直接返回504 對應(yīng)表格①
// We're forbidden from using the network and the cache is insufficient.
if (candidate.networkRequest != null && request.cacheControl.onlyIfCached) {
return CacheStrategy(null, null)
}
return candidate
}
private fun computeCandidate(): CacheStrategy {
// No cached response.
//這里的cacheResponse是緩存中命中的 所以如果為null 表示之前沒有緩存
if (cacheResponse == null) {
return CacheStrategy(request, null)
}
// Drop the cached response if it's missing a required handshake.
//如果缺少tls握手 直接請求網(wǎng)絡(luò)
if (request.isHttps && cacheResponse.handshake == null) {
return CacheStrategy(request, null)
}
// If this response shouldn't have been stored, it should never be used as a response source.
// This check should be redundant as long as the persistence store is well-behaved and the
// rules are constant.
//根據(jù)cacheResponse的code 判斷是否允許cache
//判斷expires是否過期 并且request和reponse的cache-control都是noStore
//這里就不往下追蹤了 感興趣的同學(xué)可以自己閱讀一下
if (!isCacheable(cacheResponse, request)) {
return CacheStrategy(request, null)
}
//如果是nocache或者根據(jù)If-Modified-Since來判斷
//If-Modified-Since會重新請求服務(wù)器 然后獲取last-modified-since 判斷是否修改過
val requestCaching = request.cacheControl
if (requestCaching.noCache || hasConditions(request)) {
return CacheStrategy(request, null)
}
val responseCaching = cacheResponse.cacheControl
val ageMillis = cacheResponseAge()
var freshMillis = computeFreshnessLifetime()
if (requestCaching.maxAgeSeconds != -1) {
freshMillis = minOf(freshMillis, SECONDS.toMillis(requestCaching.maxAgeSeconds.toLong()))
}
var minFreshMillis: Long = 0
if (requestCaching.minFreshSeconds != -1) {
minFreshMillis = SECONDS.toMillis(requestCaching.minFreshSeconds.toLong())
}
var maxStaleMillis: Long = 0
if (!responseCaching.mustRevalidate && requestCaching.maxStaleSeconds != -1) {
maxStaleMillis = SECONDS.toMillis(requestCaching.maxStaleSeconds.toLong())
}
if (!responseCaching.noCache && ageMillis + minFreshMillis < freshMillis + maxStaleMillis) {
val builder = cacheResponse.newBuilder()
if (ageMillis + minFreshMillis >= freshMillis) {
builder.addHeader("Warning", "110 HttpURLConnection \"Response is stale\"")
}
val oneDayMillis = 24 * 60 * 60 * 1000L
if (ageMillis > oneDayMillis && isFreshnessLifetimeHeuristic()) {
builder.addHeader("Warning", "113 HttpURLConnection \"Heuristic expiration\"")
}
return CacheStrategy(null, builder.build())
}
// Find a condition to add to the request. If the condition is satisfied, the response body
// will not be transmitted.
val conditionName: String
val conditionValue: String?
when {
etag != null -> {
conditionName = "If-None-Match"
conditionValue = etag
}
lastModified != null -> {
conditionName = "If-Modified-Since"
conditionValue = lastModifiedString
}
servedDate != null -> {
conditionName = "If-Modified-Since"
conditionValue = servedDateString
}
//這里會重新request 然后判斷modified-time
else -> return CacheStrategy(request, null) // No condition! Make a regular request.
}
val conditionalRequestHeaders = request.headers.newBuilder()
conditionalRequestHeaders.addLenient(conditionName, conditionValue!!)
val conditionalRequest = request.newBuilder()
.headers(conditionalRequestHeaders.build())
.build()
//直接使用緩存
return CacheStrategy(conditionalRequest, cacheResponse)
}
我們可以根據(jù)上面的判斷,確定緩存策略
大致是根據(jù)cache-control或者handshake是否過期來判斷是否需要重新request
例如:
noStore,noCache,If-Modified-Since等等 所有的 cache-control可以參考這篇
如果在CacheInterceptor中hit cache的話 就不會再往下面的攔截器傳遞 而是直接原路返回 return response
ConnectInterceptor
作用:負責(zé)與服務(wù)器連接 這個攔截器的過程分析其實相當(dāng)復(fù)雜
簡單來說流程是從連接池中查找連接 如果不存在 就創(chuàng)建連接 并完成TCP,TLS握手
然后等待下一個CallServerInterceptor進行數(shù)據(jù)的交互
我們分析一下源碼 攔截器里的代碼真的很少 不過不要被表象欺騙了?? 我第一次看OkHttp源碼時 看到這里直接就跳過了 然后分析了CallServerInterceptor源碼之后 發(fā)現(xiàn)沒有獲取連接過程
override fun intercept(chain: Interceptor.Chain): Response {
val realChain = chain as RealInterceptorChain
//獲取exchange對象 exchange是我們用來和服務(wù)端交互的對象封裝 看一下initExchange方法
val exchange = realChain.call.initExchange(chain)
val connectedChain = realChain.copy(exchange = exchange)
return connectedChain.proceed(realChain.request)
}
initExchange()
中主要會調(diào)用ExchangeFinder#find()
然后根據(jù)下面的調(diào)用鏈
ExchangeFinder#find()
->
ExchangeFinder#findHealthyConnection
->
ExchangeFinder#findConnection
然后我們看一下findConnection()
這個方法內(nèi)部就實現(xiàn)了connection的查找或創(chuàng)建
前方高能 下面代碼會又臭又長??
private fun findConnection(
connectTimeout: Int,
readTimeout: Int,
writeTimeout: Int,
pingIntervalMillis: Int,
connectionRetryEnabled: Boolean
): RealConnection {
var foundPooledConnection = false
var result: RealConnection? = null
var selectedRoute: Route? = null
var releasedConnection: RealConnection?
val toClose: Socket?
synchronized(connectionPool) {
if (call.isCanceled()) throw IOException("Canceled")
val callConnection = call.connection // changes within this overall method
releasedConnection = callConnection
//如果url不一致或者callConnection為null 就斷開鏈接
toClose = if (callConnection != null && (callConnection.noNewExchanges ||
!sameHostAndPort(callConnection.route().address.url))) {
call.releaseConnectionNoEvents()
} else {
null
}
if (call.connection != null) {
// We had an already-allocated connection and it's good.
//經(jīng)過判斷上面驗證 如果不為null 發(fā)現(xiàn)當(dāng)前connection可用 那么就會直接復(fù)用connection
result = call.connection
releasedConnection = null
}
if (result == null) {
// The connection hasn't had any problems for this call.
refusedStreamCount = 0
connectionShutdownCount = 0
otherFailureCount = 0
// Attempt to get a connection from the pool.
//第一次試從連接池獲取
if (connectionPool.callAcquirePooledConnection(address, call, null, false)) {
foundPooledConnection = true
result = call.connection
} else if (nextRouteToTry != null) {
selectedRoute = nextRouteToTry
nextRouteToTry = null
}
}
}
toClose?.closeQuietly()
if (releasedConnection != null) {
eventListener.connectionReleased(call, releasedConnection!!)
}
if (foundPooledConnection) {
eventListener.connectionAcquired(call, result!!)
}
if (result != null) {
// If we found an already-allocated or pooled connection, we're done.
return result!!
}
// If we need a route selection, make one. This is a blocking operation.
// 查看是否有新的路由信息
var newRouteSelection = false
if (selectedRoute == null && (routeSelection == null || !routeSelection!!.hasNext())) {
var localRouteSelector = routeSelector
if (localRouteSelector == null) {
localRouteSelector = RouteSelector(address, call.client.routeDatabase, call, eventListener)
this.routeSelector = localRouteSelector
}
newRouteSelection = true
routeSelection = localRouteSelector.next()
}
var routes: List<Route>? = null
synchronized(connectionPool) {
if (call.isCanceled()) throw IOException("Canceled")
if (newRouteSelection) {
// Now that we have a set of IP addresses, make another attempt at getting a connection from
// the pool. This could match due to connection coalescing.
routes = routeSelection!!.routes
// 如果有新的路由 繼續(xù)從連接池中查找試試
if (connectionPool.callAcquirePooledConnection(address, call, routes, false)) {
foundPooledConnection = true
result = call.connection
}
}
if (!foundPooledConnection) {
if (selectedRoute == null) {
selectedRoute = routeSelection!!.next()
}
// Create a connection and assign it to this allocation immediately. This makes it possible
// for an asynchronous cancel() to interrupt the handshake we're about to do.
result = RealConnection(connectionPool, selectedRoute!!)
connectingConnection = result
}
}
// If we found a pooled connection on the 2nd time around, we're done.
if (foundPooledConnection) {
eventListener.connectionAcquired(call, result!!)
return result!!
}
// Do TCP + TLS handshakes. This is a blocking operation.
//現(xiàn)在發(fā)現(xiàn)可能連接池中沒有我們要的connection
//進行TCP和TLS連接
result!!.connect(
connectTimeout,
readTimeout,
writeTimeout,
pingIntervalMillis,
connectionRetryEnabled,
call,
eventListener
)
call.client.routeDatabase.connected(result!!.route())
var socket: Socket? = null
synchronized(connectionPool) {
connectingConnection = null
// Last attempt at connection coalescing, which only occurs if we attempted multiple
// concurrent connections to the same host.
//最后一次嘗試從連接池中獲取 如果能獲取到 就使用連接池中 否則使用連接的connection 并且放入連接池中
if (connectionPool.callAcquirePooledConnection(address, call, routes, true)) {
// We lost the race! Close the connection we created and return the pooled connection.
result!!.noNewExchanges = true
socket = result!!.socket()
result = call.connection
// It's possible for us to obtain a coalesced connection that is immediately unhealthy. In
// that case we will retry the route we just successfully connected with.
nextRouteToTry = selectedRoute
} else {
//放入連接池中
connectionPool.put(result!!)
call.acquireConnectionNoEvents(result!!)
}
}
socket?.closeQuietly()
eventListener.connectionAcquired(call, result!!)
return result!!
}
上面的注釋寫的也比較多 流程其實也比較清晰了 我們接下來分析一下 如何從連接池中查找以及如何建立連接
//從連接池中獲取
fun callAcquirePooledConnection(
address: Address,
call: RealCall,
routes: List<Route>?,
requireMultiplexed: Boolean
): Boolean {
this.assertThreadHoldsLock()
for (connection in connections) {
//判斷connection是否支持多路復(fù)用
if (requireMultiplexed && !connection.isMultiplexed) continue
//判斷connection的host是否匹配
if (!connection.isEligible(address, routes)) continue
call.acquireConnectionNoEvents(connection)
return true
}
return false
}
還有一個TCP和TLS握手流程
fun connect(
connectTimeout: Int,
readTimeout: Int,
writeTimeout: Int,
pingIntervalMillis: Int,
connectionRetryEnabled: Boolean,
call: Call,
eventListener: EventListener
) {
check(protocol == null) { "already connected" }
var routeException: RouteException? = null
val connectionSpecs = route.address.connectionSpecs
val connectionSpecSelector = ConnectionSpecSelector(connectionSpecs)
......
while (true) {
try {
//注釋的意思是 如果是通過HTTP代理HTTPS 那么需要連接Tunnel
//這里我是在是沒看懂什么意思 告辭?? 有知道的大佬可以留言告訴我一下
if (route.requiresTunnel()) {
connectTunnel(connectTimeout, readTimeout, writeTimeout, call, eventListener)
if (rawSocket == null) {
// We were unable to connect the tunnel but properly closed down our resources.
break
}
} else {
//先建立socket連接 包括代理的配置
connectSocket(connectTimeout, readTimeout, call, eventListener)
}
//如果是Http2協(xié)議還會創(chuàng)建Http2連接 或者 TLS握手
//TLS流程大家可以參考一下我之前寫的一篇文章
establishProtocol(connectionSpecSelector, pingIntervalMillis, call, eventListener)
eventListener.connectEnd(call, route.socketAddress, route.proxy, protocol)
break
} catch (e: IOException) {
socket?.closeQuietly()
rawSocket?.closeQuietly()
socket = null
rawSocket = null
source = null
sink = null
handshake = null
protocol = null
http2Connection = null
allocationLimit = 1
eventListener.connectFailed(call, route.socketAddress, route.proxy, null, e)
if (routeException == null) {
routeException = RouteException(e)
} else {
routeException.addConnectException(e)
}
if (!connectionRetryEnabled || !connectionSpecSelector.connectionFailed(e)) {
throw routeException
}
}
}
if (route.requiresTunnel() && rawSocket == null) {
throw RouteException(ProtocolException(
"Too many tunnel connections attempted: $MAX_TUNNEL_ATTEMPTS"))
}
idleAtNs = System.nanoTime()
}
TLS握手流程有疑問的同學(xué) 可以看一下我之前寫的一篇文章吶 小飛機??覺得還可以可以點個贊哦
現(xiàn)在connection就獲取完成了 接下來就是與服務(wù)器的交互啦
還有創(chuàng)建了新connection
之后會put
到ConnectionPool
中 其中還有一個clean操作 同學(xué)們可以自己看一下代碼吶
最后還有一個小點需要說明一下 因為關(guān)系到我們下一個攔截器的閱讀
在我們最上面的說到 我們會通過ExchangeFinder#find
來生成ExchangeCodec
對象
首先我們解釋一下ExchangeCodec
作用okhttp
會使用ExchangeCodec
封裝了與服務(wù)器的IO操作
ExchangeCodec
的實現(xiàn)類分別對應(yīng)協(xié)議是Http1ExchangeCodec
和Http2ExchangeCodec
看一下find
方法實現(xiàn)
fun find(
client: OkHttpClient,
chain: RealInterceptorChain
): ExchangeCodec {
try {
val resultConnection = findHealthyConnection(
connectTimeout = chain.connectTimeoutMillis,
readTimeout = chain.readTimeoutMillis,
writeTimeout = chain.writeTimeoutMillis,
pingIntervalMillis = client.pingIntervalMillis,
connectionRetryEnabled = client.retryOnConnectionFailure,
doExtensiveHealthChecks = chain.request.method != "GET"
)
//newCodec方法會對應(yīng)不同的HTTP協(xié)議生成ExchangeCodeC對象
return resultConnection.newCodec(client, chain)
} catch (e: RouteException) {
trackFailure(e.lastConnectException)
throw e
} catch (e: IOException) {
trackFailure(e)
throw RouteException(e)
}
}
Http2
在閱讀CallServerInterceptor之前 我們有必要看一下Http2.0相關(guān)知識 因為在CallServerInterceptor中會根據(jù)不同的Http協(xié)議 使用不同的傳輸方式 我們看一下Http2.0發(fā)展的幾個階段
- HTTP1.x 一個tcp連接只支持一個請求,單向,只能由客戶端發(fā)起
- SPDY支持多路復(fù)用(Tcp連接復(fù)用),header壓縮,強制使用Https,服務(wù)端推送
- HTTP2.0 支持明文和加密傳輸,優(yōu)化header壓縮算法,支持SPDY現(xiàn)有功能,全雙工
- Quic基于UDP實現(xiàn)穩(wěn)定傳輸協(xié)議 弱網(wǎng)有優(yōu)化
CallServerInterceptor
作用:負責(zé)與服務(wù)器進行數(shù)據(jù)交互
在了解和服務(wù)器交互的流程之前 我想先介紹一下okio
這是Square公司開發(fā)的一款對java輸入輸出流的封裝框架
JAVA輸入輸出流真的是非常的復(fù)雜 子類繁多 而okio主要分為兩個接口Sink
和Source
分別對應(yīng)輸出和輸入相關(guān)
接下來我們看一下實現(xiàn)代碼 流程也比較簡單 就是發(fā)送請求+獲取響應(yīng)
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
val realChain = chain as RealInterceptorChain
val exchange = realChain.exchange!!
val request = realChain.request
val requestBody = request.body
val sentRequestMillis = System.currentTimeMillis()
//將header寫入socket
exchange.writeRequestHeaders(request)
var invokeStartEvent = true
var responseBuilder: Response.Builder? = null
if (HttpMethod.permitsRequestBody(request.method) && requestBody != null) {
// If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100
// Continue" response before transmitting the request body. If we don't get that, return
// what we did get (such as a 4xx response) without ever transmitting the request body.
if ("100-continue".equals(request.header("Expect"), ignoreCase = true)) {
exchange.flushRequest()
responseBuilder = exchange.readResponseHeaders(expectContinue = true)
exchange.responseHeadersStart()
invokeStartEvent = false
}
if (responseBuilder == null) {
//如果支持復(fù)用 傳輸request body
if (requestBody.isDuplex()) {
// Prepare a duplex body so that the application can send a request body later.
exchange.flushRequest()
val bufferedRequestBody = exchange.createRequestBody(request, true).buffer()
requestBody.writeTo(bufferedRequestBody)
} else {
// Write the request body if the "Expect: 100-continue" expectation was met.
val bufferedRequestBody = exchange.createRequestBody(request, false).buffer()
requestBody.writeTo(bufferedRequestBody)
bufferedRequestBody.close()
}
} else {
exchange.noRequestBody()
if (!exchange.connection.isMultiplexed) {
// If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection
// from being reused. Otherwise we're still obligated to transmit the request body to
// leave the connection in a consistent state.
exchange.noNewExchangesOnConnection()
}
}
} else {
exchange.noRequestBody()
}
//request完成
if (requestBody == null || !requestBody.isDuplex()) {
exchange.finishRequest()
}
//獲取reponse header
if (responseBuilder == null) {
responseBuilder = exchange.readResponseHeaders(expectContinue = false)!!
if (invokeStartEvent) {
exchange.responseHeadersStart()
invokeStartEvent = false
}
}
var response = responseBuilder
.request(request)
.handshake(exchange.connection.handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build()
var code = response.code
if (code == 100) {
// Server sent a 100-continue even though we did not request one. Try again to read the actual
// response status.
//100表示繼續(xù)獲取
responseBuilder = exchange.readResponseHeaders(expectContinue = false)!!
if (invokeStartEvent) {
exchange.responseHeadersStart()
}
response = responseBuilder
.request(request)
.handshake(exchange.connection.handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build()
code = response.code
}
exchange.responseHeadersEnd(response)
response = if (forWebSocket && code == 101) {
// Connection is upgrading, but we need to ensure interceptors see a non-null response body.
response.newBuilder()
.body(EMPTY_RESPONSE)
.build()
} else {
//獲取response body
response.newBuilder()
.body(exchange.openResponseBody(response))
.build()
}
if ("close".equals(response.request.header("Connection"), ignoreCase = true) ||
"close".equals(response.header("Connection"), ignoreCase = true)) {
exchange.noNewExchangesOnConnection()
}
if ((code == 204 || code == 205) && response.body?.contentLength() ?: -1L > 0L) {
throw ProtocolException(
"HTTP $code had non-zero Content-Length: ${response.body?.contentLength()}")
}
return response
}
上面的代碼 流程也比較簡單 就是request+response
我們分析一下分析一下寫入Request
流程
ExchangeCodec#writeRequestHeaders(request)
傳輸頭部在HTTP1.x和HTTP2.0有點區(qū)別,HTTP1.x就直接將header通過寫入Sink Buffer,而HTTP2.0會先創(chuàng)建http2Connection.newStream()
對象
@Synchronized @Throws(IOException::class)
fun headers(
outFinished: Boolean,
streamId: Int,
headerBlock: List<Header>
) {
if (closed) throw IOException("closed")
//Hpack壓縮算法 將壓縮后數(shù)據(jù)存入hpackBuffer
hpackWriter.writeHeaders(headerBlock)
val byteCount = hpackBuffer.size
val length = minOf(maxFrameSize.toLong(), byteCount)
var flags = if (byteCount == length) FLAG_END_HEADERS else 0
if (outFinished) flags = flags or FLAG_END_STREAM
//HTTP2.0特性 幀傳輸
frameHeader(
streamId = streamId,
length = length.toInt(),
type = TYPE_HEADERS,
flags = flags
)
sink.write(hpackBuffer, length)
if (byteCount > length) writeContinuationFrames(streamId, byteCount - length)
}
這里有兩個可以HTTP2.0特性可以關(guān)注一下
- HPACK壓縮
- 傳輸幀
總結(jié)
OkHttp
的Request流程分析就到此結(jié)束了 接下來會接著分析一下Response的處理 覺得有收獲的同學(xué)們歡迎點贊留言啊