Rxandroid基礎(chǔ)

一葫掉、Rxjava環(huán)境配置

使用android studio,gradle腳本中加入依賴:

compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
compile 'io.reactivex.rxjava2:rxjava:2.1.5'

二诡渴、示例

Rxjava基本示例

Observable.just("one", "two", "three", "four", "five")
        .subscribeOn(Schedulers.newThread())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(/* an Observer */);
Looper backgroundLooper = // ...
Observable.just("one", "two", "three", "four", "five")
        .observeOn(AndroidSchedulers.from(backgroundLooper))
        .subscribe(/* an Observer */)

Rxjava進(jìn)階使用示例

rxjava實現(xiàn)緩存機制

@Override
    public Observable<List<Test>> getDatas() {

        if (mCachedTests != null && !mCachedTests.isEmpty() && !mCacheIsDirty) {
            Log.i(TAG,"mCachedTests dataget");
            return Observable.fromIterable(mCachedTests.values()).toList().toObservable();
        } else if (mCachedTests == null) {
            mCachedTests = new LinkedHashMap<>();
        }

        Observable<List<Test>> remoteTests = loadTestRemoteDataSource();
        if (mCacheIsDirty) {
            return remoteTests;
        } else {
            // Query the local storage if available. If not, query the network.
            Observable<List<Test>>localTests = loadTestsLocalDataSource();

            return Observable.concat(localTests, remoteTests)
                    .firstElement().toObservable();
        }
    }

    private Observable<List<Test>> loadTestRemoteDataSource(){

        return mTestRemoteDataSource.getDatas()
                .filter(testList -> !testList.isEmpty())
                .flatMap(new Function<List<Test>, ObservableSource<List<Test>>>() {
                    @Override
                    public ObservableSource<List<Test>> apply(@io.reactivex.annotations.NonNull List<Test> testList) throws Exception {
                        mCachedTests.clear();
                        return Observable.fromIterable(testList).doOnNext(test -> mCachedTests.put(test.getId(),test)).toList().toObservable();
                    }
                })
                .doOnNext(mCachedTests->{
                    mTestLocalDataSource.refreshData();
                    mTestLocalDataSource.saveDatas(mCachedTests);
                })
                .doOnComplete(()-> mCacheIsDirty=false);
    }

    private Observable<List<Test>> loadTestsLocalDataSource(){

        return mTestLocalDataSource.getDatas()
                .firstElement()//problem : https://github.com/square/sqlbrite/issues/123
                .toObservable()
                .filter(testList -> !testList.isEmpty())
                .flatMap(new Function<List<Test>, ObservableSource<List<Test>>>() {
                    @Override
                    public ObservableSource<List<Test>> apply(@io.reactivex.annotations.NonNull List<Test> testList) throws Exception {
                        mCachedTests.clear();
                        return Observable.fromIterable(testList).doOnNext(test -> mCachedTests.put(test.getId(),test)).toList().toObservable();
                    }
                })
                .doOnNext(testList->Log.i(TAG,"mTestLocalDataSource doOnNext"))
                .doOnComplete(()->Log.i(TAG,"mTestLocalDataSource doOnComplete"));
    }

rxjava封裝網(wǎng)絡(luò)請求

public class RxjavaHttp {

    /**
     * 設(shè)置context
     * @param context 暫時不能使用ApplicationContext,可以為空
     * @return 當(dāng)前operator
     */
    public static Operator with(@Nullable Context context){
        Operator operator = new Operator();
        return operator.with(context);
    }


    public static class Operator{


        private Operator operator;
        private boolean willchecknet = true;
        private boolean willcheckresult = true;

        public Operator(){
            this.operator = this;
        }

        private WeakReference<Context> contextWeakReference;

        /**
         * 設(shè)置context
         * @param context 暫時不能使用ApplicationContext
         * @return 當(dāng)前operator
         */
        public Operator with(Context context){
            if (context == null) {
                throw new IllegalArgumentException("You cannot start a load on a null Context");
            } else if (context instanceof Application) {
                throw new IllegalArgumentException("You cannot use a context instanceof Application");
            } else {
                contextWeakReference = new WeakReference<Context>(context);
            }
            return operator;
        }

