Rxjava2~take~timer~interval~buffer~filter等源碼如何實(shí)現(xiàn)(你應(yīng)該懂的)~學(xué)渣帶你扣rxjava2

take()


<pre>
Observable.just(1, 2, 3, 4, 5)
.subscribeOn(Schedulers.io())
// Be notified on the main thread
.observeOn(AndroidSchedulers.mainThread())
.take(3)
.subscribe(getObserver())

</pre>

輸出沒(méi)錯(cuò)是123

我們面來(lái)看看源碼
直接來(lái)看ObservableTake的subscribeActual钙勃,[不懂的同學(xué)請(qǐng)看我前面的學(xué)渣系列]
<pre>
protected void subscribeActual(Observer<? super T> observer) {
source.subscribe(new TakeObserver<T>(observer, limit));
}
</pre>
這個(gè)source是ObservableSource的對(duì)象。 那么我們?nèi)フ覍?shí)現(xiàn)他的Observable
好吧 又回到了碉就。
<pre>
public final void subscribe(Observer<? super T> observer)
subscribeActual(observer);
其他的省略了

</pre>
關(guān)鍵點(diǎn)一步底挫,這回調(diào)用了誰(shuí)的方法呢血久? 下面來(lái)揭曉
是ObservableObserveOn的subscribeActual
<pre>
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();

        source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
    }
}

</pre>
看到了嗎 又會(huì)調(diào)用
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
然后 又要調(diào)用的是ObservableSubscribeOn的subscribeActual
<pre>
@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);

    s.onSubscribe(parent);

    parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
        @Override
        public void run() {
            source.subscribe(parent);
        }
    }));
}

</pre>

大家會(huì)好奇這兩個(gè)地方為什么會(huì)被調(diào)用呢?
下面我給大家看一個(gè)地方
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
O(∩_∩)O
你沒(méi)有看錯(cuò)
<pre>
public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}

public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
</pre>
大家可以看到不。 這兩個(gè)方法返回的也是Observable對(duì)象挖炬。 所以 他們會(huì)分別調(diào)用這兩個(gè)對(duì)象subscribeActual方法讯壶。好吧料仗,讓我們來(lái)像下進(jìn)行。
【下面是一個(gè)小擴(kuò)展 給大家一個(gè)小小的感覺(jué)】
<pre>

Observable.just(1, 2, 3, 4, 5)

            .observeOn(AndroidSchedulers.mainThread())
            .take(3)
            .subscribe(getObserver())

看到有什么不同了嗎伏蚊? 我注釋掉了一個(gè)方法立轧。我為什么要這么干?我注視掉了那么
source.subscribe 會(huì)調(diào)用誰(shuí)呢躏吊? 我直接給出來(lái)答案氛改。大家可以思考一個(gè) 當(dāng)我直接注釋之后會(huì)調(diào)用just的subscribeActual

public final class ObservableFromArray<T> extends Observable<T> {
final T[] array;
public ObservableFromArray(T[] array) {
this.array = array;
}
@Override
public void subscribeActual(Observer<? super T> s) {
FromArrayDisposable<T> d = new FromArrayDisposable<T>(s, array);

    s.onSubscribe(d);

    if (d.fusionMode) {
        return;
    }

    d.run();
}

相信大家看過(guò)我之前的應(yīng)該可以看懂。
</pre>

讓我們回歸正題當(dāng)執(zhí)行到ObservableSubscribeOn的subscribeActual的方法的時(shí)候

  public void subscribeActual(final Observer<? super T> s) {
    final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);

    s.onSubscribe(parent);

    parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
        @Override
        public void run() {
            source.subscribe(parent);
        }
    }));

source.subscribe(parent); 看到這個(gè)方法了嗎颜阐、首先它是異步的平窘。另外執(zhí)行
.source.subscribe(parent);的時(shí)候 ,實(shí)際上就執(zhí)行了ObservableFromArray的subscribeActual
<pre>
public void subscribeActual(Observer<? super T> s) {
FromArrayDisposable<T> d = new FromArrayDisposable<T>(s, array);
s.onSubscribe(d);
if (d.fusionMode) {
return;
}
d.run();

</pre>
剩下的就好理解了凳怨,都是分別執(zhí)行onnext等方法瑰艘。

到這里task的大體思路介紹完畢

2下面開始timer 定時(shí)器

public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
    final Worker w = createWorker();

    final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

    w.schedule(new Runnable() {
        @Override
        public void run() {
            try {
                decoratedRun.run();
            } finally {
                w.dispose();
            }
        }
    }, delay, unit);

    return w;
}

