Java-Spark系列7-Spark streaming介紹

一.Spark streaming介紹

1.1 Spark streaming簡介

Spark Streaming是Spark API的核心擴展漾月,支持實時數(shù)據(jù)流的可擴展、高吞吐量和容錯流處理仰迁。數(shù)據(jù)可以從Kafka、Kinesis或TCP套接字等多種來源中獲取顽分,并且可以使用復(fù)雜的算法進行處理轩勘,這些算法用高級函數(shù)表示,如map怯邪、reduce绊寻、join和window。最后悬秉,處理過的數(shù)據(jù)可以推送到文件系統(tǒng)澄步、數(shù)據(jù)庫和實時儀表板。事實上和泌,您可以在數(shù)據(jù)流上應(yīng)用Spark的機器學習和圖形處理算法村缸。


image.png

在內(nèi)部,它的工作方式如下武氓。Spark Streaming接收實時輸入的數(shù)據(jù)流梯皿,并對數(shù)據(jù)進行分批處理仇箱,由Spark引擎進行處理,生成最終的批量結(jié)果流东羹。


image.png

Spark Streaming提供了一種高級抽象剂桥,稱為離散流或DStream,它表示連續(xù)的數(shù)據(jù)流属提。Dstream可以通過來自Kafka和Kinesis等源的輸入數(shù)據(jù)流創(chuàng)建权逗,也可以通過在其他Dstream上應(yīng)用高級操作來創(chuàng)建。在內(nèi)部冤议,DStream表示為rdd序列斟薇。

1.2 Spark 與storm區(qū)別

Storm

  1. 流式計算框架
  2. 以record為單位處理數(shù)據(jù)
  3. 也支持micro-batch方式(Trident)

Spark

  1. 批處理計算框架
  2. 以RDD為單位處理數(shù)據(jù)
  3. 也支持micro-batch流式處理數(shù)據(jù)(Spark Streaming)

兩者異同

  1. 吞吐量: Spark Streaming 優(yōu)于Storm
  2. 延遲: Spark Streaming差于Storm

1.3 一個簡單的例子

在我們深入了解如何編寫自己的Spark Streaming程序之前,讓我們快速了解一下簡單的Spark Streaming程序是什么樣的恕酸。

首先堪滨,我們導入StreamingContext,它是所有流功能的主要入口點蕊温。我們創(chuàng)建一個具有兩個執(zhí)行線程的本地StreamingContext椿猎,批處理間隔為1秒。

import org.apache.spark.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
import scala.Tuple2;

// Create a local StreamingContext with two working thread and batch interval of 1 second
SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));

為了初始化一個Spark Streaming程序寿弱,必須創(chuàng)建一個StreamingContext對象犯眠,它是所有Spark Streaming功能的主要入口點。

appName參數(shù)是應(yīng)用程序在集群UI上顯示的名稱症革。master是Spark筐咧、Mesos或YARN集群的URL,或者是一個特殊的“l(fā)ocal[*]”字符串噪矛,在本地模式下運行量蕊。實際上,當在集群上運行時艇挨,您不希望在程序中硬編碼master残炮,而是使用spark-submit啟動應(yīng)用程序并在那里接收它。但是缩滨,對于本地測試和單元測試势就,可以通過“l(fā)ocal[*]”來運行Spark Streaming in-process(檢測本地系統(tǒng)中的核數(shù))。

在定義了上下文之后脉漏,必須執(zhí)行以下操作:

  1. 通過創(chuàng)建輸入DStreams來定義輸入源苞冯。
  2. 通過對DStreams應(yīng)用轉(zhuǎn)換和輸出操作來定義流計算。
  3. 開始接收數(shù)據(jù)并使用streamingContext.start()處理它侧巨。
  4. 使用streamingContext.awaitTermination()等待處理停止(手動或由于任何錯誤)舅锄。
  5. 可以使用streamingContext.stop()手動停止處理。