        /**
         * 請求前是否檢查網(wǎng)絡(luò)奏司,默認(rèn)值true瓷叫,沒有網(wǎng)絡(luò)時將拋出NetworkErrorException(message)
         * @param willchecknet 是否檢查網(wǎng)絡(luò)
         * @return 當(dāng)前operator
         */
        public Operator willchecknetfirst(boolean willchecknet){
            this.willchecknet = willchecknet;
            return operator;
        }

        /**
         * 請求完成是否檢查結(jié)果,默認(rèn)值true楞陷,接口沒有正常返回結(jié)果時將拋出NetworkErrorException(message)
         * @param willcheckresult 是否檢查結(jié)果
         * @return 當(dāng)前operator
         */
        public Operator willcheckresult(boolean willcheckresult){
            this.willcheckresult = willcheckresult;
            return operator;
        }


        /*****************************************************************************************************
         *
         * 以下方法為請求的調(diào)用怔鳖,返回數(shù)據(jù)源用作后續(xù)操作,沒有對線程進(jìn)行調(diào)度固蛾,用作http和其他數(shù)據(jù)源結(jié)合使用時或需要獲取原始數(shù)據(jù)源時结执。
         *
         *****************************************************************************************************/


        /**
         * 獲得一個http請求Observable
         * @param bean 請求參數(shù)
         * @param httpRequest 請求體
         * @param <T> 請求體泛型
         * @return http請求數(shù)據(jù)源
         */
        public <T> Observable<ResponesBean>excute(T bean, Function<T,ObservableSource<ResponesBean>> httpRequest){

            return Observable.just(bean).filter(new CheckNetPredicate<T>(contextWeakReference.get(),willchecknet))
                    .flatMap(httpRequest).filter(new HttpResultPredicate(willcheckresult));
        }

        /**
         * 獲得一個get請求的Observable
         * @param url 請求地址
         * @return get請求數(shù)據(jù)源
         */
        public Observable<ResponesBean> get(String url){
            return excute(url, new Function<String, ObservableSource<ResponesBean>>() {
                @Override
                public ObservableSource<ResponesBean> apply(String url) throws Exception {
                    return Observable.just(HttpFactory.get(url)).subscribeOn(Schedulers.io());
                }
            });
        }


        /**
         * 獲得一個post請求的Observable
         * @param rxjavaHttpBean 請求體
         * @param <T> RxjavaHttpBean的子類
         * @return post請求數(shù)據(jù)源
         */
        public <T extends RxjavaHttpBean> Observable<ResponesBean> post(T rxjavaHttpBean){
            return excute(rxjavaHttpBean, new Function<T, ObservableSource<ResponesBean>>() {
                @Override
                public ObservableSource<ResponesBean> apply(T rxjavaHttpBean) throws Exception {
                    return Observable.just(HttpFactory.post(rxjavaHttpBean.getUrl(),rxjavaHttpBean.getParams(),rxjavaHttpBean.getHeaders())).subscribeOn(Schedulers.io());
                }
            });
        }


        /**
         * 獲得一個upload請求的Observable
         * @param rxjavaHttpBean 請求體
         * @param <T> RxjavaHttpBean的子類
         * @return upload請求數(shù)據(jù)源
         */
        public <T extends RxjavaHttpBean> Observable<ResponesBean> upload(T rxjavaHttpBean){
            return excute(rxjavaHttpBean, new Function<T, ObservableSource<ResponesBean>>() {
                @Override
                public ObservableSource<ResponesBean> apply(T rxjavaHttpBean) throws Exception {
                    return Observable.just(HttpFactory.formUpload(rxjavaHttpBean.getUrl(),rxjavaHttpBean.getFiles(),rxjavaHttpBean.getParams())).subscribeOn(Schedulers.io());
                }
            });
        }


        /*****************************************************************************************************
         *
         * 以下方法為簡單的調(diào)用,直接傳入Consumer艾凯,io線程請求献幔,ui線程返回結(jié)果,并根據(jù)錯誤進(jìn)行toast彈出趾诗。
         *
         *****************************************************************************************************/

