JSON是一種半結構化的數據格式,最簡單的讀取方式是將數據作為文本文件讀取,然后使用JSON解析器來對RDD的值進行映射操作。
- 讀取JSON:將數據作為文本文件讀取,這種方法在所有的編程語言中都可以使用。該方法假設文件中每一行都是一條JSON記錄。如果數據跨行了,就需要讀取整個文件(例如使用wholeTextFiles()),然后對文件進行解析。
可以使用mapPartitions()來重用解析器:它對每一個分區進行操作,因此每個分區只需創建一個解析器實例。
Java中使用Jackson進行JSON操作。
Java中通常將記錄讀取到一個代表其結構的類中(即JavaBean)。
例子數據(每行一條JSON記錄):
{"name":"上海灘","singer":"葉麗儀","album":"香港電視劇主題歌","path":"mp3/shanghaitan.mp3"}
{"name":"一生何求","singer":"陳百強","album":"香港電視劇主題歌","path":"mp3/shanghaitan.mp3"}
{"name":"紅日","singer":"李克勤","album":"懷舊專輯","path":"mp3/shanghaitan.mp3"}
{"name":"愛如潮水","singer":"張信哲","album":"懷舊專輯","path":"mp3/airucaoshun.mp3"}
{"name":"紅茶館","singer":"陳惠嫻","album":"懷舊專輯","path":"mp3/redteabar.mp3"}
package spark_Function;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.codehaus.jackson.map.ObjectMapper;
/**
 * Reads a JSON-lines file into an RDD of {@link Mp3Info} records and prints each record.
 *
 * <p>Each line of the input file is expected to hold one complete JSON object; parsing is
 * delegated to {@link ParseJson}, which skips lines it cannot parse.
 */
public class json {

    /**
     * Runs a local Spark job over {@code G:/sparkRS/JSON.json}.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("MyMp3");
        // try-with-resources closes the context even when the job throws.
        try (JavaSparkContext jsc = new JavaSparkContext(conf)) {
            JavaRDD<String> input = jsc.textFile("G:/sparkRS/JSON.json");
            // mapPartitions lets ParseJson build one ObjectMapper per partition, not per line.
            JavaRDD<Mp3Info> result = input.mapPartitions(new ParseJson());
            // Keep the lambda: System.out::println would capture the non-serializable
            // PrintStream and make the Spark task unserializable.
            result.foreach(x -> System.out.println(x));
        }
    }
}
/**
 * Parses one partition of JSON-lines text into {@link Mp3Info} records.
 *
 * <p>Meant for {@code JavaRDD.mapPartitions}: a single {@link ObjectMapper} is created per
 * partition and reused for every line instead of being created per record.
 *
 * <p>Parsing is best-effort. Blank lines are ignored. Lines that parse to {@code null} are
 * dropped. Lines Jackson cannot map to {@link Mp3Info} are reported on standard error and
 * skipped rather than failing the task. For a job-wide count of bad records, combine this
 * with a Spark accumulator.
 */
class ParseJson implements FlatMapFunction<Iterator<String>, Mp3Info> {

    private static final long serialVersionUID = 8603650874403773926L;

    /**
     * Converts the lines of one partition into records.
     *
     * @param lines raw text lines of one partition, each expected to hold one JSON object
     * @return an iterator over the successfully parsed records, in input order
     * @throws Exception never thrown for malformed input; declared by {@link FlatMapFunction}
     */
    @Override
    public Iterator<Mp3Info> call(Iterator<String> lines) throws Exception {
        ArrayList<Mp3Info> mp3 = new ArrayList<>();
        ObjectMapper mapper = new ObjectMapper();
        while (lines.hasNext()) {
            String line = lines.next();
            if (line.trim().isEmpty()) {
                // Blank lines are not records, so they are not errors either.
                continue;
            }
            try {
                Mp3Info info = mapper.readValue(line, Mp3Info.class);
                // readValue can yield null (e.g. for the JSON literal null); keep nulls
                // out of the RDD so downstream getters/toString do not NPE.
                if (info != null) {
                    mp3.add(info);
                }
            } catch (java.io.IOException e) {
                // Malformed JSON or unmappable fields: skip the record, but do not hide it.
                System.err.println("ParseJson: skipping malformed record: " + line
                        + " (" + e.getMessage() + ")");
            }
        }
        return mp3.iterator();
    }
}
/**
 * JavaBean holding one song record read from the JSON input.
 *
 * <p>Jackson maps the JSON keys {@code name}, {@code album}, {@code path} and {@code singer}
 * onto the matching properties via the no-arg constructor, setters and getters. The class is
 * {@link Serializable}, presumably so Spark can serialize records when it needs to.
 */
class Mp3Info implements Serializable {

    private static final long serialVersionUID = -3811808269846588364L;

    // NOTE(review): keep this declaration order; Jackson may use it to order output properties.
    /** Value of the JSON "name" key. */
    private String name;
    /** Value of the JSON "album" key. */
    private String album;
    /** Value of the JSON "path" key. */
    private String path;
    /** Value of the JSON "singer" key. */
    private String singer;

