(三)線程的并發(fā)工具類

Fork-Join

java 下多線程的開發(fā)可以我們自己啟用多線程狭园,線程池胜臊,還可以使用fork-join ,fork-join 可以讓我們不去了解諸如Thread,Runnable等相關(guān)的知識(shí)舞终,只要遵循forkjoin的開發(fā)模式趟紊,就可以寫出很好的多線程并發(fā)程序椒振。

分而治之

同時(shí)forkjoin在處理某一類問題時(shí)非常的有用朽基,哪一類問題布隔?分而治之的問題。十大計(jì)算機(jī)經(jīng)典算法:
快速排序稼虎、堆排序衅檀、歸并排序、二分查找霎俩、線性查找哀军、深度優(yōu)先、廣度優(yōu)先打却、Dijkstra杉适、動(dòng)態(tài)規(guī)劃、樸素貝葉斯分類柳击,有幾個(gè)屬于分而治之猿推?
3個(gè),快速排序捌肴、歸并排序蹬叭、二分查找,還有大數(shù)據(jù)中M/R都是状知。
分治法的設(shè)計(jì)思想是:將一個(gè)難以直接解決的大問題秽五,分割成一些規(guī)模較小的相同問題,以便各個(gè)擊破试幽,分而治之筝蚕。
分治策略是:對于一個(gè)規(guī)模為n的問題,若該問題可以容易地解決(比如說規(guī)模n較衅涛搿)則直接解決起宽,否則將其分解為k個(gè)規(guī)模較小的子問題,這些子問題互相獨(dú)立且與原問題形式相同(子問題相互之間有聯(lián)系就會(huì)變?yōu)閯?dòng)態(tài)規(guī)劃算法)济榨,遞歸地解這些子問題坯沪,然后將各子問題的解合并得到原問題的解。這種算法設(shè)計(jì)策略叫做分治法擒滑。

歸并排序

歸并排序是建立在歸并操作上的一種有效的排序算法腐晾。該算法是采用分治法的一個(gè)非常典型的應(yīng)用。將已有序的子序列合并丐一,得到完全有序的序列藻糖;即先使每個(gè)子序列有序,再使子序列段間有序库车。
若將兩個(gè)有序表合并成一個(gè)有序表巨柒,稱為2-路歸并,與之對應(yīng)的還有多路歸并柠衍。
對于給定的一組數(shù)據(jù)洋满,利用遞歸與分治技術(shù)將數(shù)據(jù)序列劃分成為越來越小的半子表,在對半子表排序后珍坊,再用遞歸方法將排好序的半子表合并成為越來越大的有序序列牺勾。
為了提升性能,有時(shí)我們在半子表的個(gè)數(shù)小于某個(gè)數(shù)(比如15)的情況下阵漏,對半子表的排序采用其他排序算法驻民,比如插入排序

image.png

1.先將數(shù)組劃分為左右兩個(gè)子表:


image.png
image.png

2.然后繼續(xù)左右兩個(gè)子表拆分:


image.png

3.對有序的子表進(jìn)行排序和比較合并:


image.png

4.對合并后的子表繼續(xù)比較合并
image.png

幾個(gè)排序算法的比較

創(chuàng)建數(shù)據(jù)工具類


/**
 * @author sxylml
 * @Date : 2019/5/17 10:13
 * @Description: 創(chuàng)建數(shù)據(jù)
 */
public class MakeArray {

    public static int[] makeArray(int arraySize) {
        if (arraySize <= 0) {
            arraySize = 0;
        }
        //new一個(gè)隨機(jī)數(shù)發(fā)生器
        Random r = new Random();
        int[] result = new int[arraySize];
        for (int i = 0; i < arraySize; i++) {
            //用隨機(jī)數(shù)填充數(shù)組
            result[i] = r.nextInt(arraySize * 3);
        }
        return result;
    }


    public static void printArray(int[] array) {

        for (int i = 0; i < array.length; i++) {
            System.out.print(array[i]);
            System.out.print(",");
        }
        System.out.println();
    }

}
冒泡排序
public class BubbleSort {

    public static int[] sort(int[] array) {

        for (int i = 0; i < array.length - 1; i++) {
            //-1為了防止溢出
            for (int j = 0; j < array.length - i - 1; j++) {

                if (array[j] > array[j + 1]) {
                    int temp = array[j];
                    array[j] = array[j + 1];
                    array[j + 1] = temp;
                }
            }
        }
        return array;
    }
}
簡單插入排序
public class InsertionSort {
    public static int[] sort(int[] array) {
        if (array.length == 0) {
            return array;
        }
        //當(dāng)前待排序數(shù)據(jù),該元素之前的元素均已被排序過
        int currentValue;
        //在已被排序過數(shù)據(jù)中倒序?qū)ふ液线m的位置履怯,如果當(dāng)前待排序數(shù)據(jù)比比較的元素要小川无,將比較的元素元素后移一位
        for (int i = 0; i < array.length - 1; i++) {
            //已被排序數(shù)據(jù)的索引
            int preIndex = i;
            currentValue = array[preIndex + 1];

            while (preIndex >= 0 && currentValue < array[preIndex]) {
                //將當(dāng)前元素后移一位
                array[preIndex + 1] = array[preIndex];
                preIndex--;
            }
            //while循環(huán)結(jié)束時(shí),說明已經(jīng)找到了當(dāng)前待排序數(shù)據(jù)的合適位置虑乖,插入
            array[preIndex + 1] = currentValue;
           //   System.out.println("第" + (i + 1) + "次排序");
            //  MakeArray.printArray(array);
        }
        return array;
    }
歸并排序
/**
 * @author sxylml
 * @Date : 2019/5/17 10:26
 * @Description: 歸并排序
 */
public class MergeSort {



