Spark 學習筆記(三)-數(shù)據(jù)讀存-JSON

JSON是一種半結(jié)構(gòu)化的數(shù)據(jù)格式虐唠,最簡單的讀取方式是將數(shù)據(jù)作為文本文件讀取,然后使用JSON解析器來對RDD的值進行映射操作辆床。

  • 讀取JSON:將數(shù)據(jù)作為文本文件讀取说订,這種方法在所有的編程語言中都可以使用。方法假設(shè)文件中每一行都是一條JSON記錄坐求。如果數(shù)據(jù)跨行了蚕泽,就需要讀取整個文件,然后對文件進行解析桥嗤。
    可以使用mapPartitions()來重用解析器须妻,這個對每一個分區(qū)進行操作。
    Java中使用Jackson進行JSON操作泛领。
    Java中通常將記錄讀取到一個代表結(jié)構(gòu)的類(與JavaBean同)
例子數(shù)據(jù)
{"name":"上海灘","singer":"葉麗儀","album":"香港電視劇主題歌","path":"mp3/shanghaitan.mp3"}
{"name":"一生何求","singer":"陳百強","album":"香港電視劇主題歌","path":"mp3/shanghaitan.mp3"}
{"name":"紅日","singer":"李克勤","album":"懷舊專輯","path":"mp3/shanghaitan.mp3"}
{"name":"愛如潮水","singer":"張信哲","album":"懷舊專輯","path":"mp3/airucaoshun.mp3"}
{"name":"紅茶館","singer":"陳惠嫻","album":"懷舊專輯","path":"mp3/redteabar.mp3"}
package spark_Function;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.codehaus.jackson.map.ObjectMapper;




public class json {

    public static void main(String[] args) {
        // TODO 自動生成的方法存根

        SparkConf conf = new SparkConf().setMaster("local").setAppName("MyMp3");
        JavaSparkContext jsc = new JavaSparkContext(conf);
                JavaRDD<String> input = jsc.textFile("G:/sparkRS/JSON.json");
        JavaRDD<Mp3Info> result = input.mapPartitions(new ParseJson());
        result.foreach(x -> System.out.println(x));


        jsc.close();

    }

}



class ParseJson implements FlatMapFunction<Iterator<String>,Mp3Info>{
    /**
     * 
     */
    private static final long serialVersionUID = 8603650874403773926L;

    @Override
    public Iterator<Mp3Info> call(Iterator<String> lines) throws Exception {
        // TODO 自動生成的方法存根
        ArrayList<Mp3Info> mp3 = new ArrayList<Mp3Info>();
        ObjectMapper mapper = new ObjectMapper();
        while(lines.hasNext()){
                String line = lines.next();
            try{
                mp3.add(mapper.readValue(line, Mp3Info.class));
            }catch(Exception e){


            }       
        }
        return mp3.iterator();
    }

}




class Mp3Info implements Serializable{
    private static final long serialVersionUID = -3811808269846588364L;
    private String name;
    private String album;
    private String path;
    private String singer;

    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public String getAlbum() {
        return album;
    }
    public void setAlbum(String album) {
        this.album = album;
    }
    public String getPath() {
        return path;
    }
    public void setPath(String path) {
        this.path = path;
    }
    public String getSinger() {
        return singer;
    }
    public void setSinger(String singer) {
        this.singer = singer;
    }
    @Override
    public String toString() {
        return "Mp3Info [name=" + name + ", album=" 
                 + album + ", path=" + path + ", singer=" + singer + "]";
    }

}

ObjectMapper類是Jackson庫的主要類荒吏,提供方法將java對象與json結(jié)構(gòu)匹配,

處理格式不正確的記錄可能會引起很要中的錯誤渊鞋,尤其是像JSON這樣的半結(jié)構(gòu)化數(shù)據(jù)來說绰更。對于大規(guī)模的數(shù)據(jù)集來說格式錯誤很常見,所以如果選這跳過錯誤的數(shù)據(jù)應該使用累加器來跟蹤錯誤锡宋。

  • 保存JSON
    寫出JSON一般不需要考慮格式錯誤的數(shù)據(jù)儡湾,并且也知道要寫出的數(shù)據(jù)類型,
    讀是將字符串RDD轉(zhuǎn)化為解析好的JSON數(shù)據(jù)
    寫由結(jié)構(gòu)化的數(shù)據(jù)組成的RDD轉(zhuǎn)為字符串RDD执俩,然后使用文本文件API寫出去
package spark_Function;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.codehaus.jackson.map.ObjectMapper;




public class json {

