Spark SQL(六):JDBC數(shù)據(jù)源

Spark SQL支持使用JDBC從關(guān)系型數(shù)據(jù)庫(比如MySQL)中讀取數(shù)據(jù)丧肴。讀取的數(shù)據(jù)偏塞,依然由DataFrame表示府瞄,可以很方便地使用Spark Core提供的各種算子進(jìn)行處理廉赔。

實(shí)際上使用Spark SQL處理JDBC中的數(shù)據(jù)是非常有用的青抛。比如說旗闽,我們的MySQL業(yè)務(wù)數(shù)據(jù)庫中,有大量的數(shù)據(jù)蜜另,比如3000萬适室,現(xiàn)在需要編寫一個程序,對線上的臟數(shù)據(jù)進(jìn)行某種復(fù)雜業(yè)務(wù)邏輯的處理(統(tǒng)計(jì)業(yè)務(wù)举瑰,算法變了后捣辆,就需要對所有數(shù)據(jù)重新統(tǒng)計(jì)),甚至復(fù)雜到可能要用Spark SQL反復(fù)查詢Hive中的數(shù)據(jù)此迅,來進(jìn)行關(guān)聯(lián)處理罪帖。

那么此時,用Spark SQL來通過JDBC數(shù)據(jù)源邮屁,加載MySQL中的數(shù)據(jù),然后通過各種算子進(jìn)行處理菠齿,是最好的選擇佑吝。因?yàn)镾park是分布式的計(jì)算框架,對于3000萬數(shù)據(jù)绳匀,肯定是分布式處理的芋忿。而如果我們寫一個Java程序炸客,那么只能分批次處理了(定時任務(wù)),先處理2萬條戈钢,再處理2萬條痹仙,可能運(yùn)行完你的Java程序,已經(jīng)是幾天以后的事情了殉了。

操作過程:
1开仰、 首先,是通過SQLContext的read系列方法薪铜,將mysql中的數(shù)據(jù)加載為DataFrame
2众弓、然后,可以將DataFrame轉(zhuǎn)換為RDD隔箍,使用Spark Core提供的各種算子進(jìn)行操作
3谓娃、最后,可以將得到的數(shù)據(jù)結(jié)果蜒滩,通過foreach()算子滨达,寫入mysql、hbase俯艰、redis等等db / cache中

package cn.spark.study.sql;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import scala.Tuple2;

/**
 * JDBC數(shù)據(jù)源
 */
public class JDBCDataSource {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("JDBCDataSource");  
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);
        
        // 分別將mysql中兩張表的數(shù)據(jù)加載為DataFrame
        Map<String, String> options = new HashMap<String, String>();
        options.put("url", "jdbc:mysql://hadoop1:3306/testdb");
        options.put("dbtable", "student_infos");
        DataFrame studentInfosDF = sqlContext.read().format("jdbc")
                .options(options).load();
    
        options.put("dbtable", "student_scores");
        DataFrame studentScoresDF = sqlContext.read().format("jdbc")
                .options(options).load();
        
        // 將兩個DataFrame轉(zhuǎn)換為JavaPairRDD捡遍,執(zhí)行join操作
        JavaPairRDD<String, Tuple2<Integer, Integer>> studentsRDD = 
                
                studentInfosDF.javaRDD().mapToPair(
                
                        new PairFunction<Row, String, Integer>() {
        
                            private static final long serialVersionUID = 1L;
                
                            @Override
                            public Tuple2<String, Integer> call(Row row) throws Exception {
                                return new Tuple2<String, Integer>(row.getString(0), 
                                        Integer.valueOf(String.valueOf(row.get(1))));  
                            }
                            
                        })
                .join(studentScoresDF.javaRDD().mapToPair(
                            
                        new PairFunction<Row, String, Integer>() {
        
                            private static final long serialVersionUID = 1L;
        
                            @Override
                            public Tuple2<String, Integer> call(Row row) throws Exception {
                                return new Tuple2<String, Integer>(String.valueOf(row.get(0)),
                                        Integer.valueOf(String.valueOf(row.get(1))));  
                            }
                            
                        }));
        
        // 將JavaPairRDD轉(zhuǎn)換為JavaRDD<Row>
        JavaRDD<Row> studentRowsRDD = studentsRDD.map(
                
                new Function<Tuple2<String,Tuple2<Integer,Integer>>, Row>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Row call(
                            Tuple2<String, Tuple2<Integer, Integer>> tuple)
                            throws Exception {
                        return RowFactory.create(tuple._1, tuple._2._1, tuple._2._2);
                    }
                    
                });
        
        // 過濾出分?jǐn)?shù)大于80分的數(shù)據(jù)
        JavaRDD<Row> filteredStudentRowsRDD = studentRowsRDD.filter(
                
                new Function<Row, Boolean>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Boolean call(Row row) throws Exception {
                        if(row.getInt(2) > 80) {
                            return true;
                        } 
                        return false;
                    }
                    
                });
        
        // 轉(zhuǎn)換為DataFrame
        List<StructField> structFields = new ArrayList<StructField>();
        structFields.add(DataTypes.createStructField("name", DataTypes.StringType, true));  
        structFields.add(DataTypes.createStructField("age", DataTypes.IntegerType, true)); 
        structFields.add(DataTypes.createStructField("score", DataTypes.IntegerType, true)); 
        StructType structType = DataTypes.createStructType(structFields);
        
        DataFrame studentsDF = sqlContext.createDataFrame(filteredStudentRowsRDD, structType);
        
        Row[] rows = studentsDF.collect();
        for(Row row : rows) {
            System.out.println(row);  
        }
        
        // 將DataFrame中的數(shù)據(jù)保存到mysql表中
        //也有可能是插入mysql、有可能是插入hbase蟆炊,還有可能是插入redis緩存
        studentsDF.javaRDD().foreach(new VoidFunction<Row>() {
            
            private static final long serialVersionUID = 1L;

            @Override
            public void call(Row row) throws Exception {
                String sql = "insert into good_student_infos values(" 
                        + "'" + String.valueOf(row.getString(0)) + "',"
                        + Integer.valueOf(String.valueOf(row.get(1))) + ","
                        + Integer.valueOf(String.valueOf(row.get(2))) + ")";   
                
                Class.forName("com.mysql.jdbc.Driver");  
                
                Connection conn = null;
                Statement stmt = null;
                try {
                    conn = DriverManager.getConnection(
                            "jdbc:mysql://hadoop1:3306/testdb", "", "");
                    stmt = conn.createStatement();
                    stmt.executeUpdate(sql);
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    if(stmt != null) {
                        stmt.close();
                    } 
                    if(conn != null) {
                        conn.close();
                    }
                }
            }
            
        }); 
        
        sc.close();
    }
}

