Rx轉換操作符


map操作符

被觀察者數(shù)據(jù)源泛型剩瓶,當發(fā)射器的數(shù)據(jù)類型和觀察者數(shù)據(jù)類型不同時曹傀,通過map操作符轉換,可以將上游發(fā)射的類型轉換成任意對象類型形用。

Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
        emitter.onNext(1);
        emitter.onNext(2);
        emitter.onComplete();
    }
}).map(new Function<Integer, String>() {
    @Override
    public String apply(Integer s) throws Exception {
        String newStr = s + "_";
        Log.d(TAG, "int apply s " + newStr);
        return newStr;
    }
}).subscribe(new Consumer<String>() {
    @Override
    public void accept(String response) throws Exception {
        Log.d(TAG, "Observer : " + response);
    }
});

發(fā)射數(shù)據(jù)類型是Integer類,通過map操作符证杭,將類型轉換成String類田度。Function是一個類型轉換接口,F(xiàn)unction<T, R>解愤,將T轉換R镇饺,解決被觀察者和觀察者數(shù)據(jù)類型不匹配問題。

public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
    return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}

返回一個被觀察者ObservableMap送讲,封裝原始被觀察者ObservableCreate和轉換接口Function奸笤,調(diào)用ObservableMap的subscribe注冊方法。

@Override
public void subscribeActual(Observer<? super U> t) {
    source.subscribe(new MapObserver<T, U>(t, function));
}

創(chuàng)建觀察者MapObserver哼鬓,封裝自定義觀察者和轉換接口Function监右。source源即內(nèi)部ObservableCreate,調(diào)用它的subscribe方法异希。

被觀察者鏈

ObservableCreate的#subscribeActual方法健盒,創(chuàng)建CreateEmitter數(shù)據(jù)發(fā)射器,通知觀察者已經(jīng)注冊称簿。
調(diào)用數(shù)據(jù)源source(ObservableOnSubscribe)的subscribe方法扣癣,將發(fā)射器暴漏給外部。外部通過發(fā)射器發(fā)射數(shù)據(jù)憨降,如onNext方法父虑。
發(fā)射器CreateEmitter持有觀察者MapObserver,當onNext事件發(fā)射后券册,通知觀察者MapObserver的onNext方法频轿,傳參發(fā)射的數(shù)據(jù)類型Integer類。

public void onNext(T t) {
    ...    
    U v;
    try {
        v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
    } catch (Throwable ex) {
        fail(ex);
        return;
    }
    actual.onNext(v);
}

根據(jù)MapObserver內(nèi)部轉換接口Function烁焙,apply方法航邢,將T類型轉換成U類型,再調(diào)用自己定義觀察者Observer的onNext方法骄蝇,入?yún)?shù)據(jù)類型轉換成String膳殷。
發(fā)射器onNext方法和觀察者accept方法按照通知順序執(zhí)行。

Rx的map操作符

flatMap操作符

flatMap操作符和map類似,F(xiàn)unction接口實現(xiàn)類型轉換赚窃,轉換的對象是一個被觀察者ObservableSource册招。

Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
        emitter.onNext(1);
        emitter.onNext(2);
        emitter.onNext(3);
    }
}).flatMap(new Function<Integer, ObservableSource<String>>() {
    @Override
    public ObservableSource<String> apply(Integer s) throws Exception {
        return Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                //將上游Integer類型數(shù)據(jù),在新發(fā)射器中改造發(fā)射勒极。
                String newStr = s + "_gc1";
                String newStr2 = s + "_gc2";
                Log.d(TAG, "int apply s " + newStr);
                Log.d(TAG, "int apply s " + newStr2);
                e.onNext(newStr);
                e.onNext(newStr2);
            }
        });
    }
}).subscribe(new Consumer<String>() {
    @Override
    public void accept(String response) throws Exception {
        Log.d(TAG, "Observer : " + response);
    }
});

將上游發(fā)射器每個Integer類型的數(shù)據(jù)轉換成Observable類型是掰,再由每個轉換的被觀察者發(fā)射目標類型數(shù)據(jù)。

public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper,
                                       boolean delayErrors, int maxConcurrency, int bufferSize) {
    ...
    return RxJavaPlugins.onAssembly(new ObservableFlatMap<T, R>(this, mapper, delayErrors, maxConcurrency, bufferSize));
}

返回一個被觀察者ObservableFlatMap辱匿。封裝原始被觀察者ObservableCreate和轉換接口Function键痛,調(diào)用ObservableFlatMap的subscribe注冊方法。

public void subscribeActual(Observer<? super U> t) {
    ...
    source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize));
}

創(chuàng)建觀察者MergeObserver匾七,封裝自定義觀察者和轉換接口Function絮短,source源即內(nèi)部ObservableCreate,調(diào)用它的subscribe方法昨忆。當發(fā)射器onNext方法發(fā)射時丁频,調(diào)用發(fā)射器內(nèi)部MergeObserver的onNext方法。

@Override
public void onNext(T t) {

    ObservableSource<? extends U> p;
    try {
        p = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource");
    } catch (Throwable e) {
        return;
    }
    //調(diào)用的是新建ObservableSource的注冊方法邑贴。
    subscribeInner(p);
}

