創(chuàng)建一個Observable
- Observable.create()
該方法接收一個Obsubscribe對象
Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
}
});
來個例子:
Observable<Integer> observable=Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
for(int i=0;i<5;i++){
subscriber.onNext(i);
}
subscriber.onCompleted();
}
});
//Observable.subscribe(Observer)文兑,Observer訂閱了Observable
Subscription subscribe = observable.subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
Log.e(TAG, "完成");
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "異常");
}
@Override
public void onNext(Integer integer) {
Log.e(TAG, "接收Obsverable中發(fā)射的值:" + integer);
}
});
輸出:
接收Obsverable中發(fā)射的值:0
接收Obsverable中發(fā)射的值:1
接收Obsverable中發(fā)射的值:2
接收Obsverable中發(fā)射的值:3
接收Obsverable中發(fā)射的值:4
完成
從上面的例子可以看出,在Observer訂閱了Observable后,Observer作為OnSubscribe中call方法的參數(shù)傳入饭聚,從而調(diào)用了Observer的相關(guān)方法
- Observable.from()
該方法需要一個 數(shù)組或集合參數(shù)
假如現(xiàn)在我們有一個集合蹋岩,我們能否也像上面一樣但不使用for循環(huán)脏毯,一個個發(fā)送給Observer疚颊,發(fā)送完后再調(diào)用onCompleted方法呢?
//創(chuàng)建一個集合
List<Integer> list=new ArrayList<>();
for(int i=0;i<5;i++){
list.add(i);
}
//使用Observable.from接收上面的list集合
Observable<Integer> observable=Observable.from(list);
//Observer訂閱Observable
Subscription subscribe = observable.subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
Log.e(TAG, "完成");
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "異常");
}
@Override
public void onNext(Integer integer) {
Log.e(TAG, "接收Obsverable中發(fā)射的值:" + integer);
}
});
輸出:
接收Obsverable中發(fā)射的值:0
接收Obsverable中發(fā)射的值:1
接收Obsverable中發(fā)射的值:2
接收Obsverable中發(fā)射的值:3
接收Obsverable中發(fā)射的值:4
完成
可以看到使用Observable.from()和1中的效果一樣
- Observable.just()
該方法可接收 1到9 個同一任意類型的參數(shù)
假如我們有一個帶返回值的方法彭雾,我們也想使用Observable對其操作碟刺,該怎么辦呢?
private List<Integer> getDatas(){//提供數(shù)據(jù)的方法
List<Integer> list=new ArrayList<>();
for(int i=0;i<5;i++){
list.add(i);
}
return list;
}
//使用Observable.just調(diào)用上面的getDatas()方法薯酝,注意這里得到的Observable的類型為
//List<Integer>半沽,而非Integer
Observable<List<Integer>> observable=Observable.just(getDatas());
Subscription subscribe = observable.subscribe(new Observer<List<Integer>>() {
@Override
public void onCompleted() {
Log.e(TAG, "完成");
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "異常");
}
@Override
public void onNext(List<Integer> integers) {
for(Integer integer: integers)
Log.e(TAG, "接收Obsverable中發(fā)射的值:" + integer);
}
});
輸出:
接收Obsverable中發(fā)射的值:0
接收Obsverable中發(fā)射的值:1
接收Obsverable中發(fā)射的值:2
接收Obsverable中發(fā)射的值:3
接收Obsverable中發(fā)射的值:4
完成
從上面的代碼,我們可以看出just與from的區(qū)別吴菠,just是將集合直接作為一個參數(shù)發(fā)送給Observer,而from是將
集合中所有的元素一個一個的發(fā)送給Observer者填。假如上面代碼中的getDatas方法返回的不是一個集合,我們也就不能使用from了應(yīng)該使用just做葵。
在發(fā)送數(shù)據(jù)后占哟,just也會自動調(diào)用onCompleted方法。
-
其他方式:
當我們需要一個Observable毫無理由的不再發(fā)射數(shù)據(jù)正常結(jié)束時,我們可以使用empty()
榨乎。我們可以使用never()
創(chuàng)建一個不發(fā)射數(shù)據(jù)并且也永遠不會結(jié)束的Observable怎燥。我們也可以使用throw()
創(chuàng)建一個不發(fā)射數(shù)據(jù)并且以錯誤結(jié)束的Observable。
特殊類Subject
為啥說它特殊呢蜜暑?
因為它繼承了Observable并且實現(xiàn)Observer接口铐姚,所以說它即可已是一個Observer又可以是一個Observable,它包含4個子類:
- PublishSubject
- BehaviorSubject
- ReplaySubject
- AsyncSubject
- PublishSubject
//創(chuàng)建一個Subject這里看起來是不是缺點什么肛捍,對比上面的Observable.create()方法
PublishSubject<String> subject = PublishSubject.create();
Subscription subscription = subject.subscribe(new Observer<String>() {
@Override
public void onCompleted() {
Log.e(TAG, "完成");
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "異常");
}
@Override
public void onNext(String s) {
Log.e(TAG,"接收PublishSubject發(fā)送的值:"+s);
}
});
//手動發(fā)送數(shù)據(jù)給訂閱它的Observer
subject.onNext("PublishSubject");
輸出:
接收PublishSubject發(fā)送的值:PublishSubject
從上面代碼可以看出隐绵,我們需手動發(fā)送數(shù)據(jù)給訂閱它的Observer。所以這里就與Observable不同:
在Observable中拙毫,如果有訂閱者訂閱了它依许,它就會馬上自動發(fā)送數(shù)據(jù)給Observer。
而對于Subject而言缀蹄,它在被訂閱的時候并不會自動發(fā)送數(shù)據(jù)給Observer峭跳,發(fā)送數(shù)據(jù)的控制權(quán)交給了我們,在我們發(fā)送數(shù)據(jù)之前缺前,訂閱的Observer會一直處于等待狀態(tài)坦康,但是這種等待并不會阻塞線程,也不會消耗太多的資源诡延。
當然我們也可以像Observable一樣使用Subject,沒有區(qū)別古胆。
- ReplaySubject
//PublishSubject<Integer> subject=PublishSubject.create();
ReplaySubject<Integer> subject= ReplaySubject.create();
subject.onNext(1);// 1
subject.subscribe(new Action1<Integer>() {// 2
@Override
public void call(Integer integer) {
Log.e(TAG,""+integer);
}
});
subject.onNext(2);// 3
subject.onNext(3);// 4
subject.onNext(4);// 5
這里沒有給出輸出肆良,那我們先來猜猜如果上面的subject是PublishSubject情況下輸出的是什么吧?
執(zhí)行到1時逸绎,由于subject沒有訂閱者訂閱惹恃,所以發(fā)送出去的數(shù)據(jù)1也就沒有訂閱者接收
執(zhí)行到2時,subject有訂閱者訂閱了(這里的Action1相當于一個實現(xiàn)了onNext方法的Observer對象)
執(zhí)行到3 4 5時棺牧,subject發(fā)送的數(shù)據(jù)都會被2中的訂閱者接收到巫糙,從而輸出2、3颊乘、4参淹。
PublishSubject發(fā)送數(shù)據(jù)時,會將數(shù)據(jù)發(fā)送給訂閱了它的所有Observer
那subject是ReplaySubject輸出的是怎樣呢乏悄?
Replay是不是重新播放的意思呢浙值,這里的重新播放是指 當Observer訂閱ReplaySubject時,會將ReplaySubject之前發(fā)送過的數(shù)據(jù)檩小,重新發(fā)送給該Observer开呐,所以這里會輸出1 2 3 4。
再來看看另外2個ReplaySubject.createXXX()方法
- ReplaySubject.createWithSize(int size)
如果在Observer訂閱該ReplaySubject前,ReplaySubject發(fā)送數(shù)據(jù)的個數(shù)大于size,那么超出size部分的數(shù)據(jù)
不會發(fā)送給Observer筐付。
ReplaySubject<Integer> subject= ReplaySubject.createWithSize(2);
subject.onNext(1);
subject.onNext(2);
subject.onNext(3);
subject.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.e(TAG,""+integer);
}
});
subject.onNext(4);
輸出:2 3 4
- ReplaySubject.createWithTime(int time, TimeUnit unit, Scheduler scheduler);
需要3個參數(shù)卵惦,第12個用來確定時間,第3個傳入一個Scheduler
該方法表示如果ReplaySubject發(fā)送數(shù)據(jù)的時間超過了指定的時間瓦戚,將不會重新發(fā)送給新訂閱的Observer
- BehaviorSubject
BehaviorSubject subject=BehaviorSubject.create();
subject.onNext(1);
subject.onNext(2);
subject.onNext(3);
subject.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.e(TAG,""+integer);
}
});
subject.onNext(4);
subject.onNext(5);
輸出:
3 4 5
BehaviorSubject可以當作ReplaySubject來看沮尿,它只接收Observer訂閱前BehaviorSubject發(fā)送的最后一條數(shù)據(jù)。
- AsyncSubject
AsyncSubject subject=AsyncSubject.create();
subject.onNext(1);
subject.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.e(TAG,"A"+integer);
}
});
subject.onNext(2);
subject.onNext(3);
subject.onCompleted();
subject.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.e(TAG,"B"+integer);
}
});
輸出:
A3 B3
AsyncSubject會將執(zhí)行過的一個完整的事件緩存起來(最后一個subject.onNext() +subject.onCompleted()),然后會發(fā)送給所有訂閱它的Observer伤极。