    public static int[] sort(int[] array, int threshold) {

        //如果數(shù)組長度小于等于閾值就直接用簡單插入排序
        if (array.length <= threshold) {
            return InsertionSort.sort(array);
        } else {
            //切分?jǐn)?shù)組懦趋,然后遞歸調(diào)用
            int mid = array.length / 2;
            int[] left = Arrays.copyOfRange(array, 0, mid);
            int[] right = Arrays.copyOfRange(array, mid + 1, array.length);
            return merge(left, right);
        }
    }

    public static int[] merge(int[] left, int[] right) {
        int[] result = new int[left.length + right.length];

        for (int index = 0, i = 0, j = 0; index < result.length; index++) {

            //左邊數(shù)組已經(jīng)取完,完全取右邊數(shù)組的值即可
            if (i >= left.length) {
                result[index] = right[j++];
            } else if (j >= right.length) {
                //右邊數(shù)組已經(jīng)取完疹味,完全取左邊數(shù)組的值即可
                result[index] = left[i++];
            } else if (left[i] > right[j]) {
                //左邊數(shù)組的元素值大于右邊數(shù)組仅叫,取右邊數(shù)組的值
                result[index] = right[j];
            } else {
                //右邊數(shù)組的元素值大于左邊數(shù)組,取左邊數(shù)組的值
                result[index] = left[i++];
            }

        }
        return result;
    }
}
測試類
public class SortTest {
    public static void main(String[] args) {
        System.out.println("================歸并排序================================");
        long start = System.currentTimeMillis();
        int arraySize = 100000;
        int[] array = MakeArray.makeArray(arraySize);
        array = MergeSort.sort(array, 10);
        System.out.println(" spend time:" + (System.currentTimeMillis() - start) + "ms");


        System.out.println("================直接插入排序============================");
        start = System.currentTimeMillis();
        array = MakeArray.makeArray(arraySize);
        array = InsertionSort.sort(array);
        System.out.println(" spend time:" + (System.currentTimeMillis() - start) + "ms");

        System.out.println("================冒泡插入排序============================");
        start = System.currentTimeMillis();
        array = MakeArray.makeArray(arraySize);
        array = BubbleSort.sort(array);
        System.out.println(" spend time:" + (System.currentTimeMillis() - start) + "ms");


    }
}

幾個(gè)排序算法的耗時(shí):


image.png

Fork-Join原理

image.png

工作密取

即當(dāng)前線程的Task已經(jīng)全被執(zhí)行完畢糙捺,則自動(dòng)取到其他線程的Task池中取出Task繼續(xù)執(zhí)行诫咱。
ForkJoinPool中維護(hù)著多個(gè)線程(一般為CPU核數(shù))在不斷地執(zhí)行Task,每個(gè)線程除了執(zhí)行自己職務(wù)內(nèi)的Task之外洪灯,還會(huì)根據(jù)自己工作線程的閑置情況去獲取其他繁忙的工作線程的Task坎缭,如此一來就能能夠減少線程阻塞或是閑置的時(shí)間,提高CPU利用率。


image.png

Fork/Join實(shí)戰(zhàn)

Fork/Join使用的標(biāo)準(zhǔn)范式

我們要使用ForkJoin框架掏呼,必須首先創(chuàng)建一個(gè)ForkJoin任務(wù)坏快。它提供在任務(wù)中執(zhí)行fork和join的操作機(jī)制,通常我們不直接繼承ForkjoinTask類憎夷,只需要直接繼承其子類莽鸿。

