https://halfrost.com/http2_begin/
本文基于OkHttp 4.3.1源碼分析
OkHttp - 官方地址
OkHttp - GitHub代碼地址
概述
OkHttp整體流程(本文覆蓋紅色部分)
連接時(shí)序圖
HTTP不同協(xié)議連接區(qū)分
參考:HTTP/2 是如何建立連接的
源碼分析
ConnectInterceptor
- 獲取發(fā)射機(jī)
- 獲取有效連接但校,RealConnection
- 創(chuàng)建編碼器嘿辟,ExchangeCodex纪吮,實(shí)際I/O工作邏輯
- 創(chuàng)建一個(gè)Exchange,封裝上面結(jié)果,為后續(xù)網(wǎng)絡(luò)讀寫工作提供API
object ConnectInterceptor : Interceptor {
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
val realChain = chain as RealInterceptorChain
val request = realChain.request()
// 取發(fā)射機(jī)桥爽,RealCall 創(chuàng)建時(shí)默認(rèn)會(huì)創(chuàng)建一個(gè)實(shí)例
val transmitter = realChain.transmitter()
// 對(duì)于非GET請(qǐng)求 檢查為true
val doExtensiveHealthChecks = request.method != "GET"
// 構(gòu)建Exchange
val exchange = transmitter.newExchange(chain, doExtensiveHealthChecks)
return realChain.proceed(request, transmitter, exchange)
}
}
發(fā)射機(jī) Transmitter
Bridge between OkHttp's application and network layers. This class exposes high-level application layer primitives: connections, requests, responses, and streams.
OkHttp 應(yīng)用層和網(wǎng)絡(luò)層的橋梁,它提供了應(yīng)用層連接、請(qǐng)求蝴罪、響應(yīng)的最上層函數(shù)
Transmitter構(gòu)造方法
RealCall 創(chuàng)建時(shí)默認(rèn)會(huì)創(chuàng)建一個(gè)實(shí)例,持有client步清、call要门、connectionPool的引用
class Transmitter(
private val client: OkHttpClient,
private val call: Call
) {
// 持有連接池引用
private val connectionPool: RealConnectionPool = client.connectionPool.delegate
// 監(jiān)聽器
private val eventListener: EventListener = client.eventListenerFactory.create(call)
}
Transmitter.prepareToConnect
準(zhǔn)備連接虏肾,觸發(fā)時(shí)機(jī):重試橋開始會(huì)執(zhí)行此方法
fun prepareToConnect(request: Request) {
if (this.request != null) {
if (this.request!!.url.canReuseConnectionFor(request.url) && exchangeFinder!!.hasRouteToTry()) {
return // 可以重用下,結(jié)束
}
check(exchange == null)
// 重置
if (exchangeFinder != null) {
maybeReleaseConnection(null, true)
exchangeFinder = null
}
}
// 構(gòu)造一個(gè)ExchangeFinder實(shí)例(下面介紹)
this.request = request
this.exchangeFinder = ExchangeFinder(
this, connectionPool, createAddress(request.url), call, eventListener)
}
交換器
核心類
- Exchange 欢搜,單個(gè)Http請(qǐng)求封豪,傳輸交換數(shù)據(jù)實(shí)現(xiàn)類
- ExchangeFinder ,請(qǐng)求連接獲取炒瘟,請(qǐng)求編解碼實(shí)例構(gòu)建吹埠,邏輯累
- ExchangeCodec,Http連接I/O操作上層封裝類
Transmitter.newExchange
- 獲取連接 RealConnection
- 構(gòu)造連接的編碼器疮装,ExchangeCodec
- 創(chuàng)建Exchange缘琅,它主要負(fù)責(zé)HTTP連接的維護(hù)管理及通知ExchangeCodec編碼器進(jìn)行實(shí)際的I/O操作
internal fun newExchange(chain: Interceptor.Chain, doExtensiveHealthChecks: Boolean): Exchange {
...
// 建立連接初始方法,獲取連接后廓推,會(huì)創(chuàng)建一個(gè)ExchangeCodec用于I/O工作
val codec = exchangeFinder!!.find(client, chain, doExtensiveHealthChecks)
// 創(chuàng)建一個(gè) Exchange
val result = Exchange(this, call, eventListener, exchangeFinder!!, codec)
synchronized(connectionPool) {
this.exchange = result
this.exchangeRequestDone = false
this.exchangeResponseDone = false
return result
}
}
ExchangeFinder.find
- 獲取連接 RealConnection
- 構(gòu)造連接的編碼器刷袍,ExchangeCodec
fun find(
client: OkHttpClient,
chain: Interceptor.Chain,
doExtensiveHealthChecks: Boolean
): ExchangeCodec {
val connectTimeout = chain.connectTimeoutMillis()
val readTimeout = chain.readTimeoutMillis()
val writeTimeout = chain.writeTimeoutMillis()
val pingIntervalMillis = client.pingIntervalMillis
val connectionRetryEnabled = client.retryOnConnectionFailure
try {
// 獲取有效連接
val resultConnection = findHealthyConnection(
connectTimeout = connectTimeout,
readTimeout = readTimeout,
writeTimeout = writeTimeout,
pingIntervalMillis = pingIntervalMillis,
connectionRetryEnabled = connectionRetryEnabled,
doExtensiveHealthChecks = doExtensiveHealthChecks
)
// 創(chuàng)建 編碼器
return resultConnection.newCodec(client, chain)
} catch (e: RouteException) {
trackFailure()
throw e
} catch (e: IOException) {
trackFailure()
throw RouteException(e)
}
}
ExchangeFinder.findHealthyConnection
- 執(zhí)行獲取連接方法,獲取連接
- 如果連接是新創(chuàng)建的連接樊展,則直接返回結(jié)果
- 如果連接是復(fù)用連接池的連接呻纹,則繼續(xù)判斷該連接是否可用,可用則返回滚局,不可用則繼續(xù)find
private fun findHealthyConnection(
connectTimeout: Int,
readTimeout: Int,
writeTimeout: Int,
pingIntervalMillis: Int,
connectionRetryEnabled: Boolean,
doExtensiveHealthChecks: Boolean
): RealConnection {
while (true) {
// 執(zhí)行連接獲取方法
val candidate = findConnection(
connectTimeout = connectTimeout,
readTimeout = readTimeout,
writeTimeout = writeTimeout,
pingIntervalMillis = pingIntervalMillis,
connectionRetryEnabled = connectionRetryEnabled
)
// 新創(chuàng)建的連接居暖,無需判斷可用,可以直接返回連接結(jié)果
synchronized(connectionPool) {
if (candidate.successCount == 0) {
return candidate
}
}
// 判斷該連接是否可用
if (!candidate.isHealthy(doExtensiveHealthChecks)) {
candidate.noNewExchanges()
continue
}
return candidate
}
}
ExchangeFinder.findConnection
獲取連接過程
- 第一次藤肢,先從連接池獲取連接
- 第二次太闺,選擇新的路由,再次嘗試從連接池獲取
- 構(gòu)造一個(gè)新的連接
- 第三次嘁圈,在高并發(fā)情況下省骂,嘗試走HTTP2的多路復(fù)用連接
- 如果連接池沒有獲取到連接,則走構(gòu)造的那個(gè)新的連接最住,新的連接會(huì)加入到連接池
private fun findConnection(
connectTimeout: Int,
readTimeout: Int,
writeTimeout: Int,
pingIntervalMillis: Int,
connectionRetryEnabled: Boolean
): RealConnection {
var foundPooledConnection = false
var result: RealConnection? = null
var selectedRoute: Route? = null
var releasedConnection: RealConnection?
val toClose: Socket?
synchronized(connectionPool) {
if (transmitter.isCanceled) throw IOException("Canceled")
hasStreamFailure = false
// 之前的連接
releasedConnection = transmitter.connection
// 前連接不為null 且不支持新的ExChange了則釋放連接 否則toClose 賦值null
toClose = if (transmitter.connection != null && transmitter.connection!!.noNewExchanges) {
transmitter.releaseConnectionNoEvents()
} else {
null
}
if (transmitter.connection != null) {
// 之前的連接可用钞澳,直接賦值result
result = transmitter.connection
releasedConnection = null
}
// 沒有可用連接,則嘗試獲取新的連接
if (result == null) {
// 先從連接池獲取鏈接
if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, null, false)) {
foundPooledConnection = true
result = transmitter.connection
} else if (nextRouteToTry != null) {
selectedRoute = nextRouteToTry
nextRouteToTry = null
} else if (retryCurrentRoute()) {
selectedRoute = transmitter.connection!!.route()
}
}
}
// 如果等待close的連接不為null則關(guān)閉掉
toClose?.closeQuietly()
// 釋放的連接不為空涨缚,事件通知連接回收
if (releasedConnection != null) {
eventListener.connectionReleased(call, releasedConnection!!)
}
// 連接池獲取的連接不為空轧粟,事件通知連接連接獲得
if (foundPooledConnection) {
eventListener.connectionAcquired(call, result!!)
}
if (result != null) {
// 找到了連接,返回連接脓魏,結(jié)束方法
return result!!
}
// 如果至此沒有獲取到可用連接兰吟,則嘗試更改路由,在連接
var newRouteSelection = false
if (selectedRoute == null && (routeSelection == null || !routeSelection!!.hasNext())) {
// 有新的路由茂翔,則標(biāo)記選了新的路由混蔼,以及獲取新的路由
newRouteSelection = true
routeSelection = routeSelector.next()
}
var routes: List<Route>? = null
synchronized(connectionPool) {
if (transmitter.isCanceled) throw IOException("Canceled")
if (newRouteSelection) {
// 如果更改了路由,則有新的一批地址珊燎,嘗試從新的一批地址獲取可用連接
routes = routeSelection!!.routes
if (connectionPool.transmitterAcquirePooledConnection(
address, transmitter, routes, false)) {
foundPooledConnection = true
result = transmitter.connection
}
}
if (!foundPooledConnection) {
if (selectedRoute == null) {
selectedRoute = routeSelection!!.next()
}
// 如果仍然沒有獲取到可用連接惭嚣,則直接創(chuàng)建一個(gè)新的連接實(shí)例
result = RealConnection(connectionPool, selectedRoute!!)
connectingConnection = result
}
}
// 如果第二次從連接池中找到了可用連接遵湖,則直接返回該連接
if (foundPooledConnection) {
eventListener.connectionAcquired(call, result!!)
return result!!
}
// 如果沒有找到直接用的連接,則嘗試TCP+TLC握手(阻塞過程)
result!!.connect(
connectTimeout,
readTimeout,
writeTimeout,
pingIntervalMillis,
connectionRetryEnabled,
call,
eventListener
)
// 連接 可用晚吞,則從路由黑名單中移除 該路由地址
connectionPool.routeDatabase.connected(result!!.route())
var socket: Socket? = null
synchronized(connectionPool) {
connectingConnection = null
// 最后一次嘗試連接 (這個(gè)只在一個(gè)host多個(gè)并發(fā)連接情況下發(fā)生)
if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, routes, true)) {
// 如果成功延旧,則優(yōu)先使用連接池的連接,并關(guān)閉之前創(chuàng)建的連接socket
result!!.noNewExchanges = true
socket = result!!.socket()
result = transmitter.connection
nextRouteToTry = selectedRoute
} else {
// 沒有從連接池獲取到連接槽地,則把之前創(chuàng)建的連接存儲(chǔ)進(jìn)入
connectionPool.put(result!!)
transmitter.acquireConnectionNoEvents(result!!)
}
}
socket?.closeQuietly()
eventListener.connectionAcquired(call, result!!)
return result!!
}
連接池
初始化時(shí)機(jī)
OkHttpClient.Builder構(gòu)造時(shí)垄潮,會(huì)默認(rèn)構(gòu)造一個(gè)連接池
class Builder constructor() {
internal var connectionPool: ConnectionPool = ConnectionPool()
}
ConnectionPool
連接池,管理HTTP 和HTTP/2的所有連接闷盔,維護(hù)一定的連接,清理超過配置的連接旅急,及提供可復(fù)用的連接逢勾。
默認(rèn)最大空閑連接數(shù)目5個(gè),最長(zhǎng)連接時(shí)長(zhǎng)5分鐘
結(jié)構(gòu):內(nèi)部持有delegate為RealConnectionPool藐吮,它是連接池管理的邏輯實(shí)現(xiàn)類
class ConnectionPool internal constructor(
internal val delegate: RealConnectionPool // 連接池管理邏輯真正實(shí)現(xiàn)類
) {
constructor(
maxIdleConnections: Int,
keepAliveDuration: Long,
timeUnit: TimeUnit
) : this(RealConnectionPool(
taskRunner = TaskRunner.INSTANCE, // 工作線程
maxIdleConnections = maxIdleConnections, // 最大空閑連接數(shù)5
keepAliveDuration = keepAliveDuration, // 最長(zhǎng)連接時(shí)長(zhǎng)5min
timeUnit = timeUnit
))
constructor() : this(5, 5, TimeUnit.MINUTES)
fun idleConnectionCount(): Int = delegate.idleConnectionCount()
fun connectionCount(): Int = delegate.connectionCount()
/** Close and remove all idle connections in the pool. */
fun evictAll() {
delegate.evictAll()
}
}
RealConnectionPool.transmitterAcquirePooledConnection
前面我們知道Transmitter持有ConnectionPool溺拱,且看它持有的變量是直接以RealConnectionPool作為類型的,可以顯示執(zhí)行到它的其它方法
transmitterAcquirePooledConnection嘗試獲取連接池中的連接谣辞,達(dá)到復(fù)用連接的目的
一共兩種情況可以使用復(fù)用的連接迫摔,1.需要多路復(fù)用連接且有支持多路復(fù)用的連接;2.能給目標(biāo)Address分配stream的連接
fun transmitterAcquirePooledConnection(
address: Address,
transmitter: Transmitter,
routes: List<Route>?,
requireMultiplexed: Boolean
): Boolean {
this.assertThreadHoldsLock()
for (connection in connections) {
// 需要復(fù)用連接 && 該連接不支持復(fù)用連接(HTTP2) 則該連接不可用
if (requireMultiplexed && !connection.isMultiplexed) continue
// 如果該連接可以給對(duì)應(yīng)的 address 分配stream 則返回true
if (!connection.isEligible(address, routes)) continue
// 滿足條件下泥从,獲取
transmitter.acquireConnectionNoEvents(connection)
return true
}
return false
}
RealConnectionPool.acquireConnectionNoEvents
獲取連接句占,然后在連接實(shí)例中的transmitters中添加對(duì)應(yīng)承載的Transmitter弱引用
fun acquireConnectionNoEvents(connection: RealConnection) {
connectionPool.assertThreadHoldsLock()
check(this.connection == null)
this.connection = connection
// 連接的transmitters集合添加一個(gè)弱引用
connection.transmitters.add(TransmitterReference(this, callStackTrace))
}
RealConnectionPool.put
ExchangeFinder最后find的connection是創(chuàng)建的新的connection,則會(huì)執(zhí)行Pool的put新連接方法
put方法 先將新的連接添加到連接集合中躯嫉,然后執(zhí)行清理任務(wù)
fun put(connection: RealConnection) {
this.assertThreadHoldsLock()
connections.add(connection)
cleanupQueue.schedule(cleanupTask) // 執(zhí)行清理任務(wù)一次
}
RealConnectionPool.cleanup
對(duì)連接池的連接進(jìn)行維護(hù)纱烘,主要是清理長(zhǎng)時(shí)間無用的連接(超過空閑連接個(gè)數(shù)且超過最大空閑時(shí)長(zhǎng))
fun cleanup(now: Long): Long {
var inUseConnectionCount = 0
var idleConnectionCount = 0
var longestIdleConnection: RealConnection? = null
var longestIdleDurationNs = Long.MIN_VALUE
synchronized(this) {
for (connection in connections) {
// 如果該連接正在被使用,則繼續(xù)遍歷祈餐,使用連接數(shù)+1
if (pruneAndGetAllocationCount(connection, now) > 0) {
inUseConnectionCount++
continue
}
// 空閑連接數(shù)+1
idleConnectionCount++
// 記錄最長(zhǎng)空閑時(shí)長(zhǎng)擂啥,以及最長(zhǎng)空閑時(shí)長(zhǎng)對(duì)應(yīng)的連接
val idleDurationNs = now - connection.idleAtNanos
if (idleDurationNs > longestIdleDurationNs) {
longestIdleDurationNs = idleDurationNs
longestIdleConnection = connection
}
}
when {
// 當(dāng)連接最長(zhǎng)空閑時(shí)長(zhǎng)大于配置的時(shí)長(zhǎng) & 空閑連接個(gè)是> 配置的個(gè)數(shù)
longestIdleDurationNs >= this.keepAliveDurationNs
|| idleConnectionCount > this.maxIdleConnections -> {
// 則移除 最大的那個(gè)空閑連接
connections.remove(longestIdleConnection)
// 如果連接為空了,則停止所有的清除任務(wù)
if (connections.isEmpty()) cleanupQueue.cancelAll()
}
// 其它情況則結(jié)束語句
... }
}
// 關(guān)閉被移除的連接帆阳,釋放資源
longestIdleConnection!!.socket().closeQuietly()
// Cleanup again immediately.
return 0L
}
連接
RealConnection.connect
連接操作哺壶,根據(jù)是否代理會(huì)做兩個(gè)邏輯區(qū)分,最后都是執(zhí)行socket連接蜒谤,然后進(jìn)行協(xié)議的處理
fun connect(
connectTimeout: Int,
readTimeout: Int,
writeTimeout: Int,
pingIntervalMillis: Int,
connectionRetryEnabled: Boolean,
call: Call,
eventListener: EventListener
) {
check(protocol == null) { "already connected" }
var routeException: RouteException? = null
val connectionSpecs = route.address.connectionSpecs
val connectionSpecSelector = ConnectionSpecSelector(connectionSpecs)
...
while (true) {
try {
// 是否走代理通道山宾, 即一個(gè)HTTPS請(qǐng)求 但是其有HTTP的代理通道
if (route.requiresTunnel()) {
// 通過代理通道建立連接 ,也叫隧道連接
connectTunnel(connectTimeout, readTimeout, writeTimeout, call, eventListener)
if (rawSocket == null) {
// We were
break
}
} else {
// 建立連接
connectSocket(connectTimeout, readTimeout, call, eventListener)
}
establishProtocol(connectionSpecSelector, pingIntervalMillis, call, eventListener)
eventListener.connectEnd(call, route.socketAddress, route.proxy, protocol)
break
} catch (e: IOException) {
... // 異常處理
}
}
if (route.requiresTunnel() && rawSocket == null) {
throw RouteException(ProtocolException(
"Too many tunnel connections attempted: $MAX_TUNNEL_ATTEMPTS"))
}
}
RealConnection.connectSocket
- 創(chuàng)建原生Socket
- 進(jìn)行Socket連接芭逝,至此對(duì)應(yīng)的底層Socket通道已經(jīng)建立
private fun connectSocket(
connectTimeout: Int,
readTimeout: Int,
call: Call,
eventListener: EventListener
) {
val proxy = route.proxy
val address = route.address
// 創(chuàng)建原生Socket
val rawSocket = when (proxy.type()) {
Proxy.Type.DIRECT, Proxy.Type.HTTP -> address.socketFactory.createSocket()!!
else -> Socket(proxy)
}
this.rawSocket = rawSocket
// 發(fā)送開始連接事件
eventListener.connectStart(call, route.socketAddress, proxy)
rawSocket.soTimeout = readTimeout
try {
// 找應(yīng)用對(duì)應(yīng)的平臺(tái) 進(jìn)行連接
Platform.get().connectSocket(rawSocket, route.socketAddress, connectTimeout)
} catch (e: ConnectException) {
throw ConnectException("Failed to connect to ${route.socketAddress}").apply {
initCause(e)
}
}
...
}
RealConnection.establishProtocol
前面已經(jīng)建立Socket連接通道塌碌,現(xiàn)在是對(duì)各個(gè)協(xié)議進(jìn)行支持
- 非HTTPS,1. 支持HTTP2情況下旬盯,優(yōu)先走HTTP/2協(xié)議連接台妆,2. 不支持翎猛,則走HTTP/1協(xié)議
- HTTPS,則先進(jìn)行TLS握手接剩,握手成功后切厘,如果是HTTP/2協(xié)議,則走HTTP/2連接方式
private fun establishProtocol(
connectionSpecSelector: ConnectionSpecSelector,
pingIntervalMillis: Int,
call: Call,
eventListener: EventListener
) {
if (route.address.sslSocketFactory == null) {
if (Protocol.H2_PRIOR_KNOWLEDGE in route.address.protocols) {
// 非HTTPS懊缺,支持HTTP2疫稿,優(yōu)先走HTTP2
socket = rawSocket
protocol = Protocol.H2_PRIOR_KNOWLEDGE
startHttp2(pingIntervalMillis)
return
}
socket = rawSocket
protocol = Protocol.HTTP_1_1
return
}
// 加密連接開始
eventListener.secureConnectStart(call)
// 開始Tls連接
connectTls(connectionSpecSelector)
// 加密連接結(jié)束
eventListener.secureConnectEnd(call, handshake)
if (protocol === Protocol.HTTP_2) {
startHttp2(pingIntervalMillis)
}
}
RealConnection.connectTls
進(jìn)行Tls連接
- 基于之前的原生Socket建立包裝的SSLSocket
- 對(duì)SSLSocket進(jìn)行相關(guān)安全信息的配置
- 通過SSLSocket進(jìn)行握手,及握手過程中鹃两,進(jìn)行一些證書信息校驗(yàn)
- 握手成功遗座,構(gòu)建對(duì)應(yīng)連接的source及sink讀寫流
private fun connectTls(connectionSpecSelector: ConnectionSpecSelector) {
val address = route.address
val sslSocketFactory = address.sslSocketFactory
var success = false
var sslSocket: SSLSocket? = null
try {
// 基于之前創(chuàng)建的原生Socket 建立一個(gè)SSLSocket
sslSocket = sslSocketFactory!!.createSocket(
rawSocket, address.url.host, address.url.port, true /* autoClose */) as SSLSocket
// 對(duì)sslSocket進(jìn)行 安全連接的配置 以及Tls的擴(kuò)展配置
val connectionSpec = connectionSpecSelector.configureSecureSocket(sslSocket)
if (connectionSpec.supportsTlsExtensions) {
Platform.get().configureTlsExtensions(sslSocket, address.url.host, address.protocols)
}
// 開始握手
sslSocket.startHandshake()
// 獲取sslSession
val sslSocketSession = sslSocket.session
val unverifiedHandshake = sslSocketSession.handshake()
// 驗(yàn)證證書對(duì)主機(jī)是否ok
if (!address.hostnameVerifier!!.verify(address.url.host, sslSocketSession)) {
val peerCertificates = unverifiedHandshake.peerCertificates
if (peerCertificates.isNotEmpty()) {
val cert = peerCertificates[0] as X509Certificate
throw SSLPeerUnverifiedException("""
|Hostname ${address.url.host} not verified:
| certificate: ${CertificatePinner.pin(cert)}
| DN: ${cert.subjectDN.name}
| subjectAltNames: ${OkHostnameVerifier.allSubjectAltNames(cert)}
""".trimMargin())
} else {
throw SSLPeerUnverifiedException(
"Hostname ${address.url.host} not verified (no certificates)")
}
}
val certificatePinner = address.certificatePinner!!
handshake = Handshake(unverifiedHandshake.tlsVersion, unverifiedHandshake.cipherSuite,
unverifiedHandshake.localCertificates) {
certificatePinner.certificateChainCleaner!!.clean(unverifiedHandshake.peerCertificates,
address.url.host)
}
// Check that the certificate pinner is satisfied by the certificates presented.
certificatePinner.check(address.url.host) {
handshake!!.peerCertificates.map { it as X509Certificate }
}
// Success! Save the handshake and the ALPN protocol.
val maybeProtocol = if (connectionSpec.supportsTlsExtensions) {
Platform.get().getSelectedProtocol(sslSocket)
} else {
null
}
// 握手成功,獲取source和sink流
socket = sslSocket
source = sslSocket.source().buffer()
sink = sslSocket.sink().buffer()
protocol = if (maybeProtocol != null) Protocol.get(maybeProtocol) else Protocol.HTTP_1_1
success = true
} finally {
if (sslSocket != null) {
Platform.get().afterHandshake(sslSocket)
}
if (!success) {
sslSocket?.closeQuietly()
}
}
}
RealConnection.startHttp2
構(gòu)建一個(gè)Http2Connection俊扳,然后啟動(dòng)HTTP/2協(xié)議連接途蒋,
http2連接實(shí)現(xiàn)是OkHttp中http2的包模塊進(jìn)行實(shí)現(xiàn)的
private fun startHttp2(pingIntervalMillis: Int) {
val socket = this.socket!!
val source = this.source!!
val sink = this.sink!!
socket.soTimeout = 0
// 構(gòu)建Http2Connection ,然后啟動(dòng)
val http2Connection = Http2Connection.Builder(client = true, taskRunner = TaskRunner.INSTANCE)
.socket(socket, route.address.url.host, source, sink)
.listener(this) // 添加RealConnection作為listerner
.pingIntervalMillis(pingIntervalMillis)
.build()
this.http2Connection = http2Connection
this.allocationLimit = Http2Connection.DEFAULT_SETTINGS.getMaxConcurrentStreams()
http2Connection.start()
}
RealConnection.newCodec
構(gòu)造編碼器馋记,Http2ExchangeCodec 或者 Http1ExchangeCodec号坡,它們的作用封裝了對(duì)請(qǐng)求和響應(yīng)的讀寫操作
@Throws(SocketException::class)
internal fun newCodec(client: OkHttpClient, chain: Interceptor.Chain): ExchangeCodec {
val socket = this.socket!!
val source = this.source!!
val sink = this.sink!!
val http2Connection = this.http2Connection
return if (http2Connection != null) {
Http2ExchangeCodec(client, this, chain, http2Connection)
} else {
socket.soTimeout = chain.readTimeoutMillis()
source.timeout().timeout(chain.readTimeoutMillis().toLong(), MILLISECONDS)
sink.timeout().timeout(chain.writeTimeoutMillis().toLong(), MILLISECONDS)
Http1ExchangeCodec(client, this, source, sink)
}
}
HTTP2連接
Http2Connection.start
配置HTTP2的請(qǐng)求前置信息,開啟線程進(jìn)行連接前置的信息讀取
fun start(sendConnectionPreface: Boolean = true) {
if (sendConnectionPreface) { // 必為true
// 配置“連接前奏”梯醒,每個(gè)端點(diǎn)都需要發(fā)送連接前奏作為正在使用的協(xié)議的最終確認(rèn)宽堆,并建立 HTTP/2 連接的初始設(shè)置。
writer.connectionPreface()
writer.settings(okHttpSettings)
val windowSize = okHttpSettings.initialWindowSize
if (windowSize != DEFAULT_INITIAL_WINDOW_SIZE) {
writer.windowUpdate(0, (windowSize - DEFAULT_INITIAL_WINDOW_SIZE).toLong())
}
}
// 開啟線程 進(jìn)行連接前奏的讀取工作
Thread(readerRunnable, connectionName).start()
}
Http2Connection.ReaderRunnable
讀取連接前奏信息茸习,確認(rèn)連接正常
inner class ReaderRunnable internal constructor(
internal val reader: Http2Reader
) : Runnable, Http2Reader.Handler {
override fun run() {
var connectionErrorCode = ErrorCode.INTERNAL_ERROR
var streamErrorCode = ErrorCode.INTERNAL_ERROR
var errorException: IOException? = null
try {
// 讀取連接前奏信息
reader.readConnectionPreface(this)
while (reader.nextFrame(false, this)) {
}
connectionErrorCode = ErrorCode.NO_ERROR
streamErrorCode = ErrorCode.CANCEL
} catch (e: IOException) {
errorException = e
connectionErrorCode = ErrorCode.PROTOCOL_ERROR
streamErrorCode = ErrorCode.PROTOCOL_ERROR
} finally {
close(connectionErrorCode, streamErrorCode, errorException)
reader.closeQuietly()
}
}