RxJava中的flatMap源碼分析

本文的分析基于RxJava1.1.5版本谈火,flatMap是為了一對多的轉(zhuǎn)換而設(shè)計的痪欲,具體的實現(xiàn)運用了merge和map的操作界拦,而最終也還是基于了lift()方法,是轉(zhuǎn)換的思想两嘴,下面是具體的分析

1、首先創(chuàng)建一個簡單的例子族壳,代碼如下

        final List<Student> students = new ArrayList<>();

        List<Course> jayList = new ArrayList<>();
        jayList.add(new Course("語文", "何炅"));
        jayList.add(new Course("英語", "謝娜"));
        jayList.add(new Course("物理", "何時風(fēng)"));
        students.add(new Student(1, "周杰倫", jayList));

        List<Course> jjList = new ArrayList<>();
        jjList.add(new Course("數(shù)學(xué)", "鄧軍權(quán)"));
        jjList.add(new Course("生物", "搖風(fēng)"));
        jjList.add(new Course("物理", "何時風(fēng)"));
        jjList.add(new Course("語文", "何炅"));
        students.add(new Student(2, "林俊杰", jjList));

        List<Course> luhanList = new ArrayList<>();
        luhanList.add(new Course("英語", "謝娜"));
        luhanList.add(new Course("生物", "搖風(fēng)"));
        luhanList.add(new Course("語文", "何炅"));
        students.add(new Student(3, "鹿晗", luhanList));

        Observable.create(new Observable.OnSubscribe<Student>() {
            @Override
            public void call(Subscriber<? super Student> subscriber) {
                for (Student s : students) {
                    subscriber.onNext(s);
                }
            }
        }).flatMap(new Func1<Student, Observable<Course>>() {
            @Override
            public Observable<Course> call(Student student) {
                Log.e("TAG", "學(xué)生名稱為:" + student.getName());
                return Observable.from(student.getmList());
            }
        }).subscribe(new Subscriber<Course>() {
            @Override
            public void onCompleted() {
                Log.e("TAG", "---onComplete()------");
            }

            @Override
            public void onError(Throwable e) {
                Log.e("TAG", "---onError()------");
            }

            @Override
            public void onNext(Course course) {
                Log.e("TAG", "課程名稱為:" + course.getCourseName() + ", 任課老師為:" + course.getTechName());
            }
        });

以上用到的Student類還有Course類如下

class Student {
        private int id;
        private String name;
        private List<Course> mList;

        public Student(int id, String name, List<Course> mList) {
            this.id = id;
            this.name = name;
            this.mList = mList;
        }

        public int getId() {
            return id;
        }

        public void setId(int id) {
            this.id = id;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public List<Course> getmList() {
            return mList;
        }

        public void setmList(List<Course> mList) {
            this.mList = mList;
        }
    }

    class Course {
        private String courseName;
        private String techName;

        public Course(String courseName, String techName) {
            this.courseName = courseName;
            this.techName = techName;
        }

        public String getCourseName() {
            return courseName;
        }

        public void setCourseName(String courseName) {
            this.courseName = courseName;
        }

        public String getTechName() {
            return techName;
        }

        public void setTechName(String techName) {
            this.techName = techName;
        }
    }

2憔辫、下面是具體的分析

首先進(jìn)入到flatMap()方法中,flatMap的代碼如下

public final <R> Observable<R> flatMap(Func1<? super T, ? extends Observable<? extends R>> func) {
        if (getClass() == ScalarSynchronousObservable.class) {
            return ((ScalarSynchronousObservable<T>)this).scalarFlatMap(func);
        }
        return merge(map(func));
    }

判斷直接跳過仿荆,主要看返回值贰您,返回值調(diào)用了merge()方法,并且以map()方法的返回值作為參數(shù)拢操,那么我們首先進(jìn)入到map()方法中看看

public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
        return lift(new OperatorMap<T, R>(func));
    }

這個方法中將會調(diào)用以func1對象為參數(shù)锦亦,創(chuàng)建OperatorMap對象,然后將OperatorMap對象作為參數(shù)調(diào)用lift()方法令境,那么進(jìn)入到lift()方法看看

public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
        return new Observable<R>(new OnSubscribeLift<T, R>(onSubscribe, operator));
    }

