1 定義數(shù)據(jù)庫連接
Class.forName("com.mysql.jdbc.Driver").newInstance()
val DB_URL_R = "jdbc:mysql://10.1.11.18/medical_waste?useSSL=false&characterEncoding=utf8"
val PROPERTIES = new java.util.Properties()
PROPERTIES.setProperty("user", "********")
PROPERTIES.setProperty("password", "********")
2 讀取兩個(gè)表
val rfidCardDF = spark.read.jdbc(DB_URL_R, "t_rfid_card", Array("card_type = 22"), PROPERTIES).select("card_id","card_label").cache()
val medicalWasteDF = spark.read.jdbc(DB_URL_R, "t_medical_waste", Array("YEAR(rec_ts) = 2017"), PROPERTIES).select("team_id","mw_weight").cache()
3 連接
val df2 = medicalWasteDF.join(rfidCardDF,medicalWasteDF("team_id") equalTo(rfidCardDF("card_id"))).drop("card_id").cache()
使用join引有,默認(rèn)是left out join刃宵。條件判斷是相等。然后刪除掉一個(gè)重復(fù)的列card_id刺啦。
4 統(tǒng)計(jì)
df2.groupBy("card_label").agg(sum("mw_weight")).orderBy(col("sum(mw_weight)").desc).show()?
group by操作埃疫,生成一個(gè)新的數(shù)據(jù)集伏恐,增加了一列sum操作,生成一個(gè)默認(rèn)列名sum(mw_weight)的列栓霜,然后倒序排個(gè)序翠桦。