并發(fā)包
同步容器類
Vector與ArrayList區(qū)別
- ArrayList是最常用的List實(shí)現(xiàn)類芍瑞,內(nèi)部是通過數(shù)組實(shí)現(xiàn)的锭魔,它允許對元素進(jìn)行快速隨機(jī)訪問骤视。數(shù)組的缺點(diǎn)是每個(gè)元素之間不能有間隔弛作,當(dāng)數(shù)組大小不滿足時(shí)需要增加存儲能力举娩,就要講已經(jīng)有數(shù)組的數(shù)據(jù)復(fù)制到新的存儲空間中锐墙。當(dāng)從ArrayList的中間位置插入或者刪除元素時(shí)礁哄,需要對數(shù)組進(jìn)行復(fù)制、移動贮匕、代價(jià)比較高姐仅。因此,它適合隨機(jī)查找和遍歷,不適合插入和刪除掏膏。
- Vector與ArrayList一樣劳翰,也是通過數(shù)組實(shí)現(xiàn)的,不同的是它支持線程的同步馒疹,即某一時(shí)刻只有一個(gè)線程能夠?qū)慥ector佳簸,避免多線程同時(shí)寫而引起的不一致性,但實(shí)現(xiàn)同步需要很高的花費(fèi)颖变,因此生均,訪問它比訪問ArrayList慢
注意: Vector線程安全、ArrayList線程不安全
Vector的add方法
ArrayList的add方法
Vector的get方法
ArrayList的get方法
HashTable與HashMap
- HashMap不是線程安全的
HastMap是一個(gè)接口是map接口的子接口腥刹,是將鍵映射到值的對象马胧,其中鍵和值都是對象,并且不能包含重復(fù)鍵衔峰,但可以包含重復(fù)值佩脊。HashMap允許null key和null value,而Hashtable不允許垫卤。 - HashTable是線程安全的一個(gè)Collection威彰。
- HashMap是Hashtable的輕量級實(shí)現(xiàn)(非線程安全的實(shí)現(xiàn)),他們都完成了Map接口穴肘,主要區(qū)別在于HashMap允許空)null)鍵值(key),由于非線程安全歇盼,效率上能高于Hashtable。
- HashMap允許將null作為一個(gè)entry的key或者value评抚,而Hashtable不允許豹缀。
HashMap把Hashtable的contains方法去掉了,改成containsvalue和containsKey盈咳。
注意: HashTable線程安全耿眉,HashMap線程不安全。
Hashtable存在contains方法
HashMap不存在contains方法
Hashtable的get方法
HashMap的get方法
Collections提供了將線程不安全的集合轉(zhuǎn)換成線程安全的集合的方法
將HashMap轉(zhuǎn)換成同步的HashMap
ConcurrentHashMap(JDK1.7)
- ConcurrentMap接口下有倆個(gè)重要的實(shí)現(xiàn) :
ConcurrentHashMap
ConcurrentskipListMap (支持并發(fā)排序功能鱼响。彌補(bǔ)ConcurrentHashMap) - ConcurrentHashMap內(nèi)部使用段(Segment)來表示這些不同的部分鸣剪,每個(gè)段其實(shí)就是一個(gè)小的HashTable,它們有自己的鎖。只要多個(gè)修改操作發(fā)生在不同的段上丈积,它們就可以并發(fā)進(jìn)行筐骇。把一個(gè)整體分成了16個(gè)段(Segment.也就是最高支持16個(gè)線程的并發(fā)修改操作。
- 這也是在重線程場景時(shí)減小鎖的粒度從而降低鎖競爭的一種方案江滨。并且代碼中大多共享變量使用volatile關(guān)鍵字聲明铛纬,目的是第一時(shí)間獲取修改的內(nèi)容,性能非常好唬滑。
分段鎖怎么分段告唆。
將一個(gè)整體拆分成多個(gè)小的HasTable,默認(rèn)分成16段棺弊。
CountDownLatch
CountDownLatch類位于java.util.concurrent包下,利用它可以實(shí)現(xiàn)類似計(jì)數(shù)器的功能擒悬。比如有一個(gè)任務(wù)A模她,它要等待其他4個(gè)任務(wù)執(zhí)行完畢之后才能執(zhí)行,此時(shí)就可以利用CountDownLatch來實(shí)現(xiàn)這種功能了懂牧。
import java.util.concurrent.CountDownLatch;
public class CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
System.out.println("等待子線程執(zhí)行完畢...");
CountDownLatch countDownLatch = new CountDownLatch(2);
new Thread(() -> {
System.out.println("子線程," + Thread.currentThread().getName() + "開始執(zhí)行...");
countDownLatch.countDown();// 每次減去1
System.out.println("子線程," + Thread.currentThread().getName() + "結(jié)束執(zhí)行...");
}).start();
new Thread(() -> {
System.out.println("子線程," + Thread.currentThread().getName() + "開始執(zhí)行...");
countDownLatch.countDown();
System.out.println("子線程," + Thread.currentThread().getName() + "結(jié)束執(zhí)行...");
}).start();
countDownLatch.await();// 調(diào)用當(dāng)前方法主線程阻塞 countDown結(jié)果為0, 阻塞變?yōu)檫\(yùn)行狀態(tài)
System.out.println("兩個(gè)子線程執(zhí)行完畢....");
System.out.println("繼續(xù)主線程執(zhí)行..");
}
}
運(yùn)行結(jié)果
CyclicBarrier
- CyclicBarrier初始化時(shí)規(guī)定一個(gè)數(shù)目侈净,然后計(jì)算調(diào)用了
CyclicBarrier.await()
進(jìn)入等待的線程數(shù)。當(dāng)線程數(shù)達(dá)到了這個(gè)數(shù)目時(shí)僧凤,所有進(jìn)入等待狀態(tài)的線程被喚醒并繼續(xù)畜侦。 - CyclicBarrier就象它名字的意思一樣,可看成是個(gè)障礙躯保, 所有的線程必須到齊后才能一起通過這個(gè)障礙旋膳。
- CyclicBarrier初始時(shí)還可帶一個(gè)Runnable的參數(shù), 此Runnable任務(wù)在CyclicBarrier的數(shù)目達(dá)到后途事,所有其它線程被喚醒前被執(zhí)行溺忧。
需求:5個(gè)線程寫入數(shù)據(jù)完畢后才打印所有線程執(zhí)行完畢
import java.util.concurrent.CyclicBarrier;
class Writer extends Thread {
private CyclicBarrier cyclicBarrier;
public Writer(CyclicBarrier cyclicBarrier) {
this.cyclicBarrier = cyclicBarrier;
}
@Override
public void run() {
System.out.println("線程" + Thread.currentThread().getName() + ",正在寫入數(shù)據(jù)");
try {
Thread.sleep(2000);
} catch (Exception e) {
}
System.out.println("線程" + Thread.currentThread().getName() + ",寫入數(shù)據(jù)成功.....");
try {
cyclicBarrier.await();
} catch (Exception e) {
}
System.out.println("所有線程執(zhí)行完畢..........");
}
}
public class CyclicBarrierDemo {
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(5);
for (int i = 0; i < 5; i++) {
Writer writer = new Writer(cyclicBarrier);
writer.start();
}
}
}
運(yùn)行結(jié)果
Semaphore
Semaphore是一種基于計(jì)數(shù)的信號量。它可以設(shè)定一個(gè)閾值盯孙,基于此閾值多個(gè)線程競爭獲取許可信號,做自己的申請后歸還祟滴,超過閾值后振惰,線程申請?jiān)S可信號將會被阻塞。
Semaphore可以用來構(gòu)建一些對象池垄懂,資源池之類的骑晶,比如數(shù)據(jù)庫連接池,我們也可以創(chuàng)建計(jì)數(shù)為1的Semaphore草慧,將其作為一種類似互斥鎖的機(jī)制桶蛔,這也叫二元信號量,表示兩種互斥狀態(tài)漫谷。
- 用法如下:
availablePermits函數(shù)用來獲取當(dāng)前可用的資源數(shù)量
wc.acquire(); //申請資源
wc.release();// 釋放資源
需求
一個(gè)廁所只有3個(gè)坑位仔雷,但是有10個(gè)人來上廁所,那怎么辦舔示?
假設(shè)10的人的編號分別為1-10碟婆,并且1號先到廁所,10號最后到廁所惕稻。那么1-3號來的時(shí)候必然有可用坑位竖共,順利如廁,4號來的時(shí)候需要看看前面3人是否有人出來了俺祠,如果有人出來公给,進(jìn)去借帘,否則等待。同樣的道理淌铐,4-10號也遵守先來先上的規(guī)則肺然。
import java.util.Random;
import java.util.concurrent.Semaphore;
class Parent implements Runnable {
private String name;
private Semaphore wc;
public Parent(String name, Semaphore wc) {
this.name = name;
this.wc = wc;
}
@Override
public void run() {
try {
// 剩下的資源(剩下的茅坑)
int availablePermits = wc.availablePermits();
if (availablePermits > 0) {
System.out.println(name + "有坑了...");
} else {
System.out.println(name + "怎么沒有坑了...");
}
//申請茅坑 如果資源達(dá)到3次,就等待
wc.acquire();
System.out.println(name + "終于輪我上廁所了..");
Thread.sleep(new Random().nextInt(1000)); // 模擬上廁所時(shí)間匣沼。
System.out.println(name + "廁所上完了...");
wc.release();
} catch (Exception e) {
}
}
}
public class SemaphoreDemo2 {
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(3);
for (int i = 1; i <= 10; i++) {
Parent parent = new Parent("第" + i + "個(gè)人,", semaphore);
new Thread(parent).start();
}
}
}
并發(fā)隊(duì)列
在并發(fā)隊(duì)列上JDK提供了兩套實(shí)現(xiàn)加叁,一個(gè)是以ConcurrentLinkedQueue為代表的高性能隊(duì)列,一個(gè)是以BlockingQueue接口為代表的阻塞隊(duì)列唇撬,無論哪種都繼承自Queue它匕。
ConcurrentLinkedDeque
ConcurrentLinkedQueue : 是一個(gè)適用于高并發(fā)場景下的隊(duì)列,通過無鎖的方式窖认,實(shí)現(xiàn)
了高并發(fā)狀態(tài)下的高性能豫柬,通常ConcurrentLinkedQueue性能好于BlockingQueue.它
是一個(gè)基于鏈接節(jié)點(diǎn)的無界線程安全隊(duì)列。該隊(duì)列的元素遵循先進(jìn)先出的原則扑浸。頭是最先加入的烧给,尾是最近加入的,該隊(duì)列不允許null元素喝噪。
ConcurrentLinkedQueue重要方法:
add()
和offer()
都是加入元素的方法(在ConcurrentLinkedQueue中這倆個(gè)方法沒有任何區(qū)別)poll()
和peek()
都是取頭元素節(jié)點(diǎn)直撤,區(qū)別在于前者會刪除元素骇陈,后者不會。
import java.util.concurrent.ConcurrentLinkedDeque;
public class ConcurrentLinkedDequeDemo {
public static void main(String[] args) {
ConcurrentLinkedDeque q = new ConcurrentLinkedDeque();
q.offer("zhangsan");
q.offer("lisi");
q.offer("wangwu");
q.offer("123");
q.offer("456");
//從頭獲取元素,刪除該元素
System.out.println(q.poll());
//從頭獲取元素,不刪除該元素
System.out.println(q.peek());
//獲取總長度
System.out.println(q.size());
}
}
BlockingQueue
- 阻塞隊(duì)列(BlockingQueue)是一個(gè)支持兩個(gè)附加操作的隊(duì)列。這兩個(gè)附加的操作是:
在隊(duì)列空時(shí)零聚,獲取元素的線程會等待隊(duì)列變?yōu)榉强铡?br> 當(dāng)隊(duì)列滿時(shí)卦洽,存儲元素的線程會等待隊(duì)列變?yōu)榭捎谩?/li> - 阻塞隊(duì)列常用于生產(chǎn)者和消費(fèi)者的場景嗅战,生產(chǎn)者是往隊(duì)列里添加元素的線程杨何,消費(fèi)者是從隊(duì)列里拿元素的線程。阻塞隊(duì)列就是生產(chǎn)者存放元素的容器哩陕,而消費(fèi)者也只從容器里拿元素平项。
BlockingQueue詳解
BlockingQueue即阻塞隊(duì)列,從阻塞這個(gè)詞可以看出悍及,在某些情況下對阻塞隊(duì)列的訪問可能會造成阻塞葵礼。被阻塞的情況主要有如下兩種:
- 當(dāng)隊(duì)列滿了的時(shí)候進(jìn)行入隊(duì)列操作
- 當(dāng)隊(duì)列空了的時(shí)候進(jìn)行出隊(duì)列操作
因此,當(dāng)一個(gè)線程試圖對一個(gè)已經(jīng)滿了的隊(duì)列進(jìn)行入隊(duì)列操作時(shí)并鸵,它將會被阻塞鸳粉,除非有另一個(gè)線程做了出隊(duì)列操作;同樣园担,當(dāng)一個(gè)線程試圖對一個(gè)空隊(duì)列進(jìn)行出隊(duì)列操作時(shí)届谈,它將會被阻塞枯夜,除非有另一個(gè)線程進(jìn)行了入隊(duì)列操作。
在Java中艰山,BlockingQueue的接口位于java.util.concurrent 包中(在Java5版本開始提供)湖雹,由上面介紹的阻塞隊(duì)列的特性可知,阻塞隊(duì)列是線程安全的曙搬。
在新增的Concurrent包中摔吏,BlockingQueue很好的解決了多線程中,如何高效安全“傳輸”數(shù)據(jù)的問題纵装。通過這些高效并且線程安全的隊(duì)列類征讲,為我們快速搭建高質(zhì)量的多線程程序帶來極大的便利。本文詳細(xì)介紹了BlockingQueue家庭中的所有成員橡娄,包括他們各自的功能以及常見使用場景诗箍。
常用的隊(duì)列主要有以下兩種:(當(dāng)然通過不同的實(shí)現(xiàn)方式,還可以延伸出很多不同類型的隊(duì)列挽唉,DelayQueue就是其中的一種)
- 先進(jìn)先出(FIFO):先插入的隊(duì)列的元素也最先出隊(duì)列滤祖,類似于排隊(duì)的功能。從某種程度上來說這種隊(duì)列也體現(xiàn)了一種公平性瓶籽。
- 后進(jìn)先出(LIFO):后插入隊(duì)列的元素最先出隊(duì)列匠童,這種隊(duì)列優(yōu)先處理最近發(fā)生的事件。
多線程環(huán)境中塑顺,通過隊(duì)列可以很容易實(shí)現(xiàn)數(shù)據(jù)共享俏让,比如經(jīng)典的“生產(chǎn)者”和“消費(fèi)者”模型中,通過隊(duì)列可以很便利地實(shí)現(xiàn)兩者之間的數(shù)據(jù)共享茬暇。假設(shè)我們有若干生產(chǎn)者線程,另外又有若干個(gè)消費(fèi)者線程寡喝。如果生產(chǎn)者線程需要把準(zhǔn)備好的數(shù)據(jù)共享給消費(fèi)者線程糙俗,利用隊(duì)列的方式來傳遞數(shù)據(jù),就可以很方便地解決他們之間的數(shù)據(jù)共享問題预鬓。但如果生產(chǎn)者和消費(fèi)者在某個(gè)時(shí)間段內(nèi)巧骚,萬一發(fā)生數(shù)據(jù)處理速度不匹配的情況呢?
理想情況下格二,如果生產(chǎn)者產(chǎn)出數(shù)據(jù)的速度大于消費(fèi)者消費(fèi)的速度劈彪,并且當(dāng)生產(chǎn)出來的數(shù)據(jù)累積到一定程度的時(shí)候,那么生產(chǎn)者必須暫停等待一下(阻塞生產(chǎn)者線程)顶猜,以便等待消費(fèi)者線程把累積的數(shù)據(jù)處理完畢沧奴,反之亦然。然而长窄,在concurrent包發(fā)布以前滔吠,在多線程環(huán)境下纲菌,我們每個(gè)程序員都必須去自己控制這些細(xì)節(jié),尤其還要兼顧效率和線程安全疮绷,而這會給我們的程序帶來不小的復(fù)雜度翰舌。好在此時(shí),強(qiáng)大的concurrent包橫空出世了冬骚,而他也給我們帶來了強(qiáng)大的BlockingQueue椅贱。(在多線程領(lǐng)域:所謂阻塞,在某些情況下會掛起線程(即阻塞)只冻,一旦條件滿足庇麦,被掛起的線程又會自動被喚醒)
ArrayBlockingQueue
ArrayBlockingQueue是一個(gè)有邊界的阻塞隊(duì)列,它的內(nèi)部實(shí)現(xiàn)是一個(gè)數(shù)組属愤。有邊界的意思是它的容量是有限的女器,我們必須在其初始化的時(shí)候指定它的容量大小,容量大小一旦指定就不可改變住诸。
ArrayBlockingQueue是以先進(jìn)先出的方式存儲數(shù)據(jù)驾胆,最新插入的對象是尾部,最新移出的對象是頭部贱呐。
例子:
import java.util.concurrent.TimeUnit;
public class BlockingQueueDemo {
public static void main(String[] args) throws InterruptedException {
ArrayBlockingQueue<String> arrays = new ArrayBlockingQueue<String>(3);
arrays.add("李四");
arrays.add("張軍");
arrays.add("張軍");
// 添加阻塞隊(duì)列
boolean offer = arrays.offer("張三", 1, TimeUnit.SECONDS);
System.out.println(offer);
System.out.println(arrays.size());
}
}
運(yùn)行結(jié)果
LinkedBlockingQueue
LinkedBlockingQueue阻塞隊(duì)列大小的配置是可選的丧诺,如果我們初始化時(shí)指定一個(gè)大小,它就是有邊界的奄薇,如果不指定驳阎,它就是無邊界的。說是無邊界馁蒂,其實(shí)是采用了默認(rèn)大小為Integer.MAX_VALUE的容量 呵晚。它的內(nèi)部實(shí)現(xiàn)是一個(gè)鏈表。
和ArrayBlockingQueue一樣沫屡,LinkedBlockingQueue 也是以先進(jìn)先出的方式存儲數(shù)據(jù)饵隙,最新插入的對象是尾部,最新移出的對象是頭部沮脖。
例子:
import java.util.concurrent.LinkedBlockingQueue;
public class BlockingQueueDemo2 {
public static void main(String[] args) throws InterruptedException {
LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue(3);
linkedBlockingQueue.add("張三");
linkedBlockingQueue.add("李四");
linkedBlockingQueue.add("李四");
System.out.println(linkedBlockingQueue.size());
}
}
運(yùn)行結(jié)果
3
PriorityBlockingQueue
- PriorityBlockingQueue是一個(gè)沒有邊界的隊(duì)列金矛,它的排序規(guī)則
java.util.PriorityQueue
一樣。需要注意勺届,PriorityBlockingQueue中允許插入null
對象驶俊。 - 所有插入PriorityBlockingQueue的對象必須實(shí)現(xiàn)
java.lang.Comparable
接口,隊(duì)列優(yōu)先級的排序規(guī)則就是按照我們對這個(gè)接口的實(shí)現(xiàn)來定義的免姿。 - 另外饼酿,我們可以從PriorityBlockingQueue獲得一個(gè)迭代器
Iterator
,但這個(gè)迭代器并不保證按照優(yōu)先級順序進(jìn)行迭代胚膊。
SynchronousQueue
SynchronousQueue隊(duì)列內(nèi)部僅允許容納一個(gè)元素嗜湃。當(dāng)一個(gè)線程插入一個(gè)元素后會被阻塞奈应,除非這個(gè)元素被另一個(gè)線程消費(fèi)。
使用BlockingQueue模擬生產(chǎn)者與消費(fèi)者
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
class ProducerThread implements Runnable {
private BlockingQueue queue;
private volatile boolean flag = true;
private static AtomicInteger count = new AtomicInteger();
public ProducerThread(BlockingQueue queue) {
this.queue = queue;
}
@Override
public void run() {
try {
System.out.println("生產(chǎn)線程啟動...");
while (flag) {
System.out.println("正在生產(chǎn)數(shù)據(jù)....");
String data = count.incrementAndGet() + "";
// 將數(shù)據(jù)存入隊(duì)列中
boolean offer = queue.offer(data, 2, TimeUnit.SECONDS);
if (offer) {
System.out.println("生產(chǎn)者,存入" + data + "到隊(duì)列中,成功.");
} else {
System.out.println("生產(chǎn)者,存入" + data + "到隊(duì)列中,失敗.");
}
Thread.sleep(1000);
}
} catch (Exception e) {
} finally {
System.out.println("生產(chǎn)者退出線程");
}
}
public void stop() {
this.flag = false;
}
}
class ConsumerThread implements Runnable {
private BlockingQueue<String> queue;
private volatile boolean flag = true;
public ConsumerThread(BlockingQueue<String> queue) {
this.queue = queue;
}
@Override
public void run() {
System.out.println("消費(fèi)線程啟動...");
try {
while (flag) {
System.out.println("消費(fèi)者,正在從隊(duì)列中獲取數(shù)據(jù)..");
String data = queue.poll(2, TimeUnit.SECONDS);
if (data != null) {
System.out.println("消費(fèi)者,拿到隊(duì)列中的數(shù)據(jù)data:" + data);
Thread.sleep(1000);
} else {
System.out.println("消費(fèi)者,超過2秒未獲取到數(shù)據(jù)..");
flag = false;
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
System.out.println("消費(fèi)者退出線程...");
}
}
}
public class ProducerAndComsumer {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<String> queue = new LinkedBlockingQueue<>(10);
ProducerThread producerThread1 = new ProducerThread(queue);
ConsumerThread consumerThread1 = new ConsumerThread(queue);
Thread t1 = new Thread(producerThread1);
Thread c1 = new Thread(consumerThread1);
t1.start();
c1.start();
// 執(zhí)行10s
Thread.sleep(10 * 1000);
producerThread1.stop();
}
}
運(yùn)行結(jié)果
BlockingQueue
的阻塞是讓生產(chǎn)消費(fèi)有序的關(guān)鍵