1谍咆、ConcurrentHashMap的實現(xiàn)原理與使用
1.1变隔、為什么使用ConcurrentHashMap
- HashMap非線程安全
- HashTable讀寫都需要加鎖哼转,效率低下
- ConcurrentHashMap的鎖分段技術可以提高并發(fā)效率
1.2、ConcurrentHashMap的結構
ConcurrentHashMap由Segment數(shù)組結構和HashEntry數(shù)組結構組成,Segement是一種可重入鎖抖坪,在ConcurrentHashMap扮演著鎖的角色;HashEntry用于存儲鍵值對數(shù)據(jù)闷叉,一個ConcurrentHashMap中包含一個Segment數(shù)組擦俐,它是數(shù)組和鏈表結構。一個Segment里包含一個HashEntry數(shù)組握侧,當對HashEntry數(shù)組進行修改操作時必須要獲取它對應的Segment鎖蚯瞧。
1.3、ConcurrentHashMap的初始化
1.4品擎、定位Segment
通過散列算法定位Segment埋合,散列沖突
2、ConcurrentLinkedQueue
并發(fā)編程中實現(xiàn)線程安全的隊列有兩種方式萄传,一種是阻塞隊列甚颂,一種是非阻塞隊列,非阻塞的實現(xiàn)方式可以通過CAS方式來實現(xiàn)秀菱。
ConcurrentLinkedQueue是一個基于鏈接節(jié)點的無界安全隊列振诬。它采用先進先出的方式對節(jié)點進行排序,當我們添加一個元素的時候答朋,它會添加至隊列的隊尾贷揽,當我們獲取元素的時候,它會返回隊列頭部的元素梦碗。
3禽绪、java中的阻塞隊列
阻塞隊列支持阻塞的添加/移除元素的方法环壤。支持阻塞的插入的意思是:當隊列已滿時萨蚕,隊列會阻塞插入隊列的線程,直到隊列有空位稼锅;支持阻塞的移除的意思是:當隊列為空時斩例,隊列會阻塞移除隊列元素的線程雄人,直到隊列中有新的元素添加進來。
阻塞隊列場用于生產(chǎn)/消費者模式念赶,生產(chǎn)者是向隊列中添加元素的線程础钠,消費者是從隊列中獲取元素的線程,而阻塞隊列在其中充當著容器的角色叉谜。
阻塞隊列的插入和移除有四種操作方式旗吁,詳情請參考文檔。
java中有7中阻塞隊列停局,分別是:
- ArrayBlockingQueue:一個由數(shù)組結構組成的有界阻塞隊列
- LinkedBlockingQueue:一個由鏈表結構組成的有界阻塞隊列很钓。
- PriorityBlockingQueue:一個支持優(yōu)先級排序的無界阻塞隊列香府。
- DelayQueue:一個支持延時獲取元素的無界阻塞隊列。
- SynchronousQueue:一個不存儲元素的阻塞隊列码倦。
- LinkedTransferQueue:一個由鏈表結構組成的無界阻塞隊列企孩。
- LinkedBlockingDeque:一個由鏈表結構組成的雙向阻塞隊列。
著重介紹下DelayQueue袁稽,它可以運用在以下業(yè)務場景:
- 緩存系統(tǒng)的設置:可以用DelayQueue保存元素的有效期勿璃,使用一個線程無限循環(huán)查詢DelayQueue,一旦能從DelayQueue中獲取到元素推汽,則說明緩存有效期到了蝗柔。
- 使用DelayQueue保存當天將會執(zhí)行的任務和時間,一旦從DelayQueue獲取到任務民泵,就開始執(zhí)行,TimeQueue就是使用DelayQueue來實現(xiàn)的槽畔。
在公司有這么一個業(yè)務場景:訂單支付后要給商戶發(fā)送相應的通知栈妆,針對同一條通知記錄,如果是第一次發(fā)厢钧,則需要等待的時間是0分鐘鳞尔,第二次發(fā)則需要等待1分鐘,第三次發(fā)則需要等待3分鐘早直,即發(fā)送次數(shù)每+1寥假,則需要等待的時長也要相應的增加,那使用DelayQueue就能很好的實現(xiàn)這個功能了霞扬。以下是參考實現(xiàn):
package main.java.com.robot.demo;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
/**
* @author: 會跳舞的機器人
* @date: 2017/8/17 14:43
* @description: 通知任務
*/
public class NotifyTask implements Delayed, Runnable {
/**
* 任務名稱
*/
private String notifyTaskName;
/**
* 執(zhí)行時間糕韧,單位:毫秒
*/
private long executeTime;
public NotifyTask(String notifyTaskName, long executeTime) {
this.notifyTaskName = notifyTaskName;
this.executeTime = executeTime;
}
/**
* 獲取延遲時間
*
* @param unit
* @return 返回當前元素還需要延長多久時間
*/
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(executeTime - System.currentTimeMillis(), unit.MILLISECONDS);
}
/**
* 用來指定元素的順序,讓延時時間最長的放在隊列的末尾
*
* @param o
* @return
*/
@Override
public int compareTo(Delayed o) {
NotifyTask notifyTask = (NotifyTask) o;
return executeTime > notifyTask.executeTime ? 1 : (executeTime < notifyTask.executeTime ? -1 : 0);
}
@Override
public void run() {
System.out.println("當前時間毫秒數(shù):" + System.currentTimeMillis() + ",線程:" + this.toString() + "正在執(zhí)行");
}
@Override
public String toString() {
return "NotifyTask{" +
"notifyTaskName='" + notifyTaskName + '\'' +
", executeTime=" + executeTime +
'}';
}
}
測試:
package main.java.com.robot.demo;
import java.util.Random;
import java.util.concurrent.DelayQueue;
/**
* @author: 會跳舞的機器人
* @date: 2017/8/17 14:52
* @description: 延遲隊列DelayQueue測試
*/
public class NotifyTaskTest {
/**
* 通知任務存放的延遲隊列
*/
private static DelayQueue<NotifyTask> tasks = new DelayQueue<>();
public static void main(String[] args) {
Random random = new Random();
for (int i = 0; i < 5; i++) {
// 隨機產(chǎn)生一個秒數(shù)
int seconds = random.nextInt(5);
NotifyTask notifyTask = new NotifyTask("任務" + i, System.currentTimeMillis() + (seconds * 1000));
tasks.put(notifyTask);
}
while (true) {
NotifyTask notifyTask = tasks.poll();
if (notifyTask != null) {
notifyTask.run();
}
// 如果隊列中的元素全部被取完,則跳出循環(huán)
if (tasks.size() == 0) {
break;
}
}
}
}
控制臺輸出:
當前時間毫秒數(shù):1502953649855喻圃,線程:NotifyTask{notifyTaskName='任務1', executeTime=1502953649855}正在執(zhí)行
當前時間毫秒數(shù):1502953649855萤彩,線程:NotifyTask{notifyTaskName='任務3', executeTime=1502953649855}正在執(zhí)行
當前時間毫秒數(shù):1502953650855,線程:NotifyTask{notifyTaskName='任務0', executeTime=1502953650855}正在執(zhí)行
當前時間毫秒數(shù):1502953651855斧拍,線程:NotifyTask{notifyTaskName='任務2', executeTime=1502953651855}正在執(zhí)行
當前時間毫秒數(shù):1502953651855雀扶,線程:NotifyTask{notifyTaskName='任務4', executeTime=1502953651855}正在執(zhí)行
從輸出中可以看出任務的執(zhí)行時間都是我們創(chuàng)建任務的時候指定的時間。
4肆汹、Fork/Join框架
4.1愚墓、什么是Fork/Join框架
Fork/Join框架是Java 7提供的一個用于并行執(zhí)行任務的框架,是一個把大任務分割成若干
個小任務昂勉,最終匯總每個小任務結果后得到大任務結果的框架浪册。
我們再通過Fork和Join這兩個單詞來理解一下Fork/Join框架。Fork就是把一個大任務切分
為若干子任務并行的執(zhí)行硼啤,Join就是合并這些子任務的執(zhí)行結果议经,最后得到這個大任務的結
果斧账。比如計算1+2+…+10000,可以分割成10個子任務煞肾,每個子任務分別對1000個數(shù)進行求和咧织,
最終匯總這10個子任務的結果。
4.2籍救、工作竊取算法
工作竊认熬睢(work-stealing)算法是指某個線程從其他隊列里竊取任務來執(zhí)行。那么蝙昙,為什么
需要使用工作竊取算法呢闪萄?假如我們需要做一個比較大的任務,可以把這個任務分割為若干
互不依賴的子任務奇颠,為了減少線程間的競爭败去,把這些子任務分別放到不同的隊列里,并為每個
隊列創(chuàng)建一個單獨的線程來執(zhí)行隊列里的任務烈拒,線程和隊列一一對應圆裕。比如A線程負責處理A
隊列里的任務。但是荆几,有的線程會先把自己隊列里的任務干完吓妆,而其他線程對應的隊列里還有
任務等待處理。干完活的線程與其等著吨铸,不如去幫其他線程干活行拢,于是它就去其他線程的隊列
里竊取一個任務來執(zhí)行。而在這時它們會訪問同一個隊列诞吱,所以為了減少竊取任務線程和被
竊取任務線程之間的競爭舟奠,通常會使用雙端隊列,被竊取任務線程永遠從雙端隊列的頭部拿
任務執(zhí)行房维,而竊取任務的線程永遠從雙端隊列的尾部拿任務執(zhí)行鸭栖。
工作竊取算法的優(yōu)點:充分利用線程進行并行計算,減少了線程間的競爭握巢。
工作竊取算法的缺點:在某些情況下還是存在競爭晕鹊,比如雙端隊列里只有一個任務時。并
且該算法會消耗了更多的系統(tǒng)資源暴浦,比如創(chuàng)建多個線程和多個雙端隊列溅话。
4.3、使用Fork/Join框架
讓我們通過一個簡單的需求來使用Fork/Join框架歌焦,需求是:計算1+2+3+4的結果飞几。
package main.java.com.robot.demo;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;
/**
* @author: 會跳舞的機器人
* @date: 2017/8/17 15:45
* @description: Fork/Join框架簡單demo
*/
public class CountTask extends RecursiveTask<Integer> {
/**
* 閾值
*/
private static final int THRESHOLD = 2;
private int start;
private int end;
public CountTask(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
int sum = 0;
// 判斷任務是否足夠小,足夠小就直接計算
boolean canCompute = (end - start) <= THRESHOLD;
if (canCompute) {
for (int i = start; i <= end; i++) {
sum += i;
}
} else {
// 如果任務大于閾值独撇,就分解成兩個任務執(zhí)行
int midel = (end - start) / 2;
CountTask leftTask = new CountTask(start, midel);
CountTask rightTask = new CountTask(midel + 1, end);
// 執(zhí)行子任務
leftTask.fork();
rightTask.fork();
// 等待計算結果屑墨,合并子任務
int leftResult = leftTask.join();
int rightResult = rightTask.join();
sum = leftResult + rightResult;
}
return sum;
}
public static void main(String[] args) {
ForkJoinPool forkJoinPool = new ForkJoinPool();
// 生成一個計算任務躁锁,負責計算1+2+3+4
CountTask task = new CountTask(1, 4);
// 執(zhí)行一個任務
Future<Integer> result = forkJoinPool.submit(task);
try {
System.out.println(result.get());
} catch (InterruptedException e) {
} catch (ExecutionException e) {
}
}
}
4.4、Fork/Join框架設計及其實現(xiàn)原理
參考書本