設(shè)計(jì)動(dòng)機(jī)
ElasticSearch 毫秒級的查詢響應(yīng)時(shí)間還是很驚艷的蹲堂。其優(yōu)點(diǎn)有:
- 優(yōu)秀的全文檢索能力
- 高效的列式存儲(chǔ)與查詢能力
- 數(shù)據(jù)分布式存儲(chǔ)(Shard 分片)
其列式存儲(chǔ)可以有效的支持高效的聚合類查詢,譬如groupBy等操作雷酪,分布式存儲(chǔ)則提升了處理的數(shù)據(jù)規(guī)模。
相應(yīng)的也存在一些缺點(diǎn):
- 缺乏優(yōu)秀的SQL支持
- 缺乏水平擴(kuò)展的Reduce(Merge)能力须揣,現(xiàn)階段的實(shí)現(xiàn)局限在單機(jī)
- JSON格式的查詢語言钟沛,缺乏編程能力窄刘,難以實(shí)現(xiàn)非常復(fù)雜的數(shù)據(jù)加工,自定義函數(shù)(類似Hive的UDF等)
Spark 作為一個(gè)計(jì)算引擎就谜,可以克服ES存在的這些缺點(diǎn):
- 良好的SQL支持
- 強(qiáng)大的計(jì)算引擎怪蔑,可以進(jìn)行分布式Reduce
- 支持自定義編程(采用原生API或者編寫UDF等函數(shù)對SQL做增強(qiáng))
所以在構(gòu)建即席多維查詢系統(tǒng)時(shí),Spark 可以和ES取得良好的互補(bǔ)效果丧荐。通過ES的列式存儲(chǔ)特性缆瓣,我們可以非常快的過濾出數(shù)據(jù)篮奄,
并且支持全文檢索捆愁,之后這些過濾后的數(shù)據(jù)從各個(gè)Shard 進(jìn)入Spark,Spark分布式的進(jìn)行Reduce/Merge操作,并且做一些更高層的工作割去,最后輸出給用戶。
通常而言昼丑,結(jié)構(gòu)化的數(shù)據(jù)結(jié)構(gòu)可以有效提升數(shù)據(jù)的查詢速度呻逆,但是會(huì)對數(shù)據(jù)的構(gòu)建產(chǎn)生一定的吞吐影響。ES強(qiáng)大的Query能力取決于數(shù)據(jù)結(jié)構(gòu)化的存儲(chǔ)(索引文件)菩帝,為了解決這個(gè)問題咖城,我們可以通過Spark Streaming
有效的對接各個(gè)數(shù)據(jù)源(Kafka/文件系統(tǒng))等,將數(shù)據(jù)規(guī)范化后批量導(dǎo)入到ES的各個(gè)Shard呼奢。Spark Streaming 基于以下兩點(diǎn)可以實(shí)現(xiàn)為ES快速導(dǎo)入數(shù)據(jù)宜雀。
- Spark RDD 的Partition 能夠良好的契合ES的Shard的概念。能夠?qū)崿F(xiàn)一一對應(yīng)握础。避免經(jīng)過ES的二次分發(fā)
- Spark Streaming 批處理的模式 和 Lucene(ES的底層存儲(chǔ)引擎)的Segment對應(yīng)的非常好辐董。一次批處理意味著新生成一個(gè)文件,
我們可以有效的控制生成文件的大小禀综,頻度等简烘。
架構(gòu)設(shè)計(jì)
下面是架構(gòu)設(shè)計(jì)圖:
整個(gè)系統(tǒng)大概分成四個(gè)部分。分別是:
- API層
- Spark 計(jì)算引擎層
- ES 存儲(chǔ)層
- ES 索引構(gòu)建層
API 層
API 層主要是做多查詢協(xié)議的支持定枷,比如可以支持SQL,JSON等形態(tài)的查詢語句孤澎。并且可是做一些啟發(fā)式查詢優(yōu)化。從而決定將查詢請求是直接轉(zhuǎn)發(fā)給后端的ES來完成欠窒,還是走Spark 計(jì)算引擎覆旭。也就是上圖提到的 Query Optimize,根據(jù)條件決定是否需要短路掉 Spark Compute。
Spark 計(jì)算引擎層
前面我們提到了ES的三個(gè)缺陷岖妄,而Spark 可以有效的解決這個(gè)問題型将。對于一個(gè)普通的SQL語句,我們可以把 where 條件的語句荐虐,部分group 等相關(guān)的語句下沉到ES引擎進(jìn)行執(zhí)行茶敏,之后可能匯總了較多的數(shù)據(jù),然后放到Spark中進(jìn)行合并和加工缚俏,最后轉(zhuǎn)發(fā)給用戶惊搏。相對應(yīng)的,Spark 的初始的RDD 類似和Kafka的對接忧换,每個(gè)Kafka 的partition對應(yīng)RDD的一個(gè)partiton,每個(gè)ES的Shard 也對應(yīng)RDD的一個(gè)partition恬惯。
ES 存儲(chǔ)層
ES的Shard 數(shù)量在索引構(gòu)建時(shí)就需要確定,確定后無法進(jìn)行更改亚茬。這樣單個(gè)索引里的Shard 會(huì)越來越大從而影響單Shard的查詢速度酪耳。但因?yàn)樯蠈佑辛?Spark Compute層,所以我們可以通過添加Index的方式來擴(kuò)大Shard的數(shù)目,然后查詢時(shí)查詢所有分片數(shù)據(jù)碗暗,由Spark完成數(shù)據(jù)的合并工作颈将。
ES 索引構(gòu)建層
數(shù)據(jù)的結(jié)構(gòu)化必然帶來了構(gòu)建的困難。所以有了Spark Streaming層作為數(shù)據(jù)的構(gòu)建層言疗。這里你有兩種選擇:
- 通過ES原生的bulk API 完成索引的構(gòu)建
- 然Spark 直接對接到 ES的每個(gè)Shard,直接針對該Shard 進(jìn)行索引晴圾,可有效替身索引的吞吐量。