譯者前言
近期的主要工作是在為公司的 APP 增加搜索功能膛壹。因?yàn)橐灿龅搅诵枰殃P(guān)系型數(shù)據(jù)庫(kù)中的數(shù)據(jù)同步 ElasticSearch 中的問題,故抽了點(diǎn)時(shí)間翻譯了這篇官方的博文疼电。最近,在數(shù)據(jù)同步方面也有些思考减拭。
本篇文章的重點(diǎn)不在 Logstash 的 JDBC 插件的使用方法蔽豺,而是數(shù)據(jù)同步會(huì)遇到的一些細(xì)節(jié)問題如何處理。我覺得拧粪,這些設(shè)計(jì)思想是通用的修陡,無論你使用的何種方式進(jìn)行數(shù)據(jù)同步。
翻譯正文
為了利用 ElasticSearch 強(qiáng)大的搜索能力可霎,大部分的業(yè)務(wù)都會(huì)在關(guān)系型數(shù)據(jù)庫(kù)的基礎(chǔ)上部署 ElasticSearch魄鸦。這類場(chǎng)景下,保持 ElasticSearch 和關(guān)系型數(shù)據(jù)庫(kù)之間的數(shù)據(jù)同步是非常必要的癣朗。
本篇博文將會(huì)介紹如何通過 Logstash 實(shí)現(xiàn)在 MySQL 和 ElasticSearch 之間數(shù)據(jù)的高效復(fù)制與同步拾因。
注:文中演示的代碼和方法都經(jīng)過在 MySQL 中的測(cè)試,理論上適應(yīng)于所有的關(guān)系型數(shù)據(jù)庫(kù)旷余。
本文中绢记,組件的相關(guān)信息如下:
- MySQL: 8.0.16.
- Elasticsearch: 7.1.1
- Logstash: 7.1.1
- Java: 1.8.0_162-b12
- JDBC input plugin: v4.3.13
- JDBC connector: Connector/J 8.0.16
數(shù)據(jù)同步概述
本文將會(huì)通過 Logstash 的 JDBC input 插件進(jìn)行 ElasticSearch 和 MySQL 之間的數(shù)據(jù)同步。從概念上講正卧,JDBC 插件將通過周期性的輪詢以發(fā)現(xiàn)上次迭代后的新增和更新的數(shù)據(jù)蠢熄。為了正常工作,幾個(gè)條件需要滿足:
ElasticSearch 中 _id 設(shè)置必須來自 MySQL 中 id 字段炉旷。它提供了 MySQL 和 ElasticSearch 之間文檔數(shù)據(jù)的映射關(guān)系签孔。如果一條記錄在 MySQL 更新,那么砾跃,ElasticSearch 所有關(guān)聯(lián)文檔都應(yīng)該被重寫骏啰。要說明的是,重寫 ElasticSearch 中的文檔和更新操作的效率相同抽高。在內(nèi)部實(shí)現(xiàn)上,一個(gè)更新操作由刪除一個(gè)舊文檔和創(chuàng)建一個(gè)新文檔兩部分組成透绩。
當(dāng) MySQL 中插入或更新一條記錄時(shí)翘骂,必須包含一個(gè)字段用于保存字段的插入或更新時(shí)間。如此一來帚豪, Logstash 就可以實(shí)現(xiàn)每次請(qǐng)求只獲取上次輪詢后更新或插入的記錄碳竟。Logstash 每次輪詢都會(huì)保存從 MySQL 中讀取到的最新的插入或更新時(shí)間,該時(shí)間大于上次輪詢最新時(shí)間狸臣。
如果滿足了上述條件莹桅,我們就可以配置 Logstash 周期性的從 MySQL 中讀取所有最新更新或插入的記錄,然后寫入到 Elasticsearch 中烛亦。
關(guān)于 Logstash 的配置代碼诈泼,本文稍后會(huì)給出懂拾。
MySQL 設(shè)置
MySQL 庫(kù)和表的配置如下:
CREATE DATABASE es_db
USE es_db
DROP TABLE IF EXISTS es_table
CREATE TABLE es_table (
id BIGINT(20) UNSIGNED NOT NULL,
PRIMARY KEY (id),
UNIQUE KEY unique_id (id),
client_name VARCHAR(32) NOT NULL,
modification_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
insertion_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);
復(fù)制代碼
配置中有幾點(diǎn)需要說明,如下:
- es_table铐达,MySQL 的數(shù)據(jù)表岖赋,我們將把它的數(shù)據(jù)同步到 ElasticSearch 中;
- id瓮孙,記錄的唯一標(biāo)識(shí)唐断。注意,id 定義為主鍵的同時(shí)杭抠,也定義為唯一建脸甘,可以保證每個(gè) id 在表中只出現(xiàn)一次。同步 ElasticSearch 時(shí)偏灿,將會(huì)轉(zhuǎn)化為文檔的 _id斤程;
- client_name,表示用戶定義用來保存數(shù)據(jù)的字段菩混,為使博文保持簡(jiǎn)潔忿墅,我們只定義了一個(gè)字段,更多字段也很容易加入沮峡。接下來的演示疚脐,我們會(huì)更新該字段,用以說明不僅僅新插入記錄會(huì)同步到 MySQL邢疙,更新記錄同樣會(huì)同步到 MySQL棍弄;
- modification_time,用于保存記錄的更新或插入時(shí)間疟游,它使得 Logstash 可以在每次輪詢時(shí)只請(qǐng)求上次輪詢后新增更新的記錄呼畸;
- insertion_time,該字段用于一條記錄插入時(shí)間颁虐,主要是為演示方便蛮原,對(duì)同步而言,并非必須另绩;
MySQL 操作
前面設(shè)置完成儒陨,我們可以通過如下命令插入記錄:
INSERT INTO es_table (id, client_name) VALUES (<id>, <client name>);
復(fù)制代碼
使用如下命令更新記錄:
UPDATE es_table SET client_name = <new client name> WHERE id=<id>;
復(fù)制代碼
使用如下命令更新插入記錄:
INSERT INTO es_table (id, client_name) VALUES (<id>, <client_name when created>) ON DUPLICATE KEY UPDATE client_name=<client name when updated>
復(fù)制代碼
同步代碼
Logstash 的 pipeline 配置代碼如下,它實(shí)現(xiàn)了前面描述的功能笋籽,從 MySQL 到 ElasticSearch 的數(shù)據(jù)同步蹦漠。
input {
jdbc {
jdbc_driver_library => "<path>/mysql-connector-java-8.0.16.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://<MySQL host>:3306/es_db"
jdbc_user => "<my username>"
jdbc_password => "<my password>"
jdbc_paging_enabled => true
tracking_column => "unix_ts_in_secs"
use_column_value => true
tracking_column_type => "numeric"
schedule => "*/5 * * * * *",
statement => "SELECT *, UNIX_TIMESTAMP(modification_time) AS unix_ts_in_secs FROM es_table WHERE (UNIX_TIMESTAMP(modification_time)) > :sql_last_value AND modification_time < NOW() ORDER BY modification_time desc"
}
}
filter {
mutate {
copy => { "id" => "[@metadata][_id]"}
remove_field => ["id", "@version", "unix_ts_in_secs"]
}
}
output {
# stdout { codec => "rubydebug" }
elasticsearch {
index => "rdbms_sync_idx"
document_id => "%{[%metedata][_id]}"
}
}
復(fù)制代碼
關(guān)于 Pipeline 配置的幾點(diǎn)說明,如下:
- tracking_column
此處配置為 "unix_ts_in_secs"车海。它被用于追蹤最新的記錄笛园,并被保存在 .logstash_jdbc_last_run 文件中,下一次輪詢將以這個(gè)邊界位置為準(zhǔn)進(jìn)行記錄獲取。SELECT 語句中研铆,可通過 :sql_last_value 訪問該配置字段的值埋同。
- unix_ts_in_secs
由 SELECT 語句生成,是 "modification_time" 的 UNIX TIMESTAMP蚜印。它被前面討論的 "track_column" 引用莺禁。使用 UNIX TIMESTAMP,而非其他時(shí)間形式窄赋,可以減少?gòu)?fù)雜性哟冬,防止時(shí)區(qū)導(dǎo)致的時(shí)間不一致問題。
- sql_last_value
內(nèi)建的配置參數(shù)忆绰,指定每次輪詢的開始位置浩峡。在 input 配置中,可被 SELECT 語句引用错敢。在每次輪詢開始前翰灾,從 .logstash_jdbc_last_run 中讀取,此案例中稚茅,即為 "unix_ts_in_secs" 的最近值纸淮。如此便可保證每次輪詢只獲取最新插入和更新的記錄。
- schedule
通過 cron 語法指定輪詢的執(zhí)行周期亚享,例子中咽块,"*/5 * * * * *" 表示每 5 秒輪詢一次。
- modification_time < NOW()
SELECT 語句查詢條件的一部分欺税,當(dāng)前解釋不清侈沪,具體情況待下面的章節(jié)再作介紹。
- filter
該配置指定將 MySQL 中的 id 復(fù)制到 metadata 字段 _id 中晚凿,用以確保 ElasticSearch 中的文檔寫入正確的 _id亭罪。而之所以使用 metadata,因?yàn)樗桥R時(shí)的歼秽,不會(huì)使文檔中產(chǎn)生新的字段应役。同時(shí),我們也會(huì)把不希望寫入 Elasticsearch 的字段 id 和 @version 移除哲银。
- output
在 output 輸出段的配置扛吞,我們指定了文檔應(yīng)該被輸出到 ElasticSearch,并且設(shè)置輸出文檔 _id 為 filter 段創(chuàng)建的 metadata 的 _id荆责。如果需要調(diào)試,注釋部分的 rubydebug 可以實(shí)現(xiàn)亚脆。
SELECT 語句的正確性分析
接下來做院,我們將開始解釋為什么 SELECT 語句中包含 modification_time < NOW() 是非常重要的。為了解釋這個(gè)問題,我們將引入兩個(gè)反例演示說明键耕,為什么下面介紹的兩種最直觀的方法是錯(cuò)誤的寺滚。還有,為什么 modification_time < Now() 可以克服這些問題屈雄。
直觀場(chǎng)景一
當(dāng) where 子句中僅僅包含 UNIX_TIMESTAMP(modification_time) > :sql_last_value村视,而沒有 modification < Now() 的情況下,工作是否正常酒奶。這個(gè)場(chǎng)景下蚁孔,SELECT 語句是如下形式:
statement => "SELECT *, UNIX_TIMESTAMP(modification_time) AS unix_ts_in_secs FROM es_table WHERE (UNIX_TIMESTAMP(modification_time) > :sql_last_value) ORDER BY modification_time ASC"
復(fù)制代碼
粗略一看,似乎沒發(fā)現(xiàn)什么問題惋嚎,應(yīng)該可以正常工作杠氢。但其實(shí),這里有一些邊界情況另伍,可能導(dǎo)致一些文檔的丟失鼻百。舉個(gè)例子,假設(shè) MySQL 每秒插入兩個(gè)文檔摆尝,Logstash 每 5 秒執(zhí)行一次温艇。如下圖所示,時(shí)間范圍 T0 至 T10堕汞,數(shù)據(jù)記錄 R1 至 R22勺爱。
Logstash 的第一次輪詢發(fā)生在 T5 時(shí)刻,讀取記錄 R1 至 R11臼朗,即圖中青色區(qū)域邻寿。此時(shí),sql_last_value 即為 T5视哑,這個(gè)時(shí)間是從 R11 中獲取到的绣否。
如果,當(dāng) Logstash 完成從 MySQL 讀取數(shù)據(jù)后挡毅,同樣在 T5 時(shí)刻蒜撮,又有一條記錄插入到 MySQL 中。 而下一次的輪詢只會(huì)拉取到大于 T5 的記錄跪呈,這意味著 R12 將會(huì)丟失段磨。如圖所示,青色和灰色區(qū)域分別表示當(dāng)次和上次輪詢獲取到的記錄耗绿。
注意苹支,這類場(chǎng)景下的 R12 將永遠(yuǎn)不會(huì)再被寫入到 ElasticSearch。
直觀場(chǎng)景二
為了解決這個(gè)問題误阻,或許有人會(huì)想债蜜,如果把 where 子句中的大于(>)改為大于等于(>=)是否可行晴埂。SELECT 語句如下:
statement => "SELECT *, UNIX_TIMESTAMP(modification_time) AS unix_ts_in_secs FROM es_table WHERE (UNIX_TIMESTAMP(modification_time) >= :sql_last_value) ORDER BY modification_time ASC"
復(fù)制代碼
這種方式其實(shí)也不理想。這種情況下寻定,某些文檔可能會(huì)被兩次讀取儒洛,重復(fù)寫入到 ElasticSearch 中。雖然這不影響結(jié)果的正確性狼速,但卻做了多余的工作琅锻。如下圖所示,Logstash 的首次輪詢和場(chǎng)景一相同向胡,青色區(qū)域表示已經(jīng)讀取的記錄恼蓬。
Logstash 的第二次輪詢將會(huì)讀取所有大于等于 T5 的記錄。如下圖所示捷枯,注意 R11滚秩,即紫色區(qū)域,將會(huì)被再次發(fā)送到 ElasticSearch 中淮捆。
這兩種場(chǎng)景的實(shí)現(xiàn)效果都不理想郁油。場(chǎng)景一會(huì)導(dǎo)致數(shù)據(jù)丟失,這是無法容忍的攀痊。場(chǎng)景二桐腌,存在重復(fù)讀取寫入的問題,雖然對(duì)數(shù)據(jù)正確性沒有影響苟径,但執(zhí)行了多余的 IO案站。
終極方案
前面的兩場(chǎng)方案都不可行,我們需要繼續(xù)尋找其他解決方案棘街。其實(shí)也很簡(jiǎn)單蟆盐,通過指定 (UNIX_TIMESTAMP(modification_time) > :sql_last_value AND modification_time < NOW()),我們就可以保證每條記錄有且只發(fā)送一次遭殉。
如下圖所示石挂,Logstash 輪詢發(fā)生在 T5 時(shí)刻。因?yàn)橹付?modification_time < NOW()险污,文檔只會(huì)讀取到 T4 時(shí)刻痹愚,并且 sql_last_value 的值也將會(huì)被設(shè)置為 T4。
開始下一次的輪詢蛔糯,當(dāng)前時(shí)間 T10拯腮。
由于設(shè)置了 UNIX_TIMESTAMP(modification_time) > :sql_last_value,并且當(dāng)前 sql_last_value 為 T4蚁飒,因此动壤,本次的輪詢將從 T5 開始。而 modification_time < NOW() 決定了只有時(shí)間小于等于 T9 的記錄才會(huì)被讀取淮逻。最后狼电,sql_last_value 也將被設(shè)置為 T9蜒灰。
如此弦蹂,MySQL 中的每個(gè)記錄就可以做到都能被精確讀取了一次肩碟,如此就可以避免每次輪詢可能導(dǎo)致的當(dāng)前時(shí)間間隔內(nèi)數(shù)據(jù)丟失或重復(fù)讀取的問題。
系統(tǒng)測(cè)試
簡(jiǎn)單的測(cè)試可以幫助我們驗(yàn)證配置是否如我們所愿凸椿。我們可以寫入一些數(shù)據(jù)至數(shù)據(jù)庫(kù)削祈,如下:
INSERT INTO es_table (id, client_name) VALUES (1, 'Jim Carrey');
INSERT INTO es_table (id, client_name) VALUES (2, 'Mike Myers');
INSERT INTO es_table (id, client_name) VALUES (3, 'Bryan Adams');
復(fù)制代碼
一旦 JDBC 輸入插件觸發(fā)執(zhí)行,將會(huì)從 MySQL 中讀取所有記錄脑漫,并寫入到 ElasticSearch 中髓抑。我們可以通過查詢語句查看 ElasticSearch 中的文檔。
GET rdbms_sync_idx/_search
復(fù)制代碼
執(zhí)行結(jié)果如下:
"hits" : {
"total" : {
"value" : 3,
"relation" : "eq"
},
"max_score" : 1.0,
"hits" : [
{
"_index" : "rdbms_sync_idx",
"_type" : "_doc",
"_id" : "1",
"_score" : 1.0,
"_source" : {
"insertion_time" : "2019-06-18T12:58:56.000Z",
"@timestamp" : "2019-06-18T13:04:27.436Z",
"modification_time" : "2019-06-18T12:58:56.000Z",
"client_name" : "Jim Carrey"
}
},
Etc …
復(fù)制代碼
更新 id=1 的文檔优幸,如下:
UPDATE es_table SET client_name = 'Jimbo Kerry' WHERE id=1;
復(fù)制代碼
通過 _id = 1吨拍,可以實(shí)現(xiàn)文檔的正確更新。通過執(zhí)行如下命令查看文檔:
GET rdbms_sync_idx/_doc/1
復(fù)制代碼
響應(yīng)結(jié)果如下:
{
"_index" : "rdbms_sync_idx",
"_type" : "_doc",
"_id" : "1",
"_version" : 2,
"_seq_no" : 3,
"_primary_term" : 1,
"found" : true,
"_source" : {
"insertion_time" : "2019-06-18T12:58:56.000Z",
"@timestamp" : "2019-06-18T13:09:30.300Z",
"modification_time" : "2019-06-18T13:09:28.000Z",
"client_name" : "Jimbo Kerry"
}
}
復(fù)制代碼
文檔 _version 被設(shè)置為 2网杆,并且 modification_time 和 insertion_time 已經(jīng)不一樣了羹饰,client_name 已經(jīng)正確更新。而 @timestamp碳却,不是我們需要關(guān)注的队秩,它是 Logstash 默認(rèn)添加的。
更新添加 upsert 執(zhí)行語句如下:
INSERT INTO es_table (id, client_name) VALUES (4, 'Bob is new') ON DUPLICATE KEY UPDATE client_name='Bob exists already';
復(fù)制代碼
和之前一樣昼浦,我們可以通過查看 ElasticSearch 中相應(yīng)文檔馍资,便可驗(yàn)證同步的正確性。
文檔刪除
不知道你是否已經(jīng)發(fā)現(xiàn)关噪,如果一個(gè)文檔從 MySQL 中刪除鸟蟹,并不會(huì)同步到 ElasticSearch 。關(guān)于這個(gè)問題使兔,列舉一些可供我們考慮的方案建钥,如下:
MySQL 中的記錄可通過包含 is_deleted 字段用以表明該條記錄是否有效。一旦發(fā)生更新火诸,is_deleted 也會(huì)同步更新到 ElasticSearch 中锦针。如果通過這種方式,在執(zhí)行 MySQL 或 ElasticSearch 查詢時(shí)置蜀,我們需要重寫查詢語句來過濾掉 is_deleted 為 true 的記錄奈搜。同時(shí),需要一些后臺(tái)進(jìn)程將 MySQL 和 ElasticSearch 中的這些文檔刪除盯荤。
另一個(gè)可選方案馋吗,應(yīng)用系統(tǒng)負(fù)責(zé) MySQL 和 ElasticSearch 中數(shù)據(jù)的刪除,即應(yīng)用系統(tǒng)在刪除 MySQL 中數(shù)據(jù)的同時(shí)秋秤,也要負(fù)責(zé)將 ElasticSearch 中相應(yīng)的文檔刪除宏粤。
總結(jié)
本文介紹了如何通過 Logstash 進(jìn)行關(guān)系型數(shù)據(jù)庫(kù)和 ElasticSearch 之間的數(shù)據(jù)同步脚翘。文中以 MySQL 為例,但理論上绍哎,演示的方法和代碼也應(yīng)該同樣適應(yīng)于其他的關(guān)系型數(shù)據(jù)庫(kù)来农。
歡迎工作一到五年的Java工程師朋友們加入JavaQQ群:219571750,群內(nèi)提供免費(fèi)的Java架構(gòu)學(xué)習(xí)資料(里面有高可用崇堰、高并發(fā)沃于、高性能及分布式、Jvm性能調(diào)優(yōu)海诲、Spring源碼繁莹,MyBatis,Netty,Redis,Kafka,Mysql,Zookeeper,Tomcat,Docker,Dubbo,Nginx等多個(gè)知識(shí)點(diǎn)的架構(gòu)資料)合理利用自己每一分每一秒的時(shí)間來學(xué)習(xí)提升自己特幔,不要再用"沒有時(shí)間“來掩飾自己思想上的懶惰咨演!趁年輕,使勁拼蚯斯,給未來的自己一個(gè)交代薄风!