  1. RecursiveAction,用于沒有返回結(jié)果的任務(wù)
  2. RecursiveTask拾给,用于有返回值的任務(wù)
    task要通過ForkJoinPool來執(zhí)行祥得,使用submit 或 invoke 提交,兩者的區(qū)別是:invoke是同步執(zhí)行蒋得,調(diào)用之后需要等待任務(wù)完成级及,才能執(zhí)行后面的代碼;submit是異步執(zhí)行额衙。
    join()和get方法當(dāng)任務(wù)完成的時(shí)候返回計(jì)算結(jié)果创千。
image.png

在我們自己實(shí)現(xiàn)的compute方法里,首先需要判斷任務(wù)是否足夠小入偷,如果足夠小就直接執(zhí)行任務(wù)追驴。如果不足夠小,就必須分割成兩個(gè)子任務(wù)疏之,每個(gè)子任務(wù)在調(diào)用invokeAll方法時(shí)殿雪,又會(huì)進(jìn)入compute方法,看看當(dāng)前子任務(wù)是否需要繼續(xù)分割成孫任務(wù)锋爪,如果不需要繼續(xù)分割丙曙,則執(zhí)行當(dāng)前子任務(wù)并返回結(jié)果。使用join方法會(huì)等待子任務(wù)執(zhí)行完并得到其結(jié)果其骄。





import tools.SleepTools;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
/**
 * @author sxylml
 * @Date : 2019/5/24 16:03
 * @Description:
 */
public class SumTest {
    public static void main(String[] args) {

        int[] srcArray = MakeArray.makeArray(100000);
        sumforkjoinSum(srcArray);
        sumNormal(srcArray);


    }


    public static void sumforkjoinSum(int[] srcArray) {

        //new 出池的實(shí)例
        ForkJoinPool pool = new ForkJoinPool();
        // new 出task 實(shí)例
        SumTask innerFind = new SumTask(srcArray, 0, srcArray.length - 1);
        long start = System.currentTimeMillis();
        pool.invoke(innerFind);
        long sum = innerFind.join();
        System.out.println("sumforkjoinSum:The count is " + sum + " spend time:" + (System.currentTimeMillis() - start) + "ms");
    }

    /**
     * ForkJoin執(zhí)行累加
     */
    private static class SumTask extends RecursiveTask<Long> {

        /*閾值*/
        private final int THRESHOLD;
        private int[] src;
        private int fromIndex;
        private int toIndex;

        public SumTask(int[] src, int fromIndex, int toIndex) {
            this.src = src;
            this.fromIndex = fromIndex;
            this.toIndex = toIndex;
            this.THRESHOLD = src.length / 10;
        }

        @Override
        protected Long compute() {
            /*任務(wù)的大小是否合適*/
            if (toIndex - fromIndex < THRESHOLD) {
//                System.out.println(" from index = "+fromIndex
//                        +" toIndex="+toIndex);
                long count = 0;
                for (int i = fromIndex; i <= toIndex; i++) {
                    //SleepTools.ms(1);
                    count = count + src[i];
                }
                return count;
            } else {
                //fromIndex....mid.....toIndex 拆分子任務(wù)
                int mid = (fromIndex + toIndex) / 2;
                SumTask left = new SumTask(src, fromIndex, mid);
                SumTask right = new SumTask(src, mid + 1, toIndex);
                // 提交給 pool 執(zhí)行
                invokeAll(left, right);
                // 左右兩邊的值
                return left.join() + right.join();
            }
        }

    }

    /**
     * 普通計(jì)算和的方法
     *
     * @param srcArray
     */
    public static void sumNormal(int[] srcArray) {

        long count = 0;
        long start = System.currentTimeMillis();
        for (int i = 0; i < srcArray.length; i++) {
            //休眠下可以看到forkjoin要快
            //SleepTools.ms(1);
            count += srcArray[i];
        }
        System.out.println("sumNormal:The count is " + count + " spend time:" + (System.currentTimeMillis() - start) + "ms");
    }

}

使用fork/join 遍歷指定目錄(含子目錄)找尋指定類型文件


import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

/**
 * @author sxylml
 * @Date : 2019/5/25 09:29
 * @Description: 遍歷指定目錄(含子目錄)找尋指定類型文件
 */
public class FindDirsFiles extends RecursiveAction {
    private File path;
    private String suffix;

    public FindDirsFiles(File path, String suffix) {
        this.path = path;
        this.suffix = suffix;
    }

    @Override
    protected void compute() {
        List<FindDirsFiles> subTasks = new ArrayList<>();
        File[] files = path.listFiles();

        if (files != null) {
            for (File file : files) {
                if (file.isDirectory()) {
                    //對每一個(gè)子目錄創(chuàng)建一個(gè)子任務(wù)
                    subTasks.add(new FindDirsFiles(file, suffix));
                } else {
                    //文件亏镰,就判斷后綴尋找自己需要的
                    if (file.getAbsolutePath().endsWith(suffix)) {
                        System.out.println(file.getAbsolutePath());
                    }
                }

            }

            if (!subTasks.isEmpty()) {
                // 在當(dāng)前的 ForkJoinPool 上調(diào)度所有的子任務(wù)。
                for (FindDirsFiles subTask : invokeAll(subTasks)) {
                    subTask.join();
                }
            }
        }
    }

