給初學者的RxJava2.0教程(二)

Outline

[TOC]

前言

上一節(jié)教程講解了最基本的RxJava2的使用, 在本節(jié)中, 我們將學習RxJava強大的線程控制.

正題

還是以之前的例子, 兩根水管:

RxJava

正常情況下, 上游和下游是工作在同一個線程中的, 也就是說上游在哪個線程發(fā)事件, 下游就在哪個線程接收事件.

怎么去理解呢, 以Android為例, 一個Activity的所有動作默認都是在主線程中運行的, 比如我們在onCreate中打出當前線程的名字:

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);
        Log.d(TAG, Thread.currentThread().getName());
    }

結(jié)果便是:

D/TAG: main

回到RxJava中, 當我們在主線程中去創(chuàng)建一個上游Observable來發(fā)送事件, 則這個上游默認就在主線程發(fā)送事件.

當我們在主線程去創(chuàng)建一個下游Observer來接收事件, 則這個下游默認就在主線程中接收事件, 來看段代碼:

@Override                                                                                       
protected void onCreate(Bundle savedInstanceState) {                                            
    super.onCreate(savedInstanceState);                                                         
    setContentView(R.layout.activity_main);                                                     
                                                                          
    Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {   
        @Override                                                                               
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {            
            Log.d(TAG, "Observable thread is : " + Thread.currentThread().getName());           
            Log.d(TAG, "emit 1");                                                               
            emitter.onNext(1);                                                                  
        }                                                                                       
    });                                                                                         
                                                                                                
    Consumer<Integer> consumer = new Consumer<Integer>() {                                      
        @Override                                                                               
        public void accept(Integer integer) throws Exception {                                  
            Log.d(TAG, "Observer thread is :" + Thread.currentThread().getName());              
            Log.d(TAG, "onNext: " + integer);                                                   
        }                                                                                       
    };                                                                                          
                                                                                                
    observable.subscribe(consumer);                                                             
}                                                                                                                                                                   

在主線程中分別創(chuàng)建上游和下游, 然后將他們連接在一起, 同時分別打印出它們所在的線程, 運行結(jié)果為:

D/TAG: Observable thread is : main
D/TAG: emit 1                     
D/TAG: Observer thread is :main   
D/TAG: onNext: 1                  

這就驗證了剛才所說, 上下游默認是在同一個線程工作.

這樣肯定是滿足不了我們的需求的, 我們更多想要的是這么一種情況, 在子線程中做耗時的操作, 然后回到主線程中來操作UI, 用圖片來描述就是下面這個圖片:

thread.png

在這個圖中, 我們用黃色水管表示子線程, 深藍色水管表示主線程.

要達到這個目的, 我們需要先改變上游發(fā)送事件的線程, 讓它去子線程中發(fā)送事件, 然后再改變下游的線程, 讓它去主線程接收事件. 通過RxJava內(nèi)置的線程調(diào)度器可以很輕松的做到這一點. 接下來看一段代碼:

@Override                                                                                       
protected void onCreate(Bundle savedInstanceState) {                                            
    super.onCreate(savedInstanceState);                                                         
    setContentView(R.layout.activity_main);                                                     
                                                                                                
    Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {   
        @Override                                                                               
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {            
            Log.d(TAG, "Observable thread is : " + Thread.currentThread().getName());           
            Log.d(TAG, "emit 1");                                                               
            emitter.onNext(1);                                                                  
        }                                                                                       
    });                                                                                         
                                                                                                
    Consumer<Integer> consumer = new Consumer<Integer>() {                                      
        @Override                                                                               
        public void accept(Integer integer) throws Exception {                                  
            Log.d(TAG, "Observer thread is :" + Thread.currentThread().getName());              
            Log.d(TAG, "onNext: " + integer);                                                   
        }                                                                                       
    };                                                                                          
                                                                                                
    observable.subscribeOn(Schedulers.newThread())                                              
            .observeOn(AndroidSchedulers.mainThread())                                          
            .subscribe(consumer);                                                               
}                                                                                               

