RxJava技術探索及其Android中的應用

1蛾狗、RxJava簡介

RxJava 的本質(zhì)可以壓縮為異步這一個詞。說到根上座菠,它就是一個實現(xiàn)異步操作的庫狸眼,Rx的全稱是Reactive Extensions,直譯過來就是響應式擴展浴滴。Rx基于觀察者模式拓萌,他是一種編程模型,目標是提供一致的編程接口升略,幫助開發(fā)者更方便的處理異步數(shù)據(jù)流微王。ReactiveX.io給的定義是,Rx是一個使用可觀察數(shù)據(jù)流進行異步編程的編程接口品嚣,ReactiveX結合了觀察者模式炕倘、迭代器模式和函數(shù)式編程的精華。RxJava說白了就是用Java語言邊寫的使用各種操作符形成鏈式結構來處理異步數(shù)據(jù)流的工具翰撑。

2罩旋、RxJava基礎及常用操作符

RxJava 有四個基本概念:
Observable(可觀察者,即被觀察者);
Observer(觀察者)瘸恼;
subscribe(訂閱)事件劣挫;
Observable和Observer通過 subscribe() 方法實現(xiàn)訂閱關系册养,從而 Observable可以在需要的時候發(fā)出事件來通知 Observer东帅。

(1) 創(chuàng)建 Observable被觀察者

Observable<String> myObservable = Observable.create(new Observable.OnSubscribe<String>() {    
@Override    
public void call(Subscriber<? super String> subscriber) {  
    subscriber.onNext("Hello, RxJava!");        
    subscriber.onCompleted();    
}});

(2) 創(chuàng)建 Observer觀察者

Subscriber<String> mySubscriber = new Subscriber<String>() {   
    @Override    
    public void onCompleted() { }   
    @Override    
    public void onError(Throwable e) { }    
    @Override    
    public void onNext(String s) {       
    Log.d("haijiang", "--->" + s); }
};

Subscriber是Observer的擴展,在使用過程中球拦,我們直接用Subscriber作為觀察者對象就OK了靠闭!

(3) 建立訂閱關系

myObservable.subscribe(mySubscriber);

一旦建立訂閱關系坎炼,OnSubscribe中的call方法就會被調(diào)用愧膀,在call方法中主動觸發(fā)了觀察者的onNext,onCompleted方法谣光,可看到輸出“Hello, RxJava!” 檩淋。
以上是一個最基本的流程,我們可以寫成鏈式調(diào)用:

Observable.create(new Observable.OnSubscribe<String>() {    
@Override    
public void call(Subscriber<? super String> subscriber) {  
    subscriber.onNext("Hello, RxJava!");        
    subscriber.onCompleted();    
}}).subscribe(new Subscriber<String>() {   
    @Override    
    public void onCompleted() { }   
    @Override    
    public void onError(Throwable e) { }    
    @Override    
    public void onNext(String s) {       
    Log.d("haijiang", "--->" + s); }
});

更多操作符欣賞:

just

subscribe方法有一個重載版本萄金,接受三個Action1類型的參數(shù)蟀悦,分別對應OnNext,OnComplete氧敢, OnError函數(shù) 
Observable.just("hello","rxjava","rxandroid").subscribe(new Action1<String>() {    
   @Override    
   public void call(String s) {       
        Log.d("haijiang", "--->" + s);   
 }});

ActionX沒有返回日戈,還有一種FuncX有返回的操作,后面會說孙乖。
just操作符是創(chuàng)建一個Observable浙炼,一次發(fā)送傳入的參數(shù)。

from

/** * Observable.from()方法唯袄,它接收一個集合作為輸入 */
String[] strArrsy = {"hello","rxjava","rxandroid"};
Observable.from(strArrsy).subscribe(new Action1<String>() {    
   @Override    
   public void call(String s) {        
        Log.d("haijiang", "--->" + s);    
}});

下面介紹兩個重量級操作符map和flatMap弯屈,核心變換,靈活操作恋拷。

map

map是一對一的變化季俩,將一個Observable<T>變換成Observable<R>

Observable.just("I LOVE YOU!").map(new Func1<String, Integer>() {   
    @Override    
    public Integer call(String s) {        
         return 520;    
   }}).subscribe(new Action1<Integer>() {    
    @Override    
    public void call(Integer s) {       
         Log.d("haijiang", "--->" + s);   
 }});

