Preface:
On the Flink and Iceberg integration front, the community has implemented an Iceberg Flink Streaming Reader, which means a Flink streaming job can incrementally pull newly added data from Apache Iceberg. For a storage layer that unifies streaming and batch, such as Apache Iceberg, Apache Flink is in a real sense the first compute engine to both read and write Iceberg in streaming as well as batch mode. This marks a new chapter for Apache Flink and Apache Iceberg in jointly building a unified streaming/batch data lake architecture.
Component versions:
HDFS:3.0.0-CDH6.2.1
Hive:2.1.1-CDH6.2.1
Flink:1.11.1
Iceberg:0.11.0
Streaming Reads of Iceberg via the Flink SQL Client
The process largely follows the official Iceberg Flink documentation:
https://github.com/apache/iceberg/blob/master/site/docs/flink.md
Step 1: Unpack Flink and start a standalone Flink cluster on top of the Hadoop environment
1. tar xzvf flink-1.11.1-bin-scala_2.11.tgz
2. export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
3. ./bin/start-cluster.sh
Step 2: Start the Flink SQL Client
If the Iceberg catalog is a Hadoop catalog:
export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
./bin/sql-client.sh embedded -j <flink-runtime-directory>/iceberg-flink-runtime-0.11.0.jar shell
If the Iceberg catalog is a Hive catalog (the tests that follow use an Iceberg Hive catalog):
export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
./bin/sql-client.sh embedded \
    -j <flink-runtime-directory>/iceberg-flink-runtime-0.11.0.jar \
    -j <hive-bundled-jar-directory>/flink-sql-connector-hive-2.2.0_2.11-1.11.0.jar \
    -j <hive-bundled-jar-directory>/flink-connector-hive_2.11-1.11.1.jar \
    -j <hive-bundled-jar-directory>/hive-exec-2.1.1-cdh6.2.1.jar \
    shell
The jars can be downloaded from the Maven repository, or fetched directly through Maven in IDEA (for CDH, configure the Cloudera repository):
https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-hive-2.2.0_2.11/
<repository>
    <id>cloudera</id>
    <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>
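For illustration, a minimal pom.xml sketch that resolves the three jars above through Maven; the coordinates are inferred from the jar file names in this post (the hive-exec CDH coordinate in particular is an assumption), so adjust them to your environment:

```xml
<!-- Sketch: resolve the jars listed above via Maven.
     Coordinates are inferred from the jar names in this post; adjust as needed. -->
<repositories>
    <repository>
        <id>cloudera</id>
        <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
</repositories>
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-sql-connector-hive-2.2.0_2.11</artifactId>
        <version>1.11.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-hive_2.11</artifactId>
        <version>1.11.1</version>
    </dependency>
    <dependency>
        <!-- Assumption: CDH build of hive-exec, served from the Cloudera repository -->
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-exec</artifactId>
        <version>2.1.1-cdh6.2.1</version>
    </dependency>
</dependencies>
```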
If you follow the Iceberg website and do not add the flink-connector-hive_2.11-1.11.1.jar and hive-exec-2.1.1-cdh6.2.1.jar dependencies, queries will fail with an error.
The official documentation on Flink's Hive integration
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/
suggests adding the two dependencies above, which resolved the problem.
Step 3: Create an Iceberg Hive Catalog
CREATE CATALOG iceberg_catalog WITH (
    'type'='iceberg',
    'catalog-type'='hive',
    'uri'='thrift://node103:9083',
    'clients'='5',
    'property-version'='1',
    'hive-conf-dir'='/etc/hive/conf.cloudera.hive');
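The streaming query in Step 4 below reads from a table named sample2, which is assumed to already exist in the catalog. A minimal sketch for creating such a table (the name and the two-column schema are assumptions for illustration):

```sql
-- Sketch: create a test table under the Hive catalog defined above.
-- The table name and schema are illustrative assumptions.
USE CATALOG iceberg_catalog;
CREATE DATABASE IF NOT EXISTS iceberg;
USE iceberg;
CREATE TABLE sample2 (
    id BIGINT,
    data STRING
);
```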
Step 4: Run real-time queries against the Iceberg table
1. use catalog iceberg_catalog;
2. create database iceberg;
3. use iceberg;
4. SET execution.type = streaming;
5. SET table.dynamic-table-options.enabled=true;
6. SELECT * FROM sample2 /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s')*/ ;
7. Start a Flink job that writes data into Iceberg in real time.
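The write job in item 7 can be sketched in Flink SQL from a second SQL Client session; the datagen source below is an assumption for illustration (any Flink source works), and sample2 is assumed to have the schema (id BIGINT, data STRING):

```sql
-- Sketch: continuously insert rows so the streaming SELECT above picks them up.
-- The datagen connector ships with Flink 1.11; the target schema is an assumption.
CREATE TABLE default_catalog.default_database.datagen_source (
    id BIGINT,
    data STRING
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '1'
);

INSERT INTO iceberg_catalog.iceberg.sample2
SELECT id, data FROM default_catalog.default_database.datagen_source;
```

With this job running, the streaming query from item 6 should surface the new rows at roughly the configured monitor-interval.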