我們已經(jīng)看到了形成Java并發(fā)程序設(shè)計(jì)基礎(chǔ)的底層構(gòu)建塊作媚。然而穷躁,對(duì)于實(shí)際編程來(lái)說(shuō)祠挫,應(yīng)該盡可能遠(yuǎn)離底層結(jié)構(gòu)。使用由并發(fā)處理的專(zhuān)業(yè)人士實(shí)現(xiàn)的較高層次的結(jié)構(gòu)要方便得多酝掩、安全得多。
對(duì)于許多線程問(wèn)題眷柔,可以通過(guò)使用一個(gè)或多個(gè)隊(duì)列以優(yōu)雅且安全的方式將其形式化期虾。生產(chǎn)者線程向隊(duì)列插入元素,消費(fèi)者線程則取出他媽闯割。使用隊(duì)列彻消,可以安全地從一個(gè)線程向另一個(gè)線程傳遞數(shù)據(jù)。例如宙拉,考慮銀行轉(zhuǎn)賬程序宾尚,轉(zhuǎn)賬線程將轉(zhuǎn)賬指令對(duì)象插入到一個(gè)隊(duì)列中,而不是直接訪問(wèn)銀行對(duì)象谢澈。另一個(gè)線程從隊(duì)列中取出指令轉(zhuǎn)賬煌贴。只有該線程可以訪問(wèn)該銀行對(duì)象的內(nèi)部。因此不需要同步锥忿。(當(dāng)然牛郑,線程安全的隊(duì)列類(lèi)的實(shí)現(xiàn)者不能不考慮鎖和條件,但是敬鬓,那是他們的問(wèn)題而不是你的問(wèn)題淹朋。)
一笙各、阻塞隊(duì)列方法
方法 | 正常動(dòng)作 | 特殊情況下的動(dòng)作 |
---|---|---|
put | 添加一個(gè)元素 | 如果隊(duì)列滿,則阻塞 |
take | 移出并返回頭元素 | 如果隊(duì)列空础芍,則阻塞 |
add | 添加一個(gè)元素 | 如果隊(duì)列滿杈抢,拋出IllegalStateException |
remove | 移出并返回頭元素 | 如果隊(duì)列空,則拋出NoSuchElementException |
element | 返回隊(duì)列頭元素 | 如果隊(duì)列空仑性,拋出NoSuchElementException |
offer | 添加一個(gè)元素并返回true | 如果隊(duì)列滿惶楼,返回false |
poll | 移出并返回隊(duì)列的頭元素 | 如果隊(duì)列空,返回null |
peek | 返回隊(duì)列的頭元素 | 如果隊(duì)列空诊杆,返回null |
阻塞隊(duì)列方法使用上分下面三類(lèi):
1 當(dāng)將隊(duì)列當(dāng)做線程管理工具來(lái)使用時(shí)歼捐,將要用到put和take方法。
2 當(dāng)視圖向滿的隊(duì)列中添加或從空的隊(duì)列中移出元素時(shí)晨汹,add豹储、remove和element拋出異常。
3 當(dāng)然淘这,在一個(gè)多線程程序中颂翼,隊(duì)列會(huì)在任何時(shí)空或滿,因此慨灭,可以用offer、poll和peek代替球及。
二氧骤、java.util.concurrent包
java.util.concurrent包提供了阻塞隊(duì)列的幾個(gè)變種:
LinkedBlockingQueue:鏈表結(jié)構(gòu)組成的無(wú)界(可指定容量)阻塞隊(duì)列
ArrayBlockingQueue:數(shù)組結(jié)構(gòu)組成的有界阻塞隊(duì)列
PriorityBlockingQueue:無(wú)界優(yōu)先級(jí)隊(duì)列,非FIFO隊(duì)列吃引,元素按照優(yōu)先級(jí)順序被移出
DelayQueue:無(wú)界阻塞隊(duì)列筹陵,只有延遲期滿時(shí)才能提取元素
SynchronousQueue:阻塞隊(duì)列,插入操作必須等待另一線程的移除操作 镊尺,反之亦然
LinkedBlockingDeque:鏈表結(jié)構(gòu)組成的無(wú)界(可指定容量)雙端阻塞隊(duì)列
絕大部分場(chǎng)景下朦佩,我們只要使用ArrayBlockingQueue或LinkedBlockingQueue就夠了。
示例程序展示了如何使用阻塞隊(duì)列來(lái)控制一組線程庐氮。程序在一個(gè)目錄及它的所有子目錄下搜索所有文件语稠,打印出包含指定關(guān)鍵字的行:
public class BlockingQueueTest {
private static final int FILE_QUEUE_SIZE = 10;
private static final int SEARCH_THREADS = 100;
private static final File DUMMY = new File("");
private static BlockingQueue<File> queue = new ArrayBlockingQueue<>(FILE_QUEUE_SIZE);
public static void main(String[] args) {
try (Scanner in = new Scanner(System.in)) {
System.out.println("Enter base directory (e.g. /opt/jdk1.8.0/src): ");
String directory = in.nextLine();
System.out.println("Enter keyword (e.g. volatile): ");
String keyword = in.nextLine();
Runnable enumerator = () -> {
try {
enumerate(new File(directory));
queue.put(DUMMY);
} catch (InterruptedException e) {
}
};
new Thread(enumerator).start();
for(int i = 1; i <= SEARCH_THREADS; i++) {
Runnable searcher = () -> {
try {
boolean done = false;
while (!done) {
File file = queue.take();
if(file == DUMMY) {
queue.put(file);
done = true;
} else {
search(file, keyword);
}
}
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
}
};
new Thread(searcher).start();
}
}
}
// 遞歸地枚舉給定目錄及其子目錄中的所有文件
public static void enumerate(File directory) throws InterruptedException {
File[] files = directory.listFiles();
for (File file : files) {
if (file.isDirectory()) enumerate(file);
else queue.put(file);
}
}
// 搜索一個(gè)給定關(guān)鍵字的文件,并打印出所有匹配的行
public static void search(File file, String keyword) throws IOException {
try (Scanner in = new Scanner(file, "UTF-8")) {
int lineNumber = 0;
while (in.hasNextLine()) {
lineNumber++;
String line = in.nextLine();
if (line.contains(keyword))
System.out.printf("%s:%d:%n", file.getPath(), lineNumber, line);
}
}
}
}
生產(chǎn)者線程枚舉所有子目錄下的所有文件并把它們放到一個(gè)阻塞隊(duì)列中弄砍。同時(shí)啟動(dòng)大量消費(fèi)者線程仙畦,每個(gè)消費(fèi)者線程從隊(duì)列取出一個(gè)文件,打開(kāi)它音婶,打印所有包含該關(guān)鍵字的行慨畸,然后取出下一個(gè)文件。我們使用一個(gè)小技巧在工作結(jié)束后終止這個(gè)應(yīng)用程序衣式。為了發(fā)出完成信號(hào)寸士,枚舉線程放置一個(gè)虛擬對(duì)象到隊(duì)列中(這就像在行李輸送帶上放著一個(gè)寫(xiě)著“最后一個(gè)包”的虛擬包)檐什。當(dāng)搜索線程取到這個(gè)虛擬對(duì)象時(shí),將其放回并終止弱卡。
注意乃正,不需要顯示的線程同步。在這個(gè)示例中谐宙,已使用了阻塞隊(duì)列作為一種同步機(jī)制烫葬。