RxJava 操作符

創(chuàng)建操作

以下操作符用于創(chuàng)建Observable。
create: 使用OnSubscribe從頭創(chuàng)建一個Observable祟身,這種方法比較簡單。需要注意的是袜硫,使用該方法創(chuàng)建時氯葬,建議在OnSubscribe#call方法中檢查訂閱狀態(tài),以便及時停止發(fā)射數(shù)據(jù)或者運(yùn)算父款。

      Observable.create(new Observable.OnSubscribe<String>() {

          @Override
          public void call(Subscriber<? super String> subscriber) {

              subscriber.onNext("item1");
              subscriber.onNext("item2");
              subscriber.onCompleted();
          }
      });

from: 將一個Iterable, 一個Future, 或者一個數(shù)組溢谤,內(nèi)部通過代理的方式轉(zhuǎn)換成一個Observable。
Future轉(zhuǎn)換為OnSubscribe是通過OnSubscribeToObservableFuture進(jìn)行的憨攒,
Iterable轉(zhuǎn)換通過OnSubscribeFromIterable進(jìn)行世杀。數(shù)組通過OnSubscribeFromArray轉(zhuǎn)換。

      //Iterable
      List<String> list=new ArrayList<>();
      ...
      Observable.from(list)
              .subscribe(new Action1<String>() {
          @Override
          public void call(String s) {

          }
      });

      //Future
       Future<String> futrue= Executors.newSingleThreadExecutor().submit(new Callable<String>() {

          @Override
          public String call() throws Exception {
              Thread.sleep(1000);
              return "maplejaw";
          }
      });

      Observable.from(futrue)
                .subscribe(new Action1<String>() {
          @Override
          public void call(String s) {

          }
      });
;

just: 將一個或多個對象轉(zhuǎn)換成發(fā)射這個或這些對象的一個Observable肝集。如果是單個對象瞻坝,內(nèi)部創(chuàng)建的是ScalarSynchronousObservable
對象。如果是多個對象杏瞻,則是調(diào)用了from方法創(chuàng)建所刀。

empty: 創(chuàng)建一個什么都不做直接通知完成的Observable
error: 創(chuàng)建一個什么都不做直接通知錯誤的Observable
never: 創(chuàng)建一個什么都不做的Observable

      Observable observable1=Observable.empty();//直接調(diào)用onCompleted衙荐。
      Observable observable2=Observable.error(new RuntimeException());//直接調(diào)用onError。這里可以自定義異常
      Observable observable3=Observable.never();//啥都不做

timer: 創(chuàng)建一個在給定的延時之后發(fā)射數(shù)據(jù)項(xiàng)為0的Observable<Long>,內(nèi)部通過OnSubscribeTimerOnce工作

 Observable.timer(1000,TimeUnit.MILLISECONDS)
              .subscribe(new Action1<Long>() {
                  @Override
                  public void call(Long aLong) {
                      Log.d("JG",aLong.toString()); // 0
                  }
              });

interval: 創(chuàng)建一個按照給定的時間間隔發(fā)射從0開始的整數(shù)序列的Observable<Long>浮创,內(nèi)部通過OnSubscribeTimerPeriodically工作忧吟。

Observable.interval(1, TimeUnit.SECONDS)
              .subscribe(new Action1<Long>() {
                  @Override
                  public void call(Long aLong) {
                       //每隔1秒發(fā)送數(shù)據(jù)項(xiàng),從0開始計(jì)數(shù)
                       //0,1,2,3....
                  }
              });

range: 創(chuàng)建一個發(fā)射指定范圍的整數(shù)序列的Observable<Integer>

 Observable.range(2,5).subscribe(new Action1<Integer>() {
          @Override
          public void call(Integer integer) {
              Log.d("JG",integer.toString());// 2,3,4,5,6 從2開始發(fā)射5個數(shù)據(jù)
          }
      });

defer: 只有當(dāng)訂閱者訂閱才創(chuàng)建Observable斩披,為每個訂閱創(chuàng)建一個新的Observable溜族。內(nèi)部通過OnSubscribeDefer在訂閱時調(diào)用Func0創(chuàng)建Observable。

Observable.defer(new Func0<Observable<String>>() {
          @Override
          public Observable<String> call() {
              return Observable.just("hello");
          }
      }).subscribe(new Action1<String>() {
          @Override
          public void call(String s) {
              Log.d("JG",s);
          }
      });

合并操作

以下操作符用于組合多個Observable垦沉。
注意煌抒,為了使結(jié)構(gòu)更加清晰以及縮小代碼量,之后的例子部分地方將會使用Lambda表達(dá)式書寫厕倍,如果你對Lambda表達(dá)式不太熟悉的話寡壮,可以閱讀JAVA8 Lambda表達(dá)式完全解析這篇文章。

concat: 按順序連接多個Observables讹弯。需要注意的是Observable.concat(a,b)等價于a.concatWith(b)况既。

  Observable<Integer> observable1=Observable.just(1,2,3,4);
  Observable<Integer>  observable2=Observable.just(4,5,6);

  Observable.concat(observable1,observable2)
              .subscribe(item->Log.d("JG",item.toString()));//1,2,3,4,4,5,6

