《分布式_Zookeeper》_其他兩種API介紹和分布式鎖的實(shí)現(xiàn)

前面介紹倆中客戶端,后面實(shí)現(xiàn)分布式鎖,注冊(cè)中心 見Dubbo專題

zookeeper客戶端

1.zkclient.sh(linux)(這里忽略,啟動(dòng)原生api,ZookeeperMain)

2.原生api(之前已介紹)

3.zkClient (https://github.com/sgroschupf/zkclient

4.curator(http://curator.apache.org

原理

`同mybatis與hibernate`的區(qū)別,對(duì)原生api的一種封裝,只是程度不一樣凡泣,curator更復(fù)雜一些,但是它的stream風(fēng)格不錯(cuò)!

簡(jiǎn)單實(shí)用

1.zkclient的使用

依賴
<dependency>
    <groupId>com.101tec</groupId>
    <artifactId>zkclient</artifactId>
    <version>0.10</version>
</dependency>
相關(guān)crud及Test
package com.huey.zkclient.znode;
/**
* @author huey China.
* @Description : zkClient Crud
* @Date Created in 2018/11/18 下午2:55
*/

import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.SerializableSerializer;

import java.util.List;

public class ZkClientCrud<T> {

   ZkClient zkClient;
   private String connectString = "192.168.59.2:2181,192.168.59.3:2181,192.168.59.4:2181";

   public ZkClientCrud() {
      this.zkClient = new ZkClient(connectString,5000,5000,new SerializableSerializer());
   }


   /***
    *
    * @param path
    * @param data
    */
   public void createPersistent(String path,Object data){
      zkClient.createPersistent(path,data);
   }

   public  T readData(String path){
      return zkClient.readData(path);

   }

   public List<String> getChildren(String path){
      return zkClient.getChildren(path);

   }

   public  void writeData(String path,Object object){
      zkClient.writeData(path,object);

   }

   public  void deleteRecursive(String path){
      zkClient.deleteRecursive(path);

   }



   /***
    * 支持創(chuàng)建遞歸方式
    * @param path
    * @param createParents
    */
   public void createPersistent(String path,boolean createParents){
      zkClient.createPersistent(path,createParents);
   }
}
package com.huey.zkclient.znode;


import org.junit.Test;

/**
* @author huey China.
* @Description : zkClient CRUD
* @Date Created in 2018/11/18 下午3:05
*/

public class ZkclientTest {
    public static void main(String[] args) {
        ZkClientCrud zkClientCrud = new ZkClientCrud();
        User user = new User();
        user.setAge(18);
        user.setName("huey");
        zkClientCrud.createPersistent("/huey_zkClient", user);// ok
        System.out.println(zkClientCrud.readData("/huey_zkClient")); //ok
        user.setAge(20);
        zkClientCrud.writeData("/huey_zkClient",user);
        System.out.println(zkClientCrud.readData("/huey_zkClient")); //ok
    }

    @Test
    public void testDel(){
        ZkClientCrud zkClientCrud = new ZkClientCrud();
        User user = new User();
        user.setAge(18);
        user.setName("huey");
        zkClientCrud.deleteRecursive("/huey_zkClient"); // ok
    }
}

watcher
package com.huey.zkclient.watcher;
/**
* @author huey China.
* @Description : zkClientWatcher
* @Date Created in 2018/11/18 下午2:54
*/

import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.IZkStateListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.SerializableSerializer;
import org.apache.zookeeper.Watcher;

import java.util.List;

public class ZkClientWatcher<T> {
   ZkClient zkClient;
   private String connectString = "192.168.59.2:2181,192.168.59.3:2181,192.168.59.4:2181";
   public ZkClientWatcher() {
      this.zkClient = new ZkClient(connectString,5000,5000,new SerializableSerializer());
   }

   public  T readData(String path){
      return zkClient.readData(path);

   }

   public List<String> getChildren(String path){
      return zkClient.getChildren(path);

   }

   public  void writeData(String path,Object object){
      zkClient.writeData(path,object);

   }

   public  void deleteRecursive(String path){
      zkClient.deleteRecursive(path);

   }

   /***
    *
    * @param path
    * @param data
    */
   public void createPersistent(String path,Object data){
      zkClient.createPersistent(path,data);
   }


   public void lister(String path){
      //對(duì)父節(jié)點(diǎn)添加監(jiān)聽變化皮假。
      zkClient.subscribeDataChanges(path, new IZkDataListener() {
         @Override
         public void handleDataChange(String dataPath, Object data) throws Exception {
            System.out.printf("變更的節(jié)點(diǎn)為:%s,%s", dataPath,data );
         }
         @Override
         public void handleDataDeleted(String dataPath) throws Exception {
            System.out.printf("刪除的節(jié)點(diǎn)為:%s", dataPath );
         }
      });
      //對(duì)父節(jié)點(diǎn)添加監(jiān)聽子節(jié)點(diǎn)變化鞋拟。
      zkClient.subscribeChildChanges(path, new IZkChildListener() {
         @Override
         public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
            System.out.println("parentPath: " + parentPath+",currentChilds:"+currentChilds);
         }
      });
      //對(duì)父節(jié)點(diǎn)添加監(jiān)聽子節(jié)點(diǎn)變化。
      zkClient.subscribeStateChanges(new IZkStateListener() {
         @Override
         public void handleStateChanged(Watcher.Event.KeeperState state) throws Exception {
            if(state== Watcher.Event.KeeperState.SyncConnected){
               //當(dāng)我重新啟動(dòng)后start惹资,監(jiān)聽觸發(fā)
               System.out.println("連接成功");
            }else if(state== Watcher.Event.KeeperState.Disconnected){
               System.out.println("連接斷開");//當(dāng)我在服務(wù)端將zk服務(wù)stop時(shí)贺纲,監(jiān)聽觸發(fā)
            }else
               System.out.println("其他狀態(tài)"+state);
         }
         @Override
         public void handleNewSession() throws Exception {
            System.out.println("重建session");
         }
         @Override
         public void handleSessionEstablishmentError(Throwable error) throws Exception {
         }
      });

   }

}

package com.huey.zkclient.watcher;

import com.huey.zkclient.znode.User;
import org.junit.Test;

/**
 * 由于zkClient創(chuàng)建連接的時(shí)候指定了默認(rèn)的序列化類-new SerializableSerializer(),
 * 所以存儲(chǔ)在節(jié)點(diǎn)上的值也是序列化后的字節(jié)數(shù)組,當(dāng)使用zkCli.sh在控制臺(tái)set /xxx/xx的值時(shí)褪测,
 * 存儲(chǔ)的是普通的字符串字節(jié)數(shù)組猴誊。所以當(dāng)set值時(shí)雖然觸發(fā)了值改變事件,但zkClient無法反序列化這個(gè)值侮措。
 * 1懈叹、在我們ZkClientWatcher這個(gè)類中是加了序列化的(org.I0Itec.zkclient.ZkClient#ZkClient(org.I0Itec.zkclient.IZkConnection, int, org.I0Itec.zkclient.serialize.ZkSerializer)
 * 在zkCli.sh 并沒有 然后我為了驗(yàn)證 我在zkCli.sh 刪除節(jié)點(diǎn)和增加節(jié)點(diǎn)都可以
 * 感應(yīng)到事件
 *
 * @author huey China.?
 * @Description :
 * @Date Created in 2018/11/18 下午3:39
 */

public class ZkClientWatcherTest {

    private  static  ZkClientWatcher zkClientWatcher = new ZkClientWatcher();

    public static void main(String[] args) throws InterruptedException {

        String path = "/huey_zkClient";
        zkClientWatcher.deleteRecursive(path);
        zkClientWatcher.lister(path);
        User user = new User();
        user.setAge(18);
        user.setName("huey");
        zkClientWatcher.createPersistent(path, user);
        Thread.sleep(2000);
        user.setAge(23);
        zkClientWatcher.writeData(path, user);//更改 ok
        Thread.sleep(Integer.MAX_VALUE);
    }


    /**
    *
    *ok
    */
    @Test
    public void testUpdate(){
        String path = "/huey_zkClient";
        zkClientWatcher.writeData(path,System.currentTimeMillis());
    }

    /**
    * @author huey China.
    * @Description : ok
    * @Date Created in 2018/11/18 下午3:56
    */
    @Test
    public void testDel(){
        String path = "/huey_zkClient";
        zkClientWatcher.deleteRecursive(path);
    }

}

2.curator的使用

依賴
<dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-client</artifactId>
            <version>4.0.0</version>
            <exclusions>
                <exclusion>
                    <artifactId>slf4j-api</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>zookeeper</artifactId>
                    <groupId>org.apache.zookeeper</groupId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>4.0.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>4.0.0</version>
        </dependency>
相關(guān)crud及Test
package com.huey.curator.znode;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;

public class CuratorCrud {
   private String connectString = "192.168.59.2:2181,192.168.59.3:2181,192.168.59.4:2181";
   CuratorFramework cf ;
   public CuratorCrud() {
      //1 重試策略:初試時(shí)間為1s 重試10次
      RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
      //2 通過工廠創(chuàng)建連接
      cf = CuratorFrameworkFactory.builder()
              .connectString(connectString)
              .sessionTimeoutMs(5000)
              .retryPolicy(retryPolicy)
//                  .namespace("super")
              .build();
      //3 開啟連接
      cf.start();
   }

   public String createPersistent(String path,String  data){
      try {
         cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path,data.getBytes());
      } catch (Exception e) {
         e.printStackTrace();
      }

      return  null;
   }

   public String getData(String path){
      try {
         return new String(cf.getData().forPath(path));
      } catch (Exception e) {
         e.printStackTrace();
      }
      return  null;
   }


   public void delete(String path){
      try {
         cf.delete().guaranteed().deletingChildrenIfNeeded().forPath(path);
      } catch (Exception e) {
         e.printStackTrace();
      }

   }


   public void setData(String path,String  data){
      try {
         cf.setData().forPath(path,data.getBytes());
      } catch (Exception e) {
         e.printStackTrace();
      }
   }

}

package com.huey.curator.znode;

/**
* @author huey China.
* @Description : curator CRUD 類似 具體看api
* @Date Created in 2018/11/18 下午4:04
*/
public class CuratorTest {
   public static void main(String[] args) {
      CuratorCrud zkClientCrud=new CuratorCrud();
      zkClientCrud.createPersistent("/huey/abc","abc");
      System.out.println(zkClientCrud.getData("/huey/abc"));


   }
}

package com.huey.curator.watcher;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.retry.RetryNTimes;

/**
* @author huey China.
* @Description : Curator 監(jiān)聽 test
* @Date Created in 2018/11/18 下午4:09
*/
public class CuratorWatcherTest {
    /** Zookeeper info */
    private static String connectString = "192.168.59.2:2181,192.168.59.3:2181,192.168.59.4:2181";
    private static final String ZK_PATH = "/curator_test";

    public static void main(String[] args) throws Exception {
        // 1.Connect to zk
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                connectString,
                new RetryNTimes(10, 5000)
        );
        client.start();
        System.out.println("zk client start successfully!");

        // 2.Register watcher  子目錄事件
        PathChildrenCache watcher = new PathChildrenCache(
                client,
                ZK_PATH,
                true    // if cache data
        );
        watcher.getListenable().addListener((client1, event) -> {
            ChildData data = event.getData();
            if (data == null) {
                System.out.println("No data in event[" + event + "]");
            } else {
                System.out.println("Receive event: "
                        + "type=[" + event.getType() + "]"
                        + ", path=[" + data.getPath() + "]"
                        + ", data=[" + new String(data.getData()) + "]"
                        + ", stat=[" + data.getStat() + "]");
            }
        });
        watcher.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
        System.out.println("Register zk watcher successfully!");

        Thread.sleep(Integer.MAX_VALUE);
    }

}

分布式鎖

1.原生api簡(jiǎn)單模擬

package com.huey.locks;

import org.apache.zookeeper.*;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/***
 * 唯一特性 重復(fù)獲取
 */
public class WkLock {

    private ZooKeeper zookeeper;
    private String path = "/huey";
    private CountDownLatch latch=null;
    public WkLock(String host, String path) {
        try {
            this.zookeeper =new ZooKeeper(host, 3000, new Watcher() {
                @Override
                public void process(WatchedEvent event) {

                }
            });
        } catch (IOException e) {
            e.printStackTrace();
        }
        this.path = path;
    }

    /**
    * @author huey China.
    * @Description : 同步鎖控制唯一性
    * @Date Created in 2018/11/18 下午4:25
    */
    public void lock() {
        try {

                zookeeper.create(path, path.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        } catch (Exception e) {

           this.latch = new CountDownLatch(1);
            try {
                this.latch.await(1000, TimeUnit.MILLISECONDS);//等待,這里應(yīng)該一直等待其他線程釋放鎖 來個(gè)線程
            } catch (InterruptedException e1) {
                e1.printStackTrace();
            }
            this.latch = null;
            lock();
        }


    }

    /**
    * @author huey China.
    * @Description : 釋放鎖
    * @Date Created in 2018/11/18 下午4:26
    */
    public void unlock() {
        try {
            zookeeper.delete(path, -1);
        } catch (Exception e) {
        }
    }
}

package com.huey.locks;



/**
* @author huey China.
* @Description : demo
* @Date Created in 2018/11/18 下午4:31
*/
public class WukongLockTest implements  Runnable{

    WkLock wkLock=new WkLock("192.168.59.2:2181,192.168.59.3:2181,192.168.59.4:2181","/wklock");
     static int i=0;

    public static void main(String[] args) throws InterruptedException {

        WukongLockTest lockTest2=new WukongLockTest();
        Thread t1= new Thread(lockTest2);
        Thread t2=  new Thread(lockTest2);
        t1.start();t2.start();
        t1.join();t2.join();
        System.out.println(i);

    }