    public static void main(String[] args) {
        // TODO 自動生成的方法存根

        SparkConf conf = new SparkConf().setMaster("local").setAppName("MyMp3");
        JavaSparkContext jsc = new JavaSparkContext(conf);


        JavaRDD<String> input = jsc.textFile("G:/sparkRS/JSON.json");
        JavaRDD<Mp3Info> result = input.mapPartitions(new ParseJson()).
                                      filter(
                                          x->x.getAlbum().equals("懷舊專輯")
                                      );
        JavaRDD<String> formatted = result.mapPartitions(new WriteJson());
        result.foreach(x->System.out.println(x));
        formatted.saveAsTextFile("G:/sparkRS/wjson");

        jsc.close();

    }

}

class WriteJson implements FlatMapFunction<Iterator<Mp3Info>, String> {
    /**
     * 
     */
    private static final long serialVersionUID = -6590868830029412793L;

    public Iterator<String> call(Iterator<Mp3Info> song) throws Exception {
        ArrayList<String> text = new ArrayList<String>();
        ObjectMapper mapper = new ObjectMapper();
        while (song.hasNext()) {
            Mp3Info person = song.next();
            text.add(mapper.writeValueAsString(person));
        }
        return text.iterator();
    }
}



class Mp3Info implements Serializable{
    private static final long serialVersionUID = -3811808269846588364L;
    private String name;
    private String album;
    private String path;
    private String singer;

    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public String getAlbum() {
        return album;
    }
    public void setAlbum(String album) {
        this.album = album;
    }
    public String getPath() {
        return path;
    }
    public void setPath(String path) {
        this.path = path;
    }
    public String getSinger() {
        return singer;
    }
    public void setSinger(String singer) {
        this.singer = singer;
    }
    @Override
    public String toString() {
        return "Mp3Info [name=" + name + ", album=" 
                 + album + ", path=" + path + ", singer=" + singer + "]";
    }

}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末徐钠,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子奠滑,更是在濱河造成了極大的恐慌丹皱,老刑警劉巖妒穴,帶你破解...
    沈念sama閱讀 218,640評論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異摊崭,居然都是意外死亡讼油,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,254評論 3 395
  • 文/潘曉璐 我一進店門呢簸,熙熙樓的掌柜王于貴愁眉苦臉地迎上來矮台,“玉大人,你說我怎么就攤上這事根时∈莺眨” “怎么了?”我有些...
    開封第一講書人閱讀 165,011評論 0 355
  • 文/不壞的土叔 我叫張陵蛤迎,是天一觀的道長确虱。 經(jīng)常有香客問我,道長替裆,這世上最難降的妖魔是什么校辩? 我笑而不...
    開封第一講書人閱讀 58,755評論 1 294
  • 正文 為了忘掉前任,我火速辦了婚禮辆童,結(jié)果婚禮上宜咒,老公的妹妹穿的比我還像新娘。我一直安慰自己把鉴,他們只是感情好故黑,可當我...
    茶點故事閱讀 67,774評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著庭砍,像睡著了一般场晶。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上逗威,一...
    開封第一講書人閱讀 51,610評論 1 305
  • 那天峰搪,我揣著相機與錄音,去河邊找鬼凯旭。 笑死概耻,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的罐呼。 我是一名探鬼主播鞠柄,決...
    沈念sama閱讀 40,352評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼嫉柴!你這毒婦竟也來了厌杜?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,257評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎夯尽,沒想到半個月后瞧壮,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,717評論 1 315
  • 正文 獨居荒郊野嶺守林人離奇死亡匙握,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,894評論 3 336
  • 正文 我和宋清朗相戀三年咆槽,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片圈纺。...
    茶點故事閱讀 40,021評論 1 350
  • 序言:一個原本活蹦亂跳的男人離奇死亡秦忿,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出蛾娶,到底是詐尸還是另有隱情灯谣,我是刑警寧澤,帶...
    沈念sama閱讀 35,735評論 5 346
  • 正文 年R本政府宣布蛔琅,位于F島的核電站胎许,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏罗售。R本人自食惡果不足惜呐萨,卻給世界環(huán)境...
    茶點故事閱讀 41,354評論 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望莽囤。 院中可真熱鬧,春花似錦切距、人聲如沸朽缎。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,936評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽话肖。三九已至,卻和暖如春葡幸,著一層夾襖步出監(jiān)牢的瞬間最筒,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,054評論 1 270
  • 我被黑心中介騙來泰國打工蔚叨, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留床蜘,地道東北人。 一個月前我還...
    沈念sama閱讀 48,224評論 3 371
  • 正文 我出身青樓蔑水,卻偏偏與公主長得像邢锯,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子搀别,可洞房花燭夜當晚...
    茶點故事閱讀 44,974評論 2 355

推薦閱讀更多精彩內(nèi)容