從源碼看ShardingSphere設(shè)計-JDBC篇

目前ShardingSphere提供兩種接入模式JDBC與Proxy(MySQL協(xié)議)芽狗,sidecar還未實現(xiàn)矮锈,本篇介紹JDBC接入脆丁。

JDBC作為java訪問數(shù)據(jù)庫的一個接口規(guī)范秃诵,它定義了一套與數(shù)據(jù)庫交互的方式肥矢,例如要通過SQL從MySQL數(shù)據(jù)中查詢一些數(shù)據(jù)斗埂,JDBC方式的代碼可能如下:

try (Connection conn = DriverManager.getConnection(
     "jdbc:mysql://localhost/mydb",
     "user",
     "password")) {
 try (PreparedStatement ps =
    conn.prepareStatement("SELECT i.*, j.* FROM Omega i, Zappa j WHERE i.name = ? AND 
 j.num = ?")
 ) {
    // In the SQL statement being prepared, each question mark is a placeholder
    // that must be replaced with a value you provide through a "set" method invocation.
    // The following two method calls replace the two placeholders; the first is
    // replaced by a string value, and the second by an integer value.
    ps.setString(1, "Poor Yorick");
    ps.setInt(2, 8008);

    // The ResultSet, rs, conveys the result of executing the SQL statement.
    // Each time you call rs.next(), an internal row pointer, or cursor,
    // is advanced to the next row of the result.  The cursor initially is
    // positioned before the first row.
    try (ResultSet rs = ps.executeQuery()) {
        while (rs.next()) {
            int numColumns = rs.getMetaData().getColumnCount();
            for (int i = 1; i <= numColumns; i++) {
                System.out.println("COLUMN " + i + " = " + rs.getObject(i));
            } 
        } 
    }
  }    
}  

對于ShardingSphere來說符糊,JDBC子項目的功能定位很明確,就是可以讓使用者按照J(rèn)DBC方式實現(xiàn)分庫分表呛凶、讀寫分離男娄、加解密等功能,在設(shè)計方面就是裝飾器模式,在完成SQL解析模闲、路由建瘫、改寫等操作后,由內(nèi)部尸折、底層真正的JDBC資源(DataSource啰脚、Connection、Statement实夹、ResultSet)來最終完成SQL的執(zhí)行橄浓。

與引擎篇不一樣,在JDBC篇亮航,將按照類層次結(jié)構(gòu)展開介紹荸实,這樣可以更好的從整體上理解設(shè)計。JDBC規(guī)范里最主要的幾個接口:DataSource塞赂、Connection泪勒、Statement、PreparedStatement宴猾、ResultSet、DatabaseMetaData叼旋、ResultSetMetaData仇哆、ParameterMetaData。本文將按照這個順序?qū)hardingSphere對這些接口的各主要實現(xiàn)類源碼進(jìn)行解讀夫植。

DataSource

ShardingSphere中DataSource接口實現(xiàn)類圖

從名字就可以到ShardingSphere根據(jù)不同的功能提供了對應(yīng)的DataSource接口實現(xiàn)讹剔,按照此類圖中從上往下看下。

首先是AbstractUnsupportedOperationDataSource類详民,它是ShardingSphere各DataSource實現(xiàn)類的基類延欠,雖然實現(xiàn)了DataSource接口,但只實現(xiàn)了getLoginTimeout和 setLoginTimeout方法沈跨,實現(xiàn)中直接拋出了SQLFeatureNotSupportedException由捎,這兩方法在其子類各功能的DataSource實現(xiàn)類中都未進(jìn)行重寫,所以ShardingSphere明確了不支持這兩個方法的調(diào)用饿凛。

JDBC是JAVA操作數(shù)據(jù)庫的一套標(biāo)準(zhǔn)接口狞玛,但各接口的實現(xiàn)是由數(shù)據(jù)庫驅(qū)動中負(fù)責(zé)實現(xiàn)的,而各家廠商也并非對接口中所有方法都完整的進(jìn)行了支持涧窒,ShardingSphere也類似心肪,對于一些不常用或者無法提供準(zhǔn)確值的方法并未提供對應(yīng)的實現(xiàn),為了統(tǒng)一管理不支持的方法纠吴,所以在實現(xiàn)JDBC接口時硬鞍,往往都設(shè)計了一個公有的父類:一個AbstractUnsupported*類,其中各方法實現(xiàn)就直接拋出SQLFeatureNotSupportedException,這樣設(shè)計也是為了更方便對不支持的JDBC方法進(jìn)行統(tǒng)一管理固该。

org.apache.shardingsphere.shardingjdbc.jdbc.unsupported.AbstractUnsupportedOperationDataSource

/**
 * Unsupported {@code Datasource} methods.
 */
public abstract class AbstractUnsupportedOperationDataSource extends WrapperAdapter implements DataSource {
    
    @Override
    public final int getLoginTimeout() throws SQLException {
        throw new SQLFeatureNotSupportedException("unsupported getLoginTimeout()");
    }
    
    @Override
    public final void setLoginTimeout(final int seconds) throws SQLException {
        throw new SQLFeatureNotSupportedException("unsupported setLoginTimeout(int seconds)");
    }
}

可以看到該類除了實現(xiàn)DataSource接口锅减,同事還繼承了WrapperAdapter類
org.apache.shardingsphere.shardingjdbc.jdbc.adapter.WrapperAdapter

/**
 * Adapter for {@code java.sql.Wrapper}.
 */
public abstract class WrapperAdapter implements Wrapper {
    
    private final Collection<JdbcMethodInvocation> jdbcMethodInvocations = new ArrayList<>();
    
    @SuppressWarnings("unchecked")
    @Override
    public final <T> T unwrap(final Class<T> iface) throws SQLException {
        if (isWrapperFor(iface)) {
            return (T) this;
        }
        throw new SQLException(String.format("[%s] cannot be unwrapped as [%s]", getClass().getName(), iface.getName()));
    }
    
    @Override
    public final boolean isWrapperFor(final Class<?> iface) {
        return iface.isInstance(this);
    }
    
    /**
     * record method invocation.
     * 
     * @param targetClass target class
     * @param methodName method name
     * @param argumentTypes argument types
     * @param arguments arguments
     */
    @SneakyThrows
    public final void recordMethodInvocation(final Class<?> targetClass, final String methodName, final Class<?>[] argumentTypes, final Object[] arguments) {
        jdbcMethodInvocations.add(new JdbcMethodInvocation(targetClass.getMethod(methodName, argumentTypes), arguments));
    }
    
    /**
     * Replay methods invocation.
     * 
     * @param target target object
     */
    public final void replayMethodsInvocation(final Object target) {
        for (JdbcMethodInvocation each : jdbcMethodInvocations) {
            each.invoke(target);
        }
    }
}

該類實現(xiàn)了Wrapper接口,其內(nèi)部持有一個JdbcMethodInvocation的集合蹬音,它記錄了當(dāng)前JDBC資源的一些方法操作上煤,recordMethodInvocation方法負(fù)責(zé)記錄外圍應(yīng)用的一些設(shè)置方法的調(diào)用,例如setAutoCommit著淆、setReadOnly劫狠、setFetchSize、setMaxFieldSize等永部,在外圍程序調(diào)用ShardingConnection的setAutoCommit独泞、setReadOnly以及ShardingPreparedStatement的setFetchSize、setMaxFieldSize時進(jìn)行調(diào)用苔埋,replayMethodsInvocation完成在指定目標(biāo)對象回放這些方法調(diào)用懦砂,會在底層真實JDBC類(DB driver、數(shù)據(jù)庫連接池等)時進(jìn)行重新調(diào)用组橄。

WrapperAdapter是ShardingSphere各JDBC實現(xiàn)類的基礎(chǔ)基類荞膘,所以無論是DataSource、Connection玉工、Statement羽资、PrepareStatement都具備了該回放能力。

Wrapper是JDBC中提供的一個接口也是一種設(shè)計模式遵班,用于獲取JDBC代理類包裝的原始類屠升,在JDK該類的源碼有該接口的完整說明:
Interface for JDBC classes which provide the ability to retrieve the delegate instance when the instance in question is in fact a proxy class.
The wrapper pattern is employed by many JDBC driver implementations to provide extensions beyond
the traditional JDBC API that are specific to a data source. Developers may wish to gain access to
these resources that are wrapped (the delegates) as proxy class instances representing the
the actual resources. This interface describes a standard mechanism to access
these wrapped resources
represented by their proxy, to permit direct access to the resource delegates.

接下來看下AbstractDataSourceAdapter 類org.apache.shardingsphere.shardingjdbc.jdbc.adapter.AbstractDataSourceAdapter

/**
 * Adapter for {@code Datasource}.
 */
@Getter
public abstract class AbstractDataSourceAdapter extends AbstractUnsupportedOperationDataSource implements AutoCloseable {
    
    private final Map<String, DataSource> dataSourceMap;
    
    private final DatabaseType databaseType;
    
    @Setter
    private PrintWriter logWriter = new PrintWriter(System.out);
    
    public AbstractDataSourceAdapter(final Map<String, DataSource> dataSourceMap) throws SQLException {
        this.dataSourceMap = dataSourceMap;
        databaseType = createDatabaseType();
    }
…
    @Override
    public final Connection getConnection(final String username, final String password) throws SQLException {
        return getConnection();
    }
    
    @Override
    public final void close() throws Exception {
        close(dataSourceMap.keySet());
    }
    …
    protected abstract RuntimeContext getRuntimeContext();
}

可以看到,這個抽象的DataSource適配器類狭郑,其內(nèi)部維護(hù)了一個真實DataSource(可以是數(shù)據(jù)庫驅(qū)動提供的腹暖,也可以是第三方數(shù)據(jù)庫連接)的map以及數(shù)據(jù)庫類型,另外定義一個抽象方法生成對應(yīng)的RuntimeContext翰萨。RuntimeContext作為各JDBC各資源對象間傳遞的一個上下文對象脏答,其定義了對應(yīng)規(guī)則、屬性缨历、數(shù)據(jù)庫類型以蕴、執(zhí)行引擎以及SQL解析引擎。org.apache.shardingsphere.shardingjdbc.jdbc.core.context.RuntimeContext

/**
 * Runtime context.
 *
 * @param <T> type of rule
 */
public interface RuntimeContext<T extends BaseRule> extends AutoCloseable {
    
    /**
     * Get rule.
     * 
     * @return rule
     */
    T getRule();
    
    /**
     * Get properties.
     *
     * @return properties
     */
    ConfigurationProperties getProperties();
    
    /**
     * Get database type.
     * 
     * @return database type
     */
    DatabaseType getDatabaseType();
    
    /**
     * Get execute engine.
     * 
     * @return execute engine
     */
    ExecutorEngine getExecutorEngine();
    
    /**
     * Get SQL parser engine.
     * 
     * @return SQL parser engine
     */
    SQLParserEngine getSqlParserEngine();
}

不同的功能都有對應(yīng)的RuntimeContext實現(xiàn)類辛孵,其的類層次圖為:


首先看下AbstractRuntimeContext
org.apache.shardingsphere.shardingjdbc.jdbc.core.context.AbstractRuntimeContext

/**
 * Abstract runtime context.
 *
 * @param <T> type of rule
 */
@Getter
public abstract class AbstractRuntimeContext<T extends BaseRule> implements RuntimeContext<T> {
    
    private final T rule;
    
    private final ConfigurationProperties properties;
    
    private final DatabaseType databaseType;
    
    private final ExecutorEngine executorEngine;
    
    private final SQLParserEngine sqlParserEngine;
    
