ETL用戶數(shù)據(jù)處理: kafka->spark->kudu

數(shù)據(jù)結(jié)構(gòu)

kafka數(shù)據(jù)結(jié)構(gòu)

基于前兩章 數(shù)據(jù)埋點設(shè)計和SDK源碼數(shù)據(jù)采集和驗證方案的介紹, 我們是使用filebeat采集容器日志到kafka, 使用kafka-eagle查看kafka數(shù)據(jù)泞遗。

image.png

經(jīng)過json格式化(bejson, json之后碗脊,可得到以下數(shù)據(jù)格式, 由此可知真正的日志數(shù)據(jù)在message.request字段。

{
    "@timestamp": "2022-01-24T06:02:17.791Z",
    "@metadata": {
        "beat": "filebeat",
        "type": "_doc",
        "version": "7.9.3"
    },
    "host": {
        "name": "df6d1b047497"
    },
    "message": {
        "request": "{\"agent\":\"python-requests/2.25.1\",\"game_id\":8,\"ip\":\"172.188.22.22",\"properties\":{\"account_id\":\"143\",\"ad\":\"817\",\"ad_id\":\"476\",\"ad_set\":\"261\",\"ads_type\":\"type2\",\"campaign\":\"場景2\",\"campaign_id\":\"來源3\",\"channel\":\"渠道1\",\"coin_num\":\"7386\",\"create_role_time\":\"1642150216868\",\"current_base_level\":\"7\",\"current_level\":\"62\",\"current_role_power\":\"8667\",\"diamond_num\":\"3275\",\"first_game_start_time\":\"1642150216868\",\"first_login_time\":\"1642150216868\",\"first_pay_base_level\":\"4\",\"first_pay_level\":\"43\",\"first_pay_time\":\"1642150216868\",\"first_recharge_time\":\"1642150216868\",\"from_resource_id\":\"來源3\",\"guild_id\":\"19\",\"is_old_user\":\"0\",\"last_game_start_time\":\"1642150217727\",\"last_login_time\":\"1642150216868\",\"latest_login_time\":\"1642150217491\",\"max_battle_id\":\"592\",\"max_level_id\":\"894\",\"media_source\":\"渠道2\",\"register_time\":\"1642150216868\",\"role_id\":\"848\",\"role_type\":\"type3\",\"server_id\":\"2\",\"total_pay_amount\":\"1799\",\"user_level\":\"53\",\"vip_level\":\"33\"},\"time\":1643000614652,\"timestamp\":\"1643004137774\",\"timezone\":\"Asia/Shanghai\",\"type\":\"profile_set\",\"uid\":\"uid_143\",\"uid_type\":\"1\"}",
        "type": "entity",
        "app": "sdk_event",
        "level": "info",
        "ts": 1.6430041377917705e+09,
        "caller": "log/logger.go:81",
        "msg": "entity"
    },
    "log": {
        "offset": 492754230,
        "file": {
            "path": "/var/lib/docker/containers/36847a16c7c8e029744475172847cd14dda0dc28d1c33199df9f8c443e2798ee/36847a16c7c8e029744475172847cd14dda0dc28d1c33199df9f8c443e2798ee-json.log"
        }
    },
    "stream": "stderr",
    "input": {
        "type": "docker"
    },
    "agent": {
        "name": "df6d1b047497",
        "type": "filebeat",
        "version": "7.9.3",
        "hostname": "df6d1b047497",
        "ephemeral_id": "c711a4c8-904a-4dfe-9696-9b54f9dde4a9",
        "id": "d4671f09-1ec3-4bfd-bb6d-1a08761926f9"
    },
    "ecs": {
        "version": "1.5.0"
    }
}

最終的數(shù)據(jù)格式如下

{
    "agent": "python-requests/2.25.1",
    "game_id": 8,
    "ip": "172.188.22.22",
    "properties": {
        "account_id": "143",
        "ad": "817",
        "ad_id": "476",
        "ad_set": "261",
        "ads_type": "type2",
        "campaign": "場景2",
        "campaign_id": "來源3",
        "channel": "渠道1",
        "coin_num": "7386",
        "create_role_time": "1642150216868",
        "current_base_level": "7",
        "current_level": "62",
        "current_role_power": "8667",
        "diamond_num": "3275",
        "first_game_start_time": "1642150216868",
        "first_login_time": "1642150216868",
        "first_pay_base_level": "4",
        "first_pay_level": "43",
        "first_pay_time": "1642150216868",
        "first_recharge_time": "1642150216868",
        "from_resource_id": "來源3",
        "guild_id": "19",
        "is_old_user": "0",
        "last_game_start_time": "1642150217727",
        "last_login_time": "1642150216868",
        "latest_login_time": "1642150217491",
        "max_battle_id": "592",
        "max_level_id": "894",
        "media_source": "渠道2",
        "register_time": "1642150216868",
        "role_id": "848",
        "role_type": "type3",
        "server_id": "2",
        "total_pay_amount": "1799",
        "user_level": "53",
        "vip_level": "33"
    },
    "time": 1643000614652,
    "timestamp": "1643004137774",
    "timezone": "Asia/Shanghai",
    "type": "profile_set",
    "uid": "uid_143",
    "uid_type": "1"
}

kudu表數(shù)據(jù)結(jié)構(gòu)

每個應(yīng)用創(chuàng)建一個用戶表 user_profile_應(yīng)用id, 定義表結(jié)構(gòu)字段, 假設(shè)定義的表結(jié)構(gòu)字段如下

   // 建表的固定字段Schema定義
   val userInfoSchema: StructType = StructType(
       StructField("uid", StringType, false) ::
           StructField("uid_type", StringType, false) ::
           StructField("game_id", IntegerType, false) ::
           StructField("day", StringType, false) ::
           StructField("agent", StringType, true) ::
           StructField("ip", StringType, true) ::
           StructField("timestamp", TimestampType, true) ::
           StructField("time", TimestampType, true) ::
           StructField("timezone", StringType, true) ::
           StructField("year", StringType, true) ::
           StructField("month", StringType, true) ::
           StructField("week", StringType, true) ::
           StructField("hour", StringType, true) ::
           StructField("minute", StringType, true) :: Nil
       )
   // 固定表主鍵
   val keySeq = Seq("uid", "uid_type", "game_id")
   // 固定分區(qū)鍵
   val partitionList = List("uid_type", "game_id")
image.png

Spark處理邏輯

源代碼倉庫

讀取kafka數(shù)據(jù)
val stream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
    ssc,
    LocationStrategies.PreferConsistent,
    ConsumerStrategies.Subscribe[String, String](topic.split(","), kafkaParams))