通過map,F(xiàn)unc1將String轉換成了Int梅掠;其中FuncX是RxJava的一個包裝接口酌住,跟ActionX類似,只不過FuncX是有返回對象的阎抒。

flatMap

flatMap()中返回的是個 Observable對象酪我,并且這個 Observable對象并不是被直接發(fā)送到了 Subscriber
的回調(diào)方法中。flatMap()的原理是這樣的:

  1. 使用傳入的事件對象創(chuàng)建一個 Observable對象且叁;
  2. 并不發(fā)送這個 Observable, 而是將它激活都哭,于是它開始發(fā)送事件;
  3. 每一個創(chuàng)建出來的 Observable發(fā)送的事件,都被匯入同一個 Observable欺矫,而這個 Observable負責將這些事件統(tǒng)一交給Subscriber 的回調(diào)方法纱新。
    這三個步驟,把事件拆成了兩級穆趴,通過一組新創(chuàng)建的 Observable將初始的對象『鋪平』之后通過統(tǒng)一路徑分發(fā)了下去脸爱。而這個『鋪平』就是 flatMap() 所謂的 flat。
private ArrayList<Data> mData;
Observable.from(mData).flatMap(new Func1<Data, Observable<ChildData>>() {
      @Override 
      public Observable<ChildData> call(Data data) { 
             return Observable.from(data.getChildData); 
        } 
}).subscribe(new Action1<ChildData>() { 
      @Override 
      public void call(ChildData cd) {
             Log.d("haijiang", "--->" + cd.getName); 
  }});

concatMap

flatMap()操作符使用你提供的原本會被原始Observable發(fā)送的事件未妹,來創(chuàng)建一個新的Observable簿废。而且這個操作符,返回的是一個自身發(fā)送事件并合并結果的Observable络它∽迕剩可以用于任何由原始Observable發(fā)送出的事件,發(fā)送合并后的結果化戳。記住单料,flatMap()可能交錯的發(fā)送事件,最終結果的順序可能并是不原始Observable發(fā)送時的順序点楼。為了防止交錯的發(fā)生扫尖,可以使用與之類似的concatMap()操作符。綜上所述盟步,就是利用concatMap替換flatMap操作符藏斩,輸入順序就防止了交錯,跟原始Obervable順序一致却盘。

timer()

timer操作符:用于創(chuàng)建Observabl狰域,延遲發(fā)送一次。
下面延時兩秒黄橘,輸出log

Observable.timer(2, TimeUnit.SECONDS)
              .subscribe(new Observer<Long>() {
                  @Override
                  public void onCompleted() {
                      log.d ("completed");
                  }

                  @Override
                  public void onError(Throwable e) {
                      log.e("error");
                  }

                  @Override
                  public void onNext(Long number) {
                      log.d ("hello world");
                  }
              });

interval()

interval:用于創(chuàng)建Observable兆览,用于每個XX秒循環(huán)進行某個操作

Observable.timer(2, TimeUnit.SECONDS).subscribe(new Action1<Long>() { 
     @Override 
     public void call(Long aLong) { /
    /TODO WHAT YOU WANT 
   } 
});

delay()

delay:用于事件流中,可以延遲發(fā)送事件流中的某一次發(fā)送塞关。

retryWhen

retryWhen()是RxJava的一種錯誤處理機制抬探,當遇到錯誤時,將錯誤傳遞給另一個Observable來決定是否要重新給訂閱這個Observable帆赢。下面封裝一個處理網(wǎng)絡錯誤的類

public class RetryWhenProcess implements Func1<Observable<? extends Throwable>, Observable<?>> {

private long mInterval;

    public RetryWhenProcess(long interval) {

        mInterval = interval;
    }

    @Override
    public Observable<?> call(Observable<? extends Throwable> observable) {
        return observable.flatMap(new Func1<Throwable, Observable<?>>() {
            @Override
            public Observable<?> call(Throwable throwable) {
                return observable.flatMap(new Func1<Throwable, Observable<?>>() {
                        @Override
                        public Observable<?> call(Throwable throwable) {
                            if (throwable instanceof UnknownHostException) {
                                return Observable.error(throwable);
                            }
                            return Observable.just(throwable).zipWith(Observable.range(1, 5), new Func2<Throwable, Integer, Integer>() {
                                @Override
                                public Integer call(Throwable throwable, Integer i) {

                                    return i;
                                }
                            }).flatMap(new Func1<Integer, Observable<? extends Long>>() {
                                @Override
                                public Observable<? extends Long> call(Integer retryCount) {

                                    return Observable.timer((long) Math.pow(mInterval, retryCount), TimeUnit.SECONDS);
                                }
                            });
                        }
                    });
            }
        });
    }
}

