在使用 Apache Doris 時(shí)赴魁,我們可以通過(guò) Apache Doris FE Web 頁(yè)面或者 Mysql 協(xié)議執(zhí)行 SQL 語(yǔ)句尚粘,但是對(duì)于 Apache Doris 背后如何對(duì) SQL 進(jìn)行處理郎嫁,我們無(wú)從所知。本文章內(nèi)容主要講解 Apache Doris 查詢 SQL 在 FE 節(jié)點(diǎn)處理原理。Doris 查詢語(yǔ)句和市面主流的數(shù)據(jù)庫(kù)處理階段都差不多杠茬,需要經(jīng)過(guò) Parse,Analyze,Optimize,Plan,Schedule,Execute 等階段瓢喉。 在 Doris 中舀透,F(xiàn)E 負(fù)責(zé)查詢的 Parse,Analyze,Optimize,Plan, Schedule走贪,BE 負(fù)責(zé)執(zhí)行 FE 下發(fā) Plan Fragment
## 一坠狡、前言
在使用 Apache Doris 時(shí)逃沿,我們可以通過(guò) Apache Doris FE Web 頁(yè)面或者 Mysql 協(xié)議執(zhí)行 SQL 語(yǔ)句感挥,但是對(duì)于 Apache Doris 背后如何對(duì) SQL 進(jìn)行處理触幼,我們無(wú)從所知。本文章內(nèi)容主要講解 Apache Doris 查詢 SQL 在 FE 節(jié)點(diǎn)處理原理媒峡。Doris 查詢語(yǔ)句和市面主流的數(shù)據(jù)庫(kù)處理階段都差不多谅阿,需要經(jīng)過(guò) Parse,Analyze,Optimize,Plan,Schedule,Execute 等階段。 在 Doris 中氯檐,F(xiàn)E 負(fù)責(zé)查詢的 Parse,Analyze,Optimize,Plan, Schedule冠摄,BE 負(fù)責(zé)執(zhí)行 FE 下發(fā) Plan Fragment
## 二、名詞解釋
* FE:Frontend乔询,即 Doris 的前端節(jié)點(diǎn)竿刁。主要負(fù)責(zé)接收和返回客戶端請(qǐng)求鸵熟、元數(shù)據(jù)以及集群管理、查詢計(jì)劃生成等工作。
* BE:Backend奏篙,即 Doris 的后端節(jié)點(diǎn)。主要負(fù)責(zé)數(shù)據(jù)存儲(chǔ)與管理、查詢計(jì)劃執(zhí)行等工作。
* slot:計(jì)算槽繁仁,是一個(gè)資源單位, 只有給 task 分配了一個(gè) slot 之后, 這個(gè) task 才可以運(yùn)行
* planNode : 邏輯算子
* planNodeTree: 邏輯執(zhí)行計(jì)劃
## 三蔓倍、執(zhí)行流程
在使用 Apache Doris 時(shí)默勾,我們可以通過(guò) Apache Doris FE Web 頁(yè)面或者 Mysql 協(xié)議執(zhí)行 SQL 語(yǔ)句聚谁,但是對(duì)于 Apache Doris 背后如何對(duì) SQL 進(jìn)行處理母剥,我們無(wú)從所知。本文章內(nèi)容主要講解 Apache Doris 查詢 SQL 在 FE 節(jié)點(diǎn)處理原理。Doris 查詢語(yǔ)句和市面主流的數(shù)據(jù)庫(kù)處理階段都差不多环疼,需要經(jīng)過(guò) Parse,Analyze,Optimize,Plan,Schedule,Execute 等階段习霹。 在 Doris 中,F(xiàn)E 負(fù)責(zé)查詢的 Parse,Analyze,Optimize,Plan, Schedule炫隶,BE 負(fù)責(zé)執(zhí)行 FE 下發(fā) Plan Fragment
## 四淋叶、Apache Doris 查詢?cè)?/p>
#### (一)SQL 接收
本文只說(shuō) mysql 協(xié)議如何接收 SQL 語(yǔ)句, 如果感興趣的同學(xué)可以看看 Apache Doris FE Web 的 Rest Api。Apache Doris 兼容 Mysql 協(xié)議伪阶,用戶可以通過(guò) Mysql 客戶端和其他支持 Mysql 協(xié)議的工具向 Doris 發(fā)送查詢請(qǐng)求煞檩。MysqlServer Listener() 負(fù)責(zé)監(jiān)聽客戶端發(fā)送來(lái)的 Mysql 連接請(qǐng)求,每個(gè)連接請(qǐng)求都被封裝成一個(gè) ConnectContext 對(duì)象栅贴,并被提交給 ConnectScheduler斟湃。ConnectScheduler 會(huì)維護(hù)一個(gè)線程池,每個(gè) ConnectContext 會(huì)在線程池中由一個(gè) ConnectProcessor 線程處理檐薯。
* MysqlServer 類 Listener 處理:
```
private class Listener implements Runnable {
? ? ? ? @Override
? ? ? ? public void run(){while (running && serverChannel.isOpen()) {
? ? ? ? ? ? ? ? SocketChannel clientChannel;
? ? ? ? ? ? ? ? try {clientChannel = serverChannel.accept();
? ? ? ? ? ? ? ? ? ? if (clientChannel == null) {continue;}
? ? ? ? ? ? ? ? ? ? // 構(gòu)建 ConnectContext 對(duì)象
? ? ? ? ? ? ? ? ? ? ConnectContext context = new ConnectContext(clientChannel);
? ? ? ? ? ? ? ? ? ? // catelog 日志
? ? ? ? ? ? ? ? ? ? context.setCatalog(Catalog.getCurrentCatalog());
? ? ? ? ? ? ? ? ? ? // 向 ExecutorService 提交 new LoopHandler(context) ==>(源碼)executor.submit(new LoopHandler(context))
? ? ? ? ? ? ? ? ? ? if (!scheduler.submit(context)) {LOG.warn("Submit one connect request failed. Client=" + clientChannel.toString());
? ? ? ? ? ? ? ? ? ? ? ? // clear up context
? ? ? ? ? ? ? ? ? ? ? ? context.cleanup();}
? ? ? ? ? ? ? ? } catch (IOException e) {
? ? ? ? ? ? ? ? ? ? // ClosedChannelException
? ? ? ? ? ? ? ? ? ? // AsynchronousCloseException
? ? ? ? ? ? ? ? ? ? // ClosedByInterruptException
? ? ? ? ? ? ? ? ? ? // Other IOException, for example "to many open files" ...
? ? ? ? ? ? ? ? ? ? LOG.warn("Query server encounter exception.", e);
? ? ? ? ? ? ? ? ? ? try {Thread.sleep(100);
? ? ? ? ? ? ? ? ? ? } catch (InterruptedException e1) {// Do nothing}
? ? ? ? ? ? ? ? } catch (Throwable e) {
? ? ? ? ? ? ? ? ? ? // NotYetBoundException
? ? ? ? ? ? ? ? ? ? // SecurityException
? ? ? ? ? ? ? ? ? ? LOG.warn("Query server failed when calling accept.", e);
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? }
? ? }
```
* ExecutorService 線程 LoopHandler 處理:
```
@Override
? ? ? ? public void run() {
? ? ? ? ? ? try {
? ? ? ? ? ? ? ? // Set thread local info
? ? ? ? ? ? ? ? context.setThreadLocalInfo();
? ? ? ? ? ? ? ? context.setConnectScheduler(ConnectScheduler.this);
? ? ? ? ? ? ? ? // authenticate check failed.
? ? ? ? ? ? ? ? if (!MysqlProto.negotiate(context)) {return;}
? ? ? ? ? ? ? ? if (registerConnection(context)) {MysqlProto.sendResponsePacket(context);
? ? ? ? ? ? ? ? } else {context.getState().setError(ErrorCode.ERR_USER_LIMIT_REACHED, "Reach limit of connections");
? ? ? ? ? ? ? ? ? ? MysqlProto.sendResponsePacket(context);
? ? ? ? ? ? ? ? ? ? return;
? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? context.setStartTime();
? ? ? ? ? ? ? ? ConnectProcessor processor = new ConnectProcessor(context);
? ? ? ? ? ? ? ? processor.loop();} catch (Exception e) {
? ? ? ? ? ? ? ? // for unauthorized access such lvs probe request, may cause exception, just log it in debug level
? ? ? ? ? ? ? ? if (context.getCurrentUserIdentity() != null){LOG.warn("connect processor exception because", e);
? ? ? ? ? ? ? ? } else {LOG.debug("connect processor exception because", e);
? ? ? ? ? ? ? ? }
? ? ? ? ? ? } finally {unregisterConnection(context);
? ? ? ? ? ? ? ? context.cleanup();}
? ? ? ? }
```
* processOnce(讀取 Mysql 客戶端的 sql) 方法
```
// 處理 mysql 的請(qǐng)求
? ? public void processOnce()throws IOException {ctx.getState().reset();
? ? ? ? executor = null;
? ? ? ? // 重置 MySQL 協(xié)議的序列號(hào)
? ? ? ? final MysqlChannel channel = ctx.getMysqlChannel();
? ? ? ? channel.setSequenceId(0);
? ? ? ? // 從通道讀取數(shù)據(jù)包 ==>SQL
? ? ? ? try {packetBuf = channel.fetchOnePacket();
? ? ? ? ? ? if (packetBuf == null) {LOG.warn("Null packet received from network. remote: {}", channel.getRemoteHostPortString());
? ? ? ? ? ? ? ? throw new IOException("Error happened when receiving packet.");
? ? ? ? ? ? }
? ? ? ? } catch (AsynchronousCloseException e) {
? ? ? ? ? ? // when this happened, timeout checker close this channel
? ? ? ? ? ? // killed flag in ctx has been already set, just return
? ? ? ? ? ? return;
? ? ? ? }
? ? ? ? // 下發(fā) SQL
? ? ? ? dispatch();
? ? ? ? // finalize
? ? ? ? finalizeCommand();
? ? ? ? ctx.setCommand(MysqlCommand.COM_SLEEP);
? ? }
```
#### (二)Parse
ConnectProcessor 接收到 SQL 之后會(huì)進(jìn)行 analyze 凝赛,Apache Doris SQL 解析使用的 Parse 是 Java CUP Parser,語(yǔ)法規(guī)則 定義的文件在 sql_parser.cup厨剪。
> 感興趣的同學(xué)可以詳細(xì)看一下 StatementBase 類
* analyze 方法, 返回 List<StatementBase> (這里主要是語(yǔ)法解析)
```
// 解析 origin哄酝,返回 list<stmt>
? ? private List<StatementBase> analyze(String originStmt) throws AnalysisException, DdlException {LOG.debug("the originStmts are: {}", originStmt);
? ? ? ? // 使用 CUP&FLEX 生成的解析器解析語(yǔ)句
? ? ? ? SqlScanner input = new SqlScanner(new StringReader(originStmt), ctx.getSessionVariable().getSqlMode());
? ? ? ? SqlParser parser = new SqlParser(input);
? ? ? ? try {return SqlParserUtils.getMultiStmts(parser);
? ? ? ? } catch (Error e) {throw new AnalysisException("Please check your sql, we meet an error when parsing.", e);
? ? ? ? } catch (AnalysisException | DdlException e) {String errorMessage = parser.getErrorMsg(originStmt);
? ? ? ? ? ? LOG.debug("origin stmt: {}; Analyze error message: {}", originStmt, parser.getErrorMsg(originStmt), e);
? ? ? ? ? ? if (errorMessage == null) {throw e;} else {throw new AnalysisException(errorMessage, e);
? ? ? ? ? ? }
? ? ? ? } catch (Exception e) {// TODO(lingbin): we catch 'Exception' to prevent unexpected error,
? ? ? ? ? ? // should be removed this try-catch clause future.
? ? ? ? ? ? throw new AnalysisException("Internal Error, maybe syntax error or this is a bug");
? ? ? ? }
? ? }
```
因?yàn)楸疚闹v述的是查詢語(yǔ)句(不同類型會(huì)轉(zhuǎn)換成不通 Stmt友存,比如 InsertStmt, ShowStmt, SetStmt, AlterStmt, AlterTableStmt, CreateTableStmt 等)祷膳,最后我們會(huì)得到 QueryStmt,originStmt 會(huì)轉(zhuǎn)換成 QueryStmt屡立,QueryStmt 通常是用 SelectList, FromClause, wherePredicate, GroupByClause, havingPredicate, OrderByElement, LimitElement 組成
#### (三)Analyze
SQL 語(yǔ)句被解析成 AST 之后直晨,會(huì)被交給 StmtExecutor 。StmtExecutor 會(huì)首先對(duì) AST 進(jìn)行語(yǔ)法和語(yǔ)義分析膨俐,大概會(huì)做下面的事情:
1. 檢查并綁定 Cluster, Database, Table, Column 等元信息勇皇。
2. SQL 的合法性檢查:窗口函數(shù)不能 DISTINCT,HLL 和 Bitmap 列不能 sum, count, where 中不能有 grouping 操作等焚刺。
3. SQL 重寫:比如將 select * 擴(kuò)展成 select 所有列敛摘,count distinct 查詢重寫等。
4. Table 與 Column 別名處理乳愉。
5. 為 Tuple, Slot, Expr 等分配唯一 ID兄淫。
6. 函數(shù)參數(shù)的合法性檢測(cè)。
7. 表達(dá)式替換蔓姚。
8. 類型檢查捕虽,類型轉(zhuǎn)換(BIGINT 和 DECIMAL 比較,BIGINT 類型需要 Cast 成 DECIMAL)坡脐。
主要代碼:
```
analyzeAndGenerateQueryPlan 方法 -->? parsedStmt.analyze(analyzer);
```
#### (四)Rewrite
* analyzeAndGenerateQueryPlan 方法(部分代碼泄私,此處不做重點(diǎn)講解)
StmtExecutor 在對(duì) AST 進(jìn)行語(yǔ)法和語(yǔ)義分析后,會(huì)讓 ExprRewriter 根據(jù) ExprRewriteRule 進(jìn)行一次 Rewrite。目前 Doris 的重寫規(guī)則比較簡(jiǎn)單晌端,主要是進(jìn)行了常量表達(dá)式的化簡(jiǎn)和謂詞的簡(jiǎn)單處理捅暴。 常量表達(dá)式的化簡(jiǎn)是指 1 + 1 + 1 重寫成 3,1 > 2 重寫成 Flase 等伶唯。
如果重寫后,有部分節(jié)點(diǎn)被成功改寫惧盹,比如乳幸, 1 > 2 被改寫成 Flase,那么就會(huì)再觸發(fā)一次語(yǔ)法和語(yǔ)義分析的過(guò)程钧椰。
對(duì)于有子查詢的 SQL粹断,StmtRewriter 會(huì)進(jìn)行重寫,比如將 where in, where exists 重寫成 semi join, where not in, where not exists 重寫成 anti join嫡霞。
```
if (parsedStmt instanceof QueryStmt || parsedStmt instanceof InsertStmt) {ExprRewriter rewriter = analyzer.getExprRewriter();
? ? ? ? ? ? rewriter.reset();
? ? ? ? ? ? if (context.getSessionVariable().isEnableFoldConstantByBe()) {parsedStmt.foldConstant(rewriter);
? ? ? ? ? ? }
? ? ? ? ? ? // explan 標(biāo)簽
? ? ? ? ? ? ExplainOptions explainOptions = parsedStmt.getExplainOptions();
? ? ? ? ? ? boolean reAnalyze = false;
? ? ? ? ? ? parsedStmt.rewriteExprs(rewriter);
? ? ? ? ? ? reAnalyze = rewriter.changed();
? ? ? ? ? ? if (analyzer.containSubquery()) {parsedStmt = StmtRewriter.rewrite(analyzer, parsedStmt);
? ? ? ? ? ? ? ? reAnalyze = true;
? ? ? ? ? ? }
? ? ? ? ? ? if (parsedStmt instanceof SelectStmt) {if (StmtRewriter.rewriteByPolicy(parsedStmt, analyzer)) {reAnalyze = true;}
? ? ? ? ? ? }
? ? ? ? ? ? if (parsedStmt instanceof SetOperationStmt) {List<SetOperationStmt.SetOperand> operands = ((SetOperationStmt) parsedStmt).getOperands();
? ? ? ? ? ? ? ? for (SetOperationStmt.SetOperand operand : operands) {if (StmtRewriter.rewriteByPolicy(operand.getQueryStmt(), analyzer)){reAnalyze = true;}
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? ? ? if (parsedStmt instanceof InsertStmt) {QueryStmt queryStmt = ((InsertStmt) parsedStmt).getQueryStmt();
? ? ? ? ? ? ? ? if (queryStmt != null && StmtRewriter.rewriteByPolicy(queryStmt, analyzer)) {reAnalyze = true;}
? ? ? ? ? ? }
? ? ? ? ? ? if (reAnalyze) {
? ? ? ? ? ? ? ? // 對(duì)重寫語(yǔ)句進(jìn)行處理
? ? ? ? ? ? ? ? List<Type> origResultTypes = Lists.newArrayList();
? ? ? ? ? ? ? ? for (Expr e : parsedStmt.getResultExprs()) {origResultTypes.add(e.getType());
? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? List<String> origColLabels =
? ? ? ? ? ? ? ? ? ? ? ? Lists.newArrayList(parsedStmt.getColLabels());
? ? ? ? ? ? ? ? // 重寫語(yǔ)句進(jìn)行 analyzer
? ? ? ? ? ? ? ? analyzer = new Analyzer(context.getCatalog(), context);
? ? ? ? ? ? ? ? // 重寫語(yǔ)句 analyzer 信息
? ? ? ? ? ? ? ? parsedStmt.reset();
? ? ? ? ? ? ? ? parsedStmt.analyze(analyzer);
? ? ? ? ? ? ? ? // 恢復(fù)原始結(jié)果類型和列標(biāo)簽
? ? ? ? ? ? ? ? parsedStmt.castResultExprs(origResultTypes);
? ? ? ? ? ? ? ? parsedStmt.setColLabels(origColLabels);
? ? ? ? ? ? ? ? if (LOG.isTraceEnabled()) {LOG.trace("rewrittenStmt:" + parsedStmt.toSql());
? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? if (explainOptions != null) {parsedStmt.setIsExplain(explainOptions);
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? }
```
#### (五)SingleNodePlan
經(jīng)過(guò) parse瓶埋、Analyze、Rewrite 階段后诊沪,AST 會(huì)生成 singleNodePlanner养筒,源碼如下:
```
singleNodePlanner = new SingleNodePlanner(plannerContext);
PlanNode singleNodePlan = singleNodePlanner.createSingleNodePlan();
```
單機(jī) Plan 由 SingleNodePlanner 執(zhí)行,輸入是 AST端姚,輸出是單機(jī)物理執(zhí)行 Plan, Plan 中每個(gè)節(jié)點(diǎn)是一個(gè) PlanNode晕粪。
SingleNodePlanner 核心任務(wù)就是根據(jù) AST 生成 OlapScanNode, AggregationNode, HashJoinNode, SortNode, UnionNode 等。
Doris 在生成單機(jī) Plan 的時(shí)候主要進(jìn)行了以下**工作或優(yōu)化** :
1. Slot 物化:指確定一個(gè)表達(dá)式對(duì)應(yīng)的列需要 Scan 和計(jì)算渐裸,比如聚合節(jié)點(diǎn)的聚合函數(shù)表達(dá)式和 Group By 表達(dá)式需要進(jìn)行物化
```
//Slot物化巫湘,處理 Base表
analyzer.materializeSlots(queryStmt.getBaseTblResultExprs());
// Slot物化 處理 where 語(yǔ)句的子查詢
selectStmt.materializeRequiredSlots(analyzer);
```
2. 投影下推:BE 在 Scan 時(shí)只會(huì) Scan 必須讀取的列
```
? ? projectPlanNode(resultSlotIds, root);
```
3. 謂詞下推:在滿足語(yǔ)義正確的前提下將過(guò)濾條件盡可能下推到 Scan 節(jié)點(diǎn)
```
? ? pushDownPredicates(analyzer, selectStmt);
```
4. 分區(qū),分桶裁剪:比如建表時(shí)按照 UserId 分桶昏鹃,每個(gè)分區(qū) 100 個(gè)分桶尚氛,那么當(dāng)不包含 or 的 Filter 條件包含 UserId ==xxx 時(shí),Doris 就只會(huì)將查詢發(fā)送 100 個(gè)分桶中的一個(gè)發(fā)送給 BE洞渤,可以大大減少不必要的數(shù)據(jù)讀取
5. Join Reorder:對(duì)于 join操作阅嘶,在保證結(jié)果不變的情況,通過(guò)規(guī)則計(jì)算最優(yōu)(最少資源)join 操作载迄。
```
? ? createCheapestJoinPlan(analyzer, refPlans);
```
6. Sort + Limit 優(yōu)化成 TopN(FE 進(jìn)行useTopN標(biāo)識(shí)讯柔,BE標(biāo)識(shí)執(zhí)行)
```
? ? root = new SortNode(ctx_.getNextNodeId(), root, stmt.getSortInfo(),useTopN, limit == -1, stmt.getOffset());
```
7. MaterializedView 選擇:會(huì)根據(jù)查詢需要的列,過(guò)濾宪巨,排序和 Join 的列磷杏,行數(shù),列數(shù)等因素選擇最佳的 MaterializedView
```
? ? boolean selectFailed = singleNodePlanner.selectMaterializedView(queryStmt, analyzer);
```
8. 向量化執(zhí)行引擎選擇:基于現(xiàn)代CPU的特點(diǎn)與火山模型的執(zhí)行特點(diǎn)捏卓,重新設(shè)計(jì)列式存儲(chǔ)系統(tǒng)的SQL執(zhí)行引擎极祸,從而提高了CPU在SQL執(zhí)行時(shí)的效率慈格,提升了SQL查詢的性能。
```
? ? if (VectorizedUtil.isVectorized()) {
? ? ? ? ? ? singleNodePlan.convertToVectoriezd();
? ? }
```
9. Runtime Filter Join:Doris 在進(jìn)行 Hash Join 計(jì)算時(shí)會(huì)在右表構(gòu)建一個(gè)哈希表遥金,左表流式的通過(guò)右表的哈希表從而得出 Join 結(jié)果浴捆。而 RuntimeFilter 就是充分利用了右表的 Hash 表,在右表生成哈希表的時(shí)稿械,同時(shí)生成一個(gè)基于哈希表數(shù)據(jù)的一個(gè)過(guò)濾條件选泻,然后下推到左表的數(shù)據(jù)掃描節(jié)點(diǎn)
```
RuntimeFilterGenerator.generateRuntimeFilters(analyzer, rootFragment.getPlanRoot());
```
創(chuàng)建 **singleNodePlanner** 主要代碼:**createSingleNodePlan()**
#### (六)DistributedPlan
分布式查詢計(jì)劃 PlanFragmentTree ,每個(gè) PlanFragment 是由
PlanNodeTree 的子樹 和 Sink 節(jié)點(diǎn)組成的美莫。分布式化的目標(biāo)是最小化數(shù)據(jù)移動(dòng)和最大化本地 Scan页眯。
分布式查詢計(jì)劃 PlanFragmentTree ,每個(gè) PlanFragment 是由
PlanNodeTree 的子樹 和 Sink 節(jié)點(diǎn)組成的厢呵。分布式化的目標(biāo)是最小化數(shù)據(jù)移動(dòng)和最大化本地 Scan窝撵。
每個(gè) PlanFragment 由 PlanNodeTree 和 Data Sink 組成,我們從上圖的 Plan Fragment 2 可以看出襟铭,由 AggregationNode碌奉、HashJoinNode 和 DataSink。Plan 分布式化的方法是增加 ExchangeNode寒砖,執(zhí)行計(jì)劃樹會(huì)以 ExchangeNode 為邊界拆分為 PlanFragment赐劣。
ExchangeNode 主要是用于 BE 之間的數(shù)據(jù)交換與共享,類似 Spark 和 MR 中的 Shuffle哩都。
各個(gè) Fragment 的數(shù)據(jù)流轉(zhuǎn)和最終的結(jié)果發(fā)送依賴:DataSink魁兼。比如 DataStreamSink 會(huì)將一個(gè) Fragment 的數(shù)據(jù)發(fā)送到另一個(gè) Fragment 的 ExchangeNode,ResultSink 會(huì)將查詢的結(jié)果集發(fā)送到 FE茅逮。
每個(gè) PlanFragment 可以在每個(gè) BE 節(jié)點(diǎn)生成 1 個(gè)或多個(gè)執(zhí)行實(shí)例璃赡,不同執(zhí)行實(shí)例處理不同的數(shù)據(jù)集判哥,通過(guò)并發(fā)來(lái)提升查詢性能献雅。
DistributedPlanner 中最主要的工作是決定 Join 的分布式執(zhí)行策略:Shuffle Join,Bucket Join塌计,Broadcast Join挺身,Colocate Join,和增加 Aggregation 的 Merge 階段锌仅。
決定 Join 的分布式執(zhí)行策略的邏輯如下:
如果兩種表示 Colocate Join 表章钾,且 Join 的 Key 和分桶的 Key 一致,且兩張表沒有正在數(shù)據(jù) balance热芹,就會(huì)執(zhí)行 Colocate Join
如果 Join 的右表比較少贱傀,集群節(jié)點(diǎn)數(shù)較少,計(jì)算出的 Broadcast Join 成本較低伊脓,就會(huì)選擇 Broadcast Join府寒,否則就會(huì)選擇 Shuffle Join。
如果兩種表示 Colocate Join 表,且 Join 的 Key 和分桶的 Key 一致株搔,且兩張表沒有正在數(shù)據(jù) balance剖淀,就會(huì)執(zhí)行 Colocate Join
如果 Join 的右表比較少,集群節(jié)點(diǎn)數(shù)較少纤房,計(jì)算出的 Broadcast Join 成本較低纵隔,就會(huì)選擇 Broadcast Join,否則就會(huì)選擇 Shuffle Join炮姨。
#### (七)Schedule
生成了 Plan Fragment Tree 之后捌刮,Apache Doris FE 通過(guò) Coordinator 類對(duì) Fragment 進(jìn)行分配、分發(fā)步驟舒岸,主要涉及的方法有:computeScanRangeAssignment()糊啡、computeFragmentExecParams()、sendFragment()吁津。
生成了 Plan Fragment Tree 之后棚蓄,Apache Doris FE 通過(guò) Coordinator 類對(duì) Fragment 進(jìn)行分配、分發(fā)步驟碍脏,主要涉及的方法有:computeScanRangeAssignment()梭依、computeFragmentExecParams()、sendFragment()典尾。
* computeScanRangeAssignment():主要邏輯對(duì)fragment合理分配役拴,盡可能保證每個(gè)BE節(jié)點(diǎn)的請(qǐng)求都是平均。
* computeFragmentExecParams():處理Fragment執(zhí)行參數(shù)钾埂。
* sendFragment():發(fā)送Fragment至BE節(jié)點(diǎn)河闰,
#### (八)Execute
Doris 的查詢執(zhí)行模式 Volcano 模式,不過(guò)做了 Batch 的優(yōu)化褥紫,不同的 operator 之間以 RowBatch 的方式傳輸數(shù)據(jù)姜性。
BE 的 BackendService 會(huì)接收 FE 的 查詢請(qǐng)求,讓 FragmentMgr 進(jìn)行處理髓考。 FragmentMgr::exec_plan_fragment 會(huì)啟動(dòng)一個(gè)線程由 PlanFragmentExecutor 具體執(zhí)行一個(gè) plan fragment部念。PlanFragmentExecutor 會(huì)根據(jù) plan fragment 創(chuàng)建一個(gè) ExecNode 樹,F(xiàn)E 每個(gè) PlanNode 都會(huì)對(duì)應(yīng) ExecNode 的一個(gè)子類氨菇。
PlanFragmentExecutor::get_next_internal 會(huì)驅(qū)動(dòng)整個(gè) ExecNode 樹的執(zhí)行儡炼,會(huì)自頂向下調(diào)用每個(gè) ExecNode 的 get_next 方法,最終數(shù)據(jù)會(huì)從 ScanNode 節(jié)點(diǎn)產(chǎn)生查蓉,向上層節(jié)點(diǎn)傳遞乌询,每個(gè)節(jié)點(diǎn)都會(huì)按照自己的邏輯處理 RowBatch。 PlanFragmentExecutor 在拿到每個(gè) RowBatch 后豌研,如果是中間結(jié)果妹田,就會(huì)將數(shù)據(jù)傳輸給其他 BE 節(jié)點(diǎn)竣灌,如果是最終結(jié)果,就會(huì)將數(shù)據(jù)傳輸給 FE 節(jié)點(diǎn)秆麸。
## 五初嘹、參考獻(xiàn)文
* Apache Doris Join原理
? > https://doris.apache.org/zh-CN/advanced/join-optimization/doris-join-optimization.html#doris-shuffle-%25E6%2596%25B9%25E5%25BC%258F
* Apache Doris 存儲(chǔ)層設(shè)計(jì)
? > https://doris.apache.org/zh-CN/article/articles/doris-storage-reader-compaction.html
* Apache Doris 元數(shù)據(jù)涉及
? > https://doris.apache.org/zh-CN/design/metadata-design.html#%25E5%2585%2583%25E6%2595%25B0%25E6%258D%25AE%25E7%25BB%2593%25E6%259E%2584
* Apache Doris 查詢?cè)?/p>
? > https://blog.bcmeng.com/post/apache-doris-query.html#doris-query-%25E6%2589%25A7%25E8%25A1%258C
* [Apache Doris Join原理](https://doris.apache.org/zh-CN/advanced/join-optimization/doris-join-optimization.html#doris-shuffle-%25E6%2596%25B9%25E5%25BC%258F)
* [Apache Doris 存儲(chǔ)層設(shè)計(jì)](https://doris.apache.org/zh-CN/article/articles/doris-storage-reader-compaction.html)
* [Apache Doris 元數(shù)據(jù)涉及](https://doris.apache.org/zh-CN/design/metadata-design.html#%25E5%2585%2583%25E6%2595%25B0%25E6%258D%25AE%25E7%25BB%2593%25E6%259E%2584)
* [Apache Doris 查詢?cè)韂(https://blog.bcmeng.com/post/apache-doris-query.html#doris-query-%25E6%2589%25A7%25E8%25A1%258C)
## 六、實(shí)踐分享
* [Apache Doris 在網(wǎng)易互娛的應(yīng)用實(shí)踐](https://mp.weixin.qq.com/s/3gQiN6trYmmXVsuZVXNl5Q)
* [Apache Doris 在知乎用戶畫像與實(shí)時(shí)數(shù)據(jù)的架構(gòu)與實(shí)踐](https://mp.weixin.qq.com/s/i5qbiKN6ruOk2Snpyy6DBw)
* [Apache Doris 物化視圖與索引在京東的典型應(yīng)用](https://mp.weixin.qq.com/s/3WAdi40yg7dRt2QNWcTARw)
* [Apache Doris Join 實(shí)現(xiàn)與調(diào)優(yōu)實(shí)踐](https://mp.weixin.qq.com/s/pukjERSOW-D-BM4z1G9JlA)
## 七沮趣、總結(jié)
本文主要介紹查詢 SQL 在 Apache Doris Fe 節(jié)點(diǎn)經(jīng)歷 parse屯烦、analyze、rewrite房铭、GenerateQueryPlan驻龟、schedule、send 等階段處理缸匪。Apache Doris Fe 的 parse翁狐、analyze、rewrite 階段和其他數(shù)據(jù)庫(kù)處理過(guò)程差不多凌蔬,本文主要講解的核心是 GenerateQueryPlan露懒、schedule、send 階段的原理砂心。我們可以深度了解 Apache Doris Fe 節(jié)點(diǎn)對(duì)查詢 SQL 的優(yōu)化操作懈词,以及未來(lái)遇到相關(guān)性能問(wèn)題不會(huì)無(wú)從下手。