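ShardingSphere currently provides two access modes, JDBC and Proxy (MySQL protocol); Sidecar has not been implemented yet. This article covers JDBC access.
JDBC is the interface specification through which Java accesses databases; it defines a standard way of interacting with a database. For example, to query some data from MySQL with SQL, JDBC code might look like this: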
try (Connection conn = DriverManager.getConnection(
"jdbc:mysql://localhost/mydb",
"user",
"password")) {
try (PreparedStatement ps =
conn.prepareStatement("SELECT i.*, j.* FROM Omega i, Zappa j WHERE i.name = ? AND "
+ "j.num = ?")
) {
// In the SQL statement being prepared, each question mark is a placeholder
// that must be replaced with a value you provide through a "set" method invocation.
// The following two method calls replace the two placeholders; the first is
// replaced by a string value, and the second by an integer value.
ps.setString(1, "Poor Yorick");
ps.setInt(2, 8008);
// The ResultSet, rs, conveys the result of executing the SQL statement.
// Each time you call rs.next(), an internal row pointer, or cursor,
// is advanced to the next row of the result. The cursor initially is
// positioned before the first row.
try (ResultSet rs = ps.executeQuery()) {
while (rs.next()) {
int numColumns = rs.getMetaData().getColumnCount();
for (int i = 1; i <= numColumns; i++) {
System.out.println("COLUMN " + i + " = " + rs.getObject(i));
}
}
}
}
}
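For ShardingSphere, the role of the JDBC subproject is clear: it lets users implement features such as database and table sharding, read-write splitting, and encryption/decryption through the standard JDBC API. In terms of design it follows the decorator pattern: once SQL parsing, routing, and rewriting are done, the real underlying JDBC resources (DataSource, Connection, Statement, ResultSet) carry out the actual SQL execution.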
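The decorator idea can be illustrated with a minimal sketch. Everything in it is hypothetical and much simpler than ShardingSphere itself: SqlExecutor stands in for a JDBC resource, RealExecutor for the driver-provided object, and ShardingExecutor for the decorating layer. The "routing" here naively sends the query to every shard and rewrites the table name by appending an index, which is an assumption made purely for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class DecoratorSketch {

    /** Hypothetical stand-in for a JDBC resource such as Statement. */
    interface SqlExecutor {
        List<String> execute(String sql);
    }

    /** Plays the role of the real, driver-provided resource. */
    static final class RealExecutor implements SqlExecutor {
        private final String dataSourceName;

        RealExecutor(String dataSourceName) {
            this.dataSourceName = dataSourceName;
        }

        @Override
        public List<String> execute(String sql) {
            return Collections.singletonList(dataSourceName + ": " + sql);
        }
    }

    /** Decorator: same interface, but rewrites the SQL per target and delegates. */
    static final class ShardingExecutor implements SqlExecutor {
        private final Map<String, SqlExecutor> delegates;

        ShardingExecutor(Map<String, SqlExecutor> delegates) {
            this.delegates = delegates;
        }

        @Override
        public List<String> execute(String sql) {
            List<String> merged = new ArrayList<>();
            int shard = 0;
            for (SqlExecutor each : delegates.values()) {
                // Toy "route + rewrite": every shard gets the query with its own table suffix.
                String actualSql = sql.replace("t_order", "t_order_" + shard++);
                merged.addAll(each.execute(actualSql));
            }
            return merged;
        }
    }

    static List<String> demo() {
        Map<String, SqlExecutor> delegates = new LinkedHashMap<>();
        delegates.put("ds_0", new RealExecutor("ds_0"));
        delegates.put("ds_1", new RealExecutor("ds_1"));
        return new ShardingExecutor(delegates).execute("SELECT * FROM t_order");
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

The caller only ever sees the SqlExecutor interface, which is the point of the pattern: the application keeps writing plain JDBC-style code while the decorating layer decides which real resources do the work.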
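Unlike the engine articles, the JDBC articles are organized by class hierarchy, which makes the overall design easier to understand. The main interfaces in the JDBC specification are DataSource, Connection, Statement, PreparedStatement, ResultSet, DatabaseMetaData, ResultSetMetaData, and ParameterMetaData. This article walks through the source code of ShardingSphere's main implementation classes for these interfaces in that order.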
DataSource
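As the class names suggest, ShardingSphere provides a DataSource implementation for each feature. Let's go through the class diagram from top to bottom.
First comes AbstractUnsupportedOperationDataSource, the base class of all ShardingSphere DataSource implementations. Although it implements the DataSource interface, it only implements getLoginTimeout and setLoginTimeout, and both simply throw SQLFeatureNotSupportedException. The two methods are declared final, so none of the feature-specific DataSource subclasses override them; ShardingSphere explicitly does not support calling these two methods.
JDBC is Java's standard set of interfaces for working with databases, but the implementations are supplied by the database drivers, and vendors do not fully support every method of every interface. ShardingSphere is similar: it provides no implementation for methods that are rarely used or for which it cannot return an accurate value. To manage the unsupported JDBC methods in one place, each JDBC interface implementation is given a common parent, an AbstractUnsupported* class, whose methods simply throw SQLFeatureNotSupportedException.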
org.apache.shardingsphere.shardingjdbc.jdbc.unsupported.AbstractUnsupportedOperationDataSource
/**
* Unsupported {@code Datasource} methods.
*/
public abstract class AbstractUnsupportedOperationDataSource extends WrapperAdapter implements DataSource {
@Override
public final int getLoginTimeout() throws SQLException {
throw new SQLFeatureNotSupportedException("unsupported getLoginTimeout()");
}
@Override
public final void setLoginTimeout(final int seconds) throws SQLException {
throw new SQLFeatureNotSupportedException("unsupported setLoginTimeout(int seconds)");
}
}
As shown, besides implementing the DataSource interface, this class also extends WrapperAdapter:
org.apache.shardingsphere.shardingjdbc.jdbc.adapter.WrapperAdapter
/**
* Adapter for {@code java.sql.Wrapper}.
*/
public abstract class WrapperAdapter implements Wrapper {
private final Collection<JdbcMethodInvocation> jdbcMethodInvocations = new ArrayList<>();
@SuppressWarnings("unchecked")
@Override
public final <T> T unwrap(final Class<T> iface) throws SQLException {
if (isWrapperFor(iface)) {
return (T) this;
}
throw new SQLException(String.format("[%s] cannot be unwrapped as [%s]", getClass().getName(), iface.getName()));
}
@Override
public final boolean isWrapperFor(final Class<?> iface) {
return iface.isInstance(this);
}
/**
* record method invocation.
*
* @param targetClass target class
* @param methodName method name
* @param argumentTypes argument types
* @param arguments arguments
*/
@SneakyThrows
public final void recordMethodInvocation(final Class<?> targetClass, final String methodName, final Class<?>[] argumentTypes, final Object[] arguments) {
jdbcMethodInvocations.add(new JdbcMethodInvocation(targetClass.getMethod(methodName, argumentTypes), arguments));
}
/**
* Replay methods invocation.
*
* @param target target object
*/
public final void replayMethodsInvocation(final Object target) {
for (JdbcMethodInvocation each : jdbcMethodInvocations) {
each.invoke(target);
}
}
}
This class implements the Wrapper interface and holds a collection of JdbcMethodInvocation objects that records certain method calls made on the current JDBC resource. recordMethodInvocation records setter calls made by the application, such as setAutoCommit, setReadOnly, setFetchSize, and setMaxFieldSize; it is invoked when the application calls setAutoCommit or setReadOnly on ShardingConnection, or setFetchSize or setMaxFieldSize on ShardingPreparedStatement. replayMethodsInvocation replays these calls on a given target object; this happens when the real underlying JDBC objects (from the DB driver, a connection pool, and so on) are created.
WrapperAdapter is the root base class of ShardingSphere's JDBC implementation classes, so the DataSource, Connection, Statement, and PreparedStatement implementations all have this replay capability.
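The record-and-replay mechanism can be sketched in isolation with plain reflection. The names below (Settings, RealSettings, Recorder, Invocation) are hypothetical stand-ins and not ShardingSphere classes; the sketch only assumes what the listing above shows, namely that a call is stored as a java.lang.reflect.Method plus its arguments and is later invoked on another target.

```java
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

class ReplaySketch {

    /** Hypothetical stand-in for a JDBC resource with setter-style methods. */
    public interface Settings {
        void setFetchSize(int rows);

        void setReadOnly(boolean readOnly);
    }

    /** Plays the role of a real underlying resource that is created later. */
    public static class RealSettings implements Settings {
        int fetchSize;
        boolean readOnly;

        @Override
        public void setFetchSize(int rows) {
            fetchSize = rows;
        }

        @Override
        public void setReadOnly(boolean readOnly) {
            this.readOnly = readOnly;
        }
    }

    /** One recorded call: the reflective method plus its arguments. */
    static final class Invocation {
        private final Method method;
        private final Object[] arguments;

        Invocation(Method method, Object[] arguments) {
            this.method = method;
            this.arguments = arguments;
        }

        void invoke(Object target) {
            try {
                method.invoke(target, arguments);
            } catch (ReflectiveOperationException ex) {
                throw new IllegalStateException(ex);
            }
        }
    }

    /** Records calls now and replays them on targets that appear later. */
    static final class Recorder {
        private final List<Invocation> invocations = new ArrayList<>();

        void record(Class<?> targetClass, String methodName, Class<?>[] argumentTypes, Object[] arguments) {
            try {
                invocations.add(new Invocation(targetClass.getMethod(methodName, argumentTypes), arguments));
            } catch (NoSuchMethodException ex) {
                throw new IllegalArgumentException(ex);
            }
        }

        void replay(Object target) {
            for (Invocation each : invocations) {
                each.invoke(target);
            }
        }
    }

    public static void main(String[] args) {
        Recorder recorder = new Recorder();
        recorder.record(Settings.class, "setFetchSize", new Class<?>[] {int.class}, new Object[] {100});
        recorder.record(Settings.class, "setReadOnly", new Class<?>[] {boolean.class}, new Object[] {true});
        RealSettings lateTarget = new RealSettings();
        recorder.replay(lateTarget);
        System.out.println(lateTarget.fetchSize + " " + lateTarget.readOnly);
    }
}
```

Recording the call rather than the value keeps the mechanism generic: the same list can carry any setter, and it can be replayed on as many late-created targets as needed.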
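Wrapper is an interface provided by JDBC and also a design pattern; it is used to obtain the original object wrapped by a JDBC proxy class. The JDK source for the interface documents it in full: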
Interface for JDBC classes which provide the ability to retrieve the delegate instance when the instance in question is in fact a proxy class.
The wrapper pattern is employed by many JDBC driver implementations to provide extensions beyond
the traditional JDBC API that are specific to a data source. Developers may wish to gain access to
these resources that are wrapped (the delegates) as proxy class instances representing the
the actual resources. This interface describes a standard mechanism to access
these wrapped resources
represented by their proxy, to permit direct access to the resource delegates.
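From the caller's side, the Wrapper contract is usually used as "check with isWrapperFor, then unwrap". The sketch below uses the same isInstance-based logic as the WrapperAdapter listing above; DemoResource and VendorExtension are hypothetical names introduced only for the example.

```java
import java.sql.SQLException;
import java.sql.Wrapper;

class WrapperSketch {

    /** Hypothetical vendor-specific extension that callers may want to reach. */
    interface VendorExtension {
        String vendorName();
    }

    /** Hypothetical resource that is both a java.sql.Wrapper and a VendorExtension. */
    static final class DemoResource implements Wrapper, VendorExtension {
        @Override
        public String vendorName() {
            return "demo";
        }

        @SuppressWarnings("unchecked")
        @Override
        public <T> T unwrap(Class<T> iface) throws SQLException {
            if (isWrapperFor(iface)) {
                return (T) this;
            }
            throw new SQLException(String.format("[%s] cannot be unwrapped as [%s]", getClass().getName(), iface.getName()));
        }

        @Override
        public boolean isWrapperFor(Class<?> iface) {
            return iface.isInstance(this);
        }
    }

    /** Helper that turns the checked SQLException into a boolean. */
    static boolean canUnwrap(Wrapper wrapper, Class<?> iface) {
        try {
            wrapper.unwrap(iface);
            return true;
        } catch (SQLException ex) {
            return false;
        }
    }

    public static void main(String[] args) throws SQLException {
        Wrapper resource = new DemoResource();
        if (resource.isWrapperFor(VendorExtension.class)) {
            System.out.println(resource.unwrap(VendorExtension.class).vendorName());
        }
        System.out.println(canUnwrap(resource, Runnable.class));
    }
}
```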
Next, look at the AbstractDataSourceAdapter class:
org.apache.shardingsphere.shardingjdbc.jdbc.adapter.AbstractDataSourceAdapter
/**
* Adapter for {@code Datasource}.
*/
@Getter
public abstract class AbstractDataSourceAdapter extends AbstractUnsupportedOperationDataSource implements AutoCloseable {
private final Map<String, DataSource> dataSourceMap;
private final DatabaseType databaseType;
@Setter
private PrintWriter logWriter = new PrintWriter(System.out);
public AbstractDataSourceAdapter(final Map<String, DataSource> dataSourceMap) throws SQLException {
this.dataSourceMap = dataSourceMap;
databaseType = createDatabaseType();
}
…
@Override
public final Connection getConnection(final String username, final String password) throws SQLException {
return getConnection();
}
@Override
public final void close() throws Exception {
close(dataSourceMap.keySet());
}
…
protected abstract RuntimeContext getRuntimeContext();
}
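As shown, this abstract DataSource adapter maintains a map of real DataSources (which may come from the database driver or from a third-party connection pool) together with the database type, and declares an abstract method that returns the corresponding RuntimeContext. RuntimeContext is the context object passed among the JDBC resource objects; it exposes the rule, the properties, the database type, the executor engine, and the SQL parser engine.
org.apache.shardingsphere.shardingjdbc.jdbc.core.context.RuntimeContext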
/**
* Runtime context.
*
* @param <T> type of rule
*/
public interface RuntimeContext<T extends BaseRule> extends AutoCloseable {
/**
* Get rule.
*
* @return rule
*/
T getRule();
/**
* Get properties.
*
* @return properties
*/
ConfigurationProperties getProperties();
/**
* Get database type.
*
* @return database type
*/
DatabaseType getDatabaseType();
/**
* Get execute engine.
*
* @return execute engine
*/
ExecutorEngine getExecutorEngine();
/**
* Get SQL parser engine.
*
* @return SQL parser engine
*/
SQLParserEngine getSqlParserEngine();
}
Each feature has its own RuntimeContext implementation. The class hierarchy is:
First, look at AbstractRuntimeContext:
org.apache.shardingsphere.shardingjdbc.jdbc.core.context.AbstractRuntimeContext
/**
* Abstract runtime context.
*
* @param <T> type of rule
*/
@Getter
public abstract class AbstractRuntimeContext<T extends BaseRule> implements RuntimeContext<T> {
private final T rule;
private final ConfigurationProperties properties;
private final DatabaseType databaseType;
private final ExecutorEngine executorEngine;
private final SQLParserEngine sqlParserEngine;
protected AbstractRuntimeContext(final T rule, final Properties props, final DatabaseType databaseType) {
this.rule = rule;
properties = new ConfigurationProperties(null == props ? new Properties() : props);
this.databaseType = databaseType;
executorEngine = new ExecutorEngine(properties.<Integer>getValue(ConfigurationPropertyKey.EXECUTOR_SIZE));
sqlParserEngine = SQLParserEngineFactory.getSQLParserEngine(DatabaseTypes.getTrunkDatabaseTypeName(databaseType));
ConfigurationLogger.log(rule.getRuleConfiguration());
ConfigurationLogger.log(props);
}
protected abstract ShardingSphereMetaData getMetaData();
…
}
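The constructor assigns and initializes properties such as the executor engine and the parser engine. More importantly, the class declares an abstract method, getMetaData, that returns a ShardingSphereMetaData. The main logic of each feature's RuntimeContext implementation revolves around how to build that ShardingSphereMetaData object. Depending on whether there is a single data source or several, two further abstract classes are defined, SingleDataSourceRuntimeContext and MultipleDataSourcesRuntimeContext. They take the DataSource map passed in by the application and build DataSourceMetas and SchemaMetaData objects, from which the ShardingSphereMetaData instance is constructed. Loading the SchemaMetaData is left to the concrete feature RuntimeContext class, for example the runtime context for sharding: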
/**
* Runtime context for sharding.
*/
@Getter
public final class ShardingRuntimeContext extends MultipleDataSourcesRuntimeContext<ShardingRule> {
private final CachedDatabaseMetaData cachedDatabaseMetaData;
private final ShardingTransactionManagerEngine shardingTransactionManagerEngine;
public ShardingRuntimeContext(final Map<String, DataSource> dataSourceMap, final ShardingRule shardingRule, final Properties props, final DatabaseType databaseType) throws SQLException {
super(dataSourceMap, shardingRule, props, databaseType);
cachedDatabaseMetaData = createCachedDatabaseMetaData(dataSourceMap); // create the cached database metadata
shardingTransactionManagerEngine = new ShardingTransactionManagerEngine(); // create the sharding transaction manager engine
shardingTransactionManagerEngine.init(databaseType, dataSourceMap);
}
private CachedDatabaseMetaData createCachedDatabaseMetaData(final Map<String, DataSource> dataSourceMap) throws SQLException {
try (Connection connection = dataSourceMap.values().iterator().next().getConnection()) {
return new CachedDatabaseMetaData(connection.getMetaData());
}
}
@Override
protected SchemaMetaData loadSchemaMetaData(final Map<String, DataSource> dataSourceMap) throws SQLException {
int maxConnectionsSizePerQuery = getProperties().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
boolean isCheckingMetaData = getProperties().<Boolean>getValue(ConfigurationPropertyKey.CHECK_TABLE_METADATA_ENABLED);
SchemaMetaData result = new ShardingMetaDataLoader(dataSourceMap, getRule(), maxConnectionsSizePerQuery, isCheckingMetaData).load(getDatabaseType());
result = SchemaMetaDataDecorator.decorate(result, getRule(), new ShardingTableMetaDataDecorator());
if (!getRule().getEncryptRule().getEncryptTableNames().isEmpty()) {
result = SchemaMetaDataDecorator.decorate(result, getRule().getEncryptRule(), new EncryptTableMetaDataDecorator());
}
return result;
}
..
}
ShardingSphereMetaData holds metadata about databases, tables, and indexes. As part of the RuntimeContext, this metadata is used by the engines later on. The metadata classes are simple, so a quick look suffices:
org.apache.shardingsphere.underlying.common.metadata.ShardingSphereMetaData
/**
* ShardingSphere meta data.
*/
@RequiredArgsConstructor
@Getter
public final class ShardingSphereMetaData {
private final DataSourceMetas dataSources;
private final SchemaMetaData schema;
}
org.apache.shardingsphere.underlying.common.metadata.datasource.DataSourceMetas
/**
* Data source metas.
*/
public final class DataSourceMetas {
private final Map<String, DataSourceMetaData> dataSourceMetaDataMap;
…
}
Data source metadata:
org.apache.shardingsphere.spi.database.metadata.DataSourceMetaData
/**
* Data source meta data.
*/
public interface DataSourceMetaData {
/**
* Get host name.
*
* @return host name
*/
String getHostName();
/**
* Get port.
*
* @return port
*/
int getPort();
/**
* Get catalog.
*
* @return catalog
*/
String getCatalog();
/**
* Get schema.
*
* @return schema
*/
String getSchema();
}
Schema metadata:
org.apache.shardingsphere.sql.parser.binder.metadata.schema.SchemaMetaData
/**
* Schema meta data.
*/
public final class SchemaMetaData {
private final Map<String, TableMetaData> tables;
…
}
Table metadata:
org.apache.shardingsphere.sql.parser.binder.metadata.table.TableMetaData
/**
* Table meta data.
*/
@Getter
@EqualsAndHashCode
@ToString
public final class TableMetaData {
private final Map<String, ColumnMetaData> columns;
private final Map<String, IndexMetaData> indexes;
…
}
Column metadata:
org.apache.shardingsphere.sql.parser.binder.metadata.column.ColumnMetaData
/**
* Column meta data.
*/
@RequiredArgsConstructor
@Getter
@EqualsAndHashCode
@ToString
public class ColumnMetaData {
private final String name;
private final int dataType;
private final String dataTypeName;
private final boolean primaryKey;
private final boolean generated;
private final boolean caseSensitive;
}
Index metadata:
org.apache.shardingsphere.sql.parser.binder.metadata.index.IndexMetaData
/**
* Index meta data.
*/
@RequiredArgsConstructor
@Getter
@EqualsAndHashCode
@ToString
public final class IndexMetaData {
private final String name;
}
Each of these metadata classes has a corresponding loader: SchemaMetaDataLoader, TableMetaDataLoader, ColumnMetaDataLoader, and IndexMetaDataLoader. Their core logic is the same: obtain a Connection, call its getMetaData() to get a DatabaseMetaData instance, then call methods such as getSchemas, getTables, and getColumns to fetch the table and column information. Take ColumnMetaDataLoader as an example:
org.apache.shardingsphere.sql.parser.binder.metadata.column.ColumnMetaDataLoader
/**
* Column meta data loader.
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class ColumnMetaDataLoader {
private static final String COLUMN_NAME = "COLUMN_NAME";
private static final String DATA_TYPE = "DATA_TYPE";
private static final String TYPE_NAME = "TYPE_NAME";
/**
* Load column meta data list.
*
* @param connection connection
* @param table table name
* @param databaseType database type
* @return column meta data list
* @throws SQLException SQL exception
*/
public static Collection<ColumnMetaData> load(final Connection connection, final String table, final String databaseType) throws SQLException {
if (!isTableExist(connection, connection.getCatalog(), table, databaseType)) {
return Collections.emptyList();
}
Collection<ColumnMetaData> result = new LinkedList<>();
Collection<String> primaryKeys = loadPrimaryKeys(connection, table, databaseType);
List<String> columnNames = new ArrayList<>();
List<Integer> columnTypes = new ArrayList<>();
List<String> columnTypeNames = new ArrayList<>();
List<Boolean> isPrimaryKeys = new ArrayList<>();
List<Boolean> isCaseSensitives = new ArrayList<>();
try (ResultSet resultSet = connection.getMetaData().getColumns(connection.getCatalog(), JdbcUtil.getSchema(connection, databaseType), table, "%")) {
while (resultSet.next()) {
String columnName = resultSet.getString(COLUMN_NAME);
columnTypes.add(resultSet.getInt(DATA_TYPE));
columnTypeNames.add(resultSet.getString(TYPE_NAME));
isPrimaryKeys.add(primaryKeys.contains(columnName));
columnNames.add(columnName);
}
}
try (ResultSet resultSet = connection.createStatement().executeQuery(generateEmptyResultSQL(table, databaseType))) {
for (String each : columnNames) {
isCaseSensitives.add(resultSet.getMetaData().isCaseSensitive(resultSet.findColumn(each)));
}
}
for (int i = 0; i < columnNames.size(); i++) {
// TODO load auto generated from database meta data
result.add(new ColumnMetaData(columnNames.get(i), columnTypes.get(i), columnTypeNames.get(i), isPrimaryKeys.get(i), false, isCaseSensitives.get(i)));
}
return result;
}
…
}
Back to the final feature-specific DataSource implementations, for example ShardingDataSource for sharding:
org.apache.shardingsphere.shardingjdbc.jdbc.core.datasource.ShardingDataSource
/**
* Sharding data source.
*/
@Getter
public class ShardingDataSource extends AbstractDataSourceAdapter {
private final ShardingRuntimeContext runtimeContext;
static {
NewInstanceServiceLoader.register(RouteDecorator.class);
NewInstanceServiceLoader.register(SQLRewriteContextDecorator.class);
NewInstanceServiceLoader.register(ResultProcessEngine.class);
}
public ShardingDataSource(final Map<String, DataSource> dataSourceMap, final ShardingRule shardingRule, final Properties props) throws SQLException {
super(dataSourceMap);
checkDataSourceType(dataSourceMap);
runtimeContext = new ShardingRuntimeContext(dataSourceMap, shardingRule, props, getDatabaseType());
}
private void checkDataSourceType(final Map<String, DataSource> dataSourceMap) {
for (DataSource each : dataSourceMap.values()) {
Preconditions.checkArgument(!(each instanceof MasterSlaveDataSource), "Initialized data sources can not be master-slave data sources.");
}
}
@Override
public final ShardingConnection getConnection() {
return new ShardingConnection(getDataSourceMap(), runtimeContext, TransactionTypeHolder.get());
}
…
}
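As shown, the static block registers the route decorator, the SQL rewrite context decorator, and the result process engine that this feature needs, and the class implements getConnection().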
Connection
The class hierarchy of the Connection implementations in ShardingSphere:
As with DataSource, ShardingSphere's Connection implementations start with an AbstractUnsupportedOperationConnection class; for master-slave and sharding, an abstract Connection adapter class is defined on top of it.
/**
* Adapter for {@code Connection}.
*/
public abstract class AbstractConnectionAdapter extends AbstractUnsupportedOperationConnection {
@Getter
private final Multimap<String, Connection> cachedConnections = LinkedHashMultimap.create();
@Getter
private final ForceExecuteTemplate<Connection> forceExecuteTemplate = new ForceExecuteTemplate<>();
private final ForceExecuteTemplate<Entry<String, Connection>> forceExecuteTemplateForClose = new ForceExecuteTemplate<>();
private final RootInvokeHook rootInvokeHook = new SPIRootInvokeHook();
private boolean autoCommit = true;
private boolean readOnly;
private volatile boolean closed;
private int transactionIsolation = TRANSACTION_READ_UNCOMMITTED;
protected AbstractConnectionAdapter() {
rootInvokeHook.start();
}
/**
* Get database connection.
*
* @param dataSourceName data source name
* @return database connection
* @throws SQLException SQL exception
*/
public final Connection getConnection(final String dataSourceName) throws SQLException {
return getConnections(ConnectionMode.MEMORY_STRICTLY, dataSourceName, 1).get(0);
}
/**
* Get database connections.
*
* @param connectionMode connection mode
* @param dataSourceName data source name
* @param connectionSize size of connection list to be get
* @return database connections
* @throws SQLException SQL exception
*/
public final List<Connection> getConnections(final ConnectionMode connectionMode, final String dataSourceName, final int connectionSize) throws SQLException {
DataSource dataSource = getDataSourceMap().get(dataSourceName);
Preconditions.checkState(null != dataSource, "Missing the data source name: '%s'", dataSourceName);
Collection<Connection> connections;
synchronized (cachedConnections) {
connections = cachedConnections.get(dataSourceName);
}
List<Connection> result;
if (connections.size() >= connectionSize) {
result = new ArrayList<>(connections).subList(0, connectionSize);
} else if (!connections.isEmpty()) {
result = new ArrayList<>(connectionSize);
result.addAll(connections);
List<Connection> newConnections = createConnections(dataSourceName, connectionMode, dataSource, connectionSize - connections.size());
result.addAll(newConnections);
synchronized (cachedConnections) {
cachedConnections.putAll(dataSourceName, newConnections);
}
} else {
result = new ArrayList<>(createConnections(dataSourceName, connectionMode, dataSource, connectionSize));
synchronized (cachedConnections) {
cachedConnections.putAll(dataSourceName, result);
}
}
return result;
}
@SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
private List<Connection> createConnections(final String dataSourceName, final ConnectionMode connectionMode, final DataSource dataSource, final int connectionSize) throws SQLException {
if (1 == connectionSize) {
Connection connection = createConnection(dataSourceName, dataSource);
replayMethodsInvocation(connection);
return Collections.singletonList(connection);
}
if (ConnectionMode.CONNECTION_STRICTLY == connectionMode) {
return createConnections(dataSourceName, dataSource, connectionSize);
}
synchronized (dataSource) {
return createConnections(dataSourceName, dataSource, connectionSize);
}
}
…
}
As shown, the class holds a Multimap<String, Connection> cachedConnections. Multimap is a Guava map whose values are collections, similar to Map<String, List<Connection>>. ShardingSphere uses this type because a single data source may yield several database connections.
getConnections compares the connections already cached in cachedConnections with the requested number; if there are fewer than requested, it creates the missing ones.
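The "reuse what is cached, create only the difference" logic can be sketched without Guava or a real database. In the sketch below a plain Map<String, List<String>> replaces the Multimap and strings replace Connection objects; both are simplifications assumed for the example, and the class and method names are hypothetical. Locking is left out on purpose.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class ConnectionCacheSketch {

    // Data source name -> connections created so far (strings stand in for Connection objects).
    private final Map<String, List<String>> cachedConnections = new LinkedHashMap<>();

    private int createdCount;

    List<String> getConnections(String dataSourceName, int connectionSize) {
        List<String> cached = cachedConnections.computeIfAbsent(dataSourceName, key -> new ArrayList<>());
        if (cached.size() >= connectionSize) {
            // Enough connections are cached already: hand out the first connectionSize of them.
            return new ArrayList<>(cached.subList(0, connectionSize));
        }
        // Otherwise create only the missing ones and add them to the cache.
        int missing = connectionSize - cached.size();
        for (int i = 0; i < missing; i++) {
            cached.add(dataSourceName + "-conn-" + (++createdCount));
        }
        return new ArrayList<>(cached);
    }

    int getCreatedCount() {
        return createdCount;
    }

    public static void main(String[] args) {
        ConnectionCacheSketch cache = new ConnectionCacheSketch();
        System.out.println(cache.getConnections("ds_0", 2));
        System.out.println(cache.getConnections("ds_0", 1));
        System.out.println(cache.getConnections("ds_0", 3));
        System.out.println("created: " + cache.getCreatedCount());
    }
}
```

Across the three calls only three connections are created in total, because the second call is served entirely from the cache and the third creates just one more.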
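A question worth asking: does cachedConnections really need a synchronized lock?
In general, the Connection and Statement objects provided by database drivers are not thread-safe, that is, they do not support concurrent use. Following the call chain ShardingPreparedStatement.executeQuery -> PreparedStatementExecutor.init -> PreparedStatementExecutor.obtainExecuteGroups -> AbstractConnectionAdapter.getConnection shows that concurrent calls to ShardingPreparedStatement.executeQuery lead to concurrent operations on cachedConnections. The synchronized blocks on cachedConnections are presumably there to keep this thread-safe, even though using JDBC this way from the application is not really correct.
The class also has many property setters, such as setReadOnly. In it, recordMethodInvocation ensures that connections created later receive the same setting through replay, while forceExecuteTemplate.execute applies the value to each connection that already exists.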
public final void setReadOnly(final boolean readOnly) throws SQLException {
this.readOnly = readOnly;
recordMethodInvocation(Connection.class, "setReadOnly", new Class[]{boolean.class}, new Object[]{readOnly});
forceExecuteTemplate.execute(cachedConnections.values(), connection -> connection.setReadOnly(readOnly));
}
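The body of ForceExecuteTemplate is not shown in this article. The sketch below is therefore only an assumption about what a "force execute" helper typically does: run the callback on every target even if some of them fail, then report the failures at the end as one chained SQLException. All names in it are hypothetical.

```java
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

class ForceExecuteSketch {

    /** Callback that may fail with SQLException, like most JDBC setters. */
    @FunctionalInterface
    interface Callback<T> {
        void execute(T target) throws SQLException;
    }

    /** Applies the callback to every target; failures are collected and thrown afterwards. */
    static <T> void forceExecute(Collection<T> targets, Callback<T> callback) throws SQLException {
        SQLException first = null;
        for (T each : targets) {
            try {
                callback.execute(each);
            } catch (SQLException ex) {
                if (null == first) {
                    first = ex;
                } else {
                    first.setNextException(ex);
                }
            }
        }
        if (null != first) {
            throw first;
        }
    }

    /** Runs the template over three targets where the second one fails. */
    static String demo() {
        List<String> visited = new ArrayList<>();
        int failures = 0;
        try {
            forceExecute(Arrays.asList("a", "b", "c"), target -> {
                visited.add(target);
                if ("b".equals(target)) {
                    throw new SQLException("failed on " + target);
                }
            });
        } catch (SQLException ex) {
            for (SQLException each = ex; null != each; each = each.getNextException()) {
                failures++;
            }
        }
        return "visited=" + visited.size() + " failures=" + failures;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The design choice matters for setters such as setReadOnly and for close: one misbehaving connection should not prevent the remaining ones from being updated or released.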
With the abstract Connection adapter covered, look at its feature implementations: ShardingConnection for sharding and MasterSlaveConnection for master-slave.
org.apache.shardingsphere.shardingjdbc.jdbc.core.connection.ShardingConnection
/**
* Connection that support sharding.
*/
@Getter
public final class ShardingConnection extends AbstractConnectionAdapter {
private final Map<String, DataSource> dataSourceMap;
private final ShardingRuntimeContext runtimeContext;
private final TransactionType transactionType;
private final ShardingTransactionManager shardingTransactionManager;
public ShardingConnection(final Map<String, DataSource> dataSourceMap, final ShardingRuntimeContext runtimeContext, final TransactionType transactionType) {
this.dataSourceMap = dataSourceMap;
this.runtimeContext = runtimeContext;
this.transactionType = transactionType;
shardingTransactionManager = runtimeContext.getShardingTransactionManagerEngine().getTransactionManager(transactionType);
}
/**
* Whether hold transaction or not.
*
* @return true or false
*/
public boolean isHoldTransaction() {
return (TransactionType.LOCAL == transactionType && !getAutoCommit()) || (TransactionType.XA == transactionType && isInShardingTransaction());
}
@Override
protected Connection createConnection(final String dataSourceName, final DataSource dataSource) throws SQLException {
return isInShardingTransaction() ? shardingTransactionManager.getConnection(dataSourceName) : dataSource.getConnection();
}
private boolean isInShardingTransaction() {
return null != shardingTransactionManager && shardingTransactionManager.isInTransaction();
}
@Override
public DatabaseMetaData getMetaData() {
return new ShardingDatabaseMetaData(this);
}
@Override
public PreparedStatement prepareStatement(final String sql) throws SQLException {
return new ShardingPreparedStatement(this, sql);
}
@Override
public PreparedStatement prepareStatement(final String sql, final int resultSetType, final int resultSetConcurrency) throws SQLException {
return new ShardingPreparedStatement(this, sql, resultSetType, resultSetConcurrency);
}
...
@Override
public Statement createStatement() {
return new ShardingStatement(this);
}
@Override
public Statement createStatement(final int resultSetType, final int resultSetConcurrency) {
return new ShardingStatement(this, resultSetType, resultSetConcurrency);
}
…
}
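As shown, prepareStatement returns a ShardingPreparedStatement instance and createStatement returns a ShardingStatement.
In addition, createConnection (which AbstractConnectionAdapter calls to create a Connection) checks whether a sharding transaction is in progress; if so, it obtains the connection from the sharding transaction manager.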
org.apache.shardingsphere.shardingjdbc.jdbc.core.connection.MasterSlaveConnection
/**
* Connection that support master-slave.
*/
@RequiredArgsConstructor
@Getter
public final class MasterSlaveConnection extends AbstractConnectionAdapter {
private final Map<String, DataSource> dataSourceMap;
private final MasterSlaveRuntimeContext runtimeContext;
@Override
protected Connection createConnection(final String dataSourceName, final DataSource dataSource) throws SQLException {
return dataSource.getConnection();
}
@Override
public DatabaseMetaData getMetaData() {
return new MasterSlaveDatabaseMetaData(this);
}
@Override
public Statement createStatement() {
return new MasterSlaveStatement(this);
}
@Override
public Statement createStatement(final int resultSetType, final int resultSetConcurrency) {
return new MasterSlaveStatement(this, resultSetType, resultSetConcurrency);
}
…
}
Compared with ShardingConnection, MasterSlaveConnection is simpler because no sharding transaction is involved. The Statements it creates are MasterSlavePreparedStatement and MasterSlaveStatement.
Statement
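Next, look at the Statement implementations in ShardingSphere. The class diagram is:
As with the DataSource and Connection implementations, two abstract unsupported-operation classes are defined, AbstractUnsupportedOperationStatement and AbstractUnsupportedOperationPreparedStatement. They provide default implementations (which simply throw SQLFeatureNotSupportedException) for the unsupported methods of Statement and PreparedStatement respectively.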
org.apache.shardingsphere.shardingjdbc.jdbc.unsupported.AbstractUnsupportedOperationStatement
/**
* Unsupported {@code Statement} METHODS.
*/
public abstract class AbstractUnsupportedOperationStatement extends WrapperAdapter implements Statement {
@Override
public final int getFetchDirection() throws SQLException {
throw new SQLFeatureNotSupportedException("getFetchDirection");
}
@Override
public final void setFetchDirection(final int direction) throws SQLException {
throw new SQLFeatureNotSupportedException("setFetchDirection");
}
@Override
public final void addBatch(final String sql) throws SQLException {
throw new SQLFeatureNotSupportedException("addBatch sql");
}
@Override
public void clearBatch() throws SQLException {
throw new SQLFeatureNotSupportedException("clearBatch");
}
…
}
org.apache.shardingsphere.shardingjdbc.jdbc.unsupported.AbstractUnsupportedOperationPreparedStatement
/**
* Unsupported {@code PreparedStatement} methods.
*/
public abstract class AbstractUnsupportedOperationPreparedStatement extends AbstractStatementAdapter implements PreparedStatement {
public AbstractUnsupportedOperationPreparedStatement() {
super(PreparedStatement.class);
}
@Override
public final ResultSetMetaData getMetaData() throws SQLException {
throw new SQLFeatureNotSupportedException("getMetaData");
}
/**
* Get parameter meta data.
*
* @return parameter metadata
* @throws SQLException SQL exception
*/
@Override
public ParameterMetaData getParameterMetaData() throws SQLException {
throw new SQLFeatureNotSupportedException("ParameterMetaData");
}
@Override
public final void setNString(final int parameterIndex, final String x) throws SQLException {
throw new SQLFeatureNotSupportedException("setNString");
}
…
}
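As abstract parents, these two classes only provide defaults; the final feature implementation classes override some of the methods (those not declared final) according to their own logic.
Now for another abstract class in the Statement branch: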
org.apache.shardingsphere.shardingjdbc.jdbc.adapter.AbstractStatementAdapter
/**
* Adapter for {@code Statement}.
*/
@RequiredArgsConstructor
public abstract class AbstractStatementAdapter extends AbstractUnsupportedOperationStatement {
private final Class<? extends Statement> targetClass;
private boolean closed;
private boolean poolable;
private int fetchSize;
private final ForceExecuteTemplate<Statement> forceExecuteTemplate = new ForceExecuteTemplate<>();
@SuppressWarnings("unchecked")
@Override
public final void close() throws SQLException {
closed = true;
try {
forceExecuteTemplate.execute((Collection) getRoutedStatements(), Statement::close);
} finally {
getRoutedStatements().clear();
}
}
@Override
public final boolean isClosed() {
return closed;
}
…
@Override
public final int getFetchSize() {
return fetchSize;
}
@SuppressWarnings("unchecked")
@Override
public final void setFetchSize(final int rows) throws SQLException {
this.fetchSize = rows;
recordMethodInvocation(targetClass, "setFetchSize", new Class[] {int.class}, new Object[] {rows});
forceExecuteTemplate.execute((Collection) getRoutedStatements(), statement -> statement.setFetchSize(rows));
}
…
@Override
public final int getUpdateCount() throws SQLException {
if (isAccumulate()) {
return accumulate();
} else {
Collection<? extends Statement> statements = getRoutedStatements();
if (statements.isEmpty()) {
return -1;
}
return getRoutedStatements().iterator().next().getUpdateCount();
}
}
private int accumulate() throws SQLException {
long result = 0;
boolean hasResult = false;
for (Statement each : getRoutedStatements()) {
int updateCount = each.getUpdateCount();
if (updateCount > -1) {
hasResult = true;
}
result += updateCount;
}
if (result > Integer.MAX_VALUE) {
result = Integer.MAX_VALUE;
}
return hasResult ? Long.valueOf(result).intValue() : -1;
}
…
protected abstract boolean isAccumulate();
protected abstract Collection<? extends Statement> getRoutedStatements();
}
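The code shows that this class implements close along with the getters and setters for poolable, fetchSize, maxFieldSize, maxRows, and queryTimeout. They all follow the same logic: apply the call to each underlying Statement in turn, again using recordMethodInvocation and forceExecuteTemplate.execute.
The class declares an abstract method getRoutedStatements(), which returns the real underlying Statement objects after routing. Its implementation differs per feature: for sharding, ShardingPreparedStatement returns the routed Statements, of which there may be several, whereas for master-slave, encryption, and shadow tables there is only one.
The other abstract method is isAccumulate(), which decides whether update counts should be accumulated. getUpdateCount() first checks isAccumulate(); if it returns true, the getUpdateCount() values of the Statements returned by getRoutedStatements() are summed. Among the subclass implementations, sharding returns true unless all tables involved are broadcast tables, while master-slave, encryption, and shadow tables always return false.
In ShardingPreparedStatement and ShardingStatement: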
@Override
public boolean isAccumulate() {
return !connection.getRuntimeContext().getRule().isAllBroadcastTables(executionContext.getSqlStatementContext().getTablesContext().getTableNames());
}
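The accumulation rule can be tried out on plain numbers. The sketch below mirrors the accumulate() logic from the AbstractStatementAdapter listing above, with an int array standing in for the update counts of the routed Statements; the class name is hypothetical.

```java
class UpdateCountSketch {

    /** Sums per-statement update counts, capped at Integer.MAX_VALUE; -1 if no statement reported a count. */
    static int accumulate(int[] updateCounts) {
        long result = 0;
        boolean hasResult = false;
        for (int each : updateCounts) {
            if (each > -1) {
                hasResult = true;
            }
            result += each;
        }
        if (result > Integer.MAX_VALUE) {
            result = Integer.MAX_VALUE;
        }
        return hasResult ? (int) result : -1;
    }

    public static void main(String[] args) {
        System.out.println(accumulate(new int[] {2, 3}));                 // prints 5
        System.out.println(accumulate(new int[] {-1, -1}));               // prints -1
        System.out.println(accumulate(new int[] {Integer.MAX_VALUE, 1})); // prints 2147483647
    }
}
```

Summing in a long and clamping afterwards avoids integer overflow when many shards each report a large count.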
Next, look at the Statement implementation for sharding:
org.apache.shardingsphere.shardingjdbc.jdbc.core.statement.ShardingStatement
/**
* Statement that support sharding.
*/
public final class ShardingStatement extends AbstractStatementAdapter {
@Getter
private final ShardingConnection connection;
private final StatementExecutor statementExecutor;
private boolean returnGeneratedKeys;
private ExecutionContext executionContext;
private ResultSet currentResultSet;
public ShardingStatement(final ShardingConnection connection) {
this(connection, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY, ResultSet.HOLD_CURSORS_OVER_COMMIT);
}
public ShardingStatement(final ShardingConnection connection, final int resultSetType, final int resultSetConcurrency) {
this(connection, resultSetType, resultSetConcurrency, ResultSet.HOLD_CURSORS_OVER_COMMIT);
}
public ShardingStatement(final ShardingConnection connection, final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability) {
super(Statement.class);
this.connection = connection;
statementExecutor = new StatementExecutor(resultSetType, resultSetConcurrency, resultSetHoldability, connection);
}
@Override
public ResultSet executeQuery(final String sql) throws SQLException {
if (Strings.isNullOrEmpty(sql)) {
throw new SQLException(SQLExceptionConstant.SQL_STRING_NULL_OR_EMPTY);
}
ResultSet result;
try {
executionContext = prepare(sql);
List<QueryResult> queryResults = statementExecutor.executeQuery();
MergedResult mergedResult = mergeQuery(queryResults);
result = new ShardingResultSet(statementExecutor.getResultSets(), mergedResult, this, executionContext);
} finally {
currentResultSet = null;
}
currentResultSet = result;
return result;
}
@Override
public int executeUpdate(final String sql) throws SQLException {
try {
executionContext = prepare(sql);
return statementExecutor.executeUpdate();
} finally {
currentResultSet = null;
}
}
…
private ExecutionContext prepare(final String sql) throws SQLException {
statementExecutor.clear();
ShardingRuntimeContext runtimeContext = connection.getRuntimeContext();
BasePrepareEngine prepareEngine = new SimpleQueryPrepareEngine(
runtimeContext.getRule().toRules(), runtimeContext.getProperties(), runtimeContext.getMetaData(), runtimeContext.getSqlParserEngine());
ExecutionContext result = prepareEngine.prepare(sql, Collections.emptyList());
statementExecutor.init(result);
statementExecutor.getStatements().forEach(this::replayMethodsInvocation);
return result;
}
private MergedResult mergeQuery(final List<QueryResult> queryResults) throws SQLException {
ShardingRuntimeContext runtimeContext = connection.getRuntimeContext();
MergeEngine mergeEngine = new MergeEngine(runtimeContext.getRule().toRules(), runtimeContext.getProperties(), runtimeContext.getDatabaseType(), runtimeContext.getMetaData().getSchema());
return mergeEngine.merge(queryResults, executionContext.getSqlStatementContext());
}
…
}
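In this class's SQL execution methods executeQuery, executeUpdate, and execute, each first calls prepare and then calls the corresponding executeQuery, executeUpdate, or execute method of StatementExecutor.
In prepare, SimpleQueryPrepareEngine.prepare first performs SQL parsing, route calculation, and SQL rewriting (this belongs to the kernel engine, which was covered in the overview article and is not repeated here). It then initializes the StatementExecutor and replays the recorded method calls (for example setFetchSize and setQueryTimeout) on the underlying Statements.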
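The method replay mentioned above can be illustrated with a minimal sketch. It assumes a hypothetical FakeStatement class in place of a real JDBC Statement so that it runs without a database, and the Recorder class is illustrative only; it is not ShardingSphere's replayMethodsInvocation implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class ReplayDemo {

    // Hypothetical stand-in for a real JDBC Statement, so the sketch runs without a database.
    static final class FakeStatement {
        int fetchSize;
        int queryTimeout;
    }

    // Records configuration calls made on the logical statement and replays them later.
    static final class Recorder {
        private final List<Consumer<FakeStatement>> invocations = new ArrayList<>();

        void setFetchSize(final int rows) {
            invocations.add(statement -> statement.fetchSize = rows);
        }

        void setQueryTimeout(final int seconds) {
            invocations.add(statement -> statement.queryTimeout = seconds);
        }

        // Applies every recorded call, in order, to one routed statement.
        void replayOn(final FakeStatement target) {
            for (Consumer<FakeStatement> each : invocations) {
                each.accept(target);
            }
        }
    }

    public static void main(final String[] args) {
        Recorder recorder = new Recorder();
        // These calls happen before any real statement exists.
        recorder.setFetchSize(100);
        recorder.setQueryTimeout(30);
        // After routing, each underlying statement receives the same settings.
        FakeStatement shard0 = new FakeStatement();
        FakeStatement shard1 = new FakeStatement();
        recorder.replayOn(shard0);
        recorder.replayOn(shard1);
        System.out.println(shard0.fetchSize + " " + shard1.queryTimeout);
    }
}
```

Recording is needed because the underlying Statements are only created after routing, so settings applied to the logical Statement have nowhere to go until then.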
The actual SQL execution is performed by StatementExecutor. Let's look at the StatementExecutor class hierarchy, starting with its abstract base class:
org.apache.shardingsphere.shardingjdbc.executor.AbstractStatementExecutor
/**
* Abstract statement executor.
*/
@Getter
public abstract class AbstractStatementExecutor {
private final DatabaseType databaseType;
private final int resultSetType;
private final int resultSetConcurrency;
private final int resultSetHoldability;
private final ShardingConnection connection;
private final SQLExecutePrepareTemplate sqlExecutePrepareTemplate;
private final SQLExecuteTemplate sqlExecuteTemplate;
private final Collection<Connection> connections = new LinkedList<>();
private final List<List<Object>> parameterSets = new LinkedList<>();
private final List<Statement> statements = new LinkedList<>();
private final List<ResultSet> resultSets = new CopyOnWriteArrayList<>();
private final Collection<InputGroup<StatementExecuteUnit>> inputGroups = new LinkedList<>();
@Setter
private SQLStatementContext sqlStatementContext;
public AbstractStatementExecutor(final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability, final ShardingConnection shardingConnection) {
this.databaseType = shardingConnection.getRuntimeContext().getDatabaseType();
this.resultSetType = resultSetType;
this.resultSetConcurrency = resultSetConcurrency;
this.resultSetHoldability = resultSetHoldability;
this.connection = shardingConnection;
int maxConnectionsSizePerQuery = connection.getRuntimeContext().getProperties().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
ExecutorEngine executorEngine = connection.getRuntimeContext().getExecutorEngine();
sqlExecutePrepareTemplate = new SQLExecutePrepareTemplate(maxConnectionsSizePerQuery);
sqlExecuteTemplate = new SQLExecuteTemplate(executorEngine, connection.isHoldTransaction());
}
…
protected final <T> List<T> executeCallback(final SQLExecuteCallback<T> executeCallback) throws SQLException {
List<T> result = sqlExecuteTemplate.execute((Collection) inputGroups, executeCallback);// Run the given callback against the input groups
refreshMetaDataIfNeeded(connection.getRuntimeContext(), sqlStatementContext);// Refresh the metadata, because DDL changes it
return result;
}
…
private void refreshMetaDataIfNeeded(final ShardingRuntimeContext runtimeContext, final SQLStatementContext sqlStatementContext) throws SQLException {
if (null == sqlStatementContext) {
return;
}
if (sqlStatementContext instanceof CreateTableStatementContext) {
refreshTableMetaData(runtimeContext, ((CreateTableStatementContext) sqlStatementContext).getSqlStatement());
} else if (sqlStatementContext instanceof AlterTableStatementContext) {
refreshTableMetaData(runtimeContext, ((AlterTableStatementContext) sqlStatementContext).getSqlStatement());
} else if (sqlStatementContext instanceof DropTableStatementContext) {
refreshTableMetaData(runtimeContext, ((DropTableStatementContext) sqlStatementContext).getSqlStatement());
} else if (sqlStatementContext instanceof CreateIndexStatementContext) {
refreshTableMetaData(runtimeContext, ((CreateIndexStatementContext) sqlStatementContext).getSqlStatement());
} else if (sqlStatementContext instanceof DropIndexStatementContext) {
refreshTableMetaData(runtimeContext, ((DropIndexStatementContext) sqlStatementContext).getSqlStatement());
}
}
private void refreshTableMetaData(final ShardingRuntimeContext runtimeContext, final CreateTableStatement createTableStatement) throws SQLException {
String tableName = createTableStatement.getTable().getTableName().getIdentifier().getValue();
runtimeContext.getMetaData().getSchema().put(tableName, loadTableMeta(tableName, databaseType));
}
….
}
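AbstractStatementExecutor is the abstract base class of the various StatementExecutor classes. It holds the information needed to execute a Statement, including resultSetType, resultSetConcurrency, and resultSetHoldability. It also holds the ShardingSphere Connection instance, the list of real Statements after routing, the parameter sets, the result sets produced by execution, the execution group information, the execution preparation engine (SQLExecutePrepareTemplate), and the execution engine (SQLExecuteTemplate).
In executeCallback, the SQL is actually executed through SQLExecuteTemplate, a key class of the execution engine that was analyzed in the execution engine article and is not expanded on here. Afterwards, refreshMetaDataIfNeeded refreshes the metadata: when a DDL statement runs, table names, column names, column types, primary keys, and indexes may change, so the metadata must be updated to match. The metadata here is the ShardingSphereMetaData property of RuntimeContext. Its instance encapsulates the metadata ShardingSphere needs (for sharding and other features) and is passed between the stages and engines to provide the basic information they require.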
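The refresh-after-DDL idea can be sketched as follows. The StatementKind enum, the SchemaCache class, and the string description of columns are all hypothetical simplifications; the real code dispatches on SQLStatementContext subclasses and stores table metadata objects. Removing the entry on DROP TABLE is an assumption of this sketch, since the listing above only shows the CREATE TABLE overload.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

class MetaDataRefreshDemo {

    // Hypothetical statement kinds; the real code checks SQLStatementContext subclasses instead.
    enum StatementKind { CREATE_TABLE, ALTER_TABLE, DROP_TABLE, SELECT }

    // Simplified schema cache: table name -> textual column list.
    static final class SchemaCache {
        private final Map<String, String> tables = new HashMap<>();
        private final Function<String, String> loader;

        SchemaCache(final Function<String, String> loader) {
            this.loader = loader;
        }

        // Called after every execution; only DDL touches the cache.
        void refreshIfNeeded(final StatementKind kind, final String tableName) {
            switch (kind) {
                case CREATE_TABLE:
                case ALTER_TABLE:
                    tables.put(tableName, loader.apply(tableName));
                    break;
                case DROP_TABLE:
                    // Assumption of this sketch: a dropped table is removed from the cache.
                    tables.remove(tableName);
                    break;
                default:
                    break;
            }
        }

        String columnsOf(final String tableName) {
            return tables.get(tableName);
        }
    }

    public static void main(final String[] args) {
        // The map plays the role of the physical database that the loader reads from.
        Map<String, String> database = new HashMap<>();
        SchemaCache cache = new SchemaCache(database::get);
        database.put("t_order", "order_id");
        cache.refreshIfNeeded(StatementKind.CREATE_TABLE, "t_order");
        database.put("t_order", "order_id, status");
        cache.refreshIfNeeded(StatementKind.SELECT, "t_order");
        System.out.println(cache.columnsOf("t_order"));
        cache.refreshIfNeeded(StatementKind.ALTER_TABLE, "t_order");
        System.out.println(cache.columnsOf("t_order"));
    }
}
```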
org.apache.shardingsphere.shardingjdbc.executor.PreparedStatementExecutor
/**
* Prepared statement executor.
*/
public final class PreparedStatementExecutor extends AbstractStatementExecutor {
@Getter
private final boolean returnGeneratedKeys;
public PreparedStatementExecutor(
final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability, final boolean returnGeneratedKeys, final ShardingConnection shardingConnection) {
super(resultSetType, resultSetConcurrency, resultSetHoldability, shardingConnection);
this.returnGeneratedKeys = returnGeneratedKeys;
}
/**
* Initialize executor.
*
* @param executionContext execution context
* @throws SQLException SQL exception
*/
public void init(final ExecutionContext executionContext) throws SQLException {
setSqlStatementContext(executionContext.getSqlStatementContext());
getInputGroups().addAll(obtainExecuteGroups(executionContext.getExecutionUnits()));// Build the execution groups
cacheStatements();
}
private Collection<InputGroup<StatementExecuteUnit>> obtainExecuteGroups(final Collection<ExecutionUnit> executionUnits) throws SQLException {
return getSqlExecutePrepareTemplate().getExecuteUnitGroups(executionUnits, new SQLExecutePrepareCallback() {
@Override
// Obtain the requested number of database connections from the given data source
public List<Connection> getConnections(final ConnectionMode connectionMode, final String dataSourceName, final int connectionSize) throws SQLException {
return PreparedStatementExecutor.super.getConnection().getConnections(connectionMode, dataSourceName, connectionSize);
}
@Override
// Create a StatementExecuteUnit from the execution unit
public StatementExecuteUnit createStatementExecuteUnit(final Connection connection, final ExecutionUnit executionUnit, final ConnectionMode connectionMode) throws SQLException {
return new StatementExecuteUnit(executionUnit, createPreparedStatement(connection, executionUnit.getSqlUnit().getSql()), connectionMode);
}
});
}
@SuppressWarnings("MagicConstant")
private PreparedStatement createPreparedStatement(final Connection connection, final String sql) throws SQLException {
return returnGeneratedKeys ? connection.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS)
: connection.prepareStatement(sql, getResultSetType(), getResultSetConcurrency(), getResultSetHoldability());
}
/**
* Execute query.
*
* @return result set list
* @throws SQLException SQL exception
*/
public List<QueryResult> executeQuery() throws SQLException {
final boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();
SQLExecuteCallback<QueryResult> executeCallback = new SQLExecuteCallback<QueryResult>(getDatabaseType(), isExceptionThrown) {
@Override
// Execute the SQL on the given Statement and wrap the JDBC result set as a QueryResult (stream-based or memory-based)
protected QueryResult executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode) throws SQLException {
return getQueryResult(statement, connectionMode);
}
};
return executeCallback(executeCallback);// Delegate to executeCallback in the parent class
}
…
}
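The concrete StatementExecutor implementation classes implement executeQuery, executeUpdate, and execute. Each of these methods defines a SQLExecuteCallback instance, and executeCallback in the parent class then performs the final SQL execution.
The key operation in this class is init: it generates the execution group information and calls cacheStatements in the parent class to record the underlying Statements and their parameters.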
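To make "execution groups" more concrete, here is a rough sketch of one way to group execution units: first by data source, then into at most maxConnections groups per data source, so that each group can share one connection. The Unit class and the method names are hypothetical, and the partitioning rule is an assumption of this sketch rather than a copy of SQLExecutePrepareTemplate.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class ExecuteGroupDemo {

    // Hypothetical execution unit: a routed SQL bound to one data source.
    static final class Unit {
        final String dataSource;
        final String sql;

        Unit(final String dataSource, final String sql) {
            this.dataSource = dataSource;
            this.sql = sql;
        }
    }

    // Splits the SQLs of one data source into at most maxConnections groups.
    static List<List<String>> partition(final List<String> sqls, final int maxConnections) {
        // Ceiling division gives the group size; it is never smaller than 1.
        int groupSize = Math.max((sqls.size() + maxConnections - 1) / maxConnections, 1);
        List<List<String>> result = new ArrayList<>();
        for (int i = 0; i < sqls.size(); i += groupSize) {
            result.add(new ArrayList<>(sqls.subList(i, Math.min(i + groupSize, sqls.size()))));
        }
        return result;
    }

    // Groups units by data source, then partitions each data source's SQLs.
    static Map<String, List<List<String>>> group(final List<Unit> units, final int maxConnections) {
        Map<String, List<String>> byDataSource = new LinkedHashMap<>();
        for (Unit each : units) {
            byDataSource.computeIfAbsent(each.dataSource, key -> new ArrayList<>()).add(each.sql);
        }
        Map<String, List<List<String>>> result = new LinkedHashMap<>();
        for (Map.Entry<String, List<String>> entry : byDataSource.entrySet()) {
            result.put(entry.getKey(), partition(entry.getValue(), maxConnections));
        }
        return result;
    }

    public static void main(final String[] args) {
        List<Unit> units = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            units.add(new Unit("ds_0", "SELECT * FROM t_order_" + i));
        }
        units.add(new Unit("ds_1", "SELECT * FROM t_order_0"));
        System.out.println(group(units, 2));
    }
}
```

With five SQLs on ds_0 and a limit of two connections, the sketch yields two groups of three and two SQLs, so the SQLs in one group have to run serially on the same connection.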
Returning to the Statement implementation classes, let's take a brief look at the one for the master-slave feature:
org.apache.shardingsphere.shardingjdbc.jdbc.core.statement.MasterSlaveStatement
/**
* Statement that support master-slave.
*/
@Getter
public final class MasterSlaveStatement extends AbstractStatementAdapter {
private final MasterSlaveConnection connection;
@Getter(AccessLevel.NONE)
private final DataNodeRouter dataNodeRouter;
private final int resultSetType;
private final int resultSetConcurrency;
private final int resultSetHoldability;
private final Collection<Statement> routedStatements = new LinkedList<>();
public MasterSlaveStatement(final MasterSlaveConnection connection) {
this(connection, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY, ResultSet.HOLD_CURSORS_OVER_COMMIT);
}
public MasterSlaveStatement(final MasterSlaveConnection connection, final int resultSetType, final int resultSetConcurrency) {
this(connection, resultSetType, resultSetConcurrency, ResultSet.HOLD_CURSORS_OVER_COMMIT);
}
public MasterSlaveStatement(final MasterSlaveConnection connection, final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability) {
super(Statement.class);
this.connection = connection;
dataNodeRouter = new DataNodeRouter(connection.getRuntimeContext().getMetaData(), connection.getRuntimeContext().getProperties(), connection.getRuntimeContext().getSqlParserEngine());
this.resultSetType = resultSetType;
this.resultSetConcurrency = resultSetConcurrency;
this.resultSetHoldability = resultSetHoldability;
}
@Override
public ResultSet executeQuery(final String sql) throws SQLException {
if (Strings.isNullOrEmpty(sql)) {
throw new SQLException(SQLExceptionConstant.SQL_STRING_NULL_OR_EMPTY);
}
clearPrevious();
MasterSlaveRuntimeContext runtimeContext = connection.getRuntimeContext();
SimpleQueryPrepareEngine prepareEngine = new SimpleQueryPrepareEngine(
Collections.singletonList(runtimeContext.getRule()), runtimeContext.getProperties(), runtimeContext.getMetaData(), runtimeContext.getSqlParserEngine());
ExecutionContext executionContext = prepareEngine.prepare(sql, Collections.emptyList());
ExecutionUnit executionUnit = executionContext.getExecutionUnits().iterator().next();
Preconditions.checkState(1 == executionContext.getExecutionUnits().size(), "Cannot support executeQuery for DML or DDL");
Statement statement = connection.getConnection(executionUnit.getDataSourceName()).createStatement(resultSetType, resultSetConcurrency, resultSetHoldability);
routedStatements.add(statement);
return statement.executeQuery(executionUnit.getSqlUnit().getSql());
}
@Override
public int executeUpdate(final String sql) throws SQLException {
clearPrevious();
int result = 0;
MasterSlaveRuntimeContext runtimeContext = connection.getRuntimeContext();
RouteContext routeContext = dataNodeRouter.route(sql, Collections.emptyList(), false);
routeContext = new MasterSlaveRouteDecorator().decorate(routeContext, runtimeContext.getMetaData(), runtimeContext.getRule(), runtimeContext.getProperties());
for (RouteUnit each : routeContext.getRouteResult().getRouteUnits()) {
Statement statement = connection.getConnection(each.getDataSourceMapper().getActualName()).createStatement(resultSetType, resultSetConcurrency, resultSetHoldability);
routedStatements.add(statement);
result += statement.executeUpdate(sql);
}
return result;
}
…
}
As with ShardingStatement, executeQuery here also runs the prepare step through SimpleQueryPrepareEngine. The difference is that ShardingStatement creates Statements and executes SQL through StatementExecutor, whereas here the code calls MasterSlaveConnection.getConnection to obtain a Connection, calls createStatement on it directly to create the Statement, and then executes the SQL.
In addition, executeUpdate does not use SimpleQueryPrepareEngine for the prepare step as ShardingStatement consistently does. Instead it calls dataNodeRouter.route and MasterSlaveRouteDecorator.decorate directly. This inconsistency makes the class harder to follow than ShardingStatement.
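For readers unfamiliar with master-slave routing, the basic decision can be sketched as below. This is a deliberately simplified illustration: the Router class is hypothetical, it detects queries with a plain prefix check instead of parsing the SQL, and it uses round-robin where ShardingSphere lets a configurable load-balance algorithm choose the slave. Cases such as transactions or hints forcing the master are ignored.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.atomic.AtomicInteger;

class MasterSlaveRoutingDemo {

    // Hypothetical router: queries go to a slave, everything else goes to the master.
    static final class Router {
        private final String master;
        private final List<String> slaves;
        private final AtomicInteger counter = new AtomicInteger();

        Router(final String master, final List<String> slaves) {
            this.master = master;
            this.slaves = slaves;
        }

        String route(final String sql) {
            // Simplification: a real implementation inspects the parsed statement.
            boolean isQuery = sql.trim().toUpperCase(Locale.ROOT).startsWith("SELECT");
            if (!isQuery || slaves.isEmpty()) {
                return master;
            }
            // Round-robin over the slaves.
            return slaves.get(Math.floorMod(counter.getAndIncrement(), slaves.size()));
        }
    }

    public static void main(final String[] args) {
        Router router = new Router("master", Arrays.asList("slave_0", "slave_1"));
        System.out.println(router.route("SELECT * FROM t_order"));
        System.out.println(router.route("SELECT * FROM t_order"));
        System.out.println(router.route("UPDATE t_order SET status = 'PAID'"));
    }
}
```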
PreparedStatement
Next comes the PreparedStatement branch. First, its abstract class:
org.apache.shardingsphere.shardingjdbc.jdbc.adapter.AbstractShardingPreparedStatementAdapter
/**
* Sharding adapter for {@code PreparedStatement}.
*/
public abstract class AbstractShardingPreparedStatementAdapter extends AbstractUnsupportedOperationPreparedStatement {
private final List<SetParameterMethodInvocation> setParameterMethodInvocations = new LinkedList<>();
@Getter
private final List<Object> parameters = new ArrayList<>();
@Override
public final void setNull(final int parameterIndex, final int sqlType) {
setParameter(parameterIndex, null);
}
@Override
public final void setNull(final int parameterIndex, final int sqlType, final String typeName) {
setParameter(parameterIndex, null);
}
…
private void setParameter(final int parameterIndex, final Object value) {
if (parameters.size() == parameterIndex - 1) {
parameters.add(value);
return;
}
for (int i = parameters.size(); i <= parameterIndex - 1; i++) {
parameters.add(null);
}
parameters.set(parameterIndex - 1, value);
}
protected final void replaySetParameter(final PreparedStatement preparedStatement, final List<Object> parameters) {
setParameterMethodInvocations.clear();
addParameters(parameters);
for (SetParameterMethodInvocation each : setParameterMethodInvocations) {
each.invoke(preparedStatement);
}
}
private void addParameters(final List<Object> parameters) {
int i = 0;
for (Object each : parameters) {
setParameters(new Class[]{int.class, Object.class}, i++ + 1, each);
}
}
@SneakyThrows
private void setParameters(final Class[] argumentTypes, final Object... arguments) {
setParameterMethodInvocations.add(new SetParameterMethodInvocation(PreparedStatement.class.getMethod("setObject", argumentTypes), arguments, arguments[1]));
}
@Override
public final void clearParameters() {
parameters.clear();
setParameterMethodInvocations.clear();
}
}
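This class implements the various parameter-setting methods of the PreparedStatement interface. Its parameters field records the parameter values that were set, and its setParameterMethodInvocations field records the parameter-setting method calls.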
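The padding behavior of setParameter is easy to miss, so here is a small standalone sketch of it. The Recorder class is hypothetical and only mirrors the idea of the listing above: JDBC parameter indexes are 1-based and may be set in any order, so gaps are filled with null until the list is long enough.

```java
import java.util.ArrayList;
import java.util.List;

class ParameterRecorderDemo {

    // Hypothetical recorder that mimics how parameters are stored by 1-based index.
    static final class Recorder {
        private final List<Object> parameters = new ArrayList<>();

        void setParameter(final int parameterIndex, final Object value) {
            // Pad with nulls so that position parameterIndex - 1 exists.
            while (parameters.size() < parameterIndex) {
                parameters.add(null);
            }
            parameters.set(parameterIndex - 1, value);
        }

        List<Object> getParameters() {
            return parameters;
        }
    }

    public static void main(final String[] args) {
        Recorder recorder = new Recorder();
        // Setting the third parameter first leaves two null placeholders.
        recorder.setParameter(3, "c");
        System.out.println(recorder.getParameters()); // [null, null, c]
        recorder.setParameter(1, "a");
        System.out.println(recorder.getParameters()); // [a, null, c]
    }
}
```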
Its subclass is org.apache.shardingsphere.shardingjdbc.jdbc.core.statement.ShardingPreparedStatement:
/**
* PreparedStatement that support sharding.
*/
public final class ShardingPreparedStatement extends AbstractShardingPreparedStatementAdapter {
@Getter
private final ShardingConnection connection;
private final String sql;
@Getter
private final ParameterMetaData parameterMetaData;
private final BasePrepareEngine prepareEngine;
private final PreparedStatementExecutor preparedStatementExecutor;
private final BatchPreparedStatementExecutor batchPreparedStatementExecutor;
private final Collection<Comparable<?>> generatedValues = new LinkedList<>();
private ExecutionContext executionContext;
private ResultSet currentResultSet;
public ShardingPreparedStatement(final ShardingConnection connection, final String sql) throws SQLException {
this(connection, sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY, ResultSet.HOLD_CURSORS_OVER_COMMIT, false);
}
public ShardingPreparedStatement(final ShardingConnection connection, final String sql, final int resultSetType, final int resultSetConcurrency) throws SQLException {
this(connection, sql, resultSetType, resultSetConcurrency, ResultSet.HOLD_CURSORS_OVER_COMMIT, false);
}
public ShardingPreparedStatement(final ShardingConnection connection, final String sql, final int autoGeneratedKeys) throws SQLException {
this(connection, sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY, ResultSet.HOLD_CURSORS_OVER_COMMIT, Statement.RETURN_GENERATED_KEYS == autoGeneratedKeys);
}
public ShardingPreparedStatement(
final ShardingConnection connection, final String sql, final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability) throws SQLException {
this(connection, sql, resultSetType, resultSetConcurrency, resultSetHoldability, false);
}
private ShardingPreparedStatement(final ShardingConnection connection, final String sql,
final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability, final boolean returnGeneratedKeys) throws SQLException {
if (Strings.isNullOrEmpty(sql)) {
throw new SQLException(SQLExceptionConstant.SQL_STRING_NULL_OR_EMPTY);
}
this.connection = connection;
this.sql = sql;
ShardingRuntimeContext runtimeContext = connection.getRuntimeContext();
parameterMetaData = new ShardingParameterMetaData(runtimeContext.getSqlParserEngine(), sql);
prepareEngine = new PreparedQueryPrepareEngine(runtimeContext.getRule().toRules(), runtimeContext.getProperties(), runtimeContext.getMetaData(), runtimeContext.getSqlParserEngine());
preparedStatementExecutor = new PreparedStatementExecutor(resultSetType, resultSetConcurrency, resultSetHoldability, returnGeneratedKeys, connection);
batchPreparedStatementExecutor = new BatchPreparedStatementExecutor(resultSetType, resultSetConcurrency, resultSetHoldability, returnGeneratedKeys, connection);
}
@Override
public ResultSet executeQuery() throws SQLException {
ResultSet result;
try {
clearPrevious();
prepare();
initPreparedStatementExecutor();
MergedResult mergedResult = mergeQuery(preparedStatementExecutor.executeQuery());
result = new ShardingResultSet(preparedStatementExecutor.getResultSets(), mergedResult, this, executionContext);
} finally {
clearBatch();
}
currentResultSet = result;
return result;
}
@Override
public int executeUpdate() throws SQLException {
try {
clearPrevious();
prepare();
initPreparedStatementExecutor();
return preparedStatementExecutor.executeUpdate();
} finally {
clearBatch();
}
}
…
private void initPreparedStatementExecutor() throws SQLException {
preparedStatementExecutor.init(executionContext);
setParametersForStatements();
replayMethodForStatements();
}
private void setParametersForStatements() {
for (int i = 0; i < preparedStatementExecutor.getStatements().size(); i++) {
replaySetParameter((PreparedStatement) preparedStatementExecutor.getStatements().get(i), preparedStatementExecutor.getParameterSets().get(i));
}
}
private void replayMethodForStatements() {
for (Statement each : preparedStatementExecutor.getStatements()) {
replayMethodsInvocation(each);
}
}
…
}
This class implements executeQuery, executeUpdate, and execute. Each of them in turn calls clearPrevious, prepare, initPreparedStatementExecutor, and the corresponding execute* method of PreparedStatementExecutor.
- clearPrevious clears the parameters, connections, result sets, group information, and Statement objects left over from the previous execution of this PreparedStatement.
- prepare calls PreparedQueryPrepareEngine#prepare, whose internal logic was analyzed in the overview and engine articles and is not repeated here.
- initPreparedStatementExecutor obtains the real underlying PreparedStatements and then sets the corresponding parameters on them.
- The execute* methods of PreparedStatementExecutor were covered earlier.
Compared with ShardingStatement, ShardingPreparedStatement supports batch operations in addition to parameter setting, namely the JDBC addBatch and executeBatch methods.
Here is how ShardingPreparedStatement implements these two batch methods.
@Override
public void addBatch() {
try {
prepare();
batchPreparedStatementExecutor.addBatchForRouteUnits(executionContext);
} finally {
currentResultSet = null;
clearParameters();
}
}
@Override
public int[] executeBatch() throws SQLException {
try {
initBatchPreparedStatementExecutor();
return batchPreparedStatementExecutor.executeBatch();
} finally {
clearBatch();
}
}
private void initBatchPreparedStatementExecutor() throws SQLException {
batchPreparedStatementExecutor.init(executionContext.getSqlStatementContext());
setBatchParametersForStatements();
}
private void setBatchParametersForStatements() throws SQLException {
for (Statement each : batchPreparedStatementExecutor.getStatements()) {
List<List<Object>> parameterSet = batchPreparedStatementExecutor.getParameterSet(each);
for (List<Object> parameters : parameterSet) {
replaySetParameter((PreparedStatement) each, parameters);
((PreparedStatement) each).addBatch();
}
}
}
@Override
public void clearBatch() throws SQLException {
currentResultSet = null;
batchPreparedStatementExecutor.clear();
clearParameters();
}
After calling prepare, addBatch calls BatchPreparedStatementExecutor#addBatchForRouteUnits(executionContext). executeBatch calls BatchPreparedStatementExecutor#init and executeBatch. setBatchParametersForStatements obtains the Statements and parameter sets held by BatchPreparedStatementExecutor and then calls the underlying PreparedStatement#addBatch to add the batch parameters. Next, let's look at the BatchPreparedStatementExecutor class.
org.apache.shardingsphere.shardingjdbc.executor.batch.BatchPreparedStatementExecutor
/**
* Prepared statement executor to process add batch.
*/
public final class BatchPreparedStatementExecutor extends AbstractStatementExecutor {
private final Collection<BatchRouteUnit> routeUnits = new LinkedList<>();
@Getter
private final boolean returnGeneratedKeys;
private int batchCount;
public BatchPreparedStatementExecutor(final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability, final boolean returnGeneratedKeys,
final ShardingConnection shardingConnection) {
super(resultSetType, resultSetConcurrency, resultSetHoldability, shardingConnection);
this.returnGeneratedKeys = returnGeneratedKeys;
}
/**
* Initialize executor.
*
* @param sqlStatementContext SQL statement context
* @throws SQLException SQL exception
*/
public void init(final SQLStatementContext sqlStatementContext) throws SQLException {
setSqlStatementContext(sqlStatementContext);
getInputGroups().addAll(obtainExecuteGroups(routeUnits));
}
…
/**
* Add batch for route units.
*
* @param executionContext execution context
*/
public void addBatchForRouteUnits(final ExecutionContext executionContext) {
handleOldBatchRouteUnits(createBatchRouteUnits(executionContext.getExecutionUnits()));
handleNewBatchRouteUnits(createBatchRouteUnits(executionContext.getExecutionUnits()));
batchCount++;
}
private Collection<BatchRouteUnit> createBatchRouteUnits(final Collection<ExecutionUnit> executionUnits) {
Collection<BatchRouteUnit> result = new LinkedList<>();
for (ExecutionUnit each : executionUnits) {
result.add(new BatchRouteUnit(each));
}
return result;
}
private void handleOldBatchRouteUnits(final Collection<BatchRouteUnit> newRouteUnits) {
for (BatchRouteUnit each : newRouteUnits) {
for (BatchRouteUnit unit : routeUnits) {
if (unit.equals(each)) {
reviseBatchRouteUnit(unit, each);
}
}
}
}
private void reviseBatchRouteUnit(final BatchRouteUnit oldBatchRouteUnit, final BatchRouteUnit newBatchRouteUnit) {
oldBatchRouteUnit.getExecutionUnit().getSqlUnit().getParameters().addAll(newBatchRouteUnit.getExecutionUnit().getSqlUnit().getParameters());
oldBatchRouteUnit.mapAddBatchCount(batchCount);
}
private void handleNewBatchRouteUnits(final Collection<BatchRouteUnit> newRouteUnits) {
newRouteUnits.removeAll(routeUnits);
for (BatchRouteUnit each : newRouteUnits) {
each.mapAddBatchCount(batchCount);
}
routeUnits.addAll(newRouteUnits);
}
…
/**
* Execute batch.
*
* @return execute results
* @throws SQLException SQL exception
*/
public int[] executeBatch() throws SQLException {
final boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();
SQLExecuteCallback<int[]> callback = new SQLExecuteCallback<int[]>(getDatabaseType(), isExceptionThrown) {
@Override
protected int[] executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode) throws SQLException {
return statement.executeBatch();
}
};
List<int[]> results = executeCallback(callback);
if (isAccumulate()) {
return accumulate(results);
} else {
return results.get(0);
}
}
…
}
In addBatchForRouteUnits, the handleOldBatchRouteUnits and handleNewBatchRouteUnits methods merge the batch parameters by route unit. init generates the execution group information. executeBatch still goes through executeCallback in the parent class, which calls executeBatch on the underlying PreparedStatements group by group to execute the SQL.
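The merging by route unit can be pictured with the following sketch. It assumes a route unit is identified by its data source and actual SQL, and it keeps one list of parameter sets per unit. The BatchCollector class is hypothetical; the real BatchRouteUnit stores parameters and batch counts differently.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class BatchMergeDemo {

    // Hypothetical collector: one entry per route unit, each holding its parameter sets.
    static final class BatchCollector {
        private final Map<String, List<List<Object>>> parameterSetsByUnit = new LinkedHashMap<>();

        // Called once per routed unit for every logical addBatch().
        void addBatch(final String dataSource, final String sql, final List<Object> parameters) {
            // An existing unit gets another parameter set; a new unit is registered.
            parameterSetsByUnit.computeIfAbsent(dataSource + "|" + sql, key -> new ArrayList<>())
                    .add(new ArrayList<>(parameters));
        }

        Map<String, List<List<Object>>> getParameterSetsByUnit() {
            return parameterSetsByUnit;
        }
    }

    public static void main(final String[] args) {
        BatchCollector collector = new BatchCollector();
        // Three logical addBatch() calls; the first and third route to the same unit.
        collector.addBatch("ds_0", "INSERT INTO t_order_0 VALUES (?, ?)", Arrays.<Object>asList(10, "a"));
        collector.addBatch("ds_1", "INSERT INTO t_order_1 VALUES (?, ?)", Arrays.<Object>asList(11, "b"));
        collector.addBatch("ds_0", "INSERT INTO t_order_0 VALUES (?, ?)", Arrays.<Object>asList(12, "c"));
        System.out.println(collector.getParameterSetsByUnit());
    }
}
```

At execution time, each unit then needs only one underlying PreparedStatement, on which addBatch is called once per collected parameter set.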
ResultSet
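The ResultSet interface represents a database result set (usually obtained after a Statement executes a query). It holds table-like data and maintains an internal cursor. After reading the columns of one row, calling next() moves the cursor to the next row, until the last row has been visited.
As with the other interfaces, the ResultSet implementation defines several abstract "Unsupported" classes. AbstractUnsupportedUpdateOperationResultSet implements the update methods of ResultSet; ShardingSphere currently does not support the update* methods of the ResultSet interface. AbstractUnsupportedOperationResultSet implements the other operation methods. AbstractUnsupportedDatabaseMetaDataResultSet defines the methods that are not supported for DatabaseMetaData results (the return values of methods such as getSchemas and getColumns in ShardingDatabaseMetaData). AbstractUnsupportedGeneratedKeysResultSet defines the methods that are not supported for generated-key results (the return value of getGeneratedKeys() in Statement).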
org.apache.shardingsphere.shardingjdbc.jdbc.adapter.AbstractResultSetAdapter
/**
* Adapter for {@code ResultSet}.
*/
public abstract class AbstractResultSetAdapter extends AbstractUnsupportedOperationResultSet {
@Getter
private final List<ResultSet> resultSets;
@Getter
private final Statement statement;
private boolean closed;
private final ForceExecuteTemplate<ResultSet> forceExecuteTemplate = new ForceExecuteTemplate<>();
@Getter
private final ExecutionContext executionContext;
public AbstractResultSetAdapter(final List<ResultSet> resultSets, final Statement statement, final ExecutionContext executionContext) {
Preconditions.checkArgument(!resultSets.isEmpty());
this.resultSets = resultSets;
this.statement = statement;
this.executionContext = executionContext;
}
@Override
public final ResultSetMetaData getMetaData() throws SQLException {
return new ShardingResultSetMetaData(resultSets.get(0).getMetaData(), getShardingRule(), executionContext.getSqlStatementContext());
}
private ShardingRule getShardingRule() {
ShardingConnection connection = statement instanceof ShardingPreparedStatement ? ((ShardingPreparedStatement) statement).getConnection() : ((ShardingStatement) statement).getConnection();
return connection.getRuntimeContext().getRule();
}
@Override
public final int findColumn(final String columnLabel) throws SQLException {
return resultSets.get(0).findColumn(columnLabel);
}
…
}
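Most of the getter methods are implemented by calling the corresponding method on the first underlying ResultSet, while the setter methods simply apply the corresponding setting to each underlying ResultSet in turn.
Now let's look at the ShardingResultSet class for data sharding: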
org.apache.shardingsphere.shardingjdbc.jdbc.core.resultset.ShardingResultSet
/**
* Result that support sharding.
*/
public final class ShardingResultSet extends AbstractResultSetAdapter {
private final MergedResult mergeResultSet;
private final Map<String, Integer> columnLabelAndIndexMap;
public ShardingResultSet(final List<ResultSet> resultSets, final MergedResult mergeResultSet, final Statement statement, final ExecutionContext executionContext) throws SQLException {
super(resultSets, statement, executionContext);
this.mergeResultSet = mergeResultSet;
columnLabelAndIndexMap = createColumnLabelAndIndexMap(resultSets.get(0).getMetaData());
}
private Map<String, Integer> createColumnLabelAndIndexMap(final ResultSetMetaData resultSetMetaData) throws SQLException {
Map<String, Integer> result = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
for (int columnIndex = resultSetMetaData.getColumnCount(); columnIndex > 0; columnIndex--) {
result.put(resultSetMetaData.getColumnLabel(columnIndex), columnIndex);
}
return result;
}
@Override
public boolean next() throws SQLException {
return mergeResultSet.next();
}
@Override
public boolean wasNull() throws SQLException {
return mergeResultSet.wasNull();
}
@Override
public boolean getBoolean(final int columnIndex) throws SQLException {
return (boolean) ResultSetUtil.convertValue(mergeResultSet.getValue(columnIndex, boolean.class), boolean.class);
}
@Override
public boolean getBoolean(final String columnLabel) throws SQLException {
int columnIndex = columnLabelAndIndexMap.get(columnLabel);
return (boolean) ResultSetUtil.convertValue(mergeResultSet.getValue(columnIndex, boolean.class), boolean.class);
}
…
}
Each of these methods actually calls MergedResult#getValue to obtain the value of the specified column.
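The delegation pattern can be sketched without a database as follows. The MergedRows class is a hypothetical, much simplified merged result that walks the rows of several shards one after another; ShardingSphere's MergedResult implementations also handle ordering, grouping, and pagination, which this sketch leaves out. The case-insensitive label map follows the TreeMap construction in the listing above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class MergedCursorDemo {

    // Hypothetical merged result: iterates shard after shard behind a single cursor.
    static final class MergedRows {
        private final Map<String, Integer> columnLabelAndIndexMap = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
        private final Iterator<List<List<Object>>> shards;
        private Iterator<List<Object>> current = Collections.emptyIterator();
        private List<Object> row;

        MergedRows(final List<String> columnLabels, final List<List<List<Object>>> shardRows) {
            for (int i = 0; i < columnLabels.size(); i++) {
                // Column indexes are 1-based, as in JDBC.
                columnLabelAndIndexMap.put(columnLabels.get(i), i + 1);
            }
            shards = shardRows.iterator();
        }

        // Moves the cursor forward, switching to the next shard when one is exhausted.
        boolean next() {
            while (!current.hasNext() && shards.hasNext()) {
                current = shards.next().iterator();
            }
            if (!current.hasNext()) {
                return false;
            }
            row = current.next();
            return true;
        }

        // Resolves the label case-insensitively, then reads from the current row.
        Object getValue(final String columnLabel) {
            return row.get(columnLabelAndIndexMap.get(columnLabel) - 1);
        }
    }

    // Sample data: one row on the first shard, two rows on the second.
    static List<List<List<Object>>> sampleShards() {
        List<List<Object>> shard0 = new ArrayList<>();
        shard0.add(Arrays.<Object>asList(1, "a"));
        List<List<Object>> shard1 = new ArrayList<>();
        shard1.add(Arrays.<Object>asList(2, "b"));
        shard1.add(Arrays.<Object>asList(3, "c"));
        List<List<List<Object>>> result = new ArrayList<>();
        result.add(shard0);
        result.add(shard1);
        return result;
    }

    public static void main(final String[] args) {
        MergedRows rows = new MergedRows(Arrays.asList("id", "name"), sampleShards());
        while (rows.next()) {
            System.out.println(rows.getValue("ID") + " " + rows.getValue("name"));
        }
    }
}
```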
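As a general-purpose JDBC interface, ResultSet not only represents the result of a SQL query but is also used in many other places where multiple records must be represented, for example the results of DatabaseMetaData methods and the auto-generated keys of a Statement.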
org.apache.shardingsphere.shardingjdbc.jdbc.core.resultset.DatabaseMetaDataResultSet
/**
* Database meta data result set.
*/
public final class DatabaseMetaDataResultSet<T extends BaseRule> extends AbstractUnsupportedDatabaseMetaDataResultSet {
private static final String TABLE_NAME = "TABLE_NAME";
private static final String INDEX_NAME = "INDEX_NAME";
private final int type;
private final int concurrency;
private final T rule;
private final ResultSetMetaData resultSetMetaData;
private final Map<String, Integer> columnLabelIndexMap;
private final Iterator<DatabaseMetaDataObject> databaseMetaDataObjectIterator;
private volatile boolean closed;
private DatabaseMetaDataObject currentDatabaseMetaDataObject;
public DatabaseMetaDataResultSet(final ResultSet resultSet, final T rule) throws SQLException {
this.type = resultSet.getType();
this.concurrency = resultSet.getConcurrency();
this.rule = rule;
this.resultSetMetaData = resultSet.getMetaData();
this.columnLabelIndexMap = initIndexMap();
this.databaseMetaDataObjectIterator = initIterator(resultSet);
}
private Map<String, Integer> initIndexMap() throws SQLException {
Map<String, Integer> result = new HashMap<>(resultSetMetaData.getColumnCount());
for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
result.put(resultSetMetaData.getColumnLabel(i), i);
}
return result;
}
private Iterator<DatabaseMetaDataObject> initIterator(final ResultSet resultSet) throws SQLException {
LinkedList<DatabaseMetaDataObject> result = new LinkedList<>();
Set<DatabaseMetaDataObject> removeDuplicationSet = new HashSet<>();
int tableNameColumnIndex = columnLabelIndexMap.getOrDefault(TABLE_NAME, -1);
int indexNameColumnIndex = columnLabelIndexMap.getOrDefault(INDEX_NAME, -1);
while (resultSet.next()) {
DatabaseMetaDataObject databaseMetaDataObject = generateDatabaseMetaDataObject(tableNameColumnIndex, indexNameColumnIndex, resultSet);
if (!removeDuplicationSet.contains(databaseMetaDataObject)) {
result.add(databaseMetaDataObject);
removeDuplicationSet.add(databaseMetaDataObject);
}
}
return result.iterator();
}
private DatabaseMetaDataObject generateDatabaseMetaDataObject(final int tableNameColumnIndex, final int indexNameColumnIndex, final ResultSet resultSet) throws SQLException {
DatabaseMetaDataObject result = new DatabaseMetaDataObject(resultSetMetaData.getColumnCount());
for (int i = 1; i <= columnLabelIndexMap.size(); i++) {
if (tableNameColumnIndex == i) {
String tableName = resultSet.getString(i);
Collection<String> logicTableNames = rule instanceof ShardingRule ? ((ShardingRule) rule).getLogicTableNames(tableName) : Collections.emptyList();
result.addObject(logicTableNames.isEmpty() ? tableName : logicTableNames.iterator().next());
} else if (indexNameColumnIndex == i) {
String tableName = resultSet.getString(tableNameColumnIndex);
String indexName = resultSet.getString(i);
result.addObject(null != indexName && indexName.endsWith(tableName) ? indexName.substring(0, indexName.indexOf(tableName) - 1) : indexName);
} else {
result.addObject(resultSet.getObject(i));
}
}
return result;
}
@Override
public boolean next() throws SQLException {
checkClosed();
if (databaseMetaDataObjectIterator.hasNext()) {
currentDatabaseMetaDataObject = databaseMetaDataObjectIterator.next();
return true;
}
return false;
}
@Override
public void close() throws SQLException {
checkClosed();
closed = true;
}
@Override
public boolean wasNull() throws SQLException {
checkClosed();
return false;
}
@Override
public String getString(final int columnIndex) throws SQLException {
checkClosed();
checkColumnIndex(columnIndex);
return (String) ResultSetUtil.convertValue(currentDatabaseMetaDataObject.getObject(columnIndex), String.class);
}
…
}
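The constructors of these ResultSet implementations all take either an underlying ResultSet or an Iterator. The next() method simply moves the cursor by calling next() on that internal ResultSet or Iterator, and the get methods read directly from the internal ResultSet or from the value the Iterator currently points at.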
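The delegation described above can be sketched with a small iterator-backed cursor. This is only an illustration under simplifying assumptions: IteratorBackedCursor is a hypothetical class, rows are plain lists instead of DatabaseMetaDataObject instances, and unchecked exceptions stand in for SQLException.

```java
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical, simplified stand-in for an iterator-backed ResultSet; not a ShardingSphere class.
final class IteratorBackedCursor {

    private final Iterator<List<Object>> rows;
    private List<Object> currentRow;
    private boolean closed;

    IteratorBackedCursor(final List<List<Object>> rawRows) {
        // Drop duplicate rows up front while keeping first-seen order.
        Set<List<Object>> uniqueRows = new LinkedHashSet<>(rawRows);
        this.rows = uniqueRows.iterator();
    }

    // Advances the cursor by advancing the internal iterator.
    boolean next() {
        checkClosed();
        if (rows.hasNext()) {
            currentRow = rows.next();
            return true;
        }
        return false;
    }

    // Reads a 1-based column from the row the cursor currently points at.
    Object getObject(final int columnIndex) {
        checkClosed();
        if (null == currentRow) {
            throw new IllegalStateException("Call next() before reading a column");
        }
        return currentRow.get(columnIndex - 1);
    }

    void close() {
        closed = true;
    }

    private void checkClosed() {
        if (closed) {
            throw new IllegalStateException("Cursor is closed");
        }
    }
}
```

Because all rows are collected and de-duplicated in the constructor, next() never touches the database again. That matches the shape of initIterator above, where the underlying ResultSet is drained once.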
MetaData interfaces
JDBC defines several metadata interfaces: DatabaseMetaData, ResultSetMetaData, and ParameterMetaData.
They represent, respectively, database and driver metadata (catalog, schema, table, column, index, and similar information, obtained through Connection#getMetaData()), result set metadata (column types and attributes, obtained through ResultSet#getMetaData()), and parameter metadata (the number, types, and attributes of parameters, obtained through PreparedStatement#getParameterMetaData()).
DatabaseMetaData
Let's look at the classes involved.
org.apache.shardingsphere.shardingjdbc.jdbc.core.datasource.metadata.AdaptedDatabaseMetaData
/**
* Adapted database meta data.
*/
@RequiredArgsConstructor
public abstract class AdaptedDatabaseMetaData extends WrapperAdapter implements DatabaseMetaData {
private final CachedDatabaseMetaData cachedDatabaseMetaData;
@Override
public final String getURL() {
return cachedDatabaseMetaData.getUrl();
}
@Override
public final String getUserName() {
return cachedDatabaseMetaData.getUserName();
}
@Override
public final String getDatabaseProductName() {
return cachedDatabaseMetaData.getDatabaseProductName();
}
As shown, this class implements DatabaseMetaData methods by delegating to the corresponding method of the CachedDatabaseMetaData instance and returning the result as-is.
org.apache.shardingsphere.shardingjdbc.jdbc.core.datasource.metadata.MultipleDatabaseMetaData
/**
* Multiple database meta data.
*/
@Getter
public abstract class MultipleDatabaseMetaData<C extends AbstractConnectionAdapter> extends AdaptedDatabaseMetaData {
private final C connection;
private final Collection<String> datasourceNames;
private final ShardingSphereMetaData shardingSphereMetaData;
private String currentDataSourceName;
private DatabaseMetaData currentDatabaseMetaData;
public MultipleDatabaseMetaData(final C connection, final Collection<String> datasourceNames,
final CachedDatabaseMetaData cachedDatabaseMetaData, final ShardingSphereMetaData shardingSphereMetaData) {
super(cachedDatabaseMetaData);
this.connection = connection;
this.datasourceNames = datasourceNames;
this.shardingSphereMetaData = shardingSphereMetaData;
}
@Override
public final Connection getConnection() throws SQLException {
return connection.getConnection(getDataSourceName());
}
@Override
public final ResultSet getSuperTypes(final String catalog, final String schemaPattern, final String typeNamePattern) throws SQLException {
return createDatabaseMetaDataResultSet(getDatabaseMetaData().getSuperTypes(getActualCatalog(catalog), getActualSchema(schemaPattern), typeNamePattern));
}
@Override
public final ResultSet getSuperTables(final String catalog, final String schemaPattern, final String tableNamePattern) throws SQLException {
return createDatabaseMetaDataResultSet(getDatabaseMetaData().getSuperTables(getActualCatalog(catalog), getActualSchema(schemaPattern), getActualTableNamePattern(tableNamePattern)));
}
@Override
public final ResultSet getColumns(final String catalog, final String schemaPattern, final String tableNamePattern, final String columnNamePattern) throws SQLException {
return createDatabaseMetaDataResultSet(
getDatabaseMetaData().getColumns(getActualCatalog(catalog), getActualSchema(schemaPattern), getActualTableNamePattern(tableNamePattern), columnNamePattern));
}
…
protected abstract String getActualTableNamePattern(String tableNamePattern);
protected abstract String getActualTable(String table);
protected abstract ResultSet createDatabaseMetaDataResultSet(ResultSet resultSet) throws SQLException;
…
}
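As shown, MultipleDatabaseMetaData builds on AdaptedDatabaseMetaData and implements most of the remaining DatabaseMetaData methods (the delegating methods shown in AdaptedDatabaseMetaData are final, so they are not overridden). It obtains the real DatabaseMetaDataResultSet by calling createDatabaseMetaDataResultSet, which subclasses implement, and it obtains the actual physical table name information through the subclass implementations of getActualTableNamePattern and getActualTable.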
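This split between a fixed flow in the base class and hooks in the subclass is the template method pattern. A minimal sketch, assuming hypothetical class names and plain strings in place of real ResultSet objects:

```java
// Hypothetical base class: owns the fixed flow, leaves name translation and wrapping to subclasses.
abstract class MetaDataTemplate {

    // Fixed algorithm: translate the pattern, query the underlying source, wrap the result.
    final String getColumns(final String tableNamePattern) {
        String actualPattern = getActualTableNamePattern(tableNamePattern);
        return wrapResult(queryUnderlying(actualPattern));
    }

    // Stand-in for the call to the real driver's DatabaseMetaData.
    private String queryUnderlying(final String actualPattern) {
        return "columns matching " + actualPattern;
    }

    protected abstract String getActualTableNamePattern(String tableNamePattern);

    protected abstract String wrapResult(String rawResult);
}

// Hypothetical subclass: supplies only the feature-specific hooks.
final class WildcardMetaData extends MetaDataTemplate {

    @Override
    protected String getActualTableNamePattern(final String tableNamePattern) {
        return null == tableNamePattern ? null : "%" + tableNamePattern + "%";
    }

    @Override
    protected String wrapResult(final String rawResult) {
        return "[wrapped] " + rawResult;
    }
}
```

The base method is final, so a subclass can change how names are translated and how results are wrapped, but not the order of the steps.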
org.apache.shardingsphere.shardingjdbc.jdbc.core.datasource.metadata.ShardingDatabaseMetaData
/**
* Sharding database meta data.
*/
public final class ShardingDatabaseMetaData extends MultipleDatabaseMetaData<ShardingConnection> {
private ShardingRule shardingRule;
public ShardingDatabaseMetaData(final ShardingConnection connection) {
super(connection, connection.getDataSourceMap().keySet(), connection.getRuntimeContext().getCachedDatabaseMetaData(), connection.getRuntimeContext().getMetaData());
shardingRule = connection.getRuntimeContext().getRule();
}
@Override
public String getActualTableNamePattern(final String tableNamePattern) {
if (null == tableNamePattern) {
return null;
}
return shardingRule.findTableRule(tableNamePattern).isPresent() ? "%" + tableNamePattern + "%" : tableNamePattern;
}
@Override
public String getActualTable(final String table) {
if (null == table) {
return null;
}
String result = table;
if (shardingRule.findTableRule(table).isPresent()) {
DataNode dataNode = shardingRule.getDataNode(table);
result = dataNode.getTableName();
}
return result;
}
@Override
protected ResultSet createDatabaseMetaDataResultSet(final ResultSet resultSet) throws SQLException {
return new DatabaseMetaDataResultSet<>(resultSet, shardingRule);
}
}
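As shown, ShardingDatabaseMetaData mainly implements getActualTable and getActualTableNamePattern, which use the ShardingRule to look up the actual table name and table name pattern, and createDatabaseMetaDataResultSet, which creates the DatabaseMetaDataResultSet instance.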
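To make the effect of the "%" + tableNamePattern + "%" rewrite concrete, here is a small sketch. It assumes a plain Set of logic table names in place of ShardingRule, and a hypothetical like() helper that supports only the % wildcard (a real SQL LIKE also treats _ as a single-character wildcard).

```java
import java.util.Set;
import java.util.regex.Pattern;

// Hypothetical sketch; the Set of logic table names stands in for ShardingRule.
final class TableNamePatternSketch {

    private final Set<String> logicTables;

    TableNamePatternSketch(final Set<String> logicTables) {
        this.logicTables = logicTables;
    }

    // Sharded logic tables are widened to a wildcard pattern; other names pass through unchanged.
    String toActualPattern(final String tableNamePattern) {
        if (null == tableNamePattern) {
            return null;
        }
        return logicTables.contains(tableNamePattern) ? "%" + tableNamePattern + "%" : tableNamePattern;
    }

    // Minimal LIKE matcher that understands only '%'; every other character is matched literally.
    static boolean like(final String value, final String pattern) {
        String[] parts = pattern.split("%", -1);
        StringBuilder regex = new StringBuilder();
        for (int i = 0; i < parts.length; i++) {
            if (i > 0) {
                regex.append(".*");
            }
            regex.append(Pattern.quote(parts[i]));
        }
        return value.matches(regex.toString());
    }
}
```

With t_order registered as a logic table, the widened pattern %t_order% matches physical tables such as t_order_0 and t_order_1. Under this simplified matcher it would also match a name like t_order_item_0, which is one reason the result set maps physical names back to logic names and removes duplicates afterwards.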
ResultSetMetaData
org.apache.shardingsphere.shardingjdbc.jdbc.core.resultset.ShardingResultSetMetaData
/**
* Sharding result set meta data.
*/
@RequiredArgsConstructor
public final class ShardingResultSetMetaData extends WrapperAdapter implements ResultSetMetaData {
private final ResultSetMetaData resultSetMetaData;
private final ShardingRule shardingRule;
private final SQLStatementContext sqlStatementContext;
@Override
public int getColumnCount() {
return sqlStatementContext instanceof SelectStatementContext ? ((SelectStatementContext) sqlStatementContext).getProjectionsContext().getExpandProjections().size() : 0;
}
@Override
public boolean isAutoIncrement(final int column) throws SQLException {
return resultSetMetaData.isAutoIncrement(column);
}
…
@Override
public String getTableName(final int column) throws SQLException {
String actualTableName = resultSetMetaData.getTableName(column);
return shardingRule.getLogicTableNames(actualTableName).isEmpty() ? actualTableName : shardingRule.getLogicTableNames(actualTableName).iterator().next();
}
@Override
public String getCatalogName(final int column) {
return DefaultSchema.LOGIC_NAME;
}
…
}
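This class is fairly simple. Most methods just call the corresponding method of the ResultSetMetaData passed to the constructor; a few others convert physical table names to logic table names or check the expanded projections. The ResultSetMetaData passed in is taken from the first underlying ResultSet held by AbstractResultSetAdapter.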
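The physical-to-logic table name conversion can be sketched as a reverse lookup with a fallback. LogicTableNameResolver is a hypothetical class; a HashMap stands in for the lookup that ShardingRule#getLogicTableNames performs.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of mapping physical table names back to logic table names.
final class LogicTableNameResolver {

    private final Map<String, String> actualToLogic = new HashMap<>();

    // Registers one logic table together with the physical tables that back it.
    void register(final String logicTable, final String... actualTables) {
        for (String each : actualTables) {
            actualToLogic.put(each, logicTable);
        }
    }

    // Returns the logic name when the table is sharded, otherwise the name that was passed in.
    String resolve(final String actualTableName) {
        return actualToLogic.getOrDefault(actualTableName, actualTableName);
    }
}
```

After register("t_order", "t_order_0", "t_order_1"), resolve("t_order_1") yields t_order, while an unregistered name such as t_config comes back unchanged. This mirrors the isEmpty() fallback in getTableName above.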
org.apache.shardingsphere.shardingjdbc.jdbc.adapter.AbstractResultSetAdapter
@Override
public final ResultSetMetaData getMetaData() throws SQLException {
return new ShardingResultSetMetaData(resultSets.get(0).getMetaData(), getShardingRule(), executionContext.getSqlStatementContext());
}
ParameterMetaData
org.apache.shardingsphere.shardingjdbc.jdbc.core.statement.metadata.ShardingParameterMetaData
/**
* Sharding parameter meta data.
*/
@RequiredArgsConstructor
public final class ShardingParameterMetaData extends AbstractUnsupportedOperationParameterMetaData {
private final SQLParserEngine sqlParserEngine;
private final String sql;
@Override
public int getParameterCount() {
return sqlParserEngine.parse(sql, true).getParameterCount();
}
}
As shown, this class implements only getParameterCount from the ParameterMetaData interface, obtaining the number of parameters through the parser engine. An instance is created in the ShardingPreparedStatement constructor so that getParameterMetaData() can return it.
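To illustrate what the parameter count means, the sketch below counts ? placeholders with a naive character scan. This is not how SQLParserEngine works (it builds a full parse tree); PlaceholderCounter is a hypothetical helper that only skips single-quoted literals and ignores comments and other quoting styles.

```java
// Hypothetical, naive placeholder counter; the real count comes from the SQL parser.
final class PlaceholderCounter {

    static int count(final String sql) {
        int result = 0;
        boolean inLiteral = false;
        for (int i = 0; i < sql.length(); i++) {
            char current = sql.charAt(i);
            if ('\'' == current) {
                // Toggle on every quote; a doubled quote ('') toggles twice and stays inside the literal.
                inLiteral = !inLiteral;
            } else if ('?' == current && !inLiteral) {
                result++;
            }
        }
        return result;
    }
}
```

For SELECT * FROM t_order WHERE user_id = ? AND status = ? the scan finds two placeholders, and a question mark inside a string literal is not counted. A parser-based count, as in ShardingParameterMetaData, avoids the edge cases this scan does not handle.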
org.apache.shardingsphere.shardingjdbc.jdbc.core.statement.ShardingPreparedStatement
/**
* PreparedStatement that support sharding.
*/
public final class ShardingPreparedStatement extends AbstractShardingPreparedStatementAdapter {
@Getter
private final ShardingConnection connection;
private final String sql;
@Getter
private final ParameterMetaData parameterMetaData;
…
private ShardingPreparedStatement(final ShardingConnection connection, final String sql,
final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability, final boolean returnGeneratedKeys) throws SQLException {
if (Strings.isNullOrEmpty(sql)) {
throw new SQLException(SQLExceptionConstant.SQL_STRING_NULL_OR_EMPTY);
}
this.connection = connection;
this.sql = sql;
ShardingRuntimeContext runtimeContext = connection.getRuntimeContext();
parameterMetaData = new ShardingParameterMetaData(runtimeContext.getSqlParserEngine(), sql);
…
}
Changes in 5.x
In 5.x, the JDBC implementation classes were further merged and simplified. Instead of defining a separate set of JDBC implementation classes per feature, they are unified into ShardingSphere* classes. For example, ShardingDataSource, MasterSlaveDataSource, ShadowDataSource, and EncryptDataSource from 4.1.1 are all unified into ShardingSphereDataSource;
ShardingConnection, MasterSlaveConnection, ShadowConnection, and EncryptConnection are merged into ShardingSphereConnection;
ShardingPreparedStatement, MasterSlavePreparedStatement, ShadowPreparedStatement, and the like are similarly merged into ShardingSpherePreparedStatement.
The feature-specific logic is now distributed, in decorator style, across the unified processing stages (parsing, routing, rewriting, execution, merging) and is triggered by the kernel engine. This design is clearly more elegant and also reduces the cognitive load on developers.
Class hierarchy diagram of DataSource in 5.x
Class hierarchy diagram of Connection in 5.x
Class hierarchy diagram of Statement in 5.x
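Since 5.x has not yet had a release version and the code may still change significantly, a detailed analysis of the 5.x source code is deferred until the release version is officially published.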