【工作】Presto Hive讀取代碼閱讀筆記

PS:基于 presto-0.258

整體流程

接收語句

DispatchManager createQueryInternal
    queryPreparer.prepareQuer // preparedQuery [封裝Statement]
        dispatchQueryFactory.createDispatchQuery => DispatchQuery 
            resourceGroupManager.submit(preparedQuery.getStatement(), dq, selectionContext, queryExecutor)

提交成功

InternalResourceGroup run (LocalDispatchQuery)
    InternalResourceGroup startInBackground
        LocalDispatchQuery waitForMinimumWorkers
            LocalDispatchQuery startExecution
                SqlQueryExecution start

開始執(zhí)行

    PlanRoot plan = analyzeQuery();
    planDistribution(plan);
    scheduler.start(); // SqlQueryScheduler

一些細(xì)節(jié)

hive表的元數(shù)據(jù)訪問

元數(shù)據(jù)總體由 HiveMetadata維護(hù)衫樊,里面包含metastore連接,partitionManager以及一些輔助方法利花。

獲取表的元數(shù)據(jù)


        StatementAnalyzer visitTable
            TableMetadata tableMetadata = metadata.getTableMetadata(session, tableHandle.get());
                ConnectorMetadata metadata = getMetadata(session, connectorId); -> HiveMetadata
                    解析一些 
                        HiveStorageFormat 
                        properties 
                        partitionedBy 
                        bucketProperty 
                        preferredOrderingColumns 
                        orcBloomFilterColumns
                        orcBloomFilterFfp
                        comment
                    等信息
                封裝到ConnectorTableMetadata

