ForkJoin

TODO
持續(xù)補(bǔ)充中...

前言

以下內(nèi)容都是基于JDK1.8介紹

什么是ForkJoin(FJ)

ForkJoinPool 是 JDK7 引入的,由 Doug Lea 編寫的高性能線程池。

核心
  • 分治算法(Divide-and-Conquer):
      把任務(wù)遞歸的拆分為各個子任務(wù)前方,這樣可以更好的利用系統(tǒng)資源居灯,盡可能的使用所有可用的計(jì)算能力來提升應(yīng)用性能着憨。

image.png

圖片來源:http://www.reibang.com/p/32a15ef2f1bf

  • work-stealing(工作竊取)算法:
      線程池內(nèi)的所有工作線程都嘗試找到并執(zhí)行已經(jīng)提交的任務(wù)弧呐,或者是被其他活動任務(wù)創(chuàng)建的子任務(wù)(如果不存在就阻塞等待)管引。這種特性使得 ForkJoinPool 在運(yùn)行多個可以產(chǎn)生子任務(wù)的任務(wù)士败,或者是提交的許多小任務(wù)時(shí)效率更高。

以下翻譯自 A Java Fork/Join Framework (Doug Lea)

fork/join框架的核心在于它的輕量級調(diào)度機(jī)制褥伴。FJTask采用了Cilk的work - stealing調(diào)度器中首創(chuàng)的基本策略 :

  • 每個工作線程在自己的線程中維護(hù)可運(yùn)行的任務(wù)調(diào)度隊(duì)列谅将。
  • 隊(duì)列被維護(hù)為雙端隊(duì)列(deques),支持LIFO噩翠,push戏自、pop操作邦投,以及FIFO的poll/take
  • 工作線程(FJThread)運(yùn)行的任務(wù)中生成子任務(wù)被push自己的deques上伤锚。
  • 工作線程默認(rèn)情況下以LIFO的模式處理自己的deque,通過pop獲取任務(wù)。
  • 當(dāng)工作線程沒有本地任務(wù)要運(yùn)行時(shí)屯援,它會嘗試猛们,從隨機(jī)選擇的其他隊(duì)列那里,使用FIFO(最早的優(yōu)先)規(guī)則偷取一個任務(wù)狞洋,弯淘。
  • 當(dāng)一個工作線程遇到一個join操作時(shí),它處理其他任務(wù)(如果可用)吉懊,直到目標(biāo)任務(wù)完成庐橙。否則,所有任務(wù)都會在沒有阻塞的情況下運(yùn)行完成借嗽。 TODO
  • 當(dāng)一個工作線程沒有任務(wù)并且偷取不到任何任務(wù)時(shí)态鳖,對于其他類型,它會back off(通過yield恶导、sleep和/或優(yōu)先級調(diào)節(jié))浆竭,并稍后再次嘗試,除非所有
    的worker都空閑惨寿,在這種情況下邦泄,所有的worker都block,直到從頂層調(diào)用另一個任務(wù)裂垦。
image.png

圖片來源:A Java Fork/Join Framework (Doug Lea)

核心類介紹

  • ForkJoinPool 這是線程池的核心類顺囊,也是提供方法供我們使用的入口類〗堵#基本上forkJoin的核心操作及線程池的管理方法都由這個類提供包蓝。
image.png
  • ForkJoinPool.WorkQueue 這是ForkJoinPool類的內(nèi)部類。也是線程池核心的組成部分企量。ForkJoinPool線程池將由WorkQueue數(shù)組組成测萎。為了進(jìn)一步提高性能,與ThreadPoolExecutor不一樣的是届巩,這沒有采用外部傳入的任務(wù)隊(duì)列硅瞧,而是作者自己實(shí)現(xiàn)了一個阻塞隊(duì)列。奇數(shù)位是帶工作線程的存放fork出的子任務(wù)的隊(duì)列恕汇,偶數(shù)隊(duì)列存放的是外部提交的任務(wù)腕唧。

  • ForkJoinWorkerThread 線程池中運(yùn)行的thread也是作者重新定義的。這個類的目的是在于將外部的各種形式的task都轉(zhuǎn)換為統(tǒng)一的ForkJoinTask格式瘾英。

  • ForkJoinTask 這是ForkJoinPool支持運(yùn)行的task抽象類枣接,我們一般使用其子類如RecursiveTask(有返回值)或者RecursiveAction(無返回值)。

https://blog.51cto.com/u_14014612/6031659

Fork-join任務(wù)運(yùn)行機(jī)制

