1 Spark Streaming 透徹理解之一

本文內(nèi)容基于Spark最新版1.6.1

  1. Spark 最初只有Spark Core槐瑞,通過逐步的發(fā)展悼沿,現(xiàn)在已擴展出Spark SQL、Spark Streaming、Spark MLlib(machine learning)、GraphX(graph)、Spark R等。 而Spark Streaming本是Spark Core上的一個子框架,如果我們試著去精通這個子框架蠕搜,不僅僅能寫出非常復(fù)雜的應(yīng)用程序,還能夠很好的駕馭Spark掉伏,進而研究并達到精通Spark的地步,及其尋找到Spark問題的解決之道箍镜。

  2. 我們?yōu)槭裁磸腟park Streaming切入研究Spark源碼的定制薪夕,因為Spark SQL涉及到很多SQL語法解析和優(yōu)化的細節(jié)埂淮,對于我們集中精力研究Spark有所干擾;Spark R還不是很成熟痪蝇,支持功能有限;GraphX最近幾個版本基本沒有改進耙册,里面有許多數(shù)學(xué)算法;MLlib也涉及到相當(dāng)多的數(shù)學(xué)知識饶辙。

  3. Spark Streaming的優(yōu)勢是在于可以結(jié)合SparkSQL蹋宦、圖計算蒿辙、機器學(xué)習(xí),使其功能更加強大。同時在Spark中Spark Streaming也是最容易出現(xiàn)問題的,因為它是不斷的運行,內(nèi)部比較復(fù)雜唉擂。掌握好Spark Streaming空扎,可以去窺視Spark的一切撮慨!

  4. Spark Streaming到底是什么规伐? Spark Streaming是一個流式計算框架,運行在Spark Core之上吵护。這是一個流處理的時代用爪,一切數(shù)據(jù)如果不是以流式來處理或者跟流式的處理不相關(guān)的話颇玷,都將是次數(shù)據(jù)谒亦,我們必將處在一個流的數(shù)據(jù)處理時代锁摔。Spark Streaming很像是基于Spark Core之上的一個應(yīng)用程序十气。不像其他子框架籍胯,比如機器學(xué)習(xí)是把數(shù)學(xué)算法直接應(yīng)用在Spark的RDD之上妖爷,Spark Streaming更像一般的應(yīng)用程序那樣,感知流進來的數(shù)據(jù)并進行相應(yīng)的處理挪圾。很像順其自然的一種感知操作棚赔,利用自己獨有的“神經(jīng)元”來對數(shù)據(jù)進行各類操作捆毫。

  5. Spark Streaming的幾大優(yōu)點

  • 對源源不斷流進來的數(shù)據(jù)濒憋,能夠迅速響應(yīng)并立即給出你所要是反饋信息
  • Spark非常強大的地方在于它的流式處理可以在線的利用機器學(xué)習(xí)黔夭、圖計算婚惫、Spark SQL或者Spark R的成果,這得益于Spark多元化、一體化的基礎(chǔ)架構(gòu)設(shè)計餐济。也就是說篙悯,在Spark技術(shù)堆棧中矮燎,Spark Streaming可以調(diào)用任何的API接口峡谊,不需要做任何的設(shè)置贤壁。這是Spark無可匹敵之處名船,也是Spark Streaming必將一統(tǒng)天下的根源迷扇。
  1. 如何清晰的看到數(shù)據(jù)的流入厨内、被處理的過程? 使用一個小技巧瞭亮,通過調(diào)節(jié)放大Batch Interval的方式唆缴,來降低批處理次數(shù)趟紊,以方便看清楚各個環(huán)節(jié)铛嘱。
    我們從已寫過的廣告點擊的在線黑名單過濾的Spark Streaming應(yīng)用程序入手纹磺,看一下是具體的實驗源碼:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
 * 使用Scala開發(fā)集群運行的Spark 在線黑名單過濾程序
 *
 * 背景描述:在廣告點擊計費系統(tǒng)中橄杨,我們在線過濾掉黑名單的點擊式矫,
進而保護廣告商的利益衷佃,只進行有效的廣告點擊計費
 *  或者在防刷評分(或者流量)系統(tǒng)锄列,過濾掉無效的投票或者評分或者流量竣况;
 * 實現(xiàn)技術(shù):使用transform Api直接基于RDD編程丹泉,進行join操作
 *
 */
