七周七并發(fā)(線程與鎖)

1.概述

1.1并發(fā)還是并行(Concurrent or Parallel)

  • A concurrent program has multiple logical threads of control. These threads may or may not run in parallel.
  • A parallel program may or may not have more than one logical thread of control.
  • 并發(fā)是問題域中的概念:程序需要設(shè)計(jì)成能夠處理多個(gè)同時(shí)(幾乎同時(shí))發(fā)生的事件;而并行則是方法域中的概念:通過(guò)將問題中的多個(gè)部分并發(fā)執(zhí)行,來(lái)加速解決問題.
  • 并發(fā)是同一時(shí)間應(yīng)對(duì)(dealing with)多件事件的處理能力;并行是同一時(shí)間動(dòng)手做(doing)多件事情的能力
  • 可以同時(shí)處理多個(gè)問題,但是每次只做一件事,是并發(fā).
  • 我妻子是一位教師停巷。與眾多教師一樣,她極其善于處理多個(gè)任務(wù)芜茵。她雖然每次只能做一件事臀栈,但可以并發(fā)地處理多個(gè)任務(wù)鞋囊。比如,在聽一位學(xué)生朗讀的時(shí)候,她可以暫停學(xué)生的朗讀尿招,以維持課堂秩序,或者回答學(xué)生的問題阱驾。這是并發(fā)就谜,但并不并行(因?yàn)閮H有她一個(gè)人,某一時(shí)刻只能進(jìn)行一件事)里覆。
    但如果還有一位助教丧荐,則她們中一位可以聆聽朗讀,而同時(shí)另一位可以回答問題喧枷。這種方式既是并發(fā)虹统,也是并行弓坞。
    假設(shè)班級(jí)設(shè)計(jì)了自己的賀卡并要批量制作。一種方法是讓每位學(xué)生制作五枚賀卡车荔。這種方法是并行渡冻,而(從整體看)不是并發(fā),因?yàn)檫@個(gè)過(guò)程整體來(lái)說(shuō)只有一個(gè)任務(wù)忧便。

  • 并發(fā)和并行經(jīng)常被混淆的一個(gè)原因是族吻,傳統(tǒng)的“線程與鎖”模型并沒有顯式支持并行。如果要用線程與鎖模型為多核進(jìn)行開發(fā)珠增,唯一的選擇就是寫一個(gè)并發(fā)的程序超歌,讓其并行地運(yùn)行在多核上。

  • 并發(fā)程序通常是不確定的, 并行程序可能是確定的. 使用一門直接支持并行的編程語(yǔ)言可以寫出并行程序蒂教,而不會(huì)引入不入不 確定性.

1.2 并行架構(gòu)

位級(jí)(bit-level)并行

  • 32位計(jì)算機(jī)比8位計(jì)算機(jī)運(yùn)行速度更快,因?yàn)椴⑿?對(duì)于兩個(gè)32位數(shù)的加法巍举,8位計(jì)算 機(jī)必須進(jìn)行多次8位計(jì)算,而32位計(jì)算機(jī)可以一步完成悴品,即并行地處理32位數(shù)的4字節(jié)禀综。

指令級(jí)(instruction-level)并行

  • 現(xiàn)代CPU的并行度很高,其中使用的技術(shù)包括流水線苔严、亂序執(zhí)行和猜測(cè)執(zhí)行等定枷。入多核時(shí)代,我們必須面對(duì)的情況是:無(wú)論是表面上還是實(shí)質(zhì)上届氢,指令都不再串行 執(zhí)行了欠窒。這個(gè)就是內(nèi)存可見性的問題啊,由于cpu對(duì)指令的重排序!!重排序問題的引入

數(shù)據(jù)級(jí)(data)并行 數(shù)據(jù)級(jí)并行

  • 圖像處理就是一種適合進(jìn)行數(shù)據(jù)級(jí)并行的場(chǎng)景。比如退子,為了增加圖片亮度就需要增加每一個(gè)像 素的亮度♂現(xiàn)代GPU(圖形處理器)也因圖像處理的特點(diǎn)而演化成了極其強(qiáng)大的數(shù)據(jù)并行處理器。

任務(wù)級(jí)(task-level)并行

  • 終于來(lái)到了大家所默認(rèn)的并行形式——多處理器寂祥。從程序員的角度來(lái)看荐虐,多處理器架構(gòu)最明 顯的分類特征是其內(nèi)存模型(共享內(nèi)存模型或分布式內(nèi)存模型)。
  • 共享內(nèi)存模型(通過(guò)內(nèi)存通信)
  • 分布式內(nèi)存模型( 通過(guò)網(wǎng)絡(luò)通信 )

