Zookeeper系列文章
1.Zookeeper簡介
2.Zookeeper集群安裝
3.原生API操作Zookeeper
4.zkClient框架操作Zookeeper
5.Curator框架操作Zookeeper
原生API操作Zookeeper
- 首先要使用 java 操作 zookeeper, zookeeper 的 javaclient 使我們更輕松的去對 zookeeper 進行各種操作,我們引入 zookeeper-3.3.4. Jar 和 zkclient-0.1. Jar 即可岛马。
- zookeeper-3.3.4. Jar 為官方提供的 javaAPI, zkclient-0.1. Jar 則為在源生 api 基礎(chǔ)之上進行擴展的開源 JAVA 客戶端伪阶。
zookeeper-3.3.4. Jar API介紹
-
創(chuàng)建會話方法:客戶端可以通過創(chuàng)建一個 zookeeper 實例來連接 zookeeper 服務(wù)器。
ZooKeeper zk = new ZooKeeper(); //后面會有樣例詳細介紹怎么使用
//Zookeeper類一共了 4 個構(gòu)造方法哎迄,根據(jù)參數(shù)不同,參數(shù)說明如下:
//connectString:連接服務(wù)器列表退客,用”,“分割误证。
//sessionTimeout:心跳檢測時間周期(毫秒)
//wather:事件處理通知器倔幼。
//canBeReadOnly:標(biāo)識當(dāng)前會話是否支持只讀盖腿。
//sessionld 和 sessionPasswd:提供連接 zookeeper 的 sessionld 和密碼,通過這倆個確定唯一一臺客戶端损同,目的是可以提供重復(fù)會話翩腐。
-
創(chuàng)建節(jié)點(znode)方法:create:
提供了兩套創(chuàng)建節(jié)點的方法,同步和異步創(chuàng)建節(jié)點方式膏燃。
-
同步方式:
- 參數(shù) 1茂卦,節(jié)點路徑(名稱): /nodeName(不允許遞歸創(chuàng)建節(jié)點,也就是說在父節(jié)點不存在的情況下组哩,不允許創(chuàng)建子節(jié)點)
- 參數(shù) 2等龙,節(jié)點內(nèi)容:要求類型是字節(jié)數(shù)組(也就是說,不支持序列化方式伶贰,如果需要實現(xiàn)序列化蛛砰,可使用 java 相關(guān)序列化框架,如 Hessian黍衙、Kryo 框架)
- 參數(shù) 3泥畅,節(jié)點權(quán)限:使用 Ids. OPEN_ ACL UNSAFE 開放權(quán)限即可。(這個參數(shù)一般在權(quán)限沒有太高要求的場景下琅翻,沒必要關(guān)注)
- 參數(shù) 4位仁,節(jié)點類型:創(chuàng)建節(jié)點的類型:CreateMode.“,提供四種節(jié)點類型
- PERSISTENT(持久節(jié)點)
- PERSISTENT SEQUENTIAL(持久順序節(jié)點)
- EPHEMERAL(臨時節(jié)點)
- EPHEMERAL SEQUENTIAL(臨時順序節(jié)點)
-
異步方式:(在同步參數(shù)基礎(chǔ)上增加倆個參數(shù))
- 參數(shù) 5方椎,注冊一個異步回調(diào)函數(shù)聂抢,要實現(xiàn) AsynCallBack. StringCallBack 接口,重寫processResult (int rc, String path, Object ctx, String name)方法棠众,當(dāng)節(jié)點創(chuàng)建完畢后執(zhí)行此方法琳疏。
- rc:為服務(wù)端響應(yīng)碼 0 表示調(diào)用成功、4 表示端口連接闸拿、-110 表示指定節(jié)點存在轿亮、-112 表示會話已經(jīng)過期。
- path:接口調(diào)用時傳入 API 的數(shù)據(jù)節(jié)點的路徑參數(shù)
- ctx:為調(diào)用接口傳入 API 的 ctx 值
- name:實際在服務(wù)器端創(chuàng)建節(jié)點的名稱
- 參數(shù) 5方椎,注冊一個異步回調(diào)函數(shù)聂抢,要實現(xiàn) AsynCallBack. StringCallBack 接口,重寫processResult (int rc, String path, Object ctx, String name)方法棠众,當(dāng)節(jié)點創(chuàng)建完畢后執(zhí)行此方法琳疏。
參數(shù) 6胸墙,傳遞給回調(diào)函數(shù)的參數(shù),一般為上下文(Context)信息
-
刪除節(jié)點:delete 方法(api 提供了兩個接口:同步刪除和異步刪除方式)
- 同步方式:
- 參數(shù)1按咒,節(jié)點名稱/deletePath
- 參數(shù) 2迟隅,版本號但骨,即表明本次刪除操作是針對該數(shù)據(jù)的某個版本進行的操作。
- 異步方式:(也是在同步方法基礎(chǔ)上增加兩個參數(shù),使用方式和 create 方法一致)
- 參數(shù) 3:智袭,一個異步回調(diào)函數(shù)
- 參數(shù) 4: 用于傳遞上下文信息的對象奔缠。
- 同步方式:
注意:在 zookeeper 中,只允許刪除葉子節(jié)點信息吼野,也就是說如果當(dāng)前節(jié)點不是葉子節(jié)點則無法刪除校哎,或必須先刪除其下所有子節(jié)點。
-
樣例程序(創(chuàng)建會話,創(chuàng)建節(jié)點,刪除節(jié)點)
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
/**
* Zookeeper base學(xué)習(xí)筆記
*/
public class ZookeeperBase {
/** zookeeper地址 */
static final String CONNECT_ADDR = "192.168.0.160:2181,192.168.0.161:2181,192.168.0.162:2181";
/** session超時時間 */
static final int SESSION_OUTTIME = 2000;//ms
/** 信號量瞳步,阻塞程序執(zhí)行闷哆,用于等待zookeeper連接成功,發(fā)送成功信號 */
static final CountDownLatch connectedSemaphore = new CountDownLatch(1);
public static void main(String[] args) throws Exception{
ZooKeeper zk = new ZooKeeper(CONNECT_ADDR, SESSION_OUTTIME, new Watcher(){
@Override
public void process(WatchedEvent event) {
//獲取事件的狀態(tài)
KeeperState keeperState = event.getState();
EventType eventType = event.getType();
//如果是建立連接
if(KeeperState.SyncConnected == keeperState){
if(EventType.None == eventType){
//如果建立連接成功单起,則發(fā)送信號量抱怔,讓后續(xù)阻塞程序向下執(zhí)行
connectedSemaphore.countDown();
System.out.println("zk 建立連接");
}
}
}
});
//進行阻塞
connectedSemaphore.await();
System.out.println("..");
//創(chuàng)建父節(jié)點
// zk.create("/testRoot", "testRoot".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
//創(chuàng)建子節(jié)點
// zk.create("/testRoot/children", "children data".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
//獲取節(jié)點洗信息
// byte[] data = zk.getData("/testRoot", false, null);
// System.out.println(new String(data));
// System.out.println(zk.getChildren("/testRoot", false));
//修改節(jié)點的值
// zk.setData("/testRoot", "modify data root".getBytes(), -1);
// byte[] data = zk.getData("/testRoot", false, null);
// System.out.println(new String(data));
//判斷節(jié)點是否存在
// System.out.println(zk.exists("/testRoot/children", false));
//刪除節(jié)點
// zk.delete("/testRoot/children", -1);
// System.out.println(zk.exists("/testRoot/children", false));
zk.close();
}
}
-
GetChildren 讀取數(shù)據(jù)方法:包括子節(jié)點列表的獲取和子節(jié)點數(shù)據(jù)的獲取。
- 參數(shù) 1嘀倒,path:獲取指定節(jié)點的下的數(shù)據(jù)(獲取子節(jié)點列表)
- 參數(shù) 2屈留,watcher:注冊的 watcher,一旦在本次子節(jié)點獲取后测蘑,子節(jié)點列表發(fā)生變更的話灌危,那么就會向客戶端發(fā)送通知。該參數(shù)允許為 null碳胳。(如果為null,但wath參數(shù)為true時,會去取實例化ZooKeeper()連接對象時設(shè)置的那個watcher)
- 參數(shù) 3勇蝙,wath:表明是否需要注冊一個 watcher;如果為 true固逗,則會使用到 zookeeper 客戶端上文中提到的那個默認(rèn) watcher浅蚪。如果為 false,則表明不需要注冊 Watcher烫罩。
- 參數(shù) 4惜傲,cb:回調(diào)函數(shù)。
- 參數(shù) 5贝攒,ctx:上下文信息對象盗誊。
- 參數(shù) 6,stat:指定數(shù)據(jù)節(jié)點的節(jié)點狀態(tài)信息隘弊。
注意:當(dāng)我們獲取指定節(jié)點的子節(jié)點列表后哈踱,還需要訂閱這個子節(jié)點列表的變化通知,這時候就可以通過注冊一個 watcher 來實現(xiàn)梨熙,當(dāng)子節(jié)點被添加或刪除時开镣,服務(wù)器端就會觸發(fā)一個“NodeChildrenChanged“類型的事件通知,需要注意的是服務(wù)器端發(fā)送給客戶端的事件通知中咽扇,是不包含最新的節(jié)點列表的邪财,客戶端必須主動從新進行獲取陕壹,通常在客戶端收到這個事件通知后,就可以再次主動獲取最新的子節(jié)點列表了树埠。也就是說糠馆,zookeeper 服務(wù)端在向客戶端發(fā)送 watcher"NodeChildrenChanged“事件通知的時候,僅僅只發(fā)了一個通知怎憋,不會把節(jié)點變化情況發(fā)給客戶端又碌,需要客戶端自己重新獲取,另外 Watcher 通知是一次性的绊袋,即觸發(fā)后失效毕匀,因此客戶端需要反復(fù)注冊 Watcher 才行。
-
getData 方法:獲取指定節(jié)點的數(shù)據(jù)內(nèi)容愤炸。
- 參數(shù) 1期揪,path:路徑
- 參數(shù) 2,watcher:注冊的 watcher 對象规个。一旦之后節(jié)點內(nèi)容有變更凤薛,則會像客戶端發(fā)送通知,該參數(shù)允許為 null诞仓。
- 參數(shù) 3缤苫,stat:指定節(jié)點的狀態(tài)信息。
- 參數(shù) 4墅拭,watch:是否使用 watcher活玲,如果為 true 則使用默認(rèn)上文中的 watcher, false 則不使用 watcher。
- 參數(shù) 5谍婉,cb:回調(diào)函數(shù)舒憾。
- 參數(shù) 6,ctx:用于傳遞的下文信息對象穗熬。
注意:該方法和 getChildren 方法基本相同镀迂,主要是注冊的 watcher 有所不同,客戶端在獲取一個階段數(shù)據(jù)內(nèi)容時唤蔗,是可以進行 watcher 注冊的探遵,一旦節(jié)點發(fā)生變更,則服務(wù)器端會發(fā)送給客戶端一個“NodeDataChanged“的事件通知妓柜。
-
setData 方法:修改指定節(jié)點的數(shù)據(jù)內(nèi)容箱季。
- 參數(shù) 1, path:路徑。
- 參數(shù) 2, data:數(shù)據(jù)內(nèi)容棍掐。
- 參數(shù) 3藏雏,版本號(-1 覆蓋之前所有的版本)
- 參數(shù) 4,cb:回調(diào)函數(shù)作煌。
- 參數(shù) 5诉稍,ctx:用于傳遞的下文信息對象蝠嘉。
-
exists 方法:檢測節(jié)點是否存在。
- 參數(shù) 1杯巨,path:路徑
- 參數(shù) 2,watcher:注冊的 watcher 對象努酸。一旦之后節(jié)點內(nèi)容有變更服爷,則會像客戶端發(fā)送通知,該參數(shù)允許為 null获诈。(用于三類事件監(jiān)聽:節(jié)點的創(chuàng)建仍源、刪除、更新)
- 參數(shù) 3舔涎,watch:是否使用 watcher笼踩,如果為 true 則使用默認(rèn)上文中的 watcher, false 則不使用 watcher。
- 參數(shù) 4亡嫌,cb:回調(diào)函數(shù)嚎于。
- 參數(shù) 5,ctx:用于傳遞的下文信息對象挟冠。
注意:exists 方法意義在于無論節(jié)點是否存在于购,都可以進行注冊 watcher,能夠?qū)?jié)點的創(chuàng)建知染、刪除和修改進行監(jiān)聽肋僧,但是其子節(jié)點發(fā)送各種變化,都不會通知客戶端控淡。
-
Zookeeper 有 watch 事件嫌吠,是一次性觸發(fā)的,當(dāng) watch 監(jiān)視的數(shù)據(jù)發(fā)生變化時掺炭,通知設(shè)置了該 watch 的 client辫诅,即 watcher.
-
同樣,其 watcher 是監(jiān)聽數(shù)據(jù)發(fā)送了某些變化竹伸,那就一定會有對應(yīng)的事件類型泥栖,和狀態(tài)類型。
- 事件類型: (znode 節(jié)點相關(guān)的)
- EventType. NodeCreated
- EventType. NodeDataChanged
- EventType. NodeChildrenChanged
- EventType. NodeDeleted
- 狀態(tài)類型:(是跟客戶端實例相關(guān)的)
- KeeperState. Disconnected
- KeeperState. SyncConnected
- KeeperState. AuthFailed
- KeeperState. Expired
- 事件類型: (znode 節(jié)點相關(guān)的)
-
Watcher 的特性:一次性勋篓、客戶端串行執(zhí)行吧享、輕量。
- 一次性:對于 ZK 的 watcher譬嚣,你只需要記住一~點:zookeeper 有 watch 事件钢颂,是一次性觸發(fā)的,當(dāng) watch 監(jiān)視的數(shù)據(jù)發(fā)生變化時拜银,通知設(shè)置了該 watch 的 client殊鞭,即 watcher遭垛,由于 zookeeper 的監(jiān)控都是一次性的所以每次必須設(shè)置監(jiān)控。
- 客戶端串行執(zhí)行:客戶端 Watcher 回調(diào)的過程是一個串行同步的過程操灿,這為我們保證了順序锯仪,同時需要開發(fā)人員注意一點,千萬不要因為一個 Watcher 的處理邏輯影響了整個客戶端的 Watcher 回調(diào)趾盐。
- 輕量:WatchedEvent 是 Zookeeper 整個 Watcher 通知機制的最小通知單元庶喜,整個結(jié)構(gòu)只包含三部分:通知狀態(tài)、事件類型和節(jié)點路徑救鲤。也就是說 Watcher 通知非常的簡單久窟,只會告訴客戶端發(fā)生了事件而不會告知其具體內(nèi)容,需要客戶自己去進行獲取本缠,比如 NodeDataChanged 事件斥扛,Zookeeper 只會通知客戶端指定節(jié)點的數(shù)據(jù)發(fā)生了變更,而不會直接提供具體的數(shù)據(jù)內(nèi)容丹锹。
我們通過一個示例稀颁,詳細學(xué)習(xí)下 Watcher 的概念和其目的。Watcher 示例:
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
/**
* Zookeeper Wathcher
* 本類就是一個Watcher類(實現(xiàn)了org.apache.zookeeper.Watcher類)
*/
public class ZooKeeperWatcher implements Watcher {
/** 定義原子變量 */
AtomicInteger seq = new AtomicInteger();
/** 定義session失效時間 */
private static final int SESSION_TIMEOUT = 10000;
/** zookeeper服務(wù)器地址 */
private static final String CONNECTION_ADDR = "192.168.0.160:2181";
/** zk父路徑設(shè)置 */
private static final String PARENT_PATH = "/testWatch";
/** zk子路徑設(shè)置 */
private static final String CHILDREN_PATH = "/testWatch/children";
/** 進入標(biāo)識 */
private static final String LOG_PREFIX_OF_MAIN = "【Main】";
/** zk變量 */
private ZooKeeper zk = null;
/** 信號量設(shè)置卷仑,用于等待zookeeper連接建立之后 通知阻塞程序繼續(xù)向下執(zhí)行 */
private CountDownLatch connectedSemaphore = new CountDownLatch(1);
/**
* 創(chuàng)建ZK連接
* @param connectAddr ZK服務(wù)器地址列表
* @param sessionTimeout Session超時時間
*/
public void createConnection(String connectAddr, int sessionTimeout) {
this.releaseConnection();
try {
zk = new ZooKeeper(connectAddr, sessionTimeout, this);
System.out.println(LOG_PREFIX_OF_MAIN + "開始連接ZK服務(wù)器");
connectedSemaphore.await();
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 關(guān)閉ZK連接
*/
public void releaseConnection() {
if (this.zk != null) {
try {
this.zk.close();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/**
* 創(chuàng)建節(jié)點
* @param path 節(jié)點路徑
* @param data 數(shù)據(jù)內(nèi)容
* @return
*/
public boolean createPath(String path, String data) {
try {
//設(shè)置監(jiān)控(由于zookeeper的監(jiān)控都是一次性的所以 每次必須設(shè)置監(jiān)控)
this.zk.exists(path, true);
System.out.println(LOG_PREFIX_OF_MAIN + "節(jié)點創(chuàng)建成功, Path: " +
this.zk.create( /**路徑*/
path,
/**數(shù)據(jù)*/
data.getBytes(),
/**所有可見*/
Ids.OPEN_ACL_UNSAFE,
/**永久存儲*/
CreateMode.PERSISTENT ) +
", content: " + data);
} catch (Exception e) {
e.printStackTrace();
return false;
}
return true;
}
/**
* 讀取指定節(jié)點數(shù)據(jù)內(nèi)容
* @param path 節(jié)點路徑
* @return
*/
public String readData(String path, boolean needWatch) {
try {
return new String(this.zk.getData(path, needWatch, null));
} catch (Exception e) {
e.printStackTrace();
return "";
}
}
/**
* 更新指定節(jié)點數(shù)據(jù)內(nèi)容
* @param path 節(jié)點路徑
* @param data 數(shù)據(jù)內(nèi)容
* @return
*/
public boolean writeData(String path, String data) {
try {
System.out.println(LOG_PREFIX_OF_MAIN + "更新數(shù)據(jù)成功峻村,path:" + path + ", stat: " +
this.zk.setData(path, data.getBytes(), -1));
} catch (Exception e) {
e.printStackTrace();
}
return false;
}
/**
* 刪除指定節(jié)點
*
* @param path
* 節(jié)點path
*/
public void deleteNode(String path) {
try {
this.zk.delete(path, -1);
System.out.println(LOG_PREFIX_OF_MAIN + "刪除節(jié)點成功,path:" + path);
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 判斷指定節(jié)點是否存在
* @param path 節(jié)點路徑
*/
public Stat exists(String path, boolean needWatch) {
try {
return this.zk.exists(path, needWatch);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
/**
* 獲取子節(jié)點
* @param path 節(jié)點路徑
*/
private List<String> getChildren(String path, boolean needWatch) {
try {
return this.zk.getChildren(path, needWatch);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
/**
* 刪除所有節(jié)點
*/
public void deleteAllTestPath() {
if(this.exists(CHILDREN_PATH, false) != null){
this.deleteNode(CHILDREN_PATH);
}
if(this.exists(PARENT_PATH, false) != null){
this.deleteNode(PARENT_PATH);
}
}
/**
* 收到來自Server的Watcher通知后的處理锡凝。
*/
@Override
public void process(WatchedEvent event) {
System.out.println("進入 process 粘昨。。窜锯。张肾。。event = " + event);
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (event == null) {
return;
}
// 連接狀態(tài)
KeeperState keeperState = event.getState();
// 事件類型
EventType eventType = event.getType();
// 受影響的path
String path = event.getPath();
String logPrefix = "【W(wǎng)atcher-" + this.seq.incrementAndGet() + "】";
System.out.println(logPrefix + "收到Watcher通知");
System.out.println(logPrefix + "連接狀態(tài):\t" + keeperState.toString());
System.out.println(logPrefix + "事件類型:\t" + eventType.toString());
if (KeeperState.SyncConnected == keeperState) {
// 成功連接上ZK服務(wù)器
if (EventType.None == eventType) {
System.out.println(logPrefix + "成功連接上ZK服務(wù)器");
connectedSemaphore.countDown();
}
//創(chuàng)建節(jié)點
else if (EventType.NodeCreated == eventType) {
System.out.println(logPrefix + "節(jié)點創(chuàng)建");
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
this.exists(path, true);
}
//更新節(jié)點
else if (EventType.NodeDataChanged == eventType) {
System.out.println(logPrefix + "節(jié)點數(shù)據(jù)更新");
System.out.println("我看看走不走這里........");
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(logPrefix + "數(shù)據(jù)內(nèi)容: " + this.readData(PARENT_PATH, true));
}
//更新子節(jié)點
else if (EventType.NodeChildrenChanged == eventType) {
System.out.println(logPrefix + "子節(jié)點變更");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(logPrefix + "子節(jié)點列表:" + this.getChildren(PARENT_PATH, true));
}
//刪除節(jié)點
else if (EventType.NodeDeleted == eventType) {
System.out.println(logPrefix + "節(jié)點 " + path + " 被刪除");
}
else ;
}
else if (KeeperState.Disconnected == keeperState) {
System.out.println(logPrefix + "與ZK服務(wù)器斷開連接");
}
else if (KeeperState.AuthFailed == keeperState) {
System.out.println(logPrefix + "權(quán)限檢查失敗");
}
else if (KeeperState.Expired == keeperState) {
System.out.println(logPrefix + "會話失效");
}
else ;
System.out.println("--------------------------------------------");
}
/**
* <B>方法名稱:</B>測試zookeeper監(jiān)控<BR>
* <B>概要說明:</B>主要測試watch功能<BR>
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
//建立watcher
ZooKeeperWatcher zkWatch = new ZooKeeperWatcher();
//創(chuàng)建連接
zkWatch.createConnection(CONNECTION_ADDR, SESSION_TIMEOUT);
//System.out.println(zkWatch.zk.toString());
Thread.sleep(1000);
// 清理節(jié)點
zkWatch.deleteAllTestPath();
if (zkWatch.createPath(PARENT_PATH, System.currentTimeMillis() + "")) {
Thread.sleep(1000);
// 讀取數(shù)據(jù)
System.out.println("---------------------- read parent ----------------------------");
//zkWatch.readData(PARENT_PATH, true);
// 讀取子節(jié)點
System.out.println("---------------------- read children path ----------------------------");
zkWatch.getChildren(PARENT_PATH, true);
// 更新數(shù)據(jù)
zkWatch.writeData(PARENT_PATH, System.currentTimeMillis() + "");
Thread.sleep(1000);
// 創(chuàng)建子節(jié)點
zkWatch.createPath(CHILDREN_PATH, System.currentTimeMillis() + "");
Thread.sleep(1000);
zkWatch.writeData(CHILDREN_PATH, System.currentTimeMillis() + "");
}
Thread.sleep(50000);
// 清理節(jié)點
zkWatch.deleteAllTestPath();
Thread.sleep(1000);
zkWatch.releaseConnection();
}
}
我們再通過一個示例锚扎,模擬分布式情況下修改節(jié)點數(shù)據(jù)時,多個服務(wù)監(jiān)聽此節(jié)點,示例:
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
public class ZKWatcher implements Watcher {
/** zk變量 */
private ZooKeeper zk = null;
/** 父節(jié)點path */
static final String PARENT_PATH = "/super";
/** 信號量設(shè)置吞瞪,用于等待zookeeper連接建立之后 通知阻塞程序繼續(xù)向下執(zhí)行 */
private CountDownLatch connectedSemaphore = new CountDownLatch(1);
private List<String> cowaList = new CopyOnWriteArrayList<String>();
/** zookeeper服務(wù)器地址 */
public static final String CONNECTION_ADDR = "192.168.0.160:2181,192.168.0.161:2181,192.168.0.162:2181";
/** 定義session失效時間 */
public static final int SESSION_TIMEOUT = 30000;
public ZKWatcher() throws Exception{
zk = new ZooKeeper(CONNECTION_ADDR, SESSION_TIMEOUT, this);
System.out.println("開始連接ZK服務(wù)器");
connectedSemaphore.await();
}
@Override
public void process(WatchedEvent event) {
// 連接狀態(tài)
KeeperState keeperState = event.getState();
// 事件類型
EventType eventType = event.getType();
// 受影響的path
String path = event.getPath();
System.out.println("受影響的path : " + path);
if (KeeperState.SyncConnected == keeperState) {
// 成功連接上ZK服務(wù)器
if (EventType.None == eventType) {
System.out.println("成功連接上ZK服務(wù)器");
connectedSemaphore.countDown();
try {
if(this.zk.exists(PARENT_PATH, false) == null){
this.zk.create(PARENT_PATH, "root".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
List<String> paths = this.zk.getChildren(PARENT_PATH, true);
for (String p : paths) {
System.out.println(p);
this.zk.exists(PARENT_PATH + "/" + p, true);
}
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
}
//創(chuàng)建節(jié)點
else if (EventType.NodeCreated == eventType) {
System.out.println("節(jié)點創(chuàng)建");
try {
this.zk.exists(path, true);
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
}
//更新節(jié)點
else if (EventType.NodeDataChanged == eventType) {
System.out.println("節(jié)點數(shù)據(jù)更新");
try {
//update nodes call function
this.zk.exists(path, true);
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
}
//更新子節(jié)點
else if (EventType.NodeChildrenChanged == eventType) {
System.out.println("子節(jié)點 ... 變更");
try {
List<String> paths = this.zk.getChildren(path, true);
if(paths.size() >= cowaList.size()){
paths.removeAll(cowaList);
for(String p : paths){
this.zk.exists(path + "/" + p, true);
//this.zk.getChildren(path + "/" + p, true);
System.out.println("這個是新增的子節(jié)點 : " + path + "/" + p);
//add new nodes call function
}
cowaList.addAll(paths);
} else {
cowaList = paths;
}
System.out.println("cowaList: " + cowaList.toString());
System.out.println("paths: " + paths.toString());
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
}
//刪除節(jié)點
else if (EventType.NodeDeleted == eventType) {
System.out.println("節(jié)點 " + path + " 被刪除");
try {
//delete nodes call function
this.zk.exists(path, true);
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
}
else ;
}
else if (KeeperState.Disconnected == keeperState) {
System.out.println("與ZK服務(wù)器斷開連接");
}
else if (KeeperState.AuthFailed == keeperState) {
System.out.println("權(quán)限檢查失敗");
}
else if (KeeperState.Expired == keeperState) {
System.out.println("會話失效");
}
else ;
System.out.println("--------------------------------------------");
}
}
-
實例兩個上面ZKWatcher對象,ZKWatcher其實就是簡單了實現(xiàn)了監(jiān)聽了/super數(shù)據(jù)節(jié)點,你可以把這想象成兩個不同的服務(wù)
public class Client1 {
public static void main(String[] args) throws Exception{
ZKWatcher myWatcher = new ZKWatcher();
Thread.sleep(100000000);
}
}
public class Client2 {
public static void main(String[] args) throws Exception{
ZKWatcher myWatcher = new ZKWatcher();
Thread.sleep(100000000);
}
}
-
通過下面這個小測試,測試對/super數(shù)據(jù)節(jié)點不同的操作,可以看到上面兩個小程序會分別打印對應(yīng)的信息
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
public class Test {
/** zookeeper地址 */
static final String CONNECT_ADDR = "192.168.1.106:2181,192.168.1.107:2181,192.168.1.108:2181";
/** session超時時間 */
static final int SESSION_OUTTIME = 2000;//ms
/** 信號量,阻塞程序執(zhí)行驾孔,用于等待zookeeper連接成功芍秆,發(fā)送成功信號 */
static final CountDownLatch connectedSemaphore = new CountDownLatch(1);
public static void main(String[] args) throws Exception{
ZooKeeper zk = new ZooKeeper(CONNECT_ADDR, SESSION_OUTTIME, new Watcher(){
@Override
public void process(WatchedEvent event) {
//獲取事件的狀態(tài)
KeeperState keeperState = event.getState();
EventType eventType = event.getType();
//如果是建立連接
if(KeeperState.SyncConnected == keeperState){
if(EventType.None == eventType){
//如果建立連接成功,則發(fā)送信號量翠勉,讓后續(xù)阻塞程序向下執(zhí)行
connectedSemaphore.countDown();
System.out.println("zk 建立連接");
}
}
}
});
//進行阻塞
connectedSemaphore.await();
// //創(chuàng)建子節(jié)點
// zk.create("/super/c1", "c1".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
//創(chuàng)建子節(jié)點
// zk.create("/super/c2", "c2".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
//創(chuàng)建子節(jié)點
zk.create("/super/c3", "c3".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
//創(chuàng)建子節(jié)點
// zk.create("/super/c4", "c4".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
// zk.create("/super/c4/c44", "c44".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
//獲取節(jié)點信息
// byte[] data = zk.getData("/testRoot", false, null);
// System.out.println(new String(data));
// System.out.println(zk.getChildren("/testRoot", false));
//修改節(jié)點的值
// zk.setData("/super/c1", "modify c1".getBytes(), -1);
// zk.setData("/super/c2", "modify c2".getBytes(), -1);
// byte[] data = zk.getData("/super/c2", false, null);
// System.out.println(new String(data));
// //判斷節(jié)點是否存在
// System.out.println(zk.exists("/super/c3", false));
// //刪除節(jié)點
// zk.delete("/super/c3", -1);
zk.close();
}
}
-
Zookeeperd的ACL
-
ACL (Access ControlList)妖啥,Zookeeper 作為一個分布式協(xié)調(diào)框架,其內(nèi)部存儲的都是一些關(guān)乎分布式系統(tǒng)運行時狀態(tài)的元數(shù)據(jù)对碌,尤其是設(shè)計到一些分布式鎖荆虱、Master 選舉和協(xié)調(diào)等應(yīng)用場景。我們需要有效地保障 zookeeper 中的數(shù)據(jù)安全,Zookeeper 提供一套完善的 ACL 權(quán)限控制機制來保障數(shù)據(jù)的安全怀读。ZK 提供了三種模式诉位。權(quán)限模式、授權(quán)對象菜枷、權(quán)限苍糠。
- 權(quán)限模式:Scheme,開發(fā)人員最多使用的如下四種權(quán)限模式:
- IP: ip 模式通過 ip 地址粒度來進行控制權(quán)限犁跪,例如配置了:ip:192.168.1.107 即表示權(quán)限控制都是針對這個 ip 地址的店茶,同時也支持按網(wǎng)段分配瑰煎,比如 192.168.1. *
- Digest: digest 是最常用的權(quán)限控制模式,也更符合我們對權(quán)限控制的認(rèn)識苫亦,其類似于_ "usermame: password“形式的權(quán)限標(biāo)識進行權(quán)限配置条舔。ZK 會對形成的權(quán)限標(biāo)識先后進行倆次編碼處理枫耳,分別是 SHA-1 加密算法、BASE64 編碼孟抗。
- World: World 是一直最開放的權(quán)限控制模式迁杨。這種模式可以看做為特殊的 Digest,他僅僅是一個標(biāo)識而已凄硼。
- Super:超級用戶模式铅协,在超級用戶模式下可以對 ZK 任意進行操作。
- 權(quán)限模式:Scheme,開發(fā)人員最多使用的如下四種權(quán)限模式:
授權(quán)對象:指的是權(quán)限賦予的用戶或者一個指定的實體摊沉,例如 ip 地址或機器等狐史。在不同的模式下,授權(quán)對象是不同的说墨。這種模式和權(quán)限對象一一對應(yīng)骏全。
-
權(quán)限:權(quán)限就是指那些通過權(quán)限檢測后可以被允許執(zhí)行的操作,在 ZK 中尼斧,對數(shù)據(jù)的操作權(quán)限分為頭下五大類:
- CREATE姜贡、DELETE、READ棺棵、WRITE楼咳、ADMIN
-
我們通過一個示例,詳細學(xué)習(xí)下 Auth 的概念和其目的烛恤。Auth 示例: [ZooKeeperAuth]
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
/**
* Zookeeper 節(jié)點授權(quán)
*/
public class ZookeeperAuth implements Watcher {
/** 連接地址 */
final static String CONNECT_ADDR = "192.168.0.160:2181";
/** 測試路徑 */
final static String PATH = "/testAuth";
final static String PATH_DEL = "/testAuth/delNode";
/** 認(rèn)證類型 */
final static String authentication_type = "digest";
/** 認(rèn)證正確方法 */
final static String correctAuthentication = "123456";
/** 認(rèn)證錯誤方法 */
final static String badAuthentication = "654321";
static ZooKeeper zk = null;
/** 計時器 */
AtomicInteger seq = new AtomicInteger();
/** 標(biāo)識 */
private static final String LOG_PREFIX_OF_MAIN = "【Main】";
private CountDownLatch connectedSemaphore = new CountDownLatch(1);
@Override
public void process(WatchedEvent event) {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (event==null) {
return;
}
// 連接狀態(tài)
KeeperState keeperState = event.getState();
// 事件類型
EventType eventType = event.getType();
// 受影響的path
String path = event.getPath();
String logPrefix = "【W(wǎng)atcher-" + this.seq.incrementAndGet() + "】";
System.out.println(logPrefix + "收到Watcher通知");
System.out.println(logPrefix + "連接狀態(tài):\t" + keeperState.toString());
System.out.println(logPrefix + "事件類型:\t" + eventType.toString());
if (KeeperState.SyncConnected == keeperState) {
// 成功連接上ZK服務(wù)器
if (EventType.None == eventType) {
System.out.println(logPrefix + "成功連接上ZK服務(wù)器");
connectedSemaphore.countDown();
}
} else if (KeeperState.Disconnected == keeperState) {
System.out.println(logPrefix + "與ZK服務(wù)器斷開連接");
} else if (KeeperState.AuthFailed == keeperState) {
System.out.println(logPrefix + "權(quán)限檢查失敗");
} else if (KeeperState.Expired == keeperState) {
System.out.println(logPrefix + "會話失效");
}
System.out.println("--------------------------------------------");
}
/**
* 創(chuàng)建ZK連接
*
* @param connectString
* ZK服務(wù)器地址列表
* @param sessionTimeout
* Session超時時間
*/
public void createConnection(String connectString, int sessionTimeout) {
this.releaseConnection();
try {
zk = new ZooKeeper(connectString, sessionTimeout, this);
//添加節(jié)點授權(quán)
zk.addAuthInfo(authentication_type,correctAuthentication.getBytes());
System.out.println(LOG_PREFIX_OF_MAIN + "開始連接ZK服務(wù)器");
//倒數(shù)等待
connectedSemaphore.await();
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 關(guān)閉ZK連接
*/
public void releaseConnection() {
if (this.zk!=null) {
try {
this.zk.close();
} catch (InterruptedException e) {
}
}
}
/**
*
* <B>方法名稱:</B>測試函數(shù)<BR>
* <B>概要說明:</B>測試認(rèn)證<BR>
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
ZookeeperAuth testAuth = new ZookeeperAuth();
testAuth.createConnection(CONNECT_ADDR,2000);
List<ACL> acls = new ArrayList<ACL>(1);
for (ACL ids_acl : Ids.CREATOR_ALL_ACL) {
acls.add(ids_acl);
}
try {
zk.create(PATH, "init content".getBytes(), acls, CreateMode.PERSISTENT);
System.out.println("使用授權(quán)key:" + correctAuthentication + "創(chuàng)建節(jié)點:"+ PATH + ", 初始內(nèi)容是: init content");
} catch (Exception e) {
e.printStackTrace();
}
try {
zk.create(PATH_DEL, "will be deleted! ".getBytes(), acls, CreateMode.PERSISTENT);
System.out.println("使用授權(quán)key:" + correctAuthentication + "創(chuàng)建節(jié)點:"+ PATH_DEL + ", 初始內(nèi)容是: init content");
} catch (Exception e) {
e.printStackTrace();
}
// 獲取數(shù)據(jù)
getDataByNoAuthentication();
getDataByBadAuthentication();
getDataByCorrectAuthentication();
// 更新數(shù)據(jù)
updateDataByNoAuthentication();
updateDataByBadAuthentication();
updateDataByCorrectAuthentication();
// 刪除數(shù)據(jù)
deleteNodeByBadAuthentication();
deleteNodeByNoAuthentication();
deleteNodeByCorrectAuthentication();
//
Thread.sleep(1000);
deleteParent();
//釋放連接
testAuth.releaseConnection();
}
/** 獲取數(shù)據(jù):采用錯誤的密碼 */
static void getDataByBadAuthentication() {
String prefix = "[使用錯誤的授權(quán)信息]";
try {
ZooKeeper badzk = new ZooKeeper(CONNECT_ADDR, 2000, null);
//授權(quán)
badzk.addAuthInfo(authentication_type,badAuthentication.getBytes());
Thread.sleep(2000);
System.out.println(prefix + "獲取數(shù)據(jù):" + PATH);
System.out.println(prefix + "成功獲取數(shù)據(jù):" + badzk.getData(PATH, false, null));
} catch (Exception e) {
System.err.println(prefix + "獲取數(shù)據(jù)失敗母怜,原因:" + e.getMessage());
}
}
/** 獲取數(shù)據(jù):不采用密碼 */
static void getDataByNoAuthentication() {
String prefix = "[不使用任何授權(quán)信息]";
try {
System.out.println(prefix + "獲取數(shù)據(jù):" + PATH);
ZooKeeper nozk = new ZooKeeper(CONNECT_ADDR, 2000, null);
Thread.sleep(2000);
System.out.println(prefix + "成功獲取數(shù)據(jù):" + nozk.getData(PATH, false, null));
} catch (Exception e) {
System.err.println(prefix + "獲取數(shù)據(jù)失敗,原因:" + e.getMessage());
}
}
/** 采用正確的密碼 */
static void getDataByCorrectAuthentication() {
String prefix = "[使用正確的授權(quán)信息]";
try {
System.out.println(prefix + "獲取數(shù)據(jù):" + PATH);
System.out.println(prefix + "成功獲取數(shù)據(jù):" + zk.getData(PATH, false, null));
} catch (Exception e) {
System.out.println(prefix + "獲取數(shù)據(jù)失敗棒动,原因:" + e.getMessage());
}
}
/**
* 更新數(shù)據(jù):不采用密碼
*/
static void updateDataByNoAuthentication() {
String prefix = "[不使用任何授權(quán)信息]";
System.out.println(prefix + "更新數(shù)據(jù): " + PATH);
try {
ZooKeeper nozk = new ZooKeeper(CONNECT_ADDR, 2000, null);
Thread.sleep(2000);
Stat stat = nozk.exists(PATH, false);
if (stat!=null) {
nozk.setData(PATH, prefix.getBytes(), -1);
System.out.println(prefix + "更新成功");
}
} catch (Exception e) {
System.err.println(prefix + "更新失敗糙申,原因是:" + e.getMessage());
}
}
/**
* 更新數(shù)據(jù):采用錯誤的密碼
*/
static void updateDataByBadAuthentication() {
String prefix = "[使用錯誤的授權(quán)信息]";
System.out.println(prefix + "更新數(shù)據(jù):" + PATH);
try {
ZooKeeper badzk = new ZooKeeper(CONNECT_ADDR, 2000, null);
//授權(quán)
badzk.addAuthInfo(authentication_type,badAuthentication.getBytes());
Thread.sleep(2000);
Stat stat = badzk.exists(PATH, false);
if (stat!=null) {
badzk.setData(PATH, prefix.getBytes(), -1);
System.out.println(prefix + "更新成功");
}
} catch (Exception e) {
System.err.println(prefix + "更新失敗,原因是:" + e.getMessage());
}
}
/**
* 更新數(shù)據(jù):采用正確的密碼
*/
static void updateDataByCorrectAuthentication() {
String prefix = "[使用正確的授權(quán)信息]";
System.out.println(prefix + "更新數(shù)據(jù):" + PATH);
try {
Stat stat = zk.exists(PATH, false);
if (stat!=null) {
zk.setData(PATH, prefix.getBytes(), -1);
System.out.println(prefix + "更新成功");
}
} catch (Exception e) {
System.err.println(prefix + "更新失敗,原因是:" + e.getMessage());
}
}
/**
* 不使用密碼 刪除節(jié)點
*/
static void deleteNodeByNoAuthentication() throws Exception {
String prefix = "[不使用任何授權(quán)信息]";
try {
System.out.println(prefix + "刪除節(jié)點:" + PATH_DEL);
ZooKeeper nozk = new ZooKeeper(CONNECT_ADDR, 2000, null);
Thread.sleep(2000);
Stat stat = nozk.exists(PATH_DEL, false);
if (stat!=null) {
nozk.delete(PATH_DEL,-1);
System.out.println(prefix + "刪除成功");
}
} catch (Exception e) {
System.err.println(prefix + "刪除失敗柜裸,原因是:" + e.getMessage());
}
}
/**
* 采用錯誤的密碼刪除節(jié)點
*/
static void deleteNodeByBadAuthentication() throws Exception {
String prefix = "[使用錯誤的授權(quán)信息]";
try {
System.out.println(prefix + "刪除節(jié)點:" + PATH_DEL);
ZooKeeper badzk = new ZooKeeper(CONNECT_ADDR, 2000, null);
//授權(quán)
badzk.addAuthInfo(authentication_type,badAuthentication.getBytes());
Thread.sleep(2000);
Stat stat = badzk.exists(PATH_DEL, false);
if (stat!=null) {
badzk.delete(PATH_DEL, -1);
System.out.println(prefix + "刪除成功");
}
} catch (Exception e) {
System.err.println(prefix + "刪除失敗缕陕,原因是:" + e.getMessage());
}
}
/**
* 使用正確的密碼刪除節(jié)點
*/
static void deleteNodeByCorrectAuthentication() throws Exception {
String prefix = "[使用正確的授權(quán)信息]";
try {
System.out.println(prefix + "刪除節(jié)點:" + PATH_DEL);
Stat stat = zk.exists(PATH_DEL, false);
if (stat!=null) {
zk.delete(PATH_DEL, -1);
System.out.println(prefix + "刪除成功");
}
} catch (Exception e) {
System.out.println(prefix + "刪除失敗,原因是:" + e.getMessage());
}
}
/**
* 使用正確的密碼刪除節(jié)點
*/
static void deleteParent() throws Exception {
try {
Stat stat = zk.exists(PATH_DEL, false);
if (stat == null) {
zk.delete(PATH, -1);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}