概述
在生成業(yè)務(wù)常有將MySQL數(shù)據(jù)同步到ES的需求毙死,如果需要很高的定制化珠移,往往需要開發(fā)同步程序用于處理數(shù)據(jù)。但沒有特殊業(yè)務(wù)需求渊抄,官方提供的logstash就很有優(yōu)勢(shì)了。
??在使用logstash我們應(yīng)先了解其特性丧裁,再?zèng)Q定是否使用:
- 無需開發(fā)护桦,僅需安裝配置logstash即可;
- 凡是SQL可以實(shí)現(xiàn)的logstash均可以實(shí)現(xiàn)(本就是通過sql查詢數(shù)據(jù))
- 支持每次全量同步或按照特定字段(如遞增ID煎娇、修改時(shí)間)增量同步二庵;
- 同步頻率可控,最快同步頻率每分鐘一次(如果對(duì)實(shí)效性要求較高缓呛,慎用)催享;
- 不支持被物理刪除的數(shù)據(jù)同步物理刪除ES中的數(shù)據(jù)(可在表設(shè)計(jì)中增加邏輯刪除字段IsDelete標(biāo)識(shí)數(shù)據(jù)刪除)。
1哟绊、安裝
前往官網(wǎng)下載logstash因妙,下載地址www.elastic.co/downloads/l…,zip壓縮包大約160M(覺得官網(wǎng)下載慢的可前往@zxiaofan的CSDN下載)票髓;
??程序目錄:【windows】G:\ELK\logstash-6.5.4攀涵;【linux】/tomcat/logstash/logstash-6.5.4。
??下文統(tǒng)一以【程序目錄】表示不同環(huán)境的安裝目錄洽沟。
2以故、配置
2.1、新建目錄存放配置文件及mysql依賴包
在【程序目錄】目錄(\bin同級(jí))新建mysql目錄玲躯,將下載好的mysql-connector-java-5.1.34.jar放入此目錄据德;
??在【程序目錄】\mysql目錄新建jdbc.conf文件,此文件將配置數(shù)據(jù)庫連接信息跷车、查詢數(shù)據(jù)sql棘利、分頁信息、同步頻率等核心信息朽缴。
??注意事項(xiàng)請(qǐng)查看注釋信息善玫。
2.2、單表同步配置
input {
stdin {}
jdbc {
type => "jdbc"
# 數(shù)據(jù)庫連接地址
jdbc_connection_string => "jdbc:mysql://192.168.1.1:3306/TestDB?characterEncoding=UTF-8&autoReconnect=true""
# 數(shù)據(jù)庫連接賬號(hào)密碼密强;
jdbc_user => "username"
jdbc_password => "pwd"
# MySQL依賴包路徑茅郎;
jdbc_driver_library => "mysql/mysql-connector-java-5.1.34.jar"
# the name of the driver class for mysql
jdbc_driver_class => "com.mysql.jdbc.Driver"
# 數(shù)據(jù)庫重連嘗試次數(shù)
connection_retry_attempts => "3"
# 判斷數(shù)據(jù)庫連接是否可用,默認(rèn)false不開啟
jdbc_validate_connection => "true"
# 數(shù)據(jù)庫連接可用校驗(yàn)超時(shí)時(shí)間或渤,默認(rèn)3600S
jdbc_validation_timeout => "3600"
# 開啟分頁查詢(默認(rèn)false不開啟)系冗;
jdbc_paging_enabled => "true"
# 單次分頁查詢條數(shù)(默認(rèn)100000,若字段較多且更新頻率較高,建議調(diào)低此值)薪鹦;
jdbc_page_size => "500"
# statement為查詢數(shù)據(jù)sql掌敬,如果sql較復(fù)雜惯豆,建議配通過statement_filepath配置sql文件的存放路徑;
# sql_last_value為內(nèi)置的變量奔害,存放上次查詢結(jié)果中最后一條數(shù)據(jù)tracking_column的值楷兽,此處即為ModifyTime;
# statement_filepath => "mysql/jdbc.sql"
statement => "SELECT KeyId,TradeTime,OrderUserName,ModifyTime FROM `DetailTab` WHERE ModifyTime>= :sql_last_value order by ModifyTime asc"
# 是否將字段名轉(zhuǎn)換為小寫华临,默認(rèn)true(如果有數(shù)據(jù)序列化芯杀、反序列化需求,建議改為false)雅潭;
lowercase_column_names => false
# Value can be any of: fatal,error,warn,info,debug揭厚,默認(rèn)info;
sql_log_level => warn
#
# 是否記錄上次執(zhí)行結(jié)果寻馏,true表示會(huì)將上次執(zhí)行結(jié)果的tracking_column字段的值保存到last_run_metadata_path指定的文件中棋弥;
record_last_run => true
# 需要記錄查詢結(jié)果某字段的值時(shí),此字段為true诚欠,否則默認(rèn)tracking_column為timestamp的值;
use_column_value => true
# 需要記錄的字段漾岳,用于增量同步轰绵,需是數(shù)據(jù)庫字段
tracking_column => "ModifyTime"
# Value can be any of: numeric,timestamp,Default value is "numeric"
tracking_column_type => timestamp
# record_last_run上次數(shù)據(jù)存放位置尼荆;
last_run_metadata_path => "mysql/last_id.txt"
# 是否清除last_run_metadata_path的記錄左腔,需要增量同步時(shí)此字段必須為false;
clean_run => false
#
# 同步頻率(分 時(shí) 天 月 年)捅儒,默認(rèn)每分鐘同步一次液样;
schedule => "* * * * *"
}
}
filter {
json {
source => "message"
remove_field => ["message"]
}
# convert 字段類型轉(zhuǎn)換,將字段TotalMoney數(shù)據(jù)類型改為float巧还;
mutate {
convert => {
"TotalMoney" => "float"
}
}
}
output {
elasticsearch {
# host => "192.168.1.1"
# port => "9200"
# 配置ES集群地址
hosts => ["192.168.1.1:9200", "192.168.1.2:9200", "192.168.1.3:9200"]
# 索引名字鞭莽,必須小寫
index => "consumption"
# 數(shù)據(jù)唯一索引(建議使用數(shù)據(jù)庫KeyID)
document_id => "%{KeyId}"
}
stdout {
codec => json_lines
}
}
復(fù)制代碼
2.3、多表同步
多表配置和單表配置的區(qū)別在于input模塊的jdbc模塊有幾個(gè)type麸祷,output模塊就需對(duì)應(yīng)有幾個(gè)type澎怒;
input {
stdin {}
jdbc {
# 多表同步時(shí),表類型區(qū)分阶牍,建議命名為“庫名_表名”喷面,每個(gè)jdbc模塊需對(duì)應(yīng)一個(gè)type;
type => "TestDB_DetailTab"
# 其他配置此處省略走孽,參考單表配置
# ...
# ...
# record_last_run上次數(shù)據(jù)存放位置惧辈;
last_run_metadata_path => "mysql\last_id.txt"
# 是否清除last_run_metadata_path的記錄,需要增量同步時(shí)此字段必須為false磕瓷;
clean_run => false
#
# 同步頻率(分 時(shí) 天 月 年)盒齿,默認(rèn)每分鐘同步一次;
schedule => "* * * * *"
}
jdbc {
# 多表同步時(shí),表類型區(qū)分县昂,建議命名為“庫名_表名”肮柜,每個(gè)jdbc模塊需對(duì)應(yīng)一個(gè)type;
type => "TestDB_Tab2"
# 多表同步時(shí)倒彰,last_run_metadata_path配置的路徑應(yīng)不一致审洞,避免有影響;
# 其他配置此處省略
# ...
# ...
}
}
filter {
json {
source => "message"
remove_field => ["message"]
}
}
output {
# output模塊的type需和jdbc模塊的type一致
if [type] == "TestDB_DetailTab" {
elasticsearch {
# host => "192.168.1.1"
# port => "9200"
# 配置ES集群地址
hosts => ["192.168.1.1:9200", "192.168.1.2:9200", "192.168.1.3:9200"]
# 索引名字待讳,必須小寫
index => "detailtab1"
# 數(shù)據(jù)唯一索引(建議使用數(shù)據(jù)庫KeyID)
document_id => "%{KeyId}"
}
}
if [type] == "TestDB_Tab2" {
elasticsearch {
# host => "192.168.1.1"
# port => "9200"
# 配置ES集群地址
hosts => ["192.168.1.1:9200", "192.168.1.2:9200", "192.168.1.3:9200"]
# 索引名字芒澜,必須小寫
index => "detailtab2"
# 數(shù)據(jù)唯一索引(建議使用數(shù)據(jù)庫KeyID)
document_id => "%{KeyId}"
}
}
stdout {
codec => json_lines
}
}
復(fù)制代碼
3、啟動(dòng)運(yùn)行
在【程序目錄】目錄執(zhí)行以下命令啟動(dòng):
【windows】
bin\logstash.bat -f mysql\jdbc.conf
【linux】
nohup ./bin/logstash -f mysql/jdbc_jx_moretable.conf &
復(fù)制代碼
可新建腳本配置好啟動(dòng)命令创淡,后期直接運(yùn)行即可痴晦。
??在【程序目錄】\logs目錄會(huì)有運(yùn)行日志。
Note:
??5.x/6.X/7.x版本需要jdk8支持琳彩,如果默認(rèn)jdk版本不是jdk8誊酌,那么需要在logstash或logstash.lib.sh的行首位置添加兩個(gè)環(huán)境變量:
export JAVA_CMD="/usr/tools/jdk1.8.0_162/bin"
export JAVA_HOME="/usr/tools/jdk1.8.0_162/"
復(fù)制代碼
開機(jī)自啟動(dòng):
- windows開機(jī)自啟:
- 方案1:使用windows自帶的任務(wù)計(jì)劃;
- 方案2:nssm注冊(cè)windows服務(wù)露乏,blog.csdn.net/u010887744/…
- linux開機(jī)自啟:
- CentOS 7將linux服務(wù)加入系統(tǒng)啟動(dòng) systemd service碧浊,blog.csdn.net/u010887744/…
4、問題及解決方案
4.1瘟仿、數(shù)據(jù)同步后箱锐,ES沒有數(shù)據(jù)
output.elasticsearch模塊的index必須是全小寫;
4.2劳较、增量同步后last_run_metadata_path文件內(nèi)容不改變
如果lowercase_column_names配置的不是false驹止,那么tracking_column字段配置的必須是全小寫。
4.3观蜗、提示找不到j(luò)dbc_driver_library
2032 com.mysql.jdbc.Driver not loaded.
Are you sure you've included the correct jdbc driver in :jdbc_driver_library?
復(fù)制代碼
檢測(cè)配置的地址是否正確臊恋,如果是linux環(huán)境,注意路徑分隔符是“/”嫂便,而不是“\”捞镰。
4.4、數(shù)據(jù)丟失
statement配置的sql中毙替,如果比較字段使用的是大于“>”岸售,可能存在數(shù)據(jù)丟失。
??假設(shè)當(dāng)同步完成后last_run_metadata_path存放的時(shí)間為2019-01-30 20:45:30厂画,而這時(shí)候新入庫一條數(shù)據(jù)的更新時(shí)間也為2019-01-30 20:45:30凸丸,那么這條數(shù)據(jù)將無法同步。
??解決方案:將比較字段使用 大于等于“>=”袱院。
4.5屎慢、數(shù)據(jù)重復(fù)更新
上一個(gè)問題“數(shù)據(jù)丟失”提供的解決方案是比較字段使用“大于等于”瞭稼,但這時(shí)又會(huì)產(chǎn)生新的問題。
??假設(shè)當(dāng)同步完成后last_run_metadata_path存放的時(shí)間為2019-01-30 20:45:30腻惠,而數(shù)據(jù)庫中更新時(shí)間最大值也為2019-01-30 20:45:30环肘,那么這些數(shù)據(jù)將重復(fù)更新,直到有更新時(shí)間更大的數(shù)據(jù)出現(xiàn)集灌。
??當(dāng)上述特殊數(shù)據(jù)很多悔雹,且長期沒有新的數(shù)據(jù)更新時(shí),會(huì)導(dǎo)致大量的數(shù)據(jù)重復(fù)同步到ES欣喧。
??何時(shí)會(huì)出現(xiàn)以上情況呢:①比較字段非“自增”腌零;②比較字段是程序生成插入。
解決方案:
- ①比較字段自增保證不重復(fù)或重復(fù)概率極兴舭ⅰ(比如使用自增ID或者數(shù)據(jù)庫的timestamp)益涧,這樣就能避免大部分異常情況了;
- ②如果確實(shí)存在大量程序插入的數(shù)據(jù)驯鳖,其更新時(shí)間相同闲询,且可能長期無數(shù)據(jù)更新,可考慮定期更新數(shù)據(jù)庫中的一條測(cè)試數(shù)據(jù)浅辙,避免最大值有大量數(shù)據(jù)嘹裂。
4.6、容災(zāi)
logstash本身無法集群摔握,我們常使用的組合ELK是通過kafka集群變相實(shí)現(xiàn)集群的。
??可供選擇的處理方式:①使用任務(wù)程序推送數(shù)據(jù)到kafaka丁寄,由kafka同步數(shù)據(jù)到ES氨淌,但任務(wù)程序本身也需要容災(zāi),并需要考慮重復(fù)推送的問題伊磺;②將logstash加入守護(hù)程序盛正,并輔以第三方監(jiān)控其運(yùn)行狀態(tài)。
??具體如何選擇屑埋,需要結(jié)合自身的應(yīng)用場(chǎng)景了豪筝。
4.7、海量數(shù)據(jù)同步
為什么會(huì)慢摘能?logstash分頁查詢使用臨時(shí)表分頁续崖,每條分頁SQL都是將全集查詢出來當(dāng)作臨時(shí)表,再在臨時(shí)表上分頁查詢团搞。這樣導(dǎo)致每次分頁查詢都要對(duì)主表進(jìn)行一次全表掃描严望。
SELECT * FROM (SELECT * FROM `ImageCN1`
WHERE ModifyTime>= '1970-01-01 08:00:00'
order by ModifyTime asc) AS `t1`
LIMIT 5000 OFFSET 10000000;
復(fù)制代碼
數(shù)據(jù)量太大,首次同步如何安全過渡同步逻恐?
??可考慮在statement對(duì)應(yīng)的sql中加上分頁條件像吻,比如ID在什么范圍峻黍,修改時(shí)間在什么區(qū)間,將單詞同步的數(shù)據(jù)總量減少拨匆。先少量數(shù)據(jù)同步測(cè)試驗(yàn)證姆涩,再根據(jù)測(cè)試情況修改區(qū)間條件啟動(dòng)logstash完成同步。比如將SQL修改為:
SELECT
*
FROM
`ImageCN1`
WHERE
ModifyTime < '2018-10-10 10:10:10' AND ModifyTime >= '1970-01-01 08:00:00'
ORDER BY
ModifyTime ASC
復(fù)制代碼
當(dāng)同步完ModifyTime<'2018-10-10 10:10:10'區(qū)間的數(shù)據(jù)在修改SQL同步剩余區(qū)間的數(shù)據(jù)惭每。
??這樣需要每次同步后就修改sql骨饿,線上運(yùn)營比較繁瑣,是否可以不修改sql洪鸭,同時(shí)保證同步效率呢样刷?SQL我們可以再修改下:
SELECT
*
FROM
`ImageCN1`
WHERE
ModifyTime >= '1970-01-01 08:00:00'
ORDER BY
ModifyTime ASC
LIMIT 100000
復(fù)制代碼
這樣就能保證每次子查詢的數(shù)據(jù)量不超過10W條,實(shí)際測(cè)試發(fā)現(xiàn)览爵,數(shù)據(jù)量很大時(shí)效果很明顯置鼻。
[SQL]USE XXXDataDB;
受影響的行: 0
時(shí)間: 0.001s
[SQL]
SELECT
*
FROM
( SELECT * FROM `ImageCN1` WHERE ModifyTime >= '1970-01-01 08:00:00' ORDER BY ModifyTime ASC ) AS `t1`
LIMIT 5000 OFFSET 900000;
受影響的行: 0
時(shí)間: 7.229s
[SQL]
SELECT
*
FROM
( SELECT * FROM `ImageCN1` WHERE ModifyTime >= '2018-07-18 19:35:10' ORDER BY ModifyTime ASC LIMIT 100000 ) AS `t1`
LIMIT 5000 OFFSET 90000
受影響的行: 0
時(shí)間: 1.778s
復(fù)制代碼
測(cè)試可以看出,SQL不加limit 10W時(shí)蜓竹,越往后分頁查詢?cè)铰福臅r(shí)達(dá)到8S,而加了limit條件的SQL耗時(shí)穩(wěn)定在2S以內(nèi)俱济。
作者:zxiaofan
鏈接:https://juejin.im/post/5daf2fa56fb9a04e054da1e3
來源:掘金
著作權(quán)歸作者所有嘶是。商業(yè)轉(zhuǎn)載請(qǐng)聯(lián)系作者獲得授權(quán),非商業(yè)轉(zhuǎn)載請(qǐng)注明出處蛛碌。