RxJava操作符(一)

注:只包含標準包中的操作符个绍,用于個人學習及備忘
參考博客:http://blog.csdn.net/maplejaw_/article/details/52396175

本篇將介紹rxjava中的創(chuàng)建操作巴柿、合并操作篮洁、過濾操作、條件/布爾操作蜗侈、聚合操作、轉換操作以及變換操作该面,只針對用法不涉及原理信卡,對RxJava不熟悉的可參考:http://gank.io/post/560e15be2dca930e00da1083

創(chuàng)建操作

  • create:使用OnSubscrib直接創(chuàng)建一個Observable

     Observable.create(new Observable.OnSubscribe<String>() {
          @Override
          public void call(Subscriber<? super String> subscriber) {
              subscriber.onNext("item1");
              subscriber.onNext("item2");
              subscriber.onCompleted();
          }
      });
    
  • from:將數(shù)組或集合拆分成具體對象后,轉換成發(fā)送這些對象的Observable

      String[] arr = {"item1", "item2", "item3"};
      Observable.from(arr)
              .subscribe(new Action1<String>() {
                  @Override
                  public void call(String s) {
                      Log.d("debug", s);      //調用多次猾瘸,分別打印出item1,item2,item3
                  }
              });
    
  • just:將一個或多個對象轉換成發(fā)送這些對象的Obserbable

      Observable.just("item1","item2","item3")
              .subscribe(new Action1<String>() {
                  @Override
                  public void call(String s) {
                      Log.d("debug", s);      //調用多次,分別打印出item1,item2,item3
                  }
              });
    
  • empty:創(chuàng)建一個直接通知完成的Observable

  • error:創(chuàng)建一個直接通知錯誤的Observable

  • never:創(chuàng)建一個什么都不做的Observable

      Observable observable1 = Observable.empty();    //直接調用onCompleted()方法
      Observable observable2 = Observable.error(new RuntimeException());  //直接調用onError()方法
      Observable observable3 = Observable.never();    //onNext(),onCompleted(),onError()均不調用
    
  • timer:創(chuàng)建一個延時發(fā)射數(shù)據(jù)的Observable

      Observable.timer(1000, TimeUnit.MILLISECONDS)
              .subscribe(new Action1<Long>() {
                  @Override
                  public void call(Long aLong) {
                      Log.d("debug", aLong.toString());   //aLong為0
                  }
              });
    
  • interval:創(chuàng)建一個按照給定的時間間隔發(fā)射送0開始的整數(shù)序列的Obervable

      Observable.interval(2, 1, TimeUnit.SECONDS)
              .subscribe(new Action1<Long>() {
                  @Override
                  public void call(Long aLong) {
                      //等待2秒后開始發(fā)射數(shù)據(jù)揽思,發(fā)射的時間間隔為1秒,從0開始計數(shù)
                  }
              });
    
      Observable.interval(1, TimeUnit.SECONDS)
              .subscribe(new Action1<Long>() {
                  @Override
                  public void call(Long aLong) {
                      //等待1秒后開始發(fā)射數(shù)據(jù)儡湾,發(fā)射的時間間隔為1秒徐钠,從0開始計數(shù)
                      //相當于Observable.interval(1, 1, TimeUnit.SECONDS)
                  }
              });
    
  • range:創(chuàng)建一個發(fā)射指定范圍的整數(shù)序列的Observable

      Observable.range(3, 4)
              .subscribe(new Action1<Integer>() {
                  @Override
                  public void call(Integer integer) {
                      Log.d("debug", integer.toString());//依次發(fā)射3,4,5,6,從3開始發(fā)射4個數(shù)據(jù)
                  }
              });
    
  • defer:觀察者訂閱時才創(chuàng)建Observable,每次訂閱返回一個新的Observable

      Observable.defer(new Func0<Observable<String>>() {
          @Override
          public Observable<String> call() {
              return Observable.just("s");
          }
      }).subscribe(new Action1<String>() {
          @Override
          public void call(String s) {
              Log.d("debug", s);      //打印s
          }
      });
    

合并操作(用于組合多個Observavle)

  • concat:按順序連接多個Observable,注:Observable.concat(a,b)等價于a.concatWith(b)

      Observable<Integer> observable1 = Observable.just(1, 2, 3);
      Observable<Integer> observable2 = Observable.just(4, 5, 6);
    
      Observable.concat(observable1, observable2)
              .subscribe(new Action1<Integer>() {
                  @Override
                  public void call(Integer integer) {
                      Log.d("debug", integer.toString()); //打印1,2,3,4,5,6
                  }
              });
    
  • startWith:在數(shù)據(jù)序列的開頭增加一項數(shù)據(jù)爹袁,內部調用concat

     Observable.just(1, 2, 3)
             .startWith(Observable.just(4, 5))   //添加一個Observable
             .subscribe(new Action1<Integer>() {
                 @Override
                 public void call(Integer integer) {
                     Log.d("debug", integer.toString()); //打印4,5,1,2,3
                 }
             });
    
     Observable.just(1,2,3)
             .startWith(4,5)     //添加多個數(shù)據(jù)
             .subscribe(new Action1<Integer>() {
                 @Override
                 public void call(Integer integer) {
                     Log.d("debug", integer.toString()); //打印4,5,1,2,3
                 }
             });
    
     List<Integer> integers = new ArrayList<>();
     integers.add(4);
     integers.add(5);
     Observable.just(1,2,3)
             .startWith(integers)    //添加一個集合
             .subscribe(new Action1<Integer>() {
                 @Override
                 public void call(Integer integer) {
                     Log.d("debug", integer.toString()); //打印4,5,1,2,3
                 }
             });
    
  • merge / mergeDelayError:將多個Observable合并為一個譬淳。不同于concat邻梆,merge不是按照添加順序連接,而是按照時間線來連接剂娄。其中mergeDelayError將異常延遲到其它沒有錯誤的Observable發(fā)送完畢后才發(fā)射。而merge則是一遇到異常將停止發(fā)射數(shù)據(jù),發(fā)送onError通知

merge工作流程
    Observable<Integer> observable1 = Observable.create(new Observable.OnSubscribe<Integer>() {
        @Override
        public void call(Subscriber<? super Integer> subscriber) {
            subscriber.onNext(1);
            SystemClock.sleep(1000);
            subscriber.onNext(2);
            subscriber.onNext(3);
            subscriber.onCompleted();
        }
    }).subscribeOn(Schedulers.computation());

    Observable<Integer> observable2 = Observable.create(new Observable.OnSubscribe<Integer>() {
        @Override
        public void call(Subscriber<? super Integer> subscriber) {
            SystemClock.sleep(500);
            subscriber.onNext(4);
            subscriber.onNext(5);
            SystemClock.sleep(1000);
            subscriber.onNext(6);
            subscriber.onCompleted();
        }
    }).subscribeOn(Schedulers.computation());

    Observable.merge(observable1, observable2)
            .subscribe(new Action1<Integer>() {
                @Override
                public void call(Integer integer) {
                    Log.d("debug", integer.toString()); //打印1庭砍,4怠缸,5揭北,2,3疚俱,6
                }
            });
  • zip:使用一個函數(shù)組合多個Observable發(fā)射的數(shù)據(jù)集合呆奕,再發(fā)射這個結果。如果多個Observable發(fā)射的數(shù)據(jù)量不一樣姆泻,則以最少的Observable為標準進行壓合