1.3 并發(fā):不只是多核

  • 并發(fā)的世界丸凭,并發(fā)的軟件
  • 分布式的世界福扬,分布式的軟件
  • 不可預(yù)測(cè)的世界,容錯(cuò)性強(qiáng)的軟件
  • 復(fù)雜的世界惜犀,簡(jiǎn)單的軟件
  • 在選對(duì)編程語(yǔ)言和工具的情況下铛碑,比起串行的等價(jià)解決方案,一個(gè)并發(fā)的解決方案會(huì)更簡(jiǎn)潔清晰虽界。
  • 如果解決方案有著與問題類似的并發(fā)結(jié)構(gòu)汽烦,就會(huì)簡(jiǎn)單許多:我們不需要?jiǎng)?chuàng)建一個(gè)復(fù)雜的線程來(lái)處理問題中的多個(gè)任務(wù),只需要用多個(gè)簡(jiǎn)單的線程分別處理不同的任務(wù)即可莉御。

1.4 七個(gè)模型

線程與鎖

  • 線程與鎖模型有很多眾所周知的不足撇吞,但仍是其他模型的技術(shù)基礎(chǔ)俗冻,也是很多并發(fā)軟件開發(fā)的首選。

函數(shù)式編程

  • 函數(shù)式編程日漸重要的原因之一梢夯,是其對(duì)并發(fā)編程和并行編程提供了良好的支持言疗。函數(shù)式編程消除了可變狀態(tài),所以從根本上是線程安全的颂砸,而且易于并行執(zhí)行噪奄。

Clojure之道——分離標(biāo)識(shí)與狀態(tài)

  • 編程語(yǔ)言Clojure是一種指令式編程和函數(shù)式編程的混搭方案,在兩種編程方式上取得了微妙的平衡來(lái)發(fā)揮兩者的優(yōu)勢(shì)人乓。

actor

  • actor模型是一種適用性很廣的并發(fā)編程模型勤篮,適用于共享內(nèi)存模型和分布式內(nèi)存模型,也適合解決地理分布型問題色罚,能提供強(qiáng)大的容錯(cuò)性碰缔。

通信順序進(jìn)程(Communicating Sequential Processes,CSP)

  • 表面上看戳护,CSP模型與actor模型很相似金抡,兩者都基于消息傳遞。不過(guò)CSP模型側(cè)重于傳遞信息的通道腌且,而actor模型側(cè)重于通道兩端的實(shí)體梗肝,使用CSP模型的代碼會(huì)帶有明顯不同的風(fēng)格。

數(shù)據(jù)級(jí)并行

  • 每個(gè)筆記本電腦里都藏著一臺(tái)超級(jí)計(jì)算機(jī)——GPU铺董。GPU利用了數(shù)據(jù)級(jí)并行巫击,不僅可以快速進(jìn)行圖像處理,也可以用于更廣闊的領(lǐng)域精续。如果要進(jìn)行有限元分析坝锰、流體力學(xué)計(jì)算或其他的大量數(shù)字計(jì)算,GPU的性能將是不二選擇重付。

Lambda架構(gòu)

  • 大數(shù)據(jù)時(shí)代的到來(lái)離不開并行——現(xiàn)在我們只需要增加計(jì)算資源顷级,就能具有處理TB級(jí)數(shù)據(jù)的能力。Lambda架構(gòu)綜合了MapReduce和流式處理的特點(diǎn)确垫,是一種可以處理多種大數(shù)據(jù)問題的架構(gòu)愕把。

2.線程與鎖

2.1 簡(jiǎn)單粗暴

  • 線程與鎖其實(shí)是是對(duì)底層硬件運(yùn)行過(guò)程的形式化.這是他的最大優(yōu)點(diǎn)也是最大缺點(diǎn)
  • 現(xiàn)在的優(yōu)秀代碼很少直接使用底層服務(wù):不應(yīng)在產(chǎn)品代碼上直接使用Thread類等底層服務(wù)

2.2 第一天:互斥和內(nèi)存模型

競(jìng)態(tài)條件

內(nèi)存可見性

class Counter {
    private int count = 0;
    public synchronized void increment() { ++count; } ?
    public int getCount() { return count; }
}
  • 這段代碼沒有競(jìng)態(tài)條件的bug 但是又內(nèi)存可見性的bug 因?yàn)間etCount()沒有加鎖,調(diào)用getCount()可能獲得一個(gè)失效的值

死鎖

  • 哲學(xué)家進(jìn)餐問題
class Philosopher extends Thread {
    private Chopstick left, right;
    private Random random;

    public Philosopher(Chopstick left, Chopstick right) {
        this.left = left; this.right = right;
                random = new Random();
    }

