1蛾狗、RxJava簡介
RxJava 的本質(zhì)可以壓縮為異步這一個詞。說到根上座菠,它就是一個實現(xiàn)異步操作的庫狸眼,Rx的全稱是Reactive Extensions,直譯過來就是響應式擴展浴滴。Rx基于觀察者模式拓萌,他是一種編程模型,目標是提供一致的編程接口升略,幫助開發(fā)者更方便的處理異步數(shù)據(jù)流微王。ReactiveX.io給的定義是,Rx是一個使用可觀察數(shù)據(jù)流進行異步編程的編程接口品嚣,ReactiveX結合了觀察者模式炕倘、迭代器模式和函數(shù)式編程的精華。RxJava說白了就是用Java語言邊寫的使用各種操作符形成鏈式結構來處理異步數(shù)據(jù)流的工具翰撑。
2罩旋、RxJava基礎及常用操作符
RxJava 有四個基本概念:
Observable(可觀察者,即被觀察者);
Observer(觀察者)瘸恼;
subscribe(訂閱)事件劣挫;
Observable和Observer通過 subscribe() 方法實現(xiàn)訂閱關系册养,從而 Observable可以在需要的時候發(fā)出事件來通知 Observer东帅。
(1) 創(chuàng)建 Observable被觀察者
Observable<String> myObservable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("Hello, RxJava!");
subscriber.onCompleted();
}});
(2) 創(chuàng)建 Observer觀察者
Subscriber<String> mySubscriber = new Subscriber<String>() {
@Override
public void onCompleted() { }
@Override
public void onError(Throwable e) { }
@Override
public void onNext(String s) {
Log.d("haijiang", "--->" + s); }
};
Subscriber是Observer的擴展,在使用過程中球拦,我們直接用Subscriber作為觀察者對象就OK了靠闭!
(3) 建立訂閱關系
myObservable.subscribe(mySubscriber);
一旦建立訂閱關系坎炼,OnSubscribe中的call方法就會被調(diào)用愧膀,在call方法中主動觸發(fā)了觀察者的onNext,onCompleted方法谣光,可看到輸出“Hello, RxJava!” 檩淋。
以上是一個最基本的流程,我們可以寫成鏈式調(diào)用:
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("Hello, RxJava!");
subscriber.onCompleted();
}}).subscribe(new Subscriber<String>() {
@Override
public void onCompleted() { }
@Override
public void onError(Throwable e) { }
@Override
public void onNext(String s) {
Log.d("haijiang", "--->" + s); }
});
更多操作符欣賞:
just
subscribe方法有一個重載版本萄金,接受三個Action1類型的參數(shù)蟀悦,分別對應OnNext,OnComplete氧敢, OnError函數(shù)
Observable.just("hello","rxjava","rxandroid").subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.d("haijiang", "--->" + s);
}});
ActionX沒有返回日戈,還有一種FuncX有返回的操作,后面會說孙乖。
just操作符是創(chuàng)建一個Observable浙炼,一次發(fā)送傳入的參數(shù)。
from
/** * Observable.from()方法唯袄,它接收一個集合作為輸入 */
String[] strArrsy = {"hello","rxjava","rxandroid"};
Observable.from(strArrsy).subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.d("haijiang", "--->" + s);
}});
下面介紹兩個重量級操作符map和flatMap弯屈,核心變換,靈活操作恋拷。
map
map是一對一的變化季俩,將一個Observable<T>變換成Observable<R>
Observable.just("I LOVE YOU!").map(new Func1<String, Integer>() {
@Override
public Integer call(String s) {
return 520;
}}).subscribe(new Action1<Integer>() {
@Override
public void call(Integer s) {
Log.d("haijiang", "--->" + s);
}});
通過map,F(xiàn)unc1將String轉換成了Int梅掠;其中FuncX是RxJava的一個包裝接口酌住,跟ActionX類似,只不過FuncX是有返回對象的阎抒。
flatMap
flatMap()中返回的是個 Observable對象酪我,并且這個 Observable對象并不是被直接發(fā)送到了 Subscriber
的回調(diào)方法中。flatMap()的原理是這樣的:
- 使用傳入的事件對象創(chuàng)建一個 Observable對象且叁;
- 并不發(fā)送這個 Observable, 而是將它激活都哭,于是它開始發(fā)送事件;
- 每一個創(chuàng)建出來的 Observable發(fā)送的事件,都被匯入同一個 Observable欺矫,而這個 Observable負責將這些事件統(tǒng)一交給Subscriber 的回調(diào)方法纱新。
這三個步驟,把事件拆成了兩級穆趴,通過一組新創(chuàng)建的 Observable將初始的對象『鋪平』之后通過統(tǒng)一路徑分發(fā)了下去脸爱。而這個『鋪平』就是 flatMap() 所謂的 flat。
private ArrayList<Data> mData;
Observable.from(mData).flatMap(new Func1<Data, Observable<ChildData>>() {
@Override
public Observable<ChildData> call(Data data) {
return Observable.from(data.getChildData);
}
}).subscribe(new Action1<ChildData>() {
@Override
public void call(ChildData cd) {
Log.d("haijiang", "--->" + cd.getName);
}});
concatMap
flatMap()操作符使用你提供的原本會被原始Observable發(fā)送的事件未妹,來創(chuàng)建一個新的Observable簿废。而且這個操作符,返回的是一個自身發(fā)送事件并合并結果的Observable络它∽迕剩可以用于任何由原始Observable發(fā)送出的事件,發(fā)送合并后的結果化戳。記住单料,flatMap()可能交錯的發(fā)送事件,最終結果的順序可能并是不原始Observable發(fā)送時的順序点楼。為了防止交錯的發(fā)生扫尖,可以使用與之類似的concatMap()操作符。綜上所述盟步,就是利用concatMap替換flatMap操作符藏斩,輸入順序就防止了交錯,跟原始Obervable順序一致却盘。
timer()
timer操作符:用于創(chuàng)建Observabl狰域,延遲發(fā)送一次。
下面延時兩秒黄橘,輸出log
Observable.timer(2, TimeUnit.SECONDS)
.subscribe(new Observer<Long>() {
@Override
public void onCompleted() {
log.d ("completed");
}
@Override
public void onError(Throwable e) {
log.e("error");
}
@Override
public void onNext(Long number) {
log.d ("hello world");
}
});
interval()
interval:用于創(chuàng)建Observable兆览,用于每個XX秒循環(huán)進行某個操作
Observable.timer(2, TimeUnit.SECONDS).subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) { /
/TODO WHAT YOU WANT
}
});
delay()
delay:用于事件流中,可以延遲發(fā)送事件流中的某一次發(fā)送塞关。
retryWhen
retryWhen()是RxJava的一種錯誤處理機制抬探,當遇到錯誤時,將錯誤傳遞給另一個Observable來決定是否要重新給訂閱這個Observable帆赢。下面封裝一個處理網(wǎng)絡錯誤的類
public class RetryWhenProcess implements Func1<Observable<? extends Throwable>, Observable<?>> {
private long mInterval;
public RetryWhenProcess(long interval) {
mInterval = interval;
}
@Override
public Observable<?> call(Observable<? extends Throwable> observable) {
return observable.flatMap(new Func1<Throwable, Observable<?>>() {
@Override
public Observable<?> call(Throwable throwable) {
return observable.flatMap(new Func1<Throwable, Observable<?>>() {
@Override
public Observable<?> call(Throwable throwable) {
if (throwable instanceof UnknownHostException) {
return Observable.error(throwable);
}
return Observable.just(throwable).zipWith(Observable.range(1, 5), new Func2<Throwable, Integer, Integer>() {
@Override
public Integer call(Throwable throwable, Integer i) {
return i;
}
}).flatMap(new Func1<Integer, Observable<? extends Long>>() {
@Override
public Observable<? extends Long> call(Integer retryCount) {
return Observable.timer((long) Math.pow(mInterval, retryCount), TimeUnit.SECONDS);
}
});
}
});
}
});
}
}
使用方法:
.retryWhen(new RetryWhenProcess(5))
compose
compose()是針對 Observable自身進行變換小压。假設在程序中有多個 Observable,并且他們都需要應用一組相同的變換可以使用椰于。
場景是這樣的:work thread 中處理數(shù)據(jù)怠益,然后 UI thread 中處理結果。當然瘾婿,我們知道是要使用 subscribeOn() 和 observeOn() 進行處理蜻牢。最常見的場景是烤咧,調(diào)server 的 API 接口取數(shù)據(jù)的時候,那么抢呆,那么多接口煮嫌,反復寫這兩個操作符是蛋疼的,為了避免這種情況抱虐,我們可以通過 compse() 操作符來實現(xiàn)復用昌阿,下面面這段代碼就實現(xiàn)了這樣的功能。
/**
* 這個類是 小鄧子 提供的梯码!
*/
public class SchedulersCompat {
private static final Observable.Transformer computationTransformer =
new Observable.Transformer() {
@Override public Object call(Object observable) {
return ((Observable) observable).subscribeOn(Schedulers.computation())
.observeOn(AndroidSchedulers.mainThread());
}
};
private static final Observable.Transformer ioTransformer = new Observable.Transformer() {
@Override public Object call(Object observable) {
return ((Observable) observable).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
}
};
private static final Observable.Transformer newTransformer = new Observable.Transformer() {
@Override public Object call(Object observable) {
return ((Observable) observable).subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread());
}
};
private static final Observable.Transformer trampolineTransformer = new Observable.Transformer() {
@Override public Object call(Object observable) {
return ((Observable) observable).subscribeOn(Schedulers.trampoline())
.observeOn(AndroidSchedulers.mainThread());
}
};
private static final Observable.Transformer executorTransformer = new Observable.Transformer() {
@Override public Object call(Object observable) {
return ((Observable) observable).subscribeOn(Schedulers.from(ExecutorManager.eventExecutor))
.observeOn(AndroidSchedulers.mainThread());
}
};
/**
* Don't break the chain: use RxJava's compose() operator
*/
public static <T> Observable.Transformer<T, T> applyComputationSchedulers() {
return (Observable.Transformer<T, T>) computationTransformer;
}
public static <T> Observable.Transformer<T, T> applyIoSchedulers() {
return (Observable.Transformer<T, T>) ioTransformer;
}
public static <T> Observable.Transformer<T, T> applyNewSchedulers() {
return (Observable.Transformer<T, T>) newTransformer;
}
public static <T> Observable.Transformer<T, T> applyTrampolineSchedulers() {
return (Observable.Transformer<T, T>) trampolineTransformer;
}
public static <T> Observable.Transformer<T, T> applyExecutorSchedulers() {
return (Observable.Transformer<T, T>) executorTransformer;
}
}
使用方式:
.compose(SchedulersCompat.ioTransformer );
3宝泵、線程控制 — Scheduler
在RxJava 中好啰,Scheduler—調(diào)度器轩娶,相當于線程控制器,RxJava 通過它來指定每一段代碼應該運行在什么樣的線程框往。RxJava 已經(jīng)內(nèi)置了幾個 Schedule鳄抒,它們已經(jīng)適合大多數(shù)的使用場景:
Schedulers.immediate(): 直接在當前線程運行,相當于不指定線程椰弊。這是默認的 Scheduler许溅。
Schedulers.newThread(): 總是啟用新線程,并在新線程執(zhí)行操作秉版。
Schedulers.io(): I/O 操作(讀寫文件贤重、讀寫數(shù)據(jù)庫、網(wǎng)絡信息交互等)所使用的 Scheduler清焕。行為模式和 newThread() 差不多并蝗,區(qū)別在于 io()的內(nèi)部實現(xiàn)是是用一個無數(shù)量上限的線程池,可以重用空閑的線程秸妥,因此多數(shù)情況下 io()比 newThread()更有效率滚停。不要把計算工作放在 io()中,可以避免創(chuàng)建不必要的線程粥惧。
Schedulers.computation(): 計算所使用的 Scheduler键畴。這個計算指的是 CPU 密集型計算,即不會被 I/O 等操作限制性能的操作突雪,例如圖形的計算起惕。這個 Scheduler使用的固定的線程池,大小為 CPU 核數(shù)咏删。不要把 I/O 操作放在 computation() 中惹想,否則 I/O 操作的等待時間會浪費 CPU。另外饵婆, Android 還有一個專用的 AndroidSchedulers.mainThread()勺馆,它指定的操作將在 Android 主線程運行戏售。
有了這幾個 Scheduler,就可以使用 subscribeOn() 和 observeOn()兩個方法來對線程進行控制草穆。subscribeOn() 指定subscribe()所發(fā)生的線程灌灾,即 Observable.OnSubscribe被激活時所處的線程,或者叫做事件產(chǎn)生的線程悲柱。 observeOn()指定Subscriber所運行在的線程锋喜,或者叫做事件消費的線程。
4RxJava在android開發(fā)中的一些應用
參考:可能是東半球最全的RxJava使用場景小結 (http://blog.csdn.net/theone10211024/article/details/50435325)
RxBinding
節(jié)流(防止按鈕的重復點擊)
輪詢豌鸡,
定時操作
RxPermissions
RxBus
RxJava與Retrofit
(1)RxBinding
RxBinding是JakeWharton大牛用RxJava為Android控件編寫的一個控件綁定庫粗蔚。
例子:
Button button = (Button) findViewById(R.id.button);
RxView.clicks(button).subscribe(new Action1<Void>() {
@Override
public void call(Void aVoid) {
Log.i("test", "clicked");
}
});
(2)防止重復點擊
RxView.clicks(button).debounce(300, TimeUnit.MILLISECONDS).subscribe(new Action1<Void>() {
@Override
public void call(Void aVoid) {
Log.i("test", "clicked");
}
});
(3)EditText輸入請求。避免每次輸入產(chǎn)生頻繁的請求
RxTextView.textChangeEvents(inputEditText)
.debounce(400, TimeUnit.MILLISECONDS)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<TextViewTextChangeEvent>() {
@Override
public void onCompleted() {
log.d("onComplete");
}
@Override
public void onError(Throwable e) {
log.d("Error");
}
@Override
public void onNext(TextViewTextChangeEvent onTextChangeEvent) {
log.d(format("Searching for %s", onTextChangeEvent.text().toString()));
}
});
(4)RxPermissions
RxPermissions也是國外的大牛開發(fā)的基于RxJava的Android權限管理庫拐辽,他讓6.0以上的權限管理更加的簡單育韩,如果有適配6.0以上的手機的需求,這個庫是個不錯的選擇蛇更。下面我們來看看基本的用法瞻赶。
// 請求相機權限
RxPermissions.getInstance(this)
.request(Manifest.permission.CAMERA)
.subscribe(granted -> {
if (granted) { // 用戶同意了(在6.0之前的手機始終都為true)
//可以拍照了
} else {
//可以在這里提示用戶,或者再次請求
}
});
更多功能研究github吧
(5)RxBus
參考:http://www.reibang.com/p/ca090f6e2fe2
不多說派任,上代碼
/**
* RxBus
* Created by YoKeyword on 2015/6/17.
*/
public class RxBus {
private static volatile RxBus defaultInstance;
private final Subject<Object, Object> bus;
// PublishSubject只會把在訂閱發(fā)生的時間點之后來自原始Observable的數(shù)據(jù)發(fā)射給觀察者
public RxBus() {
bus = new SerializedSubject<>(PublishSubject.create());
}
// 單例RxBus
public static RxBus getDefault() {
if (defaultInstance == null) {
synchronized (RxBus.class) {
if (defaultInstance == null) {
defaultInstance = new RxBus();
}
}
}
return rxBus;
}
// 發(fā)送一個新的事件
public void post (Object o) {
bus.onNext(o);
}
// 根據(jù)傳遞的 eventType 類型返回特定類型(eventType)的 被觀察者
public <T> Observable<T> toObservable (Class<T> eventType) {
return bus.ofType(eventType);
// 這里感謝小鄧子的提醒: ofType = filter + cast
// return bus.filter(new Func1<Object, Boolean>() {
// @Override
// public Boolean call(Object o) {
// return eventType.isInstance(o);
// }
// }) .cast(eventType);
}
}
(6)RxJava 與 Retrofit 結合的最佳實踐
參考扔物線文章:http://gank.io/post/56e80c2c677659311bed9841
參考:
1砸逊、http://www.reibang.com/users/df40282480b4/latest_articles
2、http://gank.io/post/560e15be2dca930e00da1083#toc
3掌逛、http://www.reibang.com/p/8cf84f719188