背景
linux下使用kettle(pentaho data integration)將mysql數(shù)據(jù)巾遭,通過 Parquet Output 導(dǎo)入CDH 6.2集群hive的parquet表中。
解決了直接用 Parquet Output 導(dǎo)入hive parquet 表中字段不匹配的問題吼和。
步驟:
1. 下載 pdi 8.3骑素,解壓后賦予相應(yīng)的權(quán)限
unzip pdi-ce-8.3.0.0-371.zip
cd data-integration/
chmod +x -R *.sh
在vnc中,data-integration目錄執(zhí)行 ./spoon.sh
末捣,打開操作界面创橄。
2. 新建轉(zhuǎn)換,選擇“主對(duì)象樹”卒茬,配置DB連接(mysql連接)
填寫對(duì)應(yīng)的mysql連接信息咖熟。點(diǎn)擊測試顯示正確連接柳畔。
如果此處報(bào)錯(cuò)缺少msyql驅(qū)動(dòng),則把 mssql-jdbc-7.0.0.jre10.jar
拷貝到 data-integration/lib 下确沸,注意 .jar 權(quán)限俘陷,重啟 pdi 有效。
3. 編輯轉(zhuǎn)換
選擇“核心對(duì)象”桨菜,從“輸入”中拖出“表輸入”,雙擊打開
選擇上一步創(chuàng)建的mysql連接泻红,并填寫sql語句霞掺。可以點(diǎn)擊預(yù)覽查看數(shù)據(jù)缠劝。
4. 連接到 hive 輸出
4.1 更新pdi中CDH集群的配置文件
從 "Cloudera Manager -- HDFS -- 下載客戶端配置" 下載 core-site.xml
和 hdfs-site.xml
更新到 pdi 目錄下的 data-integration\plugins\pentaho-big-data-plugin\hadoop-configurations\cdh61
打開 core-site.xml
和 hdfs-site.xml
惨恭,確認(rèn)所有的 url 都是對(duì)應(yīng)的 hostname 而不是 localhost
4.2 連接CDH集群
在 “主對(duì)象樹 -- Hadoop clusters” 下新建 cluster矿卑,填寫對(duì)應(yīng)信息
注意:此處所有 hostname 不能寫 localhost
,要改為對(duì)應(yīng)的 hostname
點(diǎn)擊測試轻黑,查看連接
可以看到 HDFS 已經(jīng)成功連接琴昆。
4.3 新建 Parquet Output
在 “核心對(duì)象 -- Big Data” 下拖出 Parquet Output,按住 shift 單擊 "mysql輸入"抖拦,拖出連接線連到 "Parquet Output"舷暮,雙擊打開
點(diǎn)擊“瀏覽”下面,選擇之前配置好的 cluster,選擇目標(biāo)地址沥割。
這里的地址是到文件名的,但不包含后綴帜讲,后續(xù)導(dǎo)入到hive中時(shí)會(huì)自動(dòng)添加后綴名
所以需要在選好的地址后手動(dòng)在地址后面添加 /<filename>
注意:這里有個(gè)BUG椒拗,就是不能將數(shù)據(jù)直接導(dǎo)在已有的表中获黔,會(huì)報(bào)字段類型不匹配的問題玩郊。因?yàn)閔ive中已有表的字段類型(比如 INT)和parquet表的字段類型(比如 Int64译红,見下圖)不一致。
直接導(dǎo)入表侦厚,刷新數(shù)據(jù)后查詢的報(bào)錯(cuò)
[slave01:21000] test> select * from log_operation;
Query: select * from log_operation
Query submitted at: 2019-11-25 15:01:38 (Coordinator: http://slave01:25000)
Query progress can be monitored at: http://slave01:25000/query_plan?query_id=af4f332a5f30f746:7f0a5b4900000000
ERROR: File 'hdfs://master:8020/user/hive/warehouse/test.db/log_operation/f.parquet' has an incompatible Parquet schema for column 'test.log_operation.instance_id'. Column type: BIGINT, Parquet schema:
optional byte_array instance_id [i:1 d:1 r:0]
對(duì)于這個(gè)問題,筆者正巧因工作原因咨詢過Pentaho負(fù)責(zé)開發(fā)pdi的工作人員诗宣,得到答案可通過將源表里的數(shù)據(jù)都轉(zhuǎn)為String暫時(shí)克服想诅。不過筆者通過另一種方式繞過了這個(gè)BUG,下面介紹這個(gè)曲線救國方法篮灼。
筆者通過先將表導(dǎo)入一個(gè)臨時(shí)的文件夾內(nèi)(如上一步選路徑所示徘禁,選數(shù)據(jù)庫下的臨時(shí)文件夾xxx.db/tmp
。實(shí)際是選到數(shù)據(jù)庫送朱,手動(dòng)添加后面的路徑),然后通過sql(impala sql 或 hive sql)根據(jù)導(dǎo)入的文件創(chuàng)建表炮沐,然后再把/tmp
下的文件加載過來回怜,就完成了,順便繞過了上面所說字段不一致的BUG。
下面繼續(xù)
選好地址后专控,回到 Parquet Output 界面,點(diǎn)擊Get Fields
自動(dòng)獲取字段(這里可以看到parquet的數(shù)據(jù)格式)赢底。然后確定。
(定時(shí)任務(wù)的話幸冻,可以通過Option那個(gè)tab下勾選“文件名包含時(shí)間”等選項(xiàng)避免每次導(dǎo)入的文件因重名而覆蓋)
5. 新建 “執(zhí)行sql語句”
由于筆者用的是impala洽损,所以本步驟是在講連接到impala執(zhí)行sql。使用hive的話相應(yīng)步驟改為hive就可以流码,是一樣的延刘。
5.1 連接impala
將 impala jdbc 驅(qū)動(dòng) ImpalaJDBC41.jar
拷貝到 data-integration/lib
和 data-integration/plugins/pentaho-big-data-plugin/lib
和 data-integration/plugins/pentaho-big-data-plugin/hadoop-configurations/cdh61/lib
下。注意權(quán)限碘赖,重啟有效。
按照步驟2播掷,選擇“主對(duì)象樹”劫哼,配置DB連接,連接到 impala
5.2 新建 “執(zhí)行sql腳本”
拖出“核心對(duì)象” -- “腳本” -- “執(zhí)行sql腳本”
雙擊打開眯亦,填入sql腳本
create table log_operation
like parquetfile '/user/hive/warehouse/test.db/tmp/init.parquet'
stored as parquet
location '/user/hive/warehouse/test.db/log_operation';
load data inpath '/user/hive/warehouse/test.db/tmp/init.parquet'
into table log_operation;
refresh log_operation;
三個(gè)命令分別是:
- 按照導(dǎo)入文件(
test.db/tmp/init.parquet
)的格式創(chuàng)建表 - 將文件導(dǎo)入到創(chuàng)建的表中
- 刷新表
這樣建的表和parquet表的字段格式是匹配的般码,即使后續(xù)繼續(xù)往表中直接導(dǎo)入增量數(shù)據(jù)(同樣通過 parquet output)板祝,也不會(huì)出現(xiàn)字段格式不匹配的錯(cuò)誤。
5.3 加入“停止等待”
pdi 轉(zhuǎn)換的 執(zhí)行sql語句 雖然連接在 Parquet Output 之后券时,但確是并行執(zhí)行的。因此需要加入停止等待捌袜,等待 Parquet Output 執(zhí)行完后炸枣,再執(zhí)行sql弄唧。
至此霍衫,可以直接點(diǎn)擊轉(zhuǎn)換下的運(yùn)行圖標(biāo)運(yùn)行。
也可以打包成作業(yè)澄干,方便調(diào)度峰髓。
6. 新建作業(yè)
保存之前的轉(zhuǎn)換。新建作業(yè)携兵。
從“核心對(duì)象”拖出 start 和 轉(zhuǎn)換,雙擊轉(zhuǎn)換配置静檬,添加剛剛保存的轉(zhuǎn)換并级。確定。
執(zhí)行作業(yè)嘲碧。
作為job望抽,可以在 Start
里設(shè)置調(diào)度時(shí)間履婉,然后運(yùn)行就可以了。