沒時間寫細(xì)節(jié),直接上代碼
/**
* <p>集群模式下的切面</p>
* @since
*/
@Component
@Aspect
@Order(1)
@ConditionalOnExpression("${resource.sync.cluster:false}==true") //只有當(dāng)集群模式下該切面才生效
@Slf4j
public class ClusterAspect {
private RLock redisLock;
//由于改變量被多個線程讀寫弧关,所以上讀寫鎖懊烤,保障任務(wù)執(zhí)行完成后,才能改變變量值
private volatile boolean leader;
private static final ReentrantReadWriteLock LEADER_READ_WRITE_LOCK = new ReentrantReadWriteLock();
private static final ReentrantReadWriteLock.ReadLock LEADER_READ_LOCK = LEADER_READ_WRITE_LOCK.readLock();
private static final ReentrantReadWriteLock.WriteLock LEADER_WRITE_LOCK = LEADER_READ_WRITE_LOCK.writeLock();
private static volatile boolean LOOP_FLAG = true;
private static final String LOCK_KEY = "CLUSTER_LEADER";
//選舉間隔茸习,默認(rèn)10000ms
@Value("${resource.sync.cluster.elect.interval:10000}")
private Integer electInterval;
/**
* 門栓号胚,當(dāng)首次獲取redis鎖執(zhí)行后才能打開
*/
public static final CountDownLatch LATCH = new CountDownLatch(1);
@Autowired
private TaskExecutor taskExecutor;
/**
* 利用redis模擬選主
* 弊端: 主節(jié)點(diǎn)掛掉后猫胁,可能無法立即選出新主節(jié)點(diǎn)弃秆,20s后才有
*/
@EventListener(ApplicationReadyEvent.class)
@Async
public void electLeader() {
redisLock = SyncDistributedLockUtil.getLock(LOCK_KEY);
//每隔10s拿一次鎖,相當(dāng)于10s選舉一次脑豹,
while (LOOP_FLAG) {
//LEADER寫鎖
LEADER_WRITE_LOCK.lock();
try {
//如果當(dāng)前是主節(jié)點(diǎn)瘩欺,放鎖
if (leader && redisLock.isHeldByCurrentThread()) {
redisLock.unlock();
}
//設(shè)置鎖過期時間, 20s
//重新獲取鎖俱饿,相當(dāng)于選舉過程
leader = redisLock.tryLock(0L, 20L, TimeUnit.SECONDS);
} catch (Exception e) {
log.error("- get redis lock error!", e);
} finally {
LEADER_WRITE_LOCK.unlock();
}
//開門拍埠,放行啟動同步
if (LATCH.getCount() == 1L) {
LATCH.countDown();
}
//每隔十秒放一次鎖
LockSupport.parkNanos(electInterval * 1000000L);
}
//項(xiàng)目關(guān)閉記得放鎖
if (leader && redisLock.isHeldByCurrentThread()) {
log.debug("- project is shutting down, release the redis lock...");
redisLock.unlock();
}
}
@PreDestroy
public void destroy() {
endLoop();
}
@Around("@annotation(syncPreProcessor)")
public Object clusterModel(ProceedingJoinPoint joinPoint, SyncPreProcessor syncPreProcessor) {
RedisExtendTask redisExtendTask = new RedisExtendTask(redisLock);
try {
//等待首次獲取選主執(zhí)行完成
LATCH.await();
//判斷是否為主節(jié)點(diǎn)
log.info("- +++++++++++ entering cluster mode械拍! Check whether this node is Leader: {}坷虑!...", leader);
//LEADER變量讀鎖
LEADER_READ_LOCK.lock();
//如果不是主節(jié)點(diǎn)迄损,直接退出
if (!leader) {
log.info("- +++++++++++ not LEADER, return...");
return null;
}
//是主節(jié)點(diǎn),執(zhí)行任務(wù)
log.info("- +++++++++++ this node is LEADER, begin to sync resource ...");
//一旦任務(wù)開始執(zhí)行,redis鎖不能放氏捞,啟動redis鎖延期線程
taskExecutor.execute(redisExtendTask);
return joinPoint.proceed();
} catch (Throwable throwable) {
log.error("- +++++++++++ error when doing sync job, message: {}", throwable.getMessage());
throw new BaseRuntimeException(throwable.getMessage());
} finally {
//放鎖
LEADER_READ_LOCK.unlock();
//停止redis鎖延長線程
redisExtendTask.endTask();
log.debug("- +++++++++++ release LEADER_READ_LOCK: {} success ...");
}
}
private static void endLoop() {
LOOP_FLAG = false;
}
/**
* redis lock 延時線程
*/
private static class RedisExtendTask implements Runnable {
private volatile boolean flag = true;
private RLock redisLock;
RedisExtendTask(RLock redisLock) {
this.redisLock = redisLock;
}
@Override
public void run() {
log.debug("- begin to extend the expire time for redis lock: {}", redisLock.getName());
//每秒延長一下過期時間,直到任務(wù)結(jié)束
while (flag) {
redisLock.expire(20L, TimeUnit.SECONDS);
if (log.isDebugEnabled()) {
log.debug("- redis lock remain time to live = {}ms", redisLock.remainTimeToLive());
}
LockSupport.parkNanos(1000000000L);
}
log.debug("- finish extending the expire time for redis lock: {}", redisLock.getName());
}
void endTask() {
this.flag = false;
}
}
}