Presto連接器-grouped_execution功能要實現(xiàn)的接口

presto連接器-grouped_execution功能要實現(xiàn)的接口

原理

分組執(zhí)行是presto在2017年就支持的功能瞻颂,它的原理是:

根據(jù)相同字段(orderid)分桶(bucketing)且分桶數(shù)量相同的兩個表(orders,orders_item)篮幢,

在通過orderid進行join的時候,由于兩個表相同的orderid都分到相同id的桶里侥锦,所以是可以獨立進行join以及聚合計算的(參考MapReduer的partition過程)。

這樣濒翻,每當一個桶的數(shù)據(jù)計算完成后中狂,可以立即把這個桶所占用的內(nèi)存釋放掉凫碌,因此,通過控制并行處理桶的數(shù)量可以限制內(nèi)存的占用吃型。

計算理論占用的內(nèi)存:優(yōu)化后的內(nèi)存占用=原內(nèi)存占用/表的桶數(shù)量*并行處理桶的數(shù)量

完整的示例參考:

https://archongum.cn/2019/08/21/presto-query-memory-optimze/

要實現(xiàn)的接口

我是在基于legacy機制實現(xiàn)的kudu connector的基礎上實現(xiàn)的分組執(zhí)行证鸥。下面是要實現(xiàn)的接口:

ConnectorPartitioningHandle

這個接口可以存放我們在處理分桶邏輯時需要用到的參數(shù)。比如數(shù)據(jù)庫名勤晚、表名枉层、分桶個數(shù)、分桶字段明細信息等等赐写。

  1. equals
    該方法用來判斷join操作的左右表是否可以執(zhí)行分桶操作鸟蜡,所以一定不要根據(jù)表名是否相等來判斷true和false,否則會導致join操作永遠也無法做grouped_execution挺邀∪嗤可以使用分桶個數(shù)、分桶字段類型等屬性來判斷端铛。

ConnectorNodePartitioningProvider

  1. listPartitionHandles
    列出當前表所有的bucket泣矛,入?yún)onnectorPartitioningHandle就是我們封裝的KuduPartitioningHandle,可以把接口實現(xiàn)需要用到的參數(shù)放到這個類中禾蚕。返回的bucket只要有一個bucket_number就行了您朽。這樣在ConnectorSplitSource中會依次處理這些bucket_number。

  2. getBucketNodeMap
    仿照hive的實現(xiàn)即可换淆,他的作用是構建bucket和node的映射關系哗总,供調(diào)度使用。相關邏輯可以參考FixedSourcePartitionedScheduler的構造函數(shù)倍试。

  3. getSplitBucketFunction
    獲取bucket的bucket_number讯屈。

  4. getBucketFunction
    封裝BucketFunction對象。這個對象的作用參考下文县习。

BucketFunction

這個類只在join操作時會使用涮母,作用還不能確定,理論上說是獲取一個page是屬于哪一個bucket的躁愿。他的作用體現(xiàn)在唯一的接口getBucket中哈蝇。

  1. getBucket
    入?yún)age是上層封裝的某個operator要處理的page中的所有分桶字段的值。每一列的值以page中一個block的形式存在攘已。入?yún)osition是行的index。這一部分的實現(xiàn)可以參考HiveBucketFunction和IcebergBucketFunction怜跑。

ConnectorMetadata#getTableLayouts

這個接口即使不實現(xiàn)grouped_execution功能我們也要實現(xiàn)样勃。通常情況只會封裝一個ConnectorTableLayoutHandle對象吠勘,然后調(diào)用下圖api返回一個ConnectorTableLayout對象。

    public ConnectorTableLayout(ConnectorTableLayoutHandle handle)
    {
        this(handle,
                Optional.empty(),
                TupleDomain.all(),
                Optional.empty(),
                Optional.empty(),
                Optional.empty(),
                emptyList());
    }

但是如果要實現(xiàn)grouped_execution就需要通過這個構造函數(shù)來將其他參數(shù)實例化:

    public ConnectorTableLayout(
            ConnectorTableLayoutHandle handle,
            Optional<List<ColumnHandle>> columns,
            TupleDomain<ColumnHandle> predicate,
            Optional<ConnectorTablePartitioning> tablePartitioning,
            Optional<Set<ColumnHandle>> streamPartitioningColumns,
            Optional<DiscretePredicates> discretePredicates,
            List<LocalProperty<ColumnHandle>> localProperties)

因為在PlanFragmenter#visitTableScan方法中會判斷tablePartitioning是否存在峡眶,如果不存在則返回GroupedExecutionProperties.notCapable()剧防,這將導致presto認定這個sql不符合grouped_execution的要求。

