pyspark系列7-Spark streaming介紹

一.Spark streaming介紹

1.1 Spark streaming簡介

Spark Streaming是Spark API的核心擴展留夜,支持實時數(shù)據(jù)流的可擴展、高吞吐量和容錯流處理古胆。數(shù)據(jù)可以從Kafka蒂教、Kinesis或TCP套接字等多種來源中獲取,并且可以使用復雜的算法進行處理筷频,這些算法用高級函數(shù)表示熏挎,如map、reduce晌砾、join和window坎拐。最后,處理過的數(shù)據(jù)可以推送到文件系統(tǒng)、數(shù)據(jù)庫和實時儀表板哼勇。事實上都伪,您可以在數(shù)據(jù)流上應用Spark的機器學習和圖形處理算法。


image.png

在內(nèi)部积担,它的工作方式如下陨晶。Spark Streaming接收實時輸入的數(shù)據(jù)流,并對數(shù)據(jù)進行分批處理帝璧,由Spark引擎進行處理先誉,生成最終的批量結果流。


image.png

Spark Streaming提供了一種高級抽象的烁,稱為離散流或DStream褐耳,它表示連續(xù)的數(shù)據(jù)流。Dstream可以通過來自Kafka和Kinesis等源的輸入數(shù)據(jù)流創(chuàng)建渴庆,也可以通過在其他Dstream上應用高級操作來創(chuàng)建铃芦。在內(nèi)部,DStream表示為rdd序列襟雷。

1.2 Spark 與storm區(qū)別

Storm

  1. 流式計算框架
  2. 以record為單位處理數(shù)據(jù)
  3. 也支持micro-batch方式(Trident)

Spark

  1. 批處理計算框架
  2. 以RDD為單位處理數(shù)據(jù)
  3. 也支持micro-batch流式處理數(shù)據(jù)(Spark Streaming)

兩者異同

  1. 吞吐量: Spark Streaming 優(yōu)于Storm
  2. 延遲: Spark Streaming差于Storm

1.3 一個簡單的例子

在我們深入了解如何編寫自己的Spark Streaming程序之前刃滓,讓我們快速了解一下簡單的Spark Streaming程序是什么樣的。

首先耸弄,我們導入StreamingContext咧虎,它是所有流功能的主要入口點。我們創(chuàng)建一個具有兩個執(zhí)行線程的本地StreamingContext叙赚,批處理間隔為1秒老客。

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# Create a local StreamingContext with two working thread and batch interval of 1 second
sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 1)

為了初始化一個Spark Streaming程序,必須創(chuàng)建一個StreamingContext對象震叮,它是所有Spark Streaming功能的主要入口點胧砰。

appName參數(shù)是應用程序在集群UI上顯示的名稱。master是Spark苇瓣、Mesos或YARN集群的URL尉间,或者是一個特殊的“l(fā)ocal[*]”字符串,在本地模式下運行击罪。實際上哲嘲,當在集群上運行時,您不希望在程序中硬編碼master媳禁,而是使用spark-submit啟動應用程序并在那里接收它眠副。但是,對于本地測試和單元測試竣稽,可以通過“l(fā)ocal[*]”來運行Spark Streaming in-process(檢測本地系統(tǒng)中的核數(shù))囱怕。

在定義了上下文之后霍弹,必須執(zhí)行以下操作:

  1. 通過創(chuàng)建輸入DStreams來定義輸入源。
  2. 通過對DStreams應用轉(zhuǎn)換和輸出操作來定義流計算娃弓。
  3. 開始接收數(shù)據(jù)并使用streamingContext.start()處理它典格。
  4. 使用streamingContext.awaitTermination()等待處理停止(手動或由于任何錯誤)。
  5. 可以使用streamingContext.stop()手動停止處理台丛。

注意:

  1. 一旦啟動了Context耍缴,就不能再設置或向其添加新的流計算。
  2. 一旦停止了Context挽霉,就不能重新啟動它防嗡。
  3. 同一時間,JVM中只能有一個StreamingContext是活動的炼吴。
  4. StreamingContext上的stop()也會停止SparkContext本鸣。要只停止StreamingContext,請將stop()的可選參數(shù)stopSparkContext設置為false硅蹦。
  5. 一個SparkContext可以被重用來創(chuàng)建多個StreamingContext荣德,只要在創(chuàng)建下一個StreamingContext之前停止前一個StreamingContext(不停止SparkContext)。

二.Spark Streaming的組件介紹

Spark Streaming的核心組件有2個:

  1. Streaming Context
  2. Dstream(離散流)

2.1 Streaming Context

Streaming Context是Spark Streaming程序的起點童芹,生成Streaming Context之前需要生成SparkContext涮瞻,SparkContext可以理解為申請Spark集群的計算資源,Streaming Context可以理解為申請Spark Streaming的計算資源

2.2 Dstream(離散流)

Dstream是Spark Streaming的數(shù)據(jù)抽象假褪,同DataFrame署咽,其實底層依舊是RDD。

Discretized Stream或DStream是Spark Streaming提供的基本抽象生音。它表示一個連續(xù)的數(shù)據(jù)流宁否,要么是從源接收的輸入數(shù)據(jù)流,要么是通過轉(zhuǎn)換輸入流生成的處理數(shù)據(jù)流缀遍。在內(nèi)部慕匠,DStream由一系列連續(xù)的rdd表示,這是Spark對不可變的分布式數(shù)據(jù)集的抽象域醇。DStream中的每個RDD都包含一定時間間隔的數(shù)據(jù)台谊,如下圖所示:


image.png

在DStream上應用的任何操作都轉(zhuǎn)換為在底層rdd上的操作。


image.png

這些底層RDD轉(zhuǎn)換是由Spark引擎計算的譬挚。DStream操作隱藏了大部分細節(jié)锅铅,并為開發(fā)人員提供了更高級的API。

DStream存在如下概念:

  1. Receiver
  2. 數(shù)據(jù)源: 基本源减宣、高級源
  3. 可靠性
  4. Dstream的操作
  5. 緩存
  6. Checkpoint
image.png

2.1 Receiver

每個輸入DStream(文件流除外)都與一個Receiver (Scala doc, Java doc)對象相關聯(lián)盐须,接收來自源的數(shù)據(jù)并將其存儲在Spark的內(nèi)存中進行處理。

2.2 數(shù)據(jù)源

Spark Streaming提供了兩類內(nèi)置流源:

  1. 基本源:在StreamingContext API中直接可用的源漆腌。例如文件系統(tǒng)和套接字連接丰歌。
  2. 高級資源:像Kafka, Kinesis等資源可以通過額外的實用程序類獲得姨蟋。這些需要根據(jù)鏈接部分中討論的額外依賴項進行鏈接。

注意,如果希望在流應用程序中并行接收多個數(shù)據(jù)流,可以創(chuàng)建多個輸入Dstream趁矾。這將創(chuàng)建多個接收器石咬,這些接收器將同時接收多個數(shù)據(jù)流。但是請注意戳粒,Spark worker/executor是一個長期運行的任務,因此它占用分配給Spark Streaming應用程序的一個核心。因此绑咱,Spark Streaming應用程序需要分配足夠的內(nèi)核(或者線程,如果在本地運行的話)來處理接收到的數(shù)據(jù)枢泰,以及運行接收端描融,記住這一點很重要。

記住
在本地運行Spark Streaming程序時衡蚂,不要使用“l(fā)ocal”或“l(fā)ocal[1]”作為主URL窿克。這兩種情況都意味著只有一個線程用于本地運行任務。如果你使用一個基于接收器的輸入DStream(例如毛甲,socket, Kafka等)年叮,那么單線程將被用來運行Receiver ,不留下任何線程來處理接收的數(shù)據(jù)玻募。因此只损,當本地運行時,總是使用“l(fā)ocal[n]”作為主URL七咧,其中要運行n個>數(shù)量的Receiver 跃惫。

將邏輯擴展到集群上,分配給Spark Streaming應用的內(nèi)核數(shù)必須大于接收端數(shù)艾栋。否則系統(tǒng)將接收到數(shù)據(jù)爆存,但無法進行處理。