Source Split的切分

        從plan里createStageScheduler
            splitSourceProvider // 這里會出現(xiàn)HiveTableLayoutHandle 描述了表的目錄 分區(qū) 字段 謂詞等 甚至有tableParameters
                HiveSplitSource allAtOnce //返回的是HiveSplitSource實例 封裝了一個AsyncQueue隊列去存儲split
                    HiveSplitSource getNextBatch //這是每一批
                        BackgroundHiveSplitLoader loadSplits //這里觸發(fā)分區(qū) 文件的迭代 和split計算 科侈。。炒事。
                            StoragePartitionLoader loadPartition //這里有個 DirectoryLister 【重點關(guān)注】
                                這里夾雜幾種情況
                                    SymlinkTextInputFormat
                                    shouldUseFileSplitsFromInputFormat(inputFormat))
                                        InputSplit[] splits = inputFormat.getSplits(jobConf, 0); 去拿到split 臀栈。。
                                    if (tableBucketInfo.isPresent()) {
                                不同情況解析split的邏輯不一樣
                                正常情況是非bucket普通表
                                是用DirectoryLister去list分區(qū)目錄path 一個文件對應(yīng)一個InternalHiveSplit(也可能被path filter過濾)
                                    Optional<InternalHiveSplit> createInternalHiveSplit(HiveFileInfo fileInfo
                                    這里的邏輯:
                                        1)提取 List<HostAddress> addresses
                                        2)計算分區(qū)這個文件的相對路徑 URI relativePath = partitionInfo.getPath().relativize(path.toUri());
  • 上面返回的只是InternalHiveSplit 還需要在 HiveSplitSource的getNextBatch里變成HiveSplit

  • queues.borrowBatchAsync(bucketNumber xxx 觸發(fā)future list目錄任務(wù) 挠乳。权薯。

  • 最后對外輸出的是 HiveSplit【封裝了一大堆東西姑躲。∶蓑迹基于maxSplitSize算出來的 即一個文件 可能有多個】

  • 關(guān)于split元數(shù)據(jù)這塊比spark調(diào)度要好很多 因為是流式的 不是靜態(tài)的集合 黍析。。 內(nèi)存需求會少很多屎开。

  • 最主要的是ListenableFuture<?> future = hiveSplitSource.addToQueue(splits.next());

  • 最后輸出的HiveSplit在一個PerBucket + AsyncQueue 組合的復(fù)雜的隊列緩存結(jié)構(gòu)里

節(jié)點選擇 [SOFT Affinity scheduler]

  • 這里實際上是用path的哈希取模所有節(jié)點 得到固定的目標(biāo)節(jié)點映射列表
    (好像忽略了文件實際位置橄仍。。但是因為這有緩存 包括文件的 所以可能是綜合考慮 如果是hard的話 是不是可能不均衡 牍戚?)
  • 貌似只適合于存算分離的架構(gòu)。虑粥。
  • 如果是存算一體的 建議選HARD Affinity 如孝,即類似spark的preference local node

緩存(Raptorx中的特性)

  • 1)文件 cache 【coordinater上 放內(nèi)存】【done】
            本質(zhì)是guava的Cache<Path, List<HiveFileInfo>> cache類實例 分區(qū)目錄也假設(shè)為不動的。娩贷。
            This can only be applied to sealed directories
                見:StoragePartitionLoader.createInternalHiveSplitIterator 
                    boolean cacheable = isUseListDirectoryCache(session);
                    if (partition.isPresent()) {
                        // Use cache only for sealed partitions
                        cacheable &= partition.get().isSealedPartition();
                    }

            文件的list是根據(jù) hdfs 的 remoteIterator 迭代的 第晰。。不像spark 跑了并行任務(wù)去獲取location信息 全部一起緩存 彬祖。茁瘦。
  • 2)tail/footer cache【在節(jié)點上 也是放內(nèi)存】
            注:OrcDataSource這個類和tail/footer沒關(guān)系 只是封裝了流讀取的一些入口 
            這個類是必須要打開至少一次ORC文件的 

            HiveClientModule -> createOrcFileTailSource 里決定了是否啟用緩存 。储笑。
                Cache<OrcDataSourceId, OrcFileTail> cache

            具體來說

            OrcReader里面的兩個主要元數(shù)據(jù) 都來自 orcFileTailSource提供的OrcFileTail // Slice 里保存了 byte[]
                private final Footer footer; // 文件級別的統(tǒng)計 stripe摘要
                private final Metadata metadata; //stripe級的統(tǒng)計 
                
            還有stripe的StripeMetadataSource -> 這個類提供獲取StripeFooter的方法 
                (StripeFooter 包含一堆Stream 即各列數(shù)據(jù)信息 以及索引信息 StripeReader會用 selectRowGroups )
                這里面會判斷是否要緩存isCachedStream 
                return streamKind == BLOOM_FILTER || streamKind == ROW_INDEX; 

            注意:這個方法調(diào)用時是傳入OrcDataSource的 所以能拿到ORC文件流 但是之后就不需要這個流了甜熔。seek 等也不需要了。
            OrcFileTail orcFileTail = orcFileTailSource.getOrcFileTail(OrcDataSource orcDataSource)

謂詞裁剪(plan層)

  • 1)分區(qū)裁剪
            SqlQueryExecution analyzeQuery
                logicalPlanner plan
                    IterativeOptimizer【這個類類似于scala里面的模式匹配 不同的規(guī)則去catch其對應(yīng)的語法樹節(jié)點去執(zhí)行邏輯】
                    而所有的規(guī)則都在 PlanOptimizers 去添加 每個匹配邏輯是一個Rule類的實現(xiàn)
                        如PickTableLayout 有一個規(guī)則是pickTableLayoutForPredicate
                            hivePartitionResult = partitionManager.getPartitions(
                                這里如果有謂詞 where 就會把tablescan替換成FilterNode(里邊包含tablescan)
            這樣就完成了查詢計劃的替換

            分區(qū)裁剪過程【這里很抽象 謂詞傳遞 命名很不好理解 突倍。腔稀。∮鹄】
  • 2)謂詞表示體系

