Android第三方開源框架——RxJava

RxJava是ReactiveX中使用Java語言實現(xiàn)的版本兽愤,而ReactiveX是一種基于異步數(shù)據(jù)流概念的編程模式,響應(yīng)式編程屡律。

RxJava 有四個基本概念:

Observable (可觀察者腌逢,即被觀察者)、 Observer (觀察者)超埋、 subscribe (訂閱)搏讶、事件佳鳖。
Observable 和 Observer 通過 subscribe() 方法實現(xiàn)訂閱關(guān)系,從而 Observable 可以在需要的時候發(fā)出事件來通知 Observer媒惕。

  • onCompleted(): 事件隊列完結(jié)系吩。RxJava 不僅把每個事件單獨處理,還會把它們看做一個隊列妒蔚。RxJava 規(guī)定穿挨,當(dāng)不會再有新的
  • onNext() 發(fā)出時,需要觸發(fā) onCompleted() 方法作為標(biāo)志肴盏。
  • onError(): 事件隊列異常科盛。在事件處理過程中出異常時,onError() 會被觸發(fā)菜皂,同時隊列自動終止贞绵,不允許再有事件發(fā)出。
  • 在一個正確運行的事件序列中, onCompleted() 和 onError() 有且只有一個恍飘,并且是事件序列中的最后一個榨崩。需要注意的是,onCompleted() 和 onError() 二者也是互斥的章母,即在隊列中調(diào)用了其中一個蜡饵,就不應(yīng)該再調(diào)用另一個。

Observer 觀察者

它決定事件觸發(fā)的時候?qū)⒂性鯓拥男袨楦焓T贠bserver中有三個方法,onNext()肢专、onCompleted()舞肆、onError();

//創(chuàng)建Observer 
Observer<String> observer = new Observer<String>() {
    @Override
    public void onNext(String s) {
        Log.d(tag, "Item: " + s);
    }
    @Override
    public void onCompleted() {
        Log.d(tag, "Completed!");
    }

    @Override
    public void onError(Throwable e) {
        Log.d(tag, "Error!");
    }
};

除了Observer接口之外,RxJava還內(nèi)置了一個實現(xiàn)了 Observer 的抽象類:Subscriber博杖。Subscriber對Observer接口進(jìn)行了一些擴展椿胯,但他們的基本使用方式是完全一樣。

Subscriber<String> subscriber = new Subscriber<String>() {
    @Override
    public void onNext(String s) {
        Log.d(tag, "Item: " + s);
    }

    @Override
    public void onCompleted() {
        Log.d(tag, "Completed!");
    }
    
    @Override
    public void onError(Throwable e) {
        Log.d(tag, "Error!");
    }
};
  • onStart(): 這是 Subscriber 增加的方法剃根。它會在 subscribe 剛開始哩盲,而事件還未發(fā)送之前被調(diào)用,可以用于做一些準(zhǔn)備工作狈醉,例如數(shù)據(jù)的清零或重置廉油。

    這是一個可選方法,默認(rèn)情況下它的實現(xiàn)為空苗傅。需要注意的是抒线,如果對準(zhǔn)備工作的線程有要求(例如彈出一個顯示進(jìn)度的對話框,這必須在主線程執(zhí)行)渣慕, onStart() 就不適用了嘶炭,因為它總是在 subscribe 所發(fā)生的線程被調(diào)用抱慌,而不能指定線程。要在指定的線程來做準(zhǔn)備工作眨猎,可以使用 doOnSubscribe() 方法抑进,具體可以在后面的文中看到。

  • unsubscribe(): 這是 Subscriber 所實現(xiàn)的另一個接口 Subscription 的方法睡陪,用于取消訂閱寺渗。在這個方法被調(diào)用后,Subscriber 將不再接收事件宝穗。一般在這個方法調(diào)用前户秤,可以使用 isUnsubscribed() 先判斷一下狀態(tài)。

    unsubscribe() 這個方法很重要逮矛,因為在 subscribe() 之后鸡号, Observable 會持有 Subscriber 的引用,這個引用如果不能及時被釋放须鼎,將有內(nèi)存泄露的風(fēng)險鲸伴。所以最好保持一個原則:要在不再使用的時候盡快在合適的地方(例如 onPause() onStop() 等方法中)調(diào)用 unsubscribe() 來解除引用關(guān)系,以避免內(nèi)存泄露的發(fā)生晋控。