這個方法中將以初始被觀察者對象中的onSubscribe(本文中我們將初始被觀察者對象稱為ob_init杠园,將ob_init中的onSubscribe稱為onSub_init)和OperatorMap對象為參數(shù)創(chuàng)建第一個OnSubscribeLift對象(稱為onSublift_one),同時以onSublift_one為參數(shù)創(chuàng)建新的被觀察者對象(稱為ob_one)舔庶,那么到此map完畢抛蚁,它將將ob_one返回作為merge()方法的參數(shù),那么下面進(jìn)入到merge()方法中

public static <T> Observable<T> merge(Observable<? extends Observable<? extends T>> source) {
        if (source.getClass() == ScalarSynchronousObservable.class) {
            return ((ScalarSynchronousObservable<T>)source).scalarFlatMap((Func1)UtilityFunctions.identity());
        }
        return source.lift(OperatorMerge.<T>instance(false));
    }

在merge()方法中惕橙,前面判斷忽略瞧甩,直接看返回值,發(fā)現(xiàn)它將用ob_one去調(diào)用lift()方法弥鹦,并且會創(chuàng)建OperatorMerge對象作為lift()方法的參數(shù)肚逸,那么通過看前面lift()方法的作用,我們可以知道惶凝,它將會以O(shè)peratorMerge對象和ob_one中的onSubscribe作為參數(shù)再次創(chuàng)建新的OnSubscribeLift對象(稱為onSublift_merge)吼虎,同時會以onSublift_merge作為參數(shù),再次創(chuàng)建新的被觀察者對象(稱為ob_merge)苍鲜,那么現(xiàn)在我們就可以知道思灰,flatMap()方法的最終返回值為ob_merge對象,那么下面ob_merge將會調(diào)用訂閱方法subscribe()混滔,并且會傳入初始觀察者對象(稱為sub_init)洒疚,那么下面進(jìn)入到subscribe()中看看

public final Subscription subscribe(Subscriber<? super T> subscriber) {
        return Observable.subscribe(subscriber, this);
    }
    
static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
   
    if (subscriber == null) {
        throw new IllegalArgumentException("observer can not be null");
    }
    if (observable.onSubscribe == null) {
        throw new IllegalStateException("onSubscribe function can not be null.");
    }
        
    // new Subscriber so onStart it
    subscriber.onStart();
       
    if (!(subscriber instanceof SafeSubscriber)) {
        // assign to `observer` so we return the protected version
        subscriber = new SafeSubscriber<T>(subscriber);
    }

    try {
        // allow the hook to intercept and/or decorate
        hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
        return hook.onSubscribeReturn(subscriber);
    } catch (Throwable e) {
           異常忽略...
    }
}

subscribe()最終會調(diào)用靜態(tài)的subscribe()方法,傳入的參數(shù)為sub_init對象和ob_merge對象坯屿,忽略掉前面的判斷直接到hook.onSubscribeStart(observable,observable.onSubscribe).call(subscriber)這一句油湖,在這里onSubscribeStart方法將原路返回傳入的observable.onSubscribe,那么傳入的傳入的observable.onSubscribe其實就是ob_merge中的onSubscribe领跛,那么它調(diào)用的call()方法應(yīng)該就是onSublift_merge對象中的call()方法乏德,也就是OnSubscribeLift類中的call()方法,傳入的參數(shù)為sub_init,下面進(jìn)入到該call()方法看看

 public void call(Subscriber<? super R> o) {
        try {
            Subscriber<? super T> st = hook.onLift(operator).call(o);
            try {
                // new Subscriber created and being subscribed with so 'onStart' it
                st.onStart();
                parent.call(st);
            } catch (Throwable e) {
                // localized capture of errors rather than it skipping all operators 
                // and ending up in the try/catch of the subscribe method which then
                // prevents onErrorResumeNext and other similar approaches to error handling
                Exceptions.throwIfFatal(e);
                st.onError(e);
            }
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // if the lift function failed all we can do is pass the error to the final Subscriber
            // as we don't have the operator available to us
            o.onError(e);
        }
    }