    public void run() {
        try {
            while(true) {
                    Thread.sleep(random.nextInt(1000)); // Think for a while
                synchronized(left) { // Grab left chopstick //
                    synchronized(right) { // Grab right chopstick // 15
                        Thread.sleep(random.nextInt(1000)); // Eat for a while
                    }
                }
            }
        } catch(InterruptedException e) {}
    }
}
  • 創(chuàng)建五個(gè)哲學(xué)家實(shí)例,這個(gè)程序可以愉快的運(yùn)行很久,但到某個(gè)時(shí)刻一切會(huì)停下來(lái):如果所有哲學(xué)家同時(shí)進(jìn)餐,就會(huì)死鎖(相鄰的幾個(gè)同時(shí)準(zhǔn)備進(jìn)餐還不至于會(huì)死鎖)
  • 一個(gè)線程想使用多把鎖的時(shí)候就要考慮死鎖的可能,有一個(gè)簡(jiǎn)單的規(guī)則還有避免死鎖:總是按照一個(gè)全局的固定的順序獲取多把鎖,其中一種實(shí)現(xiàn)如下:
class Philosopher extends Thread {
    private Chopstick first, second;
    private Random random;
    private int thinkCount;

    public Philosopher(Chopstick left, Chopstick right) {
        if(left.getId() < right.getId()) {
            first = left; second = right;
        } else {
            first = right; second = left;
        }
        random = new Random();
    }

    public void run() {
        try {
            while(true) {
                ++thinkCount;
                if (thinkCount % 10 == 0)
                    System.out.println("Philosopher " + this + " has thought " + thinkCount + " times");
                Thread.sleep(random.nextInt(1000));     // Think for a while
                synchronized(first) {                   // Grab first chopstick
                    synchronized(second) {                // Grab second chopstick
                        Thread.sleep(random.nextInt(1000)); // Eat for a while
                    }
                }
            }
        } catch(InterruptedException e) {}
    }
}
  • 程序解釋:當(dāng)所有人同時(shí)決定進(jìn)餐的時(shí)候,ABCD左手分別拿起1234號(hào)筷子(對(duì)于他們小的編號(hào)的筷子還是在左手),這和上面的程序沒啥不同,但是差別就在這個(gè)E,他左邊的筷子是大編號(hào),所以他左手拿的是1,然而1被A拿了,所以他就一只筷子都拿不到,所以D可以正常進(jìn)餐,就不會(huì)死鎖
  • 局限:獲取鎖的代碼寫的比較集中的話,有利于維護(hù)這個(gè)全局順序,但是對(duì)于規(guī)模比較大的程序,使用鎖的地方比較零散,各處都遵守這個(gè)順序就顯得不太實(shí)際.
  • 技巧:使用對(duì)象的散列值作為鎖的全局順序
  • 優(yōu)點(diǎn):適用于所有java對(duì)象,不用為鎖對(duì)象專門定義并維護(hù)一個(gè)順序,
  • 缺點(diǎn):但是對(duì)象的散列值不能保證唯一性(雖然幾率很小), 不是迫不得已不要使用
if(System.identityHashCode(left) < System.identityHashCode(right)) {
            first = left; second = right;
} else {
            first = right; second = left;
}

來(lái)自外星方法的危害

  private synchronized void updateProgress(int n) {
    for (ProgressListener listener: listeners) // listeners是累的一個(gè)field
      listener.onProgress(n);
  }
  • 上面的方法乍一看好像沒啥問題,但是這個(gè)方法調(diào)用了onProgress()方法,我們對(duì)這個(gè)方法一無(wú)所知, 要是他里面還有一把鎖,就可能會(huì)死鎖
  • 解決方案:在遍歷前對(duì)listeners進(jìn)行保護(hù)性復(fù)制(defensive copy),再針對(duì)這份副本進(jìn)行遍歷

  private void updateProgress(int n) {
    ArrayList<ProgressListener> listenersCopy;
    synchronized(this) {
      listenersCopy = (ArrayList<ProgressListener>)listeners.clone();
    }
    for (ProgressListener listener: listenersCopy)
      listener.onProgress(n);
  }
  • 這個(gè)方案一石多鳥,不僅調(diào)用外星方法的時(shí)候不用加鎖,而且還大大減少了代碼持有鎖的時(shí)間(前面是對(duì)方法加synchronized,這里是對(duì)語(yǔ)句塊)

避免危害的準(zhǔn)則

  • 1.對(duì)共享變量的所有訪問都需要同步化(讀臟數(shù)據(jù),競(jìng)態(tài)條件)
  • 2.讀線程和寫線程都需要同步化(內(nèi)存可見性)
  • 3.按照約定的全局順序獲取多把鎖(死鎖)
  • 4.當(dāng)持有鎖的時(shí)候避免調(diào)用外星方法(你對(duì)外星方法一無(wú)所知,要是他里面有鎖,就會(huì)死鎖)
  • 5.持有鎖的時(shí)間盡可能短

2.3第二天:超越內(nèi)置鎖

  • ReentrantLock提供了顯式的lock和unlock