2.3 可靠性

根據(jù)數(shù)據(jù)源的可靠性裹粤,可以有兩種數(shù)據(jù)源终蒂。源(如Kafka)允許傳輸?shù)臄?shù)據(jù)被確認。如果從這些可靠來源接收數(shù)據(jù)的系統(tǒng)正確地確認了接收的數(shù)據(jù)遥诉,就可以確保不會由于任何類型的故障而丟失數(shù)據(jù)拇泣。這就產(chǎn)生了兩種接收者:
1). 可靠的接收端—當數(shù)據(jù)被接收到并存儲在Spark中并進行復制時,一個可靠的接收端會正確地向一個可靠的源發(fā)送確認矮锈。
2), 不可靠的接收者——不可靠的接收者不向源發(fā)送確認霉翔。這可以用于不支持確認的來源,甚至當一個人不想或需要進入確認的復雜性時苞笨,用于可靠的來源债朵。

對于不可靠的接收者子眶,Spark streaming有自己的可靠機制,來保證數(shù)據(jù)的可靠性序芦。

2.4 Dstream的操作

與rdd類似臭杰,轉(zhuǎn)換允許修改來自輸入DStream的數(shù)據(jù)。DStreams支持許多普通Spark RDD上可用的轉(zhuǎn)換谚中。下面是一些常見的.

Transformations on DStreams

image.png

Output Operations on DStreams:

image.png

2.5 緩存

與rdd類似渴杆,DStreams也允許開發(fā)人員在內(nèi)存中持久化流數(shù)據(jù)。也就是說宪塔,在DStream上使用persist()方法將自動在內(nèi)存中持久化該DStream的每個RDD磁奖。如果DStream中的數(shù)據(jù)將被計算多次(例如,對同一數(shù)據(jù)的多次操作)某筐,這是有用的比搭。對于基于窗口的操作,如reduceByWindow和reduceByKeyAndWindow南誊,以及基于狀態(tài)的操作身诺,如updateStateByKey,這是隱式true弟疆。因此戚长,由基于窗口的操作生成的DStreams會自動持久化到內(nèi)存中,而不需要開發(fā)人員調(diào)用persist()怠苔。

對于通過網(wǎng)絡接收數(shù)據(jù)的輸入流(例如同廉,Kafka, socket等),默認的持久性級別被設置為將數(shù)據(jù)復制到兩個節(jié)點以實現(xiàn)容錯柑司。

注意迫肖,與rdd不同,DStreams的默認持久性級別將數(shù)據(jù)序列化保存在內(nèi)存中攒驰。

2.6 Checkpoint

流應用程序必須全天候運行蟆湖,因此必須對與應用程序邏輯無關的故障(例如,系統(tǒng)故障玻粪、JVM崩潰等)具有彈性隅津。為了使這成為可能,Spark Streaming需要對容錯存儲系統(tǒng)進行足夠的信息檢查點劲室,以便從故障中恢復伦仍。有兩種類型的數(shù)據(jù)是檢查點的:

  1. 元數(shù)據(jù)檢查點——將定義流計算的信息保存到像HDFS這樣的容錯存儲中。這用于從運行流應用程序驅(qū)動程序的節(jié)點的故障中恢復(稍后將詳細討論)很洋。元數(shù)據(jù)包括:
    1.1) 配置—用于創(chuàng)建流應用程序的配置充蓝。
    1.2) DStream操作-定義流應用程序的DStream操作集。
    1.3) 未完成批-作業(yè)已排隊但尚未完成的批。

  2. 數(shù)據(jù)檢查點——將生成的rdd保存到可靠的存儲中谓苟。在一些跨多個批組合數(shù)據(jù)的有狀態(tài)轉(zhuǎn)換中官脓,這是必要的。在這種轉(zhuǎn)換中涝焙,生成的rdd依賴于以前批次的rdd卑笨,這導致依賴鏈的長度隨著時間不斷增加。為了避免恢復時間的無限增長(與依賴鏈成正比)仑撞,有狀態(tài)轉(zhuǎn)換的中間rdd會定期被檢查到可靠的存儲(例如HDFS)湾趾,以切斷依賴鏈。

總之派草,元數(shù)據(jù)檢查點主要用于從驅(qū)動程序失敗中恢復,而數(shù)據(jù)或RDD檢查點即使是用于基本功能(如果使用有狀態(tài)轉(zhuǎn)換)也是必要的铛楣。

三.一個簡單的測試用例

3.1 linux服務器安裝nc服務

yum -y install netcat.x86_64    -- centos7 正確
yum -y install nc.x86_64          -- centos7 錯誤

nc -lk 9999
[root@hp2 yum.repos.d]# nc -help
usage: nc [-46cDdFhklNnrStUuvz] [-C certfile] [-e name] [-H hash] [-I length]
          [-i interval] [-K keyfile] [-M ttl] [-m minttl] [-O length]
          [-o staplefile] [-P proxy_username] [-p source_port] [-R CAfile]
          [-s sourceaddr] [-T keyword] [-V rtable] [-W recvlimit] [-w timeout]
          [-X proxy_protocol] [-x proxy_address[:port]] [-Z peercertfile]
          [destination] [port]
        Command Summary:
                -4              Use IPv4
                -6              Use IPv6
                -C certfile     Public key file
                -c              Use TLS
                -D              Enable the debug socket option
                -d              Detach from stdin
                -e name         Required name in peer certificate
                -F              Pass socket fd
                -H hash         Hash string of peer certificate
                -h              This help text
                -I length       TCP receive buffer length
                -i interval     Delay interval for lines sent, ports scanned
                -K keyfile      Private key file
                -k              Keep inbound sockets open for multiple connects
                -l              Listen mode, for inbound connects
                -M ttl          Outgoing TTL / Hop Limit
                -m minttl       Minimum incoming TTL / Hop Limit
                -N              Shutdown the network socket after EOF on stdin
                -n              Suppress name/port resolutions
                -O length       TCP send buffer length
                -o staplefile   Staple file
                -P proxyuser    Username for proxy authentication
                -p port         Specify local port for remote connects
                -R CAfile       CA bundle
                -r              Randomize remote ports
                -S              Enable the TCP MD5 signature option
                -s sourceaddr   Local source address
                -T keyword      TOS value or TLS options
                -t              Answer TELNET negotiation
                -U              Use UNIX domain socket
                -u              UDP mode
                -V rtable       Specify alternate routing table
                -v              Verbose
                -W recvlimit    Terminate after receiving a number of packets
                -w timeout      Timeout for connects and final net reads
                -X proto        Proxy protocol: "4", "5" (SOCKS) or "connect"
                -x addr[:port]  Specify proxy address and port
                -Z              Peer certificate file
                -z              Zero-I/O mode [used for scanning]
        Port numbers can be individual or ranges: lo-hi [inclusive]
[root@hp2 yum.repos.d]# 
[root@hp2 yum.repos.d]# 
[root@hp2 yum.repos.d]# 
[root@hp2 yum.repos.d]# nc -lk 9999

3.2 pyspark代碼

代碼:

#!/usr/bin/env python
# encoding: utf-8


"""
@author:  'Administrator'
@contact:
@time:
"""

#!/usr/bin/python
# encoding: utf-8

#
# Streaming Word Count Example
#    Original Source: https://spark.apache.org/docs/1.6.0/streaming-programming-guide.html
#
# To run this example:
# yum install nc.x86_64
#   Terminal 1:  nc -lk 9999
#   Terminal 2:  ./bin/spark-submit streaming_word_count.py
#   Note, type words into Terminal 1
#

# Import the necessary classes and create a local SparkContext and Streaming Contexts
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