object OnlineBlackListFilter {
  def main(args: Array[String]){
    /**
     * 第1步:創(chuàng)建Spark的配置對象SparkConf摹恨,設(shè)置Spark程序的運行時的配置信息晒哄,
     * 例如說通過setMaster來設(shè)置程序要鏈接的Spark集群的Master的URL,如果設(shè)置
     * 為local寝凌,則代表Spark程序在本地運行较木,特別適合于機器配置條件非常差(例如
     * 只有1G的內(nèi)存)的初學(xué)者       *
     */
    val conf = new SparkConf() //創(chuàng)建SparkConf對象
    conf.setAppName("OnlineBlackListFilter") //設(shè)置應(yīng)用程序的名稱预侯,在程序運行的監(jiān)控界面可以看到名稱
    conf.setMaster("spark://Master:7077") //此時雌桑,程序在Spark集群
    val ssc = new StreamingContext(conf, Seconds(30))
    /**
     * 黑名單數(shù)據(jù)準(zhǔn)備校坑,實際上黑名單一般都是動態(tài)的耍目,例如在Redis或者數(shù)據(jù)庫中,黑名單的生成往往有復(fù)雜的業(yè)務(wù)
     * 邏輯毅访,具體情況算法不同,但是在Spark Streaming進行處理的時候每次都能工訪問完整的信息
     */
    val blackList = Array(("hadoop", true),("mahout", true))
    val blackListRDD = ssc.sparkContext.parallelize(blackList, 8)
    val adsClickStream = ssc.socketTextStream("Master", 9999)
    /**
     * 此處模擬的廣告點擊的每條數(shù)據(jù)的格式為:time、name
     * 此處map操作的結(jié)果是name查乒、(time棚亩,name)的格式
     */
    val adsClickStreamFormatted = adsClickStream.map { ads => (ads.split(" ")(1), ads) }
    adsClickStreamFormatted.transform(userClickRDD => {
      //通過leftOuterJoin操作既保留了左側(cè)用戶廣告點擊內(nèi)容的RDD的所有內(nèi)容蔑舞,又獲得了相應(yīng)點擊內(nèi)容是否在黑名單中
      val joinedBlackListRDD = userClickRDD.leftOuterJoin(blackListRDD)
      /**
       * 進行filter過濾的時候从撼,其輸入元素是一個Tuple:(name,((time,name), boolean))
       * 其中第一個元素是黑名單的名稱掏婶,第二元素的第二個元素是進行l(wèi)eftOuterJoin的時候是否存在在值
       * 如果存在的話雄妥,表面當(dāng)前廣告點擊是黑名單老厌,需要過濾掉枝秤,否則的話則是有效點擊內(nèi)容丹壕;
       */
      val validClicked = joinedBlackListRDD.filter(joinedItem => {
        if(joinedItem._2._2.getOrElse(false))
        {
          false
        } else {
          true
        }
      })
      validClicked.map(validClick => {validClick._2._1})
    }).print
    /**
     * 計算后的有效數(shù)據(jù)一般都會寫入Kafka中菌赖,下游的計費系統(tǒng)會從kafka中pull到有效數(shù)據(jù)進行計費
     */
    ssc.start()
    ssc.awaitTermination()
  }
}

集群中需要先執(zhí)行nc,啟動 9999端口

nc -lk 9999

將代碼打包上傳到集群運行

  1. 我們運行完程序薄啥,看到過濾結(jié)果以后,停止程序到逊,打開HistoryServer http://master:18080/

  2. 點擊App ID進去觉壶,打開,會看到如下圖所示的4個Job争剿,從實際執(zhí)行的Job是1個Job蚕苇,但是圖中顯示有4個Job涩笤,從這里可以看出Spark Streaming運行的時候自己會啟動一些Job辆它。



    先看看job id 為0 的詳細信息


  3. 很明顯是我們定義的blackListRDD數(shù)據(jù)的生成。對應(yīng)的代碼為

val blackList = Array((“Hadoop”, true), (“Mathou”, true)) 
//把Array變成RDD 
val blackListRDD = ssc.sparkContext.parallelize(blackList) 

并且它做了reduceBykey的操作(代碼中并沒有此步操作片吊,SparkStreaming框架自行生成的)俏脊。
這里有兩個Stage爷贫,Stage 0和Stage 1

Job 1的詳細信息



