5. sharding-jdbc源碼之結(jié)果合并

阿飛Javaer,轉(zhuǎn)載請注明原創(chuàng)出處,謝謝!

單表查詢之結(jié)果合并

接下來以執(zhí)行SELECT o.* FROM t_order o where o.user_id=10 order by o.order_id desc limit 2,3分析下面這段Java代碼是如何對結(jié)果進(jìn)行合并的:

result = new ShardingResultSet(resultSets, new MergeEngine(resultSets, (SelectStatement) routeResult.getSqlStatement()).merge());

MergeEngine.merge()方法的源碼如下:

public ResultSetMerger merge() throws SQLException {
    selectStatement.setIndexForItems(columnLabelIndexMap);
    return decorate(build());
}

build()方法源碼如下:

private ResultSetMerger build() throws SQLException {
    // 說明:GroupBy***ResultSetMerger在第六篇文章單獨(dú)講解,所以此次分析的SQL條件中沒有g(shù)roup by
    if (!selectStatement.getGroupByItems().isEmpty() || !selectStatement.getAggregationSelectItems().isEmpty()) {
        if (selectStatement.isSameGroupByAndOrderByItems()) {
            return new GroupByStreamResultSetMerger(columnLabelIndexMap, resultSets, selectStatement);
        } else {
            return new GroupByMemoryResultSetMerger(columnLabelIndexMap, resultSets, selectStatement);
        }
    }
    // 如果select語句中有order by字段江场,那么需要OrderByStreamResultSetMerger對結(jié)果處理
    if (!selectStatement.getOrderByItems().isEmpty()) {
        return new OrderByStreamResultSetMerger(resultSets, selectStatement.getOrderByItems());
    }
    return new IteratorStreamResultSetMerger(resultSets);
}

根據(jù)這段代碼可知,其作用是根據(jù)sql語句選擇多個不同的ResultSetMerger對結(jié)果進(jìn)行合并處理窖逗,ResultSetMerger實現(xiàn)有這幾種:GroupByStreamResultSetMerger址否,GroupByMemoryResultSetMerger,OrderByStreamResultSetMerger碎紊,IteratorStreamResultSetMerger佑附,LimitDecoratorResultSetMerger樊诺;以測試SQLSELECT o.* FROM t_order o where o.user_id=10 order by o.order_id desc limit 2,3為例,沒有g(shù)roup by音同,但是有order by词爬,所以使用到了OrderByStreamResultSetMerger和LimitDecoratorResultSetMerger對結(jié)果進(jìn)行合并(GroupByStreamResultSetMerger&GroupByMemoryResultSetMerger后面單獨(dú)講解)

decorate()源碼如下:

private ResultSetMerger decorate(final ResultSetMerger resultSetMerger) throws SQLException {
    ResultSetMerger result = resultSetMerger;
    // 如果SQL語句中有l(wèi)imist,還需要LimitDecoratorResultSetMerger配合進(jìn)行結(jié)果歸并瘟斜;
    if (null != selectStatement.getLimit()) {
        result = new LimitDecoratorResultSetMerger(result, selectStatement.getLimit());
    }
    return result;
}

接下來將以執(zhí)行SQL:SELECT o.* FROM t_order o where o.user_id=10 order by o.order_id desc limit 2,3(該SQL會被改寫成SELECT o.* , o.order_id AS ORDER_BY_DERIVED_0 FROM t_order_0 o where o.user_id=? order by o.order_id desc limit 2,3)為例缸夹,一一講解OrderByStreamResultSetMerger,LimitDecoratorResultSetMerger和IteratorStreamResultSetMerger螺句,了解這幾個ResultSetMerger的原理;

OrderByStreamResultSetMerger

OrderByStreamResultSetMerger的核心源碼如下:

private final Queue<OrderByValue> orderByValuesQueue;

public OrderByStreamResultSetMerger(final List<ResultSet> resultSets, final List<OrderItem> orderByItems) throws SQLException {
    // sql中order by列的信息橡类,實例sql是order by order_id desc蛇尚,即此處就是order_id
    this.orderByItems = orderByItems;
    // 初始化一個優(yōu)先級隊列,優(yōu)先級隊列中的元素會根據(jù)OrderByValue中compareTo()方法排序顾画,并且SQL重寫后發(fā)送到多少個目標(biāo)實際表取劫,List<ResultSet>的size就有多大,Queue的capacity就有多大研侣;
    this.orderByValuesQueue = new PriorityQueue<>(resultSets.size());
    // 將結(jié)果壓入隊列中
    orderResultSetsToQueue(resultSets);
    isFirstNext = true;
}