        /**
         * 直接執(zhí)行http請求蜡感,io線程請求,ui線程返回結(jié)果恃泪,并根據(jù)錯誤進(jìn)行toast彈出
         * @param bean 請求參數(shù)
         * @param httpRequest 請求體
         * @param consumer 訂閱者
         * @param <T> 請求體泛型
         * @return 返回Disposable用作生命周期控制
         */
        public <T> Disposable simpleExcute(T bean, Function<T,ObservableSource<ResponesBean>> httpRequest, Consumer<ResponesBean>consumer){

            return Observable.just(bean).filter(new CheckNetPredicate<T>(contextWeakReference.get(),willchecknet))
                    .flatMap(httpRequest).filter(new HttpResultPredicate(willcheckresult))
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .onErrorReturn(new HttpToastErrorReturn(contextWeakReference.get()))
                    .subscribe(consumer);
        }

        /**
         * 直接執(zhí)行g(shù)et請求铸敏,io線程請求,ui線程返回結(jié)果悟泵,并根據(jù)錯誤進(jìn)行toast彈出
         * @param url 請求地址
         * @param consumer 訂閱者
         * @return 返回Disposable用作生命周期控制
         */
        public Disposable simpleGet(String url, Consumer<ResponesBean>consumer){
            return simpleExcute(url, new Function<String, ObservableSource<ResponesBean>>() {
                @Override
                public ObservableSource<ResponesBean> apply(String url) throws Exception {
                    return Observable.just(HttpFactory.get(url)).subscribeOn(Schedulers.io());
                }
            },consumer);
        }


        /**
         * 直接執(zhí)行post請求杈笔,io線程請求,ui線程返回結(jié)果糕非,并根據(jù)錯誤進(jìn)行toast彈出
         * @param rxjavaHttpBean 請求內(nèi)容
         * @param consumer 訂閱者
         * @return 返回Disposable用作生命周期控制
         */
        public <T extends RxjavaHttpBean>  Disposable simplePost(T rxjavaHttpBean, Consumer<ResponesBean>consumer){
            return simpleExcute(rxjavaHttpBean, new Function<T, ObservableSource<ResponesBean>>() {
                @Override
                public ObservableSource<ResponesBean> apply(T rxjavaHttpBean) throws Exception {
                    return Observable.just(HttpFactory.post(rxjavaHttpBean.getUrl(),rxjavaHttpBean.getParams(),rxjavaHttpBean.getHeaders())).subscribeOn(Schedulers.io());
                }
            },consumer);
        }

        /**
         * 直接執(zhí)行upload請求蒙具,io線程請求,ui線程返回結(jié)果朽肥,并根據(jù)錯誤進(jìn)行toast彈出
         * @param rxjavaHttpBean 請求內(nèi)容
         * @param consumer 訂閱者
         * @return 返回Disposable用作生命周期控制
         */
        public <T extends RxjavaHttpBean>  Disposable simpleUpload(T rxjavaHttpBean, Consumer<ResponesBean>consumer){
            return simpleExcute(rxjavaHttpBean, new Function<T, ObservableSource<ResponesBean>>() {
                @Override
                public ObservableSource<ResponesBean> apply(T rxjavaHttpBean) throws Exception {
                    return Observable.just(HttpFactory.formUpload(rxjavaHttpBean.getUrl(),rxjavaHttpBean.getParams(),rxjavaHttpBean.getFiles())).subscribeOn(Schedulers.io());
                }
            },consumer);
        }

    }
}

三禁筏、Rxjava操作符詳解

Creating Observables

Operators that originate new Observables.
Create — create an Observable from scratch by calling observer methods programmatically
Defer — do not create the Observable until the observer subscribes, and create a fresh Observable for each observer
Empty Never Throw — create Observables that have very precise and limited behavior
From — convert some other object or data structure into an Observable
Interval — create an Observable that emits a sequence of integers spaced by a particular time interval
Just — convert an object or a set of objects into an Observable that emits that or those objects
Range — create an Observable that emits a range of sequential integers
Repeat — create an Observable that emits a particular item or sequence of items repeatedly
Start — create an Observable that emits the return value of a function
Timer — create an Observable that emits a single item after a given delay