重復(fù)的就不貼了是鬼。 都是差不多重復(fù)的。 只是給大家貼上關(guān)鍵代碼
看到這里面了嗎紫新。delay 就是大家貼上的時(shí)間均蜜。 詳細(xì)這個(gè)大家都是可以看明白的。芒率,

3interval

做周期性操作囤耳,從翻譯上大家就應(yīng)該可以看明白
ComputationScheduler的schedulePeriodicallyDirect的方法
<pre>
public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initialDelay, long period, TimeUnit unit) {
PoolWorker w = pool.get().getEventLoop();
return w.schedulePeriodicallyDirect(run, initialDelay, period, unit);
}
</pre>

<pre>
NewThreadWorker的schedulePeriodicallyDirect的方法
public Disposable schedulePeriodicallyDirect(final Runnable run, long initialDelay, long period, TimeUnit unit) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
try {
Future<?> f = executor.scheduleAtFixedRate(decoratedRun, initialDelay, period, unit);
return Disposables.fromFuture(f);
} catch (RejectedExecutionException ex) {
RxJavaPlugins.onError(ex);
return EmptyDisposable.INSTANCE;
}
}

</pre>

分別設(shè)置了 什么時(shí)候開始。多長(zhǎng)時(shí)間執(zhí)行一次

4buffer

Observable<List<String>> buffered = getObservable().buffer(3, 2);

    buffered.subscribe(getObserver());

ObservableBuffer的subscribeActual的方法
<pre>

protected void subscribeActual(Observer<? super U> t) {
if (skip == count) {
BufferExactObserver<T, U> bes = new BufferExactObserver<T, U>(t, count, bufferSupplier);
if (bes.createBuffer()) {
source.subscribe(bes);
}
} else {
source.subscribe(new BufferSkipObserver<T, U>(t, count, skip, bufferSupplier));
}
}
</pre>
好吧到了關(guān)鍵的地方 source.subscribe是調(diào)用誰(shuí)的地方
Observable.just("one", "two", "three", "four", "five");
所以是ObservableFromArray的subscribeActual方法

<pre>
public void subscribeActual(Observer<? super T> s) {
FromArrayDisposable<T> d = new FromArrayDisposable<T>(s, array);

    s.onSubscribe(d);

    if (d.fusionMode) {
        return;
    }

    d.run();
}

void run() {
T[] a = array;
int n = a.length;

        for (int i = 0; i < n && !isDisposed(); i++) {
            T value = a[i];
            if (value == null) {
                actual.onError(new NullPointerException("The " + i + "th element is null"));
                return;
            }
            actual.onNext(value);
        }
        if (!isDisposed()) {
            actual.onComplete();
        }
    }

</pre>

看到這個(gè)for方法了嗎 這個(gè)就是決定你跳過(guò)的數(shù)量的偶芍。

5filter

Paste_Image.png

這個(gè)相信大家很熟悉充择,對(duì)就是過(guò)濾
<pre>
fromArray(1, 0, 6)
.filter(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
return integer.intValue() > 5;
}
})
</pre>

這里只是放出來(lái)關(guān)鍵代碼
ObservableFilter的onNext
<pre>
public void onNext(T t) {
if (sourceMode == NONE) {
boolean b;
try {
b = filter.test(t);
} catch (Throwable e) {
fail(e);
return;
}
if (b) {
actual.onNext(t);
}
} else {
actual.onNext(null);
}
}

</pre>
這個(gè)b就是你的過(guò)濾條件。 下面的就是判斷匪蟀。 不符合的就不執(zhí)行 actual.onNext(t);其實(shí)很簡(jiǎn)單的方式

6skip

