目錄
1.zookeeper是什么
2.zookeeper使用
2.1.api介紹
2.2.使用案例
2.2.1.概述
2.2.2.準備工作
2.2.3.配置中心
2.2.4.分布式鎖
2.2.5.小結
3.集群搭建
3.1.準備工作
3.2.配置文件詳解
3.3.啟動
4.原理
4.1.獨立模式
4.1.1.啟動步驟
4.1.2.請求處理過程
4.1.3.請求處理器詳解
4.2.核心數(shù)據(jù)結構
4.2.1.DataTree
4.2.2.ZKDatabase(簡稱zkdb)
4.3.持久化及重建機制
4.3.1.概述
4.3.2.SnapLog
4.3.3.TxnLog
4.3.4.FileSnapTxnLog
4.4.集群模式運行過程
4.4.1.概述
4.4.2.選舉(FastLeaderElection)
4.4.3.集群初始化
4.4.3.1.內部通信數(shù)據(jù)結構
4.4.3.2.初始化過程
4.4.4.對外服務(請求處理器)
4.4.4.1.Server體系概述
4.4.4.2.LeaderZooKeeperServer
4.4.4.3.FollowerZooKeeperServer:
5.總結
1.zookeeper是什么
官方解釋:zookeeper(以下簡稱zk)是一個高可用的分布式協(xié)調服務
-
是一個內存級數(shù)據(jù)管理系統(tǒng)
zk提供內存級的數(shù)據(jù)存儲功能,它內部定義了一個核心的數(shù)據(jù)結構DataTree,用來存儲數(shù)據(jù).DataTree是一個類似于文件系統(tǒng)的多叉樹結構,樹的每個節(jié)點類型為DataNode,俗稱為ZNode.
每個ZNode具有一個指向父節(jié)點的引用變量,和一個保存子節(jié)點名稱的集合(Set<String>),以及用來保存節(jié)點數(shù)據(jù)的字節(jié)數(shù)據(jù).
與此同時,所有ZNode被以平鋪的方式保存在一個Map<String,DataNode>對象中,key為ZNode的完整路徑,value為ZNode本身.
- 是一個高可用的分布式集群
zk是一個多副本集群,集群中每一個節(jié)點都保存一個完整的DataTree數(shù)據(jù)副本.
集群使用ZAB(zookeeper atomic broadcast)協(xié)議解決多副本的數(shù)據(jù)一致性問題,使其具有極高的可用性. - 是一個協(xié)調服務
zk有3大特性:
1.寫操作嚴格有序:所有寫操作按請求順序執(zhí)行,在同一時間并發(fā)修改同一個ZNode時,只有一個請求能成功.這種特性就是俗稱的master選舉模式.
2.watch機制:zk支持推拉結合的發(fā)布訂閱模式,可以在讀取某個節(jié)點數(shù)據(jù)的同時對該節(jié)點設置監(jiān)視器(原子操作),以監(jiān)視從讀取那一刻起該節(jié)點后續(xù)發(fā)生的數(shù)據(jù)變更.
3.臨時節(jié)點:ZNode的生命周期默認是從創(chuàng)建那一刻起一直存在直到被刪除,同時zk也支持創(chuàng)建臨時節(jié)點,臨時節(jié)點生命周期與Session會話一致,會話中斷節(jié)點也隨之被刪除.
2.zookeeper使用
2.1.api介紹
zk為客戶端提供了對DataTree的8種操作,如下:
1.創(chuàng)建ZNode節(jié)點
2.刪除ZNode節(jié)點
3.變更ZNode節(jié)點數(shù)據(jù)
4.變更ZNode節(jié)點權限
5.查詢節(jié)點是否存在(可同時配置監(jiān)視器)
6.查詢節(jié)點數(shù)據(jù)(可同時配置監(jiān)視器)
7.查詢節(jié)點權限
8.查詢子節(jié)點名稱(可同時配置監(jiān)視器)
以上這8種操作,zk均提供了同步調用和異步調用兩種調用方式,但是實際上,客戶端與服務端都是采用異步的方式進行通信,客戶端內部通過線程通信(wait && notify)實現(xiàn)異步轉同步的操作.
同步api如下:
public interface ClientAPI extends Closeable {
String create(String path, byte data[], List<ACL> acls, CreateMode createMode) throws KeeperException, InterruptedException;
void delete(String path, int version) throws InterruptedException, KeeperException;
Status setData(String path, byte data[], int version) throws KeeperException, InterruptedException;
Status setACL(String path, List<ACL> acls, int version) throws KeeperException, InterruptedException;
Status exists(String path, boolean watch) throws KeeperException, InterruptedException;
Status exists(String path, Watcher watcher) throws KeeperException, InterruptedException;
byte[] getData(String path, boolean watch, Status status) throws KeeperException, InterruptedException;
byte[] getData(String path, Watcher watcher, Status status) throws KeeperException, InterruptedException;
List<ACL> getACL(String path, Status status) throws KeeperException, InterruptedException;
List<String> getChildren(String path, boolean watch) throws KeeperException, InterruptedException;
List<String> getChildren(String path, boolean watch, Status status) throws KeeperException, InterruptedException;
List<String> getChildren(final String path, Watcher watcher) throws KeeperException, InterruptedException;
List<String> getChildren(final String path, Watcher watcher, Status status) throws KeeperException, InterruptedException;
}
異步api如下:
public interface AsyncClientAPI extends Closeable {
void create(final String path, byte data[], List<ACL> acl, CreateMode createMode, StringCallback callback, Object context);
void delete(final String path, int version, VoidCallback callback, Object context);
void setData(final String path, byte data[], int version, StatCallback callback, Object context);
void setACL(final String path, List<ACL> acl, int version, StatCallback callback, Object context);
void exists(String path, boolean watch, StatCallback callback, Object context);
void exists(final String path, Watcher watcher, StatCallback callback, Object context);
void getData(final String path, boolean watch, DataCallback callback, Object context);
void getData(final String path, Watcher watcher, DataCallback callback, Object context);
void getACL(final String path, Status stat, ACLCallback callback, Object context);
void getChildren(final String path, boolean watch, ChildrenCallback callback, Object context);
void getChildren(final String path, boolean watch, Children2Callback callback, Object context);
void getChildren(final String path, Watcher watcher, ChildrenCallback callback, Object context);
void getChildren(final String path, Watcher watcher, Children2Callback callback, Object context);
}
除了上述操作之外,zk還提供了事務操作api,可以把多個寫操作合成為一個原子操作事務(要么全部成功要么全部失敗),api如下:
public interface TransactionAPI {
List<OpResult> multi(Iterable<Op> ops) throws InterruptedException, KeeperException;
Transaction transaction();
}
multi()
方法用于一次向zk服務器發(fā)送多個操作.
當然,我們也可以調用transaction()
方法,以一種鏈式調用方式構建原子操作.
public class Transaction {
private ZooKeeper zk;
private List<Op> ops = new ArrayList<>();
Transaction(ZooKeeper zk) {
this.zk = zk;
}
public Transaction create(final String path, byte data[], List<ACL> acl, CreateMode createMode) {
ops.add(Op.create(path, data, acl, createMode.toFlag()));
return this;
}
public Transaction delete(final String path, int version) {
ops.add(Op.delete(path, version));
return this;
}
public Transaction check(String path, int version) {
ops.add(Op.check(path, version));
return this;
}
public Transaction setData(final String path, byte data[], int version) {
ops.add(Op.setData(path, data, version));
return this;
}
public List<OpResult> commit() throws InterruptedException, KeeperException {
return zk.multi(ops);
}
}
2.2.使用案例
2.2.1.概述
在回顧一下zk的3大特性:
1.寫操作嚴格有序
2.watch機制
3.臨時節(jié)點
利用這三大特性,我們可以進一步封裝,實現(xiàn)一些業(yè)務組件,如:分布式鎖,配置中心,主備切換,負載均衡,服務發(fā)現(xiàn),任務調度,等等.
下面通過一些demo來演示如何使用zk.
2.2.2.準備工作
首先需要引入zk的構件,maven坐標如下:
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.5</version>
</dependency>
2.2.3.案例一:配置中心
接下來的例子是模擬一個美團mcc的實現(xiàn),在這個例子中,我們將/mcc/{appkey}定義為我們獲取配置數(shù)據(jù)的路徑,ZNode中的數(shù)據(jù)存儲Properties序列化后的數(shù)據(jù).
ConfigUtilAdapter初始化時,通過getData拉取服務配置,拉取的同時,向zk集群注冊watch,來監(jiān)聽ZNode數(shù)據(jù)的變化,當接到數(shù)據(jù)變更事件時,重新進行數(shù)據(jù)拉取(并再次注冊watch),如此循環(huán).
代碼實現(xiàn)如下:
public class MccDemo {
/**
* 模擬美團的mcc配置工具類
*/
public static class ConfigUtilAdapter implements Watcher {
public static final String CONFIG_PATH = "/mcc/${yourAppkey}";
private ConcurrentHashMap<String, String> config = new ConcurrentHashMap<>();
private static volatile boolean init = false;
private ZooKeeper client;
private static ConfigUtilAdapter instance;
public static void init() {
if (init) {
return;
}
instance = new ConfigUtilAdapter();
}
public ConfigUtilAdapter() {
try {
client = new ZooKeeper("ip:port", 1000, this, false);
} catch (IOException e) {
throw new IllegalStateException("初始化失敗", e);
}
pullData();
}
private synchronized void pullData() {
try {
byte[] data = client.getData(CONFIG_PATH, this, null);
Properties props = new Properties();
props.load(new ByteArrayInputStream(data));
config.clear();
for (Map.Entry<Object, Object> entry : props.entrySet()) {
config.put(entry.getKey().toString(), entry.getValue().toString());
}
} catch (Exception e) {
throw new IllegalStateException("拉取配置失敗", e);
}
}
public static String getString(String key) {
return instance.config.get(key);
}
@Override
public void process(WatchedEvent event) {
if ((event.getType() == EventType.NodeDataChanged) && (CONFIG_PATH.equals(event.getPath()))) {
pullData();
}
}
}
public static void main(String[] args) throws Exception {
mccInit();
ConfigUtilAdapter.init();
Thread.sleep(2000);
String value = ConfigUtilAdapter.getString("key");
System.out.println("第1次獲取配置:" + value);
mccCentralChange();
Thread.sleep(500);
value = ConfigUtilAdapter.getString("key");
System.out.println("第2次獲取配置:" + value);
}
private static void mccCentralChange() throws Exception {
try (ZooKeeper client = new ZooKeeper("ip:port", 1000, even -> {
}, false)) {
Properties props = new Properties();
props.put("key", "helloWorld");
ByteArrayOutputStream out = new ByteArrayOutputStream();
props.store(out, null);
client.setData("/mcc/{yourAppkey}", out.toByteArray(), 0);
}
}
/**
* 此初始化過程相當于服務申請時伴隨appkey產生而執(zhí)行的初始化操作
*
* @throws Exception
*/
public static void mccInit() throws Exception {
try (ZooKeeper client = new ZooKeeper("172.18.212.149:2189", 1000, even -> {
}, false)) {
try {
Status status = client.exists(CONFIG_PATH, false);
client.delete(CONFIG_PATH, status.getVersion());
} catch (Exception e) {
e.printStackTrace();
}
try {
Status status = client.exists("/mcc", false);
client.delete("/mcc", status.getVersion());
} catch (Exception e) {
e.printStackTrace();
}
client.create("/mcc", new byte[0], ZooDefs.AclLists.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
client.create("/mcc/${yourAppkey}", new byte[0], ZooDefs.AclLists.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
}
}
代碼運行結果如下:
第1次獲取配置:null
第2次獲取配置:helloWorld
2.2.4.案例二:分布式鎖
加鎖
分布式鎖的阻塞式獲取,其實就是鎖資源的并發(fā)競爭,在zk的視角里,就是對同一個path對應的ZNode的競爭創(chuàng)建.
zk的進程間同步的特性保證的同一時間,只有有一個client成功創(chuàng)建節(jié)點,其他client都會創(chuàng)建失敗并提示NodeExist.
當client收到NodeExist的提示時,說明自己加鎖沒有成功,則此時需要進行鎖等待,只要利用zk的watch機制監(jiān)控鎖對應的ZNode變更,
來喚醒等待線程,進行加鎖重試,即可完成整個加鎖流程.
釋放鎖
鎖的釋放分兩種情況:
1.client宕機,server檢測不到心跳,則當達到會話超時時間時,server中的SessionTracker會自動刪除會話,并同時刪除該會話創(chuàng)建的所有臨時節(jié)點,從而鎖節(jié)點被釋放(利用zk的臨時節(jié)點特性)
2.client在執(zhí)行完業(yè)務邏輯后,主動進行鎖釋放,其實就是主動調用delete進行節(jié)點刪除
粗略的代碼實現(xiàn)如下(只為了驗證zk的特性,因此很多異常處理的邏輯被省略掉了)
public class LockDemo {
/**
* zk實現(xiàn)的分布式鎖
*/
public static class ZooKeeperLock implements Watcher {
private static final Logger LOG = LoggerFactory.getLogger(com.lixin.lock.ZooKeeperLock.class);
private static final String LOCK_PATH = "/lock";
private ZooKeeper client = new ZooKeeper("172.18.212.149:2189", 60000, this, false);
private Object lock = new Object();
public ZooKeeperLock() throws IOException {
while (!client.getClientState().isConnected()) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
LOG.error("", e);
}
}
}
public void lock() throws Exception {
boolean success = false;
while (!success) {
try {
client.create(LOCK_PATH, new byte[0], ZooDefs.AclLists.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
success = true;
} catch (KeeperException e) {
if (e.code() == Code.NODEEXISTS) {
synchronized (lock) {
client.exists(LOCK_PATH, this);
lock.wait();
}
}
}
}
LOG.info(Thread.currentThread().getName() + "獲取鎖成功");
}
public boolean tryLock() {
try {
client.create(LOCK_PATH, new byte[0], ZooDefs.AclLists.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
return true;
} catch (Exception e) {
return false;
}
}
public void unlock() throws Exception {
client.delete(LOCK_PATH, 0);
LOG.info(Thread.currentThread().getName() + "釋放鎖成功");
}
@Override
public void process(WatchedEvent event) {
if ((event.getType() == EventType.NodeDeleted) && (LOCK_PATH.equals(event.getPath()))) {
synchronized (lock) {
lock.notifyAll();
}
}
}
}
/**
* 競爭鎖的線程,這里模擬加鎖->業(yè)務操作->釋放鎖的流程
*/
public static class FightThread extends Thread {
public FightThread(String name) {
super(name);
}
@Override
public void run() {
ZooKeeperLock lock;
try {
lock = new ZooKeeperLock();
} catch (IOException e) {
throw new RuntimeException(e);
}
System.out.println("開始競爭鎖");
while (true) {
try {
lock.lock();
Thread.sleep(2000);
}catch (Exception e){
e.printStackTrace();
}finally {
try {
lock.unlock();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
public static void main(String[] args) throws Exception{
FightThread t1 = new FightThread("線程-1");
t1.setDaemon(true);
FightThread t2 = new FightThread("線程-2");
t2.setDaemon(true);
t1.start();
t2.start();
Thread.sleep(60000);
}
}
該Demo的執(zhí)行結果如下:
開始競爭鎖
開始競爭鎖
2018-07-31 14:15:34,666 [myid:] - INFO [線程-2:ZooKeeperLock@53] - 線程-2獲取鎖成功
2018-07-31 14:15:36,675 [myid:] - INFO [線程-2:ZooKeeperLock@69] - 線程-2釋放鎖成功
2018-07-31 14:15:36,678 [myid:] - INFO [線程-2:ZooKeeperLock@53] - 線程-2獲取鎖成功
2018-07-31 14:15:38,687 [myid:] - INFO [線程-2:ZooKeeperLock@69] - 線程-2釋放鎖成功
2018-07-31 14:15:38,690 [myid:] - INFO [線程-2:ZooKeeperLock@53] - 線程-2獲取鎖成功
2018-07-31 14:15:40,699 [myid:] - INFO [線程-2:ZooKeeperLock@69] - 線程-2釋放鎖成功
2018-07-31 14:15:40,701 [myid:] - INFO [線程-1:ZooKeeperLock@53] - 線程-1獲取鎖成功
2018-07-31 14:15:42,707 [myid:] - INFO [線程-1:ZooKeeperLock@69] - 線程-1釋放鎖成功
2018-07-31 14:15:42,709 [myid:] - INFO [線程-2:ZooKeeperLock@53] - 線程-2獲取鎖成功
2018-07-31 14:15:44,717 [myid:] - INFO [線程-2:ZooKeeperLock@69] - 線程-2釋放鎖成功
2018-07-31 14:15:44,718 [myid:] - INFO [線程-1:ZooKeeperLock@53] - 線程-1獲取鎖成功
2018-07-31 14:15:46,726 [myid:] - INFO [線程-1:ZooKeeperLock@69] - 線程-1釋放鎖成功
2018-07-31 14:15:46,729 [myid:] - INFO [線程-2:ZooKeeperLock@53] - 線程-2獲取鎖成功
2018-07-31 14:15:48,736 [myid:] - INFO [線程-2:ZooKeeperLock@69] - 線程-2釋放鎖成功
2018-07-31 14:15:48,738 [myid:] - INFO [線程-1:ZooKeeperLock@53] - 線程-1獲取鎖成功
2.2.5.小結
- 我們利用zk的watch特性,模擬了美團配置中心的實現(xiàn).基于這一特性,我們還可以實現(xiàn)負載均衡,服務發(fā)現(xiàn)等相關問題的解決方案.
- 我們還可以利用zk的順序寫特性,實現(xiàn)任務調度系統(tǒng)(crane),主備切換(ResourceManager單點問題解決)等相關系統(tǒng),這類問題統(tǒng)稱為master選舉.這里未舉例子.
- 最后,分布式鎖的案例,則是zk三大特性的綜合利用.
3.集群搭建
3.1.準備工作
下載zookeeper分發(fā)包,本文使用的版本為3.4.5,對應下載地址:https://archive.apache.org/dist/zookeeper/zookeeper-3.4.5/
3.2.配置文件詳解
zk服務器啟動時,需要讀取配置文件對自身服務進行配置,配置文件初始位置在分發(fā)包的conf/zoo_sample.cfg.
zk服務器啟動時默認讀取的路徑為conf/zoo.cfg.
配置文件本質是一個properties文件,服務器啟動初始,會首先解析配置文件,在內存中創(chuàng)建一個類型為QuorumPeerConfig的對象(文件解析邏輯也在其中).
下面詳細解釋下文件中各項配置的含義和作用:
# 快照存儲目錄,無默認值
dataDir=
# 事務存儲目錄,無默認值
dataLogDir=
# 集群節(jié)點對外提供服務的監(jiān)聽端口,默認2181
clientPort=2181
# 集群節(jié)點對外提供服務的監(jiān)聽host地址,默認監(jiān)聽0.0.0.0
clientPortAddress=0.0.0.0
# 每個客戶端ip的最大連接數(shù)限制,默認60
maxClientCnxns=60
# zk服務器的時鐘周期定義,它是一個時間單位度量,單位為毫秒
# 默認值為3000,也就是說一個時鐘周期為3秒
tickTime=3000
######################################################
# 最小和最大會話超時時間,用于限制客戶端連接的超時時間設置
# 如果客戶端超時值比最小值小,則被修正為最小值
# 如果客戶端超市之比最大值大,則被修正為最大值
######################################################
# 客戶端會話的最小超時時間,-1代表使用默認值,默認為2個tickTime
minSessionTimeout=-1
# 客戶端會話的最大超時時間,-1代表使用默認值,默認為20個tickTime
maxSessionTimeout=-1
######################################################
# 以下兩個配置,用于配置數(shù)據(jù)目錄清理管理器(DatadirCleanupManager)的運行邏輯
######################################################
# 清理數(shù)據(jù)時,需要保存的快照數(shù)量,默認值為3,且配置值至少為3
autopurge.snapRetainCount=3
# 運行清理任務的時間間隔,單位為小時,默認值為1,負數(shù)和0代表代表不執(zhí)行清理
autopurge.purgeInterval=1
#-----------------------------------------------------------------------------------------------------------------------
######################################################
# 以上為獨立模式和集群模式都需要配置的屬性
# 以下為集群模式的專用配置
######################################################
# 集群初始化階段的網(wǎng)絡讀取超時時間,單位為tickTime,無默認值
# 從選舉結束到集群對外提供服務的這段時間,稱之為初始化階段
# 初始化需要經過epoch協(xié)商,以及為達到一致性狀態(tài)leader向follower發(fā)起的數(shù)據(jù)傳輸
initLimit=10
# 集群對外服務階段,leader向follower的同步數(shù)據(jù)的網(wǎng)絡讀取超時時間,單位為tickTime,無默認值
syncLimit=5
# 集群群首選舉時使用的算法標識,當前只支持3快速群首選舉(FastLeaderElection,簡稱FLE)
electionAlg=3
# 當前服務器節(jié)點的角色,可選值為:participant(參與者) or observer(觀察者)
# 默認為參與者
peerType=participant
# 節(jié)點配置,格式為: server.sid=ip:syncPort:electionPort:role
#server.1=127.0.0.1:2183:2183:participant
#server.2=192.168.0.1:2183:2183:participant
######################################################
# 以下為組策略配置,如果進行了組配置,zk會改變仲裁策略
# 默認仲裁策略QuorumMajority為數(shù)量過半
# 配置組策略后,仲裁策略變?yōu)镼uorumHierarchical分層仲裁,即組內服務器過半的組的數(shù)量要過半
# 組策略在zk多機房多機架部署下,可以更好提高集群可靠性
# 集群組配置,格式為: group.gid=sid1:sid2:......
#group.1=1:2
# 節(jié)點權重,格式為: server.sid=weight,默認權重都為1
# 上述所說的數(shù)量過半中的[數(shù)量],實際是權重之和
#weight.1=1
#weight.2=1
######################################################
# 剩余的其他key/value配置,都會給key追加"zookeeper."前綴
# 用于zk的擴展配置
######################################################
注意:集群服務器節(jié)點個數(shù)建議配置為奇數(shù),這不是必須的,但是是相對最優(yōu)的.
舉例說明:相比于一個5個服務器的集群(描述為5n)而言,
(1)4個服務器的集群,則更加脆弱(5n允許2個服務器宕機,而4n只能允許1個服務器宕機);
(2)6個服務器的集群,則會使集群法定人數(shù)增加(5n法定人數(shù)為3,而6n法定人數(shù)為4),進而使服務器需要更多的確認操作.
3.2.啟動
把配置好的文件,放置到conf/zoo.cfg位置
執(zhí)行命令
# 啟動服務
bin/zkServer.sh start
# 停止服務
bin/zkServer.sh stop
使用client測試服務啟動是否成功
ZooKeeper zk = new ZooKeeper("10.4.236.198:2181,10.4.233.228:2181,10.4.244.77:2181", Integer.MAX_VALUE, new Watcher() {
@Override
public void process(WatchedEvent event) {
KeeperState state = event.getState();
if (KeeperState.SyncConnected.equals(state)||KeeperState.ConnectedReadOnly.equals(state)) {
System.out.println("連接成功");
}
}
}, false);
4.原理
4.1.獨立模式
4.1.1.啟動步驟
zk服務器啟動分為以下步驟:
1.啟動服務端口監(jiān)聽(ServerCnxnFactory)
2.啟動zkDatabase(創(chuàng)建內存實例,從文件加載數(shù)據(jù))
3.啟動會話跟蹤器(SessionTracker)
4.安裝并啟動請求處理器(RequestProcessor)
之后zk服務就開始對外提供服務
4.1.2.請求處理過程
客戶端連接服務器,TCP3次握手成功后,
客戶端和服務器還需要通過上層數(shù)據(jù)包的方式,進行一些協(xié)商,數(shù)據(jù)包統(tǒng)一用Packet封裝.
這個過程會包含兩個特殊的包交互:
1.第一個包負責會話的初始化,包含會話超時時間協(xié)商,會話id分配,zxid校驗
2.第二個包負責進行鑒權,客戶端通過ZooKeeper.addAuthInfo()添加自己的認證信息,向服務器發(fā)包,服務器進行認證,認證失敗會關閉會話.
除連接包和認證包由底層傳輸層處理外,客戶端后續(xù)發(fā)來的操作請求,會分別經過:
1.PrepRequestProcessor
2.SyncRequestProcessor
3.FinalRequestProcessor
最終被服務器端處理成功后,回發(fā)響應.
一次請求過程大致如下圖所示.
4.1.3.請求處理器詳解
下面分別介紹不同的請求處理器的職責:(先上圖)
- PrepRequestProcessor(準備請求處理器):
該處理器是一個單獨的線程,當接收到ServerCnxnFactory發(fā)來的請求后,只是簡單將請求加入到提交隊列中,
之后由線程邏輯一直循環(huán)消費隊列并處理器請求.
當前處理器最主要的職責,就是為寫操作生成對應的事務對象,之后調用下一個處理器進行繼續(xù)處理. - SyncRequestProcessor(同步請求處理器):
該處理器同樣是一個單獨線程,收到請求后,將請求對象添加到隊列中,由線程邏輯一直循環(huán)消費隊列,處理請求.
當前處理的職責很單一,就是把請求中附帶的事務對象序列化到磁盤當中,
并定期進行事務文件的滾動(就像log4j的rollingFile一定,定期創(chuàng)建新的文件),
以及保存快照操作(把當前內存中的DataTree和SessionMap序列化到磁盤文件中).
處理器會切換執(zhí)行兩套邏輯,
當請求隊列中存在請求時,則會優(yōu)先處理請求,把請求中的事務對象追加到日志文件流中,并把請求對象加入刷新隊列中;
當請求隊列中沒有要處理的請求時,則會執(zhí)行刷新操作(flush),關閉所有打開的流文件,確保數(shù)據(jù)全部落盤,之后把刷新隊列中的請求全部移交給下一個處理器進行處理. - FinalRequestProcessor(最終請求處理器)
該處理器是請求處理的終點,負責應用請求對象中的事務,變更內存中的數(shù)據(jù)結構,并最終給客戶端響應結果.
當前處理器不是線程,會占用前一個處理器線程的資源執(zhí)行.
名詞解釋:
演變記錄(ChangeRecord),負責記錄某一次事務請求操作后某個ZNode節(jié)點的結果狀態(tài).
ZooKeeperServer內部有一個List和一個Map,List負責按需保存整個演變過程,而Map負責以path為key,記錄節(jié)點的最終變更結果.
因為zk在生成事務之前會先做業(yè)務校驗(比如,創(chuàng)建節(jié)點前需要先校驗節(jié)點是否已經存在),校驗通過后才會生成事務對象(事務對象一旦生成就一定會被應用到DataTree中),由于在業(yè)務校驗時,可能有的已經生成的事務還沒有被成功應用,而可能導致校驗出錯,所以需要借助ChangeRecord來暫存未應用的事務節(jié)點在應用之后的節(jié)點狀態(tài),
在業(yè)務校驗時只需要校驗Map中path對應的ChangeRecord狀態(tài)即可,當獲取ChangeRecord對象時,如果Map中不存在(當前path不存在中間態(tài)的事務),則默認返回ZNode當前狀態(tài)的封裝.
4.2.核心數(shù)據(jù)結構
4.2.1.DataTree
其中包含5個數(shù)據(jù)部分:
- 節(jié)點Map:key為數(shù)據(jù)節(jié)點的path,value為數(shù)據(jù)節(jié)點本身(DataNode)
- 臨時會話Map:key為會話的sessionId,value為該會話創(chuàng)建的臨時path的集合
- 監(jiān)視管理器:負責保存客戶端請求讀操作時,注冊的path監(jiān)視,以及事件觸發(fā)
- 路徑查找樹:用于保存受quota監(jiān)控的所有的路徑
- 訪問控制列表Map,key為訪問控制列表的id,其由一個內存級的自增id分配,value為對應的控制列表配置(List<ACL>)
DataTree除了提供自身數(shù)據(jù)的操作能力以外,還提供了自身的序列化(內存數(shù)據(jù)持久化)和反序列化機制(磁盤數(shù)據(jù)加載).
注意:DataTree中只有節(jié)點Map和訪問控制列表Map可以進行持久化操作,其他數(shù)據(jù)都是內存級的
public class DataTree {
/**
* node樹(持久化)
* key為path,value為ZNode節(jié)點
*/
private final ConcurrentHashMap<String, DataNode> nodes = new ConcurrentHashMap<>();
/**
* 臨時節(jié)點列表
* key為sessionId,value為path集合
*/
private final ConcurrentMap<Long, HashSet<String>> ephemerals = new ConcurrentHashMap<>();
/**
* 數(shù)據(jù)變化監(jiān)視管理器
*/
private final WatchManager dataWatches = new WatchManager();
/**
* 子節(jié)點變化監(jiān)視管理器
*/
private final WatchManager childWatches = new WatchManager();
/**
* 路徑查找樹,用于path的快速檢索.
* 內部保存開啟配額檢查的所有路徑
*/
private final PathTrie pathTrie = new PathTrie();
/**
* 當前DataTree中包含List<ACL>的個數(shù)
* 是一個自增id,每新增一個List<ACL>值,aclIndex自增給該值分配唯一id
*/
private long aclIndex = 0;
/**
* 權限索引(持久化)
* key為aclIndex,value為List<ACL>
*/
private final Map<Long, List<ACL>> longKeyMap = new HashMap<>();
/**
* 權限索引
* key為List<ACL>,value為aclIndex
*/
private final Map<List<ACL>, Long> aclKeyMap = new HashMap<>();
/**
* 最大事務id,
* DataTree每次處理器,都會更新此值
*/
private volatile long lastProcessedZxid = 0;
}
4.2.2.ZKDatabase(簡稱zkdb)
zkdb比較簡單,它負責同時管理DataTree的數(shù)據(jù),已經所有會話的生命周期(ConcurrentHashMap<Long, Integer> sessionsWithTimeouts).
zkdb是外部進行數(shù)據(jù)操作的入口.
ZKDatabase.loadDatabase()可以完成數(shù)據(jù)從磁盤到內存的加載工作,
其內部通過調用FileTxnSnapLog.restore()完成反序列化和重建.
4.3.持久化及重建機制
4.3.1.概述
zk的持久化數(shù)據(jù)存儲中,包含兩種類型的文件,
- 一種稱為快照文件,包含某一時間點開始的DataTree和SessionMap的數(shù)據(jù),DataTree中又包含NodeMap和AclMap兩部分數(shù)據(jù),
快照文件命名格式為snapshot.{zxid},其中,zxid為開始保存快照時(也是文件創(chuàng)建時),DataTree中最大的事務id. - 另一種稱為事務日志文件,客戶端發(fā)起的每一次寫請求都會被轉化為一個事務對象,被追加到事務日志文件中.
事務日志文件的命名格式為txn.{zxid},zxid同樣為事務文件創(chuàng)建時,DataTree中最大的事務id.
事務日志文件與我們熟悉的log4j配置文件類似,也具有roll滾動機制,當文件日志量達到閾值時會觸發(fā)滾動(rollLog),創(chuàng)建新的日志文件進行繼續(xù)寫入.
同時,為了提高性能,事務文件的空間大小是采用預分配的,在文件創(chuàng)建時會以0x00填充一大塊存儲空間.
注意:
zk在持久化快照時,依然會接收客戶端請求并對DataTree進行變更操作,因此快照文件保存的數(shù)據(jù)并不能反應出任意時間點DataTree的準確狀態(tài).
不過不用擔心,我們的最終目的是通過持久化的文件數(shù)據(jù)可以在內存中重建最新狀態(tài)的DataTree.
我們已經記錄了開始序列化快照時DataTree的最大zxid,我們只需要基于快照再重播zxid之后的所有事務記錄,即可將DataTree恢復至最新狀態(tài).
4.3.2.SnapLog
該組件提供快照的序列化和反序列化,同時,我們可以從文件系統(tǒng)中查詢當前最新的快照文件.
API如下:
public interface SnapShot extends Closeable {
long deserialize(DataTree dataTree, ConcurrentMap<Long, Integer> sessions) throws IOException;
void serialize(DataTree dataTree, Map<Long, Integer> sessions, File name) throws IOException;
File findMostRecentSnapshot() throws IOException;
}
4.3.3.TxnLog
該組件提供事務日志的序列化和反序列化,功能包括:
- 將事務對象序列化后追加到文件中
- 滾動日志,創(chuàng)建新的日志文件
- 提交,將當前打開的所有日志文件進行刷新并關閉,確保緩存數(shù)據(jù)全部落盤
- 將指定zxid后面的事務數(shù)據(jù)全部從文件中刪除
- 創(chuàng)建一個迭代器,初始指向指定的zxid對應的事務記錄,可向后迭代遍歷
- 獲取當前記錄的事務中最大的zxid
API如下:
public interface TxnLog {
boolean append(TxnHeader header, Record record) throws IOException;
void rollLog() throws IOException;
void commit() throws IOException;
boolean truncate(long zxid) throws IOException;
TxnIterator read(long zxid) throws IOException;
long getLastLoggedZxid() throws IOException;
}
4.3.4.FileSnapTxnLog
該組件是SnapLog和TxnLog兩個組件的門面,通過它可以完成根據(jù)快照文件和事務文件,重建內存數(shù)據(jù)最新狀態(tài)的能力.
4.4.集群模式運行過程
4.4.1.概述
一個zk集群由一個leader節(jié)點和若干個follower節(jié)點構成.
集群正常運行的前提條件為:leader節(jié)點正常,且集群超過半數(shù)(包含leader)節(jié)點正常時,集群處于服務狀態(tài).
部分follower節(jié)點宕機,可自行恢復.
但是,如果leader節(jié)點宕機,或超過半數(shù)節(jié)點宕機,整個集群所有節(jié)點都會停止服務,進入選舉狀態(tài).
4.4.2.選舉(FastLeaderElection)
在集群節(jié)點進行選舉時,需要節(jié)點兩兩之間建立socket連接.
也就是說,每個節(jié)點都要通過ServetSocket進行端口監(jiān)聽,與此同時向其他節(jié)點發(fā)起socket連接.
但是節(jié)點之間沒必要建立兩條連接,因此zk規(guī)定:連接方向為serverId大的向serverId小的節(jié)點發(fā)起連接.
因此,假設一個5個節(jié)點組成的集群(n=5),則總共存在n*(n-1)/2=10條TCP通道.如下圖:
選舉過程中,一個很重要的數(shù)據(jù)結構,就是投票(Vote),其用于描述某個節(jié)點的競選信息.
節(jié)點保存哪個Vote對象,即代表支持哪個節(jié)點做leader.
public class Vote {
/**
* 節(jié)點id
*/
private final long id;
/**
* 節(jié)點最大事務id
*/
private final long zxid;
/**
* 節(jié)點競選輪次
*/
private final long electionEpoch;
/**
* 節(jié)點的服務輪次,
* 該值持久化在快照目錄的currentEpoch文件中
*/
private final long peerEpoch;
/**
* 節(jié)點當前狀態(tài):
* LOOKING:正在選舉
* FOLLOWING:正在追隨
* LEADING:正在領導
* OBSERVING:正在觀察
*/
private final ServerState state;
}
在進行群首選舉時,
首先,所有節(jié)點初始時都投自己(Vote=myServerid,myZxid,myPeerEpoch).
其次,需要確保大家都在相同的選舉輪次進行投票,如果某個節(jié)點小于當前的選舉輪次,則它需要重新初始化自己(丟棄自己收到的所有投票,重新投自己,廣播自己的投票)
然后,在選舉輪次一致的前提下,選舉過程中,每個節(jié)點都在重復的做兩件事,直到最終投票結束:
- 接收其他節(jié)點的投票,根據(jù)其他節(jié)點的狀態(tài),會走不同的邏輯處理邏輯
- 如果自己的票型變化,則廣播自己的投票
投票組件的組件架構如下:
4.4.2.集群初始化
4.4.2.1.內部通信數(shù)據(jù)結構
集群節(jié)點內部通信,均使用QuorumPacket傳遞數(shù)據(jù),數(shù)據(jù)結構如下:
public class QuorumPacket implements Record {
// 包類型
private int type;
// 事務id
private long zxid;
// 數(shù)據(jù)包附帶的數(shù)據(jù)
private byte[] data;
// 認證信息,包含scheme(認證方案)和id(身份信息)
private List<Id> authinfo;
}
4.4.2.2.初始化過程
初始化過程分為2個階段:服務輪次(epoch)協(xié)商和數(shù)據(jù)同步
- 服務輪次協(xié)商(acceptedEpoch和currentEpoch)
當選舉結束后,QuorumPeer的ServerState會由looking轉變?yōu)閘eading/following/observing.
QuorumPeer根據(jù)自身狀態(tài),創(chuàng)建一個對應角色(Leader or Follower)的實例,來進行epoch協(xié)商.
leader首先建立端口監(jiān)聽,等待follower向自己發(fā)起連接.
連接建立后,通過內部數(shù)據(jù)結構QuorumPacket進行epoch協(xié)商,具體過程如下:
- 數(shù)據(jù)同步
leader會根據(jù)自己與follower之間zxid差異的情況,選擇是使用補發(fā)事務記錄的同步方式,還是快照加事務的同步方式.
數(shù)據(jù)同步的具體過程如下:
4.4.3.對外服務(請求處理器)
4.4.3.1.Server體系概述
在zk內部,由ZooKeeperServer(或它的子類)實例統(tǒng)一對客戶端提供服務的,根據(jù)集群節(jié)點角色不同,會創(chuàng)建不同的子類實例.
不同的的ZooKeeperServer實例之間,最主要的區(qū)別在于對于請求處理器的組裝不同,因此對于一個請求的處理邏輯也不同.
ZooKeeperServer類體系如下:
4.4.3.2.LeaderZooKeeperServer
4.4.3.3.FollowerZooKeeperServer:
5.總結
- 第1章 介紹了zk是一個高可用的分布式數(shù)據(jù)管理集群服務,以及zk擁有3大特性,分別是:寫順序性,watch發(fā)布訂閱機制和臨時節(jié)點.
- 第2章 介紹了zk提供的zpi,以及zk的適用場景,并舉了2個使用案例:配置中心和分布式鎖.
- 第3章 介紹了如何搭建一個zk集群,以及如何對集群進行配置.
- 第4章 介紹了zk的原理,包括啟動過程,服務過程,核心數(shù)據(jù)結構,持久化及重建機制,以及集群節(jié)點選舉的全過程,等等.