在Apache Spark 2.0中使用DataFrames和SQL (轉(zhuǎn)載)

在Apache Spark 2.0中使用DataFrames和SQL

時(shí)間?2017-05-27 10:15:16百度VR

原文http://ivr.baidu.com/it/s5928fa3babbb.html

主題SQL

作者|馬小龍(Dr. Christoph Schubert)

責(zé)編|郭芮

Spark 2.0中使用DataFrames和SQL的第一步

Spark 2.0開(kāi)發(fā)的一個(gè)動(dòng)機(jī)是讓它可以觸及更廣泛的受眾祷肯,特別是缺乏編程技能但可能非常熟悉SQL的數(shù)據(jù)分析師或業(yè)務(wù)分析師屈张。因此碟贾,Spark 2.0現(xiàn)在比以往更易使用琼腔。在這部分,我將介紹如何使用Apache Spark 2.0朝氓。并將重點(diǎn)關(guān)注DataFrames作為新Dataset API的無(wú)類型版本皆刺。

到Spark 1.3针饥,彈性分布式數(shù)據(jù)集(Resilient Distributed Dataset,RDD)一直是Spark中的主要抽象浪秘。RDD API是在Scala集合框架之后建模的蒋情,因此間接提供了Hadoop Map / Reduce熟悉的編程原語(yǔ)以及函數(shù)式編程(Map、Filter秫逝、Reduce)的常用編程原語(yǔ)恕出。雖然RDD API比Map / Reduce范例更具表達(dá)性,但表達(dá)復(fù)雜查詢?nèi)匀缓芊爆嵨シ貏e是對(duì)于來(lái)自典型數(shù)據(jù)分析背景的用戶浙巫,他們可能熟悉SQL,或來(lái)自R/Python編程語(yǔ)言的數(shù)據(jù)框架。

Spark 1.3引入了DataFrames作為RDD頂部的一個(gè)新抽象的畴。DataFrame是具有命名列的行集合渊抄,在R和Python相應(yīng)包之后建模。

Spark 1.6看到了Dataset類作為DataFrame的類型化版本而引入丧裁。在Spark 2.0中护桦,DataFrames實(shí)際上是Datasets的特殊版本,我們有type DataFrame = Dataset [Row]煎娇,因此DataFrame和Dataset API是統(tǒng)一的二庵。

表面上,DataFrame就像SQL表缓呛。Spark 2.0將這種關(guān)系提升到一個(gè)新水平:我們可以使用SQL來(lái)修改和查詢DataSets和DataFrames催享。通過(guò)限制表達(dá)數(shù)量,有助于更好地優(yōu)化哟绊。數(shù)據(jù)集也與Catalyst優(yōu)化器良好集成因妙,大大提高了Spark代碼的執(zhí)行速度。因此票髓,新的開(kāi)發(fā)應(yīng)該利用DataFrames攀涵。

在本文中,我將重點(diǎn)介紹Spark 2.0中DataFrames的基本用法洽沟。我將嘗試強(qiáng)調(diào)Dataset API和SQL間的相似性以故,以及如何使用SQL和Dataset API互換地查詢數(shù)據(jù)。借由整個(gè)代碼生成和Catalyst優(yōu)化器玲躯,兩個(gè)版本將編譯相同高效的代碼据德。

代碼示例以Scala編程語(yǔ)言給出。我認(rèn)為這樣的代碼最清晰跷车,因?yàn)镾park本身就是用Scala編寫的棘利。

?SparkSession

SparkSession類替換了Apache Spark 2.0中的SparkContext和SQLContext,并為Spark集群提供了唯一的入口點(diǎn)朽缴。

為了向后兼容善玫,SparkSession對(duì)象包含SparkContext和SQLContext對(duì)象,見(jiàn)下文密强。當(dāng)我們使用交互式Spark shell時(shí)茅郎,為我們創(chuàng)建一個(gè)名為spark的SparkSession對(duì)象。

?創(chuàng)建DataFrames

DataFrame是具有命名列的表或渤。最簡(jiǎn)單的DataFrame是使用SparkSession的range方法來(lái)創(chuàng)建:

使用show給我們一個(gè)DataFrame的表格表示系冗,可以使用describe來(lái)獲得數(shù)值屬性概述。describe返回一個(gè)DataFrame:

觀察到Spark為數(shù)據(jù)幀中唯一的列選擇了名稱id薪鹦。對(duì)于更有趣的示例掌敬,請(qǐng)考慮以下數(shù)據(jù)集:

