8.spark core之讀寫數據

??spark支持多種數據源渤闷,從總體來分分為兩大部分:文件系統(tǒng)和數據庫。

文件系統(tǒng)

??文件系統(tǒng)主要有本地文件系統(tǒng)脖镀、Amazon S3飒箭、HDFS等。

??文件系統(tǒng)中存儲的文件有多種存儲格式蜒灰。spark支持的一些常見格式有:

格式名稱 結構化 說明
文件文件 普通文件文件弦蹂,每行一條記錄
JSON 半結構化 常見的基于文本的半結構化數據
CSV 常見的基于文本的格式,在電子表格應用中使用
SequenceFiles 一種用于鍵值對數據的常見Hadoop文件格式

文本文件

  • 讀取

    • 讀取單個文件卷员,參數為文件全路徑盈匾,輸入的每一行都會成為RDD的一個元素。

      • python
      input = sc.textFile("file://opt/module/spark/README.md")
      
      • scala
      val input = sc.textFile("file://opt/module/spark/README.md")
      
      • java
      JavaRDD<String> input = sc.textFile("file://opt/module/spark/README.md")
      
    • 讀取多個文件時毕骡,可以使用textFile將參數改為目錄或以逗號文件的多個文件名即可削饵。如果是小文件,也可以使用wholeTextFiles讀取為一個Pair RDD(鍵是文件名未巫,值是文件內容)窿撬。

    val input = sc.wholeTextFiles("file://opt/module/spark/datas")
    val result = input.mapValues{
        y => {
            val nums = y.split(" ").map(x => x.toDouble)
            nums.sum / nums.size.toDouble
        }
    }
    
  • 寫入

??輸出文本文件時,可使用saveAsTextFile()方法接收一個目錄叙凡,將RDD中的內容輸出到目錄中的多個文件中劈伴。

```
result.saveAsTextFile(outputFile)
```

