zk 題外話:paxos 與 raft 理解
paxos算法就是為了解決分布式的一致性問(wèn)題的
最大的用途就是保持多個(gè)節(jié)點(diǎn)數(shù)據(jù)一致
由于leader的地位不清晰舶沛,就有了投票選舉leader的操作,那個(gè)leader發(fā)出去峰鄙,再收回來(lái)被認(rèn)可的票數(shù)多灿椅,最終提案就會(huì)被統(tǒng)一raft協(xié)議比paxos的優(yōu)點(diǎn)是 容易理解套蒂,容易實(shí)現(xiàn)。它強(qiáng)化了leader的地位茫蛹,把整個(gè)協(xié)議可以清楚的分割成兩個(gè)部分操刀,并利用日志的連續(xù)性做了一些簡(jiǎn)化:
- leader在時(shí)。由leader向follower同步日志
- leader掛掉了婴洼,選一個(gè)新leader骨坑,leader選舉算法。
zookeeper是一個(gè)分布式協(xié)調(diào)框架
一 理論
- ZNode 節(jié)點(diǎn)狀態(tài):(前兩個(gè)和單獨(dú)柬采,也可和順序性節(jié)點(diǎn)組合)
持久性節(jié)點(diǎn): Persistent
臨時(shí)性節(jié)點(diǎn):Ephemeral
順序性節(jié)點(diǎn):Sequential
- 事務(wù)ID
首先欢唾,事務(wù)是對(duì)物理和抽象的應(yīng)用狀態(tài)上的操作集合。一般事務(wù)通常會(huì)想到數(shù)據(jù)庫(kù)的事務(wù)操作粉捻,包括ACID特性礁遣,即:原子性Atomic,一致性Consistency肩刃,隔離性Isolation祟霍,和持久性Durability
而在 zookeeper 中:
事務(wù)是指能夠改變 Zookeeper 服務(wù)器狀態(tài)的操作杏头,也稱之為事務(wù)操作或更新操作
一般包含數(shù)據(jù)節(jié)點(diǎn) ZNode 的crud操作
-
每個(gè)事務(wù)請(qǐng)求,zk都會(huì)分配一個(gè)全局唯一的事務(wù)ID浅碾,即 ZXID 大州,通常是一個(gè)64位的數(shù)字
(每一次更新操作都對(duì)應(yīng)一個(gè) ZXID,間接突出zk處理更新操作的全局執(zhí)行順序)
- Watcher 數(shù)據(jù)更新變更通知 (監(jiān)聽(tīng))
zk 使用 Watcher 機(jī)制實(shí)現(xiàn)發(fā)布式數(shù)據(jù)的發(fā)布垂谢、訂閱功能厦画;
訂閱關(guān)系為一對(duì)多,可以讓多個(gè)訂閱者同時(shí)監(jiān)聽(tīng)同一個(gè)主題對(duì)象滥朱。
當(dāng)主題對(duì)象發(fā)行變化時(shí)根暑,會(huì)通知所有訂閱者,讓所有訂閱者做出相應(yīng)處理徙邻。zk引用watcher機(jī)制來(lái)實(shí)現(xiàn)這種分布式通知功能排嫌。
zk允許客戶端向服務(wù)端注冊(cè)一個(gè)watcher監(jiān)聽(tīng),當(dāng)服務(wù)端的一些指定事件觸發(fā)了這個(gè)watcher缰犁,那么這個(gè)watcher就會(huì)向指定客戶端發(fā)送事件通知來(lái)實(shí)現(xiàn)分布式的通知功能淳地;
下圖展示了watch的三部分 與 這三部分的工作流程執(zhí)行邏輯
- ACL 保障數(shù)據(jù)的安全
它是保障風(fēng)不是系統(tǒng)運(yùn)行狀態(tài)的數(shù)據(jù)安全的,避免因誤操作帶來(lái)的數(shù)據(jù)變更而導(dǎo)致數(shù)據(jù)異常
ACL ( Access Control List ) :
? 權(quán)限模式 Scheme帅容, 授權(quán)對(duì)象 ID颇象,權(quán)限 Permission,一般使用 "scheme:id:permission" 來(lái)標(biāo)識(shí)一個(gè)有效的ACL信息并徘。
- zk的crud
5.1 create 創(chuàng)建
./zkCli.sh 連接本地的zk服務(wù)器
./zkCli.sh -server ip:port 連接指定的服務(wù)器
create /zk-test1 content123 創(chuàng)建一個(gè)永久節(jié)點(diǎn)
create -s /zk-test2 content123 創(chuàng)建一個(gè)順序(-s)節(jié)點(diǎn)
create -e /zk-test1 content123 創(chuàng)建一個(gè)臨時(shí)節(jié)點(diǎn),會(huì)話異城睬或關(guān)閉節(jié)點(diǎn)就會(huì)被刪除
5.2 讀取節(jié)點(diǎn)
ls path 顯示path目錄下面的節(jié)點(diǎn),跟Linux的一致麦乞,列這個(gè)指定目錄的節(jié)點(diǎn)
get path 或者對(duì)應(yīng)目錄節(jié)點(diǎn)的信息蕴茴,可以展示內(nèi)容什么的
5.3 更新
set path data [version] path是節(jié)點(diǎn)連接和名稱,data是節(jié)點(diǎn)的內(nèi)容,version可以給這個(gè)節(jié)點(diǎn)指定版本
5.4 刪除
delete path [version] 刪除某個(gè)節(jié)點(diǎn)姐直,版本可以自由指定倦淀,也可以不帶,執(zhí)行了简肴,再ls就看不到這個(gè)節(jié)點(diǎn)了
單一刪除有限制晃听,必須先刪除子節(jié)點(diǎn),再刪除父節(jié)點(diǎn)
但是也有父子檢測(cè)輪循刪除的機(jī)制:代碼中調(diào)用有對(duì)應(yīng)的支持方法()
二 Code API
api的使用:
導(dǎo)入jar支持
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.14</version>
</dependency>
- 建立會(huì)話:
public class CreateSession implements Watcher {
//countDownLatch這個(gè)類使?個(gè)線程等待,主要不讓main?法結(jié)束
private static CountDownLatch countDownLatch = new CountDownLatch(1);
public static void main(String[] args) throws InterruptedException,
IOException {
/*
客戶端可以通過(guò)創(chuàng)建?個(gè)zk實(shí)例來(lái)連接zk服務(wù)器
new Zookeeper(connectString,sesssionTimeOut,Wather)
connectString: 連接地址:IP:端?
sesssionTimeOut:會(huì)話超時(shí)時(shí)間:?jiǎn)挝缓撩? Wather:監(jiān)聽(tīng)器(當(dāng)特定事件觸發(fā)監(jiān)聽(tīng)時(shí)砰识,zk會(huì)通過(guò)watcher通知到客戶端)
*/
ZooKeeper zooKeeper = new ZooKeeper("10.211.55.4:2181", 5000, new
CreateSession());
System.out.println(zooKeeper.getState());
countDownLatch.await();
//表示會(huì)話真正建?
System.out.println("=========Client Connected to
zookeeper == ========");
}
// 當(dāng)前類實(shí)現(xiàn)了Watcher接?,重寫(xiě)了process?法佣渴,
// 該?法負(fù)責(zé)處理來(lái)?Zookeeper服務(wù)端的watcher通知辫狼,在收到服務(wù)端發(fā)送過(guò)來(lái)的SyncConnected事件之后,
// 解除主程序在CountDownLatch上的等待阻塞辛润,?此膨处,會(huì)話創(chuàng)建完畢
public void process(WatchedEvent watchedEvent) {
//當(dāng)連接創(chuàng)建了,服務(wù)端發(fā)送給客戶端SyncConnected事件
if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
countDownLatch.countDown();
}
}
}
- 創(chuàng)建節(jié)點(diǎn)
public class CreateNote implements Watcher {
//countDownLatch這個(gè)類使?個(gè)線程等待,主要不讓main?法結(jié)束
private static CountDownLatch countDownLatch = new CountDownLatch(1);
private static ZooKeeper zooKeeper;
public static void main(String[] args) throws Exception {
zooKeeper = new ZooKeeper("10.211.55.4:2181", 5000, new CreateNote());
countDownLatch.await();
}
public void process(WatchedEvent watchedEvent) {
//當(dāng)連接創(chuàng)建了,服務(wù)端發(fā)送給客戶端SyncConnected事件
if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
countDownLatch.countDown();
}
//調(diào)?創(chuàng)建節(jié)點(diǎn)?法
try {
createNodeSync();
} catch (Exception e) {
e.printStackTrace();
}
}
private void createNodeSync() throws Exception {
/**
* path :節(jié)點(diǎn)創(chuàng)建的路徑
* data[] :節(jié)點(diǎn)創(chuàng)建要保存的數(shù)據(jù)真椿,是個(gè)byte類型的
* acl :節(jié)點(diǎn)創(chuàng)建的權(quán)限信息(4種類型)
* ANYONE_ID_UNSAFE : 表示任何?
* AUTH_IDS :此ID僅可?于設(shè)置ACL鹃答。它將被客戶機(jī)驗(yàn)證的ID替
換。
* OPEN_ACL_UNSAFE :這是?個(gè)完全開(kāi)放的ACL(常?)-->
world:anyone
* CREATOR_ALL_ACL :此ACL授予創(chuàng)建者身份驗(yàn)證ID的所有權(quán)限
* createMode :創(chuàng)建節(jié)點(diǎn)的類型(4種類型)
* PERSISTENT:持久節(jié)點(diǎn)
* PERSISTENT_SEQUENTIAL:持久順序節(jié)點(diǎn)
* EPHEMERAL:臨時(shí)節(jié)點(diǎn)
* EPHEMERAL_SEQUENTIAL:臨時(shí)順序節(jié)點(diǎn)
String node = zookeeper.create(path,data,acl,createMode);
*/
String node_PERSISTENT = zooKeeper.create("/lg_persistent", "持久節(jié)點(diǎn)內(nèi)容".getBytes(" utf - 8"), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
String node_PERSISTENT_SEQUENTIAL =
zooKeeper.create("/lg_persistent_sequential", "持久節(jié)點(diǎn)內(nèi)容".getBytes("utf-8"),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
String node_EPERSISTENT = zooKeeper.create("/lg_ephemeral", "臨時(shí)節(jié)點(diǎn)內(nèi)容".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
System.out.println("創(chuàng)建的持久節(jié)點(diǎn)是:" + node_PERSISTENT);
System.out.println("創(chuàng)建的持久順序節(jié)點(diǎn)是:" + node_PERSISTENT_SEQUENTIAL);
System.out.println("創(chuàng)建的臨時(shí)節(jié)點(diǎn)是:" + node_EPERSISTENT);
}
}
- 獲取節(jié)點(diǎn)數(shù)據(jù)
public class GetNoteData implements Watcher {
//countDownLatch這個(gè)類使?個(gè)線程等待,主要不讓main?法結(jié)束
private static CountDownLatch countDownLatch = new CountDownLatch(1);
private static ZooKeeper zooKeeper;
public static void main(String[] args) throws Exception {
zooKeeper = new ZooKeeper("10.211.55.4:2181", 10000, new
GetNoteDate());
Thread.sleep(Integer.MAX_VALUE);
}
public void process(WatchedEvent watchedEvent) {
//?節(jié)點(diǎn)列表發(fā)?變化時(shí)突硝,服務(wù)器會(huì)發(fā)出NodeChildrenChanged通知测摔,但不會(huì)把變化情況告
訴給客戶端
// 需要客戶端??獲取,且通知是?次性的解恰,需反復(fù)注冊(cè)監(jiān)聽(tīng)
if (watchedEvent.getType() == Event.EventType.NodeChildrenChanged) {
//再次獲取節(jié)點(diǎn)數(shù)據(jù)
try {
List<String> children =
zooKeeper.getChildren(watchedEvent.getPath(), true);
System.out.println(children);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//當(dāng)連接創(chuàng)建了锋八,服務(wù)端發(fā)送給客戶端SyncConnected事件
if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
try {
//調(diào)?獲取單個(gè)節(jié)點(diǎn)數(shù)據(jù)?法
getNoteDate();
getChildrens();
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
private static void getNoteData() throws Exception {
/**
path : 獲取數(shù)據(jù)的路徑
watch : 是否開(kāi)啟監(jiān)聽(tīng)
stat : 節(jié)點(diǎn)狀態(tài)信息
null: 表示獲取最新版本的數(shù)據(jù)
zk.getData(path, watch, stat);
*/
byte[] data = zooKeeper.getData("/lg_persistent/lg-children", true, null);
System.out.println(new String(data, "utf-8"));
}
private static void getChildrens() throws KeeperException, InterruptedException {
/*
path:路徑
watch:是否要啟動(dòng)監(jiān)聽(tīng),當(dāng)?節(jié)點(diǎn)列表發(fā)?變化护盈,會(huì)觸發(fā)監(jiān)聽(tīng)
zooKeeper.getChildren(path, watch);
*/
List<String> children = zooKeeper.getChildren("/lg_persistent", true);
System.out.println(children);
}
}
- 修改節(jié)點(diǎn)數(shù)據(jù)
public class updateNote implements Watcher {
private static ZooKeeper zooKeeper;
public static void main(String[] args) throws Exception {
zooKeeper = new ZooKeeper("10.211.55.4:2181", 5000, new updateNote());
Thread.sleep(Integer.MAX_VALUE);
}
public void process(WatchedEvent watchedEvent) {
//當(dāng)連接創(chuàng)建了挟纱,服務(wù)端發(fā)送給客戶端SyncConnected事件
try {
updateNodeSync();
} catch (Exception e) {
e.printStackTrace();
}
}
private void updateNodeSync() throws Exception {
/*
path:路徑
data:要修改的內(nèi)容 byte[]
version:為-1,表示對(duì)最新版本的數(shù)據(jù)進(jìn)?修改
zooKeeper.setData(path, data,version);
*/
byte[] data = zooKeeper.getData("/lg_persistent", false, null);
System.out.println("修改前的值:" + new String(data));
//修改 stat:狀態(tài)信息對(duì)象 -1:最新版本
Stat stat = zooKeeper.setData("/lg_persistent", "客戶端修改內(nèi)容".getBytes(), -1);
byte[] data2 = zooKeeper.getData("/lg_persistent", false, null);
System.out.println("修改后的值:" + new String(data2));
}
}
- 刪除節(jié)點(diǎn)
public class DeleteNote implements Watcher {
private static ZooKeeper zooKeeper;
public static void main(String[] args) throws Exception {
zooKeeper = new ZooKeeper("10.211.55.4:2181", 5000, new DeleteNote());
Thread.sleep(Integer.MAX_VALUE);
}
public void process(WatchedEvent watchedEvent) {
//當(dāng)連接創(chuàng)建了腐宋,服務(wù)端發(fā)送給客戶端SyncConnected事件
try {
deleteNodeSync();
} catch (Exception e) {
e.printStackTrace();
}
}
private void deleteNodeSync() throws KeeperException, InterruptedException {
/*
zooKeeper.exists(path,watch) :判斷節(jié)點(diǎn)是否存在
zookeeper.delete(path,version) : 刪除節(jié)點(diǎn)
*/
Stat exists = zooKeeper.exists("/lg_persistent/lg-children", false);
System.out.println(exists == null ? "該節(jié)點(diǎn)不存在" : "該節(jié)點(diǎn)存在");
zooKeeper.delete("/lg_persistent/lg-children", -1);
Stat exists2 = zooKeeper.exists("/lg_persistent/lg-children", false);
System.out.println(exists2 == null ? "該節(jié)點(diǎn)不存在" : "該節(jié)點(diǎn)存在");
}
}
三 Code Client
client 的使用:
導(dǎo)入jar支持
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.2</version>
</dependency>
- 建立會(huì)話:
import java.io.IOException;
import org.I0Itec.zkclient.ZkClient;
public class CreateSession {
/*
創(chuàng)建?個(gè)zkClient實(shí)例來(lái)進(jìn)?連接
注意:zkClient通過(guò)對(duì)zookeeperAPI內(nèi)部包裝紊服,將這個(gè)異步的會(huì)話創(chuàng)建過(guò)程同步化了
*/
public static void main(String[] args) {
ZkClient zkClient = new ZkClient("127.0.0.1:2181");
System.out.println("ZooKeeper session established.");
}
}
- 創(chuàng)建節(jié)點(diǎn)
import org.I0Itec.zkclient.ZkClient;
public class Create_Node_Sample {
public static void main(String[] args) {
ZkClient zkClient = new ZkClient("127.0.0.1:2181");
System.out.println("ZooKeeper session established.");
//createParents的值設(shè)置為true,可以遞歸創(chuàng)建節(jié)點(diǎn)
zkClient.createPersistent("/lg-zkClient/lg-c1", true);
System.out.println("success create znode.");
}
}
- 刪除節(jié)點(diǎn)
import org.I0Itec.zkclient.ZkClient;
public class Del_Data_Sample {
public static void main(String[] args) throws Exception {
String path = "/lg-zkClient/lg-c1";
ZkClient zkClient = new ZkClient("127.0.0.1:2181", 5000);
zkClient.deleteRecursive(path);
System.out.println("success delete znode.");
}
}
- 獲取子節(jié)點(diǎn)
import java.util.List;
import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.ZkClient;
public class Get_Children_Sample {
public static void main(String[] args) throws Exception {
ZkClient zkClient = new ZkClient("127.0.0.1:2181", 5000);
List<String> children = zkClient.getChildren("/lg-zkClient");
System.out.println(children);
//注冊(cè)監(jiān)聽(tīng)事件
zkClient.subscribeChildChanges(path, new IZkChildListener() {
public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
System.out.println(parentPath + " 's child changed, currentChilds:" + currentChilds);
}
});
zkClient.createPersistent("/lg-zkClient");
Thread.sleep(1000);
zkClient.createPersistent("/lg-zkClient/c1");
Thread.sleep(1000);
zkClient.delete("/lg-zkClient/c1");
Thread.sleep(1000);
zkClient.delete(path);
Thread.sleep(Integer.MAX_VALUE);
}
}
運(yùn)行結(jié)果:
/zk-book 's child changed, currentChilds:[]
/zk-book 's child changed, currentChilds:[c1]
/zk-book 's child changed, currentChilds:[]
/zk-book 's child changed, currentChilds:null
- 獲取數(shù)據(jù)(節(jié)點(diǎn)是否存在胸竞、更新欺嗤、刪除)
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
public class Get_Data_Sample {
public static void main(String[] args) throws InterruptedException {
String path = "/lg-zkClient-Ep";
ZkClient zkClient = new ZkClient("127.0.0.1:2181");
//判斷節(jié)點(diǎn)是否存在
boolean exists = zkClient.exists(path);
if (!exists) {
zkClient.createEphemeral(path, "123");
}
//注冊(cè)監(jiān)聽(tīng)
zkClient.subscribeDataChanges(path, new IZkDataListener() {
public void handleDataChange(String path, Object data) throws Exception {
System.out.println(path + "該節(jié)點(diǎn)內(nèi)容被更新,更新后的內(nèi)容" + data);
}
public void handleDataDeleted(String s) throws Exception {
System.out.println(s + " 該節(jié)點(diǎn)被刪除");
}
});
//獲取節(jié)點(diǎn)內(nèi)容
Object o = zkClient.readData(path);
System.out.println(o);
//更新
zkClient.writeData(path, "4567");
Thread.sleep(1000);
//刪除
zkClient.delete(path);
Thread.sleep(1000);
}
}
運(yùn)行結(jié)果:
123
/lg-zkClient-Ep該節(jié)點(diǎn)內(nèi)容被更新撤师,更新后的內(nèi)容4567
/lg-zkClient-Ep 該節(jié)點(diǎn)被刪除
四 Code Curator 客戶端
Curator 的使用:
這個(gè)跟client是類似的剂府,只是編程風(fēng)格為Fluent的風(fēng)格,就是lombok那種鏈?zhǔn)降牟僮魈甓埽c(diǎn)點(diǎn)點(diǎn)那種
導(dǎo)入jar支持
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.12.0</version>
</dependency>
- 建立會(huì)話:
public static CuratorFramework newClient(String connectString, RetryPolicy retryPolicy)
public static CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy)
- 通過(guò)調(diào)用 CuratorFramework中的 Start() 方法來(lái)啟動(dòng)會(huì)話:
// 片段一
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3);
CuratorFramework client =
CuratorFrameworkFactory.newClient("127.0.0.1:2181",retryPolicy);
client.start();
// 片段二
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3);
CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181",
5000,1000,retryPolicy);
client.start();
// 片段三
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3);
private static CuratorFramework Client = CuratorFrameworkFactory.builder()
.connectString("server1:2181,server2:2181,server3:2181")
.sessionTimeoutMs(50000)
.connectionTimeoutMs(30000)
.retryPolicy(retryPolicy)
.build();
client.start();
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
public class Create_Session_Sample {
public static void main(String[] args) throws Exception {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", 5000, 3000, retryPolicy);
client.start();
System.out.println("Zookeeper session1 established. ");
CuratorFramework client1 = CuratorFrameworkFactory.builder()
.connectString("127.0.0.1:2181") //server地址
.sessionTimeoutMs(5000) // 會(huì)話超時(shí)時(shí)間
.connectionTimeoutMs(3000) // 連接超時(shí)時(shí)間
.retryPolicy(retryPolicy) // 重試策略
.namespace("base") // 獨(dú)立命名空間/base
.build(); //
client1.start();
System.out.println("Zookeeper session2 established. ");
}
}