Learning Spark [6] - Spark SQL高級(jí)函數(shù)

collect, collect_list, collect_set

collect常用的有兩個(gè)函數(shù):collect_list(不去重)和collect_set(去重)

# build a test dataframe
df = pd.DataFrame({'type1':['a1', 'a1', 'a1', 'a2', 'a2', 'a2'],
                   'type2':['b1', 'b2', 'b3', 'b4', 'b5', 'b5'],
                   'value':[1, 2, 3, 4, 5, 6]})
df = spark.createDataFrame(df)
df.createOrReplaceTempView('collect_test')
df.view()
+-----+-----+-----+
|type1|type2|value|
+-----+-----+-----+
|   a1|   b1|    1|
|   a1|   b2|    2|
|   a1|   b3|    3|
|   a2|   b4|    4|
|   a2|   b5|    5|
|   a2|   b6|    6|
+-----+-----+-----+

collect_list

spark.sql('''SELECT type1, COLLECT_LIST(type2) as type2 
             FROM collect_test 
             GROUP BY type1''').show()
+-----+------------+
|type1|       type2|
+-----+------------+
|   a2|[b4, b5, b5]|
|   a1|[b1, b2, b3]|
+-----+------------+

collect_set

spark.sql('''SELECT type1, COLLECT_SET(type2) as type2 
             FROM collect_test 
             GROUP BY type1''').show()
+-----+------------+
|type1|       type2|
+-----+------------+
|   a2|    [b4, b5]|
|   a1|[b1, b3, b2]|
+-----+------------+

collect后返回的是一個(gè)數(shù)組,可以通過(guò)array[x]來(lái)調(diào)用數(shù)據(jù)欣尼。通過(guò)這點(diǎn)我們可以進(jìn)行透視表的操作爆雹,類似定義array[0] as a1, array[1] as a2...

explode

explode的定義是將數(shù)組的每個(gè)數(shù)據(jù)展開,如下我們就可以將上面的dataframe還原為最初的樣式愕鼓。

spark.sql('''SELECT type1, EXPLODE(type2) as type2
             FROM(SELECT type1, COLLECT_LIST(type2) as type2 
                  FROM collect_test 
                  GROUP BY type1) a''').show()
+-----+-----+
|type1|type2|
+-----+-----+
|   a2|   b4|
|   a2|   b5|
|   a2|   b5|
|   a1|   b1|
|   a1|   b2|
|   a1|   b3|
+-----+-----+

posexplode可以在拆分列的同時(shí)钙态,增加一列序號(hào)

spark.sql('''SELECT type1, posexplode(type2) as (index, type2)
             FROM(SELECT type1, COLLECT_LIST(type2) as type2 
                  FROM collect_test 
                  GROUP BY type1) a''').show()
+-----+-----+-----+
|type1|index|type2|
+-----+-----+-----+
|   a2|    0|   b4|
|   a2|    1|   b5|
|   a2|    2|   b5|
|   a1|    0|   b1|
|   a1|    1|   b2|
|   a1|    2|   b3|
+-----+-----+-----+

但是如果表內(nèi)有如下兩個(gè)一一對(duì)應(yīng)的數(shù)組,我們?cè)撊绾尾鸱帜兀?/p>

+-----+------------+---------+
|type1|       type2|    value|
+-----+------------+---------+
|   a2|[b4, b5, b5]|[4, 5, 6]|
|   a1|[b1, b2, b3]|[1, 2, 3]|
+-----+------------+---------+

按照直覺菇晃,我們嘗試分別explode()

spark.sql('''SELECT type1, explode(type2) as type2, explode(value) as value
             FROM(SELECT type1, COLLECT_LIST(type2) as type2
                    , COLLECT_LIST(value) as value 
                  FROM collect_test 
                  GROUP BY type1) a''').show()
AnalysisException: Only one generator allowed per select clause but found 2: explode(type2), explode(value);

解決這個(gè)問題册倒,我們需要使用LATERAL VIEW

lateral view

lateral view可以理解為創(chuàng)建了一個(gè)表,然后JOIN到了查詢的表上磺送,這樣就避免了兩個(gè)生成器的問題

spark.sql('''SELECT type1, exploded_type2.type2, exploded_value.value
             FROM(SELECT type1, COLLECT_LIST(type2) as type2
                    , COLLECT_LIST(value) as value 
                  FROM collect_test 
                  GROUP BY type1) a
             LATERAL VIEW POSEXPLODE(type2) exploded_type2 as type_index, type2
             LATERAL VIEW POSEXPLODE(value) exploded_value as value_index, value
             WHERE type_index = value_index -- 避免為笛卡爾積
             ''').show()

split

split則是將一個(gè)字符串根據(jù)分隔符驻子,變化為一個(gè)數(shù)組

df = pd.DataFrame({'type1':['a', 'b', 'c'],
                   'type2':['1_2_3', '1_23', '_1']})
df = spark.createDataFrame(df)
df.createOrReplaceTempView('collect_test')
spark.sql('''SELECT * FROM collect_test''').show()
+-----+-----+
|type1|type2|
+-----+-----+
|    a|1_2_3|
|    b| 1_23|
|    c|   _1|
+-----+-----+
spark.sql('''SELECT type1, split(type2, '_') as splited_type2 FROM collect_test''').show()
+-----+-------------------+
|type1|splited_type2|
+-----+-------------------+
|    a|          [1, 2, 3]|
|    b|            [1, 23]|
|    c|              [, 1]|
+-----+-------------------+

transform

transform會(huì)引用一個(gè)函數(shù)在數(shù)組的每個(gè)元素上,返回一個(gè)數(shù)列

