Android - RxJava2.0 操作符整理歸納

老婆保佑,代碼無(wú)BUG

目錄

  • 一:創(chuàng)建操作
  • 二:合并操作
  • 三:過(guò)濾操作
  • 四:切換線程
  • 五:條件/布爾操作
  • 六:聚合操作
  • 七:轉(zhuǎn)換操作
  • 八:變換操作
  • 九:錯(cuò)誤處理/重試機(jī)制
  • 十:連接操作
  • 十一:阻塞操作
  • 十二:工具集
  • 十三:Flowable (2.0出來(lái)的) 非操作符

前言

RxJava,這個(gè)詞爸邢,如果是android 開(kāi)發(fā)的小伙伴韵吨,估計(jì)早就聽(tīng)過(guò)不知道多少遍了茫蛹,如果你對(duì)RxJava 一點(diǎn)都不了解,推薦RxJava 入門(mén) 拋物線寫(xiě)的會(huì)讓你對(duì)RxJava罢浇,有個(gè)認(rèn)識(shí),本文只要是記錄操作符,畢竟太多了抓歼,記不住啊

一:創(chuàng)建操作

create

Observable.create(new Observable.OnSubscribe<String>() {

        @Override
        public void call(Subscriber<? super String> subscriber) {

            subscriber.onNext("item1");
            subscriber.onNext("item2");
            subscriber.onCompleted();
        }
    });

just

Observable observable = Observable.just("Hello", "Hi", "Aloha");
// 將會(huì)依次調(diào)用:
// onNext("Hello");
// onNext("Hi");
// onNext("Aloha");
// onCompleted();

from

String[] words = {"Hello", "Hi", "Aloha"};
Observable observable = Observable.from(words);
// 將會(huì)依次調(diào)用:
// onNext("Hello");
// onNext("Hi");
// onNext("Aloha");
// onCompleted();

empty

創(chuàng)建一個(gè)什么都不做直接通知完成的Observable

error

創(chuàng)建一個(gè)什么都不做直接通知錯(cuò)誤的Observable

never

創(chuàng)建一個(gè)什么都不做的Observable

Observable observable1=Observable.empty();//直接調(diào)用onCompleted。
Observable observable2=Observable.error(new RuntimeException());//直接調(diào)用onError示绊。這里可以自定義異常
Observable observable3=Observable.never();//啥都不做

timer

創(chuàng)建一個(gè)在給定的延時(shí)之后

Observable.timer(1000,TimeUnit.MILLISECONDS)
            .subscribe(new Action1<Long>() {
                @Override
                public void call(Long aLong) {
                    Log.d("JG",aLong.toString()); // 0
                }
            });

interval

創(chuàng)建一個(gè)按照給定的時(shí)間間隔發(fā)射從0開(kāi)始的整數(shù)序列的

Observable.interval(1, TimeUnit.SECONDS)
            .subscribe(new Action1<Long>() {
                @Override
                public void call(Long aLong) {
                     //每隔1秒發(fā)送數(shù)據(jù)項(xiàng)锭部,從0開(kāi)始計(jì)數(shù)
                     //0,1,2,3....
                }
            });

range:

創(chuàng)建一個(gè)發(fā)射指定范圍的整數(shù)序列的Observable<Integer>

Observable.range(2,5).subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer integer) {
            Log.d("JG",integer.toString());// 2,3,4,5,6 從2開(kāi)始發(fā)射5個(gè)數(shù)據(jù)
        }
    });

defer

只有當(dāng)訂閱者訂閱才創(chuàng)建Observable,為每個(gè)訂閱創(chuàng)建一個(gè)新的Observable面褐。內(nèi)部通過(guò)OnSubscribeDefer在訂閱時(shí)調(diào)用Func0創(chuàng)建Observable拌禾。

Observable.defer(new Func0<Observable<String>>() {
        @Override
        public Observable<String> call() {
            return Observable.just("hello");
        }
    }).subscribe(new Action1<String>() {
        @Override
        public void call(String s) {
            Log.d("JG",s);
        }
    });

二:合并操作

concat

按順序連接多個(gè)Observables。需要注意的是Observable.concat(a,b)等價(jià)于a.concatWith(b)展哭。

Observable<Integer> observable1=Observable.just(1,2,3,4);
    Observable<Integer>  observable2=Observable.just(4,5,6);

    Observable.concat(observable1,observable2)
            .subscribe(item->Log.d("JG",item.toString()));//1,2,3,4,4,5,6

startWith

在數(shù)據(jù)序列的開(kāi)頭增加一項(xiàng)數(shù)據(jù)湃窍。startWith的內(nèi)部也是調(diào)用了concat

 Observable.just(1,2,3,4,5)
            .startWith(6,7,8)
    .subscribe(item->Log.d("JG",item.toString()));//6,7,8,1,2,3,4,5

merge

將多個(gè)Observable合并為一個(gè)。不同于concat匪傍,merge不是按照添加順序連接您市,而是按照時(shí)間線來(lái)連接。其中mergeDelayError將異常延遲到其它沒(méi)有錯(cuò)誤的Observable發(fā)送完畢后才發(fā)射役衡。而merge則是一遇到異常將停止發(fā)射數(shù)據(jù)茵休,發(fā)送onError通知。

image

zip

使用一個(gè)函數(shù)組合多個(gè)Observable發(fā)射的數(shù)據(jù)集合手蝎,然后再發(fā)射這個(gè)結(jié)果榕莺。如果多個(gè)Observable發(fā)射的數(shù)據(jù)量不一樣,則以最少的Observable為標(biāo)準(zhǔn)進(jìn)行壓合棵介。內(nèi)部通過(guò)OperatorZip進(jìn)行壓合