    public static void findDirsFiles(String path, String suffix) {

        try {
            // 用一個(gè) ForkJoinPool 實(shí)例調(diào)度總?cè)蝿?wù)
            ForkJoinPool pool = new ForkJoinPool();
            FindDirsFiles task = new FindDirsFiles(new File(path), suffix);
            //異步提交
            pool.execute(task);

             /*主線程做自己的業(yè)務(wù)工作*/
            System.out.println("Task is Running......");
            Thread.sleep(1);
            int otherWork = 0;
            for (int i = 0; i < 100; i++) {
                otherWork = otherWork + i;
            }
            System.out.println("Main Thread done sth......,otherWork=" + otherWork);
            //阻塞方法
            task.join();
            System.out.println("Task end");
        } catch (Exception e) {

        }

    }


    public static void main(String[] args) {
        findDirsFiles("C:/","java");
    }
}

并發(fā)工具類

CountDownLatch

閉鎖拯爽,CountDownLatch 這個(gè)類能夠使一個(gè)線程等待其他線程完成各自的工作后再執(zhí)行索抓。
例如:應(yīng)用程序的主線程希望再負(fù)責(zé)啟動(dòng)框架服務(wù)的線程已經(jīng)啟動(dòng)所有的框架服務(wù)之后再執(zhí)行。
CountDownLatch是通過一個(gè)計(jì)數(shù)器來實(shí)現(xiàn)的毯炮,計(jì)數(shù)器的初始值為初始任務(wù)的數(shù)量逼肯。每當(dāng)完成了一個(gè)任務(wù)后,計(jì)數(shù)器的值就會(huì)減1
(CountDownLatch.countDown()方法)桃煎。當(dāng)計(jì)數(shù)器值到達(dá)0時(shí)篮幢,它表示所有的已經(jīng)完成了任務(wù),然后在閉鎖上等待CountDownLatch.await()方法的線程就可以恢復(fù)執(zhí)行任務(wù)为迈。

應(yīng)用場景:

實(shí)現(xiàn)最大的并行性:有時(shí)我們想同時(shí)啟動(dòng)多個(gè)線程三椿,實(shí)現(xiàn)最大程度的并行性缺菌。例如,我們想測試一個(gè)單例類搜锰。如果我們創(chuàng)建一個(gè)初始計(jì)數(shù)為1的CountDownLatch伴郁,并讓所有線程都在這個(gè)鎖上等待,那么我們可以很輕松地完成測試纽乱。我們只需調(diào)用 一次countDown()方法就可以讓所有的等待線程同時(shí)恢復(fù)執(zhí)行蛾绎。
開始執(zhí)行前等待n個(gè)線程完成各自任務(wù):例如應(yīng)用程序啟動(dòng)類要確保在處理用戶請求前昆箕,所有N個(gè)外部系統(tǒng)已經(jīng)啟動(dòng)和運(yùn)行了鸦列,例如處理excel中多個(gè)表單。


image.png
import tools.SleepTools;
import java.util.concurrent.CountDownLatch;
/**
 * @author sxylml
 * @Date : 2019/5/25 10:10
 * @Description: 演示CountDownLatch用法鹏倘,共5個(gè)初始化子線程薯嗤,6個(gè)閉鎖扣除點(diǎn),扣除完畢后纤泵,主線程和業(yè)務(wù)線程才能繼續(xù)執(zhí)行
 */
public class UseCountDownLatch {
    /**
     * 可以自己嘗試骆姐,分別設(shè)置 6,大于6 小于6捏题,看運(yùn)行結(jié)果
     */

    static CountDownLatch latch = new CountDownLatch(6);
    /**初始化線程*/
    private static class InitThread implements Runnable {

        @Override
        public void run() {
            System.out.println("Thread_" + Thread.currentThread().getId() + " ready init work......");
            latch.countDown();
            for (int i = 0; i < 2; i++) {
                System.out.println("Thread_" + Thread.currentThread().getId() + " ........continue do its work");
            }
        }
    }

    /**業(yè)務(wù)線程等待latch的計(jì)數(shù)器為0完成*/
    private static class BusiThread implements Runnable {

        @Override
        public void run() {
            try {
                latch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            for (int i = 0; i < 3; i++) {
                System.out.println("BusiThread_" + Thread.currentThread().getId() + " do business-----");
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        new Thread(new Runnable() {
            @Override
            public void run() {
                SleepTools.ms(1);
                System.out.println("Thread_" + Thread.currentThread().getId() + " ready init work step 1st......");
                latch.countDown();
                System.out.println("begin step 2nd.......");
                SleepTools.ms(1);
                System.out.println("Thread_" + Thread.currentThread().getId() + " ready init work step 2nd......");
                latch.countDown();
            }
        }).start();
        new Thread(new BusiThread()).start();
        for (int i = 0; i <= 3; i++) {
            Thread thread = new Thread(new InitThread());
            thread.start();
        }

        latch.await();
        System.out.println("Main do ites work........");
    }

}

CyclicBarrier

CyclicBarrier的字面意思是可循環(huán)使用(Cyclic)的屏障(Barrier)玻褪。它要做的事情是,讓一組線程到達(dá)一個(gè)屏障(也可以叫同步點(diǎn))時(shí)被阻塞公荧,直到最后一個(gè)線程到達(dá)屏障時(shí)带射,屏障才會(huì)開門,所有被屏障攔截的線程才會(huì)繼續(xù)運(yùn)行循狰。CyclicBarrier默認(rèn)的構(gòu)造方法是CyclicBarrier(int parties)窟社,其參數(shù)表示屏障攔截的線程數(shù)量,每個(gè)線程調(diào)用await方法告訴CyclicBarrier我已經(jīng)到達(dá)了屏障绪钥,然后當(dāng)前線程被阻塞灿里。
CyclicBarrier還提供一個(gè)更高級(jí)的構(gòu)造函數(shù)CyclicBarrier(int parties,Runnable barrierAction)程腹,用于在線程到達(dá)屏障時(shí)匣吊,優(yōu)先執(zhí)行barrierAction,方便處理更復(fù)雜的業(yè)務(wù)場景寸潦。
CyclicBarrier可以用于多線程計(jì)算數(shù)據(jù)缀去,最后合并計(jì)算結(jié)果的場景。


image.png
package com.ch2.forkjoin.tools;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CyclicBarrier;

/**
 * @author sxylml
 * @Date : 2019/5/25 10:39
 * @Description: 演示CyclicBarrier用法, 共4個(gè)子線程甸祭,他們?nèi)客瓿晒ぷ骱舐扑椋怀鲎约航Y(jié)果,
 * 再被統(tǒng)一釋放去做自己的事情池户,而交出的結(jié)果被另外的線程拿來拼接字符串
 */
public class UseCyclicBarrier {

    /**
     *
     */
    private static CyclicBarrier barrier = new CyclicBarrier(4, new CollectThread());

    /**
     * 存放子線程工作結(jié)果的容器
     */
    private static ConcurrentHashMap<String, Long> resultMap = new ConcurrentHashMap<>();

    /*匯總的任務(wù)*/
    private static class CollectThread implements Runnable {

        @Override
        public void run() {
            StringBuilder result = new StringBuilder();
            for (Map.Entry<String, Long> workResult : resultMap.entrySet()) {
                result.append("[" + workResult.getValue() + "]");
            }
            System.out.println(" the result = " + result);
            System.out.println("do other business........");
        }
    }

    /*相互等待的子線程*/
    private static class SubThread implements Runnable {

        @Override
        public void run() {
            long id = Thread.currentThread().getId();
            resultMap.put(Thread.currentThread().getId() + "", id);
            try {
                Thread.sleep(1000 + id);
                System.out.println("Thread_" + id + " ....do something ");
                barrier.await();
                Thread.sleep(1000 + id);
                System.out.println("Thread_" + id + " ....do its business ");
                // 可以重復(fù)執(zhí)行匯總
                barrier.await();
            } catch (Exception e) {
                e.printStackTrace();
            }

        }
    }

    public static void main(String[] args) {
        for (int i = 0; i < 4; i++) {
            Thread thread = new Thread(new SubThread());
            thread.start();
        }

    }
}

CountDownLatch和CyclicBarrier辨析

CountDownLatch的計(jì)數(shù)器只能使用一次咏雌,而CyclicBarrier的計(jì)數(shù)器可以反復(fù)使用凡怎。
CountDownLatch.await一般阻塞工作線程,所有的進(jìn)行預(yù)備工作的線程執(zhí)行countDown赊抖,而CyclicBarrier通過工作線程調(diào)用await從而自行阻塞统倒,直到所有工作線程達(dá)到指定屏障,再大家一起往下走氛雪。
在控制多個(gè)線程同時(shí)運(yùn)行上房匆,CountDownLatch可以不限線程數(shù)量,而CyclicBarrier是固定線程數(shù)报亩。
同時(shí)浴鸿,CyclicBarrier還可以提供一個(gè)barrierAction,合并多線程計(jì)算結(jié)果弦追。

Semaphore

Semaphore (信號(hào)量) 是用來控制同時(shí)訪問特定資源的線程數(shù)量岳链,它通過協(xié)調(diào)各個(gè)線程,以保證合理的使用公共資源劲件。應(yīng)用場景Semaphore 可以用于做流量控制掸哑,特別是公用資源有限的應(yīng)用場景,比如數(shù)據(jù)庫連接零远。

假如有一個(gè)需求苗分。

要讀取幾十萬個(gè)文件的數(shù)據(jù),因?yàn)槎际荌O密集型任務(wù)牵辣,我們可以啟用幾十個(gè)線程并發(fā)地讀取摔癣,但是如果讀到內(nèi)存后,還需要存儲(chǔ)到數(shù)據(jù)庫中服猪,二數(shù)據(jù)庫的連接數(shù)量只有10個(gè)供填,這時(shí)我們必須控制只有10個(gè)線程同事獲取數(shù)據(jù)庫連接保存數(shù)據(jù)。否則會(huì)報(bào)錯(cuò)無法獲取數(shù)據(jù)庫連接罢猪。這個(gè)時(shí)候近她,就可以使用Semaphore 來做流量控制。Semaphore 的構(gòu)造方法Semaphore (int permits) 接受一個(gè)整型的數(shù)字膳帕,表示可用的許可證數(shù)量粘捎。Semaphore 的用法也很簡單,首先線程使用Semaphore的acquire() 方法獲取一個(gè)許可證危彩,使用完之后調(diào)用release() 方法歸還許可證攒磨。還可以用tryAcquire()方法嘗試獲取許可證。
Semaphore 還提供一些其它方法汤徽,如下:
intavailablePermits(): 返回此信號(hào)量中當(dāng)前可用的許可證數(shù)娩缰。
intgetQuenueLength();返回正在等待獲取許可證的線程數(shù)。
booleanhasQueuedThreads(); 是否有線程正在等待獲取許可證谒府。
void reducePermits(int reduction); 減少reduction 個(gè)許可證拼坎,是個(gè)protected 方法浮毯。
Collection getQueuedThreads(); 返回所有等待獲取許可證的線程集合,是個(gè)protected方法泰鸡。

image.png
用Semaphore實(shí)現(xiàn)數(shù)據(jù)庫連接池

public class SqlConnectImpl implements Connection {
/**
     * 拿一個(gè)數(shù)據(jù)庫連接
     */
    public static final Connection fetchConnection() {
        return new SqlConnectImpl();
    }
 // 實(shí)現(xiàn)其它方法
}

package com.ch2.forkjoin.tools.semaphore;

import java.sql.Connection;
import java.util.LinkedList;
import java.util.concurrent.Semaphore;

/**
 * @author sxylml
 * @Date : 2019/5/25 11:27
 * @Description: 演示Semaphore用法债蓝,一個(gè)數(shù)據(jù)庫連接池的實(shí)現(xiàn)
 */
public class DBPoolSemaphore {
    /**
     * 連接池大小
     */
    private final static int POOL_SIZE = 10;
    /**
     * 兩個(gè)指示器,分別表示池子還有可用連接和已用連接
     */
    private final Semaphore useful, useless;

