1.什么是Rx
是一種基于事件驅(qū)動(dòng)的,異步數(shù)據(jù)流的編程模型;
英文全名是Reactive Extension亿傅,2012年11月開源。Rx并不是一種編程接口瘟栖,而是一種編程思想葵擎,目標(biāo)是提供一致的編程接口,幫助開發(fā)者更方便的處理異步數(shù)據(jù)流半哟。
用中文翻譯就是響應(yīng)式編程(通俗的講:反應(yīng)式編程)酬滤,就是因?yàn)槲覀兊谩绊憫?yīng)”這些事件而得以命名。
例如寓涨,你訂了一份外賣盯串,在你等待外賣的時(shí)候繼續(xù)工作,當(dāng)外賣小哥給你打電話叫你去取餐的時(shí)候戒良,你會(huì)立即響應(yīng)嘴脾。這是最簡(jiǎn)單的執(zhí)行任務(wù)的反應(yīng)(響應(yīng))形式。
int x=111;
int y=x+111;
System.out.print(“y=”+y) // b=222
x=1111;
System.out.print(“y=”+y) // b=222
int x=111;
int y <= x+111; // <= 符號(hào)只是表示x和y之間關(guān)系的操作符
System.out.print(“y=”+y) // y=222
x=1111;
System.out.print(“y=”+y) // b=1222
這種編程的思想蔬墩,是企圖構(gòu)建某種關(guān)系译打,而不是執(zhí)行某種賦值語句。編寫代碼基于對(duì)變化的反應(yīng)拇颅。
這種靈感來源于生活奏司,也就是我們?nèi)绾尾扇⌒袆?dòng)和與他人溝通。
其實(shí)在項(xiàng)目開發(fā)中過程中樟插,也總是存在著各種業(yè)務(wù)關(guān)系韵洋,需要不斷處理一系列的業(yè)務(wù)邏輯相互作用竿刁。
然而,我們的編程語言并沒有表示關(guān)系的構(gòu)建方式搪缨。
因此Rx應(yīng)運(yùn)而生食拜。
首先要熟悉三個(gè)概念:
- 事件
- 異步
- 數(shù)據(jù)流
何為事件?代碼的角度來講就是一段業(yè)務(wù)邏輯副编。通俗的講负甸,例如,每次頂一個(gè)外賣都是一個(gè)“事件”痹届。如果你看看訂單列表呻待,你會(huì)發(fā)現(xiàn)其實(shí)是一個(gè)隨著時(shí)間的推移(一系列的事件)發(fā)生的一系列的“事件”。
異步編程队腐,我們可能只能在一個(gè)線程中順序調(diào)用這三個(gè)相對(duì)耗時(shí)較多的業(yè)務(wù)蚕捉,如圖,然后再去做頁面跳轉(zhuǎn)柴淘,如果順序執(zhí)行不僅沒有忠實(shí)反映業(yè)務(wù)本來的關(guān)系迫淹,而且會(huì)讓你的程序“反應(yīng)”更慢。區(qū)分出無關(guān)为严。
數(shù)據(jù)流只是事物之間溝通的橋梁敛熬。每一個(gè)業(yè)務(wù)完成后,都會(huì)有一條數(shù)據(jù)(一個(gè)事件)流向下游梗脾,下游的業(yè)務(wù)收到這條數(shù)據(jù)(這個(gè)事件),才會(huì)開始自己的工作盹靴。聯(lián)系起有關(guān)
異步和數(shù)據(jù)流都是為了正確的構(gòu)建事物的關(guān)系而存在的炸茧。通過 異步 和 數(shù)據(jù)流 來構(gòu)建事物關(guān)系的編程模型。只不過稿静,異步是為了區(qū)分出無關(guān)的事物梭冠,而數(shù)據(jù)流(事件流)是為了聯(lián)系起有關(guān)的事物。
2.觀察者模式(觸發(fā)聯(lián)動(dòng))銀行卡
即當(dāng)一個(gè)對(duì)象發(fā)生變化時(shí)改备,依賴它的所有對(duì)象都會(huì)被通知并且會(huì)自動(dòng)更新控漠。
1.生產(chǎn)者在沒有更多數(shù)據(jù)可用時(shí)能夠發(fā)出信號(hào)通知:onCompleted()事件。
2.生產(chǎn)者在發(fā)生錯(cuò)誤時(shí)能夠發(fā)出信號(hào)通知:onError()事件悬钳。
3.RxJava Observables 能夠組合而不是嵌套盐捷,從而避免開發(fā)者陷入回調(diào)地獄。
3.有什么優(yōu)勢(shì)
在業(yè)務(wù)層面實(shí)現(xiàn)代碼邏輯分離默勾,方便后期維護(hù)和拓展
極大提高程序響應(yīng)速度碉渡,充分發(fā)掘CPU的能力
幫助開發(fā)者提高代碼的抽象能力和充分理解業(yè)務(wù)邏輯
Rx豐富的操作符會(huì)幫助我們極大的簡(jiǎn)化代碼邏輯
一句話簡(jiǎn)單描述:避免“回調(diào)地獄”的問題,解耦代碼母剥。
代碼運(yùn)行的順序不是代碼行的順序
void 生產(chǎn)汽車(){
汽車 phone = 汽車模版.build("car");
裝發(fā)動(dòng)機(jī)(car){
onSuccess(){
裝方向盤(car){
onSuccess() {
裝座椅(car) {
onSuccess() {
continue(); ....
}
onFailure() {
handleError(); ...
}
}
}
onFailure()
{ print(msg); }
}
} onFailure(msg)
{ print(msg);
}
}
}
汽車生成流水線.from(汽車毛胚序列[]) //先把毛胚按序放到生產(chǎn)線上面
.map(裝發(fā)動(dòng)機(jī)())
.map(裝方向盤())
.map(裝座椅())
.map(裝導(dǎo)航())
.subscribe(出廠); //生產(chǎn)線的尾端滞诺,將好的產(chǎn)品打包
4.應(yīng)用在哪
- 在后端系統(tǒng)中給你一種新的學(xué)習(xí)擴(kuò)展和并發(fā)的方式形导,而這不需要更換開發(fā)語言;
- 你克服Android平臺(tái)局限性從而創(chuàng)建一個(gè)基于事件驅(qū)動(dòng)的,響應(yīng)式的习霹,流暢體驗(yàn)的Android應(yīng)用;rxandroid 中viewobservable,widgetObservable,lifecycleObservable;rxBinding
可能是東半球最全的RxJava使用場(chǎng)景小結(jié)
5.操作符
四大類:
1.創(chuàng)建 create from(迭代) just(整串) empty never error publishSubject BehaviorSubject ReplaySubject AsyncSubject
2.過濾 就像一個(gè)紗窗 過濾掉不要的 留下想要的 filter(from 組合 過濾空數(shù)據(jù))range repeat interval timer take skip frist last elementAt distinct distinctUtilChanged throttleFirst(指定時(shí)間的第一個(gè)) Timeout
3.變換 就像一個(gè)加工廠 對(duì)原料進(jìn)行加工 生成新的 map flamap (合并 允許交叉朵耕,重要是記住每一個(gè)發(fā)生錯(cuò)誤的情況,flatmap會(huì)觸發(fā)自己的onerror map() 是一對(duì)一的轉(zhuǎn)化淋叶,而flatmap是一對(duì)多的轉(zhuǎn)化)
concatMap (不合并 阎曹,連續(xù)在一起 不會(huì)交叉) flatmapIterable 兩兩結(jié)對(duì) switchMap停止前邊的 scan將返回結(jié)果帶回繼續(xù)發(fā)射 groupBy cast buffer window
4.組合 merge (多輸入 單輸出 如果是同步的不會(huì)交叉 異步會(huì)交叉 mergeDelayError 發(fā)生錯(cuò)誤所有完成后發(fā)送onerror)
zip 多個(gè)輸入進(jìn)行函數(shù)處理后輸出 有點(diǎn)像map 每個(gè)發(fā)射完才進(jìn)行操作
combineLastest 操作最近的兩個(gè) 與zip不同在于任何一個(gè)發(fā)射了數(shù)據(jù)都可以工作
and then when (rxjava-joins JoinObservable 類似zip)
switch 取消訂閱之前的 訂閱新的
startWith 和concat類似 只不過放在前邊
5.調(diào)度observeOn subscribeOn doOnCOmpeleted doOnError doOnTerminate oOnSubscribe delay
.io() 讀寫類
.computation() 計(jì)算類
.immediate() 立即
.newThread() 新開線程
.trampoline() 入隊(duì) 不立即執(zhí)行
6.布爾 all sequenceEqual exits contains isEmpty amb...
7.轉(zhuǎn)換 toList toSortedList(bean implements Comparable) toMap
8.錯(cuò)誤重試 onErrorResumeNext OnExceptionResumeNext OnErrorReturn retry ...
note: onBackPressBuffer
我的練習(xí)例子:https://git.oschina.net/gryllus/Rx.git
4.什么原理
只有熟悉響應(yīng)式編程的原理及思想,才能夠講其應(yīng)用到我們的項(xiàng)目當(dāng)中爸吮,成為我們手中的利器芬膝。
Action0 onCompletedAction = new Action0() {
// onCompleted()
@Override
public void call() {
Log.d(tag, "completed");
}
};
// 自動(dòng)創(chuàng)建 Subscriber ,并使用 onNextAction 來定義 onNext()
observable.subscribe(onNextAction);
// 自動(dòng)創(chuàng)建 Subscriber 形娇,并使用 onNextAction 和 onErrorAction 來定義 onNext() 和 onError()
observable.subscribe(onNextAction, onErrorAction);
// 自動(dòng)創(chuàng)建 Subscriber 锰霜,并使用 onNextAction、 onErrorAction 和 onCompletedAction 來定義 onNext()桐早、 onError() 和 onCompleted()
observable.subscribe(onNextAction, onErrorAction, onCompletedAction);
因此 Action0 可以被當(dāng)成一個(gè)包裝對(duì)象癣缅,將 onCompleted() 的內(nèi)容打包起來將自己作為一個(gè)參數(shù)傳入 subscribe() 以實(shí)現(xiàn)不完整定義的回調(diào)。這樣其實(shí)也可以看做將 onCompleted() 方法作為參數(shù)傳進(jìn)了 subscribe()哄酝,相當(dāng)于其他某些語言中的『閉包』友存。
Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("Hello");
subscriber.onNext("Hi");
subscriber.onNext("Aloha");
subscriber.onCompleted();
}
});
public Subscription subscribe(Subscriber subscriber) {
subscriber.onStart();
onSubscribe.call(subscriber);
return subscriber;
}
可以看到,subscriber() 做了3件事:
1. 調(diào)用 Subscriber.onStart() 陶衅。這個(gè)方法在前面已經(jīng)介紹過屡立,是一個(gè)可選的準(zhǔn)備方法。
2.調(diào)用 Observable 中的 OnSubscribe.call(Subscriber) 搀军。在這里膨俐,事件發(fā)送的邏輯開始運(yùn)行。從這也可以看出罩句,在 RxJava 中焚刺, Observable 并不是在創(chuàng)建的時(shí)候就立即開始發(fā)送事件,而是在它被訂閱的時(shí)候门烂,即當(dāng) subscribe() 方法執(zhí)行的時(shí)候乳愉。
3.將傳入的 Subscriber 作為 Subscription 返回。這是為了方便 unsubscribe().
- 變換的原理:lift()
// 注意:這不是 lift() 的源碼屯远,而是將源碼中與性能蔓姚、兼容性、擴(kuò)展性有關(guān)的代碼剔除后的核心代碼慨丐。
// 如果需要看源碼赂乐,可以去 RxJava 的 GitHub 倉庫下載。
public <R> Observable<R> lift(Operator<? extends R, ? super T> operator) {
return Observable.create(new OnSubscribe<R>() {
@Override
public void call(Subscriber subscriber) {
Subscriber newSubscriber = operator.call(subscriber);
newSubscriber.onStart();
onSubscribe.call(newSubscriber);
}
});
}
subscribe() 中這句話的 onSubscribe 指的是 Observable 中的 onSubscribe 對(duì)象咖气,這個(gè)沒有問題挨措,但是 lift() 之后的情況就復(fù)雜了點(diǎn)挖滤。
當(dāng)含有 lift() 時(shí):
1.lift() 創(chuàng)建了一個(gè) Observable 后,加上之前的原始 Observable浅役,已經(jīng)有兩個(gè) Observable 了斩松;
2.而同樣地,新 Observable 里的新 OnSubscribe 加上之前的原始 Observable 中的原始 OnSubscribe觉既,也就有了兩個(gè) OnSubscribe惧盹;
3.當(dāng)用戶調(diào)用經(jīng)過 lift() 后的 Observable 的 subscribe() 的時(shí)候,使用的是 lift() 所返回的新的 Observable 瞪讼,于是它所觸發(fā)的 onSubscribe.call(subscriber)钧椰,也是用的新 Observable 中的新 OnSubscribe,即在 lift() 中生成的那個(gè) OnSubscribe符欠;
4.而這個(gè)新 OnSubscribe 的 call() 方法中的 onSubscribe 嫡霞,就是指的原始 Observable 中的原始 OnSubscribe ,在這個(gè) call() 方法里希柿,新 OnSubscribe 利用 operator.call(subscriber) 生成了一個(gè)新的 Subscriber(Operator 就是在這里诊沪,通過自己的 call() 方法將新 Subscriber和原始 Subscriber 進(jìn)行關(guān)聯(lián),并插入自己的『變換』代碼以實(shí)現(xiàn)變換)曾撤,然后利用這個(gè)新 Subscriber 向原始 Observable 進(jìn)行訂閱端姚。
這樣就實(shí)現(xiàn)了 lift() 過程,有點(diǎn)像一種代理機(jī)制挤悉,通過事件攔截和處理實(shí)現(xiàn)事件序列的變換渐裸。
精簡(jiǎn)掉細(xì)節(jié)的話,也可以這么說:在 Observable 執(zhí)行了 lift(Operator) 方法之后装悲,會(huì)返回一個(gè)新的 Observable昏鹃,這個(gè)新的 Observable 會(huì)像一個(gè)代理一樣,負(fù)責(zé)接收原始的 Observable 發(fā)出的事件衅斩,并在處理后發(fā)送給 Subscriber盆顾。
note:這就形成了流式的調(diào)用流程
- 除了 lift() 之外怠褐, Observable 還有一個(gè)變換方法叫做 compose(Transformer)畏梆。它和 lift() 的區(qū)別在于, lift() 是針對(duì)事件項(xiàng)和事件序列的奈懒,而 compose() 是針對(duì) Observable 自身進(jìn)行變換奠涌。
- subscribeOn() 的線程切換發(fā)生在 OnSubscribe 中,即在它通知上一級(jí) OnSubscribe 時(shí)磷杏,這時(shí)事件還沒有開始發(fā)送溜畅,因此 subscribeOn() 的線程控制可以從事件發(fā)出的開端就造成影響;而 observeOn() 的線程切換則發(fā)生在它內(nèi)建的 Subscriber 中极祸,即發(fā)生在它即將給下一級(jí) Subscriber 發(fā)送事件時(shí)慈格,因此 observeOn() 控制的是它后面的線程怠晴。
當(dāng)使用了多個(gè) subscribeOn() 的時(shí)候,只有第一個(gè) subscribeOn() 起作用浴捆。
extends可用于返回類型的限定蒜田,不能用于參數(shù)類型的限定
super可用于參數(shù)類型的限定,不能用于返回類型的限定
4.能不能多切換幾次線程选泻?
通過 observeOn() 的多次調(diào)用冲粤,程序?qū)崿F(xiàn)了線程的多次切換。
不過页眯,不同于 observeOn() 梯捕, subscribeOn() 的位置放在哪里都可以,但它是只能調(diào)用一次的窝撵。
5.Rxjava2 & Rxjava1
RxJava2最大的改動(dòng)就是對(duì)于backpressure的處理傀顾,為此將原來的Observable拆分成了新的Observable和Flowable,同時(shí)其他相關(guān)部分也同時(shí)進(jìn)行了拆分忿族。
RxJava2 vs RxJava1
Refenece:
官網(wǎng)
分支
通俗解釋什么是響應(yīng)式編程
重新理解響應(yīng)式編程
RxJava操作符大全
背壓
RxJava中backpressure這個(gè)概念的理解
給 Android 開發(fā)者的 RxJava 詳解
RxJava Essentials(英文版)講得比較詳細(xì)锣笨,適合RxJava入門學(xué)習(xí)。
RxJava Essentials(中文版)RxJava Essentials的中文翻譯道批。
RxJava教程大集合
RxJava-Android-Samples
給初學(xué)者的RxJava2.0教程