Flink 使用之 SQL Gateway

Flink 使用介紹相關(guān)文檔目錄

Flink 使用介紹相關(guān)文檔目錄

背景

Flink 1.16.0整合了SQL Gateway访忿,提供了多種客戶端遠(yuǎn)程并發(fā)執(zhí)行SQL的能力染乌。Flink終于擁有了類似于Spark Thrift server的能力。

本篇為大家?guī)鞦link SQL Gateway的部署絮蒿、配置和使用碗硬。

作者使用的環(huán)境信息:

  • Flink 1.16.0
  • Hadoop 3.1.1
  • Hive 3.1.2

官網(wǎng)關(guān)于SQL Gateway的講解參見https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/table/sql-gateway/overview/吝秕。

部署服務(wù)

SQL Gateway提交作業(yè)的執(zhí)行后端可以是Flink的standalone集群或者是Yarn集群。

Standalone 集群

部署standalone集群可參見官網(wǎng)https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/deployment/resource-providers/standalone/overview/备蚓。

簡單來說有如下步驟:

  1. 建立集群主節(jié)點(diǎn)到各個(gè)子節(jié)點(diǎn)的免密课蔬。
  2. 解壓Flink 1.16.0安裝包到主節(jié)點(diǎn)。
  3. 編輯$FLINK_HOME/conf/masters$FLINK_HOME/conf/workers文件郊尝,分別填寫job manager和task manager的ip或者h(yuǎn)ostname二跋,一行填寫一個(gè)。通過這種方式手工指定Flink結(jié)群各角色在集群中的分布情況流昏。
  4. 切換到需要運(yùn)行Flink集群的用戶扎即,在主節(jié)點(diǎn)執(zhí)行$FLINK_HOME/bin/start-cluster.sh,啟動(dòng)集群况凉。

關(guān)閉standalone集群可以執(zhí)行$FLINK_HOME/bin/stop-cluster.sh谚鄙。

集群成功啟動(dòng)之后可以接著啟動(dòng)sql-client。執(zhí)行:

$FLINK_HOME/bin/sql-gateway.sh start -Dsql-gateway.endpoint.rest.address=xxx.xxx.xxx.xxx

其中-Dsql-gateway.endpoint.rest.address用來指定SQL Gateway服務(wù)綁定的地址刁绒。注意如果指定為localhost則SQL Gateway只能通過本機(jī)訪問襟锐,無法對(duì)外提供服務(wù)。SQL Gateway服務(wù)日志文件在$FLINK_HOME/log目錄中膛锭。

可以執(zhí)行$FLINK_HOME/bin/sql-gateway.sh -h獲取sql-gateway.sh命令更多的使用方式:

Usage: sql-gateway.sh [start|start-foreground|stop|stop-all] [args]
  commands:
    start               - Run a SQL Gateway as a daemon
    start-foreground    - Run a SQL Gateway as a console application
    stop                - Stop the SQL Gateway daemon
    stop-all            - Stop all the SQL Gateway daemons
    -h | --help         - Show this help message

建議調(diào)試運(yùn)行的時(shí)候使用start-foreground前臺(tái)運(yùn)行粮坞,方便查看運(yùn)行日志和故障重啟服務(wù)蚊荣。

Yarn 集群

將Flink 1.16.0安裝包解壓在Yarn集群任意節(jié)點(diǎn),然后切換Flink用戶執(zhí)行:

export HADOOP_CLASSPATH=`hadoop classpath`
$FLINK_HOME/bin/yarn-session.sh -d -s 2 -jm 2048 -tm 2048

啟動(dòng)Flink Yarn集群莫杈。yarn-session.sh后面的參數(shù)按照實(shí)際情況修改互例。最后需要在Yarn管理頁面的RUNNING Applications頁面檢查Flink Yarn集群是否正常啟動(dòng)。

要求Flink用戶必須擁有提交Yarn作業(yè)的權(quán)限筝闹。如果沒有媳叨,需要切換用戶或者使用Ranger賦權(quán)。

Yarn啟動(dòng)成功之后接著啟動(dòng)SQL Gateway关顷。務(wù)必使用和啟動(dòng)yarn-session相同的用戶來啟動(dòng)SQL Gateway糊秆。否則SQL Gateway無法找到y(tǒng)arn application id。盡管能正常啟動(dòng)议双,但是執(zhí)行SQL提交任務(wù)的時(shí)候會(huì)失敗痘番。

