基本API補充
1.不完整回調函數 Action1 中call() 方法
// Source that emits the two strings in order.
Observable<String> observable = Observable.just("hello", "world!");
// The full Observer form used previously:
// observable.subscribe(new Observer<String>() {
//
//     @Override
//     public void onCompleted() {
//
//     }
//
//     @Override
//     public void onError(Throwable e) {
//
//     }
//
//     @Override
//     public void onNext(String t) {
//         Log.i("main", "值:" + t);
//     }
// });
// Partial callback: Action1.call(...) takes the place of onNext.
Action1<String> onNextAction = new Action1<String>() {
    @Override
    public void call(String value) {
        Log.e("main", "值:" + value);
    }
};
observable.subscribe(onNextAction);
// Other overloads:
// observable.subscribe(onNext, onError)
// observable.subscribe(onNext, onError, onCompleted);
結果輸出:
08-07 02:46:32.001 4533-4533/com.haocai.architect.rxjava E/main: 值:hello
08-07 02:46:32.001 4533-4533/com.haocai.architect.rxjava E/main: 值:world!
call()相當于onNext方法
2.過濾函數
(1) filter
filter(Func1)用來過濾觀測序列中我們不想要的值,只返回滿足條件的值,我們看下原理圖:
/**
 * Demo screen for the {@code filter} operator.
 *
 * <p>{@link #onCreate} builds a filtered Observable of {@code AppInfo} items and sets up
 * the list view; {@link #click} subscribes to it and pushes every emitted item into the
 * list adapter.
 */
public class FilterActivity extends Activity {
    private Observable<AppInfo> observable;
    private AppInfoAdapter appInfoAdapter;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_simple9);
        observable = getApps();
        initView();
    }

    /** Looks up the list view and attaches a fresh adapter to it. */
    private void initView() {
        ListView listView = (ListView) findViewById(R.id.lv_app_name);
        appInfoAdapter = new AppInfoAdapter(this);
        listView.setAdapter(appInfoAdapter);
    }

    /**
     * Creates the Observable used by this demo.
     *
     * @return an Observable over five sample items that only lets through those whose
     *         name contains "Lucy"
     */
    private Observable<AppInfo> getApps() {
        AppInfo appInfo1 = new AppInfo("Xiong", 0);
        AppInfo appInfo2 = new AppInfo("Tony", 0);
        AppInfo appInfo3 = new AppInfo("Tomcat", 0);
        AppInfo appInfo4 = new AppInfo("Lucy", 0);
        AppInfo appInfo5 = new AppInfo("Lucy pioneer", 0);
        return Observable
                .just(appInfo1, appInfo2, appInfo3, appInfo4, appInfo5)
                .filter(new Func1<AppInfo, Boolean>() {
                    @Override
                    public Boolean call(AppInfo t) {
                        // Keep only the items whose name contains "Lucy".
                        return t.getName().contains("Lucy");
                    }
                });
    }

    /**
     * Subscribes to the filtered Observable and feeds the results to the adapter.
     *
     * @param v the clicked view (unused); presumably wired through android:onClick in the
     *          layout - TODO confirm
     */
    public void click(View v) {
        observable.subscribe(new Observer<AppInfo>() {
            @Override
            public void onCompleted() {
                // Refresh the UI once every item has been added.
                appInfoAdapter.notifyDataSetChanged();
            }

            @Override
            public void onError(Throwable e) {
                // Report the failure instead of silently dropping it.
                Log.e("main", "filter subscription failed", e);
            }

            @Override
            public void onNext(AppInfo t) {
                Log.e("main", t.getName());
                // Add the emitted item to the list data.
                appInfoAdapter.addAppInfo(t);
            }
        });
    }
}
結果輸出:
08-07 03:19:10.492 32521-32521/com.haocai.architect.rxjava E/main: Lucy
08-07 03:19:10.492 32521-32521/com.haocai.architect.rxjava E/main: Lucy pioneer
filter相關源碼:
/**
 * OnSubscribe that subscribes a {@link FilterSubscriber} to {@code source} so that only
 * the items for which {@code predicate} returns true reach the downstream subscriber.
 *
 * @param <T> the item type
 */