JSON

  • 讀取

    • 將數據作為文本文件讀取,然后使用JSON解析器對數據進行解析握爷。
    • python使用內置庫讀取JSON
    import json
    ...
    input = sc.textFile("file.json")
    data = input.map(lambda x: json.loads(x))
    
    • scala使用Jackson讀取JSON
    import com.fasterxml.jackson.databind.ObjectMapper
    import com.fasterxml.jackson.module.scala.DefaultScalaModule
    ...
    case class Person(name: String, lovesPandas: Boolean)
    ...
    val input = sc.textFile("file.json")
    val mapper = new ObjectMapper()
    mapper.registerModule(DefaultScalaModule)
    val result = input.flatMap(record => {
        try {
            Some(mapper.readValue(record, classOf[Person]))
        } catch {
            case e: Exception => None
        }
    })
    
    • java使用Jackson讀取JSON
    class ParseJson implements FlatMapFunction<Iterator<String>, Person> {
        public Iterable<Person> call(Iterator<String> lines) throws Exception {
            ArrayList<Person> people = new ArrayList<Person>();
            ObjectMapper mapper = new ObjectMapper();
            while(lines.hasNext()) {
                String line = lines.next();
                try {
                    people.add(mapper.readValue(line, Person.class));    
                } catch(Exception e) {
                    //跳過失敗的數據
                }
            }
            return people;
        }
    }
    
    JavaRDD<String> input = sc.textFile("file.json");
    JavaRDD<Person> result = input.mapPartitions(new ParseJson());
    
  • 寫入

    • 使用JSON解析器將結構化的RDD轉為字符串RDD跛璧,然后使用文本文件API輸出。
    • python
    (data.filter(lambda x: x["lovesPandas"]).map(lambda x: json.dumps(x)).saveAsTextFile(outputFile)
    
    • scala
    result.filter(p => p.lovesPandas).map(mapper.writeValueAsString(_)).saveAsTextFile(outputFile)
    
    • java
    class WriteJson implements FlatMapFunction<Iterator<Person>, String> {
        public Iterable<String> call(Iterator<Person> people) throws Exception {
            ArrayList<String> text = new ArrayList<String>();
            ObjectMapper mapper = new ObjectMapper();
            while(people.hasNext()) {
                Person person = people.next();
                text.add(mapper.writeValueAsString(person));
            }
            return text;
        }
    }
    
    JavaRDD<Person> result = input.mapPartitions(new ParseJson()).filter(new LikesPandas());
    JavaRDD<String> formatted = result.mapPartitions(new WriteJson());
    formatted.saveAsTextFile(outfile);
    

CSV與TSV

??CSV與TSV文件每行都有固定的字段新啼,字段之間使用分隔符(CSV使用逗號追城;tsv使用制表符)分隔。

  • 讀取

    • 將csv或tsv文件當作普通文本文件讀取燥撞,然后使用響應的解析器進行解析座柱,同json處理方式迷帜。

    • python使用內置庫讀取csv

      • 文件中所有字段沒有包含換行符
      import csv
      import StringIO
      ...
      def loadRecord(line):
          input = StringIO.StringIO(line)
          reader = csv.DictReader(input, fieldnames=["name","favouriteAnimal"])
          return reader.next()
      
      """讀取每行記錄"""    
      input = sc.textFile(inputFile).map(loadRecord)
      
      • 文件中的字段包含換行符
      def loadRecords(fileNameContents):
          input = StringIO.StringIO(fileNameContents[1])
          reader = csv.DictReader(input, fieldnames=["name","favoriteAnimal"])
          return reader
          
      """讀取整個文件"""
      fullFileData = sc.wholeTextFiles(inputFile).flatMap(loadRecords)
      
    • scala使用opencsv庫讀取csv

      • 文件中所有字段沒有包含換行符
      import Java.io.StringReader
      import au.com.bytecode.opencsv.CSVReader
      ...
      val input = sc.textFile(inputFile)
      val result = input.map{
          line => {
              val reader = new CSVReader(new StringReader(line))
              reader.readNext()
          }
      }
      
      • 文件中的字段包含換行符
      case class Person(name: String, favoriteAnimal: String)
      
      val input = sc.wholeTextFiles(inputFile)
      val result = input.flatMap(
          case(_, txt) => {
              val reader = new CSVReader(new StringReader(txt))
              reader.readAll().map(x => Person(x(0), x(1)))
          }
      
    • java使用opencsv庫讀取csv

      • 文件中所有字段沒有包含換行符
      import Java.io.StringReader
      import au.com.bytecode.opencsv.CSVReader
      ...
      public static class ParseLine implements Function<String, String[]> {
          public String[] call(String line) throws Exception {
              CSVReader reader = new CSVReader(new StringReader(line));
              return reader.readNext();
          }
      }
      
      JavaPairRDD<String[]> csvData = sc.textFile(inputFile).map(new ParseLine());
      
      • 文件中的字段包含換行符
      public static class ParseLine implements FlatMapFunction<Tuple2<String, String>, String[]> {
          public Iterable<String[]> call(Tuple2<String, String> file) throws Exception {
              CSVReader reader = new CSVReader(new StringReader(file._2);
              return reader.readAll();
          }
      }
      
      JavaRDD<String[]> keyedRDD = sc.wholeTextFiles(inputFile).flatMap(new ParseLine());
      
  • 寫入

    • csv或tsv文件輸出時,將個字段轉為指定順序的數組色洞,然后采用普通文本文件的方式進行輸出戏锹。
    • python
    def writeRecords(records):
        output = StringIO.StringIO()
        writer = csv.DictWriter(output, fieldnames=["name", "favoriteAnimal"])
        for record in records:
            writer.writerow(record)
        return [output.getValue()]
        
    pandaLovers.mapPartitions(writeRecords).saveAsTextFile(outputFile)
    
    • scala
    pandasLovers.map(person => List(person.name, person.favoriteAnimal).toArray).mapPartitions{
        people => {
        val stringWriter = new StringWriter()
        val csvWriter = new CSVWriter(stringWriter)
        csvWriter.writeAll(people.toList)
        Iterator(stringWriter.toString)    
        }
    }.saveAsTextFile(outFile)
    

SequenceFile

??SequenceFile是鍵值對形式的常用Hadoop數據格式。由于Hadoop使用一套自定義的序列化框架火诸,因此SequenceFile的鍵值對類型需實現(xiàn)Hadoop的Writable接口锦针。

  • 讀取

    • python
    data = sc.sequenceFile(inFile, "org.apache.hadoop.io.Text", "org.apache.hadoop.io.IntWritable")
    
    • scala
    val data = sc.sequenceFile(inFile, classOf[Text], classOf[IntWritable]).map{case (x, y) => (x.toString, y.get())}    
    
    • java
    public static class ConvertToNativeTypes implements PairFunction<Tuple2<Text, IntWritable>, String, Integer> {
        public Tuple2<String, Integer> call(Tuple2<Text, IntWritable> record) {
            return new Tuple2(record._1.toString(), record._2.get());
        }
    }
    
    JavaPairRDD<String, Integer> result = sc.sequenceFile(fileName, Text.class, IntWritable.class).mapToPair(new ConvertToNativeTypes());
    
  • 寫入

    • python
    data = sc.parallelize([("Panda", 3), ("Kay", 6), ("Snail", 2)])
    data.saveAsSequeceFile(outputFile)
    
    • scala
    val data = sc.parallelize(List(("Panda", 3), ("Kay", 6), ("Snail", 2)))
    data.saveAsSequenceFile(outputFile)
    
    • java(java中沒有saveAsSequenceFile方法,用自定義hadoop格式的方式實現(xiàn))
    public static class ConvertToWritableTypes implements PairFunction<Tuple2<String, Integer>, Text, IntWritable> {
        public Tuple2<Text, IntWritable> call(Tuple2<String, Integer> record) {
            return new Tuple2(new Text(record._1), new IntWritable(record._2));
        }
    }
    
    JavaPairRDD<Text, IntWritable> result = sc.parallelizePairs(input).mapToPair(new ConvertToNativeTypes());
    result.saveAsHadoopFile(fileName, Text.class, IntWritable.class, SequenceFileOutputFormat.class);
    

數據庫

??數據庫主要分為關系型數據庫(MySQL惭蹂、PostgreSQL等)和非關系型數據庫(HBase伞插、ElasticSearch等)。

JDBC數據庫連接

??spark使用JDBC訪問關系型數據庫(MySQL盾碗、PostgreSQL等),只需要構建一個org.apache.spark.rdd.JdbcRDD即可舀瓢。

def createConnection() = {
    Class.forName("com.mysql.jdbc.Driver").newInstance()
    DriverManager.getConnection("jdbc:mysql://localhost/test", "root", "root")
}

def extractValues(r: ResultSet) = {
    (r.getInt(1), r.getString(2))
}

val data = new JdbcRDD(sc, createConnection, 
                "SELECT * FROM panda WHERE id >= ? AND id <= ?"),
                lowerBound = 1, upperBound = 3, 
                numPartitions = 2, mapRow = extractValues)
println(data.collect().toList)

HBase

??spark通過Hadoop輸入格式(org.apache.hadoop.hbase.mapreduce.TableInputFormat)訪問HBase廷雅。這種格式返回鍵值對數據,鍵類型為org.apache.hadoop.hbase.io.ImmutableBytesWritable京髓,值類型為org.apache.hadoop.hbase.client.Result航缀。

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat

val conf = HBaseConfiguration.create()
conf.set(TableInputFormat.INPUT_TABLE, "tablename")

val rdd = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], ClassOf[Result])