在這個方法中喊括,onLift()方法將會將傳入的參數(shù)原路返回胧瓜,也就是返回值就是傳入的operator,這個operator就是在創(chuàng)建onSublift_merge對象時保存的operator郑什,也就是OperatorMerge對象府喳,那么也就是會調(diào)用OperatorMerge對象中的call()方法,傳入的參數(shù)是sub_init蘑拯,下面進(jìn)入到OperatorMerge對象中的call()方法

@Override
public Subscriber<Observable<? extends T>> call(final Subscriber<? super T> child) {
    MergeSubscriber<T> subscriber = new MergeSubscriber<T>(child, delayErrors, maxConcurrent);
    MergeProducer<T> producer = new MergeProducer<T>(subscriber);
    subscriber.producer = producer;
        
    child.add(subscriber);
    child.setProducer(producer);
        
    return subscriber;
}

在這個方法中的主要作用就是將sub_init對象進(jìn)行包裝钝满,重新創(chuàng)建一個觀察者對象(稱為sub_merge),并且返回該對象申窘,那么在OnSubscribeLift類中的call()方法中的Subscriber<? super T> st = hook.onLift(operator).call(o)這個操作所創(chuàng)建的觀察者對象就為sub_merge弯蚜,接著call()方法會執(zhí)行,parent.call(st)偶洋,這里傳入的參數(shù)就是sub_merge熟吏,但是這里需要特別注意,parent的值為ob_one對象中的onSubscribe玄窝,也就是在利用map()方法創(chuàng)建的被觀察者對象中的onSubscribe,那么它調(diào)用的call()方法就是OnSubscribeLift類中的call()方法悍引,所以程序?qū)⒃俅螆?zhí)行OnSubscribeLift類中的call()方法恩脂,這次傳入的參數(shù)是sub_merge,那么這次的operator就是OperatorMap對象趣斤,那么它以sub_merge為參數(shù)調(diào)用call()方法俩块,調(diào)用的就是OperatorMap類中的方法,下面進(jìn)入到該方法

@Override
public Subscriber<? super T> call(final Subscriber<? super R> o) {
    MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
    o.add(parent);
    return parent;
}

在這個方法中浓领,將會以sub_merge和func1對象(transformer保存的就是func1對象)為參數(shù)創(chuàng)建新的觀察者對象(稱為sub_one)玉凯,并且返回,那么在OnSubscribeLift類中的call()方法中返回的對象將是sub_one联贩,那么繼續(xù)往下執(zhí)行漫仆,將再次來到parent.call(st),那么這次的st就是sub_one泪幌,parent就是ob_init對象中的onSubscribe盲厌,也就是初始被觀察者對象中的onSubscribe,那么它調(diào)用的call()方法祸泪,將會回到一下代碼

public void call(Subscriber<? super Student> subscriber) {
                for (Student s : students) {
                    subscriber.onNext(s);
                }
            }

現(xiàn)在的觀察者對象已經(jīng)是sub_one吗浩,那么它調(diào)用的onNext()方法就是OperatorMap類中的靜態(tài)內(nèi)部類MapSubscriber中的onNext()方法,那么進(jìn)入到該方法

@Override
public void onNext(T t) {
    R result;
            
    try {
        result = mapper.call(t);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        unsubscribe();
        onError(OnErrorThrowable.addValueAsLastCause(ex, t));
        return;
    }
            
    actual.onNext(result);
}

在這個方法中没隘,主要就是result = mapper.call(t)這個操作懂扼,這里的mapper就是func1對象,那么func1對象調(diào)用的call()方法有回到了我們開始flatMap中的回調(diào)call()方法右蒲,它將會返回一個Observable對象阀湿,那么接著會調(diào)用actual.onNext(result)屡限,這里的actual就是sub_merge對象,也就是OperatorMerge類中創(chuàng)建的MergeSubscriber對象炕倘,那么調(diào)用它的onNext()方法钧大,我們進(jìn)入到它的onNext()方法看看,傳入的參數(shù)是func1對象返回的Observable對象

@Override
public void onNext(Observable<? extends T> t) {
    if (t == null) {
        return;
    }
    if (t == Observable.empty()) {
        emitEmpty();
    } else
    if (t instanceof ScalarSynchronousObservable) {
        tryEmit(((ScalarSynchronousObservable<? extends T>)t).get());
    } else {
        InnerSubscriber<T> inner = new InnerSubscriber<T>(this, uniqueId++);
        addInner(inner);
        t.unsafeSubscribe(inner);
        emit();
    }
}