public final class OnSubscribeFilter<T> implements OnSubscribe<T> {
// Upstream Observable whose items are tested.
final Observable<T> source;
// Test applied to each item; true means "forward downstream".
final Func1<? super T, Boolean> predicate;
public OnSubscribeFilter(Observable<T> source, Func1<? super T, Boolean> predicate) {
this.source = source;
this.predicate = predicate;
}
/**
 * Wraps {@code child} in a FilterSubscriber, registers that wrapper with {@code child},
 * and subscribes the wrapper to {@code source}.
 */
@Override
public void call(final Subscriber<? super T> child) {
FilterSubscriber<T> parent = new FilterSubscriber<T>(child, predicate);
// NOTE(review): registering parent with child presumably makes an unsubscribe of
// child also unsubscribe parent - confirm against Subscriber.add.
child.add(parent);
source.unsafeSubscribe(parent);
}
/**
 * Subscriber that applies the predicate to each item and forwards the accepted ones
 * to {@code actual}.
 */
static final class FilterSubscriber<T> extends Subscriber<T> {
final Subscriber<? super T> actual;
final Func1<? super T, Boolean> predicate;
// Set once an error has been delivered downstream; see onError/onCompleted.
boolean done;
public FilterSubscriber(Subscriber<? super T> actual, Func1<? super T, Boolean> predicate) {
this.actual = actual;
this.predicate = predicate;
// NOTE(review): request(0) presumably stops this subscriber from requesting an
// unbounded amount by default before a producer is set - confirm.
request(0);
}
@Override
public void onNext(T t) {
boolean result;
try {
result = predicate.call(t);
} catch (Throwable ex) {
// Predicate failed: rethrow if fatal, otherwise stop the upstream and report
// the error with the offending value attached.
Exceptions.throwIfFatal(ex);
unsubscribe();
onError(OnErrorThrowable.addValueAsLastCause(ex, t));
return;
}
if (result) {
actual.onNext(t);
} else {
// The item was dropped, so request one more from upstream in its place.
request(1);
}
}
@Override
public void onError(Throwable e) {
if (done) {
// An error was already delivered; hand any later one to the global hook.
RxJavaHooks.onError(e);
return;
}
done = true;
actual.onError(e);
}
@Override
public void onCompleted() {
// Skip completion if an error has already been delivered downstream.
if (done) {
return;
}
actual.onCompleted();
}
@Override
public void setProducer(Producer p) {
// Hand the same producer to both this subscriber and the downstream one.
super.setProducer(p);
actual.setProducer(p);
}
}
}
(2) take(獲取前幾位或指定范圍)
/**
 * Creates the Observable for the take demo.
 *
 * @return an Observable over five sample items, limited to the first two by take(2)
 */
private Observable<AppInfo> getApps() {
    // take(2): forward only the first two items.
    return Observable
            .just(
                    new AppInfo("Xiong", 0),
                    new AppInfo("Tony", 0),
                    new AppInfo("Tomcat", 0),
                    new AppInfo("Lucy", 0),
                    new AppInfo("Lucy pioneer", 0))
            .take(2);
}
/**
 * Subscribes to {@code observable}, logs every emitted item and adds it to the adapter.
 *
 * @param v the clicked view (unused)
 */
public void click(View v) {
    observable.subscribe(new Observer<AppInfo>() {
        @Override
        public void onCompleted() {
            // Refresh the UI once every item has been added.
            appInfoAdapter.notifyDataSetChanged();
        }

        @Override
        public void onError(Throwable e) {
            // Report the failure instead of silently dropping it.
            Log.e("main", "take subscription failed", e);
        }

        @Override
        public void onNext(AppInfo t) {
            Log.e("main", t.getName());
            // Add the emitted item to the list data.
            appInfoAdapter.addAppInfo(t);
        }
    });
}
結果輸出:
08-07 05:54:16.887 12684-12684/com.haocai.architect.rxjava E/main: Xiong
08-07 05:54:16.887 12684-12684/com.haocai.architect.rxjava E/main: Tony
take相關源碼
/**
 * Operator that forwards at most {@code limit} items to the downstream subscriber and
 * then completes it.
 *
 * @param <T> the item type
 */
