Spark Join Operations
[TOC]
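The documentation describes join as `join(other, on=None, how=None)`, which takes three parameters:
- other: the DataFrame to join with. The official documentation calls it "Right side of the join", i.e. the DataFrame placed on the right-hand side.
- on: the column(s) to join on. It can be a column name (string), a list of column names, or a join expression (a Column or a list of Columns). If a string or a list of strings is given, the column(s) must exist in both DataFrames and an equi-join is performed. Horizontal merging in Spark is not as simple as in pandas, where two frames can just be concatenated side by side. A Spark join needs a key column to match rows on: rows with equal key values are combined, and whether rows without a match are dropped or kept with null values depends on `how`.
- how: the join type. The default is inner. The other options are cross, outer, full, full_outer, left, left_outer, right, right_outer, left_semi and left_anti. outer, full and full_outer are synonyms, as are left/left_outer and right/right_outer.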
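The sketch below makes the key-based `how` options concrete. It is a minimal plain-Python model of what each join type returns, assuming rows are dicts and the join key is a single column. It only models the semantics and is not PySpark code. `join_rows` and `ALIASES` are hypothetical names. `cross` is deliberately rejected because it has no join key. Real Spark output does not guarantee any row order.

```python
# Plain-Python model of Spark's key-based join types (not Spark itself).
# join_rows is a hypothetical helper; rows are dicts, inputs must be non-empty.

ALIASES = {"outer": "full", "full_outer": "full",
           "left_outer": "left", "right_outer": "right"}


def join_rows(left, right, key, how="inner"):
    how = ALIASES.get(how, how)
    if how not in ("inner", "left", "right", "full", "left_semi", "left_anti"):
        raise ValueError("join type not modelled here: " + how)

    # Non-key column names, taken from the first row of each side.
    left_cols = [c for c in left[0] if c != key]
    right_cols = [c for c in right[0] if c != key]
    left_keys = {row[key] for row in left}
    right_keys = {row[key] for row in right}

    # Semi/anti joins only filter the left side and keep its columns.
    if how == "left_semi":
        return [dict(row) for row in left if row[key] in right_keys]
    if how == "left_anti":
        return [dict(row) for row in left if row[key] not in right_keys]

    out = []
    for l in left:
        matches = [r for r in right if r[key] == l[key]]
        for r in matches:
            out.append({key: l[key], **{c: l[c] for c in left_cols},
                        **{c: r[c] for c in right_cols}})
        # Unmatched left rows survive only in left/full joins, padded with None.
        if not matches and how in ("left", "full"):
            out.append({key: l[key], **{c: l[c] for c in left_cols},
                        **{c: None for c in right_cols}})
    # Unmatched right rows survive only in right/full joins.
    if how in ("right", "full"):
        for r in right:
            if r[key] not in left_keys:
                out.append({key: r[key], **{c: None for c in left_cols},
                            **{c: r[c] for c in right_cols}})
    return out


if __name__ == "__main__":
    body = [{"name": "Bom", "age": 20}, {"name": "Alice", "age": 23}]
    score = [{"name": "Bom", "Math": 97}, {"name": "Joe", "Math": 100}]
    for how in ("inner", "left", "right", "outer", "left_semi", "left_anti"):
        print(how, join_rows(body, score, "name", how))
```

`None` plays the role of Spark's null here. The model treats the key as one column that appears once in the output, which matches what Spark shows when `on` is a column name.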
I. Creating the Data
First, create the two datasets below. All of the following tests are based on them.
Creation code:
```python
# -*- coding: utf-8 -*-
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .master("local") \
    .appName("create df") \
    .getOrCreate()

# First dataset: age, weight and height
body_info = [["Bom", 20, 97.6, 165],
             ["Alice", 23, 90.0, 160],
             ["kuke", 33, 190.0, 170],
             ["jike", 19, 120.0, 170],
             ["Joe", 24, 89.0, 162]]
body_df = spark.createDataFrame(body_info, ["name", "age", "weight", "height"])
body_df.show()

# Second dataset: scores in Chinese, Math and English
score_info = [["Bom", 88, 97, 90],
              ["Alice", 85, 99, 92],
              ["kuke", 77, 82, 80],
              ["jike", 65, 58, 30],
              ["Joe", 90, 100, 92]]
score_df = spark.createDataFrame(score_info, ["name", "Chinese", "Math", "English"])
score_df.show()
```
Output:
```
+-----+---+------+------+
| name|age|weight|height|
+-----+---+------+------+
|  Bom| 20|  97.6|   165|
|Alice| 23|  90.0|   160|
| kuke| 33| 190.0|   170|
| jike| 19| 120.0|   170|
|  Joe| 24|  89.0|   162|
+-----+---+------+------+

+-----+-------+----+-------+
| name|Chinese|Math|English|
+-----+-------+----+-------+
|  Bom|     88|  97|     90|
|Alice|     85|  99|     92|
| kuke|     77|  82|     80|
| jike|     65|  58|     30|
|  Joe|     90| 100|     92|
+-----+-------+----+-------+
```
II. Join Operations
This section experiments only with the third parameter, how. The second parameter, on, is always set to name.
1. inner
Case 1: the first dataset has the same number of rows as the second
Join code:
```python
merge_df = body_df.join(score_df, on="name", how="inner")
merge_df.show()
```
Output:
```
+-----+---+------+------+-------+----+-------+
| name|age|weight|height|Chinese|Math|English|
+-----+---+------+------+-------+----+-------+
| jike| 19| 120.0|   170|     65|  58|     30|
|  Bom| 20|  97.6|   165|     88|  97|     90|
|  Joe| 24|  89.0|   162|     90| 100|     92|
|Alice| 23|  90.0|   160|     85|  99|     92|
| kuke| 33| 190.0|   170|     77|  82|     80|
+-----+---+------+------+-------+----+-------+
```
The result is straightforward. Every name appears in both datasets, so each row of the first dataset gets the second dataset's columns (all except name) appended on its right.
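For an inner join with `on` given as column names, the column layout follows a fixed rule. The join column(s) come first, appearing once. Next come the left DataFrame's remaining columns, then the right DataFrame's remaining columns. The plain-Python helper below, `using_join_columns`, is a hypothetical illustration of that rule and does not call Spark. It also enforces the requirement from the parameter list that every `on` column must exist on both sides.

```python
# Hypothetical helper: predicts the column order of
# left.join(right, on=<column name(s)>, how="inner"); it does not call Spark.

def using_join_columns(left_cols, right_cols, on):
    on = [on] if isinstance(on, str) else list(on)
    # String/list `on` columns must exist in both DataFrames.
    missing = [c for c in on if c not in left_cols or c not in right_cols]
    if missing:
        raise ValueError("join column(s) missing on one side: %s" % missing)
    # Key columns once, then left-only columns, then right-only columns.
    return (on
            + [c for c in left_cols if c not in on]
            + [c for c in right_cols if c not in on])


body_cols = ["name", "age", "weight", "height"]
score_cols = ["name", "Chinese", "Math", "English"]
print(using_join_columns(body_cols, score_cols, "name"))
# ['name', 'age', 'weight', 'height', 'Chinese', 'Math', 'English']
```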
Case 2: the first dataset has more rows than the second
Remove rows 2 and 4 (Alice and jike) from the second dataset, which becomes:
```
+-----+-------+----+-------+
| name|Chinese|Math|English|
+-----+-------+----+-------+
|  Bom|     88|  97|     90|
| kuke|     77|  82|     80|
|  Joe|     90| 100|     92|
+-----+-------+----+-------+
```
Join code:
```python
merge_df = body_df.join(score_df, on="name", how="inner")
merge_df.show()
```
Output:
```
+----+---+------+------+-------+----+-------+
|name|age|weight|height|Chinese|Math|English|
+----+---+------+------+-------+----+-------+
| Bom| 20|  97.6|   165|     88|  97|     90|
| Joe| 24|  89.0|   162|     90| 100|     92|
|kuke| 33| 190.0|   170|     77|  82|     80|
+----+---+------+------+-------+----+-------+
```
The output shows that inner first takes the intersection of the name values in the two datasets and then merges the matching rows. Alice and jike are missing from the second dataset, so they are dropped.
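The intersect-then-merge behaviour can be reproduced with a small plain-Python sketch using the case 2 data above. It indexes the right-hand rows by `name` and keeps only the left rows whose key is found. This illustrates inner-join semantics under the assumption of dict rows. It is not Spark's execution strategy: Spark chooses between broadcast hash, sort-merge and other join algorithms on its own. `inner_join` is a hypothetical name, and its row order differs from the Spark output because Spark does not guarantee one.

```python
# Hypothetical plain-Python model of an inner equi-join (not Spark code).

def inner_join(left, right, key):
    # Index the right side by key so each left row can look up its matches.
    index = {}
    for r in right:
        index.setdefault(r[key], []).append(r)
    out = []
    for l in left:
        for r in index.get(l[key], []):  # no match -> left row is dropped
            row = dict(l)
            row.update({c: v for c, v in r.items() if c != key})
            out.append(row)
    return out


body = [("Bom", 20), ("Alice", 23), ("kuke", 33), ("jike", 19), ("Joe", 24)]
score = [("Bom", 97), ("kuke", 82), ("Joe", 100)]  # case 2: rows 2 and 4 removed
left = [{"name": n, "age": a} for n, a in body]
right = [{"name": n, "Math": m} for n, m in score]

result = inner_join(left, right, "name")
print([row["name"] for row in result])  # ['Bom', 'kuke', 'Joe']
```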
Case 3: the first dataset has fewer rows than the second
Remove rows 2 and 4 (Alice and jike) from the first dataset, which becomes:
```
+-----+---+------+------+
| name|age|weight|height|
+-----+---+------+------+
|  Bom| 20|  97.6|   165|
| kuke| 33| 190.0|   170|
|  Joe| 24|  89.0|   162|
+-----+---+------+------+
```
Join code:
```python
merge_df = body_df.join(score_df, on="name", how="inner")
merge_df.show()
```
Output:
```
+----+---+------+------+-------+----+-------+
|name|age|weight|height|Chinese|Math|English|
+----+---+------+------+-------+----+-------+
| Bom| 20|  97.6|   165|     88|  97|     90|
| Joe| 24|  89.0|   162|     90| 100|     92|
|kuke| 33| 190.0|   170|     77|  82|     80|
+----+---+------+------+-------+----+-------+
```
The result is the same as in case 2: the intersection is taken first, then the rows are merged.
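An inner join keeps exactly the intersection of the key values. It therefore does not matter which DataFrame is larger or which one is on the left. Swapping the sides returns the same matched rows, and only the column order changes, because the left side's columns come first. Below is a minimal plain-Python check using the case 3 data, assuming dict rows. `inner_join` is a hypothetical helper, not a Spark API.

```python
# Plain-Python check (not Spark): an inner join keeps the same matched keys
# whichever side is on the left; only the column order differs.

def inner_join(left, right, key):
    index = {}
    for r in right:
        index.setdefault(r[key], []).append(r)
    out = []
    for l in left:
        for r in index.get(l[key], []):
            row = dict(l)  # left columns first
            row.update({c: v for c, v in r.items() if c != key})
            out.append(row)
    return out


# Case 3 data: the first dataset lost rows 2 and 4.
body = [{"name": "Bom", "age": 20}, {"name": "kuke", "age": 33},
        {"name": "Joe", "age": 24}]
score = [{"name": n, "Math": m} for n, m in
         [("Bom", 97), ("Alice", 99), ("kuke", 82), ("jike", 58), ("Joe", 100)]]

a = inner_join(body, score, "name")
b = inner_join(score, body, "name")
print(sorted(r["name"] for r in a) == sorted(r["name"] for r in b))  # True
print(list(a[0]), list(b[0]))  # ['name', 'age', 'Math'] ['name', 'Math', 'age']
```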
2. cross
Case 1: the first dataset has the same number of rows as the second
Join code:
```python
merge_df = body_df.join(score_df, on="name", how="cross")
merge_df.show()
```
Running this code on Spark 2.4.7 fails outright with:
```
pyspark.sql.utils.IllegalArgumentException: 'requirement failed: Unsupported using join type Cross'
```
A quick Google search showed that this is not how a cross join is meant to be used. When on is given as a column name, Spark performs a USING join, and in this version a USING join does not accept the cross join type. A cross join has no join key at all. Using the dedicated crossJoin method instead:
```python
merge_df = body_df.crossJoin(score_df)
merge_df.show()
```
Output:
```
+-----+---+------+------+-----+-------+----+-------+
| name|age|weight|height| name|Chinese|Math|English|
+-----+---+------+------+-----+-------+----+-------+
|  Bom| 20|  97.6|   165|  Bom|     88|  97|     90|
|  Bom| 20|  97.6|   165|Alice|     85|  99|     92|
|  Bom| 20|  97.6|   165| kuke|     77|  82|     80|
|  Bom| 20|  97.6|   165| jike|     65|  58|     30|
|  Bom| 20|  97.6|   165|  Joe|     90| 100|     92|
|Alice| 23|  90.0|   160|  Bom|     88|  97|     90|
|Alice| 23|  90.0|   160|Alice|     85|  99|     92|
|Alice| 23|  90.0|   160| kuke|     77|  82|     80|
|Alice| 23|  90.0|   160| jike|     65|  58|     30|
|Alice| 23|  90.0|   160|  Joe|     90| 100|     92|
| kuke| 33| 190.0|   170|  Bom|     88|  97|     90|
| kuke| 33| 190.0|   170|Alice|     85|  99|     92|
| kuke| 33| 190.0|   170| kuke|     77|  82|     80|
| kuke| 33| 190.0|   170| jike|     65|  58|     30|
| kuke| 33| 190.0|   170|  Joe|     90| 100|     92|
| jike| 19| 120.0|   170|  Bom|     88|  97|     90|
| jike| 19| 120.0|   170|Alice|     85|  99|     92|
| jike| 19| 120.0|   170| kuke|     77|  82|     80|
| jike| 19| 120.0|   170| jike|     65|  58|     30|
| jike| 19| 120.0|   170|  Joe|     90| 100|     92|
+-----+---+------+------+-----+-------+----+-------+
```
This result puzzled me at first. Then I realised that every row of the first dataset is paired with every row of the second, producing a new row for each pair (a Cartesian product). That gives 5 × 5 = 25 rows. `show()` prints only the first 20 rows by default, which is why Joe's five rows are missing above. `merge_df.show(25)` or `merge_df.count()` reveals all of them.
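The row-by-row pairing described above is a Cartesian product. Below is a plain-Python analogy using `itertools.product`; it is not Spark's implementation. It shows that the two 5-row datasets yield 25 pairs. It also shows that removing rows 2 and 4 from either side, as the next two cases do, leaves 5 × 3 = 15 and 3 × 5 = 15 pairs.

```python
# Plain-Python analogy for crossJoin (not Spark code): every left row is
# paired with every right row, so the result has len(left) * len(right) rows.
from itertools import product

names = ["Bom", "Alice", "kuke", "jike", "Joe"]
pairs = list(product(names, names))  # case 1: 5 x 5
print(len(pairs))   # 25
print(pairs[:2])    # [('Bom', 'Bom'), ('Bom', 'Alice')]

# Cases 2 and 3 drop rows 2 and 4 from one side: 5 x 3 and 3 x 5.
reduced = [n for i, n in enumerate(names, start=1) if i not in (2, 4)]
print(len(list(product(names, reduced))), len(list(product(reduced, names))))  # 15 15
```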
Case 2: the first dataset has more rows than the second
Remove rows 2 and 4 (Alice and jike) from the second dataset, which becomes:
```
+-----+-------+----+-------+
| name|Chinese|Math|English|
+-----+-------+----+-------+
|  Bom|     88|  97|     90|
| kuke|     77|  82|     80|
|  Joe|     90| 100|     92|
+-----+-------+----+-------+
```
Join code:
```python
merge_df = body_df.crossJoin(score_df)
merge_df.show()
```
Output:
```
+-----+---+------+------+----+-------+----+-------+
| name|age|weight|height|name|Chinese|Math|English|
+-----+---+------+------+----+-------+----+-------+
|  Bom| 20|  97.6|   165| Bom|     88|  97|     90|
|  Bom| 20|  97.6|   165|kuke|     77|  82|     80|
|  Bom| 20|  97.6|   165| Joe|     90| 100|     92|
|Alice| 23|  90.0|   160| Bom|     88|  97|     90|
|Alice| 23|  90.0|   160|kuke|     77|  82|     80|
|Alice| 23|  90.0|   160| Joe|     90| 100|     92|
| kuke| 33| 190.0|   170| Bom|     88|  97|     90|
| kuke| 33| 190.0|   170|kuke|     77|  82|     80|
| kuke| 33| 190.0|   170| Joe|     90| 100|     92|
| jike| 19| 120.0|   170| Bom|     88|  97|     90|
| jike| 19| 120.0|   170|kuke|     77|  82|     80|
| jike| 19| 120.0|   170| Joe|     90| 100|     92|
|  Joe| 24|  89.0|   162| Bom|     88|  97|     90|
|  Joe| 24|  89.0|   162|kuke|     77|  82|     80|
|  Joe| 24|  89.0|   162| Joe|     90| 100|     92|
+-----+---+------+------+----+-------+----+-------+
```
The second dataset has two fewer rows, but the conclusion from case 1 still holds. Each of the 5 rows on the left is paired with each of the 3 rows on the right, giving 5 × 3 = 15 rows.
Case 3: the first dataset has fewer rows than the second
Remove rows 2 and 4 (Alice and jike) from the first dataset, which becomes:
```
+-----+---+------+------+
| name|age|weight|height|
+-----+---+------+------+
|  Bom| 20|  97.6|   165|
| kuke| 33| 190.0|   170|
|  Joe| 24|  89.0|   162|
+-----+---+------+------+
```
Join code:
```python
merge_df = body_df.crossJoin(score_df)
merge_df.show()
```
Output:
```
+----+---+------+------+-----+-------+----+-------+
|name|age|weight|height| name|Chinese|Math|English|
+----+---+------+------+-----+-------+----+-------+
| Bom| 20|  97.6|   165|  Bom|     88|  97|     90|
| Bom| 20|  97.6|   165|Alice|     85|  99|     92|
| Bom| 20|  97.6|   165| kuke|     77|  82|     80|
| Bom| 20|  97.6|   165| jike|     65|  58|     30|
| Bom| 20|  97.6|   165|  Joe|     90| 100|     92|
|kuke| 33| 190.0|   170|  Bom|     88|  97|     90|
|kuke| 33| 190.0|   170|Alice|     85|  99|     92|
|kuke| 33| 190.0|   170| kuke|     77|  82|     80|
|kuke| 33| 190.0|   170| jike|     65|  58|     30|
|kuke| 33| 190.0|   170|  Joe|     90| 100|     92|
| Joe| 24|  89.0|   162|  Bom|     88|  97|     90|
| Joe| 24|  89.0|   162|Alice|     85|  99|     92|
| Joe| 24|  89.0|   162| kuke|     77|  82|     80|
| Joe| 24|  89.0|   162| jike|     65|  58|     30|
| Joe| 24|  89.0|   162|  Joe|     90| 100|     92|
+----+---+------+------+-----+-------+----+-------+
```
The conclusion is the same as in case 1: 3 × 5 = 15 rows.
3. outer
Case 1: the first dataset has the same number of rows as the second
Join code:
```python
merge_df = body_df.join(score_df, on="name", how="outer")
merge_df.show()
```
Output:
```
+-----+---+------+------+-------+----+-------+
| name|age|weight|height|Chinese|Math|English|
+-----+---+------+------+-------+----+-------+
| jike| 19| 120.0|   170|     65|  58|     30|
|  Bom| 20|  97.6|   165|     88|  97|     90|
|  Joe| 24|  89.0|   162|     90| 100|     92|
|Alice| 23|  90.0|   160|     85|  99|     92|
| kuke| 33| 190.0|   170|     77|  82|     80|
+-----+---+------+------+-------+----+-------+
```
The result is the same as inner case 1, so no conclusion can be drawn yet.
Case 2: the first dataset has more rows than the second
Remove rows 2 and 4 (Alice and jike) from the second dataset, which becomes:
```
+-----+-------+----+-------+
| name|Chinese|Math|English|
+-----+-------+----+-------+
|  Bom|     88|  97|     90|
| kuke|     77|  82|     80|
|  Joe|     90| 100|     92|
+-----+-------+----+-------+
```
Join code:
```python
merge_df = body_df.join(score_df, on="name", how="outer")
merge_df.show()
```
Output:
```
+-----+---+------+------+-------+----+-------+
| name|age|weight|height|Chinese|Math|English|
+-----+---+------+------+-------+----+-------+
| jike| 19| 120.0|   170|   null|null|   null|
|  Bom| 20|  97.6|   165|     88|  97|     90|
|  Joe| 24|  89.0|   162|     90| 100|     92|
|Alice| 23|  90.0|   160|   null|null|   null|
| kuke| 33| 190.0|   170|     77|  82|     80|
+-----+---+------+------+-------+----+-------+
```
The output shows that columns with no matching data are filled with null.
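The null padding can be mimicked in plain Python. The sketch looks up each left row's match and fills the right-hand columns with `None`, Python's stand-in for Spark's null, when there is none. Rows that exist only on the right are padded the same way on the left-hand columns. `full_outer_join` is a hypothetical sketch, not a Spark API, and it assumes each key appears at most once per side. The data is the `name`, `age` and `Math` values from case 2.

```python
# Hypothetical plain-Python model of a full outer join on one key (not Spark).

def full_outer_join(left, right, key, left_cols, right_cols):
    right_by_key = {r[key]: r for r in right}   # assumes unique keys per side
    left_keys = {l[key] for l in left}
    out = []
    for l in left:
        r = right_by_key.get(l[key], {})         # {} means no match -> None below
        out.append({key: l[key],
                    **{c: l[c] for c in left_cols},
                    **{c: r.get(c) for c in right_cols}})
    for r in right:                              # keys present only on the right
        if r[key] not in left_keys:
            out.append({key: r[key],
                        **{c: None for c in left_cols},
                        **{c: r[c] for c in right_cols}})
    return out


body = [{"name": n, "age": a} for n, a in
        [("Bom", 20), ("Alice", 23), ("kuke", 33), ("jike", 19), ("Joe", 24)]]
score = [{"name": "Bom", "Math": 97}, {"name": "kuke", "Math": 82},
         {"name": "Joe", "Math": 100}]           # case 2 second dataset

for row in full_outer_join(body, score, "name", ["age"], ["Math"]):
    print(row)
```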
Case 3: the first dataset has fewer rows than the second
Remove rows 2 and 4 (Alice and jike) from the first dataset, which becomes:
```
+-----+---+------+------+
| name|age|weight|height|
+-----+---+------+------+
|  Bom| 20|  97.6|   165|
| kuke| 33| 190.0|   170|
|  Joe| 24|  89.0|   162|
+-----+---+------+------+
```
Join code:
```python
merge_df = body_df.join(score_df, on="name", how="outer")
merge_df.show()
```
Output:
```
+-----+----+------+------+-------+----+-------+
| name| age|weight|height|Chinese|Math|English|
+-----+----+------+------+-------+----+-------+
| jike|null|  null|  null|     65|  58|     30|
|  Bom|  20|  97.6|   165|     88|  97|     90|
|  Joe|  24|  89.0|   162|     90| 100|     92|
|Alice|null|  null|  null|     85|  99|     92|
| kuke|  33| 190.0|   170|     77|  82|     80|
+-----+----+------+------+-------+----+-------+
```
Putting the previous cases together: outer (a full outer join) first takes the union of the name values from both datasets. Rows whose name appears on both sides are merged directly, and the columns of the side that lacks a row are filled with null.
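The conclusion can be summarised with set operations on the `name` values of the three cases. An outer join keeps the union of the keys, and an inner join keeps the intersection. The two coincide only when both sides have the same keys, which is why inner and outer gave identical results in case 1. The sketch below is plain Python, not Spark code, and `survivors` is a hypothetical helper.

```python
# Plain-Python summary (not Spark): which names survive inner vs outer joins.

def survivors(left, right):
    inner = set(left) & set(right)   # rows kept by how="inner"
    outer = set(left) | set(right)   # rows kept by how="outer"
    return inner, outer


all_names = ["Bom", "Alice", "kuke", "jike", "Joe"]
reduced = ["Bom", "kuke", "Joe"]     # rows 2 and 4 removed

cases = {"case 1": (all_names, all_names),
         "case 2": (all_names, reduced),
         "case 3": (reduced, all_names)}

for label, (left, right) in cases.items():
    inner, outer = survivors(left, right)
    padded = sorted(outer - inner)   # rows that get null-filled columns
    print(label, len(inner), len(outer), padded)
# case 1 5 5 []
# case 2 3 5 ['Alice', 'jike']
# case 3 3 5 ['Alice', 'jike']
```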