Scala版本

val jdbcDF = sqlContext.read.format("jdbc").options( 
  Map("url" -> "jdbc:mysql://spark1:3306/testdb",
  "dbtable" -> "students")).load()
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末稽莉,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子涩搓,更是在濱河造成了極大的恐慌污秆,老刑警劉巖,帶你破解...
    沈念sama閱讀 222,807評論 6 518
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件昧甘,死亡現(xiàn)場離奇詭異良拼,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)充边,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 95,284評論 3 399
  • 文/潘曉璐 我一進(jìn)店門庸推,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人浇冰,你說我怎么就攤上這事贬媒。” “怎么了肘习?”我有些...
    開封第一講書人閱讀 169,589評論 0 363
  • 文/不壞的土叔 我叫張陵际乘,是天一觀的道長。 經(jīng)常有香客問我漂佩,道長脖含,這世上最難降的妖魔是什么罪塔? 我笑而不...
    開封第一講書人閱讀 60,188評論 1 300
  • 正文 為了忘掉前任,我火速辦了婚禮养葵,結(jié)果婚禮上征堪,老公的妹妹穿的比我還像新娘。我一直安慰自己关拒,他們只是感情好佃蚜,可當(dāng)我...
    茶點(diǎn)故事閱讀 69,185評論 6 398
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著夏醉,像睡著了一般爽锥。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上畔柔,一...
    開封第一講書人閱讀 52,785評論 1 314
  • 那天氯夷,我揣著相機(jī)與錄音,去河邊找鬼靶擦。 笑死腮考,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的玄捕。 我是一名探鬼主播踩蔚,決...
    沈念sama閱讀 41,220評論 3 423
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼枚粘!你這毒婦竟也來了馅闽?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 40,167評論 0 277
  • 序言:老撾萬榮一對情侶失蹤馍迄,失蹤者是張志新(化名)和其女友劉穎福也,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體攀圈,經(jīng)...
    沈念sama閱讀 46,698評論 1 320
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡暴凑,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,767評論 3 343
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了赘来。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片现喳。...
    茶點(diǎn)故事閱讀 40,912評論 1 353
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖犬辰,靈堂內(nèi)的尸體忽然破棺而出嗦篱,到底是詐尸還是另有隱情,我是刑警寧澤,帶...
    沈念sama閱讀 36,572評論 5 351
  • 正文 年R本政府宣布,位于F島的核電站藤韵,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏腿宰。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 42,254評論 3 336
  • 文/蒙蒙 一缘厢、第九天 我趴在偏房一處隱蔽的房頂上張望吃度。 院中可真熱鬧,春花似錦贴硫、人聲如沸椿每。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,746評論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽间护。三九已至,卻和暖如春挖诸,著一層夾襖步出監(jiān)牢的瞬間汁尺,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,859評論 1 274
  • 我被黑心中介騙來泰國打工多律, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留痴突,地道東北人。 一個月前我還...
    沈念sama閱讀 49,359評論 3 379
  • 正文 我出身青樓狼荞,卻偏偏與公主長得像辽装,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子相味,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,922評論 2 361

推薦閱讀更多精彩內(nèi)容