使用logstash同步MySQL數(shù)據(jù)到ES

概述

在生成業(yè)務(wù)常有將MySQL數(shù)據(jù)同步到ES的需求渣蜗,如果需要很高的定制化芹彬,往往需要開發(fā)同步程序用于處理數(shù)據(jù)掸读。但沒有特殊業(yè)務(wù)需求玻熙,官方提供的logstash就很有優(yōu)勢(shì)了否彩。
??在使用logstash我們應(yīng)先了解其特性,再?zèng)Q定是否使用:

  • 無需開發(fā)嗦随,僅需安裝配置logstash即可列荔;
  • 凡是SQL可以實(shí)現(xiàn)的logstash均可以實(shí)現(xiàn)(本就是通過sql查詢數(shù)據(jù))
  • 支持每次全量同步或按照特定字段(如遞增ID敬尺、修改時(shí)間)增量同步;
  • 同步頻率可控贴浙,最快同步頻率每分鐘一次(如果對(duì)實(shí)效性要求較高砂吞,慎用);
  • 不支持被物理刪除的數(shù)據(jù)同步物理刪除ES中的數(shù)據(jù)(可在表設(shè)計(jì)中增加邏輯刪除字段IsDelete標(biāo)識(shí)數(shù)據(jù)刪除)崎溃。

1蜻直、安裝

前往官網(wǎng)下載logstash,下載地址www.elastic.co/downloads/l…袁串,zip壓縮包大約160M(覺得官網(wǎng)下載慢的可前往@zxiaofan的CSDN下載)概而;
??程序目錄:【windows】G:\ELK\logstash-6.5.4;【linux】/tomcat/logstash/logstash-6.5.4囱修。
??下文統(tǒng)一以【程序目錄】表示不同環(huán)境的安裝目錄赎瑰。

2、配置

2.1破镰、新建目錄存放配置文件及mysql依賴包

訪問https://dev.mysql.com/downloads/connector/j/下載mysql依賴包餐曼。
wget https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-5.1.46.zip
??在【程序目錄】目錄(\bin同級(jí))新建mysql目錄,將下載好的mysql-connector-java-5.1.34.jar放入此目錄鲜漩;
??在【程序目錄】\mysql目錄新建jdbc.conf文件源譬,此文件將配置數(shù)據(jù)庫連接信息、查詢數(shù)據(jù)sql宇整、分頁信息瓶佳、同步頻率等核心信息。
??注意事項(xiàng)請(qǐng)查看注釋信息鳞青。

2.2霸饲、單表同步配置

