簡要:
需求了解:
Rxjava中的普通的 Observable
在觀察者訂閱的時候就會發(fā)射數據,但是有的時候我們想自己控制數據的發(fā)射岩喷,比如在有指定的觀察者或者全部的觀察者訂閱后開始發(fā)射數據呆奕,這個時候我們就要要用到Rxjava中的可連接的Observable來完成這個需求甘畅。
這一節(jié)主要介紹 ConnectableObservable
和它的子類以及它們的操作符:
- ConnectableObservable: 一個可連接的Observable慨蓝,在訂閱后不發(fā)射數據剧防,調用 connect() 方法后開始發(fā)射數據京革。
- Observable.publish():將一個Observable轉換為一個可連接的Observable 奇唤。
- ConnectableObservable.connect():指示一個可連接的Observable開始發(fā)射數據。
- Observable.replay():確保所有的訂閱者看到相同的數據序列匹摇,即使它們在Observable開始發(fā)射數據之后才訂閱咬扇。
- ConnectableObservable.refCount():讓一個可連接的Observable表現得像一個普通的Observable。
- Observable.share():可以直接將Observable轉換為一個具有ConnectableObservable特性的Observable對象廊勃,等價于Observable.publish().refCount()
- Observable.replay():保證所有的觀察者收到相同的數據序列懈贺,即使它們在Observable開始發(fā)射數據之后才訂閱。
1. ConnectableObservable
一個可連接的Observable(ConnectableObservable
)與普通的Observable差不多供搀。不同之處:可連接的Observable在被訂閱時并不開始發(fā)射數據隅居,只有在它的 connect()
被調用時才開始。用這種方法葛虐,你可以等部分或者所有的潛在訂閱者都訂閱了這個Observable之后才開始發(fā)射數據胎源。
注意: ConnectableObservable 的線程切換只能通過
replay
操作符來實現,普通 Observable 的 subscribeOn()
和 observerOn()
在 ConnectableObservable 中不起作用屿脐√樵椋可以通過 replay 操作符的指定線程調度器的方式來進行線程的切換。
Javadoc: ConnectableObservable
2. Publish
將普通的Observable轉換為可連接的Observable(ConnectableObservable
)的诵。
如果要使用可連接的Observable万栅,可以使用Observable的 publish
操作符,來將相應轉換為ConnectableObservable對象西疤。
有一個變體接受一個函數作為參數(publish(Function selector)
)烦粒。這個函數用原始Observable發(fā)射的數據作為參數,產生 一個新的數據作為 ConnectableObservable 給發(fā)射,替換原位置的數據項扰她。實質是在簽名的基礎上添加一個 Map 操作兽掰。
簡單實例:
// 1. publish()
// 創(chuàng)建ConnectableObservable
ConnectableObservable<Integer> connectableObservable = Observable.range(1, 5)
.publish(); // publish操作將Observable轉化為一個可連接的Observable
// 2. publish(Function<Observable<T>, ObservableSource<R>> selector)
// 接受原始Observable的數據,產生一個新的Observable徒役,可以對這個Observable進行函數處理
Observable<String> publish = Observable.range(1, 5)
.publish(new Function<Observable<Integer>, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Observable<Integer> integerObservable) throws Exception {
System.out.println("--> apply(4): " + integerObservable.toString());
Observable<String> map = integerObservable.map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return "[this is map value]: " + integer * integer;
}
});
return map;
}
});
publish.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println("--> accept(4): " + s);
}
});
輸出:
--> apply(4): io.reactivex.subjects.PublishSubject@3fb4f649
--> accept(4): [this is map value]: 1
--> accept(4): [this is map value]: 4
--> accept(4): [this is map value]: 9
--> accept(4): [this is map value]: 16
--> accept(4): [this is map value]: 25
Javadoc: Observable.publish()
Javadoc: Observable.publish(Function<Observable<T>,ObservableSource<R> selector)
3. Connect
讓一個可連接的Observable開始發(fā)射數據給訂閱者孽尽。
- 可連接的Observable (connectableObservable)與普通的Observable差不多,不過它并不會在被訂閱時開始發(fā)射數據忧勿,而是直到使用了 Connect 操作符時才會開始杉女。
- RxJava中 connect 是 ConnectableObservable 接口的一個方法,使用 publish 操作符可以將一個普通的Observable轉換為一個 ConnectableObservable 鸳吸。
- 調用 ConnectableObservable 的
connect
方法會讓它后面的Observable開始給發(fā)射數據給訂閱 者熏挎。connect 方法返回一個 Subscription 對象,可以調用它的 unsubscribe 方法讓Observable停 止發(fā)射數據給觀察者层释。 - 即使沒有任何訂閱者訂閱它婆瓜,你也可以使用 connect 方法讓一個Observable開始發(fā)射數據 (或者開始生成待發(fā)射的數據)。這樣贡羔,你可以將一個"冷"的Observable變?yōu)?熱"的廉白。
實例代碼:
// 1. publish()
// 創(chuàng)建ConnectableObservable
ConnectableObservable<Integer> connectableObservable = Observable.range(1, 5)
.publish(); // publish操作將Observable轉化為一個可連接的Observable
// 創(chuàng)建普通的Observable
Observable<Integer> range = Observable.range(1, 5);
// 1.1 connectableObservable在被訂閱時并不開始發(fā)射數據,只有在它的 connect() 被調用時才開始
connectableObservable.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("--> onSubscribe(1)");
}
@Override
public void onNext(Integer integer) {
System.out.println("--> onNext(1): " + integer);
}
@Override
public void onError(Throwable e) {
System.out.println("--> onError(1): " + e);
}
@Override
public void onComplete() {
System.out.println("--> onComplete(1)");
}
});
// 1.2 connectableObservable在被訂閱時并不開始發(fā)射數據乖寒,只有在它的 connect() 被調用時才開始
connectableObservable.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("--> onSubscribe(2)");
}
@Override
public void onNext(Integer integer) {
System.out.println("--> onNext(2): " + integer);
}
@Override
public void onError(Throwable e) {
System.out.println("--> onError(2): " + e);
}
@Override
public void onComplete() {
System.out.println("--> onComplete(2)");
}
});
// 1.3 普通Observable在被訂閱時就會發(fā)射數據
range.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("--> onSubscribe(3)");
}
@Override
public void onNext(Integer integer) {
System.out.println("--> onNext(3): " + integer);
}
@Override
public void onError(Throwable e) {
System.out.println("--> onError(3): " + e);
}
@Override
public void onComplete() {
System.out.println("--> onComplete(3)");
}
});
System.out.println("----------------start connect------------------");
// 可連接的Observable在被訂閱時并不開始發(fā)射數據猴蹂,只有在它的connect()被調用時才開始發(fā)射數據
// connectableObservable.connect();
// 可選參數Consumer,返回一個Disposable對象楣嘁,可以獲取訂閱狀態(tài)和取消當前的訂閱
connectableObservable.connect(new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
System.out.println("--> connect accept: " + disposable.isDisposed());
// disposable.dispose();
}
});
輸出:
--> onSubscribe(1)
--> onSubscribe(2)
--> onSubscribe(3)
--> onNext(3): 1
--> onNext(3): 2
--> onNext(3): 3
--> onNext(3): 4
--> onNext(3): 5
--> onComplete(3)
----------------start connect------------------
--> connect accept: false
--> onNext(1): 1
--> onNext(2): 1
--> onNext(1): 2
--> onNext(2): 2
--> onNext(1): 3
--> onNext(2): 3
--> onNext(1): 4
--> onNext(2): 4
--> onNext(1): 5
--> onNext(2): 5
--> onComplete(1)
--> onComplete(2)
Javadoc: ConnectableObservable.connect()
Javadoc: ConnectableObservable.connect(Consumer<Disposable> connection)
4. RefCount
RefCount
的作用是讓一個可連接的Observable行為像普通的Observable磅轻。
RefCount 操作符把從一個可連接的Observable連接和斷開的過程自動化了。它操作一個可連接的Observable逐虚,返回一個普通的Observable聋溜。當第一個訂閱者訂閱這個Observable 時, RefCount 連接到下層的可連接Observable叭爱。 RefCount 跟蹤有多少個觀察者訂閱它撮躁,直到最后一個觀察者完成才斷開與下層可連接Observable的連接。
解析: refCount()
把 ConnectableObservable 變?yōu)橐粋€普通的 Observable 但又保持了 ConnectableObservable 的特性买雾。如果出現第一個 Observer把曼,它就會自動調用 connect()
,如果所有的 Observer 全部 dispose
漓穿,那么它也會停止接受上游 Observable 的數據嗤军。
實例代碼:
/**
* refCount(int subscriberCount, long timeout, TimeUnit unit, Scheduler scheduler)
*
* 具有以下可選參數:
* subscriberCount: 指定需要連接到上游的訂閱者數量。注意:當訂閱者滿足此數量后才會處理
* timeout: 所有訂閱用戶退訂后斷開連接前的等待時間
* unit: 時間單位
* scheduler: 斷開連接之前要等待的目標調度器
*/
Observable<Long> refCountObservable = Observable
.intervalRange(1, 5, 0, 1000, TimeUnit.MILLISECONDS)
.publish()
.refCount()
.subscribeOn(Schedulers.newThread()) // 指定訂閱調度在子線程
.observeOn(Schedulers.newThread()); // 指定觀察者調度在子線程
// .refCount(1, 500, TimeUnit.MILLISECONDS, Schedulers.newThread());
// 第1個訂閱者
refCountObservable.subscribe(new Observer<Long>() {
private Disposable disposable;
private int buff = 0;
@Override
public void onSubscribe(Disposable d) {
System.out.println("----> onSubscribe(1): ");
disposable = d;
}
@Override
public void onNext(Long value) {
if (buff == 3) {
disposable.dispose(); // 解除當前的訂閱
System.out.println("----> Subscribe(1) is dispose! ");
} else {
System.out.println("--> onNext(1): " + value);
}
buff++;
}
@Override
public void onError(Throwable e) {
System.out.println("--> onError(1): " + e);
}
@Override
public void onComplete() {
System.out.println("--> onComplete(1): ");
}
});
// 第2個訂閱者
refCountObservable.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
System.out.println("----> onSubscribe(2): ");
}
})
.delaySubscription(2, TimeUnit.SECONDS) // 延遲2秒后訂閱
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long value) throws Exception {
System.out.println("--> accept(2): " + value);
}
});
System.in.read();
輸出:
----> onSubscribe(1):
--> onNext(1): 1
--> onNext(1): 2
--> onNext(1): 3
----> onSubscribe(2):
----> Subscribe(1) is dispose!
--> accept(2): 4
--> accept(2): 5
Javadoc: ConnectableObservable.refCount(subscriberCount, timeout, unit, scheduler)
5. Share
一個普通的Observable可以通過 publish
來將其轉換為ConnectableObservable晃危,然后可以調用其 refCount()
的方法將其轉換為一個具有 ConnectableObservable 特性的Observable叙赚。
其實Observable中還有一個操作方法,可以直接完成此步驟的操作,這就是 Observable.share()
操作符纠俭。
可以來看一下share操作符的源碼:
public final Observable<T> share() {
return publish().refCount();
}
通過源碼可以知道沿量,share() 方法可以直接將Observable轉換為一個具有ConnectableObservable特性的Observable對象,即Observable.publish().refCount() == Observable.share()
冤荆。
實例代碼:
// share()
// 通過share() 同時應用 publish 和 refCount 操作
Observable<Long> share = Observable
.intervalRange(1, 5, 0, 500, TimeUnit.MILLISECONDS)
// .publish().refCount()
.share() // 等價于上面的操作
.subscribeOn(Schedulers.newThread()) // 指定訂閱調度在子線程
.observeOn(Schedulers.newThread()); // 指定觀察者調度在子線程
// 1. 第一個訂閱者
share.subscribe(new Observer<Long>() {
private Disposable disposable;
private int buff = 0;
@Override
public void onSubscribe(Disposable d) {
System.out.println("----> onSubscribe(1): ");
disposable = d;
}
@Override
public void onNext(Long value) {
if (buff == 3) {
disposable.dispose(); // 解除當前的訂閱
System.out.println("----> Subscribe(1) is dispose! ");
} else {
System.out.println("--> onNext(1): " + value);
}
buff++;
}
@Override
public void onError(Throwable e) {
System.out.println("--> onError(1): " + e);
}
@Override
public void onComplete() {
System.out.println("--> onComplete(1): ");
}
});
// 2. 第二個訂閱者
share.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
System.out.println("----> onSubscribe(2): ");
}
})
.delaySubscription(1, TimeUnit.SECONDS) // 延遲1秒后訂閱
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long value) throws Exception {
System.out.println("--> accept(2): " + value);
}
});
System.in.read();
輸出:
----> onSubscribe(1):
--> onNext(1): 1
--> onNext(1): 2
--> onNext(1): 3
----> onSubscribe(2):
----> Subscribe(1) is dispose!
--> accept(2): 4
--> accept(2): 5
Javadoc: Observable.share()
6. Replay
保證所有的觀察者收到相同的數據序列,即使它們在Observable開始發(fā)射數據之后才訂閱权纤。
如果在將一個Observable轉換為可連接的Observable之前對它使用 Replay 操作符钓简,產生的這個可連接Observable將總是發(fā)射完整的數據序列給任何未來的觀察者,可以緩存發(fā)射過的數據汹想,即使那些觀察者在這 個Observable開始給其它觀察者發(fā)射數據之后才訂閱外邓。
注意: replay操作符生成的 connectableObservable
,如果沒有對緩存進行限定古掏,那么無論觀察者何時去訂閱损话,都可以收到 Observable 完整的數據序列項。
replay
操作符最好根據實際情況限定緩存的大小槽唾,否則數據發(fā)射過快或者較多時會占用很高的內存丧枪。replay
操作符有可以接受不同參數的變體,有的可以指定 replay
的最大緩存數量或者指定緩存時間庞萍,還可以指定調度器拧烦。
- replay不僅可以緩存Observable的所有數據序列,也可以進行限定緩存大小的操作钝计。
- 還有有一種 replay 返回一個普通的Observable恋博。它可以接受一個變換函數為參數,這個函數接受原始Observable發(fā)射的數據項為參數私恬,返回結果Observable要發(fā)射的一項數據债沮。因此,這個操作符其實是 replay 變換之后的數據項本鸣。
實例代碼:
// 創(chuàng)建發(fā)射數據的Observable
Observable<Long> observable = Observable
.intervalRange(1,
10,
1,
500,
TimeUnit.MILLISECONDS,
Schedulers.newThread());
/**
* 1.1 replay(Scheduler scheduler)
* 可選參數:scheduler, 指定線程調度器
* 接受原始數據的所有數據
*/
// ConnectableObservable<Long> replay1 = observable.replay();
/**
* 1.2 replay(int bufferSize, Scheduler scheduler)
* 可選參數:scheduler, 指定線程調度器
* 只緩存 bufferSize 個最近的原始數據
*/
// ConnectableObservable<Long> replay1 = observable.replay(1); // 設置緩存大小為1, 從原數據中緩存最近的1個數據
/**
* 1.3 replay(int bufferSize, long time, TimeUnit unit, Scheduler scheduler)
* 可選參數:scheduler, 指定線程調度器
* 在訂閱前指定的時間段內緩存 bufferSize 個數據, 注意計時開始是原始數據發(fā)射第1個數據項之后開始
*/
// ConnectableObservable<Long> replay1 = observable.replay(5, 1000, TimeUnit.MILLISECONDS);
/**
* 1.4 replay(long time, TimeUnit unit, Scheduler scheduler)
* 可選參數:scheduler, 指定線程調度器
* 在訂閱前指定的時間段內緩存數據, 注意計時開始是原始數據發(fā)射第1個數據項之后開始
*/
ConnectableObservable<Long> replay1 = observable.replay( 1000, TimeUnit.MILLISECONDS);
// 進行 connect 操作
replay1.connect();
// 第一個觀察者
replay1.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
System.out.println("----> onSubScribe(1-1)");
}
}).subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
System.out.println("--> accept(1-1): " + aLong);
}
});
// 第二個觀察者(延遲1秒后訂閱)
replay1.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
System.out.println("----> onSubScribe(1-2)");
}
}).delaySubscription(1, TimeUnit.SECONDS)
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
System.out.println("--> accept(1-2): " + aLong);
}
});
// 第三個觀察者(延遲2秒后訂閱)
replay1.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
System.out.println("----> onSubScribe(1-3)");
}
}).delaySubscription(2, TimeUnit.SECONDS)
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
System.out.println("--> accept(1-3): " + aLong);
}
});
System.in.read();
System.out.println("----------------------------------------------------------");
/**
* 2. replay(Function<Observable<T>, ObservableSource<R>> selector,
* int bufferSize, 可選參數: 指定從元數據序列數據的緩存大小
* long time, TimeUnit unit, 可選參數: 指定緩存指定時間段的數據序列
* Scheduler scheduler) 可選參數: 指定線程調度器
*
* 接受一個變換函數 function 為參數疫衩,這個函數接受原始Observable發(fā)射的數據項為參數
* 通過指定的函數處理后,返回一個處理后的Observable
*/
Observable<String> replayObservable = observable.replay(new Function<Observable<Long>, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Observable<Long> longObservable) throws Exception {
// 對原始數據進行處理
Observable<String> map = longObservable.map(new Function<Long, String>() {
@Override
public String apply(Long aLong) throws Exception {
return aLong + "2 = " + aLong * aLong; // 將原始數據進行平方處理永高,并轉換為字符串數據類型
}
});
return map;
}
}, 1, Schedulers.newThread());
replayObservable.subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.newThread());
// 第一個觀察者
replayObservable.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
System.out.println("--> onSubScribe(2-1)");
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println("--> accept(2-1): " + s);
}
});
// 訂閱第二個觀察者 (延遲2秒后訂閱)
replayObservable.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
System.out.println("--> onSubScribe(2-2)");
}
}).delaySubscription(2, TimeUnit.SECONDS)
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println("--> accept(2-2): " + s);
}
});
System.in.read();
輸出:
----> onSubScribe(1-1)
--> accept(1-1): 1
--> accept(1-1): 2
--> accept(1-1): 3
----> onSubScribe(1-2)
--> accept(1-2): 2
--> accept(1-2): 3
--> accept(1-1): 4
--> accept(1-2): 4
--> accept(1-1): 5
--> accept(1-2): 5
----> onSubScribe(1-3)
--> accept(1-3): 4
--> accept(1-3): 5
--> accept(1-1): 6
--> accept(1-2): 6
--> accept(1-3): 6
--> accept(1-1): 7
--> accept(1-2): 7
--> accept(1-3): 7
--> accept(1-1): 8
--> accept(1-2): 8
--> accept(1-3): 8
--> accept(1-1): 9
--> accept(1-2): 9
--> accept(1-3): 9
--> accept(1-1): 10
--> accept(1-2): 10
--> accept(1-3): 10
----------------------------------------------------------
--> onSubScribe(2-1)
--> accept(2-1): 12 = 1
--> accept(2-1): 22 = 4
--> accept(2-1): 32 = 9
--> accept(2-1): 42 = 16
--> onSubScribe(2-2)
--> accept(2-1): 52 = 25
--> accept(2-2): 12 = 1
--> accept(2-2): 22 = 4
--> accept(2-1): 62 = 36
--> accept(2-2): 32 = 9
--> accept(2-1): 72 = 49
--> accept(2-1): 82 = 64
--> accept(2-2): 42 = 16
--> accept(2-2): 52 = 25
--> accept(2-1): 92 = 81
--> accept(2-2): 62 = 36
--> accept(2-1): 102 = 100
--> accept(2-2): 72 = 49
--> accept(2-2): 82 = 64
--> accept(2-2): 92 = 81
--> accept(2-2): 102 = 100
Javadoc: Observable.replay(int bufferSize, long time, TimeUnit unit, Scheduler scheduler)
Javadoc: Observable.replay(Function<Observable<T>,ObservableSource<R>> selector, int bufferSize, long time, TimeUnit unit, Scheduler scheduler)
小結
Rxjava 的連接操作符主要的核心是 ConnectableObservable
這個可連接的Observable對象的概念隧土。可連接的 Observable 在被訂閱時并不會直接發(fā)射數據命爬,只有在他的 connect() 方法被調用時才會發(fā)射數據曹傀。便于更好的對數據的發(fā)射行為的控制,同時也對數據有很好的操作能力饲宛,可以緩存數據皆愉,指定緩存大小,時間片段緩存等。
提示:以上使用的Rxjava2版本: 2.2.12
Rx介紹與講解及完整目錄參考:Rxjava2 介紹與詳解實例
實例代碼: