背景
用Spark作數(shù)據(jù)計算框架厦画,將計算結(jié)果寫入傳統(tǒng)關(guān)系數(shù)據(jù)庫疮茄,例如MySQL,供業(yè)務(wù)查詢根暑,這是工作中經(jīng)常使用的模式力试。
在寫入MySQL時,經(jīng)常要加個自增的ID字段排嫌。
第一種方案畸裳,可以手動創(chuàng)建數(shù)據(jù)表,定義自增ID字段躏率,Spark寫入時用追加模式躯畴,ID設(shè)為空即可民鼓。
第二種方案,Spark寫之前就生成好自增ID蓬抄,直接覆蓋寫入MySQL丰嘉。
實際中,我們使用更多的是覆蓋寫入(自動創(chuàng)建表)嚷缭,所以本文介紹一下方案二的實現(xiàn)饮亏。
實現(xiàn)
- Schema添加一列:ID
DataFrame df = ...
StructType schema = df.schema().add(DataTypes.createStructField("id", DataTypes.LongType, false));
- 使用RDD的zipWithIndex得到索引,作為ID值:
JavaRDD<Row> rdd = df
.javaRDD() // 轉(zhuǎn)為JavaRDD
.zipWithIndex() // 添加索引阅爽,結(jié)果為JavaPairRDD<Row, Long>路幸,即行數(shù)據(jù)和對應(yīng)的索引
.map(new Function<Tuple2<Row, Long>, Row>() {
@Override
public Row call(Tuple2<Row, Long> v1) throws Exception {
Object[] objects = new Object[v1._1.size() + 1];
for (int i = 0; i < v1._1.size(); i++) {
objects[i] = v1._1.get(i);
}
objects[objects.length - 1] = v1._2;
return RowFactory.create(objects);
}
}); // 把索引值作為ID字段值,構(gòu)造新的行數(shù)據(jù)
- 將RDD再轉(zhuǎn)回DataFrame
df = sqlContext.createDataFrame(rdd, schema);
- 使用Overwrite模式寫入MySQL
Properties props = new Properties();
props.setProperty("user", "user");
props.setProperty("password", "password");
props.setProperty("driver", "com.mysql.jdbc.Driver"));
df
.write()
.mode(SaveMode.Overwrite) // 覆蓋模式付翁,自動創(chuàng)建表
.jdbc("jdbcUrl", "tableName", props);