原理
zookeeper實(shí)現(xiàn)分布式鎖的原理就是多個(gè)節(jié)點(diǎn)同時(shí)在一個(gè)指定的節(jié)點(diǎn)下面創(chuàng)建臨時(shí)會(huì)話順序節(jié)點(diǎn)唬格,誰創(chuàng)建的節(jié)點(diǎn)序號(hào)最小,誰就獲得了鎖那婉,并且其他節(jié)點(diǎn)就會(huì)監(jiān)聽序號(hào)比自己小的節(jié)點(diǎn),一旦序號(hào)比自己小的節(jié)點(diǎn)被刪除了,其他節(jié)點(diǎn)就會(huì)得到相應(yīng)的事件并闲,然后查看自己是否為序號(hào)最小的節(jié)點(diǎn),如果是谷羞,則獲取鎖帝火。(注意同目錄的有序節(jié)點(diǎn)的序號(hào)是遞增的)如圖:
curator框架實(shí)現(xiàn)(整合springboot):
PS:zooKeeper用的版本3.5.6,springboot2.3.11
POM:
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.3.0</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.3.0</version>
<exclusions>
<exclusion>
<artifactId>zookeeper</artifactId>
<groupId>org.apache.zookeeper</groupId>
</exclusion>
</exclusions>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper -->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.5.6</version>
</dependency>
代碼:
#配置
@Bean(name = "curatorFramework")
public CuratorFramework curatorFramework() {
/*
* 重試的間隔時(shí)間,毫秒
*/
int elapsedTimeMs = 5000;
/*
* 連接重試次數(shù)
*/
int retryCount = 3;
/*
* 連接超時(shí)時(shí)間湃缎,毫秒
*/
int connectionTimeoutMs = 5000;
/*
* session超時(shí)時(shí)間犀填,毫秒
*/
int sessionTimeoutMs = 60000;
/*
* zookeeper連接地址
*/
String connectString = "192.168.6.59:2181";
RetryPolicy retryPolicy = new ExponentialBackoffRetry(elapsedTimeMs, retryCount);
CuratorFramework curatorFramework = CuratorFrameworkFactory.builder()
.connectString(connectString)
.sessionTimeoutMs(sessionTimeoutMs)
.connectionTimeoutMs(connectionTimeoutMs)
.retryPolicy(retryPolicy)
.build();
curatorFramework.start();
return curatorFramework;
}
#使用
@GetMapping("/zkLock/{id}")
public void getKeyWithCurator(@PathVariable Long id) {
InterProcessMutex rlock = null;
try {
Stat stat = curatorFramework.checkExists().forPath(PARENT_PATH);
if (stat == null) {
curatorFramework.create().withMode(CreateMode.PERSISTENT).forPath(PARENT_PATH);
}
rlock = new InterProcessMutex(curatorFramework, PARENT_PATH + CHILD_PATH);
if (rlock.acquire(10L, TimeUnit.SECONDS)) {
Object value = redisTemplate.opsForValue().get("llc" + id.toString());
if (Objects.nonNull(value)) {
System.out.println(System.currentTimeMillis()+"------------"+ Thread.currentThread()
+ "-------在緩存查詢到id=" + id + "的值=" + value.toString());
return;
}
User user = userService.queryById(id);
if (Objects.nonNull(user)) {
redisTemplate.opsForValue().set("llc" + id.toString(), user, 30, TimeUnit.MINUTES);
System.out.println(System.currentTimeMillis()+"------------"+ Thread.currentThread()
+ "-------在數(shù)據(jù)庫查詢到id=" + id + "的值=" + user);
return;
}
}
System.out.println(System.currentTimeMillis()+"------------"+ Thread.currentThread()
+ "-------10s也拿不到鎖");
return;
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (rlock != null) {
rlock.release();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}