概述
Spark Streaming是核心Spark API的擴展,對實時數(shù)據(jù)流地處理具有可擴展,高吞吐量和容錯特性走敌。數(shù)據(jù)可從很多源獲取,如Kafka逗噩,F(xiàn)lume掉丽,Kinesis或者TCP sockets,并且可以使用復雜算法進行處理给赞,用高層函數(shù)表示如map
机打,reduce
矫户,join
和window
片迅。最后,處理結(jié)果可以輸出到文件系統(tǒng)皆辽,數(shù)據(jù)庫或者實時儀表盤柑蛇。實際上,你可以在數(shù)據(jù)流上應(yīng)用machine learning和graph processing算法驱闷。
在Spark內(nèi)部耻台,工作流程如下。Spark Streaming接收實時輸入數(shù)據(jù)流并且將數(shù)據(jù)劃分為不同的批次空另,然后交給Spark engine進行處理盆耽,按照批次生成最終的結(jié)果流。
Spark Streaming提供了高層抽象扼菠,叫做離散流( discretized stream)或者DStream摄杂,代表連續(xù)數(shù)據(jù)流。DStream可以通過Kafka循榆,F(xiàn)lume和Kinesis的輸入數(shù)據(jù)流創(chuàng)建析恢,或者通過在其它DStream上應(yīng)用高層操作創(chuàng)建。在Spark內(nèi)部秧饮,DStream是一系列RDD映挂。
快速示例
在詳細介紹如何寫Spark Streaming程序之前,先快速看一下簡單的Spark Streaming程序示例盗尸。假設(shè)我們想計算從數(shù)據(jù)服務(wù)器通過TCP socket發(fā)過來的文本數(shù)據(jù)的單詞數(shù)量柑船,可參見如下代碼。
首先泼各,import Spark Streaming的類和一些隱式轉(zhuǎn)換鞍时。StreamingContext是所有流功能的主入口。我們創(chuàng)建一個本地StreamingContext历恐,2個執(zhí)行線程寸癌,批時間間隔為1s专筷。
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
// Create a local StreamingContext with two working thread and batch interval of 1 second.
// The master requires 2 cores to prevent from a starvation scenario.
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))
使用這個context,可以創(chuàng)建DStream蒸苇,代表TCP源的數(shù)據(jù)流磷蛹,指定 hostname (e.g. localhost
)和port (e.g. 9999
)。
// Create a DStream that will connect to hostname:port, like localhost:9999
val lines = ssc.socketTextStream("localhost", 9999)
lines
DStream代表從數(shù)據(jù)服務(wù)器接收的數(shù)據(jù)流溪烤。DStream中的每條記錄是文本的一行味咳。接下來,用空格將每行切分成單詞檬嘀。
// Split each line into words
val words = lines.flatMap(_.split(" "))
flatMap
是一個一對多的DStream操作槽驶,通過從源DStream中的每條記錄創(chuàng)建多條新紀錄來創(chuàng)建新DStream。我們這個例子中鸳兽,每行會被切分成多個單詞掂铐,words
DStream代表單詞流。接下來揍异,對單詞進行計數(shù)全陨。
import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print()
words
DStream進行map(一對一轉(zhuǎn)換)到一個(word, 1)
DStream,然后reduce獲取每個批次數(shù)據(jù)的詞頻衷掷。最后辱姨,wordCounts.print()
會打印其中一些詞頻。
注意戚嗅,上面代碼行執(zhí)行時雨涛,Spark Streaming只會設(shè)置啟動時要執(zhí)行的計算,不會開始真正的處理懦胞。要在所有轉(zhuǎn)換完成后開始進行處理替久,調(diào)用以下方法:
ssc.start() // Start the computation
ssc.awaitTermination() // Wait for the computation to terminate
完整代碼可參見NetworkWordCount。
如果你已經(jīng)下載并且構(gòu)建了Spark医瘫,可以運行下面的示例侣肄。需要先運行Netcat(一個小工具,大多數(shù)類Unix系統(tǒng)都有)作為數(shù)據(jù)服務(wù)器醇份,如下:
$ nc -lk 9999
然后稼锅,在另外一個終端中,啟動下面的示例:
$ ./bin/run-example streaming.NetworkWordCount localhost 9999
然后僚纷,在運行netcat服務(wù)的終端中輸入的任意行都會每秒進行計數(shù)并打印出來矩距。如下:
# TERMINAL 1:
# Running Netcat
$ nc -lk 9999
hello world
...
# TERMINAL 2: RUNNING NetworkWordCount
$ ./bin/run-example streaming.NetworkWordCount localhost 9999
...
-------------------------------------------
Time: 1357008430000 ms
-------------------------------------------
(hello,1)
(world,1)
...