Mycat 概述

NoSQL 根本性的優(yōu)勢在于在云計算時代拌蜘,簡單、易于大規(guī)模分布式擴展陪竿,并且讀寫性能非常高

關系型數(shù)據(jù)庫 NoSQL 數(shù)據(jù)庫
特點 -數(shù)據(jù)關系模型基于關系模型禽翼,結(jié)構(gòu)化存儲,完整性約束
-基于二維表及其之間的聯(lián)系族跛,需要連接闰挡、并、交礁哄、差长酗、除等數(shù)據(jù)操作
-采用結(jié)構(gòu)化的查詢語言(SQL)做數(shù)據(jù)讀寫
-操作需要數(shù)據(jù)的一致性,需要事務甚至是強一致性
- 非結(jié)構(gòu)化的存儲
- 基于多維關系模型
- 具有特有的使用場景
優(yōu)點 - 保持數(shù)據(jù)的一致性(事務處理)
- 可以進行 join 等復雜查詢
- 通用化桐绒,技術成熟
- 高并發(fā)夺脾,大數(shù)據(jù)下讀寫能力較強
- 基本支持分布式之拨,易于擴展,可伸縮
- 簡單咧叭,弱結(jié)構(gòu)化存儲
缺點 - 數(shù)據(jù)讀寫必須經(jīng)過 sql 解析蚀乔,大量數(shù)據(jù)、高并發(fā)下讀寫性能不足
- 對數(shù)據(jù)做讀寫佳簸,或修改數(shù)據(jù)結(jié)構(gòu)時需要加鎖乙墙,影響并發(fā)操作
- 無法適應非結(jié)構(gòu)化存儲
- 擴展困難
- 昂貴、復雜
- join 等復雜操作能力較弱
- 事務支持較弱
- 通用性差
- 無完整約束復雜業(yè)務場景支持較差

數(shù)據(jù)的切分(Sharding)根據(jù)其切分規(guī)則的類型生均,可以分為兩種切分模式听想。一種是按照不同的表(或者Schema)來切分到不同的數(shù)據(jù)庫(主機)之上,這種切可以稱之為數(shù)據(jù)的垂直(縱向)切分马胧;另外一種則是根據(jù)表中的數(shù)據(jù)的邏輯關系汉买,將同一個表中的數(shù)據(jù)按照某種條件拆分到多臺數(shù)據(jù)庫(主機)上面,這種切分稱之為數(shù)據(jù)的水平(橫向)切分

對于 DBA 來說佩脊,可以這么理解 Mycat:

Mycat 就是 MySQL Server蛙粘,而 Mycat 后面連接的 MySQL Server,就好象是 MySQL 的存儲引擎,如 InnoDB威彰,MyISAM 等出牧,因此,Mycat 本身并不存儲數(shù)據(jù)歇盼,數(shù)據(jù)是在后端的 MySQL 上存儲的舔痕,因此數(shù)據(jù)可靠性以及事務等都是 MySQL 保證的,簡單的說豹缀,Mycat 就是 MySQL 最佳伴侶伯复,它在一定程度上讓 MySQL 擁有了能跟 Oracle PK 的能力。

對于軟件工程師來說邢笙,可以這么理解 Mycat:

Mycat 就是一個近似等于 MySQL 的數(shù)據(jù)庫服務器啸如,你可以用連接 MySQL 的方式去連接 Mycat(除了端口不同,默認的 Mycat 端口是 8066 而非 MySQL 的 3306氮惯,因此需要在連接字符串上增加端口信息)叮雳,大多數(shù)情況下,可以用你熟悉的對象映射框架使用 Mycat筐骇,但建議對于分片表债鸡,盡量使用基礎的 SQL 語句,因為這樣能達到最佳性能铛纬,特別是幾千萬甚至幾百億條記錄的情況下厌均。

對于架構(gòu)師來說,可以這么理解 Mycat:

Mycat 是一個強大的數(shù)據(jù)庫中間件告唆,不僅僅可以用作讀寫分離棺弊、以及分表分庫晶密、容災備份,而且可以用于多租戶應用開發(fā)模她、云平臺基礎設施稻艰、讓你的架構(gòu)具備很強的適應性和靈活性,借助于即將發(fā)布的 Mycat 智能優(yōu)化模塊侈净,系統(tǒng)的數(shù)據(jù)訪問瓶頸和熱點一目了然尊勿,根據(jù)這些統(tǒng)計分析數(shù)據(jù),你可以自動或手工調(diào)整后端存儲畜侦,將不同的表映射到不同存儲引擎上元扔,而整個應用的代碼一行也不用改變。

Mycat 原理

Mycat 的原理中最重要的一個動詞是“攔截”旋膳,它攔截了用戶發(fā)送過來的 SQL 語句涮俄,首先對 SQL 語句做了一些特定的分析:如分片分析掀亥、路由分析、讀寫分離分析换淆、緩存分析等轴脐,然后將此 SQL 發(fā)往后端的真實數(shù)據(jù)庫岸霹,并將返回的結(jié)果做適當?shù)奶幚砥苤遥罱K再返回給用戶

上述圖片里蝌戒,Orders 表被分為三個分片 datanode(簡稱 dn),這三個分片是分布在兩臺 MySQL Server 上(DataHost)碱工,即 datanode=database@datahost 方式垄懂,因此你可以用一臺到 N 臺服務器來分片,分片規(guī)則為(sharding rule)典型的字符串枚舉分片規(guī)則痛垛,一個規(guī)則的定義是分片字段(sharding column)+分片函數(shù)(rule function),這里的分片字段為 prov 而分片函數(shù)為字符串枚舉方式

當 Mycat 收到一個 SQL 時桶蛔,會先解析這個 SQL匙头,查找涉及到的表,然后看此表的定義仔雷,如果有分片規(guī)則蹂析,則獲取到 SQL 里分片字段的值,并匹配分片函數(shù)碟婆,得到該 SQL 對應的分片列表电抚,然后將 SQL 發(fā)往這些分片去執(zhí)行,最后收集和處理所有分片返回的結(jié)果數(shù)據(jù)竖共,并輸出到客戶端蝙叛。以 select * from Orders where prov=?語句為例,查到 prov=wuhan公给,按照分片函數(shù)借帘,wuhan 返回 dn1蜘渣,于是 SQL 就發(fā)給了 MySQL1,去取 DB1 上的查詢結(jié)果肺然,并返回給用戶蔫缸。

如果上述 SQL 改為 select * from Orders where prov in (‘wuhan’,‘beijing’),那么际起,SQL 就會發(fā)給MySQL1 與 MySQL2 去執(zhí)行拾碌,然后結(jié)果集合并后輸出給用戶。但通常業(yè)務中我們的 SQL 會有 Order By 以及Limit 翻頁語法街望,此時就涉及到結(jié)果集在 Mycat 端的二次處理校翔,這部分的代碼也比較復雜,而最復雜的則屬兩個表的 Jion 問題它匕,為此展融,Mycat 提出了創(chuàng)新性的 ER 分片、全局表豫柬、HBT(Human Brain Tech)人工智能的 Catlet告希、以及結(jié)合 Storm/Spark 引擎等十八般武藝的解決辦法,從而成為目前業(yè)界最強大的方案烧给,這就是開源的力量

應用場景

  • 單純的讀寫分離燕偶,此時配置最為簡單,支持讀寫分離础嫡,主從切換指么;
  • 分表分庫,對于超過 1000 萬的表進行分片榴鼎,最大支持 1000 億的單表分片伯诬;
  • 多租戶應用,每個應用一個庫巫财,但應用程序只連接 Mycat盗似,從而不改造程序本身,實現(xiàn)多租戶化平项;
  • 報表系統(tǒng)赫舒,借助于 Mycat 的分表能力,處理大規(guī)模報表的統(tǒng)計闽瓢;
  • 替代 Hbase接癌,分析大數(shù)據(jù);
  • 作為海量數(shù)據(jù)實時查詢的一種簡單有效方案扣讼,比如 100 億條頻繁查詢的記錄需要在 3 秒內(nèi)查詢出來結(jié)果缺猛,除了基于主鍵的查詢,還可能存在范圍查詢或其他屬性查詢,此時 Mycat 可能是最簡單有效的選擇

Mycat 中的概念

數(shù)據(jù)庫中間件

Mycat 是數(shù)據(jù)庫中間件枯夜,就是介于數(shù)據(jù)庫與應用之間弯汰,進行數(shù)據(jù)處理與交互的中間服務。由于前面講的對數(shù)據(jù)進行分片處理之后湖雹,從原有的一個庫咏闪,被切分為多個分片數(shù)據(jù)庫,所有的分片數(shù)據(jù)庫集群構(gòu)成了整個完整的數(shù)據(jù)庫存儲

邏輯庫(schema)

通常對實際應用來說摔吏,并不需要知道中間件的存在鸽嫂,業(yè)務開發(fā)人員只需要知道數(shù)據(jù)庫的概念,所以數(shù)據(jù)庫中間件可以被看做是一個或多個數(shù)據(jù)庫集群構(gòu)成的邏輯庫

邏輯表(table)

ER 表

關系型數(shù)據(jù)庫是基于實體關系模型(Entity-Relationship Model)之上征讲,通過其描述了真實世界中事物與關系据某,Mycat 中的 ER 表即是來源于此。根據(jù)這一思路诗箍,提出了基于 E-R 關系的數(shù)據(jù)分片策略癣籽,子表的記錄與所關聯(lián)的父表記錄存放在同一個數(shù)據(jù)分片上,即子表依賴于父表滤祖,通過表分組(Table Group)保證數(shù)據(jù) Join 不會跨庫操作

全局表

一個真實的業(yè)務系統(tǒng)中筷狼,往往存在大量的類似字典表的表,這些表基本上很少變動匠童,字典表具有以下幾個特性:

? 變動不頻繁埂材;

? 數(shù)據(jù)量總體變化不大;

? 數(shù)據(jù)規(guī)模不大汤求,很少有超過數(shù)十萬條記錄俏险。

對于這類的表,在分片的情況下扬绪,當業(yè)務表因為規(guī)模而進行分片以后竖独,業(yè)務表與這些附屬的字典表之間的關聯(lián),就成了比較棘手的問題挤牛,所以 Mycat 中通過數(shù)據(jù)冗余來解決這類表的 join预鬓,即所有的分片都有一份數(shù)據(jù)的拷貝,所有將字典表或者符合字典表特性的一些表定義為全局表赊颠。數(shù)據(jù)冗余是解決跨分片數(shù)據(jù) join 的一種很好的思路,也是數(shù)據(jù)切分規(guī)劃的另外一條重要規(guī)則

分片節(jié)點(dataNode)

數(shù)據(jù)切分后劈彪,一個大表被分到不同的分片數(shù)據(jù)庫上面竣蹦,每個表分片所在的數(shù)據(jù)庫就是分片節(jié)點(dataNode)

節(jié)點主機(dataHost)

數(shù)據(jù)切分后,每個分片節(jié)點(dataNode)不一定都會獨占一臺機器沧奴,同一機器上面可以有多個分片數(shù)據(jù)庫痘括,這樣一個或多個分片節(jié)點(dataNode)所在的機器就是節(jié)點主機(dataHost),為了規(guī)避單節(jié)點主機并發(fā)數(shù)限制,盡量將讀寫壓力高的分片節(jié)點(dataNode)均衡的放在不同的節(jié)點主機(dataHost)

分片規(guī)則(rule)

按照某種業(yè)務規(guī)則把數(shù)據(jù)分到某個分片的規(guī)則就是分片規(guī)則,數(shù)據(jù)切分選擇合適的分片規(guī)則非常重要纲菌,將極大的避免后續(xù)數(shù)據(jù)處理的難度