private void orderResultSetsToQueue(final List<ResultSet> resultSets) throws SQLException {
    // 遍歷resultSets--在多少個目標(biāo)實際表上執(zhí)行SQL谱邪,該集合的size就有多大
    for (ResultSet each : resultSets) {
        // 將ResultSet和排序列信息封裝成一個OrderByValue類型
        OrderByValue orderByValue = new OrderByValue(each, orderByItems);
        // 如果值存在,那么壓入隊列中
        if (orderByValue.next()) {
            orderByValuesQueue.offer(orderByValue);
        }
    }
    // 重置currentResultSet的位置:如果隊列不為空庶诡,那么將隊列的頂部(peek)位置設(shè)置為currentResultSet的位置
    setCurrentResultSet(orderByValuesQueue.isEmpty() ? resultSets.get(0) : orderByValuesQueue.peek().getResultSet());
}

@Override
public boolean next() throws SQLException {
    // 調(diào)用next()判斷是否還有值, 如果隊列為空, 表示沒有任何值, 那么直接返回false
    if (orderByValuesQueue.isEmpty()) {
        return false;
    }
    // 如果隊列不為空, 那么第一次一定返回true惦银;即有結(jié)果可取(且將isFirstNext置為false末誓,表示接下來的請求都不是第一次請求next()方法)
    if (isFirstNext) {
        isFirstNext = false;
        return true;
    }
    // 從隊列中彈出第一個元素(因為是優(yōu)先級隊列扯俱,所以poll()返回的值,就是此次要取的值)
    OrderByValue firstOrderByValue = orderByValuesQueue.poll();
    // 如果它的next()存在喇澡,那么將它的next()再添加到隊列中
    if (firstOrderByValue.next()) {
        orderByValuesQueue.offer(firstOrderByValue);
    }
    // 隊列中所有元素全部處理完后就返回false
    if (orderByValuesQueue.isEmpty()) {
        return false;
    }
    // 再次重置currentResultSet的位置為隊列的頂部位置迅栅;
    setCurrentResultSet(orderByValuesQueue.peek().getResultSet());
    return true;
}

繼續(xù)深入剖析:這段代碼初看可能有點(diǎn)繞,假設(shè)運(yùn)行SQLSELECT o.* FROM t_order o where o.user_id=10 order by o.order_id desc limit 3會分發(fā)到兩個目標(biāo)實際表晴玖,且第一個實際表返回的結(jié)果是1读存,3,5呕屎,7让簿,9;第二個實際表返回的結(jié)果是2榨惰,4拜英,6,8琅催,10居凶;那么虫给,經(jīng)過OrderByStreamResultSetMerger的構(gòu)造方法中的orderResultSetsToQueue()方法后,Queue<OrderByValue> orderByValuesQueue中包含兩個OrderByValue侠碧,一個是10抹估,一個是9;接下來取值運(yùn)行過程如下:

  1. 取得10弄兜,并且10的next()是8药蜻,然后執(zhí)行orderByValuesQueue.offer(8);,這時候orderByValuesQueue中包含8和9替饿;
  2. 取得9语泽,并且9的next()是7,然后執(zhí)行orderByValuesQueue.offer(7);视卢,這時候orderByValuesQueue中包含7和8踱卵;
  3. 取得8,并且8的next()是6据过,然后執(zhí)行orderByValuesQueue.offer(6);惋砂,這時候orderByValuesQueue中包含7和6;
    取值數(shù)量已經(jīng)達(dá)到limit 3的限制(源碼在LimitDecoratorResultSetMerger中的next()方法中)绳锅,退出西饵;

這段代碼運(yùn)行示意圖如下所示:


image-1.png

image-2.png

LimitDecoratorResultSetMerger

LimitDecoratorResultSetMerger核心源碼如下:

public LimitDecoratorResultSetMerger(final ResultSetMerger resultSetMerger, final Limit limit) throws SQLException {
    super(resultSetMerger);
    // limit賦值(Limit對象包括limit m,n中的m和n兩個值)
    this.limit = limit;
    // 判斷是否會跳過所有的結(jié)果項,即判斷是否有符合條件的結(jié)果
    skipAll = skipOffset();
}

private boolean skipOffset() throws SQLException {
    // 假定limit.getOffsetValue()就是offset鳞芙,實例sql中為limit 2,3眷柔,所以offset=2
    for (int i = 0; i < limit.getOffsetValue(); i++) {
        // 嘗試從OrderByStreamResultSetMerger生成的優(yōu)先級隊列中跳過offset個元素,如果.next()一直為true积蜻,表示有足夠符合條件的結(jié)果闯割,那么返回false;否則沒有足夠符合條件的結(jié)果竿拆,那么返回true宙拉;即skilAll=true就表示跳過了所有沒有符合條件的結(jié)果;
        if (!getResultSetMerger().next()) {
            return true;
        }
    }
    // limit m,n的sql會被重寫為limit 0, m+n丙笋,所以limit.isRowCountRewriteFlag()為true谢澈,rowNumber的值為0;
    rowNumber = limit.isRowCountRewriteFlag() ? 0 : limit.getOffsetValue();
    return false;
}