    /**
     * 存放數(shù)據(jù)庫連接的容器
     */
    private static LinkedList<Connection> pool = new LinkedList<Connection>();


    //初始化池
    static {
        for (int i = 0; i < POOL_SIZE; i++) {
            pool.addLast(SqlConnectImpl.fetchConnection());
        }
    }

    public DBPoolSemaphore() {
        this.useful = new Semaphore(10);
        this.useless = new Semaphore(0);
    }


    /**
     * 歸還連接
     */
    public void returnConnect(Connection connection) throws InterruptedException {
        if (connection != null) {
            System.out.println("當(dāng)前有" + useful.getQueueLength() + "個(gè)線程等待數(shù)據(jù)庫連接!"
                    + "可用連接數(shù):" + useful.availablePermits());
            useless.acquire();
            synchronized (pool) {
                pool.addLast(connection);
            }
            useful.release();
        }
    }


    /**
     * 從池子拿連接
     */
    public Connection takeConnect() throws InterruptedException {
        useful.acquire();
        Connection connection;
        synchronized (pool) {
            connection = pool.removeFirst();
        }
        useless.release();
        return connection;
    }
}

import tools.SleepTools;
import java.sql.Connection;
import java.util.Random;
/**
 * @author sxylml
 * @Date : 2019/5/25 14:09
 * @Description:
 */
public class AppTest {
    private static DBPoolSemaphore dbPool = new DBPoolSemaphore();

    private static class BusiThread extends Thread {
        @Override
        public void run() {
            /**
             * 讓每個(gè)線程持有連接的時(shí)間不一樣
             */
            Random r = new Random();
            long start = System.currentTimeMillis();
            try {
                Connection connect = dbPool.takeConnect();
                System.out.println("Thread_" + Thread.currentThread().getId() + "_獲取數(shù)據(jù)庫連接共耗時(shí)【" + (System.currentTimeMillis() - start) + "】ms.");
                /**
                 * 模擬業(yè)務(wù)操作盛龄,線程持有連接查詢數(shù)據(jù)
                 */
                SleepTools.ms(100 + r.nextInt(100));
                System.out.println("查詢數(shù)據(jù)完成饰迹,歸還連接!");
                dbPool.returnConnect(connect);
            } catch (InterruptedException e) {
            }
        }
    }

    public static void main(String[] args) {
        for (int i = 0; i < 50; i++) {
            Thread thread = new BusiThread();
            thread.start();
        }
    }
}

Semaphore注意事項(xiàng)
package com.ch2.forkjoin.tools.semaphore;

import tools.SleepTools;

import java.sql.Connection;
import java.util.LinkedList;
import java.util.Random;
import java.util.concurrent.Semaphore;

/**
 * @author sxylml
 * @Date : 2019/5/25 11:36
 * @Description: 演示Semaphore用法余舶,一個(gè)數(shù)據(jù)庫連接池的實(shí)現(xiàn)
 */
public class DBPoolNoUseless {