Mycat 的配置

1 schema.xml

管理著 MyCat 的邏輯庫挠日、表、分片規(guī)則翰舌、DataNode 以 及 DataSource

<?xml version="1.0"?>
<!DOCTYPE mycat:schema SYSTEM "schema.dtd">
<mycat:schema xmlns:mycat="http://io.mycat/">
    <schema name="TESTDB" checkSQLschema="true" sqlMaxLimit="100" randomDataNode="dn1">
        <table name="customer" primaryKey="id" dataNode="dn1,dn2" rule="sharding-by-intfile" autoIncrement="true" fetchStoreNodeByJdbc="true">
            <childTable name="customer_addr" primaryKey="id" joinKey="customer_id" parentKey="id"> </childTable>
        </table>
    </schema>

    <!-- dataNode 標簽定義了 MyCat 中的數(shù)據(jù)節(jié)點嚣潜,也就是我們通常說所的數(shù)據(jù)分片。一個 dataNode 標簽就是一個獨立的數(shù)據(jù)分片 -->
    <dataNode name="dn1" dataHost="localhost1" database="db1" />
    <dataNode name="dn2" dataHost="localhost1" database="db2" />
    <dataNode name="dn3" dataHost="localhost1" database="db3" />

    <!-- 定義了具體的數(shù)據(jù)庫實例椅贱、讀寫分離配置和心跳語句 -->
    <dataHost name="localhost1" maxCon="1000" minCon="10" balance="0" writeType="0" dbType="mysql" dbDriver="jdbc" switchType="1"  slaveThreshold="100">
        <heartbeat>select user()</heartbeat>
        <!-- can have multi write hosts -->
        <writeHost host="hostM1" url="jdbc:mysql://localhost:3306" user="root" password="root">
            <readHost host="" url="" password="" user=""></readHost>
        </writeHost>
        <!-- <writeHost host="hostM2" url="localhost:3316" user="root" password="123456"/> -->
    </dataHost>
</mycat:schema>

1.1 schema 標簽

schema 標簽用于定義 MyCat 實例中的邏輯庫懂算,MyCat 可以有多個邏輯庫,每個邏輯庫都有自己的相關配置庇麦〖萍迹可以使用 schema 標簽來劃分這些不同的邏輯庫。

屬性名 屬性說明
dataNode 該屬性用于綁定邏輯庫到某個具體的 database 上山橄,1.3 版本如果配置了 dataNode垮媒,則不可以配置分片表,1.4 可以配置默認分片航棱,只需要配置需要分片的表即可
checkSQLschema 為 true 時睡雇,schema 的字符去掉,select * from TESTDB.travelrecord 會修改為 select * from travelrecord
sqlMaxLimit 當該值設置為某個數(shù)值時丧诺。每條執(zhí)行的 SQL 語句入桂,如果沒有加上 limit 語句,MyCat 也會自動的加上所對應的值
1.1.1 table 標簽

定義了 MyCat 中的邏輯表驳阎,所有需要拆分的表都需要在這個標簽中定義

屬性名 屬性說明
name 定義邏輯表的表名抗愁,這個名字就如同我在數(shù)據(jù)庫中執(zhí)行 create table 命令指定的名字一樣,同個 schema 標簽中定義的名字必須唯一
dataNode 定義這個邏輯表所屬的 dataNode, 該屬性的值需要和 dataNode 標簽中 name 屬性的值相互對應
rule 該屬性用于指定邏輯表要使用的規(guī)則名字呵晚,規(guī)則名字在 rule.xml 中定義蜘腌,必須與 tableRule 標簽中 name 屬性屬性值一一對應
ruleRequired 該屬性用于指定表是否綁定分片規(guī)則,如果配置為 true饵隙,但沒有配置具體 rule 的話 撮珠,程序會報錯
primaryKey 該邏輯表對應真實表的主鍵
type 該屬性定義了邏輯表的類型,目前邏輯表只有“全局表”和”普通表”兩種類型金矛。對應的配置:
? 全局表:global
? 普通表:不指定該值為 globla 的所有表
autoIncrement 使用 autoIncrement=“true” 指定這個表有使用自增長主鍵
subTables
needAddLimit 指定表是否需要自動的在每個語句后面加上 limit 限制
1.1.1.1 childTable 標簽

childTable 標簽用于定義 E-R 分片的子表芯急。通過標簽上的屬性與父表進行關聯(lián)

屬性名 屬性說明
name 定義子表的表名
joinKey 插入子表的時候會使用這個列的值查找父表存儲的數(shù)據(jù)節(jié)點
parentKey 屬性指定的值一般為與父表建立關聯(lián)關系的列名。程序首先獲取 joinkey 的值驶俊,再通過 parentKey 屬性指定的列名產(chǎn)生查詢語句娶耍,通過執(zhí)行該語句得到父表存儲在哪個分片上。從而確定子表存儲的位置
primaryKey 同 table 標簽所描述的
needAddLimit 同 table 標簽所描述的

1.2 dataNode 標簽

定義了 MyCat 中的數(shù)據(jù)節(jié)點饼酿,也就是我們通常說所的數(shù)據(jù)分片榕酒。一個 dataNode 標簽就是一個獨立的數(shù)據(jù)分片

<dataNode name="dn1" dataHost="lch3307" database="db1" ></dataNode>

例子中所表述的意思為:使用名字為 lch3307 數(shù)據(jù)庫實例上的 db1 物理數(shù)據(jù)庫胚膊,這就組成一個數(shù)據(jù)分片,最后想鹰,我們使用名字 dn1 標識這個分片

屬性名 屬性說明
name 定義數(shù)據(jù)節(jié)點的名字紊婉,這個名字需要是唯一的,我們需要在 table 標簽上應用這個名字辑舷,來建立表與分片對應的關系
dataHost 該屬性用于定義該分片屬于哪個數(shù)據(jù)庫實例的喻犁,屬性值是引用 dataHost 標簽上定義的 name 屬性
database 該屬性用于定義該分片屬性哪個具體數(shù)據(jù)庫實例上的具體庫,因為這里使用兩個緯度來定義分片惩妇,就是:實例+具體的庫株汉。因為每個庫上建立的表和表結(jié)構(gòu)是一樣的。所以這樣做就可以輕松的對表進行水平拆分

1.3 dataHost 標簽

屬性名 屬性說明
name 唯一標識 dataHost 標簽歌殃,供上層的標簽使用
maxCon 指定每個讀寫實例連接池的最大連接
minCon 指定每個讀寫實例連接池的最小連接乔妈,初始化連接池的大小
balance 1. balance="0", 不開啟讀寫分離機制,所有讀操作都發(fā)送到當前可用的 writeHost 上氓皱。
2. balance="1"路召,全部的 readHost 與 stand by writeHost 參與 select 語句的負載均衡,簡單的說波材,當雙主雙從模式(M1->S1股淡,M2->S2,并且 M1 與 M2 互為主備)廷区,正常情況下唯灵,M2,S1,S2 都參與 select 語句的負載均衡。
3. balance="2"隙轻,所有讀操作都隨機的在 writeHost埠帕、readhost 上分發(fā)。
4. balance="3"玖绿,所有讀請求隨機的分發(fā)到 wiriterHost 對應的 readhost 執(zhí)行敛瓷,writerHost 不負擔讀壓力,注意 balance=3 只在 1.4 及其以后版本有斑匪,1.3 沒有
writeType 1. writeType="0"呐籽,所有寫操作發(fā)送到配置的第一個 writeHost,第一個掛了切到還生存的第二個 writeHost蚀瘸,重新啟動后已切換后的為準狡蝶,切換記錄在配置文件中:dnindex.properties .
2. writeType="1",所有寫操作都隨機的發(fā)送到配置的 writeHost贮勃,1.5 以后廢棄不推薦
dbType 指定后端連接的數(shù)據(jù)庫類型牢酵,例如:mongodb、oracle衙猪、spark 等
dbDriver 指定連接后端數(shù)據(jù)庫使用的 Driver,目前可選的值有 native 和 jdbc
如果使用 JDBC 的話需要將符合 JDBC 4 標準的驅(qū)動 JAR 包放到 MYCAT\lib 目錄下,并檢查驅(qū)動 JAR 包中包括如下目錄結(jié)構(gòu)的文件:META-INF\services\java.sql.Driver垫释。在這個文件內(nèi)寫上具體的 Driver 類名丝格,例如:com.mysql.jdbc.Driver
switchType switchType = -1, 表示不自動切換
switchType = 1 默認值,自動切換
switchType = 2 基于 MySQL 主從同步的狀態(tài)決定是否切換心跳語句為 show slave status
switchType = 3 基于 MySQL galary cluster 的切換機制(適合集群)(1.4.1)心跳語句為 show status like ‘wsrep%’
tempReadHostAvailable 如果配置了這個屬性 writeHost 下面的 readHost 仍舊可用棵譬,默認 0 可配置(0显蝌、1)
1.3.1 heartbeat 標簽

這個標簽內(nèi)指明用于和后端數(shù)據(jù)庫進行心跳檢查的語句。例如,MYSQL 可以使用 select user()订咸,Oracle 可以使用 select 1 from dual 等

1.3.2 writeHost 標簽曼尊、readHost 標簽

這兩個標簽都指定后端數(shù)據(jù)庫的相關配置給 mycat,用于實例化后端連接池脏嚷。唯一不同的是骆撇,writeHost 指定寫實例、readHost 指定讀實例

在一個 dataHost 內(nèi)可以定義多個 writeHost 和 readHost父叙。但是神郊,如果 writeHost 指定的后端數(shù)據(jù)庫宕機,那么這個 writeHost 綁定的所有 readHost 都將不可用趾唱。另一方面涌乳,由于這個 writeHost 宕機系統(tǒng)會自動的檢測到,并切換到備用的 writeHost 上去

屬性名 屬性說明
host 用于標識不同實例甜癞,一般 writeHost 我們使用 *M1夕晓,readHost 我們用 *S1
url 后端實例連接地址,如果是使用 native 的 dbDriver悠咱,則一般為 address:port 這種形式蒸辆。用 JDBC 或其他的 dbDriver,則需要特殊指定乔煞。當使用 JDBC 時則可以這么寫:jdbc:mysql://localhost:3306/
password 后端存儲實例需要的密碼
user 后端存儲實例需要的用戶名字
weight 權(quán)重吁朦,配置在 readhost 中作為讀節(jié)點的權(quán)重(1.4 以后)
usingDecrypt 是否對密碼加密默認 0 否 如需要開啟配置 1,同時使用加密程序?qū)γ艽a加密

2 server.xml

server.xml 幾乎保存了所有 mycat 需要的系統(tǒng)配置信息渡贾。其在代碼內(nèi)直接的映射類為 SystemConfig 類

2.1 user 標簽

<user name="test">
    <property name="password">test</property>
    <property name="schemas">TESTDB</property>
    <property name="readOnly">true</property>
    <property name="benchmark">11111</property>
    <property name="usingDecrypt">1</property>
    <privileges check="false">
        <schema name="TESTDB" dml="0010" showTables="custome/mysql">
            <table name="tbl_user" dml="0110"></table>
            <table name="tbl_dynamic" dml="1111"></table>
        </schema>
    </privileges>
</user>

server.xml 中的標簽本就不多逗宜,這個標簽主要用于定義登錄 mycat 的用戶和權(quán)限。例如上面的例子中空骚,我定義了一個用戶纺讲,用戶名為 test、密碼也為 test囤屹,可訪問的 schema 也只有 TESTDB 一個