zip工作流程
    Observable<Integer> observable1 = Observable.just(1, 2, 3, 4, 5);
    Observable<String> observable2 = Observable.just("A", "B", "C", "D");
    Observable.zip(observable1, observable2, new Func2<Integer, String, String>() {
        @Override
        public String call(Integer integer, String s) {
            return integer + s;
        }
    }).subscribe(new Action1<String>() {
        @Override
        public void call(String s) {
            Log.d("debug", s);  //打印1A,2B,3C,4D
        }
    });
  • combineLatest:當兩個Observable中任何一個發(fā)射了一個數(shù)據(jù)時四苇,通過一個指定的函數(shù)組合每個Observable發(fā)射的最新數(shù)據(jù)(一共兩個數(shù)據(jù))蛔琅,然后發(fā)射這個函數(shù)的結果罗售。類似于zip,但是寨躁,不同的是zip只有在每個Observable都發(fā)射了數(shù)據(jù)才工作,而combineLatest任何一個發(fā)射了數(shù)據(jù)都可以工作放钦,每次與另一個Observable最近的數(shù)據(jù)壓合
combineLatest工作流程
    Observable<Integer> observable1 = Observable.create(new Observable.OnSubscribe<Integer>() {
        @Override
        public void call(Subscriber<? super Integer> subscriber) {
            subscriber.onNext(1);
            SystemClock.sleep(500);
            subscriber.onNext(2);
            SystemClock.sleep(1000);
            subscriber.onNext(3);
            SystemClock.sleep(300);
            subscriber.onNext(4);
            SystemClock.sleep(500);
            subscriber.onNext(5);
            subscriber.onCompleted();
        }
    }).subscribeOn(Schedulers.computation());

    Observable<String> observable2 = Observable.create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {
            SystemClock.sleep(300);
            subscriber.onNext("A");
            SystemClock.sleep(300);
            subscriber.onNext("B");
            SystemClock.sleep(500);
            subscriber.onNext("C");
            subscriber.onNext("D");
            subscriber.onCompleted();
        }
    }).subscribeOn(Schedulers.computation());

    Observable.combineLatest(observable1, observable2, new Func2<Integer, String, String>() {
        @Override
        public String call(Integer integer, String s) {
            return integer + s;
        }
    }).subscribe(new Action1<String>() {
        @Override
        public void call(String s) {
            Log.d("debug", s);  //打印1A,2A,2B,2C,2D,3D,4D,5D
        }
    });

過濾操作

  • filter:過濾數(shù)據(jù)

      Observable.just(1, 2, 3, 4)
              .filter(new Func1<Integer, Boolean>() {
                  @Override
                  public Boolean call(Integer integer) {
                      return integer % 2 == 0;    //過濾偶數(shù)
                  }
              })
              .subscribe(new Action1<Integer>() {
                  @Override
                  public void call(Integer integer) {
                      Log.d("debug", integer.toString()); //打印2横腿,4
                  }
              });
    
  • ofType:過濾指定類型數(shù)據(jù)

      Observable.just(1, "2", 3, "4")
              .ofType(Integer.class)  //過濾整形數(shù)據(jù)
              .subscribe(new Action1<Integer>() {
                  @Override
                  public void call(Integer integer) {
                      Log.d("debug", integer.toString()); //打印1耿焊,3
                  }
              });
    
  • take:只發(fā)射前n項數(shù)據(jù)或者一定時間內的數(shù)據(jù)(無需考慮索引越界問題器腋,配合interval操作符可作為定時器使用)

      Observable.just(1, 2, 3, 4)
              .take(2)    //只發(fā)射前2項
              .subscribe(new Action1<Integer>() {
                  @Override
                  public void call(Integer integer) {
                      Log.d("debug", integer.toString()); //打印1蒂培,2
                  }
              });
    
      Observable.interval(1, TimeUnit.SECONDS)
              .take(3, TimeUnit.SECONDS)  //只發(fā)射3秒內的數(shù)據(jù)
              .subscribe(new Action1<Long>() {
                  @Override
                  public void call(Long aLong) {
                      //打印0,1(打印出來的并不是相像中的0,1,2,應該與代碼代碼執(zhí)行的時間有關媳荒,使用時需要注意G怼)
                      Log.d("debug", aLong.toString());   
                  }
              });
    
  • takeLast:只發(fā)射最后的N項數(shù)據(jù)或者一定時間內的數(shù)據(jù)(無需考慮索引越界問題)

      Observable.just(1, 2, 3, 4)
              .takeLast(3)    //只發(fā)射后3項
              .subscribe(new Action1<Integer>() {
                  @Override
                  public void call(Integer integer) {
                      Log.d("debug", integer.toString()); //打印2衔沼,3指蚁,4
                  }
              });
    
      Observable.interval(1, TimeUnit.SECONDS)
              .take(10)   //每1秒發(fā)射一個數(shù)據(jù)凝化,發(fā)射10秒
              .takeLast(3, TimeUnit.SECONDS)  //只發(fā)射最后3秒的數(shù)據(jù)
              .subscribe(new Action1<Long>() {
                  @Override
                  public void call(Long aLong) {
                      Log.d("debug", aLong.toString());   //打印6,7枪向,8遣疯,9(同樣存在些許誤差数苫,使用時需注意!)
                  }
              });
    
      Observable.interval(1, TimeUnit.SECONDS)
              .take(10)
              .takeLast(2, 3, TimeUnit.SECONDS)   //只發(fā)射最后3秒內的最后2個數(shù)據(jù)
              .subscribe(new Action1<Long>() {
                  @Override
                  public void call(Long aLong) {
                      Log.d("debug", aLong.toString());   //打印8虐急,9
                  }
              });
    
  • takeFirst:只發(fā)射滿足條件的第一項(其實就是filter+take)

      Observable.just(1, 2, 3, 4)
              .takeFirst(new Func1<Integer, Boolean>() {
                  @Override
                  public Boolean call(Integer integer) {
                      return integer > 1;
                  }
              })
              .subscribe(new Action1<Integer>() {
                  @Override
                  public void call(Integer integer) {
                      Log.d("debug", integer.toString()); //打印2
                  }
              });
    

