RxJava是一個(gè)生產(chǎn)者和消費(fèi)者模型硕并,有生產(chǎn)者Observable和消費(fèi)者Observer,對于簡單的一個(gè)生產(chǎn)者和一個(gè)消費(fèi)者的情況比較簡單缰雇,但真實(shí)業(yè)務(wù)中生產(chǎn)者可能有多個(gè)拒担,且多個(gè)生產(chǎn)者合作生產(chǎn)的情況可能有多種,比如多個(gè)生產(chǎn)者誰先生產(chǎn)第一個(gè)其他的就取消印荔。而RxJava的操作符就是用于組合多個(gè)生產(chǎn)者的邏輯實(shí)現(xiàn)低葫。
1 - "amb"操作符源碼分析
"amb"操作符的作用是,組合多個(gè)ObservableSource保證誰先生產(chǎn)出數(shù)據(jù)仍律,就只接收到ObservableSource的數(shù)據(jù)嘿悬,操作符示意圖如下:
public abstract class Observable<T> implements ObservableSource<T> {
public static <T> Observable<T> amb(Iterable<? extends ObservableSource<? extends T>> sources) {
ObjectHelper.requireNonNull(sources, "sources is null");
// 該操作把多個(gè)ObservableSource轉(zhuǎn)化為一個(gè)ObservableAmb對象
return RxJavaPlugins.onAssembly(new ObservableAmb<T>(null, sources));
}
}
public final class ObservableAmb<T> extends Observable<T> {
final ObservableSource<? extends T>[] sources;
final Iterable<? extends ObservableSource<? extends T>> sourcesIterable;
public ObservableAmb(ObservableSource<? extends T>[] sources, Iterable<? extends ObservableSource<? extends T>> sourcesIterable) {
this.sources = sources;
this.sourcesIterable = sourcesIterable;
}
public void subscribeActual(Observer<? super T> s) {
ObservableSource<? extends T>[] sources = this.sources;
int count = 0;
if (sources == null) {
// 1 - 這里的邏輯是把sourcesIterable中的對象添加到source數(shù)組中
...
} else {
count = sources.length;
}
if (count == 0) {
EmptyDisposable.complete(s);
return;
} else
if (count == 1) {
sources[0].subscribe(s);
return;
}
// 2 - 創(chuàng)建一個(gè)AmbCoordinator對象
AmbCoordinator<T> ac = new AmbCoordinator<T>(s, count);
// 調(diào)用各個(gè)ObservableSource的subscribe()
ac.subscribe(sources);
}
// 3 - 處理多個(gè)ObservableSource競爭生產(chǎn)權(quán)的邏輯處理類,通過為沒一個(gè)ObservableSource創(chuàng)建一個(gè)Observer水泉,
// 當(dāng)對應(yīng)的Observer收到數(shù)據(jù)時(shí)善涨,來競爭是否贏得生產(chǎn)權(quán),如果贏得生產(chǎn)權(quán)就把其他的Observer dispose草则,
// 保證只有贏得生產(chǎn)權(quán)的ObservableSource來發(fā)送數(shù)據(jù)給原始Observer
static final class AmbCoordinator<T> implements Disposable {
final Observer<? super T> actual;
final AmbInnerObserver<T>[] observers;
final AtomicInteger winner = new AtomicInteger();
@SuppressWarnings("unchecked")
AmbCoordinator(Observer<? super T> actual, int count) {
this.actual = actual;
this.observers = new AmbInnerObserver[count];
}
// 處理sources中各個(gè)ObservableSource的subscribe邏輯
public void subscribe(ObservableSource<? extends T>[] sources) {
AmbInnerObserver<T>[] as = observers;
int len = as.length;
for (int i = 0; i < len; i++) {
// 4 - 把Observer轉(zhuǎn)為AmbInnerObserver钢拧,AmbInnerObserver發(fā)送數(shù)據(jù)時(shí)通知到AmbCoordinator中來處理誰贏得了生產(chǎn)權(quán)
as[i] = new AmbInnerObserver<T>(this, i + 1, actual);
}
winner.lazySet(0); // release the contents of 'as'
// 5 - 調(diào)用Observer.onSubscribe()
actual.onSubscribe(this);
for (int i = 0; i < len; i++) {
// 6 - 如果已經(jīng)有ObservableSource贏取了生產(chǎn)權(quán),不再處理
if (winner.get() != 0) {
return;
}
// 7 -如果還沒哪個(gè)ObservableSource贏得生產(chǎn)權(quán)炕横,就調(diào)用所有ObservableSource.subscribe()
sources[i].subscribe(as[i]);
}
}
// 8 - 當(dāng)AmbInnerObserver收到數(shù)據(jù)時(shí)調(diào)用源内,代表AmbInnerObserver對應(yīng)的ObservableSource贏得了生產(chǎn)權(quán),
// 把其他的AmbInnerObserver dispose份殿。保證只有贏得生產(chǎn)權(quán)的ObservableSource生產(chǎn)數(shù)據(jù)膜钓。
public boolean win(int index) {
int w = winner.get();
if (w == 0) {
// 如果還沒人贏得生產(chǎn)權(quán),就設(shè)置該index卿嘲,表示該index對應(yīng)得Observable贏得生產(chǎn)權(quán)
if (winner.compareAndSet(0, index)) {
AmbInnerObserver<T>[] a = observers;
int n = a.length;
// 贏得生產(chǎn)權(quán)后把其他得Observer dispose
for (int i = 0; i < n; i++) {
if (i + 1 != index) {
a[i].dispose();
}
}
return true;
}
return false;
}
return w == index;
}
...
}
// 9 - 用于轉(zhuǎn)發(fā)生產(chǎn)者發(fā)送過來的數(shù)據(jù)并轉(zhuǎn)發(fā)給原始的Observer呻此,同時(shí)在收到數(shù)據(jù)時(shí),調(diào)用AmbCoordinator.win(index)來贏得生產(chǎn)權(quán)
static final class AmbInnerObserver<T> extends AtomicReference<Disposable> implements Observer<T> {
private static final long serialVersionUID = -1185974347409665484L;
final AmbCoordinator<T> parent;
final int index;
final Observer<? super T> actual;
boolean won;
AmbInnerObserver(AmbCoordinator<T> parent, int index, Observer<? super T> actual) {
this.parent = parent;
this.index = index;
this.actual = actual;
}
@Override
public void onSubscribe(Disposable s) {
DisposableHelper.setOnce(this, s);
}
@Override
public void onNext(T t) {
if (won) {
// 10 - 已經(jīng)贏得來生產(chǎn)權(quán)腔寡,直接轉(zhuǎn)發(fā)數(shù)據(jù)
actual.onNext(t);
} else {
// 11 - 競爭焚鲜,贏得生產(chǎn)權(quán)
if (parent.win(index)) {
// 12 - 贏得生產(chǎn)權(quán)
won = true;
actual.onNext(t);
} else {
// 13 - 沒有贏得生產(chǎn)權(quán)
get().dispose();
}
}
}
@Override
public void onError(Throwable t) {
// 14 - 與onNext()處理一樣
if (won) {
actual.onError(t);
} else {
if (parent.win(index)) {
won = true;
actual.onError(t);
} else {
RxJavaPlugins.onError(t);
}
}
}
@Override
public void onComplete() {
// 15 - 與onNext()處理一樣
if (won) {
actual.onComplete();
} else {
if (parent.win(index)) {
won = true;
actual.onComplete();
}
}
}
public void dispose() {
DisposableHelper.dispose(this);
}
}
}
從上面代碼分析可知,"amb"操作符是把多個(gè)ObservableSource先轉(zhuǎn)化為ObservableAmb對象放前,并在ObservableAmb中為每個(gè)ObservableSource創(chuàng)建對應(yīng)的包裝的AmbInnerObserver(可把收到的數(shù)據(jù)轉(zhuǎn)發(fā)給原始Observer)忿磅,并進(jìn)行訂閱操作,當(dāng)某個(gè)AmbInnerObserver先收到數(shù)據(jù)時(shí)凭语,代表ObservableSource贏得生產(chǎn)權(quán)葱她,把其他的AmbInnerObserver dispose,保證只接收贏得生產(chǎn)權(quán)的ObservableSource生產(chǎn)的數(shù)據(jù)似扔。
2 - "concatArray"操作符源碼分析
"concatArray"操作符的作用是吨些,組合多個(gè)ObservableSource按照順序依次生產(chǎn)發(fā)送數(shù)據(jù)搓谆,即多個(gè)ObservableSource一個(gè)生產(chǎn)完數(shù)據(jù)之后再下一個(gè),依次下去知道所有的結(jié)束豪墅,Observer接收所有ObservableSource發(fā)送的數(shù)據(jù)泉手,操作符示意圖如下:
public abstract class Observable<T> implements ObservableSource<T> {
public static <T> Observable<T> concatArray(ObservableSource<? extends T>... sources) {
if (sources.length == 0) {
return empty();
} else
if (sources.length == 1) {
return wrap((ObservableSource<T>)sources[0]);
}
// 1 - fromArray(sources)把多個(gè)ObservableSource轉(zhuǎn)為一個(gè)發(fā)送這些ObservableSource的Observable
// 并把轉(zhuǎn)化后的Observable轉(zhuǎn)為ObservableConcatMap對象,記住此處arrayObservable=fromArray(sources)發(fā)送的是參數(shù)的sources
// 2 - 下一步看看ObservableConcatMap
return RxJavaPlugins.onAssembly(new ObservableConcatMap(fromArray(sources), Functions.identity(), bufferSize(), ErrorMode.BOUNDARY));
}
}
public final class ObservableConcatMap<T, U> extends AbstractObservableWithUpstream<T, U> {
public ObservableConcatMap(ObservableSource<T> source, Function<? super T, ? extends ObservableSource<? extends U>> mapper,
int bufferSize, ErrorMode delayErrors) {
super(source);
this.mapper = mapper;
this.delayErrors = delayErrors;
this.bufferSize = Math.max(8, bufferSize);
}
@Override
public void subscribeActual(Observer<? super U> s) {
if (ObservableScalarXMap.tryScalarXMapSubscribe(source, s, mapper)) {
return;
}
if (delayErrors == ErrorMode.IMMEDIATE) {
SerializedObserver<U> serial = new SerializedObserver<U>(s);
// 3 - 這里的source是arrayObservable偶器,即那個(gè)發(fā)送原始sources的Observable
// 把原始Observer轉(zhuǎn)為SourceObserver斩萌,這種是接收到error就結(jié)束,下面的情況和這差不多就不看了屏轰,
// 下面看看SourceObserver
source.subscribe(new SourceObserver<T, U>(serial, mapper, bufferSize));
} else {
source.subscribe(new ConcatMapDelayErrorObserver<T, U>(s, mapper, bufferSize, delayErrors == ErrorMode.END));
}
}
static final class SourceObserver<T, U> extends AtomicInteger implements Observer<T>, Disposable {
SourceObserver(Observer<? super U> actual,
Function<? super T, ? extends ObservableSource<? extends U>> mapper, int bufferSize) {
this.actual = actual;
this.mapper = mapper;
this.bufferSize = bufferSize;
// 把原始Observer轉(zhuǎn)為InnerObserver
this.inner = new InnerObserver<U>(actual, this);
}
public void onSubscribe(Disposable s) {
if (DisposableHelper.validate(this.s, s)) {
this.s = s;
if (s instanceof QueueDisposable) {
@SuppressWarnings("unchecked")
QueueDisposable<T> qd = (QueueDisposable<T>) s;
int m = qd.requestFusion(QueueDisposable.ANY);
// 4 - 對于concatArray這里是true颊郎,可以看ObservableFromArray中的requestFusion邏輯
if (m == QueueDisposable.SYNC) {
fusionMode = m;
// 5 - 這里把QueueDisposable存為queue,這里的QueueDisposable對應(yīng)的在ObservableFromArray中霎苗,即保存了所有原始sources
queue = qd;
done = true;
actual.onSubscribe(this);
// 6 - 處理發(fā)送數(shù)據(jù)邏輯
drain();
return;
}
if (m == QueueDisposable.ASYNC) {
fusionMode = m;
queue = qd;
actual.onSubscribe(this);
return;
}
}
queue = new SpscLinkedArrayQueue<T>(bufferSize);
actual.onSubscribe(this);
}
}
void drain() {
if (getAndIncrement() != 0) {
return;
}
for (;;) {
if (disposed) {
queue.clear();
return;
}
// 7 - 沒有正在處理的ObservableSource
if (!active) {
boolean d = done;
T t;
try {
// 8 - 取出一個(gè)ObservableSource
t = queue.poll();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
dispose();
queue.clear();
actual.onError(ex);
return;
}
boolean empty = t == null;
if (d && empty) {
disposed = true;
actual.onComplete();
return;
}
if (!empty) {
ObservableSource<? extends U> o;
try {
o = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource");
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
dispose();
queue.clear();
actual.onError(ex);
return;
}
// 9 - 標(biāo)記為true姆吭,for循環(huán)不在處理ObservableSource
active = true;
// 10 - 處理訂閱原始的ObservableSource
o.subscribe(inner);
}
}
if (decrementAndGet() == 0) {
break;
}
}
}
void innerComplete() {
// 13 - 比較為false,drain()中可以處理下一個(gè)ObservableSource
active = false;
// 14 - 調(diào)drain()唁盏,處理下一個(gè)ObservableSource
drain();
}
}
static final class InnerObserver<U> extends AtomicReference<Disposable> implements Observer<U> {
...
@Override
public void onError(Throwable t) {
// 11 - Error内狸,整個(gè)流程結(jié)束
parent.dispose();
actual.onError(t);
}
@Override
public void onComplete() {
// 12 - 一個(gè)成功結(jié)束,調(diào)用SourceObserver.innerComplete()
parent.innerComplete();
}
void dispose() {
DisposableHelper.dispose(this);
}
}
}
從上面的代碼分析可清除了解到升敲,"concatArray"操作符是把多個(gè)ObservableSource轉(zhuǎn)為一個(gè)ObservableFromArray類型的Observable(注意:在ObservableFromArray中把sources放進(jìn)FromArrayDisposable,之后傳遞給Observer.onSubscribe(FromArrayDisposable))轰传,然后又把ObservableFromArray轉(zhuǎn)為ObservableConcatMap類型Observable驴党,在ObservableConcatMap中把原始Observer轉(zhuǎn)為SourceObserver,在SourceObserver中取出FromArrayDisposable中的sources获茬,逐個(gè)ObservableSource進(jìn)行生產(chǎn)數(shù)據(jù)港庄,并把原始的Observer轉(zhuǎn)為InnerObserver配合實(shí)現(xiàn)逐個(gè)ObservableSource生產(chǎn)數(shù)據(jù)。
總結(jié)語
操作符就只分析這兩個(gè)恕曲,因?yàn)閷?shí)在太多了鹏氧,大家感興趣可以自己去看看源碼∨逡ィ總的來說操作符得邏輯就是把原始得一個(gè)或多個(gè)Observable轉(zhuǎn)為另一種Observable來處理相關(guān)邏輯把还,把原始Observer轉(zhuǎn)為另一種Observer出來相關(guān)邏輯。
RxJava源碼分析系列文章主題目錄:
- 1. RxJava源碼分析-----初始篇
- 2. RxJava源碼分析之 --- 訂閱過程和線程切換
- RxJava源碼分析之 --- Backpressure
- RxJava源碼分析之 --- hook