Let's debug the sharding-sphere code, using the official example below:
public static void main(final String[] args) throws SQLException {
    DataSource dataSource = getShardingDataSource();
    dropTable(dataSource);
    createTable(dataSource);
    insert(dataSource);
    updateFailure(dataSource);
}
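As you can see, the example first obtains the data source (the connection pool), then executes the drop statements, creates the tables, inserts data, and finally runs an update. Obtaining the data source actually initializes sharding-sphere's own data source.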
private static DataSource getShardingDataSource() throws SQLException {
    ShardingRuleConfiguration shardingRuleConfig = new ShardingRuleConfiguration();
    TableRuleConfiguration orderTableRuleConfig = new TableRuleConfiguration();
    orderTableRuleConfig.setLogicTable("t_order");
    orderTableRuleConfig.setActualDataNodes("ds_trans_${0..1}.t_order_${0..1}");
    shardingRuleConfig.getTableRuleConfigs().add(orderTableRuleConfig);
    TableRuleConfiguration orderItemTableRuleConfig = new TableRuleConfiguration();
    orderItemTableRuleConfig.setLogicTable("t_order_item");
    orderItemTableRuleConfig.setActualDataNodes("ds_trans_${0..1}.t_order_item_${0..1}");
    shardingRuleConfig.getTableRuleConfigs().add(orderItemTableRuleConfig);
    shardingRuleConfig.getBindingTableGroups().add("t_order, t_order_item");
    shardingRuleConfig.setDefaultDatabaseShardingStrategyConfig(new StandardShardingStrategyConfiguration("user_id", new ModuloShardingAlgorithm()));
    shardingRuleConfig.setDefaultTableShardingStrategyConfig(new StandardShardingStrategyConfiguration("order_id", new ModuloShardingAlgorithm()));
    return ShardingDataSourceFactory.createDataSource(createDataSourceMap(), shardingRuleConfig, new HashMap<String, Object>(), new Properties());
}
}
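To make the configuration above more concrete, here is a rough sketch of what it implies: the expression `ds_trans_${0..1}.t_order_${0..1}` describes four actual data nodes, and the two sharding strategies pick a database from `user_id` and a table from `order_id`. The example's `ModuloShardingAlgorithm` is not shown here, so this sketch assumes it simply takes the sharding value modulo 2. The class and method names are made up for illustration and are not part of sharding-sphere.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; not sharding-sphere code.
final class ModuloShardingSketch {

    // Enumerates the nodes that an expression such as
    // "ds_trans_${0..1}.t_order_${0..1}" stands for (dsCount = 2, tableCount = 2).
    static List<String> expandDataNodes(String dsPrefix, int dsCount, String tablePrefix, int tableCount) {
        List<String> nodes = new ArrayList<>();
        for (int ds = 0; ds < dsCount; ds++) {
            for (int table = 0; table < tableCount; table++) {
                nodes.add(dsPrefix + ds + "." + tablePrefix + table);
            }
        }
        return nodes;
    }

    // Assumption: the database is chosen by user_id % 2 and the table by order_id % 2.
    static String route(long userId, long orderId) {
        long ds = Math.floorMod(userId, 2L);
        long table = Math.floorMod(orderId, 2L);
        return "ds_trans_" + ds + ".t_order_" + table;
    }

    public static void main(String[] args) {
        System.out.println(expandDataNodes("ds_trans_", 2, "t_order_", 2));
        System.out.println(route(10L, 1001L));
    }
}
```

With these assumptions, a row with `user_id = 10` and `order_id = 1001` would land in `ds_trans_0.t_order_1`.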
public final class ShardingDataSourceFactory {

    public static DataSource createDataSource(Map<String, DataSource> dataSourceMap, ShardingRuleConfiguration shardingRuleConfig, Map<String, Object> configMap, Properties props) throws SQLException {
        return new ShardingDataSource(dataSourceMap, new ShardingRule(shardingRuleConfig, dataSourceMap.keySet()), configMap, props);
    }

    private ShardingDataSourceFactory() {
    }
}
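As you can see, what finally gets initialized is a ShardingDataSource, which implements the JDBC DataSource interface. As a result, the execution logic, SQL lexical analysis, and SQL syntax analysis all end up forcibly entangled with JDBC. See the figure:
Now look at the drop statements: they are actually run through executeUpdate.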
private static void dropTable(final DataSource dataSource) throws SQLException {
    executeUpdate(dataSource, "DROP TABLE IF EXISTS t_order_item");
    executeUpdate(dataSource, "DROP TABLE IF EXISTS t_order");
}
![statement.png](https://upload-images.jianshu.io/upload_images/3397380-7132d7299fd9ef5b.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
private static void executeUpdate(final DataSource dataSource, final String sql) throws SQLException {
    try (
            Connection conn = dataSource.getConnection();
            PreparedStatement preparedStatement = conn.prepareStatement(sql)) {
        preparedStatement.executeUpdate();
    }
}

public ShardingConnection getConnection() {
    return new ShardingConnection(this.shardingContext);
}
The connection obtained here is actually a ShardingConnection.
The preparedStatement object, in turn, is a ShardingPreparedStatement.
public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) {
    return new ShardingPreparedStatement(this, sql, autoGeneratedKeys);
}
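From the class diagram, we can see that sharding-sphere provides its own implementations of the JDBC interfaces, including DataSource, Connection, and PreparedStatement.
When the SQL is executed, the call goes to the executeUpdate method of ShardingPreparedStatement, shown below: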
@Override
public int executeUpdate() throws SQLException {
    try {
        Collection<PreparedStatementUnit> preparedStatementUnits = route();
        return new PreparedStatementExecutor(
                getConnection().getShardingContext().getExecutorEngine(), routeResult.getSqlStatement().getType(), preparedStatementUnits).executeUpdate();
    } finally {
        if (routeResult != null && connection != null) {
            JDBCShardingRefreshHandler.build(routeResult, connection).execute();
        }
        clearBatch();
    }
}
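As you can see, it first routes the SQL to obtain the SQL execution units, then creates a new executor to run them. To obtain the execution units, route() first calls the SQL routing engine. It then iterates over the resulting SQL execution units, creates a PreparedStatement for each one, fills in the placeholder parameters, and wraps each in a PreparedStatementUnit. The returned collection is handed to the SQL executor.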
private Collection<PreparedStatementUnit> route() throws SQLException {
    Collection<PreparedStatementUnit> result = new LinkedList<>();
    routeResult = routingEngine.route(getParameters());
    for (SQLExecutionUnit each : routeResult.getExecutionUnits()) {
        PreparedStatement preparedStatement = generatePreparedStatement(each);
        routedStatements.add(preparedStatement);
        replaySetParameter(preparedStatement, each.getSqlUnit().getParameterSets().get(0));
        result.add(new PreparedStatementUnit(each, preparedStatement));
    }
    return result;
}
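As a rough illustration of what "routing into execution units" means for the drop statements above, the sketch below fans one logical statement out into one unit per actual data node and rewrites the logical table name into the actual one. It assumes a DDL statement with no sharding value goes to every configured node, and it uses a plain string replacement where the real engine rewrites the parsed SQL. `Unit` and `routeToAllNodes` are hypothetical names, not sharding-sphere APIs.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; not sharding-sphere code.
final class RouteSketch {

    // Stand-in for an execution unit: the target data source plus the rewritten SQL.
    static final class Unit {
        final String dataSource;
        final String sql;

        Unit(String dataSource, String sql) {
            this.dataSource = dataSource;
            this.sql = sql;
        }

        @Override
        public String toString() {
            return dataSource + ": " + sql;
        }
    }

    // Assumption: a statement without sharding values is sent to every data node.
    static List<Unit> routeToAllNodes(String logicSql, String logicTable, int dsCount, int tableCount) {
        List<Unit> units = new ArrayList<>();
        for (int ds = 0; ds < dsCount; ds++) {
            for (int table = 0; table < tableCount; table++) {
                String actualTable = logicTable + "_" + table;
                // Naive textual rewrite; good enough for a sketch with a single table reference.
                units.add(new Unit("ds_trans_" + ds, logicSql.replace(logicTable, actualTable)));
            }
        }
        return units;
    }

    public static void main(String[] args) {
        for (Unit each : routeToAllNodes("DROP TABLE IF EXISTS t_order_item", "t_order_item", 2, 2)) {
            System.out.println(each);
        }
    }
}
```

Each resulting unit corresponds to one physical statement, which is why route() in the code above creates one PreparedStatement per unit.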
public int executeUpdate() throws SQLException {
    List<Integer> results = executorEngine.execute(sqlType, preparedStatementUnits, new ExecuteCallback<Integer>() {

        @Override
        public Integer execute(final BaseStatementUnit baseStatementUnit) throws Exception {
            return ((PreparedStatement) baseStatementUnit.getStatement()).executeUpdate();
        }
    });
    return accumulate(results);
}
During execution, the SQL execution engine iterates over the execution units, runs each one against its own database, and finally merges the results and returns them.
public <T> List<T> execute(
        final SQLType sqlType, final Collection<? extends BaseStatementUnit> baseStatementUnits, final ExecuteCallback<T> executeCallback) throws SQLException {
    // Asynchronous execution of the remaining units.
    // The declarations of iterator, firstInput, and event are omitted from this excerpt.
    ListenableFuture<List<T>> restFutures = asyncExecute(sqlType, Lists.newArrayList(iterator), executeCallback);
    T firstOutput;
    List<T> restOutputs;
    try {
        firstOutput = syncExecute(sqlType, firstInput, executeCallback);
        restOutputs = restFutures.get();
        // CHECKSTYLE:OFF
    } catch (final Exception ex) {
        // CHECKSTYLE:ON
        event.setException(ex);
        event.setEventExecutionType(EventExecutionType.EXECUTE_FAILURE);
        EventBusInstance.getInstance().post(event);
        ExecutorExceptionHandler.handleException(ex);
        return null;
    }
    event.setEventExecutionType(EventExecutionType.EXECUTE_SUCCESS);
    EventBusInstance.getInstance().post(event);
    List<T> result = Lists.newLinkedList(restOutputs);
    result.add(0, firstOutput);
    return result;
}
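The shape of execute() can be sketched with plain `java.util.concurrent`: the first unit runs on the calling thread, the remaining units are submitted to a pool, and the caller then waits on the futures and combines the outputs. This is a simplified sketch, not the engine's code. It uses `ExecutorService` and `Future` instead of Guava's `ListenableFuture`, and it assumes the update counts are merged by summing them. `FanOutSketch` and `executeAll` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical illustration only; not sharding-sphere code.
final class FanOutSketch {

    // Runs the first unit synchronously and the rest on the pool, then sums the update counts.
    static int executeAll(List<Callable<Integer>> units, ExecutorService pool) {
        if (units.isEmpty()) {
            return 0;
        }
        Iterator<Callable<Integer>> iterator = units.iterator();
        Callable<Integer> first = iterator.next();
        List<Future<Integer>> restFutures = new ArrayList<>();
        while (iterator.hasNext()) {
            restFutures.add(pool.submit(iterator.next()));
        }
        try {
            int total = first.call();          // caller thread does its share of the work
            for (Future<Integer> each : restFutures) {
                total += each.get();           // block until each remaining unit finishes
            }
            return total;
        } catch (Exception ex) {
            throw new IllegalStateException(ex);
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<Callable<Integer>> units = new ArrayList<>();
            units.add(() -> 1);
            units.add(() -> 1);
            units.add(() -> 0);
            System.out.println(executeAll(units, pool));
        } finally {
            pool.shutdown();
        }
    }
}
```

Running the first unit on the caller thread means a statement that routes to a single node never needs a pool thread at all.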
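Asynchronous execution is essentially multithreaded programming: the tasks are submitted to a thread pool, the caller waits on their futures, and the results are merged at the end.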
private <T> ListenableFuture<List<T>> asyncExecute(
        final SQLType sqlType, final Collection<BaseStatementUnit> baseStatementUnits, final ExecuteCallback<T> executeCallback) {
    List<ListenableFuture<T>> result = new ArrayList<>(baseStatementUnits.size());
    final boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();
    final Map<String, Object> dataMap = ExecutorDataMap.getDataMap();
    for (final BaseStatementUnit each : baseStatementUnits) {
        result.add(executorService.submit(new Callable<T>() {

            @Override
            public T call() throws Exception {
                return executeInternal(sqlType, each, executeCallback, isExceptionThrown, dataMap);
            }
        }));
    }
    return Futures.allAsList(result);
}
private <T> T executeInternal(final SQLType sqlType, final BaseStatementUnit baseStatementUnit, final ExecuteCallback<T> executeCallback,
                              final boolean isExceptionThrown, final Map<String, Object> dataMap) throws Exception {
    synchronized (baseStatementUnit.getStatement().getConnection()) {
        T result;
        ExecutorExceptionHandler.setExceptionThrown(isExceptionThrown);
        ExecutorDataMap.setDataMap(dataMap);
        List<AbstractExecutionEvent> events = new LinkedList<>();
        for (List<Object> each : baseStatementUnit.getSqlExecutionUnit().getSqlUnit().getParameterSets()) {
            events.add(getExecutionEvent(sqlType, baseStatementUnit, each));
        }
        for (AbstractExecutionEvent event : events) {
            EventBusInstance.getInstance().post(event);
        }
        try {
            result = executeCallback.execute(baseStatementUnit);
        } catch (final SQLException ex) {
            for (AbstractExecutionEvent each : events) {
                each.setEventExecutionType(EventExecutionType.EXECUTE_FAILURE);
                each.setException(ex);
                EventBusInstance.getInstance().post(each);
                ExecutorExceptionHandler.handleException(ex);
            }
            return null;
        }
        for (AbstractExecutionEvent each : events) {
            each.setEventExecutionType(EventExecutionType.EXECUTE_SUCCESS);
            EventBusInstance.getInstance().post(each);
        }
        return result;
    }
}
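One detail in asyncExecute and executeInternal is worth a small sketch: isExceptionThrown and dataMap are read on the submitting thread and set again inside the worker. That pattern is what you would expect if those values are bound to the current thread, so a pool thread would not see them otherwise. The sketch below shows the same capture-and-restore idea with a plain `ThreadLocal`. It assumes that this is the reason for the hand-off, and all names in it are hypothetical.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical illustration only; not sharding-sphere code.
final class ContextPropagationSketch {

    // Per-thread flag; every thread starts with the default value true.
    private static final ThreadLocal<Boolean> THROW_FLAG = ThreadLocal.withInitial(() -> Boolean.TRUE);

    static void setFlag(boolean value) {
        THROW_FLAG.set(value);
    }

    // Captures the caller's flag, re-applies it on the worker thread, and returns what the worker sees.
    static boolean flagSeenByWorker(ExecutorService pool) {
        final boolean captured = THROW_FLAG.get();   // read on the submitting thread
        try {
            return pool.submit(() -> {
                THROW_FLAG.set(captured);            // restore on the worker thread
                try {
                    return THROW_FLAG.get();
                } finally {
                    THROW_FLAG.remove();             // do not leak state into pooled threads
                }
            }).get();
        } catch (Exception ex) {
            throw new IllegalStateException(ex);
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            setFlag(false);
            System.out.println(flagSeenByWorker(pool));
        } finally {
            pool.shutdown();
        }
    }
}
```

Without the explicit hand-off, the worker would see the default value rather than the caller's setting.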