RxJava基礎(chǔ)
使用RxJava之前疆前,先了解四個基本概念:
- Observable:被觀察者
- Observer:觀察者
- subscribe:訂閱
- 事件
工作流程:
RxJava采用的是觀察者模式蹂季,首先建立被觀察者Observable和觀察者Observer沼撕,他們通過subscribe來建立聯(lián)系(Observable.subscribe(Observer))蒸矛,然后Observable在特定的時刻(完成某些操作考余,獲得返回結(jié)果等)可以發(fā)送事件給Observer技即,Observer根據(jù)接收到的事件作出相應(yīng)的動作未蝌。
來看一個簡單的例子:
//創(chuàng)建被觀察者
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
//執(zhí)行一些其他操作
//.............
//執(zhí)行完畢蔽午,觸發(fā)回調(diào)易茬,通知觀察者
e.onNext("事件");
}
});
//創(chuàng)建觀察者
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
//觀察者接收到通知,進(jìn)行相關(guān)操作
public void onNext(String aLong) {
System.out.println("收到事件");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
};
//訂閱
observable.subscribe(observer);
這就是一個完整的RxJava觀察者模式。被觀察者的創(chuàng)建使用 Observable.create,內(nèi)部實(shí)現(xiàn)了subscribe方法抽莱。一旦Observable被訂閱(observable.subscribe(observer))范抓,subscribe方法就會被調(diào)用,開始發(fā)送事件食铐。
事件的發(fā)送由ObservableEmitter來完成匕垫,Emitter是事件的發(fā)送者,可以調(diào)用它的onNext(), onComplete(), onError()方法來發(fā)送不同的事件虐呻。
觀察者Observer包含四個方法:onSubscribe方法在訂閱完成以后象泵,第一個onNext事件之前調(diào)用,只調(diào)用一次斟叼。其他三個方法則由被觀察者決定是否調(diào)用偶惠。
onSubscribe方法接收一個Disposable對象作為參數(shù),它用來切斷observable和observer之間的聯(lián)系朗涩。一旦調(diào)用了Disposable對象的dispose()方法忽孽,observer就停止動作,不再響應(yīng)觀察對象發(fā)送的事件了谢床。
將上面的例子加工一下兄一,驗(yàn)證Disposable的作用:
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("a");
e.onNext("b");
e.onNext("c");
}
})
.subscribe(new Observer<String>() {
private Disposable mDisposable;
@Override
public void onSubscribe(@NonNull Disposable d) {
Log.d("RxJava", "onSubscribe" );
mDisposable = d;
}
@Override
public void onNext(@NonNull String s) {
Log.d("RxJava", "Received Message: " + s );
if (s.equals("b")) mDisposable.dispose();
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
Log.d("RxJava", "onComplete" );
}
});
代碼做了一點(diǎn)修改,采用鏈?zhǔn)讲僮魇锻龋雌饋砀啙嵰恍┏龈铮硗獍l(fā)送完第二個事件之后,調(diào)用了dispose方法覆履√E瑁可以看到運(yùn)行結(jié)果如下:
D/RxJava: onSubscribe
D/RxJava: Received Message: a
D/RxJava: Received Message: b
介紹到這里,整個RxJava的工作流程就完成了硝全∑芪恚總結(jié)一下事件的發(fā)送和接收規(guī)則:
- onNext()事件不限制發(fā)送次數(shù),被觀察者可以發(fā)送無限多個onNext伟众,下游也可以接收無限多個onNext
- onComplete()/onError()并不會自動發(fā)送析藕,需要被觀察者指定什么時間發(fā)送
- onComplete()/onError()發(fā)送之后,被觀察者仍然可以繼續(xù)發(fā)送事件凳厢,但是觀察者接收到onComplete()/onError()之后就不再接收事件了账胧。
- onComplete()/onError()是唯一且互斥的。也就是說先紫,發(fā)送的事件中治泥,只能存在一個onComplete()或者onError()。否則可能會引起崩潰
- dispose()方法可以在觀察者一端決定是夠繼續(xù)接收事件
Observable創(chuàng)建
在上面的例子中創(chuàng)建Observable使用的是create方法遮精,除此之外居夹,還有其他不同的創(chuàng)建方法:
-
just
使用該方式創(chuàng)建的Observable對象在創(chuàng)建完成后會自動調(diào)用onNext方法败潦,依次發(fā)送創(chuàng)建時傳入的數(shù)據(jù)。
Observable<String> observable = Observable.just("aa","bb","cc");
-
fromIterable
這種方式接收一個List集合作為參數(shù)准脂,創(chuàng)建完成后會遍歷該集合并調(diào)用onNext方法來發(fā)送數(shù)據(jù):
List<String> list = new ArrayList<>() ;
list.add( "aa" ) ;
list.add( "bb" ) ;
list.add( "cc" ) ;
Observable<String> observable2 = Observable.fromIterable(list) ;
-
defer
defer是推遲的意思劫扒,observable在觀察者進(jìn)行訂閱的時候才創(chuàng)建對象,針對每一個觀察者狸膏,都會創(chuàng)建一個新的observable對象:
Observable<String> observable4 = Observable.defer(new Callable<ObservableSource<? extends String>>() {
@Override
public ObservableSource<? extends String> call() throws Exception {
return Observable.just("aa" , "bb" , "cc");
}
});
-
interval
采用該方式創(chuàng)建的observable會按照指定的時間間隔沟饥,從0開始發(fā)送整數(shù)序列。
//每隔兩秒發(fā)送一個整數(shù)
Observable.interval(2, TimeUnit.SECONDS).subscribe(new Consumer<Long>() {
@Override
public void accept(@NonNull Long aLong) throws Exception {
Log.d("RxJava", "accept " + aLong );
}
});
-
range
range接收兩個整型參數(shù)湾戳,第一個表示起始值贤旷,第二個表示發(fā)送次數(shù)N。observable會從初始值開始院塞,調(diào)用N次onNext遮晚,發(fā)送數(shù)據(jù)從初始值開始依次加1.
Observable.range(10, 20).subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
Log.d("RxJava", "accept1 " + integer );//輸出結(jié)果為10,11,12,......29
}
});
-
timer
observable在指定的時間發(fā)送事件調(diào)用onNext:
//表示5秒后發(fā)送事件
Observable.timer(5, TimeUnit.SECONDS).subscribe(new Consumer<Long>() {
@Override
public void accept(@NonNull Long aLong) throws Exception {
Log.d("RxJava", "timer " + aLong );//默認(rèn)發(fā)送值為0
}
});
-
repeat
repeat修飾的observable對象可以重復(fù)發(fā)送事件,并且可以指定重復(fù)次數(shù)拦止。如果不指定县遣,默認(rèn)為一直重復(fù)。
Observable.timer(5, TimeUnit.SECONDS).repeat()//.repeat(3);
Observer創(chuàng)建
Observer的創(chuàng)建很簡單汹族,上面的例子中已經(jīng)給出來了:
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
//觀察者接收到通知,進(jìn)行相關(guān)操作
public void onNext(String aLong) {
System.out.println("收到事件");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
};
為了代碼簡潔易讀萧求,通常會在訂閱的時候直接創(chuàng)建Observer,訂閱的方法subscribe有以下幾種重載:
public final Disposable subscribe() {}/
public final Disposable subscribe(Consumer<? super T> onNext) {}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) {}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {}
public final void subscribe(Observer<? super T> observer) {}
最開始的例子中顶瞒,采用的是最后一種方法夸政,傳入一個Observer對象。
不帶參數(shù)的subscribe榴徐,第一種方法守问,表示Observable負(fù)責(zé)發(fā)送數(shù)據(jù),但是沒有觀察者去接收坑资。
只有一個參數(shù)Consumer<? super T> onNext表示Observer只需要處理onNext 方法耗帕。剩余的幾種重載同理。