三桐磁、MyCat 客戶端 SQL 請求執(zhí)行流程

  • RW#run 輪詢注冊隊列中是否有 AbstractConnection士飒,若存在且為讀事件則調(diào)用 AbstractConnection#asynRead 異步讀取數(shù)據(jù)挽霉,實際處理邏輯見 NIOSocketWR#asynRead

  • NIOSocketWR#asynRead 從 前端連接的 channel 中讀取數(shù)據(jù),并且保存到對應(yīng) AbstractConnectionreadBuffer 中变汪,之后調(diào)用 AbstractConnection#onReadData 處理讀取到的數(shù)據(jù)

    @Override
    public void asynRead() throws IOException {
       ByteBuffer theBuffer = con.readBuffer;
       if (theBuffer == null) {
          theBuffer = con.processor.getBufferPool().allocate(con.processor.getBufferPool().getChunkSize());
          con.readBuffer = theBuffer;
       }
       // 從 SocketChannel 中讀取數(shù)據(jù)侠坎,并且保存到 AbstractConnection 的 readBuffer 中,readBuffer 處于 write mode裙盾,返回讀取了多少字節(jié)
       int got = channel.read(theBuffer);
       // 調(diào)用處理讀取到的數(shù)據(jù)的方法
       con.onReadData(got);
    }
    
  • AbstractConnection#onReadData 讀取 readBuffer 中的數(shù)據(jù)并調(diào)用 AbstractConnection#handle 方法進行下一步處理实胸,其內(nèi)部調(diào)用 FrontendCommandHandler#handle

  • FrontendCommandHandler#handle 根據(jù) data[4] 來判斷命令類型,客戶端命令請求報文格式如下圖:

data 的第五個字節(jié)存儲命令類型番官,客戶端命令請求報文命令類型詳情表見附錄1庐完。我們以 MySQLPacket.COM_QUERY 為例進行接下來的討論。當 data[4] == MySQLPacket.COM_QUERY 時徘熔,調(diào)用 FrontendConnection#query(byte[])

