實(shí)戰(zhàn)Java高并發(fā)程序設(shè)計(jì)筆記第三章

JDK并發(fā)包

本章內(nèi)容:
1巫员、關(guān)于同步控制的工具
2豪硅、線程池
3捎琐、JDK的一些并發(fā)容器

多線程的團(tuán)隊(duì)協(xié)作:同步控制

synchronized的功能擴(kuò)展:重入鎖

  • 可以完全替代synchronized,使用java.util.concurrent.locks.ReentrantLock類來(lái)實(shí)現(xiàn)

    public class ReenterLock implements Runnable {
      public static ReentrantLock lock = new ReentrantLock();
      public static int i = 0;
    
      @Override
      public void run() {
          for (int j = 0; j < 1000000; j++) {
              lock.lock();
              try {
                  i++;
              } finally {
                  lock.unlock();
              }
    
          }
      }
    
      public static void main(String args[]) throws InterruptedException {
          ReenterLock reenterLock = new ReenterLock();
          Thread thread1 = new Thread(reenterLock);
          Thread thread2 = new Thread(reenterLock);
    
          thread1.start();
          thread2.start();
    
          thread1.join();
          thread2.join();
    
          System.out.println(i);
      }
    
    }
    
    • 執(zhí)行結(jié)果: 2000000
  • 如何理解重入虱咧?

這種鎖可以反復(fù)進(jìn)入耕拷,一個(gè)線程連續(xù)兩次獲得同一把鎖

  • 中斷響應(yīng):lockInterruptibly
    public class IntLock implements Runnable {
      public static ReentrantLock lock1 = new ReentrantLock();
      public static ReentrantLock lock2 = new ReentrantLock();
      int lock;
    
      public IntLock(int lock) {
          this.lock = lock;
      }
    
      @Override
      public void run() {
          try {
              if (lock == 1) {
                  lock1.lockInterruptibly();
                  Thread.sleep(500);
                  lock2.lockInterruptibly();
              } else {
                  lock2.lockInterruptibly();
                  Thread.sleep(500);
                  lock1.lockInterruptibly();
              }
          } catch (InterruptedException e) {
              e.printStackTrace();
          } finally {
              if (lock1.isHeldByCurrentThread()) {
                  lock1.unlock();
              }
              if (lock2.isHeldByCurrentThread()) {
                  lock2.unlock();
              }
              System.out.println(Thread.currentThread().getId() + ":線程退出");
          }
    
      }
    
      public static void main(String args[]) throws InterruptedException {
          IntLock r1 = new IntLock(1);
          IntLock r2 = new IntLock(2);
    
          Thread thread1 = new Thread(r1);
          Thread thread2 = new Thread(r2);
    
          thread1.start();
          thread2.start();
    
          Thread.sleep(1000);
    
          thread2.interrupt();
    
      }
    }
    
    • 執(zhí)行結(jié)果:
    java.lang.InterruptedException
      at java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireInterruptibly(AbstractQueuedSynchronizer.java:944)
      at java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchronizer.java:1263)
      at java.base/java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:317)
      at chapter3.IntLock.run(IntLock.java:27)
      at   java.base/java.lang.Thread.run(Thread.java:835)
    15:線程退出
    14:線程退出
    
  • 鎖申請(qǐng)等待限時(shí)trylock
    帶參的trylock
    
    public class TimeLock implements Runnable {
      public static ReentrantLock lock = new ReentrantLock();
    
      @Override
      public void run() {
          try {
              if (lock.tryLock(5, TimeUnit.SECONDS)) {
                  System.out.println(Thread.currentThread().getName());
                  System.out.println("get lock success");
                  Thread.sleep(60000);
              } else {
                  System.out.println(Thread.currentThread().getName());
                  System.out.println("get lock failed");
              }
          } catch (InterruptedException e) {
              e.printStackTrace();
          } finally {
              if (lock.isHeldByCurrentThread()) {
                  lock.unlock();
              }
          }
      }
    
    
      public static void main(String args[]) {
          TimeLock timeLock = new TimeLock();
          Thread thread1 = new Thread(timeLock);
          Thread thread2 = new Thread(timeLock);
    
          thread1.start();
          thread2.start();
      }
    }
    
    不帶參的trylock
    public class TryLock implements Runnable {
      public static ReentrantLock lock1 = new ReentrantLock();
      public static ReentrantLock lock2 = new ReentrantLock();
      int lock;
    
      public TryLock(int lock) {
          this.lock = lock;
      }
    
      @Override
      public void run() {
          if (lock == 1) {
              while (true) {
                  if (lock1.tryLock()) {
                      try {
                          try {
                              Thread.sleep(500);
                          } catch (InterruptedException e) {
                              e.printStackTrace();
                          }
                          if (lock2.tryLock()) {
                              try {
                                  System.out.println(Thread.currentThread().getId() + ":My Job done;");
                                  return;
                              } finally {
                                  lock2.unlock();
                              }
                          }
                      } finally {
                          lock1.unlock();
                      }
                  }
              }
          } else {
              while (true) {
                  if (lock2.tryLock()) {
                      try {
                          try {
                              Thread.sleep(500);
                          } catch (InterruptedException e) {
                              e.printStackTrace();
                          }
                          if (lock1.tryLock()) {
                              try {
                                  System.out.println(Thread.currentThread().getId() + ":My Job done;");
                                  return;
                              } finally {
                                  lock1.unlock();
                              }
                          }
                      } finally {
                          lock2.unlock();
                      }
                  }
              }
    
          }
      }
    
      public static void main(String args[]) {
          TryLock r1 = new TryLock(1);
          TryLock r2 = new TryLock(2);
          Thread thread1 = new Thread(r1);
          Thread thread2 = new Thread(r2);
    
          thread1.start();
          thread2.start();
      }
    
    }
    
  • 公平鎖

