簡(jiǎn)述:
a library for composing asynchronous and event-based programs using obaservable for the
Java VM (一個(gè)對(duì)于構(gòu)成使用的Java虛擬機(jī)觀察序列號(hào)異步和基于事件的程序庫(kù))碰辅。
總結(jié):隨著程序邏輯變得越來(lái)越復(fù)雜寺晌,它依然能夠保持簡(jiǎn)潔。
RxJava引入的目的:異步,代碼更清晰
優(yōu)點(diǎn):采用觀察者模式鏈?zhǔn)秸{(diào)用,簡(jiǎn)潔明了,以往實(shí)現(xiàn)子線(xiàn)程主線(xiàn)程切換需自己手動(dòng)new Thread(推送線(xiàn)程池)柴信,
并且線(xiàn)程之間還需要使用handler進(jìn)行通信,Rxjava一步到位宽气,極其簡(jiǎn)單随常。
1.基礎(chǔ)概念:
Observable(?b?z?:v?bl):在觀察者模式中稱(chēng)為“被觀察者”
Observer(?b?z?:v?(r)):觀察者模式中的“觀察者”潜沦,可接收Observeable發(fā)送的數(shù)據(jù)
subscribe(s?b?skra?b):訂閱,觀察者與被觀察者绪氛,通過(guò)subscribe()方法進(jìn)行訂閱
subscriber(s?b?skra?b?(r)):也是一種觀察者唆鸡,在2.0中它與Observer沒(méi)什么實(shí)質(zhì)的區(qū)別,不同的是Subscriber與Flowable聯(lián)合使用
Flowable(f'la??bl):也是悲觀餐者的一種枣察,與Subscriber進(jìn)行配合使用争占,實(shí)現(xiàn)背壓操作
RxJava的異步實(shí)現(xiàn)方式:
讓Observable開(kāi)啟子線(xiàn)程執(zhí)行耗時(shí)操作,完成耗時(shí)操作后序目,觸發(fā)回調(diào)臂痕,通知Observer進(jìn)行主線(xiàn)程UI更新。
如此輕松便可以實(shí)現(xiàn)Android中的異步猿涨,且代碼簡(jiǎn)潔明了刻蟹,集中分布。
RxJava中默認(rèn)Observer和Observable都在同一線(xiàn)程執(zhí)行任務(wù)嘿辟。
2.Rxjava常用操作符
from()操作符:
接受數(shù)組或集合,返回一個(gè)按參數(shù)列表順序發(fā)射這些數(shù)據(jù)的Observable片效。
源碼:
public final static <T> Observable<T> from(Iterable<? extents T> iterable){
return create(new OnSubscribeFromIterable<T>(iterable));
}
例如:
String[] array = {"Amy","Rookie","MLXG"};
Observable.from(array)
.subscribe(new Observer<String>(){
...
});
just()操作符:
接受1-9個(gè)參數(shù)红伦,它們還可以是不同類(lèi)型,返回一個(gè)按參數(shù)列表順序發(fā)射這些數(shù)據(jù)的Observable淀衣。
例如:
Observable.just(1,2.4,"adb")
.subscribe(new Action1<String>(){
...
});
map()操作符:
把原來(lái)的Observable對(duì)象轉(zhuǎn)換成另一個(gè)Observable對(duì)象昙读,方便Observer獲得想要的數(shù)據(jù)形式,一對(duì)一
列如:
Observable.just("images/logo.png") //輸入類(lèi)型 String
.map(new Func1<String,Bitmap>(){
@Verride
public Bitmap call(String filePath){ //參數(shù)類(lèi)型 String
return getBitmapFromPath(filePath); //返回類(lèi)型 Bitmap
}
})
.subscribe(new Action1<Bitmap>(){
@Override
public void call(Bitmap bitmap){ //參數(shù)類(lèi)型 bitmap
showBitmap(bitmap)
}
});
flatMap()操作符:
返回任何它想返回的Observable對(duì)象膨桥,一對(duì)多
列如:
Student[] students = ...?;
Subscriber<Course> subscriber = new Subscriber<Course>(){
@Override
public void onNext(Course course){
...
}
};
Observable.from(students)
.flatMap(new Func1<Student,Observable<Course>>() {
@Override
public Observable<Course> call(Student student) {
return Observable.from(student.getCourses());
}
})
.subscribe(subscriber);
filter()操作符:
Func中對(duì)每項(xiàng)元素進(jìn)行過(guò)濾處理蛮浑,滿(mǎn)足條件的元素才會(huì)繼續(xù)發(fā)送,下面的過(guò)濾偶數(shù)只嚣。
列如:
Observable.just(2,3,23,54,15)
.filter(new Func1<Integer,Boolean>() {
@Override
public Boolean call(Integer integer){
return integer % 2 == 0;
}
})
.subscribe(new Observer<Integer>(){
@Override
public void onNext(Integer integer){
...
}
...
});
take()操作符:
輸出最多指定數(shù)量的結(jié)果
列如:
Observable.just(1,2,3,4)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.take(3) //只發(fā)送前三個(gè)事件
...
doOnNext()操作符:
用來(lái)在觀察者Observer.onNext()方法調(diào)用之前進(jìn)行一些初始化操作沮稚,保存/緩存網(wǎng)絡(luò)結(jié)果
例如:
Observable.just(1,2,3)
.doOnNext(new Action1<Integer>(){
@Override
public void call(Integer integer){
...
}
})
...
Merge()操作符:
合并多個(gè)Observable,按照加入的Observable順序?qū)⒏鱾€(gè)元素傳遞册舞。
例如:
Observable<Integer> obserable1 = Observable.just(2,12,34,32);
Observable<Integer> obserable2 = Observable.just(32,12,43,2);
Observable.merge(observable1,observable2)
.subscribe(new Observer<Integer>(){
...
});
zip()操作符:
將各個(gè)Observable個(gè)對(duì)應(yīng)位置各個(gè)元素取出做操作蕴掏,然后將結(jié)果傳遞。
例如:
Observable<Integer> observable1 = Observable.just(1,2,3);
Observable<Integer> observable2 = Observable.just(11,22);
Observable.zip(observable1,observable2,newFunc2<Integer,Integer,Integer>(){
@Override
public Integer call(Integer integer1,Integer integer){
return integer1+integer2;
}
})
.subscribe(new Observable<Integer>(){
...
@Override
public void onNext(Integer integer){
//out 12调鲸、24盛杰、3
}
});
3.Scheduler(調(diào)度號(hào))切換線(xiàn)程
Schedulers.immediate():
直接在當(dāng)前線(xiàn)程運(yùn)行,相當(dāng)于不指定線(xiàn)程藐石,默認(rèn)
Schedulers.newThread():
總是啟動(dòng)新線(xiàn)程即供,并在新線(xiàn)程操作
Schedulers.io():
用于IO密集型任務(wù),如異步阻塞IO操作于微,這個(gè)調(diào)度器的線(xiàn)程池會(huì)根據(jù)需要增長(zhǎng)逗嫡;
對(duì)于普通的計(jì)算任務(wù)青自,請(qǐng)使用Schedulers.computation();
Schedulers.io()默認(rèn)是一個(gè)CachedThreadScheduler,很像一個(gè)有線(xiàn)程緩存的新線(xiàn)程調(diào)度器。
Schedulers.computation():
用于計(jì)算任務(wù)祸穷,如事件循環(huán)或和回調(diào)處理性穿,不要用于IO操作(IO操作請(qǐng)使用Schedulers.io());默認(rèn)線(xiàn)程數(shù)等于處理器的數(shù)量
Schedulers.trampoline():
當(dāng)其它排隊(duì)的任務(wù)完成后,在當(dāng)前線(xiàn)程排隊(duì)開(kāi)始執(zhí)行雷滚。
SubscribeOn\ObserveOn
subscribeOn():
指定Observable(被觀察者)所在的線(xiàn)程需曾,或叫做事件產(chǎn)生的線(xiàn)程。
observeOn():
指定Observer(觀察者)所運(yùn)行在的線(xiàn)程祈远,或叫做事件消費(fèi)的線(xiàn)程呆万。
4.Fowable與Subscriber
當(dāng)被觀察者發(fā)射數(shù)據(jù)的速度大于觀察者接收處理數(shù)據(jù)的速度,造成觀察者的調(diào)度器中數(shù)據(jù)緩沖池?zé)o限堆積车份,
超出了緩沖池的最大容量谋减,導(dǎo)致OOM.
例如:
Observable.create(new ObservableOnSubscribe<String>(){
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception{
int a = 0;
while(true){
e.onNext("data:"+(i++));
}
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<String>(){
@Override
public void accept(String s) throws Exception{
Thread.sleep(2000);
print(s);
}
});
而此時(shí)Flowable的背壓策略就很好的解決這個(gè)問(wèn)題.
例如:
Flowable.create(new FlowableOnSubscribe<String>(){
@Override
public void subcribe(@NonNull FlowableEmitter<String> e) throws Exception{
int i = 0;
while(true){
e.onNext("data:"+(i++));
}
}
},BackpressureStrategy.DROP) //超出緩沖池的數(shù)據(jù)丟棄
.subecribeOn(Schedulers.computation())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<String>(){
Subscription subscription;
@Override
public void onSubsrcibe(Subscription s){
subscription = s;
subscription.request(1);
}
@Override
public void onNext(String s){
try{
Thread.sleep(2000);
}catch(InterruptedException e){
e.printStackTrace();
}
printThred(2);
subscription.request(1); //處理完了扫沼,再請(qǐng)求數(shù)據(jù)
}
...
});
該背壓策略是超出緩沖池的數(shù)據(jù)被丟棄出爹,而觀察者要求處理一個(gè) 發(fā)送我一個(gè)數(shù)據(jù)。
Backpressure的策略
a.被觀察者和觀察者在異步線(xiàn)程的情況下缎除,如果被觀察者發(fā)射事件的速度大于
觀察者接收事件的速度严就,就會(huì)產(chǎn)生Backpressure問(wèn)題。
但是同步情況下器罐,Backpressure問(wèn)題不會(huì)存在梢为。
b.Backpressure的策略?xún)H僅是調(diào)度Subscriber接收事件,并不影響Flowable
發(fā)送事件轰坊。觀察者可以根據(jù)自身實(shí)際情況按需拉取數(shù)據(jù)铸董,而不是被動(dòng)接收。
最終實(shí)現(xiàn)了上游被觀察者發(fā)送事件的速度的控制肴沫,實(shí)現(xiàn)了背壓的策略粟害。
c.Backpressure的策略有5種:ERROR,BUFFER,DROP,LATEST,MISSING
ERROR:
用來(lái)緩存觀察者處理不了暫時(shí)緩存下來(lái)的數(shù)據(jù),緩沖池的默認(rèn)大小為128樊零,即只能緩存128個(gè)事件我磁。
如果緩沖池溢出,就會(huì)立刻拋出MissingBackpressureException異常驻襟。
BUFFER:
即把默認(rèn)容器為128的緩存池成一個(gè)大的緩存池夺艰,支持很多的數(shù)據(jù),這種方式
比較消耗內(nèi)存沉衣。
DROP:
當(dāng)消費(fèi)者處理不了的時(shí)候就丟棄郁副,消費(fèi)者通過(guò)request傳入其需求n(事件個(gè)數(shù)),
然后生產(chǎn)著把n個(gè)事件傳遞給消費(fèi)者供其消費(fèi)豌习,其他消費(fèi)不掉的丟棄存谎。
LATEST:
基本和DROP一致拔疚,消費(fèi)者通過(guò)request傳入需求n,然后生產(chǎn)者把n個(gè)事
件傳遞給消費(fèi)者供其消費(fèi),其他消費(fèi)不掉的事件就丟棄既荚。
唯一區(qū)別是LATEST總能使消費(fèi)者能夠接收到生產(chǎn)者產(chǎn)生的最好一個(gè)事件稚失。
MISSING:
沒(méi)有緩沖池,接收第一個(gè)數(shù)據(jù)之后恰聘,后面的都丟棄句各。