input {
    stdin {}
    jdbc {
        type => "jdbc"
         # 數(shù)據(jù)庫連接地址
        jdbc_connection_string => "jdbc:mysql://192.168.1.1:3306/TestDB?characterEncoding=UTF-8&autoReconnect=true""
         # 數(shù)據(jù)庫連接賬號(hào)密碼;
        jdbc_user => "username"
        jdbc_password => "pwd"
         # MySQL依賴包路徑臂拓;
        jdbc_driver_library => "mysql/mysql-connector-java-5.1.34.jar"
         # the name of the driver class for mysql
        jdbc_driver_class => "com.mysql.jdbc.Driver"
         # 數(shù)據(jù)庫重連嘗試次數(shù)
        connection_retry_attempts => "3"
         # 判斷數(shù)據(jù)庫連接是否可用厚脉,默認(rèn)false不開啟
        jdbc_validate_connection => "true"
         # 數(shù)據(jù)庫連接可用校驗(yàn)超時(shí)時(shí)間,默認(rèn)3600S
        jdbc_validation_timeout => "3600"
         # 開啟分頁查詢(默認(rèn)false不開啟)胶惰;
        jdbc_paging_enabled => "true"
         # 單次分頁查詢條數(shù)(默認(rèn)100000,若字段較多且更新頻率較高傻工,建議調(diào)低此值);
        jdbc_page_size => "500"
         # statement為查詢數(shù)據(jù)sql孵滞,如果sql較復(fù)雜中捆,建議配通過statement_filepath配置sql文件的存放路徑;
         # sql_last_value為內(nèi)置的變量坊饶,存放上次查詢結(jié)果中最后一條數(shù)據(jù)tracking_column的值泄伪,此處即為ModifyTime;
         # statement_filepath => "mysql/jdbc.sql"
        statement => "SELECT KeyId,TradeTime,OrderUserName,ModifyTime FROM `DetailTab` WHERE ModifyTime>= :sql_last_value order by ModifyTime asc"
         # 是否將字段名轉(zhuǎn)換為小寫匿级,默認(rèn)true(如果有數(shù)據(jù)序列化蟋滴、反序列化需求染厅,建議改為false);
        lowercase_column_names => false
         # Value can be any of: fatal,error,warn,info,debug津函,默認(rèn)info肖粮;
        sql_log_level => warn
         #
         # 是否記錄上次執(zhí)行結(jié)果,true表示會(huì)將上次執(zhí)行結(jié)果的tracking_column字段的值保存到last_run_metadata_path指定的文件中尔苦;
        record_last_run => true
         # 需要記錄查詢結(jié)果某字段的值時(shí)涩馆,此字段為true,否則默認(rèn)tracking_column為timestamp的值蕉堰;
        use_column_value => true
         # 需要記錄的字段凌净,用于增量同步,需是數(shù)據(jù)庫字段
        tracking_column => "ModifyTime"
         # Value can be any of: numeric,timestamp屋讶,Default value is "numeric"
        tracking_column_type => timestamp
         # record_last_run上次數(shù)據(jù)存放位置冰寻;
        last_run_metadata_path => "mysql/last_id.txt"
         # 是否清除last_run_metadata_path的記錄,需要增量同步時(shí)此字段必須為false皿渗;
        clean_run => false
         #
         # 同步頻率(分 時(shí) 天 月 年)斩芭,默認(rèn)每分鐘同步一次;
        schedule => "* * * * *"
    }
}

filter {
    json {
        source => "message"
        remove_field => ["message"]
    }
    # convert 字段類型轉(zhuǎn)換乐疆,將字段TotalMoney數(shù)據(jù)類型改為float划乖;
    mutate {
        convert => {
            "TotalMoney" => "float"
        }
    }
}
output {
    elasticsearch {
         # host => "192.168.1.1"
         # port => "9200"
         # 配置ES集群地址
        hosts => ["192.168.1.1:9200", "192.168.1.2:9200", "192.168.1.3:9200"]
         # 索引名字,必須小寫
        index => "consumption"
         # 數(shù)據(jù)唯一索引(建議使用數(shù)據(jù)庫KeyID)
        document_id => "%{KeyId}"
    }
    stdout {
        codec => json_lines
    }
}
復(fù)制代碼

2.3挤土、多表同步

多表配置和單表配置的區(qū)別在于input模塊的jdbc模塊有幾個(gè)type琴庵,output模塊就需對(duì)應(yīng)有幾個(gè)type;

input {
    stdin {}
    jdbc {
         # 多表同步時(shí)仰美,表類型區(qū)分迷殿,建議命名為“庫名_表名”,每個(gè)jdbc模塊需對(duì)應(yīng)一個(gè)type咖杂;
        type => "TestDB_DetailTab"

         # 其他配置此處省略庆寺,參考單表配置
         # ...
         # ...
         # record_last_run上次數(shù)據(jù)存放位置;
        last_run_metadata_path => "mysql\last_id.txt"
         # 是否清除last_run_metadata_path的記錄诉字,需要增量同步時(shí)此字段必須為false懦尝;
        clean_run => false
         #
         # 同步頻率(分 時(shí) 天 月 年),默認(rèn)每分鐘同步一次壤圃;
        schedule => "* * * * *"
    }
    jdbc {
         # 多表同步時(shí)陵霉,表類型區(qū)分,建議命名為“庫名_表名”伍绳,每個(gè)jdbc模塊需對(duì)應(yīng)一個(gè)type撩匕;
        type => "TestDB_Tab2"
        # 多表同步時(shí),last_run_metadata_path配置的路徑應(yīng)不一致墨叛,避免有影響止毕;
         # 其他配置此處省略
         # ...
         # ...
    }
}

filter {
    json {
        source => "message"
        remove_field => ["message"]
    }
}

