RxJava 2.0 簡單使用和分析

RxJava 2.0配置

在項目Build.gradle文件里面添加如下代碼汛骂,即可:

compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
compile 'io.reactivex.rxjava2:rxjava:2.0.1'

基本操作

1.Observable.create()
Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> e) throws Exception {
        for(int i=0; i<5; i++){
            e.onNext(String.valueOf(i));
        }
        e.onComplete();
    }
})
.subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        System.out.println(s);
    }
});

a). 傳入ObservableOnSubscribe 里面就一個方法subscribe井佑,傳入一個可觀察的數(shù)據(jù)發(fā)射器彩郊,繼承關(guān)系:ObservableEmitter -> Emitter尸诽。Emitter代表發(fā)射器,代碼如下:

public interface Emitter<T> {
    void onNext(T value);
    void onError(Throwable error);
    void onComplete();
}

這和RxJava1.0使用的Subscriber的主要功能是一模一樣的吕晌,提供數(shù)據(jù)源的發(fā)送熄诡。

public interface Subscriber<T> {
    public void onSubscribe(Subscription s);
    public void onNext(T t);
    public void onError(Throwable t);
    public void onComplete();
}

b).我們常用的subscribe方法列表彼宠,如下:

Disposable subscribe(Consumer<? super T> onNext)
Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError)
Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe)
void subscribe(Observer<? super T> observer)

Disposable定義如下趣席,與RxJava1.0 Subscription對應(yīng)兵志,提供流的解除訂閱:

public interface Disposable {
    void dispose();
    boolean isDisposed();
}

Observer定義如下:

public interface Observer<T> {
    void onSubscribe(Disposable d);
    void onNext(T t);
    void onError(Throwable e);
    void onComplete();
}

其中醇蝴,我們將看不到RxJava1.0的action1之類的接口宣肚,取而代之的是與Java8命名類似的函數(shù)式接口。

查看源碼悠栓,subscribe前幾個方法最后都是調(diào)用的都是subscribe(Observer observer)霉涨,如下:

public Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
        Action onComplete, Consumer<? super Disposable> onSubscribe) {
    ...
    LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);
    subscribe(ls);
    return ls;
}

LambdaObserver簡單封裝了我們傳入的函數(shù)接口,作為一個Observer惭适。
我們再看最基本的方法subscribe(Observer)的實現(xiàn)笙瑟,如下:

public final void subscribe(Observer<? super T> observer) {
  observer = RxJavaPlugins.onSubscribe(this, observer);
  subscribeActual(observer);
}   

RxJavaPlugins就是以前RxJava的RxJavaHooks,可以看成代理包裝癞志,不設(shè)置就是直接返回原對象往枷,可以忽略,最后調(diào)用subscribeActual()這個是抽象方法凄杯,真正調(diào)用流都是在這個方法里面進一步實現(xiàn)的错洁。

2.分析Observable.just()調(diào)用

示例代碼,打印“5”戒突,如下:

Disposable d = Observable.just("5").subscribe(s -> System.out.println(s));

Observable.just 是最簡單的rxjava操作了屯碴,就是生成數(shù)據(jù),如下:

public static <T> Observable<T> just(T item) {
  return RxJavaPlugins.onAssembly(new ObservableJust<T>(item));
}

很簡單膊存,生成一個ObservableJust對象导而,內(nèi)容如下:

public final class ObservableJust<T> extends Observable<T> implements ScalarCallable<T> {

    private final T value;
    public ObservableJust(final T value) {
        this.value = value;
    }

    @Override
    protected void subscribeActual(Observer<? super T> s) {
        ScalarDisposable<T> sd = new ScalarDisposable<T>(s, value);
        s.onSubscribe(sd);
        sd.run();
    }

    @Override
    public T call() {
        return value;
    }
}

所以,我們調(diào)用Observable.just()隔崎,會拿到我們調(diào)用鏈的第一個Observable對象---ObservableJust今艺,它繼承了Observable<T>并實現(xiàn)subscribeActual()方法。如上面示例代碼爵卒,根據(jù)前面分析虚缎,我們將打印的Consumer-->轉(zhuǎn)換為了LambdaObserver,傳入了ObservableJust對象的subscribeActual(LambdaObserver)技潘。

下一步遥巴,傳入obersver和value,生成ScalarDisposable對象享幽,直接調(diào)用該對象的run方法铲掐,完成了整個調(diào)用過程。ScalarDisposable.run方法值桩,如下:

@Override
public void run() {
    if (get() == START && compareAndSet(START, ON_NEXT)) {
        observer.onNext(value);
        if (get() == ON_NEXT) {
            lazySet(ON_COMPLETE);
            observer.onComplete();
        }
    }
}

忽略其他代碼摆霉,基本看出,調(diào)用了我們傳入的observer.onNext() -> observer.onComplete(),完成打印携栋。

3.組合Observable.just() 和 Observable.map()

示例代碼搭盾,將數(shù)字轉(zhuǎn)換為字符串,并打印婉支,如下:

Disposable d = Observable.just(1).map(i -> String.valueOf(i)).subscribe(s -> System.out.println(s));

我們知道鸯隅,Observable.just()返回了一個ObservableJust對象,也就是Observable<T>的實例向挖,因此蝌以,上述等式相當于,如下:

new ObservableJust().map(xxx);

因此我們先記住何之,當前對象是ObservableJust跟畅,打開map方法定義,如下:

public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
  return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}

