Spark是一個(gè)快速的集群化的實(shí)時(shí)計(jì)算系統(tǒng)。支持Java, Scala, Python 和R語(yǔ)言的高級(jí)API说榆。
一 Spark生態(tài):
支持Spark Sql用于sql和結(jié)構(gòu)化數(shù)據(jù)查詢(xún)處理儡嘶;支持MLlib用于機(jī)器學(xué)習(xí)瞒爬;支持GraphX用于圖形處理卑惜;支持Spark Streaming和Structured Sql(spark2.1.1版本發(fā)布)用于實(shí)時(shí)計(jì)算。(其中妒蔚,我們使用的Spark功能主要是Spark Sql和Structured Sql穿挨。其中Spark sql用于查詢(xún)模塊,可以聯(lián)合多個(gè)數(shù)據(jù)源進(jìn)行查詢(xún)肴盏。Structured Sql用于流式數(shù)據(jù)處理科盛。)
部署方式有:
1、本地運(yùn)行模式:new SparkConf().setAppName(“sparkName”)
.setMaster(config.getString(“l(fā)ocal[*]”)))
2菜皂、Stanalone模式:
1)由master/slaves服務(wù)組成的
2)各個(gè)節(jié)點(diǎn)上的資源被抽象成粗粒度的slot贞绵,有多少slot就能同時(shí)運(yùn)行多少task。
3)部署時(shí)通過(guò)spark-env.sh和slave配置文件進(jìn)行配置恍飘,使用start-all.sh可以一鍵啟動(dòng)榨崩。
3、EC2模式:
部署于云端章母。
4母蛛、Spark on Mesos模式:
支持粗粒度模式和細(xì)粒度模式。
1)粗粒度模式:應(yīng)用程序的各個(gè)任務(wù)正式運(yùn)行之前乳怎,需要將運(yùn)行環(huán)境中的資源全部申請(qǐng)好彩郊,且運(yùn)行過(guò) 程中要一直占用這些資源,即使不用蚪缀,最后程序運(yùn)行結(jié)束后秫逝,回收這些資源。比如你提交應(yīng)用程序時(shí)询枚,指定使用5個(gè)executor運(yùn)行你的應(yīng)用程序违帆,每個(gè)executor占用5GB內(nèi)存和5個(gè)CPU,每個(gè)executor內(nèi)部設(shè)置了5個(gè)slot金蜀,則Mesos需要先為executor分配資源并啟動(dòng)它們刷后,之后開(kāi)始調(diào)度任務(wù)。另外渊抄,在程序運(yùn)行過(guò)程中惠险,mesos的master和slave并不知道executor內(nèi)部各個(gè)task的運(yùn)行情況,executor直接將任務(wù)狀態(tài)通過(guò)內(nèi)部的通信機(jī)制匯報(bào)給Driver抒线,從一定程度上可以認(rèn)為班巩,每個(gè)應(yīng)用程序利用mesos搭建了一個(gè)虛擬集群自己使用。
2)細(xì)粒度模式:鑒于粗粒度模式會(huì)造成大量資源浪費(fèi)嘶炭,Spark On Mesos還提供了另外一種調(diào)度模式:細(xì)粒度模式抱慌,這種模式類(lèi)似于現(xiàn)在的云計(jì)算,思想是按需分配眨猎。與粗粒度模式一樣抑进,應(yīng)用程序啟動(dòng)時(shí),先會(huì)啟動(dòng)executor睡陪,但每個(gè)executor占用資源僅僅是自己運(yùn)行所需的資源寺渗,不需要考慮將來(lái)要運(yùn)行的任務(wù)匿情,之后,mesos會(huì)為每個(gè)executor動(dòng)態(tài)分配資源信殊,每分配一些炬称,便可以運(yùn)行一個(gè)新任務(wù),單個(gè)Task運(yùn)行完之后可以馬上釋放對(duì)應(yīng)的資源涡拘。每個(gè)Task會(huì)匯報(bào)狀態(tài)給Mesos slave和Mesos Master玲躯,便于更加細(xì)粒度管理和容錯(cuò),這種調(diào)度模式類(lèi)似于MapReduce調(diào)度模式鳄乏,每個(gè)Task完全獨(dú)立跷车,優(yōu)點(diǎn)是便于資源控制和隔離,但缺點(diǎn)也很明顯橱野,短作業(yè)運(yùn)行延遲大朽缴。
5、Spark on yarn模式:
支持粗粒度模式水援,只要用yarn的resource manage進(jìn)行調(diào)度管理不铆。(目前選擇的是該模式)
(細(xì)粒度模式尚未實(shí)現(xiàn) https://issues.apache.org/jira/browse/YARN-1197)
集成性:
Spark可以很好的集成HDFS,HBase裹唆,Elatatic Search誓斥,kudu等存儲(chǔ)系統(tǒng),mysql等關(guān)系性數(shù)據(jù)庫(kù)和json csv等靜態(tài)文件處理。
二许帐、Spark基本架構(gòu):
1)Cluster Manager:在standalone模式中即為Master主節(jié)點(diǎn)劳坑,控制整個(gè)集群,監(jiān)控worker成畦。在YARN模式中為資源管理器
2)Worker節(jié)點(diǎn):從節(jié)點(diǎn)距芬,負(fù)責(zé)控制計(jì)算節(jié)點(diǎn),啟動(dòng)Executor或者Driver循帐。
3)Driver: 運(yùn)行Application 的main()函數(shù)
4)Executor:執(zhí)行器框仔,是為某個(gè)Application運(yùn)行在worker node上的一個(gè)進(jìn)程
三、運(yùn)行流程
1)創(chuàng)建Spark context
2)Spark context向Cluster manager申請(qǐng)運(yùn)行Executor資源拄养,并啟動(dòng)StandaloneExecutorbackend
3)Executor向SparkContext申請(qǐng)Task
4)SparkContext將應(yīng)用程序分發(fā)給Executor
5)SparkContext構(gòu)建成DAG圖离斩,將DAG圖分解成Stage、將Taskset發(fā)送給Task Scheduler瘪匿,最后由Task Scheduler將Task發(fā)送給Executor運(yùn)行
6)Task在Executor上運(yùn)行跛梗,運(yùn)行完釋放所有資源
四、Cluster模式和client模式
yarn-cluster模式下棋弥,driver運(yùn)行在AM(Application Master)中核偿,它負(fù)責(zé)向YARN申請(qǐng)資源,并監(jiān)督作業(yè)的運(yùn)行狀況顽染。當(dāng)用戶(hù)提交了作業(yè)之后漾岳,就可以關(guān)掉Client轰绵,作業(yè)會(huì)繼續(xù)在YARN上運(yùn)行。yarn-cluster模式不適合運(yùn)行交互類(lèi)型的作業(yè)尼荆。
Yarn-client模式下左腔,Application Master僅僅向YARN請(qǐng)求executor,client會(huì)和請(qǐng)求的container通信來(lái)調(diào)度他們工作耀找。
五翔悠、Spark sql
Spark sql應(yīng)用于查詢(xún)模塊业崖。
以CSV文件為例野芒,前端查詢(xún)?yōu)閟elect * from CSV.test
1)通過(guò)CSV.test查詢(xún)數(shù)據(jù)庫(kù)獲取對(duì)應(yīng)的csv文件存儲(chǔ)路徑path。
2)spark讀取path對(duì)應(yīng)的hdfs文件生成dataset
3)dataset.createTempView()生成臨時(shí)表testTable
4)spark執(zhí)行sql双炕,select * from testTable并返回結(jié)果
六狞悲、Structured Streaming
Spark2.0中提出一個(gè)概念,continuous applications(連續(xù)應(yīng)用程序)妇斤。
Spark Streaming等流式處理引擎致力于流式數(shù)據(jù)的運(yùn)算摇锋,比如通過(guò)map運(yùn)行一個(gè)方法來(lái)改變流中的每一條記錄,通過(guò)reduce可以基于時(shí)間做數(shù)據(jù)聚合站超。但是荸恕,事實(shí)上很少有只在流式數(shù)據(jù)上做運(yùn)算的需求,相對(duì)的死相,流式處理往往是一個(gè)大型應(yīng)用的一部分融求。continuous applications提出后,實(shí)時(shí)運(yùn)算作為一部分算撮,不同系統(tǒng)間的交互等也可以由Structured Streaming來(lái)處理生宛。如下圖,左側(cè)為Spark Streaming類(lèi)的流式引擎肮柜,交互是由使用者來(lái)處理陷舅;右側(cè)為Strctured Streaming類(lèi)的連續(xù)應(yīng)用,交互由應(yīng)用來(lái)處理审洞。(https://databricks.com/blog/2016/07/28/continuous-applications-evolving-streaming-in-apache-spark-2-0.html)
Structured Streaming是一個(gè)建立在Spark sql引擎上的可擴(kuò)展高容錯(cuò)的流式處理引擎莱睁。它使得可以像對(duì)靜態(tài)數(shù)據(jù)進(jìn)行批量處理一樣來(lái)處理流式數(shù)據(jù)。Structured Streaming provides fast, scalable, fault-tolerant, end-to-end exactly-once stream processing without the user having to reason about streaming.
Structured Streaming抽象了一個(gè)DataSet中無(wú)邊界的表芒澜。structured streaming將流數(shù)據(jù)看作是一張沒(méi)有邊界的表缩赛,流數(shù)據(jù)不斷的向表尾增加數(shù)據(jù)
在每一個(gè)周期(默認(rèn)1s),新的內(nèi)容將會(huì)增加到表尾撰糠,查詢(xún)的結(jié)果將會(huì)更新到結(jié)果表中酥馍。一旦結(jié)果表被更新,就需要將改變后的表內(nèi)容輸出到外部的sink中阅酪。
如Kafka—etl—es的過(guò)程旨袒,spark每秒鐘從source—kafka讀取一批數(shù)據(jù)汁针,寫(xiě)入無(wú)邊界表中,通過(guò)dataset的spark sql操作進(jìn)行ETL轉(zhuǎn)換砚尽,更新result表施无,隨著result表更新,變化的result行將被寫(xiě)入外部sink—es必孤。
source類(lèi)型:File source猾骡,Kafka source Socket source
sink類(lèi)型:File sink,F(xiàn)oreach sink敷搪,Console sink兴想,Memory sink,其中es sink是由Elastatic search擴(kuò)展的赡勘。
輸出模式:
Complete mode: 不刪除任何數(shù)據(jù)嫂便,在 Result Table 中保留所有數(shù)據(jù),每次觸發(fā)操作輸出所有窗口數(shù)據(jù)闸与;
Append mode: 當(dāng)確定不會(huì)更新窗口時(shí)毙替,將會(huì)輸出該窗口的數(shù)據(jù)并刪除,保證每個(gè)窗口的數(shù)據(jù)只會(huì)輸出一次践樱;
Updated mode: 刪除不再更新的時(shí)間窗口厂画,每次觸發(fā)聚合操作時(shí),輸出更新的窗口拷邢。
聚合:輸出模式必須是Append或Updated袱院。sink為es時(shí)只能是Append。
Event time:時(shí)間發(fā)生時(shí)間解孙,來(lái)源于source數(shù)據(jù)中的時(shí)間列坑填。
Watermark:數(shù)據(jù)過(guò)期時(shí)間。