注意:

  1. 一旦啟動了Context司忱,就不能再設(shè)置或向其添加新的流計算皇忿。
  2. 一旦停止了Context畴蹭,就不能重新啟動它。
  3. 同一時間鳍烁,JVM中只能有一個StreamingContext是活動的叨襟。
  4. StreamingContext上的stop()也會停止SparkContext。要只停止StreamingContext老翘,請將stop()的可選參數(shù)stopSparkContext設(shè)置為false。
  5. 一個SparkContext可以被重用來創(chuàng)建多個StreamingContext锻离,只要在創(chuàng)建下一個StreamingContext之前停止前一個StreamingContext(不停止SparkContext)铺峭。

二.Spark Streaming的組件介紹

Spark Streaming的核心組件有2個:

  1. Streaming Context
  2. Dstream(離散流)

2.1 Streaming Context

Streaming Context是Spark Streaming程序的起點,生成Streaming Context之前需要生成SparkContext汽纠,SparkContext可以理解為申請Spark集群的計算資源卫键,Streaming Context可以理解為申請Spark Streaming的計算資源

2.2 Dstream(離散流)

Dstream是Spark Streaming的數(shù)據(jù)抽象,同DataFrame虱朵,其實底層依舊是RDD莉炉。

Discretized Stream或DStream是Spark Streaming提供的基本抽象。它表示一個連續(xù)的數(shù)據(jù)流碴犬,要么是從源接收的輸入數(shù)據(jù)流絮宁,要么是通過轉(zhuǎn)換輸入流生成的處理數(shù)據(jù)流。在內(nèi)部服协,DStream由一系列連續(xù)的rdd表示绍昂,這是Spark對不可變的分布式數(shù)據(jù)集的抽象。DStream中的每個RDD都包含一定時間間隔的數(shù)據(jù)偿荷,如下圖所示:


image.png

在DStream上應(yīng)用的任何操作都轉(zhuǎn)換為在底層rdd上的操作窘游。


image.png

這些底層RDD轉(zhuǎn)換是由Spark引擎計算的。DStream操作隱藏了大部分細節(jié)跳纳,并為開發(fā)人員提供了更高級的API忍饰。

DStream存在如下概念:

  1. Receiver
  2. 數(shù)據(jù)源: 基本源、高級源
  3. 可靠性
  4. Dstream的操作
  5. 緩存
  6. Checkpoint
image.png

2.1 Receiver

每個輸入DStream(文件流除外)都與一個Receiver (Scala doc, Java doc)對象相關(guān)聯(lián)寺庄,接收來自源的數(shù)據(jù)并將其存儲在Spark的內(nèi)存中進行處理艾蓝。

2.2 數(shù)據(jù)源

Spark Streaming提供了兩類內(nèi)置流源:

  1. 基本源:在StreamingContext API中直接可用的源。例如文件系統(tǒng)和套接字連接斗塘。
  2. 高級資源:像Kafka, Kinesis等資源可以通過額外的實用程序類獲得饶深。這些需要根據(jù)鏈接部分中討論的額外依賴項進行鏈接。

注意,如果希望在流應(yīng)用程序中并行接收多個數(shù)據(jù)流逛拱,可以創(chuàng)建多個輸入Dstream敌厘。這將創(chuàng)建多個接收器,這些接收器將同時接收多個數(shù)據(jù)流朽合。但是請注意俱两,Spark worker/executor是一個長期運行的任務(wù)饱狂,因此它占用分配給Spark Streaming應(yīng)用程序的一個核心。因此宪彩,Spark Streaming應(yīng)用程序需要分配足夠的內(nèi)核(或者線程休讳,如果在本地運行的話)來處理接收到的數(shù)據(jù),以及運行接收端尿孔,記住這一點很重要俊柔。

記住
在本地運行Spark Streaming程序時,不要使用“l(fā)ocal”或“l(fā)ocal[1]”作為主URL活合。這兩種情況都意味著只有一個線程用于本地運行任務(wù)雏婶。如果你使用一個基于接收器的輸入DStream(例如,socket, Kafka等)白指,那么單線程將被用來運行Receiver 留晚,不留下任何線程來處理接收的數(shù)據(jù)。因此告嘲,當本地運行時错维,總是使用“l(fā)ocal[n]”作為主URL,其中要運行n個>數(shù)量的Receiver 橄唬。