startWith: 在數(shù)據(jù)序列的開頭增加一項(xiàng)數(shù)據(jù)。startWith
的內(nèi)部也是調(diào)用了concat

 Observable.just(1,2,3,4,5)
              .startWith(6,7,8)
      .subscribe(item->Log.d("JG",item.toString()));//6,7,8,1,2,3,4,5

merge: 將多個Observable合并為一個闸婴。不同于concat坏挠,merge不是按照添加順序連接芍躏,而是按照時間線來連接邪乍。其中mergeDelayError
將異常延遲到其它沒有錯誤的Observable發(fā)送完畢后才發(fā)射。而merge
則是一遇到異常將停止發(fā)射數(shù)據(jù)对竣,發(fā)送onError通知庇楞。

zip: 使用一個函數(shù)組合多個Observable發(fā)射的數(shù)據(jù)集合嘿歌,然后再發(fā)射這個結(jié)果顶岸。如果多個Observable發(fā)射的數(shù)據(jù)量不一樣昔逗,則以最少的Observable為標(biāo)準(zhǔn)進(jìn)行壓合遮怜。內(nèi)部通過OperatorZip進(jìn)行壓合厂置。

 Observable<Integer>  observable1=Observable.just(1,2,3,4);
  Observable<Integer>  observable2=Observable.just(4,5,6);
      Observable.zip(observable1, observable2, new Func2<Integer, Integer, String>() {
          @Override
          public String call(Integer item1, Integer item2) {
              return item1+"and"+item2;
          }
      })
      .subscribe(item->Log.d("JG",item)); //1and4,2and5,3and6

combineLatest: 橱乱。當(dāng)兩個Observables中的任何一個發(fā)射了一個數(shù)據(jù)時弟塞,通過一個指定的函數(shù)組合每個Observable發(fā)射的最新數(shù)據(jù)(一共兩個數(shù)據(jù))稿械,然后發(fā)射這個函數(shù)的結(jié)果膜廊。類似于zip乏沸,但是,不同的是zip只有在每個Observable都發(fā)射了數(shù)據(jù)才工作爪瓜,而combineLatest任何一個發(fā)射了數(shù)據(jù)都可以工作蹬跃,每次與另一個Observable最近的數(shù)據(jù)壓合。具體請看下面流程圖铆铆。zip工作流程

zip流程圖

combineLatest工作流程


combineLatest流程

過濾操作
filter: 過濾數(shù)據(jù)蝶缀。內(nèi)部通過OnSubscribeFilter
過濾數(shù)據(jù)丹喻。

    Observable.just(3,4,5,6)
              .filter(new Func1<Integer, Boolean>() {
                  @Override
                  public Boolean call(Integer integer) {
                      return integer>4;
                  }
              })
      .subscribe(item->Log.d("JG",item.toString())); //5,6

ofType: 過濾指定類型的數(shù)據(jù),與filter類似翁都,

Observable.just(1,2,"3")
              .ofType(Integer.class)
              .subscribe(item -> Log.d("JG",item.toString()));

take: 只發(fā)射開始的N項(xiàng)數(shù)據(jù)或者一定時間內(nèi)的數(shù)據(jù)碍论。內(nèi)部通過OperatorTake和OperatorTakeTimed過濾數(shù)據(jù)。

Observable.just(3,4,5,6)
              .take(3)//發(fā)射前三個數(shù)據(jù)項(xiàng)
              .take(100, TimeUnit.MILLISECONDS)//發(fā)射100ms內(nèi)的數(shù)據(jù)

takeLast: 只發(fā)射最后的N項(xiàng)數(shù)據(jù)或者一定時間內(nèi)的數(shù)據(jù)柄慰。內(nèi)部通過OperatorTakeLast和OperatorTakeLastTimed過濾數(shù)據(jù)骑冗。takeLastBuffer和takeLast類似,不同點(diǎn)在于takeLastBuffer會收集成List后發(fā)射先煎。

 Observable.just(3,4,5,6)
              .takeLast(3)
              .subscribe(integer -> Log.d("JG",integer.toString()));//4,5,6

takeFirst:提取滿足條件的第一項(xiàng)贼涩。內(nèi)部實(shí)現(xiàn)源碼如下:

public final Observable<T> takeFirst(Func1<? super T, Boolean> predicate) {
        return filter(predicate).take(1); //先過濾,后提取
  }

first/firstOrDefault:只發(fā)射第一項(xiàng)(或者滿足某個條件的第一項(xiàng))數(shù)據(jù)薯蝎,可以指定默認(rèn)值遥倦。

   Observable.just(3,4,5,6)
              .first()
              .subscribe(integer -> Log.d("JG",integer.toString()));//3

      Observable.just(3,4,5,6)
                 .first(new Func1<Integer, Boolean>() {
                     @Override
                     public Boolean call(Integer integer) {
                         return integer>3;
                     }
                 }) .subscribe(integer -> Log.d("JG",integer.toString()));//4

last/lastOrDefault:只發(fā)射最后一項(xiàng)(或者滿足某個條件的最后一項(xiàng))數(shù)據(jù),可以指定默認(rèn)值占锯。

skip:跳過開始的N項(xiàng)數(shù)據(jù)或者一定時間內(nèi)的數(shù)據(jù)袒哥。內(nèi)部通過OperatorSkip和OperatorSkipTimed實(shí)現(xiàn)過濾。

  Observable.just(3,4,5,6)
                 .skip(1)
              .subscribe(integer -> Log.d("JG",integer.toString()));//4,5,6