一個makeRDD,這個RDD是receiver不斷的接收數(shù)據(jù)流中的數(shù)據(jù)腾务,在時間間隔達到batchInterval后岩瘦,將所有數(shù)據(jù)變成一個RDD启昧。并且它的耗時也是最長的59s

  1. 此處可以看出,receiver也是一個獨立的job苏遥。由此我們可以得出一個結(jié)論:我們在應(yīng)用程序中田炭,可以啟動多個job教硫,并且不用的job之間可以相互配合瞬矩,這就為我們編寫復(fù)雜的應(yīng)用程序打下了基礎(chǔ)涵叮。
    我們點擊上面的start at OnlineBlackListFilter.scala:64查看詳細信息
  2. 根據(jù)上圖的信息割粮,只有一個Executor在接收數(shù)據(jù),最最重要的是紅色框中的數(shù)據(jù)本地性為PROCESS_LOCAL,由此可以知道receiver接收到數(shù)據(jù)后會保存到內(nèi)存中商架,只要內(nèi)存充足是不會寫到磁盤中的飞傀。
    即便在創(chuàng)建receiver時,指定的存儲默認(rèn)策略為
MEMORY_AND_DISK_SER_2 
def socketTextStream( 
hostname: String, 
port: Int, 
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 
): ReceiverInputDStream[String] = withNamedScope(“socket text stream”) { 
socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel) 
}
  1. job 2的詳細信息




    Job 2 將前兩個job生成的RDD進行l(wèi)eftOuterJoin操作汰聋。
    從Stage Id的編號就可以看出乾吻,它是依賴于上兩個Job的。
    Receiver接收數(shù)據(jù)時是在spark-master節(jié)點上,但是Job 2在處理數(shù)據(jù)時袱巨,數(shù)據(jù)已經(jīng)到了spark-worker1上了(因為我的環(huán)境只有兩個worker嫉入,數(shù)據(jù)并沒有分散到所有worker節(jié)點爷光,worker節(jié)點如果多一點欢瞪,情況可能不一樣重贺,每個節(jié)點都會處理數(shù)據(jù))
    點擊上面的Stage Id 3查看詳細信息:



    Executor上運行,并且有5個Task 。
    Job 3的詳細信息

  2. 總結(jié):我們可以看出型诚,一個batchInterval并不是僅僅觸發(fā)一個Job。
    根據(jù)上面的描述,我們更細致的了解了DStream和RDD的關(guān)系了。DStream就是一個個batchInterval時間內(nèi)的RDD組成的嗦玖。只不過DStream帶上了時間維度酪术,是一個無邊界的集合。



    以上的連續(xù)4個圖继阻,分別對應(yīng)以下4個段落的描述:
    Spark Streaming接收Kafka、Flume、HDFS和Kinesis等各種來源的實時輸入數(shù)據(jù),進行處理后,處理結(jié)果保存在HDFS、Databases等各種地方搪锣。
    Spark Streaming接收這些實時輸入數(shù)據(jù)流堵幽,會將它們按批次劃分,然后交給Spark引擎處理溃肪,生成按照批次劃分的結(jié)果流厨钻。
    Spark Streaming提供了表示連續(xù)數(shù)據(jù)流的苍蔬、高度抽象的被稱為離散流的DStream。DStream本質(zhì)上表示RDD的序列。任何對DStream的操作都會轉(zhuǎn)變?yōu)閷Φ讓覴DD的操作抓狭。
    Spark Streaming使用數(shù)據(jù)源產(chǎn)生的數(shù)據(jù)流創(chuàng)建DStream苗桂,也可以在已有的DStream上使用一些操作來創(chuàng)建新的DStream木缝。
    在我們前面的實驗中,每300秒會接收一批數(shù)據(jù)吱殉,基于這批數(shù)據(jù)會生成RDD友雳,進而觸發(fā)Job押赊,執(zhí)行處理流礁。
    DStream是一個沒有邊界的集合,沒有大小的限制。
    DStream代表了時空的概念。隨著時間的推移谜嫉,里面不斷產(chǎn)生RDD沐兰。
    鎖定到時間片后住闯,就是空間的操作,也就是對本時間片的對應(yīng)批次的數(shù)據(jù)的處理插佛。
    下面用實例來講解數(shù)據(jù)處理過程雇寇。
    從Spark Streaming程序轉(zhuǎn)換為Spark執(zhí)行的作業(yè)的過程中锨侯,使用了DStreamGraph囚痴。
    Spark Streaming程序中一般會有若干個對DStream的操作渡讼。DStreamGraph就是由這些操作的依賴關(guān)系構(gòu)成成箫。

