Zookeeper偽集群部署與分布式讀寫鎖實現(xiàn)

初識Zookeeper

Zookeeper是一個典型的分布式數(shù)據(jù)一致性的解決方案网缝,分布式應(yīng)用程序可以基于它實現(xiàn)諸如數(shù)據(jù)發(fā)布/訂閱懊缺、負(fù)載均衡疫稿、命名服務(wù)、分布式協(xié)調(diào)通知、Master選舉而克、分布式鎖和分布式隊列等功能靶壮。本篇主要是用java多線程模擬實現(xiàn)基于Zookeeper的分布式讀寫鎖怔毛。

Zookeeper偽集群部署實踐

Zookeeper搭建分布式集群至少需要三臺服務(wù)器员萍,手頭確實沒有那么多資源,好在Zookeeper允許在一臺機(jī)器上完成一個偽集群的搭建拣度。所謂偽集群其實就是所有的機(jī)器都在同一臺機(jī)器上碎绎,但還是以集群的特性來對外提供服務(wù)的。這種模式和集群十分相似抗果,只不過在同一臺機(jī)器上的不同Zookeeper實例是以不同的端口號來互相通信的筋帖。從官網(wǎng)下載到最新的Zookeeper發(fā)行版本壓縮包:zookeeper-3.4.10.tar.gz,將Zookeeper解壓到我的阿里云服務(wù)器/usr/local/zookeeper地址下。

配置

cp /usr/local/zookeeper/conf/zoo_sample.cfg zoo1.cfg
cp /usr/local/zookeeper/conf/zoo_sample.cfg zoo2.cfg
cp /usr/local/zookeeper/conf/zoo_sample.cfg zoo3.cfg

zoo_sample.cfg為模板復(fù)制三個偽集群的配置冤馏。三個配置文件的詳細(xì)配置如下:
zoo1.cfg:

tickTime=2000
initLimit=10
syncLimit=5
dataDir=/usr/local/zookeeper/data_1/
clientPort=2181
server.1=127.0.0.1:2887:3887
server.2=127.0.0.1:2888:3888
server.3=127.0.0.1:2889:3889

zoo2.cfg:

tickTime=2000
initLimit=10
syncLimit=5
dataDir=/usr/local/zookeeper/data_2/
clientPort=2182
server.1=127.0.0.1:2887:3887
server.2=127.0.0.1:2888:3888
server.3=127.0.0.1:2889:3889

zoo3.cfg:

tickTime=2000
initLimit=10
syncLimit=5
dataDir=/usr/local/zookeeper/data_3/
clientPort=2183
server.1=127.0.0.1:2887:3887
server.2=127.0.0.1:2888:3888
server.3=127.0.0.1:2889:3889
  • tickTime : 心跳時間日麸,單位是毫秒 ,session的最小超時時間是2*tickTime
  • initLimit:多少個tickTime內(nèi)逮光,允許其他server連接并初始化數(shù)據(jù)
  • syncLimit:多少個tickTime內(nèi)代箭,允許follower同步
  • dataDir:存放zookeeper數(shù)據(jù)的路徑
  • clientPort:監(jiān)聽客戶端連接的端口號
  • server.id=ip:port1:port2:其中id表示該服務(wù)器的id號,需要注意的是,集群部署必須在dataDir路徑下新建一個myid的文件涕刚,內(nèi)容就是當(dāng)前服務(wù)器的id號嗡综。ip是當(dāng)前服務(wù)器的ip,port1表示Follower服務(wù)器和Leader進(jìn)行運行時通信和數(shù)據(jù)同步使用的端口號杜漠,port2端口用于Leader選舉過程中的投票通信极景。
    使用下面命令開啟三個Zookeeper server服務(wù):

root@iZwz9hs5ueqrblutmr5bncZ:/usr/local/zookeeper# bin/zkServer.sh start zoo1.cfg
root@iZwz9hs5ueqrblutmr5bncZ:/usr/local/zookeeper# bin/zkServer.sh start zoo2.cfg
root@iZwz9hs5ueqrblutmr5bncZ:/usr/local/zookeeper# bin/zkServer.sh start zoo3.cfg

