1.前言
在上一章節(jié)中,簡單分析了Volley
源碼的實現(xiàn),他并不是一個底層網(wǎng)絡(luò)庫,而是上層網(wǎng)絡(luò)庫的封裝,這一章將繼續(xù)講解網(wǎng)絡(luò)庫的第三篇OkHttp
,他是Square公司Jake Wharton的杰作,是android端最主流的開源框架
2.目錄
3.OkHttp
3.1.OkHttp的優(yōu)勢
首先回顧下OkHttp的優(yōu)勢
- 1.對同一主機發(fā)出的所有請求都可以共享相同的套接字連接
- 2.使用連接池來復(fù)用連接以提高效率
- 3.提供了對GZIP的默認支持來降低傳輸內(nèi)容的大小
- 4.對Http響應(yīng)的緩存機制,可以避免不必要的網(wǎng)絡(luò)請求
- 5.當網(wǎng)絡(luò)出現(xiàn)問題時,OkHttp會自動重試一個主機的多個IP地址
3.2.OkHttp的簡單使用
- 同步請求(需在子線程運行)
private void synchronizeRequest() {
// 構(gòu)建okHttpClient,相當于請求的客戶端,Builder設(shè)計模式
OkHttpClient okHttpClient = new OkHttpClient.Builder()
.readTimeout(5, TimeUnit.SECONDS)
.build();
// 構(gòu)建一個請求體芙贫,同樣也是Builder設(shè)計模式
Request request = new Request.Builder()
.url("http://www.baidu.com")
.build();
// 生成一個Call對象进栽,該對象是接口類型
Call call = okHttpClient.newCall(request);
try {
// 拿到Response
Response response = call.execute();
System.out.println(response.body().string());
} catch (IOException e) {
e.printStackTrace();
}
}
- 異步請求(響應(yīng)回調(diào)在子線程)
private void asyncRequest() {
// 構(gòu)建okHttpClient,相當于請求的客戶端镣丑,Builder設(shè)計模式
OkHttpClient okHttpClient = new OkHttpClient.Builder()
.readTimeout(5, TimeUnit.SECONDS)
.build();
// 構(gòu)建一個請求體奢入,同樣也是Builder設(shè)計模式
Request request = new Request.Builder()
.url("http://www.baidu.com")
.build();
// 生成一個Call對象,該對象是接口類型
Call call = okHttpClient.newCall(request);
call.enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
}
@Override
public void onResponse(Call call, Response response) throws IOException {
System.out.println(response.body().string());
}
});
}
- post請求的兩種方式
private static void postRequest() throws IOException {
OkHttpClient okHttpClient = new OkHttpClient.Builder()
.build();
//表單形式
FormBody formBody = new FormBody.Builder()
.add("name", "mary")
.build();
Request request = new Request.Builder()
.post(formBody)
.build();
okhttp3.Response execute = okHttpClient.newCall(request).execute();
//json形式
MediaType mediaType = MediaType.parse("application/json");
//object表示實體類
String json = GsonUtils.toJson(new Object());
RequestBody requestBody = RequestBody.create(mediaType, json);
Request request1 = new Request.Builder()
.post(requestBody)
.build();
okhttp3.Response execute1 = okHttpClient.newCall(request1).execute();
}
3.3.OkHttp的源碼分析
先簡單看一下OkHttp的執(zhí)行流程
- 從流程圖看一看出異步
enqueue()
請求是由Dispatcher進行分發(fā),在新線程調(diào)用,execute()
請求在當前線程調(diào)用的 - 后面會通過責任鏈模式進行請求和響應(yīng)的處理
首先OkHttp
通過建造者模式可以配置的參數(shù)
//攔截器
final List<Interceptor> interceptors = new ArrayList<>();
//網(wǎng)絡(luò)攔截器
final List<Interceptor> networkInterceptors = new ArrayList<>();
//緩存
@Nullable Cache cache;
//內(nèi)部緩存
@Nullable InternalCache internalCache;
public Builder() {
//分發(fā)器
dispatcher = new Dispatcher();
//協(xié)議
protocols = DEFAULT_PROTOCOLS;
//傳輸層版本和連接協(xié)議(Https需進行配置)
connectionSpecs = DEFAULT_CONNECTION_SPECS;
//事件監(jiān)聽工廠
eventListenerFactory = EventListener.factory(EventListener.NONE);
//代理選擇器
proxySelector = ProxySelector.getDefault();
//cookie
cookieJar = CookieJar.NO_COOKIES;
//socket工廠
socketFactory = SocketFactory.getDefault();
//主機名字驗證
hostnameVerifier = OkHostnameVerifier.INSTANCE;
//證書鏈
certificatePinner = CertificatePinner.DEFAULT;
//代理身份驗證
proxyAuthenticator = Authenticator.NONE;
//本地身份驗證
authenticator = Authenticator.NONE;
//連接池
connectionPool = new ConnectionPool();
//域名
dns = Dns.SYSTEM;
//安全桃姐層重定向
followSslRedirects = true;
//本地重定向
followRedirects = true;
//失敗重連
retryOnConnectionFailure = true;
//連接超時
connectTimeout = 10_000;
//讀操作超時
readTimeout = 10_000;
//寫操作超時
writeTimeout = 10_000;
//ping測試周期
pingInterval = 0;
}
接著再看看rquest
可配置的參數(shù)
Request(Request.Builder builder) {
//url地址
this.url = builder.url;
//請求方式
this.method = builder.method;
//請求頭
this.headers = builder.headers.build();
//請求體(post請求RequestBody)
this.body = builder.body;
//請求標識
this.tag = builder.tag != null ? builder.tag : this;
}
接著看請求的構(gòu)建okHttpClient.newCall(request)
@Override public Call newCall(Request request) {
return RealCall.newRealCall(this, request, false /* for web socket */);
}
static RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
// 構(gòu)建一個RealCall
RealCall call = new RealCall(client, originalRequest, forWebSocket);
//call事件的綁定
call.eventListener = client.eventListenerFactory().create(call);
return call;
}
接著再看請求的同步執(zhí)行call.execute()
@Override public Response execute() throws IOException {
//防止并發(fā)安全,且同一個請求只能執(zhí)行一次
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
//call回調(diào)開始事件
eventListener.callStart(this);
try {
//通過dispatcher將這個請求加入到runningSyncCalls隊列中
client.dispatcher().executed(this);
//調(diào)用攔截鏈獲取響應(yīng)結(jié)果
Response result = getResponseWithInterceptorChain();
if (result == null) throw new IOException("Canceled");
return result;
} catch (IOException e) {
//call回調(diào)失敗事件
eventListener.callFailed(this, e);
throw e;
} finally {
//請求結(jié)束 將請求從runningSyncCalls隊列移除
client.dispatcher().finished(this);
}
}
接著再來看看dispatcher
是如何分發(fā)處理請求的,從上面分分析可以看到dispatcher
是在構(gòu)建Builder()
是初始化的dispatcher = new Dispatcher()
,下面來看看Dispatcher
這個類
public final class Dispatcher {
//異步請求的最大請求數(shù) 超過則添加到準備隊列
private int maxRequests = 64;
//異步請求中每個主機的同時執(zhí)行的最大請求數(shù)
private int maxRequestsPerHost = 5;
//閑置時的回調(diào)(沒有正在執(zhí)行的同步和異步請求)
private @Nullable Runnable idleCallback;
//線程池執(zhí)行器,懶漢式創(chuàng)建
private @Nullable ExecutorService executorService;
//準備執(zhí)行的異步隊列請求(會在前面的請求執(zhí)行完畢后進行判斷調(diào)用)
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
//正在執(zhí)行的異步請求隊列
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
//正在執(zhí)行的同步請求隊列(僅作為取消和統(tǒng)計請求數(shù)時使用)
private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
}
接著查看client.dispatcher().executed(this)
方法
//同步的將其添加到正在運行的同步請求隊列
synchronized void executed(RealCall call) {
runningSyncCalls.add(call);
}
接著對比查看異步請求call.enqueue(new Callback())
@Override public void enqueue(Callback responseCallback) {
//同同步,同一請求只執(zhí)行一次
//異步請求,一般不會出現(xiàn)并發(fā)安全
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
//call開始事件
eventListener.callStart(this);
//通過dispatcher將異步請求添加到異步隊列中
client.dispatcher().enqueue(new RealCall.AsyncCall(responseCallback));
}
synchronized void enqueue(RealCall.AsyncCall call) {
//判斷正在運行的請求數(shù)是否小于最大請求數(shù)及同一主機的請求數(shù)是否小于同一主機的最大請求數(shù)
if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
//小于則添加到異步運行隊列
runningAsyncCalls.add(call);
//并通過線程執(zhí)行器執(zhí)行請求
executorService().execute(call);
} else {
//大于則添加到等待隊列
readyAsyncCalls.add(call);
}
}
然后再來查看RealCall.AsyncCall
,實際上是一個Runnable
,但是run()
方法有調(diào)用了execute()
方法
@Override protected void execute() {
boolean signalledCallback = false;
try {
//通過攔截連獲取響應(yīng)數(shù)據(jù)
Response response = getResponseWithInterceptorChain();
//請求是否被取消
if (retryAndFollowUpInterceptor.isCanceled()) {
signalledCallback = true;
//請求失敗被取消
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
signalledCallback = true;
//響應(yīng)成功的回調(diào)
responseCallback.onResponse(RealCall.this, response);
}
} catch (IOException e) {
//響應(yīng)是否已經(jīng)返回
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
//響應(yīng)失敗的回調(diào)
eventListener.callFailed(RealCall.this, e);
responseCallback.onFailure(RealCall.this, e);
}
} finally {
//請求結(jié)束,移除異步執(zhí)行隊列
client.dispatcher().finished(this);
}
}
然后就是兩種請求方式都調(diào)用的Response response = getResponseWithInterceptorChain()
方法,這是OkHttp的核心方法,弄懂這個流程基本上也就了解了OkHttp的原理
Response getResponseWithInterceptorChain() throws IOException {
// Build a full stack of interceptors.
List<Interceptor> interceptors = new ArrayList<>();
//okHttpClient配置的攔截器
interceptors.addAll(client.interceptors());
//失敗和重定向攔截器
interceptors.add(retryAndFollowUpInterceptor);
//從應(yīng)用程序代碼到網(wǎng)絡(luò)代碼的橋梁.根據(jù)用戶請求構(gòu)建網(wǎng)絡(luò)請求,根據(jù)網(wǎng)絡(luò)響應(yīng)構(gòu)建用戶響應(yīng)
interceptors.add(new BridgeInterceptor(client.cookieJar()));
//處理 緩存配置 根據(jù)條件返回緩存響應(yīng)
//設(shè)置請求頭(If-None-Match铅鲤、If-Modified-Since等) 服務(wù)器可能返回304(未修改)
//可配置用戶自己設(shè)置的緩存攔截器
interceptors.add(new CacheInterceptor(client.internalCache()));
//連接服務(wù)器 負責和服務(wù)器建立連接 這里才是真正的請求網(wǎng)絡(luò)
interceptors.add(new ConnectInterceptor(client));
if (!forWebSocket) {
//okHttpClient配置的網(wǎng)絡(luò)攔截器
//返回觀察單個網(wǎng)絡(luò)請求和響應(yīng)的不可變攔截器列表
interceptors.addAll(client.networkInterceptors());
}
//執(zhí)行流操作(寫出請求體,獲得響應(yīng)數(shù)據(jù)) 負責向服務(wù)器發(fā)送請求數(shù)據(jù),從服務(wù)器讀取響應(yīng)數(shù)據(jù)
//進行http請求報文的封裝與請求報文的解析
interceptors.add(new CallServerInterceptor(forWebSocket));
//創(chuàng)建責任鏈,請求的調(diào)用者
Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
originalRequest, this, eventListener, client.connectTimeoutMillis(),
client.readTimeoutMillis(), client.writeTimeoutMillis());
return chain.proceed(originalRequest);
}
上述代碼的關(guān)鍵在于RealInterceptorChain
,他是使責任鏈運行的關(guān)鍵代碼,先看看他們的結(jié)構(gòu)
public final class RealInterceptorChain implements Interceptor.Chain {
//構(gòu)造器
public RealInterceptorChain(List<Interceptor> interceptors, StreamAllocation streamAllocation,
HttpCodec httpCodec, RealConnection connection, int index, Request request, Call call,
EventListener eventListener, int connectTimeout, int readTimeout, int writeTimeout) {
//攔截器集合
this.interceptors = interceptors;
//實際建立連接的類
this.connection = connection;
//流分配類
this.streamAllocation = streamAllocation;
//http請求響應(yīng)編解碼
this.httpCodec = httpCodec;
//集合中攔截器的索引
this.index = index;
//當前請求
this.request = request;
//操作請求的接口類
this.call = call;
//網(wǎng)絡(luò)請求流程事件監(jiān)聽類
this.eventListener = eventListener;
//連接超時
this.connectTimeout = connectTimeout;
//IO讀操作超時
this.readTimeout = readTimeout;
//IO寫操作超時
this.writeTimeout = writeTimeout;
}
}
然后會調(diào)用chain.proceed(originalRequest)
@Override public Response proceed(Request request) throws IOException {
return proceed(request, streamAllocation, httpCodec, connection);
}
public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
RealConnection connection) throws IOException {
//索引越界則拋出 斷言Error
if (index >= interceptors.size()) throw new AssertionError();
//calls的執(zhí)行次數(shù)
calls++;
// If we already have a stream, confirm that the incoming request will use it.
if (this.httpCodec != null && !this.connection.supportsUrl(request.url())) {
throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
+ " must retain the same host and port");
}
// If we already have a stream, confirm that this is the only call to chain.proceed().
if (this.httpCodec != null && calls > 1) {
throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
+ " must call proceed() exactly once");
}
// 獲取下一個攔截鏈
RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,
writeTimeout);
//獲取攔截器對象
Interceptor interceptor = interceptors.get(index);
//攔截器執(zhí)行攔截鏈的方法
Response response = interceptor.intercept(next);
// Confirm that the next interceptor made its required call to chain.proceed().
if (httpCodec != null && index + 1 < interceptors.size() && next.calls != 1) {
throw new IllegalStateException("network interceptor " + interceptor
+ " must call proceed() exactly once");
}
// Confirm that the intercepted response isn't null.
if (response == null) {
throw new NullPointerException("interceptor " + interceptor + " returned null");
}
if (response.body() == null) {
throw new IllegalStateException(
"interceptor " + interceptor + " returned a response with no body");
}
return response;
}
可能現(xiàn)在還看不太清楚到底是怎么一步步的調(diào)用每一個攔截器的,這里主要涉及一個接口Interceptor
及其內(nèi)部接口Interceptor.Chain
public interface Interceptor {
Response intercept(okhttp3.Interceptor.Chain chain) throws IOException;
interface Chain {
Request request();
Response proceed(Request request) throws IOException;
}
}
- 1.首先是
RealInterceptorChain.proceed(originalRequest)
,開發(fā)分發(fā)請求 - 2.
RealInterceptorChain next = new RealInterceptorChain(index + 1...)
,將索引加1,獲取下一個攔截鏈(見3) - 2.
Interceptor interceptor = interceptors.get(index)
獲取index
的攔截器 - 3.
Response response = interceptor.intercept(next)
,具體攔截器執(zhí)行,并傳入寫一個RealInterceptorChain
,里面有請求的各種參數(shù) - 4.然后查看具體的攔截器的
intercept()
方法,如:RealInterceptorChain
攔截器,intercept()
方法中realChain.proceed(request, streamAllocation, null, null)
又會遞歸回調(diào)RealInterceptorChain.proceed(originalRequest)
回到第1步,至此整個遞歸的閉環(huán)流程就走完了
3.3.1. RetryAndFollowUpInterceptor攔截器
- 作用:用于失敗時恢復(fù)以及在必要時進行重定向
@Override public Response intercept(Interceptor.Chain chain) throws IOException {
Request request = chain.request();
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Call call = realChain.call();
EventListener eventListener = realChain.eventListener();
//創(chuàng)建流分配相關(guān)類
StreamAllocation streamAllocation = new StreamAllocation(client.connectionPool(),
createAddress(request.url()), call, eventListener, callStackTrace);
this.streamAllocation = streamAllocation;
//重試次數(shù)
int followUpCount = 0;
//先前的Response
Response priorResponse = null;
while (true) {
//取消 釋放流分配類
if (canceled) {
streamAllocation.release();
throw new IOException("Canceled");
}
Response response;
boolean releaseConnection = true;
try {
//遞歸調(diào)用攔截器鏈
response = realChain.proceed(request, streamAllocation, null, null);
releaseConnection = false;
} catch (RouteException e) {
// The attempt to connect via a route failed. The request will not have been sent.
//是否是一個可恢復(fù)的請求
if (!recover(e.getLastConnectException(), streamAllocation, false, request)) {
throw e.getLastConnectException();
}
releaseConnection = false;
//重試 不繼續(xù)向下執(zhí)行
continue;
} catch (IOException e) {
// An attempt to communicate with a server failed. The request may have been sent.
//請求是否已經(jīng)發(fā)送
boolean requestSendStarted = !(e instanceof ConnectionShutdownException);
//是否是一個可恢復(fù)的請求
if (!recover(e, streamAllocation, requestSendStarted, request)) throw e;
releaseConnection = false;
//重試 不繼續(xù)向下執(zhí)行
continue;
} finally {
// We're throwing an unchecked exception. Release any resources.
if (releaseConnection) {
//沒有捕獲到異常 釋放流分配類
streamAllocation.streamFailed(null);
streamAllocation.release();
}
}
//priorResponse用于保存前一個Response
// Attach the prior response if it exists. Such responses never have a body.
if (priorResponse != null) {
response = response.newBuilder()
.priorResponse(priorResponse.newBuilder()
.body(null)
.build())
.build();
}
//判斷是否需要重定向 需要則返回一個重定向的Request 沒有則返回null
Request followUp = followUpRequest(response, streamAllocation.route());
if (followUp == null) {
//不需要重定向
if (!forWebSocket) {
//不是webSocket 釋放streamAllocation
streamAllocation.release();
}
//直接返回response
return response;
}
//需要重定向 關(guān)閉響應(yīng)流
closeQuietly(response.body());
//重定向次數(shù)加1 若大于最大重定向次數(shù) 釋放streamAllocation 拋出ProtocolException
if (++followUpCount > MAX_FOLLOW_UPS) {
streamAllocation.release();
throw new ProtocolException("Too many follow-up requests: " + followUpCount);
}
//如果是不能被再次操作的請求體
if (followUp.body() instanceof UnrepeatableRequestBody) {
streamAllocation.release();
throw new HttpRetryException("Cannot retry streamed HTTP body", response.code());
}
//判斷是否為同一連接 請求
if (!sameConnection(response, followUp.url())) {
streamAllocation.release();
//否則重新創(chuàng)建StreamAllocation
streamAllocation = new StreamAllocation(client.connectionPool(),
createAddress(followUp.url()), call, eventListener, callStackTrace);
this.streamAllocation = streamAllocation;
} else if (streamAllocation.codec() != null) {
throw new IllegalStateException("Closing the body of " + response
+ " didn't close its backing stream. Bad interceptor?");
}
//重新賦值request priorResponse 再次執(zhí)行
request = followUp;
priorResponse = response;
}
}
然后查看是否可重試的方法recover()
private boolean recover(IOException e, boolean requestSendStarted, Request userRequest) {
streamAllocation.streamFailed(e);
// The application layer has forbidden retries.
//如果OkHttpClient直接配置拒絕失敗重連,return false
if (!client.retryOnConnectionFailure()) return false;
// We can't send the request body again.
//如果請求已經(jīng)發(fā)送枫弟,并且這個請求體是一個UnrepeatableRequestBody類型邢享,則不能重試。
//StreamedRequestBody實現(xiàn)了UnrepeatableRequestBody接口淡诗,是個流類型骇塘,不會被緩存,所以只能執(zhí)行一次韩容,具體可看款违。
if (requestSendStarted && userRequest.body() instanceof UnrepeatableRequestBody) return false;
// This exception is fatal.
//一些嚴重的問題,就不要重試了
if (!isRecoverable(e, requestSendStarted)) return false;
// No more routes to attempt.
//沒有更多的路由就不要重試了
if (!streamAllocation.hasMoreRoutes()) return false;
// For failure recovery, use the same route selector with a new connection.
return true;
}
從上面的代碼中可以看出,RetryAndFollowUpInterceptor
內(nèi)部開啟了while(true)
循環(huán)
- 1.內(nèi)部請求拋出異常時,判斷是否需要重試
- 2.當響應(yīng)結(jié)果是重定向時,構(gòu)建新的請求
是否可以重試的規(guī)則recover()
:
- 1.client的
retryOnConnectionFailure
參數(shù)設(shè)置為false,不進行重試 - 2.請求的body已經(jīng)發(fā)出,不進行重試
- 3.特殊的異常類型,不進行重試
- 4.沒有更多的route(包含proxy和inetaddress),不進行重試
3.3.2. BridgeInterceptor攔截器
- 作用:根據(jù)用戶請求構(gòu)建網(wǎng)絡(luò)請求,根據(jù)網(wǎng)絡(luò)響應(yīng)構(gòu)建用戶響應(yīng)
@Override public Response intercept(Interceptor.Chain chain) throws IOException {
Request userRequest = chain.request();
Request.Builder requestBuilder = userRequest.newBuilder();
//處理請求header相關(guān)信息
RequestBody body = userRequest.body();
if (body != null) {
MediaType contentType = body.contentType();
if (contentType != null) {
requestBuilder.header("Content-Type", contentType.toString());
}
long contentLength = body.contentLength();
if (contentLength != -1) {
requestBuilder.header("Content-Length", Long.toString(contentLength));
requestBuilder.removeHeader("Transfer-Encoding");
} else {
requestBuilder.header("Transfer-Encoding", "chunked");
requestBuilder.removeHeader("Content-Length");
}
}
if (userRequest.header("Host") == null) {
requestBuilder.header("Host", hostHeader(userRequest.url(), false));
}
if (userRequest.header("Connection") == null) {
requestBuilder.header("Connection", "Keep-Alive");
}
// If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing
// the transfer stream.
//沒有添加編碼方式,默認添加gzip的編解碼
boolean transparentGzip = false;
if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
transparentGzip = true;
requestBuilder.header("Accept-Encoding", "gzip");
}
//如果添加了cookieJar,這里會設(shè)置到Cookie上
List<Cookie> cookies = cookieJar.loadForRequest(userRequest.url());
if (!cookies.isEmpty()) {
requestBuilder.header("Cookie", cookieHeader(cookies));
}
if (userRequest.header("User-Agent") == null) {
requestBuilder.header("User-Agent", Version.userAgent());
}
//上面是請求頭的處理
//遞歸調(diào)用連接鏈
Response networkResponse = chain.proceed(requestBuilder.build());
//根據(jù)響應(yīng)結(jié)果 是否保存cookie
HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers());
//重新設(shè)置響應(yīng)頭
Response.Builder responseBuilder = networkResponse.newBuilder()
.request(userRequest);
if (transparentGzip
&& "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding"))
&& HttpHeaders.hasBody(networkResponse)) {
//當服務(wù)器返回的數(shù)據(jù)是 GZIP 壓縮的群凶,那么客戶端就有責任去進行解壓操作 使用的是okio
GzipSource responseBody = new GzipSource(networkResponse.body().source());
Headers strippedHeaders = networkResponse.headers().newBuilder()
.removeAll("Content-Encoding")
.removeAll("Content-Length")
.build();
responseBuilder.headers(strippedHeaders);
String contentType = networkResponse.header("Content-Type");
responseBuilder.body(new RealResponseBody(contentType, -1L, Okio.buffer(responseBody)));
}
return responseBuilder.build();
}
BridgeInterceptor
負責在request階段對請求頭添加一些字段,cookie處理,在response階段對響應(yīng)進行一些gzip解壓(okio)
操作插爹。
3.3.3. CacheInterceptor攔截器
- 作用:根據(jù)用戶的配置,處理緩存相關(guān)內(nèi)容
@Override public Response intercept(Interceptor.Chain chain) throws IOException {
//根據(jù)配置讀取候選緩存
Response cacheCandidate = cache != null
? cache.get(chain.request())
: null;
long now = System.currentTimeMillis();
//解析請求和緩存策略
CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
//如果僅度緩存,那么networkRequest會為null
Request networkRequest = strategy.networkRequest;
Response cacheResponse = strategy.cacheResponse;
if (cache != null) {
cache.trackResponse(strategy);
}
if (cacheCandidate != null && cacheResponse == null) {
// The cache candidate wasn't applicable. Close it.
//候選緩存 無用可關(guān)閉
closeQuietly(cacheCandidate.body());
}
//不使用網(wǎng)絡(luò)請求且緩存為空 返回504 空的響應(yīng)
// If we're forbidden from using the network and the cache is insufficient, fail.
if (networkRequest == null && cacheResponse == null) {
return new Response.Builder()
.request(chain.request())
.protocol(Protocol.HTTP_1_1)
.code(504)
.message("Unsatisfiable Request (only-if-cached)")
.body(Util.EMPTY_RESPONSE)
.sentRequestAtMillis(-1L)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();
}
//不使用網(wǎng)絡(luò)請求 返回緩存響應(yīng)
// If we don't need the network, we're done.
if (networkRequest == null) {
return cacheResponse.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.build();
}
Response networkResponse = null;
try {
//前面都沒有return 繼續(xù)執(zhí)行攔截器鏈
networkResponse = chain.proceed(networkRequest);
} finally {
//防止內(nèi)存泄露 關(guān)閉候選緩存
// If we're crashing on I/O or otherwise, don't leak the cache body.
if (networkResponse == null && cacheCandidate != null) {
closeQuietly(cacheCandidate.body());
}
}
//根據(jù)網(wǎng)絡(luò)結(jié)果 如果是304 把網(wǎng)絡(luò)響應(yīng)和緩存的響應(yīng)合并 然后返回響應(yīng)數(shù)據(jù)
// If we have a cache response too, then we're doing a conditional get.
if (cacheResponse != null) {
if (networkResponse.code() == HTTP_NOT_MODIFIED) {
Response response = cacheResponse.newBuilder()
.headers(combine(cacheResponse.headers(), networkResponse.headers()))
.sentRequestAtMillis(networkResponse.sentRequestAtMillis())
.receivedResponseAtMillis(networkResponse.receivedResponseAtMillis())
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build();
networkResponse.body().close();
// Update the cache after combining headers but before stripping the
// Content-Encoding header (as performed by initContentStream()).
cache.trackConditionalCacheHit();
//更新緩存
cache.update(cacheResponse, response);
return response;
} else {
//內(nèi)容有修改 關(guān)閉緩存
closeQuietly(cacheResponse.body());
}
}
//無匹配的緩存
Response response = networkResponse.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build();
if (cache != null) {
if (HttpHeaders.hasBody(response) && CacheStrategy.isCacheable(response, networkRequest)) {
// Offer this request to the cache.
// 將響應(yīng)數(shù)據(jù)寫入緩存
CacheRequest cacheRequest = cache.put(response);
return cacheWritingResponse(cacheRequest, response);
}
//是否是無效的緩存
if (HttpMethod.invalidatesCache(networkRequest.method())) {
try {
//刪除已有緩存
cache.remove(networkRequest);
} catch (IOException ignored) {
// The cache cannot be written.
}
}
}
return response;
}
-
CacheInterceptor
首先根據(jù)request
嘗試從cache
讀取緩存,由于OkHttp
默認不支持緩存,需通過OkHttpClient
配置 - 根據(jù)緩存策略是否使用網(wǎng)絡(luò),是否存在緩存,來構(gòu)建Response返回
- 如果緩存策略中網(wǎng)絡(luò)請求為空,繼續(xù)執(zhí)行攔截鏈,然后根據(jù)
networkResponse````來對
response```進行緩存和返回 - 當數(shù)據(jù)指定只從緩存獲取時,后面的攔截鏈將不會執(zhí)行
3.3.4. ConnectInterceptor攔截器
- 作用:負責DNS解析和Scoket連接
@Override public Response intercept(Interceptor.Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Request request = realChain.request();
//獲取RetryAndFollowUpInterceptor創(chuàng)建的StreamAllocation
//包含connectionPool Address Call等信息
StreamAllocation streamAllocation = realChain.streamAllocation();
// We need the network to satisfy this request. Possibly for validating a conditional GET.
boolean doExtensiveHealthChecks = !request.method().equals("GET");
//創(chuàng)建連接等一系列操作
HttpCodec httpCodec = streamAllocation.newStream(client, chain, doExtensiveHealthChecks);
//獲取創(chuàng)建的connection
RealConnection connection = streamAllocation.connection();
//將建立的連接,傳遞到下一個攔截器鏈
return realChain.proceed(request, streamAllocation, httpCodec, connection);
}
從ConnectInterceptor
類本身來看代碼比較簡單,但其邏輯非常復(fù)雜,涉及網(wǎng)絡(luò)連接建立的整個過程,是最重要的攔截器之一
首先查看streamAllocation.newStream()
創(chuàng)建連接的方法
public HttpCodec newStream(OkHttpClient client, Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
int connectTimeout = chain.connectTimeoutMillis();
int readTimeout = chain.readTimeoutMillis();
int writeTimeout = chain.writeTimeoutMillis();
int pingIntervalMillis = client.pingIntervalMillis();
//重試
boolean connectionRetryEnabled = client.retryOnConnectionFailure();
try {
//獲取健康的連接
RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
writeTimeout, pingIntervalMillis, connectionRetryEnabled, doExtensiveHealthChecks);
//對請求以及結(jié)果 編解碼的類(分http 1.1 和http 2.0)
HttpCodec resultCodec = resultConnection.newCodec(client, chain, this);
synchronized (connectionPool) {
codec = resultCodec;
return resultCodec;
}
} catch (IOException e) {
throw new RouteException(e);
}
}
接著查看獲取健康可用連接的方法findHealthyConnection ()
private RealConnection findHealthyConnection(int connectTimeout, int readTimeout, int writeTimeout,
int pingIntervalMillis, boolean connectionRetryEnabled,
boolean doExtensiveHealthChecks) throws IOException {
//循環(huán)查找 直至找到
while (true) {
//查找連接
RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
pingIntervalMillis, connectionRetryEnabled);
// If this is a brand new connection, we can skip the extensive health checks.
//連接池同步查找 判斷是否是一個新的連接 是就直接返回
synchronized (connectionPool) {
if (candidate.successCount == 0) {
return candidate;
}
}
// Do a (potentially slow) check to confirm that the pooled connection is still good. If it
// isn't, take it out of the pool and start again.
//如果不是 判斷是否是一個健康的連接 是則在后面返回
if (!candidate.isHealthy(doExtensiveHealthChecks)) {
noNewStreams();
continue;
}
return candidate;
}
}
然后繼續(xù)查看查找連接的方法findConnection()
private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
int pingIntervalMillis, boolean connectionRetryEnabled) throws IOException {
//是否在連接池中找到Connection
boolean foundPooledConnection = false;
//對應(yīng)找到的可用連接
RealConnection result = null;
//對應(yīng)找到的路由
Route selectedRoute = null;
//可釋放的連接
Connection releasedConnection;
//需要關(guān)閉的連接
Socket toClose;
//同步連接池 獲取里面的連接
synchronized (connectionPool) {
//是否釋放
if (released) throw new IllegalStateException("released");
//是否沒有編碼
if (codec != null) throw new IllegalStateException("codec != null");
//是否取消
if (canceled) throw new IOException("Canceled");
// Attempt to use an already-allocated connection. We need to be careful here because our
// already-allocated connection may have been restricted from creating new streams.
//首先嘗試將當前連接設(shè)置為可釋放的連接
releasedConnection = this.connection;
//如果限制了新的流的創(chuàng)建 需釋放當前連接 并返回需關(guān)閉的socket
toClose = releaseIfNoNewStreams();
if (this.connection != null) {
// We had an already-allocated connection and it's good.
//如果當前connection不為空 可直接使用
result = this.connection;
releasedConnection = null;
}
if (!reportedAcquired) {
// If the connection was never reported acquired, don't report it as released!
releasedConnection = null;
}
if (result == null) {
// Attempt to get a connection from the pool.
//當前連接不可用 通過for循環(huán)從連接池獲取合格的連接 成功獲取會更新connection的值
Internal.instance.get(connectionPool, address, this, null);
if (connection != null) {
foundPooledConnection = true;
result = connection;
} else {
selectedRoute = route;
}
}
}
//關(guān)閉socket
closeQuietly(toClose);
//事件的綁定
if (releasedConnection != null) {
eventListener.connectionReleased(call, releasedConnection);
}
if (foundPooledConnection) {
eventListener.connectionAcquired(call, result);
}
if (result != null) {
// If we found an already-allocated or pooled connection, we're done.
//如果找到已分配或連接池可復(fù)用的連接 則直接返回該對象
return result;
}
// If we need a route selection, make one. This is a blocking operation.
boolean newRouteSelection = false;
if (selectedRoute == null && (routeSelection == null || !routeSelection.hasNext())) {
newRouteSelection = true;
//切換路由
routeSelection = routeSelector.next();
}
synchronized (connectionPool) {
//是否取消
if (canceled) throw new IOException("Canceled");
if (newRouteSelection) {
// Now that we have a set of IP addresses, make another attempt at getting a connection from
// the pool. This could match due to connection coalescing.
List<Route> routes = routeSelection.getAll();
for (int i = 0, size = routes.size(); i < size; i++) {
Route route = routes.get(i);
//會更新connection值 找到則跳出for循環(huán)
Internal.instance.get(connectionPool, address, this, route);
if (connection != null) {
foundPooledConnection = true;
result = connection;
this.route = route;
break;
}
}
}
//如果任然沒有找到可用連接
if (!foundPooledConnection) {
//如果當前路由為空
if (selectedRoute == null) {
//切換路由
selectedRoute = routeSelection.next();
}
// Create a connection and assign it to this allocation immediately. This makes it possible
// for an asynchronous cancel() to interrupt the handshake we're about to do.
route = selectedRoute;
refusedStreamCount = 0;
//創(chuàng)建新連接
result = new RealConnection(connectionPool, selectedRoute);
//會更新connection值
acquire(result, false);
}
}
// If we found a pooled connection on the 2nd time around, we're done.
// 如果找到(通過一組ip地址查找)
if (foundPooledConnection) {
eventListener.connectionAcquired(call, result);
//返回可復(fù)用的連接
return result;
}
// Do TCP + TLS handshakes. This is a blocking operation.
//TCP的三次握手 和 TLS握手
result.connect(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis,
connectionRetryEnabled, call, eventListener);
routeDatabase().connected(result.route());
Socket socket = null;
synchronized (connectionPool) {
reportedAcquired = true;
//同步將connect加入連接池
// Pool the connection.
Internal.instance.put(connectionPool, result);
// If another multiplexed connection to the same address was created concurrently, then
// release this connection and acquire that one.
//多路復(fù)用的判斷 http2才有
if (result.isMultiplexed()) {
socket = Internal.instance.deduplicate(connectionPool, address, this);
result = connection;
}
}
//關(guān)閉socket
closeQuietly(socket);
//連接的事件回調(diào)
eventListener.connectionAcquired(call, result);
return result;
}
ConnectInterceptor
的連接過程還是比較復(fù)雜的,上面大概過程都已講述,主要的過程就是獲取當前連接(connection)
如果不可用,則從連接池中獲取可復(fù)用連接,如果仍然獲取不到,則新建連接,通過連接生成編解碼對象(HttpCodec)
,繼續(xù)交由攔截鏈處理
3.3.5. CallServerInterceptor攔截器
- 作用:http請求報文的封裝與請求報文的解析
@Override public Response intercept(Interceptor.Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
HttpCodec httpCodec = realChain.httpStream();
StreamAllocation streamAllocation = realChain.streamAllocation();
RealConnection connection = (RealConnection) realChain.connection();
Request request = realChain.request();
long sentRequestMillis = System.currentTimeMillis();
realChain.eventListener().requestHeadersStart(realChain.call());
//sink(Okio中OutputStream)中寫頭信息
httpCodec.writeRequestHeaders(request);
realChain.eventListener().requestHeadersEnd(realChain.call(), request);
Response.Builder responseBuilder = null;
//檢查是否是GET和HEAD以外的請求
if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
// If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100
// Continue" response before transmitting the request body. If we don't get that, return
// what we did get (such as a 4xx response) without ever transmitting the request body.
//如果頭部添加了"100-continue",相當于第一次握手,只有拿到服務(wù)器結(jié)果再繼續(xù)
if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
httpCodec.flushRequest();
realChain.eventListener().responseHeadersStart(realChain.call());
//構(gòu)建responseBuilder 當response100 返回null
responseBuilder = httpCodec.readResponseHeaders(true);
}
// 前面的"100-continue" responseBuilder 為null
if (responseBuilder == null) {
// Write the request body if the "Expect: 100-continue" expectation was met.
//head成功響應(yīng)
realChain.eventListener().requestBodyStart(realChain.call());
long contentLength = request.body().contentLength();
//請求體的輸出流
CallServerInterceptor.CountingSink requestBodyOut =
new CallServerInterceptor.CountingSink(httpCodec.createRequestBody(request, contentLength));
BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);
//發(fā)送請求體
request.body().writeTo(bufferedRequestBody);
bufferedRequestBody.close();
realChain.eventListener()
.requestBodyEnd(realChain.call(), requestBodyOut.successfulCount);
} else if (!connection.isMultiplexed()) {
// If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection
// from being reused. Otherwise we're still obligated to transmit the request body to
// leave the connection in a consistent state.
//初次握手失敗
//進制同主機請求流的分配
streamAllocation.noNewStreams();
}
}
//flush流
httpCodec.finishRequest();
//如果是GET請求 揮著需要"100-continue"握手成功的情況下
if (responseBuilder == null) {
realChain.eventListener().responseHeadersStart(realChain.call());
//再次構(gòu)建responseBuilder
responseBuilder = httpCodec.readResponseHeaders(false);
}
//獲取響應(yīng)
Response response = responseBuilder
//原請求
.request(request)
//握手情況
.handshake(streamAllocation.connection().handshake())
//請求時間
.sentRequestAtMillis(sentRequestMillis)
//響應(yīng)時間
.receivedResponseAtMillis(System.currentTimeMillis())
.build();
int code = response.code();
if (code == 100) {
// server sent a 100-continue even though we did not request one.
// try again to read the actual response
//即使我們沒有請求 服務(wù)端也會發(fā)送一個100-continue
//重新讀取真正的響應(yīng)
responseBuilder = httpCodec.readResponseHeaders(false);
//構(gòu)建response
response = responseBuilder
.request(request)
.handshake(streamAllocation.connection().handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();
code = response.code();
}
realChain.eventListener()
.responseHeadersEnd(realChain.call(), response);
if (forWebSocket && code == 101) {
// Connection is upgrading, but we need to ensure interceptors see a non-null response body.
//返回一個空的響應(yīng)體
response = response.newBuilder()
.body(Util.EMPTY_RESPONSE)
.build();
} else {
response = response.newBuilder()
.body(httpCodec.openResponseBody(response))
.build();
}
//請求關(guān)閉連接 連接不需要保活
if ("close".equalsIgnoreCase(response.request().header("Connection"))
|| "close".equalsIgnoreCase(response.header("Connection"))) {
streamAllocation.noNewStreams();
}
//拋出協(xié)議異常 204:No Content 205:Reset Content
if ((code == 204 || code == 205) && response.body().contentLength() > 0) {
throw new ProtocolException(
"HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength());
}
return response;
}
CallServerInterceptor
攔截器的主要工作過程是獲取HttpCodec
對象,針對Http1.1
之前和Http2
不同協(xié)議的http處理,發(fā)送http請求構(gòu)建Response.Builder
對象,然后構(gòu)建```Response````并返回
3.3.6. Interceptors和networkInterceptors
Interceptors:應(yīng)用攔截器,對發(fā)出去的
Request
做最初的處理,拿到Response
做最后的處理
- 不用擔心響應(yīng)和重定向之間的中間響應(yīng)(發(fā)生重定向也只調(diào)用一次)
- 始終調(diào)用一次,即使Http響應(yīng)是從緩存中提供的
- 關(guān)注原始的
request
,而不關(guān)心注入的headers
,比如if-None-Match
- 允許短路,并且不調(diào)用
chain.proceed()
(意思是可通過緩存返回Response
實例)- 允許請求失敗重試,并多次調(diào)用
chain.proceed()
networkInterceptors:對發(fā)出去的
Request
做最后的處理,拿到Response
時做最初的處理
- 允許像重定向和重試一樣的中間響應(yīng)(重定向多次調(diào)用)
- 網(wǎng)絡(luò)發(fā)生短路時不調(diào)用緩存響應(yīng)(必須通過
chain.proceed()
獲取)- 在數(shù)據(jù)被傳遞到網(wǎng)絡(luò)時觀察數(shù)據(jù)(網(wǎng)絡(luò)請求過程日志)
- 有權(quán)獲得裝載請求的連接(可通過
chain.connection()
獲取)
4.總結(jié)
至此,OkHttp
的整個請求過程大致過了一遍,最為核心的內(nèi)容為攔截器部分
,因為所有請求的創(chuàng)建,連接,響應(yīng)都是在這里面處理的
關(guān)于連接池的復(fù)用是通過connectionPool
處理的,他會將要執(zhí)行的RealConnection
維護到一個隊列中,并且會清理掉keepAliveDurationNs
超時和longestIdleDurationNs
超時的連接,然后執(zhí)行網(wǎng)絡(luò)請求時會從隊列中獲取合適的連接,獲取不到則建立新的連接