屬性名 屬性說明
Benchmark benchmark 基準, 當前端的整體 connection 數(shù)達到基準值是, 對來自該賬戶的請求開始拒絕連接熬甚,0 或不設表示不限制
usingDecrypt 是否對密碼加密默認 0 否 如需要開啟配置 1,同時使用加密程序?qū)γ艽a加密
2.1.1 privileges 標簽

對用戶的 schema 及 下級的 table 進行精細化的 DML 權(quán)限控制肋坚,privileges 節(jié)點中的 check 屬性是用于標識是否開啟 DML 權(quán)限檢查乡括, 默認 false 標識不檢查肃廓,當然 privileges 節(jié)點不配置,等同 check=false诲泌,由于 Mycat 一個用戶的 schemas 屬性可配置多個 schema 盲赊,所以 privileges 的下級節(jié)點 schema 節(jié)點同樣可配置多個,對多庫多表進行細粒度的 DML 權(quán)限控制

Schema/Table 上的 dml 屬性描述:

屬性名 屬性說明 示例(禁止增刪改查)
dml insert,update,select,delete 0000
<!-- 禁止 insert 和 delete 操作 -->
<privileges check="true">
    <schema name="TESTDB" dml="0110" >
        <table name="table01" dml="0111"></table>
        <table name="table02" dml="1111"></table>
    </schema>
    <schema name="TESTDB1" dml="0110">
        <table name="table03" dml="1110"></table>
        <table name="table04" dml="1010"></table>
    </schema>
</privileges>

2.2 system 標簽

這個標簽內(nèi)嵌套的所有 property 標簽都與系統(tǒng)配置有關敷扫,請注意哀蘑,下面我會省去標簽 property 直接使用這個標簽的 name 屬性內(nèi)的值來介紹這個屬性的作用

2.2.1 charset 屬性

字符集設置

2.2.2 defaultSqlParser 屬性

由于 mycat 最初是時候 Foundation DB 的 sql 解析器,而后才添加的 Druid 的解析器葵第。所以這個屬性用來指定默認的解析器绘迁。目前的可用的取值有:druidparser 和 fdbparser。使用的時候可以選擇其中的一種卒密,目前一般都使用 druidparser

2.2.3 processors 屬性