在這個方法中罩旋,忽略掉前面的判斷啊央,直接進(jìn)入else分析,這里InnerSubscriber<T> inner = new InnerSubscriber<T>(this, uniqueId++)將會以sub_merge和uniqueId為參數(shù)再次創(chuàng)建一個觀察者對象(稱為inner_sub)涨醋,然后t.unsafeSubscribe(inner)這個操作瓜饥,因為t為func1對象所返回的Observable對象,所以將會將inner_sub為參數(shù)浴骂,調(diào)用unsafeSubscribe()方法乓土,那么進(jìn)入到unsafeSubscribe()方法

public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {
        try {
            // new Subscriber so onStart it
            subscriber.onStart();
            // allow the hook to intercept and/or decorate
            hook.onSubscribeStart(this, onSubscribe).call(subscriber);
            return hook.onSubscribeReturn(subscriber);
        } catch (Throwable e) {
            // special handling for certain Throwable/Error/Exception types
            Exceptions.throwIfFatal(e);
            // if an unhandled error occurs executing the onSubscribe we will propagate it
            try {
                subscriber.onError(hook.onSubscribeError(e));
            } catch (Throwable e2) {
                Exceptions.throwIfFatal(e2);
                // if this happens it means the onError itself failed (perhaps an invalid function implementation)
                // so we are unable to propagate the error correctly and will just throw
                RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
                // TODO could the hook be the cause of the error in the on error handling.
                hook.onSubscribeError(r);
                // TODO why aren't we throwing the hook's return value.
                throw r;
            }
            return Subscriptions.unsubscribed();
        }
    }

在這個方法中主要看hook.onSubscribeStart(this, onSubscribe).call(subscriber)這里,這里的onSubscribe就是func1對象返回的Observable對象中的onSubscribe溯警,所以調(diào)用它的call()方法趣苏,那么在這個call()方法中肯定會調(diào)用subscribe.onNext()方法,那么這個subscriber就是傳進(jìn)來的參數(shù)梯轻,也就是inner_sub食磕,那么將會調(diào)用inner_sub中的onNext()方法,也就是InnerSubscriber類中的onNext()方法喳挑,下面進(jìn)入到該方法

public void onNext(T t) {   
      parent.tryEmit(this, t);
}

在這個方法中的parent就是創(chuàng)建inner_sub時的傳入的父級觀察者對象彬伦,也就是MergeSubscriber對象,也就是sub_merge伊诵,那么調(diào)用該對象的tryEmit()方法单绑,下面進(jìn)入該方法,傳入的參數(shù)是inner_sub,t ( t為最終輸出的數(shù)據(jù))

void tryEmit(T value) {
    boolean success = false;
    long r = producer.get();
    if (r != 0L) {
        synchronized (this) {
            // if nobody is emitting and child has available requests
            r = producer.get();
            if (!emitting && r != 0L) {
                emitting = true;
                success = true;
            }
        }
    }
    if (success) {
        emitScalar(value, r);
    } else {
        queueScalar(value);
    }
}

這里的關(guān)鍵句在 emitScalar(value, r)這里曹宴,它將會將最終需要輸出的值和r作為參數(shù)調(diào)用emitScalar()方法搂橙,下面進(jìn)入到emitScalar()方法

protected void emitScalar(T value, long r) {
            boolean skipFinal = false;
            try {
                try {
                    child.onNext(value);
                } catch (Throwable t) {
                    if (!delayErrors) {
                        Exceptions.throwIfFatal(t);
                        skipFinal = true;
                        this.unsubscribe();
                        this.onError(t);
                        return;
                    }
                    getOrCreateErrorQueue().offer(t);
                }
                if (r != Long.MAX_VALUE) {
                    producer.produced(1);
                }
                
                int produced = scalarEmissionCount + 1;
                if (produced == scalarEmissionLimit) {
                    scalarEmissionCount = 0;
                    this.requestMore(produced);
                } else {
                    scalarEmissionCount = produced;
                }
                
                // check if some state changed while emitting
                synchronized (this) {
                    skipFinal = true;
                    if (!missed) {
                        emitting = false;
                        return;
                    }
                    missed = false;
                }
            } finally {
                if (!skipFinal) {
                    synchronized (this) {
                        emitting = false;
                    }
                }
            }
            emitLoop();
        }

