Spark之Spark Streaming原理

一靡羡、Spark Streaming概述

Spark Streaming類似于Apache Storm农曲,用于流式數(shù)據(jù)的處理,具有高吞吐量和容錯能力強等特點鸦难。Spark Streaming支持的數(shù)據(jù)輸入源很多欧聘,例如:Kafka自沧、Flume、Twitter和簡單的TCP套接字等等,而結(jié)果也能保存在很多地方拇厢,比如HDFS、數(shù)據(jù)庫等晒喷。Spark Streaming使用離散化流作為抽象表示孝偎,叫做DStream。DStream是隨著時間推移而收到的數(shù)據(jù)的序列凉敲。在內(nèi)部衣盾,每個時間區(qū)間收到的數(shù)據(jù)都作為RDD存在,而DStream是由這些RDD所組成的序列爷抓。創(chuàng)建出來的DStream支持兩種操作势决,一種是轉(zhuǎn)化操作(transformation),會生成一個新的DStream蓝撇,另一種是輸出操作(output operation)果复,可以把數(shù)據(jù)寫入外部系統(tǒng)中。DStream提供了許多與RDD所支持的操作相類似的操作支持渤昌,還增加了與時間相關(guān)的新操作虽抄,比如滑動窗口。

二独柑、Spark Streaming和Storm的區(qū)別

區(qū)別

三迈窟、架構(gòu)與抽象

Spark Streaming使用“微批次”的架構(gòu),把流式計算當(dāng)作一系列連續(xù)的小規(guī)模批處理來對待忌栅。Spark Streaming從各種輸入源中讀取數(shù)據(jù)车酣,并把數(shù)據(jù)分組為小的批次。新的批次按均勻的時間間隔創(chuàng)建出來索绪。在每個時間區(qū)間開始的時候湖员,一個新的批次就創(chuàng)建出來,在該區(qū)間內(nèi)收到的數(shù)據(jù)都會被添加到這個批次中者春。在時間區(qū)間結(jié)束時破衔,批次停止增長。時間區(qū)間的大小是由批次間隔這個參數(shù)決定的钱烟。批次間隔一般設(shè)置在500毫秒到幾秒之間晰筛,由應(yīng)用開發(fā)者配置。每個輸入批次都形成一個RDD拴袭,以spark作業(yè)的方式處理并生成其他的RDD读第。處理的結(jié)果可以以批處理的方式傳給外部系統(tǒng)。

Spark Streaming的編程抽象是離散化流拥刻,也就是DStream怜瞒。它是一個RDD序列,每個RDD代表數(shù)據(jù)流中一個時間片內(nèi)的數(shù)據(jù)。
Spark Streaming為每個輸入源啟動對應(yīng)的接收器吴汪。接收器以任務(wù)的形式運行在應(yīng)用的執(zhí)行器進程中惠窄,從輸入源收集數(shù)據(jù)并保存為RDD。它們收集到輸入數(shù)據(jù)后會把數(shù)據(jù)復(fù)制到另一個執(zhí)行器進程來保障容錯性漾橙。數(shù)據(jù)保存在執(zhí)行器進程的內(nèi)存中杆融,和緩存RDD的方式一樣。驅(qū)動器程序中的StreamingContext會周期性地運行Spark作業(yè)來處理這些數(shù)據(jù)霜运,把數(shù)據(jù)與之前時間區(qū)間中的RDD進行整合脾歇。

四、Spark Streaming解析

1淘捡、初始化StreamingContext

import org.apache.spark._
import org.apache.spark.streaming._
val conf=new SparkConf().setAppName(appName).setMaster(master)
val ssc=new StreamingContext(conf,Second(1))

初始化完Context之后:
1藕各、定義消息輸入源來創(chuàng)建DStreams.
2、定義DStreams的轉(zhuǎn)化操作和輸出操作焦除。
3激况、通過streamingContext.start()來啟動消息采集和處理。
4踢京、等待程序終止誉碴,可以通過streamingContext.awaitTermination()來設(shè)置
5、通過streamingContext.stop()來手動終止處理程序瓣距。
注意:
StreamingContext一旦啟動黔帕,對DStreams的操作就不能修改了。在同一時間一個JVM中只有一個StreamingContext可以啟動蹈丸,stop()方法將同時停止SparkContext成黄,可以傳入?yún)?shù)stopSparkContext用于停止StreamingContext。