Observable 被觀察者

它決定什么時候觸發(fā)事件以及觸發(fā)怎樣的事件汞窗。 RxJava 使用 create() 方法來創(chuàng)建一個 Observable。

在RxJava中有幾種不同的Observables:

  • Observable<T>:能夠發(fā)射0或者n個數(shù)據(jù)赡译,并以成功或錯誤事件終止仲吏。
  • Flowable<T>:能夠發(fā)射0或者n個數(shù)據(jù),并以成功或錯誤事件終止蝌焚。支持Backpressure裹唆,可以控制數(shù)據(jù)流發(fā)射速度。
  • Single<T>:只發(fā)射單個數(shù)據(jù)或錯誤事件只洒。
  • Completable:他從來不發(fā)射數(shù)據(jù)许帐,只處理onComplete和onError事件”锨矗可以看成是Rx的Runable成畦。
  • Mapbe<T>:能夠發(fā)射0或者1個數(shù)據(jù),要么成功涝开,要么失敗循帐,有點類似于Optional
//創(chuàng)建Observable
Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        subscriber.onNext("Hello");
        subscriber.onNext("RxJava");
        subscriber.onCompleted();
    }
});

create() 方法是 RxJava 最基本的創(chuàng)造事件序列的方法∫ㄎ洌基于這個方法惧浴, RxJava 還提供了一些方法用來快捷創(chuàng)建事件隊列:

  • just(T...): 將傳入的參數(shù)依次發(fā)送出來。
Observable observable = Observable.just("Hello", "RxJava");
// 將會依次調(diào)用:
// onNext("Hello");
// onNext("RxJava");
// onCompleted();
  • from(T[]) / from(Iterable<? extends T>) : 將傳入的數(shù)組或 Iterable 拆分成具體對象后奕剃,依次發(fā)送出來衷旅。
String[] words = {"Hello", "RxJava"};
Observable observable = Observable.from(words);
// 將會依次調(diào)用:
// onNext("Hello");
// onNext("RxJava");
// onCompleted();

Subscribe 訂閱

創(chuàng)建了 Observable 和 Observer 之后捐腿,再用 subscribe() 方法將它們聯(lián)結(jié)起來,整條鏈子就可以工作了柿顶。

observable.subscribe(observer);
// 或者:
observable.subscribe(subscriber);

Action

Action是RxJava 的一個接口茄袖,常用的有Action0和Action1。

  • Action0: 它只有一個方法 call()嘁锯,這個方法是無參無返回值的宪祥;

    由于 onCompleted() 方法也是無參無返回值的,因此 Action0 可以被當(dāng)成一個包裝對象家乘,將 onCompleted() 的內(nèi)容打包起來將自己作為一個參數(shù)傳入 subscribe() 以實現(xiàn)不完整定義的回調(diào)蝗羊。

  • Action1:它同樣只有一個方法 call(T param),這個方法也無返回值仁锯,但有一個參數(shù)耀找;

    與 Action0 同理,由于 onNext(T obj) 和 onError(Throwable error) 也是單參數(shù)無返回值的业崖,因此 Action1 可以將 onNext(obj)和 onError(error) 打包起來傳入 subscribe() 以實現(xiàn)不完整定義的回調(diào)

創(chuàng)建被觀察者和觀察者并訂閱野芒,Override時只用了onNext(obj),但是還有onError(error)和onCompleted()必須被重寫双炕,但是這兩個方法并沒有被用到導(dǎo)致產(chǎn)生無用代碼狞悲。

Observable.just("Hello", "RxJava")
                .subscribe(new Subscriber<String>() {
                    @Override
                    public void onCompleted() {
                        
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onNext(String s) {
                        Log.i(TAG, s);
                    }
                });
//Action來代替Subscriber
Observable.just("Hello", "RxJava")
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        Log.i(TAG, s);
                    }
                });

定義三個對象,分別打包onNext(obj)妇斤、onError(error) 摇锋、onCompleted()方法。