大多數(shù)情況下汹族,鎖的申請(qǐng)都是非公平的萧求,系統(tǒng)只是從等待隊(duì)列中隨機(jī)地的選出一個(gè)線程。公平類似于一種先到先服務(wù)策略顶瞒,不會(huì)導(dǎo)致某些線程一直得不到執(zhí)行從而產(chǎn)生饑餓的現(xiàn)象

重入鎖可以對(duì)公平性進(jìn)行設(shè)置

public ReentrantLock(boolean fair)

公平鎖案例:


public class FairLock implements Runnable {

  public static ReentrantLock fairLock = new ReentrantLock(true);//設(shè)置true指定鎖是公平的,也可以不設(shè)置,分別運(yùn)行觀察公平鎖與非公平鎖間的區(qū)別
  //public static ReentrantLock unfairLock = new ReentrantLock();

  @Override
  public void run() {
      while (true) {
          try {
              fairLock.lock();
              // unfairLock.lock();
              System.out.println(Thread.currentThread().getName() + "獲得鎖");
          } finally {
              fairLock.unlock();
              // unfairLock.unlock();
          }
      }
  }

  /**
   * 公平鎖的一個(gè)特點(diǎn)是:不會(huì)產(chǎn)生饑餓現(xiàn)象,只要排隊(duì)最終都會(huì)得到資源.
   * <p/>
   * 但是實(shí)現(xiàn)公平鎖要求系統(tǒng)維護(hù)一個(gè)有序隊(duì)列,因此公平鎖的實(shí)現(xiàn)成本較高,性能相對(duì)低下.
   *
   * @param args
   */
  public static void main(String args[]) {
      FairLock r1 = new FairLock();
      Thread thread1 = new Thread(r1, "Thread_t1");
      Thread thread2 = new Thread(r1, "Thread_t2");
      Thread thread3 = new Thread(r1, "Thread_t3");

      thread1.start();
      thread2.start();
      thread3.start();
  }

}

ReetrantLock()的幾個(gè)重要方法:

lock()  //獲得鎖夸政,如果鎖已經(jīng)被占有,則等待
lockInterruptibly()   //獲得鎖榴徐,但優(yōu)先響應(yīng)中斷
tryLock()  //嘗試獲得鎖守问,如果成功,返回true坑资,失敗返回false,不等待耗帕,立即返回
tryLock(long time,TimeUnit unit)//在給定的時(shí)間嘗試獲取鎖
unlock()    //釋放鎖

重入鎖的實(shí)現(xiàn)主要包含三個(gè)要素:
1、原子狀態(tài)

使用CAS操作來(lái)村粗當(dāng)前鎖狀態(tài)袱贮,判斷鎖是否已經(jīng)被別的線程持有
2仿便、等待隊(duì)列
所有沒(méi)有請(qǐng)求到鎖的線程,會(huì)進(jìn)入等待隊(duì)列進(jìn)行等待。待有線程釋放鎖后探越,系統(tǒng)從等待隊(duì)列中喚醒一個(gè)線程狡赐,繼續(xù)工作
3、阻塞原語(yǔ)park()和unpark()钦幔,用來(lái)掛起和恢復(fù)線程

