rxjava1基本元素源碼分析

代碼示例

Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        LogUtils.loge("subscriber call ...");
        if (!subscriber.isUnsubscribed()) {
            subscriber.onNext("test1");
            subscriber.onCompleted();
        }
    }
});

Observer<String> observer = new Observer<String>() {

    @Override
    public void onCompleted() {
        LogUtils.loge("Observer onCompleted");
    }

    @Override
    public void onError(Throwable e) {
    }

    @Override
    public void onNext(String s) {
        LogUtils.loge("Observer onNext s = " + s);
    }
};
Subscription subscription = observable.subscribe(observer);

常用類說明

被觀察者

rx.Observable

訂閱

rx.Subscription

public interface Subscription {
    void unsubscribe();
    boolean isUnsubscribed();
}

觀察者

rx.Observer

public interface Observer<T> {
    void onCompleted();
    void onError(Throwable e);
    void onNext(T t);
}

執(zhí)行流程

rx.Observable#create(rx.Observable.OnSubscribe<T>)

public static <T> Observable<T> create(OnSubscribe<T> f) {          
    // 加載RxJavaHooks的static,初始化資源
    // 返回一個Observable對象
    return new Observable<T>(RxJavaHooks.onCreate(f));
}

rx.plugins.RxJavaHooks#onCreate(rx.Observable.OnSubscribe<T>)

public static <T> Observable.OnSubscribe<T> onCreate(Observable.OnSubscribe<T> onSubscribe) {
    Func1<Observable.OnSubscribe, Observable.OnSubscribe> f = onObservableCreate;
    if (f != null) {
        // 這里其實是調(diào)用到了onObservableCreate的call方法
        return f.call(onSubscribe);
    }
    return onSubscribe;
}

rx.plugins.RxJavaHooks

static {
    init();
}

static void init() {
    onObservableStart = new Func2<Observable, Observable.OnSubscribe, Observable.OnSubscribe>() {
        @Override
        public Observable.OnSubscribe call(Observable t1, Observable.OnSubscribe t2) {
            // 調(diào)用開始訂閱的方法
            return RxJavaPlugins.getInstance().getObservableExecutionHook().onSubscribeStart(t1, t2);
        }
    };
    
    onObservableReturn = new Func1<Subscription, Subscription>() {
        @Override
        public Subscription call(Subscription f) {
            return RxJavaPlugins.getInstance().getObservableExecutionHook().onSubscribeReturn(f);
        }
    };
    
    initCreate();
}

static void initCreate() {
    onObservableCreate = new Func1<Observable.OnSubscribe, Observable.OnSubscribe>() {
        @Override
        public Observable.OnSubscribe call(Observable.OnSubscribe f) {
            /*
            這里1. 初始化RxJavaObservableExecutionHook 
                2. 返回我們傳入的Observable.OnSubscribe
            */
            return RxJavaPlugins.getInstance().getObservableExecutionHook().onCreate(f);
        }
    };
}

rx.plugins.RxJavaPlugins#getObservableExecutionHook

public RxJavaObservableExecutionHook getObservableExecutionHook() {
    if (observableExecutionHook.get() == null) {
        // 從系統(tǒng)配置文件中查找一個RxJavaObservableExecutionHook的實現(xiàn)類
        Object impl = getPluginImplementationViaProperty(RxJavaObservableExecutionHook.class, System.getProperties());
        // impl = null
        if (impl == null) {
            // 沒有找到就使用這個默認的RxJavaObservableExecutionHookDefault實現(xiàn)類
            observableExecutionHook.compareAndSet(null, RxJavaObservableExecutionHookDefault.getInstance());
            // we don't return from here but call get() again in case of thread-race so the winner will always get returned
        } else {
            // we received an implementation from the system property so use it
            observableExecutionHook.compareAndSet(null, (RxJavaObservableExecutionHook) impl);
        }
    }
    return observableExecutionHook.get();
}

rx.plugins.RxJavaObservableExecutionHookDefault

rx.Observable#subscribe(rx.Observer<? super T>)

public final Subscription subscribe(final Observer<? super T> observer) {
    // 使用ObserverSubscriber對observer進行包裝
    return subscribe(new ObserverSubscriber<T>(observer));
}

rx.Observable#subscribe(rx.Subscriber<? super T>)

public final Subscription subscribe(Subscriber<? super T> subscriber) {
    return Observable.subscribe(subscriber, this);
}

rx.Observable#subscribe(rx.Subscriber<? super T>, rx.Observable<T>)

static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
    
    // new Subscriber so onStart it
    subscriber.onStart();
    
    // 保證線程安全
    if (!(subscriber instanceof SafeSubscriber)) {
        // assign to `observer` so we return the protected version
        subscriber = new SafeSubscriber<T>(subscriber);
    }

    try {
        // 調(diào)用先RxJavaHooks的onObservableStart的call方法,然后再調(diào)用我們在activity中定義的onSubscribe的call方法
        // 這里其實就是調(diào)用了開始訂閱
        RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
        return RxJavaHooks.onObservableReturn(subscriber);
    } 
}

rx.plugins.RxJavaHooks#onObservableStart

