Reading MySQL data as a DataFrame
import java.io.FileInputStream
import java.text.SimpleDateFormat
import java.util.{Calendar, Date, Properties}
import com.iptv.domain.DatePattern
import com.iptv.job.JobBase
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.DoubleType
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
import org.apache.spark.{SparkConf, SparkContext}
/**
 * Load the properties file.
 *
 * @param proPath path to the properties file
 * @return the loaded Properties instance
 */
def getProPerties(proPath: String): Properties = {
  val properties: Properties = new Properties()
  val in = new FileInputStream(proPath)
  try properties.load(in) finally in.close() // close the stream to avoid leaking a file descriptor
  properties
}
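For a quick sanity check, the helper can be exercised on its own; the path below is only a placeholder for illustration, not from the original post:

// Hypothetical smoke test for getProPerties; point the path at your own properties file
val props: Properties = getProPerties("/data/conf/mysql.properties")
println(props.getProperty("mysql.url")) // should print the JDBC URL from the config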
/**
 * Read a MySQL table into a DataFrame.
 *
 * @param sqlContext
 * @param tableName name of the MySQL table to read
 * @param proPath   path to the properties file
 * @return the MySQL table as a DataFrame
 */
def readMysqlTable(sqlContext: SQLContext, tableName: String, proPath: String): DataFrame = {
  val properties: Properties = getProPerties(proPath)
  sqlContext
    .read
    .format("jdbc")
    .option("url", properties.getProperty("mysql.url"))
    .option("driver", properties.getProperty("mysql.driver"))
    .option("user", properties.getProperty("mysql.username"))
    .option("password", properties.getProperty("mysql.password"))
    // .option("dbtable", tableName.toUpperCase)
    .option("dbtable", tableName)
    .load()
}
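For large tables, this single-connection read can become a bottleneck. As a sketch (not part of the original code), Spark's built-in JDBC partitioning options can spread the read across executors; the column name `id` and the bounds below are assumptions you would replace with a real numeric column and its actual range:

// Sketch: parallel JDBC read using Spark's standard partitioning options.
// "id", the bounds, and numPartitions are placeholder assumptions.
def readMysqlTablePartitioned(sqlContext: SQLContext, tableName: String, proPath: String): DataFrame = {
  val properties: Properties = getProPerties(proPath)
  sqlContext
    .read
    .format("jdbc")
    .option("url", properties.getProperty("mysql.url"))
    .option("driver", properties.getProperty("mysql.driver"))
    .option("user", properties.getProperty("mysql.username"))
    .option("password", properties.getProperty("mysql.password"))
    .option("dbtable", tableName)
    .option("partitionColumn", "id") // assumed numeric, indexed column
    .option("lowerBound", "1")       // assumed minimum value of that column
    .option("upperBound", "1000000") // assumed maximum value of that column
    .option("numPartitions", "8")    // one JDBC connection per partition
    .load()
}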
/**
 * Read a MySQL table into a DataFrame, applying a filter condition.
 *
 * @param sqlContext
 * @param table           name of the MySQL table to read
 * @param filterCondition filter condition (the body of a SQL WHERE clause)
 * @param proPath         path to the properties file
 * @return the filtered MySQL table as a DataFrame
 */
def readMysqlTable(sqlContext: SQLContext, table: String, filterCondition: String, proPath: String): DataFrame = {
  val properties: Properties = getProPerties(proPath)
  // Embed the condition in a subquery so the derived table acts as the data source
  val tableName = s"(select * from $table where $filterCondition) as t1"
  sqlContext
    .read
    .format("jdbc")
    .option("url", properties.getProperty("mysql.url"))
    .option("driver", properties.getProperty("mysql.driver"))
    .option("user", properties.getProperty("mysql.username"))
    .option("password", properties.getProperty("mysql.password"))
    // .option("dbtable", tableName.toUpperCase)
    .option("dbtable", tableName)
    .load()
}
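Concretely, this overload hands MySQL a derived table instead of a bare table name, so the WHERE clause runs on the database side and only matching rows travel to Spark. For example, with a hypothetical task_id of 123, the string bound to the dbtable option would be:

// dbtable value built for table = "TEST_TABLE", filterCondition = "task_id=123":
// (select * from TEST_TABLE where task_id=123) as t1

Since the condition is concatenated into raw SQL, it should only ever come from trusted values, never from user input.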
Usage examples
// Without a filter condition
val conf: SparkConf = new SparkConf().setAppName(getClass.getSimpleName)
val sc: SparkContext = new SparkContext(conf)
val sqlContext: SQLContext = getSQLContext(sc)
import sqlContext.implicits._
val test_table_dataFrame: DataFrame = readMysqlTable(sqlContext, "TEST_TABLE", proPath).persist(PERSIST_LEVEL)
----------------------------------------------------------------------------------------------------
// With a filter condition
// Fetch the rows where task_id = ${task_id} as a DataFrame
val conf: SparkConf = new SparkConf().setAppName(getClass.getSimpleName)
val sc: SparkContext = new SparkContext(conf)
val sqlContext: SQLContext = getSQLContext(sc)
import sqlContext.implicits._
val test_table_dataFrame = readMysqlTable(sqlContext, "TEST_TABLE", s"task_id=${task_id}", configPath)
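The imports at the top also bring in SaveMode, which covers the mirror-image write path. A minimal sketch of writing a DataFrame back to MySQL reusing the same properties file; the helper and the target table TEST_RESULT are assumptions, not from the original post:

// Sketch: write a DataFrame back to MySQL with the same config file.
// "TEST_RESULT" is a placeholder target table.
def writeMysqlTable(df: DataFrame, tableName: String, proPath: String): Unit = {
  val properties: Properties = getProPerties(proPath)
  val connProps = new Properties()
  connProps.setProperty("driver", properties.getProperty("mysql.driver"))
  connProps.setProperty("user", properties.getProperty("mysql.username"))
  connProps.setProperty("password", properties.getProperty("mysql.password"))
  df.write
    .mode(SaveMode.Append) // Append keeps existing rows; Overwrite replaces the table
    .jdbc(properties.getProperty("mysql.url"), tableName, connProps)
}

writeMysqlTable(test_table_dataFrame, "TEST_RESULT", configPath)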
Excerpt from the properties file
# MySQL database configuration
mysql.driver=com.mysql.jdbc.Driver
mysql.url=jdbc:mysql://0.0.0.0:3306/iptv?useSSL=false&autoReconnect=true&failOverReadOnly=false&rewriteBatchedStatements=true
mysql.username=lillclol
mysql.password=123456
#hive
hive.root_path=hdfs://ns1/user/hive/warehouse/
This is an original summary from my day-to-day work. Please credit the source when reposting!