schema = StructType([StructField('celsius', ArrayType(IntegerType()))])
t_list = [[35, 36, 32, 30, 40, 42, 38]], [[31, 32, 34, 55, 56]] 
t_c = spark.createDataFrame(t_list, schema) 
t_c.createOrReplaceTempView("tC")
t_c.show()
+--------------------+
|             celsius|
+--------------------+
|[35, 36, 32, 30, ...|
|[31, 32, 34, 55, 56]|
+--------------------+
spark.sql(""" SELECT celsius, TRANSFORM(celsius, t -> ((t * 9) div 5) + 32) as fahrenheit 
              FROM tC """).show()
+--------------------+--------------------+
|             celsius|          fahrenheit|
+--------------------+--------------------+
|[35, 36, 32, 30, ...|[95, 96, 89, 86, ...|
|[31, 32, 34, 55, 56]|[87, 89, 93, 131,...|
+--------------------+--------------------+

filter

filter為通過(guò)條件刪選估灿,返回一個(gè)數(shù)列

spark.sql(""" SELECT celsius, FILTER(celsius, t -> t >= 40) as high_temp 
              FROM tC """).show()
+--------------------+---------+
|             celsius|high_temp|
+--------------------+---------+
|[35, 36, 32, 30, ...| [40, 42]|
|[31, 32, 34, 55, 56]| [55, 56]|
+--------------------+---------+

exists

exists為判斷是否包含該元素崇呵,返回一個(gè)布爾值

spark.sql(""" SELECT celsius, EXISTS(celsius, t -> t >= 40) as is_high_temp 
              FROM tC """).show()
+--------------------+------------+
|             celsius|is_high_temp|
+--------------------+------------+
|[35, 36, 32, 30, ...|        true|
|[31, 32, 34, 55, 56]|        true|
+--------------------+------------+

reduce

reduce為通過(guò)兩個(gè)函數(shù),將數(shù)組聚合為一個(gè)值甲捏,然后對(duì)該值進(jìn)行運(yùn)算

spark.sql(""" SELECT celsius, 
                     reduce(celsius
                            , (t, acc) -> ((t * 9) div 5) + 32 + acc
                            , acc -> (acc div size(celsius))) as avgFahrenheit 
              FROM tC """).show()
+--------------------+-------------+ 
|             celsius|avgFahrenheit| 
+--------------------+-------------+ 
|[35, 36, 32, 30, ...|           96| 
|[31, 32, 34, 55, 56]|          105| 
+--------------------+-------------+

其他函數(shù)

Spark SQL高級(jí)函數(shù) part1

Spark SQL高級(jí)函數(shù) part2

Spark SQL高級(jí)函數(shù) part3

Reference
Learning Spark 2nd - Lightning Fast Big Data Analysis by Jules S. Damji, Brooke Wenig, Tathagata Das, and Denny Lee

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末演熟,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子司顿,更是在濱河造成了極大的恐慌芒粹,老刑警劉巖,帶你破解...
    沈念sama閱讀 216,470評(píng)論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件大溜,死亡現(xiàn)場(chǎng)離奇詭異化漆,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)钦奋,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,393評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門座云,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)疙赠,“玉大人,你說(shuō)我怎么就攤上這事朦拖∑匝簦” “怎么了?”我有些...
    開封第一講書人閱讀 162,577評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵璧帝,是天一觀的道長(zhǎng)捍岳。 經(jīng)常有香客問我,道長(zhǎng)睬隶,這世上最難降的妖魔是什么锣夹? 我笑而不...
    開封第一講書人閱讀 58,176評(píng)論 1 292
  • 正文 為了忘掉前任,我火速辦了婚禮苏潜,結(jié)果婚禮上银萍,老公的妹妹穿的比我還像新娘。我一直安慰自己恤左,他們只是感情好贴唇,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,189評(píng)論 6 388
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著赃梧,像睡著了一般滤蝠。 火紅的嫁衣襯著肌膚如雪豌熄。 梳的紋絲不亂的頭發(fā)上授嘀,一...
    開封第一講書人閱讀 51,155評(píng)論 1 299
  • 那天,我揣著相機(jī)與錄音锣险,去河邊找鬼蹄皱。 笑死,一個(gè)胖子當(dāng)著我的面吹牛芯肤,可吹牛的內(nèi)容都是我干的巷折。 我是一名探鬼主播,決...
    沈念sama閱讀 40,041評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼崖咨,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼锻拘!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起击蹲,我...
    開封第一講書人閱讀 38,903評(píng)論 0 274
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤署拟,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后歌豺,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體推穷,經(jīng)...
    沈念sama閱讀 45,319評(píng)論 1 310
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,539評(píng)論 2 332
  • 正文 我和宋清朗相戀三年类咧,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了馒铃。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片蟹腾。...
    茶點(diǎn)故事閱讀 39,703評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖区宇,靈堂內(nèi)的尸體忽然破棺而出娃殖,到底是詐尸還是另有隱情,我是刑警寧澤议谷,帶...
    沈念sama閱讀 35,417評(píng)論 5 343
  • 正文 年R本政府宣布珊随,位于F島的核電站,受9級(jí)特大地震影響柿隙,放射性物質(zhì)發(fā)生泄漏叶洞。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,013評(píng)論 3 325
  • 文/蒙蒙 一禀崖、第九天 我趴在偏房一處隱蔽的房頂上張望衩辟。 院中可真熱鬧,春花似錦波附、人聲如沸艺晴。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,664評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)封寞。三九已至,卻和暖如春仅财,著一層夾襖步出監(jiān)牢的瞬間狈究,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,818評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工盏求, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留抖锥,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 47,711評(píng)論 2 368
  • 正文 我出身青樓碎罚,卻偏偏與公主長(zhǎng)得像磅废,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子荆烈,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,601評(píng)論 2 353

推薦閱讀更多精彩內(nèi)容