文章作者:王亮 , 版權(quán)歸原作者所有域携,未經(jīng)作者同意,請勿轉(zhuǎn)載
文章來源:騰訊云技術(shù)社區(qū)——騰云閣:https://www.qcloud.com/community
原文鏈接:https://www.qcloud.com/community/article/220
Oracle里存儲的結(jié)構(gòu)化數(shù)據(jù)導(dǎo)出到Hadoop體系做離線計(jì)算是一種常見數(shù)據(jù)處置手段敷存。近期有場景需要做Oracle到Hadoop體系的實(shí)時(shí)導(dǎo)入腔剂,這里以此案例做以介紹。
Oracle作為商業(yè)化的數(shù)據(jù)庫解決方案靡砌,自發(fā)性的獲取數(shù)據(jù)庫事務(wù)日志等比較困難,故選擇官方提供的同步工具OGG(Oracle GoldenGate)來解決珊楼。
安裝與基本配置
環(huán)境說明
軟件配置
角色 | 數(shù)據(jù)存儲服務(wù)及版本 | OGG版本 | IP |
---|---|---|---|
源服務(wù)器 | OracleRelease11.2.0.1 | Oracle GoldenGate 11.2.1.0 for Oracle on Linux x86-64 | 10.0.0.25 |
目標(biāo)服務(wù)器 | Hadoop 2.7.2 | Oracle GoldenGate for Big Data 12.2.0.1 on Linux x86-64 | 10.0.0.2 |
以上源服務(wù)器上OGG安裝在Oracle用戶下通殃,目標(biāo)服務(wù)器上OGG安裝在root用戶下。
注意
Oracle導(dǎo)出到異構(gòu)的存儲系統(tǒng)厕宗,如MySQL画舌,DB2,PG等以及對應(yīng)的不同平臺已慢,如AIX曲聂,Windows,Linux等官方都有提供對應(yīng)的Oracle GoldenGate版本佑惠,可在這里或者在舊版本查詢下載安裝朋腋。
Oracle源端基礎(chǔ)配置
將下載到的對應(yīng)OGG版本放在方便的位置并解壓,本示例Oracle源端最終的解壓目錄為/u01/gg膜楷。
- 配置環(huán)境變量
這里的環(huán)境變量主要是對執(zhí)行OGG的用戶添加OGG相關(guān)的環(huán)境變量旭咽,本示例為Oracle用戶添加的環(huán)境變量如下:(/home/oracle/.bash_profile
文件)
export OGG_HOME=/u01/gg/
export LD_LIBRARY_PATH=$ORACLE_HOME/lib:$OGG_HOME:/lib:/usr/lib
export CLASSPATH=$ORACLE_HOME/jdk/jre:$ORACLE_HOME/jlib:$ORACLE_HOME/rdbms/jlib
- Oracle打開歸檔模式
使用如下命令查看當(dāng)前是否為歸檔模式(archive)
SQL> archive log list
Database log mode Archive Mode
Automatic archival Enabled
Archive destination /u01/arch_log
Oldest online log sequence 6
Next log sequence to archive 8
Current log sequence 8
如非以上狀態(tài),手動調(diào)整即可
SQL> conn / as sysdba(以DBA身份連接數(shù)據(jù)庫)
SQL> shutdown immediate(立即關(guān)閉數(shù)據(jù)庫)
SQL> startup mount(啟動實(shí)例并加載數(shù)據(jù)庫赌厅,但不打開)
SQL> alter database archivelog(更改數(shù)據(jù)庫為歸檔模式)
SQL> alter database open(打開數(shù)據(jù)庫)
SQL> alter system archive log start(啟用自動歸檔)
- Oracle打開日志相關(guān)
OGG基于輔助日志等進(jìn)行實(shí)時(shí)傳輸穷绵,故需要打開相關(guān)日志確保可獲取事務(wù)內(nèi)容特愿。通過一下命令查看當(dāng)前狀態(tài):
SQL> select force_logging, supplemental_log_data_min from v$database;
FOR SUPPLEME--- --------
YES YES
如果以上查詢結(jié)果非YES仲墨,可通過以下命令修改狀態(tài):
SQL> alter database force logging;
SQL> alter database add supplemental log data;
- Oracle創(chuàng)建復(fù)制用戶
為了使Oracle里用戶的復(fù)制權(quán)限更加單純勾缭,故專門創(chuàng)建復(fù)制用戶,并賦予dba權(quán)限
SQL> create tablespaceoggtbsdatafile '/u01/app/oracle/oradata/orcl/oggtbs01.dbf' size 1000M autoextend on;
SQL> create user ggs identified by ggs default tablespaceoggtbs;
User created.
SQL> grant dba to ggs;
Grant succeeded.
最終這個(gè)ggs帳號的權(quán)限如下所示:
SQL> select * from dba_sys_privs where GRANTEE='GGS';
GRANTEE PRIVILEGE ADM
GGS DROP ANY DIRECTORY NO
GGS ALTER ANY TABLE NO
GGS ALTER SESSION NO
GGS SELECT ANY DICTIONARY NO
GGS CREATE ANY DIRECTORY NO
GGS RESTRICTED SESSION NO
GGS FLASHBACK ANY TABLE NO
GGS UPDATE ANY TABLE NO
GGS DELETE ANY TABLE NO
GGS CREATE TABLE NO
GGS INSERT ANY TABLE NO
GRANTEE PRIVILEGE ADM
GGS UNLIMITED TABLESPACE NO
GGS CREATE SESSION NO
GGS SELECT ANY TABLE NO
- OGG初始化
進(jìn)入OGG的主目錄執(zhí)行./ggsci目养,進(jìn)入OGG命令行
[oracle@VM_0_25_centos gg]$ ./ggsci
Oracle GoldenGate Command Interpreter for Oracle
Version 11.2.1.0.3 14400833 OGGCORE_11.2.1.0.3_PLATFORMS_120823.1258_FBO
Linux, x64, 64bit (optimized), Oracle 11g on Aug 23 2012 20:20:21
Copyright (C) 1995, 2012, Oracle and/or its affiliates. All rights reserved.
GGSCI (VM_0_25_centos) 1>
執(zhí)行create subdirs進(jìn)行目錄創(chuàng)建
GGSCI (VM_0_25_centos) 4> create subdirs
Creating subdirectories under current directory /u01/gg
Parameter files /u01/gg/dirprm: already exists
Report files /u01/gg/dirrpt: already exists
Checkpoint files /u01/gg/dirchk: already exists
Process status files /u01/gg/dirpcs: already exists
SQL script files /u01/gg/dirsql: already exists
Database definitions files /u01/gg/dirdef: already exists
Extract data files /u01/gg/dirdat: already exists
Temporary files /u01/gg/dirtmp: already exists
Stdout files /u01/gg/dirout: already exists
- Oracle創(chuàng)建模擬復(fù)制庫表
模擬建一個(gè)用戶叫tcloud俩由,密碼tcloud,同時(shí)基于這個(gè)用戶建一張表混稽,叫t_ogg
采驻。
SQL> create user tcloud identified by tcloud default tablespace users;
User created.
SQL> grant dba to tcloud;
Grant succeeded.
SQL> conn tcloud/tcloud;
Connected.
SQL> create table t_ogg(id int ,text_name varchar(20),primary key(id));
Table created.
目標(biāo)端基礎(chǔ)配置
將下載到的對應(yīng)OGG版本放在方便的位置并解壓,本示例Oracle目標(biāo)端最終的解壓目錄為/data/gg
匈勋。
- 配置環(huán)境變量
這里需要用到HDFS相關(guān)的庫礼旅,故需要配置java環(huán)境變量以及OGG相關(guān),并引入HDFS的相關(guān)庫文件洽洁,參考配置如下:
export JAVA_HOME=/usr/java/jdk1.7.0_75/
export LD_LIBRARY_PATH=/usr/java/jdk1.7.0_75/jre/lib/amd64:/usr/java/jdk1.7.0_75/jre/lib/amd64/server:/usr/java/jdk1.7.0_75/jre/lib/amd64/libjsig.so:/usr/java/jdk1.7.0_75/jre/lib/amd64/server/libjvm.so:$OGG_HOME:/lib
export OGG_HOME=/data/gg
- OGG初始化
目標(biāo)端的OGG初始化和源端類似進(jìn)入OGG的主目錄執(zhí)行./ggsci
痘系,進(jìn)入OGG命令行
GGSCI (10.0.0.2) 2> create subdirs
Creating subdirectories under current directory /data/gg
Parameter files /data/gg/dirprm: already exists
Report files /data/gg/dirrpt: already exists
Checkpoint files /data/gg/dirchk: already exists
Process status files /data/gg/dirpcs: already exists
SQL script files /data/gg/dirsql: already exists
Database definitions files /data/gg/dirdef: already exists
Extract data files /data/gg/dirdat: already exists
Temporary files /data/gg/dirtmp: already exists
Credential store files /data/gg/dircrd: already exists
Masterkey wallet files /data/gg/dirwlt: already exists
Dump files /data/gg/dirdmp: already exists
Oracle源配置
Oracle實(shí)時(shí)傳輸?shù)紿adoop集群(HDFS,Hive饿自,Kafka等)的基本原理如圖:
根據(jù)如上原理汰翠,配置大概分為如下步驟:源端目標(biāo)端配置ogg管理器(mgr);源端配置extract進(jìn)程進(jìn)行Oracle日志抓日汛啤复唤;源端配置pump進(jìn)程傳輸抓取內(nèi)容到目標(biāo)端;目標(biāo)端配置replicate進(jìn)程復(fù)制日志到Hadoop集群或者復(fù)制到用戶自定義的解析器將最終結(jié)果落入到Hadoop集群烛卧。
配置全局變量
在源端服務(wù)器OGG主目錄下佛纫,執(zhí)行./ggsci
到OGG命令行下,執(zhí)行如下命令:
GGSCI (VM_0_25_centos) 1> dblogin userid ggs password ggs
Successfully logged into database.
GGSCI (VM_0_25_centos) 3> view params ./globals
ggschema ggs
```
其中`./globals`變量沒有的話可以用`edit params ./globals`來編輯添加即可(編輯器默認(rèn)使用的vim)
#### 配置管理器mgr
在OGG命令行下執(zhí)行如下命令:
```
GGSCI (VM_0_25_centos) 4> edit param mgr
PORT 7809
DYNAMICPORTLIST 7810-7909
AUTORESTART EXTRACT *,RETRIES 5,WAITMINUTES 3
PURGEOLDEXTRACTS ./dirdat/*,usecheckpoints, minkeepdays 3
```
說明:PORT即mgr的默認(rèn)監(jiān)聽端口总放;DYNAMICPORTLIST動態(tài)端口列表呈宇,當(dāng)指定的mgr端口不可用時(shí),會在這個(gè)端口列表中選擇一個(gè)局雄,最大指定范圍為256個(gè)甥啄;AUTORESTART重啟參數(shù)設(shè)置表示重啟所有EXTRACT進(jìn)程,最多5次炬搭,每次間隔3分鐘蜈漓;PURGEOLDEXTRACTS即TRAIL文件的定期清理
在命令行下執(zhí)行start mgr即可啟動管理進(jìn)程,通過info mgr可查看mgr狀態(tài)
```
GGSCI (VM_0_25_centos) 5> info mgr
Manager is running (IP port VM_0_25_centos.7809).
```
#### 添加復(fù)制表
在OGG命令行下執(zhí)行添加需要復(fù)制的表的操作尚蝌,如下:
```
GGSCI (VM_0_25_centos) 7> add trandata tcloud.t_ogg
Logging of supplemental redo data enabled for table TCLOUD.T_OGG.
GGSCI (VM_0_25_centos) 8> info trandata tcloud.t_ogg
Logging of supplemental redo log data is enabled for table TCLOUD.T_OGG.
Columns supplementally logged for table TCLOUD.T_OGG: ID.
```
#### 配置extract進(jìn)程
配置extract進(jìn)程OGG命令行下執(zhí)行如下命令:
```
GGSCI (VM_0_25_centos) 10> edit params ext2hd
extract ext2hd
dynamicresolution
SETENV (ORACLE_SID = "orcl")
SETENV (NLS_LANG = "american_america.AL32UTF8")
userid ggs,password ggs
exttrail /u01/gg/dirdat/tc
table tcloud.t_ogg;
```
說明:第一行指定extract進(jìn)程名稱迎变;dynamicresolution動態(tài)解析;SETENV設(shè)置環(huán)境變量飘言,這里分別設(shè)置了Oracle數(shù)據(jù)庫以及字符集衣形;userid ggs,password ggs即OGG連接Oracle數(shù)據(jù)庫的帳號密碼,這里使用2.3.4中特意創(chuàng)建的復(fù)制帳號;exttrail定義trail文件的保存位置以及文件名谆吴,注意這里文件名只能是2個(gè)字母倒源,其余部分OGG會補(bǔ)齊;table即復(fù)制表的表明句狼,支持`*`通配笋熬,必須以;結(jié)尾
接下來在OGG命令行執(zhí)行如下命令添加extract進(jìn)程:
```
GGSCI (VM_0_25_centos) 11> add extract ext2hd,tranlog,begin now
EXTRACT added.```
最后添加trail文件的定義與extract進(jìn)程綁定:
```
GGSCI (VM_0_25_centos) 12> add exttrail /u01/gg/dirdat/tc,extract ext2hd
EXTTRAIL added```
可在OGG命令行下通過info命令查看狀態(tài):
```
GGSCI (VM_0_25_centos) 14> info ext2hd
EXTRACT EXT2HD Initialized 2016-11-09 15:37 Status STOPPED
Checkpoint Lag 00:00:00 (updated 00:02:32 ago)
Log Read Checkpoint Oracle Redo Logs
2016-11-09 15:37:14 Seqno 0, RBA 0
SCN 0.0 (0)
```
#### 配置pump進(jìn)程
pump進(jìn)程本質(zhì)上來說也是一個(gè)extract,只不過他的作用僅僅是把trail文件傳遞到目標(biāo)端腻菇,配置過程和extract進(jìn)程類似胳螟,只是邏輯上稱之為pump進(jìn)程
在OGG命令行下執(zhí)行:
```
GGSCI (VM_0_25_centos) 16> edit params push2hd
extract push2hd
passthru
dynamicresolution
userid ggs,password ggs
rmthost 10.0.0.2 mgrport 7809
rmttrail /data/gg/dirdat/tc
table tcloud.t_ogg;
```
說明:第一行指定extract進(jìn)程名稱;passthru即禁止OGG與Oracle交互筹吐,我們這里使用pump邏輯傳輸糖耸,故禁止即可;dynamicresolution動態(tài)解析丘薛;userid ggs,password ggs即OGG連接Oracle數(shù)據(jù)庫的帳號密碼嘉竟,這里使用2.3.4中特意創(chuàng)建的復(fù)制帳號;rmthost和mgrhost即目標(biāo)端OGG的mgr服務(wù)的地址以及監(jiān)聽端口洋侨;rmttrail即目標(biāo)端trail文件存儲位置以及名稱
分別將本地trail文件和目標(biāo)端的trail文件綁定到extract進(jìn)程:
```
GGSCI (VM_0_25_centos) 17> add extract push2hd,exttrailsource /u01/gg/dirdat/tc
EXTRACT added.
GGSCI (VM_0_25_centos) 18> add rmttrail /data/gg/dirdat/tc,extract push2hd
RMTTRAIL added.
```
同樣可以在OGG命令行下使用info查看進(jìn)程狀態(tài):
```
GGSCI (VM_0_25_centos) 19> info push2hd
EXTRACT PUSH2HD Initialized 2016-11-09 15:52 Status STOPPED
Checkpoint Lag 00:00:00 (updated 00:01:04 ago)
Log Read Checkpoint File /u01/gg/dirdat/tc000000
First Record RBA 0
```
#### 配置define文件
Oracle與MySQL舍扰,Hadoop集群(HDFS,Hive希坚,kafka等)等之間數(shù)據(jù)傳輸可以定義為異構(gòu)數(shù)據(jù)類型的傳輸边苹,故需要定義表之間的關(guān)系映射,在OGG命令行執(zhí)行:
```
GGSCI (VM_0_25_centos) 20> edit params tcloud
defsfile /u01/gg/dirdef/tcloud.t_ogg
userid ggs,password ggs
table tcloud.t_ogg;
```
在OGG主目錄下執(zhí)行:
`./defgen paramfile dirprm/tcloud.prm`
完成之后會生成這樣的文件/u01/gg/dirdef/tcloud.t_ogg裁僧,將這個(gè)文件拷貝到目標(biāo)端的OGG主目錄下的dirdef目錄即可勾给。
## 目標(biāo)端的配置
#### 創(chuàng)建目標(biāo)表(目錄)
這里主要是當(dāng)目標(biāo)端為HDFS目錄或者Hive表或者M(jìn)ySQL數(shù)據(jù)庫時(shí)需要手動先在目標(biāo)端創(chuàng)建好目錄或者表,創(chuàng)建方法都類似锅知,這里我們模擬實(shí)時(shí)傳入到HDFS目錄,故手動創(chuàng)建一個(gè)接收目錄即可
`hadoop –fs mkdir /gg/replication/hive/`
#### 配置管理器mgr
目標(biāo)端的OGG管理器(mgr)和源端的配置類似脓钾,在OGG命令行下執(zhí)行:
```
GGSCI (10.0.0.2) 2> edit params mgr
PORT 7809
DYNAMICPORTLIST 7810-7909
AUTORESTART EXTRACT *,RETRIES 5,WAITMINUTES 3
PURGEOLDEXTRACTS ./dirdat/*,usecheckpoints, minkeepdays 3
```
#### 配置checkpoint
checkpoint即復(fù)制可追溯的一個(gè)偏移量記錄售睹,在全局配置里添加checkpoint表即可
```
GGSCI (10.0.0.2) 5> edit params ./GLOBALS
CHECKPOINTTABLE tcloud.checkpoint
```
保存即可
#### 配置replicate進(jìn)程
在OGG的命令行下執(zhí)行:
```
GGSCI (10.0.0.2) 8> edit params r2hdfs
REPLICAT r2hdfs
sourcedefs /data/gg/dirdef/tcloud.t_ogg
TARGETDB LIBFILE libggjava.so SET property=dirprm/hdfs.props
REPORTCOUNT EVERY 1 MINUTES, RATE
GROUPTRANSOPS 10000
MAP tcloud.t_ogg, TARGET tcloud.t_ogg;
```
說明:REPLICATE r2hdfs定義rep進(jìn)程名稱;sourcedefs即在3.6中在源服務(wù)器上做的表映射文件可训;TARGETDB LIBFILE即定義HDFS一些適配性的庫文件以及配置文件昌妹,配置文件位于OGG主目錄下的`dirprm/hdfs.props`;REPORTCOUNT即復(fù)制任務(wù)的報(bào)告生成頻率握截;GROUPTRANSOPS為以事務(wù)傳輸時(shí)飞崖,事務(wù)合并的單位,減少IO操作谨胞;MAP即源端與目標(biāo)端的映射關(guān)系
其中`property=dirprm/hdfs.props`的配置中固歪,最主要的幾項(xiàng)配置及注釋如下:
```
gg.handlerlist=hdfs //OGG for Big Data中handle類型
gg.handler.hdfs.type=hdfs //OGG for Big Data中HDFS目標(biāo)
gg.handler.hdfs.rootFilePath=/gg/replication/hive/ //OGG for Big Data中HDFS存儲主目錄
gg.handler.hdfs.mode=op //OGG for Big Data中傳輸模式,即op為一次SQL傳輸一次,tx為一次事務(wù)傳輸一次
gg.handler.hdfs.format=delimitedtext //OGG for Big Data中文件傳輸格式
gg.classpath=/usr/hdp/2.2.0.0-2041/hadoop/share/hadoop/common/*:/usr/hdp/2.2.0.0-2041/hadoop/share/hadoop/common/lib/*:/usr/hdp/2.2.0.0-2041/hadoop/share/hadoop/hdfs/*:/usr/hdp/2.2.0.0-2041/hadoop/etc/hadoop/:/data/gg/:/data/gg/lib/*:/usr/hdp/2.2.0.0-2041/hadoop/client/* //OGG for Big Data中使用到的HDFS庫的定義
```
具體的OGG for Big Data支持參數(shù)以及定義可參考[地址](http://docs.oracle.com/goldengate/bd1221/gg-bd/GADBD/GUID-85A82B2E-CD51-463A-8674-3D686C3C0EC0.htm#GADBD376)
最后在OGG的命令行下執(zhí)行:
```
GGSCI (10.0.0.2) 9> add replicat r2hdfs exttrail /data/gg/dirdat/tc,checkpointtable tcloud.checkpointtab
REPLICAT added.
```
將文件與復(fù)制進(jìn)程綁定即可
## 測試
#### 啟動進(jìn)程
在源端和目標(biāo)端的OGG命令行下使用start [進(jìn)程名]的形式啟動所有進(jìn)程牢裳。
啟動順序按照源mgr——目標(biāo)mgr——源extract——源pump——目標(biāo)replicate來完成逢防。
#### 檢查進(jìn)程狀態(tài)
以上啟動完成之后,可在源端與目標(biāo)端的OGG命令行下使用info [進(jìn)程名]來查看所有進(jìn)程狀態(tài)蒲讯,如下:
源端:
```
GGSCI (VM_0_25_centos) 7> info mgr
Manager is running (IP port VM_0_25_centos.7809).
GGSCI (VM_0_25_centos) 9> info ext2hd
EXTRACT EXT2HD Last Started 2016-11-09 16:05 Status RUNNING
Checkpoint Lag 00:00:00 (updated 00:00:09 ago)
Log Read Checkpoint Oracle Redo Logs
2016-11-09 16:45:51 Seqno 8, RBA 132864000
SCN 0.1452333 (1452333)
GGSCI (VM_0_25_centos) 10> info push2hd
EXTRACT PUSH2HD Last Started 2016-11-09 16:05 Status RUNNING
Checkpoint Lag 00:00:00 (updated 00:00:01 ago)
Log Read Checkpoint File /u01/gg/dirdat/tc000000
First Record RBA 1043
```
目標(biāo)端:
```
GGSCI (10.0.0.2) 13> info mgr
Manager is running (IP port 10.0.0.2.7809, Process ID 8242).
GGSCI (10.0.0.2) 14> info r2hdfs
REPLICAT R2HDFS Last Started 2016-11-09 16:45 Status RUNNING
Checkpoint Lag 00:00:00 (updated 00:00:02 ago)
Process ID 4733
Log Read Checkpoint File /data/gg/dirdat/tc000000
First Record RBA 0
```
所有的狀態(tài)均是RUNNING即可忘朝。(當(dāng)然也可以使用info all來查看所有進(jìn)程狀態(tài))
#### 測試同步更新效果
測試方法比較簡單,直接在源端的數(shù)據(jù)表中insert判帮,update局嘁,delete操作即可。由于Oracle到Hadoop集群的同步是異構(gòu)形式晦墙,目前尚不支持truncate操作悦昵。
源端進(jìn)行insert操作
```
SQL> conn tcloud/tcloud
Connected.
SQL> select * from t_ogg;
no rows selected
SQL> desc t_ogg;
Name Null? Type
----------------------------------------- -------- ----------------------------
ID NOT NULL NUMBER(38)
TEXT_NAME VARCHAR2(20)
SQL> insert into t_ogg values(1,'test');
1 row created.
SQL> commit;
Commit complete.
```
查看源端trail文件狀態(tài)
```
[oracle@VM_0_25_centos dirdat]$ ls -l /u01/gg/dirdat/tc*
-rw-rw-rw- 1 oracle oinstall 1180 Nov 9 17:05 /u01/gg/dirdat/tc000000
```
查看目標(biāo)端trail文件狀態(tài)
```
[root@10 dirdat]# ls -l /data/gg/dirdat/tc*
-rw-r----- 1 root root 1217 Nov 9 17:05 /data/gg/dirdat/tc000000
```
查看HDFS中是否有寫入
```
hadoop fs -ls /gg/replication/hive/tcloud.t_ogg
-rw-rw-r-- 3 root hdfs 110 2016-11-09 17:05
/gg/replication/hive/tcloud.t_ogg/tcloud.t_ogg_2016-11-09_17-05-30.514.txt
```
注意:從寫入到HDFS的文件內(nèi)容看,文件的格式如下:
```
ITCLOUD.T_OGG2016-11-09 09:05:25.0670822016-11-09T17:05:30.51200000000000000000001080ID1TEXT_NAMEtest
```
很明顯Oracle的數(shù)據(jù)已準(zhǔn)實(shí)時(shí)導(dǎo)入到HDFS了偎痛。導(dǎo)入的內(nèi)容實(shí)際是一條條的類似流水日志(具體日志格式不同的傳輸格式旱捧,內(nèi)容略有差異,本例使用的delimitedtext踩麦。格式為操作符 數(shù)據(jù)庫.表名 操作時(shí)間戳(GMT+0) 當(dāng)前時(shí)間戳(GMT+8) 偏移量 字段1名稱 字段1內(nèi)容 字段2名稱 字段2內(nèi)容)枚赡,如果要和Oracle的表內(nèi)容完全一致,需要客戶手動實(shí)現(xiàn)解析日志并寫入到Hive的功能谓谦,這里官方并沒有提供適配器贫橙。目前騰訊側(cè)已實(shí)現(xiàn)該功能的開發(fā)。
當(dāng)然你可以直接把這個(gè)HDFS的路徑通過LOCATION的方式在Hive上建外表(external table)達(dá)到實(shí)時(shí)導(dǎo)入Hive的目的反粥。
## 總結(jié)
OGG for Big Data實(shí)現(xiàn)了Oracle實(shí)時(shí)同步到Hadoop體系的接口卢肃,但得到的日志目前仍需應(yīng)用層來解析(關(guān)系型數(shù)據(jù)庫如MySQL時(shí)OGG對應(yīng)版本已實(shí)現(xiàn)應(yīng)用層的解析,無需人工解析)才顿。
OGG的幾個(gè)主要進(jìn)程mgr莫湘,extract,pump郑气,replicate配置方便幅垮,可快速配置OGG與異構(gòu)關(guān)系存儲結(jié)構(gòu)的實(shí)時(shí)同步。后續(xù)如果有新增表尾组,修改對應(yīng)的extract忙芒,pump和replicate進(jìn)程即可,當(dāng)然如果是一整個(gè)庫讳侨,在配置上述2個(gè)進(jìn)程時(shí)呵萨,使用通配的方式即可。
#### 附錄
OGG到Hadoop體系的實(shí)時(shí)同步時(shí)跨跨,可在源端extract和pump進(jìn)程配置不變的情況下潮峦,直接在目標(biāo)端增加replicate進(jìn)程的方式,增加同步目標(biāo),以下簡單介紹本示例中增加同步到Kafka的配置方法跑杭。
本示例中extract铆帽,pump進(jìn)程都是現(xiàn)成的,無需再添加德谅。只需要在目標(biāo)端增加同步到Kafka的replicate進(jìn)程即可爹橱。
在OGG的命令行下執(zhí)行:
```
GGSCI (10.0.0.2) 4> edit params r2kafka
REPLICAT r2kafka
sourcedefs /data/gg/dirdef/tcloud.t_ogg
TARGETDB LIBFILE libggjava.so SET property=dirprm/r2kafka.props
REPORTCOUNT EVERY 1 MINUTES, RATE
GROUPTRANSOPS 10000
MAP tcloud.t_ogg, TARGET tcloud.t_ogg;
```
replicate進(jìn)程和導(dǎo)入到HDFS的配置類似,差異是調(diào)用不同的配置dirprm/r2kafka.props窄做。這個(gè)配置的主要配置如下:
```
gg.handlerlist = kafkahandler //handler類型
gg.handler.kafkahandler.type = kafka
gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties //kafka相關(guān)配置
gg.handler.kafkahandler.TopicName =ggtopic //kafka的topic名稱愧驱,無需手動創(chuàng)建
gg.handler.kafkahandler.format =json //傳輸文件的格式,支持json椭盏,xml等
gg.handler.kafkahandler.mode =op //OGG for Big Data中傳輸模式组砚,即op為一次SQL傳輸一次,tx為一次事務(wù)傳輸一次
gg.classpath=dirprm/:/usr/hdp/2.2.0.0-2041/kafka/libs/*:/data/gg/:/data/gg/lib/* //相關(guān)庫文件的引用
```
`r2kafka.props`引用的`custom_kafka_producer.properties`定義了Kafka的相關(guān)配置如下:
```
bootstrap.servers=10.0.0.62:6667 //kafkabroker的地址
acks=1
compression.type=gzip //壓縮類型
reconnect.backoff.ms=1000 //重連延時(shí)
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
batch.size=102400
linger.ms=10000
```
以上配置以及其他可配置項(xiàng)可參考[地址](http://docs.oracle.com/goldengate/bd1221/gg-bd/GADBD/GUID-2561CA12-9BAC-454B-A2E3-2D36C5C60EE5.htm#GADBD449):
以上配置完成后掏颊,在OGG命令行下添加trail文件到replicate進(jìn)程并啟動導(dǎo)入到Kafka的replicate進(jìn)程
```
GGSCI (10.0.0.2) 5> add replicat r2kafka exttrail
/data/gg/dirdat/tc,checkpointtable tcloud.checkpoint
REPLICAT added.
GGSCI (10.0.0.2) 6> start r2kafka
Sending START request to MANAGER ...
REPLICAT R2KAFKA starting
GGSCI (10.0.0.2) 10> info r2kafka
REPLICAT R2KAFKA Last Started 2016-11-09 17:59 Status RUNNING
Checkpoint Lag 00:00:00 (updated 00:00:09 ago)
Process ID 5236
Log Read Checkpoint File /data/gg/dirdat/tc000000
2016-11-09 17:05:25.067082 RBA 1217
```
檢查實(shí)時(shí)同步到kafka的效果糟红,在Oracle源端更新表的同時(shí),使用kafka客戶端自帶的腳本去查看這里配置的ggtopic這個(gè)kafkatopic下的消息:
```
SQL> insert into t_ogg values(2,'test2');
1 row created.
SQL> commit;
Commit complete.
```
目標(biāo)端Kafka的同步情況:
```
[root@10 kafka]# bin/kafka-console-consumer.sh --zookeeper 10.0.0.223:2181 --
from-beginning --topic ggtopic
{"table":"TCLOUD.T_OGG","op_type":"I","op_ts":"2016-11-09
09:05:25.067082","current_ts":"2016-11-
09T17:59:20.943000","pos":"00000000000000001080","after":
{"ID":"1","TEXT_NAME":"test"}}
{"table":"TCLOUD.T_OGG","op_type":"I","op_ts":"2016-11-09
10:02:06.827204","current_ts":"2016-11-
09T18:02:12.323000","pos":"00000000000000001217","after":
{"ID":"2","TEXT_NAME":"test2"}}
```
顯然乌叶,Oracle的數(shù)據(jù)已準(zhǔn)實(shí)時(shí)同步到Kafka盆偿。從頭開始消費(fèi)這個(gè)topic發(fā)現(xiàn)之前的同步信息也存在。架構(gòu)上可以直接接Storm准浴,SparkStreaming等直接消費(fèi)kafka消息進(jìn)行業(yè)務(wù)邏輯的處理事扭。
從Oracle實(shí)時(shí)同步到其他的Hadoop集群中,官方最新版本提供了HDFS乐横,HBase求橄,F(xiàn)lume和Kafka,相關(guān)配置可參考官網(wǎng)給出的例子配置即可葡公。
> 參考文檔:
> http://docs.oracle.com/goldengate/bd1221/gg-bd/GADBD/toc.htm