系統層級
Sharding-JDBC本質是JDBC的增強持寄,使服務能夠實現數據的分布式存儲效果稍味∧B可查看如何理解ShardingSphere油宜?。深入ShardingSphere之前需要了解其定義的相關基本概念如分片社牲、邏輯表歇盼、物理表、廣播表搞莺、分片算法分類等掂咒,具體可以查看官網绍刮。如下圖所示Sharding-JDBC整體還是屬于數據訪問層的孩革,在數據訪問層中處于ORM框架之下和ORM是完全解耦的膝蜈,所以他是可以完全兼容各種類型的ORM框架饱搏。Sharding-JDBC對jdbc-connector進行了封裝,對其核心的四大對象重新進行了實現备绽,在實現中加入了相關的內核邏輯疯坤,包括:SQL解析压怠、SQL路由菌瘫、 SQL改寫、SQL執(zhí)行雇盖、SQL歸并等核心邏輯崔挖。本文所有分析和文檔基于版本4.1.1
<dependency>
<groupId>org.apache.shardingsphere</groupId>
<artifactId>sharding-jdbc-spring-boot-starter</artifactId>
<version>4.1.1</version>
</dependency>
調用過程
以ShardingStatement
執(zhí)行一次查詢的過程為例分析具體的調用過程。
執(zhí)行的入口都是在ShardingStatement
中脓鹃,StatementExecutor
封裝了SQL解析、重寫的核心過程古沥。MergeEngine
負責統籌結果的合并瘸右,最后返回合并結果在ShardingStatement
中封裝成ShardingResultSet
返回。
JDBC增強
-
java.sql.Wrapper
提供了判斷當前類是否是目標包裝類和反包裝為目標類對象的API岩齿,目的是為了方便調用太颤,將工具類提供的功能放到對象維度去使用,跟設計模式中的包裝器模式沒什么關系纯衍。 -
org.apache.shardingsphere.shardingjdbc.jdbc.adapter.WrapperAdapter
實現了java.sql.Wrapper
的兩個工具方法栋齿,同時增加了兩個記錄調用方法和回放調用方法的API和容器來存儲回放方法列表襟诸。該類重點是增加支持記錄方法和調用方法瓦堵,而非適配。
為什么需要調用方法的記錄和回放歌亲?
針對JDBC四大對象Sharding-JDBC是重新做了封裝菇用,而對實際的四大對象的一些方法調用往往發(fā)生在SQL路由操作完成之后,所以需要提前記錄之后回放陷揪。
- 所有
AbstractUnsupportedXXX
對象代表了對不支持的操作的默認實現(拋異常SQLFeatureNotSupportedException
)惋鸥。
配置
sharding-JDBC的自動化配置類是:
org.apache.shardingsphere.shardingjdbc.spring.boot.SpringBootConfiguration
主要是針對配置參數和不同的場景的數據源進行了配置。DataSource在應用的實例只能存在一份悍缠,不同的場景通過注解@Conditional
的配置判斷不同的配置參數卦绣。
啟動類注解@AutoConfigureBefore(DataSourceAutoConfiguration.class)
表明它啟動在Spring管理的數據源自動化配置DataSourceAutoConfiguration
之前能很好兼容歷史數據源以及配置。
DataSource
抽象數據源適配器對象:
org.apache.shardingsphere.shardingjdbc.jdbc.adapter.AbstractDataSourceAdapter
做了兩點適配:
- AutoCloseable的實現飞蚓,在close方法中關閉資源滤港。
- 增加了
org.apache.shardingsphere.shardingjdbc.jdbc.core.context.RuntimeContext
實現類Getter方法,用于支撐相關的JDBC-Sharding操作趴拧。
針對不同場景的數據源都作出了對應的實現溅漾,不同的實現關鍵區(qū)別點主要是在RuntimeContext
的實現不同山叮、不同Connection對象的實現和每個實現靜態(tài)代碼中初始化注冊的裝飾對象的不同。例如:
public class ShardingDataSource extends AbstractDataSourceAdapter {
private final ShardingRuntimeContext runtimeContext;
static {
NewInstanceServiceLoader.register(RouteDecorator.class);
NewInstanceServiceLoader.register(SQLRewriteContextDecorator.class);
NewInstanceServiceLoader.register(ResultProcessEngine.class);
}
//...ignore...
}
public class MasterSlaveDataSource extends AbstractDataSourceAdapter {
private final MasterSlaveRuntimeContext runtimeContext;
static {
NewInstanceServiceLoader.register(RouteDecorator.class);
}
//...ignore...
}
...
這種在靜態(tài)代碼塊中的通過注冊不同的Java SPI實現添履,可以完成對不同的場景的特殊處理屁倔。可以把這種處理邏輯理解成 SPI裝飾層:針對不同業(yè)務場景(主從暮胧、加密锐借、正常分片)的核心邏輯(SQL路由、SQL重新叔壤,結果集歸并)進行的獨立裝飾處理的一層瞎饲,實現方式是Java SPI。
Connection
org.apache.shardingsphere.shardingjdbc.jdbc.adapter.AbstractConnectionAdapter
:增加了獲取實際數據庫連接對象的方法炼绘,同時將連接對象的創(chuàng)建留給子類實現。獲取連接列表時針對不同鏈接模式MEMORY_STRICTLY, CONNECTION_STRICTLY
會有不同操作妄田。內存限制模式時鏈接沒有限制可以并發(fā)的請求數據然后在內存中做歸并俺亮,如果一個鏈接一個鏈接的獲取可能存在饑餓等待導致死鎖所以需要加鎖并一次性獲取所有連接適合OLAP業(yè)務。連接限制模式將結果集裝載在內存之后直接釋放資源不需要加鎖疟呐,保證了資源的使用率適合OLTP業(yè)務脚曾。關于鏈接模式的判斷邏輯為每個連接執(zhí)行SQL的數量,執(zhí)行1個為內存限制模式启具,執(zhí)行1個以上為連接限制模式本讥,代碼邏輯如下:
org.apache.shardingsphere.sharding.execute.sql.prepare.SQLExecutePrepareTemplate#getSQLExecuteGroups
private List<InputGroup<StatementExecuteUnit>> getSQLExecuteGroups(final String dataSourceName,
final List<SQLUnit> sqlUnits, final SQLExecutePrepareCallback callback) throws SQLException {
List<InputGroup<StatementExecuteUnit>> result = new LinkedList<>();
int desiredPartitionSize = Math.max(0 == sqlUnits.size() % maxConnectionsSizePerQuery ? sqlUnits.size() / maxConnectionsSizePerQuery : sqlUnits.size() / maxConnectionsSizePerQuery + 1, 1);
List<List<SQLUnit>> sqlUnitPartitions = Lists.partition(sqlUnits, desiredPartitionSize);
ConnectionMode connectionMode = maxConnectionsSizePerQuery < sqlUnits.size() ? ConnectionMode.CONNECTION_STRICTLY : ConnectionMode.MEMORY_STRICTLY;
List<Connection> connections = callback.getConnections(connectionMode, dataSourceName, sqlUnitPartitions.size());
int count = 0;
for (List<SQLUnit> each : sqlUnitPartitions) {
result.add(getSQLExecuteGroup(connectionMode, connections.get(count++), dataSourceName, each, callback));
}
return result;
}
org.apache.shardingsphere.underlying.common.hook.SPIRootInvokeHook
:通過SPI實現在鏈接創(chuàng)建和關閉處埋點用于可能的邏輯擴展。
Statement
org.apache.shardingsphere.shardingjdbc.jdbc.adapter.AbstractStatementAdapter
增加了如下兩個適配方法鲁冯,其直接子類均是普通Statement對象拷沸。
protected abstract boolean isAccumulate();//用于判斷是否返回累加結果作為更新影響數。
protected abstract Collection<? extends Statement> getRoutedStatements();//獲取路由后的語句對象列表薯演。
預定制的Statement均繼承了org.apache.shardingsphere.shardingjdbc.jdbc.adapter.AbstractShardingPreparedStatementAdapter
:實現了PreparedStatement
相關的參數設置外撞芍,單獨維護了參數設置方法調用的列表并提供了setter和回放調用,主要用于實際SQL執(zhí)行之前跨扮、邏輯SQL路由之后SQL參數的設置序无。sharding-jdbc中相關Statement
實現類是相關內核邏輯執(zhí)行的入口。
ResultSet
org.apache.shardingsphere.shardingjdbc.jdbc.adapter.AbstractResultSetAdapter
:僅僅做了ResultSet
的一些抽象實現未做其他適配衡创。ResultSet
的相關實現類針對不同場景結果集做了不同的封裝和實現帝嗡。
內核邏輯
SQL解析
SQL解析是Sharding-JDBC進行路由的開始,主要分為幾個步驟:
- 通過SQL解析器將SQL解析成
AST(抽象語法樹)
- 通過抽取器將語法樹抽取成SQL片段
- 通過填充器將片段拼接成解析結果
- 通過優(yōu)化器輸出最后的結果
相關概念可參考官網
代碼層面解析邏輯由org.apache.shardingsphere.sql.parser.SQLParserEngine#parse
作為入口璃氢,本方法中加入的SQL解析的SPI埋點:org.apache.shardingsphere.sql.parser.hook.SPIParsingHook
org.apache.shardingsphere.sql.parser.SQLParserEngine#parse0
調用對象進行解析并加入了緩存的邏輯哟玷。
public final class SQLParserEngine {
private final String databaseTypeName;
private final SQLParseResultCache cache = new SQLParseResultCache();
/** * Parse SQL. * * @param sql SQL * @param useCache use cache or not * @return SQL statement */
public SQLStatement parse(final String sql, final boolean useCache) {
ParsingHook parsingHook = new SPIParsingHook();
parsingHook.start(sql);
try {
SQLStatement result = parse0(sql, useCache);
parsingHook.finishSuccess(result);
return result;
// CHECKSTYLE:OFF
} catch (final Exception ex) {
// CHECKSTYLE:ON
parsingHook.finishFailure(ex);
throw ex;
}
}
private SQLStatement parse0(final String sql, final boolean useCache) {
if (useCache) {
Optional<SQLStatement> cachedSQLStatement = cache.getSQLStatement(sql);
if (cachedSQLStatement.isPresent()) {
return cachedSQLStatement.get();
}
}
ParseTree parseTree = new SQLParserExecutor(databaseTypeName, sql).execute().getRootNode();
SQLStatement result = (SQLStatement) ParseTreeVisitorFactory.newInstance(databaseTypeName, VisitorRule.valueOf(parseTree.getClass())).visit(parseTree);
if (useCache) {
cache.put(sql, result);
}
return result;
}
}
SQL路由
根據不同的場景對SQL的路由也分為不同的方式,參考下圖:
SQL路由
具體參考官網文檔
SQL的解析和路由發(fā)生在Statement
對應SQL執(zhí)行方法exeXXXX
的準備階段拔莱。
- 準備階段的邏輯繼續(xù)分層和下沉首先會到準備引擎
BasePrepareEngine(SimpleQueryPrepareEngine/PreparedQueryPrepareEngine)
碗降,兩個實現類的區(qū)別在于是否使用SQL解析的緩存(PreparedQueryPrepareEngine用緩存隘竭,推薦)。同時在這層加載SPI裝飾層對象讼渊。 - 接著進入下層
org.apache.shardingsphere.underlying.route.DataNodeRouter
动看,這層主要是增加了SPI埋點org.apache.shardingsphere.underlying.route.hook.SPIRoutingHook
:分為開始,成功爪幻,失敗三階段菱皆。接著傳遞到下層。 - 本層解析引擎
org.apache.shardingsphere.sql.parser.SQLParserEngine
負責解析挨稿,并將解析的SQLStatement
包裝在上下文SQLStatementContext
中作為整體構成RouteContext
返回仇轻。 - 此時上下文中已經持有SQL的解析結果的上下文傳遞到SPI路由裝飾層進行實際的路由。本層中會更加Statement的不同獲取不同策略的路由引擎(
org.apache.shardingsphere.sharding.route.engine.type.ShardingRouteEngine
)實現類進行路由奶甘,然后得到路由結果篷店,至此準備階段結束。
SQL改寫
SQL改寫主要做什么臭家?直接查看官方說明疲陕。改表名索引之類的標識符、補列钉赁、分頁修正蹄殃、優(yōu)化的范疇。
同樣是在準備階段你踩,SQL路由完成返回了路由上下文(RouteContext
)之后org.apache.shardingsphere.underlying.pluggble.prepare.BasePrepareEngine#executeRewrite
方法中:
private Collection<ExecutionUnit> executeRewrite(final String sql, final List<Object> parameters, final RouteContext routeContext) {
registerRewriteDecorator();
SQLRewriteContext sqlRewriteContext = rewriter.createSQLRewriteContext(sql, parameters, routeContext.getSqlStatementContext(), routeContext);
return routeContext.getRouteResult().getRouteUnits().isEmpty() ? rewrite(sqlRewriteContext) : rewrite(routeContext, sqlRewriteContext);
}
- SPI裝飾層對象加載
org.apache.shardingsphere.underlying.rewrite.context.SQLRewriteContextDecorator
- 進入
org.apache.shardingsphere.underlying.rewrite.SQLRewriteEntry#createSQLRewriteContext
完成上下文的創(chuàng)建并執(zhí)行裝飾層邏輯诅岩。 - 接著調用
org.apache.shardingsphere.underlying.rewrite.engine.SQLRouteRewriteEngine
重寫引擎執(zhí)行重寫邏輯。
SQL執(zhí)行
SQL在解析带膜、路由吩谦,初始化后進入了執(zhí)行環(huán)節(jié)。
- 首先由執(zhí)行器
org.apache.shardingsphere.shardingjdbc.executor.AbstractStatementExecutor#exeXXX
進入執(zhí)行钱慢,維護關鍵的執(zhí)行邏輯并以匿名內部類的方式將邏輯下傳逮京,如下例:org.apache.shardingsphere.shardingjdbc.executor.StatementExecutor#executeQuery
public List<QueryResult> executeQuery() throws SQLException {
final boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();
SQLExecuteCallback<QueryResult> executeCallback = new SQLExecuteCallback<QueryResult>(getDatabaseType(), isExceptionThrown) {
@Override
protected QueryResult executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode) throws SQLException {
return getQueryResult(sql, statement, connectionMode);
}
};
return executeCallback(executeCallback);
}
private QueryResult getQueryResult(final String sql, final Statement statement, final ConnectionMode connectionMode) throws SQLException {
ResultSet resultSet = statement.executeQuery(sql);
getResultSets().add(resultSet);
return ConnectionMode.MEMORY_STRICTLY == connectionMode ? new StreamQueryResult(resultSet) : new MemoryQueryResult(resultSet);
}
- 然后進入
org.apache.shardingsphere.sharding.execute.sql.execute.SQLExecuteTemplate
執(zhí)行模板,由模板調度執(zhí)行引擎:org.apache.shardingsphere.underlying.executor.engine.ExecutorEngine
- 執(zhí)行引擎
org.apache.shardingsphere.underlying.executor.engine.ExecutorEngine
負責發(fā)起執(zhí)行束莫,然后同步或者異步執(zhí)行懒棉。 - 執(zhí)行的邏輯單元封裝咋對象
org.apache.shardingsphere.sharding.execute.sql.execute.SQLExecuteCallback
中,如下的execute0方法览绿,封裝了異常處理策严、SQL執(zhí)行SPI埋點``、SQL執(zhí)行饿敲。
private T execute0(final StatementExecuteUnit statementExecuteUnit, final boolean isTrunkThread, final Map<String, Object> dataMap) throws SQLException {
ExecutorExceptionHandler.setExceptionThrown(isExceptionThrown);
DataSourceMetaData dataSourceMetaData = getDataSourceMetaData(statementExecuteUnit.getStatement().getConnection().getMetaData());
SQLExecutionHook sqlExecutionHook = new SPISQLExecutionHook();
try {
ExecutionUnit executionUnit = statementExecuteUnit.getExecutionUnit();
sqlExecutionHook.start(executionUnit.getDataSourceName(), executionUnit.getSqlUnit().getSql(), executionUnit.getSqlUnit().getParameters(), dataSourceMetaData, isTrunkThread, dataMap);
T result = executeSQL(executionUnit.getSqlUnit().getSql(), statementExecuteUnit.getStatement(), statementExecuteUnit.getConnectionMode());
sqlExecutionHook.finishSuccess();
return result;
} catch (final SQLException ex) {
sqlExecutionHook.finishFailure(ex);
ExecutorExceptionHandler.handleException(ex);
return null;
}
}
SQL歸并
SQL的歸并主要針對于多節(jié)點返回到的數據進行處理的過程妻导,相關的介紹參考官網
- SQL的歸并依賴于當前的連接模式
ConnectionMode
,參考上文Connection部分,連接模式決定了SQL執(zhí)行的返回結果org.apache.shardingsphere.sharding.execute.sql.execute.result.StreamQueryResult
ororg.apache.shardingsphere.sharding.execute.sql.execute.result.MemoryQueryResult
- 進入歸并引擎
org.apache.shardingsphere.underlying.pluggble.merge.MergeEngine#merge
進行歸并操作倔韭。MergeEngine
會加載SPI裝飾層的處理引擎org.apache.shardingsphere.underlying.merge.engine.ResultProcessEngine
并注冊到org.apache.shardingsphere.underlying.merge.MergeEntry
中术浪。 - 然后進入
org.apache.shardingsphere.underlying.merge.MergeEntry#process
進行處理。