Apache Doris 整合 FLINK CDC 、Paimon 構建實時湖倉一體的聯(lián)邦查詢入門

1.概覽

多源數(shù)據目錄(Multi-Catalog)功能沧奴,旨在能夠更方便對接外部數(shù)據目錄痘括,以增強Doris的數(shù)據湖分析和聯(lián)邦數(shù)據查詢能力。

在之前的 Doris 版本中,用戶數(shù)據只有兩個層級:Database 和 Table纲菌。當我們需要連接一個外部數(shù)據目錄時挠日,我們只能在Database 或 Table 層級進行對接。比如通過 create external table 的方式創(chuàng)建一個外部數(shù)據目錄中的表的映射翰舌,或通過 create external database 的方式映射一個外部數(shù)據目錄中的 Database嚣潜。如果外部數(shù)據目錄中的 Database 或 Table 非常多,則需要用戶手動進行一一映射椅贱,使用體驗不佳懂算。

而新的 Multi-Catalog 功能在原有的元數(shù)據層級上,新增一層Catalog庇麦,構成 Catalog -> Database -> Table 的三層元數(shù)據層級计技。其中,Catalog 可以直接對應到外部數(shù)據目錄女器。目前支持的外部數(shù)據目錄包括:

  1. Apache Hive
  2. Apache Iceberg
  3. Apache Hudi
  4. Elasticsearch
  5. JDBC: 對接數(shù)據庫訪問的標準接口(JDBC)來訪問各式數(shù)據庫的數(shù)據酸役。
  6. Apache Paimon(Incubating)

該功能將作為之前外表連接方式(External Table)的補充和增強,幫助用戶進行快速的多數(shù)據目錄聯(lián)邦查詢驾胆。

這篇教程將展示如何使用 Flink + paimon + Doris 構建實時湖倉一體的聯(lián)邦查詢分析涣澡,Doris 2.0.3 版本提供了 的支持,本文主要展示 Doris 和 paimon 怎么使用丧诺,同時本教程整個環(huán)境是都基于偽分布式環(huán)境搭建入桂,大家按照步驟可以一步步完成。完整體驗整個搭建操作的過程驳阎。

2. 環(huán)境

本教程的演示環(huán)境如下:

  1. Apache doris 2.0.2
  2. Hadoop 3.3.3
  3. hive 3.1.3
  4. Fink 1.17.1
  5. Apache paimon 0.5.0
  6. JDK 1.8.0_311

3. 安裝

  1. 下載 Flink 1.17.1
    wget https://dlcdn.apache.org/flink/flink-1.17.1/flink-1.17.1-bin-scala_2.12.tgz

    解壓安裝

    tar zxf flink-1.17.1-bin-scala_2.12.tgz
  2. 下載相關的依賴到 Flink/lib 目錄
cp /Users/zhangfeng/hadoop/hadoop-3.3.6/share/hadoop/mapreduce/hadoop-mapreduce-client-core-3.3.6.jar ./lib/
wget https://repo1.maven.org/maven2/org/apache/paimon/paimon-flink-1.17/0.5.0-incubating/paimon-flink-1.17-0.5.0-incubating.jar
wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.4.2/flink-sql-connector-mysql-cdc-2.4.2.jar
wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-hive-3.1.3_2.12/1.17.1/flink-sql-connector-hive-3.1.3_2.12-1.17.1.jar
  1. 配置并啟動 Flink

配置環(huán)境變量抗愁,修改flink-conf.yaml配置文件

env.java.opts.all: "-Dfile.encoding=UTF-8"
classloader.check-leaked-classloader: false
taskmanager.numberOfTaskSlots: 3
execution.checkpointing.interval: 10s
state.backend: rocksdb
state.checkpoints.dir: hdfs://zhangfeng:9000/flink/myckp
state.savepoints.dir: hdfs://zhangfeng:9000/flink/savepoints
state.backend.incremental: true

啟動 Flink

bin/start-cluster.sh
bin/sql-client.sh embedded 
set 'sql-client.execution.result-mode' = 'tableau';
image.png

Catalog

Paimon Catalog可以持久化元數(shù)據,當前支持兩種類型的metastore

  • 文件系統(tǒng)(默認):將元數(shù)據和表文件存儲在文件系統(tǒng)中呵晚。
  • hive:在hive metastore存儲元數(shù)據蜘腌,用戶可以直接從hive訪問表。

