轉(zhuǎn)載請(qǐng)標(biāo)明地址 QuincySx:[http://www.reibang.com/p/d9da64774f7b]
近期用到 RxJava ,線程切換的時(shí)候出了點(diǎn)小插曲仁讨,首先先上理論,在上實(shí)踐咐旧,不喜理論可跳過掂骏,此篇文章適合會(huì)使用 RxJava 的人群,如果還沒有接觸過可以自學(xué)過后再來讀這篇文章,這篇文章這幾個(gè)例子其實(shí)代碼都是基本都是一樣的眯娱,我也不知道這樣寫是不是更清晰
理論
總所周知 RxJava 在切換線程時(shí)用到了兩個(gè)方法 subscribeOn()
和 observeOn()
下面來分別解釋一下這兩個(gè)方法
- subscribeOn() : 影響的是最開始的被觀察者所在的線程礁苗。當(dāng)使用多個(gè) subscribeOn() 的時(shí)候,只有第一個(gè) subscribeOn() 起作用徙缴;
- observeOn() : 影響的是跟在后面的操作(指定觀察者運(yùn)行的線程)试伙。所以如果想要多次改變線程,可以多次使用 observeOn于样;
我之前還看到有人說 subscribeOn()
必須在 observeOn()
的前面疏叨,不過經(jīng)過我測(cè)試他兩個(gè)的位置并沒有什么聯(lián)系,就如上面所說 第一次出現(xiàn) subscribeOn()
的地方是有效的穿剖,其他的無效
實(shí)踐
實(shí)踐一下蚤蔓,先來個(gè)基本的栗子
Observable
.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
Log.e(TAG, "===create: " + Thread.currentThread().getName());
subscriber.onNext("1");
}
})
.map(new Func1<String, Integer>() {
@Override
public Integer call(String s) {
Log.e(TAG, "===String -> Integer: " + Thread.currentThread().getName());
return Integer.valueOf(s);
}
})
.flatMap(new Func1<Integer, Observable<String>>() {
@Override
public Observable<String> call(final Integer integer) {
Log.e(TAG, "===Integer->Observable: " + Thread.currentThread().getName());
return Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
Log.e(TAG, "===Observable<String> call: " + Thread.currentThread().getName());
for (int i = 0; i < integer; i++) {
subscriber.onNext(i + "");
}
subscriber.onCompleted();
}
});
}
})
.map(new Func1<String, Long>() {
@Override
public Long call(String s) {
Log.e(TAG, "===String->Long: " + Thread.currentThread().getName());
return Long.parseLong(s);
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Long>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Long aLong) {
Log.e(TAG, "===onNext: " + Thread.currentThread().getName());
}
});
這個(gè)例子呢就是簡(jiǎn)單的將 String 轉(zhuǎn)為 Integer 然后轉(zhuǎn)換發(fā)射源 發(fā)射 String 在然后 將 String 轉(zhuǎn)換為 Long 然后打印
這個(gè)例子呢沒有什么實(shí)際意義,只作為個(gè)示例
有沒有看到
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
這兩句代碼 我切換了一下線程 糊余,接下來看一下我的運(yùn)行結(jié)果
===create: RxIoScheduler-2
===String -> Integer: RxIoScheduler-2
===Integer->Observable: RxIoScheduler-2
===Observable<String> call: RxIoScheduler-2
===String->Long: RxIoScheduler-2
===onNext: main
接下來解釋一下秀又,因?yàn)?code>subscribeOn(Schedulers.io())它指定了最開始的被觀察者所在的線程所以后面的操作都是根據(jù)最開始的被觀察者制定的線程運(yùn)行的,又因?yàn)?.observeOn(AndroidSchedulers.mainThread())
它指定了都面的操作符使用主線程運(yùn)行贬芥。
接下來再寫一個(gè)多 subscribeOn()
observeOn()
的情況
Observable
.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
Log.e(TAG, "===create: " + Thread.currentThread().getName());
subscriber.onNext("1");
}
})
.subscribeOn(AndroidSchedulers.mainThread())
.observeOn(Schedulers.io())
.map(new Func1<String, Integer>() {
@Override
public Integer call(String s) {
Log.e(TAG, "===String -> Integer: " + Thread.currentThread().getName());
return Integer.valueOf(s);
}
})
.observeOn(AndroidSchedulers.mainThread())
.flatMap(new Func1<Integer, Observable<String>>() {
@Override
public Observable<String> call(final Integer integer) {
Log.e(TAG, "===Integer->Observable: " + Thread.currentThread().getName());
return Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
Log.e(TAG, "===Observable<String> call: " + Thread.currentThread().getName());
for (int i = 0; i < integer; i++) {
subscriber.onNext(i + "");
}
subscriber.onCompleted();
}
});
}
})
.observeOn(Schedulers.io())
.map(new Func1<String, Long>() {
@Override
public Long call(String s) {
Log.e(TAG, "===String->Long: " + Thread.currentThread().getName());
return Long.parseLong(s);
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Long>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Long aLong) {
Log.e(TAG, "===onNext: " + Thread.currentThread().getName());
}
});
運(yùn)行結(jié)果
===create: main
===String -> Integer: RxIoScheduler-4
===Integer->Observable: main
===Observable<String> call: main
===String->Long: RxIoScheduler-3
===onNext: main
下面應(yīng)該不用我解釋就能知道結(jié)果為什么是這樣的
簡(jiǎn)單解釋一下 因?yàn)橛袃蓚€(gè)subscribeOn()
所以取第一個(gè)吐辙,所以最開始的被觀察者所在的線程為主線程,接著使用observeOn()
使用制定后面的操作符為 io 線程蘸劈,接著observeOn()
又指定后面的操作符為主線程...以此類推不再贅述
接下來到了我要說的重點(diǎn)了也是我受到迷惑的地方昏苏,我就簡(jiǎn)單的寫一段栗子來重現(xiàn)一下,直接上代碼
Observable
.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
Log.e(TAG, "===create: " + Thread.currentThread().getName());
subscriber.onNext("1");
}
})
.map(new Func1<String, Integer>() {
@Override
public Integer call(String s) {
Log.e(TAG, "===String -> Integer: " + Thread.currentThread().getName());
return Integer.valueOf(s);
}
})
.flatMap(new Func1<Integer, Observable<String>>() {
@Override
public Observable<String> call(final Integer integer) {
Log.e(TAG, "===Integer->Observable: " + Thread.currentThread().getName());
return forEach(integer);
}
})
.map(new Func1<String, Long>() {
@Override
public Long call(String s) {
Log.e(TAG, "===String->Long: " + Thread.currentThread().getName());
return Long.parseLong(s);
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Long>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Long aLong) {
Log.e(TAG, "===onNext: " + Thread.currentThread().getName());
}
});
可以看到 flatMap
那個(gè)操作符哪里使用了一個(gè)方法 forEach(int)
代碼如下
public Observable<String> forEach(final int integer) {
return Observable
.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
Log.e(TAG, "===Observable<String> call: " + Thread.currentThread().getName());
for (int i = 0; i < integer; i++) {
subscriber.onNext(i + "");
}
subscriber.onCompleted();
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
}
再貼一下結(jié)果
===create: RxIoScheduler-2
===String -> Integer: RxIoScheduler-2
===Integer->Observable: RxIoScheduler-2
===Observable<String> call: RxIoScheduler-3
===String->Long: main
===onNext: main
解釋一下結(jié)果:我想大家肯定發(fā)現(xiàn)了端倪,為什么 String -> Long 這個(gè)步驟為什么會(huì)在主線程里運(yùn)行呢捷雕,原因呢很簡(jiǎn)單 flatMap 變換被監(jiān)聽者椒丧,這個(gè)被監(jiān)聽者使用observeOn
切換了后邊操作符的線程,影響到了 flatMap 后面的 map 操作符所以導(dǎo)致了如此結(jié)果
緊接著我們?cè)诳匆粋€(gè)例子
Observable
.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
Log.e(TAG, "===create: " + Thread.currentThread().getName());
subscriber.onNext("1");
}
})
.map(new Func1<String, Integer>() {
@Override
public Integer call(String s) {
Log.e(TAG, "===String -> Integer: " + Thread.currentThread().getName());
return Integer.valueOf(s);
}
})
.flatMap(new Func1<Integer, Observable<String>>() {
@Override
public Observable<String> call(final Integer integer) {
Log.e(TAG, "===Integer->Observable: " + Thread.currentThread().getName());
return Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(final Subscriber<? super String> subscriber) {
forEach(integer, new CallBack() {
@Override
public void call(String s) {
Log.e(TAG, "===Subscriber: " + Thread.currentThread().getName());
subscriber.onNext(s);
}
});
}
});
}
})
.map(new Func1<String, Long>() {
@Override
public Long call(String s) {
Log.e(TAG, "===String->Long: " + Thread.currentThread().getName());
return Long.parseLong(s);
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Long>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Long aLong) {
Log.e(TAG, "===onNext: " + Thread.currentThread().getName());
}
});
public void forEach(final int integer, final CallBack back) {
Observable
.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
Log.e(TAG, "===Observable<String> call: " + Thread.currentThread()
.getName());
for (int i = 0; i < integer; i++) {
subscriber.onNext(i + "");
}
subscriber.onCompleted();
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
back.call(s);
}
});
}
interface CallBack {
void call(String s);
}
結(jié)果
===create: RxIoScheduler-4
===String -> Integer: RxIoScheduler-4
===Integer->Observable: RxIoScheduler-4
===Observable<String> call: RxIoScheduler-5
===Subscriber: main
===String->Long: main
===onNext: main
大家發(fā)現(xiàn)的現(xiàn)在還是 和上一個(gè)例子相仿救巷,主要的改變還是在 flatMap()
這個(gè)方法里壶熏,現(xiàn)在我的做法是創(chuàng)建一個(gè)被監(jiān)聽者,然后里面是調(diào)用forEach()
等待接口返回值浦译,再往下發(fā)射數(shù)據(jù)棒假。
注意以下幾點(diǎn):
- 在
forEach()
方法里面切換了線程,這個(gè)回調(diào)接口的線程為 主線程 - 我們?cè)?code>flatMap()這里新建了一個(gè)發(fā)射源(被監(jiān)聽者)精盅、在這里我們并沒有指定它的線程帽哑,所以
flatMap()
是什么線程,這個(gè)被監(jiān)聽者就是什么線程
看清以上兩點(diǎn)叹俏,我們分析一下妻枕,flatMap()
以上的結(jié)果大家肯定都能想出來,我們就直接從flatMap()
開始分析了粘驰,都知道了回調(diào)的接口里面的線程為主線程屡谐,那么它作為被觀察者它下面的操作符按照 RxJava 線程切換的基本原理來說,肯定也是主線程蝌数。
上個(gè)圖理解一下
所以大家如果像我這樣使用 flatMap()
的時(shí)候一定注意下面操作符的線程
總結(jié)
今天去調(diào)一個(gè) Js 的方法愕掏,然后有一個(gè)接口回調(diào),無意中看見一個(gè) 名字奇怪的 線程名稱顶伞,一想 RxJava 的 io 線程名 應(yīng)該是 RxIoScheduler 開頭的啊饵撑,突然有點(diǎn)蒙,感覺自己一點(diǎn)都不了解 RxJava 的線程 切換了唆貌,結(jié)果分析一波 也是萬變不離其宗滑潘,還是挺有意思的一個(gè)小插曲
歡迎各位前來拍磚