output {
    # output模塊的type需和jdbc模塊的type一致
    if [type] == "TestDB_DetailTab" {
        elasticsearch {
             # host => "192.168.1.1"
             # port => "9200"
             # 配置ES集群地址
            hosts => ["192.168.1.1:9200", "192.168.1.2:9200", "192.168.1.3:9200"]
             # 索引名字,必須小寫
            index => "detailtab1"
             # 數(shù)據(jù)唯一索引(建議使用數(shù)據(jù)庫KeyID)
            document_id => "%{KeyId}"
        }
    }
    if [type] == "TestDB_Tab2" {
        elasticsearch {
            # host => "192.168.1.1"
            # port => "9200"
            # 配置ES集群地址
            hosts => ["192.168.1.1:9200", "192.168.1.2:9200", "192.168.1.3:9200"]
            # 索引名字漠趁,必須小寫
            index => "detailtab2"
            # 數(shù)據(jù)唯一索引(建議使用數(shù)據(jù)庫KeyID)
            document_id => "%{KeyId}"
        }
    }
    stdout {
        codec => json_lines
    }
}
復(fù)制代碼

3扁凛、啟動(dòng)運(yùn)行

在【程序目錄】目錄執(zhí)行以下命令啟動(dòng):

【windows】
bin\logstash.bat -f mysql\jdbc.conf
【linux】
nohup ./bin/logstash -f mysql/jdbc_jx_moretable.conf &
復(fù)制代碼

可新建腳本配置好啟動(dòng)命令,后期直接運(yùn)行即可闯传。
??在【程序目錄】\logs目錄會(huì)有運(yùn)行日志谨朝。

Note:
??5.x/6.X/7.x版本需要jdk8支持,如果默認(rèn)jdk版本不是jdk8甥绿,那么需要在logstash或logstash.lib.sh的行首位置添加兩個(gè)環(huán)境變量:

export JAVA_CMD="/usr/tools/jdk1.8.0_162/bin"
export JAVA_HOME="/usr/tools/jdk1.8.0_162/"
復(fù)制代碼

開機(jī)自啟動(dòng):

  • windows開機(jī)自啟:
    • 方案1:使用windows自帶的任務(wù)計(jì)劃字币;
    • 方案2:nssm注冊(cè)windows服務(wù),blog.csdn.net/u010887744/…
  • linux開機(jī)自啟:

4洗出、問題及解決方案

4.1、數(shù)據(jù)同步后图谷,ES沒有數(shù)據(jù)

output.elasticsearch模塊的index必須是全小寫翩活;

4.2、增量同步后last_run_metadata_path文件內(nèi)容不改變

如果lowercase_column_names配置的不是false便贵,那么tracking_column字段配置的必須是全小寫菠镇。

4.3、提示找不到j(luò)dbc_driver_library

2032 com.mysql.jdbc.Driver not loaded.
Are you sure you've included the correct jdbc driver in :jdbc_driver_library?
復(fù)制代碼

檢測(cè)配置的地址是否正確承璃,如果是linux環(huán)境利耍,注意路徑分隔符是“/”,而不是“\”盔粹。

4.4隘梨、數(shù)據(jù)丟失

statement配置的sql中,如果比較字段使用的是大于“>”玻佩,可能存在數(shù)據(jù)丟失出嘹。
??假設(shè)當(dāng)同步完成后last_run_metadata_path存放的時(shí)間為2019-01-30 20:45:30,而這時(shí)候新入庫一條數(shù)據(jù)的更新時(shí)間也為2019-01-30 20:45:30咬崔,那么這條數(shù)據(jù)將無法同步税稼。
??解決方案:將比較字段使用 大于等于“>=”。

4.5垮斯、數(shù)據(jù)重復(fù)更新