和上面同理關(guān)鍵部分ObservableSkip的onNext方法
<pre>

public void onNext(T t) {
if (remaining != 0L) {
remaining--;
} else {
actual.onNext(t);
}
}
</pre>

7 scan

Paste_Image.png

RxJava的scan()函數(shù)可以看做是一個(gè)累加器函數(shù)椎麦。scan()函數(shù)對(duì)原始Observable發(fā)射的每一項(xiàng)數(shù)據(jù)都應(yīng)用一個(gè)函數(shù),它將函數(shù)的結(jié)果填充回可觀測(cè)序列材彪,等待和下一次發(fā)射的數(shù)據(jù)一起使用观挎。

關(guān)鍵代碼
<pre>
@Override
public void onNext(T t) {
if (done) {
return;
}
final Observer<? super T> a = actual;
T v = value;
if (v == null) {
value = t;
a.onNext(t);
} else {
T u;

            try {
                u = ObjectHelper.requireNonNull(accumulator.apply(v, t), "The value returned by the accumulator is null");
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                s.dispose();
                onError(e);
                return;
            }

            value = u;
            a.onNext(u);
        }
    }

</pre>
執(zhí)行的時(shí)候value 會(huì)累加。 a.onNext(u);在發(fā)射出去

8 replay

<pre>
PublishSubject<Integer> source = PublishSubject.create();
ConnectableObservable<Integer> connectableObservable = source.replay(2); // bufferSize = 3 to retain 3 values to replay
connectableObservable.connect(); // connecting the connectableObservable
connectableObservable.subscribe(getFirstObserver());
source.onNext(1);
source.onNext(2);
source.onNext(3);
source.onNext(4);
source.onComplete();

   /*
     * it will emit 2, 3, 4 as (count = 3), retains the 3 values for replay
     */
    connectableObservable.subscribe(getSecondObserver());

</pre>
replay 這個(gè)是緩存操作段化。
第二次訂閱之后嘁捷,就是緩存后面兩個(gè)數(shù)據(jù)

9concat

Paste_Image.png

<pre>
final String[] aStrings = {"A1", "A2", "A3", "A4"};
final String[] bStrings = {"B1", "B2", "B3"};

    final Observable<String> aObservable = Observable.fromArray(aStrings);
    final Observable<String> bObservable = Observable.fromArray(bStrings);

    Observable.concat(aObservable, bObservable)
            .subscribe(getObserver());

</pre>

他的過(guò)程是
<pre>
return RxJavaPlugins.onAssembly(new ObservableConcatMap(fromArray(sources), Functions.identity(), bufferSize(), ErrorMode.BOUNDARY));
</pre>

concat操作符肯定也是有序的,實(shí)際上fromArray(sources)這么一個(gè)過(guò)程显熏。

10merge

Paste_Image.png

<pre>
final String[] aStrings = {"A1", "A2", "A3", "A4"};
final String[] bStrings = {"B1", "B2", "B3"};

    final Observable<String> aObservable = Observable.fromArray(aStrings);
    final Observable<String> bObservable = Observable.fromArray(bStrings);

    Observable.merge(aObservable, bObservable)
            .subscribe(getObserver());

</pre>

無(wú)序的合并

11distinct 去除重復(fù)的

Paste_Image.png

<pre>
enum HashSetCallable implements Callable<Set<Object>> {
INSTANCE;
@Override
public Set<Object> call() throws Exception {
return new HashSet<Object>();
}
}
</pre>
HashSet中 是不允許重復(fù)元素的

12last

Paste_Image.png

<pre>
private void doSomeWork() {
getObservable().last("A1") // the default item ("A1") to emit if the source ObservableSource is empty
.subscribe(getObserver());
}

private Observable<String> getObservable() {
    return Observable.just("A1", "A2", "A3", "A4", "A5", "A6");
}

打印出來(lái)的是a6

</pre>

ObservableFromArray的run方法
<pre>
void run() {
T[] a = array;
int n = a.length;

        for (int i = 0; i < n && !isDisposed(); i++) {
            T value = a[i];
            if (value == null) {
                actual.onError(new NullPointerException("The " + i + "th element is null"));
                return;
            }
            actual.onNext(value);
        }
        if (!isDisposed()) {
            actual.onComplete();
        }
    }