這個屬性主要用于指定系統(tǒng)可用的線程數(shù)缀台,默認值為機器 CPU 核心線程數(shù)。主要影響 processorBufferPool栅受、processorBufferLocalPercent将硝、processorExecutor 屬性。NIOProcessor 的個數(shù)也是由這個屬性定義的屏镊,所以調(diào)優(yōu)的時候可以適當?shù)恼{(diào)高這個屬性

2.2.4 processorBufferChunk 屬性

這個屬性指定每次分配 Socket Direct Buffer 的大小依疼,默認是 4096 個字節(jié)荣挨。這個屬性也影響 buffer pool 的長度神帅。如果一次性獲取的數(shù)過大 buffer 不夠用經(jīng)常出現(xiàn)警告,則可以適當調(diào)大

2.2.5 processorBufferPool 屬性

這個屬性指定 bufferPool 計算比例值桶唐。由于每次執(zhí)行 NIO 讀棍丐、寫操作都需要使用到 buffer误辑,系統(tǒng)初始化的時候會建立一定長度的 buffer 池來加快讀、寫的效率歌逢,減少建立 buffer 的時間

Mycat 中有兩個主要的 buffer 池:

  • BufferPool
  • ThreadLocalPool

BufferPool 由 ThreadLocalPool 組合而成巾钉,每次從 BufferPool 中獲取 buffer 都會優(yōu)先獲取 ThreadLocalPool 中的 buffer,未命中之后才會去獲取 BufferPool 中的 buffer秘案。也就是說 ThreadLocalPool 是作為 BufferPool 的二級緩存砰苍,由每個線程內(nèi)部自己使用。當然這其中還有一些限制條件需要線程的名字是由$_開頭阱高。然而 BufferPool 上的 buffer 則是每個 NIOProcessor 都共享的赚导。

默認這個屬性的值為: 默認 bufferChunkSize(4096) * processors 屬性 * 1000

BufferPool 的總長度 = bufferPool / bufferChunk

若 bufferPool 不是 bufferChunk 的整數(shù)倍,則總長度為前面計算得出的商 + 1假設系統(tǒng)線程數(shù)為 4赤惊,其他都為屬性的默認值吼旧,則:bufferPool = 4096 * 4 * 1000,BufferPool 的總長度 : 4000 = 16384000 / 4096

2.2.6 processorBufferLocalPercent 屬性

前面提到了 ThreadLocalPool未舟。這個屬性就是用來控制分配這個 pool 的大小用的圈暗,但其也并不是一個準確的值掂为,也是一個比例值。這個屬性默認值為 100员串。

線程緩存百分比 = bufferLocalPercent / processors 屬性菩掏。

例如,系統(tǒng)可以同時運行 4 個線程昵济,使用默認值,則根據(jù)公式每個線程的百分比為 25野揪。最后根據(jù)這個百分比來計算出具體的 ThreadLocalPool 的長度公式如下:

ThreadLocalPool 的長度 = 線程緩存百分比 * BufferPool 長度 / 100

假設 BufferPool 的長度為 4000访忿,其他保持默認值。那么最后每個線程建立上的 ThreadLocalPool 的長度為: 1000 = 25 * 4000 / 100

2.2.7 processorExecutor 屬性

這個屬性主要用于指定 NIOProcessor 上共享的 businessExecutor 固定線程池大小斯稳。mycat 在需要處理一些異步邏輯的時候會把任務提交到這個線程池中海铆。新版本中這個連接池的使用頻率不是很大了,可以設置一個較小的值

2.2.8 sequnceHandlerType 屬性

指定使用 Mycat 全局序列的類型挣惰。0 為本地文件方式卧斟,1 為數(shù)據(jù)庫方式,2 為時間戳序列方式憎茂,3 為分布式 ZK ID 生成器珍语,4 為 zk 遞增 id 生成

3 rule.xml

rule.xml 里面就定義了我們對表進行拆分所涉及到的規(guī)則定義。我們可以靈活的對表使用不同的分片算法竖幔,或者對表使用相同的算法但具體的參數(shù)不同板乙。這個文件里面主要有 tableRule 和 function 這兩個標簽。在具體使用過程中可以按照需求添加 tableRule 和 function

3.1 tableRule 標簽

<tableRule name="mod-long">
   <rule>
      <columns>id</columns>
      <algorithm>mod-long</algorithm>
   </rule>
</tableRule>
屬性名 屬性說明
name 屬性指定唯一的名字拳氢,用于標識不同的表規(guī)則

內(nèi)嵌的 rule 標簽則指定對物理表中的哪一列進行拆分和使用什么路由算法:

標簽名 標簽說明
columns 指定要拆分的列名字
algorithm 使用 function 標簽中的 name 屬性

3.2 function 標簽

<function name="mod-long" class="io.mycat.route.function.PartitionByMod">
   <!-- how many data nodes -->
   <property name="count">3</property>
</function>
屬性名 屬性說明
name 指定算法的名字
class 制定路由算法具體的類名字
property 算法需要用到的一些屬性

Mycat 的分片 join

Mycat 目前版本支持跨分片的 join,主要實現(xiàn)的方式有四種:全局表募逞,ER 分片,catletT(人工智能)和 ShareJoin馋评,ShareJoin 在開發(fā)版中支持放接,前面三種方式 1.3.0.1 支持

1 全局表

2 ER分片

MyCAT 借鑒了 NewSQL 領域的新秀 Foundation DB 的設計思路,F(xiàn)oundation DB 創(chuàng)新性的提出了 TableGroup 的概念留特,其將子表的存儲位置依賴于主表纠脾,并且物理上緊鄰存放,因此徹底解決了 JION 的效率和性能問題磕秤,根據(jù)這一思路乳乌,提出了基于 E-R 關系的數(shù)據(jù)分片策略,子表的記錄與所關聯(lián)的父表記錄存放在同一個數(shù)據(jù)分片上

customer 采用 sharding-by-intfile 這個分片策略市咆,分片在 dn1,dn2 上汉操,orders 依賴父表進行分片,兩個表的關聯(lián)關系為 orders.customer_id=customer.id蒙兰。于是數(shù)據(jù)分片和存儲的示意圖如下:

這樣一來磷瘤,分片 Dn1 上的的 customer 與 Dn1 上的 orders 就可以進行局部的 JOIN 聯(lián)合芒篷,Dn2 上也如此,再合并兩個節(jié)點的數(shù)據(jù)即可完成整體的 JOIN采缚,試想一下针炉,每個分片上 orders 表有 100 萬條,則 10 個分片就有 1 個億扳抽,基于 E-R 映射的數(shù)據(jù)分片模式篡帕,基本上解決了 80%以上的企業(yè)應用所面臨的問題

3 Share join

ShareJoin 是一個簡單的跨分片 Join,基于 HBT 的方式實現(xiàn)

目前支持 2 個表的 join,原理就是解析 SQL 語句,拆分成單表的 SQL 語句執(zhí)行贸呢,然后把各個節(jié)點的數(shù)據(jù)匯集

4 catlet(人工智能)

全局序列號

在實現(xiàn)分庫分表的情況下镰烧,數(shù)據(jù)庫自增主鍵已無法保證自增主鍵的全局唯一。為此楞陷,MyCat 提供了全局sequence怔鳖,并且提供了包含本地配置和數(shù)據(jù)庫配置等多種實現(xiàn)方式

1 本地文件方式

2 數(shù)據(jù)庫配置方式

Mycat SQL 攔截機制

用戶可以寫一個 java 類,將傳入 MyCAT 的 SQL 進行改寫然后交給Mycat 去執(zhí)行固蛾,此技巧可以完成如下一些特殊功能:

  • 捕獲和記錄某些特殊的 SQL结执;
  • 記錄 sql 查找異常;
  • 出于性能優(yōu)化的考慮艾凯,改寫 SQL献幔,比如改變查詢條件的順序或增加分頁限制;
  • 將某些 Select SQL 強制設置為 Read 模式览芳,走讀寫分離(很多事務框架很難剝離事務中的 Select SQL斜姥;
  • 后期 Mycat 智能優(yōu)化,攔截所有 sql 做智能分析沧竟,自動監(jiān)控節(jié)點負載铸敏,自動優(yōu)化路由,提供數(shù)據(jù)庫優(yōu)化建議

SQL 攔截的原理是在路由之前攔截 SQL悟泵,然后做其他處理杈笔,完了之后再做路由,執(zhí)行,如下圖所示:

二糕非、實現(xiàn)自定義 SQL 攔截器

2.1 實現(xiàn) SQLInterceptor 接口

/**
 * used for interceptor sql before execute ,can modify sql befor execute
 * @author wuzhih
 *
 */
public interface SQLInterceptor {

   /**
    * return new sql to handler,ca't modify sql's type 
    * @param sql
    * @param sqlType
    * @return new sql
    */
   String interceptSQL(String sql ,int sqlType);
}

例如蒙具,我們自定義實現(xiàn)一個 SQL 攔截器,對每個 SQL 語句進行 EXPLAIN 分析朽肥,找出潛在的慢 sql

2.2 在 server.xml 中添加攔截器配置

運行采坑記錄

一禁筏、Client does not support authentication protocol requested by server

解決方案:

USE mysql;
ALTER USER 'root'@'localhost' IDENTIFIED WITH mysql_native_password BY '${password}';
FLUSH PRIVILEGES;

官方解決鏈接:https://dev.mysql.com/doc/refman/5.6/en/old-client.html

源碼解析

一、mycat 通信模型

前端與后端通信框架都為NIO/AIO衡招,因為目前生產(chǎn)上用的linux發(fā)行版內(nèi)核都沒有真正實現(xiàn)網(wǎng)絡上的AIO篱昔,如果應用用AIO的話可能比NIO還要慢一些,所以,我們這里只分析NIO相關的通信模塊州刽。

  • NIOAcceptor:作為服務器接受客戶端連接(前端 NIO 通信)
  • NIOConnector:作為客戶端去連接后臺數(shù)據(jù)庫(MySql空执,后端 NIO 通信)
  • NIOReactor:Reactor 模式的 NIO,處理并轉(zhuǎn)發(fā)請求到 RW 線程穗椅,其實就是把對應 AbstractConnection(就是 NIO 的 channel 的封裝)注冊到 RW 線程的selector上辨绊,只注冊讀標記
  • NIOReactorPool:一般高性能網(wǎng)絡通信框架采用多 Reactor(多 dispatcher)模式,這里將 NIOReactor 池化匹表;每次 NIOConnector 接受一個連接或者NIOAcceptor 請求一個連接门坷,都會封裝成 AbstractConnection,同時請求 NIOReactorPool 每次輪詢出一個 NIOReactor袍镀,之后 AbstractConnection 與這個NIOReactor 綁定(就是3之中說的注冊)
  • RW:RW 線程拜鹤,負責執(zhí)行 NIO 的 channel 讀寫,這里 channel 封裝成了 AbstractConnection
  • NIOSocketWR:每個前端連接(FrontendConnection)和后端連接(BackendConnection)都有一個對應的緩沖區(qū)流椒,對連接讀寫操作具體如何操作的方法和緩存方式,封裝到了這個類里面

FrontendConnection

客戶端與 mycat 的連接明也,子類有 ServerConnection 和 ManagerConnection

BackendConnection

mycat 連接后天數(shù)據(jù)庫的 connection宣虾,子類有 MySqlConnection、JDBCConnection

二温数、客戶端連接建立及認證

客戶端建立連接及認證
  • NIOAcceptor#run 方法輪詢是否有連接請求绣硝,若客戶端有連接請求到 mycat,則調(diào)用 io.mycat.net.NIOAcceptor#accept 方法
  • NIOAcceptor#accept 方法調(diào)用 serverChannel.accept 創(chuàng)建一個 SocketChannel撑刺,并根據(jù) SocketChannel 創(chuàng)建一個新的前端連接實例 FrontendConnection鹉胖,然后從 NIOReactor 池中獲取的 NIOReactor 實例,并調(diào)用 NIOReactor#postRegister 方法將 FrontendConnection 放入 RW 線程的注冊隊列够傍,然后喚醒 RW 線程的 selector
  • RW#run 輪詢注冊隊列中是否有 AbstractConnection甫菠,若有則將其取出,并作為讀事件注冊到 tSelector冕屯。此外還調(diào)用 FrontendConnection#register 進行前端連接建立和認證

三寂诱、客戶端 SQL 請求執(zhí)行流程

mycat流程梳理
  • RW#run 輪詢注冊隊列中是否有 AbstractConnection,若存在且為讀事件則調(diào)用 AbstractConnection#asynRead 異步讀取數(shù)據(jù)并梳理安聘,實際處理邏輯見 NIOSocketWR#asynRead

  • NIOSocketWR#asynRead 從 前端連接的 channel 中讀取數(shù)據(jù)痰洒,并且保存到對應 AbstractConnection 的 readBuffer 中,之后調(diào)用 AbstractConnection#onReadData 處理讀取到的數(shù)據(jù)

    @Override
    public void asynRead() throws IOException {
       ByteBuffer theBuffer = con.readBuffer;
       if (theBuffer == null) {
          theBuffer = con.processor.getBufferPool().allocate(con.processor.getBufferPool().getChunkSize());
          con.readBuffer = theBuffer;
       }
       // 從 SocketChannel 中讀取數(shù)據(jù)浴韭,并且保存到 AbstractConnection 的 readBuffer 中丘喻,readBuffer 處于 write mode,返回讀取了多少字節(jié)
       int got = channel.read(theBuffer);
       // 調(diào)用處理讀取到的數(shù)據(jù)的方法
       con.onReadData(got);
    }
    
  • AbstractConnection#onReadData 讀取 readBuffer 中的數(shù)據(jù)并調(diào)用 AbstractConnection#handle 方法進行下一步處理念颈,其內(nèi)部調(diào)用 FrontendCommandHandler#handle

  • FrontendCommandHandler#handle 根據(jù) data[4] 來判斷命令類型泉粉,客戶端命令請求報文格式如下圖:

客戶端命令請求報文格式

data 的第五個字節(jié)存儲命令類型,客戶端命令請求報文命令類型詳情表見附錄1舍肠。我們以 MySQLPacket.COM_QUERY 為例進行接下來的討論搀继。當 data[4] == MySQLPacket.COM_QUERY 時窘面,調(diào)用 FrontendConnection#query(byte[])

public void handle(byte[] data) {
    // 判斷命令類型
    switch (data[4]) {
        ...
        // INSERT/SELECT/UPDATE/DELETE 等 SQL 歸屬于 MySQLPacket.COM_QUERY
        case MySQLPacket.COM_QUERY:
            commands.doQuery();
            source.query(data);
            break;
      ...
    }
}
  • FrontendConnection#query(byte[]) 將 data 字節(jié)數(shù)組轉(zhuǎn)化成 String 類型的 sql,之后調(diào)用 ServerQueryHandler#query(String) 方法

    public void query(byte[] data) {  
        MySQLMessage mm = new MySQLMessage(data);
        // 從 data[5] 即第六個字節(jié)開始讀取參數(shù)體
        mm.position(5);
        String sql = mm.readString(charset);  
        // 執(zhí)行 sql 語句叽躯,內(nèi)部調(diào)用 ServerQueryHandler#query(String)
        this.query( sql );
    }
    
  • ServerQueryHandler#query(String) 解析 SQL 類型财边,根據(jù)不同類型做不同的處理,之后調(diào)用 ServerConnection#routeEndExecuteSQL 進行路由計算(包括全局序列號点骑、SQL 語句攔截等酣难。路由計算詳細另述)并得到路由結(jié)果 RouteResultset,然后調(diào)用 NonBlockingSession#execute

    public void routeEndExecuteSQL(String sql, final int type, final SchemaConfig schema) {
        // 路由計算
        RouteResultset rrs = MycatServer
                    .getInstance()
                    .getRouterservice()
                    .route(MycatServer.getInstance().getConfig().getSystem(), schema, type, sql, this.charset, this);
        if (rrs != null) {
            // session 執(zhí)行
            session.execute(rrs, rrs.isSelectForUpdate() ? ServerParse.UPDATE : type);
        }
    }
    
  • NonBlockingSession#execute 獲取路由的 dataNode 節(jié)點黑滴,若節(jié)點數(shù)為1則調(diào)用 SingleNodeHandler#execute 處理 sql憨募,否則調(diào)用 MultiNodeQueryHandler#execute 處理 sql。此處我們假定前端 sql 命令只路由到一個 dataNode袁辈,則調(diào)用 SingleNodeHandler#execute 處理 sql

    /**
     * NonBlockingSession#execute
     */
    @Override
    public void execute(RouteResultset rrs, int type) {
        RouteResultsetNode[] nodes = rrs.getNodes();
        if (nodes.length == 1) {
            singleNodeHandler = new SingleNodeHandler(rrs, this);
            singleNodeHandler.execute();
        } else {
            multiNodeHandler = new MultiNodeQueryHandler(type, rrs, autocommit, this);
            multiNodeHandler.execute();
        }
    }
    
  • SingleNodeHandler#execute 獲取后端連接 BackendConnection菜谣,并調(diào)用 SingleNodeHandler#_execute,該方法直接調(diào)用 BackendConnection#execute

    public void execute() throws Exception {
        // 獲取后端數(shù)據(jù)庫連接
        final BackendConnection conn = session.getTarget(node);
        // 若存在 dataNode 對應的 BackendConnection
        if (session.tryExistsCon(conn, node)) {
            _execute(conn);
        } else {
            // create new connection
              do something...
        }
    }
    
    private void _execute(BackendConnection conn) {
        conn.execute(node, session.getSource(), session.getSource().isAutocommit());
    }
    
  • 當 schema.xml 中配置 dataHost 的 dbDriver 為 jdbc 時晚缩,調(diào)用 JDBCConnection#execute 進行 sql 執(zhí)行(JDBCConnection 繼承 BackendConnection)尾膊。該方法新開一個線程處理 sql,最終調(diào)用 JDBCConnection#ouputResultSet 執(zhí)行 sql 并將結(jié)果寫入 ServerConnection

    @Override
    public void execute(final RouteResultsetNode node, final ServerConnection source, final boolean autocommit) {
        this.sqlSelectLimit = source.getSqlSelectLimit();
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                // 調(diào)用 JDBCConnection#ouputResultSet
                executeSQL(node, source, autocommit);
            }
        };
        MycatServer.getInstance().getBusinessExecutor().execute(runnable);
    }
    
  • JDBCConnection#ouputResultSet 獲取數(shù)據(jù)庫連接并執(zhí)行 sql荞彼,然后將得到的結(jié)果集 ResultSet 解析為 Result Set 響應報文并寫入 ServerConnection

    private void ouputResultSet(ServerConnection sc, String sql) throws SQLException {
        ResultSet rs = null;
        Statement stmt = null;
        
        try {
            stmt = con.createStatement();
            rs = stmt.executeQuery(sql);
    
            List<FieldPacket> fieldPks = new LinkedList<FieldPacket>();
            // 根據(jù) resultset 加載列信息冈敛,保存至 fieldPks
            ResultSetUtil.resultSetToFieldPacket(sc.getCharset(), fieldPks, rs,
                    this.isSpark);
            // 獲取列數(shù)
            int colunmCount = fieldPks.size();
            ByteBuffer byteBuf = sc.allocate();
    
            /* 1 寫入 resultset header packet */
            ResultSetHeaderPacket headerPkg = new ResultSetHeaderPacket();
            headerPkg.fieldCount = fieldPks.size();
            headerPkg.packetId = ++packetId;
            // 將 ResultSetHeaderPacket 的數(shù)據(jù)寫入 byteBuf
            byteBuf = headerPkg.write(byteBuf, sc, true);
            byteBuf.flip();
            byte[] header = new byte[byteBuf.limit()];
            // 將 byteBuf 中的信息寫入 header 中
            byteBuf.get(header);
            // byteBuf 標記歸位
            byteBuf.clear();
    
            /* 2 寫入 field packet */
            List<byte[]> fields = new ArrayList<byte[]>(fieldPks.size());
            Iterator<FieldPacket> itor = fieldPks.iterator();
            while (itor.hasNext()) {
                FieldPacket curField = itor.next();
                curField.packetId = ++packetId;
                // 將 FieldPacket 的數(shù)據(jù)寫入 byteBuf
                byteBuf = curField.write(byteBuf, sc, false);
                // position 設回 0,并將 limit 設成之前的 position 的值
                // limit:緩沖區(qū)數(shù)組中不可操作的下一個元素的位置:limit<=capacity
                byteBuf.flip();
                byte[] field = new byte[byteBuf.limit()];
                // 將 byteBuf 中的信息寫入 field 中
                byteBuf.get(field);
                byteBuf.clear();
                // 將 field 放入 fields
                fields.add(field);
            }
            /* 3 寫入 EOF packet */
            EOFPacket eofPckg = new EOFPacket();
            eofPckg.packetId = ++packetId;
            // 將 EOFPacket 的數(shù)據(jù)寫入 byteBuf
            byteBuf = eofPckg.write(byteBuf, sc, false);
            byteBuf.flip();
            byte[] eof = new byte[byteBuf.limit()];
            // 將 byteBuf 中的信息寫入 eof 中
            byteBuf.get(eof);
            byteBuf.clear();
            this.respHandler.fieldEofResponse(header, fields, eof, this);
    
            /* 4 寫入 Row Data packet */
            // output row
            while (rs.next()) {
                ResultSetMetaData resultSetMetaData = rs.getMetaData();
                int size = resultSetMetaData.getColumnCount();
                StringBuilder builder = new StringBuilder();
                for (int i = 1; i <= size; i++) {
                    builder.append(resultSetMetaData.getColumnName(i) + "=" + rs.getString(i));
                    if (i < size) {
                        builder.append(", ");
                    }
                }
                LOGGER.debug("JDBCConnection.ouputResultSet sql: {}, resultSet: {}", sql, builder.toString());
                RowDataPacket curRow = new RowDataPacket(colunmCount);
                for (int i = 0; i < colunmCount; i++) {
                    int j = i + 1;
                    if (MysqlDefs.isBianry((byte) fieldPks.get(i).type)) {
                        curRow.add(rs.getBytes(j));
                    } else if (fieldPks.get(i).type == MysqlDefs.FIELD_TYPE_DECIMAL ||
                            fieldPks.get(i).type == (MysqlDefs.FIELD_TYPE_NEW_DECIMAL - 256)) { // field type is unsigned byte
                        // ensure that do not use scientific notation format
                        BigDecimal val = rs.getBigDecimal(j);
                        curRow.add(StringUtil.encode(val != null ? val.toPlainString() : null,
                                sc.getCharset()));
                    } else {
                        curRow.add(StringUtil.encode(rs.getString(j),
                                sc.getCharset()));
                    }
    
                }
                curRow.packetId = ++packetId;
                // 將 RowDataPacket 的數(shù)據(jù)寫入 byteBuf
                byteBuf = curRow.write(byteBuf, sc, false);
                byteBuf.flip();
                byte[] row = new byte[byteBuf.limit()];
                byteBuf.get(row);
                byteBuf.clear();
                this.respHandler.rowResponse(row, this);
            }
    
            fieldPks.clear();
    
            // end row
            /* 5 寫入 EOF packet */
            eofPckg = new EOFPacket();
            eofPckg.packetId = ++packetId;
            byteBuf = eofPckg.write(byteBuf, sc, false);
            byteBuf.flip();
            eof = new byte[byteBuf.limit()];
            byteBuf.get(eof);
            sc.recycle(byteBuf);
            this.respHandler.rowEofResponse(eof, this);
        } finally {
            if (rs != null) {
                try {
                    rs.close();
                } catch (SQLException e) {
    
                }
            }
            if (stmt != null) {
                try {
                    stmt.close();
                } catch (SQLException e) {
    
                }
            }
        }
    }
    

