lagou 爪哇 3-2 zookeeper 筆記

Zookeeper 簡介

分布式系統(tǒng)的協(xié)調(diào)工作就是通過某種方式蛛壳,讓每個節(jié)點的信息能夠同步和共享。這依賴于服務(wù)進程之間的通信所刀。通信方式有兩種:

  • 通過網(wǎng)絡(luò)進行信息共享
    這就像現(xiàn)實中衙荐,開發(fā)leader在會上把任務(wù)傳達下去,組員通過聽leader命令或者看leader的郵件知道自己要干什么浮创。當任務(wù)分配有變化時忧吟,leader會單獨告訴組員,或者再次召開會議斩披。信息通過人與人之間的直接溝通溜族,完成傳遞。

  • 通過共享存儲
    這就好比開發(fā)leader按照約定的時間和路徑垦沉,把任務(wù)分配表放到了svn上煌抒,組員每天去svn上拉取最新的任務(wù)分配表,然后干活乡话。其中svn就是共享存儲摧玫。更好一點的做法是,當svn文件版本更新時,觸發(fā)郵件通知诬像,每個組員再去拉取最新的任務(wù)分配表屋群。這樣做更好,因為每次更新坏挠,組員都能第一時間得到消
    息芍躏,從而讓自己手中的任務(wù)分配表永遠是最新的。此種方式依賴于中央存儲降狠。

ZooKeeper如何解決分布式系統(tǒng)面臨的問題
ZooKeeper對分布式系統(tǒng)的協(xié)調(diào)对竣,使用的是第二種方式,共享存儲榜配。其實共享存儲否纬,分布式應(yīng)用也需要
和存儲進行網(wǎng)絡(luò)通信。

注:Slave節(jié)點要想獲取ZooKeeper的更新通知蛋褥,需事先在關(guān)心的數(shù)據(jù)節(jié)點上設(shè)置觀察點临燃。

大多數(shù)分布式系統(tǒng)中出現(xiàn)的問題募闲,都源于信息的共享出了問題搓逾。如果各個節(jié)點間信息不能及時共享和同步款票,那么就會在協(xié)作過程中產(chǎn)生各種問題焰情。ZooKeeper解決協(xié)同問題的關(guān)鍵,就是在于保證分布式系統(tǒng)信息的一致性茶宵。

zookeeper的基本概念

Zookeeper是一個開源的分布式協(xié)調(diào)服務(wù)倍踪,其設(shè)計目標是將那些復(fù)雜的且容易出錯的分布式一致性服務(wù)封裝起來梯捕,構(gòu)成一個高效可靠的原語集匙瘪,并以一些簡單的接口提供給用戶使用铆铆。zookeeper是一個典型的分布式數(shù)據(jù)一致性的解決方案,分布式應(yīng)用程序可以基于它實現(xiàn)諸如數(shù)據(jù)訂閱/發(fā)布辆苔、負載均衡算灸、命名服務(wù)、集群管理驻啤、分布式鎖和分布式隊列等功能
基本概念

①集群角色
通常在分布式系統(tǒng)中,構(gòu)成一個集群的每一臺機器都有自己的角色荐吵,最典型的集群就是Master/Slave模式(主備模式)骑冗,此情況下把所有能夠處理寫操作的機器稱為Master機器,把所有通過異步復(fù)制方式獲取最新數(shù)據(jù)先煎,并提供讀服務(wù)的機器為Slave機器贼涩。

而在Zookeeper中,這些概念被顛覆了薯蝎。它沒有沿用傳遞的Master/Slave概念遥倦,而是引入了Leader、Follower、Observer三種角色袒哥。Zookeeper集群中的所有機器通過Leader選舉來選定一臺被稱為Leader的機器缩筛,Leader服務(wù)器為客戶端提供讀和寫服務(wù),除Leader外堡称,其他機器包括Follower和Observer,Follower和Observer都能提供讀服務(wù)瞎抛,唯一的區(qū)別在于Observer不參與Leader選舉過程,不參與寫操作的過半寫成功策略却紧,因此Observer可以在不影響寫性能的情況下提升集群的性能桐臊。

②會話(session)
Session指客戶端會話,一個客戶端連接是指客戶端和服務(wù)端之間的一個TCP長連接, Zookeeper對外的服務(wù)端口默認為2181,客戶端啟動的時候,首先會與服務(wù)器建立一個TCP連接,從第一次連接建立開始,客戶端會話的生命周期也開始了,通過這個連接,客戶端能夠心跳檢測與服務(wù)器保持有效的會話,也能夠向 Zookeeper服務(wù)器發(fā)送請求并接受響應(yīng),同時還能夠通過該連接接受來自服務(wù)器的 Watch事件通知。

③數(shù)據(jù)節(jié)點(Znode)
在談到分布式的時候,我們通常說的“節(jié)點”是指組成集群的每一臺機器晓殊。然而,在 ZooKeeper中,“節(jié)點分為兩類,第一類同樣是指構(gòu)成集群的機器,我們稱之為機器節(jié)點;第二類則是指數(shù)據(jù)模型中的數(shù)據(jù)單元,我們稱之為數(shù)據(jù)節(jié)點——ZNode断凶。 ZooKeeper將所有數(shù)據(jù)存儲在內(nèi)存中,數(shù)據(jù)模型是一棵樹
(ZNode Tree),由斜杠(/)進行分割的路徑,就是一個node,例如app/path.每個node上都會保存自己的數(shù)據(jù)內(nèi)容,同時還會保存一系列屬性信息。

④版本
剛剛我們提到巫俺,Zookeeper的每個Znode上都會存儲數(shù)據(jù)认烁,對于每個ZNode,Zookeeper都會為其維護一個叫作 Stat 的數(shù)據(jù)結(jié)構(gòu)识藤,Stat記錄了這個ZNode的三個數(shù)據(jù)版本砚著,分別是version(當前ZNode的版本)、cversion(當前ZNode子節(jié)點的版本)痴昧、aversion(當前ZNode的ACL版本)稽穆。

⑤Watcher(事件監(jiān)聽器)
Wathcer(事件監(jiān)聽器),是Zookeeper中一個很重要的特性赶撰,Zookeeper允許用戶在指定節(jié)點上注冊一些Watcher舌镶,并且在一些特定事件觸發(fā)的時候,Zookeeper服務(wù)端會將事件通知到感興趣的客戶端豪娜,該機制是Zookeeper實現(xiàn)分布式協(xié)調(diào)服務(wù)的重要特性

⑥ ACL
Zookeeper采用ACL(Access Control Lists)策略來進行權(quán)限控制餐胀,其定義了如下五種權(quán)限:
·CREATE:創(chuàng)建子節(jié)點的權(quán)限。
·READ:獲取節(jié)點數(shù)據(jù)和子節(jié)點列表的權(quán)限瘤载。
·WRITE:更新節(jié)點數(shù)據(jù)的權(quán)限否灾。
·DELETE:刪除子節(jié)點的權(quán)限。
·ADMIN:設(shè)置節(jié)點ACL的權(quán)限鸣奔。
其中需要注意的是墨技,CREATE和DELETE這兩種權(quán)限都是針對子節(jié)點的權(quán)限控制

環(huán)境搭建

Zookeeper的搭建方式

