Spark SQL supports reading data from relational databases (such as MySQL) over JDBC. The loaded data is again represented as a DataFrame, so it can easily be processed with the various operators that Spark Core provides.
In practice, using Spark SQL to process data behind JDBC is very useful. Suppose, for example, that our MySQL business database holds a large amount of data, say 30 million rows, and we need to write a program that applies some complex business logic to the raw production data (a statistics job: whenever the algorithm changes, all of the data has to be recomputed). It may even be complex enough that Spark SQL has to repeatedly query data in Hive and join it in.
In that situation, loading the MySQL data through Spark SQL's JDBC data source and then processing it with the various operators is the best choice. Spark is a distributed computing framework, so 30 million rows can be processed in parallel across the cluster. A plain Java program, by contrast, could only work through the data in batches (as a scheduled task), 20,000 rows at a time, and might take days to finish.
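One caveat before the walkthrough: unless partitioning options are supplied, the JDBC data source reads the whole table through a single partition, so a 30-million-row table would not actually be scanned in parallel. Below is a minimal sketch of a partitioned read, assuming the table has a numeric primary key column named id (the column name, bounds, and partition count are illustrative assumptions, not part of the original example) and using the same SQLContext set up in the full example further down:

Map<String, String> options = new HashMap<String, String>();
options.put("url", "jdbc:mysql://hadoop1:3306/testdb");
options.put("dbtable", "student_infos");
// Split the read into 10 range queries over the id column.
// "id", the bounds and numPartitions are assumptions for illustration.
options.put("partitionColumn", "id");
options.put("lowerBound", "1");
options.put("upperBound", "30000000");
options.put("numPartitions", "10");
DataFrame studentInfosDF = sqlContext.read().format("jdbc")
        .options(options).load();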
Workflow:
1. First, load the MySQL data as a DataFrame through SQLContext's read methods.
2. Then convert the DataFrame to an RDD and process it with the various operators provided by Spark Core.
3. Finally, write the resulting data to MySQL, HBase, Redis, or some other database / cache, using the foreach() operator.
The complete Java example below walks through these three steps.
package cn.spark.study.sql;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import scala.Tuple2;
/**
 * JDBC data source example
 */
public class JDBCDataSource {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("JDBCDataSource");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);

        // Load the two MySQL tables as separate DataFrames
        // (add "user" / "password" entries here if your MySQL server requires credentials)
        Map<String, String> options = new HashMap<String, String>();
        options.put("url", "jdbc:mysql://hadoop1:3306/testdb");
        options.put("dbtable", "student_infos");
        DataFrame studentInfosDF = sqlContext.read().format("jdbc")
                .options(options).load();

        options.put("dbtable", "student_scores");
        DataFrame studentScoresDF = sqlContext.read().format("jdbc")
                .options(options).load();
        // Convert both DataFrames to JavaPairRDDs keyed by student name, then join them
        JavaPairRDD<String, Tuple2<Integer, Integer>> studentsRDD =
                studentInfosDF.javaRDD().mapToPair(
                        new PairFunction<Row, String, Integer>() {
                            private static final long serialVersionUID = 1L;

                            @Override
                            public Tuple2<String, Integer> call(Row row) throws Exception {
                                return new Tuple2<String, Integer>(row.getString(0),
                                        Integer.valueOf(String.valueOf(row.get(1))));
                            }
                        })
                        .join(studentScoresDF.javaRDD().mapToPair(
                                new PairFunction<Row, String, Integer>() {
                                    private static final long serialVersionUID = 1L;

                                    @Override
                                    public Tuple2<String, Integer> call(Row row) throws Exception {
                                        return new Tuple2<String, Integer>(String.valueOf(row.get(0)),
                                                Integer.valueOf(String.valueOf(row.get(1))));
                                    }
                                }));
        // Convert the JavaPairRDD into a JavaRDD<Row>
        JavaRDD<Row> studentRowsRDD = studentsRDD.map(
                new Function<Tuple2<String, Tuple2<Integer, Integer>>, Row>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Row call(
                            Tuple2<String, Tuple2<Integer, Integer>> tuple)
                            throws Exception {
                        return RowFactory.create(tuple._1, tuple._2._1, tuple._2._2);
                    }
                });
        // Keep only the students whose score is above 80
        JavaRDD<Row> filteredStudentRowsRDD = studentRowsRDD.filter(
                new Function<Row, Boolean>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Boolean call(Row row) throws Exception {
                        return row.getInt(2) > 80;
                    }
                });
        // Convert the filtered RDD back to a DataFrame
        List<StructField> structFields = new ArrayList<StructField>();
        structFields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
        structFields.add(DataTypes.createStructField("age", DataTypes.IntegerType, true));
        structFields.add(DataTypes.createStructField("score", DataTypes.IntegerType, true));
        StructType structType = DataTypes.createStructType(structFields);
        DataFrame studentsDF = sqlContext.createDataFrame(filteredStudentRowsRDD, structType);

        Row[] rows = studentsDF.collect();
        for(Row row : rows) {
            System.out.println(row);
        }
        // Save the DataFrame's rows to a MySQL table.
        // The same foreach() pattern could also write to HBase, Redis, or another cache instead of MySQL.
        studentsDF.javaRDD().foreach(new VoidFunction<Row>() {
            private static final long serialVersionUID = 1L;

            @Override
            public void call(Row row) throws Exception {
                String sql = "insert into good_student_infos values("
                        + "'" + String.valueOf(row.getString(0)) + "',"
                        + Integer.valueOf(String.valueOf(row.get(1))) + ","
                        + Integer.valueOf(String.valueOf(row.get(2))) + ")";

                Class.forName("com.mysql.jdbc.Driver");

                Connection conn = null;
                Statement stmt = null;
                try {
                    conn = DriverManager.getConnection(
                            "jdbc:mysql://hadoop1:3306/testdb", "", "");
                    stmt = conn.createStatement();
                    stmt.executeUpdate(sql);
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    if(stmt != null) {
                        stmt.close();
                    }
                    if(conn != null) {
                        conn.close();
                    }
                }
            }
        });
        sc.close();
    }

}
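If you are on Spark 1.4 or later, the row-by-row JDBC inserts in the foreach() above can also be replaced with the DataFrame writer, which opens one connection per partition instead of one per row. A minimal sketch against the same good_student_infos table (the empty user/password strings simply mirror the example above):

// requires: import java.util.Properties; import org.apache.spark.sql.SaveMode;
Properties connProps = new Properties();
connProps.setProperty("user", "");
connProps.setProperty("password", "");
// Append the filtered rows to good_student_infos through the JDBC data source
studentsDF.write()
        .mode(SaveMode.Append)
        .jdbc("jdbc:mysql://hadoop1:3306/testdb", "good_student_infos", connProps);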
Scala version
val jdbcDF = sqlContext.read.format("jdbc").options(
  Map("url" -> "jdbc:mysql://spark1:3306/testdb",
      "dbtable" -> "students")).load()