1.概述
1.1并發(fā)還是并行(Concurrent or Parallel)
- A concurrent program has multiple logical threads of control. These threads may or may not run in parallel.
- A parallel program may or may not have more than one logical thread of control.
- 并發(fā)是問題域中的概念:程序需要設(shè)計(jì)成能夠處理多個(gè)同時(shí)(幾乎同時(shí))發(fā)生的事件;而并行則是方法域中的概念:通過(guò)將問題中的多個(gè)部分并發(fā)執(zhí)行,來(lái)加速解決問題.
- 并發(fā)是同一時(shí)間應(yīng)對(duì)(dealing with)多件事件的處理能力;并行是同一時(shí)間動(dòng)手做(doing)多件事情的能力
- 可以同時(shí)處理多個(gè)問題,但是每次只做一件事,是并發(fā).
我妻子是一位教師停巷。與眾多教師一樣,她極其善于處理多個(gè)任務(wù)芜茵。她雖然每次只能做一件事臀栈,但可以并發(fā)地處理多個(gè)任務(wù)鞋囊。比如,在聽一位學(xué)生朗讀的時(shí)候,她可以暫停學(xué)生的朗讀尿招,以維持課堂秩序,或者回答學(xué)生的問題阱驾。這是并發(fā)就谜,但并不并行(因?yàn)閮H有她一個(gè)人,某一時(shí)刻只能進(jìn)行一件事)里覆。
但如果還有一位助教丧荐,則她們中一位可以聆聽朗讀,而同時(shí)另一位可以回答問題喧枷。這種方式既是并發(fā)虹统,也是并行弓坞。
假設(shè)班級(jí)設(shè)計(jì)了自己的賀卡并要批量制作。一種方法是讓每位學(xué)生制作五枚賀卡车荔。這種方法是并行渡冻,而(從整體看)不是并發(fā),因?yàn)檫@個(gè)過(guò)程整體來(lái)說(shuō)只有一個(gè)任務(wù)忧便。
并發(fā)和并行經(jīng)常被混淆的一個(gè)原因是族吻,傳統(tǒng)的“線程與鎖”模型并沒有顯式支持并行。如果要用線程與鎖模型為多核進(jìn)行開發(fā)珠增,唯一的選擇就是寫一個(gè)并發(fā)的程序超歌,讓其并行地運(yùn)行在多核上。
并發(fā)程序通常是不確定的, 并行程序可能是確定的. 使用一門直接支持并行的編程語(yǔ)言可以寫出并行程序蒂教,而不會(huì)引入不入不 確定性.
1.2 并行架構(gòu)
位級(jí)(bit-level)并行
- 32位計(jì)算機(jī)比8位計(jì)算機(jī)運(yùn)行速度更快,因?yàn)椴⑿?對(duì)于兩個(gè)32位數(shù)的加法巍举,8位計(jì)算 機(jī)必須進(jìn)行多次8位計(jì)算,而32位計(jì)算機(jī)可以一步完成悴品,即并行地處理32位數(shù)的4字節(jié)禀综。
指令級(jí)(instruction-level)并行
- 現(xiàn)代CPU的并行度很高,其中使用的技術(shù)包括流水線苔严、亂序執(zhí)行和猜測(cè)執(zhí)行等定枷。入多核時(shí)代,我們必須面對(duì)的情況是:無(wú)論是表面上還是實(shí)質(zhì)上届氢,指令都不再串行 執(zhí)行了欠窒。這個(gè)就是內(nèi)存可見性的問題啊,由于cpu對(duì)指令的重排序!!重排序問題的引入
數(shù)據(jù)級(jí)(data)并行 數(shù)據(jù)級(jí)并行
- 圖像處理就是一種適合進(jìn)行數(shù)據(jù)級(jí)并行的場(chǎng)景。比如退子,為了增加圖片亮度就需要增加每一個(gè)像 素的亮度♂現(xiàn)代GPU(圖形處理器)也因圖像處理的特點(diǎn)而演化成了極其強(qiáng)大的數(shù)據(jù)并行處理器。
任務(wù)級(jí)(task-level)并行
- 終于來(lái)到了大家所默認(rèn)的并行形式——多處理器寂祥。從程序員的角度來(lái)看荐虐,多處理器架構(gòu)最明 顯的分類特征是其內(nèi)存模型(共享內(nèi)存模型或分布式內(nèi)存模型)。
- 共享內(nèi)存模型(通過(guò)內(nèi)存通信)
- 分布式內(nèi)存模型( 通過(guò)網(wǎng)絡(luò)通信 )
1.3 并發(fā):不只是多核
- 并發(fā)的世界丸凭,并發(fā)的軟件
- 分布式的世界福扬,分布式的軟件
- 不可預(yù)測(cè)的世界,容錯(cuò)性強(qiáng)的軟件
- 復(fù)雜的世界惜犀,簡(jiǎn)單的軟件
- 在選對(duì)編程語(yǔ)言和工具的情況下铛碑,比起串行的等價(jià)解決方案,一個(gè)并發(fā)的解決方案會(huì)更簡(jiǎn)潔清晰虽界。
- 如果解決方案有著與問題類似的并發(fā)結(jié)構(gòu)汽烦,就會(huì)簡(jiǎn)單許多:我們不需要?jiǎng)?chuàng)建一個(gè)復(fù)雜的線程來(lái)處理問題中的多個(gè)任務(wù),只需要用多個(gè)簡(jiǎn)單的線程分別處理不同的任務(wù)即可莉御。
1.4 七個(gè)模型
線程與鎖
- 線程與鎖模型有很多眾所周知的不足撇吞,但仍是其他模型的技術(shù)基礎(chǔ)俗冻,也是很多并發(fā)軟件開發(fā)的首選。
函數(shù)式編程
- 函數(shù)式編程日漸重要的原因之一梢夯,是其對(duì)并發(fā)編程和并行編程提供了良好的支持言疗。函數(shù)式編程消除了可變狀態(tài),所以從根本上是線程安全的颂砸,而且易于并行執(zhí)行噪奄。
Clojure之道——分離標(biāo)識(shí)與狀態(tài)
- 編程語(yǔ)言Clojure是一種指令式編程和函數(shù)式編程的混搭方案,在兩種編程方式上取得了微妙的平衡來(lái)發(fā)揮兩者的優(yōu)勢(shì)人乓。
actor
- actor模型是一種適用性很廣的并發(fā)編程模型勤篮,適用于共享內(nèi)存模型和分布式內(nèi)存模型,也適合解決地理分布型問題色罚,能提供強(qiáng)大的容錯(cuò)性碰缔。
通信順序進(jìn)程(Communicating Sequential Processes,CSP)
- 表面上看戳护,CSP模型與actor模型很相似金抡,兩者都基于消息傳遞。不過(guò)CSP模型側(cè)重于傳遞信息的通道腌且,而actor模型側(cè)重于通道兩端的實(shí)體梗肝,使用CSP模型的代碼會(huì)帶有明顯不同的風(fēng)格。
數(shù)據(jù)級(jí)并行
- 每個(gè)筆記本電腦里都藏著一臺(tái)超級(jí)計(jì)算機(jī)——GPU铺董。GPU利用了數(shù)據(jù)級(jí)并行巫击,不僅可以快速進(jìn)行圖像處理,也可以用于更廣闊的領(lǐng)域精续。如果要進(jìn)行有限元分析坝锰、流體力學(xué)計(jì)算或其他的大量數(shù)字計(jì)算,GPU的性能將是不二選擇重付。
Lambda架構(gòu)
- 大數(shù)據(jù)時(shí)代的到來(lái)離不開并行——現(xiàn)在我們只需要增加計(jì)算資源顷级,就能具有處理TB級(jí)數(shù)據(jù)的能力。Lambda架構(gòu)綜合了MapReduce和流式處理的特點(diǎn)确垫,是一種可以處理多種大數(shù)據(jù)問題的架構(gòu)愕把。
2.線程與鎖
2.1 簡(jiǎn)單粗暴
- 線程與鎖其實(shí)是是對(duì)底層硬件運(yùn)行過(guò)程的形式化.這是他的最大優(yōu)點(diǎn)也是最大缺點(diǎn)
- 現(xiàn)在的優(yōu)秀代碼很少直接使用底層服務(wù):不應(yīng)在產(chǎn)品代碼上直接使用Thread類等底層服務(wù)
2.2 第一天:互斥和內(nèi)存模型
競(jìng)態(tài)條件
內(nèi)存可見性
class Counter {
private int count = 0;
public synchronized void increment() { ++count; } ?
public int getCount() { return count; }
}
- 這段代碼沒有競(jìng)態(tài)條件的bug 但是又內(nèi)存可見性的bug 因?yàn)間etCount()沒有加鎖,調(diào)用getCount()可能獲得一個(gè)失效的值
死鎖
- 哲學(xué)家進(jìn)餐問題
class Philosopher extends Thread {
private Chopstick left, right;
private Random random;
public Philosopher(Chopstick left, Chopstick right) {
this.left = left; this.right = right;
random = new Random();
}
public void run() {
try {
while(true) {
Thread.sleep(random.nextInt(1000)); // Think for a while
synchronized(left) { // Grab left chopstick //
synchronized(right) { // Grab right chopstick // 15
Thread.sleep(random.nextInt(1000)); // Eat for a while
}
}
}
} catch(InterruptedException e) {}
}
}
- 創(chuàng)建五個(gè)哲學(xué)家實(shí)例,這個(gè)程序可以愉快的運(yùn)行很久,但到某個(gè)時(shí)刻一切會(huì)停下來(lái):如果所有哲學(xué)家同時(shí)進(jìn)餐,就會(huì)死鎖(相鄰的幾個(gè)同時(shí)準(zhǔn)備進(jìn)餐還不至于會(huì)死鎖)
- 一個(gè)線程想使用多把鎖的時(shí)候就要考慮死鎖的可能,有一個(gè)簡(jiǎn)單的規(guī)則還有避免死鎖:總是按照一個(gè)全局的固定的順序獲取多把鎖,其中一種實(shí)現(xiàn)如下:
class Philosopher extends Thread {
private Chopstick first, second;
private Random random;
private int thinkCount;
public Philosopher(Chopstick left, Chopstick right) {
if(left.getId() < right.getId()) {
first = left; second = right;
} else {
first = right; second = left;
}
random = new Random();
}
public void run() {
try {
while(true) {
++thinkCount;
if (thinkCount % 10 == 0)
System.out.println("Philosopher " + this + " has thought " + thinkCount + " times");
Thread.sleep(random.nextInt(1000)); // Think for a while
synchronized(first) { // Grab first chopstick
synchronized(second) { // Grab second chopstick
Thread.sleep(random.nextInt(1000)); // Eat for a while
}
}
}
} catch(InterruptedException e) {}
}
}
- 程序解釋:當(dāng)所有人同時(shí)決定進(jìn)餐的時(shí)候,ABCD左手分別拿起1234號(hào)筷子(對(duì)于他們小的編號(hào)的筷子還是在左手),這和上面的程序沒啥不同,但是差別就在這個(gè)E,他左邊的筷子是大編號(hào),所以他左手拿的是1,然而1被A拿了,所以他就一只筷子都拿不到,所以D可以正常進(jìn)餐,就不會(huì)死鎖
- 局限:獲取鎖的代碼寫的比較集中的話,有利于維護(hù)這個(gè)全局順序,但是對(duì)于規(guī)模比較大的程序,使用鎖的地方比較零散,各處都遵守這個(gè)順序就顯得不太實(shí)際.
- 技巧:使用對(duì)象的散列值作為鎖的全局順序
- 優(yōu)點(diǎn):適用于所有java對(duì)象,不用為鎖對(duì)象專門定義并維護(hù)一個(gè)順序,
- 缺點(diǎn):但是對(duì)象的散列值不能保證唯一性(雖然幾率很小), 不是迫不得已不要使用
if(System.identityHashCode(left) < System.identityHashCode(right)) {
first = left; second = right;
} else {
first = right; second = left;
}
來(lái)自外星方法的危害
private synchronized void updateProgress(int n) {
for (ProgressListener listener: listeners) // listeners是累的一個(gè)field
listener.onProgress(n);
}
- 上面的方法乍一看好像沒啥問題,但是這個(gè)方法調(diào)用了onProgress()方法,我們對(duì)這個(gè)方法一無(wú)所知, 要是他里面還有一把鎖,就可能會(huì)死鎖
- 解決方案:在遍歷前對(duì)listeners進(jìn)行保護(hù)性復(fù)制(defensive copy),再針對(duì)這份副本進(jìn)行遍歷
private void updateProgress(int n) {
ArrayList<ProgressListener> listenersCopy;
synchronized(this) {
listenersCopy = (ArrayList<ProgressListener>)listeners.clone();
}
for (ProgressListener listener: listenersCopy)
listener.onProgress(n);
}
- 這個(gè)方案一石多鳥,不僅調(diào)用外星方法的時(shí)候不用加鎖,而且還大大減少了代碼持有鎖的時(shí)間(前面是對(duì)方法加synchronized,這里是對(duì)語(yǔ)句塊)
避免危害的準(zhǔn)則
- 1.對(duì)共享變量的所有訪問都需要同步化(讀臟數(shù)據(jù),競(jìng)態(tài)條件)
- 2.讀線程和寫線程都需要同步化(內(nèi)存可見性)
- 3.按照約定的全局順序獲取多把鎖(死鎖)
- 4.當(dāng)持有鎖的時(shí)候避免調(diào)用外星方法(你對(duì)外星方法一無(wú)所知,要是他里面有鎖,就會(huì)死鎖)
- 5.持有鎖的時(shí)間盡可能短
2.3第二天:超越內(nèi)置鎖
- ReentrantLock提供了顯式的lock和unlock
Lock lock = new ReentrantLock();
lock.lock();
try{
//使用共享資源
} finally { //使用finally確保鎖釋放
lock.unlock();
}
可中斷的鎖
- 使用內(nèi)置鎖,由于阻塞的線程無(wú)法被中斷,所以程序不可能從死鎖中恢復(fù),可以用ReentrantLock代替內(nèi)置鎖,使用它的lockInterruptibly 在下面的程序中使用Thread.interrupt()可以讓線程終止(這里說(shuō)的都是死鎖情況下)
final ReentrantLock l1 = new ReentrantLock();
final ReentrantLock l2 = new ReentrantLock();
Thread t1 = new Thread() {
public void run() {
try {
l1.lockInterruptibly();
Thread.sleep(1000);
l2.lockInterruptibly();
} catch (InterruptedException e) { System.out.println("t1 interrupted"); }
}
};
超時(shí)
- ReentrantLock突破了內(nèi)置鎖的另一個(gè)限制:可以為獲取鎖的操作設(shè)置超時(shí)時(shí)間,可以用這種方式來(lái)解決哲學(xué)家進(jìn)餐問題
class Philosopher extends Thread {
private ReentrantLock leftChopstick, rightChopstick;
private Random random;
private int thinkCount;
public Philosopher(ReentrantLock leftChopstick, ReentrantLock rightChopstick) {
this.leftChopstick = leftChopstick; this.rightChopstick = rightChopstick;
random = new Random();
}
public void run() {
try {
while(true) {
++thinkCount;
if (thinkCount % 10 == 0)
System.out.println("Philosopher " + this + " has thought " + thinkCount + " times");
Thread.sleep(random.nextInt(1000)); // Think for a while
leftChopstick.lock();
try {
if (rightChopstick.tryLock(1000, TimeUnit.MILLISECONDS)) {
// Got the right chopstick
try {
Thread.sleep(random.nextInt(1000)); // Eat for a while
} finally { rightChopstick.unlock(); }
} else {
// Didn't get the right chopstick - give up and go back to thinking
System.out.println("Philosopher " + this + " timed out");
}
} finally { leftChopstick.unlock(); }
}
} catch(InterruptedException e) {}
}
}
- 雖然上述代碼不會(huì)死鎖,但也不是一個(gè)足夠好的方案,后面有更好的方案
- 1.這個(gè)方案不能避免死鎖,只能避免無(wú)盡的死鎖 只是提供了從死鎖中恢復(fù)的手段
- 2.會(huì)受到活鎖現(xiàn)象,如果所有死鎖線程同時(shí)超時(shí),它們極有可能再次陷入死鎖,雖然死鎖沒有永遠(yuǎn)持續(xù)下去,但對(duì)資源的爭(zhēng)奪狀況沒有得到任何改善(可以用一些方法減少活鎖的幾率,比如為每個(gè)線程設(shè)置不同的超時(shí)時(shí)間)
交替鎖(hand-over-hand locking)
交替鎖可以只鎖住鏈表的一部分,允許不涉及被鎖部分的其他線程自由訪問鏈表.插入新的鏈表節(jié)點(diǎn)時(shí),需要將待插入位置兩邊的節(jié)點(diǎn)加鎖.首先鎖住鏈表的前兩個(gè)節(jié)點(diǎn),如果這兩個(gè)節(jié)點(diǎn)之間不是待插入的位置,那么就解鎖第一個(gè),并鎖住第三個(gè),以此類推,知道找到待插入位置并插入新的節(jié)點(diǎn),最后解鎖兩邊的節(jié)點(diǎn)
這種交替型的加鎖和解鎖順序無(wú)法用內(nèi)置鎖實(shí)現(xiàn),使用ReentrantLock可以
class ConcurrentSortedList {
private class Node {
int value;
Node prev;
Node next;
ReentrantLock lock = new ReentrantLock();
Node() {}
Node(int value, Node prev, Node next) {
this.value = value; this.prev = prev; this.next = next;
}
}
private final Node head;
private final Node tail;
public ConcurrentSortedList() {
head = new Node(); tail = new Node();
head.next = tail; tail.prev = head;
}
//insert方法是有序的 遍歷列表直到找到第一個(gè)值小于等于新插入的值得節(jié)點(diǎn),在這個(gè)位置插入
public void insert(int value) {
Node current = head;
current.lock.lock();
Node next = current.next;
try {
while (true) {
next.lock.lock();
try {
if (next == tail || next.value < value) {
Node node = new Node(value, current, next);
next.prev = node;
current.next = node;
//!!!這里return要在兩個(gè)finally都執(zhí)行完后才會(huì)執(zhí)行啊!!!但只是finally里的.不過(guò)要是return換成exit(0)就直接退出了
return;
}
} finally { current.lock.unlock(); }
current = next;
next = current.next;
}
} finally { next.lock.unlock(); }
}
public int size() {
Node current = tail;
int count = 0;
while (current.prev != head) {
ReentrantLock lock = current.lock;
lock.lock();
try {
++count;
current = current.prev;
} finally { lock.unlock(); }
}
return count;
}
public boolean isSorted() {
Node current = head;
while (current.next.next != tail) {
current = current.next;
if (current.value < current.next.value)
return false;
}
return true;
}
}
class LinkedList {
public static void main(String[] args) throws InterruptedException {
final ConcurrentSortedList list = new ConcurrentSortedList();
final Random random = new Random();
class TestThread extends Thread {
public void run() {
for (int i = 0; i < 10000; ++i)
list.insert(random.nextInt());
}
}
class CountingThread extends Thread {
public void run() {
while (!interrupted()) {
System.out.print("\r" + list.size());
System.out.flush();
}
}
}
Thread t1 = new TestThread();
Thread t2 = new TestThread();
Thread t3 = new CountingThread();
//注意一下這里的用法 這里先join再interrupted的用法
t1.start(); t2.start(); t3.start();
t1.join(); t2.join();
t3.interrupt();
System.out.println("\r" + list.size());
if (list.size() != 20000)
System.out.println("*** Wrong size!");
if (!list.isSorted())
System.out.println("*** Not sorted!");
}
}
- 26行的 next.lock.lock();鎖住了頭,36行的 next.lock.lock();鎖住了下一個(gè)節(jié)點(diǎn). if ( next == tail || next.value < value )判斷兩個(gè)節(jié)點(diǎn)之間是否是待插入位置,如果不是,在38行的finally解鎖 current.lock.unlock();并繼續(xù)遍歷,如果找到待插入位置,33~36行構(gòu)造新節(jié)點(diǎn)并將其插入鏈表后返回.兩把鎖的解鎖操作在倆finally塊中進(jìn)行
- 這種方案可以讓多個(gè)線程并發(fā)的進(jìn)行鏈表插入操作
條件變量
- 并發(fā)編程經(jīng)常需要等待某個(gè)事件的發(fā)生.比如隊(duì)列刪除元素前需要等待隊(duì)列非空等等
- 按照下面的模式使用條件變量
ReentrantLock lock = new ReentrantLock();
Condition condition = lock.newCondition();
lock.lock();
try {
while (! ? condition is true ? ) {
condition.await();
}
//使用共享資源
} finally { lock.unlock(); }
- 為何要在一個(gè)循環(huán)中循環(huán)調(diào)用await():當(dāng)另一個(gè)線程調(diào)用了signal()或signalAll(),意味著對(duì)應(yīng)的條件可能為真,await()將原子地恢復(fù)運(yùn)行并重新加鎖.從await()返回后需要重新檢查等待的條件是否為真,必要的話可能再次調(diào)用await()并阻塞
- 哲學(xué)家進(jìn)餐問題新解決方法
class Philosopher extends Thread {
private boolean eating;
private Philosopher left;
private Philosopher right;
private ReentrantLock table;
private Condition condition;
private Random random;
private int thinkCount;
public Philosopher ( ReentrantLock table ) {
eating = false;
this.table = table;
condition = table.newCondition();
random = new Random();
}
public void setLeft ( Philosopher left ) {
this.left = left;
}
public void setRight ( Philosopher right ) {
this.right = right;
}
public void run () {
try {
while ( true ) {
think();
eat();
}
}
catch ( InterruptedException e ) {
}
}
private void think () throws InterruptedException {
table.lock();
try {
eating = false;
left.condition.signal();
right.condition.signal();
} finally {
table.unlock();
}
++thinkCount;
if ( thinkCount % 10 == 0 ) {
System.out.println( "Philosopher " + this + " has thought " + thinkCount + " times" );
}
Thread.sleep( 1000 );
}
private void eat () throws InterruptedException {
table.lock();
try {
while ( left.eating || right.eating ) {
condition.await();
}
eating = true;
} finally {
table.unlock();
}
Thread.sleep( 1000 );
}
}
- 現(xiàn)在沒有筷子類,現(xiàn)在僅當(dāng)哲學(xué)家的左右鄰座都沒有進(jìn)餐的時(shí)候,他才可以進(jìn)餐
- 當(dāng)一個(gè)哲學(xué)家饑餓的時(shí)候,他首先鎖住餐桌,這樣其他哲學(xué)家無(wú)法改變狀態(tài)(進(jìn)餐/思考),然后查看左右的哲學(xué)家有沒有在進(jìn)餐,沒有的話開始進(jìn)餐并解鎖餐桌,否則調(diào)用await(),解鎖餐桌
- 當(dāng)一個(gè)哲學(xué)家進(jìn)餐結(jié)束開始思考的時(shí)候,他首先鎖住餐桌并將eating設(shè)置為false,然后通知左鄰右舍可以進(jìn)餐了,最后解鎖餐桌.
- 之前的解決方案經(jīng)常只有一個(gè)哲學(xué)家能進(jìn)餐,其他人都持有一根筷子在等,這個(gè)方案中當(dāng)一個(gè)哲學(xué)家理論上可以進(jìn)餐,他肯定可以進(jìn)餐
原子變量
- 原子變量是無(wú)鎖非阻塞算法的基礎(chǔ)
- volatile是一種低級(jí)形式的同步,他的適用場(chǎng)景也越來(lái)越少,如果你要使用volatile,可以在atomic包中尋找更適合的工具
2.4 站在巨人的肩膀上
寫入時(shí)復(fù)制
- 之前有用到保護(hù)性復(fù)制,Java標(biāo)準(zhǔn)庫(kù)提供了更優(yōu)雅的現(xiàn)成的方案--CopyOnWriteArrayList,他不是在遍歷列表前進(jìn)行復(fù)制,而是在列表被修改時(shí)進(jìn)行
- 先將當(dāng)前容器進(jìn)行Copy,復(fù)制出一個(gè)新的容器森爽,然后新的容器里添加元素,添加完元素之后嚣镜,再將原容器的引用指向新的容器 所以CopyOnWrite容器也是一種讀寫分離的思想爬迟,讀和寫不同的容器。CopyOnWriteArrayList適合使用在讀操作遠(yuǎn)遠(yuǎn)大于寫操作的場(chǎng)景里菊匿,比如緩存
- 缺點(diǎn):
- 內(nèi)存占用問題
- 數(shù)據(jù)一致性問題:CopyOnWrite容器只能保證數(shù)據(jù)的最終一致性付呕,不能保證數(shù)據(jù)的實(shí)時(shí)一致性计福。所以如果你希望寫入的的數(shù)據(jù),馬上能讀到徽职,請(qǐng)不要使用CopyOnWrite容器象颖。
//Downloader.java
private CopyOnWriteArrayList<ProgressListener> listeners;
public void addListener(ProgressListener listener) {
listeners.add(listener);
}
public void removeListener(ProgressListener listener) {
listeners.remove(listener);
}
private void updateProgress(int n) {
for (ProgressListener listener: listeners)
listener.onProgress(n);
}
一個(gè)完整的程序
- Q:wiki上出現(xiàn)頻率最高的詞
版本一的并行:生產(chǎn)者和消費(fèi)者(串行的我略過(guò)了)
//生產(chǎn)者 將page加到隊(duì)尾
class Parser implements Runnable {
private BlockingQueue<Page> queue;
public Parser(BlockingQueue<Page> queue) {
this.queue = queue;
}
public void run() {
try {
Iterable<Page> pages = new Pages(100000, "enwiki.xml");
for (Page page: pages)
queue.put(page);
} catch (Exception e) { e.printStackTrace(); }
}
}
//消費(fèi)者
class Counter implements Runnable {
private BlockingQueue<Page> queue;
private Map<String, Integer> counts;
public Counter(BlockingQueue<Page> queue,
Map<String, Integer> counts) {
this.queue = queue;
this.counts = counts;
}
public void run() {
try {
while(true) {
Page page = queue.take();
if (page.isPoisonPill())
break;
Iterable<String> words = new Words(page.getText());
for (String word: words)
countWord(word);
}
} catch (Exception e) { e.printStackTrace(); }
}
private void countWord(String word) {
Integer currentCount = counts.get(word);
if (currentCount == null)
counts.put(word, 1);
else
counts.put(word, currentCount + 1);
}
}
- 最后創(chuàng)建兩個(gè)線程運(yùn)行
public static void main(String[] args) throws Exception {
ArrayBlockingQueue<Page> queue = new ArrayBlockingQueue<Page>(100);
HashMap<String, Integer> counts = new HashMap<String, Integer>();
Thread counter = new Thread(new Counter(queue, counts));
Thread parser = new Thread(new Parser(queue));
long start = System.currentTimeMillis();
counter.start();
parser.start();
parser.join();
queue.put(new PoisonPill());
counter.join();
long end = System.currentTimeMillis();
System.out.println("Elapsed time: " + (end - start) + "ms");
}
程序解釋:該程序由兩個(gè)線程在跑.一個(gè)讀取一個(gè)分析,性能還不是最高.這里
ArrayBlockingQueue<Page> queue = new ArrayBlockingQueue<Page>(100);
用了一個(gè)阻塞的并發(fā)隊(duì)列來(lái)存放讀取到的page.這個(gè)并發(fā)隊(duì)列很適合實(shí)現(xiàn)生產(chǎn)者消費(fèi)者模式,提供了高效的并發(fā)方法put()和take(),這些方法會(huì)在必要時(shí)阻塞:當(dāng)一個(gè)空隊(duì)列調(diào)用take(一個(gè)滿隊(duì)列調(diào)用put()),程序會(huì)阻塞直到隊(duì)列變?yōu)榉强?非滿)為什么要用阻塞隊(duì)列?
concurrent包不僅提供了阻塞隊(duì)列,還提供了一種容量無(wú)限,操作不需等待,非阻塞的隊(duì)列ConcurrentLinkedQueue.為何不用他?關(guān)鍵在與生產(chǎn)者和消費(fèi)者幾乎不會(huì)保持相同的速度,當(dāng)生產(chǎn)者速度快于消費(fèi)者,生產(chǎn)者越來(lái)越大的時(shí)候,會(huì)撐爆內(nèi)存.相比之下,阻塞隊(duì)列只允許生產(chǎn)者的速度在一定程度上超過(guò)消費(fèi)者的速度,但不會(huì)超過(guò)很多.
第二個(gè)版本:多個(gè)消費(fèi)者
- 上個(gè)版本的解析文件花了10s,統(tǒng)計(jì)花了95s,一共花了95s.進(jìn)一步優(yōu)化就對(duì)統(tǒng)計(jì)過(guò)程進(jìn)行并行化,建立多個(gè)消費(fèi)者.(他這里還是用一個(gè)count,不同的消費(fèi)者都寫這一個(gè)count,所以要加鎖)
private void countWord(String word) {
lock.lock();
try {
Integer currentCount = counts.get(word);
if (currentCount == null) counts.put(word, 1);
else counts.put(word, currentCount + 1);
} finally { lock.unlock(); }
}
- 運(yùn)行多個(gè)消費(fèi)者
ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < NUM_COUNTERS; ++i)
executor.execute(new Counter(queue, counts));
Thread parser = new Thread(new Parser(queue));
parser.start();
parser.join();
for (int i = 0; i < NUM_COUNTERS; ++i)
queue.put(new PoisonPill());
executor.shutdown();
- 但是一運(yùn)行發(fā)現(xiàn)比串行還慢一半.主要原因就是過(guò)多的線程嘗試訪問同一個(gè)共享資源,等待的時(shí)間比運(yùn)行的時(shí)間還長(zhǎng).改用ConcurrentHashMap(使用了鎖分段)
//改用 ConcurrentHashMap
private void countWord(String word) {
while (true) { //理解一下這里的循環(huán) 如果下面的操作沒有成功的話,就重試
Integer currentCount = counts.get(word);
if (currentCount == null) {
if (counts.putIfAbsent(word, 1) == null) //如果沒有與1關(guān)聯(lián) 則關(guān)聯(lián),有原子性
break;
} else if (counts.replace(word, currentCount, currentCount + 1)) {
break;
}
}
- 這次的測(cè)速要好很多,但是沒有理論上的提速.因?yàn)橄M(fèi)者對(duì)conuts有一些不必要的競(jìng)爭(zhēng),與其所有消費(fèi)者都共享一個(gè)counts,不如每個(gè)消費(fèi)者各自維護(hù)一個(gè)計(jì)數(shù)map,再對(duì)這些計(jì)數(shù)map進(jìn)行合并
class Counter implements Runnable {
private BlockingQueue<Page> queue;
private ConcurrentMap<String, Integer> counts;
private HashMap<String, Integer> localCounts;
public Counter(BlockingQueue<Page> queue,
ConcurrentMap<String, Integer> counts) {
this.queue = queue;
this.counts = counts;
localCounts = new HashMap<String, Integer>();
}
public void run() {
try {
while(true) {
Page page = queue.take();
if (page.isPoisonPill())
break;
Iterable<String> words = new Words(page.getText());
for (String word: words)
countWord(word);
}
//所以計(jì)數(shù)的那個(gè)可以是普通的map 他只在自己的線程里
mergeCounts();
} catch (Exception e) { e.printStackTrace(); }
}
private void countWord(String word) {
Integer currentCount = localCounts.get(word);
if (currentCount == null)
localCounts.put(word, 1);
else
localCounts.put(word, currentCount + 1);
}
private void mergeCounts() {
for (Map.Entry<String, Integer> e: localCounts.entrySet()) {
String word = e.getKey();
Integer count = e.getValue();
while (true) {
Integer currentCount = counts.get(word);
if (currentCount == null) {
if (counts.putIfAbsent(word, count) == null)
break;
} else if (counts.replace(word, currentCount, currentCount + count)) {
break;
}
}
}
}
}
第三天總結(jié)
- 1.使用線程池,不要直接創(chuàng)建線程
- 2.使用CopyOnWriteArrayList讓監(jiān)聽器相關(guān)的代碼更簡(jiǎn)單高效
- 3.使用ArrayBlockingQueue讓生產(chǎn)者和消費(fèi)者之間高效合作
- 4.ConcurrentHashMap提供了更好的并發(fā)訪問
2.5 復(fù)習(xí)
- 線程與鎖的缺點(diǎn):沒有為并行提供直接的支持,對(duì)于程序員,編程語(yǔ)言層面沒有提供足夠的幫助
- 應(yīng)用多線程的難點(diǎn)不在編程,而在于難以測(cè)試,多線程的bug很難重現(xiàn).可維護(hù)性更讓人頭疼,如果不能對(duì)多線程問題進(jìn)行可靠的測(cè)試,就無(wú)法對(duì)多線程進(jìn)行可靠的重構(gòu).使用其他不那么底層的模型會(huì)好一些.