Android 帶大家進入RxJava的世界

Rx指的是ReactiveX,也是Reactive Extensions的縮寫泞莉,是一個使用可觀察數(shù)據(jù)流進行異步編程的編程接口馍管,結合了觀察者模式掸掏、迭代器模式和函數(shù)式編程的精華茁影,是一種編程思想的突破,它影響了許多其它的程序庫和框架以及編程語言

RxJava里最核心的兩個東西Observable和Subscriber丧凤,Observable指的是被觀察者募闲、事件源,Subscriber指的是被觀察者愿待、訂閱者浩螺、用戶。

舉個簡單的例子仍侥,像微信里面的服務號和公眾號要出,我們關注了某個服務號或公眾號,只要發(fā)出消息农渊,我們都能同時收到患蹂。這里的服務號和公眾號就是Observable,Subscriber指的就是我們用戶砸紊。

看起來RxJava蠻像觀察者模式传于,不過有一點不同Observable如果沒有Subscriber不會發(fā)出任何事件

下面就讓我們一起進入RxJava的世界吧

參考資料學習網(wǎng)址

1、Grokking RxJava
2醉顽、github地址
3格了、ReactiveX/RxJava文檔中文版
4、ReactiveX官網(wǎng)
5徽鼎、拋物線博客

配置環(huán)境

在builde.gradle里面添加

compile 'io.reactivex:rxandroid:1.2.1'
compile 'io.reactivex:rxjava:1.1.6'

基本實現(xiàn)

1、Hello world

創(chuàng)建Observable對象弹惦,通過Observable.create()來創(chuàng)建否淤,sub.onNext()可以發(fā)出信息,最后調用sub.onCompleted()來完成棠隐,還有一個sub.onError()方法是出現(xiàn)異常提供的一個方法石抡,一旦調用了sub.onError()或者sub.onCompleted(),后面的邏輯代碼都不會執(zhí)行助泽。

Observable<String> myObservable = Observable.create(
        new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> sub) {
                sub.onNext("Hello, world1!");
                sub.onNext("Hello, world2!");
                sub.onCompleted();
            }
        }
);

創(chuàng)建Subscriber對象啰扛,實例化Subscriber抽象類,實現(xiàn)onNext嗡贺、onCompleted隐解、onError三個方法,依次響應Observable對象里面的sub.onNext()诫睬、sub.onCompleted()和sub.onError()方法

Subscriber<String> mySubscriber = new Subscriber<String>() {
    @Override
    public void onNext(String s) {
        Log.d("rxjava","onNext="+s); 
    }
    @Override
    public void onCompleted() { 
        Log.d("rxjava","onCompleted");
    }
    @Override
    public void onError(Throwable e) {
        Log.d("rxjava","onError="+e);
    }
};

最后Subscriber訂閱Observable煞茫,這里Observable可以被多個Subscriber訂閱

myObservable.subscribe(mySubscriber);
myObservable.subscribe(mySubscriber2);
myObservable.subscribe(mySubscriber3);

2、簡化代碼

上面的代碼可以轉變?yōu)檫@樣:

Observable<String> myObservable = Observable.just("Hello, world1!","Hello, world2!");
Action1<String> onNextAction = new Action1<String>() {
    @Override
    public void call(String s) {
        System.out.println(s);
    }
};
Action1<Throwable> onErrorAction = new Action1<Throwable>() {
    @Override
    public void call(Throwable e) {
    }
};
Action0 onCompletedAction = new Action0() {
    @Override
    public void call() {
    }
};
// 自動創(chuàng)建 Subscriber 续徽,并使用 onNextAction 來定義 onNext()
myObservable.subscribe(onNextAction);
// 自動創(chuàng)建 Subscriber 蚓曼,并使用 onNextAction 和 onErrorAction 來定義 onNext() 和 onError()
myObservable.subscribe(onNextAction,onErrorAction);
// 自動創(chuàng)建 Subscriber ,并使用 onNextAction钦扭、 onErrorAction 和 onCompletedAction 來定義 onNext()纫版、 onError() 和 onCompleted()
myObservable.subscribe(onNextAction,onErrorAction,onCompletedAction);