四鸣皂、Mycat 初始化

Mycat 初始化主要負責啟動 MycatServer 實例抓谴,啟動 MycatServer 實例的過程中,核心工作是讀取并解析 Mycat 配置文件(schema.xml寞缝、rule.xml 和 server.xml)癌压。MycatServer 使用 “饑餓模式” 初始化一個單例

public class MycatServer {
    private static final MycatServer INSTANCE = new MycatServer();
    
    public static final MycatServer getInstance() {
        return INSTANCE;
    }
    
    private MycatServer() {
        // 讀取文件配置
        this.config = new MycatConfig();
        ...
    }
}

讀取并解析 Mycat 配置文件的具體實現(xiàn)交由 MycatConfig。MycatConfig 內(nèi)部使用 ConfigInitializer 解析全局配置荆陆。ConfigInitializer 主要處理一下幾件事情:

  1. 讀取 schema.xml措拇、rule.xml 和 server.xml 文件并將解析到的配置類賦給 ConfigInitializer 的變量中
  2. 解析 DataHost 和對應的 DataNode,創(chuàng)建物理數(shù)據(jù)庫連接池(PhysicalDBPool)和物理數(shù)據(jù)庫節(jié)點(PhysicalDBNode)
  3. 權(quán)限管理設置
  4. 加載全局序列處理器配置
  5. 配置文件自檢

在此我重點敘述 1 和 2

4.1 配置文件讀取

// 讀取 rule.xml和schema.xml
SchemaLoader schemaLoader = new XMLSchemaLoader();
// 讀取 server.xml
XMLConfigLoader configLoader = new XMLConfigLoader(schemaLoader);

4.2 創(chuàng)建物理數(shù)據(jù)庫連接池(PhysicalDBPool)

initDataHosts 為每一個 <dataHost> 節(jié)點創(chuàng)建一個數(shù)據(jù)庫連接池慎宾,創(chuàng)建完成后返回 Map<String, PhysicalDBPool> physicalDBPoolMap丐吓,其中 key 為 <dataHost> 節(jié)點的 name 屬性值,value 為 <dataHost> 節(jié)點對應的數(shù)據(jù)庫連接池

private Map<String, PhysicalDBPool> initDataHosts(ConfigLoader configLoader) {
    Map<String, DataHostConfig> dataHostConfigMap = configLoader.getDataHosts();
    Map<String, PhysicalDBPool> physicalDBPoolMap = new HashMap<>(dataHostConfigMap.size());
    for (DataHostConfig dataHostConfig : dataHostConfigMap.values()) {
        // 為每個 dataHost 節(jié)點建立一個 PhysicalDBPool
        PhysicalDBPool pool = getPhysicalDBPool(dataHostConfig, configLoader);
        physicalDBPoolMap.put(pool.getHostName(), pool);
    }
    return physicalDBPoolMap;
}

io.mycat.config.ConfigInitializer#getPhysicalDBPool 方法為每個 <dataHost> 節(jié)點建立一個 PhysicalDBPool趟据。主要工作如下:

  1. 為每一個 <dataHost> 節(jié)點的 <writeHost> 節(jié)點創(chuàng)建一個 PhysicalDatasource
  2. 為每一個 <dataHost> 節(jié)點的 <readHost> 節(jié)點創(chuàng)建一個 PhysicalDatasource
  3. 初始化 PhysicalDBPool 并返回
private PhysicalDBPool getPhysicalDBPool(DataHostConfig dataHostConfig, ConfigLoader configLoader) {
    // dataHost 節(jié)點名
    String name = dataHostConfig.getName();
    // 數(shù)據(jù)庫類型券犁,我們這里只討論MySQL
    String dbType = dataHostConfig.getDbType();
    // 連接數(shù)據(jù)庫驅(qū)動,我們這里只討論 MyCat 自己實現(xiàn)的 native
    String dbDriver = dataHostConfig.getDbDriver();
    // 1 為每一個 <dataHost> 節(jié)點的 <writeHost> 節(jié)點創(chuàng)建一個 PhysicalDatasource
    PhysicalDatasource[] writeSources = createDataSource(dataHostConfig, name, dbType, dbDriver, dataHostConfig.getWriteHosts(), false);

    Map<Integer, DBHostConfig[]> readHostsMap = dataHostConfig.getReadHosts();
    Map<Integer, PhysicalDatasource[]> readSourcesMap = new HashMap<Integer, PhysicalDatasource[]>(readHostsMap.size());
    // 對于每個讀節(jié)點建立 key 為 writeHost 下標 value 為 readHost 的 PhysicalDatasource[] 的哈希表
    for (Map.Entry<Integer, DBHostConfig[]> entry : readHostsMap.entrySet()) {
        // 2 為每一個 <dataHost> 節(jié)點的 <readHost> 節(jié)點創(chuàng)建一個 PhysicalDatasource
        PhysicalDatasource[] readSources = createDataSource(dataHostConfig, name, dbType, dbDriver, entry.getValue(), true);
        readSourcesMap.put(entry.getKey(), readSources);
    }

    // 3 初始化 PhysicalDBPool 并返回
    PhysicalDBPool pool = new PhysicalDBPool(dataHostConfig.getName(), dataHostConfig, writeSources, readSourcesMap, dataHostConfig.getBalance(), dataHostConfig.getWriteType());
    pool.setSlaveIDs(dataHostConfig.getSlaveIDs());
    return pool;
}

io.mycat.config.ConfigInitializer#createDataSource 完成具體的數(shù)據(jù)源創(chuàng)建汹碱。根據(jù)不同的 dvType 和 dbDriver 創(chuàng)建不同的 PhysicalDatasource:

  • dvType == mysql && dbDriver == native --> MySQLDataSource
  • dvType == mysql && dbDriver == jdbc --> JDBCDataSource
  • dvType == postgresql && dbDriver == native --> PostgreSQLDataSource
private PhysicalDatasource[] createDataSource(DataHostConfig dataHostConfig, String hostName, String dbType, String dbDriver, DBHostConfig[] nodes, boolean isRead) {
    PhysicalDatasource[] dataSources = new PhysicalDatasource[nodes.length];
    if ("mysql".equals(dbType) && "native".equals(dbDriver)) {
        for (int i = 0; i < nodes.length; i++) {
            //設置最大 idle 時間粘衬,默認為 30 分鐘(可自定義)
            nodes[i].setIdleTimeout(system.getIdleTimeout());
            MySQLDataSource ds = new MySQLDataSource(nodes[i], dataHostConfig, isRead);
            dataSources[i] = ds;
        }
    } else if ("jdbc".equals(dbDriver)) {
        for (int i = 0; i < nodes.length; i++) {
            nodes[i].setIdleTimeout(system.getIdleTimeout());
            JDBCDatasource ds = new JDBCDatasource(nodes[i], dataHostConfig, isRead);
            dataSources[i] = ds;
        }
    } else if ("postgresql".equalsIgnoreCase(dbType) && dbDriver.equalsIgnoreCase("native")) {
        for (int i = 0; i < nodes.length; i++) {
            nodes[i].setIdleTimeout(system.getIdleTimeout());
            PostgreSQLDataSource ds = new PostgreSQLDataSource(nodes[i], dataHostConfig, isRead);
            dataSources[i] = ds;
        }
    } else {
        throw new ConfigException("not supported yet !" + hostName);
    }
    return dataSources;
}

4.3 創(chuàng)建物理數(shù)據(jù)庫節(jié)點(PhysicalDBNode)

io.mycat.config.ConfigInitializer#initDataNodes 為每個 <dataNode> 節(jié)點創(chuàng)建一個 PhysicalDBNode,根據(jù) <dataNode> 節(jié)點的 dataHost 屬性值從 Map<String, PhysicalDBPool> dataHosts 中找到 <dataNode> 對應的連接池,并賦予 PhysicalDBNode

private Map<String, PhysicalDBNode> initDataNodes(ConfigLoader configLoader) {
    Map<String, PhysicalDBNode> nodes = new HashMap<String, PhysicalDBNode>(dataNodeConfigMap.size());
    
    Map<String, DataNodeConfig> dataNodeConfigMap = configLoader.getDataNodes();
    for (DataNodeConfig dataNodeConfig : dataNodeConfigMap.values()) {
        // 根據(jù) dataHost 名稱獲取對應的 PhysicalDBPool
        PhysicalDBPool pool = this.dataHosts.get(dataNodeConfig.getDataHost());
        PhysicalDBNode dataNode = new PhysicalDBNode(dataNodeConfig.getName(), dataNodeConfig.getDatabase(), pool);
        nodes.put(dataNode.getName(), dataNode);
    }
    return nodes;
}