skipLast:跳過最后的N項(xiàng)數(shù)據(jù)或者一定時間內(nèi)的數(shù)據(jù)消略。內(nèi)部通過OperatorSkipLast和OperatorSkipLastTimed實(shí)現(xiàn)過濾堡称。

elementAt/elementAtOrDefault:發(fā)射某一項(xiàng)數(shù)據(jù),如果超過了范圍可以的指定默認(rèn)值艺演。內(nèi)部通過OperatorElementAt過濾却紧。

 Observable.just(3,4,5,6)
               .elementAt(2)
      .subscribe(item->Log.d("JG",item.toString())); //5

ignoreElements:丟棄所有數(shù)據(jù),只發(fā)射錯誤或正常終止的通知胎撤。內(nèi)部通過OperatorIgnoreElements實(shí)現(xiàn)晓殊。

distinct:過濾重復(fù)數(shù)據(jù),內(nèi)部通過OperatorDistinct
實(shí)現(xiàn)伤提。

   Observable.just(3,4,5,6,3,3,4,9)
         .distinct()
        .subscribe(item->Log.d("JG",item.toString())); //3,4,5,6,9

distinctUntilChanged:過濾掉連續(xù)重復(fù)的數(shù)據(jù)巫俺。內(nèi)部通過OperatorDistinctUntilChanged實(shí)現(xiàn)

Observable.just(3,4,5,6,3,3,4,9)
         .distinctUntilChanged()
        .subscribe(item->Log.d("JG",item.toString())); //3,4,5,6,3,4,9

throttleFirst:定期發(fā)射Observable發(fā)射的第一項(xiàng)數(shù)據(jù)。內(nèi)部通過OperatorThrottleFirst實(shí)現(xiàn)肿男。

Observable.create(subscriber -> {
          subscriber.onNext(1);
          try {
              Thread.sleep(500);
          } catch (InterruptedException e) {
              throw Exceptions.propagate(e);
          }
          subscriber.onNext(2);
          try {
              Thread.sleep(500);
          } catch (InterruptedException e) {
              throw Exceptions.propagate(e);
          }

          subscriber.onNext(3);
          try {
              Thread.sleep(1000);
          } catch (InterruptedException e) {
              throw Exceptions.propagate(e);
          }
          subscriber.onNext(4);
          subscriber.onNext(5);
          subscriber.onCompleted();

      }).throttleFirst(999, TimeUnit.MILLISECONDS)
              .subscribe(item-> Log.d("JG",item.toString())); //結(jié)果為1,3,4

throttleWithTimeout/debounce:發(fā)射數(shù)據(jù)時介汹,如果兩次數(shù)據(jù)的發(fā)射間隔小于指定時間,就會丟棄前一次的數(shù)據(jù),直到指定時間內(nèi)都沒有新數(shù)據(jù)發(fā)射時才進(jìn)行發(fā)射

Observable.create(subscriber -> {
          subscriber.onNext(1);
          try {
              Thread.sleep(500);
          } catch (InterruptedException e) {
              throw Exceptions.propagate(e);
          }
          subscriber.onNext(2);
          try {
              Thread.sleep(500);
          } catch (InterruptedException e) {
              throw Exceptions.propagate(e);
          }

          subscriber.onNext(3);
          try {
              Thread.sleep(1000);
          } catch (InterruptedException e) {
              throw Exceptions.propagate(e);
          }
          subscriber.onNext(4);
          subscriber.onNext(5);
          subscriber.onCompleted();

      }).debounce(999, TimeUnit.MILLISECONDS)//或者為throttleWithTimeout(1000, TimeUnit.MILLISECONDS)
              .subscribe(item-> Log.d("JG",item.toString())); //結(jié)果為3,5

sample/throttleLast:定期發(fā)射Observable最近的數(shù)據(jù)舶沛。內(nèi)部通過OperatorSampleWithTime實(shí)現(xiàn)嘹承。

   Observable.create(subscriber -> {
          subscriber.onNext(1);
          try {
              Thread.sleep(500);
          } catch (InterruptedException e) {
              throw Exceptions.propagate(e);
          }
          subscriber.onNext(2);
          try {
              Thread.sleep(500);
          } catch (InterruptedException e) {
              throw Exceptions.propagate(e);
          }

          subscriber.onNext(3);
          try {
              Thread.sleep(1000);
          } catch (InterruptedException e) {
              throw Exceptions.propagate(e);
          }
          subscriber.onNext(4);
          subscriber.onNext(5);
          subscriber.onCompleted();

      }).sample(999, TimeUnit.MILLISECONDS)//或者為throttleLast(1000, TimeUnit.MILLISECONDS)
              .subscribe(item-> Log.d("JG",item.toString())); //結(jié)果為2,3,5

timeout: 如果原始Observable過了指定的一段時長沒有發(fā)射任何數(shù)據(jù),就發(fā)射一個異彻谕酰或者使用備用的Observable赶撰。

Observable.create(( subscriber) -> {
          subscriber.onNext(1);
          try {
              Thread.sleep(1000);
          } catch (InterruptedException e) {
              throw Exceptions.propagate(e);
          }
          subscriber.onNext(2);

          subscriber.onCompleted();

      }).timeout(999, TimeUnit.MILLISECONDS,Observable.just(99,100))//如果不指定備用Observable將會拋出異常
              .subscribe(item-> Log.d("JG",item.toString()),error->Log.d("JG","onError")); //結(jié)果為1,99,100  如果不指定備用Observable結(jié)果為1,onError
  }