*first / firstOrDefault:只發(fā)射第一項或者滿足條件的第一項數(shù)據(jù)被辑,其中firstOrDefault可以指定默認值(建議使用firstOrDefault盼理,找不到對應元素時first會報異常)

    Observable.just(1, 2, 3)
            .first()    //發(fā)射第一項
            .subscribe(new Action1<Integer>() {
                @Override
                public void call(Integer integer) {
                    Log.d("debug", integer.toString()); //打印1
                }
            });

    Observable.just(1, 2, 3, 4)
            .first(new Func1<Integer, Boolean>() {  //發(fā)射大于2的第一項
                @Override
                public Boolean call(Integer integer) {
                    return integer > 2;
                }
            })
            .subscribe(new Action1<Integer>() {
                @Override
                public void call(Integer integer) {
                    Log.d("debug", integer.toString()); //打印3
               }
            });

    Integer[] arr = {};
    Observable.from(arr)
            .firstOrDefault(2)  //發(fā)射第一項奏路,沒有可發(fā)射的數(shù)據(jù)時鸽粉,發(fā)射默認值2
            .subscribe(new Action1<Integer>() {
                @Override
                public void call(Integer integer) {
                    Log.d("debug", integer.toString()); //打印2
                }
            });
  • last / lastOrDefault:只發(fā)射最后一項或滿足條件的最后一項,其中l(wèi)astOrDefault可以指定默認值

      Observable.just(1, 2, 3)
              .last()    //發(fā)射最后一項
              .subscribe(new Action1<Integer>() {
                  @Override
                  public void call(Integer integer) {
                      Log.d("debug", integer.toString()); //打印3
                  }
              });
    
      Observable.just(1, 2, 3, 4)
              .last(new Func1<Integer, Boolean>() {  //發(fā)射大于2的最后一項
                  @Override
                  public Boolean call(Integer integer) {
                      return integer > 2;
                  }
              })
              .subscribe(new Action1<Integer>() {
                  @Override
                  public void call(Integer integer) {
                      Log.d("debug", integer.toString()); //打印4
                 }
              });
    
      Integer[] arr = {};
      Observable.from(arr)
              .lastOrDefault(2)  //發(fā)射最后一項,沒有可發(fā)射的數(shù)據(jù)時椒舵,發(fā)射默認值2
              .subscribe(new Action1<Integer>() {
                  @Override
                  public void call(Integer integer) {
                      Log.d("debug", integer.toString()); //打印2
                  }
              });
    
  • skip:跳過開始的n項數(shù)據(jù)或者一定時間內的數(shù)據(jù)(與take類似)

      Observable.just(1, 2, 3, 4)
              .skip(2)    //跳過前2項
              .subscribe(new Action1<Integer>() {
                  @Override
                  public void call(Integer integer) {
                      Log.d("debug", integer.toString());     //打印3,4
                  }
              });
    
      Observable.interval(1, TimeUnit.SECONDS)
              .take(5)
              .skip(3, TimeUnit.SECONDS)   //跳過前3秒
              .subscribe(new Action1<Long>() {
                  @Override
                  public void call(Long aLong) {
                      Log.d("debug", aLong.toString());  //打印2泼橘,3炬灭,4,同樣存在誤差!
                  }
              });
    
  • skipLast:跳過最后的n項數(shù)據(jù)或一定時間內的數(shù)據(jù)

      Observable.just(1, 2, 3, 4)
              .skipLast(2)    //跳過最后2項
              .subscribe(new Action1<Integer>() {
                  @Override
                  public void call(Integer integer) {
                      Log.d("debug", integer.toString());     //打印1鼻吮,2
                  }
              });
    
      Observable.interval(1, TimeUnit.SECONDS)
              .take(7)
              .skipLast(3, TimeUnit.SECONDS)   //跳過最后3秒
              .subscribe(new Action1<Long>() {
                  @Override
                  public void call(Long aLong) {
                      Log.d("debug", aLong.toString());  //打印0,1香椎,2,同樣存在誤差畜伐!
                  }
              });
    
  • elementAt / elementAtOrDefault:發(fā)射某一項數(shù)據(jù)讼积,其中elementAtOrDefault可以指定索引越界時發(fā)射的默認值

      Observable.just(1, 2, 3, 4)
              .elementAt(2)   //發(fā)射索引為2的數(shù)據(jù)
              .subscribe(new Action1<Integer>() {
                  @Override
                  public void call(Integer integer) {
                      Log.d("debug", integer.toString()); //打印3
                  }
              });
    
      Observable.just(1, 2, 3)
              .elementAtOrDefault(4, 5)   //發(fā)射索引為4的數(shù)據(jù)勤众,索引越界時發(fā)射默認數(shù)據(jù)5
              .subscribe(new Action1<Integer>() {
                  @Override
                  public void call(Integer integer) {
                      Log.d("debug", integer.toString()); //打印5
                  }
              });
    
  • ignoreElements:丟棄所有數(shù)據(jù),只發(fā)射錯誤或正常終止的通知窥突,即只觸發(fā)觀察者的onError()或onCompleted()方法

  • distinct:過濾重復數(shù)據(jù),可指定判定唯一的標準

      Observable.just(1, 1, 2, 3, 2, 4)
              .distinct()
              .subscribe(new Action1<Integer>() {
                  @Override
                  public void call(Integer integer) {
                      Log.d("debug", integer.toString()); //打印1称近,2刨秆,3,4
                  }
              });
    
      Observable.just(1, 1, 2, 3, 2, 4)
              //根據(jù)發(fā)射的數(shù)據(jù)生成對應的key缓醋,通過key值來判斷唯一改衩,如果兩個數(shù)據(jù)的key相同竭鞍,則只發(fā)射第一個
              .distinct(new Func1<Integer, Integer>() {   
                  @Override
                  public Integer call(Integer integer) {
                      //奇數(shù)對應的key為1冯乘,偶數(shù)對應的key為2
                      if (integer % 2 == 0) {
                          return 2;
                      }
                      return 1;
                  }
              })
              .subscribe(new Action1<Integer>() {
                  @Override
                  public void call(Integer integer) {
                      Log.d("debug", integer.toString()); //打印1裆馒,2
                  }
              });
    
  • distinctUntilChanged:過濾掉連續(xù)重復的數(shù)據(jù)翔横,可指定判定唯一的標準

      Observable.just(1, 1, 2, 3, 2, 4)
              .distinctUntilChanged()
              .subscribe(new Action1<Integer>() {
                  @Override
                  public void call(Integer integer) {
                      Log.d("debug", integer.toString()); //打印1禾唁,2,3掘托,2闪盔,4
                  }
              });
    
      Observable.just(1, 1, 2, 3, 2, 4)
              //根據(jù)發(fā)射的數(shù)據(jù)生成對應的key,通過key值來判斷唯一族淮,如果兩個數(shù)據(jù)的key相同,則只發(fā)射第一個
              .distinctUntilChanged(new Func1<Integer, Integer>() {
                  @Override
                  public Integer call(Integer integer) {
                      //奇數(shù)對應的key為1蝙斜,偶數(shù)對應的key為2
                      if (integer % 2 == 0) {
                          return 2;
                      }
                      return 1;
                  }
              })
              .subscribe(new Action1<Integer>() {
                  @Override
                  public void call(Integer integer) {
                      Log.d("debug", integer.toString()); //打印1,2稚伍,3,2
                  }
              });
    
      Observable.just(1, 1, 2, 3, 2, 4)
              //傳入比較器的方式
              .distinctUntilChanged(new Func2<Integer, Integer, Boolean>() {
                  @Override
                  public Boolean call(Integer integer, Integer integer2) {
                      return integer % 2 == integer2 % 2; //同為奇數(shù)或偶數(shù)返回true
                  }
              })
              .subscribe(new Action1<Integer>() {
                  @Override
                  public void call(Integer integer) {
                      Log.d("debug", integer.toString()); //打印1垦搬,2对雪,3慌植,2
                  }
              });
    
  • throttleFirst:定期發(fā)射Observable在該時間段發(fā)射的第一項數(shù)據(jù)

      Observable.interval(0, 500, TimeUnit.MILLISECONDS)
              .take(10)   //每500毫秒發(fā)射一次數(shù)據(jù),發(fā)射10次
              .throttleFirst(1000, TimeUnit.MILLISECONDS) //每1秒發(fā)射該秒內發(fā)射數(shù)據(jù)中的第一項數(shù)據(jù)
              .subscribe(new Action1<Long>() {
                  @Override
                  public void call(Long aLong) {
                      //打印0,2芙扎,5戒洼,8(即第一秒發(fā)射0,1磷蜀,第二秒發(fā)射2,3庶弃,4,第三秒發(fā)射5,6,7鸡捐,第四秒發(fā)射8煎源,9)歇僧,同樣存在誤差兽埃!
                      Log.d("debug", aLong.toString());   
                  }
              });
    
  • throttleWithTimeout / debounce(兩者使用及效果相同):發(fā)射數(shù)據(jù)時舷夺,如果兩次數(shù)據(jù)的發(fā)射間隔小于指定時間,就會丟棄前一次的數(shù)據(jù),直到指定時間內都沒有新數(shù)據(jù)發(fā)射時才進行發(fā)射

      Observable.create(new Observable.OnSubscribe<Integer>() {
          @Override
          public void call(Subscriber<? super Integer> subscriber) {  //依次發(fā)射1-6,發(fā)射間隔不同
              subscriber.onNext(1);
              SystemClock.sleep(500);
              subscriber.onNext(2);
              SystemClock.sleep(500);
              subscriber.onNext(3);
              SystemClock.sleep(1000);
              subscriber.onNext(4);
              SystemClock.sleep(1000);
              subscriber.onNext(5);
              SystemClock.sleep(500);
              subscriber.onNext(6);
              subscriber.onCompleted();
          }
      }).throttleWithTimeout(700, TimeUnit.MILLISECONDS)  //指定最小發(fā)射間隔時間為700毫秒
              .subscribe(new Action1<Integer>() {
                  @Override
                  public void call(Integer integer) {
                      Log.d("debug", integer.toString()); //打印3蔓同,4,6
                  }
              });
    
  • sample / throttleLast(兩者使用及效果相同):定期發(fā)射Observable在該時間段發(fā)射的最后一項數(shù)據(jù)弃揽,與throttleFirst相反

      Observable.interval(0, 500, TimeUnit.MILLISECONDS)
              .take(10)   //每500毫秒發(fā)射一次數(shù)據(jù)矿微,發(fā)射10次
              .throttleLast(1000, TimeUnit.MILLISECONDS) //每1秒發(fā)射該秒內發(fā)射數(shù)據(jù)中的最后一項數(shù)據(jù)
              .subscribe(new Action1<Long>() {
                  @Override
                  public void call(Long aLong) {
                      //打印1,3掖举,5,7励负,9(即第一秒發(fā)射0熄守,1耗跛,第二秒發(fā)射2调塌,3羔砾,第三秒發(fā)射4,5政溃,第四秒發(fā)射6董虱,7愤诱,第五秒發(fā)射8捐友,9)
                      Log.d("debug", aLong.toString());
                  }
              });
    
  • timeout:如果指定時間內沒有發(fā)射任何數(shù)據(jù)匣砖,就發(fā)射一個異常或者使用備用的Observavle

      Observable.timer(5, TimeUnit.SECONDS)
              .timeout(3, TimeUnit.SECONDS)   //超時則發(fā)射異常
              .subscribe(new Subscriber<Long>() {
                  @Override
                  public void onCompleted() {
                  }
    
                  @Override
                  public void onError(Throwable e) {
                      Log.d("debug", "onError()");    //拋出異常
                  }
    
                  @Override
                  public void onNext(Long aLong) {
                      Log.d("debug", aLong.toString());
                  }
              });
    
      Observable.timer(5, TimeUnit.SECONDS)
              .timeout(3, TimeUnit.SECONDS, Observable.just(2L))  //設置備用Observable
              .subscribe(new Subscriber<Long>() {
                  @Override
                  public void onCompleted() {
    
                  }
    
                  @Override
                  public void onError(Throwable e) {
                      Log.d("debug", "onError()");
                  }
    
                  @Override
                  public void onNext(Long aLong) {
                      Log.d("debug", aLong.toString());   //發(fā)射備用Observable影涉,打印2
                  }
              });
    

