本文介紹了使用Spark連接Mysql的五種方式宠默。
主要內(nèi)容:
- 不指定查詢條件
- 指定數(shù)據(jù)庫字段的范圍
- 根據(jù)任意字段進行分區(qū)
- 通過load獲取,和方式二類似
- 加載條件查詢后的數(shù)據(jù)
1.不指定查詢條件
def main(args: Array[String]): Unit = {
val spark =
SparkSession.builder()
.appName("MysqlSupport")
.master("local[2]")
.getOrCreate()
method1(spark)
//method2(spark)
//method3(spark)
//method4(spark)
//method5(spark)
}
/**
* 方式一:不指定查詢條件
* 所有的數(shù)據(jù)由RDD的一個分區(qū)處理灵巧,如果你這個表很大搀矫,很可能會出現(xiàn)OOM
*
* @param spark
*/
def method1(spark: SparkSession): Unit = {
val url = "jdbc:mysql://127.0.0.1:3306/test?user=root&password=root"
val prop = new Properties()
val df = spark.read.jdbc(url, "t_score", prop)
println(df.count())
println(df.rdd.partitions.size)
df.createOrReplaceTempView("t_score")
import spark.sql
sql("select * from t_score where score=98").show()
}
2.指定數(shù)據(jù)庫字段的范圍
/**
* 方式二:指定數(shù)據(jù)庫字段的范圍
* 通過lowerBound和upperBound 指定分區(qū)的范圍
* 通過columnName 指定分區(qū)的列(只支持整形)
* 通過numPartitions 指定分區(qū)數(shù)量 (不宜過大)
*
* @param spark
*/
def method2(spark: SparkSession): Unit = {
val lowerBound = 1
val upperBound = 100000
val numPartitions = 5
val url = "jdbc:mysql://127.0.0.1:3306/test?user=root&password=root"
val prop = new Properties()
val df = spark.read.jdbc(url, "t_score", "id", lowerBound, upperBound, numPartitions, prop)
println(df.count())
println(df.rdd.partitions.size)
}
3.根據(jù)任意字段進行分區(qū)
/**
* 方式三:根據(jù)任意字段進行分區(qū)
* 通過predicates將數(shù)據(jù)根據(jù)score分為2個區(qū)
*
* @param spark
*/
def method3(spark: SparkSession): Unit = {
val predicates = Array[String]("score <= 97", "score > 97 and score <= 100")
val url = "jdbc:mysql://127.0.0.1:3306/test?user=root&password=root"
val prop = new Properties()
val df = spark.read.jdbc(url, "t_score", predicates, prop)
println(df.count())
println(df.rdd.partitions.size)
import spark.sql
df.createOrReplaceTempView("t_score")
sql("select * from t_score").show()
}
4.通過load獲取,和方式二類似
/**
* 方式四:通過load獲取刻肄,和方式二類似
* @param spark
*/
def method4(spark: SparkSession): Unit = {
val url = "jdbc:mysql://127.0.0.1:3306/test?user=root&password=root"
val df = spark.read.format("jdbc").options(Map("url" -> url, "dbtable" -> "t_score")).load()
println(df.count())
println(df.rdd.partitions.size)
import spark.sql
df.createOrReplaceTempView("t_score")
sql("select * from t_score").show()
}
5.加載條件查詢后的數(shù)據(jù)
/**
* 方式五:加載條件查詢后的數(shù)據(jù)
* @param spark
*/
def method5(spark: SparkSession): Unit = {
val url = "jdbc:mysql://127.0.0.1:3306/test?user=root&password=root"
val df = spark.read.format("jdbc").options(Map("url" -> url, "dbtable" -> "(SELECT s.*,u.name FROM t_score s JOIN t_user u ON s.id=u.score_id) t_score")).load()
println(df.count())
println(df.rdd.partitions.size)
import spark.sql
df.createOrReplaceTempView("t_score")
sql("select * from t_score").show()
Thread.sleep(60 * 1000)
}