How does Index Source work in Presto

Overview

Presto Index Source is an optimization strategy that relies on an index provided by the data source. It improves query performance by avoiding reads of data that would be filtered out by the join condition anyway.

With this strategy, Presto sends the join keys from the left table (the probe side) to the right table (the Index Source). The Index Source then performs a lookup to fetch only the records that match those keys, and the two sides are finally combined with a hash join. In other words, the Index Source does not enumerate splits or run a full table scan; it reads records directly by the keys it receives.
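
To make this flow concrete, here is a minimal self-contained sketch of the idea in plain Java (hypothetical names and types, not Presto's operators): collect the distinct join keys from the probe side, ask an index for only the matching rows, and hash join the two sides.

    import java.util.*;

    // A sketch of the Index Source idea with made-up types (not Presto APIs):
    // instead of scanning the whole right table, the probe-side keys are handed
    // to an index, which returns only the matching rows.
    public class IndexJoinSketch
    {
        // stands in for the connector-provided lookup: fetch rows for the given keys only
        interface RowIndex
        {
            List<List<Object>> lookup(Set<Object> keys);
        }

        static List<List<Object>> indexJoin(List<List<Object>> probeRows, int probeKeyColumn, RowIndex index, int buildKeyColumn)
        {
            // 1. collect the distinct join keys produced by the probe (left) side
            Set<Object> keys = new HashSet<>();
            for (List<Object> row : probeRows) {
                keys.add(row.get(probeKeyColumn));
            }

            // 2. the index returns only rows whose key appears in the probe key set:
            //    no split enumeration and no full table scan
            List<List<Object>> buildRows = index.lookup(keys);

            // 3. hash join the two sides on the key column
            Map<Object, List<List<Object>>> hashTable = new HashMap<>();
            for (List<Object> row : buildRows) {
                hashTable.computeIfAbsent(row.get(buildKeyColumn), k -> new ArrayList<>()).add(row);
            }
            List<List<Object>> output = new ArrayList<>();
            for (List<Object> probeRow : probeRows) {
                for (List<Object> match : hashTable.getOrDefault(probeRow.get(probeKeyColumn), List.of())) {
                    List<Object> joined = new ArrayList<>(probeRow);
                    joined.addAll(match);
                    output.add(joined);
                }
            }
            return output;
        }
    }

In Presto the IndexSourceOperator plays the role of step 2: it receives the probe keys as a special split and turns the lookup result into pages for the downstream join, as the code tracing below shows.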

Index Source therefore performs well when the right table is extremely large but only a small fraction of its rows is needed to satisfy the join condition. The right table's connector must also provide an efficient way to fetch rows by key.

Index Source is somewhat similar to dynamic filtering: both aim to reduce the amount of data read. Dynamic filtering, however, works with a timeout and prunes the left table, whereas Index Source prunes the right side.

Code tracing

To figure out how Index Source works, I traced the code in a debugger, starting from a simple test case, com.facebook.presto.tests.AbstractTestIndexedQueries#testBasicIndexJoin. The SQL is:

SELECT * FROM (SELECT * FROM lineitem WHERE partkey % 8 = 0) l JOIN orders o ON l.orderkey = o.orderkey;

From the execution plan we can see that the right table's ScanProjectOperator has been replaced by an IndexSourceOperator.

IndexSource[tpch_indexed:com.facebook.presto.tests.tpch.TpchIndexHandle@2981c060, lookup = [orderkey_63]] => [orderkey_63:bigint, custkey:bigint, orderstatus:varchar(1), totalprice:double, orderdate:date, orderpriority:varchar(15), clerk:varchar(15), shippriority:integer, comment_64:varchar(79)]
        CPU: 1.68s (30.82%), Scheduled: 1.77s (28.76%), Output: 332605 rows (41.22MB)
        Input avg.: 523.79 rows, Input std.dev.: 102.78%
        orderkey_63 := tpch:orderkey
        custkey := tpch:custkey
        orderstatus := tpch:orderstatus
        totalprice := tpch:totalprice
        orderdate := tpch:orderdate
        orderpriority := tpch:orderpriority
        clerk := tpch:clerk
        shippriority := tpch:shippriority
        comment_64 := tpch:comment

IndexSourceOperator is defined in the class com.facebook.presto.operator.index.IndexSourceOperator. Its core code is:

    @Override
    public Supplier<Optional<UpdatablePageSource>> addSplit(Split split)
    {
        requireNonNull(split, "split is null");
        checkState(source == null, "Index source split already set");

        IndexSplit indexSplit = (IndexSplit) split.getConnectorSplit();

        // Normalize the incoming RecordSet to something that can be consumed by the index
        RecordSet normalizedRecordSet = probeKeyNormalizer.apply(indexSplit.getKeyRecordSet());
        // filter the right table's records according to the probe side's key set (indexSplit.getKeyRecordSet())
        ConnectorPageSource result = index.lookup(normalizedRecordSet);
        // wrap the lookup result in a page source; the result set is read page by page downstream
        source = new PageSourceOperator(result, operatorContext);

        Object splitInfo = split.getInfo();
        if (splitInfo != null) {
            operatorContext.setInfoSupplier(() -> new SplitOperatorInfo(splitInfo));
        }

        return Optional::empty;
    }

The method ConnectorIndex#lookup corresponds to the lookup operation shown in the execution plan. Stepping into it, we find that in this test setup the interface is implemented only by TpchConnectorIndex:

    @Override
    public ConnectorPageSource lookup(RecordSet rawInputRecordSet)
    {
        // convert the input record set from the column ordering in the query to
        // match the column ordering of the index
        RecordSet inputRecordSet = keyFormatter.apply(rawInputRecordSet);

        // lookup the values in the index
        RecordSet rawOutputRecordSet = indexedTable.lookupKeys(inputRecordSet);

        // convert the output record set of the index into the column ordering
        // expected by the query
        return new RecordPageSource(outputFormatter.apply(rawOutputRecordSet));
    }
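
The keyFormatter and outputFormatter used above only reorder columns between the ordering used by the query and the ordering declared by the index. Conceptually this is just a positional remapping, roughly as in the sketch below (a made-up helper, not the actual formatter code):

    import java.util.ArrayList;
    import java.util.List;

    // Hypothetical illustration of what a column-reordering formatter does:
    // for each target position we are told which source position to read from,
    // and we emit the row in the target column order.
    final class ColumnReorder
    {
        static List<Object> reorder(List<Object> row, List<Integer> sourcePositionForTarget)
        {
            List<Object> reordered = new ArrayList<>(sourcePositionForTarget.size());
            for (int sourcePosition : sourcePositionForTarget) {
                reordered.add(row.get(sourcePosition));
            }
            return reordered;
        }
    }

In the real code the remapping is applied to whole RecordSets rather than to individual rows, but the effect is the same: the index always receives keys in its own declared column order.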

Let's step into IndexedTable#lookupKeys to see how this filtering is actually done:

        public RecordSet lookupKeys(RecordSet recordSet)
        {
            // Since we only return a cached copy of IndexedTable, the input must be reordered to match the order of keyColumns
            checkArgument(recordSet.getColumnTypes().equals(keyTypes), "Input RecordSet keys do not match expected key type");

            Iterable<RecordSet> outputRecordSets = Iterables.transform(tupleIterable(recordSet), key -> {
                for (Object value : key.getValues()) {
                    if (value == null) {
                        throw new IllegalArgumentException("TPCH index does not support null values");
                    }
                }
                // lookup record by specified key
                return lookupKey(key);
            });

            // The result is returned in the same column order as outputColumns
            return new ConcatRecordSet(outputRecordSets, outputTypes);
        }

        private RecordSet lookupKey(MaterializedTuple tupleKey)
        {
            // fetch records from the cached key -> record mapping stored in the field keyToValues
            return new MaterializedTupleRecordSet(keyToValues.get(tupleKey), outputTypes);
        }

Since tpch is just a test connector, com.facebook.presto.tests.tpch.IndexedTpchConnectorFactory reads all the records of the orders table during initialization and caches them as a key -> record mapping in a ListMultimap named keyToValues, so at this point the lookup simply fetches the records by key and returns them. Below is how the tpch connector builds that cache for the orders table (a standalone sketch of the same pattern follows after the code).

    public TpchIndexedData(String connectorId, TpchIndexSpec tpchIndexSpec)
    {
        requireNonNull(connectorId, "connectorId is null");
        requireNonNull(tpchIndexSpec, "tpchIndexSpec is null");

        TpchMetadata tpchMetadata = new TpchMetadata(connectorId);
        TpchRecordSetProvider tpchRecordSetProvider = new TpchRecordSetProvider();

        ImmutableMap.Builder<Set<TpchScaledColumn>, IndexedTable> indexedTablesBuilder = ImmutableMap.builder();

        Set<TpchScaledTable> tables = tpchIndexSpec.listIndexedTables();
        for (TpchScaledTable table : tables) {
            SchemaTableName tableName = new SchemaTableName("sf" + table.getScaleFactor(), table.getTableName());
            TpchTableHandle tableHandle = tpchMetadata.getTableHandle(null, tableName);
            Map<String, ColumnHandle> columnHandles = new LinkedHashMap<>(tpchMetadata.getColumnHandles(null, tableHandle));
            for (Set<String> columnNames : tpchIndexSpec.getColumnIndexes(table)) {
                List<String> keyColumnNames = ImmutableList.copyOf(columnNames); // Finalize the key order
                Set<TpchScaledColumn> keyColumns = keyColumnNames.stream()
                        .map(name -> new TpchScaledColumn(table, name))
                        .collect(toImmutableSet());

                TpchTable<?> tpchTable = TpchTable.getTable(table.getTableName());
                RecordSet recordSet = tpchRecordSetProvider.getRecordSet(tpchTable, ImmutableList.copyOf(columnHandles.values()), table.getScaleFactor(), 0, 1, TupleDomain.all());
                IndexedTable indexedTable = indexTable(recordSet, ImmutableList.copyOf(columnHandles.keySet()), keyColumnNames);
                indexedTablesBuilder.put(keyColumns, indexedTable);
            }
        }

        indexedTables = indexedTablesBuilder.build();
    }

    private static IndexedTable indexTable(RecordSet recordSet, final List<String> outputColumns, List<String> keyColumns)
    {
        List<Integer> keyPositions = keyColumns.stream()
                .map(columnName -> {
                    int position = outputColumns.indexOf(columnName);
                    checkState(position != -1);
                    return position;
                })
                .collect(toImmutableList());

        ImmutableListMultimap.Builder<MaterializedTuple, MaterializedTuple> indexedValuesBuilder = ImmutableListMultimap.builder();

        List<Type> outputTypes = recordSet.getColumnTypes();
        List<Type> keyTypes = extractPositionValues(outputTypes, keyPositions);

        RecordCursor cursor = recordSet.cursor();
        while (cursor.advanceNextPosition()) {
            List<Object> values = extractValues(cursor, outputTypes);
            List<Object> keyValues = extractPositionValues(values, keyPositions);

            indexedValuesBuilder.put(new MaterializedTuple(keyValues), new MaterializedTuple(values));
        }

        return new IndexedTable(keyColumns, keyTypes, outputColumns, outputTypes, indexedValuesBuilder.build());
    }
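
To recap the mechanism that makes the lookup cheap, here is a small standalone sketch of the same key -> rows multimap pattern using Guava (made-up class name and data, not the Presto classes): build the index once by scanning the data, then answer every lookup with a plain multimap get.

    import com.google.common.collect.ImmutableListMultimap;
    import com.google.common.collect.ListMultimap;

    import java.util.List;

    // A standalone sketch of the pattern used by TpchIndexedData: index all rows
    // by their key column into a multimap once, then serve lookups straight from it.
    public class MultimapIndexSketch
    {
        private final ListMultimap<Object, List<Object>> keyToRows;

        public MultimapIndexSketch(Iterable<List<Object>> rows, int keyColumn)
        {
            ImmutableListMultimap.Builder<Object, List<Object>> builder = ImmutableListMultimap.builder();
            for (List<Object> row : rows) {
                // the key column value maps to every full row that carries it
                builder.put(row.get(keyColumn), row);
            }
            this.keyToRows = builder.build();
        }

        // analogous to IndexedTable#lookupKey: no scan, just a multimap lookup
        public List<List<Object>> lookup(Object key)
        {
            return keyToRows.get(key);
        }

        public static void main(String[] args)
        {
            List<List<Object>> rows = List.of(
                    List.of(1L, "first order"),
                    List.of(2L, "second order"),
                    List.of(1L, "another row with key 1"));
            MultimapIndexSketch index = new MultimapIndexSketch(rows, 0);
            System.out.println(index.lookup(1L)); // prints both rows with key 1
        }
    }

The real TpchIndexedData works the same way, except that a key can span several columns (hence MaterializedTuple) and the indexed rows come from the TPC-H record set provider.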