    protected AbstractRuntimeContext(final T rule, final Properties props, final DatabaseType databaseType) {
        this.rule = rule;
        properties = new ConfigurationProperties(null == props ? new Properties() : props);
        this.databaseType = databaseType;
        executorEngine = new ExecutorEngine(properties.<Integer>getValue(ConfigurationPropertyKey.EXECUTOR_SIZE));
        sqlParserEngine = SQLParserEngineFactory.getSQLParserEngine(DatabaseTypes.getTrunkDatabaseTypeName(databaseType));
        ConfigurationLogger.log(rule.getRuleConfiguration());
        ConfigurationLogger.log(props);
    }
    
protected abstract ShardingSphereMetaData getMetaData();
…
}

可以看到在構(gòu)造函數(shù)中丛肮,對執(zhí)行引擎、解析引擎等屬性進(jìn)行賦值與初始化魄缚,另外關(guān)鍵的是其定義了一個抽象方法getMetaData宝与,其返回ShardingSphereMetaData焚廊。各功能實現(xiàn)的RuntimeContex實現(xiàn)類的主要邏輯都為圍繞如果生成ShardingSphereMetaData對象。根據(jù)數(shù)據(jù)源是單個還是多個习劫,又分別定義了兩個抽象類SingleDataSourceRuntimeContext和MultipleDataSourcesRuntimeContext咆瘟,其操作就是根據(jù)應(yīng)用傳入的DataSource的map,然后生成DataSourceMetas與SchemaMetaData對象從而構(gòu)建ShardingSphereMetaData實例诽里,其中后者是需要真正的功能RuntimeContext類這種實現(xiàn)袒餐,例如在數(shù)據(jù)分片執(zhí)行上下文類中

/**
 * Runtime context for sharding.
 */
@Getter
public final class ShardingRuntimeContext extends MultipleDataSourcesRuntimeContext<ShardingRule> {
    
    private final CachedDatabaseMetaData cachedDatabaseMetaData;
    
    private final ShardingTransactionManagerEngine shardingTransactionManagerEngine;
    
    public ShardingRuntimeContext(final Map<String, DataSource> dataSourceMap, final ShardingRule shardingRule, final Properties props, final DatabaseType databaseType) throws SQLException {
        super(dataSourceMap, shardingRule, props, databaseType);
        cachedDatabaseMetaData = createCachedDatabaseMetaData(dataSourceMap);// 創(chuàng)建緩存元數(shù)據(jù)
        shardingTransactionManagerEngine = new ShardingTransactionManagerEngine();// 創(chuàng)建事務(wù)管理器
        shardingTransactionManagerEngine.init(databaseType, dataSourceMap);
    }

    private CachedDatabaseMetaData createCachedDatabaseMetaData(final Map<String, DataSource> dataSourceMap) throws SQLException {
        try (Connection connection = dataSourceMap.values().iterator().next().getConnection()) {
            return new CachedDatabaseMetaData(connection.getMetaData());
        }
    }
    
    @Override
    protected SchemaMetaData loadSchemaMetaData(final Map<String, DataSource> dataSourceMap) throws SQLException {
        int maxConnectionsSizePerQuery = getProperties().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
        boolean isCheckingMetaData = getProperties().<Boolean>getValue(ConfigurationPropertyKey.CHECK_TABLE_METADATA_ENABLED);
        SchemaMetaData result = new ShardingMetaDataLoader(dataSourceMap, getRule(), maxConnectionsSizePerQuery, isCheckingMetaData).load(getDatabaseType());
        result = SchemaMetaDataDecorator.decorate(result, getRule(), new ShardingTableMetaDataDecorator());
        if (!getRule().getEncryptRule().getEncryptTableNames().isEmpty()) {
            result = SchemaMetaDataDecorator.decorate(result, getRule().getEncryptRule(), new EncryptTableMetaDataDecorator());
        }
        return result;
    }
..
}

ShardingSphereMetaData類中定義了關(guān)于數(shù)據(jù)庫、表谤狡、索引的元數(shù)據(jù)灸眼,作為RuntimeContext的一部分,這些元數(shù)據(jù)在后續(xù)各引擎都會使用到墓懂。這些元數(shù)據(jù)類并不復(fù)雜焰宣,快速瀏覽下:
org.apache.shardingsphere.underlying.common.metadata.ShardingSphereMetaData

/**
 * ShardingSphere meta data.
 */
@RequiredArgsConstructor
@Getter
public final class ShardingSphereMetaData {
    
    private final DataSourceMetas dataSources;
    
    private final SchemaMetaData schema;
}

org.apache.shardingsphere.underlying.common.metadata.datasource.DataSourceMetas

/**
 * Data source metas.
 */
public final class DataSourceMetas {
    
private final Map<String, DataSourceMetaData> dataSourceMetaDataMap;
…
}

數(shù)據(jù)源元數(shù)據(jù)
org.apache.shardingsphere.spi.database.metadata.DataSourceMetaData


/**
 * Data source meta data.
 */
public interface DataSourceMetaData {
    
    /**
     * Get host name.
     * 
     * @return host name
     */
    String getHostName();
    
    /**
     * Get port.
     * 
     * @return port
     */
    int getPort();
    
    /**
     * Get catalog.
     *
     * @return catalog
     */
    String getCatalog();
    
    /**
     * Get schema.
     * 
     * @return schema
     */
    String getSchema();
}

Schema元數(shù)據(jù)org.apache.shardingsphere.sql.parser.binder.metadata.schema.SchemaMetaData

/**
 * Schema meta data.
 */
public final class SchemaMetaData {
    
private final Map<String, TableMetaData> tables;
…
}

表元數(shù)據(jù)org.apache.shardingsphere.sql.parser.binder.metadata.table.TableMetaData

/**
 * Table meta data.
 */
@Getter
@EqualsAndHashCode
@ToString
public final class TableMetaData {
    
    private final Map<String, ColumnMetaData> columns;
    
private final Map<String, IndexMetaData> indexes;
…
}

列元數(shù)據(jù)org.apache.shardingsphere.sql.parser.binder.metadata.column.ColumnMetaData

/**
 * Column meta data.
 */
@RequiredArgsConstructor
@Getter
@EqualsAndHashCode
@ToString
public class ColumnMetaData {
    
    private final String name;
    
    private final int dataType;
    
    private final String dataTypeName;
    
    private final boolean primaryKey;
    
    private final boolean generated;
    
    private final boolean caseSensitive;
}

索引元數(shù)據(jù)org.apache.shardingsphere.sql.parser.binder.metadata.index.IndexMetaData

/**
 * Index meta data.
 */
@RequiredArgsConstructor
@Getter
@EqualsAndHashCode
@ToString
public final class IndexMetaData {
    
    private final String name;
}

這些元數(shù)據(jù)類的加載分別有對應(yīng)的加載類SchemaMetaDataLoader、TableMetaDataLoader捕仔、ColumnMetaDataLoader匕积、IndexMetaDataLoader,其核心邏輯都是通過獲取數(shù)據(jù)庫連接Connection實例榜跌,然后通過其getMetaData()獲得DatabaseMetaData實例闪唆,然后調(diào)用getSchemas、getTables钓葫、getColums等方法拿到對應(yīng)的表與列信息苞氮。例如ColumnMetaDataLoader。

org.apache.shardingsphere.sql.parser.binder.metadata.column.ColumnMetaDataLoader

/**
 * Column meta data loader.
 */
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class ColumnMetaDataLoader {
    
    private static final String COLUMN_NAME = "COLUMN_NAME";
    
    private static final String DATA_TYPE = "DATA_TYPE";
    
    private static final String TYPE_NAME = "TYPE_NAME";
    
    /**
     * Load column meta data list.
     * 
     * @param connection connection
     * @param table table name
     * @param databaseType database type
     * @return column meta data list
     * @throws SQLException SQL exception
     */
    public static Collection<ColumnMetaData> load(final Connection connection, final String table, final String databaseType) throws SQLException {
        if (!isTableExist(connection, connection.getCatalog(), table, databaseType)) {
            return Collections.emptyList();
        }
        Collection<ColumnMetaData> result = new LinkedList<>();
        Collection<String> primaryKeys = loadPrimaryKeys(connection, table, databaseType);
        List<String> columnNames = new ArrayList<>();
        List<Integer> columnTypes = new ArrayList<>();
        List<String> columnTypeNames = new ArrayList<>();
        List<Boolean> isPrimaryKeys = new ArrayList<>();
        List<Boolean> isCaseSensitives = new ArrayList<>();
        try (ResultSet resultSet = connection.getMetaData().getColumns(connection.getCatalog(), JdbcUtil.getSchema(connection, databaseType), table, "%")) {
            while (resultSet.next()) {
                String columnName = resultSet.getString(COLUMN_NAME);
                columnTypes.add(resultSet.getInt(DATA_TYPE));
                columnTypeNames.add(resultSet.getString(TYPE_NAME));
                isPrimaryKeys.add(primaryKeys.contains(columnName));
                columnNames.add(columnName);
            }
        }
        try (ResultSet resultSet = connection.createStatement().executeQuery(generateEmptyResultSQL(table, databaseType))) {
            for (String each : columnNames) {
                isCaseSensitives.add(resultSet.getMetaData().isCaseSensitive(resultSet.findColumn(each)));
            }
        }
        for (int i = 0; i < columnNames.size(); i++) {
            // TODO load auto generated from database meta data
            result.add(new ColumnMetaData(columnNames.get(i), columnTypes.get(i), columnTypeNames.get(i), isPrimaryKeys.get(i), false, isCaseSensitives.get(i)));
        }
        return result;
}
…
}

回到功能對應(yīng)的DataSource最終實現(xiàn)類瓤逼,例如數(shù)據(jù)分片ShardingDataSource類

org.apache.shardingsphere.shardingjdbc.jdbc.core.datasource.ShardingDataSource

/**
 * Sharding data source.
 */
@Getter
public class ShardingDataSource extends AbstractDataSourceAdapter {
    
    private final ShardingRuntimeContext runtimeContext;
    
    static {
        NewInstanceServiceLoader.register(RouteDecorator.class);
        NewInstanceServiceLoader.register(SQLRewriteContextDecorator.class);
        NewInstanceServiceLoader.register(ResultProcessEngine.class);
    }
    
    public ShardingDataSource(final Map<String, DataSource> dataSourceMap, final ShardingRule shardingRule, final Properties props) throws SQLException {
        super(dataSourceMap);
        checkDataSourceType(dataSourceMap);
        runtimeContext = new ShardingRuntimeContext(dataSourceMap, shardingRule, props, getDatabaseType());
    }
    
    private void checkDataSourceType(final Map<String, DataSource> dataSourceMap) {
        for (DataSource each : dataSourceMap.values()) {
            Preconditions.checkArgument(!(each instanceof MasterSlaveDataSource), "Initialized data sources can not be master-slave data sources.");
        }
    }
    
    @Override
    public final ShardingConnection getConnection() {
        return new ShardingConnection(getDataSourceMap(), runtimeContext, TransactionTypeHolder.get());
    }
}
…
}

可以看到,在static代碼中注冊了該功能需要的路由裝飾器库物、SQL重寫上下文裝飾器以及結(jié)果處理引擎霸旗,該類實現(xiàn)了getConnection()方法。

Connection

在ShardingSphere中Connection的實現(xiàn)類層次圖:


ShardingSphere中Connection接口的實現(xiàn)類圖

與DataSource類似戚揭,ShardingSphere在Connection接口的實現(xiàn)中诱告,也是先定義了一個AbstractUnsupportedOperationConnection類,對于主從和數(shù)據(jù)分片民晒,又定義了一個抽象Connection適配器類精居。

/**
 * Adapter for {@code Connection}.
 */
public abstract class AbstractConnectionAdapter extends AbstractUnsupportedOperationConnection {
    
    @Getter
    private final Multimap<String, Connection> cachedConnections = LinkedHashMultimap.create();
    
    @Getter
    private final ForceExecuteTemplate<Connection> forceExecuteTemplate = new ForceExecuteTemplate<>();
    
    private final ForceExecuteTemplate<Entry<String, Connection>> forceExecuteTemplateForClose = new ForceExecuteTemplate<>();
    