條件 / 布爾操作

  • all:判斷所有數(shù)據(jù)中是否都滿足某個條件

      Observable.just(1, 2, 3, 4)
              .all(new Func1<Integer, Boolean>() {
                  @Override
                  public Boolean call(Integer integer) {
                      return integer < 5;//所有項都小于5
                  }
              })
              .subscribe(new Action1<Boolean>() {
                  @Override
                  public void call(Boolean aBoolean) {
                      Log.d("debug", aBoolean.toString());    //打印true
                  }
              });
    
      Observable.just(1, 2, 3, 4)
              .all(new Func1<Integer, Boolean>() {
                  @Override
                  public Boolean call(Integer integer) {
                      return integer > 2;//所有項都大于2
                  }
              })
              .subscribe(new Action1<Boolean>() {
                  @Override
                  public void call(Boolean aBoolean) {
                      Log.d("debug", aBoolean.toString());    //打印false
                  }
              });
    
  • exists:判斷是否存在數(shù)據(jù)項滿足某個條件

      Observable.just(1, 2, 3, 4)
              .exists(new Func1<Integer, Boolean>() {
                  @Override
                  public Boolean call(Integer integer) {
                      return integer > 2; //存在某項大于2
                  }   
              })
              .subscribe(new Action1<Boolean>() {
                  @Override
                  public void call(Boolean aBoolean) {
                      Log.d("debug", aBoolean.toString());    //打印true
                  }
              });
    
  • contains:判斷所有數(shù)據(jù)中是否包含指定的數(shù)據(jù)(內部調用exists)

      Observable.just(1, 2, 3, 4)
              .contains(2)    //是否包含2
              .subscribe(new Action1<Boolean>() {
                  @Override
                  public void call(Boolean aBoolean) {
                      Log.d("debug", aBoolean.toString());    //打印true
                  }
              });
    
  • sequenceEqual:判斷兩個Observable發(fā)射的數(shù)據(jù)是否相同(數(shù)據(jù)规伐,發(fā)射順序猖闪,終止狀態(tài))

      Observable.sequenceEqual(Observable.just(1, 2, 3), Observable.just(1, 2, 3))
              .subscribe(new Action1<Boolean>() {
                  @Override
                  public void call(Boolean aBoolean) {
                      Log.d("debug", aBoolean.toString());    //打印true
                  }
              });
    
  • isEmpty:用于判斷Observable是否沒有發(fā)射任何數(shù)據(jù)(發(fā)射null返回為false)

      Observable.from(new ArrayList<Integer>())  //集合中沒有數(shù)據(jù)
              .isEmpty()
              .subscribe(new Action1<Boolean>() {
                  @Override
                  public void call(Boolean aBoolean) {
                      Log.d("debug", aBoolean.toString());    //打印true
                  }
              });
    
      Observable.empty()
              .isEmpty()
              .subscribe(new Action1<Boolean>() {
                  @Override
                  public void call(Boolean aBoolean) {
                      Log.d("debug", aBoolean.toString());    //打印true
                  }
              });
    
  • amber:指定多個Observable,只允許第一個開始發(fā)射數(shù)據(jù)的Observable發(fā)射全部數(shù)據(jù)培慌,其他Observable將會被會忽略

      Observable<Integer> observable1 = Observable.create(new Observable.OnSubscribe<Integer>() {
          @Override
          public void call(Subscriber<? super Integer> subscriber) {
              SystemClock.sleep(500);     //延遲500毫秒
              subscriber.onNext(1);
              subscriber.onNext(2);
              subscriber.onCompleted();
          }
      }.subscribeOn(Schedulers.computation()));  //指定為新的線程
    
      Observable<String> observable2 = Observable.create(new Observable.OnSubscribe<String>() {
          @Override
          public void call(Subscriber<? super String> subscriber) {
              subscriber.onNext("a");
              subscriber.onNext("b");
              subscriber.onCompleted();
          }
      });
    
      Observable.amb(observable1, observable2)
              .subscribe(new Action1<Serializable>() {
                  @Override
                  public void call(Serializable serializable) {
                      Log.d("debug", serializable.toString());    //打印a,b
                  }
              });
    
  • switchIfEmpty:如果原始Observable正常終止后仍沒有發(fā)射任何數(shù)據(jù)吵护,就使用備用的Observable

      Observable.from(new ArrayList<Integer>())
              .switchIfEmpty(Observable.just(1, 2))
              .subscribe(new Action1<Integer>() {
                  @Override
                  public void call(Integer integer) {
                      Log.d("debug", integer.toString()); //打印1馅而,2
                  }
              });
    
  • defaultIfEmpty:如果原始Observable正常終止后仍沒有發(fā)射任何數(shù)據(jù)瓮恭,就發(fā)射一個默認值(內部調用switchIfEmpty)

      Observable.from(new ArrayList<Integer>())
              .defaultIfEmpty(1)
              .subscribe(new Action1<Integer>() {
                  @Override
                  public void call(Integer integer) {
                      Log.d("debug", integer.toString()); //打印1
                  }
              });
    
  • takeUntil:當發(fā)射的數(shù)據(jù)滿足某個條件后(包含該數(shù)據(jù))屯蹦,或者第二個Observable發(fā)射了一項數(shù)據(jù)或發(fā)射了一個終止通知時(觀察者接受不到第二個Observable發(fā)射的數(shù)據(jù))登澜,終止第一個Observable發(fā)送數(shù)據(jù)

      Observable.just(1, 2, 3, 4)
              .takeUntil(new Func1<Integer, Boolean>() {
                  @Override
                  public Boolean call(Integer integer) {
                      return integer == 3;
                  }
              })
              .subscribe(new Action1<Integer>() {
                  @Override
                  public void call(Integer integer) {
                      Log.d("debug", "just" + integer.toString());     //打印1脑蠕,2空郊,3
                  }
              });
    
      Observable.interval(0, 500, TimeUnit.MILLISECONDS)
              .subscribeOn(Schedulers.computation())
              .takeUntil(Observable.timer(1200, TimeUnit.MILLISECONDS))
              .subscribe(new Action1<Long>() {
                  @Override
                  public void call(Long aLong) {
                      Log.d("debug", aLong.toString());   //打印0狞甚,1哼审,2
                  }
              });
    
  • takeWhile:當發(fā)射的數(shù)據(jù)對應某個條件為false時(不包含該數(shù)據(jù))涩盾,Observable終止發(fā)送數(shù)據(jù)

      Observable.just(1, 2, 3, 4)
              .takeWhile(new Func1<Integer, Boolean>() {
                  @Override
                  public Boolean call(Integer integer) {
                      return integer != 3;
                  }
              })
              .subscribe(new Action1<Integer>() {
                  @Override
                  public void call(Integer integer) {
                      Log.d("debug", integer.toString());     //打印1春霍,2
                  }
              });
    
  • skipUnit:丟棄Observable發(fā)射的數(shù)據(jù)址儒,直到第二個Observable開始發(fā)射數(shù)據(jù)或者發(fā)射一個終止通知時

      Observable.interval(0, 500, TimeUnit.MILLISECONDS)
              .take(5)
              .subscribeOn(Schedulers.computation())
              .skipUntil(Observable.timer(1200, TimeUnit.MILLISECONDS))
              .subscribe(new Action1<Long>() {
                  @Override
                  public void call(Long aLong) {
                      Log.d("debug", aLong.toString());   //打印3莲趣,4
                  }
              });
    
  • skipWhile:丟棄Observable發(fā)射的數(shù)據(jù)喧伞,直到一個指定的條件不成立(不丟棄條件數(shù)據(jù))

      Observable.just(1, 2, 3, 4)
              .skipWhile(new Func1<Integer, Boolean>() {
                  @Override
                  public Boolean call(Integer integer) {
                      return integer < 3;
                  }
              })
              .subscribe(new Action1<Integer>() {
                  @Override
                  public void call(Integer integer) {
                      Log.d("debug", integer.toString());     //打印3潘鲫,4
                  }
              });
    

