okhttp
1.基本使用
初始化可以添加自定義的攔截器
OkHttpClient okHttpClient = new OkHttpClient.Builder()
.connectTimeout(30, TimeUnit.SECONDS)
.writeTimeout(30, TimeUnit.SECONDS)
.readTimeout(30, TimeUnit.SECONDS)
.addInterceptor(interceptorImpl).builder();//創(chuàng)建OKHttpClient的Builder
使用方法
String url = "http://wwww.baidu.com";
final Request request = new Request.Builder()
.url(url)
.get()//默認(rèn)就是GET請求赴捞,可以不寫
.build();
Call call = okHttpClient.newCall(request);
call.enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
Log.d(TAG, "onFailure: ");
}
@Override
public void onResponse(Call call, Response response) throws IOException {
Log.d(TAG, "onResponse: " + response.body().string());
}
});
一般的使用大致就是這樣的
2.從OkHttpClient創(chuàng)建開始入手分析
OkHttpClient.Builder()使用builder模式逼裆,用戶可以自定義相應(yīng)的參數(shù)
開發(fā)一般會用到的是
.connectTimeout(30, TimeUnit.SECONDS)
.writeTimeout(30, TimeUnit.SECONDS)
.readTimeout(30, TimeUnit.SECONDS)
.addInterceptor(interceptorImpl)
連接時間,寫時間螟炫,讀時間以及對應(yīng)的Interceptor相關(guān)的攔截器
3.構(gòu)建Request
Request用的也是Builder模式波附,好處主要是可以動態(tài)配置相應(yīng)的參數(shù)
Request(Builder builder) {
this.url = builder.url;
this.method = builder.method;
this.headers = builder.headers.build();
this.body = builder.body;
this.tags = Util.immutableMap(builder.tags);
}
tag主要是做標(biāo)識的,請求返回為null時候的標(biāo)識操作
4.構(gòu)建Call
構(gòu)建Call,主要是調(diào)用RealCall.newRealCall方法昼钻,并在其內(nèi)部添加了一個事件回調(diào)監(jiān)聽
/**
* Prepares the {@code request} to be executed at some point in the future.
*/
@Override public Call newCall(Request request) {
return RealCall.newRealCall(this, request, false /* for web socket */);
}
//RealCall
static RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
// Safely publish the Call instance to the EventListener.
RealCall call = new RealCall(client, originalRequest, forWebSocket);
//添加一個事件回調(diào)掸屡,后續(xù)會有用處
call.eventListener = client.eventListenerFactory().create(call);
return call;
}
而在newRealCall方法中同時也調(diào)用了RealCall的構(gòu)造方法
構(gòu)造方法中加入了RetryAndFollowUpInterceptor重試攔截器,okhttp中加入了很多攔截器然评,這也是一大特色
private RealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
this.client = client;
this.originalRequest = originalRequest;
this.forWebSocket = forWebSocket;
this.retryAndFollowUpInterceptor = new RetryAndFollowUpInterceptor(client, forWebSocket);
}
5. 執(zhí)行異步請求enqueue
@Override public void enqueue(Callback responseCallback) {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
eventListener.callStart(this);
client.dispatcher().enqueue(new AsyncCall(responseCallback));
}
executed以及synchronized主要是用來防止重復(fù)操作和多線程同步用的
接下來的方法
private void captureCallStackTrace() {
Object callStackTrace = Platform.get().getStackTraceForCloseable("response.body().close()");
retryAndFollowUpInterceptor.setCallStackTrace(callStackTrace);
}
重試監(jiān)聽器做一些棧StackTrace記錄仅财,以及eventListener.callStart(this);事件監(jiān)聽做回調(diào)處理,不影響流程
接著就到了Dispatcher的enqueue方法
/** Ready async calls in the order they'll be run. */
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
/** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
/** Running synchronous calls. Includes canceled calls that haven't finished yet. */
private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
synchronized void enqueue(AsyncCall call) {
if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
runningAsyncCalls.add(call);
executorService().execute(call);
} else {
readyAsyncCalls.add(call);
}
}
Dispatcher中定義三個隊列分別是readyAsyncCalls異步等待碗淌,同步運行runningAsyncCalls以及runningSyncCalls異步運行隊列盏求,enqueue方法中,當(dāng)運行異步隊列個數(shù)小于最大請求數(shù)(64)并且同一Host請求個數(shù)小于maxRequestsPerHost(5)則加入異步運行隊列亿眠,并且用線程執(zhí)行碎罚,否則加入異步等待隊列中,這是okhttp的線程隊列優(yōu)化
6.查看AsyncCall的run方法
AsyncCall 繼承了NamedRunnable纳像,其內(nèi)部會run方法會調(diào)用execute()荆烈,代碼如下
@Override protected void execute() {
boolean signalledCallback = false;
try {
Response response = getResponseWithInterceptorChain();
if (retryAndFollowUpInterceptor.isCanceled()) {
signalledCallback = true;
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
signalledCallback = true;
responseCallback.onResponse(RealCall.this, response);
}
} catch (IOException e) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
eventListener.callFailed(RealCall.this, e);
responseCallback.onFailure(RealCall.this, e);
}
} finally {
client.dispatcher().finished(this);
}
}
}
signalledCallback這個標(biāo)識用來處理是否打印對應(yīng)的日志,這里可以看到Response類竟趾,說明網(wǎng)絡(luò)請求是在getResponseWithInterceptorChain中完成的憔购,之后會回調(diào)當(dāng)前的Call狀態(tài)值
7.真正的網(wǎng)絡(luò)請求的getResponseWithInterceptorChain
Response getResponseWithInterceptorChain() throws IOException {
// Build a full stack of interceptors.
List<Interceptor> interceptors = new ArrayList<>();
interceptors.addAll(client.interceptors());
//失敗重試攔截器
interceptors.add(retryAndFollowUpInterceptor);
//request和response攔截器
interceptors.add(new BridgeInterceptor(client.cookieJar()));
//緩存攔截器
interceptors.add(new CacheInterceptor(client.internalCache()));
//網(wǎng)絡(luò)請求連接攔截器
interceptors.add(new ConnectInterceptor(client));
if (!forWebSocket) {
//網(wǎng)絡(luò)攔截器
interceptors.addAll(client.networkInterceptors());
}
//實際網(wǎng)絡(luò)請求的攔截器
interceptors.add(new CallServerInterceptor(forWebSocket));
Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
originalRequest, this, eventListener, client.connectTimeoutMillis(),
client.readTimeoutMillis(), client.writeTimeoutMillis());
return chain.proceed(originalRequest);
}
加入各式各樣的攔截器,各個攔截器之間不耦合岔帽,易于用戶的自己配置玫鸟,最后調(diào)用RealInterceptorChain的proceed方法
8.RealInterceptorChain的proceed方法
public RealInterceptorChain(List<Interceptor> interceptors, StreamAllocation streamAllocation,
HttpCodec httpCodec, RealConnection connection, int index, Request request, Call call,
EventListener eventListener, int connectTimeout, int readTimeout, int writeTimeout) {
this.interceptors = interceptors;
this.connection = connection;
this.streamAllocation = streamAllocation;
this.httpCodec = httpCodec;
this.index = index;
this.request = request;
this.call = call;
this.eventListener = eventListener;
this.connectTimeout = connectTimeout;
this.readTimeout = readTimeout;
this.writeTimeout = writeTimeout;
}
public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
RealConnection connection) throws IOException {
if (index >= interceptors.size()) throw new AssertionError();
calls++;
//...
// Call the next interceptor in the chain.
RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,
writeTimeout);
Interceptor interceptor = interceptors.get(index);
Response response = interceptor.intercept(next);
// .....
return response;
}
構(gòu)造方法中加入了eventListener事件監(jiān)聽,看來okhttp中eventListener的監(jiān)聽一直延伸到這里犀勒,還加入了
this.connectTimeout = connectTimeout;
this.readTimeout = readTimeout;
this.writeTimeout = writeTimeout;
連接時間的配置
要重點關(guān)注的是index這個字段屎飘,前面?zhèn)鬟M(jìn)來的時候,默認(rèn)是0贾费,而在proceed方法中枚碗,又重新執(zhí)行了RealInterceptorChain的構(gòu)造方法,并通過 interceptors.get(index)獲取下一個攔截器铸本,并且執(zhí)行interceptor.intercept(next)方法,隨便找一個攔截器看看
public final class BridgeInterceptor implements Interceptor {
@Override public Response intercept(Chain chain) throws IOException {
//省略部分代碼
Response networkResponse = chain.proceed(requestBuilder.build());
//省略部分代碼
return responseBuilder.build();
}
}
攔截器內(nèi)部又重新調(diào)用了chain.proceed的方法遵堵,這和遞歸操作類似箱玷,也是okHttp最經(jīng)典的責(zé)任鏈模式怨规。
9.同步操作
@Override public Response execute() throws IOException {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
eventListener.callStart(this);
try {
client.dispatcher().executed(this);
Response result = getResponseWithInterceptorChain();
if (result == null) throw new IOException("Canceled");
return result;
} catch (IOException e) {
eventListener.callFailed(this, e);
throw e;
} finally {
client.dispatcher().finished(this);
}
}
同步請求也是通過getResponseWithInterceptorChain來完成的,流程更簡單
10.大致的流程圖
總結(jié)
OkHttp 主要是通過 5 個攔截器和 3 個隊列(同步隊列锡足,異步隊列波丰,等待隊列)工作,內(nèi)部實現(xiàn)通過一個責(zé)任鏈模式完成舶得,將網(wǎng)絡(luò)請求的各個階段封裝到各個鏈條中掰烟,實現(xiàn)了各層的解耦。具體流程是>1.構(gòu)建OkHttpClient對象,通過RealCall發(fā)起同步或異步請求,而決定是異步還是同步請求的是由線程分發(fā)器dispatcher來決定沐批。
2.當(dāng)發(fā)起同步請求時會將請求加入到同步隊列中開啟子線程并執(zhí)行纫骑,當(dāng)發(fā)起異步請求時會創(chuàng)建一個線程池,并且判斷請求隊列是否大于最大請求隊列64,請求主機數(shù)是否大于5,如果大于請求添加到異步等待隊列中,否則添加到異步執(zhí)行隊列并執(zhí)行任務(wù)
3.通過Connection實例執(zhí)行請求,經(jīng)過攔截器鏈后拿到data九孩,然后回調(diào)callback的onresponse返回數(shù)據(jù)先馆。
Dispatcher 功能?
他負(fù)責(zé)將每一次Requst進(jìn)行分發(fā)躺彬,壓棧到自己的線程池煤墙,并通過調(diào)用者自己不同的方式進(jìn)行異步和同步處理。很好的維護(hù)了任務(wù)隊列宪拥。
攔截器的作用
**設(shè)置任意數(shù)量的 Intercepter 來對網(wǎng)絡(luò)請求及其響應(yīng)做任何中間處理仿野,比如設(shè)置緩存,Https證書認(rèn)證她君,請求加密脚作,過濾請求,打印log等等犁河。
BridgeInterceptor(橋接):為請求添加請求頭鳖枕,為響應(yīng)添加響應(yīng)頭,完成應(yīng)用層和網(wǎng)絡(luò)層的橋接
RetryAndFollowUpInterceptor(重定向阻):請求失敗重試
CacheInterceptor(緩存):緩存get請求
ConnectInterceptor(連接):內(nèi)部維護(hù)一個連接池,負(fù)責(zé)連接復(fù)用桨螺、創(chuàng)建連接宾符、釋放連接。
CallServerInterceptor(網(wǎng)絡(luò)):真正發(fā)起了網(wǎng)絡(luò)請求灭翔。里面用的okio庫魏烫,主要是segment的機制運用內(nèi)存共享和復(fù)用,數(shù)據(jù)不需要進(jìn)行二次copy,盡可能少的去申請內(nèi)存肝箱,同時也就降低了GC的頻率哄褒。強了流與流交互,優(yōu)化緩存策略減小內(nèi)存壓力和性能消耗煌张。
okhttp底層用的socket通信呐赡,而socket底層是tcp/ip傳輸協(xié)議,每次都需要進(jìn)行三次握手四次揮手過程骏融,而請求過程也經(jīng)常是頻繁的链嘀,碎片化的萌狂,為了提高網(wǎng)絡(luò)連接的效率,Okhttp3還實現(xiàn)了connectionPool網(wǎng)絡(luò)連接池進(jìn)行復(fù)用怀泊。
okhttp優(yōu)勢
1.支持 http2茫藏,對一臺機器的所有請求共享同一個 Socket
2.支持透明的 gzip 壓縮響應(yīng)體
3.請求失敗時自動重試主機的其他 ip,自動重定向
4.響應(yīng)緩存可以完全避免網(wǎng)絡(luò)重復(fù)請求
5.內(nèi)置連接池霹琼,支持連接復(fù)用务傲,減少延遲
6.豐富的 API,可擴展性好
7.框架使用了很多設(shè)計模式
Okhttp 運用了設(shè)計模式枣申?
1.構(gòu)造者模式(OkhttpClient,Request 等各種對象的創(chuàng)建)
2.工廠模式(在 Call 接口中售葡,有一個內(nèi)部工廠 realCallFactory 接口。)
3.單例模式(每個 OkHttpClient 對象都管理自己獨有的線程池和連接池糯而。 這一點很多同學(xué)天通,甚至在我經(jīng)歷的團隊中就有人踩過坑, 每一個請求都創(chuàng)建一個 OkHttpClient 導(dǎo)致內(nèi)存爆掉)
5.責(zé)任鏈模式(攔截器的鏈?zhǔn)秸{(diào)用)降低邏輯的耦合熄驼,相互獨立的邏輯寫到自己的攔截器中像寒,也無需關(guān)注其它攔截器所做的事情。還擴展性強瓜贾,可以添加新的攔截器诺祸。*
6.享元模式(Dispatcher 的線程池中實現(xiàn)了對象復(fù)用)
retrofit
1.使用
- 1、創(chuàng)建HTTP接口首先創(chuàng)建HTTP的API服務(wù)接口祭芦,接口下的一個方法對應(yīng)HTTP的一個請求筷笨,方法上面的注解表示請求的接口地址部分,返回類型是請求的返回值類型龟劲,方法的注解參數(shù)即是請求的參數(shù)胃夏。
public interface ApiService {
/**
* 登錄:
* @param body
* @return
*/
@POST("/ny/consumer/login")
Observable<BaseResponse<User>> login(@Body RequestBody body);
}
- 2、構(gòu)建Retrofit實例配置OkHttpClient實例昌跌;設(shè)置HTTP接口的域名地址仰禀;添加RxJava2網(wǎng)絡(luò)請求適配器工廠;添加Gson數(shù)據(jù)轉(zhuǎn)換器工廠蚕愤;
mRetrofit = new Retrofit.Builder()
.client(sOkHttpClient)
.baseUrl(HOST)
.addCallAdapterFactory(RxJava2CallAdapterFactory.create())
.addConverterFactory(GsonConverterFactory.create())
.build();
- 3答恶、生成ApiService的動態(tài)代理通過Retrofit生成動態(tài)代理,用于發(fā)起HTTP請求萍诱。
mApiService = sRetrofit.create(ApiService.class);
- 4悬嗓、發(fā)起請求:使用動態(tài)代理發(fā)起HTTP請求。
getApiService().login(requestBody);
2裕坊、源碼解析
2.1包竹、Retrofit實例的構(gòu)建
Retrofit實例的構(gòu)建使用建造者模式,包括
1、okhttp3.Call.Factory也就是OkHttpClient映企,因為OkHttpClient實現(xiàn)了okhttp3.Call.Factory悟狱,用于發(fā)起請求。
2堰氓、Executor 用于提交回調(diào)任務(wù),默認(rèn)使用Platform.Android的MainThreadExecutor苹享,其實現(xiàn)了Executor接口双絮,并在執(zhí)行回調(diào)中實現(xiàn)了主線程Handler的handler.post(runnable)操作,用于將異步請求的回調(diào)結(jié)果從子線程切換到主線程得问。
static class MainThreadExecutor implements Executor {
private final Handler handler = new Handler(Looper.getMainLooper());
@Override public void execute(Runnable r) {
handler.post(r);
}
}
3囤攀、List<CallAdapter.Factory>
網(wǎng)絡(luò)請求適配器工廠,默認(rèn)使用Platform.Android的ExecutorCallAdapterFactory宫纬,該網(wǎng)絡(luò)請求適配器工廠適配的網(wǎng)絡(luò)請求是ExecutorCallbackCall焚挠。
@Override public @Nullable CallAdapter<?, ?> get(
Type returnType, Annotation[] annotations, Retrofit retrofit) {
if (getRawType(returnType) != Call.class) {
return null;
}
final Type responseType = Utils.getCallResponseType(returnType);
return new CallAdapter<Object, Call<?>>() {
@Override public Type responseType() {
return responseType;
}
@Override public Call<Object> adapt(Call<Object> call) {
return new ExecutorCallbackCall<>(callbackExecutor, call);
}
};
}
4、List<Converter.Factory> 數(shù)據(jù)轉(zhuǎn)換器工廠漓骚,默認(rèn)使用的是Platform.Android的OptionalConverterFactory蝌衔,該數(shù)據(jù)轉(zhuǎn)換器工廠使用的是默認(rèn)的OptionalConverter。
2.2蝌蹂、生成ApiService的動態(tài)代理并發(fā)起HTTP請求
采用動態(tài)代理可以非常靈活地實現(xiàn)解耦噩斟,傳入ApiService的Class對象,Proxy提供了用于創(chuàng)建動態(tài)代理對象的靜態(tài)方法孤个,執(zhí)行動態(tài)代理實例的每個方法時都會被替換為執(zhí)行InvocationHandler對象的invoke方法剃允。
public <T> T create(final Class<T> service){
}
InvocationHandler對象的invoke方法中最后調(diào)用的是ServiceMethod的invoke方法:
return loadServiceMethod(method).invoke(args != null ? args : emptyArgs);
也就是HttpServiceMethod實現(xiàn)的invoke方法:
方法中進(jìn)行了網(wǎng)絡(luò)請求適配器對OkHttpCall進(jìn)行的適配,也就是說網(wǎng)絡(luò)請求是使用OkHttpCall進(jìn)行的齐鲤,但返回類型由網(wǎng)絡(luò)請求適配器進(jìn)行適配斥废。
@Override ReturnT invoke(Object[] args) {
return callAdapter.adapt(
new OkHttpCall<>(requestFactory, args, callFactory, responseConverter));
}
從上文配置請求網(wǎng)絡(luò)適配器工廠我們知道,默認(rèn)的網(wǎng)絡(luò)請求適配器適配的是ExecutorCallbackCall给郊,故默認(rèn)使用ExecutorCallbackCall的enqueue做異步網(wǎng)絡(luò)請求:
@Override public void enqueue(final Callback<T> callback) {
delegate.enqueue(new Callback<T>() {
@Override public void onResponse(Call<T> call, final Response<T> response) {
callbackExecutor.execute(new Runnable() {
@Override public void run() {
if (delegate.isCanceled()) {
callback.onFailure(ExecutorCallbackCall.this, new IOException("Canceled"));
} else {
callback.onResponse(ExecutorCallbackCall.this, response);
}
}
});
}
@Override public void onFailure(Call<T> call, final Throwable t) {
callbackExecutor.execute(new Runnable() {
@Override public void run() {
callback.onFailure(ExecutorCallbackCall.this, t);
}
});
}
});
}
這里的delegate就是OkHttpCall實例牡肉,callbackExecutor就是MainThreadExecutor實例,OkHttpCall異步請求回調(diào)后丑罪,使用MainThreadExecutor提交回調(diào)任務(wù)荚板,該任務(wù)執(zhí)行的就是在異步請求回調(diào)的子線程中將異步請求的回調(diào)結(jié)果從子線程切換到主線程。
其中OkHttpCall的異步請求方法中吩屹,調(diào)用的就是okhttp3.Call的異步請求跪另,回調(diào)結(jié)果中會使用Converter<ResponseBody, T>對數(shù)據(jù)進(jìn)行轉(zhuǎn)換并返回。
T body = responseConverter.convert(catchingBody);
2.3煤搜、RxJava網(wǎng)絡(luò)請求適配器工廠
使用RxJava2CallAdapter做網(wǎng)絡(luò)請求適配器免绿,將Call轉(zhuǎn)換為Observable<Response>。
@Override public Object adapt(Call<R> call) {
Observable<Response<R>> responseObservable = isAsync
? new CallEnqueueObservable<>(call)
: new CallExecuteObservable<>(call);
Observable<?> observable;
if (isResult) {
observable = new ResultObservable<>(responseObservable);
} else if (isBody) {
observable = new BodyObservable<>(responseObservable);
} else {
observable = responseObservable;
}
if (scheduler != null) {
observable = observable.subscribeOn(scheduler);
}
if (isFlowable) {
return observable.toFlowable(BackpressureStrategy.LATEST);
}
if (isSingle) {
return observable.singleOrError();
}
if (isMaybe) {
return observable.singleElement();
}
if (isCompletable) {
return observable.ignoreElements();
}
return observable;
}
2.4擦盾、Gson數(shù)據(jù)轉(zhuǎn)換器工廠
使用GsonResponseBodyConverter做數(shù)據(jù)轉(zhuǎn)換器嘲驾,將ResponseBody轉(zhuǎn)換為T淌哟。
@Override public T convert(ResponseBody value) throws IOException {
JsonReader jsonReader = gson.newJsonReader(value.charStream());
try {
T result = adapter.read(jsonReader);
if (jsonReader.peek() != JsonToken.END_DOCUMENT) {
throw new JsonIOException("JSON document was not fully consumed.");
}
return result;
} finally {
value.close();
}
}
2.5 流程圖
總結(jié)
定義:Retrofit就是一個網(wǎng)絡(luò)請求框架的封裝,通過java接口以及注解來描述網(wǎng)絡(luò)請求辽故,并用動態(tài)代理的方式在調(diào)用接口方法時注入自己的方法徒仓,本身只是簡化了用戶網(wǎng)絡(luò)請求的參數(shù)配置等,底層的網(wǎng)絡(luò)請求還是Okhttp誊垢,請求完成后將返回的response通過converterFactorty轉(zhuǎn)換成相應(yīng)的數(shù)據(jù)model掉弛,最后通過calladapter轉(zhuǎn)換成其他數(shù)據(jù)類型,比如Rxjava的 Observable)喂走。所以可以很好的與Rxjava相結(jié)合殃饿,使用起來簡潔方便
代理模式:通過訪問代理對象的方式來間接訪問目標(biāo)對象
分為靜態(tài)代理 & 動態(tài)代理:
靜態(tài)代理:代理類在程序運行前已經(jīng)存在的代理方式
動態(tài)代理:代理類在程序運行前不存在、運行時由程序動態(tài)生成的代理方式
retrofit通過Proxy.newProxyInstance產(chǎn)生的代理類芋肠,當(dāng)調(diào)用接口方法時都會調(diào)用InvocationHandler#invoke方法得到Http請求鏈接乎芳、請求方法、請求路徑帖池、請求參數(shù)等請求信息奈惑,構(gòu)建一個OkHttp的請求并執(zhí)行。
Retrofit 優(yōu)點
1.可以配置不同 HTTP client 來實現(xiàn)網(wǎng)絡(luò)請求碘裕,如 okhttp携取、httpclient 等;
2.請求的方法參數(shù)注解都可以定制帮孔;
3.支持同步雷滋、異步和 RxJava;
4.超級解耦文兢;
5.可以配置不同的反序列化工具來解析數(shù)據(jù)晤斩,如 json、xml;
6.框架使用了很多設(shè)計模式:代理模式姆坚,構(gòu)造者模式澳泵,工廠模式,適配器模式兼呵,觀察者模式兔辅,外觀模式。
RXjava:
RxJava采取的是觀察者模式击喂,使用時要先分別創(chuàng)建一個觀察者Observer或Subscriber處理接收到數(shù)據(jù)后要做的處理(onNext,onError,onCompleted)维苔,一個被觀察者Observable用來發(fā)送要處理的數(shù)據(jù),最后由被觀察者訂閱觀察者(subscribe)懂昂,這時要發(fā)送的數(shù)據(jù)就會由被觀察發(fā)出介时,然后觀察者做出相應(yīng)處理。用代碼來簡單描述為:
//創(chuàng)建被觀察者
Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("hello world");
}
});
//創(chuàng)建觀察者
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onCompleted() {
//completed
}
@Override
public void onError(Throwable e) {
//error
}
@Override
public void onNext(String s) {
//do it
}
};
//訂閱事件
observable.subscribe(subscriber);
這里有個問題,為什么是被觀察者訂閱觀察者而不觀察者訂閱被觀察者呢沸柔?我認(rèn)為應(yīng)該是這樣:被觀察者循衰,即發(fā)送數(shù)據(jù)方,他的數(shù)據(jù)可以發(fā)送給多個觀察者褐澎,即可以有多個觀察者觀察他会钝,因此他是占據(jù)主導(dǎo)權(quán)的,他想讓哪個觀察者看就訂閱哪個觀察者把數(shù)據(jù)發(fā)給他工三。
當(dāng)然顽素,觀察者,被觀察者及訂閱的代碼還有很多簡單的書寫方式徒蟆,如直接使用just()等方法發(fā)送數(shù)據(jù),不創(chuàng)建觀察者而是在subscribe()方法中傳遞幾個Action()方法等等型型,在這里我只是展示了最基本的一套用法用來比較清晰地梳理一下工作流程曾棕。除此之外菱阵,RxJava還可以切換線程,可以對數(shù)據(jù)進(jìn)行變換,這些都是在訂閱過程中完成的民傻,代碼如下:
observable.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.map(new Func1<String,Integer>() {
@Override
public Integer call(String s) {
//do String --> Integer
return 0;
}
})
.filter(new Func1<Integer,Boolean>() {
@Override
public Boolean call(Integer integer) {
//return your boolean
return integer>10;
}
})
.subscribe(subscriber);
其中,在切換線程時拨黔,subscribeOn指定subscribe發(fā)生的線程称杨,observeOn指定Subscriber的回調(diào)發(fā)生的線程,其他操作符如過濾砌烁、變換操作符可以自己琢磨一下筐喳。
以上,就是RxJava大體上的使用流程函喉,接下來我將從源碼角度看一下他的實際工作過程避归。
工作原理
1.創(chuàng)建Observable
首先看看Observable里面有哪些變量:
final OnSubscribe<T> onSubscribe;
然后看看create的源碼:
public final static <T> Observable<T> create(OnSubscribe<T> f) {
return new Observable<T>(hook.onCreate(f));
}
通過傳入一個OnSubscribe對象,將其作為參數(shù)傳入hook.onCreate()方法管呵,將返回值作為參數(shù)構(gòu)造一個Observable對象梳毙。這里的 hook 是一個static的RxJavaObservableExecutionHook對象,他的create()方法是這樣的:
public <T> OnSubscribe<T> onCreate(OnSubscribe<T> f) {
return f;
}
他接受一個OnSubscribe對象捐下,然后就將他返回账锹。hook.create(f) 可以等價的看作是 f 本身,Observable 的構(gòu)造器接受的就是一個OnSubscribe對象了坷襟,看看他的構(gòu)造器:
protected Observable(OnSubscribe<T> f) {
this.onSubscribe = f;
}
這里將傳入的OnSubscribe對象賦給了自己的onSubscribe奸柬。
分析完了,創(chuàng)建Observable就是給他一個OnSubscribe對象啤握,把他傳入構(gòu)造器創(chuàng)建一個Observable對象鸟缕。那么OnSubscribe是什么?
2.OnSubscribe是什么?
看看OnSubscribe源碼:
/**
* Invoked when Observable.subscribe is called.
*/
public interface OnSubscribe<T> extends Action1<Subscriber<? super T>>{
// cover for generics insanity
}
原來是繼承自 Action1 的一個接口懂从,注釋說他在subscribe被調(diào)用的時候喚醒授段,OnSubscribe應(yīng)該就是所謂的“事務(wù)”,他的call方法負(fù)責(zé)發(fā)起事務(wù)番甩,即notifyObservers()侵贵。結(jié)合前面分析的使用過程,在創(chuàng)建Observable時傳入的OnSubscribe中實現(xiàn)了call方法并且執(zhí)行了subscriber的一些方法缘薛。
3.創(chuàng)建觀察者Observer/Subscriber
首先看Observer的定義:
public interface Observer<T> {
void onCompleted();
void onError(Throwable e);
void onNext(T t);
}
只是一個簡單的接口窍育,再看看Subscriber:
public abstract class Subscriber<T> implements Observer<T>,Subscription{
Subscriber是一個實現(xiàn)了Observer接口的抽象類,并且還擴充了許多方法宴胧。既然如此漱抓,那我們在使用RxJava時就應(yīng)當(dāng)盡量用Subscriber代替Observer了。
看看他有哪些屬性:
// represents requested not set yet
private static final Long NOT_SET = Long.MIN_VALUE;
private final SubscriptionList subscriptions;
private final Subscriber<?> subscriber;
/* protected by `this` */
private Producer producer;
/* protected by `this` */
private long requested = NOT_SET; // default to not set
他持有一個自己的引用恕齐,一個SubscriptionList引用乞娄。分析一下,創(chuàng)建一個觀察者Subscriber显歧,就必須要實現(xiàn)來自O(shè)bserver接口的三個方法:onNext(), onError(), onCompleted()仪或。
4.訂閱subscribe
創(chuàng)建好觀察者和被觀察者之后,就可以進(jìn)行訂閱了:
public final Subscription subscribe(Subscriber<? super T> subscriber) {
return Observable.subscribe(subscriber, this);
}
在subscribe方法中又調(diào)用了Observer的一個私有方法:
private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
// validate and proceed
if (subscriber == null) {
throw new IllegalArgumentException("observer can not be null");
}
if (observable.onSubscribe == null) {
throw new IllegalStateException("onSubscribe function can not be null.");
/*
* the subscribe function can also be overridden but generally that's not the appropriate approach
* so I won't mention that in the exception
*/
}
// new Subscriber so onStart it
subscriber.onStart();
/*
* See Guideline 6.4: Protect calls to user code from within an operator · Issue #216 · ReactiveX/RxJava for discussion on "Guideline 6.4: Protect calls
* to user code from within an Observer"
*/
// if not already wrapped
if (!(subscriber instanceof SafeSubscriber)) {
// assign to `observer` so we return the protected version
subscriber = new SafeSubscriber<T>(subscriber);
}
// The code below is exactly the same an unsafeSubscribe but not used because it would
// add a significant depth to already huge call stacks.
try {
// allow the hook to intercept and/or decorate
hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
return hook.onSubscribeReturn(subscriber);
} catch (Throwable e) {
// special handling for certain Throwable/Error/Exception types
Exceptions.throwIfFatal(e);
// if an unhandled error occurs executing the onSubscribe we will propagate it
try {
subscriber.onError(hook.onSubscribeError(e));
} catch (Throwable e2) {
Exceptions.throwIfFatal(e2);
// if this happens it means the onError itself failed (perhaps an invalid function implementation)
// so we are unable to propagate the error correctly and will just throw
RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
// TODO could the hook be the cause of the error in the on error handling.
hook.onSubscribeError(r);
// TODO why aren't we throwing the hook's return value.
throw r;
}
return Subscriptions.unsubscribed();
}
}
精簡一下這個方法士骤,逐步分析范删。首先,對傳入的subscriber對象和observable.onSubscribe方法判空拷肌,然后執(zhí)行了sunscriber的start()方法到旦,之后對subscriber做了安全性封裝:
if (!(subscriber instanceof SafeSubscriber)) {
// assign to `observer` so we return the protected version
subscriber = new SafeSubscriber<T>(subscriber);
}
所有檢測完畢,開始執(zhí)行下列方法:
hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
return hook.onSubscribeReturn(subscriber);
又看到那個熟悉的hook了廓块,看看他的onSubscribeStart方法是怎樣實現(xiàn)的:
public <T> OnSubscribe<T> onSubscribeStart(Observable<? extends T> observableInstance, final OnSubscribe<T> onSubscribe) {
// pass-thru by default
return onSubscribe;
}
傳給了observer對象和他的onSubscribe對象厢绝,結(jié)果直接把后者返回了……有趣的設(shè)計,返回之后又繼續(xù)調(diào)用了onSubscribe對象的call()方法带猴,并傳入了subscriber對象昔汉。
5.Subscription
在分析訂閱部分代碼時,我發(fā)現(xiàn)了subscribe()方法完成后拴清,執(zhí)行了Subscriptions的unsubscribed()方法并返回靶病。這個Subscription是什么呢?
/**
* Subscription returns from {@link Observable#subscribe(Subscriber)} to allow unsubscribing.
* <p>
* See the utilities in {@link Subscriptions} and the implementations in the {@code rx.subscriptions} package.
* <p>
* This interface is the RxJava equivalent of {@code IDisposable} in Microsoft's Rx implementation.
*/
public interface Subscription {
/**
* Stops the receipt of notifications on the {@link Subscriber} that was registered when this Subscription
* was received.
* <p>
* This allows unregistering an {@link Subscriber} before it has finished receiving all events (i.e. before
* onCompleted is called).
*/
void unsubscribe();
/**
* Indicates whether this {@code Subscription} is currently unsubscribed.
*
* @return {@code true} if this {@code Subscription} is currently unsubscribed, {@code false} otherwise
*/
boolean isUnsubscribed();
}
一直說觀察者模式中要訂閱口予,要訂閱娄周,怎么取消訂閱呢?原來就在這里沪停。我的理解煤辨,Subscription可以理解為一件訂閱事務(wù)裳涛,他有一個取消訂閱和檢測是否取消的方法。每一個訂閱事件众辨,最后是可以返回這樣一個subscription對象的端三。我們完全可以把這個對象收集起來,在需要的時候?qū)⑺∠嗛喚槌埂@缦裣旅孢@樣:
private CompositeSubscription subscriptions = new CompositeSubscription();
//創(chuàng)建一個異步任務(wù)
subscriptions.add(subscription);//將subscription加入合集中
subscriptions.unsubscribe();//取消訂閱
6.變換:map()
RxJava的操作符很多郊闯,我這里只選一個最基礎(chǔ)的map來看看,首先看map代碼如下:
//這段代碼是我在1.1版本中分析的蛛株,在1.2.2中已經(jīng)更改了實現(xiàn)方式团赁,多謝評論區(qū)提醒~
public final <R> Observable<R> map(Func1<? super T, ? extends R> func){
return lift(new OperatorMap<T, R>(func));
}
在內(nèi)部調(diào)用了lift()方法,并將結(jié)果返回了谨履』渡悖可以看到,變換的過程中笋粟,將包含T在內(nèi)的T的基類變換為了包含R在內(nèi)的R的子類剧浸,所以這里重點要看兩個地方,一是lift()如何實現(xiàn)矗钟,二是OperatorMap是什么。先看看lift()方法的實現(xiàn):
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
return new Observable<R>(new OnSubscribe<R>() {
@Override
public void call(Subscriber<? super R> o) {
try {
Subscriber<? super T> st = hook.onLift(operator).call(o);
try {
// new Subscriber created and being subscribed with so 'onStart' it
st.onStart();
onSubscribe.call(st);
} catch (Throwable e) {
// localized capture of errors rather than it skipping all operators
// and ending up in the try/catch of the subscribe method which then
// prevents onErrorResumeNext and other similar approaches to error handling
Exceptions.throwIfFatal(e);
st.onError(e);
}
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// if the lift function failed all we can do is pass the error to the final Subscriber
// as we don't have the operator available to us
o.onError(e);
}
}
});
}
可以看到嫌变,在lift內(nèi)部用類型R又重新創(chuàng)建了一個Observable吨艇。注意觀察,這里的代碼和調(diào)用subscribe()時很像腾啥,但又不同东涡。對比一下發(fā)現(xiàn),在subscribe()時倘待,是由Observable自己的OnSubscribe調(diào)用了call()方法疮跑,并將自己的subscriber作為參數(shù)傳入call()。但是在這里凸舵,通過一個新的OnSubscribe創(chuàng)建了一個新的Observable祖娘,在內(nèi)部先創(chuàng)建了一個新的Subscriber,然后由舊的onSubscribe調(diào)用自己的call()方法啊奄,這里傳入的又是新的Subscriber渐苏。新舊之間的關(guān)聯(lián)就在于新的SUbscriber創(chuàng)建的過程:
Subscriber<? super T> st = hook.onLift(operator).call(o);
可以看到,創(chuàng)建新的Subscriber時用到了我們傳入的operator菇夸,看看hook的lift()實現(xiàn):
public <T, R> Operator<? extends R, ? super T> onLift(final Operator<? extends R, ? super T> lift) {
return lift;
}
把傳入的operator又原樣返回了琼富。那么前面的代碼就可以簡化為operator直接調(diào)用了call()方法。我們自己寫的變換代碼就是實現(xiàn)了這個operator的call()方法庄新。
.map(new Func1<String,Integer>() {
@Override
public Integer call(String s) {
//do String --> Integer
return 0;
}
})
看看前面的代碼鞠眉,我們傳入的這個Func1薯鼠,在內(nèi)部會由他創(chuàng)建一個OperatorMap,然后將OperatorMap傳入了lift()械蹋,這個OperatorMap就是我們剛才講的operator的來源出皇。
再來看看OperatorMap是什么:
public final class OperatorMap<T, R> implements Operator<R, T> {
private final Func1<? super T, ? extends R> transformer;
public OperatorMap(Func1<? super T, ? extends R> transformer) {
this.transformer = transformer;
}
@Override
public Subscriber<? super T> call(final Subscriber<? super R> o) {
return new Subscriber<T>(o) {
@Override
public void onCompleted() {
o.onCompleted();
}
@Override
public void onError(Throwable e) {
o.onError(e);
}
@Override
public void onNext(T t) {
try {
o.onNext(transformer.call(t));
} catch (Throwable e) {
Exceptions.throwOrReport(e, this, t);
}
}
};
}
}
可以看到,當(dāng)構(gòu)造一個OperatorMap時朝蜘,傳入了一個func恶迈,在OperatorMap構(gòu)造器中,是將其賦給了自己記得一個叫做transformer的屬性谱醇,這個transformer是一個Func1對象暇仲,因此我們的實現(xiàn)變換的主要細(xì)節(jié)其實就在于這個Func1。
7.變換:compose()
除了最基礎(chǔ)的mao進(jìn)行變換外副渴,我們常用的還有compose變換奈附,看看他是怎么實現(xiàn)的:
public <R> Observable<R> compose(Transformer<? super T, ? extends R> transformer) {
return ((Transformer<T, R>) transformer).call(this);
}
通過compose變換時,傳入的是一個transfomer煮剧,最后調(diào)用的是他的call()方法斥滤。transfomer就是前面Map變換中用到的那個,綜合來看勉盅,在RxJava中進(jìn)行變換時佑颇,是通過創(chuàng)建新的Observable進(jìn)行代理來實現(xiàn)的,而具體實現(xiàn)細(xì)節(jié)使用了transformer草娜。
8.線程切換:subscrieOn()
subscribeOn()指定了subscribe()所發(fā)生的線程挑胸,看看他是怎樣實現(xiàn)的:
public final Observable<T> subscribeOn(Scheduler scheduler) {
if (this instanceof ScalarSynchronousObservable) {
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
}
return nest().lift(new OperatorSubscribeOn<T>(scheduler));
}
先調(diào)用了nest(),返回一個Observable對象宰闰,然后調(diào)用lift()進(jìn)行變換茬贵,進(jìn)行變換時傳入的是一個由線程調(diào)度器scheduler構(gòu)造的OperatorSubscribeOn對象。先看看nest中發(fā)生了什么:
public final Observable<Observable<T>> nest() {
return just(this);
}
把自己傳給了just()方法:
public final static <T> Observable<T> just(final T value) {
return ScalarSynchronousObservable.create(value);
}
這個ScalarSynchronousObservable是繼承自O(shè)bservabel的移袍,到頭來還是調(diào)用create()創(chuàng)建了一個Observable對象解藻。lift()方法前面已經(jīng)分析過了,lift()中創(chuàng)建了一個新的Observable葡盗,這里不同的地方在于傳入的是一個線程調(diào)度器scheduler而非OperatorMap螟左,所以線程調(diào)度的具體實現(xiàn)應(yīng)該就是由scheduler和OperatorSubscribeOn來決定的了。那么接下來就看看OperatorSubscribeOn是如何實現(xiàn)線程控制的觅够。首先根據(jù)上面的分析路狮,這里傳入了一個scheduler對象給構(gòu)造器,點進(jìn)來看看他的實現(xiàn)蔚约,:
public class OperatorSubscribeOn<T> implements Operator<T, Observable<T>> {
private final Scheduler scheduler;
public OperatorSubscribeOn(Scheduler scheduler) {
this.scheduler = scheduler;
}
@Override
public Subscriber<? super Observable<T>> call(final Subscriber<? super T> subscriber) {
final Worker inner = scheduler.createWorker();
subscriber.add(inner);
return new Subscriber<Observable<T>>(subscriber) {
@Override
public void onCompleted() {
// ignore because this is a nested Observable and we expect only 1 Observable<T> emitted to onNext
}
@Override
public void onError(Throwable e) {
subscriber.onError(e);
}
@Override
public void onNext(final Observable<T> o) {
inner.schedule(new Action0() {
@Override
public void call() {
final Thread t = Thread.currentThread();
o.unsafeSubscribe(new Subscriber<T>(subscriber) {
@Override
public void onCompleted() {
subscriber.onCompleted();
}
@Override
public void onError(Throwable e) {
subscriber.onError(e);
}
@Override
public void onNext(T t) {
subscriber.onNext(t);
}
@Override
public void setProducer(final Producer producer) {
subscriber.setProducer(new Producer() {
@Override
public void request(final long n) {
if (Thread.currentThread() == t) {
// don't schedule if we're already on the thread (primarily for first setProducer call)
// see unit test 'testSetProducerSynchronousRequest' for more context on this
producer.request(n);
} else {
inner.schedule(new Action0() {
@Override
public void call() {
producer.request(n);
}
});
}
}
});
}
});
}
});
}
};
}
}
可以看到奄妨,在構(gòu)造器中,傳入的scheduler賦給了自己的scheduler苹祟,然后在call方法中砸抛,通過scheduler創(chuàng)建了一個worker對象评雌,名叫inner,之后的所有操作都是由inner完成的直焙【岸總結(jié)一下,就是傳入的scheduler創(chuàng)建了一個worker對象奔誓,由這個對象進(jìn)行了實際上的線程控制斤吐。所以線程控制的關(guān)鍵就在于這個scheduler。而scheduler就是我們在使用過程中傳入的http://Schedulers.io()等厨喂,這里就拿http://Schedulers.io()看看和措。
public static Scheduler io() {
return INSTANCE.ioScheduler;
}
再看看Schedulers類的構(gòu)造器,可以知道INSTANCE.ioSchduler是在構(gòu)造器中進(jìn)行初始化的:
Scheduler io = RxJavaPlugins.getInstance().getSchedulersHook().getIOScheduler();
if (io != null) {
ioScheduler = io;
} else {
ioScheduler = new CachedThreadScheduler();
}
再結(jié)合:
/**
* Scheduler to return from {@link rx.schedulers.Schedulers#io()} or null if default should be used.
*
* This instance should be or behave like a stateless singleton;
*/
public Scheduler getIOScheduler() {
return null;
}
可知蜕煌,ioScheduler是由CacheThreadSchduler這個類創(chuàng)建的派阱,這個類繼承自Scheduler,那么也就是說抽象類Scheduler的createWorker()方法由子類CacheThreadSchduler實現(xiàn)了斜纪。那就來看看這個方法具體的實現(xiàn):
@Override
public Worker createWorker() {
return new EventLoopWorker(pool.get());
}
可以看到贫母,createWorker()方法返回了一個EventLoopWorker對象。而這個類是CacheThreadSchduler類的內(nèi)部類盒刚∠倭樱回憶一下,之前我們創(chuàng)建好worker之后因块,他是如何工作的誓酒?:
inner.schedule(new Action0() {...};
是有這個worker對象調(diào)用了schedule()方法,并且傳入了一個Action0贮聂。那么就來看看worker的源頭,EventLoopWorker寨辩,在schedule()時做了什么:
private static final class EventLoopWorker extends Scheduler.Worker {
/* 省略部分代碼*/
@Override
public Subscription schedule(Action0 action) {
return schedule(action, 0, null);
}
@Override
public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
if (innerSubscription.isUnsubscribed()) {
// don't schedule, we are unsubscribed
return Subscriptions.unsubscribed();
}
ScheduledAction s = threadWorker.scheduleActual(action, delayTime, unit);
innerSubscription.add(s);
s.addParent(innerSubscription);
return s;
}
}
可以看到吓懈,最終實際上是調(diào)用了thread.scheduleActual()方法,并將action傳給了他靡狞,返回一個 ScheduledAction 對象耻警。那么看看這個方法內(nèi)部是如何實現(xiàn)的:
public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit) {
Action0 decoratedAction = schedulersHook.onSchedule(action);
ScheduledAction run = new ScheduledAction(decoratedAction);
Future<?> f;
if (delayTime <= 0) {
f = executor.submit(run);
} else {
f = executor.schedule(run, delayTime, unit);
}
run.add(f);
return run;
}
創(chuàng)建了一個ScheduledAction的對象,并將其返回甸怕,而ScheduledAction類是實現(xiàn)了Runnable接口的:
public final class ScheduledAction extends AtomicReference<Thread> implements Runnable, Subscription {
因此甘穿,具體対線程的操作就是在這里了∩液迹總結(jié)一下温兼,SubscribeOn() 是通過 life() 變換來完成的,而在變換中實際上是通過 CachedThreadScheduler 類提供的 schedule() 方法武契,用Runnable來完成的線程控制募判。
9.線程切換:observeOn()
和 subscribeOn() 方法一樣荡含,observeOn() 方法實現(xiàn)原理也是通過 lift() 變換:
public final Observable<T> observeOn(Scheduler scheduler) {
if (this instanceof ScalarSynchronousObservable) {
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
}
return lift(new OperatorObserveOn<T>(scheduler));
}
類似的,這里也是傳入了一個Operator届垫,不同的是這里傳入的是通過scheduler創(chuàng)建的OperatorObserveOn對象释液。先來看看OperatorObserveOn的構(gòu)造器:
public OperatorObserveOn(Scheduler scheduler) {
this.scheduler = scheduler;
}
類似的,將傳入的 scheduler 賦給了自己的 scheduler 屬性装处。這個scheduler 在哪里用到了呢误债?首先是在回調(diào)方法 call() 中:
@Override
public Subscriber<? super T> call(Subscriber<? super T> child) {
if (scheduler instanceof ImmediateScheduler) {
// avoid overhead, execute directly
return child;
} else if (scheduler instanceof TrampolineScheduler) {
// avoid overhead, execute directly
return child;
} else {
ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child);
parent.init();
return parent;
}
}
通過scheduler創(chuàng)建了一個 ObserveOnSubscriber 對象 parent ,并調(diào)用了 init() 方法妄迁。這個 ObserveOnSubscriber 類是一個內(nèi)部類寝蹈,他的構(gòu)造器:
public ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> child) {
this.child = child;
this.recursiveScheduler = scheduler.createWorker();
if (UnsafeAccess.isUnsafeAvailable()) {
queue = new SpscArrayQueue<Object>(RxRingBuffer.SIZE);
} else {
queue = new SynchronizedQueue<Object>(RxRingBuffer.SIZE);
}
this.scheduledUnsubscribe = new ScheduledUnsubscribe(recursiveScheduler);
}
在這里就調(diào)用了 scheduler 的 createWorker() 方法,并將返回結(jié)果賦給了自己的 recursiveScheduler 判族,然后由他創(chuàng)建了 ScheduledUnsubscribe 對象躺盛,將這個對象賦給了 scheduledUnsubscribe。好像有點亂形帮,大概理一下槽惫,這里創(chuàng)建了一個 Subscriber 對象,在內(nèi)部做了一些初始化的操作辩撑,而這個Subscriber 對象實際上就是 ObserveOnSubscriber 的對象界斜。觀察 ObserveOnSubscriber 類:
@Override
public void onNext(final T t) {
if (isUnsubscribed()) {
return;
}
if (!queue.offer(on.next(t))) {
onError(new MissingBackpressureException());
return;
}
schedule();
}
@Override
public void onCompleted() {
if (isUnsubscribed() || finished) {
return;
}
finished = true;
schedule();
}
@Override
public void onError(final Throwable e) {
if (isUnsubscribed() || finished) {
return;
}
error = e;
// unsubscribe eagerly since time will pass before the scheduled onError results in an unsubscribe event
unsubscribe();
finished = true;
// polling thread should skip any onNext still in the queue
schedule();
}
會發(fā)現(xiàn),在這三個被調(diào)用的方法中都會調(diào)用 schedule() 方法合冀,而 schedule() 方法的實現(xiàn)是這樣的:
protected void schedule() {
if (counter.getAndIncrement() == 0) {
recursiveScheduler.schedule(action);
}
}
注意各薇,這個 recursiveScheduler 就是前面創(chuàng)建的worker。所以控制線程切換的關(guān)鍵還是在于傳入的 scheduler及他所創(chuàng)建的 worker 和worker的 schedule() 方法君躺。傳入的 scheduler 有很多種峭判,就拿 AndroidSchedulers.mainThread() 來說:
public final class AndroidSchedulers {
private AndroidSchedulers() {
throw new AssertionError("No instances");
}
// See Unit testing support for AndroidSchedulers · Issue #238 · ReactiveX/RxAndroid
// Initialization-on-demand holder idiom
private static class MainThreadSchedulerHolder {
static final Scheduler MAIN_THREAD_SCHEDULER =
new HandlerScheduler(new Handler(Looper.getMainLooper()));
}
/** A {@link Scheduler} which executes actions on the Android UI thread. */
public static Scheduler mainThread() {
Scheduler scheduler =
RxAndroidPlugins.getInstance().getSchedulersHook().getMainThreadScheduler();
return scheduler != null ? scheduler : MainThreadSchedulerHolder.MAIN_THREAD_SCHEDULER;
}
}
可以看到,scheduler就是一個 HandlerScheduler 對象棕叫,看HandlerScheduler類的實現(xiàn):
public final class HandlerScheduler extends Scheduler {
/*省略部分代碼*/
HandlerScheduler(Handler handler) {
this.handler = handler;
}
@Override
public Worker createWorker() {
return new HandlerWorker(handler);
}
/*省略部分代碼*/
}
}
HandlerScheduler 類也繼承了 Scheduler 林螃,他的createWorker() 創(chuàng)建了一個HandlerWorker 對象。所以前面創(chuàng)建的worker其實就是 HandlerWorker俺泣。HandlerWorker 類是HandlerScheduler 的內(nèi)部類疗认,他的schedule 方法:
static class HandlerWorker extends Worker {
private final Handler handler;
private final CompositeSubscription compositeSubscription = new CompositeSubscription();
HandlerWorker(Handler handler) {
this.handler = handler;
}
@Override
public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
if (compositeSubscription.isUnsubscribed()) {
return Subscriptions.unsubscribed();
}
action = RxAndroidPlugins.getInstance().getSchedulersHook().onSchedule(action);
final ScheduledAction scheduledAction = new ScheduledAction(action);
scheduledAction.addParent(compositeSubscription);
compositeSubscription.add(scheduledAction);
handler.postDelayed(scheduledAction, unit.toMillis(delayTime));
scheduledAction.add(Subscriptions.create(new Action0() {
@Override
public void call() {
handler.removeCallbacks(scheduledAction);
}
}));
return scheduledAction;
}
@Override
public Subscription schedule(final Action0 action) {
return schedule(action, 0, TimeUnit.MILLISECONDS);
}
可以看到,在 schedule 內(nèi)部還是創(chuàng)建了一個 ScheduledAction 對象伏钠,之后所有的操作都有他來完成横漏。由前面分析可知,ScheduledAction 類實現(xiàn)了 Runnable熟掂。所以歸根結(jié)底缎浇,兩個線程控制都是由 Runnable 來實現(xiàn)的。
總結(jié)
RxJava 是一個 基于事件流赴肚、實現(xiàn)異步操作的庫
原理:被觀察者 (Observable) 通過 訂閱(Subscribe) 按順序發(fā)送事件 給觀察者 (Observer)
觀察者(Observer) 按順序接收事件 & 作出對應(yīng)的響應(yīng)動作
Scheduler的原理(線程切換)
首先是observeOn切換線程华畏,他根據(jù)傳入的參數(shù)(newThread()鹏秋,io(),singleThread()亡笑、AndroidmainThread可以生成不同的線程侣夷,它在調(diào)用onNext方法中執(zhí)行schedule方法內(nèi)部有一個work對象實現(xiàn)了runnable接口,完成了線程切換仑乌。但如果是AndroidmainThread百拓,則由handle發(fā)送postdeply完成到主線程的切換。subscribeOn切換線程與observeOn類似晰甚,但是observeOn是改變它所在線程所以每次切換都有效衙传,而subscribeOn是改變數(shù)據(jù)源的運行線程,只在第一次有效厕九,后續(xù)切換都無效蓖捶,因為subscribeOn自下而上每次在指定線程中向上級訂閱,下次再執(zhí)行subscribeOn只會在改變的線程里進(jìn)行扁远,用戶感受不到線程切換俊鱼。
優(yōu)點
1)采用鏈?zhǔn)秸{(diào)用,代碼簡潔優(yōu)雅有美感畅买,并且可讀性增強并闲。
2)rxjava中采用觀察者模式。模塊之間劃定了清晰的界限谷羞,降低了模塊間的耦合性帝火,提高了代碼的可維護(hù)性和重用性。
3)rxjava中提供了強大的操作符湃缎。
:just:將同種數(shù)據(jù)源組合放到被觀察者上面
from:將類似數(shù)組犀填、集合的數(shù)據(jù)源放到被觀察者上面
map:將一種數(shù)據(jù)源轉(zhuǎn)化成另外一種,可以是任意類型變換是1對1
flatmap:將一種數(shù)據(jù)源轉(zhuǎn)化成另外一種數(shù)據(jù)嗓违,返回ObservableSource對象九巡。可以對數(shù)據(jù)進(jìn)行一對多靠瞎,多對多的變換。flatMap并不保證數(shù)據(jù)有序求妹。
zip:處理多種不同結(jié)果集的數(shù)據(jù)發(fā)射乏盐,一般用得多的地方是多個網(wǎng)絡(luò)請求組合然后統(tǒng)一處理業(yè)務(wù)邏輯。
除此之外還有經(jīng)常用到compose操作符制恍,因為rxjava發(fā)布訂閱如果沒及時取消會內(nèi)存泄漏父能,通過compose與rxlivercycle配合使用綁定容器生命周期。