Zookeeper 簡介
分布式系統(tǒng)的協(xié)調(diào)工作就是通過某種方式蛛壳,讓每個節(jié)點的信息能夠同步和共享。這依賴于服務(wù)進程之間的通信所刀。通信方式有兩種:
通過網(wǎng)絡(luò)進行信息共享
這就像現(xiàn)實中衙荐,開發(fā)leader在會上把任務(wù)傳達下去,組員通過聽leader命令或者看leader的郵件知道自己要干什么浮创。當任務(wù)分配有變化時忧吟,leader會單獨告訴組員,或者再次召開會議斩披。信息通過人與人之間的直接溝通溜族,完成傳遞。通過共享存儲
這就好比開發(fā)leader按照約定的時間和路徑垦沉,把任務(wù)分配表放到了svn上煌抒,組員每天去svn上拉取最新的任務(wù)分配表,然后干活乡话。其中svn就是共享存儲摧玫。更好一點的做法是,當svn文件版本更新時,觸發(fā)郵件通知诬像,每個組員再去拉取最新的任務(wù)分配表屋群。這樣做更好,因為每次更新坏挠,組員都能第一時間得到消
息芍躏,從而讓自己手中的任務(wù)分配表永遠是最新的。此種方式依賴于中央存儲降狠。
ZooKeeper如何解決分布式系統(tǒng)面臨的問題
ZooKeeper對分布式系統(tǒng)的協(xié)調(diào)对竣,使用的是第二種方式,共享存儲榜配。其實共享存儲否纬,分布式應(yīng)用也需要
和存儲進行網(wǎng)絡(luò)通信。
注:Slave節(jié)點要想獲取ZooKeeper的更新通知蛋褥,需事先在關(guān)心的數(shù)據(jù)節(jié)點上設(shè)置觀察點临燃。
大多數(shù)分布式系統(tǒng)中出現(xiàn)的問題募闲,都源于信息的共享出了問題搓逾。如果各個節(jié)點間信息不能及時共享和同步款票,那么就會在協(xié)作過程中產(chǎn)生各種問題焰情。ZooKeeper解決協(xié)同問題的關(guān)鍵,就是在于保證分布式系統(tǒng)信息的一致性茶宵。
zookeeper的基本概念
Zookeeper是一個開源的分布式協(xié)調(diào)服務(wù)倍踪,其設(shè)計目標是將那些復(fù)雜的且容易出錯的分布式一致性服務(wù)封裝起來梯捕,構(gòu)成一個高效可靠的原語集匙瘪,并以一些簡單的接口提供給用戶使用铆铆。zookeeper是一個典型的分布式數(shù)據(jù)一致性的解決方案,分布式應(yīng)用程序可以基于它實現(xiàn)諸如數(shù)據(jù)訂閱/發(fā)布辆苔、負載均衡算灸、命名服務(wù)、集群管理驻啤、分布式鎖和分布式隊列等功能
基本概念
①集群角色
通常在分布式系統(tǒng)中,構(gòu)成一個集群的每一臺機器都有自己的角色荐吵,最典型的集群就是Master/Slave模式(主備模式)骑冗,此情況下把所有能夠處理寫操作的機器稱為Master機器,把所有通過異步復(fù)制方式獲取最新數(shù)據(jù)先煎,并提供讀服務(wù)的機器為Slave機器贼涩。
而在Zookeeper中,這些概念被顛覆了薯蝎。它沒有沿用傳遞的Master/Slave概念遥倦,而是引入了Leader、Follower、Observer三種角色袒哥。Zookeeper集群中的所有機器通過Leader選舉來選定一臺被稱為Leader的機器缩筛,Leader服務(wù)器為客戶端提供讀和寫服務(wù),除Leader外堡称,其他機器包括Follower和Observer,Follower和Observer都能提供讀服務(wù)瞎抛,唯一的區(qū)別在于Observer不參與Leader選舉過程,不參與寫操作的過半寫成功策略却紧,因此Observer可以在不影響寫性能的情況下提升集群的性能桐臊。
②會話(session)
Session指客戶端會話,一個客戶端連接是指客戶端和服務(wù)端之間的一個TCP長連接, Zookeeper對外的服務(wù)端口默認為2181,客戶端啟動的時候,首先會與服務(wù)器建立一個TCP連接,從第一次連接建立開始,客戶端會話的生命周期也開始了,通過這個連接,客戶端能夠心跳檢測與服務(wù)器保持有效的會話,也能夠向 Zookeeper服務(wù)器發(fā)送請求并接受響應(yīng),同時還能夠通過該連接接受來自服務(wù)器的 Watch事件通知。
③數(shù)據(jù)節(jié)點(Znode)
在談到分布式的時候,我們通常說的“節(jié)點”是指組成集群的每一臺機器晓殊。然而,在 ZooKeeper中,“節(jié)點分為兩類,第一類同樣是指構(gòu)成集群的機器,我們稱之為機器節(jié)點;第二類則是指數(shù)據(jù)模型中的數(shù)據(jù)單元,我們稱之為數(shù)據(jù)節(jié)點——ZNode断凶。 ZooKeeper將所有數(shù)據(jù)存儲在內(nèi)存中,數(shù)據(jù)模型是一棵樹
(ZNode Tree),由斜杠(/)進行分割的路徑,就是一個node,例如app/path.每個node上都會保存自己的數(shù)據(jù)內(nèi)容,同時還會保存一系列屬性信息。
④版本
剛剛我們提到巫俺,Zookeeper的每個Znode上都會存儲數(shù)據(jù)认烁,對于每個ZNode,Zookeeper都會為其維護一個叫作 Stat 的數(shù)據(jù)結(jié)構(gòu)识藤,Stat記錄了這個ZNode的三個數(shù)據(jù)版本砚著,分別是version(當前ZNode的版本)、cversion(當前ZNode子節(jié)點的版本)痴昧、aversion(當前ZNode的ACL版本)稽穆。
⑤Watcher(事件監(jiān)聽器)
Wathcer(事件監(jiān)聽器),是Zookeeper中一個很重要的特性赶撰,Zookeeper允許用戶在指定節(jié)點上注冊一些Watcher舌镶,并且在一些特定事件觸發(fā)的時候,Zookeeper服務(wù)端會將事件通知到感興趣的客戶端豪娜,該機制是Zookeeper實現(xiàn)分布式協(xié)調(diào)服務(wù)的重要特性
⑥ ACL
Zookeeper采用ACL(Access Control Lists)策略來進行權(quán)限控制餐胀,其定義了如下五種權(quán)限:
·CREATE:創(chuàng)建子節(jié)點的權(quán)限。
·READ:獲取節(jié)點數(shù)據(jù)和子節(jié)點列表的權(quán)限瘤载。
·WRITE:更新節(jié)點數(shù)據(jù)的權(quán)限否灾。
·DELETE:刪除子節(jié)點的權(quán)限。
·ADMIN:設(shè)置節(jié)點ACL的權(quán)限鸣奔。
其中需要注意的是墨技,CREATE和DELETE這兩種權(quán)限都是針對子節(jié)點的權(quán)限控制
環(huán)境搭建
Zookeeper的搭建方式
Zookeeper安裝方式有三種,單機模式和集群模式以及偽集群模式挎狸。
- 單機模式:Zookeeper只運行在一臺服務(wù)器上扣汪,適合測試環(huán)境;
- 集群模式:Zookeeper運行于一個集群上锨匆,適合生產(chǎn)環(huán)境崭别,這個計算機集群被稱為一個“集合體”
- 偽集群模式:就是在一臺服務(wù)器上運行多個Zookeeper實例;
單機模式搭建:
zookeeper安裝以linux環(huán)境為例:
1、下載
首先我們下載穩(wěn)定版本的zookeeper http://zookeeper.apache.org/releases.html
配置單節(jié)點
$ tar -zxf zookeeper-3.4.6.tar.gz
$ cd zookeeper-3.4.6
$ mkdir data
cd conf
mv zoo_sample.cfg zoo.cfg
vi conf/zoo.cfg
編輯文件設(shè)置 dataDir = /path/to/zookeeper/data
$ bin/zkServer.sh start
啟動CLI
$ bin/zkCli.sh
$ bin/zkCli.sh -server 需要連接的ip:需要連接的port
windows 下啟用 zk
$ zkCli.cmd
$ zkCli.cmd -server 需要連接的ip:需要連接的port
例如 zkCli.cmd 106.75.105.152
, 不加端口茅主,默認為 2181
若報錯, 則檢查是否防火墻攔截了.
Opening socket connection to server 192.168.153.12/192.168.153.12:2181.Will not attempt to authentic
停止ZooKeeper服務(wù)器
連接服務(wù)器并執(zhí)行所有操作后舞痰,可以使用以下命令停止zookeeper服務(wù)器。
$ bin/zkServer.sh stop
在 Zookeeper中,每一個數(shù)據(jù)節(jié)點都是一個 Znode,上圖根目錄下有兩個節(jié)點,分別是:app1和app2,其中app1下面又有三個子節(jié)點所有 Znode!按層次化進行組織,形成這么一顆樹, Znodel的節(jié)點路徑標識方式和 Unix文件系統(tǒng)路徑非常相似,都是由一系列使用斜杠(/)進行分割的路徑表示,開發(fā)人員可以向這個節(jié)點寫入數(shù)據(jù),也可以在這個節(jié)點下面創(chuàng)建子節(jié)點暗膜。
默認端口為 2181
配置偽集群模式
創(chuàng)建 data 文件夾 和 logs 文件夾
clientPort=2181
# 配置快照文件存放的目錄
dataDir=/zkcluster/zookeeper01/data
# 配置日志文件存放的目錄
dataLogDir=/zkcluster/zookeeper01/data/logs
clientPort=2182
dataDir=/zkcluster/zookeeper02/data
dataLogDir=/zkcluster/zookeeper02/data/logs
clientPort=2183
dataDir=/zkcluster/zookeeper03/data
dataLogDir=/zkcluster/zookeeper03/data/logs
data 下創(chuàng)建 myid 文件, 內(nèi)容分別為 1, 2, 3 (數(shù)字可以依次累增), 這個文件的作用就是記錄zk的id
server.服務(wù)器ID=服務(wù)器IP地址:服務(wù)器之間通信端口:服務(wù)器之間投票選舉端口
server.1=10.211.55.4:2881:3881
server.2=10.211.55.4:2882:3882
server.3=10.211.55.4:2883:3883
touch myid
分別向三臺服務(wù)器寫入數(shù)字 1 2 和 3.
啟動集群
分別啟動這三臺服務(wù)器
$ bin/zkServer.sh start
查看狀態(tài)
./zkServer.sh status
Zookeeper基本使用
ZooKeeper系統(tǒng)模型
ZooKeeper數(shù)據(jù)模型Znode在ZooKeeper中匀奏,數(shù)據(jù)信息被保存在一個個數(shù)據(jù)節(jié)點上,這些節(jié)點被稱為 ZNode学搜。ZNode是Zookeeper中最小數(shù)據(jù)單位娃善,在ZNode下面又可以再掛 ZNode,這樣一層層下去就形成了一個層次化命名空間ZNode樹瑞佩,我們稱為ZNode Tree聚磺,它采用了類似文件系統(tǒng)的層級樹狀結(jié)構(gòu)進行管理。見下圖示例:
四種節(jié)點類型
- 持久節(jié)點
- 持久順序節(jié)點
- 臨時節(jié)點
- 臨時順序節(jié)點
事務(wù)ID
ZNode 狀態(tài)信息
ACL
我們可以從三個方面來理解ACL機制:權(quán)限模式( Scheme)炬丸、授權(quán)對象(ID)瘫寝、權(quán)限( Permission),通常使用" scheme:id: permission"來標識一個有效的ACL信息。
權(quán)限模式: Scheme
權(quán)限模式用來確定權(quán)限驗證過程中使用的檢驗策略
授權(quán)對象:ID
授權(quán)對象指的是權(quán)限賦予的用戶或一個指定實體,例如IP地址或是機器等稠炬。在不同的權(quán)限模式下,授權(quán)對象是不同的,表中列出了各個權(quán)限模式和授權(quán)對象之間的對應(yīng)關(guān)系
權(quán)限
權(quán)限就是指那些通過權(quán)限檢査后可以被允許執(zhí)行的操作焕阿。在 Zookeeper中,所有對數(shù)據(jù)的操作權(quán)限分為以下五大類
- CREATE(C):數(shù)據(jù)節(jié)點的創(chuàng)建權(quán)限,允許授權(quán)對象在該數(shù)據(jù)節(jié)點下創(chuàng)建子節(jié)點。
- DELETE(D子節(jié)點的刪除權(quán)限,允許授權(quán)對象刪除該數(shù)據(jù)節(jié)點的子節(jié)點首启。?
- READ(R):數(shù)據(jù)節(jié)點的讀取權(quán)限,允許授權(quán)對象訪問該數(shù)據(jù)節(jié)點并讀取其數(shù)據(jù)內(nèi)容或子節(jié)點列表等暮屡。
- WRTE(W):數(shù)據(jù)節(jié)點的更新權(quán)限,允許授權(quán)對象對該數(shù)據(jù)節(jié)點進行更新操作。
- ADMIN(A):數(shù)據(jù)節(jié)點的管理權(quán)限,允許授權(quán)對象對該數(shù)據(jù)節(jié)點進行ACL相關(guān)的設(shè)置操作毅桃。
創(chuàng)建節(jié)點
使用 creates命令,可以創(chuàng)建一個 Zookeeper節(jié)點,如
create [-s][-e] path data acl
其中,-s
或 -e
分別指定節(jié)點特性,順序或臨時節(jié)點,若不指定,則創(chuàng)建持久節(jié)點;ac1用來進行權(quán)限控制褒纲。
- 創(chuàng)建永久(持久)節(jié)點
使用create /zk-permanent 123
命令創(chuàng)建zk- permanent永久節(jié)點
[zk: Loca thost: 2181(CONNECTED) 1] create/zk-permanent 123
Created/Zk-permanent
[zk: localhost: 2181(CONNECTED) 2] Ls/
[zk-permanent, zookeeper, zk-test00000000041
- 創(chuàng)建持久順序節(jié)點
使用create -s /zk-test 123
命令創(chuàng)建zk-test順序節(jié)點
lzk: Localhost: 2181(CONNECTED)0] create-s/zk-test 123
Created/Zk-testo000000004
執(zhí)行完后,就在根節(jié)點下創(chuàng)建了一個叫做 / zk-test
的節(jié)點,該節(jié)點內(nèi)容就是123,同時可以看到創(chuàng)建的
zk-test 節(jié)點后面添加了一串數(shù)字以示區(qū)別
創(chuàng)建臨時節(jié)點
使用create -e /zk-temp 123
命令創(chuàng)建zk-temp臨時節(jié)點創(chuàng)建臨時順序節(jié)點
create -e -s /zk-temp 123
可以看到永久節(jié)點不同于順序節(jié)點, 不會自動在后面添加一串數(shù)字
quit 退出客戶端
讀取節(jié)點
與讀取相關(guān)的命令有 ls 命令和 get 命令
ls 命令可以列出 Zookeeper指定節(jié)點下的所有子節(jié)點,但只能查看指定節(jié)點下的第一級的所有子節(jié)點;
ls path
其中,path表示的是指定數(shù)據(jù)節(jié)點的節(jié)點路徑
get命令可以獲取 Zookeeper:指定節(jié)點的數(shù)據(jù)內(nèi)容和屬性信息。
get path
若獲取根節(jié)點下面的所有子節(jié)點,使用 ls 命令即可
若想獲取/zk-permanente的數(shù)據(jù)內(nèi)容和屬性,可使用如下命令:get /zk-permanent
更新節(jié)點
使用set命令,可以更新指定節(jié)點的數(shù)據(jù)內(nèi)容,用法如下
set path data [version]
其中,data就是要更新的新內(nèi)容, version表示數(shù)據(jù)版本,在 zookeeper中,節(jié)點的數(shù)據(jù)是有版本概
念的,這個參數(shù)用于指定本次更新操作是基于 Inode的哪一個數(shù)據(jù)版本進行的,如將/zk- permanent節(jié)
點的數(shù)據(jù)更新為455,可以使用如下命令:set /zk-permanent 456
zk: Loca Lhost: 2181( CONNECTED)3] set /zk-permanent 456
Iczxid 0X12
Ctime Sat Mar 07 18: 11: 14 CST 2020
Imzxid =0x13
Mtime =Sat Mar 07 18: 13: 48 CST 2020
Zxid = 0x12
cversion =O
ldataversion 1
laclversion =0
ephemera lowner 0x0
ldatalength =3
numchildren =0
刪除節(jié)點
使用 delete 命令可以刪除 Zookeeper上的指定節(jié)點,用法如下
delete path [version]
其中 version也是表示數(shù)據(jù)版本,使用 delete /zk-permanent
命令即可刪除 zk-permanent節(jié)點
zk 的 Java 客戶端工具
zk 的 Java 客戶端工具 curator
創(chuàng)建節(jié)點
獲取數(shù)據(jù)
// 普通查詢
client.getData().forPath(path);
// 包含狀態(tài)查詢
Stat stat = new Stat();
client.getData().storingStatIn(stat).forPath(path);
更新數(shù)據(jù)
// 普通更新
client.setData().forPath(path,"新內(nèi)容".getBytes());
// 指定版本更新
client.setData().withVersion(1).forPath(path);
刪除數(shù)據(jù)
配置存儲
命名服務(wù)
如果加了排它鎖則只對一個事務(wù)可見, 若加上共享鎖,則對所有事務(wù)可見.
作業(yè)
編程題一:
在基于 Netty 的自定義RPC的案例基礎(chǔ)上钥飞,進行改造莺掠。基于 Zookeeper 實現(xiàn)簡易版服務(wù)的注冊與發(fā)現(xiàn)機制
要求完成改造版本:
- 啟動 2 個服務(wù)端读宙,可以將IP及端口信息自動注冊到 Zookeeper
- 客戶端啟動時彻秆,從Zookeeper中獲取所有服務(wù)提供端節(jié)點信息,客戶端與每一個服務(wù)端都建立連接
- 某個服務(wù)端下線后结闸,Zookeeper注冊列表會自動剔除下線的服務(wù)端節(jié)點掖棉,客戶端與下線的服務(wù)端斷開連接
- 服務(wù)端重新上線,客戶端能感知到膀估,并且與重新上線的服務(wù)端重新建立連接
編程題二:
基于作業(yè)一的基礎(chǔ)上,實現(xiàn)基于 Zookeeper 的簡易版負載均衡策略
要求完成改造版本:
- Zookeeper 記錄每個服務(wù)端的最后一次響應(yīng)時間耻讽,有效時間為 5秒察纯,5s內(nèi)如果該服務(wù)端沒有新的請求,響應(yīng)時間清零或失效
- 當客戶端發(fā)起調(diào)用,每次都選擇最后一次響應(yīng)時間短的服務(wù)端進行服務(wù)調(diào)用饼记,如果時間一致香伴,隨機選取一個服務(wù)端進行調(diào)用,從而實現(xiàn)負載均衡
編程題三:
基于Zookeeper實現(xiàn)簡易版配置中心
要求實現(xiàn)以下功能:
- 創(chuàng)建一個 Web 項目具则,將數(shù)據(jù)庫連接信息交給Zookeeper配置中心管理即纲,即:當項目Web項目啟動時,從 Zookeeper 進行 MySQL 配置參數(shù)的拉取
- 要求項目通過數(shù)據(jù)庫連接池訪問MySQL(連接池可以自由選擇熟悉的)
- 當 Zookeeper 配置信息變化后Web項目自動感知博肋,正確釋放之前連接池低斋,創(chuàng)建新的連接池
作業(yè)資料說明:
1、提供資料:3個代碼工程匪凡、驗證及講解視頻膊畴。(倉庫中只有本次作業(yè)內(nèi)容)
2、講解內(nèi)容包含:題目分析病游、實現(xiàn)思路唇跨、代碼講解。
3衬衬、效果視頻驗證:
3.1 作業(yè)1:服務(wù)端的上線與下線买猖,客戶端能動態(tài)感知,并能重新構(gòu)成負載均衡滋尉。
3.2 作業(yè)2:作業(yè)完成情況下玉控,選擇性能好的服務(wù)器處理(響應(yīng)時間短的服務(wù)器即為性能好)。Zookeeper記錄客戶端響應(yīng)有效時間為5s兼砖,超時判定該客戶端失效奸远。
3.3 作業(yè)3:Zookeeper配置中心,web訪問數(shù)據(jù)庫需要從Zookeeper獲取連接資源讽挟。當Zookeeper配置發(fā)生改變懒叛,web自動切換到新的連接資源,保持正常訪問耽梅。
作業(yè)1
新增 NodeChangeListener 類
public interface NodeChangeListener {
/**
*
* @param serviceName 服務(wù)名稱
* @param serviceList 服務(wù)名稱對應(yīng)節(jié)點下的所有子節(jié)點, 目前沒有用到
* @param pathChildrenCacheEvent
*/
void notify(String serviceName, List<String> serviceList, PathChildrenCacheEvent pathChildrenCacheEvent);
}
將 zk 的行為抽象成接口
public interface RpcRegistryHandler extends NodeChangeListener {
/**
* 服務(wù)端進行調(diào)用
*
* @param service
* @param ip
* @param port
* @return
*/
boolean registry(final String service, final String ip, final int port);
/**
* 客戶端進行調(diào)用
*
* @param service
* @return
*/
List<String> discovery(final String service);
void addListener(NodeChangeListener service);
void destroy();
}
ConfigKeeper 配置類
public class ConfigKeeper {
/**
* netty 的端口號
*/
private int nettyPort;
/**
* zk 地址: ip + 端口
*/
private String zkAddr;
/**
* 主動上報時間薛窥,單位 秒
*/
private int interval;
/**
* 區(qū)分是客戶端 還是 server 端, true 是服務(wù)端眼姐, false 是客戶端
*/
private boolean providerSide;
// 單例類诅迷,setter 和 getter 方法
}
新增 RpcResponse 類
package com.lagou;
public class RpcResponse {
/**
* 響應(yīng)ID
*/
private String requestId;
/**
* 錯誤信息
*/
private String error;
/**
* 返回的結(jié)果
*/
private Object result;
// setter 和 getter 方法, toString方法
}
RpcServerHandler 類,這次主要對 channelRead 的方法內(nèi)容作了調(diào)整
/**
* 服務(wù)端將數(shù)據(jù) 寫入 客戶端众旗, 繼續(xù)傳遞下去
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 222222
RpcRequest request = (RpcRequest) msg;
final RpcResponse rpcResponse = new RpcResponse();
rpcResponse.setRequestId(request.getRequestId());
System.out.println("111 接收到" + request.getRequestId());
rpcResponse.setResult(handler(request));
// 3333333
ctx.writeAndFlush(rpcResponse);
}
服務(wù)端 rpc -server 用到的配置類 RpcServerConfig
public class RpcServerConfig {
private String nettyHost;
private int nettyPort;
private int delay;
/**
* 是否是服務(wù)端
*/
private boolean providerSide;
/**
* 應(yīng)用的名稱
*/
private String applicationName;
private Map<String, Class> services;
// setter 和 getter 方法
}
RpcServer 類
自身 implements InitializingBean, DisposableBean 接口
Autowired 了 RpcRegistryFactory 對象
主要關(guān)注 afterPropertiesSet 和 destroy 方法即可
@Override
public void afterPropertiesSet() throws Exception {
this.initRpcServerConfig();
this.startServer();
}
@Override
public void destroy() {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
這里牽涉到了RpcRegistryFactory
/**
* 注冊中心工廠類
*/
@Component
public class RpcRegistryFactory implements FactoryBean<RpcRegistryHandler>, DisposableBean {
private RpcRegistryHandler rpcRegistryHandler;
@Override
public RpcRegistryHandler getObject() {
if (this.rpcRegistryHandler == null) {
rpcRegistryHandler = new ZkRegistryHandler(ConfigKeeper.getInstance().getZkAddr());
}
return rpcRegistryHandler;
}
@Override
public Class<?> getObjectType() {
System.out.println("RpcRegistryFactory ### getObjectType.....");
return RpcRegistryHandler.class;
}
@Override
public void destroy() {
System.out.println("RpcRegistryFactory ### destroy.....");
rpcRegistryHandler.destroy();
}
}
用于設(shè)計參數(shù)的 ProviderLoader
public class ProviderLoader {
private ProviderLoader() {
}
/**
* 返回類的全路徑名 -> 該類的 class
* @return
*/
public static Map<String, Class> getInstanceCacheMap() {
Map<String, Class> services = new HashMap<>();
services.put(IUserService.class.getName(), IUserService.class);
return services;
}
}
ZkRegistryHandler 是對 RpcRegistryHandler 接口的 zk 實現(xiàn)罢杉。
public class ZkRegistryHandler implements RpcRegistryHandler {
private static final String ZK_PATH_SPLITER = "/";
private static final String LAGOU_EDU_RPC_ZK_ROOT = ZK_PATH_SPLITER + "lg-rpc-provider" + ZK_PATH_SPLITER;
private List<NodeChangeListener> listenerList = new ArrayList<>();
private final String url;
private final CuratorFramework client;
private volatile boolean closed;
/**
* 子節(jié)點列表
*/
private List<String> serviceList;
private static final ScheduledExecutorService REPORT_WORKER = Executors.newScheduledThreadPool(5);
public ZkRegistryHandler(final String zkPath) {
url = zkPath;
this.client = CuratorFrameworkFactory.builder()
.connectString(zkPath)
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.build();
client.getConnectionStateListenable().addListener(new ConnectionStateListener() {
@Override
public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
if (ConnectionState.CONNECTED.equals(connectionState)) {
System.out.println("注冊中心連接成功");
}
}
});
client.start();
// 定時上報
final ConfigKeeper configKeeper = ConfigKeeper.getInstance();
final boolean providerSide = configKeeper.isProviderSide();
final int interval = configKeeper.getInterval();
if (!providerSide && interval > 0) {
REPORT_WORKER.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
System.out.println("我是一個定時任務(wù)");
// RequestMetri
}
}, interval, interval, TimeUnit.SECONDS);
}
}
/**
* 服務(wù)端注冊用到的方法
* @param serviceName
* @param nettyHost
* @param nettyPort
* @return
*/
@Override
public boolean registry(final String serviceName, final String nettyHost, final int nettyPort) {
String zkPath = providePath(serviceName);
if (!exists(zkPath)) {
create(zkPath, false);
}
// /lg-rpc-provider/com.lagou.server.IUserService/provider/localhost:8999
String instancePath = zkPath + ZK_PATH_SPLITER + nettyHost + ":" + nettyPort;
create(instancePath, true);
return true;
}
/**
* 客戶端查找服務(wù)的方法
*
* @param serviceName
* @return
*/
@Override
public List<String> discovery(final String serviceName) {
final String path = providePath(serviceName);
if (serviceList == null || serviceList.isEmpty()) {
System.out.println("首次查找地址");
try {
serviceList = client.getChildren().forPath(path);
} catch (Exception e) {
e.printStackTrace();
}
}
this.registryWatch(serviceName, path);
return serviceList;
}
@Override
public void addListener(NodeChangeListener listener) {
listenerList.add(listener);
}
@Override
public void destroy() {
client.close();
}
@Override
public void notify(String children, List<String> serviceList, PathChildrenCacheEvent pathChildrenCacheEvent) {
for (NodeChangeListener nodeChangeListener : listenerList) {
nodeChangeListener.notify(children, serviceList, pathChildrenCacheEvent);
}
}
private void create(final String path, final boolean ephemeral) {
CreateMode createMode;
if (ephemeral) {
createMode = CreateMode.EPHEMERAL;
} else {
createMode = CreateMode.PERSISTENT;
}
try {
client.create().creatingParentsIfNeeded().withMode(createMode).forPath(path);
} catch (KeeperException.NodeExistsException e) {
// do nothing
System.out.println("該路徑已存在" + path);
}
catch (Exception e) {
e.printStackTrace();
throw new RuntimeException(e);
}
}
private boolean exists(final String path) {
try {
if (client.checkExists().forPath(path) != null) {
return true;
}
} catch (KeeperException.NoNodeException e) {
// do nothing
} catch (Exception e) {
throw new RuntimeException(e);
}
return false;
}
/**
* 設(shè)置監(jiān)聽的方法
*
* @param serviceName
* @param path
*/
private void registryWatch(final String serviceName, final String path) {
PathChildrenCache nodeCache = new PathChildrenCache(client, path, true);
try {
nodeCache.getListenable().addListener((client, pathChildrenCacheEvent) -> {
// 更新本地緩存
serviceList = client.getChildren().forPath(path);
listenerList.forEach(nodeChangeListener -> {
System.out.println("節(jié)點變化");
nodeChangeListener.notify(serviceName, serviceList, pathChildrenCacheEvent);
});
});
nodeCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
} catch (Exception e) {
}
}
/**
* 返回 /lg-rpc-provider/com.lagou.server.IUserService/provider
* @param serviceName
* @return
*/
private String providePath(String serviceName) {
return LAGOU_EDU_RPC_ZK_ROOT + serviceName + ZK_PATH_SPLITER + "provider";
}
private String metricsPath() {
return LAGOU_EDU_RPC_ZK_ROOT + "metrics";
}
}
rpc-server 的啟動類
@SpringBootApplication
public class MyApplication {
public static void main(String[] args) {
// ["localhost:2181", "8999"]
// ["localhost:2181", "9000"]
final String zkPath = args[0];
final int nettyPort = Integer.parseInt(args[1]);
// 將IP及端口信息自動注冊到 Zookeeper
ConfigKeeper configKeeper = ConfigKeeper.getInstance();
configKeeper.setProviderSide(true);
configKeeper.setInterval(5);
configKeeper.setNettyPort(nettyPort);
configKeeper.setZkAddr(zkPath);
System.out.println(configKeeper);
SpringApplication.run(MyApplication.class, args);
// 可以通過 ls /lg-rpc-provider/com.lagou.server.IUserService/provider 查看節(jié)點信息
}
}
分別為 netty 啟動 8888 和 8900 端口
接下來講解 rpc-client
UserClientHandler 類可以復(fù)用
新增 RpcClient
主要對外暴露了 initClient 和 send 方法
// 2. 初始化netty客戶端(創(chuàng)建連接池 bootstrap, 設(shè)置 BootStrap 連接服務(wù)器)
public void initClient(String serviceClassName) throws InterruptedException {
// 創(chuàng)建連接池
this.group = new NioEventLoopGroup();
// 創(chuàng)建客戶端啟動類
Bootstrap bootstrap = new Bootstrap();
// 配置啟動引導(dǎo)類
bootstrap.group(group)
// 通道類型為 NIO
.channel(NioSocketChannel.class)
// 設(shè)置請求協(xié)議為 tcp
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
// 監(jiān)聽 channel 并初始化
.handler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) throws Exception {
// 獲取管道對象
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new RpcEncoder(RpcRequest.class, new JSONSerializer()));
pipeline.addLast(new RpcDecoder(RpcResponse.class, new JSONSerializer()));
// 自定義事件處理器
pipeline.addLast(new UserClientHandler());
}
});
this.channel = bootstrap.connect(this.nettyIp, this.nettyPort).sync().channel();
if (!isValidate()) {
close();
return;
}
System.out.println("啟動客戶端" + serviceClassName + ", ip = " + this.nettyIp + ", port = " + nettyPort);
}
public Object send(RpcRequest request) throws InterruptedException, ExecutionException {
// 統(tǒng)計請求時間
RequestMetrics.getInstance().put(nettyIp, this.nettyPort, request.getRequestId());
return this.channel.writeAndFlush(request).sync().get();
}
新增 RpcConsumer 類
主要有用的方法有構(gòu)造方法,createProxy的方法贡歧,notify為類自身 實現(xiàn) NodeChangeListener 接口的方法(因為構(gòu)造時會this.rpcRegistryHandler.addListener(this);)滩租。
public class RpcConsumer implements NodeChangeListener {
private final RpcRegistryHandler rpcRegistryHandler;
private final Map<String, Class> serviceMap;
/**
* 服務(wù)名 -> List<RpcClient>
*/
private final Map<String, List<RpcClient>> CLIENT_POOL = new HashMap<>();
private LoadBalanceStrategy balanceStrategy = new RandomLoadBalance();
/**
* 初始化
* @param rpcRegistryHandler
* @param instanceCacheMap
*/
public RpcConsumer(final RpcRegistryHandler rpcRegistryHandler, final Map<String, Class> instanceCacheMap) {
this.rpcRegistryHandler = rpcRegistryHandler;
this.serviceMap = instanceCacheMap;
// 開始自動注冊消費者邏輯: accept 方法
serviceMap.forEach((className, clazz) -> {
List<RpcClient> rpcClients = CLIENT_POOL.get(className);
if (rpcClients == null) {
rpcClients = new ArrayList<>();
}
// 127.0.0.1:8999 127.0.0.1:9000
final List<String> discovery = this.rpcRegistryHandler.discovery(className);
for (String s : discovery) {
// s -> rpcClient
final String[] split = s.split(":");
String nettyIp = split[0];
int nettyPort = Integer.parseInt(split[1]);
final RpcClient rpcClient = new RpcClient(nettyIp, nettyPort);
try {
rpcClient.initClient(className);
} catch (InterruptedException e) {
e.printStackTrace();
}
rpcClients.add(rpcClient);
CLIENT_POOL.put(className, rpcClients);
}
});
this.rpcRegistryHandler.addListener(this);
}
// 4. 編寫一個方法赋秀,使用 jdk 動態(tài)代理對象
@SuppressWarnings("unchecked")
public <T> T createProxyEnhance(final Class<T> serverClass) {
return (T) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
new Class[]{serverClass}, (proxy, method, args) -> {
final String serverClassName = serverClass.getName();
// 封裝
final RpcRequest request = new RpcRequest();
final String requestId = UUID.randomUUID().toString().substring(0, 7);
String className = method.getDeclaringClass().getName();
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
request.setRequestId(requestId);
request.setClassName(className);
request.setMethodName(methodName);
request.setParameterTypes(parameterTypes);
request.setParameters(args);
System.out.println("******************************\n請求id = " + requestId + ", 請求方法名 = " + methodName + ", 請求參數(shù) = " + Arrays.toString(args));
RpcClient rpcClient = balanceStrategy.route(CLIENT_POOL, serverClassName);
if (rpcClient == null) {
System.out.println("沒找到對應(yīng)服務(wù)端,返 NULL");
return null;
}
System.out.println(request);
// request 最終會客戶端發(fā)送給服務(wù)端進行消費
return rpcClient.send(request);
});
}
/**
* 監(jiān)聽臨時節(jié)點的變化
*
* @param service 服務(wù)名稱
* @param serviceList 服務(wù)名稱對應(yīng)節(jié)點下的所有子節(jié)點
* @param pathChildrenCacheEvent
*/
@Override
public void notify(final String service, final List<String> serviceList,
final PathChildrenCacheEvent pathChildrenCacheEvent) {
// 取出變化的節(jié)點名稱律想, 例如為 /lg-rpc-provider/com.lagou.server.IUserService/provider/localhost:9000
final String path = pathChildrenCacheEvent.getData().getPath();
System.out.println("變化節(jié)點的路徑: " + path + ", 變化的類型: " + pathChildrenCacheEvent.getType());
// 分離出 ip:port 的組合猎莲。
final String instanceConfig = path.substring(path.lastIndexOf("/") + 1);
System.out.println("instanceConfig: " + instanceConfig);
final String[] address = instanceConfig.split(":");
System.out.println("address: " + address);
final String nettyIp = address[0];
final int nettyPort = Integer.parseInt(address[1]);
List<RpcClient> rpcClients = CLIENT_POOL.get(service);
switch (pathChildrenCacheEvent.getType()) {
// 增加節(jié)點
case CHILD_ADDED:
case CONNECTION_RECONNECTED:
if (rpcClients == null) {
rpcClients = new ArrayList<>();
}
final RpcClient rpcClient = new RpcClient(nettyIp, nettyPort);
try {
rpcClient.initClient(service);
} catch (InterruptedException e) {
e.printStackTrace();
}
rpcClients.add(rpcClient);
// 節(jié)點耗時統(tǒng)計
RequestMetrics.getInstance().addNode(nettyIp, nettyPort);
System.out.println("新增節(jié)點" + instanceConfig);
break;
// 增加節(jié)點
case CHILD_REMOVED:
case CONNECTION_SUSPENDED:
case CONNECTION_LOST:
if (rpcClients != null) {
for (RpcClient client : rpcClients) {
if (client.getNettyIp().equals(nettyIp) && client.getNettyPort() == nettyPort) {
rpcClients.remove(client);
// 節(jié)點耗時統(tǒng)計
RequestMetrics.getInstance().remoteNode(nettyIp, nettyPort);
System.out.println("移除節(jié)點" + instanceConfig);
break;
}
}
}
break;
}
}
}
最后再講講 ConsumerBootStrap, 該類基本沒啥改動
public class ConsumerBootStrap {
public static void main(final String[] args) throws Exception {
final ConfigKeeper configKeeper = ConfigKeeper.getInstance();
configKeeper.setZkAddr(args[0]);
// 之后會啟動一個定時的線程池,每 5s 上傳到注冊中心
configKeeper.setInterval(5);
configKeeper.setProviderSide(false);
final RpcRegistryHandler rpcRegistryHandler = new ZkRegistryHandler(configKeeper.getZkAddr());
System.out.println("客戶端 Zookeeper session established.");
// 最后一步
final RpcConsumer consumer = new RpcConsumer(rpcRegistryHandler, ProviderLoader.getInstanceCacheMap());
final IUserService userService = consumer.createProxyEnhance(IUserService.class);
while (true) {
Thread.sleep(4900);
final String result = userService.sayHello("are you ok?");
// 恒為 null
System.out.println("返回 = " + result);
}
}
}
作業(yè)2
1.消費者每次請求完成時更新最后一次請求耗時和系統(tǒng)時間
這部分工作主要在客戶端做技即。
首先介紹一下這次用到的兩個類
package com.lagou.boot;
public class Metrics {
private String nettyIp;
private int nettyPort;
private long start;
private Long cost;
public Metrics(String nettyIp, int nettyPort, long start, Long cost) {
this.nettyIp = nettyIp;
this.nettyPort = nettyPort;
this.start = start;
this.cost = cost;
}
public Metrics(String nettyIp, int nettyPort, long start) {
this(nettyIp, nettyPort, start, null);
}
public String getNettyIp() {
return nettyIp;
}
public void setNettyIp(String nettyIp) {
this.nettyIp = nettyIp;
}
public int getNettyPort() {
return nettyPort;
}
public void setNettyPort(int nettyPort) {
this.nettyPort = nettyPort;
}
public long getStart() {
return start;
}
public void setStart(long start) {
this.start = start;
}
public Long getCost() {
return cost;
}
public void setCost(Long cost) {
this.cost = cost;
}
}
RequestMetrics 類
COST_TIME_MAP變量 ip:端口 -》 耗時
REQUEST_ID_MAP變量 requestId -> ip + 端口 + 起始時間戳 + 耗時
calculate 方法用于 根據(jù)requestId 進行耗時統(tǒng)計
統(tǒng)計請求時間 在RpcClient的 send 方法中進行
public class RequestMetrics {
/**
* ip:端口 -》 耗時
*/
private static final ConcurrentHashMap<String, Long> COST_TIME_MAP = new ConcurrentHashMap<>();
/**
* requestId -> ip + 端口 + 起始時間戳 + 耗時
* 每個 requestId 用完一次后就會被銷毀
*/
private static final ConcurrentHashMap<String, Metrics> REQUEST_ID_MAP = new ConcurrentHashMap<>();
private static final RequestMetrics requestMetrics = new RequestMetrics();
public ConcurrentHashMap<String, Long> getMetricMap() {
return COST_TIME_MAP;
}
private RequestMetrics() {
}
public static RequestMetrics getInstance() {
return requestMetrics;
}
public void addNode(String nettyIp, int nettyPort) {
COST_TIME_MAP.put(nettyIp + ":" + nettyPort, 0L);
}
public void remoteNode(String nettyIp, int nettyPort) {
COST_TIME_MAP.remove(nettyIp + ":" + nettyPort);
}
/**
* 響應(yīng)時放入著洼, 根據(jù)requestId 進行耗時統(tǒng)計
* @param requestId
*/
public void calculate(String requestId) {
final Metrics metrics = REQUEST_ID_MAP.get(requestId);
Long cost = System.currentTimeMillis() - metrics.getStart();
COST_TIME_MAP.put(metrics.getNettyIp() + ":" + metrics.getNettyPort(), cost);
REQUEST_ID_MAP.remove(requestId);
}
/**
* 獲取所有節(jié)點耗時統(tǒng)計
*/
public List<Metrics> getAllInstances() {
List<Metrics> result = new ArrayList<>();
COST_TIME_MAP.forEach((url, aLong) -> {
String[] split = url.split(":");
result.add(new Metrics(split[0], Integer.parseInt(split[1]), aLong));
});
return result;
}
/**
* 請求時放入
* @param nettyIp
* @param nettyPort
* @param requestId
*/
public void put(String nettyIp, int nettyPort, String requestId) {
REQUEST_ID_MAP.put(requestId, new Metrics(nettyIp, nettyPort, System.currentTimeMillis(), null));
}
}
2.消費者定時在啟動時創(chuàng)建定時線程池,每隔5s自動上報而叼,更新Zookeeper臨時節(jié)點的值
ConsumerBootStrap 入口有一個參數(shù)配置
// 之后會啟動一個定時的線程池身笤,每 5s 上傳到注冊中心
configKeeper.setInterval(5);
ZkRegistryHandler 會開啟一個 ScheduledExecutorService 線程池服務(wù)
RequestMetrics 的
// 定時上報
final ConfigKeeper configKeeper = ConfigKeeper.getInstance();
final boolean providerSide = configKeeper.isProviderSide();
final int interval = configKeeper.getInterval();
if (!providerSide && interval > 0) {
REPORT_WORKER.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
System.out.println("我是一個定時任務(wù)");
// ...
}
}, interval, interval, TimeUnit.SECONDS);
}
- 每次上報時判斷當前時間距離最后一次請求是否超過5s,超過5s則需要刪除Zookeeper上面的內(nèi)容
這里介紹下 RequestMetrics 的 getAllInstances() 方法, 如果 5 秒內(nèi)沒有響應(yīng)清空請求時間澈歉。
/**
* 獲取所有節(jié)點耗時統(tǒng)計
*/
public List<Metrics> getAllInstances() {
List<Metrics> result = new ArrayList<>();
COST_TIME_MAP.forEach((url, aLong) -> {
String[] split = url.split(":");
result.add(new Metrics(split[0], Integer.parseInt(split[1]), aLong));
});
return result;
}
接下來簡單說一下負載均衡策略展鸡,這里主要涉及到了使用那個客戶端進行服務(wù)的請求。
public abstract class AbstractLoadBalanceStrategy implements LoadBalanceStrategy{
@Override
public RpcClient route(Map<String, List<RpcClient>> clientPool, String serverClassName) {
List<RpcClient> rpcClients = clientPool.get(serverClassName);
if (null == rpcClients) return null;
return doSelect(rpcClients);
}
protected abstract RpcClient doSelect(List<RpcClient> rpcClients);
}
MinCostLoadBalance (該類未經(jīng)驗證)
public class MinCostLoadBalance extends AbstractLoadBalanceStrategy {
@Override
protected RpcClient doSelect(final List<RpcClient> rpcClients) {
ConcurrentHashMap<String, Long> metricMap = RequestMetrics.getInstance().getMetricMap();
RpcClient minCostRpcClient = rpcClients.get(0);
final Long minLong = metricMap.get(minCostRpcClient.getNettyIp() + minCostRpcClient.getNettyPort());
for (int i = 1; i < rpcClients.size(); i++) {
RpcClient rpcClient = rpcClients.get(i);
String nettyIp = rpcClient.getNettyIp();
int nettyPort = rpcClient.getNettyPort();
// 取出最小響應(yīng)時間的客戶端埃难,并進行調(diào)用
Long aLong = metricMap.get(nettyIp + nettyPort);
if (aLong != null && aLong < minLong) {
minCostRpcClient = rpcClient;
}
}
return minCostRpcClient;
}
}
RandomLoadBalance (該類未經(jīng)驗證)
public class RandomLoadBalance extends AbstractLoadBalanceStrategy {
private final Random random = new Random();
@Override
protected RpcClient doSelect(List<RpcClient> rpcClients) {
int size = rpcClients.size();
int index = random.nextInt(size);
return rpcClients.get(index);
}
}
作業(yè)3
以下項目主要使用了 commons-dbcp + fastjson + apache.curator 技術(shù)進行實現(xiàn)莹弊。
這里會通過create [-s][-e] path data acl
命令創(chuàng)建節(jié)點:
建立所需節(jié)點
我會將數(shù)據(jù)庫配置的用戶名和密碼等信息寫入/dbConfig/lagou.config.DbConfig
節(jié)點中。
先建立父節(jié)點
create /dbConfig ""
然后若不存在臨時節(jié)點則重新創(chuàng)建
# 向 /dbConfig/lagou.config.DbConfig 中寫入配置信息
create -e /dbConfig/lagou.config.DbConfig {"username":"root","password":"123456","url":"jdbc:mysql://localhost:3306/aaaa?serverTimezone=UTC"}
# 查看是否能正常獲取節(jié)點信息
get /dbConfig/lagou.config.DbConfig
更改數(shù)據(jù)
# 更改數(shù)據(jù)庫為 aaaa
set /dbConfig/lagou.config.DbConfig {"username":"root","password":"123456","url":"jdbc:mysql://localhost:3306/aaaa?serverTimezone=UTC"}
# 更改數(shù)據(jù)庫為 bbbb
set /dbConfig/lagou.config.DbConfig {"username":"root","password":"123456","url":"jdbc:mysql://localhost:3306/bbbb?serverTimezone=UTC"}
創(chuàng)建Java類
- 創(chuàng)建工具類 RuntimeContext
@Component
public class RuntimeContext implements ApplicationContextAware {
private static ApplicationContext applicationContext = null;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
if (RuntimeContext.applicationContext == null) {
RuntimeContext.applicationContext = applicationContext;
}
}
//獲取applicationContext
private static ApplicationContext getApplicationContext() {
return applicationContext;
}
//通過name獲取Bean
public static Object getBean(String name) {
return getApplicationContext().getBean(name);
}
// ...
}
- 實體類
package lagou.config;
public class DbConfig {
private String url;
private String username;
private String password;
// setter getter 方法
}
- 創(chuàng)建 MyDataSource 自定義數(shù)據(jù)源涡尘,我們重寫的dbcp中的類BasicDataSource忍弛,我們將其全部拷貝了出來,然后重命名為MyDataSource類考抄,然后在其中修改了以下內(nèi)容
3.1將 UNKNOWN_TRANSACTIONISOLATION 的值改為 -1. 否則這個內(nèi)部變量會找不到
/**
* The default TransactionIsolation state of connections created by this pool.
*/
protected volatile int defaultTransactionIsolation = PoolableConnectionFactory.UNKNOWN_TRANSACTIONISOLATION;
3.2jdk 1.7 之后需要實現(xiàn)該方法 getParentLogger()
@Override
public Logger getParentLogger() throws SQLFeatureNotSupportedException {
return null;
}
3.3 修改已有的 createDataSource() 方法细疚,刪除這幾行代碼
3.4 新增 changeDataSource 方法
public static void changeDataSource() {
MyDataSource dataSource = (MyDataSource) RuntimeContext.getBean("dataSource");
try {
dataSource.close();
dataSource.createDataSource();
} catch (SQLException e) {
e.printStackTrace();
}
}
- 新建 InitListener 類,該類實現(xiàn)了ServletContextListener 來對Zookeeper節(jié)點 /db/url 的監(jiān)聽
public class InitListener implements ServletContextListener {
private static final String CONNENT_ADDR = "localhost:2181";
private static final String PATH = "/dbConfig";
private static final String SUB_PATH = PATH + "/" + DbConfig.class.getName();
@Override
public void contextInitialized(ServletContextEvent sce) {
CuratorFramework curatorFramework = CuratorFrameworkFactory.builder()
.connectString(CONNENT_ADDR)
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.build();
curatorFramework.start();
PathChildrenCache nodeCache = new PathChildrenCache(curatorFramework, "/dbConfig", true);
try {
nodeCache.getListenable().addListener((client, pathChildrenCacheEvent) -> {
System.out.println(pathChildrenCacheEvent.getType());
if (PathChildrenCacheEvent.Type.CHILD_UPDATED.equals(pathChildrenCacheEvent.getType())) {
final ChildData data = pathChildrenCacheEvent.getData();
if (data != null) {
final String path = data.getPath();
System.out.println(path);
System.out.println(SUB_PATH);
if (path.equals(SUB_PATH)) {
MyDataSource datasource = (MyDataSource) RuntimeContext.getBean("dataSource");
final DbConfig dbConfig = JSON.parseObject(new String(data.getData()), DbConfig.class);
System.out.println(dbConfig.toString());
datasource.setUrl(dbConfig.getUrl());
datasource.setUsername(dbConfig.getUsername());
datasource.setPassword(dbConfig.getPassword());
MyDataSource.changeDataSource();
}
}
}
});
nodeCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void contextDestroyed(ServletContextEvent sce) {
}
}
- 修改spring boot 啟動類川梅,注冊 InitListener 疯兼,配置 我們自定義的 DataSource,建立一個query方法(可通過/query 進行訪問)暴露出去贫途。
@SpringBootApplication
@RestController
public class MyApplication {
public static void main(String[] args) {
SpringApplication.run(MyApplication.class, args);
}
@RequestMapping("/query")
public String query() {
String sql = "select id from `info` limit 1";
return jdbcTemplate.queryForObject(sql, String.class);
}
@Bean
public ServletListenerRegistrationBean servletListenerRegistrationBean() {
ServletListenerRegistrationBean servletListenerRegistrationBean = new ServletListenerRegistrationBean();
servletListenerRegistrationBean.setListener(new InitListener());
return servletListenerRegistrationBean;
}
@Bean
public DataSource dataSource(@Value("${spring.datasource.url}") String url,
@Value("${spring.datasource.username}") String username,
@Value("${spring.datasource.password}") String password) {
MyDataSource dataSource = new MyDataSource();
dataSource.setUrl(url);
dataSource.setUsername(username);
dataSource.setPassword(password);
return dataSource;
}
@Autowired
private JdbcTemplate jdbcTemplate;
}
6.application.properties 配置
server.port=80
spring.datasource.url=jdbc:mysql://localhost:3306/aaaa?serverTimezone=GMT%2B8
spring.datasource.username=root
spring.datasource.password=123456
參考
基于Zookeeper動態(tài)切換數(shù)據(jù)源_BXS_0107的博客-CSDN博客
https://blog.csdn.net/newbie0107/article/details/105500579