可以在/usr/local/zookeeper路徑下的zookeeper.out文件中看到服務(wù)開啟的日志信息。

測試客戶端的連接

在測試前驾茴,需要在阿里云服務(wù)器管理中心打開相應(yīng)的Zookeeper客戶端連接端口盼樟。打開Windows命令行,定位到Zookeeper路徑執(zhí)行下面命令:

D:\zookeeper-3.4.10>bin\zkCli -server 服務(wù)器ip:2181

發(fā)現(xiàn)Windows Zookeeper客戶端已經(jīng)成功連接上阿里云上的Zookeeper集群锈至,創(chuàng)建一個test結(jié)點晨缴。此時我們連接的是2181端口,也就是偽集群當(dāng)中zoo1的監(jiān)聽的端口號裹赴。如下圖所示:

image

另外在阿里云服務(wù)器中也開啟一個客戶端的連接喜庞,看看我們之前添加的test結(jié)點有沒有真正的添加到Zookeeper集群中。

image

上圖可以看到test結(jié)點確實已經(jīng)在集群當(dāng)中了棋返,而且此時連接的是2183端口延都,也就是zoo3這個server。由此驗證了Zookeeper偽集群搭建成功睛竣。

分布式讀寫鎖實現(xiàn)

下面就基于上面搭建的Zookeeper偽集群晰房,實現(xiàn)分布式讀寫鎖。我們都知道當(dāng)一個事務(wù)獲得讀鎖之后,在這之后的事務(wù)只能獲取讀鎖殊者,寫鎖獲取必須等到所有讀鎖全部釋放与境。而寫鎖一旦獲取后其他事務(wù)的讀鎖以及寫鎖都必須等待該寫鎖釋放后獲取。那么Zookeeper是怎么實現(xiàn)這一特性的呢猖吴?
首先Zookeeper創(chuàng)建節(jié)點時可以創(chuàng)建4中形式的節(jié)點摔刁,分別是持久節(jié)點(PERSISTENT)、持久順序節(jié)點(PERSISTENT_SEQUENTIAL)海蔽、臨時節(jié)點(EPHEMERAL)共屈、臨時順序節(jié)點(EPHEMERAL_SEQUENTIAL)。

  • 持久節(jié)點:該數(shù)據(jù)節(jié)點被創(chuàng)建后党窜,就會一直存在于Zookeeper服務(wù)器上拗引,直到有刪除操作主動清除這個節(jié)點。
  • 持久順序節(jié)點:基本特性和持久節(jié)點是一致的幌衣,額外的特性表現(xiàn)在順序性上矾削,在創(chuàng)建節(jié)點的過程中,Zookeeper會自動為給定節(jié)點名加上一個遞增的數(shù)字后綴作為新的節(jié)點名豁护。
  • 臨時節(jié)點:臨時節(jié)點的生命周期和客戶端的會話綁定在一起哼凯,也就是說,如果客戶端會話失效择镇,那么節(jié)點就會被自動清理掉挡逼。另外,臨時節(jié)點不能創(chuàng)建子結(jié)點腻豌。
  • 臨時順序節(jié)點:特性和臨時節(jié)點一致家坎,額外擁有順序的特性。

其實分布式讀寫鎖就是基于Zookeeper順序節(jié)點特性來實現(xiàn)的吝梅。每個事務(wù)想要獲得鎖時都去同一個節(jié)點/lock下虱疏,創(chuàng)建命名規(guī)范為[hostname]-鎖類型-序號的臨時順序節(jié)點。如果自身想要獲取的是讀鎖苏携,那么只要查看/lock下節(jié)點順序比自身小的節(jié)點中沒有寫類型的節(jié)點便可獲得讀鎖做瞪。如果自身想要獲取寫鎖,那么只要看到/lock下自己是順序最小的節(jié)點便可獲得寫鎖右冻。下面是多線程模擬事務(wù)的詳細(xì)實現(xiàn):

public class DistributedLockDemo {
    private static CountDownLatch countDownLatch = new CountDownLatch(1);
    private static String rootPath = "/lock";
    private static ZooKeeper zooKeeper;