Observable.just("Hello, world1!","Hello, world2!")會直接創(chuàng)建Observable對象客情,
然后依次調用sub.onNext("Hello, world1!"); sub.onNext("Hello, world2!"); sub.onCompleted();

一般onErrorAction和onCompletedAction我們可以不用管理 最后代碼可優(yōu)化成:

Observable.just("Hello, world1!","Hello, world2!")
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        System.out.println(s);
                    }
                });

java8語法lambda可以寫成

Observable.just("Hello, world!")
                .subscribe(s -> System.out.println(s));

3其弊、操作符map

簡單的一個例子,傳遞一個字符串裹匙,然后獲取它的長度瑞凑,最后打印出來,在RxJava里我們可以這樣實現(xiàn):

Observable.just("Hello, world!")
        .map(new Func1<String,Integer>(){
            @Override
            public Integer call(String s){
                return s.length();
            }
        })
        .subscribe(new Action1<Integer>(){
            @Override
            public void call(Integer i){
                System.out.println(Integer.toString(i));
            }
        });

這里map方法需要結合Func1接口來使用概页,F(xiàn)unc1里第一個String類型是對應接收just類型次伶,第二個Integer類型是返回值,供下一個對象接收

4更胖、操作符from

如果我們要循環(huán)遍歷一個數(shù)組或者集合预茄,在RxJava里面可以使用from操作符來實現(xiàn)

String[] items = { "0", "1", "2", "3", "4", "5" };
        Observable.from(items)
                .subscribe(new Action1<String>(){
                    @Override
                    public void call(String str){
                        //依次遍歷打印items
                        System.out.println(str);
                    }
                });

操作符from會循環(huán)遍歷方法參數(shù)里面的數(shù)組或者集合,然后依次調用onNext()方法

5项鬼、操作符flatMap

如果我們要實現(xiàn)多重循環(huán)哑梳,可以使用flatMap操作符來實現(xiàn)

String[][] itemArray = {{"1","2","3"},{"4","5","6"},{"7","8","9"}};
Observable.from(itemArray)
        .flatMap(new Func1<String[],Observable<String>>(){
            @Override
            public Observable<String> call(String[] s){
                return Observable.from(s);
            }
        })
        .subscribe(new Subscriber<String>(){
            @Override
            public void onCompleted(){
            }
            @Override
            public void onError(Throwable e){
            }
            @Override
            public void onNext(String str){
                Log.d("rxJava","str="+str);
            }
        });

上面代碼執(zhí)行后,依次打印str=1到9绘盟,這里要注意的是Func1第二個參數(shù)類型是Observable<?>

6鸠真、操作符filter

字面理解應該就知道它的作用了,過濾不需要的數(shù)據(jù)

Observable.just(1, 2, 3, 4, 5)
    .filter(new Func1<Integer, Boolean>() {
        @Override
        public Boolean call(Integer item) {
            return( item < 4 );
        }
    })
    .subscribe(new Subscriber<Integer>() {
        @Override
        public void onNext(Integer item) {
            System.out.println("Next: " + item);
        }
        @Override
        public void onError(Throwable error) {
            System.err.println("Error: " + error.getMessage());
        }
        @Override
        public void onCompleted() {
            System.out.println("Sequence complete.");
        }
    });

上面代碼執(zhí)行結果龄毡,只會打印1到3吠卷,后面4和5不符合,則過濾

7沦零、操作符merge

合并多個Observables

Observable<Integer> odds = Observable.just(1, 3, 5);
Observable<Integer> evens = Observable.just(2, 4, 6);
Observable.merge(odds, evens)
        .subscribe(new Subscriber<Integer>() {
            @Override
            public void onNext(Integer item) {
                System.out.println("Next: " + item);
            }
            @Override
            public void onError(Throwable error) {
                System.err.println("Error: " + error.getMessage());
            }
            @Override
            public void onCompleted() {
                System.out.println("Sequence complete.");
            }
        });

