Java線程池源碼分析

Java 線程池的使用作岖,是面試必問(wèn)的割以。下面我們來(lái)從使用到源碼整理一下义钉。

1、構(gòu)造線程池

  • 通過(guò)Executors來(lái)構(gòu)造線程池
1典唇、構(gòu)造一個(gè)固定線程數(shù)目的線程池,配置的corePoolSize與maximumPoolSize大小相同胯府,
同時(shí)使用了一個(gè)無(wú)界LinkedBlockingQueue存放阻塞任務(wù)介衔,因此多余的任務(wù)將存在阻塞隊(duì)列,
不會(huì)由RejectedExecutionHandler處理 
public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }


2盟劫、構(gòu)造一個(gè)緩沖功能的線程池夜牡,配置corePoolSize=0,maximumPoolSize=Integer.MAX_VALUE侣签,
keepAliveTime=60s,以及一個(gè)無(wú)容量的阻塞隊(duì)列 SynchronousQueue塘装,因此任務(wù)提交之后,
將會(huì)創(chuàng)建新的線程執(zhí)行影所;線程空閑超過(guò)60s將會(huì)銷毀 
 public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }


3蹦肴、構(gòu)造一個(gè)只支持一個(gè)線程的線程池,配置corePoolSize=maximumPoolSize=1猴娩,
無(wú)界阻塞隊(duì)列LinkedBlockingQueue阴幌;保證任務(wù)由一個(gè)線程串行執(zhí)行 
 public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

4勺阐、構(gòu)造有定時(shí)/延時(shí)功能的線程池,配置corePoolSize矛双,無(wú)界延遲阻塞隊(duì)列DelayedWorkQueue渊抽;
有意思的是:maximumPoolSize=Integer.MAX_VALUE,由于DelayedWorkQueue是無(wú)界隊(duì)列议忽,
所以這個(gè)值是沒(méi)有意義的
對(duì)于一些不能及時(shí)處理懒闷,需要延時(shí)處理的操作,用ScheduledExecutorService處理很方便栈幸,
比如我們?cè)谀承l件下需要清理redis/mysql中數(shù)據(jù)時(shí)愤估,但是可能當(dāng)前有些地方還需要用到(并發(fā)),這時(shí)用ScheduledExecutorService處理非常合適速址,
雖然也可以用定時(shí)任務(wù)處理玩焰,但是定時(shí)任務(wù)會(huì)一直執(zhí)行,而這里的場(chǎng)景是滿足一定條件去執(zhí)行芍锚,而執(zhí)行的機(jī)會(huì)又很少昔园。
 public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }

public static ScheduledExecutorService newScheduledThreadPool(
            int corePoolSize, ThreadFactory threadFactory) {
        return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
    }

public ScheduledThreadPoolExecutor(int corePoolSize,
                             ThreadFactory threadFactory) {
        super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
              new DelayedWorkQueue(), threadFactory);
    }

注:阻塞隊(duì)列與普通隊(duì)列的區(qū)別在于,當(dāng)隊(duì)列是空的時(shí)闹炉,從隊(duì)列中獲取元素的操作將會(huì)被阻塞蒿赢,
或者當(dāng)隊(duì)列是滿時(shí),往隊(duì)列里添加元素的操作會(huì)被阻塞渣触。
試圖從空的阻塞隊(duì)列中獲取元素的線程將會(huì)被阻塞羡棵,直到其他的線程往空的隊(duì)列插入新的元素。
同樣嗅钻,試圖往已滿的阻塞隊(duì)列中添加新元素的線程同樣也會(huì)被阻塞皂冰,直到其他的線程使隊(duì)列重新變得空閑起來(lái),
如從隊(duì)列中移除一個(gè)或者多個(gè)元素养篓,或者完全清空隊(duì)列.
  • 通過(guò)ThreadPoolExecutor自定義線程池
public ThreadPoolExecutor(int corePoolSize, 
                              int maximumPoolSize,  
                              long keepAliveTime,
                              TimeUnit unit,  
                              BlockingQueue<Runnable> workQueue, 
                              ThreadFactory threadFactory,  
                              RejectedExecutionHandler handler )