    private final RootInvokeHook rootInvokeHook = new SPIRootInvokeHook();
    
    private boolean autoCommit = true;
    
    private boolean readOnly;
    
    private volatile boolean closed;
    
    private int transactionIsolation = TRANSACTION_READ_UNCOMMITTED;
    
    protected AbstractConnectionAdapter() {
        rootInvokeHook.start();
    }
    
    /**
     * Get database connection.
     *
     * @param dataSourceName data source name
     * @return database connection
     * @throws SQLException SQL exception
     */
    public final Connection getConnection(final String dataSourceName) throws SQLException {
        return getConnections(ConnectionMode.MEMORY_STRICTLY, dataSourceName, 1).get(0);
    }
    
    /**
     * Get database connections.
     *
     * @param connectionMode connection mode
     * @param dataSourceName data source name
     * @param connectionSize size of connection list to be get
     * @return database connections
     * @throws SQLException SQL exception
     */
    public final List<Connection> getConnections(final ConnectionMode connectionMode, final String dataSourceName, final int connectionSize) throws SQLException {
        DataSource dataSource = getDataSourceMap().get(dataSourceName);
        Preconditions.checkState(null != dataSource, "Missing the data source name: '%s'", dataSourceName);
        Collection<Connection> connections;
        synchronized (cachedConnections) {
            connections = cachedConnections.get(dataSourceName);
        }
        List<Connection> result;
        if (connections.size() >= connectionSize) {
            result = new ArrayList<>(connections).subList(0, connectionSize);
        } else if (!connections.isEmpty()) {
            result = new ArrayList<>(connectionSize);
            result.addAll(connections);
            List<Connection> newConnections = createConnections(dataSourceName, connectionMode, dataSource, connectionSize - connections.size());
            result.addAll(newConnections);
            synchronized (cachedConnections) {
                cachedConnections.putAll(dataSourceName, newConnections);
            }
        } else {
            result = new ArrayList<>(createConnections(dataSourceName, connectionMode, dataSource, connectionSize));
            synchronized (cachedConnections) {
                cachedConnections.putAll(dataSourceName, result);
            }
        }
        return result;
    }
    
    @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
    private List<Connection> createConnections(final String dataSourceName, final ConnectionMode connectionMode, final DataSource dataSource, final int connectionSize) throws SQLException {
        if (1 == connectionSize) {
            Connection connection = createConnection(dataSourceName, dataSource);
            replayMethodsInvocation(connection);
            return Collections.singletonList(connection);
        }
        if (ConnectionMode.CONNECTION_STRICTLY == connectionMode) {
            return createConnections(dataSourceName, dataSource, connectionSize);
        }
        synchronized (dataSource) {
            return createConnections(dataSourceName, dataSource, connectionSize);
        }
}
…
}

可以看到該類內(nèi)部持有一個Multimap<String, Connection> cachedConnections,該類為guava中支持集合value的一個map潜必,類似Map<String,List<Connection>>靴姿,ShardingSphere選擇此類型Map,是因為同一數(shù)據(jù)源可能生成多個數(shù)據(jù)庫連接磁滚。
在getConnections方法中佛吓,根據(jù)cachedConnections已有的連接和目標(biāo)連接數(shù)宵晚,如果小于目標(biāo)連接數(shù),則創(chuàng)建相差的連接维雇。

值得注意的cachedConnections是否有必要加同步鎖淤刃?

一般而言,各數(shù)據(jù)庫驅(qū)動提供的Connection吱型、Statement并不是線程安全的逸贾,也就是說并不支持并發(fā)操作。通過查看該方法的調(diào)用鏈ShardingPreparedStatement.executeQuery-> PreparedStatementExecutor.init-> PreparedStatementExecutor .obtainExecuteGroups-> AbstractConnectionAdapter.getConnection津滞,可以看到如果并發(fā)執(zhí)行ShardingPreparedStatement.executeQuery方法就會產(chǎn)生對cachedConnections的操作铝侵,這里對cachedConnections加synchronized,應(yīng)該也是為了保證線程安全性据沈,雖然應(yīng)用這種用法其實并不正確哟沫。

這個類還有好多屬性設(shè)置方法,例如setReadOnly方法锌介,其中recordMethodInvocation方法是為了后續(xù)新建的連接都能通過回放已設(shè)置的屬性嗜诀,而forceExecuteTemplate.execute則將已創(chuàng)建的連接分別設(shè)置該屬性值。

  public final void setReadOnly(final boolean readOnly) throws SQLException {
        this.readOnly = readOnly;
        recordMethodInvocation(Connection.class, "setReadOnly", new Class[]{boolean.class}, new Object[]{readOnly});
        forceExecuteTemplate.execute(cachedConnections.values(), connection -> connection.setReadOnly(readOnly));
    }

看完Connection的抽象適配器類后孔祸,看下其功能實現(xiàn)類隆敢,數(shù)據(jù)分片ShardingConnection,主從MasterSlaveConnection
org.apache.shardingsphere.shardingjdbc.jdbc.core.connection.ShardingConnection

/**
 * Connection that support sharding.
 */
@Getter
public final class ShardingConnection extends AbstractConnectionAdapter {
    
    private final Map<String, DataSource> dataSourceMap;
    
    private final ShardingRuntimeContext runtimeContext;
    
    private final TransactionType transactionType;
    
    private final ShardingTransactionManager shardingTransactionManager;
    
    public ShardingConnection(final Map<String, DataSource> dataSourceMap, final ShardingRuntimeContext runtimeContext, final TransactionType transactionType) {
        this.dataSourceMap = dataSourceMap;
        this.runtimeContext = runtimeContext;
        this.transactionType = transactionType;
        shardingTransactionManager = runtimeContext.getShardingTransactionManagerEngine().getTransactionManager(transactionType);
    }
    
    /**
     * Whether hold transaction or not.
     *
     * @return true or false
     */
    public boolean isHoldTransaction() {
        return (TransactionType.LOCAL == transactionType && !getAutoCommit()) || (TransactionType.XA == transactionType && isInShardingTransaction());
    }
    
    @Override
    protected Connection createConnection(final String dataSourceName, final DataSource dataSource) throws SQLException {
        return isInShardingTransaction() ? shardingTransactionManager.getConnection(dataSourceName) : dataSource.getConnection();
    }
    
    private boolean isInShardingTransaction() {
        return null != shardingTransactionManager && shardingTransactionManager.isInTransaction();
    }
    
    @Override
    public DatabaseMetaData getMetaData() {
        return new ShardingDatabaseMetaData(this);
    }
    
    @Override
    public PreparedStatement prepareStatement(final String sql) throws SQLException {
        return new ShardingPreparedStatement(this, sql);
    }
    
    @Override
    public PreparedStatement prepareStatement(final String sql, final int resultSetType, final int resultSetConcurrency) throws SQLException {
        return new ShardingPreparedStatement(this, sql, resultSetType, resultSetConcurrency);
}
...
@Override
    public Statement createStatement() {
        return new ShardingStatement(this);
    }
    
    @Override
    public Statement createStatement(final int resultSetType, final int resultSetConcurrency) {
        return new ShardingStatement(this, resultSetType, resultSetConcurrency);
    }
…
}

可以看到prepareStatement方法返回的是ShardingPreparedStatement實例崔慧,createStatement方法返回的是ShardingStatement拂蝎。
另外createConnection方法(AbstractConnectionAdapter類會調(diào)用該方法創(chuàng)建Connection)中,判斷是否為分片事務(wù)惶室,如果是則通過分片事務(wù)管理器獲取連接温自。
org.apache.shardingsphere.shardingjdbc.jdbc.core.connection.MasterSlaveConnection

/**
 * Connection that support master-slave.
 */
@RequiredArgsConstructor
@Getter
public final class MasterSlaveConnection extends AbstractConnectionAdapter {
    
    private final Map<String, DataSource> dataSourceMap;
    
    private final MasterSlaveRuntimeContext runtimeContext;
    
    @Override
    protected Connection createConnection(final String dataSourceName, final DataSource dataSource) throws SQLException {
        return dataSource.getConnection();
    }
    
    @Override
    public DatabaseMetaData getMetaData() {
        return new MasterSlaveDatabaseMetaData(this);
    }
    
    @Override
    public Statement createStatement() {
        return new MasterSlaveStatement(this);
    }
    
    @Override
    public Statement createStatement(final int resultSetType, final int resultSetConcurrency) {
        return new MasterSlaveStatement(this, resultSetType, resultSetConcurrency);
    }
…
}

相比ShardingConnection,MasterSlaveConnection類則簡單一些皇钞,因為不涉及分片事務(wù)悼泌,可以看到創(chuàng)建的Statement分別為MasterSlavePreparedStatement和MasterSlaveStatement。

Statement

接下來我們看下ShardingSphere中Statement的各個實現(xiàn)類夹界,類圖如下:


ShaShardingSphere中Statement接口的實現(xiàn)類圖

與DataSource與Connection接口實現(xiàn)類一樣馆里,分別定義了兩個抽象不支持類AbstractUnsupportedOperationStatement和AbstractUnsupportedOperationPreparedStatement,分別對Statement和PreparedStatement接口中不支持的方法進(jìn)行默認(rèn)實現(xiàn)(直接拋出SQLFeatureNotSupportedException異常)可柿。

org.apache.shardingsphere.shardingjdbc.jdbc.unsupported.AbstractUnsupportedOperationStatement

/**
 * Unsupported {@code Statement} METHODS.
 */
public abstract class AbstractUnsupportedOperationStatement extends WrapperAdapter implements Statement {
    
    @Override
    public final int getFetchDirection() throws SQLException {
        throw new SQLFeatureNotSupportedException("getFetchDirection");
    }
    
    @Override
    public final void setFetchDirection(final int direction) throws SQLException {
        throw new SQLFeatureNotSupportedException("setFetchDirection");
    }
    
    @Override
    public final void addBatch(final String sql) throws SQLException {
        throw new SQLFeatureNotSupportedException("addBatch sql");
    }
    
    @Override
    public void clearBatch() throws SQLException {
        throw new SQLFeatureNotSupportedException("clearBatch");
    }
…
}

org.apache.shardingsphere.shardingjdbc.jdbc.unsupported.AbstractUnsupportedOperationPreparedStatement

**
 * Unsupported {@code PreparedStatement} methods.
 */
public abstract class AbstractUnsupportedOperationPreparedStatement extends AbstractStatementAdapter implements PreparedStatement {
    
    public AbstractUnsupportedOperationPreparedStatement() {
        super(PreparedStatement.class);
    }
    
    @Override
    public final ResultSetMetaData getMetaData() throws SQLException {
        throw new SQLFeatureNotSupportedException("getMetaData");
    }
    
    /**
     * Get parameter meta data.
     *
     * @return parameter metadata
     * @throws SQLException SQL exception
     */
    @Override
    public ParameterMetaData getParameterMetaData() throws SQLException {
        throw new SQLFeatureNotSupportedException("ParameterMetaData");
    }
    
    @Override
    public final void setNString(final int parameterIndex, final String x) throws SQLException {
        throw new SQLFeatureNotSupportedException("setNString");
    }
…
}

這兩個類作為抽象父類鸠踪,只是提供了一個默認(rèn)實現(xiàn),在最終的【功能】實現(xiàn)類中复斥,根據(jù)功能邏輯對部分方法進(jìn)行了重寫實現(xiàn)营密。

來看下Statement分支的另外一個抽象類
org.apache.shardingsphere.shardingjdbc.jdbc.adapter.AbstractStatementAdapter

/**
 * Adapter for {@code Statement}.
 */
@RequiredArgsConstructor
public abstract class AbstractStatementAdapter extends AbstractUnsupportedOperationStatement {
    
    private final Class<? extends Statement> targetClass;
    
    private boolean closed;
    
    private boolean poolable;
    
    private int fetchSize;
    
    private final ForceExecuteTemplate<Statement> forceExecuteTemplate = new ForceExecuteTemplate<>();
    