上面代碼執(zhí)行后結果:依次打印的是1祭隔、3、5路操、2疾渴、4、6

*  Javadoc: merge(Iterable)
*  Javadoc: merge(Iterable,int)
*  Javadoc: merge(Observable[])
*  Javadoc: merge(Observable,Observable) (接受二到九個Observable)

除了傳遞多個Observable給merge屯仗,你還可以傳遞一個Observable列表List搞坝,數(shù)組,甚至是一個發(fā)射Observable序列的Observable魁袜,merge將合并它們的輸出作為單個Observable的輸出

8瞄沙、操作符timer和interval

5秒后打印數(shù)據(jù)己沛,第一個參數(shù)是多少時間執(zhí)行,第二個參數(shù)是時間單位

Observable.timer(5,TimeUnit.SECONDS)
        .subscribe(new Action1<Long>(){
            @Override
            public void call(Long aLong){
                Log.d("rxJava","5秒后執(zhí)行"+aLong);
            }
        });

1秒后每5秒循環(huán)打印距境,第一個參數(shù)為多少時間后執(zhí)行申尼,第二個為沒多少時間執(zhí)行,第三個是時間單位

Observable.interval(1,5,TimeUnit.SECONDS)
        .subscribe(new Action1<Long>(){
            @Override
            public void call(Long aLong){
                Log.d("rxJava","1秒后每5秒循環(huán)執(zhí)行"+aLong);
            }
        });

這里aLong返回的值應該是打印的次數(shù)

9垫桂、線程切換

主要通過subscribeOn()和 observeOn()來實現(xiàn)
subscribeOn()是控制Observable.OnSubscribe的線程切換师幕, subscribeOn()的線程切換發(fā)生在 OnSubscribe中,即在它通知上一級 OnSubscribe時诬滩,這時事件還沒有開始發(fā)送霹粥,因此控制可以從事件發(fā)出的開端就造成影響
observeOn()則是控制Subscriber的線程切換,observeOn() 的線程切換則發(fā)生在它內建的 Subscriber中疼鸟,即發(fā)生在它即將給下一級 Subscriber發(fā)送事件時后控,因此控制的是它后面的線程
Schedulers.io():IO線程
AndroidSchedulers.mainThread():主線程
Schedulers.newThread():新開線程

Observable.just(1, 2, 3, 4) // IO 線程,由 subscribeOn() 指定
    .subscribeOn(Schedulers.io())
    .observeOn(Schedulers.newThread())
    .map(mapOperator) // 新線程空镜,由 observeOn() 指定
    .observeOn(Schedulers.io())
    .map(mapOperator2) // IO 線程浩淘,由 observeOn() 指定
    .observeOn(AndroidSchedulers.mainThread) 
    .subscribe(subscriber);  // Android 主線程,由 observeOn() 指定

10吴攒、防止按鈕重復點擊

1秒鐘內只會執(zhí)行第一次點擊

RxView.clicks(view)
        .throttleFirst(1,TimeUnit.SECONDS)
        .subscribe(new Observer<Object>(){
            @Override
            public void onCompleted(){
            }
            @Override
            public void onError(Throwable e){
            }
            @Override
            public void onNext(Object o){
            }
        });

RxView是RxBinding庫里的對象张抄,需要添加環(huán)境

compile 'com.jakewharton.rxbinding:rxbinding:0.4.0'

throttleFirst():在每次事件觸發(fā)后的一定時間間隔內丟棄新的事件

11、RxBus代替EventBus

public class RxBus {
    // 主題
    private final Subject<Object, Object> bus;
    // PublishSubject只會把在訂閱發(fā)生的時間點之后來自原始Observable的數(shù)據(jù)發(fā)射給觀察者
    private RxBus() {
        bus = new SerializedSubject<>(PublishSubject.create());
    }
    public static RxBus get() {
        return RxBusHolder.sInstance;
    }
    public void unSubscribe(CompositeSubscription compositeSubscription){
        if (compositeSubscription != null && !compositeSubscription.isUnsubscribed())
            compositeSubscription.unsubscribe();
    }
    private static class RxBusHolder {
        private static final RxBus sInstance = new RxBus();
    }
    // 提供了一個新的事件
    public void post(Object o) {
        bus.onNext(o);
    }
    // 根據(jù)傳遞的 eventType 類型返回特定類型(eventType)的 被觀察者
    public <T> Observable<T> toObserverable(Class<T> eventType) {
        return bus.ofType(eventType);
    }
}