使用方法:

.retryWhen(new RetryWhenProcess(5))

compose

compose()是針對 Observable自身進行變換小压。假設在程序中有多個 Observable,并且他們都需要應用一組相同的變換可以使用椰于。
場景是這樣的:work thread 中處理數(shù)據(jù)怠益,然后 UI thread 中處理結果。當然瘾婿,我們知道是要使用 subscribeOn() 和 observeOn() 進行處理蜻牢。最常見的場景是烤咧,調(diào)server 的 API 接口取數(shù)據(jù)的時候,那么抢呆,那么多接口煮嫌,反復寫這兩個操作符是蛋疼的,為了避免這種情況抱虐,我們可以通過 compse() 操作符來實現(xiàn)復用昌阿,下面面這段代碼就實現(xiàn)了這樣的功能。

/**
 * 這個類是 小鄧子 提供的梯码!
 */
public class SchedulersCompat {
    private static final Observable.Transformer computationTransformer =
            new Observable.Transformer() {
                @Override public Object call(Object observable) {
                    return ((Observable) observable).subscribeOn(Schedulers.computation())
                            .observeOn(AndroidSchedulers.mainThread());
                }
            };
    private static final Observable.Transformer ioTransformer = new Observable.Transformer() {
        @Override public Object call(Object observable) {
            return ((Observable) observable).subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread());
        }
    };
    private static final Observable.Transformer newTransformer = new Observable.Transformer() {
        @Override public Object call(Object observable) {
            return ((Observable) observable).subscribeOn(Schedulers.newThread())
                    .observeOn(AndroidSchedulers.mainThread());
        }
    };
    private static final Observable.Transformer trampolineTransformer = new Observable.Transformer() {
        @Override public Object call(Object observable) {
            return ((Observable) observable).subscribeOn(Schedulers.trampoline())
                    .observeOn(AndroidSchedulers.mainThread());
        }
    };
    private static final Observable.Transformer executorTransformer = new Observable.Transformer() {
        @Override public Object call(Object observable) {
            return ((Observable) observable).subscribeOn(Schedulers.from(ExecutorManager.eventExecutor))
                    .observeOn(AndroidSchedulers.mainThread());
        }
    };
    /**
     * Don't break the chain: use RxJava's compose() operator
     */
    public static <T> Observable.Transformer<T, T> applyComputationSchedulers() {
        return (Observable.Transformer<T, T>) computationTransformer;
    }
    public static <T> Observable.Transformer<T, T> applyIoSchedulers() {
        return (Observable.Transformer<T, T>) ioTransformer;
    }
    public static <T> Observable.Transformer<T, T> applyNewSchedulers() {
        return (Observable.Transformer<T, T>) newTransformer;
    }
    public static <T> Observable.Transformer<T, T> applyTrampolineSchedulers() {
        return (Observable.Transformer<T, T>) trampolineTransformer;
    }
    public static <T> Observable.Transformer<T, T> applyExecutorSchedulers() {
        return (Observable.Transformer<T, T>) executorTransformer;
    }
}

使用方式:

.compose(SchedulersCompat.ioTransformer );

3宝泵、線程控制 — Scheduler

