In Java I rarely used switch/case; most things could be handled with if/else.
In Scala, though, pattern matching is in a different league. It can replace a lot of the tedious, mechanical code you used to write, and above all it is flexible: there is always a form that fits.
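Before the HDFS example, here is a minimal sketch (illustration only, not from the project code) of how many shapes a single match expression can take: constant patterns, type patterns with guards, and destructuring all live in the same expression:

def describe(x: Any): String = x match {
  case 0                      => "zero"                // constant pattern
  case s: String if s.isEmpty => "an empty string"     // type pattern plus a guard
  case (a, b)                 => s"a pair: $a and $b"  // tuple extraction
  case _                      => "something else"      // catch-all, like default in Java
}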
Let's start with the simplest use.
Here is the requirement I had:
compress the raw log data on HDFS, where roughly four compression formats are involved: snappy, gzip, lzo and deflate. At first I wrote an independent method for each format and verified that each one works. But you should always write code with refactoring in mind, and most of the flow turned out to be identical, so the four methods can be merged into one, with only the parts that actually differ pulled out and told apart by a case.
For example, the standalone snappy compression:
// Imports used by the snippets in this post (Hadoop common / HDFS client plus java.io):
import java.io.{BufferedInputStream, BufferedOutputStream}
import org.apache.hadoop.conf.{Configurable, Configuration}
import org.apache.hadoop.fs.{FSDataInputStream, FSDataOutputStream, FileStatus, FileSystem, Path}
import org.apache.hadoop.io.IOUtils
import org.apache.hadoop.io.compress.{CompressionCodec, CompressionOutputStream, DefaultCodec, GzipCodec, Lz4Codec, SnappyCodec}

/**
 * Snappy-compress a single HDFS file.
 *
 * @param fs      HDFS FileSystem handle
 * @param conf    Hadoop Configuration
 * @param inpath  input file path
 * @param outPath output root directory
 */
def hdfsFileCompressBySnappyCodec(fs: FileSystem, conf: Configuration, inpath: String, outPath: String): Unit = {
  // compression pipeline when writing: FSDataOutputStream -> BufferedOutputStream -> CompressionOutputStream
  val inputPath: Path = new Path(inpath)
  val inFsData: FSDataInputStream = fs.open(inputPath)
  val snappyCC: SnappyCodec = new SnappyCodec()
  snappyCC.setConf(conf)
  // rebuild the output path: keep the input's sub-path and append the codec's default extension (.snappy)
  val inSubPath: String = getOutFileSubPath(inpath)
  var nOutPath = ""
  if (outPath.endsWith("/")) {
    nOutPath = outPath.substring(0, outPath.length - 1)
  } else {
    nOutPath = outPath
  }
  val snappyFile: String = nOutPath + inSubPath + snappyCC.getDefaultExtension
  val outdir: Path = new Path(snappyFile)
  val fsDataOutStream: FSDataOutputStream = fs.create(outdir)
  val fsBufferOutStream: BufferedOutputStream = new BufferedOutputStream(fsDataOutStream)
  val compressOutStream: CompressionOutputStream = snappyCC.createOutputStream(fsBufferOutStream)
  val bufInpStream: BufferedInputStream = new BufferedInputStream(inFsData)
  val ioBuffer: Array[Byte] = new Array[Byte](64 * 1024)
  var readLen: Int = 0
  val start = System.currentTimeMillis()
  println("snappy codec beginning || " + snappyFile)
  try {
    // copy in 64 KB chunks until EOF
    while ( {
      readLen = bufInpStream.read(ioBuffer)
      readLen != -1
    }) {
      compressOutStream.write(ioBuffer, 0, readLen)
    }
  } catch {
    case e: Exception => e.printStackTrace()
  } finally {
    compressOutStream.flush()
    compressOutStream.finish()
    compressOutStream.close()
    IOUtils.closeStream(inFsData)
    IOUtils.closeStream(fsDataOutStream)
    // the shared FileSystem handle is deliberately not closed here; that is left to the caller
  }
  val end = System.currentTimeMillis()
  val timeCause = end - start
  println("snappy codec finish || " + snappyFile + " || time elapsed " + timeCause + " ms")
}
In fact, the only real difference between the formats is which codec gets created.
So the case I wrote simply creates a different codec per format. A quick look at the API shows that all of these codecs implement the CompressionCodec interface,
so the plan was straightforward:
give the method one extra parameter, a String codecMethod, that names the compression format.
val codec: CompressionCodec = codecMethod match {
  case "SNAPPY"  => new SnappyCodec()
  case "GZIP"    => new GzipCodec()
  case "LZO"     => new Lz4Codec()     // note: Hadoop's Lz4Codec is LZ4, not true LZO; LzoCodec lives in a separate GPL library
  case "DEFALTE" => new DefaultCodec() // DefaultCodec is the zlib/deflate codec
  case _         => new DefaultCodec()
}
// every one of these codecs implements Configurable, so the Configuration still has to be attached
codec match {
  case c: Configurable => c.setConf(conf)
  case _               =>
}
With that in place, the rest of the flow is identical for every format and can be handled by one shared piece of code.
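As a condensed sketch of that shared flow (the method name hdfsFileCompress is hypothetical, and it reuses the getOutFileSubPath helper from the snappy method above), once the codec has been chosen only the stream plumbing remains:

// Sketch only: codec selection as shown above, then one copy loop for all formats.
def hdfsFileCompress(fs: FileSystem, conf: Configuration, inpath: String, outPath: String, codecMethod: String): Unit = {
  val codec: CompressionCodec = codecMethod match {
    case "SNAPPY" => new SnappyCodec()
    case "GZIP"   => new GzipCodec()
    case "LZO"    => new Lz4Codec()
    case _        => new DefaultCodec()
  }
  codec match { case c: Configurable => c.setConf(conf); case _ => }

  // output path = output root + input sub-path + the codec's own extension (.snappy, .gz, ...)
  val outFile = outPath.stripSuffix("/") + getOutFileSubPath(inpath) + codec.getDefaultExtension
  val in  = new BufferedInputStream(fs.open(new Path(inpath)))
  val out = codec.createOutputStream(new BufferedOutputStream(fs.create(new Path(outFile))))
  try {
    val buf = new Array[Byte](64 * 1024)
    var n = in.read(buf)
    while (n != -1) { out.write(buf, 0, n); n = in.read(buf) }
  } finally {
    out.finish(); out.close()
    IOUtils.closeStream(in)
  }
}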
Besides the above, there is a second way I use case: inside one method, dispatching to different methods.
As mentioned, I started out with four independent compression methods. They act as compression operators that only handle a single file, but on HDFS the data usually sits in directories nested several levels deep, and kicking off compression from the top level can easily mean thousands of files in one go. So I wrote a directory-compression method that walks the nested directories and compresses the files inside, and it calls itself recursively, which could hardly be more convenient.
The directory method, of course, also lets you specify the compression format, and here I implemented that by dispatching to the standalone compression operators.
For example:
codec match {
  case "SNAPPY"  => hdfsFileCompressBySnappyCodec(fs, conf, inpath, outPath)
  case "GZIP"    => hdfsFileCompressByGzipCodec(fs, conf, inpath, outPath)
  case "LZO"     => LZOCodecHdfsFileCompressBy(fs, conf, inpath, outPath)
  case "DEFALTE" => deflateCompressForHdfsFile(fs, conf, inpath, outPath)
  case _         => deflateCompressForHdfsFile(fs, conf, inpath, outPath)
}
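As a hedged usage sketch (the HDFS URI and the input/output directories below are placeholders), this is roughly how the directory method defined just below could be driven end to end:

import java.net.URI

// Hypothetical driver: open a FileSystem handle and gzip-compress a whole directory tree.
val conf = new Configuration()
val fs: FileSystem = FileSystem.get(new URI("hdfs://namenode:9000"), conf)
DirCompressBySnappyGzipLzoCodec(fs, conf, "/logs/raw", "/logs/compressed", "GZIP")("/usr/local/info.properties")
fs.close()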
The directory-compression method itself:
/**
 * Compress the files under a directory tree with snappy / gzip / lzo / deflate.
 *
 * @param fs             HDFS FileSystem handle
 * @param conf           Hadoop Configuration
 * @param inpath         input directory
 * @param outPath        output directory
 * @param codec          compression format abbreviation: SNAPPY, GZIP, LZO, DEFALTE; defaults to "GZIP", and unrecognized values fall back to deflate
 * @param propertiesPath path of the properties file used to filter which file types get compressed
 */
def DirCompressBySnappyGzipLzoCodec(fs: FileSystem, conf: Configuration, inpath: String, outPath: String, codec: String = "GZIP")(propertiesPath: String = "/usr/local/info.properties"): Unit = {
  val inputPa: Path = new Path(inpath)
  val fsStatus: FileStatus = fs.getFileStatus(inputPa)
  var flag = false
  try {
    if (fsStatus.isFile) {
      // single file: compress it only if its name passes the filter from the properties file
      flag = CompressUtils.boolFilePrefixContains(inpath, propertiesPath)
      if (flag) {
        codec match {
          case "SNAPPY"  => hdfsFileCompressBySnappyCodec(fs, conf, inpath, outPath)
          case "GZIP"    => hdfsFileCompressByGzipCodec(fs, conf, inpath, outPath)
          case "LZO"     => LZOCodecHdfsFileCompressBy(fs, conf, inpath, outPath)
          case "DEFALTE" => deflateCompressForHdfsFile(fs, conf, inpath, outPath)
          case _         => deflateCompressForHdfsFile(fs, conf, inpath, outPath)
        }
      }
    } else if (fsStatus.isDirectory) {
      // directory: walk every entry, compressing files and recursing into sub-directories
      val listFs: Array[FileStatus] = fs.listStatus(inputPa)
      listFs.foreach(fil => {
        val fsiN = fil.getPath.getName
        println("path dir name " + fsiN)
        println("path parent " + fil.getPath.getParent)
        val uriPath = fil.getPath.getParent.toString
        // strip the "hdfs://host:9000" scheme/authority prefix so only the absolute path remains
        var newInp = ""
        if (uriPath.contains(":9000/")) {
          val uriIndex = uriPath.indexOf(":9000/")
          newInp = uriPath.substring(uriIndex + 5) + "/" + fsiN
        } else {
          newInp = uriPath + "/" + fsiN
        }
        if (fil.isFile) {
          flag = CompressUtils.boolFilePrefixContains(newInp, propertiesPath)
          if (flag) {
            codec match {
              case "SNAPPY"  => hdfsFileCompressBySnappyCodec(fs, conf, newInp, outPath)
              case "GZIP"    => hdfsFileCompressByGzipCodec(fs, conf, newInp, outPath)
              case "LZO"     => LZOCodecHdfsFileCompressBy(fs, conf, newInp, outPath)
              case "DEFALTE" => deflateCompressForHdfsFile(fs, conf, newInp, outPath)
              case _         => deflateCompressForHdfsFile(fs, conf, newInp, outPath)
            }
          }
        } else {
          // sub-directory: recurse with the entry's own path
          DirCompressBySnappyGzipLzoCodec(fs, conf, newInp, outPath, codec)(propertiesPath)
        }
      })
    }