什么是線程池
Java中的線程池是運用場景最多的并發(fā)框架衬横,幾乎所有需要異步或并發(fā)執(zhí)行任務的程序都可以使用線程池。在開發(fā)過程中颁褂,合理地使用線程池能夠帶來3個好處趾牧。
第一:降低資源消耗检盼。通過重復利用已創(chuàng)建的線程降低線程創(chuàng)建和銷毀造成的消耗。
第二:提高響應速度翘单。當任務到達時梯皿,任務可以不需要等到線程創(chuàng)建就能立即執(zhí)行。
第三:提高線程的可管理性县恕。線程是稀缺資源,如果無限制地創(chuàng)建剂桥,不僅會消耗系統(tǒng)資源忠烛,
還會降低系統(tǒng)的穩(wěn)定性,使用線程池可以進行統(tǒng)一分配权逗、調優(yōu)和監(jiān)控美尸。但是冤议,要做到合理利用
線程池,必須對其實現原理了如指掌师坎。
線程池的作用
線程池是為突然大量爆發(fā)的線程設計的恕酸,通過有限的幾個固定線程為大量的操作服務,減少了創(chuàng)建和銷毀線程所需的時間胯陋,從而提高效率蕊温。
如果一個線程的時間非常長,就沒必要用線程池了(不是不能作長時間操作遏乔,而是不宜义矛。),況且我們還不能控制線程池中線程的開始盟萨、掛起凉翻、和中止。
線程池的分類
ThreadPoolExecutor
Java是天生就支持并發(fā)的語言捻激,支持并發(fā)意味著多線程制轰,線程的頻繁創(chuàng)建在高并發(fā)及大數據量是非常消耗資源的,因為java提供了線程池胞谭,在jdk1.5以前的版本中垃杖,線程池的使用是及其簡陋的,但是在JDK1.5后韭赘,有了很大的改善缩滨。JDK1.5之后加入了java.util.concurrent包,java.util.concurrent包的加入給予開發(fā)人員開發(fā)并發(fā)程序以及解決并發(fā)問題很大的幫助泉瞻。
這里主要介紹下并發(fā)包下的Executor接口脉漏,Executor接口雖然作為一個非常舊的接口(JDK1.5 2004年發(fā)布),但是很多程序員對于其中的一些原理還是不熟悉袖牙。
Executor框架的最頂層實現是ThreadPoolExecutor類侧巨,Executors工廠類中提供的newScheduledThreadPool、newFixedThreadPool鞭达、newCachedThreadPool方法其實也只是ThreadPoolExecutor的構造函數參數不同而已司忱。通過傳入不同的參數,就可以構造出適用于不同應用場景下的線程池畴蹭。
傳入參數:
corePoolSize: 核心池的大小坦仍。 當有任務來之后,就會創(chuàng)建一個線程去執(zhí)行任務叨襟,當線程池中的線程數目達到corePoolSize后繁扎,就會把到達的任務放到緩存隊列當中。
maximumPoolSize: 線程池最大線程數,它表示在線程池中最多能創(chuàng)建多少個線程梳玫。
keepAliveTime: 表示線程沒有任務執(zhí)行時最多保持多久時間會終止爹梁。
unit: 參數keepAliveTime的時間單位,有7種取值提澎,在TimeUnit類中有7種靜態(tài)屬性
線程池四種創(chuàng)建方式
Java通過Executors(jdk1.5并發(fā)包)提供四種線程池姚垃,分別為:
newCachedThreadPool:創(chuàng)建一個可緩存線程池,如果線程池長度超過處理需要盼忌,可靈活回收空閑線程积糯,若無可回收,則新建線程碴犬。
newFixedThreadPool :創(chuàng)建一個定長線程池絮宁,可控制線程最大并發(fā)數,超出的線程會在隊列中等待服协。
newScheduledThreadPool :創(chuàng)建一個定時線程池绍昂,支持定時及周期性任務執(zhí)行。
newSingleThreadExecutor: 創(chuàng)建一個單線程化的線程池偿荷,它只會用唯一的工作線程來執(zhí)行任務窘游,保證所有任務按照指定順序(FIFO, LIFO, 優(yōu)先級)執(zhí)行。
newCachedThreadPool
創(chuàng)建一個可緩存線程池跳纳,如果線程池長度超過處理需要忍饰,可靈活回收空閑線程,若無可回收寺庄,則新建線程艾蓝。
- 示例代碼如下
public class Demo01 {
public static void main(String[] args) {
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
for (int i = 1; i <= 13; i++) {
final int temp = i;
cachedThreadPool.execute(new Runnable() {
public void run() {
System.out.println("ThreadName:"+Thread.currentThread().getName()+",i:"+temp);
}
});
}
}
}
- 運行結果
ThreadName:pool-1-thread-2,i:2
ThreadName:pool-1-thread-3,i:3
ThreadName:pool-1-thread-1,i:1
ThreadName:pool-1-thread-4,i:4
ThreadName:pool-1-thread-1,i:13
ThreadName:pool-1-thread-3,i:12
ThreadName:pool-1-thread-5,i:5
ThreadName:pool-1-thread-6,i:6
ThreadName:pool-1-thread-7,i:7
ThreadName:pool-1-thread-8,i:8
ThreadName:pool-1-thread-9,i:9
ThreadName:pool-1-thread-10,i:10
ThreadName:pool-1-thread-11,i:11
可見,并沒有創(chuàng)建13個線程斗塘,而是重復利用了緩存赢织。
總結: 線程池為無限大,當執(zhí)行第二個任務時第一個任務已經完成馍盟,會復用執(zhí)行第一個任務的線程于置,而不用每次新建線程。
newFixedThreadPool
創(chuàng)建一個定長線程池贞岭,可控制線程最大并發(fā)數八毯,超出的線程會在隊列中等待。
- 示例代碼如下:
public class Demo01 {
public static void main(String[] args) {
ExecutorService threadPool = Executors.newFixedThreadPool(3);
for (int i = 1; i <= 13; i++) {
final int temp = i;
threadPool.execute(new Runnable() {
public void run() {
System.out.println("ThreadName:"+Thread.currentThread().getName()+",i:"+temp);
}
});
}
}
}
- 運行結果
ThreadName:pool-1-thread-1,i:1
ThreadName:pool-1-thread-1,i:4
ThreadName:pool-1-thread-2,i:2
ThreadName:pool-1-thread-1,i:5
ThreadName:pool-1-thread-2,i:6
ThreadName:pool-1-thread-1,i:7
ThreadName:pool-1-thread-2,i:8
ThreadName:pool-1-thread-2,i:10
ThreadName:pool-1-thread-2,i:11
ThreadName:pool-1-thread-2,i:12
ThreadName:pool-1-thread-2,i:13
ThreadName:pool-1-thread-1,i:9
ThreadName:pool-1-thread-3,i:3
總結:因為線程池大小為3瞄桨,每個任務輸出index后sleep 2秒,所以每兩秒打印3個數字芯侥。定長線程池的大小最好根據系統(tǒng)資源進行設置泊交。如Runtime.getRuntime().availableProcessors();
newScheduledThreadPool
創(chuàng)建一個定長線程池,支持定時及周期性任務執(zhí)行活合。
- 延遲執(zhí)行示例代碼如下:
public class Demo01 {
public static void main(String[] args)
ScheduledExecutorService threadPool = Executors.newScheduledThreadPool(3);
for (int i = 1; i <= 10; i++) {
final int temp = i;
threadPool.schedule(new Runnable() {
public void run() {
System.out.println("ThreadName:"+Thread.currentThread().getName()+",i:"+temp);
}
},3,TimeUnit.SECONDS);
}
}
}
- 運行結果
ThreadName:pool-1-thread-3,i:2
ThreadName:pool-1-thread-1,i:1
ThreadName:pool-1-thread-3,i:4
ThreadName:pool-1-thread-2,i:3
ThreadName:pool-1-thread-3,i:6
ThreadName:pool-1-thread-1,i:5
ThreadName:pool-1-thread-3,i:8
ThreadName:pool-1-thread-3,i:10
ThreadName:pool-1-thread-1,i:9
ThreadName:pool-1-thread-2,i:7
表示延遲3秒執(zhí)行
newSingleThreadExecutor
創(chuàng)建一個單線程化的線程池,它只會用唯一的工作線程來執(zhí)行任務物赶,保證所有任務按照指定順序(FIFO, LIFO, 優(yōu)先級)執(zhí)行白指。
- 示例代碼如下:
public class Demo01 {
public static void main(String[] args)
ExecutorService threadExecutor = Executors.newSingleThreadExecutor();
for (int i = 1; i <= 10; i++) {
final int temp = i;
threadExecutor.execute(new Runnable() {
public void run() {
System.out.println("ThreadName:"+Thread.currentThread().getName()+",i:"+temp);
}
});
}
}
}
- 運行結果
ThreadName:pool-1-thread-1,i:1
ThreadName:pool-1-thread-1,i:2
ThreadName:pool-1-thread-1,i:3
ThreadName:pool-1-thread-1,i:4
ThreadName:pool-1-thread-1,i:5
ThreadName:pool-1-thread-1,i:6
ThreadName:pool-1-thread-1,i:7
ThreadName:pool-1-thread-1,i:8
ThreadName:pool-1-thread-1,i:9
ThreadName:pool-1-thread-1,i:10
注意: 結果依次輸出,相當于順序執(zhí)行各個任務酵紫。
線程池原理剖析
提交一個任務到線程池中告嘲,線程池的處理流程如下:
1、判斷線程池里的核心線程是否都在執(zhí)行任務奖地,如果不是(核心線程空閑或者還有核心線程沒有被創(chuàng)建)則創(chuàng)建一個新的工作線程來執(zhí)行任務橄唬。如果核心線程都在執(zhí)行任務,則進入下個流程参歹。
2仰楚、線程池判斷工作隊列是否已滿,如果工作隊列沒有滿犬庇,則將新提交的任務存儲在這個工作隊列里僧界。如果工作隊列滿了,則進入下個流程臭挽。
3捂襟、判斷線程池里的線程是否都處于工作狀態(tài),如果沒有欢峰,則創(chuàng)建一個新的工作線程來執(zhí)行任務葬荷。如果已經滿了,則交給飽和策略來處理這個任務纽帖。
合理配置線程池
要想合理的配置線程池宠漩,就必須首先分析任務特性,可以從以下幾個角度來進行分析:
任務的性質:CPU密集型任務抛计,IO密集型任務和混合型任務哄孤。
任務的優(yōu)先級:高,中和低吹截。
任務的執(zhí)行時間:長瘦陈,中和短。
任務的依賴性:是否依賴其他系統(tǒng)資源波俄,如數據庫連接晨逝。
任務性質不同的任務可以用不同規(guī)模的線程池分開處理。CPU密集型任務配置盡可能少的線程數量懦铺,如配置Ncpu+1個線程的線程池捉貌。IO密集型任務則由于需要等待IO操作,線程并不是一直在執(zhí)行任務,則配置盡可能多的線程趁窃,如2*Ncpu牧挣。混合型的任務醒陆,如果可以拆分瀑构,則將其拆分成一個CPU密集型任務和一個IO密集型任務,只要這兩個任務執(zhí)行的時間相差不是太大刨摩,那么分解后執(zhí)行的吞吐率要高于串行執(zhí)行的吞吐率寺晌,如果這兩個任務執(zhí)行時間相差太大,則沒必要進行分解澡刹。我們可以通過Runtime.getRuntime().availableProcessors()方法獲得當前設備的CPU個數呻征。
優(yōu)先級不同的任務可以使用優(yōu)先級隊列PriorityBlockingQueue來處理。它可以讓優(yōu)先級高的任務先得到執(zhí)行罢浇,需要注意的是如果一直有優(yōu)先級高的任務提交到隊列里陆赋,那么優(yōu)先級低的任務可能永遠不能執(zhí)行。
執(zhí)行時間不同的任務可以交給不同規(guī)模的線程池來處理己莺,或者也可以使用優(yōu)先級隊列奏甫,讓執(zhí)行時間短的任務先執(zhí)行。
依賴數據庫連接池的任務凌受,因為線程提交SQL后需要等待數據庫返回結果阵子,如果等待的時間越長CPU空閑時間就越長,那么線程數應該設置越大胜蛉,這樣才能更好的利用CPU挠进。
總結:
CPU密集型時,任務可以少配置線程數誊册,大概和機器的cpu核數相當领突,這樣可以使得每個線程都在執(zhí)行任務
IO密集型時,大部分線程都阻塞案怯,故需要多配置線程數君旦,2*cpu核數
操作系統(tǒng)之名稱解釋:
某些進程花費了絕大多數時間在計算上,而其他則在等待I/O上花費了大多是時間嘲碱,
前者稱為計算密集型(CPU密集型)computer-bound金砍,后者稱為I/O密集型,I/O-bound麦锯。
悲觀鎖恕稠、樂觀鎖、排他鎖
場景
當多個請求同時操作數據庫時扶欣,首先將訂單狀態(tài)改為已支付鹅巍,在金額加上200千扶,在同時并發(fā)場景查詢條件下,會造成重復通知骆捧。
SQL:
悲觀鎖與樂觀鎖
悲觀鎖:
悲觀鎖悲觀的認為每一次操作都會造成更新丟失問題澎羞,在每次查詢時加上排他鎖。
每次去拿數據的時候都認為別人會修改敛苇,所以每次在拿數據的時候都會上鎖煤痕,這樣別人想拿這個數據就會block直到它拿到鎖。傳統(tǒng)的關系型數據庫里邊就用到了很多這種鎖機制接谨,比如行鎖,表鎖等塘匣,讀鎖脓豪,寫鎖等,都是在做操作之前先上鎖忌卤。
核心SQL代碼:
select * from xxx for update;
例如:
樂觀鎖:
總是認為不會產生并發(fā)問題扫夜,每次去取數據的時候總認為不會有其他線程對數據進行修改,因此不會上鎖驰徊,但是在更新時會判斷其他線程在這之前有沒有對數據進行修改笤闯,一般會使用版本號機制或CAS操作實現。
- version方式:
一般是在數據表中加上一個數據版本號version字段棍厂,表示數據被修改的次數颗味,當數據被修改時,version值會加一牺弹。當線程A要更新數據值時浦马,在讀取數據的同時也會讀取version值,在提交更新時张漂,若剛才讀取到的version值為當前數據庫中的version值相等時才更新晶默,否則重試更新操作,直到更新成功航攒。
核心SQL代碼:
update table set x=x+1, version=version+1 where id=#{id} and version=#{version}; - CAS操作方式:
即compare and swap 或者 compare and set磺陡,涉及到三個操作數,數據所在的內存值漠畜,預期值币他,新值。當需要更新時盆驹,判斷當前內存值與之前取到的值是否相等圆丹,若相等,則用新值更新躯喇,若失敗則重試辫封,一般情況下是一個自旋操作硝枉,即不斷的重試。
可重入鎖
鎖作為并發(fā)共享數據倦微,保證一致性的工具妻味,在JAVA平臺有多種實現(如 synchronized 和 ReentrantLock等等 ) 。這些已經寫好提供的鎖為我們開發(fā)提供了便利欣福。
重入鎖责球,也叫做遞歸鎖,指的是同一線程外層函數獲得鎖之后拓劝,內層遞歸函數仍然有獲取該鎖的代碼雏逾,但不受影響。
在JAVA環(huán)境下ReentrantLock和synchronized都是可重入鎖郑临。
- 代碼示例
public synchronized void get() {
System.out.println("name:" + Thread.currentThread().getName() + " get();");
set();
}
public synchronized void set() {
System.out.println("name:" + Thread.currentThread().getName() + " set();");
}
讀寫鎖
相比Java中的鎖(Locks in Java)里Lock實現栖博,讀寫鎖更復雜一些。假設你的程序中涉及到對一些共享資源的讀和寫操作厢洞,且寫操作沒有讀操作那么頻繁仇让。在沒有寫操作的時候,兩個線程同時讀一個資源沒有任何問題躺翻,所以應該允許多個線程能在同時讀取共享資源丧叽。但是如果有一個線程想去寫這些共享資源,就不應該再有其它線程對該資源進行讀或寫(譯者注:也就是說:讀-讀能共存公你,讀-寫不能共存踊淳,寫-寫不能共存)。這就需要一個讀/寫鎖來解決這個問題陕靠。Java5在java.util.concurrent包中已經包含了讀寫鎖嚣崭。盡管如此,我們還是應該了解其實現背后的原理懦傍。
public class Cache {
static Map<String, Object> map = new HashMap<String, Object>();
static ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
static Lock r = rwl.readLock();
static Lock w = rwl.writeLock();
// 獲取一個key對應的value
public static final Object get(String key) {
r.lock();
try {
System.out.println("正在做讀的操作,key:" + key + " 開始");
Thread.sleep(100);
Object object = map.get(key);
System.out.println("正在做讀的操作,key:" + key + " 結束");
System.out.println();
return object;
} catch (InterruptedException e) {
} finally {
r.unlock();
}
return key;
}
// 設置key對應的value雹舀,并返回舊有的value
public static final Object put(String key, Object value) {
w.lock();
try {
System.out.println("正在做寫的操作,key:" + key + ",value:" + value + "開始.");
Thread.sleep(100);
Object object = map.put(key, value);
System.out.println("正在做寫的操作,key:" + key + ",value:" + value + "結束.");
System.out.println();
return object;
} catch (InterruptedException e) {
} finally {
w.unlock();
}
return value;
}
// 清空所有的內容
public static final void clear() {
w.lock();
try {
map.clear();
} finally {
w.unlock();
}
}
public static void main(String[] args) {
new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
Cache.put(i + "", i + "");
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
Cache.get(i + "");
}
}
}).start();
}
}
CAS無鎖機制
(1)與鎖相比,使用比較交換(下文簡稱CAS)會使程序看起來更加復雜一些粗俱。但由于其非阻塞性说榆,它對死鎖問題天生免疫,并且寸认,線程間的相互影響也遠遠比基于鎖的方式要小签财。更為重要的是,使用無鎖的方式完全沒有鎖競爭帶來的系統(tǒng)開銷偏塞,也沒有線程間頻繁調度帶來的開銷唱蒸,因此,它要比基于鎖的方式擁有更優(yōu)越的性能灸叼。
(2)無鎖的好處:
第一神汹,在高并發(fā)的情況下庆捺,它比有鎖的程序擁有更好的性能;
第二屁魏,它天生就是死鎖免疫的滔以。
就憑借這兩個優(yōu)勢,就值得我們冒險嘗試使用無鎖的并發(fā)氓拼。
(3)CAS算法的過程是這樣:它包含三個參數CAS(V,E,N): V表示要更新的變量你画,E表示預期值,N表示新值桃漾。僅當V值等于E值時坏匪,才會將V的值設為N,如果V值和E值不同撬统,則說明已經有其他線程做了更新剥槐,則當前線程什么都不做。最后宪摧,CAS返回當前V的真實值。
(4)CAS操作是抱著樂觀的態(tài)度進行的颅崩,它總是認為自己可以成功完成操作几于。當多個線程同時使用CAS操作一個變量時,只有一個會勝出沿后,并成功更新沿彭,其余均會失敗。失敗的線程不會被掛起尖滚,僅是被告知失敗喉刘,并且允許再次嘗試,當然也允許失敗的線程放棄操作漆弄∧郎眩基于這樣的原理,CAS操作即使沒有鎖撼唾,也可以發(fā)現其他線程對當前線程的干擾廉邑,并進行恰當的處理。
(5)簡單地說倒谷,CAS需要你額外給出一個期望值蛛蒙,也就是你認為這個變量現在應該是什么樣子的。如果變量不是你想象的那樣渤愁,那說明它已經被別人修改過了牵祟。你就重新讀取,再次嘗試修改就好了抖格。
(6)在硬件層面诺苹,大部分的現代處理器都已經支持原子化的CAS指令咕晋。在JDK 5.0以后,虛擬機便可以使用這個指令來實現并發(fā)操作和并發(fā)數據結構筝尾,并且捡需,這種操作在虛擬機中可以說是無處不在。
-
原子類
/**
* Atomically increments by one the current value.
*
* @return the updated value
*/
public final int incrementAndGet() {
for (;;) {
//獲取當前值
int current = get();
//設置期望值
int next = current + 1;
//調用Native方法compareAndSet筹淫,執(zhí)行CAS操作
if (compareAndSet(current, next))
//成功后才會返回期望值站辉,否則無線循環(huán)
return next;
}
}
自旋鎖
自旋鎖是采用讓當前線程不停地的在循環(huán)體內執(zhí)行實現的,當循環(huán)的條件被其他線程改變時 才能進入臨界區(qū)损姜。
- 如下
private AtomicReference<Thread> sign =new AtomicReference<>();
public void lock() {
Thread current = Thread.currentThread();
while (!sign.compareAndSet(null, current)) { }
}
public void unlock() {
Thread current = Thread.currentThread();
sign.compareAndSet(current, null);
}
- 示例代碼
public class Test implements Runnable {
static int sum;
private SpinLock lock;
public Test(SpinLock lock) {
this.lock = lock;
}
/**
* @param args
* @throws InterruptedException
*/
public static void main(String[] args) throws InterruptedException {
SpinLock lock = new SpinLock();
for (int i = 0; i < 100; i++) {
Test test = new Test(lock);
Thread t = new Thread(test);
t.start();
}
Thread.currentThread().sleep(1000);
System.out.println(sum);
}
@Override
public void run() {
this.lock.lock();
this.lock.lock();
sum++;
this.lock.unlock();
this.lock.unlock();
}
}
當一個線程 調用這個不可重入的自旋鎖去加鎖的時候沒問題饰剥,當再次調用lock()的時候,因為自旋鎖的持有引用已經不為空了摧阅,該線程對象會誤認為是別人的線程持有了自旋鎖使用了CAS原子操作萄唇,lock函數將owner設置為當前線程奈偏,并且預測原來的值為空。unlock函數將owner設置為null,并且預測值為當前線程达箍。
當有第二個線程調用lock操作時由于owner值不為空,導致循環(huán)一直被執(zhí)行守屉,直至第一個線程調用unlock函數將owner設置為null讥邻,第二個線程才能進入臨界區(qū)。
由于自旋鎖只是將當前線程不停地執(zhí)行循環(huán)體蜒什,不進行線程狀態(tài)的改變测秸,所以響應速度更快。但當線程數不停增加時灾常,性能下降明顯霎冯,因為每個線程都需要執(zhí)行,占用CPU時間钞瀑。如果線程競爭不激烈沈撞,并且保持鎖的時間段。適合使用自旋鎖雕什。
分布式鎖
如果想在不同的jvm中保證數據同步关串,使用分布式鎖技術。
有數據庫實現监徘、緩存實現晋修、Zookeeper分布式鎖