Lock lock = new ReentrantLock();
lock.lock();
try{
  //使用共享資源

} finally { //使用finally確保鎖釋放
  lock.unlock();
}

可中斷的鎖

  • 使用內(nèi)置鎖,由于阻塞的線程無(wú)法被中斷,所以程序不可能從死鎖中恢復(fù),可以用ReentrantLock代替內(nèi)置鎖,使用它的lockInterruptibly 在下面的程序中使用Thread.interrupt()可以讓線程終止(這里說(shuō)的都是死鎖情況下)
    final ReentrantLock l1 = new ReentrantLock();
    final ReentrantLock l2 = new ReentrantLock();


    Thread t1 = new Thread() {
      public void run() {
        try {
          l1.lockInterruptibly();
          Thread.sleep(1000);
          l2.lockInterruptibly();
        } catch (InterruptedException e) { System.out.println("t1 interrupted"); }
      }
    };

超時(shí)

  • ReentrantLock突破了內(nèi)置鎖的另一個(gè)限制:可以為獲取鎖的操作設(shè)置超時(shí)時(shí)間,可以用這種方式來(lái)解決哲學(xué)家進(jìn)餐問題
class Philosopher extends Thread {
  private ReentrantLock leftChopstick, rightChopstick;
  private Random random;
  private int thinkCount;

  public Philosopher(ReentrantLock leftChopstick, ReentrantLock rightChopstick) {
    this.leftChopstick = leftChopstick; this.rightChopstick = rightChopstick;
    random = new Random();
  }

  public void run() {
    try {
      while(true) {
        ++thinkCount;
        if (thinkCount % 10 == 0)
          System.out.println("Philosopher " + this + " has thought " + thinkCount + " times");
        Thread.sleep(random.nextInt(1000)); // Think for a while
        leftChopstick.lock();
        try {
          if (rightChopstick.tryLock(1000, TimeUnit.MILLISECONDS)) {
            // Got the right chopstick
            try {
              Thread.sleep(random.nextInt(1000)); // Eat for a while
            } finally { rightChopstick.unlock(); }
          } else {
            // Didn't get the right chopstick - give up and go back to thinking
            System.out.println("Philosopher " + this + " timed out");
          }
        } finally { leftChopstick.unlock(); }
      }
    } catch(InterruptedException e) {}
  }
}
  • 雖然上述代碼不會(huì)死鎖,但也不是一個(gè)足夠好的方案,后面有更好的方案
  • 1.這個(gè)方案不能避免死鎖,只能避免無(wú)盡的死鎖 只是提供了從死鎖中恢復(fù)的手段
  • 2.會(huì)受到活鎖現(xiàn)象,如果所有死鎖線程同時(shí)超時(shí),它們極有可能再次陷入死鎖,雖然死鎖沒有永遠(yuǎn)持續(xù)下去,但對(duì)資源的爭(zhēng)奪狀況沒有得到任何改善(可以用一些方法減少活鎖的幾率,比如為每個(gè)線程設(shè)置不同的超時(shí)時(shí)間)

交替鎖(hand-over-hand locking)

  • 交替鎖可以只鎖住鏈表的一部分,允許不涉及被鎖部分的其他線程自由訪問鏈表.插入新的鏈表節(jié)點(diǎn)時(shí),需要將待插入位置兩邊的節(jié)點(diǎn)加鎖.首先鎖住鏈表的前兩個(gè)節(jié)點(diǎn),如果這兩個(gè)節(jié)點(diǎn)之間不是待插入的位置,那么就解鎖第一個(gè),并鎖住第三個(gè),以此類推,知道找到待插入位置并插入新的節(jié)點(diǎn),最后解鎖兩邊的節(jié)點(diǎn)

  • 這種交替型的加鎖和解鎖順序無(wú)法用內(nèi)置鎖實(shí)現(xiàn),使用ReentrantLock可以

class ConcurrentSortedList {
  private class Node {
    int value;
    Node prev;
    Node next;
    ReentrantLock lock = new ReentrantLock();

    Node() {}

    Node(int value, Node prev, Node next) {
      this.value = value; this.prev = prev; this.next = next;
    }
  }

  private final Node head;
  private final Node tail;

  public ConcurrentSortedList() {
    head = new Node(); tail = new Node();
    head.next = tail; tail.prev = head;
  }

  //insert方法是有序的 遍歷列表直到找到第一個(gè)值小于等于新插入的值得節(jié)點(diǎn),在這個(gè)位置插入
  public void insert(int value) {
    Node current = head;
    current.lock.lock();
    Node next = current.next;
    try {
      while (true) {
        next.lock.lock();
        try {
          if (next == tail || next.value < value) {
            Node node = new Node(value, current, next);
            next.prev = node;
            current.next = node;
              //!!!這里return要在兩個(gè)finally都執(zhí)行完后才會(huì)執(zhí)行啊!!!但只是finally里的.不過(guò)要是return換成exit(0)就直接退出了

            return;
          }
        } finally { current.lock.unlock(); }
        current = next;
        next = current.next;
      }
    } finally { next.lock.unlock(); }
  }