* corePoolSize 核心線程池大小----1
* maximumPoolSize 最大線程池大小----3
* keepAliveTime 線程池中超過(guò)corePoolSize數(shù)目的空閑線程最大存活時(shí)間----30
* keepAliveTime時(shí)間單位----TimeUnit.MINUTES 
* workQueue 阻塞隊(duì)列----new ArrayBlockingQueue<Runnable>(5)----阻塞隊(duì)列的容量是5
* threadFactory 新建線程工廠----new CustomThreadFactory()----定制的線程工廠 
* rejectedExecutionHandler 當(dāng)提交任務(wù)數(shù)超過(guò)maxmumPoolSize+workQueue之和(3+5)秃流,
      即當(dāng)提交第9個(gè)任務(wù)時(shí)(前面線程都沒(méi)有執(zhí)行完,此測(cè)試方法中用 sleep(30), 
      任務(wù)會(huì)交給RejectedExecutionHandler來(lái)處理 

new ThreadPoolExecutor(  
                1,  
                3,  
                30,  
                TimeUnit.MINUTES,  
                new ArrayBlockingQueue<Runnable>(5),  
                new CustomThreadFactory(),  
                new CustomRejectedExecutionHandler());  

package com.vendor.control.web.device;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Created by zhangkai on 2019/8/12.
 */
public class CustomThreadPoolExecutor
{
    private ThreadPoolExecutor pool = null;
    
    public void init() {
        pool = new ThreadPoolExecutor(
                1,
                3,
                30,
                TimeUnit.MINUTES,
                new ArrayBlockingQueue<Runnable>(5),
                new CustomThreadFactory(),
                new CustomRejectedExecutionHandler());
    }


    public void destory() {
        if(pool != null) {
            pool.shutdownNow();
        }
    }


    public ExecutorService getCustomThreadPoolExecutor() {
        return this.pool;
    }

    private class CustomThreadFactory implements ThreadFactory
    {

        private AtomicInteger count = new AtomicInteger(0);

        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            String threadName = CustomThreadPoolExecutor.class.getSimpleName() + count.addAndGet(1);
            System.out.println(threadName);
            t.setName(threadName);
            return t;
        }
    }


    private class CustomRejectedExecutionHandler implements RejectedExecutionHandler {

        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            try {
                // 當(dāng)使用blockingqueue的offer插入數(shù)據(jù)時(shí),如果隊(duì)列已滿柳弄,那么阻塞指定時(shí)間等待隊(duì)列可用舶胀,
//等待期間如果被中斷,那么拋出InterruptedException碧注。
                // 如果插入成功嚣伐,那么返回true,如果在達(dá)到指定時(shí)間后仍然隊(duì)列不可用萍丐,
//那么返回false轩端。===========超時(shí)退出
                // 使用put 時(shí),插入數(shù)據(jù)時(shí)逝变,如果隊(duì)列已滿基茵,那么阻塞等待隊(duì)列可用奋构,等待期間如果被中斷,
//那么拋出InterruptedException拱层。 =============  一直阻塞:
                System.out.println("拒絕任務(wù)");
                executor.getQueue().offer(r); //會(huì)有任務(wù)線程不執(zhí)行
                //executor.getQueue().put(r); //不會(huì)有任務(wù)線程不執(zhí)行
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }



    // 測(cè)試構(gòu)造的線程池
    public static void main(String[] args) {

        CustomThreadPoolExecutor exec = new CustomThreadPoolExecutor();
        // 1.初始化
        exec.init();

        ExecutorService pool = exec.getCustomThreadPoolExecutor();
        for(int i=1; i<=10; i++) {
            System.out.println("提交第" + i + "個(gè)任務(wù)!");
            pool.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        TimeUnit.SECONDS.sleep(30);
                        System.out.println(">>>task is running=====");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        }


        // 2.銷毀----此處不能銷毀,因?yàn)槿蝿?wù)沒(méi)有提交執(zhí)行完,如果銷毀線程池,任務(wù)也就無(wú)法執(zhí)行了
        // exec.destory();

        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

image.png

2弥臼、線程池執(zhí)行流程

源碼

 public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        // 1、工作線程 < 核心線程 
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        // 2舱呻、運(yùn)行態(tài)醋火,并嘗試將任務(wù)加入隊(duì)列悠汽;如果能加入箱吕,說(shuō)明隊(duì)列沒(méi)滿
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        } // 3、工作線程 < 核心線程柿冲,并且隊(duì)列滿了茬高,那么繼續(xù)新建線程,嘗試使用最大線程運(yùn)行
        else if (!addWorker(command, false))
            reject(command);
    }
image.png

從上面這個(gè)圖中可以看出假抄,在創(chuàng)建了線程池后怎栽,默認(rèn)情況下,線程池中并沒(méi)有任何線程宿饱,而是等待有任務(wù)到來(lái)才創(chuàng)建線程去執(zhí)行任務(wù)熏瞄,除非調(diào)用了prestartAllCoreThreads()或者prestartCoreThread()方法,從這2個(gè)方法的名字就可以看出谬以,是預(yù)創(chuàng)建線程的意思强饮,即在沒(méi)有任務(wù)到來(lái)之前就創(chuàng)建corePoolSize個(gè)線程或者一個(gè)線程。默認(rèn)情況下为黎,在創(chuàng)建了線程池后邮丰,線程池中的線程數(shù)為0,當(dāng)有任務(wù)來(lái)之后铭乾,并且工作線程數(shù)<核心線程數(shù)時(shí)剪廉,就會(huì)創(chuàng)建一個(gè)線程去執(zhí)行任務(wù),當(dāng)線程池中的線程數(shù)目達(dá)到corePoolSize后炕檩,就會(huì)把到達(dá)的任務(wù)放到緩存隊(duì)列當(dāng)中斗蒋;
在核心線程數(shù)創(chuàng)建以后,就不會(huì)再關(guān)閉了笛质,這個(gè)創(chuàng)建過(guò)程類似懶加載泉沾,只有需要用的時(shí)候才去創(chuàng)建

當(dāng)核心線程數(shù)執(zhí)行完其第一個(gè)任務(wù)以后,就會(huì)阻塞经瓷,等待從隊(duì)列中獲取任務(wù)(getTask)爆哑,獲取到的話,線程就繼續(xù)執(zhí)行任務(wù)舆吮。見(jiàn)下面runWorker源碼揭朝。

image.png

ThreadPoolExecutor執(zhí)行順序總結(jié):
當(dāng)線程數(shù)小于核心線程數(shù)時(shí)队贱,創(chuàng)建線程。
當(dāng)線程數(shù)大于等于核心線程數(shù)潭袱,且任務(wù)隊(duì)列未滿時(shí)柱嫌,將任務(wù)放入任務(wù)隊(duì)列。
當(dāng)線程數(shù)大于等于核心線程數(shù)屯换,且任務(wù)隊(duì)列已滿
若線程數(shù)小于最大線程數(shù)编丘,創(chuàng)建線程
若線程數(shù)等于最大線程數(shù),拋出異常彤悔,拒絕任務(wù)

image.png
image.png

簡(jiǎn)單的說(shuō)嘉抓,

  • addWorker(command, true): 創(chuàng)建核心線程執(zhí)行任務(wù);
  • addWorker(command, false):創(chuàng)建非核心線程執(zhí)行任務(wù)晕窑;
  • addWorker(null, false): 創(chuàng)建非核心線程抑片,當(dāng)前任務(wù)為空;
  • addWorker(null,true) : 預(yù)先創(chuàng)建corePoolSize個(gè)線程杨赤;

addWorker源碼

    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    //啟動(dòng)線程敞斋,執(zhí)行任務(wù),這里調(diào)用的其實(shí)是worker的run方法疾牲,見(jiàn)下面Worker構(gòu)造方法
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }


//Worker構(gòu)造方法植捎,thread變量構(gòu)造的線程是它本身,即當(dāng)調(diào)用Worker中thread.start()時(shí)阳柔,
//最終執(zhí)行的是Worker類的run方法
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }


//執(zhí)行任務(wù)時(shí)焰枢,最后調(diào)用的方法
    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            //第一次執(zhí)行task時(shí),task肯定不為空盔沫,當(dāng)firstTask執(zhí)行完以后医咨,while循環(huán)等待,
            //指導(dǎo)從隊(duì)列中獲取到task架诞,即getTask()不為空時(shí)拟淮,getTask就是從隊(duì)列中獲取任務(wù)
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

3、拒絕策略

當(dāng)線程池的任務(wù)緩存隊(duì)列已滿并且線程池中的線程數(shù)目達(dá)到maximumPoolSize時(shí)谴忧,如果還有任務(wù)到來(lái)就會(huì)采取任務(wù)拒絕策略很泊,通常有以下四種策略:

  • ThreadPoolExecutor.AbortPolicy:丟棄任務(wù)并拋出RejectedExecutionException異常。
  • ThreadPoolExecutor.DiscardPolicy:丟棄任務(wù)沾谓,但是不拋出異常委造。
  • ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊(duì)列最前面的任務(wù),然后重新提交被拒絕的任務(wù)
  • ThreadPoolExecutor.CallerRunsPolicy:由調(diào)用線程(提交任務(wù)的線程)處理該任務(wù)

