Learn RxJava 2 Observables 與 Subscribers

Chapter 2: Observables and Subscribers-觀察者與可觀察對象


本章重點

  • Observable的使用
  • Observer的使用
  • 其他創(chuàng)建Observable的工廠方法
  • Single、Completable和Maybe
  • Disposable

Observable是如何工作的

在我們做其他事情之前忌卤,我們需要學(xué)習(xí)的是一個Observable序列是如何將item通過鏈推送給Observer的喻括。在最高層,Observable傳遞這三類事件:

  • onNext():每次從源推送一個item到Observer
  • onComplete():將一個完成事件推送給Observer领追,表明后續(xù)不再有onNext()調(diào)用
  • onError():將一個錯誤事件推送給Observer,通常由觀察者來定義如何處理它他膳。除非使用retry操作符來攔截這一錯誤,否則可觀察鏈將終止蔓腐,并不在發(fā)出任何事件

這三個事件是Observer中的抽象方法矩乐,稍后我們將介紹其中的一些實現(xiàn)。我們先觀察它們在日常場景中是如何使用的回论。

提示??:在RxJava1.0中onComplete()其實叫做onCompleted()

使用Observable.create()

現(xiàn)在散罕,讓我們使用Observable.create()來創(chuàng)建一個Observable。相對而言source是我們觀測鏈的起點傀蓉。

Observable.create()允許我們通過lambda表達(dá)式創(chuàng)建一個Observable emitter欧漱。我們能夠調(diào)用Observable emitter的onNext()方法來發(fā)射(一次)數(shù)據(jù)(emissions),以及調(diào)用一次onComplete()來通知發(fā)射完成葬燎,之后便不再有事件發(fā)出误甚。這些onNext()調(diào)用將把item連接到觀察者缚甩,它將打印每一個項目,如下代碼所示:

package chapter2;


import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;

public class Launcher {

    public static void main(String[] args) {

        Observable<String> source = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("Alpha");
                emitter.onNext("Beta");
                emitter.onNext("Gamma");
                emitter.onNext("Delta");
                emitter.onNext("Epsilon");
                emitter.onComplete();
            }
        });

        source.subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(String s) {
                System.out.println("RECEIVED:" + s);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {
                System.out.println("onComplete");
            }
        });


        // lambada
        Observable<String> sourceByLambda = Observable.create(emitter -> {
                emitter.onNext("Alpha");
                emitter.onNext("Beta");
                emitter.onNext("Gamma");
                emitter.onNext("Delta");
                emitter.onNext("Epsilon");
                emitter.onComplete();

        });

        sourceByLambda.subscribe(s -> System.out.println("REVEIVED:" + s));

    }
}

它們將輸出(非lambda版本):

RECEIVED:Alpha
RECEIVED:Beta
RECEIVED:Gamma
RECEIVED:Delta
RECEIVED:Epsilon
onComplete

lamda版本:

RECEIVED:Alpha
RECEIVED:Beta
RECEIVED:Gamma
RECEIVED:Delta
RECEIVED:Epsilon

提示??:在RxJava1.0中窑邦,需要使用Observable.fromEmitter()來替換Observable.create()擅威,因為后者在RxJava1.0中的作用與RxJava2.0,并且只針對RxJava高級用戶冈钦。

onNext()是處理每一項數(shù)據(jù)的一種方式:從“Alpha”開始郊丛,到調(diào)用鏈中的下一步。在這個例子中瞧筛,下一步是Observer,它使用s -> System.out.println("RECEIVED: " + s) 這一lambda表達(dá)式來打印每一項數(shù)據(jù)厉熟。這一Lambda表達(dá)式是在Observer的onNext()中調(diào)用的,我們稍后會更仔細(xì)的觀察Observer较幌。

警告??:需要注意的是揍瑟,Observable規(guī)定(http://reactivex.io/documentation/contract.html)數(shù)據(jù)必須按照順序一次性發(fā)送完畢,而不能通過Observable并行的發(fā)送乍炉。這似乎是一個限制绢片,但實際上這簡化了程序,使得Rx更易使用恩急。我們將在第六章:并發(fā)性與并行性中學(xué)習(xí)一些強(qiáng)大的技巧杉畜,從而在不違背Observable規(guī)定的情況下有效的利用并發(fā)性和并行性。

onComplete()用于通知Observer衷恭,不會再有數(shù)據(jù)推過來了此叠。Observable可以是無限的,如果是這種情況随珠,則永遠(yuǎn)不會調(diào)用onComplete()事件灭袁。從技術(shù)來講,Observable可以通過不再調(diào)用的onNext()方式來停止發(fā)出數(shù)據(jù)窗看,從而不使用onComplete()方法茸歧。但是如果Observable不再計劃發(fā)送數(shù)據(jù),這可能是一個糟糕的設(shè)計显沈。

盡管這個在這個例子中不會拋出異常软瞎,但是我們?nèi)耘f可以在Observable.create()代碼塊中捕獲可能會發(fā)生的異常,并通過onError()發(fā)射異常拉讯。這樣涤浇,異常就會被推送到調(diào)用鏈上交由Observer處理。前面的例子中我們的Observer不處理異常魔慷,但是你可以像下面這么做:

package chapter2;


import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;

public class Launcher {