重要
這里要解釋一個較Domain的類焊虏。。實際上就是表示某個值的范圍(離散值秕磷,范圍诵闭,無窮等)
以及其服務(wù)類:TupleDomain 。澎嚣。是限定了字段 + 值范圍的組合
(PS:這命名實在讓人別扭疏尿。)

            參考 TestTupleDomainFilter 
            還搞了個緩存去防止多次解析 。币叹。
            TupleDomainFilterCache -> Converting Domain into TupleDomainFilter is expensive, hence, we use a cache keyed on Domain

            傳遞到下游的時候 是TupleDomain<Subfield> domainPredicate 
            這里面Subfield是一個可以多層表達(dá)的字段表示
            TupleDomain 是一個泛型Map 大概就是<字段 值范圍>的一個模式润歉。

            Constraint<ColumnHandle> 
                // 這又是另一個表示條件的類 。颈抚。里面封裝了 TupleDomain<T> summary; 
                // 和另一個 Optional<Predicate<Map<T, NullableValue>>> predicate 這個是Java Function接口里面的Predicate 
                // 有幾個主要方法 and/or/test -> 得到返回值是Boolean抽象 踩衩。

            這里面涉及到的泛型有
                ColumnHandle -> 一個空接口 這是presto spi 定義的 各個connector可能有不同實現(xiàn) 
                Map<Column, Domain> effectivePredicate -> 這個Column就是Hive元數(shù)據(jù)里Table下的列嚼鹉,獲取分區(qū)列表時候用到
                HiveColumnHandle -> hive的實現(xiàn) 
                HivePartition -> Map<ColumnHandle, NullableValue> getKeys() //表示field -> value 

讀split邏輯

        具體的task讀的是 hiveSplit 

        弄清楚split切分邏輯【】

        worker上的調(diào)用鏈:
        PrioritizedSplitRunner process
            DriverSplitRunner processFor
                Driver processInternal
                    xxOperator getOutput -> 觸發(fā)計算
                        HivePageSourceProvider createHivePageSource
                            OrcBatchPageSourceFactory createOrcPageSource
                                之后就是ORC的解析 OrcReader -> OrcRecordReader 去讀取到presto的page相關(guān)邏輯了。

是否緩存文件footer元數(shù)據(jù) 不只是開啟了cache配置 還需要選擇的split節(jié)點在期望節(jié)點里 才會去緩存 驱富。即 和nodeSelector策略有關(guān)锚赤。而且這個緩存 是以每個文件粒度調(diào)度的 。(包含在hiveSplit里面)

梳理stage/task/driver/split的并發(fā)關(guān)系

  • Query 根據(jù)SQL語句生成查詢執(zhí)行計劃褐鸥,進(jìn)而生成可以執(zhí)行的查詢(Query)线脚,一個查詢執(zhí)行由Stage、Task叫榕、Driver浑侥、Split、Operator和DataSource組成
  • Stage 執(zhí)行查詢階段 Stage之間是樹狀的結(jié)構(gòu) 晰绎,RootStage 將結(jié)果返回給coordinator 寓落,SourceStage接收coordinator數(shù)據(jù) 其他stage都有上下游 stage分為四種 single(root)、Fixed荞下、source伶选、coordinator_only(DML or DDL)
  • Exchange 兩個stage數(shù)據(jù)的交換通過Exchange 兩種Exchange ;Output Buffer (生產(chǎn)數(shù)據(jù)的stage通過此傳給下游stage)Exchange Client (下游消費)尖昏;如果stage 是source 直接通過connector 讀數(shù)據(jù)
  • 一個Task包含一或多個Driver仰税,是作用于一個Split的一系列Operator集合。一個Driver用于處理一個Split產(chǎn)生相應(yīng)輸出抽诉,輸出由Task收集并傳遞給下游Stage中的Task

