java多線程蒲列,線程池,多線程工具

線程的基本操作

用new關(guān)鍵字創(chuàng)建一個線程對象搀罢,然后調(diào)用它的start()啟動線程即可嫉嘀。

Thread thread  = new Thread();
thread.start();

線程有個run()方法,start()會創(chuàng)建一個新的線程并讓這個線程執(zhí)行run()方法魄揉。

//這種run()方法調(diào)用不能新建一個線程亏娜,而是在當(dāng)前線程中調(diào)用run()方法蕴坪,
//將run方法只是作為一個普通的方法調(diào)用。
Thread thread1  = new Thread();
thread1.run();

start方法是啟動一個線程夹抗,run方法只會在當(dāng)前線程中串行的執(zhí)行run方法中的代碼杰标。

Thread thread = new Thread(){
    @Override
    public void run() {
        System.out.println("xxxxxxx");
    }
};
thread.start();

通過繼承Thread類兵怯,然后重寫run方法,來自定義一個線程腔剂。
java中剛好提供了Runnable接口來自定義一個線程媒区。

@FunctionalInterface
public interface Runnable {
    public abstract void run();
}

Thread類有一個非常重要的構(gòu)造方法:

public Thread(Runnable target)

Thread的run方法:

@Override
public void run() {
    if (target != null) {
        target.run();
    }
}

實現(xiàn)Runnable接口是比較常見的做法,也是推薦的做法掸犬。

Thread thread2 =new Thread(new Runnable() {
      @Override
      public void run() {

      }
});
thread2.start();


線程狀態(tài) Thread

6個狀態(tài)定義:java.lang.Thread.State


image.png

1.NEW : 尚未啟動的線程的線程狀態(tài)

2.RUNNABLE :可運行線程的線程狀態(tài)袜漩,等待cpu調(diào)度

3.BLOCKED:線程阻塞等待監(jiān)視器鎖定的線程狀態(tài),處在synchronized同步代碼塊或方法中被阻塞湾碎。

4.WAITING:等待線程的線程狀態(tài)宙攻,如 不帶超時的方式:Object.wait,Thread.join,LockSupport.park進入這個狀態(tài)

5.TIMED_WAITING:具有指定等待時間的等待線程的此案陳狀態(tài),如 帶超時方式;Thread.sleep,Object.wait,Thread.join,LockSupport.parkNanos,LockSupport.parkUntil進入這個狀態(tài)

6.TERMINATED:終止線程的線程狀態(tài)介褥。線程正常完成執(zhí)行或出現(xiàn)異常

線程終止

正確的線程中止 interrupter

如果目標(biāo)線程在調(diào)用Object class的wait()座掘,wait(long)或wait(long,int)方法递惋,join(),join(long,int),或sleep(long,int)方法是被阻塞,那么Interrupt會生效溢陪,該線程的中斷狀態(tài)將被清除萍虽,拋出InterrupttedException異常。

如果目標(biāo)線程是被I/O或者NIO中的Channel鎖阻塞形真,同樣杉编,I/O操作會被中斷或者返回特殊異常值,達到終止線程的目的没酣。

如果以上條件都不滿足王财,則會設(shè)置此線程的中斷狀態(tài)。

正確的線程中止 標(biāo)志位

在代碼邏輯中裕便,增加一個判斷绒净,用來控制。

Thread提供了3個與線程中斷有關(guān)的方法:

public void interrupt()//中斷線程
public boolean isInterrupted()//判斷線程是否中斷
public static boolean interrupted()//判斷線程是否被中斷偿衰,并清除當(dāng)前中斷狀態(tài)
public static void main(String[] args) throws InterruptedException {
        Thread thread2 =new Thread(){
            @Override
            public void run() {
                while (true){
                    if (this.interrupted()){
                        break;
                    }
                }
            }
        };
        thread2.start();
        TimeUnit.SECONDS.sleep(1);
        thread2.interrupt();//中斷
    }

如果一個線程調(diào)用了sleep方法挂疆,一直處于休眠狀態(tài),通過變量控制下翎,不可以中斷線程缤言。此時只能使用線程提供的interrupt方法來中斷線程了。

 Thread thread2 =new Thread(){
            @Override
            public void run() {
                while (true){
                    try {
                        TimeUnit.SECONDS.sleep(100);
                    } catch (InterruptedException e) {
                        this.interrupt();//拋異常视事,發(fā)出終止線程
                        e.printStackTrace();
                    }
                    if (this.isInterrupted()){
                        break;
                    }
                }
            }
        };
        thread2.setName("thread2");
        thread2.start();
        TimeUnit.SECONDS.sleep(1);
        thread2.interrupt();//中斷
}

sleep方法由于中斷而拋出異常之后胆萧,線程的中斷標(biāo)志會被清除(置為false),所以在異常中需要執(zhí)行this.interrupt()方法俐东,將中斷標(biāo)志位置為true.

線程通信

通信方式:

1)文件共享
2)網(wǎng)絡(luò)共享
3)共享變量
4)jdk提供的線程協(xié)調(diào)API

線程協(xié)作 JDK API

多線程協(xié)作的典型場景:生產(chǎn)者 - 消費者 (線程的阻塞跌穗,線程喚醒)