Zookeeper安裝方式有三種,單機模式和集群模式以及偽集群模式挎狸。

  • 單機模式:Zookeeper只運行在一臺服務(wù)器上扣汪,適合測試環(huán)境;
  • 集群模式:Zookeeper運行于一個集群上锨匆,適合生產(chǎn)環(huán)境崭别,這個計算機集群被稱為一個“集合體”
  • 偽集群模式:就是在一臺服務(wù)器上運行多個Zookeeper實例;

單機模式搭建:
zookeeper安裝以linux環(huán)境為例:

1、下載
首先我們下載穩(wěn)定版本的zookeeper http://zookeeper.apache.org/releases.html

配置單節(jié)點

$ tar -zxf zookeeper-3.4.6.tar.gz
$ cd zookeeper-3.4.6
$ mkdir data

cd conf
mv zoo_sample.cfg zoo.cfg

vi conf/zoo.cfg
編輯文件設(shè)置 dataDir = /path/to/zookeeper/data

$ bin/zkServer.sh start

啟動CLI

$ bin/zkCli.sh
$ bin/zkCli.sh  -server  需要連接的ip:需要連接的port

windows 下啟用 zk

$ zkCli.cmd
$ zkCli.cmd  -server  需要連接的ip:需要連接的port

例如 zkCli.cmd 106.75.105.152, 不加端口茅主,默認為 2181

若報錯, 則檢查是否防火墻攔截了.

Opening socket connection to server 192.168.153.12/192.168.153.12:2181.Will not attempt to authentic

停止ZooKeeper服務(wù)器
連接服務(wù)器并執(zhí)行所有操作后舞痰,可以使用以下命令停止zookeeper服務(wù)器。

$ bin/zkServer.sh stop

在 Zookeeper中,每一個數(shù)據(jù)節(jié)點都是一個 Znode,上圖根目錄下有兩個節(jié)點,分別是:app1和app2,其中app1下面又有三個子節(jié)點所有 Znode!按層次化進行組織,形成這么一顆樹, Znodel的節(jié)點路徑標識方式和 Unix文件系統(tǒng)路徑非常相似,都是由一系列使用斜杠(/)進行分割的路徑表示,開發(fā)人員可以向這個節(jié)點寫入數(shù)據(jù),也可以在這個節(jié)點下面創(chuàng)建子節(jié)點暗膜。

默認端口為 2181

配置偽集群模式

創(chuàng)建 data 文件夾 和 logs 文件夾

clientPort=2181
# 配置快照文件存放的目錄
dataDir=/zkcluster/zookeeper01/data 
# 配置日志文件存放的目錄
dataLogDir=/zkcluster/zookeeper01/data/logs

clientPort=2182 
dataDir=/zkcluster/zookeeper02/data 
dataLogDir=/zkcluster/zookeeper02/data/logs

clientPort=2183 
dataDir=/zkcluster/zookeeper03/data 
dataLogDir=/zkcluster/zookeeper03/data/logs

data 下創(chuàng)建 myid 文件, 內(nèi)容分別為 1, 2, 3 (數(shù)字可以依次累增), 這個文件的作用就是記錄zk的id

server.服務(wù)器ID=服務(wù)器IP地址:服務(wù)器之間通信端口:服務(wù)器之間投票選舉端口

server.1=10.211.55.4:2881:3881 
server.2=10.211.55.4:2882:3882 
server.3=10.211.55.4:2883:3883 
touch myid

分別向三臺服務(wù)器寫入數(shù)字 1 2 和 3.

啟動集群
分別啟動這三臺服務(wù)器

$ bin/zkServer.sh start

查看狀態(tài)

./zkServer.sh status

Zookeeper基本使用

ZooKeeper系統(tǒng)模型

ZooKeeper數(shù)據(jù)模型Znode在ZooKeeper中匀奏,數(shù)據(jù)信息被保存在一個個數(shù)據(jù)節(jié)點上,這些節(jié)點被稱為 ZNode学搜。ZNode是Zookeeper中最小數(shù)據(jù)單位娃善,在ZNode下面又可以再掛 ZNode,這樣一層層下去就形成了一個層次化命名空間ZNode樹瑞佩,我們稱為ZNode Tree聚磺,它采用了類似文件系統(tǒng)的層級樹狀結(jié)構(gòu)進行管理。見下圖示例:

四種節(jié)點類型

  • 持久節(jié)點
  • 持久順序節(jié)點
  • 臨時節(jié)點
  • 臨時順序節(jié)點

事務(wù)ID

ZNode 狀態(tài)信息

ACL
我們可以從三個方面來理解ACL機制:權(quán)限模式( Scheme)炬丸、授權(quán)對象(ID)瘫寝、權(quán)限( Permission),通常使用" scheme:id: permission"來標識一個有效的ACL信息。

權(quán)限模式: Scheme
權(quán)限模式用來確定權(quán)限驗證過程中使用的檢驗策略

授權(quán)對象:ID
授權(quán)對象指的是權(quán)限賦予的用戶或一個指定實體,例如IP地址或是機器等稠炬。在不同的權(quán)限模式下,授權(quán)對象是不同的,表中列出了各個權(quán)限模式和授權(quán)對象之間的對應(yīng)關(guān)系