在這種情況下惯豆,customerDF對(duì)象將有名為_(kāi)1、_2奔害、_3关拒、_4的列折剃,它們以某種方式違反了命名列的目的捶障〖⒆罚可以通過(guò)重命名列來(lái)恢復(fù):

使用printSchema和describe提供以下輸出:

一般來(lái)說(shuō)我們會(huì)從文件加載數(shù)據(jù)。SparkSession類為提供了以下方法:

在這里我們讓Spark從CSV文件的第一行提取頭信息(通過(guò)設(shè)置header選項(xiàng)為true)雅潭,并使用數(shù)字類型(age和total)將數(shù)字列轉(zhuǎn)換為相應(yīng)的數(shù)據(jù)類型 inferSchema選項(xiàng)揭厚。

其他可能的數(shù)據(jù)格式包括parquet文件和通過(guò)JDBC連接讀取數(shù)據(jù)的可能性。

?基本數(shù)據(jù)操作

我們現(xiàn)在將訪問(wèn)DataFrame中數(shù)據(jù)的基本功能寻馏,并將其與SQL進(jìn)行比較棋弥。

沿襲,操作诚欠,動(dòng)作和整個(gè)階段的代碼生成

相同的譜系概念,轉(zhuǎn)換操作和行動(dòng)操作之間的區(qū)別適用于Dataset和RDD漾岳。我們下面討論的大多數(shù)DataFrame操作都會(huì)產(chǎn)生一個(gè)新的DataFrame轰绵,但實(shí)際上不執(zhí)行任何計(jì)算。要觸發(fā)計(jì)算尼荆,必須調(diào)用行動(dòng)操作之一左腔,例如show(將DataFrame的第一行作為表打印)捅儒,collect(返回一個(gè)Row對(duì)象的Array)液样,count(返回DataFrame中的行數(shù)),foreach(對(duì)每一行應(yīng)用一個(gè)函數(shù))巧还。這是惰性求值(lazy evaluation)的常見(jiàn)概念鞭莽。

下面Dataset類的所有方法實(shí)際上依賴于所有數(shù)據(jù)集的有向非循環(huán)圖(Directed Acyclic Graph,DAG)麸祷,從現(xiàn)有數(shù)據(jù)集中創(chuàng)建一個(gè)新的“數(shù)據(jù)集”澎怒。這被稱為數(shù)據(jù)集的沿襲。僅使用調(diào)用操作時(shí)阶牍,Catalyst優(yōu)化程序?qū)⒎治鲅匾u中的所有轉(zhuǎn)換喷面,并生成實(shí)際代碼。這被稱為整階段代碼生成走孽,并且負(fù)責(zé)Dataset對(duì)RDD的性能改進(jìn)惧辈。

Row-行對(duì)象

Row類在DataFrame的一行不帶類型數(shù)據(jù)值中充當(dāng)容器。通常情況下我們不會(huì)自己創(chuàng)建Row對(duì)象磕瓷,而是使用下面的語(yǔ)法:

Row對(duì)象元素通過(guò)位置(從0開(kāi)始)或者使用apply進(jìn)行訪問(wèn):

它會(huì)產(chǎn)生一個(gè)Any的對(duì)象類型盒齿。或者最好使用get,方法之一:

因?yàn)檫@樣就不會(huì)出現(xiàn)原始類型的開(kāi)銷县昂。我們可以使用isNull方法檢查行中的一個(gè)條目是否為’null’:

我們現(xiàn)在來(lái)看看DataFrame類最常用的轉(zhuǎn)換操作:

select

我們將要看的第一個(gè)轉(zhuǎn)換是“select”肮柜,它允許我們對(duì)一個(gè)DataFrame的列進(jìn)行投影和變換。

引用列

通過(guò)它們的名稱有兩種方法來(lái)訪問(wèn)DataFrame列:可以將其引用為字符串倒彰;或者可以使用apply方法审洞,col-方法或$以字符串作為參數(shù)并返回一個(gè)Column(列)對(duì)象。所以customerDF.col(“customer”)和customerDF(“customer”)都是customerDF的第一列待讳。

選擇和轉(zhuǎn)換列

最簡(jiǎn)單的select轉(zhuǎn)換形式允許我們將DataFrame投影到包含較少列的DataFrame中芒澜。下面的四個(gè)表達(dá)式返回一個(gè)只包含customer和province列的DataFrame:

不能在單個(gè)select方法中調(diào)用混合字符串和列參數(shù):customerDF.select(“customer”, $”province”)導(dǎo)致錯(cuò)誤。

使用Column類定義的運(yùn)算符创淡,可以構(gòu)造復(fù)雜的列表達(dá)式:

應(yīng)用show得到以下結(jié)果:

列別名

新數(shù)據(jù)集的列名稱從用于創(chuàng)建的表達(dá)式中派生而來(lái)痴晦,我們可以使用alias或as將列名更改為其他助記符:

產(chǎn)生與前面相同內(nèi)容的DataFrame,但使用名為name琳彩,newAge和isZJ的列誊酌。

Column類包含用于執(zhí)行基本數(shù)據(jù)分析任務(wù)的各種有效方法。我們將參考讀者文檔的詳細(xì)信息露乏。

最后碧浊,我們可以使用lit函數(shù)添加一個(gè)具有常量值的列,并使用when和otherwise重新編碼列值瘟仿。例如箱锐,我們添加一個(gè)新列“ageGroup”,如果“age <20”劳较,則為1驹止,如果“age <30”則為2,否則為3观蜗,以及總是為“false”的列“trusted”:

給出以下DataFrame:

drop是select相對(duì)的轉(zhuǎn)換操作臊恋;它返回一個(gè)DataFrame,其中刪除了原始DataFrame的某些列嫂便。

最后可使用distinct方法返回原始DataFrame中唯一值的DataFrame:

返回一個(gè)包含單個(gè)列的DataFrame和包含值的三行:“北京”捞镰、“江蘇”、“浙江”毙替。

filter

第二個(gè)DataFrame轉(zhuǎn)換是Filter方法岸售,它在DataFrame行中進(jìn)行選擇。有兩個(gè)重載方法:一個(gè)接受一個(gè)Column厂画,另一個(gè)接受一個(gè)SQL表達(dá)式(一個(gè)String)凸丸。例如,有以下兩種等效方式來(lái)過(guò)濾年齡大于30歲的所有客戶:

Filter轉(zhuǎn)換接受一般的布爾連接符and(和)和or(或):

我們?cè)赟QL版本中使用單個(gè)等號(hào)袱院,或者使用三等式“===”(Column類的一個(gè)方法)屎慢。在==運(yùn)算符中使用Scala的等于符號(hào)會(huì)導(dǎo)致錯(cuò)誤瞭稼。我們?cè)俅我肅olumn類文檔中的有用方法。

聚合(aggregation)

執(zhí)行聚合是進(jìn)行數(shù)據(jù)分析的最基本任務(wù)之一腻惠。例如环肘,我們可能對(duì)每個(gè)訂單的總金額感興趣,或者更具體地集灌,對(duì)每個(gè)省或年齡組的總金額或平均金額感興趣悔雹。可能還有興趣了解哪個(gè)客戶的年齡組具有高于平均水平的總數(shù)欣喧。借用SQL腌零,我們可以使用GROUP BY表達(dá)式來(lái)解決這些問(wèn)題。DataFrames提供了類似的功能唆阿∫娼В可以根據(jù)一些列的值進(jìn)行分組,同樣驯鳖,還可以使用字符串或“Column”對(duì)象來(lái)指定闲询。

withColumn方法添加一個(gè)新的列或替換一個(gè)現(xiàn)有的列。

聚合數(shù)據(jù)分兩步進(jìn)行:一個(gè)調(diào)用GroupBy方法將特定列中相等值的行組合在一起浅辙,然后調(diào)用聚合函數(shù)嘹裂,如sum(求和值),max(最大值)或?yàn)樵糄ataFrame中每組行計(jì)算的“avg”(平均值)摔握。從技術(shù)上來(lái)說(shuō),GroupBy會(huì)返回一個(gè)RelationalGroupedDataFrame類的對(duì)象丁寄。RelationalGroupedDataFrame包含max氨淌、min、avg伊磺、mean和sum方法盛正,所有這些方法都對(duì)DataFrame的數(shù)字列執(zhí)行指定操作,并且可以接受一個(gè)String-參數(shù)來(lái)限制所操作的數(shù)字列屑埋。此外豪筝,我們有一個(gè)count方法計(jì)算每個(gè)組中的行數(shù),還有一個(gè)通用的agg方法允許我們指定更一般的聚合函數(shù)摘能。所有這些方法都會(huì)返回一個(gè)DataFrame续崖。

customerAgeGroupDF.groupBy(“agegroup”).max.show輸出:

