Pitfalls: garbled text when reading HDFS files with the Java API
I wanted to write an API that reads part of a file on HDFS for preview. After implementing it based on blog posts found online, I noticed the data read back was sometimes garbled. For example, when reading a CSV file whose fields are comma-separated strings:
An English string such as "aaa" displayed correctly.
A Chinese string such as "你好" displayed correctly.
A mixed Chinese/English string such as "aaa你好" came out garbled.
I went through plenty of blog posts, and the suggested fix was almost always "decode with charset X". Skeptical, I tried each suggestion in turn, and as expected none of them worked.
Solution approach
HDFS supports six charset encodings, and every local file may well be encoded differently. Uploading a local file really just means serializing it into a byte stream, in whatever encoding it already has, and storing that byte stream in the file system. So when we GET the file data back, byte streams coming from different files with different encodings obviously cannot all be decoded correctly with one fixed charset.
That leaves two possible solutions.
Standardize the charset on HDFS. For example, pick UTF-8 and transcode every file at upload time, i.e. convert each file's byte stream to UTF-8 before storing it. Reading back and decoding with UTF-8 then works fine. However, the transcoding step itself introduces quite a few problems and is not easy to implement (a minimal sketch follows below).
Dynamic decoding. Choose the decoding charset according to each file's own encoding. The file's original byte stream is never modified, so garbling is essentially eliminated.
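For completeness, here is a minimal sketch of the first option: re-encode a local file to UTF-8 while uploading it. This is only an illustration under assumptions: the source charset is assumed to be already known, and uploadAsUtf8, localPath and hdfsPath are hypothetical names, not part of the original code.

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Sketch only: copy a local file into HDFS, normalizing its content to UTF-8.
// Assumes the file's source charset has already been determined elsewhere.
private void uploadAsUtf8(FileSystem fs, String localPath, String hdfsPath,
                          Charset sourceCharset) throws IOException {
    try (BufferedReader reader = new BufferedReader(
                 new InputStreamReader(new FileInputStream(localPath), sourceCharset));
         BufferedWriter writer = new BufferedWriter(
                 new OutputStreamWriter(fs.create(new Path(hdfsPath)), StandardCharsets.UTF_8))) {
        char[] buf = new char[4096];
        int len;
        while ((len = reader.read(buf)) != -1) {
            // Characters read in the source charset are re-encoded as UTF-8 on write.
            writer.write(buf, 0, len);
        }
    }
}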
I went with dynamic decoding. The hard part is then deciding which charset to decode with. The reference below provided the answer.
Detecting the encoding of a text (byte stream) in Java
Requirement:
Given a file or a byte stream, detect its encoding.
Implementation:
Based on jchardet:

<dependency>
    <groupId>net.sourceforge.jchardet</groupId>
    <artifactId>jchardet</artifactId>
    <version>1.0</version>
</dependency>
The code is as follows:
import java.io.BufferedInputStream;
import java.io.InputStream;

import org.apache.commons.io.IOUtils;
import org.mozilla.intl.chardet.nsDetector;
import org.mozilla.intl.chardet.nsICharsetDetectionObserver;
import org.mozilla.intl.chardet.nsPSMDetector;

import com.google.common.base.Charsets;

public class DetectorUtils {

    private DetectorUtils() {
    }

    // Observer that records the charset reported by the detector, if any.
    static class ChineseCharsetDetectionObserver implements nsICharsetDetectionObserver {
        private boolean found = false;
        private String result;

        public ChineseCharsetDetectionObserver(boolean found, String result) {
            super();
            this.found = found;
            this.result = result;
        }

        public void Notify(String charset) {
            found = true;
            result = charset;
        }

        public boolean isFound() {
            return found;
        }

        public String getResult() {
            return result;
        }
    }

    // Returns the probable charset(s) of the given input stream.
    public static String[] detectChineseCharset(InputStream in) throws Exception {
        String[] prob = null;
        BufferedInputStream imp = null;
        try {
            boolean found = false;
            String result = Charsets.UTF_8.toString();
            int lang = nsPSMDetector.CHINESE;
            nsDetector det = new nsDetector(lang);
            ChineseCharsetDetectionObserver detectionObserver =
                    new ChineseCharsetDetectionObserver(found, result);
            det.Init(detectionObserver);
            imp = new BufferedInputStream(in);
            byte[] buf = new byte[1024];
            int len;
            boolean isAscii = true;
            // Feed the stream to the detector until it reaches a verdict.
            while ((len = imp.read(buf, 0, buf.length)) != -1) {
                if (isAscii)
                    isAscii = det.isAscii(buf, len);
                if (!isAscii) {
                    if (det.DoIt(buf, len, false))
                        break;
                }
            }
            det.DataEnd();
            boolean isFound = detectionObserver.isFound();
            if (isAscii) {
                isFound = true;
                prob = new String[] { "ASCII" };
            } else if (isFound) {
                prob = new String[] { detectionObserver.getResult() };
            } else {
                prob = det.getProbableCharsets();
            }
            return prob;
        } finally {
            IOUtils.closeQuietly(imp);
            IOUtils.closeQuietly(in);
        }
    }
}
Test:
String file = "C:/3737001.xml";
String[] probableSet = DetectorUtils.detectChineseCharset(new FileInputStream(file));
for (String charset : probableSet) {
    System.out.println(charset);
}
The logic for reading part of an HDFS file for preview
// Fetch part of a file's data for preview
public List<String> getFileDataWithLimitLines(String filePath, Integer limit) {
    FSDataInputStream fileStream = openFile(filePath);
    return readFileWithLimit(fileStream, limit);
}

// Open the file and return its data stream
private FSDataInputStream openFile(String filePath) {
    FSDataInputStream fileStream = null;
    try {
        fileStream = fs.open(new Path(getHdfsPath(filePath)));
    } catch (IOException e) {
        logger.error("fail to open file:{}", filePath, e);
    }
    return fileStream;
}

// Read at most `limit` lines of file data
private List<String> readFileWithLimit(FSDataInputStream fileStream, Integer limit) {
    byte[] bytes = readByteStream(fileStream);
    String data = decodeByteStream(bytes);
    if (data == null) {
        return null;
    }
    List<String> rows = Arrays.asList(data.split("\\r?\\n"));
    return rows.stream().filter(StringUtils::isNotEmpty)
            .limit(limit)
            .collect(Collectors.toList());
}

// Read the raw bytes from the file data stream
private byte[] readByteStream(FSDataInputStream fileStream) {
    byte[] bytes = new byte[1024 * 30];
    int len;
    ByteArrayOutputStream stream = new ByteArrayOutputStream();
    try {
        while ((len = fileStream.read(bytes)) != -1) {
            stream.write(bytes, 0, len);
        }
    } catch (IOException e) {
        logger.error("read file bytes stream failed.", e);
        return null;
    }
    return stream.toByteArray();
}

// Decode the byte stream with the detected charset
private String decodeByteStream(byte[] bytes) {
    if (bytes == null) {
        return null;
    }
    String encoding = guessEncoding(bytes);
    String data = null;
    try {
        data = new String(bytes, encoding);
    } catch (Exception e) {
        logger.error("decode byte stream failed.", e);
    }
    return data;
}

// Detect the encoding with juniversalchardet's UniversalDetector
private String guessEncoding(byte[] bytes) {
    UniversalDetector detector = new UniversalDetector(null);
    detector.handleData(bytes, 0, bytes.length);
    detector.dataEnd();
    String encoding = detector.getDetectedCharset();
    detector.reset();
    if (StringUtils.isEmpty(encoding)) {
        encoding = "UTF-8";
    }
    return encoding;
}