五稚新、mycat 的連接池模型

Mycat 為了最高效的利用后端的 MySQL 連接勘伺,采取了不同于 Cobar 也不同于傳統(tǒng) JDBC 連接池的做法,傳統(tǒng)的做法是基于 Database 的連接池褂删,即一個 MySQL 服務器上有 5 個 Database飞醉,則每個 Database 獨占最大200 個連接。這種模式的最大問題在于屯阀,將一個數(shù)據(jù)庫所具備的最大 1000 個連接缅帘,隔離成了更新小的連接池,于是可能產(chǎn)生一個應用的連接不夠难衰,但其他應用的連接卻很空閑的資源浪費情況钦无,而對于分片這種場景,這個缺陷則幾乎是致命的盖袭,因為每個分片所對應的 Database 的連接數(shù)量被限制在了一個很小的范圍內(nèi)失暂,從而導致系統(tǒng)并發(fā)能力的大幅降低。而 Mycat 則采用了基于 MySQL 實例的連接池模式鳄虱,每個 Database 都可以用現(xiàn)有的 1000 個連接中的空閑連接

5.1 核心對象

5.1.1 ConMap 和 ConQueue

在 Mycat 的連接池里趣席,當前可用的、空閑的 MySQL 連接是放到一個 ConcurrentHashMap 的數(shù)據(jù)結(jié)構(gòu)里醇蝴,Key 為當前連接對應的 Database,另外還有二級分類 ConQueue想罕,按照連接是自動提交還是手動提交模式進行區(qū)分悠栓,這個設計是為了高效的查詢匹配的可用連接。ConMap 和 ConQueue 包含的關鍵對象有:

  • ConcurrentHashMap<String, ConQueue> items:可用的 MySQL 連接容器按价,key 為當前連接對應的 database 名惭适,value 為 ConQueue 對象,里面包含了兩個存儲數(shù)據(jù)庫連接的隊列
  • ConcurrentLinkedQueue<BackendConnection> autoCommitCons:自動提交的數(shù)據(jù)庫連接
  • ConcurrentLinkedQueue<BackendConnection> manCommitCons:手動提交的數(shù)據(jù)庫連接
public class ConMap {
    /**
    * key:當前連接對應的 Database
    * ConQueue:數(shù)據(jù)庫連接隊列(按照連接是自動提交還是手動提交模式進行區(qū)分楼镐,這個設計是為了高效的查詢匹配的可用連接)
    */
    private final ConcurrentHashMap<String, ConQueue> items = new ConcurrentHashMap<String, ConQueue>();
}

public class ConQueue {
    private final ConcurrentLinkedQueue<BackendConnection> autoCommitCons = new ConcurrentLinkedQueue<BackendConnection>();
    private final ConcurrentLinkedQueue<BackendConnection> manCommitCons = new ConcurrentLinkedQueue<BackendConnection>();
    private long executeCount;
}

BackendConnection 為后端數(shù)據(jù)庫連接癞志,其實現(xiàn)有 JDBCConnection、MySQLConnection 等

5.1.2 PhysicalDatasource

對應于 <dataHost> 節(jié)點下的 <writeHost> 或 <readHost> 子節(jié)點框产,表示一個物理數(shù)據(jù)庫實例凄杯。每個數(shù)據(jù)庫實例中保存了多個可用的數(shù)據(jù)庫連接(BackendConnection),mycat 初始化時秉宿,根據(jù) <dataHost> 節(jié)點的 minCon 屬性值初始化多個可用的數(shù)據(jù)庫連接戒突。其關鍵對象有:

  • size:讀或?qū)戇B接池的最大連接數(shù)
  • conMap:存放當前可用的數(shù)據(jù)庫連接
public abstract class PhysicalDatasource {
    private final String name;
    private final int size;
    private final DBHostConfig config;
    private final ConMap conMap = new ConMap();
    private final boolean readNode;
    private final DataHostConfig hostConfig;
    private PhysicalDBPool dbPool;
}

PhysicalDatasource 的實現(xiàn)類有:

5.1.3 PhysicalDBPool

對應于 <dataHost name="localhost1" > 節(jié)點,表示物理數(shù)據(jù)庫實例池描睦。由于 <datahost> 節(jié)點可包含多個 <writeHost> 節(jié)點膊存,因此 PhysicalDBPool 可以包含多個物理數(shù)據(jù)庫實例,其關鍵對象有:

  • hostName:<dataHosr> 標簽的 name 屬性
  • writeSources 和 readSources:可寫和可讀的多個物理數(shù)據(jù)庫實例,對應于 <writeHost> 和 <readHost>
  • activedIndex:表明了當前是哪個寫節(jié)點的數(shù)據(jù)源在生效
public class PhysicalDBPool {
    private final String hostName;

    protected PhysicalDatasource[] writeSources;
    protected Map<Integer, PhysicalDatasource[]> readSources;

    protected volatile int activedIndex;
    private final DataHostConfig dataHostConfig;
}
5.1.4 PhysicalDBNode

對應于 <dataNode /> 節(jié)點隔崎,表示一個數(shù)據(jù)庫分片今艺,PhysicalDBNode 包含的關鍵對象有:

  • name:dataNode 名稱,對應于 <dataNode> 標簽的 name 屬性
  • database:數(shù)據(jù)庫名稱爵卒,對應于 <dataNode> 標簽的 database 屬性
  • PhysicalDBPool dbPool:MySQL 連接池虚缎,里面包含了多個數(shù)據(jù)庫實例 PhysicalDatasource,并將其按照讀節(jié)點和寫節(jié)點分類技潘,實現(xiàn)讀寫分類和節(jié)點切換的功能遥巴。其中 activedIndex 屬性表明了當前是哪個寫節(jié)點的數(shù)據(jù)源在生效
public class PhysicalDBNode {
    protected final String name;
    protected final String database;
    protected final PhysicalDBPool dbPool;
}

若 schema.xml 中配置了一下分片節(jié)點:

<dataNode name="dn1" dataHost="localhost1" database="db_demo_01"/>
<dataNode name="dn2" dataHost="localhost1" database="db_demo_02"/>
<dataNode name="dn3" dataHost="localhost1" database="db_demo_03"/>

當某個用戶會話需要一個自動提交的,到分片 <dataNode name="dn1" dataHost="localhost1" database="db_demo_01"/> 的 SQL 連接的時候享幽,分片節(jié)點 dn1 首先在連接池 dbPool 中查找是否有數(shù)據(jù)庫 db_demo_01 (對應于 PhysicalDatasource)上的可用連接铲掐,若有則看是否有自動提交模式的連接,找到就返回值桩,否則返回 db_demo_01 上的手動提交模式的連接摆霉;若沒有 db_demo_01 的可用連接,則隨機返回一個其他數(shù)據(jù)庫(db_demo_02 或 db_demo_03)對應的可用連接奔坟;若沒有其他數(shù)據(jù)庫也沒有可用連接携栋,并且連接池還沒達到上限,則創(chuàng)建一個新連接并返回

上述獲取數(shù)據(jù)庫連接的邏輯有一種情況是:用戶會話得到的數(shù)據(jù)庫連接可能不是來自于 db_demo_01 的咳秉,因此在執(zhí)行具體的 SQL 之前婉支,還有一個自動同步數(shù)據(jù)庫連接的過程:包括事務隔離級別、事務模式澜建、字符集向挖、database 等四個指標。同步完成以后炕舵,才會執(zhí)行具體的 SQL 請求

通過共享一個 MySQL 上的所有數(shù)據(jù)庫的可用連接何之,并結(jié)合連接狀態(tài)同步的特性,MyCAT 的連接池做到了最佳的吞吐量咽筋,也在一定程度上提升了整個系統(tǒng)的并發(fā)支撐能力

5.2 創(chuàng)建數(shù)據(jù)庫連接

5.2.1 創(chuàng)建新連接時機

創(chuàng)建新數(shù)據(jù)庫連接的方法為 PhysicalDatasource#createNewConnection(io.mycat.backend.mysql.nio.handler.ResponseHandler, java.lang.Object, java.lang.String)溶推,其有兩個觸發(fā)時機:

  • io.mycat.backend.datasource.PhysicalDatasource#createByIdleLitte

    執(zhí)行空閑檢測時觸發(fā),若當前數(shù)據(jù)庫連接總數(shù)(空閑連接數(shù)和活動鏈接數(shù)之和)小于連接池的最大連接數(shù)奸攻,且空閑連接數(shù)小于連接池最小連接數(shù)蒜危,則調(diào)用 PhysicalDatasource#createByIdleLitte 方法創(chuàng)建新數(shù)據(jù)庫連接

    if ((createCount > 0) && (idleCons + activeCons < size) && (idleCons < hostConfig.getMinCon())) {
       createByIdleLitte(idleCons, createCount);
    }
    
  • io.mycat.backend.datasource.PhysicalDatasource#getConnection

    首先調(diào)用 ConMap#tryTakeCon(java.lang.String, boolean) 當前 database 上是否有可用連接,若有則立即返回睹耐,否則從其他的 database 上找一個可用連接返回舰褪。若 ConMap#tryTakeCon 返回 null,表示數(shù)據(jù)庫連接池中沒有空閑連接疏橄,則調(diào)用 PhysicalDatasource#createNewConnection 創(chuàng)建新連接

    public void getConnection(String schema, boolean autocommit, final ResponseHandler handler, final Object attachment) throws IOException {
        // 從當前連接 map 中拿取已建立好的后端連接
        BackendConnection con = this.conMap.tryTakeCon(schema, autocommit);
        if (con != null) {
            //如果不為空占拍,則綁定對應前端請求的handler
            takeCon(con, handler, attachment, schema);
        } else {
            long activeCons = increamentCount.longValue() + totalConnectionCount;
            if (activeCons < size) {
                createNewConnection(handler, attachment, schema);
            } else {
                LOGGER.error("the max activeConnnections size can not be max than maxconnections");
                throw new IOException("the max activeConnnections size can not be max than maxconnections");
            }
        }
    }
    
5.2.2 創(chuàng)建新數(shù)據(jù)庫連接

MycatServer#startup 方法里其中一件事情就是初始化 PhysicalDBPool

public void startup() throws IOException {
    ...
    Map<String, PhysicalDBPool> dataHosts = config.getDataHosts();
    for (PhysicalDBPool physicalDBPool : dataHosts.values()) {
        String index = dnIndexProperties.getProperty(physicalDBPool.getHostName(), "0");
        physicalDBPool.init(Integer.parseInt(index));
        physicalDBPool.startHeartbeat();
    }
    ...
}

physicalDBPool.init(Integer.parseInt(index)) 中 調(diào)用 PhysicalDBPool#initSource 方法略就,該方法對每一個 PhysicalDatasource 調(diào)用 getConnection 方法創(chuàng)建新的數(shù)據(jù)庫連接

public void init(int index, String reason) {
    for (int i = 0; i < writeSources.length; i++) {
        int j = loop(i + index);
        initSource(j, writeSources[j])
    }
}
    
private boolean initSource(int index, PhysicalDatasource physicalDatasource) {
    int initSize = physicalDatasource.getConfig().getMinCon();
    CopyOnWriteArrayList<BackendConnection> list = new CopyOnWriteArrayList<>();
    GetConnectionHandler getConHandler = new GetConnectionHandler(list, initSize);
    for (int i = 0; i < initSize; i++) {
        try {
            physicalDatasource.getConnection(this.schemas[i % schemas.length], true, getConHandler, null);
        } catch (Exception e) {
            LOGGER.warn(getMessage(index, " init connection error."), e);
        }
    }
    ...
}

