聊聊jdbc statement的fetchSize

本文主要研究一下jdbc statement的fetchSize

fetchSize

這里以postgres jdbc driver為例撵颊,主要是因為postgres的jdbc driver有公開源碼政己,而且命名比較規(guī)范袱饭。之前看oracle jdbc富稻,由于沒有源碼,反編譯出來一大堆var1,var2等的變量命名,非炒沓溃晦澀。

默認(rèn)情況下pgjdbc driver會一次性拉取所有結(jié)果集,也就是在executeQuery的時候以清。對于大數(shù)據(jù)量的查詢來說儿普,非常容易造成OOM。這種場景就需要設(shè)置fetchSize掷倔,執(zhí)行query的時候先返回第一批數(shù)據(jù)眉孩,之后next完一批數(shù)據(jù)之后再去拉取下一批。

但是這個有幾個要求:

  • 數(shù)據(jù)庫必須使用V3協(xié)議勒葱,即pg7.4+
  • connection的autoCommit必須為false浪汪,因為開啟autoCommit的話,查詢完成cursor會被關(guān)閉凛虽,那么下次就不能再fetch了死遭。另外ResultSet必須是ResultSet.TYPE_FORWARD_ONLY類型,這個是默認(rèn)的涩维。也就是說無法向后滾動殃姓。
  • 查詢語句必須是單條,不能是用分號組成的多條查詢

實例代碼

    @Test
    public void testReadTimeout() throws SQLException {
        Connection connection = dataSource.getConnection();
        //https://jdbc.postgresql.org/documentation/head/query.html
        connection.setAutoCommit(false); //NOTE 為了設(shè)置fetchSize,必須設(shè)置為false

        String sql = "select * from demo_table";
        PreparedStatement pstmt;
        try {
            pstmt = (PreparedStatement)connection.prepareStatement(sql);
            pstmt.setFetchSize(50); 
            System.out.println("ps.getQueryTimeout():" + pstmt.getQueryTimeout());
            System.out.println("ps.getFetchSize():" + pstmt.getFetchSize());
            System.out.println("ps.getFetchDirection():" + pstmt.getFetchDirection());
            System.out.println("ps.getMaxFieldSize():" + pstmt.getMaxFieldSize());

            ResultSet rs = pstmt.executeQuery(); 
            //NOTE 這里返回了就代表statement執(zhí)行完成,默認(rèn)返回fetchSize的數(shù)據(jù)
            int col = rs.getMetaData().getColumnCount();
            System.out.println("============================");
            while (rs.next()) { 
                for (int i = 1; i <= col; i++) {
                    System.out.print(rs.getObject(i));
                }
                System.out.println("");
            }
            System.out.println("============================");
        } catch (SQLException e) {
            e.printStackTrace();
        } finally {
            //close resources
        }
    }

源碼解析

postgresql-9.4.1212.jre7-sources.jar!/org/postgresql/jdbc/PgPreparedStatement.java

  /*
   * A Prepared SQL query is executed and its ResultSet is returned
   *
   * @return a ResultSet that contains the data produced by the * query - never null
   *
   * @exception SQLException if a database access error occurs
   */
  public java.sql.ResultSet executeQuery() throws SQLException {
    if (!executeWithFlags(0)) {
      throw new PSQLException(GT.tr("No results were returned by the query."), PSQLState.NO_DATA);
    }

    if (result.getNext() != null) {
      throw new PSQLException(GT.tr("Multiple ResultSets were returned by the query."),
          PSQLState.TOO_MANY_RESULTS);
    }

    return result.getResultSet();
  }

executeQuery首先調(diào)用executeWithFlags方法瓦阐,源碼里頭直接寫在if里頭的,這個不是推薦的方式篷牌,因為放在if比較容易忽略睡蟋。

  • executeWithFlags
public boolean executeWithFlags(int flags) throws SQLException {
    try {
      checkClosed();

      if (connection.getPreferQueryMode() == PreferQueryMode.SIMPLE) {
        flags |= QueryExecutor.QUERY_EXECUTE_AS_SIMPLE;
      }

      execute(preparedQuery, preparedParameters, flags);

      return (result != null && result.getResultSet() != null);
    } finally {
      defaultTimeZone = null;
    }
  }

