介紹
RxJava是JVM的響應(yīng)式擴展,其實我現(xiàn)在也不知道它是什么意思,因為我現(xiàn)在也沒有用過響應(yīng)式編程,它提到了JVM,我也不知道它具體根JVM有多大的聯(lián)系,它還提到了觀察者模式的設(shè)計模式,擴展的觀察者模式(It extends the observer pattern
),這個我有一點了解.作為認(rèn)識的一部分,我先擺在這里.
官方的介紹是這樣的:
RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences.
翻譯成中文:
一個在 Java VM 上使用可觀測的序列來組成異步的穆咐、基于事件的程序的庫
觀察者模式
RxJava使用了擴展的觀察者模式,與普通的觀察者模式不同,最大的不同在于普通的觀察者模式一次發(fā)布一個內(nèi)容,而RxJava會維護(hù)一個隊列,每出隊一次分發(fā)一個事件,觀察者接受和處理一個事件,當(dāng)然也有一定的限制,限制如下:
- 當(dāng)發(fā)射器調(diào)用onComplete()函數(shù)時(可以看作是特定信號),被觀察者繼續(xù)發(fā)射事件但觀察者不會接收.
- 被觀察者傳遞一個Disposable(通過接口),當(dāng)調(diào)用這個對象的
Disposable.dispose()
函數(shù)時,發(fā)射器停止發(fā)射事件
用RxJava實現(xiàn)通用的觀察者模式
值得注意的地方:
- RxJava還依賴了
Reactive Streams
,如果找不到這個包會報ClassNotFoundExpection
- 通過繼承
Observable<T>
的方式擴展被觀察者的時候(此示例使用這種方式),注冊,添加或者說注冊訂閱者和發(fā)布消息使用的是同一個函數(shù),這是結(jié)構(gòu)所決定的 - 使用第二點所說的方式實現(xiàn)通用的觀察者模式似乎不能取消訂閱
被觀察者
import io.reactivex.Observable;
import io.reactivex.Observer;
public class MyObservable extends Observable<String> {
@Override
protected void subscribeActual(Observer<? super String> observer) {
observer.onNext("test1");
observer.onNext("test2");
observer.onNext("test3");
observer.onComplete();
}
}
觀察者
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
public class MyObserver implements Observer<String> {
@Override
public void onSubscribe(Disposable disposable) {
}
@Override
public void onNext(String s) {
System.out.println("MyObsrever receiver"+s);
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onComplete() {
System.out.println("MyObsrever receiver complete");
}
}
主函數(shù)
public class Main {
public static void main(String[] arg){
MyObservable observable = new MyObservable();
MyObserver observer = new MyObserver();
MyObserver2 observer2 = new MyObserver2();
observable.subscribeActual(observer);
observable.subscribeActual(observer2);
}
}
可以看到跟我自己實現(xiàn)的觀察者模式來比是差不多的
另一種方式
被觀察者
import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.Observer;
public class MyObservable implements ObservableOnSubscribe<String> {
@Override
public void subscribe(ObservableEmitter<String> observableEmitter) throws Exception {
observableEmitter.onNext("test1");
observableEmitter.onNext("test2");
observableEmitter.onNext("test3");
observableEmitter.onComplete();
}
}
觀察者
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
public class MyObserver implements Observer<String> {
@Override
public void onSubscribe(Disposable disposable) {
}
@Override
public void onNext(String s) {
System.out.println("MyObsrever receiver"+s);
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onComplete() {
System.out.println("MyObsrever receiver complete");
}
}
主函數(shù)
import io.reactivex.Observable;
public class Main {
public static void main(String[] arg){
MyObserver observer = new MyObserver();
MyObserver2 observer2 = new MyObserver2();
Observable<String> observable = Observable.create(new MyObservable());
observable.subscribe(observer);
observable.subscribe(observer2);
}
}
鏈?zhǔn)秸{(diào)用
import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
public class Main {
public static void main(String[] arg){
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> observableEmitter) {
observableEmitter.onNext("test1");
observableEmitter.onNext("test2");
observableEmitter.onNext("test3");
observableEmitter.onComplete();
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable disposable) {
}
@Override
public void onNext(String s) {
System.out.println("receive message "+s);
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onComplete() {
System.out.println("receive complete!");
}
});
}
}
小結(jié)
Rxjava當(dāng)然不是只是實現(xiàn)觀察者模式這么簡單,但觀察者模式的的確確是它很重要的一個特征.