DStreams轉(zhuǎn)換(Transformation)
和RDD類似,轉(zhuǎn)換中允許輸入DStream中的數(shù)據(jù)被修改邪锌。DStream支持很多Spark RDD上的轉(zhuǎn)換。常用的轉(zhuǎn)換如下。
轉(zhuǎn)換 | 含義 |
---|---|
map(func) | 將源DStream中的每個(gè)元素傳給函數(shù)func ,返回新的DStream寡痰。 |
flatMap(func) | 和map類似抗楔,但是每個(gè)輸入條目可以映射到0或多個(gè)輸出條目。 |
filter(func) | 選擇源DStream中經(jīng)過func 處理后返回true的元素拦坠,返回新的DStream连躏。 |
repartition(numPartitions) | 改變DStream的并行級(jí)別,可創(chuàng)建更多或者更少的分區(qū)贞滨。 |
union(otherStream) | 返回新的DStream入热,包含源DStream和otherDStream元素的union。 |
count() | 通過對(duì)每個(gè)RDD的元素計(jì)數(shù)返回單元素RDDs的DStream晓铆。 |
reduce(func) | 使用函數(shù)func(兩個(gè)參數(shù)和一個(gè)返回值)勺良,通過對(duì)每個(gè)RDD元素進(jìn)行聚合返回單元素RDDs的DStream。函數(shù)應(yīng)該是可結(jié)合和可交換的尤蒿,以便進(jìn)行并行計(jì)算郑气。 |
countByValue() | 在元素類型為K的DStream上調(diào)用時(shí),返回一個(gè)(K, Long)對(duì)的DStream腰池,每個(gè)key的value是每個(gè)RDD中這個(gè)key出現(xiàn)的頻率。 |
reduceByKey(func, [numTasks]) | 在(K, V)對(duì)的DStream上調(diào)用時(shí)忙芒,返回一個(gè)新(K, V)對(duì)的DStream示弓,每個(gè)key的value是使用給定reduce函數(shù)進(jìn)行聚合的結(jié)果。注意:默認(rèn)情況呵萨,使用Spark的默認(rèn)并行任務(wù)數(shù)量(本地模式為2奏属,集群模式數(shù)量由spark.default.parallelism 配置項(xiàng)決定)進(jìn)行分組〕甭停可以傳遞可選參數(shù)numTasks 任務(wù)數(shù)量囱皿。 |
join(otherStream, [numTasks]) | 當(dāng)在(K, V)對(duì)的DStream和(K, W)對(duì)的DStream上調(diào)用時(shí),返回(K, (V, W))對(duì)的DStream忱嘹。 |
cogroup(otherStream, [numTasks]) | 當(dāng)在(K, V)對(duì)的DStream和(K, W)對(duì)的DStream上調(diào)用時(shí)嘱腥,返回(K, Seq[V], Seq[W])元組的DStream。 |
transform(func) | 對(duì)源DStream的每個(gè)RDD應(yīng)用一個(gè)RDD-to-RDD的函數(shù)拘悦,返回一個(gè)新DStream齿兔。可用于在DStream上進(jìn)行任意RDD操作础米。 |
updateStateByKey(func) | 返回新的"state" DStream分苇,通過對(duì)key的前一個(gè)state和新值應(yīng)用給定方法更新每個(gè)key的state∑ㄉ#可用于維護(hù)每個(gè)key的任意state數(shù)據(jù)医寿。 |
其中幾個(gè)轉(zhuǎn)換需要詳細(xì)說明。
UpdateStateByKey操作
當(dāng)連續(xù)使用新信息更新state時(shí)蘑斧,updateStateByKey
操作允許用戶維護(hù)任意狀態(tài)靖秩。使用這個(gè)操作艾帐,必須包含以下兩個(gè)步驟。
- 定義state - state可以是任意數(shù)據(jù)類型盆偿。
- 定義state更新函數(shù) - 用一個(gè)函數(shù)指定如何使用前一個(gè)state和來自輸入流的新值更新state柒爸。
在每個(gè)批次,Spark會(huì)對(duì)所有存在的key應(yīng)用state更新函數(shù)事扭,不管這些key是否有新數(shù)據(jù)捎稚。如果更新函數(shù)返回None
,則key-value對(duì)會(huì)消除求橄。
用一個(gè)例子說明今野。維護(hù)每個(gè)單詞的運(yùn)行計(jì)數(shù),運(yùn)行計(jì)數(shù)是一個(gè)state并且是個(gè)整數(shù)罐农。定義更新函數(shù)如下:
def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
val newCount = ... // add the new values with the previous running count to get the new count
Some(newCount)
}
應(yīng)用在包含單詞對(duì)(pair
DStream包含(word, 1)
条霜,參見示例)的DStream上。
val runningCounts = pairs.updateStateByKey[Int](updateFunction _)
更新函數(shù)會(huì)在每個(gè)單詞上進(jìn)行調(diào)用涵亏,newValues
是1's的序列(來自(word, 1)
對(duì))宰睡,runningCount
是之前的計(jì)數(shù)。
注意气筋,使用updateStateByKey
要求配置檢查點(diǎn)目錄拆内,之后詳細(xì)討論。
Transform操作
transform
操作(以及其變化transformWith
)可以在DStream上應(yīng)用任意RDD-to-RDD函數(shù)宠默◆锘校可用于使用任意沒有暴露給DStream API的RDD操作。例如搀矫,對(duì)數(shù)據(jù)流中每個(gè)批次的數(shù)據(jù)和另一個(gè)數(shù)據(jù)集進(jìn)行join的功能沒有直接暴露給DStream API抹沪。但是,可以簡(jiǎn)單地使用transform
完成瓤球。這增加了很多可能性融欧。例如,通過對(duì)輸入數(shù)據(jù)流和預(yù)先計(jì)算的垃圾信息(也可能是Spark生成的)進(jìn)行join冰垄,完成實(shí)時(shí)數(shù)據(jù)清理蹬癌,然后基于結(jié)果進(jìn)行篩選。
val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...) // RDD containing spam information
val cleanedDStream = wordCounts.transform { rdd =>
rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning
...
}
注意虹茶,在每個(gè)批次時(shí)間間隔中逝薪,提供的函數(shù)都會(huì)被調(diào)用『铮可用于完成隨時(shí)間變化的RDD操作董济,也就是說,RDD操作要门,分區(qū)數(shù)量和廣播變量等可以在批次之間改變虏肾。
Window操作
Spark Streaming也提供了windowed計(jì)算廓啊,可在一個(gè)滑動(dòng)window數(shù)據(jù)上應(yīng)用轉(zhuǎn)換。下圖進(jìn)行說明封豪。
如上圖所示谴轮,每次在源DStream滑動(dòng)window,落在window中的源RDDs會(huì)被合并和操作用于產(chǎn)生windowed DStream的RDDs吹埠。在這個(gè)例子中第步,操作應(yīng)用在后面3個(gè)時(shí)間單位的數(shù)據(jù)上,以2個(gè)時(shí)間單位進(jìn)行滑動(dòng)缘琅。任何window操作都需要指定兩個(gè)參數(shù)粘都。
- window長(zhǎng)度 - window持續(xù)時(shí)間(上圖是3)。
- 滑動(dòng)間隔 - window操作執(zhí)行的間隔(上圖是2)刷袍。
這兩個(gè)參數(shù)必須是源DStream批時(shí)間間隔的倍數(shù)翩隧。
下面用一個(gè)示例說明window操作。擴(kuò)展之前的示例呻纹,生成過去30s的單詞計(jì)數(shù)堆生,每10s一次。為實(shí)現(xiàn)這個(gè)功能居暖,必須對(duì)pair
DStream過去30s的數(shù)據(jù)應(yīng)用reduceByKey
顽频。這里使用reduceByKeyAndWindow
操作。
// Reduce last 30 seconds of data, every 10 seconds
val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(30), Seconds(10))
一些通用的window操作如下太闺。所有操作都需要兩個(gè)參數(shù),windowLength和slideInterval嘁圈。
轉(zhuǎn)換 | 含義 |
---|---|
window(windowLength, slideInterval) | 返回新的DStream省骂,基于源DStream的windowed批次進(jìn)行計(jì)算。 |
countByWindow(windowLength, slideInterval) | 返回?cái)?shù)據(jù)流中元素的滑動(dòng)窗口計(jì)數(shù)最住。 |
reduceByWindow(func, windowLength, slideInterval) | 返回單元素?cái)?shù)據(jù)流钞澳,使用func函數(shù),聚合滑動(dòng)時(shí)間間隔的數(shù)據(jù)流元素進(jìn)行創(chuàng)建涨缚。 |
reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]) | 當(dāng)在(K, V)對(duì)DStream上調(diào)用時(shí)轧粟,返回一個(gè)新的(K, V)對(duì)DStream,在滑動(dòng)window的批次上脓魏,使用給定reduce函數(shù)func通過聚合得到每個(gè)key的value兰吟。注意:默認(rèn)情況下,使用Spark的默認(rèn)并行任務(wù)數(shù)量(本地模式是2茂翔,集群模式根據(jù)配置屬性spark.default.parallelism 確定)混蔼。可傳遞可選參數(shù)numTasks 設(shè)置不同的任務(wù)數(shù)量珊燎。 |
reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]) | 上面方法的另一個(gè)版本惭嚣,每個(gè)window的reduce value使用前一個(gè)window的reduce values增量計(jì)算遵湖。通過reduce進(jìn)入滑動(dòng)window的數(shù)據(jù)完成這個(gè)操作,然后逆向reduce離開window的舊數(shù)據(jù)晚吞。舉個(gè)例子延旧,加減keys的數(shù)量作為window slides。但是槽地,只適用于"可逆向reduce函數(shù)"迁沫,也就是說,有對(duì)應(yīng)"逆向reduce"函數(shù)的那些reduce函數(shù)(即invFunc參數(shù))闷盔。和上面的方法類似弯洗,reduce任務(wù)的數(shù)量可以通過可選參數(shù)進(jìn)行配置。注意逢勾,使用這個(gè)操作必須啟用檢查點(diǎn)牡整。 |
countByValueAndWindow(windowLength, slideInterval, [numTasks]) | 當(dāng)在(K, V)對(duì)DStream上調(diào)用時(shí),返回一個(gè)新的(K, Long)對(duì)DStream溺拱,每個(gè)key的value是這個(gè)key在滑動(dòng)window中的頻次逃贝。和reduceByKeyAndWindow 類似,reduce任務(wù)的數(shù)量可以通過可選參數(shù)進(jìn)行配置迫摔。 |
Join操作
最后沐扳,值得強(qiáng)調(diào)的是,可以很容易地在Spark Streaming中執(zhí)行不同類型的join句占。
Stream-stream joins
數(shù)據(jù)流可以很容易地與其它數(shù)據(jù)流進(jìn)行join沪摄。
val stream1: DStream[String, String] = ...
val stream2: DStream[String, String] = ...
val joinedStream = stream1.join(stream2)
這里,在每個(gè)批時(shí)間間隔中纱烘,stream1
生成的RDD都會(huì)和stream2
生成deRDD進(jìn)行join杨拐。也可以使用leftOuterJoin
,rightOuterJoin
和fullOuterJoin
擂啥。另外哄陶,在流的window之間進(jìn)行join非常有用。
val windowedStream1 = stream1.window(Seconds(20))
val windowedStream2 = stream2.window(Minutes(1))
val joinedStream = windowedStream1.join(windowedStream2)
Stream-dataset joins
之前解釋DStream.transform
操作時(shí)進(jìn)行了說明哺壶。這里是另外一個(gè)windowed數(shù)據(jù)流和數(shù)據(jù)集之間進(jìn)行join的示例屋吨。
val dataset: RDD[String, String] = ...
val windowedStream = stream.window(Seconds(20))...
val joinedStream = windowedStream.transform { rdd => rdd.join(dataset) }
實(shí)際上,可以動(dòng)態(tài)修改要進(jìn)行join的數(shù)據(jù)集山宾。提供給transform
的函數(shù)會(huì)在每個(gè)批時(shí)間間隔進(jìn)行評(píng)估至扰,然后dataset
指向的當(dāng)前數(shù)據(jù)集。
完整的DStream轉(zhuǎn)換列表在API文件中塌碌。Scala請(qǐng)參考DStream和PairDStreamFunctions渊胸。Java請(qǐng)參考avaDStream和JavaPairDStream。Python請(qǐng)參考DStream台妆。
DStreams輸出操作
輸出操作允許DStream的數(shù)據(jù)寫入到外部系統(tǒng)中翎猛,如數(shù)據(jù)庫或者文件系統(tǒng)胖翰。由于輸出操作實(shí)際上允許轉(zhuǎn)換數(shù)據(jù)被外部系統(tǒng)消費(fèi),所以輸出操作出發(fā)了所有DStream轉(zhuǎn)換的額實(shí)際執(zhí)行(類似RDDs的action)切厘。目前萨咳,定義了如下輸出操作:
輸出操作 | 含義 |
---|---|
print() | 在運(yùn)行streaming應(yīng)用程序的驅(qū)動(dòng)節(jié)點(diǎn)上輸出DStream中每個(gè)批次的前10個(gè)元素。 對(duì)于開發(fā)和調(diào)試非常有用疫稿。Python API要調(diào)用pprint()培他。 |
saveAsTextFiles(prefix, [suffix]) | 將DStream的內(nèi)容存儲(chǔ)為文件。每個(gè)批次時(shí)間間隔的文件名字基于prefix和suffix生成:"prefix-TIME_IN_MS[.suffix]"遗座。 |
saveAsObjectFiles(prefix, [suffix]) | 將DStream的內(nèi)容存儲(chǔ)為Java對(duì)象序列化的SequenceFiles 舀凛。每個(gè)批次時(shí)間間隔的文件名字基于prefix和suffix生成:"prefix-TIME_IN_MS[.suffix]"。Python API不支持途蒋。 |
saveAsHadoopFiles(prefix, [suffix]) | 將DStream的內(nèi)容存儲(chǔ)為Hadoop文件猛遍。每個(gè)批次時(shí)間間隔的文件名字基于prefix和suffix生成:"prefix-TIME_IN_MS[.suffix]"。Python API不支持号坡。 |
foreachRDD(func) | 最通用的輸出操作符懊烤,在stream生成的每個(gè)RDD上應(yīng)用函數(shù)func。這個(gè)函數(shù)應(yīng)該講每個(gè)RDD的數(shù)據(jù)發(fā)送到外部系統(tǒng)宽堆,如保存RDD到文件或者通過網(wǎng)絡(luò)寫入到數(shù)據(jù)庫腌紧。注意,函數(shù)func在運(yùn)行streaming應(yīng)用程序的驅(qū)動(dòng)進(jìn)程中執(zhí)行畜隶,并且函數(shù)中通常會(huì)有RDD actions出發(fā)streaming RDDs的計(jì)算壁肋。 |
使用foreachRDD的設(shè)計(jì)模式
dstream.foreachRDD
是一個(gè)強(qiáng)大的原語,允許數(shù)據(jù)發(fā)送到外部系統(tǒng)籽慢。但是墩划,理解如何正確高效地使用這個(gè)原語非常重要。避免一些常見錯(cuò)誤的方式如下嗡综。
通常寫數(shù)據(jù)到外部系統(tǒng)要求創(chuàng)建連接對(duì)象(如TCP連接到遠(yuǎn)程服務(wù))并使用連接發(fā)送數(shù)據(jù)到遠(yuǎn)程系統(tǒng)。為達(dá)到這個(gè)目的杜漠,開發(fā)者可能大一地在Spark driver創(chuàng)建連接對(duì)象极景,然后嘗試在Spark worker中使用連接來保存RDDs中的記錄。例如:
dstream.foreachRDD { rdd =>
val connection = createNewConnection() // executed at the driver
rdd.foreach { record =>
connection.send(record) // executed at the worker
}
}
這種做法是錯(cuò)誤驾茴,因?yàn)檫@要求連接對(duì)象要序列化并且發(fā)送到worker中去盼樟。這樣的連接對(duì)象很少跨機(jī)器進(jìn)行傳遞。這個(gè)錯(cuò)誤可能會(huì)顯示為序列化錯(cuò)誤(連接對(duì)象未進(jìn)行序列化)锈至,初始化錯(cuò)誤(連接對(duì)象需要在worker節(jié)點(diǎn)初始化)等等晨缴。正確的方法是在worker節(jié)點(diǎn)創(chuàng)建連接對(duì)象。
但是峡捡,這會(huì)導(dǎo)致另外一個(gè)常見錯(cuò)誤 - 為每條記錄創(chuàng)建一個(gè)新的連接击碗。例如筑悴,
dstream.foreachRDD { rdd =>
rdd.foreach { record =>
val connection = createNewConnection()
connection.send(record)
connection.close()
}
}
創(chuàng)建連接對(duì)象需要時(shí)間和資源開銷。因此稍途,為每條記錄創(chuàng)建和銷毀連接對(duì)象會(huì)引發(fā)不必要的高開銷并會(huì)顯著降低系統(tǒng)的吞吐量阁吝。好的解決防范是使用rdd.foreachPartition
- 創(chuàng)建一個(gè)連接對(duì)象,然后使用它發(fā)送一個(gè)RDD分區(qū)中的所有記錄械拍。
dstream.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
val connection = createNewConnection()
partitionOfRecords.foreach(record => connection.send(record))
connection.close()
}
}
這種方式將連接創(chuàng)建的開銷平攤到多條記錄當(dāng)中了突勇。
最后,可以通過在多個(gè)RDDs/批次之間重用連接對(duì)象進(jìn)一步優(yōu)化坷虑〖撞觯可以維護(hù)一個(gè)連接對(duì)象的靜態(tài)池,重用連接對(duì)象將RDDs的多個(gè)批次發(fā)送到外部系統(tǒng)迄损,進(jìn)一步減少開銷定躏。
dstream.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
// ConnectionPool is a static, lazily initialized pool of connections
val connection = ConnectionPool.getConnection()
partitionOfRecords.foreach(record => connection.send(record))
ConnectionPool.returnConnection(connection) // return to the pool for future reuse
}
}
注意,池子中的連接應(yīng)該是按照需要懶惰創(chuàng)建的并且如果一定時(shí)間不用會(huì)超時(shí)海蔽。這實(shí)現(xiàn)了向外部系統(tǒng)發(fā)送數(shù)據(jù)最有效的方式共屈。
其它需要記住的點(diǎn):
- DStream通過輸出操作懶惰執(zhí)行,就想RDDs通過RDD action懶惰執(zhí)行党窜。DStream輸出操作中的RDD actions會(huì)強(qiáng)制接收數(shù)據(jù)的處理拗引。因此,如果應(yīng)用程序沒有任何輸出操作或者有
dstream.foreachRDD()
這種不帶任何RDD action的輸出操作幌衣,什么也不會(huì)執(zhí)行矾削。系統(tǒng)會(huì)簡(jiǎn)單地接收數(shù)據(jù)并丟棄數(shù)據(jù)。 - 默認(rèn)地豁护,輸出操作只會(huì)執(zhí)行一次哼凯。會(huì)按照在應(yīng)用程序的定義順序執(zhí)行。
DataFrame和SQL操作
可對(duì)流數(shù)據(jù)使用DataFrames and SQL操作楚里。需要使用SparkContext創(chuàng)建一個(gè)SparkSession断部。必須這樣做,才能從驅(qū)動(dòng)程序錯(cuò)誤中恢復(fù)重啟班缎。通過創(chuàng)建SparkSession的懶實(shí)例化單例完成蝴光。下面是一個(gè)示例。修改了之前的示例达址,使用DataFrames和SQL生成單詞計(jì)數(shù)蔑祟。每個(gè)RDD會(huì)轉(zhuǎn)換為DataFrame,作為臨時(shí)表注冊(cè)沉唠,然后使用SQL查詢疆虚。
/** DataFrame operations inside your streaming program */
val words: DStream[String] = ...
words.foreachRDD { rdd =>
// Get the singleton instance of SparkSession
val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
import spark.implicits._
// Convert RDD[String] to DataFrame
val wordsDataFrame = rdd.toDF("word")
// Create a temporary view
wordsDataFrame.createOrReplaceTempView("words")
// Do word count on DataFrame using SQL and print it
val wordCountsDataFrame =
spark.sql("select word, count(*) as total from words group by word")
wordCountsDataFrame.show()
}
完整代碼參見source code。
也可以從另外一個(gè)線程在表上執(zhí)行查詢(異步運(yùn)行StreamingContext)。只要保證設(shè)置StreamingContext記住查詢需要的足量數(shù)據(jù)即可径簿。否則StreamingContext罢屈,無法識(shí)別異步查詢,會(huì)在查詢完成前刪除舊的流數(shù)據(jù)牍帚。例如儡遮,如果想查詢上一個(gè)批次的數(shù)據(jù),但是你的查詢可能需要運(yùn)行5分鐘暗赶,則調(diào)用streamingContext.remember(Minutes(5))
鄙币。
關(guān)于DataFrame的更多信息參見DataFrames and SQL。
MLib操作
可以使用MLlib提供的機(jī)器學(xué)習(xí)算法蹂随。首先十嘿,streaming機(jī)器學(xué)習(xí)算法(如Streaming Linear Regression, Streaming KMeans等)岳锁,可以同時(shí)從流數(shù)據(jù)中學(xué)習(xí)并在流數(shù)據(jù)上應(yīng)用模型绩衷。除了這些,對(duì)于更大一類的機(jī)器學(xué)習(xí)算法激率,可以離線學(xué)習(xí)模型(如使用歷史數(shù)據(jù))咳燕,然后將模型應(yīng)用于在線流數(shù)據(jù)。具體參見MLlib乒躺。