image.png

workequeues為什么區(qū)分奇偶slot

外部提交任務(wù)放在pool.workequeues偶數(shù)slot
內(nèi)部提交任務(wù)放在pool.workequeues奇數(shù)slot
在我看來是一般外部提交的任務(wù)初始化缺谴,即遞歸起始點(diǎn)但惶,會是一個大任務(wù)(相對于fork之后的小任務(wù)),F(xiàn)JWorkerThread會把fork之后的小任務(wù)放在自己隊(duì)列的top(即工作線程自己的隊(duì)列都是相對于外部提交任務(wù)而言的小任務(wù)),在FJWorkerThread啟動之后是通過scan即竊取其他隊(duì)列的任務(wù)膀曾,通過poll即使用FIFO的方式竊取base位县爬,即“大任務(wù)”開始,相當(dāng)于“廣度優(yōu)先”添谊,而FJWorker是通過LIFO方式pop本地隊(duì)列财喳,類似與一種“深度優(yōu)先”的方式,兩者結(jié)合有助于發(fā)揮多線程優(yōu)勢斩狱。

work-steal的優(yōu)勢

通過

 List<Future<?>> list = new ArrayList<>();
        long start = System.currentTimeMillis();
        for (int j = 1; j <= 2; j++) {
            int i = j;
            Future<?> submit = threadPool.submit(() -> {
                System.out.println(Thread.currentThread().getName() + ", level1 task " + i);
                Future innerTask = threadPool.submit(() ->
                        System.out.println(Thread.currentThread().getName() + ", level2 task" + i));
                try {
                    innerTask.get();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
            list.add(submit);
        }
        Future[] outerTasks = list.toArray(new Future[0]);
        System.out.println("waiting...");
        try {
            for (Future outerTask : outerTasks) {
                outerTask.get();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("done");
        System.out.println("cost:" + (System.currentTimeMillis() - start));

分別使用

//會死鎖
 testThreadPool(new ThreadPoolExecutor(2,2,10,TimeUnit.DAYS,new ArrayBlockingQueue<>(20)));
testThreadPool(Executors.newFixedThreadPool(2));
//下面兩個一樣耳高,都是使用ForkJoin進(jìn)行工作竊取
testThreadPool(new ForkJoinPool(2));
testThreadPool(Executors.newWorkStealingPool(2));
  • 首先提交的兩個任務(wù)把線程池中的兩個線程都占滿了,而它們又分別提交了子任務(wù)所踊,并等待子任務(wù)完成才退出
    子任務(wù)在工作隊(duì)列中等待線程池中釋放出空閑線程來執(zhí)行祝高,這是不可能的,所以兩邊互相等待污筷,死鎖了
  • ForkJoinPool 與普通線程池的主要區(qū)別是它實(shí)現(xiàn)了工作竊取算法工闺。明顯的內(nèi)部區(qū)別是:
    • 普通線程池所有線程共享一個工作隊(duì)列,有空閑線程時(shí)工作隊(duì)列中的任務(wù)才能得到執(zhí)行
    • ForkJoinPool 中的每個線程有自己獨(dú)立的工作隊(duì)列瓣蛀,每個工作線程運(yùn)行中產(chǎn)生新的任務(wù)陆蟆,放在隊(duì)尾,某個工作線程會嘗試竊取別個工作線程隊(duì)列中的任務(wù)惋增,從隊(duì)列頭部竊取,遇到 join() 時(shí)叠殷,如前面的 future.get(),如果 join 的任務(wù)尚未完成诈皿,則可先處理其他任務(wù),這就是 ForkJoinPool 不會像普通線程池那樣被死鎖的秘訣林束。
      https://blog.csdn.net/weixin_42593549/article/details/114613629

FJ執(zhí)行圖示

public class AddNumTask extends RecursiveTask<Integer> {

    private List<Integer> ints;

    public AddNumTask(List<Integer> ints) {
        this.ints = ints;
    }


    @Override
    protected Integer compute() {
        if (ints.size() <= 2) {
            int sum = 0;
            for (int in : ints) {
                sum += in;
            }
            return sum;
        } else {
            AddNumTask task1 = new AddNumTask(ints.subList(0, ints.size() / 2));
            AddNumTask task2 = new AddNumTask(ints.subList(ints.size() / 2, ints.size()));
            invokeAll(task1,task2);
            Integer join = task1.join();
            join += task2.join();
            return join;
        }
    }
}

public class ForkJoinMain {
    public static void main(String[] args) {

        List<Integer> ints = new ArrayList<>();
        for (int i = 0; i <= 10; i++) {
            ints.add(i);
        }
        ForkJoinPool pool = ForkJoinPool.commonPool();
        ForkJoinTask<Integer> submit = pool.submit(new AddNumTask(ints));
        try {
            Integer integer = submit.get();
            System.out.println(integer);
        } catch (Exception e) {
            e.printStackTrace();
        }
}
  • compute方法執(zhí)行對半切為兩個小task,t1(0-5)稽亏,t2(6-10)后壶冒,使用invokeAll(t1,t2)的執(zhí)行流程。

下去簡要描述了在并發(fā)數(shù)2的情況下的FJ的運(yùn)行情況

image.png
  • 首先在 pool.submit(new AddNumTask(ints))之后截歉,把任務(wù)0-10放在了外部提交隊(duì)列胖腾,create了worker1
  • worker1獲取到任務(wù)0-10,執(zhí)行compute瘪松,切分為t1(0-5),t2(6-10)后調(diào)用invokeAll

 public static void invokeAll(ForkJoinTask<?> t1, ForkJoinTask<?> t2) {
        int s1, s2;
        t2.fork();
        if ((s1 = t1.doInvoke() & DONE_MASK) != NORMAL)
            t1.reportException(s1);
        if ((s2 = t2.doJoin() & DONE_MASK) != NORMAL)
            t2.reportException(s2);
    }
  • 首先t2.fork咸作,把t2放到當(dāng)前worker(FJWorkerThread)的top
  • t1.doInvoke直接執(zhí)行,遞歸處理compute方法

FJ與普通線程池效果對別

public class Test {
    private static ExecutorService executorService = new ThreadPoolExecutor(20, 20, 120, TimeUnit.SECONDS, new LinkedBlockingQueue<>(2000), new ThreadPoolExecutor.CallerRunsPolicy());



    public static void main(String[] args) {
        int count = 0;
        int nums = 100;
        //System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "0");
        ForkJoinPool.commonPool();
        for(int i = 0;i < nums;i++){
            long start = System.currentTimeMillis();
            testFJ(3000000);
//        testThreadPool(3000000);
            count += (System.currentTimeMillis() - start);
        }
        System.out.println(nums + "次宵睦,平均耗時(shí):" + (count / nums) + "毫秒");
        Scanner scanner = new Scanner(System.in);
        scanner.nextLine();
    }
    //測試forkjoin
    private static void testFJ(int size){
        List<Integer> ints = new ArrayList<>();
        for (int i = 0; i < size; i++) {
            ints.add(i);
        }
        ForkJoinPool pool = ForkJoinPool.commonPool();
        ForkJoinTask<Long> submit = pool.submit(new InvokeInterfaceTask(ints));
        try {
            Long aLong = submit.get();
            System.out.println("count:" + aLong);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    //測試線程池
    private static void testThreadPool(int size){
        CountDownLatch latch = new CountDownLatch(size);
        AtomicLong count = new AtomicLong();
        for(int i = 0; i< size;i++){
            executorService.execute(()->{
                try{
                    TestTools.testMethod();
                }finally{
                    latch.countDown();
                }
            });
        }

        try {
            latch.await();
            // System.out.println("count:" + count);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

public class TestTools {
    public static long testMethod(){
//        try {
//            Thread.sleep(1);
//        } catch (InterruptedException e) {
//            e.printStackTrace();
//        }
        return 1L<<12 & 0xfff | 1 + 1;
    }

}


public class InvokeInterfaceTask extends RecursiveTask<Long> {
    private List<Integer> ints;
    public InvokeInterfaceTask(List<Integer> ints) {
        this.ints = ints;
    }

    @Override
    protected Long compute() {
//        System.out.println(Thread.currentThread().getName() + "," + ints.get(0) + "," + ints.get(ints.size() - 1));
        if (ints.size() <= 1) {
            long sum = 0;
            sum = TestTools.testMethod(enterpriseDictService);
            return sum;
        } else {
            InvokeInterfaceTask task1 = new InvokeInterfaceTask(ints.subList(0, ints.size() / 2));
            InvokeInterfaceTask task2 = new InvokeInterfaceTask(ints.subList(ints.size() / 2, ints.size()));
//            System.out.println(Thread.currentThread().getName() + "---invokeAll begin");
            invokeAll(task1, task2);
//            System.out.println(Thread.currentThread().getName() + "---invokeAll end");
            Long join = task1.join();
//            System.out.println(Thread.currentThread().getName() + "---task1.join");
            join += task2.join();
//            System.out.println(Thread.currentThread().getName() + "---task2.join");
            return join;
        }
    }
}

上述代碼是測試在size數(shù)目(最細(xì)任務(wù)记罚,因?yàn)镕J的任務(wù)肯定大于size,ThreadPool的任務(wù)=size)時(shí)壳嚎,運(yùn)行nums次桐智,求平均耗時(shí)末早。
本機(jī)FJ的并發(fā)數(shù)為7
如下,random 10表示testMethod中使用了new Random.nextInt(10)酵使,隨機(jī)休眠荐吉,模擬接口耗時(shí)

image.png
  • 1.把ThreadPool的core和max線程數(shù)設(shè)置為7焙糟,隊(duì)列長度2000時(shí)口渔,運(yùn)行結(jié)果二者耗時(shí)基本相等
  • 2.把ThreadPool的core和max線程數(shù)設(shè)置為7,隊(duì)列長度3000000時(shí)穿撮,二者并發(fā)線程數(shù)相等的情況下缺脉,F(xiàn)J的耗時(shí)明顯低于ThreadPool,即便增加ThreadPool的線程數(shù)悦穿,隊(duì)列長度攻礼,耗時(shí)依然沒有明顯降低甚至增加。

總結(jié)

  • 1.FJ對于“很多”的小任務(wù)處理優(yōu)勢是明顯的栗柒。
  • 2.FJ對于遞歸處理礁扮,執(zhí)行結(jié)果進(jìn)行join的支持友好,不必需要額外代碼進(jìn)行匯總結(jié)果瞬沦,也無需特別注意并發(fā)問題
  • 3.FJ是對于一般線程池的補(bǔ)充太伊,對于多線程情況多提供了一種解決方案的選擇

普通線程池使用問題

http://javakk.com/188.html

參考

Java并發(fā)編程——ForkJoinPool之外部提交及worker執(zhí)行過程
https://blog.51cto.com/u_14014612/6031659
JUC源碼分析-線程池篇(五):ForkJoinPool - 2
http://www.reibang.com/p/6a14d0b54b8d

擴(kuò)展閱讀

CAS原理解析
https://blog.csdn.net/yyqhwr/article/details/106965444
volatile
https://www.cnblogs.com/dolphin0520/p/3920373.html
wait/sleep
https://blog.csdn.net/qq_45731021/article/details/116502429
Java線程池實(shí)現(xiàn)原理及其在美團(tuán)業(yè)務(wù)中的實(shí)踐
https://tech.meituan.com/2020/04/02/java-pooling-pratice-in-meituan.html

ThreadLocalRandom.getProbe() 線程的探針哈希值

線程的探針哈希值說明

使用 ThreadLocalRandom.getProbe() 得到線程的探針哈希值,探針哈希值和 map 里使用的哈希值的區(qū)別是,當(dāng)線程發(fā)生數(shù)組元素爭用后逛钻,可以改變線程的探針哈希值僚焦,讓線程去使用另一個數(shù)組元素,而 map 中 key 對象的哈希值曙痘,由于有定位 value 的需求芳悲,所以它是一定不能變的。

private static final long PROBE
        = U.objectFieldOffset(Thread.class, "threadLocalRandomProbe");
//實(shí)際是獲取的 Thread中的 threadLocalRandomProbe 屬性
//可以看出這個屬性是靜態(tài)的边坤,即類級別的名扛,所有對象共用
static final int getProbe() {
        return U.getInt(Thread.currentThread(), PROBE);
    }

//更新線程探針哈希值
static final int advanceProbe(int probe) {
        probe ^= probe << 13;   // xorshift
        probe ^= probe >>> 17;
        probe ^= probe << 5;
        U.putInt(Thread.currentThread(), PROBE, probe);
        return probe;
    }

守護(hù)線程

在Java中有兩類線程:用戶線程 (User Thread)、守護(hù)線程 (Daemon Thread)茧痒。
所謂守護(hù) 線程罢洲,是指在程序運(yùn)行的時(shí)候在后臺提供一種通用服務(wù)的線程,比如垃圾回收線程就是一個很稱職的守護(hù)者文黎,并且這種線程并不屬于程序中不可或缺的部分惹苗。因此,當(dāng)所有的非守護(hù)線程結(jié)束時(shí)耸峭,程序也就終止了桩蓉,同時(shí)會殺死進(jìn)程中的所有守護(hù)線程。反過來說劳闹,只要任何非守護(hù)線程還在運(yùn)行院究,程序就不會終止洽瞬。
用戶線程和守護(hù)線程兩者幾乎沒有區(qū)別,唯一的不同之處就在于虛擬機(jī)的離開:如果用戶線程已經(jīng)全部退出運(yùn)行了业汰,只剩下守護(hù)線程存在了伙窃,虛擬機(jī)也就退出了。 因?yàn)闆]有了被守護(hù)者样漆,守護(hù)線程也就沒有工作可做了为障,也就沒有繼續(xù)運(yùn)行程序的必要了。
將線程轉(zhuǎn)換為守護(hù)線程可以通過調(diào)用Thread對象的setDaemon(true)方法來實(shí)現(xiàn)放祟。在使用守護(hù)線程時(shí)需要注意一下幾點(diǎn):
(1) thread.setDaemon(true)必須在thread.start()之前設(shè)置鳍怨,否則會跑出一個IllegalThreadStateException異常。你不能把正在運(yùn)行的常規(guī)線程設(shè)置為守護(hù)線程跪妥。
(2) 在Daemon線程中產(chǎn)生的新線程也是Daemon的鞋喇。
(3) 守護(hù)線程應(yīng)該永遠(yuǎn)不去訪問固有資源,如文件眉撵、數(shù)據(jù)庫侦香,因?yàn)樗鼤谌魏螘r(shí)候甚至在一個操作的中間發(fā)生中斷。

>> 是 有符號的 右移 操作符纽疟。
符號為正罐韩,高位插入 0
符號為負(fù),高位插入 1
>>> 是 無符號的 右移 操作符仰挣。
不管符號為啥伴逸,高位插入0

負(fù)數(shù)左右移
https://blog.csdn.net/qq_41675265/article/details/126002069

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市膘壶,隨后出現(xiàn)的幾起案子错蝴,更是在濱河造成了極大的恐慌,老刑警劉巖颓芭,帶你破解...
    沈念sama閱讀 211,123評論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件顷锰,死亡現(xiàn)場離奇詭異,居然都是意外死亡亡问,警方通過查閱死者的電腦和手機(jī)官紫,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,031評論 2 384
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來州藕,“玉大人束世,你說我怎么就攤上這事〈膊#” “怎么了毁涉?”我有些...
    開封第一講書人閱讀 156,723評論 0 345
  • 文/不壞的土叔 我叫張陵,是天一觀的道長锈死。 經(jīng)常有香客問我贫堰,道長穆壕,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,357評論 1 283
  • 正文 為了忘掉前任其屏,我火速辦了婚禮喇勋,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘偎行。我一直安慰自己川背,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,412評論 5 384
  • 文/花漫 我一把揭開白布睦优。 她就那樣靜靜地躺著渗常,像睡著了一般壮不。 火紅的嫁衣襯著肌膚如雪汗盘。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,760評論 1 289
  • 那天询一,我揣著相機(jī)與錄音隐孽,去河邊找鬼。 笑死健蕊,一個胖子當(dāng)著我的面吹牛菱阵,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播缩功,決...
    沈念sama閱讀 38,904評論 3 405
  • 文/蒼蘭香墨 我猛地睜開眼晴及,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了嫡锌?” 一聲冷哼從身側(cè)響起虑稼,我...
    開封第一講書人閱讀 37,672評論 0 266
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎势木,沒想到半個月后蛛倦,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,118評論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡啦桌,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,456評論 2 325
  • 正文 我和宋清朗相戀三年溯壶,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片甫男。...
    茶點(diǎn)故事閱讀 38,599評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡且改,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出板驳,到底是詐尸還是另有隱情又跛,我是刑警寧澤,帶...
    沈念sama閱讀 34,264評論 4 328
  • 正文 年R本政府宣布笋庄,位于F島的核電站效扫,受9級特大地震影響倔监,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜菌仁,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,857評論 3 312
  • 文/蒙蒙 一浩习、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧济丘,春花似錦谱秽、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,731評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至峡碉,卻和暖如春近哟,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背鲫寄。 一陣腳步聲響...
    開封第一講書人閱讀 31,956評論 1 264
  • 我被黑心中介騙來泰國打工吉执, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人地来。 一個月前我還...
    沈念sama閱讀 46,286評論 2 360
  • 正文 我出身青樓戳玫,卻偏偏與公主長得像,于是被迫代替她去往敵國和親未斑。 傳聞我的和親對象是個殘疾皇子咕宿,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,465評論 2 348

推薦閱讀更多精彩內(nèi)容