RxJava操作符系列傳送門
RxJava操作符源碼
RxJava操作符系列一
RxJava操作符系列二
RxJava操作符系列三
RxJava操作符系列四
今天就不啰嗦了蜗帜,直接開始我們今天的學習。今天介紹一些輔助操作符。
Delay
該操作符讓原始Observable在發(fā)射每項數(shù)據(jù)之前都暫停一段指定的時間。它接受一個定義時長的參數(shù)(包括long型數(shù)據(jù)和單位)憎乙。每當原始Observable發(fā)射一項數(shù)據(jù)翅娶,delay就啟動一個定時器弄屡,當定時器過了給定的時間段時,delay返回的Observable發(fā)射相同的數(shù)據(jù)項搬设。他默認是在computation調(diào)度器上執(zhí)行穴店,當然也有重載方法可以指定調(diào)度器,若發(fā)射數(shù)據(jù)后有更新UI操作需將調(diào)度器指定AndroidSchedulers.mainThread()拿穴。(注意重載方法delay(Fun1)泣洞,delay(Fun0,Fun1)是默認不在任何特定的調(diào)度器上執(zhí)行)
示例代碼
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
Log.e(TAG, "call: "+new SimpleDateFormat("yyyy/MM/dd HH:MM:ss").format(new Date()));
subscriber.onNext(1);
subscriber.onNext(2);
subscriber.onNext(3);
subscriber.onNext(4);
subscriber.onCompleted();
}
}).delay(2,TimeUnit.SECONDS)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
Log.e(TAG, "onCompleted: "+new SimpleDateFormat("yyyy/MM/dd HH:MM:ss").format(new Date()));
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError: "+new SimpleDateFormat("yyyy/MM/dd HH:MM:ss").format(new Date())+e.toString());
}
@Override
public void onNext(Integer integer) {
tv1.append("\n"+new SimpleDateFormat("yyyy/MM/ddHH:MM:ss").format(new Date())+" "+integer);
Log.e(TAG, "onNext: "+new SimpleDateFormat("yyyy/MM/dd HH:MM:ss").format(new Date())+integer);
}
});
輸出日志信息
call: 2016/12/17 20:12:07
onNext: 2016/12/17 20:12:091
onNext: 2016/12/17 20:12:092
onNext: 2016/12/17 20:12:093
onNext: 2016/12/17 20:12:094
onCompleted: 2016/12/17 20:12:09
為了讓你看到延遲效果,我把call和onNext()回調(diào)的時間也打印出來默色,發(fā)送最終數(shù)據(jù)是延遲兩秒發(fā)送的球凰。
delaySubscription
該操作符也是delay的一種實現(xiàn),它和dealy的區(qū)別是dealy是延遲數(shù)據(jù)的發(fā)送腿宰,而此操作符是延遲數(shù)據(jù)的注冊呕诉,指定延遲時間的重載方法是執(zhí)行在computation調(diào)度器的。為了方便觀察延遲注冊效果吃度,創(chuàng)建Observable變量甩挫。如下示例代碼
Observable observable = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
Log.e(TAG, "call: "+new SimpleDateFormat("yyyy/MM/dd HH:MM:ss").format(new Date()));
subscriber.onNext(1);
subscriber.onNext(2);
subscriber.onNext(3);
subscriber.onNext(4);
subscriber.onCompleted();
}
});
Log.e(TAG, "call11: "+new SimpleDateFormat("yyyy/MM/dd HH:MM:ss").format(new Date()));
observable.delaySubscription(2,TimeUnit.SECONDS)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
Log.e(TAG, "onCompleted: "+new SimpleDateFormat("yyyy/MM/dd HH:MM:ss").format(new Date()));
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError: "+new SimpleDateFormat("yyyy/MM/dd HH:MM:ss").format(new Date())+e.toString());
}
@Override
public void onNext(Integer integer) {
tv1.append("\n"+new SimpleDateFormat("yyyy/MM/ddHH:MM:ss").format(new Date())+" "+integer);
Log.e(TAG, "onNext: "+new SimpleDateFormat("yyyy/MM/dd HH:MM:ss").format(new Date())+integer);
}
});
輸出日志信息
call11: 2016/12/17 20:12:43
call: 2016/12/17 20:12:45
onNext: 2016/12/17 20:12:451
onNext: 2016/12/17 20:12:452
onNext: 2016/12/17 20:12:453
onNext: 2016/12/17 20:12:454
onCompleted: 2016/12/17 20:12:45
Do
對于do系列操作符理解比較容易,他相當于給Observable執(zhí)行周期的關鍵節(jié)點添加回調(diào)椿每。當Observable執(zhí)行到這個階段的時候伊者,這些回調(diào)就會被觸發(fā)。在Rxjava do系列操作符有多個间护,如doOnNext删壮,doOnSubscribe,doOnUnsubscribe兑牡,doOnCompleted央碟,doOnError,doOnTerminate和doOnEach均函。
當Observable每發(fā)送一個數(shù)據(jù)時亿虽,doOnNext會被首先調(diào)用,然后再onNext苞也。若發(fā)射中途出現(xiàn)異常doOnError會被調(diào)用洛勉,然后onError。若數(shù)據(jù)正常發(fā)送完畢doOnCompleted會被觸發(fā)如迟,然后執(zhí)行onCompleted收毫。當訂閱或者解除訂閱doOnSubscribe攻走,doOnUnsubscribe會被執(zhí)行。
示例代碼
Observable.just(1, 2, 3)
.doOnNext(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.e(TAG, "doOnNext: " );
}
})
.doOnError(new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
Log.e(TAG, "doOnError: " );
}
})
.doOnCompleted(new Action0() {
@Override
public void call() {
Log.e(TAG, "doOnCompleted: " );
}
})
.doOnSubscribe(new Action0() {
@Override
public void call() {
Log.e(TAG, "doOnSubscribe: " );
}
})
.doOnUnsubscribe(new Action0() {
@Override
public void call() {
Log.e(TAG, "doOnUnsubscribe: " );
}
})
.doOnTerminate(new Action0() {
@Override
public void call() {
Log.e(TAG, "doOnTerminate: " );
}
})
.doAfterTerminate(new Action0() {
@Override
public void call() {
Log.e(TAG, "doAfterTerminate: " );
}
})
.subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
Log.e(TAG, "onCompleted1: ");
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError1: ");
}
@Override
public void onNext(Integer integer) {
Log.e(TAG, "onNext1: " + integer);
}
});
輸出日志信息
12-17 23:13:56.151 29946-29946/com.example.xh E/RxJava: doOnSubscribe:
12-17 23:13:56.151 29946-29946/com.example.xh E/RxJava: doOnNext:
12-17 23:13:56.155 29946-29946/com.example.xh E/RxJava: onNext1: 1
12-17 23:13:56.155 29946-29946/com.example.xh E/RxJava: doOnNext:
12-17 23:13:56.155 29946-29946/com.example.xh E/RxJava: onNext1: 2
12-17 23:13:56.155 29946-29946/com.example.xh E/RxJava: doOnNext:
12-17 23:13:56.155 29946-29946/com.example.xh E/RxJava: onNext1: 3
12-17 23:13:56.155 29946-29946/com.example.xh E/RxJava: doOnCompleted:
12-17 23:13:56.155 29946-29946/com.example.xh E/RxJava: doOnTerminate:
12-17 23:13:56.155 29946-29946/com.example.xh E/RxJava: onCompleted1:
12-17 23:13:56.155 29946-29946/com.example.xh E/RxJava: doOnUnsubscribe:
12-17 23:13:56.155 29946-29946/com.example.xh E/RxJava: doAfterTerminate:
對于doOnEach操作符此再,他接收的是一個Observable參數(shù)昔搂,相當于doOnNext,doOnError输拇,doOnCompleted綜合體摘符,如下示例代碼
Observable.just(1,2,3)
.doOnEach(new Subscriber<Integer>() {
@Override
public void onCompleted() {
Log.e(TAG, "onCompleted: " );
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError: " );
}
@Override
public void onNext(Integer integer) {
Log.e(TAG, "onNext: "+integer);
}
}).subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
Log.e(TAG, "onCompleted1: " );
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError1: " );
}
@Override
public void onNext(Integer integer) {
Log.e(TAG, "onNext1: "+integer);
}
});
輸出日志信息
onNext: 1
onNext1: 1
onNext: 2
onNext1: 2
onNext: 3
onNext1: 3
onCompleted:
onCompleted1:
SubscribeOn/ObserveOn
該操作符指定Observable在一個特定的調(diào)度器上發(fā)送通知給觀察者 (調(diào)用觀察者的onNext, onCompleted, onError方法),當遇到一個異常時ObserveOn會立即向前傳遞這個onError終止通知策吠,它不會等待慢速消費的Observable接受任何之前它已經(jīng)收到但還沒有發(fā)射的數(shù)據(jù)項逛裤。這可能意味著onError通知會跳到(并吞掉)原始Observable發(fā)射的數(shù)據(jù)項前面。
SubscribeOn操作符的作用類似猴抹,但它是用于指定Observable本身在特定的調(diào)度器上執(zhí)行带族,它同樣會在那個調(diào)度器上給觀察者發(fā)通知。改操作符只能指定一次蟀给,如果指定多次則以第一次為準炉菲。而observeOn可以指定多次,每次指定會在observeOn下一句代碼處生效坤溃。
示例代碼
stringBuffer = new StringBuffer();
Observable.create(new Observable.OnSubscribe<Drawable>() {
@Override
public void call(Subscriber<? super Drawable> subscriber) {
//不能執(zhí)行耗時操作拍霜,及更新ui
stringBuffer.append("\n" + "開始發(fā)送事件" + Thread.currentThread().getName() + "\n");
Drawable drawable = getResources().getDrawable(R.mipmap.dir);
subscriber.onNext(drawable);
subscriber.onCompleted();
}
})
//指定創(chuàng)建Observable在io中
.subscribeOn(Schedulers.io())
//由于map中做耗時操作,通過Observable指定發(fā)射數(shù)據(jù)在新的線程
.observeOn(Schedulers.newThread())
.map(new Func1<Drawable, ImageView>() {
@Override
public ImageView call(Drawable drawable) {
ImageView imageView = new ImageView(getActivity());
LinearLayout.LayoutParams params = new LinearLayout.LayoutParams(LinearLayout.LayoutParams.WRAP_CONTENT, LinearLayout.LayoutParams.WRAP_CONTENT);
imageView.setLayoutParams(params);
imageView.setImageDrawable(drawable);
return imageView;
}
})
//操作UI薪介,需要指定在主線程
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<ImageView>() {
@Override
public void call(ImageView imageView) {
tv.append(stringBuffer.toString() + "接收信息事件" + Thread.currentThread().getName());
layout.addView(imageView);
}
});
TimeInterval
這個操作符通過這張圖能更好的理解祠饺,這個操作符將原始Observable轉(zhuǎn)換為另一個Obserervable,后者發(fā)射一個標志替換前者的數(shù)據(jù)項汁政,這個標志表示前者的兩個連續(xù)發(fā)射物之間流逝的時間長度道偷。新的Observable的第一個發(fā)射物表示的是在觀察者訂閱原始Observable到原始Observable發(fā)射它的第一項數(shù)據(jù)之間流逝的時間長度。不存在與原始Observable發(fā)射最后一項數(shù)據(jù)和發(fā)射onCompleted通知之間時長對應的發(fā)射物记劈。
Observable.interval(1,TimeUnit.SECONDS)
.filter(new Func1<Long, Boolean>() {
@Override
public Boolean call(Long aLong) {
return aLong<5;
}
})
.timeInterval()
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<TimeInterval<Long>>() {
@Override
public void onCompleted() {
Log.e(TAG, "onCompleted: " );
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError: ");
}
@Override
public void onNext(TimeInterval<Long> longTimeInterval) {
Log.e(TAG, "onNext: value:"+longTimeInterval.getValue()+"getIntervalInMilliseconds"+longTimeInterval.getIntervalInMilliseconds());
}
});
輸出日志信息
onNext: value:0getIntervalInMilliseconds1002
onNext: value:1getIntervalInMilliseconds999
onNext: value:2getIntervalInMilliseconds999
onNext: value:3getIntervalInMilliseconds1000
onNext: value:4getIntervalInMilliseconds1001
通過日志發(fā)現(xiàn)勺鸦,返回的TimeInterval類型數(shù)據(jù),包含時間間隔和值目木。
Timestamp
該操作符和TimeInterval一樣最終發(fā)射的都是TimeInterval類型數(shù)據(jù)换途。但是不同的是,改操作符發(fā)射數(shù)據(jù)每一項包含數(shù)據(jù)的原始發(fā)射時間(TimeInterval是時間間隔)
示例代碼
Observable.just(1,2,3,4).timestamp().subscribe(new Action1<Timestamped<Integer>>() {
@Override
public void call(Timestamped<Integer> integerTimestamped) {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd-HH:mm:ss");
Log.e(TAG, "value: " + integerTimestamped.getValue() + " time: "+sdf.format(new Date(integerTimestamped.getTimestampMillis())) );
}
});
輸出日志信息
value: 1 time: 2016-12-17-23:33:47
value: 2 time: 2016-12-17-23:33:47
value: 3 time: 2016-12-17-23:33:47
value: 4 time: 2016-12-17-23:33:47
Timeout
如果原始Observable過了指定的一段時間沒有發(fā)射任何數(shù)據(jù)刽射,Timeout操作符會以一個onError通知終止這個Observable军拟。
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
try {
subscriber.onNext(1);
Thread.sleep(100);
subscriber.onNext(2);
Thread.sleep(200);
subscriber.onNext(3);
Thread.sleep(300);
subscriber.onNext(4);
Thread.sleep(400);
subscriber.onNext(5);
subscriber.onCompleted();
} catch (InterruptedException e) {
subscriber.onError(new Throwable("Error"));
e.printStackTrace();
}
}
})
//此timeout方法默認在computation調(diào)度器上執(zhí)行.
.timeout(250,TimeUnit.MILLISECONDS)
.subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
Log.e(TAG, "onCompleted: " );
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError: " );
}
@Override
public void onNext(Integer integer) {
Log.e(TAG, "onNext: "+integer );
}
});
輸出日志信息
onNext: 1
onNext: 2
onNext: 3
onError:
由于發(fā)送數(shù)據(jù)3后sleep(300)超過設置的時間250ms,則執(zhí)行onError誓禁。timeout還有重載方法可以在超時的時候切換到一個我們指定的備用的Observable懈息,而不是發(fā)錯誤通知。它也默認在computation調(diào)度器上執(zhí)行摹恰。如下示例代碼
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
try {
subscriber.onNext(1);
Thread.sleep(100);
subscriber.onNext(2);
Thread.sleep(200);
subscriber.onNext(3);
Thread.sleep(300);
subscriber.onNext(4);
Thread.sleep(400);
subscriber.onNext(5);
subscriber.onCompleted();
} catch (InterruptedException e) {
subscriber.onError(new Throwable("Error"));
e.printStackTrace();
}
}
})
.timeout(250,TimeUnit.MILLISECONDS,Observable.just(10,11))
.subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
Log.e(TAG, "onCompleted: " );
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError: " );
}
@Override
public void onNext(Integer integer) {
Log.e(TAG, "onNext: "+integer );
}
});
輸出日志信息
onNext: 1
onNext: 2
onNext: 3
onNext: 10
onNext: 11
onCompleted:
該操作符還有幾個重載方法如 timeout(Func1)辫继,timeout(Func1,Observable)怒见, timeout(Func0,Func1), timeout(Func0,Func1,Observable)這幾個操作符默認在immediate調(diào)度器上執(zhí)行姑宽,具體執(zhí)行效果可自行觀察代碼遣耍。
To
此系列操作符的作用是將Observable轉(zhuǎn)換為另一個對象或數(shù)據(jù)結(jié)構(gòu)。下面介紹幾個常用的to操作符低千。
toList
發(fā)射多項數(shù)據(jù)的Observable會為每一項數(shù)據(jù)調(diào)用onNext方法。你可以用toList操作符改變這個行為馏颂,讓Observable將多項數(shù)據(jù)組合成一個List示血,然后調(diào)用一次onNext方法傳遞整個列表,如果原始Observable沒有發(fā)射任何數(shù)據(jù)就調(diào)用了onCompleted救拉,toList返回的Observable會在調(diào)用onCompleted之前發(fā)射一個空列表难审。如果原始Observable調(diào)用了onError,toList返回的Observable會立即調(diào)用它的觀察者的onError方法亿絮。
Observable.just(1,2,3,4,5).toList().subscribe(new Subscriber<List<Integer>>() {
@Override
public void onCompleted() {
Log.e(TAG, "onCompleted: " );
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError: " );
}
@Override
public void onNext(List<Integer> integers) {
Log.e(TAG, "onNext: "+integers);
}
});
如上代碼告喊,通過toList將單個數(shù)據(jù)最終以List<Integer>的形式輸出。
ToMap
該操作符收集原始Observable發(fā)射的所有數(shù)據(jù)項到一個Map(默認是HashMap)然后發(fā)射這個Map派昧。我們可以提供一個用于生成Map的Key的函數(shù)黔姜,還可以提供一個函數(shù)轉(zhuǎn)換數(shù)據(jù)項到Map存儲的值(默認數(shù)據(jù)項本身就是值)。
示例代碼
Observable.just(1,2,3,4)
.toMap(new Func1<Integer, String>() {
@Override
public String call(Integer integer) {
//生成map的key值
return "key"+integer;
}
}).subscribe(new Subscriber<Map<String, Integer>>() {
@Override
public void onCompleted() {
Log.e(TAG, "onCompleted: " );
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError: ");
}
@Override
public void onNext(Map<String, Integer> integerIntegerMap) {
Log.e(TAG, "onNext: "+integerIntegerMap.toString() );
}
});
輸出日志信息
onNext: {key4=4, key3=3, key2=2, key1=1}
onCompleted:
該操作符有個兩個參數(shù)的構(gòu)造方法可以更改發(fā)射的數(shù)據(jù)的值蒂萎,如下
Observable.just(1,2,3,4)
.toMap(new Func1<Integer, String>() {
@Override
public String call(Integer integer) {
return "key" + integer;
}
}, new Func1<Integer, Integer>() {
@Override
public Integer call(Integer integer) {
return integer+10;
}
})
.subscribe(new Subscriber<Map<String, Integer>>() {
@Override
public void onCompleted() {
Log.e(TAG, "onCompleted: " );
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError: ");
}
@Override
public void onNext(Map<String, Integer> integerIntegerMap) {
Log.e(TAG, "onNext: "+integerIntegerMap.toString() );
}
輸出日志信息
onNext: {key4=14, key3=13, key2=12, key1=11}
onCompleted:
發(fā)現(xiàn)此時指定了map的key秆吵,并且更改了發(fā)射的數(shù)據(jù)值。
toMutimap
類似于toMap五慈,不同的是纳寂,它生成的這個Map同時還是一個ArrayList(默認是這樣,你可以傳遞一個可選的工廠方法修改這個行為)泻拦。toMap(Func1)是將原Observable發(fā)送的數(shù)據(jù)保存到一個MAP中毙芜,并在參數(shù)函數(shù)中,設定key争拐。但toMultimap操作符在將數(shù)據(jù)保存到MAP前腋粥,先將數(shù)據(jù)保存到Collection,而toMap操作符將數(shù)據(jù)直接保存到MAP中架曹,并沒有再包裹一層Collection灯抛。
Observable.just(1,2,3,4)
.toMultimap(new Func1<Integer, String>() {
@Override
public String call(Integer integer) {
return "key"+integer;
}
}).subscribe(new Subscriber<Map<String, Collection<Integer>>>() {
@Override
public void onCompleted() {
Log.e(TAG, "onCompleted: ");
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError: ");
}
@Override
public void onNext(Map<String, Collection<Integer>> integerCollectionMap) {
Log.e(TAG, "onNext: "+integerCollectionMap.toString());
}
});
輸出日志信息
onNext: {key4=[4], key3=[3], key2=[2], key1=[1]}
onCompleted:
通過上面信息,也看的兩者區(qū)別音瓷。
toSortedList
該操作符類似于toList对嚼,區(qū)別是它可以對數(shù)據(jù)進行自然排序。如下示例
Integer[] integers = {2, 3, 6, 4, 9,2, 8};
Observable.from(integers)
.toSortedList()
.flatMap(new Func1<List<Integer>, Observable<Integer>>() {
@Override
public Observable<Integer> call(List<Integer> integer) {
Log.e(TAG, "call: "+integer.toString() );
return Observable.from(integer);
}
}).subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
Log.e(TAG, "onCompleted: " );
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError: " );
}
@Override
public void onNext(Integer integer) {
Log.e(TAG, "onNext: "+integer);
tv.append("\n" + integer);
}
});
輸出日志信息
call: [2, 2, 3, 4, 6, 8, 9]
onNext: 2
onNext: 2
onNext: 3
onNext: 4
onNext: 6
onNext: 8
onNext: 9
onCompleted:
今天的這篇文章就到此結(jié)束绳慎,歡迎大家閱讀纵竖,若發(fā)現(xiàn)文中有錯誤的地方歡迎留言提出漠烧,感謝。