權(quán)限
權(quán)限就是指那些通過權(quán)限檢査后可以被允許執(zhí)行的操作焕阿。在 Zookeeper中,所有對數(shù)據(jù)的操作權(quán)限分為以下五大類

  • CREATE(C):數(shù)據(jù)節(jié)點的創(chuàng)建權(quán)限,允許授權(quán)對象在該數(shù)據(jù)節(jié)點下創(chuàng)建子節(jié)點。
  • DELETE(D子節(jié)點的刪除權(quán)限,允許授權(quán)對象刪除該數(shù)據(jù)節(jié)點的子節(jié)點首启。?
  • READ(R):數(shù)據(jù)節(jié)點的讀取權(quán)限,允許授權(quán)對象訪問該數(shù)據(jù)節(jié)點并讀取其數(shù)據(jù)內(nèi)容或子節(jié)點列表等暮屡。
  • WRTE(W):數(shù)據(jù)節(jié)點的更新權(quán)限,允許授權(quán)對象對該數(shù)據(jù)節(jié)點進行更新操作。
  • ADMIN(A):數(shù)據(jù)節(jié)點的管理權(quán)限,允許授權(quán)對象對該數(shù)據(jù)節(jié)點進行ACL相關(guān)的設(shè)置操作毅桃。

創(chuàng)建節(jié)點
使用 creates命令,可以創(chuàng)建一個 Zookeeper節(jié)點,如

create [-s][-e] path data acl

其中,-s-e 分別指定節(jié)點特性,順序或臨時節(jié)點,若不指定,則創(chuàng)建持久節(jié)點;ac1用來進行權(quán)限控制褒纲。

  1. 創(chuàng)建永久(持久)節(jié)點
    使用 create /zk-permanent 123命令創(chuàng)建zk- permanent永久節(jié)點
[zk: Loca thost: 2181(CONNECTED) 1] create/zk-permanent 123
Created/Zk-permanent
[zk: localhost: 2181(CONNECTED) 2] Ls/
[zk-permanent, zookeeper, zk-test00000000041
  1. 創(chuàng)建持久順序節(jié)點
    使用 create -s /zk-test 123 命令創(chuàng)建zk-test順序節(jié)點
lzk: Localhost: 2181(CONNECTED)0] create-s/zk-test 123
Created/Zk-testo000000004

執(zhí)行完后,就在根節(jié)點下創(chuàng)建了一個叫做 / zk-test 的節(jié)點,該節(jié)點內(nèi)容就是123,同時可以看到創(chuàng)建的
zk-test 節(jié)點后面添加了一串數(shù)字以示區(qū)別

  1. 創(chuàng)建臨時節(jié)點
    使用 create -e /zk-temp 123命令創(chuàng)建zk-temp臨時節(jié)點

  2. 創(chuàng)建臨時順序節(jié)點
    create -e -s /zk-temp 123

可以看到永久節(jié)點不同于順序節(jié)點, 不會自動在后面添加一串數(shù)字

quit 退出客戶端

讀取節(jié)點
與讀取相關(guān)的命令有 ls 命令和 get 命令

ls 命令可以列出 Zookeeper指定節(jié)點下的所有子節(jié)點,但只能查看指定節(jié)點下的第一級的所有子節(jié)點;

ls path

其中,path表示的是指定數(shù)據(jù)節(jié)點的節(jié)點路徑
get命令可以獲取 Zookeeper:指定節(jié)點的數(shù)據(jù)內(nèi)容和屬性信息。

get path

若獲取根節(jié)點下面的所有子節(jié)點,使用 ls 命令即可

若想獲取/zk-permanente的數(shù)據(jù)內(nèi)容和屬性,可使用如下命令:get /zk-permanent

更新節(jié)點
使用set命令,可以更新指定節(jié)點的數(shù)據(jù)內(nèi)容,用法如下

set path data [version]

其中,data就是要更新的新內(nèi)容, version表示數(shù)據(jù)版本,在 zookeeper中,節(jié)點的數(shù)據(jù)是有版本概
念的,這個參數(shù)用于指定本次更新操作是基于 Inode的哪一個數(shù)據(jù)版本進行的,如將/zk- permanent節(jié)
點的數(shù)據(jù)更新為455,可以使用如下命令:set /zk-permanent 456

zk: Loca Lhost: 2181( CONNECTED)3] set /zk-permanent 456
Iczxid 0X12
Ctime Sat Mar 07 18: 11: 14 CST 2020
Imzxid =0x13
Mtime =Sat Mar 07 18: 13: 48 CST 2020
Zxid = 0x12
cversion =O
ldataversion 1
laclversion =0
ephemera lowner 0x0
ldatalength =3
numchildren =0

刪除節(jié)點
使用 delete 命令可以刪除 Zookeeper上的指定節(jié)點,用法如下
delete path [version]
其中 version也是表示數(shù)據(jù)版本,使用 delete /zk-permanent 命令即可刪除 zk-permanent節(jié)點

zk 的 Java 客戶端工具

zk 的 Java 客戶端工具 curator

創(chuàng)建節(jié)點

獲取數(shù)據(jù)

// 普通查詢 
client.getData().forPath(path); 
// 包含狀態(tài)查詢 
Stat stat = new Stat(); 
client.getData().storingStatIn(stat).forPath(path);

更新數(shù)據(jù)

// 普通更新
client.setData().forPath(path,"新內(nèi)容".getBytes());
// 指定版本更新 
client.setData().withVersion(1).forPath(path);

刪除數(shù)據(jù)

配置存儲

命名服務(wù)

如果加了排它鎖則只對一個事務(wù)可見, 若加上共享鎖,則對所有事務(wù)可見.

作業(yè)

編程題一:
在基于 Netty 的自定義RPC的案例基礎(chǔ)上钥飞,進行改造莺掠。基于 Zookeeper 實現(xiàn)簡易版服務(wù)的注冊與發(fā)現(xiàn)機制

要求完成改造版本:

  1. 啟動 2 個服務(wù)端读宙,可以將IP及端口信息自動注冊到 Zookeeper
  2. 客戶端啟動時彻秆,從Zookeeper中獲取所有服務(wù)提供端節(jié)點信息,客戶端與每一個服務(wù)端都建立連接
  3. 某個服務(wù)端下線后结闸,Zookeeper注冊列表會自動剔除下線的服務(wù)端節(jié)點掖棉,客戶端與下線的服務(wù)端斷開連接
  4. 服務(wù)端重新上線,客戶端能感知到膀估,并且與重新上線的服務(wù)端重新建立連接

編程題二:
基于作業(yè)一的基礎(chǔ)上,實現(xiàn)基于 Zookeeper 的簡易版負載均衡策略

要求完成改造版本:

  1. Zookeeper 記錄每個服務(wù)端的最后一次響應(yīng)時間耻讽,有效時間為 5秒察纯,5s內(nèi)如果該服務(wù)端沒有新的請求,響應(yīng)時間清零或失效
  2. 當客戶端發(fā)起調(diào)用,每次都選擇最后一次響應(yīng)時間短的服務(wù)端進行服務(wù)調(diào)用饼记,如果時間一致香伴,隨機選取一個服務(wù)端進行調(diào)用,從而實現(xiàn)負載均衡

編程題三:
基于Zookeeper實現(xiàn)簡易版配置中心

要求實現(xiàn)以下功能:

  1. 創(chuàng)建一個 Web 項目具则,將數(shù)據(jù)庫連接信息交給Zookeeper配置中心管理即纲,即:當項目Web項目啟動時,從 Zookeeper 進行 MySQL 配置參數(shù)的拉取
  2. 要求項目通過數(shù)據(jù)庫連接池訪問MySQL(連接池可以自由選擇熟悉的)
  3. 當 Zookeeper 配置信息變化后Web項目自動感知博肋,正確釋放之前連接池低斋,創(chuàng)建新的連接池

作業(yè)資料說明:
1、提供資料:3個代碼工程匪凡、驗證及講解視頻膊畴。(倉庫中只有本次作業(yè)內(nèi)容)

2、講解內(nèi)容包含:題目分析病游、實現(xiàn)思路唇跨、代碼講解。

3衬衬、效果視頻驗證:
3.1 作業(yè)1:服務(wù)端的上線與下線买猖,客戶端能動態(tài)感知,并能重新構(gòu)成負載均衡滋尉。
3.2 作業(yè)2:作業(yè)完成情況下玉控,選擇性能好的服務(wù)器處理(響應(yīng)時間短的服務(wù)器即為性能好)。Zookeeper記錄客戶端響應(yīng)有效時間為5s兼砖,超時判定該客戶端失效奸远。
3.3 作業(yè)3:Zookeeper配置中心,web訪問數(shù)據(jù)庫需要從Zookeeper獲取連接資源讽挟。當Zookeeper配置發(fā)生改變懒叛,web自動切換到新的連接資源,保持正常訪問耽梅。

作業(yè)1
新增 NodeChangeListener 類

public interface NodeChangeListener {

    /**
     *
     * @param serviceName 服務(wù)名稱
     * @param serviceList  服務(wù)名稱對應(yīng)節(jié)點下的所有子節(jié)點, 目前沒有用到
     * @param pathChildrenCacheEvent
     */
    void notify(String serviceName, List<String> serviceList, PathChildrenCacheEvent pathChildrenCacheEvent);

}

