hadoop和spark讀取GBK編碼亂碼

轉(zhuǎn)自 https://www.cnblogs.com/teagnes/p/6112019.html

其實在很久之前灑家剛剛搞hadoop的時候就遇到了這個問題囊颅,只是那個時候只知道讀取hdfs上的文本文件的時候一定要是utf8編碼,不然就會出現(xiàn)亂碼傅瞻,后來倒也沒遇到這個問題踢代,畢竟平時的數(shù)據(jù)都是從hive里來的,那時候也不懂這是為什么俭正,最近又遇到了奸鬓,有感于斯,從新總結(jié)一下,如何在hadoop和spark上處理讀取GBK編碼文件

首先來看一下為什么會出現(xiàn)這個問題掸读, 下面是一個最簡單的spark的wordcount程序串远,sc.textFile(filePath)方法從文本文件創(chuàng)建RDD,傳入文件路徑filePath儿惫,查看textFile方法澡罚, 可以看到,實際上調(diào)用了TextInputformat類來解析文本文件肾请,熟悉hadoop的一定知道留搔,mapreudce默認的解析文件文件的類就是TextInputformat,并返回了K V鍵值對

object Wordcount {
  def main(args: Array[String]) {
     val filePath = "";
     val conf = new SparkConf().setAppName("WordCountApp")
     val sc = new SparkContext(conf)
     val line = sc.textFile(filePath)
     line.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect.foreach(println)
     sc.stop
  }
}

def textFile(
   path: String,
   minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
      assertNotStopped()
      hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
      minPartitions).map(pair => pair._2.toString).setName(path)
}

繼續(xù)看TextInputFormat源碼铛铁,TextInputFormat有兩個作用隔显。

一是對輸入文件分片,mapreduce會為每一個分片都起動一個map任務(wù)來處理饵逐,分片的任務(wù)由TextInputFormat的父類FileInputFormat完成括眠,這里就不做深究了, TextInputFormat中只有讀取數(shù)據(jù)的方法倍权。

二是從分片的數(shù)據(jù)掷豺,生成k v鍵值對也就是Recordreader ,createRecordReader方法不斷的生成Recordreader對像并交給map端去處理 ,下面的代碼中在delimiter.getBytes(Charsets.UTF_8)設(shè)置了字符集当船,很可惜這里并不是讀取文件時使用的题画,而是指定了redcord的分割符,默認情況下是每一行生成一個record德频,一般情況下我們不需要使用到這個參數(shù)苍息,只有在設(shè)置多行作為一個record輸入的時候才會用到,可以通過設(shè)置參數(shù)“textinputformat.record.delimiter”來設(shè)置壹置,那我們是不是可以在代碼中指定我們的讀取文件的字符集呢档叔?

package org.apache.hadoop.mapreduce.lib.input;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.SplittableCompressionCodec;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

import com.google.common.base.Charsets;

/** An {@link InputFormat} for plain text files.  Files are broken into lines.
 * Either linefeed or carriage-return are used to signal end of line.  Keys are
 * the position in the file, and values are the line of text.. */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class TextInputFormat extends FileInputFormat<LongWritable, Text> {

  @Override
  public RecordReader<LongWritable, Text> 
    createRecordReader(InputSplit split,
                       TaskAttemptContext context) {
    String delimiter = context.getConfiguration().get(
        "textinputformat.record.delimiter");
    byte[] recordDelimiterBytes = null;
    if (null != delimiter)
      recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
    return new LineRecordReader(recordDelimiterBytes);
  }

  @Override
  protected boolean isSplitable(JobContext context, Path file) {
    final CompressionCodec codec =
      new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
    if (null == codec) {
      return true;
    }
    return codec instanceof SplittableCompressionCodec;
  }

}

繼續(xù)看LineRecordReader類,查看其中的nextKeyValue方法蒸绩,該方法是具體生成k v記錄時候使用的,這里有兩個很意思的點铃肯,需要注意患亿。

一是skipUtfByteOrderMark()方法,該方法處理了當文件是有bom的utf-8格式的時候押逼,讀取程序自動跳過bom步藕,有待具體測試一下

二是如果我們讀到的行跨塊了怎么處理?因為hdfs是按文件的大小來切分文件的挑格,難免一行數(shù)據(jù)被切分到兩個塊中去了咙冗,這里有相應(yīng)的處理的邏輯,這里就不再詳細說明了

