使用Kettle實現(xiàn)數(shù)據(jù)實時增量同步
0. 前言
本文介紹了使用Kettle
對一張業(yè)務表數(shù)據(jù)(500萬條數(shù)據(jù)以上)進行實時(10秒)同步祭钉,采用了時間戳增量回滾同步的方法隐绵。關(guān)于ETL和Kettle的入門知識大家可以閱讀相關(guān)的blog和文檔學習心软。
1. 時間戳增量回滾同步
假定在源數(shù)據(jù)表中有一個字段會記錄數(shù)據(jù)的新增或修改時間扎瓶,可以通過它對數(shù)據(jù)在時間維度上進行排序。通過中間表記錄每次更新的時間戳浊闪,在下一個同步周期時恼布,通過這個時間戳同步該時間戳以后的增量數(shù)據(jù)。這是時間戳增量同步搁宾。
但是時間戳增量同步不能對源數(shù)據(jù)庫中歷史數(shù)據(jù)的刪除操作進行同步折汞,我們可以通過在每次同步時,把時間戳往前回滾一段時間盖腿,從而同步一定時間段內(nèi)的刪除操作爽待。這就是時間戳增量回滾同步,這個名字是我自己給取得翩腐,意會即可鸟款,就是在時間戳增量同步的同時回滾一定的時間段。
說明:
- 源數(shù)據(jù)表 需要被同步的數(shù)據(jù)表
- 目標數(shù)據(jù)表 同步至的數(shù)據(jù)表
- 中間表 存儲時間戳的表
2. 前期準備
在兩個數(shù)據(jù)庫中分別創(chuàng)建數(shù)據(jù)表茂卦,并通過腳本在源數(shù)據(jù)表中插入500萬條數(shù)據(jù)何什,完成后再以每秒一條的速度插入新數(shù)據(jù),模擬生產(chǎn)環(huán)境等龙。
源數(shù)據(jù)表結(jié)構(gòu)如下:
CREATE TABLE `im_message` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`sender` varchar(45) COLLATE utf8_bin NOT NULL COMMENT '消息發(fā)送者:SYSTEM',
`send_time` datetime(6) NOT NULL,
`receiver` varchar(45) COLLATE utf8_bin NOT NULL COMMENT '消息接受者',
`content` varchar(255) COLLATE utf8_bin NOT NULL COMMENT '消息內(nèi)容',
`is_read` tinyint(4) NOT NULL COMMENT '消息是否被讀却υ:0-未讀;非0-已讀',
`read_time` datetime DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `id_UNIQUE` (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COLLATE=utf8_bin COMMENT='消息表'
3. 作業(yè)流程
- 開始組件
- 建時間戳中間表
- 獲取中間表的時間戳蛛砰,并設(shè)置為全局變量
- 刪除目標表中時間戳及時間戳以后的數(shù)據(jù)
- 抽取兩個數(shù)據(jù)表的時間戳及時間戳以后的數(shù)據(jù)進行比對罐栈,并根據(jù)比對結(jié)果進行刪除、新增或修改操作
- 更新時間戳
4. 創(chuàng)建作業(yè)
作業(yè)的最終截圖如下:
4.1 創(chuàng)建作業(yè)和DB連接
打開Spoon工具暴备,新建作業(yè)悠瞬,然后在左側(cè)主對象樹DB連接中新建DB連接们豌。創(chuàng)建連接并測試通過后可以在左側(cè)DB連接下右鍵共享出來涯捻。因為在單個作業(yè)或者轉(zhuǎn)換中新建的DB連接都是局域數(shù)據(jù)源,在其他轉(zhuǎn)換和作業(yè)中是不能使用的望迎,即使屬于同一個作業(yè)下的不同轉(zhuǎn)換障癌,所以需要把他們共享,這樣DB連接就會成為全局數(shù)據(jù)源辩尊,不用多次編輯涛浙。
4.2 建時間戳中間表
這一步是為了在目標數(shù)據(jù)庫建中間表etl_temp
,并插入初始的時間戳字段。因為該作業(yè)在生產(chǎn)環(huán)境是循環(huán)調(diào)用的,該步驟在每一個同步周期中都會調(diào)用轿亮,所以在建表時需要判斷該表是否已經(jīng)存在疮薇,如果不存在才建表。
SQL代碼和組件配置截圖如下:
CREATE TABLE IF NOT EXISTS etl_temp(id int primary key,time_stamp timestamp);
INSERT IGNORE INTO etl_temp (id,time_stamp) VALUES (1,'2018-05-22 00:00:00');
我把該作業(yè)時間戳的ID設(shè)為1我注,在接下來的步驟中也是通過這個ID查詢我們想要的時間戳
4.2 獲取時間戳并設(shè)為變量
新建一個轉(zhuǎn)換按咒,在轉(zhuǎn)換中使用表輸入和設(shè)置變量兩個組件
表輸入
SQL
代碼和組件配置截圖如下
在Kettle
中設(shè)置的變量都是字符串類型,為了便于比較但骨。我在SQL語句把查出的時間戳進行了格式轉(zhuǎn)換
select date_format(time_stamp , '%Y-%m-%d %H:%i:%s') time_stamp from etl_temp where id='1'
設(shè)置變量
變量活動類型可以為該變量設(shè)置四種有效活動范圍励七,分別是JVM、該Job奔缠、父Job和祖父Job
4.3 刪除目標表中時間戳及時間戳以后的數(shù)據(jù)
這樣做有兩個好處:
-
避免在同步中重復或者遺漏數(shù)據(jù)掠抬。例如當時間戳在源數(shù)據(jù)表中不是唯一的,上一次同步周期最后一條數(shù)據(jù)的時間戳是
2018-05-25 18:12:12
,那么上一次同步周期結(jié)束后中間表中的時間戳就會更新為2018-05-25 18:12:12
校哎。如果在下一個同步周期時源數(shù)據(jù)表中仍然有時間戳為2018-05-25 18:12:12
的新數(shù)據(jù)两波,那么同步就會出現(xiàn)數(shù)據(jù)不一致。采用大于時間戳的方式同步就會遺漏數(shù)據(jù)闷哆,采用等于時間戳的方式同步就會重復同步數(shù)據(jù)雨女。 - 增加健壯性 當作業(yè)異常結(jié)束后,不用做任何多余的操作就可以重啟阳准。因為會刪除目標表中時間戳及時間戳以后的數(shù)據(jù)氛堕,所以不用擔心數(shù)據(jù)一致性問題
2018-09-29:對增加健壯性進行補充:在一次同步周期中腳本異常中斷,這時候中間表的時間戳沒有更新野蝇,但是目標表已經(jīng)同步了部分數(shù)據(jù)讼稚,當再次啟動腳本就會出現(xiàn)數(shù)據(jù)重復的情況,而且在很多時候因為主鍵的存在绕沈,腳本啟動會報錯
在組件中使用了上一步驟設(shè)置的變量锐想,所以必須勾選使用變量替換
delete from test_kettle.im_message where send_time>='${TIME_STAMP}'
4.4 抽取、比對和更新數(shù)據(jù)
這一步才是真正的數(shù)據(jù)同步步驟乍狐,完成了數(shù)據(jù)的抽取赠摇、比對,并根據(jù)不同的比對結(jié)果刪除浅蚪、更新藕帜、插入或不做任何操作。
正如前文所說惜傲,為了同步刪除操作洽故,在原始表輸入和目標表輸入步驟中回滾了一定時間段。其中回滾的時間段設(shè)置為了全局的參數(shù)盗誊。左右空白處右鍵即可設(shè)置參數(shù)时甚,該作業(yè)下的所有作業(yè)和轉(zhuǎn)換都能使用隘弊,設(shè)置如下圖
轉(zhuǎn)換截圖如下
原始表輸入
SELECT
id
, sender
, send_time
, receiver
, content
, is_read
, read_time
FROM ueqcsd.im_message
where send_time>= date_sub(str_to_date('${TIME_STAMP}','%Y-%m-%d %H:%i:%s'), interval ${ROLL_BACK_DAYS} day);
目標表輸入
SELECT
id
, sender
, send_time
, receiver
, content
, is_read
, read_time
FROM test_kettle.im_message
where send_time>= date_sub(str_to_date('${TIME_STAMP}','%Y-%m-%d %H:%i:%s'), interval ${ROLL_BACK_DAYS} day);
注意兩個組件的數(shù)據(jù)庫鏈接是不同的,當然它們也就這個和名字不同
比對記錄
對兩個表輸入查出的數(shù)據(jù)進行比對荒适,并把比對的結(jié)果寫進輸入流梨熙,傳遞給后面的組件。
比對的結(jié)果有三種:
- new
- changed
- deleted
標注字段表示比對結(jié)果的字段名刀诬,后面有用串结。關(guān)鍵字段表示比對的字段,在這個作業(yè)中我們比較兩個的主鍵ID
舅列。
Switch
該步驟對上一步驟產(chǎn)生的標注字段進行路由肌割,不同的結(jié)果路由到不同的步驟。其中目標步驟表示下一步驟的名字帐要。
插入
Kettle
有一個插入/更新組件把敞,但是據(jù)網(wǎng)友介紹這個組件性能低下,每秒最多只能同步幾百條數(shù)據(jù)榨惠,所有我對插入和更新分別作了不同的處理奋早。插入使用表輸出組件;更新使用更新組件赠橙。
為了進一步提升同步效率耽装,我在表輸出組件使用了多線程(右鍵>改變開始復制的數(shù)量),使同步速度達到每秒12000條期揪。Switch組件和表輸出組件中間的虛擬組件(空操作)也是為了使用多線程添加的掉奄。
勾選批量插入,可以極大提高同步速度
更新和刪除
4.5 更新時間戳
set @new_etl_start_time_stamp = (SELECT SEND_TIME FROM test_kettle.im_message ORDER BY SEND_TIME DESC LIMIT 1);
update etl_temp set time_stamp=@new_etl_start_time_stamp where id='1';
4.6 發(fā)送郵箱
關(guān)于發(fā)送郵件組件網(wǎng)上有很多資料凤薛,就不多做介紹姓建。特別強調(diào)一點,郵箱密碼是 單獨的授權(quán)碼缤苫,而不是郵箱登錄密碼速兔。
運行
在開發(fā)環(huán)境點擊Spoon界面左上角三角符號運行作業(yè)即可。
在第一次運行時活玲,為了提高同步效率涣狗,可以先不創(chuàng)建目標表的索引。在第一此同步完成后舒憾,再創(chuàng)建索引镀钓。然后在START組件中編輯調(diào)度邏輯,再次啟動珍剑。
如下圖所示
運行日志如下圖
這樣掸宛,一個使用時間戳增量回滾同步數(shù)據(jù)的作業(yè)就完成了。