引言
經(jīng)過幾年的發(fā)展般妙,響應式編程已經(jīng)是很流行了陈哑,在Android開發(fā)中的應用也非常的廣泛合瓢,身為Android開發(fā)者,則是必須掌握的技術杨凑。
正文
網(wǎng)上已經(jīng)有很多很多RxJava相關的文章滤奈,視頻等等教程,但是說實話對于入門撩满,或者新手來說蜒程,確實不好理解绅你,上來就是各種,觀察者昭躺、被觀察者忌锯、訂閱、發(fā)布等等概念领炫,一遍看下來直接就暈了偶垮,就感覺RxJava很難,難理解帝洪,用的時候也只是依葫蘆畫瓢似舵,暈乎乎的用著,然后就沒有然后了碟狞。
這里我都不說那些概念啄枕,因為講概念太抽象,難記住族沃,更難理解频祝。我們用另外一個視角來學習。因為RxJava 1.x的版本 官方已經(jīng)停止更新了維護了脆淹,沒有學習過也沒有關系常空,RxJava 2.x是全新的,直接學習使用就好了盖溺。
首先假設我們在工廠里上班漓糙,工廠都會有流水線,產(chǎn)品經(jīng)過流水線生產(chǎn)后來訂單了銷售出去烘嘱。
這里假設工廠生產(chǎn)的是一種六邊形的“Jerry帥氣餅干”昆禽,上游是生產(chǎn)車間流水線的事件流,下游是訂單產(chǎn)品的銷售消費事件流蝇庭。中間連接上下游關系的暫且叫做“Jerry帥氣餅干生產(chǎn)消費訂單管理系統(tǒng)”(不要臉醉鳖,名字寫這么長),為了下文方便抒寫且用“生產(chǎn)訂單管理系統(tǒng)”(PCMS)哮内。以上圖上下游對應的就是Obsersvable被觀察者也是發(fā)布者盗棵,下游對應Observer觀察者也是訂閱者。使用RxJava代碼表示上圖就是:
public void test1() {
// 上游生成產(chǎn)品流水線
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
Log.d(TAG, "test1 ====== Observable: ------ onNext: Jerry");
emitter.onNext("Jerry");
Log.d(TAG, "test1 ====== Observable: ------ onNext: 就是");
emitter.onNext("就是");
Log.d(TAG, "test1 ====== Observable: ------ onNext: 帥");
emitter.onNext("帥");
Log.d(TAG, "test1 ====== Observable: ------ onNext: 1狈ⅰN埔颉!");
emitter.onNext("A詹Α2t恰。?);
Log.d(TAG, "test1 ====== Observable: ------ onComplete");
emitter.onComplete();
Log.d(TAG, "test1 ====== Observable: ------ onNext: Jerry帥炸天S印>贰是牢!");
emitter.onNext("Jerry帥炸天!I陆亍!");
}
});
// 下游訂單產(chǎn)品銷售
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "test1 ====== Observer: onSubscribe");
}
@Override
public void onNext(String value) {
Log.d(TAG, "test1 ====== Observer: onNext: " + value);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "test1 ====== Observer: onError: " + e.getMessage());
}
@Override
public void onComplete() {
Log.d(TAG, "test1 ====== Observer: onComplete");
}
};
// 連接上下游的訂單管理系統(tǒng)
observable.subscribe(observer);
}
上述代碼批什,上游生產(chǎn)車間流水線就是Observable农曲,下游訂單銷售就是Observer,中間通過“生產(chǎn)訂單管理系統(tǒng)”subscribe來將上下游連接起來驻债。
運行后輸出結果是:
從輸出結果來看乳规,當上游Observable發(fā)出一個生產(chǎn)的餅干產(chǎn)品事件,下游訂單銷售的Observer就銷售一個餅干產(chǎn)品事件合呐,而且當上游調(diào)用了onComplete方法后暮的,上游的生產(chǎn)事件還是生產(chǎn)餅干事件(繼續(xù)生產(chǎn)了“Jerry帥炸天”餅干事件),但是下游的訂單銷售卻沒有消費掉淌实。也就是事件產(chǎn)生方調(diào)用onComplete方法后冻辩,之后的事件還會繼續(xù)發(fā)送,但是事件接收方就不會接收了拆祈。
我們來看看Observable的subscribe方法的參數(shù):ObservableEmitter恨闪,Emitter顧名思義是發(fā)射器的意思,ObservableEmitter接口繼承自Emitter接口:
public interface Emitter<T> {
/**
* Signal a normal value.
* @param value the value to signal, not null
*/
void onNext(T value);
/**
* Signal a Throwable exception.
* @param error the Throwable to signal, not null
*/
void onError(Throwable error);
/**
* Signal a completion.
*/
void onComplete();
}
接口定義很簡單放坏,就三個方法咙咽,onNext我們上門已經(jīng)用過了,是用來發(fā)射發(fā)送事件的淤年,onComplete是用來表示事件發(fā)送完了钧敞,后面如果有新的事件發(fā)送,下游接收者可以不用處理麸粮,onError方法看注釋說是發(fā)送一個異常事件給下游接收者溉苛。到底是不是這樣,我們來試試就曉得了豹休。
public void test2() {
// 上游生成產(chǎn)品流水線
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
Log.d(TAG, "test2 ====== Observable: ------ onNext: Jerry");
emitter.onNext("Jerry");
Log.d(TAG, "test2 ====== Observable: ------ onNext: 就是");
emitter.onNext("就是");
Log.d(TAG, "test2 ====== Observable: ------ onNext: 帥");
emitter.onNext("帥");
Log.d(TAG, "test2 ====== Observable: ------ onNext: 4独ァ!威根!");
emitter.onNext("7锞蕖!洛搀!");
Log.d(TAG, "test2 ====== Observable: ------ onError");
emitter.onError(new IllegalStateException("Jerry餅干烤焦了敢茁,賣出去會被打!"));
Log.d(TAG, "test2 ====== Observable: ------ onNext: Jerry帥炸天A裘馈U妹省伸刃!");
emitter.onNext("Jerry帥炸天!7瓯丁捧颅!");
}
});
// 下游訂單產(chǎn)品銷售
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "test2 ====== Observer: onSubscribe");
}
@Override
public void onNext(String value) {
Log.d(TAG, "test2 ====== Observer: onNext: " + value);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "test2 ====== Observer: onError: " + e.getMessage());
}
@Override
public void onComplete() {
Log.d(TAG, "test2 ====== Observer: onComplete");
}
};
// 連接上下游的訂單管理系統(tǒng)
observable.subscribe(observer);
}
在上游生產(chǎn)餅干的時候就生產(chǎn)了一個“Jerry餅干烤焦了,賣出去會被打较雕!”的錯誤餅干事件碉哑,下游訂單銷售的onError出錯狀態(tài)會消費這個事件。而上游在出錯事件后發(fā)送的“Jerry帥炸天A两?鄣洹!”餅干事件慎玖,同樣也只是把事件發(fā)送了處理贮尖,下游訂單銷售并沒有接收處理這個事件。
運行后輸出結果:
細心的小伙伴應該會發(fā)現(xiàn)趁怔,每次執(zhí)行的時候都會先調(diào)用下游的onSubscribe方法湿硝,這個方法里有個參數(shù)Disposable(用完即可丟棄)意思可以理解成,將上下游的連接切斷痕钢,讓上游的生產(chǎn)的餅干不打包放入下游訂單銷售環(huán)節(jié)图柏,實際開發(fā)中是有這種需求的,當發(fā)送事件出問題的時候就需要斷開事件接收處理任连。不像最近的疫苗事件蚤吹,一些不要臉的生物疫苗公司把生產(chǎn)不合格的疫苗上市銷售,傷天害理随抠,謀財害命裁着。下面舉個例子:
public void test3() {
// 上游生成產(chǎn)品流水線
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
Log.d(TAG, "test3 ====== Observable: ------ onNext: Jerry");
emitter.onNext("Jerry");
Log.d(TAG, "test3 ====== Observable: ------ onNext: 就是");
emitter.onNext("就是");
Log.d(TAG, "test3 ====== Observable: ------ onNext: 帥");
emitter.onNext("帥");
Log.d(TAG, "test3 ====== Observable: ------ onNext: !9八二驰!");
emitter.onNext("!1印桶雀!");
Log.d(TAG, "test3 ====== Observable: ------ onComplete");
emitter.onComplete();
Log.d(TAG, "test3 ====== Observable: ------ onNext: Jerry帥炸天!;8础矗积!");
emitter.onNext("Jerry帥炸天!3ㄟ帧棘捣!");
}
});
// 下游訂單產(chǎn)品銷售
Observer<String> observer = new Observer<String>() {
private Disposable mDisposable;
private int i;
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "test3 ====== Observer: onSubscribe");
mDisposable = d;
}
@Override
public void onNext(String value) {
Log.d(TAG, "test3 ====== Observer: onNext: " + value);
i++;
// 第一個事件接收后,就斷開上下游連接
if (i == 1) {
Log.d(TAG, "test3 ====== Observer: start disposable");
mDisposable.dispose();
Log.d(TAG, "test3 ====== Observer: isDisposable: " + mDisposable.isDisposed());
}
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "test3 ====== Observer: onError: " + e.getMessage());
}
@Override
public void onComplete() {
Log.d(TAG, "test3 ====== Observer: onComplete");
}
};
// 連接上下游的訂單管理系統(tǒng)
observable.subscribe(observer);
}
這里我們在下游訂單銷售的onNext方法中休建,當接收完第一個餅干事件后乍恐,就使用mDisposable.dispose()方法將上下游的連接斷開了评疗,斷開后上游后續(xù)生產(chǎn)的餅干事件,下游就接收不到茵烈。
運行的結果:
上圖中百匆,也驗證了我們的猜想,當使用dispose斷開上下游連接后呜投,下游就無法再繼續(xù)接收事件了胧华。
這一講就先介紹這么多,這樣的方式理解Observable和Observer以及訂閱動作subscribe是不是容易多了宙彪,希望對你有所幫助,下一講使用RxJava來切換變化餅干事件處理的線程(主線程有巧、子線程)释漆。