一靡羡、Spark Streaming概述
Spark Streaming類似于Apache Storm农曲,用于流式數(shù)據(jù)的處理,具有高吞吐量和容錯能力強等特點鸦难。Spark Streaming支持的數(shù)據(jù)輸入源很多欧聘,例如:Kafka自沧、Flume、Twitter和簡單的TCP套接字等等,而結(jié)果也能保存在很多地方拇厢,比如HDFS、數(shù)據(jù)庫等晒喷。Spark Streaming使用離散化流作為抽象表示孝偎,叫做DStream。DStream是隨著時間推移而收到的數(shù)據(jù)的序列凉敲。在內(nèi)部衣盾,每個時間區(qū)間收到的數(shù)據(jù)都作為RDD存在,而DStream是由這些RDD所組成的序列爷抓。創(chuàng)建出來的DStream支持兩種操作势决,一種是轉(zhuǎn)化操作(transformation),會生成一個新的DStream蓝撇,另一種是輸出操作(output operation)果复,可以把數(shù)據(jù)寫入外部系統(tǒng)中。DStream提供了許多與RDD所支持的操作相類似的操作支持渤昌,還增加了與時間相關(guān)的新操作虽抄,比如滑動窗口。
二独柑、Spark Streaming和Storm的區(qū)別
三迈窟、架構(gòu)與抽象
Spark Streaming使用“微批次”的架構(gòu),把流式計算當(dāng)作一系列連續(xù)的小規(guī)模批處理來對待忌栅。Spark Streaming從各種輸入源中讀取數(shù)據(jù)车酣,并把數(shù)據(jù)分組為小的批次。新的批次按均勻的時間間隔創(chuàng)建出來索绪。在每個時間區(qū)間開始的時候湖员,一個新的批次就創(chuàng)建出來,在該區(qū)間內(nèi)收到的數(shù)據(jù)都會被添加到這個批次中者春。在時間區(qū)間結(jié)束時破衔,批次停止增長。時間區(qū)間的大小是由批次間隔這個參數(shù)決定的钱烟。批次間隔一般設(shè)置在500毫秒到幾秒之間晰筛,由應(yīng)用開發(fā)者配置。每個輸入批次都形成一個RDD拴袭,以spark作業(yè)的方式處理并生成其他的RDD读第。處理的結(jié)果可以以批處理的方式傳給外部系統(tǒng)。四、Spark Streaming解析
1淘捡、初始化StreamingContext
import org.apache.spark._
import org.apache.spark.streaming._
val conf=new SparkConf().setAppName(appName).setMaster(master)
val ssc=new StreamingContext(conf,Second(1))
初始化完Context之后:
1藕各、定義消息輸入源來創(chuàng)建DStreams.
2、定義DStreams的轉(zhuǎn)化操作和輸出操作焦除。
3激况、通過streamingContext.start()來啟動消息采集和處理。
4踢京、等待程序終止誉碴,可以通過streamingContext.awaitTermination()來設(shè)置
5、通過streamingContext.stop()來手動終止處理程序瓣距。
注意:
StreamingContext一旦啟動黔帕,對DStreams的操作就不能修改了。在同一時間一個JVM中只有一個StreamingContext可以啟動蹈丸,stop()方法將同時停止SparkContext成黄,可以傳入?yún)?shù)stopSparkContext用于停止StreamingContext。
2逻杖、什么是DStreams
Discretized Stream是Spark Streaming的基本抽象奋岁,代表持續(xù)性的數(shù)據(jù)流和經(jīng)過各種Spark原語操作后的結(jié)果數(shù)據(jù)流。在內(nèi)部實現(xiàn)上荸百,DStream是一系列連續(xù)的RDD來表示闻伶。每個RDD含有一段時間間隔內(nèi)的數(shù)據(jù)。對數(shù)據(jù)的操作也是按照RDD為單位來進行的够话。
3蓝翰、DStreams輸入
Spark Streaming原生支持一些不同的數(shù)據(jù)源。每個接收器都以Spark執(zhí)行程序中一個長期運行的任務(wù)的形式運行女嘲,因此會占據(jù)分配給應(yīng)用的CPU核心畜份。此外,我們還需要有可用的CPU核心來處理數(shù)據(jù)欣尼。這意味這如果要運行多個接收器爆雹,就必須至少有和接收器數(shù)目相同的核心數(shù),還要加上用來完成計算所需的核心數(shù)。例如钙态,如果我們想要在流計算應(yīng)用中運行10個接收器慧起,那么至少需要為應(yīng)用分配11個CPU核心。所以如果在本地模式運行册倒,不要使用local或者local[1]完慧。
3.1基本數(shù)據(jù)源
3.1文件數(shù)據(jù)源
文件數(shù)據(jù)流:能夠讀取所有HDFS API兼容的文件系統(tǒng)文件,通過fileStream方法進行讀取
streamingContext.fileStream[KeyClass,ValueClass,InputFormatClass](dataDirectory)
Spark Streaming將會監(jiān)控dataDirectory目錄并不斷處理移動進來的文件剩失,記住目前不支持嵌套目錄。
1册着、文件需要有相同的數(shù)據(jù)格式
2拴孤、文件進入dataDirectory的方式需要通過移動或者重命名來實現(xiàn)
3、一旦文件移動進目錄甲捏,則不能修改演熟,即便修改了也不會讀取新數(shù)據(jù)。
如果文件比較簡單司顿,則可以使用streamingContext.textFileStream(dataDirectory)方法來讀取文件芒粹。文件流不需要接收器,不需要單獨分配CPU核大溜。
3.2 自定義數(shù)據(jù)源
通過繼承Receiver化漆,并實現(xiàn)onStart、onStop方法來自定義數(shù)據(jù)源采集钦奋∽疲可以通過streamingContext.receiverStream(<instance of custom receiver>)來使用自定義的數(shù)據(jù)采集源。
4付材、DStream轉(zhuǎn)換
DStream上的原語與RDD的類似朦拖,分為Transformations(轉(zhuǎn)換)和Output Operations(輸出)兩種,此外轉(zhuǎn)換操作中還有一些比較特殊的原語厌衔,比如:updateStateByKey()璧帝、transform()以及各種window相關(guān)的原語。
DStream的轉(zhuǎn)化操作可以分為無狀態(tài)(stateless)和有狀態(tài)(stateful)兩種:
(1)富寿、在無狀態(tài)轉(zhuǎn)化操作中睬隶,每個批次的處理不依賴于之前批次的數(shù)據(jù)。常見的RDD轉(zhuǎn)化操作作喘,例如map()理疙、filter()、reduceByKey()等泞坦,都是無狀態(tài)轉(zhuǎn)化操作窖贤;
(2)、有狀態(tài)操作需要使用之前批次的數(shù)據(jù)或者是中間結(jié)果來計算當(dāng)前批次的數(shù)據(jù)。有狀態(tài)轉(zhuǎn)化操作包括基于滑動窗口的轉(zhuǎn)化操作和追蹤狀態(tài)變化的轉(zhuǎn)化操作赃梧。
4.1 無狀態(tài)轉(zhuǎn)化操作
無狀態(tài)轉(zhuǎn)化操作就是把簡單的RDD轉(zhuǎn)化操作應(yīng)用到每個批次上滤蝠,也就是轉(zhuǎn)化DStream中的每一個RDD。注意授嘀,針對鍵值對的DStream轉(zhuǎn)化操作(比如reduceByKey())要添加import StreamingContext._才能在scala中使用物咳。
#這里列舉一下無狀態(tài)轉(zhuǎn)化操作的例子
1、def map[U: ClassTag](mapFunc: T => U): DStream[U] 將源DStream中的每個元素通過一個函數(shù)func從而得到新的DStreams蹄皱。
2览闰、def flatMap[U: ClassTag](flatMapFunc: T => TraversableOnce[U]): DStream[U] 和map類似,但是每個輸入的項可以被映射為0或更多項巷折。
3压鉴、def filter(filterFunc: T => Boolean): DStream[T] 選擇源DStream中函數(shù)func判為true的記錄作為新DStream
4、def repartition(numPartitions: Int): DStream[T] 通過創(chuàng)建更多或者更少的partition來改變此DStream的并行級別锻拘。
5油吭、def union(that: DStream[T]): DStream[T] 將一個具有相同slideDuration新的DStream和當(dāng)前DStream進行合并,返回新的DStream
6署拟、def count(): DStream[Long] 統(tǒng)計源DStreams中每個RDD所含元素的個數(shù)得到單元素RDD的新DStreams婉宰。
7、def reduce(reduceFunc: (T, T) => T): DStream[T] 通過函數(shù)func(兩個參數(shù)一個輸出)來整合源DStreams中每個RDD元素得到單元素RDD的DStream推穷。
8心包、def countByValue(numPartitions: Int = ssc.sc.defaultParallelism)(implicit ord: Ordering[T] = null): DStream[(T, Long)] 對于DStreams中元素類型為K調(diào)用此函數(shù),得到包含(K,Long)對的新DStream缨恒,其中Long值表明相應(yīng)的K在源DStream中每個RDD出現(xiàn)的次數(shù)谴咸。
9、def reduceByKey(reduceFunc: (V, V) => V): DStream[(K, V)] 對(K,V)對的DStream調(diào)用此函數(shù)骗露,返回同樣(K,V)對的新DStream岭佳,但是新DStream中的對應(yīng)V為使用reduce函數(shù)整合而來
10、def join[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, W))] 兩DStream分別為(K,V)和(K,W)對萧锉,返回(K,(V,W))對的新DStream珊随。
11、def cogroup[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Iterable[V], Iterable[W]))] 兩DStream分別為(K,V)和(K,W)對柿隙,返回(K,(Seq[V],Seq[W])對新DStream
12叶洞、def transform[U: ClassTag](transformFunc: RDD[T] => RDD[U]): DStream[U] 將RDD到RDD映射的函數(shù)func作用于源DStream中每個RDD上得到新DStream。這個可用于在DStream的RDD上做任意操作禀崖。注意的是衩辟,在這個轉(zhuǎn)換函數(shù)里面能夠應(yīng)用所有RDD的轉(zhuǎn)換操作。
4.2 有狀態(tài)轉(zhuǎn)換操作
1波附、def updateStateByKey[S: ClassTag]( updateFunc: (Seq[V], Option[S]) => Option[S] ): DStream[(K, S)]
(1)艺晴、 S是你需要保存的狀態(tài)的類型昼钻。
(2)、updateFunc 是定義了每一批次RDD如何來更新的狀態(tài)值封寞。 Seq[V] 是當(dāng)前批次相同key的值的集合然评。 Option[S] 是框架自動提供的,上一次保存的狀態(tài)的值狈究。
(3)碗淌、updateStateByKey會返回一個新的DStream,該DStream中保存了(Key,State)的序列抖锥。
2亿眠、window 函數(shù)
(1)、def window(windowDuration: Duration, slideDuration: Duration): DStream[T] 基于對源DStream窗化的批次進行計算返回一個新的DStream磅废,windowDuration是窗口大小缕探,slideDuration滑動步長。
(2)还蹲、def countByWindow( windowDuration: Duration, slideDuration: Duration): DStream[Long] 注意,返回的是window中記錄的條數(shù)耙考。
(3)谜喊、def reduceByWindow( reduceFunc: (T, T) => T, windowDuration: Duration, slideDuration: Duration): DStream[T] 通過使用自定義函數(shù)整合滑動區(qū)間流元素來創(chuàng)建一個新的單元素流。
(4)倦始、 def reduceByKeyAndWindow(reduceFunc: (V, V) => V,windowDuration: Duration斗遏, slideDuration: Duration): DStream[(K, V)] 通過給定的窗口大小以滑動步長來應(yīng)用reduceFunc函數(shù),返回DStream[(K, V)], K就是DStream中相應(yīng)的K鞋邑,V是window應(yīng)用了reduce之后產(chǎn)生的最終值诵次。
(5)、def reduceByKeyAndWindow(reduceFunc: (V, V) => V,invReduceFunc: (V, V) => V,windowDuration: Duration,slideDuration: Duration = self.slideDuration,numPartitions: Int =ssc.sc.defaultParallelism,filterFunc: ((K, V)) => Boolean = null): DStream[(K, V)]
五枚碗、Spark Streaming應(yīng)用案例
這里我們使用spark streaming來編寫一個實時統(tǒng)計單詞的案例:
2遵堵、在pom.xml文件中添加一下依賴:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.sparkstreaming</groupId>
<artifactId>sparkstreamingdemo</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<scala.version>2.11.8</scala.version>
<spark.version>2.4.1</spark.version>
</properties>
<dependencies>
<!--添加Scala依賴-->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.11.8</version>
<!--如果有provided存在,那么打包的時候該依賴不會打到j(luò)ar包中-->
<scope>provided</scope>
</dependency>
<!--添加spark-core依賴-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.4.0</version>
<scope>provided</scope>
</dependency>
<!--添加spark-streaming依賴-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.4.0</version>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
<!--添加編譯支持怨规,都編譯成java1.8版本-->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<!--添加Scala編譯的支持-->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<!--添加打jar包的支持工具-->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<executions>
<!--在你應(yīng)用maven package這個階段的時候陌宿,該插件會啟動-->
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
<configuration>
<archive>
<!--在你jar包中指定啟動類-->
<manifest>
<mainClass>com.SparkStreaming.WordCount</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
</plugins>
</build>
</project>
3、編寫Scala文件
接下來編寫scala代碼波丰,每一行都有注釋壳坪,可以仔細(xì)看一下:
package com.SparkStreaming
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
object WordCount extends App{
//需要新建一個sparkConf變量,來提供spark的配置
val sparkConf=new SparkConf().setAppName("StreamWordCount").setMaster("local[2]")
//新建一個StreamingContext入口
val ssc=new StreamingContext(sparkConf,Seconds(2))
//從master機器上的9999端口不斷的獲取輸入的文本數(shù)據(jù)
val lines=ssc.socketTextStream("master",9999)
//將每行文本通過空格分割多個單詞
val words=lines.flatMap(_.split(" "))
//將每一個單詞裝換成一個元組
val pairs=words.map((_,1))
//根據(jù)單詞來統(tǒng)計相同單詞的數(shù)量
val result=pairs.reduceByKey(_+_)
//打印結(jié)果
result.print()
//啟動你的流式處理程序
ssc.start()
//等待你的停止信號
ssc.awaitTermination()
}
4掰烟、寫完這個項目代碼之后爽蝴,打包上傳到服務(wù)器5、最后我們就是測試一下颖对,因為是實時計算所以我們這里需要有一個輸入的地方捻撑,在代碼我們已經(jīng)給出了一個監(jiān)聽端口號,所以我們另打開一個終端輸入以下命令:
nc -lk 9999
//或者(注意這里的l不是1是小寫的L)
nc -l -p 9999
接下來我們就是開啟我們的程序顾患,這里要注意:開啟的順序不能錯,先開啟監(jiān)聽端口號个唧,然后再啟動程序:
./spark-submit --class com.SparkStreaming.WordCount /root/Pro-jar/sparkstreamingdemo-1.0-SNAPSHOT-jar-with-dependencies.jar
案例到這里就結(jié)束了。
這里主要是淺顯的講解了一下Spark Streaming徙歼,后期還會擴展犁河。