sharding-sphere之sql執(zhí)行那些事

以官方例子如下,調(diào)試sharding-sphere代碼:

    public static void main(final String[] args) throws SQLException {
        DataSource dataSource = getShardingDataSource();
        dropTable(dataSource);
        createTable(dataSource);
        insert(dataSource);
        updateFailure(dataSource);
    }

可以看到端朵,首先獲取數(shù)據(jù)源連接池好芭,然后執(zhí)行drop語句,創(chuàng)建表冲呢,插入數(shù)據(jù)舍败,再修改。在獲取數(shù)據(jù)源的時候,實(shí)質(zhì)初始化的是sharding-sphere的數(shù)據(jù)源邻薯。

 private static DataSource getShardingDataSource() throws SQLException {
        ShardingRuleConfiguration shardingRuleConfig = new ShardingRuleConfiguration();
        TableRuleConfiguration orderTableRuleConfig = new TableRuleConfiguration();
        orderTableRuleConfig.setLogicTable("t_order");
        orderTableRuleConfig.setActualDataNodes("ds_trans_${0..1}.t_order_${0..1}");
        shardingRuleConfig.getTableRuleConfigs().add(orderTableRuleConfig);
        
        TableRuleConfiguration orderItemTableRuleConfig = new TableRuleConfiguration();
        orderItemTableRuleConfig.setLogicTable("t_order_item");
        orderItemTableRuleConfig.setActualDataNodes("ds_trans_${0..1}.t_order_item_${0..1}");
        shardingRuleConfig.getTableRuleConfigs().add(orderItemTableRuleConfig);
        
        shardingRuleConfig.getBindingTableGroups().add("t_order, t_order_item");
        shardingRuleConfig.setDefaultDatabaseShardingStrategyConfig(new StandardShardingStrategyConfiguration("user_id", new ModuloShardingAlgorithm()));
        shardingRuleConfig.setDefaultTableShardingStrategyConfig(new StandardShardingStrategyConfiguration("order_id", new ModuloShardingAlgorithm()));
        return ShardingDataSourceFactory.createDataSource(createDataSourceMap(), shardingRuleConfig, new HashMap<String, Object>(), new Properties());
    }
}
public final class ShardingDataSourceFactory {
    public static DataSource createDataSource(Map<String, DataSource> dataSourceMap, ShardingRuleConfiguration shardingRuleConfig, Map<String, Object> configMap, Properties props) throws SQLException {
        return new ShardingDataSource(dataSourceMap, new ShardingRule(shardingRuleConfig, dataSourceMap.keySet()), configMap, props);
    }

    private ShardingDataSourceFactory() {
    }
}

可以看到裙戏,最終初始化的是ShardingDataSource數(shù)據(jù)源,該數(shù)據(jù)源實(shí)現(xiàn)了datasource接口弛说,最終執(zhí)行邏輯挽懦,sql詞法分析,sql語法分析和jdbc強(qiáng)行扯上了不明不白的關(guān)系木人。如圖:

datasource.png

sharding-sphere.png

再看drop語句信柿,實(shí)質(zhì)是執(zhí)行了update語句。

    private static void dropTable(final DataSource dataSource) throws SQLException {
        executeUpdate(dataSource, "DROP TABLE IF EXISTS t_order_item");
        executeUpdate(dataSource, "DROP TABLE IF EXISTS t_order");
    }
![statement.png](https://upload-images.jianshu.io/upload_images/3397380-7132d7299fd9ef5b.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
private static void executeUpdate(final DataSource dataSource, final String sql) throws SQLException {
        try (
                Connection conn = dataSource.getConnection();
                PreparedStatement preparedStatement = conn.prepareStatement(sql)) {
            preparedStatement.executeUpdate();
        }
}
public ShardingConnection getConnection() {
    return new ShardingConnection(this.shardingContext);
}

這里拿到的connection是ShardingConnection語句醒第,connections中實(shí)質(zhì)是ShardingConnection渔嚷。
而preparedStatement對象,則為ShardingPreparedStatement稠曼。

public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) {
   return new ShardingPreparedStatement(this, sql, autoGeneratedKeys);
}

statement.png

connnection.png

statement.png

從類圖來看形病,可以看出,sharding-sphere是重寫了jdbc接口霞幅,包含datasource接口漠吻,connection接口,preparedStatement接口司恳。
而在執(zhí)行sql的時候途乃,則是調(diào)用ShardingPreparedStatementexecuteUpdate方法,如下:

@Override
public int executeUpdate() throws SQLException {
    try {
        Collection<PreparedStatementUnit> preparedStatementUnits = route();
        return new PreparedStatementExecutor(
                getConnection().getShardingContext().getExecutorEngine(), routeResult.getSqlStatement().getType(), preparedStatementUnits).executeUpdate();
    } finally {
        if (routeResult != null && connection != null) {
            JDBCShardingRefreshHandler.build(routeResult, connection).execute();
        }
        clearBatch();
    }
}

可以看到扔傅,先做sql路由耍共,獲取sql執(zhí)行單元,然后new一個執(zhí)行器去執(zhí)行猎塞,在獲取執(zhí)行單元的時候试读,首先通過sql路由引擎做服務(wù)路由,獲取sql執(zhí)行單元荠耽,遍歷并組裝參數(shù)钩骇,返回執(zhí)行引擎單元,替代占位符铝量,并返回伊履,交由sql執(zhí)行器去執(zhí)行。

private Collection<PreparedStatementUnit> route() throws SQLException {
    Collection<PreparedStatementUnit> result = new LinkedList<>();
    routeResult = routingEngine.route(getParameters());
    for (SQLExecutionUnit each : routeResult.getExecutionUnits()) {
        PreparedStatement preparedStatement = generatePreparedStatement(each);
        routedStatements.add(preparedStatement);
        replaySetParameter(preparedStatement, each.getSqlUnit().getParameterSets().get(0));
        result.add(new PreparedStatementUnit(each, preparedStatement));
    }
    return result;
}
public int executeUpdate() throws SQLException {
    List<Integer> results = executorEngine.execute(sqlType, preparedStatementUnits, new ExecuteCallback<Integer>() {
        
        @Override
        public Integer execute(final BaseStatementUnit baseStatementUnit) throws Exception {
            return ((PreparedStatement) baseStatementUnit.getStatement()).executeUpdate();
        }
    });
    return accumulate(results);
}

sql執(zhí)行引擎在執(zhí)行的過程中款违,遍歷執(zhí)行單元,分別在不同的數(shù)據(jù)庫中執(zhí)行群凶,最終合并結(jié)果集插爹,返回結(jié)果。

public <T> List<T> execute(
        final SQLType sqlType, final Collection<? extends BaseStatementUnit> baseStatementUnits, final ExecuteCallback<T> executeCallback) throws SQLException {
    //異步執(zhí)行
    ListenableFuture<List<T>> restFutures = asyncExecute(sqlType, Lists.newArrayList(iterator), executeCallback);
    T firstOutput;
    List<T> restOutputs;
    try {
        firstOutput = syncExecute(sqlType, firstInput, executeCallback);
        restOutputs = restFutures.get();
        // CHECKSTYLE:OFF
    } catch (final Exception ex) {
        // CHECKSTYLE:ON
        event.setException(ex);
        event.setEventExecutionType(EventExecutionType.EXECUTE_FAILURE);
        EventBusInstance.getInstance().post(event);
        ExecutorExceptionHandler.handleException(ex);
        return null;
    }
    event.setEventExecutionType(EventExecutionType.EXECUTE_SUCCESS);
    EventBusInstance.getInstance().post(event);
    List<T> result = Lists.newLinkedList(restOutputs);
    result.add(0, firstOutput);
    return result;
}

在異步執(zhí)行的時候,實(shí)質(zhì)是多線程編程赠尾,future等待力穗,最后合并結(jié)果。

private <T> ListenableFuture<List<T>> asyncExecute(
        final SQLType sqlType, final Collection<BaseStatementUnit> baseStatementUnits, final ExecuteCallback<T> executeCallback) {
    List<ListenableFuture<T>> result = new ArrayList<>(baseStatementUnits.size());
    final boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();
    final Map<String, Object> dataMap = ExecutorDataMap.getDataMap();
    for (final BaseStatementUnit each : baseStatementUnits) {
        result.add(executorService.submit(new Callable<T>() {
            
            @Override
            public T call() throws Exception {
                return executeInternal(sqlType, each, executeCallback, isExceptionThrown, dataMap);
            }
        }));
    }
    return Futures.allAsList(result);
}