Observable observable = Observable.just("Hello", "RxJava");
        //處理onNext()中的內(nèi)容
        Action1<String> onNextAction = new Action1<String>() {
            @Override
            public void call(String s) {
                Log.i(TAG, s);
            }
        };
        //處理onError()中的內(nèi)容
        Action1<Throwable> onErrorAction = new Action1<Throwable>() {
            @Override
            public void call(Throwable throwable) {

            }
        };
        //處理onCompleted()中的內(nèi)容
        Action0 onCompletedAction = new Action0() {
            @Override
            public void call() {
                Log.i(TAG, "Completed");

            }
        };

使用subscribe重載的方法

//使用 onNextAction 來定義 onNext()
Observable.just("Hello", "RxJava").subscribe(onNextAction);
//使用 onNextAction 和 onErrorAction 來定義 onNext() 和 onError()
Observable.just("Hello", "RxJava").subscribe(onNextAction, onErrorAction);
//使用 onNextAction站超、 onErrorAction 和 onCompletedAction 來定義 onNext()荸恕、 onError() 和 onCompleted()
Observable.just("Hello", "RxJava").subscribe(onNextAction, onErrorAction, onCompletedAction);

map和flatmap

除了map和flatMap之外,還有其他操作符以供使用顷编。其他常用的操作符如下:

  • filter:集合進(jìn)行過濾
  • each:遍歷集合
  • take:取出集合中的前幾個
  • skip:跳過前幾個元素

map

在使用map時涉及到一個接口,F(xiàn)unc1剑刑。
Func1與Action1很相似媳纬,區(qū)別在于Func1包裝的有返回值方法。
得到多個Person對象中的name施掏,保存到nameList中

Observable.just(person1, person2, person3)
                //使用map進(jìn)行轉(zhuǎn)換钮惠,參數(shù)1:轉(zhuǎn)換前的類型,參數(shù)2:轉(zhuǎn)換后的類型
                .map(new Func1<Person, String>() {
                    @Override
                    public String call(Person p) {
                        String name = p.getName();//獲取Person對象中的name
                        return name;//返回name
                    }
                })
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        nameList.add(s);
                    }
                });

多次使用map七芭,想用幾個用幾個

        Observable.just("Hello", "RxJava")
                .map(new Func1<String, Integer>() {//將String類型的轉(zhuǎn)化為Integer類型的哈希碼
                    @Override
                    public Integer call(String s) {
                        return s.hashCode();
                    }
                })
                .map(new Func1<Integer, String>() {//將轉(zhuǎn)化后得到的Integer類型的哈希碼再轉(zhuǎn)化為String類型
                    @Override
                    public String call(Integer integer) {
                        return integer.intValue() + "";
                    }
                })
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        Log.i(TAG, s);
                    }
                });

flatmap

flatmap和map的不同在于素挽,flatmap進(jìn)行遍歷操作時不需要用循環(huán)語句。
使用map來實現(xiàn)打印所有學(xué)生所修個課程名

List<Student> students = new ArrayList<Student>();
        students.add...
        ...

        Action1<List<Course>> action1 = new Action1<List<Course>>() {
            @Override
            public void call(List<Course> courses) {
                //遍歷courses狸驳,輸出cuouses的name
                 for (int i = 0; i < courses.size(); i++){
                    Log.i(TAG, courses.get(i).getName());
                }
            }
        };
        Observable.from(students)
                .map(new Func1<Student, List<Course>>() {
                    @Override
                    public List<Course> call(Student student) {
                        //返回coursesList
                        return student.getCoursesList();
                    }
                })
                .subscribe(action1);

在Action1中出現(xiàn)了for來循環(huán)打印課程名预明,使用RxJava就是為了剔除這樣的嵌套結(jié)構(gòu)缩赛。

List<Student> students = new ArrayList<Student>();
        students.add...
        ...

        Observable.from(students)
                .flatMap(new Func1<Student, Observable<Course>>() {
                    @Override
                    public Observable<Course> call(Student student) {
                        return Observable.from(student.getCoursesList());
                    }
                })
                .subscribe(new Action1<Course>() {
                    @Override
                    public void call(Course course) {
                        Log.i(TAG, course.getName());
                    }
                });

flatmap對數(shù)據(jù)做的合并操作,不能保證數(shù)據(jù)傳入和取出的順序一樣撰糠,也就是說flatmap的操作是無序的酥馍;如果想要有序的效果,使用concatMap阅酪。

Scheduler

Scheduler:線程控制器旨袒,可以指定每一段代碼在什么樣的線程中執(zhí)行。
我個人對觀察者模式的理解就是為了解決术辐,異步操作砚尽;后臺處理邏輯,處理結(jié)果回調(diào)到前臺顯示辉词。所以Scheduler線程控制對RxJava來說相當(dāng)重要必孤!

在RxJava中有幾種不同的Scheduler:

  • Schedulers.immediate():直接在當(dāng)前線程運行,相當(dāng)于不指定線程较屿。這是默認(rèn)的 Scheduler隧魄。
  • Schedulers.newThread():總是啟用新線程,并在新線程執(zhí)行操作隘蝎。
  • Schedulers.io(): I/O 操作(讀寫文件购啄、讀寫數(shù)據(jù)庫、網(wǎng)絡(luò)信息交互等)所使用的 Scheduler嘱么。行為模式和 newThread() 差不多狮含,區(qū)別在于 io() 的內(nèi)部實現(xiàn)是是用一個無數(shù)量上限的線程池,可以重用空閑的線程曼振,因此多數(shù)情況下 io() 比newThread() 更有效率几迄。不要把計算工作放在 io() 中,可以避免創(chuàng)建不必要的線程冰评。
  • Schedulers.computation():計算所使用的 Scheduler映胁。這個計算指的是 CPU 密集型計算,即不會被 I/O 等操作限制性能的操作甲雅,例如圖形的計算解孙。這個 Scheduler 使用的固定的線程池,大小為 CPU 核數(shù)抛人。不要把 I/O 操作放在 computation() 中弛姜,否則 I/O 操作的等待時間會浪費 CPU。
  • AndroidSchedulers.mainThread():它指定的操作將在 Android 主線程運行妖枚。

新的線程發(fā)起事件廷臼,在主線程中消費

Observable.just("Hello", "Word")
                .subscribeOn(Schedulers.newThread())//指定 subscribe() 發(fā)生在新的線程
                .observeOn(AndroidSchedulers.mainThread())// 指定 Subscriber 的回調(diào)發(fā)生在主線程
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        Log.i(TAG, s);
                    }
                });

subscribeOn(),和observeOn()方法來指定發(fā)生的線程和消費的線程。

  • subscribeOn():指定subscribe() 所發(fā)生的線程荠商,即 Observable.OnSubscribe 被激活時所處的線程寂恬。或者叫做事件產(chǎn)生的線程结啼。
  • observeOn():指定Subscriber 所運行在的線程掠剑。或者叫做事件消費的線程郊愧。

以及參數(shù)Scheduler朴译。
多次切換線程

Observable.just("Hello", "RxJava")
                .subscribeOn(Schedulers.newThread())//指定:在新的線程中發(fā)起
                .observeOn(Schedulers.io())         //指定:在io線程中處理
                .map(new Func1<String, String>() {
                    @Override
                    public String call(String s) {
                        return handleString(s);       //處理數(shù)據(jù)
                    }
                })
                .observeOn(AndroidSchedulers.mainThread())//指定:在主線程中處理
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        show(s);                       //消費事件
                    }
                });

observeOn()被調(diào)用了兩次,分別指定了map的處理的現(xiàn)場和消費事件show(s)的線程属铁。

若將observeOn(AndroidSchedulers.mainThread())去掉會怎么樣眠寿?不為消費事件show(s)指定線程后,map的處理和最后的消費事件show(s)都會在io線程中執(zhí)行焦蘑。observeOn() 指定的是它之后的操作所在的線程盯拱。
observeOn()可以多次使用,可以隨意變換線程

RxJava與Retrofit結(jié)合使用

使用Retrofit傳統(tǒng)方式定義接口和使用

//定義接口
@GET("/user")
public void getUser(@Query("userId") String userId, Callback<User> callback);
//使用
getUser(userId, new Callback<User>() {
    @Override
    public void success(User user) {
        userView.setUser(user);
    }

    @Override
    public void failure(RetrofitError error) {
        // Error handling
        ...
    }
};

使用Retrofit+Rxjava定義接口和使用

//定義接口
@GET("/user")
public Observable<User> getUser(@Query("userId") String userId);
//使用
getUser(userId)
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Observer<User>() {
        @Override
        public void onNext(User user) {
            userView.setUser(user);
        }

        @Override
        public void onCompleted() {
        }

        @Override
        public void onError(Throwable error) {
            // Error handling
            ...
        }
    });

