前面介紹兩種客戶端,後面實現分布式鎖;註冊中心見 Dubbo 專題
zookeeper客戶端
1.zkCli.sh(linux)(這裡忽略,它啟動的是原生api的ZooKeeperMain)
2.原生api(之前已介紹)
3.zkClient (https://github.com/sgroschupf/zkclient)
4.curator(http://curator.apache.org)
原理
如同mybatis與hibernate的區別:兩者都是對原生api的一種封裝,只是封裝程度不一樣;curator更複雜一些,但是它的流式(fluent)風格不錯!
簡單使用
1.zkclient的使用
依賴
<!-- Maven dependency on the zkclient library (groupId com.101tec), version 0.10. -->
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.10</version>
</dependency>
相關crud及Test
package com.huey.zkclient.znode;
/**
* @author huey China.
* @Description : zkClient Crud
* @Date Created in 2018/11/18 下午2:55
*/
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.SerializableSerializer;
import java.util.List;
/**
 * Thin CRUD facade over a {@link ZkClient} that connects to a fixed three-node
 * ZooKeeper ensemble and serializes node data with {@link SerializableSerializer}.
 *
 * @param <T> type the caller expects {@link #readData(String)} to return
 */
public class ZkClientCrud<T> {

    private String connectString = "192.168.59.2:2181,192.168.59.3:2181,192.168.59.4:2181";

    ZkClient zkClient;

    /** Connects to the ensemble using 5000 for both timeout arguments of {@link ZkClient}. */
    public ZkClientCrud() {
        final int sessionTimeout = 5000;
        final int connectionTimeout = 5000;
        SerializableSerializer serializer = new SerializableSerializer();
        this.zkClient = new ZkClient(connectString, sessionTimeout, connectionTimeout, serializer);
    }

    /**
     * Creates a persistent node holding the given data.
     *
     * @param path node path to create
     * @param data value handed to the client's serializer and stored on the node
     */
    public void createPersistent(String path, Object data) {
        this.zkClient.createPersistent(path, data);
    }

    /**
     * Creates a persistent node without data, optionally creating missing parents.
     *
     * @param path          node path to create
     * @param createParents passed straight through to
     *                      {@link ZkClient#createPersistent(String, boolean)}
     */
    public void createPersistent(String path, boolean createParents) {
        this.zkClient.createPersistent(path, createParents);
    }

    /**
     * Reads the value stored on a node.
     *
     * @param path node path to read
     * @return whatever the underlying client returns for that node
     */
    public T readData(String path) {
        T value = this.zkClient.readData(path);
        return value;
    }

    /**
     * Lists the children of a node.
     *
     * @param path parent node path
     * @return child names as reported by the underlying client
     */
    public List<String> getChildren(String path) {
        List<String> children = this.zkClient.getChildren(path);
        return children;
    }

    /**
     * Overwrites the value stored on a node.
     *
     * @param path   node path to update
     * @param object new value
     */
    public void writeData(String path, Object object) {
        this.zkClient.writeData(path, object);
    }

    /**
     * Deletes a node via {@link ZkClient#deleteRecursive(String)}.
     *
     * @param path root of the subtree to delete
     */
    public void deleteRecursive(String path) {
        this.zkClient.deleteRecursive(path);
    }
}
package com.huey.zkclient.znode;
import org.junit.Test;
/**
 * Manual smoke test for {@link ZkClientCrud}: creates, reads, updates and deletes
 * the {@code /huey_zkClient} node. Requires the ZooKeeper ensemble configured in
 * {@link ZkClientCrud} to be reachable.
 *
 * @author huey China.
 * @Description : zkClient CRUD
 * @Date Created in 2018/11/18 3:05 PM
 */
public class ZkclientTest {

    /** Node used by every step of this demo. */
    private static final String NODE_PATH = "/huey_zkClient";

    /**
     * Creates the node with a {@code User}, prints it, updates the age and prints it again.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Parameterized instead of raw so readData is typed as User.
        ZkClientCrud<User> zkClientCrud = new ZkClientCrud<>();
        User user = new User();
        user.setAge(18);
        user.setName("huey");
        zkClientCrud.createPersistent(NODE_PATH, user);
        System.out.println(zkClientCrud.readData(NODE_PATH));
        user.setAge(20);
        zkClientCrud.writeData(NODE_PATH, user);
        System.out.println(zkClientCrud.readData(NODE_PATH));
    }

    /** Deletes the demo node and anything below it. */
    @Test
    public void testDel() {
        // The original also built a User here that was never used; it has been dropped.
        ZkClientCrud<User> zkClientCrud = new ZkClientCrud<>();
        zkClientCrud.deleteRecursive(NODE_PATH);
    }
}
watcher
package com.huey.zkclient.watcher;
/**
* @author huey China.
* @Description : zkClientWatcher
* @Date Created in 2018/11/18 下午2:54
*/
import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.IZkStateListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.SerializableSerializer;
import org.apache.zookeeper.Watcher;
import java.util.List;
/**
 * {@link ZkClient} wrapper that exposes basic node operations plus {@link #lister(String)},
 * which registers data, child and connection-state listeners that print every event.
 *
 * @param <T> type the caller expects {@link #readData(String)} to return
 */
public class ZkClientWatcher<T> {
    ZkClient zkClient;
    private String connectString = "192.168.59.2:2181,192.168.59.3:2181,192.168.59.4:2181";

    /** Connects to the configured ensemble using {@link SerializableSerializer} for node data. */
    public ZkClientWatcher() {
        this.zkClient = new ZkClient(connectString, 5000, 5000, new SerializableSerializer());
    }

    /**
     * Reads the value stored on a node.
     *
     * @param path node path to read
     * @return whatever the underlying client returns for that node
     */
    public T readData(String path) {
        return zkClient.readData(path);
    }

    /**
     * Lists the children of a node.
     *
     * @param path parent node path
     * @return child names as reported by the underlying client
     */
    public List<String> getChildren(String path) {
        return zkClient.getChildren(path);
    }

    /**
     * Overwrites the value stored on a node.
     *
     * @param path   node path to update
     * @param object new value
     */
    public void writeData(String path, Object object) {
        zkClient.writeData(path, object);
    }

    /**
     * Deletes a node via {@link ZkClient#deleteRecursive(String)}.
     *
     * @param path root of the subtree to delete
     */
    public void deleteRecursive(String path) {
        zkClient.deleteRecursive(path);
    }

    /**
     * Creates a persistent node holding the given data.
     *
     * @param path node path to create
     * @param data value stored on the node
     */
    public void createPersistent(String path, Object data) {
        zkClient.createPersistent(path, data);
    }

    /**
     * Registers three printing listeners: data changes on {@code path}, child changes
     * under {@code path}, and connection-state changes of the client as a whole.
     *
     * @param path node to observe
     */
    public void lister(String path) {
        // Data listener: value changed on, or node deleted at, the observed path.
        zkClient.subscribeDataChanges(path, new IZkDataListener() {
            @Override
            public void handleDataChange(String dataPath, Object data) throws Exception {
                System.out.printf("變更的節(jié)點(diǎn)為:%s,%s", dataPath, data);
            }

            @Override
            public void handleDataDeleted(String dataPath) throws Exception {
                System.out.printf("刪除的節(jié)點(diǎn)為:%s", dataPath);
            }
        });
        // Child listener: the set of children below the observed path changed.
        zkClient.subscribeChildChanges(path, new IZkChildListener() {
            @Override
            public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
                System.out.println("parentPath: " + parentPath + ",currentChilds:" + currentChilds);
            }
        });
        // State listener: connection-level events, not tied to any path.
        zkClient.subscribeStateChanges(new IZkStateListener() {
            @Override
            public void handleStateChanged(Watcher.Event.KeeperState state) throws Exception {
                if (state == Watcher.Event.KeeperState.SyncConnected) {
                    // Author's note: fires once the ZooKeeper server has been started again.
                    System.out.println("連接成功");
                } else if (state == Watcher.Event.KeeperState.Disconnected) {
                    // Author's note: fires when the ZooKeeper service is stopped on the server.
                    System.out.println("連接斷開");
                } else {
                    System.out.println("其他狀態(tài)" + state);
                }
            }

            @Override
            public void handleNewSession() throws Exception {
                System.out.println("重建session");
            }

            @Override
            public void handleSessionEstablishmentError(Throwable error) throws Exception {
                // Previously an empty body: a failed session establishment left no trace.
                // Report it the same way the rest of this file reports failures.
                error.printStackTrace();
            }
        });
    }
}
package com.huey.zkclient.watcher;
import com.huey.zkclient.znode.User;
import org.junit.Test;
/**
 * Manual driver for {@link ZkClientWatcher}.
 *
 * <p>Author's notes: the zkClient connection is created with
 * {@code new SerializableSerializer()}, so values stored on nodes are serialized byte
 * arrays. When {@code set /xxx/xx} is issued from the zkCli.sh console, a plain string
 * byte array is stored instead; such a set still triggers the data-changed event, but
 * zkClient cannot deserialize the value. ZkClientWatcher is configured with the
 * serializer, zkCli.sh is not. As a check, deleting and creating nodes from zkCli.sh
 * were both observed to raise events.
 *
 * @author huey China.
 * @Description :
 * @Date Created in 2018/11/18 3:39 PM
 */
public class ZkClientWatcherTest {

    /** Node observed and modified by every entry point below. */
    private static final String NODE_PATH = "/huey_zkClient";

    private static ZkClientWatcher zkClientWatcher = new ZkClientWatcher();

    /**
     * Clears the node, registers the listeners, creates the node, updates it after two
     * seconds and then keeps the JVM alive so later events are still printed.
     *
     * @param args unused
     * @throws InterruptedException if one of the sleeps is interrupted
     */
    public static void main(String[] args) throws InterruptedException {
        zkClientWatcher.deleteRecursive(NODE_PATH);
        zkClientWatcher.lister(NODE_PATH);

        User huey = new User();
        huey.setAge(18);
        huey.setName("huey");
        zkClientWatcher.createPersistent(NODE_PATH, huey);

        Thread.sleep(2000);

        // Update the stored value.
        huey.setAge(23);
        zkClientWatcher.writeData(NODE_PATH, huey);

        Thread.sleep(Integer.MAX_VALUE);
    }

    /** Writes the current time in milliseconds to the node. */
    @Test
    public void testUpdate() {
        long now = System.currentTimeMillis();
        zkClientWatcher.writeData(NODE_PATH, now);
    }

    /**
     * Deletes the node.
     *
     * @author huey China.
     * @Date Created in 2018/11/18 3:56 PM
     */
    @Test
    public void testDel() {
        zkClientWatcher.deleteRecursive(NODE_PATH);
    }
}
2.curator的使用
依賴
<!-- curator-client 4.0.0, with its transitive slf4j-api and zookeeper artifacts excluded. -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-client</artifactId>
<version>4.0.0</version>
<exclusions>
<exclusion>
<artifactId>slf4j-api</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>zookeeper</artifactId>
<groupId>org.apache.zookeeper</groupId>
</exclusion>
</exclusions>
</dependency>
<!-- curator-recipes 4.0.0 (provides the recipes.cache and recipes.locks packages used below). -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.0.0</version>
</dependency>
<!-- curator-framework 4.0.0 (provides CuratorFramework and CuratorFrameworkFactory). -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.0.0</version>
</dependency>
相關crud及Test
package com.huey.curator.znode;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
/**
 * Minimal CRUD helper built on a {@link CuratorFramework} client. Node values are
 * plain strings, encoded and decoded as UTF-8. Failures are printed and reported to
 * the caller as {@code null} (or ignored for void methods); nothing is rethrown.
 */
public class CuratorCrud {
    /** Fixed charset so stored bytes do not depend on the JVM's platform default. */
    private static final java.nio.charset.Charset CHARSET = java.nio.charset.StandardCharsets.UTF_8;

    private String connectString = "192.168.59.2:2181,192.168.59.3:2181,192.168.59.4:2181";
    CuratorFramework cf;

    /** Builds and starts the client against the configured ensemble. */
    public CuratorCrud() {
        // 1. Retry policy: ExponentialBackoffRetry(1000, 10), i.e. 1s base sleep, up to 10 retries.
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
        // 2. Create the client through the factory builder.
        //    A namespace could be added here with .namespace("super").
        cf = CuratorFrameworkFactory.builder()
                .connectString(connectString)
                .sessionTimeoutMs(5000)
                .retryPolicy(retryPolicy)
                .build();
        // 3. Open the connection.
        cf.start();
    }

    /**
     * Creates a persistent node, creating missing parent nodes as needed.
     *
     * @param path node path to create
     * @param data string value to store on the node
     * @return the path returned by Curator for the created node, or {@code null} if the
     *         creation failed (the failure is printed); the original always returned
     *         {@code null}, even on success
     */
    public String createPersistent(String path, String data) {
        try {
            return cf.create()
                    .creatingParentsIfNeeded()
                    .withMode(CreateMode.PERSISTENT)
                    .forPath(path, data.getBytes(CHARSET));
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * Reads a node's value as a string.
     *
     * @param path node path to read
     * @return the decoded value, or {@code null} if the node holds no data or the read
     *         failed (a failure is printed)
     */
    public String getData(String path) {
        try {
            byte[] raw = cf.getData().forPath(path);
            // Explicit null check instead of relying on an NPE caught by the broad catch below.
            return raw == null ? null : new String(raw, CHARSET);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * Deletes a node and its children using Curator's guaranteed delete.
     *
     * @param path root of the subtree to delete
     */
    public void delete(String path) {
        try {
            cf.delete().guaranteed().deletingChildrenIfNeeded().forPath(path);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Overwrites a node's value.
     *
     * @param path node path to update
     * @param data new string value
     */
    public void setData(String path, String data) {
        try {
            cf.setData().forPath(path, data.getBytes(CHARSET));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
package com.huey.curator.znode;
/**
 * Minimal demo of {@link CuratorCrud}: creates {@code /huey/abc} and prints its value.
 * The remaining operations work the same way; see the Curator API for details.
 *
 * @author huey China.
 * @Description : curator CRUD
 * @Date Created in 2018/11/18 4:04 PM
 */
public class CuratorTest {

    /**
     * Runs the create-then-read round trip.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        final String nodePath = "/huey/abc";
        CuratorCrud curatorCrud = new CuratorCrud();
        curatorCrud.createPersistent(nodePath, "abc");
        String stored = curatorCrud.getData(nodePath);
        System.out.println(stored);
    }
}
package com.huey.curator.watcher;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.retry.RetryNTimes;
/**
 * Demo that watches the children of {@code /curator_test} with a
 * {@link PathChildrenCache} and prints every event it receives.
 *
 * @author huey China.
 * @Description : Curator listener test
 * @Date Created in 2018/11/18 4:09 PM
 */
public class CuratorWatcherTest {
    /** Zookeeper info */
    private static String connectString = "192.168.59.2:2181,192.168.59.3:2181,192.168.59.4:2181";
    private static final String ZK_PATH = "/curator_test";

    /**
     * Connects, registers the child listener and then blocks so events keep arriving.
     *
     * @param args unused
     * @throws Exception if starting the cache fails or the final sleep is interrupted
     */
    public static void main(String[] args) throws Exception {
        // 1. Connect to zk
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                connectString,
                new RetryNTimes(10, 5000)
        );
        client.start();
        System.out.println("zk client start successfully!");

        // 2. Register a watcher for child-node events
        PathChildrenCache watcher = new PathChildrenCache(
                client,
                ZK_PATH,
                true    // if cache data
        );
        watcher.getListenable().addListener((client1, event) -> {
            ChildData data = event.getData();
            if (data == null) {
                System.out.println("No data in event[" + event + "]");
            } else {
                // A child may carry no payload; decoding null bytes would throw an NPE
                // inside the listener, so print "null" instead. UTF-8 is used explicitly
                // rather than the platform default charset.
                byte[] raw = data.getData();
                String payload = raw == null
                        ? null
                        : new String(raw, java.nio.charset.StandardCharsets.UTF_8);
                System.out.println("Receive event: "
                        + "type=[" + event.getType() + "]"
                        + ", path=[" + data.getPath() + "]"
                        + ", data=[" + payload + "]"
                        + ", stat=[" + data.getStat() + "]");
            }
        });
        watcher.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
        System.out.println("Register zk watcher successfully!");

        // Keep the JVM alive so the listener can keep printing events.
        Thread.sleep(Integer.MAX_VALUE);
    }
}
分布式鎖
1.原生api簡單模擬
package com.huey.locks;
import org.apache.zookeeper.*;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/***
* 唯一特性 重復(fù)獲取
*/
public class WkLock {
private ZooKeeper zookeeper;
private String path = "/huey";
private CountDownLatch latch=null;
public WkLock(String host, String path) {
try {
this.zookeeper =new ZooKeeper(host, 3000, new Watcher() {
@Override
public void process(WatchedEvent event) {
}
});
} catch (IOException e) {
e.printStackTrace();
}
this.path = path;
}
/**
* @author huey China.
* @Description : 同步鎖控制唯一性
* @Date Created in 2018/11/18 下午4:25
*/
public void lock() {
try {
zookeeper.create(path, path.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
} catch (Exception e) {
this.latch = new CountDownLatch(1);
try {
this.latch.await(1000, TimeUnit.MILLISECONDS);//等待,這里應(yīng)該一直等待其他線程釋放鎖 來個(gè)線程
} catch (InterruptedException e1) {
e1.printStackTrace();
}
this.latch = null;
lock();
}
}
/**
* @author huey China.
* @Description : 釋放鎖
* @Date Created in 2018/11/18 下午4:26
*/
public void unlock() {
try {
zookeeper.delete(path, -1);
} catch (Exception e) {
}
}
}
package com.huey.locks;
/**
 * Demo for {@link WkLock}: two threads each increment a shared counter 300 times
 * while holding the lock, then the final count is printed.
 *
 * @author huey China.
 * @Description : demo
 * @Date Created in 2018/11/18 4:31 PM
 */
public class WukongLockTest implements Runnable {
    WkLock wkLock = new WkLock("192.168.59.2:2181,192.168.59.3:2181,192.168.59.4:2181", "/wklock");
    static int i = 0;

    /**
     * Starts both workers on the same {@code Runnable}, waits for them and prints the counter.
     *
     * @param args unused
     * @throws InterruptedException if interrupted while joining the workers
     */
    public static void main(String[] args) throws InterruptedException {
        WukongLockTest lockTest2 = new WukongLockTest();
        Thread t1 = new Thread(lockTest2);
        Thread t2 = new Thread(lockTest2);
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        System.out.println(i);
    }

    /** Increments the shared counter 300 times, taking the lock around each increment. */
    @Override
    public void run() {
        try {
            for (int j = 0; j < 300; j++) {
                wkLock.lock();
                try {
                    i++;
                } finally {
                    // Release in finally so a failure in the guarded section cannot leave the lock held.
                    wkLock.unlock();
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
2.Curator內置API
package com.huey.locks;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
/**
 * Demo of Curator's built-in distributed lock: two threads each increment a shared
 * counter 300 times while holding an {@link InterProcessMutex} on {@code /lock}.
 */
public class CuratorLockTest implements Runnable {
    final static CuratorFramework client = CuratorFrameworkFactory.builder()
            .connectString("192.168.59.2:2181,192.168.59.3:2181,192.168.59.4:2181")
            .retryPolicy(new ExponentialBackoffRetry(100, 1))
            .build();
    static int i = 0;
    /**
     * Curator's built-in distributed lock; it is a mutex (one holder at a time), not a
     * counting semaphore.
     *
     * @author huey China.
     * @Date Created in 2018/11/18 4:30 PM
     */
    final InterProcessMutex lock = new InterProcessMutex(client, "/lock");

    /**
     * Starts the client, runs both workers to completion, prints the counter and
     * closes the client.
     *
     * @param args unused
     * @throws InterruptedException if interrupted while joining the workers
     */
    public static void main(String[] args) throws InterruptedException {
        client.start();
        try {
            CuratorLockTest lockTest2 = new CuratorLockTest();
            Thread t1 = new Thread(lockTest2);
            Thread t2 = new Thread(lockTest2);
            t1.start();
            t2.start();
            t1.join();
            t2.join();
            System.out.println(i);
        } finally {
            // The original never closed the client.
            client.close();
        }
    }

    /** Increments the shared counter 300 times, holding the mutex around each increment. */
    @Override
    public void run() {
        try {
            for (int j = 0; j < 300; j++) {
                lock.acquire();
                try {
                    i++;
                } finally {
                    // Release in finally so a failure in the guarded section cannot leave the mutex held.
                    lock.release();
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
運行結果為600
總結
原生api采用節點唯一性實現,原生及curator性能略差,適合低并發;zk做分布式鎖并不是很好,redis實現更好一些(待續)
參考
官網:http://zookeeper.apache.org
書籍:從Paxos到Zookeeper
網課:推薦 慕課網、圖靈學院、谷粒學院