private <T> T executeInternal(final SQLType sqlType, final BaseStatementUnit baseStatementUnit, final ExecuteCallback<T> executeCallback,
                              final boolean isExceptionThrown, final Map<String, Object> dataMap) throws Exception {
    synchronized (baseStatementUnit.getStatement().getConnection()) {
        T result;
        ExecutorExceptionHandler.setExceptionThrown(isExceptionThrown);
        ExecutorDataMap.setDataMap(dataMap);
        List<AbstractExecutionEvent> events = new LinkedList<>();
        for (List<Object> each : baseStatementUnit.getSqlExecutionUnit().getSqlUnit().getParameterSets()) {
            events.add(getExecutionEvent(sqlType, baseStatementUnit, each));
        }
        for (AbstractExecutionEvent event : events) {
            EventBusInstance.getInstance().post(event);
        }
        try {
            result = executeCallback.execute(baseStatementUnit);
        } catch (final SQLException ex) {
            for (AbstractExecutionEvent each : events) {
                each.setEventExecutionType(EventExecutionType.EXECUTE_FAILURE);
                each.setException(ex);
                EventBusInstance.getInstance().post(each);
                ExecutorExceptionHandler.handleException(ex);
            }
            return null;
        }
        for (AbstractExecutionEvent each : events) {
            each.setEventExecutionType(EventExecutionType.EXECUTE_SUCCESS);
            EventBusInstance.getInstance().post(each);
        }
        return result;
    }
}

fyi

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末气嫁,一起剝皮案震驚了整個濱河市当窗,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌寸宵,老刑警劉巖崖面,帶你破解...
    沈念sama閱讀 218,122評論 6 505
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異梯影,居然都是意外死亡巫员,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,070評論 3 395
  • 文/潘曉璐 我一進(jìn)店門甲棍,熙熙樓的掌柜王于貴愁眉苦臉地迎上來简识,“玉大人,你說我怎么就攤上這事感猛∑呷牛” “怎么了?”我有些...
    開封第一講書人閱讀 164,491評論 0 354
  • 文/不壞的土叔 我叫張陵陪白,是天一觀的道長颈走。 經(jīng)常有香客問我,道長拷泽,這世上最難降的妖魔是什么疫鹊? 我笑而不...
    開封第一講書人閱讀 58,636評論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮司致,結(jié)果婚禮上拆吆,老公的妹妹穿的比我還像新娘。我一直安慰自己脂矫,他們只是感情好枣耀,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,676評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著庭再,像睡著了一般捞奕。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上拄轻,一...
    開封第一講書人閱讀 51,541評論 1 305
  • 那天颅围,我揣著相機(jī)與錄音,去河邊找鬼恨搓。 笑死院促,一個胖子當(dāng)著我的面吹牛筏养,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播常拓,決...
    沈念sama閱讀 40,292評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼渐溶,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了弄抬?” 一聲冷哼從身側(cè)響起茎辐,我...
    開封第一講書人閱讀 39,211評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎掂恕,沒想到半個月后拖陆,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,655評論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡竹海,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,846評論 3 336
  • 正文 我和宋清朗相戀三年慕蔚,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片斋配。...
    茶點(diǎn)故事閱讀 39,965評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡孔飒,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出艰争,到底是詐尸還是另有隱情坏瞄,我是刑警寧澤,帶...
    沈念sama閱讀 35,684評論 5 347
  • 正文 年R本政府宣布甩卓,位于F島的核電站鸠匀,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏逾柿。R本人自食惡果不足惜缀棍,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,295評論 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望机错。 院中可真熱鬧爬范,春花似錦跟伏、人聲如沸役耕。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,894評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽萧诫。三九已至斥难,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間帘饶,已是汗流浹背哑诊。 一陣腳步聲響...
    開封第一講書人閱讀 33,012評論 1 269
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留及刻,地道東北人搭儒。 一個月前我還...
    沈念sama閱讀 48,126評論 3 370
  • 正文 我出身青樓穷当,卻偏偏與公主長得像,于是被迫代替她去往敵國和親淹禾。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,914評論 2 355

推薦閱讀更多精彩內(nèi)容