Flink Sql教程(3)

維表Join嘗鮮

什么是維表

維表菇爪,維度表的簡(jiǎn)稱唠梨,來(lái)源于數(shù)據(jù)倉(cāng)庫(kù)袋励,一般用來(lái)給事實(shí)數(shù)據(jù)補(bǔ)充信息。假設(shè)現(xiàn)在有一張銷售記錄表当叭。銷售記錄表里面的一條銷售記錄就是一條事實(shí)數(shù)據(jù)茬故,而這條銷售記錄中的地區(qū)字段就是一個(gè)維度。通常銷售記錄表里面的地區(qū)字段是地區(qū)表的主鍵蚁鳖,地區(qū)表就是一張維表磺芭。更多的細(xì)節(jié)可以面向百度/谷歌編程

為什么Flink中需要維表

以流計(jì)算為例醉箕,一般情況下钾腺,消費(fèi)的消息中間件中的消息,是事實(shí)表中的數(shù)據(jù)讥裤,我們需要把數(shù)據(jù)補(bǔ)全垮庐,不然誰(shuí)也不知道字段地區(qū)對(duì)應(yīng)的值01、02是個(gè)什么東西坞琴。所以,我們通常會(huì)在計(jì)算過(guò)程中逗抑,通過(guò)Join維表來(lái)補(bǔ)全數(shù)據(jù)剧辐。

Flink如何Join維表

1.9之前版本

  • 如果我們使用的是DataStream,那直接通過(guò)Alibaba貢獻(xiàn)給社區(qū)的異步I/O邮府,完成維表的連接
  • 如果是Flink Sql荧关,那么需要先通過(guò)calcite將sql翻譯成DataStream,再通過(guò)異步I/O褂傀,完成維表關(guān)聯(lián)
  • 如果直接通過(guò)FlatMap之類的算子忍啤,在open()中連接第三方存儲(chǔ),在flatMap()中完成查詢,那么速度會(huì)非常之慢同波,查詢需要等待返回后再將數(shù)據(jù)下發(fā)鳄梅。期間如果數(shù)據(jù)量過(guò)大,很容易將第三方存儲(chǔ)比如mysql服務(wù)打爆未檩,因?yàn)槊織l數(shù)據(jù)都需要去查一遍mysql戴尸。所以這種方式非常不推薦。

1.9及之后

  • 直接在DDL中定義數(shù)據(jù)維表冤狡,這里貼一下官網(wǎng)的定義方式:
CREATE TABLE MyUserTable (
  ...
) WITH (
  'connector.type' = 'jdbc', -- required: specify this table type is jdbc
  
  'connector.url' = 'jdbc:mysql://localhost:3306/flink-test', -- required: JDBC DB url
  
  'connector.table' = 'jdbc_table_name',  -- required: jdbc table name
  
  'connector.driver' = 'com.mysql.jdbc.Driver', -- optional: the class name of the JDBC driver to use to connect to this URL. 
                                                -- If not set, it will automatically be derived from the URL.

  'connector.username' = 'name', -- optional: jdbc user name and password
  'connector.password' = 'password',
  -- lookup options, optional, used in temporary join
  'connector.lookup.cache.max-rows' = '5000', -- optional, max number of rows of lookup cache, over this value, the oldest rows will
                                              -- be eliminated. "cache.max-rows" and "cache.ttl" options must all be specified if any
                                              -- of them is specified. Cache is not enabled as default.
  'connector.lookup.cache.ttl' = '10s', -- optional, the max time to live for each rows in lookup cache, over this time, the oldest rows
                                        -- will be expired. "cache.max-rows" and "cache.ttl" options must all be specified if any of
                                        -- them is specified. Cache is not enabled as default.
  'connector.lookup.max-retries' = '3', -- optional, max retry times if lookup database failed
)