重入鎖的好搭檔:Condition條件(與wait()和notify()的作用大致相同)

  • Condition接口提供的基本方法如下:
    void await() throws InterruptedException;
    void awaitUninterruptibly();
    long awaitNanos(long nanosTimeout) throws InterruptedException;
    boolean await(long tine,TimeUnit unit) throws InterruptedException;
    boolean awaiUtilt(Date deadline) throws InterruptedException;
    void signal();
    void signalAll();
    
    Condition功能案例:參照wait與notify的工作流程
    
    public class ReenterLockCondition implements Runnable {
      public static ReentrantLock lock = new ReentrantLock();
      public static Condition condition = lock.newCondition();
    
      @Override
      public void run() {
    
          try {
              lock.lock();
              condition.await();
              System.out.println("Thread is going on");
          } catch (InterruptedException e) {
              e.printStackTrace();
          } finally {
              lock.unlock();
          }
    
      }
    
      public static void main(String args[]) throws InterruptedException {
          ReenterLockCondition reenterLockCondition = new ReenterLockCondition();
          Thread thread1 = new Thread(reenterLockCondition);
          thread1.start();
          System.out.println("睡眠2秒鐘");
          Thread.sleep(2000);
          lock.lock();
          condition.signal();
          lock.unlock();
      }
    }
    
    ArrayBlockingQueue使用重入鎖和Condition對(duì)象的案例:
private final ReentrantLock lock;
private final Condition notEmpty;
private final Condition notFull;
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();

public void put(E e) throws InterruptedException{
   if(e==null) throw new NullPointerException;
   final E[] items = this.items;
   final ReentrantLock lock = this.lock;
   lock.lockInterruptibly();
   try{
       try{
           while(count==items.length){
               notFull.await();
           }catch(InterruptedException e){
               notFull.signal();
               throw e;
           }
       }
       insert(e);
   }finally{
       lock.unlock();
   }
}

private void insert(E x){
   items[putIndex] = x;
   putIndex = inc(putIndex);
   ++count;
   notEmpty.signal();
}


public E take(E e) throws InterruptedException{
   final ReentrantLock lock = this.lock;
   lock.lockInterruptibly();
   try{
       try{
           while(count==0){
               notEmpty.await();
           }catch(InterruptedException e){
               notEmpty.signal();
               throw e;
           }
       }
       extract(e);
   }finally{
       lock.unlock();
   }
}

private E extract(){
   final E[] items = this.items;
   E x = items[takeIndex];
   items[takeIndex] = null;
   takeIndex = inc(takeIndex);
   --count;
   notFull.signal();
   return x;
}

允許多個(gè)線程同時(shí)訪問(wèn):信號(hào)量(Semphore)

什么是信號(hào)量枕屉?
信號(hào)量是對(duì)鎖的擴(kuò)展,對(duì)于內(nèi)部鎖synchronized和重入鎖ReentrantLock,一次只允許一個(gè)線程訪問(wèn)一個(gè)資源鲤氢,而信號(hào)量可以指定多個(gè)線程搀擂,同時(shí)訪問(wèn)同一個(gè)資源

  • 信號(hào)量的構(gòu)造函數(shù)
public Semaphore(int permits)
public Semaphore(int permits,boolean fair)

許可是什么意思?
許可也就是準(zhǔn)入數(shù)卷玉,表示一次可以有多少個(gè)線程同時(shí)訪問(wèn)同一個(gè)資源

  • 信號(hào)量的主要邏輯方法:
public void acquire()
public void acquireUninterruptibly()
public boolean tryAcquire()
public boolean tryAcquire(long timeout,TimeUnit unit)
public void release()