SQL Gateway正常啟動(dòng)后應(yīng)能看到類似如下的日志:

INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                [] - Found Yarn properties file under /tmp/.yarn-properties-flink

Yarn properties file命名格式為.yarn-properties-{用戶名}。本文作者使用flink用戶平痰,所以文件名為.yarn-properties-flink汞舱。如果有這一行日志,說明SQL Gateway找到了Flink Yarn集群宗雇。

在后面使用過程中昂芜,作業(yè)成功提交之后,日志中可以看到類似如下內(nèi)容:

INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Found Web Interface xxx.xxx.xxx.xxx:40494 of application 'application_1670204805747_0006'.
INFO  org.apache.flink.client.program.rest.RestClusterClient       [] - Submitting job 'collect' (8bbea014547408c4716a483a701af8ab).
INFO  org.apache.flink.client.program.rest.RestClusterClient       [] - Successfully submitted job 'collect' (8bbea014547408c4716a483a701af8ab) to 'http://ip:40494'.

SQL Gateway能夠找到Flink Yarn集群對(duì)應(yīng)的application id赔蒲,并且將作業(yè)提交給這個(gè)集群泌神。

配置項(xiàng)

可以通過如下方式動(dòng)態(tài)指定SQL Gateway的配置項(xiàng)

$FLINK_HOME/bin/sql-gateway.sh -Dkey=value

官網(wǎng)給出的配置項(xiàng)列表如下:

Key Default Type Description
sql-gateway.session.check-interval 1 min Duration The check interval for idle session timeout, which can be disabled by setting to zero or negative value.
sql-gateway.session.idle-timeout 10 min Duration Timeout interval for closing the session when the session hasn't been accessed during the interval. If setting to zero or negative value, the session will not be closed.
sql-gateway.session.max-num 1000000 Integer The maximum number of the active session for sql gateway service.
sql-gateway.worker.keepalive-time 5 min Duration Keepalive time for an idle worker thread. When the number of workers exceeds min workers, excessive threads are killed after this time interval.
sql-gateway.worker.threads.max 500 Integer The maximum number of worker threads for sql gateway service.
sql-gateway.worker.threads.min 5 Integer The minimum number of worker threads for sql gateway service.
  • sql-gateway.session.check-interval: 多長時(shí)間檢查一次session是否超時(shí)。配置為0或者負(fù)數(shù)可以禁止這個(gè)行為舞虱。
  • sql-gateway.session.idle-timeout: session的超時(shí)時(shí)間欢际,超時(shí)的session會(huì)被自動(dòng)關(guān)閉。同樣配置為0或者負(fù)數(shù)可以禁止這個(gè)行為砾嫉。
  • sql-gateway.session.max-num: 活躍session數(shù)量的最大值幼苛。
  • sql-gateway.worker.keepalive-time: 空閑的worker線程敝侠椋活時(shí)間焕刮。當(dāng)實(shí)際worker線程數(shù)超過最小worker線程數(shù)之時(shí),多出來的線程會(huì)在這個(gè)時(shí)間之后被kill掉墙杯。
  • sql-gateway.worker.threads.max: 最大worker線程數(shù)配并。
  • sql-gateway.worker.threads.min: 最小worker線程數(shù)。

使用

Flink SQL Gateway支持Rest API模式和hiveserver2模式高镐。下面分別介紹它們的使用方式溉旋。

Rest API

前面部署過程中SQL Gateway默認(rèn)是以Rest API的形式提供服務(wù),這里直接講解使用方式嫉髓。假設(shè)在我們的測試環(huán)境SQL Gateway運(yùn)行的IP和端口為sql-gateway-ip:8083观腊。

首先執(zhí)行:

curl --request POST http://sql-gateway-ip:8083/v1/sessions

創(chuàng)建并獲取到一個(gè)sessionHandle邑闲。示例返回如下:

{"sessionHandle":"2f35eb7e-97f0-40a4-b22d-f49c3a8fe7ef"}

然后以執(zhí)行SQL SELECT 1語句為例。格式為:

curl --request POST http://sql-gateway-ip:8083/v1/sessions/${sessionHandle}/statements/ --data '{"statement": "SELECT 1"}'

我們替換sessionHandle為上面返回的sessionHandle梧油,實(shí)際命令如下:

curl --request POST http://sql-gateway-ip:8083/v1/sessions/2f35eb7e-97f0-40a4-b22d-f49c3a8fe7ef/statements/ --data '{"statement": "SELECT 1"}'

得到的返回值包含一個(gè)operationHandle苫耸,如下所示:

{"operationHandle":"7dcb0266-ed64-423d-a984-310dc6398e5e"}

最后我們使用sessionHandleoperationHandle來獲取運(yùn)行結(jié)果。格式為:

curl --request GET http://sql-gateway-ip:8083/v1/sessions/${sessionHandle}/operations/${operationHandle}/result/0

其中最后一個(gè)0為token儡陨⊥首樱可以理解為查詢結(jié)果是分頁(分批)返回,token為頁碼骗村。

替換sessionHandleoperationHandle為前面獲取的真實(shí)值嫌褪,實(shí)際命令如下:

curl --request GET http://localhost:8083/v1/sessions/2f35eb7e-97f0-40a4-b22d-f49c3a8fe7ef/operations/7dcb0266-ed64-423d-a984-310dc6398e5e/result/0

得到結(jié)果如下:

{"results":{"columns":[{"name":"EXPR$0","logicalType":{"type":"INTEGER","nullable":false},"comment":null}],"data":[{"kind":"INSERT","fields":[1]}]},"resultType":"PAYLOAD","nextResultUri":"/v1/sessions/2f35eb7e-97f0-40a4-b22d-f49c3a8fe7ef/operations/7dcb0266-ed64-423d-a984-310dc6398e5e/result/1"}

我們從result -> data -> fields 可以得到SELECT 1的運(yùn)行結(jié)果為1。

前面提到token的作用類似于分頁胚股。上面JSON的nextResultUri告訴我們獲取下一批結(jié)果的URL笼痛。發(fā)現(xiàn)token從0變成了1。我們訪問這個(gè)nextResultUri

curl --request GET http://localhost:8083/v1/sessions/2f35eb7e-97f0-40a4-b22d-f49c3a8fe7ef/operations/7dcb0266-ed64-423d-a984-310dc6398e5e/result/1

返回如下內(nèi)容:

{"results":{"columns":[{"name":"EXPR$0","logicalType":{"type":"INTEGER","nullable":false},"comment":null}],"data":[]},"resultType":"EOS","nextResultUri":null}

可以看到resultTypeEOS信轿,表示所有結(jié)果都已經(jīng)獲取到了晃痴。此時(shí)nextResultUri為null,沒有下一頁結(jié)果财忽。

hiveserver2

除了上述的Rest API之外倘核,SQL Gateway還支持hiveserver2模式。

官網(wǎng)SQL Gateway hiveserver2模式相關(guān)內(nèi)容參見https://nightlies.apache.org/flink/flink-docs-release-1.16/zh/docs/dev/table/hive-compatibility/hiveserver2/即彪。

要支持hiveserver2模式要求配置相關(guān)的依賴紧唱。首先需要添加flink-connector-hive_2.12-1.16.0.jar到Flink的lib目錄中。jar下載地址為:https://repo1.maven.org/maven2/org/apache/flink/flink-connector-hive_2.12/1.16.0/flink-connector-hive_2.12-1.16.0.jar

除此之外還需要Hive的相關(guān)依賴:

  • hive-common.jar
  • hive-service-rpc.jar
  • hive-exec.jar
  • libthrift.jar
  • libfb303.jar
  • antlr-runtime.jar

這些包的版本需要和集群內(nèi)的Hive保持一致隶校,建議從集群Hive安裝位置的lib目錄直接復(fù)制漏益。

以hiveserver2模式啟動(dòng)SQL Gateway的命令為:

$FLINK_HOME/bin/sql-gateway.sh start -Dsql-gateway.endpoint.rest.address=xxx.xxx.xxx.xxx -Dsql-gateway.endpoint.type=hiveserver2 -Dsql-gateway.endpoint.hiveserver2.catalog.hive-conf-dir=/path/to/hive/conf -Dsql-gateway.endpoint.hiveserver2.thrift.port=10000