這里特別說(shuō)明一下孙蒙,connector.lookup.cache.max-rowsconnector.lookup.cache.ttl必須同時(shí)出現(xiàn)。這里指的是配置緩存和緩存時(shí)間悲雳。

  • 依舊存在幾個(gè)小問(wèn)題
    • 指定緩存后挎峦,Join維表時(shí)拿到的可能是舊數(shù)據(jù)。所以合瓢,如果想要拿到盡可能新的數(shù)據(jù)坦胶,緩存時(shí)間又需要設(shè)置的較短或者沒(méi)有,那又失去了緩存的意義歪玲;緩存條數(shù)如果過(guò)大會(huì)將內(nèi)存撐爆迁央,而且內(nèi)存中的數(shù)據(jù)相對(duì)第三方存儲(chǔ)中的數(shù)據(jù)來(lái)說(shuō),可能又是舊數(shù)據(jù)滥崩,而太小又得去頻繁查詢數(shù)據(jù)庫(kù)岖圈,也就失去了緩存的意義。所以钙皮,緩存的大小和時(shí)間需要一個(gè)很好的平衡蜂科。不過(guò)通常來(lái)說(shuō),維表中的數(shù)據(jù)應(yīng)該是很少變化的短条,同樣导匣,如果緩存較多的條數(shù),那在啟動(dòng)任務(wù)時(shí)相應(yīng)的提高TM的內(nèi)存茸时,也能夠緩解撐爆內(nèi)存的情形贡定。
    • 目前Flink Sql原生支持的維表關(guān)聯(lián)只支持同步模式,如果需要異步模式或者想用其他的第三方存儲(chǔ)只能夠自己去實(shí)現(xiàn)可都。當(dāng)然缓待,我們之后也會(huì)有真正的異步維表Join案例分享,大家敬請(qǐng)期待渠牲。

接下來(lái)旋炒,我們通過(guò)案例來(lái)了解如何使用維表Join

Flink Sql 1.10 維表Join

在正式開(kāi)始之前,先做好準(zhǔn)備工作签杈,之前通過(guò)docker安裝的mysql存在一點(diǎn)小問(wèn)題瘫镇,我們往表中插入中文字符時(shí)會(huì)出現(xiàn)亂碼,我們需要去解決一下。
首先铣除,進(jìn)入我們的mysql容器里面docker exec -it 9f7da4663dc1 '/bin/bash'谚咬,然后執(zhí)行echo "character-set-server=utf8" >> /etc/mysql/mysql.conf.d/mysqld.cnf 。執(zhí)行完畢后一路ctrl+D退到最外面通孽,通過(guò)docker restart 容器idmysql服務(wù)重啟序宦。重新連接上mysql,執(zhí)行show variables like '%character%';背苦,然后確定character_set_server是否為utf-8互捌。

接著我們?cè)?code>mysql中創(chuàng)建一張維表

CREATE TABLE `dim_behavior` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `c_name` varchar(255) COLLATE utf8_bin DEFAULT NULL,
  `e_name` varchar(255) COLLATE utf8_bin DEFAULT NULL,
  PRIMARY KEY (`id`)
)

再插入兩條數(shù)據(jù)

INSERT INTO `test`.`dim_behavior`(`id`, `c_name`, `e_name`) VALUES (1, '瀏覽', 'pv');
INSERT INTO `test`.`dim_behavior`(`id`, `c_name`, `e_name`) VALUES (2, '購(gòu)買', 'buy');

準(zhǔn)備工作完畢,下面我們直接看代碼

package FlinkSql;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;


public class FlinkSql03 {