日志解析

將kafka日志解析成kudu表所需數(shù)據(jù)字段和類型

def filebeat2log(value: String, myAccumulator: MyAccumulator): (String, Map[String, (String, String)]) = {
        val data: JSONObject = JSON.parseObject(value).getJSONObject("message").getJSONObject("request")
        val map: mutable.HashMap[String, (String,String)] = new mutable.HashMap[String, (String,String)]()
        var uid = ""
        var uid_type = ""
        try {
            val jsonKey = data.keySet()
            val iter = jsonKey.iterator()
            uid = data.getString("uid")
            uid_type = data.getString("uid_type")
            val time = data.getString("time")
            val timezone = data.getString("timezone")
            val timeStr = TimeUtil.timestamp2str(time, timezone = timezone)
            // 添加新的gameId到累加器中
            val game_id = data.get("game_id").toString.toInt
            myAccumulator.add(game_id)
            // 解析字段封裝到map中姑躲,會拆分properties
            while (iter.hasNext) {
                val instance = iter.next()
                var value = ""
                value = if (data.get(instance) == null) "" else data.get(instance).toString.toLowerCase
                if (instance.toLowerCase == "timezone") value = data.get(instance).toString
                if (instance == "properties") {
                    val propMap = data.getJSONObject(instance).asScala
                    for ((k, v) <- propMap) map.put(k, (v.toString,timeStr))
                }
                else {
                    map.put(instance, (value,timeStr))
                }
            }
        }catch {
            case e: NullPointerException => {
                println("缺少關(guān)鍵字段")
            }
        }
        (uid+"-"+uid_type,map.toMap)
    }