    public static void main(String[] args) {

        Observable<String> source = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                try{
                    emitter.onNext("Alpha");
                    emitter.onNext("Beta");
                    emitter.onNext("Gamma");
                    emitter.onNext("Delta");
                    emitter.onNext("Epsilon");
                    emitter.onComplete(); 
                }catch (Exception e){
                    emitter.onError(e);
                }
            }
        });
        // Observer bind Observable
        source.subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(String s) {
                System.out.println("RECEIVED:" + s);
            }

            @Override
            public void onError(Throwable e) {
                System.out.println(e);
            }

            @Override
            public void onComplete() {
                System.out.println("onComplete");
            }
        });
        // lambda版本
        Observable<String> sourceByLambda = Observable.create(emitter -> {
            try{
                emitter.onNext("Alpha");
                emitter.onNext("Beta");
                emitter.onNext("Gamma");
                emitter.onNext("Delta");
                emitter.onNext("Epsilon");
            }catch (Exception e){
                emitter.onError(e);
            }
        });

        sourceByLambda.subscribe(s->System.out.println("RECEIVED:" + s),
                Throwable::printStackTrace);
    }
}

注意只锭,onNext()onComplete()onError并不一定會直接發(fā)送給最終的Observer院尔。它們還可以發(fā)送給調(diào)用鏈中的下一個操作符蜻展。在下面的代碼中喉誊,我們將使用map()filter()操作符派生新的Observable,這些操作符將在源Observable后到最終Observer打印數(shù)據(jù)間進(jìn)行操作纵顾。

package chapter2;


import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.Function;
import io.reactivex.functions.Predicate;

public class Launcher {

    public static void main(String[] args) {

        Observable<String> source = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                try {
                    emitter.onNext("Alpha");
                    emitter.onNext("Beta");
                    emitter.onNext("Gamma");
                    emitter.onNext("Delta");
                    emitter.onNext("Epsilon");
                    emitter.onComplete();
                } catch (Exception e) {
                    emitter.onError(e);
                }
            }
        });
        Observable<Integer> lengths = source.map(new Function<String, Integer>() {
            @Override
            public Integer apply(String s) throws Exception {
                return s.length();
            }
        });
        Observable<Integer> filtered = lengths.filter(new Predicate<Integer>() {
            @Override
            public boolean test(Integer integer) throws Exception {
                return integer >= 5;
            }
        });
        filtered.subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                System.out.println("RECEIVED:" + integer);
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
                throwable.printStackTrace();
            }
        });
        // lambda版本
        Observable<String> sourceByLambda = Observable.create(emitter -> {
            try {
                emitter.onNext("Alpha");
                emitter.onNext("Beta");
                emitter.onNext("Gamma");
                emitter.onNext("Delta");
                emitter.onNext("Epsilon");
                emitter.onComplete();
            } catch (Exception e) {
                emitter.onError(e);
            }
        });
        Observable<Integer> lengthsByLambda = sourceByLambda.map(String::length);
        Observable<Integer> filteredByLambda = lengthsByLambda.filter(integer -> integer >= 5);
        filteredByLambda.subscribe(s -> System.out.println("RECEIVED:" + s),
                Throwable::printStackTrace);
    }
}

運行后將輸出:

RECEIVED:5

RECEIVED:5

RECEIVED:5

RECEIVED:7

使用map()filter()操作符可在source Observer與Observer之間伍茄,onNext()將會把每一項數(shù)據(jù)交由map()操作符處理。在內(nèi)部片挂,它將充當(dāng)中介觀察者幻林,將每一個字符串轉(zhuǎn)換為其長度。反過來音念,這將調(diào)用onNext()將該整數(shù)傳遞給filter(),而lambda表達(dá)式i -> i >= 5過濾掉長度小于5的數(shù)據(jù)躏敢。最后filter()操作符調(diào)用onNext()將每一項數(shù)據(jù)推送給最終的Observer闷愤,將結(jié)果打印出來。

值得注意的是件余,map()操作符將從原來的Observable<String>上產(chǎn)生一個新的可觀察對象Observable<Integer>讥脐。filter()操作符同樣會返回一個Observable<Integer>,但是忽略了那些沒有達(dá)到輸出條件的數(shù)據(jù)啼器。

由于像map()filter()這樣的操作符產(chǎn)生了新的可觀察對象(在內(nèi)部通過使用Observer來接收數(shù)據(jù)實現(xiàn))旬渠,我們可以將所有的可觀察對象與下一個操作符連接在一起,不必再將每一步都保存在一個中間變量中端壳。

package chapter2;


import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.Function;
import io.reactivex.functions.Predicate;

public class Launcher {