    @SuppressWarnings("unchecked")
    @Override
    public final void close() throws SQLException {
        closed = true;
        try {
            forceExecuteTemplate.execute((Collection) getRoutedStatements(), Statement::close);
        } finally {
            getRoutedStatements().clear();
        }
    }
    
    @Override
    public final boolean isClosed() {
        return closed;
}
…
@Override
    public final int getFetchSize() {
        return fetchSize;
    }
    
    @SuppressWarnings("unchecked")
    @Override
    public final void setFetchSize(final int rows) throws SQLException {
        this.fetchSize = rows;
        recordMethodInvocation(targetClass, "setFetchSize", new Class[] {int.class}, new Object[] {rows});
        forceExecuteTemplate.execute((Collection) getRoutedStatements(), statement -> statement.setFetchSize(rows));
    }
…
@Override
    public final int getUpdateCount() throws SQLException {
        if (isAccumulate()) {
            return accumulate();
        } else {
            Collection<? extends Statement> statements = getRoutedStatements();
            if (statements.isEmpty()) {
                return -1;
            }
            return getRoutedStatements().iterator().next().getUpdateCount();
        }
    }
    
    private int accumulate() throws SQLException {
        long result = 0;
        boolean hasResult = false;
        for (Statement each : getRoutedStatements()) {
            int updateCount = each.getUpdateCount();
            if (updateCount > -1) {
                hasResult = true;
            }
            result += updateCount;
        }
        if (result > Integer.MAX_VALUE) {
            result = Integer.MAX_VALUE;
        }
        return hasResult ? Long.valueOf(result).intValue() : -1;
    }
…
    protected abstract boolean isAccumulate();
    
    protected abstract Collection<? extends Statement> getRoutedStatements();
}

從代碼中可以看到該類實現(xiàn)了close、poolable永票、fetchSize卵贱、maxFieldSize滥沫、maxRows岖常、queryTimeout的getter和setter方法暂筝,其實現(xiàn)邏輯是類似的,即依次設(shè)置其底層對應(yīng)的Statement的對應(yīng)方法聋丝,這里依然使用了recordMethodInvocation和forceExecuteTemplate.execute方法编振。

該類中定義了一個抽象方法getRoutedStatements()缀辩,該方法返回路由后對應(yīng)的真正底層的Statement實現(xiàn)類,不同功能中該方法實現(xiàn)不同踪央,例如數(shù)據(jù)分片ShardingPreparedStatement中就是路由的Statement臀玄,可能有多個,而主從畅蹂、加密健无、影子表中該方法則只有一個。

該類中另外一個抽象方法是isAccumulate()液斜,這個方法判斷是否需要進(jìn)行累計累贤,在getUpdateCount()方法中,會首先判斷isAccumulate()的值少漆,如果是true臼膏,則需要將getRoutedStatements()返回的各Statement的getUpdateCount()值進(jìn)行累計。此方法的子類實現(xiàn)中示损,數(shù)據(jù)分片中如果并非全部是廣播表時渗磅,此方法返回即為true,主從检访、加密始鱼、影子表
此方法返回則都為false。

ShardingPreparedStatement和ShardingStatement類中

    @Override
    public boolean isAccumulate() {
return !connection.getRuntimeContext().getRule().isAllBroadcastTables(executionContext.getSqlStatementContext().getTablesContext().getTableNames());
    }

接下來我們看下看下數(shù)據(jù)分片對應(yīng)的Statement實現(xiàn)類
org.apache.shardingsphere.shardingjdbc.jdbc.core.statement.ShardingStatement

**
 * Statement that support sharding.
 */
public final class ShardingStatement extends AbstractStatementAdapter {
    
    @Getter
    private final ShardingConnection connection;
    
    private final StatementExecutor statementExecutor;
    
    private boolean returnGeneratedKeys;
    
    private ExecutionContext executionContext;
    
    private ResultSet currentResultSet;
    
    public ShardingStatement(final ShardingConnection connection) {
        this(connection, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY, ResultSet.HOLD_CURSORS_OVER_COMMIT);
    }
    
    public ShardingStatement(final ShardingConnection connection, final int resultSetType, final int resultSetConcurrency) {
        this(connection, resultSetType, resultSetConcurrency, ResultSet.HOLD_CURSORS_OVER_COMMIT);
    }
    
    public ShardingStatement(final ShardingConnection connection, final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability) {
        super(Statement.class);
        this.connection = connection;
        statementExecutor = new StatementExecutor(resultSetType, resultSetConcurrency, resultSetHoldability, connection);
    }
    
    @Override
    public ResultSet executeQuery(final String sql) throws SQLException {
        if (Strings.isNullOrEmpty(sql)) {
            throw new SQLException(SQLExceptionConstant.SQL_STRING_NULL_OR_EMPTY);
        }
        ResultSet result;
        try {
            executionContext = prepare(sql);
            List<QueryResult> queryResults = statementExecutor.executeQuery();
            MergedResult mergedResult = mergeQuery(queryResults);
            result = new ShardingResultSet(statementExecutor.getResultSets(), mergedResult, this, executionContext);
        } finally {
            currentResultSet = null;
        }
        currentResultSet = result;
        return result;
    }
    
    @Override
    public int executeUpdate(final String sql) throws SQLException {
        try {
            executionContext = prepare(sql);
            return statementExecutor.executeUpdate();
        } finally {
            currentResultSet = null;
        }
    }
…
private ExecutionContext prepare(final String sql) throws SQLException {
        statementExecutor.clear();
        ShardingRuntimeContext runtimeContext = connection.getRuntimeContext();
        BasePrepareEngine prepareEngine = new SimpleQueryPrepareEngine(
                runtimeContext.getRule().toRules(), runtimeContext.getProperties(), runtimeContext.getMetaData(), runtimeContext.getSqlParserEngine());
        ExecutionContext result = prepareEngine.prepare(sql, Collections.emptyList());
        statementExecutor.init(result);
        statementExecutor.getStatements().forEach(this::replayMethodsInvocation);
        return result;
    }
    
    private MergedResult mergeQuery(final List<QueryResult> queryResults) throws SQLException {
        ShardingRuntimeContext runtimeContext = connection.getRuntimeContext();
        MergeEngine mergeEngine = new MergeEngine(runtimeContext.getRule().toRules(), runtimeContext.getProperties(), runtimeContext.getDatabaseType(), runtimeContext.getMetaData().getSchema());
        return mergeEngine.merge(queryResults, executionContext.getSqlStatementContext());
    }
…
}

在該類的SQL執(zhí)行方法executeQuery脆贵、executeUpdate风响、execute方法中可以看到,其內(nèi)部先執(zhí)行了prepare方法丹禀,然后調(diào)用了StatementExecutor的executeQuery、executeUpdate鞋怀、execute方法双泪。
在prepare方法中,首先通過SimpleQueryPrepareEngine.prepare方法完成SQL的解析密似、路由計算焙矛、SQL改寫操作(這里已經(jīng)屬于內(nèi)核引擎部分,在總覽篇中已介紹残腌,本篇就不重復(fù)展開)村斟,然后對StatementExecutor進(jìn)行初始化以及底層Statement的方法回放(例如setFetchSize贫导、setQueryTimeout)。

真正的SQL執(zhí)行其實是由StatementExecutor完成的蟆盹,看下StatementExecutor的類層次圖:

org.apache.shardingsphere.shardingjdbc.executor.AbstractStatementExecutor

/**
 * Abstract statement executor.
 */
@Getter
public abstract class AbstractStatementExecutor {
    
    private final DatabaseType databaseType;
    
    private final int resultSetType;
    
    private final int resultSetConcurrency;
    
    private final int resultSetHoldability;
    
    private final ShardingConnection connection;
    
    private final SQLExecutePrepareTemplate sqlExecutePrepareTemplate;
    
    private final SQLExecuteTemplate sqlExecuteTemplate;
    
    private final Collection<Connection> connections = new LinkedList<>();
    
    private final List<List<Object>> parameterSets = new LinkedList<>();
    
    private final List<Statement> statements = new LinkedList<>();
    
    private final List<ResultSet> resultSets = new CopyOnWriteArrayList<>();
    
    private final Collection<InputGroup<StatementExecuteUnit>> inputGroups = new LinkedList<>();
    
    @Setter
    private SQLStatementContext sqlStatementContext;
    
    public AbstractStatementExecutor(final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability, final ShardingConnection shardingConnection) {
        this.databaseType = shardingConnection.getRuntimeContext().getDatabaseType();
        this.resultSetType = resultSetType;
        this.resultSetConcurrency = resultSetConcurrency;
        this.resultSetHoldability = resultSetHoldability;
        this.connection = shardingConnection;
        int maxConnectionsSizePerQuery = connection.getRuntimeContext().getProperties().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
        ExecutorEngine executorEngine = connection.getRuntimeContext().getExecutorEngine();
        sqlExecutePrepareTemplate = new SQLExecutePrepareTemplate(maxConnectionsSizePerQuery);
        sqlExecuteTemplate = new SQLExecuteTemplate(executorEngine, connection.isHoldTransaction());
    }
    
…
protected final <T> List<T> executeCallback(final SQLExecuteCallback<T> executeCallback) throws SQLException {
        List<T> result = sqlExecuteTemplate.execute((Collection) inputGroups, executeCallback);// 對輸入的分組對象孩灯,執(zhí)行指定的回調(diào)操作
        refreshMetaDataIfNeeded(connection.getRuntimeContext(), sqlStatementContext);// 刷新元數(shù)據(jù), 因為DDL會修改元數(shù)據(jù)信息
        return result;
    }
…
    private void refreshMetaDataIfNeeded(final ShardingRuntimeContext runtimeContext, final SQLStatementContext sqlStatementContext) throws SQLException {
        if (null == sqlStatementContext) {
            return;
        }
        if (sqlStatementContext instanceof CreateTableStatementContext) {
            refreshTableMetaData(runtimeContext, ((CreateTableStatementContext) sqlStatementContext).getSqlStatement());
        } else if (sqlStatementContext instanceof AlterTableStatementContext) {
            refreshTableMetaData(runtimeContext, ((AlterTableStatementContext) sqlStatementContext).getSqlStatement());
        } else if (sqlStatementContext instanceof DropTableStatementContext) {
            refreshTableMetaData(runtimeContext, ((DropTableStatementContext) sqlStatementContext).getSqlStatement());
        } else if (sqlStatementContext instanceof CreateIndexStatementContext) {
            refreshTableMetaData(runtimeContext, ((CreateIndexStatementContext) sqlStatementContext).getSqlStatement());
        } else if (sqlStatementContext instanceof DropIndexStatementContext) {
            refreshTableMetaData(runtimeContext, ((DropIndexStatementContext) sqlStatementContext).getSqlStatement());
        }
    }
    
    private void refreshTableMetaData(final ShardingRuntimeContext runtimeContext, final CreateTableStatement createTableStatement) throws SQLException {
        String tableName = createTableStatement.getTable().getTableName().getIdentifier().getValue();
        runtimeContext.getMetaData().getSchema().put(tableName, loadTableMeta(tableName, databaseType));
}
….
}

AbstractStatementExecutor是各類StatementExecutor的抽象類逾滥,它包含了Statement執(zhí)行的各種信息峰档,包括resultSetType、resultSetConcurrency寨昙、resultSetHoldability讥巡,另外也持有ShardingSphere的Connection實例、路由后的真實Statement列表舔哪,參數(shù)集合欢顷、執(zhí)行后的結(jié)果集、執(zhí)行分組信息捉蚤、以及執(zhí)行準(zhǔn)備引擎抬驴、執(zhí)行引擎。