PhysicalDatasource#createNewConnection 方法新建一個線程異步執(zhí)行創(chuàng)建數(shù)據(jù)庫連接的操作,每個線程通過調(diào)用抽象方法來進行具體的創(chuàng)建邏輯晃酒。

private void createNewConnection(final ResponseHandler handler, final Object attachment, final String schema) throws IOException {
    // aysn create connection
    final AtomicBoolean hasError = new AtomicBoolean(false);

    MycatServer.getInstance().getBusinessExecutor().execute(new Runnable() {
        @Override
        public void run() {
            try {
                createNewConnection(new DelegateResponseHandler(handler) {
                    @Override
                    public void connectionError(Throwable e, BackendConnection conn) {
                        if (hasError.compareAndSet(false, true)) {
                            handler.connectionError(e, conn);
                        } else {
                            LOGGER.info("connection connectionError ");
                        }
                    }

                    @Override
                    public void connectionAcquired(BackendConnection conn) {
                        LOGGER.info("connection id is " + conn.getId());
                        takeCon(conn, handler, attachment, schema);
                    }
                }, schema);
            } catch (IOException e) {
                if (hasError.compareAndSet(false, true)) {
                    handler.connectionError(e, null);
                } else {
                    LOGGER.info("connection connectionError ");
                }
            }
        }
    });
}

每個繼承 PhysicalDatasource 的數(shù)據(jù)源對象自己實現(xiàn)如下抽象方法:

public abstract void createNewConnection(ResponseHandler handler, String schema) throws IOException;

我們以 JDBCDatasource 為例表牢,其實現(xiàn)的 createNewConnection 方法實現(xiàn)代碼如下:

public class JDBCDatasource extends PhysicalDatasource {
    @Override
    public void createNewConnection(ResponseHandler handler, String schema) throws IOException {
        DBHostConfig cfg = getConfig();
        JDBCConnection jdbcConnection = new JDBCConnection();
        jdbcConnection.setHost(cfg.getIp());
        jdbcConnection.setPort(cfg.getPort());
        jdbcConnection.setJdbcDatasource(this);
        jdbcConnection.setSchema(schema);
        jdbcConnection.setDbType(cfg.getDbType());
        // 復用mysql的Backend的ID,需要在process中存儲
        jdbcConnection.setId(NIOConnector.ID_GENERATOR.getId());

        NIOProcessor processor = MycatServer.getInstance().nextProcessor();
        jdbcConnection.setProcessor(processor);
        processor.addBackend(jdbcConnection);

        try {
            Connection con = getConnection();
            jdbcConnection.setCon(con);
            // notify handler
            handler.connectionAcquired(jdbcConnection);
        } catch (Exception e) {
            handler.connectionError(e, jdbcConnection);
        }
    }
}

主要做了一下幾件事:

  • 實例化一個 JDBCConnection贝次,設置相關參數(shù)

  • 調(diào)用 JDBCDatasource#getConnection 獲取 Connection

    JDBCDatasource#getConnection 直接使用 DriverManager 創(chuàng)建一個新連接并返回

    public Connection getConnection() throws SQLException {
        DBHostConfig cfg = getConfig();
        Connection connection = DriverManager.getConnection(cfg.getUrl(), cfg.getUser(), cfg.getPassword());
        return connection;
    }
    
  • 調(diào)用 DelegateResponseHandler#connectionAcquired崔兴,作為已獲得有效連接的響應處理

    @Override
    public void connectionAcquired(BackendConnection conn) {
        LOGGER.info("connection id is " + conn.getId());
        takeCon(conn, handler, attachment, schema);
    }
    

    ResponseHandler#connectionAcquired 調(diào)用 PhysicalDatasource#takeCon 方法進行相應處理,代碼如下:

    private BackendConnection takeCon(BackendConnection conn, final ResponseHandler handler, final Object attachment, String schema) {
        // 該連接已被借用
        conn.setBorrowed(true);
    
        if (!conn.getSchema().equals(schema)) {
            // need do schema syn in before sql send
            conn.setSchema(schema);
        }
    
        ConQueue queue = conMap.getSchemaConQueue(schema);
        queue.incExecuteCount();
        conn.setAttachment(attachment);
        // 每次取連接的時候蛔翅,更新下 lasttime敲茄,防止在前端連接檢查的時候,關閉連接山析,導致sql執(zhí)行失敗
        conn.setLastTime(System.currentTimeMillis());
        handler.connectionAcquired(conn);
        return conn;
    }
    

    主要做了一下幾件事:

    • 設置連接已被借用

    • 獲取 ConQueue堰燎,增加可以執(zhí)行連接的數(shù)量

    • 調(diào)用 ResponseHandler#connectionAcquired,具體實現(xiàn)見 GetConnectionHandler#connectionAcquired笋轨,該方法調(diào)用 BackendConnection#release 釋放連接秆剪,調(diào)用 PhysicalDatasource#returnCon 方法將釋放的連接放回 ConMap 的 ConQueue 中

      @Override
      public void connectionAcquired(BackendConnection conn) {
          successCons.add(conn);
          finishedCount.addAndGet(1);
          logger.info("connected successfully " + conn);
          conn.release();
      }
      
      @Override
      public void JDBCConnection#release() {
          jdbcDatasource.releaseChannel(this);
      }
      
      public void PhysicalDatasource#releaseChannel(BackendConnection c) {
          returnCon(c);
      }
      
      private void PhysicalDatasource#returnCon(BackendConnection c) {
          c.setAttachment(null);
          c.setBorrowed(false);
          c.setLastTime(TimeUtil.currentTimeMillis());
          ConQueue queue = this.conMap.getSchemaConQueue(c.getSchema());
      
          boolean ok = false;
          if (c.isAutocommit()) {
              ok = queue.getAutoCommitCons().offer(c);
          } else {
              ok = queue.getManCommitCons().offer(c);
          }
      
          // 若無法放入 ConQueue 則將連接關閉
          if (!ok) {
              LOGGER.warn("can't return to pool ,so close con " + c);
              c.close("can't return to pool ");
          }
      }
      
5.2.3 總結(jié)

Mycat 服務啟動時調(diào)用 MycatServer 的 startUp 方法對每一個 <dataHost> 節(jié)點的多個 <writeHost> 節(jié)點對應的數(shù)據(jù)源做初始化工作。初始創(chuàng)建數(shù)據(jù)庫連接數(shù)由 <dataHost> 節(jié)點的minCon 屬性值決定爵政。每創(chuàng)建一個 BackendConnection 便回調(diào) GetConnectionHandler#connectionAcquired 將新生成的 connection 的 borrowed 屬性設置為 false(該屬性個人理解是標記連接是否空閑)仅讽,然后將connection 保存于 ConQueue 中

因此一個 <dataHost> 節(jié)點對應一個 PhysicalDBPoolPhysicalDBPool 類中的 PhysicalDatasource[] writeSources 對應于 <dataHost> 節(jié)點下多個 <writeHost> 節(jié)點钾挟。每個 PhysicalDatasource 中持有一個 ConMap conMap 作為數(shù)據(jù)源的連接池洁灵, 里面存放著可用的數(shù)據(jù)庫連接 BackendConnection

Mycat 根據(jù) <dataNode> 節(jié)點的 dataHost 屬性和 database 屬性,將數(shù)據(jù)庫連接均勻得分配給在同一個 dataHost 中的不同數(shù)據(jù)庫掺出。例如對于以下配置:

<!-- 分片節(jié)點 -->
<dataNode name="dn1" dataHost="localhost1" database="db_demo_01"/>
<dataNode name="dn2" dataHost="localhost1" database="db_demo_02"/>
<dataNode name="dn3" dataHost="localhost1" database="db_demo_03"/>
<!-- 節(jié)點主機 -->
<dataHost name="localhost1" maxCon="10000" minCon="100" balance="0" writeType="0" dbType="mysql" dbDriver="jdbc" switchType="1" slaveThreshold="100">
    <heartbeat>select user()</heartbeat>
    <writeHost host="hostM1" url="jdbc:mysql://localhost:3306?characterEncoding=utf8&amp;useSSL=false&amp;serverTimezone=UTC&amp;autoReconnect=true" user="root" password="root">
    </writeHost>
</dataHost>

Mycat 會初始化 100 個(minCon="100")數(shù)據(jù)庫連接徽千,并將這 100 個連接均分給 db_demo_01、db_demo_02 和 db_demo_03蛛砰,如下圖所示

5.3 獲取數(shù)據(jù)庫連接

本節(jié)主要講述獲取可用的數(shù)據(jù)庫連接用于執(zhí)行客戶端的 SQL 請求

5.3.1 涉及的核心類
  1. NIOAcceptor:負責處理 Accept 事件,即 MyCAT 作為服務端去處理前端業(yè)務程序發(fā)過來的連接請求
  2. ServerConnectionFactory:客戶端和 Mycat 連接工廠黍衙,用于創(chuàng)建客戶端連接
  3. ServerConnection:客戶端連接(客戶端和 Mycat 之間的連接)
  4. NonBlockingSession:客戶端連接和后端數(shù)據(jù)庫連接的會話泥畅,其核心對象有:
    • ServerConnection serverConnection:客戶端連接
    • ConcurrentMap<RouteResultSetNode, BackendConnection> backendConnectionMap:存放路由節(jié)點和對應的數(shù)據(jù)庫連接的容器
    • SingleNodeHandler singleNodeHandler:單路由節(jié)點請求處理器
    • MultiNodeQueryHandler multiNodeHandler:多路由節(jié)點請求處理器
  5. SingleNodeHandler(MultiNodeQueryHandler):路由節(jié)點請求處理器,其核心對象有:
    • RouteResultSetNode routeResultSetNode
    • RouteResultset rrs
    • NonBlockingSession session
  6. PhysicalDBNode
  7. PhysicalDatasource
5.3.2 獲取過程解析

NIOAcceptor 在接受到前端發(fā)來的連接請求后琅翻,會調(diào)用 ServerConnectionFactory 實例化一個 ServerConnection位仁,并實例化一個 NonBlockingSession 賦予 ServerConnection

public class ServerConnectionFactory extends FrontendConnectionFactory {
    @Override
    protected FrontendConnection getConnection(NetworkChannel channel) throws IOException {
        SystemConfig systemConfig = MycatServer.getInstance().getConfig().getSystem();
        // 將 channel 包裝為一個 ServerConnection
        ServerConnection serverConnection = new ServerConnection(channel);
        // 設置客戶端查詢處理器
        serverConnection.setQueryHandler(new ServerQueryHandler(serverConnection));
        // 設置客戶端和 Mycat 的一個會話 session
        serverConnection.setSession2(new NonBlockingSession(serverConnection));
        ...
        return serverConnection;
    }
}

前端 SQL 請求進來之后诸典,Mycat 調(diào)用 ServerConnection#routeEndExecuteSQL 進行路由計算并得到路由結(jié)果 RouteResultset移层,然后調(diào)用 NonBlockingSession#execute 進行處理,若 RouteResultset 中包含多個路由節(jié)點帖努,則調(diào)用 MultiNodeQueryHandler 的 execute 方法棠众;若 RouteResultset 只包含單個路由節(jié)點琳疏,則調(diào)用 SingleNodeHandler 的 execute 方法有决。此處我們假設是單個路由節(jié)點

public class NonBlockingSession implements Session {
    @Override
    public void execute(RouteResultset routeResultset, int type) {
        RouteResultSetNode[] nodes = routeResultset.getNodes();
        if (nodes.length == 1) {
            // 實例化一個 SingleNodeHandler 對象
            singleNodeHandler = new SingleNodeHandler(routeResultset, this);
            singleNodeHandler.execute();
        }
    }
}

