RxJava2的Observable變種之變換操作符

map—— 一對(duì)一轉(zhuǎn)換典奉,一般是轉(zhuǎn)換成結(jié)果悼沿,如string到int的轉(zhuǎn)換∥可以集合進(jìn)行轉(zhuǎn)換翁巍,每次轉(zhuǎn)換都接著執(zhí)行onNext操作。

看個(gè)map的例子

Observable.create(new ObservableOnSubscribe<String>() {

    @Override
    public void subscribe(ObservableEmitter<String> observableEmitter) throws Exception {
        observableEmitter.onNext("hello world");
    }
}).map(new Function<String, Integer>() {
            @Override
            public Integer apply(String s) throws Exception {
                return s.hashCode();
            }
        })
   .subscribe(new io.reactivex.Observer<Integer>() {
    @Override
    public void onSubscribe(Disposable disposable) {

    }

    @Override
    public void onNext(Integer number) {
        Log.d("RXJava2", "hascode: " + number);
    }

    @Override
    public void onError(Throwable throwable) {

    }

    @Override
    public void onComplete() {

    }
});

我們多次強(qiáng)調(diào)所有的訂閱者關(guān)系都會(huì)是形如observable.subscribe(observer)休雌。那么從上面的例子中灶壶,肯定可以看到經(jīng)過(guò)調(diào)用map方法之后,仍然返回Observable具體實(shí)例杈曲,當(dāng)然驰凛,參考線程變換一文,經(jīng)過(guò)map之后担扑,肯定是經(jīng)過(guò)裝飾者模式進(jìn)行了一層Observeable組合包裝恰响,經(jīng)過(guò)suscibe方法之后,也進(jìn)行了一層Observer組合包裝涌献。

具體情況不再詳細(xì)分析胚宦,可以詳細(xì)看線程變換一文,原理大體一致。現(xiàn)在主要大概分析下經(jīng)過(guò)map之后间唉,返回的ObservableMap如何執(zhí)行subscribe方法:

public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    final Function<? super T, ? extends U> function;

    public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
        super(source);
        this.function = function;
    }

    @Override
    public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MapObserver<T, U>(t, function));
    }


    static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
        final Function<? super T, ? extends U> mapper;

        MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
            super(actual);
            this.mapper = mapper;
        }

        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }

            if (sourceMode != NONE) {
                actual.onNext(null);
                return;
            }

            U v;

            try {
                v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
            } catch (Throwable ex) {
                fail(ex);
                return;
            }
            actual.onNext(v);
        }

        @Override
        public int requestFusion(int mode) {
            return transitiveBoundaryFusion(mode);
        }

        @Override
        public U poll() throws Exception {
            T t = qs.poll();
            return t != null ? ObjectHelper.<U>requireNonNull(mapper.apply(t), "The mapper function returned a null value.") : null;
        }
    }
}

我們可以看到绞灼,直接調(diào)用source.subscribe(new MapObserver<T, U>(t, function))利术,進(jìn)而進(jìn)入到MapObserver的onNexy方法中呈野,主要看下面幾句代碼:

try {
    v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
    fail(ex);
    return;
}
actual.onNext(v);

執(zhí)行mapper.apply(t),這里mapper即為map方法參數(shù)印叁,獲得值v被冒,本例中即為s.hashCode, 接著執(zhí)行事件發(fā)射,到達(dá)最基層observer轮蜕,打印log昨悼。

flatmap—— 一對(duì)多轉(zhuǎn)換,多對(duì)多轉(zhuǎn)換跃洛,每次轉(zhuǎn)換都會(huì)生成observable率触,而且是全部轉(zhuǎn)換完成才提交給訂閱者處理。

flatmap的作用一般是用于一個(gè)列表做扁平化處理汇竭,比如一個(gè)學(xué)生數(shù)據(jù)結(jié)構(gòu)的表單葱蝗,想對(duì)這個(gè)表單里的單元數(shù)據(jù)做一一處理,那么flatmap就派上場(chǎng)用到了细燎。flatmap的實(shí)現(xiàn)方式仍然和前面分析的大致两曼。不過(guò)既然是從一對(duì)多轉(zhuǎn)換,我們先得出一個(gè)的就是肯定通過(guò)某個(gè)組合包裝的Observable玻驻,在subscribe訂閱的時(shí)候通過(guò)循環(huán)往外發(fā)射數(shù)據(jù)悼凑,而observer觀察者也應(yīng)該通過(guò)循環(huán)接收數(shù)據(jù)。

