從創(chuàng)建一個(gè)數(shù)組的Observable說(shuō)起:
public static <T> Observable<T> from(T[] array) {
......
return create(new OnSubscribeFromArray<T>(array));
}
public static <T> Observable<T> create(OnSubscribe<T> f) {
return new Observable<T>(RxJavaHooks.onCreate(f)); //RxJavaHooks提供了改寫(xiě)OnSubscribe的鉤子女蜈,可以認(rèn)為直接返回參數(shù)
}
protected Observable(OnSubscribe<T> f) {
this.onSubscribe = f;
}
OnSubsribeFromArray:
public final class OnSubscribeFromArray<T> implements OnSubscribe<T> {
final T[] array;
public OnSubscribeFromArray(T[] array) {
this.array = array;
}
@Override
public void call(Subscriber<? super T> child) {
child.setProducer(new FromArrayProducer<T>(child, array));
}
當(dāng)調(diào)用Observable的subscribe時(shí)倘零,會(huì)創(chuàng)建一個(gè)ObserverSubscribe包裝Observer
public final Subscription subscribe(final Observer<? super T> observer) {
if (observer instanceof Subscriber) {
return subscribe((Subscriber<? super T>)observer);
}
if (observer == null) {
throw new NullPointerException("observer is null");
}
return subscribe(new ObserverSubscriber<T>(observer));
}
public final Subscription subscribe(Subscriber<? super T> subscriber) {
return Observable.subscribe(subscriber, this);
}
static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
// new Subscriber so onStart it
subscriber.onStart();
/*
* See https://github.com/ReactiveX/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls
* to user code from within an Observer"
*/
// if not already wrapped
if (!(subscriber instanceof SafeSubscriber)) {
// assign to `observer` so we return the protected version
subscriber = new SafeSubscriber<T>(subscriber);
}
// The code below is exactly the same an unsafeSubscribe but not used because it would
// add a significant depth to already huge call stacks.
try {
// allow the hook to intercept and/or decorate
observable.onSubscribe.call(subscriber);
return subscriber;
} catch (Throwable e) {
// special handling for certain Throwable/Error/Exception types
Exceptions.throwIfFatal(e);
// in case the subscriber can't listen to exceptions anymore
if (subscriber.isUnsubscribed()) {
RxJavaHooks.onError(RxJavaHooks.onObservableError(e));
} else {
// if an unhandled error occurs executing the onSubscribe we will propagate it
try {
subscriber.onError(RxJavaHooks.onObservableError(e));
} catch (Throwable e2) {
Exceptions.throwIfFatal(e2);
// if this happens it means the onError itself failed (perhaps an invalid function implementation)
// so we are unable to propagate the error correctly and will just throw
RuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
// TODO could the hook be the cause of the error in the on error handling.
RxJavaHooks.onObservableError(r);
// TODO why aren't we throwing the hook's return value.
throw r; // NOPMD
}
}
return Subscriptions.unsubscribed();
}
}
顯示調(diào)用subscriber的onStart,然后用SafeSubscriber包裝subscriber碟婆,SafeSubscriber是為了保證OnError和OnComplete只會(huì)執(zhí)行其中之一逞频,并且執(zhí)行后不再執(zhí)行onNext
接著調(diào)用OnSubscribeFromArray.call(subscriber)
@Override
public void call(Subscriber<? super T> child) {
child.setProducer(new FromArrayProducer<T>(child, array));
}
static final class FromArrayProducer<T>
extends AtomicLong
implements Producer {
/** */
private static final long serialVersionUID = 3534218984725836979L;
final Subscriber<? super T> child;
final T[] array;
int index;
public FromArrayProducer(Subscriber<? super T> child, T[] array) {
this.child = child;
this.array = array;
}
@Override
public void request(long n) {
if (n < 0) {
throw new IllegalArgumentException("n >= 0 required but it was " + n);
}
if (n == Long.MAX_VALUE) {
if (BackpressureUtils.getAndAddRequest(this, n) == 0) {
fastPath();
}
} else
if (n != 0) {
if (BackpressureUtils.getAndAddRequest(this, n) == 0) {
slowPath(n);
}
}
}
void fastPath() {
final Subscriber<? super T> child = this.child;
for (T t : array) {
if (child.isUnsubscribed()) {
return;
}
child.onNext(t);
}
if (child.isUnsubscribed()) {
return;
}
child.onCompleted();
}
void slowPath(long r) {
final Subscriber<? super T> child = this.child;
final T[] array = this.array;
final int n = array.length;
long e = 0L;
int i = index;
for (;;) {
while (r != 0L && i != n) {
if (child.isUnsubscribed()) {
return;
}
child.onNext(array[i]);
i++;
if (i == n) {
if (!child.isUnsubscribed()) {
child.onCompleted();
}
return;
}
r--;
e--;
}
r = get() + e;
if (r == 0L) {
index = i;
r = addAndGet(e);
if (r == 0L) {
return;
}
e = 0L;
}
}
}
}
Subscriber.setProducer:如果包裝了其他subscriber,則轉(zhuǎn)交給其他subscriber任岸,如果沒(méi)有包裝subscriber再榄,沒(méi)有調(diào)用request,會(huì)調(diào)用p.request(Long.MAX_VALUE)享潜。如果之前調(diào)用了request困鸥,則調(diào)用p.request(n),n為累積的數(shù)量剑按。
public void setProducer(Producer p) {
long toRequest;
boolean passToSubscriber = false;
synchronized (this) {
toRequest = requested;
producer = p;
if (subscriber != null) {
// middle operator ... we pass through unless a request has been made
if (toRequest == NOT_SET) {
// we pass through to the next producer as nothing has been requested
passToSubscriber = true;
}
}
}
// do after releasing lock
if (passToSubscriber) {
subscriber.setProducer(producer);
} else {
// we execute the request with whatever has been requested (or Long.MAX_VALUE)
if (toRequest == NOT_SET) {
producer.request(Long.MAX_VALUE);
} else {
producer.request(toRequest);
}
}
}
FromArrayProducer.request(n)
@Override
public void request(long n) {
if (n < 0) {
throw new IllegalArgumentException("n >= 0 required but it was " + n);
}
if (n == Long.MAX_VALUE) {
if (BackpressureUtils.getAndAddRequest(this, n) == 0) {
fastPath();
}
} else
if (n != 0) {
if (BackpressureUtils.getAndAddRequest(this, n) == 0) {
slowPath(n);
}
}
}
如果n 為 Long.MAX_VALUE疾就,如果之前沒(méi)有請(qǐng)求過(guò)澜术,則調(diào)用fastPath
如果n不為0,則說(shuō)明之前調(diào)用了request猬腰,則要支持背壓鸟废,調(diào)用slowPath
void fastPath() {
final Subscriber<? super T> child = this.child;
for (T t : array) {
if (child.isUnsubscribed()) {
return;
}
child.onNext(t);
}
if (child.isUnsubscribed()) {
return;
}
child.onCompleted();
}
只要沒(méi)有unsubscribe,會(huì)對(duì)數(shù)組中每個(gè)元素調(diào)用Observer的onNext姑荷,然后調(diào)用Observer.onCompleted()
Scheduler
通過(guò)subscribeOn和observeOn可以改變操作符執(zhí)行的線程
subscribeOn只能調(diào)用一次盒延,表示subscribe操作所在的線程,第一次ObserveOn之前所以操作都在subscribeOn所指定的線程
observeOn可以調(diào)用多次厢拭,調(diào)用之后所有的操作都在observeOn指定的線程上進(jìn)行操作
subscribeOn兰英,創(chuàng)建一個(gè)OperatorSubscribeOn將原來(lái)的Observable包裝起來(lái)。
public final Observable<T> subscribeOn(Scheduler scheduler) {
if (this instanceof ScalarSynchronousObservable) {
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
}
return create(new OperatorSubscribeOn<T>(this, scheduler));
}
public final class OperatorSubscribeOn<T> implements OnSubscribe<T> {
final Scheduler scheduler;
final Observable<T> source;
public OperatorSubscribeOn(Observable<T> source, Scheduler scheduler) {
this.scheduler = scheduler;
this.source = source;
}
@Override
public void call(final Subscriber<? super T> subscriber) {
final Worker inner = scheduler.createWorker();
subscriber.add(inner);
inner.schedule(new Action0() {
@Override
public void call() {
final Thread t = Thread.currentThread();
Subscriber<T> s = new Subscriber<T>(subscriber) {
@Override
public void onNext(T t) {
subscriber.onNext(t);
}
@Override
public void onError(Throwable e) {
try {
subscriber.onError(e);
} finally {
inner.unsubscribe();
}
}
@Override
public void onCompleted() {
try {
subscriber.onCompleted();
} finally {
inner.unsubscribe();
}
}
@Override
public void setProducer(final Producer p) {
subscriber.setProducer(new Producer() {
@Override
public void request(final long n) {
if (t == Thread.currentThread()) {
p.request(n);
} else {
inner.schedule(new Action0() {
@Override
public void call() {
p.request(n);
}
});
}
}
});
}
};
source.unsafeSubscribe(s);
}
});
}
}
從scheduler中create一個(gè)worker供鸠,調(diào)用worker的schedule執(zhí)行包裝Observable的subscribe方法畦贸。而且保證Producer的request方法在worker線程上調(diào)用。
RxJava中的Scheduler有
- NewThreadScheduler楞捂,NewThreadWorker(包裝了一個(gè)ScheduledExecutorService)
- ComputationScheduler:EventLoopsScheduler薄坏,根據(jù)cpu數(shù)量確定worker的數(shù)量,EventLoopWorker包裝了PoolWorker(實(shí)際是NewThreadWorker)
- IOScheduler: CachedThreadScheduler,ThreadWorker: unsubscribe的ThreadWorker可以重用寨闹,長(zhǎng)期不用的會(huì)被清理胶坠。
Map操作符
public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
return create(new OnSubscribeMap<T, R>(this, func));
}
創(chuàng)建一個(gè)OnSubscribeMap包裝map函數(shù)和原Observable。
public final class OnSubscribeMap<T, R> implements OnSubscribe<R> {
final Observable<T> source;
final Func1<? super T, ? extends R> transformer;
public OnSubscribeMap(Observable<T> source, Func1<? super T, ? extends R> transformer) {
this.source = source;
this.transformer = transformer;
}
@Override
public void call(final Subscriber<? super R> o) {
MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
o.add(parent);
source.unsafeSubscribe(parent);
}
subscribe時(shí)會(huì)生成一個(gè)MapSubscriber包裝原來(lái)subscriber繁堡。
static final class MapSubscriber<T, R> extends Subscriber<T> {
final Subscriber<? super R> actual;
final Func1<? super T, ? extends R> mapper;
boolean done;
public MapSubscriber(Subscriber<? super R> actual, Func1<? super T, ? extends R> mapper) {
this.actual = actual;
this.mapper = mapper;
}
@Override
public void onNext(T t) {
R result;
try {
result = mapper.call(t);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
unsubscribe();
onError(OnErrorThrowable.addValueAsLastCause(ex, t));
return;
}
actual.onNext(result);
}
@Override
public void onError(Throwable e) {
if (done) {
RxJavaHooks.onError(e);
return;
}
done = true;
actual.onError(e);
}
@Override
public void onCompleted() {
if (done) {
return;
}
actual.onCompleted();
}
@Override
public void setProducer(Producer p) {
actual.setProducer(p);
}
}
當(dāng)MapSubscriber的onNext方法被調(diào)用時(shí)沈善,先用map函數(shù)轉(zhuǎn)換一下,然后傳給包裝subscriber的onNext