RxJava介紹

這篇文章大概是2017年時(shí)整理的,一直在OneNote中存放著铭乾,如今創(chuàng)建了個(gè)人博客吉拳,自然拿了出來。PS:從OneNote中復(fù)制出來后格式亂碼烈疚,整理的我想吐...

介紹

RxJava是Java上一個(gè)靈活的黔牵、使用可觀測序列組成的一個(gè)異步的、基于事件的庫爷肝。

特點(diǎn):

  • 作用:異步
  • 模式:觀察者模式-本質(zhì)上是基于回調(diào)
  • 結(jié)構(gòu):響應(yīng)式編程
  • 邏輯簡潔猾浦,可讀性高,易維護(hù)
  • 鏈?zhǔn)浇Y(jié)構(gòu)的執(zhí)行順序

基本流程

  1. 創(chuàng)建事件資源灯抛,也就是被觀察者金赦。可以用Observable.create/just/from等方法來創(chuàng)建对嚼。
  2. 通過filter/debounce等操作符夹抗,進(jìn)行自定義事件過濾。
  3. 通過Schedules進(jìn)行事件發(fā)送和訂閱的線程控制纵竖,也就是subscribeOn()和observeOn()漠烧。
  4. 通過map/flatMap/compose等操作符杏愤,進(jìn)行事件的變換
  5. 調(diào)用subscribe進(jìn)行事件訂閱。
  6. 最后已脓,不要忘了對訂閱者生命周期的控制珊楼,不用的時(shí)候,記得調(diào)用unsubscribe()度液,以免引發(fā)內(nèi)存泄漏厕宗。

注意:未取消訂閱而引起的內(nèi)存泄漏。在Activtity.onDestroy()或不需要繼續(xù)執(zhí)行時(shí)取消訂閱堕担。

CompositeSubscription已慢, 相當(dāng)于一個(gè)Subscription集合,來取消所有訂閱霹购。

示例

CompositeSubscription list = new CompositeSubscription();
list.add(subscription1);
list.add(subscription2);
list.add(subscription3);
// 統(tǒng)一調(diào)用一次unsubscribe蛇受,就可以把所有的訂閱都取消
list.unsubscribe();

基礎(chǔ)知識

Observer和Subscriber的關(guān)系

  • Observer是觀察者,Subscriber也是觀察者厕鹃。
  • Subscriber是一個(gè)實(shí)現(xiàn)了Observer接口的抽象類兢仰,對Observer進(jìn)行了部分?jǐn)U展,在使用上基本沒有區(qū)別剂碴。
  • Subscriber多了發(fā)送之前調(diào)用的onStart()和解除訂閱關(guān)系的unsubscribe()方法把将。
  • 在RxJava的subscribe過程中,Observer也總是會先被轉(zhuǎn)換成一個(gè)Subscriber再使用忆矛。
  • RxJava開發(fā)過程中一般都使用Subscriber察蹲。

RxJava的事件訂閱回調(diào)

支持以下三種不完整定義的回調(diào),我們可以根據(jù)當(dāng)前需要催训,傳入對應(yīng)的Action洽议,RxJava會相應(yīng)的自動創(chuàng)建Subscriber。

  1. observable.subscribe(onNextAction);
  2. observable.subscribe(onNextAction, onErrorAction);
  3. observable.subscribe(onNextAction, onErrorAction, onCompleteAction);

響應(yīng)式編程

  • Observable發(fā)出一系列事件漫拭,它是事件的產(chǎn)生者亚兄。

  • Subscriber負(fù)責(zé)處理事件,它是事件的消費(fèi)者采驻。

  • Operator是對Observable發(fā)出的事件進(jìn)行修改和變換 审胚。

    注意:若事件從產(chǎn)生到消費(fèi)不需要其他處理,則可以省略掉中間的Operator礼旅,從而流程變?yōu)?strong>Obsevable -> Subscriber膳叨。

  • Subscriber通常在主線程執(zhí)行,所以原則上不要去處理太多的事務(wù)痘系,而這些復(fù)雜的事務(wù)處理則交給Operator菲嘴。

知識點(diǎn)

Scheduler線程控制

默認(rèn)情況下,RxJava事件產(chǎn)生和消費(fèi)均在同一個(gè)線程中,例如在主線程中調(diào)用龄坪,那么事件的產(chǎn)生和消費(fèi)都在主線程昭雌,但RxJava可以自由切換線程。

