定義鎖根路徑:/locks ,過期時間
獲得鎖:
1先創(chuàng)建父節(jié)點locks,當前線程創(chuàng)建臨時順序節(jié)點(/locks/non000001)
2 獲取/locks下所有孩子節(jié)點(get /locks)并排序,最小的節(jié)點排最前面
3 如果子節(jié)點為空說明會話斷開了能耻,節(jié)點被刪除淤齐,返回獲取鎖失敗
4 如果當前節(jié)點=最小節(jié)點缸血,則返回獲取鎖成功,并新建一個線程來判定鎖過期。
5 如果當前節(jié)點不是最小節(jié)點拍埠,則設(shè)置監(jiān)聽比自己次小節(jié)點的刪除事件,然后掛起當前線程。(公平鎖)崔列,如果要實現(xiàn)非公平鎖添诉,則設(shè)置監(jiān)聽最小節(jié)點屁桑。
6 當最小編號的線程獲取鎖,處理完業(yè)務(wù)則刪除自己對應(yīng)的zk節(jié)點,刪除后會激活比自己大一號的節(jié)點線程從阻塞變?yōu)檫\行栏赴,被激活的線程是當前節(jié)點最小的了蘑斧,然后就可以獲取到鎖。
代碼實現(xiàn):
package geektime.spring.springbucks.waiter;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
* @author liangxianliang
* @create 2020-01-09 21:54
*/
public class ZookeeperLockTest {
private static final Logger LOG = LoggerFactory.getLogger(ZookeeperLockTest.class);
private static final ThreadLocal<String> THREAD_LOCAL = new ThreadLocal<>();
private static final ThreadLocal<ZooKeeper> ZK_THREAD_LOCAL = new ThreadLocal<>();
/** 鎖的根路徑 **/
private static final String LOCK_ROOT_PATH = "/locks";
/** 鎖后綴 **/
private static final String LOCK_SUFFIX = "_NO_";
/** 創(chuàng)建根節(jié)點同步鎖 **/
private static final String CREATE_ROOT_LOCK = "LOCK";
/** 公平鎖 **/
private static final boolean LOCK_FAIR = false;
private final static byte[] BUF = new byte[0];
static int LOCK_EXPIRES = 10;
static int LOCK_WAITTIME = 10 ;
/**
* 獲得所有
* @param key 路徑不含‘/’
* @return boolean
*/
public static boolean tryLock(String key) {
return tryLock(key, LOCK_EXPIRES, LOCK_WAITTIME);
}
/**
* 獲得鎖
* @param key 鍵/路徑
* @param expire 過期時間
* @param wait 等待時間
* @return boolean
*/
public static boolean tryLock(String key, long expire, long wait) {
ZooKeeper zooKeeper = getZooKeeper();
ZK_THREAD_LOCAL.set(zooKeeper);
return tryLock(zooKeeper, key, expire, wait);
}
/**
* 獲得鎖
* @param zooKeeper zk連接
* @param key 路徑
* @param expire 過期時間
* @param wait 等待時間
* @return boolean
*/
private static boolean tryLock(ZooKeeper zooKeeper, String key, long expire, long wait) {
expire = expire * 1000;
wait = wait * 1000;
final String currNode;
String path = LOCK_ROOT_PATH + "/" + key + LOCK_SUFFIX;
try {
currNode = zooKeeper.create(path, BUF, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
//步驟一
List<String> nodes = zooKeeper.getChildren(LOCK_ROOT_PATH, false);
//過濾掉集合中不是當前業(yè)務(wù)的臨時節(jié)點
nodes = nodes.stream().filter(o -> o.startsWith(key)).collect(Collectors.toList());
nodes.sort(String::compareTo);
//如果集合為空說明當前創(chuàng)建節(jié)點的session在步驟一處已經(jīng)斷開,并且創(chuàng)建的節(jié)點已經(jīng)被zk服務(wù)器刪除, 此種情況比較極端
if (nodes.size() == 0) {
return false;
}
//最小的節(jié)點就是自己創(chuàng)建的節(jié)點表示拿到鎖
if (currNode.endsWith(nodes.get(0))) {
runExpireThread(zooKeeper, currNode, expire);
return true;
}
//沒有拿到鎖
CountDownLatch countDownLatch = new CountDownLatch(1);
//非公平鎖
if(LOCK_FAIR){
for (int i = 0; i < nodes.size(); i++) {
String node = nodes.get(i);
if (currNode.endsWith(node)) {
runExpireThread(zooKeeper, currNode, expire);
return true;
}
Stat stat = zooKeeper.exists(LOCK_ROOT_PATH + "/" + node, new LockWatcher(countDownLatch));
if (stat != null) {
delPath(zooKeeper);
//等待鎖超時
if(!countDownLatch.await(wait, TimeUnit.MILLISECONDS)){
return tryLock(zooKeeper, key, expire, wait);
}
}
}
}else{
for (int i = 0; i < nodes.size(); i++) {
String node = nodes.get(i);
if (currNode.endsWith(node)) {
runExpireThread(zooKeeper, currNode, expire);
return true;
}
//當前節(jié)點的前一個節(jié)點
if (currNode.endsWith(nodes.get(i + 1))) {
Stat stat = zooKeeper.exists(LOCK_ROOT_PATH + "/" + node, new LockWatcher(countDownLatch));
if (stat != null) {
// 等待鎖超時竖瘾,如果是公平鎖沟突,等待時間是默認等待時間的2倍,防止因為拿鎖的線程處理業(yè)務(wù)時間太久
// 導(dǎo)致當前線程等待超時
if(!countDownLatch.await(wait * 2, TimeUnit.MILLISECONDS)){
delPath(zooKeeper);
return false;
}
return true;
}
}
}
}
} catch (KeeperException | InterruptedException e) {
LOG.error("create '{}' node fail.", key, e);
}
return false;
}
/**
* 釋放鎖
*/
public static void unLock() {
ZooKeeper zooKeeper = ZK_THREAD_LOCAL.get();
delPath(zooKeeper);
close(ZK_THREAD_LOCAL.get());
THREAD_LOCAL.remove();
ZK_THREAD_LOCAL.remove();
}
/**
* 創(chuàng)建分布式鎖的根路徑
*/
private static void createLockRootPath() {
ZooKeeper zooKeeper = getZooKeeper();
try {
Stat stat = zooKeeper.exists(LOCK_ROOT_PATH, false);
if (stat == null) {
synchronized (CREATE_ROOT_LOCK) {
stat = zooKeeper.exists(LOCK_ROOT_PATH, false);
if (stat == null) {
LOG.info("create lock root path '{}'", LOCK_ROOT_PATH);
zooKeeper.create(LOCK_ROOT_PATH, BUF, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
String path = LOCK_ROOT_PATH + "/key" + LOCK_SUFFIX;
Stat stats = zooKeeper.exists(path, false);
zooKeeper.create(path, BUF, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
}
}
}
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
}
/**
* 獲得zooke會話連接
* @return org.apache.zookeeper.ZooKeeper
*/
private static ZooKeeper getZooKeeper() {
final CountDownLatch countDownLatch = new CountDownLatch(1);
try {
final ZooKeeper zooKeeper = new ZooKeeper("127.0.0.1:2181", 30000, null);
zooKeeper.register((watchedEvent) -> {
switch (watchedEvent.getState()) {
case Expired:
close(zooKeeper);
break;
case SyncConnected:
countDownLatch.countDown();
default:
}
});
if(!countDownLatch.await(3000, TimeUnit.MILLISECONDS)){
close(zooKeeper);
throw new RuntimeException("wait for creating zookeeper connection timeout, timeout is [3000]");
}
return zooKeeper;
} catch (Exception e) {
throw new RuntimeException("create Zookeeper instance fail.", e);
}
}
/**
* 啟動一個線程來判斷鎖的過期時間准浴,方式業(yè)務(wù)假死事扭,zk不斷開導(dǎo)致死鎖
* @param zooKeeper zk連接
* @param currNode 當前節(jié)點
*/
private static void runExpireThread(final ZooKeeper zooKeeper, String currNode, long expire){
THREAD_LOCAL.set(currNode);
ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.execute(() -> {
try {
Thread.sleep(expire * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
LOG.info("等待了{}秒, 主動結(jié)束.", expire);
delPath(zooKeeper);
});
}
/**
* 刪除創(chuàng)建的路徑
* @param zooKeeper zk連接
*/
private static void delPath(ZooKeeper zooKeeper) {
try {
//無論節(jié)點是否存在,直接執(zhí)行刪除操作
zooKeeper.delete(THREAD_LOCAL.get(), -1);
} catch (Exception e){
LOG.error("lock expire, delete lock");
}
}
/**
* 斷開連接
* @param zooKeeper zk連接
*/
private static void close(ZooKeeper zooKeeper) {
if (zooKeeper != null) {
try {
zooKeeper.close();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/**
* 監(jiān)聽節(jié)點刪除事件
*/
static class LockWatcher implements Watcher {
private CountDownLatch latch;
public LockWatcher(CountDownLatch latch) {
this.latch = latch;
}
@Override
public void process(WatchedEvent event) {
if(event.getType() == Event.EventType.NodeDeleted){
latch.countDown();
}
}
}
public static void main(String[] args) throws Exception {
//createLockRootPath();
int count = 30;
ExecutorService executorService = Executors.newFixedThreadPool(count);
for( int i = 0; i < count; i++) {
executorService.execute(() -> {
try {
if(ZookeeperLockTest.tryLock("lock")){
System.out.println("我拿到了鎖");
System.out.println(Thread.currentThread().getName() + ": 我要等待6秒乐横,測試其他超時是否有效.");
Thread.sleep(1000);
System.out.println("我的等待結(jié)束了求橄,其他人可以過來拿鎖了.");
}else{
System.out.println("沒有拿到鎖");
}
} catch (Exception e) {
e.printStackTrace();
}finally{
ZookeeperLockTest.unLock();
}
});
}
executorService.shutdown();
}
}