1.blink在flink的基礎(chǔ)上做了大量的優(yōu)化,其中有兩點(diǎn):
1.1Catalog
在catalog上做了如下修改和優(yōu)化:
- 通過(guò)引入全新的 ReadableCatalog and ReadableWritableCatalog 接口統(tǒng)一了 Flink 的內(nèi)部和外部 catalog栖榨。Flink 所有的 catalog 會(huì)被 TableEnvironment 中的 CatalogManager管理龙考。
- 實(shí)現(xiàn)了兩種新的 catalog - FlinkInMemoryCatalog and HiveCatalog黔姜。FlinkInMemoryCatalog 會(huì)將所有元數(shù)據(jù)存在內(nèi)存中。HiveCatalog 會(huì)連接 Hive metastore 并橋接 Flink 和 Hive 之間的元數(shù)據(jù)分苇。目前彤蔽,這個(gè)HiveCatalog 可以提供讀取 Hive 元數(shù)據(jù)的能力,包括數(shù)據(jù)庫(kù)(databases)挑社,表(tables)陨界,表分區(qū)(table partitions), 簡(jiǎn)單的數(shù)據(jù)類型(simple data types), 表和列的統(tǒng)計(jì)信息(table and column stats)。
- 重新清晰定義了引用目標(biāo)的層級(jí)痛阻,即 'mycatalog.mydatabase.mytable'菌瘪。通過(guò)定義默認(rèn) catalog 和默認(rèn)數(shù)據(jù)庫(kù),用戶可以將引用層級(jí)簡(jiǎn)單化為 'mytable’阱当。
未來(lái)俏扩,我們還將加入對(duì)更多類型的元數(shù)據(jù)以及catalog的支持。
1.2Hive兼容性
我們的目標(biāo)是在元數(shù)據(jù)(meta data)和數(shù)據(jù)層將 Flink 和 Hive 對(duì)接和打通弊添。
- 在這個(gè)版本上录淡,F(xiàn)link可以通過(guò)上面提到的HiveCatalog讀取Hive的metaData。
- 這個(gè)版本實(shí)現(xiàn)了HiveTableSource油坝,使得Flink job可以直接讀取Hive中普通表和分區(qū)表的數(shù)據(jù)嫉戚,以及做分區(qū)的裁剪刨裆。
通過(guò)這個(gè)版本,用戶可以使用Flink SQL讀取已有的Hive meta和data彬檀,做數(shù)據(jù)處理帆啃。未來(lái)我們將在Flink上繼續(xù)加大對(duì)Hive兼容性的支持,包括支持Hive特有的data type窍帝,和Hive UDF等等努潘。
2.如何連接hive 源數(shù)據(jù)
2.1 代碼
通過(guò)flink-sql連接外部數(shù)據(jù)源(比如hive),需要寫一些代碼聲明盯桦。
2.2 blink sql-client
3. 環(huán)境準(zhǔn)備
3.1安裝hadoop
參考https://blog.csdn.net/hubin232/article/details/76769265
cd /usr/local/Cellar/hadoop/3.1.1/libexec/sbin
./start-all.sh //即可啟動(dòng) hadoop namenode,secondnamenode,datanode,resource mananger組件
3.2 安裝hive
mac 環(huán)境下brew install hive 即可安裝最新版 (需要先裝mysql或者一個(gè)能連的上mysql也行)
3.3 配置
cd /usr/local/Cellar/hive/3.1.1/libexec/conf
cp hive-default.xml.template hive-site.xml
編輯hive-site.xml及后續(xù)參考http://www.reibang.com/p/5c11073d19d3
安裝后慈俯,建表,插數(shù)據(jù)拥峦。
3.4.metastore server開啟
注意L臁!略号! 一定要確保hive開啟了 metastore server
lsof -i:9083 查詢是否開啟了刑峡。
開啟方式有兩種
1
/usr/local/Cellar/hive/3.1.1/libexec/hcatalog/sbin/hcat_server.sh start
提示
Started metastore server init, testing if initialized correctly...
Metastore initialized successfully on port[9083].
就說(shuō)明成功了。
2 hive --service metastore (這個(gè)沒試過(guò))
lsof -i:9083
3.5 為什么要開啟metastore 呢玄柠?
blink catalog架構(gòu)圖
紅框內(nèi)就是連接的hive metastore,所以需要先開啟 hive 的metastore server突梦。
4. blink源碼修改
1. sql-client Environment 類
修改位置:
org.apache.flink.table.client.config.Environment
enrich 方法
enrichedEnv.deployment = DeploymentEntry.enrich(env.deployment, properties);
下方加入
enrichedEnv.catalogs = new HashMap<>(env.catalogs);
這塊沒有將catalogs復(fù)制過(guò)去,會(huì)導(dǎo)致從環(huán)境中讀取到的catalogs丟失羽利,用戶永遠(yuǎn)沒發(fā)定義catalog宫患。
2.hive connector
官方支持的hive版本是2.4,我的是3.1.1这弧,會(huì)報(bào)錯(cuò)
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.BatchTableSourceFactory' in
the classpath.
Reason:
The matching factory 'org.apache.flink.streaming.connectors.hive.HiveTableFactory' doesn't support 'bucketing_version'.
所以要在 flink-connector-hive模塊
在類org.apache.flink.table.catalog.hive.config.HiveTableConfig
加入:
public static final String DEFAULT_TABLE_BUCKETING_VERSION = "bucketing_version";
public static final String DEFAULT_TABLE_COLUMN_STATS_ACCURATE = "column_stats_accurate";
在類org.apache.flink.streaming.connectors.hive.HiveTableFactory
supportedProperties 方法加入:
properties.add(HiveTableConfig.DEFAULT_TABLE_BUCKETING_VERSION);
properties.add(HiveTableConfig.DEFAULT_TABLE_COLUMN_STATS_ACCURATE);
修改pom
否則會(huì)報(bào)找不到HadoopInputFormatCommonBase 這個(gè)類娃闲。
3 jar包替換。
代碼修改后
在sql-client模塊mvn clean install -Dmaven.test.skip -Dcheckstyle.skip -Drat.ignoreErrors=true
將flink-sql-client-1.5.1.jar包拷到 /apache-flink/build-target/opt/sql-client
在flink-connector-hive模塊 mvn clean install -Dmaven.test.skip -Dcheckstyle.skip -Drat.ignoreErrors=true
將flink-connector-hive_2.11-1.5.1.jar拷到/apache-flink/build-target/opt/connectors
5.應(yīng)用
5.1配置
進(jìn)入/apache-flink/build-target/bin
cp ../conf/sql-client-default.ymal sql-client-hive.ymal
修改sql-client-hive.ymal
execution配置:
(streaming模式不支持匾浪,應(yīng)該可以通過(guò)修改flink-connector-hive模塊代碼支持皇帮。)
catalogs配置:
5.2.執(zhí)行./sql-client.sh embedded -e sql-client-hive.yaml
至此就打通blink的 sql-client與 hive源數(shù)據(jù)了。