  public int size() {
    Node current = tail;
    int count = 0;

    while (current.prev != head) {
      ReentrantLock lock = current.lock;
      lock.lock();
      try {
        ++count;
        current = current.prev;
      } finally { lock.unlock(); }
    }

    return count;
  }

  public boolean isSorted() {
    Node current = head;
    while (current.next.next != tail) {
      current = current.next;
      if (current.value < current.next.value)
        return false;
    }
    return true;
  }
}

class LinkedList {
  public static void main(String[] args) throws InterruptedException {
    final ConcurrentSortedList list = new ConcurrentSortedList();
    final Random random = new Random();

    class TestThread extends Thread {
      public void run() {
        for (int i = 0; i < 10000; ++i)
          list.insert(random.nextInt());
      }
    }

    class CountingThread extends Thread {
      public void run() {
        while (!interrupted()) {
          System.out.print("\r" + list.size());
          System.out.flush();
        }
      }
    }

    Thread t1 = new TestThread();
    Thread t2 = new TestThread();
    Thread t3 = new CountingThread();
    //注意一下這里的用法 這里先join再interrupted的用法
    t1.start(); t2.start(); t3.start();
    t1.join(); t2.join();
    t3.interrupt();
 
    System.out.println("\r" + list.size());
 
    if (list.size() != 20000)
      System.out.println("*** Wrong size!");
 
    if (!list.isSorted())
      System.out.println("*** Not sorted!");
  }
}
  • 26行的 next.lock.lock();鎖住了頭,36行的 next.lock.lock();鎖住了下一個(gè)節(jié)點(diǎn). if ( next == tail || next.value < value )判斷兩個(gè)節(jié)點(diǎn)之間是否是待插入位置,如果不是,在38行的finally解鎖 current.lock.unlock();并繼續(xù)遍歷,如果找到待插入位置,33~36行構(gòu)造新節(jié)點(diǎn)并將其插入鏈表后返回.兩把鎖的解鎖操作在倆finally塊中進(jìn)行
  • 這種方案可以讓多個(gè)線程并發(fā)的進(jìn)行鏈表插入操作

條件變量

  • 并發(fā)編程經(jīng)常需要等待某個(gè)事件的發(fā)生.比如隊(duì)列刪除元素前需要等待隊(duì)列非空等等
  • 按照下面的模式使用條件變量
ReentrantLock lock = new ReentrantLock();
Condition condition = lock.newCondition();
lock.lock();
try {
  while (! ? condition is true ? ) {
    condition.await();
  }
  //使用共享資源
} finally { lock.unlock(); }
  • 為何要在一個(gè)循環(huán)中循環(huán)調(diào)用await():當(dāng)另一個(gè)線程調(diào)用了signal()或signalAll(),意味著對(duì)應(yīng)的條件可能為真,await()將原子地恢復(fù)運(yùn)行并重新加鎖.從await()返回后需要重新檢查等待的條件是否為真,必要的話可能再次調(diào)用await()并阻塞
  • 哲學(xué)家進(jìn)餐問題新解決方法
class Philosopher extends Thread {

    private boolean eating;
    private Philosopher left;
    private Philosopher right;
    private ReentrantLock table;
    private Condition condition;
    private Random random;
    private int thinkCount;

    public Philosopher ( ReentrantLock table ) {
        eating = false;
        this.table = table;
        condition = table.newCondition();
        random = new Random();
    }

    public void setLeft ( Philosopher left ) {
        this.left = left;
    }

    public void setRight ( Philosopher right ) {
        this.right = right;
    }

    public void run () {
        try {
            while ( true ) {
                think();
                eat();
            }
        }
        catch ( InterruptedException e ) {
        }
    }

    private void think () throws InterruptedException {
        table.lock();
        try {
            eating = false;
            left.condition.signal();
            right.condition.signal();
        } finally {
            table.unlock();
        }
        ++thinkCount;
        if ( thinkCount % 10 == 0 ) {
            System.out.println( "Philosopher " + this + " has thought " + thinkCount + " times" );
        }
        Thread.sleep( 1000 );
    }

