導(dǎo)讀目錄
- 線程組(ThreadGroup)
- 線程池(Thread Pool)
- Fork/Join框架和Executor框架
1.線程組
Java使用ThreadGroup來表示線程組色鸳,它可以對一批線程進行分類管理畸冲,Java允許程序直接對線程組進行控制苦银。
用戶創(chuàng)建的所有線程都屬于指定線程組,如果沒有顯示指定線程屬于哪個線程組名船,則該線程屬于默認線程組(與其父線程處于同一個線程組)
一旦某個線程加入了指定線程組之后,該線程將一直屬于該線程組旨怠,知道線程死亡渠驼,線程運行中途不能改變它所屬的線程組
(1)指定線程組的Thread構(gòu)造方法
Thread來提供的幾個構(gòu)造器用來設(shè)置新創(chuàng)建的線程屬于那個線程組:
Thread(ThreadGroup group, Runnable target); //以target的run()方法為線程執(zhí)行體創(chuàng)建新線程,屬于group線程組
Thread(ThreadGroup group, Runnable target, String name);//相對于上面為線程指定了名字
Thread(ThreadGroup group, Runnable target, String name, long stackSize)鉴腻;
Thread(ThreadGroup group, String name)渴邦;//創(chuàng)建新線程疯趟,名字為name,屬于group線程組
//返回該線程所屬的線程組,由于不能中途改變所屬組谋梭,因此沒有setThreadGroup()
ThreadGroup getThreadGroup();
(2)ThreadGroup相關(guān)方法
public class ThreadGroup extends Object implements Thread.UncaughtExceptionHandler
Thread.UncaughtExceptionHandler是Thread類的靜態(tài)內(nèi)部類信峻,代表一個異常處理器。其作用是用于處理該線程未處理的異常
ThreadGroup類實現(xiàn)了Thread.UncaughtExceptionHandler接口瓮床,該接口內(nèi)只有一個方法
public static interface Thread.UncaughtExceptionHandler {
//用于處理線程發(fā)生的未處理異常
void uncaughtException(Thread t, Throwable e)盹舞;
}
ThreadGrou構(gòu)造器
ThreadGroup(String name);//以指定的線程組名字來創(chuàng)建新的線程組
ThreadGroup(ThreadGroup parent, String name);//以指定的名字、指定的父線程組創(chuàng)建一個新的線程組
String getName(); //返回該線程組的名字隘庄,
線程組的名字一旦指定后是不允許改變的
ThreadGroup中用于操作線程組里的所有線程方法:
int activeCount(); //返回此線程組中活動線程的數(shù)目
void interrupt(); //中斷此線程組中所有的線程
boolean isDaemon(); //返回此線程組是否為后臺線程組
void setDaemon(boolean daemon); //設(shè)置給線程組是后臺線程組
注意:后臺線程組的最后一個線程執(zhí)行結(jié)束后踢步,或最后一個線程被銷毀后,該后臺線程組將自動銷毀
void setMaxpriority(int pri);//設(shè)置線程組的最高優(yōu)先級
//該方法是對Thread.UncaughtExceptionHandler接口里的方法實現(xiàn)丑掺。
void uncaughtException(Thread t, Throwable e); //處理該線程組內(nèi)的任意線程t拋出的未處理異常获印。t代表出現(xiàn)異常的線程,e代表該線程拋出的異常
/****************/
小結(jié):
1.每個線程都有所屬的線程組(默認或指定的)街州,而每個線程組都會異常處理方法兼丰,即這里的uncaughtException(t, e)方法。這也是所有處于該線程組里的線程默認的異常處理器(即該方法)唆缴。那么我們其實是可以單獨為每一個線程指定異常處理器(即Thread.UncaughtExceptionHandler的其他實現(xiàn)類)
2.對于一個特定的線程實例而言鳍征,其異常處理器可能有3個:(1)單獨為該線程實例指定的處理器,(2)該線程實例的所屬線程類2017/8/30的處理器面徽。(3)所在線程組的處理器
Thread類里可以設(shè)置異常處理器的方法:
static void setDefaultUncaughtExceptionHandler(Thread.UncaughtExceptionHandler eh);//為該線程類的所有實例設(shè)置默認的異常處理器
void setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler eh);//為指定的線程實例設(shè)置異常處理器
//自定義異常處理器
class MyExHandler implements Thread.UncaughtExceptionHandler {
// 實現(xiàn)uncaughtException方法艳丛,該方法將處理線程的未處理異常
public void uncaughtException(Thread t, Throwable e) {
System.out.println(t + " 線程出現(xiàn)了異常:" + e);
}
}
//程序沒有正常結(jié)束,說明該處理器處理完異常后還是把異常傳給其調(diào)用者(這里就是JVM了)
public class ExHandler {
public static void main(String[] args) {
// 設(shè)置主線程的異常處理器
Thread.currentThread().setUncaughtExceptionHandler(new MyExHandler());
int a = 5 / 0;
System.out.println("程序正常結(jié)束趟紊!");
}
}
注意:異常處理器和通過catch捕獲異常是不同的氮双。catch捕獲異常時,該異常時不會向上一級調(diào)用者傳播霎匈;但是異常處理器對異常處理之后眶蕉,該異常仍會向上一級調(diào)用者傳播(如上面的例子)。
2.線程池
系統(tǒng)啟動一個線程的成本很高唧躲,因為它涉及與操作系統(tǒng)交互造挽。因此,使用線程池可以很好的提高性能弄痹,尤其是當(dāng)程序中需要創(chuàng)建大量的生存期很短的線程時饭入。
類似于數(shù)據(jù)庫連接池,線程池在系統(tǒng)啟動的時候會創(chuàng)建大量的空閑的線程肛真,當(dāng)程序?qū)⒁粋€Runnable對象或Callable對象傳給線程池谐丢,線程池會啟動一個線程來執(zhí)行它們的run()或call()方法。執(zhí)行完畢,該線程并不會死亡乾忱,而是返回線程池成為空閑狀態(tài)哨啃,等待下一次被調(diào)用懦鼠。
線程池的優(yōu)點:
1.提高線程池的性能,不用重復(fù)創(chuàng)建線程。
2.線程池可以有效的控制系統(tǒng)中并發(fā)線程的數(shù)量酬姆。
(1)Executors工廠類
專門用于創(chuàng)建線程池痢法,可以執(zhí)行Runnable萨脑,Callable對象所代表的線程图云。
//Executors類
public class Executors extends Object
//Executor接口
public interface Executor
//ExecutorService接口克婶,代表盡快執(zhí)行線程(只要有空閑就立即執(zhí)行)
public interface ExecutorService extends Executor
//ScheduledExecutorService是ExecutorService的子接口嘀掸,用于表示一個可以在指定延遲后執(zhí)行線程任務(wù);
public interface ScheduledExecutorService extends ExecutorService
Executors提供如下幾個靜態(tài)工廠方法:
生產(chǎn)ExecutorService類的線程池(盡快立即執(zhí)行)
static ExecutorService newCachedThreadPool();//創(chuàng)建一個具有緩存功能的線程池,系統(tǒng)根據(jù)需要創(chuàng)建線程揩晴,這些線程將會被緩存在線程池中。
static ExcecutorService newFixedThreadPool(int corePoolSize);//創(chuàng)建一個可重用的、具有固定線程數(shù)的線程池
static ExecutorService newSingleThreadExecutor();//創(chuàng)建一個只有單線程的線程池
生產(chǎn)ScheduledExecutorService<>類的線程池(延遲執(zhí)行)
static ScheduledExecutorService newScheduledThreadPool(int corePoolSize);//創(chuàng)建具有指定線程數(shù)的線程池,可以在指定延遲后執(zhí)行線程任務(wù)
static ScheduledExecutorService newSingleThreadScheduledExecutor();// 創(chuàng)建只有一個線程的線程池,可以在指定延遲后執(zhí)行線程任務(wù)
Java8中新增的方法(相當(dāng)于后臺線程,用于支持多CPU的并行特點)
//parallelism參數(shù)代表"目標(biāo)并行級別(即CPU的個數(shù))"
static ExecutorService newWorkStealingPool(int parallelism);//生成的work-stealing線程池相當(dāng)于后臺線程城池耍目,當(dāng)所有的前臺線程都死亡了,work-stealing池中的線程會自動死亡。
static ExecutorService newWorkStealingPool();//
(2)ExecutorService線程池
ExecutorService接口,代表盡快執(zhí)行線程(只要有空閑就立即執(zhí)行),需要程序?qū)unnable或Callable對象提交給該線程池弛饭,該線程池就會盡快執(zhí)行該任務(wù)。
Future<?> submit(Runnable task);//Future代表Runnable任務(wù)的返回值,但run()沒有返回值拒担,故該方法執(zhí)行結(jié)束后返回null
注意:可以使用Future的boolean isCancelled()、boolean isDone()來獲得Runnable對象的執(zhí)行狀態(tài)
<T> Future<T> submit(Runnable task, T result); //result顯示指定線程執(zhí)行結(jié)束后的返回值,即run()執(zhí)行結(jié)束后的值
<T> Future<T> submit(Callable<T> task);//執(zhí)行Callable對象代表的任務(wù),F(xiàn)uture代表執(zhí)行結(jié)束的返回值,即call()方法的返回值
void shutdown(); //將啟動線程池的關(guān)閉序列气堕,關(guān)閉后不再接受新任務(wù),但會將以前提交的任務(wù)執(zhí)行完成。
List<Runnable> shutdownNow(); //試圖停止所有正在執(zhí)行的任務(wù),暫停處理正在等待的任務(wù)雀费,并返回等待執(zhí)行的任務(wù)列表
(3)ScheduledExecutorService線程池
代表可在指定延遲后或周期性的執(zhí)行線程任務(wù)的線程池薄啥。提供如下4個方法:
<V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);//指定callable任務(wù),在delay延遲后執(zhí)行
ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);//指定commandr任務(wù),將在delay延遲后執(zhí)行
ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);//指定command任務(wù),將在initialDelay延遲后,以period為周期重復(fù)執(zhí)行
ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);//在initialDelay延遲后開始執(zhí)行菩暗,并在每一次執(zhí)行終止和下一次執(zhí)行開始之間都有固定的延遲
(4)使用線程池來執(zhí)行線程任務(wù)的步驟:
1.獲得線程池:調(diào)用Executors類的靜態(tài)工廠方法創(chuàng)建一個ExecutorService對象佑稠。即獲得一個線程池
2.創(chuàng)建執(zhí)行體:創(chuàng)建Runnable對象或Callable對象的實例舌胶,作為線程執(zhí)行任務(wù)
3.提交任務(wù):調(diào)用ExecutorService對象的submit()方法來提交Runnable或Callable實例
4.關(guān)閉線程池:當(dāng)不想提交任何任務(wù)時捆蜀,調(diào)用ExecutorService的shutdown()方法來關(guān)閉線程池
例子
ExecutorService pool = Executors.newFixedThreadPool(1);//1
//2. 使用Lambda表達式創(chuàng)建Runnable對象
Runnable target = () -> {
for (int i = 0; i < 100 ; i++ )
{
System.out.println(Thread.currentThread().getName()
+ "的i值為:" + i);
}
};
// 3.向線程池中提交兩個線程
pool.submit(target);
pool.submit(target);
//4. 關(guān)閉線程池
pool.shutdown();
3.Fork/Join框架
執(zhí)行器框架(Executor Framework)將任務(wù)的創(chuàng)建和執(zhí)行進行了分離,通過這個框架,只需要實現(xiàn)Runnable接口的對象和使用Executor對象辆它,然后將Runnable對象發(fā)送給執(zhí)行器誊薄。執(zhí)行器再負責(zé)運行這些任務(wù)所需要的線程,包括線程的創(chuàng)建锰茉,線程的管理以及線程的結(jié)束呢蔫。
java 7則又更進了一步,它包括了ExecutorService接口的另一種實現(xiàn)飒筑,用來解決特殊類型的問題片吊,它就是Fork/Join框架,有時也稱分解/合并框架协屡。
Fork/Join框架是用來解決能夠通過分治技術(shù)(Divide and Conquer Technique)將問題拆分成小任務(wù)的問題俏脊。在一個任務(wù)中,先檢查將要解決的問題的大小肤晓,如果大于一個設(shè)定的大小联予,那就將問題拆分成可以通過框架來執(zhí)行的小任務(wù)。如果問題的大小比設(shè)定的大小要小材原,就可以直接在任務(wù)里解決這個問題沸久,然后,根據(jù)需要返回任務(wù)的結(jié)果余蟹。
Fork/Join框架基于以下兩種操作:
分解(Fork)操作:當(dāng)需要將一個任務(wù)拆分成更小的多個任務(wù)時卷胯,在框架中執(zhí)行這些任務(wù);
合并(Join)操作:當(dāng)一個主任務(wù)等待其創(chuàng)建的多個子任務(wù)的完成執(zhí)行威酒。
Fork/Join框架和執(zhí)行器框架(Executor Framework)主要的區(qū)別在于工作竊取算法(Work-Stealing Algorithm)
與執(zhí)行器框架不同窑睁,使用Join操作讓一個主任務(wù)等待它所創(chuàng)建的子任務(wù)的完成,執(zhí)行這個任務(wù)的線程稱之為工作者線程(Worker Thread)葵孤。工作者線程尋找其他仍未被執(zhí)行的任務(wù)担钮,然后開始執(zhí)行。
Fork/Join框架的核心是由下列兩個類組成的尤仍。
ForkJoinPool:這個類實現(xiàn)了ExecutorService接口和工作竊取算法(Work-Stealing Algorithm)箫津。它管理工作者線程,并提供任務(wù)的狀態(tài)信息宰啦,以及任務(wù)的執(zhí)行信息苏遥。
ForkJoinTask:這個類是一個將在ForkJoinPool中執(zhí)行的任務(wù)的基類。
Fork/Join框架提供了在一個任務(wù)里執(zhí)行fork()和join()操作的機制和控制任務(wù)狀態(tài)的方法赡模。通常田炭,為了實現(xiàn)Fork/Join任務(wù),需要實現(xiàn)一個以下兩個類之一的子類:
RecursiveAction:用于任務(wù)沒有返回結(jié)果的場景漓柑。
RecursiveTask:用于任務(wù)有返回結(jié)果的場景教硫。
ForkJoinPool(通用池)
ForkJoinPool類是一個特殊的Executor執(zhí)行器類型叨吮,
為了利用多CPU的特點,將一個任務(wù)拆成多個小任務(wù)瞬矩,把多個小任務(wù)放在多個處理器上并行(不是并發(fā)哦)執(zhí)行挤安;當(dāng)這些小任務(wù)執(zhí)行結(jié)束后,在將這些誒結(jié)果合并起來即可
插入圖
構(gòu)造器:
ForkJoinPool();//以Runntime.availableProcessors()方法的返回值作為parallelism參數(shù)來創(chuàng)建ForkJoinPool丧鸯,即以實際的核心數(shù)作為目標(biāo)并行級別
ForkJoinPool(int parallelism);//創(chuàng)建一個包含parallelism個并行線程的ForkJoinPool
Java8擴展了ForkJoinPool的功能蛤铜,提供如下2個新的方法:
static ForkJoinPool commonPool();//返回一個通用池,該通用池不受shutdown()或shutdownNow()方法的影響丛肢。
static int getCommonPoolParallelism();//返回通用池的并行級別
//執(zhí)行指定的任務(wù):ForkJoinTask
<T> ForkJoinTask<T> submit(ForkJoinTask<T> task);
<T> T invoke(ForkJoinTask<T> task);
當(dāng)然了围肥,也可以直接執(zhí)行Runnable任務(wù)
void execute(Runnable task)
(2)ForkJoinTask類
上面的ForkJoinTask代表一個可以并行、合并的任務(wù)蜂怎。ForkJoinTask是抽象類穆刻,它有兩個抽象的子:
RecursiveAction(代表無返回值的任務(wù)),
protected abstract void compute();//任務(wù)的主要計算是通過這個方法, 無返回值
RecursiveTask(代表有返回值的任務(wù))
protected abstract V compute();//有返回值
方法:
ForkJoinTask<V> fork(); //安排異步執(zhí)行此任務(wù)(即執(zhí)行方法的調(diào)用者里封裝的任務(wù))
V join(); //返回計算完成時的結(jié)果。
雖然ForkJoinPool類是設(shè)計用來執(zhí)行ForkJoinTask對象的杠步,但也可以直接用來執(zhí)行Runnable和Callable對象氢伟。當(dāng)然,也可以使用ForkJoinTask類的adapt()方法來接收一個Callable對象或者一個Runnable對象幽歼,然后將之轉(zhuǎn)化為一個ForkJoinTask對象朵锣,然后再去執(zhí)行。
static <T> ForkJoinTask<T> adapt(Callable<? extends T> callable); //將一個Callable對象封裝成ForkJoinTask對象甸私,然后再通過ForkJoinPool來執(zhí)行
static ForkJoinTask<?> adapt(Runnable runnable);//同上面诚些,只是封裝的Runnable對象
執(zhí)行不需要返回值的任務(wù)事例
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
public class ForkJoinPoolTest {
public static void main(String[] args) throws Exception{
//1.創(chuàng)建執(zhí)行器(即通用線程池)
ForkJoinPool pool = new ForkJoinPool();
//2.創(chuàng)建任務(wù)
PrintTask task = new PrintTask(0, 500);
//3.提交任務(wù)(即執(zhí)行任務(wù))
pool.submit(task);
pool.awaitTermination(2, TimeUnit.SECONDS);//表示當(dāng)所有任務(wù)執(zhí)行結(jié)束后,程序在此阻塞2s
//4.關(guān)閉線程池
pool.shutdown();
}
}
//任務(wù)類
class PrintTask extends RecursiveAction {
private static final int THRESHOLD = 50;
private int start;
private int end;
//打印從start到end的任務(wù)
public PrintTask(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected void compute() {
if(end - start < THRESHOLD) {
for(int i = start; i < end; i++) {
System.out.println(Thread.currentThread().getName() + "打印 " + i);
}
}else {
int mid = (start + end) / 2;
PrintTask left = new PrintTask(start, mid);
PrintTask right = new PrintTask(mid, end);
//并行執(zhí)行這兩個任務(wù)
left.fork();
right.fork();
//由于沒有返回值皇型,因此這里就不用合并結(jié)果了
}
}
}
帶返回值的事例
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.Future;
import java.util.concurrent.ForkJoinPool;
public class ForkJoinPoolTest2 {
public static void main(String[] args) throws Exception{
//1.創(chuàng)建執(zhí)行器(即通用線程池)
ForkJoinPool pool = new ForkJoinPool();
//2.創(chuàng)建任務(wù)
int[] arrs = new int[1000];
for(int i = 0; i < arrs.length; i++) {
arrs[i] = i + 1;
}
PrintTask task = new PrintTask(arrs, 0, arrs.length);
//3.提交任務(wù)(即執(zhí)行任務(wù))
Future<Integer> f = pool.submit(task);
//4.返回結(jié)果
int sum = f.get()
System.out.println(sum);
//4.關(guān)閉線程池
pool.shutdown();
}
}
//任務(wù)
class PrintTask extends RecursiveTask<Integer> {
private static final int THRESHOLD = 20;
private int start;
private int end;
private int[] arrs; //數(shù)組
//累加數(shù)組元素值
public PrintTask(int[] arrs, int start, int end) {
this.start = start;
this.end = end;
this.arrs = arrs;
}
@Override
protected Integer compute() {
int sum = 0;
if(end - start < THRESHOLD) {
for(int i = start; i < end; i++) {
sum += arrs[i];
}
return sum;
}else {
int mid = (start + end) / 2;
PrintTask left = new PrintTask(arrs, start, mid);
PrintTask right = new PrintTask(arrs, mid, end);
//并行執(zhí)行這兩個任務(wù)
left.fork();
right.fork();
//把每個小任務(wù)的結(jié)果加起來
return left.join() + right.join();
}
}
}