2逻杖、什么是DStreams

Discretized Stream是Spark Streaming的基本抽象奋岁,代表持續(xù)性的數(shù)據(jù)流和經(jīng)過各種Spark原語操作后的結(jié)果數(shù)據(jù)流。在內(nèi)部實現(xiàn)上荸百,DStream是一系列連續(xù)的RDD來表示闻伶。每個RDD含有一段時間間隔內(nèi)的數(shù)據(jù)。對數(shù)據(jù)的操作也是按照RDD為單位來進行的够话。

3蓝翰、DStreams輸入

Spark Streaming原生支持一些不同的數(shù)據(jù)源。每個接收器都以Spark執(zhí)行程序中一個長期運行的任務(wù)的形式運行女嘲,因此會占據(jù)分配給應(yīng)用的CPU核心畜份。此外,我們還需要有可用的CPU核心來處理數(shù)據(jù)欣尼。這意味這如果要運行多個接收器爆雹,就必須至少有和接收器數(shù)目相同的核心數(shù),還要加上用來完成計算所需的核心數(shù)。例如钙态,如果我們想要在流計算應(yīng)用中運行10個接收器慧起,那么至少需要為應(yīng)用分配11個CPU核心。所以如果在本地模式運行册倒,不要使用local或者local[1]完慧。

3.1基本數(shù)據(jù)源

3.1文件數(shù)據(jù)源

文件數(shù)據(jù)流:能夠讀取所有HDFS API兼容的文件系統(tǒng)文件,通過fileStream方法進行讀取

streamingContext.fileStream[KeyClass,ValueClass,InputFormatClass](dataDirectory)

Spark Streaming將會監(jiān)控dataDirectory目錄并不斷處理移動進來的文件剩失,記住目前不支持嵌套目錄。
1册着、文件需要有相同的數(shù)據(jù)格式
2拴孤、文件進入dataDirectory的方式需要通過移動或者重命名來實現(xiàn)
3、一旦文件移動進目錄甲捏,則不能修改演熟,即便修改了也不會讀取新數(shù)據(jù)。
如果文件比較簡單司顿,則可以使用streamingContext.textFileStream(dataDirectory)方法來讀取文件芒粹。文件流不需要接收器,不需要單獨分配CPU核大溜。

3.2 自定義數(shù)據(jù)源

通過繼承Receiver化漆,并實現(xiàn)onStart、onStop方法來自定義數(shù)據(jù)源采集钦奋∽疲可以通過streamingContext.receiverStream(<instance of custom receiver>)來使用自定義的數(shù)據(jù)采集源。

4付材、DStream轉(zhuǎn)換

DStream上的原語與RDD的類似朦拖,分為Transformations(轉(zhuǎn)換)和Output Operations(輸出)兩種,此外轉(zhuǎn)換操作中還有一些比較特殊的原語厌衔,比如:updateStateByKey()璧帝、transform()以及各種window相關(guān)的原語。
DStream的轉(zhuǎn)化操作可以分為無狀態(tài)(stateless)和有狀態(tài)(stateful)兩種:
(1)富寿、在無狀態(tài)轉(zhuǎn)化操作中睬隶,每個批次的處理不依賴于之前批次的數(shù)據(jù)。常見的RDD轉(zhuǎn)化操作作喘,例如map()理疙、filter()、reduceByKey()等泞坦,都是無狀態(tài)轉(zhuǎn)化操作窖贤;
(2)、有狀態(tài)操作需要使用之前批次的數(shù)據(jù)或者是中間結(jié)果來計算當(dāng)前批次的數(shù)據(jù)。有狀態(tài)轉(zhuǎn)化操作包括基于滑動窗口的轉(zhuǎn)化操作和追蹤狀態(tài)變化的轉(zhuǎn)化操作赃梧。

4.1 無狀態(tài)轉(zhuǎn)化操作