image
Observable<Integer>  observable1=Observable.just(1,2,3,4);
Observable<Integer>  observable2=Observable.just(4,5,6);
    Observable.zip(observable1, observable2, new Func2<Integer, Integer, String>() {
        @Override
        public String call(Integer item1, Integer item2) {
            return item1+"and"+item2;
        }
    })
    .subscribe(item->Log.d("JG",item)); //1and4,2and5,3and6

combineLatest

當(dāng)兩個(gè)Observables中的任何一個(gè)發(fā)射了一個(gè)數(shù)據(jù)時(shí)钉鸯,通過(guò)一個(gè)指定的函數(shù)組合每個(gè)Observable發(fā)射的最新數(shù)據(jù)(一共兩個(gè)數(shù)據(jù)),然后發(fā)射這個(gè)函數(shù)的結(jié)果邮辽。類似于zip唠雕,但是,不同的是zip只有在每個(gè)Observable都發(fā)射了數(shù)據(jù)才工作吨述,而combineLatest任何一個(gè)發(fā)射了數(shù)據(jù)都可以工作岩睁,每次與另一個(gè)Observable最近的數(shù)據(jù)壓合。具體請(qǐng)看下面流程圖锐极。
zip工作流程

image

三:過(guò)濾操作

filter

過(guò)濾數(shù)據(jù)笙僚。內(nèi)部通過(guò)OnSubscribeFilter過(guò)濾數(shù)據(jù)。

Observable.just(3,4,5,6)
            .filter(new Func1<Integer, Boolean>() {
                @Override
                public Boolean call(Integer integer) {
                    return integer>4;
                }
            })
    .subscribe(item->Log.d("JG",item.toString())); //5,6

ofType

過(guò)濾指定類型的數(shù)據(jù)灵再,與filter類似肋层,

Observable.just(1,2,"3")
            .ofType(Integer.class)
            .subscribe(item -> Log.d("JG",item.toString()));

take

只發(fā)射開(kāi)始的N項(xiàng)數(shù)據(jù)或者一定時(shí)間內(nèi)的數(shù)據(jù)亿笤。內(nèi)部通過(guò)OperatorTake和OperatorTakeTimed過(guò)濾數(shù)據(jù)。

Observable.just(3,4,5,6)
            .take(3)//發(fā)射前三個(gè)數(shù)據(jù)項(xiàng)
            .take(100, TimeUnit.MILLISECONDS)//發(fā)射100ms內(nèi)的數(shù)據(jù)

takeLast

只發(fā)射最后的N項(xiàng)數(shù)據(jù)或者一定時(shí)間內(nèi)的數(shù)據(jù)栋猖。內(nèi)部通過(guò)OperatorTakeLast和OperatorTakeLastTimed過(guò)濾數(shù)據(jù)净薛。takeLastBuffer和takeLast類似,不同點(diǎn)在于takeLastBuffer會(huì)收集成List后發(fā)射蒲拉。

 Observable.just(3,4,5,6)
            .takeLast(3)
            .subscribe(integer -> Log.d("JG",integer.toString()));//4,5,6

first/firstOrDefault:

只發(fā)射第一項(xiàng)(或者滿足某個(gè)條件的第一項(xiàng))數(shù)據(jù)肃拜,可以指定默認(rèn)值。

Observable.just(3,4,5,6)
            .first()
            .subscribe(integer -> Log.d("JG",integer.toString()));//3
            
Observable.just(3,4,5,6)
            .first(new Func1<Integer, Boolean>() {
                @Override
                 public Boolean call(Integer integer) {
                     return integer>3;
                 }
             }) .subscribe(integer -> Log.d("JG",integer.toString()));//4

last/lastOrDefault:

只發(fā)射最后一項(xiàng)(或者滿足某個(gè)條件的最后一項(xiàng))數(shù)據(jù)雌团,可以指定默認(rèn)值燃领。

skip

跳過(guò)開(kāi)始的N項(xiàng)數(shù)據(jù)或者一定時(shí)間內(nèi)的數(shù)據(jù)。內(nèi)部通過(guò)OperatorSkip和OperatorSkipTimed實(shí)現(xiàn)過(guò)濾锦援。

Observable.just(3,4,5,6)
               .skip(1)
            .subscribe(integer -> Log.d("JG",integer.toString()));//4,5,6

skipLast

跳過(guò)最后的N項(xiàng)數(shù)據(jù)或者一定時(shí)間內(nèi)的數(shù)據(jù)猛蔽。內(nèi)部通過(guò)OperatorSkipLast和OperatorSkipLastTimed實(shí)現(xiàn)過(guò)濾。

        Observable.just(3,4,5,6)
                 .elementAt(2)
        .subscribe(item->Log.d("JG",item.toString())); //5

ignoreElements

丟棄所有數(shù)據(jù)灵寺,只發(fā)射錯(cuò)誤或正常終止的通知曼库。內(nèi)部通過(guò)OperatorIgnoreElements實(shí)現(xiàn)。

distinct

過(guò)濾重復(fù)數(shù)據(jù)略板,內(nèi)部通過(guò)OperatorDistinct實(shí)現(xiàn)毁枯。

 Observable.just(3,4,5,6,3,3,4,9)
       .distinct()
      .subscribe(item->Log.d("JG",item.toString())); //3,4,5,6,9

distinctUntilChanged

過(guò)濾掉連續(xù)重復(fù)的數(shù)據(jù)。內(nèi)部通過(guò)OperatorDistinctUntilChanged實(shí)現(xiàn)

 Observable.just(3,4,5,6,3,3,4,9)
       .distinctUntilChanged()
      .subscribe(item->Log.d("JG",item.toString())); //3,4,5,6,3,4,9

throttleFirst:

定期發(fā)射Observable發(fā)射的第一項(xiàng)數(shù)據(jù)叮称。內(nèi)部通過(guò)OperatorThrottleFirst實(shí)現(xiàn)种玛。