條件/布爾操作

all: 判斷所有的數(shù)據(jù)項(xiàng)是否滿足某個條件,內(nèi)部通過OperatorAll
實(shí)現(xiàn)。

    Observable.just(2,3,4,5)
              .all(new Func1<Integer, Boolean>() {
                  @Override
                  public Boolean call(Integer integer) {
                      return integer>3;
                  }
              })
      .subscribe(new Action1<Boolean>() {
          @Override
          public void call(Boolean aBoolean) {
              Log.d("JG",aBoolean.toString()); //false
          }
      })
      ;

exists: 判斷是否存在數(shù)據(jù)項(xiàng)滿足某個條件豪娜。內(nèi)部通過OperatorAny
實(shí)現(xiàn)餐胀。

 Observable.just(2,3,4,5)
              .exists(integer -> integer>3)
              .subscribe(aBoolean -> Log.d("JG",aBoolean.toString())); //true

contains: 判斷在發(fā)射的所有數(shù)據(jù)項(xiàng)中是否包含指定的數(shù)據(jù),內(nèi)部調(diào)用的其實(shí)是exists

    Observable.just(2,3,4,5)
              .contains(3)
              .subscribe(aBoolean -> Log.d("JG",aBoolean.toString())); //true

sequenceEqual: 用于判斷兩個Observable發(fā)射的數(shù)據(jù)是否相同(數(shù)據(jù)瘤载,發(fā)射順序否灾,終止?fàn)顟B(tài))。

 Observable.sequenceEqual(Observable.just(2,3,4,5),Observable.just(2,3,4,5))
              .subscribe(aBoolean -> Log.d("JG",aBoolean.toString()));//true

isEmpty: 用于判斷Observable發(fā)射完畢時鸣奔,有沒有發(fā)射數(shù)據(jù)墨技。有數(shù)據(jù)false,如果只收到了onComplete通知則為true挎狸。

 Observable.just(3,4,5,6)
                 .isEmpty()
                .subscribe(item -> Log.d("JG",item.toString()));//false

amb: 給定多個Observable扣汪,只讓第一個發(fā)射數(shù)據(jù)的Observable發(fā)射全部數(shù)據(jù),其他Observable將會被忽略锨匆。

Observable<Integer> observable1=Observable.create(new Observable.OnSubscribe<Integer>() {
          @Override
          public void call(Subscriber<? super Integer> subscriber) {
              try {
                  Thread.sleep(1000);
              } catch (InterruptedException e) {
                  subscriber.onError(e);
              }
              subscriber.onNext(1);
              subscriber.onNext(2);
              subscriber.onCompleted();
          }
      }).subscribeOn(Schedulers.computation());

      Observable<Integer> observable2=Observable.create(subscriber -> {
          subscriber.onNext(3);
          subscriber.onNext(4);
          subscriber.onCompleted();
      });

      Observable.amb(observable1,observable2)
      .subscribe(integer -> Log.d("JG",integer.toString())); //3,4

switchIfEmpty: 如果原始Observable正常終止后仍然沒有發(fā)射任何數(shù)據(jù)崭别,就使用備用的Observable。

Observable.empty()
             .switchIfEmpty(Observable.just(2,3,4))
     .subscribe(o -> Log.d("JG",o.toString())); //2,3,4

defaultIfEmpty: 如果原始Observable正常終止后仍然沒有發(fā)射任何數(shù)據(jù)恐锣,就發(fā)射一個默認(rèn)值,內(nèi)部調(diào)用的switchIfEmpty茅主。

takeUntil: 當(dāng)發(fā)射的數(shù)據(jù)滿足某個條件后(包含該數(shù)據(jù)),或者第二個Observable發(fā)送完畢土榴,終止第一個Observable發(fā)送數(shù)據(jù)诀姚。

Observable.just(2,3,4,5)
              .takeUntil(new Func1<Integer, Boolean>() {
                  @Override
                  public Boolean call(Integer integer) {
                      return integer==4;
                  }
              }).subscribe(integer -> Log.d("JG",integer.toString())); //2,3,4

takeWhile: 當(dāng)發(fā)射的數(shù)據(jù)滿足某個條件時(不包含該數(shù)據(jù)),Observable終止發(fā)送數(shù)據(jù)玷禽。

Observable.just(2,3,4,5)
              .takeWhile(new Func1<Integer, Boolean>() {
                  @Override
                  public Boolean call(Integer integer) {
                      return integer==4;
                  }
              })
              .subscribe(integer -> Log.d("JG",integer.toString())); //2,3

skipUntil: 丟棄Observable發(fā)射的數(shù)據(jù)赫段,直到第二個Observable發(fā)送數(shù)據(jù)。(丟棄條件數(shù)據(jù))

skipWhile: 丟棄Observable發(fā)射的數(shù)據(jù)论衍,直到一個指定的條件不成立(不丟棄條件數(shù)據(jù))

聚合操作

