1. 維度查詢
在上一篇中闰蛔,我們已經(jīng)把訂單和訂單明細(xì)表join完,本文將關(guān)聯(lián)訂單的其他維度數(shù)據(jù)图柏,維度關(guān)聯(lián)實(shí)際上就是在流中查詢存儲(chǔ)在 hbase 中的數(shù)據(jù)表序六。但是即使通過(guò)主鍵的方式查詢,hbase 速度的查詢也是不及流之間的 join蚤吹。外部數(shù)據(jù)源的查詢常常是流式計(jì)算的性能瓶頸例诀,所以我們?cè)诓樵僪base維度數(shù)據(jù)的基礎(chǔ)上做一些優(yōu)化及封裝随抠。
phoenix查詢封裝
phoenix作為hbase的一個(gè)上層sql封裝,或者叫做皮膚繁涂,可以使用標(biāo)準(zhǔn)的sql語(yǔ)法來(lái)使用hbase拱她,我們做一些簡(jiǎn)單的查詢hbase的工具類。
import cn.hutool.core.bean.BeanUtil;
import com.alibaba.fastjson.JSONObject;
import com.zhangbao.gmall.realtime.common.GmallConfig;
import org.apache.commons.beanutils.BeanUtils;
import java.io.PrintStream;
import java.sql.*;
import java.util.ArrayList;
import java.util.List;
/**
* @author zhangbaohpu
* @date 2021/11/13 21:26
* @desc phoenix 工具類扔罪,操作hbase數(shù)據(jù)
*/
public class PhoenixUtil {
private static Connection conn = null;
public static void init(){
try {
Class.forName(GmallConfig.PHOENIX_DRIVER);
conn = DriverManager.getConnection(GmallConfig.PHOENIX_SERVER);
conn.setSchema(GmallConfig.HBASE_SCHEMA);
}catch (Exception e){
e.printStackTrace();
throw new RuntimeException("連接phoenix失敗 -> " + e.getMessage());
}
}
public static <T> List<T> getList(String sql, Class<T> clazz){
if(conn == null){
init();
}
PreparedStatement ps = null;
ResultSet rs = null;
List<T> resultList = new ArrayList<>();
try {
//獲取數(shù)據(jù)庫(kù)對(duì)象
ps = conn.prepareStatement(sql);
//執(zhí)行sql語(yǔ)句
rs = ps.executeQuery();
//獲取元數(shù)據(jù)
ResultSetMetaData metaData = rs.getMetaData();
while (rs.next()){
//創(chuàng)建對(duì)象
T rowObj = clazz.newInstance();
//動(dòng)態(tài)給對(duì)象賦值
for (int i = 1; i <= metaData.getColumnCount(); i++) {
BeanUtils.setProperty(rowObj,metaData.getColumnName(i),rs.getObject(i));
}
resultList.add(rowObj);
}
}catch (Exception e){
throw new RuntimeException("phoenix 查詢失敗 -> " + e.getMessage());
}finally {
if(rs!=null){
try {
rs.close();
} catch (SQLException throwables) {
throwables.printStackTrace();
}
}
if(ps!=null){
try {
ps.close();
} catch (SQLException throwables) {
throwables.printStackTrace();
}
}
if(conn!=null){
try {
conn.close();
} catch (SQLException throwables) {
throwables.printStackTrace();
}
}
}
return resultList;
}
public static void main(String[] args) {
String sql = "select * from GMALL_REALTIME.BASE_TRADEMARK";
System.out.println(getList(sql,JSONObject.class));
}
}
有了對(duì)hbase的查詢秉沼,我們?cè)賹?duì)維度數(shù)據(jù)的查詢做一個(gè)封裝,根據(jù)某個(gè)表的id查詢維度數(shù)據(jù)矿酵。
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.java.tuple.Tuple2;
import java.util.List;
/**
* @author zhangbaohpu
* @date 2021/11/13 22:24
* @desc 維度查詢封裝唬复,底層調(diào)用PhoenixUtil
*/
public class DimUtil {
//直接從 Phoenix 查詢,沒(méi)有緩存
public static JSONObject getDimInfoNoCache(String tableName, Tuple2<String, String>...
colNameAndValue) {
//組合查詢條件
String wheresql = new String(" where ");
for (int i = 0; i < colNameAndValue.length; i++) {
//獲取查詢列名以及對(duì)應(yīng)的值
Tuple2<String, String> nameValueTuple = colNameAndValue[i];
String fieldName = nameValueTuple.f0;
String fieldValue = nameValueTuple.f1;
if (i > 0) {
wheresql += " and ";
}
wheresql += fieldName + "='" + fieldValue + "'";
}
//組合查詢 SQL
String sql = "select * from " + tableName + wheresql;
System.out.println("查詢維度 SQL:" + sql);
JSONObject dimInfoJsonObj = null;
List<JSONObject> dimList = PhoenixUtil.getList(sql, JSONObject.class);
if (dimList != null && dimList.size() > 0) {
//因?yàn)殛P(guān)聯(lián)維度全肮,肯定都是根據(jù) key 關(guān)聯(lián)得到一條記錄
dimInfoJsonObj = dimList.get(0);
}else{
System.out.println("維度數(shù)據(jù)未找到:" + sql);
}
return dimInfoJsonObj;
}
public static void main(String[] args) {
JSONObject dimInfooNoCache = DimUtil.getDimInfoNoCache("base_trademark",
Tuple2.of("id", "13"));
System.out.println(dimInfooNoCache);
}
}
2. 優(yōu)化1:加入旁路緩存模式
我們?cè)谏厦鎸?shí)現(xiàn)的功能中敞咧,直接查詢的 Hbase。外部數(shù)據(jù)源的查詢常常是流式計(jì)算的性能瓶頸倔矾,所以我們需要在上面實(shí)現(xiàn)的基礎(chǔ)上進(jìn)行一定的優(yōu)化妄均。我們這里使用旁路緩存。
旁路緩存模式是一種非常常見(jiàn)的按需分配緩存的模式哪自。如下圖丰包,任何請(qǐng)求優(yōu)先訪問(wèn)緩存,緩存命中壤巷,直接獲得數(shù)據(jù)返回請(qǐng)求邑彪。如果未命中則,查詢數(shù)據(jù)庫(kù)胧华,同時(shí)把結(jié)果寫入緩存以備后續(xù)請(qǐng)求使用寄症。
1) 這種緩存策略有幾個(gè)注意點(diǎn)
緩存要設(shè)過(guò)期時(shí)間,不然冷數(shù)據(jù)會(huì)常駐緩存浪費(fèi)資源矩动。
要考慮維度數(shù)據(jù)是否會(huì)發(fā)生變化有巧,如果發(fā)生變化要主動(dòng)清除緩存。
2) 緩存的選型
一般兩種:堆緩存或者獨(dú)立緩存服務(wù)(redis悲没,memcache)篮迎,
堆緩存,從性能角度看更好示姿,畢竟訪問(wèn)數(shù)據(jù)路徑更短甜橱,減少過(guò)程消耗。但是管理性差栈戳,其他進(jìn)程無(wú)法維護(hù)緩存中的數(shù)據(jù)岂傲。
獨(dú)立緩存服務(wù)(redis,memcache)本身性能也不錯(cuò),不過(guò)會(huì)有創(chuàng)建連接子檀、網(wǎng)絡(luò) IO 等 消耗镊掖。但是考慮到數(shù)據(jù)如果會(huì)發(fā)生變化乃戈,那還是獨(dú)立緩存服務(wù)管理性更強(qiáng),而且如果數(shù)據(jù)量特別大堰乔,獨(dú)立緩存更容易擴(kuò)展偏化。
因?yàn)樵蹅兊木S度數(shù)據(jù)都是可變數(shù)據(jù),所以這里還是采用 Redis 管理緩存镐侯。
代碼優(yōu)化
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.java.tuple.Tuple2;
import redis.clients.jedis.Jedis;
import java.util.List;
/**
* @author zhangbaohpu
* @date 2021/11/13 22:24
* @desc 維度查詢封裝侦讨,底層調(diào)用PhoenixUtil
*/
public class DimUtil {
/**
* 查詢優(yōu)化
* redis緩存
* 類型 string list set zset hash
* 這里使用key格式:
* key dim:table_name:value 示例:dim:base_trademark:13
* value json字符串
* 過(guò)期時(shí)間:24*3600
*/
public static JSONObject getDimInfo(String tableName, Tuple2<String, String>...
colNameAndValue) {
//組合查詢條件
String wheresql = new String(" where ");
//redis key
String redisKey = "dim:"+tableName+":";
for (int i = 0; i < colNameAndValue.length; i++) {
//獲取查詢列名以及對(duì)應(yīng)的值
Tuple2<String, String> nameValueTuple = colNameAndValue[i];
String fieldName = nameValueTuple.f0;
String fieldValue = nameValueTuple.f1;
if (i > 0) {
wheresql += " and ";
redisKey += "_";
}
wheresql += fieldName + "='" + fieldValue + "'";
redisKey += fieldValue;
}
Jedis jedis = null;
String redisStr = null;
JSONObject dimInfoJsonObj = null;
try {
jedis = RedisUtil.getJedis();
redisStr = jedis.get(redisKey);
dimInfoJsonObj = null;
} catch (Exception e) {
e.printStackTrace();
System.out.println("獲取redis數(shù)據(jù)錯(cuò)誤");
}
if(redisStr!=null && redisStr.length()>0){
dimInfoJsonObj = JSON.parseObject(redisStr);
}else {
//從phoenix中去數(shù)據(jù)
//組合查詢 SQL
String sql = "select * from " + tableName + wheresql;
System.out.println("查詢維度 SQL:" + sql);
List<JSONObject> dimList = PhoenixUtil.getList(sql, JSONObject.class);
if (dimList != null && dimList.size() > 0) {
//因?yàn)殛P(guān)聯(lián)維度,肯定都是根據(jù) key 關(guān)聯(lián)得到一條記錄
dimInfoJsonObj = dimList.get(0);
if(jedis!=null){
jedis.setex(redisKey,3600*24,dimInfoJsonObj.toString());
}
}else{
System.out.println("維度數(shù)據(jù)未找到:" + sql);
}
}
//關(guān)閉jedis
if(jedis!=null){
jedis.close();
}
return dimInfoJsonObj;
}
public static JSONObject getDimInfoNoCacheById(String tableName, String idValue) {
return getDimInfoNoCache(tableName,new Tuple2<>("id",idValue));
}
//直接從 Phoenix 查詢苟翻,沒(méi)有緩存
public static JSONObject getDimInfoNoCache(String tableName, Tuple2<String, String>...
colNameAndValue) {
//組合查詢條件
String wheresql = new String(" where ");
for (int i = 0; i < colNameAndValue.length; i++) {
//獲取查詢列名以及對(duì)應(yīng)的值
Tuple2<String, String> nameValueTuple = colNameAndValue[i];
String fieldName = nameValueTuple.f0;
String fieldValue = nameValueTuple.f1;
if (i > 0) {
wheresql += " and ";
}
wheresql += fieldName + "='" + fieldValue + "'";
}
//組合查詢 SQL
String sql = "select * from " + tableName + wheresql;
System.out.println("查詢維度 SQL:" + sql);
JSONObject dimInfoJsonObj = null;
List<JSONObject> dimList = PhoenixUtil.getList(sql, JSONObject.class);
if (dimList != null && dimList.size() > 0) {
//因?yàn)殛P(guān)聯(lián)維度韵卤,肯定都是根據(jù) key 關(guān)聯(lián)得到一條記錄
dimInfoJsonObj = dimList.get(0);
}else{
System.out.println("維度數(shù)據(jù)未找到:" + sql);
}
return dimInfoJsonObj;
}
public static void main(String[] args) {
JSONObject dimInfooNoCache = DimUtil.getDimInfoNoCache("base_trademark",
Tuple2.of("id", "13"));
System.out.println(dimInfooNoCache);
}
}
緩存依賴于redisUtil.java工具類
import redis.clients.jedis.*;
/**
* @author zhangbaohpu
* @date 2021/11/13 23:31
* @desc
*/
public class RedisUtil {
public static JedisPool jedisPool=null;
public static Jedis getJedis(){
if(jedisPool==null){
JedisPoolConfig jedisPoolConfig =new JedisPoolConfig();
jedisPoolConfig.setMaxTotal(100); //最大可用連接數(shù)
jedisPoolConfig.setBlockWhenExhausted(true); //連接耗盡是否等待
jedisPoolConfig.setMaxWaitMillis(2000); //等待時(shí)間
jedisPoolConfig.setMaxIdle(5); //最大閑置連接數(shù)
jedisPoolConfig.setMinIdle(5); //最小閑置連接數(shù)
jedisPoolConfig.setTestOnBorrow(true); //取連接的時(shí)候進(jìn)行一下測(cè)試 ping pong
jedisPool=new JedisPool( jedisPoolConfig, "hadoop101",6379 ,1000);
System.out.println("開(kāi)辟連接池");
return jedisPool.getResource();
}else{
System.out.println(" 連接池:"+jedisPool.getNumActive());
return jedisPool.getResource();
}
}
public static void main(String[] args) {
Jedis jedis = getJedis();
System.out.println(jedis.ping());
}
}
維度數(shù)據(jù)發(fā)生變化
如果維度數(shù)據(jù)發(fā)生了變化,這時(shí)緩存的數(shù)據(jù)就不是最新的了崇猫,所以這里優(yōu)化將發(fā)生變化的維度數(shù)據(jù)沈条,在緩存中清除。
在DimUtil.java加入清除緩存方法
//根據(jù) key 讓 Redis 中的緩存失效
public static void deleteCached( String tableName, String id){
String key = "dim:" + tableName.toLowerCase() + ":" + id;
try {
Jedis jedis = RedisUtil.getJedis();
// 通過(guò) key 清除緩存
jedis.del(key);
jedis.close();
} catch (Exception e) {
System.out.println("緩存異常诅炉!");
e.printStackTrace();
}
}
另外一個(gè)蜡歹,在實(shí)時(shí)同步mysql數(shù)據(jù)BaseDbTask任務(wù)中,將維度數(shù)據(jù)通過(guò)DimSink.java放入hbase涕烧,在invoke方法中添加清除緩存操作
@Override
public void invoke(JSONObject jsonObject, Context context) throws Exception {
String sinkTable = jsonObject.getString("sink_table");
JSONObject data = jsonObject.getJSONObject("data");
PreparedStatement ps = null;
if(data!=null && data.size()>0){
try {
//生成phoenix的upsert語(yǔ)句月而,這個(gè)包含insert和update操作
String sql = generateUpsert(data,sinkTable.toUpperCase());
log.info("開(kāi)始執(zhí)行 phoenix sql -->{}",sql);
ps = conn.prepareStatement(sql);
ps.executeUpdate();
conn.commit();
log.info("執(zhí)行 phoenix sql 成功");
} catch (SQLException throwables) {
throwables.printStackTrace();
throw new RuntimeException("執(zhí)行 phoenix sql 失敗议纯!");
}finally {
if(ps!=null){
ps.close();
}
}
//如果是更新維度數(shù)據(jù)父款,則把redis數(shù)據(jù)清空
if(jsonObject.getString("type").endsWith("update")){
DimUtil.deleteCached(sinkTable,data.getString("id"));
}
}
}
3. 優(yōu)化2:異步查詢
在 Flink 流處理過(guò)程中,經(jīng)常需要和外部系統(tǒng)進(jìn)行交互瞻凤,用維度表補(bǔ)全事實(shí)表中的字段憨攒。例如:在電商場(chǎng)景中,需要一個(gè)商品的 skuid 去關(guān)聯(lián)商品的一些屬性阀参,例如商品所屬行業(yè)肝集、商品的生產(chǎn)廠家、生產(chǎn)廠家的一些情況蛛壳;在物流場(chǎng)景中包晰,知道包裹 id,需要去關(guān)聯(lián)包裹的行業(yè)屬性炕吸、發(fā)貨信息、收貨信息等等勉痴。
默認(rèn)情況下赫模,在 Flink 的 MapFunction 中,單個(gè)并行只能用同步方式去交互: 將請(qǐng)求發(fā)送到外部存儲(chǔ)蒸矛,IO 阻塞瀑罗,等待請(qǐng)求返回胸嘴,然后繼續(xù)發(fā)送下一個(gè)請(qǐng)求。這種同步交互的方式往往在網(wǎng)絡(luò)等待上就耗費(fèi)了大量時(shí)間斩祭。為了提高處理效率劣像,可以增加 MapFunction 的并行度,但增加并行度就意味著更多的資源摧玫,并不是一種非常好的解決方式耳奕。
Flink 在 1.2 中引入了 Async I/O,在異步模式下诬像,將 IO 操作異步化屋群,單個(gè)并行可以連續(xù)發(fā)送多個(gè)請(qǐng)求,哪個(gè)請(qǐng)求先返回就先處理坏挠,從而在連續(xù)的請(qǐng)求間不需要阻塞式等待芍躏,大大提高了流處理效率。
Async I/O 是阿里巴巴貢獻(xiàn)給社區(qū)的一個(gè)呼聲非常高的特性降狠,解決與外部系統(tǒng)交互時(shí)網(wǎng)絡(luò)延遲成為了系統(tǒng)瓶頸的問(wèn)題对竣。
異步查詢實(shí)際上是把維表的查詢操作托管給單獨(dú)的線程池完成,這樣不會(huì)因?yàn)槟骋粋€(gè)查詢?cè)斐勺枞衽洌瑔蝹€(gè)并行可以連續(xù)發(fā)送多個(gè)請(qǐng)求否纬,提高并發(fā)效率。
這種方式特別針對(duì)涉及網(wǎng)絡(luò) IO 的操作芥牌,減少因?yàn)檎?qǐng)求等待帶來(lái)的消耗烦味。
flink異步查詢官方文檔:
3.1 封裝線程池工具
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @author zhangbaohpu
* @date 2021/11/28 12:18
* @desc 線程池工具類
*
*/
public class ThreadPoolUtil {
private static ThreadPoolExecutor poolExecutor;
/**
* 獲取單例的線程池對(duì)象
* corePoolSize:指定了線程池中的線程數(shù)量,它的數(shù)量決定了添加的任務(wù)是開(kāi)辟新的線程去執(zhí)行壁拉,還是放到 workQueue任務(wù)隊(duì)列中去谬俄;
* maximumPoolSize:指定了線程池中的最大線程數(shù)量,這個(gè)參數(shù)會(huì)根據(jù)你使用的 workQueue 任務(wù)隊(duì)列的類型弃理,決定線程池會(huì)開(kāi)辟的最大線程數(shù)量溃论;
* keepAliveTime:當(dāng)線程池中空閑線程數(shù)量超過(guò) corePoolSize 時(shí),多余的線程會(huì)在多長(zhǎng)時(shí)間內(nèi)被銷毀痘昌;
* unit:keepAliveTime 的單位
* workQueue:任務(wù)隊(duì)列钥勋,被添加到線程池中,但尚未被執(zhí)行的任務(wù)
* @return
*/
public static ThreadPoolExecutor getPoolExecutor(){
if (poolExecutor == null){
synchronized (ThreadPoolUtil.class){
if (poolExecutor == null){
poolExecutor = new ThreadPoolExecutor(
4,20,300, TimeUnit.SECONDS,new LinkedBlockingDeque<>(Integer.MAX_VALUE)
);
}
}
}
return poolExecutor;
}
}
3.2 自定義維度接口
這個(gè)異步維表查詢的方法適用于各種維表的查詢辆苔,用什么條件查算灸,查出來(lái)的結(jié)果如何合并到數(shù)據(jù)流對(duì)象中,需要使用者自己定義驻啤。
這就是自己定義了一個(gè)接口 DimJoinFunction<T>包括兩個(gè)方法菲驴。
import com.alibaba.fastjson.JSONObject;
/**
* @author zhangbaohpu
* @date 2021/11/28 12:34
* @desc 維度關(guān)聯(lián)接口
*/
public interface DimJoinFunction<T> {
//根據(jù)流中獲取主鍵
String getKey(T obj);
//維度關(guān)聯(lián)
void join(T stream, JSONObject dimInfo);
}
3.3 封裝維度異步查詢類
新建包func下創(chuàng)建DimAsyncFunction.java,該類繼承異步方法類 RichAsyncFunction骑冗,實(shí)現(xiàn)自定義維度查詢接口赊瞬,其中 RichAsyncFunction<IN,OUT>是 Flink 提供的異步方法類先煎,此處因?yàn)槭遣樵儾僮鬏斎腩惡头祷仡愐恢拢允?lt;T,T>巧涧。
RichAsyncFunction 這個(gè)類要實(shí)現(xiàn)兩個(gè)方法:
open 用于初始化異步連接池薯蝎。
asyncInvoke 方法是核心方法,里面的操作必須是異步的谤绳,如果你查詢的數(shù)據(jù)庫(kù)有異步api 也可以用線程的異步方法占锯,如果沒(méi)有異步方法,就要自己利用線程池等方式實(shí)現(xiàn)異步查詢闷供。
import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.Lists;
import com.zhangbao.gmall.realtime.utils.DimUtil;
import com.zhangbao.gmall.realtime.utils.ThreadPoolUtil;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import java.util.Arrays;
import java.util.concurrent.ExecutorService;
/**
* @author zhangbaohpu
* @date 2021/11/28 12:24
* @desc 通用的維度關(guān)聯(lián)查詢接口
* 模板方法設(shè)計(jì)模式
* 在父類中只定義方法的聲明
* 具體實(shí)現(xiàn)由子類完成
*/
public abstract class DimAsyncFunction<T> extends RichAsyncFunction<T,T> implements DimJoinFunction<T> {
private String tableName;
private static ExecutorService executorPool;
public DimAsyncFunction(String tableName) {
this.tableName = tableName;
}
@Override
public void open(Configuration parameters) throws Exception {
//初始化線程池
executorPool = ThreadPoolUtil.getPoolExecutor();
}
@Override
public void asyncInvoke(T obj, ResultFuture<T> resultFuture) throws Exception {
executorPool.submit(new Runnable() {
@Override
public void run() {
try {
long start = System.currentTimeMillis();
String key = getKey(obj);
//獲取維度信息
JSONObject dimInfoJsonObj = DimUtil.getDimInfo(tableName, key);
//關(guān)聯(lián)維度
if (dimInfoJsonObj != null){
join(obj,dimInfoJsonObj);
}
long end = System.currentTimeMillis();
System.out.println("關(guān)聯(lián)維度數(shù)據(jù)烟央,耗時(shí):"+(end - start)+" 毫秒。");
resultFuture.complete(Arrays.asList(obj));
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException(tableName+"維度查詢失敗");
}
}
});
}
}
3.4 添加到主任務(wù)
將維度數(shù)據(jù)加入到訂單寬表任務(wù)中歪脏,在訂單寬表任務(wù)中OrderWideApp.java疑俭,完成對(duì)訂單明細(xì)的雙流join后,將用戶維度數(shù)據(jù)關(guān)聯(lián)到訂單寬表中婿失。
/**
* 關(guān)聯(lián)用戶維度數(shù)據(jù)
* flink異步查詢
* https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/datastream/operators/asyncio/#%e5%bc%82%e6%ad%a5-io-api
*/
SingleOutputStreamOperator<OrderWide> orderWideWithUserDs = AsyncDataStream.unorderedWait(orderWideDs, new DimAsyncFunction<OrderWide>("DIM_USER_INFO") {
@Override
public String getKey(OrderWide obj) {
return obj.getOrder_id().toString();
}
@Override
public void join(OrderWide orderWide, JSONObject dimInfo) {
Date birthday = dimInfo.getDate("BIRTHDAY");
Long age = DateUtil.betweenYear(birthday, new Date(), false);
orderWide.setUser_age(age.intValue());
orderWide.setUser_gender(dimInfo.getString("GENDER"));
}
}, 60, TimeUnit.SECONDS);
orderWideWithUserDs.print("order wide with users >>>");
3.5 測(cè)試
開(kāi)啟的服務(wù):zk钞艇,kf,redis豪硅,hdfs哩照,hbase,maxwell懒浮,BaseDbTask.java
注:要清除的數(shù)據(jù)
-
mysql配置表飘弧,之前手動(dòng)加的配置表刪除,通過(guò)腳本執(zhí)行要同步的表
/*Data for the table `table_process` */ INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('activity_info', 'insert', 'hbase', 'dim_activity_info', 'id,activity_name,activity_type,activity_desc,start_time,end_time,create_time', 'id', NULL); INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('activity_info', 'update', 'hbase', 'dim_activity_info', 'id,activity_name,activity_type,activity_desc,start_time,end_time,create_time', NULL, NULL); INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('activity_rule', 'insert', 'hbase', 'dim_activity_rule', 'id,activity_id,activity_type,condition_amount,condition_num,benefit_amount,benefit_discount,benefit_level', 'id', NULL); INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('activity_rule', 'update', 'hbase', 'dim_activity_rule', 'id,activity_id,activity_type,condition_amount,condition_num,benefit_amount,benefit_discount,benefit_level', NULL, NULL); INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('activity_sku', 'insert', 'hbase', 'dim_activity_sku', 'id,activity_id,sku_id,create_time', 'id', NULL); INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('activity_sku', 'update', 'hbase', 'dim_activity_sku', 'id,activity_id,sku_id,create_time', NULL, NULL); INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('base_category1', 'insert', 'hbase', 'dim_base_category1', 'id,name', 'id', NULL); INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('base_category1', 'update', 'hbase', 'dim_base_category1', 'id,name', NULL, NULL); INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('base_category2', 'insert', 'hbase', 'dim_base_category2', 'id,name,category1_id', 'id', NULL); INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('base_category2', 'update', 'hbase', 'dim_base_category2', 'id,name,category1_id', NULL, NULL); INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('base_category3', 'insert', 'hbase', 'dim_base_category3', 'id,name,category2_id', 'id', NULL); INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('base_category3', 'update', 'hbase', 'dim_base_category3', 'id,name,category2_id', NULL, NULL); INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('base_dic', 'insert', 'hbase', 'dim_base_dic', 'id,dic_name,parent_code,create_time,operate_time', 'id', NULL); INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('base_dic', 'update', 'hbase', 'dim_base_dic', 'id,dic_name,parent_code,create_time,operate_time', NULL, NULL); INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('base_province', 'insert', 'hbase', 'dim_base_province', 'id,name,region_id,area_code,iso_code,iso_3166_2', NULL, NULL); INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('base_province', 'update', 'hbase', 'dim_base_province', 'id,name,region_id,area_code,iso_code,iso_3166_2', NULL, NULL); INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('base_region', 'insert', 'hbase', 'dim_base_region', 'id,region_name', NULL, NULL); INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('base_region', 'update', 'hbase', 'dim_base_region', 'id,region_name', NULL, NULL); INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('base_trademark', 'insert', 'hbase', 'dim_base_trademark', 'id,tm_name', 'id', NULL); INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('base_trademark', 'update', 'hbase', 'dim_base_trademark', 'id,tm_name', NULL, NULL); INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('cart_info', 'insert', 'kafka', 'dwd_cart_info', 'id,user_id,sku_id,cart_price,sku_num,img_url,sku_name,is_checked,create_time,operate_time,is_ordered,order_time,source_type,source_id', 'id', NULL); INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('comment_info', 'insert', 'kafka', 'dwd_comment_info', 'id,user_id,nick_name,head_img,sku_id,spu_id,order_id,appraise,comment_txt,create_time,operate_time', 'id', NULL); INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('coupon_info', 'insert', 'hbase', 'dim_coupon_info', 'id,coupon_name,coupon_type,condition_amount,condition_num,activity_id,benefit_amount,benefit_discount,create_time,range_type,limit_num,taken_count,start_time,end_time,operate_time,expire_time,range_desc', 'id', NULL); INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('coupon_info', 'update', 'hbase', 'dim_coupon_info', 'id,coupon_name,coupon_type,condition_amount,condition_num,activity_id,benefit_amount,benefit_discount,create_time,range_type,limit_num,taken_count,start_time,end_time,operate_time,expire_time,range_desc', NULL, NULL); INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('coupon_range', 'insert', 'hbase', 'dim_coupon_range', 'id,coupon_id,range_type,range_id', 'id', NULL); INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('coupon_range', 'update', 'hbase', 'dim_coupon_range', 'id,coupon_id,range_type,range_id', NULL, NULL); INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('coupon_use', 'insert', 'kafka', 'dwd_coupon_use', 'id,coupon_id,user_id,order_id,coupon_status,get_type,get_time,using_time,used_time,expire_time', 'id', ' SALT_BUCKETS = 3'); INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('coupon_use', 'update', 'kafka', 'dwd_coupon_use', 'id,coupon_id,user_id,order_id,coupon_status,get_type,get_time,using_time,used_time,expire_time', NULL, NULL); INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('favor_info', 'insert', 'kafka', 'dwd_favor_info', 'id,user_id,sku_id,spu_id,is_cancel,create_time,cancel_time', 'id', NULL); INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('financial_sku_cost', 'insert', 'hbase', 'dim_financial_sku_cost', 'id,sku_id,sku_name,busi_date,is_lastest,sku_cost,create_time', 'id', NULL); INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('financial_sku_cost', 'update', 'hbase', 'dim_financial_sku_cost', 'id,sku_id,sku_name,busi_date,is_lastest,sku_cost,create_time', NULL, NULL); INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('order_detail', 'insert', 'kafka', 'dwd_order_detail', 'id,order_id,sku_id,sku_name,order_price,sku_num,create_time,source_type,source_id,split_activity_amount,split_coupon_amount,split_total_amount', 'id', NULL); INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('order_detail_activity', 'insert', 'kafka', 'dwd_order_detail_activity', 'id,order_id,order_detail_id,activity_id,activity_rule_id,sku_id,create_time', 'id', NULL); INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('order_detail_coupon', 'insert', 'kafka', 'dwd_order_detail_coupon', 'id,order_id,order_detail_id,coupon_id,coupon_use_id,sku_id,create_time', 'id', NULL); INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('order_info', 'insert', 'kafka', 'dwd_order_info', 'id,consignee,consignee_tel,total_amount,order_status,user_id,payment_way,delivery_address,order_comment,out_trade_no,trade_body,create_time,operate_time,expire_time,process_status,tracking_no,parent_order_id,img_url,province_id,activity_reduce_amount,coupon_reduce_amount,original_total_amount,feight_fee,feight_fee_reduce,refundable_time', 'id', NULL); INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('order_info', 'update', 'kafka', 'dwd_order_info_update', 'id,consignee,consignee_tel,total_amount,order_status,user_id,payment_way,delivery_address,order_comment,out_trade_no,trade_body,create_time,operate_time,expire_time,process_status,tracking_no,parent_order_id,img_url,province_id,activity_reduce_amount,coupon_reduce_amount,original_total_amount,feight_fee,feight_fee_reduce,refundable_time', NULL, NULL); INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('order_refund_info', 'insert', 'kafka', 'dwd_order_refund_info', 'id,user_id,order_id,sku_id,refund_type,refund_num,refund_amount,refund_reason_type,refund_reason_txt,refund_status,create_time', 'id', NULL); INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('payment_info', 'insert', 'kafka', 'dwd_payment_info', 'id,out_trade_no,order_id,user_id,payment_type,trade_no,total_amount,subject,payment_status,create_time,callback_time,callback_content', 'id', NULL); INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('payment_info', 'update', 'kafka', 'dwd_payment_info', 'id,out_trade_no,order_id,user_id,payment_type,trade_no,total_amount,subject,payment_status,create_time,callback_time,callback_content', NULL, NULL); INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('refund_payment', 'insert', 'kafka', 'dwd_refund_payment', 'id,out_trade_no,order_id,sku_id,payment_type,trade_no,total_amount,subject,refund_status,create_time,callback_time,callback_content', 'id', NULL); INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('refund_payment', 'update', 'kafka', 'dwd_refund_payment', 'id,out_trade_no,order_id,sku_id,payment_type,trade_no,total_amount,subject,refund_status,create_time,callback_time,callback_content', NULL, NULL); INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('sku_info', 'insert', 'hbase', 'dim_sku_info', 'id,spu_id,price,sku_name,sku_desc,weight,tm_id,category3_id,sku_default_img,is_sale,create_time', 'id', ' SALT_BUCKETS = 4'); INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('sku_info', 'update', 'hbase', 'dim_sku_info', 'id,spu_id,price,sku_name,sku_desc,weight,tm_id,category3_id,sku_default_img,is_sale,create_time', NULL, NULL); INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('spu_info', 'insert', 'hbase', 'dim_spu_info', 'id,spu_name,description,category3_id,tm_id', 'id', ' SALT_BUCKETS = 3'); INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('spu_info', 'update', 'hbase', 'dim_spu_info', 'id,spu_name,description,category3_id,tm_id', NULL, NULL); INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('user_info', 'insert', 'hbase', 'dim_user_info', 'id,login_name,name,user_level,birthday,gender,create_time,operate_time', 'id', ' SALT_BUCKETS = 3'); INSERT INTO `table_process`(`source_table`, `operate_type`, `sink_type`, `sink_table`, `sink_columns`, `sink_pk`, `sink_extend`) VALUES ('user_info', 'update', 'hbase', 'dim_user_info', 'id,login_name,name,user_level,birthday,gender,create_time,operate_time', NULL, NULL);
-
hbase數(shù)據(jù)清除砚著,重新建立維度表
!tables
:查看所有表drop table GMALL_REALTIME.BASE_TRADEMARK;
:刪除表 -
初始化維度數(shù)據(jù)
將用戶表的歷史全量同步到hbase中次伶,通過(guò)Maxwell的Bootstrap完成,Maxwell安裝及使用可查看之前的文章稽穆。
bin/maxwell-bootstrap --user maxwell --password 123456 --host hadoop101 --database gmall2021 --table user_info --client_id maxwell_1
啟動(dòng)模擬生成業(yè)務(wù)數(shù)據(jù)jar
經(jīng)過(guò)測(cè)試冠王,可以看到訂單寬表中用戶信息的年齡及性別分別都有值。
4. 其他維度關(guān)聯(lián)
4.1 關(guān)聯(lián)省份維度
關(guān)聯(lián)省份維度和關(guān)聯(lián)用戶維度處理邏輯一樣舌镶,這里就要以關(guān)聯(lián)用戶維度后的結(jié)果流為基礎(chǔ)柱彻,再去關(guān)聯(lián)省份
需要做的要先把省份的維度數(shù)據(jù)全同步到hbase练慕,還是通過(guò)Maxwell完成
bin/maxwell-bootstrap --user maxwell --password 123456 --host hadoop101 --database gmall2021 --table base_province --client_id maxwell_1
/**
* 關(guān)聯(lián)省份維度
* 以上一個(gè)流為基礎(chǔ)厕妖,關(guān)聯(lián)省份數(shù)據(jù)
*/
SingleOutputStreamOperator<OrderWide> orderWideWithProvinceDs = AsyncDataStream.unorderedWait(orderWideWithUserDs,
new DimAsyncFunction<OrderWide>("DIM_BASE_PROVINCE") {
@Override
public String getKey(OrderWide orderWide) {
return orderWide.getProvince_id().toString();
}
@Override
public void join(OrderWide orderWide, JSONObject dimInfo) {
orderWide.setProvince_name(dimInfo.getString("NAME"));
orderWide.setProvince_iso_code(dimInfo.getString("ISO_CODE"));
orderWide.setProvince_area_code(dimInfo.getString("AREA_CODE"));
orderWide.setProvince_3166_2_code(dimInfo.getString("ISO_3166_2"));
}
}, 60, TimeUnit.SECONDS);
orderWideWithProvinceDs.print("order wide with province>>>");
4.2 關(guān)聯(lián)sku維度
初始化sku維度數(shù)據(jù)
bin/maxwell-bootstrap --user maxwell --password 123456 --host hadoop101 --database gmall2021 --table sku_info --client_id maxwell_1
/**
* 關(guān)聯(lián)sku數(shù)據(jù)
*/
SingleOutputStreamOperator<OrderWide> orderWideWithSkuDs = AsyncDataStream.unorderedWait(orderWideWithProvinceDs,
new DimAsyncFunction<OrderWide>("DIM_SKU_INFO") {
@Override
public String getKey(OrderWide orderWide) {
return orderWide.getSku_id().toString();
}
@Override
public void join(OrderWide orderWide, JSONObject dimInfo) {
orderWide.setSku_name(dimInfo.getString("SKU_NAME"));
orderWide.setSpu_id(dimInfo.getLong("SPU_ID"));
orderWide.setCategory3_id(dimInfo.getLong("CATEGORY3_ID"));
orderWide.setTm_id(dimInfo.getLong("TM_ID"));
}
}, 60, TimeUnit.SECONDS);
4.3 關(guān)聯(lián)spu維度
初始化spu維度數(shù)據(jù)
bin/maxwell-bootstrap --user maxwell --password 123456 --host hadoop101 --database gmall2021 --table spu_info --client_id maxwell_1
/**
* 關(guān)聯(lián)spu數(shù)據(jù)
*/
SingleOutputStreamOperator<OrderWide> orderWideWithSpuDs = AsyncDataStream.unorderedWait(orderWideWithSkuDs, new DimAsyncFunction<OrderWide>("DIM_SPU_INFO") {
@Override
public String getKey(OrderWide orderWide) {
return orderWide.getSpu_id().toString();
}
@Override
public void join(OrderWide orderWide, JSONObject dimInfo) {
orderWide.setSpu_name(dimInfo.getString("SPU_NAME"));
}
}, 60, TimeUnit.SECONDS);
4.4 關(guān)聯(lián)品類維度
初始化品類維度數(shù)據(jù)
bin/maxwell-bootstrap --user maxwell --password 123456 --host hadoop101 --database gmall2021 --table base_category3 --client_id maxwell_1
/**
* 關(guān)聯(lián)品類數(shù)據(jù)
*/
SingleOutputStreamOperator<OrderWide> orderWideWithCategoryDs = AsyncDataStream.unorderedWait(orderWideWithSpuDs, new DimAsyncFunction<OrderWide>("DIM_BASE_CATEGORY3") {
@Override
public String getKey(OrderWide orderWide) {
return orderWide.getCategory3_id().toString();
}
@Override
public void join(OrderWide orderWide, JSONObject dimInfo) {
orderWide.setCategory3_name(dimInfo.getString("NAME"));
}
}, 60, TimeUnit.SECONDS);
4.5 關(guān)聯(lián)品牌維度
初始化品牌維度數(shù)據(jù)
bin/maxwell-bootstrap --user maxwell --password 123456 --host hadoop101 --database gmall2021 --table base_trademark --client_id maxwell_1
/**
* 關(guān)聯(lián)品牌數(shù)據(jù)
*/
SingleOutputStreamOperator<OrderWide> orderWideWithTmDs = AsyncDataStream.unorderedWait(orderWideWithCategoryDs, new DimAsyncFunction<OrderWide>("DIM_BASE_TRADEMARK") {
@Override
public String getKey(OrderWide orderWide) {
return orderWide.getTm_id().toString();
}
@Override
public void join(OrderWide orderWide, JSONObject dimInfo) {
orderWide.setTm_name(dimInfo.getString("TM_NAME"));
}
}, 60, TimeUnit.SECONDS);
orderWideWithTmDs.print("order wide with sku_spu_category_tm >>> ");
5. 訂單寬表寫入kafka
/**
* 將關(guān)聯(lián)后的訂單寬表數(shù)據(jù)發(fā)送到kafka的dwm層
*/
orderWideWithTmDs.map(orderWide -> JSONObject.toJSONString(orderWide))
.addSink(MyKafkaUtil.getKafkaSink(orderWideTopic));
項(xiàng)目地址:https://github.com/zhangbaohpu/gmall-flink-parent
更多請(qǐng)?jiān)谀彻?hào)平臺(tái)搜索:選手一號(hào)位仆潮,本文編號(hào):1010跳芳,回復(fù)即可獲取。