我們以一個(gè)例子以及進(jìn)入相關(guān)的關(guān)鍵代碼來(lái)分析是否與猜想的一致:

observable.just(studentList).flatMap(new Func1<List, Observable<?>>() {
            @Override
            public Observable<?> call(List list) {
                return Observable.fromInterable(list);
            }
        }).subscribe(new Action1<Object>() {
            @Override
            public void call(Object o) {
                Log.d("flatmap", o.toString());
            }
        });

上面的例子中,studentList是一個(gè)list列表璧瞬。那么經(jīng)過(guò)flatMap方法

public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, boolean delayErrors, int maxConcurrency, int bufferSize) {
    ObjectHelper.requireNonNull(mapper, "mapper is null");
    ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency");
    ObjectHelper.verifyPositive(bufferSize, "bufferSize");
    if (this instanceof ScalarCallable) {
        T v = ((ScalarCallable)this).call();
        return v == null ? empty() : ObservableScalarXMap.scalarXMap(v, mapper);
    } else {
        return RxJavaPlugins.onAssembly(new ObservableFlatMap(this, mapper, delayErrors, maxConcurrency, bufferSize));
    }
}

我們看到返回一個(gè)ObservableFlatMap實(shí)例户辫。當(dāng)我們調(diào)用observable.subscrbe(obverser)的時(shí)候,調(diào)用到如下方法:

@Override
public void subscribeActual(Observer<? super U> t) {

    if (ObservableScalarXMap.tryScalarXMapSubscribe(source, t, mapper)) {
        return;
    }

    source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize));
}

主要看MergeObserver的onNext方法

public void onNext(T t) {
    // safeguard against misbehaving sources
    if (done) {
        return;
    }
    ObservableSource<? extends U> p;
    try {
        p = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource");
    } catch (Throwable e) {
        Exceptions.throwIfFatal(e);
        s.dispose();
        onError(e);
        return;
    }

    if (maxConcurrency != Integer.MAX_VALUE) {
        synchronized (this) {
            if (wip == maxConcurrency) {
                sources.offer(p);
                return;
            }
            wip++;
        }
    }

    subscribeInner(p);
}

而mapper.apply(t)即是例子中返回的Observable.from(list)嗤锉,我們繼續(xù)分析subscribeInner方法:

void subscribeInner(ObservableSource<? extends U> p) {
    for (;;) {
        if (p instanceof Callable) {
            if (tryEmitScalar(((Callable<? extends U>)p)) && maxConcurrency != Integer.MAX_VALUE) {
                boolean empty = false;
                synchronized (this) {
                    p = sources.poll();
                    if (p == null) {
                        wip--;
                        empty = true;
                    }
                }
                if (empty) {
                    drain();
                    break;
                }
            } else {
                break;
            }
        } else {
            InnerObserver<T, U> inner = new InnerObserver<T, U>(this, uniqueId++);
            if (addInner(inner)) {
                p.subscribe(inner);
            }
            break;
        }
    }
}

一般走else語(yǔ)句渔欢,進(jìn)入InnerObserver中,走到InnerObserver的onNext方法

public void onNext(U t) {
    if (fusionMode == QueueDisposable.NONE) {
        parent.tryEmit(t, this);
    } else {
        parent.drain();
    }
}

一般fusionMode為NONE档冬,調(diào)用MergeObserver的tryEmit方法:

void tryEmit(U value, InnerObserver<T, U> inner) {
    if (get() == 0 && compareAndSet(0, 1)) {
        actual.onNext(value);
        if (decrementAndGet() == 0) {
            return;
        }
    } else {
        SimpleQueue<U> q = inner.queue;
        if (q == null) {
            q = new SpscLinkedArrayQueue<U>(bufferSize);
            inner.queue = q;
        }
        q.offer(value);
        if (getAndIncrement() != 0) {
            return;
        }
    }
    drainLoop();
}

主要看drainLoop方法, drainLoop比較長(zhǎng)膘茎,大體就是循環(huán)接收發(fā)射來(lái)的數(shù)據(jù),并通知子observer接收數(shù)據(jù)child.onNext();

