1.回顧
之前說的Executor作用是把線程的定義和執(zhí)行分開半哟,主要是用來做線程的執(zhí)行接口训唱,在他下面還有一個控制著線程生命周期的ExecutorService,然后才是各種各樣的ThreadPoolExecutor拐辽,把線程池作為一個執(zhí)行的單元,給他單獨出一個類育韩,下面是他的七個參數(shù)
corePoolSize 核心線程數(shù)
maxmumPoolSize 最大線程數(shù)
keepAliveTime 生存時間
TimeUnit 生存時間的單位
BlockingQueue 任務(wù)隊列
**ThreadFactory **線程工廠
RejectStrategy 拒絕策略(Abort 拋異常 Discard扔掉 不拋異常 DiscardOldest 扔掉排隊時間最久的,CallerRuns 調(diào)用處理者處理服務(wù))
2.jdk自帶線程池
今天我們來看看JDK給我們提供了一些默認線程池的實現(xiàn)闺鲸,默認的常用的有哪些筋讨,然后來讀讀ThreadPoolExecutor源碼
所有的線程池都是繼承ExecutorService的,所以Executors是對線程執(zhí)行的工具類摸恍,可以看做是線程的工廠悉罕,產(chǎn)生各種各樣的線程池
2.1.SingleThreadPool
先來看第一個SingleThreadPool ,看這個名字就覺得只有一個線程,這個一個線程的線程池可以保證扔進去的任務(wù)是順序執(zhí)行的
package com.learn.thread.eight;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class TestSingleThreadPool {
private static ExecutorService service = Executors.newSingleThreadExecutor();
public static void main(String[] args) {
for (int i = 0; i < 5; i++) {
final int j = i;
// 順序執(zhí)行的
service.execute(() -> {
System.out.println(j + " " +Thread.currentThread().getName());
});
}
}
}
2.2.CacheThreadPool
我們來看第二種立镶,看他源碼實際上是跟之前的SingleThreadPool一樣壁袄,底層是還是ThreadPoolExecutor
沒有核心線程數(shù),最大線程可以很Intger的最大值個媚媒,如果60秒沒人理他嗜逻,自動被回收
任務(wù)隊列用的是SynchronousQueue 不是用來存數(shù)據(jù)的,用來傳遞消息的缭召,如果任務(wù)沒有被執(zhí)行栈顷,就會被阻塞
用的是默認線程工廠
沒有指定拒絕策略,用默認拒絕策略
可以看出CachePool的特點嵌巷,就是你來一個任務(wù)我啟動一個線程萄凤。啟動線程的邏輯如下
如果線程沒有被回收,就去看當前線程池的線程是不是有空閑的線程搪哪,如果有就執(zhí)行讓它去執(zhí)行任務(wù)靡努。如果沒有,就自己new 一個線程去執(zhí)行噩死,原因是隊列是SynchronousQueue 颤难,它必須保證它的大小為0,所以你來一個任務(wù)必須有一個線程去執(zhí)行已维,不然別的線程提交任務(wù)就統(tǒng)統(tǒng)阻塞了
來看這個小程序行嗤,首先將線程池service打印出來,最后又打印一遍線程池services垛耳,然后任務(wù)是睡眠500毫秒
package com.learn.thread.eight;
import com.sun.tools.internal.ws.wsdl.document.soap.SOAPUse;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @author zglx
*/
public class TestCachePool {
private static ExecutorService service = Executors.newCachedThreadPool();
public static void main(String[] args) throws InterruptedException {
System.out.println(service);
for (int i = 0; i < 2; i++) {
// 順序執(zhí)行的
Thread.sleep(100);
service.execute(() -> {
try {
Thread.sleep(0);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName());
});
}
System.out.println(service);
Thread.sleep(800);
System.out.println(service);
}
}
注意線程存活的時間是60S栅屏,所以第一個線程被復(fù)用了
2.3.FixedThreadPool
fixed是固定的含義,就是固定一個線程數(shù)堂鲜。
fixedThreadPool指定一個參數(shù)栈雳,到底有多少線程,核心線程數(shù)和最大線程數(shù)是固定的缔莲,所以不存在回收之說哥纫,但是這里用的是LinkedBlockingQueue,這里一定要小心痴奏,因為是不建議使用的蛀骇,因為是會造成內(nèi)存泄漏
但是用fixedThreadPool有一個好處厌秒,可以進行并行的計算
并行與并發(fā) 并發(fā)是指任務(wù)的提交、并行是指任務(wù)執(zhí)行擅憔,并行是并發(fā)的子集鸵闪,并行是多個cpu可以同時進行處理,并發(fā)是多個任務(wù)同時過來暑诸。
我們看下面一個程序蚌讼,用多線程計算1-200000中的質(zhì)數(shù),可以對這個區(qū)間進行分組
package com.learn.thread.eight;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
public class TestFixedThreadPool {
public static void main(String[] args) throws ExecutionException, InterruptedException {
long start = System.currentTimeMillis();
List<Integer> list = getPrime(1,200000);
list.forEach(System.out::println);
long end = System.currentTimeMillis();
System.out.println("time " + (end - start));
start = System.currentTimeMillis();
int num = 4;
ExecutorService service = Executors.newFixedThreadPool(num);
Future<List<Integer>> future1 = service.submit(new Mytask(1, 80000));
Future<List<Integer>> future2 = service.submit(new Mytask(80001, 160000));
Future<List<Integer>> future3 = service.submit(new Mytask(160001, 200000));
future2.get().addAll(future3.get());
future1.get().addAll(future2.get());
future1.get().forEach(System.out::println);
end = System.currentTimeMillis();
System.out.println("time " + (end - start));
}
public static List<Integer> getPrime(Integer start, Integer end) {
List<Integer> list = new ArrayList<>(100);
for (Integer i = start; i <= end; i++) {
if (isPrime(i)) {
list.add(i);
}
}
return list;
}
public static boolean isPrime(Integer num) {
for (int i = 2; i <= num/2; i++) {
if (num % i == 0) {
return false;
}
}
return true;
}
static class Mytask implements Callable<List<Integer>> {
int start;
int end;
public Mytask() {
}
public Mytask(int start, int end) {
this.start = start;
this.end = end;
}
@Override
public List<Integer> call() throws Exception {
List<Integer> result = getPrime(start, end);
return result;
}
}
}
2.4.cacahe vs fixed
什么時候用cache 什么時候用fixed 个榕,這得看你的業(yè)務(wù)量篡石,如果線程池的線程太多,他們就會競爭稀缺的處理器和內(nèi)存資源,浪費大量的時候在上下文切換笛洛,反之如果線程太少夏志,處理器就可能無法充分利用。
建議:線程池大小與處理器的利用率之比可以使用公式來進行估算
線程池 = 你有多少個CPU乘以cpu期望利用率 乘以 (1+W/C)W除C是等待時間與計算時間的比率
表達式為
Nthread = Ncpu * Ucpu * (1+W/C)
如果你的任務(wù)不確定是否平穩(wěn)苛让,但是要保證任務(wù)來的時候有線程去執(zhí)行沟蔑,那我們就可以用cache,當然你要保證這個任務(wù)不會堆積狱杰。
假如你大概估了線程的值瘦材,這個值完全可以處理任務(wù),我可以直接New 一個線程來執(zhí)行仿畸,那就用fixed食棕,但是阿里不建議這么使用
2.5.ScheduledThreadPool
定時任務(wù)線程池,就是一個定時器任務(wù)错沽,隔一段時間后執(zhí)行簿晓,這個就是我們專門用來執(zhí)行定時任務(wù)的一個線程池。
這里super調(diào)用的是ThreadPoolExecutor 本質(zhì)上還是ThreadPoolExecutor千埃,它最大線程數(shù)也是Integer的最大值憔儿,用的隊列是DelayedWorkQueue。
它有一些好用的方法放可,比如scheduleAtFixedRate間隔多長時間在一個固定頻率執(zhí)行一次這個任務(wù)谒臼,可用通過這樣靈活的時間配置。
第一個參數(shù)是Delay耀里,第一次執(zhí)行任務(wù)在此之后多長時間
第二個參數(shù)period間隔多長時間
第三個參數(shù)是時間單位
package com.learn.thread.eight;
import com.learn.thread.five.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class TestScheduledPool {
private static ScheduledExecutorService service = Executors.newScheduledThreadPool(10);
public static void main(String[] args) {
// 5秒打印一次線程名字
service.scheduleAtFixedRate(() -> {
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName());
}, 0, 5, TimeUnit.SECONDS);
}
}
假如有一個鬧鐘服務(wù)蜈缤,假如有十億人訂閱了這個鬧鐘,意味著冯挎,每天早上會有10億的并發(fā)量底哥,你怎么優(yōu)化
思想就是把這個定時的任務(wù)分發(fā)到很多個邊緣的服務(wù)器上,假如說你有一臺服務(wù)器,你也是要用到多線程去消費叠艳,總之就是一個分而治之的思想
SingleThreadPool 只有一個線程的線程池
FixedThreadPool 固定多少個線程
CacheThreadPool 有彈性的線程池奶陈,有一個啟動一個,只要沒有空閑的就啟動一個新的來執(zhí)行
ScheduleThreadPool 定時任務(wù)來執(zhí)行線程池
package com.learn.thread.eight;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.*;
/**
* 自定義線程池的拒絕策略
*/
public class TestRejectedHandler {
private static ThreadFactory factory = new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread("t1");
}
};
private static ExecutorService executorService = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1),factory, new MyHandler());
static class MyHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println("asdasd");
}
}
public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
executorService.execute(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
}
}
3.ThreadPoolExecutor
3.1.常用變量的解釋
// AtomicInteger是int類型附较,是32位。高3位代表線程狀態(tài)潦俺,低29位代表目前線程池有多少個線程數(shù)量拒课,這里把兩個值合二為一就是算了執(zhí)行效率更高一些,因為都需要線程同步事示,而同步一個值往往比同步一個值容易的多
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// Integer.size為32早像,所以COUNT_BITS為29
private static final int COUNT_BITS = Integer.SIZE - 3;
// CAPACITY 線程允許的最大線程數(shù),1左移29位肖爵,然后減1卢鹦,就是2^29-1
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// (線程有五種狀態(tài)按大小排序為 RUNNING -> SHUTDOWN -> STOP -> TIDYING -> TERMINATED)
// 正常運行的
private static final int RUNNING = -1 << COUNT_BITS;
// 調(diào)用shutdown方法進去了shutdown狀態(tài)
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 調(diào)用shutdown馬上停止
private static final int STOP = 1 << COUNT_BITS;
// 調(diào)用了shutdown然后這個線程也執(zhí)行完了,現(xiàn)在正在處理的過程叫做TIDYING
private static final int TIDYING = 2 << COUNT_BITS;
// 整個線程全部結(jié)束
private static final int TERMINATED = 3 << COUNT_BITS;
// 獲取線程池的狀態(tài) 通過按位與操作劝堪,低29位將全部變成0
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 獲取線程的數(shù)量冀自,通過按位與操作,高3位全部變成0
private static int workerCountOf(int c) { return c & CAPACITY; }
// 根據(jù)線程池狀態(tài)和線程池的線程數(shù)量生成ct1值
private static int ctlOf(int rs, int wc) { return rs | wc; }
// 線程池狀態(tài)小于xx
private static boolean runStateLessThan(int c, int s) {
return c < s;
}
// 線程池狀態(tài)大于等于xx
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
3.2.構(gòu)造方法
// 構(gòu)造方法
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
// 基本類型參數(shù)校驗
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
// 空指針校驗
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
// 安全管理器
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
// 核心線程數(shù)
this.corePoolSize = corePoolSize;
// 最大線程池數(shù)
this.maximumPoolSize = maximumPoolSize;
//
this.workQueue = workQueue;
// 根據(jù)傳入?yún)?shù)unit和keepAliveTime 將存活時間轉(zhuǎn)換為納秒存到變量keepAliveTime中
this.keepAliveTime = unit.toNanos(keepAliveTime);
// 線程工廠
this.threadFactory = threadFactory;
// 策略
this.handler = handler;
}
3.3. 提交執(zhí)行task的過程
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 1.判斷線程池活著的那些線程數(shù)是不是小于核心線程數(shù)秒啦,如果小于就addWorker添加一個線程熬粗。
if (workerCountOf(c) < corePoolSize) {
// addWorker創(chuàng)建線程,第二個參數(shù)表示是否創(chuàng)建核心線程
if (addWorker(command, true))
return;
c = ctl.get();
}
// 這里處理當前線程數(shù)超過核心線程數(shù)的邏輯
// 2.先判斷當前線程池狀態(tài)余境,如果是在運行中驻呐,就把任務(wù)丟到隊列里邊去,否則拒絕任務(wù)
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 這里需要雙重判斷線程池的狀態(tài)芳来,因為這阻塞的過程中有可能線程池的狀態(tài)被改變了含末,
// 如果不是Running狀態(tài),說明線程池執(zhí)行了shutdown操作即舌,就刪除此任務(wù)佣盒,并且拒絕
if (! isRunning(recheck) && remove(command))
reject(command);
// 如果工作狀態(tài)的線程為0,說明沒有線程了或者核心線程數(shù)設(shè)置為0了侥涵,就添加非核心線程數(shù)
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 3.如果線程不是運行狀態(tài)沼撕,那就任務(wù)進入隊列,這里有3點需要注意
// 線程池不是運行狀態(tài)芜飘,addWorker內(nèi)部會判斷線程池的狀態(tài)
// 第二個參數(shù)表示是否創(chuàng)建核心線程
// addWorker返回false說明任務(wù)執(zhí)行失敗务豺,需要拒絕任務(wù)
else if (!addWorker(command, false))
reject(command);
}
3.4. addWorker源碼分析
addWorker涉及到了很多細節(jié),如果要讀懂每一個細節(jié)完全不必要嗦明,但是思想理解就行了笼沥,addWorker的意思就是添加線程,線程要存到容器里,往里頭添加線程任務(wù)的時候肯定是多個線程同時往里面扔的奔浅,所以一定要同步馆纳,但是追求效率,一般都是用自旋或者lock
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
// 兩層死循環(huán)就為了做一個事情汹桦,添加一個woker的數(shù)量加1
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 判斷線程池狀態(tài)滿足以下條件鲁驶,就返回
// 1、線程池狀態(tài)大于SHUTDOWN
// 2舞骆、線程池狀態(tài)等于SHUTDOWN并且firstTask執(zhí)行任務(wù)不為null,直接返回false
// 3钥弯、線程池狀態(tài)等于SHUTDOWN,且隊列為空督禽,直接返回false
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
// 嵌套死循環(huán)
for (;;) {
int wc = workerCountOf(c);
// 如果當前線程超過最大允許線程數(shù)脆霎,或者根據(jù)core狀態(tài),大于核心線程或者最大線程數(shù)狈惫,返回false
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 嘗試把當前執(zhí)行線程數(shù)+1睛蛛,如果+1成功,打破循環(huán)
if (compareAndIncrementWorkerCount(c))
break retry;
// 重新獲取當前線程池狀態(tài)
c = ctl.get();
// 如果當前執(zhí)行線程數(shù)不等于之前讀取的數(shù)量胧谈,說明有別的線程加忆肾!成功了
if (runStateOf(c) != rs)
// 重試
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
// 下面的邏輯是創(chuàng)建一個線程去執(zhí)行任務(wù)
// 是否執(zhí)行任務(wù)狀態(tài),true執(zhí)行成功
boolean workerStarted = false;
// 判斷是否假如addWorker狀態(tài) add加入成功
boolean workerAdded = false;
Worker w = null;
try {
// 創(chuàng)建一個Worker
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
// worker的添加必須是串行的,因此必須加鎖
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
// 重新檢查當前線程池狀態(tài)第岖,查詢當前執(zhí)行任務(wù)的數(shù)量
int rs = runStateOf(ctl.get());
// 如果rs小于SHUTDOWN难菌,或者rs為SHUTDOWN并且firstTask為null
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// workder已經(jīng)調(diào)用過了start方法,則不再創(chuàng)建worker蔑滓,拋出異常
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// worker創(chuàng)建并添加workers成功
workers.add(w)郊酒;
// 更新largestPoolSize變量
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
// 釋放鎖
mainLock.unlock();
}
// 啟動worker線程
if (workerAdded) {
// 這里會調(diào)用worker的run方法,
// 實際上是調(diào)用執(zhí)行runWorker方法
t.start();
workerStarted = true;
}
}
} finally {
// worker線程啟動失敗键袱,說明線程池狀態(tài)發(fā)生了變化(關(guān)閉操作被執(zhí)行), 需要進行shutdown相關(guān)操作
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
3.5.線程池worker任務(wù)單元
來看看worker是個什么東西燎窘,看源碼是一個靜態(tài)內(nèi)部類Worker
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
private static final long serialVersionUID = 6138294804551838833L;
// 1. 線程
final Thread thread;
// 2. 任務(wù)
Runnable firstTask;
// 3. 執(zhí)行次數(shù)
volatile long completedTasks;
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
// 線程池執(zhí)行的核心方法
public void run() {
runWorker(this);
}
}
3.6.核心線程執(zhí)行邏輯runWorker
這里有一個有意思的地方可以看到Worker 是實現(xiàn)了Runnable,它自己也有一個Runnbale和Thread蹄咖,你就把Worker當成一個工人褐健,工人有任務(wù)(Runnbale)和執(zhí)行能力(Thread),但是你得保證每一個工人執(zhí)行的任務(wù)是自己的,并且自己執(zhí)行完了以后澜汤,completedTasks要加1
所以當多線程添加任務(wù)的時候蚜迅,把當前線程復(fù)制一份給Thread和任務(wù)下發(fā)給工人的Runnbale,然后讓工人去lock俊抵,工人的lock 其實就是之前學(xué)過ReentrantLock的acquire方法谁不,加入鏈表,等待執(zhí)行徽诲,這樣子就完成了整個串行刹帕。
final void runWorker(Worker w) {
// 1. 這里是用來自己實現(xiàn)方法的beforeExecute吵血,自己實現(xiàn)內(nèi)容
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
// 這里調(diào)用了unlock,其實也就調(diào)用了release(1)偷溺,執(zhí)行過程中允許線程被中斷
w.unlock(); // allow interrupts
// 判斷是否繼續(xù)自旋
boolean completedAbruptly = true;
try {
// 這里判斷任務(wù)是否為存在或者隊列中的任務(wù)不為空蹋辅,注意如果從隊列取就會造成阻塞
while (task != null || (task = getTask()) != null) {
// CAS加鎖
w.lock();
// 判斷當前線程池狀態(tài)是否為停止,如果停止了挫掏,則中斷
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 這里用來擴展功能侦另,但是沒用到
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 這個工人開始執(zhí)行任務(wù)
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
// gc回收
task = null;
// 任務(wù)加1
w.completedTasks++;
// 釋放鎖
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 自旋操作退出,說明線程池結(jié)束
processWorkerExit(w, completedAbruptly);
}
}
4.woker類總結(jié)
這個woker是一個Thread 也是一個Runnable 同樣也是一個AQS砍濒,先來說說為什么要有Runnable和Thread
4.1.為什么要有Runnable?
這里其實是用來記錄任務(wù)的淋肾,因為Woker 里邊有很多狀態(tài)需要根據(jù)當前任務(wù)去記錄,并且這個東西必須要在Thread中執(zhí)行
4.2.為什么要有Thread?
因為線程池有很多Woker去競爭爸邢,所以干脆就把Woker設(shè)計成AQS,一個線程處理一個當前任務(wù)拿愧,而不是說有其他worker執(zhí)行了不是自己的任務(wù)
4.3.submit和execute的區(qū)別是什么
方法定義的位置不同杠河,execute是在Executor執(zhí)行器中,而submit是在ExecutorService執(zhí)行服務(wù)中的
參數(shù)接收不同浇辜,execute接收的是Runnbale 券敌,submit接收的是Callable
作用不用,execute只是單純的執(zhí)行任務(wù)柳洋,submit可以把任務(wù)的結(jié)果帶出來待诅。
4.4.線程池大概執(zhí)行流程
核心線程數(shù)不夠,啟動核心線程
核心線程滿了熊镣,加入隊列
核心線程和隊列都滿了卑雁,addWorker 加入非核心線程
4.5.addWorker 做的事情
count 加1
真正的加入任務(wù)并且start
WorkStealingPool
WorkStealingPool 是另外一種線程,核心非常簡單绪囱,之前講的ThreadPoolExecutor線程是去一個任務(wù)的隊列里取任務(wù)测蹲。而WorkStealingPool是每一個線程都有自己的任務(wù)隊列,當一個線程執(zhí)行完以后鬼吵,會在別的線程任務(wù)隊列中偷任務(wù)
跟原來只有一個隊列的線程池相比扣甲,如果有某一個線程被占用了很長時間,然后任務(wù)隊列又特別的重齿椅,那其他線程只能空著琉挖,沒辦法幫到任務(wù)重的線程
源碼
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
WorkStealingPool本質(zhì)上是一個ForkJoinPool
package com.learn.thread.eight;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class TestWorkStealingPool {
private static final ExecutorService service = Executors.newWorkStealingPool();
public static void main(String[] args) throws IOException {
System.out.println(Runtime.getRuntime().availableProcessors());
service.execute(new R(2000));
service.execute(new R(2000));
service.execute(new R(2000));
service.execute(new R(2000));
service.execute(new R(2000));
service.execute(new R(2000));
service.execute(new R(2000));
service.execute(new R(2000));
service.execute(new R(2000));
service.execute(new R(2000));
service.execute(new R(2000));
System.in.read();
}
static class R implements Runnable {
int time;
R(int time) {
this.time = time;
}
@Override
public void run() {
try {
TimeUnit.MILLISECONDS.sleep(time);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(time + " " + Thread.currentThread().getName());
}
}
}
5.ForkJoinPool
ForkJoinPool 適合把大任務(wù)切分成一個個小任務(wù)去執(zhí)行,小任務(wù)如果還是大涣脚,再切示辈,不一定是兩個,可以是多個涩澡,但是最終的結(jié)果就是多個小任務(wù)結(jié)果的匯總顽耳。這個過程就叫做join 坠敷,所以這種線程池叫做ForkJoinPool。
既然是可以分割的任務(wù)射富,那怎么定義任務(wù)呢膝迎,之前線程池執(zhí)行的任務(wù)就是Runnable。在這里胰耗,我們一般實現(xiàn)ForkJoinPool的時候需要定義成特定他的類型限次,這個類型又必須是可以分叉的任務(wù),這個任務(wù)就叫做ForkJoinTask,但是實際上這個ForkJoinTask又比較原始柴灯,我們可以用RecursiveAction卖漫,這里邊又有兩種。
第一個RecursiveAction遞歸赠群,稱為不帶返回值的任務(wù)羊始,因為我們可以把大任務(wù)分割成小任務(wù),小任務(wù)又可以分成小任務(wù)查描,一直到我滿意的條件為止突委,這其中就帶有遞歸的過程。等會來看看一個例子冬三,所以我要對一百萬個數(shù)進行求和匀油,單線程肯定很慢。
第二個RecursiveTask勾笆,叫做帶返回值的子任務(wù)
package com.learn.thread.eight;
import java.io.IOException;
import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.RecursiveTask;
public class TestForkJoinPool {
private static int[] nums = new int[1000000];
// 按5萬個一組敌蚜,進行分組
private static int MAX_SIZE = 500000;
static Random random = new Random();
static {
for (int i = 0; i < 1000000; i++) {
nums[i] = random.nextInt(100);
}
System.out.println("-----" + Arrays.stream(nums).sum());
}
public static void main(String[] args) throws IOException, InterruptedException {
ForkJoinPool forkJoinPool = new ForkJoinPool();
// 無返回參數(shù)子任務(wù)
AddTask addTask = new AddTask(0, nums.length);
// forkJoinPool.execute(addTask);
AddTaskRet addTaskRet = new AddTaskRet(0, nums.length);
forkJoinPool.invoke(addTaskRet);
System.out.println(addTaskRet.join());
System.in.read();
}
/**
* 不帶返回值的ForkJoinPool
*/
static class AddTask extends RecursiveAction {
private int start;
private int end;
public AddTask(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected void compute() {
// 如果是在分組內(nèi),開始計算窝爪,否則再分組
if (end - start <= MAX_SIZE) {
long sum = 0L;
for (int i = start; i < end; i++) {
sum += nums[i];
}
System.out.println("from start" + start + "to end " + end + "sum =" + sum);
} else {
int middle = start + (end - start) / 2;
AddTask addTask = new AddTask(start, middle);
AddTask addTask1 = new AddTask(middle, end);
addTask.fork();
addTask1.fork();
}
}
}
/**
* 不帶返回值的ForkJoinPool
*/
static class AddTaskRet extends RecursiveTask<Long> {
private int start;
private int end;
public AddTaskRet(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
// 如果是在分組內(nèi)弛车,開始計算,否則再分組
if (end - start <= MAX_SIZE) {
long sum = 0L;
for (int i = start; i < end; i++) {
sum += nums[i];
}
return sum;
}
int middle = start + (end - start) / 2;
AddTaskRet addTask = new AddTaskRet(start, middle);
AddTaskRet addTask1 = new AddTaskRet(middle, end);
addTask.fork();
addTask1.fork();
return addTask.join() + addTask1.join();
}
}
}
6.parallelStream
java8 有一個并行流酸舍,底層就是ForkJoinPool算法來實現(xiàn)的帅韧。
你可以把集合里面的內(nèi)容想像成一個個河流往外流,在流經(jīng)過某個地方的時候處理一下啃勉。
舉例:我們new 了一個1000000 數(shù)據(jù)的集合忽舟,然后判斷這些數(shù)是不是質(zhì)數(shù),foreach是lamdba表達式的一個流式處理淮阐,還是相當于一個遍歷循環(huán)叮阅。
但是parallelStream并行流是并行的來處理這個任務(wù)切分成一個個子任務(wù),所以跟foreach會有一個時間上的差距泣特。所以在互相之間線程不需要同步的時候浩姥,你可以用這種并行流來處理效率會高一些
package com.learn.thread.eight;
import com.google.common.collect.RangeMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
public class TestParallelStream {
public static boolean isPrime(int num) {
for (int i = 2; i < num/2; i++) {
if (num % i ==0) {
return false;
}
}
return true;
}
public static void main(String[] args) {
List<Integer> list = new ArrayList<>();
Random random = new Random();
for (int i = 0; i < 1000000; i++) {
list.add(random.nextInt(1000000));
}
long start = System.currentTimeMillis();
list.forEach(TestParallelStream::isPrime);
long end = System.currentTimeMillis();
System.out.println("ends" + (end - start));
start = System.currentTimeMillis();
list.parallelStream().forEach(TestParallelStream::isPrime);
end = System.currentTimeMillis();
System.out.println("ends" + (end - start));
}
}
總結(jié)
線程池有兩種ThreadPoolService\ForkJoinPool
區(qū)別在于ThreadPoolService多個線程共享一個任務(wù)隊列,下面各個每個線程都有自己的任務(wù)隊列