Retrofit 把請求封裝進(jìn) Observable 例嘱,在請求結(jié)束后調(diào)用 onNext() 或在請求失敗后調(diào)用 onError()狡逢。

使用普通方式修改數(shù)據(jù)庫中User的值,耗時操作放在工作線程執(zhí)行拼卵,是異步操作奢浑。

getUser(userId, new Callback<User>() {
    @Override
    public void success(User user) {
        new Thread() {
            @Override
            public void run() {
                processUser(user); // 嘗試修正 User 數(shù)據(jù)
                runOnUiThread(new Runnable() { // 切回 UI 線程
                    @Override
                    public void run() {
                        userView.setUser(user);
                    }
                });
            }).start();
    }

    @Override
    public void failure(RetrofitError error) {
        // Error handling
        ...
    }
};

使用RxJava

getUser(userId)
    .doOnNext(new Action1<User>() {
        @Override
        public void call(User user) {
            processUser(user);
        })
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Observer<User>() {
        @Override
        public void onNext(User user) {
            userView.setUser(user);
        }

        @Override
        public void onCompleted() {
        }

        @Override
        public void onError(Throwable error) {
            // Error handling
            ...
        }
    });
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市腋腮,隨后出現(xiàn)的幾起案子雀彼,更是在濱河造成了極大的恐慌,老刑警劉巖即寡,帶你破解...
    沈念sama閱讀 217,277評論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件徊哑,死亡現(xiàn)場離奇詭異,居然都是意外死亡聪富,警方通過查閱死者的電腦和手機莺丑,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,689評論 3 393
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來墩蔓,“玉大人梢莽,你說我怎么就攤上這事「峙。” “怎么了蟹漓?”我有些...
    開封第一講書人閱讀 163,624評論 0 353
  • 文/不壞的土叔 我叫張陵炕横,是天一觀的道長源内。 經(jīng)常有香客問我,道長,這世上最難降的妖魔是什么膜钓? 我笑而不...
    開封第一講書人閱讀 58,356評論 1 293
  • 正文 為了忘掉前任嗽交,我火速辦了婚禮,結(jié)果婚禮上颂斜,老公的妹妹穿的比我還像新娘夫壁。我一直安慰自己,他們只是感情好沃疮,可當(dāng)我...
    茶點故事閱讀 67,402評論 6 392
  • 文/花漫 我一把揭開白布盒让。 她就那樣靜靜地躺著,像睡著了一般司蔬。 火紅的嫁衣襯著肌膚如雪邑茄。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,292評論 1 301
  • 那天俊啼,我揣著相機與錄音肺缕,去河邊找鬼。 笑死授帕,一個胖子當(dāng)著我的面吹牛同木,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播跛十,決...
    沈念sama閱讀 40,135評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼彤路,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了偶器?” 一聲冷哼從身側(cè)響起斩萌,我...
    開封第一講書人閱讀 38,992評論 0 275
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎屏轰,沒想到半個月后颊郎,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,429評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡霎苗,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,636評論 3 334
  • 正文 我和宋清朗相戀三年姆吭,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片唁盏。...
    茶點故事閱讀 39,785評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡内狸,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出厘擂,到底是詐尸還是另有隱情昆淡,我是刑警寧澤,帶...
    沈念sama閱讀 35,492評論 5 345
  • 正文 年R本政府宣布刽严,位于F島的核電站昂灵,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜眨补,卻給世界環(huán)境...
    茶點故事閱讀 41,092評論 3 328
  • 文/蒙蒙 一管削、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧撑螺,春花似錦含思、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,723評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至线婚,卻和暖如春调鬓,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背酌伊。 一陣腳步聲響...
    開封第一講書人閱讀 32,858評論 1 269
  • 我被黑心中介騙來泰國打工腾窝, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人居砖。 一個月前我還...
    沈念sama閱讀 47,891評論 2 370
  • 正文 我出身青樓虹脯,卻偏偏與公主長得像徙垫,于是被迫代替她去往敵國和親唧领。 傳聞我的和親對象是個殘疾皇子倒堕,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,713評論 2 354

推薦閱讀更多精彩內(nèi)容