在executeCallback 方法中外里,可以看到其內(nèi)部是通過SQLExecuteTemplate來真正完成SQL的執(zhí)行怎爵,SQLExecuteTemplate是執(zhí)行引擎的關(guān)鍵類,已在執(zhí)行引擎篇進(jìn)行了分析盅蝗,這里就不展開鳖链;之后調(diào)用了refreshMetaDataIfNeeded方法,對元數(shù)據(jù)進(jìn)行了刷新墩莫,因為當(dāng)執(zhí)行DDL語句時芙委,表名、列名狂秦、字段類型灌侣、主鍵以及索引等信息可能發(fā)生了變化,因此需要同步對元數(shù)據(jù)進(jìn)行修改裂问。這里的元數(shù)據(jù)指的就是RuntimeContext中的ShardingSphereMetaData屬性侧啼,該類實例封裝了ShardingSphere所需分庫分表等各類元數(shù)據(jù),在各環(huán)節(jié)堪簿、引擎之間傳遞痊乾,提供所需的各類基礎(chǔ)信息。

org.apache.shardingsphere.shardingjdbc.executor.PreparedStatementExecutor

/**
 * Prepared statement executor.
 */
public final class PreparedStatementExecutor extends AbstractStatementExecutor {
    
    @Getter
    private final boolean returnGeneratedKeys;
    
    public PreparedStatementExecutor(
            final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability, final boolean returnGeneratedKeys, final ShardingConnection shardingConnection) {
        super(resultSetType, resultSetConcurrency, resultSetHoldability, shardingConnection);
        this.returnGeneratedKeys = returnGeneratedKeys;
    }
    
    /**
     * Initialize executor.
     *
     * @param executionContext execution context
     * @throws SQLException SQL exception
     */
    public void init(final ExecutionContext executionContext) throws SQLException {
        setSqlStatementContext(executionContext.getSqlStatementContext());
        getInputGroups().addAll(obtainExecuteGroups(executionContext.getExecutionUnits()));// 生成執(zhí)行分組
        cacheStatements();
    }

    private Collection<InputGroup<StatementExecuteUnit>> obtainExecuteGroups(final Collection<ExecutionUnit> executionUnits) throws SQLException {
        return getSqlExecutePrepareTemplate().getExecuteUnitGroups(executionUnits, new SQLExecutePrepareCallback() {

            @Override
            // 在指定數(shù)據(jù)源上創(chuàng)建要求數(shù)量的數(shù)據(jù)庫連接
            public List<Connection> getConnections(final ConnectionMode connectionMode, final String dataSourceName, final int connectionSize) throws SQLException {
                return PreparedStatementExecutor.super.getConnection().getConnections(connectionMode, dataSourceName, connectionSize);
            }

            @Override
            //根據(jù)執(zhí)行單元信息 創(chuàng)建Statement執(zhí)行單元對象
            public StatementExecuteUnit createStatementExecuteUnit(final Connection connection, final ExecutionUnit executionUnit, final ConnectionMode connectionMode) throws SQLException {
                return new StatementExecuteUnit(executionUnit, createPreparedStatement(connection, executionUnit.getSqlUnit().getSql()), connectionMode);
            }
        });
    }

    @SuppressWarnings("MagicConstant")
    private PreparedStatement createPreparedStatement(final Connection connection, final String sql) throws SQLException {
        return returnGeneratedKeys ? connection.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS)
                : connection.prepareStatement(sql, getResultSetType(), getResultSetConcurrency(), getResultSetHoldability());
    }
    
    /**
     * Execute query.
     *
     * @return result set list
     * @throws SQLException SQL exception
     */
    public List<QueryResult> executeQuery() throws SQLException {
        final boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();
        SQLExecuteCallback<QueryResult> executeCallback = new SQLExecuteCallback<QueryResult>(getDatabaseType(), isExceptionThrown) {
            
            @Override
            // 在指定的Statement上執(zhí)行SQL椭更,將JDBC結(jié)果集包裝成查詢QueryResult對象(基于流模式哪审、基于內(nèi)存模式兩類)
            protected QueryResult executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode) throws SQLException {
                return getQueryResult(statement, connectionMode);
            }
        };
        return executeCallback(executeCallback);// 通過executeCallback操作
    }
…
}

可以看到在最終的StatementExecutor實現(xiàn)類中,實現(xiàn)了executeQuery虑瀑、executeUpdate湿滓、execute方法滴须,這些方法中定義SQLExecuteCallback實例,然后由父類中executeCallback完成最后的SQL執(zhí)行叽奥。

在該類中完成的關(guān)鍵操作是init方法扔水,其生成了執(zhí)行分組信息,同時調(diào)用父類中cacheStatements方法而线,將底層Statement和參數(shù)進(jìn)行了記錄铭污。

回到Statement實現(xiàn)類,簡單看下主從功能對應(yīng)的
org.apache.shardingsphere.shardingjdbc.jdbc.core.statement.MasterSlaveStatement

/**
 * Statement that support master-slave.
 */
@Getter
public final class MasterSlaveStatement extends AbstractStatementAdapter {
    
    private final MasterSlaveConnection connection;
    
    @Getter(AccessLevel.NONE)
    private final DataNodeRouter dataNodeRouter;
    
    private final int resultSetType;
    
    private final int resultSetConcurrency;
    
    private final int resultSetHoldability;
    
    private final Collection<Statement> routedStatements = new LinkedList<>();
    
    public MasterSlaveStatement(final MasterSlaveConnection connection) {
        this(connection, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY, ResultSet.HOLD_CURSORS_OVER_COMMIT);
    }
    
    public MasterSlaveStatement(final MasterSlaveConnection connection, final int resultSetType, final int resultSetConcurrency) {
        this(connection, resultSetType, resultSetConcurrency, ResultSet.HOLD_CURSORS_OVER_COMMIT);
    }
    
    public MasterSlaveStatement(final MasterSlaveConnection connection, final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability) {
        super(Statement.class);
        this.connection = connection;
        dataNodeRouter = new DataNodeRouter(connection.getRuntimeContext().getMetaData(), connection.getRuntimeContext().getProperties(), connection.getRuntimeContext().getSqlParserEngine());
        this.resultSetType = resultSetType;
        this.resultSetConcurrency = resultSetConcurrency;
        this.resultSetHoldability = resultSetHoldability;
    }
    
    @Override
    public ResultSet executeQuery(final String sql) throws SQLException {
        if (Strings.isNullOrEmpty(sql)) {
            throw new SQLException(SQLExceptionConstant.SQL_STRING_NULL_OR_EMPTY);
        }
        clearPrevious();
        MasterSlaveRuntimeContext runtimeContext = connection.getRuntimeContext();
        SimpleQueryPrepareEngine prepareEngine = new SimpleQueryPrepareEngine(
                Collections.singletonList(runtimeContext.getRule()), runtimeContext.getProperties(), runtimeContext.getMetaData(), runtimeContext.getSqlParserEngine());
        ExecutionContext executionContext = prepareEngine.prepare(sql, Collections.emptyList());
        ExecutionUnit executionUnit = executionContext.getExecutionUnits().iterator().next();
        Preconditions.checkState(1 == executionContext.getExecutionUnits().size(), "Cannot support executeQuery for DML or DDL");
        Statement statement = connection.getConnection(executionUnit.getDataSourceName()).createStatement(resultSetType, resultSetConcurrency, resultSetHoldability);
        routedStatements.add(statement);
        return statement.executeQuery(executionUnit.getSqlUnit().getSql());
    }
    
    @Override
    public int executeUpdate(final String sql) throws SQLException {
        clearPrevious();
        int result = 0;
        MasterSlaveRuntimeContext runtimeContext = connection.getRuntimeContext();
        RouteContext routeContext = dataNodeRouter.route(sql, Collections.emptyList(), false);
        routeContext = new MasterSlaveRouteDecorator().decorate(routeContext, runtimeContext.getMetaData(), runtimeContext.getRule(), runtimeContext.getProperties());
        for (RouteUnit each : routeContext.getRouteResult().getRouteUnits()) {
            Statement statement = connection.getConnection(each.getDataSourceMapper().getActualName()).createStatement(resultSetType, resultSetConcurrency, resultSetHoldability);
            routedStatements.add(statement);
            result += statement.executeUpdate(sql);
        }
        return result;
}
…
}

可以看到膀篮,與ShardingStatement類似嘹狞,executeQuery方法中,也是通過SimpleQueryPrepareEngine進(jìn)行prepare操作誓竿,不過與ShardingStatement中通過StatementExecutor創(chuàng)建Statement磅网,執(zhí)行SQL不同,這里是直接調(diào)用MasterSlaveConnection. getConnection拿到Connection筷屡,然后直接調(diào)用createStatement方法進(jìn)行Statement涧偷,進(jìn)而執(zhí)行SQL的。

另外在其executeUpdate方法中毙死,也不像ShardingStatement中統(tǒng)一都使用SimpleQueryPrepareEngine進(jìn)行prepare操作燎潮,而是,直接就操作了dataNodeRouter. route方法扼倘,和MasterSlaveRouteDecorator. decorate方法确封,這種不一致顯然不如ShardingStatement更加容易讓人理解。

PreparedStatement

接下來看下PreparedStatement分支再菊,首先看下其抽象類

org.apache.shardingsphere.shardingjdbc.jdbc.adapter.AbstractShardingPreparedStatementAdapter

/**
 * Sharding adapter for {@code PreparedStatement}.
 */
public abstract class AbstractShardingPreparedStatementAdapter extends AbstractUnsupportedOperationPreparedStatement {
    
    private final List<SetParameterMethodInvocation> setParameterMethodInvocations = new LinkedList<>();
    
    @Getter
    private final List<Object> parameters = new ArrayList<>();
    
    @Override
    public final void setNull(final int parameterIndex, final int sqlType) {
        setParameter(parameterIndex, null);
    }
    
    @Override
    public final void setNull(final int parameterIndex, final int sqlType, final String typeName) {
        setParameter(parameterIndex, null);
    }
…
private void setParameter(final int parameterIndex, final Object value) {
        if (parameters.size() == parameterIndex - 1) {
            parameters.add(value);
            return;
        }
        for (int i = parameters.size(); i <= parameterIndex - 1; i++) {
            parameters.add(null);
        }
        parameters.set(parameterIndex - 1, value);
    }
    
    protected final void replaySetParameter(final PreparedStatement preparedStatement, final List<Object> parameters) {
        setParameterMethodInvocations.clear();
        addParameters(parameters);
        for (SetParameterMethodInvocation each : setParameterMethodInvocations) {
            each.invoke(preparedStatement);
        }
    }
    
    private void addParameters(final List<Object> parameters) {
        int i = 0;
        for (Object each : parameters) {
            setParameters(new Class[]{int.class, Object.class}, i++ + 1, each);
        }
    }
    
    @SneakyThrows
    private void setParameters(final Class[] argumentTypes, final Object... arguments) {
        setParameterMethodInvocations.add(new SetParameterMethodInvocation(PreparedStatement.class.getMethod("setObject", argumentTypes), arguments, arguments[1]));
    }
    
    @Override
    public final void clearParameters() {
        parameters.clear();
        setParameterMethodInvocations.clear();
    }
}

可以看到該類實現(xiàn)了PreparedStatement接口的各種參數(shù)設(shè)置方法爪喘,其parameters記錄了設(shè)置的各參數(shù),setParameterMethodInvocations屬性記錄了參數(shù)設(shè)置方法調(diào)用纠拔。

而在其子類中秉剑,org.apache.shardingsphere.shardingjdbc.jdbc.core.statement.ShardingPreparedStatement

/**
 * PreparedStatement that support sharding.
 */
public final class ShardingPreparedStatement extends AbstractShardingPreparedStatementAdapter {
    
    @Getter
    private final ShardingConnection connection;
    
    private final String sql;
    
    @Getter
    private final ParameterMetaData parameterMetaData;
    
    private final BasePrepareEngine prepareEngine;
    
    private final PreparedStatementExecutor preparedStatementExecutor;
    
    private final BatchPreparedStatementExecutor batchPreparedStatementExecutor;
    
