Java Concurrency
在多線程環(huán)境下,為了保證共享數(shù)據(jù)的原子和內(nèi)存可見(jiàn)性昧廷,需要進(jìn)行鎖操作憔晒。在JAVA中提供了內(nèi)置鎖和顯示鎖。本文使用用例結(jié)合明吩,來(lái)介紹以下鎖的用法:
內(nèi)置鎖(synchronized)
- 內(nèi)置鎖用來(lái)鎖定代碼塊间学,在進(jìn)入代碼的時(shí)候獲取鎖定,在退出(或者異常退出)釋放鎖定印荔。內(nèi)置鎖是互斥的低葫,意味中同一時(shí)刻只能有一個(gè)線程獲取該鎖,其它線程只能等待或者阻塞直到鎖的釋放躏鱼。如下面代碼中氮采,假如線程1執(zhí)行addOne操作,當(dāng)線程2調(diào)用getOne時(shí)染苛,就需要等待線程1執(zhí)行完成并釋放鎖鹊漠。
public class ProductPool {
private Integer product = new Integer(0);
public synchronized Integer getProduct() {
return product;
}
public synchronized void addOne() {
this.product = this.product + 1;
LOG.info("produce value: {}", this.product);
}
public synchronized Integer getOne() {
Integer old = new Integer(this.product);
this.product = this.product - 1;
return old;
}
}
- 內(nèi)置鎖是可以重入的。當(dāng)線程A獲取鎖執(zhí)行某操作茶行,如果在當(dāng)前線程A內(nèi)躯概,某個(gè)步驟也需要獲取該鎖,該步驟是可以獲取到鎖的畔师。如下例子娶靡,當(dāng)ChildClass的對(duì)象執(zhí)行doPrint時(shí)已經(jīng)獲取到了鎖,內(nèi)部繼續(xù)調(diào)用super.doPrint看锉,如果不能重入就會(huì)發(fā)生死鎖姿锭。在同一線程內(nèi)塔鳍,鎖可以重入。
public class SynchronizedDeakLock {
private static final Logger LOG = LoggerFactory.getLogger(SynchronizedLock.class);
public class BaseClass {
public synchronized void doPrint() {
LOG.info("base class print");
}
}
public class ChildClass extends BaseClass {
@Override
public synchronized void doPrint() {
LOG.info("child class do print");
super.doPrint();
}
}
@Test
public void testDeadLock() {
ChildClass childClass = new ChildClass();
childClass.doPrint();
}
}
- 內(nèi)置鎖使用非常簡(jiǎn)單呻此,在需要同步的方法轮纫、代碼塊上加入synchronized就行了,不需要顯示的獲取和釋放鎖焚鲜。且內(nèi)置鎖是JVM內(nèi)置的掌唾,它可以執(zhí)行部分優(yōu)化,比如在線程封閉鎖對(duì)象(該對(duì)象使用了鎖忿磅,但是卻不是共享對(duì)象糯彬,只在某一個(gè)線程使用)的鎖消除,改變鎖的粒度來(lái)消除內(nèi)置鎖的同步等葱她。
- 在某些情況下撩扒,我們希望獲取鎖但又不想一直等待,所以我們指定獲取到鎖的最大時(shí)間览效,如果獲取不到就超時(shí)却舀。內(nèi)置鎖對(duì)這種細(xì)粒度的控制是不支持的,JAVA提供了一種新的鎖機(jī)制:顯示鎖锤灿。下章挽拔,我們就對(duì)該話題進(jìn)行討論。
ReentrantLock
ReentrantLock是JAVA 5提供的細(xì)粒度的鎖但校,作為內(nèi)置鎖在某些場(chǎng)景的補(bǔ)充螃诅。比如:支持線程獲取鎖的時(shí)間設(shè)置,支持獲取鎖線程對(duì)interrupt事件響應(yīng)状囱。但是在使用時(shí)必須顯示的獲取鎖术裸,然后在finally中釋放。如果不釋放亭枷,相當(dāng)于在程序中放置了個(gè)定時(shí)炸彈袭艺,后期很難發(fā)現(xiàn)。它實(shí)現(xiàn)了Lock的以下API(部分例子為了達(dá)到測(cè)試效果沒(méi)有unlock, 實(shí)際使用中絕對(duì)不能這樣):
1 . void lock() 獲取鎖叨粘,一致等待直到獲取猾编。下面的例子中,在主線程中獲取鎖且不釋放升敲, 子線程調(diào)用lock方法來(lái)獲取鎖答倡。可以看到驴党,子線程一致處于RUNNABLE狀態(tài)瘪撇,即使它被interrupt。
@Test
public void testLockWithInterrupt() throws InterruptedException {
final Lock lock = new ReentrantLock();
lock.lock();
Thread childThread = new Thread(() -> {
lock.lock();
}, "t1 thread");
childThread.start();
childThread.interrupt();
LOG.info("the child thread state: {}", childThread.getState().name());
assertFalse(childThread.isInterrupted());
}
2 . void lockInterruptibly() throws InterruptedException; 獲取鎖直到線程被interrupt, 線程拋出InterruptedException。下面的例子中倔既,主線程獲取鎖且不釋放恕曲,子線程調(diào)用lockInterruptibly方法來(lái)獲取鎖。首先在子線程獲取不到鎖的時(shí)候叉存,會(huì)處于一直等待狀態(tài)码俩;當(dāng)主線程中調(diào)用子線程interrupt時(shí),子線程會(huì)拋出InterruptedException歼捏。
@Test(expected = InterruptedException.class)
public void testLockInterruptibly() throws Exception {
final Lock lock = new ReentrantLock();
lock.lock();
Thread.sleep(1000);
Thread mainThread = Thread.currentThread();
Thread t1=new Thread(new Runnable(){
@Override
public void run() {
try {
lock.lockInterruptibly();
} catch (InterruptedException e) {
LOG.error(" thread interrupted: {}", e);
mainThread.interrupt();
}
}
}, "t1 thread");
t1.start();
Thread.sleep(1000);
t1.interrupt();
Thread.sleep(1000000);
}
3 . boolean tryLock() 獲取鎖,如果獲取不到則立即返回false笨篷。
@Test
public void testTryLock() throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);
ReentrantLock reentrantLock = new ReentrantLock();
Runnable runnable = () -> {
reentrantLock.lock();
try {
Thread.sleep(2 * 1000l);
countDownLatch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
reentrantLock.unlock();
}
};
Runnable interruptRunnable = () -> {
boolean result = reentrantLock.tryLock();
if (result) {
LOG.info("lock success");
reentrantLock.unlock();
} else {
LOG.info("lock failed");
}
};
new Thread(runnable).start();
new Thread(interruptRunnable).start();
countDownLatch.await();
}
4 . boolean tryLock(long time, TimeUnit unit) throws InterruptedException 在指定的時(shí)間內(nèi)獲取鎖瞳秽,且返回結(jié)果。
@Test
public void testTryLockWithTime() throws InterruptedException, ExecutionException {
final Lock lock = new ReentrantLock();
lock.lock();
CompletableFuture<Boolean> completableFuture = CompletableFuture.supplyAsync(() -> tryLock(lock));
assertFalse(completableFuture.get().booleanValue());
}
private boolean tryLock(Lock lock) {
try {
boolean result = lock.tryLock(100, TimeUnit.MILLISECONDS);
LOG.info("lock result: {}", result);
return result;
} catch (InterruptedException e) {
LOG.error("interrupted: {}", e);
}
return false;
}
Semaphore
信號(hào)量常常用來(lái)控制對(duì)某一資源的訪問(wèn)數(shù)量率翅。例如练俐,下面的測(cè)試中我們?cè)O(shè)置信號(hào)量的permits為5,當(dāng)其中5個(gè)現(xiàn)在獲取且沒(méi)釋放冕臭,其它訪問(wèn)線程是獲取不到permit的腺晾。
@Test
public void testSemaphore() throws InterruptedException {
Semaphore semaphore = new Semaphore(5);
CountDownLatch countDownLatch = new CountDownLatch(2000);
Executor executor = Executors.newFixedThreadPool(10);
Runnable runnable = () -> {
boolean isAcquired = semaphore.tryAcquire();
if (isAcquired) {
try {
LOG.info("semaphore is acquired");
TimeUnit.MICROSECONDS.sleep(2);
} catch (InterruptedException ex) {
LOG.error("error: {}", ex);
} finally {
semaphore.release();
}
} else {
LOG.info("semaphore is not acquired");
}
countDownLatch.countDown();
};
IntStream.range(1, 2001).forEach(i ->
executor.execute(runnable)
);
countDownLatch.await();
}
線程池(Thread pool)
線程池中的任務(wù)相對(duì)獨(dú)立,才能使它的性能達(dá)到最優(yōu)辜贵。在線程池中悯蝉,如果出現(xiàn)相互依賴的線程,這可能導(dǎo)致線程死鎖托慨。比如:我們開(kāi)啟一個(gè)只有1個(gè)線程的線程池鼻由,調(diào)用A任務(wù)時(shí),A開(kāi)始了B任務(wù)厚棵。然后A任務(wù)依賴B任務(wù)的完成蕉世。在實(shí)際執(zhí)行中,A使用了線程池中的線程婆硬,B任務(wù)不能獲取線程執(zhí)行狠轻,導(dǎo)致A任務(wù)不停的處于等待,而B(niǎo)任務(wù)也在等待A釋放線程彬犯。
@Test
public void testThreadPoolThreadDependency() {
ExecutorService executor = Executors.newSingleThreadExecutor();
Callable<String> stringRunnable = () -> {
return "test";
};
Callable<String> runnable = () -> {
Future<String> result = executor.submit(stringRunnable);
try {
return result.get();
} catch (InterruptedException e) {
return null;
} catch (ExecutionException e) {
return null;
}
};
try {
LOG.info(executor.submit(runnable).get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
運(yùn)行上面測(cè)試向楼,會(huì)發(fā)現(xiàn)處于一直等待的情況,查看thread dump:
"pool-1-thread-1" #11 prio=5 os_prio=31 tid=0x00007fd6d606f000 nid=0x5703 waiting on condition [0x0000000122af2000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x0000000795f453d8> (a java.util.concurrent.FutureTask)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:429)
at java.util.concurrent.FutureTask.get(FutureTask.java:191)
at com.eyesee.concurrency.threadpool.ThreadPoolExecutorTest.lambda$testThreadPoolThreadDependency$1(ThreadPoolExecutorTest.java:25)
at com.eyesee.concurrency.threadpool.ThreadPoolExecutorTest$$Lambda$2/183264084.call(Unknown Source)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Locked ownable synchronizers:
- <0x0000000795f22fd0> (a java.util.concurrent.ThreadPoolExecutor$Worker)
源代碼詳見(jiàn):https://github.com/jessepys/javaconcurrency