Spark+Hbase 億級流量分析實戰(zhàn)(小巧高性能的ETL)

在上一篇文章 大豬 已經(jīng)介紹了日志存儲設(shè)計方案 失球,我們數(shù)據(jù)已經(jīng)落地到數(shù)據(jù)中心上了,那接下來如何ETL呢帮毁?畢竟可是生產(chǎn)環(huán)境級別的实苞,可不能亂來。其實只要解決幾個問題即可烈疚,不必要引入很大級別的組件來做黔牵,當然了各有各的千秋,本文主要從 易懂爷肝、小巧 猾浦、簡潔高性能 這三個方面去設(shè)計出發(fā)點灯抛,順便還實現(xiàn)了一個精巧的 Filebeat金赦。

要實現(xiàn)的功能就是掃描每天的增量日志并寫入Hbase中

需要搞定下面幾個不務(wù)正業(yè)的小老弟

  1. 需要把文件中的每一行數(shù)據(jù)都取出來
  2. 能處理超過10G以上的大日志文件,并且只能占用機器一定的內(nèi)存对嚼,越小越好
  3. 從上圖可以看到標黃的是已經(jīng)寫入Hbase的數(shù)據(jù)夹抗,不能重復(fù)讀取
  4. 非活躍文件不能掃,因為文件過多會影響整體讀取IO性能
  5. 讀取中的過程要保證增量數(shù)據(jù)不能錄入纵竖,因為要保證offset的時候?qū)懭雖ysql穩(wěn)定不跳躍

大豬 根據(jù)線上的生產(chǎn)環(huán)境一一把上面的功能重新分析給實現(xiàn)一下漠烧。

從第一點看還是比較簡單的嘛?但是我們要結(jié)合上面的 5 個問題來看才行靡砌。

總結(jié)一句話就是:要實現(xiàn)一個高性能而且能隨時重啟繼續(xù)工作的 loghub ETL 程序已脓。

實際也必需這樣做,因為生產(chǎn)環(huán)境容不得馬虎通殃,不然就等著被BOSS

需要有一個讀取所有日志文件方法

還要實現(xiàn)一個保存并讀取文件進度的方法

由于不能把一個日志文件全部讀入內(nèi)存進行處理
所以還需要一個能根據(jù)索引一行一行接著讀取數(shù)據(jù)的方法

最后剩下一個Hbase的連接池小工具

幾個核心方法已經(jīng)寫完了度液,接著是我們的主程序

def run(logPath: File, defaultOffsetDay: String): Unit = {
    val sdfstr = Source.fromFile(seekDayFile).getLines().mkString
    val offsetDay = Option(if (sdfstr == "") null else sdfstr)
    
    //讀取設(shè)置讀取日期的倒數(shù)一天之后的日期文件夾
    val noneOffsetFold = logPath
      .listFiles()
      .filter(_.getName >= LocalDate.parse(offsetDay.getOrElse(defaultOffsetDay)).minusDays(1).toString)
      .sortBy(f => LocalDate.parse(f.getName).toEpochDay)

    //讀取文件夾中的所有日志文件,并取出索引進行匹配
    val filesPar = noneOffsetFold
      .flatMap(files(_, file => file.getName.endsWith(".log")))
      .map(file => (file, seeks().getOrDefault(MD5Hash.getMD5AsHex(file.getAbsolutePath.getBytes()), 0), file.length()))
      .filter(tp2 => {
        //過濾出新文件画舌,與有增量的日志文件
        val fileMd5 = MD5Hash.getMD5AsHex(tp2._1.getAbsolutePath.getBytes())
        val result = offsets.asScala.filter(m => fileMd5.equals(m._1))
        result.isEmpty || tp2._3 > result.head._2
      })
      .par

    filesPar.tasksupport = pool

    val willUpdateOffset = new util.HashMap[String, Long]()
    val formatter = DateTimeFormatter.ofPattern("yyyyMMddHHmmssSSS")
    var logTime:String = null
    filesPar
      .foreach(tp3 => {
        val hbaseClient = HbasePool.getTable
        //因為不能全量讀取數(shù)據(jù),所有只能一條一條讀取,批量提出交給HbaseClient的客戶端的mutate方式優(yōu)雅處理
        //foreach 里面的部分就是我們的業(yè)務(wù)處理部分
        lines(tp3._1, tp3._2, tp3._3, () => {
          willUpdateOffset.put(tp3._1.getAbsolutePath, tp3._3)
          offsets.put(MD5Hash.getMD5AsHex(tp3._1.getAbsolutePath.getBytes), tp3._3)
        })
          .foreach(line => {
            val jsonObject = parse(line)
            val time = (jsonObject \ "time").extract[Long]
            val data = jsonObject \ "data"
            val dataMap = data.values.asInstanceOf[Map[String, Any]]
              .filter(_._2 != null)
              .map(x => x._1 -> x._2.toString)

            val uid = dataMap("uid")
            logTime = time.getLocalDateTime.toString
            val rowkey = uid.take(2) + "|" + time.getLocalDateTime.format(formatter) + "|" + uid.substring(2, 8)

            val row = new Put(Bytes.toBytes(rowkey))
            dataMap.foreach(tp2 => row.addColumn(Bytes.toBytes("info"), Bytes.toBytes(tp2._1), Bytes.toBytes(tp2._2)))
            hbaseClient.mutate(row)
          })
        hbaseClient.flush()
      })
    //更新索引到文件上
    writeSeek(willUpdateOffset)
    //更新索引日期到文件上
    writeSeekDay(noneOffsetFold.last.getName)
    //把 logTime offset 寫到mysql中,方便Spark+Hbase程序讀取并計算
  }

程序很精簡堕担,沒有任何沒用的功能在里面,線上的生產(chǎn)環(huán)境就應(yīng)該是這子的了骗炉。
大家還可以根據(jù)需求加入程序退出發(fā)郵件通知功能之類的照宝。
真正去算了一下也就100行功能代碼,而且占用極小的內(nèi)存句葵,都不到100M厕鹃,很精很精兢仰。

傳送門 完整ETL程序源碼

心明眼亮的你、從此刻開始剂碴。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末把将,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子忆矛,更是在濱河造成了極大的恐慌察蹲,老刑警劉巖,帶你破解...
    沈念sama閱讀 212,222評論 6 493
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件催训,死亡現(xiàn)場離奇詭異洽议,居然都是意外死亡,警方通過查閱死者的電腦和手機漫拭,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,455評論 3 385
  • 文/潘曉璐 我一進店門亚兄,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人采驻,你說我怎么就攤上這事审胚。” “怎么了礼旅?”我有些...
    開封第一講書人閱讀 157,720評論 0 348
  • 文/不壞的土叔 我叫張陵膳叨,是天一觀的道長。 經(jīng)常有香客問我痘系,道長菲嘴,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,568評論 1 284
  • 正文 為了忘掉前任碎浇,我火速辦了婚禮临谱,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘奴璃。我一直安慰自己,他們只是感情好城豁,可當我...
    茶點故事閱讀 65,696評論 6 386
  • 文/花漫 我一把揭開白布苟穆。 她就那樣靜靜地躺著,像睡著了一般唱星。 火紅的嫁衣襯著肌膚如雪雳旅。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,879評論 1 290
  • 那天间聊,我揣著相機與錄音攒盈,去河邊找鬼。 笑死哎榴,一個胖子當著我的面吹牛型豁,可吹牛的內(nèi)容都是我干的僵蛛。 我是一名探鬼主播,決...
    沈念sama閱讀 39,028評論 3 409
  • 文/蒼蘭香墨 我猛地睜開眼迎变,長吁一口氣:“原來是場噩夢啊……” “哼充尉!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起衣形,我...
    開封第一講書人閱讀 37,773評論 0 268
  • 序言:老撾萬榮一對情侶失蹤驼侠,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后谆吴,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體倒源,經(jīng)...
    沈念sama閱讀 44,220評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,550評論 2 327
  • 正文 我和宋清朗相戀三年句狼,在試婚紗的時候發(fā)現(xiàn)自己被綠了相速。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,697評論 1 341
  • 序言:一個原本活蹦亂跳的男人離奇死亡鲜锚,死狀恐怖突诬,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情芜繁,我是刑警寧澤旺隙,帶...
    沈念sama閱讀 34,360評論 4 332
  • 正文 年R本政府宣布,位于F島的核電站骏令,受9級特大地震影響蔬捷,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜榔袋,卻給世界環(huán)境...
    茶點故事閱讀 40,002評論 3 315
  • 文/蒙蒙 一周拐、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧凰兑,春花似錦妥粟、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,782評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至锅知,卻和暖如春播急,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背售睹。 一陣腳步聲響...
    開封第一講書人閱讀 32,010評論 1 266
  • 我被黑心中介騙來泰國打工桩警, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人昌妹。 一個月前我還...
    沈念sama閱讀 46,433評論 2 360
  • 正文 我出身青樓捶枢,卻偏偏與公主長得像握截,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子柱蟀,可洞房花燭夜當晚...
    茶點故事閱讀 43,587評論 2 350