對DStream的操作會構(gòu)建成DStream Graph



從每個foreach開始蹬昌,都會進行回溯皂贩。從后往前回溯這些操作之間的依賴關(guān)系,也就形成了DStreamGraph婴栽。
在每到batchInterval時間間隔后愚争,Job被觸發(fā)轰枝,DStream Graph將會被轉(zhuǎn)換成RDD Graph



空間維度確定之后,隨著時間不斷推進,會不斷實例化RDD Graph,然后觸發(fā)Job去執(zhí)行處理砾脑。
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末韧衣,一起剝皮案震驚了整個濱河市畅铭,隨后出現(xiàn)的幾起案子硕噩,更是在濱河造成了極大的恐慌炉擅,老刑警劉巖阳惹,帶你破解...
    沈念sama閱讀 211,042評論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件莹汤,死亡現(xiàn)場離奇詭異,居然都是意外死亡线罕,警方通過查閱死者的電腦和手機钞楼,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 89,996評論 2 384
  • 文/潘曉璐 我一進店門询件,熙熙樓的掌柜王于貴愁眉苦臉地迎上來雳殊,“玉大人橘沥,你說我怎么就攤上這事座咆〗樘眨” “怎么了哺呜?”我有些...
    開封第一講書人閱讀 156,674評論 0 345
  • 文/不壞的土叔 我叫張陵,是天一觀的道長国撵。 經(jīng)常有香客問我介牙,道長澳厢,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,340評論 1 283
  • 正文 為了忘掉前任线得,我火速辦了婚禮框都,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘熬尺。我一直安慰自己粱哼,他們只是感情好揭措,可當(dāng)我...
    茶點故事閱讀 65,404評論 5 384
  • 文/花漫 我一把揭開白布绊含。 她就那樣靜靜地躺著躬充,像睡著了一般充甚。 火紅的嫁衣襯著肌膚如雪霸褒。 梳的紋絲不亂的頭發(fā)上废菱,一...
    開封第一講書人閱讀 49,749評論 1 289
  • 那天殊轴,我揣著相機與錄音梳凛,去河邊找鬼韧拒。 笑死,一個胖子當(dāng)著我的面吹牛叛溢,可吹牛的內(nèi)容都是我干的楷掉。 我是一名探鬼主播,決...
    沈念sama閱讀 38,902評論 3 405
  • 文/蒼蘭香墨 我猛地睜開眼愕贡,長吁一口氣:“原來是場噩夢啊……” “哼固以!你這毒婦竟也來了嘱巾?” 一聲冷哼從身側(cè)響起篙螟,我...
    開封第一講書人閱讀 37,662評論 0 266
  • 序言:老撾萬榮一對情侶失蹤遍略,失蹤者是張志新(化名)和其女友劉穎墅冷,沒想到半個月后或油,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體顶岸,經(jīng)...
    沈念sama閱讀 44,110評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,451評論 2 325
  • 正文 我和宋清朗相戀三年卷谈,在試婚紗的時候發(fā)現(xiàn)自己被綠了世蔗。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片污淋。...
    茶點故事閱讀 38,577評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡寸爆,死狀恐怖赁豆,靈堂內(nèi)的尸體忽然破棺而出魔种,到底是詐尸還是另有隱情,我是刑警寧澤甲抖,帶...
    沈念sama閱讀 34,258評論 4 328
  • 正文 年R本政府宣布,位于F島的核電站去扣,受9級特大地震影響愉棱,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜艾岂,卻給世界環(huán)境...
    茶點故事閱讀 39,848評論 3 312
  • 文/蒙蒙 一王浴、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧袱蚓,春花似錦喇潘、人聲如沸体斩。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,726評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽硕勿。三九已至,卻和暖如春枫甲,著一層夾襖步出監(jiān)牢的瞬間源武,已是汗流浹背扼褪。 一陣腳步聲響...
    開封第一講書人閱讀 31,952評論 1 264
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留粱栖,地道東北人话浇。 一個月前我還...
    沈念sama閱讀 46,271評論 2 360
  • 正文 我出身青樓,卻偏偏與公主長得像闹究,于是被迫代替她去往敵國和親赏寇。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 43,452評論 2 348

推薦閱讀更多精彩內(nèi)容