Transforming Observables

Operators that transform items that are emitted by an Observable.
Buffer — periodically gather items from an Observable into bundles and emit these bundles rather than emitting the items one at a time
FlatMap — transform the items emitted by an Observable into Observables, then flatten the emissions from those into a single Observable
GroupBy — divide an Observable into a set of Observables that each emit a different group of items from the original Observable, organized by key
Map — transform the items emitted by an Observable by applying a function to each item
Scan — apply a function to each item emitted by an Observable, sequentially, and emit each successive value
Window — periodically subdivide items from an Observable into Observable windows and emit these windows rather than emitting the items one at a time

Filtering Observables

Operators that selectively emit items from a source Observable.
Debounce — only emit an item from an Observable if a particular timespan has passed without it emitting another item
Distinct — suppress duplicate items emitted by an Observable
ElementAt — emit only item n emitted by an Observable
Filter — emit only those items from an Observable that pass a predicate test
First — emit only the first item, or the first item that meets a condition, from an Observable
IgnoreElements — do not emit any items from an Observable but mirror its termination notification
Last — emit only the last item emitted by an Observable
Sample — emit the most recent item emitted by an Observable within periodic time intervals
Skip — suppress the first n items emitted by an Observable
SkipLast — suppress the last n items emitted by an Observable
Take — emit only the first n items emitted by an Observable
TakeLast — emit only the last n items emitted by an Observable

Combining Observables

Operators that work with multiple source Observables to create a single Observable
And Then When — combine sets of items emitted by two or more Observables by means of Pattern
and Plan
intermediaries
CombineLatest — when an item is emitted by either of two Observables, combine the latest item emitted by each Observable via a specified function and emit items based on the results of this function
Join — combine items emitted by two Observables whenever an item from one Observable is emitted during a time window defined according to an item emitted by the other Observable
Merge — combine multiple Observables into one by merging their emissions
StartWith — emit a specified sequence of items before beginning to emit the items from the source Observable
Switch — convert an Observable that emits Observables into a single Observable that emits the items emitted by the most-recently-emitted of those Observables
Zip — combine the emissions of multiple Observables together via a specified function and emit single items for each combination based on the results of this function

Error Handling Operators

Operators that help to recover from error notifications from an Observable
Catch — recover from an onError
notification by continuing the sequence without error
Retry — if a source Observable sends an onError
notification, resubscribe to it in the hopes that it will complete without error

Observable Utility Operators

A toolbox of useful Operators for working with Observables
Delay — shift the emissions from an Observable forward in time by a particular amount
Do — register an action to take upon a variety of Observable lifecycle events
Materialize Dematerialize — represent both the items emitted and the notifications sent as emitted items, or reverse this process
ObserveOn — specify the scheduler on which an observer will observe this Observable
Serialize — force an Observable to make serialized calls and to be well-behaved
Subscribe — operate upon the emissions and notifications from an Observable
SubscribeOn — specify the scheduler an Observable should use when it is subscribed to
TimeInterval — convert an Observable that emits items into one that emits indications of the amount of time elapsed between those emissions
Timeout — mirror the source Observable, but issue an error notification if a particular period of time elapses without any emitted items
Timestamp — attach a timestamp to each item emitted by an Observable
Using — create a disposable resource that has the same lifespan as the Observable

Conditional and Boolean Operators

Operators that evaluate one or more Observables or items emitted by Observables
All — determine whether all items emitted by an Observable meet some criteria
Amb — given two or more source Observables, emit all of the items from only the first of these Observables to emit an item
Contains — determine whether an Observable emits a particular item or not
DefaultIfEmpty — emit items from the source Observable, or a default item if the source Observable emits nothing
SequenceEqual — determine whether two Observables emit the same sequence of items
SkipUntil — discard items emitted by an Observable until a second Observable emits an item
SkipWhile — discard items emitted by an Observable until a specified condition becomes false
TakeUntil — discard items emitted by an Observable after a second Observable emits an item or terminates
TakeWhile — discard items emitted by an Observable after a specified condition becomes false