    private final static int POOL_SIZE = 10;
    private final Semaphore useful;
    /**
     * 存放數(shù)據(jù)庫連接的容器
     */
    private static LinkedList<Connection> pool = new LinkedList<Connection>();

    /**
     *
     *初始化池
     */
    static {
        for (int i = 0; i < POOL_SIZE; i++) {
            pool.addLast(SqlConnectImpl.fetchConnection());
        }
    }

    public DBPoolNoUseless() {
        this.useful = new Semaphore(10);
    }


    /*歸還連接*/
    public void returnConnect(Connection connection) throws InterruptedException {
        if (connection != null) {
            System.out.println("當(dāng)前有" + useful.getQueueLength() + "個(gè)線程等待數(shù)據(jù)庫連接!!"
                    + "可用連接數(shù):" + useful.availablePermits());
            synchronized (pool) {
                pool.addLast(connection);
            }
            useful.release();
        }
    }


    /*從池子拿連接*/
    public Connection takeConnect() throws InterruptedException {
        useful.acquire();
        Connection connection;
        synchronized (pool) {
            connection = pool.removeFirst();
        }
        return connection;
    }

    private static DBPoolNoUseless dbPoolNoUseless = new DBPoolNoUseless();

    private static class BusiThread extends Thread {
        @Override
        public void run() {
            /**
             *讓每個(gè)線程持有連接的時(shí)間不一樣
             */

            Random r = new Random();
            long start = System.currentTimeMillis();
            try {
                System.out.println("Thread_" + Thread.currentThread().getId()
                        + "_獲取數(shù)據(jù)庫連接共耗時(shí)【" + (System.currentTimeMillis() - start) + "】ms.");
                /**
                 * 模擬業(yè)務(wù)操作啊鸭,線程持有連接查詢數(shù)據(jù)
                 */
                SleepTools.ms(100 + r.nextInt(100));
                System.out.println("查詢數(shù)據(jù)完成,歸還連接欧芽!");
                dbPoolNoUseless.returnConnect(new SqlConnectImpl());
            } catch (InterruptedException e) {
            }
        }
    }

    public static void main(String[] args) {
        for (int i = 0; i < 50; i++) {
            Thread thread = new BusiThread();
            thread.start();
        }
    }

}

Exchange

Exchanger(交換者)是一個(gè)用于線程間協(xié)作的工具類莉掂。Exchanger用于進(jìn)行線程間的數(shù)據(jù)交換葛圃。它提供一個(gè)同步點(diǎn)千扔,在這個(gè)同步點(diǎn),兩個(gè)線程可以交換彼此的數(shù)據(jù)库正。這兩個(gè)線程通過exchange方法交換數(shù)據(jù)曲楚,如果第一個(gè)線程先執(zhí)行exchange()方法,它會(huì)一直等待第二個(gè)線程也執(zhí)行exchange方法褥符,當(dāng)兩個(gè)線程都到達(dá)同步點(diǎn)時(shí)龙誊,這兩個(gè)線程就可以交換數(shù)據(jù),將本線程生產(chǎn)出來的數(shù)據(jù)傳遞給對方喷楣。


image.png
package com.ch2.forkjoin.tools;

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Exchanger;

/**
 * @author sxylml
 * @Date : 2019/5/25 16:04
 * @Description: 演示Exchange用法
 */
public class UseExchange {
    private static final Exchanger<Set<String>> exchange = new Exchanger<Set<String>>();

    public static void main(String[] args) {

        new Thread(new Runnable() {
            @Override
            public void run() {
                //存放數(shù)據(jù)的容器
                Set<String> setA = new HashSet<String>();
                try {
                    /*添加數(shù)據(jù)
                     * set.add(.....)
                     * */

                    setA.add("A");
                    setA.add("a");
                    //交換set
                    setA = exchange.exchange(setA);

                    setA.forEach(src -> System.out.println("setA"+src));
                    /**處理交換后的數(shù)據(jù)*/
                } catch (InterruptedException e) {
                }
            }
        }).start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                /**
                 * 存放數(shù)據(jù)的容器
                 */
                Set<String> setB = new HashSet<String>();
                try {
                    /*添加數(shù)據(jù)
                     * set.add(.....)
                     * set.add(.....)
                     * */
                    setB.add("B");
                    setB.add("b");
                    //交換set
                    setB = exchange.exchange(setB);
                    /**處理交換后的數(shù)據(jù)*/
                    setB.forEach(src -> System.out.println("setB:" + src));
                } catch (InterruptedException e) {
                }
            }
        }).start();

    }
}

image.png

Callable趟大、Future和FutureTask

Runnable是一個(gè)接口,在它里面只聲明了一個(gè)run()方法铣焊,由于run()方法返回值為void類型逊朽,所以在執(zhí)行完任務(wù)之后無法返回任何結(jié)果。
Callable位于java.util.concurrent包下曲伊,它也是一個(gè)接口叽讳,在它里面也只聲明了一個(gè)方法,只不過這個(gè)方法叫做call()坟募,這是一個(gè)泛型接口岛蚤,call()函數(shù)返回的類型就是傳遞進(jìn)來的V類型。
Future就是對于具體的Runnable或者Callable任務(wù)的執(zhí)行結(jié)果進(jìn)行取消懈糯、查詢是否完成涤妒、獲取結(jié)果。必要時(shí)可以通過get方法獲取執(zhí)行結(jié)果赚哗,該方法會(huì)阻塞直到任務(wù)返回結(jié)果她紫。