public final class OperatorTake<T> implements Operator<T, T> {
// Maximum number of items to forward; never negative (checked in the constructor).
final int limit;
/**
 * @param limit the number of items to let through
 * @throws IllegalArgumentException if {@code limit} is negative
 */
public OperatorTake(int limit) {
if (limit < 0) {
throw new IllegalArgumentException("limit >= 0 required but it was " + limit);
}
this.limit = limit;
}
@Override
public Subscriber<? super T> call(final Subscriber<? super T> child) {
final Subscriber<T> parent = new Subscriber<T>() {
// Number of onNext calls seen so far (incremented in onNext).
int count;
// True once a terminal event has been sent to child.
boolean completed;
@Override
public void onCompleted() {
if (!completed) {
completed = true;
child.onCompleted();
}
}
@Override
public void onError(Throwable e) {
if (!completed) {
completed = true;
try {
child.onError(e);
} finally {
unsubscribe();
}
}
}
@Override
public void onNext(T i) {
if (!isUnsubscribed() && count++ < limit) {
// count was just incremented, so this is true for the limit-th item.
boolean stop = count == limit;
child.onNext(i);
if (stop && !completed) {
completed = true;
try {
child.onCompleted();
} finally {
// Limit reached: stop receiving from upstream.
unsubscribe();
}
}
}
}
/**
* We want to adjust the requested values based on the `take` count.
*/
@Override
public void setProducer(final Producer producer) {
child.setProducer(new Producer() {
// keeps track of requests up to maximum of `limit`
final AtomicLong requested = new AtomicLong(0);
@Override
public void request(long n) {
if (n > 0 && !completed) {
// because requests may happen concurrently use a CAS loop to
// ensure we only request as much as needed, no more no less
while (true) {
long r = requested.get();
// c = how much of this request still fits under `limit`.
long c = Math.min(n, limit - r);
if (c == 0) {
break;
} else if (requested.compareAndSet(r, r + c)) {
producer.request(c);
break;
}
}
}
}
});
}
};
// take(0): complete the child immediately and unsubscribe parent.
if (limit == 0) {
child.onCompleted();
parent.unsubscribe();
}
/*
* We decouple the parent and child subscription so there can be multiple take() in a chain such as for
* the groupBy Observer use case where you may take(1) on groups and take(20) on the children.
*
* Thus, we only unsubscribe UPWARDS to the parent and an onComplete DOWNSTREAM.
*
* However, if we receive an unsubscribe from the child we still want to propagate it upwards so we
* register 'parent' with 'child'
*/
child.add(parent);
return parent;
}
}
(3) takeLast (獲取最后幾位)
/**
 * Creates the Observable for the takeLast demo.
 *
 * @return an Observable over five sample items, limited to the last two by takeLast(2)
 */
private Observable<AppInfo> getApps() {
    // takeLast(2): forward only the last two items.
    return Observable
            .just(
                    new AppInfo("Xiong", 0),
                    new AppInfo("Tony", 0),
                    new AppInfo("Tomcat", 0),
                    new AppInfo("Lucy", 0),
                    new AppInfo("Lucy pioneer", 0))
            .takeLast(2);
}
/**
 * Subscribes to {@code observable}, logs every emitted item and adds it to the adapter.
 *
 * @param v the clicked view (unused)
 */
public void click(View v) {
    observable.subscribe(new Observer<AppInfo>() {
        @Override
        public void onCompleted() {
            // Refresh the UI once every item has been added.
            appInfoAdapter.notifyDataSetChanged();
        }

        @Override
        public void onError(Throwable e) {
            // Report the failure instead of silently dropping it.
            Log.e("main", "takeLast subscription failed", e);
        }

        @Override
        public void onNext(AppInfo t) {
            Log.e("main", t.getName());
            // Add the emitted item to the list data.
            appInfoAdapter.addAppInfo(t);
        }
    });
}
結果輸出:
08-07 06:16:08.483 32192-32192/com.haocai.architect.rxjava E/main: Lucy
08-07 06:16:08.483 32192-32192/com.haocai.architect.rxjava E/main: Lucy pioneer
(4) distinct (去重)
/**
 * Creates the Observable for the distinct demo.
 *
 * @return an Observable over seven names with duplicates removed by distinct()
 */
private Observable<String> getApps() {
    // distinct(): each value is emitted only the first time it appears.
    Observable<String> names =
            Observable.just("Tony","pioneer", "Tomcat","Tony","Lucy","Tomcat","Tony");
    return names.distinct();
}
/**
 * Subscribes to {@code observable} and logs every emitted string.
 *
 * @param v the clicked view (unused)
 */
