基于rxjava的集合并發(fā)

簡介

  • 本文主要是講基于rxjava包裝的一個針對集合做并發(fā)操作的工具類
  • rxjava文檔地址

依賴

<dependency>
    <groupId>io.reactivex</groupId>
    <artifactId>rxjava</artifactId>
    <version></version>
</dependency>
<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version></version>
</dependency>

支持四類function

  • 傳入數(shù)據(jù)List,返回處理之后的List(不保證順序)
  • 傳入數(shù)據(jù)List,返回處理之后的數(shù)據(jù)Map
  • 傳入數(shù)據(jù)List,分組大小嚣伐,返回處理之后的List(不保證順序)
  • 傳入數(shù)據(jù)List,分組大小入热,返回處理之后的數(shù)據(jù)Map

function接口

public interface List2List<I,R> {
    List<R> call(List<I> list);
}

public interface List2Map<I, K, V>{
    Map<K, V> call(List<I> list);
}

public interface Object2List<I, R> {
    List<R> call(I i);
}

public interface Object2Map<I, K, V> {
    Map<K, V> call(I i);
}

線程池及工廠方法

import rx.Observable;
import rx.Subscriber;
import rx.functions.Func0;
import rx.schedulers.Schedulers;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import static java.util.concurrent.TimeUnit.SECONDS;
/**
 * 功能描述:
 * <p>
 * </p>
 *
 * @author : yuanchao.he
 * @version 1.0 2016-03-22
 * @since mobile-oppkit-server 1.0
 */
public class ObservableHelper {

    private static ThreadPoolExecutor executor = new ThreadPoolExecutor(coreSize, maxSize, 5, SECONDS,
            new LinkedBlockingQueue<Runnable>());

    private ObservableHelper(){}

    private static class ClassHolder{
        private static ObservableHelper observableHelper = new ObservableHelper();
    }

    public static ObservableHelper INSTANCE(){
        return ClassHolder.observableHelper;
    }

    public <T> Observable<T> createObservable(final Func0<T> func) {
        return Observable.create(new Observable.OnSubscribe<T>() {
            @Override
            public void call(Subscriber<? super T> subscriber) {
                try {
                    subscriber.onNext(func.call());
                    subscriber.onCompleted();
                } catch (Exception e) {
                    subscriber.onError(e);
                }
            }
        }).subscribeOn(Schedulers.from(executor)).cache();
    }
}

ObservableExecutor 并行處理器

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import rx.Observable;
import rx.functions.Func0;
import rx.functions.Func1;
import rx.functions.Func2;
import java.util.Collections;
import java.util.List;
import java.util.Map;

/**
 * 功能描述:
 * <p>
 * </p>
 *
 * @author : yuanchao.he
 * @version 1.0 2016-03-22
 * @since mobile-oppkit-server 1.0
 */
public class ObservableExecutor {

    private ObservableHelper helper = ObservableHelper.INSTANCE();

    private static class ExecutorHolder {
        private static ObservableExecutor executor = new ObservableExecutor();
    }

    private ObservableExecutor(){}

    public static ObservableExecutor INSTANCE() {
        return ExecutorHolder.executor;
    }

    /**
     * 傳入List, 對List中的每一項分配一條線程做處理,返回Map結(jié)構(gòu)帜慢,自己需要實現(xiàn)Object2Map接口
     * @param input
     * @param func
     * @param <I>
     * @param <K>
     * @param <V>
     * @return
     */
    public <I, K, V> Map<K, V> executeObservable(List<I> input, final Object2Map<I, K, V> func) {
        if(input==null || input.size()==0){
            return Collections.emptyMap();
        }
        Map<K, V> resultMap = Maps.newHashMap();
        Observable.from(input).flatMap(new Func1<I, Observable<Map<K, V>>>() {
            @Override
            public Observable<Map<K, V>> call(final I i) {
                return helper.createObservable(new Func0<Map<K, V>>() {
                    @Override
                    public Map<K, V> call() {
                        return func.call(i);
                    }
                }).onErrorReturn(new Func1<Throwable, Map<K, V>>() {
                    @Override
                    public Map<K, V> call(Throwable throwable) {
                        return Collections.emptyMap();
                    }
                });
            }
        }).reduce(resultMap, new Func2<Map<K, V>, Map<K, V>, Map<K, V>>() {
            @Override
            public Map<K, V> call(Map<K, V> acc, Map<K, V> map) {
                acc.putAll(map);
                return acc;
            }
        }).toBlocking().first();
        return resultMap;
    }

