JDK并發(fā)包
本章內(nèi)容:
1巫员、關(guān)于同步控制的工具
2豪硅、線程池
3捎琐、JDK的一些并發(fā)容器
多線程的團(tuán)隊(duì)協(xié)作:同步控制
synchronized的功能擴(kuò)展:重入鎖
-
可以完全替代synchronized,使用java.util.concurrent.locks.ReentrantLock類來(lái)實(shí)現(xiàn)
public class ReenterLock implements Runnable { public static ReentrantLock lock = new ReentrantLock(); public static int i = 0; @Override public void run() { for (int j = 0; j < 1000000; j++) { lock.lock(); try { i++; } finally { lock.unlock(); } } } public static void main(String args[]) throws InterruptedException { ReenterLock reenterLock = new ReenterLock(); Thread thread1 = new Thread(reenterLock); Thread thread2 = new Thread(reenterLock); thread1.start(); thread2.start(); thread1.join(); thread2.join(); System.out.println(i); } }
- 執(zhí)行結(jié)果:
2000000
- 執(zhí)行結(jié)果:
如何理解重入虱咧?
這種鎖可以反復(fù)進(jìn)入耕拷,一個(gè)線程連續(xù)兩次獲得同一把鎖
- 中斷響應(yīng):lockInterruptibly
public class IntLock implements Runnable { public static ReentrantLock lock1 = new ReentrantLock(); public static ReentrantLock lock2 = new ReentrantLock(); int lock; public IntLock(int lock) { this.lock = lock; } @Override public void run() { try { if (lock == 1) { lock1.lockInterruptibly(); Thread.sleep(500); lock2.lockInterruptibly(); } else { lock2.lockInterruptibly(); Thread.sleep(500); lock1.lockInterruptibly(); } } catch (InterruptedException e) { e.printStackTrace(); } finally { if (lock1.isHeldByCurrentThread()) { lock1.unlock(); } if (lock2.isHeldByCurrentThread()) { lock2.unlock(); } System.out.println(Thread.currentThread().getId() + ":線程退出"); } } public static void main(String args[]) throws InterruptedException { IntLock r1 = new IntLock(1); IntLock r2 = new IntLock(2); Thread thread1 = new Thread(r1); Thread thread2 = new Thread(r2); thread1.start(); thread2.start(); Thread.sleep(1000); thread2.interrupt(); } }
- 執(zhí)行結(jié)果:
java.lang.InterruptedException at java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireInterruptibly(AbstractQueuedSynchronizer.java:944) at java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchronizer.java:1263) at java.base/java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:317) at chapter3.IntLock.run(IntLock.java:27) at java.base/java.lang.Thread.run(Thread.java:835) 15:線程退出 14:線程退出
- 鎖申請(qǐng)等待限時(shí)trylock
帶參的trylock
不帶參的trylockpublic class TimeLock implements Runnable { public static ReentrantLock lock = new ReentrantLock(); @Override public void run() { try { if (lock.tryLock(5, TimeUnit.SECONDS)) { System.out.println(Thread.currentThread().getName()); System.out.println("get lock success"); Thread.sleep(60000); } else { System.out.println(Thread.currentThread().getName()); System.out.println("get lock failed"); } } catch (InterruptedException e) { e.printStackTrace(); } finally { if (lock.isHeldByCurrentThread()) { lock.unlock(); } } } public static void main(String args[]) { TimeLock timeLock = new TimeLock(); Thread thread1 = new Thread(timeLock); Thread thread2 = new Thread(timeLock); thread1.start(); thread2.start(); } }
public class TryLock implements Runnable { public static ReentrantLock lock1 = new ReentrantLock(); public static ReentrantLock lock2 = new ReentrantLock(); int lock; public TryLock(int lock) { this.lock = lock; } @Override public void run() { if (lock == 1) { while (true) { if (lock1.tryLock()) { try { try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } if (lock2.tryLock()) { try { System.out.println(Thread.currentThread().getId() + ":My Job done;"); return; } finally { lock2.unlock(); } } } finally { lock1.unlock(); } } } } else { while (true) { if (lock2.tryLock()) { try { try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } if (lock1.tryLock()) { try { System.out.println(Thread.currentThread().getId() + ":My Job done;"); return; } finally { lock1.unlock(); } } } finally { lock2.unlock(); } } } } } public static void main(String args[]) { TryLock r1 = new TryLock(1); TryLock r2 = new TryLock(2); Thread thread1 = new Thread(r1); Thread thread2 = new Thread(r2); thread1.start(); thread2.start(); } }
- 公平鎖
大多數(shù)情況下汹族,鎖的申請(qǐng)都是非公平的萧求,系統(tǒng)只是從等待隊(duì)列中隨機(jī)地的選出一個(gè)線程。公平類似于一種先到先服務(wù)策略顶瞒,不會(huì)導(dǎo)致某些線程一直得不到執(zhí)行從而產(chǎn)生饑餓的現(xiàn)象
重入鎖可以對(duì)公平性進(jìn)行設(shè)置
public ReentrantLock(boolean fair)
公平鎖案例:
public class FairLock implements Runnable {
public static ReentrantLock fairLock = new ReentrantLock(true);//設(shè)置true指定鎖是公平的,也可以不設(shè)置,分別運(yùn)行觀察公平鎖與非公平鎖間的區(qū)別
//public static ReentrantLock unfairLock = new ReentrantLock();
@Override
public void run() {
while (true) {
try {
fairLock.lock();
// unfairLock.lock();
System.out.println(Thread.currentThread().getName() + "獲得鎖");
} finally {
fairLock.unlock();
// unfairLock.unlock();
}
}
}
/**
* 公平鎖的一個(gè)特點(diǎn)是:不會(huì)產(chǎn)生饑餓現(xiàn)象,只要排隊(duì)最終都會(huì)得到資源.
* <p/>
* 但是實(shí)現(xiàn)公平鎖要求系統(tǒng)維護(hù)一個(gè)有序隊(duì)列,因此公平鎖的實(shí)現(xiàn)成本較高,性能相對(duì)低下.
*
* @param args
*/
public static void main(String args[]) {
FairLock r1 = new FairLock();
Thread thread1 = new Thread(r1, "Thread_t1");
Thread thread2 = new Thread(r1, "Thread_t2");
Thread thread3 = new Thread(r1, "Thread_t3");
thread1.start();
thread2.start();
thread3.start();
}
}
ReetrantLock()的幾個(gè)重要方法:
lock() //獲得鎖夸政,如果鎖已經(jīng)被占有,則等待
lockInterruptibly() //獲得鎖榴徐,但優(yōu)先響應(yīng)中斷
tryLock() //嘗試獲得鎖守问,如果成功,返回true坑资,失敗返回false,不等待耗帕,立即返回
tryLock(long time,TimeUnit unit)//在給定的時(shí)間嘗試獲取鎖
unlock() //釋放鎖
重入鎖的實(shí)現(xiàn)主要包含三個(gè)要素:
1、原子狀態(tài)
使用CAS操作來(lái)村粗當(dāng)前鎖狀態(tài)袱贮,判斷鎖是否已經(jīng)被別的線程持有
2仿便、等待隊(duì)列
所有沒(méi)有請(qǐng)求到鎖的線程,會(huì)進(jìn)入等待隊(duì)列進(jìn)行等待。待有線程釋放鎖后探越,系統(tǒng)從等待隊(duì)列中喚醒一個(gè)線程狡赐,繼續(xù)工作
3、阻塞原語(yǔ)park()和unpark()钦幔,用來(lái)掛起和恢復(fù)線程
重入鎖的好搭檔:Condition條件(與wait()和notify()的作用大致相同)
- Condition接口提供的基本方法如下:
Condition功能案例:參照wait與notify的工作流程void await() throws InterruptedException; void awaitUninterruptibly(); long awaitNanos(long nanosTimeout) throws InterruptedException; boolean await(long tine,TimeUnit unit) throws InterruptedException; boolean awaiUtilt(Date deadline) throws InterruptedException; void signal(); void signalAll();
ArrayBlockingQueue使用重入鎖和Condition對(duì)象的案例:public class ReenterLockCondition implements Runnable { public static ReentrantLock lock = new ReentrantLock(); public static Condition condition = lock.newCondition(); @Override public void run() { try { lock.lock(); condition.await(); System.out.println("Thread is going on"); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } public static void main(String args[]) throws InterruptedException { ReenterLockCondition reenterLockCondition = new ReenterLockCondition(); Thread thread1 = new Thread(reenterLockCondition); thread1.start(); System.out.println("睡眠2秒鐘"); Thread.sleep(2000); lock.lock(); condition.signal(); lock.unlock(); } }
private final ReentrantLock lock;
private final Condition notEmpty;
private final Condition notFull;
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
public void put(E e) throws InterruptedException{
if(e==null) throw new NullPointerException;
final E[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try{
try{
while(count==items.length){
notFull.await();
}catch(InterruptedException e){
notFull.signal();
throw e;
}
}
insert(e);
}finally{
lock.unlock();
}
}
private void insert(E x){
items[putIndex] = x;
putIndex = inc(putIndex);
++count;
notEmpty.signal();
}
public E take(E e) throws InterruptedException{
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try{
try{
while(count==0){
notEmpty.await();
}catch(InterruptedException e){
notEmpty.signal();
throw e;
}
}
extract(e);
}finally{
lock.unlock();
}
}
private E extract(){
final E[] items = this.items;
E x = items[takeIndex];
items[takeIndex] = null;
takeIndex = inc(takeIndex);
--count;
notFull.signal();
return x;
}
允許多個(gè)線程同時(shí)訪問(wèn):信號(hào)量(Semphore)
什么是信號(hào)量枕屉?
信號(hào)量是對(duì)鎖的擴(kuò)展,對(duì)于內(nèi)部鎖synchronized和重入鎖ReentrantLock,一次只允許一個(gè)線程訪問(wèn)一個(gè)資源鲤氢,而信號(hào)量可以指定多個(gè)線程搀擂,同時(shí)訪問(wèn)同一個(gè)資源
- 信號(hào)量的構(gòu)造函數(shù)
public Semaphore(int permits)
public Semaphore(int permits,boolean fair)
許可是什么意思?
許可也就是準(zhǔn)入數(shù)卷玉,表示一次可以有多少個(gè)線程同時(shí)訪問(wèn)同一個(gè)資源
- 信號(hào)量的主要邏輯方法:
public void acquire()
public void acquireUninterruptibly()
public boolean tryAcquire()
public boolean tryAcquire(long timeout,TimeUnit unit)
public void release()
信號(hào)量使用實(shí)例:程序不會(huì)停止哨颂??相种?
public class SemapDemo implements Runnable {
final Semaphore semp = new Semaphore(5);
@Override
public void run() {
try {
semp.acquire();
Thread.sleep(2000);
System.out.println(Thread.currentThread().getId() + ":done!");
semp.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* 總共20個(gè)線程,系統(tǒng)會(huì)以5個(gè)線程一組為單位,依次執(zhí)行并輸出
*
* @param args
*/
public static void main(String args[]) {
ExecutorService executorService = Executors.newFixedThreadPool(20);
final SemapDemo demo = new SemapDemo();
for (int i = 0; i < 20; i++) {
executorService.submit(demo);
}
}
}
ReadWriteLock讀寫(xiě)鎖
什么是讀寫(xiě)鎖威恼?
讀寫(xiě)鎖準(zhǔn)確的來(lái)說(shuō),是讀寫(xiě)分離的鎖寝并,當(dāng)使用內(nèi)部鎖或重入鎖時(shí)箫措,所有的讀與寫(xiě)之間的線程都是串行執(zhí)行,然而實(shí)際上衬潦,讀與讀之間不存在線程安全的問(wèn)題斤蔓,可以同時(shí)操作,讀寫(xiě)鎖就是為了解決這個(gè)問(wèn)題而出現(xiàn)的
- 讀寫(xiě)鎖的訪問(wèn)約束
讀 寫(xiě) 讀 非阻塞 阻塞 寫(xiě) 阻塞 阻塞 - 適用場(chǎng)景:系統(tǒng)中讀的次數(shù)遠(yuǎn)遠(yuǎn)多于寫(xiě)的次數(shù)
讀寫(xiě)鎖ReadWriteLock與非讀寫(xiě)鎖Lock的對(duì)比案例:
public Object handleRead(Lock lock) throws InterruptedException {
try {
lock.lock();
Thread.sleep(1000);//模擬讀操作
System.out.println("讀操作:" + value);
return value;
} finally {
lock.unlock();
}
}
public void handleWrite(Lock lock, int index) throws InterruptedException {
try {
lock.lock();
Thread.sleep(1000);//模擬寫(xiě)操作
System.out.println("寫(xiě)操作:" + value);
value = index;
} finally {
lock.unlock();
}
}
public static void main(String args[]) {
final ReadWriteLockDemo demo = new ReadWriteLockDemo();
Runnable readRunnable = new Runnable() {
@Override
public void run() {
//分別使用兩種鎖來(lái)運(yùn)行,性能差別很直觀的就體現(xiàn)出來(lái),使用讀寫(xiě)鎖后讀操作可以并行,節(jié)省了大量時(shí)間
try {
demo.handleRead(readLock);
//demo.handleRead(lock);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
Runnable writeRunnable = new Runnable() {
@Override
public void run() {
//分別使用兩種鎖來(lái)運(yùn)行,性能差別很直觀的就體現(xiàn)出來(lái)
try {
demo.handleWrite(writeLock, new Random().nextInt(100));
//demo.handleWrite(lock, new Random().nextInt(100));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
for (int i = 0; i < 18; i++) {
new Thread(readRunnable).start();
}
for (int i = 18; i < 20; i++) {
new Thread(writeRunnable).start();
}
}
}
倒計(jì)時(shí)器:CountDownLatch
什么是CountDownLatch?
是一種多線程并發(fā)控制工具镀岛,類比于與火箭的檢查工作弦牡,在倒計(jì)時(shí)結(jié)束后,線程才開(kāi)始執(zhí)行
- 構(gòu)造函數(shù):
public CountDownLatch(int count)
- 案例:
public class CountDownLatchDemo implements Runnable { static final CountDownLatch end = new CountDownLatch(10); static final CountDownLatchDemo demo = new CountDownLatchDemo(); @Override public void run() { try { Thread.sleep(new Random().nextInt(3) * 1000); System.out.println("check complete"); end.countDown(); } catch (InterruptedException e) { e.printStackTrace(); } } public static void main(String args[]) throws InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(10); for (int i = 0; i < 10; i++) { executorService.submit(demo); } //等待檢查 end.await(); //發(fā)射火箭 System.out.println("Fire!"); executorService.shutdown(); } }
循環(huán)柵欄:CyclicBarrier
什么是循環(huán)柵欄漂羊?
另外一種多線程并發(fā)控制實(shí)用工具驾锰,與CountDownlatch類似,比其更強(qiáng)大
實(shí)例:
線程阻塞工具類:LockSupport
類似于wait()和notify()
線程復(fù)用:線程
在實(shí)際生產(chǎn)環(huán)境中拨与,線程的數(shù)量必須得以控制稻据。盲目的大量創(chuàng)造線程是對(duì)系統(tǒng)有傷害的,所以就出現(xiàn)了線程池买喧,對(duì)線程加以控制和管理
什么是線程池
當(dāng)需要使用線程時(shí)捻悯,從線程池獲取一個(gè)線程執(zhí)行,當(dāng)執(zhí)行完畢后淤毛,將線程返還給線程池
- 線程池的作用.PNG
不重復(fù)發(fā)明輪子:JDK 對(duì)線程池的支持
- Executor框架結(jié)構(gòu)圖
- Executor 提供了各種類型的線程池
- 1.固定大小的線程池案例:
- 2.計(jì)劃任務(wù)newScheduledThreadPool
刨根究底:核心線程池的內(nèi)部實(shí)現(xiàn)
無(wú)論是newFixedThreadPool()方法今缚、newSingleThreadExecutor還是newCachedThreadPool()方法,均使用ThreadPoolExecutor實(shí)現(xiàn)
public static ExecutorService newFixedThreadPool(int nThreads)
{ return new ThreadPoolExecutor(nThreads,
nThreads,
0L,
TimeUnit.MILLSECONDS,
new LinkedBlockingQueue<Runanble>());
}
public static ExecutorService newSingleThreadPool(int nThreads){ return new FinalizableDelegatedExecutorService( new ThreadPoolExecutor(1,1,0L,TimeUnit.MILLSECONDS,new LinkedBlockingQueue<Runanble>()));
}
public static ExecutorService newCachedThreadPool(int nThreads){ return new ThreadPoolExecutor(0,Integer.MAX_VALUE,60L,TimeUnit.SECONDS,new SynchronousQueue<Runanble>());
}
ThreadPoolExecutor最重要的構(gòu)造函數(shù):
public ThreadPoolExecutor(int corePoolSize,//指定線程池中的線程數(shù)量
int maximumPoolSize,//指定線程池中最大線程數(shù)量
long keepAliveTime,//超過(guò)corePoolSize的空閑線程低淡,在多長(zhǎng)時(shí)間內(nèi)姓言,會(huì)被銷毀
TimeUnit unit,//keepAliveTime的單位
BlockingQueue<Runanble> workQueue,//任務(wù)隊(duì)列瞬项,被提交但尚未被執(zhí)行的任務(wù)
ThreadFactory threadFactory,//線程工廠
RejecttedExecutionHandler handler)//拒絕策略,當(dāng)任務(wù)太多來(lái)不及處理何荚,如何拒絕任務(wù)
workQueue:指被提交但未執(zhí)行的任務(wù)隊(duì)列囱淋,是一個(gè)BlockingQueue接口的對(duì)象,ThreadPoolExecutor可使用以下幾種BlockingQueue
- 直接提交的隊(duì)列:SynchronousQueue
- 有界的任務(wù)隊(duì)列:ArrayBlockingQueue
- 無(wú)界的任務(wù)隊(duì)列:LinkedBlockingQueue
- 優(yōu)先任務(wù)隊(duì)列:PriorityBlockingQueue
ThreadPoolExecutor線程池的核心調(diào)度代碼
調(diào)度邏輯可總結(jié)為:
- ThreadPoolExecutor的調(diào)度邏輯.PNG
超負(fù)載了怎么辦:拒絕策略
RejecttedExecutionHandler handler
JDK 內(nèi)置提供了四種拒絕策略:
- AbortPolicy :直接拋出異常餐塘,阻止系統(tǒng)正常工作
- CallerRunsPolicy:只要線程池未關(guān)閉妥衣,該策略直接在調(diào)用者線程中,運(yùn)行當(dāng)前被丟棄的任務(wù)
- DiscardOldestPolicy:丟棄最老的一個(gè)請(qǐng)求(也就是即將被執(zhí)行的任務(wù))戒傻,并嘗試再次提交當(dāng)前任務(wù)
- DiscardPolicy:默默丟棄税手,不做任何處理
自定義線程池和拒絕策略的使用:
public class RejectThreadPoolDemo{
public static class MyTask implements Runnable{
@Override
public void run(){
System.out.println(System.currentTimeMills()+":Thread ID:"
+ Thread.currentThread().getId());
try{
Thread.sleep(100);
}catch(InterruptedException e){
e.printStackTrace();
}
}
}
public static void main(String[] args) {
MyTask task = new MyTask();
ExecutorService es = new ThreadPoolExecutor(5,5,
0L,TimeUnit.MILLSECONDS,
new LinkedBlockingQueue<Runnable>(10),
Executors.defaultThreadFactory(),
new RejectExecutionHandler(){
@Override
public void rejectedExecution(Runnable r,
ThreadPoolExecutor executor){
System.out.println(r.toString()+ is disacrd"");
}
});
for (int i=0;i<Integer.MAX_VALUE ;i++ ) {
es.submit(task);
Thread.sleep(10);
}
}
}
自定義線程創(chuàng)建:ThreadFactory
ThreadFactory是一個(gè)接口,只有一個(gè)方法需纳,用來(lái)創(chuàng)建線程:
Thread newThread(Runnable r);
自定義線程池的好處:
- 可以跟蹤線程池究竟在何時(shí)創(chuàng)建了線程芦倒,也饑餓自定義線程的名稱、組以及優(yōu)先級(jí)等信息不翩,甚至可以任性地將所有的線程設(shè)置為守護(hù)線程
如下:main主線程運(yùn)行2s后兵扬,JVM自動(dòng)退出,將會(huì)強(qiáng)制銷毀線程池
public static void main(String args[]) throws InterruptedException {
MyTask myTask = new MyTask();
ExecutorService executorService = new ThreadPoolExecutor(5, 5,
0L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactory(){
@Override
public Thread new Thread(){
Thread t = new Thread(r);
t.setDaemon(true);
System.out.println("create" + t);
return t;
}
},
});
for (int i = 0; i < 5; i++) {
es.submit(task);
}
Thread.sleep(2000);
}
我的應(yīng)用我做主:擴(kuò)展線程池
什么時(shí)候需要擴(kuò)展線程池口蝠?
當(dāng)想監(jiān)控每個(gè)任務(wù)的開(kāi)始和結(jié)束時(shí)間周霉,或者其他一些自定義的增強(qiáng)功能時(shí)(動(dòng)態(tài)代理),就需要對(duì)線程池進(jìn)行擴(kuò)展
ThreadPoolExecutor也是一個(gè)可擴(kuò)展的線程池亚皂,提供了beforeExecute()、afterExecutor()和terminated()三個(gè)方法用于增強(qiáng)
ThreadPoolExecutor.Worker.runTask()方法的內(nèi)部實(shí)現(xiàn):
-
runTask方法.PNG
線程池?cái)U(kuò)展的例子:
public class ExtThreadPool {
public static class MyTask implements Runnable {
public String name;
public MyTask(String name) {
this.name = name;
}
@Override
public void run() {
System.out.println("正在執(zhí)行:Thread ID:" + Thread.currentThread().getId() + ",Task Name:" + name);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String args[]) throws InterruptedException {
ExecutorService executorService = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingDeque<Runnable>()) {
protected void beforeExecute(Thread t, Runnable r) {
System.out.println("準(zhǔn)備執(zhí)行:" + ((MyTask) r).name);
}
protected void afterExecute(Thread t, Runnable r) {
System.out.println("執(zhí)行完成:" + ((MyTask) r).name);
}
protected void terminated() {
System.out.println("線程池退出!");
}
};
for (int i = 0; i < 5; i++) {
MyTask task = new MyTask("TASK-GEYM-" + i);
executorService.execute(task);
Thread.sleep(10);
}
executorService.shutdown();
}
}
注意:
1.此處用的是execute()方法提交任務(wù)国瓮,沒(méi)有用submit()
2.shutdown()關(guān)閉線程池時(shí)灭必,并不是暴力終止所有線程,而是發(fā)給一個(gè)關(guān)閉信號(hào)給線程池乃摹,線程池此時(shí)不能再接收其他任務(wù)禁漓,執(zhí)行完池內(nèi)剩余的線程
合理的選擇:優(yōu)化線程池?cái)?shù)量
- 線程池的數(shù)量.PNG
堆棧去哪里了:在線程池中尋找堆棧
submit()竟然沒(méi)有異常提示:
public class NoTraceDivTaskDemo {
public static class DivTask implements Runnable {
int a, b;
public DivTask(int a, int b) {
this.a = a;
this.b = b;
}
@Override
public void run() {
double re = a / b;
System.out.println(re);
}
}
public static void main(String args[]) {
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 0L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
for (int i = 0; i < 5; i++) {
poolExecutor.submit(new DivTask(100, i)); //沒(méi)有報(bào)錯(cuò)提示
// poolExecutor.execute(new DivTask(100, i));//有報(bào)錯(cuò)提示
}
}
}
很明顯,當(dāng)當(dāng)我執(zhí)行上述程序時(shí)孵睬,應(yīng)該后出現(xiàn)除零異常播歼,但是程序運(yùn)行結(jié)果實(shí)際上如下:
100.0
25.0
33.0
50.0
注釋submit,使用execute提交掰读,會(huì)有錯(cuò)誤提示:
Exception in thread "pool-1-thread-1" java.lang.ArithmeticException: / by zero
at chapter3.NoTraceDivTaskDemo$DivTask.run(NoTraceDivTaskDemo.java:21)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:835)
100.0
25.0
33.0
50.0
自己查看提交任務(wù)線程的堆棧信息:
public class TraceThreadPoolExecutor extends ThreadPoolExecutor {
public TraceThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
public void execute(Runnable task) {
super.execute(wrap(task, clientTrace(), Thread.currentThread().getName()));
}
private Runnable wrap(final Runnable task, final Exception clientTrace, String name) {
return new Runnable() {
@Override
public void run() {
try {
task.run();
} catch (Exception e) {
clientTrace.printStackTrace();
throw e;
}
}
};
}
private Exception clientTrace() {
return new Exception("Client stack trace");
}
}
public static void main(String args[]) {
ThreadPoolExecutor threadPoolExecutor = new TraceThreadPoolExecutor(0, Integer.MAX_VALUE, 0L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
for (int i = 0; i < 5; i++) {
threadPoolExecutor.execute(new NoTraceDivTaskDemo.DivTask(100, i));
}
}
- 運(yùn)行結(jié)果:
java.lang.Exception: Client stack trace
at chapter3.TraceThreadPoolExecutor.clientTrace(TraceThreadPoolExecutor.java:35)
at chapter3.TraceThreadPoolExecutor.execute(TraceThreadPoolExecutor.java:17)
at chapter3.TraceDivTaskDemo.main(TraceDivTaskDemo.java:14)
Exception in thread "pool-1-thread-1" java.lang.ArithmeticException: / by zero
at chapter3.NoTraceDivTaskDemo$DivTask.run(NoTraceDivTaskDemo.java:21)
at chapter3.TraceThreadPoolExecutor$1.run(TraceThreadPoolExecutor.java:25)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:835)
100.0
25.0
50.0
33.0
分而治之:Fork/Join框架
Fork和Join是什么意思秘狞?
Fork:創(chuàng)建子線程
Join:等待
使用fork后系統(tǒng)多了一個(gè)執(zhí)行分支(線程),因此需要等待這個(gè)執(zhí)行分支完成蹈集,才能得到最終的結(jié)果
分而治之:把一個(gè)計(jì)算任務(wù)分成許多小任務(wù)烁试,將這些小任務(wù)的計(jì)算結(jié)果再合成
-
Fork/Join的執(zhí)行邏輯如下圖:
fork-join.PNG - 互相幫助的線程:
-
互相幫助的線程.PNG
當(dāng)線程試圖幫助別人是,總是從其任務(wù)隊(duì)列的底部開(kāi)始拿數(shù)據(jù)拢肆,而線程執(zhí)行自己的任務(wù)時(shí)减响,則是從相反的頂部開(kāi)始拿靖诗。有利于避免數(shù)據(jù)競(jìng)爭(zhēng)
Fork/Join框架的使用,計(jì)算數(shù)列之和:
package chapter3;
import java.util.ArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
/**
* Created by 13 on 2017/5/5.
*/
public class CountTask extends RecursiveTask {
private static final int THRESHOLD = 10000;
private long start;
private long end;
public CountTask(long start, long end) {
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
long sum = 0;
boolean canCompute = (end - start) < THRESHOLD;
if (canCompute) {
for (long i = start; i <= end; i++) {
sum += i;
}
} else {
long step = (start+end) / 100;
ArrayList<CountTask> subTasks = new ArrayList<CountTask>();
long pos = start;
for (int i = 0; i < 100; i++) {
long lastOne = pos + step;
if (lastOne > end) {
lastOne = end;
}
CountTask subTask = new CountTask(pos, lastOne);
pos += step + 1;
subTasks.add(subTask);
subTask.fork();
}
for (CountTask t : subTasks) {
sum += (Long) t.join();
}
}
return sum;
}
public static void main(String args[]) {
ForkJoinPool forkJoinPool = new ForkJoinPool();
CountTask task = new CountTask(0, 200000L);
ForkJoinTask<Long> result = forkJoinPool.submit(task);
long res = 0;
try {
res = result.get();
System.out.println("sum=" + res);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
不要重復(fù)發(fā)明輪子:JDK的并發(fā)容器
超好用的工具類:并發(fā)集合簡(jiǎn)介
JDK提供的這些容器大部分都在java.util.concurrent
包中
- ConcurrentHashMap:線程安全的HashMap
- CopyOnWriteArrayList:在讀多寫(xiě)少的場(chǎng)合支示,性能遠(yuǎn)好于Vector
- ConcurrentLinkedQueue:高效的并發(fā)隊(duì)列刊橘,使用鏈表實(shí)現(xiàn),線程安全的LinkedList
- BlockingQueue:一個(gè)接口颂鸿,JDK內(nèi)部通過(guò)鏈表促绵、數(shù)組等方式實(shí)現(xiàn)了這個(gè)接口
- ConcurrentSkipListMap:跳表的實(shí)現(xiàn),Map据途,使用跳表的數(shù)據(jù)結(jié)構(gòu)進(jìn)行快速查找
- Vector和Collections工具類可以將任何集合包裝成線程安全帶集合
線程安全的HashMap
- 兩種實(shí)現(xiàn)方式:
- Collections對(duì)HashMap包裝
- ConcurrentHashMap
- 使用Collections.synchronizedMap()包裝HashMap,如下得到的m就是線程安全的HashMap:
public static Map m = Collections.synchronizedMap(new HashMap());
高效讀冉视蕖:不變模式下的CopyOnWriteArrayList
數(shù)據(jù)共享通道:BlockingQueue
“中間媒介”
- ArrayBlockingQueue
- LinkedBlockingQueue
隨機(jī)數(shù)據(jù)結(jié)構(gòu):跳表
什么是跳表?
一種用來(lái)快速查找的數(shù)據(jù)結(jié)構(gòu)颖医,類似于平衡樹(shù)位衩。但是平衡樹(shù)的插入和刪除可能導(dǎo)致平衡樹(shù)進(jìn)行一次全局的調(diào)整,而對(duì)跳表的插入和刪除只需要對(duì)整個(gè)數(shù)據(jù)結(jié)構(gòu)的局部進(jìn)行操作即可熔萧。因此在高并發(fā)時(shí)糖驴,平衡樹(shù)需要一個(gè)全局鎖來(lái)保證線程安全,跳表只需要局部鎖即可佛致,跳表的查找時(shí)間復(fù)雜度同樣為O(logn).JDK 使用跳表實(shí)現(xiàn)一個(gè)Map.
-
隨機(jī)贮缕。本質(zhì)是同時(shí)維護(hù)了多個(gè)鏈表,并且鏈表是分層的
跳表的數(shù)據(jù)結(jié)構(gòu).PNG - 所有鏈表都是有序的
- 使用跳表實(shí)現(xiàn)Map和使用哈希算法實(shí)現(xiàn)Map的一個(gè)區(qū)別:哈希并不會(huì)保存元素的順序俺榆,而跳表內(nèi)所有元素都是有序的
- ConcurrentSkipListMap
- Node:每個(gè)節(jié)點(diǎn)存放鍵值對(duì)和指向下一個(gè)節(jié)點(diǎn)的指向
static final class Node<K,V>{ final K key; volatile Object value; volatile Node<K,V> next; }
- Index:包裝了Node感昼,同時(shí)增加了向下和向右的引用
static class Index<K,V>{ final Node<K,V> node; final Index<K,V> down; final Index<K,V> right; }
- HeadIndex