聚合操作

  • reduce:用一個函數(shù)接收Observable發(fā)射的數(shù)據(jù)次舌,將函數(shù)的計算結果作為下次計算的參數(shù),最后輸出結果挪圾。

      Observable.just(1, 2, 3, 4)
              .reduce(new Func2<Integer, Integer, Integer>() {
                  @Override
                  public Integer call(Integer integer, Integer integer2) {
                      Log.d("debug", "integer1:" + integer + ",integer2:" + integer2);
                      return integer + integer2;  //求和操作
                  }
              })
              .subscribe(new Action1<Integer>() {
                  @Override
                  public void call(Integer integer) {
                      Log.d("debug", "result:" + integer);
                  }
              });
      /**
       * 日志輸出
       * integer1:1,integer2:2
       * integer1:3,integer2:3
       * integer1:6,integer2:4
       * result:10
       */
    
  • collect:用于將數(shù)據(jù)收集到一個可變的數(shù)據(jù)結構(如List,Map)

      Observable.just(1, 2, 3, 4)
              .collect(new Func0<List<Integer>>() {
                  @Override
                  public List<Integer> call() {
                      return new ArrayList<Integer>();    //創(chuàng)建List用于收集數(shù)據(jù)
                  }
              }, new Action2<List<Integer>, Integer>() {
                  @Override
                  public void call(List<Integer> integers, Integer integer) {
                      integers.add(integer);  //將數(shù)據(jù)添加到List中
                  }
              })
              .subscribe(new Action1<List<Integer>>() {
                  @Override
                  public void call(List<Integer> integers) {
                      Log.d("debug", integers.toString());    //打印[1, 2, 3, 4]
                  }
              });
    
      Observable.just(1, 2, 3, 4)
              .collect(new Func0<Map<Integer, String>>() {
                  @Override
                  public Map<Integer, String> call() {
                      return new HashMap<Integer, String>();  //創(chuàng)建Map用于收集數(shù)據(jù)
                  }
              }, new Action2<Map<Integer, String>, Integer>() {
                  @Override
                  public void call(Map<Integer, String> integerStringMap, Integer integer) {
                      integerStringMap.put(integer, "value" + integer);   //將數(shù)據(jù)添加到Map中
                  }
              })
              .subscribe(new Action1<Map<Integer, String>>() {
                  @Override
                  public void call(Map<Integer, String> integerStringMap) {
                      //打印{4=value4, 1=value1, 3=value3, 2=value2},注:HashMap保存的數(shù)據(jù)是無序的
                      Log.d("debug", integerStringMap.toString());    
                  }
              });
    
  • count / countLong:計算發(fā)射的數(shù)量棚赔,內部調用的是reduce

      Observable.just(1, 2, 3, 4)
              .count()
              .subscribe(new Action1<Integer>() {
                  @Override
                  public void call(Integer integer) {
                      Log.d("debug", "integer:" + integer.toString());    //打印4
                  }
              });
    