等待(wait)和通知(notify)
為了支持 多線程 之間的協(xié)作,JDK提供了兩個非常重要的方法:等待wait()方法和通知notify()方法虏辫。在Object類中定義的蚌吸。這意味著所有的對象都可以調(diào)用者兩個方法。java.lang.Object

public final void wait() throws InterruptedException
public final native void notify();//隨機喚醒

當(dāng)在一個對象實例上調(diào)用wait()方法后砌庄,當(dāng)前線程就會在這個對象上等待羹唠。

比如在線程A中,調(diào)用了obj.wait()方法娄昆,那么線程A就會停止繼續(xù)執(zhí)行佩微,轉(zhuǎn)為等待狀態(tài)。等待到什么時候結(jié)束呢萌焰?線程A會一直等到其他線程調(diào)用obj.notify()方法為止喊衫,這時,obj對象成為了多個線程之間的有效通信手段杆怕。

如果一個線程調(diào)用了object.wait()方法族购,那么它就會進出object對象的等待隊列壳贪。這個隊列中,可能會有多個線程寝杖,因為系統(tǒng)可能運行多個線程同時等待某一個對象违施。

當(dāng)object.notify()方法被調(diào)用時,它就會從這個隊列中 隨機 選擇一個線程瑟幕,并將其喚醒磕蒲。這個選擇是不公平.

Object獨享還有一個nofiyAll()方法,它和notify()方法的功能類似只盹,不同的是辣往,它會喚醒在這個等待隊列中所有等待的線程,而不是隨機選擇一個殖卑。

Object.wait()方法并不能隨便調(diào)用站削。它必須包含在對應(yīng)的synchronize語句匯總,無論是wait()方法或者notify()方法都需要首先獲取目標(biāo)獨享的一個監(jiān)視器孵稽。
Object.wait()方法和Thread.sleeep()方法都可以讓現(xiàn)場等待若干時間许起。除wait()方法可以被喚醒外,另外一個主要的區(qū)別就是wait()方法會釋放目標(biāo)對象的鎖菩鲜,而Thread.sleep()方法不會釋放鎖园细。

park/unpark機制
線程調(diào)用park則等待“許可”,unpark方法為指定線程
提供“許可permit”接校。
不要求 park和unpark方法的調(diào)用順序猛频。
調(diào)了幾次park,就得調(diào)幾次unpark

線程封閉

數(shù)據(jù)都被封閉在各自的線程之中蛛勉,就不需要同步鹿寻,這種通過將數(shù)據(jù)封閉在線程中而避免使用同步的技術(shù)即線程封閉。
ThreadLocal
ThreadLocal是java中的一種特殊的變量董习,每個線程都有一個ThreadLocal就是每個線程都擁有了自己的一個變量,競爭條件被徹底消除了爱只,在并發(fā)模式下是絕對安全的變量皿淋。

用法:

  ThreadLocal<T> var = new ThreadLocal<T>();

會自動在每一個線程上創(chuàng)建一個T的副本,副本之間彼此獨立恬试,互不影響窝趣。可以用ThreadLocal存儲一些參數(shù)训柴,以便在線程中多個方法中使用哑舒,用來代替方法傳參的做法。

棧封閉
局部變量的固有屬性之一即是封閉在線程中幻馁,它們位于執(zhí)行線程的棧中洗鸵,其它線程無法訪問這個棧越锈。

線程池及原理

volatile修飾共享變量

java幫我們提供了這樣的方法,使用volatile修飾共享變量膘滨,被volatile修改的變量有以下特點:

1.線程中讀取的時候甘凭,每次讀取都會去主內(nèi)存中讀取共享變量最新的值,然后將其復(fù)制到工作內(nèi)存

2.線程中修改了工作內(nèi)存中變量的副本火邓,修改之后會立即刷新到主內(nèi)存

volatile解決了共享變量在多線程中可見性的問題丹弱,可見性是指一個線程對共享變量的修改,對于另一個線程來說是否是可以看到的铲咨。

volatile不能保證線程安全躲胳,只能保證被修飾變量的內(nèi)存可見性,如果對該變量執(zhí)行的是非原子性操作依舊線程不安全纤勒。

什么是線程池?

如果系統(tǒng)能夠提前為我們創(chuàng)建好線程坯苹,我們需要的時候直接拿來使用,用完之后不是直接將其關(guān)閉踊东,而是將其返回到線程中中北滥,給其他需要這使用,這樣直接節(jié)省了創(chuàng)建和銷毀的時間闸翅,提升了系統(tǒng)的性能再芋。

線程池實現(xiàn)原理

當(dāng)向線程池提交一個任務(wù)之后,線程池的處理流程如下:

  1. 判斷是否達到核心線程數(shù)坚冀,若未達到济赎,則直接創(chuàng)建新的線程處理當(dāng)前傳入的任務(wù),否則進入下個流程
  2. 線程池中的工作隊列是否已滿记某,若未滿司训,則將任務(wù)丟入工作隊列中先存著等待處理,否則進入下個流程
  3. 是否達到最大線程數(shù)液南,若未達到壳猜,則創(chuàng)建新的線程處理當(dāng)前傳入的任務(wù),否則交給線程池中的飽和策略進行處理滑凉。
    image.png

jdk中提供了線程池的具體實現(xiàn)统扳,實現(xiàn)類是:java.util.concurrent.ThreadPoolExecutor,主要構(gòu)造方法:ThreadPoolExecutor

類型 類名 描述
接口 Executor 最上層的接口畅姊,定義了執(zhí)行任務(wù)的 execute
接口 ExecutorService 繼承Executor接口咒钟,擴展Callable,Tutrue若未,關(guān)閉方法
接口 ScheduledExecutorService 繼承ExecutorService朱嘴,增加定時任務(wù)相關(guān)方法
實現(xiàn)類 ThreadPoolExecutor 基礎(chǔ),標(biāo)準(zhǔn)的線程池實現(xiàn)
實現(xiàn)類 ScheduledThreadPoolExecutor 繼承ThreadPoolExecutor粗合,實現(xiàn)ScheduledExecutorService定時任務(wù)的方法
image.png

ExecutorService

//優(yōu)雅關(guān)閉線程萍嬉,之前提交的任務(wù)繼續(xù)執(zhí)行乌昔,但不接受新的任務(wù)
void shutdown();
//嘗試停止所有正在執(zhí)行的任務(wù),停止等待任務(wù)的處理帚湘,并返回等待執(zhí)行任務(wù)的列表
List<Runnable> shutdownNow();
//如果關(guān)閉后所有任務(wù)都已完成玫荣,則返回true
boolean isTerminated();
//監(jiān)測線程池是否關(guān)閉,直到所有任務(wù)完成執(zhí)行大诸,或超時發(fā)生捅厂,或當(dāng)前線程被中斷
boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;
//提交一個用于執(zhí)行Callable返回任務(wù),并返回一個Future资柔,用獲取Callable執(zhí)行結(jié)果
<T> Future<T> submit(Callable<T> task);
//提交一個運行任務(wù)執(zhí)行焙贷,并返回一個Future, 執(zhí)行結(jié)果為傳入result
<T> Future<T> submit(Runnable task, T result);
//提交一個運行任務(wù)執(zhí)行贿堰,并返回一個Future辙芍, 執(zhí)行結(jié)果null
Future<?> submit(Runnable task);
//執(zhí)行給定的任務(wù)集合,執(zhí)行完畢后返回結(jié)果
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;
//執(zhí)行給定的任務(wù)集合羹与,執(zhí)行完畢或超時故硅,返回結(jié)果,其它任務(wù)終止
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;
//執(zhí)行給定的任務(wù),任意一個執(zhí)行成功纵搁,則返回結(jié)果吃衅,其它終止
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;
//執(zhí)行給定的任務(wù),任意一個執(zhí)行成功或超時,則返回結(jié)果腾誉,其它終止
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;

ScheduledExecutorService

//創(chuàng)建并執(zhí)行一個一次性任務(wù)徘层,過了延遲時間會被執(zhí)行
public ScheduledFuture<?> schedule(Runnable command,
                                       long delay, TimeUnit unit);
//創(chuàng)建并執(zhí)行一個一次性任務(wù),過了延遲時間會被執(zhí)行
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                           long delay, TimeUnit unit);
//創(chuàng)建并執(zhí)行一個周期性任務(wù)利职,過了給定初始延遲時間趣效,會第一次被執(zhí)行,執(zhí)行過程發(fā)生異常猪贪,那么任務(wù)終止
//一次性任務(wù)執(zhí)行超過了周期時間跷敬,下一次任務(wù)會等到該任務(wù)執(zhí)行結(jié)束后,立刻執(zhí)行
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit);
//創(chuàng)建并執(zhí)行一個周期性任務(wù)热押,過了給定初始延遲時間西傀,第一次被執(zhí)行,后續(xù)以給定的周期時間執(zhí)行
//執(zhí)行過程中發(fā)生異常楞黄,那么任務(wù)就停止池凄。
//一次性任務(wù)執(zhí)行超過了周期時間抡驼,下一次任務(wù)在該任務(wù)執(zhí)行結(jié)束的時間基礎(chǔ)上鬼廓,計算執(zhí)行延時
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit);
ThreadPoolExecutor
image.png

image.png

image.png
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}

corePoolSize:核心線程大小,當(dāng)提交一個任務(wù)到線程池時致盟,線程池會創(chuàng)建一個線程來執(zhí)行任務(wù)碎税,即使有其他空閑線程可以處理任務(wù)也會創(chuàng)新線程尤慰,等到工作的線程數(shù)大于核心線程數(shù)時就不會在創(chuàng)建了。如果調(diào)用了線程池的prestartAllCoreThreads方法雷蹂,線程池會提前把核心線程都創(chuàng)造好伟端,并啟動

maximumPoolSize:線程池允許創(chuàng)建的最大線程數(shù)。如果隊列滿了匪煌,并且以創(chuàng)建的線程數(shù)小于最大線程數(shù)责蝠,則線程池會再創(chuàng)建新的線程執(zhí)行任務(wù)。如果我們使用了無界隊列萎庭,那么所有的任務(wù)會加入隊列霜医,這個參數(shù)就沒有什么效果了

keepAliveTime:線程池的工作線程空閑后,保持存活的時間驳规。如果沒有任務(wù)處理了肴敛,有些線程會空閑,空閑的時間超過了這個值吗购,會被回收掉医男。如果任務(wù)很多,并且每個任務(wù)的執(zhí)行時間比較短捻勉,避免線程重復(fù)創(chuàng)建和回收镀梭,可以調(diào)大這個時間,提高線程的利用率

unit:keepAliveTIme的時間單位贯底,可以選擇的單位有天丰辣、小時、分鐘禽捆、毫秒笙什、微妙、千分之一毫秒和納秒胚想。類型是一個枚舉java.util.concurrent.TimeUnit琐凭,這個枚舉也經(jīng)常使用,有興趣的可以看一下其源碼

workQueue:工作隊列浊服,用于緩存待處理任務(wù)的阻塞隊列统屈,常見的有4種,本文后面有介紹

threadFactory:線程池中創(chuàng)建線程的工廠牙躺,可以通過線程工廠給每個創(chuàng)建出來的線程設(shè)置更有意義的名字

handler:飽和策略愁憔,當(dāng)線程池?zé)o法處理新來的任務(wù)了,那么需要提供一種策略處理提交的新任務(wù)孽拷,默認(rèn)有4種策略

調(diào)用線程池的execute方法處理任務(wù)吨掌,執(zhí)行execute方法的過程:
  1. 判斷線程池中運行的線程數(shù)是否小于corepoolsize,是:則創(chuàng)建新的線程來處理任務(wù),否:執(zhí)行下一步
  2. 試圖將任務(wù)添加到workQueue指定的隊列中膜宋,如果無法添加到隊列窿侈,進入下一步
  3. 判斷線程池中運行的線程數(shù)是否小于maximumPoolSize,是:則新增線程處理當(dāng)前傳入的任務(wù)秋茫,否:將任務(wù)傳遞給handler對象rejectedExecution方法處理
線程池的使用步驟:
  1. 調(diào)用構(gòu)造方法創(chuàng)建線程池
  2. 調(diào)用線程池的方法處理任務(wù)
  3. 關(guān)閉線程池
/**
 * ThreadPoolExecutor 線程池
 *
 */
public class ThreadPoolExecutorDemo {

    static ThreadPoolExecutor executor = new ThreadPoolExecutor(3,
            5,
            10,
            TimeUnit.SECONDS,
            new ArrayBlockingQueue<Runnable>(10),
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.AbortPolicy());

    public static void main(String[] args) {
        for (int i=0;i<10;i++){
            int j = i;
            String taskName = "任務(wù)"+j;
            executor.execute(() -> {
                try {
                    TimeUnit.SECONDS.sleep(j);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName()+taskName+"處理完畢");
            });

        }
        executor.shutdown();
    }

}

線程池中常見5種工作隊列

任務(wù)太多的時候史简,工作隊列用于暫時緩存待處理的任務(wù),jdk中常見的5種阻塞隊列:

ArrayBlockingQueue:是一個基于數(shù)組結(jié)構(gòu)的有界阻塞隊列肛著,此隊列按照先進先出原則對元素進行排序

LinkedBlockingQueue:是一個基于鏈表結(jié)構(gòu)阻塞隊列圆兵,此隊列按照先進先出排序元素,吞吐量通常要高于ArrayBlockingQueue枢贿。靜態(tài)工廠方法Executors.newFixedThreadPool使用了這個隊列衙傀。

SynchronousQueue :一個不存儲元素的阻塞隊列,每個插入操作必須等到另外一個線程調(diào)用移除操作萨咕,否則插入操作一直處理阻塞狀態(tài)统抬,吞吐量通常要高于LinkedBlockingQueue,靜態(tài)工廠方法Executors.newCachedThreadPool使用這個隊列

PriorityBlockingQueue:優(yōu)先級隊列危队,進入隊列的元素按照優(yōu)先級會進行排序

image.png

public static void main(String[] args) {
    ExecutorService executorService = Executors.newCachedThreadPool();
    System.out.println("開始");
    for (int i=0;i<50;i++){
        int j = i;
        String taskName = "任務(wù)"+j;
        executorService.execute(() -> {
            System.out.println(Thread.currentThread().getName()+" 處理 "+taskName);
            try {
                //模擬任務(wù)內(nèi)部處理耗時
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
    }
    System.out.println("結(jié)束");
    executorService.shutdown();
}
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

使用上面的方式創(chuàng)建線程池需要注意聪建,如果需要處理的任務(wù)比較耗時,會導(dǎo)致新來的任務(wù)都會創(chuàng)建新的線程進行處理茫陆,可能會導(dǎo)致創(chuàng)建非常多的線程金麸,最終耗盡系統(tǒng)資源,觸發(fā)OOM簿盅。

PriorityBlockingQueue優(yōu)先級隊列的線程池
static class Task implements Runnable,Comparable<Task>{
    private int i;
    private String name;
    public Task(int i,String name){
        this.i = i;
        this.name=name;
    }
    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName()+" 處理 "+this.name);
    }
    @Override
    public int compareTo(Task o) {
        return Integer.compare(o.i,this.i);
    }
}    
public static void main(String[] args) {
    ExecutorService executorService = new ThreadPoolExecutor(1,1,60L,
            TimeUnit.SECONDS,new PriorityBlockingQueue<>());
    for (int i=0;i<10;i++){
        int j = i;
        String taskName = " 任務(wù) "+j;
        executorService.execute(new Task(i,taskName));
    }
    for (int i=100;i>=90;i--){
        int j = i;
        String taskName = " 任務(wù) "+j;
        executorService.execute(new Task(i,taskName));
    }
    executor.shutdown();
}

除了第一個任務(wù)挥下,其他任務(wù)按照優(yōu)先級高低按順序處理。原因在于:創(chuàng)建線程池的時候使用了優(yōu)先級隊列桨醋,進入隊列中的任務(wù)會進行排序午笛,任務(wù)的先后順序由Task中的i變量決定驶冒。向PriorityBlockingQueue加入元素的時候,內(nèi)部會調(diào)用代碼中Task的compareTo方法決定元素的先后順序。

自定義創(chuàng)建線程的工廠

給線程池中線程起一個有意義的名字我碟,在系統(tǒng)出現(xiàn)問題的時候官研,通過線程堆棧信息可以更容易發(fā)現(xiàn)系統(tǒng)中問題所在藻治。自定義創(chuàng)建工廠需要實現(xiàn)java.util.concurrent.ThreadFactory接口中的Thread newThread(Runnable r)方法载矿,參數(shù)為傳入的任務(wù),需要返回一個工作線程虫蝶。

static AtomicInteger threadNum  = new AtomicInteger(1);
public static void main(String[] args) {
    ThreadPoolExecutor executor = new ThreadPoolExecutor(5,5,
            60L, TimeUnit.SECONDS,
            new ArrayBlockingQueue<Runnable>(10),
            r -> {
        Thread thread = new Thread(r);
        thread.setName("自定義線程--"+threadNum.getAndIncrement());
        return thread;
            });
    for (int i=0;i<5;i++){
        String taskName = "任務(wù)--" + i;
        executor.execute(() ->{
            System.out.println(Thread.currentThread().getName()+" , 處理: "+taskName);
        });
    }
    executor.shutdown();
}
4種常見飽和策略

當(dāng)線程池中隊列已滿章咧,并且線程池已達到最大線程數(shù),線程池會將任務(wù)傳遞給飽和策略進行處理能真。這些策略都實現(xiàn)了RejectedExecutionHandler接口赁严。接口中有個方法:

void rejectedExecution(Runnable r, ThreadPoolExecutor executor);

r:需要執(zhí)行的任務(wù)

executor:當(dāng)前線程池對象

JDK中提供了4種常見的飽和策略:

AbortPolicy:直接拋出異常

CallerRunsPolicy:在當(dāng)前調(diào)用者的線程中運行任務(wù)调限,即隨丟來的任務(wù),由他自己去處理

DiscardOldestPolicy:丟棄隊列中最老的一個任務(wù)误澳,即丟棄隊列頭部的一個任務(wù),然后執(zhí)行當(dāng)前傳入的任務(wù)

DiscardPolicy:不處理秦躯,直接丟棄掉忆谓,方法內(nèi)部為空

自定義飽和策略

需要實現(xiàn)RejectedExecutionHandler接口。任務(wù)無法處理的時候踱承,我們想記錄一下日志倡缠,我們需要自定義一個飽和策略,示例代碼:

static class Task implements Runnable{
        private String name;
        public Task(String name){
            this.name = name;
        }
        @Override
        public void run() {
            try {
                System.out.println(Thread.currentThread().getName()+", 處理="+this.name);
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        @Override
        public String toString() {
            return "Task{name="+name+"}";
        }
    }
  public static void main(String[] args) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(1,1,
                60L,TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(1),
                Executors.defaultThreadFactory(),
                (r,rexecutor) -> {
                //自定義飽和策略
                 //記錄一下無法處理的任務(wù)
                    System.out.println("無法處理的任務(wù): "+r.toString());
        });
        for (int i=0;i<5;i++){
            executor.execute(new Task(" 任務(wù)-"+i));
        }
        executor.shutdown();
    }

輸出結(jié)果中可以看到有3個任務(wù)進入了飽和策略中茎活,記錄了任務(wù)的日志昙沦,對于無法處理多任務(wù),我們最好能夠記錄一下载荔,讓開發(fā)人員能夠知道盾饮。任務(wù)進入了飽和策略,說明線程池的配置可能不是太合理懒熙,或者機器的性能有限丘损,需要做一些優(yōu)化調(diào)整。

擴展線程池

ThreadPoolExecutor內(nèi)部提供了幾個方法beforeExecute工扎、afterExecute徘钥、terminated,可以由開發(fā)人員自己去這些方法肢娘〕蚀。看一下線程池內(nèi)部的源碼:

image.png

image.png

image.png

beforeExecute:任務(wù)執(zhí)行之前調(diào)用的方法,有2個參數(shù)橱健,第1個參數(shù)是執(zhí)行任務(wù)的線程而钞,第2個參數(shù)是任務(wù)

protected void beforeExecute(Thread t, Runnable r) { }

afterExecute:任務(wù)執(zhí)行完成之后調(diào)用的方法,2個參數(shù)拘荡,第1個參數(shù)表示任務(wù)笨忌,第2個參數(shù)表示任務(wù)執(zhí)行時的異常信息,如果無異常俱病,第二個參數(shù)為null

protected void afterExecute(Runnable r, Throwable t) { }

terminated:線程池最終關(guān)閉之后調(diào)用的方法官疲。所有的工作線程都退出了,最終線程池會退出亮隙,退出時調(diào)用該方法

protected void terminated() { }
public static void main(String[] args) throws InterruptedException {
    ThreadPoolExecutor executor = new ThreadPoolExecutor(10,
            10,60L,TimeUnit.SECONDS,
            new ArrayBlockingQueue<Runnable>(1),
            Executors.defaultThreadFactory(),
            (r,executors) -> {
                //自定義飽和策略
                //記錄一下無法處理的任務(wù)
                System.out.println("無法處理的任務(wù)-" + r.toString());

            }){
        @Override
        protected void beforeExecute(Thread t, Runnable r) {
            System.out.println(System.currentTimeMillis()+","+t.getName()+",開始執(zhí)行任務(wù):" + r.toString());
        }

        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            System.out.println(System.currentTimeMillis()+","+Thread.currentThread().getName()+",r任務(wù):" + r.toString()+",執(zhí)行完畢");
        }

        @Override
        protected void terminated() {
            System.out.println(System.currentTimeMillis()+","+Thread.currentThread().getName() + ",關(guān)閉線程池");
        }
    };
  for (int i= 0;i<10;i++){
        executor.execute(new Task("任務(wù)-"+i));
    }
    TimeUnit.SECONDS.sleep(2);
    executor.shutdown();
}

JUC中的Executors框架

Excecutor框架主要包含3部分的內(nèi)容:

  1. 任務(wù)相關(guān)的:包含被執(zhí)行的任務(wù)要實現(xiàn)的接口:Runnable接口或Callable接口
  2. 任務(wù)的執(zhí)行相關(guān)的:包含任務(wù)執(zhí)行機制的核心接口Executor途凫,以及繼承自ExecutorExecutorService接口。Executor框架中有兩個關(guān)鍵的類實現(xiàn)了ExecutorService接口(ThreadPoolExecutorScheduleThreadPoolExecutor
  3. 異步計算結(jié)果相關(guān)的:包含接口Future實現(xiàn)Future接口的FutureTask類

Executors框架包括:

  • Executor
  • ExecutorService
  • ThreadPoolExecutor
  • Executors
  • Future
  • Callable
  • FutureTask
  • CompletableFuture
  • CompletionService
  • ExecutorCompletionService

Executors類

Executors類溢吻,提供了一系列工廠方法用于創(chuàng)建線程池维费,返回的線程池都實現(xiàn)了ExecutorService接口果元。常用的方法有:


image.png

image.png

newSingleThreadExecutor

public static ExecutorService newSingleThreadExecutor(){}

public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {}

創(chuàng)建一個單線程的線程池。這個線程池只有一個線程在工作犀盟,也就是相當(dāng)于單線程串行執(zhí)行所有任務(wù)而晒。如果這個唯一的線程因為異常結(jié)束,那么會有一個新的線程來替代它阅畴。此線程池保證所有任務(wù)的執(zhí)行順序按照任務(wù)的提交順序執(zhí)行倡怎。內(nèi)部使用了無限容量的LinkedBlockingQueue阻塞隊列來緩存任務(wù),任務(wù)如果比較多贱枣,單線程如果處理不過來监署,會導(dǎo)致隊列堆滿,引發(fā)OOM纽哥。

newFixedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads) {}

public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {}

創(chuàng)建固定大小的線程池钠乏。每次提交一個任務(wù)就創(chuàng)建一個線程,直到線程達到線程池的最大大小春塌。線程池的大小一旦達到最大值就會保持不變晓避,在提交新任務(wù),任務(wù)將會進入等待隊列中等待只壳。如果某個線程因為執(zhí)行異常而結(jié)束够滑,那么線程池會補充一個新線程。內(nèi)部使用了無限容量的LinkedBlockingQueue阻塞隊列來緩存任務(wù)吕世,任務(wù)如果比較多彰触,如果處理不過來,會導(dǎo)致隊列堆滿命辖,引發(fā)OOM况毅。

newScheduledThreadPool

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {}

public static ScheduledExecutorService newScheduledThreadPool(
            int corePoolSize, ThreadFactory threadFactory) {}

創(chuàng)建一個大小無限的線程池。此線程池支持定時以及周期性執(zhí)行任務(wù)的需求尔艇。

在《阿里巴巴java開發(fā)手冊》中指出了線程資源必須通過線程池提供,
使用ThreadPoolExecutor有助于大家明確線程池的運行規(guī)則尔许,創(chuàng)建符合自己的業(yè)務(wù)場景需要的線程池,避免資源耗盡的風(fēng)險终娃。

Future味廊、Callable接口

Future接口定義了操作異步任務(wù)執(zhí)行一些方法,如獲取異步任務(wù)的執(zhí)行結(jié)果棠耕、取消任務(wù)的執(zhí)行余佛、判斷任務(wù)是否被取消、判斷任務(wù)執(zhí)行是否完畢等窍荧。

image.png

Callable接口中定義了需要有返回的任務(wù)需要實現(xiàn)的方法辉巡。

@FunctionalInterface
public interface Callable<V> { 
    V call() throws Exception;
}

比如主線程讓一個子線程去執(zhí)行任務(wù),子線程可能比較耗時蕊退,啟動子線程開始執(zhí)行任務(wù)后郊楣,主線程就去做其他事情了憔恳,過了一會才去獲取子任務(wù)的執(zhí)行結(jié)果。

//獲取異步任務(wù)執(zhí)行結(jié)果
public static void main(String[] args) {
    ExecutorService executorService = Executors.newFixedThreadPool(1);
    Future<Integer> result = executorService.submit(new Callable<Integer>() {
        @Override
        public Integer call() throws Exception {
            System.out.println(System.currentTimeMillis()+","+Thread.currentThread().getName()+",開始");
            TimeUnit.SECONDS.sleep(5);
            System.out.println(System.currentTimeMillis()+","+Thread.currentThread().getName()+",結(jié)束");
            return 10;
        }
    });
     System.out.println(System.currentTimeMillis() +"  "+ Thread.currentThread().getName());
    try {//get()阻塞等待結(jié)果返回
        System.out.println(System.currentTimeMillis() +"  "+ Thread.currentThread().getName()+", result="+result.get());
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }finally {
        executorService.shutdown();
    }
}

代碼中創(chuàng)建了一個線程池净蚤,調(diào)用線程池的submit方法執(zhí)行任務(wù)钥组,submit參數(shù)為Callable接口:表示需要執(zhí)行的任務(wù)有返回值,submit方法返回一個Future對象今瀑,F(xiàn)uture相當(dāng)于一個憑證程梦,可以在任意時間拿著這個憑證去獲取對應(yīng)任務(wù)的執(zhí)行結(jié)果(調(diào)用其get方法),代碼中調(diào)用了result.get()方法之后放椰,此方法會阻塞當(dāng)前線程直到任務(wù)執(zhí)行結(jié)束。

超時獲取異步任務(wù)執(zhí)行結(jié)果

可能任務(wù)執(zhí)行比較耗時愉粤,比如耗時1分鐘砾医,我最多只能等待10秒,如果10秒還沒返回衣厘,我就去做其他事情了如蚜。

剛好get有個超時的方法,聲明如下:

java.util.concurrent.Future;

V get(long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException;
Future其他方法介紹一下

cancel:取消在執(zhí)行的任務(wù)影暴,參數(shù)表示是否對執(zhí)行的任務(wù)發(fā)送中斷信號错邦,方法聲明如下:

boolean cancel(boolean mayInterruptIfRunning);

isCancelled:用來判斷任務(wù)是否被取消

isDone:判斷任務(wù)是否執(zhí)行完畢。

Future型宙、Callable接口需要結(jié)合ExecutorService來使用撬呢,需要有線程池的支持。

FutureTask類

FutureTask除了實現(xiàn)Future接口妆兑,還實現(xiàn)了Runnable接口魂拦,因此FutureTask可以交給Executor執(zhí)行,也可以交給線程執(zhí)行執(zhí)行(Thread有個Runnable的構(gòu)造方法)搁嗓,FutureTask表示帶返回值結(jié)果的任務(wù)芯勘。

public static void main(String[] args) throws ExecutionException, InterruptedException {
    FutureTask<Integer> futureTask = new FutureTask<Integer>(new Callable<Integer>() {
        @Override
        public Integer call() throws Exception {
            System.out.println(System.currentTimeMillis() + "開始" + Thread.currentThread().getName());
            TimeUnit.SECONDS.sleep(3);
            System.out.println(System.currentTimeMillis() + " 結(jié)束 " + Thread.currentThread().getName());
            return 10;
        }
    });
    System.out.println(System.currentTimeMillis() + "  " + Thread.currentThread().getName());
    new Thread(futureTask).start();//線程啟動Futuretask
    System.out.println(System.currentTimeMillis() + "  " + Thread.currentThread().getName());
    System.out.println(System.currentTimeMillis() +"  "+ Thread.currentThread().getName()+",result:"+futureTask.get());
}

image.png

image.png

線程池的submit方法返回的Future實際類型正是FutureTask對象

image.png

CompletionService接口

java.util.concurrent.CompletionService


CompletionService相當(dāng)于一個執(zhí)行任務(wù)的服務(wù),通過submit丟任務(wù)給這個服務(wù)腺逛,服務(wù)內(nèi)部去執(zhí)行任務(wù)荷愕,可以通過服務(wù)提供的一些方法獲取服務(wù)中已經(jīng)完成的任務(wù)。

接口內(nèi)的幾個方法:
Future<V> submit(Callable<V> task);

用于向服務(wù)中提交有返回結(jié)果的任務(wù)棍矛,并返回Future對象

Future<V> take() throws InterruptedException;

從服務(wù)中返回并移除一個已經(jīng)完成的任務(wù)安疗,如果獲取不到,會一致阻塞到有返回值為止够委。此方法會響應(yīng)線程中斷茂契。

從服務(wù)中返回并移除一個已經(jīng)完成的任務(wù),如果獲取不到慨绳,會一致阻塞到有返回值為止掉冶。此方法會響應(yīng)線程中斷真竖。
Future<V> poll();

通過submit向內(nèi)部提交任意多個任務(wù),通過take方法可以獲取已經(jīng)執(zhí)行完成的任務(wù)厌小,如果獲取不到將等待恢共。

ExecutorCompletionService 類

ExecutorCompletionService類是CompletionService接口的具體實現(xiàn);

ExecutorCompletionService創(chuàng)建的時候會傳入一個線程池璧亚,調(diào)用submit方法傳入需要執(zhí)行的任務(wù)讨韭,任務(wù)由內(nèi)部的線程池來處理;ExecutorCompletionService內(nèi)部有個阻塞隊列癣蟋,任意一個任務(wù)完成之后透硝,會將任務(wù)的執(zhí)行結(jié)果(Future類型)放入阻塞隊列中,然后其他線程可以調(diào)用它take疯搅、poll方法從這個阻塞隊列中獲取一個已經(jīng)完成的任務(wù)濒生,獲取任務(wù)返回結(jié)果的順序和任務(wù)執(zhí)行完成的先后順序一致,所以最先完成的任務(wù)會先返回幔欧。

image.png
public ExecutorCompletionService(Executor executor) {
    if (executor == null)
        throw new NullPointerException();
    this.executor = executor;
    this.aes = (executor instanceof AbstractExecutorService) ?
        (AbstractExecutorService) executor : null;
    this.completionQueue = new LinkedBlockingQueue<Future<V>>();
}

構(gòu)造方法需要傳入一個Executor對象罪治,這個對象表示任務(wù)執(zhí)行器,所有傳入的任務(wù)會被這個執(zhí)行器執(zhí)行礁蔗。

completionQueue是用來存儲任務(wù)結(jié)果的阻塞隊列觉义,默認(rèn)用采用的是LinkedBlockingQueue,也支持開發(fā)自己設(shè)置浴井。通過submit傳入需要執(zhí)行的任務(wù)晒骇,任務(wù)執(zhí)行完成之后,會放入completionQueue中磺浙。

執(zhí)行一批任務(wù)厉碟,然后消費執(zhí)行結(jié)果
public static void main(String[] args) throws ExecutionException, InterruptedException {
    ExecutorService executorService = Executors.newFixedThreadPool(5);
    List<Callable<Integer>> list = new ArrayList<>();
    int taskCount = 5;
    for (int i = taskCount;i>0;i--){
        int j=i*2;
        list.add(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                TimeUnit.SECONDS.sleep(j);
                return j;
            }
        });
    }
    soive(executorService,list,new Consumer<Integer>(){

        @Override
        public void accept(Integer integer) {
            System.out.println(System.currentTimeMillis()+" = "+integer);
        }
    });
    executorService.shutdown();
}
private static <T> void soive(ExecutorService executorService, List<Callable<T>> solvers, Consumer<T> consumer) throws InterruptedException, ExecutionException {
    CompletionService<T> ecs = new ExecutorCompletionService<T>(executorService);
    for (Callable<T> s:solvers){
        ecs.submit(s);//提交任務(wù)
    }
    int n = solvers.size();
    for (int i = 0;i<n;i++){
        T r = ecs.take().get();
        if (r !=null){
            consumer.accept(r);//消費任務(wù)
        }
    }
}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市屠缭,隨后出現(xiàn)的幾起案子箍鼓,更是在濱河造成了極大的恐慌,老刑警劉巖呵曹,帶你破解...
    沈念sama閱讀 216,997評論 6 502
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件款咖,死亡現(xiàn)場離奇詭異,居然都是意外死亡奄喂,警方通過查閱死者的電腦和手機铐殃,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,603評論 3 392
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來跨新,“玉大人富腊,你說我怎么就攤上這事∮蛘剩” “怎么了赘被?”我有些...
    開封第一講書人閱讀 163,359評論 0 353
  • 文/不壞的土叔 我叫張陵是整,是天一觀的道長。 經(jīng)常有香客問我民假,道長浮入,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,309評論 1 292
  • 正文 為了忘掉前任羊异,我火速辦了婚禮事秀,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘野舶。我一直安慰自己易迹,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 67,346評論 6 390
  • 文/花漫 我一把揭開白布平道。 她就那樣靜靜地躺著睹欲,像睡著了一般。 火紅的嫁衣襯著肌膚如雪巢掺。 梳的紋絲不亂的頭發(fā)上句伶,一...
    開封第一講書人閱讀 51,258評論 1 300
  • 那天劲蜻,我揣著相機與錄音陆淀,去河邊找鬼。 笑死先嬉,一個胖子當(dāng)著我的面吹牛轧苫,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播疫蔓,決...
    沈念sama閱讀 40,122評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼含懊,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了衅胀?” 一聲冷哼從身側(cè)響起岔乔,我...
    開封第一講書人閱讀 38,970評論 0 275
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎滚躯,沒想到半個月后雏门,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,403評論 1 313
  • 正文 獨居荒郊野嶺守林人離奇死亡掸掏,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,596評論 3 334
  • 正文 我和宋清朗相戀三年茁影,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片丧凤。...
    茶點故事閱讀 39,769評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡募闲,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出愿待,到底是詐尸還是另有隱情浩螺,我是刑警寧澤靴患,帶...
    沈念sama閱讀 35,464評論 5 344
  • 正文 年R本政府宣布,位于F島的核電站年扩,受9級特大地震影響蚁廓,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜厨幻,卻給世界環(huán)境...
    茶點故事閱讀 41,075評論 3 327
  • 文/蒙蒙 一相嵌、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧况脆,春花似錦饭宾、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,705評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至盛末,卻和暖如春弹惦,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背悄但。 一陣腳步聲響...
    開封第一講書人閱讀 32,848評論 1 269
  • 我被黑心中介騙來泰國打工棠隐, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人檐嚣。 一個月前我還...
    沈念sama閱讀 47,831評論 2 370
  • 正文 我出身青樓助泽,卻偏偏與公主長得像,于是被迫代替她去往敵國和親嚎京。 傳聞我的和親對象是個殘疾皇子嗡贺,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,678評論 2 354