上一個(gè)問題“數(shù)據(jù)丟失”提供的解決方案是比較字段使用“大于等于”郎仆,但這時(shí)又會(huì)產(chǎn)生新的問題。
??假設(shè)當(dāng)同步完成后last_run_metadata_path存放的時(shí)間為2019-01-30 20:45:30兜蠕,而數(shù)據(jù)庫中更新時(shí)間最大值也為2019-01-30 20:45:30扰肌,那么這些數(shù)據(jù)將重復(fù)更新,直到有更新時(shí)間更大的數(shù)據(jù)出現(xiàn)熊杨。
??當(dāng)上述特殊數(shù)據(jù)很多曙旭,且長期沒有新的數(shù)據(jù)更新時(shí)盗舰,會(huì)導(dǎo)致大量的數(shù)據(jù)重復(fù)同步到ES。
??何時(shí)會(huì)出現(xiàn)以上情況呢:①比較字段非“自增”桂躏;②比較字段是程序生成插入钻趋。
解決方案:

  • ①比較字段自增保證不重復(fù)或重復(fù)概率極小(比如使用自增ID或者數(shù)據(jù)庫的timestamp)剂习,這樣就能避免大部分異常情況了蛮位;
  • ②如果確實(shí)存在大量程序插入的數(shù)據(jù),其更新時(shí)間相同鳞绕,且可能長期無數(shù)據(jù)更新失仁,可考慮定期更新數(shù)據(jù)庫中的一條測(cè)試數(shù)據(jù),避免最大值有大量數(shù)據(jù)们何。

4.6萄焦、容災(zāi)

logstash本身無法集群,我們常使用的組合ELK是通過kafka集群變相實(shí)現(xiàn)集群的垂蜗。
??可供選擇的處理方式:①使用任務(wù)程序推送數(shù)據(jù)到kafaka楷扬,由kafka同步數(shù)據(jù)到ES,但任務(wù)程序本身也需要容災(zāi)贴见,并需要考慮重復(fù)推送的問題烘苹;②將logstash加入守護(hù)程序,并輔以第三方監(jiān)控其運(yùn)行狀態(tài)片部。
??具體如何選擇镣衡,需要結(jié)合自身的應(yīng)用場(chǎng)景了。

4.7档悠、海量數(shù)據(jù)同步

為什么會(huì)慢廊鸥?logstash分頁查詢使用臨時(shí)表分頁,每條分頁SQL都是將全集查詢出來當(dāng)作臨時(shí)表辖所,再在臨時(shí)表上分頁查詢惰说。這樣導(dǎo)致每次分頁查詢都要對(duì)主表進(jìn)行一次全表掃描。

SELECT * FROM (SELECT * FROM `ImageCN1`
 WHERE ModifyTime>= '1970-01-01 08:00:00'
 order by ModifyTime asc) AS `t1`
 LIMIT 5000 OFFSET 10000000;
復(fù)制代碼

數(shù)據(jù)量太大缘回,首次同步如何安全過渡同步吆视?
??可考慮在statement對(duì)應(yīng)的sql中加上分頁條件,比如ID在什么范圍酥宴,修改時(shí)間在什么區(qū)間啦吧,將單詞同步的數(shù)據(jù)總量減少。先少量數(shù)據(jù)同步測(cè)試驗(yàn)證拙寡,再根據(jù)測(cè)試情況修改區(qū)間條件啟動(dòng)logstash完成同步授滓。比如將SQL修改為:

SELECT
    * 
FROM
    `ImageCN1` 
WHERE
    ModifyTime < '2018-10-10 10:10:10' AND ModifyTime >= '1970-01-01 08:00:00' 
ORDER BY
    ModifyTime ASC
復(fù)制代碼

當(dāng)同步完ModifyTime<'2018-10-10 10:10:10'區(qū)間的數(shù)據(jù)在修改SQL同步剩余區(qū)間的數(shù)據(jù)。
??這樣需要每次同步后就修改sql,線上運(yùn)營比較繁瑣般堆,是否可以不修改sql在孝,同時(shí)保證同步效率呢?SQL我們可以再修改下:

SELECT
    * 
FROM
    `ImageCN1` 
WHERE
    ModifyTime >= '1970-01-01 08:00:00' 
ORDER BY
    ModifyTime ASC 
    LIMIT 100000
復(fù)制代碼

這樣就能保證每次子查詢的數(shù)據(jù)量不超過10W條淮摔,實(shí)際測(cè)試發(fā)現(xiàn)浑玛,數(shù)據(jù)量很大時(shí)效果很明顯。

[SQL]USE XXXDataDB;
受影響的行: 0
時(shí)間: 0.001s