還是剛才的例子, 只不過我們太添加了一點東西, 先來看看運行結(jié)果:

 D/TAG: Observable thread is : RxNewThreadScheduler-2  
 D/TAG: emit 1                                         
 D/TAG: Observer thread is :main                       
 D/TAG: onNext: 1                                      

可以看到, 上游發(fā)送事件的線程的確改變了, 是在一個叫 RxNewThreadScheduler-2的線程中發(fā)送的事件, 而下游仍然在主線程中接收事件, 這說明我們的目的達成了, 接下來看看是如何做到的.

和上一段代碼相比,這段代碼只不過是增加了兩行代碼:

.subscribeOn(Schedulers.newThread())                                              
.observeOn(AndroidSchedulers.mainThread())   

作為一個初學者的入門教程, 并不會貼出一大堆源碼來分析, 因此只需要讓大家記住幾個要點, 已達到如何正確的去使用這個目的才是我們的目標.

簡單的來說, subscribeOn() 指定的是上游發(fā)送事件的線程, observeOn() 指定的是下游接收事件的線程.

多次指定上游的線程只有第一次指定的有效, 也就是說多次調(diào)用subscribeOn() 只有第一次的有效, 其余的會被忽略.

多次指定下游的線程是可以的, 也就是說每調(diào)用一次observeOn() , 下游的線程就會切換一次.

舉個例子:

 observable.subscribeOn(Schedulers.newThread())     
         .subscribeOn(Schedulers.io())              
         .observeOn(AndroidSchedulers.mainThread()) 
         .observeOn(Schedulers.io())                
         .subscribe(consumer);                      

這段代碼中指定了兩次上游發(fā)送事件的線程, 分別是newThread和IO線程, 下游也指定了兩次線程,分別是main和IO線程. 運行結(jié)果為:

D/TAG: Observable thread is : RxNewThreadScheduler-3
D/TAG: emit 1                                       
D/TAG: Observer thread is :RxCachedThreadScheduler-1
D/TAG: onNext: 1                                    

可以看到, 上游雖然指定了兩次線程, 但只有第一次指定的有效, 依然是在RxNewThreadScheduler 線程中, 而下游則跑到了RxCachedThreadScheduler 中, 這個CacheThread其實就是IO線程池中的一個.

為了更清晰的看到下游的線程切換過程, 我們加點log:

     observable.subscribeOn(Schedulers.newThread())
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .doOnNext(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d(TAG, "After observeOn(mainThread), current thread is: " + Thread.currentThread().getName());
                    }
                })
                .observeOn(Schedulers.io())
                .doOnNext(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d(TAG, "After observeOn(io), current thread is : " + Thread.currentThread().getName());
                    }
                })
                .subscribe(consumer);                                                

我們在下游線程切換之后, 把當前的線程打印出來, 運行結(jié)果:

D/TAG: Observable thread is : RxNewThreadScheduler-1                                             
D/TAG: emit 1                                                                                    
D/TAG: After observeOn(mainThread), current thread is: main                                      
D/TAG: After observeOn(io), current thread is : RxCachedThreadScheduler-2                        
D/TAG: Observer thread is :RxCachedThreadScheduler-2                                             
D/TAG: onNext: 1                                 

可以看到, 每調(diào)用一次observeOn() 線程便會切換一次, 因此如果我們有類似的需求時, 便可知道如何處理了.

在RxJava中, 已經(jīng)內(nèi)置了很多線程選項供我們選擇, 例如有

  • Schedulers.io() 代表io操作的線程, 通常用于網(wǎng)絡,讀寫文件等io密集型的操作
  • Schedulers.computation() 代表CPU計算密集型的操作, 例如需要大量計算的操作
  • Schedulers.newThread() 代表一個常規(guī)的新線程
  • AndroidSchedulers.mainThread() 代表Android的主線程

這些內(nèi)置的Scheduler已經(jīng)足夠滿足我們開發(fā)的需求, 因此我們應該使用內(nèi)置的這些選項, 在RxJava內(nèi)部使用的是線程池來維護這些線程, 所有效率也比較高.

