分析Okhttp源碼的一篇筆記
1.OkHttp的請求網(wǎng)絡(luò)流程
使用就不說了 Okhttp官網(wǎng):http://square.github.io/okhttp/
(1) 從請求處理開始分析
OkHttpClient.newCall(request) 進(jìn)行 execute 或者 enqueue操作 當(dāng)調(diào)用newCall方法時
@Override public Call newCall(Request request) {
return new RealCall(this, request);
}
看到返回的是一個RealCall類, 調(diào)用enqueue 異步請求網(wǎng)絡(luò)實際調(diào)用的是RealCall的enqueue方法幔虏,接下來看一下RealCall的enqueue里面干了什么
void enqueue(Callback responseCallback, boolean forWebSocket) {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
//最終的請求是dispatcher來完成
client.dispatcher().enqueue(new AsyncCall(responseCallback, forWebSocket));
}
So 接下來分析一下dispatcher
(2) Dispatcher任務(wù)調(diào)度
Dispatcher 主要是控制并發(fā)的請求避凝,它主要維護(hù)一下變量:
//最大并發(fā)請求數(shù)
private int maxRequests = 64;
//每個主機(jī)的最大請求數(shù)
private int maxRequestsPerHost = 5;
/** 消費者線程 */
private ExecutorService executorService;
/** 將要運(yùn)行的異步請求隊列 */
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
/** 正在運(yùn)行的異步請求隊列*/
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
/** 正在運(yùn)行的同步請求隊列 */
private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
Dispatcher構(gòu)造方法
public Dispatcher(ExecutorService executorService) {
this.executorService = executorService;
}
public Dispatcher() {
}
public synchronized ExecutorService executorService() {
if (executorService == null) {
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
}
return executorService;
}
兩個構(gòu)造方法喊递,可以使用自己設(shè)定的線程池枢里,如果沒有設(shè)定,則會請求網(wǎng)絡(luò)前自己創(chuàng)建默認(rèn)線程池撼短。這個線程池類似CachedThreadPool,比較適合執(zhí)行大量的耗時比較少的任務(wù)踩寇,其實當(dāng)調(diào)用RealCall的enqueue方法實際上調(diào)用的Dispatcher里面的enqueue方法
synchronized void enqueue(AsyncCall call) {
//如果正在運(yùn)行的異步請求隊列小于64且 正在運(yùn)行的請求主機(jī)數(shù)小于5 則把請求加載到runningAsyncCalls
//中并在線程池中執(zhí)行框沟,否則就加入到readyAsyncCalls中進(jìn)行緩存等待牍氛。
if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
runningAsyncCalls.add(call);
executorService().execute(call);
} else {
readyAsyncCalls.add(call);
}
}
線程池中傳進(jìn)來的參數(shù)AsyncCall晨继,它是RealCall的內(nèi)部類,其內(nèi)部也實現(xiàn)了execute方法
@Override protected void execute() {
boolean signalledCallback = false;
try {
...
} catch (IOException e) {
...
} finally {
//無論這個請求的結(jié)果如何搬俊,都會執(zhí)行 client.dispatcher().finished(this);
client.dispatcher().finished(this);
}
}
}
finished方法如下
synchronized void finished(AsyncCall call) {
//將此次請求 runningAsyncCalls移除后還執(zhí)行promoteCalls方法
if (!runningAsyncCalls.remove(call)) throw new AssertionError("AsyncCall wasn't running!");
promoteCalls();
}
promoteCalls方法如下
private void promoteCalls() {
//如果正在運(yùn)行的異步請求隊列數(shù)大于最大請求數(shù) return
if (runningAsyncCalls.size() >= maxRequests) return;
//若果將要運(yùn)行的異步請求隊列為空 return
if (readyAsyncCalls.isEmpty()) return;
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall call = i.next();//取出下一個請求
if (runningCallsForHost(call) < maxRequestsPerHost) {
i.remove();
runningAsyncCalls.add(call);//加入runningAsyncCalls中并交由線程池處理
executorService().execute(call);
}
if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
}
}
在回AsyncCall的execute方法
@Override protected void execute() {
try {
//getResponseWithInterceptorChain() 返回Response 請求網(wǎng)絡(luò)
Response response = getResponseWithInterceptorChain(forWebSocket);
...
} catch (IOException e) {
...
} finally {
...
}
}
}
(3) Interceptor攔截器
getResponseWithInterceptorChain方法如下
private Response getResponseWithInterceptorChain(boolean forWebSocket) throws IOException {
Interceptor.Chain chain = new ApplicationInterceptorChain(0, originalRequest, forWebSocket);
return chain.proceed(originalRequest);
}
getResponseWithInterceptorChain方法中創(chuàng)建了ApplicationInterceptorChain紊扬,他是一個攔截器鏈,這個類也是RealCall的內(nèi)部類唉擂,接下來執(zhí)行了它的proceed方法
@Override public Response proceed(Request request) throws IOException {
if (index < client.interceptors().size()) {
Interceptor.Chain chain = new ApplicationInterceptorChain(index + 1, request, forWebSocket);
//從攔截器列表中取出攔截器
Interceptor interceptor = client.interceptors().get(index);
Response interceptedResponse = interceptor.intercept(chain);//1
if (interceptedResponse == null) {
throw new NullPointerException("application interceptor " + interceptor
+ " returned null");
}
return interceptedResponse;
}
// 如果沒有更多攔截器的話 執(zhí)行網(wǎng)絡(luò)請求
return getResponse(request, forWebSocket);
}
}
proceed方法每次從攔截器列表中去除攔截器餐屎,當(dāng)存在多個攔截器時都會在上面注釋1處阻塞,并等待下一個攔截器的調(diào)用返回玩祟,下面分別以攔截器中有一個啤挎、兩個攔截器的場景加以模擬
攔截器是一種能夠監(jiān)控、重寫卵凑、重試調(diào)用的機(jī)制。通常情況下用來添加胜臊、移除勺卢、轉(zhuǎn)換請求和響應(yīng)的頭部信息,比如將域名替換為IP地址象对,在請求中添加host屬性黑忱,也可以添加我們應(yīng)用中的一下公共參數(shù),比如設(shè)備id勒魔、版本號等甫煞,回到代碼上來, 最后一行返回getResponse(request, forWebSocket)
來看getResponse做了什么
Response getResponse(Request request, boolean forWebSocket) throws IOException {
engine = new HttpEngine(client, request, false, false, forWebSocket, null, null, null);
int followUpCount = 0;
while (true) {
if (canceled) {
engine.releaseStreamAllocation();
throw new IOException("Canceled");
}
boolean releaseConnection = true;
try {
engine.sendRequest();
engine.readResponse();
releaseConnection = false;
} catch (RequestException e) {
throw e.getCause();
} catch (RouteException e) {
} catch (IOException e) {
...
}
}
}
創(chuàng)建了HttpEngine并且調(diào)用了HttpEngine的sendRequest方法和readResponse方法
(4) 緩存策略
查看一下sendRequest方法
public void sendRequest() throws RequestException, RouteException, IOException {
if (cacheStrategy != null) return; // Already sent.
if (httpStream != null) throw new IllegalStateException();
Request request = networkRequest(userRequest);
// 獲取client中的Cache冠绢,同時Cache在初始化時會讀取緩存目錄中曾經(jīng)請求過的所有信息
InternalCache responseCache = Internal.instance.internalCache(client);
Response cacheCandidate = responseCache != null
? responseCache.get(request)//1
: null;
long now = System.currentTimeMillis();
cacheStrategy = new CacheStrategy.Factory(now, request, cacheCandidate).get();
//網(wǎng)絡(luò)請求
networkRequest = cacheStrategy.networkRequest;
//緩存的響應(yīng)
cacheResponse = cacheStrategy.cacheResponse;
if (responseCache != null) {
//記錄當(dāng)前請求是網(wǎng)絡(luò)發(fā)起還是緩存發(fā)起
responseCache.trackResponse(cacheStrategy);
}
// 不進(jìn)行網(wǎng)絡(luò)請求并且緩存不存在或者過期抚吠,則返回504
if (networkRequest == null && cacheResponse == null) {
userResponse = new Response.Builder()
.request(userRequest)
.priorResponse(stripBody(priorResponse))
.protocol(Protocol.HTTP_1_1)
.code(504)
.message("Unsatisfiable Request (only-if-cached)")
.body(EMPTY_BODY)
.build();
return;
}
// 不進(jìn)行網(wǎng)絡(luò)請求而且緩存可以使用,則直接返回緩存
if (networkRequest == null) {
userResponse = cacheResponse.newBuilder()
.request(userRequest)
.priorResponse(stripBody(priorResponse))
.cacheResponse(stripBody(cacheResponse))
.build();
userResponse = unzip(userResponse);
return;
}
// 需要訪問網(wǎng)絡(luò)時
boolean success = false;
try {
httpStream = connect();
httpStream.setHttpEngine(this);
...
}
}
顯然是發(fā)送請求弟胀,但是最主要的是做了緩存的策略楷力,上面注釋1處cacheCandidate 是上次與服務(wù)器交互緩存的Response,這里緩存均基于Map,key是請求中的url的md5,value是在文件中查詢到的緩存,頁面置換基于LRU算法孵户,現(xiàn)在只需知道cachCandidate是一個可以讀取緩存Header的Response即可萧朝,根據(jù)cacheStrategy的處理得到了networkRequest和cacheResponse都是為nulld的情況下,也就是不進(jìn)行網(wǎng)絡(luò)請求并且緩存不存在或者過期夏哭,這個是返回504錯誤检柬,當(dāng)networkRequest為null時也就是不進(jìn)行網(wǎng)絡(luò)請求,如果緩存可以使用時則直接返回緩存竖配,其他則請求網(wǎng)絡(luò)何址。
接下來查看readResponse方法
public void readResponse() throws IOException {
...
Response networkResponse;
if (forWebSocket) {
//讀取網(wǎng)絡(luò)響應(yīng)
networkResponse = readNetworkResponse();
}
receiveHeaders(networkResponse.headers());
//檢查緩存是否可用里逆,如果可用,就用當(dāng)前緩存的Response头朱,關(guān)閉網(wǎng)絡(luò)連接运悲,釋放連接
if (cacheResponse != null) {
if (validate(cacheResponse, networkResponse)) {//1
userResponse = cacheResponse.newBuilder()
.request(userRequest)
.priorResponse(stripBody(priorResponse))
.headers(combine(cacheResponse.headers(), networkResponse.headers()))
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build();
networkResponse.body().close();
releaseStreamAllocation();
// Update the cache after combining headers but before stripping the
// Content-Encoding header (as performed by initContentStream()).
InternalCache responseCache = Internal.instance.internalCache(client);
responseCache.trackConditionalCacheHit();
responseCache.update(cacheResponse, stripBody(userResponse));
userResponse = unzip(userResponse);
return;
} else {
closeQuietly(cacheResponse.body());
}
}
);
}
這個方法主要用來解析HTTP響應(yīng)報文,如果有緩存并且可用项钮,則用緩存的數(shù)據(jù)并更新緩存班眯,否則就用網(wǎng)絡(luò) 請求返回的數(shù)據(jù),查看注釋1處validate方法是如何判斷緩存是否可用烁巫。
private static boolean validate(Response cached, Response network) {
//如果服務(wù)器返回304署隘,則緩存有效
if (network.code() == HTTP_NOT_MODIFIED) {
return true;
}
//通過緩存和網(wǎng)絡(luò)請求響應(yīng)中的Last-Modified來計算是否是最新數(shù)據(jù),如果是亚隙,則緩存有效
Date lastModified = cached.headers().getDate("Last-Modified");
if (lastModified != null) {
Date networkLastModified = network.headers().getDate("Last-Modified");
if (networkLastModified != null
&& networkLastModified.getTime() < lastModified.getTime()) {
return true;
}
}
return false;
}
如果緩存有效磁餐,則返回304 Not Modified,否者直接返回body阿弃。如果緩存過期或者強(qiáng)制放棄緩存诊霹,則緩存策略全部交給服務(wù)器判斷,客服端只需要發(fā)送條件GET請求即可渣淳。條件GET請求有兩種方式:一種是Last-Modified-Data脾还,另一種ETag,這里采用Last-Modified-Data,通過緩存和網(wǎng)絡(luò)請求響應(yīng)中的Last-Modified來計算是否是最新數(shù)據(jù) 入愧,如果是 則緩存有效
(5) 失敗重連
重回RealCall的getResponse方法
Response getResponse(Request request, boolean forWebSocket) throws IOException {
...
try {
}catch (RouteException e) {
HttpEngine retryEngine = engine.recover(e.getLastConnectException(), null);//1
} catch (IOException e) {
HttpEngine retryEngine = engine.recover(e, null);//2
}
}
}
當(dāng)發(fā)生IOException或者RouteException時都會執(zhí)行HttpEngine的recover方法 鄙漏,代碼如下:
public HttpEngine recover(IOException e, Sink requestBodyOut) {
if (!streamAllocation.recover(e, requestBodyOut)) {
return null;
}
if (!client.retryOnConnectionFailure()) {
return null;
}
StreamAllocation streamAllocation = close();
//重新創(chuàng)建HttpEngine并返回,用來完成重連
return new HttpEngine(client, userRequest, bufferRequestBody, callerWritesRequestBody,
forWebSocket, streamAllocation, (RetryableSink) requestBodyOut, priorResponse);
}
Okhttp請求流程圖