    private void eat () throws InterruptedException {
        table.lock();
        try {
            while ( left.eating || right.eating ) {
                condition.await();
            }
            eating = true;
        } finally {
            table.unlock();
        }
        Thread.sleep( 1000 );
    }
}
  • 現(xiàn)在沒有筷子類,現(xiàn)在僅當(dāng)哲學(xué)家的左右鄰座都沒有進(jìn)餐的時(shí)候,他才可以進(jìn)餐
  • 當(dāng)一個(gè)哲學(xué)家饑餓的時(shí)候,他首先鎖住餐桌,這樣其他哲學(xué)家無(wú)法改變狀態(tài)(進(jìn)餐/思考),然后查看左右的哲學(xué)家有沒有在進(jìn)餐,沒有的話開始進(jìn)餐并解鎖餐桌,否則調(diào)用await(),解鎖餐桌
  • 當(dāng)一個(gè)哲學(xué)家進(jìn)餐結(jié)束開始思考的時(shí)候,他首先鎖住餐桌并將eating設(shè)置為false,然后通知左鄰右舍可以進(jìn)餐了,最后解鎖餐桌.
  • 之前的解決方案經(jīng)常只有一個(gè)哲學(xué)家能進(jìn)餐,其他人都持有一根筷子在等,這個(gè)方案中當(dāng)一個(gè)哲學(xué)家理論上可以進(jìn)餐,他肯定可以進(jìn)餐

原子變量

  • 原子變量是無(wú)鎖非阻塞算法的基礎(chǔ)
  • volatile是一種低級(jí)形式的同步,他的適用場(chǎng)景也越來(lái)越少,如果你要使用volatile,可以在atomic包中尋找更適合的工具

2.4 站在巨人的肩膀上

寫入時(shí)復(fù)制

  • 之前有用到保護(hù)性復(fù)制,Java標(biāo)準(zhǔn)庫(kù)提供了更優(yōu)雅的現(xiàn)成的方案--CopyOnWriteArrayList,他不是在遍歷列表前進(jìn)行復(fù)制,而是在列表被修改時(shí)進(jìn)行
  • 先將當(dāng)前容器進(jìn)行Copy,復(fù)制出一個(gè)新的容器森爽,然后新的容器里添加元素,添加完元素之后嚣镜,再將原容器的引用指向新的容器 所以CopyOnWrite容器也是一種讀寫分離的思想爬迟,讀和寫不同的容器。CopyOnWriteArrayList適合使用在讀操作遠(yuǎn)遠(yuǎn)大于寫操作的場(chǎng)景里菊匿,比如緩存
  • 缺點(diǎn):
  1. 內(nèi)存占用問題
  2. 數(shù)據(jù)一致性問題:CopyOnWrite容器只能保證數(shù)據(jù)的最終一致性付呕,不能保證數(shù)據(jù)的實(shí)時(shí)一致性计福。所以如果你希望寫入的的數(shù)據(jù),馬上能讀到徽职,請(qǐng)不要使用CopyOnWrite容器象颖。
//Downloader.java
  private CopyOnWriteArrayList<ProgressListener> listeners;


  public void addListener(ProgressListener listener) {
    listeners.add(listener);
  }
  public void removeListener(ProgressListener listener) {
    listeners.remove(listener);
  }
  private void updateProgress(int n) {
    for (ProgressListener listener: listeners)
      listener.onProgress(n);
  }

一個(gè)完整的程序

  • Q:wiki上出現(xiàn)頻率最高的詞

版本一的并行:生產(chǎn)者和消費(fèi)者(串行的我略過(guò)了)

//生產(chǎn)者 將page加到隊(duì)尾
class Parser implements Runnable {
  private BlockingQueue<Page> queue;


  public Parser(BlockingQueue<Page> queue) {
    this.queue = queue;
  }
  
  public void run() {
    try {
      Iterable<Page> pages = new Pages(100000, "enwiki.xml");
      for (Page page: pages)
        queue.put(page);
    } catch (Exception e) { e.printStackTrace(); }
  }
}
//消費(fèi)者
class Counter implements Runnable {
  private BlockingQueue<Page> queue;
  private Map<String, Integer> counts;

  public Counter(BlockingQueue<Page> queue,
                 Map<String, Integer> counts) {
    this.queue = queue;
    this.counts = counts;
  }


  public void run() {
    try {
      while(true) {
        Page page = queue.take();
        if (page.isPoisonPill())
          break;


        Iterable<String> words = new Words(page.getText());
        for (String word: words)
          countWord(word);
      }
    } catch (Exception e) { e.printStackTrace(); }
  }