@Override
public boolean next() throws SQLException {
    // 如果skipAll為true御板,即跳過所有锥忿,表示沒有任何符合條件的值,那么返回false
    if (skipAll) {
        return false;
    }
    if (limit.getRowCountValue() > -1) {
        // 每次next()獲取值后怠肋,rowNumber自增敬鬓,當(dāng)自增rowCountValue次后,就不能再往下繼續(xù)取值了,因為條件limit 2,3(rowCountValue=3)限制了
        return ++rowNumber <= limit.getRowCountValue() && getResultSetMerger().next();
    }
    return getResultSetMerger().next();
}

IteratorStreamResultSetMerger

構(gòu)造方法核心源碼:

private final Iterator<ResultSet> resultSets;

public IteratorStreamResultSetMerger(final List<ResultSet> resultSets) {
    // 將List<ResultSet>改成Iterator<ResultSet>钉答,方便接下來迭代取得結(jié)果础芍;
    this.resultSets = resultSets.iterator();
    // 重置currentResultSet
    setCurrentResultSet(this.resultSets.next());
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市数尿,隨后出現(xiàn)的幾起案子仑性,更是在濱河造成了極大的恐慌,老刑警劉巖右蹦,帶你破解...
    沈念sama閱讀 211,639評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件诊杆,死亡現(xiàn)場離奇詭異,居然都是意外死亡何陆,警方通過查閱死者的電腦和手機(jī)晨汹,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,277評論 3 385
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來甲献,“玉大人宰缤,你說我怎么就攤上這事』稳鳎” “怎么了?”我有些...
    開封第一講書人閱讀 157,221評論 0 348
  • 文/不壞的土叔 我叫張陵朦乏,是天一觀的道長球及。 經(jīng)常有香客問我,道長呻疹,這世上最難降的妖魔是什么吃引? 我笑而不...
    開封第一講書人閱讀 56,474評論 1 283
  • 正文 為了忘掉前任,我火速辦了婚禮刽锤,結(jié)果婚禮上镊尺,老公的妹妹穿的比我還像新娘。我一直安慰自己并思,他們只是感情好庐氮,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,570評論 6 386
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著宋彼,像睡著了一般弄砍。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上输涕,一...
    開封第一講書人閱讀 49,816評論 1 290
  • 那天音婶,我揣著相機(jī)與錄音,去河邊找鬼莱坎。 笑死衣式,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播碴卧,決...
    沈念sama閱讀 38,957評論 3 408
  • 文/蒼蘭香墨 我猛地睜開眼弱卡,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了螟深?” 一聲冷哼從身側(cè)響起谐宙,我...
    開封第一講書人閱讀 37,718評論 0 266
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎界弧,沒想到半個月后凡蜻,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,176評論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡垢箕,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,511評論 2 327
  • 正文 我和宋清朗相戀三年划栓,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片条获。...
    茶點(diǎn)故事閱讀 38,646評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡忠荞,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出帅掘,到底是詐尸還是另有隱情委煤,我是刑警寧澤,帶...
    沈念sama閱讀 34,322評論 4 330
  • 正文 年R本政府宣布修档,位于F島的核電站碧绞,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏吱窝。R本人自食惡果不足惜讥邻,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,934評論 3 313
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望院峡。 院中可真熱鬧兴使,春花似錦、人聲如沸照激。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,755評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽实抡。三九已至欠母,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間吆寨,已是汗流浹背赏淌。 一陣腳步聲響...
    開封第一講書人閱讀 31,987評論 1 266
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留啄清,地道東北人六水。 一個月前我還...
    沈念sama閱讀 46,358評論 2 360
  • 正文 我出身青樓俺孙,卻偏偏與公主長得像,于是被迫代替她去往敵國和親掷贾。 傳聞我的和親對象是個殘疾皇子睛榄,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,514評論 2 348

推薦閱讀更多精彩內(nèi)容

  • 1. Java基礎(chǔ)部分 基礎(chǔ)部分的順序:基本語法,類相關(guān)的語法想帅,內(nèi)部類的語法场靴,繼承相關(guān)的語法,異常的語法港准,線程的語...
    子非魚_t_閱讀 31,598評論 18 399
  • 一. Java基礎(chǔ)部分.................................................
    wy_sure閱讀 3,805評論 0 11
  • 50個常用的sql語句Student(S#,Sname,Sage,Ssex) 學(xué)生表Course(C#,Cname...
    哈哈海閱讀 1,227評論 0 7
  • 木犀漸濃桂影深 可憐清香最先聞 浮云略日何所至 一樹秋風(fēng)落九塵
    迷失的指南針閱讀 202評論 0 4
  • 現(xiàn)實生活中浅缸,我們天天掙錢花錢轨帜,錢可以說是離我們最近的東西。 可很多人對金錢連沒一個正確的認(rèn)識都沒有衩椒,說什么“金錢是...
    化蛹為碟閱讀 685評論 0 0