文件系統(tǒng)

下面的 Flink SQL 注冊并使用一個名為 paimon_catalog 的catalog饵隙。元數(shù)據和表文件存放在hdfs://localhost:9000/paimon/data

CREATE CATALOG paimon_catalog WITH (
'type' = 'paimon',
'warehouse' = 'hdfs://localhost:9000/paimon/data'
);

show catalogs;
image.png

Hive Catalog

我們也可以直接使用 hive metastore 來存儲 paimon 元數(shù)據撮珠。

下面是創(chuàng)建語句

CREATE CATALOG paimon_hive WITH (
    'type' = 'paimon',
    'metastore' = 'hive',
    'uri' = 'thrift://localhost:9083',
    'hive-conf-dir' = '/Users/zhangfeng/hadoop/apache-hive-3.1.3-bin/conf/', 
    'warehouse' = 'hdfs://localhost:9000/paimon/hive'
);
show catalogs;
image.png

創(chuàng)建 paimon 表

USE CATALOG paimon_hive;
CREATE TABLE test_paimon_01 (
  userid BIGINT,
  age INT,
  address STRING,
  regiter_dt STRING  ,
  PRIMARY KEY(userid, regiter_dt) NOT ENFORCED
) PARTITIONED BY (regiter_dt);

show tables
image.png

4. 同步MySQL 數(shù)據到 Paimon表

下面我們演示怎么基于Flink CDC 快速實時同步 MySQL 表的數(shù)據到 Paimon表里。

這里首先你的MySQL 數(shù)據庫要開啟 binlog金矛,具體的方法網上很多芯急,這里不在敘述。

MySQL 表:

CREATE DATABASE emp_1;
 USE emp_1;
CREATE TABLE employees_1 (
    emp_no      INT             NOT NULL,
    birth_date  DATE            NOT NULL,
    first_name  VARCHAR(14)     NOT NULL,
    last_name   VARCHAR(16)     NOT NULL,
    gender      ENUM ('M','F')  NOT NULL,    
    hire_date   DATE            NOT NULL,
    PRIMARY KEY (emp_no)
);

INSERT INTO `employees_1` VALUES  (10055,'1956-06-06','Georgy','Dredge','M','1992-04-27'),
(10056,'1961-09-01','Brendon','Bernini','F','1990-02-01'),
(10057,'1954-05-30','Ebbe','Callaway','F','1992-01-15'),
(10058,'1954-10-01','Berhard','McFarlin','M','1987-04-13'),
(10059,'1953-09-19','Alejandro','McAlpine','F','1991-06-26'),
(10060,'1961-10-15','Breannda','Billingsley','M','1987-11-02'),
(10061,'1962-10-19','Tse','Herber','M','1985-09-17'),
(10062,'1961-11-02','Anoosh','Peyn','M','1991-08-30'),
(10063,'1952-08-06','Gino','Leonhardt','F','1989-04-08'),
(10064,'1959-04-07','Udi','Jansch','M','1985-11-20'),
(10065,'1963-04-14','Satosi','Awdeh','M','1988-05-18'),
(10066,'1952-11-13','Kwee','Schusler','M','1986-02-26'),
(10067,'1953-01-07','Claudi','Stavenow','M','1987-03-04'),
(10068,'1962-11-26','Charlene','Brattka','M','1987-08-07'),
(10069,'1960-09-06','Margareta','Bierman','F','1989-11-05'),
(10070,'1955-08-20','Reuven','Garigliano','M','1985-10-14'),
(10071,'1958-01-21','Hisao','Lipner','M','1987-10-01'),
(10072,'1952-05-15','Hironoby','Sidou','F','1988-07-21'),
(10073,'1954-02-23','Shir','McClurg','M','1991-12-01'),
(10074,'1955-08-28','Mokhtar','Bernatsky','F','1990-08-13'),
(10075,'1960-03-09','Gao','Dolinsky','F','1987-03-19'),
(10076,'1952-06-13','Erez','Ritzmann','F','1985-07-09'),
(10077,'1964-04-18','Mona','Azuma','M','1990-03-02'),
(10078,'1959-12-25','Danel','Mondadori','F','1987-05-26'),
(10079,'1961-10-05','Kshitij','Gils','F','1986-03-27'),
(10080,'1957-12-03','Premal','Baek','M','1985-11-19'),
(10081,'1960-12-17','Zhongwei','Rosen','M','1986-10-30'),
(10082,'1963-09-09','Parviz','Lortz','M','1990-01-03'),
(10083,'1959-07-23','Vishv','Zockler','M','1987-03-31'),
(10084,'1960-05-25','Tuval','Kalloufi','M','1995-12-15');