最后,customerAgeGroupDF.groupBy(“agegroup”).min(“age”, “total”).show輸出:

還有一個(gè)通用的agg方法团搞,接受復(fù)雜的列表達(dá)式严望。agg在RelationalGroupedDataFrame和Dataset中都可用。后一種方法對(duì)整個(gè)數(shù)據(jù)集執(zhí)行聚合逻恐。這兩種方法都允許我們給出列表達(dá)式的列表:

可用的聚合函數(shù)在org.apache.spark.sql.functions中定義像吻。類RelationalGroupedDataset在Apache Spark 1.x中被稱為“GroupedData”峻黍。 RelationalGroupedDataset的另一個(gè)特點(diǎn)是可以對(duì)某些列值進(jìn)行透視。例如拨匆,以下內(nèi)容允許我們列出每個(gè)年齡組的總數(shù):

其中null值表示沒(méi)有省/年齡組的組合姆涩。Pivot的重載版本接受一個(gè)值列表以進(jìn)行透視。這一方面允許我們限制列數(shù)惭每,另一方面更加有效骨饿,因?yàn)镾park不需要計(jì)算樞軸列中的所有值。例如:

最后洪鸭,使用樞紐數(shù)據(jù)也可以進(jìn)行復(fù)雜聚合:

這里=!=是Column類的“不等于”方法样刷。

排序和限制

OrderBy方法允許我們根據(jù)一些列對(duì)數(shù)據(jù)集的內(nèi)容進(jìn)行排序。和以前一樣览爵,我們可以使用Strings或Column對(duì)象來(lái)指定列:customerDF.orderBy(”age”)和 customerDF.orderBy($”age”)給出相同的結(jié)果置鼻。默認(rèn)排序順序?yàn)樯颉H绻敌蚺判蝌阎瘢梢允褂肅olumn類的desc方法或者desc函數(shù):

觀察到desc函數(shù)返回了一個(gè)Column-object箕母,任何其他列也需要被指定為Column-對(duì)象。

最后俱济,limit方法返回一個(gè)包含原始DataFrame中第一個(gè)n行的DataFrame嘶是。

?DataFrame方法與SQL對(duì)比

我們已經(jīng)發(fā)現(xiàn),DataFrame類的基本方法與SQLselect語(yǔ)句的部分密切相關(guān)蛛碌。下表總結(jié)了這一對(duì)應(yīng)關(guān)系:

到目前為止連接(join)在我們的討論中已經(jīng)缺失聂喇。Spark的DataFrame支持連接,我們將在文章的下一部分討論它們蔚携。

下面將討論完全類型化的DataSets API希太,連接和用戶定義的函數(shù)(UDF)。

?使用SQL來(lái)處理DataFrames

我們還在Apache Spark 2.0中直接執(zhí)行SQL語(yǔ)句酝蜒。SparkSession的SQL方法返回一個(gè)DataFrame誊辉。此外,DataFrame的selectExp方法也允許我們?yōu)閱瘟兄付⊿QL表達(dá)式亡脑,如下所示堕澄。為了能夠引用SQL表達(dá)式中的DataFrame,首先有必要將DataFrame注冊(cè)為臨時(shí)表霉咨,在Spark 2中稱為臨時(shí)視圖(temporary view蛙紫,簡(jiǎn)稱為tempview)。DataFrame為我們提供了以下兩種方法:

createTempView創(chuàng)建一個(gè)新視圖躯护,如果具有該名稱的視圖已存在惊来,則拋出一個(gè)異常;

createOrReplaceTempView創(chuàng)建一個(gè)用來(lái)替換的臨時(shí)視圖棺滞。

兩種方法都將視圖名稱作為唯一參數(shù)裁蚁。

注冊(cè)表后矢渊,可以使用SparkSession的SQL方法來(lái)執(zhí)行SQL語(yǔ)句:

返回具有以下內(nèi)容的DataFrame:

SparkSession類的catalog字段是Catalog類的一個(gè)對(duì)象,具有多種處理會(huì)話注冊(cè)表和視圖的方法枉证。例如矮男,Catalog的ListTables方法返回一個(gè)包含所有已注冊(cè)表信息的Dataset:

會(huì)返回一個(gè)包含有關(guān)注冊(cè)表“tableName”中列信息的Dataset,例如:

此外室谚,可以使用DataSet的SelectExpr方法執(zhí)行某些產(chǎn)生單列的SQL表達(dá)式毡鉴,例如:

這兩者都產(chǎn)生DataFrame對(duì)象。

?第一步結(jié)束語(yǔ)

我們希望讓讀者相信秒赤,Apache Spark 2.0的統(tǒng)一性能夠?yàn)槭煜QL的分析師們提供Spark的學(xué)習(xí)曲線猪瞬。下一部分將進(jìn)一步介紹類型化Dataset API的使用、用戶定義的函數(shù)以及Datasets間的連接入篮。此外陈瘦,我們將討論新Dataset API的使用缺陷。

Spark 2.0中使用DataFrames和SQL的第二步

本文第一部分使用了無(wú)類型的DataFrame API潮售,其中每行都表示一個(gè)Row對(duì)象痊项。在下面的內(nèi)容中,我們將使用更新的DatasetAPI酥诽。Dataset是在Apache Spark 1.6中引入的鞍泉,并已在Spark 2.0中使用DataFrames進(jìn)行了統(tǒng)一,我們現(xiàn)在有了type DataFrame = Dataset [Row]肮帐,其中方括號(hào)([和] Scala中的泛型類型咖驮,因此類似于Java的<和>)。因此训枢,上面討論的所有諸如select游沿、filter、groupBy肮砾、agg、orderBy袋坑、limit等方法都以相同的方式使用仗处。

?Datasets:返回類型信息

Spark 2.0以前的DataFrame API本質(zhì)上是一個(gè)無(wú)類型的API,這也就意味著在編譯期間很可能會(huì)因?yàn)槟承┚幾g器錯(cuò)誤枣宫,導(dǎo)致無(wú)法訪問(wèn)類型信息婆誓。

和之前一樣,我們將在示例中使用Scala也颤,因?yàn)槲蚁嘈臩cala最為簡(jiǎn)潔洋幻。可能涉及的例子:spark將表示SparkSession對(duì)象翅娶,代表我們的Spark集群文留。

?例子:分析Apache訪問(wèn)日志

我們將使用Apache訪問(wèn)日志格式數(shù)據(jù)好唯。先一起回顧Apache日志中的典型行,如下所示:

此行包含以下部分:

127.0.0.1是向服務(wù)器發(fā)出請(qǐng)求的客戶端(遠(yuǎn)程主機(jī))IP地址(或主機(jī)名燥翅,如果可用)骑篙;

輸出中的第一個(gè)-表示所請(qǐng)求的信息(來(lái)自遠(yuǎn)程機(jī)器的用戶身份)不可用;

輸出中的第二個(gè)-表示所請(qǐng)求的信息(來(lái)自本地登錄的用戶身份)不可用森书;

[01 / Aug / 1995:00:00:01 -0400]表示服務(wù)器完成處理請(qǐng)求的時(shí)間靶端,格式為:[日/月/年:小時(shí):分:秒時(shí)區(qū)],有三個(gè)部件:”GET /images/launch-logo.gif HTTP / 1.0”凛膏;

請(qǐng)求方法(例如杨名,GET,POST等)猖毫;

端點(diǎn)(統(tǒng)一資源標(biāo)識(shí)符)台谍;

和客戶端協(xié)議版本(’HTTP / 1.0’)。

1.200這是服務(wù)器返回客戶端的狀態(tài)代碼鄙麦。這些信息非常有價(jià)值:成功回復(fù)(從2開(kāi)始的代碼)典唇,重定向(從3開(kāi)始的代碼),客戶端導(dǎo)致的錯(cuò)誤(以4開(kāi)頭的代碼)胯府,服務(wù)器錯(cuò)誤(代碼從5開(kāi)始)介衔。最后一個(gè)條目表示返回給客戶端的對(duì)象大小。如果沒(méi)有返回任何內(nèi)容則是-或0骂因。

首要任務(wù)是創(chuàng)建適當(dāng)?shù)念愋蛠?lái)保存日志行信息炎咖,因此我們使用Scala的case類,具體如下:

默認(rèn)情況下寒波,case類對(duì)象不可變乘盼。通過(guò)它們的值來(lái)比較相等性,而不是通過(guò)比較對(duì)象引用俄烁。

為日志條目定義了合適的數(shù)據(jù)結(jié)構(gòu)后绸栅,現(xiàn)在需要將表示日志條目的String轉(zhuǎn)換為ApacheLog對(duì)象。我們將使用正則表達(dá)式來(lái)達(dá)到這一點(diǎn)页屠,參考如下:

可以看到正則表達(dá)式包含9個(gè)捕獲組粹胯,用于表示ApacheLog類的字段。

使用正則表達(dá)式解析訪問(wèn)日志時(shí)辰企,會(huì)面臨以下問(wèn)題:

一些日志行的內(nèi)容大小以-表示风纠,我們想將它轉(zhuǎn)換為0;

一些日志行不符合所選正則表達(dá)式給出的格式牢贸。

為了克服第二個(gè)問(wèn)題竹观,我們使用Scala的“Option”類型來(lái)丟棄不對(duì)的格式并進(jìn)行確認(rèn)。Option也是一個(gè)泛型類型,類型Option[ApacheLog]的對(duì)象可以有以下形式:

None臭增,表示不存在一個(gè)值(在其他語(yǔ)言中懂酱,可能使用null);

Some(log)for a ApacheLog-objectlog速址。

以下為一行函數(shù)解析玩焰,并為不可解析的日志條目返回None:

最好的方法是修改正則表達(dá)式以捕獲所有日志條目,但Option是處理一般錯(cuò)誤或不可解析條目的常用技術(shù)芍锚。

綜合起來(lái)昔园,現(xiàn)在來(lái)剖析一個(gè)真正的數(shù)據(jù)集。我們將使用著名的NASA Apache訪問(wèn)日志數(shù)據(jù)集并炮,它可以在ftp://ita.ee.lbl.gov/traces/NASA_access_log_Jul95.gz下載默刚。

下載和解壓縮文件后,首先將其打開(kāi)為String的Dataset逃魄,然后使用正則表達(dá)式解析:

用spark.read.text方法打開(kāi)文本文件并返回一個(gè)DataFrame荤西,是textfile的行。使用Dataset的as方法將其轉(zhuǎn)換為包含Strings的Dataset對(duì)象(而不是Rows包含字符串)伍俘,并導(dǎo)入spark.implicits._以允許創(chuàng)建一個(gè)包含字符串或其他原始類型的Dataset邪锌。

flatMap將parse_logline函數(shù)應(yīng)用于rawData的每一行,并將Some(ApacheLog)形式的所有結(jié)果收集到apacheLogs中癌瘾,同時(shí)丟棄所有不可解析的日志行(所有結(jié)果的形式None)觅丰。

我們現(xiàn)在可以對(duì)“數(shù)據(jù)集”執(zhí)行分析,就像在“DataFrame”上一樣妨退。Dataset中的列名稱只是ApacheLog case類的字段名稱妇萄。

例如,以下代碼打印生成最多404個(gè)響應(yīng)的10個(gè)端點(diǎn):

如前所述咬荷,可以將Dataset注冊(cè)為臨時(shí)視圖冠句,然后使用SQL執(zhí)行查詢:

上面的SQL查詢具有與上面的Scala代碼相同的結(jié)果。

用戶定義的函數(shù)(user defined function, UDF)

在Spark SQL中幸乒,我們可以使用范圍廣泛的函數(shù)懦底,包括處理日期、基本統(tǒng)計(jì)和其他數(shù)學(xué)函數(shù)的函數(shù)罕扎。Spark在函數(shù)中的構(gòu)建是在org.apache.spark.sql.functions對(duì)象中定義的基茵。

作為示例,我們使用以下函數(shù)提取主機(jī)名的頂級(jí)域:

如果想在SQL查詢中使用這個(gè)函數(shù)壳影,首先需要注冊(cè)。這是通過(guò)SparkSession的udf對(duì)象實(shí)現(xiàn)的:

函數(shù)名后的最后一個(gè)下劃線將extractTLD轉(zhuǎn)換為部分應(yīng)用函數(shù)(partially applied function)弥臼,這是必要的宴咧,如果省略它會(huì)導(dǎo)致錯(cuò)誤。register方法返回一個(gè)UserDefinedFunction對(duì)象径缅,可以應(yīng)用于列表達(dá)式掺栅。

一旦注冊(cè)烙肺,我們可以在SQL查詢中使用extractTLD:

要獲得注冊(cè)的用戶定義函數(shù)概述,可以使用spark.catalog對(duì)象的listFunctions方法氧卧,該對(duì)象返回SparkSession定義的所有函數(shù)DataFrame:

注意Spark SQL遵循通常的SQL約定桃笙,即不區(qū)分大小寫。也就是說(shuō)沙绝,以下SQL表達(dá)式都是有效的并且彼此等價(jià):select extractTLD(host)from apacheLogs搏明,select extracttld(host)from apacheLogs,”select EXTRACTTLD(host) from apacheLogs”闪檬。spark.catalog.listFunctions返回的函數(shù)名將總是小寫字母星著。

