Spark2.1中用結構化流處理復雜的數據格式(譯)

在第一章節(jié)系列結構化流的博客文章中渡八,我們展示了怎樣用簡單的方式用結構化流實現端到端的流式ETL程序,將json日志數據轉換成Parquet格式表传货。該文強調構建從各種復雜格式數據源讀入并對數據進行轉換的管道所面臨的挑戰(zhàn)屎鳍。在本篇博文中,我們將深入的研討該問題问裕,并展示如何用Spark SQL內置函數解決數據轉換中面臨的挑戰(zhàn)逮壁。

確切的說,我們將從以下幾個方面進行討論:

有哪些不同數據格式及怎樣權衡

如何簡單的用Spark SQL對這些數據進行處理

針對不同的場景粮宛,如何選擇正確的數據格式

數據源和格式

數據有無數不同的格式窥淆,電子表格可用用XML,CSV,TSV表示,應用程序指標可以以原始文本或JSON格式存儲巍杈。每種場景都有特定的數據格式與此相適應忧饭。在大數據世界,我們一般會遇到

像Parquet筷畦,ORC词裤,Avro,JSON鳖宾,CSV亚斋,SQL以及NoSQL數據源、文本文件攘滩。我們可以將這些數據格式大致分為一下三類:結構化帅刊、半結構化和非結構化。讓我們嘗試了解每個類別的好處和短處漂问。

結構化數據

結構化數據源對數據定義了一種模式赖瞒。通過這些關于底層數據的額外信息女揭,結構化數據源提供高效的存儲和性能。例如栏饮,列式數據存儲Parquet和ORC吧兔,使得從一個列子集中提取數據更加容易。當數據查詢只需要獲取一少部分列的數據時袍嬉,通過遍歷每行數據的方式需要查詢出過多的數據境蔼。基于行的存儲格式伺通,如Avro通過高效的序列化存儲數據提供了存儲優(yōu)勢箍土。但是,這種優(yōu)勢是以復雜性為代價的罐监。例如吴藻,由于結構不夠靈活,模式轉換將成為挑戰(zhàn)弓柱。

非結構化數據

相比之下沟堡,非結構化數據源是任意格式的文本或不包含標記或元數據的二進制對象(例如以逗號分隔的CSV文件)來組織數據。新聞文章矢空,醫(yī)療記錄航罗,圖像斑點,應用日志經常被當成是非結構化數據屁药。這些數據源分類一般需要根據數據的上下文才能解析粥血。因此,需要清楚知道某個文件是圖片還是新聞者祖,才能正確進行解析。大多數數據源都是非結構化的绢彤,要從這些非結構化的數據中獲取數據價值七问,由于其格式本身的笨重,需要經過大量轉換和特征提取技術去解釋這些數據集茫舶,成本較高械巡。

半結構化數據

半結構化數據源是每行記錄一個結構,但不需要對整體記錄有一個全局的模式定義饶氏。因此讥耗,每行記錄是通過其自身的模式信息對其進行擴充。JSON和XML就是其中最流行的例子疹启。半結構化數據的優(yōu)勢在于通過每行記錄自身的描述信息古程,增強了展示數據信息的靈活性。由于有很多輕量級的解析器用于處理這些記錄喊崖,因此半結構化數據格式在很多應用中普遍被使用挣磨,并且在可讀性上存在優(yōu)勢雇逞。但是,它的主要缺陷也在于會產生額外的解析開銷茁裙,不能專門應用于即席查詢塘砸。

用Spark SQL進行數據交換(讀和寫)

在之前的博文中,我們討論了如何將Cloudtrail日志從JSON轉為為Parquet晤锥,以此將即席查詢時間縮短10倍掉蔬。Spark SQL允許用戶從批處理和流式查詢中獲取這些數據源類的數據。它原生的支持在Parquet矾瘾,ORC女轿,JSON,CSV和文本格式中讀取和寫入數據霜威,并且Spark還提供了大量其他數據源連接器的包乙漓。你可以通過JDBC ?DataSource鏈接SQL數據庫蚊荣。

