1.場景是 通過flink sql 對表的操作提陶,通過添加多個算子 對結(jié)果的操作如圖:
例子不是很恰當(dāng)主要是 我們在創(chuàng)建數(shù)倉表時框都,通過先將一個函數(shù)的結(jié)果集放到一個視圖中确丢, 在通過一個一個函數(shù)將結(jié)果放入到下一個視圖中茫多, 然后在通過視圖View2 的結(jié)果放入到 數(shù)倉新表中祈匙。
比如 對一張表中的一列進行函數(shù)的結(jié)果放入到一個視圖中 在對視圖進行一個函數(shù) 結(jié)果集放入到視圖中,最終將視圖中的結(jié)果放入到表中天揖。
2. 版本
mysql | flink |
---|---|
5.7.20-log | fink 14.5 |
3.先創(chuàng)建mysql 表
CREATE DATABASE ;
USE `test`;
DROP TABLE IF EXISTS `Flink_cdc`;
CREATE TABLE `Flink_cdc` (
`id` bigint(64) NOT NULL AUTO_INCREMENT,
`name` varchar(64) DEFAULT NULL,
`age` int(20) DEFAULT NULL,
`birthday` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
`ts` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=72 DEFAULT CHARSET=utf8mb4;
insert into `Flink_cdc`(`id`,`name`,`age`,`birthday`,`ts`) values
(69,'flink',21,'2022-12-09 22:57:15','2022-12-09 22:57:17'),
(70,'flink sql',22,'2022-12-09 23:01:43','2022-12-09 23:01:46'),
(71,'flk sql',23,'2022-12-09 23:43:04','2022-12-09 23:43:07');
3.創(chuàng)建flink mysql cdc 表
CREATE TABLE source_mysql (
id BIGINT PRIMARY KEY NOT ENFORCED,
name STRING,
age INT,
birthday TIMESTAMP(3),
ts TIMESTAMP(3)
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '192.168.1.180',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'server-time-zone' = 'Asia/Shanghai',
'debezium.snapshot.mode' = 'initial',
'database-name' = 'test',
'table-name' = 'Flink_cdc'
);
4.創(chuàng)建視圖
Flink SQL> create view v1 as select * from source_mysql;
[INFO] Execute statement succeed.
5.創(chuàng)建算子
先說下這個函數(shù)的意思
INSTR(string1, string2) --返回string2在string1 中第一次出現(xiàn)的位置
--select INSTR('flinksql','k'); -- 返回5
create view v2 as select id,instr(name,'k'),age,birthday from v1;
6.將結(jié)果寫入到新表中 (比如創(chuàng)建表V3)
create view v3 as select * from v2 where name in (select min(name) from v2);
7. 也可以往視圖中添加數(shù)據(jù)
create view v6 as values ('1','flink','hbase','hive');