Flink Usage Guide: Related Documentation Index
Background
Flink 1.16.0 integrates the SQL Gateway, which lets multiple clients execute SQL remotely and concurrently. Flink finally has a capability similar to the Spark Thrift Server.
This post covers deploying, configuring, and using the Flink SQL Gateway.
Environment used by the author:
- Flink 1.16.0
- Hadoop 3.1.1
- Hive 3.1.2
The official documentation on the SQL Gateway is at https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/table/sql-gateway/overview/.
Deploying the Service
The SQL Gateway can submit jobs to either a Flink standalone cluster or a Yarn cluster as its execution backend.
Standalone Cluster
For deploying a standalone cluster, see the official documentation: https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/deployment/resource-providers/standalone/overview/.
In short, the steps are:
- Set up passwordless SSH from the cluster's master node to each worker node.
- Extract the Flink 1.16.0 distribution on the master node.
- Edit the `$FLINK_HOME/conf/masters` and `$FLINK_HOME/conf/workers` files, listing the IPs or hostnames of the job managers and task managers respectively, one per line. This is how you manually lay out the Flink roles across the cluster (an example of both files appears below).
- Switch to the user that will run the Flink cluster and execute `$FLINK_HOME/bin/start-cluster.sh` on the master node to start the cluster.
To shut down the standalone cluster, run `$FLINK_HOME/bin/stop-cluster.sh`.
Once the cluster has started successfully, you can start the SQL Gateway. Run:
$FLINK_HOME/bin/sql-gateway.sh start -Dsql-gateway.endpoint.rest.address=xxx.xxx.xxx.xxx
The `-Dsql-gateway.endpoint.rest.address` option specifies the address the SQL Gateway service binds to. Note that if it is set to localhost, the SQL Gateway can only be reached from the local machine and cannot serve external clients. The SQL Gateway log files are written to the `$FLINK_HOME/log` directory.
You can run `$FLINK_HOME/bin/sql-gateway.sh -h` to see more ways to use the `sql-gateway.sh` command:
Usage: sql-gateway.sh [start|start-foreground|stop|stop-all] [args]
commands:
start - Run a SQL Gateway as a daemon
start-foreground - Run a SQL Gateway as a console application
stop - Stop the SQL Gateway daemon
stop-all - Stop all the SQL Gateway daemons
-h | --help - Show this help message
For debugging it is recommended to use `start-foreground` so the gateway runs in the foreground, which makes it easier to watch the logs and to restart the service after a failure.
Yarn Cluster
Extract the Flink 1.16.0 distribution on any node of the Yarn cluster, then switch to the flink user and run:
export HADOOP_CLASSPATH=`hadoop classpath`
$FLINK_HOME/bin/yarn-session.sh -d -s 2 -jm 2048 -tm 2048
to start a Flink Yarn session cluster. Adjust the arguments after `yarn-session.sh` to your environment. Finally, check the RUNNING Applications page of the Yarn web UI to confirm that the Flink Yarn cluster started properly.
The flink user must have permission to submit Yarn applications; if it does not, switch to a user that does or grant the permission with Ranger.
Once the Yarn session is up, start the SQL Gateway. Be sure to use the same user that started the yarn-session; otherwise the SQL Gateway cannot find the yarn application id. It will still start, but submitting jobs via SQL will fail.
After the SQL Gateway starts properly, you should see a log line similar to:
INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli [] - Found Yarn properties file under /tmp/.yarn-properties-flink
The Yarn properties file is named `.yarn-properties-{username}`. The author uses the flink user, so the file is `.yarn-properties-flink`. If this log line is present, the SQL Gateway has found the Flink Yarn cluster.
Later, after a job has been submitted successfully, the log contains lines like the following:
INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Found Web Interface xxx.xxx.xxx.xxx:40494 of application 'application_1670204805747_0006'.
INFO org.apache.flink.client.program.rest.RestClusterClient [] - Submitting job 'collect' (8bbea014547408c4716a483a701af8ab).
INFO org.apache.flink.client.program.rest.RestClusterClient [] - Successfully submitted job 'collect' (8bbea014547408c4716a483a701af8ab) to 'http://ip:40494'.
That is, the SQL Gateway finds the application id of the Flink Yarn cluster and submits jobs to that cluster.
Configuration Options
SQL Gateway configuration options can be specified dynamically in the following way:
$FLINK_HOME/bin/sql-gateway.sh -Dkey=value
The configuration options listed in the official documentation are as follows:
Key | Default | Type | Description |
---|---|---|---|
sql-gateway.session.check-interval | 1 min | Duration | The check interval for idle session timeout, which can be disabled by setting to zero or negative value. |
sql-gateway.session.idle-timeout | 10 min | Duration | Timeout interval for closing the session when the session hasn't been accessed during the interval. If setting to zero or negative value, the session will not be closed. |
sql-gateway.session.max-num | 1000000 | Integer | The maximum number of the active session for sql gateway service. |
sql-gateway.worker.keepalive-time | 5 min | Duration | Keepalive time for an idle worker thread. When the number of workers exceeds min workers, excessive threads are killed after this time interval. |
sql-gateway.worker.threads.max | 500 | Integer | The maximum number of worker threads for sql gateway service. |
sql-gateway.worker.threads.min | 5 | Integer | The minimum number of worker threads for sql gateway service. |
- sql-gateway.session.check-interval: how often to check whether sessions have timed out. Setting it to zero or a negative value disables the check.
- sql-gateway.session.idle-timeout: the session idle timeout; sessions that exceed it are closed automatically. Again, zero or a negative value disables this behavior.
- sql-gateway.session.max-num: the maximum number of active sessions.
- sql-gateway.worker.keepalive-time: the keepalive time for idle worker threads. When the number of worker threads exceeds the minimum, the excess threads are killed after this interval.
- sql-gateway.worker.threads.max: the maximum number of worker threads.
- sql-gateway.worker.threads.min: the minimum number of worker threads.
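As an illustration, several of these options can be combined on the start command; the values below are arbitrary examples, not recommendations:

```bash
# Illustrative values only; adjust to your environment.
$FLINK_HOME/bin/sql-gateway.sh start \
  -Dsql-gateway.endpoint.rest.address=xxx.xxx.xxx.xxx \
  -Dsql-gateway.session.idle-timeout=30min \
  -Dsql-gateway.session.max-num=100 \
  -Dsql-gateway.worker.threads.max=100
```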
Usage
The Flink SQL Gateway supports a Rest API mode and a hiveserver2 mode. The following sections describe how to use each of them.
Rest API
In the deployment above the SQL Gateway serves the Rest API by default, so we can go straight to usage. Assume that in our test environment the SQL Gateway listens at `sql-gateway-ip:8083`.
First, run:
curl --request POST http://sql-gateway-ip:8083/v1/sessions
to create a session and obtain a `sessionHandle`. A sample response looks like this:
{"sessionHandle":"2f35eb7e-97f0-40a4-b22d-f49c3a8fe7ef"}
Next, take executing the SQL statement `SELECT 1` as an example. The request format is:
curl --request POST http://sql-gateway-ip:8083/v1/sessions/${sessionHandle}/statements/ --data '{"statement": "SELECT 1"}'
Replacing `${sessionHandle}` with the `sessionHandle` returned above, the actual command is:
curl --request POST http://sql-gateway-ip:8083/v1/sessions/2f35eb7e-97f0-40a4-b22d-f49c3a8fe7ef/statements/ --data '{"statement": "SELECT 1"}'
The response contains an `operationHandle`, as shown below:
{"operationHandle":"7dcb0266-ed64-423d-a984-310dc6398e5e"}
Finally, we use the `sessionHandle` and `operationHandle` to fetch the execution result. The request format is:
curl --request GET http://sql-gateway-ip:8083/v1/sessions/${sessionHandle}/operations/${operationHandle}/result/0
The trailing `0` is the token. You can think of the query result as being returned in pages (batches); the token is the page number.
Substituting the real `sessionHandle` and `operationHandle` obtained earlier, the actual command is:
curl --request GET http://localhost:8083/v1/sessions/2f35eb7e-97f0-40a4-b22d-f49c3a8fe7ef/operations/7dcb0266-ed64-423d-a984-310dc6398e5e/result/0
The result is as follows:
{"results":{"columns":[{"name":"EXPR$0","logicalType":{"type":"INTEGER","nullable":false},"comment":null}],"data":[{"kind":"INSERT","fields":[1]}]},"resultType":"PAYLOAD","nextResultUri":"/v1/sessions/2f35eb7e-97f0-40a4-b22d-f49c3a8fe7ef/operations/7dcb0266-ed64-423d-a984-310dc6398e5e/result/1"}
From results -> data -> fields we can see that the result of `SELECT 1` is 1.
As mentioned above, the token works like pagination. The `nextResultUri` in the JSON above gives the URL for fetching the next batch of results; note that the token has changed from 0 to 1. Accessing this `nextResultUri`:
curl --request GET http://localhost:8083/v1/sessions/2f35eb7e-97f0-40a4-b22d-f49c3a8fe7ef/operations/7dcb0266-ed64-423d-a984-310dc6398e5e/result/1
returns the following:
{"results":{"columns":[{"name":"EXPR$0","logicalType":{"type":"INTEGER","nullable":false},"comment":null}],"data":[]},"resultType":"EOS","nextResultUri":null}
Here `resultType` is `EOS`, which means all results have been fetched, and `nextResultUri` is null: there is no next page of results.
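Putting the three calls together, below is a minimal shell sketch of the flow above. It assumes the gateway runs at sql-gateway-ip:8083 and that `jq` is available for parsing the JSON responses:

```bash
#!/bin/bash
# Minimal sketch of the REST flow shown above; requires curl and jq.
GATEWAY=http://sql-gateway-ip:8083

# 1. Open a session and extract the sessionHandle.
SESSION=$(curl -s --request POST $GATEWAY/v1/sessions | jq -r '.sessionHandle')

# 2. Submit a statement and extract the operationHandle.
OPERATION=$(curl -s --request POST $GATEWAY/v1/sessions/$SESSION/statements/ \
  --data '{"statement": "SELECT 1"}' | jq -r '.operationHandle')

# 3. Follow nextResultUri until all result pages have been fetched.
URI=/v1/sessions/$SESSION/operations/$OPERATION/result/0
while [ "$URI" != "null" ]; do
  RESP=$(curl -s --request GET "$GATEWAY$URI")
  echo "$RESP" | jq '.results.data'
  URI=$(echo "$RESP" | jq -r '.nextResultUri')
done
```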
hiveserver2
Besides the Rest API described above, the SQL Gateway also supports a hiveserver2 mode.
The official documentation on the SQL Gateway's hiveserver2 mode is at https://nightlies.apache.org/flink/flink-docs-release-1.16/zh/docs/dev/table/hive-compatibility/hiveserver2/.
Hiveserver2 mode requires some additional dependencies. First, add `flink-connector-hive_2.12-1.16.0.jar` to Flink's `lib` directory. It can be downloaded from: https://repo1.maven.org/maven2/org/apache/flink/flink-connector-hive_2.12/1.16.0/flink-connector-hive_2.12-1.16.0.jar
In addition, the following Hive dependencies are required:
- hive-common.jar
- hive-service-rpc.jar
- hive-exec.jar
- libthrift.jar
- libfb303.jar
- antlr-runtime.jar
The versions of these jars must match the Hive version running in your cluster; it is easiest to copy them straight from the `lib` directory of the cluster's Hive installation.
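A sketch of copying them, assuming the cluster's Hive installation is available under `$HIVE_HOME` (the actual jar names carry whatever version suffix your distribution ships):

```bash
# $HIVE_HOME is an assumption; the wildcards match your distribution's version suffix.
cp $HIVE_HOME/lib/hive-common-*.jar \
   $HIVE_HOME/lib/hive-service-rpc-*.jar \
   $HIVE_HOME/lib/hive-exec-*.jar \
   $HIVE_HOME/lib/libthrift-*.jar \
   $HIVE_HOME/lib/libfb303-*.jar \
   $HIVE_HOME/lib/antlr-runtime-*.jar \
   $FLINK_HOME/lib/
```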
The command to start the SQL Gateway in hiveserver2 mode is:
$FLINK_HOME/bin/sql-gateway.sh start -Dsql-gateway.endpoint.rest.address=xxx.xxx.xxx.xxx -Dsql-gateway.endpoint.type=hiveserver2 -Dsql-gateway.endpoint.hiveserver2.catalog.hive-conf-dir=/path/to/hive/conf -Dsql-gateway.endpoint.hiveserver2.thrift.port=10000
The parameters mean:
- -Dsql-gateway.endpoint.rest.address: the address the SQL Gateway service binds to.
- -Dsql-gateway.endpoint.type: the endpoint type. The default is `rest`, i.e. the Rest API; to use hiveserver2 mode this must be explicitly set to `hiveserver2`.
- -Dsql-gateway.endpoint.hiveserver2.catalog.hive-conf-dir: the directory containing `hive-site.xml`, used to connect to the Hive metastore and read table metadata.
- -Dsql-gateway.endpoint.hiveserver2.thrift.port: the port the SQL Gateway listens on in hiveserver2 mode, equivalent to the Hive thriftserver port.
Beyond those listed above, hiveserver2 mode has many more configuration options; see https://nightlies.apache.org/flink/flink-docs-release-1.16/zh/docs/dev/table/hive-compatibility/hiveserver2/#endpoint-options. They are not repeated here.
At this point, starting the SQL Gateway may fail with the following error:
org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'hive' that implements 'org.apache.flink.table.planner.delegation.DialectFactory' in the classpath.
Available factory identifiers are:
Note: if you want to use Hive dialect, please first move the jar `flink-table-planner_2.12` located in `FLINK_HOME/opt` to `FLINK_HOME/lib` and then move out the jar `flink-table-planner-loader` from `FLINK_HOME/lib`.
at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:545) ~[flink-table-api-java-uber-1.16.0.jar:1.16.0]
at org.apache.flink.table.planner.delegation.PlannerBase.getDialectFactory(PlannerBase.scala:161) ~[?:?]
at org.apache.flink.table.planner.delegation.PlannerBase.getParser(PlannerBase.scala:171) ~[?:?]
at org.apache.flink.table.api.internal.TableEnvironmentImpl.getParser(TableEnvironmentImpl.java:1694) ~[flink-table-api-java-uber-1.16.0.jar:1.16.0]
at org.apache.flink.table.api.internal.TableEnvironmentImpl.<init>(TableEnvironmentImpl.java:240) ~[flink-table-api-java-uber-1.16.0.jar:1.16.0]
at org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.<init>(AbstractStreamTableEnvironmentImpl.java:89) ~[flink-table-api-java-uber-1.16.0.jar:1.16.0]
at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.<init>(StreamTableEnvironmentImpl.java:84) ~[flink-table-api-java-uber-1.16.0.jar:1.16.0]
at org.apache.flink.table.gateway.service.context.SessionContext.createStreamTableEnvironment(SessionContext.java:309) ~[flink-sql-gateway-1.16.0.jar:1.16.0]
at org.apache.flink.table.gateway.service.context.SessionContext.createTableEnvironment(SessionContext.java:269) ~[flink-sql-gateway-1.16.0.jar:1.16.0]
at org.apache.flink.table.gateway.service.operation.OperationExecutor.getTableEnvironment(OperationExecutor.java:218) ~[flink-sql-gateway-1.16.0.jar:1.16.0]
at org.apache.flink.table.gateway.service.operation.OperationExecutor.executeStatement(OperationExecutor.java:89) ~[flink-sql-gateway-1.16.0.jar:1.16.0]
at org.apache.flink.table.gateway.service.SqlGatewayServiceImpl.lambda$executeStatement$0(SqlGatewayServiceImpl.java:182) ~[flink-sql-gateway-1.16.0.jar:1.16.0]
at org.apache.flink.table.gateway.service.operation.OperationManager.lambda$submitOperation$1(OperationManager.java:111) ~[flink-sql-gateway-1.16.0.jar:1.16.0]
at org.apache.flink.table.gateway.service.operation.OperationManager$Operation.lambda$run$0(OperationManager.java:239) ~[flink-sql-gateway-1.16.0.jar:1.16.0]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_121]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_121]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_121]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_121]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_121]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_121]
at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]
2022-12-08 17:42:03,007 INFO org.apache.flink.table.catalog.hive.HiveCatalog [] - Created HiveCatalog 'hive'
2022-12-08 17:42:03,008 INFO org.apache.hadoop.hive.metastore.HiveMetaStoreClient [] - Trying to connect to metastore with URI thrift://xxx.xxx.xxx.xxx:9083
2022-12-08 17:42:03,008 INFO org.apache.hadoop.hive.metastore.HiveMetaStoreClient [] - Opened a connection to metastore, current connections: 3
2022-12-08 17:42:03,009 INFO org.apache.hadoop.hive.metastore.HiveMetaStoreClient [] - Connected to metastore.
2022-12-08 17:42:03,010 INFO org.apache.hadoop.hive.metastore.RetryingMetaStoreClient [] - RetryingMetaStoreClient proxy=class org.apache.hadoop.hive.metastore.HiveMetaStoreClient ugi=yarn (auth:SIMPLE) retries=24 delay=5 lifetime=0
2022-12-08 17:42:03,010 INFO org.apache.flink.table.catalog.hive.HiveCatalog [] - Connected to Hive metastore
2022-12-08 17:42:03,026 INFO org.apache.flink.table.module.ModuleManager [] - Loaded module 'hive' from class org.apache.flink.table.module.hive.HiveModule
2022-12-08 17:42:03,030 INFO org.apache.flink.table.gateway.service.session.SessionManager [] - Session f3f6f339-f5b0-425f-94ad-3e9ad11981c1 is opened, and the number of current sessions is 3.
2022-12-08 17:42:03,043 ERROR org.apache.flink.table.gateway.service.operation.OperationManager [] - Failed to execute the operation 7922e186-8110-4bb8-b93d-db17d88eac48.
org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'hive' that implements 'org.apache.flink.table.planner.delegation.DialectFactory' in the classpath.
If you hit this error, Flink has not found the Hive dialect. Move `flink-table-planner_2.12-1.16.0.jar` from Flink's `opt` directory into the `lib` directory, then remove `flink-table-planner-loader-1.16.0.jar` from the `lib` directory (a sketch of this swap follows).
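A sketch of the jar swap described above (file names match the Flink 1.16.0 distribution used here):

```bash
# Enable the Hive dialect: use the full planner jar instead of the planner loader.
mv $FLINK_HOME/opt/flink-table-planner_2.12-1.16.0.jar $FLINK_HOME/lib/
mv $FLINK_HOME/lib/flink-table-planner-loader-1.16.0.jar $FLINK_HOME/opt/
```

Moving the loader jar back into `opt` rather than deleting it keeps it available in case you ever revert the change.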
At this point the contents of Flink's `lib` directory are:
antlr-runtime-3.5.2.jar
flink-cep-1.16.0.jar
flink-connector-files-1.16.0.jar
flink-connector-hive_2.12-1.16.0.jar
flink-csv-1.16.0.jar
flink-dist-1.16.0.jar
flink-json-1.16.0.jar
flink-scala_2.12-1.16.0.jar
flink-shaded-zookeeper-3.5.9.jar
flink-table-api-java-uber-1.16.0.jar
flink-table-planner_2.12-1.16.0.jar
flink-table-runtime-1.16.0.jar
hive-common-3.1.0.3.0.1.0-187.jar
hive-exec-3.1.0.3.0.1.0-187.jar
hive-service-rpc-3.1.0.3.0.1.0-187.jar
libfb303-0.9.3.jar
libthrift-0.9.3.jar
log4j-1.2-api-2.17.1.jar
log4j-api-2.17.1.jar
log4j-core-2.17.1.jar
log4j-slf4j-impl-2.17.1.jar
The SQL Gateway itself now works normally, but querying Hive tables from Flink still fails with missing dependencies. The following Hadoop jars are also required:
- hadoop-common.jar
- hadoop-mapreduce-client-common.jar
- hadoop-mapreduce-client-core.jar
- hadoop-mapreduce-client-jobclient.jar
The final contents of the `lib` directory are:
antlr-runtime-3.5.2.jar
flink-cep-1.16.0.jar
flink-connector-files-1.16.0.jar
flink-connector-hive_2.12-1.16.0.jar
flink-csv-1.16.0.jar
flink-dist-1.16.0.jar
flink-json-1.16.0.jar
flink-scala_2.12-1.16.0.jar
flink-shaded-zookeeper-3.5.9.jar
flink-table-api-java-uber-1.16.0.jar
flink-table-planner_2.12-1.16.0.jar
flink-table-runtime-1.16.0.jar
hadoop-common-3.1.1.3.0.1.0-187.jar
hadoop-mapreduce-client-common-3.1.1.3.0.1.0-187.jar
hadoop-mapreduce-client-core-3.1.1.3.0.1.0-187.jar
hadoop-mapreduce-client-jobclient-3.1.1.3.0.1.0-187.jar
hive-common-3.1.0.3.0.1.0-187.jar
hive-exec-3.1.0.3.0.1.0-187.jar
hive-service-rpc-3.1.0.3.0.1.0-187.jar
libfb303-0.9.3.jar
libthrift-0.9.3.jar
log4j-1.2-api-2.17.1.jar
log4j-api-2.17.1.jar
log4j-core-2.17.1.jar
log4j-slf4j-impl-2.17.1.jar
Finally, start the gateway again; in the author's test it now starts successfully.
The next step is to connect to the SQL Gateway over JDBC. Note that the connection URL must include the `auth=noSasl` property, for example:
jdbc:hive2://sql-gateway-ip:10000/default;auth=noSasl
Otherwise the SQL Gateway reports the following error:
org.apache.thrift.protocol.TProtocolException: Missing version in readMessageBegin, old client?
The following sections show how to connect to the Flink SQL Gateway with DBeaver, Java code, and Beeline.
DBeaver
Click New Connection -> Apache Hive (you can search for it). In the Main -> General pane, fill in the host, port, and database (the database can be left empty). Then, on the Driver properties tab, add a user property named `auth` with the value `noSasl`. Click Finish; the connection is created, and you can click the SQL button on the toolbar to open a SQL editor window.
Note: the last step of creating the connection downloads the Hive JDBC driver from GitHub. The download may time out because of network problems, and clicking Retry in DBeaver may not help. In that case you can download the driver manually: in the Connect to a database wizard, click Edit Driver and open the Libraries tab to find the driver's download link, then copy it into a browser to download. Next, go into the C:\Users\xxx\AppData\Roaming\DBeaverData\drivers\remote\ directory and drill down to the path where the driver is stored, for example C:\Users\xxx\AppData\Roaming\DBeaverData\drivers\remote\timveil\hive-jdbc-uber-jar\releases\download\v1.9-2.6.5. Put the driver downloaded in the browser into this directory (if there is a half-downloaded driver file left by DBeaver, delete it first). Click Finish in the Connect to a database wizard to close the wizard, and you are done.
Using Java Code
Add the following dependency to Maven:
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-jdbc</artifactId>
<version>3.1.2</version>
</dependency>
Then write the Java code:
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

// Wrapper class and imports added so the snippet compiles as-is; the class name is arbitrary.
public class SqlGatewayJdbcExample {
    public static void main(String[] args) throws Exception {
        // Register the Hive JDBC driver.
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        try (
                // Please replace the JDBC URI with your actual host, port and database.
                Connection connection = DriverManager.getConnection(
                        "jdbc:hive2://sql-gateway-ip:10000/default;auth=noSasl");
                Statement statement = connection.createStatement()) {
            statement.execute("select * from some_table");
            ResultSet resultSet = statement.getResultSet();
            while (resultSet.next()) {
                System.out.println(resultSet.getString(1));
            }
        }
    }
}
This is no different from ordinary JDBC usage. Note that the class name of the Hive driver is `org.apache.hive.jdbc.HiveDriver`.
Using Beeline
Start beeline and connect to the SQL Gateway with the following commands:
./beeline
!connect jdbc:hive2://sql-gateway-ip:10000/default;auth=noSasl
Beeline then asks for a username and password. Since this version does not support authentication, simply press Enter to skip both. Once connected, you can run SQL statements just as you would against Hive.
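Equivalently, the URL and user can be passed on the beeline command line; a sketch, where the user name flink is only an example:

```bash
# Quote the URL so the shell does not interpret the semicolon.
./beeline -u "jdbc:hive2://sql-gateway-ip:10000/default;auth=noSasl" -n flink
```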
The above is the beeline usage described in the official documentation. However, while verifying it the author ran into the following error:
2022-12-09 10:24:28,600 ERROR org.apache.flink.table.endpoint.hive.HiveServer2Endpoint [] - Failed to GetInfo.
java.lang.UnsupportedOperationException: Unrecognized TGetInfoType value: CLI_ODBC_KEYWORDS.
at org.apache.flink.table.endpoint.hive.HiveServer2Endpoint.GetInfo(HiveServer2Endpoint.java:371) [flink-connector-hive_2.12-1.16.0.jar:1.16.0]
at org.apache.hive.service.rpc.thrift.TCLIService$Processor$GetInfo.getResult(TCLIService.java:1537) [hive-exec-3.1.0.3.0.1.0-187.jar:3.1.0.3.0.1.0-187]
at org.apache.hive.service.rpc.thrift.TCLIService$Processor$GetInfo.getResult(TCLIService.java:1522) [hive-exec-3.1.0.3.0.1.0-187.jar:3.1.0.3.0.1.0-187]
at org.apache.thrift.ProcessFunction.process(ProcessFunction.java:39) [hive-exec-3.1.0.3.0.1.0-187.jar:3.1.0.3.0.1.0-187]
at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:39) [hive-exec-3.1.0.3.0.1.0-187.jar:3.1.0.3.0.1.0-187]
at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:286) [hive-exec-3.1.0.3.0.1.0-187.jar:3.1.0.3.0.1.0-187]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_121]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_121]
at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]
2022-12-09 10:24:28,600 ERROR org.apache.thrift.server.TThreadPoolServer [] - Thrift error occurred during processing of message.
org.apache.thrift.protocol.TProtocolException: Required field 'infoValue' is unset! Struct:TGetInfoResp(status:TStatus(statusCode:ERROR_STATUS, infoMessages:[*java.lang.UnsupportedOperationException:Unrecognized TGetInfoType value: CLI_ODBC_KEYWORDS.:9:8, org.apache.flink.table.endpoint.hive.HiveServer2Endpoint:GetInfo:HiveServer2Endpoint.java:371, org.apache.hive.service.rpc.thrift.TCLIService$Processor$GetInfo:getResult:TCLIService.java:1537, org.apache.hive.service.rpc.thrift.TCLIService$Processor$GetInfo:getResult:TCLIService.java:1522, org.apache.thrift.ProcessFunction:process:ProcessFunction.java:39, org.apache.thrift.TBaseProcessor:process:TBaseProcessor.java:39, org.apache.thrift.server.TThreadPoolServer$WorkerProcess:run:TThreadPoolServer.java:286, java.util.concurrent.ThreadPoolExecutor:runWorker:ThreadPoolExecutor.java:1142, java.util.concurrent.ThreadPoolExecutor$Worker:run:ThreadPoolExecutor.java:617, java.lang.Thread:run:Thread.java:745], errorMessage:Unrecognized TGetInfoType value: CLI_ODBC_KEYWORDS.), infoValue:null)
at org.apache.hive.service.rpc.thrift.TGetInfoResp.validate(TGetInfoResp.java:379) ~[hive-exec-3.1.0.3.0.1.0-187.jar:3.1.0.3.0.1.0-187]
at org.apache.hive.service.rpc.thrift.TCLIService$GetInfo_result.validate(TCLIService.java:5228) ~[hive-exec-3.1.0.3.0.1.0-187.jar:3.1.0.3.0.1.0-187]
at org.apache.hive.service.rpc.thrift.TCLIService$GetInfo_result$GetInfo_resultStandardScheme.write(TCLIService.java:5285) ~[hive-exec-3.1.0.3.0.1.0-187.jar:3.1.0.3.0.1.0-187]
at org.apache.hive.service.rpc.thrift.TCLIService$GetInfo_result$GetInfo_resultStandardScheme.write(TCLIService.java:5254) ~[hive-exec-3.1.0.3.0.1.0-187.jar:3.1.0.3.0.1.0-187]
at org.apache.hive.service.rpc.thrift.TCLIService$GetInfo_result.write(TCLIService.java:5205) ~[hive-exec-3.1.0.3.0.1.0-187.jar:3.1.0.3.0.1.0-187]
at org.apache.thrift.ProcessFunction.process(ProcessFunction.java:53) ~[hive-exec-3.1.0.3.0.1.0-187.jar:3.1.0.3.0.1.0-187]
at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:39) ~[hive-exec-3.1.0.3.0.1.0-187.jar:3.1.0.3.0.1.0-187]
at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:286) [hive-exec-3.1.0.3.0.1.0-187.jar:3.1.0.3.0.1.0-187]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_121]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_121]
at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]
2022-12-09 10:24:28,600 WARN org.apache.thrift.transport.TIOStreamTransport [] - Error closing output stream.
java.net.SocketException: Socket closed
at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:118) ~[?:1.8.0_121]
at java.net.SocketOutputStream.write(SocketOutputStream.java:155) ~[?:1.8.0_121]
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82) ~[?:1.8.0_121]
at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140) ~[?:1.8.0_121]
at java.io.FilterOutputStream.close(FilterOutputStream.java:158) ~[?:1.8.0_121]
at org.apache.thrift.transport.TIOStreamTransport.close(TIOStreamTransport.java:110) [hive-exec-3.1.0.3.0.1.0-187.jar:3.1.0.3.0.1.0-187]
at org.apache.thrift.transport.TSocket.close(TSocket.java:235) [hive-exec-3.1.0.3.0.1.0-187.jar:3.1.0.3.0.1.0-187]
at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:303) [hive-exec-3.1.0.3.0.1.0-187.jar:3.1.0.3.0.1.0-187]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_121]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_121]
at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]
Investigating the error shows that it is a bug in Flink 1.16.0, tracked as FLINK-29839. The community has fixed it in 1.16.1.
This post is the author's original work. Discussion and corrections are welcome. Please credit the source when reposting.