    public static void main(String[] args) {
        try {
            zooKeeper = new ZooKeeper("119.23.216.241:2181", 5000, null); //連接我的Zookeeper集群
            zooKeeper.create(rootPath, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            WriteThread[] writeThreads = new WriteThread[5];  //創(chuàng)建5個寫鎖線程
            ReadThread[] readThreads = new ReadThread[10];    //創(chuàng)建10個讀鎖線程
            for (int i = 0; i < writeThreads.length; i++) {
                writeThreads[i] = new WriteThread("WriteThread_" + i);
                writeThreads[i].start();
            }
            for (int i = 0; i < readThreads.length; i++) {
                readThreads[i] = new ReadThread("ReadThread_" + i);
                readThreads[i].start();
            }
            TimeUnit.SECONDS.sleep(1);
            countDownLatch.countDown();
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    static class WriteThread extends Thread {
        public WriteThread(String name){
            super(name);
        }
        @Override
        public void run() {
            try {
                countDownLatch.await(); //多個線程同一時間競爭資源
                //path示例:/lock/WriteThread_0-W-0000000001
                String path = zooKeeper.create(rootPath + "/" + Thread.currentThread().getName() + "-W-", "".getBytes(),
                        ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
                System.out.println(Thread.currentThread().getName() + ": try to acquire write lock...");
                while (!canGetWriteLock(path)) {
                    TimeUnit.MILLISECONDS.sleep(500);
                }
                afterGetLockDo(path, true);
            } catch (KeeperException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    static class ReadThread extends Thread {
        public ReadThread(String name){
            super(name);
        }
        @Override
        public void run() {
            try {
                countDownLatch.await();
                String path = zooKeeper.create(rootPath + "/" + Thread.currentThread().getName() + "-R-", "".getBytes(),
                        ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
                System.out.println(Thread.currentThread().getName() + ": try to acquire read lock...");
                while (!canGetReadLock(path)){
                    TimeUnit.MILLISECONDS.sleep(500);
                }
                afterGetLockDo(path, false);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (KeeperException e) {
                e.printStackTrace();
            }
        }
    }

    //判斷是否可以獲取寫鎖
    private static boolean canGetWriteLock(String path) {
        List<String> children = null; // /lock下所有子節(jié)點列表
        try {
            children = zooKeeper.getChildren(rootPath, false);
            //基于節(jié)點名當(dāng)中的序號進(jìn)行排序
            Collections.sort(children, new Comparator<String>() {
                public int compare(String o1, String o2) {
                    int index1 = o1.lastIndexOf("-");
                    int index2 = o2.lastIndexOf("-");
                    long a = Long.parseLong(o1.substring(index1 + 1));
                    long b = Long.parseLong(o2.substring(index2 + 1));
                    return (int) (a - b);
                }
            });
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //只要最小的節(jié)點是自身便可獲得寫鎖
        boolean result = path.replace("/lock/", "").equals(children.get(0));
        return result;
    }

    //判斷是否可以獲得讀鎖
    private static boolean canGetReadLock(String path){
        List<String> children = null;
        try {
            children = zooKeeper.getChildren(rootPath, false);
            Collections.sort(children, new Comparator<String>() {
                public int compare(String o1, String o2) {
                    int index1 = o1.lastIndexOf("-");
                    int index2 = o2.lastIndexOf("-");
                    long a = Long.parseLong(o1.substring(index1 + 1));
                    long b = Long.parseLong(o2.substring(index2 + 1));
                    return (int) (a - b);
                }
            });
            //只要序號比自己小的節(jié)點中沒有寫類型節(jié)點便可獲得讀鎖
            int index = children.indexOf(path.replace("/lock/", ""));
            for (int i = 0; i < index; i++) {
                String child = children.get(i);
                int k = child.indexOf("-");
                if (child.substring(k+1, k+2).equals("W")) return false;
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
        return true;
    }

    //獲得鎖后所做的工作
    private static void afterGetLockDo(String path, boolean isWriteLock) {
        if (isWriteLock) {
            System.out.println(Thread.currentThread().getName() + ": get write lock...");
            try {
                TimeUnit.SECONDS.sleep(2); //模擬寫事務(wù)所做的工作
                System.out.println(Thread.currentThread().getName() + ": release write lock...");
                zooKeeper.delete(path, -1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (KeeperException e) {
                e.printStackTrace();
            }
        } else {
            System.out.println(Thread.currentThread().getName() + ": get read lock...");
            try {
                TimeUnit.SECONDS.sleep(1);
                System.out.println(Thread.currentThread().getName() + ": release read lock...");
                zooKeeper.delete(path, -1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (KeeperException e) {
                e.printStackTrace();
            }
        }
    }

}

最后的運行結(jié)果如下圖所示:

image

** 可以看到所有線程都是符合正常的讀寫鎖邏輯并且獲取到相關(guān)的鎖資源装蓬。至此,Zookeeper讀寫鎖就實現(xiàn)啦纱扭。 **

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末牍帚,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子乳蛾,更是在濱河造成了極大的恐慌暗赶,老刑警劉巖鄙币,帶你破解...
    沈念sama閱讀 223,002評論 6 519
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異蹂随,居然都是意外死亡十嘿,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 95,357評論 3 400
  • 文/潘曉璐 我一進(jìn)店門岳锁,熙熙樓的掌柜王于貴愁眉苦臉地迎上來绩衷,“玉大人,你說我怎么就攤上這事浸锨〈狡福” “怎么了版姑?”我有些...
    開封第一講書人閱讀 169,787評論 0 365
  • 文/不壞的土叔 我叫張陵柱搜,是天一觀的道長。 經(jīng)常有香客問我剥险,道長聪蘸,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 60,237評論 1 300
  • 正文 為了忘掉前任表制,我火速辦了婚禮健爬,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘么介。我一直安慰自己娜遵,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 69,237評論 6 398
  • 文/花漫 我一把揭開白布壤短。 她就那樣靜靜地躺著设拟,像睡著了一般。 火紅的嫁衣襯著肌膚如雪久脯。 梳的紋絲不亂的頭發(fā)上纳胧,一...
    開封第一講書人閱讀 52,821評論 1 314
  • 那天,我揣著相機(jī)與錄音帘撰,去河邊找鬼跑慕。 笑死,一個胖子當(dāng)著我的面吹牛摧找,可吹牛的內(nèi)容都是我干的核行。 我是一名探鬼主播,決...
    沈念sama閱讀 41,236評論 3 424
  • 文/蒼蘭香墨 我猛地睜開眼蹬耘,長吁一口氣:“原來是場噩夢啊……” “哼芝雪!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起婆赠,我...
    開封第一講書人閱讀 40,196評論 0 277
  • 序言:老撾萬榮一對情侶失蹤绵脯,失蹤者是張志新(化名)和其女友劉穎佳励,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體蛆挫,經(jīng)...
    沈念sama閱讀 46,716評論 1 320
  • 正文 獨居荒郊野嶺守林人離奇死亡赃承,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,794評論 3 343
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了悴侵。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片瞧剖。...
    茶點故事閱讀 40,928評論 1 353
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖可免,靈堂內(nèi)的尸體忽然破棺而出抓于,到底是詐尸還是另有隱情,我是刑警寧澤浇借,帶...
    沈念sama閱讀 36,583評論 5 351
  • 正文 年R本政府宣布捉撮,位于F島的核電站,受9級特大地震影響妇垢,放射性物質(zhì)發(fā)生泄漏巾遭。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 42,264評論 3 336
  • 文/蒙蒙 一闯估、第九天 我趴在偏房一處隱蔽的房頂上張望灼舍。 院中可真熱鬧,春花似錦涨薪、人聲如沸骑素。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,755評論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽献丑。三九已至,卻和暖如春光督,著一層夾襖步出監(jiān)牢的瞬間阳距,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,869評論 1 274
  • 我被黑心中介騙來泰國打工结借, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留筐摘,地道東北人。 一個月前我還...
    沈念sama閱讀 49,378評論 3 379
  • 正文 我出身青樓船老,卻偏偏與公主長得像咖熟,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子柳畔,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,937評論 2 361

推薦閱讀更多精彩內(nèi)容