核心問題
1)task個數(shù)
正常就是1個stage節(jié)點個數(shù)個陨簇,而presto會盡可能使用資源。每個stage每個節(jié)點都有一個task迹淌。(當(dāng)然是非root stage)
2)driver個數(shù)
其實就是split個數(shù)
3)split個數(shù)(根據(jù)stage的類型不同而不同)

    single(root)-> 1個
    coordinator only -> 元數(shù)據(jù)操作 也是一個
    如果是source的stage -> 由connector的splitmanager決定
    一個文件最少一個split
    remainingInitialSplits 有個參數(shù)影響了maxSplitBytes // 如果計算次數(shù)少于remainingInitialSplits 會采用 maxInitialSplitSize
        否則用配置的maxSplitSize去滾動每個文件生成HiveSplit
                (最后2個split會平衡 避免過小的split 導(dǎo)致時間不太均衡...)
      hive.max-split-size
      hive.max-initial-splits(默認(rèn)200 不調(diào)節(jié)也行塞帐。。需要調(diào)節(jié) maxInitialSplitSize 如果不設(shè)置就是默認(rèn) maxSplitSize/2 )
      hive.max-initial-split-size

    如果是中間stage -> hash_partition_count 這個session 參數(shù)巍沙?還是 task.concurrency 葵姥?

舉例說明:對與讀取hive表來說,1G的數(shù)據(jù)句携,設(shè)置 hive.max-split-size = 64MB榔幸,hive.max-initial-split-size= 64MB,最后才會得到期望的1G/64MB個source split

線程并發(fā)模型

  • task.max-worker-threads // worker啟動的線程池的大小矮嫉,即工作線程個數(shù)
  • task.concurrency // set session task_concurrency=1; 這個影響 agg/join 的并發(fā)
  • task.min-drivers // 默認(rèn)是 task.max-worker-threads x2 削咆,worker最少在執(zhí)行的split數(shù),如果有足夠資源和任務(wù)
  • task.min-drivers-per-task // task最少并行執(zhí)行的split數(shù)
  • initial_splits_per_node // 蠢笋。拨齐。(應(yīng)該是調(diào)度時候)
在taskExecutor的enqueueSplits里
        for (SplitRunner taskSplit : taskSplits) {
            xxx
            scheduleTaskIfNecessary(taskHandle); //按task級別調(diào)度 會用到 task.min-drivers-per-task 即可并發(fā)運行的split 

            addNewEntrants(); 
            //在資源變動( 如task remove/split finish/等時候 去嘗試去調(diào)度更多split 【這里比較模糊。昨寞≌巴铮】用到 task.min-drivers 參數(shù) )
            //比如 task.min-drivers-per-task 是4 task.min-drivers是10 則相當(dāng)于進(jìn)行了2次調(diào)度 厦滤。。
        }

    在Presto中有一個配置query.execution-policy歼狼,它有兩個選項掏导,一個是all-at-once,另一個是 phased // set session execution_policy='phased'; 

    線程和并發(fā)模型:
        SqlTaskExecutionFactory -> SqlTaskExecution
        Coordinator分發(fā)Task到對應(yīng)Worker羽峰,通過HttpClient發(fā)送給節(jié)點上TaskResource提供的RESTful接口
        Worker啟動一個SqlTaskExecution對象或者更新對應(yīng)對象需要處理的Split
            這里能看到每個split其實對應(yīng)一個driverSplitRunner(這個類里面有DriverSplitRunnerFactory)
                    // Enqueue driver runners with split lifecycle for this plan node and driver life cycle combination.
                    ImmutableList.Builder<DriverSplitRunner> runners = ImmutableList.builder();
                    for (ScheduledSplit scheduledSplit : pendingSplits.removeAllSplits()) {
                        // create a new driver for the split
                        runners.add(partitionedDriverRunnerFactory.createDriverRunner(scheduledSplit, lifespan));
                    }
                    enqueueDriverSplitRunner(false, runners.build());

                    在DriverSplitRunner的Process方法里
                    this.driver = driverSplitRunnerFactory.createDriver(driverContext, partitionedSplit);

        TaskExecutor 封裝了TaskRunner(執(zhí)行split的地方 PrioritizedSplitRunner(實現(xiàn)類是DriverSplitRunner))
        TaskExecutor 里具體執(zhí)行任務(wù)是是一個線程池
                config.getMaxWorkerThreads(), // 這個是啟動的固定線程池 趟咆。。不同SQL不同task都在里面執(zhí)行 梅屉。值纱。線程池大小是固定的:task.max-worker-threads
                config.getMinDrivers(),// 這個默認(rèn)是上面 x 2 不知有什么用?
                config.getMinDriversPerTask(), // ?
                config.getMaxDriversPerTask(),
        PrioritizedSplitRunner實現(xiàn)了時間片機制(固定1秒去執(zhí)行split 挑選優(yōu)先級)
        這種調(diào)度是不是犧牲了部分性能 換取迭代 優(yōu)先級 多租戶 多任務(wù)管理 結(jié)果快速反饋機制坯汤。计雌。。
        
        PrioritizedSplitRunner里實際運行的是Driver玫霎,封裝的一堆Operatior 如表Scan/filter/limit/taskoutPut 作用在split上


