Preface
This article takes a look at the fetchSize of a JDBC statement.
fetchSize
Here the postgres jdbc driver is used as the example, mainly because the postgres jdbc driver has its source code publicly available and its naming is fairly standard. I previously looked at the oracle jdbc driver; without source code, decompilation yields a pile of variable names like var1, var2, which is extremely hard to read.
By default the pgjdbc driver pulls the entire result set in one go, i.e. at executeQuery time. For queries over a large amount of data this very easily causes an OOM. In such scenarios you need to set fetchSize: the query returns only the first batch of data, and after next() has walked through a batch, the driver fetches the next one.
There are, however, a few requirements:
- The database must use the V3 frontend/backend protocol, i.e. PostgreSQL 7.4 or later
- The connection's autoCommit must be false, because with autoCommit enabled the cursor is closed once the query completes, so no further fetches are possible. In addition, the ResultSet must be of type ResultSet.TYPE_FORWARD_ONLY (the default), which means it cannot scroll backwards.
- The query must be a single statement, not multiple statements joined with semicolons
Example code
@Test
public void testFetchSize() throws SQLException {
    Connection connection = dataSource.getConnection();
    //https://jdbc.postgresql.org/documentation/head/query.html
    connection.setAutoCommit(false); //NOTE must be false, otherwise fetchSize does not take effect
    String sql = "select * from demo_table";
    PreparedStatement pstmt;
    try {
        pstmt = connection.prepareStatement(sql);
        pstmt.setFetchSize(50);
        System.out.println("ps.getQueryTimeout():" + pstmt.getQueryTimeout());
        System.out.println("ps.getFetchSize():" + pstmt.getFetchSize());
        System.out.println("ps.getFetchDirection():" + pstmt.getFetchDirection());
        System.out.println("ps.getMaxFieldSize():" + pstmt.getMaxFieldSize());
        ResultSet rs = pstmt.executeQuery();
        //NOTE once this returns, the statement has finished executing; by default only fetchSize rows have been fetched
        int col = rs.getMetaData().getColumnCount();
        System.out.println("============================");
        while (rs.next()) {
            for (int i = 1; i <= col; i++) {
                System.out.print(rs.getObject(i));
            }
            System.out.println("");
        }
        System.out.println("============================");
    } catch (SQLException e) {
        e.printStackTrace();
    } finally {
        //close resources
    }
}
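Besides calling setFetchSize on each statement, pgjdbc also lets you set a connection-wide default through the defaultRowFetchSize connection property. A minimal sketch, assuming your driver version supports this property (URL, credentials and table name are placeholders):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.Properties;

public class DefaultFetchSizeExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.setProperty("user", "postgres");          // placeholder credentials
        props.setProperty("password", "secret");
        props.setProperty("defaultRowFetchSize", "50"); // every statement defaults to 50-row batches
        try (Connection conn = DriverManager.getConnection(
                "jdbc:postgresql://localhost:5432/demo", props)) {
            conn.setAutoCommit(false); // still required for cursor-based fetching
            try (Statement st = conn.createStatement();
                 ResultSet rs = st.executeQuery("select * from demo_table")) {
                while (rs.next()) {
                    System.out.println(rs.getObject(1));
                }
            }
        }
    }
}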
Source code walkthrough
postgresql-9.4.1212.jre7-sources.jar!/org/postgresql/jdbc/PgPreparedStatement.java
/*
* A Prepared SQL query is executed and its ResultSet is returned
*
* @return a ResultSet that contains the data produced by the query - never null
*
* @exception SQLException if a database access error occurs
*/
public java.sql.ResultSet executeQuery() throws SQLException {
if (!executeWithFlags(0)) {
throw new PSQLException(GT.tr("No results were returned by the query."), PSQLState.NO_DATA);
}
if (result.getNext() != null) {
throw new PSQLException(GT.tr("Multiple ResultSets were returned by the query."),
PSQLState.TOO_MANY_RESULTS);
}
return result.getResultSet();
}
executeQuery first calls the executeWithFlags method. In the source the call is written directly inside the if condition, which is not a recommended style, because a call buried in an if is easy to overlook.
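For readability one could pull the call out of the condition first; a trivial rewrite of the snippet above (same behavior, purely a style illustration):

// assign first, then test - the executeWithFlags call is no longer hidden inside the if
boolean hasResults = executeWithFlags(0);
if (!hasResults) {
    throw new PSQLException(GT.tr("No results were returned by the query."), PSQLState.NO_DATA);
}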
- executeWithFlags
public boolean executeWithFlags(int flags) throws SQLException {
try {
checkClosed();
if (connection.getPreferQueryMode() == PreferQueryMode.SIMPLE) {
flags |= QueryExecutor.QUERY_EXECUTE_AS_SIMPLE;
}
execute(preparedQuery, preparedParameters, flags);
return (result != null && result.getResultSet() != null);
} finally {
defaultTimeZone = null;
}
}
protected final void execute(CachedQuery cachedQuery, ParameterList queryParameters, int flags)
throws SQLException {
try {
executeInternal(cachedQuery, queryParameters, flags);
} catch (SQLException e) {
// Don't retry composite queries as it might get partially executed
if (cachedQuery.query.getSubqueries() != null
|| !connection.getQueryExecutor().willHealOnRetry(e)) {
throw e;
}
cachedQuery.query.close();
// Execute the query one more time
executeInternal(cachedQuery, queryParameters, flags);
}
}
This in turn calls the execute method, which then calls executeInternal (so the chain is executeQuery -> executeWithFlags -> execute -> executeInternal).
executeInternal
postgresql-9.4.1212.jre7-sources.jar!/org/postgresql/jdbc/PgPreparedStatement.java
private void executeInternal(CachedQuery cachedQuery, ParameterList queryParameters, int flags)
throws SQLException {
closeForNextExecution();
// Enable cursor-based resultset if possible.
if (fetchSize > 0 && !wantsScrollableResultSet() && !connection.getAutoCommit()
&& !wantsHoldableResultSet()) {
flags |= QueryExecutor.QUERY_FORWARD_CURSOR;
}
if (wantsGeneratedKeysOnce || wantsGeneratedKeysAlways) {
flags |= QueryExecutor.QUERY_BOTH_ROWS_AND_STATUS;
// If the no results flag is set (from executeUpdate)
// clear it so we get the generated keys results.
//
if ((flags & QueryExecutor.QUERY_NO_RESULTS) != 0) {
flags &= ~(QueryExecutor.QUERY_NO_RESULTS);
}
}
if (isOneShotQuery(cachedQuery)) {
flags |= QueryExecutor.QUERY_ONESHOT;
}
// Only use named statements after we hit the threshold. Note that only
// named statements can be transferred in binary format.
if (connection.getAutoCommit()) {
flags |= QueryExecutor.QUERY_SUPPRESS_BEGIN;
}
// updateable result sets do not yet support binary updates
if (concurrency != ResultSet.CONCUR_READ_ONLY) {
flags |= QueryExecutor.QUERY_NO_BINARY_TRANSFER;
}
Query queryToExecute = cachedQuery.query;
if (queryToExecute.isEmpty()) {
flags |= QueryExecutor.QUERY_SUPPRESS_BEGIN;
}
if (!queryToExecute.isStatementDescribed() && forceBinaryTransfers
&& (flags & QueryExecutor.QUERY_EXECUTE_AS_SIMPLE) == 0) {
// Simple 'Q' execution does not need to know parameter types
// When binaryTransfer is forced, then we need to know resulting parameter and column types,
// thus sending a describe request.
int flags2 = flags | QueryExecutor.QUERY_DESCRIBE_ONLY;
StatementResultHandler handler2 = new StatementResultHandler();
connection.getQueryExecutor().execute(queryToExecute, queryParameters, handler2, 0, 0,
flags2);
ResultWrapper result2 = handler2.getResults();
if (result2 != null) {
result2.getResultSet().close();
}
}
StatementResultHandler handler = new StatementResultHandler();
result = null;
try {
startTimer();
connection.getQueryExecutor().execute(queryToExecute, queryParameters, handler, maxrows,
fetchSize, flags);
} finally {
killTimerTask();
}
result = firstUnclosedResult = handler.getResults();
if (wantsGeneratedKeysOnce || wantsGeneratedKeysAlways) {
generatedKeys = result;
result = result.getNext();
if (wantsGeneratedKeysOnce) {
wantsGeneratedKeysOnce = false;
}
}
}
The key part is this call:
connection.getQueryExecutor().execute(queryToExecute, queryParameters, handler, maxrows,
fetchSize, flags);
By passing fetchSize in here, the executor pulls a result of at most that size.
Finally the sendExecute and processResults methods are called to pull the data; fetchSize ends up as the limit argument of sendExecute, i.e. the row limit of the Execute message sent to the server.
postgresql-9.4.1212.jre7-sources.jar!/org/postgresql/core/v3/QueryExecutorImpl.java
private void sendExecute(SimpleQuery query, Portal portal, int limit) throws IOException {
//
// Send Execute.
//
if (logger.logDebug()) {
logger.debug(" FE=> Execute(portal=" + portal + ",limit=" + limit + ")");
}
byte[] encodedPortalName = (portal == null ? null : portal.getEncodedPortalName());
int encodedSize = (encodedPortalName == null ? 0 : encodedPortalName.length);
// Total size = 4 (size field) + 1 + N (source portal) + 4 (max rows)
pgStream.sendChar('E'); // Execute
pgStream.sendInteger4(4 + 1 + encodedSize + 4); // message size
if (encodedPortalName != null) {
pgStream.send(encodedPortalName); // portal name
}
pgStream.sendChar(0); // portal name terminator
pgStream.sendInteger4(limit); // row limit
pendingExecuteQueue.add(new ExecuteRequest(query, portal, false));
}
protected void processResults(ResultHandler handler, int flags) throws IOException {
boolean noResults = (flags & QueryExecutor.QUERY_NO_RESULTS) != 0;
boolean bothRowsAndStatus = (flags & QueryExecutor.QUERY_BOTH_ROWS_AND_STATUS) != 0;
List<byte[][]> tuples = null;
int c;
boolean endQuery = false;
// At the end of a command execution we have the CommandComplete
// message to tell us we're done, but with a describeOnly command
// we have no real flag to let us know we're done. We've got to
// look for the next RowDescription or NoData message and return
// from there.
boolean doneAfterRowDescNoData = false;
while (!endQuery) {
c = pgStream.receiveChar();
switch (c) {
case 'A': // Asynchronous Notify
receiveAsyncNotify();
break;
case '1': // Parse Complete (response to Parse)
pgStream.receiveInteger4(); // len, discarded
SimpleQuery parsedQuery = pendingParseQueue.removeFirst();
String parsedStatementName = parsedQuery.getStatementName();
//...
}
}
}
next
postgresql-9.4.1212.jre7-sources.jar!/org/postgresql/jdbc/PgResultSet.java
public boolean next() throws SQLException {
checkClosed();
if (onInsertRow) {
throw new PSQLException(GT.tr("Can''t use relative move methods while on the insert row."),
PSQLState.INVALID_CURSOR_STATE);
}
if (current_row + 1 >= rows.size()) {
if (cursor == null || (maxRows > 0 && row_offset + rows.size() >= maxRows)) {
current_row = rows.size();
this_row = null;
rowBuffer = null;
return false; // End of the resultset.
}
// Ask for some more data.
row_offset += rows.size(); // We are discarding some data.
int fetchRows = fetchSize;
if (maxRows != 0) {
if (fetchRows == 0 || row_offset + fetchRows > maxRows) {
// Fetch would exceed maxRows, limit it.
fetchRows = maxRows - row_offset;
}
}
// Execute the fetch and update this resultset.
connection.getQueryExecutor().fetch(cursor, new CursorResultHandler(), fetchRows);
current_row = 0;
// Test the new rows array.
if (rows.isEmpty()) {
this_row = null;
rowBuffer = null;
return false;
}
} else {
current_row++;
}
initRowBuffer();
return true;
}
From the next method you can see that it first checks whether current_row + 1 is less than rows.size(); if so, it simply does current_row++. Otherwise the current batch of fetchSize rows has been fully consumed, and it has to decide whether the result set is exhausted or another batch should be pulled, after which current_row is reset.
connection.getQueryExecutor().fetch(cursor, new CursorResultHandler(), fetchRows);
This call pulls the next batch of fetchRows rows.
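As a worked example of the clamping logic in next() above (the numbers are made up for illustration):

// illustrative values only (not from the article): maxRows = 120, fetchSize = 50,
// and 100 rows already consumed and discarded, so row_offset = 100
int maxRows = 120;
int fetchSize = 50;
int row_offset = 100;

int fetchRows = fetchSize;
if (maxRows != 0) {
    if (fetchRows == 0 || row_offset + fetchRows > maxRows) {
        // 100 + 50 would exceed maxRows (120), so clamp the request
        fetchRows = maxRows - row_offset;
    }
}
// fetchRows is now 20: the driver asks the server for at most 20 more rows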
- initRowBuffer
private void initRowBuffer() {
this_row = rows.get(current_row);
// We only need a copy of the current row if we're going to
// modify it via an updatable resultset.
if (resultsetconcurrency == ResultSet.CONCUR_UPDATABLE) {
rowBuffer = new byte[this_row.length][];
System.arraycopy(this_row, 0, rowBuffer, 0, this_row.length);
} else {
rowBuffer = null;
}
}
This is where, after next advances, the row about to be consumed is loaded into this_row; rowBuffer only receives a copy when the result set is updatable (CONCUR_UPDATABLE).
Summary
For scenarios that query a large amount of data it is well worth setting fetchSize, otherwise pulling the full result set at once can easily lead to OOM. When using fetchSize, though, the data must be processed as the ResultSet is iterated, rather than collecting all rows first and processing them after the query returns.
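A minimal sketch of that streaming style, reusing a DataSource as in the example above (demo_table and handleRow are placeholders, not part of the original article):

import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

public class StreamingQueryExample {
    private final DataSource dataSource;

    public StreamingQueryExample(DataSource dataSource) {
        this.dataSource = dataSource;
    }

    // process rows as they are fetched; only the current batch stays in memory
    public void streamDemoTable() throws SQLException {
        try (Connection conn = dataSource.getConnection()) {
            conn.setAutoCommit(false);                       // required for cursor-based fetching
            try (PreparedStatement ps = conn.prepareStatement("select * from demo_table")) {
                ps.setFetchSize(50);                         // roughly 50 rows per round trip
                try (ResultSet rs = ps.executeQuery()) {
                    while (rs.next()) {
                        handleRow(rs);                       // consume immediately, do not collect
                    }
                }
            }
        }
    }

    // placeholder for whatever per-row processing the application needs
    private void handleRow(ResultSet rs) throws SQLException {
        System.out.println(rs.getObject(1));
    }
}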