轉換操作

  • toList:將Observable發(fā)射的所有數(shù)據(jù)收集到一個列表中靠益,返回這個列表

      Observable.just(1, 2, 3, 4)
              .toList()
              .subscribe(new Action1<List<Integer>>() {
                  @Override
                  public void call(List<Integer> integers) {
                      Log.d("debug", integers.toString());    //打印[1, 2, 3, 4]
                  }
              });
    
  • toSortedList:將Observable發(fā)射的所有數(shù)據(jù)收集到一個有序列表中,返回這個列表

      Observable.just(3, 2, 5, 4, 1)
              .toSortedList()     //默認升序排序
              .subscribe(new Action1<List<Integer>>() {
                  @Override
                  public void call(List<Integer> integers) {
                      Log.d("debug", integers.toString());    //打印[1, 2, 3, 4, 5]
                  }
              });
    
      Observable.just(3, 2, 5, 4, 1)
              .toSortedList(new Func2<Integer, Integer, Integer>() {
                  @Override
                  public Integer call(Integer integer, Integer integer2) {
                      return integer2 - integer;  //自定義排序規(guī)則(倒序)
                  }
              })
              .subscribe(new Action1<List<Integer>>() {
                  @Override
                  public void call(List<Integer> integers) {
                      Log.d("debug", integers.toString());    //打印[5, 4, 3, 2, 1]
                  }
              });
    
  • toMap:將序列數(shù)據(jù)轉換為一個Map壳快,根據(jù)數(shù)據(jù)項生成key和value

      Observable.just(1, 2, 3, 4)
              .toMap(new Func1<Integer, String>() {   //根據(jù)數(shù)據(jù)項生成key眶痰,value為原始數(shù)據(jù)
                  @Override
                  public String call(Integer integer) {
                      return "key:" + integer;
                  }
              })
              .subscribe(new Action1<Map<String, Integer>>() {
                  @Override
                  public void call(Map<String, Integer> stringIntegerMap) {
                      Log.d("debug", stringIntegerMap.toString());    //打印{key:4=4, key:2=2, key:1=1, key:3=3}
                  }
              });
    
      Observable.just(1, 2, 3, 4)
              .toMap(new Func1<Integer, String>() {   //根據(jù)數(shù)據(jù)項生成key和value
                  @Override
                  public String call(Integer integer) {
                      return "key:" + integer;
                  }
              }, new Func1<Integer, String>() {
                  @Override
                  public String call(Integer integer) {
                      return "value:" + integer;
                  }
              })
              .subscribe(new Action1<Map<String, String>>() {
                  @Override
                  public void call(Map<String, String> stringStringMap) {
                      Log.d("debug", stringStringMap.toString()); //打印{key:4=value:4, key:2=value:2, key:1=value:1, key:3=value:3}
                  }
              });
    
      Observable.just(1, 2, 3, 4)
              .toMap(new Func1<Integer, String>() {   //根據(jù)數(shù)據(jù)項生成key和value竖伯,創(chuàng)建指定類型的Map
                  @Override
                  public String call(Integer integer) {
                      return "key:" + integer;
                  }
              }, new Func1<Integer, String>() {
                  @Override
                  public String call(Integer integer) {
                      return "value:" + integer;
                  }
              }, new Func0<Map<String, String>>() {
                  @Override
                  public Map<String, String> call() {
                      return new LinkedHashMap<String, String>(); //LinkedHashMap保證存取順序相同
                  }
              })
              .subscribe(new Action1<Map<String, String>>() {
                  @Override
                  public void call(Map<String, String> stringStringMap) {
                      Log.d("debug", stringStringMap.toString());  //打印{key:1=value:1, key:2=value:2, key:3=value:3, key:4=value:4}
                  }
              });
    
  • toMultiMap:類似toMap宏胯,不同的地方在于map的value是一個集合,使一個key可以映射多個value婚惫,多用于分組

      Observable.just(1, 2, 1, 4)
              .toMultimap(new Func1<Integer, String>() {   //根據(jù)數(shù)據(jù)項生成key魂爪,value為原始數(shù)據(jù)
                  @Override
                  public String call(Integer integer) {
                      return "key:" + integer;
                  }
              })
              .subscribe(new Action1<Map<String, Collection<Integer>>>() {
                  @Override
                  public void call(Map<String, Collection<Integer>> stringCollectionMap) {
                      Log.d("debug", stringCollectionMap.toString()); //打印{key:4=[4], key:2=[2], key:1=[1, 1]}
                  }
              });
    
      Observable.just(1, 2, 1, 4)
              .toMap(new Func1<Integer, String>() {
                  @Override
                  public String call(Integer integer) {
                      return "key:" + integer;
                  }
              })
              .subscribe(new Action1<Map<String, Integer>>() {
                  @Override
                  public void call(Map<String, Integer> stringIntegerMap) {
                      Log.d("debug", stringIntegerMap.toString());    //打印{key:4=4, key:2=2, key:1=1}
                  }
              });
    