public void click(View v) {
    observable.subscribe(new Observer<String>() {
        @Override
        public void onCompleted() {
            // Refresh the UI on completion (onNext below adds nothing to the adapter).
            appInfoAdapter.notifyDataSetChanged();
        }

        @Override
        public void onError(Throwable e) {
            // Report the failure instead of silently dropping it.
            Log.e("main", "distinct subscription failed", e);
        }

        @Override
        public void onNext(String t) {
            Log.e("main",t);
        }
    });
}
結果輸出:
08-07 06:47:40.045 28387-28387/com.haocai.architect.rxjava E/main: Tony
08-07 06:47:40.045 28387-28387/com.haocai.architect.rxjava E/main: pioneer
08-07 06:47:40.045 28387-28387/com.haocai.architect.rxjava E/main: Tomcat
08-07 06:47:40.045 28387-28387/com.haocai.architect.rxjava E/main: Lucy
(5) distinctUntilChanged(去除位置相鄰重復數據)
/**
 * Creates the Observable for the distinctUntilChanged demo.
 *
 * @return an Observable over the sample names in which runs of adjacent equal values
 *         are collapsed by distinctUntilChanged()
 */
private Observable<String> getApps() {
    list = new ArrayList<String>();
    String[] names = {
        "Michael", "Michael", "pioneer", "Michael", "Michael",
        "Huni", "Huni", "Huni", "King", "Huni"
    };
    for (String name : names) {
        list.add(name);
    }
    return Observable.from(list).distinctUntilChanged();
}
/**
 * Subscribes to {@code observable} and logs every emitted string.
 *
 * @param v the clicked view (unused)
 */
public void click(View v) {
    observable.subscribe(new Observer<String>() {
        @Override
        public void onCompleted() {
        }

        @Override
        public void onError(Throwable e) {
            // Report the failure instead of silently dropping it.
            Log.e("main", "distinctUntilChanged subscription failed", e);
        }

        @Override
        public void onNext(String t) {
            Log.e("main", "過濾后的值: " + t);
        }
    });
}
結果輸出:
08-07 07:46:45.444 17378-17378/com.haocai.architect.rxjava E/main: 過濾后的值: Michael
08-07 07:46:45.445 17378-17378/com.haocai.architect.rxjava E/main: 過濾后的值: pioneer
08-07 07:46:45.445 17378-17378/com.haocai.architect.rxjava E/main: 過濾后的值: Michael
08-07 07:46:45.445 17378-17378/com.haocai.architect.rxjava E/main: 過濾后的值: Huni
08-07 07:46:45.445 17378-17378/com.haocai.architect.rxjava E/main: 過濾后的值: King
08-07 07:46:45.445 17378-17378/com.haocai.architect.rxjava E/main: 過濾后的值: Huni
(6) First
first()顧名思義,它使得Observable只發送觀測序列中的第一個數據項。
/**
 * Creates the Observable for the first demo.
 *
 * @return an Observable that emits only the first of the five sample names
 */
private Observable<String> getApps() {
    list = new ArrayList<String>();
    for (String name : new String[] {"Michael", "pioneer", "Huni", "King", "Cookie"}) {
        list.add(name);
    }
    // first: emits the first value of the sequence (internally take(1).single())
    // last: emits the last value (internally takeLast(1).single())
    Observable<String> source = Observable.from(list);
    return source.first();
    // return Observable.from(list).last();
}
/**
 * Subscribes to {@code observable} and logs every emitted string.
 *
 * @param v the clicked view (unused)
 */
public void click(View v) {
    observable.subscribe(new Observer<String>() {
        @Override
        public void onCompleted() {
        }

        @Override
        public void onError(Throwable e) {
            // Report the failure instead of silently dropping it.
            Log.e("main", "first/last subscription failed", e);
        }

        @Override
        public void onNext(String t) {
            Log.e("main", "過濾后的值: " + t);
        }
    });
}
結果輸出:
08-07 08:30:57.648 25076-25076/com.haocai.architect.rxjava E/main: 過濾后的值: Michael
(7) Last
last()只發射觀測序列中的最后一個數據項。
/**
 * Creates the Observable for the last demo.
 *
 * @return an Observable that emits only the last of the five sample names
 */