實踐

對于我們Android開發(fā)人員來說, 經(jīng)常會將一些耗時的操作放在后臺, 比如網(wǎng)絡請求或者讀寫文件,操作數(shù)據(jù)庫等等,等到操作完成之后回到主線程去更新UI, 有了上面的這些基礎, 那么現(xiàn)在我們就可以輕松的去做到這樣一些操作.

下面來舉幾個常用的場景.

網(wǎng)絡請求

Android中有名的網(wǎng)絡請求庫就那么幾個, Retrofit能夠從中脫穎而出很大原因就是因為它支持RxJava的方式來調(diào)用, 下面簡單講解一下它的基本用法.

要使用Retrofit,先添加Gradle配置:

    //retrofit
    compile 'com.squareup.retrofit2:retrofit:2.1.0'
    //Gson converter
    compile 'com.squareup.retrofit2:converter-gson:2.1.0'
    //RxJava2 Adapter
    compile 'com.jakewharton.retrofit:retrofit2-rxjava2-adapter:1.0.0'
    //okhttp
    compile 'com.squareup.okhttp3:okhttp:3.4.1'
    compile 'com.squareup.okhttp3:logging-interceptor:3.4.1'

隨后定義Api接口:

public interface Api {
    @GET
    Observable<LoginResponse> login(@Body LoginRequest request);

    @GET
    Observable<RegisterResponse> register(@Body RegisterRequest request);
}

接著創(chuàng)建一個Retrofit客戶端:

private static Retrofit create() {
            OkHttpClient.Builder builder = new OkHttpClient().newBuilder();
            builder.readTimeout(10, TimeUnit.SECONDS);
            builder.connectTimeout(9, TimeUnit.SECONDS);

            if (BuildConfig.DEBUG) {
                HttpLoggingInterceptor interceptor = new HttpLoggingInterceptor();
                interceptor.setLevel(HttpLoggingInterceptor.Level.BODY);
                builder.addInterceptor(interceptor);
            }

            return new Retrofit.Builder().baseUrl(ENDPOINT)
                    .client(builder.build())
                    .addConverterFactory(GsonConverterFactory.create())
                    .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
                    .build();
}

發(fā)起請求就很簡單了:

        Api api = retrofit.create(Api.class);
        api.login(request)
             .subscribeOn(Schedulers.io())               //在IO線程進行網(wǎng)絡請求
             .observeOn(AndroidSchedulers.mainThread())  //回到主線程去處理請求結(jié)果
            .subscribe(new Observer<LoginResponse>() {
                @Override
                public void onSubscribe(Disposable d) {}

                @Override
                public void onNext(LoginResponse value) {}

                @Override
                public void onError(Throwable e) {
                    Toast.makeText(mContext, "登錄失敗", Toast.LENGTH_SHORT).show();
                }

                @Override
                public void onComplete() {
                    Toast.makeText(mContext, "登錄成功", Toast.LENGTH_SHORT).show();
                }
            });

看似很完美, 但我們忽略了一點, 如果在請求的過程中Activity已經(jīng)退出了, 這個時候如果回到主線程去更新UI, 那么APP肯定就崩潰了, 怎么辦呢, 上一節(jié)我們說到了Disposable , 說它是個開關, 調(diào)用它的dispose()方法時就會切斷水管, 使得下游收不到事件, 既然收不到事件, 那么也就不會再去更新UI了. 因此我們可以在Activity中將這個Disposable 保存起來, 當Activity退出時, 切斷它即可.

那如果有多個Disposable 該怎么辦呢, RxJava中已經(jīng)內(nèi)置了一個容器CompositeDisposable, 每當我們得到一個Disposable時就調(diào)用CompositeDisposable.add()將它添加到容器中, 在退出的時候, 調(diào)用CompositeDisposable.clear() 即可切斷所有的水管.

讀寫數(shù)據(jù)庫

