同步容器類:
Vector與ArrayList區(qū)別
1.ArrayList是最常用的List實(shí)現(xiàn)類征唬,內(nèi)部是通過數(shù)組實(shí)現(xiàn)的嗽冒,它允許對元素進(jìn)行快速隨機(jī)訪問憔辫。數(shù)組的缺點(diǎn)是每個(gè)元素之間不能有間隔尺上,當(dāng)數(shù)組大小不滿足時(shí)需要增加存儲(chǔ)能力蛉威,就要將已經(jīng)有數(shù)組的數(shù)據(jù)復(fù)制到新的存儲(chǔ)空間中日丹。當(dāng)從ArrayList的中間位置插入或者刪除元素時(shí),需要對數(shù)組進(jìn)行復(fù)制蚯嫌、移動(dòng)哲虾、代價(jià)比較高丙躏。因此,它適合隨機(jī)查找和遍歷束凑,不適合插入和刪除晒旅。
2.Vector與ArrayList一樣,也是通過數(shù)組實(shí)現(xiàn)的汪诉,不同的是它支持線程的同步废恋,即某一時(shí)刻只有一個(gè)線程能夠?qū)慥ector,避免多線程同時(shí)寫而引起的不一致性扒寄,但實(shí)現(xiàn)同步需要很高的花費(fèi)拴签,因此,訪問它比訪問ArrayList慢旗们。
注意: Vector線程安全蚓哩,底層對方法加上synchronized關(guān)鍵字、ArrayList線程不安全上渴,底層方法未加同步關(guān)鍵字岸梨。
Vector源碼類——Add方法源碼類
ArrayList源碼——Add方法源碼
HashTable與HashMap
1.HashMap不是線程安全的
HashMap是一個(gè)接口,是Map接口的子接口稠氮,是將鍵映射到值的對象曹阔,其中鍵和值都是對象,并且不能包含重復(fù)鍵隔披,但可以包含重復(fù)值赃份。HashMap允許null key和null value,而hashtable不允許奢米。
2.HashTable是線程安全的一個(gè)Collection抓韩。
3.HashMap是Hashtable的輕量級(jí)實(shí)現(xiàn)(非線程安全的實(shí)現(xiàn)),他們都完成了Map接口鬓长,主要區(qū)別在于HashMap允許空(null)鍵值(key),由于非線程安全谒拴,效率上可能高于HashTable。
HashMap允許將null作為一個(gè)entry的key或者value涉波,而HashTable不允許英上。
HashMap把HashTable的contains方法去掉了,改成containsValue和containsKey啤覆。
注意: HashTable線程安全苍日,HashMap線程不安全。
源碼分析:原理同Vector與ArrayList
synchronizedMap
Collections.synchronizedMap(hashMap)窗声; 將線程不安全的集合變?yōu)榫€程安全集合相恃。
源碼分析:
ConcurrentHashMap
ConcurrentMap接口下有兩個(gè)重要的實(shí)現(xiàn) :
1)ConcurrentHashMap
2)ConcurrentskipListMap (支持并發(fā)排序功能。彌補(bǔ)ConcurrentHashMap)
ConcurrentHashMap內(nèi)部使用段(Segment)來表示這些不同的部分嫌佑,每個(gè)段其實(shí)就是一個(gè)小的HashTable豆茫,它們有自己的鎖侨歉。
只要多個(gè)修改操作發(fā)生在不同的段上屋摇,它們就可以并發(fā)進(jìn)行揩魂。
把一個(gè)整體分成了16個(gè)段Segment,也就是最高支持16個(gè)線程的并發(fā)修改操作炮温。
這也是在多線程場景時(shí)減小鎖的粒度從而降低鎖競爭的一種方案火脉。
并且代碼中大多共享變量使用volatile關(guān)鍵字聲明,目的是第一時(shí)間獲取修改的內(nèi)容柒啤,性能非常好倦挂。
CountDownLatch
CountDownLatch類位于java.util.concurrent包下,利用它可以實(shí)現(xiàn)類似計(jì)數(shù)器的功能担巩。
CountDownLatch結(jié)果為0, 阻塞變?yōu)檫\(yùn)行狀態(tài)方援。
比如有一個(gè)任務(wù)A,它要等待其他4個(gè)任務(wù)執(zhí)行完畢之后才能執(zhí)行涛癌,此時(shí)就可以利用CountDownLatch來實(shí)現(xiàn)這種功能了犯戏。
public class Test002 {
public static void main(String[] args) throws InterruptedException {
System.out.println("等待子線程執(zhí)行完畢...");
CountDownLatch countDownLatch = new CountDownLatch(2);
new Thread(new Runnable() {
@Override
public void run() {
System.out.println("子線程," + Thread.currentThread().getName() + "開始執(zhí)行...");
countDownLatch.countDown();// 每次減去1
System.out.println("子線程," + Thread.currentThread().getName() + "結(jié)束執(zhí)行...");
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
System.out.println("子線程," + Thread.currentThread().getName() + "開始執(zhí)行...");
countDownLatch.countDown();
System.out.println("子線程," + Thread.currentThread().getName() + "結(jié)束執(zhí)行...");
}
}).start();
countDownLatch.await();// 調(diào)用當(dāng)前方法主線程阻塞 countDown結(jié)果為0, 阻塞變?yōu)檫\(yùn)行狀態(tài)
System.out.println("兩個(gè)子線程執(zhí)行完畢....");
System.out.println("繼續(xù)主線程執(zhí)行..");
}
}
CyclicBarrier
CyclicBarrier初始化時(shí)規(guī)定一個(gè)數(shù)目,然后計(jì)算調(diào)用了CyclicBarrier.await()進(jìn)入等待的線程數(shù)拳话。當(dāng)線程數(shù)達(dá)到了這個(gè)數(shù)目時(shí)先匪,所有進(jìn)入等待狀態(tài)的線程被喚醒并繼續(xù)。
CyclicBarrier就象它名字的意思一樣弃衍,可看成是個(gè)障礙呀非, 所有的線程必須到齊后才能一起通過這個(gè)障礙。
CyclicBarrier初始時(shí)還可帶一個(gè)Runnable的參數(shù)镜盯, 此Runnable任務(wù)在CyclicBarrier的數(shù)目達(dá)到后岸裙,所有其它線程被喚醒前被執(zhí)行。
class Writer extends Thread {
private CyclicBarrier cyclicBarrier;
public Writer(CyclicBarrier cyclicBarrier){
this.cyclicBarrier=cyclicBarrier;
}
@Override
public void run() {
System.out.println("線程" + Thread.currentThread().getName() + ",正在寫入數(shù)據(jù)");
try {
Thread.sleep(3000);
} catch (Exception e) {
// TODO: handle exception
}
System.out.println("線程" + Thread.currentThread().getName() + ",寫入數(shù)據(jù)成功.....");
try {
cyclicBarrier.await();
} catch (Exception e) {
}
System.out.println("所有線程執(zhí)行完畢..........");
}
}
public class Test001 {
public static void main(String[] args) {
CyclicBarrier cyclicBarrier=new CyclicBarrier(5);
for (int i = 0; i < 5; i++) {
Writer writer = new Writer(cyclicBarrier);
writer.start();
}
}
}
Semaphore
Semaphore是一種基于計(jì)數(shù)的信號(hào)量速缆。它可以設(shè)定一個(gè)閾值哥桥,基于此,多個(gè)線程競爭獲取許可信號(hào)激涤,做自己的申請后歸還拟糕,超過閾值后,線程申請?jiān)S可信號(hào)將會(huì)被阻塞倦踢。Semaphore可以用來構(gòu)建一些對象池送滞,資源池之類的,比如數(shù)據(jù)庫連接池辱挥,我們也可以創(chuàng)建計(jì)數(shù)為1的Semaphore犁嗅,將其作為一種類似互斥鎖的機(jī)制,這也叫二元信號(hào)量晤碘,表示兩種互斥狀態(tài)褂微。它的用法如下:
wc.availablePermits(); //用來獲取當(dāng)前可用的資源數(shù)量
wc.acquire(); //申請資源
wc.release();// 釋放資源
-
代碼結(jié)構(gòu)
// 創(chuàng)建一個(gè)計(jì)數(shù)閾值為5的信號(hào)量對象 // 只能5個(gè)線程同時(shí)訪問 Semaphore semp = new Semaphore(5); try { // 申請?jiān)S可 semp.acquire(); try { // 業(yè)務(wù)邏輯 } catch (Exception e) { } finally { // 釋放許可 semp.release(); } } catch (InterruptedException e) { }
案例:
需求: 一個(gè)廁所只有3個(gè)坑位功蜓,但是有10個(gè)人來上廁所,那怎么辦宠蚂?假設(shè)10的人的編號(hào)分別為1-10式撼,并且1號(hào)先到廁所,10號(hào)最后到廁所求厕。那么1-3號(hào)來的時(shí)候必然有可用坑位著隆,順利如廁,4號(hào)來的時(shí)候需要看看前面3人是否有人出來了呀癣,如果有人出來美浦,進(jìn)去,否則等待项栏。同樣的道理浦辨,4-10號(hào)也需要等待正在上廁所的人出來后才能進(jìn)去,并且誰先進(jìn)去這得看等待的人是否有素質(zhì)沼沈,是否能遵守先來先上的規(guī)則流酬。
- 代碼:
class Parent implements Runnable {
private String name;
private Semaphore wc;
public Parent(String name,Semaphore wc){
this.name=name;
this.wc=wc;
}
@Override
public void run() {
try {
// 剩下的資源(剩下的茅坑)
int availablePermits = wc.availablePermits();
if (availablePermits > 0) {
System.out.println(name+"天主我也,我有茅坑了...");
} else {
System.out.println(name+"怎么沒有茅坑了...");
}
//申請茅坑 如果資源達(dá)到3次,就等待
wc.acquire();
System.out.println(name+"終于輪我上廁所了..爽啊");
Thread.sleep(new Random().nextInt(1000)); // 模擬上廁所時(shí)間庆冕。
System.out.println(name+"廁所上完了...");
} catch (Exception e) {
}finally{
wc.release();
}
}
}
public class TestSemaphore02 {
public static void main(String[] args) {
// 一個(gè)廁所只有3個(gè)坑位康吵,但是有10個(gè)人來上廁所,那怎么辦访递?假設(shè)10的人的編號(hào)分別為1-10晦嵌,并且1號(hào)先到廁所,10號(hào)最后到廁所拷姿。那么1-3號(hào)來的時(shí)候必然有可用坑位惭载,順利如廁,4號(hào)來的時(shí)候需要看看前面3人是否有人出來了响巢,如果有人出來描滔,進(jìn)去,否則等待踪古。同樣的道理含长,4-10號(hào)也需要等待正在上廁所的人出來后才能進(jìn)去,并且誰先進(jìn)去這得看等待的人是否有素質(zhì)伏穆,是否能遵守先來先上的規(guī)則拘泞。
Semaphore semaphore = new Semaphore(3);
for (int i = 1; i <=10; i++) {
Parent parent = new Parent("第"+i+"個(gè)人,",semaphore);
new Thread(parent).start();
}
}
}
并發(fā)隊(duì)列:
在并發(fā)隊(duì)列上JDK提供了兩套實(shí)現(xiàn):
一個(gè)是以ConcurrentLinkedQueue為代表的高性能隊(duì)列,
一個(gè)是以BlockingQueue接口為代表的阻塞隊(duì)列枕扫,
無論哪種都繼承自Queue陪腌。
ConcurrentLinkedDeque
ConcurrentLinkedQueue是一個(gè)適用于高并發(fā)場景下的隊(duì)列,通過無鎖的方式,實(shí)現(xiàn)了高并發(fā)狀態(tài)下的高性能诗鸭。通常ConcurrentLinkedQueue性能好于BlockingQueue染簇。它是一個(gè)基于鏈接節(jié)點(diǎn)的無界線程安全隊(duì)列,該隊(duì)列的元素遵循先進(jìn)先出的原則强岸。頭是最先加入的锻弓,尾是最近加入的,該隊(duì)列不允許null元素请唱。
ConcurrentLinkedQueue重要方法:
add() 和offer() 都是加入元素的方法(在ConcurrentLinkedQueue中這兩個(gè)方法沒有任何區(qū)別)
poll() 和peek() 都是取頭元素節(jié)點(diǎn)弥咪,區(qū)別在于前者會(huì)刪除元素(出隊(duì)列)过蹂,后者不會(huì)十绑。
- 代碼示例:
ConcurrentLinkedDeque q = new ConcurrentLinkedDeque();
//入隊(duì)列
q.offer("張三");
q.offer("李四");
//獲取總長度
System.out.println(q.size());
//從頭獲取元素,刪除該元素(出隊(duì)列)
System.out.println(q.poll());
//從頭獲取元素,不刪除該元素
System.out.println(q.peek());
//獲取總長度
System.out.println(q.size());
- 輸出:
2
張三
李四
1
BlockingQueue
阻塞隊(duì)列(BlockingQueue)是一個(gè)支持兩個(gè)附加操作的隊(duì)列。
這兩個(gè)附加的操作是:
在隊(duì)列為空時(shí)酷勺,獲取元素的線程會(huì)等待隊(duì)列變?yōu)榉强铡?br>
當(dāng)隊(duì)列滿時(shí)本橙,存儲(chǔ)元素的線程會(huì)等待隊(duì)列可用。
阻塞隊(duì)列常用于生產(chǎn)者和消費(fèi)者的場景脆诉,生產(chǎn)者是往隊(duì)列里添加元素的線程甚亭,消費(fèi)者是從隊(duì)列里拿元素的線程。阻塞隊(duì)列就是生產(chǎn)者存放元素的容器击胜,而消費(fèi)者也只從容器里拿元素亏狰。
BlockingQueue即阻塞隊(duì)列,從阻塞這個(gè)詞可以看出偶摔,在某些情況下對阻塞隊(duì)列的訪問可能會(huì)造成阻塞暇唾。
被阻塞的情況主要有如下兩種:
1)當(dāng)隊(duì)列滿了的時(shí)候進(jìn)行入隊(duì)列操作
2)當(dāng)隊(duì)列空了的時(shí)候進(jìn)行出隊(duì)列操作
因此,當(dāng)一個(gè)線程試圖對一個(gè)已經(jīng)滿了的隊(duì)列進(jìn)行入隊(duì)列操作時(shí)辰斋,它將會(huì)被阻塞策州,除非有另一個(gè)線程做了出隊(duì)列操作;同樣宫仗,當(dāng)一個(gè)線程試圖對一個(gè)空隊(duì)列進(jìn)行出隊(duì)列操作時(shí)够挂,它將會(huì)被阻塞,除非有另一個(gè)線程進(jìn)行了入隊(duì)列操作藕夫。
在Java中孽糖,BlockingQueue的接口位于java.util.concurrent 包中(在Java5版本開始提供),由上面介紹的阻塞隊(duì)列的特性可知毅贮,阻塞隊(duì)列是線程安全的办悟。
在新增的Concurrent包中,BlockingQueue很好的解決了多線程中嫩码,如何高效安全“傳輸”數(shù)據(jù)的問題誉尖。通過這些高效并且線程安全的隊(duì)列類,為我們快速搭建高質(zhì)量的多線程程序帶來極大的便利铸题。
常用的隊(duì)列主要有以下兩種:(當(dāng)然通過不同的實(shí)現(xiàn)方式铡恕,還可以延伸出很多不同類型的隊(duì)列琢感,DelayQueue就是其中的一種)
1)先進(jìn)先出(FIFO):先插入的隊(duì)列的元素也最先出隊(duì)列,類似于排隊(duì)的功能探熔。從某種程度上來說這種隊(duì)列也體現(xiàn)了一種公平性驹针。
2)后進(jìn)先出(LIFO):后插入隊(duì)列的元素最先出隊(duì)列,這種隊(duì)列優(yōu)先處理最近發(fā)生的事件诀艰。
多線程環(huán)境中柬甥,通過隊(duì)列可以很容易實(shí)現(xiàn)數(shù)據(jù)共享,比如經(jīng)典的“生產(chǎn)者”和“消費(fèi)者”模型中其垄,通過隊(duì)列可以很便利地實(shí)現(xiàn)兩者之間的數(shù)據(jù)共享苛蒲。假設(shè)我們有若干生產(chǎn)者線程,另外又有若干個(gè)消費(fèi)者線程绿满。如果生產(chǎn)者線程需要把準(zhǔn)備好的數(shù)據(jù)共享給消費(fèi)者線程臂外,利用隊(duì)列的方式來傳遞數(shù)據(jù),就可以很方便地解決他們之間的數(shù)據(jù)共享問題喇颁。但如果生產(chǎn)者和消費(fèi)者在某個(gè)時(shí)間段內(nèi)漏健,萬一發(fā)生數(shù)據(jù)處理速度不匹配的情況呢?理想情況下橘霎,如果生產(chǎn)者產(chǎn)出數(shù)據(jù)的速度大于消費(fèi)者消費(fèi)的速度蔫浆,并且當(dāng)生產(chǎn)出來的數(shù)據(jù)累積到一定程度的時(shí)候,那么生產(chǎn)者必須暫停等待一下(阻塞生產(chǎn)者線程)姐叁,以便等待消費(fèi)者線程把累積的數(shù)據(jù)處理完畢瓦盛,反之亦然。然而七蜘,在concurrent包發(fā)布以前谭溉,在多線程環(huán)境下,我們每個(gè)程序員都必須去自己控制這些細(xì)節(jié)橡卤,尤其還要兼顧效率和線程安全扮念,而這會(huì)給我們的程序帶來不小的復(fù)雜度。好在此時(shí)碧库,強(qiáng)大的concurrent包橫空出世了柜与,而它也給我們帶來了強(qiáng)大的BlockingQueue。(在多線程領(lǐng)域:所謂阻塞嵌灰,在某些情況下會(huì)掛起線程(即阻塞)弄匕,一旦條件滿足,被掛起的線程又會(huì)自動(dòng)被喚醒)
ArrayBlockingQueue
ArrayBlockingQueue是一個(gè)有邊界的阻塞隊(duì)列沽瞭,它的內(nèi)部實(shí)現(xiàn)是一個(gè)數(shù)組坛梁。有邊界的意思是它的容量是有限的垫挨,我們必須在其初始化的時(shí)候指定它的容量大小炸客,容量大小一旦指定就不可改變玛瘸。
ArrayBlockingQueue是以先進(jìn)先出的方式存儲(chǔ)數(shù)據(jù)公般,最新插入的對象是尾部,最新移出的對象是頭部。
下面是一個(gè)初始化和使用ArrayBlockingQueue的例子:
ArrayBlockingQueue<String> arrays = new ArrayBlockingQueue<String>(3);
arrays.add("李四");
arrays.add("張軍");
arrays.add("張軍");
// 添加阻塞隊(duì)列
arrays.offer("張三", 1, TimeUnit.SECONDS);
LinkedBlockingQueue
LinkedBlockingQueue阻塞隊(duì)列大小的配置是可選的,如果我們初始化時(shí)指定一個(gè)大小枝缔,它就是有邊界的,如果不指定蚊惯,它就是無邊界的愿卸。說是無邊界,其實(shí)是采用了默認(rèn)大小為Integer.MAX_VALUE的容量 截型。它的內(nèi)部實(shí)現(xiàn)是一個(gè)鏈表趴荸。
和ArrayBlockingQueue一樣,LinkedBlockingQueue 也是以先進(jìn)先出的方式存儲(chǔ)數(shù)據(jù)菠劝,最新插入的對象是尾部赊舶,最新移出的對象是頭部睁搭。
下面是一個(gè)初始化和使LinkedBlockingQueue的例子:
LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue(3);
linkedBlockingQueue.add("張三");
linkedBlockingQueue.add("李四");
linkedBlockingQueue.add("李四");
System.out.println(linkedBlockingQueue.size());
PriorityBlockingQueue
PriorityBlockingQueue是一個(gè)沒有邊界的隊(duì)列赶诊,它的排序規(guī)則和java.util.PriorityQueue一樣。需要注意园骆,PriorityBlockingQueue中允許插入null對象舔痪。所有插入PriorityBlockingQueue的對象必須實(shí)現(xiàn) java.lang.Comparable接口,隊(duì)列優(yōu)先級(jí)的排序規(guī)則就是按照我們對這個(gè)接口的實(shí)現(xiàn)來定義的锌唾。另外锄码,我們可以從PriorityBlockingQueue獲得一個(gè)迭代器Iterator,但這個(gè)迭代器并不保證按照優(yōu)先級(jí)順序進(jìn)行迭代晌涕。
SynchronousQueue
SynchronousQueue隊(duì)列內(nèi)部僅允許容納一個(gè)元素滋捶。當(dāng)一個(gè)線程插入一個(gè)元素后會(huì)被阻塞,除非這個(gè)元素被另一個(gè)線程消費(fèi)余黎。
使用BlockingQueue模擬生產(chǎn)者與消費(fèi)者
class Producer extends Thread{
private BlockingQueue queue;
private volatile boolean flag=true;
private static AtomicInteger count=new AtomicInteger();
public Producer(BlockingQueue queue){
this.queue=queue;
}
@Override
public void run() {
System.out.println(getName()+"生產(chǎn)者線程啟動(dòng)...");
try {
while (flag){
System.out.println(getName()+"生產(chǎn)者開始生產(chǎn)消息...");
//如果flag為true重窟,queue就入隊(duì)列。(原子類進(jìn)行計(jì)數(shù))
Integer i = count.incrementAndGet();
boolean offer = queue.offer(i);
if(offer){
System.out.println(getName()+"生產(chǎn)者生產(chǎn)生產(chǎn)消息:"+i+"成功");
}else {
System.out.println(getName()+"生產(chǎn)者生產(chǎn)生產(chǎn)消息:"+i+"失敗");
}
Thread.sleep(1000);
}
}catch (Exception e){
}finally {
System.out.println(getName()+"生產(chǎn)者線程停止...");
}
}
public void stopThread(){
this.flag=false;
}
}
class Consumer extends Thread{
private BlockingQueue queue;
private volatile boolean flag=true;
public Consumer(BlockingQueue queue){
this.queue=queue;
}
@Override
public void run() {
System.out.println(getName()+"消費(fèi)者線程啟動(dòng)...");
try {
while (flag){
System.out.println(getName()+"消費(fèi)者開始消費(fèi)消息...");
//如果flag為true惧财,queue就出隊(duì)列
Integer poll = (Integer) queue.poll(2, TimeUnit.SECONDS);
if(poll != null){
System.out.println(getName()+"消費(fèi)者獲取消息:"+poll+"成功");
}else {
System.out.println(getName()+"消費(fèi)者獲取消息:"+poll+"失敗");
this.flag=false;
}
}
}catch (Exception e){
}finally {
System.out.println(getName()+"消費(fèi)者線程停止...");
}
}
}
public class ProduceConsumerThread {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(10);
Producer p1 =new Producer(queue);
Producer p2 =new Producer(queue);
Consumer c1 =new Consumer(queue);
p1.start();
p2.start();
c1.start();
Thread.sleep(3*1000);
p1.stopThread();
p2.stopThread();
}
}