private Observable<String> getApps() {
    list = new ArrayList<String>();
    for (String name : new String[] {"Michael", "pioneer", "Huni", "King", "Cookie"}) {
        list.add(name);
    }
    // first: emits the first value of the sequence (internally take(1).single())
    // last: emits the last value (internally takeLast(1).single())
    Observable<String> source = Observable.from(list);
    return source.last();
}
結果輸出:
08-07 08:30:57.648 25076-25076/com.haocai.architect.rxjava E/main: 過濾后的值: Cookie
(8)Skip
skip(int)讓我們可以忽略Observable發射的前n項數據。
/**
 * Creates the Observable for the skip demo.
 *
 * @return an Observable over the seven sample names with the first two skipped
 */
private Observable<String> getApps() {
    list = new ArrayList<String>();
    String[] names = {"Michael", "Pioneer", "Huni", "King", "Cookie", "Faker", "Gigi"};
    for (String name : names) {
        list.add(name);
    }
    // skip: drop the given number of items from the start, then emit the rest
    // skipLast: drop the given number of items from the end
    Observable<String> source = Observable.from(list);
    return source.skip(2);
}
/**
 * Subscribes to {@code observable} and logs every emitted string.
 *
 * @param v the clicked view (unused)
 */
public void click(View v) {
    observable.subscribe(new Observer<String>() {
        @Override
        public void onCompleted() {
        }

        @Override
        public void onError(Throwable e) {
            // Report the failure instead of silently dropping it.
            Log.e("main", "skip subscription failed", e);
        }

        @Override
        public void onNext(String t) {
            Log.i("main", "過濾后的值: " + t);
        }
    });
}
08-07 08:51:58.061 12238-12238/com.haocai.architect.rxjava I/main: 過濾后的值: Huni
08-07 08:51:58.061 12238-12238/com.haocai.architect.rxjava I/main: 過濾后的值: King
08-07 08:51:58.061 12238-12238/com.haocai.architect.rxjava I/main: 過濾后的值: Cookie
08-07 08:51:58.061 12238-12238/com.haocai.architect.rxjava I/main: 過濾后的值: Faker
08-07 08:51:58.061 12238-12238/com.haocai.architect.rxjava I/main: 過濾后的值: Gigi
(9)SkipLast
skipLast(int)忽略Observable發射的后n項數據。
/**
 * Creates the Observable for the skipLast demo.
 *
 * @return an Observable over the seven sample names with the last two dropped
 */
private Observable<String> getApps() {
    list = new ArrayList<String>();
    String[] names = {"Michael", "Pioneer", "Huni", "King", "Cookie", "Faker", "Gigi"};
    for (String name : names) {
        list.add(name);
    }
    // skip: drop the given number of items from the start, then emit the rest
    // skipLast: drop the given number of items from the end
    Observable<String> source = Observable.from(list);
    return source.skipLast(2);
}
結果輸出:
08-07 08:50:01.719 10295-10295/com.haocai.architect.rxjava I/main: 過濾后的值: Michael
08-07 08:50:01.719 10295-10295/com.haocai.architect.rxjava I/main: 過濾后的值: Pioneer
08-07 08:50:01.719 10295-10295/com.haocai.architect.rxjava I/main: 過濾后的值: Huni
08-07 08:50:01.719 10295-10295/com.haocai.architect.rxjava I/main: 過濾后的值: King
08-07 08:50:01.719 10295-10295/com.haocai.architect.rxjava I/main: 過濾后的值: Cookie
(10)ElementAt
elementAt(int)用來獲取Observable發射的事件序列中的第n項數據(索引從0開始),并當做唯一的數據發射出去。
/**
 * Creates the Observable for the elementAt demo.
 *
 * @return an Observable that emits only the item at index 2 of the seven sample names
 */