其參數(shù)的含義為:

  • -Dsql-gateway.endpoint.rest.address: SQL Gateway服務(wù)綁定地址。
  • -Dsql-gateway.endpoint.type: 指定endpoint類型深胳。默認(rèn)值為rest即Rest API绰疤。使用hiveserver2類型必須顯式配置。
  • -Dsql-gateway.endpoint.hiveserver2.catalog.hive-conf-dir: hive-site.xml配置文件所在目錄舞终。方便連接到Hive metastore轻庆,獲取表的元數(shù)據(jù)信息。
  • -Dsql-gateway.endpoint.hiveserver2.thrift.port: hiveserver2模式SQL Gateway使用的端口敛劝。相當(dāng)于Hive thriftserver的端口余爆。

除了上面列舉出的之外,hiveserver2模式還有很多配置項(xiàng)夸盟,參見https://nightlies.apache.org/flink/flink-docs-release-1.16/zh/docs/dev/table/hive-compatibility/hiveserver2/#endpoint-options蛾方。這里不再一一列出。

現(xiàn)在啟動(dòng)SQL Gateway可能出現(xiàn)下面的錯(cuò)誤:

org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'hive' that implements 'org.apache.flink.table.planner.delegation.DialectFactory' in the classpath.

Available factory identifiers are:

Note: if you want to use Hive dialect, please first move the jar `flink-table-planner_2.12` located in `FLINK_HOME/opt` to `FLINK_HOME/lib` and then move out the jar `flink-table-planner-loader` from `FLINK_HOME/lib`.
        at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:545) ~[flink-table-api-java-uber-1.16.0.jar:1.16.0]
        at org.apache.flink.table.planner.delegation.PlannerBase.getDialectFactory(PlannerBase.scala:161) ~[?:?]
        at org.apache.flink.table.planner.delegation.PlannerBase.getParser(PlannerBase.scala:171) ~[?:?]
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.getParser(TableEnvironmentImpl.java:1694) ~[flink-table-api-java-uber-1.16.0.jar:1.16.0]
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.<init>(TableEnvironmentImpl.java:240) ~[flink-table-api-java-uber-1.16.0.jar:1.16.0]
        at org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.<init>(AbstractStreamTableEnvironmentImpl.java:89) ~[flink-table-api-java-uber-1.16.0.jar:1.16.0]
        at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.<init>(StreamTableEnvironmentImpl.java:84) ~[flink-table-api-java-uber-1.16.0.jar:1.16.0]
        at org.apache.flink.table.gateway.service.context.SessionContext.createStreamTableEnvironment(SessionContext.java:309) ~[flink-sql-gateway-1.16.0.jar:1.16.0]
        at org.apache.flink.table.gateway.service.context.SessionContext.createTableEnvironment(SessionContext.java:269) ~[flink-sql-gateway-1.16.0.jar:1.16.0]
        at org.apache.flink.table.gateway.service.operation.OperationExecutor.getTableEnvironment(OperationExecutor.java:218) ~[flink-sql-gateway-1.16.0.jar:1.16.0]
        at org.apache.flink.table.gateway.service.operation.OperationExecutor.executeStatement(OperationExecutor.java:89) ~[flink-sql-gateway-1.16.0.jar:1.16.0]
        at org.apache.flink.table.gateway.service.SqlGatewayServiceImpl.lambda$executeStatement$0(SqlGatewayServiceImpl.java:182) ~[flink-sql-gateway-1.16.0.jar:1.16.0]
        at org.apache.flink.table.gateway.service.operation.OperationManager.lambda$submitOperation$1(OperationManager.java:111) ~[flink-sql-gateway-1.16.0.jar:1.16.0]
        at org.apache.flink.table.gateway.service.operation.OperationManager$Operation.lambda$run$0(OperationManager.java:239) ~[flink-sql-gateway-1.16.0.jar:1.16.0]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_121]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_121]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_121]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_121]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_121]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_121]
        at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]
