-
Overview
Starting from how it is used, this article analyzes how OkHttp works at the source-code level.
-
Usage recap
import java.io.IOException;
import java.util.Objects;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;

// Synchronous GET: build the request, execute it, read the body as a string.
public static String getByOkHttp(String url) throws IOException {
  OkHttpClient client = new OkHttpClient();
  Request request = new Request.Builder()
      .url(url)
      .build();
  try (Response response = client.newCall(request).execute()) {
    return Objects.requireNonNull(response.body()).string();
  }
}

public static final MediaType JSON = MediaType.get("application/json; charset=utf-8");

// Synchronous POST with a JSON request body.
public static String postByOkHttp(String url, String json) throws IOException {
  OkHttpClient client = new OkHttpClient();
  RequestBody body = RequestBody.create(json, JSON);
  Request request = new Request.Builder()
      .url(url)
      .post(body)
      .build();
  try (Response response = client.newCall(request).execute()) {
    return Objects.requireNonNull(response.body()).string();
  }
}
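A quick recap of the OkHttp API. Before Retrofit appeared, this usage was familiar to all of us. Compared with HttpClient, parameters can be added directly as JSON. JSON is a common format and easy to parse, so there is no need to store the request body as a=b pairs the way HttpClient does. The usual flow is bean -> JSON and JSON -> bean, so this is the more natural fit. The request and its configuration can also be built together in one chained call, which keeps the calling code concise. These are probably not the only reasons OkHttp displaced HttpClient as the mainstream HTTP framework, so let's read the source and see what else is different.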
-
Source code analysis
First, the OkHttpClient constructor:
public OkHttpClient() {
  this(new Builder());
}
......
public Builder() {
  dispatcher = new Dispatcher();
  protocols = DEFAULT_PROTOCOLS;
  connectionSpecs = DEFAULT_CONNECTION_SPECS;
  eventListenerFactory = EventListener.factory(EventListener.NONE);
  proxySelector = ProxySelector.getDefault();
  if (proxySelector == null) {
    proxySelector = new NullProxySelector();
  }
  cookieJar = CookieJar.NO_COOKIES;
  socketFactory = SocketFactory.getDefault();
  hostnameVerifier = OkHostnameVerifier.INSTANCE;
  certificatePinner = CertificatePinner.DEFAULT;
  proxyAuthenticator = Authenticator.NONE;
  authenticator = Authenticator.NONE;
  connectionPool = new ConnectionPool();
  dns = Dns.SYSTEM;
  followSslRedirects = true;
  followRedirects = true;
  retryOnConnectionFailure = true;
  callTimeout = 0;
  connectTimeout = 10_000;
  readTimeout = 10_000;
  writeTimeout = 10_000;
  pingInterval = 0;
}
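As you can see, the Builder constructor initializes the required parameters with default values. We will set these parameters aside for now; their meaning becomes clear at the point where each one is used.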
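The "defaults in the builder constructor" pattern can be sketched in plain Java. This is only an illustration under stated assumptions: BuilderDefaultsSketch and its three fields are hypothetical names chosen to echo the fields above, not OkHttp's actual classes.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical mini client for illustration; only the field names echo OkHttp.
final class BuilderDefaultsSketch {
    final long callTimeoutMillis;
    final long connectTimeoutMillis;
    final boolean followRedirects;

    private BuilderDefaultsSketch(Builder builder) {
        this.callTimeoutMillis = builder.callTimeoutMillis;
        this.connectTimeoutMillis = builder.connectTimeoutMillis;
        this.followRedirects = builder.followRedirects;
    }

    static final class Builder {
        // Defaults are assigned up front, so build() works with no further calls.
        private long callTimeoutMillis = 0;          // 0 disables the call timeout
        private long connectTimeoutMillis = 10_000;
        private boolean followRedirects = true;

        Builder callTimeout(long duration, TimeUnit unit) {
            this.callTimeoutMillis = unit.toMillis(duration);
            return this;
        }

        Builder followRedirects(boolean followRedirects) {
            this.followRedirects = followRedirects;
            return this;
        }

        BuilderDefaultsSketch build() {
            return new BuilderDefaultsSketch(this);
        }
    }

    public static void main(String[] args) {
        BuilderDefaultsSketch defaults = new Builder().build();
        BuilderDefaultsSketch custom = new Builder().callTimeout(30, TimeUnit.SECONDS).build();
        System.out.println(defaults.callTimeoutMillis + " vs " + custom.callTimeoutMillis);
    }
}
```

A caller only overrides what it cares about; everything else keeps the value assigned in the builder.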
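RequestBody.create returns a RequestBody object. (The source below appears to come from the 3.14.x line, where the MediaType is the first argument; the usage example above passes the content first, which is the newer argument order.)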
/**
 * Returns a new request body that transmits {@code content}. If {@code contentType} is non-null
 * and lacks a charset, this will use UTF-8.
 */
public static RequestBody create(@Nullable MediaType contentType, String content) {
  Charset charset = UTF_8;
  if (contentType != null) {
    charset = contentType.charset();
    if (charset == null) {
      charset = UTF_8;
      contentType = MediaType.parse(contentType + "; charset=utf-8");
    }
  }
  byte[] bytes = content.getBytes(charset);
  return create(contentType, bytes);
}
......
/** Returns a new request body that transmits {@code content}. */
public static RequestBody create(final @Nullable MediaType contentType, final byte[] content) {
  return create(contentType, content, 0, content.length);
}
......
/** Returns a new request body that transmits {@code content}. */
public static RequestBody create(final @Nullable MediaType contentType, final byte[] content,
    final int offset, final int byteCount) {
  if (content == null) throw new NullPointerException("content == null");
  Util.checkOffsetAndCount(content.length, offset, byteCount);
  return new RequestBody() {
    @Override public @Nullable MediaType contentType() {
      return contentType;
    }

    @Override public long contentLength() {
      return byteCount;
    }

    @Override public void writeTo(BufferedSink sink) throws IOException {
      sink.write(content, offset, byteCount);
    }
  };
}
The anonymous RequestBody subclass has a writeTo method. My guess is that this is the entry point for writing to the socket; we will check this further down.
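The shape of that anonymous body (encode once, report the length, write on demand) can be reproduced with the JDK alone. A minimal sketch, assuming a hypothetical Body interface and a plain OutputStream in place of Okio's BufferedSink:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

final class RequestBodySketch {
    /** Hypothetical stand-in for RequestBody: knows its length and how to write itself. */
    interface Body {
        long contentLength();
        void writeTo(OutputStream sink) throws IOException;
    }

    /** Encodes content with the given charset, falling back to UTF-8 when none is supplied. */
    static Body create(String content, Charset charsetOrNull) {
        Charset charset = charsetOrNull != null ? charsetOrNull : StandardCharsets.UTF_8;
        final byte[] bytes = content.getBytes(charset);
        return new Body() {
            @Override public long contentLength() {
                return bytes.length;
            }

            @Override public void writeTo(OutputStream sink) throws IOException {
                // Nothing is sent until someone hands the body a sink to write into.
                sink.write(bytes, 0, bytes.length);
            }
        };
    }

    public static void main(String[] args) throws IOException {
        Body body = create("{\"name\":\"okhttp\"}", null);
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        body.writeTo(sink);
        System.out.println(body.contentLength() + " bytes: " + sink.toString("UTF-8"));
    }
}
```

The body itself never touches the network; whoever owns the sink decides when the bytes are written.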
Building the Request is just a series of parsing steps and field assignments, which we will not dig into. Let's look at client.newCall(request):
/**
 * Prepares the {@code request} to be executed at some point in the future.
 */
@Override public Call newCall(Request request) {
  return RealCall.newRealCall(this, request, false /* for web socket */);
}

static RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
  // Safely publish the Call instance to the EventListener.
  RealCall call = new RealCall(client, originalRequest, forWebSocket);
  call.transmitter = new Transmitter(client, call);
  return call;
}
This returns a RealCall object, and realCall.execute returns a Response, so execute is the entry point of the network request:
@Override public Response execute() throws IOException {
  synchronized (this) {
    if (executed) throw new IllegalStateException("Already Executed");
    executed = true;
  }
  transmitter.timeoutEnter();
  transmitter.callStart();
  try {
    client.dispatcher().executed(this);
    return getResponseWithInterceptorChain();
  } finally {
    client.dispatcher().finished(this);
  }
}
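Two things stand out in execute: a call can run only once, and the dispatcher is told when the call starts and when it finishes, even if it fails. A minimal sketch of that guard, assuming a hypothetical OneShotCallSketch class and a static queue standing in for the dispatcher:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Supplier;

final class OneShotCallSketch {
    // Plays the role of the dispatcher's queue of running synchronous calls.
    static final Deque<OneShotCallSketch> running = new ArrayDeque<>();

    private boolean executed;
    private final Supplier<String> work;

    OneShotCallSketch(Supplier<String> work) {
        this.work = work;
    }

    String execute() {
        synchronized (this) {
            if (executed) throw new IllegalStateException("Already Executed");
            executed = true;
        }
        synchronized (running) {
            running.add(this);
        }
        try {
            return work.get();
        } finally {
            // Runs on success and on failure, so the queue never leaks a finished call.
            synchronized (running) {
                running.remove(this);
            }
        }
    }

    public static void main(String[] args) {
        OneShotCallSketch call = new OneShotCallSketch(() -> "ok");
        System.out.println(call.execute());
        try {
            call.execute();
        } catch (IllegalStateException e) {
            System.out.println("second execute rejected: " + e.getMessage());
        }
    }
}
```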
It first calls the transmitter's timeoutEnter method:
public void timeoutEnter() {
  timeout.enter();
}

private final AsyncTimeout timeout = new AsyncTimeout() {
  @Override protected void timedOut() {
    cancel();
  }
};
So we look for the enter method in AsyncTimeout:
public final void enter() {
  if (inQueue) throw new IllegalStateException("Unbalanced enter/exit");
  long timeoutNanos = timeoutNanos();
  boolean hasDeadline = hasDeadline();
  if (timeoutNanos == 0 && !hasDeadline) {
    return; // No timeout and no deadline? Don't bother with the queue.
  }
  inQueue = true;
  scheduleTimeout(this, timeoutNanos, hasDeadline);
}
If neither a call timeout nor a deadline has been set, scheduleTimeout is not called at all. Taking timeoutNanos as an example, here is where its value comes from:
public Transmitter(OkHttpClient client, Call call) {
  this.client = client;
  this.connectionPool = Internal.instance.realConnectionPool(client.connectionPool());
  this.call = call;
  this.eventListener = client.eventListenerFactory().create(call);
  this.timeout.timeout(client.callTimeoutMillis(), MILLISECONDS);
}
timeoutNanos comes from client.callTimeoutMillis, which defaults to 0. To set it, we need to change how the OkHttpClient is constructed:
// Set the call timeout to 30 seconds
OkHttpClient client = new OkHttpClient.Builder()
    .callTimeout(30000, TimeUnit.MILLISECONDS)
    .build();
Back in enter, execution reaches scheduleTimeout:
private static synchronized void scheduleTimeout(
    AsyncTimeout node, long timeoutNanos, boolean hasDeadline) {
  // Start the watchdog thread and create the head node when the first timeout is scheduled.
  if (head == null) {
    head = new AsyncTimeout();
    new Watchdog().start();
  }

  long now = System.nanoTime();
  if (timeoutNanos != 0 && hasDeadline) {
    // Compute the earliest event; either timeout or deadline. Because nanoTime can wrap around,
    // Math.min() is undefined for absolute values, but meaningful for relative ones.
    node.timeoutAt = now + Math.min(timeoutNanos, node.deadlineNanoTime() - now);
  } else if (timeoutNanos != 0) {
    node.timeoutAt = now + timeoutNanos;
  } else if (hasDeadline) {
    node.timeoutAt = node.deadlineNanoTime();
  } else {
    throw new AssertionError();
  }

  // Insert the node in sorted order.
  long remainingNanos = node.remainingNanos(now);
  for (AsyncTimeout prev = head; true; prev = prev.next) {
    if (prev.next == null || remainingNanos < prev.next.remainingNanos(now)) {
      node.next = prev.next;
      prev.next = node;
      if (prev == head) {
        AsyncTimeout.class.notify(); // Wake up the watchdog when inserting at the front.
      }
      break;
    }
  }
}
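A linked list records every pending timeout, ordered by remaining time. When the first timeout is scheduled, the head node is created and a Watchdog thread is started to do the countdown. The method then computes the current request's timeoutAt (its deadline) and inserts the node at the position that keeps the list sorted by deadline. If the node lands at the front, the watchdog is woken up.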
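The sorted insertion is the core of that method and is easy to try out in isolation. The sketch below is a simplified, single-threaded model: TimeoutQueueSketch and Node are hypothetical names, time is a plain long supplied by the caller, and the watchdog thread is reduced to a boolean return value that says whether a wake-up would be needed.

```java
final class TimeoutQueueSketch {
    static final class Node {
        final String name;
        final long timeoutAt; // absolute deadline, in the same unit as "now"
        Node next;

        Node(String name, long timeoutAt) {
            this.name = name;
            this.timeoutAt = timeoutAt;
        }
    }

    // Sentinel head node; real entries start at head.next.
    private final Node head = new Node("head", 0);

    /**
     * Inserts the node so the list stays ordered by remaining time.
     * Returns true when the node became the first entry, which is the
     * case where the real code notifies the watchdog.
     */
    boolean schedule(Node node, long now) {
        long remaining = node.timeoutAt - now;
        for (Node prev = head; ; prev = prev.next) {
            if (prev.next == null || remaining < prev.next.timeoutAt - now) {
                node.next = prev.next;
                prev.next = node;
                return prev == head;
            }
        }
    }

    /** Names of the queued nodes, earliest deadline first. */
    String order() {
        StringBuilder names = new StringBuilder();
        for (Node n = head.next; n != null; n = n.next) {
            if (names.length() > 0) names.append(',');
            names.append(n.name);
        }
        return names.toString();
    }

    public static void main(String[] args) {
        TimeoutQueueSketch queue = new TimeoutQueueSketch();
        queue.schedule(new Node("a", 30), 0);
        queue.schedule(new Node("b", 10), 0);
        queue.schedule(new Node("c", 20), 0);
        System.out.println(queue.order());
    }
}
```

Because the list is always sorted, the watchdog only ever has to wait for the first entry.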
Moving on, next is transmitter.callStart():
public void callStart() {
  this.callStackTrace = Platform.get().getStackTraceForCloseable("response.body().close()");
  eventListener.callStart(call);
}
Where does eventListener come from? From the Transmitter constructor seen earlier:
public Transmitter(OkHttpClient client, Call call) {
  this.client = client;
  this.connectionPool = Internal.instance.realConnectionPool(client.connectionPool());
  this.call = call;
  this.eventListener = client.eventListenerFactory().create(call);
  this.timeout.timeout(client.callTimeoutMillis(), MILLISECONDS);
}
Since we did not set one ourselves, client.eventListenerFactory() is the default from the Builder constructor, built by EventListener.factory(EventListener.NONE). Looking it up, EventListener.NONE has an empty callStart, so the call does nothing here. Next is client.dispatcher().executed(this):
/** Used by {@code Call#execute} to signal it is in-flight. */
synchronized void executed(RealCall call) {
  runningSyncCalls.add(call);
}
runningSyncCalls stores this call. Then getResponseWithInterceptorChain is called:
Response getResponseWithInterceptorChain() throws IOException {
  // Build a full stack of interceptors.
  List<Interceptor> interceptors = new ArrayList<>();
  interceptors.addAll(client.interceptors());
  interceptors.add(new RetryAndFollowUpInterceptor(client));
  interceptors.add(new BridgeInterceptor(client.cookieJar()));
  interceptors.add(new CacheInterceptor(client.internalCache()));
  interceptors.add(new ConnectInterceptor(client));
  if (!forWebSocket) {
    interceptors.addAll(client.networkInterceptors());
  }
  interceptors.add(new CallServerInterceptor(forWebSocket));

  Interceptor.Chain chain = new RealInterceptorChain(interceptors, transmitter, null, 0,
      originalRequest, this, client.connectTimeoutMillis(),
      client.readTimeoutMillis(), client.writeTimeoutMillis());

  boolean calledNoMoreExchanges = false;
  try {
    Response response = chain.proceed(originalRequest);
    if (transmitter.isCanceled()) {
      closeQuietly(response);
      throw new IOException("Canceled");
    }
    return response;
  } catch (IOException e) {
    calledNoMoreExchanges = true;
    throw transmitter.noMoreExchanges(e);
  } finally {
    if (!calledNoMoreExchanges) {
      transmitter.noMoreExchanges(null);
    }
  }
}
The method first assembles a list of interceptors, then constructs a RealInterceptorChain and calls its proceed method:
public Response proceed(Request request, Transmitter transmitter, @Nullable Exchange exchange)
    throws IOException {
  if (index >= interceptors.size()) throw new AssertionError();

  calls++;

  // If we already have a stream, confirm that the incoming request will use it.
  if (this.exchange != null && !this.exchange.connection().supportsUrl(request.url())) {
    throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
        + " must retain the same host and port");
  }

  // If we already have a stream, confirm that this is the only call to chain.proceed().
  if (this.exchange != null && calls > 1) {
    throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
        + " must call proceed() exactly once");
  }

  // Call the next interceptor in the chain.
  RealInterceptorChain next = new RealInterceptorChain(interceptors, transmitter, exchange,
      index + 1, request, call, connectTimeout, readTimeout, writeTimeout);
  Interceptor interceptor = interceptors.get(index);
  Response response = interceptor.intercept(next);

  // Confirm that the next interceptor made its required call to chain.proceed().
  if (exchange != null && index + 1 < interceptors.size() && next.calls != 1) {
    throw new IllegalStateException("network interceptor " + interceptor
        + " must call proceed() exactly once");
  }

  // Confirm that the intercepted response isn't null.
  if (response == null) {
    throw new NullPointerException("interceptor " + interceptor + " returned null");
  }

  if (response.body() == null) {
    throw new IllegalStateException(
        "interceptor " + interceptor + " returned a response with no body");
  }

  return response;
}
The response is obtained from interceptor.intercept(). Remember the interceptors that getResponseWithInterceptorChain set up? We ignore custom interceptors here and only consider the fixed ones that the source always adds. In the order they are added in getResponseWithInterceptorChain:
interceptors.add(new RetryAndFollowUpInterceptor(client));
interceptors.add(new BridgeInterceptor(client.cookieJar()));
interceptors.add(new CacheInterceptor(client.internalCache()));
interceptors.add(new ConnectInterceptor(client));
if (!forWebSocket) {
  // forWebSocket is false here, but client.networkInterceptors() is empty
  interceptors.addAll(client.networkInterceptors());
}
interceptors.add(new CallServerInterceptor(forWebSocket));
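The index-based hand-off in proceed is a chain-of-responsibility pattern, and it can be reproduced without OkHttp. In this minimal sketch the Interceptor and Chain types are hypothetical, requests and responses are plain strings, and the three lambdas only imitate the roles of the retry, bridge, and call-server interceptors:

```java
import java.util.ArrayList;
import java.util.List;

final class InterceptorChainSketch {
    interface Interceptor {
        String intercept(Chain chain);
    }

    static final class Chain {
        private final List<Interceptor> interceptors;
        private final int index;
        private final String request;

        Chain(List<Interceptor> interceptors, int index, String request) {
            this.interceptors = interceptors;
            this.index = index;
            this.request = request;
        }

        String request() {
            return request;
        }

        String proceed(String request) {
            if (index >= interceptors.size()) throw new AssertionError("no interceptor left");
            // The chain handed to interceptor[index] already points at index + 1,
            // so calling proceed() on it runs the following interceptor.
            Chain next = new Chain(interceptors, index + 1, request);
            return interceptors.get(index).intercept(next);
        }
    }

    static String run(String request) {
        List<Interceptor> interceptors = new ArrayList<>();
        // Wraps whatever comes back from the rest of the chain.
        interceptors.add(chain -> "retry(" + chain.proceed(chain.request()) + ")");
        // Modifies the request on the way in and wraps the response on the way out.
        interceptors.add(chain -> "bridge(" + chain.proceed(chain.request() + "+headers") + ")");
        // The last interceptor produces the response and never calls proceed().
        interceptors.add(chain -> "response-to[" + chain.request() + "]");
        return new Chain(interceptors, 0, request).proceed(request);
    }

    public static void main(String[] args) {
        System.out.println(run("GET /"));
    }
}
```

Each interceptor sees the request before the ones after it and the response after them, which is why the calls nest like parentheses.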
The RealInterceptorChain is created with index 0, so in proceed interceptors.get(index) is RetryAndFollowUpInterceptor. Its intercept method runs, and the argument it receives is a RealInterceptorChain whose index points at the next interceptor, BridgeInterceptor:
@Override public Response intercept(Chain chain) throws IOException {
  Request request = chain.request();
  RealInterceptorChain realChain = (RealInterceptorChain) chain;
  Transmitter transmitter = realChain.transmitter();

  int followUpCount = 0;
  Response priorResponse = null;
  while (true) {
    transmitter.prepareToConnect(request);

    if (transmitter.isCanceled()) {
      throw new IOException("Canceled");
    }

    Response response;
    boolean success = false;
    try {
      response = realChain.proceed(request, transmitter, null);
      success = true;
    } catch (RouteException e) {
      // The attempt to connect via a route failed. The request will not have been sent.
      if (!recover(e.getLastConnectException(), transmitter, false, request)) {
        throw e.getFirstConnectException();
      }
      continue;
    } catch (IOException e) {
      // An attempt to communicate with a server failed. The request may have been sent.
      boolean requestSendStarted = !(e instanceof ConnectionShutdownException);
      if (!recover(e, transmitter, requestSendStarted, request)) throw e;
      continue;
    } finally {
      // The network call threw an exception. Release any resources.
      if (!success) {
        transmitter.exchangeDoneDueToException();
      }
    }

    // Attach the prior response if it exists. Such responses never have a body.
    if (priorResponse != null) {
      response = response.newBuilder()
          .priorResponse(priorResponse.newBuilder()
              .body(null)
              .build())
          .build();
    }

    Exchange exchange = Internal.instance.exchange(response);
    Route route = exchange != null ? exchange.connection().route() : null;
    Request followUp = followUpRequest(response, route);

    if (followUp == null) {
      if (exchange != null && exchange.isDuplex()) {
        transmitter.timeoutEarlyExit();
      }
      return response;
    }

    RequestBody followUpBody = followUp.body();
    if (followUpBody != null && followUpBody.isOneShot()) {
      return response;
    }

    closeQuietly(response.body());
    if (transmitter.hasExchange()) {
      exchange.detachWithViolence();
    }

    if (++followUpCount > MAX_FOLLOW_UPS) {
      throw new ProtocolException("Too many follow-up requests: " + followUpCount);
    }

    request = followUp;
    priorResponse = response;
  }
}
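See the while (true)? That loop is what the Retry in the class name refers to: a recoverable failure hits continue and the request is attempted again, and a follow-up request (a redirect, for example) also goes around the loop.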
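The loop has two ways of going around again: retrying after a recoverable failure and following up on a response that asks for another request. A minimal sketch of that control flow, assuming a hypothetical Transport interface and a simple retry budget (maxRetries) in place of OkHttp's recover() logic; the follow-up cap of 20 is likewise just a value chosen for this sketch:

```java
import java.io.IOException;
import java.net.ProtocolException;

final class RetryLoopSketch {
    static final int MAX_FOLLOW_UPS = 20; // cap chosen for this sketch

    /** Hypothetical reply: a status code plus an optional redirect target. */
    static final class Reply {
        final int code;
        final String location; // null when no follow-up is needed

        Reply(int code, String location) {
            this.code = code;
            this.location = location;
        }
    }

    /** Hypothetical transport standing in for the rest of the interceptor chain. */
    interface Transport {
        Reply send(String url) throws IOException;
    }

    static Reply execute(String url, Transport transport, int maxRetries) throws IOException {
        int followUpCount = 0;
        int retries = 0;
        while (true) {
            Reply reply;
            try {
                reply = transport.send(url);
            } catch (IOException e) {
                if (++retries > maxRetries) throw e; // not recoverable: give up
                continue;                            // recoverable: retry the same request
            }
            if (reply.location == null) return reply; // no follow-up needed
            if (++followUpCount > MAX_FOLLOW_UPS) {
                throw new ProtocolException("Too many follow-up requests: " + followUpCount);
            }
            url = reply.location; // go around again with the follow-up request
        }
    }

    /** Fails once, then redirects /old to /new, then succeeds. */
    static String demo() {
        final int[] attempts = {0};
        Transport flaky = url -> {
            attempts[0]++;
            if (attempts[0] == 1) throw new IOException("connection reset");
            if (url.equals("/old")) return new Reply(302, "/new");
            return new Reply(200, null);
        };
        try {
            Reply reply = execute("/old", flaky, 1);
            return reply.code + " after " + attempts[0] + " attempts";
        } catch (IOException e) {
            return "failed: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```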
transmitter.prepareToConnect(request) sets a few fields whose meaning is not clear yet, so we skip it for now.
The key line, response = realChain.proceed(request, transmitter, null), takes us back into RealInterceptorChain.proceed. Everything works as before, except that interceptors.get(index) is now BridgeInterceptor and next is a RealInterceptorChain that points at the following interceptor, CacheInterceptor. So we are now in BridgeInterceptor's intercept method:
@Override public Response intercept(Chain chain) throws IOException {
  Request userRequest = chain.request();
  Request.Builder requestBuilder = userRequest.newBuilder();

  RequestBody body = userRequest.body();
  if (body != null) {
    MediaType contentType = body.contentType();
    if (contentType != null) {
      requestBuilder.header("Content-Type", contentType.toString());
    }

    long contentLength = body.contentLength();
    if (contentLength != -1) {
      requestBuilder.header("Content-Length", Long.toString(contentLength));
      requestBuilder.removeHeader("Transfer-Encoding");
    } else {
      requestBuilder.header("Transfer-Encoding", "chunked");
      requestBuilder.removeHeader("Content-Length");
    }
  }

  if (userRequest.header("Host") == null) {
    requestBuilder.header("Host", hostHeader(userRequest.url(), false));
  }

  if (userRequest.header("Connection") == null) {
    requestBuilder.header("Connection", "Keep-Alive");
  }

  // If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing
  // the transfer stream.
  boolean transparentGzip = false;
  if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
    transparentGzip = true;
    requestBuilder.header("Accept-Encoding", "gzip");
  }

  List<Cookie> cookies = cookieJar.loadForRequest(userRequest.url());
  if (!cookies.isEmpty()) {
    requestBuilder.header("Cookie", cookieHeader(cookies));
  }

  if (userRequest.header("User-Agent") == null) {
    requestBuilder.header("User-Agent", Version.userAgent());
  }

  Response networkResponse = chain.proceed(requestBuilder.build());

  HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers());

  Response.Builder responseBuilder = networkResponse.newBuilder()
      .request(userRequest);

  if (transparentGzip
      && "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding"))
      && HttpHeaders.hasBody(networkResponse)) {
    GzipSource responseBody = new GzipSource(networkResponse.body().source());
    Headers strippedHeaders = networkResponse.headers().newBuilder()
        .removeAll("Content-Encoding")
        .removeAll("Content-Length")
        .build();
    responseBuilder.headers(strippedHeaders);
    String contentType = networkResponse.header("Content-Type");
    responseBuilder.body(new RealResponseBody(contentType, -1L, Okio.buffer(responseBody)));
  }

  return responseBuilder.build();
}
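As you can see, the code before chain.proceed(requestBuilder.build()) fills in a series of headers on the request. The code after it saves any cookies from the response and, if the interceptor added the gzip header itself, decompresses the response body.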
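The header handling follows one rule throughout: fill in a value only when the caller did not provide one. A minimal sketch of that rule with a plain map, under simplifying assumptions: withDefaults is a hypothetical helper, header names are matched case-sensitively (real HTTP headers are not), the request is assumed to have a body, and cookies and User-Agent are left out.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class BridgeHeadersSketch {
    /**
     * Returns a copy of the caller's headers with transport defaults filled in.
     * A contentLength of -1 means the body length is unknown.
     */
    static Map<String, String> withDefaults(
            Map<String, String> userHeaders, String host, long contentLength) {
        Map<String, String> headers = new LinkedHashMap<>(userHeaders);

        if (contentLength != -1) {
            headers.put("Content-Length", Long.toString(contentLength));
            headers.remove("Transfer-Encoding");
        } else {
            headers.put("Transfer-Encoding", "chunked");
            headers.remove("Content-Length");
        }

        // Only fill these in when the caller has not set them.
        headers.putIfAbsent("Host", host);
        headers.putIfAbsent("Connection", "Keep-Alive");

        // Ask for gzip only if the caller expressed no encoding or range preference.
        if (!headers.containsKey("Accept-Encoding") && !headers.containsKey("Range")) {
            headers.put("Accept-Encoding", "gzip");
        }
        return headers;
    }

    public static void main(String[] args) {
        System.out.println(withDefaults(new LinkedHashMap<>(), "example.com", 12));
    }
}
```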
Following the same recursive call logic, CacheInterceptor takes over next. Its intercept method:
@Override public Response intercept(Chain chain) throws IOException {
  Response cacheCandidate = cache != null
      ? cache.get(chain.request())
      : null;

  long now = System.currentTimeMillis();

  CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
  Request networkRequest = strategy.networkRequest;
  Response cacheResponse = strategy.cacheResponse;

  if (cache != null) {
    cache.trackResponse(strategy);
  }

  if (cacheCandidate != null && cacheResponse == null) {
    closeQuietly(cacheCandidate.body()); // The cache candidate wasn't applicable. Close it.
  }

  // If we're forbidden from using the network and the cache is insufficient, fail.
  if (networkRequest == null && cacheResponse == null) {
    return new Response.Builder()
        .request(chain.request())
        .protocol(Protocol.HTTP_1_1)
        .code(504)
        .message("Unsatisfiable Request (only-if-cached)")
        .body(Util.EMPTY_RESPONSE)
        .sentRequestAtMillis(-1L)
        .receivedResponseAtMillis(System.currentTimeMillis())
        .build();
  }

  // If we don't need the network, we're done.
  if (networkRequest == null) {
    return cacheResponse.newBuilder()
        .cacheResponse(stripBody(cacheResponse))
        .build();
  }

  Response networkResponse = null;
  try {
    networkResponse = chain.proceed(networkRequest);
  } finally {
    // If we're crashing on I/O or otherwise, don't leak the cache body.
    if (networkResponse == null && cacheCandidate != null) {
      closeQuietly(cacheCandidate.body());
    }
  }

  // If we have a cache response too, then we're doing a conditional get.
  if (cacheResponse != null) {
    if (networkResponse.code() == HTTP_NOT_MODIFIED) {
      Response response = cacheResponse.newBuilder()
          .headers(combine(cacheResponse.headers(), networkResponse.headers()))
          .sentRequestAtMillis(networkResponse.sentRequestAtMillis())
          .receivedResponseAtMillis(networkResponse.receivedResponseAtMillis())
          .cacheResponse(stripBody(cacheResponse))
          .networkResponse(stripBody(networkResponse))
          .build();
      networkResponse.body().close();

      // Update the cache after combining headers but before stripping the
      // Content-Encoding header (as performed by initContentStream()).
      cache.trackConditionalCacheHit();
      cache.update(cacheResponse, response);
      return response;
    } else {
      closeQuietly(cacheResponse.body());
    }
  }

  Response response = networkResponse.newBuilder()
      .cacheResponse(stripBody(cacheResponse))
      .networkResponse(stripBody(networkResponse))
      .build();

  if (cache != null) {
    if (HttpHeaders.hasBody(response) && CacheStrategy.isCacheable(response, networkRequest)) {
      // Offer this request to the cache.
      CacheRequest cacheRequest = cache.put(response);
      return cacheWritingResponse(cacheRequest, response);
    }

    if (HttpMethod.invalidatesCache(networkRequest.method())) {
      try {
        cache.remove(networkRequest);
      } catch (IOException ignored) {
        // The cache cannot be written.
      }
    }
  }

  return response;
}
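Stripped of freshness rules, conditional GETs, and the only-if-cached case, the method above reduces to "answer from the cache if possible, otherwise go to the network and store the result". A deliberately simplified sketch of that core, where CacheFirstSketch is a hypothetical class and the network is just a function from URL to body:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

final class CacheFirstSketch {
    private final Map<String, String> cache = new HashMap<>();
    int networkCalls = 0;

    String get(String url, Function<String, String> network) {
        String cached = cache.get(url);
        if (cached != null) {
            return cached;                  // served from cache, network untouched
        }
        networkCalls++;
        String fresh = network.apply(url);  // plays the role of chain.proceed(networkRequest)
        cache.put(url, fresh);              // offer the response to the cache
        return fresh;
    }

    public static void main(String[] args) {
        CacheFirstSketch client = new CacheFirstSketch();
        client.get("/a", url -> "body-of-" + url);
        client.get("/a", url -> "body-of-" + url);
        System.out.println("network calls: " + client.networkCalls);
    }
}
```

The real interceptor decides much more carefully whether a cached entry may be used; this sketch treats every cached entry as valid forever.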
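The code before chain.proceed(networkRequest) tries to answer the request from the cache first; if nothing usable is cached, execution continues down the chain. The default client has no cache configured, and we treat this as the first time this request is made, so next comes ConnectInterceptor's intercept: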
@Override public Response intercept(Chain chain) throws IOException {
  RealInterceptorChain realChain = (RealInterceptorChain) chain;
  Request request = realChain.request();
  Transmitter transmitter = realChain.transmitter();

  // We need the network to satisfy this request. Possibly for validating a conditional GET.
  // Every method other than GET gets the more extensive connection health check.
  boolean doExtensiveHealthChecks = !request.method().equals("GET");
  Exchange exchange = transmitter.newExchange(chain, doExtensiveHealthChecks);

  return realChain.proceed(request, transmitter, exchange);
}
transmitter.newExchange creates an Exchange:
/** Returns a new exchange to carry a new request and response. */
Exchange newExchange(Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
  synchronized (connectionPool) {
    if (noMoreExchanges) {
      throw new IllegalStateException("released");
    }
    if (exchange != null) {
      throw new IllegalStateException("cannot make a new request because the previous response "
          + "is still open: please call response.close()");
    }
  }

  ExchangeCodec codec = exchangeFinder.find(client, chain, doExtensiveHealthChecks);
  Exchange result = new Exchange(this, call, eventListener, exchangeFinder, codec);

  synchronized (connectionPool) {
    this.exchange = result;
    this.exchangeRequestDone = false;
    this.exchangeResponseDone = false;
    return result;
  }
}
Then comes the last interceptor, CallServerInterceptor. Its intercept method:
@Override public Response intercept(Chain chain) throws IOException {
  RealInterceptorChain realChain = (RealInterceptorChain) chain;
  Exchange exchange = realChain.exchange();
  Request request = realChain.request();

  long sentRequestMillis = System.currentTimeMillis();

  exchange.writeRequestHeaders(request);

  boolean responseHeadersStarted = false;
  Response.Builder responseBuilder = null;
  if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
    // If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100
    // Continue" response before transmitting the request body. If we don't get that, return
    // what we did get (such as a 4xx response) without ever transmitting the request body.
    if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
      exchange.flushRequest();
      responseHeadersStarted = true;
      exchange.responseHeadersStart();
      responseBuilder = exchange.readResponseHeaders(true);
    }

    if (responseBuilder == null) {
      // If isDuplex() is true, first send out what is already in the buffer
      if (request.body().isDuplex()) {
        // Prepare a duplex body so that the application can send a request body later.
        exchange.flushRequest();
        BufferedSink bufferedRequestBody = Okio.buffer(
            exchange.createRequestBody(request, true));
        request.body().writeTo(bufferedRequestBody);
      } else {
        // Write the request body if the "Expect: 100-continue" expectation was met.
        BufferedSink bufferedRequestBody = Okio.buffer(
            exchange.createRequestBody(request, false));
        request.body().writeTo(bufferedRequestBody);
        bufferedRequestBody.close();
      }
    } else {
      exchange.noRequestBody();
      if (!exchange.connection().isMultiplexed()) {
        // If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection
        // from being reused. Otherwise we're still obligated to transmit the request body to
        // leave the connection in a consistent state.
        exchange.noNewExchangesOnConnection();
      }
    }
  } else {
    exchange.noRequestBody();
  }

  if (request.body() == null || !request.body().isDuplex()) {
    exchange.finishRequest();
  }

  if (!responseHeadersStarted) {
    exchange.responseHeadersStart();
  }

  if (responseBuilder == null) {
    responseBuilder = exchange.readResponseHeaders(false);
  }

  Response response = responseBuilder
      .request(request)
      .handshake(exchange.connection().handshake())
      .sentRequestAtMillis(sentRequestMillis)
      .receivedResponseAtMillis(System.currentTimeMillis())
      .build();

  int code = response.code();
  if (code == 100) {
    // server sent a 100-continue even though we did not request one.
    // try again to read the actual response
    response = exchange.readResponseHeaders(false)
        .request(request)
        .handshake(exchange.connection().handshake())
        .sentRequestAtMillis(sentRequestMillis)
        .receivedResponseAtMillis(System.currentTimeMillis())
        .build();

    code = response.code();
  }

  exchange.responseHeadersEnd(response);

  if (forWebSocket && code == 101) {
    // Connection is upgrading, but we need to ensure interceptors see a non-null response body.
    response = response.newBuilder()
        .body(Util.EMPTY_RESPONSE)
        .build();
  } else {
    response = response.newBuilder()
        .body(exchange.openResponseBody(response))
        .build();
  }

  if ("close".equalsIgnoreCase(response.request().header("Connection"))
      || "close".equalsIgnoreCase(response.header("Connection"))) {
    exchange.noNewExchangesOnConnection();
  }

  if ((code == 204 || code == 205) && response.body().contentLength() > 0) {
    throw new ProtocolException(
        "HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength());
  }

  return response;
}
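This is also where request.body().writeTo(...) is called, which confirms the earlier guess about RequestBody.writeTo. Start with the exchange.writeRequestHeaders(request) method: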
public void writeRequestHeaders(Request request) throws IOException {
  try {
    eventListener.requestHeadersStart(call);
    codec.writeRequestHeaders(request);
    eventListener.requestHeadersEnd(call, request);
  } catch (IOException e) {
    eventListener.requestFailed(call, e);
    trackFailure(e);
    throw e;
  }
}
Since no eventListener was set, the default eventListenerFactory = EventListener.factory(EventListener.NONE) applies, so eventListener.requestHeadersStart(call) and eventListener.requestHeadersEnd(call, request) are empty implementations. From newExchange we know that codec comes from exchangeFinder.find(client, chain, doExtensiveHealthChecks), and exchangeFinder is created in Transmitter's prepareToConnect. Here is the find method of ExchangeFinder, together with the two methods it relies on:
public ExchangeCodec find(
    OkHttpClient client, Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
  int connectTimeout = chain.connectTimeoutMillis();
  int readTimeout = chain.readTimeoutMillis();
  int writeTimeout = chain.writeTimeoutMillis();
  int pingIntervalMillis = client.pingIntervalMillis();
  boolean connectionRetryEnabled = client.retryOnConnectionFailure();

  try {
    RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
        writeTimeout, pingIntervalMillis, connectionRetryEnabled, doExtensiveHealthChecks);
    return resultConnection.newCodec(client, chain);
  } catch (RouteException e) {
    trackFailure();
    throw e;
  } catch (IOException e) {
    trackFailure();
    throw new RouteException(e);
  }
}

/**
 * Finds a connection and returns it if it is healthy. If it is unhealthy the process is repeated
 * until a healthy connection is found.
 */
private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
    int writeTimeout, int pingIntervalMillis, boolean connectionRetryEnabled,
    boolean doExtensiveHealthChecks) throws IOException {
  while (true) {
    RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
        pingIntervalMillis, connectionRetryEnabled);

    // If this is a brand new connection, we can skip the extensive health checks.
    synchronized (connectionPool) {
      if (candidate.successCount == 0 && !candidate.isMultiplexed()) {
        return candidate;
      }
    }

    // Do a (potentially slow) check to confirm that the pooled connection is still good. If it
    // isn't, take it out of the pool and start again.
    if (!candidate.isHealthy(doExtensiveHealthChecks)) {
      candidate.noNewExchanges();
      continue;
    }

    return candidate;
  }
}
/**
 * Returns a connection to host a new stream. This prefers the existing connection if it exists,
 * then the pool, finally building a new connection.
 */
private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
    int pingIntervalMillis, boolean connectionRetryEnabled) throws IOException {
  boolean foundPooledConnection = false;
  RealConnection result = null;
  Route selectedRoute = null;
  RealConnection releasedConnection;
  Socket toClose;
  synchronized (connectionPool) {
    if (transmitter.isCanceled()) throw new IOException("Canceled");
    hasStreamFailure = false; // This is a fresh attempt.

    // Attempt to use an already-allocated connection. We need to be careful here because our
    // already-allocated connection may have been restricted from creating new exchanges.
    releasedConnection = transmitter.connection;
    toClose = transmitter.connection != null && transmitter.connection.noNewExchanges
        ? transmitter.releaseConnectionNoEvents()
        : null;

    if (transmitter.connection != null) {
      // We had an already-allocated connection and it's good.
      result = transmitter.connection;
      releasedConnection = null;
    }

    if (result == null) {
      // Attempt to get a connection from the pool.
      if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, null, false)) {
        foundPooledConnection = true;
        result = transmitter.connection;
      } else if (nextRouteToTry != null) {
        selectedRoute = nextRouteToTry;
        nextRouteToTry = null;
      } else if (retryCurrentRoute()) {
        selectedRoute = transmitter.connection.route();
      }
    }
  }
  closeQuietly(toClose);

  if (releasedConnection != null) {
    eventListener.connectionReleased(call, releasedConnection);
  }
  if (foundPooledConnection) {
    eventListener.connectionAcquired(call, result);
  }
  if (result != null) {
    // If we found an already-allocated or pooled connection, we're done.
    return result;
  }

  // If we need a route selection, make one. This is a blocking operation.
  boolean newRouteSelection = false;
  if (selectedRoute == null && (routeSelection == null || !routeSelection.hasNext())) {
    newRouteSelection = true;
    routeSelection = routeSelector.next();
  }

  List<Route> routes = null;
  synchronized (connectionPool) {
    if (transmitter.isCanceled()) throw new IOException("Canceled");

    if (newRouteSelection) {
      // Now that we have a set of IP addresses, make another attempt at getting a connection from
      // the pool. This could match due to connection coalescing.
      routes = routeSelection.getAll();
      if (connectionPool.transmitterAcquirePooledConnection(
          address, transmitter, routes, false)) {
        foundPooledConnection = true;
        result = transmitter.connection;
      }
    }

    if (!foundPooledConnection) {
      if (selectedRoute == null) {
        selectedRoute = routeSelection.next();
      }

      // Create a connection and assign it to this allocation immediately. This makes it possible
      // for an asynchronous cancel() to interrupt the handshake we're about to do.
      result = new RealConnection(connectionPool, selectedRoute);
      connectingConnection = result;
    }
  }

  // If we found a pooled connection on the 2nd time around, we're done.
  if (foundPooledConnection) {
    eventListener.connectionAcquired(call, result);
    return result;
  }

  // Do TCP + TLS handshakes. This is a blocking operation.
  result.connect(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis,
      connectionRetryEnabled, call, eventListener);
  connectionPool.routeDatabase.connected(result.route());

  Socket socket = null;
  synchronized (connectionPool) {
    connectingConnection = null;
    // Last attempt at connection coalescing, which only occurs if we attempted multiple
    // concurrent connections to the same host.
    if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, routes, true)) {
      // We lost the race! Close the connection we created and return the pooled connection.
      result.noNewExchanges = true;
      socket = result.socket();
      result = transmitter.connection;

      // It's possible for us to obtain a coalesced connection that is immediately unhealthy. In
      // that case we will retry the route we just successfully connected with.
      nextRouteToTry = selectedRoute;
    } else {
      connectionPool.put(result);
      transmitter.acquireConnectionNoEvents(result);
    }
  }
  closeQuietly(socket);

  eventListener.connectionAcquired(call, result);
  return result;
}
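The overall strategy of findHealthyConnection and findConnection (reuse a pooled connection if one is available, create one otherwise, and keep looking until the candidate is healthy) can be modelled in a few lines. This is a heavily simplified sketch: ConnectionFinderSketch and Conn are hypothetical, connections are removed from the pool while in use (OkHttp keeps them in the pool and tracks allocations instead), and routes, locking, and coalescing are left out.

```java
import java.util.ArrayDeque;
import java.util.Deque;

final class ConnectionFinderSketch {
    static final class Conn {
        final int id;
        boolean healthy = true;
        int successCount = 0; // 0 means the connection has never carried an exchange

        Conn(int id) {
            this.id = id;
        }
    }

    private final Deque<Conn> pool = new ArrayDeque<>();
    private int nextId = 1;
    int created = 0;

    /** Prefers a pooled connection; otherwise builds a new one. */
    Conn findConnection() {
        Conn pooled = pool.pollFirst();
        if (pooled != null) return pooled;
        created++;
        return new Conn(nextId++);
    }

    /** Repeats until a healthy connection is found; brand-new connections skip the check. */
    Conn findHealthyConnection() {
        while (true) {
            Conn candidate = findConnection();
            if (candidate.successCount == 0) return candidate; // brand new, nothing to check
            if (!candidate.healthy) continue;                  // discard it and look again
            return candidate;
        }
    }

    /** Returns a connection to the pool after a successful exchange. */
    void release(Conn conn) {
        conn.successCount++;
        pool.addLast(conn);
    }

    public static void main(String[] args) {
        ConnectionFinderSketch finder = new ConnectionFinderSketch();
        Conn first = finder.findHealthyConnection();
        finder.release(first);
        Conn second = finder.findHealthyConnection();
        System.out.println("reused: " + (first == second) + ", created: " + finder.created);
    }
}
```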
The key code:
// Do TCP + TLS handshakes. This is a blocking operation.
result.connect(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis,
    connectionRetryEnabled, call, eventListener);
The comment says this performs the TCP and TLS handshakes and that it blocks, so this is clearly where the network connection is established:
public void connect(int connectTimeout, int readTimeout, int writeTimeout,
    int pingIntervalMillis, boolean connectionRetryEnabled, Call call,
    EventListener eventListener) {
  if (protocol != null) throw new IllegalStateException("already connected");

  RouteException routeException = null;
  List<ConnectionSpec> connectionSpecs = route.address().connectionSpecs();
  ConnectionSpecSelector connectionSpecSelector = new ConnectionSpecSelector(connectionSpecs);

  if (route.address().sslSocketFactory() == null) {
    if (!connectionSpecs.contains(ConnectionSpec.CLEARTEXT)) {
      throw new RouteException(new UnknownServiceException(
          "CLEARTEXT communication not enabled for client"));
    }
    String host = route.address().url().host();
    if (!Platform.get().isCleartextTrafficPermitted(host)) {
      throw new RouteException(new UnknownServiceException(
          "CLEARTEXT communication to " + host + " not permitted by network security policy"));
    }
  } else {
    if (route.address().protocols().contains(Protocol.H2_PRIOR_KNOWLEDGE)) {
      throw new RouteException(new UnknownServiceException(
          "H2_PRIOR_KNOWLEDGE cannot be used with HTTPS"));
    }
  }

  while (true) {
    try {
      if (route.requiresTunnel()) {
        connectTunnel(connectTimeout, readTimeout, writeTimeout, call, eventListener);
        if (rawSocket == null) {
          // We were unable to connect the tunnel but properly closed down our resources.
          break;
        }
      } else {
        connectSocket(connectTimeout, readTimeout, call, eventListener);
      }
      establishProtocol(connectionSpecSelector, pingIntervalMillis, call, eventListener);
      eventListener.connectEnd(call, route.socketAddress(), route.proxy(), protocol);
      break;
    } catch (IOException e) {
      closeQuietly(socket);
      closeQuietly(rawSocket);
      socket = null;
      rawSocket = null;
      source = null;
      sink = null;
      handshake = null;
      protocol = null;
      http2Connection = null;

      eventListener.connectFailed(call, route.socketAddress(), route.proxy(), null, e);

      if (routeException == null) {
        routeException = new RouteException(e);
      } else {
        routeException.addConnectException(e);
      }

      if (!connectionRetryEnabled || !connectionSpecSelector.connectionFailed(e)) {
        throw routeException;
      }
    }
  }

  if (route.requiresTunnel() && rawSocket == null) {
    ProtocolException exception = new ProtocolException("Too many tunnel connections attempted: "
        + MAX_TUNNEL_ATTEMPTS);
    throw new RouteException(exception);
  }

  if (http2Connection != null) {
    synchronized (connectionPool) {
      allocationLimit = http2Connection.maxConcurrentStreams();
    }
  }
}
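The retry behaviour of the while (true) loop in connect can be illustrated with a much smaller sketch. It is not OkHttp code: ConnectRetrySketch, Attempt and connectWithFallback are hypothetical names, and each Attempt stands in for one pass of connectSocket plus establishProtocol with a particular connection spec. The first failure is kept as the primary exception and later failures are attached to it as suppressed exceptions, which is roughly the role RouteException appears to play in the code above.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

final class ConnectRetrySketch {
    /** One connection attempt; a hypothetical stand-in for connectSocket + establishProtocol. */
    interface Attempt {
        String connect() throws IOException;
    }

    /**
     * Runs the attempts in order and returns the first successful result.
     * If retrying is disabled, the loop stops after the first failure.
     * When nothing succeeds, the first failure is rethrown with later ones suppressed.
     */
    static String connectWithFallback(List<Attempt> attempts, boolean retryEnabled) {
        IOException first = null;
        for (Attempt attempt : attempts) {
            try {
                return attempt.connect();
            } catch (IOException e) {
                if (first == null) {
                    first = e;
                } else {
                    first.addSuppressed(e);
                }
                if (!retryEnabled) break;
            }
        }
        if (first == null) first = new IOException("no attempts configured");
        throw new UncheckedIOException(first);
    }

    public static void main(String[] args) {
        List<Attempt> attempts = new ArrayList<>();
        attempts.add(() -> { throw new IOException("modern TLS failed"); });
        attempts.add(() -> "compatible TLS");
        System.out.println(connectWithFallback(attempts, true));
    }
}
```

With retryEnabled set to false the same list fails on the first attempt, which mirrors the !connectionRetryEnabled check in the catch block.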
When a tunnel is required, connectTunnel also calls connectSocket first and then uses createTunnel to send the CONNECT request:
/**
 * To make an HTTPS connection over an HTTP proxy, send an unencrypted CONNECT request to create
 * the proxy connection. This may need to be retried if the proxy requires authorization.
 */
private Request createTunnel(int readTimeout, int writeTimeout, Request tunnelRequest,
    HttpUrl url) throws IOException {
  // Make an SSL Tunnel on the first message pair of each SSL + proxy connection.
  String requestLine = "CONNECT " + Util.hostHeader(url, true) + " HTTP/1.1";
  while (true) {
    Http1ExchangeCodec tunnelCodec = new Http1ExchangeCodec(null, null, source, sink);
    source.timeout().timeout(readTimeout, MILLISECONDS);
    sink.timeout().timeout(writeTimeout, MILLISECONDS);
    tunnelCodec.writeRequest(tunnelRequest.headers(), requestLine);
    tunnelCodec.finishRequest();
    Response response = tunnelCodec.readResponseHeaders(false)
        .request(tunnelRequest)
        .build();
    tunnelCodec.skipConnectBody(response);

    switch (response.code()) {
      case HTTP_OK:
        // Assume the server won't send a TLS ServerHello until we send a TLS ClientHello. If
        // that happens, then we will have buffered bytes that are needed by the SSLSocket!
        // This check is imperfect: it doesn't tell us whether a handshake will succeed, just
        // that it will almost certainly fail because the proxy has sent unexpected data.
        if (!source.getBuffer().exhausted() || !sink.buffer().exhausted()) {
          throw new IOException("TLS tunnel buffered too many bytes!");
        }
        return null;

      case HTTP_PROXY_AUTH:
        tunnelRequest = route.address().proxyAuthenticator().authenticate(route, response);
        if (tunnelRequest == null) throw new IOException("Failed to authenticate with proxy");

        if ("close".equalsIgnoreCase(response.header("Connection"))) {
          return tunnelRequest;
        }
        break;

      default:
        throw new IOException(
            "Unexpected response code for CONNECT: " + response.code());
    }
  }
}
So let's look at connectSocket:
/** Does all the work necessary to build a full HTTP or HTTPS connection on a raw socket. */
private void connectSocket(int connectTimeout, int readTimeout, Call call,
    EventListener eventListener) throws IOException {
  Proxy proxy = route.proxy();
  Address address = route.address();

  rawSocket = proxy.type() == Proxy.Type.DIRECT || proxy.type() == Proxy.Type.HTTP
      ? address.socketFactory().createSocket()
      : new Socket(proxy);

  eventListener.connectStart(call, route.socketAddress(), proxy);
  rawSocket.setSoTimeout(readTimeout);
  try {
    Platform.get().connectSocket(rawSocket, route.socketAddress(), connectTimeout);
  } catch (ConnectException e) {
    ConnectException ce = new ConnectException("Failed to connect to " + route.socketAddress());
    ce.initCause(e);
    throw ce;
  }

  // The following try/catch block is a pseudo hacky way to get around a crash on Android 7.0
  // More details:
  // https://github.com/square/okhttp/issues/3245
  // https://android-review.googlesource.com/#/c/271775/
  try {
    source = Okio.buffer(Okio.source(rawSocket));
    sink = Okio.buffer(Okio.sink(rawSocket));
  } catch (NullPointerException npe) {
    if (NPE_THROW_WITH_NULL.equals(npe.getMessage())) {
      throw new IOException(npe);
    }
  }
}
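Stripped of proxies and event listeners, the core of connectSocket is an unconnected java.net.Socket that gets a read timeout and is then connected with a connect timeout. The sketch below shows only that JDK-level step against a throwaway local ServerSocket. RawSocketSketch, open and connectsToLocalServer are hypothetical names, and the one-second timeouts are arbitrary illustration values rather than OkHttp defaults.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;

final class RawSocketSketch {
    /** Creates an unconnected socket, sets the read timeout, then connects with a connect timeout. */
    static Socket open(InetSocketAddress address, int connectTimeoutMs, int readTimeoutMs) {
        Socket socket = new Socket();
        try {
            socket.setSoTimeout(readTimeoutMs);         // applies to later blocking reads
            socket.connect(address, connectTimeoutMs);  // blocks until connected or timed out
            return socket;
        } catch (IOException e) {
            try {
                socket.close();
            } catch (IOException ignored) {
                // Nothing more to do; the original failure is reported below.
            }
            throw new UncheckedIOException("Failed to connect to " + address, e);
        }
    }

    /** Connects to a local listening socket and reports whether the connection was established. */
    static boolean connectsToLocalServer() {
        try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getLoopbackAddress())) {
            InetSocketAddress address =
                new InetSocketAddress(server.getInetAddress(), server.getLocalPort());
            try (Socket socket = open(address, 1_000, 1_000)) {
                return socket.isConnected() && socket.getSoTimeout() == 1_000;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(connectsToLocalServer());
    }
}
```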
Tracing Platform.get().connectSocket downward, the call eventually reaches privilegedConnect in the JDK's SocksSocketImpl:
private synchronized void privilegedConnect(final String host, final int port,
    final int timeout) throws IOException {
  try {
    AccessController.doPrivileged(
        new java.security.PrivilegedExceptionAction<Void>() {
          public Void run() throws IOException {
            superConnectServer(host, port, timeout);
            cmdIn = getInputStream();
            cmdOut = getOutputStream();
            return null;
          }
        });
  } catch (java.security.PrivilegedActionException pae) {
    throw (IOException) pae.getException();
  }
}
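This is the socket communication layer again. At this point rawSocket holds the inputStream and outputStream of the connection, and the Okio.buffer calls that follow wrap those streams in another layer. Taking read on the input stream as an example: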
/**
 * Returns a source that reads from {@code socket}. Prefer this over {@link
 * #source(InputStream)} because this method honors timeouts. When the socket
 * read times out, the socket is asynchronously closed by a watchdog thread.
 */
public static Source source(Socket socket) throws IOException {
  if (socket == null) throw new IllegalArgumentException("socket == null");
  if (socket.getInputStream() == null) throw new IOException("socket's input stream == null");
  AsyncTimeout timeout = timeout(socket);
  Source source = source(socket.getInputStream(), timeout);
  return timeout.source(source);
}
Okio's source method, which holds the InputStream:
private static Source source(final InputStream in, final Timeout timeout) {
  if (in == null) throw new IllegalArgumentException("in == null");
  if (timeout == null) throw new IllegalArgumentException("timeout == null");

  return new Source() {
    @Override public long read(Buffer sink, long byteCount) throws IOException {
      if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount);
      if (byteCount == 0) return 0;
      try {
        timeout.throwIfReached();
        Segment tail = sink.writableSegment(1);
        int maxToCopy = (int) Math.min(byteCount, Segment.SIZE - tail.limit);
        int bytesRead = in.read(tail.data, tail.limit, maxToCopy);
        if (bytesRead == -1) return -1;
        tail.limit += bytesRead;
        sink.size += bytesRead;
        return bytesRead;
      } catch (AssertionError e) {
        if (isAndroidGetsocknameError(e)) throw new IOException(e);
        throw e;
      }
    }

    @Override public void close() throws IOException {
      in.close();
    }

    @Override public Timeout timeout() {
      return timeout;
    }

    @Override public String toString() {
      return "source(" + in + ")";
    }
  };
}
The timeout.source method, whose wrapped source is the Source created above:
/**
 * Returns a new source that delegates to {@code source}, using this to implement timeouts. This
 * works best if {@link #timedOut} is overridden to interrupt {@code sink}'s current operation.
 */
public final Source source(final Source source) {
  return new Source() {
    @Override public long read(Buffer sink, long byteCount) throws IOException {
      boolean throwOnTimeout = false;
      enter();
      try {
        long result = source.read(sink, byteCount);
        throwOnTimeout = true;
        return result;
      } catch (IOException e) {
        throw exit(e);
      } finally {
        exit(throwOnTimeout);
      }
    }

    @Override public void close() throws IOException {
      boolean throwOnTimeout = false;
      enter();
      try {
        source.close();
        throwOnTimeout = true;
      } catch (IOException e) {
        throw exit(e);
      } finally {
        exit(throwOnTimeout);
      }
    }

    @Override public Timeout timeout() {
      return AsyncTimeout.this;
    }

    @Override public String toString() {
      return "AsyncTimeout.source(" + source + ")";
    }
  };
}
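The layering described here (an InputStream wrapped by a source, which is wrapped again by a timeout-aware source) is a decorator chain. The following sketch reproduces the shape of that chain using only the JDK. SimpleSource, fromStream, withDeadline and readAll are hypothetical names, and the deadline check before each read is a deliberate simplification: Okio's AsyncTimeout uses a watchdog thread instead, as its Javadoc above states.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

final class LayeredSourceSketch {
    /** Minimal stand-in for a source: reads up to byteCount bytes, returns -1 at end of stream. */
    interface SimpleSource {
        int read(byte[] sink, int offset, int byteCount) throws IOException;
    }

    /** Innermost layer: delegates directly to the stream. */
    static SimpleSource fromStream(InputStream in) {
        return (sink, offset, byteCount) -> in.read(sink, offset, byteCount);
    }

    /** Outer layer: refuses to read once the deadline (a System.nanoTime value) has passed. */
    static SimpleSource withDeadline(SimpleSource delegate, long deadlineNanos) {
        return (sink, offset, byteCount) -> {
            if (System.nanoTime() - deadlineNanos >= 0) {
                throw new InterruptedIOException("deadline reached");
            }
            return delegate.read(sink, offset, byteCount);
        };
    }

    /** Drains the source into a UTF-8 string, reading through every layer each time. */
    static String readAll(SimpleSource source) {
        ByteArrayOutputStream collected = new ByteArrayOutputStream();
        byte[] chunk = new byte[8];
        try {
            int n;
            while ((n = source.read(chunk, 0, chunk.length)) != -1) {
                collected.write(chunk, 0, n);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new String(collected.toByteArray(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        InputStream in = new ByteArrayInputStream("hello okio".getBytes(StandardCharsets.UTF_8));
        long deadline = System.nanoTime() + 1_000_000_000L; // one second from now
        System.out.println(readAll(withDeadline(fromStream(in), deadline)));
    }
}
```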
So at this point the source and sink fields of RealConnection have been assigned. The ExchangeCodec that find finally returns is obtained from newCodec in RealConnection, and the network connection has been established along the way:
ExchangeCodec newCodec(OkHttpClient client, Interceptor.Chain chain) throws SocketException {
  if (http2Connection != null) {
    return new Http2ExchangeCodec(client, this, chain, http2Connection);
  } else {
    socket.setSoTimeout(chain.readTimeoutMillis());
    source.timeout().timeout(chain.readTimeoutMillis(), MILLISECONDS);
    sink.timeout().timeout(chain.writeTimeoutMillis(), MILLISECONDS);
    return new Http1ExchangeCodec(client, this, source, sink);
  }
}
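For an HTTP/1.x connection, newCodec returns an Http1ExchangeCodec (an HTTP/2 connection gets an Http2ExchangeCodec instead), so the codec's writeRequestHeaders is: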
@Override public void writeRequestHeaders(Request request) throws IOException {
  String requestLine = RequestLine.get(
      request, realConnection.route().proxy().type());
  writeRequest(request.headers(), requestLine);
}

/** Returns bytes of a request header for sending on an HTTP transport. */
public void writeRequest(Headers headers, String requestLine) throws IOException {
  if (state != STATE_IDLE) throw new IllegalStateException("state: " + state);
  sink.writeUtf8(requestLine).writeUtf8("\r\n");
  for (int i = 0, size = headers.size(); i < size; i++) {
    sink.writeUtf8(headers.name(i))
        .writeUtf8(": ")
        .writeUtf8(headers.value(i))
        .writeUtf8("\r\n");
  }
  sink.writeUtf8("\r\n");
  state = STATE_OPEN_REQUEST_BODY;
}
This is where the request line and the headers are written into the sink's Buffer.
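To make the wire format concrete, the sketch below builds the same kind of request head as a plain string: the request line, one "Name: value" line per header, and a blank line, each terminated by CRLF. RequestHeadSketch and requestHead are hypothetical names, and a Map stands in for OkHttp's Headers type, so repeated header names are not modelled.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class RequestHeadSketch {
    /** Serializes a request line and headers in HTTP/1.1 format, ending with an empty line. */
    static String requestHead(String requestLine, Map<String, String> headers) {
        StringBuilder out = new StringBuilder();
        out.append(requestLine).append("\r\n");
        for (Map.Entry<String, String> header : headers.entrySet()) {
            out.append(header.getKey())
               .append(": ")
               .append(header.getValue())
               .append("\r\n");
        }
        out.append("\r\n"); // the blank line separates the head from the body
        return out.toString();
    }

    public static void main(String[] args) {
        Map<String, String> headers = new LinkedHashMap<>(); // keeps insertion order
        headers.put("Host", "example.com");
        headers.put("Accept", "application/json");
        System.out.print(requestHead("GET /index.html HTTP/1.1", headers));
    }
}
```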
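Back in CallServerInterceptor's intercept, continuing downward: if the request method permits a body (that is, it is not GET or HEAD) and the request body is not null, the body is sent, and when the request carries an Expect: 100-continue header flushRequest is called first. Traced to the bottom, flushRequest calls flush on the Okio sink, which ends up calling flush on the socket's OutputStream, so the buffered data is sent over the network.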
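Why an explicit flush matters can be shown with JDK streams alone: bytes written to a buffered stream stay in memory until flush pushes them to the underlying stream. In this sketch a ByteArrayOutputStream plays the role of the socket's OutputStream. FlushSketch and bytesVisibleBeforeAndAfterFlush are hypothetical names, and BufferedOutputStream is only an analogy for Okio's buffered sink, not the class OkHttp uses.

```java
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

final class FlushSketch {
    /** Returns how many bytes reached the underlying stream before and after flush(). */
    static int[] bytesVisibleBeforeAndAfterFlush(String payload) {
        ByteArrayOutputStream wire = new ByteArrayOutputStream(); // stands in for the socket
        BufferedOutputStream buffered = new BufferedOutputStream(wire, 8192);
        try {
            buffered.write(payload.getBytes(StandardCharsets.UTF_8));
            int before = wire.size(); // small payloads are still held in the buffer
            buffered.flush();
            int after = wire.size();  // now everything has been handed to the wire
            return new int[] {before, after};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] sizes = bytesVisibleBeforeAndAfterFlush("POST body");
        System.out.println(sizes[0] + " -> " + sizes[1]);
    }
}
```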
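As for the Expect: 100-continue header, it asks the server in advance whether it is willing to accept the request body (for example POST data) before the client sends it.
The last step is reading the response. A RealResponseBody object is returned, and the body data is finally read through Objects.requireNonNull(response.body()).string().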
-
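Summary
From the source code level, we have analyzed how OkHttp wraps Socket requests step by step into API calls. It is similar to HttpClient in this respect. The difference is that most of the common processing is already written in a template-pattern style, such as the Interceptor setup, and most of the basic parameters receive default values through the default Builder.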
OkHttp Source Code Analysis