private Observable<String> getApps() {
    list = new ArrayList<String>();
    String[] names = {"Michael", "Pioneer", "Huni", "King", "Cookie", "Faker", "Gigi"};
    for (String name : names) {
        list.add(name);
    }
    // elementAt(2): emit only the item at index 2 of the sequence
    Observable<String> source = Observable.from(list);
    return source.elementAt(2);
}
08-07 09:01:17.495 20645-20645/com.haocai.architect.rxjava I/main: 過濾后的值: Huni
(11)Sample
sample操作符是定期掃描源Observable產生的結果,在指定的間隔周期內進行采樣,
獲得源Observable在該周期內最近發射的數據
例一
// interval(...) is a static factory, so it is called on the Observable class; calling it
// through an instance ignores that instance and only obscures where the source comes from.
// The source ticks every second; sample(2s) forwards the most recent tick every two seconds.
Observable.interval(1, TimeUnit.SECONDS)
        .sample(2, TimeUnit.SECONDS)
        .subscribe(new Observer<Long>() {
            @Override
            public void onCompleted() {
            }

            @Override
            public void onError(Throwable e) {
                // Report the failure instead of silently dropping it.
                Log.e("main", "sample subscription failed", e);
            }

            @Override
            public void onNext(Long t) {
                Log.i("main", "接收到的值: " + t);
            }
        });
08-07 09:36:08.478 20117-20195/com.haocai.architect.rxjava I/main: 接收到的值: 0
08-07 09:36:10.477 20117-20195/com.haocai.architect.rxjava I/main: 接收到的值: 2
08-07 09:36:12.478 20117-20195/com.haocai.architect.rxjava I/main: 接收到的值: 4
08-07 09:36:14.479 20117-20195/com.haocai.architect.rxjava I/main: 接收到的值: 6
08-07 09:36:16.478 20117-20195/com.haocai.architect.rxjava I/main: 接收到的值: 8
......
例二
// Emits 1, waits 500 ms, emits 2, waits 500 ms, emits 3, waits 1000 ms, then emits 4 and 5
// and completes. Each catch restores the thread's interrupt flag before propagating, so
// code further up the stack can still observe the interruption.
Observable.create(subscriber -> {
    subscriber.onNext(1);
    try {
        Thread.sleep(500);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw Exceptions.propagate(e);
    }
    subscriber.onNext(2);
    try {
        Thread.sleep(500);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw Exceptions.propagate(e);
    }
    subscriber.onNext(3);
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw Exceptions.propagate(e);
    }
    subscriber.onNext(4);
    subscriber.onNext(5);
    subscriber.onCompleted();
}).sample(999, TimeUnit.MILLISECONDS) // alternatively: throttleLast(1000, TimeUnit.MILLISECONDS)
        .subscribe(item -> Log.d("JG", item.toString()));
// Output reported by the author: 2, 3, 5
(12)Timeout
timeout: 如果原始Observable過了指定的一段時長沒有發射任何數據,就發射一個異常,或者使用備用的Observable。
/**
 * Creates the Observable for the timeout demo and stores it in {@code observable}.
 *
 * <p>The source emits "Kpioneer", sleeps for one second, then emits "Lucy" and completes.
 *
 * @return the newly created Observable (the same instance assigned to {@code observable})
 */
private Observable<String> getApps() {
    Observable.OnSubscribe<String> emitter = new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> observer) {
            observer.onNext("Kpioneer");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Restore the interrupt flag so callers can still see the interruption.
                Thread.currentThread().interrupt();
                throw Exceptions.propagate(e);
            }
            observer.onNext("Lucy");
            observer.onCompleted();
        }
    };
    observable = Observable.create(emitter);
    return observable;
}
/**
 * Subscribes to {@code observable} with a 999 ms timeout and logs every received value;
 * {@code Observable.just("Michel","QQ")} is passed to timeout(...) as the fallback source.
 *
 * @param v the clicked view (unused)
 */
