RxJava整理(一)

函數(shù)響應(yīng)式編程

RxJava到底是什么

一個在 Java VM 上使用可觀測的序列來組成異步的届垫、基于事件的程序的庫。RxJava 的本質(zhì)可以壓縮為異步這一個詞。說到根上,它就是一個實現(xiàn)異步操作的庫摹迷。

特點
簡潔  隨著程序邏輯變得越來越復(fù)雜疟赊,它依然能夠保持簡潔郊供。

創(chuàng)建Observer

Observer 即觀察者,它決定事件觸發(fā)的時候?qū)⒂性鯓拥男袨?/p>

Observer<String> observer = new Observer<String>() {
    @Override
    public void onNext(String s) {
        Log.d(tag, "Item: " + s);
    }
    @Override
    public void onCompleted() {
        Log.d(tag, "Completed!");
    }
    @Override
    public void onError(Throwable e) {
        Log.d(tag, "Error!");
    }
};

Subscriber是Observer 的子類

Subscriber<String> mSubscriber = new Subscriber<String>() {
            @Override
            public void onStart() {
                super.onStart();
            }
            @Override
            public void onCompleted() {

            }
            @Override
            public void onError(Throwable e) {

            }
            @Override
            public void onNext(String s) {
                Toast.makeText(MainActivity.this, s, Toast.LENGTH_SHORT).show();
            }
        };
  • Observer比Subscriber多一個onStart()方法近哟。它會在 subscribe 剛開始驮审,而事件還未發(fā)送之前被調(diào)用,可以用于做一些準備工作吉执,例如數(shù)據(jù)的清零或重置疯淫。這是一個可選方法,默認情況下它的實現(xiàn)為空戳玫。
    需要注意的是熙掺,如果對準備工作的線程有要求(例如彈出一個顯示進度的對話框,這必須在主線程執(zhí)行)咕宿, onStart()就不適用了币绩,因為它總是在 subscribe 所發(fā)生的線程被調(diào)用蜡秽,而不能指定線程。要在指定的線程來做準備工作缆镣,可以使用 doOnSubscribe()方法芽突。
Observable.create(onSubscribe)
    .subscribeOn(Schedulers.io())
    .doOnSubscribe(new Action0() {
        @Override
        public void call() {
            progressBar.setVisibility(View.VISIBLE); // 需要在主線程執(zhí)行
        }
    })
    .subscribeOn(AndroidSchedulers.mainThread()) // 指定主線程
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(subscriber);

它和 Subscriber.onStart() 同樣是在 subscribe() 調(diào)用后而且在事件發(fā)送前執(zhí)行,但區(qū)別在于它可以指定線程董瞻。默認情況下寞蚌, doOnSubscribe() 執(zhí)行在subscribe()發(fā)生的線程;而如果在doOnSubscribe() 之后有 subscribeOn()的話钠糊,它將執(zhí)行在離它最近的 subscribeOn()所指定的線程挟秤。

  • unsubscribe(): 這是 Subscriber所實現(xiàn)的另一個接口 Subscription
    的方法,用于取消訂閱抄伍。在這個方法被調(diào)用后煞聪,Subscriber將不再接收事件。一般在這個方法調(diào)用前逝慧,可以使用 isUnsubscribed()先判斷一下狀態(tài)昔脯。 unsubscribe()這個方法很重要,因為在 subscribe()后笛臣, Observable會持有 Subscriber的引用云稚,這個引用如果不能及時被釋放,將有內(nèi)存泄露的風(fēng)險沈堡。所以最好保持一個原則:要在不再使用的時候盡快在合適的地方(例如 onPause() onStop()等方法中)調(diào)用 unsubscribe()來解除引用關(guān)系静陈,以避免內(nèi)存泄露的發(fā)生。

創(chuàng)建Observable

