阿飛Javaer,轉(zhuǎn)載請注明原創(chuàng)出處,謝謝!
單表查詢之結(jié)果合并
接下來以執(zhí)行SELECT o.* FROM t_order o where o.user_id=10 order by o.order_id desc limit 2,3
分析下面這段Java代碼是如何對結(jié)果進(jìn)行合并的:
result = new ShardingResultSet(resultSets, new MergeEngine(resultSets, (SelectStatement) routeResult.getSqlStatement()).merge());
MergeEngine.merge()方法的源碼如下:
public ResultSetMerger merge() throws SQLException {
selectStatement.setIndexForItems(columnLabelIndexMap);
return decorate(build());
}
build()方法源碼如下:
private ResultSetMerger build() throws SQLException {
// 說明:GroupBy***ResultSetMerger在第六篇文章單獨(dú)講解,所以此次分析的SQL條件中沒有g(shù)roup by
if (!selectStatement.getGroupByItems().isEmpty() || !selectStatement.getAggregationSelectItems().isEmpty()) {
if (selectStatement.isSameGroupByAndOrderByItems()) {
return new GroupByStreamResultSetMerger(columnLabelIndexMap, resultSets, selectStatement);
} else {
return new GroupByMemoryResultSetMerger(columnLabelIndexMap, resultSets, selectStatement);
}
}
// 如果select語句中有order by字段江场,那么需要OrderByStreamResultSetMerger對結(jié)果處理
if (!selectStatement.getOrderByItems().isEmpty()) {
return new OrderByStreamResultSetMerger(resultSets, selectStatement.getOrderByItems());
}
return new IteratorStreamResultSetMerger(resultSets);
}
根據(jù)這段代碼可知,其作用是根據(jù)sql語句選擇多個不同的ResultSetMerger對結(jié)果進(jìn)行合并處理窖逗,ResultSetMerger實現(xiàn)有這幾種:GroupByStreamResultSetMerger址否,GroupByMemoryResultSetMerger,OrderByStreamResultSetMerger碎紊,IteratorStreamResultSetMerger佑附,LimitDecoratorResultSetMerger樊诺;以測試SQL
SELECT o.* FROM t_order o where o.user_id=10 order by o.order_id desc limit 2,3
為例,沒有g(shù)roup by音同,但是有order by词爬,所以使用到了OrderByStreamResultSetMerger和LimitDecoratorResultSetMerger對結(jié)果進(jìn)行合并(GroupByStreamResultSetMerger&GroupByMemoryResultSetMerger后面單獨(dú)講解)
decorate()源碼如下:
private ResultSetMerger decorate(final ResultSetMerger resultSetMerger) throws SQLException {
ResultSetMerger result = resultSetMerger;
// 如果SQL語句中有l(wèi)imist,還需要LimitDecoratorResultSetMerger配合進(jìn)行結(jié)果歸并瘟斜;
if (null != selectStatement.getLimit()) {
result = new LimitDecoratorResultSetMerger(result, selectStatement.getLimit());
}
return result;
}
接下來將以執(zhí)行SQL:
SELECT o.* FROM t_order o where o.user_id=10 order by o.order_id desc limit 2,3
(該SQL會被改寫成SELECT o.* , o.order_id AS ORDER_BY_DERIVED_0 FROM t_order_0 o where o.user_id=? order by o.order_id desc limit 2,3
)為例缸夹,一一講解OrderByStreamResultSetMerger,LimitDecoratorResultSetMerger和IteratorStreamResultSetMerger螺句,了解這幾個ResultSetMerger的原理;
OrderByStreamResultSetMerger
OrderByStreamResultSetMerger的核心源碼如下:
private final Queue<OrderByValue> orderByValuesQueue;
public OrderByStreamResultSetMerger(final List<ResultSet> resultSets, final List<OrderItem> orderByItems) throws SQLException {
// sql中order by列的信息橡类,實例sql是order by order_id desc蛇尚,即此處就是order_id
this.orderByItems = orderByItems;
// 初始化一個優(yōu)先級隊列,優(yōu)先級隊列中的元素會根據(jù)OrderByValue中compareTo()方法排序顾画,并且SQL重寫后發(fā)送到多少個目標(biāo)實際表取劫,List<ResultSet>的size就有多大,Queue的capacity就有多大研侣;
this.orderByValuesQueue = new PriorityQueue<>(resultSets.size());
// 將結(jié)果壓入隊列中
orderResultSetsToQueue(resultSets);
isFirstNext = true;
}
private void orderResultSetsToQueue(final List<ResultSet> resultSets) throws SQLException {
// 遍歷resultSets--在多少個目標(biāo)實際表上執(zhí)行SQL谱邪,該集合的size就有多大
for (ResultSet each : resultSets) {
// 將ResultSet和排序列信息封裝成一個OrderByValue類型
OrderByValue orderByValue = new OrderByValue(each, orderByItems);
// 如果值存在,那么壓入隊列中
if (orderByValue.next()) {
orderByValuesQueue.offer(orderByValue);
}
}
// 重置currentResultSet的位置:如果隊列不為空庶诡,那么將隊列的頂部(peek)位置設(shè)置為currentResultSet的位置
setCurrentResultSet(orderByValuesQueue.isEmpty() ? resultSets.get(0) : orderByValuesQueue.peek().getResultSet());
}
@Override
public boolean next() throws SQLException {
// 調(diào)用next()判斷是否還有值, 如果隊列為空, 表示沒有任何值, 那么直接返回false
if (orderByValuesQueue.isEmpty()) {
return false;
}
// 如果隊列不為空, 那么第一次一定返回true惦银;即有結(jié)果可取(且將isFirstNext置為false末誓,表示接下來的請求都不是第一次請求next()方法)
if (isFirstNext) {
isFirstNext = false;
return true;
}
// 從隊列中彈出第一個元素(因為是優(yōu)先級隊列扯俱,所以poll()返回的值,就是此次要取的值)
OrderByValue firstOrderByValue = orderByValuesQueue.poll();
// 如果它的next()存在喇澡,那么將它的next()再添加到隊列中
if (firstOrderByValue.next()) {
orderByValuesQueue.offer(firstOrderByValue);
}
// 隊列中所有元素全部處理完后就返回false
if (orderByValuesQueue.isEmpty()) {
return false;
}
// 再次重置currentResultSet的位置為隊列的頂部位置迅栅;
setCurrentResultSet(orderByValuesQueue.peek().getResultSet());
return true;
}
繼續(xù)深入剖析:這段代碼初看可能有點(diǎn)繞,假設(shè)運(yùn)行SQL
SELECT o.* FROM t_order o where o.user_id=10 order by o.order_id desc limit 3
會分發(fā)到兩個目標(biāo)實際表晴玖,且第一個實際表返回的結(jié)果是1读存,3,5呕屎,7让簿,9;第二個實際表返回的結(jié)果是2榨惰,4拜英,6,8琅催,10居凶;那么虫给,經(jīng)過OrderByStreamResultSetMerger的構(gòu)造方法中的orderResultSetsToQueue()方法后,Queue<OrderByValue> orderByValuesQueue
中包含兩個OrderByValue侠碧,一個是10抹估,一個是9;接下來取值運(yùn)行過程如下:
- 取得10弄兜,并且10的next()是8药蜻,然后執(zhí)行orderByValuesQueue.offer(8);,這時候orderByValuesQueue中包含8和9替饿;
- 取得9语泽,并且9的next()是7,然后執(zhí)行orderByValuesQueue.offer(7);视卢,這時候orderByValuesQueue中包含7和8踱卵;
- 取得8,并且8的next()是6据过,然后執(zhí)行orderByValuesQueue.offer(6);惋砂,這時候orderByValuesQueue中包含7和6;
取值數(shù)量已經(jīng)達(dá)到limit 3的限制(源碼在LimitDecoratorResultSetMerger中的next()方法中)绳锅,退出西饵;
這段代碼運(yùn)行示意圖如下所示:
LimitDecoratorResultSetMerger
LimitDecoratorResultSetMerger核心源碼如下:
public LimitDecoratorResultSetMerger(final ResultSetMerger resultSetMerger, final Limit limit) throws SQLException {
super(resultSetMerger);
// limit賦值(Limit對象包括limit m,n中的m和n兩個值)
this.limit = limit;
// 判斷是否會跳過所有的結(jié)果項,即判斷是否有符合條件的結(jié)果
skipAll = skipOffset();
}
private boolean skipOffset() throws SQLException {
// 假定limit.getOffsetValue()就是offset鳞芙,實例sql中為limit 2,3眷柔,所以offset=2
for (int i = 0; i < limit.getOffsetValue(); i++) {
// 嘗試從OrderByStreamResultSetMerger生成的優(yōu)先級隊列中跳過offset個元素,如果.next()一直為true积蜻,表示有足夠符合條件的結(jié)果闯割,那么返回false;否則沒有足夠符合條件的結(jié)果竿拆,那么返回true宙拉;即skilAll=true就表示跳過了所有沒有符合條件的結(jié)果;
if (!getResultSetMerger().next()) {
return true;
}
}
// limit m,n的sql會被重寫為limit 0, m+n丙笋,所以limit.isRowCountRewriteFlag()為true谢澈,rowNumber的值為0;
rowNumber = limit.isRowCountRewriteFlag() ? 0 : limit.getOffsetValue();
return false;
}
@Override
public boolean next() throws SQLException {
// 如果skipAll為true御板,即跳過所有锥忿,表示沒有任何符合條件的值,那么返回false
if (skipAll) {
return false;
}
if (limit.getRowCountValue() > -1) {
// 每次next()獲取值后怠肋,rowNumber自增敬鬓,當(dāng)自增rowCountValue次后,就不能再往下繼續(xù)取值了,因為條件limit 2,3(rowCountValue=3)限制了
return ++rowNumber <= limit.getRowCountValue() && getResultSetMerger().next();
}
return getResultSetMerger().next();
}
IteratorStreamResultSetMerger
構(gòu)造方法核心源碼:
private final Iterator<ResultSet> resultSets;
public IteratorStreamResultSetMerger(final List<ResultSet> resultSets) {
// 將List<ResultSet>改成Iterator<ResultSet>钉答,方便接下來迭代取得結(jié)果础芍;
this.resultSets = resultSets.iterator();
// 重置currentResultSet
setCurrentResultSet(this.resultSets.next());
}