那么誰(shuí)發(fā)射數(shù)據(jù)到MergeObserver的酷誓,我們上面提到p = Observable.fromInterable(list); 這里就是主要發(fā)射數(shù)據(jù)點(diǎn), 實(shí)際上p為ObservableFromIterable實(shí)例披坏,進(jìn)入具體subscribeActual方法:

public void subscribeActual(Observer<? super T> s) {
    Iterator<? extends T> it;
    try {
        it = source.iterator();
    } catch (Throwable e) {
        Exceptions.throwIfFatal(e);
        EmptyDisposable.error(e, s);
        return;
    }
    boolean hasNext;
    try {
        hasNext = it.hasNext();
    } catch (Throwable e) {
        Exceptions.throwIfFatal(e);
        EmptyDisposable.error(e, s);
        return;
    }
    if (!hasNext) {
        EmptyDisposable.complete(s);
        return;
    }

    FromIterableDisposable<T> d = new FromIterableDisposable<T>(s, it);
    s.onSubscribe(d);

    if (!d.fusionMode) {
        d.run();
    }
}

進(jìn)入FromIterableDisposable的run方法

void run() {
    boolean hasNext;

    do {
        if (isDisposed()) {
            return;
        }
        T v;

        try {
            v = ObjectHelper.requireNonNull(it.next(), "The iterator returned a null value");
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            actual.onError(e);
            return;
        }

        actual.onNext(v);

        if (isDisposed()) {
            return;
        }
        try {
            hasNext = it.hasNext();
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            actual.onError(e);
            return;
        }
    } while (hasNext);

    if (!isDisposed()) {
        actual.onComplete();
    }
}

進(jìn)入for循環(huán),發(fā)射數(shù)據(jù)actual.onNext(v)盐数,這個(gè)actual是通過(guò)組合包裝的MergeObserver棒拂,于是驗(yàn)證了我們一開(kāi)始猜測(cè)的是正確的。

scan:將發(fā)射源的結(jié)果作為入?yún)⒌牡谝粋€(gè)參數(shù),入?yún)⒌牡诙€(gè)參數(shù)為第i個(gè)發(fā)射源數(shù)據(jù)帚屉,然后根據(jù)自定義的處理結(jié)果繼續(xù)處理谜诫。

scan通過(guò)看完源碼之后會(huì)特別容易理解,先舉個(gè)例子攻旦。

Observable.just(1, 2, 3, 4,5)
            .scan(new BiFunction<Integer, Integer, Integer>() {
                    @Override
                    public Integer apply(Integer t1, Integer t2) {
                        return t1 + t2;//累加操作喻旷,
                    }
                })
                .subscribe(getObserver())

那么上面的例子,observer將依次接收到1牢屋,3且预,6,10烙无,15
上面的例子中我們主要看scan相關(guān)的Observer——ScanObserver锋谐。onNext方法如下:

public void onNext(T t) {
    if (!this.done) {
        Observer<? super T> a = this.actual;
        T v = this.value;
        if (v == null) {
            this.value = t;
            a.onNext(t);
        } else {
            Object u;
            try {
                u = ObjectHelper.requireNonNull(this.accumulator.apply(v, t), "The value returned by the accumulator is null");
            } catch (Throwable var6) {
                Exceptions.throwIfFatal(var6);
                this.s.dispose();
                this.onError(var6);
                return;
            }

            this.value = u;
            a.onNext(u);
        }

    }
}

從上面可以看出,當(dāng)發(fā)射1的時(shí)候截酷,this.value為空涮拗,因此進(jìn)入if語(yǔ)句中,輸出1迂苛,接著發(fā)送2的時(shí)候三热,進(jìn)入else語(yǔ)句,通過(guò)this.accumulator.apply(v, t)灾部,例子中是進(jìn)行v+t操作康铭,這里t為上一個(gè)發(fā)射源結(jié)果1,因?yàn)?+2赌髓,得出3从藤。后面處理結(jié)果一樣。

reduce:和scan類(lèi)似锁蠕,只不過(guò)scan會(huì)先把結(jié)果提交給訂閱者輸出夷野,而reduce是等所有observable處理完,才把最終結(jié)果提交給訂閱者處理荣倾。

先看個(gè)例子:

Observable.just(1, 2, 3, 4)
                .reduce(new BiFunction<Integer, Integer, Integer>() {
                    @Override
                    public Integer apply(Integer t1, Integer t2) {
                        return t1 + t2;
                    }
                })
                .subscribe(getObserver())

主要看reduce方法的對(duì)應(yīng)的Observer——ReduceObserver悯搔,onNext方法如下:

public void onNext(T value) {
    if (!this.done) {
        T v = this.value;
        if (v == null) {
            this.value = value;
        } else {
            try {
                this.value = ObjectHelper.requireNonNull(this.reducer.apply(v, value), "The reducer returned a null value");
            } catch (Throwable var4) {
                Exceptions.throwIfFatal(var4);
                this.d.dispose();
                this.onError(var4);
            }
        }
    }
}

這樣,只是先計(jì)算出來(lái)并保存結(jié)果value上舌仍。然后在onComplete中輸出妒貌。

public void onComplete() {
    if (!this.done) {
        this.done = true;
        T v = this.value;
        this.value = null;
        if (v != null) {
            this.actual.onSuccess(v);
        } else {
            this.actual.onComplete();
        }

    }
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市铸豁,隨后出現(xiàn)的幾起案子灌曙,更是在濱河造成了極大的恐慌,老刑警劉巖节芥,帶你破解...
    沈念sama閱讀 219,589評(píng)論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件冒滩,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡撮奏,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,615評(píng)論 3 396
  • 文/潘曉璐 我一進(jìn)店門(mén)款筑,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人,你說(shuō)我怎么就攤上這事〈吭桑” “怎么了?”我有些...
    開(kāi)封第一講書(shū)人閱讀 165,933評(píng)論 0 356
  • 文/不壞的土叔 我叫張陵输玷,是天一觀的道長(zhǎng)队丝。 經(jīng)常有香客問(wèn)我,道長(zhǎng)欲鹏,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,976評(píng)論 1 295
  • 正文 為了忘掉前任臭墨,我火速辦了婚禮赔嚎,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘胧弛。我一直安慰自己尤误,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,999評(píng)論 6 393
  • 文/花漫 我一把揭開(kāi)白布结缚。 她就那樣靜靜地躺著损晤,像睡著了一般。 火紅的嫁衣襯著肌膚如雪红竭。 梳的紋絲不亂的頭發(fā)上尤勋,一...
    開(kāi)封第一講書(shū)人閱讀 51,775評(píng)論 1 307
  • 那天,我揣著相機(jī)與錄音茵宪,去河邊找鬼最冰。 笑死,一個(gè)胖子當(dāng)著我的面吹牛稀火,可吹牛的內(nèi)容都是我干的暖哨。 我是一名探鬼主播,決...
    沈念sama閱讀 40,474評(píng)論 3 420
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼凰狞,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼篇裁!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起赡若,我...
    開(kāi)封第一講書(shū)人閱讀 39,359評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤达布,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后斩熊,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體往枣,經(jīng)...
    沈念sama閱讀 45,854評(píng)論 1 317
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,007評(píng)論 3 338
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了分冈。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片圾另。...
    茶點(diǎn)故事閱讀 40,146評(píng)論 1 351
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖雕沉,靈堂內(nèi)的尸體忽然破棺而出集乔,到底是詐尸還是另有隱情,我是刑警寧澤坡椒,帶...
    沈念sama閱讀 35,826評(píng)論 5 346
  • 正文 年R本政府宣布扰路,位于F島的核電站,受9級(jí)特大地震影響倔叼,放射性物質(zhì)發(fā)生泄漏汗唱。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,484評(píng)論 3 331
  • 文/蒙蒙 一丈攒、第九天 我趴在偏房一處隱蔽的房頂上張望哩罪。 院中可真熱鬧,春花似錦巡验、人聲如沸际插。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 32,029評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)框弛。三九已至,卻和暖如春捕捂,著一層夾襖步出監(jiān)牢的瞬間瑟枫,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,153評(píng)論 1 272
  • 我被黑心中介騙來(lái)泰國(guó)打工绞蹦, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留力奋,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,420評(píng)論 3 373
  • 正文 我出身青樓幽七,卻偏偏與公主長(zhǎng)得像景殷,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子澡屡,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,107評(píng)論 2 356

推薦閱讀更多精彩內(nèi)容