public void handle(byte[] data) {
    // 判斷命令類型
    switch (data[4]) {
        ...
        // INSERT/SELECT/UPDATE/DELETE 等 SQL 歸屬于 MySQLPacket.COM_QUERY
        case MySQLPacket.COM_QUERY:
            commands.doQuery();
            source.query(data);
            break;
      ...
    }
}
  • FrontendConnection#query(byte[])data 字節(jié)數(shù)組轉(zhuǎn)化成 String 類型的 SQL门躯, ServerQueryHandler#query(String) 方法

    public void query(byte[] data) {  
        MySQLMessage mm = new MySQLMessage(data);
        // 從 data[5] 即第六個字節(jié)開始讀取參數(shù)體
        mm.position(5);
        String sql = mm.readString(charset);
        // 執(zhí)行 sql 語句,內(nèi)部調(diào)用 ServerQueryHandler#query(String)
        this.query( sql );
    }
    
  • ServerQueryHandler#query(String) 解析 SQL 類型酷师,根據(jù) sqlType 使用不同的 Handler 做處理

    @Override
    public void query(String sql) {
        ServerConnection c = this.source;
        /* 解析 SQL 類型 */
        int rs = ServerParse.parse(sql);
        int sqlType = rs & 0xff;
        
        switch (sqlType) {
            // explain2 datanode=? sql=?
            case ServerParse.EXPLAIN2:
                Explain2Handler.handle(sql, c, rs >>> 8);
                break;
            case ServerParse.SELECT:
                SelectHandler.handle(sql, c, rs >>> 8);
                break;
            case ...
            default:
                if (readOnly) {
                    LOGGER.warn(new StringBuilder().append("User readonly:").append(sql).toString());
                    c.writeErrMessage(ErrorCode.ER_USER_READ_ONLY, "User readonly");
                    break;
                }
                c.execute(sql, rs & 0xff);
        }
    }
    
  • 例如 sqlType == ServerParse.SELECT 時使用 SelectHandler 做進一步處理

    public static void handle(String stmt, ServerConnection c, int offs) {
        int offset = offs;
        c.setExecuteSql(null);
        switch (ServerParseSelect.parse(stmt, offs)) {
        case ServerParseSelect.DATABASE:
            SelectDatabase.response(c);
            break;
        case ServerParseSelect.USER:
            SelectUser.response(c);
            break;
        case ...
        default:
            c.setExecuteSql(stmt);
            c.execute(stmt, ServerParse.SELECT);
        }
    }
    
  • SelectHandler 進一步解析 select 語句讶凉,針對不同的 select 進行不同的處理染乌,默認直接調(diào)用 ServerConnection#execute(java.lang.String, int) ,該方法首先進行一些常規(guī)檢查(連接狀態(tài)檢查懂讯、事務(wù)狀態(tài)檢查荷憋、當前 DB 檢查等),然后調(diào)用 ServerConnection#routeEndExecuteSQL 進行路由計算(包括全局序列號褐望、SQL 語句攔截等勒庄。路由計算詳細另述)并得到路由結(jié)果 RouteResultset,之后調(diào)用 NonBlockingSession#execute

    public void routeEndExecuteSQL(String sql, final int type, final SchemaConfig schema) {
        // 路由計算
        RouteResultset rrs = MycatServer
                    .getInstance()
                    .getRouterservice()
                    .route(MycatServer.getInstance().getConfig().getSystem(), schema, type, sql, this.charset, this);
        if (rrs != null) {
            // session 執(zhí)行
            session.execute(rrs, rrs.isSelectForUpdate() ? ServerParse.UPDATE : type);
        }
    }
    
  • NonBlockingSession#execute 獲取路由的 dataNode 節(jié)點瘫里,若節(jié)點數(shù)為 1 則調(diào)用 SingleNodeHandler#execute 處理 sql实蔽,否則調(diào)用 MultiNodeQueryHandler#execute 處理 SQL。此處我們假定前端 SQL 命令只路由到一個 dataNode谨读,則調(diào)用 SingleNodeHandler#execute 處理 SQL

    /**
     * NonBlockingSession#execute
     */
    @Override
    public void execute(RouteResultset rrs, int type) {
        RouteResultsetNode[] nodes = rrs.getNodes();
        if (nodes.length == 1) {
            singleNodeHandler = new SingleNodeHandler(rrs, this);
            singleNodeHandler.execute();
        } else {
            multiNodeHandler = new MultiNodeQueryHandler(type, rrs, autocommit, this);
            multiNodeHandler.execute();
        }
    }
    
  • SingleNodeHandler#execute 獲取后端連接 BackendConnection盐须,并調(diào)用 SingleNodeHandler#_execute,該方法直接調(diào)用 BackendConnection#execute

    public void execute() throws Exception {
        // 獲取后端數(shù)據(jù)庫連接
        final BackendConnection conn = session.getTarget(node);
        // 若存在 dataNode 對應(yīng)的 BackendConnection
        if (session.tryExistsCon(conn, node)) {
            _execute(conn);
        } else {
            // create new connection
              do something...
        }
    }
    
    private void _execute(BackendConnection conn) {
        conn.execute(node, session.getSource(), session.getSource().isAutocommit());
    }
    
  • 當 schema.xml 中配置 <dataHost>dbDriver=='jdbc' 時漆腌,調(diào)用 JDBCConnection#execute 處理 SQL( JDBCConnection 繼承 BackendConnection)。該方法新開一個線程處理 SQL阶冈,最終調(diào)用 JDBCConnection#ouputResultSet 執(zhí)行 SQL 并將結(jié)果寫入 ServerConnection

    // JDBCConnection.class
    @Override
    public void execute(final RouteResultsetNode node, final ServerConnection source, final boolean autocommit) {
        this.sqlSelectLimit = source.getSqlSelectLimit();
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                // 調(diào)用 JDBCConnection#ouputResultSet
                executeSQL(node, source, autocommit);
            }
        };
        MycatServer.getInstance().getBusinessExecutor().execute(runnable);
    }
    
  • JDBCConnection#ouputResultSet 獲取數(shù)據(jù)庫連接并執(zhí)行 SQL闷尿,然后將得到的結(jié)果集 ResultSet 解析為 ResultSet 響應(yīng)報文并寫入 ServerConnection

    private void ouputResultSet(ServerConnection sc, String sql) throws SQLException {
        ResultSet rs = null;
        Statement stmt = null;
        
        try {
            stmt = con.createStatement();
            rs = stmt.executeQuery(sql);
    
            List<FieldPacket> fieldPks = new LinkedList<FieldPacket>();
            // 根據(jù) resultset 加載列信息,保存至 fieldPks
            ResultSetUtil.resultSetToFieldPacket(sc.getCharset(), fieldPks, rs,
                    this.isSpark);
            // 獲取列數(shù)
            int colunmCount = fieldPks.size();
            ByteBuffer byteBuf = sc.allocate();
    
            /* 1 寫入 resultset header packet */
            ResultSetHeaderPacket headerPkg = new ResultSetHeaderPacket();
            headerPkg.fieldCount = fieldPks.size();
            headerPkg.packetId = ++packetId;
            // 將 ResultSetHeaderPacket 的數(shù)據(jù)寫入 byteBuf
            byteBuf = headerPkg.write(byteBuf, sc, true);
            byteBuf.flip();
            byte[] header = new byte[byteBuf.limit()];
            // 將 byteBuf 中的信息寫入 header 中
            byteBuf.get(header);
            // byteBuf 標記歸位
            byteBuf.clear();
    
            /* 2 寫入 field packet */
            List<byte[]> fields = new ArrayList<byte[]>(fieldPks.size());
            Iterator<FieldPacket> itor = fieldPks.iterator();
            while (itor.hasNext()) {
                FieldPacket curField = itor.next();
                curField.packetId = ++packetId;
                // 將 FieldPacket 的數(shù)據(jù)寫入 byteBuf
                byteBuf = curField.write(byteBuf, sc, false);
                // position 設(shè)回 0女坑,并將 limit 設(shè)成之前的 position 的值
                // limit:緩沖區(qū)數(shù)組中不可操作的下一個元素的位置:limit<=capacity
                byteBuf.flip();
                byte[] field = new byte[byteBuf.limit()];
                // 將 byteBuf 中的信息寫入 field 中
                byteBuf.get(field);
                byteBuf.clear();
                // 將 field 放入 fields
                fields.add(field);
            }
            /* 3 寫入 EOF packet */
            EOFPacket eofPckg = new EOFPacket();
            eofPckg.packetId = ++packetId;
            // 將 EOFPacket 的數(shù)據(jù)寫入 byteBuf
            byteBuf = eofPckg.write(byteBuf, sc, false);
            byteBuf.flip();
            byte[] eof = new byte[byteBuf.limit()];
            // 將 byteBuf 中的信息寫入 eof 中
            byteBuf.get(eof);
            byteBuf.clear();
            this.respHandler.fieldEofResponse(header, fields, eof, this);
    
            /* 4 寫入 Row Data packet */
            // output row
            while (rs.next()) {
                ResultSetMetaData resultSetMetaData = rs.getMetaData();
                int size = resultSetMetaData.getColumnCount();
                StringBuilder builder = new StringBuilder();
                for (int i = 1; i <= size; i++) {
                    builder.append(resultSetMetaData.getColumnName(i) + "=" + rs.getString(i));
                    if (i < size) {
                        builder.append(", ");
                    }
                }
                LOGGER.debug("JDBCConnection.ouputResultSet sql: {}, resultSet: {}", sql, builder.toString());
                RowDataPacket curRow = new RowDataPacket(colunmCount);
                for (int i = 0; i < colunmCount; i++) {
                    int j = i + 1;
                    if (MysqlDefs.isBianry((byte) fieldPks.get(i).type)) {
                        curRow.add(rs.getBytes(j));
                    } else if (fieldPks.get(i).type == MysqlDefs.FIELD_TYPE_DECIMAL ||
                            fieldPks.get(i).type == (MysqlDefs.FIELD_TYPE_NEW_DECIMAL - 256)) { // field type is unsigned byte
                        // ensure that do not use scientific notation format
                        BigDecimal val = rs.getBigDecimal(j);
                        curRow.add(StringUtil.encode(val != null ? val.toPlainString() : null,
                                sc.getCharset()));
                    } else {
                        curRow.add(StringUtil.encode(rs.getString(j),
                                sc.getCharset()));
                    }
    
                }
                curRow.packetId = ++packetId;
                // 將 RowDataPacket 的數(shù)據(jù)寫入 byteBuf
                byteBuf = curRow.write(byteBuf, sc, false);
                byteBuf.flip();
                byte[] row = new byte[byteBuf.limit()];
                byteBuf.get(row);
                byteBuf.clear();
                this.respHandler.rowResponse(row, this);
            }
    
            fieldPks.clear();
    
            // end row
            /* 5 寫入 EOF packet */
            eofPckg = new EOFPacket();
            eofPckg.packetId = ++packetId;
            byteBuf = eofPckg.write(byteBuf, sc, false);
            byteBuf.flip();
            eof = new byte[byteBuf.limit()];
            byteBuf.get(eof);
            sc.recycle(byteBuf);
            this.respHandler.rowEofResponse(eof, this);
        } finally {
            if (rs != null) {
                try {
                    rs.close();
                } catch (SQLException e) {
    
                }
            }
            if (stmt != null) {
                try {
                    stmt.close();
                } catch (SQLException e) {
    
                }
            }
        }
    }
    
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末填具,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子匆骗,更是在濱河造成了極大的恐慌劳景,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,858評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件碉就,死亡現(xiàn)場離奇詭異盟广,居然都是意外死亡,警方通過查閱死者的電腦和手機瓮钥,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,372評論 3 395
  • 文/潘曉璐 我一進店門筋量,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人碉熄,你說我怎么就攤上這事桨武。” “怎么了锈津?”我有些...
    開封第一講書人閱讀 165,282評論 0 356
  • 文/不壞的土叔 我叫張陵呀酸,是天一觀的道長。 經(jīng)常有香客問我琼梆,道長性誉,這世上最難降的妖魔是什么窿吩? 我笑而不...
    開封第一講書人閱讀 58,842評論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮艾栋,結(jié)果婚禮上爆存,老公的妹妹穿的比我還像新娘。我一直安慰自己蝗砾,他們只是感情好先较,可當我...
    茶點故事閱讀 67,857評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著悼粮,像睡著了一般闲勺。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上扣猫,一...
    開封第一講書人閱讀 51,679評論 1 305
  • 那天菜循,我揣著相機與錄音,去河邊找鬼申尤。 笑死癌幕,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的昧穿。 我是一名探鬼主播勺远,決...
    沈念sama閱讀 40,406評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼时鸵!你這毒婦竟也來了胶逢?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,311評論 0 276
  • 序言:老撾萬榮一對情侶失蹤饰潜,失蹤者是張志新(化名)和其女友劉穎初坠,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體彭雾,經(jīng)...
    沈念sama閱讀 45,767評論 1 315
  • 正文 獨居荒郊野嶺守林人離奇死亡碟刺,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,945評論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了薯酝。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片南誊。...
    茶點故事閱讀 40,090評論 1 350
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖蜜托,靈堂內(nèi)的尸體忽然破棺而出抄囚,到底是詐尸還是另有隱情,我是刑警寧澤橄务,帶...
    沈念sama閱讀 35,785評論 5 346
  • 正文 年R本政府宣布幔托,位于F島的核電站,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏重挑。R本人自食惡果不足惜嗓化,卻給世界環(huán)境...
    茶點故事閱讀 41,420評論 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望谬哀。 院中可真熱鬧刺覆,春花似錦、人聲如沸史煎。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,988評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽篇梭。三九已至氢橙,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間恬偷,已是汗流浹背悍手。 一陣腳步聲響...
    開封第一講書人閱讀 33,101評論 1 271
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留袍患,地道東北人坦康。 一個月前我還...
    沈念sama閱讀 48,298評論 3 372
  • 正文 我出身青樓,卻偏偏與公主長得像诡延,于是被迫代替她去往敵國和親滞欠。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 45,033評論 2 355