    public static void main(String[] args) {

        Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                try {
                    emitter.onNext("Alpha");
                    emitter.onNext("Beta");
                    emitter.onNext("Gamma");
                    emitter.onNext("Delta");
                    emitter.onNext("Epsilon");
                    emitter.onComplete();
                } catch (Exception e) {
                    emitter.onError(e);
                }
            }
        }).map(new Function<String, Integer>() {
            @Override
            public Integer apply(String s) throws Exception {
                return s.length();
            }
        }).filter(new Predicate<Integer>() {
            @Override
            public boolean test(Integer integer) throws Exception {
                return integer >= 5;
            }
        }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                System.out.println("RECEIVED:" + integer);
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
                throwable.printStackTrace();
            }
        });
        // lambda版本
        Observable<String> sourceByLambda = Observable.create(emitter -> {
            try {
                emitter.onNext("Alpha");
                emitter.onNext("Beta");
                emitter.onNext("Gamma");
                emitter.onNext("Delta");
                emitter.onNext("Epsilon");
                emitter.onComplete();
            } catch (Exception e) {
                emitter.onError(e);
            }
        });
        sourceByLambda.map(String::length)
                .filter(integer -> integer >= 5)
                .subscribe(s -> System.out.println("RECEIVED:" + s),
                Throwable::printStackTrace);
    }
}

同樣會輸出如下結(jié)果:

RECEIVED:5

RECEIVED:5

RECEIVED:5

RECEIVED:7

這種鏈?zhǔn)秸{(diào)用的方式在響應(yīng)式編程中是常見的(并且是推薦的方式)告丢。它的可讀性很好,從左至右损谦,從上到下像書的結(jié)構(gòu)一樣岖免,這對可維護(hù)性和易讀性有很大的幫助。

警告??:在RxJava2.0中照捡,可觀察對象不再支持發(fā)射一個空值颅湘。如果你創(chuàng)建了一個試圖發(fā)射空值得可觀察對象,將得到一個非空y異常栗精。如果你需要發(fā)射一個空值闯参,請考慮使用Java 8或是Google的Guava庫的Optional將其封裝起來。

例如:

package chapter2;


import io.reactivex.Observable;

import java.util.Optional;

public class Launcher {

    public static void main(String[] args) {

        Optional<Void> voidOptional = Optional.empty();// 等同于Optional.ofNullable(null)
        Observable.create(emitter -> {
            emitter.onNext(voidOptional);
            emitter.onComplete();
        }).subscribe(s -> System.out.println(s));
    }
}

這將會輸出:

Optional.empty

有關(guān)Optional的更多信息看這里http://www.importnew.com/6675.html

使用Observable.just()

在我們繼續(xù)查看subscribe()方法之前悲立,先要提醒的是鹿寨,你可能并不需要經(jīng)常使用Observable.create()來創(chuàng)建可觀察對象。讀完本章后面的內(nèi)容你就會發(fā)現(xiàn)级历,它更多的是幫助我們使用本來不是響應(yīng)式的元數(shù)據(jù)释移。所以一般我們僅僅需要使用其他精簡版的工廠方法來創(chuàng)建Observable
在之前使用Observable.create()的例子中寥殖,我們可以使用Observable.just()方法來實現(xiàn)同樣的功能玩讳。它最多可以發(fā)射10個同類型的數(shù)據(jù)涩蜘,然后為每一項數(shù)據(jù)調(diào)用onNext()方法發(fā)射它們,并在發(fā)射完成后調(diào)用onComplete()方法:

package chapter2;

import io.reactivex.Observable;

public class Launcher {

    public static void main(String[] args) {
        Observable.just("Alpha", "Beta", "Gamma", "Delta", "Espilon")
                .map(String::length)
                .filter(i -> i >= 5)
                .subscribe(s -> System.out.println("RECEIVED:" + s),
                        Throwable::printStackTrace,() ->System.out.println("onComplete"));
    }
}

我們也可以使用Observable.formIterable()來發(fā)射任何Iterbale類型的數(shù)據(jù)熏纯,比如說List同诫。它也會通過調(diào)用onNext()方法來發(fā)射每一項數(shù)據(jù),然后在迭代完成后調(diào)用onComplete()方法樟澜。你可能會頻繁的使用這一工廠方法误窖,因為在java中迭代器是很常見的,并且很容易寫出如下代碼:

package chapter2;

import io.reactivex.Observable;

import java.util.Arrays;
import java.util.List;

public class Launcher {

    public static void main(String[] args) {
        List<String> items=Arrays.asList("Alpha", "Beta", "Gamma", "Delta", "Espilon");
        Observable.fromIterable(items)
                .map(String::length)
                .filter(i -> i>=5)
                .subscribe(s -> System.out.println("RECEIVED:" + s),
                        Throwable::printStackTrace,() ->System.out.println("onComplete"));
    }
}

我們將在本章后續(xù)引入更多創(chuàng)建Observable的工廠方法秩贰,但是現(xiàn)在讓我們把重點放在了解Observer上霹俺。

Observer接口

實際上onNext()onComplete()以及onError()方法都定義在Observer類中毒费,RxJava通過這一抽象接口來傳遞這些事件丙唧。下面這段代碼就是Observer接口的定義。暫時不要關(guān)注onSubscribe()方法觅玻,因為我們將在本章后面的內(nèi)容中介紹它想际。請注意其他三個方法:

package io.reactivex;

import io.reactivex.disposables.Disposable;

public interface Observer<T> { 
        void onSubscribe(Disposable d);
        void onNext(T value);
        void onError(Throwable e);
        void onComplete(); 
}

ObserverObservable有時可能是相對的。在同一上下文中溪厘,調(diào)用鏈的起點和數(shù)據(jù)發(fā)出的地方的Observable都可以稱為source Observable胡本。在前面的示例中,你可以認(rèn)為Observable.create()Observable.just()返回的Observable是source Observable畸悬。但是在filter()操作符中侧甫,它的Observable是從map()操作符返回的。它無從得知真正的起點在哪傻昙,它只知道它正在接收來自它上游map()發(fā)射的數(shù)據(jù)闺骚。

相反,由操作符返回的每一個Observable內(nèi)部都是Observer妆档,它接收僻爽、轉(zhuǎn)換來自上游的數(shù)據(jù),并作為中繼將數(shù)據(jù)傳遞給下游贾惦。它不知道下游究竟是一個操作符還是調(diào)用鏈尾部的最后一個Observer胸梆。當(dāng)我們經(jīng)常提到的Observer指的是處于調(diào)用連尾部的最終消耗了數(shù)據(jù)的觀察者。但是對于map()filter()這些操作符來說须板,其內(nèi)部同樣利用了Observer碰镜。

我們將會在第九章:Transformers 和 自定義操作符中學(xué)習(xí)更多關(guān)于自定義操作符的知識,現(xiàn)在我們將著重于如何使用Observersubscribe()方法习瑰。

警告??:在RxJava 1.0中绪颖,Subseriber本質(zhì)上是RxJava 2.0中的Observer。在RxJava 1.0中甜奄,Subseriber和RxJava 2.0的Observer類柠横,同樣定義了三個事件窃款,但是Subseribersubseribe()方法的參數(shù),并且它實現(xiàn)了Observer接口牍氛。在RxJava 2.0中晨继,僅當(dāng)我們提到Flowables時才存在Subseriber。我們將在第八章:Flowables 和 背壓中說到它搬俊。

實現(xiàn)并訂閱一個Observer

當(dāng)你調(diào)用Observablesubscribe()方法時,一個Observer就會通過實現(xiàn)它的抽象方法來使用這三個事件紊扬。我們可以手動實現(xiàn)一個Observer,并將它實例化后傳遞給subscribe()方法唉擂,而不是像之前那樣將lambda表達(dá)式作為參數(shù)〔褪海現(xiàn)在不要理會onSubscribe()方法,我們先提供一個空實現(xiàn)玩祟,在本章后面的內(nèi)容中啤挎,我們會談到過它。

package chapter2;

import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;

public class Launcher {

    public static void main(String[] args) {
        Observable<String> source = Observable.just("Alpha", "Beta", "Gamma", "Delta", "Espilon");
        Observer<Integer> myObserver = new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                // 不去使用disposable卵凑,現(xiàn)在不要理會它
            }

            @Override
            public void onNext(Integer value) {
                System.out.println("RECEIVED:" + value);
            }

            @Override
            public void onError(Throwable e) {
                e.printStackTrace();
            }

            @Override
            public void onComplete() {
                System.out.println("Done!");
            }
        };

        source.map(String::length)
                .filter(i -> i>=5)
                .subscribe(myObserver);
    }
}

