1.回顧
前面一節(jié)介紹了并發(fā)容器和隊(duì)列的內(nèi)容,基于上次介紹關(guān)于隊(duì)列的內(nèi)容,才能更好的了解線程池的原理
開始介紹線程池之前,先看一道華為面試題:
兩個(gè)線程涡戳,第一個(gè)線程從1到26,第二個(gè)線程從A到Z很钓,交替順序輸出
LockSupport實(shí)現(xiàn)
package com.learn.thread.seven;
import java.util.concurrent.locks.LockSupport;
/**
* @author zglx
*
* 面試題祖乳,交替輸出
*/
public class TestQuestion {
static Thread t1 = null;
static Thread t2 = null;
public static void main(String[] args) {
char[] a1 = "12345678".toCharArray();
char[] a2 = "abcdefgh".toCharArray();
t1 = new Thread(() -> {
for (char item : a1) {
System.out.println(item);
// 叫醒t2
LockSupport.unpark(t2);
// t1阻塞
LockSupport.park();
}
},"t1");
t2 = new Thread(() -> {
for (char item : a2) {
// t2阻塞
LockSupport.park();
// 打印值
System.out.println(item);
// 叫醒t2
LockSupport.unpark(t1);
}
},"t2");
t1.start();
t2.start();
}
}
wait 和 nofity的實(shí)現(xiàn)
package com.learn.thread.seven;
import java.util.concurrent.locks.LockSupport;
public class TestQueue2 {
private static final Object object = new Object();
public static void main(String[] args) {
char[] a1 = "12345678".toCharArray();
char[] a2 = "abcdefgh".toCharArray();
new Thread(() -> {
synchronized (object) {
for (char item : a1) {
System.out.println(item);
try {
object.notify();
object.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 最終兩個(gè)線程終歸有一個(gè)是wait的捅僵,阻塞在這里不懂
object.notify();
}
},"t1").start();
new Thread(() -> {
synchronized (object) {
for (char item : a2) {
System.out.println(item);
try {
object.notify();
object.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 為什么這里要喚醒
object.notify();
}
},"t2").start();
}
}
第二個(gè)線程比第一個(gè)線程先執(zhí)行
package com.learn.thread.seven;
/**
* @author zglx
* 這里用cas自旋的方式去實(shí)現(xiàn)第二個(gè)線程比第一個(gè)線程先執(zhí)行
*/
public class TestQueue3 {
private static volatile boolean status = false;
private static final Object object = new Object();
public static void main(String[] args) {
char[] a1 = "12345678".toCharArray();
char[] a2 = "abcdefgh".toCharArray();
new Thread(() -> {
synchronized (object) {
// 第一個(gè)線程上來(lái)就等待
while (!status) {
try {
object.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
for (char item : a1) {
System.out.println(item);
try {
object.notify();
object.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
object.notify();
}
}).start();
new Thread(() -> {
synchronized (object) {
for (char item : a2) {
System.out.println(item);
status = true;
try {
object.notify();
object.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
object.notify();
}
}).start();
}
}
Condition 實(shí)現(xiàn)Synchronized
package com.learn.thread.seven;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class TestQueue5 {
static char[] a1 = "12345678".toCharArray();
static char[] a2 = "abcdefgh".toCharArray();
private static ReentrantLock reentrantLock = new ReentrantLock();
private static Condition condition = reentrantLock.newCondition();
public static void main(String[] args) {
new Thread(() -> {
try {
reentrantLock.lock();
for (char item : a1) {
System.out.println(item);
// signal 相當(dāng)于nofity
condition.signal();
// await 相當(dāng)于 wait
condition.await();
}
} catch (Exception ex) {
ex.printStackTrace();
} finally {
reentrantLock.unlock();
}
}).start();
new Thread(() -> {
try {
reentrantLock.lock();
for (char item : a2) {
System.out.println(item);
// signal 相當(dāng)于nofity
condition.signal();
// await 相當(dāng)于 wait
condition.await();
}
} catch (Exception ex) {
ex.printStackTrace();
} finally {
reentrantLock.unlock();
}
condition.signal();
}).start();
}
}
但是一個(gè)Condition 就是一個(gè)等待隊(duì)列足删,既然有兩個(gè)線程,那么就完全可以用生產(chǎn)者消費(fèi)者的模式去實(shí)現(xiàn)捕发,那么就需要兩個(gè)Condition
package com.learn.thread.seven;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class TestQueue6 {
static char[] a1 = "12345678".toCharArray();
static char[] a2 = "abcdefgh".toCharArray();
private static ReentrantLock reentrantLock = new ReentrantLock();
private static Condition condition1 = reentrantLock.newCondition();
private static Condition condition2 = reentrantLock.newCondition();
public static void main(String[] args) {
new Thread(() -> {
try {
reentrantLock.lock();
for (char item : a1) {
System.out.println(item);
// signal 相當(dāng)于nofity
condition2.signal();
// await 相當(dāng)于 wait
condition1.await();
}
condition2.signal();
} catch (Exception ex) {
ex.printStackTrace();
} finally {
reentrantLock.unlock();
}
}).start();
new Thread(() -> {
try {
reentrantLock.lock();
for (char item : a2) {
System.out.println(item);
// signal 相當(dāng)于nofity
condition1.signal();
// await 相當(dāng)于 wait
condition2.await();
}
condition1.signal();
} catch (Exception ex) {
ex.printStackTrace();
} finally {
reentrantLock.unlock();
}
}).start();
}
}
自旋鎖實(shí)現(xiàn)
線程取兩個(gè)值t1,t2疏旨,定義一個(gè)ReadyToRun的變量
剛開始是t1,相當(dāng)于一個(gè)信號(hào)燈r扎酷,某一時(shí)刻只能取一個(gè)值充石,不能同時(shí)取到兩個(gè)線程
程序上來(lái)就判斷是不是t1 如果不是就占用cpu等待,如果一看是t1就打印值,然后把r的值改成t2,打印完之后r又變成了t1骤铃。就這么交替的玩法
注意信號(hào)燈r要為volatile拉岁,保證線程之間的可見性,我們把信號(hào)燈設(shè)置為枚舉惰爬,防止它取別的值
package com.learn.thread.seven;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class TestQueue7 {
enum ReadyToRun{
T1,
T2
}
// 保證線程可見性喊暖,并且保證只能是兩個(gè)值
static volatile ReadyToRun readyToRun = ReadyToRun.T1;
static char[] a1 = "12345678".toCharArray();
static char[] a2 = "abcdefgh".toCharArray();
public static void main(String[] args) {
new Thread(() -> {
try {
for (char item : a1) {
// cas 無(wú)鎖化,如果不是T1,什么都不做
while (readyToRun != ReadyToRun.T1) {
}
System.out.println(item);
// 信號(hào)燈交替給t2
readyToRun = ReadyToRun.T2;
}
} catch (Exception ex) {
ex.printStackTrace();
} finally {
}
}).start();
new Thread(() -> {
try {
for (char item : a2) {
while (readyToRun != ReadyToRun.T2) {
}
System.out.println(item);
// 信號(hào)燈交換給T1
readyToRun = ReadyToRun.T1;
}
} catch (Exception ex) {
ex.printStackTrace();
} finally {
}
}).start();
}
}
BlockingQueue的玩法
前面說(shuō)到過Queue 的玩法撕瞧,可以支持多線程阻塞操作陵叽,有put和take操作,put滿的時(shí)候會(huì)被阻塞住丛版,take 的時(shí)候巩掺,如果沒有就會(huì)阻塞
實(shí)例兩個(gè)BlockingQueue,都是ArrayBlockingQueue數(shù)組實(shí)現(xiàn)的,并且長(zhǎng)度都是1
相當(dāng)于兩個(gè)容器页畦,這兩個(gè)容器里頭放兩個(gè)值胖替,這兩個(gè)值比如說(shuō)我第一個(gè)線程打印出了1我就往第一個(gè)容器里放一個(gè)ok,然后另外一個(gè)線程就盯著這個(gè)容器,有值了就立馬打印A豫缨,并且同樣往另外一個(gè)容器放一個(gè)ok,這樣第一個(gè)線程監(jiān)聽的容器就有值独令,打印出2,如此循環(huán)下去好芭。
package com.learn.thread.seven;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class TestQueue8 {
// 保證線程可見性燃箭,并且保證只能是兩個(gè)值
static char[] a1 = "12345678".toCharArray();
static char[] a2 = "abcdefgh".toCharArray();
private static BlockingQueue blockingQueue1 = new ArrayBlockingQueue(1);
private static BlockingQueue blockingQueue2 = new ArrayBlockingQueue(1);
public static void main(String[] args) {
new Thread(() -> {
try {
for (char item : a1) {
System.out.println(item);
// 賦值給第二個(gè)容器
blockingQueue2.put("ok");
// 開始監(jiān)聽
blockingQueue1.take();
}
} catch (Exception ex) {
ex.printStackTrace();
} finally {
}
}).start();
new Thread(() -> {
try {
for (char item : a2) {
blockingQueue2.take();
System.out.println(item);
blockingQueue1.put("ok");
}
} catch (Exception ex) {
ex.printStackTrace();
} finally {
}
}).start();
}
}
TransferQueue 優(yōu)化
之前說(shuō)過TransferQueue 可以跟exchange一樣做線程數(shù)據(jù)交換
第一個(gè)線程上來(lái)就take 等待第二個(gè)線程給值,拿到值打印舍败,又經(jīng)過transferQueue傳1給第二個(gè)線程
第二個(gè)線程上來(lái)就經(jīng)過transfer 把自己變量扔進(jìn)去招狸,第一個(gè)線程立馬take到了,拿出來(lái)打印邻薯。然后等待第一個(gè)線程給值瓢颅。如此循環(huán)
package com.learn.thread.seven;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TransferQueue;
public class TestQueue9 {
// 保證線程可見性,并且保證只能是兩個(gè)值
static char[] a1 = "12345678".toCharArray();
static char[] a2 = "abcdefgh".toCharArray();
private static TransferQueue<Character> transferQueue = new LinkedTransferQueue<>();
public static void main(String[] args) {
new Thread(() -> {
try {
for (char item : a1) {
// 取值
System.out.println(transferQueue.take());
// 傳值
transferQueue.transfer(item);
}
} catch (Exception ex) {
ex.printStackTrace();
} finally {
}
}).start();
new Thread(() -> {
try {
for (char item : a2) {
transferQueue.transfer(item);
System.out.println(transferQueue.take());
}
} catch (Exception ex) {
ex.printStackTrace();
} finally {
}
}).start();
}
}
2.線程池
線程池有幾個(gè)接口,第一個(gè)接口是Executor,第二個(gè)是ExecutorService繼承自Executor,他們的最終子類才是ThreadPoolExecutor,如下圖
Executor 執(zhí)行者 有一個(gè)方法叫做執(zhí)行弛说,執(zhí)行的東西就是Runnable 是一個(gè)接口挽懦,可以有很多實(shí)現(xiàn)。因此可以說(shuō)這里只是定義了任務(wù)的執(zhí)行是Runnable,而執(zhí)行交給了實(shí)現(xiàn)Executor的類木人。這里不像我們以前定義一個(gè)Thread信柿,new 一個(gè)Thread然后去重寫它的Run方法,然后start分開執(zhí)行醒第。
//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//
package java.util.concurrent;
public interface Executor {
void execute(Runnable var1);
}
ExecutorService 是Executor的繼承渔嚷,它除了父類execute方法外,還完善了整個(gè)任務(wù)執(zhí)行器的一個(gè)生命周期稠曼。拿線程池舉例形病,一個(gè)線程池里面一堆線程就是一堆的工人,執(zhí)行完一個(gè)任務(wù)后這個(gè)線程該怎么結(jié)束,ExecutorService就定義了這么一些個(gè)方法
package com.learn.thread.five;
import java.util.List;
import java.util.Collection;
import java.util.concurrent.*;
public interface ExecutorService extends Executor {
// 結(jié)束
void shutdown();
// 馬上結(jié)束
List<Runnable> shutdownNow();
// 是否結(jié)束了
boolean isShutdown();
// 是不是整體都執(zhí)行完了
boolean isTerminated();
// 等著結(jié)束漠吻,等多長(zhǎng)時(shí)間量瓜,時(shí)間到了還不結(jié)束的話他就返回false
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
// 提交任務(wù),返回結(jié)果放在Future中
<T> Future<T> submit(Callable<T> task);
// 提交任務(wù)途乃,返回結(jié)果放在Future中
<T> Future<T> submit(Runnable task, T result);
// 提交任務(wù)绍傲,返回結(jié)果放在Future中
Future<?> submit(Runnable task);
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
主要看任務(wù)submit方法,這里涉及了入?yún)allable和出參Future
2.1.Callable
/*
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
/*
* This file is available under and governed by the GNU General Public
* License version 2 only, as published by the Free Software Foundation.
* However, the following notice accompanied the original version of this
* file:
*
* Written by Doug Lea with assistance from members of JCP JSR-166
* Expert Group and released to the public domain, as explained at
* http://creativecommons.org/publicdomain/zero/1.0/
*/
package java.util.concurrent;
/**
* A task that returns a result and may throw an exception.
* Implementors define a single method with no arguments called
* {@code call}.
*
* <p>The {@code Callable} interface is similar to {@link
* java.lang.Runnable}, in that both are designed for classes whose
* instances are potentially executed by another thread. A
* {@code Runnable}, however, does not return a result and cannot
* throw a checked exception.
*
* <p>The {@link Executors} class contains utility methods to
* convert from other common forms to {@code Callable} classes.
*
* @see Executor
* @since 1.5
* @author Doug Lea
* @param <V> the result type of method {@code call}
*/
@FunctionalInterface
public interface Callable<V> {
/**
* Computes a result, or throws an exception if unable to do so.
*
* @return computed result
* @throws Exception if unable to compute a result
*/
V call() throws Exception;
}
里邊有一個(gè)call方法耍共,并且?guī)в蟹祷刂堤瘫鷕un方法一樣,只不過run方法沒有返回值试读。
2.2.Future
Future 代表的就是線程池執(zhí)行完runnable 后返回的結(jié)果杠纵,F(xiàn)uture表示的就是未來(lái)的執(zhí)行結(jié)果,如果任務(wù)還沒有被執(zhí)行完就調(diào)用get方法钩骇,就會(huì)造成線程阻塞比藻。
其實(shí)更靈活的用法就是FutureTask即是一個(gè)Future 同時(shí)又是一個(gè)Task,因?yàn)镃allable只能是一個(gè)Task 執(zhí)行一個(gè)任務(wù),但是不能作為一個(gè)Future來(lái)用FutureTask 實(shí)現(xiàn)了RunnbaleFuture 伊履,RunnbaleFuture實(shí)現(xiàn)了Runnable 有實(shí)現(xiàn)了Future韩容,所以它是一個(gè)任務(wù)又是一個(gè)Future款违,后面WorkStealingPool,ForkJoinPool 基本都會(huì)用到FutureTask唐瀑。
package com.learn.thread.seven;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
public class TestFuture {
public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask<Integer> future = new FutureTask<Integer>(() -> {
Thread.sleep(1000);
return 100;
});
new Thread(future).start();
// 會(huì)造成阻塞
System.out.println(future.get());
}
}
小結(jié)
Callable類似于Runnable 但是有返回值
了解了Future 用于存儲(chǔ)執(zhí)行的將來(lái)才會(huì)產(chǎn)生的結(jié)果
FutureTask他是Future 加上Runnable 既可以執(zhí)行也可以存結(jié)果
CompletableFuture 管理多個(gè)Future 的結(jié)果
CompletableFuture
它用來(lái)組合各種不同的任務(wù),等這個(gè)任務(wù)執(zhí)行完產(chǎn)生一個(gè)結(jié)果后進(jìn)行一個(gè)組合插爹,基于java8之前將的價(jià)格查詢器哄辣,用小程序再模擬一個(gè)。
定義三個(gè)future分別代表淘寶赠尾、京東力穗、天貓,用了CompletableFuture的一個(gè)方法叫supplyAsync產(chǎn)生了一個(gè)異步任務(wù)
每一個(gè)任務(wù)去不同的商家拉取數(shù)據(jù)气嫁,什么時(shí)候拉取完了就放在一個(gè)Future中当窗,但是要求三個(gè)商家都拉取數(shù)據(jù)完了才給最后的展示
你可以使用allOf方法相當(dāng)于這里面的所有任務(wù)完成之后,最后join寸宵,你才能繼續(xù)往下執(zhí)行崖面。
除了allOf 方法,還提供了一些lamdba表示的寫法
package com.learn.thread.seven;
import java.io.IOException;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
public class TestCompletableFuture {
public static void main(String[] args) throws IOException {
long start = System.currentTimeMillis();
CompletableFuture<Double> futurem = CompletableFuture.supplyAsync(() -> {
return priceOfM();
});
CompletableFuture<Double> futuret = CompletableFuture.supplyAsync(() -> {
return priceOfT();
});
CompletableFuture<Double> futureb = CompletableFuture.supplyAsync(() -> {
return priceOfB();
});
// 用join
CompletableFuture.allOf(futureb, futurem, futuret).join();
// 用lamdba
CompletableFuture.supplyAsync(TestCompletableFuture::priceOfM)
.thenApply(String::valueOf)
.thenApply(str -> "price" + str)
.thenAccept(System.out::println);
long end = System.currentTimeMillis();
System.out.println(end - start);
System.in.read();
}
private static double priceOfM() {
delay();
System.out.println(1.00);
return 1.00;
}
private static double priceOfB() {
delay();
System.out.println(2.00);
return 2.00;
}
private static double priceOfT() {
delay();
System.out.println(3.00);
return 3.00;
}
private static void delay() {
int time = new Random().nextInt(500);
try {
Thread.sleep(time);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
3.ThreadPoolExector和ForkJoinPool
我們來(lái)了解線程池梯影,線程池從目前的JDK來(lái)說(shuō)有兩種類型巫员,第一種就是普通的線程池ThreadPoolExector,第二種是ForkJoinPool甲棍,這兩種是不同的類型的線程池简识,能做的事情也不一樣。
Fork成為分叉,合并稱為join七扰,這是ForkJoinPool的一個(gè)特性奢赂,我們先來(lái)講講ThreadPoolExector
ThreadPoolExector 的父類是AbstractExecutorService,而AbstractExecutorService的父類是ExecutorService,ExecutorService的父類是Executor戳寸,所以ThreadPoolExector相當(dāng)于線程池的一個(gè)執(zhí)行器呈驶,你可以往這個(gè)池子里丟任務(wù)。
阿里開發(fā)手冊(cè)要求線程是自定義的疫鹊,下面看看如何自定義一個(gè)線程池
線程池有很多構(gòu)造方法袖瞻,來(lái)看看主要的七個(gè)參數(shù)
java多線程開發(fā)時(shí),常常用到線程池技術(shù)拆吆,這篇文章是對(duì)創(chuàng)建java線程池時(shí)的七個(gè)參數(shù)的詳細(xì)解釋聋迎。
從源碼中可以看出,線程池的構(gòu)造函數(shù)有7個(gè)參數(shù)枣耀,分別是corePoolSize霉晕、maximumPoolSize、keepAliveTime捞奕、unit牺堰、workQueue、threadFactory颅围、handler伟葫。下面會(huì)對(duì)這7個(gè)參數(shù)一一解釋。
一院促、corePoolSize 線程池核心線程大小
線程池中會(huì)維護(hù)一個(gè)最小的線程數(shù)量筏养,即使這些線程處理空閑狀態(tài),他們也不會(huì)被銷毀常拓,除非設(shè)置了allowCoreThreadTimeOut渐溶。這里的最小線程數(shù)量即是corePoolSize。
二弄抬、maximumPoolSize 線程池最大線程數(shù)量
一個(gè)任務(wù)被提交到線程池以后茎辐,首先會(huì)找有沒有空閑存活線程,如果有則直接將任務(wù)交給這個(gè)空閑線程來(lái)執(zhí)行掂恕,如果沒有則會(huì)緩存到工作隊(duì)列(后面會(huì)介紹)中拖陆,如果工作隊(duì)列滿了,才會(huì)創(chuàng)建一個(gè)新線程竹海,然后從工作隊(duì)列的頭部取出一個(gè)任務(wù)交由新線程來(lái)處理慕蔚,而將剛提交的任務(wù)放入工作隊(duì)列尾部。線程池不會(huì)無(wú)限制的去創(chuàng)建新線程斋配,它會(huì)有一個(gè)最大線程數(shù)量的限制孔飒,這個(gè)數(shù)量即由maximunPoolSize指定灌闺。
三、keepAliveTime 空閑線程存活時(shí)間
一個(gè)線程如果處于空閑狀態(tài)坏瞄,并且當(dāng)前的線程數(shù)量大于corePoolSize桂对,那么在指定時(shí)間后,這個(gè)空閑線程會(huì)被銷毀鸠匀,這里的指定時(shí)間由keepAliveTime來(lái)設(shè)定
四蕉斜、unit 空閑線程存活時(shí)間單位
keepAliveTime的計(jì)量單位
五、workQueue 工作隊(duì)列
新任務(wù)被提交后缀棍,會(huì)先進(jìn)入到此工作隊(duì)列中宅此,任務(wù)調(diào)度時(shí)再?gòu)年?duì)列中取出任務(wù)。jdk中提供了四種工作隊(duì)列:
①ArrayBlockingQueue
基于數(shù)組的有界阻塞隊(duì)列爬范,按FIFO排序父腕。新任務(wù)進(jìn)來(lái)后,會(huì)放到該隊(duì)列的隊(duì)尾青瀑,有界的數(shù)組可以防止資源耗盡問題璧亮。當(dāng)線程池中線程數(shù)量達(dá)到corePoolSize后,再有新任務(wù)進(jìn)來(lái)斥难,則會(huì)將任務(wù)放入該隊(duì)列的隊(duì)尾枝嘶,等待被調(diào)度。如果隊(duì)列已經(jīng)是滿的哑诊,則創(chuàng)建一個(gè)新線程群扶,如果線程數(shù)量已經(jīng)達(dá)到maxPoolSize,則會(huì)執(zhí)行拒絕策略搭儒。
②LinkedBlockingQuene
基于鏈表的無(wú)界阻塞隊(duì)列(其實(shí)最大容量為Interger.MAX)穷当,按照FIFO排序提茁。由于該隊(duì)列的近似無(wú)界性淹禾,當(dāng)線程池中線程數(shù)量達(dá)到corePoolSize后,再有新任務(wù)進(jìn)來(lái)茴扁,會(huì)一直存入該隊(duì)列铃岔,而不會(huì)去創(chuàng)建新線程直到maxPoolSize,因此使用該工作隊(duì)列時(shí)峭火,參數(shù)maxPoolSize其實(shí)是不起作用的毁习。
③SynchronousQuene
一個(gè)不緩存任務(wù)的阻塞隊(duì)列,生產(chǎn)者放入一個(gè)任務(wù)必須等到消費(fèi)者取出這個(gè)任務(wù)卖丸。也就是說(shuō)新任務(wù)進(jìn)來(lái)時(shí)纺且,不會(huì)緩存,而是直接被調(diào)度執(zhí)行該任務(wù)稍浆,如果沒有可用線程载碌,則創(chuàng)建新線程猜嘱,如果線程數(shù)量達(dá)到maxPoolSize,則執(zhí)行拒絕策略嫁艇。
④PriorityBlockingQueue
具有優(yōu)先級(jí)的無(wú)界阻塞隊(duì)列朗伶,優(yōu)先級(jí)通過參數(shù)Comparator實(shí)現(xiàn)。
六步咪、threadFactory 線程工廠
創(chuàng)建一個(gè)新線程時(shí)使用的工廠论皆,可以用來(lái)設(shè)定線程名、是否為daemon線程等等
七猾漫、handler 拒絕策略
當(dāng)工作隊(duì)列中的任務(wù)已到達(dá)最大限制点晴,并且線程池中的線程數(shù)量也達(dá)到最大限制,這時(shí)如果有新任務(wù)提交進(jìn)來(lái)悯周,該如何處理呢觉鼻。這里的拒絕策略,就是解決這個(gè)問題的队橙,jdk中提供了4中拒絕策略:
①CallerRunsPolicy
該策略下坠陈,在調(diào)用者線程中直接執(zhí)行被拒絕任務(wù)的run方法,除非線程池已經(jīng)shutdown捐康,則直接拋棄任務(wù)仇矾。
②AbortPolicy
該策略下,直接丟棄任務(wù)解总,并拋出RejectedExecutionException異常贮匕。
③DiscardPolicy
該策略下,直接丟棄任務(wù)花枫,什么都不做刻盐。
④DiscardOldestPolicy
該策略下,拋棄進(jìn)入隊(duì)列最早的那個(gè)任務(wù)劳翰,然后嘗試把這次拒絕的任務(wù)放入隊(duì)列
到此敦锌,構(gòu)造線程池時(shí)的七個(gè)參數(shù),就全部介紹完畢了
一般來(lái)說(shuō)都是需要保存來(lái)的CallerRuns
package com.learn.thread.seven;
import com.learn.thread.one.T;
import java.io.IOException;
import java.util.concurrent.*;
public class TestTask {
static class Task implements Runnable{
private int i;
public Task(int i) {
this.i = i;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "task" + i);
try {
System.in.read();
} catch (IOException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(2,4,60, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(4), Executors.defaultThreadFactory(),
new ThreadPoolExecutor.CallerRunsPolicy());
for (int i = 0; i < 8; i++) {
poolExecutor.execute(new Task(i));
}
System.out.println(poolExecutor.getQueue());
poolExecutor.execute(new Task(100));
System.out.println(poolExecutor.getQueue());
poolExecutor.shutdown();
}
}