ElasticSearch

??spark使用ElasticSearch-Hadoop連接器從ElasticSearch中讀寫數據。ElasticSearch連接器依賴于SparkContext設置的配置項堰怨。ElasticSearch連接器也沒有用到Spark封裝的類型芥玉,而使用saveAsHadoopDataSet。

  • 讀取
def mapWritableToInput(in: MapWritable): Map[String, String] = {
    in.map{case (k, v) => (k.toString, v.toString)}.toMap
}

val jobConf = new JobConf(sc.hadoopConfiguration)
jobConf.set(ConfigurationOptions.ES_RESOURCE_READ, args[1])
jobConf.set(ConfigurationOptions.ES_NODES, args[2])

val currentTweets = sc.hadoopRDD(jobConf, classOf[EsInputFormat[Object, MapWritable]], classOf[Object], ClassOf[MapWritable])

val tweets = currentTweets.map{ case (key, value) => mapWritableToInput(value) }
  • 寫入
val jobConf = new JobConf(sc.hadoopConfiguration)
jobConf.set("mapred.output.format.class", "org.elasticsearch.hadoop.mr.EsOutFormat")
jobConf.setOutputCommitter(classOf[FileOutputCommitter])
jobConf.set(ConfigurationOptions.ES_RESOURCE_WRITE, "twitter/tweets")
jobConf.set(ConfigurationOptions.ES_NODES, "localhost")
FileOutputFormat.setOutputPath(jobConf, new Path("-"))
output.saveAsHadoopDataset(jobConf)
最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末备图,一起剝皮案震驚了整個濱河市灿巧,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌揽涮,老刑警劉巖抠藕,帶你破解...
    沈念sama閱讀 211,376評論 6 491
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異蒋困,居然都是意外死亡盾似,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,126評論 2 385
  • 文/潘曉璐 我一進店門雪标,熙熙樓的掌柜王于貴愁眉苦臉地迎上來零院,“玉大人,你說我怎么就攤上這事村刨「娉” “怎么了?”我有些...
    開封第一講書人閱讀 156,966評論 0 347
  • 文/不壞的土叔 我叫張陵烹困,是天一觀的道長玄妈。 經常有香客問我,道長,這世上最難降的妖魔是什么拟蜻? 我笑而不...
    開封第一講書人閱讀 56,432評論 1 283
  • 正文 為了忘掉前任绎签,我火速辦了婚禮,結果婚禮上酝锅,老公的妹妹穿的比我還像新娘诡必。我一直安慰自己,他們只是感情好搔扁,可當我...
    茶點故事閱讀 65,519評論 6 385
  • 文/花漫 我一把揭開白布爸舒。 她就那樣靜靜地躺著,像睡著了一般稿蹲。 火紅的嫁衣襯著肌膚如雪扭勉。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,792評論 1 290
  • 那天苛聘,我揣著相機與錄音涂炎,去河邊找鬼。 笑死设哗,一個胖子當著我的面吹牛唱捣,可吹牛的內容都是我干的。 我是一名探鬼主播网梢,決...
    沈念sama閱讀 38,933評論 3 406
  • 文/蒼蘭香墨 我猛地睜開眼震缭,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了战虏?” 一聲冷哼從身側響起拣宰,我...
    開封第一講書人閱讀 37,701評論 0 266
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎活烙,沒想到半個月后徐裸,有當地人在樹林里發(fā)現(xiàn)了一具尸體,經...
    沈念sama閱讀 44,143評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡啸盏,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 36,488評論 2 327
  • 正文 我和宋清朗相戀三年重贺,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片回懦。...
    茶點故事閱讀 38,626評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡气笙,死狀恐怖,靈堂內的尸體忽然破棺而出怯晕,到底是詐尸還是另有隱情潜圃,我是刑警寧澤,帶...
    沈念sama閱讀 34,292評論 4 329
  • 正文 年R本政府宣布舟茶,位于F島的核電站谭期,受9級特大地震影響堵第,放射性物質發(fā)生泄漏。R本人自食惡果不足惜隧出,卻給世界環(huán)境...
    茶點故事閱讀 39,896評論 3 313
  • 文/蒙蒙 一踏志、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧,春花似錦、人聲如沸绍在。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,742評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至帆谍,卻和暖如春伪朽,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背既忆。 一陣腳步聲響...
    開封第一講書人閱讀 31,977評論 1 265
  • 我被黑心中介騙來泰國打工驱负, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人患雇。 一個月前我還...
    沈念sama閱讀 46,324評論 2 360
  • 正文 我出身青樓,卻偏偏與公主長得像宇挫,于是被迫代替她去往敵國和親苛吱。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 43,494評論 2 348

推薦閱讀更多精彩內容