執(zhí)行引擎的職責定位是將改寫后的SQL發(fā)送到對應(yīng)數(shù)據(jù)庫(經(jīng)路由計算所得)執(zhí)行的過程。執(zhí)行引擎采用了callback回調(diào)的設(shè)計模式僻焚,對給定的輸入分組集合執(zhí)行指定的callback函數(shù)踱卵。
與Spring的JDBCTemplate今豆、TransactionTemplate類似灯蝴,ShardingSphere中的SQLExecuteTemplate撇簿、ExecutorEngine也是如此設(shè)計聂渊,引擎使用者提供CallBack實現(xiàn)類推汽,使用該模式是因為在SQL執(zhí)行時,需要支持更多類型的SQL歧沪,不同的SQL如DQL歹撒、DML、DDL诊胞、不帶參數(shù)的SQL暖夭、參數(shù)化SQL等,不同的SQL操作邏輯并不一樣撵孤,但執(zhí)行引擎需要提供一個通用的執(zhí)行策略迈着。
代碼執(zhí)行分析
繼續(xù)回到起點,在ShardingPreparedStatement類中
@Override
public ResultSet executeQuery() throws SQLException {
ResultSet result;
try {
…
initPreparedStatementExecutor();//PreparedStatement執(zhí)行器初始化
MergedResult mergedResult = mergeQuery(preparedStatementExecutor.executeQuery());
…
}
private void initPreparedStatementExecutor() throws SQLException {
preparedStatementExecutor.init(executionContext);
setParametersForStatements();// 設(shè)置Statement參數(shù)
replayMethodForStatements();// satement設(shè)置方法調(diào)用
}
private void setParametersForStatements() {
for (int i = 0; i < preparedStatementExecutor.getStatements().size(); i++) {
replaySetParameter((PreparedStatement) preparedStatementExecutor.getStatements().get(i), preparedStatementExecutor.getParameterSets().get(i));
}
}
private void replayMethodForStatements() {
for (Statement each : preparedStatementExecutor.getStatements()) {
replayMethodsInvocation(each);
}
}
可以看到進行了preparedStatementExecutor的初始化邪码、Statement參數(shù)設(shè)置裕菠、方法回放等操作。進入PreparedStatementExecutor類中
org.apache.shardingsphere.shardingjdbc.executor.PreparedStatementExecutor
/**
* Prepared statement executor.
*/
public final class PreparedStatementExecutor extends AbstractStatementExecutor {
@Getter
private final boolean returnGeneratedKeys;
public PreparedStatementExecutor(
final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability, final boolean returnGeneratedKeys, final ShardingConnection shardingConnection) {
super(resultSetType, resultSetConcurrency, resultSetHoldability, shardingConnection);
this.returnGeneratedKeys = returnGeneratedKeys;
}
/**
* Initialize executor.
*
* @param executionContext execution context
* @throws SQLException SQL exception
*/
public void init(final ExecutionContext executionContext) throws SQLException {
setSqlStatementContext(executionContext.getSqlStatementContext());
getInputGroups().addAll(obtainExecuteGroups(executionContext.getExecutionUnits()));// 生成執(zhí)行分組
cacheStatements();
}
private Collection<InputGroup<StatementExecuteUnit>> obtainExecuteGroups(final Collection<ExecutionUnit> executionUnits) throws SQLException {
return getSqlExecutePrepareTemplate().getExecuteUnitGroups(executionUnits, new SQLExecutePrepareCallback() {
@Override
// 在指定數(shù)據(jù)源上創(chuàng)建要求數(shù)量的數(shù)據(jù)庫連接
public List<Connection> getConnections(final ConnectionMode connectionMode, final String dataSourceName, final int connectionSize) throws SQLException {
return PreparedStatementExecutor.super.getConnection().getConnections(connectionMode, dataSourceName, connectionSize);
}
@Override
//根據(jù)執(zhí)行單元信息 創(chuàng)建Statement執(zhí)行單元對象
public StatementExecuteUnit createStatementExecuteUnit(final Connection connection, final ExecutionUnit executionUnit, final ConnectionMode connectionMode) throws SQLException {
return new StatementExecuteUnit(executionUnit, createPreparedStatement(connection, executionUnit.getSqlUnit().getSql()), connectionMode);
}
});
}
@SuppressWarnings("MagicConstant")
private PreparedStatement createPreparedStatement(final Connection connection, final String sql) throws SQLException {
return returnGeneratedKeys ? connection.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS)
: connection.prepareStatement(sql, getResultSetType(), getResultSetConcurrency(), getResultSetHoldability());
}
/**
* Execute query.
*
* @return result set list
* @throws SQLException SQL exception
*/
public List<QueryResult> executeQuery() throws SQLException {
final boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();
SQLExecuteCallback<QueryResult> executeCallback = new SQLExecuteCallback<QueryResult>(getDatabaseType(), isExceptionThrown) {
@Override
// 在指定的Statement上執(zhí)行SQL闭专,將JDBC結(jié)果集包裝成查詢QueryResult對象(基于流模式奴潘、基于內(nèi)存模式兩類)
protected QueryResult executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode) throws SQLException {
return getQueryResult(statement, connectionMode);
}
};
return executeCallback(executeCallback);// 通過executeCallback操作
}
// 執(zhí)行SQL,然后將結(jié)果集轉(zhuǎn)成QueryResult對象
private QueryResult getQueryResult(final Statement statement, final ConnectionMode connectionMode) throws SQLException {
PreparedStatement preparedStatement = (PreparedStatement) statement;
ResultSet resultSet = preparedStatement.executeQuery();
getResultSets().add(resultSet);
return ConnectionMode.MEMORY_STRICTLY == connectionMode ? new StreamQueryResult(resultSet) : new MemoryQueryResult(resultSet);
}
…
}
首先看init方法中調(diào)用obtainExecuteGroups方法影钉,obtainExecuteGroups方法中又調(diào)用SQLExecutePrepareTemplate.的getExecuteUnitGroups方法画髓,將輸入的ExecutionUnit集合和SQLExecutePrepareCallback生成InputGroup<StatementExecuteUnit>集合。
進入SQLExecutePrepareTemplate類看看getExecuteUnitGroups方法:
org.apache.shardingsphere.sharding.execute.sql.prepare.SQLExecutePrepareTemplate
/**
* SQL execute prepare template.
*/
@RequiredArgsConstructor
public final class SQLExecutePrepareTemplate {
private final int maxConnectionsSizePerQuery;
/**
* Get execute unit groups.
*
* @param executionUnits execution units
* @param callback SQL execute prepare callback
* @return statement execute unit groups
* @throws SQLException SQL exception
*/
public Collection<InputGroup<StatementExecuteUnit>> getExecuteUnitGroups(final Collection<ExecutionUnit> executionUnits, final SQLExecutePrepareCallback callback) throws SQLException {
return getSynchronizedExecuteUnitGroups(executionUnits, callback);
}
// 生成同步執(zhí)行單元分組
private Collection<InputGroup<StatementExecuteUnit>> getSynchronizedExecuteUnitGroups(
final Collection<ExecutionUnit> executionUnits, final SQLExecutePrepareCallback callback) throws SQLException {
Map<String, List<SQLUnit>> sqlUnitGroups = getSQLUnitGroups(executionUnits);// 生成數(shù)據(jù)源與其SQLUnit的對應(yīng)映射
Collection<InputGroup<StatementExecuteUnit>> result = new LinkedList<>();
for (Entry<String, List<SQLUnit>> entry : sqlUnitGroups.entrySet()) {
result.addAll(getSQLExecuteGroups(entry.getKey(), entry.getValue(), callback));// 將SQLUnit轉(zhuǎn)化為InputGroup<StatementExecuteUnit>平委,對應(yīng)關(guān)系為1:1
}
return result;
}
// 根據(jù)執(zhí)行單元ExecutionUnit奈虾,生成各數(shù)據(jù)源對應(yīng)的SQLUnit集合
private Map<String, List<SQLUnit>> getSQLUnitGroups(final Collection<ExecutionUnit> executionUnits) {
Map<String, List<SQLUnit>> result = new LinkedHashMap<>(executionUnits.size(), 1);
for (ExecutionUnit each : executionUnits) {
if (!result.containsKey(each.getDataSourceName())) {
result.put(each.getDataSourceName(), new LinkedList<>());
}
result.get(each.getDataSourceName()).add(each.getSqlUnit());
}
return result;
}
// 生成SQL執(zhí)行分組
private List<InputGroup<StatementExecuteUnit>> getSQLExecuteGroups(final String dataSourceName,
final List<SQLUnit> sqlUnits, final SQLExecutePrepareCallback callback) throws SQLException {
List<InputGroup<StatementExecuteUnit>> result = new LinkedList<>();
int desiredPartitionSize = Math.max(0 == sqlUnits.size() % maxConnectionsSizePerQuery ? sqlUnits.size() / maxConnectionsSizePerQuery : sqlUnits.size() / maxConnectionsSizePerQuery + 1, 1);
List<List<SQLUnit>> sqlUnitPartitions = Lists.partition(sqlUnits, desiredPartitionSize);
ConnectionMode connectionMode = maxConnectionsSizePerQuery < sqlUnits.size() ? ConnectionMode.CONNECTION_STRICTLY : ConnectionMode.MEMORY_STRICTLY;
List<Connection> connections = callback.getConnections(connectionMode, dataSourceName, sqlUnitPartitions.size()); // 根據(jù)要執(zhí)行的SQL數(shù)量和maxConnectionsSizePerQuery配置,計算
int count = 0;
for (List<SQLUnit> each : sqlUnitPartitions) {
result.add(getSQLExecuteGroup(connectionMode, connections.get(count++), dataSourceName, each, callback));// 根據(jù)要執(zhí)行的SQLUnit廉赔,生成對應(yīng)StatementExecuteUnit對象肉微,添加到返回結(jié)果集中
}
return result;
}
private InputGroup<StatementExecuteUnit> getSQLExecuteGroup(final ConnectionMode connectionMode, final Connection connection,
final String dataSourceName, final List<SQLUnit> sqlUnitGroup, final SQLExecutePrepareCallback callback) throws SQLException {
List<StatementExecuteUnit> result = new LinkedList<>();
for (SQLUnit each : sqlUnitGroup) {
result.add(callback.createStatementExecuteUnit(connection, new ExecutionUnit(dataSourceName, each), connectionMode));
}
return new InputGroup<>(result);
}
可以看到,SQLExecutePrepareTemplate類就是將ExecutionUnit集合進行分組轉(zhuǎn)化為InputGroup<StatementExecuteUnit>集合蜡塌。其核心邏輯是根據(jù)maxConnectionsSizePerQuery值(每個SQL最多可以配置多少數(shù)據(jù)庫連接供使用)碉纳,計算出當前SQL需要多少個數(shù)據(jù)庫連接
/**
* Max opened connection size for each query.
*/
MAX_CONNECTIONS_SIZE_PER_QUERY("max.connections.size.per.query", String.valueOf(1), int.class),
org.apache.shardingsphere.sharding.execute.sql.execute.SQLExecuteTemplate
public final class SQLExecuteTemplate {
private final ExecutorEngine executorEngine;
private final boolean serial;
/**
* Execute.
*
* @param inputGroups input groups
* @param callback SQL execute callback
* @param <T> class type of return value
* @return execute result
* @throws SQLException SQL exception
*/
public <T> List<T> execute(final Collection<InputGroup<? extends StatementExecuteUnit>> inputGroups, final SQLExecuteCallback<T> callback) throws SQLException {
return execute(inputGroups, null, callback);
}
/**
* Execute.
*
* @param inputGroups input groups
* @param firstCallback first SQL execute callback
* @param callback SQL execute callback
* @param <T> class type of return value
* @return execute result
* @throws SQLException SQL exception
*/
@SuppressWarnings("unchecked")
public <T> List<T> execute(final Collection<InputGroup<? extends StatementExecuteUnit>> inputGroups,
final SQLExecuteCallback<T> firstCallback, final SQLExecuteCallback<T> callback) throws SQLException {
try {
return executorEngine.execute((Collection) inputGroups, firstCallback, callback, serial);
} catch (final SQLException ex) {
ExecutorExceptionHandler.handleException(ex);
return Collections.emptyList();
}
}
}
可以看到其內(nèi)部操作又是通過ExecutorEngine類完成,進入該類看看
org.apache.shardingsphere.underlying.executor.engine.ExecutorEngine
/**
* Executor engine.
*/
public final class ExecutorEngine implements AutoCloseable {
private final ShardingSphereExecutorService executorService;
public ExecutorEngine(final int executorSize) {
executorService = new ShardingSphereExecutorService(executorSize);
}
/**
* Execute.
*
* @param inputGroups input groups
* @param callback grouped callback
* @param <I> type of input value
* @param <O> type of return value
* @return execute result
* @throws SQLException throw if execute failure
*/
public <I, O> List<O> execute(final Collection<InputGroup<I>> inputGroups, final GroupedCallback<I, O> callback) throws SQLException {
return execute(inputGroups, null, callback, false);
}
/**
* Execute.
*
* @param inputGroups input groups
* @param firstCallback first grouped callback
* @param callback other grouped callback
* @param serial whether using multi thread execute or not
* @param <I> type of input value
* @param <O> type of return value
* @return execute result
* @throws SQLException throw if execute failure
*/
public <I, O> List<O> execute(final Collection<InputGroup<I>> inputGroups,
final GroupedCallback<I, O> firstCallback, final GroupedCallback<I, O> callback, final boolean serial) throws SQLException {
if (inputGroups.isEmpty()) {
return Collections.emptyList();
}
return serial ? serialExecute(inputGroups, firstCallback, callback) : parallelExecute(inputGroups, firstCallback, callback);
}
// 串行執(zhí)行
private <I, O> List<O> serialExecute(final Collection<InputGroup<I>> inputGroups, final GroupedCallback<I, O> firstCallback, final GroupedCallback<I, O> callback) throws SQLException {
Iterator<InputGroup<I>> inputGroupsIterator = inputGroups.iterator();
InputGroup<I> firstInputs = inputGroupsIterator.next();
List<O> result = new LinkedList<>(syncExecute(firstInputs, null == firstCallback ? callback : firstCallback));
for (InputGroup<I> each : Lists.newArrayList(inputGroupsIterator)) {
result.addAll(syncExecute(each, callback));
}
return result;
}
// 并行執(zhí)行岗照,可以支持兩個回調(diào)函數(shù)村象,第一條記錄執(zhí)行第一個回調(diào)函數(shù),其它的執(zhí)行第二個回調(diào)函數(shù)
private <I, O> List<O> parallelExecute(final Collection<InputGroup<I>> inputGroups, final GroupedCallback<I, O> firstCallback, final GroupedCallback<I, O> callback) throws SQLException {
Iterator<InputGroup<I>> inputGroupsIterator = inputGroups.iterator();
InputGroup<I> firstInputs = inputGroupsIterator.next();
Collection<ListenableFuture<Collection<O>>> restResultFutures = asyncExecute(Lists.newArrayList(inputGroupsIterator), callback);
return getGroupResults(syncExecute(firstInputs, null == firstCallback ? callback : firstCallback), restResultFutures);
}
// 同步執(zhí)行
private <I, O> Collection<O> syncExecute(final InputGroup<I> inputGroup, final GroupedCallback<I, O> callback) throws SQLException {
return callback.execute(inputGroup.getInputs(), true, ExecutorDataMap.getValue());
}
// 異步執(zhí)行
private <I, O> Collection<ListenableFuture<Collection<O>>> asyncExecute(final List<InputGroup<I>> inputGroups, final GroupedCallback<I, O> callback) {
Collection<ListenableFuture<Collection<O>>> result = new LinkedList<>();
for (InputGroup<I> each : inputGroups) {
result.add(asyncExecute(each, callback));
}
return result;
}
private <I, O> ListenableFuture<Collection<O>> asyncExecute(final InputGroup<I> inputGroup, final GroupedCallback<I, O> callback) {
final Map<String, Object> dataMap = ExecutorDataMap.getValue();
return executorService.getExecutorService().submit(() -> callback.execute(inputGroup.getInputs(), false, dataMap));
}
private <O> List<O> getGroupResults(final Collection<O> firstResults, final Collection<ListenableFuture<Collection<O>>> restFutures) throws SQLException {
List<O> result = new LinkedList<>(firstResults);
for (ListenableFuture<Collection<O>> each : restFutures) {
try {
result.addAll(each.get());
} catch (final InterruptedException | ExecutionException ex) {
return throwException(ex);
}
}
return result;
}
ExecutorEngine類中方法主要分為兩個串行執(zhí)行serialExecute與并行執(zhí)行parallelExecute攒至,前者使用的是同步執(zhí)行即當前應(yīng)用線程厚者,后者則通過ShardingSphere內(nèi)置的線程池完成,該線程池類為ShardingSphereExecutorService迫吐。值得注意的是這些執(zhí)行方法中都對應(yīng)的有兩個CallBack參數(shù)库菲,在真正執(zhí)行時會對分組后的第一條記錄執(zhí)行第一個CallBack函數(shù),其它的執(zhí)行第二個CallBack函數(shù)志膀,這么設(shè)計的目的是有些操作只需執(zhí)行一次熙宇,例如獲取元數(shù)據(jù)鳖擒,只需要在第一條記錄操作生成,后續(xù)直接復(fù)用即可烫止。
例如在Sharding-proxy中
org.apache.shardingsphere.shardingproxy.backend.communication.jdbc.execute.JDBCExecuteEngine
public BackendResponse execute(final ExecutionContext executionContext) throws SQLException {
SQLStatementContext sqlStatementContext = executionContext.getSqlStatementContext();
boolean isReturnGeneratedKeys = sqlStatementContext.getSqlStatement() instanceof InsertStatement;
boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();
Collection<InputGroup<StatementExecuteUnit>> inputGroups = sqlExecutePrepareTemplate.getExecuteUnitGroups(
executionContext.getExecutionUnits(), new ProxyJDBCExecutePrepareCallback(backendConnection, jdbcExecutorWrapper, isReturnGeneratedKeys));
Collection<ExecuteResponse> executeResponses = sqlExecuteTemplate.execute((Collection) inputGroups,
new ProxySQLExecuteCallback(sqlStatementContext, backendConnection, jdbcExecutorWrapper, isExceptionThrown, isReturnGeneratedKeys, true),
new ProxySQLExecuteCallback(sqlStatementContext, backendConnection, jdbcExecutorWrapper, isExceptionThrown, isReturnGeneratedKeys, false));
ExecuteResponse executeResponse = executeResponses.iterator().next();
…
}
回頭看下ShardingSphere自定義的線程池
org.apache.shardingsphere.underlying.executor.engine.impl.ShardingSphereExecutorService
/**
* ShardingSphere executor service.
*/
@Getter
public final class ShardingSphereExecutorService {
private static final String DEFAULT_NAME_FORMAT = "%d";
private static final ExecutorService SHUTDOWN_EXECUTOR = Executors.newSingleThreadExecutor(ShardingSphereThreadFactoryBuilder.build("Executor-Engine-Closer"));
private ListeningExecutorService executorService;
public ShardingSphereExecutorService(final int executorSize) {
this(executorSize, DEFAULT_NAME_FORMAT);
}
public ShardingSphereExecutorService(final int executorSize, final String nameFormat) {
executorService = MoreExecutors.listeningDecorator(getExecutorService(executorSize, nameFormat));
MoreExecutors.addDelayedShutdownHook(executorService, 60, TimeUnit.SECONDS);
}
private ExecutorService getExecutorService(final int executorSize, final String nameFormat) {
ThreadFactory threadFactory = ShardingSphereThreadFactoryBuilder.build(nameFormat);
return 0 == executorSize ? Executors.newCachedThreadPool(threadFactory) : Executors.newFixedThreadPool(executorSize, threadFactory);
}
…
}
可以看到ShardingSphereExecutorService 類中用的并不是JDK中原生的蒋荚,而是google guava工具包中的可監(jiān)聽ExecutorService,不過目前ShardingSphere中沒看到使用其listen功能馆蠕,應(yīng)該是為后續(xù)擴展考慮期升。
總結(jié)
相比其它引擎,可以看到執(zhí)行引擎較為簡單互躬,主要包括三部分:1. 是SQLExecutePrepareTemplate播赁,2. 是SQLExecuteTemplate,3. ExecutorEngine吼渡。
SQLExecutePrepareTemplate類負責生成執(zhí)行分組信息容为,輸入為 Collection<ExecutionUnit> ,輸出為Collection<InputGroup<StatementExecuteUnit>>寺酪;SQLExecuteTemplate類負責執(zhí)行具體的SQL操作坎背,輸入為Collection<InputGroup<StatementExecuteUnit>>與SQLExecuteCallback,這個類目前并沒有自身邏輯房维,它就是直接調(diào)用了ExecutorEngine類完成SQL執(zhí)行沼瘫;ExecutorEngine則真正負責完成SQL的串行和并行執(zhí)行。
在5.x中執(zhí)行引擎的類名進行了調(diào)整咙俩,SQLExecuteTemplate修改為org.apache.shardingsphere.infra.executor.sql.resourced.jdbc.executor.SQLExecutor,
ExecutorEngine類修改為org.apache.shardingsphere.infra.executor.kernel.ExecutorKernel,但具體功能實現(xiàn)沒有太大變化
最后畫一個執(zhí)行引擎的流程圖: