在上一篇文章 大豬 已經(jīng)介紹了日志存儲設(shè)計方案 失球,我們數(shù)據(jù)已經(jīng)落地到數(shù)據(jù)中心上了,那接下來如何ETL呢帮毁?畢竟可是生產(chǎn)環(huán)境級別的实苞,可不能亂來。其實只要解決幾個問題即可烈疚,不必要引入很大級別的組件來做黔牵,當然了各有各的千秋,本文主要從 易懂爷肝、小巧 猾浦、簡潔、 高性能 這三個方面去設(shè)計出發(fā)點灯抛,順便還實現(xiàn)了一個精巧的 Filebeat金赦。
要實現(xiàn)的功能就是掃描每天的增量日志并寫入Hbase中
需要搞定下面幾個不務(wù)正業(yè)的小老弟
- 需要把文件中的每一行數(shù)據(jù)都取出來
- 能處理超過10G以上的大日志文件,并且只能占用機器一定的內(nèi)存对嚼,越小越好
- 從上圖可以看到標黃的是已經(jīng)寫入Hbase的數(shù)據(jù)夹抗,不能重復(fù)讀取
- 非活躍文件不能掃,因為文件過多會影響整體讀取IO性能
- 讀取中的過程要保證增量數(shù)據(jù)不能錄入纵竖,因為要保證offset的時候?qū)懭雖ysql穩(wěn)定不跳躍
大豬 根據(jù)線上的生產(chǎn)環(huán)境一一把上面的功能重新分析給實現(xiàn)一下漠烧。
從第一點看還是比較簡單的嘛?但是我們要結(jié)合上面的 5 個問題來看才行靡砌。
總結(jié)一句話就是:要實現(xiàn)一個高性能而且能隨時重啟繼續(xù)工作的 loghub ETL 程序
已脓。
實際也必需這樣做,因為生產(chǎn)環(huán)境容不得馬虎通殃,不然就等著被BOSS
需要有一個讀取所有日志文件方法
還要實現(xiàn)一個保存并讀取文件進度的方法
由于不能把一個日志文件全部讀入內(nèi)存進行處理
所以還需要一個能根據(jù)索引一行一行接著讀取數(shù)據(jù)的方法
最后剩下一個Hbase的連接池小工具
幾個核心方法已經(jīng)寫完了度液,接著是我們的主程序
def run(logPath: File, defaultOffsetDay: String): Unit = {
val sdfstr = Source.fromFile(seekDayFile).getLines().mkString
val offsetDay = Option(if (sdfstr == "") null else sdfstr)
//讀取設(shè)置讀取日期的倒數(shù)一天之后的日期文件夾
val noneOffsetFold = logPath
.listFiles()
.filter(_.getName >= LocalDate.parse(offsetDay.getOrElse(defaultOffsetDay)).minusDays(1).toString)
.sortBy(f => LocalDate.parse(f.getName).toEpochDay)
//讀取文件夾中的所有日志文件,并取出索引進行匹配
val filesPar = noneOffsetFold
.flatMap(files(_, file => file.getName.endsWith(".log")))
.map(file => (file, seeks().getOrDefault(MD5Hash.getMD5AsHex(file.getAbsolutePath.getBytes()), 0), file.length()))
.filter(tp2 => {
//過濾出新文件画舌,與有增量的日志文件
val fileMd5 = MD5Hash.getMD5AsHex(tp2._1.getAbsolutePath.getBytes())
val result = offsets.asScala.filter(m => fileMd5.equals(m._1))
result.isEmpty || tp2._3 > result.head._2
})
.par
filesPar.tasksupport = pool
val willUpdateOffset = new util.HashMap[String, Long]()
val formatter = DateTimeFormatter.ofPattern("yyyyMMddHHmmssSSS")
var logTime:String = null
filesPar
.foreach(tp3 => {
val hbaseClient = HbasePool.getTable
//因為不能全量讀取數(shù)據(jù),所有只能一條一條讀取,批量提出交給HbaseClient的客戶端的mutate方式優(yōu)雅處理
//foreach 里面的部分就是我們的業(yè)務(wù)處理部分
lines(tp3._1, tp3._2, tp3._3, () => {
willUpdateOffset.put(tp3._1.getAbsolutePath, tp3._3)
offsets.put(MD5Hash.getMD5AsHex(tp3._1.getAbsolutePath.getBytes), tp3._3)
})
.foreach(line => {
val jsonObject = parse(line)
val time = (jsonObject \ "time").extract[Long]
val data = jsonObject \ "data"
val dataMap = data.values.asInstanceOf[Map[String, Any]]
.filter(_._2 != null)
.map(x => x._1 -> x._2.toString)
val uid = dataMap("uid")
logTime = time.getLocalDateTime.toString
val rowkey = uid.take(2) + "|" + time.getLocalDateTime.format(formatter) + "|" + uid.substring(2, 8)
val row = new Put(Bytes.toBytes(rowkey))
dataMap.foreach(tp2 => row.addColumn(Bytes.toBytes("info"), Bytes.toBytes(tp2._1), Bytes.toBytes(tp2._2)))
hbaseClient.mutate(row)
})
hbaseClient.flush()
})
//更新索引到文件上
writeSeek(willUpdateOffset)
//更新索引日期到文件上
writeSeekDay(noneOffsetFold.last.getName)
//把 logTime offset 寫到mysql中,方便Spark+Hbase程序讀取并計算
}
程序很精簡堕担,沒有任何沒用的功能在里面,線上的生產(chǎn)環(huán)境就應(yīng)該是這子的了骗炉。
大家還可以根據(jù)需求加入程序退出發(fā)郵件通知功能之類的照宝。
真正去算了一下也就100行功能代碼,而且占用極小的內(nèi)存句葵,都不到100M厕鹃,很精很精兢仰。
傳送門 完整ETL程序源碼
心明眼亮的你、從此刻開始剂碴。