簡介
- 本文主要是講基于rxjava包裝的一個針對集合做并發(fā)操作的工具類
- rxjava文檔地址
依賴
<dependency>
<groupId>io.reactivex</groupId>
<artifactId>rxjava</artifactId>
<version></version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version></version>
</dependency>
支持四類function
- 傳入數(shù)據(jù)List,返回處理之后的List(不保證順序)
- 傳入數(shù)據(jù)List,返回處理之后的數(shù)據(jù)Map
- 傳入數(shù)據(jù)List,分組大小嚣伐,返回處理之后的List(不保證順序)
- 傳入數(shù)據(jù)List,分組大小入热,返回處理之后的數(shù)據(jù)Map
function接口
public interface List2List<I,R> {
List<R> call(List<I> list);
}
public interface List2Map<I, K, V>{
Map<K, V> call(List<I> list);
}
public interface Object2List<I, R> {
List<R> call(I i);
}
public interface Object2Map<I, K, V> {
Map<K, V> call(I i);
}
線程池及工廠方法
import rx.Observable;
import rx.Subscriber;
import rx.functions.Func0;
import rx.schedulers.Schedulers;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import static java.util.concurrent.TimeUnit.SECONDS;
/**
* 功能描述:
* <p>
* </p>
*
* @author : yuanchao.he
* @version 1.0 2016-03-22
* @since mobile-oppkit-server 1.0
*/
public class ObservableHelper {
private static ThreadPoolExecutor executor = new ThreadPoolExecutor(coreSize, maxSize, 5, SECONDS,
new LinkedBlockingQueue<Runnable>());
private ObservableHelper(){}
private static class ClassHolder{
private static ObservableHelper observableHelper = new ObservableHelper();
}
public static ObservableHelper INSTANCE(){
return ClassHolder.observableHelper;
}
public <T> Observable<T> createObservable(final Func0<T> func) {
return Observable.create(new Observable.OnSubscribe<T>() {
@Override
public void call(Subscriber<? super T> subscriber) {
try {
subscriber.onNext(func.call());
subscriber.onCompleted();
} catch (Exception e) {
subscriber.onError(e);
}
}
}).subscribeOn(Schedulers.from(executor)).cache();
}
}
ObservableExecutor 并行處理器
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import rx.Observable;
import rx.functions.Func0;
import rx.functions.Func1;
import rx.functions.Func2;
import java.util.Collections;
import java.util.List;
import java.util.Map;
/**
* 功能描述:
* <p>
* </p>
*
* @author : yuanchao.he
* @version 1.0 2016-03-22
* @since mobile-oppkit-server 1.0
*/
public class ObservableExecutor {
private ObservableHelper helper = ObservableHelper.INSTANCE();
private static class ExecutorHolder {
private static ObservableExecutor executor = new ObservableExecutor();
}
private ObservableExecutor(){}
public static ObservableExecutor INSTANCE() {
return ExecutorHolder.executor;
}
/**
* 傳入List, 對List中的每一項分配一條線程做處理,返回Map結(jié)構(gòu)帜慢,自己需要實現(xiàn)Object2Map接口
* @param input
* @param func
* @param <I>
* @param <K>
* @param <V>
* @return
*/
public <I, K, V> Map<K, V> executeObservable(List<I> input, final Object2Map<I, K, V> func) {
if(input==null || input.size()==0){
return Collections.emptyMap();
}
Map<K, V> resultMap = Maps.newHashMap();
Observable.from(input).flatMap(new Func1<I, Observable<Map<K, V>>>() {
@Override
public Observable<Map<K, V>> call(final I i) {
return helper.createObservable(new Func0<Map<K, V>>() {
@Override
public Map<K, V> call() {
return func.call(i);
}
}).onErrorReturn(new Func1<Throwable, Map<K, V>>() {
@Override
public Map<K, V> call(Throwable throwable) {
return Collections.emptyMap();
}
});
}
}).reduce(resultMap, new Func2<Map<K, V>, Map<K, V>, Map<K, V>>() {
@Override
public Map<K, V> call(Map<K, V> acc, Map<K, V> map) {
acc.putAll(map);
return acc;
}
}).toBlocking().first();
return resultMap;
}
/**
* 傳入List,針對List中每一項分配一條線程做處理坏快,返回List結(jié)構(gòu)铅檩,自己實現(xiàn)Object2List接口
* @param input
* @param func
* @param <I>
* @param <R>
* @return
*/
public <I, R> List<R> executeObservable(List<I> input, final Object2List<I, R> func) {
if(input==null || input.size()==0){
return Collections.emptyList();
}
List<R> result = Lists.newLinkedList();
Observable.from(input).flatMap(new Func1<I, Observable<List<R>>>() {
@Override
public Observable<List<R>> call(final I i) {
return helper.createObservable(new Func0<List<R>>() {
@Override
public List<R> call() {
return func.call(i);
}
}).onErrorReturn(new Func1<Throwable, List<R>>() {
@Override
public List<R> call(Throwable throwable) {
return Collections.emptyList();
}
});
}
}).reduce(result, new Func2<List<R>, List<R>, List<R>>() {
@Override
public List<R> call(List<R> acc, List<R> list) {
acc.addAll(list);
return acc;
}
}).toBlocking().first();
return result;
}
/**
* 傳入List數(shù)據(jù),分組大小莽鸿,對List數(shù)據(jù)做分組以后昧旨,為每一組分配一個線程做處理,返回Map結(jié)構(gòu)富拗,自己實現(xiàn)List2Map接口
* @param input
* @param partitionSize
* @param functionRMap
* @param <I>
* @param <K>
* @param <V>
* @return
*/
public <I, K, V> Map<K, V> executeObservable(List<I> input, int partitionSize,
final List2Map<I, K, V> functionRMap) {
if(input==null || input.size()==0){
return Collections.emptyMap();
}
if (partitionSize <= 0)
partitionSize = 10;
List<List<I>> lists = Lists.partition(input, partitionSize);
Map<K, V> resultMap = Maps.newHashMap();
Observable.from(lists).flatMap(new Func1<List<I>, Observable<Map<K, V>>>() {
@Override
public Observable<Map<K, V>> call(final List<I> list) {
return helper.createObservable(new Func0<Map<K, V>>() {
@Override
public Map<K, V> call() {
return functionRMap.call(list);
}
}).onErrorReturn(new Func1<Throwable, Map<K, V>>() {
@Override
public Map<K, V> call(Throwable throwable) {
return Collections.emptyMap();
}
});
}
}).reduce(resultMap, new Func2<Map<K, V>, Map<K, V>, Map<K, V>>() {
@Override
public Map<K, V> call(Map<K, V> acc, Map<K, V> map) {
acc.putAll(map);
return acc;
}
}).toBlocking().first();
return resultMap;
}
/**
* 傳入List數(shù)據(jù)臼予,分組大小,為每一組數(shù)據(jù)分配一條線程處理啃沪,返回List結(jié)構(gòu)數(shù)據(jù)粘拾,自己實現(xiàn)List2List接口
* @param input
* @param partitionSize
* @param func
* @param <I>
* @param <R>
* @return
*/
public <I, R> List<R> executeObservable(List<I> input, int partitionSize,
final List2List<I, R> func) {
if(input==null || input.size()==0){
return Collections.emptyList();
}
List<R> result = Lists.newLinkedList();
if (partitionSize <= 0)
partitionSize = 10;
List<List<I>> partitions = Lists.partition(input, partitionSize);
Observable.from(partitions).flatMap(new Func1<List<I>, Observable<List<R>>>() {
@Override public Observable<List<R>> call(final List<I> is) {
return helper.createObservable(new Func0<List<R>>() {
@Override public List<R> call() {
return func.call(is);
}
}).onErrorReturn(new Func1<Throwable, List<R>>() {
@Override public List<R> call(Throwable throwable) {
return Collections.emptyList();
}
});
}
}).reduce(result, new Func2<List<R>, List<R>, List<R>>() {
@Override public List<R> call(List<R> acc, List<R> list) {
acc.addAll(list);
return acc;
}
}).toBlocking().first();
return result;
}
}
demos
import com.google.common.base.Function;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.util.List;
import java.util.Map;
/**
* 功能描述:
* <p>
* </p>
*
* @author : yuanchao.he
* @version 1.0 2016-03-23
* @since mobile-oppkit-server 1.0
*/
public class Demo {
private static ObservableExecutor observableExecutor = ObservableExecutor.INSTANCE();
public static void main(String[] args) {
List<Integer> numbers = Lists.newLinkedList();
for(int i=500;i<=1000;i++){
numbers.add(i);
}
Map<Integer, String> result1 = observableExecutor.executeObservable(numbers,
new Object2Map<Integer, Integer, String>() {
@Override
public Map<Integer, String> call(Integer integer) {
Map<Integer, String> map = Maps.newHashMap();
map.put(integer, String.valueOf(integer*integer%1000));
return map;
}
});
System.out.println(result1);
/**
* 將數(shù)字轉(zhuǎn)換為字符串,每個線程處理一個數(shù)字轉(zhuǎn)換创千,并以list的結(jié)構(gòu)返回
*/
List<String> result2 = observableExecutor.executeObservable(numbers,
new Object2List<Integer, String>() {
@Override
public List<String> call(Integer integer) {
List<String> result = Lists.newLinkedList();
result.add(String.valueOf(integer*integer%1000));
return result;
}
});
System.out.println(result2);
/**
* 將數(shù)字轉(zhuǎn)換為字符串缰雇,對 numbers 分組,每組2個元素追驴,每個線程處理一組械哟,并以map結(jié)構(gòu)返回
*/
Map<Integer, String> result3 = observableExecutor.executeObservable(numbers, 2,
new List2Map<Integer, Integer, String>() {
@Override
public Map<Integer, String> call(List<Integer> list) {
Map<Integer, String> map = Maps.newHashMap();
for (Integer integer : list) {
map.put(integer, String.valueOf(integer*integer%1000));
}
return map;
}
});
System.out.println(result3);
/**
* 將數(shù)字轉(zhuǎn)換為字符串 對 numbers 分組,每組2個元素殿雪,每個線程處理一組暇咆,并以list結(jié)構(gòu)返回
*/
List<String> result4 = observableExecutor.executeObservable(numbers, 2,
new List2List<Integer, String>() {
@Override
public List<String> call(List<Integer> list) {
List<String> result = Lists.newLinkedList(Lists.transform(list,
new Function<Integer, String>() {
@Override
public String apply(Integer input) {
return String.valueOf(input*input%1000);
}
}));
return result;
}
});
System.out.println(result4);
}
}