這將會輸出:

RECEIVED:5
RECEIVED:5
RECEIVED:5
RECEIVED:7
Done!

我們快速創(chuàng)建了一個Observer<Integer>類型的Observer,它接收整數(shù)類型的參數(shù)作為長度數(shù)據(jù)胜臊。我們的Observer作為調(diào)用鏈的尾部接收數(shù)據(jù)勺卢。這樣一來,意味著我們可以將這些數(shù)據(jù)寫入數(shù)據(jù)庫象对、文本文件黑忱,作為服務(wù)器響應(yīng),顯示在UI上勒魔,或者(像本例一樣)只是打印在控制臺中甫煞。

讓我們從開始發(fā)射字符串的源頭開始進(jìn)一步觀察這個例子。我們先是定義了我們的Observer冠绢,并將其作為參數(shù)抚吠,傳遞給了調(diào)用鏈尾部的subscribe()方法弟胀。注意楷力,每個字符串都被轉(zhuǎn)換成了它的長度。onNext()方法接收每個長度數(shù)據(jù)孵户,并通過System.out.println("RECEIVED:" + value)將其打印到控制臺萧朝。這個過程中不會拋出任何異常,但如果在我們的調(diào)用鏈中的任意一個階段拋出了異常夏哭,它將被推送到我們實現(xiàn)的onError()方法中检柬,然后打印堆棧信息。最后竖配,當(dāng)發(fā)射源沒有數(shù)據(jù)時(發(fā)射完 "Espilon"之后),這會導(dǎo)致調(diào)用鏈上的觀察者的onComplete()被逐一調(diào)用何址,直到最后一個onComplete()方法將Done打印到控制臺中里逆。

使用lambda表達(dá)式快速實現(xiàn)Observer

實現(xiàn)一個Observer代碼有點冗長且過程麻煩。幸運的是头朱,subscribe()的重載方法接收我們用lambda表達(dá)式實現(xiàn)這三個事件运悲。我們大多數(shù)情況下可能更喜歡這樣——指定三個用逗號分隔開的lambda表達(dá)式作為參數(shù):onNext() lambda、onError() lambda 和 onComplete()lambda项钮。對于前面的例子來說班眯,我們可以使用這三個lambda表達(dá)式來實現(xiàn)那三個方法:

    Consumer<Integer> onNext = i -> System.out.println("RECEIVED: " + i);
    Action onComplete = () -> System.out.println("Done!");
    Consumer<Throwable> onError = Throwable::printStackTrace;

