Demo地址
本Demo旨在幫助從未接觸過(guò)RxJava的同學(xué)直接入坑RxJava2组去,如絲般順滑男窟,萬(wàn)水千山總是情,留個(gè)star行不行蒿囤?
RxJava & RxAndroid (2.0版)#
定義
RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.
一個(gè)在 Java VM 上使用可觀測(cè)的序列來(lái)組成異步的客们、基于事件的程序的庫(kù)
初學(xué)者如果看到這個(gè)準(zhǔn)確但晦澀的定義肯定一臉懵逼,不過(guò)我們我們只要把握重點(diǎn)即可:
- 異步
- 基于事件
- 觀察者模式
RxAndroid - Android specific bindings for RxJava 2.This module adds the minimum classes to RxJava that make writing reactive components in Android applications easy and hassle-free.
RxAndroid在RxJava的基礎(chǔ)上添加了最少的類使得開發(fā)Android應(yīng)用中的響應(yīng)式組件更加的容易和自由
特點(diǎn)
簡(jiǎn)潔材诽,并不是指代碼量上的那種簡(jiǎn)潔底挫,而是邏輯上的簡(jiǎn)潔,隨著程序邏輯變得越來(lái)越復(fù)雜脸侥,它依然能夠保持簡(jiǎn)潔建邓。
Github
Hello world
添加依賴
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
compile 'io.reactivex.rxjava2:rxjava:2.0.1'
簡(jiǎn)單版本
//簡(jiǎn)單版本
private void helloWorldSimple() {
//創(chuàng)建消費(fèi)者,消費(fèi)者接受一個(gè)String類型的事件
Consumer<String> consumer = new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, s);
}
};
//被觀察者發(fā)出Hello World, 并且指定該事件的消費(fèi)者為consumer
Observable.just("Hello World").subscribe(consumer);
}
運(yùn)行結(jié)果
D/MainActivity: Hello World
復(fù)雜版本
private void helloWorldComplex() {
//Observer可以看做Consumer的完整版
Observer<String> observer = new Observer<String>() {
//當(dāng)Observable調(diào)用subscribe方法時(shí)會(huì)回調(diào)該方法
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe: ");
}
//onSubscribe方法后調(diào)用
@Override
public void onNext(String value) {
Log.d(TAG, "onNext: " + value);
}
//這里沒有出錯(cuò)睁枕,沒有被調(diào)用
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: ");
}
//onNext之后調(diào)用
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
};
//被觀察者發(fā)出Hello World, 并且指定該事件的觀察者為observer
Observable.just("Hello World").subscribe(observer);
}
運(yùn)行結(jié)果
D/MainActivity: onSubscribe:
D/MainActivity: onNext: Hello World
D/MainActivity: onComplete:
變態(tài)版本
private void helloWorldPlus() {
//創(chuàng)建一個(gè)觀察者
Observer<String> observer = new Observer<String>() {
//當(dāng)Observable調(diào)用subscribe方法時(shí)會(huì)回調(diào)該方法
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe: ");
}
//onSubscribe方法后調(diào)用
@Override
public void onNext(String value) {
Log.d(TAG, "onNext: " + value);
}
//這里沒有出錯(cuò)官边,沒有被調(diào)用
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: ");
}
//onNext之后調(diào)用
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
};
//創(chuàng)建一個(gè)Observable
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("Hello World");//會(huì)調(diào)用到觀察者的onNext
e.onComplete();//會(huì)調(diào)用到觀察者的onComplete
}
});
observable.subscribe(observer);
}
運(yùn)行結(jié)果
D/MainActivity: onSubscribe:
D/MainActivity: onNext: Hello World
D/MainActivity: onComplete:
filter操作符
你早上去吃早餐沸手,師傅是被觀察者,說(shuō)咱這有包子注簿,饅頭契吉,腸粉,春卷诡渴,餃子捐晶,炒粉,你仔細(xì)想了想玩徊,發(fā)現(xiàn)你是最喜歡餃子的租悄,所以把其他的都排除掉,于是你就吃到了餃子恩袱。
private void filter() {
//把Consumer可以看做精簡(jiǎn)版的Observer
Consumer<String> consumer = new Consumer<String>() {
//accept可以簡(jiǎn)單的看做onNext
@Override
public void accept(String s) throws Exception {
Log.d(TAG, "accept: " + s);//這里只能吃上餃子
}
};
Observable.just("包子", "饅頭", "腸粉", "春卷", "餃子", "炒粉")
.filter(new Predicate<String>() {
@Override
public boolean test(String s) throws Exception {
Log.d(TAG, "test: " + s);
return s.equals("餃子");//只允許餃子通過(guò)測(cè)試
}
})
.subscribe(consumer);
}
運(yùn)行結(jié)果
D/MainActivity: test: 包子
D/MainActivity: test: 饅頭
D/MainActivity: test: 腸粉
D/MainActivity: test: 春卷
D/MainActivity: test: 餃子
D/MainActivity: accept: 餃子
D/MainActivity: test: 炒粉
map操作符
map操作符能夠完成數(shù)據(jù)類型的轉(zhuǎn)換泣棋。 以下代碼展示了一個(gè)Student到Developer的轉(zhuǎn)換。
private void map() {
Observer<Developer> observer = new Observer<Developer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe: ");
}
//觀察者接收到一個(gè)Developer
@Override
public void onNext(Developer value) {
Log.d(TAG, "onNext: " + value.toString());
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: ");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
};
Student student = new Student();
student.setName("Leon");
student.setAge(18);
//map操作符畔塔,從Student類型轉(zhuǎn)換成Developer
Observable.just(student).map(new Function<Student, Developer>() {
@Override
public Developer apply(Student student) throws Exception {
Log.d(TAG, "apply: " + student.toString());
Developer developer = new Developer();
developer.setName(student.getName());
developer.setAge(student.getAge());
developer.setSkill("Android");
return developer;
}
}).subscribe(observer);
}
運(yùn)行結(jié)果
D/MainActivity: onSubscribe:
D/MainActivity: apply: Student{name='Leon', age=18}
D/MainActivity: onNext: Developer{name='Leon', age=18, skill='Android'}
D/MainActivity: onComplete:
flatmap操作符
flatmap能夠鏈?zhǔn)降赝瓿蓴?shù)據(jù)類型的轉(zhuǎn)換和加工潭辈。
遍歷一個(gè)學(xué)校中所有班級(jí)所有學(xué)生
private void flatmapClassToStudent() {
Observable.fromIterable(new School().getClasses())
//輸入是Class類型,輸出是ObservableSource<Student>類型
.flatMap(new Function<Class, ObservableSource<Student>>() {
//輸入是Class類型澈吨,輸出是ObservableSource<Student>類型
@Override
public ObservableSource<Student> apply(Class aClass) throws Exception {
Log.d(TAG, "apply: " + aClass.toString());
return Observable.fromIterable(aClass.getStudents());
}
}).subscribe(
new Observer<Student>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe: ");
}
@Override
public void onNext(Student value) {
Log.d(TAG, "onNext: " + value.toString());
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
}
運(yùn)行結(jié)果
D/MainActivity: onSubscribe:
D/MainActivity: apply: Class0
D/MainActivity: onNext: Student{name='Class0_0', age=18}
D/MainActivity: onNext: Student{name='Class0_1', age=18}
D/MainActivity: onNext: Student{name='Class0_2', age=18}
D/MainActivity: apply: Class1
D/MainActivity: onNext: Student{name='Class1_0', age=18}
D/MainActivity: onNext: Student{name='Class1_1', age=18}
D/MainActivity: onNext: Student{name='Class1_2', age=18}
D/MainActivity: apply: Class2
D/MainActivity: onNext: Student{name='Class2_0', age=18}
D/MainActivity: onNext: Student{name='Class2_1', age=18}
D/MainActivity: onNext: Student{name='Class2_2', age=18}
遍歷一個(gè)學(xué)校所有班級(jí)所有組的所有學(xué)生
private void flatmapClassToGroupToStudent() {
Observable.fromIterable(new School().getClasses())
//輸入是Class類型熊锭,輸出是ObservableSource<Group>類型
.flatMap(new Function<Class, ObservableSource<Group>>() {
@Override
public ObservableSource<Group> apply(Class aClass) throws Exception {
Log.d(TAG, "apply: " + aClass.toString());
return Observable.fromIterable(aClass.getGroups());
}
})
//輸入類型是Group,輸出類型是ObservableSource<Student>類型
.flatMap(new Function<Group, ObservableSource<Student>>() {
@Override
public ObservableSource<Student> apply(Group group) throws Exception {
Log.d(TAG, "apply: " + group.toString());
return Observable.fromIterable(group.getStudents());
}
})
.subscribe(
new Observer<Student>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe: ");
}
@Override
public void onNext(Student value) {
Log.d(TAG, "onNext: " + value.toString());
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
}
運(yùn)行結(jié)果
D/MainActivity: onSubscribe:
D/MainActivity: apply: Class0
D/MainActivity: apply: Group0
D/MainActivity: onNext: Student{name='Group0_0', age=18}
D/MainActivity: onNext: Student{name='Group0_1', age=18}
D/MainActivity: onNext: Student{name='Group0_2', age=18}
D/MainActivity: apply: Group1
D/MainActivity: onNext: Student{name='Group1_0', age=18}
D/MainActivity: onNext: Student{name='Group1_1', age=18}
D/MainActivity: onNext: Student{name='Group1_2', age=18}
D/MainActivity: apply: Group2
D/MainActivity: onNext: Student{name='Group2_0', age=18}
D/MainActivity: onNext: Student{name='Group2_1', age=18}
D/MainActivity: onNext: Student{name='Group2_2', age=18}
D/MainActivity: apply: Class1
D/MainActivity: apply: Group0
D/MainActivity: onNext: Student{name='Group0_0', age=18}
D/MainActivity: onNext: Student{name='Group0_1', age=18}
D/MainActivity: onNext: Student{name='Group0_2', age=18}
D/MainActivity: apply: Group1
D/MainActivity: onNext: Student{name='Group1_0', age=18}
D/MainActivity: onNext: Student{name='Group1_1', age=18}
D/MainActivity: onNext: Student{name='Group1_2', age=18}
D/MainActivity: apply: Group2
D/MainActivity: onNext: Student{name='Group2_0', age=18}
D/MainActivity: onNext: Student{name='Group2_1', age=18}
D/MainActivity: onNext: Student{name='Group2_2', age=18}
D/MainActivity: apply: Class2
D/MainActivity: apply: Group0
D/MainActivity: onNext: Student{name='Group0_0', age=18}
D/MainActivity: onNext: Student{name='Group0_1', age=18}
D/MainActivity: onNext: Student{name='Group0_2', age=18}
D/MainActivity: apply: Group1
D/MainActivity: onNext: Student{name='Group1_0', age=18}
D/MainActivity: onNext: Student{name='Group1_1', age=18}
D/MainActivity: onNext: Student{name='Group1_2', age=18}
D/MainActivity: apply: Group2
D/MainActivity: onNext: Student{name='Group2_0', age=18}
D/MainActivity: onNext: Student{name='Group2_1', age=18}
D/MainActivity: onNext: Student{name='Group2_2', age=18}
線程調(diào)度
關(guān)于RxJava的線程調(diào)度蒸眠,初學(xué)者只需要掌握兩個(gè)api就夠夠的啦前翎。
subscribeOn
指定Observable在一個(gè)指定的線程調(diào)度器上創(chuàng)建。只能指定一次桑阶,如果指定多次則以第一次為準(zhǔn)
observeOn
指定在事件傳遞柏副,轉(zhuǎn)換,加工和最終被觀察者接受發(fā)生在哪一個(gè)線程調(diào)度器蚣录「钤瘢可指定多次,每次指定完都在下一步生效萎河。
常用線程調(diào)度器類型
- Schedulers.single() 單線程調(diào)度器荔泳,線程可復(fù)用
- Schedulers.newThread() 為每個(gè)任務(wù)創(chuàng)建新的線程
- Schedulers.io() 處理io密集型任務(wù),內(nèi)部是線程池實(shí)現(xiàn)虐杯,可自動(dòng)根據(jù)需求增長(zhǎng)
- Schedulers.computation() 處理計(jì)算任務(wù)玛歌,如事件循環(huán)和回調(diào)任務(wù)
- AndroidSchedulers.mainThread() Android主線程調(diào)度器
示例
private void scheduleThreads() {
Observable.create(
new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
Log.d(TAG, "subscribe: " + Thread.currentThread().getName());
e.onNext("Hello Leon Fan");
e.onComplete();
}
})
//指定subscribe方法在io線程池中調(diào)用
.subscribeOn(Schedulers.io())
//指定onNext方法 onComplete的方法在新建的線程中調(diào)用
.observeOn(Schedulers.newThread())
.subscribe(
new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe: " + Thread.currentThread().getName());
}
@Override
public void onNext(String value) {
Log.d(TAG, "onNext: " + Thread.currentThread().getName() + " " + value);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: " + Thread.currentThread().getName());
}
});
}
運(yùn)行結(jié)果
D/MainActivity: onSubscribe: main
D/MainActivity: subscribe: RxCachedThreadScheduler-4
D/MainActivity: onNext: RxNewThreadScheduler-1 Hello Leon Fan
D/MainActivity: onComplete: RxNewThreadScheduler-1
如果將示例中的.observeOn(Schedulers.newThread())改成AndroidSchedulers.mainThread(),則運(yùn)行結(jié)果如下:
D/MainActivity: onSubscribe: main
D/MainActivity: subscribe: RxCachedThreadScheduler-5
D/MainActivity: onNext: main Hello Leon Fan
D/MainActivity: onComplete: main
RxJava與Retrofit集成
我們做一個(gè)Demo通過(guò)網(wǎng)絡(luò)請(qǐng)求獲取豆瓣電影Top10的列表來(lái)展示RxJava和Retrofit的集成的姿勢(shì)擎椰。
Retrofit集成
添加依賴
compile 'com.squareup.retrofit2:retrofit:2.1.0'
compile 'com.squareup.retrofit2:converter-gson:2.1.0'
//compile 'com.squareup.retrofit2:adapter-rxjava:2.1.0' 官方adapter僅支持rxjava1.0
compile 'com.jakewharton.retrofit:retrofit2-rxjava2-adapter:1.0.0'
創(chuàng)建網(wǎng)絡(luò)接口
public interface Api {
@GET("top250")
Observable<MovieBean> listTop250(@Query("start") int start, @Query("count") int count);
}
實(shí)現(xiàn)Api
public class MovieRetrofit {
private static MovieRetrofit sMovieRetrofit;
private final Api mApi;
public static MovieRetrofit getInstance() {
if (sMovieRetrofit == null) {
synchronized (MovieRetrofit.class) {
if (sMovieRetrofit == null) {
sMovieRetrofit = new MovieRetrofit();
}
}
}
return sMovieRetrofit;
}
private MovieRetrofit() {
Retrofit retrofit = new Retrofit.Builder().baseUrl("https://api.douban.com/v2/movie/")
.addConverterFactory(GsonConverterFactory.create())
.addCallAdapterFactory(RxJava2CallAdapterFactory.create())
.build();
mApi = retrofit.create(Api.class);
}
public Api getApi() {
return mApi;
}
}
發(fā)送網(wǎng)絡(luò)請(qǐng)求刷新列表
<!--添加網(wǎng)絡(luò)權(quán)限-->
<uses-permission android:name="android.permission.INTERNET"/>
Observable<MovieBean> movieBeanObservable = MovieRetrofit.getInstance().getApi().listTop250(0, 10);
movieBeanObservable.subscribeOn(Schedulers.io())//在io線程池中執(zhí)行map
//將網(wǎng)絡(luò)的結(jié)果轉(zhuǎn)換成我們要的電影名的列表
.map(new Function<MovieBean, List<String>>() {
@Override
public List<String> apply(MovieBean movieBean) throws Exception {
List<String> array = new ArrayList<String>();
for (int i = 0; i < movieBean.getSubjects().size(); i++) {
String title = movieBean.getSubjects().get(i).getTitle();
array.add(title);
}
return array;
}
})
.observeOn(AndroidSchedulers.mainThread())//在主線程中執(zhí)行onNext
.subscribe(new Observer<List<String>>() {
......
@Override
public void onNext(List<String> value) {
ArrayAdapter<String> arrayAdapter = new ArrayAdapter<String>(MovieListActivity.this, android.R.layout.simple_list_item_1, value);
setListAdapter(arrayAdapter);
}
......
});
參考
本人旨在幫助從未接觸過(guò)RxJava的童鞋直接入坑RxJava2.0支子,更多使用姿勢(shì)請(qǐng)自行參考其他資料學(xué)習(xí)。