第五章筆記
5.1 同步容器類
同步容器類包括Vector和Hashtable登下,還有Collections.synchronizedXxx等帜乞。
5.1.1 同步容器類的問題
同步線程類都是線程安全的,但在某些情況下可能需要額外的客戶端加鎖來保護復合操作搀绣。
- 迭代
- 條件運算(檢查在Map中是否存在鍵值K,如果沒有肝集,就加入二元組)
在并發(fā)容器中虐先,這些復合操作也是線程安全的暮蹂,但當其他線程并發(fā)修改容器時,可能會出現(xiàn)意料之外的行為癌压。
程序清單5-1
public class UnsafeVectorHelpers {
public static Object getLast(Vector list) {
int lastIndex = list.size() - 1;
return list.get(lastIndex);
}
public static void deleteLast(Vector list) {
int lastIndex = list.size() - 1;
list.remove(lastIndex);
}
}
該程序的問題是仰泻,當A線程在包含10個元素的Vector上調(diào)用getLast,同時B線程在同一個Vector上調(diào)用deleteLast滩届,這些操作交替執(zhí)行時集侯,getLast將拋出ArrayIndexOutOfBoundsException異常。在調(diào)用size與調(diào)用getLast這兩個操作之間丐吓,Vector變小了浅悉,因此在調(diào)用size時得到的索引值將不再有效。
由于同步容器類要遵守同步策略券犁,即支持客戶端加鎖术健,所以在list上加鎖就可以將getLast和deleteLast變?yōu)樵硬僮鳌?/p>
程序清單5-2
public class SafeVectorHelpers {
public static Object getLast(Vector list) {
synchronized (list) {
int lastIndex = list.size() - 1;
return list.get(lastIndex);
}
}
public static void deleteLast(Vector list) {
synchronized (list) {
int lastIndex = list.size() - 1;
list.remove(lastIndex);
}
}
}
這種風險在迭代時依然會出現(xiàn)
程序清單5-3
for(int i = 0; i < vector.size(); i++)
doSomething(vector.get(i));
同之前一樣,當?shù)臅r候粘衬,size發(fā)生變化荞估,依然會拋出異常,可以通過在客戶端加鎖的方式來避免稚新,但是要犧牲性能勘伺。
程序清單5-4
synchronized(vector){
for(int i = 0; i < vector.size(); i++)
doSomething(vector.get(i));
}
5.1.2 迭代器與ConcurrentModificationException
對容器進行迭代的時候一會選用Iterator,但是即便用迭代器如果不對容器加鎖褂删,也無法避免運行時出現(xiàn)ConcurrentModificationException異常飞醉。
程序清單5-5
List<Widget> widgetList
= Collections.synchronizedList(new ArrayList<Widget>);
...
// 可能拋出ConcurrentModificationException
for (Widget w : widgetList)
doSomething(w);
上面程序就是用for-each語法對List容器進行迭代,javac將生成使用Iterator的代碼屯阀,反復調(diào)用hasNext和next來迭代List對象缅帘。
要想避免出現(xiàn)ConcurrentModificationException異常,就必須在迭代過程持有容器的鎖难衰。
但是在容器上加鎖钦无,可能會出現(xiàn)性能問題以及死鎖。
另一種替代方法就是“克隆”容器盖袭,并在副本上進行迭代失暂,由于副本被封閉在線程內(nèi)部,所以不會出現(xiàn)ConcurrentModificationException異常鳄虱,但是會帶來顯著的性能開銷弟塞。
5.1.3 隱藏迭代器
雖然加鎖可以防止迭代器拋出ConcurrentModificationException,但是必須在所有對共享容器做操作的地方加鎖拙已。但是實際情況中宣肚,迭代器會隱藏起來,來看下面程序
程序清單5-6
public class HiddenIterator {
@GuardedBy("this") private final Set<Integer> set = new HashSet<Integer>();
public synchronized void add(Integer i) {
set.add(i);
}
public synchronized void remove(Integer i) {
set.remove(i);
}
public void addTenThings() {
Random r = new Random();
for (int i = 0; i < 10; i++)
add(r.nextInt());
System.out.println("DEBUG: added ten elements to " + set);
}
}
System.out.println("DEBUG: added ten elements to " + set)
這行代碼編譯器將字符串的連接操作轉(zhuǎn)換為調(diào)用StringBuilder.append(Object)悠栓,而這個方法又會調(diào)用容器的toString方法霉涨,標準容器的toString方法將迭代容器按价,并在每個元素上調(diào)用toString來生成容器內(nèi)容的格式化表示。
正如封裝對象的狀態(tài)有助于維持不變性條件一樣笙瑟,封裝對象的同步機制同樣有助于確保實施同步策略楼镐。
類似的操作還有容器的hashCode、equals往枷、containsAll框产、removeAll和retainAll方法。
5.2 并發(fā)容器
通過并發(fā)容器來替代同步容器错洁,可以極大地提高伸縮性并降低風險秉宿。
ConcurrentHashMap、CopyOnWriteArrayList
Queue:用來臨時保存一組等待處理的元素屯碴,Queue上的操作不會阻塞描睦,如果隊列為空,那么獲取元素的操作將返回空值导而。它提供了幾種實現(xiàn)忱叭,
- ConcurrentLinkQueue:傳統(tǒng)的先進先出隊列
- PriorityQueue:這是一個(非并發(fā))優(yōu)先隊列。
- BlockingQueue:擴展了Queue今艺,增加了可阻塞的插入和獲取等操作韵丑。如果隊列為空,那么獲取元素的操作將一直阻塞虚缎,直到隊列中出現(xiàn)一個可用的元素撵彻。如果隊列已滿,那么插入元素的操作將一直阻塞实牡,直到隊列中出現(xiàn)可用的空間陌僵。在“生產(chǎn)者 - 消費者”這種設計模式中,阻塞隊列是非常有用的铲掐。
5.2.1 ConcurrentHashMap
同步容器類在執(zhí)行每個操作期間都持有一個鎖拾弃。
ConcurrentHashMap是一個基于散列的Map值桩,并不是將每個方法都在同一個鎖上同步并使得每次只能有一個線程訪問容器摆霉,而是使用一種粒度更細的加鎖機制來實現(xiàn)更大程度的共享,這種機制稱為分段鎖奔坟。
ConcurrentHashMap提供的迭代器不會拋出ConcurrentModificationException携栋,因此不需要在迭代過程中對容器加鎖。
ConcurrentHashMap中沒有實現(xiàn)對Map加鎖以提供獨占訪問咳秉。
5.2.2 額外的原子操作
由于ConcurrentHashMap中沒有實現(xiàn)對Map加鎖以提供獨占訪問婉支,因此我們無法使用客戶端加鎖來創(chuàng)建新的原子操作,但是一些常見的復合操作都已經(jīng)實現(xiàn)為原子操作并且在ConcurrentMap的接口中聲明澜建。
程序清單5-7
public interface ConcurrentMap<K,V> extends Map<K,V> {
//僅當K沒有相應的映射值時才插入
V putIfAbsent(K key, V value);
//僅當K被映射到V時才移除
boolean remove(Object key, Object value);
//僅當K被映射到oldValue時才替換為newValue
boolean replace(K key, V oldValue, V newValue);
//僅當K被映射到某個值時才替換為newValue
V replace(K key, V value);
}
5.2.3 CopyOnWriteArrayList
用于替代同步List向挖,在迭代期間不需要對容器進行加鎖或復制蝌以。(CopyOnWriteArraySet的作用是替代同步Set)。
該容器的線程安全性在于何之,只要正確地發(fā)布一個事實不可變的對象跟畅,那么在訪問該對象時就不再需要進一步的同步。
每次修改容器時都會復制底層數(shù)組溶推,當容器規(guī)模較大時徊件,需要很大的性能開銷。
5.3 阻塞隊列和生產(chǎn)者 - 消費者模式
阻塞隊列提供了可阻塞的put和take方法蒜危,以及offer和poll方法虱痕。如果隊列已經(jīng)滿了,那么put方法將阻塞直到有空間可用辐赞;如果隊列為空那么take方法將會阻塞直到有元素可用部翘。
常見的生產(chǎn)者-消費者設計模式就是線程池與工作隊列的組合,在Executor任務執(zhí)行框架中就體現(xiàn)了這種模式占拍。
offer方法作用是略就,如果數(shù)據(jù)項不能被添加到隊列中,那么就返回一個失敗狀態(tài)晃酒。
BlockingQueue簡化了生產(chǎn)者-消費者設計的實現(xiàn)過程表牢,他有多種實現(xiàn):
- LinkedBlockingQueue,是FIFO隊列贝次,和LinkedList類似
- ArrayBlockingQueue崔兴,是FIFO隊列,和ArrayList類似
- PriorityBlockingQueue是一個按優(yōu)先級排列的隊列蛔翅,根據(jù)元素的自然順序來比較元素
- SynchronousQueue(后面稱SQ)內(nèi)部沒有容量敲茄,所以不能通過peek方法獲取頭部元素;也不能單獨插入元素山析,可以簡單理解為它的插入和移除是“一對”對稱的操作堰燎。為了兼容 Collection 的某些操作(例如contains),SQ 扮演了一個空集合的角色笋轨。
SQ 的一個典型應用場景是在線程池中秆剪,Executors.newCachedThreadPool() 就使用了它,這個構(gòu)造使線程池根據(jù)需要(新任務到來時)創(chuàng)建新的線程爵政,如果有空閑線程則會重復使用仅讽,線程空閑了60秒后會被回收。
5.3.1 示例:桌面搜索
掃描本地驅(qū)動器上的文件建立索引以便隨后進行搜索钾挟,類似于桌面搜索程序或Windows索引服務洁灵。
程序清單5-8
public class ProducerConsumer {
static class FileCrawler implements Runnable {
private final BlockingQueue<File> fileQueue;
private final FileFilter fileFilter;
private final File root;
public FileCrawler(BlockingQueue<File> fileQueue,
final FileFilter fileFilter,
File root) {
this.fileQueue = fileQueue;
this.root = root;
this.fileFilter = new FileFilter() {
public boolean accept(File f) {
return f.isDirectory() || fileFilter.accept(f);
}
};
}
private boolean alreadyIndexed(File f) {
return false;
}
public void run() {
try {
crawl(root);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
private void crawl(File root) throws InterruptedException {
File[] entries = root.listFiles(fileFilter);
if (entries != null) {
for (File entry : entries)
if (entry.isDirectory())
crawl(entry);
else if (!alreadyIndexed(entry))
fileQueue.put(entry);
}
}
}
static class Indexer implements Runnable {
private final BlockingQueue<File> queue;
public Indexer(BlockingQueue<File> queue) {
this.queue = queue;
}
public void run() {
try {
while (true)
indexFile(queue.take());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
public void indexFile(File file) {
// Index the file...
};
}
private static final int BOUND = 10;
private static final int N_CONSUMERS = Runtime.getRuntime().availableProcessors();
public static void startIndexing(File[] roots) {
BlockingQueue<File> queue = new LinkedBlockingQueue<File>(BOUND);
FileFilter filter = new FileFilter() {
public boolean accept(File file) {
return true;
}
};
for (File root : roots)
new Thread(new FileCrawler(queue, filter, root)).start();
for (int i = 0; i < N_CONSUMERS; i++)
new Thread(new Indexer(queue)).start();
}
}
FileCrawler給出了一個生產(chǎn)者任務,在某個文件層次結(jié)構(gòu)中搜索符合索引標準的文件掺出,并將它們的名稱放入工作隊列徽千。而且苫费,在Indexer中還給出了一個消費者任務,從隊列中取出文件名稱并對它們建立索引双抽。
生產(chǎn)者-消費者模式可以提高代碼的可讀性和可重用性:每個操作只需完成一個任務黍衙,并且阻塞隊列將負責所有的控制流,因此每個功能的代碼都更加簡單和清晰荠诬。
生產(chǎn)者 - 消費者模式帶來性能優(yōu)勢琅翻。可以并發(fā)地執(zhí)行柑贞。
程序清單5-9 啟動桌面搜索
public class ProducerConsumer {
static class FileCrawler implements Runnable {
private final BlockingQueue<File> fileQueue;
private final FileFilter fileFilter;
private final File root;
public FileCrawler(BlockingQueue<File> fileQueue,
final FileFilter fileFilter,
File root) {
this.fileQueue = fileQueue;
this.root = root;
this.fileFilter = new FileFilter() {
public boolean accept(File f) {
return f.isDirectory() || fileFilter.accept(f);
}
};
}
private boolean alreadyIndexed(File f) {
return false;
}
public void run() {
try {
crawl(root);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
private void crawl(File root) throws InterruptedException {
File[] entries = root.listFiles(fileFilter);
if (entries != null) {
for (File entry : entries)
if (entry.isDirectory())
crawl(entry);
else if (!alreadyIndexed(entry))
fileQueue.put(entry);
}
}
}
static class Indexer implements Runnable {
private final BlockingQueue<File> queue;
public Indexer(BlockingQueue<File> queue) {
this.queue = queue;
}
public void run() {
try {
while (true)
indexFile(queue.take());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
public void indexFile(File file) {
// Index the file...
};
}
private static final int BOUND = 10;
private static final int N_CONSUMERS = Runtime.getRuntime().availableProcessors();
public static void startIndexing(File[] roots) {
BlockingQueue<File> queue = new LinkedBlockingQueue<File>(BOUND);
FileFilter filter = new FileFilter() {
public boolean accept(File file) {
return true;
}
};
for (File root : roots)
new Thread(new FileCrawler(queue, filter, root)).start();
for (int i = 0; i < N_CONSUMERS; i++)
new Thread(new Indexer(queue)).start();
}
}
5.3.2 串行線程封閉
5.3.3 雙端隊列與工作密取
Java6增加了兩種容器類型方椎,Deque和BlockingDeque,它們分別對Queue和BlockingQueue進行擴展钧嘶。
Deque是一個雙端隊列棠众,實現(xiàn)了在隊列頭和隊列尾的高效插入和刪除。具體實現(xiàn)包括ArrayDeque和LinkedBlockingDeque有决。
在生產(chǎn)者-消費者模式中闸拿,所有消費者有一個共享的工作隊列,而在工作密取設計中书幕,每個消費者都有自己的雙端隊列新荤,如果一個消費者完成了自己雙端隊列中的全部工作,那么它可以從其他消費者雙端隊列末尾秘密地獲取工作台汇。
工作密取非常適用于既是消費者也是生產(chǎn)者問題 - 當執(zhí)行某個工作時可能導致出現(xiàn)更多的工作挤土。
例如:網(wǎng)頁爬蟲時搜骡,發(fā)現(xiàn)有更多的頁面需要處理;垃圾回收階段對堆進行標記冠骄。
5.4 阻塞方法與中斷方法
線程可能會阻塞或暫停執(zhí)行懦尝,原因有多種:等待I/O操作結(jié)束躲雅,等待獲得一個鎖劫乱,等待從Thread.sleep方法中醒來巡李,或是等待另一個線程的計算結(jié)果。
阻塞操作與執(zhí)行時間很長的普通操作的差別在于笆呆,被阻塞的線程必須等待某個不受它控制的事件發(fā)生后才能繼續(xù)執(zhí)行请琳,例如等待I/O操作完成,等待某個鎖變?yōu)榭捎醚埽蛘叩却獠坑嬎愕慕Y(jié)束单起。
BlockingQueue的put和take等方法會拋出InterruptedException抱怔,當某方法拋出InterruptedException時劣坊,表示這是一個阻塞方法。如果這個方法被終端屈留,那么它將努力提前結(jié)束阻塞狀態(tài)局冰。
中斷是一種協(xié)作機制测蘑。一個線程不能強制其他線程停止正在執(zhí)行的操作而去執(zhí)行其他的操作。
處理對中斷的響應康二,有兩種選擇:
- 傳遞InterruptedException:向上拋出該異常
- 恢復中斷:有時不能拋出該異常碳胳,例如當代碼是Runnable的一部分時,這時必須捕獲該異常沫勿,并通過調(diào)用當前線程上的interrupt方法恢復中斷狀態(tài)挨约,比如下面程序這樣
程序清單5-10
public class TaskRunnable implements Runnable {
BlockingQueue<Task> queue;
public void run() {
try {
processTask(queue.take());
} catch (InterruptedException e) {
// restore interrupted status
Thread.currentThread().interrupt();
}
}
void processTask(Task task) {
// Handle the task
}
interface Task {
}
}
5.5 同步工具類
同步工具類包含一些特定的結(jié)構(gòu)化屬性:它們封裝了一些狀態(tài),這些狀態(tài)將決定執(zhí)行同步工具類的線程是繼續(xù)執(zhí)行還是等待产雹,此外還提供了一些方法對狀態(tài)進行操作诫惭,以及另一些方法用于高效地等待同步工具類進入到預期狀態(tài),同步工具類包括:阻塞隊列蔓挖、信號量(Semaphore)夕土、柵欄(Barrier)以及閉鎖(Latch)。
5.5.1 閉鎖
閉鎖可以延遲線程的進度知道其到達終止狀態(tài)瘟判。
作用:相當于一扇門怨绣,在閉鎖到達結(jié)束狀態(tài)之前,這扇門一直是關(guān)閉的拷获,并且沒有任何線程能通過篮撑,當?shù)竭_結(jié)束狀態(tài)時,這扇門會打開并允許所有的線程通過匆瓜⊙噬龋可以用來確保某些活動直到其他活動都完成后才繼續(xù)執(zhí)行。
CountDownLatch是一種靈活的閉鎖實現(xiàn)陕壹。countDown方法遞減計數(shù)器质欲,await方法等待計數(shù)器達到零。
程序清單5-11
public class TestHarness {
public long timeTasks(int nThreads, final Runnable task)
throws InterruptedException {
final CountDownLatch startGate = new CountDownLatch(1);
final CountDownLatch endGate = new CountDownLatch(nThreads);
for (int i = 0; i < nThreads; i++) {
Thread t = new Thread() {
public void run() {
try {
startGate.await();
try {
task.run();
} finally {
endGate.countDown();
}
} catch (InterruptedException ignored) {
}
}
};
t.start();
}
long start = System.nanoTime();
startGate.countDown();
endGate.await();
long end = System.nanoTime();
return end - start;
}
}
該程序創(chuàng)建一定數(shù)量的線程糠馆,利用它們并發(fā)地執(zhí)行指定的任務嘶伟。使用了兩個閉鎖startGate和endGate,分別表示起始門和結(jié)束門又碌。起始門計數(shù)器的初始值為1九昧,而結(jié)束門計數(shù)器的初始值為工作線程的數(shù)量。每個工作線程首先要做的就是在啟動門上等待毕匀,從而確保所有線程都就緒后才開始執(zhí)行铸鹰。而每個線程要做的最后一件事情是將調(diào)用結(jié)束門的countDown方法減1,這能使主線程高效地等待直到所有工作線程都執(zhí)行完成皂岔。
5.5.2 FutureTask
FutureTask也可用做閉鎖蹋笼。
FutureTask表示的計算是通過Callable來實現(xiàn)的。
Future.get的行為取決于任務的狀態(tài)。如果任務已經(jīng)完成剖毯,那么get會立即返回結(jié)果圾笨,否則get將阻塞直到任務進入完成狀態(tài),然后返回結(jié)果或者拋出異常逊谋。
FutureTask在Executor框架中表示異步任務擂达,此外還可以用來表示一些時間較長的計算。這些計算可以在使用計算結(jié)果之前啟動胶滋。
程序清單5-12
public class Preloader {
//從數(shù)據(jù)庫加載產(chǎn)品信息
ProductInfo loadProductInfo() throws DataLoadException {
return null;
}
private final FutureTask<ProductInfo> future =
new FutureTask<ProductInfo>(new Callable<ProductInfo>() {
public ProductInfo call() throws DataLoadException {
return loadProductInfo();
}
});
private final Thread thread = new Thread(future);
//啟動線程
public void start() { thread.start(); }
//通過get方法獲取結(jié)果
public ProductInfo get()
throws DataLoadException, InterruptedException {
try {
return future.get();
} catch (ExecutionException e) {
Throwable cause = e.getCause();
if (cause instanceof DataLoadException)
throw (DataLoadException) cause;
else
throw LaunderThrowable.launderThrowable(cause);
}
}
interface ProductInfo {
}
}
class DataLoadException extends Exception { }
這里需要注意自定義異常ExecutionException板鬓,在調(diào)用get方法時,無論代碼拋出什么異常究恤,都會被封裝到該異常中穗熬。
在Preloader中,當get方法拋出ExecutionException時丁溅,可能是以下三種情況之一:Callable拋出的受檢查異常唤蔗,RuntimeException,以及Error窟赏。必須對每種情況單獨處理妓柜。
程序清單5-13
public class LaunderThrowable {
/**
* Coerce an unchecked Throwable to a RuntimeException
* <p/>
* If the Throwable is an Error, throw it; if it is a
* RuntimeException return it, otherwise throw IllegalStateException
*/
public static RuntimeException launderThrowable(Throwable t) {
if (t instanceof RuntimeException)
return (RuntimeException) t;
else if (t instanceof Error)
throw (Error) t;
else
throw new IllegalStateException("Not unchecked", t);
}
}
5.5.3 信號量
計數(shù)信號量用來控制同時訪問某個特定資源的操作數(shù)量,或者同時執(zhí)行某個指定操作的數(shù)量涯穷。用來實現(xiàn)某種資源池棍掐,或者對容器施加邊界。
Semaphore管理著一組許可(permit),許可的初始數(shù)量可以通過構(gòu)造函數(shù)設定拷况,操作時首先要獲取到許可作煌,才能進行操作,操作完成后需要釋放許可赚瘦。如果沒有獲取許可粟誓,則阻塞到有許可被釋放。如果初始化了一個許可為1的Semaphore起意,那么就相當于一個不可重入的互斥鎖(Mutex)鹰服。
理論的聽起來有些繞口,其實假設生活中一個常見的場景:每天早上揽咕,大家都熱衷于帶薪上廁所悲酷,但是公司廁所一共只有10個坑位。亲善。那么只能同時10個人用著设易,后面來的人都得等著(阻塞),如果走了2個人蛹头,那么又可以進去2個人顿肺。這里面就是Semaphore的應用場景戏溺,爭奪有限的資源。
程序清單5-14
public class BoundedHashSet <T> {
private final Set<T> set;
private final Semaphore sem;
public BoundedHashSet(int bound) {
this.set = Collections.synchronizedSet(new HashSet<T>());
sem = new Semaphore(bound);
}
public boolean add(T o) throws InterruptedException {
sem.acquire();
boolean wasAdded = false;
try {
wasAdded = set.add(o);
return wasAdded;
} finally {
if (!wasAdded)
//注意一定要在finally中釋放挟冠,否則就會出現(xiàn)死鎖
sem.release();
}
}
public boolean remove(Object o) {
boolean wasRemoved = set.remove(o);
if (wasRemoved)
sem.release();
return wasRemoved;
}
}
5.5.4 柵欄
柵欄可以阻塞一組線程直到某個事件發(fā)生。
柵欄與閉鎖的區(qū)別:所有線程必須同時到達柵欄位置袍睡,才能繼續(xù)執(zhí)行知染。閉鎖用于等待事件,柵欄用于等待其他線程斑胜。例如:幾個家庭決定在某個地方集合:“所有人6:00在麥當勞碰頭控淡,到了以后要等待其他人,之后再討論下一步要做的事情”止潘。
CyclicBarrier可以使一定數(shù)量的參與方反復地在柵欄位置匯集掺炭,當線程到達柵欄位置時將調(diào)用await方法,這個方法將阻塞直到所有線程都到達柵欄位置之后才釋放凭戴,當所有線程都釋放之后涧狮,柵欄將唄重置以便下次使用;如果對await的調(diào)用超時么夫、中斷者冤,那么所有阻塞的線程都將拋出BrokenBarrierException。
程序清單5-15
public class CellularAutomata {
private final Board mainBoard;
private final CyclicBarrier barrier;
private final Worker[] workers;
public CellularAutomata(Board board) {
this.mainBoard = board;
int count = Runtime.getRuntime().availableProcessors();
this.barrier = new CyclicBarrier(count,
new Runnable() {
public void run() {
mainBoard.commitNewValues();
}});
this.workers = new Worker[count];
for (int i = 0; i < count; i++)
workers[i] = new Worker(mainBoard.getSubBoard(count, i));
}
private class Worker implements Runnable {
private final Board board;
public Worker(Board board) { this.board = board; }
public void run() {
while (!board.hasConverged()) {
for (int x = 0; x < board.getMaxX(); x++)
for (int y = 0; y < board.getMaxY(); y++)
board.setNewValue(x, y, computeValue(x, y));
try {
barrier.await();
} catch (InterruptedException ex) {
return;
} catch (BrokenBarrierException ex) {
return;
}
}
}
private int computeValue(int x, int y) {
// Compute the new value that goes in (x,y)
return 0;
}
}
public void start() {
for (int i = 0; i < workers.length; i++)
new Thread(workers[i]).start();
mainBoard.waitForConvergence();
}
interface Board {
int getMaxX();
int getMaxY();
int getValue(int x, int y);
int setNewValue(int x, int y, int value);
void commitNewValues();
boolean hasConverged();
void waitForConvergence();
Board getSubBoard(int numPartitions, int index);
}
}
另一種形式的柵欄是Exchanger档痪,它是一種兩房柵欄涉枫,各方在柵欄位置上交換數(shù)據(jù)。例如當一個線程向緩沖區(qū)寫入數(shù)據(jù)腐螟,而另一個線程從緩沖區(qū)中讀取數(shù)據(jù)愿汰。這些線程可以使用Exchanger來匯合,并將滿的緩沖區(qū)與空的緩沖區(qū)交換乐纸。
5.6 構(gòu)建高效且可伸縮的結(jié)果緩存
本節(jié)將開發(fā)一個高效且可伸縮的緩存衬廷,用于改進一個高計算開銷的函數(shù),首先從簡單的HashMap開始汽绢,然后分析它的并發(fā)性缺陷泵督,并討論如何修復它們。
程序清單5-16
public class Memoizer1 <A, V> implements Computable<A, V> {
@GuardedBy("this") private final Map<A, V> cache = new HashMap<A, V>();
private final Computable<A, V> c;
public Memoizer1(Computable<A, V> c) {
this.c = c;
}
public synchronized V compute(A arg) throws InterruptedException {
V result = cache.get(arg);
if (result == null) {
result = c.compute(arg);
cache.put(arg, result);
}
return result;
}
}
interface Computable <A, V> {
V compute(A arg) throws InterruptedException;
}
class ExpensiveFunction
implements Computable<String, BigInteger> {
public BigInteger compute(String arg) {
// after deep thought...
return new BigInteger(arg);
}
}
在ExpensiveFunction中實現(xiàn)的Computable庶喜,需要很長時間才能計算出結(jié)果小腊,我們將創(chuàng)建一個Computable包裝器,幫助記住之前的計算結(jié)果久窟,并將緩存過程封裝起來秩冈。
在程序清單5-16中給出了第一種嘗試,用HashMap來保存斥扛,但是它不是線程安全的入问,所以Memoizer1采取了加鎖的方式丹锹,對整個compute方法進行同步。如果另一個線程正在計算芬失,其他調(diào)用compute的線程可能會被阻塞很長時間楣黍。這一種不是我們想要的。
程序清單5-17
public class Memoizer2 <A, V> implements Computable<A, V> {
private final Map<A, V> cache = new ConcurrentHashMap<A, V>();
private final Computable<A, V> c;
public Memoizer2(Computable<A, V> c) {
this.c = c;
}
public V compute(A arg) throws InterruptedException {
V result = cache.get(arg);
if (result == null) {
result = c.compute(arg);
cache.put(arg, result);
}
return result;
}
}
程序清單5-17中的Memoizer2用ConcurrentHashMap代替HashMap棱烂,解決了線程不安全的問題租漂,但是當兩個線程同時調(diào)用compute時存在一個漏洞,可能會導致計算得到相同的值颊糜。
Memoizer2的問題在于哩治,如果某個線程啟動了一個開銷很多的計算,而其他線程并不知道這個計算正在進行衬鱼,那么很可能會重復這個計算业筏,可以用FutureTask來實現(xiàn)。
如果有結(jié)果可用鸟赫,那么FutureTask.get將立即返回結(jié)果蒜胖,否則它會一直阻塞,直到結(jié)果計算出來再將其返回抛蚤。
程序清單5-18
public class Memoizer3<A, V> implements Computable<A, V> {
private final Map<A, Future<V>> cache
= new ConcurrentHashMap<A, Future<V>>();
private final Computable<A, V> c;
public Memoizer3(Computable<A, V> c) { this.c = c; }
public V compute(final A arg) throws InterruptedException {
Future<V> f = cache.get(args);
if (f == null){
Callable<V> eval = new Callable<V>() {
public V call() throws InterruptedException {
return c.compute(args);
}
};
FutureTask<V> ft = new FutureTask<V>(eval);
f = ft;
cache.put(arg, ft);
ft.run(); //在這里將調(diào)用c.compute
}
try{
return f.get();
} catch(ExecutionException e){
throw launderThrowable(e.getCause());
}
}
}
該程序的問題是if代碼塊中的復合操作cache.put(arg, ft)是在底層的Map對象上執(zhí)行的翠勉,而這個對象無法通過加鎖來確保原子性。
程序清單5-19
public class Memoizer <A, V> implements Computable<A, V> {
private final ConcurrentMap<A, Future<V>> cache
= new ConcurrentHashMap<A, Future<V>>();
private final Computable<A, V> c;
public Memoizer(Computable<A, V> c) {
this.c = c;
}
public V compute(final A arg) throws InterruptedException {
while (true) {
Future<V> f = cache.get(arg);
if (f == null) {
Callable<V> eval = new Callable<V>() {
public V call() throws InterruptedException {
return c.compute(arg);
}
};
FutureTask<V> ft = new FutureTask<V>(eval);
f = cache.putIfAbsent(arg, ft);
if (f == null) {
f = ft;
ft.run();
}
}
try {
return f.get();
} catch (CancellationException e) {
cache.remove(arg, f);
} catch (ExecutionException e) {
throw LaunderThrowable.launderThrowable(e.getCause());
}
}
}
}
在完成并發(fā)緩存的實現(xiàn)后霉颠,就可以為第2章中因式分解servlet添加緩存对碌。
程序清單5-20
@ThreadSafe
public class Factorizer extends GenericServlet implements Servlet {
private final Computable<BigInteger, BigInteger[]> c =
new Computable<BigInteger, BigInteger[]>() {
public BigInteger[] compute(BigInteger arg) {
return factor(arg);
}
};
private final Computable<BigInteger, BigInteger[]> cache
= new Memoizer<BigInteger, BigInteger[]>(c);
public void service(ServletRequest req,
ServletResponse resp) {
try {
BigInteger i = extractFromRequest(req);
encodeIntoResponse(resp, cache.compute(i));
} catch (InterruptedException e) {
encodeError(resp, "factorization interrupted");
}
}
void encodeIntoResponse(ServletResponse resp, BigInteger[] factors) {
}
void encodeError(ServletResponse resp, String errorString) {
}
BigInteger extractFromRequest(ServletRequest req) {
return new BigInteger("7");
}
BigInteger[] factor(BigInteger i) {
// Doesn't really factor
return new BigInteger[]{i};
}
}