在講并發(fā)容器之前冀膝,先看一個(gè)小案例,經(jīng)典的多線程買票問題
(1)普通的思路來寫霎挟,分析問題所在
下面程序模擬賣票可能會(huì)出現(xiàn)兩個(gè)問題:①票賣重了 ②還剩最后一張票時(shí)窝剖,好幾個(gè)線程同時(shí)搶,出現(xiàn)-1張票
出現(xiàn)上面兩個(gè)問題主要是因?yàn)椋孩賠emove()方法不是原子性的 ②判斷+操作不是原子性的
public class TicketSeller1 {
static List<String> tickets = new ArrayList<>();
static {
for (int i=0; i<10000; i++) { //共一萬張票
tickets.add("票編號(hào)--" + i);
}
}
public static void main(String[] args) {
for (int i=0; i<10; i++) { //共10個(gè)線程賣票
new Thread(()->{
while(tickets.size() > 0) { //判斷余票
System.out.println("銷售了..." + tickets.remove(0)); //操作減票
}
}).start();
}
}
}
(2)使用線程安全的容器Vector
本程序雖然用了Vector作為容器酥夭,Vector中的方法都是原子性的赐纱,但是在判斷size和減票的中間還是可能被打斷的,即被減到-1張
public class TicketSeller2 {
static Vector<String> tickets = new Vector<>(); //Vector是一個(gè)同步容器
static {
for (int i=0; i<100; i++) tickets.add("票編號(hào)-" + i);
}
public static void main(String[] args) {
for (int i=0; i<10; i++) {
new Thread(()->{
while(tickets.size() > 0) { //判斷余票
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("銷售了--"+tickets.remove(0)); //操作減票
}
}).start();
}
}
}
(3)在判斷和操作放在同步代碼塊中
將判斷和操作外面加鎖熬北,程序完全沒有功能上的問題疙描,但是效率很低
public class TicketSeller3 {
static List<String> tickets = new LinkedList<>();
static {
for (int i=0; i<100; i++) { //共100張票
tickets.add("票編號(hào):" + i);
}
}
public static void main(String[] args) {
for (int i=0; i<10; i++) { //共10個(gè)線程賣票
new Thread(()->{
while(true) {
synchronized (tickets) {
if (tickets.size() <= 0) break; //判斷 余票
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("銷售了--" + tickets.remove(0)); //操作減票
}
}
}).start();
}
}
}
(4)使用隊(duì)列(Queue)來實(shí)現(xiàn)
ConcurrentLinkedQueue底層不是加鎖的實(shí)現(xiàn),而是ConcurrentSet讶隐,效率會(huì)高很多起胰。
Queue一開始不是空的。先poll巫延,再判斷tickets是不是空的效五,最后沒有任何操作地消,所以不用加鎖也不會(huì)出現(xiàn)任何問題
public class TicketSeller4 {
static Queue<String> tickets = new ConcurrentLinkedQueue<>();
static {
for (int i=0; i<1000; i++) {
System.out.println("票編號(hào):" + i );
}
}
public static void main(String[] args) {
for (int i=0; i<10; i++) {
new Thread( ()-> {
while(true) {
String str = tickets.poll(); //poll方法是原子性的,拿出一張票
if(str == null) break;
else System.out.println("銷售了.." + str);
}
}).start();
}
}
}
一畏妖、List和Map相關(guān)
1脉执、ConcurrentHashMap
HashMap和HashTable的區(qū)別就是HashTable是線程安全的,支持并發(fā)操作戒劫,效率不夠高半夷,所有方法都加了鎖,而HashMap線程不安全谱仪,實(shí)現(xiàn)相對簡單玻熙,不支持并發(fā)操作。
JDK1.7中ConcurrentHashMap支持并發(fā)操作疯攒,采用ReentrantLock+Segment+HashEntry機(jī)制嗦随,整個(gè) ConcurrentHashMap 由一個(gè)個(gè) Segment 組成,需要經(jīng)過兩次hash運(yùn)算敬尺,先定位到Segment枚尼,第二次定位到元素所在的頭部。Segment 通過繼承 ReentrantLock 來加鎖砂吞,所以每次需要加鎖的操作鎖住的是一個(gè) segment署恍,這樣只要保證每個(gè) Segment 是線程安全的,也就實(shí)現(xiàn)了全局的線程安全蜻直。concurrencyLevel(并行級(jí)別/并發(fā)數(shù)/Segment 數(shù))默認(rèn)是 16盯质,即 ConcurrentHashMap 有 16 個(gè) Segments,所以理論上概而,最多可以同時(shí)支持 16 個(gè)線程并發(fā)寫呼巷,只要它們的操作分別分布在不同的 Segment 上。這個(gè)值可以在初始化的時(shí)候設(shè)置為其他值赎瑰,但是一旦初始化以后王悍,它是不可以擴(kuò)容的。
JDK 1.8摒棄了1.7中的分段鎖的概念餐曼,采用的是synchronized+CAS+HashEntry+紅黑樹压储。
CAS是compare and swap的縮寫,即我們所說的比較交換源譬。cas是一種基于鎖的操作集惋,而且是樂觀鎖。在java中鎖分為樂觀鎖和悲觀鎖踩娘。悲觀鎖是將資源鎖住刮刑,等一個(gè)之前獲得鎖的線程釋放鎖之后,下一個(gè)線程才可以訪問。而樂觀鎖采取了一種寬泛的態(tài)度为朋,通過某種方式不加鎖來處理資源,比如通過給記錄加version來獲取數(shù)據(jù)厚脉,性能較悲觀鎖有很大的提高习寸。
CAS 操作包含三個(gè)操作數(shù) —— 內(nèi)存位置(V)、預(yù)期原值(A)和新值(B)傻工。如果內(nèi)存地址里面的值和A的值是一樣的霞溪,那么就將內(nèi)存里面的值更新成B。CAS是通過無限循環(huán)來獲取數(shù)據(jù)的中捆,若果在第一輪循環(huán)中鸯匹,a線程獲取地址里面的值被b線程修改了,那么a線程需要自旋泄伪,到下次循環(huán)才有可能機(jī)會(huì)執(zhí)行殴蓬。
Node:保存key,value及key的hash值的數(shù)據(jù)結(jié)構(gòu)蟋滴。其中value和next都用volatile修飾染厅,保證并發(fā)的可見性。
2津函、ConcurrentSkipListMap和ConcurrentSkipSet
跳表(SkipList)是一種隨機(jī)化的數(shù)據(jù)結(jié)構(gòu)肖粮,通過“空間來換取時(shí)間”的一個(gè)算法,建立多級(jí)索引尔苦,實(shí)現(xiàn)以二分查找遍歷一個(gè)有序鏈表涩馆。時(shí)間復(fù)雜度等同于紅黑樹,O(log n)允坚。ConcurrentSkipListMap和ConcurrentSkipSet
是基于跳表實(shí)現(xiàn)的一種線程安全的并且可以排序的容器魂那。插入時(shí)效率比較低,但查找時(shí)效率較高屋讶。
總結(jié)
- 在不加鎖的情況下冰寻,可以用:HashMap、TreeMap皿渗、LinkedHashMap斩芭。想加鎖可以用Hashtable(用的非常少)。
- 在并發(fā)量不是很高的情況下乐疆,可以用Collections.synchronizedXxx()方法划乖,在該方法中傳一個(gè)不加鎖的容器(如Map),它返回一個(gè)加了鎖的容器(容器中的所有方法加鎖)挤土!
- 在并發(fā)性比較高的情況下琴庵,用ConcurrentHashMap ,如果并發(fā)性高且要排序的情況下,用ConcurrentSkipListMap迷殿。
3儿礼、CopyOnWriteArrayList
CopyOnWriteArrayList在多線程環(huán)境下,寫時(shí)效率低庆寺,讀時(shí)效率高蚊夫,適合寫少讀多的環(huán)境,比如事件監(jiān)聽器懦尝。
- 寫時(shí)復(fù)制:添加元素的時(shí)候知纷,會(huì)把這個(gè)容器復(fù)制一份,在復(fù)制的那份后面加一個(gè)新的陵霉,將引用指向復(fù)制的那份琅轧。
- 讀的時(shí)候不用加鎖,適合寫的很少踊挠,讀的特別多的時(shí)候乍桂。
public class CopyOnWriteListTest {
public static void main(String[] args) {
List<String> list =
// new ArrayList<>(); //這個(gè)會(huì)出并發(fā)問題,最后size<100000,止毕,運(yùn)行時(shí)間:0.1秒多
// new Vector<>(); //size=100000模蜡,,運(yùn)行時(shí)間:0.1秒多
new CopyOnWriteArrayList<>(); //size=100000扁凛,效率很低忍疾,因?yàn)橐恢痹凇皬?fù)制、寫”谨朝,運(yùn)行時(shí)間:5秒多
Random r = new Random();
Thread[] threads = new Thread[100];
for (int i=0; i<threads.length; i++) { //起100個(gè)線程卤妒,每個(gè)線程向容器中加1000個(gè)數(shù)(最終應(yīng)該是10萬個(gè)數(shù))
Runnable task = new Runnable() {
@Override
public void run() {
for (int j=0; j<1000; j++) list.add("a" + r.nextInt());
}
};
threads[i] = new Thread(task);
}
runAndComputeTime(threads);
System.out.println(list.size());
}
static void runAndComputeTime(Thread[] threads) {
long start = System.currentTimeMillis();
Arrays.asList(threads).forEach(t->t.start());
Arrays.asList(threads).forEach(t->{
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
long end = System.currentTimeMillis();
System.out.println(end-start);
}
}
二、Queue隊(duì)列相關(guān)
Java提供的線程安全的Queue可以分為阻塞隊(duì)列和非阻塞隊(duì)列字币,其中阻塞隊(duì)列的典型例子是BlockingQueue则披,非阻塞隊(duì)列的典型例子是ConcurrentLinkedQueue
1、ConcurrentLinkedQueue 并發(fā)隊(duì)列
- 一個(gè)基于鏈接節(jié)點(diǎn)的無界線程安全隊(duì)列洗出。此隊(duì)列按照 FIFO(先進(jìn)先出)原則對元素進(jìn)行排序士复。隊(duì)列的頭部 是隊(duì)列中時(shí)間最長的元素。隊(duì)列的尾部 是隊(duì)列中時(shí)間最短的元素翩活。
*ConcurrentLinkedQueue是非阻塞隊(duì)列阱洪,其沒有put和take方法,可以無限制存菠镇,且是線程安全的冗荸,N個(gè)用戶同時(shí)存也能保證每次存放在隊(duì)尾而不亂掉,但是其的size()方法會(huì)不定時(shí)遍歷利耍,所以特耗時(shí)
public class ConcurrentQueue {
public static void main(String[] args) {
Queue<String> strs = new ConcurrentLinkedQueue<>(); //還有雙端隊(duì)列...Deque
for (int i=0; i<10; i++) {
//類似于add方法蚌本,如果是ArrayQueue盔粹,add方法可能會(huì)拋異常,但是offer方法不會(huì)拋異常程癌,返回boolean類型即是否添加成功
strs.offer("a"+i);
}
System.out.println(strs); //[a0, a1, a2, a3, a4, a5, a6, a7, a8, a9]
System.out.println("隊(duì)列原始大邢衔恕:" + strs.size()); //隊(duì)列原始大小:10
//poll方法表示從頭上拿出一個(gè)刪掉嵌莉;peek方法表示從頭上拿出一個(gè)用一下不刪咬崔。
System.out.println("poll " + strs.poll() + "后的大小為:" + strs.size()); //poll a0后的大小為:9
System.out.println("peek " + strs.peek() + "后的大小為:" + strs.size()); //peek a1后的大小為:9
}
}
2、 LinkedBlockingQueue 無界阻塞式隊(duì)列
public class LinkedBlockingQueueTest {
static BlockingQueue<String> strs = new LinkedBlockingQueue<>();
static Random r = new Random();
public static void main(String[] args) {
new Thread(()->{ //1個(gè)生產(chǎn)者線程
for (int i=0; i<100; i++) {
try {
strs.put("a" + i); //如果滿了烦秩,就會(huì)等待
TimeUnit.MILLISECONDS.sleep(r.nextInt(100));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"producer").start();
for (int i=0; i<5; i++) { //5個(gè)消費(fèi)者進(jìn)程
new Thread(()-> {
for (;;) {
try {
System.out.println(Thread.currentThread().getName()
+ " take-" + strs.take()); //如果空了,就等待
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"customer"+i).start();
}
}
}
3郎仆、 ArrayBlockingQueue 有界阻塞式隊(duì)列
/*有界阻塞式隊(duì)列*/
public class ArrayBlockingQueueTest {
static BlockingQueue<String> strs = new ArrayBlockingQueue<>(10); //最多裝10個(gè)
static Random r = new Random();
public static void main(String[] args) {
for (int i=0; i<10; i++) {
try {
strs.put("a" + i); //向容器中添加10個(gè)只祠,就滿了
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try { //strs已經(jīng)滿了,以下方法都加不進(jìn)去扰肌,但是處理方式不同
strs.put("aaa");//發(fā)現(xiàn)滿了抛寝,就會(huì)等待,程序阻塞
strs.add("aaa"); //已經(jīng)滿了曙旭,再往里面裝就會(huì)報(bào)異常
strs.offer("aaa");//不會(huì)報(bào)異常盗舰,但是加不進(jìn)去,返回是否添加成功
strs.offer("aaa",1,TimeUnit.SECONDS); //1秒鐘后加不進(jìn)去桂躏,就不往里面加了
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(strs);
}
}
4钻趋、 DelayQueue 執(zhí)行定時(shí)任務(wù)
往DelayQueue里加的元素是按時(shí)間排好序的,該隊(duì)列是無界的剂习。另外元素要實(shí)現(xiàn)Delayed接口蛮位,而Delayed接口又繼承了Comparable接口,所以該類元素需要實(shí)現(xiàn)compareTo()方法鳞绕,用來對元素在隊(duì)列中進(jìn)行排序失仁;并且每個(gè)元素記載著自己還有多長時(shí)間才能被拿走,還要實(shí)現(xiàn)getDelay()方法们何,只有g(shù)etDelay()返回值為負(fù)數(shù)時(shí)才能被拿走萄焦。
public class DelayQueueTest {
static DelayQueue<MyTask> tasks = new DelayQueue<>();
static class MyTask implements Delayed { //實(shí)現(xiàn)Delayed接口
long runningTime;
String name;
MyTask(long rt,String name) {
this.runningTime = rt;
this.name = name;
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(runningTime-System.currentTimeMillis(),TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed o) {
if (this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS))
return -1;
else if (this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS))
return 1;
else // ==
return 0;
}
@Override
public String toString() {
return name + "--" + runningTime;
}
}
public static void main(String[] args) {
long now = System.currentTimeMillis();
MyTask t1 = new MyTask(now + 1000, "task1"); //1 s 后執(zhí)行 //②
MyTask t2 = new MyTask(now + 2000, "task2"); //2 s后執(zhí)行 //④
MyTask t3 = new MyTask(now + 1500, "task3"); //1.5s后執(zhí)行 //③
MyTask t4 = new MyTask(now + 500, "task4"); //0.5s后執(zhí)行 //①
MyTask t5 = new MyTask(now + 2500, "task5"); //2.5s后執(zhí)行 //⑤
tasks.put(t1);
tasks.put(t2);
tasks.put(t3);
tasks.put(t4);
tasks.put(t5);
System.out.println(tasks);
for (int i=0; i<5; i++) {
try {
System.out.println(tasks.take()); //按放進(jìn)去的順序拿出
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
5、 TransferQueue(接口)
消費(fèi)者先啟動(dòng)冤竹,生產(chǎn)者生產(chǎn)一個(gè)東西的時(shí)候拂封,不扔在隊(duì)列里,而是直接去找有沒有消費(fèi)者贴见,有的話直接扔給消費(fèi)者烘苹,若沒有消費(fèi)者線程,調(diào)用transfer()方法就會(huì)阻塞片部,調(diào)用add()镣衡、offer()霜定、put()方法不會(huì)阻塞。
LinkedTransferQueue為TransferQueue接口的實(shí)現(xiàn)類
public class TransferQueueTest {
public static void main(String[] args) throws InterruptedException {
LinkedTransferQueue<String> strs = new LinkedTransferQueue<>();
new Thread(()->{ //消費(fèi)者先啟動(dòng),可以拿走aaa
try {
System.out.println(strs.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
strs.transfer("aaa");
// strs.put("aaaa"); //add廊鸥、offer
// new Thread(()->{ //消費(fèi)者在生產(chǎn)者后啟動(dòng),拿不到aaa望浩,程序阻塞
// try {
// System.out.println(strs.take());
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// }).start();
}
}
5、 SynchronousQueue
SynchronousQueue為一種特殊的TransferQueue惰说,實(shí)現(xiàn)了BlockingQueue接口磨德,生產(chǎn)的任何一個(gè)東西必須直接交給消費(fèi)者消費(fèi),不能擱在容器里吆视,容器的容量為0典挑。因此,調(diào)用add()會(huì)報(bào)錯(cuò)啦吧,offer()始終為false您觉,poll()始終為null。調(diào)用put()方法授滓,如果沒有消費(fèi)者琳水,則線程阻塞,該方法內(nèi)部調(diào)用了TransferQueue的transfer()方法般堆。
public class SynchronizeQueueTest {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<String> strs = new SynchronousQueue<>();
new Thread(()->{ //消費(fèi)者線程
try {
System.out.println(strs.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
strs.put("aaaa"); //不能調(diào)用add(報(bào)錯(cuò))在孝,add不進(jìn)去,put阻塞淮摔,等待消費(fèi)者消費(fèi)私沮,內(nèi)部調(diào)用的transfer.
System.out.println(strs.size()); //0
}
}