可以看到溶推,返回了一個ObservableMap對象徊件,該對象傳入了一個this,也就是當前的Observable對象蒜危,所以我們可以知道虱痕,如果這個操作符是轉(zhuǎn)換用的,肯定會傳入this當前observable對象舰褪,而Just操作是數(shù)據(jù)源的開頭皆疹,所以不需要。ObservableMap代碼如下:

public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    final Function<? super T, ? extends U> function;

    public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
        super(source);
        this.function = function;
    }

    @Override
    public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MapObserver<T, U>(t, function));
    }
}

一樣的我們點開AbstractObservableWithUpstream<T, U>知道它繼承Observable<U>對象占拍,轉(zhuǎn)換流的能力抽象略就,從原始的T類型流,轉(zhuǎn)換為我們需要的U類型流晃酒。ObservableMap的構(gòu)造函數(shù)傳入了原始的T類型流表牢,也就是我們的ObservableJust<Integer>類型流,以及我們的轉(zhuǎn)換函數(shù)接口贝次,將subscribeActual具體實現(xiàn)的時候崔兴,ObservableJust收到的Observer對象是個類似Observer<Integer>代理對象,里面封裝了map的轉(zhuǎn)換邏輯和原始的observer<String>對象蛔翅,到這里ObservableJust的數(shù)據(jù)就全部接入MapObserver里面敲茄。MapObserver代碼如下:

static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
    final Function<? super T, ? extends U> mapper;

    MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
        super(actual);
        this.mapper = mapper;
    }

    @Override
    public void onNext(T t) {
        ...
        U v;
        v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper 
        actual.onNext(v);
    }
}

代碼可以看出,收到ObservableJust對象的Integer值后山析,通過轉(zhuǎn)換函數(shù)接口Function堰燎,將T類型轉(zhuǎn)換了U類型,也就是我們要的String類型笋轨,然后發(fā)送給實際的接收者秆剪,完成整個流過程赊淑。偽代碼如下:

方法組合過程:
ObservableJust just = new ObservableJust(數(shù)據(jù));
ObservableMap map = new ObservableMap (just,  function);
訂閱過程:
Observer 實際接收者;
--->
map.subscribe(實際接收者)仅讽;
--->
Observer mapObserver = new Observer(實際接收者);
just.subsribe(mapObserver);
--->
導(dǎo)致數(shù)據(jù)流動開始陶缺,int  --->  mapObserver --> string --> 實際接收者

簡單分析了一下組合調(diào)用流程。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末洁灵,一起剝皮案震驚了整個濱河市饱岸,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌处渣,老刑警劉巖伶贰,帶你破解...
    沈念sama閱讀 222,183評論 6 516
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件蛛砰,死亡現(xiàn)場離奇詭異罐栈,居然都是意外死亡,警方通過查閱死者的電腦和手機泥畅,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,850評論 3 399
  • 文/潘曉璐 我一進店門荠诬,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人位仁,你說我怎么就攤上這事柑贞。” “怎么了聂抢?”我有些...
    開封第一講書人閱讀 168,766評論 0 361
  • 文/不壞的土叔 我叫張陵钧嘶,是天一觀的道長。 經(jīng)常有香客問我琳疏,道長有决,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 59,854評論 1 299
  • 正文 為了忘掉前任空盼,我火速辦了婚禮书幕,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘揽趾。我一直安慰自己台汇,他們只是感情好,可當我...
    茶點故事閱讀 68,871評論 6 398
  • 文/花漫 我一把揭開白布篱瞎。 她就那樣靜靜地躺著苟呐,像睡著了一般。 火紅的嫁衣襯著肌膚如雪俐筋。 梳的紋絲不亂的頭發(fā)上牵素,一...
    開封第一講書人閱讀 52,457評論 1 311
  • 那天,我揣著相機與錄音校哎,去河邊找鬼两波。 笑死瞳步,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播院领,決...
    沈念sama閱讀 40,999評論 3 422
  • 文/蒼蘭香墨 我猛地睜開眼采记,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了嘀倒?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,914評論 0 277
  • 序言:老撾萬榮一對情侶失蹤局冰,失蹤者是張志新(化名)和其女友劉穎测蘑,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體康二,經(jīng)...
    沈念sama閱讀 46,465評論 1 319
  • 正文 獨居荒郊野嶺守林人離奇死亡碳胳,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,543評論 3 342
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了沫勿。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片挨约。...
    茶點故事閱讀 40,675評論 1 353
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖产雹,靈堂內(nèi)的尸體忽然破棺而出诫惭,到底是詐尸還是另有隱情,我是刑警寧澤蔓挖,帶...
    沈念sama閱讀 36,354評論 5 351
  • 正文 年R本政府宣布夕土,位于F島的核電站,受9級特大地震影響瘟判,放射性物質(zhì)發(fā)生泄漏怨绣。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 42,029評論 3 335
  • 文/蒙蒙 一荒适、第九天 我趴在偏房一處隱蔽的房頂上張望梨熙。 院中可真熱鬧,春花似錦刀诬、人聲如沸咽扇。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,514評論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽质欲。三九已至,卻和暖如春糠馆,著一層夾襖步出監(jiān)牢的瞬間嘶伟,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,616評論 1 274
  • 我被黑心中介騙來泰國打工又碌, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留九昧,地道東北人绊袋。 一個月前我還...
    沈念sama閱讀 49,091評論 3 378
  • 正文 我出身青樓,卻偏偏與公主長得像铸鹰,于是被迫代替她去往敵國和親癌别。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 45,685評論 2 360

推薦閱讀更多精彩內(nèi)容