        @Override
        public void run() {
            try {
                for(int j=0;j<300;j++){
                    wkLock.lock();
                    i++;
                    wkLock.unlock();
                }

            } catch (Exception e) {
                e.printStackTrace();
            }


    }
}

2.Curator內(nèi)置API

package com.huey.locks;


import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class CuratorLockTest implements Runnable {

    final static CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192.168.59.2:2181,192.168.59.3:2181,192.168.59.4:2181").retryPolicy(new ExponentialBackoffRetry(100, 1)).build();
    static int i = 0;
    /**
     * @author huey China.
     * @Description : Curator內(nèi)置分布式api鎖處理 類似juc的信號(hào)量
     * @Date Created in 2018/11/18 下午4:30
     */
    final InterProcessMutex lock = new InterProcessMutex(client, "/lock");


    public static void main(String[] args) throws InterruptedException {
        client.start();
        CuratorLockTest lockTest2 = new CuratorLockTest();
        Thread t1 = new Thread(lockTest2);
        Thread t2 = new Thread(lockTest2);
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        System.out.println(i);

    }

    @Override
    public void run() {
        try {
            for (int j = 0; j < 300; j++) {
                lock.acquire();

                i++;
                lock.release();
            }

        } catch (Exception e) {
            e.printStackTrace();
        }


    }
}

運(yùn)行結(jié)果為600

總結(jié)

原生api采用唯一性實(shí)現(xiàn)分扎,原生及curator性能略差澄成,適合低并發(fā),zk做分布式鎖并不是很好畏吓,redis實(shí)現(xiàn)更好一些(待續(xù))

.參考

官網(wǎng):http://zookeeper.apache.org
書籍:從Paxos到Zookeeper
網(wǎng)課: 推薦 慕課網(wǎng) 圖靈學(xué)院 谷粒學(xué)院

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末墨状,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子菲饼,更是在濱河造成了極大的恐慌肾砂,老刑警劉巖,帶你破解...
    沈念sama閱讀 216,470評(píng)論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件宏悦,死亡現(xiàn)場(chǎng)離奇詭異镐确,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)饼煞,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,393評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門辫塌,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人派哲,你說我怎么就攤上這事〔粲鳎” “怎么了芭届?”我有些...
    開封第一講書人閱讀 162,577評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵储矩,是天一觀的道長(zhǎng)。 經(jīng)常有香客問我褂乍,道長(zhǎng)持隧,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,176評(píng)論 1 292
  • 正文 為了忘掉前任逃片,我火速辦了婚禮屡拨,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘褥实。我一直安慰自己呀狼,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,189評(píng)論 6 388
  • 文/花漫 我一把揭開白布损离。 她就那樣靜靜地躺著哥艇,像睡著了一般。 火紅的嫁衣襯著肌膚如雪僻澎。 梳的紋絲不亂的頭發(fā)上貌踏,一...
    開封第一講書人閱讀 51,155評(píng)論 1 299
  • 那天,我揣著相機(jī)與錄音窟勃,去河邊找鬼祖乳。 笑死,一個(gè)胖子當(dāng)著我的面吹牛秉氧,可吹牛的內(nèi)容都是我干的眷昆。 我是一名探鬼主播,決...
    沈念sama閱讀 40,041評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼谬运,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼隙赁!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起梆暖,我...
    開封第一講書人閱讀 38,903評(píng)論 0 274
  • 序言:老撾萬榮一對(duì)情侶失蹤伞访,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后轰驳,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體厚掷,經(jīng)...
    沈念sama閱讀 45,319評(píng)論 1 310
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,539評(píng)論 2 332
  • 正文 我和宋清朗相戀三年级解,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了冒黑。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,703評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡勤哗,死狀恐怖抡爹,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情芒划,我是刑警寧澤冬竟,帶...
    沈念sama閱讀 35,417評(píng)論 5 343
  • 正文 年R本政府宣布欧穴,位于F島的核電站,受9級(jí)特大地震影響泵殴,放射性物質(zhì)發(fā)生泄漏涮帘。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,013評(píng)論 3 325
  • 文/蒙蒙 一笑诅、第九天 我趴在偏房一處隱蔽的房頂上張望调缨。 院中可真熱鬧,春花似錦吆你、人聲如沸弦叶。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,664評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽湾蔓。三九已至,卻和暖如春砌梆,著一層夾襖步出監(jiān)牢的瞬間默责,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,818評(píng)論 1 269
  • 我被黑心中介騙來泰國打工咸包, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留桃序,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 47,711評(píng)論 2 368
  • 正文 我出身青樓烂瘫,卻偏偏與公主長(zhǎng)得像媒熊,于是被迫代替她去往敵國和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子坟比,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,601評(píng)論 2 353

推薦閱讀更多精彩內(nèi)容