package com.github.cai.greendaotaste.rxjava;
import android.os.Bundle;
import android.support.annotation.Nullable;
import android.support.v7.app.AppCompatActivity;
import android.util.Log;
import android.view.View;
import android.widget.Button;
import android.widget.TextView;
import android.widget.Toast;
import com.github.cai.greendaotaste.R;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import java.util.ArrayList;
import java.util.concurrent.TimeUnit;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.FlowableEmitter;
import io.reactivex.FlowableOnSubscribe;
import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.ObservableSource;
import io.reactivex.Observer;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.annotations.NonNull;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.BiFunction;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.Function;
import io.reactivex.functions.Predicate;
import io.reactivex.schedulers.Schedulers;
/**
* Created by admin on 2017/6/19.
* 參考自 碼個蛋 http://chuansong.me/n/1875207353935
* 上游和下游就分別對應(yīng)著RxJava中的Observable和Observer酱塔,它們之間的連接就對應(yīng)著subscribe()
* <p>
* 知識點一:
* 一,上游可以發(fā)送無限個onNext, 下游也可以接收無限個onNext.
* 二弟蚀,當(dāng)上游發(fā)送了一個onComplete后, 上游onComplete之后的事件將會繼續(xù)發(fā)送, 而下游收到onComplete事件之后將不再繼續(xù)接收事件.
* 三拌消,當(dāng)上游發(fā)送了一個onError后, 上游onError之后的事件將繼續(xù)發(fā)送, 而下游收到onError事件之后將不再繼續(xù)接收事件.
* 四挑豌,上游可以不發(fā)送onComplete或onError.
* 五,最為關(guān)鍵的是onComplete和onError必須唯一并且互斥, 即不能發(fā)多個onComplete, 也不能發(fā)多個onError, 也不能先發(fā)一個onComplete, 然后再發(fā)一個onError, 反之亦然
* 六墩崩,調(diào)用順序:先調(diào)用onSubscribe -> subscribe(上游方法) -> onNext -> onComplete或onError
* <p>
* 知識點二: 調(diào)用dispose()并不會導(dǎo)致上游不再繼續(xù)發(fā)送事件, 上游會繼續(xù)發(fā)送剩余的事件.
* <p>
* 知識點三:
* 帶有Observer參數(shù)的我們已經(jīng)使用過了,這里對其他幾個方法進行說明.
* 一氓英,不帶任何參數(shù)的subscribe() 表示下游不關(guān)心任何事件,你上游盡管發(fā)你的數(shù)據(jù)去吧, 老子可不管你發(fā)什么.
* 二,帶有一個Consumer參數(shù)的方法表示下游只關(guān)心onNext事件, 其他的事件我假裝沒看見, 因此我們?nèi)绻恍枰猳nNext事件可以這么寫:
* <p>
* 知識點四:
* 上游和下游是工作在同一個線程中的, 也就是說上游在哪個線程發(fā)事件, 下游就在哪個線程接收事件.
* <p>
* 知識點五:
* 一,多次指定上游的線程只有第一次指定的有效, 也就是說多次調(diào)用subscribeOn() 只有第一次的有效, 其余的會被忽略.
* 二,多次指定下游的線程是可以的, 也就是說*每調(diào)用一次observeOn()* , 下游的線程就會切換一次.
* <p>
* 知識點六:
* Schedulers.io()
* 代表io操作的線程, 通常用于網(wǎng)絡(luò),讀寫文件等io密集型的操作
* <p>
* Schedulers.computation()
* 代表CPU計算密集型的操作, 例如需要大量計算的操作
* <p>
* Schedulers.newThread()
* 代表一個常規(guī)的新線程
* <p>
* AndroidSchedulers.mainThread()
* 代表Android的主線程
* <p>
* 知識點七:
* 那如果有多個Disposable 該怎么辦呢, RxJava中已經(jīng)內(nèi)置了一個容器CompositeDisposable,
* 每當(dāng)我們得到一個Disposable時就調(diào)用CompositeDisposable.add()將它添加到容器中, 在退出的時候,
* 調(diào)用CompositeDisposable.clear() 即可切斷所有的水管.
* <p>
* 知識點八:
* 上游每發(fā)送一個事件, flatMap都將創(chuàng)建一個新的水管, 然后發(fā)送轉(zhuǎn)換之后的新的事件, 下游接收到的就是這些新的水管發(fā)送的數(shù)據(jù).
* 這里需要注意的是, flatMap并不保證事件的順序, 也就是圖中所看到的, 并不是事件1就在事件2的前面.
* 如果需要保證順序則需要使用concatMap.
* <p>
* 最終下游收到的事件數(shù)量是和上游中發(fā)送事件最少的那一根水管的事件數(shù)量相同,這個也很好理解,
* 因為是從每一根水管里取一個事件來進行合并,
* 最少的那個肯定就最先取完, 這個時候其他的水管盡管還有事件,
* 但是已經(jīng)沒有足夠的事件來組合了, 因此下游就不會收到剩余的事件
* <p>
*/
public class RxJavaActivity extends AppCompatActivity implements View.OnClickListener {
public static final String TAG = RxJavaActivity.class.getSimpleName();
private TextView showRxMessage;
private Button subscriber, observer,requestBtn;
private Subscription mSubscription;
@Override
protected void onCreate(@Nullable Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_rxjava);
showRxMessage = (TextView) findViewById(R.id.show_rx_message);
subscriber = (Button) findViewById(R.id.subscriber);
observer = (Button) findViewById(R.id.observer);
requestBtn = (Button) findViewById(R.id.request);
subscriber.setOnClickListener(this);
observer.setOnClickListener(this);
requestBtn.setOnClickListener(this);
Log.d(TAG, "onCreate: current thread is " + Thread.currentThread().getName());
}
@Override
public void onClick(View v) {
switch (v.getId()) {
case R.id.subscriber:
//simpleUseRxJava();
//onlyCareAboutOnNext();
//rxJavaMapTaste();
//flatMapTaste();
//rxJavaZipTaste();
//dealWithBackpressureWithFilter();
//dealWithBackPressWithSample();
rxJavaFlowableResponseFetchAync();
break;
case R.id.observer:
//flowableTaste();
//changeFlowableCacheSize();
//rxJavaIntervalTaste();
//rxJavaFlowableResponseFetch();
lookWhatTimeHitRequestValue();
break;
case R.id.request:
mSubscription.request(96);
break;
}
}
public void simpleUseRxJava() {
//可觀察者 上游 可觀察者所以用訂閱subscribeOn
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
//ObservableEmitter: Emitter是發(fā)射器的意思 可以聯(lián)想為送報員
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onComplete();
e.onComplete();//發(fā)完一個后還可以發(fā)第二個呵燕,只是觀察者只收一個
//e.onError(new Throwable());崩潰 onComplete和onError事件是互斥的昔汉,只能發(fā)一個
Log.d(TAG, "subscribe: current thread is " + Thread.currentThread().getName());
}
});
//觀察者 所有用觀察observeOn
Observer<Integer> observer = new Observer<Integer>() {
private Disposable disposable;
@Override
public void onSubscribe(@NonNull Disposable d) {
// Disposable, 這個單詞的字面意思是一次性用品,用完即可丟棄的.
// 那么在RxJava中怎么去理解它呢, 對應(yīng)于上面的水管的例子,
// 我們可以把它理解成兩根管道之間的一個機關(guān), 當(dāng)調(diào)用它的dispose()方法時,
// 它就會將兩根管道切斷, 從而導(dǎo)致下游收不到事件.
// 因此我們可以在Activity中將這個Disposable 保存起來, 當(dāng)Activity退出時, 切斷它即可.
showMessageInText("onSubscribe: ");
disposable = d;
Log.d(TAG, "onSubscribe: current thread is " + Thread.currentThread().getName());
}
@Override
public void onNext(@NonNull Integer integer) {
//showMessageInText("onNext: " + integer.toString());
if (integer == 2) {
disposable.dispose();
}
//showMessageInText("onNext: " + disposable.isDisposed());//切斷水管,之后的時間不在接收
//但是不妨礙上游時間的傳遞
Log.d(TAG, "onNext: current thread is " + Thread.currentThread().getName());
}
@Override
public void onError(@NonNull Throwable e) {
//showMessageInText("onError: ");
Log.d(TAG, "onError: current thread is " + Thread.currentThread().getName());
}
@Override
public void onComplete() {
//showMessageInText("onComplete: ");
Log.d(TAG, "onComplete: current thread is " + Thread.currentThread().getName());
}
};
//郵局派送報紙(聯(lián)想訂報紙,雖然是人去訂報社報紙,但是報社是實際遞送報紙給人的),上游向下游發(fā)送事件
observable.subscribeOn(Schedulers.newThread())//指定發(fā)送事件的線程 上游發(fā)送事件的線程,
.subscribeOn(Schedulers.newThread())
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())//指定接收事件的線程 下游接收事件的線程.
.observeOn(Schedulers.io())
.observeOn(Schedulers.computation())
.subscribe(observer);
}
public void onlyCareAboutOnNext() {
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
//ObservableEmitter: Emitter是發(fā)射器的意思 可以聯(lián)想為送報員
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onError(new Throwable("test error"));
//e.onComplete();
Log.d(TAG, "subscribe: current thread is " + Thread.currentThread().getName());
}
});
Consumer<Integer> careAboutNext = new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
showMessageInText("accept: " + integer.toString() + " login is success");
}
};
Consumer<Throwable> failureNext = new Consumer<Throwable>() {//注意:用于錯誤的Consumer只能使用泛型Throwable
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
showMessageInText("accept: login is failure");
}
};
observable.subscribe(careAboutNext, failureNext);
}
//map是RxJava中最簡單的一個變換操作符了, 它的作用就是對上游發(fā)送的每一個事件應(yīng)用一個函數(shù), 使得每一個事件都按照指定的函數(shù)去變化
public void rxJavaMapTaste() {
//要傳遞的是Integer類型
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
e.onNext(10);
e.onNext(11);
e.onNext(12);
e.onComplete();
}
});
//要接收的是String類型
Consumer<String> consumer = new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
showMessageInText("accept: I receive value is" + s);
}
};
//提供的轉(zhuǎn)換函數(shù)
observable.map(new Function<Integer, String>() {
@Override
public String apply(@NonNull Integer integer) throws Exception {
return String.valueOf(integer);
}
}).subscribe(consumer);
}
//FlatMap將一個發(fā)送事件的上游Observable變換為多個發(fā)送事件的Observables啊鸭,然后將它們發(fā)射的事件合并后放進一個單獨的Observable里.
//自己的理解:
public void flatMapTaste() {
//要傳遞的是Integer類型
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
e.onNext(100);
e.onNext(110);
e.onNext(120);
e.onComplete();
}
});
Consumer<String> consumer = new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
showMessageInText("accept: After flat map I receive value is" + s);
}
};
observable.flatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(@NonNull Integer integer) throws Exception {
ArrayList<String> list = new ArrayList<>(3);
for (int i = 0; i < 3; i++) {
list.add(String.valueOf(integer));
}
return Observable.fromIterable(list).delay(15, TimeUnit.MILLISECONDS);//不保證發(fā)送順序
}
}).observeOn(AndroidSchedulers.mainThread()).subscribe(consumer);
}
// Zip通過一個函數(shù)將多個Observable發(fā)送的事件結(jié)合到一起,然后發(fā)送這些組合到一起的事件.
// 它按照*嚴格的順序*應(yīng)用這個函數(shù)匿值。它只發(fā)射與 發(fā)射數(shù)據(jù)項最少的那個Observable 一樣多的數(shù)據(jù)
// (最終下游收到的事件數(shù)量是和上游中發(fā)送事件最少的那一根水管的事件數(shù)量相同)
public void rxJavaZipTaste() {
//要傳遞的是Integer類型
Observable<Integer> observableInt = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
e.onNext(1000);
e.onNext(1100);
e.onNext(1200);
e.onNext(1300);
e.onComplete();
}
});
//要傳遞的是Integer類型
Observable<String> observableStr = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
e.onNext("thousand");
e.onNext("thousand one hundred");
e.onNext("thousand two hundred");
e.onComplete();
}
});
Consumer<String> consumer = new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
showMessageInText("accept: After zip I receive value is" + s);
}
};
//泛型參數(shù)表示 第一個表示一個上游的類型赠制,第二個參數(shù)表示第二個上游的類型,第三個參數(shù)表示組合后返回的類型
Observable.zip(observableInt, observableStr, new BiFunction<Integer, String, String>() {
@Override
public String apply(@NonNull Integer integer, @NonNull String s) throws Exception {
return integer + s;
}
}).subscribe(consumer);//如果沒有指定線程,默認全部都是在同一個線程中執(zhí)行
}
public void backpressureTaste() {
/**
* Zip可以將多個上游發(fā)送的事件組合起來發(fā)送給下游, 那大家有沒有想過一個問題,
* 如果其中一個水管A發(fā)送事件特別快, 而另一個水管B 發(fā)送事件特別慢, 那就可能出現(xiàn)這種情況,
* 發(fā)得快的水管A 已經(jīng)發(fā)送了1000個事件了, 而發(fā)的慢的水管B 才發(fā)一個出來, 組合了一個之后水管A 還剩999個事件,
* 這些事件需要繼續(xù)等待水管B 發(fā)送事件出來組合, 那么這么多的事件是放在哪里的呢? 總有一個地方保存吧?
* 沒錯, Zip給我們的每一根水管都弄了一個水缸 , 用來保存這些事件
*
* 所謂的Backpressure其實就是為了控制流量
*/
/**
* 知識點九:
* 一钟些,當(dāng)上下游工作在同一個線程中時, 這時候是一個同步的訂閱關(guān)系, 也就是說上游每發(fā)送一個事件必須等到下游接收處理完了以后才能接著發(fā)送下一個事件
* 相當(dāng)于直接調(diào)用方法的過程
* 二烟号,當(dāng)上下游工作在不同的線程中時, 這時候是一個異步的訂閱關(guān)系, 這個時候上游發(fā)送數(shù)據(jù)不需要等待下游接收,
* 為什么呢, 因為兩個線程并不能直接進行通信, 因此上游發(fā)送的事件并不能直接到下游里去,
* 這個時候就需要一個田螺姑娘來幫助它們倆, 這個田螺姑娘就是我們剛才說的水缸 ! 上游把事件發(fā)送到水缸里去,
* 下游從水缸里取出事件來處理, 因此, 當(dāng)上游發(fā)事件的速度太快, 下游取事件的速度太慢, 水缸就會迅速裝滿, 然后溢出來, 最后就OOM了.
*/
}
public void dealWithBackpressureWithFilter() {
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
for (int i = 0; ; i++)
e.onNext(i);
}
});
Consumer<Integer> consumer = new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
showMessageInText("the receive value is " + integer);
}
};
Consumer<Throwable> error = new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
Toast.makeText(RxJavaActivity.this, "Oop,occur error", Toast.LENGTH_SHORT).show();
}
};
observable.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(Schedulers.io())
.filter(new Predicate<Integer>() {//過濾,只有符合條件的才放入水缸
@Override
public boolean test(@NonNull Integer integer) throws Exception {
if (integer % 10000 == 0)
return true;
return false;
}
})
.subscribe(consumer, error);
}
public void dealWithBackPressWithSample() {
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
for (int i = 0; ; i++)
e.onNext(i);
}
});
Consumer<Integer> consumer = new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
Log.e(TAG, "accept: the receive value is " + integer);
}
};
Consumer<Throwable> error = new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
Toast.makeText(RxJavaActivity.this, "Oop,occur error", Toast.LENGTH_SHORT).show();
}
};
observable.subscribeOn(Schedulers.computation())
.observeOn(Schedulers.io())
.sample(2,TimeUnit.SECONDS)//這個操作符每隔指定的時間就從上游中取出一個事件發(fā)送給下游政恍,這里是每隔2秒鐘取樣一次
.subscribe(consumer,error);
}
/**
* 知識點一:
* 我們注意到這次和Observable有些不同. 首先是創(chuàng)建Flowable的時候增加了一個參數(shù),
* 這個參數(shù)是用來選擇背壓,也就是出現(xiàn)上下游流速不均衡的時候應(yīng)該怎么處理的辦法,
* 這里我們直接用BackpressureStrategy.ERROR這種方式, 這種方式會在出現(xiàn)上下游流速不均衡的時候直接拋出一個異常,
* 這個異常就是著名的MissingBackpressureException. 其余的策略后面再來講解
*
* 知識點二:
* 另外的一個區(qū)別是在下游的onSubscribe方法中傳給我們的不再是Disposable了, 而是Subscription,
* 它倆有什么區(qū)別呢, 首先它們都是上下游中間的一個開關(guān), 之前我們說調(diào)用Disposable.dispose()方法可以切斷水管,
* 同樣的調(diào)用Subscription.cancel()也可以切斷水管, 不同的地方在于Subscription增加了一個void request(long n)方法,
* 這個方法有什么用呢, 在上面的代碼中也有這么一句代碼:
*
* 知識點三:
* 關(guān)于Subscription的request(int)方法
* 這是因為Flowable在設(shè)計的時候采用了一種新的思路也就是響應(yīng)式拉取的方式來更好的解決上下游流速不均衡的問題,
* 與我們之前所講的控制數(shù)量和控制速度不太一樣, 這種方式用通俗易懂的話來說就好比是葉問打鬼子, 我們把上游看成小日本,
* 把下游當(dāng)作葉問, 當(dāng)調(diào)用Subscription.request(1)時, 葉問就說我要打一個! 然后小日本就拿出一個鬼子給葉問,
* 讓他打, 等葉問打死這個鬼子之后, 再次調(diào)用request(10), 葉問就又說我要打十個! 然后小日本又派出十個鬼子給葉問,
* 然后就在邊上看熱鬧, 看葉問能不能打死十個鬼子, 等葉問打死十個鬼子后再繼續(xù)要鬼子接著打...
* 所以我們把request當(dāng)做是一種能力, 當(dāng)成下游處理事件的能力, 下游能處理幾個就告訴上游我要幾個,
* 這樣只要上游根據(jù)下游的處理能力來決定發(fā)送多少事件, 就不會造成一窩蜂的發(fā)出一堆事件來, 從而導(dǎo)致OOM.
* 這也就完美的解決之前我們所學(xué)到的兩種方式的缺陷, 過濾事件會導(dǎo)致事件丟失, 減速又可能導(dǎo)致性能損失.
* 而這種方式既解決了事件丟失的問題, 又解決了速度的問題, 完美 !
*
* 知識點四:
* 首先第一個同步的代碼, 為什么上游發(fā)送第一個事件后下游就拋出了MissingBackpressureException異常(最新版的只是調(diào)用onError方法),
* 這是因為下游沒有調(diào)用request, 上游就認為下游沒有處理事件的能力, 而這又是一個同步的訂閱, 既然下游處理不了,
* 那上游不可能一直等待吧, 如果是這樣, 萬一這兩根水管工作在主線程里, 界面不就卡死了嗎, 因此只能拋個異常來提醒我們.
* 那如何解決這種情況呢, 很簡單啦, 下游直接調(diào)用request(Long.MAX_VALUE)就行了, 或者根據(jù)上游發(fā)送事件的數(shù)量來request就行了,
* 比如這里request(3)就可以了.
* 然后我們再來看看第二段代碼, 為什么上下游沒有工作在同一個線程時, 上游卻正確的發(fā)送了所有的事件呢?
* 這是因為在Flowable里默認有一個大小為128的水缸, 當(dāng)上下游工作在不同的線程中時, 上游就會先把事件發(fā)送到這個水缸中,
* 因此, 下游雖然沒有調(diào)用request, 但是上游在水缸中保存著這些事件, 只有當(dāng)下游調(diào)用request時, 才從水缸里取出事件發(fā)給下游.
*/
public void flowableTaste(){
//上游變成了Flowable
Flowable<Integer> flowable = Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull FlowableEmitter<Integer> e) throws Exception {
e.onNext(10);
e.onNext(11);
e.onNext(12);
e.onNext(13);
e.onComplete();
}
}, BackpressureStrategy.ERROR);
//下游變成了Subscriber
Subscriber<Integer> subcriber = new Subscriber<Integer>() {
Subscription s;
@Override
public void onSubscribe(Subscription s) {
Log.e(TAG, "onSubscribe: ");
//未指定線程注釋這段話汪拥,不會發(fā)送onNext事件,發(fā)完onSubscribe篙耗,直接發(fā)onError
//如果指定線程注釋這段話迫筑,那么只會執(zhí)行onSubscribe
s.request(2);//可看做是下游處理事件的能力,但是需要上游配合宗弯,如果上游不配合應(yīng)該沒有用
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
s.request(2);
this.s = s;
}
@Override
public void onNext(Integer integer) {
Log.e(TAG, "onNext: " + integer);
//if (integer == 12)
//s.cancel();//切斷水管
}
@Override
public void onError(Throwable t) {
Log.e(TAG, "onError: " + t.getMessage());
}
@Override
public void onComplete() {
Log.e(TAG, "onComplete: ");
}
};
flowable.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(subcriber);
}
/**
* 知識點一:
* 想想看我們之前學(xué)習(xí)Observable的時候說到的如何解決上游發(fā)送事件太快的, 有一招叫從數(shù)量上取勝,
* 同樣的Flowable中也有這種方法, 對應(yīng)的就是BackpressureStrategy.DROP和BackpressureStrategy.LATEST這兩種策略.
* 從名字上就能猜到它倆是干啥的, Drop就是直接把存不下的事件丟棄,Latest就是只保留最新的事件, 來看看它們的實際效果吧.
*/
public void changeFlowableCacheSize(){
//上游變成了Flowable
Flowable<Integer> flowable = Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull FlowableEmitter<Integer> e) throws Exception {
for (int i = 0;i < 1000; i++){
e.onNext(i);
}
e.onComplete();
}
}, BackpressureStrategy.BUFFER);//使用ERROR的時候overflow了脯燃,這次換成BUFFER,它沒有大小限制, 因此可以存放許許多多的事件.
// 但是使用BUFFER無限循環(huán)的發(fā)送事件,會和Observable一樣發(fā)生OOM
// 所以我們得從數(shù)量上取勝, 同樣的Flowable中也有這種方法, 對應(yīng)的就是BackpressureStrategy.DROP和BackpressureStrategy.LATEST這兩種策略.
//下游變成了Subscriber
Subscriber<Integer> subcriber = new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
Log.e(TAG, "onSubscribe: ");
}
@Override
public void onNext(Integer integer) {
Log.e(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable t) {
Log.e(TAG, "onError: " + t.getMessage());
}
@Override
public void onComplete() {
Log.e(TAG, "onComplete: ");
}
};
flowable.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(subcriber);
}
public void rxJavaIntervalTaste(){
//每隔一秒進行加1
Flowable.interval(1,TimeUnit.SECONDS)
.onBackpressureBuffer()//指定緩存的大小
.subscribe(new Subscriber<Long>() {
@Override
public void onSubscribe(Subscription s) {
Log.e(TAG, "onSubscribe: ");
s.request(Integer.MAX_VALUE);
}
@Override
public void onNext(Long aLong) {
Log.e(TAG, "onNext: " + aLong);
}
@Override
public void onError(Throwable t) {
Log.e(TAG, "onError: " + t.getMessage());
}
@Override
public void onComplete() {
Log.e(TAG, "onComplete: ");
}
});
}
/**
* 知識點一:
* 在下游中調(diào)用Subscription.request(n)就可以告訴上游蒙保,下游能夠處理多少個事件辕棚,
* 那么上游要根據(jù)下游的處理能力正確的去發(fā)送事件,那么上游是不是應(yīng)該知道下游的處理能力是多少啊追他,
* 對吧坟募,不然岛蚤,一個巴掌拍不響啊邑狸,這種事情得你情我愿才行。
* 那么上游從哪里得知下游的處理能力呢涤妒?我們來看看上游最重要的部分单雾,肯定就是FlowableEmitter了啊,
* 我們就是通過它來發(fā)送事件的啊她紫,來看看它的源碼吧
*
* 知識點二:
* 下游調(diào)用request(n) 告訴上游它的處理能力硅堆,上游每發(fā)送一個next事件之后,requested就減一贿讹,注意是next事件渐逃,
* complete和error事件不會消耗requested,當(dāng)減到0時民褂,則代表下游沒有處理能力了茄菊,這個時候你如果繼續(xù)發(fā)送事件,就會調(diào)用onError事件了
*/
public void rxJavaFlowableResponseFetch(){//同步的情況
Flowable<Integer> flowable = Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull FlowableEmitter<Integer> e) throws Exception {
e.onNext(15);
e.onNext(17);
e.onNext(19);
e.onNext(21);
e.onComplete();
}
},BackpressureStrategy.ERROR);
Subscriber<Integer> subcriber = new Subscriber<Integer>() {
Subscription sub;
@Override
public void onSubscribe(Subscription s) {
Log.e(TAG, "onSubscribe: ");
sub = s;
s.request(1);
}
@Override
public void onNext(Integer integer) {
Log.e(TAG, "onNext: " + integer);
sub.request(1);
}
@Override
public void onError(Throwable t) {
Log.e(TAG, "onError: " + t.getMessage());
}
@Override
public void onComplete() {
Log.e(TAG, "onComplete: ");
}
};
flowable.subscribe(subcriber);
}
/**
* 可以看到赊堪,當(dāng)上下游工作在不同的線程里時面殖,每一個線程里都有一個requested,而我們調(diào)用request(1000)時哭廉,
* 實際上改變的是下游主線程中的requested脊僚,而上游中的requested的值是由RxJava內(nèi)部調(diào)用request(n)去設(shè)置的,
* 這個調(diào)用會在合適的時候自動觸發(fā)遵绰。
*
* question:什么時候是何時的時候辽幌?
* 剛才同步的時候我們說了增淹,上游每發(fā)送一個事件,requested的值便會減一乌企,對于異步來說同樣如此埠通,
* 那有人肯定有疑問了,一開始上游的requested的值是128逛犹,那這128個事件發(fā)送完了不就不能繼續(xù)發(fā)送了嗎端辱?
*/
public void rxJavaFlowableResponseFetchAync(){//異步的情況
//上下游都不做操作的情況
Flowable<Integer> flowableNothingDoing = Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull FlowableEmitter<Integer> e) throws Exception {
Log.e(TAG, "subscribe: request count is " + e.requested());//128
}
},BackpressureStrategy.ERROR);
Subscriber<Integer> subcriberNothingDoing = new Subscriber<Integer>() {
Subscription sub;
@Override
public void onSubscribe(Subscription s) {
Log.e(TAG, "onSubscribe: ");
sub = s;
}
@Override
public void onNext(Integer integer) {
Log.e(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable t) {
Log.e(TAG, "onError: " + t.getMessage());
}
@Override
public void onComplete() {
Log.e(TAG, "onComplete: ");
}
};
flowableNothingDoing.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(subcriberNothingDoing);
Flowable<Integer> flowable = Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull FlowableEmitter<Integer> e) throws Exception {
Log.e(TAG, "subscribe: request count is " + e.requested());//128
e.onNext(15);
e.onNext(17);
e.onNext(19);
e.onNext(21);
e.onComplete();
}
},BackpressureStrategy.ERROR);
Subscriber<Integer> subcriber = new Subscriber<Integer>() {
Subscription sub;
@Override
public void onSubscribe(Subscription s) {
Log.e(TAG, "onSubscribe: ");
sub = s;
s.request(1000);
}
@Override
public void onNext(Integer integer) {
Log.e(TAG, "onNext: " + integer);
sub.request(1);
}
@Override
public void onError(Throwable t) {
Log.e(TAG, "onError: " + t.getMessage());
}
@Override
public void onComplete() {
Log.e(TAG, "onComplete: ");
}
};
flowable.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(subcriber);
}
/**
* 可以看到,當(dāng)下游消費掉第96個事件之后虽画,上游又開始發(fā)事件了舞蔽,
* 而且可以看到當(dāng)前上游的requested的值是96(打印出來的95是已經(jīng)發(fā)送了一個事件減一之后的值),
* 最終發(fā)出了第223個事件之后又進入了等待區(qū)码撰,而223-127 正好等于 96渗柿。
* 這是不是說明當(dāng)下游每消費96個事件便會自動觸發(fā)內(nèi)部的request()去設(shè)置上游的requested的值啊脖岛!沒錯朵栖,就是這樣,而這個新的值就是96柴梆。
*/
public void lookWhatTimeHitRequestValue(){
Flowable<Integer> flowable = Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull FlowableEmitter<Integer> e) throws Exception {
Log.e(TAG, "subscribe: request count is " + e.requested());//128
boolean flag;
for (int i = 0; ; i++){
flag = false;
while(e.requested() == 0){
if (!flag) {
Log.e(TAG, "subscribe: Oop I can't emitter event");
flag =true;
}
}
e.onNext(i);
Log.e(TAG, "subscribe: emit " + i + " request is " + e.requested());
}
}
},BackpressureStrategy.ERROR);
Subscriber<Integer> subscriber = new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
Log.e(TAG, "onSubscribe: ");
mSubscription = s;
}
@Override
public void onNext(Integer integer) {
Log.e(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable t) {
Log.e(TAG, "onError: " + t.getMessage());
}
@Override
public void onComplete() {
Log.e(TAG, "onComplete: ");
}
};
flowable.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(subscriber);
}
public void showMessageInText(String appendStr) {
showRxMessage.append(appendStr + "\n");
}
}
RxJava2 筆記
最后編輯于 :
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
- 文/潘曉璐 我一進店門吉拳,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人适揉,你說我怎么就攤上這事留攒。” “怎么了涡扼?”我有些...
- 文/不壞的土叔 我叫張陵稼跳,是天一觀的道長。 經(jīng)常有香客問我吃沪,道長汤善,這世上最難降的妖魔是什么? 我笑而不...
- 正文 為了忘掉前任,我火速辦了婚禮红淡,結(jié)果婚禮上不狮,老公的妹妹穿的比我還像新娘。我一直安慰自己在旱,他們只是感情好摇零,可當(dāng)我...
- 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著桶蝎,像睡著了一般驻仅。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上登渣,一...
- 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼廊遍!你這毒婦竟也來了嬉愧?” 一聲冷哼從身側(cè)響起,我...
- 正文 年R本政府宣布,位于F島的核電站墨闲,受9級特大地震影響今妄,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜,卻給世界環(huán)境...
- 文/蒙蒙 一盾鳞、第九天 我趴在偏房一處隱蔽的房頂上張望犬性。 院中可真熱鬧,春花似錦腾仅、人聲如沸乒裆。這莊子的主人今日做“春日...
- 文/蒼蘭香墨 我抬頭看了看天上的太陽鹤耍。三九已至,卻和暖如春验辞,著一層夾襖步出監(jiān)牢的瞬間惰蜜,已是汗流浹背。 一陣腳步聲響...
推薦閱讀更多精彩內(nèi)容
- map變換 從Obserable事件發(fā)出開始撑教,每一次map操作符的調(diào)用,就是將上一個的Obserable事件用Ob...
- 核心:異步 觀察者模式 作用:RxJava的特點就是可以非常簡便的實現(xiàn)異步調(diào)用醉拓,可以在邏輯復(fù)雜的代碼邏...
- 線程調(diào)度源碼分析1:subscribeOn subscribeOn(Schedulers.io())這句其實就是創(chuàng)...