2022-12-08 17:42:03,007 INFO  org.apache.flink.table.catalog.hive.HiveCatalog              [] - Created HiveCatalog 'hive'
2022-12-08 17:42:03,008 INFO  org.apache.hadoop.hive.metastore.HiveMetaStoreClient         [] - Trying to connect to metastore with URI thrift://xxx.xxx.xxx.xxx:9083
2022-12-08 17:42:03,008 INFO  org.apache.hadoop.hive.metastore.HiveMetaStoreClient         [] - Opened a connection to metastore, current connections: 3
2022-12-08 17:42:03,009 INFO  org.apache.hadoop.hive.metastore.HiveMetaStoreClient         [] - Connected to metastore.
2022-12-08 17:42:03,010 INFO  org.apache.hadoop.hive.metastore.RetryingMetaStoreClient     [] - RetryingMetaStoreClient proxy=class org.apache.hadoop.hive.metastore.HiveMetaStoreClient ugi=yarn (auth:SIMPLE) retries=24 delay=5 lifetime=0
2022-12-08 17:42:03,010 INFO  org.apache.flink.table.catalog.hive.HiveCatalog              [] - Connected to Hive metastore
2022-12-08 17:42:03,026 INFO  org.apache.flink.table.module.ModuleManager                  [] - Loaded module 'hive' from class org.apache.flink.table.module.hive.HiveModule
2022-12-08 17:42:03,030 INFO  org.apache.flink.table.gateway.service.session.SessionManager [] - Session f3f6f339-f5b0-425f-94ad-3e9ad11981c1 is opened, and the number of current sessions is 3.
2022-12-08 17:42:03,043 ERROR org.apache.flink.table.gateway.service.operation.OperationManager [] - Failed to execute the operation 7922e186-8110-4bb8-b93d-db17d88eac48.
org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'hive' that implements 'org.apache.flink.table.planner.delegation.DialectFactory' in the classpath.

如果遇到這個(gè)錯(cuò)誤,說明Flink沒有發(fā)現(xiàn)Hive方言桩砰,需要將Flink opt目錄中的flink-table-planner_2.12-1.16.0.jarlib目錄拓春,然后將lib目錄中的flink-table-planner-loader-1.16.0.jar移除掉。

到目前為止Flink的lib目錄內(nèi)容為:

antlr-runtime-3.5.2.jar
flink-cep-1.16.0.jar
flink-connector-files-1.16.0.jar
flink-connector-hive_2.12-1.16.0.jar
flink-csv-1.16.0.jar
flink-dist-1.16.0.jar
flink-json-1.16.0.jar
flink-scala_2.12-1.16.0.jar
flink-shaded-zookeeper-3.5.9.jar
flink-table-api-java-uber-1.16.0.jar
flink-table-planner_2.12-1.16.0.jar
flink-table-runtime-1.16.0.jar
hive-common-3.1.0.3.0.1.0-187.jar
hive-exec-3.1.0.3.0.1.0-187.jar
hive-service-rpc-3.1.0.3.0.1.0-187.jar
libfb303-0.9.3.jar
libthrift-0.9.3.jar
log4j-1.2-api-2.17.1.jar
log4j-api-2.17.1.jar
log4j-core-2.17.1.jar
log4j-slf4j-impl-2.17.1.jar

此時(shí)已經(jīng)可以正常使用SQL Gateway亚隅。但是使用Flink查詢Hive表仍會(huì)出現(xiàn)缺少依賴問題痘儡。還需要添加Hadoop相關(guān)依賴:

  • hadoop-common.jar
  • hadoop-mapreduce-client-common.jar
  • hadoop-mapreduce-client-core.jar
  • hadoop-mapreduce-client-jobclient.jar

最終lib目錄內(nèi)容為:

antlr-runtime-3.5.2.jar
flink-cep-1.16.0.jar
flink-connector-files-1.16.0.jar
flink-connector-hive_2.12-1.16.0.jar
flink-csv-1.16.0.jar
flink-dist-1.16.0.jar
flink-json-1.16.0.jar
flink-scala_2.12-1.16.0.jar
flink-shaded-zookeeper-3.5.9.jar
flink-table-api-java-uber-1.16.0.jar
flink-table-planner_2.12-1.16.0.jar
flink-table-runtime-1.16.0.jar
hadoop-common-3.1.1.3.0.1.0-187.jar
hadoop-mapreduce-client-common-3.1.1.3.0.1.0-187.jar
hadoop-mapreduce-client-core-3.1.1.3.0.1.0-187.jar
hadoop-mapreduce-client-jobclient-3.1.1.3.0.1.0-187.jar
hive-common-3.1.0.3.0.1.0-187.jar
hive-exec-3.1.0.3.0.1.0-187.jar
hive-service-rpc-3.1.0.3.0.1.0-187.jar
libfb303-0.9.3.jar
libthrift-0.9.3.jar
log4j-1.2-api-2.17.1.jar
log4j-api-2.17.1.jar
log4j-core-2.17.1.jar
log4j-slf4j-impl-2.17.1.jar

