項(xiàng)目中遇到一個(gè)這樣的需求, 使用POS機(jī)對(duì)用戶進(jìn)行充值,且渠道實(shí)時(shí)知道充值結(jié)果在POS機(jī)顯示. 但后端因?yàn)樯婕暗谌狡脚_(tái)的調(diào)用, 第三方平臺(tái)的結(jié)果異步通知, 所以后端無(wú)法給POS機(jī)同步響應(yīng)結(jié)果, 解決這個(gè)問(wèn)題的方案有兩種.
- 后端提供兩個(gè)接口, 下單接口與查詢接口, POS機(jī)調(diào)用下單接口之后, 輪詢調(diào)用查詢接口, 查詢最終結(jié)果, 若規(guī)定時(shí)間(根據(jù)業(yè)務(wù)需求而定)一直沒(méi)有終態(tài), 顯示處理中, 提示最終結(jié)果會(huì)已push, 短信通知或人工處理.
- 后端只提供一個(gè)下單接口, 再后端使用CountDownLatch對(duì)異步進(jìn)行同步化, 下面我們主要分析這種方法
先上代碼
public CreateTradeRspDTO createTrade(CreateTradeReqDTO createTradeReqDTO) {
// 省略下單操作
// 異步同步化
final CountDownLatch countDown = new CountDownLatch(1);
// 使用線程池創(chuàng)建線程執(zhí)行訂單結(jié)果查詢
// 我們服務(wù)調(diào)用使用dubbo, 超時(shí)時(shí)間設(shè)置5s, 所以CHECK_ACCOUNT_TIMES設(shè)置為8
// 最多進(jìn)行查詢8次, 每次0.5s, 如果8次也沒(méi)有查詢到最終結(jié)果, 也會(huì)執(zhí)行countDown.countDown();
ThreadPoolExecutorUtil.execute(() -> {
for (int i = 0; i < CHECK_ACCOUNT_TIMES; i++) {
try {
TimeUnit.MILLISECONDS.sleep(500);
// 查詢數(shù)據(jù)庫(kù)中訂單結(jié)果
ChannelFlowBO bo= channelFlowService.selectByOrderNo(orderNo);
// 有終態(tài)跳出循環(huán), 否則一直循環(huán)
if(FlowStatusEnum.SUCCESS == bo.getFlowStatus || FlowStatusEnum.FALS== bo.getFlowStatus) {
// 省略設(shè)置訂單狀態(tài)
break;
}
} catch (Exception e) {
log.warn("**************************查詢報(bào)錯(cuò)******************", e);
}
}
countDown.countDown();
});
try {
// 這一步需要在線程執(zhí)行后面, 如果 countDown.await()放在線程執(zhí)行前面, 程序會(huì)卡死
countDown.await();
} catch (Exception e) {
log.error("*****************CountDownLatch異常*****", e);
}
// 省略下單返回?cái)?shù)據(jù)
}
經(jīng)過(guò)上面的預(yù)熱, 下面現(xiàn)在開(kāi)始對(duì)CountDownLatch進(jìn)行分析
CountDownLatch
CountDownLatch.png
CountDownLatch原理是創(chuàng)建是指定state的值, 然后調(diào)用await掛起線程, 后續(xù)每調(diào)用移除countDown,state-1,當(dāng)state值為0時(shí), 喚醒線程. 下面進(jìn)行源碼分析.
@Test
public void testCountDownLatch() throws InterruptedException {
final CountDownLatch downLatch = new CountDownLatch(2);
new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
downLatch.countDown();
System.out.println("執(zhí)行第一次countDown");
}).start();
new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
downLatch.countDown();
System.out.println("執(zhí)行第二次countDown");
}).start();
downLatch.await();
System.out.println("線程放行");
}
執(zhí)行結(jié)果.png
await分析
// 創(chuàng)建過(guò)程很簡(jiǎn)單, 就是為state設(shè)置值
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
Sync(int count) {
setState(count);
}
// await開(kāi)始
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
// 只有當(dāng) state == 0 的時(shí)候酒贬,這個(gè)方法才會(huì)返回 1,否則返回-1
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
// 對(duì)head進(jìn)行延時(shí)初始化, 將node設(shè)置為tail
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
// 這下面之前分析
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
創(chuàng)建CountDownLatch與await操作不難理解, 只要看過(guò)上一篇AbstractQueuedSynchronizer源碼分析- ReentrantLock搶鎖解鎖就能很快理解.
countDown分析
public void countDown() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
// 對(duì)state -1 操作, 若state-1 == 0返回true,否則返回false
protected boolean tryReleaseShared(int releases) {
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
// tryReleaseShared方法返回true進(jìn)行下面操作
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
// 將ws設(shè)置為0, 然后喚醒線程h
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
// 對(duì)head進(jìn)行延時(shí)初始化, 將node設(shè)置為tail
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
//此時(shí)state為0, r為1
int r = tryAcquireShared(arg);
if (r >= 0) {
// 開(kāi)始執(zhí)行這里
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()) // 喚醒線程之后會(huì)接著執(zhí)行這一步
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
// 又是執(zhí)行這個(gè)方法, 喚醒線程
doReleaseShared();
}
}
CountDownLatch的countDown不難理解,每調(diào)用一次就對(duì)state-1, 當(dāng)state為0時(shí), 換新之前掛起的線程, 源碼比較難已理解的地方就是setHeadAndPropagate中為什么還要再次執(zhí)行doReleaseShared(), 下面有一種Debug圖解來(lái)分析一下.
CountDownLatch.await多次添加.png
喚醒操作.png
如上圖所示, 若多次執(zhí)行await操作, 會(huì)多次執(zhí)行addWaiter方法在鏈表中添加n+1個(gè)node,當(dāng)執(zhí)行count操作導(dǎo)致state為0是, 會(huì)依次喚醒鏈表中node節(jié)點(diǎn)的線程, 執(zhí)行對(duì)應(yīng)任務(wù).
總結(jié)
1.CountDownLatch在工作中還是經(jīng)常用的, 模擬多線程并發(fā)下的接口測(cè)試, 異步同步化, 一個(gè)線程完成任務(wù)同時(shí)通知多個(gè)線程, 計(jì)算匯總等待問(wèn)題.
2.CyclicBarrier,Semaphore源碼與CountDownLach差不多,用空對(duì)這兩個(gè)類進(jìn)行一下源碼解讀.