本文基于OkHttp 4.3.1源碼分析
OkHttp - 官方地址
OkHttp - GitHub代碼地址
概述
OkHttp整體流程(本文覆蓋紅色部分)
IO流程圖
源碼分析
攔截器
CallServerInterceptor.intercept
整體可以劃分6個步驟,根據不同協議執行實現邏輯區分Http1.x和Http2
- 寫請求頭
- 創建請求體
- 寫請求體
- 完成請求寫入
- 讀取響應頭
- 返回響應結果
/**
 * Interceptor that writes the request to the chain's exchange and builds the [Response] from
 * what the exchange reads back.
 *
 * @param forWebSocket when true, a response with code 101 is returned with an empty body
 * instead of a body opened from the exchange.
 */
class CallServerInterceptor(private val forWebSocket: Boolean) : Interceptor {
/**
 * Writes the request headers (and the body, when the method permits one and it is present),
 * reads the response headers, and returns the response with its body attached.
 */
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
val realChain = chain as RealInterceptorChain
val exchange = realChain.exchange()
val request = realChain.request()
val requestBody = request.body
val sentRequestMillis = System.currentTimeMillis()
// Write the request headers.
exchange.writeRequestHeaders(request)
var invokeStartEvent = true
var responseBuilder: Response.Builder? = null
// Only write a request body when the method permits one and a body is present.
if (HttpMethod.permitsRequestBody(request.method) && requestBody != null) {
// Handle "Expect: 100-continue": flush the headers and read the response headers before
// sending any body. A null builder means the body should now be sent.
if ("100-continue".equals(request.header("Expect"), ignoreCase = true)) {
exchange.flushRequest()
responseBuilder = exchange.readResponseHeaders(expectContinue = true)
exchange.responseHeadersStart()
invokeStartEvent = false
}
if (responseBuilder == null) {
if (requestBody.isDuplex()) {
// Duplex body: flush the request first, then write the body. The sink is not closed here.
exchange.flushRequest()
val bufferedRequestBody = exchange.createRequestBody(request, true).buffer()
requestBody.writeTo(bufferedRequestBody)
} else {
// Non-duplex body: write it and close the sink.
val bufferedRequestBody = exchange.createRequestBody(request, false).buffer()
requestBody.writeTo(bufferedRequestBody)
bufferedRequestBody.close()
}
} else {
// Response headers were returned instead of the 100-continue go-ahead: send no body.
exchange.noRequestBody()
if (!exchange.connection()!!.isMultiplexed) {
exchange.noNewExchangesOnConnection()
}
}
} else {
exchange.noRequestBody()
}
// Duplex bodies are not finished here.
if (requestBody == null || !requestBody.isDuplex()) {
exchange.finishRequest() // Finish writing the request.
}
if (responseBuilder == null) {
// Read the response headers into a Response.Builder.
responseBuilder = exchange.readResponseHeaders(expectContinue = false)!!
if (invokeStartEvent) {
exchange.responseHeadersStart()
invokeStartEvent = false
}
}
// Build the response.
var response = responseBuilder
.request(request)
.handshake(exchange.connection()!!.handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build()
var code = response.code
if (code == 100) {
// On a 100 response code, read the response headers again and rebuild the response.
responseBuilder = exchange.readResponseHeaders(expectContinue = false)!!
if (invokeStartEvent) {
exchange.responseHeadersStart()
}
response = responseBuilder
.request(request)
.handshake(exchange.connection()!!.handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build()
code = response.code
}
// Done reading the response headers.
exchange.responseHeadersEnd(response)
response = if (forWebSocket && code == 101) {
// WebSocket with a 101 code: return a response with an empty body.
response.newBuilder()
.body(EMPTY_RESPONSE)
.build()
} else {
// Otherwise open the response body from the exchange and attach it.
response.newBuilder()
.body(exchange.openResponseBody(response))
.build()
}
... // check
return response
}
}
寫請求頭
Exchange.writeRequestHeaders
/**
 * Writes the request headers through the codec, notifying the event listener before and
 * after the write.
 *
 * @param request the request whose headers are written
 */
fun writeRequestHeaders(request: Request) {
try {
eventListener.requestHeadersStart(call)
codec.writeRequestHeaders(request) // Delegate the header write to the codec.
eventListener.requestHeadersEnd(call, request)
} catch (e: IOException) {
// IOException handling is elided in this excerpt.
...
}
}
創建請求體
Exchange.createRequestBody
/**
 * Opens a sink for writing the request body.
 *
 * Records the duplex flag, tells the event listener that the request body is starting, and
 * wraps the codec's sink in a [RequestBodySink] together with the body's content length.
 *
 * @param request the request whose body will be written; its body must not be null
 * @param duplex stored as this exchange's duplex flag
 * @return a sink wrapping the one created by the codec
 */
fun createRequestBody(request: Request, duplex: Boolean): Sink {
    isDuplex = duplex
    val bodyLength = request.body!!.contentLength()
    eventListener.requestBodyStart(call)
    // Let the codec create the underlying request body sink.
    val codecSink = codec.createRequestBody(request, bodyLength)
    return RequestBodySink(codecSink, bodyLength)
}
寫請求體
RequestBodySink.write
根據創建的RequestBodySink進行寫入操作
Http/1.x,newKnownLengthSink或者newChunkedSink
/**
 * Forwards [byteCount] bytes from [source] to the superclass's write and adds them to the
 * running count of bytes received.
 *
 * @throws IllegalStateException if this sink is closed
 * @throws ProtocolException if the content length is known (not -1) and this write would
 * exceed it
 */
override fun write(source: Buffer, byteCount: Long) {
    check(!closed) { "closed" }
    val lengthIsKnown = contentLength != -1L
    if (lengthIsKnown && bytesReceived + byteCount > contentLength) {
        throw ProtocolException(
            "expected $contentLength bytes but received ${bytesReceived + byteCount}")
    }
    try {
        // Perform the write, then record how much was written.
        super.write(source, byteCount)
        bytesReceived += byteCount
    } catch (failure: IOException) {
        throw complete(failure)
    }
}
完成請求寫入
Exchange.finishRequest
/**
 * Asks the codec to finish the request.
 *
 * @throws IOException if the codec fails; the failure is reported to the event listener and
 * tracked before it is rethrown
 */
fun finishRequest() {
    try {
        codec.finishRequest()
    } catch (failure: IOException) {
        eventListener.requestFailed(call, failure)
        trackFailure(failure)
        throw failure
    }
}
讀取響應頭
Exchange.readResponseHeaders
/**
 * Reads the response headers through the codec and attaches this exchange to the resulting
 * builder.
 *
 * @param expectContinue passed through to the codec
 * @return the builder produced by the codec, or null if the codec returned null
 * @throws IOException if reading fails; the failure is reported to the event listener and
 * tracked before it is rethrown
 */
fun readResponseHeaders(expectContinue: Boolean): Response.Builder? {
    return try {
        val builder = codec.readResponseHeaders(expectContinue)
        if (builder != null) {
            builder.initExchange(this)
        }
        builder
    } catch (failure: IOException) {
        eventListener.responseFailed(call, failure)
        trackFailure(failure)
        throw failure
    }
}
讀取響應體
Exchange.openResponseBody
創建響應體封裝實例,含有響應體的讀取流
/**
 * Builds the [ResponseBody] for [response].
 *
 * The body combines the response's Content-Type header, the content length reported by the
 * codec, and a buffered [ResponseBodySource] over the codec's raw source.
 *
 * @throws IOException if opening the body fails; the failure is reported to the event
 * listener and tracked before it is rethrown
 */
fun openResponseBody(response: Response): ResponseBody {
    return try {
        val mediaType = response.header("Content-Type")
        val reportedLength = codec.reportedContentLength(response)
        // The codec supplies the raw source; wrap it for this exchange.
        val codecSource = codec.openResponseBodySource(response)
        val bodySource = ResponseBodySource(codecSource, reportedLength)
        RealResponseBody(mediaType, reportedLength, bodySource.buffer())
    } catch (failure: IOException) {
        eventListener.responseFailed(call, failure)
        trackFailure(failure)
        throw failure
    }
}
HTTP1.x對應實現
Http1ExchangeCodec.writeRequestHeaders
創建RequestLine,執行寫入headers方法
/**
 * Builds the request line for [request] from the connection route's proxy type, then writes
 * the request line and the request's headers.
 */
override fun writeRequestHeaders(request: Request) {
    val proxyType = realConnection!!.route().proxy.type()
    val requestLine = RequestLine.get(request, proxyType)
    writeRequest(request.headers, requestLine)
}
Http1ExchangeCodec.writeRequest
通過Okio的Sink實例 寫入headers
/**
 * Writes the request line, each header as `name: value`, and a terminating blank line to the
 * sink, every line ending in CRLF, then moves the state to STATE_OPEN_REQUEST_BODY.
 *
 * @param headers the headers to write, in index order
 * @param requestLine the first line of the request
 * @throws IllegalStateException if the state is not STATE_IDLE
 */
fun writeRequest(headers: Headers, requestLine: String) {
    check(state == STATE_IDLE) { "state: $state" }

    sink.writeUtf8(requestLine).writeUtf8("\r\n")

    // One "name: value" line per header.
    for (index in 0 until headers.size) {
        sink.writeUtf8(headers.name(index))
            .writeUtf8(": ")
            .writeUtf8(headers.value(index))
            .writeUtf8("\r\n")
    }

    // A blank line ends the header section.
    sink.writeUtf8("\r\n")
    state = STATE_OPEN_REQUEST_BODY
}
Http1ExchangeCodec.createRequestBody
/**
 * Chooses the sink used to write an HTTP/1 request body.
 *
 * @param contentLength the body length, or -1 if it is not known
 * @return a chunked sink when the request is chunked, otherwise a known-length sink
 * @throws ProtocolException if the request body is duplex
 * @throws IllegalStateException if the request is neither chunked nor of known length
 */
override fun createRequestBody(request: Request, contentLength: Long): Sink {
    if (request.body?.isDuplex() == true) {
        throw ProtocolException("Duplex connections are not supported for HTTP/1")
    }
    // Chunked request: the sink does not need a length.
    if (request.isChunked()) return newChunkedSink()
    // Known length: stream with a known-length sink.
    if (contentLength != -1L) return newKnownLengthSink()
    // Neither chunked nor of known length: the body cannot be streamed.
    throw IllegalStateException(
        "Cannot stream a request body without chunked encoding or a known content length!")
}
Http1ExchangeCodec.finishRequest
請求寫入
/** Finishes the request by flushing the sink. */
override fun finishRequest(): Unit = sink.flush()
Http1ExchangeCodec.readResponseHeaders
- 讀取響應頭信息,根據StatusLine解析協議類型、響應碼、Message
- 構建ResponseBuilder接受響應頭信息
/**
 * Reads the status line and headers of an HTTP/1 response into a [Response.Builder] and
 * advances the codec state according to the status code.
 *
 * @param expectContinue when true and the status code is HTTP_CONTINUE, null is returned and
 * the state is left unchanged
 * @return the populated builder, or null in the expect-continue case described above
 * @throws IllegalStateException if the state is neither STATE_OPEN_REQUEST_BODY nor
 * STATE_READ_RESPONSE_HEADERS
 */
override fun readResponseHeaders(expectContinue: Boolean): Response.Builder? {
check(state == STATE_OPEN_REQUEST_BODY || state == STATE_READ_RESPONSE_HEADERS) {
"state: $state"
}
try {
// Parse the status line, which carries the response status information.
val statusLine = StatusLine.parse(readHeaderLine())
// Create the builder and fill in the status information and the headers.
val responseBuilder = Response.Builder()
.protocol(statusLine.protocol)
.code(statusLine.code)
.message(statusLine.message)
.headers(readHeaders())
// Pick the result and the next state from the status code.
return when {
expectContinue && statusLine.code == HTTP_CONTINUE -> {
null
}
statusLine.code == HTTP_CONTINUE -> {
// A 100 response that was not being waited for: headers can be read again.
state = STATE_READ_RESPONSE_HEADERS
responseBuilder
}
else -> {
state = STATE_OPEN_RESPONSE_BODY
responseBuilder
}
}
} catch (e: EOFException) {
// EOFException handling is elided in this excerpt.
...
}
}
Http1ExchangeCodec.openResponseBodySource
根據響應體chunked特性和大小是否已知創建不同的Source流
/**
 * Chooses the source used to read an HTTP/1 response body.
 *
 * @return a zero-length source when the response promises no body, a chunked source when the
 * response is chunked, a fixed-length source when the headers give a content length, and an
 * unknown-length source otherwise
 */
override fun openResponseBodySource(response: Response): Source {
    if (!response.promisesBody()) return newFixedLengthSource(0)
    if (response.isChunked()) return newChunkedSource(response.request.url)

    // -1 means the headers did not supply a content length.
    val declaredLength = response.headersContentLength()
    return if (declaredLength == -1L) {
        newUnknownLengthSource()
    } else {
        newFixedLengthSource(declaredLength)
    }
}
HTTP2對應實現
Http2ExchangeCodec.writeRequestHeaders
將請求頭轉換為Header列表,通過connection.newStream創建流(過程中會寫入headers)
/**
 * Creates the HTTP/2 stream for [request] from its header list, unless a stream already
 * exists.
 */
override fun writeRequestHeaders(request: Request) {
// A stream already exists: nothing to do.
if (stream != null) return
val hasRequestBody = request.body != null
// Convert the request's headers into a list of Header entries.
val requestHeaders = http2HeadersList(request)
// Create a locally-initiated bidirectional stream (the headers are written in the process).
stream = connection.newStream(requestHeaders, hasRequestBody)
// The rest of this method is elided in this excerpt.
...
}
Http2Connection.newStream
/**
 * Allocates the next stream id, creates the [Http2Stream], and writes its headers.
 *
 * @param associatedStreamId 0 to write the headers with `writer.headers`; any other value is
 * passed to `writer.pushPromise`
 * @param requestHeaders the headers written for the new stream
 * @param out when false, the stream is created with its outbound side already finished
 * @return the newly created stream
 * @throws ConnectionShutdownException if the connection is shut down
 */
private fun newStream(
associatedStreamId: Int,
requestHeaders: List<Header>,
out: Boolean
): Http2Stream {
val outFinished = !out
val inFinished = false
val flushHeaders: Boolean
val stream: Http2Stream
val streamId: Int
// The writer lock is held across id allocation and the header write.
synchronized(writer) {
synchronized(this) {
if (nextStreamId > Int.MAX_VALUE / 2) {
shutdown(REFUSED_STREAM)
}
if (isShutdown) {
throw ConnectionShutdownException()
}
// Stream ids advance by 2 for each new stream.
streamId = nextStreamId
nextStreamId += 2
// Create the Http2Stream.
stream = Http2Stream(streamId, this, outFinished, inFinished, null)
// Flush immediately when there is no outbound data to follow, or when the connection's or
// the stream's written total has reached its maximum.
flushHeaders = !out ||
writeBytesTotal >= writeBytesMaximum ||
stream.writeBytesTotal >= stream.writeBytesMaximum
if (stream.isOpen) {
streams[streamId] = stream
}
}
if (associatedStreamId == 0) {
// Write the headers.
writer.headers(outFinished, streamId, requestHeaders)
} else {
require(!client) { "client streams shouldn't have associated stream IDs" }
// HTTP/2 has a PUSH_PROMISE frame.
writer.pushPromise(associatedStreamId, streamId, requestHeaders)
}
}
// The flush happens after both locks are released.
if (flushHeaders) {
writer.flush()
}
return stream
}
Http2Writer.headers
- HPACK壓縮(編碼)數據
- 寫入幀頭數據
- 寫入壓縮過的Header數據
/**
 * Writes [headerBlock] for [streamId] as a HEADERS frame, followed by continuation frames
 * when the encoded block is larger than the maximum frame size.
 *
 * @param outFinished when true, FLAG_END_STREAM is added to the frame's flags
 * @param streamId the stream the headers belong to
 * @param headerBlock the headers to encode and write
 * @throws IOException if this writer is closed
 */
fun headers(
outFinished: Boolean,
streamId: Int,
headerBlock: List<Header>
) {
if (closed) throw IOException("closed")
// Encode the headers with HPACK (header compression, not encryption).
// Presumably the encoded bytes end up in hpackBuffer - confirm against hpackWriter's setup.
hpackWriter.writeHeaders(headerBlock)
val byteCount = hpackBuffer.size
// This frame carries at most maxFrameSize bytes of the encoded block.
val length = minOf(maxFrameSize.toLong(), byteCount)
// FLAG_END_HEADERS is set only when the whole block fits in this frame.
var flags = if (byteCount == length) FLAG_END_HEADERS else 0
if (outFinished) flags = flags or FLAG_END_STREAM
// Write the frame header.
frameHeader(
streamId = streamId,
length = length.toInt(),
type = TYPE_HEADERS,
flags = flags
)
// Write the encoded header bytes.
sink.write(hpackBuffer, length)
// Whatever did not fit goes out in continuation frames.
if (byteCount > length) writeContinuationFrames(streamId, byteCount - length)
}
Http2ExchangeCodec.createRequestBody
返回一個 FramingSink 對象,RequestBody.writeTo(bufferedRequestBody)最後會調用RequestBodySink的成員變量delegate(即該sink對象)的write方法
/**
 * Returns the sink of the current stream (a FramingSink). The stream must already exist.
 */
override fun createRequestBody(request: Request, contentLength: Long): Sink =
    stream!!.getSink()
FramingSink.write
寫入幀數據
/**
 * Appends [byteCount] bytes from [source] to the send buffer, then emits frames for as long
 * as the send buffer holds at least EMIT_BUFFER_SIZE bytes.
 */
override fun write(source: Buffer, byteCount: Long) {
    this@Http2Stream.assertThreadDoesntHoldLock()

    // Stage the caller's bytes in the send buffer first.
    sendBuffer.write(source, byteCount)

    // Drain the send buffer in frames until it falls below the emit threshold.
    while (sendBuffer.size >= EMIT_BUFFER_SIZE) {
        emitFrame(outFinishedOnLastFrame = false)
    }
}
FramingSink.emitFrame
Emit a single data frame to the connection. The frame's size will be limited by this stream's write window. This method will block until the write window is nonempty.
往連接中寫入Frame數據,當已寫入的數據達到寫入的最大值時就阻塞,直到被喚醒
/**
 * Emits a single data frame from sendBuffer to the connection, waiting first while the
 * stream's written total has reached its maximum.
 *
 * @param outFinishedOnLastFrame when true, the frame is written as the finishing frame if
 * it drains sendBuffer and no error code is set
 */
private fun emitFrame(outFinishedOnLastFrame: Boolean) {
val toWrite: Long
val outFinished: Boolean
synchronized(this@Http2Stream) {
writeTimeout.enter()
try {
// Wait while the written total has reached the maximum, unless the sink is finished or
// closed or an error code has been set.
while (writeBytesTotal >= writeBytesMaximum &&
!finished &&
!closed &&
errorCode == null) {
waitForIo() // Block until woken (e.g. after a WINDOW_UPDATE is received).
}
} finally {
writeTimeout.exitAndThrowIfTimedOut()
}
checkOutNotClosed() // Presumably throws if the outbound side is unusable - helper not shown.
// Write no more than the remaining allowance and no more than what is buffered.
toWrite = minOf(writeBytesMaximum - writeBytesTotal, sendBuffer.size)
writeBytesTotal += toWrite
outFinished = outFinishedOnLastFrame && toWrite == sendBuffer.size && errorCode == null
}
// The connection write happens outside the stream lock, under the write timeout.
writeTimeout.enter()
try {
// Write the data.
connection.writeData(id, outFinished, sendBuffer, toWrite)
} finally {
writeTimeout.exitAndThrowIfTimedOut()
}
}
Http2Connection.writeData
連接寫入數據,當已寫入的數據達到寫入的最大值時就阻塞,直到被喚醒
writeBytesMaximum 為連接可寫入數據總量的上限;每個數據幀的大小另由 writer.maxDataLength() 限制(HTTP/2 默認最大幀大小為 16384)
/**
 * Writes [byteCount] bytes of [buffer] to [streamId] as one or more data frames, waiting
 * whenever the connection's written total has reached its maximum.
 *
 * A zero [byteCount] still writes a single empty data frame, so that [outFinished] reaches
 * the writer even when there are no bytes left to send.
 *
 * @param streamId the stream the data belongs to
 * @param outFinished passed to the writer as true only on the final frame
 * @param buffer the source of the bytes to write
 * @param byteCount the number of bytes to write from [buffer]
 * @throws IOException if the stream is no longer registered while waiting
 * @throws InterruptedIOException if the thread is interrupted while waiting
 */
fun writeData(
    streamId: Int,
    outFinished: Boolean,
    buffer: Buffer?,
    byteCount: Long
) {
    // Empty data frames are not flow-controlled. Without this path the loop below never
    // runs for byteCount == 0 and the finishing frame would never be written.
    if (byteCount == 0L) {
        writer.data(outFinished, streamId, buffer, 0)
        return
    }

    var remaining = byteCount
    while (remaining > 0L) {
        var toWrite: Int
        synchronized(this@Http2Connection) {
            try {
                // Wait while the written total has reached the maximum.
                while (writeBytesTotal >= writeBytesMaximum) {
                    if (!streams.containsKey(streamId)) {
                        throw IOException("stream closed")
                    }
                    this@Http2Connection.wait() // Block until notified.
                }
            } catch (e: InterruptedException) {
                Thread.currentThread().interrupt() // Retain interrupted status.
                throw InterruptedIOException()
            }
            // Limit this frame to the remaining allowance and to the writer's maximum
            // data length.
            toWrite = minOf(remaining, writeBytesMaximum - writeBytesTotal).toInt()
            toWrite = minOf(toWrite, writer.maxDataLength())
            writeBytesTotal += toWrite.toLong()
        }
        remaining -= toWrite.toLong()
        // Only the frame that exhausts the remaining bytes may carry outFinished.
        writer.data(outFinished && remaining == 0L, streamId, buffer, toWrite)
    }
}
Http2Writer.data
數據幀的寫入執行方法
/**
 * Writes a data frame of [byteCount] bytes from [source] for [streamId].
 *
 * @param outFinished when true, FLAG_END_STREAM is set on the frame
 * @throws IOException if this writer is closed
 */
fun data(outFinished: Boolean, streamId: Int, source: Buffer?, byteCount: Int) {
    if (closed) throw IOException("closed")
    val frameFlags = if (outFinished) FLAG_NONE or FLAG_END_STREAM else FLAG_NONE
    // Write the frame itself.
    dataFrame(streamId, frameFlags, source, byteCount)
}
/**
 * Writes a TYPE_DATA frame header followed by [byteCount] bytes from [buffer].
 *
 * @param buffer the payload source; must not be null when [byteCount] is greater than 0
 */
@Throws(IOException::class)
fun dataFrame(streamId: Int, flags: Int, buffer: Buffer?, byteCount: Int) {
    // The frame header always goes out, even for an empty payload.
    frameHeader(streamId = streamId, length = byteCount, type = TYPE_DATA, flags = flags)

    // Nothing more to write for an empty payload.
    if (byteCount <= 0) return
    sink.write(buffer!!, byteCount.toLong())
}
Http2ExchangeCodec.readResponseHeaders
- 阻塞等待獲取Headers數據
- 獲取Headers數據,轉換為Response中headers,以及其它響應頭信息,創建ResponseBuilder
/**
 * Takes the next headers from the stream and turns them into a [Response.Builder].
 *
 * @param expectContinue when true and the response code is HTTP_CONTINUE, null is returned
 * @return the builder, or null in the expect-continue case described above
 */
override fun readResponseHeaders(expectContinue: Boolean): Response.Builder? {
    val receivedHeaders = stream!!.takeHeaders()
    val builder = readHttp2HeadersList(receivedHeaders, protocol)
    val gotExpectedContinue = expectContinue && builder.code == HTTP_CONTINUE
    return if (gotExpectedContinue) null else builder
}
Http2Stream.takeHeaders
阻塞等待headers數據,在拿到headers後,返回數據
headers數據在receiveHeaders方法中會添加
/**
 * Removes and returns the first entry of headersQueue, waiting while the queue is empty and
 * no error code has been set.
 *
 * NOTE(review): no lock is taken in this excerpt; confirm that an elided annotation or the
 * caller provides whatever synchronization waitForIo requires.
 *
 * @return the first queued headers
 * @throws IOException errorException if it is set, otherwise a StreamResetException built
 * from errorCode
 */
fun takeHeaders(): Headers {
readTimeout.enter()
try {
while (headersQueue.isEmpty() && errorCode == null) {
waitForIo() // Queue is empty: block and wait.
}
} finally {
readTimeout.exitAndThrowIfTimedOut()
}
if (headersQueue.isNotEmpty()) {
return headersQueue.removeFirst() // Take the headers and return them.
}
// The wait ended with nothing queued, so an error code was set.
throw errorException ?: StreamResetException(errorCode!!)
}
Http2Stream.receiveHeaders
從數據源接收header後,存儲到headers隊列中
/**
 * Accepts headers received for this stream, storing them either in headersQueue or as the
 * source's trailers, then wakes any threads waiting on this stream.
 *
 * @param headers the received headers
 * @param inFinished when true, the source is marked finished
 */
fun receiveHeaders(headers: Headers, inFinished: Boolean) {
this@Http2Stream.assertThreadDoesntHoldLock()
val open: Boolean
synchronized(this) {
// Headers are queued unless response headers were already seen and this block is the
// finishing one; in that case they are stored as trailers.
if (!hasResponseHeaders || !inFinished) {
hasResponseHeaders = true
headersQueue += headers // Queue the headers.
} else {
this.source.trailers = headers
}
if (inFinished) {
this.source.finished = true
}
open = isOpen
notifyAll()
}
// A stream that is no longer open is removed from the connection, outside the lock.
if (!open) {
connection.removeStream(id)
}
}
ReaderRunnable.run
在Http2Connection.start的時候,最後會執行一個此線程方法
當讀取完preface之後,就一直循環讀取下一個幀數據
/**
 * Reads the connection preface, then reads frames until the reader reports that there are
 * no more.
 */
override fun run() {
// Both error codes start as INTERNAL_ERROR and are replaced only after the loop ends normally.
var connectionErrorCode = ErrorCode.INTERNAL_ERROR
var streamErrorCode = ErrorCode.INTERNAL_ERROR
var errorException: IOException? = null
try {
reader.readConnectionPreface(this)
// Keep reading frames until nextFrame returns false.
while (reader.nextFrame(false, this)) {
}
connectionErrorCode = ErrorCode.NO_ERROR
streamErrorCode = ErrorCode.CANCEL
}
// NOTE(review): the catch/finally belonging to this try is not shown in this excerpt.
}
Http2Reader.nextFrame
讀取響應數據
/**
 * Reads one frame from the source and dispatches it, by frame type, to the matching read
 * method. Frames of unknown type are skipped.
 *
 * @param requireSettings when true, any frame type other than TYPE_SETTINGS is an error
 * @param handler passed to the read method chosen for the frame
 * @return false if the source ends before a complete frame header is available, true
 * otherwise
 * @throws IOException if the length exceeds INITIAL_MAX_FRAME_SIZE, or if a settings frame
 * was required and another type arrived
 */
fun nextFrame(requireSettings: Boolean, handler: Handler): Boolean {
try {
source.require(9) // Frame header size.
} catch (e: EOFException) {
return false // This might be a normal socket close.
}
// Header fields are read in order: length, type, flags, stream id.
val length = source.readMedium()
if (length > INITIAL_MAX_FRAME_SIZE) {
throw IOException("FRAME_SIZE_ERROR: $length")
}
val type = source.readByte() and 0xff
if (requireSettings && type != TYPE_SETTINGS) {
throw IOException("Expected a SETTINGS frame but was $type")
}
val flags = source.readByte() and 0xff
val streamId = source.readInt() and 0x7fffffff // Ignore reserved bit.
if (logger.isLoggable(FINE)) logger.fine(frameLog(true, streamId, length, type, flags))
// Dispatch on the frame type.
when (type) {
TYPE_DATA -> readData(handler, length, flags, streamId)
TYPE_HEADERS -> readHeaders(handler, length, flags, streamId)
TYPE_PRIORITY -> readPriority(handler, length, flags, streamId)
TYPE_RST_STREAM -> readRstStream(handler, length, flags, streamId)
TYPE_SETTINGS -> readSettings(handler, length, flags, streamId)
TYPE_PUSH_PROMISE -> readPushPromise(handler, length, flags, streamId)
TYPE_PING -> readPing(handler, length, flags, streamId)
TYPE_GOAWAY -> readGoAway(handler, length, flags, streamId)
TYPE_WINDOW_UPDATE -> readWindowUpdate(handler, length, flags, streamId)
else -> source.skip(length.toLong()) // Implementations MUST discard frames of unknown types.
}
return true
}
FramingSource.read
/**
 * Reads up to [byteCount] bytes from readBuffer into [sink], waiting when no data is
 * available yet and the source is neither finished nor in error.
 *
 * @param sink the buffer that receives the bytes
 * @param byteCount the maximum number of bytes to read; must not be negative
 * @return the number of bytes read, or -1 when this source is exhausted
 * @throws IllegalArgumentException if [byteCount] is negative
 * @throws IOException if the source is closed
 */
override fun read(sink: Buffer, byteCount: Long): Long {
require(byteCount >= 0L) { "byteCount < 0: $byteCount" }
while (true) {
var tryAgain = false
var readBytesDelivered = -1L
var errorExceptionToDeliver: IOException? = null
// 1. Decide what to do in a synchronized block.
synchronized(this@Http2Stream) {
readTimeout.enter()
try {
if (errorCode != null) {
// Prepare to deliver an error.
errorExceptionToDeliver = errorException ?: StreamResetException(errorCode!!)
}
if (closed) {
throw IOException("stream closed")
} else if (readBuffer.size > 0L) {
// Prepare to read bytes. Start by moving them to the caller's buffer.
readBytesDelivered = readBuffer.read(sink, minOf(byteCount, readBuffer.size))
readBytesTotal += readBytesDelivered
val unacknowledgedBytesRead = readBytesTotal - readBytesAcknowledged
if (errorExceptionToDeliver == null &&
unacknowledgedBytesRead >= connection.okHttpSettings.initialWindowSize / 2) {
// Flow control: notify the peer that we're ready for more data! Only send a
// WINDOW_UPDATE if the stream isn't in error.
connection.writeWindowUpdateLater(id, unacknowledgedBytesRead)
readBytesAcknowledged = readBytesTotal
}
} else if (!finished && errorExceptionToDeliver == null) {
// Nothing to do. Wait until that changes then try again.
waitForIo()
tryAgain = true
}
} finally {
readTimeout.exitAndThrowIfTimedOut()
}
}
// 2. Do it outside of the synchronized block and timeout.
if (tryAgain) {
continue
}
if (readBytesDelivered != -1L) {
// Update connection.unacknowledgedBytesRead outside the synchronized block.
updateConnectionFlowControl(readBytesDelivered)
return readBytesDelivered
}
// Code between the data path and the exhausted return is elided in this excerpt.
...
return -1L // This source is exhausted.
}
}