1.rxjava介紹
RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences.
It extends the observer pattern to support sequences of data/events and adds operators that allow you to compose sequences together declaratively while abstracting away concerns about things like low-level threading, synchronization, thread-safety and concurrent data structures.
rxjava是用java實(shí)現(xiàn)Reactive Extensions莺治,可觀測(cè)的序列來(lái)組成異步的、基于事件的程序的庫(kù)帚稠。它擴(kuò)展了觀察者模式來(lái)支持?jǐn)?shù)據(jù)/事件序列谣旁,而抽象掉的事情,如低級(jí)別的線程翁锡,同步蔓挖,線程安全和并發(fā)數(shù)據(jù)結(jié)構(gòu)。(簡(jiǎn)而言之馆衔,就是做異步處理,類似于Android中的Asynctask和handler)
2.rxjava 簡(jiǎn)單使用
2.1 添加依賴
compile"io.reactivex.rxjava2:rxjava:2.0.7"
compile"io.reactivex.rxjava2:rxandroid:2.0.1"
2.2創(chuàng)建被觀察者Observable
//被觀察者 就是產(chǎn)生事件
Observable observable=Observable.create(new ObservableOnSubscribe() {
@Override
? ? public void subscribe(ObservableEmitter emitter)throws Exception {
//ObservableEmitter事件發(fā)射器
? ? ? ? emitter.onNext(1);
? ? ? ? emitter.onNext(2);
? ? ? ? emitter.onNext(3);
? ? ? ? emitter.onNext(4);//發(fā)射事件
? ? }
});
2.3創(chuàng)建觀察者Observer
//觀察者 就是處理事件
Observer observer=new Observer() {
? ? ?@Override
? ? public void onSubscribe(Disposable d) {
//這里開(kāi)始采用subscribe 鏈接
? ? ? ? Log.e("onSubscribe",""+d.toString());
? ? }
? @Override
? ? public void onNext(Integer integer) {
//處理onnext事件
? ? ? ? Log.e("onNext",""+integer);
? ? }
? ? @Override
? ? public void onError(Throwable e) {
//處理錯(cuò)誤的事件
? ? ? ? Log.e("onError",""+e.getMessage());
? ? }
? ? @Override
? ? public void onComplete() {
//完成事做出響應(yīng)
? ? ? ? Log.e("onComplete","");
? ? }
};
2.4訂閱Subscribe
//訂閱
observable.subscribe(observer);
2.5使用Disposable切斷鏈接
//觀察者
Observer observer=new Observer() {
Disposabledisposable;
? ? @Override
? ? public void onSubscribe(Disposable d) {
//這里開(kāi)始采用subscribe 鏈接
? ? ? ? Log.e("onSubscribe",""+d.toString());
? ? ? ? disposable=d;
? ? }
@Override
? ? public void onNext(Integer integer) {
//處理onnext事件
? ? ? ? Log.e("onNext",""+integer);
? ? }
@Override
? ? public void onError(Throwable e) {
//處理錯(cuò)誤的事件
? ? ? ? Log.e("onError",""+e.getMessage());
? ? ? ? disposable.dispose();
? ? }
@Override
? ? public void onComplete() {
//完成事做出響應(yīng)
? ? ? ? Log.e("onComplete","");
? ? }
};
3.切換線程
observable.subscribeOn(Schedulers.newThread())//指定被觀察者線程 即 發(fā)送事件的線程
? ? ? ? .observeOn(AndroidSchedulers.mainThread())//指定觀察者線程 即 處理事件時(shí)使用的線程
? ? ? ? .subscribe(observer);
如果observable.subscribeOn() 多次指定線程只有第一次的有效
observeOn每次切換都有效