collect, collect_list, collect_set
collect comes in two commonly used flavors: collect_list (keeps duplicates) and collect_set (drops duplicates); both gather a group's values into an array.
# build a test DataFrame (assumes an active SparkSession named `spark`)
import pandas as pd

df = pd.DataFrame({'type1':['a1', 'a1', 'a1', 'a2', 'a2', 'a2'],
                   'type2':['b1', 'b2', 'b3', 'b4', 'b5', 'b5'],
                   'value':[1, 2, 3, 4, 5, 6]})
df = spark.createDataFrame(df)
df.createOrReplaceTempView('collect_test')
df.show()
+-----+-----+-----+
|type1|type2|value|
+-----+-----+-----+
|   a1|   b1|    1|
|   a1|   b2|    2|
|   a1|   b3|    3|
|   a2|   b4|    4|
|   a2|   b5|    5|
|   a2|   b5|    6|
+-----+-----+-----+
collect_list
spark.sql('''SELECT type1, COLLECT_LIST(type2) as type2
             FROM collect_test
             GROUP BY type1''').show()
+-----+------------+
|type1|       type2|
+-----+------------+
|   a2|[b4, b5, b5]|
|   a1|[b1, b2, b3]|
+-----+------------+
collect_set
spark.sql('''SELECT type1, COLLECT_SET(type2) as type2
             FROM collect_test
             GROUP BY type1''').show()
+-----+------------+
|type1|       type2|
+-----+------------+
|   a2|    [b4, b5]|
|   a1|[b1, b3, b2]|
+-----+------------+
collect returns an array, whose elements can be read with array[x]. This makes pivot-table-style queries possible, e.g. defining array[0] as a1, array[1] as a2, and so on.
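For instance, a minimal pivot sketch over the query above (the aliases b_1/b_2/b_3 are illustrative; note that collect_list gives no ordering guarantee, so real pivot code should fix the element order first):
spark.sql('''SELECT type1
             , type2[0] as b_1 -- illustrative aliases; array elements are 0-indexed
             , type2[1] as b_2
             , type2[2] as b_3
             FROM (SELECT type1, COLLECT_LIST(type2) as type2
                   FROM collect_test
                   GROUP BY type1) a''').show()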
explode
explode unpacks an array into one row per element; with it, as shown below, we can restore the DataFrame above to its original shape:
spark.sql('''SELECT type1, EXPLODE(type2) as type2
             FROM (SELECT type1, COLLECT_LIST(type2) as type2
                   FROM collect_test
                   GROUP BY type1) a''').show()
+-----+-----+
|type1|type2|
+-----+-----+
|   a2|   b4|
|   a2|   b5|
|   a2|   b5|
|   a1|   b1|
|   a1|   b2|
|   a1|   b3|
+-----+-----+
posexplode splits the column the same way, while also adding an index column with each element's position:
spark.sql('''SELECT type1, posexplode(type2) as (index, type2)
             FROM (SELECT type1, COLLECT_LIST(type2) as type2
                   FROM collect_test
                   GROUP BY type1) a''').show()
+-----+-----+-----+
|type1|index|type2|
+-----+-----+-----+
|   a2|    0|   b4|
|   a2|    1|   b5|
|   a2|    2|   b5|
|   a1|    0|   b1|
|   a1|    1|   b2|
|   a1|    2|   b3|
+-----+-----+-----+
But what if the table contains two arrays whose elements correspond one to one, as below? How do we unpack them?
+-----+------------+---------+
|type1|       type2|    value|
+-----+------------+---------+
|   a2|[b4, b5, b5]|[4, 5, 6]|
|   a1|[b1, b2, b3]|[1, 2, 3]|
+-----+------------+---------+
Intuitively, we try a separate explode() for each column:
spark.sql('''SELECT type1, explode(type2) as type2, explode(value) as value
             FROM (SELECT type1, COLLECT_LIST(type2) as type2
                   , COLLECT_LIST(value) as value
                   FROM collect_test
                   GROUP BY type1) a''').show()
AnalysisException: Only one generator allowed per select clause but found 2: explode(type2), explode(value);
To solve this problem, we need LATERAL VIEW.
lateral view
LATERAL VIEW can be understood as creating a virtual table from the generator's output and joining it back to the queried table, which avoids the two-generator problem:
spark.sql('''SELECT type1, exploded_type2.type2, exploded_value.value
             FROM (SELECT type1, COLLECT_LIST(type2) as type2
                   , COLLECT_LIST(value) as value
                   FROM collect_test
                   GROUP BY type1) a
             LATERAL VIEW POSEXPLODE(type2) exploded_type2 as type_index, type2
             LATERAL VIEW POSEXPLODE(value) exploded_value as value_index, value
             WHERE type_index = value_index -- avoid a Cartesian product
             ''').show()
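As a side note, Spark 2.4+ also offers ARRAYS_ZIP, which pairs the two arrays into a single array of structs, so one generator is enough; a sketch under that version assumption:
spark.sql('''SELECT type1, zipped.type2, zipped.value -- struct fields keep the source column names
             FROM (SELECT type1, COLLECT_LIST(type2) as type2
                   , COLLECT_LIST(value) as value
                   FROM collect_test
                   GROUP BY type1) a
             LATERAL VIEW EXPLODE(ARRAYS_ZIP(type2, value)) t as zipped''').show()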
split
split turns a string into an array by cutting it at a separator:
df = pd.DataFrame({'type1':['a', 'b', 'c'],
                   'type2':['1_2_3', '1_23', '_1']})
df = spark.createDataFrame(df)
df.createOrReplaceTempView('collect_test')
spark.sql('''SELECT * FROM collect_test''').show()
+-----+-----+
|type1|type2|
+-----+-----+
|    a|1_2_3|
|    b| 1_23|
|    c|   _1|
+-----+-----+
spark.sql('''SELECT type1, split(type2, '_') as splited_type2 FROM collect_test''').show()
+-----+-------------+
|type1|splited_type2|
+-----+-------------+
|    a|    [1, 2, 3]|
|    b|      [1, 23]|
|    c|        [, 1]|
+-----+-------------+
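Since split returns an array, it composes directly with the generators above; e.g. a small sketch that splits and explodes in one pass over the same test view:
spark.sql('''SELECT type1, EXPLODE(SPLIT(type2, '_')) as type2
             FROM collect_test''').show()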
transform
transform applies a function to every element of an array and returns the transformed array:
from pyspark.sql.types import StructType, StructField, ArrayType, IntegerType

schema = StructType([StructField('celsius', ArrayType(IntegerType()))])
t_list = [[35, 36, 32, 30, 40, 42, 38]], [[31, 32, 34, 55, 56]]
t_c = spark.createDataFrame(t_list, schema)
t_c.createOrReplaceTempView("tC")
t_c.show()
+--------------------+
|             celsius|
+--------------------+
|[35, 36, 32, 30, ...|
|[31, 32, 34, 55, 56]|
+--------------------+
spark.sql(""" SELECT celsius, TRANSFORM(celsius, t -> ((t * 9) div 5) + 32) as fahrenheit
FROM tC """).show()
+--------------------+--------------------+
|             celsius|          fahrenheit|
+--------------------+--------------------+
|[35, 36, 32, 30, ...|[95, 96, 89, 86, ...|
|[31, 32, 34, 55, 56]|[87, 89, 93, 131,...|
+--------------------+--------------------+
filter
filter selects the elements that satisfy a condition and returns them as an array:
spark.sql(""" SELECT celsius, FILTER(celsius, t -> t >= 40) as high_temp
FROM tC """).show()
+--------------------+---------+
|             celsius|high_temp|
+--------------------+---------+
|[35, 36, 32, 30, ...| [40, 42]|
|[31, 32, 34, 55, 56]| [55, 56]|
+--------------------+---------+
exists
exists tests whether any element satisfies the condition and returns a boolean:
spark.sql(""" SELECT celsius, EXISTS(celsius, t -> t >= 40) as is_high_temp
FROM tC """).show()
+--------------------+------------+
|             celsius|is_high_temp|
+--------------------+------------+
|[35, 36, 32, 30, ...|        true|
|[31, 32, 34, 55, 56]|        true|
+--------------------+------------+
reduce
reduce folds the array into a single value using a start value and a merge function, then applies an optional finish function to that value:
spark.sql(""" SELECT celsius,
reduce(celsius
, (t, acc) -> ((t * 9) div 5) + 32 + acc
, acc -> (acc div size(celsius))) as avgFahrenheit
FROM tC """).show()
+--------------------+-------------+
|             celsius|avgFahrenheit|
+--------------------+-------------+
|[35, 36, 32, 30, ...|           96|
|[31, 32, 34, 55, 56]|          105|
+--------------------+-------------+
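Checking the first row by hand: the merge step sums 35+36+32+30+40+42+38 = 253; the finish step computes 253 div 7 = 36, then (36 * 9) div 5 + 32 = 64 + 32 = 96.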
Other functions
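Spark 2.4+ ships more array helpers in the same family, e.g. array_distinct, array_sort, flatten, array_contains, and array_max; a quick illustrative sketch (the aliases are illustrative):
spark.sql(""" SELECT celsius
              , ARRAY_SORT(celsius) as sorted_t
              , ARRAY_MAX(celsius) as max_t
              FROM tC """).show()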
Reference
Jules S. Damji, Brooke Wenig, Tathagata Das, and Denny Lee. Learning Spark: Lightning-Fast Data Analytics, 2nd Edition. O'Reilly, 2020.