Presto Technical Notes (split into two parts because of the length)
1. Environment Setup
You will need a Hadoop environment, a Hive environment, a MySQL environment, an SSH environment, and a local Presto debug environment.
Recommended versions: Hadoop 2.2.0, Hive 1.2.1, MySQL 5.7, openssh-server & client, and the latest Presto release.
For setting up the local Presto debug environment, see "Presto in IDEA".
2. Query Entry Point & Flow
All queries first hit StatementResource, whose resource path is @Path("/v1/statement").
Query query = Query.create(
sessionContext,
statement, //the actual SQL text
queryManager,
sessionPropertyManager,
exchangeClient,
responseExecutor,
timeoutExecutor,
blockEncodingSerde);
queries.put(query.getQueryId(), query); //create the query and register it for execution
Inside the Query class, the following is executed:
Query result = new Query(sessionContext, query, queryManager, sessionPropertyManager, exchangeClient, dataProcessorExecutor, timeoutExecutor, blockEncodingSerde);
The queryManager inside the Query class:
QueryInfo queryInfo = queryManager.createQuery(sessionContext, query); //sessionContext is the user's session information, query is the user's SQL
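For context, the client side of this endpoint works by POSTing the SQL text to /v1/statement and then following the nextUri in the returned JSON to page through results. A minimal sketch of that interaction (assuming a coordinator on localhost:8080; in practice you would use presto-cli or presto-jdbc rather than raw HTTP):

import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;

public class StatementClientSketch
{
    public static void main(String[] args) throws Exception
    {
        // assumption: a coordinator listening on localhost:8080
        URL url = new URL("http://localhost:8080/v1/statement");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("POST");
        conn.setRequestProperty("X-Presto-User", "test");       // user header
        conn.setRequestProperty("X-Presto-Catalog", "hive");    // default catalog
        conn.setRequestProperty("X-Presto-Schema", "default");  // default schema
        conn.setDoOutput(true);
        try (OutputStream out = conn.getOutputStream()) {
            out.write("select 1".getBytes(StandardCharsets.UTF_8)); // the SQL goes in the request body
        }
        // the response is JSON containing queryId, state, and a nextUri to poll for results
        try (Scanner scanner = new Scanner(conn.getInputStream(), StandardCharsets.UTF_8.name())) {
            System.out.println(scanner.useDelimiter("\\A").next());
        }
    }
}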
2.1 Lexical & Syntax Analysis: Generating the AST
queryManager is an interface whose only current implementation is SqlQueryManager; its createQuery method:
private final ConcurrentMap<QueryId, QueryExecution> queries = new ConcurrentHashMap<>();
//QueryQueueManager is an interface; the SQL-side implementation is SqlQueryQueueManager
private final QueryQueueManager queueManager;
//the core logic: lexical and syntax analysis, producing the AST node
Statement wrappedStatement = sqlParser.createStatement(query, createParsingOptions(session));
statement = unwrapExecuteStatement(wrappedStatement, sqlParser, session);
List<Expression> parameters = wrappedStatement instanceof Execute ? ((Execute) wrappedStatement).getParameters() : emptyList();
//validate the parameters
validateParameters(statement, parameters);
//look up the matching query execution factory
QueryExecutionFactory<?> queryExecutionFactory = executionFactories.get(statement.getClass());
//the factory creates the query execution
queryExecution = queryExecutionFactory.createQueryExecution(queryId, query, session, statement, parameters);
//map the queryId to its query execution
queries.put(queryId, queryExecution);
//submit the query execution to the queueManager
queueManager.submit(statement, queryExecution, queryExecutor);
//return the query info
return queryInfo;
The submit method of SqlQueryQueueManager:
List<QueryQueue> queues;
try {
//select execution queues according to the configured rules
queues = selectQueues(queryExecution.getSession(), executor);
}
catch (PrestoException e) {
queryExecution.fail(e);
return;
}
for (QueryQueue queue : queues) {
if (!queue.reserve(queryExecution)) {
queryExecution.fail(new PrestoException(QUERY_QUEUE_FULL, "Too many queued queries"));
return;
}
}
//if the rules match, enqueue the query
queues.get(0).enqueue(createQueuedExecution(queryExecution, queues.subList(1, queues.size()), executor));
//select execution queues according to the configured rules
private List<QueryQueue> selectQueues(Session session, Executor executor)
{
for (QueryQueueRule rule : rules) {
Optional<List<QueryQueueDefinition>> queues = rule.match(session.toSessionRepresentation());
if (queues.isPresent()) {
//get or create the query queues
return getOrCreateQueues(session, executor, queues.get());
}
}
throw new PrestoException(QUERY_REJECTED, "Query did not match any queuing rule");
}
//get or create the query queues
private List<QueryQueue> getOrCreateQueues(Session session, Executor executor, List<QueryQueueDefinition> definitions)
{
ImmutableList.Builder<QueryQueue> queues = ImmutableList.builder();
for (QueryQueueDefinition definition : definitions) {
String expandedName = definition.getExpandedTemplate(session);
QueueKey key = new QueueKey(definition, expandedName);
if (!queryQueues.containsKey(key)) {
QueryQueue queue = new QueryQueue(executor, definition.getMaxQueued(), definition.getMaxConcurrent());
if (queryQueues.putIfAbsent(key, queue) == null) {
// Export the mbean, after checking for races
String objectName = ObjectNames.builder(QueryQueue.class, definition.getTemplate()).withProperty("expansion", expandedName).build();
mbeanExporter.export(objectName, queue);
}
}
queues.add(queryQueues.get(key));
}
return queues.build();
}
QueryQueue(Executor queryExecutor, int maxQueuedQueries, int maxConcurrentQueries)
{
requireNonNull(queryExecutor, "queryExecutor is null");
checkArgument(maxQueuedQueries > 0, "maxQueuedQueries must be greater than zero");
checkArgument(maxConcurrentQueries > 0, "maxConcurrentQueries must be greater than zero");
int permits = maxQueuedQueries + maxConcurrentQueries;
// Check for overflow
checkArgument(permits > 0, "maxQueuedQueries + maxConcurrentQueries must be less than or equal to %s", Integer.MAX_VALUE);
this.queuePermits = new AtomicInteger(permits);
this.asyncSemaphore = new AsyncSemaphore<>(maxConcurrentQueries,
queryExecutor,
queueEntry -> {
QueuedExecution queuedExecution = queueEntry.dequeue();
if (queuedExecution != null) {
queuedExecution.start();
return queuedExecution.getCompletionFuture();
}
return Futures.immediateFuture(null);
});
}
public void start()
{
// Only execute if the query is not already completed (e.g. cancelled)
if (listenableFuture.isDone()) {
return;
}
if (nextQueues.isEmpty()) {
executor.execute(() -> {
try (SetThreadName ignored = new SetThreadName("Query-%s", queryExecution.getQueryId())) {
//turn the statement into an analysis and then a plan
queryExecution.start();
}
});
}
else {
nextQueues.get(0).enqueue(new QueuedExecution(queryExecution, nextQueues.subList(1, nextQueues.size()), executor, listenableFuture));
}
}
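The queueing mechanism above boils down to a fixed pool of permits (queued + running) guarded by an atomic counter, plus a semaphore that only lets the next queued query start once a running slot frees up. Below is a minimal, self-contained sketch of that admission idea; it is not the real QueryQueue/AsyncSemaphore code, just the same pattern with standard JDK classes:

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

public class SimpleQueryQueue
{
    private final AtomicInteger queuePermits;          // maxQueued + maxConcurrent, like queuePermits above
    private final Semaphore runningSlots;              // plays the role of AsyncSemaphore(maxConcurrent)
    private final Queue<Runnable> queued = new ConcurrentLinkedQueue<>();
    private final ExecutorService executor = Executors.newCachedThreadPool();

    public SimpleQueryQueue(int maxQueued, int maxConcurrent)
    {
        this.queuePermits = new AtomicInteger(maxQueued + maxConcurrent);
        this.runningSlots = new Semaphore(maxConcurrent);
    }

    // reserve() fails fast, mirroring the QUERY_QUEUE_FULL case when all permits are taken
    public boolean reserve()
    {
        if (queuePermits.getAndDecrement() <= 0) {
            queuePermits.incrementAndGet();            // no permit available: roll back and reject
            return false;
        }
        return true;
    }

    public void enqueue(Runnable query)
    {
        queued.add(query);
        executor.submit(() -> {
            runningSlots.acquireUninterruptibly();     // wait for a running slot
            try {
                Runnable next = queued.poll();
                if (next != null) {
                    next.run();                        // queuedExecution.start() in Presto
                }
            }
            finally {
                runningSlots.release();
                queuePermits.incrementAndGet();        // free the permit when the query finishes
            }
        });
    }
}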
2.2 Semantic Analysis & Logical Plan Generation
2.2.1 Semantic Analysis
First, look at the fields of the SqlQueryExecution class:
private final QueryStateMachine stateMachine;
private final Statement statement; //the AST node produced by lexical/syntax analysis
private final Metadata metadata;
private final AccessControl accessControl;
private final SqlParser sqlParser; //the SQL parser
private final SplitManager splitManager;
private final NodePartitioningManager nodePartitioningManager;
private final NodeScheduler nodeScheduler; //the core module that assigns tasks to nodes; covered in detail under stage scheduling
private final List<PlanOptimizer> planOptimizers;
private final RemoteTaskFactory remoteTaskFactory;
private final LocationFactory locationFactory;
private final int scheduleSplitBatchSize;
private final ExecutorService queryExecutor;
private final ScheduledExecutorService schedulerExecutor;
private final FailureDetector failureDetector;
private final QueryExplainer queryExplainer;
private final PlanFlattener planFlattener;
private final CostCalculator costCalculator;
private final AtomicReference<SqlQueryScheduler> queryScheduler = new AtomicReference<>();
private final AtomicReference<Plan> queryPlan = new AtomicReference<>();
private final NodeTaskMap nodeTaskMap;
private final ExecutionPolicy executionPolicy;
private final List<Expression> parameters;
private final SplitSchedulerStats schedulerStats;
The start method of SqlQueryExecution:
PlanRoot plan = analyzeQuery(); //generate the logical execution plan
//the call chain is
analyzeQuery() -> doAnalyzeQuery()
doAnalyzeQuery()
{
//create the semantic analyzer
Analyzer analyzer = new Analyzer(stateMachine.getSession(), metadata, sqlParser, accessControl, Optional.of(queryExplainer), parameters);
//run semantic analysis
Analysis analysis = analyzer.analyze(statement);
//create the logical planner
LogicalPlanner logicalPlanner = new LogicalPlanner(stateMachine.getSession(), planOptimizers, idAllocator, metadata, sqlParser, costCalculator);
//the logical planner builds the logical execution plan; this step also covers plan optimization
Plan plan = logicalPlanner.plan(analysis);
queryPlan.set(plan);
//fragment the logical plan in preparation for the distributed execution plan
SubPlan fragmentedPlan = PlanFragmenter.createSubPlans(stateMachine.getSession(), metadata, nodePartitioningManager, plan, false);
return new PlanRoot(fragmentedPlan, !explainAnalyze, extractConnectors(analysis));
}
The analyze method of the Analyzer class:
//SQL rewriting
Statement rewrittenStatement = StatementRewrite.rewrite(session, metadata, sqlParser, queryExplainer, statement, parameters, accessControl);
//initialize the Analysis
Analysis analysis = new Analysis(rewrittenStatement, parameters, isDescribe);
//create the statement analyzer
StatementAnalyzer analyzer = new StatementAnalyzer(analysis, metadata, sqlParser, accessControl, session);
//run the statement analyzer
analyzer.analyze(rewrittenStatement, Optional.empty());
The actual work inside analyze is to walk the AST and analyze each type of node; mostly this means fetching metadata and validating it.
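Concretely, the StatementAnalyzer is a visitor: one visit method per AST node type, each one resolving metadata, recording what it finds into the Analysis, and failing the query on bad metadata. A stripped-down sketch of that pattern with hypothetical node classes (Presto's real AstVisitor hierarchy is much larger):

// Minimal AST node hierarchy standing in for Presto's Node / Query / Table classes
abstract class AstNode
{
    abstract <R> R accept(AstVisitor<R> visitor);
}

class TableNode extends AstNode
{
    final String name;
    TableNode(String name) { this.name = name; }
    <R> R accept(AstVisitor<R> visitor) { return visitor.visitTable(this); }
}

class SelectNode extends AstNode
{
    final TableNode from;
    SelectNode(TableNode from) { this.from = from; }
    <R> R accept(AstVisitor<R> visitor) { return visitor.visitSelect(this); }
}

abstract class AstVisitor<R>
{
    R visitTable(TableNode node) { return null; }
    R visitSelect(SelectNode node) { return null; }
}

// The "analyzer": resolves metadata for each node type and validates it
class SketchAnalyzer extends AstVisitor<Void>
{
    @Override
    Void visitSelect(SelectNode node)
    {
        return node.from.accept(this);                 // descend into the FROM clause
    }

    @Override
    Void visitTable(TableNode node)
    {
        // in Presto this is where the metadata lookup happens and an
        // unknown table raises a semantic error
        if (!"known_table".equals(node.name)) {
            throw new IllegalStateException("Table not found: " + node.name);
        }
        return null;
    }
}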
2.2.2 Generating the Logical Execution Plan
The plan method of LogicalPlanner:
PlanNode root = planStatement(analysis, analysis.getStatement());
//check that the plan is valid
PlanSanityChecker.validateIntermediatePlan(root, session, metadata, sqlParser, symbolAllocator.getTypes());
//optimize the generated logical plan
root = optimizer.optimize(root, session, symbolAllocator.getTypes(), symbolAllocator, idAllocator);
The planStatement method of LogicalPlanner:
//for ordinary SQL only the following runs
createOutputPlan(planStatementWithoutOutput(analysis, statement), analysis);
The planStatementWithoutOutput method of LogicalPlanner:
private RelationPlan planStatementWithoutOutput(Analysis analysis, Statement statement)
{
if (statement instanceof CreateTableAsSelect) {
if (analysis.isCreateTableAsSelectNoOp()) {
throw new PrestoException(NOT_SUPPORTED, "CREATE TABLE IF NOT EXISTS is not supported in this context " + statement.getClass().getSimpleName());
}
return createTableCreationPlan(analysis, ((CreateTableAsSelect) statement).getQuery());
}
else if (statement instanceof Insert) {
checkState(analysis.getInsert().isPresent(), "Insert handle is missing");
return createInsertPlan(analysis, (Insert) statement);
}
else if (statement instanceof Delete) {
return createDeletePlan(analysis, (Delete) statement);
}
else if (statement instanceof Query) {
return createRelationPlan(analysis, (Query) statement);
}
else if (statement instanceof Explain && ((Explain) statement).isAnalyze()) {
return createExplainAnalyzePlan(analysis, (Explain) statement);
}
else {
throw new PrestoException(NOT_SUPPORTED, "Unsupported statement type " + statement.getClass().getSimpleName());
}
}
The createRelationPlan method of LogicalPlanner:
return new RelationPlanner(analysis, symbolAllocator, idAllocator, buildLambdaDeclarationToSymbolMap(analysis, symbolAllocator), metadata, session)
.process(query, null);
RelationPlanner works by walking the AST and producing the corresponding nodes of the logical execution plan.
Common nodes in the logical plan, and the visit operations, are listed below:
AggregationNode: node for aggregation; it comes in three steps, FINAL, PARTIAL, and SINGLE (final, partial, and single-node aggregation). Before plan optimization every aggregation is SINGLE; the optimizer splits it into a partial plus a final aggregation, much like a map-side partial reduce followed by the final reduce in an MR job
DeleteNode: node for DELETE operations
ExchangeNode: node that exchanges data between different stages in the logical plan
FilterNode: node that applies filter conditions
JoinNode: node that performs joins
LimitNode: node that applies LIMIT
MarkDistinctNode: handles count(distinct ...)
OutputNode: the output node
ProjectNode: maps the output columns of the node below onto the node above; e.g. for select a + 1 from b, column a of the TableScanNode plus 1 is mapped onto the OutputNode
RemoteSourceNode: like ExchangeNode, but exchanges data between different stages in the distributed plan
SampleNode: node for sampling functions
RowNumberNode: handles the row_number window function
SortNode: sorting node
TableScanNode: reads data from a table
TableWriterNode: writes data to a table
TopNNode: order by ... limit uses the more efficient TopNNode
UnionNode: handles UNION
WindowNode: handles window functions
The visit operations of RelationPlanner:
visitTable: produces a TableScanNode
visitAliasedRelation: handles a relation with an alias
visitSampledRelation: adds a SampleNode, mainly for sampling functions
visitJoin: produces different node structures depending on the join type; typically the left and right sides are planned separately, a ProjectNode is added on each side, a JoinNode connects them in the middle, and a FilterNode carrying the join condition is added on top
visitQuery: uses QueryPlanner to handle a Query and returns the resulting plan
visitQuerySpecification: uses QueryPlanner to handle the QueryBody and returns the resulting plan
The plan operations of the QueryPlanner class (a queryBody is essentially one complete SQL query; it can be nested, e.g. select a from (queryBody) b, and the inner queryBody is normally planned further as an AliasedRelation).
Comparing Query and QuerySpecification: QuerySpecification represents a complete QueryBody, while Query contains a QueryBody. QueryBody is an abstract class and QuerySpecification extends it.
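Sketched as plain Java, that relationship looks roughly like this (heavily simplified; the real classes live in com.facebook.presto.sql.tree and carry WITH, ORDER BY, LIMIT and many more fields):

// Query wraps a QueryBody; QuerySpecification is one concrete QueryBody
// (SELECT ... FROM ... WHERE ...); a UNION or a parenthesized subquery
// would be other QueryBody subclasses.
abstract class QueryBody {}

class QuerySpecification extends QueryBody
{
    // select items, from, where, group by, having, order by, limit ...
}

class Query
{
    final QueryBody queryBody;

    Query(QueryBody queryBody)
    {
        this.queryBody = queryBody;
    }
}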
plan(Query query): first extracts the queryBody from the Query, then lets RelationPlanner analyze it; its visitQuerySpecification is invoked, and RelationPlanner then calls QueryPlanner's plan method
plan(QuerySpecification query): produces all the component nodes of a queryBody
Below is the main flow of plan(QuerySpecification query):
PlanBuilder builder = planFrom(node); //builder's root is the generated node tree
RelationPlan fromRelationPlan = builder.getRelationPlan(); //produces the TableScanNode
builder = filter(builder, analysis.getWhere(node), node); //produces a FilterNode for the WHERE clause
builder = aggregate(builder, node); //produces an AggregationNode
builder = filter(builder, analysis.getHaving(node), node); //produces a FilterNode if there is a HAVING clause
builder = window(builder, node); //produces a WindowNode
List<Expression> outputs = analysis.getOutputExpressions(node);
builder = handleSubqueries(builder, node, outputs);
if (node.getOrderBy().isPresent() && !SystemSessionProperties.isLegacyOrderByEnabled(session)) {
if (analysis.getGroupingSets(node).isEmpty()) {
builder = project(builder, outputs, fromRelationPlan);
outputs = toSymbolReferences(computeOutputs(builder, outputs));
builder = planBuilderFor(builder, analysis.getScope(node.getOrderBy().get()));
}
else {
List<Expression> orderByAggregates = analysis.getOrderByAggregates(node.getOrderBy().get());
builder = project(builder, Iterables.concat(outputs, orderByAggregates));
outputs = toSymbolReferences(computeOutputs(builder, outputs));
List<Expression> complexOrderByAggregatesToRemap = orderByAggregates.stream()
.filter(expression -> !analysis.isColumnReference(expression))
.collect(toImmutableList());
builder = planBuilderFor(builder, analysis.getScope(node.getOrderBy().get()), complexOrderByAggregatesToRemap);
}
builder = window(builder, node.getOrderBy().get());
}
List<Expression> orderBy = analysis.getOrderByExpressions(node);
builder = handleSubqueries(builder, node, orderBy);
builder = project(builder, Iterables.concat(orderBy, outputs));
builder = distinct(builder, node);
builder = sort(builder, node);
builder = project(builder, outputs);
builder = limit(builder, node);
return new RelationPlan(
builder.getRoot(),
analysis.getScope(node),
computeOutputs(builder, outputs));
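Every one of those steps follows the same pattern: take the current root of the plan, wrap it in one more node if the corresponding clause exists, and hand the new root back to the builder. A minimal sketch of that chaining with hypothetical node classes (not Presto's actual PlanBuilder):

// Minimal plan-node chaining in the spirit of QueryPlanner.plan(QuerySpecification)
interface PlanNode {}

class ScanNode implements PlanNode
{
    final String table;
    ScanNode(String table) { this.table = table; }
}

class FilterStep implements PlanNode
{
    final PlanNode source;
    final String predicate;
    FilterStep(PlanNode source, String predicate) { this.source = source; this.predicate = predicate; }
}

class LimitStep implements PlanNode
{
    final PlanNode source;
    final long count;
    LimitStep(PlanNode source, long count) { this.source = source; this.count = count; }
}

class SketchPlanBuilder
{
    private PlanNode root;

    SketchPlanBuilder(PlanNode root) { this.root = root; }

    // each step wraps the current root, exactly like builder = filter(builder, ...) above
    SketchPlanBuilder filter(String predicate)
    {
        if (predicate != null) {
            root = new FilterStep(root, predicate);
        }
        return this;
    }

    SketchPlanBuilder limit(Long count)
    {
        if (count != null) {
            root = new LimitStep(root, count);
        }
        return this;
    }

    PlanNode getRoot() { return root; }
}

With this shape, something like select * from t where x > 1 limit 10 ends up as LimitStep(FilterStep(ScanNode(t))).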
2.2.3 Logical Plan Optimization
The plan method of LogicalPlanner:
root = optimizer.optimize(root, session, symbolAllocator.getTypes(), symbolAllocator, idAllocator);
The optimizer's job is simply to apply each of the concrete optimizer implementations in turn to the node tree (the logical plan) produced in the previous step.
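In other words, the optimizer pipeline is a simple fold over the plan: each PlanOptimizer gets the current root and returns a possibly rewritten root. Roughly, reusing the PlanNode interface from the sketch above (the real optimize signature also takes the session, types, and symbol/id allocators):

// Simplified view of how the optimizer list is applied in LogicalPlanner.plan
interface PlanOptimizer
{
    PlanNode optimize(PlanNode root);
}

class OptimizerPipeline
{
    private final java.util.List<PlanOptimizer> optimizers;

    OptimizerPipeline(java.util.List<PlanOptimizer> optimizers)
    {
        this.optimizers = optimizers;
    }

    PlanNode optimize(PlanNode root)
    {
        for (PlanOptimizer optimizer : optimizers) {
            root = optimizer.optimize(root);           // each optimizer rewrites the whole tree
        }
        return root;
    }
}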
The individual optimizers:
AddExchanges //produces the distributed plan shape, e.g. splits aggregations into partial and final
AddLocalExchanges
BeginTableWrite
CanonicalizeExpressions //canonicalizes expressions in the plan, e.g. rewrites is not null as not(is null) and if expressions as case when
CheckSubqueryNodesAreRewritten
CountConstantOptimizer //rewrites count(a) as count(*) for better compatibility across data sources
DesugaringOptimizer
DetermineJoinDistributionType
EliminateCrossJoins
EmptyDeleteOptimizer
HashGenerationOptimizer //precomputes hashes
ImplementIntersectAndExceptAsUnion
IndexJoinOptimizer //rewrites a join into an index join, using the joined table's index to speed things up
IterativeOptimizer
LimitPushDown //pushes LIMIT down to reduce the amount of data handled by lower nodes
MetadataDeleteOptimizer
MetadataQueryOptimizer //rewrites aggregations over a table's partition columns into queries against table metadata, avoiding a table read
OptimizeMixedDistinctAggregations
PickLayout
PredicatePushDown //pushes predicates (filter conditions) down to reduce the amount of data handled by lower nodes
ProjectionPushDown //pushes ProjectNodes down to reduce the amount of data flowing through Union nodes
PruneUnreferencedOutputs //removes ProjectNode columns that are not part of the final output, reducing computation
PruneRedundantProjections //removes redundant ProjectNodes; if a node only maps columns straight through, that layer is dropped
PushTableWriteThroughUnion
RemoveUnreferencedScalarLateralNodes
SetFlatteningOptimizer //merges Union branches that can be merged
SimplifyExpressions //simplifies and optimizes the expressions that appear in the plan
TransformCorrelatedNoAggregationSubqueryToJoin
TransformCorrelatedScalarAggregationToJoin
TransformCorrelatedSingleRowSubqueryToProject
TransformQuantifiedComparisonApplyToLateralJoin
TransformUncorrelatedInPredicateSubqueryToSemiJoin
TransformUncorrelatedLateralToJoin
UnaliasSymbolReferences //removes meaningless ProjectNode mappings; columns that are referenced directly, without an expression, are mapped straight to the parent node
WindowFilterPushDown
2.3 Generating the Distributed Execution Plan
2.3.1 Fragmenting the Logical Plan
This phase splits the logical plan produced above into multiple stages. A stage has one of four partitioning types:
Source, Fixed, Single, Coordinator_only
Source: usually TableScanNode, ProjectNode, and FilterNode; typically the most downstream stage, the one that reads the data
Fixed: usually follows Source; spreads the data produced by the Source stage across multiple nodes for processing, much like the map-side reduce in an MR job, covering partial aggregation, partial joins, and partial data writes
Single: usually follows Fixed; runs on a single machine, gathers all results, performs the final aggregation and the global sort, and hands the result to the Coordinator
Coordinator_only: runs only on the Coordinator
The doAnalyzeQuery method of SqlQueryExecution:
SubPlan fragmentedPlan = PlanFragmenter.createSubPlans(stateMachine.getSession(), metadata, nodePartitioningManager, plan, false);
//the structure of the SubPlan class
private final PlanFragment fragment;
private final List<SubPlan> children;
As you can see, SubPlan is a tree: each node holds a plan fragment plus its child SubPlans, which is how the logical plan ends up cut into several fragments.
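A small sketch of how such a fragment tree is typically walked, for example to list fragments with children before parents, the same order in which doPlan() recurses (SubPlanSketch is a hypothetical stand-in holding just the two fields shown above):

import java.util.ArrayList;
import java.util.List;

class SubPlanSketch
{
    final String fragmentId;                 // stands in for PlanFragment
    final List<SubPlanSketch> children;      // child sub-plans

    SubPlanSketch(String fragmentId, List<SubPlanSketch> children)
    {
        this.fragmentId = fragmentId;
        this.children = children;
    }

    // collect fragments depth-first, children before parents,
    // which mirrors how doPlan() recurses into root.getChildren()
    static void collect(SubPlanSketch plan, List<String> out)
    {
        for (SubPlanSketch child : plan.children) {
            collect(child, out);
        }
        out.add(plan.fragmentId);
    }

    public static void main(String[] args)
    {
        SubPlanSketch scan = new SubPlanSketch("scan-fragment", new ArrayList<>());
        SubPlanSketch root = new SubPlanSketch("output-fragment", List.of(scan));
        List<String> order = new ArrayList<>();
        collect(root, order);
        System.out.println(order);           // [scan-fragment, output-fragment]
    }
}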
2.3.2 Generating the Distributed Execution Plan
2.3.2.1 Obtaining the SplitSource Splits
The start method of SqlQueryExecution:
//generate the fragmented plan
PlanRoot plan = analyzeQuery();
//generate the distributed execution plan
planDistribution(plan);
The planDistribution method of SqlQueryExecution:
//obtain the stage execution plan
StageExecutionPlan outputStageExecutionPlan = distributedPlanner.plan(plan.getRoot(), stateMachine.getSession());
//create the SQL query scheduler
SqlQueryScheduler scheduler = new SqlQueryScheduler(
//the query state machine (state listener)
stateMachine,
locationFactory,
outputStageExecutionPlan,
nodePartitioningManager,
//the core module that assigns tasks to nodes
nodeScheduler,
remoteTaskFactory,
stateMachine.getSession(),
plan.isSummarizeTaskInfos(),
scheduleSplitBatchSize,
queryExecutor,
schedulerExecutor,
failureDetector,
rootOutputBuffers,
//keeps the mapping between the tasks of the current stage and their nodes
nodeTaskMap,
executionPolicy,
schedulerStats);
The plan method of DistributedExecutionPlanner:
The call chain is plan -> doPlan
private StageExecutionPlan doPlan(SubPlan root, Session session, ImmutableList.Builder<SplitSource> allSplitSources)
{
PlanFragment currentFragment = root.getFragment();
//walk the fragment with a visitor; for every TableScanNode, call splitManager.getSplits() to obtain the splits. The implementation here is HiveSplitManager, which internally calls HiveSplitLoader.start(); details below
//this seems to imply that a stage can only contain one table scan?
Map<PlanNodeId, SplitSource> splitSources = currentFragment.getRoot().accept(new Visitor(session, currentFragment.getPipelineExecutionStrategy(), allSplitSources), null);
ImmutableList.Builder<StageExecutionPlan> dependencies = ImmutableList.builder();
for (SubPlan childPlan : root.getChildren()) {
dependencies.add(doPlan(childPlan, session, allSplitSources)); //recursive call that turns every child sub-plan into a stage execution plan with the same hierarchy
}
return new StageExecutionPlan(
currentFragment,
splitSources,
dependencies.build());
}
For each TableScanNode, splitManager.getSplits() is called to obtain the splits, and the result is kept in Map<PlanNodeId, SplitSource> splitSources (i.e. the SplitSource for each plan node):
public Map<PlanNodeId, SplitSource> visitTableScan(TableScanNode node, Void context)
{
// get dataSource for table
SplitSource splitSource = splitManager.getSplits(
session,
node.getLayout().get(),
pipelineExecutionStrategy == GROUPED_EXECUTION ? GROUPED_SCHEDULING : UNGROUPED_SCHEDULING);
splitSources.add(splitSource);
return ImmutableMap.of(node.getId(), splitSource);
}
HiveSplitManager implements ConnectorSplitManager (a Presto SPI interface):
public ConnectorSplitSource getSplits(ConnectorTransactionHandle transaction, ConnectorSession session, ConnectorTableLayoutHandle layoutHandle, SplitSchedulingStrategy splitSchedulingStrategy)
{
HiveTableLayoutHandle layout = (HiveTableLayoutHandle) layoutHandle;
SchemaTableName tableName = layout.getSchemaTableName();
// get table metadata
SemiTransactionalHiveMetastore metastore = metastoreProvider.apply((HiveTransactionHandle) transaction);
Table table = metastore.getTable(tableName.getSchemaName(), tableName.getTableName())
.orElseThrow(() -> new TableNotFoundException(tableName));
// verify table is not marked as non-readable
String tableNotReadable = table.getParameters().get(OBJECT_NOT_READABLE);
if (!isNullOrEmpty(tableNotReadable)) {
throw new HiveNotReadableException(tableName, Optional.empty(), tableNotReadable);
}
// get the Hive partitions
List<HivePartition> partitions = layout.getPartitions()
.orElseThrow(() -> new PrestoException(GENERIC_INTERNAL_ERROR, "Layout does not contain partitions"));
// short circuit if we don't have any partitions
HivePartition partition = Iterables.getFirst(partitions, null);
if (partition == null) {
return new FixedSplitSource(ImmutableList.of());
}
// get buckets from first partition (arbitrary)
List<HiveBucket> buckets = partition.getBuckets();
// validate bucket bucketed execution
Optional<HiveBucketHandle> bucketHandle = layout.getBucketHandle();
if ((splitSchedulingStrategy == GROUPED_SCHEDULING) && !bucketHandle.isPresent()) {
throw new PrestoException(GENERIC_INTERNAL_ERROR, "SchedulingPolicy is bucketed, but BucketHandle is not present");
}
// sort partitions
partitions = Ordering.natural().onResultOf(HivePartition::getPartitionId).reverse().sortedCopy(partitions);
Iterable<HivePartitionMetadata> hivePartitions = getPartitionMetadata(metastore, table, tableName, partitions, bucketHandle.map(HiveBucketHandle::toBucketProperty));
HiveSplitLoader hiveSplitLoader = new BackgroundHiveSplitLoader(
table,
hivePartitions,
layout.getCompactEffectivePredicate(),
createBucketSplitInfo(bucketHandle, buckets),
session,
hdfsEnvironment,
namenodeStats,
directoryLister,
executor,
splitLoaderConcurrency,
recursiveDfsWalkerEnabled);
HiveSplitSource splitSource;
switch (splitSchedulingStrategy) {
case UNGROUPED_SCHEDULING:
splitSource = HiveSplitSource.allAtOnce(
session,
table.getDatabaseName(),
table.getTableName(),
layout.getCompactEffectivePredicate(),
maxInitialSplits,
maxOutstandingSplits,
maxOutstandingSplitsSize,
hiveSplitLoader,
executor,
new CounterStat());
break;
case GROUPED_SCHEDULING:
splitSource = HiveSplitSource.bucketed(
session,
table.getDatabaseName(),
table.getTableName(),
layout.getCompactEffectivePredicate(),
maxInitialSplits,
maxOutstandingSplits,
new DataSize(32, MEGABYTE),
hiveSplitLoader,
executor,
new CounterStat());
break;
default:
throw new IllegalArgumentException("Unknown splitSchedulingStrategy: " + splitSchedulingStrategy);
}
hiveSplitLoader.start(splitSource);
return splitSource;
}
2.3.2.2 Producing the Stage Execution Plan
The above produced a StageExecutionPlan (a stage execution plan); its structure is:
private final PlanFragment fragment; //the current plan fragment
private final Map<PlanNodeId, SplitSource> splitSources; //the split mapping obtained from HiveSplitManager
private final List<StageExecutionPlan> subStages; //the child stage execution plans
private final Optional<List<String>> fieldNames; //the output field names
After planDistribution, the fragmented logical plan has been converted into a stage execution plan, and Presto schedules tasks per stage; next, SqlQueryScheduler builds the SqlStageExecution objects.
The constructor of SqlQueryScheduler:
List<SqlStageExecution> stages = createStages(
(fragmentId, tasks, noMoreExchangeLocations) -> updateQueryOutputLocations(queryStateMachine, rootBufferId, tasks, noMoreExchangeLocations),
new AtomicInteger(),
locationFactory,
plan.withBucketToPartition(Optional.of(new int[1])),
nodeScheduler,
remoteTaskFactory,
session,
splitBatchSize,
partitioningHandle -> partitioningCache.computeIfAbsent(partitioningHandle, handle -> nodePartitioningManager.getNodePartitioningMap(session, handle)),
nodePartitioningManager,
queryExecutor,
schedulerExecutor,
failureDetector,
nodeTaskMap,
stageSchedulers,
stageLinkages);
SqlStageExecution rootStage = stages.get(0);
2.3.2.3 Producing the Stage Executors
The createStages method of SqlQueryScheduler:
ImmutableList.Builder<SqlStageExecution> stages = ImmutableList.builder();
StageId stageId = new StageId(queryStateMachine.getQueryId(), nextStageId.getAndIncrement());
SqlStageExecution stage = new SqlStageExecution( //create the current SqlStageExecution
stageId,
locationFactory.createStageLocation(stageId),
plan.getFragment(),
remoteTaskFactory,
session,
summarizeTaskInfo,
nodeTaskMap,
queryExecutor,
failureDetector,
schedulerStats);
stages.add(stage);
...
...
//the steps that create the stage scheduler and placement policy are omitted here; see 2.4.3 for details
ImmutableSet.Builder<SqlStageExecution> childStagesBuilder = ImmutableSet.builder();
for (StageExecutionPlan subStagePlan : plan.getSubStages()) {
List<SqlStageExecution> subTree = createStages( //recursively create all child SqlStageExecutions
stage::addExchangeLocations,
nextStageId,
locationFactory,
subStagePlan.withBucketToPartition(bucketToPartition),
nodeScheduler,
remoteTaskFactory,
session,
splitBatchSize,
partitioningCache,
nodePartitioningManager,
queryExecutor,
schedulerExecutor,
failureDetector,
nodeTaskMap,
stageSchedulers,
stageLinkages);
stages.addAll(subTree);
SqlStageExecution childStage = subTree.get(0);
childStagesBuilder.add(childStage);
}
Set<SqlStageExecution> childStages = childStagesBuilder.build();
At this point all SqlStageExecutions have been created. A simplified view of SqlStageExecution:
private final StageStateMachine stateMachine; //the stage state machine
private final RemoteTaskFactory remoteTaskFactory; //factory for creating tasks
private final NodeTaskMap nodeTaskMap; //keeps the mapping between the tasks of the current stage and their nodes
private final Map<Node, Set<RemoteTask>> tasks = new ConcurrentHashMap<>();
private final AtomicInteger nextTaskId = new AtomicInteger();
private final Set<TaskId> allTasks = newConcurrentHashSet();
private final Set<TaskId> finishedTasks = newConcurrentHashSet();
private final Multimap<PlanNodeId, RemoteTask> sourceTasks = HashMultimap.create();
2.4 Scheduling the Distributed Execution Plan
2.4.1 Scheduling-Related Service Classes
First, the NodeScheduler class held by SqlQueryExecution, mentioned above.
Its main member:
InternalNodeManager nodeManager //fetches the list of live nodes, keeps it in a NodeMap, and refreshes it periodically (every 5 seconds by default)
Its main methods:
List<Node> selectNodes //selects a list of live nodes
NodeSelector createNodeSelector //provides a NodeSelector, which contains the task placement algorithms used by each stage
ResettableRandomizedIterator<Node> randomizedNodes //shuffles the given NodeMap
The InternalNodeManager interface is defined as:
public interface InternalNodeManager
{
Set<Node> getNodes(NodeState state); //get the nodes in the given state
Set<Node> getActiveConnectorNodes(ConnectorId connectorId); //get the nodes for the given connectorId
Node getCurrentNode(); //get the current node
Set<Node> getCoordinators(); //get the coordinators
AllNodes getAllNodes(); //get all nodes
void refreshNodes(); //refresh node information
}
//DiscoveryNodeManager is the implementation
The NodeSelector interface is defined as:
public interface NodeSelector
{
void lockDownNodes();
List<Node> allNodes(); //all nodes
Node selectCurrentNode(); //select the current node
List<Node> selectRandomNodes(int limit); //select up to limit random nodes
List<Node> selectRandomNodes(int limit, Set<Node> excludedNodes); //select up to limit random nodes, excluding the given nodes
SplitPlacementResult computeAssignments(Set<Split> splits, List<RemoteTask> existingTasks);
SplitPlacementResult computeAssignments(Set<Split> splits, List<RemoteTask> existingTasks, NodePartitionMap partitioning);
}
//SimpleNodeSelector and TopologyAwareNodeSelector are the implementations; Presto picks the NodeSelector according to the network topology configuration
//in the NodeScheduler constructor, any topology setting other than LEGACY is treated as being topology-aware; LEGACY is the legacy default that carries no topology information
this.useNetworkTopology = !config.getNetworkTopology().equals(NetworkTopologyType.LEGACY);
//createNodeSelector instantiates the NodeSelector
if (useNetworkTopology) {
//so whenever a real network topology is configured, the instantiated NodeSelector is a TopologyAwareNodeSelector
return new TopologyAwareNodeSelector(
nodeManager,
nodeTaskMap,
includeCoordinator,
nodeMap,
minCandidates,
maxSplitsPerNode,
maxPendingSplitsPerTask,
topologicalSplitCounters,
networkLocationSegmentNames,
networkLocationCache);
}
else {
return new SimpleNodeSelector(nodeManager, nodeTaskMap, includeCoordinator, nodeMap, minCandidates, maxSplitsPerNode, maxPendingSplitsPerTask);
}
The Node interface is defined as:
public interface Node
{
HostAddress getHostAndPort(); //host and port
URI getHttpUri(); //url
String getNodeIdentifier();
String getVersion(); //version
boolean isCoordinator(); //whether this node is a coordinator
}
The createNodeSelector flow:
public NodeSelector createNodeSelector(ConnectorId connectorId)
{
//uses Guava's memoizing Supplier as a cache
Supplier<NodeMap> nodeMap = Suppliers.memoizeWithExpiration(() -> {
ImmutableSetMultimap.Builder<HostAddress, Node> byHostAndPort = ImmutableSetMultimap.builder();
ImmutableSetMultimap.Builder<InetAddress, Node> byHost = ImmutableSetMultimap.builder();
ImmutableSetMultimap.Builder<NetworkLocation, Node> workersByNetworkPath = ImmutableSetMultimap.builder();
Set<Node> nodes;
if (connectorId != null) {
nodes = nodeManager.getActiveConnectorNodes(connectorId);
}
else {
nodes = nodeManager.getNodes(ACTIVE);
}
Set<String> coordinatorNodeIds = nodeManager.getCoordinators().stream()
.map(Node::getNodeIdentifier)
.collect(toImmutableSet());
for (Node node : nodes) {
if (useNetworkTopology && (includeCoordinator || !coordinatorNodeIds.contains(node.getNodeIdentifier()))) {
NetworkLocation location = networkLocationCache.get(node.getHostAndPort());
for (int i = 0; i <= location.getSegments().size(); i++) {
workersByNetworkPath.put(location.subLocation(0, i), node);
}
}
try {
byHostAndPort.put(node.getHostAndPort(), node);
InetAddress host = InetAddress.getByName(node.getHttpUri().getHost());
byHost.put(host, node);
}
catch (UnknownHostException e) {
// ignore
}
}
return new NodeMap(byHostAndPort.build(), byHost.build(), workersByNetworkPath.build(), coordinatorNodeIds);
}, 5, TimeUnit.SECONDS);
if (useNetworkTopology) {
return new TopologyAwareNodeSelector(
nodeManager,
nodeTaskMap,
includeCoordinator,
nodeMap,
minCandidates,
maxSplitsPerNode,
maxPendingSplitsPerTask,
topologicalSplitCounters,
networkLocationSegmentNames,
networkLocationCache);
}
else {
return new SimpleNodeSelector(nodeManager, nodeTaskMap, includeCoordinator, nodeMap, minCandidates, maxSplitsPerNode, maxPendingSplitsPerTask);
}
}
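The Suppliers.memoizeWithExpiration call above is plain Guava: the lambda that rebuilds the NodeMap runs at most once per 5-second window, and every caller within that window sees the same cached snapshot of the cluster. A tiny standalone example of the same API (assumes Guava on the classpath):

import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;

import java.util.concurrent.TimeUnit;

public class NodeMapCacheExample
{
    public static void main(String[] args) throws InterruptedException
    {
        // the expensive computation (building the NodeMap) runs at most once per 5 seconds
        Supplier<Long> cached = Suppliers.memoizeWithExpiration(
                () -> {
                    System.out.println("rebuilding node map...");
                    return System.nanoTime();
                },
                5, TimeUnit.SECONDS);

        System.out.println(cached.get());    // triggers the rebuild
        System.out.println(cached.get());    // served from cache, same value
        TimeUnit.SECONDS.sleep(6);
        System.out.println(cached.get());    // TTL expired, rebuilds again
    }
}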
2.4.2 Node Selection Strategies
The strategies for Single and Fixed stages are straightforward: both simply call selectRandomNodes.
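The idea behind selectRandomNodes is simply to shuffle the candidate nodes and take the first limit of them. A minimal sketch with strings standing in for Node (the overload shown above additionally honors an exclusion set):

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class RandomNodeSelection
{
    // shuffle the candidate nodes and take at most `limit` of them,
    // which is essentially what selectRandomNodes does
    static List<String> selectRandomNodes(List<String> activeNodes, int limit)
    {
        List<String> copy = new ArrayList<>(activeNodes);
        Collections.shuffle(copy);
        return copy.subList(0, Math.min(limit, copy.size()));
    }

    public static void main(String[] args)
    {
        List<String> nodes = List.of("worker-1", "worker-2", "worker-3", "worker-4");
        System.out.println(selectRandomNodes(nodes, 2));   // e.g. [worker-3, worker-1]
    }
}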
2.4.3 Creating the Stage Scheduler and Placement Policy
Continuing from the code omitted in 2.3.2.3:
Optional<int[]> bucketToPartition;
PartitioningHandle partitioningHandle = plan.getFragment().getPartitioning();
// create a different stage scheduler depending on the stage's partitioning type
if (partitioningHandle.equals(SOURCE_DISTRIBUTION)) {
// nodes are selected dynamically based on the constraints of the splits and the system load
Entry<PlanNodeId, SplitSource> entry = Iterables.getOnlyElement(plan.getSplitSources().entrySet());
PlanNodeId planNodeId = entry.getKey();
SplitSource splitSource = entry.getValue();
ConnectorId connectorId = splitSource.getConnectorId();
if (isInternalSystemConnector(connectorId)) {
connectorId = null;
}
//create a nodeSelector for picking execution nodes, backed by the nodeManager
NodeSelector nodeSelector = nodeScheduler.createNodeSelector(connectorId);
//dynamic split placement policy
SplitPlacementPolicy placementPolicy = new DynamicSplitPlacementPolicy(nodeSelector, stage::getAllTasks);
checkArgument(plan.getFragment().getPipelineExecutionStrategy() == UNGROUPED_EXECUTION);
//a source stage uses the simpleSourcePartitionedScheduler
stageSchedulers.put(stageId, simpleSourcePartitionedScheduler(stage, planNodeId, splitSource, placementPolicy, splitBatchSize));
bucketToPartition = Optional.of(new int[1]);
}
else if (partitioningHandle.equals(SCALED_WRITER_DISTRIBUTION)) {
bucketToPartition = Optional.of(new int[1]);
}
else {
// nodes are pre determined by the nodePartitionMap
NodePartitionMap nodePartitionMap = partitioningCache.apply(plan.getFragment().getPartitioning());
long nodeCount = nodePartitionMap.getPartitionToNode().values().stream().distinct().count();
OptionalInt concurrentLifespansPerTask = getConcurrentLifespansPerNode(session);
Map<PlanNodeId, SplitSource> splitSources = plan.getSplitSources();
//if a fixed stage has been assigned a SplitSource, use FixedSourcePartitionedScheduler, which creates its own FixedSplitPlacementPolicy internally
if (!splitSources.isEmpty()) {
List<PlanNodeId> schedulingOrder = plan.getFragment().getPartitionedSources();
List<ConnectorPartitionHandle> connectorPartitionHandles;
switch (plan.getFragment().getPipelineExecutionStrategy()) {
case GROUPED_EXECUTION:
connectorPartitionHandles = nodePartitioningManager.listPartitionHandles(session, partitioningHandle);
checkState(!ImmutableList.of(NOT_PARTITIONED).equals(connectorPartitionHandles));
break;
case UNGROUPED_EXECUTION:
connectorPartitionHandles = ImmutableList.of(NOT_PARTITIONED);
break;
default:
throw new UnsupportedOperationException();
}
stageSchedulers.put(stageId, new FixedSourcePartitionedScheduler(
stage,
splitSources,
plan.getFragment().getPipelineExecutionStrategy(),
schedulingOrder,
nodePartitionMap,
splitBatchSize,
concurrentLifespansPerTask.isPresent() ? OptionalInt.of(toIntExact(concurrentLifespansPerTask.getAsInt() * nodeCount)) : OptionalInt.empty(),
nodeScheduler.createNodeSelector(null),
connectorPartitionHandles));
bucketToPartition = Optional.of(nodePartitionMap.getBucketToPartition());
}
else {
//the live nodes
Map<Integer, Node> partitionToNode = nodePartitionMap.getPartitionToNode();
// todo this should asynchronously wait a standard timeout period before failing
checkCondition(!partitionToNode.isEmpty(), NO_NODES_AVAILABLE, "No worker nodes available");
//if a fixed stage has no SplitSource, use FixedCountScheduler
stageSchedulers.put(stageId, new FixedCountScheduler(stage, partitionToNode));
bucketToPartition = Optional.of(nodePartitionMap.getBucketToPartition());
}
}
2.4.4 The SqlQueryScheduler Starts Scheduling
scheduler.start() kicks off the SqlQueryScheduler's scheduling, which includes scheduling the tasks.
public void start()
{
if (started.compareAndSet(false, true)) {
executor.submit(this::schedule);
}
}
The method reference invokes schedule():
private void schedule()
{
try (SetThreadName ignored = new SetThreadName("Query-%s", queryStateMachine.getQueryId())) {
Set<StageId> completedStages = new HashSet<>();
ExecutionSchedule executionSchedule = executionPolicy.createExecutionSchedule(stages.values());
while (!executionSchedule.isFinished()) {
List<ListenableFuture<?>> blockedStages = new ArrayList<>();
for (SqlStageExecution stage : executionSchedule.getStagesToSchedule()) {
stage.beginScheduling();
// invoke each stage's scheduler to schedule its tasks
// perform some scheduling work
ScheduleResult result = stageSchedulers.get(stage.getStageId())
.schedule();
// modify parent and children based on the results of the scheduling
if (result.isFinished()) {
stage.schedulingComplete();
}
else if (!result.getBlocked().isDone()) {
blockedStages.add(result.getBlocked());
}
stageLinkages.get(stage.getStageId())
.processScheduleResults(stage.getState(), result.getNewTasks());
schedulerStats.getSplitsScheduledPerIteration().add(result.getSplitsScheduled());
if (result.getBlockedReason().isPresent()) {
switch (result.getBlockedReason().get()) {
case WRITER_SCALING:
// no-op
break;
case WAITING_FOR_SOURCE:
schedulerStats.getWaitingForSource().update(1);
break;
case SPLIT_QUEUES_FULL:
schedulerStats.getSplitQueuesFull().update(1);
break;
case MIXED_SPLIT_QUEUES_FULL_AND_WAITING_FOR_SOURCE:
case NO_ACTIVE_DRIVER_GROUP:
break;
default:
throw new UnsupportedOperationException("Unknown blocked reason: " + result.getBlockedReason().get());
}
}
}
// make sure to update stage linkage at least once per loop to catch async state changes (e.g., partial cancel)
for (SqlStageExecution stage : stages.values()) {
if (!completedStages.contains(stage.getStageId()) && stage.getState().isDone()) {
stageLinkages.get(stage.getStageId())
.processScheduleResults(stage.getState(), ImmutableSet.of());
completedStages.add(stage.getStageId());
}
}
// wait for a state change and then schedule again
if (!blockedStages.isEmpty()) {
try (TimeStat.BlockTimer timer = schedulerStats.getSleepTime().time()) {
tryGetFutureValue(whenAnyComplete(blockedStages), 1, SECONDS);
}
for (ListenableFuture<?> blockedStage : blockedStages) {
blockedStage.cancel(true);
}
}
}
for (SqlStageExecution stage : stages.values()) {
StageState state = stage.getState();
if (state != SCHEDULED && state != RUNNING && !state.isDone()) {
throw new PrestoException(GENERIC_INTERNAL_ERROR, format("Scheduling is complete, but stage %s is in state %s", stage.getStageId(), state));
}
}
}
}
Not yet complete; to be continued…
If you spot any mistakes, please point them out so we can improve together~
Updated every evening~
If you repost this, please include a link to the original; original writing takes effort, thanks~