    public static final String CSV_TABLE_SOURCE_DDL = "" +
            "CREATE TABLE csv_source (\n" +
            " user_id bigint,\n" +
            " item_id bigint,\n" +
            " category_id bigint,\n" +
            " behavior varchar,\n" +
            " ts bigint,\n" +
            " proctime as PROCTIME() \n"+
            ") WITH (\n" +
            " 'connector.type' = 'filesystem', -- 指定連接類型\n" +
            " 'connector.path' = 'C:\\Users\\tzmaj\\Desktop\\教程\\3\\UserBehavior.csv',-- 目錄 \n" +
            " 'format.type' = 'csv', -- 文件格式 \n" +
            " 'format.field-delimiter' = ',' ,-- 字段分隔符 \n" +
            " 'format.fields.0.name' = 'user_id',-- 第N字段名行剂,相當(dāng)于表的schema秕噪,索引從0開(kāi)始 \n" +
            " 'format.fields.0.data-type' = 'bigint',-- 字段類型\n" +
            " 'format.fields.1.name' = 'item_id', \n" +
            " 'format.fields.1.data-type' = 'bigint',\n" +
            " 'format.fields.2.name' = 'category_id',\n" +
            " 'format.fields.2.data-type' = 'bigint',\n" +
            " 'format.fields.3.name' = 'behavior', \n" +
            " 'format.fields.3.data-type' = 'String',\n" +
            " 'format.fields.4.name' = 'ts', \n" +
            " 'format.fields.4.data-type' = 'bigint'\n" +
            ")      ";

    public static final String MYSQL_TABLE_DIM_DDL = ""+
            "CREATE TABLE `dim_behavior` (\n" +
            "  `id` int  ,\n" +
            "  `c_name` varchar  ,\n" +
            "  `e_name` varchar  \n" +
            ")WITH (\n" +
            "  'connector.type' = 'jdbc', -- 連接方式\n" +
            "  'connector.url' = 'jdbc:mysql://localhost:3306/test', -- jdbc的url\n" +
            "  'connector.table' = 'dim_behavior',  -- 表名\n" +
            "  'connector.driver' = 'com.mysql.jdbc.Driver', -- 驅(qū)動(dòng)名字,可以不填厚宰,會(huì)自動(dòng)從上面的jdbc url解析 \n" +
            "  'connector.username' = 'root', -- 顧名思義 用戶名\n" +
            "  'connector.password' = '123456' , -- 密碼\n" +
            "  'connector.lookup.cache.max-rows' = '5000', -- 緩存條數(shù) \n"+
            "  'connector.lookup.cache.ttl' = '10s' -- 緩存時(shí)間 \n"+
            ")";

    public static final String MYSQL_TABLE_SINK_DDL=""+
            "CREATE TABLE `result_1` (\n" +
            "  `behavior` varchar  ,\n" +
            "  `count_unique_user` bigint,  \n" +
            "  `e_name` varchar  \n" +
            ")WITH (\n" +
            "  'connector.type' = 'jdbc', -- 連接方式\n" +
            "  'connector.url' = 'jdbc:mysql://localhost:3306/test', -- jdbc的url\n" +
            "  'connector.table' = 'result_1',  -- 表名\n" +
            "  'connector.driver' = 'com.mysql.jdbc.Driver', -- 驅(qū)動(dòng)名字腌巾,可以不填,會(huì)自動(dòng)從上面的jdbc url解析 \n" +
            "  'connector.username' = 'root', -- 顧名思義 用戶名\n" +
            "  'connector.password' = '123456' , -- 密碼\n" +
            "  'connector.write.flush.max-rows' = '5000', -- 意思是攢滿多少條才觸發(fā)寫入 \n" +
            "  'connector.write.flush.interval' = '1' -- 意思是攢滿多少秒才觸發(fā)寫入铲觉;這2個(gè)參數(shù)澈蝙,無(wú)論數(shù)據(jù)滿足哪個(gè)條件,就會(huì)觸發(fā)寫入\n" +
//            "  'update-mode' = 'upsert' -- 指定為插入更新模式 \n"+
            ")";

    public static void main(String[] args) throws Exception {

        //構(gòu)建StreamExecutionEnvironment
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //構(gòu)建EnvironmentSettings 并指定Blink Planner
         EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();

        //構(gòu)建StreamTableEnvironment
         StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, bsSettings);

        //注冊(cè)csv文件數(shù)據(jù)源表
        tEnv.sqlUpdate(CSV_TABLE_SOURCE_DDL);

        //注冊(cè)mysql數(shù)據(jù)維表
        tEnv.sqlUpdate(MYSQL_TABLE_DIM_DDL);