RxJava線程調(diào)度器

  • Schedulers.io();

    I/O操作(讀寫文件悉默、數(shù)據(jù)庫、網(wǎng)絡(luò)請求等)苟穆,與 newThread() 差不多抄课,區(qū)別在于 io() 的內(nèi)部實(shí)現(xiàn)是是用一個(gè)無數(shù)量上限的線程池,可以重用空閑的線程雳旅,因此多數(shù)情況下 io() 效率比 newThread() 更高跟磨。值得注意的是,在 io() 下攒盈,不要進(jìn)行大量的計(jì)算抵拘,以免產(chǎn)生不必要的線程。

  • Schedulers.newThread();

    開啟新線程操作

  • Schedulers.immediate();

    默認(rèn)指定的線程型豁,也就是當(dāng)前線程

  • Schedulers.computation();

    計(jì)算所使用的調(diào)度器僵蛛。這個(gè)計(jì)算指的是CPU密集型計(jì)算,即不會被I/O等操作限制性能的操作迎变,例如圖形的計(jì)算充尉。這個(gè)Scheduler使用的固定的線程池,大小為CPU核數(shù)衣形。值得注意的是不要把I/O操作放在computation()中否則I/O操作的等待時(shí)間會浪費(fèi)CPU驼侠。

注意:

  • AndroidSchedulers.mainThread();是RxJava擴(kuò)展的Android主線程。
  • 通過subscribeOn()observeOn()這兩個(gè)方法來進(jìn)行線程調(diào)度谆吴。

變換操作符(重點(diǎn))

RxJava可以將發(fā)送的事件或事件序列倒源,加工后轉(zhuǎn)換成不用的事件或事件序列。

map操作符

  1. 是一對一的變換
  2. 返回的是變換后的對象
  3. 變換后的對象直接發(fā)到Subscriber回調(diào)中

flatMap操作符

  1. 可以適應(yīng)一對多的變換
  2. 返回的是一個(gè)Observable被觀察者對象
  3. 返回的Observable對象并不是直接發(fā)送到Subscriber的回調(diào)中句狼,而是重新創(chuàng)建一個(gè)Observable對象笋熬,并激活這個(gè)Observable對象,使之開始發(fā)送事件
  4. flatMap變換后產(chǎn)生的每一個(gè)Observable對象發(fā)送的事件腻菇,最終都匯入同一個(gè)Observable突诬,進(jìn)而發(fā)送給Subscriber回調(diào)

注意:

  • map的返回類型與flatMap返回的Observable事件類型,可以與原來的事件類型一樣
  • 可以對一個(gè)Observable多次使用map和flatMap
  • flatMap常常被用于嵌套的異步操作芜繁,例如:嵌套網(wǎng)絡(luò)請求

map操作符代碼示例

final ImageView ivLogo = (ImageView) findViewById(R.id.ivLogo);
Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
    subscriber.onNext("https://ss2.baidu.com/-vo3dSag_xI4khGko9WTAnF6hhy/image/h%3D200/sign=4db5130a073b5bb5a1d727fe06d2d523/cf1b9d16fdfaaf51965f931e885494eef11f7ad6.jpg");
    }
})
.map(new Func1<String, Drawable>() {
    @Override
    public Drawable call(String url) {
        try {
            Drawable drawable = Drawable.createFromStream(new URL(url).openStream(), "src");
            return drawable;
        } catch (IOException e) {
        }
        return null;
    }
})
// 指定subscribe()所在的線程旺隙,也就是call()方法調(diào)用的線程
.subscribeOn(Schedulers.io())
// 指定Subscriber回調(diào)方法所在的線程,也就是onCompleted,onError,onNext回調(diào)的線程 
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Drawable>() {
    @Override
    public void onCompleted() {
    }
    @Override
    public void onError(Throwable e) {
        Log.e(TAG, e.toString());
    }
    @Override
    public void onNext(Drawable drawable) {
        if (drawable != null) {
            ivLogo.setImageDrawable(drawable);
        }
    }
});

flatMap操作符代碼示例

final List<Employee> list = new ArrayList<Employee>() {
    {
        add(new Employee("jackson", mission_list1));
        add(new Employee("sunny", mission_list2));
    }
};

