這是OKHttp的第三個(gè)攔截器 - ConnectInterceptor
嗦嗡,如果緩存判定失敗搜锰,就會(huì)走到這里進(jìn)行真正的網(wǎng)絡(luò)連接了。
連接池
連接池是對網(wǎng)絡(luò)連接的一種優(yōu)化起意,當(dāng)需要與服務(wù)進(jìn)行連接的時(shí)候责掏,會(huì)線程連接池里面查找有沒有閑置的連接霉咨,如果有那么就直接使用柔吼,否則就重新創(chuàng)建一個(gè)連接并放入連接池烫葬。
constructor() : this(5, 5, TimeUnit.MINUTES)
constructor(
maxIdleConnections: Int,
keepAliveDuration: Long,
timeUnit: TimeUnit
) : this(RealConnectionPool(
taskRunner = TaskRunner.INSTANCE,
maxIdleConnections = maxIdleConnections,
keepAliveDuration = keepAliveDuration,
timeUnit = timeUnit
))
連接池和線程池有些相似
maxIdleConnections
: 最大閑置的連接數(shù)筝尾,可以看到捡需,最大連接數(shù)為5個(gè)
maxIdleConnections
: 閑置連接最大存活時(shí)間,如果超過了存活時(shí)間筹淫,那么就會(huì)將連接關(guān)閉站辉,默認(rèn)5分鐘
timeUnit
: 時(shí)間單位,默認(rèn)是分鐘
建立連接
object ConnectInterceptor : Interceptor {
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
val realChain = chain as RealInterceptorChain
val exchange = realChain.call.initExchange(chain)
val connectedChain = realChain.copy(exchange = exchange)
return connectedChain.proceed(realChain.request)
}
}
這個(gè)類看似只有幾行代碼,其實(shí)大部分邏輯都封裝到其他類里面了饰剥,首先我們回顧一下重試和重定向里面的一段代碼:
class RetryAndFollowUpInterceptor(private val client: OkHttpClient) : Interceptor {
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
// ................
while (true) {
call.enterNetworkInterceptorExchange(request, newExchangeFinder)
}
// // ................
}
// ................
}
每一次進(jìn)行網(wǎng)絡(luò)請求都會(huì)走到RealCall.enterNetworkInterceptorExchange()
方法:
fun enterNetworkInterceptorExchange(request: Request, newExchangeFinder: Boolean) {
check(interceptorScopedExchange == null)
check(exchange == null) {
"cannot make a new request because the previous response is still open: " +
"please call response.close()"
}
if (newExchangeFinder) {
this.exchangeFinder = ExchangeFinder(
connectionPool,
createAddress(request.url),
this,
eventListener
)
}
}
可以看到殊霞,這里面只初始化了一個(gè)exchangeFinder
對象,其實(shí)這個(gè)對象是用來連接的關(guān)鍵所在汰蓉,在連接攔截器里面才用到了绷蹲。
val exchange = realChain.call.initExchange(chain)
網(wǎng)絡(luò)連接的一系列操作都封裝到了Exchange
對象中,當(dāng)一個(gè)請求發(fā)出古沥,需要建立連接瘸右,連接建立后需要使用流用來讀寫數(shù)據(jù);而這個(gè)Exchange
就是協(xié)調(diào)請求岩齿、連接與數(shù)據(jù)流三者之間的關(guān)系太颤,它負(fù)責(zé)為一次請求尋找連接,然后獲得流來實(shí)現(xiàn)網(wǎng)絡(luò)通信盹沈。
// RealCall.initExchange()
internal fun initExchange(chain: RealInterceptorChain): Exchange {
synchronized(connectionPool) {
check(!noMoreExchanges) { "released" }
check(exchange == null)
}
// HttpCodec
val codec = exchangeFinder!!.find(client, chain)
val result = Exchange(this, eventListener, exchangeFinder!!, codec)
this.interceptorScopedExchange = result
synchronized(connectionPool) {
this.exchange = result
this.exchangeRequestDone = false
this.exchangeResponseDone = false
return result
}
}
這里使用的 exchangeFinder!!.find()
方法實(shí)際上就是去查找或者建立一個(gè)與請求主機(jī)有效的連接龄章,返回值為ExchangeCodec
,包含了輸入輸出流乞封,并且封裝了對HTTP請求報(bào)文的編碼與解碼做裙,直接使用它就能夠與請求主機(jī)完成HTTP通信。Http1.x
和Http2.x
實(shí)現(xiàn)有所不同肃晚,可以看到它最終的兩個(gè)實(shí)現(xiàn)類的創(chuàng)建http2Connection
和Http1ExchangeCodec
:
fun find(
client: OkHttpClient,
chain: RealInterceptorChain
): ExchangeCodec {
try {
// 建立連接
val resultConnection = findHealthyConnection(
connectTimeout = chain.connectTimeoutMillis,
readTimeout = chain.readTimeoutMillis,
writeTimeout = chain.writeTimeoutMillis,
pingIntervalMillis = client.pingIntervalMillis,
connectionRetryEnabled = client.retryOnConnectionFailure,
doExtensiveHealthChecks = chain.request.method != "GET"
)
// 創(chuàng)建HttpCodec
return resultConnection.newCodec(client, chain)
} catch (e: RouteException) {
trackFailure(e.lastConnectException)
throw e
} catch (e: IOException) {
trackFailure(e)
throw RouteException(e)
}
}
@Throws(SocketException::class)
internal fun newCodec(client: OkHttpClient, chain: RealInterceptorChain): ExchangeCodec {
val socket = this.socket!!
val source = this.source!!
val sink = this.sink!!
val http2Connection = this.http2Connection
return if (http2Connection != null) {
Http2ExchangeCodec(client, this, chain, http2Connection)
} else {
socket.soTimeout = chain.readTimeoutMillis()
source.timeout().timeout(chain.readTimeoutMillis.toLong(), MILLISECONDS)
sink.timeout().timeout(chain.writeTimeoutMillis.toLong(), MILLISECONDS)
Http1ExchangeCodec(client, this, source, sink)
}
}
接下來我們看看resultConnection
怎么創(chuàng)建的锚贱,跟蹤代碼:
findHealthyConnection() -> findConnection()
@Throws(IOException::class)
private fun findConnection(
connectTimeout: Int,
readTimeout: Int,
writeTimeout: Int,
pingIntervalMillis: Int,
connectionRetryEnabled: Boolean
): RealConnection {
var foundPooledConnection = false
var result: RealConnection? = null
var selectedRoute: Route? = null
var releasedConnection: RealConnection?
val toClose: Socket?
synchronized(connectionPool) {
if (call.isCanceled()) throw IOException("Canceled")
val callConnection = call.connection // changes within this overall method
releasedConnection = callConnection
toClose = if (callConnection != null && (callConnection.noNewExchanges ||
!sameHostAndPort(callConnection.route().address.url))) {
call.releaseConnectionNoEvents()
} else {
null
}
if (call.connection != null) {
// We had an already-allocated connection and it's good.
result = call.connection
releasedConnection = null
}
if (result == null) {
// The connection hasn't had any problems for this call.
refusedStreamCount = 0
connectionShutdownCount = 0
otherFailureCount = 0
// Attempt to get a connection from the pool.
if (connectionPool.callAcquirePooledConnection(address, call, null, false)) {
foundPooledConnection = true
result = call.connection
} else if (nextRouteToTry != null) {
selectedRoute = nextRouteToTry
nextRouteToTry = null
}
}
}
toClose?.closeQuietly()
if (releasedConnection != null) {
eventListener.connectionReleased(call, releasedConnection!!)
}
if (foundPooledConnection) {
eventListener.connectionAcquired(call, result!!)
}
if (result != null) {
// If we found an already-allocated or pooled connection, we're done.
return result!!
}
// If we need a route selection, make one. This is a blocking operation.
var newRouteSelection = false
if (selectedRoute == null && (routeSelection == null || !routeSelection!!.hasNext())) {
var localRouteSelector = routeSelector
if (localRouteSelector == null) {
localRouteSelector = RouteSelector(address, call.client.routeDatabase, call, eventListener)
this.routeSelector = localRouteSelector
}
newRouteSelection = true
routeSelection = localRouteSelector.next()
}
var routes: List<Route>? = null
synchronized(connectionPool) {
if (call.isCanceled()) throw IOException("Canceled")
if (newRouteSelection) {
// Now that we have a set of IP addresses, make another attempt at getting a connection from
// the pool. This could match due to connection coalescing.
routes = routeSelection!!.routes
if (connectionPool.callAcquirePooledConnection(address, call, routes, false)) {
foundPooledConnection = true
result = call.connection
}
}
if (!foundPooledConnection) {
if (selectedRoute == null) {
selectedRoute = routeSelection!!.next()
}
// Create a connection and assign it to this allocation immediately. This makes it possible
// for an asynchronous cancel() to interrupt the handshake we're about to do.
result = RealConnection(connectionPool, selectedRoute!!)
connectingConnection = result
}
}
// If we found a pooled connection on the 2nd time around, we're done.
if (foundPooledConnection) {
eventListener.connectionAcquired(call, result!!)
return result!!
}
// Do TCP + TLS handshakes. This is a blocking operation.
result!!.connect(
connectTimeout,
readTimeout,
writeTimeout,
pingIntervalMillis,
connectionRetryEnabled,
call,
eventListener
)
call.client.routeDatabase.connected(result!!.route())
var socket: Socket? = null
synchronized(connectionPool) {
connectingConnection = null
// Last attempt at connection coalescing, which only occurs if we attempted multiple
// concurrent connections to the same host.
if (connectionPool.callAcquirePooledConnection(address, call, routes, true)) {
// We lost the race! Close the connection we created and return the pooled connection.
result!!.noNewExchanges = true
socket = result!!.socket()
result = call.connection
// It's possible for us to obtain a coalesced connection that is immediately unhealthy. In
// that case we will retry the route we just successfully connected with.
nextRouteToTry = selectedRoute
} else {
connectionPool.put(result!!)
call.acquireConnectionNoEvents(result!!)
}
}
socket?.closeQuietly()
eventListener.connectionAcquired(call, result!!)
return result!!
}
上面這么多代碼不需要太多的關(guān)注,總體上就是判斷了connection是否可復(fù)用关串,如果可以復(fù)用直接使用call.connection
即可拧廊,否則的話創(chuàng)建一個(gè)新的連接result = RealConnection(connectionPool, selectedRoute!!)
,找到了result之后進(jìn)行三次握手連接:
result!!.connect(
connectTimeout,
readTimeout,
writeTimeout,
pingIntervalMillis,
connectionRetryEnabled,
call,
eventListener
)
連接復(fù)用
// 判斷連接池是否有可復(fù)用的連接
if (connectionPool.callAcquirePooledConnection(address, call, routes, false)) {
foundPooledConnection = true
result = call.connection
}
最終可以看到滿足條件就返回RealConnection
:
fun callAcquirePooledConnection(
address: Address,
call: RealCall,
routes: List<Route>?,
requireMultiplexed: Boolean
): Boolean {
this.assertThreadHoldsLock()
for (connection in connections) {
if (requireMultiplexed && !connection.isMultiplexed) continue
if (!connection.isEligible(address, routes)) continue
call.acquireConnectionNoEvents(connection)
return true
}
return false
}
看一下if (!connection.isEligible(address, routes))
這個(gè)條件晋修, 遍歷所有的連接吧碾,如果滿足有條件的返回true,否則返回false
internal fun isEligible(address: Address, routes: List<Route>?): Boolean {
// If this connection is not accepting new exchanges, we're done.
if (calls.size >= allocationLimit || noNewExchanges) return false
// If the non-host fields of the address don't overlap, we're done.
if (!this.route.address.equalsNonHost(address)) return false
// If the host exactly matches, we're done: this connection can carry the address.
if (address.url.host == this.route().address.url.host) {
return true // This connection is a perfect match.
}
// At this point we don't have a hostname match. But we still be able to carry the request if
// our connection coalescing requirements are met. See also:
// https://hpbn.co/optimizing-application-delivery/#eliminate-domain-sharding
// https://daniel.haxx.se/blog/2016/08/18/http2-connection-coalescing/
// 1. This connection must be HTTP/2.
if (http2Connection == null) return false
// 2. The routes must share an IP address.
if (routes == null || !routeMatchesAny(routes)) return false
// 3. This connection's server certificate's must cover the new host.
if (address.hostnameVerifier !== OkHostnameVerifier) return false
if (!supportsUrl(address.url)) return false
// 4. Certificate pinning must match the host.
try {
address.certificatePinner!!.check(address.url.host, handshake()!!.peerCertificates)
} catch (_: SSLPeerUnverifiedException) {
return false
}
return true // The caller's address can be carried by this connection.
}
-
if (calls.size >= allocationLimit || noNewExchanges) return false
連接到達(dá)最大并發(fā)流或者連接不允許建立新的流墓卦;如http1.x正在使用的連接不能給其他人用(最大并發(fā)流為:1)或者連接被關(guān)閉倦春;那就不允許復(fù)用;
-
if (!this.route.address.equalsNonHost(address)) return false if (address.url.host == this.route().address.url.host) { return true // This connection is a perfect match. }
DNS落剪、代理睁本、SSL證書、服務(wù)器域名忠怖、端口完全相同則可復(fù)用呢堰;
如果上述條件都不滿足,在HTTP/2的某些場景下可能仍可以復(fù)用(http2先不管)脑又。
所以綜上暮胧,如果在連接池中找到個(gè)連接參數(shù)一致并且未被關(guān)閉沒被占用的連接,則可以復(fù)用问麸。
總結(jié)
這個(gè)攔截器中的所有實(shí)現(xiàn)都是為了獲得一份與目標(biāo)服務(wù)器的連接往衷,在這個(gè)連接上進(jìn)行HTTP數(shù)據(jù)的收發(fā)。
當(dāng)連接完成严卖,就會(huì)繼續(xù)下一個(gè)攔截器CallServerInterceptor
席舍。