作者:浪人筆記
Rxjava
- Rxjava常用操作符
- map和flatMap有什么區(qū)別
- Rxjava1.0和Rxjava2.0有什么區(qū)別?
- subscribeOn與observeOn多次執(zhí)行會怎么樣屋匕?
- Rxjava是怎么切回到主線程的
協(xié)程
- 進(jìn)程渐白、線程、協(xié)程的區(qū)別
- 什么回調(diào)地獄以及協(xié)程在這方面的處理
- 開發(fā)中怎么選擇合適的調(diào)度器
Rxjava
Rxjava常用操作符
- map() 操作符:用于將流中的每個元素通過一個函數(shù)轉(zhuǎn)換為另一個元素粪狼。
- flatMap() 操作符:用于將流中的每個元素通過一個函數(shù)轉(zhuǎn)換為多個元素退腥,并將這些元素組合成一個新的流。
- filter() 操作符:用于過濾流中的元素再榄,只保留符合條件的元素狡刘。
- take() 操作符:用于從流中取前 n 個元素。
- reduce() 操作符:用于將流中的元素通過一個函數(shù)進(jìn)行累加困鸥,得到一個最終結(jié)果嗅蔬。
- scan() 操作符:用于將流中的元素通過一個函數(shù)進(jìn)行累加,得到每一步的中間結(jié)果疾就。
- concat() 操作符:用于將多個流組合成一個新的流澜术。
- merge() 操作符:用于將多個流合并成一個新的流。
- zip() 操作符:用于將多個流中的元素按順序一一組合成一個新的元素猬腰,并形成一個新的流鸟废。
-debounce() 操作符:用于過濾流中發(fā)射過快的元素,只保留一個元素姑荷。
map和flatMap有什么區(qū)別
- map 和 flatMap 都可以用來對數(shù)據(jù)流中的數(shù)據(jù)進(jìn)行變換盒延,但它們的實現(xiàn)方式有所不同。map 只進(jìn)行一次變換厢拭,并將變換后的結(jié)果發(fā)射出去兰英,而 flatMap 則進(jìn)行多次變換,并將得到的 Observable 合并成一個新的 Observable 發(fā)射出去
在源碼層面供鸠,map 操作符的實現(xiàn)非常簡單畦贸,它實際上就是在原有的 Observable 上添加了一個新的 MapObservable 觀察者,并將變換函數(shù)作為參數(shù)傳遞給 MapObservable。在 MapObservable 的 onNext 方法中薄坏,會將接收到的元素傳遞給變換函數(shù)進(jìn)行變換趋厉,并將變換后的結(jié)果作為新的元素發(fā)射出去。
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
在源碼層面胶坠,flatMap 操作符的實現(xiàn)相對比較復(fù)雜君账。它實際上是在原有的 Observable 上添加了一個新的 FlatMapObservable 觀察者,并將變換函數(shù)作為參數(shù)傳遞給 FlatMapObservable沈善。在 FlatMapObservable 的 onNext 方法中乡数,會將接收到的元素傳遞給變換函數(shù)進(jìn)行變換,并得到一個新的 Observable闻牡。然后净赴,它會將這個新的 Observable 注冊到一個 FlatMapSubscriber 中,等待下一次數(shù)據(jù)的到來罩润。當(dāng)所有數(shù)據(jù)都處理完成后玖翅,F(xiàn)latMapObservable 會調(diào)用 FlatMapSubscriber 的 onComplete 方法,將所有得到的 Observable 合并成一個新的 Observable割以,并將它發(fā)送給下游的觀察者金度。
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper) {
return flatMap(mapper, false, bufferSize(), bufferSize());
}
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, boolean delayErrors, int maxConcurrency) {
return flatMap(mapper, delayErrors, maxConcurrency, bufferSize());
}
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, boolean delayErrors, int maxConcurrency, int bufferSize) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
if (this instanceof ScalarCallable) {
ScalarCallable<T> scalarCallable = (ScalarCallable<T>)this;
R r = scalarCallable.call();
if (r == null) {
return empty();
}
return ObservableScalarXMap.scalarXMap(r, mapper);
}
return RxJavaPlugins.onAssembly(new ObservableFlatMap<T, R>(this, mapper, delayErrors, maxConcurrency, bufferSize));
}
Rxjava1.0和Rxjava2.0有什么區(qū)別?
- 改進(jìn)的異常處理:RxJava 2.0 改進(jìn)了異常處理機(jī)制严沥,使得開發(fā)者可以更好地處理異常猜极,避免應(yīng)用程序崩潰。
- 新的操作符:RxJava 2.0 引入了一些新的操作符祝峻,如 Flowable魔吐,Single 和 Completable,來取代舊版本的 Observable莱找。這些新的操作符可以更好地處理背壓(backpressure)和錯誤處理酬姆。
- 改進(jìn)的背壓支持:RxJava 2.0 引入了更好的背壓支持,可以更好地處理在數(shù)據(jù)源發(fā)送大量數(shù)據(jù)時的情況奥溺。
- 改進(jìn)的線程調(diào)度:RxJava 2.0 改進(jìn)了線程調(diào)度機(jī)制辞色,使得開發(fā)者可以更好地控制并發(fā)性。
5.更好的性能:RxJava 2.0 在性能上也有所提升浮定,可以更好地處理大量數(shù)據(jù)流相满。
總的來說,RxJava 2.0 在異常處理桦卒、背壓支持立美、線程調(diào)度和性能等方面都有所改進(jìn)和提升
什么是背壓?怎么改進(jìn)的方灾?
背壓(Backpressure)是指當(dāng)數(shù)據(jù)產(chǎn)生速度大于消費速度建蹄,程序處理不過來是消息就會出現(xiàn)堆積碌更。從而導(dǎo)致內(nèi)存溢出、程序崩潰等問題洞慎。這種情況被稱為背壓問題
邏輯上的改進(jìn)辦法
- 生產(chǎn)者數(shù)量=消費者數(shù)量
- 節(jié)流,丟棄一部分請求
- 打包痛单,把所有事件封裝在一個集合中發(fā)送
Rxjava1.x的時候沒有對背壓的支持,只提供了onBackpressureBuffer(time)劲腿、onBackpressureDrop() 等)來緩解背壓問題旭绒,但這些解決方案都只是對數(shù)據(jù)流進(jìn)行了緩存或者丟棄處理
RxJava 2.0后 引入了新的數(shù)據(jù)類型 Flowable,它支持背壓焦人,并提供了更多的背壓控制策略挥吵。
Flowable 類型是一個支持背壓的數(shù)據(jù)源,可以通過 onBackpressureBuffer花椭,onBackpressureDrop蔫劣,onBackpressureLatest 等方式來處理背壓問題。其中
- onBackpressureBuffer 策略會在內(nèi)存中緩存數(shù)據(jù)个从,直到消費者可以消費這些數(shù)據(jù);
- onBackpressureDrop 策略會在數(shù)據(jù)流中丟棄一部分?jǐn)?shù)據(jù)歪沃,直到消費者可以消費嗦锐;
- onBackpressureLatest 策略會只保留最新的數(shù)據(jù),丟棄舊數(shù)據(jù)沪曙。
另外Flowable 的方式和 Observable 類似,只是Flowable 在使用的時候需要注意要制定背壓策略奕污。
subscribeOn與observeOn多次執(zhí)行會怎么樣?
結(jié)論:subscribeOn只跟第一次指定的線程有關(guān)液走,執(zhí)行多次跟最后一次有關(guān)碳默。
- subscribeOn只有第一次會生效,所以只跟第一次指定的線程有關(guān)缘眶。
當(dāng)我們在一個 Observable中使用多個 subscribeOn 操作符時嘱根,它們的執(zhí)行順序只會影響到代碼中的順序,但實際上只有第一個 subscribeOn 會生效巷懈。原因是在 ObservableSubscribeOn 類的實現(xiàn)中该抒,只會在第一個 subscribeOn 操作符中調(diào)用 scheduler.scheduleDirect 方法,后面的 subscribeOn 操作符調(diào)用該方法也會被攔截顶燕,也就不會改變 Observable 的執(zhí)行線程凑保。這就是為什么在同一個 Observable 中使用多個 subscribeOn 操作符時,只有第一個 subscribeOn 會生效的原因涌攻。
//Observable.java
@Override
public final void subscribe(Observer<? super T> observer) {
...
subscribeActual(observer);
...
}
//-----------------
// ObservableSubscribeOn.java
@Override
public void subscribeActual(Observer<? super T> observer) {
if (once) {
source.subscribe(observer);
return;
}
once = true;
Scheduler scheduler = this.scheduler;
SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
observer.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent, source)));
}
- observeOn 執(zhí)行多次跟最后一次有關(guān)欧引。
@Override
protected void subscribeActual(Observer<? super T> observer) {
Scheduler.Worker worker = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, worker, delayError, bufferSize));
}
其實比較好理解,subscribeOn理解為一個管道的入口恳谎,observeOn 理解為一個管道的出口芝此。數(shù)據(jù)進(jìn)去之后就沒辦法指定了,但是數(shù)據(jù)出來之前都可以在切換出口
Rxjava是怎么切回到主線程的
使用observeOn(AndroidSchedulers.mainThread()),內(nèi)部的實現(xiàn)其實是new Handler(Looper.getMainLooper())
public class MainThreadScheduler extends Scheduler {
private static MainThreadScheduler INSTANCE;
private MainThreadScheduler() {}
public static MainThreadScheduler instance() {
if (INSTANCE == null) {
INSTANCE = new MainThreadScheduler();
}
return INSTANCE;
}
@NonNull
@Override
public Worker createWorker() {
return new MainThreadWorker(new Handler(Looper.getMainLooper()));
}
private static class MainThreadWorker extends Worker {
private final Handler mHandler;
MainThreadWorker(Handler handler) {
mHandler = handler;
}
@NonNull
@Override
public Disposable schedule(@NonNull Runnable runnable) {
mHandler.post(runnable);
return Disposables.empty();
}
@NonNull
@Override
public Disposable schedule(@NonNull Runnable runnable, long delay, @NonNull TimeUnit unit) {
mHandler.postDelayed(runnable, unit.toMillis(delay));
return Disposables.empty();
}
@Override
public void dispose() {}
@Override
public boolean isDisposed() {
return false;
}
}
}
協(xié)程
進(jìn)程癌蓖、線程瞬哼、協(xié)程的區(qū)別
- 進(jìn)程(Process)是指操作系統(tǒng)中的一個執(zhí)行單位,它有自己獨立的內(nèi)存空間和資源租副,可以執(zhí)行獨立的程序坐慰,是程序運行的基本單位。一個進(jìn)程可以包含多個線程用僧。
- 線程(Thread)是進(jìn)程中的一個執(zhí)行單元结胀,它共享進(jìn)程的內(nèi)存空間和資源,但具有獨立的執(zhí)行序列和運行堆棧责循。一個進(jìn)程可以包含多個線程糟港,線程之間可以并發(fā)執(zhí)行,實現(xiàn)多任務(wù)處理院仿。
- 協(xié)程(Coroutine)是一種用戶態(tài)的輕量級線程秸抚,由程序員自己控制調(diào)度,而不是由操作系統(tǒng)控制歹垫。協(xié)程可以在同一線程中實現(xiàn)并發(fā)執(zhí)行剥汤,利用時間片輪轉(zhuǎn)算法切換任務(wù),避免了線程上下文切換帶來的開銷排惨,可以提高程序的執(zhí)行效率吭敢。
Kotlin 的協(xié)程是基于 Kotlin 標(biāo)準(zhǔn)庫中的協(xié)程框架實現(xiàn)的。該框架基于一種稱為“掛起函數(shù)”的特殊函數(shù)類型實現(xiàn)暮芭,這些函數(shù)可以暫停執(zhí)行并在稍后的某個時候恢復(fù)執(zhí)行鹿驼,從而實現(xiàn)了協(xié)程的效果。不依賴于操作系統(tǒng)和編譯器辕宏。
什么回調(diào)地獄以及協(xié)程在這方面的處理
回調(diào)地獄指的是在異步編程中畜晰,如果多次嵌套使用回調(diào)函數(shù)來處理異步操作,會造成代碼的可讀性和可維護(hù)性變差匾效,代碼邏輯難以理解和調(diào)試的情況舷蟀。舉個例子
getUserInfo(userId) { user ->
getUserOrders(user.id) { orders ->
for (order in orders) {
getItems(order.id) { items ->
for (item in items) {
processItem(item) { result ->
saveResult(result) {
// ...
}
}
}
}
}
}
}
協(xié)程中的掛起函數(shù)寫法是
suspend fun processOrders(userId: String) = withContext(Dispatchers.IO) {
val user = getUserInfo(userId)
val orders = getUserOrders(user.id)
for (order in orders) {
val items = getItems(order.id)
for (item in items) {
val result = processItem(item)
saveResult(result)
}
}
}
使用 withContext 可以指定協(xié)程執(zhí)行的上下文,這里使用了 IO 線程池面哼,避免了主線程的阻塞野宜。
開發(fā)中怎么選擇合適的調(diào)度器
其實無非就是三個,一個主線程魔策,一個io密集型匈子,一個cpu密集型 rxjava中的調(diào)度器
- Schedulers.io():用于 I/O 密集型任務(wù),比如網(wǎng)絡(luò)請求等闯袒。
- Schedulers.computation():用于 CPU 密集型任務(wù)虎敦,比如類似視頻編解碼這種大量的計算和數(shù)據(jù)處理等游岳。
- Schedulers.newThread():每次都創(chuàng)建一個新線程,不推薦使用其徙。
- AndroidSchedulers.mainThread():用于 Android 平臺的 UI 線程胚迫。
對應(yīng)kotlin協(xié)程里面的是
- Dispatchers.Default:適合執(zhí)行 CPU 密集型任務(wù)的調(diào)度器,它會自動根據(jù)可用的 CPU 數(shù)量進(jìn)行調(diào)度唾那。
- Dispatchers.IO:適合執(zhí)行 I/O 密集型任務(wù)的調(diào)度器访锻,比如網(wǎng)絡(luò)請求和磁盤 I/O 等。
- Dispatchers.Main:適合在 Android 應(yīng)用程序中執(zhí)行 UI 操作的調(diào)度器闹获。在 Android 應(yīng)用程序中期犬,Main 調(diào)度器會將協(xié)程的執(zhí)行切換到主線程上。
- Dispatchers.Unconfined:一個不受限制的調(diào)度器避诽,它允許協(xié)程在調(diào)用掛起函數(shù)的線程中繼續(xù)執(zhí)行龟虎。使用這個調(diào)度器時需要特別小心,因為它可能會導(dǎo)致一些奇怪的行為沙庐。