public boolean nextKeyValue() throws IOException {
    if (key == null) {
      key = new LongWritable();
    }
    key.set(pos);
    if (value == null) {
      value = new Text();
    }
    int newSize = 0;
    // We always read one extra line, which lies outside the upper
    // 具體讀取記錄的方法split limit i.e. (end - 1)
    while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) {
      if (pos == 0) {
        newSize = skipUtfByteOrderMark();
      } else {
        newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos));
        pos += newSize;
      }

      if ((newSize == 0) || (newSize < maxLineLength)) {
        break;
      }

      // line too long. try again
      LOG.info("Skipped line of size " + newSize + " at pos " + 
               (pos - newSize));
    }
    if (newSize == 0) {
      key = null;
      value = null;
      return false;
    } else {
      return true;
    }
  }

這里的value就是在map端獲得的value漂彤,看它是怎么被賦值的雾消,可以看到是從輸入流中讀取數(shù)據(jù),這里有兩種讀取的方法挫望,默認readDefaultLine的讀取一行和通過自定義readCustomLine的分隔符的跨行

public int readLine(Text str, int maxLineLength,
                      int maxBytesToConsume) throws IOException {
    if (this.recordDelimiterBytes != null) {
      return readCustomLine(str, maxLineLength, maxBytesToConsume);
    } else {
      return readDefaultLine(str, maxLineLength, maxBytesToConsume);
    }
  }

默認的方式讀取文件并沒有用到自定義的分割符立润,而value獲取到的還是輸入流中的字節(jié)碼,所以value的獲得的依舊是文件的字節(jié)碼媳板,并沒有做過處理桑腮,那么我們是不是可以在map端獲取到的字節(jié)碼按照“GBK”的方式來解碼讀取呢?經(jīng)過測試之后發(fā)現(xiàn)的確是可以正常讀取的

private int readDefaultLine(Text str, int maxLineLength, int maxBytesToConsume)
  throws IOException {
    /* We're reading data from in, but the head of the stream may be
     * already buffered in buffer, so we have several cases:
     * 1. No newline characters are in the buffer, so we need to copy
     *    everything and read another buffer from the stream.
     * 2. An unambiguously terminated line is in buffer, so we just
     *    copy to str.
     * 3. Ambiguously terminated line is in buffer, i.e. buffer ends
     *    in CR.  In this case we copy everything up to CR to str, but
     *    we also need to see what follows CR: if it's LF, then we
     *    need consume LF as well, so next call to readLine will read
     *    from after that.
     * We use a flag prevCharCR to signal if previous character was CR
     * and, if it happens to be at the end of the buffer, delay
     * consuming it until we have a chance to look at the char that
     * follows.
     */
    str.clear();
    int txtLength = 0; //tracks str.getLength(), as an optimization
    int newlineLength = 0; //length of terminating newline
    boolean prevCharCR = false; //true of prev char was CR
    long bytesConsumed = 0;
    do {
      int startPosn = bufferPosn; //starting from where we left off the last time
      if (bufferPosn >= bufferLength) {
        startPosn = bufferPosn = 0;
        if (prevCharCR) {
          ++bytesConsumed; //account for CR from previous read
        }
        bufferLength = fillBuffer(in, buffer, prevCharCR);
        if (bufferLength <= 0) {
          break; // EOF
        }
      }
      for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline
        if (buffer[bufferPosn] == LF) {
          newlineLength = (prevCharCR) ? 2 : 1;
          ++bufferPosn; // at next invocation proceed from following byte
          break;
        }
        if (prevCharCR) { //CR + notLF, we are at notLF
          newlineLength = 1;
          break;
        }
        prevCharCR = (buffer[bufferPosn] == CR);
      }
      int readLength = bufferPosn - startPosn;
      if (prevCharCR && newlineLength == 0) {
        --readLength; //CR at the end of the buffer
      }
      bytesConsumed += readLength;
      int appendLength = readLength - newlineLength;
      if (appendLength > maxLineLength - txtLength) {
        appendLength = maxLineLength - txtLength;
      }
      if (appendLength > 0) {
        str.append(buffer, startPosn, appendLength);
        txtLength += appendLength;
      }
    } while (newlineLength == 0 && bytesConsumed < maxBytesToConsume);

    if (bytesConsumed > Integer.MAX_VALUE) {
      throw new IOException("Too many bytes before newline: " + bytesConsumed);
    }
    return (int)bytesConsumed;
  }

解決方法:

spark讀取GBK編碼文件

將value的字節(jié)碼按照GBK的方式讀取變成字符串蛉幸,運行之后能夠正常顯示

object GBKtoUTF8 {
 
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName(" GBK TO UTF8")
      .setMaster("local")
 
    val sc = new SparkContext(conf)
 
    val rdd = sc.hadoopFile("F:\\data\\score.txt", classOf[TextInputFormat], classOf[LongWritable], classOf[Text], 1)
      .map(p => new String(p._2.getBytes, 0, p._2.getLength, "GBK"))
      .flatMap(s => s.split(","))
      .map(x => (x, 1))
      .reduceByKey(_ + _)
      .collect
      .foreach(println)
  }
}

hadoop讀取GBK編碼文件

public void map(LongWritable key, Text value, Context context) {
        try {

            String line;
            line = new String(value.getBytes(), 0, value.getLength(), "GBK");//使用GBK解析字節(jié)碼 破讨,轉(zhuǎn)成String
            logger.info("gbkstr " + line);
            
            //不要使用toStirng方法來獲取字符串
            //line = value.toString();    
            //logger.info("str " + line);
            
              String[] item = line.split(",");
            for (String str : item) {
                outkey = new Text(str);
                context.write(outkey, outvalue);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市奕纫,隨后出現(xiàn)的幾起案子提陶,更是在濱河造成了極大的恐慌,老刑警劉巖若锁,帶你破解...
    沈念sama閱讀 217,185評論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件搁骑,死亡現(xiàn)場離奇詭異,居然都是意外死亡,警方通過查閱死者的電腦和手機仲器,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,652評論 3 393
  • 文/潘曉璐 我一進店門煤率,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人乏冀,你說我怎么就攤上這事蝶糯。” “怎么了辆沦?”我有些...
    開封第一講書人閱讀 163,524評論 0 353
  • 文/不壞的土叔 我叫張陵昼捍,是天一觀的道長。 經(jīng)常有香客問我肢扯,道長妒茬,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,339評論 1 293
  • 正文 為了忘掉前任蔚晨,我火速辦了婚禮乍钻,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘铭腕。我一直安慰自己银择,他們只是感情好,可當我...
    茶點故事閱讀 67,387評論 6 391
  • 文/花漫 我一把揭開白布累舷。 她就那樣靜靜地躺著浩考,像睡著了一般。 火紅的嫁衣襯著肌膚如雪被盈。 梳的紋絲不亂的頭發(fā)上析孽,一...
    開封第一講書人閱讀 51,287評論 1 301
  • 那天,我揣著相機與錄音只怎,去河邊找鬼绿淋。 笑死,一個胖子當著我的面吹牛尝盼,可吹牛的內(nèi)容都是我干的吞滞。 我是一名探鬼主播,決...
    沈念sama閱讀 40,130評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼盾沫,長吁一口氣:“原來是場噩夢啊……” “哼裁赠!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起赴精,我...
    開封第一講書人閱讀 38,985評論 0 275
  • 序言:老撾萬榮一對情侶失蹤佩捞,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后蕾哟,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體一忱,經(jīng)...
    沈念sama閱讀 45,420評論 1 313
  • 正文 獨居荒郊野嶺守林人離奇死亡莲蜘,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,617評論 3 334
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了帘营。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片票渠。...
    茶點故事閱讀 39,779評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖芬迄,靈堂內(nèi)的尸體忽然破棺而出问顷,到底是詐尸還是另有隱情,我是刑警寧澤禀梳,帶...
    沈念sama閱讀 35,477評論 5 345
  • 正文 年R本政府宣布杜窄,位于F島的核電站送悔,受9級特大地震影響芥备,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜程癌,卻給世界環(huán)境...
    茶點故事閱讀 41,088評論 3 328
  • 文/蒙蒙 一嘴瓤、第九天 我趴在偏房一處隱蔽的房頂上張望荷科。 院中可真熱鬧,春花似錦纱注、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,716評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至蜀涨,卻和暖如春瞎嬉,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背厚柳。 一陣腳步聲響...
    開封第一講書人閱讀 32,857評論 1 269
  • 我被黑心中介騙來泰國打工氧枣, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人别垮。 一個月前我還...
    沈念sama閱讀 47,876評論 2 370
  • 正文 我出身青樓便监,卻偏偏與公主長得像,于是被迫代替她去往敵國和親碳想。 傳聞我的和親對象是個殘疾皇子烧董,可洞房花燭夜當晚...
    茶點故事閱讀 44,700評論 2 354

推薦閱讀更多精彩內(nèi)容