在Flink sql-client 下創(chuàng)建 MySQL CDC 表:

CREATE TABLE employees_source (
    database_name STRING METADATA VIRTUAL,
    table_name STRING METADATA VIRTUAL,
    emp_no int NOT NULL,
    birth_date date,
    first_name STRING,
    last_name STRING,
    gender STRING,
    hire_date date,
    PRIMARY KEY (`emp_no`) NOT ENFORCED
  ) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'localhost',
    'port' = '3306',
    'username' = 'root',
    'password' = 'zhangfeng',
    'database-name' = 'emp_1',
    'table-name' = 'employees_1'
  );

使用Create table as select 創(chuàng)建Paimon表驶俊,并將數(shù)據實時同步到Paimon表里:

create table mysql_to_paimon_01 as select * from default_catalog.default_database.employees_source;

查看Job

image.png

我們這個時候可以在Flink sql-client 下查詢 paimon 娶耍,看到 Paimon 表里已經有數(shù)據了。

image.png

5. Doris On Paimon

Doris 提供了 Paimon 的 catalog 支持饼酿,我們可以通過這種方式榕酒,通過Doris 快速的去讀 Paimon 表的數(shù)據胚膊,同時也可以通過 catalog 方式將 paimon 表的數(shù)據遷移到 Doris 表里

5.1 Doris 整合查詢Paimon表

首先我們創(chuàng)建 Paimon catalog,有兩種方式:

  1. 一種是基于 Hive metastore service
  2. 一種是基于 HDFS 文件系統(tǒng)
CREATE CATALOG `paimon_hdfs` PROPERTIES (
    "type" = "paimon",
    "warehouse" = "hdfs://localhost:9000/paimon/hive",
    "hadoop.username" = "hadoop"
);

CREATE CATALOG `paimon_hms` PROPERTIES (
    "type" = "paimon",
    "paimon.catalog.type" = "hms",
    "warehouse" = "hdfs://localhost:9000/paimon/hive",
    "hive.metastore.uris" = "thrift://localhost:9083"
);

創(chuàng)建成功之后我們通過 show catalogs方式可以看到我們創(chuàng)建好的 paimon catalog想鹰;

mysql> show catalogs;
+-----------+-------------+----------+-----------+-------------------------+---------------------+------------------------+
| CatalogId | CatalogName | Type     | IsCurrent | CreateTime              | LastUpdateTime      | Comment                |
+-----------+-------------+----------+-----------+-------------------------+---------------------+------------------------+
|   1308010 | hive        | hms      |           | 2023-11-17 09:42:22.872 | 2023-11-17 09:42:46 | NULL                   |
|   1326307 | hudi        | hms      |           | 2023-11-27 11:33:22.231 | 2023-11-27 11:33:35 | NULL                   |
|         0 | internal    | internal |           | UNRECORDED              | NULL                | Doris internal catalog |
|     35689 | jdbc        | jdbc     |           | 2023-11-03 12:52:24.695 | 2023-11-03 12:52:59 | NULL                   |
|     38003 | mysql       | jdbc     |           | 2023-11-07 11:46:40.006 | 2023-11-07 11:46:54 | NULL                   |
|   1329142 | paimon_hdfs | paimon   |           | 2023-11-27 14:06:13.744 | 2023-11-27 14:06:41 |                        |
|   1328144 | paimon_hms  | paimon   | yes       | 2023-11-27 14:00:32.925 | 2023-11-27 14:00:44 | NULL                   |
+-----------+-------------+----------+-----------+-------------------------+---------------------+------------------------+
7 rows in set (0.00 sec)

切換 paimon catalog澜掩,通過下面這些操作我們可以看到我們在 paimon 里創(chuàng)建的表

mysql> switch  paimon_hdfs;
Query OK, 0 rows affected (0.00 sec)

mysql> show databases;
+----------+
| Database |
+----------+
| default  |
+----------+
1 row in set (0.02 sec)

mysql> use default;
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A

Database changed
mysql> show tables;
+--------------------------+
| Tables_in_default        |
+--------------------------+
| example_tbl_partition_01 |
| example_tbl_unique_01    |
| mysql_to_paimon_01       |
| test_paimon_01           |
+--------------------------+
4 rows in set (0.00 sec)

