上一篇文章RxJava 線程模型分析詳細(xì)介紹了RxJava的線程模型讹蘑,被觀察者(Observable巫橄、Flowable...)發(fā)射的數(shù)據(jù)流可以經(jīng)歷各種線程切換,但是數(shù)據(jù)流的各個(gè)元素之間不會(huì)產(chǎn)生并行執(zhí)行的效果纺棺。我們知道并行并不是并發(fā)叨恨,不是同步,更不是異步搭盾。
Java 8新增了并行流來(lái)實(shí)現(xiàn)并行的效果咳秉,只需要在集合上調(diào)用parallelStream()即可。
List<Integer> result = new ArrayList();
for(Integer i=1;i<=100;i++) {
result.add(i);
}
result.parallelStream()
.map(new java.util.function.Function<Integer, String>() {
@Override
public String apply(Integer integer) {
return integer.toString();
}
}).forEach(new java.util.function.Consumer<String>() {
@Override
public void accept(String s) {
System.out.println(s);
}
});
如果要達(dá)到類似于 Java8 的 parallel 執(zhí)行效果鸯隅,可以借助 flatMap 操作符來(lái)實(shí)現(xiàn)并行的效果澜建。
Observable.range(1,100)
.flatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Integer integer) throws Exception {
return Observable.just(integer)
.subscribeOn(Schedulers.computation())
.map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return integer.toString();
}
});
}
})
.subscribe(new Consumer<String>() {
@Override
public void accept(String str) throws Exception {
System.out.println(str);
}
});
flatMap操作符的原理是將這個(gè)Observable轉(zhuǎn)化為多個(gè)以原Observable發(fā)射的數(shù)據(jù)作為源數(shù)據(jù)的Observable,然后再將這多個(gè)Observable發(fā)射的數(shù)據(jù)整合發(fā)射出來(lái)蝌以,需要注意的是最后的順序可能會(huì)交錯(cuò)地發(fā)射出來(lái)炕舵。
flatMap會(huì)對(duì)原始Observable發(fā)射的每一項(xiàng)數(shù)據(jù)執(zhí)行變換操作。在這里跟畅,生成的每個(gè)Observable可以使用線程池(指定了computation作為Scheduler)并發(fā)的執(zhí)行咽筋。
當(dāng)然我們還可以使用ExecutorService來(lái)創(chuàng)建一個(gè)Scheduler。
int threadNum = Runtime.getRuntime().availableProcessors()+1;
ExecutorService executor = Executors.newFixedThreadPool(threadNum);
final Scheduler scheduler = Schedulers.from(executor);
Observable.range(1,100)
.flatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Integer integer) throws Exception {
return Observable.just(integer)
.subscribeOn(scheduler)
.map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return integer.toString();
}
});
}
})
.subscribe(new Consumer<String>() {
@Override
public void accept(String str) throws Exception {
System.out.println(str);
}
});
需要補(bǔ)充的是: 當(dāng)完成所有的操作之后徊件,ExecutorService需要執(zhí)行shutdown()來(lái)關(guān)閉 ExecutorService奸攻。在這里,可以使用doFinally操作符來(lái)執(zhí)行shutdown()虱痕。
doFinally操作符可以在onError或者onComplete之后調(diào)用指定的操作睹耐,或由下游處理。
增加了doFinally操作符之后部翘,代碼是這樣的硝训。
int threadNum = Runtime.getRuntime().availableProcessors()+1;
final ExecutorService executor = Executors.newFixedThreadPool(threadNum);
final Scheduler scheduler = Schedulers.from(executor);
Observable.range(1,100)
.flatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Integer integer) throws Exception {
return Observable.just(integer)
.subscribeOn(scheduler)
.map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return integer.toString();
}
});
}
})
.doFinally(new Action() {
@Override
public void run() throws Exception {
executor.shutdown();
}
})
.subscribe(new Consumer<String>() {
@Override
public void accept(String str) throws Exception {
System.out.println(str);
}
});
Round-Robin 算法實(shí)現(xiàn)并行
Round-Robin算法是最簡(jiǎn)單的一種負(fù)載均衡算法。它的原理是把來(lái)自用戶的請(qǐng)求輪流分配給內(nèi)部的服務(wù)器:從服務(wù)器1開始略就,直到服務(wù)器N捎迫,然后重新開始循環(huán)。也被稱為哈希取模法表牢,在實(shí)際中是非常常用的數(shù)據(jù)分片方法窄绒。Round-Robin算法的優(yōu)點(diǎn)是其簡(jiǎn)潔性,它無(wú)需記錄當(dāng)前所有連接的狀態(tài)崔兴,所以它是一種無(wú)狀態(tài)調(diào)度彰导。
通過(guò) Round-Robin 算法把數(shù)據(jù)分組, 按線程數(shù)分組蛔翅,分成5組每組個(gè)數(shù)相同,一起發(fā)送處理位谋。這樣做的目的可以減少Observable的創(chuàng)建節(jié)省系統(tǒng)資源山析,但是會(huì)增加處理時(shí)間,Round-Robin 算法可以看成是對(duì)時(shí)間和空間的綜合考慮掏父。
final AtomicInteger batch = new AtomicInteger(0);
Observable.range(1,100)
.groupBy(new Function<Integer, Integer>() {
@Override
public Integer apply(@NonNull Integer integer) throws Exception {
return batch.getAndIncrement() % 5;
}
})
.flatMap(new Function<GroupedObservable<Integer, Integer>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(@NonNull GroupedObservable<Integer, Integer> integerIntegerGroupedObservable) throws Exception {
return integerIntegerGroupedObservable.observeOn(Schedulers.io())
.map(new Function<Integer, String>() {
@Override
public String apply(@NonNull Integer integer) throws Exception {
return integer.toString();
}
});
}
})
.subscribe(new Consumer<Object>() {
@Override
public void accept(@NonNull Object o) throws Exception {
System.out.println(o);
}
});
在這里笋轨,也可以使用ExecutorService創(chuàng)建Scheduler,來(lái)替代Schedulers.io()
final AtomicInteger batch = new AtomicInteger(0);
int threadNum = 5;
final ExecutorService executor = Executors.newFixedThreadPool(threadNum);
final Scheduler scheduler = Schedulers.from(executor);
Observable.range(1,100)
.groupBy(new Function<Integer, Integer>() {
@Override
public Integer apply(@NonNull Integer integer) throws Exception {
return batch.getAndIncrement() % threadNum;
}
})
.flatMap(new Function<GroupedObservable<Integer, Integer>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(@NonNull GroupedObservable<Integer, Integer> integerIntegerGroupedObservable) throws Exception {
return integerIntegerGroupedObservable.observeOn(scheduler)
.map(new Function<Integer, String>() {
@Override
public String apply(@NonNull Integer integer) throws Exception {
return integer.toString();
}
});
}
})
.subscribe(new Consumer<Object>() {
@Override
public void accept(@NonNull Object o) throws Exception {
System.out.println(o);
}
});