變換操作

  • map:對Observable發(fā)射的每一項數(shù)據(jù)都應用一個函數(shù)進行變換

      Observable.just(1, 2, 3, 4)
              .map(new Func1<Integer, String>() {
                  @Override
                  public String call(Integer integer) {
                      return "item:" + integer;
                  }
              })
              .subscribe(new Action1<String>() {
                  @Override
                  public void call(String s) {
                      Log.d("debug", s);  //打印item:1,item:2,item:3,item:4
                  }
              });
    
  • cast:在發(fā)射之前強制將Observable發(fā)射的所有數(shù)據(jù)轉換為指定類型(父類強轉為子類)

      List list = new ArrayList();
      Observable.just(list)
              .cast(ArrayList.class)  //將List強轉為ArrayList
              .subscribe(new Action1<ArrayList>() {
                  @Override
                  public void call(ArrayList arrayList) {
    
                  }
              });
    
  • flatMap:將Observable發(fā)射的數(shù)據(jù)變換為Observables集合蒋川,然后將這些Observable發(fā)射的數(shù)據(jù)平坦化的放進一個單獨的Observable,內部使用marge合并(可用于一對多轉換或多對多轉換捺球,也可用于網(wǎng)絡請求的嵌套)

      Observable.just(1, 2, 3)
              .flatMap(new Func1<Integer, Observable<Integer>>() {
                  @Override
                  public Observable<Integer> call(Integer integer) {
                      return Observable.create(new Observable.OnSubscribe<Integer>() {
                          @Override
                          public void call(Subscriber<? super Integer> subscriber) {
                              subscriber.onNext(integer * 10);
                              subscriber.onNext(integer * 100);
                              subscriber.onCompleted();
                          }
                      });
                  }
              })
              .subscribe(new Action1<Integer>() {
                  @Override
                  public void call(Integer integer) {
                      Log.d("debug", integer.toString()); //打印10氮兵,100泣栈,20南片,200疼进,30螺捐,300
                  }
              });
    
  • flatMapIterable:和flatMap作用一樣,只不過生成的是Iterable而不是Observable

      Observable.just(1, 2, 3)
              .flatMapIterable(new Func1<Integer, Iterable<Integer>>() {
                  @Override
                  public Iterable<Integer> call(Integer integer) {
                      return Arrays.asList(integer * 10, integer * 100);
                  }
              })
              .subscribe(new Action1<Integer>() {
                  @Override
                  public void call(Integer integer) {
                      Log.d("debug", integer.toString()); //打印10,100灾票,20刊苍,200正什,30婴氮,300
                  }
              });
    
  • concatMap:類似于flatMap主经,由于內部使用concat合并,所以是按照順序連接發(fā)射

  • switchMap:和flatMap很像穗酥,將Observable發(fā)射的數(shù)據(jù)變換為Observables集合砾跃,當原始Observable發(fā)射一個新的數(shù)據(jù)(Observable)時蜓席,它將取消訂閱前一個Observable

      Observable.interval(0, 500, TimeUnit.MILLISECONDS)  //每500毫秒發(fā)射一次
              .take(4)
              .switchMap(new Func1<Long, Observable<String>>() {
                  @Override
                  public Observable<String> call(Long aLong) {
                      return Observable.create(new Observable.OnSubscribe<String>() {
                          @Override
                          public void call(Subscriber<? super String> subscriber) {
                              subscriber.onNext(aLong + "A");
                              SystemClock.sleep(800);     //延遲800毫秒
                              subscriber.onNext(aLong + "B");
                              subscriber.onCompleted();
                          }
                      }).subscribeOn(Schedulers.newThread());
                  }
              })
              .subscribe(new Action1<String>() {
                  @Override
                  public void call(String s) {
                      Log.d("debug", s);  //打印0A,1A渺贤,2A志鞍,3A固棚,3B
                  }
              });
    
  • 與reduce很像,對Observable發(fā)射的每一項數(shù)據(jù)應用一個函數(shù)委粉,然后按順序依次發(fā)射每一個值

      Observable.just(1, 2, 3, 4)
              .scan(new Func2<Integer, Integer, Integer>() {
                  @Override
                  public Integer call(Integer integer, Integer integer2) {
                      Log.d("debug", "integer1:" + integer + ",integer2:" + integer2);
                      return integer + integer2;
                  }
              })
              .subscribe(new Action1<Integer>() {
                  @Override
                  public void call(Integer integer) {
                      Log.d("debug", "result:" + integer);
                  }
              });
      /**
       * 日志輸出
       * result:1
       * integer1:1,integer2:2
       * result:3
       * integer1:3,integer2:3
       * result:6
       * integer1:6,integer2:4
       * result:10
       */
    
  • groupBy:將Observable分拆為Observable集合贾节,將原始Observable發(fā)射的數(shù)據(jù)按Key分組栗涂,每個Observable發(fā)射一組不同的數(shù)據(jù)(類似于toMultiMap)

      Observable.just(1, 2, 3, 4)
              .groupBy(new Func1<Integer, String>() {
                  @Override
                  public String call(Integer integer) {   //根據(jù)數(shù)據(jù)項生成key
                      return integer % 2 == 0 ? "偶數(shù)" : "奇數(shù)";
                  }
              })
              .subscribe(new Action1<GroupedObservable<String, Integer>>() {
                  @Override
                  public void call(GroupedObservable<String, Integer> o) {
                      o.subscribe(new Action1<Integer>() {
                          @Override
                          public void call(Integer integer) {
                              Log.d("debug", o.getKey() + ":" + integer); //打印奇數(shù):1斤程,偶數(shù):2暖释,奇數(shù):3,偶數(shù):4
                          }
                      });
                  }
              });
    
      Observable.just(1, 2, 3, 4)
              .groupBy(new Func1<Integer, String>() {
                  @Override
                  public String call(Integer integer) {   //根據(jù)數(shù)據(jù)項生成key
                      return integer % 2 == 0 ? "偶數(shù)" : "奇數(shù)";
                  }
              }, new Func1<Integer, Integer>() {
                  @Override
                  public Integer call(Integer integer) {  //根據(jù)數(shù)據(jù)項生成value
                      return integer * 10;
                  }
              })
              .subscribe(new Action1<GroupedObservable<String, Integer>>() {
                  @Override
                  public void call(GroupedObservable<String, Integer> o) {
                      o.subscribe(new Action1<Integer>() {
                          @Override
                          public void call(Integer integer) {
                              Log.d("debug", o.getKey() + ":" + integer); //打印奇數(shù):10,偶數(shù):20橄杨,奇數(shù):30式矫,偶數(shù):40
                          }
                      });
                  }
              });
    
  • buffer:定期從Observable收集數(shù)據(jù)到一個集合采转,然后將這些數(shù)據(jù)集合打包發(fā)射

      Observable.just(1, 2, 3, 4, 5)
              .buffer(3, 1)   //count:表示從當前指針位置開始打包3個數(shù)據(jù)項到集合中故慈,skip:表示指針向后移1位察绷,
              .subscribe(new Action1<List<Integer>>() {
                  @Override
                  public void call(List<Integer> integers) {
                      Log.d("debug", "skip" + integers.toString());   //打印[1, 2, 3]拆撼,[2, 3, 4],[3, 4, 5]蚜印,[4, 5]娶视,[5]
                  }
              });
    
      Observable.just(1, 2, 3, 4, 5)
              .buffer(3)  //每3個打包成一個集合,內部就是.buffer(3,3)
              .subscribe(new Action1<List<Integer>>() {
                  @Override
                  public void call(List<Integer> integers) {
                      Log.d("debug", integers.toString());    //打印[1, 2, 3]寝凌,[4, 5]
                  }
              });
    
      Observable.interval(0, 100, TimeUnit.MILLISECONDS)
              .take(5)
              .buffer(250, TimeUnit.MILLISECONDS, 2)     //將每250毫秒內發(fā)射的數(shù)據(jù)收集到多個集合中,每個集合最多存放2個數(shù)據(jù)
              .subscribe(new Action1<List<Long>>() {
                  @Override
                  public void call(List<Long> longs) {
                      //打印[0, 1]较木,[2]青柄,[3, 4]致开,[]
                      Log.d("debug", "count:" + longs.toString());
                  }
              });
    
      Observable.interval(0, 100, TimeUnit.MILLISECONDS)
              .take(5)
              .buffer(250, TimeUnit.MILLISECONDS)     //將每250毫秒內發(fā)射的數(shù)據(jù)收集到一個集合中双戳,集合不限制大小
              .subscribe(new Action1<List<Long>>() {
                  @Override
                  public void call(List<Long> longs) {
                      Log.d("debug", longs.toString());   //打印[0, 1, 2]飒货,[3, 4]
                  }
              });
    
      Observable.interval(0, 100, TimeUnit.MILLISECONDS)
              .take(5)
              //從指定時間節(jié)點開始塘辅,將該節(jié)點后250毫秒內發(fā)射的數(shù)據(jù)收集的一個集合中扣墩,初始節(jié)點為0呻惕,每發(fā)射一次集合蟆融,
              //節(jié)點的時間增加150毫秒,即下一次收集數(shù)據(jù)從150毫秒開始查乒,收集150毫秒到400毫秒之間發(fā)射的數(shù)據(jù)
              .buffer(250, 150, TimeUnit.MILLISECONDS)     
              .subscribe(new Action1<List<Long>>() {
                  @Override
                  public void call(List<Long> longs) {
                      Log.d("debug", longs.toString());   //打印[0, 1, 2]玛迄,[2蓖议,3]勒虾,[4]
                  }
              });
    
  • window:定期將來自Observable的數(shù)據(jù)分拆成一些Observable窗口修然,然后發(fā)射這些窗口质况,而不是每次發(fā)射一項结榄,類似于buffer臼朗,buffer發(fā)射的是集合依溯,而window發(fā)射的是Observable

      Observable.just(1, 2, 3, 4, 5)
              .window(3, 1)
              .subscribe(new Action1<Observable<Integer>>() {
                  @Override
                  public void call(Observable<Integer> integerObservable) {
                      integerObservable.toList()  //將所有數(shù)據(jù)搜集成一個集合黎炉,便于觀察
                              .subscribe(new Action1<List<Integer>>() {
                                  @Override
                                  public void call(List<Integer> integers) {
                                      Log.d("debug", integers.toString());    //打印[1, 2, 3]慷嗜,[2, 3, 4]庆械,[3, 4, 5]缭乘,[4, 5]堕绩,[5]
                                  }
                              });
                  }
              });
    
      Observable.just(1, 2, 3, 4, 5)
              .window(3)  //相當于window(3,3)
              .subscribe(new Action1<Observable<Integer>>() {
                  @Override
                  public void call(Observable<Integer> integerObservable) {
                      integerObservable.toList()  //將所有數(shù)據(jù)搜集成一個集合奴紧,便于觀察
                              .subscribe(new Action1<List<Integer>>() {
                                  @Override
                                  public void call(List<Integer> integers) {
                                      Log.d("debug", integers.toString());    //打印[1, 2, 3]黍氮,[4, 5]
                                  }
                              });
                  }
              });
      //剩下其余重載方法也與buffer基本一樣沫浆,不重復了
    