Observable 即被觀察者诞丽,它決定什么時候觸發(fā)事件以及觸發(fā)怎樣的事件鲸拥。

  • 使用create()方法創(chuàng)建
Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        subscriber.onNext("Hello");
        subscriber.onNext("Hi");
        subscriber.onNext("Aloha");
        subscriber.onCompleted();
    }
});

當(dāng) Observable 被訂閱的時候,OnSubscribe 的 call() 方法會自動被調(diào)用僧免,事件序列就會依照設(shè)定依次觸發(fā)(對于上面的代碼刑赶,就是觀察者Subscriber 將會被調(diào)用三次 onNext() 和一次 onCompleted())

  • just(T...): 將傳入的參數(shù)依次發(fā)送出來。
Observable observable = Observable.just("Hello", "Hi", "Aloha");
  • from(T[]) / from(Iterable<? extends T>) : 將傳入的數(shù)組或 Iterable 拆分成具體對象后懂衩,依次發(fā)送出來撞叨。
String[] words = {"Hello", "Hi", "Aloha"};
Observable observable = Observable.from(words);

Subscribe (訂閱)

創(chuàng)建了 Observable 和 Observer 之后,再用 subscribe() 方法將它們聯(lián)結(jié)起來浊洞。

observable.subscribe(observer);
// 或者:
observable.subscribe(subscriber);```
>有人可能會注意到牵敷, subscribe() 這個方法有點怪:它看起來是『observalbe 訂閱了 observer / subscriber』而不是『observer / subscriber 訂閱了 observalbe』,這看起來就像『雜志訂閱了讀者』一樣顛倒了對象關(guān)系法希。這讓人讀起來有點別扭枷餐,不過如果把 API 設(shè)計成observer.subscribe(observable) / subscriber.subscribe(observable) ,雖然更加符合思維邏輯苫亦,但對流式 API 的設(shè)計就造成影響了毛肋,比較起來明顯是得不償失的奕锌。

`Observable.subscribe(Subscriber)`的內(nèi)部實現(xiàn)是這樣的(僅核心代碼):

// 注意:這不是 subscribe() 的源碼,而是將源碼中與性能村生、兼容性惊暴、擴展性有關(guān)的代碼剔除后的核心代碼。
// 如果需要看源碼趁桃,可以去 RxJava 的 GitHub 倉庫下載辽话。
public Subscription subscribe(Subscriber subscriber) {
subscriber.onStart();
onSubscribe.call(subscriber);
return subscriber;
}

可以看到,`subscriber()` 做了3件事:

* 調(diào)用 `Subscriber.onStart()` 卫病。這個方法在前面已經(jīng)介紹過油啤,是一個可選的準備方法。
* 調(diào)用 `Observable` 中的 `OnSubscribe.call(Subscriber)` 蟀苛。在這里益咬,事件發(fā)送的邏輯開始運行。從這也可以看出帜平,在 RxJava 中幽告, `Observable` 并不是在創(chuàng)建的時候就立即開始發(fā)送事件,而是在它被訂閱的時候裆甩,即當(dāng) `subscribe()` 方法執(zhí)行的時候冗锁。
* 將傳入的 `Subscriber` 作為 `Subscription` 返回。這是為了方便 `unsubscribe()`.


除了 `subscribe(Observer)` 和 `subscribe(Subscriber)` 嗤栓,`subscribe()` 還支持不完整定義的回調(diào)冻河,RxJava 會自動根據(jù)定義創(chuàng)建出 Subscriber 。形式如下:

Action1<String> onNextAction = new Action1<String>() {
// onNext()
@Override
public void call(String s) {
Log.d(tag, s);
}
};
Action1<Throwable> onErrorAction = new Action1<Throwable>() {
// onError()
@Override
public void call(Throwable throwable) {
// Error handling
}
};
Action0 onCompletedAction = new Action0() {
// onCompleted()
@Override
public void call() {
Log.d(tag, "completed");
}
};
// 自動創(chuàng)建 Subscriber 茉帅,并使用 onNextAction 來定義 onNext()
observable.subscribe(onNextAction);
// 自動創(chuàng)建 Subscriber 叨叙,并使用 onNextAction 和 onErrorAction 來定義 onNext() 和 onError()
observable.subscribe(onNextAction, onErrorAction);
// 自動創(chuàng)建 Subscriber ,并使用 onNextAction堪澎、 onErrorAction 和 onCompletedAction 來定義 onNext()擂错、 onError() 和 onCompleted()
observable.subscribe(onNextAction, onErrorAction, onCompletedAction);


Observable.just("1111")
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
tv1.setText(s);
}
});

簡單解釋一下這段代碼中出現(xiàn)的 Action1 和 Action0。 Action0 是 RxJava 的一個接口全封,它只有一個方法 `call()`马昙,這個方法是無參無返回值的;由于 `onCompleted() `方法也是無參無返回值的刹悴,因此 `Action0` 可以被當(dāng)成一個包裝對象,將 `onCompleted()` 的內(nèi)容打包起來將自己作為一個參數(shù)傳入 `subscribe() `以實現(xiàn)不完整定義的回調(diào)攒暇。這樣其實也可以看做將 `onCompleted() `方法作為參數(shù)傳進了 `subscribe()`土匀,相當(dāng)于其他某些語言中的『閉包』。 Action1 也是一個接口形用,它同樣只有一個方法` call(T param)`就轧,這個方法也無返回值证杭,但有一個參數(shù);與 Action0 同理妒御,由于 `onNext(T obj)` 和 `onError(Throwable error)` 也是單參數(shù)無返回值的解愤,因此 Action1 可以將 `onNext(obj) `和 `onError(error) `打包起來傳入 `subscribe() `以實現(xiàn)不完整定義的回調(diào)。事實上乎莉,雖然 Action0 和 Action1 在 API 中使用最廣泛送讲,但 RxJava 是提供了多個 ActionX 形式的接口 (例如 Action2, Action3) 的,它們可以被用以包裝不同的無返回值的方法惋啃。
>正如前面所提到的哼鬓,Observer 和 Subscriber 具有相同的角色,而且 Observer 在 `subscribe()` 過程中最終會被轉(zhuǎn)換成 Subscriber 對象边灭,因此异希,從這里開始,后面的描述我將用 Subscriber 來代替 Observer 绒瘦,這樣更加嚴謹称簿。

---------------
##場景示例
######a.打印字符串?dāng)?shù)組
將字符串?dāng)?shù)組 names中的所有字符串依次打印出來:

String[] names = ...;
Observable.from(names)
.subscribe(new Action1<String>() {
@Override
public void call(String name) {
Log.d(tag, name);
}
});

######b. 由 id 取得圖片并顯示
由指定的一個 drawable 文件 id drawableRes 取得圖片,并顯示在 ImageView 中惰帽,并在出現(xiàn)異常的時候打印 Toast 報錯:

int drawableRes = ...;
ImageView imageView = ...;
Observable.create(new OnSubscribe<Drawable>() {
@Override
public void call(Subscriber<? super Drawable> subscriber) {
Drawable drawable = getTheme().getDrawable(drawableRes));
subscriber.onNext(drawable);
subscriber.onCompleted();
}
}).subscribe(new Observer<Drawable>() {
@Override
public void onNext(Drawable drawable) {
imageView.setImageDrawable(drawable);
}

@Override
public void onCompleted() {
}

@Override
public void onError(Throwable e) {
    Toast.makeText(activity, "Error!", Toast.LENGTH_SHORT).show();
}

});

然而
![](http://upload-images.jianshu.io/upload_images/1798389-9e6a44afb9cb0501.jpg?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)在 RxJava 的默認規(guī)則中予跌,事件的發(fā)出和消費都是在同一個線程的。也就是說善茎,如果只用上面的方法券册,實現(xiàn)出來的只是一個同步的觀察者模式。觀察者模式本身的目的就是『后臺處理垂涯,前臺回調(diào)』的異步機制烁焙,因此異步對于 RxJava 是至關(guān)重要的。而要實現(xiàn)異步耕赘,則需要用到 RxJava 的另一個概念: Scheduler 骄蝇。
##線程控制 —— Scheduler 

在RxJava 中,Scheduler ——調(diào)度器操骡,相當(dāng)于線程控制器九火,RxJava 通過它來指定每一段代碼應(yīng)該運行在什么樣的線程。RxJava 已經(jīng)內(nèi)置了幾個 Scheduler 册招,它們已經(jīng)適合大多數(shù)的使用場景:
* `Schedulers.immediate()`: 直接在當(dāng)前線程運行岔激,相當(dāng)于不指定線程。這是默認的 Scheduler是掰。
* `Schedulers.newThread()`: 總是啟用新線程虑鼎,并在新線程執(zhí)行操作。
* `Schedulers.io()`: I/O 操作(讀寫文件、讀寫數(shù)據(jù)庫炫彩、網(wǎng)絡(luò)信息交互等)所使用的 Scheduler匾七。行為模式和 newThread() 差不多艘狭,區(qū)別在于 io() 的內(nèi)部實現(xiàn)是是用一個無數(shù)量上限的線程池杯聚,可以重用空閑的線程,因此多數(shù)情況下 io() 比 newThread() 更有效率抓督。不要把計算工作放在 io() 中杉允,可以避免創(chuàng)建不必要的線程邑贴。
* `Schedulers.computation()`: 計算所使用的 Scheduler。這個計算指的是 CPU 密集型計算夺颤,即不會被 I/O 等操作限制性能的操作痢缎,例如圖形的計算。這個 Scheduler 使用的固定的線程池世澜,大小為 CPU 核數(shù)独旷。不要把 I/O 操作放在 computation() 中,否則 I/O 操作的等待時間會浪費 CPU寥裂。
* 另外嵌洼, Android 還有一個專用的` AndroidSchedulers.mainThread()`,它指定的操作將在 Android 主線程運行封恰。

>有了這幾個 Scheduler 麻养,就可以使用 `subscribeOn()` 和 `observeOn() `兩個方法來對線程進行控制了。  `subscribeOn()`: 指定 `subscribe() `所發(fā)生的線程诺舔,即 `Observable.OnSubscribe `被激活時所處的線程鳖昌。或者叫做事件產(chǎn)生的線程低飒。 `observeOn()`: 指定 Subscriber 所運行在的線程许昨。或者叫做事件消費的線程褥赊。

Observable.just(1, 2, 3, 4)
.subscribeOn(Schedulers.io()) // 指定 subscribe() 發(fā)生在 IO 線程
.observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回調(diào)發(fā)生在主線程
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer number) {
Log.d(tag, "number:" + number);
}
});

>上面這段代碼中糕档,由于 `subscribeOn(Schedulers.io()) `的指定,被創(chuàng)建的事件的內(nèi)容 1拌喉、2速那、3、4 將會在 IO 線程發(fā)出尿背;而由于 `observeOn(AndroidScheculers.mainThread()) `的指定端仰,因此 subscriber 數(shù)字的打印將發(fā)生在主線程 。事實上残家,這種在` subscribe() `之前寫上兩句 `subscribeOn(Scheduler.io()) `和` observeOn(AndroidSchedulers.mainThread()) `的使用方式非常常見榆俺,它適用于多數(shù)的 『后臺線程取數(shù)據(jù),主線程顯示』的程序策略坞淮。

而前面提到的由圖片 id 取得圖片并顯示的例子茴晋,如果也加上這兩句:

int drawableRes = ...;
ImageView imageView = ...;
Observable.create(new OnSubscribe<Drawable>() {
@Override
public void call(Subscriber<? super Drawable> subscriber) {
Drawable drawable = getTheme().getDrawable(drawableRes));
subscriber.onNext(drawable);
subscriber.onCompleted();
}
})
.subscribeOn(Schedulers.io()) // 指定 subscribe() 發(fā)生在 IO 線程
.observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回調(diào)發(fā)生在主線程
.subscribe(new Observer<Drawable>() {
@Override
public void onNext(Drawable drawable) {
imageView.setImageDrawable(drawable);
}

@Override
public void onCompleted() {
}

@Override
public void onError(Throwable e) {
    Toast.makeText(activity, "Error!", Toast.LENGTH_SHORT).show();
}

});

那么,加載圖片將會發(fā)生在 IO 線程回窘,而設(shè)置圖片則被設(shè)定在了主線程诺擅。這就意味著,即使加載圖片耗費了幾十甚至幾百毫秒的時間啡直,也不會造成絲毫界面的卡頓烁涌。
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市酒觅,隨后出現(xiàn)的幾起案子撮执,更是在濱河造成了極大的恐慌,老刑警劉巖舷丹,帶你破解...
    沈念sama閱讀 211,290評論 6 491
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件抒钱,死亡現(xiàn)場離奇詭異,居然都是意外死亡颜凯,警方通過查閱死者的電腦和手機谋币,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,107評論 2 385
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來症概,“玉大人蕾额,你說我怎么就攤上這事”顺牵” “怎么了诅蝶?”我有些...
    開封第一講書人閱讀 156,872評論 0 347
  • 文/不壞的土叔 我叫張陵,是天一觀的道長募壕。 經(jīng)常有香客問我调炬,道長,這世上最難降的妖魔是什么司抱? 我笑而不...
    開封第一講書人閱讀 56,415評論 1 283
  • 正文 為了忘掉前任筐眷,我火速辦了婚禮,結(jié)果婚禮上习柠,老公的妹妹穿的比我還像新娘匀谣。我一直安慰自己,他們只是感情好资溃,可當(dāng)我...
    茶點故事閱讀 65,453評論 6 385
  • 文/花漫 我一把揭開白布武翎。 她就那樣靜靜地躺著,像睡著了一般溶锭。 火紅的嫁衣襯著肌膚如雪宝恶。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,784評論 1 290
  • 那天,我揣著相機與錄音垫毙,去河邊找鬼霹疫。 笑死,一個胖子當(dāng)著我的面吹牛综芥,可吹牛的內(nèi)容都是我干的丽蝎。 我是一名探鬼主播,決...
    沈念sama閱讀 38,927評論 3 406
  • 文/蒼蘭香墨 我猛地睜開眼膀藐,長吁一口氣:“原來是場噩夢啊……” “哼屠阻!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起额各,我...
    開封第一講書人閱讀 37,691評論 0 266
  • 序言:老撾萬榮一對情侶失蹤国觉,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后虾啦,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體麻诀,經(jīng)...
    沈念sama閱讀 44,137評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,472評論 2 326
  • 正文 我和宋清朗相戀三年缸逃,在試婚紗的時候發(fā)現(xiàn)自己被綠了针饥。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,622評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡需频,死狀恐怖丁眼,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情昭殉,我是刑警寧澤苞七,帶...
    沈念sama閱讀 34,289評論 4 329
  • 正文 年R本政府宣布,位于F島的核電站挪丢,受9級特大地震影響蹂风,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜乾蓬,卻給世界環(huán)境...
    茶點故事閱讀 39,887評論 3 312
  • 文/蒙蒙 一惠啄、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧任内,春花似錦撵渡、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,741評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至越除,卻和暖如春节腐,著一層夾襖步出監(jiān)牢的瞬間外盯,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,977評論 1 265
  • 我被黑心中介騙來泰國打工翼雀, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留饱苟,地道東北人。 一個月前我還...
    沈念sama閱讀 46,316評論 2 360
  • 正文 我出身青樓锅纺,卻偏偏與公主長得像掷空,于是被迫代替她去往敵國和親肋殴。 傳聞我的和親對象是個殘疾皇子囤锉,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 43,490評論 2 348

推薦閱讀更多精彩內(nèi)容