Fork-Join
java 下多線程的開發(fā)可以我們自己啟用多線程狭园,線程池胜臊,還可以使用fork-join ,fork-join 可以讓我們不去了解諸如Thread,Runnable等相關(guān)的知識(shí)舞终,只要遵循forkjoin的開發(fā)模式趟紊,就可以寫出很好的多線程并發(fā)程序椒振。
分而治之
同時(shí)forkjoin在處理某一類問題時(shí)非常的有用朽基,哪一類問題布隔?分而治之的問題。十大計(jì)算機(jī)經(jīng)典算法:
快速排序稼虎、堆排序衅檀、歸并排序、二分查找霎俩、線性查找哀军、深度優(yōu)先、廣度優(yōu)先打却、Dijkstra杉适、動(dòng)態(tài)規(guī)劃、樸素貝葉斯分類柳击,有幾個(gè)屬于分而治之猿推?
3個(gè),快速排序捌肴、歸并排序蹬叭、二分查找,還有大數(shù)據(jù)中M/R都是状知。
分治法的設(shè)計(jì)思想是:將一個(gè)難以直接解決的大問題秽五,分割成一些規(guī)模較小的相同問題,以便各個(gè)擊破试幽,分而治之筝蚕。
分治策略是:對于一個(gè)規(guī)模為n的問題,若該問題可以容易地解決(比如說規(guī)模n較衅涛搿)則直接解決起宽,否則將其分解為k個(gè)規(guī)模較小的子問題,這些子問題互相獨(dú)立且與原問題形式相同(子問題相互之間有聯(lián)系就會(huì)變?yōu)閯?dòng)態(tài)規(guī)劃算法)济榨,遞歸地解這些子問題坯沪,然后將各子問題的解合并得到原問題的解。這種算法設(shè)計(jì)策略叫做分治法擒滑。
歸并排序
歸并排序是建立在歸并操作上的一種有效的排序算法腐晾。該算法是采用分治法的一個(gè)非常典型的應(yīng)用。將已有序的子序列合并丐一,得到完全有序的序列藻糖;即先使每個(gè)子序列有序,再使子序列段間有序库车。
若將兩個(gè)有序表合并成一個(gè)有序表巨柒,稱為2-路歸并,與之對應(yīng)的還有多路歸并柠衍。
對于給定的一組數(shù)據(jù)洋满,利用遞歸與分治技術(shù)將數(shù)據(jù)序列劃分成為越來越小的半子表,在對半子表排序后珍坊,再用遞歸方法將排好序的半子表合并成為越來越大的有序序列牺勾。
為了提升性能,有時(shí)我們在半子表的個(gè)數(shù)小于某個(gè)數(shù)(比如15)的情況下阵漏,對半子表的排序采用其他排序算法驻民,比如插入排序
1.先將數(shù)組劃分為左右兩個(gè)子表:
2.然后繼續(xù)左右兩個(gè)子表拆分:
3.對有序的子表進(jìn)行排序和比較合并:
4.對合并后的子表繼續(xù)比較合并
幾個(gè)排序算法的比較
創(chuàng)建數(shù)據(jù)工具類
/**
* @author sxylml
* @Date : 2019/5/17 10:13
* @Description: 創(chuàng)建數(shù)據(jù)
*/
public class MakeArray {
public static int[] makeArray(int arraySize) {
if (arraySize <= 0) {
arraySize = 0;
}
//new一個(gè)隨機(jī)數(shù)發(fā)生器
Random r = new Random();
int[] result = new int[arraySize];
for (int i = 0; i < arraySize; i++) {
//用隨機(jī)數(shù)填充數(shù)組
result[i] = r.nextInt(arraySize * 3);
}
return result;
}
public static void printArray(int[] array) {
for (int i = 0; i < array.length; i++) {
System.out.print(array[i]);
System.out.print(",");
}
System.out.println();
}
}
冒泡排序
public class BubbleSort {
public static int[] sort(int[] array) {
for (int i = 0; i < array.length - 1; i++) {
//-1為了防止溢出
for (int j = 0; j < array.length - i - 1; j++) {
if (array[j] > array[j + 1]) {
int temp = array[j];
array[j] = array[j + 1];
array[j + 1] = temp;
}
}
}
return array;
}
}
簡單插入排序
public class InsertionSort {
public static int[] sort(int[] array) {
if (array.length == 0) {
return array;
}
//當(dāng)前待排序數(shù)據(jù),該元素之前的元素均已被排序過
int currentValue;
//在已被排序過數(shù)據(jù)中倒序?qū)ふ液线m的位置履怯,如果當(dāng)前待排序數(shù)據(jù)比比較的元素要小川无,將比較的元素元素后移一位
for (int i = 0; i < array.length - 1; i++) {
//已被排序數(shù)據(jù)的索引
int preIndex = i;
currentValue = array[preIndex + 1];
while (preIndex >= 0 && currentValue < array[preIndex]) {
//將當(dāng)前元素后移一位
array[preIndex + 1] = array[preIndex];
preIndex--;
}
//while循環(huán)結(jié)束時(shí),說明已經(jīng)找到了當(dāng)前待排序數(shù)據(jù)的合適位置虑乖,插入
array[preIndex + 1] = currentValue;
// System.out.println("第" + (i + 1) + "次排序");
// MakeArray.printArray(array);
}
return array;
}
歸并排序
/**
* @author sxylml
* @Date : 2019/5/17 10:26
* @Description: 歸并排序
*/
public class MergeSort {
public static int[] sort(int[] array, int threshold) {
//如果數(shù)組長度小于等于閾值就直接用簡單插入排序
if (array.length <= threshold) {
return InsertionSort.sort(array);
} else {
//切分?jǐn)?shù)組懦趋,然后遞歸調(diào)用
int mid = array.length / 2;
int[] left = Arrays.copyOfRange(array, 0, mid);
int[] right = Arrays.copyOfRange(array, mid + 1, array.length);
return merge(left, right);
}
}
public static int[] merge(int[] left, int[] right) {
int[] result = new int[left.length + right.length];
for (int index = 0, i = 0, j = 0; index < result.length; index++) {
//左邊數(shù)組已經(jīng)取完,完全取右邊數(shù)組的值即可
if (i >= left.length) {
result[index] = right[j++];
} else if (j >= right.length) {
//右邊數(shù)組已經(jīng)取完疹味,完全取左邊數(shù)組的值即可
result[index] = left[i++];
} else if (left[i] > right[j]) {
//左邊數(shù)組的元素值大于右邊數(shù)組仅叫,取右邊數(shù)組的值
result[index] = right[j];
} else {
//右邊數(shù)組的元素值大于左邊數(shù)組,取左邊數(shù)組的值
result[index] = left[i++];
}
}
return result;
}
}
測試類
public class SortTest {
public static void main(String[] args) {
System.out.println("================歸并排序================================");
long start = System.currentTimeMillis();
int arraySize = 100000;
int[] array = MakeArray.makeArray(arraySize);
array = MergeSort.sort(array, 10);
System.out.println(" spend time:" + (System.currentTimeMillis() - start) + "ms");
System.out.println("================直接插入排序============================");
start = System.currentTimeMillis();
array = MakeArray.makeArray(arraySize);
array = InsertionSort.sort(array);
System.out.println(" spend time:" + (System.currentTimeMillis() - start) + "ms");
System.out.println("================冒泡插入排序============================");
start = System.currentTimeMillis();
array = MakeArray.makeArray(arraySize);
array = BubbleSort.sort(array);
System.out.println(" spend time:" + (System.currentTimeMillis() - start) + "ms");
}
}
幾個(gè)排序算法的耗時(shí):
Fork-Join原理
工作密取
即當(dāng)前線程的Task已經(jīng)全被執(zhí)行完畢糙捺,則自動(dòng)取到其他線程的Task池中取出Task繼續(xù)執(zhí)行诫咱。
ForkJoinPool中維護(hù)著多個(gè)線程(一般為CPU核數(shù))在不斷地執(zhí)行Task,每個(gè)線程除了執(zhí)行自己職務(wù)內(nèi)的Task之外洪灯,還會(huì)根據(jù)自己工作線程的閑置情況去獲取其他繁忙的工作線程的Task坎缭,如此一來就能能夠減少線程阻塞或是閑置的時(shí)間,提高CPU利用率。
Fork/Join實(shí)戰(zhàn)
Fork/Join使用的標(biāo)準(zhǔn)范式
我們要使用ForkJoin框架掏呼,必須首先創(chuàng)建一個(gè)ForkJoin任務(wù)坏快。它提供在任務(wù)中執(zhí)行fork和join的操作機(jī)制,通常我們不直接繼承ForkjoinTask類憎夷,只需要直接繼承其子類莽鸿。
- RecursiveAction,用于沒有返回結(jié)果的任務(wù)
- RecursiveTask拾给,用于有返回值的任務(wù)
task要通過ForkJoinPool來執(zhí)行祥得,使用submit 或 invoke 提交,兩者的區(qū)別是:invoke是同步執(zhí)行蒋得,調(diào)用之后需要等待任務(wù)完成级及,才能執(zhí)行后面的代碼;submit是異步執(zhí)行额衙。
join()和get方法當(dāng)任務(wù)完成的時(shí)候返回計(jì)算結(jié)果创千。
在我們自己實(shí)現(xiàn)的compute方法里,首先需要判斷任務(wù)是否足夠小入偷,如果足夠小就直接執(zhí)行任務(wù)追驴。如果不足夠小,就必須分割成兩個(gè)子任務(wù)疏之,每個(gè)子任務(wù)在調(diào)用invokeAll方法時(shí)殿雪,又會(huì)進(jìn)入compute方法,看看當(dāng)前子任務(wù)是否需要繼續(xù)分割成孫任務(wù)锋爪,如果不需要繼續(xù)分割丙曙,則執(zhí)行當(dāng)前子任務(wù)并返回結(jié)果。使用join方法會(huì)等待子任務(wù)執(zhí)行完并得到其結(jié)果其骄。
import tools.SleepTools;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
/**
* @author sxylml
* @Date : 2019/5/24 16:03
* @Description:
*/
public class SumTest {
public static void main(String[] args) {
int[] srcArray = MakeArray.makeArray(100000);
sumforkjoinSum(srcArray);
sumNormal(srcArray);
}
public static void sumforkjoinSum(int[] srcArray) {
//new 出池的實(shí)例
ForkJoinPool pool = new ForkJoinPool();
// new 出task 實(shí)例
SumTask innerFind = new SumTask(srcArray, 0, srcArray.length - 1);
long start = System.currentTimeMillis();
pool.invoke(innerFind);
long sum = innerFind.join();
System.out.println("sumforkjoinSum:The count is " + sum + " spend time:" + (System.currentTimeMillis() - start) + "ms");
}
/**
* ForkJoin執(zhí)行累加
*/
private static class SumTask extends RecursiveTask<Long> {
/*閾值*/
private final int THRESHOLD;
private int[] src;
private int fromIndex;
private int toIndex;
public SumTask(int[] src, int fromIndex, int toIndex) {
this.src = src;
this.fromIndex = fromIndex;
this.toIndex = toIndex;
this.THRESHOLD = src.length / 10;
}
@Override
protected Long compute() {
/*任務(wù)的大小是否合適*/
if (toIndex - fromIndex < THRESHOLD) {
// System.out.println(" from index = "+fromIndex
// +" toIndex="+toIndex);
long count = 0;
for (int i = fromIndex; i <= toIndex; i++) {
//SleepTools.ms(1);
count = count + src[i];
}
return count;
} else {
//fromIndex....mid.....toIndex 拆分子任務(wù)
int mid = (fromIndex + toIndex) / 2;
SumTask left = new SumTask(src, fromIndex, mid);
SumTask right = new SumTask(src, mid + 1, toIndex);
// 提交給 pool 執(zhí)行
invokeAll(left, right);
// 左右兩邊的值
return left.join() + right.join();
}
}
}
/**
* 普通計(jì)算和的方法
*
* @param srcArray
*/
public static void sumNormal(int[] srcArray) {
long count = 0;
long start = System.currentTimeMillis();
for (int i = 0; i < srcArray.length; i++) {
//休眠下可以看到forkjoin要快
//SleepTools.ms(1);
count += srcArray[i];
}
System.out.println("sumNormal:The count is " + count + " spend time:" + (System.currentTimeMillis() - start) + "ms");
}
}
使用fork/join 遍歷指定目錄(含子目錄)找尋指定類型文件
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
/**
* @author sxylml
* @Date : 2019/5/25 09:29
* @Description: 遍歷指定目錄(含子目錄)找尋指定類型文件
*/
public class FindDirsFiles extends RecursiveAction {
private File path;
private String suffix;
public FindDirsFiles(File path, String suffix) {
this.path = path;
this.suffix = suffix;
}
@Override
protected void compute() {
List<FindDirsFiles> subTasks = new ArrayList<>();
File[] files = path.listFiles();
if (files != null) {
for (File file : files) {
if (file.isDirectory()) {
//對每一個(gè)子目錄創(chuàng)建一個(gè)子任務(wù)
subTasks.add(new FindDirsFiles(file, suffix));
} else {
//文件亏镰,就判斷后綴尋找自己需要的
if (file.getAbsolutePath().endsWith(suffix)) {
System.out.println(file.getAbsolutePath());
}
}
}
if (!subTasks.isEmpty()) {
// 在當(dāng)前的 ForkJoinPool 上調(diào)度所有的子任務(wù)。
for (FindDirsFiles subTask : invokeAll(subTasks)) {
subTask.join();
}
}
}
}
public static void findDirsFiles(String path, String suffix) {
try {
// 用一個(gè) ForkJoinPool 實(shí)例調(diào)度總?cè)蝿?wù)
ForkJoinPool pool = new ForkJoinPool();
FindDirsFiles task = new FindDirsFiles(new File(path), suffix);
//異步提交
pool.execute(task);
/*主線程做自己的業(yè)務(wù)工作*/
System.out.println("Task is Running......");
Thread.sleep(1);
int otherWork = 0;
for (int i = 0; i < 100; i++) {
otherWork = otherWork + i;
}
System.out.println("Main Thread done sth......,otherWork=" + otherWork);
//阻塞方法
task.join();
System.out.println("Task end");
} catch (Exception e) {
}
}
public static void main(String[] args) {
findDirsFiles("C:/","java");
}
}
并發(fā)工具類
CountDownLatch
閉鎖拯爽,CountDownLatch 這個(gè)類能夠使一個(gè)線程等待其他線程完成各自的工作后再執(zhí)行索抓。
例如:應(yīng)用程序的主線程希望再負(fù)責(zé)啟動(dòng)框架服務(wù)的線程已經(jīng)啟動(dòng)所有的框架服務(wù)之后再執(zhí)行。
CountDownLatch是通過一個(gè)計(jì)數(shù)器來實(shí)現(xiàn)的毯炮,計(jì)數(shù)器的初始值為初始任務(wù)的數(shù)量逼肯。每當(dāng)完成了一個(gè)任務(wù)后,計(jì)數(shù)器的值就會(huì)減1
(CountDownLatch.countDown()方法)桃煎。當(dāng)計(jì)數(shù)器值到達(dá)0時(shí)篮幢,它表示所有的已經(jīng)完成了任務(wù),然后在閉鎖上等待CountDownLatch.await()方法的線程就可以恢復(fù)執(zhí)行任務(wù)为迈。
應(yīng)用場景:
實(shí)現(xiàn)最大的并行性:有時(shí)我們想同時(shí)啟動(dòng)多個(gè)線程三椿,實(shí)現(xiàn)最大程度的并行性缺菌。例如,我們想測試一個(gè)單例類搜锰。如果我們創(chuàng)建一個(gè)初始計(jì)數(shù)為1的CountDownLatch伴郁,并讓所有線程都在這個(gè)鎖上等待,那么我們可以很輕松地完成測試纽乱。我們只需調(diào)用 一次countDown()方法就可以讓所有的等待線程同時(shí)恢復(fù)執(zhí)行蛾绎。
開始執(zhí)行前等待n個(gè)線程完成各自任務(wù):例如應(yīng)用程序啟動(dòng)類要確保在處理用戶請求前昆箕,所有N個(gè)外部系統(tǒng)已經(jīng)啟動(dòng)和運(yùn)行了鸦列,例如處理excel中多個(gè)表單。
import tools.SleepTools;
import java.util.concurrent.CountDownLatch;
/**
* @author sxylml
* @Date : 2019/5/25 10:10
* @Description: 演示CountDownLatch用法鹏倘,共5個(gè)初始化子線程薯嗤,6個(gè)閉鎖扣除點(diǎn),扣除完畢后纤泵,主線程和業(yè)務(wù)線程才能繼續(xù)執(zhí)行
*/
public class UseCountDownLatch {
/**
* 可以自己嘗試骆姐,分別設(shè)置 6,大于6 小于6捏题,看運(yùn)行結(jié)果
*/
static CountDownLatch latch = new CountDownLatch(6);
/**初始化線程*/
private static class InitThread implements Runnable {
@Override
public void run() {
System.out.println("Thread_" + Thread.currentThread().getId() + " ready init work......");
latch.countDown();
for (int i = 0; i < 2; i++) {
System.out.println("Thread_" + Thread.currentThread().getId() + " ........continue do its work");
}
}
}
/**業(yè)務(wù)線程等待latch的計(jì)數(shù)器為0完成*/
private static class BusiThread implements Runnable {
@Override
public void run() {
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
for (int i = 0; i < 3; i++) {
System.out.println("BusiThread_" + Thread.currentThread().getId() + " do business-----");
}
}
}
public static void main(String[] args) throws InterruptedException {
new Thread(new Runnable() {
@Override
public void run() {
SleepTools.ms(1);
System.out.println("Thread_" + Thread.currentThread().getId() + " ready init work step 1st......");
latch.countDown();
System.out.println("begin step 2nd.......");
SleepTools.ms(1);
System.out.println("Thread_" + Thread.currentThread().getId() + " ready init work step 2nd......");
latch.countDown();
}
}).start();
new Thread(new BusiThread()).start();
for (int i = 0; i <= 3; i++) {
Thread thread = new Thread(new InitThread());
thread.start();
}
latch.await();
System.out.println("Main do ites work........");
}
}
CyclicBarrier
CyclicBarrier的字面意思是可循環(huán)使用(Cyclic)的屏障(Barrier)玻褪。它要做的事情是,讓一組線程到達(dá)一個(gè)屏障(也可以叫同步點(diǎn))時(shí)被阻塞公荧,直到最后一個(gè)線程到達(dá)屏障時(shí)带射,屏障才會(huì)開門,所有被屏障攔截的線程才會(huì)繼續(xù)運(yùn)行循狰。CyclicBarrier默認(rèn)的構(gòu)造方法是CyclicBarrier(int parties)窟社,其參數(shù)表示屏障攔截的線程數(shù)量,每個(gè)線程調(diào)用await方法告訴CyclicBarrier我已經(jīng)到達(dá)了屏障绪钥,然后當(dāng)前線程被阻塞灿里。
CyclicBarrier還提供一個(gè)更高級(jí)的構(gòu)造函數(shù)CyclicBarrier(int parties,Runnable barrierAction)程腹,用于在線程到達(dá)屏障時(shí)匣吊,優(yōu)先執(zhí)行barrierAction,方便處理更復(fù)雜的業(yè)務(wù)場景寸潦。
CyclicBarrier可以用于多線程計(jì)算數(shù)據(jù)缀去,最后合并計(jì)算結(jié)果的場景。
package com.ch2.forkjoin.tools;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CyclicBarrier;
/**
* @author sxylml
* @Date : 2019/5/25 10:39
* @Description: 演示CyclicBarrier用法, 共4個(gè)子線程甸祭,他們?nèi)客瓿晒ぷ骱舐扑椋怀鲎约航Y(jié)果,
* 再被統(tǒng)一釋放去做自己的事情池户,而交出的結(jié)果被另外的線程拿來拼接字符串
*/
public class UseCyclicBarrier {
/**
*
*/
private static CyclicBarrier barrier = new CyclicBarrier(4, new CollectThread());
/**
* 存放子線程工作結(jié)果的容器
*/
private static ConcurrentHashMap<String, Long> resultMap = new ConcurrentHashMap<>();
/*匯總的任務(wù)*/
private static class CollectThread implements Runnable {
@Override
public void run() {
StringBuilder result = new StringBuilder();
for (Map.Entry<String, Long> workResult : resultMap.entrySet()) {
result.append("[" + workResult.getValue() + "]");
}
System.out.println(" the result = " + result);
System.out.println("do other business........");
}
}
/*相互等待的子線程*/
private static class SubThread implements Runnable {
@Override
public void run() {
long id = Thread.currentThread().getId();
resultMap.put(Thread.currentThread().getId() + "", id);
try {
Thread.sleep(1000 + id);
System.out.println("Thread_" + id + " ....do something ");
barrier.await();
Thread.sleep(1000 + id);
System.out.println("Thread_" + id + " ....do its business ");
// 可以重復(fù)執(zhí)行匯總
barrier.await();
} catch (Exception e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
for (int i = 0; i < 4; i++) {
Thread thread = new Thread(new SubThread());
thread.start();
}
}
}
CountDownLatch和CyclicBarrier辨析
CountDownLatch的計(jì)數(shù)器只能使用一次咏雌,而CyclicBarrier的計(jì)數(shù)器可以反復(fù)使用凡怎。
CountDownLatch.await一般阻塞工作線程,所有的進(jìn)行預(yù)備工作的線程執(zhí)行countDown赊抖,而CyclicBarrier通過工作線程調(diào)用await從而自行阻塞统倒,直到所有工作線程達(dá)到指定屏障,再大家一起往下走氛雪。
在控制多個(gè)線程同時(shí)運(yùn)行上房匆,CountDownLatch可以不限線程數(shù)量,而CyclicBarrier是固定線程數(shù)报亩。
同時(shí)浴鸿,CyclicBarrier還可以提供一個(gè)barrierAction,合并多線程計(jì)算結(jié)果弦追。
Semaphore
Semaphore (信號(hào)量) 是用來控制同時(shí)訪問特定資源的線程數(shù)量岳链,它通過協(xié)調(diào)各個(gè)線程,以保證合理的使用公共資源劲件。應(yīng)用場景Semaphore 可以用于做流量控制掸哑,特別是公用資源有限的應(yīng)用場景,比如數(shù)據(jù)庫連接零远。
假如有一個(gè)需求苗分。
要讀取幾十萬個(gè)文件的數(shù)據(jù),因?yàn)槎际荌O密集型任務(wù)牵辣,我們可以啟用幾十個(gè)線程并發(fā)地讀取摔癣,但是如果讀到內(nèi)存后,還需要存儲(chǔ)到數(shù)據(jù)庫中服猪,二數(shù)據(jù)庫的連接數(shù)量只有10個(gè)供填,這時(shí)我們必須控制只有10個(gè)線程同事獲取數(shù)據(jù)庫連接保存數(shù)據(jù)。否則會(huì)報(bào)錯(cuò)無法獲取數(shù)據(jù)庫連接罢猪。這個(gè)時(shí)候近她,就可以使用Semaphore 來做流量控制。Semaphore 的構(gòu)造方法Semaphore (int permits) 接受一個(gè)整型的數(shù)字膳帕,表示可用的許可證數(shù)量粘捎。Semaphore 的用法也很簡單,首先線程使用Semaphore的acquire() 方法獲取一個(gè)許可證危彩,使用完之后調(diào)用release() 方法歸還許可證攒磨。還可以用tryAcquire()方法嘗試獲取許可證。
Semaphore 還提供一些其它方法汤徽,如下:
intavailablePermits(): 返回此信號(hào)量中當(dāng)前可用的許可證數(shù)娩缰。
intgetQuenueLength();返回正在等待獲取許可證的線程數(shù)。
booleanhasQueuedThreads(); 是否有線程正在等待獲取許可證谒府。
void reducePermits(int reduction); 減少reduction 個(gè)許可證拼坎,是個(gè)protected 方法浮毯。
Collection getQueuedThreads(); 返回所有等待獲取許可證的線程集合,是個(gè)protected方法泰鸡。
用Semaphore實(shí)現(xiàn)數(shù)據(jù)庫連接池
public class SqlConnectImpl implements Connection {
/**
* 拿一個(gè)數(shù)據(jù)庫連接
*/
public static final Connection fetchConnection() {
return new SqlConnectImpl();
}
// 實(shí)現(xiàn)其它方法
}
package com.ch2.forkjoin.tools.semaphore;
import java.sql.Connection;
import java.util.LinkedList;
import java.util.concurrent.Semaphore;
/**
* @author sxylml
* @Date : 2019/5/25 11:27
* @Description: 演示Semaphore用法债蓝,一個(gè)數(shù)據(jù)庫連接池的實(shí)現(xiàn)
*/
public class DBPoolSemaphore {
/**
* 連接池大小
*/
private final static int POOL_SIZE = 10;
/**
* 兩個(gè)指示器,分別表示池子還有可用連接和已用連接
*/
private final Semaphore useful, useless;
/**
* 存放數(shù)據(jù)庫連接的容器
*/
private static LinkedList<Connection> pool = new LinkedList<Connection>();
//初始化池
static {
for (int i = 0; i < POOL_SIZE; i++) {
pool.addLast(SqlConnectImpl.fetchConnection());
}
}
public DBPoolSemaphore() {
this.useful = new Semaphore(10);
this.useless = new Semaphore(0);
}
/**
* 歸還連接
*/
public void returnConnect(Connection connection) throws InterruptedException {
if (connection != null) {
System.out.println("當(dāng)前有" + useful.getQueueLength() + "個(gè)線程等待數(shù)據(jù)庫連接!"
+ "可用連接數(shù):" + useful.availablePermits());
useless.acquire();
synchronized (pool) {
pool.addLast(connection);
}
useful.release();
}
}
/**
* 從池子拿連接
*/
public Connection takeConnect() throws InterruptedException {
useful.acquire();
Connection connection;
synchronized (pool) {
connection = pool.removeFirst();
}
useless.release();
return connection;
}
}
import tools.SleepTools;
import java.sql.Connection;
import java.util.Random;
/**
* @author sxylml
* @Date : 2019/5/25 14:09
* @Description:
*/
public class AppTest {
private static DBPoolSemaphore dbPool = new DBPoolSemaphore();
private static class BusiThread extends Thread {
@Override
public void run() {
/**
* 讓每個(gè)線程持有連接的時(shí)間不一樣
*/
Random r = new Random();
long start = System.currentTimeMillis();
try {
Connection connect = dbPool.takeConnect();
System.out.println("Thread_" + Thread.currentThread().getId() + "_獲取數(shù)據(jù)庫連接共耗時(shí)【" + (System.currentTimeMillis() - start) + "】ms.");
/**
* 模擬業(yè)務(wù)操作盛龄,線程持有連接查詢數(shù)據(jù)
*/
SleepTools.ms(100 + r.nextInt(100));
System.out.println("查詢數(shù)據(jù)完成饰迹,歸還連接!");
dbPool.returnConnect(connect);
} catch (InterruptedException e) {
}
}
}
public static void main(String[] args) {
for (int i = 0; i < 50; i++) {
Thread thread = new BusiThread();
thread.start();
}
}
}
Semaphore注意事項(xiàng)
package com.ch2.forkjoin.tools.semaphore;
import tools.SleepTools;
import java.sql.Connection;
import java.util.LinkedList;
import java.util.Random;
import java.util.concurrent.Semaphore;
/**
* @author sxylml
* @Date : 2019/5/25 11:36
* @Description: 演示Semaphore用法余舶,一個(gè)數(shù)據(jù)庫連接池的實(shí)現(xiàn)
*/
public class DBPoolNoUseless {
private final static int POOL_SIZE = 10;
private final Semaphore useful;
/**
* 存放數(shù)據(jù)庫連接的容器
*/
private static LinkedList<Connection> pool = new LinkedList<Connection>();
/**
*
*初始化池
*/
static {
for (int i = 0; i < POOL_SIZE; i++) {
pool.addLast(SqlConnectImpl.fetchConnection());
}
}
public DBPoolNoUseless() {
this.useful = new Semaphore(10);
}
/*歸還連接*/
public void returnConnect(Connection connection) throws InterruptedException {
if (connection != null) {
System.out.println("當(dāng)前有" + useful.getQueueLength() + "個(gè)線程等待數(shù)據(jù)庫連接!!"
+ "可用連接數(shù):" + useful.availablePermits());
synchronized (pool) {
pool.addLast(connection);
}
useful.release();
}
}
/*從池子拿連接*/
public Connection takeConnect() throws InterruptedException {
useful.acquire();
Connection connection;
synchronized (pool) {
connection = pool.removeFirst();
}
return connection;
}
private static DBPoolNoUseless dbPoolNoUseless = new DBPoolNoUseless();
private static class BusiThread extends Thread {
@Override
public void run() {
/**
*讓每個(gè)線程持有連接的時(shí)間不一樣
*/
Random r = new Random();
long start = System.currentTimeMillis();
try {
System.out.println("Thread_" + Thread.currentThread().getId()
+ "_獲取數(shù)據(jù)庫連接共耗時(shí)【" + (System.currentTimeMillis() - start) + "】ms.");
/**
* 模擬業(yè)務(wù)操作啊鸭,線程持有連接查詢數(shù)據(jù)
*/
SleepTools.ms(100 + r.nextInt(100));
System.out.println("查詢數(shù)據(jù)完成,歸還連接欧芽!");
dbPoolNoUseless.returnConnect(new SqlConnectImpl());
} catch (InterruptedException e) {
}
}
}
public static void main(String[] args) {
for (int i = 0; i < 50; i++) {
Thread thread = new BusiThread();
thread.start();
}
}
}
Exchange
Exchanger(交換者)是一個(gè)用于線程間協(xié)作的工具類莉掂。Exchanger用于進(jìn)行線程間的數(shù)據(jù)交換葛圃。它提供一個(gè)同步點(diǎn)千扔,在這個(gè)同步點(diǎn),兩個(gè)線程可以交換彼此的數(shù)據(jù)库正。這兩個(gè)線程通過exchange方法交換數(shù)據(jù)曲楚,如果第一個(gè)線程先執(zhí)行exchange()方法,它會(huì)一直等待第二個(gè)線程也執(zhí)行exchange方法褥符,當(dāng)兩個(gè)線程都到達(dá)同步點(diǎn)時(shí)龙誊,這兩個(gè)線程就可以交換數(shù)據(jù),將本線程生產(chǎn)出來的數(shù)據(jù)傳遞給對方喷楣。
package com.ch2.forkjoin.tools;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Exchanger;
/**
* @author sxylml
* @Date : 2019/5/25 16:04
* @Description: 演示Exchange用法
*/
public class UseExchange {
private static final Exchanger<Set<String>> exchange = new Exchanger<Set<String>>();
public static void main(String[] args) {
new Thread(new Runnable() {
@Override
public void run() {
//存放數(shù)據(jù)的容器
Set<String> setA = new HashSet<String>();
try {
/*添加數(shù)據(jù)
* set.add(.....)
* */
setA.add("A");
setA.add("a");
//交換set
setA = exchange.exchange(setA);
setA.forEach(src -> System.out.println("setA"+src));
/**處理交換后的數(shù)據(jù)*/
} catch (InterruptedException e) {
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
/**
* 存放數(shù)據(jù)的容器
*/
Set<String> setB = new HashSet<String>();
try {
/*添加數(shù)據(jù)
* set.add(.....)
* set.add(.....)
* */
setB.add("B");
setB.add("b");
//交換set
setB = exchange.exchange(setB);
/**處理交換后的數(shù)據(jù)*/
setB.forEach(src -> System.out.println("setB:" + src));
} catch (InterruptedException e) {
}
}
}).start();
}
}
Callable趟大、Future和FutureTask
Runnable是一個(gè)接口,在它里面只聲明了一個(gè)run()方法铣焊,由于run()方法返回值為void類型逊朽,所以在執(zhí)行完任務(wù)之后無法返回任何結(jié)果。
Callable位于java.util.concurrent包下曲伊,它也是一個(gè)接口叽讳,在它里面也只聲明了一個(gè)方法,只不過這個(gè)方法叫做call()坟募,這是一個(gè)泛型接口岛蚤,call()函數(shù)返回的類型就是傳遞進(jìn)來的V類型。
Future就是對于具體的Runnable或者Callable任務(wù)的執(zhí)行結(jié)果進(jìn)行取消懈糯、查詢是否完成涤妒、獲取結(jié)果。必要時(shí)可以通過get方法獲取執(zhí)行結(jié)果赚哗,該方法會(huì)阻塞直到任務(wù)返回結(jié)果她紫。
FutureTask類實(shí)現(xiàn)了RunnableFuture接口铁坎,RunnableFuture繼承了Runnable接口和Future接口,而FutureTask實(shí)現(xiàn)了RunnableFuture接口犁苏。所以它既可以作為Runnable被線程執(zhí)行硬萍,又可以作為Future得到Callable的返回值。
因此我們通過一個(gè)線程運(yùn)行Callable围详,但是Thread不支持構(gòu)造方法中傳遞Callable的實(shí)例朴乖,所以我們需要通過FutureTask把一個(gè)Callable包裝成Runnable,然后再通過這個(gè)FutureTask拿到Callable運(yùn)行后的返回值助赞。
要new一個(gè)FutureTask的實(shí)例买羞,有兩種方法
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
/**
* @author sxylml
* @Date : 2019/5/25 16:58
* @Description:
*/
public class UseFuture {
private static class UseCallable implements Callable<Integer> {
int sum = 0;
@Override
public Integer call() throws Exception {
System.out.println("Callable子線程開始計(jì)算庄萎!");
for (int i = 0; i < 5000; i++) {
if (Thread.currentThread().isInterrupted()) {
System.out.println("Callable子線程計(jì)算任務(wù)中斷耀怜!");
return null;
}
sum = sum + i;
System.out.println("sum=" + sum);
}
System.out.println("Callable子線程計(jì)算結(jié)束淹父!結(jié)果為: " + sum);
return sum;
}
}
public static void main(String[] args) throws InterruptedException, ExecutionException {
UseCallable useCallable = new UseCallable();
//包裝
FutureTask<Integer> futureTask = new FutureTask<>(useCallable);
Random r = new Random();
new Thread(futureTask).start();
System.out.println("Get UseCallable result = " + futureTask.get());
futureTask.cancel(true);
}
}