  private void countWord(String word) {
    Integer currentCount = counts.get(word);
    if (currentCount == null)
      counts.put(word, 1);
    else
      counts.put(word, currentCount + 1);
  }
}


  • 最后創(chuàng)建兩個(gè)線程運(yùn)行
  public static void main(String[] args) throws Exception {
    ArrayBlockingQueue<Page> queue = new ArrayBlockingQueue<Page>(100);
    HashMap<String, Integer> counts = new HashMap<String, Integer>();
 
    Thread counter = new Thread(new Counter(queue, counts));
    Thread parser = new Thread(new Parser(queue));
    long start = System.currentTimeMillis();
 
    counter.start();
    parser.start();
    parser.join();
    queue.put(new PoisonPill());
    counter.join();
    long end = System.currentTimeMillis();
    System.out.println("Elapsed time: " + (end - start) + "ms");
  }
  • 程序解釋:該程序由兩個(gè)線程在跑.一個(gè)讀取一個(gè)分析,性能還不是最高.這里ArrayBlockingQueue<Page> queue = new ArrayBlockingQueue<Page>(100);用了一個(gè)阻塞的并發(fā)隊(duì)列來(lái)存放讀取到的page.這個(gè)并發(fā)隊(duì)列很適合實(shí)現(xiàn)生產(chǎn)者消費(fèi)者模式,提供了高效的并發(fā)方法put()和take(),這些方法會(huì)在必要時(shí)阻塞:當(dāng)一個(gè)空隊(duì)列調(diào)用take(一個(gè)滿隊(duì)列調(diào)用put()),程序會(huì)阻塞直到隊(duì)列變?yōu)榉强?非滿)

  • 為什么要用阻塞隊(duì)列?
    concurrent包不僅提供了阻塞隊(duì)列,還提供了一種容量無(wú)限,操作不需等待,非阻塞的隊(duì)列ConcurrentLinkedQueue.為何不用他?關(guān)鍵在與生產(chǎn)者和消費(fèi)者幾乎不會(huì)保持相同的速度,當(dāng)生產(chǎn)者速度快于消費(fèi)者,生產(chǎn)者越來(lái)越大的時(shí)候,會(huì)撐爆內(nèi)存.相比之下,阻塞隊(duì)列只允許生產(chǎn)者的速度在一定程度上超過(guò)消費(fèi)者的速度,但不會(huì)超過(guò)很多.

第二個(gè)版本:多個(gè)消費(fèi)者

  • 上個(gè)版本的解析文件花了10s,統(tǒng)計(jì)花了95s,一共花了95s.進(jìn)一步優(yōu)化就對(duì)統(tǒng)計(jì)過(guò)程進(jìn)行并行化,建立多個(gè)消費(fèi)者.(他這里還是用一個(gè)count,不同的消費(fèi)者都寫這一個(gè)count,所以要加鎖)
  private void countWord(String word) {
    lock.lock();
    try {
      Integer currentCount = counts.get(word);
      if (currentCount == null)  counts.put(word, 1);
      else  counts.put(word, currentCount + 1);
    } finally { lock.unlock(); }
  }
  • 運(yùn)行多個(gè)消費(fèi)者
    ExecutorService executor = Executors.newCachedThreadPool();
    for (int i = 0; i < NUM_COUNTERS; ++i)
      executor.execute(new Counter(queue, counts));
    Thread parser = new Thread(new Parser(queue));
    parser.start();
    parser.join();
    for (int i = 0; i < NUM_COUNTERS; ++i)
      queue.put(new PoisonPill());
    executor.shutdown();
  • 但是一運(yùn)行發(fā)現(xiàn)比串行還慢一半.主要原因就是過(guò)多的線程嘗試訪問同一個(gè)共享資源,等待的時(shí)間比運(yùn)行的時(shí)間還長(zhǎng).改用ConcurrentHashMap(使用了鎖分段)
  //改用 ConcurrentHashMap
  private void countWord(String word) {
    while (true) { //理解一下這里的循環(huán) 如果下面的操作沒有成功的話,就重試
      Integer currentCount = counts.get(word);
      if (currentCount == null) {
        if (counts.putIfAbsent(word, 1) == null) //如果沒有與1關(guān)聯(lián) 則關(guān)聯(lián),有原子性
          break;
      } else if (counts.replace(word, currentCount, currentCount + 1)) {
        break;
      }
    }
  • 這次的測(cè)速要好很多,但是沒有理論上的提速.因?yàn)橄M(fèi)者對(duì)conuts有一些不必要的競(jìng)爭(zhēng),與其所有消費(fèi)者都共享一個(gè)counts,不如每個(gè)消費(fèi)者各自維護(hù)一個(gè)計(jì)數(shù)map,再對(duì)這些計(jì)數(shù)map進(jìn)行合并
class Counter implements Runnable {

  private BlockingQueue<Page> queue;
  private ConcurrentMap<String, Integer> counts;
  private HashMap<String, Integer> localCounts;

  public Counter(BlockingQueue<Page> queue,
                 ConcurrentMap<String, Integer> counts) {
    this.queue = queue;
    this.counts = counts;
    localCounts = new HashMap<String, Integer>();
  }

