Spark入門實(shí)戰(zhàn)系列--7.Spark Streaming(上)--實(shí)時(shí)流計(jì)算Spark Streaming原理介紹 - shishanyuan - 博客園
http://www.cnblogs.com/shishanyuan/p/4747735.html
1、Spark Streaming簡(jiǎn)介
1.1 概述
Spark Streaming 是Spark核心API的一個(gè)擴(kuò)展各吨,可以實(shí)現(xiàn)高吞吐量的粥航、具備容錯(cuò)機(jī)制的實(shí)時(shí)流數(shù)據(jù)的處理蚌卤。支持從多種數(shù)據(jù)源獲取數(shù)據(jù)似忧,包括Kafk扛点、Flume术辐、Twitter贷揽、ZeroMQ、Kinesis 以及TCP sockets镜悉,從數(shù)據(jù)源獲取數(shù)據(jù)之后祟辟,可以使用諸如map、reduce侣肄、join和window等高級(jí)函數(shù)進(jìn)行復(fù)雜算法的處理旧困。最后還可以將處理結(jié)果存儲(chǔ)到文件系統(tǒng),數(shù)據(jù)庫(kù)和現(xiàn)場(chǎng)儀表盤茫孔。在“One Stack rule them all”的基礎(chǔ)上叮喳,還可以使用Spark的其他子框架,如集群學(xué)習(xí)缰贝、圖計(jì)算等,對(duì)流數(shù)據(jù)進(jìn)行處理畔濒。
Spark Streaming處理的數(shù)據(jù)流圖:
Spark的各個(gè)子框架,都是基于核心Spark的,Spark Streaming在內(nèi)部的處理機(jī)制是红符,接收實(shí)時(shí)流的數(shù)據(jù),并根據(jù)一定的時(shí)間間隔拆分成一批批的數(shù)據(jù)毅整,然后通過Spark Engine處理這些批數(shù)據(jù),最終得到處理后的一批批結(jié)果數(shù)據(jù)绽左。
對(duì)應(yīng)的批數(shù)據(jù)悼嫉,在Spark內(nèi)核對(duì)應(yīng)一個(gè)RDD實(shí)例,因此拼窥,對(duì)應(yīng)流數(shù)據(jù)的DStream可以看成是一組RDDs戏蔑,即RDD的一個(gè)序列。通俗點(diǎn)理解的話鲁纠,在流數(shù)據(jù)分成一批一批后总棵,通過一個(gè)先進(jìn)先出的隊(duì)列,然后 Spark Engine從該隊(duì)列中依次取出一個(gè)個(gè)批數(shù)據(jù)改含,把批數(shù)據(jù)封裝成一個(gè)RDD情龄,然后進(jìn)行處理,這是一個(gè)典型的生產(chǎn)者消費(fèi)者模型捍壤,對(duì)應(yīng)的就有生產(chǎn)者消費(fèi)者模型的問題骤视,即如何協(xié)調(diào)生產(chǎn)速率和消費(fèi)速率。
1.2 術(shù)語(yǔ)定義
l離散流(discretized stream)或DStream:這是Spark Streaming對(duì)內(nèi)部持續(xù)的實(shí)時(shí)數(shù)據(jù)流的抽象描述鹃觉,即我們處理的一個(gè)實(shí)時(shí)數(shù)據(jù)流专酗,在Spark Streaming中對(duì)應(yīng)于一個(gè)DStream 實(shí)例。
l批數(shù)據(jù)(batch data):這是化整為零的第一步帜慢,將實(shí)時(shí)流數(shù)據(jù)以時(shí)間片為單位進(jìn)行分批笼裳,將流處理轉(zhuǎn)化為時(shí)間片數(shù)據(jù)的批處理。隨著持續(xù)時(shí)間的推移粱玲,這些處理結(jié)果就形成了對(duì)應(yīng)的結(jié)果數(shù)據(jù)流了躬柬。
l時(shí)間片或批處理時(shí)間間隔( batch interval):這是人為地對(duì)流數(shù)據(jù)進(jìn)行定量的標(biāo)準(zhǔn),以時(shí)間片作為我們拆分流數(shù)據(jù)的依據(jù)抽减。一個(gè)時(shí)間片的數(shù)據(jù)對(duì)應(yīng)一個(gè)RDD實(shí)例允青。
l窗口長(zhǎng)度(window length):一個(gè)窗口覆蓋的流數(shù)據(jù)的時(shí)間長(zhǎng)度。必須是批處理時(shí)間間隔的倍數(shù)卵沉,
l滑動(dòng)時(shí)間間隔:前一個(gè)窗口到后一個(gè)窗口所經(jīng)過的時(shí)間長(zhǎng)度颠锉。必須是批處理時(shí)間間隔的倍數(shù)
lInput DStream :一個(gè)input DStream是一個(gè)特殊的DStream,將Spark Streaming連接到一個(gè)外部數(shù)據(jù)源來(lái)讀取數(shù)據(jù)史汗。
1.3 Storm與Spark Streming比較
l處理模型以及延遲
雖然兩框架都提供了可擴(kuò)展性(scalability)和可容錯(cuò)性(fault tolerance)琼掠,但是它們的處理模型從根本上說(shuō)是不一樣的。Storm可以實(shí)現(xiàn)亞秒級(jí)時(shí)延的處理停撞,而每次只處理一條event瓷蛙,而Spark Streaming可以在一個(gè)短暫的時(shí)間窗口里面處理多條(batches)Event悼瓮。所以說(shuō)Storm可以實(shí)現(xiàn)亞秒級(jí)時(shí)延的處理,而Spark Streaming則有一定的時(shí)延艰猬。
l容錯(cuò)和數(shù)據(jù)保證
然而兩者的代價(jià)都是容錯(cuò)時(shí)候的數(shù)據(jù)保證横堡,Spark Streaming的容錯(cuò)為有狀態(tài)的計(jì)算提供了更好的支持。在Storm中冠桃,每條記錄在系統(tǒng)的移動(dòng)過程中都需要被標(biāo)記跟蹤命贴,所以Storm只能保證每條記錄最少被處理一次,但是允許從錯(cuò)誤狀態(tài)恢復(fù)時(shí)被處理多次食听。這就意味著可變更的狀態(tài)可能被更新兩次從而導(dǎo)致結(jié)果不正確胸蛛。
任一方面,Spark Streaming僅僅需要在批處理級(jí)別對(duì)記錄進(jìn)行追蹤碳蛋,所以他能保證每個(gè)批處理記錄僅僅被處理一次胚泌,即使是node節(jié)點(diǎn)掛掉。雖然說(shuō)Storm的 Trident library可以保證一條記錄被處理一次肃弟,但是它依賴于事務(wù)更新狀態(tài)玷室,而這個(gè)過程是很慢的,并且需要由用戶去實(shí)現(xiàn)笤受。
l實(shí)現(xiàn)和編程API
Storm主要是由Clojure語(yǔ)言實(shí)現(xiàn)穷缤,Spark Streaming是由Scala實(shí)現(xiàn)。如果你想看看這兩個(gè)框架是如何實(shí)現(xiàn)的或者你想自定義一些東西你就得記住這一點(diǎn)箩兽。Storm是由BackType和 Twitter開發(fā)津肛,而Spark Streaming是在UC Berkeley開發(fā)的。
Storm提供了Java API汗贫,同時(shí)也支持其他語(yǔ)言的API身坐。 Spark Streaming支持Scala和Java語(yǔ)言(其實(shí)也支持Python)。
l批處理框架集成
Spark Streaming的一個(gè)很棒的特性就是它是在Spark框架上運(yùn)行的落包。這樣你就可以想使用其他批處理代碼一樣來(lái)寫Spark Streaming程序部蛇,或者是在Spark中交互查詢。這就減少了單獨(dú)編寫流批量處理程序和歷史數(shù)據(jù)處理程序咐蝇。
l生產(chǎn)支持
Storm已經(jīng)出現(xiàn)好多年了涯鲁,而且自從2011年開始就在Twitter內(nèi)部生產(chǎn)環(huán)境中使用,還有其他一些公司有序。而Spark Streaming是一個(gè)新的項(xiàng)目抹腿,并且在2013年僅僅被Sharethrough使用(據(jù)作者了解)。
Storm是 Hortonworks Hadoop數(shù)據(jù)平臺(tái)中流處理的解決方案旭寿,而Spark Streaming出現(xiàn)在 MapR的分布式平臺(tái)和Cloudera的企業(yè)數(shù)據(jù)平臺(tái)中警绩。除此之外,Databricks是為Spark提供技術(shù)支持的公司盅称,包括了Spark Streaming房蝉。
雖然說(shuō)兩者都可以在各自的集群框架中運(yùn)行僚匆,但是Storm可以在Mesos上運(yùn)行, 而Spark Streaming可以在YARN和Mesos上運(yùn)行微渠。
2搭幻、運(yùn)行原理
2.1 Streaming架構(gòu)
SparkStreaming是一個(gè)對(duì)實(shí)時(shí)數(shù)據(jù)流進(jìn)行高通量、容錯(cuò)處理的流式處理系統(tǒng)逞盆,可以對(duì)多種數(shù)據(jù)源(如Kdfka檀蹋、Flume、Twitter云芦、Zero和TCP 套接字)進(jìn)行類似Map俯逾、Reduce和Join等復(fù)雜操作,并將結(jié)果保存到外部文件系統(tǒng)舅逸、數(shù)據(jù)庫(kù)或應(yīng)用到實(shí)時(shí)儀表盤桌肴。
l計(jì)算流程:Spark Streaming是將流式計(jì)算分解成一系列短小的批處理作業(yè)。這里的批處理引擎是Spark Core琉历,也就是把Spark Streaming的輸入數(shù)據(jù)按照batch size(如1秒)分成一段一段的數(shù)據(jù)(Discretized Stream)坠七,每一段數(shù)據(jù)都轉(zhuǎn)換成Spark中的RDD(Resilient Distributed Dataset),然后將Spark Streaming中對(duì)DStream的Transformation操作變?yōu)獒槍?duì)Spark中對(duì)RDD的Transformation操作旗笔,將RDD經(jīng)過操作變成中間結(jié)果保存在內(nèi)存中彪置。整個(gè)流式計(jì)算根據(jù)業(yè)務(wù)的需求可以對(duì)中間的結(jié)果進(jìn)行疊加或者存儲(chǔ)到外部設(shè)備。下圖顯示了Spark Streaming的整個(gè)流程蝇恶。
圖Spark Streaming構(gòu)架
l容錯(cuò)性:對(duì)于流式計(jì)算來(lái)說(shuō)拳魁,容錯(cuò)性至關(guān)重要。首先我們要明確一下Spark中RDD的容錯(cuò)機(jī)制撮弧。每一個(gè)RDD都是一個(gè)不可變的分布式可重算的數(shù)據(jù)集潘懊,其記錄著確定性的操作繼承關(guān)系(lineage),所以只要輸入數(shù)據(jù)是可容錯(cuò)的贿衍,那么任意一個(gè)RDD的分區(qū)(Partition)出錯(cuò)或不可用授舟,都是可以利用原始輸入數(shù)據(jù)通過轉(zhuǎn)換操作而重新算出的。
對(duì)于Spark Streaming來(lái)說(shuō)舌厨,其RDD的傳承關(guān)系如下圖所示岂却,圖中的每一個(gè)橢圓形表示一個(gè)RDD,橢圓形中的每個(gè)圓形代表一個(gè)RDD中的一個(gè)Partition裙椭,圖中的每一列的多個(gè)RDD表示一個(gè)DStream(圖中有三個(gè)DStream)躏哩,而每一行最后一個(gè)RDD則表示每一個(gè)Batch Size所產(chǎn)生的中間結(jié)果RDD。我們可以看到圖中的每一個(gè)RDD都是通過lineage相連接的揉燃,由于Spark Streaming輸入數(shù)據(jù)可以來(lái)自于磁盤扫尺,例如HDFS(多份拷貝)或是來(lái)自于網(wǎng)絡(luò)的數(shù)據(jù)流(Spark Streaming會(huì)將網(wǎng)絡(luò)輸入數(shù)據(jù)的每一個(gè)數(shù)據(jù)流拷貝兩份到其他的機(jī)器)都能保證容錯(cuò)性,所以RDD中任意的Partition出錯(cuò)炊汤,都可以并行地在其他機(jī)器上將缺失的Partition計(jì)算出來(lái)正驻。這個(gè)容錯(cuò)恢復(fù)方式比連續(xù)計(jì)算模型(如Storm)的效率更高弊攘。
Spark Streaming中RDD的lineage關(guān)系圖
l實(shí)時(shí)性:對(duì)于實(shí)時(shí)性的討論,會(huì)牽涉到流式處理框架的應(yīng)用場(chǎng)景姑曙。Spark Streaming將流式計(jì)算分解成多個(gè)Spark Job襟交,對(duì)于每一段數(shù)據(jù)的處理都會(huì)經(jīng)過Spark DAG圖分解以及Spark的任務(wù)集的調(diào)度過程。對(duì)于目前版本的Spark Streaming而言伤靠,其最小的Batch Size的選取在0.5~2秒鐘之間(Storm目前最小的延遲是100ms左右)捣域,所以Spark Streaming能夠滿足除對(duì)實(shí)時(shí)性要求非常高(如高頻實(shí)時(shí)交易)之外的所有流式準(zhǔn)實(shí)時(shí)計(jì)算場(chǎng)景。
l擴(kuò)展性與吞吐量:Spark目前在EC2上已能夠線性擴(kuò)展到100個(gè)節(jié)點(diǎn)(每個(gè)節(jié)點(diǎn)4Core)宴合,可以以數(shù)秒的延遲處理6GB/s的數(shù)據(jù)量(60M records/s)焕梅,其吞吐量也比流行的Storm高2~5倍,圖4是Berkeley利用WordCount和Grep兩個(gè)用例所做的測(cè)試卦洽,在Grep這個(gè)測(cè)試中贞言,Spark Streaming中的每個(gè)節(jié)點(diǎn)的吞吐量是670k records/s,而Storm是115k records/s阀蒂。
Spark Streaming與Storm吞吐量比較圖
2.2 編程模型
DStream(Discretized Stream)作為Spark Streaming的基礎(chǔ)抽象该窗,它代表持續(xù)性的數(shù)據(jù)流。這些數(shù)據(jù)流既可以通過外部輸入源賴獲取脂新,也可以通過現(xiàn)有的Dstream的transformation操作來(lái)獲得挪捕。在內(nèi)部實(shí)現(xiàn)上,DStream由一組時(shí)間序列上連續(xù)的RDD來(lái)表示争便。每個(gè)RDD都包含了自己特定時(shí)間間隔內(nèi)的數(shù)據(jù)流级零。如圖7-3所示。
圖7-3 DStream中在時(shí)間軸下生成離散的RDD序列
對(duì)DStream中數(shù)據(jù)的各種操作也是映射到內(nèi)部的RDD上來(lái)進(jìn)行的滞乙,如圖7-4所示奏纪,對(duì)Dtream的操作可以通過RDD的transformation生成新的DStream。這里的執(zhí)行引擎是Spark斩启。
2.2.1 如何使用Spark Streaming
作為構(gòu)建于Spark之上的應(yīng)用框架序调,Spark Streaming承襲了Spark的編程風(fēng)格,對(duì)于已經(jīng)了解Spark的用戶來(lái)說(shuō)能夠快速地上手兔簇。接下來(lái)以Spark Streaming官方提供的WordCount代碼為例來(lái)介紹Spark Streaming的使用方式发绢。
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
// Create a local StreamingContext with two working thread and batch interval of 1 second.
// The master requires 2 cores to prevent from a starvation scenario.
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))
// Create a DStream that will connect to hostname:port, like localhost:9999
val lines = ssc.socketTextStream("localhost", 9999)
// Split each line into words
val words = lines.flatMap(_.split(" "))
import org.apache.spark.streaming.StreamingContext._
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print()
ssc.start() // Start the computation
ssc.awaitTermination() // Wait for the computation to terminate
1.****創(chuàng)建StreamingContext對(duì)象 同Spark初始化需要?jiǎng)?chuàng)建SparkContext對(duì)象一樣,使用Spark Streaming就需要?jiǎng)?chuàng)建StreamingContext對(duì)象垄琐。創(chuàng)建StreamingContext對(duì)象所需的參數(shù)與SparkContext基本一致边酒,包括指明Master,設(shè)定名稱(如NetworkWordCount)狸窘。需要注意的是參數(shù)Seconds(1)墩朦,Spark Streaming需要指定處理數(shù)據(jù)的時(shí)間間隔,如上例所示的1s翻擒,那么Spark Streaming會(huì)以1s為時(shí)間窗口進(jìn)行數(shù)據(jù)處理氓涣。此參數(shù)需要根據(jù)用戶的需求和集群的處理能力進(jìn)行適當(dāng)?shù)脑O(shè)置牛哺;
2.****創(chuàng)建InputDStream如同Storm的Spout,Spark Streaming需要指明數(shù)據(jù)源劳吠。如上例所示的socketTextStream引润,Spark Streaming以socket連接作為數(shù)據(jù)源讀取數(shù)據(jù)。當(dāng)然Spark Streaming支持多種不同的數(shù)據(jù)源赴背,包括Kafka椰拒、Flume、HDFS/S3凰荚、Kinesis和Twitter等數(shù)據(jù)源;
3.****操作DStream對(duì)于從數(shù)據(jù)源得到的DStream褒脯,用戶可以在其基礎(chǔ)上進(jìn)行各種操作便瑟,如上例所示的操作就是一個(gè)典型的WordCount執(zhí)行流程:對(duì)于當(dāng)前時(shí)間窗口內(nèi)從數(shù)據(jù)源得到的數(shù)據(jù)首先進(jìn)行分割,然后利用Map和ReduceByKey方法進(jìn)行計(jì)算番川,當(dāng)然最后還有使用print()方法輸出結(jié)果到涂;
4.****啟動(dòng)Spark Streaming之前所作的所有步驟只是創(chuàng)建了執(zhí)行流程,程序沒有真正連接上數(shù)據(jù)源颁督,也沒有對(duì)數(shù)據(jù)進(jìn)行任何操作践啄,只是設(shè)定好了所有的執(zhí)行計(jì)劃,當(dāng)ssc.start()啟動(dòng)后程序才真正進(jìn)行所有預(yù)期的操作沉御。
至此對(duì)于Spark Streaming的如何使用有了一個(gè)大概的印象屿讽,在后面的章節(jié)我們會(huì)通過源代碼深入探究一下Spark Streaming的執(zhí)行流程。
2.2.2 DStream的輸入源
在Spark Streaming中所有的操作都是基于流的吠裆,而輸入源是這一系列操作的起點(diǎn)伐谈。輸入 DStreams 和 DStreams 接收的流都代表輸入數(shù)據(jù)流的來(lái)源,在Spark Streaming 提供兩種內(nèi)置數(shù)據(jù)流來(lái)源:
l 基礎(chǔ)來(lái)源 在 StreamingContext API 中直接可用的來(lái)源试疙。例如:文件系統(tǒng)诵棵、Socket(套接字)連接和 Akka actors;
l 高級(jí)來(lái)源 如 Kafka祝旷、Flume履澳、Kinesis、Twitter 等怀跛,可以通過額外的實(shí)用工具類創(chuàng)建距贷。
2.2.2.1 基礎(chǔ)來(lái)源
在前面分析怎樣使用Spark Streaming的例子中我們已看到ssc.socketTextStream()方法,可以通過 TCP 套接字連接敌完,從從文本數(shù)據(jù)中創(chuàng)建了一個(gè) DStream储耐。除了套接字,StreamingContext 的API還提供了方法從文件和 Akka actors 中創(chuàng)建DStreams作為輸入源滨溉。
Spark Streaming提供了streamingContext.fileStream(dataDirectory)方法可以從任何文件系統(tǒng)(如:HDFS什湘、S3长赞、NFS等)的文件中讀取數(shù)據(jù),然后創(chuàng)建一個(gè)DStream闽撤。Spark Streaming 監(jiān)控 dataDirectory 目錄和在該目錄下任何文件被創(chuàng)建處理(不支持在嵌套目錄下寫文件)得哆。需要注意的是:讀取的必須是具有相同的數(shù)據(jù)格式的文件;創(chuàng)建的文件必須在dataDirectory 目錄下哟旗,并通過自動(dòng)移動(dòng)或重命名成數(shù)據(jù)目錄贩据;文件一旦移動(dòng)就不能被改變,如果文件被不斷追加,新的數(shù)據(jù)將不會(huì)被閱讀闸餐。對(duì)于簡(jiǎn)單的文本文饱亮,可以使用一個(gè)簡(jiǎn)單的方法streamingContext.textFileStream(dataDirectory)來(lái)讀取數(shù)據(jù)。
Spark Streaming也可以基于自定義 Actors 的流創(chuàng)建DStream 舍沙,通過 Akka actors 接受數(shù)據(jù)流近上,使用方法streamingContext.actorStream(actorProps, actor-name)。Spark Streaming使用streamingContext.queueStream(queueOfRDDs)方法可以創(chuàng)建基于 RDD 隊(duì)列的DStream拂铡,每個(gè)RDD 隊(duì)列將被視為DStream 中一塊數(shù)據(jù)流進(jìn)行加工處理壹无。
2.2.2.2 高級(jí)來(lái)源
這一類的來(lái)源需要外部 non-Spark 庫(kù)的接口,其中一些有復(fù)雜的依賴關(guān)系(如 Kafka感帅、Flume)斗锭。因此通過這些來(lái)源創(chuàng)建DStreams 需要明確其依賴。例如失球,如果想創(chuàng)建一個(gè)使用 Twitter tweets 的數(shù)據(jù)的DStream 流岖是,必須按以下步驟來(lái)做:
1)在 SBT 或 Maven工程里添加 spark-streaming-twitter_2.10 依賴。
2)開發(fā):導(dǎo)入 TwitterUtils 包她倘,通過 TwitterUtils.createStream 方法創(chuàng)建一個(gè)DStream璧微。
3)部署:添加所有依賴的 jar 包(包括依賴的spark-streaming-twitter_2.10 及其依賴),然后部署應(yīng)用程序硬梁。
需要注意的是前硫,這些高級(jí)的來(lái)源一般在Spark Shell中不可用,因此基于這些高級(jí)來(lái)源的應(yīng)用不能在Spark Shell中進(jìn)行測(cè)試荧止。如果你必須在Spark shell中使用它們屹电,你需要下載相應(yīng)的Maven工程的Jar依賴并添加到類路徑中。
其中一些高級(jí)來(lái)源如下:
lTwitter Spark Streaming的TwitterUtils工具類使用Twitter4j跃巡,Twitter4J 庫(kù)支持通過任何方法提供身份驗(yàn)證信息赁咙,你可以得到公眾的流歌殃,或得到基于關(guān)鍵詞過濾流援所。
lFlume Spark Streaming可以從Flume中接受數(shù)據(jù)不狮。
lKafka Spark Streaming可以從Kafka中接受數(shù)據(jù)。
lKinesis Spark Streaming可以從Kinesis中接受數(shù)據(jù)。
需要重申的一點(diǎn)是在開始編寫自己的 SparkStreaming 程序之前偷线,一定要將高級(jí)來(lái)源依賴的Jar添加到SBT 或 Maven 項(xiàng)目相應(yīng)的artifact中磨确。常見的輸入源和其對(duì)應(yīng)的Jar包如下圖所示。
另外声邦,輸入DStream也可以創(chuàng)建自定義的數(shù)據(jù)源乏奥,需要做的就是實(shí)現(xiàn)一個(gè)用戶定義的接收器。
2.2.3 DStream的操作
與RDD類似亥曹,DStream也提供了自己的一系列操作方法邓了,這些操作可以分成三類:普通的轉(zhuǎn)換操作、窗口轉(zhuǎn)換操作和輸出操作媳瞪。
*2.2.3.1 *普通的轉(zhuǎn)換操作
普通的轉(zhuǎn)換操作如下表所示:
轉(zhuǎn)換
描述
map(func)
源 DStream的每個(gè)元素通過函數(shù)func返回一個(gè)新的DStream骗炉。
flatMap(func)
類似與map操作,不同的是每個(gè)輸入元素可以被映射出0或者更多的輸出元素材失。
filter(func)
在源DSTREAM上選擇Func函數(shù)返回僅為true的元素,最終返回一個(gè)新的DSTREAM 痕鳍。
repartition(numPartitions)
通過輸入的參數(shù)numPartitions的值來(lái)改變DStream的分區(qū)大小。
union(otherStream)
返回一個(gè)包含源DStream與其他 DStream的元素合并后的新DSTREAM龙巨。
count()
對(duì)源DStream內(nèi)部的所含有的RDD的元素?cái)?shù)量進(jìn)行計(jì)數(shù),返回一個(gè)內(nèi)部的RDD只包含一個(gè)元素的DStreaam熊响。
reduce(func)
使用函數(shù)func(有兩個(gè)參數(shù)并返回一個(gè)結(jié)果)將源DStream 中每個(gè)RDD的元素進(jìn)行聚 合操作,返回一個(gè)內(nèi)部所包含的RDD只有一個(gè)元素的新DStream旨别。
countByValue()
計(jì)算DStream中每個(gè)RDD內(nèi)的元素出現(xiàn)的頻次并返回新的DStream[(K,Long)],其中K是RDD中元素的類型汗茄,Long是元素出現(xiàn)的頻次秸弛。
reduceByKey(func, [numTasks])
當(dāng)一個(gè)類型為(K,V)鍵值對(duì)的DStream被調(diào)用的時(shí)候,返回類型為類型為(K洪碳,V)鍵值對(duì)的新 DStream,其中每個(gè)鍵的值V都是使用聚合函數(shù)func匯總递览。注意:默認(rèn)情況下,使用 Spark的默認(rèn)并行度提交任務(wù)(本地模式下并行度為2瞳腌,集群模式下位8)绞铃,可以通過配置numTasks設(shè)置不同的并行任務(wù)數(shù)。
join(otherStream, [numTasks])
當(dāng)被調(diào)用類型分別為(K嫂侍,V)和(K儿捧,W)鍵值對(duì)的2個(gè)DStream時(shí),返回類型為(K挑宠,(V菲盾,W))鍵值對(duì)的一個(gè)新 DSTREAM。
cogroup(otherStream, [numTasks])
當(dāng)被調(diào)用的兩個(gè)DStream分別含有(K, V) 和(K, W)鍵值對(duì)時(shí),返回一個(gè)(K, Seq[V], Seq[W])類型的新的DStream各淀。
transform(func)
通過對(duì)源DStream的每RDD應(yīng)用RDD-to-RDD函數(shù)返回一個(gè)新的DStream懒鉴,這可以用來(lái)在DStream做任意RDD操作。
updateStateByKey(func)
返回一個(gè)新狀態(tài)的DStream,其中每個(gè)鍵的狀態(tài)是根據(jù)鍵的前一個(gè)狀態(tài)和鍵的新值應(yīng)用給定函數(shù)func后的更新碎浇。這個(gè)方法可以被用來(lái)維持每個(gè)鍵的任何狀態(tài)數(shù)據(jù)临谱。
在上面列出的這些操作中璃俗,transform()方法和updateStateByKey()方法值得我們深入的探討一下:
l transform(func)****操作
該transform操作(轉(zhuǎn)換操作)連同其其類似的 transformWith操作允許DStream 上應(yīng)用任意RDD-to-RDD函數(shù)。它可以被應(yīng)用于未在 DStream API 中暴露任何的RDD操作吴裤。例如旧找,在每批次的數(shù)據(jù)流與另一數(shù)據(jù)集的連接功能不直接暴露在DStream API 中,但可以輕松地使用transform操作來(lái)做到這一點(diǎn)麦牺,這使得DStream的功能非常強(qiáng)大钮蛛。例如,你可以通過連接預(yù)先計(jì)算的垃圾郵件信息的輸入數(shù)據(jù)流(可能也有Spark生成的)剖膳,然后基于此做實(shí)時(shí)數(shù)據(jù)清理的篩選魏颓,如下面官方提供的偽代碼所示。事實(shí)上吱晒,也可以在transform方法中使用機(jī)器學(xué)習(xí)和圖形計(jì)算的算法甸饱。
l updateStateByKey****操作
該 updateStateByKey 操作可以讓你保持任意狀態(tài),同時(shí)不斷有新的信息進(jìn)行更新仑濒。要使用此功能叹话,必須進(jìn)行兩個(gè)步驟 :
(1) 定義狀態(tài) - 狀態(tài)可以是任意的數(shù)據(jù)類型。
(2) 定義狀態(tài)更新函數(shù) - 用一個(gè)函數(shù)指定如何使用先前的狀態(tài)和從輸入流中獲取的新值 更新狀態(tài)墩瞳。
讓我們用一個(gè)例子來(lái)說(shuō)明驼壶,假設(shè)你要進(jìn)行文本數(shù)據(jù)流中單詞計(jì)數(shù)。在這里喉酌,正在運(yùn)行的計(jì)數(shù)是狀態(tài)而且它是一個(gè)整數(shù)热凹。我們定義了更新功能如下:
此函數(shù)應(yīng)用于含有鍵值對(duì)的DStream中(如前面的示例中,在DStream中含有(word泪电,1)鍵值對(duì))般妙。它會(huì)針對(duì)里面的每個(gè)元素(如wordCount中的word)調(diào)用一下更新函數(shù),newValues是最新的值相速,runningCount是之前的值碟渺。
*2.2.3.2 *窗口轉(zhuǎn)換操作
Spark Streaming 還提供了窗口的計(jì)算,它允許你通過滑動(dòng)窗口對(duì)數(shù)據(jù)進(jìn)行轉(zhuǎn)換和蚪,窗口轉(zhuǎn)換操作如下:
轉(zhuǎn)換
描述
window(windowLength, slideInterval)
返回一個(gè)基于源DStream的窗口批次計(jì)算后得到新的DStream止状。
countByWindow(windowLength,slideInterval)
返回基于滑動(dòng)窗口的DStream中的元素的數(shù)量。
reduceByWindow(func, windowLength,slideInterval)
基于滑動(dòng)窗口對(duì)源DStream中的元素進(jìn)行聚合操作攒霹,得到一個(gè)新的DStream怯疤。
reduceByKeyAndWindow(func,windowLength,slideInterval, [numTasks])
基于滑動(dòng)窗口對(duì)(K,V)鍵值對(duì)類型的DStream中的值按K使用聚合函數(shù)func進(jìn)行聚合操作催束,得到一個(gè)新的DStream集峦。
reduceByKeyAndWindow(func, invFunc,windowLength,slideInterval, [numTasks])
一個(gè)更高效的reduceByKkeyAndWindow()的實(shí)現(xiàn)版本,先對(duì)滑動(dòng)窗口中新的時(shí)間間隔內(nèi)數(shù)據(jù)增量聚合并移去最早的與新增數(shù)據(jù)量的時(shí)間間隔內(nèi)的數(shù)據(jù)統(tǒng)計(jì)量。例如塔淤,計(jì)算t+4秒這個(gè)時(shí)刻過去5秒窗口的WordCount摘昌,那么我們可以將t+3時(shí)刻過去5秒的統(tǒng)計(jì)量加上[t+3,t+4]的統(tǒng)計(jì)量高蜂,在減去[t-2聪黎,t-1]的統(tǒng)計(jì)量,這種方法可以復(fù)用中間三秒的統(tǒng)計(jì)量备恤,提高統(tǒng)計(jì)的效率稿饰。
countByValueAndWindow(windowLength,slideInterval, [numTasks])
基于滑動(dòng)窗口計(jì)算源DStream中每個(gè)RDD內(nèi)每個(gè)元素出現(xiàn)的頻次并返回DStream[(K,Long)],其中K是RDD中元素的類型露泊,Long是元素頻次喉镰。與countByValue一樣,reduce任務(wù)的數(shù)量可以通過一個(gè)可選參數(shù)進(jìn)行配置惭笑。
批處理間隔示意圖
在Spark Streaming中侣姆,數(shù)據(jù)處理是按批進(jìn)行的,而數(shù)據(jù)采集是逐條進(jìn)行的沉噩,因此在Spark Streaming中會(huì)先設(shè)置好批處理間隔(batch duration)捺宗,當(dāng)超過批處理間隔的時(shí)候就會(huì)把采集到的數(shù)據(jù)匯總起來(lái)成為一批數(shù)據(jù)交給系統(tǒng)去處理。
對(duì)于窗口操作而言川蒙,在其窗口內(nèi)部會(huì)有N個(gè)批處理數(shù)據(jù)偿凭,批處理數(shù)據(jù)的大小由窗口間隔(window duration)決定,而窗口間隔指的就是窗口的持續(xù)時(shí)間派歌,在窗口操作中,只有窗口的長(zhǎng)度滿足了才會(huì)觸發(fā)批數(shù)據(jù)的處理痰哨。除了窗口的長(zhǎng)度胶果,窗口操作還有另一個(gè)重要的參數(shù)就是滑動(dòng)間隔(slide duration),它指的是經(jīng)過多長(zhǎng)時(shí)間窗口滑動(dòng)一次形成新的窗口斤斧,滑動(dòng)窗口默認(rèn)情況下和批次間隔的相同早抠,而窗口間隔一般設(shè)置的要比它們兩個(gè)大。在這里必須注意的一點(diǎn)是滑動(dòng)間隔和窗口間隔的大小一定得設(shè)置為批處理間隔的整數(shù)倍撬讽。
如批處理間隔示意圖所示蕊连,批處理間隔是1個(gè)時(shí)間單位,窗口間隔是3個(gè)時(shí)間單位游昼,滑動(dòng)間隔是2個(gè)時(shí)間單位甘苍。對(duì)于初始的窗口time 1-time 3,只有窗口間隔滿足了才觸發(fā)數(shù)據(jù)的處理烘豌。這里需要注意的一點(diǎn)是载庭,初始的窗口有可能流入的數(shù)據(jù)沒有撐滿,但是隨著時(shí)間的推進(jìn),窗口最終會(huì)被撐滿囚聚。當(dāng)每個(gè)2個(gè)時(shí)間單位靖榕,窗口滑動(dòng)一次后,會(huì)有新的數(shù)據(jù)流入窗口顽铸,這時(shí)窗口會(huì)移去最早的兩個(gè)時(shí)間單位的數(shù)據(jù)茁计,而與最新的兩個(gè)時(shí)間單位的數(shù)據(jù)進(jìn)行匯總形成新的窗口(time3-time5)。
對(duì)于窗口操作谓松,批處理間隔星压、窗口間隔和滑動(dòng)間隔是非常重要的三個(gè)時(shí)間概念,是理解窗口操作的關(guān)鍵所在毒返。
*2.2.3.3 *輸出操作
Spark Streaming允許DStream的數(shù)據(jù)被輸出到外部系統(tǒng)租幕,如數(shù)據(jù)庫(kù)或文件系統(tǒng)。由于輸出操作實(shí)際上使transformation操作后的數(shù)據(jù)可以通過外部系統(tǒng)被使用拧簸,同時(shí)輸出操作觸發(fā)所有DStream的transformation操作的實(shí)際執(zhí)行(類似于RDD操作)劲绪。以下表列出了目前主要的輸出操作:
轉(zhuǎn)換
描述
print()
在Driver中打印出DStream中數(shù)據(jù)的前10個(gè)元素。
saveAsTextFiles(prefix, [suffix])
將DStream中的內(nèi)容以文本的形式保存為文本文件盆赤,其中每次批處理間隔內(nèi)產(chǎn)生的文件以prefix-TIME_IN_MS[.suffix]的方式命名贾富。
saveAsObjectFiles(prefix, [suffix])
將DStream中的內(nèi)容按對(duì)象序列化并且以SequenceFile的格式保存。其中每次批處理間隔內(nèi)產(chǎn)生的文件以prefix-TIME_IN_MS[.suffix]的方式命名牺六。
saveAsHadoopFiles(prefix, [suffix])
將DStream中的內(nèi)容以文本的形式保存為Hadoop文件颤枪,其中每次批處理間隔內(nèi)產(chǎn)生的文件以prefix-TIME_IN_MS[.suffix]的方式命名。
foreachRDD(func)
最基本的輸出操作淑际,將func函數(shù)應(yīng)用于DStream中的RDD上畏纲,這個(gè)操作會(huì)輸出數(shù)據(jù)到外部系統(tǒng),比如保存RDD到文件或者網(wǎng)絡(luò)數(shù)據(jù)庫(kù)等春缕。需要注意的是func函數(shù)是在運(yùn)行該streaming應(yīng)用的Driver進(jìn)程里執(zhí)行的盗胀。
dstream.foreachRDD是一個(gè)非常強(qiáng)大的輸出操作,它允將許數(shù)據(jù)輸出到外部系統(tǒng)锄贼。但是 票灰,如何正確高效地使用這個(gè)操作是很重要的,下面展示了如何去避免一些常見的錯(cuò)誤宅荤。
通常將數(shù)據(jù)寫入到外部系統(tǒng)需要?jiǎng)?chuàng)建一個(gè)連接對(duì)象(如 TCP連接到遠(yuǎn)程服務(wù)器)屑迂,并用它來(lái)發(fā)送數(shù)據(jù)到遠(yuǎn)程系統(tǒng)。出于這個(gè)目的冯键,開發(fā)者可能在不經(jīng)意間在Spark driver端創(chuàng)建了連接對(duì)象惹盼,并嘗試使用它保存RDD中的記錄到Spark worker上,如下面代碼:
這是不正確的琼了,這需要連接對(duì)象進(jìn)行序列化并從Driver端發(fā)送到Worker上逻锐。連接對(duì)象很少在不同機(jī)器間進(jìn)行這種操作夫晌,此錯(cuò)誤可能表現(xiàn)為序列化錯(cuò)誤(連接對(duì)不可序列化),初始化錯(cuò)誤(連接對(duì)象在需要在Worker 上進(jìn)行需要初始化) 等等昧诱,正確的解決辦法是在 worker上創(chuàng)建的連接對(duì)象晓淀。
通常情況下,創(chuàng)建一個(gè)連接對(duì)象有時(shí)間和資源開銷盏档。因此凶掰,創(chuàng)建和銷毀的每條記錄的連接對(duì)象可能招致不必要的資源開銷,并顯著降低系統(tǒng)整體的吞吐量 蜈亩。一個(gè)更好的解決方案是使用rdd.foreachPartition方法創(chuàng)建一個(gè)單獨(dú)的連接對(duì)象懦窘,然后使用該連接對(duì)象輸出的所有RDD分區(qū)中的數(shù)據(jù)到外部系統(tǒng)。
這緩解了創(chuàng)建多條記錄連接的開銷稚配。最后畅涂,還可以進(jìn)一步通過在多個(gè)RDDs/ batches上重用連接對(duì)象進(jìn)行優(yōu)化。一個(gè)保持連接對(duì)象的靜態(tài)池可以重用在多個(gè)批處理的RDD上將其輸出到外部系統(tǒng)道川,從而進(jìn)一步降低了開銷午衰。
需要注意的是,在靜態(tài)池中的連接應(yīng)該按需延遲創(chuàng)建冒萄,這樣可以更有效地把數(shù)據(jù)發(fā)送到外部系統(tǒng)臊岸。另外需要要注意的是:DStreams延遲執(zhí)行的,就像RDD的操作是由actions觸發(fā)一樣尊流。默認(rèn)情況下帅戒,輸出操作會(huì)按照它們?cè)赟treaming應(yīng)用程序中定義的順序一個(gè)個(gè)執(zhí)行。
2.3 容錯(cuò)崖技、持久化和性能調(diào)優(yōu)
2.3.1 容錯(cuò)
DStream基于RDD組成逻住,RDD的容錯(cuò)性依舊有效,我們首先回憶一下SparkRDD的基本特性迎献。
lRDD是一個(gè)不可變的鄙信、確定性的可重復(fù)計(jì)算的分布式數(shù)據(jù)集。RDD的某些partition丟失了忿晕,可以通過血統(tǒng)(lineage)信息重新計(jì)算恢復(fù);
l如果RDD任何分區(qū)因worker節(jié)點(diǎn)故障而丟失银受,那么這個(gè)分區(qū)可以從原來(lái)依賴的容錯(cuò)數(shù)據(jù)集中恢復(fù)践盼;
l由于Spark中所有的數(shù)據(jù)的轉(zhuǎn)換操作都是基于RDD的,即使集群出現(xiàn)故障宾巍,只要輸入數(shù)據(jù)集存在咕幻,所有的中間結(jié)果都是可以被計(jì)算的。
Spark Streaming是可以從HDFS和S3這樣的文件系統(tǒng)讀取數(shù)據(jù)的顶霞,這種情況下所有的數(shù)據(jù)都可以被重新計(jì)算肄程,不用擔(dān)心數(shù)據(jù)的丟失锣吼。但是在大多數(shù)情況下,Spark Streaming是基于網(wǎng)絡(luò)來(lái)接受數(shù)據(jù)的蓝厌,此時(shí)為了實(shí)現(xiàn)相同的容錯(cuò)處理玄叠,在接受網(wǎng)絡(luò)的數(shù)據(jù)時(shí)會(huì)在集群的多個(gè)Worker節(jié)點(diǎn)間進(jìn)行數(shù)據(jù)的復(fù)制(默認(rèn)的復(fù)制數(shù)是2),這導(dǎo)致產(chǎn)生在出現(xiàn)故障時(shí)被處理的兩種類型的數(shù)據(jù):
1)Data received and replicated :一旦一個(gè)Worker節(jié)點(diǎn)失效拓提,系統(tǒng)會(huì)從另一份還存在的數(shù)據(jù)中重新計(jì)算读恃。
2)Data received but buffered for replication :一旦數(shù)據(jù)丟失,可以通過RDD之間的依賴關(guān)系代态,從HDFS這樣的外部文件系統(tǒng)讀取數(shù)據(jù)寺惫。
此外,有兩種故障蹦疑,我們應(yīng)該關(guān)心:
(1)Worker節(jié)點(diǎn)失效:通過上面的講解我們知道西雀,這時(shí)系統(tǒng)會(huì)根據(jù)出現(xiàn)故障的數(shù)據(jù)的類型,選擇是從另一個(gè)有復(fù)制過數(shù)據(jù)的工作節(jié)點(diǎn)上重新計(jì)算歉摧,還是直接從從外部文件系統(tǒng)讀取數(shù)據(jù)艇肴。
(2)Driver(驅(qū)動(dòng)節(jié)點(diǎn))失效 :如果運(yùn)行 Spark Streaming應(yīng)用時(shí)驅(qū)動(dòng)節(jié)點(diǎn)出現(xiàn)故障,那么很明顯的StreamingContext已經(jīng)丟失判莉,同時(shí)在內(nèi)存中的數(shù)據(jù)全部丟失豆挽。對(duì)于這種情況,Spark Streaming應(yīng)用程序在計(jì)算上有一個(gè)內(nèi)在的結(jié)構(gòu)——在每段micro-batch數(shù)據(jù)周期性地執(zhí)行同樣的Spark計(jì)算券盅。這種結(jié)構(gòu)允許把應(yīng)用的狀態(tài)(亦稱checkpoint)周期性地保存到可靠的存儲(chǔ)空間中帮哈,并在driver重新啟動(dòng)時(shí)恢復(fù)該狀態(tài)。具體做法是在ssc.checkpoint(<checkpoint directory>)函數(shù)中進(jìn)行設(shè)置锰镀,Spark Streaming就會(huì)定期把DStream的元信息寫入到HDFS中娘侍,一旦驅(qū)動(dòng)節(jié)點(diǎn)失效,丟失的StreamingContext會(huì)通過已經(jīng)保存的檢查點(diǎn)信息進(jìn)行恢復(fù)泳炉。
最后我們談一下Spark Stream的容錯(cuò)在Spark 1.2版本的一些改進(jìn):
實(shí)時(shí)流處理系統(tǒng)必須要能在24/7時(shí)間內(nèi)工作憾筏,因此它需要具備從各種系統(tǒng)故障中恢復(fù)過來(lái)的能力。最開始花鹅,SparkStreaming就支持從driver和worker故障恢復(fù)的能力氧腰。然而有些數(shù)據(jù)源的輸入可能在故障恢復(fù)以后丟失數(shù)據(jù)。在Spark1.2版本中刨肃,Spark已經(jīng)在SparkStreaming中對(duì)預(yù)寫日志(也被稱為journaling)作了初步支持古拴,改進(jìn)了恢復(fù)機(jī)制,并使更多數(shù)據(jù)源的零數(shù)據(jù)丟失有了可靠真友。
對(duì)于文件這樣的源數(shù)據(jù)黄痪,driver恢復(fù)機(jī)制足以做到零數(shù)據(jù)丟失,因?yàn)樗械臄?shù)據(jù)都保存在了像HDFS或S3這樣的容錯(cuò)文件系統(tǒng)中了盔然。但對(duì)于像Kafka和Flume等其它數(shù)據(jù)源桅打,有些接收到的數(shù)據(jù)還只緩存在內(nèi)存中是嗜,尚未被處理,它們就有可能會(huì)丟失挺尾。這是由于Spark應(yīng)用的分布操作方式引起的鹅搪。當(dāng)driver進(jìn)程失敗時(shí),所有在standalone/yarn/mesos集群運(yùn)行的executor潦嘶,連同它們?cè)趦?nèi)存中的所有數(shù)據(jù)涩嚣,也同時(shí)被終止。對(duì)于Spark Streaming來(lái)說(shuō)掂僵,從諸如Kafka和Flume的數(shù)據(jù)源接收到的所有數(shù)據(jù)航厚,在它們處理完成之前,一直都緩存在executor的內(nèi)存中锰蓬♂2牵縱然driver重新啟動(dòng),這些緩存的數(shù)據(jù)也不能被恢復(fù)芹扭。為了避免這種數(shù)據(jù)損失麻顶,在Spark1.2發(fā)布版本中引進(jìn)了預(yù)寫日志(WriteAheadLogs)功能。
預(yù)寫日志功能的流程是:1)一個(gè)SparkStreaming應(yīng)用開始時(shí)(也就是driver開始時(shí))舱卡,相關(guān)的StreamingContext使用SparkContext啟動(dòng)接收器成為長(zhǎng)駐運(yùn)行任務(wù)辅肾。這些接收器接收并保存流數(shù)據(jù)到Spark內(nèi)存中以供處理。2)接收器通知driver轮锥。3)接收塊中的元數(shù)據(jù)(metadata)被發(fā)送到driver的StreamingContext矫钓。這個(gè)元數(shù)據(jù)包括:(a)定位其在executor內(nèi)存中數(shù)據(jù)的塊referenceid,(b)塊數(shù)據(jù)在日志中的偏移信息(如果啟用了)舍杜。
用戶傳送數(shù)據(jù)的生命周期如下圖所示新娜。
類似Kafka這樣的系統(tǒng)可以通過復(fù)制數(shù)據(jù)保持可靠性。允許預(yù)寫日志兩次高效地復(fù)制同樣的數(shù)據(jù):一次由Kafka既绩,而另一次由SparkStreaming概龄。Spark未來(lái)版本將包含Kafka容錯(cuò)機(jī)制的原生支持,從而避免第二個(gè)日志饲握。
2.3.2 持久化
與RDD一樣嘶伟,DStream同樣也能通過persist()方法將數(shù)據(jù)流存放在內(nèi)存中出嘹,默認(rèn)的持久化方式是MEMORY_ONLY_SER柜裸,也就是在內(nèi)存中存放數(shù)據(jù)同時(shí)序列化的方式桃漾,這樣做的好處是遇到需要多次迭代計(jì)算的程序時(shí)丹莲,速度優(yōu)勢(shì)十分的明顯揖铜。而對(duì)于一些基于窗口的操作述寡,如reduceByWindow找岖、reduceByKeyAndWindow嫉晶,以及基于狀態(tài)的操作骑疆,如updateStateBykey田篇,其默認(rèn)的持久化策略就是保存在內(nèi)存中。
對(duì)于來(lái)自網(wǎng)絡(luò)的數(shù)據(jù)源(Kafka箍铭、Flume泊柬、sockets等),默認(rèn)的持久化策略是將數(shù)據(jù)保存在兩臺(tái)機(jī)器上诈火,這也是為了容錯(cuò)性而設(shè)計(jì)的兽赁。
另外,對(duì)于窗口和有狀態(tài)的操作必須checkpoint冷守,通過StreamingContext的checkpoint來(lái)指定目錄刀崖,通過 Dtream的checkpoint指定間隔時(shí)間,間隔必須是滑動(dòng)間隔(slide interval)的倍數(shù)拍摇。
2.3.3 性能調(diào)優(yōu)
1. 優(yōu)化運(yùn)行時(shí)間
l 增加并行度 確保使用整個(gè)集群的資源亮钦,而不是把任務(wù)集中在幾個(gè)特定的節(jié)點(diǎn)上。對(duì)于包含shuffle的操作充活,增加其并行度以確保更為充分地使用集群資源蜂莉;
l 減少數(shù)據(jù)序列化,反序列化的負(fù)擔(dān) Spark Streaming默認(rèn)將接受到的數(shù)據(jù)序列化后存儲(chǔ)混卵,以減少內(nèi)存的使用映穗。但是序列化和反序列話需要更多的CPU時(shí)間,因此更加高效的序列化方式(Kryo)和自定義的系列化接口可以更高效地使用CPU幕随;
l 設(shè)置合理的batch duration(批處理時(shí)間間) 在Spark Streaming中蚁滋,Job之間有可能存在依賴關(guān)系,后面的Job必須確保前面的作業(yè)執(zhí)行結(jié)束后才能提交合陵。若前面的Job執(zhí)行的時(shí)間超出了批處理時(shí)間間隔枢赔,那么后面的Job就無(wú)法按時(shí)提交,這樣就會(huì)進(jìn)一步拖延接下來(lái)的Job拥知,造成后續(xù)Job的阻塞踏拜。因此設(shè)置一個(gè)合理的批處理間隔以確保作業(yè)能夠在這個(gè)批處理間隔內(nèi)結(jié)束時(shí)必須的;
l 減少因任務(wù)提交和分發(fā)所帶來(lái)的負(fù)擔(dān) 通常情況下低剔,Akka框架能夠高效地確保任務(wù)及時(shí)分發(fā)速梗,但是當(dāng)批處理間隔非常小(500ms)時(shí)襟齿,提交和分發(fā)任務(wù)的延遲就變得不可接受了姻锁。使用Standalone和Coarse-grained Mesos模式通常會(huì)比使用Fine-grained Mesos模式有更小的延遲。
2. 優(yōu)化內(nèi)存使用
l控制batch size(批處理間隔內(nèi)的數(shù)據(jù)量) Spark Streaming會(huì)把批處理間隔內(nèi)接收到的所有數(shù)據(jù)存放在Spark內(nèi)部的可用內(nèi)存區(qū)域中猜欺,因此必須確保當(dāng)前節(jié)點(diǎn)Spark的可用內(nèi)存中少能容納這個(gè)批處理時(shí)間間隔內(nèi)的所有數(shù)據(jù)位隶,否則必須增加新的資源以提高集群的處理能力;
l及時(shí)清理不再使用的數(shù)據(jù) 前面講到Spark Streaming會(huì)將接受的數(shù)據(jù)全部存儲(chǔ)到內(nèi)部可用內(nèi)存區(qū)域中开皿,因此對(duì)于處理過的不再需要的數(shù)據(jù)應(yīng)及時(shí)清理涧黄,以確保Spark Streaming有富余的可用內(nèi)存空間篮昧。通過設(shè)置合理的spark.cleaner.ttl時(shí)長(zhǎng)來(lái)及時(shí)清理超時(shí)的無(wú)用數(shù)據(jù),這個(gè)參數(shù)需要小心設(shè)置以免后續(xù)操作中所需要的數(shù)據(jù)被超時(shí)錯(cuò)誤處理笋妥;
l觀察及適當(dāng)調(diào)整GC策略 GC會(huì)影響Job的正常運(yùn)行懊昨,可能延長(zhǎng)Job的執(zhí)行時(shí)間,引起一系列不可預(yù)料的問題春宣。觀察GC的運(yùn)行情況酵颁,采用不同的GC策略以進(jìn)一步減小內(nèi)存回收對(duì)Job運(yùn)行的影響。
參考資料:
(1)《Spark Streaming》 http://blog.debugo.com/spark-streaming/