本文就兩個問題進(jìn)行討論:1. 相比于Shark,為什么像Hive之類的傳統(tǒng)MapReduce框架比較慢? 2. 對于細(xì)粒度的任務(wù)模型(fine-grained task model)片拍,究竟有些什么優(yōu)勢?
background
本文翻譯自Shark: SQL and Rich Analytics at Scale的論文第七章節(jié)流译,從理論上討論了相比于Hive,Shark的優(yōu)勢在哪里黔州,原文可見http://www.eecs.berkeley.edu/Pubs/TechRpts/2012/EECS-2012-214.pdf.
為什么之前的MapReduce系統(tǒng)比較慢
常理上有幾個理由使得MapReduce框架慢于MPP數(shù)據(jù)庫:
容錯所引入的昂貴數(shù)據(jù)實(shí)體化(data materialization)開銷耍鬓。
孱弱的數(shù)據(jù)布局(data layout),比如缺少索引流妻。執(zhí)行策略的開銷[1 2]界斜。
而我們對于Hive的實(shí)驗(yàn)也進(jìn)一步證明了上述的理由,但是通過對Hive“工程上”的改進(jìn)合冀,如改變存儲引擎(內(nèi)存存儲引擎)各薇、改善執(zhí)行架構(gòu)(partial DAG execution
)能夠縮小此種差距。同時我們也發(fā)現(xiàn)一些MapReduce實(shí)現(xiàn)的細(xì)節(jié)會對性能有巨大的影響,如任務(wù)調(diào)度的開銷峭判,如果減小調(diào)度開銷將極大地提高負(fù)載的均衡性开缎。
中間結(jié)果輸出:類似于Hive這樣的基于MapReduce的查詢引擎,往往會將中間結(jié)果實(shí)體化(materialize)
到磁盤上:
在MapReduce任務(wù)內(nèi)部林螃,為了防止Reduce任務(wù)的失敗奕删,Map通常會把結(jié)果存儲在磁盤上。
通常一些查詢在翻譯到MapReduce任務(wù)的時候疗认,往往會產(chǎn)生多個stage完残,而這些串聯(lián)的stage則又依賴于底層文件系統(tǒng)(如HDFS)來存儲每一個stage的輸出結(jié)果。
對于第一種情況横漏,Map的輸出結(jié)果存儲在磁盤上是為了確保能夠有足夠的空間來存儲這些大數(shù)據(jù)批量任務(wù)的輸出谨设。而Map的輸出并不會復(fù)制到不同的節(jié)點(diǎn)上去,因此如果執(zhí)行Map任務(wù)的節(jié)點(diǎn)失效的話仍會造成數(shù)據(jù)丟失[3]缎浇。由此可以推出扎拣,如果將這部分輸出數(shù)據(jù)緩存在內(nèi)存中,而不是全部輸出到磁盤上面也是合理的素跺。Shark Shuffle的實(shí)現(xiàn)正是應(yīng)用了此推論二蓝,將Map的輸出結(jié)果存儲在內(nèi)存中,極大地提高Shuffle的吞吐量指厌。通常對于聚合(aggregation)
和過濾之類的查詢刊愚,它們的輸出結(jié)果往往遠(yuǎn)小于輸入,這種設(shè)計是非常合理的踩验。而SSD的流行鸥诽,也會極大地提高隨機(jī)讀取的性能,對于大數(shù)據(jù)量的Shuffle晰甚,能夠獲得較大的吞吐量衙传,同時也擁有比內(nèi)存更大的空間。
對于第二種情況厕九,一些執(zhí)行引擎擴(kuò)展了MapReduce的執(zhí)行模型蓖捶,將MapReduce的執(zhí)行模型泛化成更為通用的執(zhí)行計劃圖(task DAG)
,可以將多stage的任務(wù)串聯(lián)執(zhí)行而無需將stage中間結(jié)果輸出到HDFS中去扁远,這些引擎包括Dryad[4], Tenzing[5]和Spark[6]俊鱼。
數(shù)據(jù)格式和布局(layout)
: 由于MapReduce單純的Schema-on-read的處理方式會引起較大的處理開銷,許多系統(tǒng)在MapReduce模型內(nèi)部設(shè)計和使用了更高效的存儲結(jié)構(gòu)來加速查詢畅买。Hive本身支持“分區(qū)表(table partitions)
”(一種基本的類索引系統(tǒng)并闲,它將特定的鍵段存儲在特定的文件中,可以避免對于整個表的掃描)谷羞,類似于磁盤數(shù)據(jù)的列式存儲結(jié)構(gòu)[7]帝火。在Shark中我們更進(jìn)一步地采用了基于內(nèi)存的列式存儲結(jié)構(gòu)溜徙,Shark在實(shí)現(xiàn)此結(jié)構(gòu)時并沒有修改Spark的代碼,而是簡單地將一組列式元組存儲為Spark內(nèi)的一條記錄犀填,而對于列式元組內(nèi)的結(jié)構(gòu)則有Shark負(fù)責(zé)解析蠢壹。
另一個Spark獨(dú)有的特性是能夠控制數(shù)據(jù)在不同節(jié)點(diǎn)上的分區(qū),這為Shark帶來了一種新的功能:對表進(jìn)行聯(lián)合分區(qū)(co-partition)
九巡。
最后图贸,對于RDD我們還未挖掘其隨機(jī)讀取的能力,雖然對于寫入操作冕广,RDD只能支持粗粒度的操作疏日,但對于讀取操作,RDD可以精確到每一條記錄[6]撒汉,這使得RDD可以用來作為索引, Tenzing 可以用此來作為join操作的遠(yuǎn)程查詢表(remote-lookup)
沟优。
執(zhí)行策略: Hive在數(shù)據(jù)Shuffle之前花費(fèi)了大量的時間用來排序,同時將MapReduce結(jié)果輸出到HDFS上面也占用了大量的時間神凑,這些都是由于Hadoop自身基本的净神,單次迭代的MapReduce模型所限制的何吝。對于Spark這樣的更通用的執(zhí)行引擎溉委,則可減輕上述問題帶來的開銷。舉例來說爱榕,Spark支持基于Hash的分布式聚合和更為通用任務(wù)執(zhí)行計劃圖(DAG)
瓣喊。
事實(shí)上,為了能夠真正優(yōu)化關(guān)系型查詢的執(zhí)行黔酥,我們發(fā)現(xiàn)在基于數(shù)據(jù)統(tǒng)計的基礎(chǔ)上來選擇執(zhí)行計劃是非常有必要的藻三。但是由于UDF和復(fù)雜分析函數(shù)的存在,而Shark又將其視為一等公民(first-class citizens)
擂错,這種統(tǒng)計將變得十分困難关划。為了能夠解決這個問題儡首,我們提出了partial DAG execution (PDE),這使得Spark能夠在基于數(shù)據(jù)統(tǒng)計的基礎(chǔ)上改變后續(xù)執(zhí)行計劃圖逗概,PDE與其他系統(tǒng)(DryadLINQ)的運(yùn)行時執(zhí)行計劃圖重寫的不同在于:它能夠收集鍵值范圍內(nèi)的細(xì)粒度統(tǒng)計數(shù)據(jù);能夠完全重新選擇join的執(zhí)行策略忘衍,如broadcast join逾苫,而不僅僅是選擇Reduce任務(wù)的個數(shù)。
任務(wù)調(diào)度的開銷: 大概在諸多影響Shark的部分中枚钓,最令人感到意外的卻只是一個純粹工程上的問題:運(yùn)行任務(wù)帶來的開銷铅搓。傳統(tǒng)的MapReduce系統(tǒng),就比如Hadoop搀捷,是為了運(yùn)行長達(dá)數(shù)小時的批量作業(yè)而設(shè)計的星掰,而組成作業(yè)的每個任務(wù)其運(yùn)行時間則有數(shù)分鐘之久,他們會在獨(dú)立的系統(tǒng)進(jìn)程中執(zhí)行任務(wù),在某些極端情況下提交一個任務(wù)的延遲非常之高氢烘。拿Hadoop打比方便斥,它使用周期性的“心跳”消息來向工作節(jié)點(diǎn)分配任務(wù),而這個周期是3秒鐘威始,因此總共的任務(wù)啟動延時就會高達(dá)5-10秒枢纠。這對于批處理的系統(tǒng)顯然是可以忍受的,但是對于實(shí)時查詢這顯然是不夠的黎棠。
為了避免上述問題晋渺,Spark采用了事件驅(qū)動的RPC類庫來啟動任務(wù),通過復(fù)用工作進(jìn)程來避免系統(tǒng)進(jìn)程開銷脓斩。它能夠在一秒鐘內(nèi)啟動上千個任務(wù)木西,任務(wù)之間的延時小于5毫秒,從而使得50-100毫秒的任務(wù)随静,500毫秒的作業(yè)變得可能八千。而這種改進(jìn)對于查詢性能的提升,甚至對于較長執(zhí)行時間的查詢性能的提升也令我們感到吃驚不已燎猛。
亞秒級的任務(wù)使得引擎能夠更好地在工作節(jié)點(diǎn)之間平衡任務(wù)的分配恋捆,甚至在某些節(jié)點(diǎn)遇到了不可預(yù)知的延遲(網(wǎng)絡(luò)延遲或是JVM垃圾回收)的情況下面也能較好地平衡。同時對于數(shù)據(jù)傾斜也有巨大的幫助重绷,考慮到在100個核上做哈希聚合(hash aggregation)
沸停,對于每一個任務(wù)所處理的鍵范圍需要精心選定,任何的數(shù)據(jù)傾斜的部分都會拖慢整個作業(yè)昭卓。但是如果將作業(yè)分發(fā)到1000個核上面愤钾,那么最慢的任務(wù)只會比平均任務(wù)慢10倍,這就大大提高了可接受程度候醒。而當(dāng)我們在PDE中應(yīng)用傾斜感知的選擇策略后能颁,令我們感到失望的是相比于增大Reduce任務(wù)個數(shù)帶來的提升,這種策略所帶來的提升卻比較小倒淫。但不可否認(rèn)的是伙菊,引擎對于異常數(shù)據(jù)傾斜有了更高的穩(wěn)定性。
在Hadoop/Hive中昌简,錯誤的選擇任務(wù)數(shù)量往往會比優(yōu)化好的執(zhí)行策略慢上10倍占业,因此有大量的工作集中在如何自動的選擇Reduce任務(wù)的數(shù)量[8 9],下圖可以看到Hadoop/Hive和Spark Reduce任務(wù)數(shù)量對于作業(yè)執(zhí)行時間的影響纯赎。因?yàn)镾park作業(yè)能夠以較小的開銷運(yùn)行數(shù)千個Reduce任務(wù)谦疾,數(shù)據(jù)傾斜的影響可以通過運(yùn)行較多任務(wù)來減小。
事實(shí)上犬金,對于更大規(guī)模集群(數(shù)萬個節(jié)點(diǎn))上亞秒級任務(wù)的可行性我們還未探究念恍。但是對于Dremel[10]這樣的周期性地在數(shù)千個節(jié)點(diǎn)上運(yùn)行亞秒級作業(yè)的系統(tǒng)六剥,實(shí)際情況下當(dāng)單個主節(jié)點(diǎn)無法滿足任務(wù)調(diào)度的速度時,調(diào)度策略可以將任務(wù)委派給子集群的“副”主節(jié)點(diǎn)峰伙。同時細(xì)粒度的任務(wù)執(zhí)行策略相比于粗粒度的設(shè)計不僅僅帶來了負(fù)載均衡的好處疗疟,而且還包括快速恢復(fù)(fast recovery)
(通過將失敗任務(wù)分發(fā)到更多的節(jié)點(diǎn)上去)、查詢的彈性(query elasticity)
瞳氓。
細(xì)粒度任務(wù)模型(Fine-Grained Task Modle)
帶來的其他好處
雖然這篇文章主要關(guān)注的是細(xì)粒度任務(wù)模型帶來的容錯性優(yōu)勢策彤,這個模型同樣也提供了許多誘人的特性,接下將會介紹在MapReduce系統(tǒng)中已被證明的兩個特性匣摘。
**伸縮性(Elasticity):
** 在傳統(tǒng)的MPP數(shù)據(jù)庫中店诗,一旦分布式執(zhí)行計劃被選中,系統(tǒng)就必須以此并行度執(zhí)行整一個的查詢音榜。但是在細(xì)粒度任務(wù)系統(tǒng)中庞瘸,在執(zhí)行查詢的過程中節(jié)點(diǎn)可以增刪節(jié)點(diǎn),系統(tǒng)會自動地把阻塞的作業(yè)分發(fā)到其他節(jié)點(diǎn)上去赠叼,這使得整個系統(tǒng)變得非常具有伸縮性擦囊。如果數(shù)據(jù)庫管理者需要在這個系統(tǒng)中移除某些節(jié)點(diǎn),系統(tǒng)可以簡單地將這些節(jié)點(diǎn)視為失效節(jié)點(diǎn)嘴办,或者更好的處理方法是將這些節(jié)點(diǎn)上的數(shù)據(jù)復(fù)制到其他節(jié)點(diǎn)上去瞬场。與刪除節(jié)點(diǎn)相對應(yīng)的是,當(dāng)執(zhí)行查詢變得更慢時户辞,數(shù)據(jù)庫系統(tǒng)可以動態(tài)地申請更多的資源來提升計算能力泌类。亞馬遜的Elastic MapReduce[11]已經(jīng)支持運(yùn)行時調(diào)整集群規(guī)模癞谒。
多租戶架構(gòu)(Multitenancy)
: 多租戶架構(gòu)如同上面提到伸縮性一樣底燎,目的是為了在不同用戶之間動態(tài)地共享資源。在傳統(tǒng)的MPP數(shù)據(jù)庫中弹砚,當(dāng)一個重要的查詢提交的時候已經(jīng)有一個較大的查詢占據(jù)了大多數(shù)的集群資源双仍,這時能做的選擇不外乎就是取消先前的查詢等有限的操作。而在基于細(xì)粒度任務(wù)模型的系統(tǒng)中桌吃,查詢作業(yè)可以等待幾秒到當(dāng)前作業(yè)完成朱沃,然后提交新的查詢作業(yè)。Facebook和Microsoft已經(jīng)為Hadoop和Dryad開發(fā)了公平調(diào)度器茅诱,使得大型的逗物、計算密集型的歷史記錄查詢與實(shí)時的小型查詢可以共享集群資源而不會產(chǎn)生饑餓現(xiàn)象[12 13]。
Reference
[1] “A. Pavlo et al. A comparison of approaches to large-scale data analysis. In SIGMOD, 2009.”
[2] “M. Stonebraker et al. Mapreduce and parallel dbmss: friends or foes? Commun. ACM.”
[3] http://www.eecs.berkeley.edu/Pubs/TechRpts/2012/EECS-2012-214.pdf.
[4] “M. Isard et al. Dryad: distributed data-parallel programs from sequential building blocks. SIGOPS, 2007.”
[5] “B. Chattopadhyay, , et al. Tenzing a sql implementation on the mapreduce framework. PVLDB, 4(12):1318–1327, 2011.”
[6] “M. Zaharia et al. Resilient distributed datasets: a fault-tolerant abstraction for in-memory cluster computing. NSDI, 2012.”
[7] “A. Thusoo et al. Hive-a petabyte scale data warehouse using hadoop. In ICDE, 2010.”
[8] “Y. Kwon et al. Skewtune: mitigating skew in mapreduce applications. In SIGMOD ’12, 2012.”
[9] “B. Guffler et al. Handling data skew in mapreduce. In CLOSER, 2011.”
[10] “S. Melnik et al. Dremel: interactive analysis of web-scale datasets. Proc. VLDB Endow., 3:330–339, Sept 2010.”
[11] http://aws.amazon.com/about-aws/whats-new/2010/10/20/amazon-elastic-mapreduce-introduces-resizing-running-job-flows.
[12] “M. Zaharia et al. Delay scheduling: A simple technique for achieving locality and fairness in cluster scheduling. In EuroSys 10, 2010.”
[13] “M. Isard et al. Quincy: Fair scheduling for distributed computing clusters. In SOSP ’09, 2009.”