前言
首先說說LockSupport吧芹助,它的作用是提供一組直接block或unblock線程的方法也糊,其底層實(shí)現(xiàn)利用了Unsafe(前面文章有講過Unsafe)。LockSupport是一個(gè)非常底層的API砖第,我們利用其可以做很多事情荷腊,本文將利用LockSupport實(shí)現(xiàn)互斥鎖和共享鎖。
Lock
在JDK中已經(jīng)提供了很多種鎖的實(shí)現(xiàn)悄窃,原生的synchronized(優(yōu)先推薦使用),juc中的ReentrantLock等讥电,本文不糾結(jié)synchronized和ReentrantLock的實(shí)現(xiàn),本文只從Lock的語(yǔ)義出發(fā)實(shí)現(xiàn)兩種鎖轧抗。
Lock的語(yǔ)義
juc中對(duì)于Lock接口的定義如下:
void lock();
void lockInterruptibly() throws InterruptedException;
boolean tryLock();
boolean tryLock(long var1, TimeUnit var3) throws InterruptedException;
void unlock();
Condition newCondition();
- void lock():獲取鎖的語(yǔ)義恩敌,如果沒有獲取到鎖一直阻塞當(dāng)前線程(不響應(yīng)中斷interrupt)
- void lockInterruptibly() throws InterruptedException; 獲取鎖,但是當(dāng)前線程在阻塞期間可以響應(yīng)中斷(后面稍微會(huì)扯一下InterruptedException)
- boolean tryLock(); 嘗試獲取鎖横媚,不阻塞纠炮;獲取到鎖返回true,沒有獲取到返回false
- boolean tryLock(long var1, TimeUnit var3) throws InterruptedException; 嘗試獲取鎖灯蝴,并嘗試阻塞等待一定時(shí)間恢口,阻塞期間可以響應(yīng)中斷
- void unlock(); 釋放鎖;
- Condition newCondition();在鎖上新建Condition
以上的關(guān)于鎖的語(yǔ)義稍微復(fù)雜了點(diǎn)穷躁,特別是相應(yīng)中斷部分和newCondition部分耕肩,所以這次實(shí)現(xiàn)上簡(jiǎn)化了Lock的語(yǔ)義如下:
void lock();
void unLock();
boolean tryLock();
boolean tryLock(long maxWaitInMills);
基本功能和上面保持一致,但是都不響應(yīng)中斷
分析鎖的實(shí)現(xiàn)
- Lock有可重入的語(yǔ)義问潭,一個(gè)線程擁有鎖之后再次調(diào)用lock應(yīng)該完全沒有任何問題猿诸,所以鎖的實(shí)現(xiàn)中需要維護(hù)一個(gè)已經(jīng)獲取鎖的線程隊(duì)列;
- Lock未成功需要阻塞當(dāng)前線程狡忙,所以需要底層阻塞原語(yǔ)(LockSupport)等的支持两芳,并且在有線程釋放鎖之后需要喚起阻塞線程進(jìn)行鎖的競(jìng)爭(zhēng),所以需要維護(hù)等待鎖的線程隊(duì)列
- Lock需要維護(hù)當(dāng)前鎖的狀態(tài)(是否可以被獲取等)
互斥鎖
public class MutexLock implements Lock {
private volatile Thread threadOwnsTheLock;
private final AtomicInteger state = new AtomicInteger(0);
private final ConcurrentLinkedQueue<Thread> waitThreadsQueue = new ConcurrentLinkedQueue<Thread>();
//一直等待
public void lock() {
tryLock(-1L);
}
//invoke all的語(yǔ)義,也可以做invokeNext
public void unLock() {
tryRelease(-1);
threadOwnsTheLock = null;
if (!waitThreadsQueue.isEmpty()) {
for (Thread thread : waitThreadsQueue) {
LockSupport.unpark(thread);
}
}
}
public boolean tryLock() {
if (threadOwnsTheLock != null && (threadOwnsTheLock == Thread.currentThread())) {
return true;
}
if (tryAcquire(1)) {
threadOwnsTheLock = Thread.currentThread();
return true;
}
return false;
}
//沒有實(shí)現(xiàn)interrupt的語(yǔ)義,不能打斷
public boolean tryLock(long maxWaitInMills) {
Thread currentThread = Thread.currentThread();
try {
waitThreadsQueue.add(currentThread);
if (maxWaitInMills > 0) {
boolean acquired = false;
long left = maxWaitInMills * 1000L * 1000L;
long cost = 0;
while (true) {
//需要判斷一次interrupt
if (tryAcquire(1)) {
threadOwnsTheLock = currentThread;
acquired = true;
break;
}
left = left - cost;
long mark = System.nanoTime();
if (left <= 0) {
break;
}
LockSupport.parkNanos(left);
cost = mark - System.nanoTime();
}
return acquired;
}else {
while (true) {
if (tryAcquire(1)) {
threadOwnsTheLock = currentThread;
break;
}
LockSupport.park();
}
return true;
}
} finally {
waitThreadsQueue.remove(currentThread);
}
}
protected boolean tryAcquire(int acquire) {
return state.compareAndSet(0, 1);
}
protected void tryRelease(int release) {
if (threadOwnsTheLock == null || (threadOwnsTheLock != Thread.currentThread())) {
System.out.println("Wrong state, this thread don't own this lock.");
}
while (true) {
if (state.compareAndSet(1, 0)) {
return;
}
}
}
}
以上互斥鎖使用了一個(gè)AtomicInteger去枷,利用了CAS來維持鎖的狀態(tài)
共享鎖
public class ShareLock implements Lock {
private volatile Set<Thread> threadsOwnsLock = Sets.newConcurrentHashSet();
private final AtomicInteger state;
private final ConcurrentLinkedQueue<Thread> waitThreadsQueue = new ConcurrentLinkedQueue<Thread>();
public ShareLock(int shareNum) {
this.state = new AtomicInteger(shareNum);
}
//一直等待
public void lock() {
tryLock(-1L);
}
public void unLock() {
tryRelease(-1);
threadsOwnsLock.remove(Thread.currentThread());
if (!waitThreadsQueue.isEmpty()) {
for (Thread thread : waitThreadsQueue) {
LockSupport.unpark(thread);
}
}
}
public boolean tryLock() {
if ( !(threadsOwnsLock.contains(Thread.currentThread()))) {
return true;
}
if (tryAcquire(1)) {
threadsOwnsLock.add(Thread.currentThread());
return true;
}
return false;
}
public boolean tryLock(long maxWaitInMills) {
Thread currentThread = Thread.currentThread();
try {
waitThreadsQueue.add(currentThread);
if (maxWaitInMills > 0) {
boolean acquired = false;
long left = TimeUnit.MILLISECONDS.toNanos(maxWaitInMills);
long cost = 0;
while (true) {
if (tryAcquire(1)) {
threadsOwnsLock.add(Thread.currentThread());
acquired = true;
break;
}
left = left - cost;
long mark = System.nanoTime();
if (left <= 0) {
break;
}
LockSupport.parkNanos(left);
cost = mark - System.nanoTime(); //有可能是被喚醒重新去獲取鎖,沒獲取到還得繼續(xù)等待剩下的時(shí)間(并不精確)
}
return acquired;
}else {
while (true) {
if (tryAcquire(1)) {
threadsOwnsLock.add(Thread.currentThread());
break;
}
LockSupport.park();
}
return true;
}
} finally {
waitThreadsQueue.remove(currentThread);
}
}
protected boolean tryAcquire(int acquire) {
if (state.getAndDecrement() > 0) {
return true;
} else {
state.getAndIncrement();//恢復(fù)回來
return false;
}
}
protected void tryRelease(int release) {
if (!(threadsOwnsLock.contains(Thread.currentThread()))) {
System.out.println("Wrong state, this thread don't own this lock.");
}
state.getAndIncrement();
}
}
總結(jié)
以上利用了LockSupport來實(shí)現(xiàn)了互斥鎖和共享鎖怖辆,但是實(shí)現(xiàn)中并沒有完成中斷響應(yīng)是复。后面應(yīng)該會(huì)有文章單獨(dú)說明關(guān)于InterruptedException的注意點(diǎn)。下篇文章將講述如何利用LockSupport實(shí)現(xiàn)Future語(yǔ)義