目錄
【RxJava】- 創(chuàng)建操作符源碼分析
【RxJava】- 變換操作符源碼分析
【RxJava】- 過(guò)濾操作符源碼分析
【RxJava】- 結(jié)合操作符源碼分析
Connect
讓一個(gè)可連接的Observable開(kāi)始發(fā)射數(shù)據(jù)給訂閱者漓骚,可連接的Observable (connectable Observable)與普通的Observable差不多,不過(guò)它并不會(huì)在被訂閱時(shí)開(kāi)始發(fā)射數(shù)據(jù)预麸,而是直到使用了Connect操作符時(shí)才會(huì)開(kāi)始。用這個(gè)方法彭羹,你可以等待所有的觀察者都訂閱了Observable之后再開(kāi)始發(fā)射數(shù)據(jù)奕筐。
RxJava中connect是ConnectableObservable接口的一個(gè)方法工扎,使用publish操作符可以將一個(gè)普通的Observable轉(zhuǎn)換為一個(gè)ConnectableObservable徘钥。
Observable.create(null).replay().connect();
實(shí)現(xiàn)類
ConnectConsumer
自己查看,很簡(jiǎn)單肢娘,就幾行代碼呈础。
調(diào)用 connect(cc)方法,返回disposable實(shí)例橱健。
Replay
保證所有的觀察者收到相同的數(shù)據(jù)序列而钞,即使它們?cè)贠bservable開(kāi)始發(fā)射數(shù)據(jù)之后才訂閱.
可連接的Observable (connectable Observable)與普通的Observable差不多,不過(guò)它并不會(huì)在被訂閱時(shí)開(kāi)始發(fā)射數(shù)據(jù)拘荡,而是直到使用了Connect操作符時(shí)才會(huì)開(kāi)始臼节。用這種方法,你可以在任何時(shí)候讓一個(gè)Observable開(kāi)始發(fā)射數(shù)據(jù)珊皿。
實(shí)現(xiàn)類
ObservableReplay
Replay還有其它實(shí)現(xiàn)類网缝,請(qǐng)自己查看。這里講解replay()沒(méi)有傳入?yún)?shù)的實(shí)現(xiàn)蟋定》垭看一下創(chuàng)建過(guò)程,最終創(chuàng)建ObservableReplay實(shí)例傳入的參數(shù)如下:
- source
被觀察者 - bufferFactory
UnBoundedFactory實(shí)例 - curr
final AtomicReference<ReplayObserver<T>> curr = new AtomicReference<>(); - onSubscribe
ObservableSource<T> onSubscribe = new ReplaySource<>(curr, bufferFactory);
首先調(diào)用connect(Consumer<? super Disposable> connection)方法
返回初始化容量為16的UnboundedReplayBuffer數(shù)組
ReplayBuffer<T> buf = bufferFactory.call();
保存新創(chuàng)建的ReplayObserver對(duì)象
if (!current.compareAndSet(ps, u)){continue;}
調(diào)用accept方法
connection.accept(ps);
這里調(diào)用的是上面Connect中的ConnectConsumer類中的accept方法驶兜。
public void accept(Disposable t) {
this.disposable = t;
}
調(diào)用被觀察者
if (doConnect) {
source.subscribe(ps);
}
調(diào)用onNext(T t)
public void onNext(T t) {
if (!done) {
// 保存到數(shù)據(jù)
buffer.next(t);
replay();
}
}
核心邏輯就在replay中扼仲,自己查看。
Publish
將普通的Observable轉(zhuǎn)換為可連接的Observable
ConnectableObservable observable = Observable.create((ObservableOnSubscribe<Integer>) emitter -> {
ObservableEmitter<Integer> observableEmitter = emitter.serialize();
try {
// 序列化
if (!emitter.isDisposed()) {
for (int i = 1; i < 3; i++) {
System.out.println("create operate--->emitter: "+i);
if (1==i){
// ExceptionHelper.TERMINATED
// observableEmitter.onError(new Throwable("error"));
}else {
observableEmitter.onNext(i);
}
}
observableEmitter.onComplete();
}
} catch (Exception e) {
observableEmitter.onError(e);
}
}).publish();
observable.subscribe(new Observer<Integer>() {...});
observable.connect();
observable.connect()不調(diào)用促王,是不會(huì)發(fā)射數(shù)據(jù)的犀盟。
實(shí)現(xiàn)類
ObservablePublish
看subscribeActual方法
protected void subscribeActual(Observer<? super T> observer) {
...
InnerDisposable<T> inner = new InnerDisposable<>(observer, co
observer.onSubscribe(inner);
if (conn.add(inner)) {
if (inner.isDisposed()) {
conn.remove(inner);
}
return;
}
...
}
conn.add(inner)為每一個(gè)訂閱者創(chuàng)建InnerDisposable對(duì)象,然后保存在PublishConnection中的一個(gè)數(shù)組中蝇狼。
看connect方法
public void connect(Consumer<? super Disposable> connection) {
...
if (doConnect) {
source.subscribe(conn);
}
}
調(diào)用被觀察者subscribe對(duì)象,然后可以在里面發(fā)射數(shù)據(jù)倡怎。
PublishConnection中的 onNext方法
public void onNext(T t) {
for (InnerDisposable<T> inner : get()) {
inner.downstream.onNext(t);
}
調(diào)用每一個(gè)訂閱者的onNext方法迅耘,將數(shù)據(jù)發(fā)射給訂閱者。
RefCount
讓一個(gè)可連接的Observable行為像普通的Observable监署〔ǎ可連接的Observable (connectable Observable)與普通的Observable差不多,不過(guò)它并不會(huì)在被訂閱時(shí)開(kāi)始發(fā)射數(shù)據(jù)钠乏,而是直到使用了Connect操作符時(shí)才會(huì)開(kāi)始栖秕。用這種方法,你可以在任何時(shí)候讓一個(gè)Observable開(kāi)始發(fā)射數(shù)據(jù)晓避。
Observable observable = Observable.create((ObservableOnSubscrib
ObservableEmitter<Integer> observableEmitter = emitter.seri
try {
// 序列化
if (!emitter.isDisposed()) {
for (int i = 0; i < 4; i++) {
System.out.println("create operate--->emitter:
if (1==i){
// ExceptionHelper.TERMINATED
observableEmitter.onError(new Throwable("
}else {
observableEmitter.onNext(i);
}
}
observableEmitter.onComplete();
}
} catch (Exception e) {
observableEmitter.onError(e);
}
}).publish().refCount(2);
observable.subscribe(...);
observable.subscribe(...);
實(shí)現(xiàn)類
ObservableRefCount
看一下subscribeActual方法
protected void subscribeActual(Observer<? super T> observer) {
RefConnection conn;
boolean connect = false;
synchronized (this) {
conn = connection;
if (conn == null) {
conn = new RefConnection(this);
connection = conn;
}
long c = conn.subscriberCount;
if (c == 0L && conn.timer != null) {
conn.timer.dispose();
}
conn.subscriberCount = c + 1;
if (!conn.connected && c + 1 == n) {
connect = true;
conn.connected = true;
}
}
source.subscribe(new RefCountObserver<>(observer, this, conn));
if (connect) {
source.connect(conn);
}
}
if (!conn.connected && c + 1 == n)當(dāng)訂閱者的數(shù)量等于refCount方法傳入的數(shù)量時(shí)簇捍,表示可以連接只壳,調(diào)用ObservablePublish中的connect方法。就與上面將Publish操作符調(diào)用connect方法作用相同暑塑。
當(dāng)然你可以隨時(shí)調(diào)用connect方法來(lái)強(qiáng)制發(fā)射數(shù)據(jù)吼句。