Spark The Definitive Guide(Spark權(quán)威指南) 中文版错洁。本書詳細介紹了Spark2.x版本的各個模塊秉宿,目前市面上最好的Spark2.x學(xué)習(xí)書籍!M筒辍描睦!
關(guān)注:登峰大數(shù)據(jù),閱讀中文Spark權(quán)威指南(完整版)导而,系統(tǒng)學(xué)習(xí)Spark大數(shù)據(jù)框架忱叭!
如果您覺得作者翻譯的內(nèi)容有幫助,請分享給更多人今艺。您的分享韵丑,是作者翻譯的動力
本書前幾章已經(jīng)從用戶的角度介紹了結(jié)構(gòu)化流。這自然是應(yīng)用程序的核心虚缎。本章將介紹在開發(fā)應(yīng)用程序之后撵彻,在生產(chǎn)環(huán)境中穩(wěn)定運行結(jié)構(gòu)化流所需的一些操作工具。在Apache Spark 2.2.0中,結(jié)構(gòu)化流被標(biāo)記為可生產(chǎn)的千康,這意味著該版本具有生產(chǎn)使用所需的所有特性和穩(wěn)定的API享幽。許多組織已經(jīng)在生產(chǎn)中使用該系統(tǒng),因為坦率地說拾弃,它與運行其他生產(chǎn)Spark應(yīng)用程序沒有太大的不同值桩。的確,通過諸如事務(wù)source/sink和精確的一次處理等特性豪椿,結(jié)構(gòu)化流設(shè)計人員試圖使其盡可能易于操作奔坟。本章將帶您了解一些特定于結(jié)構(gòu)化流的關(guān)鍵操作任務(wù)。這將補充我們在第2部分中看到和了解到的有關(guān)Spark操作的所有內(nèi)容搭盾。
23.1.容錯和檢查點
流應(yīng)用程序最重要的操作關(guān)注點是故障恢復(fù)咳秉。錯誤是不可避免的:例如丟失集群中的一臺機器,模式將在沒有適當(dāng)遷移的情況下意外更改鸯隅,甚至可能故意重啟集群或應(yīng)用程序澜建。在這些情況下,結(jié)構(gòu)化流允許您通過重新啟動應(yīng)用程序來恢復(fù)應(yīng)用程序蝌以。為此炕舵,必須將應(yīng)用程序配置為使用檢查點和WAL預(yù)寫日志,這兩個操作都由引擎自動處理跟畅。具體地說咽筋,您必須配置一個查詢來寫入可靠文件系統(tǒng)(例如,HDFS徊件、S3或任何兼容的文件系統(tǒng))上的檢查點位置奸攻。然后,結(jié)構(gòu)化流將定期將所有相關(guān)的進度信息(例如虱痕,給定觸發(fā)器中處理的偏移量范圍)以及當(dāng)前中間狀態(tài)值保存到檢查點位置睹耐。在失敗場景中,您只需重新啟動應(yīng)用程序部翘,確保指向相同的檢查點位置疏橄,它就會自動恢復(fù)其狀態(tài),并從停止的地方開始處理數(shù)據(jù)略就。您不必代表應(yīng)用程序手動管理此狀態(tài)----結(jié)構(gòu)化流媒體為您做這些功能。
要使用檢查點晃酒,請在通過writeStream上的checkpointLocation選項啟動應(yīng)用程序之前指定檢查點位置表牢。你可以這樣做:
// in Scalaval static = spark.read.json("/data/activity-data")val streaming = spark? .readStream? .schema(static.schema)? .option("maxFilesPerTrigger", 10)? .json("/data/activity-data")? .groupBy("gt")? .count()val query = streaming? .writeStream? .outputMode("complete")? .option("checkpointLocation", "/some/location/")? .queryName("test_stream")? .format("memory")? .start()# in Pythonstatic = spark.read.json("/data/activity-data")streaming = spark\? .readStream\? .schema(static.schema)\? .option("maxFilesPerTrigger", 10)\.json("/data/activity-data")\? .groupBy("gt")\? .count()query = streaming\? .writeStream\? .outputMode("complete")\? .option("checkpointLocation", "/some/python/location/")\? .queryName("test_python_stream")\? .format("memory")\??.start()
如果丟失檢查點目錄或其中的信息,應(yīng)用程序?qū)o法從失敗中恢復(fù)贝次,您將不得不從頭開始重新啟動流宇整。
23.2.更新程序
為了在生產(chǎn)環(huán)境中運行應(yīng)用程序华烟,啟用檢查點可能是最重要的儒旬。這是因為檢查點將存儲到目前為止您的流所處理的所有信息现喳,以及它可能存儲的中間狀態(tài)。然而憔涉,檢查點確實帶來了一個小陷阱——當(dāng)您更新流應(yīng)用程序時,您將不得不對舊檢查點數(shù)據(jù)進行推理。當(dāng)您更新您的應(yīng)用程序時掏父,您必須確保您的更新不是破壞性的更改。當(dāng)我們查看這兩種類型的更新時秆剪,我們將詳細討論這些內(nèi)容:對應(yīng)用程序代碼的更新或運行新的Spark版本赊淑。
23.2.1.對應(yīng)用程序代碼的更新
結(jié)構(gòu)化流的設(shè)計允許在應(yīng)用程序重啟之間對應(yīng)用程序代碼進行某些類型的更改。最重要的是仅讽,允許您更改用戶定義函數(shù)(udf)陶缺,只要它們具有相同的類型簽名。這個特性對于bug修復(fù)非常有用洁灵。例如饱岸,假設(shè)應(yīng)用程序開始接收一種新類型的數(shù)據(jù),并且當(dāng)前邏輯中的數(shù)據(jù)解析函數(shù)之一崩潰徽千。使用結(jié)構(gòu)化流苫费,您可以使用該函數(shù)的新版本重新編譯應(yīng)用程序,并在流中它之前崩潰的同一位置重新編譯罐栈。
雖然添加新列或更改UDF之類的小調(diào)整不會破壞更改黍衙,也不需要新的檢查點目錄,但是較大的更改確實需要一個全新的檢查點目錄荠诬。例如琅翻,如果更新流應(yīng)用程序以添加新的聚合鍵或從根本上更改查詢本身,Spark就不能從舊檢查點目錄為新查詢構(gòu)造所需的狀態(tài)柑贞。在這些情況下方椎,結(jié)構(gòu)化流將拋出一個異常,說它不能從檢查點目錄開始钧嘶,您必須從頭開始使用一個新的(空的)目錄作為檢查點位置棠众。
23.2.2.運行新的Spark版本
結(jié)構(gòu)化的流處理應(yīng)用程序應(yīng)該能夠從一個舊的檢查點目錄重新啟動到Spark的補丁版本更新(例如,從Spark 2.2.0到2.2.1再到2.2.2)有决。檢查點格式被設(shè)計為向前兼容的闸拿,所以它可能被破壞的唯一方法是由于關(guān)鍵的錯誤修復(fù)。如果Spark發(fā)行版不能從舊檢查點恢復(fù)书幕,那么它的發(fā)行說明中將清楚地記錄這一點新荤。結(jié)構(gòu)化流媒體開發(fā)人員還希望在較小的版本更新(如Spark 2.2)之間保持格式的兼容性。但是您應(yīng)該檢查發(fā)布說明台汇,看看是否支持每次升級苛骨。在這兩種情況下篱瞎,如果不能從檢查點啟動,都需要使用新的檢查點目錄重新啟動應(yīng)用程序痒芝。
23.2.3.調(diào)整應(yīng)用程序的規(guī)模大小
通常俐筋,集群的大小應(yīng)該能夠輕松地處理高于數(shù)據(jù)速率的突發(fā)事件。下面討論應(yīng)用程序和集群中應(yīng)該監(jiān)控的關(guān)鍵指標(biāo)严衬。一般來說澄者,如果您看到您的輸入速率遠遠高于您的處理速率(稍后將進行詳細說明),那么就該擴展您的集群或應(yīng)用程序了瞳步。根據(jù)您的資源管理器和部署闷哆,您可能只是能夠動態(tài)地將executor添加到應(yīng)用程序中。當(dāng)需要時单起,您可以用同樣的方法縮小應(yīng)用程序的規(guī)谋д——刪除執(zhí)行器(可能通過您的云提供商)或使用更低的資源計數(shù)重新啟動應(yīng)用程序。這些更改可能會導(dǎo)致一些處理延遲(當(dāng)刪除執(zhí)行器時嘀倒,將重新計算數(shù)據(jù)或重新分配分區(qū))屈留。最后,是否值得創(chuàng)建一個具有更復(fù)雜的資源管理功能的系統(tǒng)是一個業(yè)務(wù)決策测蘑。
雖然對集群或應(yīng)用程序進行底層基礎(chǔ)設(shè)施更改有時是必要的灌危,但有時更改可能只需要重新啟動應(yīng)用程序或使用新配置的流。例如在流處理程序運行過程中碳胳,更改 spark.sql.shuffle.partitions參數(shù)是不會生效的勇蝙。這需要重新啟動實際的流處理程序,而不一定是整個應(yīng)用程序挨约。較重的更改(如更改任意Spark應(yīng)用程序配置)可能需要重新啟動應(yīng)用程序味混。
23.3.度量和監(jiān)控
流應(yīng)用程序中的度量和監(jiān)視與使用第18章中描述的工具的一般Spark應(yīng)用程序基本相同。不過诫惭,結(jié)構(gòu)化流確實添加了一些更具體的內(nèi)容翁锡,以幫助您更好地理解應(yīng)用程序的狀態(tài)。您可以使用兩個關(guān)鍵api來查詢流查詢的狀態(tài)并查看其最近的執(zhí)行進度夕土。使用這兩個api馆衔,您可以了解您的流是否按預(yù)期運行。
23.3.1.Query Status 查詢狀態(tài)
查詢狀態(tài)是最基本的監(jiān)控API怨绣,因此它是一個很好的起點角溃。它的目的是回答這個問題:“我的流現(xiàn)在正在執(zhí)行什么處理?” 此信息在startStream返回的查詢對象的status字段中報告。例如篮撑,您可能有一個簡單的計數(shù)流开镣,它提供了由以下查詢定義的物聯(lián)網(wǎng)設(shè)備的計數(shù)(這里我們只是使用了與前一章相同的查詢,沒有初始化代碼):
query.status
要獲得給定查詢的狀態(tài)咽扇,只需運行命令query.status將返回流的當(dāng)前狀態(tài)邪财。這為我們提供了關(guān)于流中在那個時間點上發(fā)生的事情的詳細信息。下面是查詢此狀態(tài)時將返回的示例:
{? "message" : "Getting offsets from ...",? "isDataAvailable" : true,? "isTriggerActive" : true}
上面的代碼片段描述了從結(jié)構(gòu)化流數(shù)據(jù)源獲取偏移量(因此描述獲取偏移量的消息)质欲。有多種消息描述流的狀態(tài)树埠。
注意我們以在Spark shell中調(diào)用的方式在這里內(nèi)聯(lián)顯示了status命令。但是嘶伟,對于獨立應(yīng)用程序怎憋,您可能沒有附加shell來在流程中運行任意代碼。在這種情況下九昧,您可以通過實現(xiàn)監(jiān)控服務(wù)器來公開它的狀態(tài)绊袋,例如一個小型HTTP服務(wù)器,它監(jiān)聽端口并返回查詢铸鹰。獲取請求時的狀態(tài)癌别。或者蹋笼,您可以使用稍后描述的更豐富的StreamingQueryListener API來偵聽更多事件展姐。
23.3.2.Recent Progress當(dāng)前進展
雖然可以查看查詢的當(dāng)前狀態(tài),但是查看查詢過程的能力同樣重要剖毯。progress API允許我們回答諸如“我處理元組的速度是多少?”或者“元組從源到達的速度有多快?” 通過運行query.recentProgress命令圾笨,流查詢過程還包括關(guān)于流內(nèi)部的輸入源和輸出接收器的信息。
query.recentProgress
這是我們運行之前的代碼后Scala版本的結(jié)果;Python版本將是類似的:
Array({? "id" : "d9b5eac5-2b27-4655-8dd3-4be626b1b59b",? "runId" : "f8da8bc7-5d0a-4554-880d-d21fe43b983d",? "name" : "test_stream",? "timestamp" : "2017-08-06T21:11:21.141Z",? "numInputRows" : 780119,? "processedRowsPerSecond" : 19779.89350912779,? "durationMs" : {? ? "addBatch" : 38179,? ? "getBatch" : 235,? ? "getOffset" : 518,? ? "queryPlanning" : 138,? ? "triggerExecution" : 39440,? ? "walCommit" : 312? },? "stateOperators" : [ {? ? "numRowsTotal" : 7,? ? "numRowsUpdated" : 7? } ],? "sources" : [ {? ? "description" : "FileStreamSource[/some/stream/source/]",? ? "startOffset" : null,? ? "endOffset" : {? ? ? "logOffset" : 0? ? },? ? "numInputRows" : 780119,? ? "processedRowsPerSecond" : 19779.89350912779? } ],? "sink" : {? ? "description" : "MemorySink"? }})
正如您從剛才顯示的輸出中所看到的逊谋,這包括關(guān)于流狀態(tài)的許多細節(jié)擂达。需要注意的是,這是一個實時快照(根據(jù)查詢進度的時間)胶滋。為了一致地獲得關(guān)于流狀態(tài)的輸出板鬓,您需要反復(fù)查詢這個API以獲得更新后的狀態(tài)。前面輸出中的大多數(shù)字段應(yīng)該是不言自明的镀钓。但是穗熬,讓我們詳細回顧一些更重要的字段。
Input rate and processing rate輸入速率和處理速率
輸入速率指定有多少數(shù)據(jù)從輸入源流向結(jié)構(gòu)化流丁溅。處理速率是應(yīng)用程序分析數(shù)據(jù)的速度唤蔗。在理想情況下,輸入和處理速率應(yīng)該同時變化窟赏。另一種情況可能是輸入速率遠遠大于處理速率妓柜。當(dāng)這種情況發(fā)生時,流就會落后涯穷,您需要將集群擴展到更高的級別來處理更大的負載棍掐。
Batch duration批次間隔
幾乎所有的流系統(tǒng)都利用批處理以任何合理的吞吐量進行操作(有些系統(tǒng)可以選擇高延遲來換取較低的吞吐量)。結(jié)構(gòu)化流實現(xiàn)了這兩種功能拷况。當(dāng)它對數(shù)據(jù)進行操作時作煌,您可能會看到批處理持續(xù)時間隨著結(jié)構(gòu)化流處理事件數(shù)量的變化而振蕩掘殴。當(dāng)然,當(dāng)連續(xù)處理引擎成為執(zhí)行選項時粟誓,這個度量將幾乎沒有相關(guān)性奏寨。
提示通常,將批處理持續(xù)時間鹰服、輸入和處理速率的變化可視化是最佳實踐病瞳。它比簡單地報告隨時間的變化更有幫助。
23.3.3.Spark UI
Spark web UI(在第18章中詳細介紹)還顯示了結(jié)構(gòu)化流應(yīng)用程序的任務(wù)悲酷、作業(yè)和數(shù)據(jù)處理指標(biāo)套菜。在Spark UI上,每個流應(yīng)用程序都將顯示為一系列短作業(yè)设易,每個觸發(fā)器對應(yīng)一個短作業(yè)逗柴。但是,您可以使用相同的UI查看來自應(yīng)用程序的度量亡嫌、查詢計劃嚎于、任務(wù)持續(xù)時間和日志。與DStream API不同的一點是挟冠,流選項卡不用于結(jié)構(gòu)化流于购。
23.4.預(yù)警
理解和查看結(jié)構(gòu)化流查詢的指標(biāo)是重要的第一步。然而知染,這需要不斷地監(jiān)控儀表板或度量肋僧,以發(fā)現(xiàn)潛在的問題。您將需要健壯的自動警報控淡,以便在不手動監(jiān)控作業(yè)的情況下嫌吠,在作業(yè)失敗或無法跟上輸入數(shù)據(jù)速率時通知您。有幾種方法可以將現(xiàn)有的警報工具集成到Spark中掺炭,通常是基于我們前面介紹的recent progress API辫诅。例如,您可以直接將度量數(shù)據(jù)提供給監(jiān)控系統(tǒng)涧狮,比如開源Coda Hale度量庫或Prometheus炕矮,或者您可以簡單地對它們進行日志記錄,并使用Splunk這樣的日志聚合系統(tǒng)者冤。除了監(jiān)控和警告查詢之外肤视,還需要監(jiān)控和警告集群和整個應(yīng)用程序的狀態(tài)(如果同時運行多個查詢)。
23.5.使用流監(jiān)聽器進行高級監(jiān)控
我們已經(jīng)討論了結(jié)構(gòu)化流中的一些高級監(jiān)控工具涉枫。使用一些粘合邏輯邢滑,您可以使用status和queryProgress api將監(jiān)控事件輸出到您選擇的監(jiān)控平臺(例如,日志聚合系統(tǒng)或Prometheus儀表板) 除了這些方法之外愿汰,還有一種更底層但更強大的方法來觀察應(yīng)用程序的執(zhí)行:StreamingQueryListener類困后。
StreamingQueryListener類允許您從流查詢接收異步更新乐纸,以便自動將此信息輸出到其他系統(tǒng),并實現(xiàn)健壯的監(jiān)控和警報機制操灿。首先開發(fā)自己的對象來擴展StreamingQueryListener锯仪,然后將其附加到正在運行的SparkSession。一旦您使用sparkssession .streams. addlistener()附加自定義偵聽器趾盐,當(dāng)查詢啟動或停止時,或者在活動查詢上取得進展時小腊,您的類將收到通知救鲤。下面是結(jié)構(gòu)化流文檔中偵聽器的一個簡單示例:
val spark: SparkSession = ...spark.streams.addListener(new StreamingQueryListener() {? ? override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {? ? ? ? println("Query started: " + queryStarted.id)? ? }? ? override def onQueryTerminated(? ? ? queryTerminated: QueryTerminatedEvent): Unit = {? ? ? ? println("Query terminated: " + queryTerminated.id)? ? }? ? override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {? ? ? ? println("Query made progress: " + queryProgress.progress)? ? }})
流監(jiān)聽器允許您使用自定義代碼處理每個進度更新或狀態(tài)更改,并將其傳遞給外部系統(tǒng)秩冈。例如本缠,StreamingQueryListener的以下代碼將把所有查詢進度信息轉(zhuǎn)發(fā)給Kafka。從Kafka讀取數(shù)據(jù)后入问,您必須解析這個JSON字符串丹锹,以便訪問實際的指標(biāo):
classKafkaMetrics(servers: String) extends StreamingQueryListener {valkafkaProperties = new Properties()? kafkaProperties.put("bootstrap.servers",? ? servers)? kafkaProperties.put("key.serializer","kafkashaded.org.apache.kafka.common.serialization.StringSerializer")? kafkaProperties.put("value.serializer","kafkashaded.org.apache.kafka.common.serialization.StringSerializer")valproducer = new KafkaProducer[String, String](kafkaProperties)importorg.apache.spark.sql.streaming.StreamingQueryListenerimportorg.apache.kafka.clients.producer.KafkaProduceroverridedef onQueryProgress(event:StreamingQueryListener.QueryProgressEvent):Unit= {producer.send(new ProducerRecord("streaming-metrics",? ? ? event.progress.json))? }overridedef onQueryStarted(event:StreamingQueryListener.QueryStartedEvent):Unit= {}overridedef onQueryTerminated(event:StreamingQueryListener.QueryTerminatedEvent):Unit= {}}
使用StreamingQueryListener接口,您甚至可以通過在同一個(或另一個)集群上運行結(jié)構(gòu)化流應(yīng)用程序來監(jiān)控一個集群上的結(jié)構(gòu)化流應(yīng)用程序芬失。您還可以用這種方式管理多個流楣黍。
23.6.結(jié)束語
在本章中,我們討論了在生產(chǎn)環(huán)境中運行結(jié)構(gòu)化流所需的主要工具:容錯檢查點和各種監(jiān)控api棱烂,這些api允許您觀察應(yīng)用程序如何運行租漂。幸運的是,如果您已經(jīng)在生產(chǎn)環(huán)境中運行Spark颊糜,那么許多概念和工具都是類似的哩治,因此您應(yīng)該能夠重用大量現(xiàn)有知識。請務(wù)必檢查第4部分衬鱼,以查看監(jiān)控Spark應(yīng)用程序的其他一些有用工具业筏。