Rx指的是ReactiveX,也是Reactive Extensions的縮寫泞莉,是一個使用可觀察數(shù)據(jù)流進行異步編程的編程接口馍管,結合了觀察者模式掸掏、迭代器模式和函數(shù)式編程的精華茁影,是一種編程思想的突破,它影響了許多其它的程序庫和框架以及編程語言
RxJava里最核心的兩個東西Observable和Subscriber丧凤,Observable指的是被觀察者募闲、事件源,Subscriber指的是被觀察者愿待、訂閱者浩螺、用戶。
舉個簡單的例子仍侥,像微信里面的服務號和公眾號要出,我們關注了某個服務號或公眾號,只要發(fā)出消息农渊,我們都能同時收到患蹂。這里的服務號和公眾號就是Observable,Subscriber指的就是我們用戶砸紊。
看起來RxJava蠻像觀察者模式传于,不過有一點不同Observable如果沒有Subscriber不會發(fā)出任何事件
下面就讓我們一起進入RxJava的世界吧
參考資料學習網(wǎng)址
1、Grokking RxJava
2醉顽、github地址
3格了、ReactiveX/RxJava文檔中文版
4、ReactiveX官網(wǎng)
5徽鼎、拋物線博客
配置環(huán)境
在builde.gradle里面添加
compile 'io.reactivex:rxandroid:1.2.1'
compile 'io.reactivex:rxjava:1.1.6'
基本實現(xiàn)
1、Hello world
創(chuàng)建Observable對象弹惦,通過Observable.create()來創(chuàng)建否淤,sub.onNext()可以發(fā)出信息,最后調用sub.onCompleted()來完成棠隐,還有一個sub.onError()方法是出現(xiàn)異常提供的一個方法石抡,一旦調用了sub.onError()或者sub.onCompleted(),后面的邏輯代碼都不會執(zhí)行助泽。
Observable<String> myObservable = Observable.create(
new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> sub) {
sub.onNext("Hello, world1!");
sub.onNext("Hello, world2!");
sub.onCompleted();
}
}
);
創(chuàng)建Subscriber對象啰扛,實例化Subscriber抽象類,實現(xiàn)onNext嗡贺、onCompleted隐解、onError三個方法,依次響應Observable對象里面的sub.onNext()诫睬、sub.onCompleted()和sub.onError()方法
Subscriber<String> mySubscriber = new Subscriber<String>() {
@Override
public void onNext(String s) {
Log.d("rxjava","onNext="+s);
}
@Override
public void onCompleted() {
Log.d("rxjava","onCompleted");
}
@Override
public void onError(Throwable e) {
Log.d("rxjava","onError="+e);
}
};
最后Subscriber訂閱Observable煞茫,這里Observable可以被多個Subscriber訂閱
myObservable.subscribe(mySubscriber);
myObservable.subscribe(mySubscriber2);
myObservable.subscribe(mySubscriber3);
2、簡化代碼
上面的代碼可以轉變?yōu)檫@樣:
Observable<String> myObservable = Observable.just("Hello, world1!","Hello, world2!");
Action1<String> onNextAction = new Action1<String>() {
@Override
public void call(String s) {
System.out.println(s);
}
};
Action1<Throwable> onErrorAction = new Action1<Throwable>() {
@Override
public void call(Throwable e) {
}
};
Action0 onCompletedAction = new Action0() {
@Override
public void call() {
}
};
// 自動創(chuàng)建 Subscriber 续徽,并使用 onNextAction 來定義 onNext()
myObservable.subscribe(onNextAction);
// 自動創(chuàng)建 Subscriber 蚓曼,并使用 onNextAction 和 onErrorAction 來定義 onNext() 和 onError()
myObservable.subscribe(onNextAction,onErrorAction);
// 自動創(chuàng)建 Subscriber ,并使用 onNextAction钦扭、 onErrorAction 和 onCompletedAction 來定義 onNext()纫版、 onError() 和 onCompleted()
myObservable.subscribe(onNextAction,onErrorAction,onCompletedAction);
Observable.just("Hello, world1!","Hello, world2!")
會直接創(chuàng)建Observable對象客情,
然后依次調用sub.onNext("Hello, world1!"); sub.onNext("Hello, world2!"); sub.onCompleted();
一般onErrorAction和onCompletedAction我們可以不用管理 最后代碼可優(yōu)化成:
Observable.just("Hello, world1!","Hello, world2!")
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
System.out.println(s);
}
});
java8語法lambda可以寫成
Observable.just("Hello, world!")
.subscribe(s -> System.out.println(s));
3其弊、操作符map
簡單的一個例子,傳遞一個字符串裹匙,然后獲取它的長度瑞凑,最后打印出來,在RxJava里我們可以這樣實現(xiàn):
Observable.just("Hello, world!")
.map(new Func1<String,Integer>(){
@Override
public Integer call(String s){
return s.length();
}
})
.subscribe(new Action1<Integer>(){
@Override
public void call(Integer i){
System.out.println(Integer.toString(i));
}
});
這里map方法需要結合Func1接口來使用概页,F(xiàn)unc1里第一個String類型是對應接收just類型次伶,第二個Integer類型是返回值,供下一個對象接收
4更胖、操作符from
如果我們要循環(huán)遍歷一個數(shù)組或者集合预茄,在RxJava里面可以使用from操作符來實現(xiàn)
String[] items = { "0", "1", "2", "3", "4", "5" };
Observable.from(items)
.subscribe(new Action1<String>(){
@Override
public void call(String str){
//依次遍歷打印items
System.out.println(str);
}
});
操作符from會循環(huán)遍歷方法參數(shù)里面的數(shù)組或者集合,然后依次調用onNext()方法
5项鬼、操作符flatMap
如果我們要實現(xiàn)多重循環(huán)哑梳,可以使用flatMap操作符來實現(xiàn)
String[][] itemArray = {{"1","2","3"},{"4","5","6"},{"7","8","9"}};
Observable.from(itemArray)
.flatMap(new Func1<String[],Observable<String>>(){
@Override
public Observable<String> call(String[] s){
return Observable.from(s);
}
})
.subscribe(new Subscriber<String>(){
@Override
public void onCompleted(){
}
@Override
public void onError(Throwable e){
}
@Override
public void onNext(String str){
Log.d("rxJava","str="+str);
}
});
上面代碼執(zhí)行后,依次打印str=1到9绘盟,這里要注意的是Func1第二個參數(shù)類型是Observable<?>
6鸠真、操作符filter
字面理解應該就知道它的作用了,過濾不需要的數(shù)據(jù)
Observable.just(1, 2, 3, 4, 5)
.filter(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer item) {
return( item < 4 );
}
})
.subscribe(new Subscriber<Integer>() {
@Override
public void onNext(Integer item) {
System.out.println("Next: " + item);
}
@Override
public void onError(Throwable error) {
System.err.println("Error: " + error.getMessage());
}
@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}
});
上面代碼執(zhí)行結果龄毡,只會打印1到3吠卷,后面4和5不符合,則過濾
7沦零、操作符merge
合并多個Observables
Observable<Integer> odds = Observable.just(1, 3, 5);
Observable<Integer> evens = Observable.just(2, 4, 6);
Observable.merge(odds, evens)
.subscribe(new Subscriber<Integer>() {
@Override
public void onNext(Integer item) {
System.out.println("Next: " + item);
}
@Override
public void onError(Throwable error) {
System.err.println("Error: " + error.getMessage());
}
@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}
});
上面代碼執(zhí)行后結果:依次打印的是1祭隔、3、5路操、2疾渴、4、6
* Javadoc: merge(Iterable)
* Javadoc: merge(Iterable,int)
* Javadoc: merge(Observable[])
* Javadoc: merge(Observable,Observable) (接受二到九個Observable)
除了傳遞多個Observable給merge屯仗,你還可以傳遞一個Observable列表List搞坝,數(shù)組,甚至是一個發(fā)射Observable序列的Observable魁袜,merge將合并它們的輸出作為單個Observable的輸出
8瞄沙、操作符timer和interval
5秒后打印數(shù)據(jù)己沛,第一個參數(shù)是多少時間執(zhí)行,第二個參數(shù)是時間單位
Observable.timer(5,TimeUnit.SECONDS)
.subscribe(new Action1<Long>(){
@Override
public void call(Long aLong){
Log.d("rxJava","5秒后執(zhí)行"+aLong);
}
});
1秒后每5秒循環(huán)打印距境,第一個參數(shù)為多少時間后執(zhí)行申尼,第二個為沒多少時間執(zhí)行,第三個是時間單位
Observable.interval(1,5,TimeUnit.SECONDS)
.subscribe(new Action1<Long>(){
@Override
public void call(Long aLong){
Log.d("rxJava","1秒后每5秒循環(huán)執(zhí)行"+aLong);
}
});
這里aLong返回的值應該是打印的次數(shù)
9垫桂、線程切換
主要通過subscribeOn()和 observeOn()來實現(xiàn)
subscribeOn()
是控制Observable.OnSubscribe的線程切換师幕, subscribeOn()的線程切換發(fā)生在 OnSubscribe中,即在它通知上一級 OnSubscribe時诬滩,這時事件還沒有開始發(fā)送霹粥,因此控制可以從事件發(fā)出的開端就造成影響
observeOn()
則是控制Subscriber的線程切換,observeOn() 的線程切換則發(fā)生在它內建的 Subscriber中疼鸟,即發(fā)生在它即將給下一級 Subscriber發(fā)送事件時后控,因此控制的是它后面的線程
Schedulers.io()
:IO線程
AndroidSchedulers.mainThread()
:主線程
Schedulers.newThread()
:新開線程
Observable.just(1, 2, 3, 4) // IO 線程,由 subscribeOn() 指定
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.newThread())
.map(mapOperator) // 新線程空镜,由 observeOn() 指定
.observeOn(Schedulers.io())
.map(mapOperator2) // IO 線程浩淘,由 observeOn() 指定
.observeOn(AndroidSchedulers.mainThread)
.subscribe(subscriber); // Android 主線程,由 observeOn() 指定
10吴攒、防止按鈕重復點擊
1秒鐘內只會執(zhí)行第一次點擊
RxView.clicks(view)
.throttleFirst(1,TimeUnit.SECONDS)
.subscribe(new Observer<Object>(){
@Override
public void onCompleted(){
}
@Override
public void onError(Throwable e){
}
@Override
public void onNext(Object o){
}
});
RxView是RxBinding庫里的對象张抄,需要添加環(huán)境
compile 'com.jakewharton.rxbinding:rxbinding:0.4.0'
throttleFirst():在每次事件觸發(fā)后的一定時間間隔內丟棄新的事件
11、RxBus代替EventBus
public class RxBus {
// 主題
private final Subject<Object, Object> bus;
// PublishSubject只會把在訂閱發(fā)生的時間點之后來自原始Observable的數(shù)據(jù)發(fā)射給觀察者
private RxBus() {
bus = new SerializedSubject<>(PublishSubject.create());
}
public static RxBus get() {
return RxBusHolder.sInstance;
}
public void unSubscribe(CompositeSubscription compositeSubscription){
if (compositeSubscription != null && !compositeSubscription.isUnsubscribed())
compositeSubscription.unsubscribe();
}
private static class RxBusHolder {
private static final RxBus sInstance = new RxBus();
}
// 提供了一個新的事件
public void post(Object o) {
bus.onNext(o);
}
// 根據(jù)傳遞的 eventType 類型返回特定類型(eventType)的 被觀察者
public <T> Observable<T> toObserverable(Class<T> eventType) {
return bus.ofType(eventType);
}
}
RxBus提供了三個方法toObserverable()獲取Observable對象洼怔、unSubscribe()解除訂閱署惯、post()發(fā)送數(shù)據(jù)。
在MainActivity中實現(xiàn)
public class MainActivity extends AppCompatActivity {
private CompositeSubscription allSubscription = new CompositeSubscription();
@Override
protected void onCreate(Bundle savedInstanceState){
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
...
initRxBus();
}
private void initRxBus(){
//注冊訂閱RxBus并添加到CompositeSubscription,在onDestroy()里可以統(tǒng)一解除訂閱
allSubscription.add(RxBus.get().toObserverable(OneEvent.class).subscribe(this::response));
}
@Override
protected void onDestroy(){
super.onDestroy();
//解除訂閱
RxBus.get().unSubscribe(allSubscription);
}
/**
* RxBus響應接收數(shù)據(jù)方法
* @param event
*/
public void response(OneEvent event) {
Toast.makeText(getApplicationContext(),event.msg,Toast.LENGTH_LONG).show();
}
class OneEvent {
String msg;
public OneEvent(String msg) {
this.msg = msg;
}
}
}
最后在需要發(fā)送數(shù)據(jù)的地方調用镣隶,跟EventBus還是滿相似的
RxBus.get().post(new OneEvent("hello bus"));
12极谊、結合Retrofit使用
這里我就直接拿介紹Retrofit那篇文章的例子在這里說明下
@POST("query")
Observable<PostQueryInfo> searchRx(@Query("type") String type, @Query("postid") String postid);
Retrofit retrofit = new Retrofit.Builder()
.baseUrl("http://www.kuaidi100.com/")
//添加數(shù)據(jù)解析ConverterFactory
.addConverterFactory(GsonConverterFactory.create())
//添加RxJava
.addCallAdapterFactory(RxJavaCallAdapterFactory.create())
.build();
GitHubService apiService = retrofit.create(GitHubService.class);
apiService.searchRx("yuantong","500379523313")
//訪問網(wǎng)絡切換異步線程
.subscribeOn(Schedulers.io())
//響應結果處理切換成主線程
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<PostQueryInfo>() {
@Override
public void onCompleted() {
//請求結束回調
}
@Override
public void onError(Throwable e) {
//錯誤回調
e.printStackTrace();
}
@Override
public void onNext(PostQueryInfo postQueryInfo) {
//成功結果返回
Log.e("APP",postQueryInfo.getNu());
}
});
Retrofit支持RxJava可以返回Observable對象,所以我們就可以直接拿來用安岂,很方便吧
.subscribeOn(Schedulers.io()
請求網(wǎng)絡切換成IO線程
.observeOn(AndroidSchedulers.mainThread())
切換成主線程更新UI
onNext(PostQueryInfo postQueryInfo)
成功回調解析數(shù)據(jù)
public void onError(Throwable e)
異郴晨幔回調拋出異常
Tips
配置java8 lambda環(huán)境
1、在Module中的build.gradle中添加嗜闻,然后Make Module 'xxx'
apply plugin: 'me.tatarka.retrolambda'
buildscript {
repositories {
mavenCentral()
}
dependencies {
classpath 'me.tatarka:gradle-retrolambda:3.2.5'
}
}
repositories {
mavenCentral()
}
android {
compileOptions {
sourceCompatibility JavaVersion.VERSION_1_8
targetCompatibility JavaVersion.VERSION_1_8
}
...
}
2、測試 下面代碼不報紅桅锄,配置環(huán)境OK
textView.setOnClickListener(v ->
Toast.makeText(getApplicationContext(),"lambda",Toast.LENGTH_LONG).show()
);
學到最后其實RxJava只是剛入了個門琉雳,還有很多很多需要我們去學習的,這里只是跟大家一起看了看RxJava的世界是什么樣子的友瘤,如果要學好翠肘,還是要多去看看api文檔,多練習才能真正的熟練使用它辫秧。
革命尚未成功束倍,我們仍需努力