1. 概念和特性
很多線程問題可以使用一個或多個隊列優(yōu)雅而安全地解決惭婿。
比如說不恭,生產(chǎn)者線程向隊列插入元素,消費者線程負責獲取元素财饥。
利用這種方式换吧,我們可以安全地從一個線程向另一個線程傳遞參數(shù)。
阻塞隊列(BlockingQueue)是協(xié)調(diào)多個線程之間合作的有用工具钥星。
當試圖向阻塞隊列添加元素而隊列已滿沾瓦,或者從隊列移出元素而隊列為空的時候,將導致線程阻塞谦炒。
阻塞隊列的應用場景:
工作線程可以周期性地將中間結果存儲在阻塞隊列中贯莺。
其它工作線程移除中間結果襟衰,并進一步進行修改票唆。
這樣,隊列可以自動地平衡負載秀存。
?
2. BlockingQueue的方法
方法 | 動作 | 特殊情況 |
---|---|---|
add | 添加一個元素 | 如果隊列滿透且,拋出IllegalStateException |
element | 返回隊頭元素 | 如果對列空撕蔼,拋出NoSuchElementException |
offer | 添加一個元素并返回true | 如果隊列滿,則返回false |
peek | 返回隊頭元素 | 如果隊列空秽誊,則返回null |
poll | 移除并返回隊頭元素 | 如果隊列空鲸沮,則返回null |
put | 添加一個元素 | 如果隊列滿,則阻塞 |
remove | 移除并返回隊頭元素 | 如果對列空锅论,拋出NoSuchElementException |
take | 移除并返回隊頭元素 | 如果對列空讼溺,則阻塞 |
offer和poll方法還可以有超時時間的參數(shù)
// 在100毫秒的時間內(nèi)向隊尾插入一個元素;超時返回false
boolean success = q.offer(x, 100, TimeUnit.MILLISECONDS);
// 在100毫秒的時間內(nèi)移除隊頭元素;超時返回null
Object head = q.poll(100, TimeUnit.MILLISECONDS);
?
3. Java并發(fā)包的阻塞隊列實現(xiàn)
java.util.concurrent包提供了阻塞隊列的幾種實現(xiàn)。
類名 | 解釋 |
---|---|
LinkedBlockingQueue | 雙端隊列最易,容量沒有上界怒坯,但是也可以選擇指定一個最大容量。 |
ArrayBlockingQueue | 在構造是需要指定容量藻懒,并且有一個可選的參數(shù)來指定是否需要公平性剔猿。如果設置了公平性,那么等待時間最長的線程會優(yōu)先得到處理嬉荆。公平性會降低性能归敬。 |
PriorityBlockingQueue | 優(yōu)先隊列,元素按照它們的優(yōu)先級順序移除。這個隊列沒有容量上限汪茧。但是隊列為空獲取元素也會造成阻塞椅亚。 |
?
4. 程序?qū)崙?zhàn)
SearchFileByBlockingQueueDemo.java
package zeus.playground.concurrent.queue;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Scanner;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public class SearchFileByBlockingQueueDemo {
private static final int FILE_QUEUE_SIZE = 10;
private static final int SEARCH_THREADS = 100;
private static final Path DUMMY = Path.of("");
private static BlockingQueue<Path> queue = new ArrayBlockingQueue<>(FILE_QUEUE_SIZE);
public static void main(String[] args) {
try (var in = new Scanner(System.in)) {
System.out.print("Enter base directory (e.g. /opt/jdk-9-src): ");
String directory = in.nextLine();
System.out.print("Enter keyword (e.g. volatile): ");
String keyword = in.nextLine();
Runnable enumerator = () -> {
try {
enumerate(Path.of(directory));
queue.put(DUMMY);
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
}
};
// 工作線程負責向隊列添加元素
new Thread(enumerator).start();
for (int i = 1; i <= SEARCH_THREADS; i++) {
Runnable searcher = () -> {
try {
var done = false;
while (!done) {
Path file = queue.take();
if (file == DUMMY) {
queue.put(file);
done = true;
} else {
search(file, keyword);
}
}
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
}
};
// 工作線程負責從隊列移除元素
new Thread(searcher).start();
}
}
}
/**
* Recursively enumerates all files in a given directory and its subdirectories.
* @param directory the directory in which to start
*/
public static void enumerate(Path directory) throws IOException, InterruptedException {
try (Stream<Path> children = Files.list(directory)) {
for (Path child : children.collect(Collectors.toList())) {
if (Files.isDirectory(child)) {
enumerate(child);
} else {
queue.put(child);
}
}
}
}
/**
* Searches a file for a given keyword and prints all matching lines.
*
* @param file the file to search
* @param keyword the keyword to search for
*/
public static void search(Path file, String keyword) throws IOException {
try (var in = new Scanner(file, StandardCharsets.UTF_8)) {
int lineNumber = 0;
while (in.hasNextLine()) {
lineNumber++;
String line = in.nextLine();
if (line.contains(keyword)) {
System.out.printf("%s:%d:%s%n", file, lineNumber, line);
}
}
}
}
}
上面程序通過阻塞隊列的方式搜索指定文件夾下的所有文件是否有某個關鍵字。
程序通過一批工作線程來完成:
一個生產(chǎn)者工作線程通過調(diào)用遞歸方法來遍歷文件夾舱污,并把文件路勁塞入阻塞隊列呀舔。
一批消費者工作線程通過從阻塞隊列中隊頭挨個取文件路徑,再根據(jù)文件路徑搜索指定文件是否含有某個關鍵字扩灯。