轉(zhuǎn)自 https://www.cnblogs.com/teagnes/p/6112019.html
I actually ran into this problem a long time ago, back when I was just getting started with Hadoop. At the time all I knew was that text files read from HDFS had to be UTF-8 encoded, otherwise the output would be garbled. I didn't hit the problem again for a while, since my data usually came from Hive, and I never really understood why it happened. Having run into it again recently, here is a fresh summary of how to read GBK-encoded files in Hadoop and Spark.
First, let's look at why the problem occurs. Below is the simplest possible Spark word count program: sc.textFile(filePath) creates an RDD from the text file at filePath. Looking at the textFile method, you can see that it actually uses the TextInputFormat class to parse the text file; anyone familiar with Hadoop knows that TextInputFormat is the default class MapReduce uses to parse text files, and that it returns key/value pairs.
import org.apache.spark.{SparkConf, SparkContext}

object Wordcount {
  def main(args: Array[String]) {
    val filePath = ""
    val conf = new SparkConf().setAppName("WordCountApp")
    val sc = new SparkContext(conf)
    val line = sc.textFile(filePath)
    line.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).collect.foreach(println)
    sc.stop
  }
}
def textFile(
    path: String,
    minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
  assertNotStopped()
  hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
    minPartitions).map(pair => pair._2.toString).setName(path)
}
繼續(xù)看TextInputFormat源碼铛铁,TextInputFormat有兩個作用隔显。
一是對輸入文件分片,mapreduce會為每一個分片都起動一個map任務(wù)來處理饵逐,分片的任務(wù)由TextInputFormat的父類FileInputFormat完成括眠,這里就不做深究了, TextInputFormat中只有讀取數(shù)據(jù)的方法倍权。
二是從分片的數(shù)據(jù)掷豺,生成k v鍵值對也就是Recordreader ,createRecordReader方法不斷的生成Recordreader對像并交給map端去處理 ,下面的代碼中在delimiter.getBytes(Charsets.UTF_8)設(shè)置了字符集当船,很可惜這里并不是讀取文件時使用的题画,而是指定了redcord的分割符,默認情況下是每一行生成一個record德频,一般情況下我們不需要使用到這個參數(shù)苍息,只有在設(shè)置多行作為一個record輸入的時候才會用到,可以通過設(shè)置參數(shù)“textinputformat.record.delimiter”來設(shè)置壹置,那我們是不是可以在代碼中指定我們的讀取文件的字符集呢档叔?
package org.apache.hadoop.mapreduce.lib.input;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.SplittableCompressionCodec;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

import com.google.common.base.Charsets;

/** An {@link InputFormat} for plain text files. Files are broken into lines.
 * Either linefeed or carriage-return are used to signal end of line. Keys are
 * the position in the file, and values are the line of text.. */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class TextInputFormat extends FileInputFormat<LongWritable, Text> {

  @Override
  public RecordReader<LongWritable, Text>
    createRecordReader(InputSplit split,
                       TaskAttemptContext context) {
    String delimiter = context.getConfiguration().get(
        "textinputformat.record.delimiter");
    byte[] recordDelimiterBytes = null;
    if (null != delimiter)
      recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
    return new LineRecordReader(recordDelimiterBytes);
  }

  @Override
  protected boolean isSplitable(JobContext context, Path file) {
    final CompressionCodec codec =
      new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
    if (null == codec) {
      return true;
    }
    return codec instanceof SplittableCompressionCodec;
  }
}
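A quick aside on the "textinputformat.record.delimiter" parameter mentioned above: it is read from the Hadoop Configuration that Spark passes to the input format, so you can set it yourself when several lines should form one record. The sketch below is only an illustration under assumptions of my own (the input path and the blank-line delimiter are made up; it uses SparkContext.newAPIHadoopFile with the new-API TextInputFormat shown above). Note that it changes how the stream is cut into records, not the charset used to decode them.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.{SparkConf, SparkContext}

object MultiLineRecords {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("MultiLineRecords").setMaster("local"))

    // Copy Spark's Hadoop configuration and set a custom record delimiter:
    // here a blank line separates records, so one record may span several lines.
    val hadoopConf = new Configuration(sc.hadoopConfiguration)
    hadoopConf.set("textinputformat.record.delimiter", "\n\n")

    val records = sc.newAPIHadoopFile(
      "/tmp/multiline.txt",              // hypothetical input path
      classOf[TextInputFormat],
      classOf[LongWritable],
      classOf[Text],
      hadoopConf)
      .map(_._2.toString)                // still decoded as UTF-8 by Text.toString

    records.collect.foreach(println)
    sc.stop()
  }
}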
繼續(xù)看LineRecordReader類,查看其中的nextKeyValue方法蒸绩,該方法是具體生成k v記錄時候使用的,這里有兩個很意思的點铃肯,需要注意患亿。
一是skipUtfByteOrderMark()方法,該方法處理了當文件是有bom的utf-8格式的時候押逼,讀取程序自動跳過bom步藕,有待具體測試一下
二是如果我們讀到的行跨塊了怎么處理?因為hdfs是按文件的大小來切分文件的挑格,難免一行數(shù)據(jù)被切分到兩個塊中去了咙冗,這里有相應(yīng)的處理的邏輯,這里就不再詳細說明了
public boolean nextKeyValue() throws IOException {
  if (key == null) {
    key = new LongWritable();
  }
  key.set(pos);
  if (value == null) {
    value = new Text();
  }
  int newSize = 0;
  // We always read one extra line, which lies outside the upper
  // split limit i.e. (end - 1)
  while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) {
    if (pos == 0) {
      newSize = skipUtfByteOrderMark();
    } else {
      // in.readLine(...) is what actually reads one record into value
      newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos));
      pos += newSize;
    }

    if ((newSize == 0) || (newSize < maxLineLength)) {
      break;
    }

    // line too long. try again
    LOG.info("Skipped line of size " + newSize + " at pos " +
             (pos - newSize));
  }
  if (newSize == 0) {
    key = null;
    value = null;
    return false;
  } else {
    return true;
  }
}
這里的value就是在map端獲得的value漂彤,看它是怎么被賦值的雾消,可以看到是從輸入流中讀取數(shù)據(jù),這里有兩種讀取的方法挫望,默認readDefaultLine的讀取一行和通過自定義readCustomLine的分隔符的跨行
public int readLine(Text str, int maxLineLength,
                    int maxBytesToConsume) throws IOException {
  if (this.recordDelimiterBytes != null) {
    return readCustomLine(str, maxLineLength, maxBytesToConsume);
  } else {
    return readDefaultLine(str, maxLineLength, maxBytesToConsume);
  }
}
The default path does not use a custom delimiter, and value still receives the raw bytes from the input stream; nothing is decoded at all. So can we take the bytes received on the map side and decode them as GBK ourselves? After testing, the answer is yes, the file is read correctly (see the small sketch after the source listing below).
private int readDefaultLine(Text str, int maxLineLength, int maxBytesToConsume)
    throws IOException {
  /* We're reading data from in, but the head of the stream may be
   * already buffered in buffer, so we have several cases:
   * 1. No newline characters are in the buffer, so we need to copy
   *    everything and read another buffer from the stream.
   * 2. An unambiguously terminated line is in buffer, so we just
   *    copy to str.
   * 3. Ambiguously terminated line is in buffer, i.e. buffer ends
   *    in CR. In this case we copy everything up to CR to str, but
   *    we also need to see what follows CR: if it's LF, then we
   *    need consume LF as well, so next call to readLine will read
   *    from after that.
   * We use a flag prevCharCR to signal if previous character was CR
   * and, if it happens to be at the end of the buffer, delay
   * consuming it until we have a chance to look at the char that
   * follows.
   */
  str.clear();
  int txtLength = 0; //tracks str.getLength(), as an optimization
  int newlineLength = 0; //length of terminating newline
  boolean prevCharCR = false; //true of prev char was CR
  long bytesConsumed = 0;
  do {
    int startPosn = bufferPosn; //starting from where we left off the last time
    if (bufferPosn >= bufferLength) {
      startPosn = bufferPosn = 0;
      if (prevCharCR) {
        ++bytesConsumed; //account for CR from previous read
      }
      bufferLength = fillBuffer(in, buffer, prevCharCR);
      if (bufferLength <= 0) {
        break; // EOF
      }
    }
    for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline
      if (buffer[bufferPosn] == LF) {
        newlineLength = (prevCharCR) ? 2 : 1;
        ++bufferPosn; // at next invocation proceed from following byte
        break;
      }
      if (prevCharCR) { //CR + notLF, we are at notLF
        newlineLength = 1;
        break;
      }
      prevCharCR = (buffer[bufferPosn] == CR);
    }
    int readLength = bufferPosn - startPosn;
    if (prevCharCR && newlineLength == 0) {
      --readLength; //CR at the end of the buffer
    }
    bytesConsumed += readLength;
    int appendLength = readLength - newlineLength;
    if (appendLength > maxLineLength - txtLength) {
      appendLength = maxLineLength - txtLength;
    }
    if (appendLength > 0) {
      str.append(buffer, startPosn, appendLength);
      txtLength += appendLength;
    }
  } while (newlineLength == 0 && bytesConsumed < maxBytesToConsume);

  if (bytesConsumed > Integer.MAX_VALUE) {
    throw new IOException("Too many bytes before newline: " + bytesConsumed);
  }
  return (int)bytesConsumed;
}
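Before getting to the fix itself, here is a small local sketch (not from the original post; the sample string and object names are mine) that shows concretely why keeping the raw bytes matters: Text.toString always decodes the backing bytes as UTF-8, while decoding the same bytes as GBK recovers the original text.

import org.apache.hadoop.io.Text

object GbkDecodeDemo {
  def main(args: Array[String]): Unit = {
    // Simulate one line of a GBK-encoded file, as it would arrive in the Text value on the map side.
    val gbkBytes = "中文測試, hadoop".getBytes("GBK")
    val value = new Text()
    value.set(gbkBytes, 0, gbkBytes.length)

    println(value.toString)                                        // garbled: Text assumes the bytes are UTF-8
    println(new String(value.getBytes, 0, value.getLength, "GBK")) // readable: decode the raw bytes as GBK ourselves
  }
}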
Solution:
Reading a GBK-encoded file in Spark
Decode value's raw bytes as GBK to build the String; after this change the output displays correctly.
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.spark.{SparkConf, SparkContext}

object GBKtoUTF8 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("GBK TO UTF8")
      .setMaster("local")
    val sc = new SparkContext(conf)
    sc.hadoopFile("F:\\data\\score.txt", classOf[TextInputFormat], classOf[LongWritable], classOf[Text], 1)
      .map(p => new String(p._2.getBytes, 0, p._2.getLength, "GBK")) // decode the Text's raw bytes as GBK
      .flatMap(s => s.split(","))
      .map(x => (x, 1))
      .reduceByKey(_ + _)
      .collect
      .foreach(println)
    sc.stop()
  }
}
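Two details worth noting about this version: sc.hadoopFile expects the old-API org.apache.hadoop.mapred.TextInputFormat (not the org.apache.hadoop.mapreduce.lib.input one analyzed above), and p._2.getBytes must always be paired with p._2.getLength, because Hadoop reuses the Text object and its backing buffer between records, so the array can still contain trailing bytes from an earlier, longer line.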
Reading a GBK-encoded file in Hadoop
// map() of the Mapper (outkey and outvalue are assumed to be fields of the enclosing Mapper class)
public void map(LongWritable key, Text value, Context context) {
  try {
    String line;
    line = new String(value.getBytes(), 0, value.getLength(), "GBK"); // decode the raw bytes as GBK to build the String
    logger.info("gbkstr " + line);

    // Do NOT use toString() to get the string -- it always decodes the bytes as UTF-8
    //line = value.toString();
    //logger.info("str " + line);

    String[] item = line.split(",");
    for (String str : item) {
      outkey = new Text(str);
      context.write(outkey, outvalue);
    }
  } catch (Exception e) {
    e.printStackTrace();
  }
}