RxJava Github地址:https://github.com/ReactiveX/RxJava
首先引入庫文件(當前最新版本為2.1.4)
dependencies {
...
compile 'io.reactivex.rxjava2:rxjava:2.1.4'
}
先來一個簡單例子
例1
//創(chuàng)建被觀察者
Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("1");
subscriber.onNext("2");
subscriber.onNext("3");
subscriber.onCompleted();
}
});
//創(chuàng)建觀察者
Observer observer = new Observer<String>() {
@Override
public void onCompleted() {
System.out.println("onCompleted");
}
@Override
public void onError(Throwable e) {
System.out.println(e);
}
@Override
public void onNext(String s) {
System.out.println(s);
}
};
//訂閱
observable.subscribe(observer);
結果:
1
2
3
onCompleted
(1)我們先來看看第一步掌挚,創(chuàng)建被觀察者:
Observable.create方法(2.0后已經(jīng)Deprecated)
@Deprecated
public static <T> Observable<T> create(OnSubscribe<T> f) {
return new Observable<T>(RxJavaHooks.onCreate(f));
}
OnSubscribe是接口享甸, 繼承了Action1
注意這里的參數(shù)從轉換為了Subscriber<? super T>
public interface OnSubscribe<T> extends Action1<Subscriber<? super T>> {
}
Action1接口,只有一個call方法 有一個參數(shù)知举,無返回值
public interface Action1<T> extends Action {
void call(T t);
}
這就與以上例1中的創(chuàng)建觀察者相對應了补履, Observable.create()傳入泛型為String的onSubscribe, 并實現(xiàn)接口call(T t),這里的T也變?yōu)榱薙ubscriber<? super String>類型
Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
}
});
(2)我們再來看看Observer
public interface Observer<T> {
//通知觀察者,被觀察者完成了基于推送的通知添坊。onCompleted()與onError()設計上只會有一個被調用
void onCompleted();
//通知觀察者,出錯了箫锤,一但出錯贬蛙,被觀察者將不再繼續(xù)調用接下來的步驟,如onNext, onCompleted
void onError(Throwable e);
//向觀察者提供新的觀察事件
void onNext(T t);
}
(3)最后實現(xiàn)訂閱關系谚攒,以原代碼角度來分析阳准,如何將Observable與Observer相關聯(lián)
Observable.create()時,新建了一個Observable實例
@Deprecated
public static <T> Observable<T> create(OnSubscribe<T> f) {
return new Observable<T>(RxJavaHooks.onCreate(f));
}
構造方法中只是將onSubscribe對象保存起來
protected Observable(OnSubscribe<T> f) {
this.onSubscribe = f;
}
我們再來看看Observable.subscribe()方法馏臭, 這里判斷observer是不是Subscriber的實例野蝇,不是的話通過新建ObserverSubscriber類來轉換成為Subscriber,所以最終都是調用subscribe(Subscriber)
Subscriber 與Observer類似括儒, 實現(xiàn)了Observer, 同時也實現(xiàn)了Subscription, 多了兩個方法:unsubscribe(), isUnsubscribed()
public final Subscription subscribe(final Observer<? super T> observer) {
if (observer instanceof Subscriber) {
return subscribe((Subscriber<? super T>)observer);
}
if (observer == null) {
throw new NullPointerException("observer is null");
}
return subscribe(new ObserverSubscriber<T>(observer));
}
public final class ObserverSubscriber<T> extends Subscriber<T> {
final Observer<? super T> observer;
public ObserverSubscriber(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
observer.onNext(t);
}
@Override
public void onError(Throwable e) {
observer.onError(e);
}
@Override
public void onCompleted() {
observer.onCompleted();
}
}
最后調用 了Observable.subscribe(subscriber, this)绕沈,注意這里關鍵的this
public final Subscription subscribe(Subscriber<? super T> subscriber) {
return Observable.subscribe(subscriber, this);
}
static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
//......
// Subscriber類中的onStart()為空實現(xiàn), 在Observer所有方法之前被調用帮寻。
subscriber.onStart();
//......
try {
//這里的observable實際就是當前對象,在這里調用了當前對象中onSubscribe的call.并將例一中的observer 以Subscriber封裝后的實例作為call的參數(shù)返回乍狐。
RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
return RxJavaHooks.onObservableReturn(subscriber);
} catch (Throwable e) {
//......
subscriber.onError(RxJavaHooks.onObservableError(e));
//最后返回未取消訂閱的Subscription對象
return Subscriptions.unsubscribed();
}
}
簡單來看一看RxJavaHooks中的onObservableStart方法
public static <T> Observable.OnSubscribe<T> onObservableStart(Observable<T> instance, Observable.OnSubscribe<T> onSubscribe) {
Func2<Observable, Observable.OnSubscribe, Observable.OnSubscribe> f = onObservableStart;
if (f != null) {
//經(jīng)過一系列調用, 最終還是返回的onSubscribe本身固逗,參考下面的分析
return f.call(instance, onSubscribe);
}
//如果onObservableStart為null便將傳進來的onSubscribe原封返回
return onSubscribe;
}
來簡單分析一下f.call();
static void init() {
//......
onObservableStart = new Func2<Observable, Observable.OnSubscribe, Observable.OnSubscribe>() {
@Override
public Observable.OnSubscribe call(Observable t1, Observable.OnSubscribe t2) {
return RxJavaPlugins.getInstance().getObservableExecutionHook().onSubscribeStart(t1, t2);
}
};
//......
}
@Deprecated
public <T> OnSubscribe<T> onSubscribeStart(Observable<? extends T> observableInstance, final OnSubscribe<T> onSubscribe) {
return onSubscribe;//將傳進來的onSubscribe原封返回
}
至此浅蚪,例1所有代碼便分析完成。
例1步驟可以簡化成以下
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("1");
subscriber.onNext("2");
subscriber.onNext("3");
subscriber.onCompleted();
}
}).subscribe(new Observer<String>() {
@Override
public void onCompleted() {
System.out.println("onCompleted");
}
@Override
public void onError(Throwable e) {
System.out.println(e);
}
@Override
public void onNext(String s) {
System.out.println(s);
}
});
或lambda表達式:
Observable.create((Observable.OnSubscribe<String>) subscriber -> {
subscriber.onNext("1");
subscriber.onNext("2");
subscriber.onNext("3");
subscriber.onCompleted();
}).subscribe(new Observer<String>() {
@Override
public void onCompleted() {
System.out.println("onCompleted");
}
@Override
public void onError(Throwable e) {
System.out.println(e);
}
@Override
public void onNext(String s) {
System.out.println(s);
}
});
操作符
這和java8中的Stream很相似抒蚜,可參考我的另一篇文章《函數(shù)式編程 Lambda及Stream》
下面舉幾個常用的例子
from
接收數(shù)組掘鄙,實現(xiàn)Iterable的子類對象耘戚,及Future對象
String[] array = new String[]{"a", "b", "c"};
Observable.from(Arrays.asList(array))
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
System.out.println("onCompleted");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
System.out.println(s);
}
});
結果:
a
b
c
onCompleted
just
看原碼也是調用的 from((T[])new Object[] { })
Observable.just("1", "2", "3")
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
System.out.println("onCompleted");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
System.out.println(s);
}
});
結果:
1
2
3
onCompleted
defer 延遲
- 即用普通的創(chuàng)建Observable方法
private static String string = "0";
public static void main(String[] args) {
Observable observable = Observable.just(string);
string = "1";
observable.subscribe(System.out::println);
}
結果:0
- 使用defer創(chuàng)建Observable
private static String string = "0";
public static void main(String[] args) {
Observable observable = Observable.defer(() -> Observable.just(string));
string = "1";
observable.subscribe(System.out::println);
}
結果:1
封裝時可先使用defer,后面調用時便會根據(jù)不同的參數(shù)創(chuàng)建相應的新的Observable
map 變換
接收一個Func1類型的參數(shù)(一個參數(shù)嗡髓,一個返回值 )
String[] array = new String[]{"a", "b", "c"};
Observable.from(array)
.map(String::toUpperCase)
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
System.out.println("onCompleted");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
System.out.println(s);
}
});
結果:
A
B
C
onCompleted
flatMap變換
以原有Observable為基礎,將發(fā)射的每一條數(shù)據(jù)再分成若干個Observable,最后合并收津,這就是flat的意思饿这,壓平
注意這里與map不同的是,返回值是一個Observable對象撞秋。
String[] array = new String[]{"aa", "bbb", "cc"};
Observable.from(array)
.map(String::toUpperCase)
.flatMap(s -> {
List<String> singleLetter = new ArrayList<String>();
for (int i = 0; i < s.length(); i++) {
singleLetter.add(String.valueOf(s.charAt(i)));
}
return Observable.from(singleLetter);
})
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
System.out.println("onCompleted");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
System.out.println(s);
}
});
結果:
A
A
B
B
B
C
C
onCompleted
scan 累加器
由初始值與累加器構成
-
接收一個Func2<T, T, T> 類型參數(shù)
String[] array = new String[]{"aa", "bbb", "cc"};
Observable.from(array)
.map(String::toUpperCase)
.flatMap(s -> {
List<String> singleLetter = new ArrayList<String>();
for (int i = 0; i < s.length(); i++) {
singleLetter.add(String.valueOf(s.charAt(i)));
}
return Observable.from(singleLetter);
})
.scan(((s1, s2) -> s1 + s2))
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
System.out.println("onCompleted");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
System.out.println(s);
}
});
結果:
A
AA
AAB
AABB
AABBB
AABBBC
AABBBCC
onCompleted
在未賦初值的情況下长捧,是不調用該方法的(看下圖,scan方法被調用前吻贿,Console面板中已經(jīng)顯示出發(fā)射的第一條數(shù)據(jù)串结。)
發(fā)射的第一條數(shù)據(jù)會作為初始值,發(fā)射第二條數(shù)據(jù)時調用該方法并將初始值傳給s1, 將第二條數(shù)據(jù)傳給s2
并將結果作為下一條數(shù)據(jù)的初始值。
-
接收(R initialValue, Func2<R, ? super T, R> accumulator) 兩個參數(shù)
String[] array = new String[]{"aa", "bbb", "cc"};
Observable.from(array)
.map(String::toUpperCase)
.flatMap(s -> {
List<String> singleLetter = new ArrayList<String>();
for (int i = 0; i < s.length(); i++) {
singleLetter.add(String.valueOf(s.charAt(i)));
}
return Observable.from(singleLetter);
})
.scan("-", ((s1, s2) -> s1 + s2))
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
System.out.println("onCompleted");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
System.out.println(s);
}
});
結果:
- //在scan被第一次調用前肌割,已經(jīng)被打印出來
-A
-AA
-AAB
-AABB
-AABBB
-AABBBC
-AABBBCC
onCompleted
filter 過濾出符合條件的
參數(shù)為Func1<? super T, Boolean>卧蜓, 返回布爾值
Observable.from(array)
.filter(s -> s.length()<=2)
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
System.out.println("onCompleted");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
System.out.println(s);
}
});
結果:
aa
cc
onCompleted
take
String[] array = new String[]{"aa", "bbb", "cc"};
Observable.from(array)
.map(String::toUpperCase)
.flatMap(s -> {
List<String> singleLetter = new ArrayList<String>();
for (int i = 0; i < s.length(); i++) {
singleLetter.add(String.valueOf(s.charAt(i)));
}
return Observable.from(singleLetter);
})
.take(5)//取出5個數(shù)據(jù)
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
System.out.println("onCompleted");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
System.out.println(s);
}
});
結果:
A
A
B
B
B
onCompleted
takeFirst
String[] array = new String[]{"aa", "bbb", "cccc", "dddd"};
Observable.from(array)
.map(String::toUpperCase)
.takeFirst(s -> s.length()==4)
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
System.out.println("onCompleted");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
System.out.println(s);
}
});
結果:
CCCC
onCompleted
takeLast
Observable.from(array)
.map(String::toUpperCase)
.flatMap(s -> {
List<String> singleLetter = new ArrayList<String>();
for (int i = 0; i < s.length(); i++) {
singleLetter.add(String.valueOf(s.charAt(i)));
}
return Observable.from(singleLetter);
})
.takeLast(2) // 取出最后2個數(shù)據(jù)
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
System.out.println("onCompleted");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
System.out.println(s);
}
});
結果:
C
C
onCompleted
take與takeLast以時間為限制取數(shù)據(jù)
public final Observable<T> take(long time, TimeUnit unit) //只取time時間內發(fā)射的數(shù)據(jù)
public final Observable<T> takeLast(long time, TimeUnit unit) //只取time時間結束前的數(shù)據(jù)
first
- public final Observable<T> first() {
String[] array = new String[]{"aa", "bbb", "cccc", "dddd"};
Observable.from(array)
.map(String::toUpperCase)
.first()
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
System.out.println("onCompleted");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
System.out.println(s);
}
});
結果:
AA
onCompleted
- public final Observable<T> first(Func1<? super T, Boolean> predicate) {
String[] array = new String[]{"aa", "bbb", "cccc", "dddd"};
Observable.from(array)
.map(String::toUpperCase)
.first(s -> s.length()==4)
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
System.out.println("onCompleted");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
System.out.println(s);
}
});
結果:
CCCC
onCompleted
- public final Observable<T> firstOrDefault(T defaultValue) {
String[] array = new String[]{};
Observable.from(array)
.map(String::toUpperCase)
.firstOrDefault("null")
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
System.out.println("onCompleted");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
System.out.println(s);
}
});
結果:
null
onCompleted
- public final Observable<T> firstOrDefault(T defaultValue, Func1<? super T, Boolean> predicate) {
String[] array = new String[]{"aa", "bbb", "cccc", "dddd"};
Observable.from(array)
.map(String::toUpperCase)
.firstOrDefault("null", s -> s.equals("fuck"))
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
System.out.println("onCompleted");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
System.out.println(s);
}
});
結果:
null
onCompleted
skip
String[] array = new String[]{"aa", "bbb", "cccc", "dddd"};
Observable.from(array)
.map(String::toUpperCase)
.skip(2)
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
System.out.println("onCompleted");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
System.out.println(s);
}
});
結果:
CCCC
DDDD
onCompleted
skipLast
String[] array = new String[]{"aa", "bbb", "cccc", "dddd"};
Observable.from(array)
.map(String::toUpperCase)
.skipLast(2)
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
System.out.println("onCompleted");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
System.out.println(s);
}
});
結果:
AA
BBB
onCompleted
skipWhile
String[] array = new String[]{"aa", "bbb", "cccc", "dddd"};
Observable.from(array)
.map(String::toUpperCase)
.skipWhile(s -> s.length()<4)
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
System.out.println("onCompleted");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
System.out.println(s);
}
});
結果:
CCCC
DDDD
onCompleted
distinct 去重
String[] array = new String[]{"aa", "aa", "cccc", "dddd"};
Observable.from(array)
.map(String::toUpperCase)
.distinct()
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
System.out.println("onCompleted");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
System.out.println(s);
}
});
結果:
AA
CCCC
DDDD
onCompleted
elementAt
String[] array = new String[]{"a", "bb", "ccc", "dddd"};
Observable.from(array)
.map(String::toUpperCase)
.elementAt(2)
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
System.out.println("onCompleted");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
System.out.println(s);
}
});
結果:
CCC
onCompleted
merge 合并數(shù)據(jù)源
Observable<String> justA = Observable.just("a1", "a2", "a3");
Observable<String> justB = Observable.just("b1", "b2", "b3");
Observable.merge(justB,justA)
.map(String::toUpperCase)
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
System.out.println("onCompleted");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
System.out.println(s);
}
});
結果:
B1
B2
B3
A1
A2
A3
onCompleted
zip 合并數(shù)據(jù)
Observable<String> justA = Observable.just("a1", "a2", "a3");
Observable<String> justB = Observable.just("b1", "b2", "b3", "b4");
Observable.zip(justA,justB,(s1, s2) -> s1+s2)
.map(String::toUpperCase)
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
System.out.println("onCompleted");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
System.out.println(s);
}
});
結果:
A1B1
A2B2
A3B3
onCompleted
startWith
Observable<String> justA = Observable.just("a1", "a2", "a3");
Observable<String> justB = Observable.just("b1", "b2", "b3", "b4");
Observable.zip(justA,justB,(s1, s2) -> s1+s2)
.map(String::toUpperCase)
.startWith("fuck")
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
System.out.println("onCompleted");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
System.out.println(s);
}
});
結果:
fuck
A1B1
A2B2
A3B3
onCompleted
empty
創(chuàng)建一個不發(fā)射任何數(shù)據(jù)但是正常終止的Observable
Observable.empty().subscribe(new Subscriber<Object>() {
@Override
public void onCompleted() {
System.out.println("onCompleted");
}
@Override
public void onError(Throwable e) {
System.out.println(e);
}
@Override
public void onNext(Object o) {
System.out.println(o);
}
});
結果:
onCompleted
簡單分析一下原碼:
public static <T> Observable<T> empty() {
return EmptyObservableHolder.instance();
}
public enum EmptyObservableHolder implements OnSubscribe<Object> {
INSTANCE
;
static final Observable<Object> EMPTY = Observable.unsafeCreate(INSTANCE);
public static <T> Observable<T> instance() {
return (Observable<T>)EMPTY;
}
@Override
public void call(Subscriber<? super Object> child) {
child.onCompleted();//只調用了onCompleted()方法
}
}
never
Observable.never().subscribe(new Subscriber<Object>() {
@Override
public void onCompleted() {
System.out.println("onCompleted");
}
@Override
public void onError(Throwable e) {
System.out.println(e);
}
@Override
public void onNext(Object o) {
System.out.println(o);
}
});
結果什么都沒有
原碼分析:
public static <T> Observable<T> never() {
return NeverObservableHolder.instance();
}
public enum NeverObservableHolder implements OnSubscribe<Object> {
INSTANCE
;
static final Observable<Object> NEVER = Observable.unsafeCreate(INSTANCE);
@SuppressWarnings("unchecked")
public static <T> Observable<T> instance() {
return (Observable<T>)NEVER;
}
@Override
public void call(Subscriber<? super Object> child) {
// deliberately no op //什么也沒做
}
}
range
Observable.range(10, 4).subscribe(System.out::println);
結果:
10
11
12
13
interval
Observable.interval(1, TimeUnit.SECONDS, Schedulers.trampoline()).subscribe(System.out::println);
結果:從0開始每一秒按順序輸出
注意,這里需要設置Schedulers.trampoline(),在當前線程將任務插入隊列中把敞。
timer
Observable.timer(3, TimeUnit.SECONDS, Schedulers.trampoline()).subscribe(System.out::println);
結果:三秒后輸出0后結束
delay
String[] arrays = new String[]{"1", "2", "3", "4"};
Observable.from(arrays).delay(1, TimeUnit.SECONDS, Schedulers.trampoline()).subscribe(System.out::println);
結果:運行后弥奸,一秒后輸出1, 每一少輸出下一個奋早,輸出4再一秒后結束
contains
String[] arrays = new String[]{"1", "2", "3", "4"};
Observable.from(arrays).contains("1").subscribe(System.out::println);
結果:true
all
Integer[] ints = new Integer[]{1, 2, 3, 4};
Observable.from(ints).all(integer -> integer < 3).subscribe(System.out::println);
結果:false
isEmpty
String[] arrays = new String[]{};
Observable.from(arrays).isEmpty().subscribe(System.out::println);
結果:true
exists
Integer[] ints = new Integer[]{1, 2, 3, 4};
Observable.from(ints).exists(integer -> integer > 3).subscribe(System.out::println);
結果:true
count
Integer[] ints = new Integer[]{1, 2, 3, 4, 1};
Observable.from(ints).count().subscribe(System.out::println);
結果:5
reduce 與scan相似盛霎,不同的是直接輸出結果。
Integer[] ints = new Integer[]{1, 2, 3, 4};
Observable.from(ints).reduce((integer, integer2) -> integer + integer2).subscribe(System.out::println);
Observable.from(ints).reduce(10, (integer, integer2) -> integer + integer2).subscribe(System.out::println);
結果:
10
20
collect
與reduce相似, 但collect是用來將源Observable發(fā)射的數(shù)據(jù)給收集到一個數(shù)據(jù)結構里面
Integer[] ints = new Integer[]{1, 2, 3, 4};
Observable.from(ints)
// .collect((Func0<ArrayList<Integer>>) ArrayList::new, (list, integer) -> list.add(integer))
.collect((Func0<ArrayList<Integer>>) ArrayList::new, ArrayList::add)
.subscribe(System.out::println);
buffer
Observable.interval(1, TimeUnit.SECONDS, Schedulers.trampoline())
.buffer(3)
.subscribe(System.out::println);
結果:每三個放入緩存再輸出
[0, 1, 2]
[3, 4, 5]
[6, 7, 8]
[9, 10, 11]
[12, 13, 14]
[15, 16, 17]
耽装。愤炸。。
Observable.interval(1, TimeUnit.SECONDS, Schedulers.trampoline())
.buffer(3, 2) //(多少為一組剂邮, 生成新的buffer的間隔)
.subscribe(System.out::println);
結果:
[0, 1, 2]
[2, 3, 4]
[4, 5, 6]
[6, 7, 8]
[8, 9, 10]
[10, 11, 12]
摇幻。。挥萌。
RxJava1.0 與RxJava2.0區(qū)別:
- Rxjava1.0資源庫都在rx包下,而RxJava2.0都在io.reactivex包下
import rx.Observable;
import rx.Subscriber;
---------------------
import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.annotations.NonNull;