public static <T> Observable.OnSubscribe<T> onObservableStart(Observable<T> instance, Observable.OnSubscribe<T> onSubscribe) {
    Func2<Observable, Observable.OnSubscribe, Observable.OnSubscribe> f = onObservableStart;
    if (f != null) {
        return f.call(instance, onSubscribe);
    }
    return onSubscribe;
}   

rx.plugins.RxJavaHooks#onObservableReturn

public static Subscription onObservableReturn(Subscription subscription) {
    Func1<Subscription, Subscription> f = onObservableReturn;
    if (f != null) {
        return f.call(subscription);
    }
    return subscription;
}

ObserverSubscriber類

public final class ObserverSubscriber<T> extends Subscriber<T> {
    final Observer<? super T> observer;

    public ObserverSubscriber(Observer<? super T> observer) {
        this.observer = observer;
    }

    @Override
    public void onNext(T t) {
        // 調(diào)用observer方法
        observer.onNext(t);
    }

    @Override
    public void onError(Throwable e) {
        observer.onError(e);
    }

    @Override
    public void onCompleted() {
        // 調(diào)用observer方法
        observer.onCompleted();
    }
}   

源碼閱讀總結(jié)

Subscription關(guān)聯(lián)觀察者和訂閱者

static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
    try {
        // 調(diào)用先RxJavaHooks的onObservableStart的call方法夸楣,然后再調(diào)用我們在activity中定義的onSubscribe的call方法
        // 這里其實就是調(diào)用了開始訂閱
        RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
        return RxJavaHooks.onObservableReturn(subscriber);
    } 
}

observable.onSubscribe執(zhí)行

RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);

課堂總結(jié)

Observable

  1. 觀察得到的-被觀察者
  2. 通過Observable創(chuàng)建一個可觀察的序列(create方法)
  3. 通過subscribe去注冊一個觀察者

Observer

  1. 用于接收數(shù)據(jù)-觀察者
  2. 作為Observable的subsceibe方法的參數(shù)

Subscription

  1. 訂閱,用于描述被觀察者和觀察者之間的關(guān)系
  2. 用于取消訂閱和獲取當前訂閱狀態(tài)

OnSubscribe

  1. 當訂閱時會觸發(fā)此接口的調(diào)用
  2. 在Observable內(nèi)部凸丸,實際作用是向訂閱者發(fā)射數(shù)據(jù)

Subscribe

  1. 實現(xiàn)了Observer和Subscription
  2. 只有自己才能阻止自己
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌伞租,老刑警劉巖淹接,帶你破解...
    沈念sama閱讀 217,406評論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件十性,死亡現(xiàn)場離奇詭異,居然都是意外死亡塑悼,警方通過查閱死者的電腦和手機劲适,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,732評論 3 393
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來厢蒜,“玉大人霞势,你說我怎么就攤上這事“哐唬” “怎么了愕贡?”我有些...
    開封第一講書人閱讀 163,711評論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長鄙才。 經(jīng)常有香客問我颂鸿,道長,這世上最難降的妖魔是什么攒庵? 我笑而不...
    開封第一講書人閱讀 58,380評論 1 293
  • 正文 為了忘掉前任嘴纺,我火速辦了婚禮,結(jié)果婚禮上浓冒,老公的妹妹穿的比我還像新娘栽渴。我一直安慰自己,他們只是感情好稳懒,可當我...
    茶點故事閱讀 67,432評論 6 392
  • 文/花漫 我一把揭開白布闲擦。 她就那樣靜靜地躺著,像睡著了一般场梆。 火紅的嫁衣襯著肌膚如雪墅冷。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,301評論 1 301
  • 那天或油,我揣著相機與錄音寞忿,去河邊找鬼。 笑死顶岸,一個胖子當著我的面吹牛腔彰,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播辖佣,決...
    沈念sama閱讀 40,145評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼霹抛,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了卷谈?” 一聲冷哼從身側(cè)響起杯拐,我...
    開封第一講書人閱讀 39,008評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后藕施,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體寇损,經(jīng)...
    沈念sama閱讀 45,443評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,649評論 3 334
  • 正文 我和宋清朗相戀三年裳食,在試婚紗的時候發(fā)現(xiàn)自己被綠了矛市。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 39,795評論 1 347
  • 序言:一個原本活蹦亂跳的男人離奇死亡诲祸,死狀恐怖浊吏,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情救氯,我是刑警寧澤找田,帶...
    沈念sama閱讀 35,501評論 5 345
  • 正文 年R本政府宣布,位于F島的核電站着憨,受9級特大地震影響墩衙,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜甲抖,卻給世界環(huán)境...
    茶點故事閱讀 41,119評論 3 328
  • 文/蒙蒙 一漆改、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧准谚,春花似錦挫剑、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,731評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至唆铐,卻和暖如春哲戚,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背艾岂。 一陣腳步聲響...
    開封第一講書人閱讀 32,865評論 1 269
  • 我被黑心中介騙來泰國打工惫恼, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人澳盐。 一個月前我還...
    沈念sama閱讀 47,899評論 2 370
  • 正文 我出身青樓,卻偏偏與公主長得像令宿,于是被迫代替她去往敵國和親叼耙。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 44,724評論 2 354

推薦閱讀更多精彩內(nèi)容