PS:基于 presto-0.258
整體流程
接收語句
DispatchManager createQueryInternal
queryPreparer.prepareQuer // preparedQuery [封裝Statement]
dispatchQueryFactory.createDispatchQuery => DispatchQuery
resourceGroupManager.submit(preparedQuery.getStatement(), dq, selectionContext, queryExecutor)
提交成功
InternalResourceGroup run (LocalDispatchQuery)
InternalResourceGroup startInBackground
LocalDispatchQuery waitForMinimumWorkers
LocalDispatchQuery startExecution
SqlQueryExecution start
開始執(zhí)行
PlanRoot plan = analyzeQuery();
planDistribution(plan);
scheduler.start(); // SqlQueryScheduler
一些細(xì)節(jié)
hive表的元數(shù)據(jù)訪問
元數(shù)據(jù)總體由 HiveMetadata維護(hù)衫樊,里面包含metastore連接,partitionManager以及一些輔助方法利花。
獲取表的元數(shù)據(jù)
StatementAnalyzer visitTable
TableMetadata tableMetadata = metadata.getTableMetadata(session, tableHandle.get());
ConnectorMetadata metadata = getMetadata(session, connectorId); -> HiveMetadata
解析一些
HiveStorageFormat
properties
partitionedBy
bucketProperty
preferredOrderingColumns
orcBloomFilterColumns
orcBloomFilterFfp
comment
等信息
封裝到ConnectorTableMetadata
Source Split的切分
從plan里createStageScheduler
splitSourceProvider // 這里會出現(xiàn)HiveTableLayoutHandle 描述了表的目錄 分區(qū) 字段 謂詞等 甚至有tableParameters
HiveSplitSource allAtOnce //返回的是HiveSplitSource實例 封裝了一個AsyncQueue隊列去存儲split
HiveSplitSource getNextBatch //這是每一批
BackgroundHiveSplitLoader loadSplits //這里觸發(fā)分區(qū) 文件的迭代 和split計算 科侈。。炒事。
StoragePartitionLoader loadPartition //這里有個 DirectoryLister 【重點關(guān)注】
這里夾雜幾種情況
SymlinkTextInputFormat
shouldUseFileSplitsFromInputFormat(inputFormat))
InputSplit[] splits = inputFormat.getSplits(jobConf, 0); 去拿到split 臀栈。。
if (tableBucketInfo.isPresent()) {
不同情況解析split的邏輯不一樣
正常情況是非bucket普通表
是用DirectoryLister去list分區(qū)目錄path 一個文件對應(yīng)一個InternalHiveSplit(也可能被path filter過濾)
Optional<InternalHiveSplit> createInternalHiveSplit(HiveFileInfo fileInfo
這里的邏輯:
1)提取 List<HostAddress> addresses
2)計算分區(qū)這個文件的相對路徑 URI relativePath = partitionInfo.getPath().relativize(path.toUri());
上面返回的只是InternalHiveSplit 還需要在 HiveSplitSource的getNextBatch里變成HiveSplit
queues.borrowBatchAsync(bucketNumber xxx 觸發(fā)future list目錄任務(wù) 挠乳。权薯。
最后對外輸出的是 HiveSplit【封裝了一大堆東西姑躲。∶蓑迹基于maxSplitSize算出來的 即一個文件 可能有多個】
關(guān)于split元數(shù)據(jù)這塊比spark調(diào)度要好很多 因為是流式的 不是靜態(tài)的集合 黍析。。 內(nèi)存需求會少很多屎开。
最主要的是ListenableFuture<?> future = hiveSplitSource.addToQueue(splits.next());
最后輸出的HiveSplit在一個PerBucket + AsyncQueue 組合的復(fù)雜的隊列緩存結(jié)構(gòu)里
節(jié)點選擇 [SOFT Affinity scheduler]
- 這里實際上是用path的哈希取模所有節(jié)點 得到固定的目標(biāo)節(jié)點映射列表
(好像忽略了文件實際位置橄仍。。但是因為這有緩存 包括文件的 所以可能是綜合考慮 如果是hard的話 是不是可能不均衡 牍戚?) - 貌似只適合于存算分離的架構(gòu)。虑粥。
- 如果是存算一體的 建議選HARD Affinity 如孝,即類似spark的preference local node
緩存(Raptorx中的特性)
- 1)文件 cache 【coordinater上 放內(nèi)存】【done】
本質(zhì)是guava的Cache<Path, List<HiveFileInfo>> cache類實例 分區(qū)目錄也假設(shè)為不動的。娩贷。
This can only be applied to sealed directories
見:StoragePartitionLoader.createInternalHiveSplitIterator
boolean cacheable = isUseListDirectoryCache(session);
if (partition.isPresent()) {
// Use cache only for sealed partitions
cacheable &= partition.get().isSealedPartition();
}
文件的list是根據(jù) hdfs 的 remoteIterator 迭代的 第晰。。不像spark 跑了并行任務(wù)去獲取location信息 全部一起緩存 彬祖。茁瘦。
- 2)tail/footer cache【在節(jié)點上 也是放內(nèi)存】
注:OrcDataSource這個類和tail/footer沒關(guān)系 只是封裝了流讀取的一些入口
這個類是必須要打開至少一次ORC文件的
HiveClientModule -> createOrcFileTailSource 里決定了是否啟用緩存 。储笑。
Cache<OrcDataSourceId, OrcFileTail> cache
具體來說
OrcReader里面的兩個主要元數(shù)據(jù) 都來自 orcFileTailSource提供的OrcFileTail // Slice 里保存了 byte[]
private final Footer footer; // 文件級別的統(tǒng)計 stripe摘要
private final Metadata metadata; //stripe級的統(tǒng)計
還有stripe的StripeMetadataSource -> 這個類提供獲取StripeFooter的方法
(StripeFooter 包含一堆Stream 即各列數(shù)據(jù)信息 以及索引信息 StripeReader會用 selectRowGroups )
這里面會判斷是否要緩存isCachedStream
return streamKind == BLOOM_FILTER || streamKind == ROW_INDEX;
注意:這個方法調(diào)用時是傳入OrcDataSource的 所以能拿到ORC文件流 但是之后就不需要這個流了甜熔。seek 等也不需要了。
OrcFileTail orcFileTail = orcFileTailSource.getOrcFileTail(OrcDataSource orcDataSource)
謂詞裁剪(plan層)
- 1)分區(qū)裁剪
SqlQueryExecution analyzeQuery
logicalPlanner plan
IterativeOptimizer【這個類類似于scala里面的模式匹配 不同的規(guī)則去catch其對應(yīng)的語法樹節(jié)點去執(zhí)行邏輯】
而所有的規(guī)則都在 PlanOptimizers 去添加 每個匹配邏輯是一個Rule類的實現(xiàn)
如PickTableLayout 有一個規(guī)則是pickTableLayoutForPredicate
hivePartitionResult = partitionManager.getPartitions(
這里如果有謂詞 where 就會把tablescan替換成FilterNode(里邊包含tablescan)
這樣就完成了查詢計劃的替換
分區(qū)裁剪過程【這里很抽象 謂詞傳遞 命名很不好理解 突倍。腔稀。∮鹄】
- 2)謂詞表示體系
重要
這里要解釋一個較Domain的類焊虏。。實際上就是表示某個值的范圍(離散值秕磷,范圍诵闭,無窮等)
以及其服務(wù)類:TupleDomain 。澎嚣。是限定了字段 + 值范圍的組合
(PS:這命名實在讓人別扭疏尿。)
參考 TestTupleDomainFilter
還搞了個緩存去防止多次解析 。币叹。
TupleDomainFilterCache -> Converting Domain into TupleDomainFilter is expensive, hence, we use a cache keyed on Domain
傳遞到下游的時候 是TupleDomain<Subfield> domainPredicate
這里面Subfield是一個可以多層表達(dá)的字段表示
TupleDomain 是一個泛型Map 大概就是<字段 值范圍>的一個模式润歉。
Constraint<ColumnHandle>
// 這又是另一個表示條件的類 。颈抚。里面封裝了 TupleDomain<T> summary;
// 和另一個 Optional<Predicate<Map<T, NullableValue>>> predicate 這個是Java Function接口里面的Predicate
// 有幾個主要方法 and/or/test -> 得到返回值是Boolean抽象 踩衩。
這里面涉及到的泛型有
ColumnHandle -> 一個空接口 這是presto spi 定義的 各個connector可能有不同實現(xiàn)
Map<Column, Domain> effectivePredicate -> 這個Column就是Hive元數(shù)據(jù)里Table下的列嚼鹉,獲取分區(qū)列表時候用到
HiveColumnHandle -> hive的實現(xiàn)
HivePartition -> Map<ColumnHandle, NullableValue> getKeys() //表示field -> value
讀split邏輯
具體的task讀的是 hiveSplit
弄清楚split切分邏輯【】
worker上的調(diào)用鏈:
PrioritizedSplitRunner process
DriverSplitRunner processFor
Driver processInternal
xxOperator getOutput -> 觸發(fā)計算
HivePageSourceProvider createHivePageSource
OrcBatchPageSourceFactory createOrcPageSource
之后就是ORC的解析 OrcReader -> OrcRecordReader 去讀取到presto的page相關(guān)邏輯了。
是否緩存文件footer元數(shù)據(jù) 不只是開啟了cache配置 還需要選擇的split節(jié)點在期望節(jié)點里 才會去緩存 驱富。即 和nodeSelector策略有關(guān)锚赤。而且這個緩存 是以每個文件粒度調(diào)度的 。(包含在hiveSplit里面)
梳理stage/task/driver/split的并發(fā)關(guān)系
- Query 根據(jù)SQL語句生成查詢執(zhí)行計劃褐鸥,進(jìn)而生成可以執(zhí)行的查詢(Query)线脚,一個查詢執(zhí)行由Stage、Task叫榕、Driver浑侥、Split、Operator和DataSource組成
- Stage 執(zhí)行查詢階段 Stage之間是樹狀的結(jié)構(gòu) 晰绎,RootStage 將結(jié)果返回給coordinator 寓落,SourceStage接收coordinator數(shù)據(jù) 其他stage都有上下游 stage分為四種 single(root)、Fixed荞下、source伶选、coordinator_only(DML or DDL)
- Exchange 兩個stage數(shù)據(jù)的交換通過Exchange 兩種Exchange ;Output Buffer (生產(chǎn)數(shù)據(jù)的stage通過此傳給下游stage)Exchange Client (下游消費)尖昏;如果stage 是source 直接通過connector 讀數(shù)據(jù)
- 一個Task包含一或多個Driver仰税,是作用于一個Split的一系列Operator集合。一個Driver用于處理一個Split產(chǎn)生相應(yīng)輸出抽诉,輸出由Task收集并傳遞給下游Stage中的Task
核心問題
1)task個數(shù)
正常就是1個stage節(jié)點個數(shù)個陨簇,而presto會盡可能使用資源。每個stage每個節(jié)點都有一個task迹淌。(當(dāng)然是非root stage)
2)driver個數(shù)
其實就是split個數(shù)
3)split個數(shù)(根據(jù)stage的類型不同而不同)
single(root)-> 1個
coordinator only -> 元數(shù)據(jù)操作 也是一個
如果是source的stage -> 由connector的splitmanager決定
一個文件最少一個split
remainingInitialSplits 有個參數(shù)影響了maxSplitBytes // 如果計算次數(shù)少于remainingInitialSplits 會采用 maxInitialSplitSize
否則用配置的maxSplitSize去滾動每個文件生成HiveSplit
(最后2個split會平衡 避免過小的split 導(dǎo)致時間不太均衡...)
hive.max-split-size
hive.max-initial-splits(默認(rèn)200 不調(diào)節(jié)也行塞帐。。需要調(diào)節(jié) maxInitialSplitSize 如果不設(shè)置就是默認(rèn) maxSplitSize/2 )
hive.max-initial-split-size
如果是中間stage -> hash_partition_count 這個session 參數(shù)巍沙?還是 task.concurrency 葵姥?
舉例說明:對與讀取hive表來說,1G的數(shù)據(jù)句携,設(shè)置 hive.max-split-size = 64MB榔幸,hive.max-initial-split-size= 64MB,最后才會得到期望的1G/64MB個source split
線程并發(fā)模型
- task.max-worker-threads // worker啟動的線程池的大小矮嫉,即工作線程個數(shù)
- task.concurrency // set session task_concurrency=1; 這個影響 agg/join 的并發(fā)
- task.min-drivers // 默認(rèn)是 task.max-worker-threads x2 削咆,worker最少在執(zhí)行的split數(shù),如果有足夠資源和任務(wù)
- task.min-drivers-per-task // task最少并行執(zhí)行的split數(shù)
- initial_splits_per_node // 蠢笋。拨齐。(應(yīng)該是調(diào)度時候)
在taskExecutor的enqueueSplits里
for (SplitRunner taskSplit : taskSplits) {
xxx
scheduleTaskIfNecessary(taskHandle); //按task級別調(diào)度 會用到 task.min-drivers-per-task 即可并發(fā)運行的split
addNewEntrants();
//在資源變動( 如task remove/split finish/等時候 去嘗試去調(diào)度更多split 【這里比較模糊。昨寞≌巴铮】用到 task.min-drivers 參數(shù) )
//比如 task.min-drivers-per-task 是4 task.min-drivers是10 則相當(dāng)于進(jìn)行了2次調(diào)度 厦滤。。
}
在Presto中有一個配置query.execution-policy歼狼,它有兩個選項掏导,一個是all-at-once,另一個是 phased // set session execution_policy='phased';
線程和并發(fā)模型:
SqlTaskExecutionFactory -> SqlTaskExecution
Coordinator分發(fā)Task到對應(yīng)Worker羽峰,通過HttpClient發(fā)送給節(jié)點上TaskResource提供的RESTful接口
Worker啟動一個SqlTaskExecution對象或者更新對應(yīng)對象需要處理的Split
這里能看到每個split其實對應(yīng)一個driverSplitRunner(這個類里面有DriverSplitRunnerFactory)
// Enqueue driver runners with split lifecycle for this plan node and driver life cycle combination.
ImmutableList.Builder<DriverSplitRunner> runners = ImmutableList.builder();
for (ScheduledSplit scheduledSplit : pendingSplits.removeAllSplits()) {
// create a new driver for the split
runners.add(partitionedDriverRunnerFactory.createDriverRunner(scheduledSplit, lifespan));
}
enqueueDriverSplitRunner(false, runners.build());
在DriverSplitRunner的Process方法里
this.driver = driverSplitRunnerFactory.createDriver(driverContext, partitionedSplit);
TaskExecutor 封裝了TaskRunner(執(zhí)行split的地方 PrioritizedSplitRunner(實現(xiàn)類是DriverSplitRunner))
TaskExecutor 里具體執(zhí)行任務(wù)是是一個線程池
config.getMaxWorkerThreads(), // 這個是啟動的固定線程池 趟咆。。不同SQL不同task都在里面執(zhí)行 梅屉。值纱。線程池大小是固定的:task.max-worker-threads
config.getMinDrivers(),// 這個默認(rèn)是上面 x 2 不知有什么用?
config.getMinDriversPerTask(), // ?
config.getMaxDriversPerTask(),
PrioritizedSplitRunner實現(xiàn)了時間片機制(固定1秒去執(zhí)行split 挑選優(yōu)先級)
這種調(diào)度是不是犧牲了部分性能 換取迭代 優(yōu)先級 多租戶 多任務(wù)管理 結(jié)果快速反饋機制坯汤。计雌。。
PrioritizedSplitRunner里實際運行的是Driver玫霎,封裝的一堆Operatior 如表Scan/filter/limit/taskoutPut 作用在split上