    private final Collection<Comparable<?>> generatedValues = new LinkedList<>();
    
    private ExecutionContext executionContext;
    
    private ResultSet currentResultSet;
    
    public ShardingPreparedStatement(final ShardingConnection connection, final String sql) throws SQLException {
        this(connection, sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY, ResultSet.HOLD_CURSORS_OVER_COMMIT, false);
    }
    
    public ShardingPreparedStatement(final ShardingConnection connection, final String sql, final int resultSetType, final int resultSetConcurrency) throws SQLException {
        this(connection, sql, resultSetType, resultSetConcurrency, ResultSet.HOLD_CURSORS_OVER_COMMIT, false);
    }
    
    public ShardingPreparedStatement(final ShardingConnection connection, final String sql, final int autoGeneratedKeys) throws SQLException {
        this(connection, sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY, ResultSet.HOLD_CURSORS_OVER_COMMIT, Statement.RETURN_GENERATED_KEYS == autoGeneratedKeys);
    }
    
    public ShardingPreparedStatement(
        final ShardingConnection connection, final String sql, final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability) throws SQLException {
        this(connection, sql, resultSetType, resultSetConcurrency, resultSetHoldability, false);
    }
    
    private ShardingPreparedStatement(final ShardingConnection connection, final String sql,
                                      final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability, final boolean returnGeneratedKeys) throws SQLException {
        if (Strings.isNullOrEmpty(sql)) {
            throw new SQLException(SQLExceptionConstant.SQL_STRING_NULL_OR_EMPTY);
        }
        this.connection = connection;
        this.sql = sql;
        ShardingRuntimeContext runtimeContext = connection.getRuntimeContext();
        parameterMetaData = new ShardingParameterMetaData(runtimeContext.getSqlParserEngine(), sql);
        prepareEngine = new PreparedQueryPrepareEngine(runtimeContext.getRule().toRules(), runtimeContext.getProperties(), runtimeContext.getMetaData(), runtimeContext.getSqlParserEngine());
        preparedStatementExecutor = new PreparedStatementExecutor(resultSetType, resultSetConcurrency, resultSetHoldability, returnGeneratedKeys, connection);
        batchPreparedStatementExecutor = new BatchPreparedStatementExecutor(resultSetType, resultSetConcurrency, resultSetHoldability, returnGeneratedKeys, connection);
    }
    
    @Override
    public ResultSet executeQuery() throws SQLException {
        ResultSet result;
        try {
            clearPrevious();
            prepare();
            initPreparedStatementExecutor();
            MergedResult mergedResult = mergeQuery(preparedStatementExecutor.executeQuery());
            result = new ShardingResultSet(preparedStatementExecutor.getResultSets(), mergedResult, this, executionContext);
        } finally {
            clearBatch();
        }
        currentResultSet = result;
        return result;
    }
    
    @Override
    public int executeUpdate() throws SQLException {
        try {
            clearPrevious();
            prepare();
            initPreparedStatementExecutor();
            return preparedStatementExecutor.executeUpdate();
        } finally {
            clearBatch();
        }
}
…
private void initPreparedStatementExecutor() throws SQLException {
        preparedStatementExecutor.init(executionContext);
        setParametersForStatements();
        replayMethodForStatements();
    }
    
    private void setParametersForStatements() {
        for (int i = 0; i < preparedStatementExecutor.getStatements().size(); i++) {
            replaySetParameter((PreparedStatement) preparedStatementExecutor.getStatements().get(i), preparedStatementExecutor.getParameterSets().get(i));
        }
    }
    
    private void replayMethodForStatements() {
        for (Statement each : preparedStatementExecutor.getStatements()) {
            replayMethodsInvocation(each);
        }
}
…
}

可以看到,該類中實現(xiàn)了executeQuery稠诲、executeUpdate侦鹏、execute方法,這幾個方法的 內(nèi)部又分別調(diào)用clearPrevious、prepare臀叙、 initPreparedStatementExecutor以及PreparedStatementExecutor中對應(yīng)的execute*方法种柑。

  • clearPrevious方法即清空該PreparedStatement之前執(zhí)行的參數(shù)、連接匹耕、結(jié)果集、分組信息以及Statement對象荠雕;
  • prepare方法是通過調(diào)用PreparedQueryPrepareEngine#prepare稳其,其內(nèi)部實現(xiàn)邏輯在總覽篇和引擎篇已進(jìn)行了分析驶赏,這里不重復(fù)展開。
  • initPreparedStatementExecutor方法則獲取底層真實的PreparedStatement既鞠,然后設(shè)置對應(yīng)的參數(shù)煤傍。
  • PreparedStatementExecutor的execute*方法,前面已介紹嘱蛋。

除了參數(shù)設(shè)置蚯姆,與ShardingStatement相比,ShardingPreparedStatement還支持批量操作洒敏,即JDBC中的addBatch龄恋、executeBatch方法。

看下ShardingPreparedStatement中這兩批量操作方法的實現(xiàn)凶伙。

    
    @Override
    public void addBatch() {
        try {
            prepare();
            batchPreparedStatementExecutor.addBatchForRouteUnits(executionContext);
        } finally {
            currentResultSet = null;
            clearParameters();
        }
    }
    
    @Override
    public int[] executeBatch() throws SQLException {
        try {
            initBatchPreparedStatementExecutor();
            return batchPreparedStatementExecutor.executeBatch();
        } finally {
            clearBatch();
        }
    }
    
    private void initBatchPreparedStatementExecutor() throws SQLException {
        batchPreparedStatementExecutor.init(executionContext.getSqlStatementContext());
        setBatchParametersForStatements();
    }
    
    private void setBatchParametersForStatements() throws SQLException {
        for (Statement each : batchPreparedStatementExecutor.getStatements()) {
            List<List<Object>> parameterSet = batchPreparedStatementExecutor.getParameterSet(each);
            for (List<Object> parameters : parameterSet) {
                replaySetParameter((PreparedStatement) each, parameters);
                ((PreparedStatement) each).addBatch();
            }
        }
    }
    
    @Override
    public void clearBatch() throws SQLException {
        currentResultSet = null;
        batchPreparedStatementExecutor.clear();
        clearParameters();
    }
    

可以看到郭毕,addBatch方法在調(diào)用prepare方法后,又調(diào)用了BatchPreparedStatementExecutor#addBatchForRouteUnits(executionContext)函荣,而executeBatch方法則調(diào)用了BatchPreparedStatementExecutor#init和executeBatch方法显押,setBatchParametersForStatements方法則獲取到BatchPreparedStatementExecutor內(nèi)部的Statement和參數(shù)集合后,調(diào)用底層PreparedStatement#addBatch方法傻挂,添加批量參數(shù)乘碑。接下來看下BatchPreparedStatementExecutor類。

org.apache.shardingsphere.shardingjdbc.executor.batch.BatchPreparedStatementExecutor

/**
 * Prepared statement executor to process add batch.
 */
public final class BatchPreparedStatementExecutor extends AbstractStatementExecutor {
    
    private final Collection<BatchRouteUnit> routeUnits = new LinkedList<>();
    
    @Getter
    private final boolean returnGeneratedKeys;
    
    private int batchCount;
    
    public BatchPreparedStatementExecutor(final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability, final boolean returnGeneratedKeys,
                                          final ShardingConnection shardingConnection) {
        super(resultSetType, resultSetConcurrency, resultSetHoldability, shardingConnection);
        this.returnGeneratedKeys = returnGeneratedKeys;
}
    /**
     * Initialize executor.
     *
     * @param sqlStatementContext SQL statement context
     * @throws SQLException SQL exception
     */
    public void init(final SQLStatementContext sqlStatementContext) throws SQLException {
        setSqlStatementContext(sqlStatementContext);
        getInputGroups().addAll(obtainExecuteGroups(routeUnits));
    }
…
    /**
     * Add batch for route units.
     *
     * @param executionContext execution context
     */
    public void addBatchForRouteUnits(final ExecutionContext executionContext) {
        handleOldBatchRouteUnits(createBatchRouteUnits(executionContext.getExecutionUnits()));
        handleNewBatchRouteUnits(createBatchRouteUnits(executionContext.getExecutionUnits()));
        batchCount++;
    }
    
    private Collection<BatchRouteUnit> createBatchRouteUnits(final Collection<ExecutionUnit> executionUnits) {
        Collection<BatchRouteUnit> result = new LinkedList<>();
        for (ExecutionUnit each : executionUnits) {
            result.add(new BatchRouteUnit(each));
        }
        return result;
    }
    
    private void handleOldBatchRouteUnits(final Collection<BatchRouteUnit> newRouteUnits) {
        for (BatchRouteUnit each : newRouteUnits) {
            for (BatchRouteUnit unit : routeUnits) {
                if (unit.equals(each)) {
                    reviseBatchRouteUnit(unit, each);
                }
            }
        }
    }
    
    private void reviseBatchRouteUnit(final BatchRouteUnit oldBatchRouteUnit, final BatchRouteUnit newBatchRouteUnit) {
        oldBatchRouteUnit.getExecutionUnit().getSqlUnit().getParameters().addAll(newBatchRouteUnit.getExecutionUnit().getSqlUnit().getParameters());
        oldBatchRouteUnit.mapAddBatchCount(batchCount);
    }
    
    private void handleNewBatchRouteUnits(final Collection<BatchRouteUnit> newRouteUnits) {
        newRouteUnits.removeAll(routeUnits);
        for (BatchRouteUnit each : newRouteUnits) {
            each.mapAddBatchCount(batchCount);
        }
        routeUnits.addAll(newRouteUnits);
    }
…
/**
     * Execute batch.
     * 
     * @return execute results
     * @throws SQLException SQL exception
     */
    public int[] executeBatch() throws SQLException {
        final boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();
        SQLExecuteCallback<int[]> callback = new SQLExecuteCallback<int[]>(getDatabaseType(), isExceptionThrown) {
            
            @Override
            protected int[] executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode) throws SQLException {
                return statement.executeBatch();
            }
        };
        List<int[]> results = executeCallback(callback);
        if (isAccumulate()) {
            return accumulate(results);
        } else {
            return results.get(0);
        }
}
…
}

可以看到在addBatchForRouteUnits方法中金拒,handleOldBatchRouteUnits和
handleNewBatchRouteUnits這兩個方法其實就是按照路由單元進(jìn)行合并批量執(zhí)行的參數(shù)兽肤;而在init方法中則生成了執(zhí)行分組信息;而executeBatch依然是通過父類中executeCallback方法按照分組殖蚕,依次調(diào)用底層PreparedStatement的executeBatch進(jìn)行SQL的執(zhí)行轿衔。

ResultSet

ResultSet接口表示數(shù)據(jù)庫結(jié)果集(通常由Statement執(zhí)行完查詢SQL后得到),它包含了一個類似表格的數(shù)據(jù)睦疫,內(nèi)部維護(hù)了一個游標(biāo)害驹,訪問完一條記錄的各列后,可以通過next()方法將游標(biāo)指向下一行數(shù)據(jù)蛤育,直到訪問到最后一條數(shù)據(jù)為止宛官。

ShardingSphere中ResultSet接口的實現(xiàn)類圖

同樣,ResultSet的實現(xiàn)類中瓦糕,也定義了幾個抽象的UnSupported類底洗,AbstractUnsupportedUpdateOperationResultSet實現(xiàn)了ResultSet中更新類的方法,目前在ShardingSphere中不支持ResultSet接口中update*方法咕娄,AbstractUnsupportedOperationResultSet實現(xiàn)了其它操作類方法亥揖,AbstractUnsupportedDatabaseMetaDataResultSet則定義了DatabaseMetaData不支持的方法(ShardingDatabaseMetaData中g(shù)etSchemas、getColumns等方法的返回值),AbstractUnsupportedGeneratedKeysResultSet定義了GeneratedKeys不支持方法(Statement中g(shù)etGeneratedKeys()返回值)费变。

org.apache.shardingsphere.shardingjdbc.jdbc.adapter.AbstractResultSetAdapter

/**
 * Adapter for {@code ResultSet}.
 */
public abstract class AbstractResultSetAdapter extends AbstractUnsupportedOperationResultSet {
    
    @Getter
    private final List<ResultSet> resultSets;
    
    @Getter
    private final Statement statement;
    
    private boolean closed;
    
    private final ForceExecuteTemplate<ResultSet> forceExecuteTemplate = new ForceExecuteTemplate<>();
    
    @Getter
    private final ExecutionContext executionContext;
    
    public AbstractResultSetAdapter(final List<ResultSet> resultSets, final Statement statement, final ExecutionContext executionContext) {
        Preconditions.checkArgument(!resultSets.isEmpty());
        this.resultSets = resultSets;
        this.statement = statement;
        this.executionContext = executionContext;
    }
    
    @Override
    public final ResultSetMetaData getMetaData() throws SQLException {
        return new ShardingResultSetMetaData(resultSets.get(0).getMetaData(), getShardingRule(), executionContext.getSqlStatementContext());
    }
    
    private ShardingRule getShardingRule() {
        ShardingConnection connection = statement instanceof ShardingPreparedStatement ? ((ShardingPreparedStatement) statement).getConnection() : ((ShardingStatement) statement).getConnection();
        return connection.getRuntimeContext().getRule();
    }
    
    @Override
    public final int findColumn(final String columnLabel) throws SQLException {
        return resultSets.get(0).findColumn(columnLabel);
}
…
}

可以看到大部分方法實現(xiàn)摧扇,是通過第一個底層ResultSet,調(diào)用其對應(yīng)方法獲得挚歧,這些屬性的設(shè)置扛稽,也依次對各個底層ResultSet執(zhí)行對應(yīng)設(shè)置方法即可。

看下數(shù)據(jù)分片對應(yīng)的ShardingResultSet類

org.apache.shardingsphere.shardingjdbc.jdbc.core.resultset.ShardingResultSet

/**
 * Result that support sharding.
 */
public final class ShardingResultSet extends AbstractResultSetAdapter {
    
    private final MergedResult mergeResultSet;
    
    private final Map<String, Integer> columnLabelAndIndexMap;

    public ShardingResultSet(final List<ResultSet> resultSets, final MergedResult mergeResultSet, final Statement statement, final ExecutionContext executionContext) throws SQLException {
        super(resultSets, statement, executionContext);
        this.mergeResultSet = mergeResultSet;
        columnLabelAndIndexMap = createColumnLabelAndIndexMap(resultSets.get(0).getMetaData());
    }
    
    private Map<String, Integer> createColumnLabelAndIndexMap(final ResultSetMetaData resultSetMetaData) throws SQLException {
        Map<String, Integer> result = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
        for (int columnIndex = resultSetMetaData.getColumnCount(); columnIndex > 0; columnIndex--) {
            result.put(resultSetMetaData.getColumnLabel(columnIndex), columnIndex);
        }
        return result;
    }
    
    @Override
    public boolean next() throws SQLException {
        return mergeResultSet.next();
    }
    
    @Override
    public boolean wasNull() throws SQLException {
        return mergeResultSet.wasNull();
    }
    
    @Override
    public boolean getBoolean(final int columnIndex) throws SQLException {
        return (boolean) ResultSetUtil.convertValue(mergeResultSet.getValue(columnIndex, boolean.class), boolean.class);
    }
    
    @Override
    public boolean getBoolean(final String columnLabel) throws SQLException {
        int columnIndex = columnLabelAndIndexMap.get(columnLabel);
        return (boolean) ResultSetUtil.convertValue(mergeResultSet.getValue(columnIndex, boolean.class), boolean.class);
    }
…

}

可以看到各方法調(diào)用的其實是MergedResult#getValue獲取指定的列的值滑负。

ResultSet作為JDBC的一個通用接口在张,不僅作為SQL查詢返回的結(jié)果標(biāo)識,還用在了很多多條記錄表示的場景矮慕,例如在DatabaseMetaData帮匾、Statement的自增生成鍵結(jié)果等。

org.apache.shardingsphere.shardingjdbc.jdbc.core.resultset.DatabaseMetaDataResultSet

/**
 * Database meta data result set.
 */
public final class DatabaseMetaDataResultSet<T extends BaseRule> extends AbstractUnsupportedDatabaseMetaDataResultSet {
    
    private static final String TABLE_NAME = "TABLE_NAME";
    
    private static final String INDEX_NAME = "INDEX_NAME";
    
    private final int type;
    
    private final int concurrency;
    
    private final T rule;
    
    private final ResultSetMetaData resultSetMetaData;
    
    private final Map<String, Integer> columnLabelIndexMap;
    
    private final Iterator<DatabaseMetaDataObject> databaseMetaDataObjectIterator;
    
    private volatile boolean closed;
    
    private DatabaseMetaDataObject currentDatabaseMetaDataObject;
    
    public DatabaseMetaDataResultSet(final ResultSet resultSet, final T rule) throws SQLException {
        this.type = resultSet.getType();
        this.concurrency = resultSet.getConcurrency();
        this.rule = rule;
        this.resultSetMetaData = resultSet.getMetaData();
        this.columnLabelIndexMap = initIndexMap();
        this.databaseMetaDataObjectIterator = initIterator(resultSet);
    }
    
    private Map<String, Integer> initIndexMap() throws SQLException {
        Map<String, Integer> result = new HashMap<>(resultSetMetaData.getColumnCount());
        for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
            result.put(resultSetMetaData.getColumnLabel(i), i);
        }
        return result;
    }
    
    private Iterator<DatabaseMetaDataObject> initIterator(final ResultSet resultSet) throws SQLException {
        LinkedList<DatabaseMetaDataObject> result = new LinkedList<>();
        Set<DatabaseMetaDataObject> removeDuplicationSet = new HashSet<>();
        int tableNameColumnIndex = columnLabelIndexMap.getOrDefault(TABLE_NAME, -1);
        int indexNameColumnIndex = columnLabelIndexMap.getOrDefault(INDEX_NAME, -1);
        while (resultSet.next()) {
            DatabaseMetaDataObject databaseMetaDataObject = generateDatabaseMetaDataObject(tableNameColumnIndex, indexNameColumnIndex, resultSet);
            if (!removeDuplicationSet.contains(databaseMetaDataObject)) {
                result.add(databaseMetaDataObject);
                removeDuplicationSet.add(databaseMetaDataObject);
            }
        }
        return result.iterator();
    }
    
    private DatabaseMetaDataObject generateDatabaseMetaDataObject(final int tableNameColumnIndex, final int indexNameColumnIndex, final ResultSet resultSet) throws SQLException {
        DatabaseMetaDataObject result = new DatabaseMetaDataObject(resultSetMetaData.getColumnCount());
        for (int i = 1; i <= columnLabelIndexMap.size(); i++) {
            if (tableNameColumnIndex == i) {
                String tableName = resultSet.getString(i);
                Collection<String> logicTableNames = rule instanceof ShardingRule ? ((ShardingRule) rule).getLogicTableNames(tableName) : Collections.emptyList();
                result.addObject(logicTableNames.isEmpty() ? tableName : logicTableNames.iterator().next());
            } else if (indexNameColumnIndex == i) {
                String tableName = resultSet.getString(tableNameColumnIndex);
                String indexName = resultSet.getString(i);
                result.addObject(null != indexName && indexName.endsWith(tableName) ? indexName.substring(0, indexName.indexOf(tableName) - 1) : indexName);
            } else {
                result.addObject(resultSet.getObject(i));
            }
        }
        return result;
    }
    
    @Override
    public boolean next() throws SQLException {
        checkClosed();
        if (databaseMetaDataObjectIterator.hasNext()) {
            currentDatabaseMetaDataObject = databaseMetaDataObjectIterator.next();
            return true;
        }
        return false;
    }
    
    @Override
    public void close() throws SQLException {
        checkClosed();
        closed = true;
    }
    
    @Override
    public boolean wasNull() throws SQLException {
        checkClosed();
        return false;
    }
    
    @Override
    public String getString(final int columnIndex) throws SQLException {
        checkClosed();
        checkColumnIndex(columnIndex);
        return (String) ResultSetUtil.convertValue(currentDatabaseMetaDataObject.getObject(columnIndex), String.class);
}
…
}

這些ResultSet的實現(xiàn)類凡傅,其構(gòu)造函數(shù)都會傳入一個底層ResultSet或者迭代器Iterator辟狈,next()方法其實就是通過調(diào)用內(nèi)部ResultSet或者Iterator的nex()完成游標(biāo)的遷移,而get方法則直接調(diào)用 ResultSet或者Iterator中的值的get方法夏跷。

MetaData類接口