    /**
     * 傳入List,針對List中每一項分配一條線程做處理坏快,返回List結(jié)構(gòu)铅檩,自己實現(xiàn)Object2List接口
     * @param input
     * @param func
     * @param <I>
     * @param <R>
     * @return
     */
    public <I, R> List<R> executeObservable(List<I> input, final Object2List<I, R> func) {
        if(input==null || input.size()==0){
            return Collections.emptyList();
        }
        List<R> result = Lists.newLinkedList();
        Observable.from(input).flatMap(new Func1<I, Observable<List<R>>>() {
            @Override
            public Observable<List<R>> call(final I i) {
                return helper.createObservable(new Func0<List<R>>() {
                    @Override
                    public List<R> call() {
                        return func.call(i);
                    }
                }).onErrorReturn(new Func1<Throwable, List<R>>() {
                    @Override
                    public List<R> call(Throwable throwable) {
                        return Collections.emptyList();
                    }
                });
            }
        }).reduce(result, new Func2<List<R>, List<R>, List<R>>() {
            @Override
            public List<R> call(List<R> acc, List<R> list) {
                acc.addAll(list);
                return acc;
            }
        }).toBlocking().first();
        return result;
    }

    /**
     * 傳入List數(shù)據(jù),分組大小莽鸿,對List數(shù)據(jù)做分組以后昧旨,為每一組分配一個線程做處理,返回Map結(jié)構(gòu)富拗,自己實現(xiàn)List2Map接口
     * @param input
     * @param partitionSize
     * @param functionRMap
     * @param <I>
     * @param <K>
     * @param <V>
     * @return
     */
    public <I, K, V> Map<K, V> executeObservable(List<I> input, int partitionSize,
            final List2Map<I, K, V> functionRMap) {
        if(input==null || input.size()==0){
            return Collections.emptyMap();
        }
        if (partitionSize <= 0)
            partitionSize = 10;
        List<List<I>> lists = Lists.partition(input, partitionSize);
        Map<K, V> resultMap = Maps.newHashMap();
        Observable.from(lists).flatMap(new Func1<List<I>, Observable<Map<K, V>>>() {
            @Override
            public Observable<Map<K, V>> call(final List<I> list) {
                return helper.createObservable(new Func0<Map<K, V>>() {
                    @Override
                    public Map<K, V> call() {
                        return functionRMap.call(list);
                    }
                }).onErrorReturn(new Func1<Throwable, Map<K, V>>() {
                    @Override
                    public Map<K, V> call(Throwable throwable) {
                        return Collections.emptyMap();
                    }
                });
            }
        }).reduce(resultMap, new Func2<Map<K, V>, Map<K, V>, Map<K, V>>() {
            @Override
            public Map<K, V> call(Map<K, V> acc, Map<K, V> map) {
                acc.putAll(map);
                return acc;
            }
        }).toBlocking().first();
        return resultMap;
    }

    /**
     * 傳入List數(shù)據(jù)臼予,分組大小,為每一組數(shù)據(jù)分配一條線程處理啃沪,返回List結(jié)構(gòu)數(shù)據(jù)粘拾,自己實現(xiàn)List2List接口
     * @param input
     * @param partitionSize
     * @param func
     * @param <I>
     * @param <R>
     * @return
     */
    public <I, R> List<R> executeObservable(List<I> input, int partitionSize,
            final List2List<I, R> func) {
        if(input==null || input.size()==0){
            return Collections.emptyList();
        }
        List<R> result = Lists.newLinkedList();
        if (partitionSize <= 0)
            partitionSize = 10;
        List<List<I>> partitions = Lists.partition(input, partitionSize);
        Observable.from(partitions).flatMap(new Func1<List<I>, Observable<List<R>>>() {
            @Override public Observable<List<R>> call(final List<I> is) {
                return helper.createObservable(new Func0<List<R>>() {
                    @Override public List<R> call() {
                        return func.call(is);
                    }
                }).onErrorReturn(new Func1<Throwable, List<R>>() {
                    @Override public List<R> call(Throwable throwable) {
                        return Collections.emptyList();
                    }
                });
            }
        }).reduce(result, new Func2<List<R>, List<R>, List<R>>() {
            @Override public List<R> call(List<R> acc, List<R> list) {
                acc.addAll(list);
                return acc;
            }
        }).toBlocking().first();
        return result;
    }
}

demos

import com.google.common.base.Function;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;

import java.util.List;
import java.util.Map;

/**
 * 功能描述:
 * <p>
 * </p>
 *
 * @author : yuanchao.he
 * @version 1.0 2016-03-23
 * @since mobile-oppkit-server 1.0
 */
public class Demo {
    private static ObservableExecutor observableExecutor = ObservableExecutor.INSTANCE();

    public static void main(String[] args) {

        List<Integer> numbers = Lists.newLinkedList();
        for(int i=500;i<=1000;i++){
            numbers.add(i);
        }

        Map<Integer, String> result1 = observableExecutor.executeObservable(numbers,
            new Object2Map<Integer, Integer, String>() {
                @Override
                public Map<Integer, String> call(Integer integer) {
                    Map<Integer, String> map = Maps.newHashMap();
                    map.put(integer, String.valueOf(integer*integer%1000));
                    return map;
                }
            });

        System.out.println(result1);
        /**
         * 將數(shù)字轉(zhuǎn)換為字符串,每個線程處理一個數(shù)字轉(zhuǎn)換创千,并以list的結(jié)構(gòu)返回
         */
        List<String> result2 = observableExecutor.executeObservable(numbers,
            new Object2List<Integer, String>() {
                @Override
                public List<String> call(Integer integer) {
                    List<String> result = Lists.newLinkedList();
                    result.add(String.valueOf(integer*integer%1000));
                    return result;
                }
            });

        System.out.println(result2);

        /**
         * 將數(shù)字轉(zhuǎn)換為字符串缰雇,對 numbers 分組,每組2個元素追驴,每個線程處理一組械哟,并以map結(jié)構(gòu)返回
         */
        Map<Integer, String> result3 = observableExecutor.executeObservable(numbers, 2,
            new List2Map<Integer, Integer, String>() {
                @Override
                public Map<Integer, String> call(List<Integer> list) {
                    Map<Integer, String> map = Maps.newHashMap();
                    for (Integer integer : list) {
                        map.put(integer, String.valueOf(integer*integer%1000));
                    }
                    return map;
                }
            });
        System.out.println(result3);

        /**
         * 將數(shù)字轉(zhuǎn)換為字符串 對 numbers 分組,每組2個元素殿雪,每個線程處理一組暇咆,并以list結(jié)構(gòu)返回
         */
        List<String> result4 = observableExecutor.executeObservable(numbers, 2,
            new List2List<Integer, String>() {
                @Override
                public List<String> call(List<Integer> list) {
                    List<String> result = Lists.newLinkedList(Lists.transform(list,
                        new Function<Integer, String>() {
                            @Override
                            public String apply(Integer input) {
                                return String.valueOf(input*input%1000);
                            }
                        }));
                    return result;
                }
            });
        System.out.println(result4);
    }
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市丙曙,隨后出現(xiàn)的幾起案子爸业,更是在濱河造成了極大的恐慌,老刑警劉巖亏镰,帶你破解...
    沈念sama閱讀 218,122評論 6 505
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件扯旷,死亡現(xiàn)場離奇詭異,居然都是意外死亡索抓,警方通過查閱死者的電腦和手機(jī)钧忽,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,070評論 3 395
  • 文/潘曉璐 我一進(jìn)店門毯炮,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人耸黑,你說我怎么就攤上這事桃煎。” “怎么了崎坊?”我有些...
    開封第一講書人閱讀 164,491評論 0 354
  • 文/不壞的土叔 我叫張陵备禀,是天一觀的道長。 經(jīng)常有香客問我奈揍,道長,這世上最難降的妖魔是什么赋续? 我笑而不...
    開封第一講書人閱讀 58,636評論 1 293
  • 正文 為了忘掉前任男翰,我火速辦了婚禮,結(jié)果婚禮上纽乱,老公的妹妹穿的比我還像新娘蛾绎。我一直安慰自己,他們只是感情好鸦列,可當(dāng)我...
    茶點故事閱讀 67,676評論 6 392
  • 文/花漫 我一把揭開白布租冠。 她就那樣靜靜地躺著,像睡著了一般薯嗤。 火紅的嫁衣襯著肌膚如雪顽爹。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,541評論 1 305
  • 那天骆姐,我揣著相機(jī)與錄音镜粤,去河邊找鬼。 笑死玻褪,一個胖子當(dāng)著我的面吹牛肉渴,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播带射,決...
    沈念sama閱讀 40,292評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼同规,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了窟社?” 一聲冷哼從身側(cè)響起券勺,我...
    開封第一講書人閱讀 39,211評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎桥爽,沒想到半個月后朱灿,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,655評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡钠四,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,846評論 3 336
  • 正文 我和宋清朗相戀三年盗扒,在試婚紗的時候發(fā)現(xiàn)自己被綠了跪楞。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 39,965評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡侣灶,死狀恐怖甸祭,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情褥影,我是刑警寧澤池户,帶...
    沈念sama閱讀 35,684評論 5 347
  • 正文 年R本政府宣布,位于F島的核電站凡怎,受9級特大地震影響校焦,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜统倒,卻給世界環(huán)境...
    茶點故事閱讀 41,295評論 3 329
  • 文/蒙蒙 一寨典、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧房匆,春花似錦耸成、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,894評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至岳链,卻和暖如春花竞,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背宠页。 一陣腳步聲響...
    開封第一講書人閱讀 33,012評論 1 269
  • 我被黑心中介騙來泰國打工左胞, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人举户。 一個月前我還...
    沈念sama閱讀 48,126評論 3 370
  • 正文 我出身青樓烤宙,卻偏偏與公主長得像,于是被迫代替她去往敵國和親俭嘁。 傳聞我的和親對象是個殘疾皇子躺枕,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,914評論 2 355

推薦閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn)供填,斷路器拐云,智...
    卡卡羅2017閱讀 134,657評論 18 139
  • 本篇文章介主要紹RxJava中操作符是以函數(shù)作為基本單位,與響應(yīng)式編程作為結(jié)合使用的近她,對什么是操作叉瘩、操作符都有哪些...
    嘎啦果安卓獸閱讀 2,860評論 0 10
  • Android 自定義View的各種姿勢1 Activity的顯示之ViewRootImpl詳解 Activity...
    passiontim閱讀 172,133評論 25 707
  • 商婷婷曾經(jīng)是我為之驕傲的一個學(xué)生。 商婷婷是10級學(xué)生泳桦,大學(xué)期間叱咤風(fēng)云汤徽,她讓我驕傲的不是成績。學(xué)的會計專業(yè)灸撰,卻對...
    古古飛閱讀 1,802評論 13 35
  • 一谒府、保證不曠課,不缺交作業(yè)浮毯,課前認(rèn)真準(zhǔn)備完疫,上課認(rèn)真聽講,課下認(rèn)真自習(xí)亲轨! 二趋惨、確保人身安全,沒事不出校門惦蚊,不單獨...
    少村閱讀 173評論 0 0