除了在SQL查詢中使用UDF,我們還可以直接將它們應(yīng)用到列表達(dá)式粗悯。以下表達(dá)式返回.net域中的所有請(qǐng)求:

值得注意的是虚循,與Spark在諸如filter,select等方法中的構(gòu)建相反样傍,用戶定義的函數(shù)只采用列表達(dá)式作為參數(shù)横缔。寫extractTLD_UDF(“host”)會(huì)導(dǎo)致錯(cuò)誤。

除了在目錄中注冊(cè)UDF并用于Column表達(dá)式和SQL中衫哥,我們還可以使用org.apache.spark.sql.functions對(duì)象中的udf函數(shù)注冊(cè)一個(gè)UDF:

注冊(cè)UDF后茎刚,可以將它應(yīng)用到Column表達(dá)式(例如filter里面),如下所示:

但是不能在SQL查詢中使用它炕檩,因?yàn)檫€沒(méi)有通過(guò)名稱注冊(cè)它斗蒋。

Spark中用Catalyst優(yōu)化器來(lái)優(yōu)化所有涉及數(shù)據(jù)集的查詢,會(huì)將用戶定義的函數(shù)視作黑盒笛质。值得注意的是泉沾,當(dāng)過(guò)濾器操作涉及UDF時(shí),在連接之前可能不會(huì)“下推”過(guò)濾器操作妇押。我們通過(guò)下面的例子來(lái)說(shuō)明跷究。

通常來(lái)說(shuō),不依賴UDF而是從內(nèi)置的“Column”表達(dá)式進(jìn)行組合操作可能效果更好敲霍。

?加盟

最后俊马,我們將討論如何使用以下兩個(gè)Dataset方法連接數(shù)據(jù)集:

join返回一個(gè)DataFrame

joinWith返回一對(duì)Datasets

以下示例連接兩個(gè)表1、表2(來(lái)自維基百科):

表1員工(Employee)

表2部門(Department)

定義兩個(gè)case類肩杈,將兩個(gè)表編碼為case類對(duì)象的序列(由于空間原因不顯示)柴我,最后創(chuàng)建兩個(gè)Dataset對(duì)象:

為了執(zhí)行內(nèi)部等連接,只需提供要作為“String”連接的列名稱:

Spark會(huì)自動(dòng)刪除雙列扩然,joined.show給出以下輸出:

表3輸出

在上面艘儒,joined是一個(gè)DataFrame,不再是Dataset。連接數(shù)據(jù)集的行可以作為Seq列名稱給出界睁,或者可以指定要執(zhí)行的equi-join(inner觉增,outer,left_outer翻斟,right_outer或leftsemi)類型逾礁。想要指定連接類型的話,需要使用Seq表示法來(lái)指定要連接的列访惜。請(qǐng)注意嘹履,如果執(zhí)行內(nèi)部聯(lián)接(例如,獲取在同一部門中工作的所有員工的對(duì)):employees.join(employees疾牲,Seq(“depID”))植捎,我們沒(méi)有辦法訪問(wèn)連接的DataFrame列:employees.join(employees, Seq(“depID”)).select(“l(fā)astname”)會(huì)因?yàn)橹貜?fù)的列名而失敗。處理這種情況的方法是重命名部分列:

除了等連接之外阳柔,我們還可以給出更復(fù)雜的連接表達(dá)式焰枢,例如以下查詢,它將所有部門連接到不知道部門ID且不在本部門工作的員工:

然后可以不指定任何連接條件舌剂,在兩個(gè)Datasets間執(zhí)行笛卡爾聯(lián)接:departments.join(employees).show济锄。

最后,Dataset的joinWith方法返回一個(gè)Dataset霍转,包含原始數(shù)據(jù)集中匹配行的Scala元組荐绝。

表4返回Dataset

這可以用于自連接后想要規(guī)避上述不可訪問(wèn)列的問(wèn)題情況。

Catalyst優(yōu)化器嘗試通過(guò)將“過(guò)濾器”操作向“下推”避消,以盡可能多地優(yōu)化連接低滩,因此它們?cè)趯?shí)際連接之前執(zhí)行。

為了這個(gè)工作岩喷,用戶定義的函數(shù)(UDF)恕沫,不應(yīng)該在連接條件內(nèi)使用用因?yàn)檫@些被Catalyst處理為黑盒子。