將邏輯擴展到集群上赋焕,分配給Spark Streaming應(yīng)用的內(nèi)核數(shù)必須大于接收端數(shù)。否則系統(tǒng)將接收到數(shù)據(jù)仰楚,但無法進行處理宏邮。

2.3 可靠性

根據(jù)數(shù)據(jù)源的可靠性,可以有兩種數(shù)據(jù)源缸血。源(如Kafka)允許傳輸?shù)臄?shù)據(jù)被確認蜜氨。如果從這些可靠來源接收數(shù)據(jù)的系統(tǒng)正確地確認了接收的數(shù)據(jù),就可以確保不會由于任何類型的故障而丟失數(shù)據(jù)捎泻。這就產(chǎn)生了兩種接收者:
1). 可靠的接收端—當數(shù)據(jù)被接收到并存儲在Spark中并進行復(fù)制時飒炎,一個可靠的接收端會正確地向一個可靠的源發(fā)送確認。
2), 不可靠的接收者——不可靠的接收者不向源發(fā)送確認笆豁。這可以用于不支持確認的來源郎汪,甚至當一個人不想或需要進入確認的復(fù)雜性時,用于可靠的來源闯狱。

對于不可靠的接收者煞赢,Spark streaming有自己的可靠機制,來保證數(shù)據(jù)的可靠性哄孤。

2.4 Dstream的操作

與rdd類似照筑,轉(zhuǎn)換允許修改來自輸入DStream的數(shù)據(jù)。DStreams支持許多普通Spark RDD上可用的轉(zhuǎn)換。下面是一些常見的.

Transformations on DStreams

image.png

Output Operations on DStreams:

image.png

2.5 緩存

與rdd類似凝危,DStreams也允許開發(fā)人員在內(nèi)存中持久化流數(shù)據(jù)波俄。也就是說,在DStream上使用persist()方法將自動在內(nèi)存中持久化該DStream的每個RDD蛾默。如果DStream中的數(shù)據(jù)將被計算多次(例如懦铺,對同一數(shù)據(jù)的多次操作),這是有用的支鸡。對于基于窗口的操作冬念,如reduceByWindow和reduceByKeyAndWindow,以及基于狀態(tài)的操作牧挣,如updateStateByKey急前,這是隱式true。因此浸踩,由基于窗口的操作生成的DStreams會自動持久化到內(nèi)存中叔汁,而不需要開發(fā)人員調(diào)用persist()统求。

對于通過網(wǎng)絡(luò)接收數(shù)據(jù)的輸入流(例如检碗,Kafka, socket等),默認的持久性級別被設(shè)置為將數(shù)據(jù)復(fù)制到兩個節(jié)點以實現(xiàn)容錯码邻。

注意折剃,與rdd不同,DStreams的默認持久性級別將數(shù)據(jù)序列化保存在內(nèi)存中像屋。

2.6 Checkpoint

流應(yīng)用程序必須全天候運行怕犁,因此必須對與應(yīng)用程序邏輯無關(guān)的故障(例如,系統(tǒng)故障己莺、JVM崩潰等)具有彈性奏甫。為了使這成為可能,Spark Streaming需要對容錯存儲系統(tǒng)進行足夠的信息檢查點凌受,以便從故障中恢復(fù)阵子。有兩種類型的數(shù)據(jù)是檢查點的:

  1. 元數(shù)據(jù)檢查點——將定義流計算的信息保存到像HDFS這樣的容錯存儲中。這用于從運行流應(yīng)用程序驅(qū)動程序的節(jié)點的故障中恢復(fù)(稍后將詳細討論)胜蛉。元數(shù)據(jù)包括:
    1.1) 配置—用于創(chuàng)建流應(yīng)用程序的配置挠进。
    1.2) DStream操作-定義流應(yīng)用程序的DStream操作集。
    1.3) 未完成批-作業(yè)已排隊但尚未完成的批誊册。

  2. 數(shù)據(jù)檢查點——將生成的rdd保存到可靠的存儲中领突。在一些跨多個批組合數(shù)據(jù)的有狀態(tài)轉(zhuǎn)換中,這是必要的案怯。在這種轉(zhuǎn)換中君旦,生成的rdd依賴于以前批次的rdd,這導致依賴鏈的長度隨著時間不斷增加。為了避免恢復(fù)時間的無限增長(與依賴鏈成正比)于宙,有狀態(tài)轉(zhuǎn)換的中間rdd會定期被檢查到可靠的存儲(例如HDFS)浮驳,以切斷依賴鏈。