上面說了網(wǎng)絡請求的例子, 接下來再看看讀寫數(shù)據(jù)庫, 讀寫數(shù)據(jù)庫也算一個耗時的操作, 因此我們也最好放在IO線程里去進行, 這個例子就比較簡單, 直接上代碼:

public Observable<List<Record>> readAllRecords() {
        return Observable.create(new ObservableOnSubscribe<List<Record>>() {
            @Override
            public void subscribe(ObservableEmitter<List<Record>> emitter) throws Exception {
                Cursor cursor = null;
                try {
                    cursor = getReadableDatabase().rawQuery("select * from " + TABLE_NAME, new String[]{});
                    List<Record> result = new ArrayList<>();
                    while (cursor.moveToNext()) {
                        result.add(Db.Record.read(cursor));
                    }
                    emitter.onNext(result);
                    emitter.onComplete();
                } finally {
                    if (cursor != null) {
                        cursor.close();
                    }
                }
            }
        }).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread());
    }

好了本次的教程就到這里吧, 后面的教程將會教大家如何使用RxJava中強大的操作符. 通過使用這些操作符可以很輕松的做到各種吊炸天的效果. 敬請期待.

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末写半,一起剝皮案震驚了整個濱河市生音,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌讯检,老刑警劉巖茄唐,帶你破解...
    沈念sama閱讀 216,372評論 6 498
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件凸郑,死亡現(xiàn)場離奇詭異辜限,居然都是意外死亡蛮位,警方通過查閱死者的電腦和手機逊移,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,368評論 3 392
  • 文/潘曉璐 我一進店門预吆,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人胳泉,你說我怎么就攤上這事拐叉⊙乙牛” “怎么了?”我有些...
    開封第一講書人閱讀 162,415評論 0 353
  • 文/不壞的土叔 我叫張陵凤瘦,是天一觀的道長宿礁。 經(jīng)常有香客問我,道長蔬芥,這世上最難降的妖魔是什么梆靖? 我笑而不...
    開封第一講書人閱讀 58,157評論 1 292
  • 正文 為了忘掉前任,我火速辦了婚禮笔诵,結(jié)果婚禮上返吻,老公的妹妹穿的比我還像新娘。我一直安慰自己乎婿,他們只是感情好测僵,可當我...
    茶點故事閱讀 67,171評論 6 388
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著谢翎,像睡著了一般捍靠。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上森逮,一...
    開封第一講書人閱讀 51,125評論 1 297
  • 那天榨婆,我揣著相機與錄音,去河邊找鬼吊宋。 笑死纲辽,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的璃搜。 我是一名探鬼主播拖吼,決...
    沈念sama閱讀 40,028評論 3 417
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼这吻!你這毒婦竟也來了吊档?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 38,887評論 0 274
  • 序言:老撾萬榮一對情侶失蹤唾糯,失蹤者是張志新(化名)和其女友劉穎怠硼,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體移怯,經(jīng)...
    沈念sama閱讀 45,310評論 1 310
  • 正文 獨居荒郊野嶺守林人離奇死亡香璃,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,533評論 2 332
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了舟误。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片葡秒。...
    茶點故事閱讀 39,690評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出眯牧,到底是詐尸還是另有隱情蹋岩,我是刑警寧澤,帶...
    沈念sama閱讀 35,411評論 5 343
  • 正文 年R本政府宣布学少,位于F島的核電站剪个,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏版确。R本人自食惡果不足惜扣囊,卻給世界環(huán)境...
    茶點故事閱讀 41,004評論 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望绒疗。 院中可真熱鬧如暖,春花似錦、人聲如沸忌堂。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,659評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽士修。三九已至枷遂,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間棋嘲,已是汗流浹背酒唉。 一陣腳步聲響...
    開封第一講書人閱讀 32,812評論 1 268
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留沸移,地道東北人痪伦。 一個月前我還...
    沈念sama閱讀 47,693評論 2 368
  • 正文 我出身青樓,卻偏偏與公主長得像雹锣,于是被迫代替她去往敵國和親网沾。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 44,577評論 2 353

推薦閱讀更多精彩內(nèi)容