將 zk 的行為抽象成接口

public interface RpcRegistryHandler extends NodeChangeListener {

    /**
     * 服務(wù)端進行調(diào)用
     *
     * @param service
     * @param ip
     * @param port
     * @return
     */
    boolean registry(final String service, final String ip, final int port);

    /**
     * 客戶端進行調(diào)用
     *
     * @param service
     * @return
     */
    List<String> discovery(final String service);

    void addListener(NodeChangeListener service);

    void destroy();
}

ConfigKeeper 配置類

public class ConfigKeeper {

    /**
     * netty 的端口號
     */
    private int nettyPort;

    /**
     * zk 地址: ip + 端口
     */
    private String zkAddr;

    /**
     * 主動上報時間薛窥,單位 秒
     */
    private int interval;

    /**
     * 區(qū)分是客戶端 還是 server 端, true 是服務(wù)端眼姐, false 是客戶端
     */
    private boolean providerSide;

    // 單例類诅迷,setter 和 getter 方法
}

新增 RpcResponse 類

package com.lagou;

public class RpcResponse  {

    /**
     * 響應(yīng)ID
     */
    private String requestId;
    /**
     * 錯誤信息
     */
    private String error;
    /**
     * 返回的結(jié)果
     */
    private Object result;

    // setter 和 getter 方法, toString方法 
}

RpcServerHandler 類,這次主要對 channelRead 的方法內(nèi)容作了調(diào)整

    /**
     * 服務(wù)端將數(shù)據(jù) 寫入 客戶端众旗, 繼續(xù)傳遞下去
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 222222
        RpcRequest request = (RpcRequest) msg;
        final RpcResponse rpcResponse = new RpcResponse();
        rpcResponse.setRequestId(request.getRequestId());
        System.out.println("111 接收到" + request.getRequestId());
        rpcResponse.setResult(handler(request));
        // 3333333
        ctx.writeAndFlush(rpcResponse);
    }

服務(wù)端 rpc -server 用到的配置類 RpcServerConfig

public class RpcServerConfig {

    private String nettyHost;

    private int nettyPort;

    private int delay;

    /**
     * 是否是服務(wù)端
     */
    private boolean providerSide;

    /**
     * 應(yīng)用的名稱
     */
    private String applicationName;

    private Map<String, Class> services;
   
    // setter 和 getter 方法
}

RpcServer 類
自身 implements InitializingBean, DisposableBean 接口
Autowired 了 RpcRegistryFactory 對象
主要關(guān)注 afterPropertiesSet 和 destroy 方法即可

@Override
    public void afterPropertiesSet() throws Exception {
        this.initRpcServerConfig();
        this.startServer();
    }

    @Override
    public void destroy() {
        bossGroup.shutdownGracefully();
        workGroup.shutdownGracefully();
    }

這里牽涉到了RpcRegistryFactory

/**
 * 注冊中心工廠類
 */
@Component
public class RpcRegistryFactory implements FactoryBean<RpcRegistryHandler>, DisposableBean {

    private RpcRegistryHandler rpcRegistryHandler;

    @Override
    public RpcRegistryHandler getObject()  {

        if (this.rpcRegistryHandler == null) {
            rpcRegistryHandler = new ZkRegistryHandler(ConfigKeeper.getInstance().getZkAddr());
        }
        return rpcRegistryHandler;
    }

    @Override
    public Class<?> getObjectType() {
        System.out.println("RpcRegistryFactory ### getObjectType.....");
        return RpcRegistryHandler.class;
    }

    @Override
    public void destroy() {
        System.out.println("RpcRegistryFactory ### destroy.....");
        rpcRegistryHandler.destroy();
    }
}

用于設(shè)計參數(shù)的 ProviderLoader

public class ProviderLoader {

    private ProviderLoader() {
    }

    /**
     * 返回類的全路徑名 -> 該類的 class
     * @return
     */
    public static Map<String, Class> getInstanceCacheMap() {
        Map<String, Class> services = new HashMap<>();
        services.put(IUserService.class.getName(), IUserService.class);
        return services;
    }
}

ZkRegistryHandler 是對 RpcRegistryHandler 接口的 zk 實現(xiàn)罢杉。

public class ZkRegistryHandler implements RpcRegistryHandler {

    private static final String ZK_PATH_SPLITER = "/";
    private static final String LAGOU_EDU_RPC_ZK_ROOT = ZK_PATH_SPLITER + "lg-rpc-provider" + ZK_PATH_SPLITER;
    private List<NodeChangeListener> listenerList = new ArrayList<>();
    private final String url;

    private final CuratorFramework client;
    private volatile boolean closed;
    /**
     * 子節(jié)點列表
     */
    private List<String> serviceList;
    private static final ScheduledExecutorService REPORT_WORKER = Executors.newScheduledThreadPool(5);

    public ZkRegistryHandler(final String zkPath) {
        url = zkPath;
        this.client = CuratorFrameworkFactory.builder()
                .connectString(zkPath)
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                .build();

        client.getConnectionStateListenable().addListener(new ConnectionStateListener() {
            @Override
            public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
                if (ConnectionState.CONNECTED.equals(connectionState)) {
                    System.out.println("注冊中心連接成功");
                }
            }
        });
        client.start();

