一.Spark streaming介紹
1.1 Spark streaming簡介
Spark Streaming是Spark API的核心擴展漾月,支持實時數(shù)據(jù)流的可擴展、高吞吐量和容錯流處理仰迁。數(shù)據(jù)可以從Kafka、Kinesis或TCP套接字等多種來源中獲取顽分,并且可以使用復(fù)雜的算法進行處理轩勘,這些算法用高級函數(shù)表示,如map怯邪、reduce绊寻、join和window。最后悬秉,處理過的數(shù)據(jù)可以推送到文件系統(tǒng)澄步、數(shù)據(jù)庫和實時儀表板。事實上和泌,您可以在數(shù)據(jù)流上應(yīng)用Spark的機器學習和圖形處理算法村缸。
在內(nèi)部,它的工作方式如下武氓。Spark Streaming接收實時輸入的數(shù)據(jù)流梯皿,并對數(shù)據(jù)進行分批處理仇箱,由Spark引擎進行處理,生成最終的批量結(jié)果流东羹。
Spark Streaming提供了一種高級抽象剂桥,稱為離散流或DStream,它表示連續(xù)的數(shù)據(jù)流属提。Dstream可以通過來自Kafka和Kinesis等源的輸入數(shù)據(jù)流創(chuàng)建权逗,也可以通過在其他Dstream上應(yīng)用高級操作來創(chuàng)建。在內(nèi)部冤议,DStream表示為rdd序列斟薇。
1.2 Spark 與storm區(qū)別
Storm
- 流式計算框架
- 以record為單位處理數(shù)據(jù)
- 也支持micro-batch方式(Trident)
Spark
- 批處理計算框架
- 以RDD為單位處理數(shù)據(jù)
- 也支持micro-batch流式處理數(shù)據(jù)(Spark Streaming)
兩者異同
- 吞吐量: Spark Streaming 優(yōu)于Storm
- 延遲: Spark Streaming差于Storm
1.3 一個簡單的例子
在我們深入了解如何編寫自己的Spark Streaming程序之前,讓我們快速了解一下簡單的Spark Streaming程序是什么樣的恕酸。
首先堪滨,我們導入StreamingContext,它是所有流功能的主要入口點蕊温。我們創(chuàng)建一個具有兩個執(zhí)行線程的本地StreamingContext椿猎,批處理間隔為1秒。
import org.apache.spark.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
import scala.Tuple2;
// Create a local StreamingContext with two working thread and batch interval of 1 second
SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
為了初始化一個Spark Streaming程序寿弱,必須創(chuàng)建一個StreamingContext對象犯眠,它是所有Spark Streaming功能的主要入口點。
appName參數(shù)是應(yīng)用程序在集群UI上顯示的名稱症革。master是Spark筐咧、Mesos或YARN集群的URL,或者是一個特殊的“l(fā)ocal[*]”字符串噪矛,在本地模式下運行量蕊。實際上,當在集群上運行時艇挨,您不希望在程序中硬編碼master残炮,而是使用spark-submit啟動應(yīng)用程序并在那里接收它。但是缩滨,對于本地測試和單元測試势就,可以通過“l(fā)ocal[*]”來運行Spark Streaming in-process(檢測本地系統(tǒng)中的核數(shù))。
在定義了上下文之后脉漏,必須執(zhí)行以下操作:
- 通過創(chuàng)建輸入DStreams來定義輸入源苞冯。
- 通過對DStreams應(yīng)用轉(zhuǎn)換和輸出操作來定義流計算。
- 開始接收數(shù)據(jù)并使用streamingContext.start()處理它侧巨。
- 使用streamingContext.awaitTermination()等待處理停止(手動或由于任何錯誤)舅锄。
- 可以使用streamingContext.stop()手動停止處理。
注意:
- 一旦啟動了Context司忱,就不能再設(shè)置或向其添加新的流計算皇忿。
- 一旦停止了Context畴蹭,就不能重新啟動它。
- 同一時間鳍烁,JVM中只能有一個StreamingContext是活動的叨襟。
- StreamingContext上的stop()也會停止SparkContext。要只停止StreamingContext老翘,請將stop()的可選參數(shù)stopSparkContext設(shè)置為false。
- 一個SparkContext可以被重用來創(chuàng)建多個StreamingContext锻离,只要在創(chuàng)建下一個StreamingContext之前停止前一個StreamingContext(不停止SparkContext)铺峭。
二.Spark Streaming的組件介紹
Spark Streaming的核心組件有2個:
- Streaming Context
- Dstream(離散流)
2.1 Streaming Context
Streaming Context是Spark Streaming程序的起點,生成Streaming Context之前需要生成SparkContext汽纠,SparkContext可以理解為申請Spark集群的計算資源卫键,Streaming Context可以理解為申請Spark Streaming的計算資源
2.2 Dstream(離散流)
Dstream是Spark Streaming的數(shù)據(jù)抽象,同DataFrame虱朵,其實底層依舊是RDD莉炉。
Discretized Stream或DStream是Spark Streaming提供的基本抽象。它表示一個連續(xù)的數(shù)據(jù)流碴犬,要么是從源接收的輸入數(shù)據(jù)流絮宁,要么是通過轉(zhuǎn)換輸入流生成的處理數(shù)據(jù)流。在內(nèi)部服协,DStream由一系列連續(xù)的rdd表示绍昂,這是Spark對不可變的分布式數(shù)據(jù)集的抽象。DStream中的每個RDD都包含一定時間間隔的數(shù)據(jù)偿荷,如下圖所示:
在DStream上應(yīng)用的任何操作都轉(zhuǎn)換為在底層rdd上的操作窘游。
這些底層RDD轉(zhuǎn)換是由Spark引擎計算的。DStream操作隱藏了大部分細節(jié)跳纳,并為開發(fā)人員提供了更高級的API忍饰。
DStream存在如下概念:
- Receiver
- 數(shù)據(jù)源: 基本源、高級源
- 可靠性
- Dstream的操作
- 緩存
- Checkpoint
2.1 Receiver
每個輸入DStream(文件流除外)都與一個Receiver (Scala doc, Java doc)對象相關(guān)聯(lián)寺庄,接收來自源的數(shù)據(jù)并將其存儲在Spark的內(nèi)存中進行處理艾蓝。
2.2 數(shù)據(jù)源
Spark Streaming提供了兩類內(nèi)置流源:
- 基本源:在StreamingContext API中直接可用的源。例如文件系統(tǒng)和套接字連接斗塘。
- 高級資源:像Kafka, Kinesis等資源可以通過額外的實用程序類獲得饶深。這些需要根據(jù)鏈接部分中討論的額外依賴項進行鏈接。
注意,如果希望在流應(yīng)用程序中并行接收多個數(shù)據(jù)流逛拱,可以創(chuàng)建多個輸入Dstream敌厘。這將創(chuàng)建多個接收器,這些接收器將同時接收多個數(shù)據(jù)流朽合。但是請注意俱两,Spark worker/executor是一個長期運行的任務(wù)饱狂,因此它占用分配給Spark Streaming應(yīng)用程序的一個核心。因此宪彩,Spark Streaming應(yīng)用程序需要分配足夠的內(nèi)核(或者線程休讳,如果在本地運行的話)來處理接收到的數(shù)據(jù),以及運行接收端尿孔,記住這一點很重要俊柔。
記住
在本地運行Spark Streaming程序時,不要使用“l(fā)ocal”或“l(fā)ocal[1]”作為主URL活合。這兩種情況都意味著只有一個線程用于本地運行任務(wù)雏婶。如果你使用一個基于接收器的輸入DStream(例如,socket, Kafka等)白指,那么單線程將被用來運行Receiver 留晚,不留下任何線程來處理接收的數(shù)據(jù)。因此告嘲,當本地運行時错维,總是使用“l(fā)ocal[n]”作為主URL,其中要運行n個>數(shù)量的Receiver 橄唬。
將邏輯擴展到集群上赋焕,分配給Spark Streaming應(yīng)用的內(nèi)核數(shù)必須大于接收端數(shù)。否則系統(tǒng)將接收到數(shù)據(jù)仰楚,但無法進行處理宏邮。
2.3 可靠性
根據(jù)數(shù)據(jù)源的可靠性,可以有兩種數(shù)據(jù)源缸血。源(如Kafka)允許傳輸?shù)臄?shù)據(jù)被確認蜜氨。如果從這些可靠來源接收數(shù)據(jù)的系統(tǒng)正確地確認了接收的數(shù)據(jù),就可以確保不會由于任何類型的故障而丟失數(shù)據(jù)捎泻。這就產(chǎn)生了兩種接收者:
1). 可靠的接收端—當數(shù)據(jù)被接收到并存儲在Spark中并進行復(fù)制時飒炎,一個可靠的接收端會正確地向一個可靠的源發(fā)送確認。
2), 不可靠的接收者——不可靠的接收者不向源發(fā)送確認笆豁。這可以用于不支持確認的來源郎汪,甚至當一個人不想或需要進入確認的復(fù)雜性時,用于可靠的來源闯狱。
對于不可靠的接收者煞赢,Spark streaming有自己的可靠機制,來保證數(shù)據(jù)的可靠性哄孤。
2.4 Dstream的操作
與rdd類似照筑,轉(zhuǎn)換允許修改來自輸入DStream的數(shù)據(jù)。DStreams支持許多普通Spark RDD上可用的轉(zhuǎn)換。下面是一些常見的.
Transformations on DStreams
Output Operations on DStreams:
2.5 緩存
與rdd類似凝危,DStreams也允許開發(fā)人員在內(nèi)存中持久化流數(shù)據(jù)波俄。也就是說,在DStream上使用persist()方法將自動在內(nèi)存中持久化該DStream的每個RDD蛾默。如果DStream中的數(shù)據(jù)將被計算多次(例如懦铺,對同一數(shù)據(jù)的多次操作),這是有用的支鸡。對于基于窗口的操作冬念,如reduceByWindow和reduceByKeyAndWindow,以及基于狀態(tài)的操作牧挣,如updateStateByKey急前,這是隱式true。因此浸踩,由基于窗口的操作生成的DStreams會自動持久化到內(nèi)存中叔汁,而不需要開發(fā)人員調(diào)用persist()统求。
對于通過網(wǎng)絡(luò)接收數(shù)據(jù)的輸入流(例如检碗,Kafka, socket等),默認的持久性級別被設(shè)置為將數(shù)據(jù)復(fù)制到兩個節(jié)點以實現(xiàn)容錯码邻。
注意折剃,與rdd不同,DStreams的默認持久性級別將數(shù)據(jù)序列化保存在內(nèi)存中像屋。
2.6 Checkpoint
流應(yīng)用程序必須全天候運行怕犁,因此必須對與應(yīng)用程序邏輯無關(guān)的故障(例如,系統(tǒng)故障己莺、JVM崩潰等)具有彈性奏甫。為了使這成為可能,Spark Streaming需要對容錯存儲系統(tǒng)進行足夠的信息檢查點凌受,以便從故障中恢復(fù)阵子。有兩種類型的數(shù)據(jù)是檢查點的:
元數(shù)據(jù)檢查點——將定義流計算的信息保存到像HDFS這樣的容錯存儲中。這用于從運行流應(yīng)用程序驅(qū)動程序的節(jié)點的故障中恢復(fù)(稍后將詳細討論)胜蛉。元數(shù)據(jù)包括:
1.1) 配置—用于創(chuàng)建流應(yīng)用程序的配置挠进。
1.2) DStream操作-定義流應(yīng)用程序的DStream操作集。
1.3) 未完成批-作業(yè)已排隊但尚未完成的批誊册。數(shù)據(jù)檢查點——將生成的rdd保存到可靠的存儲中领突。在一些跨多個批組合數(shù)據(jù)的有狀態(tài)轉(zhuǎn)換中,這是必要的案怯。在這種轉(zhuǎn)換中君旦,生成的rdd依賴于以前批次的rdd,這導致依賴鏈的長度隨著時間不斷增加。為了避免恢復(fù)時間的無限增長(與依賴鏈成正比)于宙,有狀態(tài)轉(zhuǎn)換的中間rdd會定期被檢查到可靠的存儲(例如HDFS)浮驳,以切斷依賴鏈。
總之捞魁,元數(shù)據(jù)檢查點主要用于從驅(qū)動程序失敗中恢復(fù)至会,而數(shù)據(jù)或RDD檢查點即使是用于基本功能(如果使用有狀態(tài)轉(zhuǎn)換)也是必要的。
三.一個簡單的測試用例
3.1 linux服務(wù)器安裝nc服務(wù)
yum -y install netcat.x86_64 -- centos7 正確
yum -y install nc.x86_64 -- centos7 錯誤
nc -lk 9999
[root@hp2 yum.repos.d]# nc -help
usage: nc [-46cDdFhklNnrStUuvz] [-C certfile] [-e name] [-H hash] [-I length]
[-i interval] [-K keyfile] [-M ttl] [-m minttl] [-O length]
[-o staplefile] [-P proxy_username] [-p source_port] [-R CAfile]
[-s sourceaddr] [-T keyword] [-V rtable] [-W recvlimit] [-w timeout]
[-X proxy_protocol] [-x proxy_address[:port]] [-Z peercertfile]
[destination] [port]
Command Summary:
-4 Use IPv4
-6 Use IPv6
-C certfile Public key file
-c Use TLS
-D Enable the debug socket option
-d Detach from stdin
-e name Required name in peer certificate
-F Pass socket fd
-H hash Hash string of peer certificate
-h This help text
-I length TCP receive buffer length
-i interval Delay interval for lines sent, ports scanned
-K keyfile Private key file
-k Keep inbound sockets open for multiple connects
-l Listen mode, for inbound connects
-M ttl Outgoing TTL / Hop Limit
-m minttl Minimum incoming TTL / Hop Limit
-N Shutdown the network socket after EOF on stdin
-n Suppress name/port resolutions
-O length TCP send buffer length
-o staplefile Staple file
-P proxyuser Username for proxy authentication
-p port Specify local port for remote connects
-R CAfile CA bundle
-r Randomize remote ports
-S Enable the TCP MD5 signature option
-s sourceaddr Local source address
-T keyword TOS value or TLS options
-t Answer TELNET negotiation
-U Use UNIX domain socket
-u UDP mode
-V rtable Specify alternate routing table
-v Verbose
-W recvlimit Terminate after receiving a number of packets
-w timeout Timeout for connects and final net reads
-X proto Proxy protocol: "4", "5" (SOCKS) or "connect"
-x addr[:port] Specify proxy address and port
-Z Peer certificate file
-z Zero-I/O mode [used for scanning]
Port numbers can be individual or ranges: lo-hi [inclusive]
[root@hp2 yum.repos.d]#
[root@hp2 yum.repos.d]#
[root@hp2 yum.repos.d]#
[root@hp2 yum.repos.d]# nc -lk 9999
3.2 Java spark代碼
maven配置:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.4.0</version>
</dependency>
代碼:
package org.example;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
import scala.Tuple2;
import java.util.Arrays;
public class SparkStreaming1 {
public static void main(String[] args) throws Exception{
// Create a local StreamingContext with two working thread and batch interval of 1 second
SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
// Create a DStream that will connect to hostname:port, like localhost:9999
JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);
// Split each line into words
JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(x.split(" ")).iterator());
// Count each word in each batch
JavaPairDStream<String, Integer> pairs = words.mapToPair(s -> new Tuple2<>(s, 1));
JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey((i1, i2) -> i1 + i2);
// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print();
jssc.start(); // Start the computation
jssc.awaitTermination(); // Wait for the computation to terminate
}
}
運行spark程序代碼:
spark-submit \
--class org.example.SparkStreaming1 \
--master local[2] \
/home/javaspark/SparkStudy-1.0-SNAPSHOT.jar
測試記錄:
滾動太快谱俭,只能從日志中找到記錄
參考:
1.http://spark.apache.org/docs/latest/streaming-programming-guide.html