版權(quán)聲明:本文為小斑馬偉原創(chuàng)文章,轉(zhuǎn)載請注明出處输吏!
上篇簡單的闡述了響應式編程的基本理論权旷。這篇主要對響應編程進行詳細的介紹。以及主要介紹響應編程的主要實現(xiàn)-RxJava贯溅。其中RxJava的操作符和調(diào)度器(Scheduler)主要介紹對象炼杖。
一、什么是響應式編程盗迟?
響應式編程:是一種基于異步數(shù)據(jù)流概念的編程模式坤邪。關鍵概念:事件 把現(xiàn)實中的事件, 搬到軟件中區(qū)去罚缕, 事件過程等待 觸發(fā)到另一種事件艇纺。響應式編程最通用的一個場景是UI:我們的移動App必須做出對網(wǎng)絡調(diào)用、用戶觸摸輸入和系統(tǒng)彈框的響應邮弹。RxJava是響應式編程的具體實現(xiàn)黔衡,是異步數(shù)據(jù)處理庫 擴展的觀察者模式。
二腌乡、擴展觀察者模式
擴展的觀察者模式:有四個基本的要素:Observable (被觀察者)盟劫、 Observer (觀察者)、 subscribe (訂閱)与纽、event(事件)侣签。Observable (被觀察者)和 Observer (觀察者)通過 subscribe() 方法實現(xiàn)訂閱關系塘装,Observable就可以在需要的時候通知Observer。其中onCompleted影所、onError蹦肴、onNext是必須要實現(xiàn)的方法,他們的含義分別是:
- 1.onCompleted:事件隊列完結(jié)猴娩,RxJava 不僅把每個事件單獨處理阴幌,還會把它們看做一個隊列。當不會再有新的 onNext發(fā)出時卷中,需要觸發(fā) onCompleted() 方法作為完成標志矛双。
- 2.onError:事件隊列異常,在事件處理過程中出異常時蟆豫,onError() 會被觸發(fā)背零,同時隊列自動終止,不允許再有事件發(fā)出无埃。
- 3.onNext:普通的事件,將要處理的事件添加到事件隊列中毛雇。
- 4.onStart:它會在事件還未發(fā)送之前被調(diào)用嫉称,可以用于做一些準備工作。例如數(shù)據(jù)的清零或重置灵疮,這是一個可選方法织阅,默認情況下它的實現(xiàn)為空。
三震捣、基本概念
RxJava的四個基本角色:Observable荔棉、Observer、Subscriber蒿赢、Subject润樱。Observable和Subject是兩個“生產(chǎn)”實體,Observer和Subscriber是兩個“消費”實體羡棵。RxJava本質(zhì)上是Rx是一個函數(shù)庫壹若,讓開發(fā)者可以利用可觀察序列和LINQ風格查詢操作符來編寫異步和基于事件的程序,開發(fā)者可以用Observables表示異步數(shù)據(jù)流皂冰,用LINQ操作符查詢異步數(shù)據(jù)流店展, 用Schedulers參數(shù)化異步數(shù)據(jù)流的并發(fā)處理,Rx可以這樣定義:Rx = Observables + LINQ + Schedulers秃流。
四赂蕴、RxJava操作符的分類
操作符的分類 |
---|
1.Creating ObServables(創(chuàng)建Observable) |
2.Transforming Observables(轉(zhuǎn)換Observabl) |
3.Filtering Observables(過濾Observable) |
4.Combing Observables(組合Observable) |
5.Error Handling Operators(處理錯誤) |
4.1Creating ObServables(創(chuàng)建Observable)
Create | Just | From |
---|---|---|
Defer | Empty/Never/Throw | Interval |
Range | Repeat | Start |
Timer | *** | *** |
-
1.原始的創(chuàng)建:Observable.create()
private static void create() { //一個原始的創(chuàng)建 Observable.create(new OnSubscribe<String>(){ @Override public void call(Subscriber<? super String> subscriber) { subscriber.onNext("RxJava學習"); } }).subscribe(new Subscriber<String>(){ @Override public void onCompleted() { System.out.println("onCompleted()"); } @Override public void onError(Throwable arg0) { System.out.println("onError()"); } @Override public void onNext(String arg0) { System.out.println("onNext():"+arg0); } }); }
-
2.Observable.just() 方法可以傳入一到九個參數(shù),它們會按照傳入的參數(shù)的順序來發(fā)射它們舶胀。 just() 方法也可以接受列表或數(shù)組概说,
private static void just() {//對create的簡寫 快捷鍵 Observable.just("sdgsagsag").subscribe(new Subscriber<String>(){ @Override public void onCompleted() { System.out.println("onCompleted()"); } @Override public void onError(Throwable arg0) { // TODO Auto-generated method stub } @Override public void onNext(String arg0) { System.out.println("onNext():"+arg0); } }); }
3.Observable.from() 創(chuàng)建符可以從一個列表/數(shù)組來創(chuàng)建Observable,并一個接一個的從列表/數(shù)組中發(fā)射出來每一個對象碧注,或者也可以從Java Future 類來創(chuàng)建Observable,并發(fā)射Future對象的 .get() 方法返回的結(jié)果值席怪。傳入 Future 作為參數(shù)時应闯,我們可以指定一個超時的值。Observable將等待來自 Future 的結(jié)果挂捻;如果在超時之前仍然沒有結(jié)果返回碉纺,Observable將會觸發(fā) onError() 方法通知觀察者有錯誤發(fā)生了。
private static void from() { //創(chuàng)建一個數(shù)組和列表轉(zhuǎn)換成其他數(shù)據(jù)類型的對象 和數(shù)據(jù)類型
ArrayList<Integer> items = new ArrayList<Integer>();
items.add(1);
items.add(2);
items.add(3);
items.add(5);
items.add(6);
items.add(7);
Observable.from(items).subscribe(new Subscriber<Integer>(){
@Override
public void onCompleted() {
System.out.println("onCompleted()");
}
@Override
public void onError(Throwable arg0) {
// TODO Auto-generated method stub
}
@Override
public void onNext(Integer arg0) {
System.out.println("onNext():"+arg0);
}
});
}
-
4.Observable.defer()對象后賦值 也能輸出來
private static String valuestr; private static void defer() {//Observable對象后賦值 也能輸出來 ArrayList<Integer> items = new ArrayList<Integer>(); items.add(1); items.add(2); items.add(3); items.add(4); items.add(5); items.add(6); items.add(7); Observable observable = Observable.defer(new Func0<Observable<String>>() { @Override public Observable<String> call() { return Observable.just(valuestr); } }); valuestr = "速度更快sag凱撒"; observable.subscribe(new Subscriber<String>(){ @Override public void onCompleted() { System.out.println("onCompleted()"); } @Override public void onError(Throwable arg0) { } @Override public void onNext(String arg0) { System.out.println("onNext():"+arg0); } });
}
-
5.Obervable.range()對象后賦值 也能輸出來
private static void range() {//Observable對象后賦值 也能輸出來 ArrayList<Integer> items = new ArrayList<Integer>(); items.add(1); items.add(2); items.add(3); items.add(4); items.add(5); items.add(6); items.add(7); Observable observable = Observable.range(1, 5).repeat(4); observable.subscribe(new Subscriber<Integer>(){ @Override public void onCompleted() { System.out.println("onCompleted()"); } @Override public void onError(Throwable arg0) { } @Override public void onNext(Integer arg0) { System.out.println("onNext():"+arg0); } }); }
4.2Transforming Observables(轉(zhuǎn)換Observable)
1.Map:將一個對象轉(zhuǎn)換為我們?nèi)魏我粋€想要的對象
2.FlatMap:Observable.flatMap()接收一個Observable的輸出作為輸入刻撒,同時輸出另外一個Observable 一個對象返回Observable對象 一對多
3.GroupBy:通過一定的規(guī)則對數(shù)據(jù)列表進行分組和歸類骨田。
4.Buffer:一次性把Observable數(shù)據(jù)對象發(fā)射到我們的觀察者
-
5.Scan:把前面的數(shù)據(jù)進行累加 發(fā)射到觀察者
private static void testStranfrom() { //整形轉(zhuǎn)換成字符串 Observable.just(123).map(new Func1<Integer, String>() { @Override public String call(Integer arg0) { // TODO Auto-generated method stub return arg0 +""; } }).subscribe(new Subscriber<String>(){ @Override public void onCompleted() { System.out.println("onCompleted()"); } @Override public void onError(Throwable arg0) { } @Override public void onNext(String arg0) { System.out.println("onNext():"+arg0); } }); } private static void textflatmap() { Observable.just(1,2,3,4,5,6).flatMap(new Func1<Integer, Observable<? extends String>>() { @Override public Observable<? extends String> call(Integer arg0) { // TODO Auto-generated method stub return Observable.just(arg0+""); } }).subscribe(new Subscriber<String>(){ @Override public void onCompleted() { System.out.println("onCompleted()"); } @Override public void onError(Throwable arg0) { } @Override public void onNext(String arg0) { System.out.println("onNext():"+arg0); } }); } private static void groupby() { Observable.just(1,2,3,4,5,6).groupBy(new Func1<Integer, Integer>() { @Override public Integer call(Integer arg0) { // TODO Auto-generated method stub return arg0 % 2; //分組的規(guī)則 } }).subscribe(new Observer<GroupedObservable<Integer,Integer>>(){ @Override public void onCompleted() { System.out.println("onCompleted()"); } @Override public void onError(Throwable arg0) { } @Override public void onNext(final GroupedObservable<Integer, Integer> arg) { arg.subscribe(new Subscriber<Integer>(){ @Override public void onCompleted() { } @Override public void onError(Throwable arg0) { } @Override public void onNext(Integer arg0) { System.out.println("groupbt onNext():"+arg.getKey()+"data:"+arg0); } }); } }); } private static void buffer() { Observable.range(1, 5).buffer(2).subscribe(new Observer<List<Integer>>(){ @Override public void onCompleted() { } @Override public void onError(Throwable arg0) { } @Override public void onNext(List<Integer> arg0) { System.out.println("onNext:"+arg0); } }); } private static void scan() { //每次據(jù)取兩個數(shù)據(jù)項,然后向觀察者發(fā)射 Observable.range(1, 5).scan(new Func2<Integer,Integer,Integer>(){ @Override public Integer call(Integer sum, Integer arg1) { // TODO Auto-generated method stub return sum + arg1; } }).subscribe(new Observer<Integer>(){ @Override public void onCompleted() { // TODO Auto-generated method stub } @Override public void onError(Throwable arg0) { // TODO Auto-generated method stub } @Override public void onNext(Integer arg0) { // TODO Auto-generatd method stube System.out.println("onNext:"+arg0); } }); }
4.3Filtering Observables(過濾Observable)
1.Debounce: 一定的時間內(nèi) 數(shù)據(jù)項后面沒有產(chǎn)生數(shù)據(jù) 將該數(shù)據(jù)發(fā)送出去
2.Distinct:去掉重復的
3.ElementAt:指定位置的數(shù)據(jù)
4.Filter:按照我們指定的規(guī)則來進行過濾 得道我們想要的數(shù)據(jù)
5.First:取列表中的第一位數(shù)據(jù)
6.IgnoreElements: 忽略列表上的所有元素 事件結(jié)束的時候回調(diào)Complement函數(shù)
7.Last:我們指定的最后數(shù)位項數(shù)據(jù)
8.Sample:對數(shù)據(jù)樣本數(shù)據(jù)的采集声怔,采集后發(fā)射到觀察者 定時的取樣 取到的數(shù)據(jù)放到一個列表中 再發(fā)射态贤。
9.Skip:跳躍指定數(shù)據(jù)項的數(shù)據(jù) 然后取后面的數(shù)據(jù)
10.SkipLast:跳過數(shù)據(jù)的后幾項
11.Take:取值的意思 在數(shù)據(jù)列表中 如果只取數(shù)據(jù)項的前幾項
-
12.TakeLast:取數(shù)據(jù)列表的最后幾項數(shù)據(jù) 指定的幾位數(shù)據(jù)
private static void testDebounce() { //在一定的時間內(nèi) 數(shù)據(jù)項后面沒有產(chǎn)生數(shù)據(jù) 將該數(shù)據(jù)發(fā)送出去 Observable.create(new OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> arg0) { for(int i = 0; i <10; i++) { try { Thread.sleep(1000); arg0.onNext(1); } catch (InterruptedException e) { arg0.onError(e); } } } }).debounce(1,TimeUnit.SECONDS).subscribe(new Subscriber<Integer>(){ @Override public void onCompleted() { // TODO Auto-generated method stub System.out.println("onCompleted():" + "\n"); } @Override public void onError(Throwable arg0) { // TODO Auto-generated method stub System.out.println("onError:" + arg0+ "\n"); } @Override public void onNext(Integer arg0) { // TODO Auto-generated method stub System.out.println("onNext:" + arg0+ "\n"); } }); } private static void testDistinct() { //去掉重復的 Observable.just(1, 2, 3, 3, 2).distinct().subscribe(new Subscriber<Integer>(){ @Override public void onCompleted() { // TODO Auto-generated method stub System.out.println("onCompleted():" + "\n"); } @Override public void onError(Throwable arg0) { // TODO Auto-generated method stub System.out.println("onError:" + arg0+ "\n"); } @Override public void onNext(Integer arg0) { // TODO Auto-generated method stub System.out.println("onNext:" + arg0+ "\n"); } }); } private static void testElementAt() { //指定位置的數(shù)據(jù) Observable.just(1, 2, 2, 4).elementAt(2).subscribe(new Subscriber<Integer>(){ @Override public void onCompleted() { System.out.println("onCompleted():" + "\n"); } @Override public void onError(Throwable arg0) { System.out.println("onError:" + arg0+ "\n"); } @Override public void onNext(Integer arg0) { System.out.println("onNext:" + arg0+ "\n"); } }); } private static void testFilter() { //去掉重復的 過濾 Observable.just(1, 2, 3, 3, 2).distinct().filter(new Func1<Integer,Boolean>(){ public Boolean call(Integer arg0) { return arg0 > 2; } }).subscribe(new Subscriber<Integer>(){ @Override public void onCompleted() { // TODO Auto-generated method stub System.out.println("onCompleted():" + "\n"); } @Override public void onError(Throwable arg0) { // TODO Auto-generated method stub System.out.println("onError:" + arg0+ "\n"); } @Override public void onNext(Integer arg0) { // TODO Auto-generated method stub System.out.println("onNext:" + arg0+ "\n"); } }); } private static void testFirst() { //指定位置的數(shù)據(jù) Observable.just(1, 2, 2, 4).first().subscribe(new Subscriber<Integer>(){ @Override public void onCompleted() { System.out.println("onCompleted():" + "\n"); } @Override public void onError(Throwable arg0) { System.out.println("onError:" + arg0+ "\n"); } @Override public void onNext(Integer arg0) { System.out.println("onNext:" + arg0+ "\n"); } }); } private static void testIgnoreElement() { Observable.create(new OnSubscribe<Integer>(){ @Override public void call(Subscriber<? super Integer> arg0) { // TODO Auto-generated method stub arg0.onNext(233); //throw new NullPointerException(); } }).ignoreElements().subscribe(new Subscriber<Integer>(){ @Override public void onCompleted() { // TODO Auto-generated method stub System.out.println("onCompleted():" + "\n"); } @Override public void onError(Throwable arg0) { // TODO Auto-generated method stub System.out.println("onError:" + arg0+ "\n"); } @Override public void onNext(Integer arg0) { // TODO Auto-generated method stub System.out.println("onNext:" + arg0+ "\n"); } }); } private static void testSample() { //定時對數(shù)據(jù)進行采樣 再發(fā)送出去 Observable.create(new OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> arg0) { for(int i = 0; i <10; i++) { try { Thread.sleep(1000); arg0.onNext(i); } catch (InterruptedException e) { arg0.onError(e); } } arg0.onCompleted(); } }).sample(1,TimeUnit.SECONDS).subscribe(new Subscriber<Integer>(){ @Override public void onCompleted() { // TODO Auto-generated method stub System.out.println("onCompleted():" + "\n"); } @Override public void onError(Throwable arg0) { // TODO Auto-generated method stub System.out.println("onError:" + arg0+ "\n"); } @Override public void onNext(Integer arg0) { // TODO Auto-generated method stub System.out.println("onNext:" + arg0+ "\n"); } }); } private static void testSkip() { //跳過多少項目 Observable.just(1, 2, 2, 4).skip(3).subscribe(new Subscriber<Integer>(){ @Override public void onCompleted() { System.out.println("onCompleted():" + "\n"); } @Override public void onError(Throwable arg0) { System.out.println("onError:" + arg0+ "\n"); } @Override public void onNext(Integer arg0) { System.out.println("onNext:" + arg0+ "\n"); } }); } private static void testTake() { //取出多少項目 Observable.just(1, 2, 2, 4).take(3).subscribe(new Subscriber<Integer>(){ @Override public void onCompleted() { System.out.println("onCompleted():" + "\n"); } @Override public void onError(Throwable arg0) { System.out.println("onError:" + arg0+ "\n"); } @Override public void onNext(Integer arg0) { System.out.println("onNext:" + arg0+ "\n"); } }); }
4.3Combing Observables(組合Observable)
1.Zip:在一段時間內(nèi)將最近的元素進行組合在一起
2.Merge: 按照時間先后順序 進行組合
3.StartWith: 在當前的數(shù)據(jù)之前插入 我們想要的數(shù)據(jù) (數(shù)據(jù)類型或者Observable)
4.CombineLatest: 最新接觸 組合數(shù)據(jù)
5.Join:
-
6.SwithchOnNext:后面的覆蓋前面的數(shù)據(jù)組合
/** * 用來合并兩個Observable發(fā)射的數(shù)據(jù)項,根據(jù)Func2函數(shù)生成一個新的值并發(fā)射出去醋火。其中一個Observable發(fā)送數(shù)據(jù)結(jié)束或者出現(xiàn)異常后悠汽, * 另外一個Observable也將停在發(fā)射數(shù)據(jù) */ private static void testZip() { Observable<Integer> observable1 = Observable.just(10, 20, 30); Observable<Integer> observable2 = Observable.just(4, 8, 12, 16); Observable.zip(observable1, observable2, new Func2<Integer,Integer,Integer>(){ @Override public Integer call(Integer arg0, Integer arg1) { return arg0 + arg1; } }).subscribe(new Subscriber<Integer>(){ @Override public void onCompleted() { System.out.println("onCompleted"); } @Override public void onError(Throwable arg0) { System.out.println("onError():"+arg0.getMessage()); } @Override public void onNext(Integer arg0) { System.out.println("onNext arg0 "+arg0); } }); } /** * 將兩個Observable發(fā)射的事件序列組合合并成一個事件序列,就像是Observable發(fā)射的一樣芥驳。你可以簡單的將它理解成兩個Observable合并成一個Observable對象 * */ private static void testMerg() { Observable<Integer> observable1 = Observable.just(10, 20, 30); Observable<Integer> observable2 = Observable.just(4, 8, 12, 16); Observable.merge(observable1,observable2).subscribe(new Subscriber<Integer>(){ @Override public void onCompleted() { System.out.println("onCompleted"); } @Override public void onError(Throwable arg0) { System.out.println("onError():"+arg0.getMessage()); } @Override public void onNext(Integer arg0) { System.out.println("onNext arg0 "+arg0); } }); } /** * 用于在源Observable發(fā)射的數(shù)據(jù)前插入數(shù)據(jù).使用startWith(Interable<T>)我們還可以在源Observable發(fā)射的數(shù)據(jù)前插入Iterable */ private static void testStartWith() { Observable<Integer> first = Observable.just(1, 3, 5); Observable<Integer> second = Observable.just(2, 4, 6); first.startWith(second).subscribe(new Subscriber<Integer>(){ @Override public void onCompleted() { System.out.println("onCompleted"); } @Override public void onError(Throwable arg0) { System.out.println("onError():"+arg0.getMessage()); } @Override public void onNext(Integer arg0) { System.out.println("onNext arg0 "+arg0); } }); } /** * 用于將兩個Observable最近發(fā)射的數(shù)據(jù)已經(jīng)Func2函數(shù)的規(guī)則進展集合 5+2 5+4 5+6 */ private static void testCombineLatest() { Observable<Integer> first = Observable.just(1, 3, 5); Observable<Integer> second = Observable.just(2, 4, 6); first.combineLatest(first, second, new Func2<Integer,Integer,Integer>(){ @Override public Integer call(Integer arg0, Integer arg1) { // TODO Auto-generated method stub return arg0 + arg1; } }).subscribe(new Subscriber<Integer>(){ @Override public void onCompleted() { System.out.println("onCompleted"); } @Override public void onError(Throwable arg0) { System.out.println("onError():"+arg0.getMessage()); } @Override public void onNext(Integer arg0) { System.out.println("onNext arg0 "+arg0); } }); } /** * join()函數(shù)基于時間窗口將兩個Observables發(fā)射的數(shù)據(jù)結(jié)合在一起柿冲,每個Observable在自己的時間窗口內(nèi)獨有有效的,都可以拿來組合 */ private static void testJoin() { Observable<Integer> first = Observable.just(1, 3, 5); Observable<Integer> second = Observable.create(new OnSubscribe<Integer>(){ @Override public void call(Subscriber<? super Integer> arg0) { try{ for(int i = 0; i < 5; i++) { arg0.onNext(i); } arg0.onCompleted(); }catch(Exception e){ e.printStackTrace(); } } }); }
4.4 Error Handling Operators(處理錯誤)
Catch:捕獲異常處理
**onErrorReturn: **遇見異常正常結(jié)束數(shù)據(jù)流
onErrorResumeNext:當數(shù)據(jù)遇見異常兆旬,用正常數(shù)據(jù)代替錯誤數(shù)據(jù)項 不可以捕獲到異常信息
onExceptionResumeNext:當有異常的時候假抄,可以用正確的數(shù)據(jù)流替換掉,并且和丽猬、可以捕獲到異常信息
五宿饱、Schedulers(調(diào)度器)種類
調(diào)度器(Scheduler)是RxJava以一種極其簡單的方式解決多線程問題機制。
Schedulers(調(diào)度器)種類:
.io() I/O操作
.computation()計算 計算類型
.immediate() 這個調(diào)度器允許你立即在當前線程執(zhí)行你指定的工作
.newThread() 建立一個新的線程 它為指定任務啟動一個新的線程
.trampoline() 按照順序排隊處理的 調(diào)度器將會按序處理隊列脚祟,并運行隊列中每一個任務