    public String getName() {
        return this.name;
    }

    public void setName(String value) {
        this.name = value;
    }

    public String getAlbum() {
        return this.album;
    }

    public void setAlbum(String value) {
        this.album = value;
    }

    public String getPath() {
        return this.path;
    }

    public void setPath(String value) {
        this.path = value;
    }

    public String getSinger() {
        return this.singer;
    }

    public void setSinger(String value) {
        this.singer = value;
    }

    /** Returns a debug representation listing all four properties; unset values print as "null". */
    @Override
    public String toString() {
        return new StringBuilder("Mp3Info [")
                .append("name=").append(name)
                .append(", album=").append(album)
                .append(", path=").append(path)
                .append(", singer=").append(singer)
                .append(']')
                .toString();
    }
}
ObjectMapper類是Jackson庫的主要類,提供了在Java對象與JSON結構之間相互轉換的方法。
處理格式不正確的記錄可能會引起很嚴重的錯誤,尤其是對于像JSON這樣的半結構化數據來說。對于大規模的數據集來說格式錯誤很常見,所以如果選擇跳過格式錯誤的數據,應該使用累加器來跟蹤錯誤的數量。
- 保存JSON
寫出JSON一般不需要考慮格式錯誤的數據,并且也知道要寫出的數據類型。
讀取是將字符串RDD轉化為解析好的JSON數據;
寫出則是將由結構化數據組成的RDD轉為字符串RDD,然后使用文本文件API寫出去。
package spark_Function;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.codehaus.jackson.map.ObjectMapper;
/**
 * Reads a JSON-lines file of {@link Mp3Info} records, keeps those whose album is "懷舊專輯",
 * prints them, and writes them back out as JSON lines.
 */
public class json {

    /**
     * Runs a local Spark job: {@code G:/sparkRS/JSON.json} in, {@code G:/sparkRS/wjson} out.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("MyMp3");
        // try-with-resources closes the context even when the job throws.
        try (JavaSparkContext jsc = new JavaSparkContext(conf)) {
            JavaRDD<String> input = jsc.textFile("G:/sparkRS/JSON.json");
            // Constant-first equals: a record without an "album" key has a null album,
            // which would make x.getAlbum().equals(...) throw.
            JavaRDD<Mp3Info> result = input.mapPartitions(new ParseJson())
                    .filter(x -> "懷舊專輯".equals(x.getAlbum()));
            // result feeds two actions (foreach and saveAsTextFile); cache it so the file
            // is read and parsed only once.
            result.cache();
            JavaRDD<String> formatted = result.mapPartitions(new WriteJson());
            // Keep the lambda: System.out::println would capture a non-serializable PrintStream.
            result.foreach(x -> System.out.println(x));
            // NOTE(review): Spark normally refuses to write into an existing output
            // directory — remove G:/sparkRS/wjson between runs.
            formatted.saveAsTextFile("G:/sparkRS/wjson");
        }
    }
}
class WriteJson implements FlatMapFunction<Iterator<Mp3Info>, String> {
/**
*
*/
private static final long serialVersionUID = -6590868830029412793L;
public Iterator<String> call(Iterator<Mp3Info> song) throws Exception {
ArrayList<String> text = new ArrayList<String>();
ObjectMapper mapper = new ObjectMapper();
while (song.hasNext()) {
Mp3Info person = song.next();
text.add(mapper.writeValueAsString(person));
}
return text.iterator();
}
}
/**
 * JavaBean holding one song record read from, or written to, JSON.
 *
 * <p>Jackson maps the JSON keys {@code name}, {@code album}, {@code path} and {@code singer}
 * onto the matching properties via the no-arg constructor, setters and getters. The class is
 * {@link Serializable}, presumably so Spark can serialize records when it needs to.
 */
class Mp3Info implements Serializable {

    private static final long serialVersionUID = -3811808269846588364L;

    // NOTE(review): keep this declaration order; Jackson may use it to order output properties.
    /** Value of the JSON "name" key. */
    private String name;
    /** Value of the JSON "album" key. */
    private String album;
    /** Value of the JSON "path" key. */
    private String path;
    /** Value of the JSON "singer" key. */
    private String singer;

    public String getName() {
        return this.name;
    }

    public void setName(String value) {
        this.name = value;
    }

    public String getAlbum() {
        return this.album;
    }

    public void setAlbum(String value) {
        this.album = value;
    }

    public String getPath() {
        return this.path;
    }

    public void setPath(String value) {
        this.path = value;
    }

    public String getSinger() {
        return this.singer;
    }

    public void setSinger(String value) {
        this.singer = value;
    }

    /** Returns a debug representation listing all four properties; unset values print as "null". */
    @Override
    public String toString() {
        return new StringBuilder("Mp3Info [")
                .append("name=").append(name)
                .append(", album=").append(album)
                .append(", path=").append(path)
                .append(", singer=").append(singer)
                .append(']')
                .toString();
    }
}