java多線程與高并發(fā)(九)線程池與源碼解讀

1.回顧

之前說的Executor作用是把線程的定義和執(zhí)行分開半哟,主要是用來做線程的執(zhí)行接口训唱,在他下面還有一個控制著線程生命周期的ExecutorService,然后才是各種各樣的ThreadPoolExecutor拐辽,把線程池作為一個執(zhí)行的單元,給他單獨出一個類育韩,下面是他的七個參數(shù)
corePoolSize 核心線程數(shù)
maxmumPoolSize 最大線程數(shù)
keepAliveTime 生存時間
TimeUnit 生存時間的單位
BlockingQueue 任務(wù)隊列
**ThreadFactory **線程工廠
RejectStrategy 拒絕策略(Abort 拋異常 Discard扔掉 不拋異常 DiscardOldest 扔掉排隊時間最久的,CallerRuns 調(diào)用處理者處理服務(wù))

2.jdk自帶線程池

今天我們來看看JDK給我們提供了一些默認線程池的實現(xiàn)闺鲸,默認的常用的有哪些筋讨,然后來讀讀ThreadPoolExecutor源碼
所有的線程池都是繼承ExecutorService的,所以Executors是對線程執(zhí)行的工具類摸恍,可以看做是線程的工廠悉罕,產(chǎn)生各種各樣的線程池

2.1.SingleThreadPool

先來看第一個SingleThreadPool ,看這個名字就覺得只有一個線程,這個一個線程的線程池可以保證扔進去的任務(wù)是順序執(zhí)行的

package com.learn.thread.eight;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class TestSingleThreadPool {
    private static ExecutorService service = Executors.newSingleThreadExecutor();


    public static void main(String[] args) {
        for (int i = 0; i < 5; i++) {
            final int j = i;
            // 順序執(zhí)行的
            service.execute(() -> {
                System.out.println(j + " " +Thread.currentThread().getName());
            });
            
        }
    }
}

SingleThreadPool.png

2.2.CacheThreadPool

我們來看第二種立镶,看他源碼實際上是跟之前的SingleThreadPool一樣壁袄,底層是還是ThreadPoolExecutor


CachePool.png

沒有核心線程數(shù),最大線程可以很Intger的最大值個媚媒,如果60秒沒人理他嗜逻,自動被回收
任務(wù)隊列用的是SynchronousQueue 不是用來存數(shù)據(jù)的,用來傳遞消息的缭召,如果任務(wù)沒有被執(zhí)行栈顷,就會被阻塞
用的是默認線程工廠
沒有指定拒絕策略,用默認拒絕策略
可以看出CachePool的特點嵌巷,就是你來一個任務(wù)我啟動一個線程萄凤。啟動線程的邏輯如下
如果線程沒有被回收,就去看當前線程池的線程是不是有空閑的線程搪哪,如果有就執(zhí)行讓它去執(zhí)行任務(wù)靡努。如果沒有,就自己new 一個線程去執(zhí)行噩死,原因是隊列是SynchronousQueue 颤难,它必須保證它的大小為0,所以你來一個任務(wù)必須有一個線程去執(zhí)行已维,不然別的線程提交任務(wù)就統(tǒng)統(tǒng)阻塞了
來看這個小程序行嗤,首先將線程池service打印出來,最后又打印一遍線程池services垛耳,然后任務(wù)是睡眠500毫秒

package com.learn.thread.eight;

import com.sun.tools.internal.ws.wsdl.document.soap.SOAPUse;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @author zglx
 */
public class TestCachePool {
    private static ExecutorService service = Executors.newCachedThreadPool();

    public static void main(String[] args) throws InterruptedException {
        System.out.println(service);
        for (int i = 0; i < 2; i++) {
            // 順序執(zhí)行的
            Thread.sleep(100);
            service.execute(() -> {
                try {
                    Thread.sleep(0);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName());
            });
        }
        System.out.println(service);
        Thread.sleep(800);
        System.out.println(service);
    }


}

注意線程存活的時間是60S栅屏,所以第一個線程被復(fù)用了

2.3.FixedThreadPool

fixed是固定的含義,就是固定一個線程數(shù)堂鲜。


FixedThreadPool.png

fixedThreadPool指定一個參數(shù)栈雳,到底有多少線程,核心線程數(shù)和最大線程數(shù)是固定的缔莲,所以不存在回收之說哥纫,但是這里用的是LinkedBlockingQueue,這里一定要小心痴奏,因為是不建議使用的蛀骇,因為是會造成內(nèi)存泄漏

但是用fixedThreadPool有一個好處厌秒,可以進行并行的計算
并行與并發(fā) 并發(fā)是指任務(wù)的提交、并行是指任務(wù)執(zhí)行擅憔,并行是并發(fā)的子集鸵闪,并行是多個cpu可以同時進行處理,并發(fā)是多個任務(wù)同時過來暑诸。

我們看下面一個程序蚌讼,用多線程計算1-200000中的質(zhì)數(shù),可以對這個區(qū)間進行分組

package com.learn.thread.eight;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

public class TestFixedThreadPool {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        long start = System.currentTimeMillis();
        List<Integer> list = getPrime(1,200000);
        list.forEach(System.out::println);
        long end = System.currentTimeMillis();
        System.out.println("time " + (end - start));

        start = System.currentTimeMillis();
        int num = 4;
        ExecutorService service = Executors.newFixedThreadPool(num);
        Future<List<Integer>> future1 = service.submit(new Mytask(1, 80000));
        Future<List<Integer>> future2 = service.submit(new Mytask(80001, 160000));
        Future<List<Integer>> future3 = service.submit(new Mytask(160001, 200000));
        future2.get().addAll(future3.get());
        future1.get().addAll(future2.get());
        future1.get().forEach(System.out::println);
        end = System.currentTimeMillis();
        System.out.println("time " + (end - start));
    }

    public static List<Integer> getPrime(Integer start, Integer end) {
        List<Integer> list = new ArrayList<>(100);
        for (Integer i = start; i <= end; i++) {
            if (isPrime(i)) {
                list.add(i);
            }
        }

        return list;
    }

    public static boolean isPrime(Integer num) {
        for (int i = 2; i <= num/2; i++) {
            if (num % i == 0) {
                return false;
            }
        }
        return true;
    }

    static class Mytask implements Callable<List<Integer>> {
        int start;
        int end;
        public Mytask() {
        }

        public Mytask(int start, int end) {
            this.start = start;
            this.end = end;
        }

        @Override
        public List<Integer> call() throws Exception {
            List<Integer> result = getPrime(start, end);
            return result;
        }
    }
}

2.4.cacahe vs fixed

什么時候用cache 什么時候用fixed 个榕,這得看你的業(yè)務(wù)量篡石,如果線程池的線程太多,他們就會競爭稀缺的處理器和內(nèi)存資源,浪費大量的時候在上下文切換笛洛,反之如果線程太少夏志,處理器就可能無法充分利用。
建議:線程池大小與處理器的利用率之比可以使用公式來進行估算

線程池 = 你有多少個CPU乘以cpu期望利用率 乘以 (1+W/C)W除C是等待時間與計算時間的比率
表達式為
Nthread = Ncpu * Ucpu * (1+W/C)

如果你的任務(wù)不確定是否平穩(wěn)苛让,但是要保證任務(wù)來的時候有線程去執(zhí)行沟蔑,那我們就可以用cache,當然你要保證這個任務(wù)不會堆積狱杰。

假如你大概估了線程的值瘦材,這個值完全可以處理任務(wù),我可以直接New 一個線程來執(zhí)行仿畸,那就用fixed食棕,但是阿里不建議這么使用

2.5.ScheduledThreadPool

定時任務(wù)線程池,就是一個定時器任務(wù)错沽,隔一段時間后執(zhí)行簿晓,這個就是我們專門用來執(zhí)行定時任務(wù)的一個線程池。


ScheduledThreadPool.png

這里super調(diào)用的是ThreadPoolExecutor 本質(zhì)上還是ThreadPoolExecutor千埃,它最大線程數(shù)也是Integer的最大值憔儿,用的隊列是DelayedWorkQueue。
它有一些好用的方法放可,比如scheduleAtFixedRate間隔多長時間在一個固定頻率執(zhí)行一次這個任務(wù)谒臼,可用通過這樣靈活的時間配置。
第一個參數(shù)是Delay耀里,第一次執(zhí)行任務(wù)在此之后多長時間
第二個參數(shù)period間隔多長時間
第三個參數(shù)是時間單位

package com.learn.thread.eight;

import com.learn.thread.five.ExecutorService;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class TestScheduledPool {
    private static ScheduledExecutorService service = Executors.newScheduledThreadPool(10);

    public static void main(String[] args) {
        // 5秒打印一次線程名字
        service.scheduleAtFixedRate(() -> {
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName());
        }, 0, 5, TimeUnit.SECONDS);
    }
}

假如有一個鬧鐘服務(wù)蜈缤,假如有十億人訂閱了這個鬧鐘,意味著冯挎,每天早上會有10億的并發(fā)量底哥,你怎么優(yōu)化
思想就是把這個定時的任務(wù)分發(fā)到很多個邊緣的服務(wù)器上,假如說你有一臺服務(wù)器,你也是要用到多線程去消費叠艳,總之就是一個分而治之的思想

SingleThreadPool 只有一個線程的線程池
FixedThreadPool 固定多少個線程
CacheThreadPool 有彈性的線程池奶陈,有一個啟動一個,只要沒有空閑的就啟動一個新的來執(zhí)行
ScheduleThreadPool 定時任務(wù)來執(zhí)行線程池

package com.learn.thread.eight;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.*;

/**
 * 自定義線程池的拒絕策略
 */
public class TestRejectedHandler {
    private static ThreadFactory factory = new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            return new Thread("t1");
        }
    };
    private static ExecutorService executorService = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(1),factory, new MyHandler());

    static class MyHandler implements RejectedExecutionHandler {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            System.out.println("asdasd");

        }
    }

    public static void main(String[] args) {
        for (int i = 0; i < 10; i++) {
            executorService.execute(() -> {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
    }

}

3.ThreadPoolExecutor

3.1.常用變量的解釋

// AtomicInteger是int類型附较,是32位。高3位代表線程狀態(tài)潦俺,低29位代表目前線程池有多少個線程數(shù)量拒课,這里把兩個值合二為一就是算了執(zhí)行效率更高一些,因為都需要線程同步事示,而同步一個值往往比同步一個值容易的多

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

    // Integer.size為32早像,所以COUNT_BITS為29
    private static final int COUNT_BITS = Integer.SIZE - 3;

    // CAPACITY 線程允許的最大線程數(shù),1左移29位肖爵,然后減1卢鹦,就是2^29-1
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // (線程有五種狀態(tài)按大小排序為  RUNNING -> SHUTDOWN  -> STOP -> TIDYING   -> TERMINATED)
    // 正常運行的
    private static final int RUNNING    = -1 << COUNT_BITS;
    // 調(diào)用shutdown方法進去了shutdown狀態(tài)
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    // 調(diào)用shutdown馬上停止
    private static final int STOP       =  1 << COUNT_BITS;
    // 調(diào)用了shutdown然后這個線程也執(zhí)行完了,現(xiàn)在正在處理的過程叫做TIDYING
    private static final int TIDYING    =  2 << COUNT_BITS;
    // 整個線程全部結(jié)束
    private static final int TERMINATED =  3 << COUNT_BITS;

    // 獲取線程池的狀態(tài) 通過按位與操作劝堪,低29位將全部變成0
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    // 獲取線程的數(shù)量冀自,通過按位與操作,高3位全部變成0
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    // 根據(jù)線程池狀態(tài)和線程池的線程數(shù)量生成ct1值
    private static int ctlOf(int rs, int wc) { return rs | wc; }
    // 線程池狀態(tài)小于xx
private static boolean runStateLessThan(int c, int s) {
        return c < s;
    }
    // 線程池狀態(tài)大于等于xx
    private static boolean runStateAtLeast(int c, int s) {
        return c >= s;
    }

3.2.構(gòu)造方法

    // 構(gòu)造方法
public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        // 基本類型參數(shù)校驗
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        // 空指針校驗
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        // 安全管理器
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        // 核心線程數(shù)
        this.corePoolSize = corePoolSize;
        // 最大線程池數(shù)
        this.maximumPoolSize = maximumPoolSize;
        // 
        this.workQueue = workQueue;
        // 根據(jù)傳入?yún)?shù)unit和keepAliveTime 將存活時間轉(zhuǎn)換為納秒存到變量keepAliveTime中
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        // 線程工廠
        this.threadFactory = threadFactory;
        // 策略
        this.handler = handler;
    }

3.3. 提交執(zhí)行task的過程

 public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        // 1.判斷線程池活著的那些線程數(shù)是不是小于核心線程數(shù)秒啦,如果小于就addWorker添加一個線程熬粗。
        if (workerCountOf(c) < corePoolSize) {
            // addWorker創(chuàng)建線程,第二個參數(shù)表示是否創(chuàng)建核心線程
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        // 這里處理當前線程數(shù)超過核心線程數(shù)的邏輯
        // 2.先判斷當前線程池狀態(tài)余境,如果是在運行中驻呐,就把任務(wù)丟到隊列里邊去,否則拒絕任務(wù)
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            // 這里需要雙重判斷線程池的狀態(tài)芳来,因為這阻塞的過程中有可能線程池的狀態(tài)被改變了含末,
            // 如果不是Running狀態(tài),說明線程池執(zhí)行了shutdown操作即舌,就刪除此任務(wù)佣盒,并且拒絕
            if (! isRunning(recheck) && remove(command))
                reject(command);
            // 如果工作狀態(tài)的線程為0,說明沒有線程了或者核心線程數(shù)設(shè)置為0了侥涵,就添加非核心線程數(shù)
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        // 3.如果線程不是運行狀態(tài)沼撕,那就任務(wù)進入隊列,這里有3點需要注意
        // 線程池不是運行狀態(tài)芜飘,addWorker內(nèi)部會判斷線程池的狀態(tài)
        // 第二個參數(shù)表示是否創(chuàng)建核心線程
        // addWorker返回false說明任務(wù)執(zhí)行失敗务豺,需要拒絕任務(wù)
        else if (!addWorker(command, false))
            reject(command);
    }

3.4. addWorker源碼分析

addWorker涉及到了很多細節(jié),如果要讀懂每一個細節(jié)完全不必要嗦明,但是思想理解就行了笼沥,addWorker的意思就是添加線程,線程要存到容器里,往里頭添加線程任務(wù)的時候肯定是多個線程同時往里面扔的奔浅,所以一定要同步馆纳,但是追求效率,一般都是用自旋或者lock

    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        // 兩層死循環(huán)就為了做一個事情汹桦,添加一個woker的數(shù)量加1
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // 判斷線程池狀態(tài)滿足以下條件鲁驶,就返回
            // 1、線程池狀態(tài)大于SHUTDOWN
            // 2舞骆、線程池狀態(tài)等于SHUTDOWN并且firstTask執(zhí)行任務(wù)不為null,直接返回false
            // 3钥弯、線程池狀態(tài)等于SHUTDOWN,且隊列為空督禽,直接返回false
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;
            // 嵌套死循環(huán)
            for (;;) {
        
                int wc = workerCountOf(c);
                // 如果當前線程超過最大允許線程數(shù)脆霎,或者根據(jù)core狀態(tài),大于核心線程或者最大線程數(shù)狈惫,返回false
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                // 嘗試把當前執(zhí)行線程數(shù)+1睛蛛,如果+1成功,打破循環(huán)
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                // 重新獲取當前線程池狀態(tài)
                c = ctl.get();  
                // 如果當前執(zhí)行線程數(shù)不等于之前讀取的數(shù)量胧谈,說明有別的線程加忆肾!成功了
                if (runStateOf(c) != rs)
                // 重試
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
        // 下面的邏輯是創(chuàng)建一個線程去執(zhí)行任務(wù)
        // 是否執(zhí)行任務(wù)狀態(tài),true執(zhí)行成功
        boolean workerStarted = false;
        // 判斷是否假如addWorker狀態(tài) add加入成功
        boolean workerAdded = false;
        Worker w = null;
        try {
            // 創(chuàng)建一個Worker
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                // worker的添加必須是串行的,因此必須加鎖
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    // 重新檢查當前線程池狀態(tài)第岖,查詢當前執(zhí)行任務(wù)的數(shù)量
                    int rs = runStateOf(ctl.get());
                    // 如果rs小于SHUTDOWN难菌,或者rs為SHUTDOWN并且firstTask為null
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        // workder已經(jīng)調(diào)用過了start方法,則不再創(chuàng)建worker蔑滓,拋出異常
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        // worker創(chuàng)建并添加workers成功
                        workers.add(w)郊酒;
                        // 更新largestPoolSize變量
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    // 釋放鎖
                    mainLock.unlock();
                }
                // 啟動worker線程
                if (workerAdded) {
                    // 這里會調(diào)用worker的run方法,
                    // 實際上是調(diào)用執(zhí)行runWorker方法
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            // worker線程啟動失敗键袱,說明線程池狀態(tài)發(fā)生了變化(關(guān)閉操作被執(zhí)行), 需要進行shutdown相關(guān)操作
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

3.5.線程池worker任務(wù)單元

來看看worker是個什么東西燎窘,看源碼是一個靜態(tài)內(nèi)部類Worker

  private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        private static final long serialVersionUID = 6138294804551838833L;

        // 1. 線程
        final Thread thread;
        // 2. 任務(wù)
        Runnable firstTask;
        // 3. 執(zhí)行次數(shù)
        volatile long completedTasks;

        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }
    
        // 線程池執(zhí)行的核心方法
          public void run() {
            runWorker(this);
        }
        }

3.6.核心線程執(zhí)行邏輯runWorker

這里有一個有意思的地方可以看到Worker 是實現(xiàn)了Runnable,它自己也有一個Runnbale和Thread蹄咖,你就把Worker當成一個工人褐健,工人有任務(wù)(Runnbale)和執(zhí)行能力(Thread),但是你得保證每一個工人執(zhí)行的任務(wù)是自己的,并且自己執(zhí)行完了以后澜汤,completedTasks要加1
所以當多線程添加任務(wù)的時候蚜迅,把當前線程復(fù)制一份給Thread和任務(wù)下發(fā)給工人的Runnbale,然后讓工人去lock俊抵,工人的lock 其實就是之前學(xué)過ReentrantLock的acquire方法谁不,加入鏈表,等待執(zhí)行徽诲,這樣子就完成了整個串行刹帕。

final void runWorker(Worker w) {
        // 1. 這里是用來自己實現(xiàn)方法的beforeExecute吵血,自己實現(xiàn)內(nèi)容
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        // 這里調(diào)用了unlock,其實也就調(diào)用了release(1)偷溺,執(zhí)行過程中允許線程被中斷
        w.unlock(); // allow interrupts
        // 判斷是否繼續(xù)自旋
        boolean completedAbruptly = true;
        try {
            // 這里判斷任務(wù)是否為存在或者隊列中的任務(wù)不為空蹋辅,注意如果從隊列取就會造成阻塞
            while (task != null || (task = getTask()) != null) {
            // CAS加鎖
                w.lock();
                // 判斷當前線程池狀態(tài)是否為停止,如果停止了挫掏,則中斷
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                // 這里用來擴展功能侦另,但是沒用到
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        // 這個工人開始執(zhí)行任務(wù)
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    // gc回收
                    task = null;
                    // 任務(wù)加1
                    w.completedTasks++;
                    // 釋放鎖
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            // 自旋操作退出,說明線程池結(jié)束
            processWorkerExit(w, completedAbruptly);
        }
    }

4.woker類總結(jié)

這個woker是一個Thread 也是一個Runnable 同樣也是一個AQS砍濒,先來說說為什么要有Runnable和Thread

4.1.為什么要有Runnable?

這里其實是用來記錄任務(wù)的淋肾,因為Woker 里邊有很多狀態(tài)需要根據(jù)當前任務(wù)去記錄,并且這個東西必須要在Thread中執(zhí)行

4.2.為什么要有Thread?

因為線程池有很多Woker去競爭爸邢,所以干脆就把Woker設(shè)計成AQS,一個線程處理一個當前任務(wù)拿愧,而不是說有其他worker執(zhí)行了不是自己的任務(wù)

4.3.submit和execute的區(qū)別是什么

方法定義的位置不同杠河,execute是在Executor執(zhí)行器中,而submit是在ExecutorService執(zhí)行服務(wù)中的
參數(shù)接收不同浇辜,execute接收的是Runnbale 券敌,submit接收的是Callable
作用不用,execute只是單純的執(zhí)行任務(wù)柳洋,submit可以把任務(wù)的結(jié)果帶出來待诅。

4.4.線程池大概執(zhí)行流程

核心線程數(shù)不夠,啟動核心線程
核心線程滿了熊镣,加入隊列
核心線程和隊列都滿了卑雁,addWorker 加入非核心線程

4.5.addWorker 做的事情

count 加1
真正的加入任務(wù)并且start
WorkStealingPool
WorkStealingPool 是另外一種線程,核心非常簡單绪囱,之前講的ThreadPoolExecutor線程是去一個任務(wù)的隊列里取任務(wù)测蹲。而WorkStealingPool是每一個線程都有自己的任務(wù)隊列,當一個線程執(zhí)行完以后鬼吵,會在別的線程任務(wù)隊列中偷任務(wù)

跟原來只有一個隊列的線程池相比扣甲,如果有某一個線程被占用了很長時間,然后任務(wù)隊列又特別的重齿椅,那其他線程只能空著琉挖,沒辦法幫到任務(wù)重的線程

源碼

    public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }

WorkStealingPool本質(zhì)上是一個ForkJoinPool

package com.learn.thread.eight;

import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class TestWorkStealingPool {
    private static final ExecutorService service = Executors.newWorkStealingPool();

    public static void main(String[] args) throws IOException {
        System.out.println(Runtime.getRuntime().availableProcessors());
        service.execute(new R(2000));
        service.execute(new R(2000));
        service.execute(new R(2000));
        service.execute(new R(2000));
        service.execute(new R(2000));
        service.execute(new R(2000));
        service.execute(new R(2000));
        service.execute(new R(2000));
        service.execute(new R(2000));
        service.execute(new R(2000));
        service.execute(new R(2000));
        System.in.read();
    }

    static class R implements Runnable {
        int time;

        R(int time) {
            this.time = time;
        }
        @Override
        public void run() {
            try {
                TimeUnit.MILLISECONDS.sleep(time);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(time + " " + Thread.currentThread().getName());
        }
    }
}

image.png

5.ForkJoinPool

ForkJoinPool 適合把大任務(wù)切分成一個個小任務(wù)去執(zhí)行,小任務(wù)如果還是大涣脚,再切示辈,不一定是兩個,可以是多個涩澡,但是最終的結(jié)果就是多個小任務(wù)結(jié)果的匯總顽耳。這個過程就叫做join 坠敷,所以這種線程池叫做ForkJoinPool。
既然是可以分割的任務(wù)射富,那怎么定義任務(wù)呢膝迎,之前線程池執(zhí)行的任務(wù)就是Runnable。在這里胰耗,我們一般實現(xiàn)ForkJoinPool的時候需要定義成特定他的類型限次,這個類型又必須是可以分叉的任務(wù),這個任務(wù)就叫做ForkJoinTask,但是實際上這個ForkJoinTask又比較原始柴灯,我們可以用RecursiveAction卖漫,這里邊又有兩種。
第一個RecursiveAction遞歸赠群,稱為不帶返回值的任務(wù)羊始,因為我們可以把大任務(wù)分割成小任務(wù),小任務(wù)又可以分成小任務(wù)查描,一直到我滿意的條件為止突委,這其中就帶有遞歸的過程。等會來看看一個例子冬三,所以我要對一百萬個數(shù)進行求和匀油,單線程肯定很慢。
第二個RecursiveTask勾笆,叫做帶返回值的子任務(wù)

package com.learn.thread.eight;

import java.io.IOException;
import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.RecursiveTask;

public class TestForkJoinPool {
    private static int[] nums = new int[1000000];
    // 按5萬個一組敌蚜,進行分組
    private static int MAX_SIZE = 500000;

    static Random random = new Random();

    static {
        for (int i = 0; i < 1000000; i++) {
            nums[i] = random.nextInt(100);
        }
        System.out.println("-----" + Arrays.stream(nums).sum());
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        // 無返回參數(shù)子任務(wù)
        AddTask addTask = new AddTask(0, nums.length);
//      forkJoinPool.execute(addTask);

        AddTaskRet addTaskRet = new AddTaskRet(0, nums.length);
        forkJoinPool.invoke(addTaskRet);
        System.out.println(addTaskRet.join());
        System.in.read();
    }

    /**
     * 不帶返回值的ForkJoinPool
     */
    static class AddTask extends RecursiveAction {
        private int start;

        private int end;

        public AddTask(int start, int end) {
            this.start = start;
            this.end = end;
        }

        @Override
        protected void compute() {
            // 如果是在分組內(nèi),開始計算窝爪,否則再分組
            if (end - start <= MAX_SIZE) {
                long sum = 0L;
                for (int i = start; i < end; i++) {
                    sum += nums[i];
                }
                System.out.println("from start" + start + "to end " + end + "sum =" + sum);
            } else {
                int middle = start + (end - start) / 2;
                AddTask addTask = new AddTask(start, middle);
                AddTask addTask1 = new AddTask(middle, end);
                addTask.fork();
                addTask1.fork();
            }
        }
    }

    /**
     * 不帶返回值的ForkJoinPool
     */
    static class AddTaskRet extends RecursiveTask<Long> {
        private int start;

        private int end;

        public AddTaskRet(int start, int end) {
            this.start = start;
            this.end = end;
        }

        @Override
        protected Long compute() {
            // 如果是在分組內(nèi)弛车,開始計算,否則再分組
            if (end - start <= MAX_SIZE) {
                long sum = 0L;
                for (int i = start; i < end; i++) {
                    sum += nums[i];
                }
                return sum;
            }
            int middle = start + (end - start) / 2;
            AddTaskRet addTask = new AddTaskRet(start, middle);
            AddTaskRet addTask1 = new AddTaskRet(middle, end);
            addTask.fork();
            addTask1.fork();
            return addTask.join() + addTask1.join();
        }
    }
}

6.parallelStream

java8 有一個并行流酸舍,底層就是ForkJoinPool算法來實現(xiàn)的帅韧。
你可以把集合里面的內(nèi)容想像成一個個河流往外流,在流經(jīng)過某個地方的時候處理一下啃勉。
舉例:我們new 了一個1000000 數(shù)據(jù)的集合忽舟,然后判斷這些數(shù)是不是質(zhì)數(shù),foreach是lamdba表達式的一個流式處理淮阐,還是相當于一個遍歷循環(huán)叮阅。
但是parallelStream并行流是并行的來處理這個任務(wù)切分成一個個子任務(wù),所以跟foreach會有一個時間上的差距泣特。所以在互相之間線程不需要同步的時候浩姥,你可以用這種并行流來處理效率會高一些

package com.learn.thread.eight;

import com.google.common.collect.RangeMap;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;

public class TestParallelStream {

    public static boolean isPrime(int num) {
        for (int i = 2; i < num/2; i++) {
            if (num % i ==0) {
                return false;
            }
        }
        return true;
    }
    public static void main(String[] args) {
        List<Integer> list = new ArrayList<>();
        Random random = new Random();
        for (int i = 0; i < 1000000; i++) {
            list.add(random.nextInt(1000000));
        }

        long start = System.currentTimeMillis();
        list.forEach(TestParallelStream::isPrime);
        long end = System.currentTimeMillis();
        System.out.println("ends" + (end - start));

        start = System.currentTimeMillis();
        list.parallelStream().forEach(TestParallelStream::isPrime);
        end = System.currentTimeMillis();
        System.out.println("ends" + (end - start));
    }
}

總結(jié)
線程池有兩種ThreadPoolService\ForkJoinPool
區(qū)別在于ThreadPoolService多個線程共享一個任務(wù)隊列,下面各個每個線程都有自己的任務(wù)隊列

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末状您,一起剝皮案震驚了整個濱河市勒叠,隨后出現(xiàn)的幾起案子兜挨,更是在濱河造成了極大的恐慌,老刑警劉巖眯分,帶你破解...
    沈念sama閱讀 212,816評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件拌汇,死亡現(xiàn)場離奇詭異,居然都是意外死亡弊决,警方通過查閱死者的電腦和手機噪舀,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,729評論 3 385
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來飘诗,“玉大人与倡,你說我怎么就攤上這事±ジ澹” “怎么了纺座?”我有些...
    開封第一講書人閱讀 158,300評論 0 348
  • 文/不壞的土叔 我叫張陵,是天一觀的道長溉潭。 經(jīng)常有香客問我比驻,道長,這世上最難降的妖魔是什么岛抄? 我笑而不...
    開封第一講書人閱讀 56,780評論 1 285
  • 正文 為了忘掉前任,我火速辦了婚禮狈茉,結(jié)果婚禮上夫椭,老公的妹妹穿的比我還像新娘。我一直安慰自己氯庆,他們只是感情好蹭秋,可當我...
    茶點故事閱讀 65,890評論 6 385
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著堤撵,像睡著了一般仁讨。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上实昨,一...
    開封第一講書人閱讀 50,084評論 1 291
  • 那天洞豁,我揣著相機與錄音,去河邊找鬼荒给。 笑死丈挟,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的志电。 我是一名探鬼主播曙咽,決...
    沈念sama閱讀 39,151評論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼挑辆!你這毒婦竟也來了例朱?” 一聲冷哼從身側(cè)響起孝情,我...
    開封第一講書人閱讀 37,912評論 0 268
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎洒嗤,沒想到半個月后箫荡,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,355評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡烁竭,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,666評論 2 327
  • 正文 我和宋清朗相戀三年菲茬,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片派撕。...
    茶點故事閱讀 38,809評論 1 341
  • 序言:一個原本活蹦亂跳的男人離奇死亡婉弹,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出终吼,到底是詐尸還是另有隱情镀赌,我是刑警寧澤,帶...
    沈念sama閱讀 34,504評論 4 334
  • 正文 年R本政府宣布际跪,位于F島的核電站商佛,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏姆打。R本人自食惡果不足惜良姆,卻給世界環(huán)境...
    茶點故事閱讀 40,150評論 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望幔戏。 院中可真熱鬧玛追,春花似錦、人聲如沸闲延。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,882評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽垒玲。三九已至陆馁,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間合愈,已是汗流浹背叮贩。 一陣腳步聲響...
    開封第一講書人閱讀 32,121評論 1 267
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留想暗,地道東北人妇汗。 一個月前我還...
    沈念sama閱讀 46,628評論 2 362
  • 正文 我出身青樓,卻偏偏與公主長得像说莫,于是被迫代替她去往敵國和親杨箭。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 43,724評論 2 351

推薦閱讀更多精彩內(nèi)容