RW#run
輪詢注冊隊列中是否有AbstractConnection
士飒,若存在且為讀事件則調(diào)用AbstractConnection#asynRead
異步讀取數(shù)據(jù)挽霉,實際處理邏輯見NIOSocketWR#asynRead
-
NIOSocketWR#asynRead
從 前端連接的channel
中讀取數(shù)據(jù),并且保存到對應(yīng)AbstractConnection
的readBuffer
中变汪,之后調(diào)用AbstractConnection#onReadData
處理讀取到的數(shù)據(jù)@Override public void asynRead() throws IOException { ByteBuffer theBuffer = con.readBuffer; if (theBuffer == null) { theBuffer = con.processor.getBufferPool().allocate(con.processor.getBufferPool().getChunkSize()); con.readBuffer = theBuffer; } // 從 SocketChannel 中讀取數(shù)據(jù)侠坎,并且保存到 AbstractConnection 的 readBuffer 中,readBuffer 處于 write mode裙盾,返回讀取了多少字節(jié) int got = channel.read(theBuffer); // 調(diào)用處理讀取到的數(shù)據(jù)的方法 con.onReadData(got); }
AbstractConnection#onReadData
讀取readBuffer
中的數(shù)據(jù)并調(diào)用AbstractConnection#handle
方法進行下一步處理实胸,其內(nèi)部調(diào)用FrontendCommandHandler#handle
FrontendCommandHandler#handle
根據(jù)data[4]
來判斷命令類型,客戶端命令請求報文格式如下圖:
data 的第五個字節(jié)存儲命令類型番官,客戶端命令請求報文命令類型詳情表見附錄1庐完。我們以 MySQLPacket.COM_QUERY
為例進行接下來的討論。當 data[4] == MySQLPacket.COM_QUERY
時徘熔,調(diào)用 FrontendConnection#query(byte[])
public void handle(byte[] data) {
// 判斷命令類型
switch (data[4]) {
...
// INSERT/SELECT/UPDATE/DELETE 等 SQL 歸屬于 MySQLPacket.COM_QUERY
case MySQLPacket.COM_QUERY:
commands.doQuery();
source.query(data);
break;
...
}
}
-
FrontendConnection#query(byte[])
將data
字節(jié)數(shù)組轉(zhuǎn)化成String
類型的 SQL门躯,ServerQueryHandler#query(String)
方法public void query(byte[] data) { MySQLMessage mm = new MySQLMessage(data); // 從 data[5] 即第六個字節(jié)開始讀取參數(shù)體 mm.position(5); String sql = mm.readString(charset); // 執(zhí)行 sql 語句,內(nèi)部調(diào)用 ServerQueryHandler#query(String) this.query( sql ); }
-
ServerQueryHandler#query(String)
解析 SQL 類型酷师,根據(jù)sqlType
使用不同的Handler
做處理@Override public void query(String sql) { ServerConnection c = this.source; /* 解析 SQL 類型 */ int rs = ServerParse.parse(sql); int sqlType = rs & 0xff; switch (sqlType) { // explain2 datanode=? sql=? case ServerParse.EXPLAIN2: Explain2Handler.handle(sql, c, rs >>> 8); break; case ServerParse.SELECT: SelectHandler.handle(sql, c, rs >>> 8); break; case ... default: if (readOnly) { LOGGER.warn(new StringBuilder().append("User readonly:").append(sql).toString()); c.writeErrMessage(ErrorCode.ER_USER_READ_ONLY, "User readonly"); break; } c.execute(sql, rs & 0xff); } }
-
例如
sqlType == ServerParse.SELECT
時使用SelectHandler
做進一步處理public static void handle(String stmt, ServerConnection c, int offs) { int offset = offs; c.setExecuteSql(null); switch (ServerParseSelect.parse(stmt, offs)) { case ServerParseSelect.DATABASE: SelectDatabase.response(c); break; case ServerParseSelect.USER: SelectUser.response(c); break; case ... default: c.setExecuteSql(stmt); c.execute(stmt, ServerParse.SELECT); } }
-
SelectHandler
進一步解析select
語句讶凉,針對不同的select
進行不同的處理染乌,默認直接調(diào)用ServerConnection#execute(java.lang.String, int)
,該方法首先進行一些常規(guī)檢查(連接狀態(tài)檢查懂讯、事務(wù)狀態(tài)檢查荷憋、當前 DB 檢查等),然后調(diào)用ServerConnection#routeEndExecuteSQL
進行路由計算(包括全局序列號褐望、SQL 語句攔截等勒庄。路由計算詳細另述)并得到路由結(jié)果RouteResultset
,之后調(diào)用NonBlockingSession#execute
public void routeEndExecuteSQL(String sql, final int type, final SchemaConfig schema) { // 路由計算 RouteResultset rrs = MycatServer .getInstance() .getRouterservice() .route(MycatServer.getInstance().getConfig().getSystem(), schema, type, sql, this.charset, this); if (rrs != null) { // session 執(zhí)行 session.execute(rrs, rrs.isSelectForUpdate() ? ServerParse.UPDATE : type); } }
-
NonBlockingSession#execute
獲取路由的dataNode
節(jié)點瘫里,若節(jié)點數(shù)為 1 則調(diào)用SingleNodeHandler#execute
處理 sql实蔽,否則調(diào)用MultiNodeQueryHandler#execute
處理 SQL。此處我們假定前端 SQL 命令只路由到一個 dataNode谨读,則調(diào)用SingleNodeHandler#execute
處理 SQL/** * NonBlockingSession#execute */ @Override public void execute(RouteResultset rrs, int type) { RouteResultsetNode[] nodes = rrs.getNodes(); if (nodes.length == 1) { singleNodeHandler = new SingleNodeHandler(rrs, this); singleNodeHandler.execute(); } else { multiNodeHandler = new MultiNodeQueryHandler(type, rrs, autocommit, this); multiNodeHandler.execute(); } }
-
SingleNodeHandler#execute
獲取后端連接BackendConnection
盐须,并調(diào)用SingleNodeHandler#_execute
,該方法直接調(diào)用BackendConnection#execute
public void execute() throws Exception { // 獲取后端數(shù)據(jù)庫連接 final BackendConnection conn = session.getTarget(node); // 若存在 dataNode 對應(yīng)的 BackendConnection if (session.tryExistsCon(conn, node)) { _execute(conn); } else { // create new connection do something... } } private void _execute(BackendConnection conn) { conn.execute(node, session.getSource(), session.getSource().isAutocommit()); }
-
當 schema.xml 中配置
<dataHost>
的dbDriver=='jdbc'
時漆腌,調(diào)用JDBCConnection#execute
處理 SQL(JDBCConnection
繼承BackendConnection
)。該方法新開一個線程處理 SQL阶冈,最終調(diào)用JDBCConnection#ouputResultSet
執(zhí)行 SQL 并將結(jié)果寫入ServerConnection
// JDBCConnection.class @Override public void execute(final RouteResultsetNode node, final ServerConnection source, final boolean autocommit) { this.sqlSelectLimit = source.getSqlSelectLimit(); Runnable runnable = new Runnable() { @Override public void run() { // 調(diào)用 JDBCConnection#ouputResultSet executeSQL(node, source, autocommit); } }; MycatServer.getInstance().getBusinessExecutor().execute(runnable); }
-
JDBCConnection#ouputResultSet
獲取數(shù)據(jù)庫連接并執(zhí)行 SQL闷尿,然后將得到的結(jié)果集ResultSet
解析為 ResultSet 響應(yīng)報文并寫入ServerConnection
private void ouputResultSet(ServerConnection sc, String sql) throws SQLException { ResultSet rs = null; Statement stmt = null; try { stmt = con.createStatement(); rs = stmt.executeQuery(sql); List<FieldPacket> fieldPks = new LinkedList<FieldPacket>(); // 根據(jù) resultset 加載列信息,保存至 fieldPks ResultSetUtil.resultSetToFieldPacket(sc.getCharset(), fieldPks, rs, this.isSpark); // 獲取列數(shù) int colunmCount = fieldPks.size(); ByteBuffer byteBuf = sc.allocate(); /* 1 寫入 resultset header packet */ ResultSetHeaderPacket headerPkg = new ResultSetHeaderPacket(); headerPkg.fieldCount = fieldPks.size(); headerPkg.packetId = ++packetId; // 將 ResultSetHeaderPacket 的數(shù)據(jù)寫入 byteBuf byteBuf = headerPkg.write(byteBuf, sc, true); byteBuf.flip(); byte[] header = new byte[byteBuf.limit()]; // 將 byteBuf 中的信息寫入 header 中 byteBuf.get(header); // byteBuf 標記歸位 byteBuf.clear(); /* 2 寫入 field packet */ List<byte[]> fields = new ArrayList<byte[]>(fieldPks.size()); Iterator<FieldPacket> itor = fieldPks.iterator(); while (itor.hasNext()) { FieldPacket curField = itor.next(); curField.packetId = ++packetId; // 將 FieldPacket 的數(shù)據(jù)寫入 byteBuf byteBuf = curField.write(byteBuf, sc, false); // position 設(shè)回 0女坑,并將 limit 設(shè)成之前的 position 的值 // limit:緩沖區(qū)數(shù)組中不可操作的下一個元素的位置:limit<=capacity byteBuf.flip(); byte[] field = new byte[byteBuf.limit()]; // 將 byteBuf 中的信息寫入 field 中 byteBuf.get(field); byteBuf.clear(); // 將 field 放入 fields fields.add(field); } /* 3 寫入 EOF packet */ EOFPacket eofPckg = new EOFPacket(); eofPckg.packetId = ++packetId; // 將 EOFPacket 的數(shù)據(jù)寫入 byteBuf byteBuf = eofPckg.write(byteBuf, sc, false); byteBuf.flip(); byte[] eof = new byte[byteBuf.limit()]; // 將 byteBuf 中的信息寫入 eof 中 byteBuf.get(eof); byteBuf.clear(); this.respHandler.fieldEofResponse(header, fields, eof, this); /* 4 寫入 Row Data packet */ // output row while (rs.next()) { ResultSetMetaData resultSetMetaData = rs.getMetaData(); int size = resultSetMetaData.getColumnCount(); StringBuilder builder = new StringBuilder(); for (int i = 1; i <= size; i++) { builder.append(resultSetMetaData.getColumnName(i) + "=" + rs.getString(i)); if (i < size) { builder.append(", "); } } LOGGER.debug("JDBCConnection.ouputResultSet sql: {}, resultSet: {}", sql, builder.toString()); RowDataPacket curRow = new RowDataPacket(colunmCount); for (int i = 0; i < colunmCount; i++) { int j = i + 1; if (MysqlDefs.isBianry((byte) fieldPks.get(i).type)) { curRow.add(rs.getBytes(j)); } else if (fieldPks.get(i).type == MysqlDefs.FIELD_TYPE_DECIMAL || fieldPks.get(i).type == (MysqlDefs.FIELD_TYPE_NEW_DECIMAL - 256)) { // field type is unsigned byte // ensure that do not use scientific notation format BigDecimal val = rs.getBigDecimal(j); curRow.add(StringUtil.encode(val != null ? val.toPlainString() : null, sc.getCharset())); } else { curRow.add(StringUtil.encode(rs.getString(j), sc.getCharset())); } } curRow.packetId = ++packetId; // 將 RowDataPacket 的數(shù)據(jù)寫入 byteBuf byteBuf = curRow.write(byteBuf, sc, false); byteBuf.flip(); byte[] row = new byte[byteBuf.limit()]; byteBuf.get(row); byteBuf.clear(); this.respHandler.rowResponse(row, this); } fieldPks.clear(); // end row /* 5 寫入 EOF packet */ eofPckg = new EOFPacket(); eofPckg.packetId = ++packetId; byteBuf = eofPckg.write(byteBuf, sc, false); byteBuf.flip(); eof = new byte[byteBuf.limit()]; byteBuf.get(eof); sc.recycle(byteBuf); this.respHandler.rowEofResponse(eof, this); } finally { if (rs != null) { try { rs.close(); } catch (SQLException e) { } } if (stmt != null) { try { stmt.close(); } catch (SQLException e) { } } } }