線程池的默認(rèn)拒絕策略為AbortPolicy均驶,即丟棄任務(wù)并拋出RejectedExecutionException異常昏兆。

4、線程池優(yōu)雅關(guān)閉

從源碼中可以看到妇穴,有兩種關(guān)閉方式爬虱,shutdown和shutdownNow隶债。

  • executorService.shutdown():線程池拒接收新提交的任務(wù),同時(shí)立馬關(guān)閉線程池跑筝,線程池里的任務(wù)不再執(zhí)行死讹。
    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            //設(shè)置線程池狀態(tài)為SHUTDOWN,之后就不能再向線程池提交任務(wù)了
            advanceRunState(SHUTDOWN);
            //遍歷所有未執(zhí)行任務(wù)的線程曲梗,對(duì)其設(shè)置中斷
            interruptIdleWorkers();
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
       //判斷所有任務(wù)是否都已退出赞警,如果退出則設(shè)置標(biāo)記 “TERMINATED”
      //在這里會(huì)死循環(huán)一直等到所有線程都執(zhí)行完任務(wù)后,再次中斷線程
        tryTerminate();
    }

private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers) {
                Thread t = w.thread;
//中斷時(shí)虏两,先獲取鎖愧旦,runWorker中執(zhí)行任務(wù)時(shí),會(huì)先lock加鎖(見(jiàn)上面runWorker源碼)
//所以碘举,這里其實(shí)只會(huì)對(duì)中斷空閑線程
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
            }
        } finally {
            mainLock.unlock();
        }
    }

從上面的源碼中可以看出忘瓦,當(dāng)我們調(diào)用線程池的shuwdown方法時(shí),如果線程正在執(zhí)行線程池里的任務(wù)引颈,即便任務(wù)處于阻塞狀態(tài),線程也不會(huì)被中斷境蜕,而是繼續(xù)執(zhí)行(因?yàn)橛屑渔i蝙场,所以interruptIdleWorkers中worker獲取不到鎖,所以執(zhí)行不了中斷)粱年。
如果線程池阻塞等待從隊(duì)列里讀取任務(wù)getTask()售滤,則會(huì)被喚醒,但是會(huì)繼續(xù)判斷隊(duì)列是否為空台诗,如果不為空會(huì)繼續(xù)從隊(duì)列里讀取任務(wù)完箩,為空則線程退出。

  • executorService.shutdownNow():線程池拒接收新提交的任務(wù)拉队,同時(shí)立馬關(guān)閉線程池弊知,線程池里的任務(wù)不再執(zhí)行。
  public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
//修改線程池的狀態(tài)為STOP狀態(tài)
            advanceRunState(STOP);
//遍歷線程池里的所有工作線程粱快,然后調(diào)用線程的interrupt方法
            interruptWorkers();
//將隊(duì)列里還沒(méi)有執(zhí)行的任務(wù)放到列表里秩彤,返回給調(diào)用方
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        return tasks;
    }


    private void interruptWorkers() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers)
                //直接中斷
                w.interruptIfStarted();
        } finally {
            mainLock.unlock();
        }
    }

    private List<Runnable> drainQueue() {
        BlockingQueue<Runnable> q = workQueue;
        ArrayList<Runnable> taskList = new ArrayList<Runnable>();
        q.drainTo(taskList);
        if (!q.isEmpty()) {
            for (Runnable r : q.toArray(new Runnable[0])) {
//移除工作隊(duì)列中任務(wù),同時(shí)把其返回給調(diào)用方
                if (q.remove(r))
                    taskList.add(r);
            }
        }
        return taskList;
    }

當(dāng)我們調(diào)用線程池的shutdownNow時(shí)事哭,如果線程正在getTask方法中執(zhí)行漫雷,則會(huì)通過(guò)for循環(huán)進(jìn)入到if語(yǔ)句,于是getTask返回null鳍咱,從而線程退出(getTask源碼中會(huì)先判斷線程狀態(tài)降盹,而上一步已經(jīng)把線程狀態(tài)修改為STOP了)。不管線程池里是否有未完成的任務(wù)谤辜。

如果線程因?yàn)閳?zhí)行提交到線程池里的任務(wù)而處于阻塞狀態(tài)蓄坏,則會(huì)導(dǎo)致報(bào)錯(cuò)(如果任務(wù)里沒(méi)有捕獲InterruptedException異常)仅胞,否則線程會(huì)執(zhí)行完當(dāng)前任務(wù),然后通過(guò)getTask方法返回為null來(lái)退出剑辫。

總結(jié):調(diào)用完shutdownNow和shuwdown方法后干旧,并不代表線程池已經(jīng)完成關(guān)閉操作,它只是異步的通知線程池進(jìn)行關(guān)閉處理妹蔽。如果要同步等待線程池徹底關(guān)閉后才繼續(xù)往下執(zhí)行椎眯,需要調(diào)用awaitTermination方法進(jìn)行同步等待。

5胳岂、ThreadPoolExecutor參數(shù)設(shè)置

5.1 默認(rèn)值

- corePoolSize=1
- queueCapacity=Integer.MAX_VALUE
- maxPoolSize=Integer.MAX_VALUE
- keepAliveTime=60s
- allowCoreThreadTimeout=false
- rejectedExecutionHandler=AbortPolicy()

5.2 自定義線程池參數(shù)的合理設(shè)置

為了說(shuō)明合理設(shè)置的條件编整,我們首先確定有以下?個(gè)相關(guān)參數(shù):

  • 1.tasks,程序每秒需要處理的最?任務(wù)數(shù)量(假設(shè)系統(tǒng)每秒任務(wù)數(shù)為100~1000)
  • 2.tasktime乳丰,單線程處理?個(gè)任務(wù)所需要的時(shí)間(每個(gè)任務(wù)耗時(shí)0.1秒)
  • 3.responsetime掌测,系統(tǒng)允許任務(wù)最?的響應(yīng)時(shí)間(每個(gè)任務(wù)的響應(yīng)時(shí)間不得超過(guò)2秒)

corePoolSize:核心線程數(shù)

每個(gè)任務(wù)需要tasktime秒處理,則每個(gè)線程每秒可處理1/tasktime個(gè)任務(wù)产园。系統(tǒng)每秒有tasks個(gè)任務(wù)需要處理汞斧,則需要的線程數(shù)為:tasks/(1/tasktime),即tasks*tasktime個(gè)線程數(shù)什燕。
假設(shè)系統(tǒng)每秒任務(wù)數(shù)為100到1000之間粘勒,每個(gè)任務(wù)耗時(shí)0.1秒,則需要100x0.1?1000x0.1屎即,即10到100個(gè)線程庙睡。

那么corePoolSize應(yīng)該設(shè)置為大于10。具體數(shù)字最好根據(jù)8020原則技俐,即80%情況下系統(tǒng)每秒任務(wù)數(shù)乘陪,若系統(tǒng)80%的情況下任務(wù)數(shù)小于200,最多時(shí)為1000雕擂,則corePoolSize可設(shè)置為20啡邑。

queueCapacity:任務(wù)隊(duì)列的長(zhǎng)度:

任務(wù)隊(duì)列的長(zhǎng)度要根據(jù)核心線程數(shù),以及系統(tǒng)對(duì)任務(wù)響應(yīng)時(shí)間的要求有關(guān)捂刺。隊(duì)列長(zhǎng)度可以設(shè)置為(corePoolSize/tasktime) * responsetime=(20/0.1) * 2=400谣拣,即隊(duì)列長(zhǎng)度可設(shè)置為400。

如果隊(duì)列長(zhǎng)度設(shè)置過(guò)?族展,會(huì)導(dǎo)致任務(wù)響應(yīng)時(shí)間過(guò)長(zhǎng)森缠,如以下寫(xiě)法:
LinkedBlockingQueue queue = new LinkedBlockingQueue();
這實(shí)際上是將隊(duì)列長(zhǎng)度設(shè)置為Integer.MAX_VALUE,將會(huì)導(dǎo)致線程數(shù)量永遠(yuǎn)為corePoolSize仪缸,再也不會(huì)增加贵涵,當(dāng)任務(wù)數(shù)量陡增時(shí),任務(wù)響應(yīng)時(shí)間也將隨之陡增。

maxPoolSize:最大線程數(shù)

當(dāng)系統(tǒng)負(fù)載達(dá)到最?值時(shí)宾茂,核心線程數(shù)已無(wú)法按時(shí)處理完所有任務(wù)瓷马,這時(shí)就需要增加線程。
每秒200個(gè)任務(wù)需要20個(gè)線程跨晴,那么當(dāng)每秒達(dá)到1000個(gè)任務(wù)時(shí)欧聘,則需要(1000-queueCapacity) * 0.1,即60個(gè)線程端盆,可將maxPoolSize設(shè)置為60怀骤。

keepAliveTime:

線程數(shù)量只增加不減少也不?。當(dāng)負(fù)載降低時(shí)焕妙,可減少線程數(shù)量蒋伦,如果?個(gè)線程空閑時(shí)間達(dá)到keepAliveTiime,該線程就退出焚鹊。默認(rèn)情況下線程池最少會(huì)保持corePoolSize個(gè)線程痕届。keepAliveTiime設(shè)定值可根據(jù)任務(wù)峰值持續(xù)時(shí)間來(lái)設(shè)定。

rejectedExecutionHandler:

根據(jù)具體情況來(lái)決定末患,任務(wù)不重要可丟棄研叫,任務(wù)重要?jiǎng)t要利用一些緩沖機(jī)制來(lái)處理

以上關(guān)于線程數(shù)量的計(jì)算并沒(méi)有考慮CPU的情況。若結(jié)合CPU的情況阻塑,比如蓝撇,當(dāng)線程數(shù)量達(dá)到50時(shí),CPU達(dá)到100%陈莽,則將maxPoolSize設(shè)置為60也不合適,此時(shí)若系統(tǒng)負(fù)載長(zhǎng)時(shí)間維持在每秒1000個(gè)任務(wù)虽抄,則超出線程池處理能?走搁,應(yīng)設(shè)法降低每個(gè)任務(wù)的處理時(shí)間(tasktime)。

補(bǔ)充:線程中斷

在程序中迈窟,我們是不能隨便中斷一個(gè)線程的私植,因?yàn)檫@是極其不安全的操作,我們無(wú)法知道這個(gè)線程正運(yùn)行在什么狀態(tài)车酣,它可能持有某把鎖曲稼,強(qiáng)行中斷可能導(dǎo)致鎖不能釋放的問(wèn)題;或者線程可能在操作數(shù)據(jù)庫(kù)湖员,強(qiáng)行中斷導(dǎo)致數(shù)據(jù)不一致混亂的問(wèn)題贫悄。正因此,JAVA里將Thread的stop方法設(shè)置為過(guò)時(shí)娘摔,以禁止大家使用窄坦。

一個(gè)線程什么時(shí)候可以退出呢?當(dāng)然只有線程自己才能知道。

所以我們這里要說(shuō)的Thread的interrrupt方法鸭津,本質(zhì)不是用來(lái)中斷一個(gè)線程彤侍。是將線程設(shè)置一個(gè)中斷狀態(tài)。

當(dāng)我們調(diào)用線程的interrupt方法逆趋,它有兩個(gè)作用:

  • 1盏阶、如果此線程處于阻塞狀態(tài)(比如調(diào)用了wait方法,io等待)闻书,則會(huì)立馬退出阻塞名斟,并拋出InterruptedException異常,線程就可以通過(guò)捕獲InterruptedException來(lái)做一定的處理惠窄,然后讓線程退出蒸眠。
  • 2、如果此線程正處于運(yùn)行之中杆融,則線程不受任何影響楞卡,繼續(xù)運(yùn)行,僅僅是線程的中斷標(biāo)記被設(shè)置為true脾歇。所以線程要在適當(dāng)?shù)奈恢猛ㄟ^(guò)調(diào)用isInterrupted方法來(lái)查看自己是否被中斷蒋腮,并做退出操作。

如果線程的interrupt方法先被調(diào)用藕各,然后線程調(diào)用阻塞方法進(jìn)入阻塞狀態(tài)池摧,InterruptedException異常依舊會(huì)拋出。
如果線程捕獲InterruptedException異常后激况,繼續(xù)調(diào)用阻塞方法作彤,將不再觸發(fā)InterruptedException異常。

參考:
http://www.reibang.com/p/f030aa5d7a28

http://www.reibang.com/p/23cb8b903d2c

https://www.cnblogs.com/zedosu/p/6665306.html

https://blog.csdn.net/mayongzhan_csdn/article/details/80790966

http://www.reibang.com/p/c41e942bcd64

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末乌逐,一起剝皮案震驚了整個(gè)濱河市竭讳,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌浙踢,老刑警劉巖绢慢,帶你破解...
    沈念sama閱讀 218,204評(píng)論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異洛波,居然都是意外死亡胰舆,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,091評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門(mén)蹬挤,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)缚窿,“玉大人,你說(shuō)我怎么就攤上這事闻伶”豕ィ” “怎么了够话?”我有些...
    開(kāi)封第一講書(shū)人閱讀 164,548評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)光绕。 經(jīng)常有香客問(wèn)我女嘲,道長(zhǎng),這世上最難降的妖魔是什么诞帐? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,657評(píng)論 1 293
  • 正文 為了忘掉前任欣尼,我火速辦了婚禮,結(jié)果婚禮上停蕉,老公的妹妹穿的比我還像新娘愕鼓。我一直安慰自己,他們只是感情好慧起,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,689評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布菇晃。 她就那樣靜靜地躺著,像睡著了一般蚓挤。 火紅的嫁衣襯著肌膚如雪磺送。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書(shū)人閱讀 51,554評(píng)論 1 305
  • 那天灿意,我揣著相機(jī)與錄音估灿,去河邊找鬼。 笑死缤剧,一個(gè)胖子當(dāng)著我的面吹牛馅袁,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播荒辕,決...
    沈念sama閱讀 40,302評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼汗销,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了抵窒?” 一聲冷哼從身側(cè)響起大溜,我...
    開(kāi)封第一講書(shū)人閱讀 39,216評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎估脆,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體座云,經(jīng)...
    沈念sama閱讀 45,661評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡疙赠,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,851評(píng)論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了朦拖。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片圃阳。...
    茶點(diǎn)故事閱讀 39,977評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖璧帝,靈堂內(nèi)的尸體忽然破棺而出捍岳,到底是詐尸還是另有隱情,我是刑警寧澤,帶...
    沈念sama閱讀 35,697評(píng)論 5 347
  • 正文 年R本政府宣布锣夹,位于F島的核電站页徐,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏银萍。R本人自食惡果不足惜变勇,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,306評(píng)論 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望贴唇。 院中可真熱鬧搀绣,春花似錦、人聲如沸戳气。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,898評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)瓶您。三九已至麻捻,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間览闰,已是汗流浹背芯肤。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,019評(píng)論 1 270
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留压鉴,地道東北人崖咨。 一個(gè)月前我還...
    沈念sama閱讀 48,138評(píng)論 3 370
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像油吭,于是被迫代替她去往敵國(guó)和親击蹲。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,927評(píng)論 2 355