        // 定時上報
        final ConfigKeeper configKeeper = ConfigKeeper.getInstance();
        final boolean providerSide = configKeeper.isProviderSide();
        final int interval = configKeeper.getInterval();
        if (!providerSide && interval > 0) {
            REPORT_WORKER.scheduleWithFixedDelay(new Runnable() {
                @Override
                public void run() {
                    System.out.println("我是一個定時任務(wù)");
                    // RequestMetri
                }
            }, interval, interval, TimeUnit.SECONDS);
        }
    }

    /**
     * 服務(wù)端注冊用到的方法
     * @param serviceName
     * @param nettyHost
     * @param nettyPort
     * @return
     */
    @Override
    public boolean registry(final String serviceName, final String nettyHost, final int nettyPort) {
        String zkPath = providePath(serviceName);
        if (!exists(zkPath)) {
            create(zkPath, false);
        }

        // /lg-rpc-provider/com.lagou.server.IUserService/provider/localhost:8999
        String instancePath = zkPath + ZK_PATH_SPLITER + nettyHost + ":" + nettyPort;
        create(instancePath, true);
        return true;
    }

    /**
     * 客戶端查找服務(wù)的方法
     *
     * @param serviceName
     * @return
     */
    @Override
    public List<String> discovery(final String serviceName) {
        final String path = providePath(serviceName);
        if (serviceList == null || serviceList.isEmpty()) {
            System.out.println("首次查找地址");
            try {
                serviceList = client.getChildren().forPath(path);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        this.registryWatch(serviceName, path);
        return serviceList;
    }

    @Override
    public void addListener(NodeChangeListener listener) {
        listenerList.add(listener);
    }

    @Override
    public void destroy() {
        client.close();
    }

    @Override
    public void notify(String children, List<String> serviceList, PathChildrenCacheEvent pathChildrenCacheEvent) {
        for (NodeChangeListener nodeChangeListener : listenerList) {
            nodeChangeListener.notify(children, serviceList, pathChildrenCacheEvent);
        }
    }
    private void create(final String path, final boolean ephemeral) {
        CreateMode createMode;
        if (ephemeral) {
            createMode = CreateMode.EPHEMERAL;
        } else {
            createMode = CreateMode.PERSISTENT;
        }
        try {
            client.create().creatingParentsIfNeeded().withMode(createMode).forPath(path);
        } catch (KeeperException.NodeExistsException e) {
            // do nothing
            System.out.println("該路徑已存在" + path);
        }
        catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException(e);
        }
    }

    private boolean exists(final String path) {
        try {
            if (client.checkExists().forPath(path) != null) {
                return true;
            }
        } catch (KeeperException.NoNodeException e) {
            // do nothing
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return false;
    }

    /**
     * 設(shè)置監(jiān)聽的方法
     *
     * @param serviceName
     * @param path
     */
    private void registryWatch(final String serviceName, final String path) {
        PathChildrenCache nodeCache = new PathChildrenCache(client, path, true);
        try {
            nodeCache.getListenable().addListener((client, pathChildrenCacheEvent) -> {
                // 更新本地緩存
                serviceList = client.getChildren().forPath(path);

                listenerList.forEach(nodeChangeListener -> {
                    System.out.println("節(jié)點變化");
                    nodeChangeListener.notify(serviceName, serviceList, pathChildrenCacheEvent);
                });
            });

            nodeCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
        } catch (Exception e) {
        }
    }

    /**
     * 返回 /lg-rpc-provider/com.lagou.server.IUserService/provider
     * @param serviceName
     * @return
     */
    private String providePath(String serviceName) {
        return LAGOU_EDU_RPC_ZK_ROOT + serviceName + ZK_PATH_SPLITER + "provider";
    }

    private String metricsPath() {
        return LAGOU_EDU_RPC_ZK_ROOT + "metrics";
    }
}

rpc-server 的啟動類

@SpringBootApplication
public class MyApplication {

    public static void main(String[] args) {
        // ["localhost:2181", "8999"]
        // ["localhost:2181", "9000"]
        final String zkPath = args[0];
        final int nettyPort = Integer.parseInt(args[1]);
        // 將IP及端口信息自動注冊到 Zookeeper

        ConfigKeeper configKeeper = ConfigKeeper.getInstance();
        configKeeper.setProviderSide(true);
        configKeeper.setInterval(5);
        configKeeper.setNettyPort(nettyPort);
        configKeeper.setZkAddr(zkPath);
        System.out.println(configKeeper);

        SpringApplication.run(MyApplication.class, args);

        // 可以通過 ls /lg-rpc-provider/com.lagou.server.IUserService/provider 查看節(jié)點信息
    }
}

分別為 netty 啟動 8888 和 8900 端口


接下來講解 rpc-client

UserClientHandler 類可以復(fù)用

新增 RpcClient
主要對外暴露了 initClient 和 send 方法

// 2. 初始化netty客戶端(創(chuàng)建連接池 bootstrap, 設(shè)置 BootStrap 連接服務(wù)器)
    public void initClient(String serviceClassName) throws InterruptedException {
        // 創(chuàng)建連接池
        this.group = new NioEventLoopGroup();
        // 創(chuàng)建客戶端啟動類
        Bootstrap bootstrap = new Bootstrap();
        // 配置啟動引導(dǎo)類
        bootstrap.group(group)
                // 通道類型為 NIO
                .channel(NioSocketChannel.class)
                // 設(shè)置請求協(xié)議為 tcp
                .option(ChannelOption.TCP_NODELAY, true)
                .option(ChannelOption.SO_KEEPALIVE, true)
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
                // 監(jiān)聽 channel 并初始化
                .handler(new ChannelInitializer<SocketChannel>() {
                    protected void initChannel(SocketChannel ch) throws Exception {
                        // 獲取管道對象
                        ChannelPipeline pipeline = ch.pipeline();
                        pipeline.addLast(new RpcEncoder(RpcRequest.class, new JSONSerializer()));
                        pipeline.addLast(new RpcDecoder(RpcResponse.class, new JSONSerializer()));
                        // 自定義事件處理器
                        pipeline.addLast(new UserClientHandler());
                    }
                });
        this.channel = bootstrap.connect(this.nettyIp, this.nettyPort).sync().channel();

        if (!isValidate()) {
            close();
            return;
        }
        System.out.println("啟動客戶端" + serviceClassName + ", ip = " + this.nettyIp + ", port = " + nettyPort);
    }

    public Object send(RpcRequest request) throws InterruptedException, ExecutionException {
        // 統(tǒng)計請求時間
        RequestMetrics.getInstance().put(nettyIp, this.nettyPort, request.getRequestId());
        return this.channel.writeAndFlush(request).sync().get();
    }

新增 RpcConsumer 類

主要有用的方法有構(gòu)造方法,createProxy的方法贡歧,notify為類自身 實現(xiàn) NodeChangeListener 接口的方法(因為構(gòu)造時會this.rpcRegistryHandler.addListener(this);)滩租。

public class RpcConsumer implements NodeChangeListener {

    private final RpcRegistryHandler rpcRegistryHandler;

    private final Map<String, Class> serviceMap;

    /**
     * 服務(wù)名 -> List<RpcClient>
     */
    private final Map<String, List<RpcClient>> CLIENT_POOL = new HashMap<>();
    private LoadBalanceStrategy balanceStrategy = new RandomLoadBalance();

    /**
     * 初始化
     * @param rpcRegistryHandler
     * @param instanceCacheMap
     */
    public RpcConsumer(final RpcRegistryHandler rpcRegistryHandler, final Map<String, Class> instanceCacheMap) {
        this.rpcRegistryHandler = rpcRegistryHandler;
        this.serviceMap = instanceCacheMap;

        // 開始自動注冊消費者邏輯: accept 方法
        serviceMap.forEach((className, clazz) -> {
            List<RpcClient> rpcClients = CLIENT_POOL.get(className);
            if (rpcClients == null) {
                rpcClients = new ArrayList<>();
            }
            // 127.0.0.1:8999 127.0.0.1:9000
            final List<String> discovery = this.rpcRegistryHandler.discovery(className);
            for (String s : discovery) {
                // s -> rpcClient
                final String[] split = s.split(":");
                String nettyIp = split[0];
                int nettyPort = Integer.parseInt(split[1]);
                final RpcClient rpcClient = new RpcClient(nettyIp, nettyPort);

                try {
                    rpcClient.initClient(className);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                rpcClients.add(rpcClient);
                CLIENT_POOL.put(className, rpcClients);
            }
        });
        this.rpcRegistryHandler.addListener(this);
    }

    // 4. 編寫一個方法赋秀,使用 jdk 動態(tài)代理對象
    @SuppressWarnings("unchecked")
    public <T> T createProxyEnhance(final Class<T> serverClass) {
        return (T) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
                new Class[]{serverClass}, (proxy, method, args) -> {

                    final String serverClassName = serverClass.getName();
                    // 封裝
                    final RpcRequest request = new RpcRequest();
                    final String requestId = UUID.randomUUID().toString().substring(0, 7);

                    String className = method.getDeclaringClass().getName();
                    String methodName = method.getName();

                    Class<?>[] parameterTypes = method.getParameterTypes();

                    request.setRequestId(requestId);
                    request.setClassName(className);
                    request.setMethodName(methodName);
                    request.setParameterTypes(parameterTypes);
                    request.setParameters(args);

                    System.out.println("******************************\n請求id = " + requestId + ", 請求方法名 = " + methodName + ", 請求參數(shù) = " + Arrays.toString(args));
                    RpcClient rpcClient = balanceStrategy.route(CLIENT_POOL, serverClassName);
                    if (rpcClient == null) {
                        System.out.println("沒找到對應(yīng)服務(wù)端,返 NULL");
                        return null;
                    }
                    System.out.println(request);
                    // request 最終會客戶端發(fā)送給服務(wù)端進行消費
                    return rpcClient.send(request);
                });
    }

    /**
     * 監(jiān)聽臨時節(jié)點的變化
     *
     * @param service 服務(wù)名稱
     * @param serviceList  服務(wù)名稱對應(yīng)節(jié)點下的所有子節(jié)點
     * @param pathChildrenCacheEvent
     */
    @Override
    public void notify(final String service, final List<String> serviceList,
                       final PathChildrenCacheEvent pathChildrenCacheEvent) {
        // 取出變化的節(jié)點名稱律想, 例如為 /lg-rpc-provider/com.lagou.server.IUserService/provider/localhost:9000
        final String path = pathChildrenCacheEvent.getData().getPath();
        System.out.println("變化節(jié)點的路徑: " + path + ", 變化的類型: " + pathChildrenCacheEvent.getType());
        // 分離出 ip:port 的組合猎莲。
        final String instanceConfig = path.substring(path.lastIndexOf("/") + 1);
        System.out.println("instanceConfig: " + instanceConfig);
        final String[] address = instanceConfig.split(":");
        System.out.println("address: " + address);
        final String nettyIp = address[0];
        final int nettyPort = Integer.parseInt(address[1]);

        List<RpcClient> rpcClients = CLIENT_POOL.get(service);
        switch (pathChildrenCacheEvent.getType()) {
            // 增加節(jié)點
            case CHILD_ADDED:
            case CONNECTION_RECONNECTED:
                if (rpcClients == null) {
                    rpcClients = new ArrayList<>();
                }

                final RpcClient rpcClient = new RpcClient(nettyIp, nettyPort);
                try {
                    rpcClient.initClient(service);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                rpcClients.add(rpcClient);
                // 節(jié)點耗時統(tǒng)計
                RequestMetrics.getInstance().addNode(nettyIp, nettyPort);
                System.out.println("新增節(jié)點" + instanceConfig);
                break;
            // 增加節(jié)點
            case CHILD_REMOVED:
            case CONNECTION_SUSPENDED:
            case CONNECTION_LOST:
                if (rpcClients != null) {
                    for (RpcClient client : rpcClients) {
                        if (client.getNettyIp().equals(nettyIp) && client.getNettyPort() == nettyPort) {
                            rpcClients.remove(client);
                            // 節(jié)點耗時統(tǒng)計
                            RequestMetrics.getInstance().remoteNode(nettyIp, nettyPort);
                            System.out.println("移除節(jié)點" + instanceConfig);
                            break;
                        }
                    }
                }
                break;
        }
    }
}

最后再講講 ConsumerBootStrap, 該類基本沒啥改動

public class ConsumerBootStrap {

    public static void main(final String[] args) throws Exception {
        final ConfigKeeper configKeeper = ConfigKeeper.getInstance();
        configKeeper.setZkAddr(args[0]);
        // 之后會啟動一個定時的線程池,每 5s 上傳到注冊中心
        configKeeper.setInterval(5);
        configKeeper.setProviderSide(false);

        final RpcRegistryHandler rpcRegistryHandler = new ZkRegistryHandler(configKeeper.getZkAddr());
        System.out.println("客戶端 Zookeeper session established.");

        // 最后一步
        final RpcConsumer consumer = new RpcConsumer(rpcRegistryHandler, ProviderLoader.getInstanceCacheMap());
        final IUserService userService = consumer.createProxyEnhance(IUserService.class);
        while (true) {
            Thread.sleep(4900);
            final String result = userService.sayHello("are you ok?");
            // 恒為 null
            System.out.println("返回 = " + result);
        }
    }
}

作業(yè)2

1.消費者每次請求完成時更新最后一次請求耗時和系統(tǒng)時間
這部分工作主要在客戶端做技即。

首先介紹一下這次用到的兩個類

package com.lagou.boot;

public class Metrics {

    private String nettyIp;
    private int nettyPort;
    private long start;
    private Long cost;

    public Metrics(String nettyIp, int nettyPort, long start, Long cost) {
        this.nettyIp = nettyIp;
        this.nettyPort = nettyPort;
        this.start = start;
        this.cost = cost;
    }

    public Metrics(String nettyIp, int nettyPort, long start) {
        this(nettyIp, nettyPort, start, null);
    }

    public String getNettyIp() {
        return nettyIp;
    }

    public void setNettyIp(String nettyIp) {
        this.nettyIp = nettyIp;
    }

    public int getNettyPort() {
        return nettyPort;
    }

    public void setNettyPort(int nettyPort) {
        this.nettyPort = nettyPort;
    }

    public long getStart() {
        return start;
    }

    public void setStart(long start) {
        this.start = start;
    }

    public Long getCost() {
        return cost;
    }

    public void setCost(Long cost) {
        this.cost = cost;
    }
}

RequestMetrics 類

COST_TIME_MAP變量 ip:端口 -》 耗時

REQUEST_ID_MAP變量 requestId -> ip + 端口 + 起始時間戳 + 耗時

calculate 方法用于 根據(jù)requestId 進行耗時統(tǒng)計

統(tǒng)計請求時間 在RpcClient的 send 方法中進行

public class RequestMetrics {

    /**
     * ip:端口 -》 耗時
     */
    private static final ConcurrentHashMap<String, Long> COST_TIME_MAP = new ConcurrentHashMap<>();

    /**
     * requestId -> ip + 端口 + 起始時間戳 + 耗時
     * 每個 requestId 用完一次后就會被銷毀
     */
    private static final ConcurrentHashMap<String, Metrics> REQUEST_ID_MAP = new ConcurrentHashMap<>();

    private static final RequestMetrics requestMetrics = new RequestMetrics();

    public ConcurrentHashMap<String, Long> getMetricMap() {
        return COST_TIME_MAP;
    }

    private RequestMetrics() {
    }

    public static RequestMetrics getInstance() {
        return requestMetrics;
    }

    public void addNode(String nettyIp, int nettyPort) {
        COST_TIME_MAP.put(nettyIp + ":" + nettyPort, 0L);
    }

    public void remoteNode(String nettyIp, int nettyPort) {
        COST_TIME_MAP.remove(nettyIp + ":" + nettyPort);
    }

    /**
     * 響應(yīng)時放入著洼, 根據(jù)requestId 進行耗時統(tǒng)計
     * @param requestId
     */
    public void calculate(String requestId) {
        final Metrics metrics = REQUEST_ID_MAP.get(requestId);
        Long cost = System.currentTimeMillis() - metrics.getStart();
        COST_TIME_MAP.put(metrics.getNettyIp() + ":" + metrics.getNettyPort(), cost);
        REQUEST_ID_MAP.remove(requestId);
    }

    /**
     * 獲取所有節(jié)點耗時統(tǒng)計
     */
    public List<Metrics> getAllInstances() {
        List<Metrics> result = new ArrayList<>();
        COST_TIME_MAP.forEach((url, aLong) -> {
            String[] split = url.split(":");
            result.add(new Metrics(split[0], Integer.parseInt(split[1]), aLong));
        });
        return result;
    }

    /**
     * 請求時放入
     * @param nettyIp
     * @param nettyPort
     * @param requestId
     */
    public void put(String nettyIp, int nettyPort, String requestId) {
        REQUEST_ID_MAP.put(requestId, new Metrics(nettyIp, nettyPort, System.currentTimeMillis(), null));
    }
}

2.消費者定時在啟動時創(chuàng)建定時線程池,每隔5s自動上報而叼,更新Zookeeper臨時節(jié)點的值

ConsumerBootStrap 入口有一個參數(shù)配置

        // 之后會啟動一個定時的線程池身笤,每 5s 上傳到注冊中心
        configKeeper.setInterval(5);

ZkRegistryHandler 會開啟一個 ScheduledExecutorService 線程池服務(wù)

RequestMetrics 的

       // 定時上報
        final ConfigKeeper configKeeper = ConfigKeeper.getInstance();
        final boolean providerSide = configKeeper.isProviderSide();
        final int interval = configKeeper.getInterval();
        if (!providerSide && interval > 0) {
            REPORT_WORKER.scheduleWithFixedDelay(new Runnable() {
                @Override
                public void run() {
                    System.out.println("我是一個定時任務(wù)");
                    // ...
                }
            }, interval, interval, TimeUnit.SECONDS);
        }
  1. 每次上報時判斷當前時間距離最后一次請求是否超過5s,超過5s則需要刪除Zookeeper上面的內(nèi)容

這里介紹下 RequestMetrics 的 getAllInstances() 方法, 如果 5 秒內(nèi)沒有響應(yīng)清空請求時間澈歉。

/**
     * 獲取所有節(jié)點耗時統(tǒng)計
     */
    public List<Metrics> getAllInstances() {
        List<Metrics> result = new ArrayList<>();
        COST_TIME_MAP.forEach((url, aLong) -> {
            String[] split = url.split(":");
            result.add(new Metrics(split[0], Integer.parseInt(split[1]), aLong));
        });
        return result;
    }

接下來簡單說一下負載均衡策略展鸡,這里主要涉及到了使用那個客戶端進行服務(wù)的請求。

public abstract class AbstractLoadBalanceStrategy implements LoadBalanceStrategy{

    @Override
    public RpcClient route(Map<String, List<RpcClient>> clientPool, String serverClassName) {

        List<RpcClient> rpcClients = clientPool.get(serverClassName);
        if (null == rpcClients) return null;
        return doSelect(rpcClients);
    }

    protected abstract RpcClient doSelect(List<RpcClient> rpcClients);
}

MinCostLoadBalance (該類未經(jīng)驗證)

public class MinCostLoadBalance extends AbstractLoadBalanceStrategy {

    @Override
    protected RpcClient doSelect(final List<RpcClient> rpcClients) {
        ConcurrentHashMap<String, Long> metricMap = RequestMetrics.getInstance().getMetricMap();

        RpcClient minCostRpcClient = rpcClients.get(0);
        final Long minLong = metricMap.get(minCostRpcClient.getNettyIp() + minCostRpcClient.getNettyPort());

        for (int i = 1; i < rpcClients.size(); i++) {
            RpcClient rpcClient = rpcClients.get(i);

            String nettyIp = rpcClient.getNettyIp();
            int nettyPort = rpcClient.getNettyPort();

            // 取出最小響應(yīng)時間的客戶端埃难,并進行調(diào)用
            Long aLong = metricMap.get(nettyIp + nettyPort);
            if (aLong != null && aLong < minLong) {
                minCostRpcClient = rpcClient;
            }
        }
        return minCostRpcClient;
    }
}

RandomLoadBalance (該類未經(jīng)驗證)

public class RandomLoadBalance extends AbstractLoadBalanceStrategy {
    private final Random random = new Random();

    @Override
    protected RpcClient doSelect(List<RpcClient> rpcClients) {
        int size = rpcClients.size();
        int index = random.nextInt(size);
        return rpcClients.get(index);
    }
}

作業(yè)3
以下項目主要使用了 commons-dbcp + fastjson + apache.curator 技術(shù)進行實現(xiàn)莹弊。

這里會通過create [-s][-e] path data acl命令創(chuàng)建節(jié)點:

建立所需節(jié)點

我會將數(shù)據(jù)庫配置的用戶名和密碼等信息寫入/dbConfig/lagou.config.DbConfig 節(jié)點中。

先建立父節(jié)點

create  /dbConfig  ""

然后若不存在臨時節(jié)點則重新創(chuàng)建

# 向 /dbConfig/lagou.config.DbConfig 中寫入配置信息
create -e /dbConfig/lagou.config.DbConfig {"username":"root","password":"123456","url":"jdbc:mysql://localhost:3306/aaaa?serverTimezone=UTC"}

# 查看是否能正常獲取節(jié)點信息
get /dbConfig/lagou.config.DbConfig 

更改數(shù)據(jù)

# 更改數(shù)據(jù)庫為 aaaa
set /dbConfig/lagou.config.DbConfig {"username":"root","password":"123456","url":"jdbc:mysql://localhost:3306/aaaa?serverTimezone=UTC"}

# 更改數(shù)據(jù)庫為 bbbb
set /dbConfig/lagou.config.DbConfig {"username":"root","password":"123456","url":"jdbc:mysql://localhost:3306/bbbb?serverTimezone=UTC"}

創(chuàng)建Java類

  1. 創(chuàng)建工具類 RuntimeContext
@Component
public class RuntimeContext implements ApplicationContextAware {

    private static ApplicationContext applicationContext = null;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        if (RuntimeContext.applicationContext == null) {
            RuntimeContext.applicationContext = applicationContext;
        }
    }

    //獲取applicationContext
    private static ApplicationContext getApplicationContext() {
        return applicationContext;
    }

    //通過name獲取Bean
    public static Object getBean(String name) {
        return getApplicationContext().getBean(name);
    }

    // ...
}
  1. 實體類
package lagou.config;

public class DbConfig {

    private String url;

    private String username;

    private String password;

    // setter getter 方法
}
  1. 創(chuàng)建 MyDataSource 自定義數(shù)據(jù)源涡尘,我們重寫的dbcp中的類BasicDataSource忍弛,我們將其全部拷貝了出來,然后重命名為MyDataSource類考抄,然后在其中修改了以下內(nèi)容

3.1將 UNKNOWN_TRANSACTIONISOLATION 的值改為 -1. 否則這個內(nèi)部變量會找不到

/**
     * The default TransactionIsolation state of connections created by this pool.
     */
    protected volatile int defaultTransactionIsolation =            PoolableConnectionFactory.UNKNOWN_TRANSACTIONISOLATION;

3.2jdk 1.7 之后需要實現(xiàn)該方法 getParentLogger()

@Override
    public Logger getParentLogger() throws SQLFeatureNotSupportedException {
        return null;
    }

3.3 修改已有的 createDataSource() 方法细疚,刪除這幾行代碼


3.4 新增 changeDataSource 方法

public static void changeDataSource() {
        MyDataSource dataSource = (MyDataSource) RuntimeContext.getBean("dataSource");
        try {
            dataSource.close();
            dataSource.createDataSource();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
  1. 新建 InitListener 類,該類實現(xiàn)了ServletContextListener 來對Zookeeper節(jié)點 /db/url 的監(jiān)聽
public class InitListener implements ServletContextListener {

    private static final String CONNENT_ADDR = "localhost:2181";
    private static final String PATH = "/dbConfig";
    private static final String SUB_PATH = PATH + "/" + DbConfig.class.getName();

    @Override
    public void contextInitialized(ServletContextEvent sce) {
        CuratorFramework curatorFramework = CuratorFrameworkFactory.builder()
                .connectString(CONNENT_ADDR)
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                .build();
        curatorFramework.start();

        PathChildrenCache nodeCache = new PathChildrenCache(curatorFramework, "/dbConfig", true);
        try {
            nodeCache.getListenable().addListener((client, pathChildrenCacheEvent) -> {
                System.out.println(pathChildrenCacheEvent.getType());

                if (PathChildrenCacheEvent.Type.CHILD_UPDATED.equals(pathChildrenCacheEvent.getType())) {
                    final ChildData data = pathChildrenCacheEvent.getData();
                    if (data != null) {
                        final String path = data.getPath();
                        System.out.println(path);
                        System.out.println(SUB_PATH);
                        if (path.equals(SUB_PATH)) {
                            MyDataSource datasource = (MyDataSource) RuntimeContext.getBean("dataSource");

                            final DbConfig dbConfig = JSON.parseObject(new String(data.getData()), DbConfig.class);
                            System.out.println(dbConfig.toString());
                            datasource.setUrl(dbConfig.getUrl());
                            datasource.setUsername(dbConfig.getUsername());
                            datasource.setPassword(dbConfig.getPassword());

                            MyDataSource.changeDataSource();
                        }
                    }
                }
            });
            nodeCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void contextDestroyed(ServletContextEvent sce) {
    }
}
  1. 修改spring boot 啟動類川梅,注冊 InitListener 疯兼,配置 我們自定義的 DataSource,建立一個query方法(可通過/query 進行訪問)暴露出去贫途。
@SpringBootApplication
@RestController
public class MyApplication {

    public static void main(String[] args) {
        SpringApplication.run(MyApplication.class, args);
    }

    @RequestMapping("/query")
    public String query() {
        String sql = "select id from `info` limit 1";
        return jdbcTemplate.queryForObject(sql, String.class);
    }

    @Bean
    public ServletListenerRegistrationBean servletListenerRegistrationBean() {
        ServletListenerRegistrationBean servletListenerRegistrationBean = new ServletListenerRegistrationBean();
        servletListenerRegistrationBean.setListener(new InitListener());
        return servletListenerRegistrationBean;
    }

    @Bean
    public DataSource dataSource(@Value("${spring.datasource.url}") String url,
                                 @Value("${spring.datasource.username}") String username,
                                 @Value("${spring.datasource.password}") String password) {
        MyDataSource dataSource = new MyDataSource();
        dataSource.setUrl(url);
        dataSource.setUsername(username);
        dataSource.setPassword(password);
        return dataSource;
    }

    @Autowired
    private JdbcTemplate jdbcTemplate;
}

6.application.properties 配置

server.port=80

spring.datasource.url=jdbc:mysql://localhost:3306/aaaa?serverTimezone=GMT%2B8
spring.datasource.username=root
spring.datasource.password=123456

驗證
http://localhost/query

參考

基于Zookeeper動態(tài)切換數(shù)據(jù)源_BXS_0107的博客-CSDN博客
https://blog.csdn.net/newbie0107/article/details/105500579

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末吧彪,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子丢早,更是在濱河造成了極大的恐慌姨裸,老刑警劉巖,帶你破解...
    沈念sama閱讀 216,692評論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件怨酝,死亡現(xiàn)場離奇詭異傀缩,居然都是意外死亡,警方通過查閱死者的電腦和手機农猬,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,482評論 3 392
  • 文/潘曉璐 我一進店門赡艰,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人斤葱,你說我怎么就攤上這事瞄摊⊙郑” “怎么了?”我有些...
    開封第一講書人閱讀 162,995評論 0 353
  • 文/不壞的土叔 我叫張陵换帜,是天一觀的道長。 經(jīng)常有香客問我鹤啡,道長惯驼,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,223評論 1 292
  • 正文 為了忘掉前任递瑰,我火速辦了婚禮祟牲,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘抖部。我一直安慰自己说贝,他們只是感情好,可當我...
    茶點故事閱讀 67,245評論 6 388
  • 文/花漫 我一把揭開白布慎颗。 她就那樣靜靜地躺著乡恕,像睡著了一般。 火紅的嫁衣襯著肌膚如雪俯萎。 梳的紋絲不亂的頭發(fā)上傲宜,一...
    開封第一講書人閱讀 51,208評論 1 299
  • 那天,我揣著相機與錄音夫啊,去河邊找鬼函卒。 笑死,一個胖子當著我的面吹牛撇眯,可吹牛的內(nèi)容都是我干的报嵌。 我是一名探鬼主播,決...
    沈念sama閱讀 40,091評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼熊榛,長吁一口氣:“原來是場噩夢啊……” “哼锚国!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起来候,我...
    開封第一講書人閱讀 38,929評論 0 274
  • 序言:老撾萬榮一對情侶失蹤跷叉,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后营搅,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體云挟,經(jīng)...
    沈念sama閱讀 45,346評論 1 311
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,570評論 2 333
  • 正文 我和宋清朗相戀三年转质,在試婚紗的時候發(fā)現(xiàn)自己被綠了园欣。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 39,739評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡休蟹,死狀恐怖沸枯,靈堂內(nèi)的尸體忽然破棺而出日矫,到底是詐尸還是另有隱情,我是刑警寧澤绑榴,帶...
    沈念sama閱讀 35,437評論 5 344
  • 正文 年R本政府宣布哪轿,位于F島的核電站,受9級特大地震影響翔怎,放射性物質(zhì)發(fā)生泄漏窃诉。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,037評論 3 326
  • 文/蒙蒙 一赤套、第九天 我趴在偏房一處隱蔽的房頂上張望飘痛。 院中可真熱鬧,春花似錦容握、人聲如沸宣脉。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,677評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽塑猖。三九已至,卻和暖如春介蛉,著一層夾襖步出監(jiān)牢的瞬間萌庆,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,833評論 1 269
  • 我被黑心中介騙來泰國打工币旧, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留践险,地道東北人。 一個月前我還...
    沈念sama閱讀 47,760評論 2 369
  • 正文 我出身青樓吹菱,卻偏偏與公主長得像巍虫,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子鳍刷,可洞房花燭夜當晚...
    茶點故事閱讀 44,647評論 2 354