public void click(View v) {
    Observable<String> fallback = Observable.just("Michel","QQ");
    observable.timeout(999, TimeUnit.MILLISECONDS, fallback).subscribe(
            new Observer<String>() {
                @Override
                public void onCompleted() {
                }

                @Override
                public void onError(Throwable e) {
                    // Report the failure instead of silently dropping it.
                    Log.e("main", "timeout subscription failed", e);
                }

                @Override
                public void onNext(String t) {
                    Log.i("main", "接收到的值: " + t);
                }
            });
}
結果輸出:
08-07 10:02:30.806 11757-11757/com.haocai.architect.rxjava I/main: 接收到的值: Kpioneer
08-07 10:02:31.808 11757-11824/com.haocai.architect.rxjava I/main: 接收到的值: Michel
08-07 10:02:31.808 11757-11824/com.haocai.architect.rxjava I/main: 接收到的值: QQ
}
如果不指定備用Observable,結果為Kpioneer, onError
3.變換操作
(1) Map
map()函數接受一個Func1類型的參數(就像這樣map(Func1<? super T, ? extends R> func)),然后把這個Func1應用到每一個由Observable發射的值上,將發射的值轉換為我們期望的值。這個定義比較抽象,我們來看一下官方給出的原理圖:
// Build three sample users, each with two orders.
userModelList = new ArrayList<UserModel>();
for (int userIndex = 0; userIndex < 3; userIndex++) {
    UserModel user = new UserModel("userId_" + userIndex, "userName_" + userIndex);
    List<OrderModel> orders = new ArrayList<OrderModel>();
    for (int orderIndex = 0; orderIndex < 2; orderIndex++) {
        String orderId = "userId_" + userIndex + "_orderId_" + orderIndex;
        String orderName = "user_" + userIndex + "_orderName_" + orderIndex;
        orders.add(new OrderModel(orderId, orderName));
    }
    user.setOrderList(orders);
    userModelList.add(user);
}
// map: turn every UserModel into its user name.
Func1<UserModel, String> toUserName = new Func1<UserModel, String>() {
    @Override
    public String call(UserModel userModel) {
        return userModel.getUserName();
    }
};
// Log each mapped value.
Action1<String> logName = new Action1<String>() {
    @Override
    public void call(String s) {
        Log.i("main", "轉(zhuǎn)換之后的值:" +s);
    }
};
Observable.from(userModelList).map(toUserName).subscribe(logName);
08-07 11:39:51.493 2499-2499/com.haocai.architect.rxjava I/main: 轉(zhuǎn)換之后的值:userName_0
08-07 11:39:51.493 2499-2499/com.haocai.architect.rxjava I/main: 轉(zhuǎn)換之后的值:userName_1
08-07 11:39:51.493 2499-2499/com.haocai.architect.rxjava I/main: 轉(zhuǎn)換之后的值:userName_2
(2) flatmap
flatMap()的原理是這樣的:
1.將傳入的事件對象轉換成一個Observable對象;
2.這時不會直接發送這個Observable, 而是將這個Observable激活讓它自己開始發送事件;
3.每一個創建出來的Observable發送的事件,都被匯入同一個Observable,這個Observable負責將這些事件統一交給Subscriber的回調方法。
這三個步驟,把事件拆成了兩級,通過一組新創建的Observable將初始的對象『鋪平』之后通過統一路徑分發了下去。而這個『鋪平』就是flatMap()所謂的flat。
最后我們來看看flatMap的原理圖:
從前面的例子中你肯定發現了,flatMap()和map()都是把傳入的參數轉化之后返回另一個對象。但和map()不同的是,flatMap()中返回的是Observable對象,并且這個Observable對象并不是被直接發送到 Subscriber的回調方法中。
// Build three sample users, each with two orders.
userModelList = new ArrayList<UserModel>();
for (int userIndex = 0; userIndex < 3; userIndex++) {
    UserModel user = new UserModel("userId_" + userIndex, "userName_" + userIndex);
    List<OrderModel> orders = new ArrayList<OrderModel>();
    for (int orderIndex = 0; orderIndex < 2; orderIndex++) {
        String orderId = "userId_" + userIndex + "_orderId_" + orderIndex;
        String orderName = "user_" + userIndex + "_orderName_" + orderIndex;
        orders.add(new OrderModel(orderId, orderName));
    }
    user.setOrderList(orders);
    userModelList.add(user);
}
// flatMap is the approach (a trade-off) offered for this case.
// Scenario: avoiding nested callbacks (e.g. logging in after authorization succeeds).
Func1<UserModel, Observable<OrderModel>> toOrders =
        new Func1<UserModel, Observable<OrderModel>>() {
            @Override
            public Observable<OrderModel> call(UserModel userModel) {
                // One inner Observable per user, emitting that user's orders.
                return Observable.from(userModel.getOrderList());
            }
        };
// Log the id of every order received from the flattened stream.
Action1<OrderModel> logOrderId = new Action1<OrderModel>() {
    @Override
    public void call(OrderModel orderModel) {
        Log.i("main", "轉(zhuǎn)換之后的值:" +orderModel.getOrderId());
    }
};
Observable.from(userModelList).flatMap(toOrders).subscribe(logOrderId);