1.基礎(chǔ)介紹
- sharing-jdbc是一個(gè)在客戶(hù)端的數(shù)據(jù)源層面實(shí)現(xiàn)分庫(kù)分表的中間件,對(duì)應(yīng)分析源碼首先要找到代碼執(zhí)行的入口笤喳,對(duì)于一個(gè)數(shù)據(jù)庫(kù)操作入口當(dāng)然是Statement的相關(guān)接口,所以我們應(yīng)該拋開(kāi)各種ORM框架,用原始的Statement來(lái)分析sharding-jdbc源碼
- Statement是jdbc中用來(lái)執(zhí)行靜態(tài)sql,并得到返回的接口的抽象接口愉舔,本文主要介紹在sharding-jdbc中來(lái)實(shí)現(xiàn)Statement的實(shí)現(xiàn)類(lèi):ShardingStatement
2.總體概括
以下為官方網(wǎng)站提供的流程圖
image.png
3.看源碼
- 調(diào)用,基礎(chǔ)的調(diào)用方式回顧,此時(shí)的Statement就是ShardingStatement
String sql="insert into t_order (name) VALUES (\"我是4\")";
Connection connection=dataSource.getConnection();
// ShardingStatement
Statement statement=connection.createStatement();
statement.executeUpdate(sql);
- ShardingStatement.executeUpdate;執(zhí)行更新的sql伙菜;重點(diǎn)的方法為sqlRoute,所以后續(xù)會(huì)主要分析這個(gè)方法
@Override
public int executeUpdate(final String sql) throws SQLException {
try {
//這一步是調(diào)用statementExecutor.clear方法轩缤,目的是清空上次執(zhí)行的語(yǔ)句,參數(shù),結(jié)果
clearPrevious();
//這個(gè)方法就實(shí)現(xiàn)了火的,SQL解析壶愤,查詢(xún)優(yōu)化,SQL路由馏鹤,SQL改寫(xiě)任務(wù)
sqlRoute(sql);
//賦值到StatementExecutor(一個(gè)用來(lái)執(zhí)行真正sql的包裝類(lèi))
initStatementExecutor();
//對(duì)多個(gè)庫(kù)進(jìn)行真正的更新操作
return statementExecutor.executeUpdate();
} finally {
refreshTableMetaData();
currentResultSet = null;
}
}
- ShardingStatement.sqlRoute;SQL解析征椒,查詢(xún)優(yōu)化,SQL路由湃累,SQL改寫(xiě)
private void sqlRoute(final String sql) {
//獲取傳遞參數(shù)的上下文對(duì)象
ShardingContext shardingContext = connection.getShardingContext();
//StatementRoutingEngine;用來(lái)封裝執(zhí)行分庫(kù)分表邏輯和主從邏輯勃救,此時(shí)調(diào)用此類(lèi)的route方法
routeResult = new StatementRoutingEngine(shardingContext.getShardingRule(),
shardingContext.getMetaData().getTable(), shardingContext.getDatabaseType(), shardingContext.isShowSQL(), shardingContext.getMetaData().getDataSource()).route(sql);
}
- StatementRoutingEngine.route:里面就是調(diào)用了ShardingRouter.route方法,并將結(jié)果傳給ShardingMasterSlaveRouter.route方法
public SQLRouteResult route(final String logicSQL) {
//解析SQL轉(zhuǎn)化為SQLStatement,表示這個(gè)SQL的對(duì)象類(lèi)
SQLStatement sqlStatement = shardingRouter.parse(logicSQL, false);
//首先調(diào)用ShardingRouter.route治力,得到SQLRouteResult:邏輯SQL經(jīng)過(guò)優(yōu)化蒙秒,路由,改寫(xiě)后的結(jié)果對(duì)象,如下圖
return masterSlaveRouter.route(shardingRouter.route(logicSQL, Collections.emptyList(), sqlStatement));
}
SQLRouteResult對(duì)象內(nèi)容:
image.png
- ParsingSQLRouter.parse:解析邏輯sql宵统,形成SQL執(zhí)行對(duì)象
public SQLStatement parse(final boolean useCache) {
// SQL解析緩存,有則返回
Optional<SQLStatement> cachedSQLStatement = getSQLStatementFromCache(useCache);
if (cachedSQLStatement.isPresent()) {
return cachedSQLStatement.get();
}
//詞法解析器
LexerEngine lexerEngine = LexerEngineFactory.newInstance(dbType, sql);
lexerEngine.nextToken();
//語(yǔ)法解析結(jié)果
SQLStatement result = SQLParserFactory.newInstance(dbType, lexerEngine.getCurrentToken().getType(), shardingRule, lexerEngine, shardingTableMetaData).parse();
//添加緩存
if (useCache) {
ParsingResultCache.getInstance().put(sql, result);
}
return result;
}
- ParsingSQLRouter.route(final String logicSQL, final List<Object> parameters, final SQLStatement sqlStatement):執(zhí)行查詢(xún)優(yōu)化晕讲,SQL路由,SQL改寫(xiě)榜田,當(dāng)時(shí)insert并且沒(méi)有手動(dòng)寫(xiě)入id時(shí)益兄,則此時(shí)會(huì)生成分布式ID
@Override
public SQLRouteResult route(final String logicSQL, final List<Object> parameters, final SQLStatement sqlStatement) {
//判斷是否是insert,如果是則生成分布式主鍵ID
GeneratedKey generatedKey = null;
if (sqlStatement instanceof InsertStatement) {
generatedKey = getGenerateKey(shardingRule, (InsertStatement) sqlStatement, parameters);
}
//初始化返回結(jié)果
SQLRouteResult result = new SQLRouteResult(sqlStatement, generatedKey);
//調(diào)用優(yōu)化引擎優(yōu)化SQL
ShardingConditions shardingConditions = OptimizeEngineFactory.newInstance(shardingRule, sqlStatement, parameters, generatedKey).optimize();
//賦值分布式主鍵
if (null != generatedKey) {
setGeneratedKeys(result, generatedKey);
}
//根據(jù)CRUD調(diào)用不同的路由引擎獲取應(yīng)該操作的物理庫(kù)和物理表,形成路由結(jié)果箭券,后續(xù)會(huì)分析該方法
RoutingResult routingResult = route(sqlStatement, shardingConditions);
//初始化重寫(xiě)引擎
SQLRewriteEngine rewriteEngine = new SQLRewriteEngine(shardingRule, logicSQL, databaseType, sqlStatement, shardingConditions, parameters);
//判斷是否路由到一個(gè)物理庫(kù)中
boolean isSingleRouting = routingResult.isSingleRouting();
//處理在路由到多個(gè)物理庫(kù)時(shí)處理limit語(yǔ)法
if (sqlStatement instanceof SelectStatement && null != ((SelectStatement) sqlStatement).getLimit()) {
processLimit(parameters, (SelectStatement) sqlStatement, isSingleRouting);
}
//按照路由結(jié)果重寫(xiě)SQL,生成物理庫(kù)可執(zhí)行的SQL
SQLBuilder sqlBuilder = rewriteEngine.rewrite(!isSingleRouting);
for (TableUnit each : routingResult.getTableUnits().getTableUnits()) {
result.getRouteUnits().add(new RouteUnit(each.getDataSourceName(), rewriteEngine.generateSQL(each, sqlBuilder, shardingDataSourceMetaData)));
}
//是否打印最終的SQL
if (showSQL) {
SQLLogger.logSQL(logicSQL, sqlStatement, result.getRouteUnits());
}
return result;
}
7.ParsingSQLRouter.route(final SQLStatement sqlStatement, final ShardingConditions shardingConditions):調(diào)用分庫(kù)分表策略生成分庫(kù)分表結(jié)果
private RoutingResult route(final SQLStatement sqlStatement, final ShardingConditions shardingConditions) {
Collection<String> tableNames = sqlStatement.getTables().getTableNames();
RoutingEngine routingEngine;
if (sqlStatement instanceof UseStatement) {
routingEngine = new IgnoreRoutingEngine();
} else if (sqlStatement instanceof DDLStatement || (sqlStatement instanceof DCLStatement && ((DCLStatement) sqlStatement).isGrantForSingleTable())) {
routingEngine = new TableBroadcastRoutingEngine(shardingRule, sqlStatement);
} else if (sqlStatement instanceof ShowDatabasesStatement || sqlStatement instanceof ShowTablesStatement) {
routingEngine = new DatabaseBroadcastRoutingEngine(shardingRule);
} else if (sqlStatement instanceof DCLStatement) {
routingEngine = new InstanceBroadcastRoutingEngine(shardingRule, shardingDataSourceMetaData);
} else if (shardingConditions.isAlwaysFalse()) {
routingEngine = new UnicastRoutingEngine(shardingRule, tableNames);
} else if (sqlStatement instanceof DALStatement) {
routingEngine = new UnicastRoutingEngine(shardingRule, tableNames);
} else if (tableNames.isEmpty() && sqlStatement instanceof SelectStatement) {
routingEngine = new UnicastRoutingEngine(shardingRule, tableNames);
} else if (tableNames.isEmpty()) {
routingEngine = new DatabaseBroadcastRoutingEngine(shardingRule);
// CRUD語(yǔ)句會(huì)進(jìn)入下面其中一個(gè)
} else if (1 == tableNames.size() || shardingRule.isAllBindingTables(tableNames) || shardingRule.isAllInDefaultDataSource(tableNames)) {
routingEngine = new StandardRoutingEngine(shardingRule, tableNames.iterator().next(), shardingConditions);
} else {
// TODO config for cartesian set
routingEngine = new ComplexRoutingEngine(shardingRule, tableNames, shardingConditions);
}
return routingEngine.route();
}
StandardRoutingEngine.route:查看進(jìn)去會(huì)看到净捅,進(jìn)入到如下源碼
private Collection<DataNode> routeByShardingConditions(final TableRule tableRule) {
Collection<DataNode> result = new LinkedList<>();
if (shardingConditions.getShardingConditions().isEmpty()) {
result.addAll(route(tableRule, Collections.<ShardingValue>emptyList(), Collections.<ShardingValue>emptyList()));
} else {
//獲取配置的分庫(kù)策略類(lèi),里面會(huì)調(diào)用我們的比如根據(jù)Id做hash的分庫(kù)算法等等
ShardingStrategy dataBaseShardingStrategy = shardingRule.getDatabaseShardingStrategy(tableRule);
ShardingStrategy tableShardingStrategy = shardingRule.getTableShardingStrategy(tableRule);
for (ShardingCondition each : shardingConditions.getShardingConditions()) {
List<ShardingValue> databaseShardingValues = isGettingShardingValuesFromHint(dataBaseShardingStrategy)
? getDatabaseShardingValuesFromHint() : getShardingValues(dataBaseShardingStrategy.getShardingColumns(), each);
List<ShardingValue> tableShardingValues = isGettingShardingValuesFromHint(tableShardingStrategy)
? getTableShardingValuesFromHint() : getShardingValues(tableShardingStrategy.getShardingColumns(), each);
Collection<DataNode> dataNodes = route(tableRule, databaseShardingValues, tableShardingValues);
reviseShardingConditions(each, dataNodes);
result.addAll(dataNodes);
}
}
return result;
}
4.總結(jié)
此文是一個(gè)簡(jiǎn)單的流程分析辩块,希望幫助大家蛔六,對(duì)整個(gè)流程有個(gè)認(rèn)知,為后續(xù)關(guān)鍵節(jié)點(diǎn)的實(shí)現(xiàn)有個(gè)整體的概覽