感謝您的關(guān)注 + 點(diǎn)贊 + 再看灭返,對(duì)博主的肯定庄蹋,會(huì)督促博主持續(xù)的輸出更多的優(yōu)質(zhì)實(shí)戰(zhàn)內(nèi)容=芗恕!埋涧!
1.序篇-本文結(jié)構(gòu)
- 背景篇-為啥需要 redis 維表
- 目標(biāo)篇-做 redis 維表的預(yù)期效果是什么
- 難點(diǎn)剖析篇-此框架建設(shè)的難點(diǎn)、目前有哪些實(shí)現(xiàn)
- 維表實(shí)現(xiàn)篇-維表實(shí)現(xiàn)的過(guò)程
- 總結(jié)與展望篇
本文主要介紹了 flink sql redis 維表的實(shí)現(xiàn)過(guò)程奇瘦。
如果想在本地測(cè)試下:
- 在公眾號(hào)后臺(tái)回復(fù)flink sql 知其所以然(二)| sql 自定義 redis 數(shù)據(jù)維表獲取源碼(源碼基于 1.13.1 實(shí)現(xiàn))
- 在你的本地安裝并打開 redis-server棘催,然后使用 redis-cli 執(zhí)行命令
set a "{\"score\":3,\"name\":\"namehhh\",\"name1\":\"namehhh112\"}"
- 執(zhí)行源碼包中的
flink.examples.sql._03.source_sink.RedisLookupTest
測(cè)試類,就可以在 console 中看到結(jié)果耳标。
如果想直接在集群環(huán)境使用:
- 命令行執(zhí)行
mvn package -DskipTests=true
打包 - 將生成的包
flink-examples-0.0.1-SNAPSHOT.jar
引入 flink lib 中即可醇坝,無(wú)需其它設(shè)置。
2.背景篇-為啥需要 redis 維表
2.1.啥是維表?事實(shí)表呼猪?
Dimension Table 概念多出現(xiàn)于數(shù)據(jù)倉(cāng)庫(kù)里面画畅,維表與事實(shí)表相互對(duì)應(yīng)。
給兩個(gè)場(chǎng)景來(lái)看看:
比如需要統(tǒng)計(jì)分性別的 DAU:
- 客戶端上報(bào)的日志中(事實(shí)表)只有設(shè)備 id宋距,只用這個(gè)事實(shí)表是沒(méi)法統(tǒng)計(jì)出分性別的 DAU 的轴踱。
- 這時(shí)候就需要一張帶有設(shè)備 id、性別映射的表(這就是維表)來(lái)提供性別數(shù)據(jù)谚赎。
- 然后使用事實(shí)表去 join 這張維表去獲取到每一個(gè)設(shè)備 id 對(duì)應(yīng)的性別淫僻,然后就可以統(tǒng)計(jì)出分性別的 DAU。相當(dāng)于一個(gè)擴(kuò)充維度的操作壶唤。
https://blog.csdn.net/weixin_47482194/article/details/105855116?spm=1001.2014.3001.5501
比如目前想要統(tǒng)計(jì)整體銷售額:
- 目前已有 “銷售統(tǒng)計(jì)表”雳灵,是一個(gè)事實(shí)表,其中沒(méi)有具體銷售品項(xiàng)的金額闸盔。
- “商品價(jià)格表” 可以用于提供具體銷售品項(xiàng)的金額悯辙,這就是銷售統(tǒng)計(jì)的一個(gè)維度表。
事實(shí)數(shù)據(jù)和維度數(shù)據(jù)的識(shí)別必須依據(jù)具體的主題問(wèn)題而定迎吵《阕“事實(shí)表” 用來(lái)存儲(chǔ)事實(shí)的度量及指向各個(gè)維的外鍵值。維表用來(lái)保存該維的元數(shù)據(jù)钓觉。
參考:https://blog.csdn.net/lindan1984/article/details/96566626
2.2.為啥需要 redis 維表茴肥?
目前在實(shí)時(shí)計(jì)算的場(chǎng)景中,熟悉 datastream 的同學(xué)大多數(shù)都使用過(guò) mysql\Hbase\redis 作為維表引擎存儲(chǔ)一些維度數(shù)據(jù)荡灾,然后在 datastream api 中調(diào)用 mysql\Hbase\redis 客戶端去獲取到維度數(shù)據(jù)進(jìn)行維度擴(kuò)充瓤狐。
而 redis 作為 flink 實(shí)時(shí)場(chǎng)景中最常用的高速維表引擎,官方是沒(méi)有提供 flink sql api 的 redis 維表 connector 的批幌。如下圖础锐,基于 1.13 版本。
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/overview/
阿里云 flink 是提供了這個(gè)能力的荧缘。但是這個(gè)需要使用阿里云的產(chǎn)品才能使用皆警。有錢人可以直接上苏研。
https://www.alibabacloud.com/help/zh/faq-detail/122722.htm?spm=a2c63.q38357.a3.7.a1227a53TBMuSY
因此本文在介紹怎樣自定義一個(gè) sql 數(shù)據(jù)維表的同時(shí)昂利,實(shí)現(xiàn)一個(gè) sql redis 來(lái)給大家使用。
3.目標(biāo)篇-做 redis 維表預(yù)期效果是什么
redis 作為維表在 datastream 中的最常用的數(shù)據(jù)結(jié)構(gòu)就是 kv张弛、hmap 兩種绸罗。本文實(shí)現(xiàn)主要實(shí)現(xiàn) kv 結(jié)構(gòu)意推,map 結(jié)構(gòu)大家可以拿到源碼之后進(jìn)行自定義實(shí)現(xiàn)。也就多加幾行代碼就完事了珊蟀。
預(yù)期效果就如阿里云的 flink redis:
下面是我在本地跑的結(jié)果菊值,先看看 redis 中存儲(chǔ)的數(shù)據(jù),只有這一條數(shù)據(jù),是 json 字符串:
下面是預(yù)期 flink sql:
CREATE TABLE dimTable (
name STRING,
name1 STRING,
score BIGINT -- redis 中存儲(chǔ)數(shù)據(jù)的 schema
) WITH (
'connector' = 'redis', -- 指定 connector 是 redis 類型的
'hostname' = '127.0.0.1', -- redis server ip
'port' = '6379', -- redis server 端口
'format' = 'json' -- 指定 format 解析格式
'lookup.cache.max-rows' = '500', -- guava local cache 最大條目
'lookup.cache.ttl' = '3600', -- guava local cache ttl
'lookup.max-retries' = '1' -- redis 命令執(zhí)行失敗后重復(fù)次數(shù)
)
SELECT o.f0, o.f1, c.name, c.name1, c.score
FROM leftTable AS o
-- 維表 join
LEFT JOIN dimTable FOR SYSTEM_TIME AS OF o.proctime AS c
ON o.f0 = c.name
結(jié)果如下腻窒,后面三列就對(duì)應(yīng)到 c.name, c.name1, c.score
:
+I[a, b, namehhh, namehhh112, 3]
+I[a, b, namehhh, namehhh112, 3]
+I[a, b, namehhh, namehhh112, 3]
+I[a, b, namehhh, namehhh112, 3]
+I[a, b, namehhh, namehhh112, 3]
+I[a, b, namehhh, namehhh112, 3]
+I[a, b, namehhh, namehhh112, 3]
+I[a, b, namehhh, namehhh112, 3]
+I[a, b, namehhh, namehhh112, 3]
4.難點(diǎn)剖析篇-目前有哪些實(shí)現(xiàn)
目前可以從網(wǎng)上搜到的實(shí)現(xiàn)昵宇、以及可以參考的實(shí)現(xiàn)有以下兩個(gè):
- https://github.com/jeff-zou/flink-connector-redis。 但是其沒(méi)有實(shí)現(xiàn) flink sql redis 維表儿子,只實(shí)現(xiàn)了 sink 表瓦哎,并且使用起來(lái)有比較多的限制,包括需要在建表時(shí)就指定 key-column典徊,value-column 等杭煎,其實(shí)博主覺(jué)得沒(méi)必要指定這些字段,這些都可以動(dòng)態(tài)調(diào)整卒落。其實(shí)現(xiàn)是對(duì) apache-bahir-flink https://github.com/apache/bahir-flink 的二次開發(fā)羡铲,但與 bahir 原生實(shí)現(xiàn)有割裂感,因?yàn)檫@個(gè)項(xiàng)目幾乎重新實(shí)現(xiàn)了一遍儡毕,接口也和 bahir 不同也切。
- 阿里云實(shí)現(xiàn) https://www.alibabacloud.com/help/zh/faq-detail/122722.htm?spm=a2c63.q38357.a3.7.a1227a53TBMuSY。 可以參考的只有用法和配置等腰湾。但是有些配置項(xiàng)也屬于阿里自定義的雷恃。
因此博主在實(shí)現(xiàn)時(shí),就定了一個(gè)基調(diào)费坊。
- 復(fù)用 connector:復(fù)用 bahir 提供的 redis connnector
- 復(fù)用 format:復(fù)用 flink 目前的 format 機(jī)制倒槐,目前這個(gè)上述兩個(gè)實(shí)現(xiàn)都沒(méi)有做到
- 簡(jiǎn)潔性:實(shí)現(xiàn) kv 結(jié)構(gòu)。hget 封裝一部分
- 維表 local cache:為避免高頻率訪問(wèn) redis附井,維表加了 local cache 作為緩存
5.維表實(shí)現(xiàn)篇-維表實(shí)現(xiàn)的過(guò)程
在實(shí)現(xiàn) redis 維表之前讨越,不得不談?wù)?flink 維表加載和使用機(jī)制。
5.1.flink 維表原理
其實(shí)上節(jié)已經(jīng)詳細(xì)描述了 flink sql 對(duì)于 source\sink 的加載機(jī)制永毅,維表屬于 source 的中的 lookup 表把跨,在具體 flink 程序運(yùn)行的過(guò)程之中可以簡(jiǎn)單的理解為一個(gè) map,在 map 中調(diào)用 redis-client 接口訪問(wèn) redis 進(jìn)行擴(kuò)充維度的過(guò)程沼死。
- 通過(guò) SPI 機(jī)制加載所有的 source\sink\format 工廠
Factory
- 過(guò)濾出 DynamicTableSourceFactory + connector 標(biāo)識(shí)的 source 工廠類
- 通過(guò) source 工廠類創(chuàng)建出對(duì)應(yīng)的 source
如圖 source 和 sink 是通過(guò) FactoryUtil.createTableSource
和 FactoryUtil.createTableSink
創(chuàng)建的
所有通過(guò) SPI 的 source\sink\formt 插件都繼承自 Factory
着逐。
整體創(chuàng)建 source 方法的調(diào)用鏈如下圖。
5.2.flink 維表實(shí)現(xiàn)方案
先看下博主的最終實(shí)現(xiàn)意蛀。
總重要的三個(gè)實(shí)現(xiàn)類:
RedisDynamicTableFactory
RedisDynamicTableSource
RedisRowDataLookupFunction
具體流程:
- 定義 SPI 的工廠類
RedisDynamicTableFactory implements DynamicTableSourceFactory
耸别,并且在 resource\META-INF 下創(chuàng)建 SPI 的插件文件 - 實(shí)現(xiàn) factoryIdentifier 標(biāo)識(shí)
redis
- 實(shí)現(xiàn)
RedisDynamicTableFactory#createDynamicTableSource
來(lái)創(chuàng)建對(duì)應(yīng)的 sourceRedisDynamicTableSource
- 定義
RedisDynamicTableSource implements LookupTableSource
- 實(shí)現(xiàn)
RedisDynamicTableFactory#getLookupRuntimeProvider
方法,創(chuàng)建具體的維表 UDFTableFunction<T>
县钥,定義為RedisRowDataLookupFunction
- 實(shí)現(xiàn)
RedisRowDataLookupFunction
的 eval 方法太雨,這個(gè)方法就是用于訪問(wèn) redis 擴(kuò)充維度的。
介紹完流程魁蒜,進(jìn)入具體實(shí)現(xiàn)方案細(xì)節(jié):
RedisDynamicTableFactory
主要?jiǎng)?chuàng)建 source 的邏輯:
public class RedisDynamicTableFactory implements DynamicTableSourceFactory {
...
@Override
public String factoryIdentifier() {
// 標(biāo)識(shí) redis
return "redis";
}
@Override
public DynamicTableSource createDynamicTableSource(Context context) {
// either implement your custom validation logic here ...
// or use the provided helper utility
final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
// discover a suitable decoding format
// format 實(shí)現(xiàn)
final DecodingFormat<DeserializationSchema<RowData>> decodingFormat = helper.discoverDecodingFormat(
DeserializationFormatFactory.class,
FactoryUtil.FORMAT);
// validate all options
// 所有 option 配置的校驗(yàn),比如 cache 類參數(shù)
helper.validate();
// get the validated options
final ReadableConfig options = helper.getOptions();
final RedisLookupOptions redisLookupOptions = RedisOptions.getRedisLookupOptions(options);
TableSchema schema = context.getCatalogTable().getSchema();
// 創(chuàng)建 RedisDynamicTableSource
return new RedisDynamicTableSource(
schema.toPhysicalRowDataType()
, decodingFormat
, redisLookupOptions);
}
}
resources\META-INF 文件:
RedisDynamicTableSource
主要?jiǎng)?chuàng)建 table udf 的邏輯:
public class RedisDynamicTableSource implements LookupTableSource {
...
@Override
public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
// 初始化 redis 客戶端配置
FlinkJedisConfigBase flinkJedisConfigBase = new FlinkJedisPoolConfig.Builder()
.setHost(this.redisLookupOptions.getHostname())
.setPort(this.redisLookupOptions.getPort())
.build();
// redis key,value 序列化器
LookupRedisMapper lookupRedisMapper = new LookupRedisMapper(
this.createDeserialization(context, this.decodingFormat, createValueFormatProjection(this.physicalDataType)));
// 創(chuàng)建 table udf
return TableFunctionProvider.of(new RedisRowDataLookupFunction(
flinkJedisConfigBase
, lookupRedisMapper
, this.redisLookupOptions));
}
}
RedisRowDataLookupFunction
table udf 執(zhí)行維表關(guān)聯(lián)的主要流程:
public class RedisRowDataLookupFunction extends TableFunction<RowData> {
...
/**
* 具體 redis 執(zhí)行方法
*/
public void eval(Object... objects) throws IOException {
for (int retry = 0; retry <= maxRetryTimes; retry++) {
try {
// fetch result
this.evaler.accept(objects);
break;
} catch (Exception e) {
LOG.error(String.format("HBase lookup error, retry times = %d", retry), e);
if (retry >= maxRetryTimes) {
throw new RuntimeException("Execution of Redis lookup failed.", e);
}
try {
Thread.sleep(1000 * retry);
} catch (InterruptedException e1) {
throw new RuntimeException(e1);
}
}
}
}
@Override
public void open(FunctionContext context) {
LOG.info("start open ...");
// redis 命令執(zhí)行器兜看,初始化 redis 鏈接
try {
this.redisCommandsContainer =
RedisCommandsContainerBuilder
.build(this.flinkJedisConfigBase);
this.redisCommandsContainer.open();
} catch (Exception e) {
LOG.error("Redis has not been properly initialized: ", e);
throw new RuntimeException(e);
}
// 初始化 local cache
this.cache = cacheMaxSize <= 0 || cacheExpireMs <= 0 ? null : CacheBuilder.newBuilder()
.recordStats()
.expireAfterWrite(cacheExpireMs, TimeUnit.MILLISECONDS)
.maximumSize(cacheMaxSize)
.build();
if (cache != null) {
context.getMetricGroup()
.gauge("lookupCacheHitRate", (Gauge<Double>) () -> cache.stats().hitRate());
this.evaler = in -> {
RowData cacheRowData = cache.getIfPresent(in);
if (cacheRowData != null) {
collect(cacheRowData);
} else {
// fetch result
byte[] key = lookupRedisMapper.serialize(in);
byte[] value = null;
switch (redisCommand) {
case GET:
value = this.redisCommandsContainer.get(key);
break;
case HGET:
value = this.redisCommandsContainer.hget(key, this.additionalKey.getBytes());
break;
default:
throw new IllegalArgumentException("Cannot process such data type: " + redisCommand);
}
RowData rowData = this.lookupRedisMapper.deserialize(value);
collect(rowData);
cache.put(key, rowData);
}
};
}
...
}
}
5.2.1.復(fù)用 bahir connector
如圖是 bahir redis connector 的實(shí)現(xiàn)锥咸。
博主在實(shí)現(xiàn)過(guò)程中將能復(fù)用的都盡力復(fù)用。如圖是最終實(shí)現(xiàn)目錄细移。
可以看到目錄結(jié)構(gòu)是與 bahir redis connector 一致的搏予。
其中 redis 客戶端及其配置
是直接復(fù)用了 bahir redis 的。由于 bahir redis 基本都是 sink 實(shí)現(xiàn)弧轧,某些實(shí)現(xiàn)沒(méi)法繼承復(fù)用雪侥,所以這里我單獨(dú)開辟了目錄,redis 命令執(zhí)行器
和 redis 命令定義器
精绎,但是也基本和 bahir 一致速缨。
如果你想要在生產(chǎn)環(huán)境中進(jìn)行使用,可以直接將兩部分代碼合并代乃,成本很低旬牲。
5.2.2.復(fù)用 format
博主直接復(fù)用了 flink 本身自帶的 format 機(jī)制來(lái)作為維表反序列化機(jī)制。參考 HBase connector 實(shí)現(xiàn)將 cache 命中率添加到 metric 中搁吓。
public class RedisDynamicTableFactory implements DynamicTableSourceFactory {
...
@Override
public DynamicTableSource createDynamicTableSource(Context context) {
...
// discover a suitable decoding format
// 復(fù)用 format 實(shí)現(xiàn)
final DecodingFormat<DeserializationSchema<RowData>> decodingFormat = helper.discoverDecodingFormat(
DeserializationFormatFactory.class,
FactoryUtil.FORMAT);
...
}
}
format 同樣也是 SPI 機(jī)制加載原茅。
源碼公眾號(hào)后臺(tái)回復(fù)flink sql 知其所以然(二)| sql 自定義 redis 數(shù)據(jù)維表獲取。
5.2.3.維表 local cache
local cache 在初始化時(shí)可以指定 cache 大小堕仔,緩存時(shí)長(zhǎng)等擂橘。
this.evaler = in -> {
RowData cacheRowData = cache.getIfPresent(in);
if (cacheRowData != null) {
collect(cacheRowData);
} else {
// fetch result
byte[] key = lookupRedisMapper.serialize(in);
byte[] value = null;
switch (redisCommand) {
case GET:
value = this.redisCommandsContainer.get(key);
break;
case HGET:
value = this.redisCommandsContainer.hget(key, this.additionalKey.getBytes());
break;
default:
throw new IllegalArgumentException("Cannot process such data type: " + redisCommand);
}
RowData rowData = this.lookupRedisMapper.deserialize(value);
collect(rowData);
cache.put(key, rowData);
}
};
6.總結(jié)與展望篇
6.1.總結(jié)
本文主要是針對(duì) flink sql redis 維表進(jìn)行了擴(kuò)展以及實(shí)現(xiàn),并且復(fù)用 bahir redis connector 的配置摩骨,具有良好的擴(kuò)展性通贞。
如果你正好需要這么一個(gè) connector,直接公眾號(hào)后臺(tái)回復(fù)flink sql 知其所以然(二)| sql 自定義 redis 數(shù)據(jù)維表獲取源碼吧仿吞。
6.2.展望
當(dāng)然上述只是 redis 維表一個(gè)基礎(chǔ)的實(shí)現(xiàn)滑频,用于生產(chǎn)環(huán)境還有很多方面可以去擴(kuò)展的。
- jedis cluster 的擴(kuò)展:目前 bahir datastream 中已經(jīng)實(shí)現(xiàn)了唤冈,可以直接參考峡迷,擴(kuò)展起來(lái)非常簡(jiǎn)單
- aync lookup 維表的擴(kuò)展:目前 hbase lookup 表已經(jīng)實(shí)現(xiàn)了,可以直接參考實(shí)現(xiàn)
- 異常 AOP你虹,alert 等