1.死鎖是什么扯罐?有什么危害戒悠?
1.1 什么是死鎖
- 發(fā)生在并發(fā)中
- 互不相讓:當兩個(或更多)線程(或進程)相互持有對方所需要的資源,又不主動釋放讥此,導致所有人都無法繼續(xù)前進,導致程序陷入無盡的阻塞谣妻,這就是死鎖萄喳。
- 多個線程造成死鎖的情況
1.2 死鎖的影響
死鎖的影響在不同系統(tǒng)中是不一樣的,這取決于系統(tǒng)對死鎖的處理能力
- 數(shù)據(jù)庫中:檢測并放棄事務
- JVM中:無法自動處理
幾率不高但危害大
- 不一定發(fā)生蹋半,但是遵守"墨菲定律"
- 一旦發(fā)生他巨,多是高并發(fā)場景,影響用戶多
- 整個系統(tǒng)崩潰、子系統(tǒng)崩潰染突、性能降低
- 壓力測試無法找出所有潛在的死鎖
1.3 發(fā)生死鎖的例子
必定發(fā)生死鎖的情況
/**
* 描述: 必定發(fā)生死鎖的情況
*/
public class MustDeadLock implements Runnable {
int flag = 1;
static Object o1 = new Object();
static Object o2 = new Object();
public static void main(String[] args) {
MustDeadLock r1 = new MustDeadLock();
MustDeadLock r2 = new MustDeadLock();
r1.flag = 1;
r2.flag = 0;
Thread t1 = new Thread(r1);
Thread t2 = new Thread(r2);
t1.start();
t2.start();
}
@Override
public void run() {
System.out.println("flag = " + flag);
if (flag == 1) {
synchronized (o1) {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (o2) {
System.out.println("線程1成功拿到兩把鎖");
}
}
}
if (flag == 0) {
synchronized (o2) {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (o1) {
System.out.println("線程2成功拿到兩把鎖");
}
}
}
}
}
flag = 1
flag = 0
多個人相互轉賬
- 需要兩把鎖
- 獲取兩把鎖成功捻爷,且余額大于0,則扣除轉出人觉痛,增加收款人的余額役衡,是原子操作
-
順序相反
導致死鎖
public class MultiTransferMoney {
private static final int NUM_ACCOUNTS = 500;
private static final int NUM_MONEY = 1000;
private static final int NUM_ITERATIONS = 1000000;
private static final int NUM_THREADS = 100;
public static void main(String[] args) {
Random rnd = new Random();
TransferMoney.Account[] accounts = new TransferMoney.Account[NUM_ACCOUNTS];
for (int i = 0; i < accounts.length; i++) {
accounts[i] = new TransferMoney.Account(NUM_MONEY);
}
class TransferThread extends Thread {
@Override
public void run() {
for (int i = 0; i < NUM_ITERATIONS; i++) {
int fromAcct = rnd.nextInt(NUM_ACCOUNTS);
int toAcct = rnd.nextInt(NUM_ACCOUNTS);
int amount = rnd.nextInt(NUM_MONEY);
TransferMoney.transferMoney(accounts[fromAcct], accounts[toAcct], amount);
}
System.out.println("運行結束");
}
}
for (int i = 0; i < NUM_THREADS; i++) {
new TransferThread().start();
}
}
}
public class TransferMoney implements Runnable {
static Account a = new Account(500);
static Account b = new Account(500);
static Object lock = new Object();
int flag = 1;
public static void transferMoney(Account from, Account to, int amount) {
class Helper {
public void transfer() {
if (from.balance - amount < 0) {
System.out.println("余額不足,轉賬失敗薪棒。");
return;
}
from.balance -= amount;
to.balance = to.balance + amount;
System.out.println("成功轉賬" + amount + "元");
}
}
synchronized (from) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (to) {
new Helper().transfer();
}
}
}
@Override
public void run() {
if (flag == 1) {
transferMoney(a, b, 200);
}
if (flag == 0) {
transferMoney(b, a, 200);
}
}
static class Account {
int balance;
public Account(int balance) {
this.balance = balance;
}
}
}
死鎖
程序停止輸出發(fā)生死鎖
2. 死鎖的4個必要條件:
- 互斥條件
- 請求與保持條件
- 不剝奪條件
- 循環(huán)等待條件
3. 如何定位死鎖
3.1.jstack定位死鎖
3.2.ThreadMXBean定位死鎖
public class ThreadMXBeanDetection implements Runnable {
int flag = 1;
static Object o1 = new Object();
static Object o2 = new Object();
public static void main(String[] args) throws InterruptedException {
ThreadMXBeanDetection r1 = new ThreadMXBeanDetection();
ThreadMXBeanDetection r2 = new ThreadMXBeanDetection();
r1.flag = 1;
r2.flag = 0;
Thread t1 = new Thread(r1);
Thread t2 = new Thread(r2);
t1.start();
t2.start();
Thread.sleep(1000);
ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
long[] deadlockedThreads = threadMXBean.findDeadlockedThreads();
if (deadlockedThreads != null && deadlockedThreads.length > 0) {
for (int i = 0; i < deadlockedThreads.length; i++) {
ThreadInfo threadInfo = threadMXBean.getThreadInfo(deadlockedThreads[i]);
System.out.println("發(fā)現(xiàn)死鎖" + threadInfo.getThreadName());
}
}
}
@Override
public void run() {
System.out.println("flag = " + flag);
if (flag == 1) {
synchronized (o1) {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (o2) {
System.out.println("線程1成功拿到兩把鎖");
}
}
}
if (flag == 0) {
synchronized (o2) {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (o1) {
System.out.println("線程2成功拿到兩把鎖");
}
}
}
}
}
flag = 0
flag = 1
發(fā)現(xiàn)死鎖Thread-1
發(fā)現(xiàn)死鎖Thread-0
4. 修復死鎖策略
線上發(fā)生死鎖應該什么辦
- 線上問題都需要防患與未然手蝎,不造成損失幾乎是不可能
- 保存死鎖數(shù)據(jù),然后立刻重啟服務器
- 暫時保證線上服務的安全俐芯,然后再利用剛才保存的信息棵介,排查死鎖,修改代碼吧史,重新發(fā)版
常見修復策略
4.1. 避免策略
示例1 修改兩人轉賬時獲取鎖的順序
經(jīng)過思考邮辽,我們可以發(fā)現(xiàn),其實轉賬時贸营,
并不在乎兩把鎖的相對獲取順序
吨述。轉賬的時候,我們無論先獲取到轉出賬戶鎖對象钞脂,還是先獲取到轉入賬戶鎖對象揣云,只要最終能拿到兩把鎖,就能進行安全的操作冰啃。所以我們來調整一下獲取鎖的順序邓夕,使得先獲取的賬戶和該賬戶是“轉入”或“轉出”無關,而是使用 HashCode 的值來決定順序阎毅,從而保證線程安全
public static void transferMoney(Account from, Account to, int amount) {
class Helper {
public void transfer() {
if (from.balance - amount < 0) {
System.out.println("余額不足焚刚,轉賬失敗。");
return;
}
from.balance -= amount;
to.balance = to.balance + amount;
System.out.println("成功轉賬" + amount + "元");
}
}
int fromHash = System.identityHashCode(from);
int toHash = System.identityHashCode(to);
if (fromHash < toHash) {
synchronized (from) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (to) {
new Helper().transfer();
}
}
}
else if (fromHash > toHash) {
synchronized (to) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (from) {
new Helper().transfer();
}
}
}else {
synchronized (lock) {
synchronized (to) {
synchronized (from) {
new Helper().transfer();
}
}
}
}
}
可以看到扇调,我們會分別計算出這兩個 Account 的 HashCode矿咕,然后根據(jù) HashCode 的大小來決定獲取鎖的順序。這樣一來肃拜,不論是哪個線程先執(zhí)行痴腌,不論是轉出還是被轉入,它獲取鎖的順序都會嚴格根據(jù) HashCode 的值來決定燃领,那么大家獲取鎖的順序就一樣了士聪,就不會出現(xiàn)獲取鎖順序相反的情況,也就避免了死鎖
總結:通過hashcode來決定獲取鎖的順序猛蔽、沖突時需要“加時賽”(再加一把鎖)獲取鎖
示例2 哲學家換手解決.改變一個哲學家拿筷子的順序.
/**
* 描述: 演示哲學家就餐問題導致的死鎖
*/
public class DiningForPhilosophers {
/**
* 哲學家類
*/
public static class Philosophers implements Runnable{
private Object leftChopsticks;
private Object rightChopsticks;
public Philosophers(Object leftChopsticks, Object rightChopsticks) {
this.leftChopsticks = leftChopsticks;
this.rightChopsticks = rightChopsticks;
}
@Override
public void run() {
try {
while(true){
action("思考......");
synchronized (leftChopsticks){
action("拿起左邊的筷子");
synchronized (rightChopsticks){
action("拿起右邊的筷子---吃飯");
action("放下右邊的筷子");
}
action("放下左邊的筷子");
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void action(String action) throws InterruptedException {
System.out.println(Thread.currentThread().getName()+"-->"+action);
//隨機休眠,代表這個行為執(zhí)行的耗時
Thread.sleep((long) (Math.random()*10));
}
};
public static void main(String[] args) {
Philosophers[] philosophers = new Philosophers[5];
Object[] chopsticks = new Object[philosophers.length];
//初始化筷子對象
for (int i=0;i<chopsticks.length;i++){
chopsticks[i] = new Object();
}
//初始化哲學家對象,并啟動線程.
for (int i=0;i<chopsticks.length;i++){
Object leftChopsticks = chopsticks[i];
Object rightChopsticks = chopsticks[(i+1)%chopsticks.length];
philosophers[i] = new Philosophers(leftChopsticks, rightChopsticks);
new Thread(philosophers[i],"哲學家"+i+"號").start();
}
}
}
同樣發(fā)生死鎖,這里大家陷入了一種循環(huán)等待的狀態(tài),0號獲取了他左邊的0號筷子,請求他右邊的1號筷子,1號獲取了左邊的1號筷子,請求等待他右邊的2號筷子...........5號獲取了他左邊的5號筷子等待他右手邊的0號筷子........這樣就形成了一個環(huán).
哲學家問題解決策略
- 服務員檢查
哲學家拿叉子時 服務員檢查叉子數(shù)量 - 改變一個哲學家拿叉子的順序
- 餐票
哲學家就餐時先拿到餐票就能就餐 - 領導調節(jié)(檢測與恢復策略)
這里演示改變一個哲學家拿叉子的順序修復
這樣就成功的避免了死鎖
4.2 檢測與恢復策略:一段時間檢測是否有死鎖,如果有就剝奪某個資源,來解除死鎖
允許死鎖的發(fā)生,但是發(fā)生死鎖后要記錄下來并通過停止線程或其他方式停止死鎖
死鎖檢測算法
- 允許發(fā)生死鎖
- 每次調用鎖的
記錄
- 定期檢查"
鎖的調用鏈路圖
"中是否存在環(huán)路 - 一旦發(fā)生死鎖剥悟,就用死鎖恢復機制進行
恢復
機制進行恢復
恢復方法1:進程終止
- 逐個終止線程灵寺,直到死鎖消除。
- 終止順序:
- 優(yōu)先級(重要性区岗,是前臺交互還是后臺處理)
- 已占用資源略板、還需要的資源
- 已經(jīng)運行時間
恢復方法2:資源搶占
- 把已經(jīng)分發(fā)出去的鎖給
收回來
- 讓線程
回退幾步
,這樣就不用結束整個線程慈缔,成本比較低
比如 讓哲學家把拿起的筷子再放下 - 缺點:可能同一個線程一直被搶占叮称,那就造成
饑餓
4.3 鴕鳥策略
死鎖發(fā)生的幾率特別小,忽略他,等死鎖發(fā)生了,再去處理修改
5 實際工程中如何避免死鎖
1. 設置超時
時間
- Lock的tryLock(long timeout,TimeUnit unit)
造成超時的可能性很多,發(fā)生了死鎖,線程陷入了死循環(huán),線程執(zhí)行很慢.
/**
* 描述: 用tryLock來避免死鎖
*/
public class TryLockDeadlock implements Runnable {
int flag = 1;
static Lock lock1 = new ReentrantLock();
static Lock lock2 = new ReentrantLock();
public static void main(String[] args) {
TryLockDeadlock r1 = new TryLockDeadlock();
TryLockDeadlock r2 = new TryLockDeadlock();
r1.flag = 1;
r2.flag = 0;
new Thread(r1).start();
new Thread(r2).start();
}
@Override
public void run() {
for (int i = 0; i < 100; i++) {
if (flag == 1) {
try {
if (lock1.tryLock(800, TimeUnit.MILLISECONDS)) {
System.out.println("線程1獲取到了鎖1");
Thread.sleep(new Random().nextInt(1000));
if (lock2.tryLock(800, TimeUnit.MILLISECONDS)) {
System.out.println("線程1獲取到了鎖2");
System.out.println("線程1成功獲取到了兩把鎖");
lock2.unlock();
lock1.unlock();
break;
} else {
System.out.println("線程1嘗試獲取鎖2失敗藐鹤,已重試");
lock1.unlock();
Thread.sleep(new Random().nextInt(1000));
}
} else {
System.out.println("線程1獲取鎖1失敗瓤檐,已重試");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
if (flag == 0) {
try {
if (lock2.tryLock(3000, TimeUnit.MILLISECONDS)) {
System.out.println("線程2獲取到了鎖2");
Thread.sleep(new Random().nextInt(1000));
if (lock1.tryLock(3000, TimeUnit.MILLISECONDS)) {
System.out.println("線程2獲取到了鎖1");
System.out.println("線程2成功獲取到了兩把鎖");
lock1.unlock();
lock2.unlock();
break;
} else {
System.out.println("線程2嘗試獲取鎖1失敗,已重試");
lock2.unlock();
Thread.sleep(new Random().nextInt(1000));
}
} else {
System.out.println("線程2獲取鎖2失敗娱节,已重試");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
線程1獲取到了鎖1
線程2獲取到了鎖2
線程1嘗試獲取鎖2失敗挠蛉,已重試
線程2獲取到了鎖1
線程2成功獲取到了兩把鎖
線程1獲取到了鎖1
線程1獲取到了鎖2
線程1成功獲取到了兩把鎖
2.多使用并發(fā)類而不是自己設計鎖
如ConcurrentHashMap
java.util.concurrent.atomic.
3.盡量降低鎖的使用粒度:用不同的鎖而不是一個鎖
縮小鎖的臨界區(qū)
4.如果能使用同步代碼塊,就不使用同步方法:自己指定鎖對象
縮小了同步范圍,可以自己指定鎖對象