Observable.from(list)
    .flatMap(new Func1<Employee, Observable<Employee.Mission>>() {
    @Override
    public Observable<Employee.Mission> call(Employee employee) {
        return Observable.from(employee.missions);
        }
    })
    .subscribe(new Subscriber<Employee.Mission>() {
        @Override
        public void onCompleted() {
        }
        @Override
        public void onError(Throwable e) {
        }
        @Override
        public void onNext(Employee.Mission mission) {
            Log.i(TAG, mission.desc);
        }
    });

from操作符

接收一個(gè)集合作為輸入骏令,每次輸出一個(gè)元素給subscriber

注意:若需要執(zhí)行耗時(shí)操作蔬捷,即使在from中使用subscribeOn(Schedulers.io()),仍然是在主線程執(zhí)行,會造成界面卡頓甚至崩潰周拐。

from操作符代碼示例

// 格式:Observable.from(T[] params)
Observable.from(new Integer[]{1, 2, 3, 4, 5})
    .subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer number) {
            Log.i(TAG, "number:" + number);
        }
    });

just操作符

接收一個(gè)可變參數(shù)作為輸入铡俐,最終也是生成數(shù)組,調(diào)用from()妥粟,每次輸出一個(gè)元素給subscriber

just操作符代碼示例

// Observable.just(T... params)审丘,params的個(gè)數(shù)為1 ~ 10
Observable.just(1, 2, 3, 4, 5)
    .subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer number) {
            Log.i(TAG, "number:" + number);
        }
    });

filter操作符

條件過濾,用于去除不符合條件的事件

filter操作符代碼示例

Observable.from(new Integer[]{1, 2, 3, 4, 5})
    .filter(new Func1<Integer, Boolean>() {
        @Override
        public Boolean call(Integer number) {
            // 偶數(shù)返回true勾给,則表示剔除奇數(shù)滩报,留下偶數(shù)
            return number % 2 == 0;
        }
    })
    .subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer number) {
            Log.i(TAG, "number:" + number);
        }
    });

take操作符

最多保留的事件數(shù)

doOnNext操作符

在處理下一個(gè)事件前要做的事

take和doOnNext操作符代碼示例

Observable.from(new Integer[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12})
    .filter(new Func1<Integer, Boolean>() {
        @Override
        public Boolean call(Integer number) {
            // 偶數(shù)返回true,則表示剔除奇數(shù)
            return number % 2 == 0;
        }
    })
    // 最多保留三個(gè)播急,也就是最后剩三個(gè)偶數(shù)
    .take(3)
    .doOnNext(new Action1<Integer>() {
        @Override 
        public void call(Integer number) {
            // 在輸出偶數(shù)之前輸出它的hashCode
            Log.i(TAG, "hahcode = " + number.hashCode() + "");
        }
    })
    .subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer number) {
            Log.i(TAG, "number = " + number);
        }
    });

輸出結(jié)果:

hahcode = 2
number = 2
hahcode = 4
number = 4
hahcode = 6
number = 6

debounce操作符

過濾在指定的時(shí)間間隔之間的事件脓钾,接收一個(gè)事件后將在指定時(shí)間間隔后開始接收事件

debounce操作符代碼示例

Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> subscriber) {
        int i = 0;
        int[] times = new int[]{100, 1000};
        while (true) {
            i++;
            if (i >= 100) break;
            subscriber.onNext(i);
            try {
                /** 注意:
                當(dāng)i為奇數(shù)時(shí),休眠1000ms桩警,然后才發(fā)送i+1可训,這時(shí)i不會被過濾掉。
                當(dāng)i為偶數(shù)時(shí)捶枢,只休眠100ms握截,便發(fā)送i+1,這時(shí)i會被過濾掉
                */
                Thread.sleep(times[i % 2]);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        subscriber.onCompleted();
    }
})
// 間隔400ms以內(nèi)的事件將被丟棄
.debounce(400, TimeUnit.MILLISECONDS)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Integer>() {
    @Override
    public void onCompleted() {
        Log.i(TAG, "complete");
    }
    @Override
    public void onError(Throwable e) {
        Log.e(TAG, e.toString());
    }
    @Override
    public void onNext(Integer integer) {
        Log.i(TAG, "integer = " + integer);
    }
});

merge操作符

用于合并兩個(gè)Observable為一個(gè)Observable
格式:

Observable.merge(Observable1, Observable2)
.subscribe(subscriber);

concat操作符

順序的執(zhí)行多個(gè)Ovservable烂叔,個(gè)數(shù)為1—9(示例見first操作符)

compose操作符

類似flatMap川蒙,都是進(jìn)行變換,返回Observable對象长已,激活并發(fā)送事件

和flatmap區(qū)別

  1. compose是唯一一個(gè)能從數(shù)據(jù)流中得到原始Observable的操作符畜眨,需要對整個(gè)數(shù)據(jù)流產(chǎn)生作用的操作需使用compose來實(shí)現(xiàn)。如subscribeOn()和observeOn()术瓮,在flatMap中使用的話康聂,僅對在flatMap中創(chuàng)建的Observable起作用,不會對剩下的流產(chǎn)生影響.
  2. compose是對Observable整體的變換胞四。flatMap轉(zhuǎn)換Observable里的每一個(gè)事件恬汁,compose轉(zhuǎn)換的是整個(gè)Observable數(shù)據(jù)流。
  3. flatMap每發(fā)送一個(gè)事件都創(chuàng)建一個(gè)Observable辜伟,效率低氓侧。compose只在主干數(shù)據(jù)流上執(zhí)行操作。
  4. 建議使用compose代替flatMap导狡。

first操作符

只發(fā)送符合條件的第一個(gè)事件约巷。如:可以結(jié)合contact做網(wǎng)絡(luò)三級緩存

first操作符代碼示例

// 從緩存獲取
Observable<BookList> fromDisk = Observable.create(new Observable.OnSubscribe<BookList>() {
    @Override
    public void call(Subscriber<? super BookList> subscriber) {
        BookList list = getFromDisk();
        if (list != null) {
            subscriber.onNext(list);
        } else {
            subscriber.onCompleted();
        }
    }
});
// 從網(wǎng)絡(luò)獲取
Observable<BookList> fromNetWork = bookApi.getBookDetailDisscussionList();
Observable.concat(fromDisk, fromNetWork)
// 如果緩存不為null,則不再進(jìn)行網(wǎng)絡(luò)請求旱捧。
.first()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<BookList>() {
    @Override
    public void onCompleted() {
    }
    @Override
    public void onError(Throwable e) {
    }
    @Override
    public void onNext(BookList discussionList) {
    }
});

timer操作符

定時(shí)器独郎,可以做定時(shí)操作或延遲操作

timer操作符代碼示例

Observable.timer(2, TimeUnit.SECONDS)
    .subscribe(new Subscriber<Long>() {
        @Override
        public void onCompleted() {
        }
        @Override
        public void onError(Throwable e) {
        }
        @Override
        public void onNext(Long aLong) {
            Log.i(TAG, "Hello World!");
        }
    });

interval操作符

定時(shí)的周期性操作踩麦,與timer操作符的區(qū)別是可以重復(fù)操作

throttleFirst操作符

類似debounce操作符,時(shí)間間隔太短就會丟棄事件氓癌∥角可用于防抖操作,如防止雙擊

throttleFirst操作符代碼示例

RxView.clicks(button)
    .throttleFirst(1, TimeUnit.SECONDS)
    .subscribe(new Observer<Object>() {
        @Override
        public void onCompleted() {
        }
        @Override
        public void onError(Throwable e) {
        }
        @Override
        public void onNext(Object o) {
            Log.i(TAG, "do clicked!");
        }
    });

Single操作符

相當(dāng)于Observable的精簡版贪婉。觀察者回調(diào)的不是onNext/onError/onCompleted反粥,而是回調(diào)onSuccess/onError

subject操作符

既是事件的生產(chǎn)者,又是事件的消費(fèi)者

subject操作符代碼示例

Subject subject = PublishSubject.create();

subject.debounce(400, TimeUnit.MILLISECONDS)
    .subscribe(new Subscriber<Object>() {
        @Override
        public void onCompleted() {
        }
        @Override
        public void onError(Throwable e) {
        }
        @Override
        public void onNext(Object o) {
        // request
        }
    });

edittext.addTextChangedListener(new TextWatcher() {
    @Override
    public void beforeTextChanged(CharSequence s, int start, int count, int after) { }
    @Override
    public void onTextChanged(CharSequence s, int start, int before, int count) {
        subject.onNext(s.toString());
    }
    @Override
    public void afterTextChanged(Editable s) { }
});

