RxJava用了一年多凹炸,一直cv,沒(méi)仔細(xì)的去學(xué)習(xí)過(guò)懒熙。拖到rxjava2都出來(lái)了丘损,所以今年的遺愿清單里加上學(xué)習(xí)rxjava2,半年過(guò)去了工扎,做下筆記徘钥。
新建了一個(gè)javalib的module,添加rxjava的依賴肢娘。
compile 'io.reactivex.rxjava2:rxjava:2.1.0'
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
寫(xiě)一個(gè)Observable呈础,然后subscribe舆驶,發(fā)現(xiàn)多了很多的訂閱者。
先試簡(jiǎn)單的猪落,
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
System.out.println("subscribe");
e.onNext(1);
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
System.out.println("accept="+integer);
}
});
和rxjava1如出一轍贞远。
訂閱者只有一個(gè)方法,那如果發(fā)射異常呢笨忌?
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
System.out.println("發(fā)射1");
e.onNext(1);
System.out.println("發(fā)射異常");
e.onError(new Throwable("我是異常"));
System.out.println("發(fā)射異常后");
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
System.out.println("accept="+integer);
}
});
結(jié)果就報(bào)異常了蓝仲。accept方法并沒(méi)有走。so官疲,如果只關(guān)心onNext袱结,可以這樣寫(xiě),但是途凫,我們實(shí)際工作種要考慮異常情況垢夹。那么
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
System.out.println("發(fā)射1");
e.onNext(1);
System.out.println("發(fā)射異常");
e.onError(new Throwable("我是異常"));
System.out.println("發(fā)射異常后");
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
System.out.println("onSubscribe");
}
@Override
public void onNext(@NonNull Integer integer) {
System.out.println("onNext" + integer);
}
@Override
public void onError(@NonNull Throwable e) {
System.out.println("onError");
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
});
---------------------------------------------
onSubscribe
發(fā)射1
onNext1
發(fā)射異常
onError
發(fā)射異常后
這些生產(chǎn)者和消費(fèi)者都是再主線程的,如果在Android里生產(chǎn)數(shù)據(jù)超時(shí),就引起ANR,那么試一下線程調(diào)度:
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
System.out.println("shot1==thread=="+Thread.currentThread().getName());
e.onNext(1);
System.out.println("shot==thread=="+Thread.currentThread().getName());
e.onError(new Throwable("shoterror"));
System.out.println("aftershoterror==thread=="+Thread.currentThread().getName());
}
}).subscribeOn(Schedulers.io()).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
System.out.println("onSubscribe==thread=="+Thread.currentThread().getName());
}
@Override
public void onNext(@NonNull Integer integer) {
System.out.println("onNext" + integer+"==thread=="+Thread.currentThread().getName());
}
@Override
public void onError(@NonNull Throwable e) {
System.out.println("onError==thread=="+Thread.currentThread().getName());
}
@Override
public void onComplete() {
System.out.println("onComplete"+Thread.currentThread().getName());
}
});
System.out.println("aaaaaaaaaaa==thread==" +Thread.currentThread().getName());
.subscribeOn(Schedulers.io())后维费,生產(chǎn)消費(fèi)都在io線程果元。
@Override
public void onSubscribe(@NonNull Disposable d) {
System.out.println("onSubscribe==thread=="+Thread.currentThread().getName());
}
這個(gè)方法是在main線程,因?yàn)檫@個(gè)是在訂閱前的線程犀盟,就是 Observable.create的時(shí)候的線程而晒。
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
System.out.println("shot1==thread==" + Thread.currentThread().getName());
e.onNext(1);
System.out.println("shot==thread==" + Thread.currentThread().getName());
e.onError(new Throwable("shoterror"));
System.out.println("aftershoterror==thread==" + Thread.currentThread().getName());
}
}).subscribeOn(Schedulers.io())
.subscribeOn(Schedulers.io()).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
System.out.println("onSubscribe==thread==" + Thread.currentThread().getName());
}
@Override
public void onNext(@NonNull Integer integer) {
System.out.println("onNext" + integer + "==thread==" + Thread.currentThread().getName());
}
@Override
public void onError(@NonNull Throwable e) {
System.out.println("onError==thread==" + Thread.currentThread().getName());
}
@Override
public void onComplete() {
System.out.println("onComplete" + Thread.currentThread().getName());
}
});
System.out.println("aaaaaaaaaaa==thread==" + Thread.currentThread().getName());
如果我.subscribeOn(Schedulers.io())變換2次線程,結(jié)果還是第一次的線程阅畴。
)
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
System.out.println("發(fā)射1==thread==" + Thread.currentThread().getName());
e.onNext(1);
System.out.println("發(fā)射2==thread==" + Thread.currentThread().getName());
e.onNext(2);
System.out.println("發(fā)射error==thread==" + Thread.currentThread().getName());
// e.onError(new Throwable("shoterror"));
System.out.println("發(fā)射error后==thread==" + Thread.currentThread().getName());
}
}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
System.out.println("onSubscribe==thread==" + Thread.currentThread().getName());
}
@Override
public void onNext(@NonNull Integer integer) {
System.out.println("onNext" + integer + "==thread==" + Thread.currentThread().getName());
}
@Override
public void onError(@NonNull Throwable e) {
System.out.println("onError==thread==" + Thread.currentThread().getName());
}
@Override
public void onComplete() {
System.out.println("onComplete" + Thread.currentThread().getName());
}
});
.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())倡怎,subscribeOn是生產(chǎn)發(fā)射的線程,observeOn決定的是接收消費(fèi)的線程贱枣。
如果生產(chǎn)2個(gè)數(shù)據(jù)监署,打印出來(lái)是2個(gè)都發(fā)射后才消費(fèi)。順序是這樣的嗎纽哥?驗(yàn)證:
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
for (int i = 0; i <1000 ; i++) {
System.out.println(i+"要發(fā)射==thread==" + Thread.currentThread().getName());
e.onNext(i);
}
}
})
確實(shí)如此钠乏,全部數(shù)據(jù)發(fā)射完后才接受,那么去掉線程調(diào)度呢
生產(chǎn)一個(gè)消費(fèi)一個(gè)春塌。