創(chuàng)建應(yīng)用id的累加器

創(chuàng)建在集群模式下累加的變量,便于每個excutor數(shù)據(jù)同步

class MyAccumulator extends AccumulatorV2[Int,Set[Int]]{
    // 定義一個集合排抬,用來裝game_id
    var set: mutable.Set[Int] = mutable.Set[Int]()
    
    // 是否為初始化狀態(tài)两踏,如果集合數(shù)據(jù)為空,即為初始化狀態(tài)
    override def isZero: Boolean = set.isEmpty
    
    // 復(fù)制累加器
    override def copy(): AccumulatorV2[Int, Set[Int]] = {
        val newAccumulator = new MyAccumulator()
        newAccumulator.set = this.set
        newAccumulator
    }
    
    // 重置累加器艰垂。清空集合元素泡仗。讓集合為空
    override def reset(): Unit = {
        this.set.clear()
    }
    
    override def add(v: Int): Unit = {
        this.set.add(v)
    }
    
    override def merge(other: AccumulatorV2[Int, Set[Int]]): Unit = {
        this.set ++= other.value
    }
    
    override def value: Set[Int] = {
        this.set.toSet
    }
}
kudu建表和數(shù)據(jù)寫入
    // 創(chuàng)建kudu表
    def createTable(kuduContext: KuduContext,
                    tableName: String,
                    schema: StructType,
                    keySeq: Seq[String],
                    partitionList: List[String],
                    bucketNum: Int = 2,
                    replicaNum: Int = 1): Unit = {
        //判斷表是否存在
        if (!kuduContext.tableExists(tableName)) {
            // 表不存在,創(chuàng)建表
            val createTableOptions = new CreateTableOptions()
            // 設(shè)置哈希分區(qū)材泄、哈希桶數(shù)(大于2)及副本數(shù)
            createTableOptions.addHashPartitions(partitionList.asJava, bucketNum).setNumReplicas(replicaNum)
            // 創(chuàng)建表
            kuduContext.createTable(tableName, schema, keySeq, createTableOptions)
            println(s"==== create Kudu Table: $tableName ====")
        }
    }

    // 插入數(shù)據(jù),append模式,對已有key的數(shù)據(jù)自動更新
    def upsertTable2(tableName: String, df: DataFrame, kuduMaster: String): Unit = {
        df.write.options(Map(
            "kudu.master" ->kuduMaster,
            "kudu.table" -> tableName,
            "kudu.operation.timeout.ms" -> "1000000"))
          .mode("append").kudu
    }
    
    // 插入數(shù)據(jù),append模式,對已有key的數(shù)據(jù)自動更新
    def alterRow(kuduContext: KuduContext, tableName: String): Unit = {
        val client = kuduContext.syncClient
        val table = kuduContext.syncClient.openTable(tableName)
        val session = client.newSession()
        val upsert = table.newUpsert()
        val row: PartialRow = upsert.getRow
        row.addString("uid","uid_0")
        session.apply(upsert)
    }

   // 讀取kudu表
    def readKudu(spark: SparkSession,tableName: String, kuduMaster: String): DataFrame ={
        val kuduDF: DataFrame = spark.read.options(Map(
            "kudu.master" -> kuduMaster,
            "kudu.table" -> tableName,
            "kudu.operation.timeout.ms" -> "1000000")).kudu
        kuduDF
    }

Dolphinscheduler-Yarn調(diào)度

任務(wù)調(diào)度

image.png

查看日志

image.png

Presto查詢Kudu數(shù)據(jù)

客戶端連接Presto Server

presto-cli --server slaves01:18080 --catalog hive --schema default
presto-cli --server slaves01:18080 --catalog kudu --schema default

查詢數(shù)據(jù)

