編寫優(yōu)質的并發(fā)代碼是一件難度極高的事情。Java語言從第一版本開始內置了對多線程的支持,這一點在當年是非常了不起的亚享,但是當我們對并發(fā)編程有了更深刻的認識和更多的實踐后,實現(xiàn)并發(fā)編程就有了更多的方案和更好的選擇潮剪。本文是對并發(fā)編程的一點總結和思考,同時也分享了Java 5以后的版本中如何編寫并發(fā)代碼的一點點經(jīng)驗分唾。
為什么需要并發(fā)
并發(fā)其實是一種解耦合的策略抗碰,它幫助我們把做什么(目標)和什么時候做(時機)分開。這樣做可以明顯改進應用程序的吞吐量(獲得更多的CPU調度時間)和結構(程序有多個部分在協(xié)同工作)绽乔。做過Java Web開發(fā)的人都知道弧蝇,Java Web中的Servlet程序在Servlet容器的支持下采用單實例多線程的工作模式,Servlet容器為你處理了并發(fā)問題折砸。
誤解和正解
最常見的對并發(fā)編程的誤解有以下這些:
- 并發(fā)總能改進性能(并發(fā)在CPU有很多空閑時間時能明顯改進程序的性能看疗,但當線程數(shù)量較多的時候,線程間頻繁的調度切換反而會讓系統(tǒng)的性能下降)
- 編寫并發(fā)程序無需修改原有的設計(目的與時機的解耦往往會對系統(tǒng)結構產(chǎn)生巨大的影響)
- 在使用Web或EJB容器時不用關注并發(fā)問題(只有了解了容器在做什么鞍爱,才能更好的使用容器)
下面的這些說法才是對并發(fā)客觀的認識:
- 編寫并發(fā)程序會在代碼上增加額外的開銷
- 正確的并發(fā)是非常復雜的,即使對于很簡單的問題
- 并發(fā)中的缺陷因為不易重現(xiàn)也不容易被發(fā)現(xiàn)
- 并發(fā)往往需要對設計策略從根本上進行修改
并發(fā)編程的原則和技巧
單一職責原則
分離并發(fā)相關代碼和其他代碼(并發(fā)相關代碼有自己的開發(fā)专酗、修改和調優(yōu)生命周期)睹逃。
限制數(shù)據(jù)作用域
兩個線程修改共享對象的同一字段時可能會相互干擾,導致不可預期的行為,解決方案之一是構造臨界區(qū)沉填,但是必須限制臨界區(qū)的數(shù)量疗隶。
使用數(shù)據(jù)副本
數(shù)據(jù)副本是避免共享數(shù)據(jù)的好方法,復制出來的對象只是以只讀的方式對待翼闹。Java 5的java.util.concurrent包中增加一個名為CopyOnWriteArrayList的類斑鼻,它是List接口的子類型,所以你可以認為它是ArrayList的線程安全的版本猎荠,它使用了寫時復制的方式創(chuàng)建數(shù)據(jù)副本進行操作來避免對共享數(shù)據(jù)并發(fā)訪問而引發(fā)的問題坚弱。
線程應盡可能獨立
讓線程存在于自己的世界中,不與其他線程共享數(shù)據(jù)关摇。有過Java Web開發(fā)經(jīng)驗的人都知道荒叶,Servlet就是以單實例多線程的方式工作,和每個請求相關的數(shù)據(jù)都是通過Servlet子類的service方法(或者是doGet或doPost方法)的參數(shù)傳入的输虱。只要Servlet中的代碼只使用局部變量些楣,Servlet就不會導致同步問題。Spring MVC的控制器也是這么做的宪睹,從請求中獲得的對象都是以方法的參數(shù)傳入而不是作為類的成員愁茁,很明顯Struts 2的做法就正好相反,因此Struts 2中作為控制器的Action類都是每個請求對應一個實例亭病。
Java 5以前的并發(fā)編程
Java的線程模型建立在搶占式線程調度的基礎上鹅很,也就是說:
- 所有線程可以很容易的共享同一進程中的對象。
- 能夠引用這些對象的任何線程都可以修改這些對象命贴。
- 為了保護數(shù)據(jù)道宅,對象可以被鎖住。
Java基于線程和鎖的并發(fā)過于底層胸蛛,而且使用鎖很多時候都是很萬惡的污茵,因為它相當于讓所有的并發(fā)都變成了排隊等待。
在Java 5以前葬项,可以用synchronized關鍵字來實現(xiàn)鎖的功能泞当,它可以用在代碼塊和方法上,表示在執(zhí)行整個代碼塊或方法之前線程必須取得合適的鎖民珍。對于類的非靜態(tài)方法(成員方法)而言襟士,這意味這要取得對象實例的鎖,對于類的靜態(tài)方法(類方法)而言嚷量,要取得類的Class對象的鎖陋桂,對于同步代碼塊,程序員可以指定要取得的是那個對象的鎖蝶溶。
不管是同步代碼塊還是同步方法嗜历,每次只有一個線程可以進入宣渗,如果其他線程試圖進入(不管是同一同步塊還是不同的同步塊),JVM會將它們掛起(放入到等鎖池中)梨州。這種結構在并發(fā)理論中稱為臨界區(qū)(critical section)痕囱。這里我們可以對Java中用synchronized實現(xiàn)同步和鎖的功能做一個總結:
- 只能鎖定對象,不能鎖定基本數(shù)據(jù)類型
- 被鎖定的對象數(shù)組中的單個對象不會被鎖定
- 同步方法可以視為包含整個方法的synchronized(this) { … }代碼塊
- 靜態(tài)同步方法會鎖定它的Class對象
- 內部類的同步是獨立于外部類的
- synchronized修飾符并不是方法簽名的組成部分暴匠,所以不能出現(xiàn)在接口的方法聲明中
- 非同步的方法不關心鎖的狀態(tài)鞍恢,它們在同步方法運行時仍然可以得以運行
- synchronized實現(xiàn)的鎖是可重入的鎖。
在JVM內部每窖,為了提高效率帮掉,同時運行的每個線程都會有它正在處理的數(shù)據(jù)的緩存副本,當我們使用synchronzied進行同步的時候岛请,真正被同步的是在不同線程中表示被鎖定對象的內存塊(副本數(shù)據(jù)會保持和主內存的同步旭寿,現(xiàn)在知道為什么要用同步這個詞匯了吧),簡單的說就是在同步塊或同步方法執(zhí)行完后崇败,對被鎖定的對象做的任何修改要在釋放鎖之前寫回到主內存中盅称;在進入同步塊得到鎖之后,被鎖定對象的數(shù)據(jù)是從主內存中讀出來的后室,持有鎖的線程的數(shù)據(jù)副本一定和主內存中的數(shù)據(jù)視圖是同步的 缩膝。
在Java最初的版本中,就有一個叫volatile的關鍵字岸霹,它是一種簡單的同步的處理機制疾层,因為被volatile修飾的變量遵循以下規(guī)則:
- 變量的值在使用之前總會從主內存中再讀取出來。
- 對變量值的修改總會在完成之后寫回到主內存中贡避。
使用volatile關鍵字可以在多線程環(huán)境下預防編譯器不正確的優(yōu)化假設(編譯器可能會將在一個線程中值不會發(fā)生改變的變量優(yōu)化成常量)痛黎,但只有修改時不依賴當前狀態(tài)(讀取時的值)的變量才應該聲明為volatile變量。
不變模式也是并發(fā)編程時可以考慮的一種設計刮吧。讓對象的狀態(tài)是不變的湖饱,如果希望修改對象的狀態(tài),就會創(chuàng)建對象的副本并將改變寫入副本而不改變原來的對象杀捻,這樣就不會出現(xiàn)狀態(tài)不一致的情況井厌,因此不變對象是線程安全的。Java中我們使用頻率極高的String類就采用了這樣的設計致讥。如果對不變模式不熟悉仅仆,可以閱讀閻宏博士的《Java與模式》一書的第34章。說到這里你可能也體會到final關鍵字的重要意義了垢袱。
Java 5的并發(fā)編程
不管今后的Java向著何種方向發(fā)展或者滅忙墓拜,Java 5絕對是Java發(fā)展史中一個極其重要的版本,這個版本提供的各種語言特性我們不在這里討論(有興趣的可以閱讀我的另一篇文章《Java的第20年:從Java版本演進看編程技術的發(fā)展》)请契,但是我們必須要感謝Doug Lea在Java 5中提供了他里程碑式的杰作java.util.concurrent包咳榜,它的出現(xiàn)讓Java的并發(fā)編程有了更多的選擇和更好的工作方式潘懊。Doug Lea的杰作主要包括以下內容:
- 更好的線程安全的容器
- 線程池和相關的工具類
- 可選的非阻塞解決方案
- 顯示的鎖和信號量機制
- 下面我們對這些東西進行一一解讀。
原子類
Java 5中的java.util.concurrent包下面有一個atomic子包贿衍,其中有幾個以Atomic打頭的類,例如AtomicInteger和AtomicLong救恨。它們利用了現(xiàn)代處理器的特性贸辈,可以用非阻塞的方式完成原子操作,代碼如下所示:
/**
ID序列生成器
*/
public class IdGenerator {
private final AtomicLong sequenceNumber = new AtomicLong(0);
public long next() {
return sequenceNumber.getAndIncrement();
}
}
顯示鎖
基于synchronized關鍵字的鎖機制有以下問題:
- 鎖只有一種類型肠槽,而且對所有同步操作都是一樣的作用
- 鎖只能在代碼塊或方法開始的地方獲得擎淤,在結束的地方釋放
- 線程要么得到鎖,要么阻塞秸仙,沒有其他的可能性
Java 5對鎖機制進行了重構嘴拢,提供了顯示的鎖,這樣可以在以下幾個方面提升鎖機制:
- 可以添加不同類型的鎖寂纪,例如讀取鎖和寫入鎖
- 可以在一個方法中加鎖席吴,在另一個方法中解鎖
- 可以使用tryLock方式嘗試獲得鎖,如果得不到鎖可以等待捞蛋、回退或者干點別的事情孝冒,當然也可以在超時之后放棄操作
顯示的鎖都實現(xiàn)了java.util.concurrent.Lock接口,主要有兩個實現(xiàn)類:
- ReentrantLock - 比synchronized稍微靈活一些的重入鎖
- ReentrantReadWriteLock - 在讀操作很多寫操作很少時性能更好的一種重入鎖
對于如何使用顯示鎖拟杉,可以參考我的Java面試系列文章《Java面試題集51-70》中第60題的代碼庄涡。只有一點需要提醒,解鎖的方法unlock的調用最好能夠在finally塊中搬设,因為這里是釋放外部資源最好的地方穴店,當然也是釋放鎖的最佳位置,因為不管正常異衬醚ǎ可能都要釋放掉鎖來給其他線程以運行的機會泣洞。
CountDownLatch
CountDownLatch是一種簡單的同步模式,它讓一個線程可以等待一個或多個線程完成它們的工作從而避免對臨界資源并發(fā)訪問所引發(fā)的各種問題贞言。下面借用別人的一段代碼(我對它做了一些重構)來演示CountDownLatch是如何工作的斜棚。
import java.util.concurrent.CountDownLatch;
/**
* 工人類
* @author 駱昊
*
*/
class Worker {
private String name; // 名字
private long workDuration; // 工作持續(xù)時間
/**
* 構造器
*/
public Worker(String name, long workDuration) {
this.name = name;
this.workDuration = workDuration;
}
/**
* 完成工作
*/
public void doWork() {
System.out.println(name + " begins to work...");
try {
Thread.sleep(workDuration); // 用休眠模擬工作執(zhí)行的時間
} catch(InterruptedException ex) {
ex.printStackTrace();
}
System.out.println(name + " has finished the job...");
}
}
/**
* 測試線程
* @author 駱昊
*
*/
class WorkerTestThread implements Runnable {
private Worker worker;
private CountDownLatch cdLatch;
public WorkerTestThread(Worker worker, CountDownLatch cdLatch) {
this.worker = worker;
this.cdLatch = cdLatch;
}
@Override
public void run() {
worker.doWork(); // 讓工人開始工作
cdLatch.countDown(); // 工作完成后倒計時次數(shù)減1
}
}
class CountDownLatchTest {
private static final int MAX_WORK_DURATION = 5000; // 最大工作時間
private static final int MIN_WORK_DURATION = 1000; // 最小工作時間
// 產(chǎn)生隨機的工作時間
private static long getRandomWorkDuration(long min, long max) {
return (long) (Math.random() * (max - min) + min);
}
public static void main(String[] args) {
CountDownLatch latch = new CountDownLatch(2); // 創(chuàng)建倒計時閂并指定倒計時次數(shù)為2
Worker w1 = new Worker("駱昊", getRandomWorkDuration(MIN_WORK_DURATION, MAX_WORK_DURATION));
Worker w2 = new Worker("王大錘", getRandomWorkDuration(MIN_WORK_DURATION, MAX_WORK_DURATION));
new Thread(new WorkerTestThread(w1, latch)).start();
new Thread(new WorkerTestThread(w2, latch)).start();
try {
latch.await(); // 等待倒計時閂減到0
System.out.println("All jobs have been finished!");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
ConcurrentHashMap
ConcurrentHashMap是HashMap在并發(fā)環(huán)境下的版本,大家可能要問该窗,既然已經(jīng)可以通過Collections.synchronizedMap獲得線程安全的映射型容器弟蚀,為什么還需要ConcurrentHashMap呢?因為通過Collections工具類獲得的線程安全的HashMap會在讀寫數(shù)據(jù)時對整個容器對象上鎖酗失,這樣其他使用該容器的線程無論如何也無法再獲得該對象的鎖义钉,也就意味著要一直等待前一個獲得鎖的線程離開同步代碼塊之后才有機會執(zhí)行。實際上规肴,HashMap是通過哈希函數(shù)來確定存放鍵值對的桶(桶是為了解決哈希沖突而引入的)捶闸,修改HashMap時并不需要將整個容器鎖住夜畴,只需要鎖住即將修改的“桶”就可以了。HashMap的數(shù)據(jù)結構如下圖所示删壮。
此外贪绘,ConcurrentHashMap還提供了原子操作的方法,如下所示:
- putIfAbsent:如果還沒有對應的鍵值對映射央碟,就將其添加到HashMap中税灌。
- remove:如果鍵存在而且值與當前狀態(tài)相等(equals比較結果為true),則用原子方式移除該鍵值對映射
- replace:替換掉映射中元素的原子操作
CopyOnWriteArrayList
CopyOnWriteArrayList是ArrayList在并發(fā)環(huán)境下的替代品亿虽。CopyOnWriteArrayList通過增加寫時復制語義來避免并發(fā)訪問引起的問題菱涤,也就是說任何修改操作都會在底層創(chuàng)建一個列表的副本,也就意味著之前已有的迭代器不會碰到意料之外的修改洛勉。這種方式對于不要嚴格讀寫同步的場景非常有用粘秆,因為它提供了更好的性能。記住收毫,要盡量減少鎖的使用攻走,因為那勢必帶來性能的下降(對數(shù)據(jù)庫中數(shù)據(jù)的并發(fā)訪問不也是如此嗎?如果可以的話就應該放棄悲觀鎖而使用樂觀鎖)此再,CopyOnWriteArrayList很明顯也是通過犧牲空間獲得了時間(在計算機的世界里陋气,時間和空間通常是不可調和的矛盾,可以犧牲空間來提升效率獲得時間引润,當然也可以通過犧牲時間來減少對空間的使用)巩趁。
可以通過下面兩段代碼的運行狀況來驗證一下CopyOnWriteArrayList是不是線程安全的容器。
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
class AddThread implements Runnable {
private List<Double> list;
public AddThread(List<Double> list) {
this.list = list;
}
@Override
public void run() {
for(int i = 0; i < 10000; ++i) {
list.add(Math.random());
}
}
}
public class Test05 {
private static final int THREAD_POOL_SIZE = 2;
public static void main(String[] args) {
List<Double> list = new ArrayList<>();
ExecutorService es = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
es.execute(new AddThread(list));
es.execute(new AddThread(list));
es.shutdown();
}
}
上面的代碼會在運行時產(chǎn)生ArrayIndexOutOfBoundsException淳附,試一試將上面代碼25行的ArrayList換成CopyOnWriteArrayList再重新運行议慰。
List<Double> list = new CopyOnWriteArrayList<>();
Queue
隊列是一個無處不在的美妙概念,它提供了一種簡單又可靠的方式將資源分發(fā)給處理單元(也可以說是將工作單元分配給待處理的資源奴曙,這取決于你看待問題的方式)别凹。實現(xiàn)中的并發(fā)編程模型很多都依賴隊列來實現(xiàn),因為它可以在線程之間傳遞工作單元洽糟。
Java 5中的BlockingQueue就是一個在并發(fā)環(huán)境下非常好用的工具炉菲,在調用put方法向隊列中插入元素時,如果隊列已滿坤溃,它會讓插入元素的線程等待隊列騰出空間拍霜;在調用take方法從隊列中取元素時,如果隊列為空薪介,取出元素的線程就會阻塞祠饺。
可以用BlockingQueue來實現(xiàn)生產(chǎn)者-消費者并發(fā)模型(下一節(jié)中有介紹),當然在Java 5以前也可以通過wait和notify來實現(xiàn)線程調度汁政,比較一下兩種代碼就知道基于已有的并發(fā)工具類來重構并發(fā)代碼到底好在哪里了道偷。
- 基于wait和notify的實現(xiàn)
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* 公共常量
* @author 駱昊
*
*/
class Constants {
public static final int MAX_BUFFER_SIZE = 10;
public static final int NUM_OF_PRODUCER = 2;
public static final int NUM_OF_CONSUMER = 3;
}
/**
* 工作任務
* @author 駱昊
*
*/
class Task {
private String id; // 任務的編號
public Task() {
id = UUID.randomUUID().toString();
}
@Override
public String toString() {
return "Task[" + id + "]";
}
}
/**
* 消費者
* @author 駱昊
*
*/
class Consumer implements Runnable {
private List<Task> buffer;
public Consumer(List<Task> buffer) {
this.buffer = buffer;
}
@Override
public void run() {
while(true) {
synchronized(buffer) {
while(buffer.isEmpty()) {
try {
buffer.wait();
} catch(InterruptedException e) {
e.printStackTrace();
}
}
Task task = buffer.remove(0);
buffer.notifyAll();
System.out.println("Consumer[" + Thread.currentThread().getName() + "] got " + task);
}
}
}
}
/**
* 生產(chǎn)者
* @author 駱昊
*
*/
class Producer implements Runnable {
private List<Task> buffer;
public Producer(List<Task> buffer) {
this.buffer = buffer;
}
@Override
public void run() {
while(true) {
synchronized (buffer) {
while(buffer.size() >= Constants.MAX_BUFFER_SIZE) {
try {
buffer.wait();
} catch(InterruptedException e) {
e.printStackTrace();
}
}
Task task = new Task();
buffer.add(task);
buffer.notifyAll();
System.out.println("Producer[" + Thread.currentThread().getName() + "] put " + task);
}
}
}
}
public class Test06 {
public static void main(String[] args) {
List<Task> buffer = new ArrayList<>(Constants.MAX_BUFFER_SIZE);
ExecutorService es = Executors.newFixedThreadPool(Constants.NUM_OF_CONSUMER + Constants.NUM_OF_PRODUCER);
for(int i = 1; i <= Constants.NUM_OF_PRODUCER; ++i) {
es.execute(new Producer(buffer));
}
for(int i = 1; i <= Constants.NUM_OF_CONSUMER; ++i) {
es.execute(new Consumer(buffer));
}
}
}
- 基于BlockingQueue的實現(xiàn)
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
/**
* 公共常量
* @author 駱昊
*
*/
class Constants {
public static final int MAX_BUFFER_SIZE = 10;
public static final int NUM_OF_PRODUCER = 2;
public static final int NUM_OF_CONSUMER = 3;
}
/**
* 工作任務
* @author 駱昊
*
*/
class Task {
private String id; // 任務的編號
public Task() {
id = UUID.randomUUID().toString();
}
@Override
public String toString() {
return "Task[" + id + "]";
}
}
/**
* 消費者
* @author 駱昊
*
*/
class Consumer implements Runnable {
private BlockingQueue<Task> buffer;
public Consumer(BlockingQueue<Task> buffer) {
this.buffer = buffer;
}
@Override
public void run() {
while(true) {
try {
Task task = buffer.take();
System.out.println("Consumer[" + Thread.currentThread().getName() + "] got " + task);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
/**
* 生產(chǎn)者
* @author 駱昊
*
*/
class Producer implements Runnable {
private BlockingQueue<Task> buffer;
public Producer(BlockingQueue<Task> buffer) {
this.buffer = buffer;
}
@Override
public void run() {
while(true) {
try {
Task task = new Task();
buffer.put(task);
System.out.println("Producer[" + Thread.currentThread().getName() + "] put " + task);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public class Test07 {
public static void main(String[] args) {
BlockingQueue<Task> buffer = new LinkedBlockingQueue<>(Constants.MAX_BUFFER_SIZE);
ExecutorService es = Executors.newFixedThreadPool(Constants.NUM_OF_CONSUMER + Constants.NUM_OF_PRODUCER);
for(int i = 1; i <= Constants.NUM_OF_PRODUCER; ++i) {
es.execute(new Producer(buffer));
}
for(int i = 1; i <= Constants.NUM_OF_CONSUMER; ++i) {
es.execute(new Consumer(buffer));
}
}
}
使用BlockingQueue后代碼優(yōu)雅了很多缀旁。
并發(fā)模型
在繼續(xù)下面的探討之前,我們還是重溫一下幾個概念:
- 概念和解釋
臨界資源:并發(fā)環(huán)境中有著固定數(shù)量的資源
互斥:對資源的訪問是排他式的
饑餓:一個或一組線程長時間或永遠無法取得進展
死鎖:兩個或多個線程相互等待對方結束
活鎖:想要執(zhí)行的線程總是發(fā)現(xiàn)其他的線程正在執(zhí)行以至于長時間或永遠無法執(zhí)行
重溫了這幾個概念后勺鸦,我們可以探討一下下面的幾種并發(fā)模型并巍。
生產(chǎn)者-消費者
一個或多個生產(chǎn)者創(chuàng)建某些工作并將其置于緩沖區(qū)或隊列中,一個或多個消費者會從隊列中獲得這些工作并完成之换途。這里的緩沖區(qū)或隊列是臨界資源履澳。當緩沖區(qū)或隊列放滿的時候,生產(chǎn)這會被阻塞怀跛;而緩沖區(qū)或隊列為空的時候,消費者會被阻塞柄冲。生產(chǎn)者和消費者的調度是通過二者相互交換信號完成的吻谋。
讀者-寫者
當存在一個主要為讀者提供信息的共享資源,它偶爾會被寫者更新现横,但是需要考慮系統(tǒng)的吞吐量漓拾,又要防止饑餓和陳舊資源得不到更新的問題。在這種并發(fā)模型中戒祠,如何平衡讀者和寫者是最困難的骇两,當然這個問題至今還是一個被熱議的問題,恐怕必須根據(jù)具體的場景來提供合適的解決方案而沒有那種放之四海而皆準的方法(不像我在國內的科研文獻中看到的那樣)姜盈。
哲學家進餐
1965年低千,荷蘭計算機科學家圖靈獎得主Edsger Wybe Dijkstra提出并解決了一個他稱之為哲學家進餐的同步問題。這個問題可以簡單地描述如下:五個哲學家圍坐在一張圓桌周圍馏颂,每個哲學家面前都有一盤通心粉示血。由于通心粉很滑,所以需要兩把叉子才能夾住救拉。相鄰兩個盤子之間放有一把叉子如下圖所示难审。哲學家的生活中有兩種交替活動時段:即吃飯和思考。當一個哲學家覺得餓了時亿絮,他就試圖分兩次去取其左邊和右邊的叉子告喊,每次拿一把,但不分次序派昧。如果成功地得到了兩把叉子黔姜,就開始吃飯,吃完后放下叉子繼續(xù)思考蒂萎。
??把上面問題中的哲學家換成線程地淀,把叉子換成競爭的臨界資源,上面的問題就是線程競爭資源的問題岖是。如果沒有經(jīng)過精心的設計帮毁,系統(tǒng)就會出現(xiàn)死鎖实苞、活鎖、吞吐量下降等問題。
下面是用信號量原語來解決哲學家進餐問題的代碼,使用了Java 5并發(fā)工具包中的Semaphore類(代碼不夠漂亮但是已經(jīng)足以說明問題了)爵卒。
//import java.util.concurrent.ExecutorService;
//import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
/**
* 存放線程共享信號量的上下問
* @author 駱昊
*
*/
class AppContext {
public static final int NUM_OF_FORKS = 5; // 叉子數(shù)量(資源)
public static final int NUM_OF_PHILO = 5; // 哲學家數(shù)量(線程)
public static Semaphore[] forks; // 叉子的信號量
public static Semaphore counter; // 哲學家的信號量
static {
forks = new Semaphore[NUM_OF_FORKS];
for (int i = 0, len = forks.length; i < len; ++i) {
forks[i] = new Semaphore(1); // 每個叉子的信號量為1
}
counter = new Semaphore(NUM_OF_PHILO - 1); // 如果有N個哲學家毙籽,最多只允許N-1人同時取叉子
}
/**
* 取得叉子
* @param index 第幾個哲學家
* @param leftFirst 是否先取得左邊的叉子
* @throws InterruptedException
*/
public static void putOnFork(int index, boolean leftFirst) throws InterruptedException {
if(leftFirst) {
forks[index].acquire();
forks[(index + 1) % NUM_OF_PHILO].acquire();
}
else {
forks[(index + 1) % NUM_OF_PHILO].acquire();
forks[index].acquire();
}
}
/**
* 放回叉子
* @param index 第幾個哲學家
* @param leftFirst 是否先放回左邊的叉子
* @throws InterruptedException
*/
public static void putDownFork(int index, boolean leftFirst) throws InterruptedException {
if(leftFirst) {
forks[index].release();
forks[(index + 1) % NUM_OF_PHILO].release();
}
else {
forks[(index + 1) % NUM_OF_PHILO].release();
forks[index].release();
}
}
}
/**
* 哲學家
* @author 駱昊
*
*/
class Philosopher implements Runnable {
private int index; // 編號
private String name; // 名字
public Philosopher(int index, String name) {
this.index = index;
this.name = name;
}
@Override
public void run() {
while(true) {
try {
AppContext.counter.acquire();
boolean leftFirst = index % 2 == 0;
AppContext.putOnFork(index, leftFirst);
System.out.println(name + "正在吃意大利面(通心粉)..."); // 取到兩個叉子就可以進食
AppContext.putDownFork(index, leftFirst);
AppContext.counter.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public class Test04 {
public static void main(String[] args) {
String[] names = { "駱昊", "王大錘", "張三豐", "楊過", "李莫愁" }; // 5位哲學家的名字
// ExecutorService es = Executors.newFixedThreadPool(AppContext.NUM_OF_PHILO); // 創(chuàng)建固定大小的線程池
// for(int i = 0, len = names.length; i < len; ++i) {
// es.execute(new Philosopher(i, names[i])); // 啟動線程
// }
// es.shutdown();
for(int i = 0, len = names.length; i < len; ++i) {
new Thread(new Philosopher(i, names[i])).start();
}
}
}
現(xiàn)實中的并發(fā)問題基本上都是這三種模型或者是這三種模型的變體。
測試并發(fā)代碼
對并發(fā)代碼的測試也是非常棘手的事情蔓搞,棘手到無需說明大家也很清楚的程度,所以這里我們只是探討一下如何解決這個棘手的問題。我們建議大家編寫一些能夠發(fā)現(xiàn)問題的測試并經(jīng)常性的在不同的配置和不同的負載下運行這些測試金赦。不要忽略掉任何一次失敗的測試,線程代碼中的缺陷可能在上萬次測試中僅僅出現(xiàn)一次对嚼。具體來說有這么幾個注意事項:
不要將系統(tǒng)的失效歸結于偶發(fā)事件夹抗,就像拉不出屎的時候不能怪地球沒有引力。
先讓非并發(fā)代碼工作起來纵竖,不要試圖同時找到并發(fā)和非并發(fā)代碼中的缺陷漠烧。
編寫可以在不同配置環(huán)境下運行的線程代碼。
編寫容易調整的線程代碼靡砌,這樣可以調整線程使性能達到最優(yōu)已脓。
讓線程的數(shù)量多于CPU或CPU核心的數(shù)量,這樣CPU調度切換過程中潛在的問題才會暴露出來通殃。
讓并發(fā)代碼在不同的平臺上運行度液。
通過自動化或者硬編碼的方式向并發(fā)代碼中加入一些輔助測試的代碼。
Java 7的并發(fā)編程
Java 7中引入了TransferQueue画舌,它比BlockingQueue多了一個叫transfer的方法恨诱,如果接收線程處于等待狀態(tài),該操作可以馬上將任務交給它骗炉,否則就會阻塞直至取走該任務的線程出現(xiàn)照宝。可以用TransferQueue代替BlockingQueue句葵,因為它可以獲得更好的性能厕鹃。
??剛才忘記了一件事情,Java 5中還引入了Callable接口乍丈、Future接口和FutureTask接口剂碴,通過他們也可以構建并發(fā)應用程序,代碼如下所示轻专。
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class Test07 {
private static final int POOL_SIZE = 10;
static class CalcThread implements Callable<Double> {
private List<Double> dataList = new ArrayList<>();
public CalcThread() {
for(int i = 0; i < 10000; ++i) {
dataList.add(Math.random());
}
}
@Override
public Double call() throws Exception {
double total = 0;
for(Double d : dataList) {
total += d;
}
return total / dataList.size();
}
}
public static void main(String[] args) {
List<Future<Double>> fList = new ArrayList<>();
ExecutorService es = Executors.newFixedThreadPool(POOL_SIZE);
for(int i = 0; i < POOL_SIZE; ++i) {
fList.add(es.submit(new CalcThread()));
}
for(Future<Double> f : fList) {
try {
System.out.println(f.get());
} catch (Exception e) {
e.printStackTrace();
}
}
es.shutdown();
}
}
Callable接口也是一個單方法接口忆矛,顯然這是一個回調方法,類似于函數(shù)式編程中的回調函數(shù),在Java 8 以前催训,Java中還不能使用Lambda表達式來簡化這種函數(shù)式編程洽议。和Runnable接口不同的是Callable接口的回調方法call方法會返回一個對象,這個對象可以用將來時的方式在線程執(zhí)行結束的時候獲得信息漫拭。上面代碼中的call方法就是將計算出的10000個0到1之間的隨機小數(shù)的平均值返回亚兄,我們通過一個Future接口的對象得到了這個返回值。目前最新的Java版本中采驻,Callable接口和Runnable接口都被打上了@FunctionalInterface的注解审胚,也就是說它可以用函數(shù)式編程的方式(Lambda表達式)創(chuàng)建接口對象。
下面是Future接口的主要方法:
- get():獲取結果礼旅。如果結果還沒有準備好膳叨,get方法會阻塞直到取得結果;當然也可以通過參數(shù)設置阻塞超時時間痘系。
- cancel():在運算結束前取消菲嘴。
- isDone():可以用來判斷運算是否結束。
Java 7中還提供了分支/合并(fork/join)框架碎浇,它可以實現(xiàn)線程池中任務的自動調度,并且這種調度對用戶來說是透明的璃俗。為了達到這種效果奴璃,必須按照用戶指定的方式對任務進行分解,然后再將分解出的小型任務的執(zhí)行結果合并成原來任務的執(zhí)行結果城豁。這顯然是運用了分治法(divide-and-conquer)的思想苟穆。下面的代碼使用了分支/合并框架來計算1到10000的和,當然對于如此簡單的任務根本不需要分支/合并框架唱星,因為分支和合并本身也會帶來一定的開銷雳旅,但是這里我們只是探索一下在代碼中如何使用分支/合并框架,讓我們的代碼能夠充分利用現(xiàn)代多核CPU的強大運算能力间聊。
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;
class Calculator extends RecursiveTask<Integer> {
private static final long serialVersionUID = 7333472779649130114L;
private static final int THRESHOLD = 10;
private int start;
private int end;
public Calculator(int start, int end) {
this.start = start;
this.end = end;
}
@Override
public Integer compute() {
int sum = 0;
if ((end - start) < THRESHOLD) { // 當問題分解到可求解程度時直接計算結果
for (int i = start; i <= end; i++) {
sum += i;
}
} else {
int middle = (start + end) >>> 1;
// 將任務一分為二
Calculator left = new Calculator(start, middle);
Calculator right = new Calculator(middle + 1, end);
left.fork();
right.fork();
// 注意:由于此處是遞歸式的任務分解攒盈,也就意味著接下來會二分為四,四分為八...
sum = left.join() + right.join(); // 合并兩個子任務的結果
}
return sum;
}
}
public class Test08 {
public static void main(String[] args) throws Exception {
ForkJoinPool forkJoinPool = new ForkJoinPool();
Future<Integer> result = forkJoinPool.submit(new Calculator(1, 10000));
System.out.println(result.get());
}
}
伴隨著Java 7的到來哎榴,Java中默認的數(shù)組排序算法已經(jīng)不再是經(jīng)典的快速排序(雙樞軸快速排序)了型豁,新的排序算法叫TimSort,它是歸并排序和插入排序的混合體尚蝌,TimSort可以通過分支合并框架充分利用現(xiàn)代處理器的多核特性迎变,從而獲得更好的性能(更短的排序時間)。
版權聲明:本文為CSDN博主「駱昊」的原創(chuàng)文章
原文鏈接:https://blog.csdn.net/jackfrued/article/details/44499227