Apache Spark能夠用以下簡單的方式來實現數據交換:

events = spark.readStream \

.format("json") \? ? ? ? ? # or parquet, kafka, orc...

.option() \? ? ? ? ? ? ? ? # format specific options

.schema(my_schema) \? ? ? ? # required

.load("path/to/data")

output = …? ? ? ? ? ? ? ? ? # perform your transformations

output.writeStream \? ? ? ? ? # write out your data

.format("parquet") \

.start("path/to/write")

無論是批處理還是流式數據,我們知道怎樣讀寫不同的數據源格式,但是不同的數據源支持不同的模式和數據類型讲仰。傳統(tǒng)數據庫僅僅支持原始數據類型,而像JSON允許用戶在列中嵌套對象疼进,有數組類型和鍵值對類型诅需。用戶經常需要擦除這些數據類型,以此達到高效存儲和正確表示他們的數據挽绩。幸運的是膛壹,Spark SQL在處理原始類型和復雜數據類型方面非常容易。讓我們看下如何從負責的數據烈性到原始數據類型唉堪,反之亦然模聋。

轉換復雜數據類型


在使用半結構化格式時,通常需要復雜數據類型唠亚,如結構體链方、Map,數組灶搜。舉個例子祟蚀,你可能需要一個日志接口去請求你的web服務器,這個接口請求包含了字符串鍵值對類型的HTTP頭割卖,另外它還包含json格式的form數據前酿,它們包含嵌套字段或數組。有些 數據源格式可能支持或可能不支持復雜數據類型鹏溯。而有些格式通過特定的數據類型來存儲數據從而提供性能優(yōu)勢罢维,例如,當使用Parquet時丙挽,所有結構列都將獲得與頂級列相同的處理方式言津,因此攻人,如果你在一個嵌套字段上進行過濾,將會獲得與處理頂級列一樣的優(yōu)勢悬槽。但是怀吻,maps被視為兩個數組列,因此你將不會收到有效的過濾語義初婆。

我們來看一些關于Spark SQL如何允許你使用一些數據轉換技術來自由的構造你的數據蓬坡。

從嵌套列中查詢

點符號表示訪問結構體或map中的嵌套列

// input

{

"a": {

"b": 1

}

}

Python: events.select("a.b")

Scala: events.select("a.b")

SQL: select a.b from events

// output

{

"b": 1

}

整平結構體

*表示結構體中所有的子字段

// input

{

"a": {

"b": 1,

"c": 2

}

}

Python:? events.select("a.*")

Scala:? events.select("a.*")

SQL:? select a.* from events

// output

{

"b": 1,

"c": 2

}

嵌套列

結構函數或SQL能被用于創(chuàng)建新的結構

// input

{

"a": 1,

"b": 2,

"c": 3

}

Python: events.select(struct(col("a").alias("y")).alias("x"))

Scala: events.select(struct('a as 'y) as 'x)

SQL: select named_struct("y", a) as x from events

// output

{

"x": {

"y": 1

}

}

嵌套所有列

*能被用于包含嵌套結構中的所有列

// input

{

"a": 1,

"b": 2

}

Python: events.select(struct("*").alias("x"))

Scala: events.select(struct("*") as 'x)

SQL: select struct(*) as x from events

// output

{

"x": {

"a": 1,

"b": 2

}

}

從數組或map中選擇一個

getItem( )或[ ] 可以被用于從數據或map中查詢一個元素

// input

{

"a": [1, 2]

}

Python: events.select(col("a").getItem(0).alias("x"))

Scala: events.select('a.getItem(0) as 'x)

SQL: select a[0] as x from events

// output

{ "x": 1 }

// input

{

"a": {

"b": 1

}

}

Python: events.select(col("a").getItem("b").alias("x"))

Scala: events.select('a.getItem("b") as 'x)

SQL: select a['b'] as x from events

// output

{ "x": 1 }

將數組或map中的每一個元素創(chuàng)建一行

explode( )用于創(chuàng)建為數組或map中的元素創(chuàng)建一個新的行,這和HiveQL中的LATERAL VIEW EXPLODE和相似

// input

{

"a": [1, 2]

}

Python: events.select(explode("a").alias("x"))

Scala: events.select(explode('a) as 'x)

SQL: select explode(a) as x from events

// output

[{ "x": 1 }, { "x": 2 }]

// input

{

"a": {

"b": 1,

"c": 2

}

}

Python: events.select(explode("a").alias("x", "y"))

Scala: events.select(explode('a) as Seq("x", "y"))

SQL: select explode(a) as (x, y) from events

// output

[{ "x": "b", "y": 1 }, { "x": "c", "y": 2 }]

將多行數據收集成一個數組

collect_list()和collect_set()能被用于將多個項聚合到一個數組

// input

[{ "x": 1 }, { "x": 2 }]

Python: events.select(collect_list("x").alias("x"))

Scala: events.select(collect_list('x) as 'x)

SQL: select collect_list(x) as x from events

// output

{ "x": [1, 2] }

// input

[{ "x": 1, "y": "a" }, { "x": 2, "y": "b" }]

Python: events.groupBy("y").agg(collect_list("x").alias("x"))

Scala: events.groupBy("y").agg(collect_list('x) as 'x)

SQL: select y, collect_list(x) as x from events group by y

// output

[{ "y": "a", "x": [1]}, { "y": "b", "x": [2]}]

從數組的每個項查詢一個字段

當你用點符號作用于數組上時磅叛,將會返回一個新的數組屑咳,該數組的字段是從源數據組中選擇的每一個元素

// input

{

"a": [

{"b": 1},

{"b": 2}

]

}

Python: events.select("a.b")

Scala: events.select("a.b")

SQL: select a.b from events

// output

{

"b": [1, 2]

}

強大的to_json和from_json

如果你真的想保留列的復雜結構,但是你需要將其編碼為字符串嗎?Spark SQL提供了像to_json這樣的函數用于將結構數據編碼成字符串弊琴,from_json()用于將結構體恢復成復雜類型兆龙。從流式數據源像kafka中讀取或寫入數據時,用JSON字符串存儲列非常有用敲董,每一個kafka的鍵值對記錄將被增加一些元數據紫皇,比如將時間戳、位置信息注入到kafka腋寨。如果value字段數據的是JSON格式的聪铺,你就能用from_json去抽提取你的數據,豐富它萄窜,清理它铃剔,然后將其再次推向下游kafka或寫入文件。

用JSON編碼結構體

to_json()能被用于將結構體轉換成JSON字符串查刻。當你在寫入數據至kafka,需要重新編碼多個列的數據成一個列時键兜,這個方法非常有用。此方法目前在SQL中還不能用穗泵。

//?input{

"a":?{

"b":?1

}}

Python:?events.select(to_json("a").alias("c"))

Scala:?events.select(to_json('a)?as?'c)

//?output{

"c":?"{\"b\":1}"}

解碼json列為一個結構體

from_json()方法能被用于轉換JSON字符串列數據為結構體普气,那么你可以按照上述方式展平結構體,使其具有單獨的列火欧。該方法目前也不能用于SQL.

//?input{

"a":?"{\"b\":1}"}

Python:

schema?=?StructType().add("b",?IntegerType())

events.select(from_json("a",?schema).alias("c"))Scala:

val?schema?=?new?StructType().add("b",?IntegerType)

events.select(from_json('a,?schema)?as?'c)

//?output{

"c":?{

"b":?1

}}

有些時候你想保留JSON 字符串的一部分仍舊為JSON棋电,以避免模式中的復雜性過高茎截。

//?input{

"a":?"{\"b\":{\"x\":1,\"y\":{\"z\":2}}}"}

Python:

schema?=?StructType().add("b",?StructType().add("x",?IntegerType())

.add("y",?StringType()))

events.select(from_json("a",?schema).alias("c"))Scala:

val?schema?=?new?StructType().add("b",?new?StructType().add("x",?IntegerType)

.add("y",?StringType))

events.select(from_json('a,?schema)?as?'c)

//?output{

"c":?{

"b":?{

"x":?1,

"y":?"{\"z\":2}"

}

}}

從包含JSON的列中解析字段集合

json_tuple()能從包含JSON數據的字符串類型列中抽取可用的字段

//?input{

"a":?"{\"b\":1}"}

Python:?events.select(json_tuple("a",?"b").alias("c"))Scala:? events.select(json_tuple('a,?"b")?as?'c)

SQL:? ? select?json_tuple(a,?"b")?as?c?from?events

//?output{?"c":?1?}

有時候字符串列并不完全符合JSON的描述格式苇侵,但是仍舊有良好的結構。例如企锌,Log4j生成的日志消息格式榆浓。Spark SQL能輕松的將這些字符串進行結構化。

解析格式良好的字符串列

regexp_extract() 使用正則表達式來解析字符串數據

//?input[{?"a":?"x:?1"?},?{?"a":?"y:?2"?}]

Python:?events.select(regexp_extract("a",?"([a-z]):",?1).alias("c"))Scala:? events.select(regexp_extract('a,?"([a-z]):",?1)?as?'c)

SQL:? ? select?regexp_extract(a,?"([a-z]):",?1)?as?c?from?events

//?output[{?"c":?"x"?},?{?"c":?"y"?}]

以上有很多種轉換撕攒,現在我們來看一下現實生活中的一些用例陡鹃,把所有這些數據格式和數據處理能力用到恰到好處烘浦。

利用好所有的這些強大的轉換能力

在Databricks,我們收集服務的日志信息萍鲸,并在客戶受到影響之前使用它們執(zhí)行實時監(jiān)控以檢測問題闷叉。日志文件是非結構化的文件,但是由于是以良好的Log4j格式定義的脊阴,因此文件是可解析的握侧。我們運行一個日志收集服務,將每行日志和額外的元數據信息以JSON的格式發(fā)送至kinesis嘿期。JSON格式的記錄批量上傳至S3作為文件品擎。通過直接查詢這些JSON日志去發(fā)現問題是非常乏味的:因為對于回答任務和查詢,這些文件會包含重復項备徐,即使它涉及單個列萄传,整個JSON記錄也可能需要反序列化。

為了解決這個問題蜜猾,我們執(zhí)行一個管道去讀取這些JSON記錄秀菱,并對元數據執(zhí)行數據刪除。這樣我們就只剩下那些JSON格式或非結構化文本的原始日志信息瓣铣。如果我們需要處理JSON答朋,可以使用from_json()和上面描述的一些其它轉換去格式化我們的數據。如果是文本棠笑,我們可以用regexp_extract()去解析Log4j格式成一個更加結構化的形式梦碗。一旦完成了我們所有的轉換和重組,我們將記錄按日期分區(qū)保存在Parquet中蓖救。這將能從日志中發(fā)現的時間提高10至100倍洪规。

不再需要花很大的代價去反序列化JSON記錄

不在需要對原始日志信息進行復雜的字符串對比

僅僅需要在查詢中抽取兩個列:時間和日志級別

下面是很多客戶提供的通用用例場景:

“我想用我的數據運行機器學習管道。 我的數據已經被預處理循捺,我將在整個管道中使用我的所有功能”

當你要訪問數據的所有行斩例,Avro是一個不錯的選擇:

“我有一個IoT用例,我的傳感器發(fā)送給我的事件从橘。 對于每個事件念赶,元數據的重要性是不同的”

有些場景你的模式需要更好的靈活性,也許你可以考慮使用JSON存儲你的數據恰力。

“我想在報紙文章或情感分析上對產品評論進行語音識別算法”

如果你的數據沒有固定的模式叉谜,也不是固定的模式或結構,那么將其存儲為純文本可能更易于使用踩萎。有可能你又一個管道停局,它在分結構化數據上進行特征提取,并將其存儲為Avro,為你的機器學習流程做準備。

結論

在這片博文中董栽,我們探討了Spark SQL如何使用許多來源和格式的數據码倦,并輕松的執(zhí)行這些數據格式之間的轉換和交換。我們分享了Databrics是如何策劃數據锭碳,并考慮了其它生產用例袁稽,因為你可能想做不同事情。

Spark SQL為您提供必要的工具擒抛,以便以任何形式訪問您的數據运提,無論其格式如何,都可以為下游應用程序做準備闻葵,并且在流式數據上具有低延時民泵,歷史數據上具有高吞吐量。

原文鏈接:https://databricks.com/blog/2017/02/23/working-complex-data-formats-structured-streaming-apache-spark-2-1.html

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末槽畔,一起剝皮案震驚了整個濱河市栈妆,隨后出現的幾起案子,更是在濱河造成了極大的恐慌厢钧,老刑警劉巖鳞尔,帶你破解...
    沈念sama閱讀 218,858評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現場離奇詭異早直,居然都是意外死亡寥假,警方通過查閱死者的電腦和手機,發(fā)現死者居然都...
    沈念sama閱讀 93,372評論 3 395
  • 文/潘曉璐 我一進店門霞扬,熙熙樓的掌柜王于貴愁眉苦臉地迎上來糕韧,“玉大人,你說我怎么就攤上這事喻圃∮┎剩” “怎么了?”我有些...
    開封第一講書人閱讀 165,282評論 0 356
  • 文/不壞的土叔 我叫張陵斧拍,是天一觀的道長雀扶。 經常有香客問我,道長肆汹,這世上最難降的妖魔是什么愚墓? 我笑而不...
    開封第一講書人閱讀 58,842評論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮昂勉,結果婚禮上浪册,老公的妹妹穿的比我還像新娘。我一直安慰自己硼啤,他們只是感情好议经,可當我...
    茶點故事閱讀 67,857評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著谴返,像睡著了一般煞肾。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上嗓袱,一...
    開封第一講書人閱讀 51,679評論 1 305
  • 那天籍救,我揣著相機與錄音,去河邊找鬼渠抹。 笑死蝙昙,一個胖子當著我的面吹牛,可吹牛的內容都是我干的梧却。 我是一名探鬼主播奇颠,決...
    沈念sama閱讀 40,406評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼放航!你這毒婦竟也來了烈拒?” 一聲冷哼從身側響起,我...
    開封第一講書人閱讀 39,311評論 0 276
  • 序言:老撾萬榮一對情侶失蹤广鳍,失蹤者是張志新(化名)和其女友劉穎荆几,沒想到半個月后,有當地人在樹林里發(fā)現了一具尸體赊时,經...
    沈念sama閱讀 45,767評論 1 315
  • 正文 獨居荒郊野嶺守林人離奇死亡吨铸,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 37,945評論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現自己被綠了祖秒。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片诞吱。...
    茶點故事閱讀 40,090評論 1 350
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖竭缝,靈堂內的尸體忽然破棺而出狐胎,到底是詐尸還是另有隱情,我是刑警寧澤歌馍,帶...
    沈念sama閱讀 35,785評論 5 346
  • 正文 年R本政府宣布握巢,位于F島的核電站,受9級特大地震影響松却,放射性物質發(fā)生泄漏暴浦。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,420評論 3 331
  • 文/蒙蒙 一晓锻、第九天 我趴在偏房一處隱蔽的房頂上張望歌焦。 院中可真熱鬧,春花似錦砚哆、人聲如沸独撇。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,988評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽纷铣。三九已至卵史,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間搜立,已是汗流浹背以躯。 一陣腳步聲響...
    開封第一講書人閱讀 33,101評論 1 271
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留啄踊,地道東北人忧设。 一個月前我還...
    沈念sama閱讀 48,298評論 3 372
  • 正文 我出身青樓,卻偏偏與公主長得像颠通,于是被迫代替她去往敵國和親址晕。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 45,033評論 2 355

推薦閱讀更多精彩內容