protected final void execute(CachedQuery cachedQuery, ParameterList queryParameters, int flags)
      throws SQLException {
    try {
      executeInternal(cachedQuery, queryParameters, flags);
    } catch (SQLException e) {
      // Don't retry composite queries as it might get partially executed
      if (cachedQuery.query.getSubqueries() != null
          || !connection.getQueryExecutor().willHealOnRetry(e)) {
        throw e;
      }
      cachedQuery.query.close();
      // Execute the query one more time
      executeInternal(cachedQuery, queryParameters, flags);
    }
  }

這里又調(diào)用execute方法,在調(diào)用executeInternal

executeInternal

postgresql-9.4.1212.jre7-sources.jar!/org/postgresql/jdbc/PgPreparedStatement.java

private void executeInternal(CachedQuery cachedQuery, ParameterList queryParameters, int flags)
      throws SQLException {
    closeForNextExecution();

    // Enable cursor-based resultset if possible.
    if (fetchSize > 0 && !wantsScrollableResultSet() && !connection.getAutoCommit()
        && !wantsHoldableResultSet()) {
      flags |= QueryExecutor.QUERY_FORWARD_CURSOR;
    }

    if (wantsGeneratedKeysOnce || wantsGeneratedKeysAlways) {
      flags |= QueryExecutor.QUERY_BOTH_ROWS_AND_STATUS;

      // If the no results flag is set (from executeUpdate)
      // clear it so we get the generated keys results.
      //
      if ((flags & QueryExecutor.QUERY_NO_RESULTS) != 0) {
        flags &= ~(QueryExecutor.QUERY_NO_RESULTS);
      }
    }

    if (isOneShotQuery(cachedQuery)) {
      flags |= QueryExecutor.QUERY_ONESHOT;
    }
    // Only use named statements after we hit the threshold. Note that only
    // named statements can be transferred in binary format.

    if (connection.getAutoCommit()) {
      flags |= QueryExecutor.QUERY_SUPPRESS_BEGIN;
    }

    // updateable result sets do not yet support binary updates
    if (concurrency != ResultSet.CONCUR_READ_ONLY) {
      flags |= QueryExecutor.QUERY_NO_BINARY_TRANSFER;
    }

    Query queryToExecute = cachedQuery.query;

    if (queryToExecute.isEmpty()) {
      flags |= QueryExecutor.QUERY_SUPPRESS_BEGIN;
    }

    if (!queryToExecute.isStatementDescribed() && forceBinaryTransfers
        && (flags & QueryExecutor.QUERY_EXECUTE_AS_SIMPLE) == 0) {
      // Simple 'Q' execution does not need to know parameter types
      // When binaryTransfer is forced, then we need to know resulting parameter and column types,
      // thus sending a describe request.
      int flags2 = flags | QueryExecutor.QUERY_DESCRIBE_ONLY;
      StatementResultHandler handler2 = new StatementResultHandler();
      connection.getQueryExecutor().execute(queryToExecute, queryParameters, handler2, 0, 0,
          flags2);
      ResultWrapper result2 = handler2.getResults();
      if (result2 != null) {
        result2.getResultSet().close();
      }
    }

    StatementResultHandler handler = new StatementResultHandler();
    result = null;
    try {
      startTimer();
      connection.getQueryExecutor().execute(queryToExecute, queryParameters, handler, maxrows,
          fetchSize, flags);
    } finally {
      killTimerTask();
    }
    result = firstUnclosedResult = handler.getResults();

    if (wantsGeneratedKeysOnce || wantsGeneratedKeysAlways) {
      generatedKeys = result;
      result = result.getNext();

      if (wantsGeneratedKeysOnce) {
        wantsGeneratedKeysOnce = false;
      }
    }

  }

主要看這段

connection.getQueryExecutor().execute(queryToExecute, queryParameters, handler, maxrows,
          fetchSize, flags);