無狀態(tài)轉(zhuǎn)化操作就是把簡單的RDD轉(zhuǎn)化操作應(yīng)用到每個批次上滤蝠,也就是轉(zhuǎn)化DStream中的每一個RDD。注意授嘀,針對鍵值對的DStream轉(zhuǎn)化操作(比如reduceByKey())要添加import StreamingContext._才能在scala中使用物咳。

#這里列舉一下無狀態(tài)轉(zhuǎn)化操作的例子
1、def map[U: ClassTag](mapFunc: T => U): DStream[U] 將源DStream中的每個元素通過一個函數(shù)func從而得到新的DStreams蹄皱。
2览闰、def flatMap[U: ClassTag](flatMapFunc: T => TraversableOnce[U]): DStream[U] 和map類似,但是每個輸入的項可以被映射為0或更多項巷折。
3压鉴、def filter(filterFunc: T => Boolean): DStream[T] 選擇源DStream中函數(shù)func判為true的記錄作為新DStream
4、def repartition(numPartitions: Int): DStream[T]   通過創(chuàng)建更多或者更少的partition來改變此DStream的并行級別锻拘。
5油吭、def union(that: DStream[T]): DStream[T]  將一個具有相同slideDuration新的DStream和當(dāng)前DStream進行合并,返回新的DStream
6署拟、def count(): DStream[Long]  統(tǒng)計源DStreams中每個RDD所含元素的個數(shù)得到單元素RDD的新DStreams婉宰。
7、def reduce(reduceFunc: (T, T) => T): DStream[T]  通過函數(shù)func(兩個參數(shù)一個輸出)來整合源DStreams中每個RDD元素得到單元素RDD的DStream推穷。
8心包、def countByValue(numPartitions: Int = ssc.sc.defaultParallelism)(implicit ord: Ordering[T] = null): DStream[(T, Long)]   對于DStreams中元素類型為K調(diào)用此函數(shù),得到包含(K,Long)對的新DStream缨恒,其中Long值表明相應(yīng)的K在源DStream中每個RDD出現(xiàn)的次數(shù)谴咸。
9、def reduceByKey(reduceFunc: (V, V) => V): DStream[(K, V)]  對(K,V)對的DStream調(diào)用此函數(shù)骗露,返回同樣(K,V)對的新DStream岭佳,但是新DStream中的對應(yīng)V為使用reduce函數(shù)整合而來
10、def join[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, W))]  兩DStream分別為(K,V)和(K,W)對萧锉,返回(K,(V,W))對的新DStream珊随。
11、def cogroup[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Iterable[V], Iterable[W]))]  兩DStream分別為(K,V)和(K,W)對柿隙,返回(K,(Seq[V],Seq[W])對新DStream
12叶洞、def transform[U: ClassTag](transformFunc: RDD[T] => RDD[U]): DStream[U]  將RDD到RDD映射的函數(shù)func作用于源DStream中每個RDD上得到新DStream。這個可用于在DStream的RDD上做任意操作禀崖。注意的是衩辟,在這個轉(zhuǎn)換函數(shù)里面能夠應(yīng)用所有RDD的轉(zhuǎn)換操作。

4.2 有狀態(tài)轉(zhuǎn)換操作

1波附、def updateStateByKey[S: ClassTag]( updateFunc: (Seq[V], Option[S]) => Option[S] ): DStream[(K, S)]
           (1)艺晴、 S是你需要保存的狀態(tài)的類型昼钻。
           (2)、updateFunc 是定義了每一批次RDD如何來更新的狀態(tài)值封寞。 Seq[V] 是當(dāng)前批次相同key的值的集合然评。 Option[S] 是框架自動提供的,上一次保存的狀態(tài)的值狈究。
           (3)碗淌、updateStateByKey會返回一個新的DStream,該DStream中保存了(Key,State)的序列抖锥。
2亿眠、window 函數(shù)
           (1)、def window(windowDuration: Duration, slideDuration: Duration): DStream[T] 基于對源DStream窗化的批次進行計算返回一個新的DStream磅废,windowDuration是窗口大小缕探,slideDuration滑動步長。
           (2)还蹲、def countByWindow( windowDuration: Duration, slideDuration: Duration): DStream[Long]  注意,返回的是window中記錄的條數(shù)耙考。
           (3)谜喊、def reduceByWindow( reduceFunc: (T, T) => T, windowDuration: Duration, slideDuration: Duration): DStream[T]   通過使用自定義函數(shù)整合滑動區(qū)間流元素來創(chuàng)建一個新的單元素流。
           (4)倦始、 def reduceByKeyAndWindow(reduceFunc: (V, V) => V,windowDuration: Duration斗遏, slideDuration: Duration): DStream[(K, V)]   通過給定的窗口大小以滑動步長來應(yīng)用reduceFunc函數(shù),返回DStream[(K, V)], K就是DStream中相應(yīng)的K鞋邑,V是window應(yīng)用了reduce之后產(chǎn)生的最終值诵次。
           (5)、def reduceByKeyAndWindow(reduceFunc: (V, V) => V,invReduceFunc: (V, V) => V,windowDuration: Duration,slideDuration: Duration = self.slideDuration,numPartitions: Int =ssc.sc.defaultParallelism,filterFunc: ((K, V)) => Boolean = null): DStream[(K, V)]

五枚碗、Spark Streaming應(yīng)用案例

這里我們使用spark streaming來編寫一個實時統(tǒng)計單詞的案例:

1逾一、這里我使用的是idea創(chuàng)建的一個maven工程
接下來選擇工程的目錄,然后finish即可肮雨。

2遵堵、在pom.xml文件中添加一下依賴:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.sparkstreaming</groupId>
    <artifactId>sparkstreamingdemo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <scala.version>2.11.8</scala.version>
        <spark.version>2.4.1</spark.version>
    </properties>
    <dependencies>
        <!--添加Scala依賴-->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.8</version>
            <!--如果有provided存在,那么打包的時候該依賴不會打到j(luò)ar包中-->
            <scope>provided</scope>
        </dependency>
        <!--添加spark-core依賴-->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.4.0</version>
            <scope>provided</scope>
        </dependency>
        <!--添加spark-streaming依賴-->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.4.0</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <!--添加編譯支持怨规,都編譯成java1.8版本-->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.6.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <!--添加Scala編譯的支持-->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <!--添加打jar包的支持工具-->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <executions>
                    <!--在你應(yīng)用maven package這個階段的時候陌宿,該插件會啟動-->
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <archive>
                        <!--在你jar包中指定啟動類-->
                        <manifest>
                            <mainClass>com.SparkStreaming.WordCount</mainClass>
                        </manifest>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

3、編寫Scala文件

在編寫scala之前我們先要改一下文件名(方便)
注意紅框標(biāo)出的內(nèi)容與pom文件中指定的啟動類必須一致

接下來編寫scala代碼波丰,每一行都有注釋壳坪,可以仔細(xì)看一下:

package com.SparkStreaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WordCount extends App{
  //需要新建一個sparkConf變量,來提供spark的配置
  val sparkConf=new SparkConf().setAppName("StreamWordCount").setMaster("local[2]")

  //新建一個StreamingContext入口
  val ssc=new StreamingContext(sparkConf,Seconds(2))

  //從master機器上的9999端口不斷的獲取輸入的文本數(shù)據(jù)
  val lines=ssc.socketTextStream("master",9999)

  //將每行文本通過空格分割多個單詞
  val words=lines.flatMap(_.split(" "))

  //將每一個單詞裝換成一個元組
  val pairs=words.map((_,1))

  //根據(jù)單詞來統(tǒng)計相同單詞的數(shù)量
  val result=pairs.reduceByKey(_+_)

  //打印結(jié)果
  result.print()

  //啟動你的流式處理程序
  ssc.start()

  //等待你的停止信號
  ssc.awaitTermination()

}

4掰烟、寫完這個項目代碼之后爽蝴,打包上傳到服務(wù)器

打包結(jié)束后項目會產(chǎn)生一個target文件沐批,里面就有打成的jar包(選擇后綴帶有denpendencies)
接下來就是上傳到服務(wù)器,這里我就不演示了霜瘪,使用xftp相對簡單珠插。我把它上傳到自己建的一個文件夾下

5、最后我們就是測試一下颖对,因為是實時計算所以我們這里需要有一個輸入的地方捻撑,在代碼我們已經(jīng)給出了一個監(jiān)聽端口號,所以我們另打開一個終端輸入以下命令:
nc -lk 9999
//或者(注意這里的l不是1是小寫的L)
nc -l -p 9999

開啟程序之后我們要在這個窗口輸入要統(tǒng)計的單詞缤底。
接下來我們就是開啟我們的程序顾患,這里要注意:開啟的順序不能錯,先開啟監(jiān)聽端口號个唧,然后再啟動程序

./spark-submit --class com.SparkStreaming.WordCount  /root/Pro-jar/sparkstreamingdemo-1.0-SNAPSHOT-jar-with-dependencies.jar

程序啟動后的效果如下:

我們設(shè)置的是兩秒一次刷新江解,所以每個2秒會出現(xiàn)一次time
最后我們在輸入窗口輸入測試數(shù)據(jù):

案例到這里就結(jié)束了。
這里主要是淺顯的講解了一下Spark Streaming徙歼,后期還會擴展犁河。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市魄梯,隨后出現(xiàn)的幾起案子桨螺,更是在濱河造成了極大的恐慌,老刑警劉巖酿秸,帶你破解...
    沈念sama閱讀 212,599評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件灭翔,死亡現(xiàn)場離奇詭異,居然都是意外死亡辣苏,警方通過查閱死者的電腦和手機肝箱,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,629評論 3 385
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來稀蟋,“玉大人煌张,你說我怎么就攤上這事⊥丝停” “怎么了唱矛?”我有些...
    開封第一講書人閱讀 158,084評論 0 348
  • 文/不壞的土叔 我叫張陵,是天一觀的道長井辜。 經(jīng)常有香客問我绎谦,道長,這世上最難降的妖魔是什么粥脚? 我笑而不...
    開封第一講書人閱讀 56,708評論 1 284
  • 正文 為了忘掉前任窃肠,我火速辦了婚禮,結(jié)果婚禮上刷允,老公的妹妹穿的比我還像新娘冤留。我一直安慰自己碧囊,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 65,813評論 6 386
  • 文/花漫 我一把揭開白布纤怒。 她就那樣靜靜地躺著糯而,像睡著了一般。 火紅的嫁衣襯著肌膚如雪泊窘。 梳的紋絲不亂的頭發(fā)上熄驼,一...
    開封第一講書人閱讀 50,021評論 1 291
  • 那天,我揣著相機與錄音烘豹,去河邊找鬼瓜贾。 笑死,一個胖子當(dāng)著我的面吹牛携悯,可吹牛的內(nèi)容都是我干的祭芦。 我是一名探鬼主播,決...
    沈念sama閱讀 39,120評論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼憔鬼,長吁一口氣:“原來是場噩夢啊……” “哼龟劲!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起轴或,我...
    開封第一講書人閱讀 37,866評論 0 268
  • 序言:老撾萬榮一對情侶失蹤咸灿,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后侮叮,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,308評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡悼瘾,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,633評論 2 327
  • 正文 我和宋清朗相戀三年囊榜,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片亥宿。...
    茶點故事閱讀 38,768評論 1 341
  • 序言:一個原本活蹦亂跳的男人離奇死亡卸勺,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出烫扼,到底是詐尸還是另有隱情曙求,我是刑警寧澤,帶...
    沈念sama閱讀 34,461評論 4 333
  • 正文 年R本政府宣布映企,位于F島的核電站悟狱,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏堰氓。R本人自食惡果不足惜挤渐,卻給世界環(huán)境...
    茶點故事閱讀 40,094評論 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望双絮。 院中可真熱鬧浴麻,春花似錦得问、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,850評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至膏萧,卻和暖如春漓骚,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背向抢。 一陣腳步聲響...
    開封第一講書人閱讀 32,082評論 1 267
  • 我被黑心中介騙來泰國打工认境, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人挟鸠。 一個月前我還...
    沈念sama閱讀 46,571評論 2 362
  • 正文 我出身青樓叉信,卻偏偏與公主長得像,于是被迫代替她去往敵國和親艘希。 傳聞我的和親對象是個殘疾皇子硼身,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 43,666評論 2 350

推薦閱讀更多精彩內(nèi)容