        //注冊(cè)mysql數(shù)據(jù)結(jié)果表
        tEnv.sqlUpdate(MYSQL_TABLE_SINK_DDL);

        //計(jì)算每種類型的行為有多少用戶
        Table group = tEnv.sqlQuery("select behavior,count(distinct user_id) count_unique_user from csv_source group by behavior ");

        //轉(zhuǎn)回datastream撵幽,因?yàn)樾枰黾觩roctime灯荧,而目前定義proctime方式只有兩種,一種是在定義DDL的時(shí)候盐杂,一種是在DataStream轉(zhuǎn) Table的時(shí)候
        //轉(zhuǎn)撤回流是因?yàn)樯厦娴膕ql用了group by逗载,所以只能使用撤回流
        DataStream<Row> ds = tEnv.toRetractStream(group, Row.class).flatMap(
                new FlatMapFunction<Tuple2<Boolean, Row>, Row>() {
                    @Override
                    public void flatMap(Tuple2<Boolean, Row> value, Collector<Row> collect) throws Exception {

                        collect.collect(value.f1);
                    }
                }
        ).returns(Types.ROW(Types.STRING,Types.LONG));

        //給Table增加proctime字段,ts可以隨便改成別的你喜歡的名字
        Table table = tEnv.fromDataStream(ds, "behavior,count_unique_user,ts.proctime");

        //建立視圖链烈,保留臨時(shí)表
        tEnv.createTemporaryView("group_by_view",table);

        //pv厉斟,buy,cart...等行為對(duì)應(yīng)的英文名强衡,我們通過(guò)維表Join的方式擦秽,替換為中文名
        //FOR SYSTEM_TIME AS OF a.ts AS b 這是固定寫法,ts與上面指定dataStream schema時(shí)候用的名字一致
        //這里之所以再group by漩勤,是讓這次查詢變成撤回流号涯,這樣插入mysql時(shí),可以通過(guò)主鍵自動(dòng)update數(shù)據(jù)
        Table join = tEnv.sqlQuery("select b.c_name as behavior , max(a.count_unique_user) ,a.behavior as e_name " +
                "from group_by_view a " +
                "left join dim_behavior FOR SYSTEM_TIME AS OF a.ts AS b " +
                "on a.behavior = b.e_name " +
                "group by a.behavior,b.c_name");

        //建立視圖锯七,保留臨時(shí)表
        tEnv.createTemporaryView("join_view",join);

        //數(shù)據(jù)輸出到mysql
        tEnv.sqlUpdate("insert into result_1 select * from join_view");

        //任務(wù)啟動(dòng),這行必不可少誉己!
        env.execute("FlinkSql03");
    }

}

如果UserBehavior.csv沒(méi)有的話眉尸,可以通過(guò)第一課Flink Sql教程(1)最下方的連接下載。
接下來(lái)讓我們看看mysql中的結(jié)果
behavior count_unique_user e_name

behavior count_unique_user e_name
購(gòu)買 10587 buy
23143 cart
12194 fav
瀏覽 215662 pv

中間兩行的behavior列之所以沒(méi)有值,是因?yàn)槲覀儧](méi)有在維表中定義噪猾,加上我們采用的是left join霉祸。大家可以自己定義然后測(cè)試數(shù)據(jù)能否正確補(bǔ)齊。