select * from kudu.default.user_profile_10 limit 1;

  uid  | uid_type | game_id |    day     |         agent          |      ip      |        timestamp        |          time           |   timezone    | year |  month  |         week          |     hour      |      minute
-------+----------+---------+------------+------------------------+--------------+-------------------------+-------------------------+---------------+------+---------+-----------------------+---------------+------------------
 uid_0 | 1        |      10 | 2022-01-22 | python-requests/2.25.1 | 172.13.1.230 | 2022-01-22 10:36:14.375 | 2022-01-22 10:26:15.338 | Asia/Shanghai | 2022 | 2022-01 | 2022-01-22:10(第四周) | 2022-01-22 10 | 2022-01-22 10:26

系列文章

第一篇: Ambari自動化部署
第二篇: 數(shù)據(jù)埋點設(shè)計和SDK源碼
第三篇: 數(shù)據(jù)采集和驗證方案
第四篇: ETL實時方案: Kafka->Flink->Hive
第五篇: ETL用戶數(shù)據(jù)處理: kafka->spark->kudu
第六篇: Presto分析模型SQL和UDF函數(shù)

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末沮焕,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子拉宗,更是在濱河造成了極大的恐慌峦树,老刑警劉巖辣辫,帶你破解...
    沈念sama閱讀 206,839評論 6 482
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異魁巩,居然都是意外死亡急灭,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,543評論 2 382
  • 文/潘曉璐 我一進(jìn)店門谷遂,熙熙樓的掌柜王于貴愁眉苦臉地迎上來葬馋,“玉大人,你說我怎么就攤上這事肾扰〕胨唬” “怎么了?”我有些...
    開封第一講書人閱讀 153,116評論 0 344
  • 文/不壞的土叔 我叫張陵集晚,是天一觀的道長窗悯。 經(jīng)常有香客問我,道長偷拔,這世上最難降的妖魔是什么蒋院? 我笑而不...
    開封第一講書人閱讀 55,371評論 1 279
  • 正文 為了忘掉前任,我火速辦了婚禮莲绰,結(jié)果婚禮上欺旧,老公的妹妹穿的比我還像新娘。我一直安慰自己蛤签,他們只是感情好辞友,可當(dāng)我...
    茶點故事閱讀 64,384評論 5 374
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著震肮,像睡著了一般踏枣。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上钙蒙,一...
    開封第一講書人閱讀 49,111評論 1 285
  • 那天茵瀑,我揣著相機(jī)與錄音,去河邊找鬼躬厌。 笑死马昨,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的扛施。 我是一名探鬼主播鸿捧,決...
    沈念sama閱讀 38,416評論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼疙渣!你這毒婦竟也來了匙奴?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,053評論 0 259
  • 序言:老撾萬榮一對情侶失蹤妄荔,失蹤者是張志新(化名)和其女友劉穎泼菌,沒想到半個月后谍肤,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 43,558評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡哗伯,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,007評論 2 325
  • 正文 我和宋清朗相戀三年荒揣,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片焊刹。...
    茶點故事閱讀 38,117評論 1 334
  • 序言:一個原本活蹦亂跳的男人離奇死亡系任,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出虐块,到底是詐尸還是另有隱情俩滥,我是刑警寧澤,帶...
    沈念sama閱讀 33,756評論 4 324
  • 正文 年R本政府宣布贺奠,位于F島的核電站举农,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏敞嗡。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,324評論 3 307
  • 文/蒙蒙 一航背、第九天 我趴在偏房一處隱蔽的房頂上張望喉悴。 院中可真熱鬧,春花似錦玖媚、人聲如沸箕肃。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,315評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽勺像。三九已至,卻和暖如春错森,著一層夾襖步出監(jiān)牢的瞬間吟宦,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,539評論 1 262
  • 我被黑心中介騙來泰國打工涩维, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留殃姓,地道東北人。 一個月前我還...
    沈念sama閱讀 45,578評論 2 355
  • 正文 我出身青樓瓦阐,卻偏偏與公主長得像蜗侈,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子睡蟋,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 42,877評論 2 345

推薦閱讀更多精彩內(nèi)容