A Summary of Presto Source Code Analysis: A SQL's Fantastic Journey (Part 1)

This Presto write-up is split into two parts because of its length.

1. Environment Setup

You will need Hadoop, Hive, MySQL, SSH, and a local Presto debug environment.

Recommended versions: Hadoop 2.2.0, Hive 1.2.1, MySQL 5.7, openssh-server & openssh-client, and the latest Presto.

To set up a local Presto debug environment, see "Presto in IDEA".

2.查詢?nèi)肟?amp;流程

Every query first hits StatementResource, whose resource path is @Path("/v1/statement"): the client POSTs the SQL text there and then polls the nextUri returned in each response.

Query query = Query.create(
                sessionContext,
                statement,         // the actual SQL text
                queryManager,
                sessionPropertyManager,
                exchangeClient,
                responseExecutor,
                timeoutExecutor,
                blockEncodingSerde);
        queries.put(query.getQueryId(), query);  // create the Query and track it by queryId
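For orientation, here is a minimal client sketch (assumptions: coordinator at localhost:8080, user "test"; JSON parsing and error handling omitted) showing how this endpoint is driven: POST the SQL once, then poll the nextUri from each JSON response.

import java.io.*;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class StatementClientSketch
{
    public static void main(String[] args) throws IOException
    {
        HttpURLConnection post = (HttpURLConnection) new URL("http://localhost:8080/v1/statement").openConnection();
        post.setRequestMethod("POST");
        post.setRequestProperty("X-Presto-User", "test");  // Presto requires a user header
        post.setDoOutput(true);
        try (OutputStream out = post.getOutputStream()) {
            out.write("SELECT 1".getBytes(StandardCharsets.UTF_8));
        }
        // the JSON reply carries "id", "stats" and a "nextUri"; GETting the
        // nextUri repeatedly advances the query and eventually returns "data"
        System.out.println(read(post));
    }

    private static String read(HttpURLConnection conn) throws IOException
    {
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
            StringBuilder sb = new StringBuilder();
            for (String line; (line = reader.readLine()) != null; ) {
                sb.append(line).append('\n');
            }
            return sb.toString();
        }
    }
}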

Inside Query.create, the following runs:

Query result = new Query(sessionContext, query, queryManager, sessionPropertyManager, exchangeClient, dataProcessorExecutor, timeoutExecutor, blockEncodingSerde);

The queryManager inside Query then creates the query:

QueryInfo queryInfo = queryManager.createQuery(sessionContext, query);  // sessionContext carries the user's session information; query is the user's SQL

2.1 Lexical & Syntax Analysis: Generating the AST

queryManager is an interface; so far SqlQueryManager is its only implementation. Its createQuery method:

private final ConcurrentMap<QueryId, QueryExecution> queries = new ConcurrentHashMap<>();
// QueryQueueManager is an interface; the SQL-side implementation is SqlQueryQueueManager
private final QueryQueueManager queueManager;

// the core logic: lexical and syntax analysis, producing the AST
Statement wrappedStatement = sqlParser.createStatement(query, createParsingOptions(session));
statement = unwrapExecuteStatement(wrappedStatement, sqlParser, session);
List<Expression> parameters = wrappedStatement instanceof Execute ? ((Execute) wrappedStatement).getParameters() : emptyList();

// validate the parameters
validateParameters(statement, parameters);
// look up the query execution factory for this statement type
QueryExecutionFactory<?> queryExecutionFactory = executionFactories.get(statement.getClass());
// the factory creates the QueryExecution
queryExecution = queryExecutionFactory.createQueryExecution(queryId, query, session, statement, parameters);
// map the queryId to its QueryExecution
queries.put(queryId, queryExecution);
// submit the QueryExecution to the queueManager
queueManager.submit(statement, queryExecution, queryExecutor);
// return the query info
return queryInfo;

SqlQueryQueueManager's submit method:

List<QueryQueue> queues;
        try {
            // select execution queues according to the configured rules
            queues = selectQueues(queryExecution.getSession(), executor);
        }
        catch (PrestoException e) {
            queryExecution.fail(e);
            return;
        }
        for (QueryQueue queue : queues) {
            if (!queue.reserve(queryExecution)) {
                queryExecution.fail(new PrestoException(QUERY_QUEUE_FULL, "Too many queued queries"));
                return;
            }
        }
        // if every queue admitted the query, enqueue it into the first queue
        queues.get(0).enqueue(createQueuedExecution(queryExecution, queues.subList(1, queues.size()), executor));

    // select execution queues according to the configured rules
    private List<QueryQueue> selectQueues(Session session, Executor executor)
    {
        for (QueryQueueRule rule : rules) {
            Optional<List<QueryQueueDefinition>> queues = rule.match(session.toSessionRepresentation());
            if (queues.isPresent()) {
               // get or create the query queues
                return getOrCreateQueues(session, executor, queues.get());
            }
        }
        throw new PrestoException(QUERY_REJECTED, "Query did not match any queuing rule");
    }

    // get or create the query queues
    private List<QueryQueue> getOrCreateQueues(Session session, Executor executor, List<QueryQueueDefinition> definitions)
    {
        ImmutableList.Builder<QueryQueue> queues = ImmutableList.builder();
        for (QueryQueueDefinition definition : definitions) {
            String expandedName = definition.getExpandedTemplate(session);
            QueueKey key = new QueueKey(definition, expandedName);
            if (!queryQueues.containsKey(key)) {
                QueryQueue queue = new QueryQueue(executor, definition.getMaxQueued(), definition.getMaxConcurrent());
                if (queryQueues.putIfAbsent(key, queue) == null) {
                    // Export the mbean, after checking for races
                    String objectName = ObjectNames.builder(QueryQueue.class, definition.getTemplate()).withProperty("expansion", expandedName).build();
                    mbeanExporter.export(objectName, queue);
                }
            }
            queues.add(queryQueues.get(key));
        }
        return queues.build();
    }

QueryQueue(Executor queryExecutor, int maxQueuedQueries, int maxConcurrentQueries)
    {
        requireNonNull(queryExecutor, "queryExecutor is null");
        checkArgument(maxQueuedQueries > 0, "maxQueuedQueries must be greater than zero");
        checkArgument(maxConcurrentQueries > 0, "maxConcurrentQueries must be greater than zero");

        int permits = maxQueuedQueries + maxConcurrentQueries;
        // Check for overflow
        checkArgument(permits > 0, "maxQueuedQueries + maxConcurrentQueries must be less than or equal to %s", Integer.MAX_VALUE);

        this.queuePermits = new AtomicInteger(permits);
        this.asyncSemaphore = new AsyncSemaphore<>(maxConcurrentQueries,
                queryExecutor,
                queueEntry -> {
                    QueuedExecution queuedExecution = queueEntry.dequeue();
                    if (queuedExecution != null) {
                        queuedExecution.start();
                        return queuedExecution.getCompletionFuture();
                    }
                    return Futures.immediateFuture(null);
                });
    }
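The queue.reserve(...) call used by submit above is gated by exactly these permits. A sketch of the idea (an approximation; the real QueryQueue also registers a completion listener that returns the permit when the query finishes):

// take one of the maxQueuedQueries + maxConcurrentQueries permits, or refuse admission
public boolean reserve(QueryExecution queryExecution)
{
    if (queuePermits.decrementAndGet() < 0) {
        // no permit left: undo the decrement and reject; the caller then fails
        // the query with QUERY_QUEUE_FULL ("Too many queued queries")
        queuePermits.incrementAndGet();
        return false;
    }
    return true;
}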

public void start()
    {
        // Only execute if the query is not already completed (e.g. cancelled)
        if (listenableFuture.isDone()) {
            return;
        }
        if (nextQueues.isEmpty()) {
            executor.execute(() -> {
                try (SetThreadName ignored = new SetThreadName("Query-%s", queryExecution.getQueryId())) {
                    // turns the statement into an analysis and plan (see 2.2)
                    queryExecution.start();
                }
            });
        }
        else {
            nextQueues.get(0).enqueue(new QueuedExecution(queryExecution, nextQueues.subList(1, nextQueues.size()), executor, listenableFuture));
        }
    }

2.2 Semantic Analysis & Logical Plan Generation

2.2.1 Semantic Analysis

First, a look at what makes up the SqlQueryExecution class:

private final QueryStateMachine stateMachine;

private final Statement statement;                               // the AST produced by lexical/syntax analysis
private final Metadata metadata;
private final AccessControl accessControl;
private final SqlParser sqlParser;                               // the SQL parser
private final SplitManager splitManager;
private final NodePartitioningManager nodePartitioningManager;
private final NodeScheduler nodeScheduler; // the core module that assigns tasks to nodes; covered in detail under stage scheduling
private final List<PlanOptimizer> planOptimizers;
private final RemoteTaskFactory remoteTaskFactory;
private final LocationFactory locationFactory;
private final int scheduleSplitBatchSize;
private final ExecutorService queryExecutor;
private final ScheduledExecutorService schedulerExecutor;
private final FailureDetector failureDetector;

private final QueryExplainer queryExplainer;
private final PlanFlattener planFlattener;
private final CostCalculator costCalculator;
private final AtomicReference<SqlQueryScheduler> queryScheduler = new AtomicReference<>();
private final AtomicReference<Plan> queryPlan = new AtomicReference<>();
private final NodeTaskMap nodeTaskMap;
private final ExecutionPolicy executionPolicy;
private final List<Expression> parameters;
private final SplitSchedulerStats schedulerStats;

SqlQueryExecution's start method:

PlanRoot plan = analyzeQuery();  // generate the logical plan

// the call stack is
analyzeQuery() -> doAnalyzeQuery()
  
doAnalyzeQuery()
{
    // create the semantic analyzer
    Analyzer analyzer = new Analyzer(stateMachine.getSession(), metadata, sqlParser, accessControl, Optional.of(queryExplainer), parameters);
    // run semantic analysis
    Analysis analysis = analyzer.analyze(statement);
    // create the logical planner
    LogicalPlanner logicalPlanner = new LogicalPlanner(stateMachine.getSession(), planOptimizers, idAllocator, metadata, sqlParser, costCalculator);
    // the logical planner builds the logical plan, which also covers plan optimization
    Plan plan = logicalPlanner.plan(analysis);
    queryPlan.set(plan);
    // fragment the logical plan, in preparation for the distributed plan
    SubPlan fragmentedPlan = PlanFragmenter.createSubPlans(stateMachine.getSession(), metadata, nodePartitioningManager, plan, false);
  
    return new PlanRoot(fragmentedPlan, !explainAnalyze, extractConnectors(analysis));
}

Analyzer's analyze method:

// SQL rewriting
Statement rewrittenStatement = StatementRewrite.rewrite(session, metadata, sqlParser, queryExplainer, statement, parameters, accessControl);
// initialize the Analysis
Analysis analysis = new Analysis(rewrittenStatement, parameters, isDescribe);
// create the statement analyzer
StatementAnalyzer analyzer = new StatementAnalyzer(analysis, metadata, sqlParser, accessControl, session);
// invoke the statement analyzer
analyzer.analyze(rewrittenStatement, Optional.empty());

Concretely, analyze walks the AST and analyzes each node type, mainly fetching metadata and validating it; the sketch below illustrates the visitor mechanism this is built on.
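A hedged sketch of the same visitor mechanism, assuming the presto-parser artifact on the classpath (exact signatures vary across Presto versions; TablePrinter is a made-up class, not Presto code):

import com.facebook.presto.sql.parser.ParsingOptions;
import com.facebook.presto.sql.parser.SqlParser;
import com.facebook.presto.sql.tree.DefaultTraversalVisitor;
import com.facebook.presto.sql.tree.Statement;
import com.facebook.presto.sql.tree.Table;

public class TablePrinter
        extends DefaultTraversalVisitor<Void, Void>
{
    @Override
    protected Void visitTable(Table node, Void context)
    {
        // StatementAnalyzer resolves the table's metadata here; we just print its name
        System.out.println("table: " + node.getName());
        return null;
    }

    public static void main(String[] args)
    {
        Statement statement = new SqlParser().createStatement(
                "SELECT a + 1 FROM b WHERE c > 0", new ParsingOptions());
        new TablePrinter().process(statement, null);
    }
}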

2.2.2 Generating the Logical Plan

LogicalPlanner's plan method:

PlanNode root = planStatement(analysis, analysis.getStatement());
// check that the plan is valid
PlanSanityChecker.validateIntermediatePlan(root, session, metadata, sqlParser, symbolAllocator.getTypes());
// optimize the generated logical plan
root = optimizer.optimize(root, session, symbolAllocator.getTypes(), symbolAllocator, idAllocator);

LogicalPlanner's planStatement method:

// for an ordinary SQL statement, only the following runs
createOutputPlan(planStatementWithoutOutput(analysis, statement), analysis);

LogicalPlanner's planStatementWithoutOutput method:

private RelationPlan planStatementWithoutOutput(Analysis analysis, Statement statement)
    {
        if (statement instanceof CreateTableAsSelect) {
            if (analysis.isCreateTableAsSelectNoOp()) {
                throw new PrestoException(NOT_SUPPORTED, "CREATE TABLE IF NOT EXISTS is not supported in this context " + statement.getClass().getSimpleName());
            }
            return createTableCreationPlan(analysis, ((CreateTableAsSelect) statement).getQuery());
        }
        else if (statement instanceof Insert) {
            checkState(analysis.getInsert().isPresent(), "Insert handle is missing");
            return createInsertPlan(analysis, (Insert) statement);
        }
        else if (statement instanceof Delete) {
            return createDeletePlan(analysis, (Delete) statement);
        }
        else if (statement instanceof Query) {
            return createRelationPlan(analysis, (Query) statement);
        }
        else if (statement instanceof Explain && ((Explain) statement).isAnalyze()) {
            return createExplainAnalyzePlan(analysis, (Explain) statement);
        }
        else {
            throw new PrestoException(NOT_SUPPORTED, "Unsupported statement type " + statement.getClass().getSimpleName());
        }
    }

LogicalPlanner's createRelationPlan method:

return new RelationPlanner(analysis, symbolAllocator, idAllocator, buildLambdaDeclarationToSymbolMap(analysis, symbolAllocator), metadata, session)
        .process(query, null);

Concretely, RelationPlanner walks the AST and generates the corresponding nodes of the logical plan.

Common logical-plan nodes, and the visit operations that produce them, are listed below:

AggregationNode            aggregation node; comes in FINAL, PARTIAL, and SINGLE flavors, i.e. final, partial, and single-node aggregation. Before plan optimization every aggregation is SINGLE; the optimizer splits it into a partial plus a final aggregation, much like a map-side combine followed by the final reduce in a MapReduce job

DeleteNode                 node for Delete operations
ExchangeNode               exchanges data between stages in the logical plan
FilterNode                 applies filter predicates
JoinNode                   performs joins
LimitNode                  applies limit
MarkDistinctNode           handles count distinct
OutputNode                 the output node

ProjectNode                maps the columns emitted by the node below onto the node above; e.g. for select a + 1 from b it maps TableScanNode's column a, plus 1, to the OutputNode

RemoteSourceNode           like ExchangeNode, but exchanges data between stages in the distributed plan
SampleNode                 node for sampling functions
RowNumberNode              handles the window function row_number
SortNode                   sorting node
TableScanNode              reads table data
TableWriterNode            writes table data
TopNNode                   order by ... limit uses the more efficient TopNNode
UnionNode                  handles Union operations
WindowNode                 handles window functions
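For instance, before optimization a query like select a + 1 from b where c > 0 plans roughly as OutputNode -> ProjectNode(a + 1) -> FilterNode(c > 0) -> TableScanNode(b), reading from the root down.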

RelationPlanner's visit operations:

visitTable                 produces a TableScanNode
visitAliasedRelation       handles a relation that carries an alias
visitSampledRelation       adds a SampleNode, mainly for sampling functions

visitJoin                  builds different node structures depending on the join type; typically it plans the left and right sides, adds a ProjectNode on each, connects them with a JoinNode, and puts a FilterNode on top holding the join condition

visitQuery                 hands the Query to QueryPlanner and returns the resulting plan
visitQuerySpecification    hands the QueryBody to QueryPlanner and returns the resulting plan

QueryPlanner's plan operations (a QueryBody is a complete SQL query and can be nested, e.g. select a from (QueryBody) b; the inner QueryBody is usually planned further as an AliasedRelation)

Comparing Query and QuerySpecification: QuerySpecification represents a complete, concrete QueryBody, while a Query contains a QueryBody. QueryBody is an abstract class, and QuerySpecification extends it.

plan(Query query)                 first extracts the queryBody from the Query, then hands it to RelationPlanner, whose visitQuerySpecification in turn calls QueryPlanner's plan method
plan(QuerySpecification query)    generates the plan nodes for all components of the QueryBody

Below is the core flow of plan(QuerySpecification query):

PlanBuilder builder = planFrom(node);                      // builder's root is the generated node tree
RelationPlan fromRelationPlan = builder.getRelationPlan(); // produces the TableScanNode
builder = filter(builder, analysis.getWhere(node), node);  // produces the FilterNode for WHERE
builder = aggregate(builder, node);                        // produces the AggregationNode
builder = filter(builder, analysis.getHaving(node), node); // produces a FilterNode if there is a HAVING clause
builder = window(builder, node);                           // produces the WindowNode
List<Expression> outputs = analysis.getOutputExpressions(node);
builder = handleSubqueries(builder, node, outputs);
if (node.getOrderBy().isPresent() && !SystemSessionProperties.isLegacyOrderByEnabled(session)) {
    if (analysis.getGroupingSets(node).isEmpty()) {
        builder = project(builder, outputs, fromRelationPlan);
        outputs = toSymbolReferences(computeOutputs(builder, outputs));
        builder = planBuilderFor(builder, analysis.getScope(node.getOrderBy().get()));
    }
    else {
        List<Expression> orderByAggregates = analysis.getOrderByAggregates(node.getOrderBy().get());
        builder = project(builder, Iterables.concat(outputs, orderByAggregates));
        outputs = toSymbolReferences(computeOutputs(builder, outputs));
        List<Expression> complexOrderByAggregatesToRemap = orderByAggregates.stream()
                .filter(expression -> !analysis.isColumnReference(expression))
                .collect(toImmutableList());
        builder = planBuilderFor(builder, analysis.getScope(node.getOrderBy().get()), complexOrderByAggregatesToRemap);
    }
    builder = window(builder, node.getOrderBy().get());
}
List<Expression> orderBy = analysis.getOrderByExpressions(node);
builder = handleSubqueries(builder, node, orderBy);
builder = project(builder, Iterables.concat(orderBy, outputs));
builder = distinct(builder, node);
builder = sort(builder, node);
builder = project(builder, outputs);
builder = limit(builder, node);
return new RelationPlan(
        builder.getRoot(),
        analysis.getScope(node),
        computeOutputs(builder, outputs));

2.2.3 Logical Plan Optimization

In LogicalPlanner's plan method:

root = optimizer.optimize(root, session, symbolAllocator.getTypes(), symbolAllocator, idAllocator);

Concretely, the optimizer invokes each concrete optimizer implementation in turn against the node tree (the logical plan) generated in the previous step.
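A simplified sketch of that loop (the names follow the real LogicalPlanner, but the signature is abbreviated here):

private PlanNode optimize(PlanNode root, Session session)
{
    for (PlanOptimizer optimizer : planOptimizers) {
        // each optimizer rewrites the whole plan tree and returns the new root
        root = optimizer.optimize(root, session, symbolAllocator.getTypes(), symbolAllocator, idAllocator);
        requireNonNull(root, "optimizer returned a null plan");
    }
    return root;
}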

The concrete optimizers:

AddExchanges                   // shapes the distributed plan, e.g. splitting into partial and final aggregations
AddLocalExchanges
BeginTableWrite
CanonicalizeExpressions        // canonicalizes expressions in the plan, e.g. rewriting is not null as not(is null) and if into case when
CheckSubqueryNodesAreRewritten
CountConstantOptimizer         // rewrites count(a) as count(*) for better compatibility across data sources
DesugaringOptimizer
DetermineJoinDistributionType
EliminateCrossJoins
EmptyDeleteOptimizer
HashGenerationOptimizer     // computes hashes ahead of time
ImplementIntersectAndExceptAsUnion
IndexJoinOptimizer          // rewrites a join into an index join, using the joined table's index for speed
IterativeOptimizer
LimitPushDown               // pushes limit down, shrinking the data volume of lower nodes
MetadataDeleteOptimizer
MetadataQueryOptimizer      // rewrites aggregations over a table's partition columns into metadata-only queries, cutting down table reads
OptimizeMixedDistinctAggregations
PickLayout
PredicatePushDown           // pushes predicates (filters) down, shrinking the data volume of lower nodes
ProjectionPushDown          // pushes ProjectNode down, shrinking the data volume of Union nodes
PruneUnreferencedOutputs    // drops ProjectNode columns absent from the final output, reducing computation
PruneRedundantProjections   // drops redundant ProjectNodes: a layer that is a pure identity mapping between its neighbors is removed
PushTableWriteThroughUnion
RemoveUnreferencedScalarLateralNodes
SetFlatteningOptimizer         // merges Union statements that can be merged
SimplifyExpressions            // simplifies and optimizes the expressions used in the plan
TransformCorrelatedNoAggregationSubqueryToJoin
TransformCorrelatedScalarAggregationToJoin
TransformCorrelatedSingleRowSubqueryToProject
TransformQuantifiedComparisonApplyToLateralJoin
TransformUncorrelatedInPredicateSubqueryToSemiJoin
TransformUncorrelatedLateralToJoin
UnaliasSymbolReferences        // removes meaningless ProjectNode mappings: a column referenced directly, with no expression, is mapped straight to the parent node
WindowFilterPushDown

2.3 Generating the Distributed Plan

2.3.1 Fragmenting the Logical Plan

This phase splits the logical plan generated above into multiple stages. A stage belongs to one of four types:

Source, Fixed, Single, Coordinator_only

Source: typically TableScanNode, ProjectNode, and FilterNode; usually the most downstream stage, the one that actually fetches data

Fixed: usually follows Source; spreads the data fetched by the Source stage across multiple nodes for processing, similar to a map-side reduce, covering partial aggregation, partial joins, and partial data writes

Single: usually follows Fixed; runs on a single machine, gathering all results, performing the final aggregation and the global sort, and sending the result to the Coordinator

Coordinator_only: runs only on the Coordinator
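For example, select c, count(*) from t group by c would roughly fragment into a Source stage (table scan plus partial aggregation), a Fixed stage (final aggregation, partitioned on c), and a Single stage that gathers the output for the Coordinator.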

SqlQueryExecution's doAnalyzeQuery method:

SubPlan fragmentedPlan = PlanFragmenter.createSubPlans(stateMachine.getSession(), metadata, nodePartitioningManager, plan, false);

// the structure of SubPlan
private final PlanFragment fragment;
private final List<SubPlan> children;

As you can see, SubPlan is a tree: each node holds its own fragment plus a list of child SubPlans, and this is how the logical plan gets cut into several fragments.
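A minimal illustrative helper (not Presto code) that walks the SubPlan tree and collects every fragment, parent before children:

static void collectFragments(SubPlan plan, List<PlanFragment> out)
{
    out.add(plan.getFragment());
    for (SubPlan child : plan.getChildren()) {
        collectFragments(child, out);  // recurse into the child fragments
    }
}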

2.3.2 Generating the Distributed Plan

2.3.2.1 Obtaining the SplitSources

SqlQueryExecution's start method:

// generate the fragmented plan
PlanRoot plan = analyzeQuery();
// generate the distributed plan
planDistribution(plan);

SqlQueryExecution's planDistribution method:

// obtain the stage execution plan
StageExecutionPlan outputStageExecutionPlan = distributedPlanner.plan(plan.getRoot(), stateMachine.getSession());
// create the SQL query scheduler
SqlQueryScheduler scheduler = new SqlQueryScheduler(
                // the query state machine / state listener
                stateMachine,
                locationFactory,
                outputStageExecutionPlan,
                nodePartitioningManager,
                // the core module that assigns tasks to nodes
                nodeScheduler,
                remoteTaskFactory,
                stateMachine.getSession(),
                plan.isSummarizeTaskInfos(),
                scheduleSplitBatchSize,
                queryExecutor,
                schedulerExecutor,
                failureDetector,
                rootOutputBuffers,
                // tracks the task-to-node assignments of each stage
                nodeTaskMap,
                executionPolicy,
                schedulerStats);

DistributedExecutionPlanner's plan method

The call stack is plan -> doPlan

private StageExecutionPlan doPlan(SubPlan root, Session session, ImmutableList.Builder<SplitSource> allSplitSources)
{
PlanFragment currentFragment = root.getFragment();
// visitor-pattern traversal of the fragment: each TableScanNode gets a splitManager.getSplits() call to obtain its splits. The implementation here is HiveSplitManager, which internally calls HiveSplitLoader.start(); details below
// this seems to imply that a stage can contain only one TableScan???
Map<PlanNodeId, SplitSource> splitSources = currentFragment.getRoot().accept(new Visitor(session, currentFragment.getPipelineExecutionStrategy(), allSplitSources), null);
ImmutableList.Builder<StageExecutionPlan> dependencies = ImmutableList.builder();
for (SubPlan childPlan : root.getChildren()) {
    dependencies.add(doPlan(childPlan, session, allSplitSources)); // recursive call: converts every child sub-plan into a stage execution plan carrying the same hierarchy
}
return new StageExecutionPlan(
        currentFragment,
        splitSources,
        dependencies.build());
}

splitManager.getSplits() is invoked on each TableScanNode to obtain splits, and the results are stored in Map<PlanNodeId, SplitSource> splitSources (in effect, the SplitSource for each plan node):

public Map<PlanNodeId, SplitSource> visitTableScan(TableScanNode node, Void context)
{
    // get dataSource for table
    SplitSource splitSource = splitManager.getSplits(
            session,
            node.getLayout().get(),
            pipelineExecutionStrategy == GROUPED_EXECUTION ? GROUPED_SCHEDULING : UNGROUPED_SCHEDULING);
    splitSources.add(splitSource);
    return ImmutableMap.of(node.getId(), splitSource);
}

HiveSplitManager implements ConnectorSplitManager (a Presto SPI interface):

public ConnectorSplitSource getSplits(ConnectorTransactionHandle transaction, ConnectorSession session, ConnectorTableLayoutHandle layoutHandle, SplitSchedulingStrategy splitSchedulingStrategy)
{
    HiveTableLayoutHandle layout = (HiveTableLayoutHandle) layoutHandle;
    SchemaTableName tableName = layout.getSchemaTableName();

    // get table metadata
    SemiTransactionalHiveMetastore metastore = metastoreProvider.apply((HiveTransactionHandle) transaction);
    Table table = metastore.getTable(tableName.getSchemaName(), tableName.getTableName())
            .orElseThrow(() -> new TableNotFoundException(tableName));

    // verify table is not marked as non-readable
    String tableNotReadable = table.getParameters().get(OBJECT_NOT_READABLE);
    if (!isNullOrEmpty(tableNotReadable)) {
        throw new HiveNotReadableException(tableName, Optional.empty(), tableNotReadable);
    }

    // get the Hive partitions
    List<HivePartition> partitions = layout.getPartitions()
            .orElseThrow(() -> new PrestoException(GENERIC_INTERNAL_ERROR, "Layout does not contain partitions"));

    // short circuit if we don't have any partitions
    HivePartition partition = Iterables.getFirst(partitions, null);
    if (partition == null) {
        return new FixedSplitSource(ImmutableList.of());
    }

    // get buckets from first partition (arbitrary)
    List<HiveBucket> buckets = partition.getBuckets();

    // validate bucket bucketed execution
    Optional<HiveBucketHandle> bucketHandle = layout.getBucketHandle();
    if ((splitSchedulingStrategy == GROUPED_SCHEDULING) && !bucketHandle.isPresent()) {
        throw new PrestoException(GENERIC_INTERNAL_ERROR, "SchedulingPolicy is bucketed, but BucketHandle is not present");
    }

    // sort partitions
    partitions = Ordering.natural().onResultOf(HivePartition::getPartitionId).reverse().sortedCopy(partitions);

    Iterable<HivePartitionMetadata> hivePartitions = getPartitionMetadata(metastore, table, tableName, partitions, bucketHandle.map(HiveBucketHandle::toBucketProperty));

    HiveSplitLoader hiveSplitLoader = new BackgroundHiveSplitLoader(
            table,
            hivePartitions,
            layout.getCompactEffectivePredicate(),
            createBucketSplitInfo(bucketHandle, buckets),
            session,
            hdfsEnvironment,
            namenodeStats,
            directoryLister,
            executor,
            splitLoaderConcurrency,
            recursiveDfsWalkerEnabled);

    HiveSplitSource splitSource;
    switch (splitSchedulingStrategy) {
        case UNGROUPED_SCHEDULING:
            splitSource = HiveSplitSource.allAtOnce(
                    session,
                    table.getDatabaseName(),
                    table.getTableName(),
                    layout.getCompactEffectivePredicate(),
                    maxInitialSplits,
                    maxOutstandingSplits,
                    maxOutstandingSplitsSize,
                    hiveSplitLoader,
                    executor,
                    new CounterStat());
            break;
        case GROUPED_SCHEDULING:
            splitSource = HiveSplitSource.bucketed(
                    session,
                    table.getDatabaseName(),
                    table.getTableName(),
                    layout.getCompactEffectivePredicate(),
                    maxInitialSplits,
                    maxOutstandingSplits,
                    new DataSize(32, MEGABYTE),
                    hiveSplitLoader,
                    executor,
                    new CounterStat());
            break;
        default:
            throw new IllegalArgumentException("Unknown splitSchedulingStrategy: " + splitSchedulingStrategy);
    }
    hiveSplitLoader.start(splitSource);

    return splitSource;
}

2.3.2.2 Producing the Stage Execution Plan

The above produced a StageExecutionPlan (stage execution plan). Its structure:

private final PlanFragment fragment;                       // the current plan fragment
private final Map<PlanNodeId, SplitSource> splitSources;   // the split mapping obtained from HiveSplitManager
private final List<StageExecutionPlan> subStages;          // the child stage execution plans
private final Optional<List<String>> fieldNames;           // the field names

After planDistribution, the fragmented logical plan has become a stage execution plan. Presto schedules tasks on a per-stage basis, so SqlQueryScheduler then goes on to build the SqlStage executors.

SqlQueryScheduler's constructor:

List<SqlStageExecution> stages = createStages(
                (fragmentId, tasks, noMoreExchangeLocations) -> updateQueryOutputLocations(queryStateMachine, rootBufferId, tasks, noMoreExchangeLocations),
                new AtomicInteger(),
                locationFactory,
                plan.withBucketToPartition(Optional.of(new int[1])),
                nodeScheduler,
                remoteTaskFactory,
                session,
                splitBatchSize,
                partitioningHandle -> partitioningCache.computeIfAbsent(partitioningHandle, handle -> nodePartitioningManager.getNodePartitioningMap(session, handle)),
                nodePartitioningManager,
                queryExecutor,
                schedulerExecutor,
                failureDetector,
                nodeTaskMap,
                stageSchedulers,
                stageLinkages);

SqlStageExecution rootStage = stages.get(0);

2.3.2.3 Producing the Stage Executors

SqlQueryScheduler's createStages method:

ImmutableList.Builder<SqlStageExecution> stages = ImmutableList.builder();

StageId stageId = new StageId(queryStateMachine.getQueryId(), nextStageId.getAndIncrement());
SqlStageExecution stage = new SqlStageExecution(  // create the current SqlStageExecution
        stageId,
        locationFactory.createStageLocation(stageId),
        plan.getFragment(),
        remoteTaskFactory,
        session,
        summarizeTaskInfo,
        nodeTaskMap,
        queryExecutor,
        failureDetector,
        schedulerStats);
stages.add(stage);

...
...
// the steps creating the stage scheduler and the placement policy are omitted here; see 2.4.3 for details

ImmutableSet.Builder<SqlStageExecution> childStagesBuilder = ImmutableSet.builder();
        for (StageExecutionPlan subStagePlan : plan.getSubStages()) {
            List<SqlStageExecution> subTree = createStages( // recursively create all child SqlStageExecutions
                    stage::addExchangeLocations,
                    nextStageId,
                    locationFactory,
                    subStagePlan.withBucketToPartition(bucketToPartition),
                    nodeScheduler,
                    remoteTaskFactory,
                    session,
                    splitBatchSize,
                    partitioningCache,
                    nodePartitioningManager,
                    queryExecutor,
                    schedulerExecutor,
                    failureDetector,
                    nodeTaskMap,
                    stageSchedulers,
                    stageLinkages);
            stages.addAll(subTree);

            SqlStageExecution childStage = subTree.get(0);
            childStagesBuilder.add(childStage);
        }
Set<SqlStageExecution> childStages = childStagesBuilder.build();

At this point all SqlStageExecutions have been created. A simplified view of SqlStageExecution:

private final StageStateMachine stateMachine;       // the stage state machine / state listener
private final RemoteTaskFactory remoteTaskFactory;  // factory class for creating tasks
private final NodeTaskMap nodeTaskMap;              // tracks the task-to-node assignments of the current stage
private final Map<Node, Set<RemoteTask>> tasks = new ConcurrentHashMap<>();
private final AtomicInteger nextTaskId = new AtomicInteger();
private final Set<TaskId> allTasks = newConcurrentHashSet();
private final Set<TaskId> finishedTasks = newConcurrentHashSet();
private final Multimap<PlanNodeId, RemoteTask> sourceTasks = HashMultimap.create();

2.4 Scheduling the Distributed Plan

2.4.1 Scheduling-related Service Classes

First, the NodeScheduler class inside SqlQueryExecution, mentioned above.

Key member:
InternalNodeManager nodeManager       // fetches the list of live nodes, kept in a NodeMap that is refreshed periodically (every 5 seconds by default)

Key methods:
List<Node> selectNodes                // selects the list of live nodes
NodeSelector createNodeSelector       // provides a NodeSelector, which contains the task placement algorithms used by each stage
ResettableRandomizedIterator<Node> randomizedNodes  // shuffles the given NodeMap

The InternalNodeManager interface is defined as:

public interface InternalNodeManager
{
    Set<Node> getNodes(NodeState state);                         // the nodes in the given state
    Set<Node> getActiveConnectorNodes(ConnectorId connectorId);  // the nodes active for the given connectorId
    Node getCurrentNode();       // the current node's information
    Set<Node> getCoordinators(); // the list of Coordinators
    AllNodes getAllNodes();      // all nodes
    void refreshNodes();         // refresh node information
}
// implemented by DiscoveryNodeManager

The NodeSelector interface is defined as:

public interface NodeSelector
{
    void lockDownNodes();
    List<Node> allNodes();                   // all nodes
    Node selectCurrentNode();                // the current node
    List<Node> selectRandomNodes(int limit);  // up to limit random nodes
    List<Node> selectRandomNodes(int limit, Set<Node> excludedNodes);  // up to limit random nodes, excluding the given ones
    SplitPlacementResult computeAssignments(Set<Split> splits, List<RemoteTask> existingTasks);
    SplitPlacementResult computeAssignments(Set<Split> splits, List<RemoteTask> existingTasks, NodePartitionMap partitioning);
}
// implemented by SimpleNodeSelector and TopologyAwareNodeSelector; Presto picks the NodeSelector according to the network topology

// in NodeScheduler's constructor, anything other than the LEGACY topology counts as using a network topology; LEGACY refers to the legacy, non-TCP/IP network setting
this.useNetworkTopology = !config.getNetworkTopology().equals(NetworkTopologyType.LEGACY);

// createNodeSelector instantiates the NodeSelector
if (useNetworkTopology) {
    // so as long as your network uses TCP/IP, the instantiated NodeSelector is a TopologyAwareNodeSelector
    return new TopologyAwareNodeSelector(
            nodeManager,
            nodeTaskMap,
            includeCoordinator,
            nodeMap,
            minCandidates,
            maxSplitsPerNode,
            maxPendingSplitsPerTask,
            topologicalSplitCounters,
            networkLocationSegmentNames,
            networkLocationCache);
}
else {
    return new SimpleNodeSelector(nodeManager, nodeTaskMap, includeCoordinator, nodeMap, minCandidates, maxSplitsPerNode, maxPendingSplitsPerTask);
}

The Node interface is defined as:

public interface Node
{
    HostAddress getHostAndPort();   // host and port
    URI getHttpUri();               // URL
    String getNodeIdentifier();
    String getVersion();            // version
    boolean isCoordinator();        // whether this node is a Coordinator
}

The createNodeSelector flow:

public NodeSelector createNodeSelector(ConnectorId connectorId)
  {
      // uses Guava's memoizing Supplier (Google's caching utility)
      Supplier<NodeMap> nodeMap = Suppliers.memoizeWithExpiration(() -> {
          ImmutableSetMultimap.Builder<HostAddress, Node> byHostAndPort = ImmutableSetMultimap.builder();
          ImmutableSetMultimap.Builder<InetAddress, Node> byHost = ImmutableSetMultimap.builder();
          ImmutableSetMultimap.Builder<NetworkLocation, Node> workersByNetworkPath = ImmutableSetMultimap.builder();
          Set<Node> nodes;
          if (connectorId != null) {
              nodes = nodeManager.getActiveConnectorNodes(connectorId);
          }
          else {
              nodes = nodeManager.getNodes(ACTIVE);
          }

          Set<String> coordinatorNodeIds = nodeManager.getCoordinators().stream()
                  .map(Node::getNodeIdentifier)
                  .collect(toImmutableSet());
          for (Node node : nodes) {
              if (useNetworkTopology && (includeCoordinator || !coordinatorNodeIds.contains(node.getNodeIdentifier()))) {
                  NetworkLocation location = networkLocationCache.get(node.getHostAndPort());
                  for (int i = 0; i <= location.getSegments().size(); i++) {
                      workersByNetworkPath.put(location.subLocation(0, i), node);
                  }
              }
              try {
                  byHostAndPort.put(node.getHostAndPort(), node);

                  InetAddress host = InetAddress.getByName(node.getHttpUri().getHost());
                  byHost.put(host, node);
              }
              catch (UnknownHostException e) {
                  // ignore
              }
          }

          return new NodeMap(byHostAndPort.build(), byHost.build(), workersByNetworkPath.build(), coordinatorNodeIds);
      }, 5, TimeUnit.SECONDS);
      if (useNetworkTopology) {
          return new TopologyAwareNodeSelector(
                  nodeManager,
                  nodeTaskMap,
                  includeCoordinator,
                  nodeMap,
                  minCandidates,
                  maxSplitsPerNode,
                  maxPendingSplitsPerTask,
                  topologicalSplitCounters,
                  networkLocationSegmentNames,
                  networkLocationCache);
      }
      else {
          return new SimpleNodeSelector(nodeManager, nodeTaskMap, includeCoordinator, nodeMap, minCandidates, maxSplitsPerNode, maxPendingSplitsPerTask);
      }
  }
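The caching above is Guava's Suppliers.memoizeWithExpiration. A standalone demo of the five-second refresh behavior (the Long value merely stands in for building a NodeMap):

import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import java.util.concurrent.TimeUnit;

public class MemoizeDemo
{
    public static void main(String[] args)
    {
        // the delegate runs at most once per 5-second window; callers in
        // between get the cached value, just like the NodeMap above
        Supplier<Long> cached = Suppliers.memoizeWithExpiration(
                () -> {
                    System.out.println("refreshing...");
                    return System.nanoTime();
                },
                5, TimeUnit.SECONDS);

        System.out.println(cached.get()); // triggers a refresh
        System.out.println(cached.get()); // served from the cache
    }
}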

2.4.2 Scheduling Selection Strategies

The Single and Fixed stage strategies are straightforward: both call selectRandomNodes.
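What that amounts to, as a minimal sketch (an assumption for illustration, not the exact Presto implementation):

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class RandomNodePicker
{
    // shuffle the live nodes and take the first `limit` of them
    static <T> List<T> selectRandomNodes(List<T> activeNodes, int limit)
    {
        List<T> shuffled = new ArrayList<>(activeNodes);
        Collections.shuffle(shuffled);
        return shuffled.subList(0, Math.min(limit, shuffled.size()));
    }
}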

2.4.3 Creating the Stage Scheduler and Placement Policy

Continuing from the code omitted in the middle of 2.3.2.3:

Optional<int[]> bucketToPartition;
PartitioningHandle partitioningHandle = plan.getFragment().getPartitioning();

    // create a different stage scheduler depending on the stage type
if (partitioningHandle.equals(SOURCE_DISTRIBUTION)) {
    // nodes are selected dynamically based on the constraints of the splits and the system load
    Entry<PlanNodeId, SplitSource> entry = Iterables.getOnlyElement(plan.getSplitSources().entrySet());
    PlanNodeId planNodeId = entry.getKey();
    SplitSource splitSource = entry.getValue();
    ConnectorId connectorId = splitSource.getConnectorId();
    if (isInternalSystemConnector(connectorId)) {
        connectorId = null;
    }
    // create the nodeSelector used to pick execution nodes, backed mainly by nodeManager
    NodeSelector nodeSelector = nodeScheduler.createNodeSelector(connectorId);
    // dynamic split placement policy
    SplitPlacementPolicy placementPolicy = new DynamicSplitPlacementPolicy(nodeSelector, stage::getAllTasks);

    checkArgument(plan.getFragment().getPipelineExecutionStrategy() == UNGROUPED_EXECUTION);
  
    // a Source stage uses the simpleSourcePartitionedScheduler
    stageSchedulers.put(stageId, simpleSourcePartitionedScheduler(stage, planNodeId, splitSource, placementPolicy, splitBatchSize));
    bucketToPartition = Optional.of(new int[1]);
}
else if (partitioningHandle.equals(SCALED_WRITER_DISTRIBUTION)) {
    bucketToPartition = Optional.of(new int[1]);
}
else {
    // nodes are pre determined by the nodePartitionMap
    NodePartitionMap nodePartitionMap = partitioningCache.apply(plan.getFragment().getPartitioning());
    long nodeCount = nodePartitionMap.getPartitionToNode().values().stream().distinct().count();
    OptionalInt concurrentLifespansPerTask = getConcurrentLifespansPerNode(session);

    Map<PlanNodeId, SplitSource> splitSources = plan.getSplitSources();
    // if a Fixed stage was assigned SplitSources, use FixedSourcePartitionedScheduler, which creates its own FixedSplitPlacementPolicy internally
    if (!splitSources.isEmpty()) {
        List<PlanNodeId> schedulingOrder = plan.getFragment().getPartitionedSources();
        List<ConnectorPartitionHandle> connectorPartitionHandles;
        switch (plan.getFragment().getPipelineExecutionStrategy()) {
            case GROUPED_EXECUTION:
                connectorPartitionHandles = nodePartitioningManager.listPartitionHandles(session, partitioningHandle);
                checkState(!ImmutableList.of(NOT_PARTITIONED).equals(connectorPartitionHandles));
                break;
            case UNGROUPED_EXECUTION:
                connectorPartitionHandles = ImmutableList.of(NOT_PARTITIONED);
                break;
            default:
                throw new UnsupportedOperationException();
        }
        stageSchedulers.put(stageId, new FixedSourcePartitionedScheduler(
                stage,
                splitSources,
                plan.getFragment().getPipelineExecutionStrategy(),
                schedulingOrder,
                nodePartitionMap,
                splitBatchSize,
                concurrentLifespansPerTask.isPresent() ? OptionalInt.of(toIntExact(concurrentLifespansPerTask.getAsInt() * nodeCount)) : OptionalInt.empty(),
                nodeScheduler.createNodeSelector(null),
                connectorPartitionHandles));
        bucketToPartition = Optional.of(nodePartitionMap.getBucketToPartition());
    }
    else {
        // the list of live nodes
        Map<Integer, Node> partitionToNode = nodePartitionMap.getPartitionToNode();
        // todo this should asynchronously wait a standard timeout period before failing
        checkCondition(!partitionToNode.isEmpty(), NO_NODES_AVAILABLE, "No worker nodes available");

        // if a Fixed stage was not assigned a SplitSource, use FixedCountScheduler
        stageSchedulers.put(stageId, new FixedCountScheduler(stage, partitionToNode));
        bucketToPartition = Optional.of(nodePartitionMap.getBucketToPartition());
    }
}

2.4.4 The SqlQuery Scheduler Starts Scheduling

scheduler.start() kicks off SqlQueryScheduler's scheduling, which in turn drives task scheduling:

public void start()
{
    if (started.compareAndSet(false, true)) {
        executor.submit(this::schedule);
    }
}

The method reference invokes schedule():

 
private void schedule()
{
    try (SetThreadName ignored = new SetThreadName("Query-%s", queryStateMachine.getQueryId())) {
        Set<StageId> completedStages = new HashSet<>();
        ExecutionSchedule executionSchedule = executionPolicy.createExecutionSchedule(stages.values());
        while (!executionSchedule.isFinished()) {
            List<ListenableFuture<?>> blockedStages = new ArrayList<>();
            for (SqlStageExecution stage : executionSchedule.getStagesToSchedule()) {
                stage.beginScheduling();

                // invoke each stage's scheduler to schedule its tasks
                // perform some scheduling work
                ScheduleResult result = stageSchedulers.get(stage.getStageId())
                        .schedule();

                // modify parent and children based on the results of the scheduling
                if (result.isFinished()) {
                    stage.schedulingComplete();
                }
                else if (!result.getBlocked().isDone()) {
                    blockedStages.add(result.getBlocked());
                }
                stageLinkages.get(stage.getStageId())
                        .processScheduleResults(stage.getState(), result.getNewTasks());
                schedulerStats.getSplitsScheduledPerIteration().add(result.getSplitsScheduled());
                if (result.getBlockedReason().isPresent()) {
                    switch (result.getBlockedReason().get()) {
                        case WRITER_SCALING:
                            // no-op
                            break;
                        case WAITING_FOR_SOURCE:
                            schedulerStats.getWaitingForSource().update(1);
                            break;
                        case SPLIT_QUEUES_FULL:
                            schedulerStats.getSplitQueuesFull().update(1);
                            break;
                        case MIXED_SPLIT_QUEUES_FULL_AND_WAITING_FOR_SOURCE:
                        case NO_ACTIVE_DRIVER_GROUP:
                            break;
                        default:
                            throw new UnsupportedOperationException("Unknown blocked reason: " + result.getBlockedReason().get());
                    }
                }
            }

            // make sure to update stage linkage at least once per loop to catch async state changes (e.g., partial cancel)
            for (SqlStageExecution stage : stages.values()) {
                if (!completedStages.contains(stage.getStageId()) && stage.getState().isDone()) {
                    stageLinkages.get(stage.getStageId())
                            .processScheduleResults(stage.getState(), ImmutableSet.of());
                    completedStages.add(stage.getStageId());
                }
            }

            // wait for a state change and then schedule again
            if (!blockedStages.isEmpty()) {
                try (TimeStat.BlockTimer timer = schedulerStats.getSleepTime().time()) {
                    tryGetFutureValue(whenAnyComplete(blockedStages), 1, SECONDS);
                }
                for (ListenableFuture<?> blockedStage : blockedStages) {
                    blockedStage.cancel(true);
                }
            }
        }

        for (SqlStageExecution stage : stages.values()) {
            StageState state = stage.getState();
            if (state != SCHEDULED && state != RUNNING && !state.isDone()) {
                throw new PrestoException(GENERIC_INTERNAL_ERROR, format("Scheduling is complete, but stage %s is in state %s", stage.getStageId(), state));
            }
        }
    }
}

To be continued…

If you spot any mistakes, please point them out so we can all improve~

Updated every evening~

If you repost this, please include a link to the original. Original writing takes effort, thank you~