Observable.create(subscriber -> {
        subscriber.onNext(1);
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            throw Exceptions.propagate(e);
        }
        subscriber.onNext(2);
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            throw Exceptions.propagate(e);
        }

        subscriber.onNext(3);
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            throw Exceptions.propagate(e);
        }
        subscriber.onNext(4);
        subscriber.onNext(5);
        subscriber.onCompleted();

    }).throttleFirst(999, TimeUnit.MILLISECONDS)
            .subscribe(item-> Log.d("JG",item.toString())); //結(jié)果為1,3,4

throttleWithTimeout/debounce:

發(fā)射數(shù)據(jù)時(shí),如果兩次數(shù)據(jù)的發(fā)射間隔小于指定時(shí)間瓤檐,就會(huì)丟棄前一次的數(shù)據(jù),直到指定時(shí)間內(nèi)都沒(méi)有新數(shù)據(jù)發(fā)射時(shí)
才進(jìn)行發(fā)射

Observable.create(subscriber -> {
         subscriber.onNext(1);
         try {
             Thread.sleep(500);
         } catch (InterruptedException e) {
             throw Exceptions.propagate(e);
         }
         subscriber.onNext(2);
         try {
             Thread.sleep(500);
         } catch (InterruptedException e) {
             throw Exceptions.propagate(e);
         }

         subscriber.onNext(3);
         try {
             Thread.sleep(1000);
         } catch (InterruptedException e) {
             throw Exceptions.propagate(e);
         }
         subscriber.onNext(4);
         subscriber.onNext(5);
         subscriber.onCompleted();

     }).debounce(999, TimeUnit.MILLISECONDS)//或者為throttleWithTimeout(1000, TimeUnit.MILLISECONDS)
             .subscribe(item-> Log.d("JG",item.toString())); //結(jié)果為3,5

sample/throttleLast:

定期發(fā)射Observable最近的數(shù)據(jù)蒂誉。內(nèi)部通過(guò)OperatorSampleWithTime實(shí)現(xiàn)。

 Observable.create(subscriber -> {
        subscriber.onNext(1);
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            throw Exceptions.propagate(e);
        }
        subscriber.onNext(2);
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            throw Exceptions.propagate(e);
        }

        subscriber.onNext(3);
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            throw Exceptions.propagate(e);
        }
        subscriber.onNext(4);
        subscriber.onNext(5);
        subscriber.onCompleted();

    }).sample(999, TimeUnit.MILLISECONDS)//或者為throttleLast(1000, TimeUnit.MILLISECONDS)
            .subscribe(item-> Log.d("JG",item.toString())); //結(jié)果為2,3,5
            

timeout:

如果原始Observable過(guò)了指定的一段時(shí)長(zhǎng)沒(méi)有發(fā)射任何數(shù)據(jù)距帅,就發(fā)射一個(gè)異常或者使用備用的Observable括堤。

   Observable.create(( subscriber) -> {
        subscriber.onNext(1);
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            throw Exceptions.propagate(e);
        }
        subscriber.onNext(2);
       
        subscriber.onCompleted();

    }).timeout(999, TimeUnit.MILLISECONDS,Observable.just(99,100))//如果不指定備用Observable將會(huì)拋出異常
            .subscribe(item-> Log.d("JG",item.toString()),error->Log.d("JG","onError")); //結(jié)果為1,99,100  如果不指定備用Observable結(jié)果為1,onError
}

四:切換線程

.subscribeOn(Schedulers.newThread())                                              
.observeOn(AndroidSchedulers.mainThread())
  • subscribeOn
    • 發(fā)送事件的線程
  • observeOn
    • 接收事件的線程

線程選項(xiàng)

  • Schedulers.io()
    • 代表io操作的線程, 通常用于網(wǎng)絡(luò),讀寫(xiě)文件等io密集型的操作
  • Schedulers.computation()
    • 代表CPU計(jì)算密集型的操作, 例如需要大量計(jì)算的操作
  • Schedulers.newThread()
    • 代表一個(gè)常規(guī)的新線程
  • AndroidSchedulers.mainThread()
    • 代表Android的主線程

五:條件/布爾操作

all:

判斷所有的數(shù)據(jù)項(xiàng)是否滿足某個(gè)條件碌秸,內(nèi)部通過(guò)OperatorAll實(shí)現(xiàn)。

  Observable.just(2,3,4,5)
            .all(new Func1<Integer, Boolean>() {
                @Override
                public Boolean call(Integer integer) {
                    return integer>3;
                }
            })
    .subscribe(new Action1<Boolean>() {
        @Override
        public void call(Boolean aBoolean) {
            Log.d("JG",aBoolean.toString()); //false
        }
    });

exists:

判斷是否存在數(shù)據(jù)項(xiàng)滿足某個(gè)條件悄窃。內(nèi)部通過(guò)OperatorAny實(shí)現(xiàn)讥电。

   Observable.just(2,3,4,5)
            .exists(integer -> integer>3)
            .subscribe(aBoolean -> Log.d("JG",aBoolean.toString())); //true

contains:

判斷在發(fā)射的所有數(shù)據(jù)項(xiàng)中是否包含指定的數(shù)據(jù),內(nèi)部調(diào)用的其實(shí)是exists

  Observable.just(2,3,4,5)
            .contains(3)
            .subscribe(aBoolean -> Log.d("JG",aBoolean.toString())); //true
        

sequenceEqual:

用于判斷兩個(gè)Observable發(fā)射的數(shù)據(jù)是否相同(數(shù)據(jù)轧抗,發(fā)射順序恩敌,終止?fàn)顟B(tài))。

 Observable.sequenceEqual(Observable.just(2,3,4,5),Observable.just(2,3,4,5))
            .subscribe(aBoolean -> Log.d("JG",aBoolean.toString()));//true

isEmpty:

用于判斷Observable發(fā)射完畢時(shí)横媚,有沒(méi)有發(fā)射數(shù)據(jù)纠炮。有數(shù)據(jù)false月趟,如果只收到了onComplete通知?jiǎng)t為true。

  Observable.just(3,4,5,6)
               .isEmpty()
              .subscribe(item -> Log.d("JG",item.toString()));//false
              

amb:

給定多個(gè)Observable恢口,只讓第一個(gè)發(fā)射數(shù)據(jù)的Observable發(fā)射全部數(shù)據(jù)孝宗,其他Observable將會(huì)被忽略。

    Observable<Integer> observable1=Observable.create(new Observable.OnSubscribe<Integer>() {
        @Override
        public void call(Subscriber<? super Integer> subscriber) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                subscriber.onError(e);
            }
            subscriber.onNext(1);
            subscriber.onNext(2);
            subscriber.onCompleted();
        }
    }).subscribeOn(Schedulers.computation());

    Observable<Integer> observable2=Observable.create(subscriber -> {
        subscriber.onNext(3);
        subscriber.onNext(4);
        subscriber.onCompleted();
    });

    Observable.amb(observable1,observable2)
    .subscribe(integer -> Log.d("JG",integer.toString())); //3,4
    

switchIfEmpty:

如果原始Observable正常終止后仍然沒(méi)有發(fā)射任何數(shù)據(jù)耕肩,就使用備用的Observable因妇。

   Observable.empty()
            .switchIfEmpty(Observable.just(2,3,4))
    .subscribe(o -> Log.d("JG",o.toString())); //2,3,4
    

defaultIfEmpty:

如果原始Observable正常終止后仍然沒(méi)有發(fā)射任何數(shù)據(jù),就發(fā)射一個(gè)默認(rèn)值,內(nèi)部調(diào)用的switchIfEmpty猿诸。

takeUntil:

當(dāng)發(fā)射的數(shù)據(jù)滿足某個(gè)條件后(包含該數(shù)據(jù))婚被,或者第二個(gè)Observable發(fā)送完畢,終止第一個(gè)Observable發(fā)送數(shù)據(jù)梳虽。

 Observable.just(2,3,4,5)
            .takeUntil(new Func1<Integer, Boolean>() {
                @Override
                public Boolean call(Integer integer) {
                    return integer==4;
                }
            }).subscribe(integer -> Log.d("JG",integer.toString())); //2,3,4

takeWhile:

當(dāng)發(fā)射的數(shù)據(jù)滿足某個(gè)條件時(shí)(不包含該數(shù)據(jù))址芯,Observable終止發(fā)送數(shù)據(jù)。

  Observable.just(2,3,4,5)
            .takeWhile(new Func1<Integer, Boolean>() {
                @Override
                public Boolean call(Integer integer) {
                    return integer==4;
                }
            })
            .subscribe(integer -> Log.d("JG",integer.toString())); //2,3
            

skipUntil:

丟棄Observable發(fā)射的數(shù)據(jù)怖辆,直到第二個(gè)Observable發(fā)送數(shù)據(jù)是复。(丟棄條件數(shù)據(jù))

skipWhile:

丟棄Observable發(fā)射的數(shù)據(jù),直到一個(gè)指定的條件不成立(不丟棄條件數(shù)據(jù))

六:聚合操作

reduce:

對(duì)序列使用reduce()函數(shù)并發(fā)射最終的結(jié)果,內(nèi)部使用OnSubscribeReduce實(shí)現(xiàn)竖螃。

  Observable.just(2,3,4,5)
            .reduce(new Func2<Integer, Integer, Integer>() {
                @Override
                public Integer call(Integer sum, Integer item) {
                    return sum+item;
                }
            })
            .subscribe(integer -> Log.d("JG",integer.toString()));//14
            

collect:

使用collect收集數(shù)據(jù)到一個(gè)可變的數(shù)據(jù)結(jié)構(gòu)淑廊。

  Observable.just(3,4,5,6)
               .collect(new Func0<List<Integer>>() { //創(chuàng)建數(shù)據(jù)結(jié)構(gòu)

                   @Override
                   public List<Integer> call() {
                       return new ArrayList<Integer>();
                   }
               }, new Action2<List<Integer>, Integer>() { //收集器
                   @Override
                   public void call(List<Integer> integers, Integer integer) {
                       integers.add(integer);
                   }
               })
              .subscribe(new Action1<List<Integer>>() {
                  @Override
                  public void call(List<Integer> integers) {
                      
                  }
              });
              

count/countLong:

計(jì)算發(fā)射的數(shù)量,內(nèi)部調(diào)用的是reduce.

doOnNext()

允許我們?cè)诿看屋敵鲆粋€(gè)元素之前做一些額外的事情特咆。

Observable.just(list).flatMap(new Function<List<String>, ObservableSource<?>>() {
            @Override
            public ObservableSource<?> apply(List<String> strings) throws Exception {
                return Observable.fromIterable(strings);
            }
        }).take(5).doOnNext(new Consumer<Object>() {
            @Override
            public void accept(Object o) throws Exception {
                System.out.println("準(zhǔn)備工作");
            }
        }).subscribe(new Consumer<Object>() {
            @Override
            public void accept(Object s) throws Exception {
                System.out.println((String)s);
            }
        });


七:轉(zhuǎn)換操作

toList:

收集原始Observable發(fā)射的所有數(shù)據(jù)到一個(gè)列表季惩,然后返回這個(gè)列表.

    Observable.just(2,3,4,5)
            .toList()
            .subscribe(new Action1<List<Integer>>() {
                @Override
                public void call(List<Integer> integers) {
                    
                }
            });

toSortedList:

收集原始Observable發(fā)射的所有數(shù)據(jù)到一個(gè)有序列表,然后返回這個(gè)列表腻格。

   Observable.just(6,2,3,4,5)
            .toSortedList(new Func2<Integer, Integer, Integer>() {//自定義排序
                @Override
                public Integer call(Integer integer, Integer integer2) {
                    return integer-integer2; //>0 升序 画拾,<0 降序
                }
            })
            .subscribe(new Action1<List<Integer>>() {
                @Override
                public void call(List<Integer> integers) {
                    Log.d("JG",integers.toString()); // [2, 3, 4, 5, 6]
                }
            });

toMap:

將序列數(shù)據(jù)轉(zhuǎn)換為一個(gè)Map。我們可以根據(jù)數(shù)據(jù)項(xiàng)生成key和生成value菜职。

    Observable.just(6,2,3,4,5)
            .toMap(new Func1<Integer, String>() {
                @Override
                public String call(Integer integer) {
                    return "key:" + integer; //根據(jù)數(shù)據(jù)項(xiàng)生成map的key
                }
            }, new Func1<Integer, String>() {
                @Override
                public String call(Integer integer) {
                    return "value:"+integer; //根據(jù)數(shù)據(jù)項(xiàng)生成map的kvalue
                }
            }).subscribe(new Action1<Map<String, String>>() {
        @Override
        public void call(Map<String, String> stringStringMap) {
            Log.d("JG",stringStringMap.toString()); // {key:6=value:6, key:5=value:5, key:4=value:4, key:2=value:2, key:3=value:3}
        }
    });

toMultiMap

: 類似于toMap青抛,不同的地方在于map的value是一個(gè)集合。

八:變換操作

map

: 對(duì)Observable發(fā)射的每一項(xiàng)數(shù)據(jù)都應(yīng)用一個(gè)函數(shù)來(lái)變換酬核。

 Observable.just(6,2,3,4,5)
            .map(integer -> "item:"+integer)
            .subscribe(s -> Log.d("JG",s));//item:6,item:2....

cast:

在發(fā)射之前強(qiáng)制將Observable發(fā)射的所有數(shù)據(jù)轉(zhuǎn)換為指定類型

flatMap:

將Observable發(fā)射的數(shù)據(jù)變換為Observables集合蜜另,然后將這些Observable發(fā)射的數(shù)據(jù)平坦化的放進(jìn)一個(gè)單獨(dú)的Observable,內(nèi)部采用merge合并嫡意。

       Observable.just(2,3,5)
            .flatMap(new Func1<Integer, Observable<String>>() {
                @Override
                public Observable<String> call(Integer integer) {
                    return Observable.create(subscriber -> {
                        subscriber.onNext(integer*10+"");
                        subscriber.onNext(integer*100+"");
                        subscriber.onCompleted();
                    });
                }
            })
    .subscribe(o -> Log.d("JG",o)) //20,200,30,300,50,500

flatMapIterable:

和flatMap的作用一樣举瑰,只不過(guò)生成的是Iterable而不是Observable。

        Observable.just(2,3,5)
            .flatMapIterable(new Func1<Integer, Iterable<String>>() {
                @Override
                public Iterable<String> call(Integer integer) {
                    return Arrays.asList(integer*10+"",integer*100+"");
                }
            }).subscribe(new Action1<String>() {
              @Override
              public void call(String s) {
            
              }
    });

concatMap:

類似于flatMap蔬螟,由于內(nèi)部使用concat合并此迅,所以是按照順序連接發(fā)射。

switchMap:

和flatMap很像,將Observable發(fā)射的數(shù)據(jù)變換為Observables集合耸序,當(dāng)原始Observable發(fā)射一個(gè)新的數(shù)據(jù)(Observable)時(shí)忍些,它將取消訂閱前一個(gè)Observable。

  Observable.create(new Observable.OnSubscribe<Integer>() {

        @Override
        public void call(Subscriber<? super Integer> subscriber) {
            for(int i=1;i<4;i++){
                subscriber.onNext(i);
                Utils.sleep(500,subscriber);//線程休眠500ms
            }

            subscriber.onCompleted();
        }
    }).subscribeOn(Schedulers.newThread())
      .switchMap(new Func1<Integer, Observable<Integer>>() {
             @Override
           public Observable<Integer> call(Integer integer) {
                   //每當(dāng)接收到新的數(shù)據(jù)佑吝,之前的Observable將會(huì)被取消訂閱
                    return Observable.create(new Observable.OnSubscribe<Integer>() {
                        @Override
                        public void call(Subscriber<? super Integer> subscriber) {
                            subscriber.onNext(integer*10);
                            Utils.sleep(500,subscriber);
                            subscriber.onNext(integer*100);
                            subscriber.onCompleted();
                        }
                    }).subscribeOn(Schedulers.newThread());
                }
            })
            .subscribe(s -> Log.d("JG",s.toString()));//10,20,30,300

scan:

與reduce很像坐昙,對(duì)Observable發(fā)射的每一項(xiàng)數(shù)據(jù)應(yīng)用一個(gè)函數(shù),然后按順序依次發(fā)射每一個(gè)值芋忿。

  Observable.just(2,3,5)
            .scan(new Func2<Integer, Integer, Integer>() {
                @Override
                public Integer call(Integer sum, Integer item) {
                    return sum+item;
                }
            })
    .subscribe(integer -> Log.d("JG",integer.toString())) //2,5,10
Observable.just(1, 2, 3, 4, 5)
        .scan(-1,new Func2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer sum, Integer item) {
                return sum + item;
            }
        })

groupBy:

將Observable分拆為Observable集合炸客,將原始Observable發(fā)射的數(shù)據(jù)按Key分組,每一個(gè)Observable發(fā)射一組不同的數(shù)據(jù)戈钢。

   Observable.just(2,3,5,6)
            .groupBy(new Func1<Integer, String>() {
                @Override
                public String call(Integer integer) {//分組
                    return integer%2==0?"偶數(shù)":"奇數(shù)";
                }
            })
    .subscribe(new Action1<GroupedObservable<String, Integer>>() {
        @Override
        public void call(GroupedObservable<String, Integer> o) {

            o.subscribe(new Action1<Integer>() {
                @Override
                public void call(Integer integer) {
                    Log.d("JG",o.getKey()+":"+integer.toString()); //偶數(shù):2痹仙,奇數(shù):3,...
                }
            });
        }
    })
    

buffer:

它定期從Observable收集數(shù)據(jù)到一個(gè)集合殉了,然后把這些數(shù)據(jù)集合打包發(fā)射开仰,而不是一次發(fā)射一個(gè)

    Observable.just(2,3,5,6)
            .buffer(3)
            .subscribe(new Action1<List<Integer>>() {
                @Override
                public void call(List<Integer> integers) {
                    
                }
            })

window:

定期將來(lái)自O(shè)bservable的數(shù)據(jù)分拆成一些Observable窗口,然后發(fā)射這些窗口薪铜,而不是每次發(fā)射一項(xiàng)众弓。

   Observable.just(2,3,5,6)
            .window(3)
            .subscribe(new Action1<Observable<Integer>>() {
                @Override
                public void call(Observable<Integer> integerObservable) {
                    integerObservable.subscribe(new Action1<Integer>() {
                        @Override
                        public void call(Integer integer) {
                            
                        }
                    });
                }
            })

九:錯(cuò)誤處理/重試機(jī)制

onErrorResumeNext:

當(dāng)原始Observable在遇到錯(cuò)誤時(shí),使用備用Observable隔箍。谓娃。

  Observable.just(1,"2",3)
    .cast(Integer.class)
    .onErrorResumeNext(Observable.just(1,2,3))
    .subscribe(integer -> Log.d("JG",integer.toString())) //1,2,3
    ;

onExceptionResumeNext:

當(dāng)原始Observable在遇到異常時(shí),使用備用的Observable蜒滩。與onErrorResumeNext類似滨达,區(qū)別在于onErrorResumeNext可以處理所有的錯(cuò)誤,onExceptionResumeNext只能處理異常俯艰。

onErrorReturn:

當(dāng)原始Observable在遇到錯(cuò)誤時(shí)發(fā)射一個(gè)特定的數(shù)據(jù)捡遍。

 Observable.just(1,"2",3)
            .cast(Integer.class)
            .onErrorReturn(new Func1<Throwable, Integer>() {
                @Override
                public Integer call(Throwable throwable) {
                    return 4;
                }
            }).subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer integer) {
            Log.d("JG",integer.toString());1,4
        }
    });

retry:

當(dāng)原始Observable在遇到錯(cuò)誤時(shí)進(jìn)行重試。

    Observable.just(1,"2",3)
    .cast(Integer.class)
    .retry(3)
    .subscribe(integer -> Log.d("JG",integer.toString()),throwable -> Log.d("JG","onError"))
    ;//1,1,1,1,onError

retryWhen

: 當(dāng)原始Observable在遇到錯(cuò)誤竹握,將錯(cuò)誤傳遞給另一個(gè)Observable來(lái)決定是否要重新訂閱這個(gè)Observable,內(nèi)部調(diào)用的是retry画株。

  Observable.just(1,"2",3)
    .cast(Integer.class)
    .retryWhen(new Func1<Observable<? extends Throwable>, Observable<Long>>() {
        @Override
        public Observable<Long> call(Observable<? extends Throwable> observable) {
            return Observable.timer(1, TimeUnit.SECONDS);
        }
    })
    .subscribe(integer -> Log.d("JG",integer.toString()),throwable -> Log.d("JG","onError"));
    //1,1

十:連接操作

ConnectableObservable與普通的Observable差不多,但是可連接的Observable在被訂閱時(shí)并不開(kāi)始發(fā)射數(shù)據(jù)啦辐,只有在它的connect()被調(diào)用時(shí)才開(kāi)始污秆。用這種方法,你可以等所有的潛在訂閱者都訂閱了這個(gè)Observable之后才開(kāi)始發(fā)射數(shù)據(jù)昧甘。

ConnectableObservable.connect()

指示一個(gè)可連接的Observable開(kāi)始發(fā)射數(shù)據(jù).

Observable.publish()

將一個(gè)Observable轉(zhuǎn)換為一個(gè)可連接的Observable

Observable.replay()

確保所有的訂閱者看到相同的數(shù)據(jù)序列的ConnectableObservable,即使它們?cè)贠bservable開(kāi)始發(fā)射數(shù)據(jù)之后才訂閱战得。

ConnectableObservable.refCount()

讓一個(gè)可連接的Observable表現(xiàn)得像一個(gè)普通的Observable充边。

       ConnectableObservable<Integer> co= Observable.just(1,2,3)
                .publish();

        co .subscribe(integer -> Log.d("JG",integer.toString()) );
        co.connect();//此時(shí)開(kāi)始發(fā)射數(shù)據(jù)

十一:阻塞操作

BlockingObservable是一個(gè)阻塞的Observable。普通的Observable 轉(zhuǎn)換為 BlockingObservable,可以使用 Observable.toBlocking(?)方法或者BlockingObservable.from(?)方法浇冰。內(nèi)部通過(guò)CountDownLatch實(shí)現(xiàn)了阻塞操作贬媒。。

以下的操作符可以用于BlockingObservable肘习,如果是普通的Observable际乘,務(wù)必使用Observable.toBlocking()轉(zhuǎn)為阻塞Observable后使用,否則達(dá)不到預(yù)期的效果漂佩。

在Rxjava1中的BlockingObservable已經(jīng)在Rxjava2中去掉了脖含,在Rxjava2中已經(jīng)集成到了Observable中。