SingleNodeHandler#execute 首先通過 session 獲取客戶端連接 ServerConnection 以及后端數(shù)據(jù)庫連接 BackendConnection。第一次獲取 BackendConnection 時由于 session 還沒有將 routeResultSetNode 和 BackendConnection 綁定空盼,故 backendConnection 返回 null书幕,SingleNodeHandler#execute 要調(diào)用 PhysicalDBNode#getConnection 創(chuàng)建一個新的數(shù)據(jù)庫連接,并將其綁定到 session 中

public class SingleNodeHandler implements ResponseHandler {
    public void execute() throws Exception {
        // 通過 session 拿到客戶端連接 ServerConnection
        ServerConnection serverConnection = session.getServerConnection();
        // 通過 session 拿到后端數(shù)據(jù)庫連接
        final BackendConnection backendConnection = session.getTarget(routeResultSetNode);
        // 若存在 routeResultsetNode 對應的 BackendConnection
        if (session.tryExistsCon(backendConnection, routeResultSetNode)) {
            _execute(backendConnection);
        } else { // 若不存在 routeResultsetNode 對應的 BackendConnection揽趾,則創(chuàng)建新的連接
            MycatConfig conf = MycatServer.getInstance().getConfig();
            PhysicalDBNode dn = conf.getDataNodes().get(routeResultSetNode.getName());
            dn.getConnection(dn.getDatabase(), serverConnection.isAutocommit(), routeResultSetNode, this, routeResultSetNode);
        }
    }
}

PhysicalDBNode#getConnection 從分片節(jié)點 dataNode 的數(shù)據(jù)庫連接池中獲取一個可寫的 PhysicalDatasource台汇,并調(diào)用 PhysicalDatasource#getConnection 從 ConMap 中獲取一個可用的數(shù)據(jù)庫連接

public class PhysicalDBNode {
    public void getConnection(String schema, boolean autoCommit, RouteResultSetNode routeResultSetNode, ResponseHandler handler, Object attachment) throws Exception {
        // 從分片節(jié)點 dataNode 的數(shù)據(jù)庫連接池中獲取一個可寫的 PhysicalDatasource
        PhysicalDatasource writeSource = dbPool.getSource();
        writeSource.getConnection(schema, autoCommit, handler, attachment);
    }
}

PhysicalDatasource#getConnection 從 ConMap 中獲取一個可用的數(shù)據(jù)庫連接后,調(diào)用 PhysicalDatasource#takeCon 將獲取的 con 標記為已用(borrowed = true)

public class PhysicalDatasource {
    public void getConnection(String schema, boolean autocommit, final ResponseHandler handler, final Object attachment)
            throws IOException {
        // 從 conMap 中拿取已建立好的后端連接
        BackendConnection con = this.conMap.tryTakeCon(schema, autocommit);
        if (con != null) {
            takeCon(con, handler, attachment, schema);
        }
    }
    
    private BackendConnection takeCon(BackendConnection backendConnection, final ResponseHandler handler, final Object attachment, String schema) {
        // 標記該連接為已用
        backendConnection.setBorrowed(true);
        handler.connectionAcquired(backendConnection);
        return backendConnection;
    }
}

之后調(diào)用 ResponseHandler#connectionAcquired 進行連接獲取后的確認邏輯篱瞎,此處調(diào)用的實際實現(xiàn)類為 SingleNodeHandler苟呐,其在創(chuàng)建數(shù)據(jù)庫連接時將自己作為 ResponseHandler 傳入 PhysicalDBNode#getConnection 中

dn.getConnection(dn.getDatabase(), serverConnection.isAutocommit(), routeResultSetNode, this, routeResultSetNode);

SingleNodeHandler 本身實現(xiàn)了 ResponseHandler 接口,并實現(xiàn)了 connectionAcquired 方法俐筋,具體代碼如下:

public class SingleNodeHandler implements ResponseHandler {
    public void connectionAcquired(final BackendConnection backendConnection) {
        // 將 routeResultsetNode 對應的后端連接記錄在 session 的 backendConnectionMap 中
        session.bindConnection(routeResultSetNode, backendConnection);
        _execute(backendConnection);
    }
}

因此牵素,PhysicalDatasource#getConnection 從 ConMap 中獲取一個可用的數(shù)據(jù)庫連接后,首先將該連接標記為已用(borrowed = true)校哎,然后將 backendConnection 和對應的路由節(jié)點 node 綁定到 session 的 backendConnectionMap 中两波,最后調(diào)用 _execute(backendConnection) 最進一步執(zhí)行請求處理,具體客戶端 SQL 請求執(zhí)行邏輯參見:三闷哆、客戶端 SQL 請求執(zhí)行流程

5.3.3 總結(jié)

當客戶端發(fā)送 SQL 請求至 Mycat 時腰奋,Mycat 首先在 NonBlockingSession 的 ConcurrentMap<RouteResultSetNode, BackendConnection> backendConnectionMap 中查找是否存在 SQL 的路由節(jié)點 RouteResultSetNode 對應的 BackendConnection,若存在則返回抱怔,繼續(xù)執(zhí)行后續(xù)操作劣坊;若不存在,則從 PhysicalDatasource 的 ConMap 中獲取一個可用的數(shù)據(jù)庫連接 BackendConnection屈留,并將其標記為已用(BackendConnection 的 borrowed 屬性設置為 true)局冰,然后將 BackendConnection 和 RouteResultSetNode 注冊于 NonBlockingSession 的 backendConnectionMap 中

5.4 釋放已用的數(shù)據(jù)庫連接

5.5 關閉數(shù)據(jù)庫連接

附錄

附錄1 客戶端命令請求報文命令類型詳情表

類型值 命令 功能 關聯(lián)函數(shù)
0x00 COM_SLEEP (內(nèi)部線程狀態(tài)) (無)
0x01 COM_QUIT 關閉連接 mysql_close
0x02 COM_INIT_DB 切換數(shù)據(jù)庫 mysql_select_db
0x03 COM_QUERY SQL查詢請求 mysql_real_query
0x04 COM_FIELD_LIST 獲取數(shù)據(jù)表字段信息 mysql_list_fields
0x05 COM_CREATE_DB 創(chuàng)建數(shù)據(jù)庫 mysql_create_db
0x06 COM_DROP_DB 刪除數(shù)據(jù)庫 mysql_drop_db
0x07 COM_REFRESH 清除緩存 mysql_refresh
0x08 COM_SHUTDOWN 停止服務器 mysql_shutdown
0x09 COM_STATISTICS 獲取服務器統(tǒng)計信息 mysql_stat
0x0A COM_PROCESS_INFO 獲取當前連接的列表 mysql_list_processes
0x0B COM_CONNECT (內(nèi)部線程狀態(tài)) (無)
0x0C COM_PROCESS_KILL 中斷某個連接 mysql_kill
0x0D COM_DEBUG 保存服務器調(diào)試信息 mysql_dump_debug_info
0x0E COM_PING 測試連通性 mysql_ping
0x0F COM_TIME (內(nèi)部線程狀態(tài)) (無)
0x10 COM_DELAYED_INSERT (內(nèi)部線程狀態(tài)) (無)
0x11 COM_CHANGE_USER 重新登陸(不斷連接) mysql_change_user
0x12 COM_BINLOG_DUMP 獲取二進制日志信息 (無)
0x13 COM_TABLE_DUMP 獲取數(shù)據(jù)表結(jié)構(gòu)信息 (無)
0x14 COM_CONNECT_OUT (內(nèi)部線程狀態(tài)) (無)
0x15 COM_REGISTER_SLAVE 從服務器向主服務器進行注冊 (無)
0x16 COM_STMT_PREPARE 預處理SQL語句 mysql_stmt_prepare
0x17 COM_STMT_EXECUTE 執(zhí)行預處理語句 mysql_stmt_execute
0x18 COM_STMT_SEND_LONG_DATA 發(fā)送BLOB類型的數(shù)據(jù) mysql_stmt_send_long_data
0x19 COM_STMT_CLOSE 銷毀預處理語句 mysql_stmt_close
0x1A COM_STMT_RESET 清除預處理語句參數(shù)緩存 mysql_stmt_reset
0x1B COM_SET_OPTION 設置語句選項 mysql_set_server_option
0x1C COM_STMT_FETCH 獲取預處理語句的執(zhí)行結(jié)果 mysql_stmt_fetch
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市灌危,隨后出現(xiàn)的幾起案子康二,更是在濱河造成了極大的恐慌,老刑警劉巖勇蝙,帶你破解...
    沈念sama閱讀 206,602評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件沫勿,死亡現(xiàn)場離奇詭異,居然都是意外死亡味混,警方通過查閱死者的電腦和手機产雹,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,442評論 2 382
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來翁锡,“玉大人蔓挖,你說我怎么就攤上這事」菹危” “怎么了瘟判?”我有些...
    開封第一講書人閱讀 152,878評論 0 344
  • 文/不壞的土叔 我叫張陵怨绣,是天一觀的道長。 經(jīng)常有香客問我荒适,道長梨熙,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 55,306評論 1 279
  • 正文 為了忘掉前任刀诬,我火速辦了婚禮咽扇,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘陕壹。我一直安慰自己质欲,他們只是感情好,可當我...
    茶點故事閱讀 64,330評論 5 373
  • 文/花漫 我一把揭開白布糠馆。 她就那樣靜靜地躺著嘶伟,像睡著了一般。 火紅的嫁衣襯著肌膚如雪又碌。 梳的紋絲不亂的頭發(fā)上九昧,一...
    開封第一講書人閱讀 49,071評論 1 285
  • 那天,我揣著相機與錄音毕匀,去河邊找鬼铸鹰。 笑死,一個胖子當著我的面吹牛皂岔,可吹牛的內(nèi)容都是我干的蹋笼。 我是一名探鬼主播,決...
    沈念sama閱讀 38,382評論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼躁垛,長吁一口氣:“原來是場噩夢啊……” “哼剖毯!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起教馆,我...
    開封第一講書人閱讀 37,006評論 0 259
  • 序言:老撾萬榮一對情侶失蹤逊谋,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后土铺,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體胶滋,經(jīng)...
    沈念sama閱讀 43,512評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 35,965評論 2 325
  • 正文 我和宋清朗相戀三年舒憾,在試婚紗的時候發(fā)現(xiàn)自己被綠了镀钓。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片穗熬。...
    茶點故事閱讀 38,094評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡镀迂,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出唤蔗,到底是詐尸還是另有隱情探遵,我是刑警寧澤窟赏,帶...
    沈念sama閱讀 33,732評論 4 323
  • 正文 年R本政府宣布,位于F島的核電站箱季,受9級特大地震影響涯穷,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜藏雏,卻給世界環(huán)境...
    茶點故事閱讀 39,283評論 3 307
  • 文/蒙蒙 一拷况、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧掘殴,春花似錦赚瘦、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,286評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至病瞳,卻和暖如春揽咕,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背套菜。 一陣腳步聲響...
    開封第一講書人閱讀 31,512評論 1 262
  • 我被黑心中介騙來泰國打工亲善, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人笼踩。 一個月前我還...
    沈念sama閱讀 45,536評論 2 354
  • 正文 我出身青樓逗爹,卻偏偏與公主長得像,于是被迫代替她去往敵國和親嚎于。 傳聞我的和親對象是個殘疾皇子掘而,可洞房花燭夜當晚...
    茶點故事閱讀 42,828評論 2 345

推薦閱讀更多精彩內(nèi)容