Mathematical and Aggregate Operators

Operators that operate on the entire sequence of items emitted by an Observable
Average — calculates the average of numbers emitted by an Observable and emits this average
Concat — emit the emissions from two or more Observables without interleaving them
Count — count the number of items emitted by the source Observable and emit only this value
Max — determine, and emit, the maximum-valued item emitted by an Observable
Min — determine, and emit, the minimum-valued item emitted by an Observable
Reduce — apply a function to each item emitted by an Observable, sequentially, and emit the final value
Sum — calculate the sum of numbers emitted by an Observable and emit this sum

Backpressure Operators

backpressure operators — strategies for coping with Observables that produce items more rapidly than their observers consume them

Connectable Observable Operators

Specialty Observables that have more precisely-controlled subscription dynamics
Connect — instruct a connectable Observable to begin emitting items to its subscribers
Publish — convert an ordinary Observable into a connectable Observable
RefCount — make a Connectable Observable behave like an ordinary Observable
Replay — ensure that all observers see the same sequence of emitted items, even if they subscribe after the Observable has begun emitting items

Operators to Convert Observables

To — convert an Observable into another object or data structure

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市衡招,隨后出現(xiàn)的幾起案子篱昔,更是在濱河造成了極大的恐慌,老刑警劉巖始腾,帶你破解...
    沈念sama閱讀 212,816評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件州刽,死亡現(xiàn)場離奇詭異,居然都是意外死亡浪箭,警方通過查閱死者的電腦和手機穗椅,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,729評論 3 385
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來奶栖,“玉大人匹表,你說我怎么就攤上這事门坷。” “怎么了袍镀?”我有些...
    開封第一講書人閱讀 158,300評論 0 348
  • 文/不壞的土叔 我叫張陵默蚌,是天一觀的道長。 經(jīng)常有香客問我苇羡,道長绸吸,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,780評論 1 285
  • 正文 為了忘掉前任宣虾,我火速辦了婚禮,結(jié)果婚禮上温数,老公的妹妹穿的比我還像新娘绣硝。我一直安慰自己,他們只是感情好撑刺,可當(dāng)我...
    茶點故事閱讀 65,890評論 6 385
  • 文/花漫 我一把揭開白布鹉胖。 她就那樣靜靜地躺著,像睡著了一般够傍。 火紅的嫁衣襯著肌膚如雪甫菠。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 50,084評論 1 291
  • 那天冕屯,我揣著相機與錄音寂诱,去河邊找鬼。 笑死安聘,一個胖子當(dāng)著我的面吹牛痰洒,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播浴韭,決...
    沈念sama閱讀 39,151評論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼丘喻,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了念颈?” 一聲冷哼從身側(cè)響起泉粉,我...
    開封第一講書人閱讀 37,912評論 0 268
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎榴芳,沒想到半個月后嗡靡,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,355評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡窟感,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,666評論 2 327
  • 正文 我和宋清朗相戀三年叽躯,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片肌括。...
    茶點故事閱讀 38,809評論 1 341
  • 序言:一個原本活蹦亂跳的男人離奇死亡点骑,死狀恐怖酣难,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情黑滴,我是刑警寧澤憨募,帶...
    沈念sama閱讀 34,504評論 4 334
  • 正文 年R本政府宣布,位于F島的核電站袁辈,受9級特大地震影響菜谣,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜晚缩,卻給世界環(huán)境...
    茶點故事閱讀 40,150評論 3 317
  • 文/蒙蒙 一尾膊、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧荞彼,春花似錦冈敛、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,882評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至寞缝,卻和暖如春癌压,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背荆陆。 一陣腳步聲響...
    開封第一講書人閱讀 32,121評論 1 267
  • 我被黑心中介騙來泰國打工滩届, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人被啼。 一個月前我還...
    沈念sama閱讀 46,628評論 2 362
  • 正文 我出身青樓丐吓,卻偏偏與公主長得像,于是被迫代替她去往敵國和親趟据。 傳聞我的和親對象是個殘疾皇子券犁,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 43,724評論 2 351