之前看了一篇響應(yīng)式編程的文章,準(zhǔn)備應(yīng)用到項(xiàng)目中,所以學(xué)習(xí)下rxjava的使用方式
rxjava的相關(guān)博客 拉丁吳 扔物線 木水川的博客
以下代碼來自github拉丁吳的demo
1.創(chuàng)建一個(gè)被觀察者(三種方式)
create七兜、from、just
create
//創(chuàng)建被觀察者卿啡,這是最正常的創(chuàng)建方法
Observable observable=Observable.create(new Observable.OnSubscribe<String>(){
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("一二三四五");
subscriber.onNext("上山打老虎");
subscriber.onNext("老虎一發(fā)威");
subscriber.onNext("武松就發(fā)怵");
subscriber.onCompleted();
}
});
create方法接收的是一個(gè)OnSubscribe對(duì)象
public static <T> Observable<T> create(OnSubscribe<T> f) {
return new Observable<T>(RxJavaHooks.onCreate(f));
}
from
String [] kk={"一二三四五","上山打老虎","老虎一發(fā)威","武松就發(fā)怵"};
Observable observable=Observable.from(kk);
from接收的是一個(gè)數(shù)組
public static <T> Observable<T> from(T[] array) {
int n = array.length;
if (n == 0) {
return empty();
} else
if (n == 1) {
return just(array[0]);
}
return create(new OnSubscribeFromArray<T>(array));
}
just
Observable observable=Observable.just("一二三四五","上山打老虎","老虎一發(fā)威","武松就發(fā)怵");
just方法在Observable類里面有10個(gè)重載方法分別對(duì)應(yīng)10個(gè)參數(shù)
public static <T> Observable<T> just(final T value) {
return ScalarSynchronousObservable.create(value);
}
.
.
.
public static <T> Observable<T> just(T t1, T t2) {
return from((T[])new Object[] { t1, t2 });
}
.
.
.
public static <T> Observable<T> just(T t1, T t2, T t3, T t4, T t5, T t6, T t7, T t8, T t9, T t10) {
return from((T[])new Object[] { t1, t2, t3, t4, t5, t6, t7, t8, t9, t10 });
}
2.創(chuàng)建一個(gè)觀察者
兩種方式
第一種
觀察者的基類是Observer
//創(chuàng)建觀察者
Subscriber subscriber=new Subscriber<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
}
};
public abstract class Subscriber<T> implements Observer<T>, Subscription {}
第二種
通過Action
Action總共有10種(Action0 Action1.......Action9)嘱支,它們都繼承了Action
Action其實(shí)和Observer是沒有任何關(guān)系的,Action并不是Observer的子類厕九,但是它可以充當(dāng)觀察者使用蓖捶,專門處理onNext事件
public interface Action extends Function {
}
public interface Function {
}
public interface Action0 extends Action {
void call();
}
.
.
.
public interface Action1<T> extends Action {
void call(T var1);
}
public interface Action9<T1, T2, T3, T4, T5, T6, T7, T8, T9> extends Action {
void call(T1 t1, T2 t2, T3 t3, T4 t4, T5 t5, T6 t6, T7 t7, T8 t8, T9 t9);
}
它們的區(qū)別只是call方法中參數(shù)的個(gè)數(shù)不同
訂閱
觀察者與被觀察者是通過訂閱來聯(lián)系起來的
被觀察者.subscribe(觀察者)
之所以這么設(shè)計(jì),是為了保證流式API調(diào)用風(fēng)格扁远,參考
訂閱流程
observable.subscribe(subscriber);
之后做了哪些事情呢俊鱼?
在Observable類里面
public final Subscription subscribe(Subscriber<? super T> subscriber) {
return Observable.subscribe(subscriber, this);
}
接著調(diào)用了
//subscriber是我們的觀察者,在訂閱的時(shí)候傳進(jìn)來的參數(shù)刻像,而observable代表this,就是訂閱的時(shí)候所調(diào)用的方法的對(duì)象并闲,就是我們的被觀察者
static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
//省略代碼
// new Subscriber so onStart it
subscriber.onStart();
//省略
try {
// allow the hook to intercept and/or decorate
//這兒調(diào)用了call方法
RxJavaHooks.onObservableStart(observable,observable.onSubscribe).call(subscriber);
return RxJavaHooks.onObservableReturn(subscriber);
} catch (Throwable e) {
}
}
RxJavaHooks類中
public static <T> Observable.OnSubscribe<T> onObservableStart(Observable<T> instance, Observable.OnSubscribe<T> onSubscribe) {
Func2<Observable, Observable.OnSubscribe, Observable.OnSubscribe> f = onObservableStart;
if (f != null) {
return f.call(instance, onSubscribe);
}
return onSubscribe;
}
傳進(jìn)來的參數(shù)是observable和observable的唯一屬性onSubscribe(也是構(gòu)造Observable的時(shí)候所必須的细睡,只有這個(gè)一帶參數(shù)的構(gòu)造函數(shù)),onObservableStart是不為null的,在靜態(tài)代碼塊中就已經(jīng)初始化了帝火。
/** Initialize with the default delegation to the original RxJavaPlugins. */
static {
init();
}
/** Utility class. */
private RxJavaHooks() {
throw new IllegalStateException("No instances!");
}
/**
* Initialize the hooks via delegating to RxJavaPlugins.
*/
@SuppressWarnings({ "rawtypes", "unchecked", "deprecation"})
static void init() {
onError = new Action1<Throwable>() {
@Override
public void call(Throwable e) {
RxJavaPlugins.getInstance().getErrorHandler().handleError(e);
}
};
onObservableStart = new Func2<Observable, Observable.OnSubscribe, Observable.OnSubscribe>() {
@Override
public Observable.OnSubscribe call(Observable t1, Observable.OnSubscribe t2) {
return RxJavaPlugins.getInstance().getObservableExecutionHook().onSubscribeStart(t1, t2);
}
};
}
@Deprecated
public <T> OnSubscribe<T> onSubscribeStart(Observable<? extends T> observableInstance, final OnSubscribe<T> onSubscribe) {
// pass through by default
return onSubscribe;
}
默認(rèn)返回的就是傳進(jìn)來的onSubscribe對(duì)象
所以
RxJavaHooks.onObservableStart(observable,observable.onSubscribe).call(subscriber);
其實(shí)調(diào)用的就是observable.onSubscribe的call方法溜徙,而這個(gè)onSubscribe是在構(gòu)造被觀察者的時(shí)候創(chuàng)建的
這就是從訂閱動(dòng)作發(fā)生到被觀察者中的call方法調(diào)用的流程
call方法中的參數(shù)subscriber正是我們的觀察者。
操作符
以map操作符為例
Observable.just("picurl")
//使用map操作來完成類型轉(zhuǎn)換
.map(new Func1<String, Bitmap>() {
@Override
public Bitmap call(String s) {
//顯然自定義的createBitmapFromPath(s)方法购公,是一個(gè)極其耗時(shí)的操作
return createBitmapFromPath(s);
}
})
.subscribe(
//創(chuàng)建觀察者萌京,作為事件傳遞的終點(diǎn)處理事件
new Subscriber<Bitmap>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
//出現(xiàn)錯(cuò)誤會(huì)調(diào)用這個(gè)方法
}
@Override
public void onNext(Bitmap s) {
//處理事件
}
}
);
map是如何對(duì)事件進(jìn)行變換處理,最后傳遞到觀察者手中的呢宏浩?
public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
return create(new OnSubscribeMap<T, R>(this, func));
}
public static <T> Observable<T> create(OnSubscribe<T> f) {
return new Observable<T>(RxJavaHooks.onCreate(f));
}
構(gòu)造一個(gè)Observable被觀察者知残,有三種方式,用create時(shí)比庄,傳入的參數(shù)是Observable.OnSubscribe,所以O(shè)nSubscripbeMap是Onsubscribe的子類,所以肯定會(huì)實(shí)現(xiàn)call方法
public final class OnSubscribeMap<T, R> implements OnSubscribe<R> {
}
public interface OnSubscribe<T> extends Action1<Subscriber<? super T>> {
// cover for generics insanity
}
public interface Action1<T> extends Action {
void call(T t);
}
在OnSubscribeMap的call方法中,創(chuàng)建了一個(gè)觀察者的代理對(duì)象MapSubscriber
@Override
public void call(final Subscriber<? super R> o) {
MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
o.add(parent);
source.unsafeSubscribe(parent);
}
public OnSubscribeMap(Observable<T> source, Func1<? super T, ? extends R> transformer) {
this.source = source;
this.transformer = transformer;
}
這個(gè)source是我們的原始的被觀察者
source.unsafeSubscribe(parent);
所以原始的被觀察者在這里會(huì)注冊(cè)代理觀察者
以onNext方法為例
@Override
public void onNext(T t) {
R result;
try {
result = mapper.call(t);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
unsubscribe();
onError(OnErrorThrowable.addValueAsLastCause(ex, t));
return;
}
actual.onNext(result);
}
public MapSubscriber(Subscriber<? super R> actual, Func1<? super T, ? extends R> mapper) {
this.actual = actual;
this.mapper = mapper;
}
mapper 是什么求妹?
public MapSubscriber(Subscriber<? super R> actual, Func1<? super T, ? extends R> mapper) {
this.actual = actual;
this.mapper = mapper;
}
MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
public OnSubscribeMap(Observable<T> source, Func1<? super T, ? extends R> transformer) {
this.source = source;
this.transformer = transformer;
}
public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
return create(new OnSubscribeMap<T, R>(this, func));
}
所以mapper就是調(diào)用map方法時(shí)傳入的參數(shù)
@Override
public void onNext(T t) {
R result;
try {
result = mapper.call(t);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
unsubscribe();
onError(OnErrorThrowable.addValueAsLastCause(ex, t));
return;
}
actual.onNext(result);
}
調(diào)用result = mapper.call(t);就是
new Func1<String, Bitmap>() {
@Override
public Bitmap call(String s) {
//顯然自定義的createBitmapFromPath(s)方法,是一個(gè)極其耗時(shí)的操作
return createBitmapFromPath(s);
}
}
然后再執(zhí)行 actual.onNext(result);actual是我們最原始的觀察者佳窑。