通過 Doris 查詢 Paimon 表

select * from mysql_to_paimon_01;
image.png

5.2 將Paimon 表的數(shù)據導入到 Doris

我們也可以快速的利用catalog 方式將 paimon 數(shù)據遷移到 Doris 里,我們可以使用 CATS方式:

create table doris_paimon_01
PROPERTIES("replication_num" = "1")  as  select * from paimon_hdfs.`default`.mysql_to_paimon_01;
image.png

注意:

1. 查詢paimon的時候如果報下面的錯誤:

org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "hdfs"

需要再 hdfs 需要再core-site.xml 文件中加上下面的配置:

<property>
  <name>fs.hdfs.impl</name>
  <value>org.apache.hadoop.hdfs.DistributedFileSystem</value>
  <description>The FileSystem for hdfs: uris.</description>
</property>

6. 總結

是不是使用非常簡單,快快體驗Doris 湖倉一體,聯(lián)邦查詢的能力匾效,來加速你的數(shù)據分析性能

編輯于 2023-11-30 10:59?IP 屬地陜西

?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市株汉,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌歌殃,老刑警劉巖乔妈,帶你破解...
    沈念sama閱讀 206,602評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異氓皱,居然都是意外死亡路召,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,442評論 2 382
  • 文/潘曉璐 我一進店門波材,熙熙樓的掌柜王于貴愁眉苦臉地迎上來股淡,“玉大人,你說我怎么就攤上這事廷区∥椋” “怎么了?”我有些...
    開封第一講書人閱讀 152,878評論 0 344
  • 文/不壞的土叔 我叫張陵隙轻,是天一觀的道長埠帕。 經常有香客問我,道長玖绿,這世上最難降的妖魔是什么敛瓷? 我笑而不...
    開封第一講書人閱讀 55,306評論 1 279
  • 正文 為了忘掉前任,我火速辦了婚禮斑匪,結果婚禮上呐籽,老公的妹妹穿的比我還像新娘。我一直安慰自己秤标,他們只是感情好绝淡,可當我...
    茶點故事閱讀 64,330評論 5 373
  • 文/花漫 我一把揭開白布宙刘。 她就那樣靜靜地躺著苍姜,像睡著了一般。 火紅的嫁衣襯著肌膚如雪悬包。 梳的紋絲不亂的頭發(fā)上衙猪,一...
    開封第一講書人閱讀 49,071評論 1 285
  • 那天,我揣著相機與錄音,去河邊找鬼垫释。 笑死丝格,一個胖子當著我的面吹牛,可吹牛的內容都是我干的棵譬。 我是一名探鬼主播显蝌,決...
    沈念sama閱讀 38,382評論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼订咸!你這毒婦竟也來了曼尊?” 一聲冷哼從身側響起,我...
    開封第一講書人閱讀 37,006評論 0 259
  • 序言:老撾萬榮一對情侶失蹤脏嚷,失蹤者是張志新(化名)和其女友劉穎骆撇,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體父叙,經...
    沈念sama閱讀 43,512評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡神郊,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 35,965評論 2 325
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了趾唱。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片涌乳。...
    茶點故事閱讀 38,094評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖甜癞,靈堂內的尸體忽然破棺而出爷怀,到底是詐尸還是另有隱情,我是刑警寧澤带欢,帶...
    沈念sama閱讀 33,732評論 4 323
  • 正文 年R本政府宣布运授,位于F島的核電站,受9級特大地震影響乔煞,放射性物質發(fā)生泄漏吁朦。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,283評論 3 307
  • 文/蒙蒙 一渡贾、第九天 我趴在偏房一處隱蔽的房頂上張望逗宜。 院中可真熱鬧,春花似錦空骚、人聲如沸纺讲。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,286評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽熬甚。三九已至,卻和暖如春肋坚,著一層夾襖步出監(jiān)牢的瞬間乡括,已是汗流浹背肃廓。 一陣腳步聲響...
    開封第一講書人閱讀 31,512評論 1 262
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留诲泌,地道東北人盲赊。 一個月前我還...
    沈念sama閱讀 45,536評論 2 354
  • 正文 我出身青樓,卻偏偏與公主長得像敷扫,于是被迫代替她去往敵國和親哀蘑。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 42,828評論 2 345

推薦閱讀更多精彩內容