TODO
持續(xù)補(bǔ)充中...
前言
以下內(nèi)容都是基于JDK1.8介紹
什么是ForkJoin(FJ)
ForkJoinPool 是 JDK7 引入的,由 Doug Lea 編寫的高性能線程池。
核心
- 分治算法(Divide-and-Conquer):
把任務(wù)遞歸的拆分為各個子任務(wù)前方,這樣可以更好的利用系統(tǒng)資源居灯,盡可能的使用所有可用的計(jì)算能力來提升應(yīng)用性能着憨。
圖片來源:http://www.reibang.com/p/32a15ef2f1bf
- work-stealing(工作竊取)算法:
線程池內(nèi)的所有工作線程都嘗試找到并執(zhí)行已經(jīng)提交的任務(wù)弧呐,或者是被其他活動任務(wù)創(chuàng)建的子任務(wù)(如果不存在就阻塞等待)管引。這種特性使得 ForkJoinPool 在運(yùn)行多個可以產(chǎn)生子任務(wù)的任務(wù)士败,或者是提交的許多小任務(wù)時(shí)效率更高。
以下翻譯自 A Java Fork/Join Framework (Doug Lea)
fork/join框架的核心在于它的輕量級調(diào)度機(jī)制褥伴。FJTask采用了Cilk的work - stealing調(diào)度器中首創(chuàng)的基本策略 :
- 每個工作線程在自己的線程中維護(hù)可運(yùn)行的任務(wù)調(diào)度隊(duì)列谅将。
- 隊(duì)列被維護(hù)為雙端隊(duì)列(deques),支持LIFO噩翠,push戏自、pop操作邦投,以及FIFO的poll/take
- 工作線程(FJThread)運(yùn)行的任務(wù)中生成子任務(wù)被push自己的deques上伤锚。
- 工作線程默認(rèn)情況下以LIFO的模式處理自己的deque,通過pop獲取任務(wù)。
- 當(dāng)工作線程沒有本地任務(wù)要運(yùn)行時(shí)屯援,它會嘗試猛们,從隨機(jī)選擇的其他隊(duì)列那里,使用FIFO(最早的優(yōu)先)規(guī)則偷取一個任務(wù)狞洋,弯淘。
- 當(dāng)一個工作線程遇到一個join操作時(shí),它處理其他任務(wù)(如果可用)吉懊,直到目標(biāo)任務(wù)完成庐橙。否則,所有任務(wù)都會在沒有阻塞的情況下運(yùn)行完成借嗽。 TODO
- 當(dāng)一個工作線程沒有任務(wù)并且偷取不到任何任務(wù)時(shí)态鳖,對于其他類型,它會back off(通過yield恶导、sleep和/或優(yōu)先級調(diào)節(jié))浆竭,并稍后再次嘗試,除非所有
的worker都空閑惨寿,在這種情況下邦泄,所有的worker都block,直到從頂層調(diào)用另一個任務(wù)裂垦。
圖片來源:A Java Fork/Join Framework (Doug Lea)
核心類介紹
- ForkJoinPool 這是線程池的核心類顺囊,也是提供方法供我們使用的入口類〗堵#基本上forkJoin的核心操作及線程池的管理方法都由這個類提供包蓝。
ForkJoinPool.WorkQueue 這是ForkJoinPool類的內(nèi)部類。也是線程池核心的組成部分企量。ForkJoinPool線程池將由WorkQueue數(shù)組組成测萎。為了進(jìn)一步提高性能,與ThreadPoolExecutor不一樣的是届巩,這沒有采用外部傳入的任務(wù)隊(duì)列硅瞧,而是作者自己實(shí)現(xiàn)了一個阻塞隊(duì)列。奇數(shù)位是帶工作線程的存放fork出的子任務(wù)的隊(duì)列恕汇,偶數(shù)隊(duì)列存放的是外部提交的任務(wù)腕唧。
ForkJoinWorkerThread 線程池中運(yùn)行的thread也是作者重新定義的。這個類的目的是在于將外部的各種形式的task都轉(zhuǎn)換為統(tǒng)一的ForkJoinTask格式瘾英。
ForkJoinTask 這是ForkJoinPool支持運(yùn)行的task抽象類枣接,我們一般使用其子類如RecursiveTask(有返回值)或者RecursiveAction(無返回值)。
https://blog.51cto.com/u_14014612/6031659
Fork-join任務(wù)運(yùn)行機(jī)制
workequeues為什么區(qū)分奇偶slot
外部提交任務(wù)放在pool.workequeues偶數(shù)slot
內(nèi)部提交任務(wù)放在pool.workequeues奇數(shù)slot
在我看來是一般外部提交的任務(wù)初始化缺谴,即遞歸起始點(diǎn)但惶,會是一個大任務(wù)(相對于fork之后的小任務(wù)),F(xiàn)JWorkerThread會把fork之后的小任務(wù)放在自己隊(duì)列的top(即工作線程自己的隊(duì)列都是相對于外部提交任務(wù)而言的小任務(wù)),在FJWorkerThread啟動之后是通過scan即竊取其他隊(duì)列的任務(wù)膀曾,通過poll即使用FIFO的方式竊取base位县爬,即“大任務(wù)”開始,相當(dāng)于“廣度優(yōu)先”添谊,而FJWorker是通過LIFO方式pop本地隊(duì)列财喳,類似與一種“深度優(yōu)先”的方式,兩者結(jié)合有助于發(fā)揮多線程優(yōu)勢斩狱。
work-steal的優(yōu)勢
通過
List<Future<?>> list = new ArrayList<>();
long start = System.currentTimeMillis();
for (int j = 1; j <= 2; j++) {
int i = j;
Future<?> submit = threadPool.submit(() -> {
System.out.println(Thread.currentThread().getName() + ", level1 task " + i);
Future innerTask = threadPool.submit(() ->
System.out.println(Thread.currentThread().getName() + ", level2 task" + i));
try {
innerTask.get();
} catch (Exception e) {
e.printStackTrace();
}
});
list.add(submit);
}
Future[] outerTasks = list.toArray(new Future[0]);
System.out.println("waiting...");
try {
for (Future outerTask : outerTasks) {
outerTask.get();
}
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("done");
System.out.println("cost:" + (System.currentTimeMillis() - start));
分別使用
//會死鎖
testThreadPool(new ThreadPoolExecutor(2,2,10,TimeUnit.DAYS,new ArrayBlockingQueue<>(20)));
testThreadPool(Executors.newFixedThreadPool(2));
//下面兩個一樣耳高,都是使用ForkJoin進(jìn)行工作竊取
testThreadPool(new ForkJoinPool(2));
testThreadPool(Executors.newWorkStealingPool(2));
- 首先提交的兩個任務(wù)把線程池中的兩個線程都占滿了,而它們又分別提交了子任務(wù)所踊,并等待子任務(wù)完成才退出
子任務(wù)在工作隊(duì)列中等待線程池中釋放出空閑線程來執(zhí)行祝高,這是不可能的,所以兩邊互相等待污筷,死鎖了 - ForkJoinPool 與普通線程池的主要區(qū)別是它實(shí)現(xiàn)了工作竊取算法工闺。明顯的內(nèi)部區(qū)別是:
- 普通線程池所有線程共享一個工作隊(duì)列,有空閑線程時(shí)工作隊(duì)列中的任務(wù)才能得到執(zhí)行
- ForkJoinPool 中的每個線程有自己獨(dú)立的工作隊(duì)列瓣蛀,每個工作線程運(yùn)行中產(chǎn)生新的任務(wù)陆蟆,放在隊(duì)尾,某個工作線程會嘗試竊取別個工作線程隊(duì)列中的任務(wù)惋增,從隊(duì)列頭部竊取,遇到 join() 時(shí)叠殷,如前面的 future.get(),如果 join 的任務(wù)尚未完成诈皿,則可先處理其他任務(wù),這就是 ForkJoinPool 不會像普通線程池那樣被死鎖的秘訣林束。
https://blog.csdn.net/weixin_42593549/article/details/114613629
FJ執(zhí)行圖示
public class AddNumTask extends RecursiveTask<Integer> {
private List<Integer> ints;
public AddNumTask(List<Integer> ints) {
this.ints = ints;
}
@Override
protected Integer compute() {
if (ints.size() <= 2) {
int sum = 0;
for (int in : ints) {
sum += in;
}
return sum;
} else {
AddNumTask task1 = new AddNumTask(ints.subList(0, ints.size() / 2));
AddNumTask task2 = new AddNumTask(ints.subList(ints.size() / 2, ints.size()));
invokeAll(task1,task2);
Integer join = task1.join();
join += task2.join();
return join;
}
}
}
public class ForkJoinMain {
public static void main(String[] args) {
List<Integer> ints = new ArrayList<>();
for (int i = 0; i <= 10; i++) {
ints.add(i);
}
ForkJoinPool pool = ForkJoinPool.commonPool();
ForkJoinTask<Integer> submit = pool.submit(new AddNumTask(ints));
try {
Integer integer = submit.get();
System.out.println(integer);
} catch (Exception e) {
e.printStackTrace();
}
}
- compute方法執(zhí)行對半切為兩個小task,t1(0-5)稽亏,t2(6-10)后壶冒,使用invokeAll(t1,t2)的執(zhí)行流程。
下去簡要描述了在并發(fā)數(shù)2的情況下的FJ的運(yùn)行情況
- 首先在 pool.submit(new AddNumTask(ints))之后截歉,把任務(wù)0-10放在了外部提交隊(duì)列胖腾,create了worker1
- worker1獲取到任務(wù)0-10,執(zhí)行compute瘪松,切分為t1(0-5),t2(6-10)后調(diào)用invokeAll
public static void invokeAll(ForkJoinTask<?> t1, ForkJoinTask<?> t2) {
int s1, s2;
t2.fork();
if ((s1 = t1.doInvoke() & DONE_MASK) != NORMAL)
t1.reportException(s1);
if ((s2 = t2.doJoin() & DONE_MASK) != NORMAL)
t2.reportException(s2);
}
- 首先t2.fork咸作,把t2放到當(dāng)前worker(FJWorkerThread)的top
- t1.doInvoke直接執(zhí)行,遞歸處理compute方法
FJ與普通線程池效果對別
public class Test {
private static ExecutorService executorService = new ThreadPoolExecutor(20, 20, 120, TimeUnit.SECONDS, new LinkedBlockingQueue<>(2000), new ThreadPoolExecutor.CallerRunsPolicy());
public static void main(String[] args) {
int count = 0;
int nums = 100;
//System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "0");
ForkJoinPool.commonPool();
for(int i = 0;i < nums;i++){
long start = System.currentTimeMillis();
testFJ(3000000);
// testThreadPool(3000000);
count += (System.currentTimeMillis() - start);
}
System.out.println(nums + "次宵睦,平均耗時(shí):" + (count / nums) + "毫秒");
Scanner scanner = new Scanner(System.in);
scanner.nextLine();
}
//測試forkjoin
private static void testFJ(int size){
List<Integer> ints = new ArrayList<>();
for (int i = 0; i < size; i++) {
ints.add(i);
}
ForkJoinPool pool = ForkJoinPool.commonPool();
ForkJoinTask<Long> submit = pool.submit(new InvokeInterfaceTask(ints));
try {
Long aLong = submit.get();
System.out.println("count:" + aLong);
} catch (Exception e) {
e.printStackTrace();
}
}
//測試線程池
private static void testThreadPool(int size){
CountDownLatch latch = new CountDownLatch(size);
AtomicLong count = new AtomicLong();
for(int i = 0; i< size;i++){
executorService.execute(()->{
try{
TestTools.testMethod();
}finally{
latch.countDown();
}
});
}
try {
latch.await();
// System.out.println("count:" + count);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public class TestTools {
public static long testMethod(){
// try {
// Thread.sleep(1);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
return 1L<<12 & 0xfff | 1 + 1;
}
}
public class InvokeInterfaceTask extends RecursiveTask<Long> {
private List<Integer> ints;
public InvokeInterfaceTask(List<Integer> ints) {
this.ints = ints;
}
@Override
protected Long compute() {
// System.out.println(Thread.currentThread().getName() + "," + ints.get(0) + "," + ints.get(ints.size() - 1));
if (ints.size() <= 1) {
long sum = 0;
sum = TestTools.testMethod(enterpriseDictService);
return sum;
} else {
InvokeInterfaceTask task1 = new InvokeInterfaceTask(ints.subList(0, ints.size() / 2));
InvokeInterfaceTask task2 = new InvokeInterfaceTask(ints.subList(ints.size() / 2, ints.size()));
// System.out.println(Thread.currentThread().getName() + "---invokeAll begin");
invokeAll(task1, task2);
// System.out.println(Thread.currentThread().getName() + "---invokeAll end");
Long join = task1.join();
// System.out.println(Thread.currentThread().getName() + "---task1.join");
join += task2.join();
// System.out.println(Thread.currentThread().getName() + "---task2.join");
return join;
}
}
}
上述代碼是測試在size數(shù)目(最細(xì)任務(wù)记罚,因?yàn)镕J的任務(wù)肯定大于size,ThreadPool的任務(wù)=size)時(shí)壳嚎,運(yùn)行nums次桐智,求平均耗時(shí)末早。
本機(jī)FJ的并發(fā)數(shù)為7
如下,random 10表示testMethod中使用了new Random.nextInt(10)酵使,隨機(jī)休眠荐吉,模擬接口耗時(shí)
- 1.把ThreadPool的core和max線程數(shù)設(shè)置為7焙糟,隊(duì)列長度2000時(shí)口渔,運(yùn)行結(jié)果二者耗時(shí)基本相等
- 2.把ThreadPool的core和max線程數(shù)設(shè)置為7,隊(duì)列長度3000000時(shí)穿撮,二者并發(fā)線程數(shù)相等的情況下缺脉,F(xiàn)J的耗時(shí)明顯低于ThreadPool,即便增加ThreadPool的線程數(shù)悦穿,隊(duì)列長度攻礼,耗時(shí)依然沒有明顯降低甚至增加。
總結(jié)
- 1.FJ對于“很多”的小任務(wù)處理優(yōu)勢是明顯的栗柒。
- 2.FJ對于遞歸處理礁扮,執(zhí)行結(jié)果進(jìn)行join的支持友好,不必需要額外代碼進(jìn)行匯總結(jié)果瞬沦,也無需特別注意并發(fā)問題
- 3.FJ是對于一般線程池的補(bǔ)充太伊,對于多線程情況多提供了一種解決方案的選擇
普通線程池使用問題
參考
Java并發(fā)編程——ForkJoinPool之外部提交及worker執(zhí)行過程
https://blog.51cto.com/u_14014612/6031659
JUC源碼分析-線程池篇(五):ForkJoinPool - 2
http://www.reibang.com/p/6a14d0b54b8d
擴(kuò)展閱讀
CAS原理解析
https://blog.csdn.net/yyqhwr/article/details/106965444
volatile
https://www.cnblogs.com/dolphin0520/p/3920373.html
wait/sleep
https://blog.csdn.net/qq_45731021/article/details/116502429
Java線程池實(shí)現(xiàn)原理及其在美團(tuán)業(yè)務(wù)中的實(shí)踐
https://tech.meituan.com/2020/04/02/java-pooling-pratice-in-meituan.html
ThreadLocalRandom.getProbe() 線程的探針哈希值
線程的探針哈希值說明
使用 ThreadLocalRandom.getProbe() 得到線程的探針哈希值,探針哈希值和 map 里使用的哈希值的區(qū)別是,當(dāng)線程發(fā)生數(shù)組元素爭用后逛钻,可以改變線程的探針哈希值僚焦,讓線程去使用另一個數(shù)組元素,而 map 中 key 對象的哈希值曙痘,由于有定位 value 的需求芳悲,所以它是一定不能變的。
private static final long PROBE
= U.objectFieldOffset(Thread.class, "threadLocalRandomProbe");
//實(shí)際是獲取的 Thread中的 threadLocalRandomProbe 屬性
//可以看出這個屬性是靜態(tài)的边坤,即類級別的名扛,所有對象共用
static final int getProbe() {
return U.getInt(Thread.currentThread(), PROBE);
}
//更新線程探針哈希值
static final int advanceProbe(int probe) {
probe ^= probe << 13; // xorshift
probe ^= probe >>> 17;
probe ^= probe << 5;
U.putInt(Thread.currentThread(), PROBE, probe);
return probe;
}
守護(hù)線程
在Java中有兩類線程:用戶線程 (User Thread)、守護(hù)線程 (Daemon Thread)茧痒。
所謂守護(hù) 線程罢洲,是指在程序運(yùn)行的時(shí)候在后臺提供一種通用服務(wù)的線程,比如垃圾回收線程就是一個很稱職的守護(hù)者文黎,并且這種線程并不屬于程序中不可或缺的部分惹苗。因此,當(dāng)所有的非守護(hù)線程結(jié)束時(shí)耸峭,程序也就終止了桩蓉,同時(shí)會殺死進(jìn)程中的所有守護(hù)線程。反過來說劳闹,只要任何非守護(hù)線程還在運(yùn)行院究,程序就不會終止洽瞬。
用戶線程和守護(hù)線程兩者幾乎沒有區(qū)別,唯一的不同之處就在于虛擬機(jī)的離開:如果用戶線程已經(jīng)全部退出運(yùn)行了业汰,只剩下守護(hù)線程存在了伙窃,虛擬機(jī)也就退出了。 因?yàn)闆]有了被守護(hù)者样漆,守護(hù)線程也就沒有工作可做了为障,也就沒有繼續(xù)運(yùn)行程序的必要了。
將線程轉(zhuǎn)換為守護(hù)線程可以通過調(diào)用Thread對象的setDaemon(true)方法來實(shí)現(xiàn)放祟。在使用守護(hù)線程時(shí)需要注意一下幾點(diǎn):
(1) thread.setDaemon(true)必須在thread.start()之前設(shè)置鳍怨,否則會跑出一個IllegalThreadStateException異常。你不能把正在運(yùn)行的常規(guī)線程設(shè)置為守護(hù)線程跪妥。
(2) 在Daemon線程中產(chǎn)生的新線程也是Daemon的鞋喇。
(3) 守護(hù)線程應(yīng)該永遠(yuǎn)不去訪問固有資源,如文件眉撵、數(shù)據(jù)庫侦香,因?yàn)樗鼤谌魏螘r(shí)候甚至在一個操作的中間發(fā)生中斷。
>> 是 有符號的 右移 操作符纽疟。
符號為正罐韩,高位插入 0
符號為負(fù),高位插入 1
>>> 是 無符號的 右移 操作符仰挣。
不管符號為啥伴逸,高位插入0
負(fù)數(shù)左右移
https://blog.csdn.net/qq_41675265/article/details/126002069