</pre>

ObservableLastSingle的onComplete
<pre>
public void onComplete() {
s = DisposableHelper.DISPOSED;
T v = item;
if (v != null) {
item = null;
actual.onSuccess(v);
} else {
v = defaultItem;
if (v != null) {
actual.onSuccess(v);
} else {
actual.onError(new NoSuchElementException());
}
}

</pre>

last方法會(huì)返回Single

13throttleFirst

Paste_Image.png

<pre>
private void doSomeWork() {
getObservable()
.throttleFirst(500, TimeUnit.MILLISECONDS)
// Run on a background thread
.subscribeOn(Schedulers.io())
// Be notified on the main thread
.observeOn(AndroidSchedulers.mainThread())
.subscribe(getObserver());
}

private Observable<Integer> getObservable() {
    return Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            // send events with simulated time wait
            Thread.sleep(0);
            emitter.onNext(1); // skip
            emitter.onNext(2); // deliver
            Thread.sleep(505);
            emitter.onNext(3); // skip
            Thread.sleep(99);
            emitter.onNext(4); // skip
            Thread.sleep(100);
            emitter.onNext(5); // skip
            emitter.onNext(6); // deliver
            Thread.sleep(305);
            emitter.onNext(7); // deliver
            Thread.sleep(510);
            emitter.onComplete();
        }
    });
}

</pre>

從這個(gè)可以理解到發(fā)送第一個(gè)之后雄嚣。剩下的500之后才會(huì)接受第二個(gè)

14throttleLast

Paste_Image.png

從這個(gè)可以看出來(lái),這是在一段時(shí)間內(nèi)接受最后一個(gè)數(shù)據(jù)
<pre>
getObservable()
.throttleLast(500, TimeUnit.MILLISECONDS)
// Run on a background thread
.subscribeOn(Schedulers.io())
// Be notified on the main thread
.observeOn(AndroidSchedulers.mainThread())
.subscribe(getObserver());
}

private Observable<Integer> getObservable() {
    return Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            // send events with simulated time wait
            Thread.sleep(0);
            emitter.onNext(1); // skip
            emitter.onNext(2); // deliver
            Thread.sleep(505);
            emitter.onNext(3); // skip
            Thread.sleep(99);
            emitter.onNext(4); // skip
            Thread.sleep(100);
            emitter.onNext(5); // skip
            emitter.onNext(6); // deliver
            Thread.sleep(305);
            emitter.onNext(7); // deliver
            Thread.sleep(510);
            emitter.onComplete();
        }
    });
}

</pre>

15debounce

<pre>

getObservable()
            .debounce(500, TimeUnit.MILLISECONDS)
            // Run on a background thread
            .subscribeOn(Schedulers.io())
            // Be notified on the main thread
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(getObserver());
}

private Observable<Integer> getObservable() {
    return Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            // send events with simulated time wait
            emitter.onNext(1); // skip
            Thread.sleep(400);
            emitter.onNext(2); // deliver
            Thread.sleep(505);
            emitter.onNext(3); // skip
            Thread.sleep(100);
            emitter.onNext(4); // deliver
            Thread.sleep(605);
            emitter.onNext(5); // deliver
            Thread.sleep(510);
            emitter.onComplete();
        }
    });

</pre>

Paste_Image.png

這個(gè)接受的一一個(gè)時(shí)間跨度之內(nèi)的數(shù)據(jù)

16window

Paste_Image.png

可以看出來(lái)大概 的意思就是截取被觀察者組成一個(gè)新的被觀察者

17delay

