Spring線程池ThreadPoolTaskExecutor的使用

1 線程池簡介

1.1 為什么使用線程池

  • 降低系統(tǒng)資源消耗回窘,通過重用已存在的線程,降低線程創(chuàng)建和銷毀造成的消耗市袖;
  • 提高系統(tǒng)響應速度啡直,當有任務到達時,通過復用已存在的線程苍碟,無需等待新線程的創(chuàng)建便能立即執(zhí)行酒觅;
  • 方便線程并發(fā)數(shù)的管控,因為線程若是無限制的創(chuàng)建微峰,可能會導致內存占用過多而產(chǎn)生OOM阐滩,并且會造成cpu過度切換(cpu切換線程是有時間成本的(需要保持當前執(zhí)行線程的現(xiàn)場,并恢復要執(zhí)行線程的現(xiàn)場)
  • 提供更強大的功能县忌,延時定時線程池

1.2 線程池為什么需要使用隊列

因為線程若是無限制的創(chuàng)建掂榔,可能會導致內存占用過多而產(chǎn)生OOM,并且會造成cpu過度切換症杏。

創(chuàng)建線程池的消耗較高或者線程池創(chuàng)建線程需要獲取mainlock這個全局鎖装获,影響并發(fā)效率,阻塞隊列可以很好的緩沖

1.3 線程池為什么要使用阻塞隊列而不使用非阻塞隊列

阻塞隊列可以保證任務隊列中沒有任務時阻塞獲取任務的線程厉颤,使得線程進入wait狀態(tài)穴豫,釋放cpu資源,當隊列中有任務時才喚醒對應線程從隊列中取出消息進行執(zhí)行。
使得在線程不至于一直占用cpu資源精肃。(線程執(zhí)行完任務后通過循環(huán)再次從任務隊列中取出任務進行執(zhí)行秤涩,代碼片段如:while (task != null || (task = getTask()) != null) {})。

不用阻塞隊列也是可以的司抱,不過實現(xiàn)起來比較麻煩而已筐眷,有好用的為啥不用呢

1.4 如何配置線程池

  • CPU密集型任務
    盡量使用較小的線程池,一般為CPU核心數(shù)+1习柠。 因為CPU密集型任務使得CPU使用率很高匀谣,若開過多的線程數(shù),會造成CPU過度切換

  • IO密集型任務
    可以使用稍大的線程池资溃,一般為2*CPU核心數(shù)武翎。 IO密集型任務CPU使用率并不高,因此可以讓CPU在等待IO的時候有其他線程去處理別的任務溶锭,充分利用CPU時間

  • 混合型任務
    可以將任務分成IO密集型和CPU密集型任務宝恶,然后分別用不同的線程池去處理。 只要分完之后兩個任務的執(zhí)行時間相差不大趴捅,那么就會比串行執(zhí)行來的高效
    因為如果劃分之后兩個任務執(zhí)行時間有數(shù)據(jù)級的差距卑惜,那么拆分沒有意義。
    因為先執(zhí)行完的任務就要等后執(zhí)行完的任務驻售,最終的時間仍然取決于后執(zhí)行完的任務露久,而且還要加上任務拆分與合并的開銷,得不償失

1.5 execute()和submit()方法

  1. execute()欺栗,執(zhí)行一個任務毫痕,沒有返回值
  2. submit(),提交一個線程任務,有返回值

submit(Callable<T> task)能獲取到它的返回值,通過future.get()獲然(阻塞直到任務執(zhí)行完)煌珊。一般使用FutureTask+Callable配合使用
submit(Runnable task, T result)能通過傳入的載體result間接獲得線程的返回值脸秽。
submit(Runnable task)則是沒有返回值的,就算獲取它的返回值也是null

Future.get()方法會使取結果的線程進入阻塞狀態(tài),直到線程執(zhí)行完成之后,喚醒取結果的線程缸逃,然后返回結果

1.6 Spring線程池

Spring 通過任務執(zhí)行器(TaskExecutor)來實現(xiàn)多線程和并發(fā)編程,使用ThreadPoolTaskExecutor實現(xiàn)一個基于線程池的TaskExecutor厂抽,
還得需要使用@EnableAsync開啟異步需频,并通過在需要的異步方法那里使用注解@Async聲明是一個異步任務
Spring 已經(jīng)實現(xiàn)的異常線程池:

  • SimpleAsyncTaskExecutor:不是真的線程池,這個類不重用線程筷凤,每次調用都會創(chuàng)建一個新的線程昭殉。
  • SyncTaskExecutor:這個類沒有實現(xiàn)異步調用,只是一個同步操作。只適用于不需要多線程的地方
  • ConcurrentTaskExecutorExecutor的適配類挪丢,不推薦使用蹂风。如果ThreadPoolTaskExecutor不滿足要求時,才用考慮使用這個類
  • SimpleThreadPoolTaskExecutor:是Quartz的SimpleThreadPool的類乾蓬。線程池同時被quartz和非quartz使用惠啄,才需要使用此類
  • ThreadPoolTaskExecutor:最常使用,推薦巢块。 其實質是對java.util.concurrent.ThreadPoolExecutor的包裝

1.7 @Async調用中的事務處理機制

點擊了解使用@Async使用的事務問題

2 示例

2.1 線程池配置類

package cn.jzh.thread;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;

@Configuration
@ComponentScan("cn.jzh.thread")
@EnableAsync  //開啟異步操作
public class TaskExecutorConfig implements AsyncConfigurer {

    /**
     * 通過getAsyncExecutor方法配置ThreadPoolTaskExecutor,獲得一個基于線程池TaskExecutor
     *
     * @return
     */
    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
        pool.setCorePoolSize(5);//核心線程數(shù)
        pool.setMaxPoolSize(10);//最大線程數(shù)
        pool.setQueueCapacity(25);//線程隊列
        pool.initialize();//線程初始化
        return pool;
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return null;
    }
}

配置類中方法說明:
Spring 中的ThreadPoolExecutor是借助JDK并發(fā)包中的java.util.concurrent.ThreadPoolExecutor來實現(xiàn)的。其中一些值的含義如下:

  • int corePoolSize:線程池維護線程的最小數(shù)量
  • int maximumPoolSize:線程池維護線程的最大數(shù)量巧号,線程池中允許的最大線程數(shù)族奢,線程池中的當前線程數(shù)目不會超過該值。如果隊列中任務已滿丹鸿,并且當前線程個數(shù)小于maximumPoolSize越走,那么會創(chuàng)建新的線程來執(zhí)行任務。
  • long keepAliveTime:空閑線程的存活時間TimeUnit
  • unit:時間單位靠欢,現(xiàn)由納秒廊敌,微秒,毫秒门怪,秒
  • BlockingQueue workQueue:持有等待執(zhí)行的任務隊列
  • RejectedExecutionHandler handler 線程池的拒絕策略骡澈,是指當任務添加到線程池中被拒絕,而采取的處理措施掷空。
    當任務添加到線程池中之所以被拒絕肋殴,可能是由于:第一,線程池異常關閉坦弟。第二护锤,任務數(shù)量超過線程池的最大限制。
    Reject策略預定義有四種:
  1. ThreadPoolExecutor.AbortPolicy策略酿傍,是默認的策略,處理程序遭到拒絕將拋出運行時 RejectedExecutionException
  2. ThreadPoolExecutor.CallerRunsPolicy策略 ,調用者的線程會執(zhí)行該任務,如果執(zhí)行器已關閉,則丟棄.
  3. ThreadPoolExecutor.DiscardPolicy策略烙懦,不能執(zhí)行的任務將被丟棄.
  4. ThreadPoolExecutor.DiscardOldestPolicy策略,如果執(zhí)行程序尚未關閉赤炒,則位于工作隊列頭部的任務將被刪除氯析,然后重試執(zhí)行程序(如
    果再次失敗,則重復此過程)
  5. 自定義策略:當然也可以根據(jù)應用場景需要來實現(xiàn)RejectedExecutionHandler接口自定義策略莺褒。如記錄日志或持久化不能處理的任務

2.2 異步方法

@Async注解可以用在方法上魄鸦,表示該方法是個異步方法,也可以用在類上癣朗,那么表示此類的所有方法都是異步方法
異步方法會自動注入使用ThreadPoolTaskExecutor作為TaskExecutor

package cn.jzh.thread;

import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.stereotype.Service;

import java.util.concurrent.Future;

@Service
public class AsyncTaskService {
    /**
     * 
     * @param i
     */
    @Async
    public void executeAsync(Integer i) throws Exception{
        System.out.println("線程ID:" + Thread.currentThread().getId() + "線程名字:" +Thread.currentThread().getName()+"執(zhí)行異步任務:" + i);
    }

    @Async
    public Future<String> executeAsyncPlus(Integer i) throws Exception {
        System.out.println("線程ID:" + Thread.currentThread().getId() +"線程名字:" +Thread.currentThread().getName()+ "執(zhí)行異步有返回的任務:" + i);
        return new AsyncResult<>("success:"+i);
    }

}

2.3 啟動測試

package cn.jzh.thread;

import org.springframework.context.annotation.AnnotationConfigApplicationContext;

import java.util.concurrent.Future;

public class MainApp {
    public static void main(String[] args) throws Exception{
        System.out.println("主線程id:" + Thread.currentThread().getId() + "開始執(zhí)行調用任務...");
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(TaskExecutorConfig.class);
        AsyncTaskService service = context.getBean(AsyncTaskService.class);
        for (int i = 0;i<10;i++){
            service.executeAsync(i);
            Future<String> result = service.executeAsyncPlus(i);
            System.out.println("異步程序執(zhí)行結束拾因,獲取子線程返回內容(會阻塞當前main線程)" + result.get());
        }
        context.close();

        System.out.println("主線程id:" + Thread.currentThread().getId() + "程序結束!!");
    }
}

注意:

  1. 是否影響主線程
    如果main主線程不去獲取子線程的結果(Future.get()),那么主線程完全可以不阻塞。那么绢记,此時扁达,主線程和子線程完全異步。此功能蠢熄,可以做成類似MQ消息中間件之類的跪解,消息異步進行發(fā)送
  2. 判斷是否執(zhí)行完畢
    當返回的數(shù)據(jù)類型為Future類型,其為一個接口签孔。具體的結果類型為AsyncResult,這個是需要注意的地方叉讥。
    調用返回結果的異步方法,判斷是否執(zhí)行完畢時需要使用future.isDone()來判斷是否執(zhí)行完畢
public void testAsyncAnnotationForMethodsWithReturnType()  
   throws InterruptedException, ExecutionException {  
    System.out.println("Invoking an asynchronous method. "   + Thread.currentThread().getName());  
    Future<String> future = asyncAnnotationExample.asyncMethodWithReturnType();  
   
    while (true) {  ///這里使用了循環(huán)判斷饥追,等待獲取結果信息  
        if (future.isDone()) {  //判斷是否執(zhí)行完畢  
            System.out.println("Result from asynchronous process - " + future.get());  
            break;  
        }  
        System.out.println("Continue doing something else. ");  
        Thread.sleep(1000);  
    }  
}

這些獲取異步方法的結果信息图仓,是通過不停的檢查Future的狀態(tài)來獲取當前的異步方法是否執(zhí)行完畢來實現(xiàn)的

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市但绕,隨后出現(xiàn)的幾起案子救崔,更是在濱河造成了極大的恐慌,老刑警劉巖捏顺,帶你破解...
    沈念sama閱讀 216,402評論 6 499
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件六孵,死亡現(xiàn)場離奇詭異,居然都是意外死亡幅骄,警方通過查閱死者的電腦和手機劫窒,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,377評論 3 392
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來拆座,“玉大人烛亦,你說我怎么就攤上這事《埃” “怎么了煤禽?”我有些...
    開封第一講書人閱讀 162,483評論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長岖赋。 經(jīng)常有香客問我檬果,道長,這世上最難降的妖魔是什么唐断? 我笑而不...
    開封第一講書人閱讀 58,165評論 1 292
  • 正文 為了忘掉前任选脊,我火速辦了婚禮,結果婚禮上脸甘,老公的妹妹穿的比我還像新娘恳啥。我一直安慰自己,他們只是感情好丹诀,可當我...
    茶點故事閱讀 67,176評論 6 388
  • 文/花漫 我一把揭開白布钝的。 她就那樣靜靜地躺著翁垂,像睡著了一般。 火紅的嫁衣襯著肌膚如雪硝桩。 梳的紋絲不亂的頭發(fā)上沿猜,一...
    開封第一講書人閱讀 51,146評論 1 297
  • 那天,我揣著相機與錄音碗脊,去河邊找鬼啼肩。 笑死,一個胖子當著我的面吹牛衙伶,可吹牛的內容都是我干的祈坠。 我是一名探鬼主播,決...
    沈念sama閱讀 40,032評論 3 417
  • 文/蒼蘭香墨 我猛地睜開眼矢劲,長吁一口氣:“原來是場噩夢啊……” “哼赦拘!你這毒婦竟也來了?” 一聲冷哼從身側響起卧须,我...
    開封第一講書人閱讀 38,896評論 0 274
  • 序言:老撾萬榮一對情侶失蹤另绩,失蹤者是張志新(化名)和其女友劉穎儒陨,沒想到半個月后花嘶,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,311評論 1 310
  • 正文 獨居荒郊野嶺守林人離奇死亡蹦漠,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 37,536評論 2 332
  • 正文 我和宋清朗相戀三年椭员,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片笛园。...
    茶點故事閱讀 39,696評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡隘击,死狀恐怖,靈堂內的尸體忽然破棺而出研铆,到底是詐尸還是另有隱情埋同,我是刑警寧澤,帶...
    沈念sama閱讀 35,413評論 5 343
  • 正文 年R本政府宣布棵红,位于F島的核電站凶赁,受9級特大地震影響,放射性物質發(fā)生泄漏逆甜。R本人自食惡果不足惜虱肄,卻給世界環(huán)境...
    茶點故事閱讀 41,008評論 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望交煞。 院中可真熱鬧咏窿,春花似錦、人聲如沸素征。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,659評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至纸淮,卻和暖如春平斩,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背咽块。 一陣腳步聲響...
    開封第一講書人閱讀 32,815評論 1 269
  • 我被黑心中介騙來泰國打工绘面, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人侈沪。 一個月前我還...
    沈念sama閱讀 47,698評論 2 368
  • 正文 我出身青樓揭璃,卻偏偏與公主長得像,于是被迫代替她去往敵國和親亭罪。 傳聞我的和親對象是個殘疾皇子瘦馍,可洞房花燭夜當晚...
    茶點故事閱讀 44,592評論 2 353

推薦閱讀更多精彩內容