[SQL]
SELECT
    * 
FROM
    ( SELECT * FROM `ImageCN1` WHERE ModifyTime >= '1970-01-01 08:00:00' ORDER BY ModifyTime ASC ) AS `t1` 
    LIMIT 5000 OFFSET 900000;
受影響的行: 0
時(shí)間: 7.229s

[SQL]
SELECT
    * 
FROM
    ( SELECT * FROM `ImageCN1` WHERE ModifyTime >= '2018-07-18 19:35:10' ORDER BY ModifyTime ASC LIMIT 100000 ) AS `t1` 
    LIMIT 5000 OFFSET 90000
受影響的行: 0
時(shí)間: 1.778s
復(fù)制代碼

測(cè)試可以看出噩咪,SQL不加limit 10W時(shí),越往后分頁查詢?cè)铰模臅r(shí)達(dá)到8S胃碾,而加了limit條件的SQL耗時(shí)穩(wěn)定在2S以內(nèi)。

作者:zxiaofan
鏈接:https://juejin.im/post/5daf2fa56fb9a04e054da1e3
來源:掘金
著作權(quán)歸作者所有筋搏。商業(yè)轉(zhuǎn)載請(qǐng)聯(lián)系作者獲得授權(quán)仆百,非商業(yè)轉(zhuǎn)載請(qǐng)注明出處。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末奔脐,一起剝皮案震驚了整個(gè)濱河市俄周,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌髓迎,老刑警劉巖峦朗,帶你破解...
    沈念sama閱讀 216,591評(píng)論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異排龄,居然都是意外死亡波势,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,448評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門橄维,熙熙樓的掌柜王于貴愁眉苦臉地迎上來尺铣,“玉大人,你說我怎么就攤上這事争舞×莘蓿” “怎么了?”我有些...
    開封第一講書人閱讀 162,823評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵竞川,是天一觀的道長店溢。 經(jīng)常有香客問我,道長流译,這世上最難降的妖魔是什么逞怨? 我笑而不...
    開封第一講書人閱讀 58,204評(píng)論 1 292
  • 正文 為了忘掉前任,我火速辦了婚禮福澡,結(jié)果婚禮上叠赦,老公的妹妹穿的比我還像新娘。我一直安慰自己,他們只是感情好除秀,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,228評(píng)論 6 388
  • 文/花漫 我一把揭開白布糯累。 她就那樣靜靜地躺著,像睡著了一般册踩。 火紅的嫁衣襯著肌膚如雪泳姐。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,190評(píng)論 1 299
  • 那天暂吉,我揣著相機(jī)與錄音胖秒,去河邊找鬼。 笑死慕的,一個(gè)胖子當(dāng)著我的面吹牛阎肝,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播肮街,決...
    沈念sama閱讀 40,078評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼风题,長吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來了嫉父?” 一聲冷哼從身側(cè)響起沛硅,我...
    開封第一講書人閱讀 38,923評(píng)論 0 274
  • 序言:老撾萬榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎绕辖,沒想到半個(gè)月后摇肌,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,334評(píng)論 1 310
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡引镊,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,550評(píng)論 2 333
  • 正文 我和宋清朗相戀三年朦蕴,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片弟头。...
    茶點(diǎn)故事閱讀 39,727評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡吩抓,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出赴恨,到底是詐尸還是另有隱情疹娶,我是刑警寧澤,帶...
    沈念sama閱讀 35,428評(píng)論 5 343
  • 正文 年R本政府宣布伦连,位于F島的核電站雨饺,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏惑淳。R本人自食惡果不足惜额港,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,022評(píng)論 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望歧焦。 院中可真熱鬧移斩,春花似錦、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,672評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至猖任,卻和暖如春你稚,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背朱躺。 一陣腳步聲響...
    開封第一講書人閱讀 32,826評(píng)論 1 269
  • 我被黑心中介騙來泰國打工刁赖, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人长搀。 一個(gè)月前我還...
    沈念sama閱讀 47,734評(píng)論 2 368
  • 正文 我出身青樓乾闰,卻偏偏與公主長得像,于是被迫代替她去往敵國和親盈滴。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,619評(píng)論 2 354