Problem
When querying MySQL over JDBC, there is a pitfall that is very easy to fall into. Take the following code as an example:
public static void selectNormal() throws SQLException {
    try (Connection connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "123456");
         PreparedStatement statement = connection.prepareStatement("select * from test",
                 ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) {
        //statement.setFetchSize(100);
        try (ResultSet resultSet = statement.executeQuery()) {
            while (resultSet.next()) {
                System.out.println(resultSet.getString(1));
            }
        }
    }
}
When the query returns a large number of rows, this code fails with an out-of-memory (OOM) error.
To reproduce the error more easily, give the JVM a small heap by adding the JVM options -Xms16m -Xmx16m:
Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
at com.mysql.jdbc.MysqlIO.nextRowFast(MysqlIO.java:2213)
at com.mysql.jdbc.MysqlIO.nextRow(MysqlIO.java:1992)
at com.mysql.jdbc.MysqlIO.readSingleRowSet(MysqlIO.java:3413)
at com.mysql.jdbc.MysqlIO.getResultSet(MysqlIO.java:471)
at com.mysql.jdbc.MysqlIO.readResultsForQueryOrUpdate(MysqlIO.java:3115)
at com.mysql.jdbc.MysqlIO.readAllResults(MysqlIO.java:2344)
at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2739)
at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2486)
at com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:1858)
at com.mysql.jdbc.PreparedStatement.executeQuery(PreparedStatement.java:1966)
at com.cmbc.dap.dao.test.MysqlBatchTest.selectNormal(MysqlBatchTest.java:46)
at com.cmbc.dap.dao.test.MysqlBatchTest.main(MysqlBatchTest.java:13)
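You might say that setting fetchSize is all it takes. Unfortunately, if you uncomment the setFetchSize line in the code above, the same error still occurs: fetchSize has no effect, and the driver still loads the entire result set into memory until the heap is exhausted.
So how can an application run correctly when a query returns a large amount of data? To get to the root of it, let's look for the answer in the MySQL driver's source code.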
How the MySQL driver executes a query
com.mysql.jdbc.PreparedStatement
public java.sql.ResultSet executeQuery() throws SQLException {
    synchronized (checkClosed().getConnectionMutex()) {
        MySQLConnection locallyScopedConn = this.connection;
        checkForDml(this.originalSql, this.firstCharOfStmt);
        this.batchedGeneratedKeys = null;
        resetCancelledState();
        implicitlyCloseAllOpenResults();
        clearWarnings();
        if (this.doPingInstead) {
            doPingInstead();
            return this.results;
        }
        setupStreamingTimeout(locallyScopedConn);
        Buffer sendPacket = fillSendPacket();
        String oldCatalog = null;
        if (!locallyScopedConn.getCatalog().equals(this.currentCatalog)) {
            oldCatalog = locallyScopedConn.getCatalog();
            locallyScopedConn.setCatalog(this.currentCatalog);
        }
        //
        // Check if we have cached metadata for this query...
        //
        CachedResultSetMetaData cachedMetadata = null;
        if (locallyScopedConn.getCacheResultSetMetadata()) {
            cachedMetadata = locallyScopedConn.getCachedMetaData(this.originalSql);
        }
        Field[] metadataFromCache = null;
        if (cachedMetadata != null) {
            metadataFromCache = cachedMetadata.fields;
        }
        locallyScopedConn.setSessionMaxRows(this.maxRows);
        this.results = executeInternal(this.maxRows, sendPacket, createStreamingResultSet(), true, metadataFromCache, false);
        if (oldCatalog != null) {
            locallyScopedConn.setCatalog(oldCatalog);
        }
        if (cachedMetadata != null) {
            locallyScopedConn.initializeResultsMetadataFromCache(this.originalSql, cachedMetadata, this.results);
        } else {
            if (locallyScopedConn.getCacheResultSetMetadata()) {
                locallyScopedConn.initializeResultsMetadataFromCache(this.originalSql, null /* will be created */, this.results);
            }
        }
        this.lastInsertId = this.results.getUpdateID();
        return this.results;
    }
}
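In the code above, pay special attention to the createStreamingResultSet method. It decides whether to create a streaming result set, that is, whether to use a streaming query. A streaming query differs from a normal query in that the data is not loaded into memory all at once: each time next() is called, the MySQL driver reads just one row from the network stream and returns it to the application, which avoids the OOM problem. Here is the implementation of that method: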
/**
 * We only stream result sets when they are forward-only, read-only, and the
 * fetch size has been set to Integer.MIN_VALUE
 *
 * @return true if this result set should be streamed row at-a-time, rather
 * than read all at once.
 */
protected boolean createStreamingResultSet() {
    return ((this.resultSetType == java.sql.ResultSet.TYPE_FORWARD_ONLY) && (this.resultSetConcurrency == java.sql.ResultSet.CONCUR_READ_ONLY)
            && (this.fetchSize == Integer.MIN_VALUE));
}
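To make the three conditions concrete, here is a minimal standalone restatement of the check as a plain Java predicate. It is a sketch for reasoning, not driver code: the class name StreamingConditions is hypothetical, and only the java.sql.ResultSet constants are real API.

```java
import java.sql.ResultSet;

/**
 * Hypothetical standalone restatement of the driver's
 * createStreamingResultSet() check (a model, not Connector/J code).
 */
final class StreamingConditions {
    private StreamingConditions() {}

    /** True only when all three streaming conditions hold. */
    static boolean shouldStream(int resultSetType, int resultSetConcurrency, int fetchSize) {
        return resultSetType == ResultSet.TYPE_FORWARD_ONLY
                && resultSetConcurrency == ResultSet.CONCUR_READ_ONLY
                // Any "normal" fetch size such as 100 fails this test.
                && fetchSize == Integer.MIN_VALUE;
    }
}
```

With setFetchSize(100), as in the commented-out line of selectNormal, the predicate is false, so the driver falls back to reading the whole result set into memory.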
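As you can see, the driver streams only when all three conditions hold. The first two are simply the default result-set type and concurrency that the MySQL driver uses when it creates a statement. They are defined in the driver's ConnectionImpl class: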
private static final int DEFAULT_RESULT_SET_TYPE = ResultSet.TYPE_FORWARD_ONLY;
private static final int DEFAULT_RESULT_SET_CONCURRENCY = ResultSet.CONCUR_READ_ONLY;

public java.sql.PreparedStatement prepareStatement(String sql)
        throws SQLException {
    return prepareStatement(sql, DEFAULT_RESULT_SET_TYPE,
            DEFAULT_RESULT_SET_CONCURRENCY);
}
So even if you create the statement without the last two arguments, the first two streaming conditions are satisfied by default:
PreparedStatement statement = connection.prepareStatement("select * from test");
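The third condition, however, is odd: fetchSize must be Integer.MIN_VALUE, i.e. -2147483648. This negative number is a special value defined by the MySQL driver, and the JDBC specification says nothing about it. Now we know how to enable a streaming query. Change the code as follows: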
public static void selectStream() throws SQLException {
    try (Connection connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "123456");
         PreparedStatement statement = connection.prepareStatement("select * from test",
                 ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) {
        // Integer.MIN_VALUE is the driver-specific signal to stream rows one at a time
        statement.setFetchSize(Integer.MIN_VALUE);
        long begin = System.currentTimeMillis();
        try (ResultSet resultSet = statement.executeQuery()) {
            while (resultSet.next()) {
                //System.out.println(resultSet.getString(1));
            }
        }
        long end = System.currentTimeMillis();
        System.out.println("selectStream span time=" + (end - begin) + "ms");
    }
}
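Run it, and the OOM problem is indeed gone: the query completes normally no matter how much data it returns.
StatementImpl also has an enableStreamingResults() method that sets up these conditions for you (it sets the fetch size and the result-set type; the default concurrency already qualifies). Many articles online recommend enabling streaming this way, but I don't, because it requires casting to the MySQL driver's StatementImpl class, which is not part of the standard JDBC interface.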
public void enableStreamingResults() throws SQLException {
    synchronized (checkClosed().getConnectionMutex()) {
        this.originalResultSetType = this.resultSetType;
        this.originalFetchSize = this.fetchSize;
        setFetchSize(Integer.MIN_VALUE);
        setResultSetType(ResultSet.TYPE_FORWARD_ONLY);
    }
}
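We now know how to use a streaming query to avoid OOM on large result sets, but we still don't know how streaming actually works, so let's keep digging through the source code. To make the call hierarchy easier to follow, I drew a call sequence diagram.
Let's go straight to the getResultSet method in com.mysql.jdbc.MysqlIO: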
/**
 * Build a result set. Delegates to buildResultSetWithRows() to build a
 * JDBC-version-specific ResultSet, given rows as byte data, and field
 * information.
 *
 * @param callingStatement DOCUMENT ME!
 * @param columnCount the number of columns in the result set
 * @param maxRows the maximum number of rows to read (-1 means all rows)
 * @param resultSetType (TYPE_FORWARD_ONLY, TYPE_SCROLL_????)
 * @param resultSetConcurrency the type of result set (CONCUR_UPDATABLE or
 *        READ_ONLY)
 * @param streamResults should the result set be read all at once, or
 *        streamed?
 * @param catalog the database name in use when the result set was created
 * @param isBinaryEncoded is this result set in native encoding?
 * @param unpackFieldInfo should we read MYSQL_FIELD info (if available)?
 *
 * @return a result set
 *
 * @throws SQLException if a database access error occurs
 */
protected ResultSetImpl getResultSet(StatementImpl callingStatement,
        long columnCount, int maxRows, int resultSetType,
        int resultSetConcurrency, boolean streamResults, String catalog,
        boolean isBinaryEncoded, Field[] metadataFromCache)
        throws SQLException {
    Buffer packet; // The packet from the server
    Field[] fields = null;

    // Read in the column information
    if (metadataFromCache == null /* we want the metadata from the server */) {
        fields = new Field[(int) columnCount];
        for (int i = 0; i < columnCount; i++) {
            Buffer fieldPacket = null;
            fieldPacket = readPacket();
            fields[i] = unpackField(fieldPacket, false);
        }
    } else {
        for (int i = 0; i < columnCount; i++) {
            skipPacket();
        }
    }

    packet = reuseAndReadPacket(this.reusablePacket);
    readServerStatusForResultSets(packet);

    //
    // Handle cursor-based fetch first
    //
    if (this.connection.versionMeetsMinimum(5, 0, 2)
            && this.connection.getUseCursorFetch()
            && isBinaryEncoded
            && callingStatement != null
            && callingStatement.getFetchSize() != 0
            && callingStatement.getResultSetType() == ResultSet.TYPE_FORWARD_ONLY) {
        ServerPreparedStatement prepStmt = (com.mysql.jdbc.ServerPreparedStatement) callingStatement;
        boolean usingCursor = true;

        //
        // Server versions 5.0.5 or newer will only open
        // a cursor and set this flag if they can, otherwise
        // they punt and go back to mysql_store_results() behavior
        //
        if (this.connection.versionMeetsMinimum(5, 0, 5)) {
            usingCursor = (this.serverStatus &
                    SERVER_STATUS_CURSOR_EXISTS) != 0;
        }

        if (usingCursor) {
            RowData rows = new RowDataCursor(
                    this,
                    prepStmt,
                    fields);
            ResultSetImpl rs = buildResultSetWithRows(
                    callingStatement,
                    catalog,
                    fields,
                    rows, resultSetType, resultSetConcurrency, isBinaryEncoded);
            if (usingCursor) {
                rs.setFetchSize(callingStatement.getFetchSize());
            }
            return rs;
        }
    }

    RowData rowData = null;
    if (!streamResults) {
        rowData = readSingleRowSet(columnCount, maxRows,
                resultSetConcurrency, isBinaryEncoded,
                (metadataFromCache == null) ? fields : metadataFromCache);
    } else {
        rowData = new RowDataDynamic(this, (int) columnCount,
                (metadataFromCache == null) ? fields : metadataFromCache,
                isBinaryEncoded);
        this.streamingData = rowData;
    }

    ResultSetImpl rs = buildResultSetWithRows(callingStatement, catalog,
            (metadataFromCache == null) ? fields : metadataFromCache,
            rowData, resultSetType, resultSetConcurrency, isBinaryEncoded);
    return rs;
}
Three query modes
From the code above, we can see that the MySQL driver picks a RowData implementation based on the parameters, and each implementation corresponds to one query mode:
1. RowDataStatic: static result set; the default mode (normal query)
2. RowDataDynamic: dynamic result set (streaming query)
3. RowDataCursor: cursor result set (server-side cursor query)
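The branch order in getResultSet() can be summarized as a small decision function. This is a simplified model, not driver code. RowDataKind and RowDataDispatch are hypothetical names. The serverOpenedCursor flag stands in for the SERVER_STATUS_CURSOR_EXISTS check (pass true for servers older than 5.0.5, where the driver assumes a cursor). The callingStatement != null check is assumed to pass. isBinaryEncoded corresponds to a server-side prepared statement using the binary protocol, which is why the cursor branch casts to ServerPreparedStatement.

```java
import java.sql.ResultSet;

/** Hypothetical label for the RowData implementation getResultSet() builds. */
enum RowDataKind { STATIC, DYNAMIC, CURSOR }

/** Simplified model of the branch order in MysqlIO.getResultSet(). */
final class RowDataDispatch {
    private RowDataDispatch() {}

    static RowDataKind choose(boolean serverAtLeast502, boolean useCursorFetch,
                              boolean isBinaryEncoded, int fetchSize, int resultSetType,
                              boolean serverOpenedCursor, boolean streamResults) {
        // 1. Cursor-based fetch is checked first.
        boolean cursorEligible = serverAtLeast502
                && useCursorFetch
                && isBinaryEncoded
                && fetchSize != 0
                && resultSetType == ResultSet.TYPE_FORWARD_ONLY;
        if (cursorEligible && serverOpenedCursor) {
            return RowDataKind.CURSOR;       // new RowDataCursor(...)
        }
        // 2. Otherwise streamResults (from createStreamingResultSet()) decides.
        return streamResults
                ? RowDataKind.DYNAMIC        // new RowDataDynamic(...)
                : RowDataKind.STATIC;        // readSingleRowSet(...) -> RowDataStatic
    }
}
```

Because the cursor branch comes first, the streamResults flag is only consulted when cursor fetch does not apply or the server declines to open a cursor.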
Let's take a quick look at how these classes are implemented:
Mode 1: normal query
private RowData readSingleRowSet(long columnCount, int maxRows, int resultSetConcurrency, boolean isBinaryEncoded, Field[] fields) throws SQLException {
    RowData rowData;
    ArrayList<ResultSetRow> rows = new ArrayList<ResultSetRow>();
    boolean useBufferRowExplicit = useBufferRowExplicit(fields);

    // Now read the data
    ResultSetRow row = nextRow(fields, (int) columnCount, isBinaryEncoded, resultSetConcurrency, false, useBufferRowExplicit, false, null);
    int rowCount = 0;
    if (row != null) {
        rows.add(row);
        rowCount = 1;
    }
    while (row != null) {
        row = nextRow(fields, (int) columnCount, isBinaryEncoded, resultSetConcurrency, false, useBufferRowExplicit, false, null);
        if (row != null) {
            if ((maxRows == -1) || (rowCount < maxRows)) {
                rows.add(row);
                rowCount++;
            }
        }
    }
    rowData = new RowDataStatic(rows);
    return rowData;
}
As you can see, this mode reads the entire result set into a local list in one go, so if the data is too large for the JVM heap, you get the OOM error shown at the beginning of this article.
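The eager loop above can be modeled with a plain Iterator standing in for the network stream. This is a simplified sketch under that assumption: EagerRowReader is a hypothetical name, and it handles maxRows of -1 (all rows) or a positive limit. Every row is pulled off the "wire", and rows up to maxRows are kept in a list, so memory grows with the size of the result.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Simplified model of readSingleRowSet(): drain the stream into a list. */
final class EagerRowReader {
    private EagerRowReader() {}

    static <T> List<T> readAll(Iterator<T> wire, int maxRows) {
        List<T> rows = new ArrayList<>();
        while (wire.hasNext()) {
            T row = wire.next();                  // like nextRow(...): every row is read
            if (maxRows == -1 || rows.size() < maxRows) {
                rows.add(row);                    // only the first maxRows are kept
            }
        }
        return rows;                              // like new RowDataStatic(rows)
    }
}
```

The model also highlights a detail of the driver loop: even when maxRows is set, the remaining rows are still read from the network and simply discarded.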
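Mode 2: streaming query
Rows are fetched one at a time: the driver reads the next row only when the application has finished with the current one and calls next() again. The code shows that a streaming query reads data with the same method as a normal query (this.io.nextRow). The difference is that a normal query first reads all the rows and then hands them to the application (next() just walks an in-memory list), whereas a streaming query reads row by row and fetches the next row only after the application has processed the current one.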
com.mysql.jdbc.RowDataDynamic
private void nextRecord() throws SQLException {
    try {
        if (!this.noMoreRows) {
            this.nextRow = this.io.nextRow(this.metadata, this.columnCount, this.isBinaryEncoded, java.sql.ResultSet.CONCUR_READ_ONLY, true,
                    this.useBufferRowExplicit, true, null);
            if (this.nextRow == null) {
                this.noMoreRows = true;
                this.isAfterEnd = true;
                this.moreResultsExisted = this.io.tackOnMoreStreamingResults(this.owner);
                if (this.index == -1) {
                    this.wasEmpty = true;
                }
            }
        } else {
            this.nextRow = null;
            this.isAfterEnd = true;
        }
    } catch (SQLException sqlEx) {
        if (sqlEx instanceof StreamingNotifiable) {
            ((StreamingNotifiable) sqlEx).setWasStreamingResults();
        }
        // There won't be any more rows
        this.noMoreRows = true;
        // don't wrap SQLExceptions
        throw sqlEx;
    } catch (Exception ex) {
        String exceptionType = ex.getClass().getName();
        String exceptionMessage = ex.getMessage();
        exceptionMessage += Messages.getString("RowDataDynamic.7");
        exceptionMessage += Util.stackTraceToString(ex);
        SQLException sqlEx = SQLError.createSQLException(
                Messages.getString("RowDataDynamic.8") + exceptionType + Messages.getString("RowDataDynamic.9") + exceptionMessage,
                SQLError.SQL_STATE_GENERAL_ERROR, this.exceptionInterceptor);
        sqlEx.initCause(ex);
        throw sqlEx;
    }
}
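RowDataDynamic's on-demand reading can be modeled the same way. In this sketch, StreamingRows is a hypothetical name and an Iterator again stands in for MysqlIO's socket. Each next() pulls exactly one row, so the client holds at most one row at a time, and unread rows stay on the wire.

```java
import java.util.Iterator;

/** Simplified model of RowDataDynamic: one row is read per next() call. */
final class StreamingRows<T> {
    private final Iterator<T> wire;   // stands in for the network stream
    private T current;                // the single row currently held
    private boolean noMoreRows;
    private int rowsRead;

    StreamingRows(Iterator<T> wire) {
        this.wire = wire;
    }

    /** Reads the next row from the wire; returns false once it is exhausted. */
    boolean next() {
        if (!noMoreRows && wire.hasNext()) {
            current = wire.next();    // like io.nextRow(...) in nextRecord()
            rowsRead++;
            return true;
        }
        noMoreRows = true;            // like this.noMoreRows = true
        current = null;
        return false;
    }

    T current() { return current; }

    int rowsRead() { return rowsRead; }
}
```

One practical consequence of rows staying on the wire: Connector/J does not allow other statements on the same connection while a streaming result set is still open, so read it to the end or close it before issuing the next query.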
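Mode 3: RowDataCursor (cursor-based)
To our pleasant surprise, the code shows that MySQL does support cursor-based queries. In this mode the MySQL server sends only fetchSize rows at a time. The driver reads those fetchSize rows and returns them to the application. When the application has processed them and calls next() again, the driver sends another fetch command (COM_FETCH) to get the next batch of fetchSize rows. According to the checks at the top of getResultSet(), this path is taken only when all of the following hold: the server is 5.0.2 or newer, useCursorFetch is enabled, the statement is a binary-encoded server-side prepared statement, the fetch size is non-zero, and the result set is forward-only.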
protected List<ResultSetRow> fetchRowsViaCursor(List<ResultSetRow> fetchedRows, long statementId, Field[] columnTypes, int fetchSize,
        boolean useBufferRowExplicit) throws SQLException {
    if (fetchedRows == null) {
        fetchedRows = new ArrayList<ResultSetRow>(fetchSize);
    } else {
        fetchedRows.clear();
    }
    this.sharedSendPacket.clear();
    this.sharedSendPacket.writeByte((byte) MysqlDefs.COM_FETCH);
    this.sharedSendPacket.writeLong(statementId);
    this.sharedSendPacket.writeLong(fetchSize);
    sendCommand(MysqlDefs.COM_FETCH, null, this.sharedSendPacket, true, null, 0);
    ResultSetRow row = null;
    while ((row = nextRow(columnTypes, columnTypes.length, true, ResultSet.CONCUR_READ_ONLY, false, useBufferRowExplicit, false, null)) != null) {
        fetchedRows.add(row);
    }
    return fetchedRows;
}
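The cursor round trips can be simulated as well. In this sketch, CursorFetchModel is a hypothetical name. The server-side cursor is modeled as an offset into the full result, and each loop iteration is one COM_FETCH that returns up to fetchSize rows. As a simplification, the model assumes the client learns from the server when the last batch has been sent. It counts round trips and the largest batch the client ever buffers.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model of cursor-based fetching (not driver code). */
final class CursorFetchModel {
    private CursorFetchModel() {}

    /** Counters collected during one simulated run. */
    static final class Stats {
        final long rowsConsumed;
        final long roundTrips;
        final int maxRowsBuffered;

        Stats(long rowsConsumed, long roundTrips, int maxRowsBuffered) {
            this.rowsConsumed = rowsConsumed;
            this.roundTrips = roundTrips;
            this.maxRowsBuffered = maxRowsBuffered;
        }
    }

    static Stats run(int totalRows, int fetchSize) {
        if (fetchSize <= 0) {
            throw new IllegalArgumentException("fetchSize must be positive");
        }
        // Reused between fetches, like fetchedRows.clear() in fetchRowsViaCursor()
        List<Integer> batch = new ArrayList<>(fetchSize);
        int cursorPosition = 0;              // server-side cursor offset
        long consumed = 0;
        long roundTrips = 0;
        int maxBuffered = 0;
        boolean lastBatchSent = false;

        while (!lastBatchSent) {
            batch.clear();
            roundTrips++;                    // one COM_FETCH request/response
            int end = Math.min(cursorPosition + fetchSize, totalRows);
            for (int row = cursorPosition; row < end; row++) {
                batch.add(row);              // server sends up to fetchSize rows
            }
            cursorPosition = end;
            lastBatchSent = cursorPosition >= totalRows;
            maxBuffered = Math.max(maxBuffered, batch.size());
            consumed += batch.size();        // application iterates the batch via next()
        }
        return new Stats(consumed, roundTrips, maxBuffered);
    }
}
```

For 1,000,000 rows with fetchSize 10, as in the local test reported later, the model needs 100,000 fetch round trips, while the client never buffers more than 10 rows. That many extra round trips is one plausible contributor to the slower time measured for cursor mode.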
Here is a test of the cursor-based query. It sets useCursorFetch=true in the connection URL and specifies a fetchSize:
public static void selectStreamWithUseCursorFetch() throws SQLException {
    // useCursorFetch=true lets the driver fetch rows through a server-side cursor
    try (Connection connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/test?useCursorFetch=true", "root", "123456");
         PreparedStatement statement = connection.prepareStatement("select * from test")) {
        statement.setFetchSize(10); // rows returned per fetch round trip
        long begin = System.currentTimeMillis();
        try (ResultSet resultSet = statement.executeQuery()) {
            while (resultSet.next()) {
                //System.out.println(resultSet.getString(1));
            }
        }
        long end = System.currentTimeMillis();
        System.out.println("selectStreamWithUseCursorFetch span time=" + (end - begin) + "ms");
    }
}
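Running it shows that this mode also handles large result sets. The application specifies how many rows to fetch each time (fetchSize), and the server returns only that many rows per fetch. As a result, each individual request occupies MySQL for a shorter time than in the other two modes. However, the server does not know when the client will finish consuming the data, so it has to set up temporary space to hold the query results. With large data volumes, the server's IOPS and disk usage can spike. This mode also needs many more network round trips with the server, so in the end it is less efficient than a streaming query.
In a local test querying 1,000,000 rows, the execution times of mode 2 and mode 3 were: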
selectStreamWithUseCursorFetch span time=507ms
selectStream span time=155ms
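The results show that the cursor-based mode is somewhat slower because the server has to do extra work. For large data volumes, the streaming query based on the dynamic result set is generally the recommended choice.
Summary:
By dissecting the query code in the MySQL driver, we have seen that MySQL supports three query modes, each suited to different scenarios. You need to understand the strengths and weaknesses of each to use them correctly in real projects.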
1. Normal query
- Pros: simple application code; fast when the data volume is small.
- Cons: runs out of memory (OOM) when the data volume is large.
2. Streaming query
- Pros: no OOM problem with large data volumes.
- Cons: holds the database for longer, which makes network congestion more likely.
3. Cursor query
- Pros: no OOM problem with large data volumes; compared with streaming, each individual request occupies the database for a shorter time.
- Cons: compared with streaming, consumes more server resources and takes longer overall.