注:非legacy的實現(xiàn)會自動調(diào)用getTableProperties方法辫樱,就不需要再從getTableLayouts方法中做初始化了(非legacy也不會有getTableLayouts方法)峭拘。

PlanFragmenter#visitTableScan方法中判斷tablePartitioning是否存在的邏輯如下圖:

@Override
public GroupedExecutionProperties visitTableScan(TableScanNode node, Void context)
{
    Optional<TablePartitioning> tablePartitioning = metadata.getTableProperties(session, node.getTable()).getTablePartitioning();
    if (!tablePartitioning.isPresent()) {
        return GroupedExecutionProperties.notCapable();
    }
    List<ConnectorPartitionHandle> partitionHandles = nodePartitioningManager.listPartitionHandles(session, tablePartitioning.get().getPartitioningHandle());
    if (ImmutableList.of(NOT_PARTITIONED).equals(partitionHandles)) {
        return new GroupedExecutionProperties(false, false, ImmutableList.of());
    }
    else {
        return new GroupedExecutionProperties(true, false, ImmutableList.of(node.getId()));
    }
}

所以我們必須保證返回的tableProperties方法包含完整的tablePartitioning信息。

封裝ConnectorTableLayout用到的其他幾個參數(shù)需要通過ConnectorMetadata#getTableProperties方法一并返回狮暑。該方法的說明如下:

ConnectorMetadata#getTableProperties

該接口用來返回表的擴展信息鸡挠,例如ConnectorPartitioningHandle、切分bucket的字段(streamingPartitionColumns)搬男、分桶個數(shù)等等拣展。

同時在這里也可以使用自定義的session參數(shù)(或者表名)判斷是否要走分組執(zhí)行模式。如果不走只需要將tablePartitioning信息返回為optional.empty()即可缔逛。

注意:方法返回的ConnectorTableProperties對象的localProperties屬性可以設置分桶內(nèi)數(shù)據(jù)的排序方式备埃。這里如果設置的與實際不匹配,會導致group by操作中褐奴,同一個組中的數(shù)據(jù)被拆分到多個組中按脚。Presto的邏輯我沒細看,跟同事討論認為有可能是因為因為排序被預設了敦冬,所以當處理到其他key的數(shù)據(jù)時就認為當前正在統(tǒng)計的分桶結束了辅搬,會自動開始統(tǒng)計一個新的組。

ConnectorMetadata#getTableLayout

返回一個帶有ConnectorTablePartitioning等信息的ConnectorTableLayout即可匪补∩⌒粒可以調(diào)用getTableProperties方法獲取相關信息。

ConnectorSplitSource

  1. getNextBatch
    KuduSplitSource中有一個集合用來存放所有Splits(或者有一個方法能獲取所有splits)夯缺,這些Splits都是與Group(即入?yún)artitionHandle)相關聯(lián)的蚤氏。

    外界會遍歷所有的Group,依次獲取每個Group下所有的split踊兜,

    如果調(diào)用一次getNextBatch方法無法獲取完當前Group的所有split竿滨,可以在返回的ConnectorSplitBatch中把noMoreSplits設置為false,這樣外界還會再基于同樣的Group重新調(diào)用這個接口:

GetNextBatch#fetchSplits -> ConnectorAwareSplitSource#getNextBatch

直到這個Group所有的split都被獲取到之后捏境,我們把返回的ConnectorSplitBatch的noMoreSplits設置為true于游,外界就會再繼續(xù)獲取其他Group的splits了。而kudu是一個bucket切分一個split垫言,所以每次返回一個split茧泪,并把noMoreSplits設置為true。如下:

@Override
public CompletableFuture<ConnectorSplitBatch> getNextBatch(ConnectorPartitionHandle partitionHandle, int maxSize) {
    KuduPartitionHandle kuduPartitionHandle = (KuduPartitionHandle) partitionHandle;

    KuduSplit kuduSplit = splitMap.remove(kuduPartitionHandle.getBucket());

    // 一個bucket只有一個split艘刚,所以noMoreSplits一定是true
    return completedFuture(new ConnectorSplitBatch(kuduSplit == null ? ImmutableList.of() : ImmutableList.of(kuduSplit), true));
}

外界調(diào)用getNextBatch獲取所有Group的Split的邏輯,可以從SourcePartitionedScheduler#schedule開始閱讀前痘。

  1. isFinished
    是否表中所有的split都被獲取完了。
    /**
     * Returns whether any more {@link ConnectorSplit} may be produced.
     * <p>
     * This method should only be called when there has been no invocation of getNextBatch,
     * or result Future of previous getNextBatch is done.
     * Calling this method at other time is not useful because the contract of such an invocation
     * will be inherently racy.
     */
    boolean isFinished();

