很多人在spark中使用默認(rèn)提供的jdbc方法時,在數(shù)據(jù)庫數(shù)據(jù)較大時經(jīng)常發(fā)現(xiàn)任務(wù) hang 住,其實是單線程任務(wù)過重導(dǎo)致,這時候需要提高讀取的并發(fā)度濒憋。
下文以mysql
為例進(jìn)行說明。
在spark中使用jdbc
在 spark-env.sh
文件中加入:
export SPARK_CLASSPATH=/path/mysql-connector-java-5.1.34.jar
任務(wù)提交時加入:
--jars /path/mysql-connector-java-5.1.34.jar
1. 單partition(無并發(fā))
調(diào)用函數(shù)
def jdbc(url: String, table: String, properties: Properties): DataFrame
使用:
val url = "jdbc:mysql://mysqlHost:3306/database"
val tableName = "table"
// 設(shè)置連接用戶&密碼
val prop = new java.util.Properties
prop.setProperty("user","username")
prop.setProperty("password","pwd")
// 取得該表數(shù)據(jù)
val jdbcDF = sqlContext.read.jdbc(url,tableName,prop)
// 一些操作
....
查看并發(fā)度
jdbcDF.rdd.partitions.size # 結(jié)果返回 1
該操作的并發(fā)度為1陶夜,你所有的數(shù)據(jù)都會在一個partition中進(jìn)行操作凛驮,意味著無論你給的資源有多少,只有一個task會執(zhí)行任務(wù)条辟,執(zhí)行效率可想而之黔夭,并且在稍微大點(diǎn)的表中進(jìn)行操作分分鐘就會OOM。
更直觀的說法是羽嫡,達(dá)到千萬級別的表就不要使用該操作本姥,count
操作就要等一萬年,no zuo no die ,don't to try !
WARN TaskSetManager: Lost task 0.0 in stage 6.0 (TID 56, spark047219):
java.lang.OutOfMemoryError: GC overhead limit exceeded
at com.mysql.jdbc.MysqlIO.reuseAndReadPacket(MysqlIO.java:3380)
2. 根據(jù)Long類型字段分區(qū)
調(diào)用函數(shù)
def jdbc(
url: String,
table: String,
columnName: String, # 根據(jù)該字段分區(qū)杭棵,需要為整形婚惫,比如id等
lowerBound: Long, # 分區(qū)的下界
upperBound: Long, # 分區(qū)的上界
numPartitions: Int, # 分區(qū)的個數(shù)
connectionProperties: Properties): DataFrame
使用:
val url = "jdbc:mysql://mysqlHost:3306/database"
val tableName = "table"
val columnName = "colName"
val lowerBound = 1,
val upperBound = 10000000,
val numPartitions = 10,
// 設(shè)置連接用戶&密碼
val prop = new java.util.Properties
prop.setProperty("user","username")
prop.setProperty("password","pwd")
// 取得該表數(shù)據(jù)
val jdbcDF = sqlContext.read.jdbc(url,tableName,columnName,lowerBound,upperBound,numPartitions,prop)
// 一些操作
....
查看并發(fā)度
jdbcDF.rdd.partitions.size # 結(jié)果返回 10
該操作將字段 colName
中1-10000000條數(shù)據(jù)分到10個partition中,使用很方便魂爪,缺點(diǎn)也很明顯先舷,只能使用整形數(shù)據(jù)字段作為分區(qū)關(guān)鍵字。
3000w數(shù)據(jù)的表 count
跨集群操作只要2s滓侍。
3. 根據(jù)任意類型字段分區(qū)
調(diào)用函數(shù)
jdbc(
url: String,
table: String,
predicates: Array[String],
connectionProperties: Properties): DataFrame
下面以使用最多的時間字段分區(qū)為例:
val url = "jdbc:mysql://mysqlHost:3306/database"
val tableName = "table"
// 設(shè)置連接用戶&密碼
val prop = new java.util.Properties
prop.setProperty("user","username")
prop.setProperty("password","pwd")
/**
* 將9月16-12月15三個月的數(shù)據(jù)取出蒋川,按時間分為6個partition
* 為了減少事例代碼,這里的時間都是寫死的
* modified_time 為時間字段
*/
val predicates =
Array(
"2015-09-16" -> "2015-09-30",
"2015-10-01" -> "2015-10-15",
"2015-10-16" -> "2015-10-31",
"2015-11-01" -> "2015-11-14",
"2015-11-15" -> "2015-11-30",
"2015-12-01" -> "2015-12-15"
).map {
case (start, end) =>
s"cast(modified_time as date) >= date '$start' " + s"AND cast(modified_time as date) <= date '$end'"
}
// 取得該表數(shù)據(jù)
val jdbcDF = sqlContext.read.jdbc(url,tableName,predicates,prop)
// 一些操作
....
查看并發(fā)度
jdbcDF.rdd.partitions.size # 結(jié)果返回 6
該操作的每個分區(qū)數(shù)據(jù)都由該段時間的分區(qū)組成撩笆,這種方式適合各種場景捺球,較為推薦缸浦。
結(jié)語
以
mysql
3000W 數(shù)據(jù)量表為例,單分區(qū)count
氮兵,僵死若干分鐘報OOM裂逐。
分成5-20個分區(qū)后,
count
操作只需要2s
高并發(fā)度可以大幅度提高讀取以及處理數(shù)據(jù)的速度胆剧,但是如果設(shè)置過高(大量的partition同時讀取)也可能會將數(shù)據(jù)源數(shù)據(jù)庫弄掛絮姆。