NoSQL 根本性的優(yōu)勢在于在云計算時代拌蜘,簡單、易于大規(guī)模分布式擴展陪竿,并且讀寫性能非常高
關系型數(shù)據(jù)庫 | NoSQL 數(shù)據(jù)庫 | |
---|---|---|
特點 | -數(shù)據(jù)關系模型基于關系模型禽翼,結(jié)構(gòu)化存儲,完整性約束 -基于二維表及其之間的聯(lián)系族跛,需要連接闰挡、并、交礁哄、差长酗、除等數(shù)據(jù)操作 -采用結(jié)構(gòu)化的查詢語言(SQL)做數(shù)據(jù)讀寫 -操作需要數(shù)據(jù)的一致性,需要事務甚至是強一致性 |
- 非結(jié)構(gòu)化的存儲 - 基于多維關系模型 - 具有特有的使用場景 |
優(yōu)點 | - 保持數(shù)據(jù)的一致性(事務處理) - 可以進行 join 等復雜查詢 - 通用化桐绒,技術成熟 |
- 高并發(fā)夺脾,大數(shù)據(jù)下讀寫能力較強 - 基本支持分布式之拨,易于擴展,可伸縮 - 簡單咧叭,弱結(jié)構(gòu)化存儲 |
缺點 | - 數(shù)據(jù)讀寫必須經(jīng)過 sql 解析蚀乔,大量數(shù)據(jù)、高并發(fā)下讀寫性能不足 - 對數(shù)據(jù)做讀寫佳簸,或修改數(shù)據(jù)結(jié)構(gòu)時需要加鎖乙墙,影響并發(fā)操作 - 無法適應非結(jié)構(gòu)化存儲 - 擴展困難 - 昂貴、復雜 |
- join 等復雜操作能力較弱 - 事務支持較弱 - 通用性差 - 無完整約束復雜業(yè)務場景支持較差 |
數(shù)據(jù)的切分(Sharding)根據(jù)其切分規(guī)則的類型生均,可以分為兩種切分模式听想。一種是按照不同的表(或者Schema)來切分到不同的數(shù)據(jù)庫(主機)之上,這種切可以稱之為數(shù)據(jù)的垂直(縱向)切分马胧;另外一種則是根據(jù)表中的數(shù)據(jù)的邏輯關系汉买,將同一個表中的數(shù)據(jù)按照某種條件拆分到多臺數(shù)據(jù)庫(主機)上面,這種切分稱之為數(shù)據(jù)的水平(橫向)切分
對于 DBA 來說佩脊,可以這么理解 Mycat:
Mycat 就是 MySQL Server蛙粘,而 Mycat 后面連接的 MySQL Server,就好象是 MySQL 的存儲引擎,如 InnoDB威彰,MyISAM 等出牧,因此,Mycat 本身并不存儲數(shù)據(jù)歇盼,數(shù)據(jù)是在后端的 MySQL 上存儲的舔痕,因此數(shù)據(jù)可靠性以及事務等都是 MySQL 保證的,簡單的說豹缀,Mycat 就是 MySQL 最佳伴侶伯复,它在一定程度上讓 MySQL 擁有了能跟 Oracle PK 的能力。
對于軟件工程師來說邢笙,可以這么理解 Mycat:
Mycat 就是一個近似等于 MySQL 的數(shù)據(jù)庫服務器啸如,你可以用連接 MySQL 的方式去連接 Mycat(除了端口不同,默認的 Mycat 端口是 8066 而非 MySQL 的 3306氮惯,因此需要在連接字符串上增加端口信息)叮雳,大多數(shù)情況下,可以用你熟悉的對象映射框架使用 Mycat筐骇,但建議對于分片表债鸡,盡量使用基礎的 SQL 語句,因為這樣能達到最佳性能铛纬,特別是幾千萬甚至幾百億條記錄的情況下厌均。
對于架構(gòu)師來說,可以這么理解 Mycat:
Mycat 是一個強大的數(shù)據(jù)庫中間件告唆,不僅僅可以用作讀寫分離棺弊、以及分表分庫晶密、容災備份,而且可以用于多租戶應用開發(fā)模她、云平臺基礎設施稻艰、讓你的架構(gòu)具備很強的適應性和靈活性,借助于即將發(fā)布的 Mycat 智能優(yōu)化模塊侈净,系統(tǒng)的數(shù)據(jù)訪問瓶頸和熱點一目了然尊勿,根據(jù)這些統(tǒng)計分析數(shù)據(jù),你可以自動或手工調(diào)整后端存儲畜侦,將不同的表映射到不同存儲引擎上元扔,而整個應用的代碼一行也不用改變。
Mycat 原理
Mycat 的原理中最重要的一個動詞是“攔截”旋膳,它攔截了用戶發(fā)送過來的 SQL 語句涮俄,首先對 SQL 語句做了一些特定的分析:如分片分析掀亥、路由分析、讀寫分離分析换淆、緩存分析等轴脐,然后將此 SQL 發(fā)往后端的真實數(shù)據(jù)庫岸霹,并將返回的結(jié)果做適當?shù)奶幚砥苤遥罱K再返回給用戶
上述圖片里蝌戒,Orders 表被分為三個分片 datanode(簡稱 dn),這三個分片是分布在兩臺 MySQL Server 上(DataHost)碱工,即 datanode=database@datahost 方式垄懂,因此你可以用一臺到 N 臺服務器來分片,分片規(guī)則為(sharding rule)典型的字符串枚舉分片規(guī)則痛垛,一個規(guī)則的定義是分片字段(sharding column)+分片函數(shù)(rule function),這里的分片字段為 prov 而分片函數(shù)為字符串枚舉方式
當 Mycat 收到一個 SQL 時桶蛔,會先解析這個 SQL匙头,查找涉及到的表,然后看此表的定義仔雷,如果有分片規(guī)則蹂析,則獲取到 SQL 里分片字段的值,并匹配分片函數(shù)碟婆,得到該 SQL 對應的分片列表电抚,然后將 SQL 發(fā)往這些分片去執(zhí)行,最后收集和處理所有分片返回的結(jié)果數(shù)據(jù)竖共,并輸出到客戶端蝙叛。以 select * from Orders where prov=?語句為例,查到 prov=wuhan公给,按照分片函數(shù)借帘,wuhan 返回 dn1蜘渣,于是 SQL 就發(fā)給了 MySQL1,去取 DB1 上的查詢結(jié)果肺然,并返回給用戶蔫缸。
如果上述 SQL 改為 select * from Orders where prov in (‘wuhan’,‘beijing’),那么际起,SQL 就會發(fā)給MySQL1 與 MySQL2 去執(zhí)行拾碌,然后結(jié)果集合并后輸出給用戶。但通常業(yè)務中我們的 SQL 會有 Order By 以及Limit 翻頁語法街望,此時就涉及到結(jié)果集在 Mycat 端的二次處理校翔,這部分的代碼也比較復雜,而最復雜的則屬兩個表的 Jion 問題它匕,為此展融,Mycat 提出了創(chuàng)新性的 ER 分片、全局表豫柬、HBT(Human Brain Tech)人工智能的 Catlet告希、以及結(jié)合 Storm/Spark 引擎等十八般武藝的解決辦法,從而成為目前業(yè)界最強大的方案烧给,這就是開源的力量
應用場景
- 單純的讀寫分離燕偶,此時配置最為簡單,支持讀寫分離础嫡,主從切換指么;
- 分表分庫,對于超過 1000 萬的表進行分片榴鼎,最大支持 1000 億的單表分片伯诬;
- 多租戶應用,每個應用一個庫巫财,但應用程序只連接 Mycat盗似,從而不改造程序本身,實現(xiàn)多租戶化平项;
- 報表系統(tǒng)赫舒,借助于 Mycat 的分表能力,處理大規(guī)模報表的統(tǒng)計闽瓢;
- 替代 Hbase接癌,分析大數(shù)據(jù);
- 作為海量數(shù)據(jù)實時查詢的一種簡單有效方案扣讼,比如 100 億條頻繁查詢的記錄需要在 3 秒內(nèi)查詢出來結(jié)果缺猛,除了基于主鍵的查詢,還可能存在范圍查詢或其他屬性查詢,此時 Mycat 可能是最簡單有效的選擇
Mycat 中的概念
數(shù)據(jù)庫中間件
Mycat 是數(shù)據(jù)庫中間件枯夜,就是介于數(shù)據(jù)庫與應用之間弯汰,進行數(shù)據(jù)處理與交互的中間服務。由于前面講的對數(shù)據(jù)進行分片處理之后湖雹,從原有的一個庫咏闪,被切分為多個分片數(shù)據(jù)庫,所有的分片數(shù)據(jù)庫集群構(gòu)成了整個完整的數(shù)據(jù)庫存儲
邏輯庫(schema)
通常對實際應用來說摔吏,并不需要知道中間件的存在鸽嫂,業(yè)務開發(fā)人員只需要知道數(shù)據(jù)庫的概念,所以數(shù)據(jù)庫中間件可以被看做是一個或多個數(shù)據(jù)庫集群構(gòu)成的邏輯庫
邏輯表(table)
ER 表
關系型數(shù)據(jù)庫是基于實體關系模型(Entity-Relationship Model)之上征讲,通過其描述了真實世界中事物與關系据某,Mycat 中的 ER 表即是來源于此。根據(jù)這一思路诗箍,提出了基于 E-R 關系的數(shù)據(jù)分片策略癣籽,子表的記錄與所關聯(lián)的父表記錄存放在同一個數(shù)據(jù)分片上,即子表依賴于父表滤祖,通過表分組(Table Group)保證數(shù)據(jù) Join 不會跨庫操作
全局表
一個真實的業(yè)務系統(tǒng)中筷狼,往往存在大量的類似字典表的表,這些表基本上很少變動匠童,字典表具有以下幾個特性:
? 變動不頻繁埂材;
? 數(shù)據(jù)量總體變化不大;
? 數(shù)據(jù)規(guī)模不大汤求,很少有超過數(shù)十萬條記錄俏险。
對于這類的表,在分片的情況下扬绪,當業(yè)務表因為規(guī)模而進行分片以后竖独,業(yè)務表與這些附屬的字典表之間的關聯(lián),就成了比較棘手的問題挤牛,所以 Mycat 中通過數(shù)據(jù)冗余來解決這類表的 join预鬓,即所有的分片都有一份數(shù)據(jù)的拷貝,所有將字典表或者符合字典表特性的一些表定義為全局表赊颠。數(shù)據(jù)冗余是解決跨分片數(shù)據(jù) join 的一種很好的思路,也是數(shù)據(jù)切分規(guī)劃的另外一條重要規(guī)則
分片節(jié)點(dataNode)
數(shù)據(jù)切分后劈彪,一個大表被分到不同的分片數(shù)據(jù)庫上面竣蹦,每個表分片所在的數(shù)據(jù)庫就是分片節(jié)點(dataNode)
節(jié)點主機(dataHost)
數(shù)據(jù)切分后,每個分片節(jié)點(dataNode)不一定都會獨占一臺機器沧奴,同一機器上面可以有多個分片數(shù)據(jù)庫痘括,這樣一個或多個分片節(jié)點(dataNode)所在的機器就是節(jié)點主機(dataHost),為了規(guī)避單節(jié)點主機并發(fā)數(shù)限制,盡量將讀寫壓力高的分片節(jié)點(dataNode)均衡的放在不同的節(jié)點主機(dataHost)
分片規(guī)則(rule)
按照某種業(yè)務規(guī)則把數(shù)據(jù)分到某個分片的規(guī)則就是分片規(guī)則,數(shù)據(jù)切分選擇合適的分片規(guī)則非常重要纲菌,將極大的避免后續(xù)數(shù)據(jù)處理的難度
Mycat 的配置
1 schema.xml
管理著 MyCat 的邏輯庫挠日、表、分片規(guī)則翰舌、DataNode 以 及 DataSource
<?xml version="1.0"?>
<!DOCTYPE mycat:schema SYSTEM "schema.dtd">
<mycat:schema xmlns:mycat="http://io.mycat/">
<schema name="TESTDB" checkSQLschema="true" sqlMaxLimit="100" randomDataNode="dn1">
<table name="customer" primaryKey="id" dataNode="dn1,dn2" rule="sharding-by-intfile" autoIncrement="true" fetchStoreNodeByJdbc="true">
<childTable name="customer_addr" primaryKey="id" joinKey="customer_id" parentKey="id"> </childTable>
</table>
</schema>
<!-- dataNode 標簽定義了 MyCat 中的數(shù)據(jù)節(jié)點嚣潜,也就是我們通常說所的數(shù)據(jù)分片。一個 dataNode 標簽就是一個獨立的數(shù)據(jù)分片 -->
<dataNode name="dn1" dataHost="localhost1" database="db1" />
<dataNode name="dn2" dataHost="localhost1" database="db2" />
<dataNode name="dn3" dataHost="localhost1" database="db3" />
<!-- 定義了具體的數(shù)據(jù)庫實例椅贱、讀寫分離配置和心跳語句 -->
<dataHost name="localhost1" maxCon="1000" minCon="10" balance="0" writeType="0" dbType="mysql" dbDriver="jdbc" switchType="1" slaveThreshold="100">
<heartbeat>select user()</heartbeat>
<!-- can have multi write hosts -->
<writeHost host="hostM1" url="jdbc:mysql://localhost:3306" user="root" password="root">
<readHost host="" url="" password="" user=""></readHost>
</writeHost>
<!-- <writeHost host="hostM2" url="localhost:3316" user="root" password="123456"/> -->
</dataHost>
</mycat:schema>
1.1 schema 標簽
schema 標簽用于定義 MyCat 實例中的邏輯庫懂算,MyCat 可以有多個邏輯庫,每個邏輯庫都有自己的相關配置庇麦〖萍迹可以使用 schema 標簽來劃分這些不同的邏輯庫。
屬性名 | 屬性說明 |
---|---|
dataNode | 該屬性用于綁定邏輯庫到某個具體的 database 上山橄,1.3 版本如果配置了 dataNode垮媒,則不可以配置分片表,1.4 可以配置默認分片航棱,只需要配置需要分片的表即可 |
checkSQLschema | 為 true 時睡雇,schema 的字符去掉,select * from TESTDB.travelrecord 會修改為 select * from travelrecord |
sqlMaxLimit | 當該值設置為某個數(shù)值時丧诺。每條執(zhí)行的 SQL 語句入桂,如果沒有加上 limit 語句,MyCat 也會自動的加上所對應的值 |
1.1.1 table 標簽
定義了 MyCat 中的邏輯表驳阎,所有需要拆分的表都需要在這個標簽中定義
屬性名 | 屬性說明 |
---|---|
name | 定義邏輯表的表名抗愁,這個名字就如同我在數(shù)據(jù)庫中執(zhí)行 create table 命令指定的名字一樣,同個 schema 標簽中定義的名字必須唯一 |
dataNode | 定義這個邏輯表所屬的 dataNode, 該屬性的值需要和 dataNode 標簽中 name 屬性的值相互對應 |
rule | 該屬性用于指定邏輯表要使用的規(guī)則名字呵晚,規(guī)則名字在 rule.xml 中定義蜘腌,必須與 tableRule 標簽中 name 屬性屬性值一一對應 |
ruleRequired | 該屬性用于指定表是否綁定分片規(guī)則,如果配置為 true饵隙,但沒有配置具體 rule 的話 撮珠,程序會報錯 |
primaryKey | 該邏輯表對應真實表的主鍵 |
type | 該屬性定義了邏輯表的類型,目前邏輯表只有“全局表”和”普通表”兩種類型金矛。對應的配置: ? 全局表:global ? 普通表:不指定該值為 globla 的所有表 |
autoIncrement | 使用 autoIncrement=“true” 指定這個表有使用自增長主鍵 |
subTables | |
needAddLimit | 指定表是否需要自動的在每個語句后面加上 limit 限制 |
1.1.1.1 childTable 標簽
childTable 標簽用于定義 E-R 分片的子表芯急。通過標簽上的屬性與父表進行關聯(lián)
屬性名 | 屬性說明 |
---|---|
name | 定義子表的表名 |
joinKey | 插入子表的時候會使用這個列的值查找父表存儲的數(shù)據(jù)節(jié)點 |
parentKey | 屬性指定的值一般為與父表建立關聯(lián)關系的列名。程序首先獲取 joinkey 的值驶俊,再通過 parentKey 屬性指定的列名產(chǎn)生查詢語句娶耍,通過執(zhí)行該語句得到父表存儲在哪個分片上。從而確定子表存儲的位置 |
primaryKey | 同 table 標簽所描述的 |
needAddLimit | 同 table 標簽所描述的 |
1.2 dataNode 標簽
定義了 MyCat 中的數(shù)據(jù)節(jié)點饼酿,也就是我們通常說所的數(shù)據(jù)分片榕酒。一個 dataNode 標簽就是一個獨立的數(shù)據(jù)分片
<dataNode name="dn1" dataHost="lch3307" database="db1" ></dataNode>
例子中所表述的意思為:使用名字為 lch3307 數(shù)據(jù)庫實例上的 db1 物理數(shù)據(jù)庫胚膊,這就組成一個數(shù)據(jù)分片,最后想鹰,我們使用名字 dn1 標識這個分片
屬性名 | 屬性說明 |
---|---|
name | 定義數(shù)據(jù)節(jié)點的名字紊婉,這個名字需要是唯一的,我們需要在 table 標簽上應用這個名字辑舷,來建立表與分片對應的關系 |
dataHost | 該屬性用于定義該分片屬于哪個數(shù)據(jù)庫實例的喻犁,屬性值是引用 dataHost 標簽上定義的 name 屬性 |
database | 該屬性用于定義該分片屬性哪個具體數(shù)據(jù)庫實例上的具體庫,因為這里使用兩個緯度來定義分片惩妇,就是:實例+具體的庫株汉。因為每個庫上建立的表和表結(jié)構(gòu)是一樣的。所以這樣做就可以輕松的對表進行水平拆分 |
1.3 dataHost 標簽
屬性名 | 屬性說明 |
---|---|
name | 唯一標識 dataHost 標簽歌殃,供上層的標簽使用 |
maxCon | 指定每個讀寫實例連接池的最大連接 |
minCon | 指定每個讀寫實例連接池的最小連接乔妈,初始化連接池的大小 |
balance | 1. balance="0", 不開啟讀寫分離機制,所有讀操作都發(fā)送到當前可用的 writeHost 上氓皱。 2. balance="1"路召,全部的 readHost 與 stand by writeHost 參與 select 語句的負載均衡,簡單的說波材,當雙主雙從模式(M1->S1股淡,M2->S2,并且 M1 與 M2 互為主備)廷区,正常情況下唯灵,M2,S1,S2 都參與 select 語句的負載均衡。 3. balance="2"隙轻,所有讀操作都隨機的在 writeHost埠帕、readhost 上分發(fā)。 4. balance="3"玖绿,所有讀請求隨機的分發(fā)到 wiriterHost 對應的 readhost 執(zhí)行敛瓷,writerHost 不負擔讀壓力,注意 balance=3 只在 1.4 及其以后版本有斑匪,1.3 沒有 |
writeType | 1. writeType="0"呐籽,所有寫操作發(fā)送到配置的第一個 writeHost,第一個掛了切到還生存的第二個 writeHost蚀瘸,重新啟動后已切換后的為準狡蝶,切換記錄在配置文件中:dnindex.properties . 2. writeType="1",所有寫操作都隨機的發(fā)送到配置的 writeHost贮勃,1.5 以后廢棄不推薦 |
dbType | 指定后端連接的數(shù)據(jù)庫類型牢酵,例如:mongodb、oracle衙猪、spark 等 |
dbDriver | 指定連接后端數(shù)據(jù)庫使用的 Driver,目前可選的值有 native 和 jdbc 如果使用 JDBC 的話需要將符合 JDBC 4 標準的驅(qū)動 JAR 包放到 MYCAT\lib 目錄下,并檢查驅(qū)動 JAR 包中包括如下目錄結(jié)構(gòu)的文件:META-INF\services\java.sql.Driver垫释。在這個文件內(nèi)寫上具體的 Driver 類名丝格,例如:com.mysql.jdbc.Driver |
switchType | switchType = -1, 表示不自動切換 switchType = 1 默認值,自動切換 switchType = 2 基于 MySQL 主從同步的狀態(tài)決定是否切換心跳語句為 show slave status switchType = 3 基于 MySQL galary cluster 的切換機制(適合集群)(1.4.1)心跳語句為 show status like ‘wsrep%’ |
tempReadHostAvailable | 如果配置了這個屬性 writeHost 下面的 readHost 仍舊可用棵譬,默認 0 可配置(0显蝌、1) |
1.3.1 heartbeat 標簽
這個標簽內(nèi)指明用于和后端數(shù)據(jù)庫進行心跳檢查的語句。例如,MYSQL 可以使用 select user()订咸,Oracle 可以使用 select 1 from dual 等
1.3.2 writeHost 標簽曼尊、readHost 標簽
這兩個標簽都指定后端數(shù)據(jù)庫的相關配置給 mycat,用于實例化后端連接池脏嚷。唯一不同的是骆撇,writeHost 指定寫實例、readHost 指定讀實例
在一個 dataHost 內(nèi)可以定義多個 writeHost 和 readHost父叙。但是神郊,如果 writeHost 指定的后端數(shù)據(jù)庫宕機,那么這個 writeHost 綁定的所有 readHost 都將不可用趾唱。另一方面涌乳,由于這個 writeHost 宕機系統(tǒng)會自動的檢測到,并切換到備用的 writeHost 上去
屬性名 | 屬性說明 |
---|---|
host | 用于標識不同實例甜癞,一般 writeHost 我們使用 *M1夕晓,readHost 我們用 *S1 |
url | 后端實例連接地址,如果是使用 native 的 dbDriver悠咱,則一般為 address:port 這種形式蒸辆。用 JDBC 或其他的 dbDriver,則需要特殊指定乔煞。當使用 JDBC 時則可以這么寫:jdbc:mysql://localhost:3306/ |
password | 后端存儲實例需要的密碼 |
user | 后端存儲實例需要的用戶名字 |
weight | 權(quán)重吁朦,配置在 readhost 中作為讀節(jié)點的權(quán)重(1.4 以后) |
usingDecrypt | 是否對密碼加密默認 0 否 如需要開啟配置 1,同時使用加密程序?qū)γ艽a加密 |
2 server.xml
server.xml 幾乎保存了所有 mycat 需要的系統(tǒng)配置信息渡贾。其在代碼內(nèi)直接的映射類為 SystemConfig 類
2.1 user 標簽
<user name="test">
<property name="password">test</property>
<property name="schemas">TESTDB</property>
<property name="readOnly">true</property>
<property name="benchmark">11111</property>
<property name="usingDecrypt">1</property>
<privileges check="false">
<schema name="TESTDB" dml="0010" showTables="custome/mysql">
<table name="tbl_user" dml="0110"></table>
<table name="tbl_dynamic" dml="1111"></table>
</schema>
</privileges>
</user>
server.xml 中的標簽本就不多逗宜,這個標簽主要用于定義登錄 mycat 的用戶和權(quán)限。例如上面的例子中空骚,我定義了一個用戶纺讲,用戶名為 test、密碼也為 test囤屹,可訪問的 schema 也只有 TESTDB 一個
屬性名 | 屬性說明 |
---|---|
Benchmark | benchmark 基準, 當前端的整體 connection 數(shù)達到基準值是, 對來自該賬戶的請求開始拒絕連接熬甚,0 或不設表示不限制 |
usingDecrypt | 是否對密碼加密默認 0 否 如需要開啟配置 1,同時使用加密程序?qū)γ艽a加密 |
2.1.1 privileges 標簽
對用戶的 schema 及 下級的 table 進行精細化的 DML 權(quán)限控制肋坚,privileges 節(jié)點中的 check 屬性是用于標識是否開啟 DML 權(quán)限檢查乡括, 默認 false 標識不檢查肃廓,當然 privileges 節(jié)點不配置,等同 check=false诲泌,由于 Mycat 一個用戶的 schemas 屬性可配置多個 schema 盲赊,所以 privileges 的下級節(jié)點 schema 節(jié)點同樣可配置多個,對多庫多表進行細粒度的 DML 權(quán)限控制
Schema/Table 上的 dml 屬性描述:
屬性名 | 屬性說明 | 示例(禁止增刪改查) |
---|---|---|
dml | insert,update,select,delete | 0000 |
<!-- 禁止 insert 和 delete 操作 -->
<privileges check="true">
<schema name="TESTDB" dml="0110" >
<table name="table01" dml="0111"></table>
<table name="table02" dml="1111"></table>
</schema>
<schema name="TESTDB1" dml="0110">
<table name="table03" dml="1110"></table>
<table name="table04" dml="1010"></table>
</schema>
</privileges>
2.2 system 標簽
這個標簽內(nèi)嵌套的所有 property 標簽都與系統(tǒng)配置有關敷扫,請注意哀蘑,下面我會省去標簽 property 直接使用這個標簽的 name 屬性內(nèi)的值來介紹這個屬性的作用
2.2.1 charset 屬性
字符集設置
2.2.2 defaultSqlParser 屬性
由于 mycat 最初是時候 Foundation DB 的 sql 解析器,而后才添加的 Druid 的解析器葵第。所以這個屬性用來指定默認的解析器绘迁。目前的可用的取值有:druidparser 和 fdbparser。使用的時候可以選擇其中的一種卒密,目前一般都使用 druidparser
2.2.3 processors 屬性
這個屬性主要用于指定系統(tǒng)可用的線程數(shù)缀台,默認值為機器 CPU 核心線程數(shù)。主要影響 processorBufferPool栅受、processorBufferLocalPercent将硝、processorExecutor 屬性。NIOProcessor 的個數(shù)也是由這個屬性定義的屏镊,所以調(diào)優(yōu)的時候可以適當?shù)恼{(diào)高這個屬性
2.2.4 processorBufferChunk 屬性
這個屬性指定每次分配 Socket Direct Buffer 的大小依疼,默認是 4096 個字節(jié)荣挨。這個屬性也影響 buffer pool 的長度神帅。如果一次性獲取的數(shù)過大 buffer 不夠用經(jīng)常出現(xiàn)警告,則可以適當調(diào)大
2.2.5 processorBufferPool 屬性
這個屬性指定 bufferPool 計算比例值桶唐。由于每次執(zhí)行 NIO 讀棍丐、寫操作都需要使用到 buffer误辑,系統(tǒng)初始化的時候會建立一定長度的 buffer 池來加快讀、寫的效率歌逢,減少建立 buffer 的時間
Mycat 中有兩個主要的 buffer 池:
- BufferPool
- ThreadLocalPool
BufferPool 由 ThreadLocalPool 組合而成巾钉,每次從 BufferPool 中獲取 buffer 都會優(yōu)先獲取 ThreadLocalPool 中的 buffer,未命中之后才會去獲取 BufferPool 中的 buffer秘案。也就是說 ThreadLocalPool 是作為 BufferPool 的二級緩存砰苍,由每個線程內(nèi)部自己使用。當然這其中還有一些限制條件需要線程的名字是由$_開頭阱高。然而 BufferPool 上的 buffer 則是每個 NIOProcessor 都共享的赚导。
默認這個屬性的值為: 默認 bufferChunkSize(4096) * processors 屬性 * 1000
BufferPool 的總長度 = bufferPool / bufferChunk
若 bufferPool 不是 bufferChunk 的整數(shù)倍,則總長度為前面計算得出的商 + 1假設系統(tǒng)線程數(shù)為 4赤惊,其他都為屬性的默認值吼旧,則:bufferPool = 4096 * 4 * 1000,BufferPool 的總長度 : 4000 = 16384000 / 4096
2.2.6 processorBufferLocalPercent 屬性
前面提到了 ThreadLocalPool未舟。這個屬性就是用來控制分配這個 pool 的大小用的圈暗,但其也并不是一個準確的值掂为,也是一個比例值。這個屬性默認值為 100员串。
線程緩存百分比 = bufferLocalPercent / processors 屬性菩掏。
例如,系統(tǒng)可以同時運行 4 個線程昵济,使用默認值,則根據(jù)公式每個線程的百分比為 25野揪。最后根據(jù)這個百分比來計算出具體的 ThreadLocalPool 的長度公式如下:
ThreadLocalPool 的長度 = 線程緩存百分比 * BufferPool 長度 / 100
假設 BufferPool 的長度為 4000访忿,其他保持默認值。那么最后每個線程建立上的 ThreadLocalPool 的長度為: 1000 = 25 * 4000 / 100
2.2.7 processorExecutor 屬性
這個屬性主要用于指定 NIOProcessor 上共享的 businessExecutor 固定線程池大小斯稳。mycat 在需要處理一些異步邏輯的時候會把任務提交到這個線程池中海铆。新版本中這個連接池的使用頻率不是很大了,可以設置一個較小的值
2.2.8 sequnceHandlerType 屬性
指定使用 Mycat 全局序列的類型挣惰。0 為本地文件方式卧斟,1 為數(shù)據(jù)庫方式,2 為時間戳序列方式憎茂,3 為分布式 ZK ID 生成器珍语,4 為 zk 遞增 id 生成
3 rule.xml
rule.xml 里面就定義了我們對表進行拆分所涉及到的規(guī)則定義。我們可以靈活的對表使用不同的分片算法竖幔,或者對表使用相同的算法但具體的參數(shù)不同板乙。這個文件里面主要有 tableRule 和 function 這兩個標簽。在具體使用過程中可以按照需求添加 tableRule 和 function
3.1 tableRule 標簽
<tableRule name="mod-long">
<rule>
<columns>id</columns>
<algorithm>mod-long</algorithm>
</rule>
</tableRule>
屬性名 | 屬性說明 |
---|---|
name | 屬性指定唯一的名字拳氢,用于標識不同的表規(guī)則 |
內(nèi)嵌的 rule 標簽則指定對物理表中的哪一列進行拆分和使用什么路由算法:
標簽名 | 標簽說明 |
---|---|
columns | 指定要拆分的列名字 |
algorithm | 使用 function 標簽中的 name 屬性 |
3.2 function 標簽
<function name="mod-long" class="io.mycat.route.function.PartitionByMod">
<!-- how many data nodes -->
<property name="count">3</property>
</function>
屬性名 | 屬性說明 |
---|---|
name | 指定算法的名字 |
class | 制定路由算法具體的類名字 |
property | 算法需要用到的一些屬性 |
Mycat 的分片 join
Mycat 目前版本支持跨分片的 join,主要實現(xiàn)的方式有四種:全局表募逞,ER 分片,catletT(人工智能)和 ShareJoin馋评,ShareJoin 在開發(fā)版中支持放接,前面三種方式 1.3.0.1 支持
1 全局表
2 ER分片
MyCAT 借鑒了 NewSQL 領域的新秀 Foundation DB 的設計思路,F(xiàn)oundation DB 創(chuàng)新性的提出了 TableGroup 的概念留特,其將子表的存儲位置依賴于主表纠脾,并且物理上緊鄰存放,因此徹底解決了 JION 的效率和性能問題磕秤,根據(jù)這一思路乳乌,提出了基于 E-R 關系的數(shù)據(jù)分片策略,子表的記錄與所關聯(lián)的父表記錄存放在同一個數(shù)據(jù)分片上
customer 采用 sharding-by-intfile 這個分片策略市咆,分片在 dn1,dn2 上汉操,orders 依賴父表進行分片,兩個表的關聯(lián)關系為 orders.customer_id=customer.id蒙兰。于是數(shù)據(jù)分片和存儲的示意圖如下:
這樣一來磷瘤,分片 Dn1 上的的 customer 與 Dn1 上的 orders 就可以進行局部的 JOIN 聯(lián)合芒篷,Dn2 上也如此,再合并兩個節(jié)點的數(shù)據(jù)即可完成整體的 JOIN采缚,試想一下针炉,每個分片上 orders 表有 100 萬條,則 10 個分片就有 1 個億扳抽,基于 E-R 映射的數(shù)據(jù)分片模式篡帕,基本上解決了 80%以上的企業(yè)應用所面臨的問題
3 Share join
ShareJoin 是一個簡單的跨分片 Join,基于 HBT 的方式實現(xiàn)
目前支持 2 個表的 join,原理就是解析 SQL 語句,拆分成單表的 SQL 語句執(zhí)行贸呢,然后把各個節(jié)點的數(shù)據(jù)匯集
4 catlet(人工智能)
全局序列號
在實現(xiàn)分庫分表的情況下镰烧,數(shù)據(jù)庫自增主鍵已無法保證自增主鍵的全局唯一。為此楞陷,MyCat 提供了全局sequence怔鳖,并且提供了包含本地配置和數(shù)據(jù)庫配置等多種實現(xiàn)方式
1 本地文件方式
2 數(shù)據(jù)庫配置方式
Mycat SQL 攔截機制
用戶可以寫一個 java 類,將傳入 MyCAT 的 SQL 進行改寫然后交給Mycat 去執(zhí)行固蛾,此技巧可以完成如下一些特殊功能:
- 捕獲和記錄某些特殊的 SQL结执;
- 記錄 sql 查找異常;
- 出于性能優(yōu)化的考慮艾凯,改寫 SQL献幔,比如改變查詢條件的順序或增加分頁限制;
- 將某些 Select SQL 強制設置為 Read 模式览芳,走讀寫分離(很多事務框架很難剝離事務中的 Select SQL斜姥;
- 后期 Mycat 智能優(yōu)化,攔截所有 sql 做智能分析沧竟,自動監(jiān)控節(jié)點負載铸敏,自動優(yōu)化路由,提供數(shù)據(jù)庫優(yōu)化建議
SQL 攔截的原理是在路由之前攔截 SQL悟泵,然后做其他處理杈笔,完了之后再做路由,執(zhí)行,如下圖所示:
二糕非、實現(xiàn)自定義 SQL 攔截器
2.1 實現(xiàn) SQLInterceptor 接口
/**
* used for interceptor sql before execute ,can modify sql befor execute
* @author wuzhih
*
*/
public interface SQLInterceptor {
/**
* return new sql to handler,ca't modify sql's type
* @param sql
* @param sqlType
* @return new sql
*/
String interceptSQL(String sql ,int sqlType);
}
例如蒙具,我們自定義實現(xiàn)一個 SQL 攔截器,對每個 SQL 語句進行 EXPLAIN 分析朽肥,找出潛在的慢 sql
2.2 在 server.xml 中添加攔截器配置
運行采坑記錄
一禁筏、Client does not support authentication protocol requested by server
解決方案:
USE mysql;
ALTER USER 'root'@'localhost' IDENTIFIED WITH mysql_native_password BY '${password}';
FLUSH PRIVILEGES;
官方解決鏈接:https://dev.mysql.com/doc/refman/5.6/en/old-client.html
源碼解析
一、mycat 通信模型
前端與后端通信框架都為NIO/AIO衡招,因為目前生產(chǎn)上用的linux發(fā)行版內(nèi)核都沒有真正實現(xiàn)網(wǎng)絡上的AIO篱昔,如果應用用AIO的話可能比NIO還要慢一些,所以,我們這里只分析NIO相關的通信模塊州刽。
- NIOAcceptor:作為服務器接受客戶端連接(前端 NIO 通信)
- NIOConnector:作為客戶端去連接后臺數(shù)據(jù)庫(MySql空执,后端 NIO 通信)
- NIOReactor:Reactor 模式的 NIO,處理并轉(zhuǎn)發(fā)請求到 RW 線程穗椅,其實就是把對應 AbstractConnection(就是 NIO 的 channel 的封裝)注冊到 RW 線程的selector上辨绊,只注冊讀標記
- NIOReactorPool:一般高性能網(wǎng)絡通信框架采用多 Reactor(多 dispatcher)模式,這里將 NIOReactor 池化匹表;每次 NIOConnector 接受一個連接或者NIOAcceptor 請求一個連接门坷,都會封裝成 AbstractConnection,同時請求 NIOReactorPool 每次輪詢出一個 NIOReactor袍镀,之后 AbstractConnection 與這個NIOReactor 綁定(就是3之中說的注冊)
- RW:RW 線程拜鹤,負責執(zhí)行 NIO 的 channel 讀寫,這里 channel 封裝成了 AbstractConnection
- NIOSocketWR:每個前端連接(FrontendConnection)和后端連接(BackendConnection)都有一個對應的緩沖區(qū)流椒,對連接讀寫操作具體如何操作的方法和緩存方式,封裝到了這個類里面
FrontendConnection
客戶端與 mycat 的連接明也,子類有 ServerConnection 和 ManagerConnection
BackendConnection
mycat 連接后天數(shù)據(jù)庫的 connection宣虾,子類有 MySqlConnection、JDBCConnection
二温数、客戶端連接建立及認證
- NIOAcceptor#run 方法輪詢是否有連接請求绣硝,若客戶端有連接請求到 mycat,則調(diào)用 io.mycat.net.NIOAcceptor#accept 方法
- NIOAcceptor#accept 方法調(diào)用 serverChannel.accept 創(chuàng)建一個 SocketChannel撑刺,并根據(jù) SocketChannel 創(chuàng)建一個新的前端連接實例 FrontendConnection鹉胖,然后從 NIOReactor 池中獲取的 NIOReactor 實例,并調(diào)用 NIOReactor#postRegister 方法將 FrontendConnection 放入 RW 線程的注冊隊列够傍,然后喚醒 RW 線程的 selector
- RW#run 輪詢注冊隊列中是否有 AbstractConnection甫菠,若有則將其取出,并作為讀事件注冊到 tSelector冕屯。此外還調(diào)用 FrontendConnection#register 進行前端連接建立和認證
三寂诱、客戶端 SQL 請求執(zhí)行流程
RW#run 輪詢注冊隊列中是否有 AbstractConnection,若存在且為讀事件則調(diào)用 AbstractConnection#asynRead 異步讀取數(shù)據(jù)并梳理安聘,實際處理邏輯見 NIOSocketWR#asynRead
-
NIOSocketWR#asynRead 從 前端連接的 channel 中讀取數(shù)據(jù)痰洒,并且保存到對應 AbstractConnection 的 readBuffer 中,之后調(diào)用 AbstractConnection#onReadData 處理讀取到的數(shù)據(jù)
@Override public void asynRead() throws IOException { ByteBuffer theBuffer = con.readBuffer; if (theBuffer == null) { theBuffer = con.processor.getBufferPool().allocate(con.processor.getBufferPool().getChunkSize()); con.readBuffer = theBuffer; } // 從 SocketChannel 中讀取數(shù)據(jù)浴韭,并且保存到 AbstractConnection 的 readBuffer 中丘喻,readBuffer 處于 write mode,返回讀取了多少字節(jié) int got = channel.read(theBuffer); // 調(diào)用處理讀取到的數(shù)據(jù)的方法 con.onReadData(got); }
AbstractConnection#onReadData 讀取 readBuffer 中的數(shù)據(jù)并調(diào)用 AbstractConnection#handle 方法進行下一步處理念颈,其內(nèi)部調(diào)用 FrontendCommandHandler#handle
FrontendCommandHandler#handle 根據(jù) data[4] 來判斷命令類型泉粉,客戶端命令請求報文格式如下圖:
data 的第五個字節(jié)存儲命令類型,客戶端命令請求報文命令類型詳情表見附錄1舍肠。我們以 MySQLPacket.COM_QUERY 為例進行接下來的討論搀继。當 data[4] == MySQLPacket.COM_QUERY 時窘面,調(diào)用 FrontendConnection#query(byte[])
public void handle(byte[] data) {
// 判斷命令類型
switch (data[4]) {
...
// INSERT/SELECT/UPDATE/DELETE 等 SQL 歸屬于 MySQLPacket.COM_QUERY
case MySQLPacket.COM_QUERY:
commands.doQuery();
source.query(data);
break;
...
}
}
-
FrontendConnection#query(byte[]) 將 data 字節(jié)數(shù)組轉(zhuǎn)化成 String 類型的 sql,之后調(diào)用 ServerQueryHandler#query(String) 方法
public void query(byte[] data) { MySQLMessage mm = new MySQLMessage(data); // 從 data[5] 即第六個字節(jié)開始讀取參數(shù)體 mm.position(5); String sql = mm.readString(charset); // 執(zhí)行 sql 語句叽躯,內(nèi)部調(diào)用 ServerQueryHandler#query(String) this.query( sql ); }
-
ServerQueryHandler#query(String) 解析 SQL 類型财边,根據(jù)不同類型做不同的處理,之后調(diào)用 ServerConnection#routeEndExecuteSQL 進行路由計算(包括全局序列號点骑、SQL 語句攔截等酣难。路由計算詳細另述)并得到路由結(jié)果 RouteResultset,然后調(diào)用 NonBlockingSession#execute
public void routeEndExecuteSQL(String sql, final int type, final SchemaConfig schema) { // 路由計算 RouteResultset rrs = MycatServer .getInstance() .getRouterservice() .route(MycatServer.getInstance().getConfig().getSystem(), schema, type, sql, this.charset, this); if (rrs != null) { // session 執(zhí)行 session.execute(rrs, rrs.isSelectForUpdate() ? ServerParse.UPDATE : type); } }
-
NonBlockingSession#execute 獲取路由的 dataNode 節(jié)點黑滴,若節(jié)點數(shù)為1則調(diào)用 SingleNodeHandler#execute 處理 sql憨募,否則調(diào)用 MultiNodeQueryHandler#execute 處理 sql。此處我們假定前端 sql 命令只路由到一個 dataNode袁辈,則調(diào)用 SingleNodeHandler#execute 處理 sql
/** * NonBlockingSession#execute */ @Override public void execute(RouteResultset rrs, int type) { RouteResultsetNode[] nodes = rrs.getNodes(); if (nodes.length == 1) { singleNodeHandler = new SingleNodeHandler(rrs, this); singleNodeHandler.execute(); } else { multiNodeHandler = new MultiNodeQueryHandler(type, rrs, autocommit, this); multiNodeHandler.execute(); } }
-
SingleNodeHandler#execute 獲取后端連接 BackendConnection菜谣,并調(diào)用 SingleNodeHandler#_execute,該方法直接調(diào)用 BackendConnection#execute
public void execute() throws Exception { // 獲取后端數(shù)據(jù)庫連接 final BackendConnection conn = session.getTarget(node); // 若存在 dataNode 對應的 BackendConnection if (session.tryExistsCon(conn, node)) { _execute(conn); } else { // create new connection do something... } } private void _execute(BackendConnection conn) { conn.execute(node, session.getSource(), session.getSource().isAutocommit()); }
-
當 schema.xml 中配置 dataHost 的 dbDriver 為 jdbc 時晚缩,調(diào)用 JDBCConnection#execute 進行 sql 執(zhí)行(JDBCConnection 繼承 BackendConnection)尾膊。該方法新開一個線程處理 sql,最終調(diào)用 JDBCConnection#ouputResultSet 執(zhí)行 sql 并將結(jié)果寫入 ServerConnection
@Override public void execute(final RouteResultsetNode node, final ServerConnection source, final boolean autocommit) { this.sqlSelectLimit = source.getSqlSelectLimit(); Runnable runnable = new Runnable() { @Override public void run() { // 調(diào)用 JDBCConnection#ouputResultSet executeSQL(node, source, autocommit); } }; MycatServer.getInstance().getBusinessExecutor().execute(runnable); }
-
JDBCConnection#ouputResultSet 獲取數(shù)據(jù)庫連接并執(zhí)行 sql荞彼,然后將得到的結(jié)果集 ResultSet 解析為 Result Set 響應報文并寫入 ServerConnection
private void ouputResultSet(ServerConnection sc, String sql) throws SQLException { ResultSet rs = null; Statement stmt = null; try { stmt = con.createStatement(); rs = stmt.executeQuery(sql); List<FieldPacket> fieldPks = new LinkedList<FieldPacket>(); // 根據(jù) resultset 加載列信息冈敛,保存至 fieldPks ResultSetUtil.resultSetToFieldPacket(sc.getCharset(), fieldPks, rs, this.isSpark); // 獲取列數(shù) int colunmCount = fieldPks.size(); ByteBuffer byteBuf = sc.allocate(); /* 1 寫入 resultset header packet */ ResultSetHeaderPacket headerPkg = new ResultSetHeaderPacket(); headerPkg.fieldCount = fieldPks.size(); headerPkg.packetId = ++packetId; // 將 ResultSetHeaderPacket 的數(shù)據(jù)寫入 byteBuf byteBuf = headerPkg.write(byteBuf, sc, true); byteBuf.flip(); byte[] header = new byte[byteBuf.limit()]; // 將 byteBuf 中的信息寫入 header 中 byteBuf.get(header); // byteBuf 標記歸位 byteBuf.clear(); /* 2 寫入 field packet */ List<byte[]> fields = new ArrayList<byte[]>(fieldPks.size()); Iterator<FieldPacket> itor = fieldPks.iterator(); while (itor.hasNext()) { FieldPacket curField = itor.next(); curField.packetId = ++packetId; // 將 FieldPacket 的數(shù)據(jù)寫入 byteBuf byteBuf = curField.write(byteBuf, sc, false); // position 設回 0,并將 limit 設成之前的 position 的值 // limit:緩沖區(qū)數(shù)組中不可操作的下一個元素的位置:limit<=capacity byteBuf.flip(); byte[] field = new byte[byteBuf.limit()]; // 將 byteBuf 中的信息寫入 field 中 byteBuf.get(field); byteBuf.clear(); // 將 field 放入 fields fields.add(field); } /* 3 寫入 EOF packet */ EOFPacket eofPckg = new EOFPacket(); eofPckg.packetId = ++packetId; // 將 EOFPacket 的數(shù)據(jù)寫入 byteBuf byteBuf = eofPckg.write(byteBuf, sc, false); byteBuf.flip(); byte[] eof = new byte[byteBuf.limit()]; // 將 byteBuf 中的信息寫入 eof 中 byteBuf.get(eof); byteBuf.clear(); this.respHandler.fieldEofResponse(header, fields, eof, this); /* 4 寫入 Row Data packet */ // output row while (rs.next()) { ResultSetMetaData resultSetMetaData = rs.getMetaData(); int size = resultSetMetaData.getColumnCount(); StringBuilder builder = new StringBuilder(); for (int i = 1; i <= size; i++) { builder.append(resultSetMetaData.getColumnName(i) + "=" + rs.getString(i)); if (i < size) { builder.append(", "); } } LOGGER.debug("JDBCConnection.ouputResultSet sql: {}, resultSet: {}", sql, builder.toString()); RowDataPacket curRow = new RowDataPacket(colunmCount); for (int i = 0; i < colunmCount; i++) { int j = i + 1; if (MysqlDefs.isBianry((byte) fieldPks.get(i).type)) { curRow.add(rs.getBytes(j)); } else if (fieldPks.get(i).type == MysqlDefs.FIELD_TYPE_DECIMAL || fieldPks.get(i).type == (MysqlDefs.FIELD_TYPE_NEW_DECIMAL - 256)) { // field type is unsigned byte // ensure that do not use scientific notation format BigDecimal val = rs.getBigDecimal(j); curRow.add(StringUtil.encode(val != null ? val.toPlainString() : null, sc.getCharset())); } else { curRow.add(StringUtil.encode(rs.getString(j), sc.getCharset())); } } curRow.packetId = ++packetId; // 將 RowDataPacket 的數(shù)據(jù)寫入 byteBuf byteBuf = curRow.write(byteBuf, sc, false); byteBuf.flip(); byte[] row = new byte[byteBuf.limit()]; byteBuf.get(row); byteBuf.clear(); this.respHandler.rowResponse(row, this); } fieldPks.clear(); // end row /* 5 寫入 EOF packet */ eofPckg = new EOFPacket(); eofPckg.packetId = ++packetId; byteBuf = eofPckg.write(byteBuf, sc, false); byteBuf.flip(); eof = new byte[byteBuf.limit()]; byteBuf.get(eof); sc.recycle(byteBuf); this.respHandler.rowEofResponse(eof, this); } finally { if (rs != null) { try { rs.close(); } catch (SQLException e) { } } if (stmt != null) { try { stmt.close(); } catch (SQLException e) { } } } }
四鸣皂、Mycat 初始化
Mycat 初始化主要負責啟動 MycatServer 實例抓谴,啟動 MycatServer 實例的過程中,核心工作是讀取并解析 Mycat 配置文件(schema.xml寞缝、rule.xml 和 server.xml)癌压。MycatServer 使用 “饑餓模式” 初始化一個單例
public class MycatServer {
private static final MycatServer INSTANCE = new MycatServer();
public static final MycatServer getInstance() {
return INSTANCE;
}
private MycatServer() {
// 讀取文件配置
this.config = new MycatConfig();
...
}
}
讀取并解析 Mycat 配置文件的具體實現(xiàn)交由 MycatConfig。MycatConfig 內(nèi)部使用 ConfigInitializer 解析全局配置荆陆。ConfigInitializer 主要處理一下幾件事情:
- 讀取 schema.xml措拇、rule.xml 和 server.xml 文件并將解析到的配置類賦給 ConfigInitializer 的變量中
- 解析 DataHost 和對應的 DataNode,創(chuàng)建物理數(shù)據(jù)庫連接池(PhysicalDBPool)和物理數(shù)據(jù)庫節(jié)點(PhysicalDBNode)
- 權(quán)限管理設置
- 加載全局序列處理器配置
- 配置文件自檢
在此我重點敘述 1 和 2
4.1 配置文件讀取
// 讀取 rule.xml和schema.xml
SchemaLoader schemaLoader = new XMLSchemaLoader();
// 讀取 server.xml
XMLConfigLoader configLoader = new XMLConfigLoader(schemaLoader);
4.2 創(chuàng)建物理數(shù)據(jù)庫連接池(PhysicalDBPool)
initDataHosts 為每一個 <dataHost> 節(jié)點創(chuàng)建一個數(shù)據(jù)庫連接池慎宾,創(chuàng)建完成后返回 Map<String, PhysicalDBPool> physicalDBPoolMap丐吓,其中 key 為 <dataHost> 節(jié)點的 name 屬性值,value 為 <dataHost> 節(jié)點對應的數(shù)據(jù)庫連接池
private Map<String, PhysicalDBPool> initDataHosts(ConfigLoader configLoader) {
Map<String, DataHostConfig> dataHostConfigMap = configLoader.getDataHosts();
Map<String, PhysicalDBPool> physicalDBPoolMap = new HashMap<>(dataHostConfigMap.size());
for (DataHostConfig dataHostConfig : dataHostConfigMap.values()) {
// 為每個 dataHost 節(jié)點建立一個 PhysicalDBPool
PhysicalDBPool pool = getPhysicalDBPool(dataHostConfig, configLoader);
physicalDBPoolMap.put(pool.getHostName(), pool);
}
return physicalDBPoolMap;
}
io.mycat.config.ConfigInitializer#getPhysicalDBPool 方法為每個 <dataHost> 節(jié)點建立一個 PhysicalDBPool趟据。主要工作如下:
- 為每一個 <dataHost> 節(jié)點的 <writeHost> 節(jié)點創(chuàng)建一個 PhysicalDatasource
- 為每一個 <dataHost> 節(jié)點的 <readHost> 節(jié)點創(chuàng)建一個 PhysicalDatasource
- 初始化 PhysicalDBPool 并返回
private PhysicalDBPool getPhysicalDBPool(DataHostConfig dataHostConfig, ConfigLoader configLoader) {
// dataHost 節(jié)點名
String name = dataHostConfig.getName();
// 數(shù)據(jù)庫類型券犁,我們這里只討論MySQL
String dbType = dataHostConfig.getDbType();
// 連接數(shù)據(jù)庫驅(qū)動,我們這里只討論 MyCat 自己實現(xiàn)的 native
String dbDriver = dataHostConfig.getDbDriver();
// 1 為每一個 <dataHost> 節(jié)點的 <writeHost> 節(jié)點創(chuàng)建一個 PhysicalDatasource
PhysicalDatasource[] writeSources = createDataSource(dataHostConfig, name, dbType, dbDriver, dataHostConfig.getWriteHosts(), false);
Map<Integer, DBHostConfig[]> readHostsMap = dataHostConfig.getReadHosts();
Map<Integer, PhysicalDatasource[]> readSourcesMap = new HashMap<Integer, PhysicalDatasource[]>(readHostsMap.size());
// 對于每個讀節(jié)點建立 key 為 writeHost 下標 value 為 readHost 的 PhysicalDatasource[] 的哈希表
for (Map.Entry<Integer, DBHostConfig[]> entry : readHostsMap.entrySet()) {
// 2 為每一個 <dataHost> 節(jié)點的 <readHost> 節(jié)點創(chuàng)建一個 PhysicalDatasource
PhysicalDatasource[] readSources = createDataSource(dataHostConfig, name, dbType, dbDriver, entry.getValue(), true);
readSourcesMap.put(entry.getKey(), readSources);
}
// 3 初始化 PhysicalDBPool 并返回
PhysicalDBPool pool = new PhysicalDBPool(dataHostConfig.getName(), dataHostConfig, writeSources, readSourcesMap, dataHostConfig.getBalance(), dataHostConfig.getWriteType());
pool.setSlaveIDs(dataHostConfig.getSlaveIDs());
return pool;
}
io.mycat.config.ConfigInitializer#createDataSource 完成具體的數(shù)據(jù)源創(chuàng)建汹碱。根據(jù)不同的 dvType 和 dbDriver 創(chuàng)建不同的 PhysicalDatasource:
- dvType == mysql && dbDriver == native --> MySQLDataSource
- dvType == mysql && dbDriver == jdbc --> JDBCDataSource
- dvType == postgresql && dbDriver == native --> PostgreSQLDataSource
private PhysicalDatasource[] createDataSource(DataHostConfig dataHostConfig, String hostName, String dbType, String dbDriver, DBHostConfig[] nodes, boolean isRead) {
PhysicalDatasource[] dataSources = new PhysicalDatasource[nodes.length];
if ("mysql".equals(dbType) && "native".equals(dbDriver)) {
for (int i = 0; i < nodes.length; i++) {
//設置最大 idle 時間粘衬,默認為 30 分鐘(可自定義)
nodes[i].setIdleTimeout(system.getIdleTimeout());
MySQLDataSource ds = new MySQLDataSource(nodes[i], dataHostConfig, isRead);
dataSources[i] = ds;
}
} else if ("jdbc".equals(dbDriver)) {
for (int i = 0; i < nodes.length; i++) {
nodes[i].setIdleTimeout(system.getIdleTimeout());
JDBCDatasource ds = new JDBCDatasource(nodes[i], dataHostConfig, isRead);
dataSources[i] = ds;
}
} else if ("postgresql".equalsIgnoreCase(dbType) && dbDriver.equalsIgnoreCase("native")) {
for (int i = 0; i < nodes.length; i++) {
nodes[i].setIdleTimeout(system.getIdleTimeout());
PostgreSQLDataSource ds = new PostgreSQLDataSource(nodes[i], dataHostConfig, isRead);
dataSources[i] = ds;
}
} else {
throw new ConfigException("not supported yet !" + hostName);
}
return dataSources;
}
4.3 創(chuàng)建物理數(shù)據(jù)庫節(jié)點(PhysicalDBNode)
io.mycat.config.ConfigInitializer#initDataNodes 為每個 <dataNode> 節(jié)點創(chuàng)建一個 PhysicalDBNode,根據(jù) <dataNode> 節(jié)點的 dataHost 屬性值從 Map<String, PhysicalDBPool> dataHosts 中找到 <dataNode> 對應的連接池,并賦予 PhysicalDBNode
private Map<String, PhysicalDBNode> initDataNodes(ConfigLoader configLoader) {
Map<String, PhysicalDBNode> nodes = new HashMap<String, PhysicalDBNode>(dataNodeConfigMap.size());
Map<String, DataNodeConfig> dataNodeConfigMap = configLoader.getDataNodes();
for (DataNodeConfig dataNodeConfig : dataNodeConfigMap.values()) {
// 根據(jù) dataHost 名稱獲取對應的 PhysicalDBPool
PhysicalDBPool pool = this.dataHosts.get(dataNodeConfig.getDataHost());
PhysicalDBNode dataNode = new PhysicalDBNode(dataNodeConfig.getName(), dataNodeConfig.getDatabase(), pool);
nodes.put(dataNode.getName(), dataNode);
}
return nodes;
}
五稚新、mycat 的連接池模型
Mycat 為了最高效的利用后端的 MySQL 連接勘伺,采取了不同于 Cobar 也不同于傳統(tǒng) JDBC 連接池的做法,傳統(tǒng)的做法是基于 Database 的連接池褂删,即一個 MySQL 服務器上有 5 個 Database飞醉,則每個 Database 獨占最大200 個連接。這種模式的最大問題在于屯阀,將一個數(shù)據(jù)庫所具備的最大 1000 個連接缅帘,隔離成了更新小的連接池,于是可能產(chǎn)生一個應用的連接不夠难衰,但其他應用的連接卻很空閑的資源浪費情況钦无,而對于分片這種場景,這個缺陷則幾乎是致命的盖袭,因為每個分片所對應的 Database 的連接數(shù)量被限制在了一個很小的范圍內(nèi)失暂,從而導致系統(tǒng)并發(fā)能力的大幅降低。而 Mycat 則采用了基于 MySQL 實例的連接池模式鳄虱,每個 Database 都可以用現(xiàn)有的 1000 個連接中的空閑連接
5.1 核心對象
5.1.1 ConMap 和 ConQueue
在 Mycat 的連接池里趣席,當前可用的、空閑的 MySQL 連接是放到一個 ConcurrentHashMap 的數(shù)據(jù)結(jié)構(gòu)里醇蝴,Key 為當前連接對應的 Database,另外還有二級分類 ConQueue想罕,按照連接是自動提交還是手動提交模式進行區(qū)分悠栓,這個設計是為了高效的查詢匹配的可用連接。ConMap 和 ConQueue 包含的關鍵對象有:
- ConcurrentHashMap<String, ConQueue> items:可用的 MySQL 連接容器按价,key 為當前連接對應的 database 名惭适,value 為 ConQueue 對象,里面包含了兩個存儲數(shù)據(jù)庫連接的隊列
- ConcurrentLinkedQueue<BackendConnection> autoCommitCons:自動提交的數(shù)據(jù)庫連接
- ConcurrentLinkedQueue<BackendConnection> manCommitCons:手動提交的數(shù)據(jù)庫連接
public class ConMap {
/**
* key:當前連接對應的 Database
* ConQueue:數(shù)據(jù)庫連接隊列(按照連接是自動提交還是手動提交模式進行區(qū)分楼镐,這個設計是為了高效的查詢匹配的可用連接)
*/
private final ConcurrentHashMap<String, ConQueue> items = new ConcurrentHashMap<String, ConQueue>();
}
public class ConQueue {
private final ConcurrentLinkedQueue<BackendConnection> autoCommitCons = new ConcurrentLinkedQueue<BackendConnection>();
private final ConcurrentLinkedQueue<BackendConnection> manCommitCons = new ConcurrentLinkedQueue<BackendConnection>();
private long executeCount;
}
BackendConnection 為后端數(shù)據(jù)庫連接癞志,其實現(xiàn)有 JDBCConnection、MySQLConnection 等
5.1.2 PhysicalDatasource
對應于 <dataHost> 節(jié)點下的 <writeHost> 或 <readHost> 子節(jié)點框产,表示一個物理數(shù)據(jù)庫實例凄杯。每個數(shù)據(jù)庫實例中保存了多個可用的數(shù)據(jù)庫連接(BackendConnection),mycat 初始化時秉宿,根據(jù) <dataHost> 節(jié)點的 minCon
屬性值初始化多個可用的數(shù)據(jù)庫連接戒突。其關鍵對象有:
- size:讀或?qū)戇B接池的最大連接數(shù)
- conMap:存放當前可用的數(shù)據(jù)庫連接
public abstract class PhysicalDatasource {
private final String name;
private final int size;
private final DBHostConfig config;
private final ConMap conMap = new ConMap();
private final boolean readNode;
private final DataHostConfig hostConfig;
private PhysicalDBPool dbPool;
}
PhysicalDatasource 的實現(xiàn)類有:
5.1.3 PhysicalDBPool
對應于 <dataHost name="localhost1" > 節(jié)點,表示物理數(shù)據(jù)庫實例池描睦。由于 <datahost> 節(jié)點可包含多個 <writeHost> 節(jié)點膊存,因此 PhysicalDBPool 可以包含多個物理數(shù)據(jù)庫實例,其關鍵對象有:
- hostName:<dataHosr> 標簽的 name 屬性
- writeSources 和 readSources:可寫和可讀的多個物理數(shù)據(jù)庫實例,對應于 <writeHost> 和 <readHost>
- activedIndex:表明了當前是哪個寫節(jié)點的數(shù)據(jù)源在生效
public class PhysicalDBPool {
private final String hostName;
protected PhysicalDatasource[] writeSources;
protected Map<Integer, PhysicalDatasource[]> readSources;
protected volatile int activedIndex;
private final DataHostConfig dataHostConfig;
}
5.1.4 PhysicalDBNode
對應于 <dataNode /> 節(jié)點隔崎,表示一個數(shù)據(jù)庫分片今艺,PhysicalDBNode 包含的關鍵對象有:
- name:dataNode 名稱,對應于 <dataNode> 標簽的 name 屬性
- database:數(shù)據(jù)庫名稱爵卒,對應于 <dataNode> 標簽的 database 屬性
- PhysicalDBPool dbPool:MySQL 連接池虚缎,里面包含了多個數(shù)據(jù)庫實例 PhysicalDatasource,并將其按照讀節(jié)點和寫節(jié)點分類技潘,實現(xiàn)讀寫分類和節(jié)點切換的功能遥巴。其中 activedIndex 屬性表明了當前是哪個寫節(jié)點的數(shù)據(jù)源在生效
public class PhysicalDBNode {
protected final String name;
protected final String database;
protected final PhysicalDBPool dbPool;
}
若 schema.xml 中配置了一下分片節(jié)點:
<dataNode name="dn1" dataHost="localhost1" database="db_demo_01"/>
<dataNode name="dn2" dataHost="localhost1" database="db_demo_02"/>
<dataNode name="dn3" dataHost="localhost1" database="db_demo_03"/>
當某個用戶會話需要一個自動提交的,到分片 <dataNode name="dn1" dataHost="localhost1" database="db_demo_01"/> 的 SQL 連接的時候享幽,分片節(jié)點 dn1 首先在連接池 dbPool 中查找是否有數(shù)據(jù)庫 db_demo_01 (對應于 PhysicalDatasource)上的可用連接铲掐,若有則看是否有自動提交模式的連接,找到就返回值桩,否則返回 db_demo_01 上的手動提交模式的連接摆霉;若沒有 db_demo_01 的可用連接,則隨機返回一個其他數(shù)據(jù)庫(db_demo_02 或 db_demo_03)對應的可用連接奔坟;若沒有其他數(shù)據(jù)庫也沒有可用連接携栋,并且連接池還沒達到上限,則創(chuàng)建一個新連接并返回
上述獲取數(shù)據(jù)庫連接的邏輯有一種情況是:用戶會話得到的數(shù)據(jù)庫連接可能不是來自于 db_demo_01 的咳秉,因此在執(zhí)行具體的 SQL 之前婉支,還有一個自動同步數(shù)據(jù)庫連接的過程:包括事務隔離級別、事務模式澜建、字符集向挖、database 等四個指標。同步完成以后炕舵,才會執(zhí)行具體的 SQL 請求
通過共享一個 MySQL 上的所有數(shù)據(jù)庫的可用連接何之,并結(jié)合連接狀態(tài)同步的特性,MyCAT 的連接池做到了最佳的吞吐量咽筋,也在一定程度上提升了整個系統(tǒng)的并發(fā)支撐能力
5.2 創(chuàng)建數(shù)據(jù)庫連接
5.2.1 創(chuàng)建新連接時機
創(chuàng)建新數(shù)據(jù)庫連接的方法為 PhysicalDatasource#createNewConnection(io.mycat.backend.mysql.nio.handler.ResponseHandler, java.lang.Object, java.lang.String)溶推,其有兩個觸發(fā)時機:
-
io.mycat.backend.datasource.PhysicalDatasource#createByIdleLitte
執(zhí)行空閑檢測時觸發(fā),若當前數(shù)據(jù)庫連接總數(shù)(空閑連接數(shù)和活動鏈接數(shù)之和)小于連接池的最大連接數(shù)奸攻,且空閑連接數(shù)小于連接池最小連接數(shù)蒜危,則調(diào)用 PhysicalDatasource#createByIdleLitte 方法創(chuàng)建新數(shù)據(jù)庫連接
if ((createCount > 0) && (idleCons + activeCons < size) && (idleCons < hostConfig.getMinCon())) { createByIdleLitte(idleCons, createCount); }
-
io.mycat.backend.datasource.PhysicalDatasource#getConnection
首先調(diào)用 ConMap#tryTakeCon(java.lang.String, boolean) 當前 database 上是否有可用連接,若有則立即返回睹耐,否則從其他的 database 上找一個可用連接返回舰褪。若 ConMap#tryTakeCon 返回 null,表示數(shù)據(jù)庫連接池中沒有空閑連接疏橄,則調(diào)用 PhysicalDatasource#createNewConnection 創(chuàng)建新連接
public void getConnection(String schema, boolean autocommit, final ResponseHandler handler, final Object attachment) throws IOException { // 從當前連接 map 中拿取已建立好的后端連接 BackendConnection con = this.conMap.tryTakeCon(schema, autocommit); if (con != null) { //如果不為空占拍,則綁定對應前端請求的handler takeCon(con, handler, attachment, schema); } else { long activeCons = increamentCount.longValue() + totalConnectionCount; if (activeCons < size) { createNewConnection(handler, attachment, schema); } else { LOGGER.error("the max activeConnnections size can not be max than maxconnections"); throw new IOException("the max activeConnnections size can not be max than maxconnections"); } } }
5.2.2 創(chuàng)建新數(shù)據(jù)庫連接
MycatServer#startup 方法里其中一件事情就是初始化 PhysicalDBPool
public void startup() throws IOException {
...
Map<String, PhysicalDBPool> dataHosts = config.getDataHosts();
for (PhysicalDBPool physicalDBPool : dataHosts.values()) {
String index = dnIndexProperties.getProperty(physicalDBPool.getHostName(), "0");
physicalDBPool.init(Integer.parseInt(index));
physicalDBPool.startHeartbeat();
}
...
}
physicalDBPool.init(Integer.parseInt(index)) 中 調(diào)用 PhysicalDBPool#initSource 方法略就,該方法對每一個 PhysicalDatasource 調(diào)用 getConnection 方法創(chuàng)建新的數(shù)據(jù)庫連接
public void init(int index, String reason) {
for (int i = 0; i < writeSources.length; i++) {
int j = loop(i + index);
initSource(j, writeSources[j])
}
}
private boolean initSource(int index, PhysicalDatasource physicalDatasource) {
int initSize = physicalDatasource.getConfig().getMinCon();
CopyOnWriteArrayList<BackendConnection> list = new CopyOnWriteArrayList<>();
GetConnectionHandler getConHandler = new GetConnectionHandler(list, initSize);
for (int i = 0; i < initSize; i++) {
try {
physicalDatasource.getConnection(this.schemas[i % schemas.length], true, getConHandler, null);
} catch (Exception e) {
LOGGER.warn(getMessage(index, " init connection error."), e);
}
}
...
}
PhysicalDatasource#createNewConnection 方法新建一個線程異步執(zhí)行創(chuàng)建數(shù)據(jù)庫連接的操作,每個線程通過調(diào)用抽象方法來進行具體的創(chuàng)建邏輯晃酒。
private void createNewConnection(final ResponseHandler handler, final Object attachment, final String schema) throws IOException {
// aysn create connection
final AtomicBoolean hasError = new AtomicBoolean(false);
MycatServer.getInstance().getBusinessExecutor().execute(new Runnable() {
@Override
public void run() {
try {
createNewConnection(new DelegateResponseHandler(handler) {
@Override
public void connectionError(Throwable e, BackendConnection conn) {
if (hasError.compareAndSet(false, true)) {
handler.connectionError(e, conn);
} else {
LOGGER.info("connection connectionError ");
}
}
@Override
public void connectionAcquired(BackendConnection conn) {
LOGGER.info("connection id is " + conn.getId());
takeCon(conn, handler, attachment, schema);
}
}, schema);
} catch (IOException e) {
if (hasError.compareAndSet(false, true)) {
handler.connectionError(e, null);
} else {
LOGGER.info("connection connectionError ");
}
}
}
});
}
每個繼承 PhysicalDatasource 的數(shù)據(jù)源對象自己實現(xiàn)如下抽象方法:
public abstract void createNewConnection(ResponseHandler handler, String schema) throws IOException;
我們以 JDBCDatasource 為例表牢,其實現(xiàn)的 createNewConnection 方法實現(xiàn)代碼如下:
public class JDBCDatasource extends PhysicalDatasource {
@Override
public void createNewConnection(ResponseHandler handler, String schema) throws IOException {
DBHostConfig cfg = getConfig();
JDBCConnection jdbcConnection = new JDBCConnection();
jdbcConnection.setHost(cfg.getIp());
jdbcConnection.setPort(cfg.getPort());
jdbcConnection.setJdbcDatasource(this);
jdbcConnection.setSchema(schema);
jdbcConnection.setDbType(cfg.getDbType());
// 復用mysql的Backend的ID,需要在process中存儲
jdbcConnection.setId(NIOConnector.ID_GENERATOR.getId());
NIOProcessor processor = MycatServer.getInstance().nextProcessor();
jdbcConnection.setProcessor(processor);
processor.addBackend(jdbcConnection);
try {
Connection con = getConnection();
jdbcConnection.setCon(con);
// notify handler
handler.connectionAcquired(jdbcConnection);
} catch (Exception e) {
handler.connectionError(e, jdbcConnection);
}
}
}
主要做了一下幾件事:
實例化一個 JDBCConnection贝次,設置相關參數(shù)
-
調(diào)用 JDBCDatasource#getConnection 獲取 Connection
JDBCDatasource#getConnection 直接使用 DriverManager 創(chuàng)建一個新連接并返回
public Connection getConnection() throws SQLException { DBHostConfig cfg = getConfig(); Connection connection = DriverManager.getConnection(cfg.getUrl(), cfg.getUser(), cfg.getPassword()); return connection; }
-
調(diào)用 DelegateResponseHandler#connectionAcquired崔兴,作為已獲得有效連接的響應處理
@Override public void connectionAcquired(BackendConnection conn) { LOGGER.info("connection id is " + conn.getId()); takeCon(conn, handler, attachment, schema); }
ResponseHandler#connectionAcquired 調(diào)用 PhysicalDatasource#takeCon 方法進行相應處理,代碼如下:
private BackendConnection takeCon(BackendConnection conn, final ResponseHandler handler, final Object attachment, String schema) { // 該連接已被借用 conn.setBorrowed(true); if (!conn.getSchema().equals(schema)) { // need do schema syn in before sql send conn.setSchema(schema); } ConQueue queue = conMap.getSchemaConQueue(schema); queue.incExecuteCount(); conn.setAttachment(attachment); // 每次取連接的時候蛔翅,更新下 lasttime敲茄,防止在前端連接檢查的時候,關閉連接山析,導致sql執(zhí)行失敗 conn.setLastTime(System.currentTimeMillis()); handler.connectionAcquired(conn); return conn; }
主要做了一下幾件事:
設置連接已被借用
獲取 ConQueue堰燎,增加可以執(zhí)行連接的數(shù)量
-
調(diào)用 ResponseHandler#connectionAcquired,具體實現(xiàn)見 GetConnectionHandler#connectionAcquired笋轨,該方法調(diào)用 BackendConnection#release 釋放連接秆剪,調(diào)用 PhysicalDatasource#returnCon 方法將釋放的連接放回 ConMap 的 ConQueue 中
@Override public void connectionAcquired(BackendConnection conn) { successCons.add(conn); finishedCount.addAndGet(1); logger.info("connected successfully " + conn); conn.release(); } @Override public void JDBCConnection#release() { jdbcDatasource.releaseChannel(this); } public void PhysicalDatasource#releaseChannel(BackendConnection c) { returnCon(c); } private void PhysicalDatasource#returnCon(BackendConnection c) { c.setAttachment(null); c.setBorrowed(false); c.setLastTime(TimeUtil.currentTimeMillis()); ConQueue queue = this.conMap.getSchemaConQueue(c.getSchema()); boolean ok = false; if (c.isAutocommit()) { ok = queue.getAutoCommitCons().offer(c); } else { ok = queue.getManCommitCons().offer(c); } // 若無法放入 ConQueue 則將連接關閉 if (!ok) { LOGGER.warn("can't return to pool ,so close con " + c); c.close("can't return to pool "); } }
5.2.3 總結(jié)
Mycat 服務啟動時調(diào)用 MycatServer 的 startUp 方法對每一個 <dataHost> 節(jié)點的多個 <writeHost> 節(jié)點對應的數(shù)據(jù)源做初始化工作。初始創(chuàng)建數(shù)據(jù)庫連接數(shù)由 <dataHost> 節(jié)點的minCon 屬性值決定爵政。每創(chuàng)建一個 BackendConnection 便回調(diào) GetConnectionHandler#connectionAcquired 將新生成的 connection 的 borrowed 屬性設置為 false(該屬性個人理解是標記連接是否空閑)仅讽,然后將connection 保存于 ConQueue 中
因此一個 <dataHost> 節(jié)點對應一個 PhysicalDBPool
,PhysicalDBPool
類中的 PhysicalDatasource[] writeSources
對應于 <dataHost> 節(jié)點下多個 <writeHost> 節(jié)點钾挟。每個 PhysicalDatasource
中持有一個 ConMap conMap
作為數(shù)據(jù)源的連接池洁灵, 里面存放著可用的數(shù)據(jù)庫連接 BackendConnection
Mycat 根據(jù) <dataNode> 節(jié)點的 dataHost 屬性和 database 屬性,將數(shù)據(jù)庫連接均勻得分配給在同一個 dataHost 中的不同數(shù)據(jù)庫掺出。例如對于以下配置:
<!-- 分片節(jié)點 -->
<dataNode name="dn1" dataHost="localhost1" database="db_demo_01"/>
<dataNode name="dn2" dataHost="localhost1" database="db_demo_02"/>
<dataNode name="dn3" dataHost="localhost1" database="db_demo_03"/>
<!-- 節(jié)點主機 -->
<dataHost name="localhost1" maxCon="10000" minCon="100" balance="0" writeType="0" dbType="mysql" dbDriver="jdbc" switchType="1" slaveThreshold="100">
<heartbeat>select user()</heartbeat>
<writeHost host="hostM1" url="jdbc:mysql://localhost:3306?characterEncoding=utf8&useSSL=false&serverTimezone=UTC&autoReconnect=true" user="root" password="root">
</writeHost>
</dataHost>
Mycat 會初始化 100 個(minCon="100")數(shù)據(jù)庫連接徽千,并將這 100 個連接均分給 db_demo_01、db_demo_02 和 db_demo_03蛛砰,如下圖所示
5.3 獲取數(shù)據(jù)庫連接
本節(jié)主要講述獲取可用的數(shù)據(jù)庫連接用于執(zhí)行客戶端的 SQL 請求
5.3.1 涉及的核心類
- NIOAcceptor:負責處理 Accept 事件,即 MyCAT 作為服務端去處理前端業(yè)務程序發(fā)過來的連接請求
- ServerConnectionFactory:客戶端和 Mycat 連接工廠黍衙,用于創(chuàng)建客戶端連接
- ServerConnection:客戶端連接(客戶端和 Mycat 之間的連接)
- NonBlockingSession:客戶端連接和后端數(shù)據(jù)庫連接的會話泥畅,其核心對象有:
- ServerConnection serverConnection:客戶端連接
- ConcurrentMap<RouteResultSetNode, BackendConnection> backendConnectionMap:存放路由節(jié)點和對應的數(shù)據(jù)庫連接的容器
- SingleNodeHandler singleNodeHandler:單路由節(jié)點請求處理器
- MultiNodeQueryHandler multiNodeHandler:多路由節(jié)點請求處理器
- SingleNodeHandler(MultiNodeQueryHandler):路由節(jié)點請求處理器,其核心對象有:
- RouteResultSetNode routeResultSetNode
- RouteResultset rrs
- NonBlockingSession session
- PhysicalDBNode
- PhysicalDatasource
5.3.2 獲取過程解析
NIOAcceptor 在接受到前端發(fā)來的連接請求后琅翻,會調(diào)用 ServerConnectionFactory 實例化一個 ServerConnection位仁,并實例化一個 NonBlockingSession 賦予 ServerConnection
public class ServerConnectionFactory extends FrontendConnectionFactory {
@Override
protected FrontendConnection getConnection(NetworkChannel channel) throws IOException {
SystemConfig systemConfig = MycatServer.getInstance().getConfig().getSystem();
// 將 channel 包裝為一個 ServerConnection
ServerConnection serverConnection = new ServerConnection(channel);
// 設置客戶端查詢處理器
serverConnection.setQueryHandler(new ServerQueryHandler(serverConnection));
// 設置客戶端和 Mycat 的一個會話 session
serverConnection.setSession2(new NonBlockingSession(serverConnection));
...
return serverConnection;
}
}
前端 SQL 請求進來之后诸典,Mycat 調(diào)用 ServerConnection#routeEndExecuteSQL 進行路由計算并得到路由結(jié)果 RouteResultset移层,然后調(diào)用 NonBlockingSession#execute 進行處理,若 RouteResultset 中包含多個路由節(jié)點帖努,則調(diào)用 MultiNodeQueryHandler 的 execute 方法棠众;若 RouteResultset 只包含單個路由節(jié)點琳疏,則調(diào)用 SingleNodeHandler 的 execute 方法有决。此處我們假設是單個路由節(jié)點
public class NonBlockingSession implements Session {
@Override
public void execute(RouteResultset routeResultset, int type) {
RouteResultSetNode[] nodes = routeResultset.getNodes();
if (nodes.length == 1) {
// 實例化一個 SingleNodeHandler 對象
singleNodeHandler = new SingleNodeHandler(routeResultset, this);
singleNodeHandler.execute();
}
}
}
SingleNodeHandler#execute 首先通過 session 獲取客戶端連接 ServerConnection 以及后端數(shù)據(jù)庫連接 BackendConnection。第一次獲取 BackendConnection 時由于 session 還沒有將 routeResultSetNode 和 BackendConnection 綁定空盼,故 backendConnection 返回 null书幕,SingleNodeHandler#execute 要調(diào)用 PhysicalDBNode#getConnection 創(chuàng)建一個新的數(shù)據(jù)庫連接,并將其綁定到 session 中
public class SingleNodeHandler implements ResponseHandler {
public void execute() throws Exception {
// 通過 session 拿到客戶端連接 ServerConnection
ServerConnection serverConnection = session.getServerConnection();
// 通過 session 拿到后端數(shù)據(jù)庫連接
final BackendConnection backendConnection = session.getTarget(routeResultSetNode);
// 若存在 routeResultsetNode 對應的 BackendConnection
if (session.tryExistsCon(backendConnection, routeResultSetNode)) {
_execute(backendConnection);
} else { // 若不存在 routeResultsetNode 對應的 BackendConnection揽趾,則創(chuàng)建新的連接
MycatConfig conf = MycatServer.getInstance().getConfig();
PhysicalDBNode dn = conf.getDataNodes().get(routeResultSetNode.getName());
dn.getConnection(dn.getDatabase(), serverConnection.isAutocommit(), routeResultSetNode, this, routeResultSetNode);
}
}
}
PhysicalDBNode#getConnection 從分片節(jié)點 dataNode 的數(shù)據(jù)庫連接池中獲取一個可寫的 PhysicalDatasource台汇,并調(diào)用 PhysicalDatasource#getConnection 從 ConMap 中獲取一個可用的數(shù)據(jù)庫連接
public class PhysicalDBNode {
public void getConnection(String schema, boolean autoCommit, RouteResultSetNode routeResultSetNode, ResponseHandler handler, Object attachment) throws Exception {
// 從分片節(jié)點 dataNode 的數(shù)據(jù)庫連接池中獲取一個可寫的 PhysicalDatasource
PhysicalDatasource writeSource = dbPool.getSource();
writeSource.getConnection(schema, autoCommit, handler, attachment);
}
}
PhysicalDatasource#getConnection 從 ConMap 中獲取一個可用的數(shù)據(jù)庫連接后,調(diào)用 PhysicalDatasource#takeCon 將獲取的 con 標記為已用(borrowed = true)
public class PhysicalDatasource {
public void getConnection(String schema, boolean autocommit, final ResponseHandler handler, final Object attachment)
throws IOException {
// 從 conMap 中拿取已建立好的后端連接
BackendConnection con = this.conMap.tryTakeCon(schema, autocommit);
if (con != null) {
takeCon(con, handler, attachment, schema);
}
}
private BackendConnection takeCon(BackendConnection backendConnection, final ResponseHandler handler, final Object attachment, String schema) {
// 標記該連接為已用
backendConnection.setBorrowed(true);
handler.connectionAcquired(backendConnection);
return backendConnection;
}
}
之后調(diào)用 ResponseHandler#connectionAcquired 進行連接獲取后的確認邏輯篱瞎,此處調(diào)用的實際實現(xiàn)類為 SingleNodeHandler苟呐,其在創(chuàng)建數(shù)據(jù)庫連接時將自己作為 ResponseHandler 傳入 PhysicalDBNode#getConnection 中
dn.getConnection(dn.getDatabase(), serverConnection.isAutocommit(), routeResultSetNode, this, routeResultSetNode);
SingleNodeHandler 本身實現(xiàn)了 ResponseHandler 接口,并實現(xiàn)了 connectionAcquired 方法俐筋,具體代碼如下:
public class SingleNodeHandler implements ResponseHandler {
public void connectionAcquired(final BackendConnection backendConnection) {
// 將 routeResultsetNode 對應的后端連接記錄在 session 的 backendConnectionMap 中
session.bindConnection(routeResultSetNode, backendConnection);
_execute(backendConnection);
}
}
因此牵素,PhysicalDatasource#getConnection 從 ConMap 中獲取一個可用的數(shù)據(jù)庫連接后,首先將該連接標記為已用(borrowed = true)校哎,然后將 backendConnection 和對應的路由節(jié)點 node 綁定到 session 的 backendConnectionMap 中两波,最后調(diào)用 _execute(backendConnection)
最進一步執(zhí)行請求處理,具體客戶端 SQL 請求執(zhí)行邏輯參見:三闷哆、客戶端 SQL 請求執(zhí)行流程
5.3.3 總結(jié)
當客戶端發(fā)送 SQL 請求至 Mycat 時腰奋,Mycat 首先在 NonBlockingSession 的 ConcurrentMap<RouteResultSetNode, BackendConnection> backendConnectionMap
中查找是否存在 SQL 的路由節(jié)點 RouteResultSetNode 對應的 BackendConnection,若存在則返回抱怔,繼續(xù)執(zhí)行后續(xù)操作劣坊;若不存在,則從 PhysicalDatasource 的 ConMap 中獲取一個可用的數(shù)據(jù)庫連接 BackendConnection屈留,并將其標記為已用(BackendConnection 的 borrowed 屬性設置為 true)局冰,然后將 BackendConnection 和 RouteResultSetNode 注冊于 NonBlockingSession 的 backendConnectionMap 中
5.4 釋放已用的數(shù)據(jù)庫連接
5.5 關閉數(shù)據(jù)庫連接
附錄
附錄1 客戶端命令請求報文命令類型詳情表
類型值 | 命令 | 功能 | 關聯(lián)函數(shù) |
---|---|---|---|
0x00 | COM_SLEEP | (內(nèi)部線程狀態(tài)) | (無) |
0x01 | COM_QUIT | 關閉連接 | mysql_close |
0x02 | COM_INIT_DB | 切換數(shù)據(jù)庫 | mysql_select_db |
0x03 | COM_QUERY | SQL查詢請求 | mysql_real_query |
0x04 | COM_FIELD_LIST | 獲取數(shù)據(jù)表字段信息 | mysql_list_fields |
0x05 | COM_CREATE_DB | 創(chuàng)建數(shù)據(jù)庫 | mysql_create_db |
0x06 | COM_DROP_DB | 刪除數(shù)據(jù)庫 | mysql_drop_db |
0x07 | COM_REFRESH | 清除緩存 | mysql_refresh |
0x08 | COM_SHUTDOWN | 停止服務器 | mysql_shutdown |
0x09 | COM_STATISTICS | 獲取服務器統(tǒng)計信息 | mysql_stat |
0x0A | COM_PROCESS_INFO | 獲取當前連接的列表 | mysql_list_processes |
0x0B | COM_CONNECT | (內(nèi)部線程狀態(tài)) | (無) |
0x0C | COM_PROCESS_KILL | 中斷某個連接 | mysql_kill |
0x0D | COM_DEBUG | 保存服務器調(diào)試信息 | mysql_dump_debug_info |
0x0E | COM_PING | 測試連通性 | mysql_ping |
0x0F | COM_TIME | (內(nèi)部線程狀態(tài)) | (無) |
0x10 | COM_DELAYED_INSERT | (內(nèi)部線程狀態(tài)) | (無) |
0x11 | COM_CHANGE_USER | 重新登陸(不斷連接) | mysql_change_user |
0x12 | COM_BINLOG_DUMP | 獲取二進制日志信息 | (無) |
0x13 | COM_TABLE_DUMP | 獲取數(shù)據(jù)表結(jié)構(gòu)信息 | (無) |
0x14 | COM_CONNECT_OUT | (內(nèi)部線程狀態(tài)) | (無) |
0x15 | COM_REGISTER_SLAVE | 從服務器向主服務器進行注冊 | (無) |
0x16 | COM_STMT_PREPARE | 預處理SQL語句 | mysql_stmt_prepare |
0x17 | COM_STMT_EXECUTE | 執(zhí)行預處理語句 | mysql_stmt_execute |
0x18 | COM_STMT_SEND_LONG_DATA | 發(fā)送BLOB類型的數(shù)據(jù) | mysql_stmt_send_long_data |
0x19 | COM_STMT_CLOSE | 銷毀預處理語句 | mysql_stmt_close |
0x1A | COM_STMT_RESET | 清除預處理語句參數(shù)緩存 | mysql_stmt_reset |
0x1B | COM_SET_OPTION | 設置語句選項 | mysql_set_server_option |
0x1C | COM_STMT_FETCH | 獲取預處理語句的執(zhí)行結(jié)果 | mysql_stmt_fetch |