我們可以將這三個lambda表達(dá)式作為subscribe()的參數(shù),subscribe()會將使用它們?yōu)槲覀儗崿F(xiàn)一個Observer烁巫。這樣要簡潔的多署隘,需要的樣本代碼也少得多:

package chapter2;

import io.reactivex.Observable;

public class Launcher {

    public static void main(String[] args) {
        Observable<String> source =
                Observable.just("Alpha", "Beta", "Gamma", "Delta",
                        "Epsilon");
        source.map(String::length)
                .filter(i -> i >= 5)
                .subscribe(i -> System.out.println("RECEIVED: " + i),
                        Throwable::printStackTrace, () -> System.out.println("Done!"));
    }
}

輸出如下:

RECEIVED: 5
RECEIVED: 5
RECEIVED: 5
RECEIVED: 7
Done!

請注意,subscribe()還有其他的重載亚隙。你可以忽略onComplete()磁餐,只實現(xiàn)onNext()onError()。當(dāng)你不需要時onComplete()阿弃,像下面這段代碼這樣诊霹,將不再為onComplete()執(zhí)行任何操作:

package chapter2;

import io.reactivex.Observable;

public class Launcher {

    public static void main(String[] args) {
        Observable<String> source =
                Observable.just("Alpha", "Beta", "Gamma", "Delta",
                        "Epsilon");
        source.map(String::length)
                .filter(i -> i >= 5)
                .subscribe(i -> System.out.println("RECEIVED: " + i),
                        Throwable::printStackTrace);
    }
}

輸出如下:

RECEIVED: 5
RECEIVED: 5
RECEIVED: 5
RECEIVED: 7

正如你前面看到的那樣,你甚至可以忽略onError()渣淳,僅指定onNext()

package chapter2;

import io.reactivex.Observable;

public class Launcher {

    public static void main(String[] args) {
        Observable<String> source =
                Observable.just("Alpha", "Beta", "Gamma", "Delta",
                        "Epsilon");
        source.map(String::length)
                .filter(i -> i >= 5)
                .subscribe(i -> System.out.println("RECEIVED: " + i));
    }
}

然而脾还,在生產(chǎn)環(huán)境中還是要盡量實現(xiàn)onError()。在調(diào)用鏈中的任何地方發(fā)生的異常都會讓Observable終止發(fā)射數(shù)據(jù)入愧,并將異常交由onError()處理鄙漏。如果你不指定onError(),那么異常將無法得到處理:

提示??:當(dāng)發(fā)生錯誤時棺蛛,你可以使用retry()操作符重新訂閱一個Observable來嘗試恢復(fù)怔蚌。我們將在下一章節(jié)介紹如何實現(xiàn)這一操作。

值得注意的是旁赊,subscribe()的大多數(shù)重載方法(包括我們剛剛使用的lambda縮寫)都返回了Disposable對象桦踊,我們并沒有對它進(jìn)行任何操作。disposable可以使我們在Observer中與Observable斷開連接彤恶,這樣發(fā)射就會被提前終止钞钙。這對于一直運行或者是長時間運行的Observable可是至關(guān)重要的,我們將在本章的末尾介紹disposable声离。

Observables的冷于熱

ObservableObserver間究竟是什么樣的微妙關(guān)系芒炼,與Observable的實現(xiàn)有關(guān)。Observables的冷與熱是它的一個重要特性术徊,這定義了當(dāng)存在多個Observable時會發(fā)生的情況本刽,首先,我們先介紹Observables的冷。

Observables的冷

冷的Observable就像是一張CD子寓。它能夠被每一個聽者重新播放暗挑,所以每一個人在任何時候都能夠聽到完整的音樂。同樣斜友,Observable也會為每一個Observer重發(fā)數(shù)據(jù)炸裆,確保每一個Observer都能夠接收到全部數(shù)據(jù)。大多數(shù)數(shù)據(jù)驅(qū)動的Observable都是冷的鲜屏,包括Observable.just()Observable.fromIterable()等工廠方法烹看。

在下面的例子中,有兩個慣著差訂閱了同一個Observable洛史。這個Observable將會把所有數(shù)據(jù)先發(fā)送給第一個Observer然后調(diào)用onComplete()惯殊。然后,它將會給第二個Observer發(fā)送所有的數(shù)據(jù)然后調(diào)用onComplete()也殖。這兩個觀察者從不同的數(shù)據(jù)流接收到了相同的數(shù)據(jù)土思,這便是冷Observable的一個典型應(yīng)用:

package chapter2;

import io.reactivex.Observable;

public class Launcher {

    public static void main(String[] args) {
        Observable<String> source =
                Observable.just("Alpha", "Beta", "Gamma", "Delta",
                        "Epsilon");
        source.subscribe(i -> System.out.println("observer1 RECEIVED: " + i));
        source.subscribe(i -> System.out.println("observer2 RECEIVED: " + i));
        
    }
}

輸出如下:

observer1 RECEIVED: Alpha
observer1 RECEIVED: Beta
observer1 RECEIVED: Gamma
observer1 RECEIVED: Delta
observer1 RECEIVED: Epsilon
observer2 RECEIVED: Alpha
observer2 RECEIVED: Beta
observer2 RECEIVED: Gamma
observer2 RECEIVED: Delta
observer2 RECEIVED: Epsilon