RxBus提供了三個方法toObserverable()獲取Observable對象洼怔、unSubscribe()解除訂閱署惯、post()發(fā)送數(shù)據(jù)。

在MainActivity中實現(xiàn)

    public class MainActivity extends AppCompatActivity {
        private CompositeSubscription allSubscription = new CompositeSubscription();
        @Override
        protected void onCreate(Bundle savedInstanceState){
            super.onCreate(savedInstanceState);
            setContentView(R.layout.activity_main);
            ...
            initRxBus();
        }
        private void initRxBus(){
            //注冊訂閱RxBus并添加到CompositeSubscription,在onDestroy()里可以統(tǒng)一解除訂閱
            allSubscription.add(RxBus.get().toObserverable(OneEvent.class).subscribe(this::response));
        }
        @Override
        protected void onDestroy(){
            super.onDestroy();
            //解除訂閱
            RxBus.get().unSubscribe(allSubscription);
        }
        /**
         * RxBus響應接收數(shù)據(jù)方法
         * @param event
         */
        public void response(OneEvent event) {
            Toast.makeText(getApplicationContext(),event.msg,Toast.LENGTH_LONG).show();
        }
        class OneEvent {
            String msg;
            public OneEvent(String msg) {
                this.msg = msg;
            }
        }
    }

最后在需要發(fā)送數(shù)據(jù)的地方調用镣隶,跟EventBus還是滿相似的

RxBus.get().post(new OneEvent("hello bus"));

12极谊、結合Retrofit使用
這里我就直接拿介紹Retrofit那篇文章的例子在這里說明下

@POST("query")
Observable<PostQueryInfo> searchRx(@Query("type") String type, @Query("postid") String postid);
        Retrofit retrofit = new Retrofit.Builder()
                .baseUrl("http://www.kuaidi100.com/")
                 //添加數(shù)據(jù)解析ConverterFactory
                .addConverterFactory(GsonConverterFactory.create()) 
                 //添加RxJava
                .addCallAdapterFactory(RxJavaCallAdapterFactory.create())   
                .build();
        GitHubService apiService = retrofit.create(GitHubService.class);
        apiService.searchRx("yuantong","500379523313")
                //訪問網(wǎng)絡切換異步線程
                .subscribeOn(Schedulers.io())
                //響應結果處理切換成主線程
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<PostQueryInfo>() {
                    @Override
                    public void onCompleted() {
                         //請求結束回調
                    }
                    @Override
                    public void onError(Throwable e) {
                         //錯誤回調
                        e.printStackTrace();
                    }
                    @Override
                    public void onNext(PostQueryInfo postQueryInfo) {
                         //成功結果返回
                        Log.e("APP",postQueryInfo.getNu());
                    }
                });