最后再次嘗試啟動(dòng),筆者測試能夠啟動(dòng)成功枢步。

接下來的工作是使用JDBC連接SQL Gateway沉删。需要注意的是連接URL必須添加auth=noSasl屬性。比如:

jdbc:hive2://sql-gateway-ip:10000/default;auth=noSasl

否則SQL Gateway會(huì)出現(xiàn)下面錯(cuò)誤:

org.apache.thrift.protocol.TProtocolException: Missing version in readMessageBegin, old client?

接下來分別介紹使用DBeaver醉途,Java代碼和Beeline方式連接Flink SQL Gateway矾瑰。

DBeaver

依次點(diǎn)擊 新建連接 -> Apache Hive(可以搜索出來)。在主要 -> 一般窗格中填寫主機(jī)端口號(hào)和數(shù)據(jù)庫(可不寫)隘擎。然后在驅(qū)動(dòng)屬性tab頁殴穴,添加名稱為auth的用戶屬性,值為noSasl货葬。點(diǎn)擊完成按鈕采幌,連接創(chuàng)建完畢,可以點(diǎn)擊工具欄SQL按鈕打開SQL窗口編寫SQL震桶。

注意:在創(chuàng)建連接的最后異步需要從GitHub上下載Hive JDBC驅(qū)動(dòng)休傍。可能會(huì)因?yàn)榫W(wǎng)絡(luò)問題下載超時(shí)蹲姐,在DBeaver中點(diǎn)擊重試也沒辦法解決磨取。我們可以手動(dòng)下載。方法為在連接到數(shù)據(jù)庫向?qū)е悬c(diǎn)擊編輯驅(qū)動(dòng)柴墩,點(diǎn)擊庫這個(gè)tab頁忙厌。可以看到驅(qū)動(dòng)的下載鏈接江咳。將其復(fù)制到瀏覽器下載逢净。然后我們進(jìn)入C:\Users\xxx\AppData\Roaming\DBeaverData\drivers\remote\目錄逐層向下查找驅(qū)動(dòng)類的存放路徑,例如C:\Users\xxx\AppData\Roaming\DBeaverData\drivers\remote\timveil\hive-jdbc-uber-jar\releases\download\v1.9-2.6.5歼指。將瀏覽器下載好的驅(qū)動(dòng)放置到這個(gè)目錄(如果目錄中有DBeaver下載了一半失敗的驅(qū)動(dòng)文件爹土,需要先刪除掉)。點(diǎn)擊在連接到數(shù)據(jù)庫向?qū)У耐瓿砂粹o關(guān)閉向?qū)Ь涂梢粤恕?/p>

使用Java代碼

Maven需要添加如下依賴:

<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-jdbc</artifactId>
    <version>3.1.2</version>
</dependency>

然后編寫Java代碼:

public static void main(String[] args) throws Exception {
    Class.forName("org.apache.hive.jdbc.HiveDriver");
    try (
            // Please replace the JDBC URI with your actual host, port and database.
            Connection connection = DriverManager.getConnection("jdbc:hive2://sql-gateway-ip:10000/default;auth=noSasl");
            Statement statement = connection.createStatement()) {
        statement.execute("select * from some_table");
        ResultSet resultSet = statement.getResultSet();
        while (resultSet.next()) {
            System.out.println(resultSet.getString(1));
        }
    }
}

和傳統(tǒng)JDBC使用方式?jīng)]有任何區(qū)別东臀。需要注意Hive驅(qū)動(dòng)的類名為org.apache.hive.jdbc.HiveDriver着饥。

使用 Beeline

啟動(dòng)beeline并使用如下命令連接SQL Gateway:

./beeline
!connect jdbc:hive2://sql-gateway-ip:10000/default;auth=noSasl

接下來會(huì)詢問使用的用戶名和密碼犀农。由于當(dāng)前版本不支持認(rèn)證惰赋,可直接回車略過。連接成功之后可以像使用Hive一樣使用SQL語句。

上面是官網(wǎng)給出的使用beeline工具的方式赁濒。但本人在驗(yàn)證的過程中遇到了如下錯(cuò)誤:

2022-12-09 10:24:28,600 ERROR org.apache.flink.table.endpoint.hive.HiveServer2Endpoint     [] - Failed to GetInfo.
java.lang.UnsupportedOperationException: Unrecognized TGetInfoType value: CLI_ODBC_KEYWORDS.
        at org.apache.flink.table.endpoint.hive.HiveServer2Endpoint.GetInfo(HiveServer2Endpoint.java:371) [flink-connector-hive_2.12-1.16.0.jar:1.16.0]
        at org.apache.hive.service.rpc.thrift.TCLIService$Processor$GetInfo.getResult(TCLIService.java:1537) [hive-exec-3.1.0.3.0.1.0-187.jar:3.1.0.3.0.1.0-187]
        at org.apache.hive.service.rpc.thrift.TCLIService$Processor$GetInfo.getResult(TCLIService.java:1522) [hive-exec-3.1.0.3.0.1.0-187.jar:3.1.0.3.0.1.0-187]
        at org.apache.thrift.ProcessFunction.process(ProcessFunction.java:39) [hive-exec-3.1.0.3.0.1.0-187.jar:3.1.0.3.0.1.0-187]
        at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:39) [hive-exec-3.1.0.3.0.1.0-187.jar:3.1.0.3.0.1.0-187]
        at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:286) [hive-exec-3.1.0.3.0.1.0-187.jar:3.1.0.3.0.1.0-187]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_121]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_121]
        at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]
2022-12-09 10:24:28,600 ERROR org.apache.thrift.server.TThreadPoolServer                   [] - Thrift error occurred during processing of message.
org.apache.thrift.protocol.TProtocolException: Required field 'infoValue' is unset! Struct:TGetInfoResp(status:TStatus(statusCode:ERROR_STATUS, infoMessages:[*java.lang.UnsupportedOperationException:Unrecognized TGetInfoType value: CLI_ODBC_KEYWORDS.:9:8, org.apache.flink.table.endpoint.hive.HiveServer2Endpoint:GetInfo:HiveServer2Endpoint.java:371, org.apache.hive.service.rpc.thrift.TCLIService$Processor$GetInfo:getResult:TCLIService.java:1537, org.apache.hive.service.rpc.thrift.TCLIService$Processor$GetInfo:getResult:TCLIService.java:1522, org.apache.thrift.ProcessFunction:process:ProcessFunction.java:39, org.apache.thrift.TBaseProcessor:process:TBaseProcessor.java:39, org.apache.thrift.server.TThreadPoolServer$WorkerProcess:run:TThreadPoolServer.java:286, java.util.concurrent.ThreadPoolExecutor:runWorker:ThreadPoolExecutor.java:1142, java.util.concurrent.ThreadPoolExecutor$Worker:run:ThreadPoolExecutor.java:617, java.lang.Thread:run:Thread.java:745], errorMessage:Unrecognized TGetInfoType value: CLI_ODBC_KEYWORDS.), infoValue:null)
        at org.apache.hive.service.rpc.thrift.TGetInfoResp.validate(TGetInfoResp.java:379) ~[hive-exec-3.1.0.3.0.1.0-187.jar:3.1.0.3.0.1.0-187]
        at org.apache.hive.service.rpc.thrift.TCLIService$GetInfo_result.validate(TCLIService.java:5228) ~[hive-exec-3.1.0.3.0.1.0-187.jar:3.1.0.3.0.1.0-187]
        at org.apache.hive.service.rpc.thrift.TCLIService$GetInfo_result$GetInfo_resultStandardScheme.write(TCLIService.java:5285) ~[hive-exec-3.1.0.3.0.1.0-187.jar:3.1.0.3.0.1.0-187]
        at org.apache.hive.service.rpc.thrift.TCLIService$GetInfo_result$GetInfo_resultStandardScheme.write(TCLIService.java:5254) ~[hive-exec-3.1.0.3.0.1.0-187.jar:3.1.0.3.0.1.0-187]
        at org.apache.hive.service.rpc.thrift.TCLIService$GetInfo_result.write(TCLIService.java:5205) ~[hive-exec-3.1.0.3.0.1.0-187.jar:3.1.0.3.0.1.0-187]
        at org.apache.thrift.ProcessFunction.process(ProcessFunction.java:53) ~[hive-exec-3.1.0.3.0.1.0-187.jar:3.1.0.3.0.1.0-187]
        at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:39) ~[hive-exec-3.1.0.3.0.1.0-187.jar:3.1.0.3.0.1.0-187]
        at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:286) [hive-exec-3.1.0.3.0.1.0-187.jar:3.1.0.3.0.1.0-187]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_121]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_121]
        at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]