即使第二個Observer通過操作符轉(zhuǎn)換了它的數(shù)據(jù)集,仍舊能夠從數(shù)據(jù)集中得到自己的數(shù)據(jù)流忆嗜。使用像map()filter()操作冷的Obserbvable得到的仍舊是冷的Observable:

package chapter2;

import io.reactivex.Observable;

public class Launcher {

    public static void main(String[] args) {
        Observable<String> source =
                Observable.just("Alpha", "Beta", "Gamma", "Delta",
                        "Epsilon");
        source.subscribe(i -> System.out.println("observer1 RECEIVED: " + i));
        source.map(String::length)
                .filter(i -> i >= 5)
                .subscribe(i -> System.out.println("observer2 RECEIVED: " + i));
    }
}

輸出如下:

observer1 RECEIVED: Alpha
observer1 RECEIVED: Beta
observer1 RECEIVED: Gamma
observer1 RECEIVED: Delta
observer1 RECEIVED: Epsilon
observer2 RECEIVED: 5
observer2 RECEIVED: 5
observer2 RECEIVED: 5
observer2 RECEIVED: 7

如前所述己儒,發(fā)出有限數(shù)據(jù)集的Observable通常都是冷的。

這是一個更加真實的案例:Dave Moten的RxJava-JDBC庫允許你創(chuàng)建一個冷的Observable來查詢數(shù)據(jù)庫捆毫。我們并不會深入這個庫址愿。假設(shè)你想查詢一個SQLite數(shù)據(jù)庫,并且項目中引入了SQLite JDBC驅(qū)動和RxJava-JDBC庫损拢。你可以像下面這段代碼所示一樣舶衬,查詢表中數(shù)據(jù):

import com.github.davidmoten.rx.jdbc.ConnectionProviderFromUrl;
import com.github.davidmoten.rx.jdbc.Database;
import rx.Observable; import java.sql.Connection;
public class Launcher {
    public static void main(String[] args) { 
        Connection conn = new ConnectionProviderFromUrl("jdbc:sqlite:/home/thomas/rexon_metals.db").get();
        Database db = Database.from(conn);
        Observable<String> customerNames =
          db.select("SELECT NAME FROM CUSTOMER")
          .getAs(String.class);
        customerNames.subscribe(s -> System.out.println(s));
      }
}

輸出如下:


ToggleButton
LITE Industrial
Rex Tooling Inc
Re-Barre Construction
Prairie Construction
Marsh Lane Metal Works

這個SQL驅(qū)動的Observable是冷的。許多從數(shù)據(jù)庫、文本文件或者是JSON等有限的數(shù)據(jù)源發(fā)出的Observable都是冷的过牙。重點是Observable是如何運行的。RxJava-JDBC將會為每一個觀察者都巡行一次該查詢征绸。這意味著克锣,如果在第二個觀察者訂閱前數(shù)據(jù)發(fā)生了變化,那么它將得到與第一個觀察者所得到的不同的數(shù)據(jù)律适。即使結(jié)果數(shù)據(jù)從底層的表中發(fā)生了改變辐烂,但是Observable仍然是冷的,因為它僅僅是重新運行了這段查詢捂贿。

再次強(qiáng)調(diào)纠修,冷的Observbales將會以某種形式為每一個Observer重新發(fā)送這個Observbale所取得的數(shù)據(jù)。下面厂僧,我們將會介紹比數(shù)據(jù)更像事件的熱的Observbales扣草。

Observbales的熱

你剛剛學(xué)習(xí)了什么是冷的Observable,它的工作原理就像一張CD。而一個熱的Observbale則更像是一個電臺辰妙。它像廣播一樣同時向所有的觀察者發(fā)射數(shù)據(jù)鹰祸。如果一個Observable訂閱了一個熱的Observable并接收了一些數(shù)據(jù)。這時另一個Observer也訂閱了它密浑,那么他將錯過之前發(fā)射的那些數(shù)據(jù)蛙婴。就像電臺一樣,如果你切換的晚了些尔破,那么你將錯過那首歌街图。

從邏輯上來講,熱的Observble往往代表的是事件呆瞻,而不是有限的數(shù)據(jù)集台夺。這些事件能夠攜帶數(shù)據(jù),但是它們對時間敏感——后訂閱的觀察者會錯過之前發(fā)射的數(shù)據(jù)痴脾。

舉個栗子颤介,一個JavaFX或者Android 的UI事件都能夠被看做是熱的Observable。在JavaFX中赞赖,你可以使用Observable.create()創(chuàng)建一個Observable<Bollean>來包裝ToggleButtonselectedProperty()滚朵。然后將布爾值轉(zhuǎn)換為"UP"或者"DOWN"表示ToggleButton處于開啟狀態(tài)還是關(guān)閉狀態(tài),之后使用一個ObserverLabel中顯示它前域,如下面的代碼片段所示:

提示??:請在gradle中添加RxJavaFx的依賴:compile 'io.reactivex.rxjava2:rxjavafx:2.2.2'

package chapter2;

