初識Zookeeper
Zookeeper
是一個典型的分布式數(shù)據(jù)一致性的解決方案网缝,分布式應(yīng)用程序可以基于它實現(xiàn)諸如數(shù)據(jù)發(fā)布/訂閱懊缺、負(fù)載均衡疫稿、命名服務(wù)、分布式協(xié)調(diào)通知、Master選舉而克、分布式鎖和分布式隊列等功能靶壮。本篇主要是用java多線程模擬實現(xiàn)基于Zookeeper
的分布式讀寫鎖怔毛。
Zookeeper偽集群部署實踐
Zookeeper
搭建分布式集群至少需要三臺服務(wù)器员萍,手頭確實沒有那么多資源,好在Zookeeper
允許在一臺機(jī)器上完成一個偽集群的搭建拣度。所謂偽集群其實就是所有的機(jī)器都在同一臺機(jī)器上碎绎,但還是以集群的特性來對外提供服務(wù)的。這種模式和集群十分相似抗果,只不過在同一臺機(jī)器上的不同Zookeeper實例是以不同的端口號來互相通信的筋帖。從官網(wǎng)下載到最新的Zookeeper
發(fā)行版本壓縮包:zookeeper-3.4.10.tar.gz
,將Zookeeper
解壓到我的阿里云服務(wù)器/usr/local/zookeeper
地址下。
配置
cp /usr/local/zookeeper/conf/zoo_sample.cfg zoo1.cfg
cp /usr/local/zookeeper/conf/zoo_sample.cfg zoo2.cfg
cp /usr/local/zookeeper/conf/zoo_sample.cfg zoo3.cfg
以zoo_sample.cfg
為模板復(fù)制三個偽集群的配置冤馏。三個配置文件的詳細(xì)配置如下:
zoo1.cfg:
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/usr/local/zookeeper/data_1/
clientPort=2181
server.1=127.0.0.1:2887:3887
server.2=127.0.0.1:2888:3888
server.3=127.0.0.1:2889:3889
zoo2.cfg:
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/usr/local/zookeeper/data_2/
clientPort=2182
server.1=127.0.0.1:2887:3887
server.2=127.0.0.1:2888:3888
server.3=127.0.0.1:2889:3889
zoo3.cfg:
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/usr/local/zookeeper/data_3/
clientPort=2183
server.1=127.0.0.1:2887:3887
server.2=127.0.0.1:2888:3888
server.3=127.0.0.1:2889:3889
-
tickTime
: 心跳時間日麸,單位是毫秒 ,session的最小超時時間是2*tickTime -
initLimit
:多少個tickTime內(nèi)逮光,允許其他server連接并初始化數(shù)據(jù) -
syncLimit
:多少個tickTime內(nèi)代箭,允許follower同步 -
dataDir
:存放zookeeper數(shù)據(jù)的路徑 -
clientPort
:監(jiān)聽客戶端連接的端口號 -
server.id=ip:port1:port2
:其中id表示該服務(wù)器的id號,需要注意的是,集群部署必須在dataDir
路徑下新建一個myid
的文件涕刚,內(nèi)容就是當(dāng)前服務(wù)器的id號嗡综。ip是當(dāng)前服務(wù)器的ip,port1表示Follower服務(wù)器和Leader進(jìn)行運行時通信和數(shù)據(jù)同步使用的端口號杜漠,port2端口用于Leader選舉過程中的投票通信极景。
使用下面命令開啟三個Zookeeper server服務(wù):
root@iZwz9hs5ueqrblutmr5bncZ:/usr/local/zookeeper# bin/zkServer.sh start zoo1.cfg
root@iZwz9hs5ueqrblutmr5bncZ:/usr/local/zookeeper# bin/zkServer.sh start zoo2.cfg
root@iZwz9hs5ueqrblutmr5bncZ:/usr/local/zookeeper# bin/zkServer.sh start zoo3.cfg
可以在/usr/local/zookeeper
路徑下的zookeeper.out
文件中看到服務(wù)開啟的日志信息。
測試客戶端的連接
在測試前驾茴,需要在阿里云服務(wù)器管理中心打開相應(yīng)的Zookeeper
客戶端連接端口盼樟。打開Windows命令行,定位到Zookeeper
路徑執(zhí)行下面命令:
D:\zookeeper-3.4.10>bin\zkCli -server 服務(wù)器ip:2181
發(fā)現(xiàn)Windows Zookeeper
客戶端已經(jīng)成功連接上阿里云上的Zookeeper
集群锈至,創(chuàng)建一個test結(jié)點晨缴。此時我們連接的是2181端口,也就是偽集群當(dāng)中zoo1的監(jiān)聽的端口號裹赴。如下圖所示:
另外在阿里云服務(wù)器中也開啟一個客戶端的連接喜庞,看看我們之前添加的test結(jié)點有沒有真正的添加到Zookeeper
集群中。
上圖可以看到test結(jié)點確實已經(jīng)在集群當(dāng)中了棋返,而且此時連接的是2183端口延都,也就是zoo3這個server。由此驗證了
Zookeeper
偽集群搭建成功睛竣。
分布式讀寫鎖實現(xiàn)
下面就基于上面搭建的Zookeeper
偽集群晰房,實現(xiàn)分布式讀寫鎖。我們都知道當(dāng)一個事務(wù)獲得讀鎖之后,在這之后的事務(wù)只能獲取讀鎖殊者,寫鎖獲取必須等到所有讀鎖全部釋放与境。而寫鎖一旦獲取后其他事務(wù)的讀鎖以及寫鎖都必須等待該寫鎖釋放后獲取。那么Zookeeper
是怎么實現(xiàn)這一特性的呢猖吴?
首先Zookeeper
創(chuàng)建節(jié)點時可以創(chuàng)建4中形式的節(jié)點摔刁,分別是持久節(jié)點(PERSISTENT)、持久順序節(jié)點(PERSISTENT_SEQUENTIAL)海蔽、臨時節(jié)點(EPHEMERAL)共屈、臨時順序節(jié)點(EPHEMERAL_SEQUENTIAL)。
- 持久節(jié)點:該數(shù)據(jù)節(jié)點被創(chuàng)建后党窜,就會一直存在于
Zookeeper
服務(wù)器上拗引,直到有刪除操作主動清除這個節(jié)點。 - 持久順序節(jié)點:基本特性和持久節(jié)點是一致的幌衣,額外的特性表現(xiàn)在順序性上矾削,在創(chuàng)建節(jié)點的過程中,
Zookeeper
會自動為給定節(jié)點名加上一個遞增的數(shù)字后綴作為新的節(jié)點名豁护。 - 臨時節(jié)點:臨時節(jié)點的生命周期和客戶端的會話綁定在一起哼凯,也就是說,如果客戶端會話失效择镇,那么節(jié)點就會被自動清理掉挡逼。另外,臨時節(jié)點不能創(chuàng)建子結(jié)點腻豌。
- 臨時順序節(jié)點:特性和臨時節(jié)點一致家坎,額外擁有順序的特性。
其實分布式讀寫鎖就是基于Zookeeper
順序節(jié)點特性來實現(xiàn)的吝梅。每個事務(wù)想要獲得鎖時都去同一個節(jié)點/lock
下虱疏,創(chuàng)建命名規(guī)范為[hostname]-鎖類型-序號
的臨時順序節(jié)點。如果自身想要獲取的是讀鎖苏携,那么只要查看/lock
下節(jié)點順序比自身小的節(jié)點中沒有寫類型的節(jié)點便可獲得讀鎖做瞪。如果自身想要獲取寫鎖,那么只要看到/lock
下自己是順序最小的節(jié)點便可獲得寫鎖右冻。下面是多線程模擬事務(wù)的詳細(xì)實現(xiàn):
public class DistributedLockDemo {
private static CountDownLatch countDownLatch = new CountDownLatch(1);
private static String rootPath = "/lock";
private static ZooKeeper zooKeeper;
public static void main(String[] args) {
try {
zooKeeper = new ZooKeeper("119.23.216.241:2181", 5000, null); //連接我的Zookeeper集群
zooKeeper.create(rootPath, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
WriteThread[] writeThreads = new WriteThread[5]; //創(chuàng)建5個寫鎖線程
ReadThread[] readThreads = new ReadThread[10]; //創(chuàng)建10個讀鎖線程
for (int i = 0; i < writeThreads.length; i++) {
writeThreads[i] = new WriteThread("WriteThread_" + i);
writeThreads[i].start();
}
for (int i = 0; i < readThreads.length; i++) {
readThreads[i] = new ReadThread("ReadThread_" + i);
readThreads[i].start();
}
TimeUnit.SECONDS.sleep(1);
countDownLatch.countDown();
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
static class WriteThread extends Thread {
public WriteThread(String name){
super(name);
}
@Override
public void run() {
try {
countDownLatch.await(); //多個線程同一時間競爭資源
//path示例:/lock/WriteThread_0-W-0000000001
String path = zooKeeper.create(rootPath + "/" + Thread.currentThread().getName() + "-W-", "".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println(Thread.currentThread().getName() + ": try to acquire write lock...");
while (!canGetWriteLock(path)) {
TimeUnit.MILLISECONDS.sleep(500);
}
afterGetLockDo(path, true);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
static class ReadThread extends Thread {
public ReadThread(String name){
super(name);
}
@Override
public void run() {
try {
countDownLatch.await();
String path = zooKeeper.create(rootPath + "/" + Thread.currentThread().getName() + "-R-", "".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println(Thread.currentThread().getName() + ": try to acquire read lock...");
while (!canGetReadLock(path)){
TimeUnit.MILLISECONDS.sleep(500);
}
afterGetLockDo(path, false);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
}
//判斷是否可以獲取寫鎖
private static boolean canGetWriteLock(String path) {
List<String> children = null; // /lock下所有子節(jié)點列表
try {
children = zooKeeper.getChildren(rootPath, false);
//基于節(jié)點名當(dāng)中的序號進(jìn)行排序
Collections.sort(children, new Comparator<String>() {
public int compare(String o1, String o2) {
int index1 = o1.lastIndexOf("-");
int index2 = o2.lastIndexOf("-");
long a = Long.parseLong(o1.substring(index1 + 1));
long b = Long.parseLong(o2.substring(index2 + 1));
return (int) (a - b);
}
});
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
//只要最小的節(jié)點是自身便可獲得寫鎖
boolean result = path.replace("/lock/", "").equals(children.get(0));
return result;
}
//判斷是否可以獲得讀鎖
private static boolean canGetReadLock(String path){
List<String> children = null;
try {
children = zooKeeper.getChildren(rootPath, false);
Collections.sort(children, new Comparator<String>() {
public int compare(String o1, String o2) {
int index1 = o1.lastIndexOf("-");
int index2 = o2.lastIndexOf("-");
long a = Long.parseLong(o1.substring(index1 + 1));
long b = Long.parseLong(o2.substring(index2 + 1));
return (int) (a - b);
}
});
//只要序號比自己小的節(jié)點中沒有寫類型節(jié)點便可獲得讀鎖
int index = children.indexOf(path.replace("/lock/", ""));
for (int i = 0; i < index; i++) {
String child = children.get(i);
int k = child.indexOf("-");
if (child.substring(k+1, k+2).equals("W")) return false;
}
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
return true;
}
//獲得鎖后所做的工作
private static void afterGetLockDo(String path, boolean isWriteLock) {
if (isWriteLock) {
System.out.println(Thread.currentThread().getName() + ": get write lock...");
try {
TimeUnit.SECONDS.sleep(2); //模擬寫事務(wù)所做的工作
System.out.println(Thread.currentThread().getName() + ": release write lock...");
zooKeeper.delete(path, -1);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
} else {
System.out.println(Thread.currentThread().getName() + ": get read lock...");
try {
TimeUnit.SECONDS.sleep(1);
System.out.println(Thread.currentThread().getName() + ": release read lock...");
zooKeeper.delete(path, -1);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
}
}
最后的運行結(jié)果如下圖所示:
** 可以看到所有線程都是符合正常的讀寫鎖邏輯并且獲取到相關(guān)的鎖資源装蓬。至此,Zookeeper讀寫鎖就實現(xiàn)啦纱扭。 **