寫在前面
本文是對CockRoachDB的設計文檔:https://github.com/cockroachdb/cockroach/blob/master/docs/RFCS/20160421_distributed_sql.md
的翻譯类咧。
內(nèi)容目錄
- Table of Contents
- Summary
- Motivation
- Detailed design
- A more complex example: Daily Promotion
- Implementation strategy
- Alternate approaches considered (and rejected)
- Unresolved questions
概述
在該設計文檔中琼讽,我們提出了一種分布式執(zhí)行SQL處理的方法番枚,本文的目的是為了發(fā)起一次分布式SQL執(zhí)行的討論碑韵,而不是一個完整的詳細設計文檔擒权。
詞匯
- KV - CockRoachDB的KV系統(tǒng)巨税,由CockRoachDB的key-value达吞、range和batch API構成
- k/v - 指一個key-value對,通常被用來引用KV系統(tǒng)中的一個entry绸罗。
- Node - 集群中的一臺服務器意推。
- Client / Client-side - SQL客戶端。
- Gateway node / Gateway-side - 集群中首先接收到客戶端SQL查詢的節(jié)點从诲。
- Leader node / Leader-side - 集群中直接進行KV操作并且針對本地KV數(shù)據(jù)進行本地訪問的節(jié)點左痢。
本文的大部分內(nèi)容都是從gateway-side的視角進行描述的,gateway-side上負責進行查詢的解析和并行執(zhí)行俊性。
動機
通過distsql我們期望帶來如下提升和改進:
- remote-side filter(源端過濾)
當通過過濾條件查詢一些數(shù)據(jù)的時候定页,我們目前都是先將特定range中的所有key對應的數(shù)據(jù)通過網(wǎng)絡全部都拿到gateway node上來,然后在gateway node上再對這些數(shù)據(jù)進行過濾杭煎。可以看出這種方式太低效了卒落,占用了大量的網(wǎng)絡資源儡毕,因此我們想要將這些過濾條件下推到源端(leaseholder)雷恃,從而極大的節(jié)省網(wǎng)絡資源倒槐,并提升數(shù)據(jù)處理效率。
在源端不需要支持所有的sql表達式-源端只需要支持常用表達式的一個子集(例如:所有的操作都可以轉換成基于string字符串的操作)谎痢,只要源端支持的這些子集能夠完成數(shù)據(jù)讀取過程中最終的過濾條件即可。
- 源端更新和刪除
對于例如: UPDATE .. WHERE
和 DELETE .. WHERE
這樣的語句,我們目前的實現(xiàn)方式是:先根據(jù)where條件查詢到相應的數(shù)據(jù)并返回到gateway上浸间,然后在gateway上執(zhí)行update或者delete操作。這種實現(xiàn)方式涉及到了太多次的網(wǎng)絡傳輸兜看,性能低下细移;因此我們希望直接在可以訪問到數(shù)據(jù)的node上直接執(zhí)行update或者delete操作雪侥,這樣就可以極大地節(jié)省網(wǎng)絡傳輸速缨,進而提升性能。
這里引谜,也不需要實現(xiàn)所有的SQL語法,但是除了實現(xiàn)一些簡單的過濾表達式之外贝室,我們還需要實現(xiàn)一些其他的語法(例如:UPDATE
中經(jīng)常使用的用于計算新值的方法和函數(shù)等)
- 分布式SQL操作
現(xiàn)在的sql操作都是在一個節(jié)點上進行處理的,所以sql操作的性能并不能通過集群規(guī)模的增大而提升峡迷。我們想要在多個node上對sql操作進行分布式的處理 (sql操作的性能就可以與集群規(guī)模正相關)。
3.1. Distributed joins
Join:多個table基于關聯(lián)列進行查詢夯辖,通常關聯(lián)列的關聯(lián)條件是“相等”圆米,然后返回相關結果榨咐。
一種分布式出列這種計算的策略就是基于hash的分布式:
選擇K個node(這K個node中的每個node,我們都稱之為:k-node)数焊,集群中可以訪問到數(shù)據(jù)的各個node,將訪問到的數(shù)據(jù)根據(jù)關聯(lián)列中的值進行hash干厚,
然后根據(jù)hash值對K取模后的值,確定將每行數(shù)據(jù)發(fā)送到哪個k-node上(這樣就可以保證關聯(lián)列的值相同的行,肯定可以發(fā)送到同一個k-node上)闲先。
Hash-join在F1中被廣泛采用。
Distributed joins 和 remote-side filtering 可以一起使用:
例如下面的SQL語句將會查詢出所有在用戶生日附近產(chǎn)生的訂單训桶。
注意:本查詢語句需要對最后的結果進行過濾。因為若只采用簡單的相等過濾條件,那么該過濾條件將會下推到源端進行過濾箱叁。
SELECT * FROM Customers c INNER JOIN Orders o ON c.ID = i.CustomerID
WHERE DayOfYear(c.birthday) - DayOfYear(o.date) < 7
3.2. Distributed aggregation
當使用`GROUP BY`時耕漱,我們根據(jù)一組列或者表達式來分組計算出每組的結果〖梭希可以采用與hash-join類似的思想進行aggregation的分布式處理,只不過計算hash值的因子變?yōu)榱朔纸M元素
- Distributed sorting
當進行排序時,我們希望能夠進行分布式排序钥屈,從而分散排序的壓力。各個node可以對自己的數(shù)據(jù)集先進行排序腻脏,然后有一個或多個節(jié)點將各個部分排序的結果進行合并。
設計細節(jié)
Overview
實現(xiàn)方法最初的靈感來自Sawzall-- Rob Pike等人在google的一個項目。Sawzall提出了一個“框架”(高級語言解釋器)來減少MapReduce的使用炕桨。它的主要創(chuàng)新是通過一個簡潔的語法來定義“本地”進程,來處理一部分本地數(shù)據(jù)并產(chǎn)生零個或多個結果(這些結果被轉換為Map邏輯);然后采用另一種語法從本地轉換中獲取結果并以不同的方式聚合它們(這被轉換為Reduce邏輯)姊途。簡而言之:MapReduce + high-level syntax + new terminology(以更簡單的方式實現(xiàn)分布式計算)立叛。
我們提出了一些與Map-Reduce類似,但與Map-Reduce的執(zhí)行模型又完全不同的概念。
- 一組預定義的聚合器秽浇,通過這些聚合器提供SQL所需的功能。大多數(shù)聚合器都是可配置的斑举,但不是完全可編程的。
- 一個特殊的聚合器,即“evaluator”励两,可以使用非常簡單的語言進行編程,但僅限于一次操作一行數(shù)據(jù)盲憎。
- 將聚合器的結果路由到查詢管道中的下一個聚合器。
- 一個允許以與數(shù)據(jù)位置無關的方式編譯SQL的邏輯模型儿惫,但它需要獲得足夠多的信息以便我們可以進行分布式計算留搔。
除了累積或聚合數(shù)據(jù)之外,聚合器還可以將其結果提供給另一個node或node集合,也有可能作為其他程序的輸入。具有批量處理結果數(shù)據(jù)和執(zhí)行KV命令等特殊功能的聚合器可用于讀取數(shù)據(jù)或對數(shù)據(jù)庫進行更新捞烟。
關鍵思想就是:是我們可以將SQL映射成邏輯模型,然后我們可以將該邏輯模型轉換為分布式執(zhí)行計劃。
Logical model 和 logical plans
我們將SQL編譯成 logical plan (從表面來看竞思,類似于當前的planNode
樹),logical plan代表著各個計算階段的抽象數(shù)據(jù)處理流。 logical plan與數(shù)據(jù)在集群中分區(qū)和分布無關;但是惦界,它包含足夠多的有關計劃計算結構的信息雾消,后續(xù)我們可以利用這些信息進行數(shù)據(jù)并行處理 - 在后續(xù)階段狂窑,logical plan將轉換為physical plan:將抽象計算和數(shù)據(jù)流映射成具體的數(shù)據(jù)處理器和它們之間的數(shù)據(jù)傳輸通道蛉幸。
logical plan由聚合器(aggregators)組成。每個聚合器消費行的input stream(或者更多用于join的流)并產(chǎn)生行的output stream匹层。每行都是含有多個列值的元組;輸入和輸出流都有一個對應的schema信息。schema是一組column和type,每行都具有各個列的基準洋只。在這里妒茬,我們強調(diào)stream是邏輯概念肛循,可能不會映射到實際計算中的單個數(shù)據(jù)流。
我們引入了grouping的概念來表征聚合器內(nèi)部發(fā)生的分組計算夹孔。這些groups是基于group key定義的只怎,group key是輸入stream schema中列的子集佑菩。對每個group進行的計算獨立于其他group中的數(shù)據(jù),并且聚合器會將所有組結果串聯(lián)后進行輸出绞幌。各個不同的group結果之間順序不是固定的 - 一些聚合器可能保證某種順序帘营,而另一些則可能不保證问顷。
更確切地說,我們可以使用函數(shù)agg
在聚合器中定義計算,該函數(shù)采用單個group(相同的group key)中的一系列輸入行并生成一組輸出行扫外。聚合器的輸出是所有組的“agg”輸出的串聯(lián)狞贱,并按某種順序排列蝎毡。
當我們稍后決定如何對聚合器中的計算進行分布式處理時别垮,你會發(fā)現(xiàn)grouping特性有用:由于每個group的結果是獨立的毁靶,因此可以在不同節(jié)點上處理不同的組龙填。我們擁有的group越多越好。當使用單group聚合器(group key是空組列 - “group key:[]”,表示所有數(shù)據(jù)都在同一組中)時,不能進行分布式處理涤姊。另一方面次酌,不存在可以任意并行化的group聚合器剂公。請注意, no-grouping aggregators與group key為所有列的aggregators是不同的鳞上。no-grouping的聚合器是一個特殊但重要的情況怠硼,在這種情況下香璃,我們不會聚合多片數(shù)據(jù)这难,但我們可能會對各片數(shù)據(jù)進行過濾,轉換或重新排序增显。
聚合器可以使用SQL表達式雁佳,輸入多個input同云,并計算出相應的值。例如堵腹,所有聚合器都可以選擇使用output filter表達式 - 一個布爾函數(shù)炸站,用于丟棄原本屬于輸出流的元素。
一種特殊類型的聚合器是evaluator聚合器疚顷,它是一個“可編程”的聚合器旱易,它按順序處理輸入流(一次一個元素),可能會產(chǎn)生輸出元素腿堤。這是一個沒有分組的聚合器(group key是完整的列集);每行都是獨立處理阀坏。例如,一個evaluator可以使用求值程序從任意表達式生成新值(如SELECT a + b FROM ..
中的a + b
);或根據(jù)謂詞過濾行笆檀。
特殊table reader聚合器忌堂,沒有輸入,這種聚合器可以作為數(shù)據(jù)源;table reader可以配置為根據(jù)需要僅輸出某些列酗洒。還有一種特殊的** final **聚合器士修,這種聚合器沒有任何輸出,通常被用來表示查詢/語句的結果樱衷。
某些聚合器(final棋嘲,limit)在要求輸入流有序(要求1列或者多列升序/降序)。一些聚合器(如table reader)可以保證其輸出流上的某種排序矩桂,稱為ordering guarantee(與當前代碼中的orderingInfo
相同)沸移。所有聚合器都有一個相關的排序特征函數(shù)ord(input_order) - > output_order
,它將input_order
(輸入流??上的排序保證)映射到output_order
(輸出流的排序保證) - 意思是如果輸入流中的行是根據(jù)input_order
排序的,那么輸出流中的行將根據(jù)output_order
進行排序雹锣。
表讀取器的排序保證以及特征函數(shù)可用于在邏輯計劃中傳播排序信息流妻。當存在不匹配(聚合器具有與保證不匹配的排序要求)時,我們插入sorting aggregator - 這是一個非分組聚合器笆制,其輸出schema與輸入流中的重新排序元素的輸入schema相同绅这,無論輸入順序如何,都一定能提供保證輸出的順序在辆。我們可以在邏輯計劃級別進行排序來執(zhí)行優(yōu)化 - 我們可以將排序聚合器放在整個pipeline的早期证薇,或者將其拆分為多個節(jié)點(其中一個節(jié)點在前一階段執(zhí)行初步排序)。
我們使用一些簡單的查詢來介紹聚合器的主要類型匆篓。
例 1
TABLE Orders (OId INT PRIMARY KEY, CId INT, Value DECIMAL, Date DATE)
SELECT CID, SUM(VALUE) FROM Orders
WHERE DATE > 2015
GROUP BY CID
ORDER BY 1 - SUM(Value)
這是聚合器和流的潛在描述:
TABLE-READER src
Table: Orders
Table schema: Oid:INT, Cid:INT, Value:DECIMAL, Date:DATE
Output filter: (Date > 2015)
Output schema: Cid:INT, Value:DECIMAL
Ordering guarantee: Oid
AGGREGATOR summer
Input schema: Cid:INT, Value:DECIMAL
Output schema: Cid:INT, ValueSum:DECIMAL
Group Key: Cid
Ordering characterization: if input ordered by Cid, output ordered by Cid
EVALUATOR sortval
Input schema: Cid:INT, ValueSum:DECIMAL
Output schema: SortVal:DECIMAL, Cid:INT, ValueSum:DECIMAL
Ordering characterization:
ValueSum -> ValueSum and -SortVal
Cid,ValueSum -> Cid,ValueSum and Cid,-SortVal
ValueSum,Cid -> ValueSum,Cid and -SortVal,Cid
SQL Expressions: E(x:INT) INT = (1 - x)
Code {
EMIT E(ValueSum), CId, ValueSum
}
AGGREGATOR final:
Input schema: SortVal:DECIMAL, Cid:INT, ValueSum:DECIMAL
Input ordering requirement: SortVal
Group Key: []
Composition: src -> summer -> sortval -> final
請注意浑度,邏輯描述不包括排序聚合器。當加入排序操作的時候鸦概,這個初步的邏輯執(zhí)行計劃將會變成一個完整的邏輯計劃箩张。我們必須在最終之前插入一個排序聚合器:
src -> summer -> sortval -> sort(OrderSum) -> final
每個箭頭都表示一個邏輯上的數(shù)據(jù)流,這就是一個完整的邏輯執(zhí)行計劃窗市。
在這個例子中先慷,我們只有一個排序列,讓我們在看另一個例子咨察。
例 2
TABLE People (Age INT, NetWorth DECIMAL, ...)
SELECT Age, Sum(NetWorth) FROM People GROUP BY AGE ORDER BY AGE
初步的邏輯計劃如下:
TABLE-READER src
Table: People
Table schema: Age:INT, NetWorth:DECIMAL
Output schema: Age:INT, NetWorth:DECIMAL
Ordering guarantee: XXX // will consider different cases later
AGGREGATOR summer
Input schema: Age:INT, NetWorth:DECIMAL
Output schema: Age:INT, NetWorthSum:DECIMAL
Group Key: Age
Ordering characterization: if input ordered by Age, output ordered by Age
AGGREGATOR final:
Input schema: Age:INT, NetWorthSum:DECIMAL
Input ordering requirement: Age
Group Key: []
Composition: src -> summer -> final
summer
聚合器可以通過兩種方式執(zhí)行聚合操作-若輸入沒有按照age排序论熙,summer
聚合器將會使用一個無序的map來存儲輸入:每個age
一個數(shù)據(jù)實體,最終會以order亂序的方式產(chǎn)生輸出摄狱;若輸入是按照age
排序的脓诡,聚合器將會按照age的順序,每次sum一個age的數(shù)據(jù)媒役,產(chǎn)生的輸出也會與輸入中age
的排序一致祝谚。
我們看看下面兩種情況:
-
輸入按照
Age
排序在這種情況下,我們發(fā)現(xiàn)
summer
聚合器會保存并傳遞age的順序酣衷,這時我們不需要添加額外的排序聚合器交惯。 -
數(shù)據(jù)沒有按照任何字段排序
在這種情況下,
summer
聚合器不會保證輸出的任何順序并且我們需要在final
聚合器之前需要自己添加一個排序聚合器鸥诽。src -> summer -> sort(Age) -> final
我們應該意識到在
summer
聚合器執(zhí)行summer操作之前商玫,可以先對age進行排序:src -> sort(Age) -> summer -> final
上述兩種邏輯執(zhí)行計劃都沒有問題,我們可以任選其一牡借。
也有可能summer
使用一個有序的map拳昌,這樣可以保證所有的輸出結果都是按照age
排序的。這樣就可以保證:無論輸入是否有序钠龙,我們都可以保證輸出是基于某個字段有序的炬藤。
排序的反向傳遞
在前面的例子中御铃,我們看到可以通過table reader流中的排序以及排序保證來避免排序。初步的邏輯計劃將盡可能地保持順序沈矿,從而盡可能的減少額外的排序上真。
但是,在某些情況下羹膳,保持順序可能需要一些額外的開銷;某些聚合器可以配置為保持順序或不保持睡互。為了避免不必要地排序保持,在排序聚合器ready之后陵像,我們重新分析logical plan并盡可能的去掉對streams的排序就珠。具體來說,我們檢查每個logical stream(以相反的拓撲順序)并檢查若刪除其排序是否仍然可以得到正確的logical plan;這就導致了排序的反向傳播醒颖。
總結一下妻怎,邏輯執(zhí)行計劃有三個階段:
- 初步的邏輯計劃,盡可能保留排序泞歉,沒有排序節(jié)點
- 滿足排序的邏輯計劃逼侦,根據(jù)需要添加排序節(jié)點
- 最終的邏輯計劃,將所有排序操作統(tǒng)一腰耙,盡可能的優(yōu)化整個排序的性能榛丢,并減少性能消耗
例 3
TABLE v (Name STRING, Age INT, Account INT)
SELECT COUNT(DISTINCT(account)) FROM v
WHERE age > 10 and age < 30
GROUP BY age HAVING MIN(Name) > 'k'
TABLE-READER src
Table: v
Table schema: Name:STRING, Age:INT, Account:INT
Filter: (Age > 10 AND Age < 30)
Output schema: Name:STRING, Age:INT, Account:INT
Ordering guarantee: Name
AGGREGATOR countdistinctmin
Input schema: Name:String, Age:INT, Account:INT
Group Key: Age
Group results: distinct count as AcctCount:INT
MIN(Name) as MinName:STRING
Output filter: (MinName > 'k')
Output schema: AcctCount:INT
Ordering characterization: if input ordered by Age, output ordered by Age
AGGREGATOR final:
Input schema: AcctCount:INT
Input ordering requirement: none
Group Key: []
Composition: src -> countdistinctmin -> final
聚合器的類型
TABLE READER
是一個特殊的聚合器,它沒有input stream. 主要用來掃描一個table或者index 以及它需要讀取的schema信息沟优。和其他的aggregator一樣,它也可以通過配置搭配上一個可編程的output filter一起使用涕滋。EVALUATOR
是一個完全可編程的非分組的聚合器.它作用于每一個單獨的行。通過該聚合器可以刪除行或者任意地修改行挠阁。JOIN
對兩個輸入流進行連接,在某些列之間具有相等約束溯饵。聚合器分組只能作用于join列上侵俗。詳細信息請查看 Stream joins.JOIN READER
從input stream中根據(jù)指定的Key值進行點查》峥可以通過執(zhí)行KV讀劝ァ(潛在的遠程讀取)或者建立遠程數(shù)據(jù)流來進行Key值的點查啄巧。詳情請查看 Join-by-lookup 和 On-the-fly flows setup.MUTATE
執(zhí)行基于KV的插入/刪除/更新寻歧。SET OPERATION
可以將任意的值設置為OPTION。-
AGGREGATOR
在SQL語義中做聚合的聚合器秩仆。它會對每一行分組并且針對于每一組進行聚合操作码泛。分組是根據(jù)group key進行的,AGGREGATOR
可以配置為通過一個或者多個聚合函數(shù)進行聚合:SUM
COUNT
COUNT DISTINCT
DISTINCT
AGGREGATOR
的輸出schema信息包括group key, 生成的一些生成列以及原有的一些列晌缘】辗保可以添加一些輸出過濾規(guī)則(當然也可以不添加)針對group key和生成的值進行過濾维蒙。 (例如:可以對最終沒有輸出的值進行過濾). SORT
對輸入的數(shù)據(jù)針對配置的列進行排序纽门。需要注意:這是一個 no-grouping 的聚合器, 因此該聚合器可以分配給任意的數(shù)據(jù)生產(chǎn)者。當然悠夯,這也就意味著它不會產(chǎn)生任何的全局排序結果癌淮,而只是針對于每個數(shù)據(jù)流的內(nèi)部進行排序÷俨梗可以通過分組處理器的輸入同步器實現(xiàn)全局排序(例如:LIMIT
或者FINAL
)乳蓄。LIMIT
是一個 single-group aggregator,在讀取到limit指定的行數(shù)之后夕膀,就會停止讀入虚倒。INTENT-COLLECTOR
是在網(wǎng)關上執(zhí)行的single-group aggregator,它接收由MUTATE
聚合器生成的所有意圖产舞,并在內(nèi)存中跟蹤它們魂奥,直到提交事務為止。FINAL
是在網(wǎng)關上執(zhí)行的single-group aggregator易猫,用于收集查詢結果耻煤。此聚合器通過pg協(xié)議連接到客戶端。
從邏輯執(zhí)行計劃到物理執(zhí)行計劃
我們基于以下事實准颓,進行聚合器和邏輯流中的計算的分布式處理:
對于任何聚合器哈蝇,只要組的所有處理都發(fā)生在單個節(jié)點上,就可以將組劃分為子集并并行處理攘已。
聚合器的排序特征適用于具有特定順序的任一輸入流;聚合器的排序特征甚至可以用在邏輯節(jié)點上的多個并行計算實例:如果所有并行實例中的物理輸入流和邏輯輸入流中的順序一樣(在邏輯計劃中)买鸽,則所有實例中的物理輸出流將具有邏輯輸出流中保證的輸出順序。如果在后續(xù)階段中將這些流合并為單個流(合并排序)贯被,則該物理流將具有和邏輯流一樣的正確順序。
具有空分組鍵的聚合器(
limit
妆艘,final
)必須在單個節(jié)點上進行最終處理(但它們可以具有初始的分布式處理階段)彤灶。
因此,每個邏輯聚合器可以對應于多個分布式實例批旺,并且每個邏輯流可以對應于多個具有順序保證物理流幌陕。
我們可以基于一些簡單的規(guī)則進行分布式處理:
可以有多個實例從而進行并行處理,根據(jù)range分割;每個table reader實例由相關range的raft leader處理汽煮,并且該實例是 physical stream的開始搏熄。
streams繼續(xù)在程序中并行處理棚唆。當streams到達聚合器時,可以基于group key的值進行hash 散列將streams重新分布到到任意數(shù)量的node上心例。具有空group key的聚合器將只有一個實例(node)宵凌,并且根據(jù)期望的順序合并上游過來的多個輸入流。如上所述止后,每個物理流其實早已被排好序了(因為它們都對應于有序的邏輯流)瞎惫。
排序聚合器作用于每個physical stream(與logical stream的排序一致)。排序聚合器不會導致將結果合并到單個節(jié)點中译株。
需要注意:按照range的邊界進行分布式處理并不是為了保證結果正確的必要條件 - 即使在我們進行query plan的時候瓜喇,出現(xiàn)了range的分裂或者移動,也不會產(chǎn)生錯誤的結果歉糜。對于一些key的讀取可能較慢乘寒,這主要是因為對他們是遠程讀取的,但只要大多數(shù)時間匪补,大多數(shù)key都在本地讀取伞辛,性能都不是問題。
Assume that we run the Example 1 query on a Gateway node and the table has data that on two nodes A and B (i.e. these two nodes are masters for all the relevant range). The logical plan is:
假設我們在** Gateway **節(jié)點上運行示例1中的查詢叉袍,并且該表的數(shù)據(jù)分布在兩個節(jié)點:A和B上(即始锚,這兩個節(jié)點是所有相關range的主節(jié)點)。那么邏輯執(zhí)行計劃應該是:
原始查詢語句及表結構:
TABLE Orders (OId INT PRIMARY KEY, CId INT, Value DECIMAL, Date DATE)
SELECT CID, SUM(VALUE) FROM Orders
WHERE DATE > 2015
GROUP BY CID
ORDER BY 1 - SUM(Value)
邏輯執(zhí)行計劃:
TABLE-READER src
Table: Orders
Table schema: Oid:INT, Cid:INT, Value:DECIMAL, Date:DATE
Output filter: (Date > 2015)
Output schema: Cid:INT, Value:DECIMAL
Ordering guarantee: Oid
AGGREGATOR summer
Input schema: Cid:INT, Value:DECIMAL
Output schema: Cid:INT, ValueSum:DECIMAL
Group Key: Cid
Ordering characterization: if input ordered by Cid, output ordered by Cid
EVALUATOR sortval
Input schema: Cid:INT, ValueSum:DECIMAL
Output schema: SortVal:DECIMAL, Cid:INT, ValueSum:DECIMAL
Ordering characterization: if input ordered by [Cid,]ValueSum[,Cid], output ordered by [Cid,]-ValueSum[,Cid]
SQL Expressions: E(x:INT) INT = (1 - x)
Code {
EMIT E(ValueSum), CId, ValueSum
}
上面的邏輯計劃可以實例化為以下物理計劃:
physical plan中的每個方格都是一個 processor:
-
src
是一個table reader喳逛,執(zhí)行KV Get操作并形成行;它被用于讀取屬于相應節(jié)點的上的所有數(shù)據(jù)瞧捌。它在輸出行之前會首先通過Date> 2015
過濾器對結果進行過濾。 -
summer-stage1
is the first stage of thesummer
aggregator; its purpose is to do the aggregation it can do locally and distribute the partial results to thesummer-stage2
processes, such that all values for a certain group key (CId
) reach the same process (by hashingCId
to one of two "buckets").
summer-stage1
是summer
聚合器的第一個階段;它的作用是進行本地聚合并將部分結果分發(fā)到summer-stage2
進程润文,這樣某個group key(CId
)的所有值都會被分發(fā)到同一個目的端(summer-stage2的特定的process)(通過哈希CId
到兩個“桶”中的一個)姐呐。 -
summer-stage2
執(zhí)行實際的sum操作并輸出index (CId
) 和相應的sum值:ValueSum
。 -
sortval
計算出額外的列:SortVal
的值典蝌,以及CId
和ValueSum
曙砂。 -
sort
基于SortVal
的值對stream中的數(shù)據(jù)進行排序。 -
final
合并前面的兩個輸入數(shù)據(jù)流以產(chǎn)生最終的排序結果骏掀。
注意:summer
聚合器的第二階段不需要在相同的節(jié)點上運行;例如鸠澈,可以采用另外一種物理執(zhí)行計劃:可以在單個節(jié)點上執(zhí)行summer-stage2
processor:
所有的processor總是形成有向無環(huán)圖。
Processors
Processor 通常由三個部分組成:
- input synchronizer: 將所有的input streams 合并成單獨的一個data stream截驮。主要如以下幾種 input synchronizer:
* single-input (pass-through)
* unsynchronized:從所有輸入流傳遞行笑陈,任意交錯,不保證順序葵袭,也不做排序涵妥。
* ordered: input physical stream本身就已經(jīng)排好序了(與physical stream的順序一致);synchronizer小心交錯讀取流中的數(shù)據(jù),以便merged stream具有相同的順序(該過程實際上就是一個K路歸并排序)坡锡。
data processor :數(shù)據(jù)轉換或聚合邏輯的核心實現(xiàn)部分(在某些情況下執(zhí)行KV操作)蓬网。
-
output router : 將processor的輸出分成多個流;主要有以下幾種output router:
- single-output (pass-through)
- mirror: 將每一行都輸出到所有的output stream中
- hashing: 每行輸出到output stream窒所,根據(jù)應用于數(shù)據(jù)元組的某些元素的散列函數(shù)進行選擇目標output stream。
- by range: router配置了range信息(與特定的某個表有關)帆锋,并且能夠將行發(fā)送到相應的range的leaseholder節(jié)點(對于
JoinReader
節(jié)點很有用(將索引值發(fā)送到負責PK的節(jié)點)和INSERT
(將新行添加到它們的leaseholder上))吵取。
Joins
Join-by-lookup
join-by-lookup方法涉及從一個表接收數(shù)據(jù)并從另一個表中查找相應的行。它通常用于將索引與表連接窟坐,但它們可以在適當?shù)那闆r下可用于任何連接海渊,例如,將一個表中的少量行與另一個表的主鍵連接哲鸳。我們引入了一個TABLE-READER
的變體臣疑,它有一個輸入流。針對輸入流的每個元素都在另一個表或索引中進行點查徙菠,并在輸出中產(chǎn)生相應的值讯沈。在內(nèi)部,聚合器分批執(zhí)行查找婿奔,例如:
TABLE t (k INT PRIMARY KEY, u INT, v INT, INDEX(u))
SELECT k, u, v FROM t WHERE u >= 1 AND u <= 5
Logical plan:
TABLE-READER indexsrc
Table: t@u, span /1-/6
Output schema: k:INT, u:INT
Output ordering: u
JOIN-READER pksrc
Table: t
Input schema: k:INT, u:INT
Output schema: k:INT, u:INT, v:INT
Ordering characterization: preserves any ordering on k/u
AGGREGATOR final
Input schema: k:INT, u:INT, v:INT
indexsrc -> pksrc -> final
也可以用在join查詢中:
TABLE t1 (k INT PRIMARY KEY, v INT, INDEX(v))
TABLE t2 (k INT PRIMARY KEY, w INT)
SELECT t1.k, t1.v, t2.w FROM t1 INNER JOIN t2 ON t1.k = t2.k WHERE t1.v >= 1 AND t1.v <= 5
Logical plan:
TABLE-READER t1src
Table: t1@v, span /1-/6
Output schema: k:INT, v:INT
Output ordering: v
JOIN-READER t2src
Table: t2
Input schema: k:INT, v:INT
Output schema: k:INT, v:INT, w:INT
Ordering characterization: preserves any ordering on k
AGGREGATOR final
Input schema: k:INT, u:INT, v:INT
t1src -> t2src -> final
注意缺狠,JOIN-READER
具有將輸入列直接轉換為輸出列的能力(在本示例中“v”就屬于這種情況)。在基于索引join的情況下萍摊,它僅用于跳過讀取或解碼“v”的值;但在一般情況下挤茄,有必要從第一個表中傳遞列。
在“JOIN-READER”的物理實現(xiàn)方面冰木,有兩種實現(xiàn)方式:
1.它可以在接收物理輸入流的節(jié)點上執(zhí)行KV查詢(分批);并在同一節(jié)點上繼續(xù)輸出穷劈。
這種方式這很簡單,但會涉及到在node和range leaseholder之間的多于的網(wǎng)絡開銷踊沸。我們可能會有限采用此策略進行實現(xiàn)歇终。
- 可以通過基于range的router將每個輸入路由到其對應的range的leaseholder節(jié)點上的
JOIN-READER
實例;然后在range的leaseholder節(jié)點上繼續(xù)后續(xù)的數(shù)據(jù)處理。
這種做法避免了過多的網(wǎng)絡往返開銷逼龟,但是存在其他的問題评凝,因為我們可能在太多節(jié)點上創(chuàng)建這種處理流(對于大型表,群集中的許多/所有節(jié)點都會存在range)腺律。為了有效地實現(xiàn)上述特性奕短,只有當我們實際找到需要經(jīng)過某個特定流的行時,我們才會“l(fā)azy”的(根據(jù)需要)創(chuàng)建流匀钧。當t1
和t2
的順序相關時(例如篡诽,t1可以按日期排序,t2
可以由隱式主鍵排序)榴捡,這種策略特別有用。
即使進行了這種優(yōu)化朱浴,如果我們只進行少量的一些查詢吊圾,也會涉及到很多remote node达椰,這就存在極大地浪費。因此我們可以研究一種混合方法项乒,根據(jù)處理數(shù)據(jù)的大小以及這些數(shù)據(jù)跨越多少range和node啰劲,在兩種策略之間進行選擇。
Stream joins
join聚合器在兩個流上執(zhí)行join檀何,在join列之間存在的是相等約束蝇裤。聚合器基于join key進行分組。
TABLE People (First STRING, Last STRING, Age INT)
TABLE Applications (College STRING PRIMARY KEY, First STRING, Last STRING)
SELECT College, Last, First, Age FROM People INNER JOIN Applications ON First, Last
TABLE-READER src1
Table: People
Output Schema: First:STRING, Last:STRING, Age:INT
Output Ordering: none
TABLE_READER src2
Table: Applications
Output Schema: College:STRING, First:STRING, Last:STRING
Output Ordering: none
JOIN AGGREGATOR join
Input schemas:
1: First:STRING, Last:STRING, Age:INT
2: College:STRING, First:STRING, Last:STRING
Output schema: First:STRING, Last:STRING, Age:INT, College:STRING
Group key: (1.First, 1.Last) = (2.First, 2.Last) // we need to get the group key from either stream
Order characterization: no order preserved // could also preserve the order of one of the streams
AGGREGATOR final
Ordering requirement: none
Input schema: First:STRING, Last:STRING, Age:INT, College:STRING
stream join aggregators的物理實現(xiàn)的核心是 join processor 频鉴。 join processor 通常將來自一個流的所有行放入哈希映射中栓辜,然后處理另一個流。如果兩個流都按group column 排序垛孔,則它可以消耗很少內(nèi)存的前提下就可以執(zhí)行merge-join藕甩。執(zhí)行需要較少內(nèi)存的合并連接。
即使采用相同的join processor 實現(xiàn)周荐,我們也可以根據(jù)我們創(chuàng)建physical streams和routers的方式不同而采用不同的分布式處理策略:
-
router可以根據(jù)group key元素的散列值將每一行分配給多個join processor之一;這可確保group key的值相同的所有元素到達同一join processor狭莱,從而實現(xiàn)hash-join。一個示例物理計劃如下:
-
router可以復制來自一個表的物理流的所有行概作,并將副本分發(fā)給所有processor實例;另一個表的流在各自的節(jié)點上處理腋妙。當我們使用小表join大表時,此策略非常有用讯榕,并且對于子查詢尤其有用骤素,例如:
SELECT ... WHERE ... AND x IN(SELECT ...)
。
上面的查詢瘩扼,若src2中的數(shù)據(jù)比較少谆甜,那么這樣的物理執(zhí)行計劃的效率會更高:
這種物理執(zhí)行計劃的不同之處在于第一個表的數(shù)據(jù)不會通過網(wǎng)絡傳輸?shù)狡渌?jié)點,而src2table reader之后的router會將src2的結果向每個src1所在的節(jié)點都廣播一份(而不是在前一種物理執(zhí)行計劃下通過hash-join將兩個表的所有數(shù)據(jù)全部進行hash分布)集绰。
Inter-stream ordering
這是一個優(yōu)化功能特性规辱,不會改變邏輯或物理計劃的結構。也不是初始執(zhí)行計劃的一部分栽燕,但是會在后續(xù)的執(zhí)行計劃優(yōu)化中用到該功能特性罕袋。
示例如下:
TABLE t (k INT PRIMARY KEY, v INT)
SELECT k, v FROM t WHERE k + v > 10 ORDER BY k
最簡單的執(zhí)行計劃如下:
READER src
Table: t
Output filter: (k + v > 10)
Output schema: k:INT, v:INT
Ordering guarantee: k
AGGREGATOR final:
Input schema: k:INT, v:INT
Input ordering requirement: k
Group Key: []
Composition: src -> final
現(xiàn)在假設該表跨越兩個不同節(jié)點上的兩個range - 一個range的key范圍是:“k <= 10”摊册,一個range的key范圍是:“k> 10”立镶。在物理計劃中,將存在兩個table reader產(chǎn)生的兩個stream;在“final”stage之前究流,兩個stream將合并為單個stream蔼啦。但是我們知道榆纽,在合并之前,其實在兩個stream中,每個stream中的元素是已經(jīng)排好序的了- 這種情況我們稱之為:**inter-stream ordering**奈籽。這樣我們在合并時(在`final`之前)會更加高效:我們只需要簡單地先讀取第一個流中的所有元素饥侵,然后再讀取第二個流中的所有元素。在第一個流被消耗完畢之前不需要調(diào)度第二個流的讀取器和其他處理器衣屏。特別是躏升,當我們使用`ORDER BY`和`LIMIT`進行查詢時:可以使用具有單個group的aggregator來表示limit,通過該aggregator合并所有的physical stream;通過`inter-physical-stream`我們就可以僅僅通過從一個range中讀取數(shù)據(jù)狼忱,就可以實現(xiàn)limit功能膨疏。
我們在logical plan中加入了 inter-physical-stream ordering的概念,將其作為logical stream的屬性(即使它指的是與該logical stream相關聯(lián)的多個physical stream)钻弄。我們使用inter-stream ordering characterization function來注釋所有聚合器(類似于上面描述的"intra-stream"排序特征)佃却。 inter-stream排序函數(shù)將輸入流的排序映射到輸出流的排序,其含義是:若輸入流中的數(shù)據(jù)本來就已經(jīng)有序了斧蜕,經(jīng)過了aggregator的處理后双霍,其輸出流中元素的順序會和輸入流中的順序一致。
如果logical stream具有適當?shù)南嚓P聯(lián)的inter-stream ordering批销,則可以通過順序讀取流來實現(xiàn)物理流的合并洒闸。
執(zhí)行架構
一旦生成了物理計劃,系統(tǒng)就需要將其拆分并分布到各個node之間進行運行均芽。每個node負責本地調(diào)度data processors 和 input synchronizers丘逸。node還需要能夠彼此通信以將輸出output router連接到input synchronizer。特別是掀宋,需要一個streaming interface來連接這些組件深纲。為了避免額外的同步成本,需要足夠靈活的執(zhí)行環(huán)境以滿足上面的所有這些操作劲妙,以便不同的node除了執(zhí)行計劃初始的調(diào)度之外湃鹊,可以相對獨立的啟動相應的數(shù)據(jù)處理工作,而不會受到gateway節(jié)點的其他編排影響镣奋。
創(chuàng)建一個local plan: ScheduleFlows
RPC
當開始對執(zhí)行計劃開始進行分布式執(zhí)行的時候币呵,首先由gateway向每個節(jié)點發(fā)送request,由每個node執(zhí)行其負責的部分計劃侨颈,并要求node調(diào)度它負責的sub-plan(詳情可以參考下文的:模數(shù)“on-the-fly”flows)余赢。一個node可能負責整個DAG的多個不同部分。我們將DAG中的每個部分稱為*flow*哈垢。一個flow由其中的physical plan節(jié)點序列妻柒,它們之間的連接(input synchronizers, output routers)加上physical plan中起始節(jié)點的輸入流的標識符和(可能是多個)結束節(jié)點的輸出流組成。集群中的一個node可能同時負責多個不同的flow耘分。更常見的情況是举塔,當集群中的一個node是一個query中涉及到的多個range的leaseholder的時候绑警,它將負責一組相同類型的flow,每個range對一個一個同類的flow啤贩,全部以“TableReader”處理器開始待秃。在開始時,我們將所有這些`TableReader'合并為一個痹屹,由這一個TableReader讀取所有位于該node上的ranges,但是這也就意味著我們將不會進行流間排序(因為我們已將所有內(nèi)容都轉換為單個流)枉氮。稍后我們可能會在每個range內(nèi)使用一個“TableReader”志衍,以便我們可以進行并行讀取,從而加快讀取速度聊替。
因此楼肪,集群中的node實現(xiàn)了一個ScheduleFlows
RPC,它接受一組flow惹悄,設置輸入和輸出相關的信息(見下文)春叫,創(chuàng)建本地processor并開始執(zhí)行。在node對輸入和輸出數(shù)據(jù)進行處理的時候泣港,我們需要對flow進行一些控制暂殖,通過這種控制,我們可以拒絕request中的某些請求当纱。
flow的本地調(diào)度
在本地節(jié)點調(diào)度不同processor的最簡單方法就是并行執(zhí)行:每個processor呛每,synchronizer和router都可以作為goroutine運行,它們之間由channel互聯(lián)坡氯。這些channel可以緩沖信道以使生產(chǎn)者和消費者同步晨横。
Mailboxes
不同節(jié)點上的flow通過GRPC stream相互通信。為了允許生產(chǎn)者和消費者可以在不同時間啟動箫柳,ScheduleFlows
為所有輸入和輸出流都創(chuàng)建了命名mailboxes手形。這些命名mailboxes實際上有一個內(nèi)部隊列用于臨時保留一定數(shù)量的數(shù)據(jù),直到建立GRPC流來傳輸它們悯恍。一旦建立起GRPC流库糠,GRPC flow control 將會控制同步生產(chǎn)者和消費者的數(shù)據(jù)同步傳輸。消費者根據(jù)mailbox id(與已傳遞給ScheduleFlows
的流中已使用的相同)使用StreamMailbox
RPC建立GRPC流坪稽。mailboxes是動態(tài)創(chuàng)建的曼玩,一旦創(chuàng)建之后希望很快就會出現(xiàn)ScheduleFlows
。如果在超時時間內(nèi)沒有創(chuàng)建ScheduleFlows
窒百,則mailbox將停用黍判。郵箱代表了本地處理器的channel接口。如果我們想要每個節(jié)點的多個TableReader/flow合并成一個篙梢,那么我們也要將這些flow的輸出也合并成一個顷帖。(如果一個節(jié)點有10個range,我們可以通過合并來減少所需要的mailbox/stream的個數(shù))。此時贬墩,我們可能希望將進入同一個mailbox的不同stream打上不同的tag榴嗅,以便消費者仍可以進行流間排序。
使用mailbox執(zhí)行的簡單查詢的執(zhí)行圖:
創(chuàng)建即時flows
在一些情況下陶舞,我們不希望從一開始就將所有的flow創(chuàng)建起來嗽测。 PointLookup
和Mutate
操作通常從幾個range開始,然后將數(shù)據(jù)發(fā)送到任意node肿孵。要發(fā)送到每個節(jié)點的數(shù)據(jù)量通常非常羞胫唷(例如,PointLookup
可能會在表A上執(zhí)行少量查找停做,因此我們不希望在所有節(jié)點上都為這些查找設置receiver晤愧,來接受這些少量的數(shù)據(jù)。相反蛉腌,物理執(zhí)行計劃將只包含一個processor官份,使PointLookup
聚合器成為只有一個stage;此node可以選擇是否針對該查找直接執(zhí)行KV操作(對于查找次數(shù)較少的range)或者使用ScheduleFlows
RPC動態(tài)設置遠程流量來查找大量的查找range。在這種情況下烙丛,最好的解決方案就是將計算發(fā)送到數(shù)據(jù)所在的node舅巷,因此傳遞給ScheduleFlows
的flow將是聚合器下游的物理節(jié)點上的計算邏輯,包括過濾和聚合蜀变。一旦processor發(fā)現(xiàn)它在同一range內(nèi)需要批量進行大量的點查悄谐,它將會進行下游計算邏輯的發(fā)送。
廢棄 flows
在很多情況下Processor和mailbox都需要銷毀:
1.processor在其所有輸入流上接收到一個哨兵標記并將哨兵標記以及其之前的所有數(shù)據(jù)都已經(jīng)輸出完畢后库北,會進行銷毀爬舰。
2.一旦processor的輸入或輸出流關閉,processor就會退出寒瓦。消費者可以使用它來告知其生產(chǎn)者它已獲得所需的所有數(shù)據(jù)情屹。
3.input mailbox將會在將哨兵標記發(fā)送出去或遠程關閉GRPC流后退出。
4.output mailbox一旦將哨兵標記傳遞給reader就會退出杂腰,一旦所有輸入通道都關閉垃你,它就會退出(請記住,output mailbox可能會接收來自多個channel的輸入喂很,每個同類的flow一個channel)如果其GRPC流遠程關閉惜颇,它也會銷毀。
5.TableReader
一旦它在其range內(nèi)傳遞了最后一個元組(+一個哨兵標記)就銷毀少辣。
錯誤處理
最初凌摄,執(zhí)行計劃是沒有錯誤恢復的(執(zhí)行期間出現(xiàn)任何問題,查詢失敗并且事務被回滾)漓帅。唯一的問題是釋放執(zhí)行計劃的各個處理節(jié)點采用的所有資源锨亏〕赵梗可以通過在任意的GRPC流突然關閉時發(fā)送一個error signal來完成資源釋放。類似地器予,可以通過在“FINAL”processor中關閉其輸入通道來取消正在運行的查詢浪藻。此關閉將向后傳播到所有查詢執(zhí)行計劃的節(jié)點。
一個更加復雜的例子: Daily Promotion
我們嘗試一個更復雜的查詢乾翔。查詢的目的是幫助推廣每日發(fā)布的廣告,定位去年花費超過1000美元的客戶反浓,'DailyPromotion`中存儲了每個客戶以及他們最近的訂單。
TABLE DailyPromotion (
Email TEXT,
Name TEXT,
OrderCount INT
)
TABLE Customers (
CustomerID INT PRIMARY KEY,
Email TEXT,
Name TEXT
)
TABLE Orders (
CustomerID INT,
Date DATETIME,
Value INT,
PRIMARY KEY (CustomerID, Date),
INDEX date (Date)
)
INSERT INTO DailyPromotion
(SELECT c.Email, c.Name, os.OrderCount FROM
Customers AS c
INNER JOIN
(SELECT CustomerID, COUNT(*) as OrderCount FROM Orders
WHERE Date >= '2015-01-01'
GROUP BY CustomerID HAVING SUM(Value) >= 1000) AS os
ON c.CustomerID = os.CustomerID)
Logical plan:
TABLE-READER orders-by-date
Table: Orders@OrderByDate /2015-01-01 -
Input schema: Date: Datetime, OrderID: INT
Output schema: Cid:INT, Value:DECIMAL
Output filter: None (the filter has been turned into a scan range)
Intra-stream ordering characterization: Date
Inter-stream ordering characterization: Date
JOIN-READER orders
Table: Orders
Input schema: Oid:INT, Date:DATETIME
Output filter: None
Output schema: Cid:INT, Date:DATETIME, Value:INT
// TODO: The ordering characterizations aren't necessary in this example
// and we might get better performance if we remove it and let the aggregator
// emit results out of order. Update after the section on backpropagation of
// ordering requirements.
Intra-stream ordering characterization: same as input
Inter-stream ordering characterization: Oid
AGGREGATOR count-and-sum
Input schema: CustomerID:INT, Value:INT
Aggregation: SUM(Value) as sumval:INT
COUNT(*) as OrderCount:INT
Group key: CustomerID
Output schema: CustomerID:INT, OrderCount:INT
Output filter: sumval >= 1000
Intra-stream ordering characterization: None
Inter-stream ordering characterization: None
JOIN-READER customers
Table: Customers
Input schema: CustomerID:INT, OrderCount: INT
Output schema: e-mail: TEXT, Name: TEXT, OrderCount: INT
Output filter: None
// TODO: The ordering characterizations aren't necessary in this example
// and we might get better performance if we remove it and let the aggregator
// emit results out of order. Update after the section on backpropagation of
// ordering requirements.
Intra-stream ordering characterization: same as input
Inter-stream ordering characterization: same as input
INSERT inserter
Table: DailyPromotion
Input schema: email: TEXT, name: TEXT, OrderCount: INT
Table schema: email: TEXT, name: TEXT, OrderCount: INT
INTENT-COLLECTOR intent-collector
Group key: []
Input schema: k: TEXT, v: TEXT
AGGREGATOR final:
Input schema: rows-inserted:INT
Aggregation: SUM(rows-inserted) as rows-inserted:INT
Group Key: []
Composition:
order-by-date -> orders -> count-and-sum -> customers -> inserter -> intent-collector
\-> final (sum)
可能的物理執(zhí)行計劃如下:
實現(xiàn)策略
實現(xiàn)的時候懈玻,有兩個設計原則:
- Milestone M1: 將過濾盡量下發(fā)到源端
- Milestone M2: 將更新下發(fā)到源端
邏輯執(zhí)行計劃
基于查詢語句構建邏輯執(zhí)行計劃會涉及到很多方面:
- 索引選擇
- 查詢優(yōu)化
- 選擇各種排序涂乌,聚合策略
- 選擇各種join策略
基于語句構建查詢執(zhí)行計劃涉及到的領域很多艺栈,我們可以從基于現(xiàn)有代碼的基本實現(xiàn)開始,隨著時間的推移湾盒,不斷改進湿右。
物理執(zhí)行計劃
物理執(zhí)行計劃階段的許多決策都是“強制”的 - table reader根據(jù)range的分布情況進行分布式分發(fā),并且大部分物理執(zhí)行計劃都遵循這一點罚勾。
物理執(zhí)行計劃涉及到的困難的決策是在對aggregation 或者join操作的第二階段的分布式處理的時候-我們可以在任意nodes上設置任意數(shù)量的“buckets”(以及后續(xù)的flow)毅人。例如。 summer
的例子尖殃。幸運的是丈莺,我們可以從一個簡單的策略開始 - 使用盡可能多的桶作為輸入流并在相同的節(jié)點之間分配它們。此策略可以根據(jù)查詢的數(shù)據(jù)量的大小很好地進行擴展:如果查詢從單個節(jié)點中提取數(shù)據(jù)送丰,我們將在該節(jié)點上進行所有聚合;如果查詢從許多節(jié)點中提取數(shù)據(jù)缔俄,我們將在這些節(jié)點之間進行分布式聚合。
我們還將支持通過配置的方式以最小化分布式處理 - 盡可能快地在單個網(wǎng)關節(jié)點上獲取所有內(nèi)容器躏。這也是一個保守的做法俐载,可以避免在太多節(jié)點之間進行分布式查詢出現(xiàn)的一些問題。
“階段2”將一直檢測何時計算(和后續(xù)階段)可能足夠快以及消耗的資源足夠小從而以不需要進行分布式處理登失,而自動切換為在gateway節(jié)點上直接執(zhí)行聚合遏佣。可以在以后研究進一步的改進(基于統(tǒng)計)壁畸。
我們應該添加擴展的SQL語法贼急,以允許查詢發(fā)起者控制其中一些參數(shù)太抓。
Processor 架構及實現(xiàn)
本節(jié)會詳細說明processor的架構碴倾。我們可以從table reader開始(足夠實現(xiàn)M1)跌榔。
Joins
正交工作流是為了支持join(最初是非分布式的)。這會涉及構建processor项炼,該processor將成為hash join實現(xiàn)的核心,并將該代碼與當前的“planNode”樹集成锭部。
調(diào)度
有效排隊和processor調(diào)度的問題將不斷改善拌禾。但我們可以從一個基于簡單策略的基本實現(xiàn)開始:
- 只會在事務中才會進行隊列排序;我們在單個processor內(nèi)是不進行排序的
任何時候都要限制事務的個數(shù)或者運行的processor的總數(shù) - txn排隊的排序是基于txn時間戳及其優(yōu)先級的闻蛀,允許節(jié)點自動就事務的相對排序達成一致循榆,消除死鎖情況(死鎖的示例:txn A有一些processor在節(jié)點1上運行并等待,在節(jié)點2上被txnB使用的processor盗尸;并且txn B也有一些處理器在節(jié)點2上運行泼各,同時等待在節(jié)點1上運行的被txn A使用的processor)
KV 整合
我們不建議引入任何新的KV Get / Put API逆巍。當前的這些API已經(jīng)夠用了锐极。當在lease holder上運行查詢的時候,查詢的速度將會和在本地運行的速度一樣快翎迁。
但是,我們還需要對KV層進行一些其他的集成:
-
查找range信息
在物理執(zhí)行計劃階段肃拜,我們需要將關鍵span拆分為range,并確定每個range的lease holder柿菩。我們還可以在邏輯執(zhí)行階段使用range信息來幫助評估table大小(用于索引選擇凉泄,join順序等)。 KV層已經(jīng)有一個range cache來維護這些信息颅拦,但我們需要根據(jù)我們維護的cache中的信息量以及更新/過期cache信息的方式來更加主動的對cache中的信息進行維護右锨。
-
分布式讀
KV層幾乎不需要更改就可以支持分布式讀取
分布式寫
事務協(xié)調(diào)器會跟蹤所有修改的key或者修改的range悄窃。sql分發(fā)層會將修改后的key的信息匯集回gateway節(jié)點(充當事務協(xié)調(diào)器)。我們需要做些集成的工作鸦致,以便我們將此信息傳遞給KV層分唾。最終我們需要一個可以用于range的流式讀取接口。
實現(xiàn)說明
可視化/追蹤
必須提供有關邏輯和物理計劃的詳細信息折砸,以及查詢所有階段的詳細跟蹤,包括執(zhí)行時間摔寨,統(tǒng)計信息等删顶。
一些簡單查詢的數(shù)據(jù)流向說明
一個簡單查詢(基于過濾的查詢或者更新)的數(shù)據(jù)流:
Node A | Node B | Node C | Node D |
---|---|---|---|
Receives statement | |||
Finds that the table data spans three ranges on B, C, D | |||
Sends scan requests to B, C, D | |||
Starts scan (w/filtering, updates) | Starts scan (w/filtering, updates) | Starts scan (w/filtering, updates) | |
Sends results back to A | Sends results back to A | Sends results back to A | |
Aggregates and returns results. |
hash join的數(shù)據(jù)流:
Node A | Node B | Node C | Node D |
---|---|---|---|
Receives statement | |||
Finds that the table data spans three ranges on B, C, D | |||
Sets up 3 join buckets on B, C, D | |||
Expects join data for bucket 0 | Expects join data for bucket 1 | Expects join data for bucket 2 | |
Sends scan requests to B, C, D | |||
Starts scan (w/ filtering). Results are sent to the three buckets in batches | Starts scan (w/ filtering) Results are sent to the three buckets in batches | Starts scan (w/ filtering). Results are sent to the three buckets in batches | |
Tells A scan is finished | Tells A scan is finished | Tells A scan is finished | |
Sends finalize requests to the buckets | |||
Sends bucket data to A | Sends bucket data to A | Sends bucket data to A | |
Returns results |
復雜性
我們需要為SQL-to-SQL構建新的基礎結構和API录粱。 API需要支持SQL表達式,可以是SQL字符串(需要每個節(jié)點重新解析表達式)输虱,也可以是更有效的AST序列化愁茁。
API還需要包含有關請求應限制到哪些key range的信息。由于表可以跨越許多raft ranges促煮,因此該信息可包括大量不相交的key range。
在未來绳匀,我們可能會實現(xiàn)像EPaxos這樣的共識算法,它允許直接在副本上直接操作是尔,這樣就為我們的分布式處理提供了更多的選擇嗜历。
最后,API的設計必須要考慮到數(shù)據(jù)處理傻粘,網(wǎng)絡傳輸和存儲操作之間的并行- 應該可以在結果全部可用之前對結果進行流式傳輸(F1通過取消流結果的排序而加快了獲取結果的速度)弦悉。
Spark: 將SQL編譯成一個運行于分布式執(zhí)行環(huán)境之上的數(shù)據(jù)并行處理語言
這里我們介紹一個新系統(tǒng)- 一個分布式計算的執(zhí)行環(huán)境。計算采用類似于M/R的編程模型劈猪,或者是更流水線化的模型 - Spark, 或者 Google's Dataflow (Dataflow的一部分作為apache的項目战得,可以運行在其他執(zhí)行環(huán)境(例如:spark)之上)。
在這些模型中贬媒,您可以考慮使用可以并行操作的數(shù)據(jù)結構聋亡,如:數(shù)據(jù)和map。對于這些數(shù)據(jù)結構的存儲是分布式的掖蛤。您所做的就是對這些數(shù)組或map進行操作 - 對它們進行排序杀捻,按key對它們進行分組,轉換和過濾蚓庭。您還可以對多個數(shù)據(jù)集進行操作以進行join操作。
這些模型試圖擁有 * a)執(zhí)行符號執(zhí)行的智能編譯器请契,例如融合盡可能多的操作 - “map(f氯夷,map(g,dataset))== map(f●g,dataset)`和 * b)動態(tài)運行拟杉。運行時會一遍根據(jù)部分輸入的計算結果動態(tài)的決定后續(xù)的計算流程泣洞。
我們的想法是將SQL編譯成這種語言吃度,考慮到我們以一個大的有序map作為數(shù)據(jù)集,然后在該數(shù)據(jù)集上運行查詢和計算。如果執(zhí)行環(huán)境良好苞也,它會利用數(shù)據(jù)拓撲。這與“分布式sql”不同,因為* a)執(zhí)行環(huán)境是動態(tài)的猴抹,所以你不需要預先提出一個執(zhí)行計劃,說明哪個節(jié)點會向其他節(jié)點發(fā)出什么命令和 b )*數(shù)據(jù)可以從一個節(jié)點推送到另一個節(jié)點汁政,而不僅僅是拉動懊渡。
我們可以從小的開始 - 不執(zhí)行分布式運行辫继,只需過濾“SELECTS”并過濾“UPDATE示血,DELETE告喊,INSERT FROM SELECT”纳寂。