import io.reactivex.Observable;
import io.reactivex.functions.Function;
import javafx.application.Application;
import javafx.beans.value.ChangeListener;
import javafx.beans.value.ObservableValue;
import javafx.event.EventHandler;
import javafx.scene.Scene;
import javafx.scene.control.Button;
import javafx.scene.control.Label;
import javafx.scene.control.ToggleButton;
import javafx.scene.input.MouseEvent;
import javafx.scene.layout.VBox;
import javafx.stage.Stage;

public class MyJavaFxApp extends Application {

    @Override
    public void start(Stage primaryStage) throws Exception {
        System.out.println("start!");
        ToggleButton toggleButton = new ToggleButton("TOGGLE ME");
        Label label = new Label();

        Observable<Boolean> selectedStates = valuesOf(toggleButton.selectedProperty());
        selectedStates.map(selected -> selected?"DOWN":"UP")
                .subscribe(label::setText);// lambda還能夠引用實例的方法

        VBox vBox = new VBox(toggleButton,label);

        primaryStage.setScene(new Scene(vBox));
        primaryStage.show();
    }


    private static <T> Observable<T> valuesOf(final ObservableValue<T> fxObservable){
        return Observable.create(observableEmitter -> {
            // 發(fā)射初始狀態(tài)
            observableEmitter.onNext(fxObservable.getValue());
            // 當(dāng)狀態(tài)改變時在監(jiān)聽器中發(fā)射當(dāng)前狀態(tài)
            final ChangeListener<T> listener =
                    (observableValue,prev,current) -> observableEmitter.onNext(current);
            fxObservable.addListener(listener);
        });
    }
}

輸出如下:
[圖片上傳失敗...(image-c7b37a-1517302658055)]
這是一個通過一個熱的Observbale<Boolean>來包裝ToggleButton的選擇狀態(tài)的JavaFX app辕近。

注意??:如果你使用的時OpenJDK,則需要單獨導(dǎo)入JavaFX庫匿垄。JavaFx庫的可用版本可在甲骨文官方文檔中查看http://www.oracle.com/technetwork/java/javase/downloads/index.html

JavaFX的ObservableValue與RxJava的Observbale沒有任何關(guān)系移宅。它是JavaFX所獨有的,但是我們可以通過使用valuesOf()工廠方法在ChangeListener中調(diào)用onNext()方法從而很容易的轉(zhuǎn)換成了RxJava的Observable椿疗。當(dāng)你每次點擊ToggleButton時漏峰,這個Observable<Boolean>都會發(fā)射一個true或者false來映射選擇狀態(tài)。這是一個簡單的例子届榄,表明這個Observbale不但是在發(fā)射事件浅乔,而且也是在發(fā)射true或false這個數(shù)據(jù)。它將布爾值轉(zhuǎn)換為字符串铝条,并通過Observer改變Label中的文本靖苇。

在這個JavaFX的示例中,我們只有一個Observer班缰。如果我們讓更多的觀察者參與ToggleButton數(shù)據(jù)發(fā)出之后的事件贤壁,那么這些新的觀察者將錯過這些數(shù)據(jù)。

JavaFX和Android上的UI事件是熱的Observable的主要例子埠忘,但是你同樣可以使用熱的Observable來響應(yīng)服務(wù)器請求芯砸。如果你為某個特定的話題發(fā)射消息的Twitter流創(chuàng)建一個Observbale萧芙,那也將是一個熱的Observbale。雖然許多熱的Observable的源的數(shù)據(jù)都是無限的(infinite)假丧,但是它們也可以不這樣双揪。他們只需同時向所有觀察者發(fā)射數(shù)據(jù),并且不對那些遲到的觀察者重發(fā)之前的數(shù)據(jù)即可包帚。

提示??:RxJavaFX(以及RxAndroid)都有各類工廠方法幫你將UI事件與Observbale進(jìn)行綁定渔期。你可以使用RxJavaFX的valuesOf()這一工廠方法簡化上面的示例。

請注意渴邦,在這個JavaFX示例中我們同樣沒有處理disposal疯趟,我們將會在本章末尾再去討論它。

ConnnectableObservbale

ConnectableObservable是一個有用的熱Observbale谋梭。它能夠接收任何Observable信峻,即使它是一個冷的,也能夠?qū)⑵滢D(zhuǎn)為熱的然后將所有數(shù)據(jù)同時向所有的觀察者同時發(fā)送一次瓮床。要進(jìn)行這種轉(zhuǎn)換盹舞,只需調(diào)用任意一個Observbalepublish()方法,就會產(chǎn)生一個ConnectableObservable隘庄。
但是訂閱后并不會自動發(fā)射數(shù)據(jù)踢步。你需要調(diào)用connect()方法來開始發(fā)射,這允許你提前設(shè)置好所有的觀察者丑掺。請看下面這段代碼:

package chapter2;

import io.reactivex.Observable;
import io.reactivex.observables.ConnectableObservable;

public class HotLauncher {

    public static void main(String[] args){
        ConnectableObservable<String> source = Observable
                .just("Alpha","Beta","Delta","Gamma","Epsilon")
                .publish();
        source.subscribe(s -> System.out.println("observer1 RECEIVED: " + s));

        source.map(String::length)
                .subscribe(i -> System.out.println("observer2 RECEIVED: " + i));
         //發(fā)射获印!
        source.connect();
    }

}

輸出如下:

observer1 RECEIVED: Alpha
observer2 RECEIVED: 5
observer1 RECEIVED: Beta
observer2 RECEIVED: 4
observer1 RECEIVED: Delta
observer2 RECEIVED: 5
observer1 RECEIVED: Gamma
observer2 RECEIVED: 5
observer1 RECEIVED: Epsilon
observer2 RECEIVED: 7

請注意,第一個觀察者接收到的是字符串街州,而另一個觀察者接收到的是長度兼丰,它們以交錯的形式打印數(shù)據(jù)。這兩種訂閱都是預(yù)先設(shè)置好的唆缴,然后通過調(diào)用connect()來發(fā)射數(shù)據(jù)地粪。兩個觀察者會同時收到這些數(shù)據(jù):第一個觀察者接收Alpha同時第二個觀察者接收到5之后是Beta和4,后面的也一樣琐谤,而不是讓第一個觀察者在第二個觀察者之前處理完所有數(shù)據(jù)。第使用ConnectableObservable強(qiáng)制將每一個數(shù)據(jù)都同時發(fā)送到所有觀察者的情況玩敏,我們將在第五章多播中詳細(xì)介紹斗忌。

ConnectableObservable有助于防止為每個觀察者重新發(fā)送數(shù)據(jù)的情況。當(dāng)重新發(fā)送的代價很大旺聚,這種情況下织阳,你也許寧愿將數(shù)據(jù)同時發(fā)送給所有觀察者。即使在下游有多個觀察者砰粹,你也可以通過ConnectableObservable簡單地強(qiáng)制上游的操作符使用單個流實例唧躲。多個觀察者通常會在上游生成多個數(shù)據(jù)流實例,但是使用public()返回的ConnectableObservable將上游所有的數(shù)據(jù)流合并到同一個流中。同樣弄痹,這些細(xì)微差別將在第五章多播中一一介紹饭入。

現(xiàn)在只需要記住ConnectableObservable是熱的,因此肛真,在connect()之后訂閱的觀察者將錯過之前發(fā)射的數(shù)據(jù)谐丢。

其他Observable源

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市蚓让,隨后出現(xiàn)的幾起案子乾忱,更是在濱河造成了極大的恐慌,老刑警劉巖历极,帶你破解...
    沈念sama閱讀 222,252評論 6 516
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件窄瘟,死亡現(xiàn)場離奇詭異,居然都是意外死亡趟卸,警方通過查閱死者的電腦和手機(jī)蹄葱,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,886評論 3 399
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來衰腌,“玉大人新蟆,你說我怎么就攤上這事∮胰铮” “怎么了琼稻?”我有些...
    開封第一講書人閱讀 168,814評論 0 361
  • 文/不壞的土叔 我叫張陵,是天一觀的道長饶囚。 經(jīng)常有香客問我帕翻,道長,這世上最難降的妖魔是什么萝风? 我笑而不...
    開封第一講書人閱讀 59,869評論 1 299
  • 正文 為了忘掉前任嘀掸,我火速辦了婚禮,結(jié)果婚禮上规惰,老公的妹妹穿的比我還像新娘睬塌。我一直安慰自己,他們只是感情好歇万,可當(dāng)我...
    茶點故事閱讀 68,888評論 6 398
  • 文/花漫 我一把揭開白布揩晴。 她就那樣靜靜地躺著,像睡著了一般贪磺。 火紅的嫁衣襯著肌膚如雪硫兰。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 52,475評論 1 312
  • 那天寒锚,我揣著相機(jī)與錄音劫映,去河邊找鬼违孝。 笑死,一個胖子當(dāng)著我的面吹牛泳赋,可吹牛的內(nèi)容都是我干的雌桑。 我是一名探鬼主播,決...
    沈念sama閱讀 41,010評論 3 422
  • 文/蒼蘭香墨 我猛地睜開眼摹蘑,長吁一口氣:“原來是場噩夢啊……” “哼筹燕!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起衅鹿,我...
    開封第一講書人閱讀 39,924評論 0 277
  • 序言:老撾萬榮一對情侶失蹤撒踪,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后大渤,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體制妄,經(jīng)...
    沈念sama閱讀 46,469評論 1 319
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,552評論 3 342
  • 正文 我和宋清朗相戀三年泵三,在試婚紗的時候發(fā)現(xiàn)自己被綠了耕捞。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,680評論 1 353
  • 序言:一個原本活蹦亂跳的男人離奇死亡烫幕,死狀恐怖俺抽,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情较曼,我是刑警寧澤磷斧,帶...
    沈念sama閱讀 36,362評論 5 351
  • 正文 年R本政府宣布,位于F島的核電站捷犹,受9級特大地震影響弛饭,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜萍歉,卻給世界環(huán)境...
    茶點故事閱讀 42,037評論 3 335
  • 文/蒙蒙 一侣颂、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧枪孩,春花似錦憔晒、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,519評論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至斗幼,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間抚垄,已是汗流浹背蜕窿。 一陣腳步聲響...
    開封第一講書人閱讀 33,621評論 1 274
  • 我被黑心中介騙來泰國打工谋逻, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人桐经。 一個月前我還...
    沈念sama閱讀 49,099評論 3 378
  • 正文 我出身青樓毁兆,卻偏偏與公主長得像,于是被迫代替她去往敵國和親阴挣。 傳聞我的和親對象是個殘疾皇子气堕,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,691評論 2 361