通過Function接口方法席里,將Integer類型轉換成ObservableSource類型,轉換對象是一個被觀察者痢缎,外部創(chuàng)建胁勺,ObservableCreate類型,將Integer類型的數(shù)據(jù)暴露在新被觀察者的數(shù)據(jù)源發(fā)射器中独旷,處理轉換成新發(fā)射器支持String類型,subscribeInner方法寥裂,新被觀察者訂閱嵌洼。

void subscribeInner(ObservableSource<? extends U> p) {
    for (;;) {
        if (p instanceof Callable) { 
            ...
        } else {
            InnerObserver<T, U> inner = new InnerObserver<T, U>(this, uniqueId++);
            if (addInner(inner)) {
                p.subscribe(inner);
            }
            break;
        }
    }
}

調(diào)用Observable的subscribe方法,訂閱InnerObserver觀察者封恰,外部調(diào)用發(fā)射器onNext方法麻养,可以獲取apply方法中上層發(fā)射的Integer數(shù)據(jù),按照String類型诺舔,觸發(fā)兩個onNext方法再次發(fā)射數(shù)據(jù)鳖昌,兩次調(diào)用觀察者InnerObserver的onNext方法,每次低飒,調(diào)用它引用MergeObserver的onNext方法许昨,最終,通知到外部觀察者褥赊。

flatMap最初的onNext順序糕档,在Function轉換成新Observable后,根據(jù)收到的數(shù)據(jù)拌喉,包裝重新發(fā)射一批新數(shù)據(jù)速那。在觀察者到的onNext順序不一定是按照最初的onNext順序調(diào)用的俐银。
上面發(fā)送的1,2端仰,3捶惜,在觀察者中看到的不一定是1,2荔烧,3的排序吱七,加一個延遲就能看到,即1_gc1茴晋,1_gc2陪捷,3_gc1,3_gc2诺擅,2_gc1市袖,2_gc2。

flatMap操作符數(shù)據(jù)流程

總結

flatMap不保證數(shù)據(jù)發(fā)射流的通知順序烁涌。
concatMap和flatMap功能相同苍碟,可以保證按照發(fā)射順序通知。


任重而道遠

最后編輯于
?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末撮执,一起剝皮案震驚了整個濱河市微峰,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌抒钱,老刑警劉巖蜓肆,帶你破解...
    沈念sama閱讀 219,270評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異谋币,居然都是意外死亡仗扬,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,489評論 3 395
  • 文/潘曉璐 我一進店門蕾额,熙熙樓的掌柜王于貴愁眉苦臉地迎上來早芭,“玉大人,你說我怎么就攤上這事诅蝶⊥烁觯” “怎么了?”我有些...
    開封第一講書人閱讀 165,630評論 0 356
  • 文/不壞的土叔 我叫張陵调炬,是天一觀的道長语盈。 經(jīng)常有香客問我,道長筐眷,這世上最難降的妖魔是什么黎烈? 我笑而不...
    開封第一講書人閱讀 58,906評論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮,結果婚禮上照棋,老公的妹妹穿的比我還像新娘资溃。我一直安慰自己,他們只是感情好烈炭,可當我...
    茶點故事閱讀 67,928評論 6 392
  • 文/花漫 我一把揭開白布溶锭。 她就那樣靜靜地躺著,像睡著了一般符隙。 火紅的嫁衣襯著肌膚如雪趴捅。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,718評論 1 305
  • 那天霹疫,我揣著相機與錄音拱绑,去河邊找鬼。 笑死丽蝎,一個胖子當著我的面吹牛猎拨,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播屠阻,決...
    沈念sama閱讀 40,442評論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼红省,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了国觉?” 一聲冷哼從身側響起吧恃,我...
    開封第一講書人閱讀 39,345評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎麻诀,沒想到半個月后痕寓,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,802評論 1 317
  • 正文 獨居荒郊野嶺守林人離奇死亡蝇闭,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,984評論 3 337
  • 正文 我和宋清朗相戀三年厂抽,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片丁眼。...
    茶點故事閱讀 40,117評論 1 351
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖昭殉,靈堂內(nèi)的尸體忽然破棺而出苞七,到底是詐尸還是另有隱情,我是刑警寧澤挪丢,帶...
    沈念sama閱讀 35,810評論 5 346
  • 正文 年R本政府宣布蹂风,位于F島的核電站,受9級特大地震影響乾蓬,放射性物質發(fā)生泄漏惠啄。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,462評論 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望撵渡。 院中可真熱鬧融柬,春花似錦、人聲如沸趋距。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,011評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽节腐。三九已至外盯,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間翼雀,已是汗流浹背饱苟。 一陣腳步聲響...
    開封第一講書人閱讀 33,139評論 1 272
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留狼渊,地道東北人箱熬。 一個月前我還...
    沈念sama閱讀 48,377評論 3 373
  • 正文 我出身青樓,卻偏偏與公主長得像囤锉,于是被迫代替她去往敵國和親坦弟。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 45,060評論 2 355

推薦閱讀更多精彩內(nèi)容