ConnectorSplitManager#getSplits

該方法中担忧,需要根據(jù)入?yún)plitSchedulingStrategy的值判斷應該走自定義的ConnectorSplitSource還是FixedSplitSource

switch (splitSchedulingStrategy) {
    case UNGROUPED_SCHEDULING:
        return new FixedSplitSource(splits);
    case GROUPED_SCHEDULING:
        return new KuduSplitSource(splits);
    default:
        throw new IllegalArgumentException("Unknown splitSchedulingStrategy: " + splitSchedulingStrategy);
}

Connector

在該實現(xiàn)類的構造函數(shù)中增加ConnectorNodePartitioningProvider參數(shù)芹缔,并賦值給本地變量。

  1. getNodePartitioningProvider
@Override
public ConnectorNodePartitioningProvider getNodePartitioningProvider()
{
    return nodePartitioningProvider;
}

ConnectorHandleResolver

  1. getPartitioningHandleClass
@Override
public Class<? extends ConnectorPartitioningHandle> getPartitioningHandleClass()
{
    return KuduPartitioningHandle.class;
}

AbstractModule

  1. configure
    在該方法中設置ConnectorNodePartitioningProvider
bind(ConnectorNodePartitioningProvider.class).to(KuduNodePartitioningProvider.class).in(Scopes.SINGLETON);

注:presto的dynamic filtering和grouped_execution不能同時采用瓶盛,所以需要通過以下設置將dynamic filtering關閉
set session enable_dynamic_filtering=false;

最后編輯于
?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末最欠,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子惩猫,更是在濱河造成了極大的恐慌芝硬,老刑警劉巖,帶你破解...
    沈念sama閱讀 221,406評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件帆锋,死亡現(xiàn)場離奇詭異吵取,居然都是意外死亡,警方通過查閱死者的電腦和手機锯厢,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,395評論 3 398
  • 文/潘曉璐 我一進店門皮官,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人实辑,你說我怎么就攤上這事捺氢。” “怎么了剪撬?”我有些...
    開封第一講書人閱讀 167,815評論 0 360
  • 文/不壞的土叔 我叫張陵摄乒,是天一觀的道長。 經(jīng)常有香客問我残黑,道長馍佑,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 59,537評論 1 296
  • 正文 為了忘掉前任梨水,我火速辦了婚禮拭荤,結果婚禮上,老公的妹妹穿的比我還像新娘疫诽。我一直安慰自己舅世,他們只是感情好,可當我...
    茶點故事閱讀 68,536評論 6 397
  • 文/花漫 我一把揭開白布奇徒。 她就那樣靜靜地躺著雏亚,像睡著了一般。 火紅的嫁衣襯著肌膚如雪摩钙。 梳的紋絲不亂的頭發(fā)上罢低,一...
    開封第一講書人閱讀 52,184評論 1 308
  • 那天,我揣著相機與錄音胖笛,去河邊找鬼奕短。 笑死宜肉,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的翎碑。 我是一名探鬼主播,決...
    沈念sama閱讀 40,776評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼之斯,長吁一口氣:“原來是場噩夢啊……” “哼日杈!你這毒婦竟也來了?” 一聲冷哼從身側響起佑刷,我...
    開封第一講書人閱讀 39,668評論 0 276
  • 序言:老撾萬榮一對情侶失蹤莉擒,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后瘫絮,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體涨冀,經(jīng)...
    沈念sama閱讀 46,212評論 1 319
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,299評論 3 340
  • 正文 我和宋清朗相戀三年麦萤,在試婚紗的時候發(fā)現(xiàn)自己被綠了鹿鳖。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,438評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡壮莹,死狀恐怖翅帜,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情命满,我是刑警寧澤涝滴,帶...
    沈念sama閱讀 36,128評論 5 349
  • 正文 年R本政府宣布,位于F島的核電站胶台,受9級特大地震影響歼疮,放射性物質發(fā)生泄漏。R本人自食惡果不足惜诈唬,卻給世界環(huán)境...
    茶點故事閱讀 41,807評論 3 333
  • 文/蒙蒙 一韩脏、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧讯榕,春花似錦骤素、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,279評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至霎槐,卻和暖如春送浊,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背丘跌。 一陣腳步聲響...
    開封第一講書人閱讀 33,395評論 1 272
  • 我被黑心中介騙來泰國打工袭景, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留唁桩,地道東北人。 一個月前我還...
    沈念sama閱讀 48,827評論 3 376
  • 正文 我出身青樓耸棒,卻偏偏與公主長得像荒澡,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子与殃,可洞房花燭夜當晚...
    茶點故事閱讀 45,446評論 2 359