2022-12-09 10:24:28,600 WARN  org.apache.thrift.transport.TIOStreamTransport               [] - Error closing output stream.
java.net.SocketException: Socket closed
        at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:118) ~[?:1.8.0_121]
        at java.net.SocketOutputStream.write(SocketOutputStream.java:155) ~[?:1.8.0_121]
        at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82) ~[?:1.8.0_121]
        at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140) ~[?:1.8.0_121]
        at java.io.FilterOutputStream.close(FilterOutputStream.java:158) ~[?:1.8.0_121]
        at org.apache.thrift.transport.TIOStreamTransport.close(TIOStreamTransport.java:110) [hive-exec-3.1.0.3.0.1.0-187.jar:3.1.0.3.0.1.0-187]
        at org.apache.thrift.transport.TSocket.close(TSocket.java:235) [hive-exec-3.1.0.3.0.1.0-187.jar:3.1.0.3.0.1.0-187]
        at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:303) [hive-exec-3.1.0.3.0.1.0-187.jar:3.1.0.3.0.1.0-187]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_121]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_121]
        at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]

調(diào)查這個(gè)錯(cuò)誤發(fā)現(xiàn)是Flnk 1.16.0版本的bug轨奄。這個(gè)問題鏈接為FLINK-29839。社區(qū)已經(jīng)在1.16.1版本中解決拒炎。

本博客為作者原創(chuàng)挪拟,歡迎大家參與討論和批評(píng)指正。如需轉(zhuǎn)載請注明出處击你。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末玉组,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子丁侄,更是在濱河造成了極大的恐慌惯雳,老刑警劉巖,帶你破解...
    沈念sama閱讀 219,490評(píng)論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件鸿摇,死亡現(xiàn)場離奇詭異石景,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)拙吉,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,581評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門潮孽,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人筷黔,你說我怎么就攤上這事往史。” “怎么了佛舱?”我有些...
    開封第一講書人閱讀 165,830評(píng)論 0 356
  • 文/不壞的土叔 我叫張陵怠堪,是天一觀的道長。 經(jīng)常有香客問我名眉,道長粟矿,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,957評(píng)論 1 295
  • 正文 為了忘掉前任损拢,我火速辦了婚禮陌粹,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘福压。我一直安慰自己掏秩,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,974評(píng)論 6 393
  • 文/花漫 我一把揭開白布荆姆。 她就那樣靜靜地躺著蒙幻,像睡著了一般。 火紅的嫁衣襯著肌膚如雪胆筒。 梳的紋絲不亂的頭發(fā)上邮破,一...
    開封第一講書人閱讀 51,754評(píng)論 1 307
  • 那天渗柿,我揣著相機(jī)與錄音栓霜,去河邊找鬼。 笑死,一個(gè)胖子當(dāng)著我的面吹牛雹有,可吹牛的內(nèi)容都是我干的潭陪。 我是一名探鬼主播旺遮,決...
    沈念sama閱讀 40,464評(píng)論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼陨界,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了镊辕?” 一聲冷哼從身側(cè)響起油够,我...
    開封第一講書人閱讀 39,357評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎征懈,沒想到半個(gè)月后叠聋,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,847評(píng)論 1 317
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡受裹,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,995評(píng)論 3 338
  • 正文 我和宋清朗相戀三年碌补,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片棉饶。...
    茶點(diǎn)故事閱讀 40,137評(píng)論 1 351
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡厦章,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出照藻,到底是詐尸還是另有隱情袜啃,我是刑警寧澤,帶...
    沈念sama閱讀 35,819評(píng)論 5 346
  • 正文 年R本政府宣布幸缕,位于F島的核電站群发,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏发乔。R本人自食惡果不足惜熟妓,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,482評(píng)論 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望栏尚。 院中可真熱鬧起愈,春花似錦、人聲如沸译仗。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,023評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽纵菌。三九已至阐污,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間咱圆,已是汗流浹背笛辟。 一陣腳步聲響...
    開封第一講書人閱讀 33,149評(píng)論 1 272
  • 我被黑心中介騙來泰國打工功氨, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人隘膘。 一個(gè)月前我還...
    沈念sama閱讀 48,409評(píng)論 3 373
  • 正文 我出身青樓,卻偏偏與公主長得像杠览,于是被迫代替她去往敵國和親弯菊。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,086評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容