Paste_Image.png
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末佃延,一起剝皮案震驚了整個(gè)濱河市现诀,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌履肃,老刑警劉巖仔沿,帶你破解...
    沈念sama閱讀 222,865評(píng)論 6 518
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異尺棋,居然都是意外死亡封锉,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 95,296評(píng)論 3 399
  • 文/潘曉璐 我一進(jìn)店門膘螟,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)成福,“玉大人,你說(shuō)我怎么就攤上這事荆残∨” “怎么了?”我有些...
    開封第一講書人閱讀 169,631評(píng)論 0 364
  • 文/不壞的土叔 我叫張陵内斯,是天一觀的道長(zhǎng)蕴潦。 經(jīng)常有香客問(wèn)我像啼,道長(zhǎng),這世上最難降的妖魔是什么潭苞? 我笑而不...
    開封第一講書人閱讀 60,199評(píng)論 1 300
  • 正文 為了忘掉前任忽冻,我火速辦了婚禮,結(jié)果婚禮上此疹,老公的妹妹穿的比我還像新娘僧诚。我一直安慰自己,他們只是感情好蝗碎,可當(dāng)我...
    茶點(diǎn)故事閱讀 69,196評(píng)論 6 398
  • 文/花漫 我一把揭開白布湖笨。 她就那樣靜靜地躺著,像睡著了一般衍菱。 火紅的嫁衣襯著肌膚如雪赶么。 梳的紋絲不亂的頭發(fā)上肩豁,一...
    開封第一講書人閱讀 52,793評(píng)論 1 314
  • 那天脊串,我揣著相機(jī)與錄音,去河邊找鬼清钥。 笑死琼锋,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的祟昭。 我是一名探鬼主播缕坎,決...
    沈念sama閱讀 41,221評(píng)論 3 423
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼篡悟!你這毒婦竟也來(lái)了谜叹?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 40,174評(píng)論 0 277
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤搬葬,失蹤者是張志新(化名)和其女友劉穎荷腊,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體急凰,經(jīng)...
    沈念sama閱讀 46,699評(píng)論 1 320
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡女仰,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,770評(píng)論 3 343
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了抡锈。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片疾忍。...
    茶點(diǎn)故事閱讀 40,918評(píng)論 1 353
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖床三,靈堂內(nèi)的尸體忽然破棺而出一罩,到底是詐尸還是另有隱情,我是刑警寧澤撇簿,帶...
    沈念sama閱讀 36,573評(píng)論 5 351
  • 正文 年R本政府宣布聂渊,位于F島的核電站推汽,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏歧沪。R本人自食惡果不足惜歹撒,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 42,255評(píng)論 3 336
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望诊胞。 院中可真熱鬧暖夭,春花似錦、人聲如沸撵孤。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,749評(píng)論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)邪码。三九已至裕菠,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間闭专,已是汗流浹背奴潘。 一陣腳步聲響...
    開封第一講書人閱讀 33,862評(píng)論 1 274
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留影钉,地道東北人画髓。 一個(gè)月前我還...
    沈念sama閱讀 49,364評(píng)論 3 379
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像平委,于是被迫代替她去往敵國(guó)和親奈虾。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,926評(píng)論 2 361

推薦閱讀更多精彩內(nèi)容

  • 我和rx的對(duì)話來(lái)繼續(xù)了解冷觀察者和熱觀察者 wow我們又多了一天來(lái)學(xué)習(xí)新的知識(shí)這真的很棒廉赔。 hello 伙伴們肉微。希...
    品味與回味閱讀 768評(píng)論 3 1
  • 注:只包含標(biāo)準(zhǔn)包中的操作符,用于個(gè)人學(xué)習(xí)及備忘參考博客:http://blog.csdn.net/maplejaw...
    小白要超神閱讀 2,199評(píng)論 2 8
  • 創(chuàng)建操作 用于創(chuàng)建Observable的操作符Create通過(guò)調(diào)用觀察者的方法從頭創(chuàng)建一個(gè)ObservableEm...
    rkua閱讀 1,836評(píng)論 0 1
  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見(jiàn)模式的工具(例如配置管理蜡塌,服務(wù)發(fā)現(xiàn)碉纳,斷路器,智...
    卡卡羅2017閱讀 134,719評(píng)論 18 139
  • 作者: maplejaw本篇只解析標(biāo)準(zhǔn)包中的操作符岗照。對(duì)于擴(kuò)展包村象,由于使用率較低,如有需求攒至,請(qǐng)讀者自行查閱文檔厚者。 創(chuàng)...
    maplejaw_閱讀 45,699評(píng)論 8 93