維表Join嘗鮮
什么是維表
維表菇爪,維度表的簡(jiǎn)稱唠梨,來(lái)源于數(shù)據(jù)倉(cāng)庫(kù)袋励,一般用來(lái)給事實(shí)
數(shù)據(jù)補(bǔ)充信息。假設(shè)現(xiàn)在有一張銷售記錄表当叭。銷售記錄表里面的一條銷售記錄就是一條事實(shí)數(shù)據(jù)
茬故,而這條銷售記錄中的地區(qū)
字段就是一個(gè)維度。通常銷售記錄表里面的地區(qū)
字段是地區(qū)表的主鍵蚁鳖,地區(qū)表就是一張維表磺芭。更多的細(xì)節(jié)可以面向百度/谷歌編程
。
為什么Flink中需要維表
以流計(jì)算為例醉箕,一般情況下钾腺,消費(fèi)的消息中間件中的消息,是事實(shí)表
中的數(shù)據(jù)讥裤,我們需要把數(shù)據(jù)補(bǔ)全垮庐,不然誰(shuí)也不知道字段地區(qū)
對(duì)應(yīng)的值01、02
是個(gè)什么東西坞琴。所以,我們通常會(huì)在計(jì)算過(guò)程中逗抑,通過(guò)Join維表來(lái)補(bǔ)全數(shù)據(jù)剧辐。
Flink如何Join維表
1.9之前版本
- 如果我們使用的是DataStream,那直接通過(guò)Alibaba貢獻(xiàn)給社區(qū)的異步I/O邮府,完成維表的連接
- 如果是Flink Sql荧关,那么需要先通過(guò)
calcite
將sql翻譯成DataStream,再通過(guò)異步I/O褂傀,完成維表關(guān)聯(lián) - 如果直接通過(guò)
FlatMap
之類的算子忍啤,在open()
中連接第三方存儲(chǔ),在flatMap()
中完成查詢,那么速度會(huì)非常之慢同波,查詢需要等待返回后再將數(shù)據(jù)下發(fā)鳄梅。期間如果數(shù)據(jù)量過(guò)大,很容易將第三方存儲(chǔ)比如mysql
服務(wù)打爆未檩,因?yàn)槊織l數(shù)據(jù)都需要去查一遍mysql
戴尸。所以這種方式非常不推薦。
1.9及之后
- 直接在DDL中定義數(shù)據(jù)維表冤狡,這里貼一下官網(wǎng)的定義方式:
CREATE TABLE MyUserTable (
...
) WITH (
'connector.type' = 'jdbc', -- required: specify this table type is jdbc
'connector.url' = 'jdbc:mysql://localhost:3306/flink-test', -- required: JDBC DB url
'connector.table' = 'jdbc_table_name', -- required: jdbc table name
'connector.driver' = 'com.mysql.jdbc.Driver', -- optional: the class name of the JDBC driver to use to connect to this URL.
-- If not set, it will automatically be derived from the URL.
'connector.username' = 'name', -- optional: jdbc user name and password
'connector.password' = 'password',
-- lookup options, optional, used in temporary join
'connector.lookup.cache.max-rows' = '5000', -- optional, max number of rows of lookup cache, over this value, the oldest rows will
-- be eliminated. "cache.max-rows" and "cache.ttl" options must all be specified if any
-- of them is specified. Cache is not enabled as default.
'connector.lookup.cache.ttl' = '10s', -- optional, the max time to live for each rows in lookup cache, over this time, the oldest rows
-- will be expired. "cache.max-rows" and "cache.ttl" options must all be specified if any of
-- them is specified. Cache is not enabled as default.
'connector.lookup.max-retries' = '3', -- optional, max retry times if lookup database failed
)
這里特別說(shuō)明一下孙蒙,connector.lookup.cache.max-rows
和 connector.lookup.cache.ttl
必須同時(shí)出現(xiàn)。這里指的是配置緩存和緩存時(shí)間悲雳。
- 依舊存在幾個(gè)小問(wèn)題
- 指定緩存后挎峦,Join維表時(shí)拿到的可能是舊數(shù)據(jù)。所以合瓢,如果想要拿到盡可能新的數(shù)據(jù)坦胶,緩存時(shí)間又需要設(shè)置的較短或者沒(méi)有,那又失去了緩存的意義歪玲;緩存條數(shù)如果過(guò)大會(huì)將內(nèi)存撐爆迁央,而且內(nèi)存中的數(shù)據(jù)相對(duì)第三方存儲(chǔ)中的數(shù)據(jù)來(lái)說(shuō),可能又是舊數(shù)據(jù)滥崩,而太小又得去頻繁查詢數(shù)據(jù)庫(kù)岖圈,也就失去了緩存的意義。所以钙皮,緩存的大小和時(shí)間需要一個(gè)很好的平衡蜂科。不過(guò)通常來(lái)說(shuō),維表中的數(shù)據(jù)應(yīng)該是很少變化的短条,同樣导匣,如果緩存較多的條數(shù),那在啟動(dòng)任務(wù)時(shí)相應(yīng)的提高TM的內(nèi)存茸时,也能夠緩解撐爆內(nèi)存的情形贡定。
- 目前Flink Sql原生支持的維表關(guān)聯(lián)只支持同步模式,如果需要異步模式或者想用其他的第三方存儲(chǔ)只能夠自己去實(shí)現(xiàn)可都。當(dāng)然缓待,我們之后也會(huì)有真正的異步維表Join案例分享,大家敬請(qǐng)期待渠牲。
接下來(lái)旋炒,我們通過(guò)案例來(lái)了解如何使用維表Join
Flink Sql 1.10 維表Join
在正式開(kāi)始之前,先做好準(zhǔn)備工作签杈,之前通過(guò)docker
安裝的mysql
存在一點(diǎn)小問(wèn)題瘫镇,我們往表中插入中文字符時(shí)會(huì)出現(xiàn)亂碼,我們需要去解決一下。
首先铣除,進(jìn)入我們的mysql
容器里面docker exec -it 9f7da4663dc1 '/bin/bash'
谚咬,然后執(zhí)行echo "character-set-server=utf8" >> /etc/mysql/mysql.conf.d/mysqld.cnf
。執(zhí)行完畢后一路ctrl+D
退到最外面通孽,通過(guò)docker restart 容器id
將mysql
服務(wù)重啟序宦。重新連接上mysql
,執(zhí)行show variables like '%character%';
背苦,然后確定character_set_server
是否為utf-8互捌。
接著我們?cè)?code>mysql中創(chuàng)建一張維表
CREATE TABLE `dim_behavior` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`c_name` varchar(255) COLLATE utf8_bin DEFAULT NULL,
`e_name` varchar(255) COLLATE utf8_bin DEFAULT NULL,
PRIMARY KEY (`id`)
)
再插入兩條數(shù)據(jù)
INSERT INTO `test`.`dim_behavior`(`id`, `c_name`, `e_name`) VALUES (1, '瀏覽', 'pv');
INSERT INTO `test`.`dim_behavior`(`id`, `c_name`, `e_name`) VALUES (2, '購(gòu)買', 'buy');
準(zhǔn)備工作完畢,下面我們直接看代碼
package FlinkSql;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
public class FlinkSql03 {
public static final String CSV_TABLE_SOURCE_DDL = "" +
"CREATE TABLE csv_source (\n" +
" user_id bigint,\n" +
" item_id bigint,\n" +
" category_id bigint,\n" +
" behavior varchar,\n" +
" ts bigint,\n" +
" proctime as PROCTIME() \n"+
") WITH (\n" +
" 'connector.type' = 'filesystem', -- 指定連接類型\n" +
" 'connector.path' = 'C:\\Users\\tzmaj\\Desktop\\教程\\3\\UserBehavior.csv',-- 目錄 \n" +
" 'format.type' = 'csv', -- 文件格式 \n" +
" 'format.field-delimiter' = ',' ,-- 字段分隔符 \n" +
" 'format.fields.0.name' = 'user_id',-- 第N字段名行剂,相當(dāng)于表的schema秕噪,索引從0開(kāi)始 \n" +
" 'format.fields.0.data-type' = 'bigint',-- 字段類型\n" +
" 'format.fields.1.name' = 'item_id', \n" +
" 'format.fields.1.data-type' = 'bigint',\n" +
" 'format.fields.2.name' = 'category_id',\n" +
" 'format.fields.2.data-type' = 'bigint',\n" +
" 'format.fields.3.name' = 'behavior', \n" +
" 'format.fields.3.data-type' = 'String',\n" +
" 'format.fields.4.name' = 'ts', \n" +
" 'format.fields.4.data-type' = 'bigint'\n" +
") ";
public static final String MYSQL_TABLE_DIM_DDL = ""+
"CREATE TABLE `dim_behavior` (\n" +
" `id` int ,\n" +
" `c_name` varchar ,\n" +
" `e_name` varchar \n" +
")WITH (\n" +
" 'connector.type' = 'jdbc', -- 連接方式\n" +
" 'connector.url' = 'jdbc:mysql://localhost:3306/test', -- jdbc的url\n" +
" 'connector.table' = 'dim_behavior', -- 表名\n" +
" 'connector.driver' = 'com.mysql.jdbc.Driver', -- 驅(qū)動(dòng)名字,可以不填厚宰,會(huì)自動(dòng)從上面的jdbc url解析 \n" +
" 'connector.username' = 'root', -- 顧名思義 用戶名\n" +
" 'connector.password' = '123456' , -- 密碼\n" +
" 'connector.lookup.cache.max-rows' = '5000', -- 緩存條數(shù) \n"+
" 'connector.lookup.cache.ttl' = '10s' -- 緩存時(shí)間 \n"+
")";
public static final String MYSQL_TABLE_SINK_DDL=""+
"CREATE TABLE `result_1` (\n" +
" `behavior` varchar ,\n" +
" `count_unique_user` bigint, \n" +
" `e_name` varchar \n" +
")WITH (\n" +
" 'connector.type' = 'jdbc', -- 連接方式\n" +
" 'connector.url' = 'jdbc:mysql://localhost:3306/test', -- jdbc的url\n" +
" 'connector.table' = 'result_1', -- 表名\n" +
" 'connector.driver' = 'com.mysql.jdbc.Driver', -- 驅(qū)動(dòng)名字腌巾,可以不填,會(huì)自動(dòng)從上面的jdbc url解析 \n" +
" 'connector.username' = 'root', -- 顧名思義 用戶名\n" +
" 'connector.password' = '123456' , -- 密碼\n" +
" 'connector.write.flush.max-rows' = '5000', -- 意思是攢滿多少條才觸發(fā)寫入 \n" +
" 'connector.write.flush.interval' = '1' -- 意思是攢滿多少秒才觸發(fā)寫入铲觉;這2個(gè)參數(shù)澈蝙,無(wú)論數(shù)據(jù)滿足哪個(gè)條件,就會(huì)觸發(fā)寫入\n" +
// " 'update-mode' = 'upsert' -- 指定為插入更新模式 \n"+
")";
public static void main(String[] args) throws Exception {
//構(gòu)建StreamExecutionEnvironment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//構(gòu)建EnvironmentSettings 并指定Blink Planner
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
//構(gòu)建StreamTableEnvironment
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, bsSettings);
//注冊(cè)csv文件數(shù)據(jù)源表
tEnv.sqlUpdate(CSV_TABLE_SOURCE_DDL);
//注冊(cè)mysql數(shù)據(jù)維表
tEnv.sqlUpdate(MYSQL_TABLE_DIM_DDL);
//注冊(cè)mysql數(shù)據(jù)結(jié)果表
tEnv.sqlUpdate(MYSQL_TABLE_SINK_DDL);
//計(jì)算每種類型的行為有多少用戶
Table group = tEnv.sqlQuery("select behavior,count(distinct user_id) count_unique_user from csv_source group by behavior ");
//轉(zhuǎn)回datastream撵幽,因?yàn)樾枰黾觩roctime灯荧,而目前定義proctime方式只有兩種,一種是在定義DDL的時(shí)候盐杂,一種是在DataStream轉(zhuǎn) Table的時(shí)候
//轉(zhuǎn)撤回流是因?yàn)樯厦娴膕ql用了group by逗载,所以只能使用撤回流
DataStream<Row> ds = tEnv.toRetractStream(group, Row.class).flatMap(
new FlatMapFunction<Tuple2<Boolean, Row>, Row>() {
@Override
public void flatMap(Tuple2<Boolean, Row> value, Collector<Row> collect) throws Exception {
collect.collect(value.f1);
}
}
).returns(Types.ROW(Types.STRING,Types.LONG));
//給Table增加proctime字段,ts可以隨便改成別的你喜歡的名字
Table table = tEnv.fromDataStream(ds, "behavior,count_unique_user,ts.proctime");
//建立視圖链烈,保留臨時(shí)表
tEnv.createTemporaryView("group_by_view",table);
//pv厉斟,buy,cart...等行為對(duì)應(yīng)的英文名强衡,我們通過(guò)維表Join的方式擦秽,替換為中文名
//FOR SYSTEM_TIME AS OF a.ts AS b 這是固定寫法,ts與上面指定dataStream schema時(shí)候用的名字一致
//這里之所以再group by漩勤,是讓這次查詢變成撤回流号涯,這樣插入mysql時(shí),可以通過(guò)主鍵自動(dòng)update數(shù)據(jù)
Table join = tEnv.sqlQuery("select b.c_name as behavior , max(a.count_unique_user) ,a.behavior as e_name " +
"from group_by_view a " +
"left join dim_behavior FOR SYSTEM_TIME AS OF a.ts AS b " +
"on a.behavior = b.e_name " +
"group by a.behavior,b.c_name");
//建立視圖锯七,保留臨時(shí)表
tEnv.createTemporaryView("join_view",join);
//數(shù)據(jù)輸出到mysql
tEnv.sqlUpdate("insert into result_1 select * from join_view");
//任務(wù)啟動(dòng),這行必不可少誉己!
env.execute("FlinkSql03");
}
}
如果UserBehavior.csv
沒(méi)有的話眉尸,可以通過(guò)第一課Flink Sql教程(1)最下方的連接下載。
接下來(lái)讓我們看看mysql中的結(jié)果
behavior count_unique_user e_name
behavior | count_unique_user | e_name |
---|---|---|
購(gòu)買 | 10587 | buy |
23143 | cart | |
12194 | fav | |
瀏覽 | 215662 | pv |
中間兩行的behavior
列之所以沒(méi)有值,是因?yàn)槲覀儧](méi)有在維表中定義噪猾,加上我們采用的是left join霉祸。大家可以自己定義然后測(cè)試數(shù)據(jù)能否正確補(bǔ)齊。
那么袱蜡,我們這次的維表Join嘗鮮就這樣結(jié)束了丝蹭,下一章將更新UDF
和Temporal table function Join
,不過(guò)應(yīng)該是五一之后的事了坪蚁。