篇幅有限件缸,第一部分介紹到這里

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末争剿,一起剝皮案震驚了整個濱河市蚕苇,隨后出現(xiàn)的幾起案子涩笤,更是在濱河造成了極大的恐慌蹬碧,老刑警劉巖炒刁,帶你破解...
    沈念sama閱讀 218,682評論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件罗心,死亡現(xiàn)場離奇詭異渤闷,居然都是意外死亡飒箭,警方通過查閱死者的電腦和手機补憾,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,277評論 3 395
  • 文/潘曉璐 我一進店門盈匾,熙熙樓的掌柜王于貴愁眉苦臉地迎上來削饵,“玉大人窿撬,你說我怎么就攤上這事劈伴□髓担” “怎么了追城?”我有些...
    開封第一講書人閱讀 165,083評論 0 355
  • 文/不壞的土叔 我叫張陵座柱,是天一觀的道長色洞。 經(jīng)常有香客問我火诸,道長惭蹂,這世上最難降的妖魔是什么割粮? 我笑而不...
    開封第一講書人閱讀 58,763評論 1 295
  • 正文 為了忘掉前任盾碗,我火速辦了婚禮,結果婚禮上舀瓢,老公的妹妹穿的比我還像新娘廷雅。我一直安慰自己,他們只是感情好,可當我...
    茶點故事閱讀 67,785評論 6 392
  • 文/花漫 我一把揭開白布航缀。 她就那樣靜靜地躺著,像睡著了一般芥玉。 火紅的嫁衣襯著肌膚如雪蛇摸。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,624評論 1 305
  • 那天灿巧,我揣著相機與錄音赶袄,去河邊找鬼。 笑死抠藕,一個胖子當著我的面吹牛饿肺,可吹牛的內容都是我干的盾似。 我是一名探鬼主播敬辣,決...
    沈念sama閱讀 40,358評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼零院!你這毒婦竟也來了溉跃?” 一聲冷哼從身側響起,我...
    開封第一講書人閱讀 39,261評論 0 276
  • 序言:老撾萬榮一對情侶失蹤门粪,失蹤者是張志新(化名)和其女友劉穎喊积,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體玄妈,經(jīng)...
    沈念sama閱讀 45,722評論 1 315
  • 正文 獨居荒郊野嶺守林人離奇死亡乾吻,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 37,900評論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了拟蜻。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片绎签。...
    茶點故事閱讀 40,030評論 1 350
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖酝锅,靈堂內的尸體忽然破棺而出诡必,到底是詐尸還是另有隱情,我是刑警寧澤搔扁,帶...
    沈念sama閱讀 35,737評論 5 346
  • 正文 年R本政府宣布爸舒,位于F島的核電站,受9級特大地震影響稿蹲,放射性物質發(fā)生泄漏扭勉。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,360評論 3 330
  • 文/蒙蒙 一苛聘、第九天 我趴在偏房一處隱蔽的房頂上張望涂炎。 院中可真熱鬧忠聚,春花似錦、人聲如沸唱捣。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,941評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽震缭。三九已至赂毯,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間拣宰,已是汗流浹背欢瞪。 一陣腳步聲響...
    開封第一講書人閱讀 33,057評論 1 270
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留徐裸,地道東北人遣鼓。 一個月前我還...
    沈念sama閱讀 48,237評論 3 371
  • 正文 我出身青樓,卻偏偏與公主長得像重贺,于是被迫代替她去往敵國和親骑祟。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 44,976評論 2 355

推薦閱讀更多精彩內容

  • 注:只包含標準包中的操作符气笙,用于個人學習及備忘參考博客:http://blog.csdn.net/maplejaw...
    小白要超神閱讀 934評論 0 3
  • 作者: maplejaw本篇只解析標準包中的操作符次企。對于擴展包,由于使用率較低潜圃,如有需求缸棵,請讀者自行查閱文檔。 創(chuàng)...
    maplejaw_閱讀 45,668評論 8 93
  • 創(chuàng)建unfaseCreate(create)創(chuàng)建一個Observable(被觀察者)谭期,當被觀察者(Observer...
    chuwe1閱讀 7,008評論 3 8
  • Spring Cloud為開發(fā)人員提供了快速構建分布式系統(tǒng)中一些常見模式的工具(例如配置管理堵第,服務發(fā)現(xiàn),斷路器隧出,智...
    卡卡羅2017閱讀 134,659評論 18 139
  • 最近看了一個貼子胀瞪,一個白領用月租1600针余,租下了廣州芳村的一套60平米的兩房一廳,花了三萬多塊錢裝修凄诞。當她下了班便...
    素錦之年閱讀 242評論 0 1