reduce: 對序列使用reduce()函數(shù)并發(fā)射最終的結(jié)果,內(nèi)部使用OnSubscribeReduce實(shí)現(xiàn)瑞佩。

 Observable.just(2,3,4,5)
              .reduce(new Func2<Integer, Integer, Integer>() {
                  @Override
                  public Integer call(Integer sum, Integer item) {
                      return sum+item;
                  }
              })
              .subscribe(integer -> Log.d("JG",integer.toString()));//14

collect: 使用collect
收集數(shù)據(jù)到一個可變的數(shù)據(jù)結(jié)構(gòu)聚磺。

Observable.just(3,4,5,6)
                 .collect(new Func0<List<Integer>>() { //創(chuàng)建數(shù)據(jù)結(jié)構(gòu)

                     @Override
                     public List<Integer> call() {
                         return new ArrayList<Integer>();
                     }
                 }, new Action2<List<Integer>, Integer>() { //收集器
                     @Override
                     public void call(List<Integer> integers, Integer integer) {
                         integers.add(integer);
                     }
                 })
                .subscribe(new Action1<List<Integer>>() {
                    @Override
                    public void call(List<Integer> integers) {

                    }
                });

count/countLong: 計(jì)算發(fā)射的數(shù)量坯台,內(nèi)部調(diào)用的是reduce
.

轉(zhuǎn)換操作

toList: 收集原始Observable發(fā)射的所有數(shù)據(jù)到一個列表,然后返回這個列表.

      Observable.just(2,3,4,5)
              .toList()
              .subscribe(new Action1<List<Integer>>() {
                  @Override
                  public void call(List<Integer> integers) {

                  }
              });

toSortedList: 收集原始Observable發(fā)射的所有數(shù)據(jù)到一個有序列表瘫寝,然后返回這個列表蜒蕾。

     Observable.just(6,2,3,4,5)
              .toSortedList(new Func2<Integer, Integer, Integer>() {//自定義排序
                  @Override
                  public Integer call(Integer integer, Integer integer2) {
                      return integer-integer2; //>0 升序 ,<0 降序
                  }
              })
              .subscribe(new Action1<List<Integer>>() {
                  @Override
                  public void call(List<Integer> integers) {
                      Log.d("JG",integers.toString()); // [2, 3, 4, 5, 6]
                  }
              });

toMap: 將序列數(shù)據(jù)轉(zhuǎn)換為一個Map焕阿。我們可以根據(jù)數(shù)據(jù)項(xiàng)生成key和生成value咪啡。

 Observable.just(6,2,3,4,5)
          .toMap(new Func1<Integer, String>() {
              @Override
              public String call(Integer integer) {
                  return "key:" + integer; //根據(jù)數(shù)據(jù)項(xiàng)生成map的key
              }
          }, new Func1<Integer, String>() {
              @Override
              public String call(Integer integer) {
                  return "value:"+integer; //根據(jù)數(shù)據(jù)項(xiàng)生成map的kvalue
              }
          }).subscribe(new Action1<Map<String, String>>() {
      @Override
      public void call(Map<String, String> stringStringMap) {
          Log.d("JG",stringStringMap.toString()); // {key:6=value:6, key:5=value:5, key:4=value:4, key:2=value:2, key:3=value:3}
      }
  });

toMultiMap: 類似于toMap,不同的地方在于map的value是一個集合暮屡。變換操作

map: 對Observable發(fā)射的每一項(xiàng)數(shù)據(jù)都應(yīng)用一個函數(shù)來變換撤摸。

 Observable.just(6,2,3,4,5)
              .map(integer -> "item:"+integer)
              .subscribe(s -> Log.d("JG",s));//item:6,item:2....

cast: 在發(fā)射之前強(qiáng)制將Observable發(fā)射的所有數(shù)據(jù)轉(zhuǎn)換為指定類型
flatMap: 將Observable發(fā)射的數(shù)據(jù)變換為Observables集合,然后將這些Observable發(fā)射的數(shù)據(jù)平坦化的放進(jìn)一個單獨(dú)的Observable,內(nèi)部采用merge合并准夷。

Observable.just(2,3,5)
              .flatMap(new Func1<Integer, Observable<String>>() {
                  @Override
                  public Observable<String> call(Integer integer) {
                      return Observable.create(subscriber -> {
                          subscriber.onNext(integer*10+"");
                          subscriber.onNext(integer*100+"");
                          subscriber.onCompleted();
                      });
                  }
              })
      .subscribe(o -> Log.d("JG",o)) //20,200,30,300,50,500

flatMapIterable: 和flatMap的作用一樣钥飞,只不過生成的是Iterable而不是Observable。

 Observable.just(2,3,5)
              .flatMapIterable(new Func1<Integer, Iterable<String>>() {
                  @Override
                  public Iterable<String> call(Integer integer) {
                      return Arrays.asList(integer*10+"",integer*100+"");
                  }
              }).subscribe(new Action1<String>() {
                @Override
                public void call(String s) {

                }
      });