JDBC中有幾個元數(shù)據(jù)接口DatabaseMetaData哼转、ResultSetMetaData、ParameterMetaData槽华,
分別表示數(shù)據(jù)庫+驅(qū)動元數(shù)據(jù)(Catalog壹蔓、Schema、Table猫态、Column佣蓉、Index等信息,通過Connection#getMetaData()方法獲得)亲雪、結(jié)果集元數(shù)據(jù)(列的類型屬性勇凭,通過ResultSet# getMetaData()方法獲得)以及參數(shù)元數(shù)據(jù)(參數(shù)數(shù)量、類型以及屬性义辕,通過PreparedStatement# getParameterMetaData()獲得)虾标。

DatabaseMetaData

ShardingSphere中DatabaseMetaData接口的實現(xiàn)類圖

看下涉及到的幾個類
org.apache.shardingsphere.shardingjdbc.jdbc.core.datasource.metadata.AdaptedDatabaseMetaData

/**
 * Adapted database meta data.
 */
@RequiredArgsConstructor
public abstract class AdaptedDatabaseMetaData extends WrapperAdapter implements DatabaseMetaData {
    
    private final CachedDatabaseMetaData cachedDatabaseMetaData;
    
    @Override
    public final String getURL() {
        return cachedDatabaseMetaData.getUrl();
    }
    
    @Override
    public final String getUserName() {
        return cachedDatabaseMetaData.getUserName();
    }
    
    @Override
    public final String getDatabaseProductName() {
        return cachedDatabaseMetaData.getDatabaseProductName();
    }

看到此類對DatabaseMetaData方法進(jìn)行了實現(xiàn),都是直接調(diào)用CachedDatabaseMetaData實例對應(yīng)方法直接返回灌砖。

org.apache.shardingsphere.shardingjdbc.jdbc.core.datasource.metadata.MultipleDatabaseMetaData

/**
 * Multiple database meta data.
 */
@Getter
public abstract class MultipleDatabaseMetaData<C extends AbstractConnectionAdapter> extends AdaptedDatabaseMetaData {
    
    private final C connection;
    
    private final Collection<String> datasourceNames;
    
    private final ShardingSphereMetaData shardingSphereMetaData;
    
    private String currentDataSourceName;
    
    private DatabaseMetaData currentDatabaseMetaData;
    
    public MultipleDatabaseMetaData(final C connection, final Collection<String> datasourceNames,
                                    final CachedDatabaseMetaData cachedDatabaseMetaData, final ShardingSphereMetaData shardingSphereMetaData) {
        super(cachedDatabaseMetaData);
        this.connection = connection;
        this.datasourceNames = datasourceNames;
        this.shardingSphereMetaData = shardingSphereMetaData;
    }
    
    @Override
    public final Connection getConnection() throws SQLException {
        return connection.getConnection(getDataSourceName());
    }
    
    @Override
    public final ResultSet getSuperTypes(final String catalog, final String schemaPattern, final String typeNamePattern) throws SQLException {
        return createDatabaseMetaDataResultSet(getDatabaseMetaData().getSuperTypes(getActualCatalog(catalog), getActualSchema(schemaPattern), typeNamePattern));
    }
    
    @Override
    public final ResultSet getSuperTables(final String catalog, final String schemaPattern, final String tableNamePattern) throws SQLException {
        return createDatabaseMetaDataResultSet(getDatabaseMetaData().getSuperTables(getActualCatalog(catalog), getActualSchema(schemaPattern), getActualTableNamePattern(tableNamePattern)));
    @Override
    public final ResultSet getColumns(final String catalog, final String schemaPattern, final String tableNamePattern, final String columnNamePattern) throws SQLException {
        return createDatabaseMetaDataResultSet(
            getDatabaseMetaData().getColumns(getActualCatalog(catalog), getActualSchema(schemaPattern), getActualTableNamePattern(tableNamePattern), columnNamePattern));
    }
    
…
  protected abstract String getActualTableNamePattern(String tableNamePattern);
    
    protected abstract String getActualTable(String table);
    
    protected abstract ResultSet createDatabaseMetaDataResultSet(ResultSet resultSet) throws SQLException;
…
}

可以看到璧函,MultipleDatabaseMetaData類中,對AdaptedDatabaseMetaData類中大多數(shù)方法進(jìn)行了重寫基显,通過調(diào)用子類實現(xiàn)的createDatabaseMetaDataResultSet方法獲得真實的DatabaseMetaDataResultSet蘸吓,同時通過子類實現(xiàn)getActualTableNamePattern、getActualTable方法獲取到真實物理表名信息撩幽。

org.apache.shardingsphere.shardingjdbc.jdbc.core.datasource.metadata.ShardingDatabaseMetaData

/**
 * Sharding database meta data.
 */
public final class ShardingDatabaseMetaData extends MultipleDatabaseMetaData<ShardingConnection> {
    
    private ShardingRule shardingRule;
    
    public ShardingDatabaseMetaData(final ShardingConnection connection) {
        super(connection, connection.getDataSourceMap().keySet(), connection.getRuntimeContext().getCachedDatabaseMetaData(), connection.getRuntimeContext().getMetaData());
        shardingRule = connection.getRuntimeContext().getRule();
    }
    
    @Override
    public String getActualTableNamePattern(final String tableNamePattern) {
        if (null == tableNamePattern) {
            return null;
        }
        return shardingRule.findTableRule(tableNamePattern).isPresent() ? "%" + tableNamePattern + "%" : tableNamePattern;
    }
    
    @Override
    public String getActualTable(final String table) {
        if (null == table) {
            return null;
        }
        String result = table;
        if (shardingRule.findTableRule(table).isPresent()) {
            DataNode dataNode = shardingRule.getDataNode(table);
            result = dataNode.getTableName();
        }
        return result;
    }
    
    @Override
    protected ResultSet createDatabaseMetaDataResultSet(final ResultSet resultSet) throws SQLException {
        return new DatabaseMetaDataResultSet<>(resultSet, shardingRule);
    }
}

可以看到库继,在ShardingDatabaseMetaData類中,主要實現(xiàn)的getActualTable 、getActualTableNamePattern方法中就是根據(jù)ShardingRule查找到真實的表名宪萄、表名pattern以及創(chuàng)建DatabaseMetaDataResultSet實例舅桩。

ResultSetMetaData

ShardingSphere中ResultSetMetaData接口的實現(xiàn)類圖

*org.apache.shardingsphere.shardingjdbc.jdbc.core.resultset.ShardingResultSetMetaData *

/**
 * Sharding result set meta data.
 */
@RequiredArgsConstructor
public final class ShardingResultSetMetaData extends WrapperAdapter implements ResultSetMetaData {
    
    private final ResultSetMetaData resultSetMetaData;
    
    private final ShardingRule shardingRule;
    
    private final SQLStatementContext sqlStatementContext;
    
    @Override
    public int getColumnCount() {
        return sqlStatementContext instanceof SelectStatementContext ? ((SelectStatementContext) sqlStatementContext).getProjectionsContext().getExpandProjections().size() : 0;
    }
    
    @Override
    public boolean isAutoIncrement(final int column) throws SQLException {
        return resultSetMetaData.isAutoIncrement(column);
}
…
    @Override
    public String getTableName(final int column) throws SQLException {
        String actualTableName = resultSetMetaData.getTableName(column);
        return shardingRule.getLogicTableNames(actualTableName).isEmpty() ? actualTableName : shardingRule.getLogicTableNames(actualTableName).iterator().next();
    }
    
    @Override
    public String getCatalogName(final int column) {
        return DefaultSchema.LOGIC_NAME;
}
…
}

這個類中實現(xiàn)比較簡單,大多數(shù)方法邏輯就是調(diào)用構(gòu)造函數(shù)傳入的ResultSetMetaData對應(yīng)方法雨膨,其它有一些則進(jìn)行了物理表名與邏輯表名的轉(zhuǎn)換和擴(kuò)展projection的判斷等。ResultSetMetaData的傳入則是在AbstractResultSetAdapter中取的第一個內(nèi)部底層ResultSet读串。

org.apache.shardingsphere.shardingjdbc.jdbc.adapter.AbstractResultSetAdapter

 @Override
    public final ResultSetMetaData getMetaData() throws SQLException {
        return new ShardingResultSetMetaData(resultSets.get(0).getMetaData(), getShardingRule(), executionContext.getSqlStatementContext());
    }

ParameterMetaData

image.png

org.apache.shardingsphere.shardingjdbc.jdbc.core.statement.metadata.ShardingParameterMetaData

/**
 * Sharding parameter meta data.
 */
@RequiredArgsConstructor
public final class ShardingParameterMetaData extends AbstractUnsupportedOperationParameterMetaData {
    
    private final SQLParserEngine sqlParserEngine;
    
    private final String sql;
    
    @Override
    public int getParameterCount() {
        return sqlParserEngine.parse(sql, true).getParameterCount();
    }
}

可以看到聊记,該類只實現(xiàn)了ParameterMetaData接口中g(shù)etParameterCount,通過解析引擎獲取到參數(shù)的數(shù)量恢暖。這個類實例會在ShardingPreparedStatement的構(gòu)造函數(shù)中進(jìn)行創(chuàng)建排监,以供getParameterMetaData()方法返回。

org.apache.shardingsphere.shardingjdbc.jdbc.core.statement.ShardingPreparedStatement

/**
 * PreparedStatement that support sharding.
 */
public final class ShardingPreparedStatement extends AbstractShardingPreparedStatementAdapter {
    
    @Getter
    private final ShardingConnection connection;
    
    private final String sql;
    
    @Getter
private final ParameterMetaData parameterMetaData;
…

private ShardingPreparedStatement(final ShardingConnection connection, final String sql,
                                      final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability, final boolean returnGeneratedKeys) throws SQLException {
        if (Strings.isNullOrEmpty(sql)) {
            throw new SQLException(SQLExceptionConstant.SQL_STRING_NULL_OR_EMPTY);
        }
        this.connection = connection;
        this.sql = sql;
        ShardingRuntimeContext runtimeContext = connection.getRuntimeContext();
        parameterMetaData = new ShardingParameterMetaData(runtimeContext.getSqlParserEngine(), sql);
…
    }

5.x中的變化

在5.x中杰捂,對JDBC的實現(xiàn)類做了進(jìn)一步的合并與簡化舆床,并不是根據(jù)功能定義各JDBC實現(xiàn)類,而是統(tǒng)一成ShardingSphere*類嫁佳,例如將4.1.1中ShardingDataSource挨队、MasterSalveDataSource、ShadowDataSource蒿往、EncryptDataSource都統(tǒng)一成了ShardingSphereDataSource類盛垦;
ShardingConnection、MasterSlaveConnection瓤漏、ShadowConnection腾夯、EncryptConnection都合并為ShardingSphereConnection類;
ShardingPreparedStatement蔬充、MasterSalvePreparedStatement蝶俱、ShadowPreparedStatement等也類似,都合并成為ShardingSpherePreparedStatement類饥漫;

原來不同的功能邏輯都以裝飾器方式榨呆,分散到了各個統(tǒng)一處理里環(huán)節(jié)(解析、路由趾浅、改寫愕提、執(zhí)行、歸并)皿哨,由內(nèi)核引擎負(fù)責(zé)觸發(fā)執(zhí)行浅侨,這種設(shè)計顯然更加優(yōu)雅,也降低了開發(fā)人員心智負(fù)擔(dān)证膨。

5.x中的DataSource類層次圖


ShardingSphere5.x中DataSource接口實現(xiàn)類圖

5.x中Connection的類層次圖


ShardingSphere5.x中Connection接口實現(xiàn)類圖

5.x中Statement的類層次圖

ShardingSphere5.x中Statement接口實現(xiàn)類圖

由于5.x還沒有發(fā)布release版本如输,代碼后續(xù)可能還會有大的變化,所以5.x的詳細(xì)源碼待release版本正式發(fā)布后再分析。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末不见,一起剝皮案震驚了整個濱河市澳化,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌稳吮,老刑警劉巖缎谷,帶你破解...
    沈念sama閱讀 222,183評論 6 516
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異灶似,居然都是意外死亡列林,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,850評論 3 399
  • 文/潘曉璐 我一進(jìn)店門酪惭,熙熙樓的掌柜王于貴愁眉苦臉地迎上來希痴,“玉大人,你說我怎么就攤上這事春感∑龃矗” “怎么了?”我有些...
    開封第一講書人閱讀 168,766評論 0 361
  • 文/不壞的土叔 我叫張陵鲫懒,是天一觀的道長嫩实。 經(jīng)常有香客問我,道長刀疙,這世上最難降的妖魔是什么舶赔? 我笑而不...
    開封第一講書人閱讀 59,854評論 1 299
  • 正文 為了忘掉前任,我火速辦了婚禮谦秧,結(jié)果婚禮上竟纳,老公的妹妹穿的比我還像新娘。我一直安慰自己疚鲤,他們只是感情好锥累,可當(dāng)我...
    茶點故事閱讀 68,871評論 6 398
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著集歇,像睡著了一般桶略。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上诲宇,一...
    開封第一講書人閱讀 52,457評論 1 311
  • 那天际歼,我揣著相機(jī)與錄音,去河邊找鬼姑蓝。 笑死鹅心,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的纺荧。 我是一名探鬼主播旭愧,決...
    沈念sama閱讀 40,999評論 3 422
  • 文/蒼蘭香墨 我猛地睜開眼颅筋,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了输枯?” 一聲冷哼從身側(cè)響起议泵,我...
    開封第一講書人閱讀 39,914評論 0 277
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎桃熄,沒想到半個月后先口,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,465評論 1 319
  • 正文 獨居荒郊野嶺守林人離奇死亡瞳收,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,543評論 3 342
  • 正文 我和宋清朗相戀三年池充,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片缎讼。...
    茶點故事閱讀 40,675評論 1 353
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖坑匠,靈堂內(nèi)的尸體忽然破棺而出血崭,到底是詐尸還是另有隱情,我是刑警寧澤厘灼,帶...
    沈念sama閱讀 36,354評論 5 351
  • 正文 年R本政府宣布夹纫,位于F島的核電站,受9級特大地震影響设凹,放射性物質(zhì)發(fā)生泄漏舰讹。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 42,029評論 3 335
  • 文/蒙蒙 一闪朱、第九天 我趴在偏房一處隱蔽的房頂上張望月匣。 院中可真熱鬧,春花似錦奋姿、人聲如沸锄开。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,514評論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽萍悴。三九已至,卻和暖如春寓免,著一層夾襖步出監(jiān)牢的瞬間癣诱,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,616評論 1 274
  • 我被黑心中介騙來泰國打工袜香, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留撕予,地道東北人。 一個月前我還...
    沈念sama閱讀 49,091評論 3 378
  • 正文 我出身青樓困鸥,卻偏偏與公主長得像嗅蔬,于是被迫代替她去往敵國和親剑按。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,685評論 2 360