通過把fetchSize傳遞進(jìn)去枷颊,拉取指定大小的result

最后調(diào)用sendExecute以及processResults方法來拉取數(shù)據(jù)
postgresql-9.4.1212.jre7-sources.jar!/org/postgresql/core/v3/QueryExecutorImpl.java

private void sendExecute(SimpleQuery query, Portal portal, int limit) throws IOException {
    //
    // Send Execute.
    //

    if (logger.logDebug()) {
      logger.debug(" FE=> Execute(portal=" + portal + ",limit=" + limit + ")");
    }

    byte[] encodedPortalName = (portal == null ? null : portal.getEncodedPortalName());
    int encodedSize = (encodedPortalName == null ? 0 : encodedPortalName.length);

    // Total size = 4 (size field) + 1 + N (source portal) + 4 (max rows)
    pgStream.sendChar('E'); // Execute
    pgStream.sendInteger4(4 + 1 + encodedSize + 4); // message size
    if (encodedPortalName != null) {
      pgStream.send(encodedPortalName); // portal name
    }
    pgStream.sendChar(0); // portal name terminator
    pgStream.sendInteger4(limit); // row limit

    pendingExecuteQueue.add(new ExecuteRequest(query, portal, false));
  }

protected void processResults(ResultHandler handler, int flags) throws IOException {
    boolean noResults = (flags & QueryExecutor.QUERY_NO_RESULTS) != 0;
    boolean bothRowsAndStatus = (flags & QueryExecutor.QUERY_BOTH_ROWS_AND_STATUS) != 0;

    List<byte[][]> tuples = null;

    int c;
    boolean endQuery = false;

    // At the end of a command execution we have the CommandComplete
    // message to tell us we're done, but with a describeOnly command
    // we have no real flag to let us know we're done. We've got to
    // look for the next RowDescription or NoData message and return
    // from there.
    boolean doneAfterRowDescNoData = false;

    while (!endQuery) {
      c = pgStream.receiveChar();
      switch (c) {
        case 'A': // Asynchronous Notify
          receiveAsyncNotify();
          break;

        case '1': // Parse Complete (response to Parse)
          pgStream.receiveInteger4(); // len, discarded

          SimpleQuery parsedQuery = pendingParseQueue.removeFirst();
          String parsedStatementName = parsedQuery.getStatementName();
          //...
      }
  }
}        

next

postgresql-9.4.1212.jre7-sources.jar!/org/postgresql/jdbc/PgResultSet.java

public boolean next() throws SQLException {
    checkClosed();

    if (onInsertRow) {
      throw new PSQLException(GT.tr("Can''t use relative move methods while on the insert row."),
          PSQLState.INVALID_CURSOR_STATE);
    }

    if (current_row + 1 >= rows.size()) {
      if (cursor == null || (maxRows > 0 && row_offset + rows.size() >= maxRows)) {
        current_row = rows.size();
        this_row = null;
        rowBuffer = null;
        return false; // End of the resultset.
      }

      // Ask for some more data.
      row_offset += rows.size(); // We are discarding some data.

      int fetchRows = fetchSize;
      if (maxRows != 0) {
        if (fetchRows == 0 || row_offset + fetchRows > maxRows) {
          // Fetch would exceed maxRows, limit it.
          fetchRows = maxRows - row_offset;
        }
      }

      // Execute the fetch and update this resultset.
      connection.getQueryExecutor().fetch(cursor, new CursorResultHandler(), fetchRows);

      current_row = 0;

      // Test the new rows array.
      if (rows.isEmpty()) {
        this_row = null;
        rowBuffer = null;
        return false;
      }
    } else {
      current_row++;
    }

    initRowBuffer();
    return true;
  }

next方法可以看到戳杀,首先判斷current_row + 1是否小于rows.size(),小于的話夭苗,那就current_row++信卡;否則表示這一批fetchSize的數(shù)據(jù)被消費完了,需要判斷是否結(jié)束或者拉取下一批數(shù)據(jù)题造,之后更新current_row