concatMap: 類似于flatMap衫嵌,由于內(nèi)部使用concat合并读宙,所以是按照順序連接發(fā)射。
switchMap: 和flatMap很像楔绞,將Observable發(fā)射的數(shù)據(jù)變換為Observables集合结闸,當(dāng)原始Observable發(fā)射一個新的數(shù)據(jù)(Observable)時,它將取消訂閱前一個Observable酒朵。

 Observable.create(new Observable.OnSubscribe<Integer>() {

          @Override
          public void call(Subscriber<? super Integer> subscriber) {
              for(int i=1;i<4;i++){
                  subscriber.onNext(i);
                  Utils.sleep(500,subscriber);//線程休眠500ms
              }

              subscriber.onCompleted();
          }
      }).subscribeOn(Schedulers.newThread())
        .switchMap(new Func1<Integer, Observable<Integer>>() {
               @Override
             public Observable<Integer> call(Integer integer) {
                     //每當(dāng)接收到新的數(shù)據(jù)桦锄,之前的Observable將會被取消訂閱
                      return Observable.create(new Observable.OnSubscribe<Integer>() {
                          @Override
                          public void call(Subscriber<? super Integer> subscriber) {
                              subscriber.onNext(integer*10);
                              Utils.sleep(500,subscriber);
                              subscriber.onNext(integer*100);
                              subscriber.onCompleted();
                          }
                      }).subscribeOn(Schedulers.newThread());
                  }
              })
              .subscribe(s -> Log.d("JG",s.toString()));//10,20,30,300

scan: 與reduce很像,對Observable發(fā)射的每一項(xiàng)數(shù)據(jù)應(yīng)用一個函數(shù)蔫耽,然后按順序依次發(fā)射每一個值察纯。

Observable.just(2,3,5)
              .scan(new Func2<Integer, Integer, Integer>() {
                  @Override
                  public Integer call(Integer sum, Integer item) {
                      return sum+item;
                  }
              })
      .subscribe(integer -> Log.d("JG",integer.toString())) //2,5,10

groupBy: 將Observable分拆為Observable集合,將原始Observable發(fā)射的數(shù)據(jù)按Key分組针肥,每一個Observable發(fā)射一組不同的數(shù)據(jù)饼记。

Observable.just(2,3,5,6)
              .groupBy(new Func1<Integer, String>() {
                  @Override
                  public String call(Integer integer) {//分組
                      return integer%2==0?"偶數(shù)":"奇數(shù)";
                  }
              })
      .subscribe(new Action1<GroupedObservable<String, Integer>>() {
          @Override
          public void call(GroupedObservable<String, Integer> o) {

              o.subscribe(new Action1<Integer>() {
                  @Override
                  public void call(Integer integer) {
                      Log.d("JG",o.getKey()+":"+integer.toString()); //偶數(shù):2,奇數(shù):3慰枕,...
                  }
              });
          }
      })

buffer: 它定期從Observable收集數(shù)據(jù)到一個集合具则,然后把這些數(shù)據(jù)集合打包發(fā)射,而不是一次發(fā)射一個

 Observable.just(2,3,5,6)
              .buffer(3)
              .subscribe(new Action1<List<Integer>>() {
                  @Override
                  public void call(List<Integer> integers) {

                  }
              })

window: 定期將來自O(shè)bservable的數(shù)據(jù)分拆成一些Observable窗口具帮,然后發(fā)射這些窗口博肋,而不是每次發(fā)射一項(xiàng)。

 Observable.just(2,3,5,6)
              .window(3)
              .subscribe(new Action1<Observable<Integer>>() {
                  @Override
                  public void call(Observable<Integer> integerObservable) {
                      integerObservable.subscribe(new Action1<Integer>() {
                          @Override
                          public void call(Integer integer) {

                          }
                      });
                  }
              })

錯誤處理/重試機(jī)制
onErrorResumeNext: 當(dāng)原始Observable在遇到錯誤時蜂厅,使用備用Observable匪凡。。

Observable.just(1,"2",3)
      .cast(Integer.class)
      .onErrorResumeNext(Observable.just(1,2,3))
      .subscribe(integer -> Log.d("JG",integer.toString())) //1,2,3
      ;

onExceptionResumeNext: 當(dāng)原始Observable在遇到異常時掘猿,使用備用的Observable病游。與onErrorResumeNext
類似,區(qū)別在于onErrorResumeNext
可以處理所有的錯誤稠通,onExceptionResumeNext只能處理異常衬衬。
onErrorReturn: 當(dāng)原始Observable在遇到錯誤時發(fā)射一個特定的數(shù)據(jù)。

 Observable.just(1,"2",3)
              .cast(Integer.class)
              .onErrorReturn(new Func1<Throwable, Integer>() {
                  @Override
                  public Integer call(Throwable throwable) {
                      return 4;
                  }
              }).subscribe(new Action1<Integer>() {
          @Override
          public void call(Integer integer) {
              Log.d("JG",integer.toString());1,4
          }
      });

retry: 當(dāng)原始Observable在遇到錯誤時進(jìn)行重試改橘。

 Observable.just(1,"2",3)
      .cast(Integer.class)
      .retry(3)
      .subscribe(integer -> Log.d("JG",integer.toString()),throwable -> Log.d("JG","onError"))
      ;//1,1,1,1,onError

retryWhen: 當(dāng)原始Observable在遇到錯誤滋尉,將錯誤傳遞給另一個Observable來決定是否要重新訂閱這個Observable,內(nèi)部調(diào)用的是retry

Observable.just(1,"2",3)
      .cast(Integer.class)
      .retryWhen(new Func1<Observable<? extends Throwable>, Observable<Long>>() {
          @Override
          public Observable<Long> call(Observable<? extends Throwable> observable) {
              return Observable.timer(1, TimeUnit.SECONDS);
          }
      })
      .subscribe(integer -> Log.d("JG",integer.toString()),throwable -> Log.d("JG","onError"));
      //1,1

連接操作

ConnectableObservable與普通的Observable差不多飞主,但是可連接的Observable在被訂閱時并不開始發(fā)射數(shù)據(jù)狮惜,只有在它的connect()被調(diào)用時才開始高诺。用這種方法,你可以等所有的潛在訂閱者都訂閱了這個Observable之后才開始發(fā)射數(shù)據(jù)碾篡。ConnectableObservable.connect()指示一個可連接的Observable開始發(fā)射數(shù)據(jù).Observable.publish()將一個Observable轉(zhuǎn)換為一個可連接的ObservableObservable.replay()確保所有的訂閱者看到相同的數(shù)據(jù)序列的ConnectableObservable懒叛,即使它們在Observable開始發(fā)射數(shù)據(jù)之后才訂閱
ConnectableObservable.refCount()
讓一個可連接的Observable表現(xiàn)得像一個普通的Observable耽梅。

   ConnectableObservable<Integer> co= Observable.just(1,2,3)
                .publish();

        co .subscribe(integer -> Log.d("JG",integer.toString()) );
        co.connect();//此時開始發(fā)射數(shù)據(jù)

工具集

materialize: 將Observable轉(zhuǎn)換成一個通知列表薛窥。

   Observable.just(1,2,3)
             .materialize()
             .subscribe(new Action1<Notification<Integer>>() {
                 @Override
                 public void call(Notification<Integer> notification) {
                     Log.d("JG",notification.getKind()+" "+notification.getValue());
                     //OnNext 1
                     //OnNext 2
                     //OnNext 3
                     //OnCompleted null
                 }
             });

dematerialize: 與上面的作用相反,將通知逆轉(zhuǎn)回一個Observable。
timestamp: 給Observable發(fā)射的每個數(shù)據(jù)項(xiàng)添加一個時間戳滩租。

 Observable.just(1,2,3)
             .timestamp()
             .subscribe(new Action1<Timestamped<Integer>>() {
                 @Override
                 public void call(Timestamped<Integer> timestamped) {
                     Log.d("JG",timestamped.getTimestampMillis()+" "+timestamped.getValue());
                     //1472627510548 1
                     //1472627510549 2
                     //1472627510549 3
                 }
             });

timeInterval:給Observable發(fā)射的兩個數(shù)據(jù)項(xiàng)間添加一個時間差,實(shí)現(xiàn)在OperatorTimeInterval

timeInterval

serialize: 強(qiáng)制Observable按次序發(fā)射數(shù)據(jù)并且要求功能是完好的
cache: 緩存Observable發(fā)射的數(shù)據(jù)序列并發(fā)射相同的數(shù)據(jù)序列給后續(xù)的訂閱者
observeOn: 指定觀察者觀察Observable的調(diào)度器
subscribeOn: 指定Observable執(zhí)行任務(wù)的調(diào)度器
doOnEach: 注冊一個動作樟遣,對Observable發(fā)射的每個數(shù)據(jù)項(xiàng)使用

 Observable.just(2,3)
              .doOnEach(new Action1<Notification<? super Integer>>() {
                  @Override
                  public void call(Notification<? super Integer> notification) {
                      Log.d("JG","--doOnEach--"+notification.toString());
                  }
              })
              .subscribe(integer -> Log.d("JG",integer.toString()));
  //結(jié)果為:            
   // --doOnEach--[rx.Notification@133c40b0 OnNext 2]
  // 2
   // --doOnEach--[rx.Notification@133c40b0 OnNext 3]
  // 3
 // --doOnEach--[rx.Notification@df4db0e OnCompleted]

doOnCompleted: 注冊一個動作脱篙,對正常完成的Observable使用

doOnError: 注冊一個動作,對發(fā)生錯誤的Observable使用
doOnTerminate:注冊一個動作蔗彤,對完成的Observable使用,無論是否發(fā)生錯誤

    Observable.just(2,3)
              .doOnTerminate(new Action0() {
                  @Override
                  public void call() {
                      Log.d("JG","--doOnTerminate--");
                  }
              })
              .subscribe(integer -> Log.d("JG",integer.toString()));
  // 2 , 3 , --doOnTerminate--

doOnSubscribe: 注冊一個動作农猬,在觀察者訂閱時使用。內(nèi)部由OperatorDoOnSubscribe實(shí)現(xiàn)

doOnUnsubscribe: 注冊一個動作递瑰,在觀察者取消訂閱時使用慎颗。內(nèi)部由OperatorDoOnUnsubscribe實(shí)現(xiàn)辆憔,在call中加入一個解綁動作。

doOnUnsubscribe

finallyDo/doAfterTerminate: 注冊一個動作,在Observable完成時使用

 Observable.just(2,3)
              .doAfterTerminate(new Action0() {
                  @Override
                  public void call() {
                      Log.d("JG","--doAfterTerminate--");
                  }
              })
              .subscribe(integer -> Log.d("JG",integer.toString()));
  //2,3,  --doAfterTerminate--

delay: 延時發(fā)射Observable的結(jié)果豺总。即讓原始Observable在發(fā)射每項(xiàng)數(shù)據(jù)之前都暫停一段指定的時間段。效果是Observable發(fā)射的數(shù)據(jù)項(xiàng)在時間上向前整體平移了一個增量(除了onError,它會即時通知)。

delaySubscription: 延時處理訂閱請求。實(shí)現(xiàn)在OnSubscribeDelaySubscription

delaySubscription

using: 創(chuàng)建一個只在Observable生命周期存在的資源,當(dāng)Observable終止時這個資源會被自動釋放剔氏。

Observable.using(new Func0<File>() {//資源工廠
          @Override
          public File call() {

              File file = new File(getCacheDir(), "a.txt");
              if(!file.exists()){
                  try {
                      Log.d("JG","--create--");
                      file.createNewFile();
                  } catch (IOException e) {
                      e.printStackTrace();
                  }
              }
              return file;
          }
      }, new Func1<File, Observable<String>>() { //Observable
          @Override
          public Observable<String> call(File file) {
              return Observable.just(file.exists() ? "exist" : "no exist");
          }
      }, new Action1<File>() {//釋放資源動作
          @Override
          public void call(File file) {
              if(file!=null&&file.exists()){
                  Log.d("JG","--delete--");
                  file.delete();
              }
          }
      })
      .subscribe(s -> Log.d("JG",s))
      ;
   //--create--
   //exist
   //--delete--

single/singleOrDefault: 強(qiáng)制返回單個數(shù)據(jù),否則拋出異秤却В或默認(rèn)數(shù)據(jù)莽龟。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末酌媒,一起剝皮案震驚了整個濱河市标捺,隨后出現(xiàn)的幾起案子冤今,更是在濱河造成了極大的恐慌戴而,老刑警劉巖,帶你破解...
    沈念sama閱讀 219,427評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件翩蘸,死亡現(xiàn)場離奇詭異所意,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)催首,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,551評論 3 395
  • 文/潘曉璐 我一進(jìn)店門扶踊,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人郎任,你說我怎么就攤上這事秧耗。” “怎么了舶治?”我有些...
    開封第一講書人閱讀 165,747評論 0 356
  • 文/不壞的土叔 我叫張陵分井,是天一觀的道長。 經(jīng)常有香客問我霉猛,道長尺锚,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,939評論 1 295
  • 正文 為了忘掉前任惜浅,我火速辦了婚禮瘫辩,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘。我一直安慰自己伐厌,他們只是感情好承绸,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,955評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著挣轨,像睡著了一般军熏。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上刃唐,一...
    開封第一講書人閱讀 51,737評論 1 305
  • 那天羞迷,我揣著相機(jī)與錄音界轩,去河邊找鬼画饥。 笑死,一個胖子當(dāng)著我的面吹牛浊猾,可吹牛的內(nèi)容都是我干的抖甘。 我是一名探鬼主播,決...
    沈念sama閱讀 40,448評論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼葫慎,長吁一口氣:“原來是場噩夢啊……” “哼衔彻!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起偷办,我...
    開封第一講書人閱讀 39,352評論 0 276
  • 序言:老撾萬榮一對情侶失蹤艰额,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后椒涯,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體柄沮,經(jīng)...
    沈念sama閱讀 45,834評論 1 317
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,992評論 3 338
  • 正文 我和宋清朗相戀三年废岂,在試婚紗的時候發(fā)現(xiàn)自己被綠了祖搓。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,133評論 1 351
  • 序言:一個原本活蹦亂跳的男人離奇死亡湖苞,死狀恐怖拯欧,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情财骨,我是刑警寧澤镐作,帶...
    沈念sama閱讀 35,815評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站隆箩,受9級特大地震影響该贾,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜摘仅,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,477評論 3 331
  • 文/蒙蒙 一靶庙、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧,春花似錦六荒、人聲如沸护姆。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,022評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽卵皂。三九已至,卻和暖如春砚亭,著一層夾襖步出監(jiān)牢的瞬間灯变,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,147評論 1 272
  • 我被黑心中介騙來泰國打工捅膘, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留添祸,地道東北人。 一個月前我還...
    沈念sama閱讀 48,398評論 3 373
  • 正文 我出身青樓寻仗,卻偏偏與公主長得像刃泌,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子署尤,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,077評論 2 355

推薦閱讀更多精彩內(nèi)容

  • 作者: maplejaw本篇只解析標(biāo)準(zhǔn)包中的操作符耙替。對于擴(kuò)展包,由于使用率較低曹体,如有需求俗扇,請讀者自行查閱文檔。 創(chuàng)...
    maplejaw_閱讀 45,674評論 8 93
  • 注:只包含標(biāo)準(zhǔn)包中的操作符箕别,用于個人學(xué)習(xí)及備忘參考博客:http://blog.csdn.net/maplejaw...
    小白要超神閱讀 2,195評論 2 8
  • 創(chuàng)建操作 用于創(chuàng)建Observable的操作符Create通過調(diào)用觀察者的方法從頭創(chuàng)建一個ObservableEm...
    rkua閱讀 1,829評論 0 1
  • ReactiveX操作符 1. RxJava操作符介紹 創(chuàng)建操作 Create 從頭創(chuàng)建一個Observable ...
    夢sora閱讀 852評論 0 4
  • 這個頁面展示了創(chuàng)建Observable的各種方法镶殷。 **just(?)** — 將一個或多個對象轉(zhuǎn)換成發(fā)射這個或這...
    大于于閱讀 1,015評論 0 7