那么袱蜡,我們這次的維表Join嘗鮮就這樣結(jié)束了丝蹭,下一章將更新UDFTemporal table function Join,不過(guò)應(yīng)該是五一之后的事了坪蚁。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末奔穿,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子敏晤,更是在濱河造成了極大的恐慌贱田,老刑警劉巖,帶你破解...
    沈念sama閱讀 217,185評(píng)論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件嘴脾,死亡現(xiàn)場(chǎng)離奇詭異男摧,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)译打,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,652評(píng)論 3 393
  • 文/潘曉璐 我一進(jìn)店門耗拓,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人奏司,你說(shuō)我怎么就攤上這事乔询。” “怎么了结澄?”我有些...
    開(kāi)封第一講書人閱讀 163,524評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵哥谷,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我麻献,道長(zhǎng)们妥,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書人閱讀 58,339評(píng)論 1 293
  • 正文 為了忘掉前任勉吻,我火速辦了婚禮监婶,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘齿桃。我一直安慰自己惑惶,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,387評(píng)論 6 391
  • 文/花漫 我一把揭開(kāi)白布短纵。 她就那樣靜靜地躺著带污,像睡著了一般。 火紅的嫁衣襯著肌膚如雪香到。 梳的紋絲不亂的頭發(fā)上鱼冀,一...
    開(kāi)封第一講書人閱讀 51,287評(píng)論 1 301
  • 那天报破,我揣著相機(jī)與錄音,去河邊找鬼千绪。 笑死充易,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的荸型。 我是一名探鬼主播盹靴,決...
    沈念sama閱讀 40,130評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼瑞妇!你這毒婦竟也來(lái)了稿静?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書人閱讀 38,985評(píng)論 0 275
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤踪宠,失蹤者是張志新(化名)和其女友劉穎自赔,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體柳琢,經(jīng)...
    沈念sama閱讀 45,420評(píng)論 1 313
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡绍妨,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,617評(píng)論 3 334
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了柬脸。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片他去。...
    茶點(diǎn)故事閱讀 39,779評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖倒堕,靈堂內(nèi)的尸體忽然破棺而出灾测,到底是詐尸還是另有隱情,我是刑警寧澤垦巴,帶...
    沈念sama閱讀 35,477評(píng)論 5 345
  • 正文 年R本政府宣布媳搪,位于F島的核電站,受9級(jí)特大地震影響骤宣,放射性物質(zhì)發(fā)生泄漏秦爆。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,088評(píng)論 3 328
  • 文/蒙蒙 一憔披、第九天 我趴在偏房一處隱蔽的房頂上張望等限。 院中可真熱鬧,春花似錦芬膝、人聲如沸望门。這莊子的主人今日做“春日...
    開(kāi)封第一講書人閱讀 31,716評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)筹误。三九已至,卻和暖如春癣缅,著一層夾襖步出監(jiān)牢的瞬間厨剪,已是汗流浹背勘畔。 一陣腳步聲響...
    開(kāi)封第一講書人閱讀 32,857評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留丽惶,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 47,876評(píng)論 2 370
  • 正文 我出身青樓爬立,卻偏偏與公主長(zhǎng)得像钾唬,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子侠驯,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,700評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容

  • 一抡秆、維表join使用場(chǎng)景 維表Join是流與表的關(guān)聯(lián)操作,為了補(bǔ)全流里的額外字段吟策,通常這些待補(bǔ)全的維度字段很少發(fā)生...
    data之道閱讀 5,963評(píng)論 1 8
  • 我喜歡早起儒士,因?yàn)榍宄康娘L(fēng)景,從夜幕星河到朝陽(yáng)噴薄的變幻檩坚,總給你意想不到的美麗着撩。 上周末的早晨,我和往常一樣去海邊跑...
    山方方閱讀 874評(píng)論 14 15
  • 一一寫給我的班主任牛守峰老師 文/九九艷陽(yáng)天 屈指一算匾委,小學(xué)畢業(yè)已將近十六年了拖叙,在成長(zhǎng)的歲月里,許許多多...
    九九艷陽(yáng)天_4385閱讀 881評(píng)論 5 13
  • 我有一個(gè)小我一輪的小老弟赂乐,他跟我同父異母薯鳍,我特別喜歡他小時(shí)候,超級(jí)可愛(ài)挨措。 他小時(shí)候特別高冷挖滤,怎么逗他他都不笑,有一...
    溫小草閱讀 131評(píng)論 0 5
  • 作者:小木屋圖書早讀社lydia浅役,加入早讀社可以免費(fèi)暢讀紙質(zhì)書斩松。 《實(shí)用折紙大全》2014年4月1日 作者:犀文圖...
    lydia_fba0閱讀 239評(píng)論 0 0