Flink 使用介紹相關(guān)文檔目錄
前言
本篇我們配置Zeppelin環(huán)境,實現(xiàn)Zeppelin可視化提交作業(yè)到Flink集群怕轿,操作Hudi表。
Zeppelin Impersonation
官網(wǎng)鏈接:Apache Zeppelin 0.10.1 Documentation: Impersonation
通過Zeppelin Impersonation脱吱,用戶能夠以登錄Zeppelin的身份來運行作業(yè)。
需要的配置:
- Linux中創(chuàng)建了目標(biāo)用戶艳悔,且開啟了zeppelin服務(wù)啟動用戶到目標(biāo)用戶的ssh免密登錄急凰。
- Zeppelin配置了認(rèn)證女仰。例如Shiro的固定用戶猜年,或者是LDAP等。
- 對應(yīng)的interpreter配置為
The interpreter will be instantiated Per User in isolated process
疾忍。并且勾選User Impersonate復(fù)選框乔外。
下面我們以配置Flink interpreter的user impersonation為例,講解整個配置過程一罩。Zeppelin server的啟動用戶為root
杨幼,實際執(zhí)行作業(yè)的用戶為paul
。
創(chuàng)建目標(biāo)用戶并配置免密
我們需要創(chuàng)建paul
用戶聂渊,并配置root
用戶到本機(jī)paul
用戶的免密差购。以root
用戶執(zhí)行:
# 創(chuàng)建paul用戶
useradd paul
# 修改paul用戶的密碼
passwd paul
# 生成密鑰
ssh-keygen
# 配置root到paul的免密
ssh-copy-id paul@localhost # 執(zhí)行后輸入paul用戶密碼
配置完畢后執(zhí)行測試:
ssh paul@localhost
如果成功切換到paul用戶并且沒有彈出密碼輸入提示,說明配置成功汉嗽。
配置目標(biāo)用戶的權(quán)限
新建的目標(biāo)用戶默認(rèn)是沒有訪問HDFS和提交Yarn作業(yè)到隊列權(quán)限的欲逃。在進(jìn)行下一步之前,需要通過Ranger配置paul
用戶的HDFS和Yarn隊列提交作業(yè)權(quán)限饼暑。
Zeppelin配置認(rèn)證
執(zhí)行:
mv $ZEPPELIN_HOME/conf/shiro.ini.template $ZEPPELIN_HOME/conf/shiro.ini
然后編輯shiro.ini
稳析,修改[users]
部分:
[users]
paul = password, admin
這里配置了Zeppelin的paul
用戶洗做,密碼為password
,具有admin
角色彰居。
然后執(zhí)行$ZEPPELIN_HOME/bin/zeppelin-daemon.sh restart
命令诚纸,重啟Zeppelin服務(wù)。
重啟完之后陈惰,可以通過web頁面使用paul
用戶登錄畦徘。
如果要使用LDAP用戶認(rèn)證,請參考:Zeppelin 集成 LDAP(FreeIPA)奴潘。
配置用戶名密碼需要注意的是旧烧,直接在shiro.ini
中配置明文密碼可能會導(dǎo)致敏感內(nèi)容泄露挪圾。為了解決這個問題Shiro支持配置Hash之后的密碼趾浅,例如:
$shiro1$SHA-256$500000$xZupSS3XInqQoMHK4NAosQ==$bD3n16AfQD2zQ7FdiP2uPh8GWL4dkILfTXAB/lXEZ7c=
支持這個特性需要在shiro.ini
的[main]
一節(jié)中添加:
passwordMatcher = org.apache.shiro.authc.credential.PasswordMatcher
iniRealm.credentialsMatcher = $passwordMatcher
隨之而來的問題是我們?nèi)绾紊蛇@一串Hash之后的密碼雌桑“颖瑁可以使用Shiro提供的DefaultPasswordService
驻售。接下來舉個例子笙各。
新建Maven項目引入如下依賴:
<dependency>
<groupId>org.apache.shiro</groupId>
<artifactId>shiro-core</artifactId>
<version>1.13.0</version>
</dependency>
然后編寫如下代碼:
DefaultPasswordService defaultPasswordService = new DefaultPasswordService();
String encryptedPassword = defaultPasswordService.encryptPassword("password");
// 這里就是Shiro可以直接使用的Hash之后的password
System.out.println(encryptedPassword);
// 使用這種方式校驗密碼是否匹配
boolean matched = defaultPasswordService.passwordsMatch("password", encryptedPassword);
System.out.println(matched);
注意:如果運行多次上面的代碼托猩,每次得到的
encryptedPassword
都是不同的青抛,這是因為每次生成的時候會使用隨機(jī)的salt肉微。校驗密碼是否匹配不受隨機(jī)salt的影響匾鸥,因為可以從Hash之后的密碼字符串中解析出salt。
配置Flink Interpreter
點擊Zeppelin web界面右上角菜單碉纳,選擇Interpreter
勿负,然后搜索flink
。然后點擊右側(cè)的edit
按鈕劳曹,修改interpreter配置如下:
兩個下拉框分別選擇Per User
和isolated
奴愉,然后勾選User Impersonate
復(fù)選框。
Zeppelin Flink HUDI整合使用
這里我們使用Flink on yarn模式铁孵,通過Zeppelin編寫SQL提交作業(yè)到Y(jié)arn集群锭硼,以paul用戶執(zhí)行。
配置zeppelin-env.sh
執(zhí)行mv $ZEPPELIN_HOME/conf/zeppelin-env.sh.template $ZEPPELIN_HOME/conf/zeppelin-env.sh
蜕劝,然后編輯這個文件檀头。修改如下內(nèi)容:
export JAVA_HOME=/opt/zy/jdk8u342-b07 # Zeppelin對使用的JDK版本有特殊要求,需要專門配置
export HADOOP_CONF_DIR=/usr/hdp/3.0.1.0-187/hadoop/conf # core-site.xml等配置文件所在的目錄
export USE_HADOOP=true # Whether include hadoop jars into zeppelin server process. (true or false)
export FLINK_CONF_DIR=/opt/zy/flink-1.13.2/conf # 必須配置岖沛,否則會遇到問題
注意暑始,使用這種方式必須要配置FLINK_CONF_DIR
環(huán)境變量。否則會出現(xiàn)如下錯誤婴削,找不到配置文件目錄廊镜。
WARN [2022-08-02 23:25:45,537] ({SchedulerFactory43} NotebookServer.java[onStatusChange]:1984) - Job paragraph_1580998080340_1531975932 is finished, status: ERROR, exception: null, result: %text org.apache.zeppelin.interpreter.InterpreterException: org.apache.zeppelin.interpreter.InterpreterException: Fail to open FlinkInterpreter
at org.apache.zeppelin.interpreter.LazyOpenInterpreter.open(LazyOpenInterpreter.java:76)
at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:844)
at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:752)
at org.apache.zeppelin.scheduler.Job.run(Job.java:172)
at org.apache.zeppelin.scheduler.AbstractScheduler.runJob(AbstractScheduler.java:132)
at org.apache.zeppelin.scheduler.FIFOScheduler.lambda$runJobInScheduler$0(FIFOScheduler.java:42)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.zeppelin.interpreter.InterpreterException: Fail to open FlinkInterpreter
at org.apache.zeppelin.flink.FlinkInterpreter.open(FlinkInterpreter.java:80)
at org.apache.zeppelin.interpreter.LazyOpenInterpreter.open(LazyOpenInterpreter.java:70)
... 8 more
Caused by: org.apache.flink.configuration.IllegalConfigurationException: The given configuration directory name '' (/home/paul) does not describe an existing directory.
at org.apache.flink.configuration.GlobalConfiguration.loadConfiguration(GlobalConfiguration.java:119)
at org.apache.flink.configuration.GlobalConfiguration.loadConfiguration(GlobalConfiguration.java:94)
at org.apache.zeppelin.flink.FlinkScalaInterpreter.initFlinkConfig(FlinkScalaInterpreter.scala:216)
at org.apache.zeppelin.flink.FlinkScalaInterpreter.open(FlinkScalaInterpreter.scala:122)
at org.apache.zeppelin.flink.FlinkInterpreter.open(FlinkInterpreter.java:75)
配置修改后需要重啟Zeppelin服務(wù)。
配置Flink interpreter
按照上面的章節(jié)打開Flink inteterpreter配置頁面馆蠕。有以下配置項需要特別注意:
- FLINK_HOME: Flink安裝目錄期升。
- HADOOP_CONF_DIR: Hadoop配置文件所在目錄惊奇,例如
/usr/hdp/3.0.1.0-187/hadoop/conf
。 - HIVE_CONF_DIR: Hive配置文件所在目錄播赁,例如
/usr/hdp/3.0.1.0-187/hive/conf
颂郎。 - flink.execution.mode: Flink執(zhí)行模式。采用yarn模式填寫
yarn
容为。 - flink.execution.jars: Flink用戶作業(yè)依賴的jar乓序,多個jar需要使用逗號分隔(注意不是冒號)。例如connector-kafka, connector-hive, HUDI依賴坎背,table依賴等替劈。這個配置項很重要,否則執(zhí)行作業(yè)的時候會出現(xiàn)class not found等眾多問題得滤。
- zeppelin.flink.enableHive: 是否啟用Flink的Hive支持陨献。我們需要使用Hive metastore,所以配置為
true
懂更。 - zeppelin.flink.hive.version: 配套使用的Hive版本眨业,例如
3.1.0
。
特別注意:Flink Interpreter執(zhí)行的時候如果提示缺少jar包沮协,需要修改Flink interpreter的flink.execution.jars
為類似如下配置:
/opt/flink-1.13.2/lib/flink-sql-connector-hive-3.1.2_2.11-1.13.2.jar,/opt/flink-1.13.2/lib/flink-connector-files-1.13.2.jar,/opt/flink-1.13.2/lib/flink-table-blink_2.11-1.13.2.jar,/opt/flink-1.13.2/lib/hadoop-mapreduce-client-core-3.1.1.3.0.1.0-187.jar,/opt/flink-1.13.2/lib/hudi-flink1.13-bundle_2.11-0.11.1.jar
也就是說除了Flink自身的jar外龄捡,作業(yè)依賴的額外jar都必須通過flink.execution.jars
配置項引入。僅僅將這些jar放置于Flink的lib
目錄下是不行的慷暂。
運行Flink SQL查詢Hudi表數(shù)據(jù)
在Zeppelin中創(chuàng)建一個新的note聘殖,default interperter選擇Flink。然后編寫如下SQL:
%flink.ssql
show catalogs;
use catalog hive;
show tables;
如果能查到Hive catalog中的表行瑞,說明Flink interperter的Hive配置是正確的奸腺。
然后我們嘗試查詢一個已存在的Hudi表,例如hudi_cow
蘑辑。編寫如下SQL:
%flink.ssql
use catalog hive;
select * from hudi_cow;
如果上述配置過程正確洋机,這里可以查到數(shù)據(jù)坠宴。
同樣洋魂,create和insert等語句也是可以執(zhí)行成功的,這里就不再貼出截圖喜鼓。
本博客為作者原創(chuàng)副砍,歡迎大家參與討論和批評指正。如需轉(zhuǎn)載請注明出處庄岖。