參考資料->RxJava詳解-由淺入深

這是當(dāng)時(shí)的文章名稱疲迂,如今去看作者已經(jīng)進(jìn)行了更新RxJava 從入門到出軌才顿,也是騷的不行~

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市鬼譬,隨后出現(xiàn)的幾起案子娜膘,更是在濱河造成了極大的恐慌逊脯,老刑警劉巖优质,帶你破解...
    沈念sama閱讀 216,324評論 6 498
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異军洼,居然都是意外死亡巩螃,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,356評論 3 392
  • 文/潘曉璐 我一進(jìn)店門匕争,熙熙樓的掌柜王于貴愁眉苦臉地迎上來避乏,“玉大人,你說我怎么就攤上這事甘桑∨钠ぃ” “怎么了?”我有些...
    開封第一講書人閱讀 162,328評論 0 353
  • 文/不壞的土叔 我叫張陵跑杭,是天一觀的道長铆帽。 經(jīng)常有香客問我,道長德谅,這世上最難降的妖魔是什么爹橱? 我笑而不...
    開封第一講書人閱讀 58,147評論 1 292
  • 正文 為了忘掉前任,我火速辦了婚禮窄做,結(jié)果婚禮上愧驱,老公的妹妹穿的比我還像新娘。我一直安慰自己椭盏,他們只是感情好组砚,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,160評論 6 388
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著掏颊,像睡著了一般惫确。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,115評論 1 296
  • 那天改化,我揣著相機(jī)與錄音掩蛤,去河邊找鬼。 笑死陈肛,一個(gè)胖子當(dāng)著我的面吹牛揍鸟,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播句旱,決...
    沈念sama閱讀 40,025評論 3 417
  • 文/蒼蘭香墨 我猛地睜開眼阳藻,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了谈撒?” 一聲冷哼從身側(cè)響起腥泥,我...
    開封第一講書人閱讀 38,867評論 0 274
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎啃匿,沒想到半個(gè)月后蛔外,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,307評論 1 310
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡溯乒,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,528評論 2 332
  • 正文 我和宋清朗相戀三年夹厌,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片裆悄。...
    茶點(diǎn)故事閱讀 39,688評論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡矛纹,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出光稼,到底是詐尸還是另有隱情或南,我是刑警寧澤,帶...
    沈念sama閱讀 35,409評論 5 343
  • 正文 年R本政府宣布艾君,位于F島的核電站采够,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏腻贰。R本人自食惡果不足惜吁恍,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,001評論 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望播演。 院中可真熱鬧冀瓦,春花似錦、人聲如沸写烤。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,657評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽洲炊。三九已至感局,卻和暖如春尼啡,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背询微。 一陣腳步聲響...
    開封第一講書人閱讀 32,811評論 1 268
  • 我被黑心中介騙來泰國打工崖瞭, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人撑毛。 一個(gè)月前我還...
    沈念sama閱讀 47,685評論 2 368
  • 正文 我出身青樓书聚,卻偏偏與公主長得像,于是被迫代替她去往敵國和親藻雌。 傳聞我的和親對象是個(gè)殘疾皇子雌续,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,573評論 2 353

推薦閱讀更多精彩內(nèi)容

  • 轉(zhuǎn)一篇文章 原地址:http://gank.io/post/560e15be2dca930e00da1083 前言...
    jack_hong閱讀 913評論 0 2
  • 在正文開始之前的最后,放上GitHub鏈接和引入依賴的gradle代碼: Github: https://gith...
    蘇蘇說zz閱讀 677評論 0 2
  • 我從去年開始使用 RxJava 胯杭,到現(xiàn)在一年多了驯杜。今年加入了 Flipboard 后,看到 Flipboard 的...
    huqj閱讀 1,852評論 0 21
  • 文章轉(zhuǎn)自:http://gank.io/post/560e15be2dca930e00da1083作者:扔物線在正...
    xpengb閱讀 7,031評論 9 73
  • 他們喧嘩爭斗做个,他們懷疑失望鸽心,他們辯論而沒有結(jié)果。 我的孩子叁温,讓你的生命到他們當(dāng)中去再悼,如一線鎮(zhèn)定而純潔之光核畴,使他們愉...
    一花一世界1217閱讀 333評論 0 0