The overall idea is to connect to the relational database in parallel through partitions.
Implementation:
1. Load the drivers
The correct configuration:
--driver-class-path "driver_local_file_system_jdbc_driver1.jar:driver_local_file_system_jdbc_driver2.jar"
--class "spark.executor.extraClassPath=executors_local_file_system_jdbc_driver1.jar:executors_local_file_system_jdbc_driver2.jar"
To run the job from a notebook, set EXTRA_CLASSPATH before launching it, with the following command:
export EXTRA_CLASSPATH=path_to_the_first_jar:path_to_the_second_jar
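For reference, a minimal sketch of a complete spark-submit invocation combining both settings; the jar names, application class, and master URL below are placeholders, not values from this setup:

spark-submit \
  --master yarn \
  --class com.example.MyJdbcApp \
  --driver-class-path driver1.jar:driver2.jar \
  --conf "spark.executor.extraClassPath=driver1.jar:driver2.jar" \
  my-application.jar

--driver-class-path puts the JDBC drivers on the driver's classpath, while spark.executor.extraClassPath does the same on every executor. Both are needed because connections are opened on both sides: the driver resolves the schema, and the executors read the partitions.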
2. Load in parallel
There are two approaches:
1) uniform partitioning on a specified column
2) partitioning by user-defined predicates
Uniform partitioning on a specified column
The partition column must be of a numeric (integral) type.
Usage:
sqlctx.read.jdbc(url = "<URL>", table = "<TABLE>",
columnName = "<INTEGRAL_COLUMN_TO_PARTITION>",
lowerBound = minValue,
upperBound = maxValue,
numPartitions = 20,
connectionProperties = new java.util.Properties()
)
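lowerBound and upperBound only control how the partition stride is computed; rows outside the range are still read, but they all land in the first and last partitions, so the bounds should span the actual range of the column. As a sketch, assuming a numeric column named ID, the bounds can be fetched by pushing a MIN/MAX query down to the database through a subquery in the table parameter:

// Sketch: resolve the partition bounds from the database itself ("ID" is an assumed
// column name; some databases want the subquery alias written without AS)
val boundsRow = sqlctx.read.jdbc(
  url = "<URL>",
  table = "(SELECT MIN(ID) AS lo, MAX(ID) AS hi FROM <TABLE>) AS bounds",
  connectionProperties = new java.util.Properties()
).first()
// Go through Number to stay agnostic of the exact JDBC-to-Spark type mapping
val minValue = boundsRow.getAs[Number](0).longValue()
val maxValue = boundsRow.getAs[Number](1).longValue()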
Partitioning by user-defined predicates
Usage:
val predicates = Array(
  "2015-06-20" -> "2015-06-30",
  "2015-07-01" -> "2015-07-10",
  "2015-07-11" -> "2015-07-20",
  "2015-07-21" -> "2015-07-31"
).map { case (start, end) =>
  s"cast(DAT_TME as date) >= date '$start' AND cast(DAT_TME as date) <= date '$end'"
}
sqlctx.read.jdbc(url = "<URL>", table = "<TABLE>", predicates = predicates, connectionProperties = new java.util.Properties())
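Each predicate becomes the WHERE clause of one partition's query, so together the predicates should cover every row exactly once. They need not be written by hand; as a sketch, assuming a numeric ID column, one disjoint predicate per partition can be generated with a modulus:

// Sketch: generate one non-overlapping predicate per partition
// over an assumed numeric ID column
val numPartitions = 10
val predicates = (0 until numPartitions).map(i => s"MOD(ID, $numPartitions) = $i").toArray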
3. Table union
// The body below is a sketch (assumed): a plain single-connection JDBC read of one table
def readTable(table: String): DataFrame =
  sqlctx.read.jdbc(url = "<URL>", table = table, connectionProperties = new java.util.Properties())
List("<TABLE1>", "<TABLE2>", "<TABLE3>").par.map(readTable).reduce(_ unionAll _)
.par makes readTable run in parallel across the list rather than sequentially (unionAll is the Spark 1.x API; Spark 2.x renames it to union). The partitioned reads from step 2 compose with this, as sketched below.
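A sketch of that composition, assuming the three tables share the partition column and the bounds computed earlier:

// Sketch: each table is read through 20 partitioned connections,
// and the three reads are issued concurrently thanks to .par
def readTablePartitioned(table: String): DataFrame =
  sqlctx.read.jdbc(
    url = "<URL>",
    table = table,
    columnName = "<INTEGRAL_COLUMN_TO_PARTITION>",
    lowerBound = minValue,
    upperBound = maxValue,
    numPartitions = 20,
    connectionProperties = new java.util.Properties()
  )
val combined = List("<TABLE1>", "<TABLE2>", "<TABLE3>").par.map(readTablePartitioned).reduce(_ unionAll _)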
4. Map to a case class
case class MyClass(a: Long, b: String, c: Int, d: String, e: String)
import org.apache.spark.sql.Row

// Match each Row positionally by column type and build the case class
dataframe.map {
  case Row(a: java.math.BigDecimal, b: String, c: Int, _: String, d: java.sql.Date,
           e: java.sql.Date, _: java.sql.Timestamp, _: java.sql.Timestamp,
           _: java.math.BigDecimal, _: String) =>
    MyClass(a = a.longValue(), b = b, c = c, d = d.toString, e = e.toString)
}
This pattern match cannot handle records containing null values: null never matches a typed pattern such as b: String, so such rows raise a MatchError. They can be discarded beforehand with
dataframe.na.drop()
which drops every row containing a null.
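If the null-bearing rows must be kept rather than dropped, one alternative (a sketch; the column positions and the empty-string default are assumptions) is to read fields positionally through the Row accessor API and substitute defaults:

// Sketch: tolerate a null in column 1 (b) by substituting a default
// instead of dropping the whole row
val myData = dataframe.map { row =>
  MyClass(
    a = row.getAs[java.math.BigDecimal](0).longValue(),
    b = if (row.isNullAt(1)) "" else row.getString(1),
    c = row.getInt(2),
    d = row.getAs[java.sql.Date](4).toString,
    e = row.getAs[java.sql.Date](5).toString
  )
}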