Data Structures
Kafka Data Structure
Building on the previous two parts, Event Tracking Design and SDK Source Code and Data Collection and Validation, we use Filebeat to ship container logs to Kafka and use kafka-eagle to inspect the data in Kafka.
After formatting the JSON (for example with bejson), a record looks like the following; the actual log payload sits in the message.request field (a short extraction sketch follows the record).
{
"@timestamp": "2022-01-24T06:02:17.791Z",
"@metadata": {
"beat": "filebeat",
"type": "_doc",
"version": "7.9.3"
},
"host": {
"name": "df6d1b047497"
},
"message": {
"request": "{\"agent\":\"python-requests/2.25.1\",\"game_id\":8,\"ip\":\"172.188.22.22",\"properties\":{\"account_id\":\"143\",\"ad\":\"817\",\"ad_id\":\"476\",\"ad_set\":\"261\",\"ads_type\":\"type2\",\"campaign\":\"場景2\",\"campaign_id\":\"來源3\",\"channel\":\"渠道1\",\"coin_num\":\"7386\",\"create_role_time\":\"1642150216868\",\"current_base_level\":\"7\",\"current_level\":\"62\",\"current_role_power\":\"8667\",\"diamond_num\":\"3275\",\"first_game_start_time\":\"1642150216868\",\"first_login_time\":\"1642150216868\",\"first_pay_base_level\":\"4\",\"first_pay_level\":\"43\",\"first_pay_time\":\"1642150216868\",\"first_recharge_time\":\"1642150216868\",\"from_resource_id\":\"來源3\",\"guild_id\":\"19\",\"is_old_user\":\"0\",\"last_game_start_time\":\"1642150217727\",\"last_login_time\":\"1642150216868\",\"latest_login_time\":\"1642150217491\",\"max_battle_id\":\"592\",\"max_level_id\":\"894\",\"media_source\":\"渠道2\",\"register_time\":\"1642150216868\",\"role_id\":\"848\",\"role_type\":\"type3\",\"server_id\":\"2\",\"total_pay_amount\":\"1799\",\"user_level\":\"53\",\"vip_level\":\"33\"},\"time\":1643000614652,\"timestamp\":\"1643004137774\",\"timezone\":\"Asia/Shanghai\",\"type\":\"profile_set\",\"uid\":\"uid_143\",\"uid_type\":\"1\"}",
"type": "entity",
"app": "sdk_event",
"level": "info",
"ts": 1.6430041377917705e+09,
"caller": "log/logger.go:81",
"msg": "entity"
},
"log": {
"offset": 492754230,
"file": {
"path": "/var/lib/docker/containers/36847a16c7c8e029744475172847cd14dda0dc28d1c33199df9f8c443e2798ee/36847a16c7c8e029744475172847cd14dda0dc28d1c33199df9f8c443e2798ee-json.log"
}
},
"stream": "stderr",
"input": {
"type": "docker"
},
"agent": {
"name": "df6d1b047497",
"type": "filebeat",
"version": "7.9.3",
"hostname": "df6d1b047497",
"ephemeral_id": "c711a4c8-904a-4dfe-9696-9b54f9dde4a9",
"id": "d4671f09-1ec3-4bfd-bb6d-1a08761926f9"
},
"ecs": {
"version": "1.5.0"
}
}
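The payload can be pulled out of such a record with two nested parses. The following is only a minimal sketch, not code from the original pipeline; it assumes the JSON and JSONObject classes are Alibaba fastjson, which matches the calls used by filebeat2log later in this article.
import com.alibaba.fastjson.{JSON, JSONObject}

// rawRecord is one Kafka message value in the Filebeat format shown above.
// "message" is an object, while its "request" field is a JSON-encoded string;
// fastjson's getJSONObject parses that string transparently.
def extractRequest(rawRecord: String): JSONObject =
  JSON.parseObject(rawRecord)
    .getJSONObject("message")
    .getJSONObject("request")

// e.g. extractRequest(rawRecord).getString("uid") returns "uid_143"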
The final data format (the parsed message.request payload) is as follows:
{
"agent": "python-requests/2.25.1",
"game_id": 8,
"ip": "172.188.22.22",
"properties": {
"account_id": "143",
"ad": "817",
"ad_id": "476",
"ad_set": "261",
"ads_type": "type2",
"campaign": "場景2",
"campaign_id": "來源3",
"channel": "渠道1",
"coin_num": "7386",
"create_role_time": "1642150216868",
"current_base_level": "7",
"current_level": "62",
"current_role_power": "8667",
"diamond_num": "3275",
"first_game_start_time": "1642150216868",
"first_login_time": "1642150216868",
"first_pay_base_level": "4",
"first_pay_level": "43",
"first_pay_time": "1642150216868",
"first_recharge_time": "1642150216868",
"from_resource_id": "來源3",
"guild_id": "19",
"is_old_user": "0",
"last_game_start_time": "1642150217727",
"last_login_time": "1642150216868",
"latest_login_time": "1642150217491",
"max_battle_id": "592",
"max_level_id": "894",
"media_source": "渠道2",
"register_time": "1642150216868",
"role_id": "848",
"role_type": "type3",
"server_id": "2",
"total_pay_amount": "1799",
"user_level": "53",
"vip_level": "33"
},
"time": 1643000614652,
"timestamp": "1643004137774",
"timezone": "Asia/Shanghai",
"type": "profile_set",
"uid": "uid_143",
"uid_type": "1"
}
Kudu Table Data Structure
A user table named user_profile_<app id> is created for each application. Assume the table schema fields are defined as follows:
// Fixed schema definition used when creating the table
val userInfoSchema: StructType = StructType(
StructField("uid", StringType, false) ::
StructField("uid_type", StringType, false) ::
StructField("game_id", IntegerType, false) ::
StructField("day", StringType, false) ::
StructField("agent", StringType, true) ::
StructField("ip", StringType, true) ::
StructField("timestamp", TimestampType, true) ::
StructField("time", TimestampType, true) ::
StructField("timezone", StringType, true) ::
StructField("year", StringType, true) ::
StructField("month", StringType, true) ::
StructField("week", StringType, true) ::
StructField("hour", StringType, true) ::
StructField("minute", StringType, true) :: Nil
)
// Fixed table primary key
val keySeq = Seq("uid", "uid_type", "game_id")
// Fixed partition keys
val partitionList = List("uid_type", "game_id")
Spark Processing Logic
Reading Kafka Data
val stream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topic.split(","), kafkaParams))
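The ssc, kafkaParams, and topic referenced above are not defined in the article. A typical setup for the spark-streaming-kafka-0-10 direct stream could look like the sketch below; the app name, batch interval, broker list, consumer group, and topic are placeholders, not values from the original pipeline.
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Placeholders only: adjust app name, batch interval, brokers, group id and topic as needed.
val conf = new SparkConf().setAppName("user-profile-etl")
val ssc = new StreamingContext(conf, Seconds(30))

val topic = "sdk_event"                                    // placeholder, comma-separated topic list
val kafkaParams: Map[String, Object] = Map(
  "bootstrap.servers"  -> "kafka-broker:9092",             // placeholder broker list
  "key.deserializer"   -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id"           -> "user_profile_etl",              // placeholder consumer group
  "auto.offset.reset"  -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)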
Log Parsing
Parse the Kafka log records into the fields and types required by the Kudu table.
def filebeat2log(value: String, myAccumulator: MyAccumulator): (String, Map[String, (String, String)]) = {
val data: JSONObject = JSON.parseObject(value).getJSONObject("message").getJSONObject("request")
val map: mutable.HashMap[String, (String,String)] = new mutable.HashMap[String, (String,String)]()
var uid = ""
var uid_type = ""
try {
val jsonKey = data.keySet()
val iter = jsonKey.iterator()
uid = data.getString("uid")
uid_type = data.getString("uid_type")
val time = data.getString("time")
val timezone = data.getString("timezone")
val timeStr = TimeUtil.timestamp2str(time, timezone = timezone)
// Add newly seen game_ids to the accumulator
val game_id = data.get("game_id").toString.toInt
myAccumulator.add(game_id)
// Parse each field into the map; the properties object is flattened into individual keys
while (iter.hasNext) {
val instance = iter.next()
var value = ""
value = if (data.get(instance) == null) "" else data.get(instance).toString.toLowerCase
if (instance.toLowerCase == "timezone") value = data.get(instance).toString
if (instance == "properties") {
val propMap = data.getJSONObject(instance).asScala
for ((k, v) <- propMap) map.put(k, (v.toString,timeStr))
}
else {
map.put(instance, (value,timeStr))
}
}
}catch {
case e: NullPointerException => {
println("缺少關(guān)鍵字段")
}
}
(uid+"-"+uid_type,map.toMap)
}
Creating an Accumulator for Application IDs
Create a variable that accumulates in cluster mode, so that the game_ids seen by each executor are merged back to the driver.
class MyAccumulator extends AccumulatorV2[Int,Set[Int]]{
// A set used to collect game_ids
var set: mutable.Set[Int] = mutable.Set[Int]()
// The accumulator is in its zero (initial) state when the set is empty
override def isZero: Boolean = set.isEmpty
// Copy the accumulator; copy the contents so the copy does not share the underlying mutable set
override def copy(): AccumulatorV2[Int, Set[Int]] = {
val newAccumulator = new MyAccumulator()
newAccumulator.set ++= this.set
newAccumulator
}
// Reset the accumulator by clearing the set
override def reset(): Unit = {
this.set.clear()
}
override def add(v: Int): Unit = {
this.set.add(v)
}
override def merge(other: AccumulatorV2[Int, Set[Int]]): Unit = {
this.set ++= other.value
}
override def value: Set[Int] = {
this.set.toSet
}
}
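The article does not show how the accumulator and the parser are wired into the stream; a minimal sketch (the variable names here are illustrative) would be:
// Register the accumulator on the driver so executors can merge their game_id sets into it.
val myAccumulator = new MyAccumulator()
ssc.sparkContext.register(myAccumulator, "gameIds")

// Parse every Kafka record into (uid-uid_type, field map); game_ids are collected
// through the accumulator inside filebeat2log.
val parsed = stream.map(record => filebeat2log(record.value(), myAccumulator))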
Kudu Table Creation and Data Writes
// Create a Kudu table
def createTable(kuduContext: KuduContext,
tableName: String,
schema: StructType,
keySeq: Seq[String],
partitionList: List[String],
bucketNum: Int = 2,
replicaNum: Int = 1): Unit = {
// Check whether the table already exists
if (!kuduContext.tableExists(tableName)) {
// The table does not exist, so create it
val createTableOptions = new CreateTableOptions()
// Set hash partitioning, the number of hash buckets (at least 2) and the replica count
createTableOptions.addHashPartitions(partitionList.asJava, bucketNum).setNumReplicas(replicaNum)
// Create the table
kuduContext.createTable(tableName, schema, keySeq, createTableOptions)
println(s"==== create Kudu Table: $tableName ====")
}
}
// Insert data in append mode; rows with an existing key are automatically updated (upsert)
def upsertTable2(tableName: String, df: DataFrame, kuduMaster: String): Unit = {
df.write.options(Map(
"kudu.master" ->kuduMaster,
"kudu.table" -> tableName,
"kudu.operation.timeout.ms" -> "1000000"))
.mode("append").kudu
}
// Single-row upsert through the native Kudu client API; a row with an existing key is updated in place
def alterRow(kuduContext: KuduContext, tableName: String): Unit = {
val client = kuduContext.syncClient
val table = kuduContext.syncClient.openTable(tableName)
val session = client.newSession()
val upsert = table.newUpsert()
val row: PartialRow = upsert.getRow
// All primary key columns (uid, uid_type, game_id) must be set; the values here are sample data
row.addString("uid", "uid_0")
row.addString("uid_type", "1")
row.addInt("game_id", 10)
session.apply(upsert)
}
// Read a Kudu table into a DataFrame
def readKudu(spark: SparkSession,tableName: String, kuduMaster: String): DataFrame ={
val kuduDF: DataFrame = spark.read.options(Map(
"kudu.master" -> kuduMaster,
"kudu.table" -> tableName,
"kudu.operation.timeout.ms" -> "1000000")).kudu
kuduDF
}
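How these functions are driven per batch is not shown in the article. The rough sketch below reuses parsed and myAccumulator from the earlier snippets, assumes a KuduContext (kuduContext) and the Kudu master address (kuduMaster) are in scope, and uses a hypothetical buildDataFrame helper that turns one game's parsed records into rows matching userInfoSchema.
parsed.foreachRDD { rdd =>
  // Force the batch so filebeat2log actually runs and the accumulator is populated.
  rdd.cache()
  rdd.count()
  for (gameId <- myAccumulator.value) {
    val tableName = s"user_profile_$gameId"
    // No-op if the table already exists (see createTable above).
    createTable(kuduContext, tableName, userInfoSchema, keySeq, partitionList)
    // buildDataFrame is hypothetical: it filters rdd down to this game_id and
    // maps the field tuples onto userInfoSchema columns.
    upsertTable2(tableName, buildDataFrame(rdd, gameId), kuduMaster)
  }
  rdd.unpersist()
}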
Dolphinscheduler-Yarn Scheduling
Task Scheduling
Viewing Logs
Querying Kudu Data with Presto
Connect a client to the Presto Server
presto-cli --server slaves01:18080 --catalog hive --schema default
presto-cli --server slaves01:18080 --catalog kudu --schema default
Query the data
select * from kudu.default.user_profile_10 limit 1;
uid | uid_type | game_id | day | agent | ip | timestamp | time | timezone | year | month | week | hour | minute
-------+----------+---------+------------+------------------------+--------------+-------------------------+-------------------------+---------------+------+---------+-----------------------+---------------+------------------
uid_0 | 1 | 10 | 2022-01-22 | python-requests/2.25.1 | 172.13.1.230 | 2022-01-22 10:36:14.375 | 2022-01-22 10:26:15.338 | Asia/Shanghai | 2022 | 2022-01 | 2022-01-22:10(第四周) | 2022-01-22 10 | 2022-01-22 10:26
Series Articles
Part 1: Automated Deployment with Ambari
Part 2: Event Tracking Design and SDK Source Code
Part 3: Data Collection and Validation
Part 4: Real-time ETL: Kafka->Flink->Hive
Part 5: ETL User Data Processing: Kafka->Spark->Kudu
Part 6: Presto Analytics Model SQL and UDF Functions