在第一章節(jié)系列結構化流的博客文章中渡八,我們展示了怎樣用簡單的方式用結構化流實現端到端的流式ETL程序,將json日志數據轉換成Parquet格式表传货。該文強調構建從各種復雜格式數據源讀入并對數據進行轉換的管道所面臨的挑戰(zhàn)屎鳍。在本篇博文中,我們將深入的研討該問題问裕,并展示如何用Spark SQL內置函數解決數據轉換中面臨的挑戰(zhàn)逮壁。
確切的說,我們將從以下幾個方面進行討論:
有哪些不同數據格式及怎樣權衡
如何簡單的用Spark SQL對這些數據進行處理
針對不同的場景粮宛,如何選擇正確的數據格式
數據源和格式
數據有無數不同的格式窥淆,電子表格可用用XML,CSV,TSV表示,應用程序指標可以以原始文本或JSON格式存儲巍杈。每種場景都有特定的數據格式與此相適應忧饭。在大數據世界,我們一般會遇到
像Parquet筷畦,ORC词裤,Avro,JSON鳖宾,CSV亚斋,SQL以及NoSQL數據源、文本文件攘滩。我們可以將這些數據格式大致分為一下三類:結構化帅刊、半結構化和非結構化。讓我們嘗試了解每個類別的好處和短處漂问。
結構化數據
結構化數據源對數據定義了一種模式赖瞒。通過這些關于底層數據的額外信息女揭,結構化數據源提供高效的存儲和性能。例如栏饮,列式數據存儲Parquet和ORC吧兔,使得從一個列子集中提取數據更加容易。當數據查詢只需要獲取一少部分列的數據時袍嬉,通過遍歷每行數據的方式需要查詢出過多的數據境蔼。基于行的存儲格式伺通,如Avro通過高效的序列化存儲數據提供了存儲優(yōu)勢箍土。但是,這種優(yōu)勢是以復雜性為代價的罐监。例如吴藻,由于結構不夠靈活,模式轉換將成為挑戰(zhàn)弓柱。
非結構化數據
相比之下沟堡,非結構化數據源是任意格式的文本或不包含標記或元數據的二進制對象(例如以逗號分隔的CSV文件)來組織數據。新聞文章矢空,醫(yī)療記錄航罗,圖像斑點,應用日志經常被當成是非結構化數據屁药。這些數據源分類一般需要根據數據的上下文才能解析粥血。因此,需要清楚知道某個文件是圖片還是新聞者祖,才能正確進行解析。大多數數據源都是非結構化的绢彤,要從這些非結構化的數據中獲取數據價值七问,由于其格式本身的笨重,需要經過大量轉換和特征提取技術去解釋這些數據集茫舶,成本較高械巡。
半結構化數據
半結構化數據源是每行記錄一個結構,但不需要對整體記錄有一個全局的模式定義饶氏。因此讥耗,每行記錄是通過其自身的模式信息對其進行擴充。JSON和XML就是其中最流行的例子疹启。半結構化數據的優(yōu)勢在于通過每行記錄自身的描述信息古程,增強了展示數據信息的靈活性。由于有很多輕量級的解析器用于處理這些記錄喊崖,因此半結構化數據格式在很多應用中普遍被使用挣磨,并且在可讀性上存在優(yōu)勢雇逞。但是,它的主要缺陷也在于會產生額外的解析開銷茁裙,不能專門應用于即席查詢塘砸。
用Spark SQL進行數據交換(讀和寫)
在之前的博文中,我們討論了如何將Cloudtrail日志從JSON轉為為Parquet晤锥,以此將即席查詢時間縮短10倍掉蔬。Spark SQL允許用戶從批處理和流式查詢中獲取這些數據源類的數據。它原生的支持在Parquet矾瘾,ORC女轿,JSON,CSV和文本格式中讀取和寫入數據霜威,并且Spark還提供了大量其他數據源連接器的包乙漓。你可以通過JDBC ?DataSource鏈接SQL數據庫蚊荣。
Apache Spark能夠用以下簡單的方式來實現數據交換:
events = spark.readStream \
.format("json") \? ? ? ? ? # or parquet, kafka, orc...
.option() \? ? ? ? ? ? ? ? # format specific options
.schema(my_schema) \? ? ? ? # required
.load("path/to/data")
output = …? ? ? ? ? ? ? ? ? # perform your transformations
output.writeStream \? ? ? ? ? # write out your data
.format("parquet") \
.start("path/to/write")
無論是批處理還是流式數據,我們知道怎樣讀寫不同的數據源格式,但是不同的數據源支持不同的模式和數據類型讲仰。傳統(tǒng)數據庫僅僅支持原始數據類型,而像JSON允許用戶在列中嵌套對象疼进,有數組類型和鍵值對類型诅需。用戶經常需要擦除這些數據類型,以此達到高效存儲和正確表示他們的數據挽绩。幸運的是膛壹,Spark SQL在處理原始類型和復雜數據類型方面非常容易。讓我們看下如何從負責的數據烈性到原始數據類型唉堪,反之亦然模聋。
轉換復雜數據類型
在使用半結構化格式時,通常需要復雜數據類型唠亚,如結構體链方、Map,數組灶搜。舉個例子祟蚀,你可能需要一個日志接口去請求你的web服務器,這個接口請求包含了字符串鍵值對類型的HTTP頭割卖,另外它還包含json格式的form數據前酿,它們包含嵌套字段或數組。有些 數據源格式可能支持或可能不支持復雜數據類型鹏溯。而有些格式通過特定的數據類型來存儲數據從而提供性能優(yōu)勢罢维,例如,當使用Parquet時丙挽,所有結構列都將獲得與頂級列相同的處理方式言津,因此攻人,如果你在一個嵌套字段上進行過濾,將會獲得與處理頂級列一樣的優(yōu)勢悬槽。但是怀吻,maps被視為兩個數組列,因此你將不會收到有效的過濾語義初婆。
我們來看一些關于Spark SQL如何允許你使用一些數據轉換技術來自由的構造你的數據蓬坡。
從嵌套列中查詢
點符號表示訪問結構體或map中的嵌套列
// input
{
"a": {
"b": 1
}
}
Python: events.select("a.b")
Scala: events.select("a.b")
SQL: select a.b from events
// output
{
"b": 1
}
整平結構體
*表示結構體中所有的子字段
// input
{
"a": {
"b": 1,
"c": 2
}
}
Python:? events.select("a.*")
Scala:? events.select("a.*")
SQL:? select a.* from events
// output
{
"b": 1,
"c": 2
}
嵌套列
結構函數或SQL能被用于創(chuàng)建新的結構
// input
{
"a": 1,
"b": 2,
"c": 3
}
Python: events.select(struct(col("a").alias("y")).alias("x"))
Scala: events.select(struct('a as 'y) as 'x)
SQL: select named_struct("y", a) as x from events
// output
{
"x": {
"y": 1
}
}
嵌套所有列
*能被用于包含嵌套結構中的所有列
// input
{
"a": 1,
"b": 2
}
Python: events.select(struct("*").alias("x"))
Scala: events.select(struct("*") as 'x)
SQL: select struct(*) as x from events
// output
{
"x": {
"a": 1,
"b": 2
}
}
從數組或map中選擇一個
getItem( )或[ ] 可以被用于從數據或map中查詢一個元素
// input
{
"a": [1, 2]
}
Python: events.select(col("a").getItem(0).alias("x"))
Scala: events.select('a.getItem(0) as 'x)
SQL: select a[0] as x from events
// output
{ "x": 1 }
// input
{
"a": {
"b": 1
}
}
Python: events.select(col("a").getItem("b").alias("x"))
Scala: events.select('a.getItem("b") as 'x)
SQL: select a['b'] as x from events
// output
{ "x": 1 }
將數組或map中的每一個元素創(chuàng)建一行
explode( )用于創(chuàng)建為數組或map中的元素創(chuàng)建一個新的行,這和HiveQL中的LATERAL VIEW EXPLODE和相似
// input
{
"a": [1, 2]
}
Python: events.select(explode("a").alias("x"))
Scala: events.select(explode('a) as 'x)
SQL: select explode(a) as x from events
// output
[{ "x": 1 }, { "x": 2 }]
// input
{
"a": {
"b": 1,
"c": 2
}
}
Python: events.select(explode("a").alias("x", "y"))
Scala: events.select(explode('a) as Seq("x", "y"))
SQL: select explode(a) as (x, y) from events
// output
[{ "x": "b", "y": 1 }, { "x": "c", "y": 2 }]
將多行數據收集成一個數組
collect_list()和collect_set()能被用于將多個項聚合到一個數組
// input
[{ "x": 1 }, { "x": 2 }]
Python: events.select(collect_list("x").alias("x"))
Scala: events.select(collect_list('x) as 'x)
SQL: select collect_list(x) as x from events
// output
{ "x": [1, 2] }
// input
[{ "x": 1, "y": "a" }, { "x": 2, "y": "b" }]
Python: events.groupBy("y").agg(collect_list("x").alias("x"))
Scala: events.groupBy("y").agg(collect_list('x) as 'x)
SQL: select y, collect_list(x) as x from events group by y
// output
[{ "y": "a", "x": [1]}, { "y": "b", "x": [2]}]
從數組的每個項查詢一個字段
當你用點符號作用于數組上時磅叛,將會返回一個新的數組屑咳,該數組的字段是從源數據組中選擇的每一個元素
// input
{
"a": [
{"b": 1},
{"b": 2}
]
}
Python: events.select("a.b")
Scala: events.select("a.b")
SQL: select a.b from events
// output
{
"b": [1, 2]
}
強大的to_json和from_json
如果你真的想保留列的復雜結構,但是你需要將其編碼為字符串嗎?Spark SQL提供了像to_json這樣的函數用于將結構數據編碼成字符串弊琴,from_json()用于將結構體恢復成復雜類型兆龙。從流式數據源像kafka中讀取或寫入數據時,用JSON字符串存儲列非常有用敲董,每一個kafka的鍵值對記錄將被增加一些元數據紫皇,比如將時間戳、位置信息注入到kafka腋寨。如果value字段數據的是JSON格式的聪铺,你就能用from_json去抽提取你的數據,豐富它萄窜,清理它铃剔,然后將其再次推向下游kafka或寫入文件。
用JSON編碼結構體
to_json()能被用于將結構體轉換成JSON字符串查刻。當你在寫入數據至kafka,需要重新編碼多個列的數據成一個列時键兜,這個方法非常有用。此方法目前在SQL中還不能用穗泵。
//?input{
"a":?{
"b":?1
}}
Python:?events.select(to_json("a").alias("c"))
Scala:?events.select(to_json('a)?as?'c)
//?output{
"c":?"{\"b\":1}"}
解碼json列為一個結構體
from_json()方法能被用于轉換JSON字符串列數據為結構體普气,那么你可以按照上述方式展平結構體,使其具有單獨的列火欧。該方法目前也不能用于SQL.
//?input{
"a":?"{\"b\":1}"}
Python:
schema?=?StructType().add("b",?IntegerType())
events.select(from_json("a",?schema).alias("c"))Scala:
val?schema?=?new?StructType().add("b",?IntegerType)
events.select(from_json('a,?schema)?as?'c)
//?output{
"c":?{
"b":?1
}}
有些時候你想保留JSON 字符串的一部分仍舊為JSON棋电,以避免模式中的復雜性過高茎截。
//?input{
"a":?"{\"b\":{\"x\":1,\"y\":{\"z\":2}}}"}
Python:
schema?=?StructType().add("b",?StructType().add("x",?IntegerType())
.add("y",?StringType()))
events.select(from_json("a",?schema).alias("c"))Scala:
val?schema?=?new?StructType().add("b",?new?StructType().add("x",?IntegerType)
.add("y",?StringType))
events.select(from_json('a,?schema)?as?'c)
//?output{
"c":?{
"b":?{
"x":?1,
"y":?"{\"z\":2}"
}
}}
從包含JSON的列中解析字段集合
json_tuple()能從包含JSON數據的字符串類型列中抽取可用的字段
//?input{
"a":?"{\"b\":1}"}
Python:?events.select(json_tuple("a",?"b").alias("c"))Scala:? events.select(json_tuple('a,?"b")?as?'c)
SQL:? ? select?json_tuple(a,?"b")?as?c?from?events
//?output{?"c":?1?}
有時候字符串列并不完全符合JSON的描述格式苇侵,但是仍舊有良好的結構。例如企锌,Log4j生成的日志消息格式榆浓。Spark SQL能輕松的將這些字符串進行結構化。
解析格式良好的字符串列
regexp_extract() 使用正則表達式來解析字符串數據
//?input[{?"a":?"x:?1"?},?{?"a":?"y:?2"?}]
Python:?events.select(regexp_extract("a",?"([a-z]):",?1).alias("c"))Scala:? events.select(regexp_extract('a,?"([a-z]):",?1)?as?'c)
SQL:? ? select?regexp_extract(a,?"([a-z]):",?1)?as?c?from?events
//?output[{?"c":?"x"?},?{?"c":?"y"?}]
以上有很多種轉換撕攒,現在我們來看一下現實生活中的一些用例陡鹃,把所有這些數據格式和數據處理能力用到恰到好處烘浦。
利用好所有的這些強大的轉換能力
在Databricks,我們收集服務的日志信息萍鲸,并在客戶受到影響之前使用它們執(zhí)行實時監(jiān)控以檢測問題闷叉。日志文件是非結構化的文件,但是由于是以良好的Log4j格式定義的脊阴,因此文件是可解析的握侧。我們運行一個日志收集服務,將每行日志和額外的元數據信息以JSON的格式發(fā)送至kinesis嘿期。JSON格式的記錄批量上傳至S3作為文件品擎。通過直接查詢這些JSON日志去發(fā)現問題是非常乏味的:因為對于回答任務和查詢,這些文件會包含重復項备徐,即使它涉及單個列萄传,整個JSON記錄也可能需要反序列化。
為了解決這個問題蜜猾,我們執(zhí)行一個管道去讀取這些JSON記錄秀菱,并對元數據執(zhí)行數據刪除。這樣我們就只剩下那些JSON格式或非結構化文本的原始日志信息瓣铣。如果我們需要處理JSON答朋,可以使用from_json()和上面描述的一些其它轉換去格式化我們的數據。如果是文本棠笑,我們可以用regexp_extract()去解析Log4j格式成一個更加結構化的形式梦碗。一旦完成了我們所有的轉換和重組,我們將記錄按日期分區(qū)保存在Parquet中蓖救。這將能從日志中發(fā)現的時間提高10至100倍洪规。
不再需要花很大的代價去反序列化JSON記錄
不在需要對原始日志信息進行復雜的字符串對比
僅僅需要在查詢中抽取兩個列:時間和日志級別
下面是很多客戶提供的通用用例場景:
“我想用我的數據運行機器學習管道。 我的數據已經被預處理循捺,我將在整個管道中使用我的所有功能”
當你要訪問數據的所有行斩例,Avro是一個不錯的選擇:
“我有一個IoT用例,我的傳感器發(fā)送給我的事件从橘。 對于每個事件念赶,元數據的重要性是不同的”
有些場景你的模式需要更好的靈活性,也許你可以考慮使用JSON存儲你的數據恰力。
“我想在報紙文章或情感分析上對產品評論進行語音識別算法”
如果你的數據沒有固定的模式叉谜,也不是固定的模式或結構,那么將其存儲為純文本可能更易于使用踩萎。有可能你又一個管道停局,它在分結構化數據上進行特征提取,并將其存儲為Avro,為你的機器學習流程做準備。
結論
在這片博文中董栽,我們探討了Spark SQL如何使用許多來源和格式的數據码倦,并輕松的執(zhí)行這些數據格式之間的轉換和交換。我們分享了Databrics是如何策劃數據锭碳,并考慮了其它生產用例袁稽,因為你可能想做不同事情。
Spark SQL為您提供必要的工具擒抛,以便以任何形式訪問您的數據运提,無論其格式如何,都可以為下游應用程序做準備闻葵,并且在流式數據上具有低延時民泵,歷史數據上具有高吞吐量。