?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市妈橄,隨后出現(xiàn)的幾起案子庶近,更是在濱河造成了極大的恐慌,老刑警劉巖眷蚓,帶你破解...
    沈念sama閱讀 207,113評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件鼻种,死亡現(xiàn)場離奇詭異,居然都是意外死亡沙热,警方通過查閱死者的電腦和手機叉钥,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,644評論 2 381
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來篙贸,“玉大人投队,你說我怎么就攤上這事【舸ǎ” “怎么了敷鸦?”我有些...
    開封第一講書人閱讀 153,340評論 0 344
  • 文/不壞的土叔 我叫張陵,是天一觀的道長寝贡。 經(jīng)常有香客問我扒披,道長,這世上最難降的妖魔是什么圃泡? 我笑而不...
    開封第一講書人閱讀 55,449評論 1 279
  • 正文 為了忘掉前任碟案,我火速辦了婚禮,結(jié)果婚禮上颇蜡,老公的妹妹穿的比我還像新娘价说。我一直安慰自己辆亏,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 64,445評論 5 374
  • 文/花漫 我一把揭開白布熔任。 她就那樣靜靜地躺著褒链,像睡著了一般。 火紅的嫁衣襯著肌膚如雪疑苔。 梳的紋絲不亂的頭發(fā)上甫匹,一...
    開封第一講書人閱讀 49,166評論 1 284
  • 那天,我揣著相機與錄音惦费,去河邊找鬼兵迅。 笑死,一個胖子當(dāng)著我的面吹牛薪贫,可吹牛的內(nèi)容都是我干的恍箭。 我是一名探鬼主播,決...
    沈念sama閱讀 38,442評論 3 401
  • 文/蒼蘭香墨 我猛地睜開眼瞧省,長吁一口氣:“原來是場噩夢啊……” “哼扯夭!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起鞍匾,我...
    開封第一講書人閱讀 37,105評論 0 261
  • 序言:老撾萬榮一對情侶失蹤交洗,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后橡淑,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體构拳,經(jīng)...
    沈念sama閱讀 43,601評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,066評論 2 325
  • 正文 我和宋清朗相戀三年梁棠,在試婚紗的時候發(fā)現(xiàn)自己被綠了置森。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,161評論 1 334
  • 序言:一個原本活蹦亂跳的男人離奇死亡符糊,死狀恐怖凫海,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情男娄,我是刑警寧澤盐碱,帶...
    沈念sama閱讀 33,792評論 4 323
  • 正文 年R本政府宣布,位于F島的核電站沪伙,受9級特大地震影響瓮顽,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜围橡,卻給世界環(huán)境...
    茶點故事閱讀 39,351評論 3 307
  • 文/蒙蒙 一暖混、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧翁授,春花似錦拣播、人聲如沸晾咪。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,352評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽谍倦。三九已至,卻和暖如春泪勒,著一層夾襖步出監(jiān)牢的瞬間昼蛀,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,584評論 1 261
  • 我被黑心中介騙來泰國打工圆存, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留叼旋,地道東北人。 一個月前我還...
    沈念sama閱讀 45,618評論 2 355
  • 正文 我出身青樓沦辙,卻偏偏與公主長得像夫植,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子油讯,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 42,916評論 2 344

推薦閱讀更多精彩內(nèi)容