前言
最近有個想法——就是把 Android 主流開源框架進行深入分析劳秋,然后寫成一系列文章仓手,包括該框架的詳細使用與源碼解析。目的是通過鑒賞大神的源碼來了解框架底層的原理玻淑,也就是做到不僅要知其然嗽冒,還要知其所以然。
這里我說下自己閱讀源碼的經(jīng)驗补履,我一般都是按照平時使用某個框架或者某個系統(tǒng)源碼的使用流程入手的添坊,首先要知道怎么使用,然后再去深究每一步底層做了什么干像,用了哪些好的設(shè)計模式帅腌,為什么要這么設(shè)計。
系列文章:
- Android 主流開源框架(一)OkHttp 鋪墊-HttpClient 與 HttpURLConnection 使用詳解
- Android 主流開源框架(二)OkHttp 使用詳解
- Android 主流開源框架(三)OkHttp 源碼解析
- Android 主流開源框架(四)Retrofit 使用詳解
- Android 主流開源框架(五)Retrofit 源碼解析
- Android 主流開源框架(六)Glide 的執(zhí)行流程源碼解析
- 更多框架持續(xù)更新中...
更多干貨請關(guān)注 AndroidNotes
一麻汰、OkHttp 的基本使用示例
1.1 同步 GET 請求
// (1)創(chuàng)建 OkHttpClient 對象
OkHttpClient client = new OkHttpClient();
// (2)創(chuàng)建 Request 對象
Request request = new Request.Builder()
.url(url)
.build();
// (3)創(chuàng)建 Call 對象速客。
Call call = client.newCall(request);
// (4)發(fā)送請求并獲取服務(wù)器返回的數(shù)據(jù)
Response response = call.execute();
// (5)取出相應(yīng)的數(shù)據(jù)
String data = response.body().string();
1.2 異步 GET 請求
// (1)創(chuàng)建 OkHttpClient 對象
OkHttpClient client = new OkHttpClient();
// (2)創(chuàng)建 Request 對象
Request request = new Request.Builder()
.url(url)
.build();
// (3)創(chuàng)建 Call 對象。
Call call = client.newCall(request);
// (4)發(fā)送請求并獲取服務(wù)器返回的數(shù)據(jù)
call.enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
}
@Override
public void onResponse(Call call, Response response) throws IOException {
// (5)取出相應(yīng)的數(shù)據(jù)
String data = response.body().string();
}
});
可以看到不管是同步請求還是異步請求五鲫,OkHttp 的基本使用就只有 5 步溺职。同步請求與異步請求唯一不同的就是第 (4) 步,前者使用同步方法 execute()位喂,后者使用異步方法 enqueue()浪耘。接下來我們就根據(jù)這 5 步進行源碼閱讀。
更多 OkHttp 的使用方法可以看我之前寫的文章 Android 主流開源框架(二)OkHttp 使用詳解
二塑崖、OkHttp 源碼分析
源碼版本:3.11.0
2.1 (1)創(chuàng)建 OkHttpClient 對象
OkHttpClient client = new OkHttpClient();
首先我們點擊創(chuàng)建的 OkHttpClient 對象進去源碼是這樣的:
/*OkHttpClient*/
public OkHttpClient() {
this(new Builder());
}
然后是走了有參構(gòu)造:
/*OkHttpClient*/
OkHttpClient(Builder builder) {
this.dispatcher = builder.dispatcher;
this.proxy = builder.proxy;
this.protocols = builder.protocols;
this.connectionSpecs = builder.connectionSpecs;
this.interceptors = Util.immutableList(builder.interceptors);
this.networkInterceptors = Util.immutableList(builder.networkInterceptors);
this.eventListenerFactory = builder.eventListenerFactory;
this.proxySelector = builder.proxySelector;
this.cookieJar = builder.cookieJar;
this.cache = builder.cache;
this.internalCache = builder.internalCache;
this.socketFactory = builder.socketFactory;
boolean isTLS = false;
for (ConnectionSpec spec : connectionSpecs) {
isTLS = isTLS || spec.isTls();
}
if (builder.sslSocketFactory != null || !isTLS) {
this.sslSocketFactory = builder.sslSocketFactory;
this.certificateChainCleaner = builder.certificateChainCleaner;
} else {
X509TrustManager trustManager = Util.platformTrustManager();
this.sslSocketFactory = newSslSocketFactory(trustManager);
this.certificateChainCleaner = CertificateChainCleaner.get(trustManager);
}
if (sslSocketFactory != null) {
Platform.get().configureSslSocketFactory(sslSocketFactory);
}
this.hostnameVerifier = builder.hostnameVerifier;
this.certificatePinner = builder.certificatePinner.withCertificateChainCleaner(
certificateChainCleaner);
this.proxyAuthenticator = builder.proxyAuthenticator;
this.authenticator = builder.authenticator;
this.connectionPool = builder.connectionPool;
this.dns = builder.dns;
this.followSslRedirects = builder.followSslRedirects;
this.followRedirects = builder.followRedirects;
this.retryOnConnectionFailure = builder.retryOnConnectionFailure;
this.connectTimeout = builder.connectTimeout;
this.readTimeout = builder.readTimeout;
this.writeTimeout = builder.writeTimeout;
this.pingInterval = builder.pingInterval;
if (interceptors.contains(null)) {
throw new IllegalStateException("Null interceptor: " + interceptors);
}
if (networkInterceptors.contains(null)) {
throw new IllegalStateException("Null network interceptor: " + networkInterceptors);
}
}
可以看到有很多常量七冲,這里使用了建造者模式,所以這些常量可以通過 build() 進行配置规婆。如果不進行配置則使用無參構(gòu)造中傳進來的默認(rèn)配置澜躺,每個常量的意思具體如下:
/*OkHttpClient*/
public Builder() {
dispatcher = new Dispatcher();// 分發(fā)器
protocols = DEFAULT_PROTOCOLS;// HTTP 協(xié)議
connectionSpecs = DEFAULT_CONNECTION_SPECS;// 傳輸層版本和連接協(xié)議
eventListenerFactory = EventListener.factory(EventListener.NONE);// 事件監(jiān)聽工廠
proxySelector = ProxySelector.getDefault();// 代理選擇器
cookieJar = CookieJar.NO_COOKIES;// cookie
socketFactory = SocketFactory.getDefault();// socket 工廠
hostnameVerifier = OkHostnameVerifier.INSTANCE;// 主機名字確認(rèn)
certificatePinner = CertificatePinner.DEFAULT;// 證書鏈
proxyAuthenticator = Authenticator.NONE;// 代理服務(wù)器身份驗證
authenticator = Authenticator.NONE;// 源服務(wù)器身份驗證
connectionPool = new ConnectionPool();// 連接池
dns = Dns.SYSTEM;// 域名
followSslRedirects = true;// 是否遵循 ssl 重定向
followRedirects = true;// 是否遵循重定向
retryOnConnectionFailure = true;// 連接失敗的時候是否重試
connectTimeout = 10_000;// 連接超時
readTimeout = 10_000;// 讀超時
writeTimeout = 10_000;// 寫超時
pingInterval = 0;// HTTP / 2 和 Web 套接字 ping 之間的時間間隔
}
2.2 (2)創(chuàng)建 Request 對象
Request request = new Request.Builder()
.url(url)
.build();
可以看到,這里同樣使用了建造者模式抒蚜,我們點擊 Request 進去看看:
/*Request*/
//...
final HttpUrl url;
final String method;
final Headers headers;
final @Nullable RequestBody body;
final Map<Class<?>, Object> tags;
//...
發(fā)現(xiàn) Request 還是比較簡單的掘鄙,只是用來設(shè)置一些請求鏈接(url)、請求方法(method)嗡髓、請求頭(headers)操漠、請求體(body)、標(biāo)簽(tag饿这,可作為取消請求的標(biāo)記)浊伙。
2.3 (3)創(chuàng)建 Call 對象
Call call = client.newCall(request);
我們點擊 newCall() 方法進去看看:
/*OkHttpClient*/
@Override public Call newCall(Request request) {
return RealCall.newRealCall(this, request, false /* for web socket */);
}
發(fā)現(xiàn)是調(diào)用了 RealCall 的 newRealCall() 方法撞秋,并傳入了 OkHttpClient 與 Request 對象。
再跟進到 newRealCall() 方法:
/*RealCall*/
static RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
// Safely publish the Call instance to the EventListener.
RealCall call = new RealCall(client, originalRequest, forWebSocket);
call.eventListener = client.eventListenerFactory().create(call);
return call;
}
發(fā)現(xiàn)是創(chuàng)建了一個 RealCall 對象吧黄,并返回給上一層部服。RealCall 是 Call 的實現(xiàn)類唆姐,Call 定義了請求相關(guān)的操作拗慨,例如同步異步、取消請求等方法奉芦。所以后續(xù)的請求相關(guān)操作基本都是在調(diào)用 Call 定義的方法赵抢,而這些方法真正的執(zhí)行是它的實現(xiàn)類 RealCall。
最后看看 RealCall 的構(gòu)造函數(shù)声功,該函數(shù)是比較簡單的烦却,只是賦值一些常量,然后創(chuàng)建了重試與重定向攔截器(RetryAndFollowUpInterceptor)(這個后面會講):
/*RealCall*/
private RealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
this.client = client;
this.originalRequest = originalRequest;
this.forWebSocket = forWebSocket;
this.retryAndFollowUpInterceptor = new RetryAndFollowUpInterceptor(client, forWebSocket);
}
2.4 (4)發(fā)送請求并獲取服務(wù)器返回的數(shù)據(jù)
前面我們已經(jīng)說了先巴,同步請求與異步請求唯一不同的就是第 (4) 步其爵,前者使用同步方法 execute(),后者使用異步方法 enqueue()伸蚯。所以我們分 2 種情況來講摩渺。
2.4.1 同步請求
Response response = call.execute();
我們點擊 execute() 方法進去看看:
/*RealCall*/
@Override public Response execute() throws IOException {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
eventListener.callStart(this);
try {
client.dispatcher().executed(this);// (1)
Response result = getResponseWithInterceptorChain();// (2)
if (result == null) throw new IOException("Canceled");
return result;
} catch (IOException e) {
eventListener.callFailed(this, e);
throw e;
} finally {
client.dispatcher().finished(this);// (3)
}
}
源碼中我標(biāo)注了 3 個關(guān)注點,點擊關(guān)注點(1)的 executed() 方法進去剂邮,可以看到是將傳進來的 RealCall 加入了一個雙端隊列:
/*Dispatcher*/
synchronized void executed(RealCall call) {
runningSyncCalls.add(call);
}
其中 runningSyncCalls 是一個雙端隊列摇幻,用來記錄正在運行的同步請求隊列:
/*Dispatcher*/
private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
關(guān)注點(2)返回了一個 Response,也就是服務(wù)器返回的數(shù)據(jù)挥萌,說明請求就是在這里執(zhí)行了绰姻,這個是我們要研究的重點,放到后面再說引瀑。
點擊關(guān)注點(3)的 finished() 方法進去狂芋,是這樣的:
/*Dispatcher*/
void finished(RealCall call) {
finished(runningSyncCalls, call, false);
}
private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
int runningCallsCount;
Runnable idleCallback;
synchronized (this) {
if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");// (1)
if (promoteCalls) promoteCalls();
runningCallsCount = runningCallsCount();
idleCallback = this.idleCallback;
}
if (runningCallsCount == 0 && idleCallback != null) {
idleCallback.run();
}
}
可以看到關(guān)注點(1)calls.remove(call) 只是把當(dāng)前 RealCall 又從正在運行的同步請求隊列中移除了,說明請求已經(jīng)完成了憨栽。
你應(yīng)該注意到了帜矾,上面還有個 dispatcher 沒講到,其實這是一個分發(fā)器徒像,是用來對請求進行分發(fā)的黍特。我們剛剛也分析了在同步請求中涉及到的 dispatcher 只是用來記錄正在運行的同步請求隊列,然后請求完成就移除掉锯蛀。所以這個分發(fā)器主要用在異步請求中灭衷,我們放到異步請求中再去講。
2.4.2 異步請求
call.enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
}
@Override
public void onResponse(Call call, Response response) throws IOException {
}
});
我們點擊 enqueue() 方法進去看看:
/*RealCall*/
@Override public void enqueue(Callback responseCallback) {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
eventListener.callStart(this);
client.dispatcher().enqueue(new AsyncCall(responseCallback));// (1)
}
前面幾行與同步請求源碼一樣旁涤,我們點擊關(guān)注點(1)的 enqueue() 方法進去看看:
/*Dispatcher*/
synchronized void enqueue(AsyncCall call) {
if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
runningAsyncCalls.add(call);
executorService().execute(call);
} else {
readyAsyncCalls.add(call);
}
}
可以看到這里面涉及到很多 Dispatcher 對象里面的常量與變量翔曲,所以也能看出 Dispatcher 主要用在異步請求中迫像。先看下 Dispatcher 對象里面的常量與變量:
/*Dispatcher*/
// 最大并發(fā)請求數(shù)
private int maxRequests = 64;
// 每個主機最大請求數(shù)
private int maxRequestsPerHost = 5;
// 每次調(diào)度程序變?yōu)榭臻e時調(diào)用的回調(diào)
private @Nullable Runnable idleCallback;
// 用來執(zhí)行異步任務(wù)的線程池
private @Nullable ExecutorService executorService;
// 準(zhǔn)備中的異步請求隊列
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
// 正在運行的異步請求隊列
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
// 正在運行的同步請求隊列
private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
弄懂了這些常量與變量的意思,就很好理解上面關(guān)注點(1)的 enqueue() 方法了瞳遍,即如果 ”正在運行的異步請求隊列數(shù)“ 小于 ”最大并發(fā)請求數(shù)“闻妓,并且 ”每個主機正在運行的請求數(shù)“ 小于 ”每個主機最大請求數(shù)“,則將當(dāng)前請求繼續(xù)加入 ”正在運行的異步請求隊列“ 并在線程池中執(zhí)行掠械,否則將當(dāng)前請求加入 ”準(zhǔn)備中的異步請求隊列“由缆。
我們看到線程池中還傳了一個 AsyncCall 進去,點擊進去看看:
/*RealCall*/
final class AsyncCall extends NamedRunnable {
private final Callback responseCallback;
AsyncCall(Callback responseCallback) {
super("OkHttp %s", redactedUrl());
this.responseCallback = responseCallback;
}
String host() {
return originalRequest.url().host();
}
Request request() {
return originalRequest;
}
RealCall get() {
return RealCall.this;
}
@Override protected void execute() {
boolean signalledCallback = false;
try {
Response response = getResponseWithInterceptorChain();// (1)
if (retryAndFollowUpInterceptor.isCanceled()) {
signalledCallback = true;
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));// (2)
} else {
signalledCallback = true;
responseCallback.onResponse(RealCall.this, response);// (3)
}
} catch (IOException e) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
eventListener.callFailed(RealCall.this, e);
responseCallback.onFailure(RealCall.this, e);// (4)
}
} finally {
client.dispatcher().finished(this);// (5)
}
}
}
發(fā)現(xiàn)他是 RealCall 的內(nèi)部類猾蒂,繼承 NamedRunnable均唉,實現(xiàn)了 Runnable。里面同樣執(zhí)行了 execute() 方法肚菠,仔細看這個方法與之前我們閱讀同步請求中的 execute() 類似舔箭,關(guān)注點(1)(5)都是一樣的,不同的是多了 2 個回調(diào)蚊逢,因為是異步請求层扶,所以需要把最終返回的結(jié)果通過 responseCallback 回調(diào)到最外層我們使用的地方去,其中(2)(4)是失敗的回調(diào)烙荷,(3)是成功的回調(diào)镜会。
到這里,OkHttp 基本使用的第(4)步除了 getResponseWithInterceptorChain() 方法奢讨,其他都看完了稚叹,下面就重點閱讀這個方法。
2.4.3 攔截器
點擊 getResponseWithInterceptorChain() 方法進去看看:
/*RealCall*/
Response getResponseWithInterceptorChain() throws IOException {
// 創(chuàng)建一個攔截器集合
List<Interceptor> interceptors = new ArrayList<>();
// 添加用戶自定義的攔截器
interceptors.addAll(client.interceptors());
// 添加重試與重定向攔截器
interceptors.add(retryAndFollowUpInterceptor);
// 添加橋攔截器
interceptors.add(new BridgeInterceptor(client.cookieJar()));
// 添加緩存攔截器
interceptors.add(new CacheInterceptor(client.internalCache()));
// 添加連接攔截器
interceptors.add(new ConnectInterceptor(client));
if (!forWebSocket) {
// 添加用戶自定義的網(wǎng)絡(luò)攔截器
interceptors.addAll(client.networkInterceptors());
}
// 添加服務(wù)器請求攔截器
interceptors.add(new CallServerInterceptor(forWebSocket));
// (1) 構(gòu)建責(zé)任鏈
Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
originalRequest, this, eventListener, client.connectTimeoutMillis(),
client.readTimeoutMillis(), client.writeTimeoutMillis());
// (2) 處理責(zé)任鏈中的攔截器
return chain.proceed(originalRequest);
}
可以看到拿诸,這里用到了很多攔截器扒袖,將這些攔截器構(gòu)建成一條責(zé)任鏈,然后再一個個處理亩码。這里用到了責(zé)任鏈模式季率,每個攔截器負責(zé)相應(yīng)的功能,上一個攔截器完成會傳給下一個攔截器描沟,直到最后一個攔截器執(zhí)行完再一層層向上返回 Response飒泻。
我們先驗證下這個責(zé)任鏈的執(zhí)行過程是否跟我說的一樣,然后再看看每個攔截器的具體作用吏廉。這里我標(biāo)記了 2 個關(guān)注點:
關(guān)注點(1)是構(gòu)建一條責(zé)任鏈泞遗,并把責(zé)任鏈需要用到的參數(shù)傳過去,其中參數(shù) 5 為責(zé)任鏈的索引席覆,這里傳 “0” 表示當(dāng)前正在處理第一個攔截器史辙。
關(guān)注點(2)是處理責(zé)任鏈中的攔截器,點擊 proceed() 方法進去看看:
/*RealInterceptorChain*/
@Override public Response proceed(Request request) throws IOException {
return proceed(request, streamAllocation, httpCodec, connection);
}
public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
RealConnection connection) throws IOException {
if (index >= interceptors.size()) throw new AssertionError();
calls++;
// If we already have a stream, confirm that the incoming request will use it.
if (this.httpCodec != null && !this.connection.supportsUrl(request.url())) {
throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
+ " must retain the same host and port");
}
// If we already have a stream, confirm that this is the only call to chain.proceed().
if (this.httpCodec != null && calls > 1) {
throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
+ " must call proceed() exactly once");
}
//(1)start
RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,
writeTimeout);
Interceptor interceptor = interceptors.get(index);
Response response = interceptor.intercept(next);
// (1)end
// Confirm that the next interceptor made its required call to chain.proceed().
if (httpCodec != null && index + 1 < interceptors.size() && next.calls != 1) {
throw new IllegalStateException("network interceptor " + interceptor
+ " must call proceed() exactly once");
}
// Confirm that the intercepted response isn't null.
if (response == null) {
throw new NullPointerException("interceptor " + interceptor + " returned null");
}
if (response.body() == null) {
throw new IllegalStateException(
"interceptor " + interceptor + " returned a response with no body");
}
return response;
}
可以看到,除了一些判斷只需要看關(guān)注點(1)即可聊倔。這里會構(gòu)建一個新的責(zé)任鏈晦毙,然后把責(zé)任鏈的索引加 1(為了下次從攔截器集合中取出下一個攔截器),接著從攔截器集合中取出當(dāng)前攔截器并調(diào)用 intercept() 方法耙蔑,這樣如果這個攔截器可以完成任務(wù)會馬上返回 Response见妒,否則會在 intercept() 方法中繼續(xù)處理責(zé)任鏈,因為該 intercept() 方法中會繼續(xù)調(diào)用責(zé)任鏈的 proceed() 方法甸陌⌒氪В看完源碼確實跟我們之前設(shè)想的一樣的,接下來我們看看每個攔截器的具體作用邀层。
2.4.3.1 重試與重定向攔截器(RetryAndFollowUpInterceptor)
該攔截器主要負責(zé)失敗后重連以及重定向返敬,從前面的 proceed() 方法可知遂庄,每個攔截器被調(diào)用的方法都是 intercept() 方法寥院,所以閱讀攔截器的入口就是該方法。
重試與重定向攔截器中的 intercept() 方法如下:
/*RetryAndFollowUpInterceptor*/
@Override public Response intercept(Chain chain) throws IOException {
Request request = chain.request();
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Call call = realChain.call();
EventListener eventListener = realChain.eventListener();
// (1) 創(chuàng)建 StreamAllocation 對象涛目,用來協(xié)調(diào)三個實體(Connections秸谢、Streams、Calls)之間的關(guān)系
StreamAllocation streamAllocation = new StreamAllocation(client.connectionPool(),
createAddress(request.url()), call, eventListener, callStackTrace);
this.streamAllocation = streamAllocation;
// 重定向次數(shù)
int followUpCount = 0;
Response priorResponse = null;
while (true) {
if (canceled) {
streamAllocation.release();
throw new IOException("Canceled");
}
Response response;
boolean releaseConnection = true;
try {
//(2)執(zhí)行下一個攔截器
response = realChain.proceed(request, streamAllocation, null, null);
releaseConnection = false;
} catch (RouteException e) {
//(3)發(fā)生 Route 異常霹肝,則嘗試進行恢復(fù)
if (!recover(e.getLastConnectException(), streamAllocation, false, request)) {
throw e.getFirstConnectException();
}
releaseConnection = false;
continue;
} catch (IOException e) {
//(4)發(fā)生 IO 異常估蹄,則嘗試進行恢復(fù)
boolean requestSendStarted = !(e instanceof ConnectionShutdownException);
if (!recover(e, streamAllocation, requestSendStarted, request)) throw e;
releaseConnection = false;
continue;
} finally {
// 如果中途出現(xiàn)異常,則釋放所有資源
if (releaseConnection) {
streamAllocation.streamFailed(null);
streamAllocation.release();
}
}
// 構(gòu)建 body 為空的響應(yīng)體
if (priorResponse != null) {
response = response.newBuilder()
.priorResponse(priorResponse.newBuilder()
.body(null)
.build())
.build();
}
Request followUp;
try {
// (5)檢查是否需要重定向沫换,不需要則 followUp 返回 null
followUp = followUpRequest(response, streamAllocation.route());
} catch (IOException e) {
streamAllocation.release();
throw e;
}
// (6)不需要重定向臭蚁,則返回之前的 response
if (followUp == null) {
if (!forWebSocket) {
streamAllocation.release();
}
return response;
}
// 關(guān)閉資源
closeQuietly(response.body());
// 重定向次數(shù)大于最大值,則釋放 StreamAllocation 并拋出異常
if (++followUpCount > MAX_FOLLOW_UPS) {
streamAllocation.release();
throw new ProtocolException("Too many follow-up requests: " + followUpCount);
}
if (followUp.body() instanceof UnrepeatableRequestBody) {
streamAllocation.release();
throw new HttpRetryException("Cannot retry streamed HTTP body", response.code());
}
// 如果該請求無法復(fù)用之前的連接讯赏,則釋放后重新創(chuàng)建
if (!sameConnection(response, followUp.url())) {
streamAllocation.release();
streamAllocation = new StreamAllocation(client.connectionPool(),
createAddress(followUp.url()), call, eventListener, callStackTrace);
this.streamAllocation = streamAllocation;
} else if (streamAllocation.codec() != null) {
throw new IllegalStateException("Closing the body of " + response
+ " didn't close its backing stream. Bad interceptor?");
}
request = followUp;
priorResponse = response;
}
}
該方法的注釋都寫的比較詳細了垮兑,我們重點看下我標(biāo)記的關(guān)注點。
(1):創(chuàng)建 StreamAllocation 對象漱挎,StreamAllocation 相當(dāng)于一個管理類系枪,用來協(xié)調(diào)三個實體(Connections、Streams磕谅、Calls)之間的關(guān)系私爷。這里還傳了一個 client.connectionPool(),它是第一步創(chuàng)建 OkHttpClient 對象的時候創(chuàng)建的膊夹,是一個連接池衬浑。它們會在后面的連接攔截器(ConnectInterceptor)中才被真正的使用到,后面會講放刨。
其中:
Connections:連接到遠程服務(wù)器的物理套接字工秩。
Streams:在連接上分層的邏輯 http 請求/響應(yīng)對。
Calls:流的邏輯序列,通常是初始請求以及它的重定向請求拓诸。(2):是執(zhí)行下一個攔截器侵佃,按順序調(diào)用那就是 BridgeInterceptor。
(3)(4):發(fā)生 Route 或 IO 異常奠支,則進行重試馋辈,我們看看重試的相關(guān)方法:
/*RetryAndFollowUpInterceptor*/
private boolean recover(IOException e, StreamAllocation streamAllocation,
boolean requestSendStarted, Request userRequest) {
streamAllocation.streamFailed(e);
// 客戶端配置了出錯不再重試
if (!client.retryOnConnectionFailure()) return false;
// 無法再次發(fā)送 request body
if (requestSendStarted && userRequest.body() instanceof UnrepeatableRequestBody) return false;
// 發(fā)生 isRecoverable() 方法中出現(xiàn)的異常
if (!isRecoverable(e, requestSendStarted)) return false;
// 沒有更多的路線可供嘗試
if (!streamAllocation.hasMoreRoutes()) return false;
// For failure recovery, use the same route selector with a new connection.
return true;
}
private boolean isRecoverable(IOException e, boolean requestSendStarted) {
// 協(xié)議異常
if (e instanceof ProtocolException) {
return false;
}
// 中斷異常
if (e instanceof InterruptedIOException) {
return e instanceof SocketTimeoutException && !requestSendStarted;
}
// SSL握手異常
if (e instanceof SSLHandshakeException) {
// If the problem was a CertificateException from the X509TrustManager,
// do not retry.
if (e.getCause() instanceof CertificateException) {
return false;
}
}
// SSL握手未授權(quán)異常
if (e instanceof SSLPeerUnverifiedException) {
// e.g. a certificate pinning error.
return false;
}
// An example of one we might want to retry with a different route is a problem connecting to a
// proxy and would manifest as a standard IOException. Unless it is one we know we should not
// retry, we return true and try a new route.
return true;
}
可以看到嘗試進行重試的時候,如果出現(xiàn)以下情況則不會重試:
客戶端配置了出錯不再重試
無法再次發(fā)送 request body
發(fā)生 ProtocolException(協(xié)議異常)倍谜、InterruptedIOException(中斷異常)迈螟、SSLHandshakeException(SSL握手異常)、SSLPeerUnverifiedException(SSL握手未授權(quán)異常)中的任意一個異常
沒有更多的路線可供嘗試
(5)(6):檢查是否需要重定向尔崔,如果不需要則返回之前的 response答毫,需要則進行重定向,也就是繼續(xù)循環(huán)請求重試季春。是否需要重定向主要根據(jù)響應(yīng)碼來決定洗搂,具體可以去看看 followUpRequest() 方法,這里就不貼代碼了载弄。
ps:如果你想拿重定向的域名來跟一遍源碼中重定向的流程耘拇,那么你可以試試郭霖的域名(http://guolin.tech
), 該域名會重定向到他的 csdn 博客(https://blog.csdn.net/guolin_blog
)宇攻, 走一遍流程會讓你對源碼中重定向的原理有更深的理解惫叛。
2.4.3.2 橋攔截器(BridgeInterceptor)
該攔截器相當(dāng)于一個橋梁,首先將用戶的請求轉(zhuǎn)換為發(fā)給服務(wù)器的請求逞刷,然后使用該請求訪問網(wǎng)絡(luò)嘉涌,最后將服務(wù)器返回的響應(yīng)轉(zhuǎn)換為用戶可用的響應(yīng)。
我們看看該攔截器中的 intercept() 方法:
/*BridgeInterceptor*/
@Override public Response intercept(Chain chain) throws IOException {
Request userRequest = chain.request();
Request.Builder requestBuilder = userRequest.newBuilder();
//(1)將用戶的請求轉(zhuǎn)換為發(fā)給服務(wù)器的請求-start
RequestBody body = userRequest.body();
if (body != null) {
MediaType contentType = body.contentType();
if (contentType != null) {
requestBuilder.header("Content-Type", contentType.toString());
}
long contentLength = body.contentLength();
if (contentLength != -1) {
requestBuilder.header("Content-Length", Long.toString(contentLength));
requestBuilder.removeHeader("Transfer-Encoding");
} else {
requestBuilder.header("Transfer-Encoding", "chunked");
requestBuilder.removeHeader("Content-Length");
}
}
if (userRequest.header("Host") == null) {
requestBuilder.header("Host", hostHeader(userRequest.url(), false));
}
if (userRequest.header("Connection") == null) {
requestBuilder.header("Connection", "Keep-Alive");
}
// 如果我們在創(chuàng)建 Request 的時候添加了 "Accept-Encoding: gzip" 請求頭夸浅,那么要自己負責(zé)解壓縮傳輸流仑最。
boolean transparentGzip = false;
if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
// 默認(rèn)是 gzip 壓縮
transparentGzip = true;
requestBuilder.header("Accept-Encoding", "gzip");
}
List<Cookie> cookies = cookieJar.loadForRequest(userRequest.url());
if (!cookies.isEmpty()) {
requestBuilder.header("Cookie", cookieHeader(cookies));
}
if (userRequest.header("User-Agent") == null) {
requestBuilder.header("User-Agent", Version.userAgent());
}
//(1)將用戶的請求轉(zhuǎn)換為發(fā)給服務(wù)器的請求-end
//(2)執(zhí)行下一個攔截器進行網(wǎng)絡(luò)請求
Response networkResponse = chain.proceed(requestBuilder.build());
//(3)將服務(wù)器返回的響應(yīng)轉(zhuǎn)換為用戶可用的響應(yīng)-start
// 解析服務(wù)器返回的 header
HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers());
Response.Builder responseBuilder = networkResponse.newBuilder()
.request(userRequest);
// gzip 解壓
if (transparentGzip
&& "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding"))
&& HttpHeaders.hasBody(networkResponse)) {
GzipSource responseBody = new GzipSource(networkResponse.body().source());
Headers strippedHeaders = networkResponse.headers().newBuilder()
.removeAll("Content-Encoding")
.removeAll("Content-Length")
.build();
responseBuilder.headers(strippedHeaders);
String contentType = networkResponse.header("Content-Type");
responseBuilder.body(new RealResponseBody(contentType, -1L, Okio.buffer(responseBody)));
}
//(3)將服務(wù)器返回的響應(yīng)轉(zhuǎn)換為用戶可用的響應(yīng)-end
return responseBuilder.build();
}
根據(jù)我標(biāo)記的關(guān)注點大概就是:
- (1):將用戶的請求轉(zhuǎn)換為發(fā)給服務(wù)器的請求。主要是添加一些默認(rèn)的請求頭题篷,例如 Content-Type词身、Content-Length、Transfer-Encoding番枚、Host法严、Connection。因為我們在創(chuàng)建 Request 的時候可以不添加任何請求頭葫笼,如果這里不加上一些默認(rèn)的請求頭是無法完成請求的深啤。
- (2):執(zhí)行下一個攔截器進行網(wǎng)絡(luò)請求。
- (3):將服務(wù)器返回的響應(yīng)轉(zhuǎn)換為用戶可用的響應(yīng)路星。主要是解析服務(wù)器返回的 header溯街,進行 gzip 解壓诱桂。
2.4.3.3 緩存攔截器(CacheInterceptor)
該攔截器主要用來實現(xiàn)緩存的讀取和存儲,即進行網(wǎng)絡(luò)請求的時候執(zhí)行到緩存攔截器會先判斷是否有緩存呈昔,如果有會直接返回緩存挥等,沒有則會執(zhí)行后面的攔截器繼續(xù)請求網(wǎng)絡(luò),請求成功會將請求到的數(shù)據(jù)緩存起來堤尾。
我們看看該攔截器中的 intercept() 方法:
/*CacheInterceptor*/
@Override public Response intercept(Chain chain) throws IOException {
//(1)通過 Request 得到緩存
Response cacheCandidate = cache != null
? cache.get(chain.request())
: null;
long now = System.currentTimeMillis();
//(2)通過緩存策略獲取是使用緩存還是使用網(wǎng)絡(luò)請求肝劲,或者 2 者同時使用或都不使用
CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
Request networkRequest = strategy.networkRequest;
Response cacheResponse = strategy.cacheResponse;
if (cache != null) {
cache.trackResponse(strategy);
}
// 有緩存,但是策略中不使用緩存郭宝,需要釋放資源
if (cacheCandidate != null && cacheResponse == null) {
closeQuietly(cacheCandidate.body());
}
// (3)如果策略中不使用網(wǎng)絡(luò)請求辞槐,也不使用緩存,那么直接返回失敗
if (networkRequest == null && cacheResponse == null) {
return new Response.Builder()
.request(chain.request())
.protocol(Protocol.HTTP_1_1)
.code(504)
.message("Unsatisfiable Request (only-if-cached)")
.body(Util.EMPTY_RESPONSE)
.sentRequestAtMillis(-1L)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();
}
//(4)如果策略中不使用網(wǎng)絡(luò)請求粘室,執(zhí)行到這里說明是使用緩存的榄檬,則直接返回緩存
if (networkRequest == null) {
return cacheResponse.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.build();
}
Response networkResponse = null;
try {
//(5)執(zhí)行下一個攔截器進行網(wǎng)絡(luò)請求
networkResponse = chain.proceed(networkRequest);
} finally {
// 如果發(fā)生 IO 或者其他崩潰,為了不泄漏緩存體衔统,需要釋放資源
if (networkResponse == null && cacheCandidate != null) {
closeQuietly(cacheCandidate.body());
}
}
//(6)如果策略中使用緩存鹿榜,并且響應(yīng)碼為 304,則返回緩存
if (cacheResponse != null) {
if (networkResponse.code() == HTTP_NOT_MODIFIED) {
Response response = cacheResponse.newBuilder()
.headers(combine(cacheResponse.headers(), networkResponse.headers()))
.sentRequestAtMillis(networkResponse.sentRequestAtMillis())
.receivedResponseAtMillis(networkResponse.receivedResponseAtMillis())
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build();
networkResponse.body().close();
cache.trackConditionalCacheHit();
// 更新緩存
cache.update(cacheResponse, response);
return response;
} else {
closeQuietly(cacheResponse.body());
}
}
Response response = networkResponse.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build();
if (cache != null) {
if (HttpHeaders.hasBody(response) && CacheStrategy.isCacheable(response, networkRequest)) {
//(7)將請求返回的結(jié)果存進緩存
CacheRequest cacheRequest = cache.put(response);
return cacheWritingResponse(cacheRequest, response);
}
if (HttpMethod.invalidatesCache(networkRequest.method())) {
try {
cache.remove(networkRequest);
} catch (IOException ignored) {
// The cache cannot be written.
}
}
}
return response;
}
根據(jù)我標(biāo)記的關(guān)注點大概流程就是:
- (1):通過 Request 得到緩存缰冤。這里的 cache 是 InternalCache犬缨,但是因為 InternalCache 是一個接口,而且只有一個實現(xiàn)類 Cache棉浸,所以 cache 其實就是 Cache。進入 Cache 可以發(fā)現(xiàn)它底層使用的是 DiskLruCache 緩存機制刺彩,也就是使用 “最近最少使用” 算法將數(shù)據(jù)緩存到磁盤內(nèi)迷郑。
- (2):通過緩存策略獲取是使用緩存還是使用網(wǎng)絡(luò)請求,或者 2 者同時使用或都不使用创倔。networkRequest 為 null 表示不使用網(wǎng)絡(luò)請求嗡害,cacheResponse 為 null 表示不使用緩存。
- (3):如果策略中不使用網(wǎng)絡(luò)請求畦攘,也不使用緩存霸妹,那么直接返回失敗。這樣就直接停止了后面攔截器的執(zhí)行知押,結(jié)束了整個請求叹螟。
- (4):如果策略中不使用網(wǎng)絡(luò)請求,執(zhí)行到這里說明是使用緩存的台盯,則直接返回緩存罢绽。這樣就直接停止了后面攔截器的執(zhí)行,結(jié)束了整個請求静盅。
- (5):執(zhí)行到這里良价,說明需要從網(wǎng)絡(luò)獲取數(shù)據(jù),則會繼續(xù)執(zhí)行下一個攔截器進行網(wǎng)絡(luò)請求。
- (6):如果策略中使用緩存明垢,并且響應(yīng)碼為 304蚣常,則返回緩存,并且更新緩存痊银。
- (7):最后將請求返回的結(jié)果進行緩存史隆。
2.4.3.4 連接攔截器(ConnectInterceptor)
該攔截器主要用來打開與目標(biāo)服務(wù)器的連接,然后繼續(xù)執(zhí)行下一個攔截器曼验。
我們看看該攔截器中的 intercept() 方法:
/*ConnectInterceptor*/
@Override public Response intercept(Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Request request = realChain.request();
//(1)獲取 StreamAllocation
StreamAllocation streamAllocation = realChain.streamAllocation();
// We need the network to satisfy this request. Possibly for validating a conditional GET.
boolean doExtensiveHealthChecks = !request.method().equals("GET");
//(2)創(chuàng)建 HttpCodec
HttpCodec httpCodec = streamAllocation.newStream(client, chain, doExtensiveHealthChecks);
//(3)獲取 RealConnection
RealConnection connection = streamAllocation.connection();
//(4)執(zhí)行下一個攔截器
return realChain.proceed(request, streamAllocation, httpCodec, connection);
}
根據(jù)我標(biāo)記的關(guān)注點大概流程就是:
- (1):獲取 StreamAllocation泌射,這里獲取的其實就是第一個攔截器 RetryAndFollowUpInterceptor 中創(chuàng)建的。
- (2):創(chuàng)建 HttpCodec鬓照,是通過 StreamAllocation 的 newStream() 方法獲取的熔酷,我們看下 newStream() 方法:
/*StreamAllocation*/
public HttpCodec newStream(
OkHttpClient client, Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
int connectTimeout = chain.connectTimeoutMillis();
int readTimeout = chain.readTimeoutMillis();
int writeTimeout = chain.writeTimeoutMillis();
int pingIntervalMillis = client.pingIntervalMillis();
boolean connectionRetryEnabled = client.retryOnConnectionFailure();
try {
//(5)尋找可用的連接
RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
writeTimeout, pingIntervalMillis, connectionRetryEnabled, doExtensiveHealthChecks);
//(6)通過這個可用的連接創(chuàng)建 HttpCodec
HttpCodec resultCodec = resultConnection.newCodec(client, chain, this);
synchronized (connectionPool) {
codec = resultCodec;
return resultCodec;
}
} catch (IOException e) {
throw new RouteException(e);
}
}
我們看下關(guān)注點(5)中的 findHealthyConnection() 方法:
/*StreamAllocation*/
private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
int writeTimeout, int pingIntervalMillis, boolean connectionRetryEnabled,
boolean doExtensiveHealthChecks) throws IOException {
while (true) {
//(7)尋找一個連接
RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
pingIntervalMillis, connectionRetryEnabled);
// 如果這是一個全新的連接,則不需要后面的健康檢查豺裆,而是在這里直接返回連接
synchronized (connectionPool) {
if (candidate.successCount == 0) {
return candidate;
}
}
// 如果不健康拒秘,則禁止創(chuàng)建新流,并且繼續(xù)循環(huán)查找可用的鏈接
if (!candidate.isHealthy(doExtensiveHealthChecks)) {
noNewStreams();
continue;
}
return candidate;
}
}
可以看到臭猜,findHealthyConnection() 方法中又通過 findConnection() 方法去尋找躺酒,看下這個方法:
/*StreamAllocation*/
private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
int pingIntervalMillis, boolean connectionRetryEnabled) throws IOException {
boolean foundPooledConnection = false;
RealConnection result = null;
Route selectedRoute = null;
Connection releasedConnection;
Socket toClose;
synchronized (connectionPool) {
if (released) throw new IllegalStateException("released");
if (codec != null) throw new IllegalStateException("codec != null");
if (canceled) throw new IOException("Canceled");
//(8)start
// 嘗試使用已分配的連接
releasedConnection = this.connection;
toClose = releaseIfNoNewStreams();
if (this.connection != null) {
// 已經(jīng)分配的連接,并且是可用的蔑歌,則將該已分配的連接賦值為可用的連接
result = this.connection;
releasedConnection = null;
}
//(8)end
if (!reportedAcquired) {
// 如果這個連接從未標(biāo)記過已獲取羹应,那么請不要標(biāo)記為為已發(fā)布
releasedConnection = null;
}
//(9)start 嘗試從連接池中獲取連接
if (result == null) {
Internal.instance.get(connectionPool, address, this, null);
if (connection != null) {
foundPooledConnection = true;
result = connection;
} else {
selectedRoute = route;
}
}
}
closeQuietly(toClose);
if (releasedConnection != null) {
eventListener.connectionReleased(call, releasedConnection);
}
if (foundPooledConnection) {
eventListener.connectionAcquired(call, result);
}
if (result != null) {
// 如果找到一個可用的連接,那么直接返回
return result;
}
boolean newRouteSelection = false;
if (selectedRoute == null && (routeSelection == null || !routeSelection.hasNext())) {
newRouteSelection = true;
routeSelection = routeSelector.next();
}
synchronized (connectionPool) {
if (canceled) throw new IOException("Canceled");
//(10)根據(jù)不同的路由再次從連接池中獲取可用的連接
if (newRouteSelection) {
List<Route> routes = routeSelection.getAll();
for (int i = 0, size = routes.size(); i < size; i++) {
Route route = routes.get(i);
Internal.instance.get(connectionPool, address, this, route);
if (connection != null) {
foundPooledConnection = true;
result = connection;
this.route = route;
break;
}
}
}
//(11)還是沒有找到可用的連接次屠,那么重新創(chuàng)建一個新的連接
if (!foundPooledConnection) {
if (selectedRoute == null) {
selectedRoute = routeSelection.next();
}
route = selectedRoute;
refusedStreamCount = 0;
result = new RealConnection(connectionPool, selectedRoute);
acquire(result, false);
}
}
// 如果在第二次找到了可用的連接园匹,則直接返回
if (foundPooledConnection) {
eventListener.connectionAcquired(call, result);
return result;
}
//(12)進行 TCP 和 TLS 握手
result.connect(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis,
connectionRetryEnabled, call, eventListener);
routeDatabase().connected(result.route());
Socket socket = null;
synchronized (connectionPool) {
reportedAcquired = true;
//(13)將新創(chuàng)建的連接放進連接池中
Internal.instance.put(connectionPool, result);
// 如果同時創(chuàng)建了到同一地址的另一個多路復(fù)用連接,則釋放這個連接并獲取那個多路復(fù)用連接劫灶。
if (result.isMultiplexed()) {
socket = Internal.instance.deduplicate(connectionPool, address, this);
result = connection;
}
}
closeQuietly(socket);
eventListener.connectionAcquired(call, result);
return result;
}
通過上面的代碼分析裸违,findConnection() 方法大概流程就是:
- (8):判斷當(dāng)前連接是否可用,可用則進行賦值本昏,在后面直接返回
- (9):如果當(dāng)前連接不可用供汛,那么嘗試從連接池中獲取可用連接
- (10):如果連接池中找不到可用的連接,那么切換不同的路由再次從連接池中獲取可用的連接
- (11):還是沒有找到可用的連接涌穆,那么只能重新創(chuàng)建一個新的連接
- (12):進行 TCP 和 TLS 握手
- (13):最后將新創(chuàng)建的連接放進連接池中
可以看到怔昨,關(guān)注點(9)(13)分別是從連接池中取出連接和存入連接到連接池,分別調(diào)用的是 Internal.instance.get() 與 Internal.instance.put()蒲犬。
我們看下 get() 方法是怎樣的朱监,點擊 get() 方法進去,發(fā)現(xiàn) Internal 是一個抽象類原叮,它有一個靜態(tài)的實例赫编,在 OkHttpClient 的靜態(tài)代碼快中被初始化:
/*OkHttpClient*/
static {
Internal.instance = new Internal() {
// 省略部分代碼...
@Override public RealConnection get(ConnectionPool pool, Address address,
StreamAllocation streamAllocation, Route route) {
return pool.get(address, streamAllocation, route);
}
// 省略部分代碼...
}
可以看到 Internal 的 get() 方法中調(diào)用的是 ConnectionPool(連接池)的 get() 方法巡蘸,所以可以肯定這個連接池就是用來操作這些連接的,內(nèi)部具體怎么操作我們放到后面去講擂送,這里只需要知道它可以用來存取連接就可以了悦荒。
關(guān)注點(12)其實就是與服務(wù)器建立連接的核心代碼,我們看下這個方法:
/*RealConnection*/
public void connect(int connectTimeout, int readTimeout, int writeTimeout,
int pingIntervalMillis, boolean connectionRetryEnabled, Call call,
EventListener eventListener) {
if (protocol != null) throw new IllegalStateException("already connected");
/*線路選擇*/
RouteException routeException = null;
List<ConnectionSpec> connectionSpecs = route.address().connectionSpecs();
ConnectionSpecSelector connectionSpecSelector = new ConnectionSpecSelector(connectionSpecs);
if (route.address().sslSocketFactory() == null) {
if (!connectionSpecs.contains(ConnectionSpec.CLEARTEXT)) {
throw new RouteException(new UnknownServiceException(
"CLEARTEXT communication not enabled for client"));
}
String host = route.address().url().host();
if (!Platform.get().isCleartextTrafficPermitted(host)) {
throw new RouteException(new UnknownServiceException(
"CLEARTEXT communication to " + host + " not permitted by network security policy"));
}
} else {
if (route.address().protocols().contains(Protocol.H2_PRIOR_KNOWLEDGE)) {
throw new RouteException(new UnknownServiceException(
"H2_PRIOR_KNOWLEDGE cannot be used with HTTPS"));
}
}
while (true) {
try {
//(14)如果需要隧道連接嘹吨,則進行隧道連接
if (route.requiresTunnel()) {
connectTunnel(connectTimeout, readTimeout, writeTimeout, call, eventListener);
if (rawSocket == null) {
// We were unable to connect the tunnel but properly closed down our resources.
break;
}
} else {
//(15)不需要隧道連接搬味,則直接進行 socket 連接
connectSocket(connectTimeout, readTimeout, call, eventListener);
}
// 建立協(xié)議
establishProtocol(connectionSpecSelector, pingIntervalMillis, call, eventListener);
// 連接結(jié)束
eventListener.connectEnd(call, route.socketAddress(), route.proxy(), protocol);
break;
} catch (IOException e) {
closeQuietly(socket);
closeQuietly(rawSocket);
socket = null;
rawSocket = null;
source = null;
sink = null;
handshake = null;
protocol = null;
http2Connection = null;
// 連接失敗
eventListener.connectFailed(call, route.socketAddress(), route.proxy(), null, e);
if (routeException == null) {
routeException = new RouteException(e);
} else {
routeException.addConnectException(e);
}
if (!connectionRetryEnabled || !connectionSpecSelector.connectionFailed(e)) {
throw routeException;
}
}
}
if (route.requiresTunnel() && rawSocket == null) {
ProtocolException exception = new ProtocolException("Too many tunnel connections attempted: "
+ MAX_TUNNEL_ATTEMPTS);
throw new RouteException(exception);
}
if (http2Connection != null) {
synchronized (connectionPool) {
allocationLimit = http2Connection.maxConcurrentStreams();
}
}
}
關(guān)注點(14)(15)最終都會調(diào)用 connectSocket() 方法:
/*RealConnection*/
private void connectSocket(int connectTimeout, int readTimeout, Call call,
EventListener eventListener) throws IOException {
Proxy proxy = route.proxy();
Address address = route.address();
// 創(chuàng)建 socket
rawSocket = proxy.type() == Proxy.Type.DIRECT || proxy.type() == Proxy.Type.HTTP
? address.socketFactory().createSocket()
: new Socket(proxy);
eventListener.connectStart(call, route.socketAddress(), proxy);
// 設(shè)置 socket 超時時間
rawSocket.setSoTimeout(readTimeout);
try {
//(16)進行 socket 連接
Platform.get().connectSocket(rawSocket, route.socketAddress(), connectTimeout);
} catch (ConnectException e) {
ConnectException ce = new ConnectException("Failed to connect to " + route.socketAddress());
ce.initCause(e);
throw ce;
}
try {
source = Okio.buffer(Okio.source(rawSocket));
sink = Okio.buffer(Okio.sink(rawSocket));
} catch (NullPointerException npe) {
if (NPE_THROW_WITH_NULL.equals(npe.getMessage())) {
throw new IOException(npe);
}
}
}
可以看到 okhttp 底層是通過 socket 進行連接的。
看完關(guān)注點(5)中的 findHealthyConnection() 方法蟀拷,我們繼續(xù)回去看關(guān)注點(6)的方法:
/*StreamAllocation*/
public HttpCodec newCodec(OkHttpClient client, Interceptor.Chain chain,
StreamAllocation streamAllocation) throws SocketException {
if (http2Connection != null) {
return new Http2Codec(client, chain, streamAllocation, http2Connection);
} else {
socket.setSoTimeout(chain.readTimeoutMillis());
source.timeout().timeout(chain.readTimeoutMillis(), MILLISECONDS);
sink.timeout().timeout(chain.writeTimeoutMillis(), MILLISECONDS);
return new Http1Codec(client, streamAllocation, source, sink);
}
}
該方法是創(chuàng)建 HttpCodec碰纬,HttpCodec 的作用主要是進行 HTTP 請求和響應(yīng)的編碼與解碼操作。它有兩個實現(xiàn)類问芬,分別是 Http1Codec 與 Http2Codec悦析,這里主要判斷如果是 HTTP/2,則創(chuàng)建 Http2Codec此衅,否則創(chuàng)建 Http1Codec强戴。
(3):繼續(xù)回去看關(guān)注點(3),點擊 connection() 方法進去發(fā)現(xiàn)挡鞍,這里獲取的 RealConnection 其實就是關(guān)注點(7) findConnection()
方法中從連接池中取出連接或重新創(chuàng)建的連接骑歹。(4):關(guān)注點(4)則拿到連接后繼續(xù)執(zhí)行下一個攔截器。
2.4.3.5 服務(wù)器請求攔截器(CallServerInterceptor)
該攔截器主要用來向服務(wù)器發(fā)起請求并獲取數(shù)據(jù)墨微,它是責(zé)任鏈中的最后一個攔截器道媚,獲取到服務(wù)器的數(shù)據(jù)后會直接返回給上一個攔截器。
我們看看該攔截器中的 intercept() 方法:
/*CallServerInterceptor*/
@Override public Response intercept(Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
// 獲取 ConnectInterceptor 中創(chuàng)建的 HttpCodec
HttpCodec httpCodec = realChain.httpStream();
// 獲取 RetryAndFollowUpInterceptor 中創(chuàng)建的 StreamAllocation
StreamAllocation streamAllocation = realChain.streamAllocation();
// 獲取 ConnectInterceptor 中新創(chuàng)建或者從連接池中拿到的 RealConnection
RealConnection connection = (RealConnection) realChain.connection();
Request request = realChain.request();
long sentRequestMillis = System.currentTimeMillis();
realChain.eventListener().requestHeadersStart(realChain.call());
//(1)寫入請求頭
httpCodec.writeRequestHeaders(request);
realChain.eventListener().requestHeadersEnd(realChain.call(), request);
Response.Builder responseBuilder = null;
if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
// If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100
// Continue" response before transmitting the request body. If we don't get that, return
// what we did get (such as a 4xx response) without ever transmitting the request body.
if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
httpCodec.flushRequest();
realChain.eventListener().responseHeadersStart(realChain.call());
responseBuilder = httpCodec.readResponseHeaders(true);
}
//(2)寫入請求體
if (responseBuilder == null) {
// Write the request body if the "Expect: 100-continue" expectation was met.
realChain.eventListener().requestBodyStart(realChain.call());
long contentLength = request.body().contentLength();
CountingSink requestBodyOut =
new CountingSink(httpCodec.createRequestBody(request, contentLength));
BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);
request.body().writeTo(bufferedRequestBody);
bufferedRequestBody.close();
realChain.eventListener()
.requestBodyEnd(realChain.call(), requestBodyOut.successfulCount);
} else if (!connection.isMultiplexed()) {
// If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection
// from being reused. Otherwise we're still obligated to transmit the request body to
// leave the connection in a consistent state.
streamAllocation.noNewStreams();
}
}
httpCodec.finishRequest();
if (responseBuilder == null) {
realChain.eventListener().responseHeadersStart(realChain.call());
//(3)讀取響應(yīng)頭
responseBuilder = httpCodec.readResponseHeaders(false);
}
Response response = responseBuilder
.request(request)
.handshake(streamAllocation.connection().handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();
int code = response.code();
if (code == 100) {
// server sent a 100-continue even though we did not request one.
// try again to read the actual response
responseBuilder = httpCodec.readResponseHeaders(false);
response = responseBuilder
.request(request)
.handshake(streamAllocation.connection().handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();
code = response.code();
}
realChain.eventListener()
.responseHeadersEnd(realChain.call(), response);
//(4)讀取響應(yīng)體
if (forWebSocket && code == 101) {
// Connection is upgrading, but we need to ensure interceptors see a non-null response body.
response = response.newBuilder()
.body(Util.EMPTY_RESPONSE)
.build();
} else {
response = response.newBuilder()
.body(httpCodec.openResponseBody(response))
.build();
}
if ("close".equalsIgnoreCase(response.request().header("Connection"))
|| "close".equalsIgnoreCase(response.header("Connection"))) {
streamAllocation.noNewStreams();
}
if ((code == 204 || code == 205) && response.body().contentLength() > 0) {
throw new ProtocolException(
"HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength());
}
return response;
}
可以看到欢嘿,這個攔截器還是比較簡單的衰琐,上一個攔截器 ConnectInterceptor 已經(jīng)連接到服務(wù)器了并創(chuàng)建了 HttpCodec 對象,HttpCodec 對象封裝了 okio 提供的輸出流(BufferedSink)與輸入流(BufferedSource)炼蹦,所以這里就主要通過 HttpCodec 對象與服務(wù)器進行讀寫操作。例如寫入請求頭與請求體狸剃,讀取響應(yīng)頭與響應(yīng)體掐隐。
2.4.4 ConnectionPool(連接池)
簡介
連接池是用來管理 HTTP 和 HTTP / 2 連接的復(fù)用,以減少網(wǎng)絡(luò)延遲钞馁。從上面我們閱讀 findConnection() 方法源碼也可以得出虑省,即如果從連接池中找到了可用的連接,那么就不用重新創(chuàng)建新的連接僧凰,也省去了 TCP 和 TLS 握手探颈。ConnectionPool 類中的主要常量
/*ConnectionPool*/
// 線程池,用于清除過期的連接
private static final Executor executor = new ThreadPoolExecutor(0 /* corePoolSize */,
Integer.MAX_VALUE /* maximumPoolSize */, 60L /* keepAliveTime */, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp ConnectionPool", true));
// 最大允許空閑的連接數(shù)量
private final int maxIdleConnections;
// 連接的存活時間
private final long keepAliveDurationNs;
// 清理任務(wù)训措,用來清理無效的連接
private final Runnable cleanupRunnable = new Runnable() {
//...
};
// 用來記錄連接的雙端隊列
private final Deque<RealConnection> connections = new ArrayDeque<>();
- 構(gòu)造函數(shù)
/*ConnectionPool*/
public ConnectionPool() {
this(5, 5, TimeUnit.MINUTES);
}
public ConnectionPool(int maxIdleConnections, long keepAliveDuration, TimeUnit timeUnit) {
this.maxIdleConnections = maxIdleConnections;
this.keepAliveDurationNs = timeUnit.toNanos(keepAliveDuration);
// Put a floor on the keep alive duration, otherwise cleanup will spin loop.
if (keepAliveDuration <= 0) {
throw new IllegalArgumentException("keepAliveDuration <= 0: " + keepAliveDuration);
}
}
可以看到構(gòu)造函數(shù)設(shè)置了默認(rèn)的最大允許空閑的連接數(shù)量為 5 個伪节,連接的存活時間為 5 分鐘光羞。
- 主要函數(shù)
這里主要講下前面連接攔截器中用到的 get()、put() 方法怀大。
get() 方法:
/*ConnectionPool*/
@Nullable RealConnection get(Address address, StreamAllocation streamAllocation, Route route) {
assert (Thread.holdsLock(this));
for (RealConnection connection : connections) {
if (connection.isEligible(address, route)) {
streamAllocation.acquire(connection, true);
return connection;
}
}
return null;
}
該方法是從連接池中獲取可復(fù)用的連接纱兑,這里的邏輯是遍歷記錄連接的雙端隊列,取出可復(fù)用的連接化借。
put() 方法:
/*ConnectionPool*/
void put(RealConnection connection) {
assert (Thread.holdsLock(this));
if (!cleanupRunning) {
cleanupRunning = true;
// 執(zhí)行清理任務(wù)
executor.execute(cleanupRunnable);
}
// 將新創(chuàng)建的連接添加進記錄連接的雙端隊列中
connections.add(connection);
}
該方法是將新創(chuàng)建的連接放進連接池中,這里的邏輯是先清理無效的連接,然后再將新創(chuàng)建的連接添加進記錄連接的雙端隊列中绪励。
我們先看下清理任務(wù):
/*ConnectionPool*/
private final Runnable cleanupRunnable = new Runnable() {
@Override public void run() {
while (true) {
// 清理無效連接
long waitNanos = cleanup(System.nanoTime());
if (waitNanos == -1) return;
if (waitNanos > 0) {
long waitMillis = waitNanos / 1000000L;
waitNanos -= (waitMillis * 1000000L);
synchronized (ConnectionPool.this) {
try {
ConnectionPool.this.wait(waitMillis, (int) waitNanos);
} catch (InterruptedException ignored) {
}
}
}
}
}
};
這是一個阻塞的清理任務(wù)黎烈,并且通過無限循環(huán)來清理。這里首先調(diào)用 cleanup() 方法清理無效連接蒜焊,并返回下次需要清理的間隔時間倒信,然后調(diào)用 wait() 方法進行等待以釋放鎖與時間片,當(dāng)?shù)却龝r間到了后山涡,再次循環(huán)清理堤结。
我們看下 cleanup() 方法:
/*ConnectionPool*/
long cleanup(long now) {
int inUseConnectionCount = 0;
int idleConnectionCount = 0;
RealConnection longestIdleConnection = null;
long longestIdleDurationNs = Long.MIN_VALUE;
// 遍歷連接,找出無效連接進行清理
synchronized (this) {
for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
RealConnection connection = i.next();
//(1)查詢此連接的 StreamAllocation 的引用數(shù)量鸭丛,大于 0 則 inUseConnectionCount 加 1竞穷,否則 idleConnectionCount 加 1。
if (pruneAndGetAllocationCount(connection, now) > 0) {
inUseConnectionCount++;
continue;
}
idleConnectionCount++;
// 標(biāo)記空閑連接
long idleDurationNs = now - connection.idleAtNanos;
if (idleDurationNs > longestIdleDurationNs) {
longestIdleDurationNs = idleDurationNs;
longestIdleConnection = connection;
}
}
if (longestIdleDurationNs >= this.keepAliveDurationNs
|| idleConnectionCount > this.maxIdleConnections) {
// 如果連接存活時間大于等于 5 分鐘鳞溉,或者空閑的連接數(shù)量大于 5 個瘾带,則將該鏈接從隊列中移除
connections.remove(longestIdleConnection);
} else if (idleConnectionCount > 0) {
// 如果空閑的連接數(shù)量大于 0,返回此連接即將到期的時間
return keepAliveDurationNs - longestIdleDurationNs;
} else if (inUseConnectionCount > 0) {
// 如果沒有空閑連接熟菲,則返回 5 分鐘看政,也就是下次需要清理的間隔時間為 5 分鐘
return keepAliveDurationNs;
} else {
// 沒有任何連接,則跳出循環(huán)
cleanupRunning = false;
return -1;
}
}
closeQuietly(longestIdleConnection.socket());
// 馬上進行下一次清理
return 0;
}
可以看到抄罕,這里主要通過判斷連接存活時間是否大于等于 5 分鐘允蚣,或者空閑的連接數(shù)量是否大于 5 個來進行連接的清理。連接是否空閑是通過關(guān)注點(1)中的 pruneAndGetAllocationCount() 方法來判斷的呆贿,我們看下這個方法:
/*ConnectionPool*/
private int pruneAndGetAllocationCount(RealConnection connection, long now) {
// 獲得 allocations 的弱引用列表
List<Reference<StreamAllocation>> references = connection.allocations;
// 遍歷 allocations 的弱引用列表
for (int i = 0; i < references.size(); ) {
Reference<StreamAllocation> reference = references.get(i);
// 說明 StreamAllocation 被使用嚷兔,則繼續(xù)下一次循環(huán)
if (reference.get() != null) {
i++;
continue;
}
// We've discovered a leaked allocation. This is an application bug.
StreamAllocation.StreamAllocationReference streamAllocRef =
(StreamAllocation.StreamAllocationReference) reference;
String message = "A connection to " + connection.route().address().url()
+ " was leaked. Did you forget to close a response body?";
Platform.get().logCloseableLeak(message, streamAllocRef.callStackTrace);
// 說明 StreamAllocation 沒有被使用,則從列表中移除
references.remove(i);
connection.noNewStreams = true;
// 列表為空做入,說明都被移除了冒晰,這個時候返回 allocationCount 為 0,表示該連接是空閑的竟块。
if (references.isEmpty()) {
connection.idleAtNanos = now - keepAliveDurationNs;
return 0;
}
}
// 列表不為空壶运,返回列表的大小,大于 0 表示該連接是在使用的浪秘。
return references.size();
}
該方法比較簡單蒋情,主要是遍歷 allocations 的弱引用列表埠况,如果 StreamAllocation 沒有被使用,則從列表中移除恕出,最后返回該列表的大小询枚,通過該大小即可判斷是否是空閑連接,小于等于 0 才是空閑連接浙巫。
2.5 (5)取出相應(yīng)的數(shù)據(jù)
String data = response.body().string();
在第(4)步同步請求或者異步請求執(zhí)行完都會返回 Response金蜀,這個就是最終返回的數(shù)據(jù),可以通過它獲取到 code的畴、message渊抄、header、body 等丧裁。
這里講下 body护桦,點擊 body() 進去是這樣的:
/*Response*/
public @Nullable ResponseBody body() {
return body;
}
可以看到這里的 body 就是 ResponseBody,它是一個抽象類煎娇,不能被實例化二庵,一般用它的子類 RealResponseBody 進行實例化。它是在前面講的 “2.4.3.5 服務(wù)器請求攔截器(CallServerInterceptor)” 小節(jié)中賦值的:
/*CallServerInterceptor*/
if (forWebSocket && code == 101) {
response = response.newBuilder()
.body(Util.EMPTY_RESPONSE)
.build();
} else {
// openResponseBody() 方法中創(chuàng)建了 RealResponseBody 對象返回
response = response.newBuilder()
.body(httpCodec.openResponseBody(response))
.build();
}
如果有緩存則會在緩存攔截器(CacheInterceptor)中賦值缓呛。
ResponseBody 中常用的方法有如下幾種:
/*ResponseBody*/
public final String string() throws IOException {
BufferedSource source = source();
try {
Charset charset = Util.bomAwareCharset(source, charset());
return source.readString(charset);
} finally {
Util.closeQuietly(source);
}
}
public final InputStream byteStream() {
return source().inputStream();
}
public final byte[] bytes() throws IOException {
long contentLength = contentLength();
if (contentLength > Integer.MAX_VALUE) {
throw new IOException("Cannot buffer entire body for content length: " + contentLength);
}
BufferedSource source = source();
byte[] bytes;
try {
bytes = source.readByteArray();
} finally {
Util.closeQuietly(source);
}
if (contentLength != -1 && contentLength != bytes.length) {
throw new IOException("Content-Length ("
+ contentLength
+ ") and stream length ("
+ bytes.length
+ ") disagree");
}
return bytes;
}
可以看到催享,這三個方法內(nèi)部都調(diào)用了 source() 來獲取 BufferedSource,BufferedSource 就是 okio 提供的輸入流哟绊,拿到輸入流就可以將 body 數(shù)據(jù)轉(zhuǎn)換為你需要的類型因妙。例如:
希望返回 String,則調(diào)用 response.body().string()票髓,適用于不超過 1 MB 的數(shù)據(jù)攀涵。
希望返回輸入流,則調(diào)用 response.body().byteStream()洽沟,適用于超過 1 MB 的數(shù)據(jù)以故,例如下載文件。
希望返回二進制字節(jié)數(shù)組裆操,則調(diào)用 response.body().bytes()据德。
需要注意的是,response.body().string() 只能調(diào)用一次跷车,否則會拋出如下異常:
W/System.err: java.lang.IllegalStateException: closed
W/System.err: at okio.RealBufferedSource.rangeEquals(RealBufferedSource.java:408)
W/System.err: at okio.RealBufferedSource.rangeEquals(RealBufferedSource.java:402)
W/System.err: at okhttp3.internal.Util.bomAwareCharset(Util.java:469)
W/System.err: at okhttp3.ResponseBody.string(ResponseBody.java:175)
根據(jù)報錯日志可以看到,是在 RealBufferedSource 類的 408 行報的錯橱野,我們跳轉(zhuǎn)過去看看:
/*RealBufferedSource*/
@Override
public boolean rangeEquals(long offset, ByteString bytes, int bytesOffset, int byteCount)
throws IOException {
if (closed) throw new IllegalStateException("closed");
//...
}
可以看到朽缴,這里做了個判斷,closed 為 true 就拋出該異常水援,繼續(xù)跟蹤 closed 賦值的地方:
/*RealBufferedSource*/
@Override public void close() throws IOException {
if (closed) return;
closed = true;
source.close();
buffer.clear();
}
可以看到密强,closed 唯一賦值的地方在 close() 方法中茅郎,而該方法正是 string() 方法中的 Util.closeQuietly(source); 調(diào)用的:
/*ResponseBody*/
public static void closeQuietly(Closeable closeable) {
if (closeable != null) {
try {
closeable.close();
} catch (RuntimeException rethrown) {
throw rethrown;
} catch (Exception ignored) {
}
}
}
到這里我們就明白了為什么 response.body().string() 只能調(diào)用一次的原因,即 string() 方法中獲取到 String后又調(diào)用了 Util.closeQuietly(source) 方法關(guān)閉了輸入流或渤,并且標(biāo)記 closed 為 true系冗,然后第二次調(diào)用 string() 方法的時候會在 RealBufferedSource.rangeEquals() 方法進行判斷,為 true 就拋出異常薪鹦。
這樣設(shè)計的原因是服務(wù)器返回的 body 可能會很大掌敬,所以 OkHttp 不會將其存儲在內(nèi)存中,只有當(dāng)你需要的時候才去獲取它池磁,如果沒有新的請求則無法獲取 2 次奔害。
三、總結(jié)
看完源碼地熄,發(fā)現(xiàn) OkHttp 是一個設(shè)計得非常優(yōu)秀的框架华临。該框架運用了很多設(shè)計模式,例如建造者模式端考、責(zé)任鏈模式等等雅潭。知道了 OkHttp 的核心是攔截器,這里采用的就是責(zé)任鏈模式却特,每個攔截器負責(zé)相應(yīng)的功能扶供,發(fā)起請求的時候由上往下依次執(zhí)行每個攔截器,響應(yīng)的數(shù)據(jù)則層層往上傳遞核偿。
參考資料: