說到多線程川慌,概念性東西就不一一贅述了,首先回顧下線程的創(chuàng)建。
Java線程創(chuàng)建的四種方式
1.繼承Thread類蜒犯,重寫run方法
static class ThreadDemo extends Thread{
@Override
public void run() {
//super.run();
//業(yè)務(wù)代碼......
}
}
public static void main(String[] args) {
ThreadDemo thread = new ThreadDemo();
thread.setDaemon(true);
thread.setName("thread_demo");
thread.start();
}
2.實(shí)現(xiàn)Runnable接口抽活,重寫run方法硫戈,實(shí)現(xiàn)Runnable接口的實(shí)現(xiàn)類的實(shí)例對(duì)象作為Thread構(gòu)造函數(shù)的target
static class RunnableDemo implements Runnable{
@Override
public void run() {
//業(yè)務(wù)代碼......
}
}
public static void main(String[] args) {
Thread thread = new Thread(new RunnableDemo());
thread.start();
}
3.通過Callable和FutureTask創(chuàng)建線程
public static void main(String[] args) throws ExecutionException, InterruptedException {
CallableDemo callable = new CallableDemo();
FutureTask<Object> futureTask = new FutureTask<>(callable);
new Thread(futureTask)..start();
Object o = futureTask.get();
}
static class CallableDemo implements Callable<Object>{
@Override
public Object call() {
//業(yè)務(wù)代碼......
return null;
}
}
可以看出Callable與Runable的區(qū)別在于Callable帶有返回值且可以檢測(cè)線程是否完成
4.通過線程池創(chuàng)建線程
static class ThreadDemo extends Thread{
@Override
public void run() {
//super.run();
//業(yè)務(wù)代碼......
}
}
static class RunnableDemo implements Runnable{
@Override
public void run() {
//業(yè)務(wù)代碼......
}
}
static class CallableDemo implements Callable<Object>{
@Override
public Object call() {
//業(yè)務(wù)代碼......
return null;
}
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(5);
executorService.execute(new ThreadDemo());
executorService.execute(new RunnableDemo());
FutureTask<Object> futureTask = new FutureTask<>(new CallableDemo());
Future<?> submit = executorService.submit(futureTask);
submit.get();
}
說到線程池,Executor提供了四種線程池
1. newCachedThreadPool創(chuàng)建一個(gè)可緩存線程池下硕,如果線程池長(zhǎng)度超過處理需要丁逝,可靈活回收空閑線程,若無(wú)可回收梭姓,則新建線程霜幼。
2. newFixedThreadPool 創(chuàng)建一個(gè)定長(zhǎng)線程池,可控制線程最大并發(fā)數(shù)誉尖,超出的線程會(huì)在隊(duì)列中等待罪既。
3. newScheduledThreadPool 創(chuàng)建一個(gè)定長(zhǎng)線程池,支持定時(shí)及周期性任務(wù)執(zhí)行铡恕。
4. newSingleThreadExecutor 創(chuàng)建一個(gè)單線程化的線程池琢感,它只會(huì)用唯一的工作線程來(lái)執(zhí)行任務(wù),保證所有任務(wù)按照指定順序(FIFO, LIFO, 優(yōu)先級(jí))執(zhí)行探熔。
但安裝編碼規(guī)約插件的同學(xué)會(huì)發(fā)現(xiàn)用Executor創(chuàng)建線程池會(huì)爆紅提示驹针,當(dāng)然也給出了解釋:
找到源碼點(diǎn)進(jìn)去一探究竟
newFixedThreadPool除了設(shè)置了核心線程數(shù)和最大線程數(shù),其他用的都是默認(rèn)值诀艰。
那來(lái)了解下ThreadPoolExecutor的核心參數(shù)
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
corePoolSize:核心線程數(shù)
核心線程會(huì)一直存活柬甥,及時(shí)沒有任務(wù)需要執(zhí)行
當(dāng)線程數(shù)小于核心線程數(shù)時(shí),即使有線程空閑其垄,線程池也會(huì)優(yōu)先創(chuàng)建新線程處理
設(shè)置allowCoreThreadTimeout=true(默認(rèn)false)時(shí)苛蒲,核心線程會(huì)超時(shí)關(guān)閉queueCapacity:任務(wù)隊(duì)列容量(阻塞隊(duì)列)
當(dāng)核心線程數(shù)達(dá)到最大時(shí),新任務(wù)會(huì)放在隊(duì)列中排隊(duì)等待執(zhí)行maxPoolSize:最大線程數(shù)
當(dāng)線程數(shù)>=corePoolSize捉捅,且任務(wù)隊(duì)列已滿時(shí)撤防。線程池會(huì)創(chuàng)建新線程來(lái)處理任務(wù)
當(dāng)線程數(shù)=maxPoolSize,且任務(wù)隊(duì)列已滿時(shí),線程池會(huì)拒絕處理任務(wù)而拋出異常keepAliveTime:線程空閑時(shí)間
當(dāng)線程空閑時(shí)間達(dá)到keepAliveTime時(shí)寄月,線程會(huì)退出辜膝,直到線程數(shù)量=corePoolSize
如果allowCoreThreadTimeout=true,則會(huì)直到線程數(shù)量=0allowCoreThreadTimeout:允許核心線程超時(shí)
rejectedExecutionHandler:任務(wù)拒絕處理器
當(dāng)線程數(shù)已經(jīng)達(dá)到maxPoolSize漾肮,切隊(duì)列已滿厂抖,會(huì)拒絕新任務(wù)
當(dāng)線程池被調(diào)用shutdown()后,會(huì)等待線程池里的任務(wù)執(zhí)行完畢克懊,再shutdown忱辅。如果在調(diào)用shutdown()和線程池真正shutdown之間提交任務(wù),會(huì)拒絕新任務(wù)
實(shí)現(xiàn)RejectedExecutionHandler接口谭溉,可自定義處理器
參數(shù)設(shè)置不當(dāng)是會(huì)出現(xiàn)oom的哦墙懂,所以要注意核心參數(shù)的默認(rèn)值
corePoolSize=1
queueCapacity=Integer.MAX_VALUE
maxPoolSize=Integer.MAX_VALUE
keepAliveTime=60s
allowCoreThreadTimeout=false
rejectedExecutionHandler=AbortPolicy()
參數(shù)設(shè)置了,在飽和的情況下ThreadPoolExecutor的處理順序是什么樣子的呢扮念?
- 當(dāng)線程數(shù)小于核心線程數(shù)時(shí)损搬,創(chuàng)建線程。
- 當(dāng)線程數(shù)大于等于核心線程數(shù)柜与,且任務(wù)隊(duì)列未滿時(shí)巧勤,將任務(wù)放入任務(wù)隊(duì)列。
- 當(dāng)線程數(shù)大于等于核心線程數(shù)弄匕,且任務(wù)隊(duì)列已滿
- 若線程數(shù)小于最大線程數(shù)颅悉,創(chuàng)建線程
- 若線程數(shù)等于最大線程數(shù),拋出異常迁匠,拒絕任務(wù)
最后分享個(gè)自己在項(xiàng)目中常用的線程池創(chuàng)建工具類
@Slf4j
public class LocalThreadPool {
public final static String poolName = "thread_pool";
private volatile static LocalThreadPool singletonPool;
private ThreadPoolExecutor executor;
private ThreadPoolExecutor callable;
public static LocalThreadPool getInstance(){
if(singletonPool == null){
synchronized (LocalThreadPool.class){
if(singletonPool == null){
singletonPool = new LocalThreadPool();
}
}
}
return singletonPool;
}
private LocalThreadPool(){
//runnable
final AtomicInteger runnableId = new AtomicInteger(0);
ThreadFactory runableFactory = new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r,"thread_pool_executor_"+runnableId);
thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
log.error("{}:{}",t.getName(),e);
}
});
return thread;
}
};
//callable
final AtomicInteger callableId = new AtomicInteger(0);
ThreadFactory callableFactory = new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r,"thread_pool_callable"+callableId);
thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
log.error("{}:{}",t.getName(),e);
}
});
return thread;
}
};
executor = new ThreadPoolExecutor(10,20,60,TimeUnit.SECONDS,
new LinkedBlockingQueue<>(20),runableFactory,new RejectedExecutionHandler(){
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
if(!executor.isShutdown()){
r.run();
log.info("caller run runnable");
}
}
});
callable = new ThreadPoolExecutor(10, 20, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(20),
callableFactory, new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
if(!executor.isShutdown()){
r.run();
log.info("caller run callable");
}
}
});
}
public void execute(Runnable r){
executor.execute(r);
}
public <T> Future<T> submit(Callable<T> c){
return callable.submit(c);
}
}
用起來(lái)非常之方便
public static void main(String[] args) {
Future<Object> submit = LocalThreadPool.getInstance().submit(new Callable<Object>() {
@Override
public Object call() {
return null;
}
});
LocalThreadPool.getInstance().execute(new Runnable() {
@Override
public void run() {
//業(yè)務(wù)代碼......
}
});
}