Retrofit支持RxJava可以返回Observable對象,所以我們就可以直接拿來用安岂,很方便吧
.subscribeOn(Schedulers.io()請求網(wǎng)絡切換成IO線程
.observeOn(AndroidSchedulers.mainThread())切換成主線程更新UI
onNext(PostQueryInfo postQueryInfo)成功回調解析數(shù)據(jù)
public void onError(Throwable e)異郴晨幔回調拋出異常

Tips

配置java8 lambda環(huán)境

1、在Module中的build.gradle中添加嗜闻,然后Make Module 'xxx'

apply plugin: 'me.tatarka.retrolambda'
buildscript {
    repositories {
        mavenCentral()
    }
    dependencies {
        classpath 'me.tatarka:gradle-retrolambda:3.2.5'
    }
}
repositories {
    mavenCentral()
}
android {
    compileOptions {
        sourceCompatibility JavaVersion.VERSION_1_8
        targetCompatibility JavaVersion.VERSION_1_8
    }
    ...
}

2、測試 下面代碼不報紅桅锄,配置環(huán)境OK

textView.setOnClickListener(v -> 
    Toast.makeText(getApplicationContext(),"lambda",Toast.LENGTH_LONG).show()
);

學到最后其實RxJava只是剛入了個門琉雳,還有很多很多需要我們去學習的,這里只是跟大家一起看了看RxJava的世界是什么樣子的友瘤,如果要學好翠肘,還是要多去看看api文檔,多練習才能真正的熟練使用它辫秧。

革命尚未成功束倍,我們仍需努力

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子绪妹,更是在濱河造成了極大的恐慌甥桂,老刑警劉巖,帶你破解...
    沈念sama閱讀 211,265評論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件邮旷,死亡現(xiàn)場離奇詭異黄选,居然都是意外死亡,警方通過查閱死者的電腦和手機婶肩,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,078評論 2 385
  • 文/潘曉璐 我一進店門办陷,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人律歼,你說我怎么就攤上這事民镜。” “怎么了险毁?”我有些...
    開封第一講書人閱讀 156,852評論 0 347
  • 文/不壞的土叔 我叫張陵制圈,是天一觀的道長。 經(jīng)常有香客問我辱揭,道長离唐,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,408評論 1 283
  • 正文 為了忘掉前任问窃,我火速辦了婚禮亥鬓,結果婚禮上,老公的妹妹穿的比我還像新娘域庇。我一直安慰自己嵌戈,他們只是感情好,可當我...
    茶點故事閱讀 65,445評論 5 384
  • 文/花漫 我一把揭開白布听皿。 她就那樣靜靜地躺著熟呛,像睡著了一般。 火紅的嫁衣襯著肌膚如雪尉姨。 梳的紋絲不亂的頭發(fā)上庵朝,一...
    開封第一講書人閱讀 49,772評論 1 290
  • 那天,我揣著相機與錄音又厉,去河邊找鬼九府。 笑死,一個胖子當著我的面吹牛覆致,可吹牛的內容都是我干的侄旬。 我是一名探鬼主播,決...
    沈念sama閱讀 38,921評論 3 406
  • 文/蒼蘭香墨 我猛地睜開眼煌妈,長吁一口氣:“原來是場噩夢啊……” “哼儡羔!你這毒婦竟也來了宣羊?” 一聲冷哼從身側響起,我...
    開封第一講書人閱讀 37,688評論 0 266
  • 序言:老撾萬榮一對情侶失蹤汰蜘,失蹤者是張志新(化名)和其女友劉穎仇冯,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體鉴扫,經(jīng)...
    沈念sama閱讀 44,130評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡赞枕,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 36,467評論 2 325
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了坪创。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片炕婶。...
    茶點故事閱讀 38,617評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖莱预,靈堂內的尸體忽然破棺而出柠掂,到底是詐尸還是另有隱情,我是刑警寧澤依沮,帶...
    沈念sama閱讀 34,276評論 4 329
  • 正文 年R本政府宣布涯贞,位于F島的核電站,受9級特大地震影響危喉,放射性物質發(fā)生泄漏宋渔。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,882評論 3 312
  • 文/蒙蒙 一辜限、第九天 我趴在偏房一處隱蔽的房頂上張望皇拣。 院中可真熱鬧,春花似錦薄嫡、人聲如沸氧急。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,740評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽吩坝。三九已至,卻和暖如春哑蔫,著一層夾襖步出監(jiān)牢的瞬間钉寝,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,967評論 1 265
  • 我被黑心中介騙來泰國打工闸迷, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留嵌纲,地道東北人。 一個月前我還...
    沈念sama閱讀 46,315評論 2 360
  • 正文 我出身青樓稿黍,卻偏偏與公主長得像,于是被迫代替她去往敵國和親崩哩。 傳聞我的和親對象是個殘疾皇子巡球,可洞房花燭夜當晚...
    茶點故事閱讀 43,486評論 2 348

推薦閱讀更多精彩內容