if __name__=='__main__':
    # Create Spark Context with two working threads (note, `local[2]`)
    sc = SparkContext("local[2]", "NetworkWordCount")

    # Create local StreamingContextwith batch interval of 1 second
    ssc = StreamingContext(sc, 1)

    # Create DStream that will connect to the stream of input lines from connection to localhost:9999
    # lines is DStream representing the data stream extracted via the ssc.socketTextStream.

    lines = ssc.socketTextStream("localhost", 9999)

    # Split lines into words
    words = lines.flatMap(lambda line: line.split(" "))

    # Count each word in each batch
    pairs = words.map(lambda word: (word, 1))
    wordCounts = pairs.reduceByKey(lambda x, y: x + y)

    # Print the first ten elements of each RDD generated in this DStream to the console
    wordCounts.pprint()

    # Start the computation
    ssc.start()

    # Wait for the computation to terminate
    ssc.awaitTermination()

測試記錄:

image.png

滾動太快近迁,只能從日志中找到記錄


image.png

參考:

1.http://spark.apache.org/docs/latest/streaming-programming-guide.html

?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市簸州,隨后出現(xiàn)的幾起案子鉴竭,更是在濱河造成了極大的恐慌,老刑警劉巖岸浑,帶你破解...
    沈念sama閱讀 211,265評論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件搏存,死亡現(xiàn)場離奇詭異,居然都是意外死亡矢洲,警方通過查閱死者的電腦和手機璧眠,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,078評論 2 385
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來读虏,“玉大人责静,你說我怎么就攤上這事「乔牛” “怎么了灾螃?”我有些...
    開封第一講書人閱讀 156,852評論 0 347
  • 文/不壞的土叔 我叫張陵,是天一觀的道長揩徊。 經(jīng)常有香客問我腰鬼,道長,這世上最難降的妖魔是什么塑荒? 我笑而不...
    開封第一講書人閱讀 56,408評論 1 283
  • 正文 為了忘掉前任熄赡,我火速辦了婚禮,結果婚禮上袜炕,老公的妹妹穿的比我還像新娘本谜。我一直安慰自己,他們只是感情好偎窘,可當我...
    茶點故事閱讀 65,445評論 5 384
  • 文/花漫 我一把揭開白布乌助。 她就那樣靜靜地躺著溜在,像睡著了一般。 火紅的嫁衣襯著肌膚如雪他托。 梳的紋絲不亂的頭發(fā)上掖肋,一...
    開封第一講書人閱讀 49,772評論 1 290
  • 那天,我揣著相機與錄音赏参,去河邊找鬼志笼。 笑死,一個胖子當著我的面吹牛把篓,可吹牛的內(nèi)容都是我干的纫溃。 我是一名探鬼主播,決...
    沈念sama閱讀 38,921評論 3 406
  • 文/蒼蘭香墨 我猛地睜開眼韧掩,長吁一口氣:“原來是場噩夢啊……” “哼紊浩!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起疗锐,我...
    開封第一講書人閱讀 37,688評論 0 266
  • 序言:老撾萬榮一對情侶失蹤坊谁,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后滑臊,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體口芍,經(jīng)...
    沈念sama閱讀 44,130評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,467評論 2 325
  • 正文 我和宋清朗相戀三年雇卷,在試婚紗的時候發(fā)現(xiàn)自己被綠了鬓椭。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,617評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡聋庵,死狀恐怖膘融,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情祭玉,我是刑警寧澤氧映,帶...
    沈念sama閱讀 34,276評論 4 329
  • 正文 年R本政府宣布,位于F島的核電站脱货,受9級特大地震影響岛都,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜振峻,卻給世界環(huán)境...
    茶點故事閱讀 39,882評論 3 312
  • 文/蒙蒙 一臼疫、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧扣孟,春花似錦烫堤、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,740評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽拔创。三九已至,卻和暖如春富蓄,著一層夾襖步出監(jiān)牢的瞬間剩燥,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,967評論 1 265
  • 我被黑心中介騙來泰國打工立倍, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留灭红,地道東北人。 一個月前我還...
    沈念sama閱讀 46,315評論 2 360
  • 正文 我出身青樓口注,卻偏偏與公主長得像变擒,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子寝志,可洞房花燭夜當晚...
    茶點故事閱讀 43,486評論 2 348

推薦閱讀更多精彩內(nèi)容