?結(jié)論

我們已經(jīng)討論了在Apache Spark 2.0中使用類型化的DatasetAPI纱意,如何在Apache Spark中定義和使用用戶定義的函數(shù)婶溯,以及這樣做的危險(xiǎn)。使用UDF可能產(chǎn)生的主要困難是它們會(huì)被Catalyst優(yōu)化器視作黑盒偷霉。

作者:馬小龍(Dr. Christoph Schubert)迄委,浙江財(cái)經(jīng)大學(xué)數(shù)據(jù)分析和大數(shù)據(jù)計(jì)算客座教授。2006年在德國(guó)不來(lái)梅大學(xué)獲得數(shù)學(xué)博士學(xué)位后类少,在多特蒙德大學(xué)軟件工程研究所從事研究和教學(xué)工作直到2011年來(lái)到中國(guó)叙身。他的研究方向重點(diǎn)在大數(shù)據(jù)技術(shù)和NoSQL數(shù)據(jù)庫(kù)以及功能規(guī)劃和隨機(jī)計(jì)算模型與模態(tài)邏輯。他還是國(guó)際大數(shù)據(jù)分析大會(huì)主席硫狞。

PS:另有CSDN Spark用戶微信群信轿,請(qǐng)?zhí)砑游⑿舋uorui_1118并備注公司+實(shí)名+職位申請(qǐng)入群赞警。

本文為《程序員》原創(chuàng)文章,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載虏两,更多精彩文章請(qǐng)[閱讀原文]訂閱《程序員》。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末世剖,一起剝皮案震驚了整個(gè)濱河市定罢,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌旁瘫,老刑警劉巖祖凫,帶你破解...
    沈念sama閱讀 218,682評(píng)論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異酬凳,居然都是意外死亡惠况,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,277評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門宁仔,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)稠屠,“玉大人,你說(shuō)我怎么就攤上這事翎苫∪ú海” “怎么了?”我有些...
    開(kāi)封第一講書人閱讀 165,083評(píng)論 0 355
  • 文/不壞的土叔 我叫張陵煎谍,是天一觀的道長(zhǎng)攘蔽。 經(jīng)常有香客問(wèn)我,道長(zhǎng)呐粘,這世上最難降的妖魔是什么满俗? 我笑而不...
    開(kāi)封第一講書人閱讀 58,763評(píng)論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮作岖,結(jié)果婚禮上唆垃,老公的妹妹穿的比我還像新娘。我一直安慰自己鳍咱,他們只是感情好降盹,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,785評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著谤辜,像睡著了一般蓄坏。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上丑念,一...
    開(kāi)封第一講書人閱讀 51,624評(píng)論 1 305
  • 那天涡戳,我揣著相機(jī)與錄音,去河邊找鬼脯倚。 笑死渔彰,一個(gè)胖子當(dāng)著我的面吹牛嵌屎,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播恍涂,決...
    沈念sama閱讀 40,358評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼宝惰,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了再沧?” 一聲冷哼從身側(cè)響起尼夺,我...
    開(kāi)封第一講書人閱讀 39,261評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎炒瘸,沒(méi)想到半個(gè)月后淤堵,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,722評(píng)論 1 315
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡顷扩,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,900評(píng)論 3 336
  • 正文 我和宋清朗相戀三年拐邪,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片隘截。...
    茶點(diǎn)故事閱讀 40,030評(píng)論 1 350
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡扎阶,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出技俐,到底是詐尸還是另有隱情乘陪,我是刑警寧澤,帶...
    沈念sama閱讀 35,737評(píng)論 5 346
  • 正文 年R本政府宣布雕擂,位于F島的核電站啡邑,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏井赌。R本人自食惡果不足惜谤逼,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,360評(píng)論 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望仇穗。 院中可真熱鬧流部,春花似錦、人聲如沸纹坐。這莊子的主人今日做“春日...
    開(kāi)封第一講書人閱讀 31,941評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)耘子。三九已至果漾,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間谷誓,已是汗流浹背绒障。 一陣腳步聲響...
    開(kāi)封第一講書人閱讀 33,057評(píng)論 1 270
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留捍歪,地道東北人户辱。 一個(gè)月前我還...
    沈念sama閱讀 48,237評(píng)論 3 371
  • 正文 我出身青樓鸵钝,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親庐镐。 傳聞我的和親對(duì)象是個(gè)殘疾皇子恩商,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,976評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容