image.png

FutureTask類實(shí)現(xiàn)了RunnableFuture接口铁坎,RunnableFuture繼承了Runnable接口和Future接口,而FutureTask實(shí)現(xiàn)了RunnableFuture接口犁苏。所以它既可以作為Runnable被線程執(zhí)行硬萍,又可以作為Future得到Callable的返回值。


image.png

因此我們通過一個(gè)線程運(yùn)行Callable围详,但是Thread不支持構(gòu)造方法中傳遞Callable的實(shí)例朴乖,所以我們需要通過FutureTask把一個(gè)Callable包裝成Runnable,然后再通過這個(gè)FutureTask拿到Callable運(yùn)行后的返回值助赞。
要new一個(gè)FutureTask的實(shí)例买羞,有兩種方法

import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

/**
 * @author sxylml
 * @Date : 2019/5/25 16:58
 * @Description:
 */
public class UseFuture {

    private static class UseCallable implements Callable<Integer> {
        int sum = 0;

        @Override
        public Integer call() throws Exception {
            System.out.println("Callable子線程開始計(jì)算庄萎!");

            for (int i = 0; i < 5000; i++) {
                if (Thread.currentThread().isInterrupted()) {
                    System.out.println("Callable子線程計(jì)算任務(wù)中斷耀怜!");
                    return null;
                }
                sum = sum + i;
                System.out.println("sum=" + sum);
            }
            System.out.println("Callable子線程計(jì)算結(jié)束淹父!結(jié)果為: " + sum);
            return sum;
        }
    }


    public static void main(String[] args) throws InterruptedException, ExecutionException {

        UseCallable useCallable = new UseCallable();
        //包裝
        FutureTask<Integer> futureTask = new FutureTask<>(useCallable);
        Random r = new Random();
        new Thread(futureTask).start();
        System.out.println("Get UseCallable result = " + futureTask.get());
        futureTask.cancel(true);
    }

}

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末钓葫,一起剝皮案震驚了整個(gè)濱河市具篇,隨后出現(xiàn)的幾起案子蒋譬,更是在濱河造成了極大的恐慌徐裸,老刑警劉巖羊初,帶你破解...
    沈念sama閱讀 218,284評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件街立,死亡現(xiàn)場離奇詭異舶衬,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)赎离,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,115評論 3 395
  • 文/潘曉璐 我一進(jìn)店門逛犹,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人梁剔,你說我怎么就攤上這事虽画。” “怎么了荣病?”我有些...
    開封第一講書人閱讀 164,614評論 0 354
  • 文/不壞的土叔 我叫張陵码撰,是天一觀的道長。 經(jīng)常有香客問我众雷,道長灸拍,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,671評論 1 293
  • 正文 為了忘掉前任砾省,我火速辦了婚禮鸡岗,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘编兄。我一直安慰自己轩性,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,699評論 6 392
  • 文/花漫 我一把揭開白布狠鸳。 她就那樣靜靜地躺著揣苏,像睡著了一般悯嗓。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上卸察,一...
    開封第一講書人閱讀 51,562評論 1 305
  • 那天脯厨,我揣著相機(jī)與錄音,去河邊找鬼坑质。 笑死合武,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的涡扼。 我是一名探鬼主播稼跳,決...
    沈念sama閱讀 40,309評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼吃沪!你這毒婦竟也來了汤善?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,223評論 0 276
  • 序言:老撾萬榮一對情侶失蹤票彪,失蹤者是張志新(化名)和其女友劉穎红淡,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體抹镊,經(jīng)...
    沈念sama閱讀 45,668評論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡锉屈,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,859評論 3 336
  • 正文 我和宋清朗相戀三年荤傲,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了垮耳。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,981評論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡遂黍,死狀恐怖终佛,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情雾家,我是刑警寧澤铃彰,帶...
    沈念sama閱讀 35,705評論 5 347
  • 正文 年R本政府宣布,位于F島的核電站芯咧,受9級(jí)特大地震影響牙捉,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜敬飒,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,310評論 3 330
  • 文/蒙蒙 一邪铲、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧无拗,春花似錦带到、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,904評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽被饿。三九已至,卻和暖如春搪搏,著一層夾襖步出監(jiān)牢的瞬間狭握,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,023評論 1 270
  • 我被黑心中介騙來泰國打工疯溺, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留哥牍,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,146評論 3 370
  • 正文 我出身青樓喝检,卻偏偏與公主長得像嗅辣,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個(gè)殘疾皇子挠说,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,933評論 2 355

推薦閱讀更多精彩內(nèi)容