3.組合
(1) Merge
merge(Observable, Observable)將兩個(gè)Observable發(fā)射的事件序列組合并成一個(gè)事件序列,就像是一個(gè)Observable發(fā)射的一樣摊趾。你可以簡單的將它理解為兩個(gè)Obsrvable合并成了一個(gè)Observable慎王,合并后的數(shù)據(jù)是無序的。
String[] array1 = {"kpioneer","Tiger","Cook","Zhang","Haocai"};
String[] array2 = {"WangWu","Zhangsan","Lisi","Luo"};
Observable<String> merge = Observable.merge(Observable.from(array1),Observable.from(array2));
merge.subscribe(new Observer<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
Log.e("main","輸出結(jié)果:"+s);
}
});
結(jié)果輸出:
08-08 08:50:12.517 3958-3958/com.haocai.architect.rxjava E/main: 輸出結(jié)果:kpioneer
08-08 08:50:12.517 3958-3958/com.haocai.architect.rxjava E/main: 輸出結(jié)果:Tiger
08-08 08:50:12.517 3958-3958/com.haocai.architect.rxjava E/main: 輸出結(jié)果:Cook
08-08 08:50:12.517 3958-3958/com.haocai.architect.rxjava E/main: 輸出結(jié)果:Zhang
08-08 08:50:12.517 3958-3958/com.haocai.architect.rxjava E/main: 輸出結(jié)果:Haocai
08-08 08:50:12.517 3958-3958/com.haocai.architect.rxjava E/main: 輸出結(jié)果:WangWu
08-08 08:50:12.517 3958-3958/com.haocai.architect.rxjava E/main: 輸出結(jié)果:Zhangsan
08-08 08:50:12.517 3958-3958/com.haocai.architect.rxjava E/main: 輸出結(jié)果:Lisi
08-08 08:50:12.518 3958-3958/com.haocai.architect.rxjava E/main: 輸出結(jié)果:Luo
在Observable中被廓,一旦某一個(gè)事件拋異常,后面的序列將會(huì)終止
注意:mergeDelayError如果你希望在序列中出錯(cuò)的時(shí)候,不影響后面的序列兽狭,那么可以使用mergeDelayError方法
(2) Zip
zip(Observable, Observable, Func2)用來合并兩個(gè)Observable發(fā)射的數(shù)據(jù)項(xiàng),根據(jù)Func2函數(shù)生成一個(gè)新的值并發(fā)射出去。當(dāng)其中一個(gè)Observable發(fā)送數(shù)據(jù)結(jié)束或者出現(xiàn)異常后箕慧,另一個(gè)Observable也將停在發(fā)射數(shù)據(jù)服球。
簡單來說zip操作符就是合并多個(gè)數(shù)據(jù)流,
然后發(fā)送(Emit)最終合并的數(shù)據(jù)颠焦。
流程圖:
String[] array1 = {"kpioneer","Tiger","Cook","Zhang","Haocai"};
String[] array2 = {"WangWu","Zhangsan","Lisi","Luo"};
//1.合并算法自己決定
//2.序列長度有原始的最小數(shù)組的長度決定
Observable<String> zip = Observable.zip(Observable.from(array1),Observable.from(array2), new Func2<String, String, String>() {
@Override
public String call(String s, String s2) {
//這個(gè)Func是我們合并數(shù)組算法
return s+"--"+s2;
}
});
zip.subscribe(new Observer<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
Log.e("main","輸出結(jié)果:"+s);
}
});
結(jié)果輸出:
08-08 09:32:46.856 10134-10134/com.haocai.architect.rxjava E/main: 輸出結(jié)果:kpioneer--WangWu
08-08 09:32:46.856 10134-10134/com.haocai.architect.rxjava E/main: 輸出結(jié)果:Tiger--Zhangsan
08-08 09:32:46.856 10134-10134/com.haocai.architect.rxjava E/main: 輸出結(jié)果:Cook--Lisi
08-08 09:32:46.856 10134-10134/com.haocai.architect.rxjava E/main: 輸出結(jié)果:Zhang--Luo
同時(shí)發(fā)現(xiàn)Haocai 那項(xiàng)沒有輸出 以最小數(shù)組為單位
String[] array1 = {"kpioneer","Tiger","Cook","Zhang","Haocai"};
String[] array2 = {"WangWu","Zhangsan","Lisi","Luo"};
Integer[] array3 = { 100, 1000, 10000 };
Observable<String> zip = Observable.zip(Observable.from(array1),Observable.from(array2),Observable.from(array3), new Func3<String, String,Integer, String>() {
@Override
public String call(String s, String s2, Integer integer) {
return s+"---"+s2+"---"+integer;
}
});
zip.subscribe(new Observer<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
Log.e("main","輸出結(jié)果:"+s);
}
});
結(jié)果輸出:
08-08 10:02:06.519 4330-4330/com.haocai.architect.rxjava E/main: 輸出結(jié)果:kpioneer---WangWu---100
08-08 10:02:06.519 4330-4330/com.haocai.architect.rxjava E/main: 輸出結(jié)果:Tiger---Zhangsan---1000
08-08 10:02:06.520 4330-4330/com.haocai.architect.rxjava E/main: 輸出結(jié)果:Cook---Lisi---10000
(3) Join
前面兩個(gè)方法斩熊,zip()和merge()方法作用在發(fā)射數(shù)據(jù)的范疇內(nèi),在決定如何操作值之前有些場(chǎng)景我們需要考慮時(shí)間的伐庭。RxJava的join()函數(shù)基于時(shí)間窗口將兩個(gè)Observables發(fā)射的數(shù)據(jù)結(jié)合在一起粉渠。
join(Observable, Func1, Func1, Func2)我們先介紹下join操作符的4個(gè)參數(shù):
Observable:源Observable需要組合的Observable,這里我們姑且稱之為目標(biāo)Observable;
Func1:接收從源Observable發(fā)射來的數(shù)據(jù)圾另,并返回一個(gè)Observable霸株,這個(gè)Observable的聲明周期決定了源Obsrvable發(fā)射出來的數(shù)據(jù)的有效期;
Func1:接收目標(biāo)Observable發(fā)射來的數(shù)據(jù)集乔,并返回一個(gè)Observable去件,這個(gè)Observable的聲明周期決定了目標(biāo)Obsrvable發(fā)射出來的數(shù)據(jù)的有效期;
Func2:接收從源Observable和目標(biāo)Observable發(fā)射出來的數(shù)據(jù)扰路,并將這兩個(gè)數(shù)據(jù)組合后返回尤溜。
所以Join操作符的語法結(jié)構(gòu)大致是這樣的:onservableA.join(observableB, 控制observableA發(fā)射數(shù)據(jù)有效期的函數(shù), 控制observableB發(fā)射數(shù)據(jù)有效期的函數(shù)汗唱,兩個(gè)observable發(fā)射數(shù)據(jù)的合并規(guī)則)
String[] array1 = {"A","B","C"};
String[] array2 = {"1","2"};
Observable<String> observable1 = Observable.from(array1);
Observable<String> observable2 = Observable.from(array2);
Observable<String> join = observable1.join(observable2, new Func1<String, Observable<Long>>() {
@Override
public Observable<Long> call(String s) {
return Observable.timer(2, TimeUnit.SECONDS);
}
}, new Func1<String, Observable<Long>>() {
@Override
public Observable<Long> call(String s) {
return Observable.timer(2,TimeUnit.SECONDS);
}
}, new Func2<String,String,String>() {
@Override
public String call(String o, String o2) {
return o +"---" +o2;
}
});
join.subscribe(new Observer<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
Log.e("main","輸出結(jié)果:"+s);
}
});
結(jié)果輸出:
08-08 12:21:13.407 32015-32015/com.haocai.architect.rxjava E/main: 輸出結(jié)果:A---1
08-08 12:21:13.407 32015-32015/com.haocai.architect.rxjava E/main: 輸出結(jié)果:B---1
08-08 12:21:13.408 32015-32015/com.haocai.architect.rxjava E/main: 輸出結(jié)果:C---1
08-08 12:21:13.408 32015-32015/com.haocai.architect.rxjava E/main: 輸出結(jié)果:A---2
08-08 12:21:13.408 32015-32015/com.haocai.architect.rxjava E/main: 輸出結(jié)果:B---2
08-08 12:21:13.408 32015-32015/com.haocai.architect.rxjava E/main: 輸出結(jié)果:C---2
4.線程
(1)Scheduler
默認(rèn)情況下宫莱,RxJava遵循線程不變?cè)瓌t。即:在哪個(gè)線程調(diào)用subscribe()方法哩罪,就在哪個(gè)線程生產(chǎn)事件授霸,在哪個(gè)線程生產(chǎn)事件,就在哪個(gè)線程消費(fèi)事件际插。如果需要切換線程绝葡,就需要用到Scheduler(調(diào)度器)。
在RxJava中腹鹉,Scheduler相當(dāng)于線程控制器藏畅,RxJava通過它來指定每一段代碼運(yùn)行在什么線程中。RxJava內(nèi)置了幾個(gè)Scheduler功咒,適合大多數(shù)使用場(chǎng)景:
Schedulers.immediate(): 直接在當(dāng)前線程運(yùn)行愉阎,相當(dāng)于不指定線程。這是默認(rèn)的Scheduler力奋。
Schedulers.newThread(): 總是啟用新線程榜旦,并在新線程執(zhí)行操作。
Schedulers.io(): I/O 操作(讀寫文件景殷、讀寫數(shù)據(jù)庫溅呢、網(wǎng)絡(luò)信息交互等)所使用的Scheduler澡屡。行為模式和newThread()差不多,區(qū)別在于io()的內(nèi)部實(shí)現(xiàn)是是用一個(gè)無數(shù)量上限的線程池咐旧,可以重用空閑的線程驶鹉,因此多數(shù)情況下io()比newThread()更有效率。不要把計(jì)算工作放在io()中铣墨,可以避免創(chuàng)建不必要的線程室埋。
Schedulers.computation(): 計(jì)算所使用的Scheduler。這個(gè)計(jì)算指的是 CPU 密集型計(jì)算伊约,即不會(huì)被 I/O 等操作限制性能的操作姚淆,例如圖形的計(jì)算。這個(gè)Scheduler使用的固定的線程池屡律,大小為 CPU 核數(shù)腌逢。不要把 I/O 操作放在computation()中,否則 I/O 操作的等待時(shí)間會(huì)浪費(fèi) CPU超埋。
Android 專用的AndroidSchedulers.mainThread()上忍,它指定的操作將在 Android 主線程運(yùn)行。
有了這幾個(gè) Scheduler 纳本,就可以使用 subscribeOn()和 observeOn()兩個(gè)方法來對(duì)線程進(jìn)行控制了。
subscribeOn(): 指定subscribe()(訂閱/注冊(cè))所發(fā)生的線程腋颠,即 Observable.OnSubscribe被激活時(shí)所處的線程繁成。或者叫做事件產(chǎn)生的線程淑玫。
observeOn(): 指定Subscriber(觀察者)所運(yùn)行在的線程巾腕。或者叫做事件消費(fèi)的線程絮蒿。
文字?jǐn)⑹隹倸w難理解尊搬,上代碼:
Observable.just(1, 2, 3, 4)
.subscribeOn(Schedulers.io()) // 指定 subscribe() 發(fā)生在 IO 線程
.observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回調(diào)發(fā)生在主線程(即 Action1對(duì)象)
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer number) {
Log.d(tag, "number:" + number);
}
});
上面這段代碼中,由于 subscribeOn(Schedulers.io())的指定土涝,被創(chuàng)建的事件的內(nèi)容 1佛寿、2、3但壮、4將會(huì)在 IO 線程發(fā)出冀泻;而由于observeOn(AndroidScheculers.mainThread()) 的指定,因此 subscriber數(shù)字的打印將發(fā)生在主線程 蜡饵。事實(shí)上弹渔,這種在subscribe()之前寫上兩句 subscribeOn(Scheduler.io())和 observeOn(AndroidSchedulers.mainThread())的使用方式非常常見,它適用于多數(shù)的 『后臺(tái)線程取數(shù)據(jù)溯祸,主線程顯示』的程序策略肢专。
已加載圖片為例:
加載圖片傳統(tǒng)方式寫法:
private ImageView iv_image;
private static String URL_STR ="http://pic36.nipic.com/20131203/3822951_101052690000_2.jpg";
@Override
protected void onCreate(@Nullable Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_scheduler);
iv_image = (ImageView)findViewById(R.id.iv_image);
loadImage();
}
//傳統(tǒng)方式:下載圖片
//方案一: Thread+Handler
//方案二: AsyncTask
private Bitmap download(String urlString) {
try {
URL url = new URL(urlString);
HttpURLConnection connection = (HttpURLConnection) url
.openConnection();
InputStream inputStream = connection.getInputStream();
return BitmapFactory.decodeStream(inputStream);
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
private void loadImage(){
new DownloadTask().execute();
}
class DownloadTask extends AsyncTask<Void,Void,Bitmap>{
@Override
protected Bitmap doInBackground(Void... params) {
return download(URL_STR);
}
@Override
protected void onPostExecute(Bitmap bitmap) {
super.onPostExecute(bitmap);
iv_image.setImageBitmap(bitmap);
}
}
RxJava方式寫法:
private static String URL_STR = "http://pic36.nipic.com/20131203/3822951_101052690000_2.jpg";
private ImageView iv_image;
@Override
protected void onCreate(@Nullable Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_scheduler);
iv_image = (ImageView) findViewById(R.id.iv_image);
rxJavaLoadImage();
}
//RxJava實(shí)現(xiàn)下載
private void rxJavaLoadImage() {
//Url地址變換成Bitmap
//我們需要指定這些事件的執(zhí)行所在的線程
Observable.just(URL_STR).map(new Func1<String, Bitmap>() {
@Override
public Bitmap call(String s) {
return download(s);
}
}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())//需要引入RxAndroid庫
.subscribe(new Action1<Bitmap>() {
@Override
public void call(Bitmap bitmap) {
//更新UI
iv_image.setImageBitmap(bitmap);
}
});
}
Schedulers.io(): I/O 操作(讀寫文件舞肆、讀寫數(shù)據(jù)庫、網(wǎng)絡(luò)信息交互等)所使用的Scheduler博杖。
注意:
1.在RxJava中整個(gè)流程分為事件的生產(chǎn)和消費(fèi)
2.RxJava整體架構(gòu)分為4個(gè)角色:Observable椿胯、Observer、Subscriber欧募、Subjects
其中Observables和Subjects是兩個(gè)“生產(chǎn)”實(shí)體压状,Observers和Subscribers是兩個(gè)“消費(fèi)”實(shí)體
subscribeOn和observeOn線程控制的區(qū)別?--線程切換
1.subscribeOn:指定生產(chǎn)事件所在的線程
observeOn :指定消費(fèi)事件所在的線程
2.subscribeOn:按照順序執(zhí)行序列跟继,作用于他前后的序列种冬,直到遇到observeOn才切換新的線程
onserveOn :按照順序執(zhí)行序列,只能作用于他之后的序列
注意:subscribeOn可以在序列中調(diào)用(執(zhí)行)多次舔糖,但是前提條件(在生產(chǎn)序列之前調(diào)用娱两,在訪問網(wǎng)絡(luò)之前,我們需要初始化一些UI)
例子證明:
//RxJava實(shí)現(xiàn)下載
private void rxJavaLoadImage() {
//Url地址變換成Bitmap
//我們需要指定這些事件的執(zhí)行所在的線程
Observable.just(URL_STR).doOnSubscribe(new Action0() {
@Override
public void call() {
// 在執(zhí)行生產(chǎn)事件之前金吗,回調(diào)方法十兢,我們需要做一些初始化工作,而這個(gè)工作可以在子線程摇庙,也可以在主線程
// 更新UI必須在主線程
iv_image.setVisibility(View.VISIBLE);
// 第一步:初始化
Log.e("main", "當(dāng)前線程狀態(tài) : " + Thread.currentThread().getName());
}
}).subscribeOn(AndroidSchedulers.mainThread()).observeOn(Schedulers.io()).map(new Func1<String, Bitmap>() {
@Override
public Bitmap call(String s) {
Log.e("main", "當(dāng)前線程狀態(tài) : " + Thread.currentThread().getName());
return download(s);
}
}).map(new Func1<Bitmap, Bitmap>() {
@Override
public Bitmap call(Bitmap t) {
Log.e("main", "當(dāng)前線程狀態(tài) : " + Thread.currentThread().getName());
// 加水印旱物、裁剪、灰度處理等等(圖像處理相關(guān))......
// 第三步:圖像處理
return t;
}
}).observeOn(AndroidSchedulers.mainThread())//需要引入RxAndroid庫
.subscribe(new Action1<Bitmap>() {
@Override
public void call(Bitmap bitmap) {
Log.e("main", "當(dāng)前線程狀態(tài) : " + Thread.currentThread().getName());
//更新UI
iv_image.setImageBitmap(bitmap);
}
});
}
結(jié)果輸出:
08-09 06:53:29.852 17616-17616/com.haocai.architect.rxjava E/main: 當(dāng)前線程狀態(tài) : main
08-09 06:53:29.854 17616-17647/com.haocai.architect.rxjava E/main: 當(dāng)前線程狀態(tài) : RxIoScheduler-2
08-09 06:53:29.983 17616-17647/com.haocai.architect.rxjava E/main: 當(dāng)前線程狀態(tài) : RxIoScheduler-2
08-09 06:53:30.072 17616-17616/com.haocai.architect.rxjava E/main: 當(dāng)前線程狀態(tài) : main
#其中main表示主線程卫袒, RxIoScheduler表示子線程