Curator一套zookeeper客戶端框架恩伺,解決了很多Zookeeper客戶端非常底層的細(xì)節(jié)開發(fā)工作颈将,本文主要介紹使用curator框架來實(shí)現(xiàn)zookeeper的分布式鎖實(shí)現(xiàn)方案翠霍。
使用curator來實(shí)現(xiàn)zookeeper分布式鎖有多種方案牲剃,本文主要使用 InterProcessMutex 來實(shí)現(xiàn)全局共享鎖纹冤。
代碼已經(jīng)上傳至github:https://github.com/xsg1995/spring-boot-curator
引入依賴
pom.xml文件如下所示:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.xsg</groupId>
<artifactId>spring-boot-curator</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>spring-boot-curator</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.5.RELEASE</version>
</parent>
<dependencies>
<!--spring-boot-starter-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<!--spring-boot-starter-test-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!--spring-boot-configuration-processor-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<!--curator-recipes-->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.12.0</version>
</dependency>
<!--curator-framework-->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.12.0</version>
</dependency>
<!--curator-test 用來模擬 zookeeper 環(huán)境-->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-test</artifactId>
<version>2.12.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
zookeeper的配置信息
在 application.yml 中配置 zookeeper 的配置:
zookeeper:
#每次重試時(shí)間間隔话速,單位毫秒
baseSleepTimeMs: 1000
#重試次數(shù)
maxRetries: 3
#zookeeper服務(wù)連接id與端口
connectString: 127.0.0.1:2181
#會(huì)話超時(shí)時(shí)間讶踪,單位毫秒
sessionTimeoutMs: 5000
#連接創(chuàng)建超時(shí)時(shí)間,單位毫秒
connection-timeout-ms: 5000
后臺(tái)定義一個(gè) ZookeeperProperties 類注入 zookeeper 的配置泊交,實(shí)現(xiàn)如下:
/**
* 注入 zookeeper 的配置信息
*/
@Component
@ConfigurationProperties(prefix = "zookeeper")
public class ZookeeperProperties {
private int baseSleepTimeMs;
private int maxRetries;
private String connectString;
private int sessionTimeoutMs;
private int connectionTimeoutMs;
...
}
配置 CuratorFramework
CuratorFramework 類封裝了對zookeeper底層的操作乳讥,配置如下:
/**
* curator 配置
*/
@Configuration
public class ZookeeperConfig {
/**
* 獲取 CuratorFramework
* 使用 curator 操作 zookeeper
* @return
*/
@Bean
public CuratorFramework curatorFramework(ZookeeperProperties zookeeperProperties) {
//配置zookeeper連接的重試策略
RetryPolicy retryPolicy = new ExponentialBackoffRetry(zookeeperProperties.getBaseSleepTimeMs(), zookeeperProperties.getMaxRetries());
//構(gòu)建 CuratorFramework
CuratorFramework curatorFramework =
CuratorFrameworkFactory.builder()
.connectString(zookeeperProperties.getConnectString())
.sessionTimeoutMs(zookeeperProperties.getSessionTimeoutMs())
.connectionTimeoutMs(zookeeperProperties.getConnectionTimeoutMs())
.retryPolicy(retryPolicy)
.build();
//連接 zookeeper
curatorFramework.start();
return curatorFramework;
}
}
編寫加鎖柱查、釋放鎖具體實(shí)現(xiàn)邏輯
使用模板模式,將加鎖云石、釋放鎖的通用代碼給抽取出來唉工,通過接口回調(diào)方式回調(diào)具體的業(yè)務(wù)實(shí)現(xiàn)邏輯,實(shí)現(xiàn)如下:
/**
* 可重入共享鎖組件
*/
@Component
public class ShardReentrantLockComponent {
@Autowired
private CuratorFramework curatorFramework;
/**
* 該方法為模板方法汹忠,獲得鎖后回調(diào) BaseLockHandler 中的 handler 方法
* @return
*/
public <T> T acquireLock(BaseLockHandler<T> baseLockHandler) {
//獲取要加鎖的路徑
String path = baseLockHandler.getPath();
//獲取超時(shí)時(shí)間
int timeOut = baseLockHandler.getTimeOut();
//時(shí)間單位
TimeUnit timeUnit = baseLockHandler.getTimeUnit();
//通過 InterProcessMutex 該類來獲取可重入共性鎖
InterProcessMutex lock = new InterProcessMutex(this.curatorFramework, path);
//用于標(biāo)識(shí)是否獲取了鎖
boolean acquire = false;
try {
try {
//成功獲得鎖后返回 true
acquire = lock.acquire(timeOut, timeUnit);
} catch (Exception e) {
e.printStackTrace();
}
if(acquire) {
//獲得鎖后回調(diào)具體的業(yè)務(wù)邏輯
return baseLockHandler.handler();
} else {
//沒有獲得鎖返回 null
return null;
}
} finally {
try {
if(acquire) {
//釋放鎖
lock.release();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
BaseLockHandler 抽象類回調(diào)具體的業(yè)務(wù)邏輯淋硝,實(shí)現(xiàn)如下:
/**
* 業(yè)務(wù)邏輯抽象類
* @param <T>
*/
public abstract class BaseLockHandler<T> {
//獲得鎖的默認(rèn)超時(shí)時(shí)間,默認(rèn)為 200ms
private static final int DEFAULT_TIME_OUT = 200;
//加鎖的資源路徑
private String path;
public BaseLockHandler(String path) {
this.path = path;
}
/**
* 具體的業(yè)務(wù)實(shí)現(xiàn)邏輯宽菜,重寫該方法
* @return
*/
public abstract T handler();
/**
* 返回加鎖的路徑
* @return
*/
public String getPath() {
return this.path;
}
/**
* 返回加鎖的超時(shí)時(shí)間
* @return
*/
public int getTimeOut() {
return DEFAULT_TIME_OUT;
}
/**
* 時(shí)間單位
* @return
*/
public TimeUnit getTimeUnit() {
return TimeUnit.MILLISECONDS;
}
}
測試
編寫測試用例類進(jìn)行測試:
/**
* LockComponent的測試類
*/
@RunWith(SpringRunner.class)
@SpringBootTest
public class LockComponentTest {
/**
* 表示開啟多個(gè)線程并行執(zhí)行
*/
private static final int THREAD_COUNT = 100;
@Autowired
private ShardReentrantLockComponent lockComponent;
/**
* 用來模擬 zookeeper 服務(wù)
*/
private TestingServer server;
/**
* 用來實(shí)現(xiàn)具體邏輯谣膳,對該計(jì)數(shù)器加1
*/
private int count;
@Before
public void before() throws Exception {
//模擬一個(gè)zookeeper節(jié)點(diǎn),端口號(hào)為 2181
server = new TestingServer(2181);
}
@After
public void after() {
if(server != null) {
//關(guān)閉資源
CloseableUtils.closeQuietly(server);
}
}
/**
* 不加鎖實(shí)現(xiàn)多個(gè)線程同時(shí)對 count 執(zhí)行 ++ 操作
* 會(huì)出現(xiàn)數(shù)據(jù)不一致現(xiàn)象
* @throws Exception
*/
@Test
public void noAcquireLockTest() throws Exception {
//初始化一個(gè)擁有 100 個(gè)線程的線程池
ExecutorService executorService = Executors.newFixedThreadPool(THREAD_COUNT);
//使用 CountDownLatch 實(shí)現(xiàn)線程的協(xié)調(diào)
CountDownLatch countDownLatch = new CountDownLatch(THREAD_COUNT);
for(int i = 0; i < THREAD_COUNT; i++) {
final int index = i;
//提交線程
executorService.submit(() -> {
//name 表示該線程的名稱
String name = "client" + (index + 1);
//執(zhí)行 count++
count++;
try {
//睡眠 50ms ,使測試結(jié)果更明顯
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
//打印各個(gè)線程執(zhí)行結(jié)果
System.out.println(name + " 執(zhí)行業(yè)務(wù)方法铅乡,對 count 執(zhí)行 ++ 操作后 count 的值為 : " + count);
//調(diào)用countDown方法继谚,表示該線程執(zhí)行完畢
countDownLatch.countDown();
});
}
//使該方法阻塞住,不然看不到效果
countDownLatch.await();
}
/**
* 使用 zookeeper 加鎖實(shí)現(xiàn)多個(gè)線程同時(shí)對 count 執(zhí)行 ++ 操作
* @throws Exception
*/
@Test
public void acquireLockTest() throws Exception {
//要加鎖節(jié)點(diǎn)的路徑
String path = "/path/test";
//初始化一個(gè)擁有 100 個(gè)線程的線程池
ExecutorService executorService = Executors.newFixedThreadPool(THREAD_COUNT);
//使用 CountDownLatch 實(shí)現(xiàn)線程的協(xié)調(diào)
CountDownLatch countDownLatch = new CountDownLatch(THREAD_COUNT);
for(int i = 0; i < THREAD_COUNT; i++) {
final int index = i;
//提交線程
executorService.submit(() -> {
//name 表示該線程的名稱
String name = "client" + (index + 1);
//result 獲取執(zhí)行完業(yè)務(wù)邏輯后返回值
String result = null;
while (result == null) {
//result 為 null 表示沒有的鎖阵幸,會(huì)繼續(xù)執(zhí)行while循環(huán)去競爭獲取鎖
result = lockComponent.acquireLock(new BaseLockHandler<String>(path) {
//執(zhí)行具體的業(yè)務(wù)邏輯
@Override
public String handler() {
//執(zhí)行 count++
count++;
try {
//睡眠 50ms ,使測試結(jié)果更明顯
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
//打印各個(gè)線程執(zhí)行結(jié)果
System.out.println(name + " 執(zhí)行業(yè)務(wù)方法花履,對count++ : " + count);
//執(zhí)行成功后不要返回null,如果返回null挚赊,會(huì)繼續(xù)執(zhí)行while去競爭獲取鎖
return this.getPath();
}
});
}
//調(diào)用countDown方法诡壁,表示該線程執(zhí)行完畢
countDownLatch.countDown();
});
}
//使該方法阻塞住,不然看不到效果
countDownLatch.await();
}
}
測試的業(yè)務(wù)邏輯是對 count 局部變量執(zhí)行 ++ 操作咬腕,這里編寫了兩個(gè)方法:
- noAcquireLockTest 該方法實(shí)現(xiàn)是通過開啟 100 個(gè)線程欢峰,以不加鎖的方法,并行的對 count 執(zhí)行 ++ 操作涨共;
- acquireLockTest 該方法通過獲取 zookeeper 的加鎖機(jī)制,開啟 100 個(gè)線程宠漩,并行的對 count 執(zhí)行 ++ 操作举反;
不加鎖的執(zhí)行方式結(jié)果如下所示:
可以看到,數(shù)據(jù)已經(jīng)出現(xiàn)不一致現(xiàn)象扒吁。
使用 zookeeper 加鎖執(zhí)行結(jié)果如下所示:
從右邊計(jì)數(shù)器 count 的值可以看出火鼻,加鎖操作是成功的。