ThreadPool實(shí)戰(zhàn)應(yīng)用

在這里記錄下線程池的實(shí)際應(yīng)用場(chǎng)景蔚携。包括:
ExecutorService 使用(包含CountDownLatch使用,和一個(gè)CyclicBarrier的demo)
ForkJoinPool 使用 (包含RecursiveAction吃衅,RecursiveTask)
spring的threadPool 使用

  • ExecutorService 使用

拿一個(gè)很簡(jiǎn)單的需求來說,群發(fā)短信愚铡。如果給一個(gè)公司的全部客戶群發(fā)留瞳。中小型公司的客戶也有幾十萬。如果用單線程來執(zhí)行件舵,很慢。
所以這里就拿多線程來實(shí)現(xiàn)這個(gè)群發(fā)短信的需求脯厨。(當(dāng)然只是demo)
看代碼

    /**
     * 暫且把這個(gè)方法看成一個(gè)發(fā)短信的方法铅祸。
     * 發(fā)送一個(gè)短信,然后保存一條發(fā)短信的記錄合武。
     * @param user
     * @throws Exception
     */
    @Override
    public void sendMessage(User user) throws Exception{
        MessageRecord messageRecord = new MessageRecord();
        try {
            messageRecord.setUserId(user.getId());
            messageRecord.setUserPhone(user.getPhone());
            messageRecord.setMessageContent(user.getName()+",你好临梗,這是你的短信。");
            messageRecord.setCurrentTimeMillis(System.currentTimeMillis()+"");
            messageRecordMapper.insertSelective(messageRecord);
            System.out.println("給用戶id:["+user.getId()+"]發(fā)送短信稼跳。當(dāng)前線程:["+Thread.currentThread().getName()+"]");
        }catch (Exception e){
            throw new Exception("發(fā)送短信異常");
        }

    }
/**
 * 發(fā)短信的線程盟庞。
 * 簡(jiǎn)單說下,Callable 和Runnable 的區(qū)別汤善,
 * Callable有返回結(jié)果什猖,帶泛型;泛型為返回結(jié)果的類型红淡,
 * Runnable 沒有返回結(jié)果不狮。
 * 下邊這個(gè)線程有返回結(jié)果,就不寫了在旱。
 */
public class MessageCallable implements Callable{
    //需要被操作的用戶
    private User user;
    //服務(wù)類
    private UserService userService;
    //倒數(shù)計(jì)數(shù)器摇零,為了阻塞主線程
    private CountDownLatch latch;

    /**
     * 有參構(gòu)造器,為了給這三個(gè)賦值
     * @param user
     * @param userService
     * @param latch
     */
    public MessageCallable(User user,UserService userService,CountDownLatch latch){
        this.user = user;
        this.userService = userService;
        this.latch = latch;
    }

    @Override
    public Object call() throws Exception{
        try {
            //調(diào)用發(fā)短信方法桶蝎。
            userService.sendMessage(user);
        }finally {
            //必須執(zhí)行在finally中驻仅。
            //如果 沒有 countDown  會(huì)導(dǎo)致主線程在子線程都執(zhí)行后也阻塞
            latch.countDown();
            //可以把返回結(jié)果在這里返回
            return null;
        }
    }
}

Executors 可以通過工廠方法創(chuàng)建好幾種線程池谅畅,這里說下幾種典型的。

1噪服,newCachedThreadPool創(chuàng)建一個(gè)可緩存線程池毡泻,如果線程池長(zhǎng)度超過處理需要,可靈活回收空閑線程芯咧,若無可回收牙捉,則新建線程。
2敬飒,newFixedThreadPool 創(chuàng)建一個(gè)定長(zhǎng)線程池邪铲,可控制線程最大并發(fā)數(shù),超出的線程會(huì)在隊(duì)列中等待无拗。
3带到,newScheduledThreadPool 創(chuàng)建一個(gè)定長(zhǎng)線程池,支持定時(shí)及周期性任務(wù)執(zhí)行英染。
4揽惹,newSingleThreadExecutor 創(chuàng)建一個(gè)單線程化的線程池,它只會(huì)用唯一的工作線程來執(zhí)行任務(wù)四康,保證所有任務(wù)按照指定順序(FIFO, LIFO, 優(yōu)先級(jí))執(zhí)行搪搏。

(在JDK1.8中,還有幾種闪金,無非是基于上述4個(gè)線程池的疯溺。除了newWorkStealingPool,這個(gè)一會(huì)說)

上述4個(gè)線程池哎垦,都是基于這個(gè)構(gòu)造器的

  • corePoolSize:線程池基本大小

  • maximumPoolSize:線程池允許創(chuàng)建的最大線程數(shù) (如果選用FixedThreadPool囱嫩,那么corePoolSize和maximumPoolSize都是構(gòu)造器中指定的大小。如果選用newCachedThreadPool漏设,那么corePoolSize為0墨闲,maximumPoolSize為Integer最大值)

  • keepAliveTime :線程池的工作線程空閑后,保持存活的時(shí)間郑口。

  • timeUnit :時(shí)間單位

  • workQueue(任務(wù)隊(duì)列) : 用于保存等待執(zhí)行的任務(wù)的阻塞隊(duì)列鸳碧。可以選擇以下幾個(gè)阻塞隊(duì)列:

    • ArrayBlockingQueue:是一個(gè)基于數(shù)組結(jié)構(gòu)的有界阻塞隊(duì)列潘酗,按FIFO原則進(jìn)行排序
    • LinkedBlockingQueue:一個(gè)基于鏈表結(jié)構(gòu)的阻塞隊(duì)列杆兵,吞吐量高于ArrayBlockingQueue。靜態(tài)工廠方法
      Excutors.newFixedThreadPool()使用了這個(gè)隊(duì)列
    • SynchronousQueue: 一個(gè)不存儲(chǔ)元素的阻塞隊(duì)列仔夺。每個(gè)插入操作必須等到另一個(gè)線程調(diào)用移除操作琐脏,否則
      插入操作一直處于阻塞狀態(tài),吞吐量高于LinkedBlockingQueue,靜態(tài)工廠方法
      Excutors.newCachedThreadPool()使用了這個(gè)隊(duì)列
    • PriorityBlockingQueue:一個(gè)具有優(yōu)先級(jí)的無限阻塞隊(duì)列日裙。
  • threadFactory:線程工廠吹艇,可以給該線程池起名字

  • RejectedExecutionHandler :當(dāng)隊(duì)列和線程池都滿了,說明線程池處于飽和狀態(tài)昂拂,那么必須采取一種策略還處理新提交的任務(wù)受神。它可以有如下四個(gè)選項(xiàng):

    • AbortPolicy:直接拋出異常,默認(rèn)情況下采用這種策略
    • CallerRunsPolicy:只用調(diào)用者所在線程來運(yùn)行任務(wù)
    • DiscardOldestPolicy:丟棄隊(duì)列里最近的一個(gè)任務(wù)格侯,并執(zhí)行當(dāng)前任務(wù)
    • DiscardPolicy:不處理鼻听,丟棄掉

構(gòu)造器參數(shù)參考文獻(xiàn):https://blog.csdn.net/u010723709/article/details/50377543

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {

下邊繼續(xù)看代碼。這里使用的是newFixedThreadPool 定長(zhǎng)線程池

   /**
     * threads   time(ms)
     * fix30     -  3485
     * 30     -  2995
     * 30     -  3092
     * 10     -  2890
     * 10     -  2976
     * 10     -  3023
     * 4      -  3584
     * 4      -  3775
     * 4      -  3781
     * cache  -  14025
     * cache  -  7093
     * cache  -  10235
     * single -  8950
     * single -  8948
     * single -  9364
     *
     * 注:本機(jī)器cpu為4核联四,上述為幾個(gè)線程池處理10000條用戶的效率對(duì)比
     * @return
     */
    @Override
    public Map massTextingByThreadPoolExecutorAndLatch() {
        Map<String,Object> resultMap = new HashMap(2);
        //創(chuàng)建定長(zhǎng)線程池
        ExecutorService executorService = Executors.newFixedThreadPool(8);
        //聲明倒數(shù)計(jì)數(shù)器
        CountDownLatch latch = null;
        //要處理的用戶
        List<User> users = null;
        long start = System.currentTimeMillis();
        try {
            users = this.findAllUser();
            if(users==null || users.size()==0){
                resultMap.put("isSuccess",false);
                resultMap.put("message","沒有用戶");
            }else{
                //通過構(gòu)造器創(chuàng)建 user總數(shù)的通過CountDownLatch
                latch = new CountDownLatch(users.size());
                for (User user : users){
                    //循環(huán)執(zhí)行撑碴。
                    executorService.submit(new MessageCallable(user,this,latch));
                }
                //主線程阻塞等待所有的子線程循環(huán)執(zhí)行完畢users.size()的數(shù)量
                //如果,子線程中的CountDownLatch沒有countDown朝墩。await 會(huì)一直等待醉拓,
                //當(dāng)然也可以使用  latch(long timeout, TimeUnit unit)這個(gè)方法來規(guī)定阻塞多少時(shí)間。
                latch.await();
                resultMap.put("isSuccess",true);
                resultMap.put("message","發(fā)送成功");
            }
        }catch (Exception e){
            resultMap.put("isSuccess",false);
            resultMap.put("message","系統(tǒng)異常");
        }finally {

            System.out.println("耗時(shí):["+ (System.currentTimeMillis()-start)+"]毫秒");
            return resultMap;
        }

    }

如果想要接收返回結(jié)果呢收苏。其實(shí)可以用CompletionService來完成亿卤。

CompletionService實(shí)際上可以看做是Executor和BlockingQueue的結(jié)合體。CompletionService在接收到要執(zhí)行的任務(wù)時(shí)鹿霸,通過類似BlockingQueue的put和take獲得任務(wù)執(zhí)行的結(jié)果排吴。CompletionService的一個(gè)實(shí)現(xiàn)是ExecutorCompletionService,ExecutorCompletionService把具體的計(jì)算任務(wù)交給Executor完成懦鼠。

在實(shí)現(xiàn)上傍念,ExecutorCompletionService在構(gòu)造函數(shù)中會(huì)創(chuàng)建一個(gè)BlockingQueue(使用的基于鏈表的無界隊(duì)列LinkedBlockingQueue),該BlockingQueue的作用是保存Executor執(zhí)行的結(jié)果葛闷。當(dāng)計(jì)算完成時(shí),調(diào)用FutureTask的done方法双藕。當(dāng)提交一個(gè)任務(wù)到ExecutorCompletionService時(shí)淑趾,首先將任務(wù)包裝成QueueingFuture,它是FutureTask的一個(gè)子類忧陪,然后改寫FutureTask的done方法扣泊,之后把Executor執(zhí)行的計(jì)算結(jié)果放入BlockingQueue中。

看實(shí)現(xiàn)代碼嘶摊,其實(shí)就多一行就行延蟹。
但是一般都多線程了。都不要返回結(jié)果了叶堆。

    public Map massTextingByThreadPoolExecutorAndLatchC() {
        Map<String,Object> resultMap = new HashMap(2);
        //創(chuàng)建定長(zhǎng)線程池
        ExecutorService executorService = Executors.newFixedThreadPool(8);
        //用完成服務(wù)來包一下線程池阱飘,用來接收結(jié)果。
        CompletionService cs = new ExecutorCompletionService(executorService);
        //聲明倒數(shù)計(jì)數(shù)器
        CountDownLatch latch = null;
        //要處理的用戶
        List<User> users = null;
        long start = System.currentTimeMillis();
        try {
            users = this.findAllUser();
            if(users==null || users.size()==0){
                resultMap.put("isSuccess",false);
                resultMap.put("message","沒有用戶");
            }else{
                //通過構(gòu)造器創(chuàng)建 user總數(shù)的通過CountDownLatch
                latch = new CountDownLatch(users.size());
                for (User user : users){
                    //循環(huán)執(zhí)行。通過cs來獲取結(jié)果
                    cs.submit(new MessageCallable(user,this,latch));
                }
                //主線程阻塞等待所有的子線程循環(huán)執(zhí)行完畢users.size()的數(shù)量
                //如果沥匈,子線程中的CountDownLatch沒有countDown蔗喂。await 會(huì)一直等待,
                //當(dāng)然也可以使用  latch(long timeout, TimeUnit unit)這個(gè)方法來規(guī)定阻塞多少時(shí)間高帖。
                latch.await();
                
                for(int i=0;i<users.size();i++){
                    //循環(huán)打印結(jié)果
                    //cs.take() 返回的是 Future 在get就行了缰儿。
                    System.out.println(cs.take().get());
                }
                resultMap.put("isSuccess",true);
                resultMap.put("message","發(fā)送成功");
            }
        }catch (Exception e){
            resultMap.put("isSuccess",false);
            resultMap.put("message","系統(tǒng)異常");
        }finally {

            System.out.println("耗時(shí):["+ (System.currentTimeMillis()-start)+"]毫秒");
            return resultMap;
        }

    }

CountDownLatch算是一個(gè)多線程的輔助類,該類利用倒數(shù)的方式來阻塞主線程散址,達(dá)到一種所有子線程執(zhí)行完畢在走主線程的目的乖阵。
和CountDownLatch類似功能的還有CyclicBarrier,該類用的是增加的方式在阻塞子線程预麸〉山可以讓子線程在某個(gè)節(jié)點(diǎn)阻塞,然后所有子線程執(zhí)行完畢后從該節(jié)點(diǎn)繼續(xù)運(yùn)行和執(zhí)行別的線程师崎。(和CountDownLatch最大的區(qū)別是 一個(gè)阻塞主線程默终,一個(gè)是阻塞子線程)

看一個(gè)CyclicBarrier運(yùn)用的demo吧。

/**
 * 子線程
 */
public class MessageCallableByBarrier implements Callable{
    private User user;

    private CyclicBarrier barrier;

    public MessageCallableByBarrier(User user,CyclicBarrier barrier){
        this.user = user;
        this.barrier = barrier;
    }

    @Override
    public Object call() throws Exception{

        System.out.println("我是用戶:"+user.getName()+",開始等待犁罩。");
        //此時(shí)開始阻塞子線程
        barrier.await();
        System.out.println("我是用戶:"+user.getName()+",全部等待完畢齐蔽。一起執(zhí)行");
        return null;
    }
}
    public Map massTextingByThreadPoolExecutorAndBarrier(){
        Map<String,Object> resultMap = new HashMap(2);
        List<User> users = this.findAllUser().subList(0,10);
        //創(chuàng)建定長(zhǎng)線程池
        ExecutorService executorService = Executors.newFixedThreadPool(users.size());
        //創(chuàng)建cyclicBarrier,
        //cyclicBarrier 有兩個(gè)構(gòu)造器床估,
        // CyclicBarrier(int parties, Runnable barrierAction)這個(gè)構(gòu)造器第一個(gè)參數(shù)是阻塞多少線程含滴,第二個(gè)參數(shù)是所有子線程等待完畢要執(zhí)行的線程
        CyclicBarrier cyclicBarrier = new CyclicBarrier(users.size(),()->
            System.out.println("在所有的子線程await之后執(zhí)行")
        );
        for(User user:users){
            executorService.submit(new MessageCallableByBarrier(user,cyclicBarrier));
        }
        System.out.println("主線程不會(huì)阻塞");
        return resultMap;
    }

輸出

主線程不會(huì)阻塞
我是用戶:dajiejie0,開始等待。
我是用戶:dajiejie9,開始等待丐巫。
我是用戶:dajiejie8,開始等待谈况。
我是用戶:dajiejie7,開始等待。
我是用戶:dajiejie6,開始等待递胧。
我是用戶:dajiejie5,開始等待碑韵。
我是用戶:dajiejie4,開始等待。
我是用戶:dajiejie3,開始等待缎脾。
我是用戶:dajiejie2,開始等待祝闻。
我是用戶:dajiejie1,開始等待。
在所有的子線程await之后執(zhí)行
我是用戶:dajiejie1,全部等待完畢遗菠。一起執(zhí)行
我是用戶:dajiejie0,全部等待完畢联喘。一起執(zhí)行
我是用戶:dajiejie9,全部等待完畢。一起執(zhí)行
我是用戶:dajiejie8,全部等待完畢辙纬。一起執(zhí)行
我是用戶:dajiejie7,全部等待完畢豁遭。一起執(zhí)行
我是用戶:dajiejie5,全部等待完畢。一起執(zhí)行
我是用戶:dajiejie4,全部等待完畢贺拣。一起執(zhí)行
我是用戶:dajiejie3,全部等待完畢蓖谢。一起執(zhí)行
我是用戶:dajiejie6,全部等待完畢捂蕴。一起執(zhí)行
我是用戶:dajiejie2,全部等待完畢。一起執(zhí)行
  • ForkJoinPool使用

forkJoinPool 是一個(gè)分拆/合并的線程池蜈抓。他可以充分利用CPU把一個(gè)大任務(wù)拆成多個(gè)小任務(wù)启绰。多個(gè)小任務(wù)執(zhí)行完后在合并起來。
該線程池是JDK1.7后加入的沟使。也是并發(fā)大神Doug Lea寫的委可。
forkJoinPool使用了工作竊取算法。既當(dāng)一個(gè)線程空閑時(shí)會(huì)去幫助別的線程從尾端執(zhí)行任務(wù)腊嗡。ForkJoinPool的效率是略高于ThreadPoolExecutor的着倾,但是CPU占用率會(huì)高很多,也會(huì)產(chǎn)生大量的GC(可以使用jconsole等工具觀察)燕少。具體選擇使用的話要看情況卡者。

要使用forkJoinPool的話,一定要會(huì)用到RecursiveAction 和RecursiveTask
從名字就看出來這是個(gè)遞歸客们。
RecursiveAction 沒返回結(jié)果
RecursiveTask 返回結(jié)果為泛型的類型
注:看這個(gè)的時(shí)候崇决,我的同事syq給了我很大的幫助,謝謝哦底挫。

還是拿上邊那個(gè)發(fā)短信的需求來說 看代碼

public class MessageAction extends RecursiveAction {
    //用戶服務(wù)
    private UserService userService;
    //臨界值
    private static final int COUNT = 500;
    //需要被處理的數(shù)據(jù)
    private List<User> users;
    //通過構(gòu)造器傳參
    public MessageAction(List<User> users, UserService userService){
        super();
        this.users = users;
        this.userService = userService;
    }

    //該方法是必須實(shí)現(xiàn)的恒傻。
    @Override
    protected void compute() {
        //如果users.size() 沒有拆分到臨界值,那么繼續(xù)拆分
        if(users.size()>COUNT){
            //用了二分查找來拆分建邓,一個(gè)從中間向左找盈厘,一個(gè)從中間向右找。一直遞歸到小于臨界值
            int middle=users.size()/2;
            MessageAction left = new MessageAction(users.subList(0,middle),userService);
            MessageAction right = new MessageAction(users.subList(middle,users.size()),userService);
            left.fork();
            right.fork();
        }else{
            try {  
                //小于臨界值后執(zhí)行
                userService.massTexting(users);
            }catch (Exception e){

            }
        }
    }
}
/**
     /**
     * 2509
     * 2394
     * 2161
     * 2467
     * 注:由于 這個(gè)沒有countDownLatch 阻塞主線程官边,所以不能用單元測(cè)試來跑沸手,而且統(tǒng)計(jì)下來的執(zhí)行時(shí)間 要從數(shù)據(jù)庫看
     * @return
     */
    @Override
    public Map massTextingByForkJoinPool() {
        Map<String,Object> resultMap = new HashMap(2);
        //一般就用這種創(chuàng)建方式就行。這種創(chuàng)建方式會(huì)創(chuàng)建一個(gè) cpu核數(shù)-1 的線程池注簿,是最合理的契吉。
        //用new ForkJoinPool()也可以。會(huì)創(chuàng)建一個(gè) cpu核數(shù) 的線程池
        ForkJoinPool pool = ForkJoinPool.commonPool();
        List<User> users = null;
        long start = System.currentTimeMillis();
        try {
            users = this.findAllUser();
            if(users==null || users.size()==0){
                resultMap.put("isSuccess",false);
                resultMap.put("message","沒有用戶");
            }else{
                //在這里執(zhí)行運(yùn)行就可以诡渴,execute沒有返回結(jié)果栅隐,submit和invoke有返回結(jié)果。
                pool.execute(new MessageAction(users,this));
                resultMap.put("isSuccess",true);
                resultMap.put("message","發(fā)送成功");
            }
        }catch (Exception e){
            resultMap.put("isSuccess",false);
            resultMap.put("message","系統(tǒng)異常");
            return resultMap;
        }finally {
            pool.shutdown();
            return resultMap;
        }

    }

在看下有返回結(jié)果的玩徊。還是拿發(fā)短信來說,我要返回發(fā)送過短信的用戶id的總和

public class MessageTask extends RecursiveTask<Integer>{
    //用戶服務(wù)
    private UserService userService;
    //臨界值
    private static final int COUNT = 500;
    //需要發(fā)短信的用戶
    private List<User> users;
    
    public MessageTask(List<User> users, UserService userService){
        super();
        this.users = users;
        this.userService = userService;
    }

    @Override
    protected Integer compute() {
        if(users.size()>COUNT){
            int middle=users.size()/2;
            MessageTask left = new MessageTask(users.subList(0,middle),userService);
            MessageTask right = new MessageTask(users.subList(middle,users.size()),userService);
            left.fork();
            right.fork();
            //返回左半部分和右半部分相加的結(jié)果
            //返回結(jié)果其實(shí)就是 join  有return  就是與 RecursiveAction最大的區(qū)別
            return left.join()+right.join();
        }else{
            try {
                return userService.massTexting(users);
            }catch (Exception e){

            }
            return null;
        }
    }
}
    /**
     * @return
     * 有返回結(jié)果的forkJoinPool
     */
    @Override
    public Map massTextingByForkJoinPoolByTask() {
        Map<String,Object> resultMap = new HashMap(2);
        ForkJoinPool pool = new ForkJoinPool();
        List<User> users = null;
        Integer sum = null;
        long start = System.currentTimeMillis();
        try {
            users = this.findAllUser();
            if(users==null || users.size()==0){
                resultMap.put("isSuccess",false);
                resultMap.put("message","沒有用戶");
            }else{
                //invoke 和submit的區(qū)別在于 invoke是同步的谨究。
                sum = pool.invoke(new MessageTask(users, this));
                resultMap.put("isSuccess",true);
                resultMap.put("message","發(fā)送成功");
            }
        }catch (Exception e){
            resultMap.put("isSuccess",false);
            resultMap.put("message","系統(tǒng)異常");
            return resultMap;
        }finally {
            pool.shutdown();
            System.out.println(sum);
            System.out.println("耗時(shí):["+ (System.currentTimeMillis()-start)+"]毫秒");
            return resultMap;
        }
    }
上述 ThreadPoolExecutor 和ForkJoinPool 還是適合于半夜定時(shí)任務(wù)的應(yīng)用恩袱。那時(shí)候可以瘋狂的占用CPU,哈哈胶哲。高峰期 對(duì)于多線程的使用還是慎用畔塔。還有,拆分和線程的調(diào)度也是消耗效率的,不是使用多線程就一定會(huì)效率增高澈吨,還是看情況把敢。
  • Spring中的ThreadPool

在看一個(gè)spring中的threadPool, 很簡(jiǎn)單谅辣。使用情況也挺多的修赞。比如我一個(gè)查詢 ,需要查詢用戶列表和短信發(fā)送列表

看代碼

/**
 * 先繼承一個(gè)ThreadPoolTaskExecutor
 * 可以自己設(shè)置參數(shù)
 * 如果不設(shè)置的話桑阶,就用默認(rèn)參數(shù)
 * private int corePoolSize = 1;
 * private int maxPoolSize = 2147483647;
 * private int keepAliveSeconds = 60;
 * private int queueCapacity = 2147483647;
 * private boolean allowCoreThreadTimeOut = false;
 * 跟cachedPool挺像的  具體含義上邊有
 */
@Component
public class ThreadPoolForSpring extends ThreadPoolTaskExecutor {

}
    @Autowired
    private ThreadPoolForSpring threadPoolForSpring;



    public void getUserAndMessageRecord() throws Exception {
        //開啟異步
        Future<List<User>> usersFuture  = threadPoolForSpring.submit(()-> {
            System.out.println("當(dāng)前線程名稱:"+Thread.currentThread().getName());
            return this.findAllUser();
        });
        MessageRecordExample recordExample = new MessageRecordExample();

        List<MessageRecord> messageRecords = messageRecordMapper.selectByExample(recordExample);

        System.out.println("同步查詢messageRecords:"+messageRecords.size()+"條");
        System.out.println("異步查詢users:"+usersFuture.get().size()+"條");

    }

這就OK了柏副。 Spring的ThreadPool 也是基于JDK的ThreadPool的。設(shè)置起參數(shù)即可蚣录。


在給大家看個(gè)更騷的割择。
java8的并行流也是用ForkJoinPool實(shí)現(xiàn)的。并且主線程也加入了任務(wù)萎河。
在lambda效率那么垃圾的情況下荔泳,這個(gè)方法也只用了3S多點(diǎn)。

public void massTextingByParallelStream(){
        long start = System.currentTimeMillis();
        List<User> allUser = this.findAllUser();
        allUser.parallelStream().forEach(user -> {
            MessageRecord messageRecord = new MessageRecord();
            messageRecord.setUserId(user.getId());
            messageRecord.setUserPhone(user.getPhone());
            messageRecord.setMessageContent(user.getName()+",你好虐杯,這是你的短信玛歌。");
            messageRecord.setCurrentTimeMillis(System.currentTimeMillis()+"");
            messageRecordMapper.insertSelective(messageRecord);
            System.out.println("給用戶id:["+user.getId()+"]發(fā)送短信。當(dāng)前線程:["+Thread.currentThread().getName()+"]");
        });
        System.out.println("耗時(shí):["+ (System.currentTimeMillis()-start)+"]毫秒");
    }

github:https://github.com/zycisbg/thread-pool

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末厦幅,一起剝皮案震驚了整個(gè)濱河市沾鳄,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌确憨,老刑警劉巖译荞,帶你破解...
    沈念sama閱讀 218,858評(píng)論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異休弃,居然都是意外死亡吞歼,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,372評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門塔猾,熙熙樓的掌柜王于貴愁眉苦臉地迎上來篙骡,“玉大人,你說我怎么就攤上這事丈甸∨此祝” “怎么了?”我有些...
    開封第一講書人閱讀 165,282評(píng)論 0 356
  • 文/不壞的土叔 我叫張陵睦擂,是天一觀的道長(zhǎng)得湘。 經(jīng)常有香客問我,道長(zhǎng)顿仇,這世上最難降的妖魔是什么淘正? 我笑而不...
    開封第一講書人閱讀 58,842評(píng)論 1 295
  • 正文 為了忘掉前任摆马,我火速辦了婚禮,結(jié)果婚禮上鸿吆,老公的妹妹穿的比我還像新娘囤采。我一直安慰自己,他們只是感情好惩淳,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,857評(píng)論 6 392
  • 文/花漫 我一把揭開白布蕉毯。 她就那樣靜靜地躺著,像睡著了一般黎泣。 火紅的嫁衣襯著肌膚如雪恕刘。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,679評(píng)論 1 305
  • 那天抒倚,我揣著相機(jī)與錄音褐着,去河邊找鬼。 笑死托呕,一個(gè)胖子當(dāng)著我的面吹牛含蓉,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播项郊,決...
    沈念sama閱讀 40,406評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼馅扣,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來了着降?” 一聲冷哼從身側(cè)響起差油,我...
    開封第一講書人閱讀 39,311評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎任洞,沒想到半個(gè)月后蓄喇,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,767評(píng)論 1 315
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡交掏,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,945評(píng)論 3 336
  • 正文 我和宋清朗相戀三年妆偏,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片盅弛。...
    茶點(diǎn)故事閱讀 40,090評(píng)論 1 350
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡钱骂,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出挪鹏,到底是詐尸還是另有隱情见秽,我是刑警寧澤,帶...
    沈念sama閱讀 35,785評(píng)論 5 346
  • 正文 年R本政府宣布讨盒,位于F島的核電站解取,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏催植。R本人自食惡果不足惜肮蛹,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,420評(píng)論 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望创南。 院中可真熱鬧伦忠,春花似錦、人聲如沸稿辙。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,988評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽邻储。三九已至赋咽,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間吨娜,已是汗流浹背脓匿。 一陣腳步聲響...
    開封第一講書人閱讀 33,101評(píng)論 1 271
  • 我被黑心中介騙來泰國(guó)打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留宦赠,地道東北人陪毡。 一個(gè)月前我還...
    沈念sama閱讀 48,298評(píng)論 3 372
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像勾扭,于是被迫代替她去往敵國(guó)和親毡琉。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,033評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容