  public void run() {
    try {
      while(true) {
        Page page = queue.take();
        if (page.isPoisonPill())
          break;
        Iterable<String> words = new Words(page.getText());
        for (String word: words)
          countWord(word);
      }
      //所以計(jì)數(shù)的那個(gè)可以是普通的map 他只在自己的線程里
      mergeCounts();
    } catch (Exception e) { e.printStackTrace(); }
  }

  private void countWord(String word) {
    Integer currentCount = localCounts.get(word);
    if (currentCount == null)
      localCounts.put(word, 1);
    else
      localCounts.put(word, currentCount + 1);
  }

  private void mergeCounts() {
    for (Map.Entry<String, Integer> e: localCounts.entrySet()) {
      String word = e.getKey();
      Integer count = e.getValue();
      while (true) {
        Integer currentCount = counts.get(word);
        if (currentCount == null) {
          if (counts.putIfAbsent(word, count) == null)
            break;
        } else if (counts.replace(word, currentCount, currentCount + count)) {
          break;
        }
      }
    }
  }
}

第三天總結(jié)

  • 1.使用線程池,不要直接創(chuàng)建線程
  • 2.使用CopyOnWriteArrayList讓監(jiān)聽器相關(guān)的代碼更簡(jiǎn)單高效
  • 3.使用ArrayBlockingQueue讓生產(chǎn)者和消費(fèi)者之間高效合作
  • 4.ConcurrentHashMap提供了更好的并發(fā)訪問

2.5 復(fù)習(xí)

  • 線程與鎖的缺點(diǎn):沒有為并行提供直接的支持,對(duì)于程序員,編程語(yǔ)言層面沒有提供足夠的幫助
  • 應(yīng)用多線程的難點(diǎn)不在編程,而在于難以測(cè)試,多線程的bug很難重現(xiàn).可維護(hù)性更讓人頭疼,如果不能對(duì)多線程問題進(jìn)行可靠的測(cè)試,就無(wú)法對(duì)多線程進(jìn)行可靠的重構(gòu).使用其他不那么底層的模型會(huì)好一些.
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市姆钉,隨后出現(xiàn)的幾起案子说订,更是在濱河造成了極大的恐慌,老刑警劉巖潮瓶,帶你破解...
    沈念sama閱讀 219,039評(píng)論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件陶冷,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡毯辅,警方通過(guò)查閱死者的電腦和手機(jī)埂伦,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,426評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)思恐,“玉大人沾谜,你說(shuō)我怎么就攤上這事≌陀ǎ” “怎么了基跑?”我有些...
    開封第一講書人閱讀 165,417評(píng)論 0 356
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)嗜逻。 經(jīng)常有香客問我涩僻,道長(zhǎng),這世上最難降的妖魔是什么栈顷? 我笑而不...
    開封第一講書人閱讀 58,868評(píng)論 1 295
  • 正文 為了忘掉前任逆日,我火速辦了婚禮,結(jié)果婚禮上萄凤,老公的妹妹穿的比我還像新娘室抽。我一直安慰自己,他們只是感情好靡努,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,892評(píng)論 6 392
  • 文/花漫 我一把揭開白布坪圾。 她就那樣靜靜地躺著,像睡著了一般惑朦。 火紅的嫁衣襯著肌膚如雪兽泄。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,692評(píng)論 1 305
  • 那天漾月,我揣著相機(jī)與錄音病梢,去河邊找鬼。 笑死,一個(gè)胖子當(dāng)著我的面吹牛蜓陌,可吹牛的內(nèi)容都是我干的觅彰。 我是一名探鬼主播,決...
    沈念sama閱讀 40,416評(píng)論 3 419
  • 文/蒼蘭香墨 我猛地睜開眼钮热,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼填抬!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起隧期,我...
    開封第一講書人閱讀 39,326評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤飒责,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后厌秒,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體读拆,經(jīng)...
    沈念sama閱讀 45,782評(píng)論 1 316
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,957評(píng)論 3 337
  • 正文 我和宋清朗相戀三年鸵闪,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了檐晕。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,102評(píng)論 1 350
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡蚌讼,死狀恐怖辟灰,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情篡石,我是刑警寧澤芥喇,帶...
    沈念sama閱讀 35,790評(píng)論 5 346
  • 正文 年R本政府宣布,位于F島的核電站凰萨,受9級(jí)特大地震影響继控,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜胖眷,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,442評(píng)論 3 331
  • 文/蒙蒙 一武通、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧珊搀,春花似錦冶忱、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,996評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至劳淆,卻和暖如春链沼,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背沛鸵。 一陣腳步聲響...
    開封第一講書人閱讀 33,113評(píng)論 1 272
  • 我被黑心中介騙來(lái)泰國(guó)打工忆植, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,332評(píng)論 3 373
  • 正文 我出身青樓朝刊,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親蜈缤。 傳聞我的和親對(duì)象是個(gè)殘疾皇子拾氓,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,044評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容