connection.getQueryExecutor().fetch(cursor, new CursorResultHandler(), fetchRows);

這個方法拉取fetchRows條數(shù)的下一批數(shù)據(jù)

  • initRowBuffer
private void initRowBuffer() {
    this_row = rows.get(current_row);
    // We only need a copy of the current row if we're going to
    // modify it via an updatable resultset.
    if (resultsetconcurrency == ResultSet.CONCUR_UPDATABLE) {
      rowBuffer = new byte[this_row.length][];
      System.arraycopy(this_row, 0, rowBuffer, 0, this_row.length);
    } else {
      rowBuffer = null;
    }
  }

這就是next移動之后傍菇,把要消費的這行數(shù)據(jù)放到rowBuffer里頭。

小結(jié)

對于查詢數(shù)據(jù)量大的場景下界赔,非常有必要設(shè)置fetchSize丢习,否則全量拉取很容易OOM,但是使用fetchSize的時候淮悼,要求數(shù)據(jù)能夠在遍歷resultSet的時候及時處理咐低,而不是收集完所有數(shù)據(jù)返回回去再去處理。

doc

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末袜腥,一起剝皮案震驚了整個濱河市见擦,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌,老刑警劉巖鲤屡,帶你破解...
    沈念sama閱讀 217,734評論 6 505
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件儡湾,死亡現(xiàn)場離奇詭異,居然都是意外死亡执俩,警方通過查閱死者的電腦和手機(jī)徐钠,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,931評論 3 394
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來役首,“玉大人尝丐,你說我怎么就攤上這事『獍拢” “怎么了爹袁?”我有些...
    開封第一講書人閱讀 164,133評論 0 354
  • 文/不壞的土叔 我叫張陵,是天一觀的道長矮固。 經(jīng)常有香客問我失息,道長,這世上最難降的妖魔是什么档址? 我笑而不...
    開封第一講書人閱讀 58,532評論 1 293
  • 正文 為了忘掉前任盹兢,我火速辦了婚禮,結(jié)果婚禮上守伸,老公的妹妹穿的比我還像新娘绎秒。我一直安慰自己,他們只是感情好尼摹,可當(dāng)我...
    茶點故事閱讀 67,585評論 6 392
  • 文/花漫 我一把揭開白布见芹。 她就那樣靜靜地躺著,像睡著了一般蠢涝。 火紅的嫁衣襯著肌膚如雪玄呛。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,462評論 1 302
  • 那天和二,我揣著相機(jī)與錄音徘铝,去河邊找鬼。 笑死儿咱,一個胖子當(dāng)著我的面吹牛庭砍,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播混埠,決...
    沈念sama閱讀 40,262評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼怠缸,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了钳宪?” 一聲冷哼從身側(cè)響起揭北,我...
    開封第一講書人閱讀 39,153評論 0 276
  • 序言:老撾萬榮一對情侶失蹤扳炬,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后搔体,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體恨樟,經(jīng)...
    沈念sama閱讀 45,587評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,792評論 3 336
  • 正文 我和宋清朗相戀三年疚俱,在試婚紗的時候發(fā)現(xiàn)自己被綠了劝术。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 39,919評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡呆奕,死狀恐怖养晋,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情梁钾,我是刑警寧澤绳泉,帶...
    沈念sama閱讀 35,635評論 5 345
  • 正文 年R本政府宣布,位于F島的核電站姆泻,受9級特大地震影響零酪,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜拇勃,卻給世界環(huán)境...
    茶點故事閱讀 41,237評論 3 329
  • 文/蒙蒙 一四苇、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧潜秋,春花似錦蛔琅、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,855評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽辜窑。三九已至钩述,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間穆碎,已是汗流浹背牙勘。 一陣腳步聲響...
    開封第一講書人閱讀 32,983評論 1 269
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留所禀,地道東北人方面。 一個月前我還...
    沈念sama閱讀 48,048評論 3 370
  • 正文 我出身青樓,卻偏偏與公主長得像色徘,于是被迫代替她去往敵國和親恭金。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,864評論 2 354

推薦閱讀更多精彩內(nèi)容