在這個方法中的最主要的操作就是child.onNext(value)這個了,在這里浙炼,終于看到了child份氧,這個child就是初始的觀察者,也就是我們一開始創(chuàng)建的觀察者弯屈,那么它調(diào)用onNext()方法就是以下代碼

public void onNext(Course course) {
                Log.e("TAG", "課程名稱為:" + course.getCourseName() + ", 任課老師為:" + course.getTechName());
            }

這個方法就是我們自己創(chuàng)建觀察者對象時的回調(diào)方法蜗帜,就是最終的調(diào)用方法,到這里资厉,整個流程也就打通了厅缺,因為這是正常情況下的流程,所以忽略了很多的判斷和特殊的情況,最后湘捎,這個過程實在是有點復(fù)雜诀豁,所以,可能描寫的有點亂窥妇,望見諒

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末舷胜,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子活翩,更是在濱河造成了極大的恐慌烹骨,老刑警劉巖,帶你破解...
    沈念sama閱讀 221,820評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件材泄,死亡現(xiàn)場離奇詭異沮焕,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)拉宗,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,648評論 3 399
  • 文/潘曉璐 我一進(jìn)店門峦树,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人旦事,你說我怎么就攤上這事魁巩。” “怎么了族檬?”我有些...
    開封第一講書人閱讀 168,324評論 0 360
  • 文/不壞的土叔 我叫張陵歪赢,是天一觀的道長。 經(jīng)常有香客問我单料,道長,這世上最難降的妖魔是什么点楼? 我笑而不...
    開封第一講書人閱讀 59,714評論 1 297
  • 正文 為了忘掉前任扫尖,我火速辦了婚禮,結(jié)果婚禮上掠廓,老公的妹妹穿的比我還像新娘换怖。我一直安慰自己,他們只是感情好蟀瞧,可當(dāng)我...
    茶點故事閱讀 68,724評論 6 397
  • 文/花漫 我一把揭開白布沉颂。 她就那樣靜靜地躺著,像睡著了一般悦污。 火紅的嫁衣襯著肌膚如雪铸屉。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 52,328評論 1 310
  • 那天切端,我揣著相機(jī)與錄音彻坛,去河邊找鬼。 笑死,一個胖子當(dāng)著我的面吹牛昌屉,可吹牛的內(nèi)容都是我干的钙蒙。 我是一名探鬼主播,決...
    沈念sama閱讀 40,897評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼间驮,長吁一口氣:“原來是場噩夢啊……” “哼躬厌!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起竞帽,我...
    開封第一講書人閱讀 39,804評論 0 276
  • 序言:老撾萬榮一對情侶失蹤扛施,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后抢呆,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體煮嫌,經(jīng)...
    沈念sama閱讀 46,345評論 1 318
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,431評論 3 340
  • 正文 我和宋清朗相戀三年抱虐,在試婚紗的時候發(fā)現(xiàn)自己被綠了昌阿。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,561評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡恳邀,死狀恐怖懦冰,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情谣沸,我是刑警寧澤刷钢,帶...
    沈念sama閱讀 36,238評論 5 350
  • 正文 年R本政府宣布,位于F島的核電站乳附,受9級特大地震影響内地,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜赋除,卻給世界環(huán)境...
    茶點故事閱讀 41,928評論 3 334
  • 文/蒙蒙 一阱缓、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧举农,春花似錦荆针、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,417評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至棱貌,卻和暖如春玖媚,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背键畴。 一陣腳步聲響...
    開封第一講書人閱讀 33,528評論 1 272
  • 我被黑心中介騙來泰國打工最盅, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留突雪,地道東北人。 一個月前我還...
    沈念sama閱讀 48,983評論 3 376
  • 正文 我出身青樓涡贱,卻偏偏與公主長得像咏删,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子问词,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,573評論 2 359

推薦閱讀更多精彩內(nèi)容