信號(hào)量使用實(shí)例:程序不會(huì)停止哨颂??相种?


  public class SemapDemo implements Runnable {
    final Semaphore semp = new Semaphore(5);

    @Override
    public void run() {
        try {
            semp.acquire();
            Thread.sleep(2000);
            System.out.println(Thread.currentThread().getId() + ":done!");
            semp.release();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /**
     * 總共20個(gè)線程,系統(tǒng)會(huì)以5個(gè)線程一組為單位,依次執(zhí)行并輸出
     *
     * @param args
     */
    public static void main(String args[]) {
        ExecutorService executorService = Executors.newFixedThreadPool(20);
        final SemapDemo demo = new SemapDemo();
        for (int i = 0; i < 20; i++) {
            executorService.submit(demo);
        }
    }
  }

ReadWriteLock讀寫(xiě)鎖

什么是讀寫(xiě)鎖威恼?
讀寫(xiě)鎖準(zhǔn)確的來(lái)說(shuō),是讀寫(xiě)分離的鎖寝并,當(dāng)使用內(nèi)部鎖或重入鎖時(shí)箫措,所有的讀與寫(xiě)之間的線程都是串行執(zhí)行,然而實(shí)際上衬潦,讀與讀之間不存在線程安全的問(wèn)題斤蔓,可以同時(shí)操作,讀寫(xiě)鎖就是為了解決這個(gè)問(wèn)題而出現(xiàn)的

  • 讀寫(xiě)鎖的訪問(wèn)約束
    寫(xiě)
    非阻塞 阻塞
    寫(xiě) 阻塞 阻塞
  • 適用場(chǎng)景:系統(tǒng)中讀的次數(shù)遠(yuǎn)遠(yuǎn)多于寫(xiě)的次數(shù)

讀寫(xiě)鎖ReadWriteLock與非讀寫(xiě)鎖Lock的對(duì)比案例:

    public Object handleRead(Lock lock) throws InterruptedException {
        try {
            lock.lock();
            Thread.sleep(1000);//模擬讀操作
            System.out.println("讀操作:" + value);
            return value;
        } finally {
            lock.unlock();
        }
    }

    public void handleWrite(Lock lock, int index) throws InterruptedException {
        try {
            lock.lock();
            Thread.sleep(1000);//模擬寫(xiě)操作
            System.out.println("寫(xiě)操作:" + value);
            value = index;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String args[]) {
        final ReadWriteLockDemo demo = new ReadWriteLockDemo();

        Runnable readRunnable = new Runnable() {
            @Override
            public void run() {
                //分別使用兩種鎖來(lái)運(yùn)行,性能差別很直觀的就體現(xiàn)出來(lái),使用讀寫(xiě)鎖后讀操作可以并行,節(jié)省了大量時(shí)間
                try {
                    demo.handleRead(readLock);
                    //demo.handleRead(lock);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }
        };

        Runnable writeRunnable = new Runnable() {
            @Override
            public void run() {
                //分別使用兩種鎖來(lái)運(yùn)行,性能差別很直觀的就體現(xiàn)出來(lái)
                try {
                    demo.handleWrite(writeLock, new Random().nextInt(100));
                    //demo.handleWrite(lock, new Random().nextInt(100));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }
        };
        for (int i = 0; i < 18; i++) {
            new Thread(readRunnable).start();
        }
        for (int i = 18; i < 20; i++) {
            new Thread(writeRunnable).start();
        }
    }
  }

倒計(jì)時(shí)器:CountDownLatch

什么是CountDownLatch?
是一種多線程并發(fā)控制工具镀岛,類比于與火箭的檢查工作弦牡,在倒計(jì)時(shí)結(jié)束后,線程才開(kāi)始執(zhí)行

  • 構(gòu)造函數(shù):
    public CountDownLatch(int count)
    
  • 案例:
    public class CountDownLatchDemo implements Runnable {
      static final CountDownLatch end = new CountDownLatch(10);
      static final CountDownLatchDemo demo = new CountDownLatchDemo();
    
      @Override
      public void run() {
    
          try {
              Thread.sleep(new Random().nextInt(3) * 1000);
              System.out.println("check complete");
              end.countDown();
          } catch (InterruptedException e) {
              e.printStackTrace();
          }
      }
    
      public static void main(String args[]) throws InterruptedException {
          ExecutorService executorService = Executors.newFixedThreadPool(10);
          for (int i = 0; i < 10; i++) {
              executorService.submit(demo);
          }
          //等待檢查
          end.await();
          //發(fā)射火箭
          System.out.println("Fire!");
          executorService.shutdown();
      }
    }
    

循環(huán)柵欄:CyclicBarrier

什么是循環(huán)柵欄漂羊?
另外一種多線程并發(fā)控制實(shí)用工具驾锰,與CountDownlatch類似,比其更強(qiáng)大
實(shí)例:

線程阻塞工具類:LockSupport

類似于wait()和notify()

線程復(fù)用:線程

在實(shí)際生產(chǎn)環(huán)境中拨与,線程的數(shù)量必須得以控制稻据。盲目的大量創(chuàng)造線程是對(duì)系統(tǒng)有傷害的,所以就出現(xiàn)了線程池买喧,對(duì)線程加以控制和管理

什么是線程池

當(dāng)需要使用線程時(shí)捻悯,從線程池獲取一個(gè)線程執(zhí)行,當(dāng)執(zhí)行完畢后淤毛,將線程返還給線程池

  • 線程池的作用.PNG

不重復(fù)發(fā)明輪子:JDK 對(duì)線程池的支持

  • Executor框架結(jié)構(gòu)圖
  • Executor 提供了各種類型的線程池
  • 1.固定大小的線程池案例:
  • 2.計(jì)劃任務(wù)newScheduledThreadPool

刨根究底:核心線程池的內(nèi)部實(shí)現(xiàn)

無(wú)論是newFixedThreadPool()方法今缚、newSingleThreadExecutor還是newCachedThreadPool()方法,均使用ThreadPoolExecutor實(shí)現(xiàn)

public static ExecutorService newFixedThreadPool(int nThreads)
{ return new ThreadPoolExecutor(nThreads,
nThreads,
0L,
TimeUnit.MILLSECONDS,
new LinkedBlockingQueue<Runanble>());
}
public static ExecutorService newSingleThreadPool(int nThreads){ return new FinalizableDelegatedExecutorService( new ThreadPoolExecutor(1,1,0L,TimeUnit.MILLSECONDS,new LinkedBlockingQueue<Runanble>()));
}
public static ExecutorService newCachedThreadPool(int nThreads){ return new ThreadPoolExecutor(0,Integer.MAX_VALUE,60L,TimeUnit.SECONDS,new SynchronousQueue<Runanble>());
}

ThreadPoolExecutor最重要的構(gòu)造函數(shù):

public ThreadPoolExecutor(int corePoolSize,//指定線程池中的線程數(shù)量
int maximumPoolSize,//指定線程池中最大線程數(shù)量
long keepAliveTime,//超過(guò)corePoolSize的空閑線程低淡,在多長(zhǎng)時(shí)間內(nèi)姓言,會(huì)被銷毀
TimeUnit unit,//keepAliveTime的單位
BlockingQueue<Runanble> workQueue,//任務(wù)隊(duì)列瞬项,被提交但尚未被執(zhí)行的任務(wù)
ThreadFactory threadFactory,//線程工廠
RejecttedExecutionHandler handler)//拒絕策略,當(dāng)任務(wù)太多來(lái)不及處理何荚,如何拒絕任務(wù)

workQueue:指被提交但未執(zhí)行的任務(wù)隊(duì)列囱淋,是一個(gè)BlockingQueue接口的對(duì)象,ThreadPoolExecutor可使用以下幾種BlockingQueue

  • 直接提交的隊(duì)列:SynchronousQueue
  • 有界的任務(wù)隊(duì)列:ArrayBlockingQueue
  • 無(wú)界的任務(wù)隊(duì)列:LinkedBlockingQueue
  • 優(yōu)先任務(wù)隊(duì)列:PriorityBlockingQueue
    ThreadPoolExecutor線程池的核心調(diào)度代碼

調(diào)度邏輯可總結(jié)為:

  • ThreadPoolExecutor的調(diào)度邏輯.PNG

超負(fù)載了怎么辦:拒絕策略

RejecttedExecutionHandler handler
JDK 內(nèi)置提供了四種拒絕策略:

  • AbortPolicy :直接拋出異常餐塘,阻止系統(tǒng)正常工作
  • CallerRunsPolicy:只要線程池未關(guān)閉妥衣,該策略直接在調(diào)用者線程中,運(yùn)行當(dāng)前被丟棄的任務(wù)
  • DiscardOldestPolicy:丟棄最老的一個(gè)請(qǐng)求(也就是即將被執(zhí)行的任務(wù))戒傻,并嘗試再次提交當(dāng)前任務(wù)
  • DiscardPolicy:默默丟棄税手,不做任何處理

自定義線程池和拒絕策略的使用:

public class RejectThreadPoolDemo{
   public static class MyTask implements Runnable{
       @Override
       public void run(){
           System.out.println(System.currentTimeMills()+":Thread ID:"
               + Thread.currentThread().getId());
           try{
               Thread.sleep(100);
           }catch(InterruptedException e){
               e.printStackTrace();
           }
       }
   }
   public static void main(String[] args) {
       MyTask task = new MyTask();
       ExecutorService es = new ThreadPoolExecutor(5,5,
           0L,TimeUnit.MILLSECONDS,
           new LinkedBlockingQueue<Runnable>(10),
           Executors.defaultThreadFactory(),
           new RejectExecutionHandler(){
               @Override
               public void rejectedExecution(Runnable r,
                   ThreadPoolExecutor executor){
                       System.out.println(r.toString()+ is disacrd"");
               }
       });
       for (int i=0;i<Integer.MAX_VALUE ;i++ ) {
           es.submit(task);
           Thread.sleep(10);
       }
   }

}

自定義線程創(chuàng)建:ThreadFactory

ThreadFactory是一個(gè)接口,只有一個(gè)方法需纳,用來(lái)創(chuàng)建線程:

Thread newThread(Runnable r);

自定義線程池的好處:

  • 可以跟蹤線程池究竟在何時(shí)創(chuàng)建了線程芦倒,也饑餓自定義線程的名稱、組以及優(yōu)先級(jí)等信息不翩,甚至可以任性地將所有的線程設(shè)置為守護(hù)線程
    如下:main主線程運(yùn)行2s后兵扬,JVM自動(dòng)退出,將會(huì)強(qiáng)制銷毀線程池
public static void main(String args[]) throws InterruptedException {
        MyTask myTask = new MyTask();

        ExecutorService executorService = new ThreadPoolExecutor(5, 5, 
            0L, TimeUnit.SECONDS, 
            new SynchronousQueue<Runnable>(),
            new ThreadFactory(){
                @Override
                public Thread new Thread(){
                    Thread t = new Thread(r);
                    t.setDaemon(true);
                    System.out.println("create" + t);
                    return t;
                }
            }, 
        });

        for (int i = 0; i < 5; i++) {
            es.submit(task);
        }
        Thread.sleep(2000);
    }

我的應(yīng)用我做主:擴(kuò)展線程池

什么時(shí)候需要擴(kuò)展線程池口蝠?
當(dāng)想監(jiān)控每個(gè)任務(wù)的開(kāi)始和結(jié)束時(shí)間周霉,或者其他一些自定義的增強(qiáng)功能時(shí)(動(dòng)態(tài)代理),就需要對(duì)線程池進(jìn)行擴(kuò)展

ThreadPoolExecutor也是一個(gè)可擴(kuò)展的線程池亚皂,提供了beforeExecute()、afterExecutor()和terminated()三個(gè)方法用于增強(qiáng)
ThreadPoolExecutor.Worker.runTask()方法的內(nèi)部實(shí)現(xiàn):

  • runTask方法.PNG

    線程池?cái)U(kuò)展的例子:

public class ExtThreadPool {

    public static class MyTask implements Runnable {
        public String name;

        public MyTask(String name) {
            this.name = name;
        }

        @Override
        public void run() {
            System.out.println("正在執(zhí)行:Thread ID:" + Thread.currentThread().getId() + ",Task Name:" + name);

            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }


    public static void main(String args[]) throws InterruptedException {
        ExecutorService executorService = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingDeque<Runnable>()) {
            protected void beforeExecute(Thread t, Runnable r) {
                System.out.println("準(zhǔn)備執(zhí)行:" + ((MyTask) r).name);
            }

            protected void afterExecute(Thread t, Runnable r) {
                System.out.println("執(zhí)行完成:" + ((MyTask) r).name);
            }

            protected void terminated() {
                System.out.println("線程池退出!");
            }
        };

        for (int i = 0; i < 5; i++) {
            MyTask task = new MyTask("TASK-GEYM-" + i);
            executorService.execute(task);
            Thread.sleep(10);
        }
        executorService.shutdown();
    }
}

注意:

1.此處用的是execute()方法提交任務(wù)国瓮,沒(méi)有用submit()
2.shutdown()關(guān)閉線程池時(shí)灭必,并不是暴力終止所有線程,而是發(fā)給一個(gè)關(guān)閉信號(hào)給線程池乃摹,線程池此時(shí)不能再接收其他任務(wù)禁漓,執(zhí)行完池內(nèi)剩余的線程

合理的選擇:優(yōu)化線程池?cái)?shù)量

  • 線程池的數(shù)量.PNG

堆棧去哪里了:在線程池中尋找堆棧

submit()竟然沒(méi)有異常提示:


public class NoTraceDivTaskDemo {
    public static class DivTask implements Runnable {
        int a, b;

        public DivTask(int a, int b) {
            this.a = a;
            this.b = b;
        }

        @Override
        public void run() {
            double re = a / b;
            System.out.println(re);
        }
    }


    public static void main(String args[]) {
        ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 0L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
        for (int i = 0; i < 5; i++) {
            poolExecutor.submit(new DivTask(100, i)); //沒(méi)有報(bào)錯(cuò)提示
//            poolExecutor.execute(new DivTask(100, i));//有報(bào)錯(cuò)提示
        }
    }
}

很明顯,當(dāng)當(dāng)我執(zhí)行上述程序時(shí)孵睬,應(yīng)該后出現(xiàn)除零異常播歼,但是程序運(yùn)行結(jié)果實(shí)際上如下:

100.0
25.0
33.0
50.0

注釋submit,使用execute提交掰读,會(huì)有錯(cuò)誤提示:

Exception in thread "pool-1-thread-1" java.lang.ArithmeticException: / by zero
    at chapter3.NoTraceDivTaskDemo$DivTask.run(NoTraceDivTaskDemo.java:21)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:835)
100.0
25.0
33.0
50.0

自己查看提交任務(wù)線程的堆棧信息:


public class TraceThreadPoolExecutor extends ThreadPoolExecutor {
    
    public TraceThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    public void execute(Runnable task) {
        super.execute(wrap(task, clientTrace(), Thread.currentThread().getName()));
    }

    private Runnable wrap(final Runnable task, final Exception clientTrace, String name) {
        return new Runnable() {
            @Override
            public void run() {
                try {
                    task.run();
                } catch (Exception e) {
                    clientTrace.printStackTrace();
                    throw e;
                }
            }
        };
    }
    private Exception clientTrace() {
        return new Exception("Client stack trace");
    }
}
public static void main(String args[]) {
        ThreadPoolExecutor threadPoolExecutor = new TraceThreadPoolExecutor(0, Integer.MAX_VALUE, 0L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
        for (int i = 0; i < 5; i++) {
            threadPoolExecutor.execute(new NoTraceDivTaskDemo.DivTask(100, i));
        }
    }
  • 運(yùn)行結(jié)果:
java.lang.Exception: Client stack trace
    at chapter3.TraceThreadPoolExecutor.clientTrace(TraceThreadPoolExecutor.java:35)
    at chapter3.TraceThreadPoolExecutor.execute(TraceThreadPoolExecutor.java:17)
    at chapter3.TraceDivTaskDemo.main(TraceDivTaskDemo.java:14)
Exception in thread "pool-1-thread-1" java.lang.ArithmeticException: / by zero
    at chapter3.NoTraceDivTaskDemo$DivTask.run(NoTraceDivTaskDemo.java:21)
    at chapter3.TraceThreadPoolExecutor$1.run(TraceThreadPoolExecutor.java:25)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:835)
100.0
25.0
50.0
33.0

分而治之:Fork/Join框架

Fork和Join是什么意思秘狞?
Fork:創(chuàng)建子線程
Join:等待
使用fork后系統(tǒng)多了一個(gè)執(zhí)行分支(線程),因此需要等待這個(gè)執(zhí)行分支完成蹈集,才能得到最終的結(jié)果
分而治之:把一個(gè)計(jì)算任務(wù)分成許多小任務(wù)烁试,將這些小任務(wù)的計(jì)算結(jié)果再合成

  • Fork/Join的執(zhí)行邏輯如下圖:


    fork-join.PNG
  • 互相幫助的線程:
  • 互相幫助的線程.PNG

    當(dāng)線程試圖幫助別人是,總是從其任務(wù)隊(duì)列的底部開(kāi)始拿數(shù)據(jù)拢肆,而線程執(zhí)行自己的任務(wù)時(shí)减响,則是從相反的頂部開(kāi)始拿靖诗。有利于避免數(shù)據(jù)競(jìng)爭(zhēng)
    Fork/Join框架的使用,計(jì)算數(shù)列之和:

package chapter3;

import java.util.ArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

/**
 * Created by 13 on 2017/5/5.
 */
public class CountTask extends RecursiveTask {
    private static final int THRESHOLD = 10000;

    private long start;
    private long end;

    public CountTask(long start, long end) {
        this.start = start;
        this.end = end;
    }


    @Override
    protected Long compute() {
        long sum = 0;
        boolean canCompute = (end - start) < THRESHOLD;
        if (canCompute) {
            for (long i = start; i <= end; i++) {
                sum += i;
            }
        } else {
            long step = (start+end) / 100;
            ArrayList<CountTask> subTasks = new ArrayList<CountTask>();
            long pos = start;
            for (int i = 0; i < 100; i++) {
                long lastOne = pos + step;
                if (lastOne > end) {
                    lastOne = end;
                }
                CountTask subTask = new CountTask(pos, lastOne);
                pos += step + 1;
                subTasks.add(subTask);
                subTask.fork();
            }
            for (CountTask t : subTasks) {
                sum += (Long) t.join();
            }
        }
        return sum;
    }
    public static void main(String args[]) {
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        CountTask task = new CountTask(0, 200000L);
        ForkJoinTask<Long> result = forkJoinPool.submit(task);
        long res = 0;
        try {
            res = result.get();
            System.out.println("sum=" + res);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
}

不要重復(fù)發(fā)明輪子:JDK的并發(fā)容器

超好用的工具類:并發(fā)集合簡(jiǎn)介

JDK提供的這些容器大部分都在java.util.concurrent包中

  • ConcurrentHashMap:線程安全的HashMap
  • CopyOnWriteArrayList:在讀多寫(xiě)少的場(chǎng)合支示,性能遠(yuǎn)好于Vector
  • ConcurrentLinkedQueue:高效的并發(fā)隊(duì)列刊橘,使用鏈表實(shí)現(xiàn),線程安全的LinkedList
  • BlockingQueue:一個(gè)接口颂鸿,JDK內(nèi)部通過(guò)鏈表促绵、數(shù)組等方式實(shí)現(xiàn)了這個(gè)接口
  • ConcurrentSkipListMap:跳表的實(shí)現(xiàn),Map据途,使用跳表的數(shù)據(jù)結(jié)構(gòu)進(jìn)行快速查找
  • Vector和Collections工具類可以將任何集合包裝成線程安全帶集合

線程安全的HashMap

  • 兩種實(shí)現(xiàn)方式:
  • 使用Collections.synchronizedMap()包裝HashMap,如下得到的m就是線程安全的HashMap:
public static Map m = Collections.synchronizedMap(new HashMap());

高效讀冉视蕖:不變模式下的CopyOnWriteArrayList

數(shù)據(jù)共享通道:BlockingQueue

“中間媒介”

  • ArrayBlockingQueue
  • LinkedBlockingQueue

隨機(jī)數(shù)據(jù)結(jié)構(gòu):跳表

什么是跳表?
一種用來(lái)快速查找的數(shù)據(jù)結(jié)構(gòu)颖医,類似于平衡樹(shù)位衩。但是平衡樹(shù)的插入和刪除可能導(dǎo)致平衡樹(shù)進(jìn)行一次全局的調(diào)整,而對(duì)跳表的插入和刪除只需要對(duì)整個(gè)數(shù)據(jù)結(jié)構(gòu)的局部進(jìn)行操作即可熔萧。因此在高并發(fā)時(shí)糖驴,平衡樹(shù)需要一個(gè)全局鎖來(lái)保證線程安全,跳表只需要局部鎖即可佛致,跳表的查找時(shí)間復(fù)雜度同樣為O(logn).JDK 使用跳表實(shí)現(xiàn)一個(gè)Map.

  • 隨機(jī)贮缕。本質(zhì)是同時(shí)維護(hù)了多個(gè)鏈表,并且鏈表是分層的


    跳表的數(shù)據(jù)結(jié)構(gòu).PNG
  • 所有鏈表都是有序的
  • 使用跳表實(shí)現(xiàn)Map和使用哈希算法實(shí)現(xiàn)Map的一個(gè)區(qū)別:哈希并不會(huì)保存元素的順序俺榆,而跳表內(nèi)所有元素都是有序的
  • ConcurrentSkipListMap
    • Node:每個(gè)節(jié)點(diǎn)存放鍵值對(duì)和指向下一個(gè)節(jié)點(diǎn)的指向
    static final class Node<K,V>{
      final K key;
      volatile Object value;
      volatile Node<K,V> next;
    }
    
    • Index:包裝了Node感昼,同時(shí)增加了向下和向右的引用
    static class Index<K,V>{
      final Node<K,V> node;  
      final Index<K,V> down;
      final Index<K,V> right;
    }
    
    • HeadIndex
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市罐脊,隨后出現(xiàn)的幾起案子定嗓,更是在濱河造成了極大的恐慌,老刑警劉巖萍桌,帶你破解...
    沈念sama閱讀 218,386評(píng)論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件宵溅,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡上炎,警方通過(guò)查閱死者的電腦和手機(jī)恃逻,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,142評(píng)論 3 394
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)藕施,“玉大人寇损,你說(shuō)我怎么就攤上這事∏Π” “怎么了润绵?”我有些...
    開(kāi)封第一講書(shū)人閱讀 164,704評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)胞谈。 經(jīng)常有香客問(wèn)我尘盼,道長(zhǎng)憨愉,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,702評(píng)論 1 294
  • 正文 為了忘掉前任卿捎,我火速辦了婚禮配紫,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘午阵。我一直安慰自己躺孝,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,716評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布底桂。 她就那樣靜靜地躺著植袍,像睡著了一般。 火紅的嫁衣襯著肌膚如雪籽懦。 梳的紋絲不亂的頭發(fā)上于个,一...
    開(kāi)封第一講書(shū)人閱讀 51,573評(píng)論 1 305
  • 那天,我揣著相機(jī)與錄音暮顺,去河邊找鬼厅篓。 笑死,一個(gè)胖子當(dāng)著我的面吹牛捶码,可吹牛的內(nèi)容都是我干的羽氮。 我是一名探鬼主播,決...
    沈念sama閱讀 40,314評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼惫恼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼档押!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起祈纯,我...
    開(kāi)封第一講書(shū)人閱讀 39,230評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤汇荐,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后盆繁,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,680評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡旬蟋,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,873評(píng)論 3 336
  • 正文 我和宋清朗相戀三年油昂,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片倾贰。...
    茶點(diǎn)故事閱讀 39,991評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡冕碟,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出匆浙,到底是詐尸還是另有隱情安寺,我是刑警寧澤,帶...
    沈念sama閱讀 35,706評(píng)論 5 346
  • 正文 年R本政府宣布首尼,位于F島的核電站挑庶,受9級(jí)特大地震影響言秸,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜迎捺,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,329評(píng)論 3 330
  • 文/蒙蒙 一举畸、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧凳枝,春花似錦抄沮、人聲如沸。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,910評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至蹋订,卻和暖如春率挣,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背辅辩。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,038評(píng)論 1 270
  • 我被黑心中介騙來(lái)泰國(guó)打工难礼, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人玫锋。 一個(gè)月前我還...
    沈念sama閱讀 48,158評(píng)論 3 370
  • 正文 我出身青樓蛾茉,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親撩鹿。 傳聞我的和親對(duì)象是個(gè)殘疾皇子谦炬,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,941評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容