總之捞魁,元數(shù)據(jù)檢查點主要用于從驅(qū)動程序失敗中恢復(fù)至会,而數(shù)據(jù)或RDD檢查點即使是用于基本功能(如果使用有狀態(tài)轉(zhuǎn)換)也是必要的。

三.一個簡單的測試用例

3.1 linux服務(wù)器安裝nc服務(wù)

yum -y install netcat.x86_64    -- centos7 正確
yum -y install nc.x86_64          -- centos7 錯誤

nc -lk 9999
[root@hp2 yum.repos.d]# nc -help
usage: nc [-46cDdFhklNnrStUuvz] [-C certfile] [-e name] [-H hash] [-I length]
          [-i interval] [-K keyfile] [-M ttl] [-m minttl] [-O length]
          [-o staplefile] [-P proxy_username] [-p source_port] [-R CAfile]
          [-s sourceaddr] [-T keyword] [-V rtable] [-W recvlimit] [-w timeout]
          [-X proxy_protocol] [-x proxy_address[:port]] [-Z peercertfile]
          [destination] [port]
        Command Summary:
                -4              Use IPv4
                -6              Use IPv6
                -C certfile     Public key file
                -c              Use TLS
                -D              Enable the debug socket option
                -d              Detach from stdin
                -e name         Required name in peer certificate
                -F              Pass socket fd
                -H hash         Hash string of peer certificate
                -h              This help text
                -I length       TCP receive buffer length
                -i interval     Delay interval for lines sent, ports scanned
                -K keyfile      Private key file
                -k              Keep inbound sockets open for multiple connects
                -l              Listen mode, for inbound connects
                -M ttl          Outgoing TTL / Hop Limit
                -m minttl       Minimum incoming TTL / Hop Limit
                -N              Shutdown the network socket after EOF on stdin
                -n              Suppress name/port resolutions
                -O length       TCP send buffer length
                -o staplefile   Staple file
                -P proxyuser    Username for proxy authentication
                -p port         Specify local port for remote connects
                -R CAfile       CA bundle
                -r              Randomize remote ports
                -S              Enable the TCP MD5 signature option
                -s sourceaddr   Local source address
                -T keyword      TOS value or TLS options
                -t              Answer TELNET negotiation
                -U              Use UNIX domain socket
                -u              UDP mode
                -V rtable       Specify alternate routing table
                -v              Verbose
                -W recvlimit    Terminate after receiving a number of packets
                -w timeout      Timeout for connects and final net reads
                -X proto        Proxy protocol: "4", "5" (SOCKS) or "connect"
                -x addr[:port]  Specify proxy address and port
                -Z              Peer certificate file
                -z              Zero-I/O mode [used for scanning]
        Port numbers can be individual or ranges: lo-hi [inclusive]
[root@hp2 yum.repos.d]# 
[root@hp2 yum.repos.d]# 
[root@hp2 yum.repos.d]# 
[root@hp2 yum.repos.d]# nc -lk 9999

3.2 Java spark代碼

maven配置:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.4.0</version>
</dependency>

代碼:

package org.example;

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
import scala.Tuple2;

import java.util.Arrays;


public class SparkStreaming1 {
    public static void main(String[] args) throws Exception{
        // Create a local StreamingContext with two working thread and batch interval of 1 second
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));

        // Create a DStream that will connect to hostname:port, like localhost:9999
        JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);

        // Split each line into words
        JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(x.split(" ")).iterator());

        // Count each word in each batch
        JavaPairDStream<String, Integer> pairs = words.mapToPair(s -> new Tuple2<>(s, 1));
        JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey((i1, i2) -> i1 + i2);

        // Print the first ten elements of each RDD generated in this DStream to the console
        wordCounts.print();

        jssc.start();              // Start the computation
        jssc.awaitTermination();   // Wait for the computation to terminate
    }
}

運行spark程序代碼:

spark-submit \
  --class org.example.SparkStreaming1 \
  --master local[2] \
  /home/javaspark/SparkStudy-1.0-SNAPSHOT.jar

測試記錄:

image.png

滾動太快谱俭,只能從日志中找到記錄

image.png

參考:

1.http://spark.apache.org/docs/latest/streaming-programming-guide.html

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末奉件,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子昆著,更是在濱河造成了極大的恐慌县貌,老刑警劉巖,帶你破解...
    沈念sama閱讀 211,265評論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件凑懂,死亡現(xiàn)場離奇詭異煤痕,居然都是意外死亡,警方通過查閱死者的電腦和手機接谨,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,078評論 2 385
  • 文/潘曉璐 我一進店門摆碉,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人脓豪,你說我怎么就攤上這事巷帝。” “怎么了扫夜?”我有些...
    開封第一講書人閱讀 156,852評論 0 347
  • 文/不壞的土叔 我叫張陵楞泼,是天一觀的道長。 經(jīng)常有香客問我,道長,這世上最難降的妖魔是什么唤崭? 我笑而不...
    開封第一講書人閱讀 56,408評論 1 283
  • 正文 為了忘掉前任,我火速辦了婚禮超陆,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘脱衙。我一直安慰自己侥猬,他們只是感情好,可當我...
    茶點故事閱讀 65,445評論 5 384
  • 文/花漫 我一把揭開白布捐韩。 她就那樣靜靜地躺著退唠,像睡著了一般。 火紅的嫁衣襯著肌膚如雪荤胁。 梳的紋絲不亂的頭發(fā)上瞧预,一...
    開封第一講書人閱讀 49,772評論 1 290
  • 那天,我揣著相機與錄音,去河邊找鬼垢油。 笑死盆驹,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的滩愁。 我是一名探鬼主播躯喇,決...
    沈念sama閱讀 38,921評論 3 406
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼硝枉!你這毒婦竟也來了廉丽?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,688評論 0 266
  • 序言:老撾萬榮一對情侶失蹤妻味,失蹤者是張志新(化名)和其女友劉穎正压,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體责球,經(jīng)...
    沈念sama閱讀 44,130評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡焦履,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,467評論 2 325
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了雏逾。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片嘉裤。...
    茶點故事閱讀 38,617評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖校套,靈堂內(nèi)的尸體忽然破棺而出价脾,到底是詐尸還是另有隱情牧抵,我是刑警寧澤笛匙,帶...
    沈念sama閱讀 34,276評論 4 329
  • 正文 年R本政府宣布,位于F島的核電站犀变,受9級特大地震影響妹孙,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜获枝,卻給世界環(huán)境...
    茶點故事閱讀 39,882評論 3 312
  • 文/蒙蒙 一蠢正、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧省店,春花似錦嚣崭、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,740評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至粗俱,卻和暖如春说榆,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,967評論 1 265
  • 我被黑心中介騙來泰國打工签财, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留串慰,地道東北人。 一個月前我還...
    沈念sama閱讀 46,315評論 2 360
  • 正文 我出身青樓唱蒸,卻偏偏與公主長得像邦鲫,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子神汹,可洞房花燭夜當晚...
    茶點故事閱讀 43,486評論 2 348

推薦閱讀更多精彩內(nèi)容