名稱 解析
blockingForEach() 對(duì)Observable發(fā)射的每一項(xiàng)數(shù)據(jù)調(diào)用一個(gè)方法投蝉,會(huì)阻塞直到Observable完成
blockingFirst() 阻塞直到Observable發(fā)射了一個(gè)數(shù)據(jù)养葵,然后返回第一項(xiàng)數(shù)據(jù)
blockingMostRecent() 返回一個(gè)總是返回Observable最近發(fā)射的數(shù)據(jù)的iterable
blockingLatest() 返回一個(gè)iterable,會(huì)阻塞直到或者除非Observable發(fā)射了一個(gè)iterable沒(méi)有返回的值瘩缆,然后返回這個(gè)值
blockingNext() 返回一個(gè)iterable,阻塞直到返回另外一個(gè)值
blockingLast() 阻塞直到Observable終止关拒,然后返回最后一項(xiàng)數(shù)據(jù)
blockingIterable() 將Observable轉(zhuǎn)換返回一個(gè)iterable.
blockingSingle() 如果Observable終止時(shí)只發(fā)射了一個(gè)值,返回那個(gè)值庸娱,否則拋出異常
blockingSubscribe() 在當(dāng)前線程訂閱着绊,和forEach類似

十二:工具集

materialize:

將Observable轉(zhuǎn)換成一個(gè)通知列表。

 Observable.just(1,2,3)
           .materialize()
           .subscribe(new Action1<Notification<Integer>>() {
               @Override
               public void call(Notification<Integer> notification) {
                   Log.d("JG",notification.getKind()+" "+notification.getValue());
                   //OnNext 1
                   //OnNext 2
                   //OnNext 3
                   //OnCompleted null
               }
           });

dematerialize:

與上面的作用相反熟尉,將通知逆轉(zhuǎn)回一個(gè)Observable归露。

timestamp:

給Observable發(fā)射的每個(gè)數(shù)據(jù)項(xiàng)添加一個(gè)時(shí)間戳。

  Observable.just(1,2,3)
           .timestamp()
           .subscribe(new Action1<Timestamped<Integer>>() {
               @Override
               public void call(Timestamped<Integer> timestamped) {
                   Log.d("JG",timestamped.getTimestampMillis()+" "+timestamped.getValue());
                   //1472627510548 1
                   //1472627510549 2
                   //1472627510549 3
               }
           });

timeInterval:

給Observable發(fā)射的兩個(gè)數(shù)據(jù)項(xiàng)間添加一個(gè)時(shí)間差臣樱,實(shí)現(xiàn)在OperatorTimeInterval中
timeInterval

serialize:

強(qiáng)制Observable按次序發(fā)射數(shù)據(jù)并且要求功能是完好的

cache:

緩存Observable發(fā)射的數(shù)據(jù)序列并發(fā)射相同的數(shù)據(jù)序列給后續(xù)的訂閱者

observeOn:

指定觀察者觀察Observable的調(diào)度器

subscribeOn:

指定Observable執(zhí)行任務(wù)的調(diào)度器

doOnEach:

注冊(cè)一個(gè)動(dòng)作靶擦,對(duì)Observable發(fā)射的每個(gè)數(shù)據(jù)項(xiàng)使用

   Observable.just(2,3)
             .doOnEach(new Action1<Notification<? super Integer>>() {
                 @Override
                 public void call(Notification<? super Integer> notification) {
                     Log.d("JG","--doOnEach--"+notification.toString());
                 }
             })
             .subscribe(integer -> Log.d("JG",integer.toString()));
 //結(jié)果為:            
  // --doOnEach--[rx.Notification@133c40b0 OnNext 2]
 // 2
  // --doOnEach--[rx.Notification@133c40b0 OnNext 3]
 // 3
// --doOnEach--[rx.Notification@df4db0e OnCompleted]

doOnCompleted':

注冊(cè)一個(gè)動(dòng)作,對(duì)正常完成的Observable使用

doOnError:

注冊(cè)一個(gè)動(dòng)作雇毫,對(duì)發(fā)生錯(cuò)誤的Observable使用

doOnTerminate

:注冊(cè)一個(gè)動(dòng)作玄捕,對(duì)完成的Observable使用,無(wú)論是否發(fā)生錯(cuò)誤

  Observable.just(2,3)
            .doOnTerminate(new Action0() {
                @Override
                public void call() {
                    Log.d("JG","--doOnTerminate--");
                }
            })
            .subscribe(integer -> Log.d("JG",integer.toString()));
// 2 , 3 , --doOnTerminate--

doOnSubscribe:

注冊(cè)一個(gè)動(dòng)作棚放,在觀察者訂閱時(shí)使用枚粘。內(nèi)部由OperatorDoOnSubscribe實(shí)現(xiàn)doOnSubscribe

doOnUnsubscribe

: 注冊(cè)一個(gè)動(dòng)作,在觀察者取消訂閱時(shí)使用飘蚯。內(nèi)部由OperatorDoOnUnsubscribe實(shí)現(xiàn)馍迄,在call中加入一個(gè)解綁動(dòng)作。
doOnUnsubscribe

finallyDo/doAfterTerminate:

注冊(cè)一個(gè)動(dòng)作局骤,在Observable完成時(shí)使用

Observable.just(2,3)
            .doAfterTerminate(new Action0() {
                @Override
                public void call() {
                    Log.d("JG","--doAfterTerminate--");
                }
            })
            .subscribe(integer -> Log.d("JG",integer.toString()));
//2,3,  --doAfterTerminate-- 

delay:

延時(shí)發(fā)射Observable的結(jié)果攀圈。即讓原始Observable在發(fā)射每項(xiàng)數(shù)據(jù)之前都暫停一段指定的時(shí)間段。效果是Observable發(fā)射的數(shù)據(jù)項(xiàng)在時(shí)間上向前整體平移了一個(gè)增量(除了onError峦甩,它會(huì)即時(shí)通知)赘来。

delaySubscription:

延時(shí)處理訂閱請(qǐng)求现喳。實(shí)現(xiàn)在OnSubscribeDelaySubscription中
delaySubscription

using

: 創(chuàng)建一個(gè)只在Observable生命周期存在的資源,當(dāng)Observable終止時(shí)這個(gè)資源會(huì)被自動(dòng)釋放犬辰。

  Observable.using(new Func0<File>() {//資源工廠
        @Override
        public File call() {

            File file = new File(getCacheDir(), "a.txt");
            if(!file.exists()){
                try {
                    Log.d("JG","--create--");
                    file.createNewFile();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            return file;
        }
    }, new Func1<File, Observable<String>>() { //Observable
        @Override
        public Observable<String> call(File file) {
            return Observable.just(file.exists() ? "exist" : "no exist");
        }
    }, new Action1<File>() {//釋放資源動(dòng)作
        @Override
        public void call(File file) {
            if(file!=null&&file.exists()){
                Log.d("JG","--delete--");
                file.delete();
            }
        }
    })
    .subscribe(s -> Log.d("JG",s))
    ;
 //--create--
 //exist
 //--delete--

single/singleOrDefault:

強(qiáng)制返回單個(gè)數(shù)據(jù)嗦篱,否則拋出異常或默認(rèn)數(shù)據(jù)幌缝。

十三:Flowable (2.0出來(lái)的) 非操作符

Flowable<Integer> upstream = Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                Log.d(TAG, "emit 1");
                emitter.onNext(1);
                Log.d(TAG, "emit 2");
                emitter.onNext(2);
                Log.d(TAG, "emit 3");
                emitter.onNext(3);
                Log.d(TAG, "emit complete");
                emitter.onComplete();
            }
        }, BackpressureStrategy.ERROR); //增加了一個(gè)參數(shù)

        Subscriber<Integer> downstream = new Subscriber<Integer>() {

            @Override
            public void onSubscribe(Subscription s) {
                Log.d(TAG, "onSubscribe");
                s.request(Long.MAX_VALUE);  //注意這句代碼
            }

            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "onNext: " + integer);

            }

            @Override
            public void onError(Throwable t) {
                 Log.w(TAG, "onError: ", t);
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        };

s.request(Long.MAX_VALUE); //注意這句代碼

處理事件 默認(rèn)處理 128 灸促,當(dāng)發(fā)送的事件 > 處理的事件 MissingBackpressureException異常

BackpressureStrategy

  • BackpressureStrategy.BUFFER
    • 沒(méi)有128 的限制
  • BackpressureStrategy.ERROR
    • 默認(rèn)128
  • BackpressureStrategy.DROP
    • 直接把存不下的事件丟棄
  • BackpressureStrategy.LATEST
    • 只保留最新的事件

寫(xiě)在最后

感謝作者maplejaw

RxJava操作符大全

推薦Rxjava系列

給初學(xué)者的RxJava2.0教程

官網(wǎng)學(xué)習(xí)筆記

ReactiveX/RxJava文檔中文版

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市涵卵,隨后出現(xiàn)的幾起案子浴栽,更是在濱河造成了極大的恐慌,老刑警劉巖缘厢,帶你破解...
    沈念sama閱讀 216,372評(píng)論 6 498
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件吃度,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡贴硫,警方通過(guò)查閱死者的電腦和手機(jī)椿每,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,368評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門(mén),熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)英遭,“玉大人间护,你說(shuō)我怎么就攤上這事⊥谥睿” “怎么了汁尺?”我有些...
    開(kāi)封第一講書(shū)人閱讀 162,415評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)多律。 經(jīng)常有香客問(wèn)我痴突,道長(zhǎng),這世上最難降的妖魔是什么狼荞? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,157評(píng)論 1 292
  • 正文 為了忘掉前任辽装,我火速辦了婚禮,結(jié)果婚禮上相味,老公的妹妹穿的比我還像新娘拾积。我一直安慰自己,他們只是感情好丰涉,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,171評(píng)論 6 388
  • 文/花漫 我一把揭開(kāi)白布拓巧。 她就那樣靜靜地躺著,像睡著了一般一死。 火紅的嫁衣襯著肌膚如雪肛度。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書(shū)人閱讀 51,125評(píng)論 1 297
  • 那天投慈,我揣著相機(jī)與錄音承耿,去河邊找鬼策吠。 笑死,一個(gè)胖子當(dāng)著我的面吹牛瘩绒,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播带族,決...
    沈念sama閱讀 40,028評(píng)論 3 417
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼锁荔,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了蝙砌?” 一聲冷哼從身側(cè)響起阳堕,我...
    開(kāi)封第一講書(shū)人閱讀 38,887評(píng)論 0 274
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎择克,沒(méi)想到半個(gè)月后恬总,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,310評(píng)論 1 310
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡肚邢,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,533評(píng)論 2 332
  • 正文 我和宋清朗相戀三年壹堰,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片骡湖。...
    茶點(diǎn)故事閱讀 39,690評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡贱纠,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出响蕴,到底是詐尸還是另有隱情谆焊,我是刑警寧澤,帶...
    沈念sama閱讀 35,411評(píng)論 5 343
  • 正文 年R本政府宣布浦夷,位于F島的核電站辖试,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏劈狐。R本人自食惡果不足惜罐孝,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,004評(píng)論 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望懈息。 院中可真熱鬧肾档,春花似錦、人聲如沸辫继。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,659評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)姑宽。三九已至遣耍,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間炮车,已是汗流浹背舵变。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 32,812評(píng)論 1 268
  • 我被黑心中介騙來(lái)泰國(guó)打工酣溃, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人纪隙。 一個(gè)月前我還...
    沈念sama閱讀 47,693評(píng)論 2 368
  • 正文 我出身青樓赊豌,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親绵咱。 傳聞我的和親對(duì)象是個(gè)殘疾皇子碘饼,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,577評(píng)論 2 353