在RxJava 中好啰,Scheduler—調(diào)度器轩娶,相當于線程控制器,RxJava 通過它來指定每一段代碼應該運行在什么樣的線程框往。RxJava 已經(jīng)內(nèi)置了幾個 Schedule鳄抒,它們已經(jīng)適合大多數(shù)的使用場景:
Schedulers.immediate(): 直接在當前線程運行,相當于不指定線程椰弊。這是默認的 Scheduler许溅。
Schedulers.newThread(): 總是啟用新線程,并在新線程執(zhí)行操作秉版。
Schedulers.io(): I/O 操作(讀寫文件贤重、讀寫數(shù)據(jù)庫、網(wǎng)絡信息交互等)所使用的 Scheduler清焕。行為模式和 newThread() 差不多并蝗,區(qū)別在于 io()的內(nèi)部實現(xiàn)是是用一個無數(shù)量上限的線程池,可以重用空閑的線程秸妥,因此多數(shù)情況下 io()比 newThread()更有效率滚停。不要把計算工作放在 io()中,可以避免創(chuàng)建不必要的線程粥惧。
Schedulers.computation(): 計算所使用的 Scheduler键畴。這個計算指的是 CPU 密集型計算,即不會被 I/O 等操作限制性能的操作突雪,例如圖形的計算起惕。這個 Scheduler使用的固定的線程池,大小為 CPU 核數(shù)咏删。不要把 I/O 操作放在 computation() 中惹想,否則 I/O 操作的等待時間會浪費 CPU。另外饵婆, Android 還有一個專用的 AndroidSchedulers.mainThread()勺馆,它指定的操作將在 Android 主線程運行戏售。

有了這幾個 Scheduler,就可以使用 subscribeOn() 和 observeOn()兩個方法來對線程進行控制草穆。subscribeOn() 指定subscribe()所發(fā)生的線程灌灾,即 Observable.OnSubscribe被激活時所處的線程,或者叫做事件產(chǎn)生的線程悲柱。 observeOn()指定Subscriber所運行在的線程锋喜,或者叫做事件消費的線程。

4RxJava在android開發(fā)中的一些應用

