一.Spark streaming介紹
1.1 Spark streaming簡介
Spark Streaming是Spark API的核心擴展留夜,支持實時數(shù)據(jù)流的可擴展、高吞吐量和容錯流處理古胆。數(shù)據(jù)可以從Kafka蒂教、Kinesis或TCP套接字等多種來源中獲取,并且可以使用復雜的算法進行處理筷频,這些算法用高級函數(shù)表示熏挎,如map、reduce晌砾、join和window坎拐。最后,處理過的數(shù)據(jù)可以推送到文件系統(tǒng)、數(shù)據(jù)庫和實時儀表板哼勇。事實上都伪,您可以在數(shù)據(jù)流上應用Spark的機器學習和圖形處理算法。
在內(nèi)部积担,它的工作方式如下陨晶。Spark Streaming接收實時輸入的數(shù)據(jù)流,并對數(shù)據(jù)進行分批處理帝璧,由Spark引擎進行處理先誉,生成最終的批量結果流。
Spark Streaming提供了一種高級抽象的烁,稱為離散流或DStream褐耳,它表示連續(xù)的數(shù)據(jù)流。Dstream可以通過來自Kafka和Kinesis等源的輸入數(shù)據(jù)流創(chuàng)建渴庆,也可以通過在其他Dstream上應用高級操作來創(chuàng)建铃芦。在內(nèi)部,DStream表示為rdd序列襟雷。
1.2 Spark 與storm區(qū)別
Storm
- 流式計算框架
- 以record為單位處理數(shù)據(jù)
- 也支持micro-batch方式(Trident)
Spark
- 批處理計算框架
- 以RDD為單位處理數(shù)據(jù)
- 也支持micro-batch流式處理數(shù)據(jù)(Spark Streaming)
兩者異同
- 吞吐量: Spark Streaming 優(yōu)于Storm
- 延遲: Spark Streaming差于Storm
1.3 一個簡單的例子
在我們深入了解如何編寫自己的Spark Streaming程序之前刃滓,讓我們快速了解一下簡單的Spark Streaming程序是什么樣的。
首先耸弄,我們導入StreamingContext咧虎,它是所有流功能的主要入口點。我們創(chuàng)建一個具有兩個執(zhí)行線程的本地StreamingContext叙赚,批處理間隔為1秒老客。
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
# Create a local StreamingContext with two working thread and batch interval of 1 second
sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 1)
為了初始化一個Spark Streaming程序,必須創(chuàng)建一個StreamingContext對象震叮,它是所有Spark Streaming功能的主要入口點胧砰。
appName參數(shù)是應用程序在集群UI上顯示的名稱。master是Spark苇瓣、Mesos或YARN集群的URL尉间,或者是一個特殊的“l(fā)ocal[*]”字符串,在本地模式下運行击罪。實際上哲嘲,當在集群上運行時,您不希望在程序中硬編碼master媳禁,而是使用spark-submit啟動應用程序并在那里接收它眠副。但是,對于本地測試和單元測試竣稽,可以通過“l(fā)ocal[*]”來運行Spark Streaming in-process(檢測本地系統(tǒng)中的核數(shù))囱怕。
在定義了上下文之后霍弹,必須執(zhí)行以下操作:
- 通過創(chuàng)建輸入DStreams來定義輸入源。
- 通過對DStreams應用轉(zhuǎn)換和輸出操作來定義流計算娃弓。
- 開始接收數(shù)據(jù)并使用streamingContext.start()處理它典格。
- 使用streamingContext.awaitTermination()等待處理停止(手動或由于任何錯誤)。
- 可以使用streamingContext.stop()手動停止處理台丛。
注意:
- 一旦啟動了Context耍缴,就不能再設置或向其添加新的流計算。
- 一旦停止了Context挽霉,就不能重新啟動它防嗡。
- 同一時間,JVM中只能有一個StreamingContext是活動的炼吴。
- StreamingContext上的stop()也會停止SparkContext本鸣。要只停止StreamingContext,請將stop()的可選參數(shù)stopSparkContext設置為false硅蹦。
- 一個SparkContext可以被重用來創(chuàng)建多個StreamingContext荣德,只要在創(chuàng)建下一個StreamingContext之前停止前一個StreamingContext(不停止SparkContext)。
二.Spark Streaming的組件介紹
Spark Streaming的核心組件有2個:
- Streaming Context
- Dstream(離散流)
2.1 Streaming Context
Streaming Context是Spark Streaming程序的起點童芹,生成Streaming Context之前需要生成SparkContext涮瞻,SparkContext可以理解為申請Spark集群的計算資源,Streaming Context可以理解為申請Spark Streaming的計算資源
2.2 Dstream(離散流)
Dstream是Spark Streaming的數(shù)據(jù)抽象假褪,同DataFrame署咽,其實底層依舊是RDD。
Discretized Stream或DStream是Spark Streaming提供的基本抽象生音。它表示一個連續(xù)的數(shù)據(jù)流宁否,要么是從源接收的輸入數(shù)據(jù)流,要么是通過轉(zhuǎn)換輸入流生成的處理數(shù)據(jù)流缀遍。在內(nèi)部慕匠,DStream由一系列連續(xù)的rdd表示,這是Spark對不可變的分布式數(shù)據(jù)集的抽象域醇。DStream中的每個RDD都包含一定時間間隔的數(shù)據(jù)台谊,如下圖所示:
在DStream上應用的任何操作都轉(zhuǎn)換為在底層rdd上的操作。
這些底層RDD轉(zhuǎn)換是由Spark引擎計算的譬挚。DStream操作隱藏了大部分細節(jié)锅铅,并為開發(fā)人員提供了更高級的API。
DStream存在如下概念:
- Receiver
- 數(shù)據(jù)源: 基本源减宣、高級源
- 可靠性
- Dstream的操作
- 緩存
- Checkpoint
2.1 Receiver
每個輸入DStream(文件流除外)都與一個Receiver (Scala doc, Java doc)對象相關聯(lián)盐须,接收來自源的數(shù)據(jù)并將其存儲在Spark的內(nèi)存中進行處理。
2.2 數(shù)據(jù)源
Spark Streaming提供了兩類內(nèi)置流源:
- 基本源:在StreamingContext API中直接可用的源漆腌。例如文件系統(tǒng)和套接字連接丰歌。
- 高級資源:像Kafka, Kinesis等資源可以通過額外的實用程序類獲得姨蟋。這些需要根據(jù)鏈接部分中討論的額外依賴項進行鏈接。
注意,如果希望在流應用程序中并行接收多個數(shù)據(jù)流,可以創(chuàng)建多個輸入Dstream趁矾。這將創(chuàng)建多個接收器石咬,這些接收器將同時接收多個數(shù)據(jù)流。但是請注意戳粒,Spark worker/executor是一個長期運行的任務,因此它占用分配給Spark Streaming應用程序的一個核心。因此绑咱,Spark Streaming應用程序需要分配足夠的內(nèi)核(或者線程,如果在本地運行的話)來處理接收到的數(shù)據(jù)枢泰,以及運行接收端描融,記住這一點很重要。
記住
在本地運行Spark Streaming程序時衡蚂,不要使用“l(fā)ocal”或“l(fā)ocal[1]”作為主URL窿克。這兩種情況都意味著只有一個線程用于本地運行任務。如果你使用一個基于接收器的輸入DStream(例如毛甲,socket, Kafka等)年叮,那么單線程將被用來運行Receiver ,不留下任何線程來處理接收的數(shù)據(jù)玻募。因此只损,當本地運行時,總是使用“l(fā)ocal[n]”作為主URL七咧,其中要運行n個>數(shù)量的Receiver 跃惫。
將邏輯擴展到集群上,分配給Spark Streaming應用的內(nèi)核數(shù)必須大于接收端數(shù)艾栋。否則系統(tǒng)將接收到數(shù)據(jù)爆存,但無法進行處理。
2.3 可靠性
根據(jù)數(shù)據(jù)源的可靠性裹粤,可以有兩種數(shù)據(jù)源终蒂。源(如Kafka)允許傳輸?shù)臄?shù)據(jù)被確認。如果從這些可靠來源接收數(shù)據(jù)的系統(tǒng)正確地確認了接收的數(shù)據(jù)遥诉,就可以確保不會由于任何類型的故障而丟失數(shù)據(jù)拇泣。這就產(chǎn)生了兩種接收者:
1). 可靠的接收端—當數(shù)據(jù)被接收到并存儲在Spark中并進行復制時,一個可靠的接收端會正確地向一個可靠的源發(fā)送確認矮锈。
2), 不可靠的接收者——不可靠的接收者不向源發(fā)送確認霉翔。這可以用于不支持確認的來源,甚至當一個人不想或需要進入確認的復雜性時苞笨,用于可靠的來源债朵。
對于不可靠的接收者子眶,Spark streaming有自己的可靠機制,來保證數(shù)據(jù)的可靠性序芦。
2.4 Dstream的操作
與rdd類似臭杰,轉(zhuǎn)換允許修改來自輸入DStream的數(shù)據(jù)。DStreams支持許多普通Spark RDD上可用的轉(zhuǎn)換谚中。下面是一些常見的.
Transformations on DStreams
Output Operations on DStreams:
2.5 緩存
與rdd類似渴杆,DStreams也允許開發(fā)人員在內(nèi)存中持久化流數(shù)據(jù)。也就是說宪塔,在DStream上使用persist()方法將自動在內(nèi)存中持久化該DStream的每個RDD磁奖。如果DStream中的數(shù)據(jù)將被計算多次(例如,對同一數(shù)據(jù)的多次操作)某筐,這是有用的比搭。對于基于窗口的操作,如reduceByWindow和reduceByKeyAndWindow南誊,以及基于狀態(tài)的操作身诺,如updateStateByKey,這是隱式true弟疆。因此戚长,由基于窗口的操作生成的DStreams會自動持久化到內(nèi)存中,而不需要開發(fā)人員調(diào)用persist()怠苔。
對于通過網(wǎng)絡接收數(shù)據(jù)的輸入流(例如同廉,Kafka, socket等),默認的持久性級別被設置為將數(shù)據(jù)復制到兩個節(jié)點以實現(xiàn)容錯柑司。
注意迫肖,與rdd不同,DStreams的默認持久性級別將數(shù)據(jù)序列化保存在內(nèi)存中攒驰。
2.6 Checkpoint
流應用程序必須全天候運行蟆湖,因此必須對與應用程序邏輯無關的故障(例如,系統(tǒng)故障玻粪、JVM崩潰等)具有彈性隅津。為了使這成為可能,Spark Streaming需要對容錯存儲系統(tǒng)進行足夠的信息檢查點劲室,以便從故障中恢復伦仍。有兩種類型的數(shù)據(jù)是檢查點的:
元數(shù)據(jù)檢查點——將定義流計算的信息保存到像HDFS這樣的容錯存儲中。這用于從運行流應用程序驅(qū)動程序的節(jié)點的故障中恢復(稍后將詳細討論)很洋。元數(shù)據(jù)包括:
1.1) 配置—用于創(chuàng)建流應用程序的配置充蓝。
1.2) DStream操作-定義流應用程序的DStream操作集。
1.3) 未完成批-作業(yè)已排隊但尚未完成的批。數(shù)據(jù)檢查點——將生成的rdd保存到可靠的存儲中谓苟。在一些跨多個批組合數(shù)據(jù)的有狀態(tài)轉(zhuǎn)換中官脓,這是必要的。在這種轉(zhuǎn)換中涝焙,生成的rdd依賴于以前批次的rdd卑笨,這導致依賴鏈的長度隨著時間不斷增加。為了避免恢復時間的無限增長(與依賴鏈成正比)仑撞,有狀態(tài)轉(zhuǎn)換的中間rdd會定期被檢查到可靠的存儲(例如HDFS)湾趾,以切斷依賴鏈。
總之派草,元數(shù)據(jù)檢查點主要用于從驅(qū)動程序失敗中恢復,而數(shù)據(jù)或RDD檢查點即使是用于基本功能(如果使用有狀態(tài)轉(zhuǎn)換)也是必要的铛楣。
三.一個簡單的測試用例
3.1 linux服務器安裝nc服務
yum -y install netcat.x86_64 -- centos7 正確
yum -y install nc.x86_64 -- centos7 錯誤
nc -lk 9999
[root@hp2 yum.repos.d]# nc -help
usage: nc [-46cDdFhklNnrStUuvz] [-C certfile] [-e name] [-H hash] [-I length]
[-i interval] [-K keyfile] [-M ttl] [-m minttl] [-O length]
[-o staplefile] [-P proxy_username] [-p source_port] [-R CAfile]
[-s sourceaddr] [-T keyword] [-V rtable] [-W recvlimit] [-w timeout]
[-X proxy_protocol] [-x proxy_address[:port]] [-Z peercertfile]
[destination] [port]
Command Summary:
-4 Use IPv4
-6 Use IPv6
-C certfile Public key file
-c Use TLS
-D Enable the debug socket option
-d Detach from stdin
-e name Required name in peer certificate
-F Pass socket fd
-H hash Hash string of peer certificate
-h This help text
-I length TCP receive buffer length
-i interval Delay interval for lines sent, ports scanned
-K keyfile Private key file
-k Keep inbound sockets open for multiple connects
-l Listen mode, for inbound connects
-M ttl Outgoing TTL / Hop Limit
-m minttl Minimum incoming TTL / Hop Limit
-N Shutdown the network socket after EOF on stdin
-n Suppress name/port resolutions
-O length TCP send buffer length
-o staplefile Staple file
-P proxyuser Username for proxy authentication
-p port Specify local port for remote connects
-R CAfile CA bundle
-r Randomize remote ports
-S Enable the TCP MD5 signature option
-s sourceaddr Local source address
-T keyword TOS value or TLS options
-t Answer TELNET negotiation
-U Use UNIX domain socket
-u UDP mode
-V rtable Specify alternate routing table
-v Verbose
-W recvlimit Terminate after receiving a number of packets
-w timeout Timeout for connects and final net reads
-X proto Proxy protocol: "4", "5" (SOCKS) or "connect"
-x addr[:port] Specify proxy address and port
-Z Peer certificate file
-z Zero-I/O mode [used for scanning]
Port numbers can be individual or ranges: lo-hi [inclusive]
[root@hp2 yum.repos.d]#
[root@hp2 yum.repos.d]#
[root@hp2 yum.repos.d]#
[root@hp2 yum.repos.d]# nc -lk 9999
3.2 pyspark代碼
代碼:
#!/usr/bin/env python
# encoding: utf-8
"""
@author: 'Administrator'
@contact:
@time:
"""
#!/usr/bin/python
# encoding: utf-8
#
# Streaming Word Count Example
# Original Source: https://spark.apache.org/docs/1.6.0/streaming-programming-guide.html
#
# To run this example:
# yum install nc.x86_64
# Terminal 1: nc -lk 9999
# Terminal 2: ./bin/spark-submit streaming_word_count.py
# Note, type words into Terminal 1
#
# Import the necessary classes and create a local SparkContext and Streaming Contexts
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
if __name__=='__main__':
# Create Spark Context with two working threads (note, `local[2]`)
sc = SparkContext("local[2]", "NetworkWordCount")
# Create local StreamingContextwith batch interval of 1 second
ssc = StreamingContext(sc, 1)
# Create DStream that will connect to the stream of input lines from connection to localhost:9999
# lines is DStream representing the data stream extracted via the ssc.socketTextStream.
lines = ssc.socketTextStream("localhost", 9999)
# Split lines into words
words = lines.flatMap(lambda line: line.split(" "))
# Count each word in each batch
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)
# Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.pprint()
# Start the computation
ssc.start()
# Wait for the computation to terminate
ssc.awaitTermination()
測試記錄:
滾動太快近迁,只能從日志中找到記錄
參考:
1.http://spark.apache.org/docs/latest/streaming-programming-guide.html