本文分兩部分:
第一部分講解 從NIO獲取sql命令到開始路由解析 的這段流程。該部分只是粗略講解,以便調(diào)試代碼的時候大約知道“我從何處來寻馏?”兆龙;
第二部分詳細講解路由解析的流程驱闷。
一耻台、從NIO獲取sql命令到開始路由解析
1、前端處理器的消費者
啟動Skynet空另,SkynetServer類會開啟4個前端處理器線程FrontEndHandlerRunner.java盆耽,4個是CPU核數(shù)。
for (int i = 0; i < system.getProcessorExecutor(); i++) {
businessExecutor.execute(new FrontEndHandlerRunnable(frontHandlerQueue));
}
從下圖可以看出扼菠,前端處理器線程用來處理一個阻塞隊列FrontHandlerQueue中的元素征字,是一個消費者,run()方法中是一個while(true)的死循環(huán)娇豫,當阻塞隊列FrontHandlerQueue中沒有值時匙姜,代碼阻塞,有值時冯痢,向下執(zhí)行氮昧,主要業(yè)務(wù)由handler.handle();去處理。
public class FrontEndHandlerRunnable implements Runnable {
private final BlockingQueue<FrontendCommandHandler> frontHandlerQueue;
public FrontEndHandlerRunnable(BlockingQueue<FrontendCommandHandler> frontHandlerQueue) {
this.frontHandlerQueue = frontHandlerQueue;
}
@Override
public void run() {
FrontendCommandHandler handler;
while (true) {
try {
handler = frontHandlerQueue.take();
.........
handler.handle();
.........
2浦楣、前端處理器的生產(chǎn)者
消費者有了袖肥,接下來我們?nèi)フ疑a(chǎn)者≌窭停回到SkynetServer.java中椎组,顯而易見,這是一個餓漢式單例历恐,它有很多成員變量寸癌,在這里我們可以看到我們需要的前端處理器隊列:
private BlockingQueue<FrontendCommandHandler> frontHandlerQueue;
結(jié)合前面的代碼,我們可以發(fā)現(xiàn)弱贼,項目中任何給frontHandlerQueue隊列添加元素的地方都是生產(chǎn)者蒸苇,我們在代碼中一直追尋frontHandlerQueue變量,最終到了FrontCommandHandler.handler(byte[] data)方法中吮旅,該方法的最后一行代碼如下:
SkynetServer.getInstance().getFrontHandlerQueue().offer(this);
this為FrontCommandHandler的實例溪烤,顯而易見,此處就是生產(chǎn)者生產(chǎn)對象庇勃,并放入前端處理隊列的地方檬嘀。
由此方法向上追溯,會找到NIOSocketWR.java的asyncRead()方法责嚷,它是負責(zé)讀數(shù)據(jù)的方法鸳兽,由RW.java這個負責(zé)讀寫數(shù)據(jù)的線程來調(diào)用。
管中窺豹再层,可見一斑贸铜。其實Skynet中幾乎所有的生產(chǎn)者消費者模式都是如此堡纬,在SkynetServer.java這個單例中統(tǒng)一定義,根據(jù)它里面的引用都能找到其生產(chǎn)者蒿秦,然后由多線程的處理器去消費烤镐。
3、獲取sql語句和sql類型
上文提到FrontEndHandlerRunnable.java中的run()方法中調(diào)用了 handler.handle()棍鳖,幾乎所有的業(yè)務(wù)邏輯都由該方法處理炮叶,我們點進去繼續(xù)看這個方法,一直到FrontendCommandHandler.java的handleData(byte[] data)方法渡处,里面有許多的switch—case判斷镜悉,如下圖:
protected void handleData(byte[] data) {
.........
switch (data[4]) {
case MySQLPacket.COM_INIT_DB:
commands.doInitDB();
source.initDB(data);
break;
case MySQLPacket.COM_QUERY:
commands.doQuery();
source.query(data);
break;
.........
handleData(byte[] data)方法的入?yún)⒕褪且粋€完成的數(shù)據(jù)包,我們?nèi)×薲ata中的第5個字節(jié)來判斷命令類型医瘫,之所以取第5個字節(jié)侣肄,這是根據(jù)mysql協(xié)議的約定的報文結(jié)構(gòu)來的,在這里對mysql的報文結(jié)構(gòu)做一個極簡單回顧:
如圖醇份,Mysql的報文結(jié)構(gòu)前4個字節(jié)為消息頭稼锅,其中前3個是字節(jié)合在一起說明本條消息的長度,第4個字節(jié)是序號僚纷,用于保證消息的順序矩距。從第5個字節(jié)往后為客戶端正式的命令請求報文怖竭,而命令請求報文的第1個字節(jié)又為當前命令類型锥债,如下圖:
上述代碼中有多個switch—case判斷,我們只需關(guān)注case MySQLPacket.COM_QUERY绽左,我們發(fā)出的大部分sql命令都會走這個分支,我們進入source.query(data)中戏蔑,發(fā)現(xiàn)該方法中首次將數(shù)據(jù)解析成了sql語句:
public void query(byte[] data) {
// 取得語句
String sql = null;
try {
MySQLMessage mm = new MySQLMessage(data);
mm.position(5);
sql = mm.readString(charsetName.getClient());
} catch (UnsupportedEncodingException e) {
.........
this.query(sql);
}
追蹤代碼中 this.query(sql)方法总棵,一直到ServerQueryHandler.java中的query(String sql):
public void query(String sql) {
.........
//通過字符串解析,獲取sql語句的類型
int rs = ServerParse.parse(sql);
.........
int sqlType = rs & 0xff;
//處理注釋
if (isWithHint) {
.........
c.execute(sql, rs & 0xff);
} else {
//根據(jù)sql類型迄汛,分發(fā)給不同的handler進行處理
switch (sqlType) {
case ServerParse.EXPLAIN:
ExplainHandler.handle(sql, c, rs >>> 8);
break;
case ServerParse.EXPLAIN2:
Explain2Handler.handle(sql, c, rs >>> 8);
break;
case ServerParse.DESCRIBE:
DescribeHandler.handle(sql, c);
break;
case ServerParse.SET:
SetHandler.handle(sql, c, rs >>> 8);
break;
case ServerParse.SHOW:
ShowHandler.handle(sql, c, rs >>> 8);
break;
case ServerParse.SELECT:
SelectHandler.handle(sql, c, rs >>> 8);
break;
.........
default:
.........
c.execute(sql, rs & 0xff);
}
}
}
該方法較長鹃觉,篇幅原因,上面只截取了相對重要的部分疗隶,注釋其實已經(jīng)很清楚了,最終會交由各個sql類型的handler去處理卵沉,以常用的 select 類型為例,調(diào)用了SelectHandler.handle(sql, c, rs >>> 8)停撞,該方法最終還是調(diào)用ServerConnection.java的execute(String sql, int type)方法横堡,殊途同歸道宅,其它種類的解析器也會除了特殊情況直接返回的污茵,都會調(diào)用該方法去做路由解析民珍。