參考:可能是東半球最全的RxJava使用場景小結 (http://blog.csdn.net/theone10211024/article/details/50435325)

RxBinding
節(jié)流(防止按鈕的重復點擊)
輪詢豌鸡,
定時操作
RxPermissions
RxBus
RxJava與Retrofit

(1)RxBinding

RxBindingJakeWharton大牛用RxJava為Android控件編寫的一個控件綁定庫粗蔚。
例子:

Button button = (Button) findViewById(R.id.button); 
RxView.clicks(button).subscribe(new Action1<Void>() { 
@Override 
public void call(Void aVoid) { 
Log.i("test", "clicked"); 
}
 });

(2)防止重復點擊

RxView.clicks(button).debounce(300, TimeUnit.MILLISECONDS).subscribe(new Action1<Void>() {
            @Override
            public void call(Void aVoid) {
                Log.i("test", "clicked");
            }
        });

(3)EditText輸入請求。避免每次輸入產(chǎn)生頻繁的請求

RxTextView.textChangeEvents(inputEditText)
      .debounce(400, TimeUnit.MILLISECONDS) 
      .observeOn(AndroidSchedulers.mainThread())
      .subscribe(new Observer<TextViewTextChangeEvent>() {
    @Override
    public void onCompleted() {
        log.d("onComplete");
    }

    @Override
    public void onError(Throwable e) {
        log.d("Error");
    }

    @Override
    public void onNext(TextViewTextChangeEvent onTextChangeEvent) {
        log.d(format("Searching for %s", onTextChangeEvent.text().toString()));
    }
});

(4)RxPermissions

RxPermissions也是國外的大牛開發(fā)的基于RxJava的Android權限管理庫拐辽,他讓6.0以上的權限管理更加的簡單育韩,如果有適配6.0以上的手機的需求,這個庫是個不錯的選擇蛇更。下面我們來看看基本的用法瞻赶。

 // 請求相機權限
    RxPermissions.getInstance(this)
    .request(Manifest.permission.CAMERA)
    .subscribe(granted -> {
        if (granted) { // 用戶同意了(在6.0之前的手機始終都為true)
          //可以拍照了
        } else {
           //可以在這里提示用戶,或者再次請求
        }
    });

更多功能研究github吧

(5)RxBus

參考:http://www.reibang.com/p/ca090f6e2fe2
不多說派任,上代碼

/**
* RxBus
* Created by YoKeyword on 2015/6/17.
*/
public class RxBus {
    private static volatile RxBus defaultInstance;

    private final Subject<Object, Object> bus;
    // PublishSubject只會把在訂閱發(fā)生的時間點之后來自原始Observable的數(shù)據(jù)發(fā)射給觀察者
    public RxBus() {
      bus = new SerializedSubject<>(PublishSubject.create());
    }
    // 單例RxBus
    public static RxBus getDefault() {
        if (defaultInstance == null) {
            synchronized (RxBus.class) {
                if (defaultInstance == null) {
                    defaultInstance = new RxBus();
                }
            }
        }
        return rxBus;
    }
    // 發(fā)送一個新的事件
    public void post (Object o) {
        bus.onNext(o);
    }
    // 根據(jù)傳遞的 eventType 類型返回特定類型(eventType)的 被觀察者
    public <T> Observable<T> toObservable (Class<T> eventType) {
        return bus.ofType(eventType);
//        這里感謝小鄧子的提醒: ofType = filter + cast
//        return bus.filter(new Func1<Object, Boolean>() {
//            @Override
//            public Boolean call(Object o) {
//                return eventType.isInstance(o);
//            }
//        }) .cast(eventType);
    }
}

(6)RxJava 與 Retrofit 結合的最佳實踐

參考扔物線文章:http://gank.io/post/56e80c2c677659311bed9841

參考:
1砸逊、http://www.reibang.com/users/df40282480b4/latest_articles
2、http://gank.io/post/560e15be2dca930e00da1083#toc
3掌逛、http://www.reibang.com/p/8cf84f719188

最后編輯于
?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末师逸,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子豆混,更是在濱河造成了極大的恐慌篓像,老刑警劉巖,帶你破解...
    沈念sama閱讀 217,734評論 6 505
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件崖叫,死亡現(xiàn)場離奇詭異遗淳,居然都是意外死亡,警方通過查閱死者的電腦和手機心傀,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,931評論 3 394
  • 文/潘曉璐 我一進店門屈暗,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人脂男,你說我怎么就攤上這事养叛。” “怎么了宰翅?”我有些...
    開封第一講書人閱讀 164,133評論 0 354
  • 文/不壞的土叔 我叫張陵弃甥,是天一觀的道長。 經(jīng)常有香客問我汁讼,道長淆攻,這世上最難降的妖魔是什么阔墩? 我笑而不...
    開封第一講書人閱讀 58,532評論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮瓶珊,結果婚禮上啸箫,老公的妹妹穿的比我還像新娘。我一直安慰自己伞芹,他們只是感情好忘苛,可當我...
    茶點故事閱讀 67,585評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著唱较,像睡著了一般扎唾。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上南缓,一...
    開封第一講書人閱讀 51,462評論 1 302
  • 那天胸遇,我揣著相機與錄音,去河邊找鬼西乖。 笑死狐榔,一個胖子當著我的面吹牛坛增,可吹牛的內(nèi)容都是我干的获雕。 我是一名探鬼主播,決...
    沈念sama閱讀 40,262評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼收捣,長吁一口氣:“原來是場噩夢啊……” “哼届案!你這毒婦竟也來了?” 一聲冷哼從身側響起罢艾,我...
    開封第一講書人閱讀 39,153評論 0 276
  • 序言:老撾萬榮一對情侶失蹤楣颠,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后咐蚯,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體童漩,經(jīng)...
    沈念sama閱讀 45,587評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,792評論 3 336
  • 正文 我和宋清朗相戀三年春锋,在試婚紗的時候發(fā)現(xiàn)自己被綠了矫膨。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 39,919評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡期奔,死狀恐怖侧馅,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情呐萌,我是刑警寧澤馁痴,帶...
    沈念sama閱讀 35,635評論 5 345
  • 正文 年R本政府宣布,位于F島的核電站肺孤,受9級特大地震影響罗晕,放射性物質(zhì)發(fā)生泄漏济欢。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,237評論 3 329
  • 文/蒙蒙 一小渊、第九天 我趴在偏房一處隱蔽的房頂上張望船逮。 院中可真熱鬧,春花似錦粤铭、人聲如沸挖胃。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,855評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽酱鸭。三九已至,卻和暖如春垛吗,著一層夾襖步出監(jiān)牢的瞬間凹髓,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,983評論 1 269
  • 我被黑心中介騙來泰國打工怯屉, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留蔚舀,地道東北人。 一個月前我還...
    沈念sama閱讀 48,048評論 3 370
  • 正文 我出身青樓锨络,卻偏偏與公主長得像赌躺,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子羡儿,可洞房花燭夜當晚...
    茶點故事閱讀 44,864評論 2 354

推薦閱讀更多精彩內(nèi)容