mysql 作為成熟穩(wěn)定的數(shù)據(jù)持久化解決方案,廣泛地應(yīng)用在各種領(lǐng)域,但是在數(shù)據(jù)分析方面稍有不足,而 elasticsearch 作為數(shù)據(jù)分析領(lǐng)域的佼佼者,剛好可以彌補(bǔ)這項(xiàng)不足侧啼,而我們要做的只需要將 mysql 中的數(shù)據(jù)同步到 elasticsearch 中即可,而 logstash 剛好就可以支持堪簿,所有你需要做的只是寫(xiě)一個(gè)配置文件而已
logstash 獲取
獲取 logstash
wget https://artifacts.elastic.co/downloads/logstash/logstash-6.2.3.zip
unzip logstash-6.2.3.zip && cd logstash-6.2.3
安裝 jdbc 和 elasticsearch 插件
bin/logstash-plugin install logstash-input-jdbc
bin/logstash-plugin install logstash-output-elasticsearch
獲取 jdbc mysql 驅(qū)動(dòng)
wget https://cdn.mysql.com//Downloads/Connector-J/mysql-connector-java-5.1.46.zip
unzip mysql-connector-java-5.1.46.zip
編寫(xiě)配置文件
logstash-input-jdbc
使用 logstash-input-jdbc 插件讀取 mysql 的數(shù)據(jù)痊乾,這個(gè)插件的工作原理比較簡(jiǎn)單,就是定時(shí)執(zhí)行一個(gè) sql戴甩,然后將 sql 執(zhí)行的結(jié)果寫(xiě)入到流中符喝,增量獲取的方式?jīng)]有通過(guò) binlog 方式同步,而是用一個(gè)遞增字段作為條件去查詢(xún)甜孤,每次都記錄當(dāng)前查詢(xún)的位置协饲,由于遞增的特性畏腕,只需要查詢(xún)比當(dāng)前大的記錄即可獲取這段時(shí)間內(nèi)的全部增量,一般的遞增字段有兩種茉稠,AUTO_INCREMENT
的主鍵 id
和 ON UPDATE CURRENT_TIMESTAMP
的 update_time
字段描馅,id
字段只適用于那種只有插入沒(méi)有更新的表,update_time
更加通用一些而线,建議在 mysql 表設(shè)計(jì)的時(shí)候都增加一個(gè) update_time
字段
input {
jdbc {
jdbc_driver_library => "../mysql-connector-java-5.1.46/mysql-connector-java-5.1.46-bin.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://<mysql_host>:3306/rta"
jdbc_user => "<username>"
jdbc_password => "<password>"
schedule => "* * * * *"
statement => "SELECT * FROM table WHERE update_time >= :sql_last_value"
use_column_value => true
tracking_column_type => "timestamp"
tracking_column => "update_time"
last_run_metadata_path => "syncpoint_table"
}
}
-
jdbc_driver_library
: jdbc mysql 驅(qū)動(dòng)的路徑铭污,在上一步中已經(jīng)下載 -
jdbc_driver_class
: 驅(qū)動(dòng)類(lèi)的名字,mysql 填com.mysql.jdbc.Driver
就好了 -
jdbc_connection_string
: mysql 地址 -
jdbc_user
: mysql 用戶(hù) -
jdbc_password
: mysql 密碼 -
schedule
: 執(zhí)行 sql 時(shí)機(jī)膀篮,類(lèi)似 crontab 的調(diào)度 -
statement
: 要執(zhí)行的 sql嘹狞,以 ":" 開(kāi)頭是定義的變量,可以通過(guò) parameters 來(lái)設(shè)置變量誓竿,這里的sql_last_value
是內(nèi)置的變量磅网,表示上一次 sql 執(zhí)行中 update_time 的值,這里update_time
條件是>=
因?yàn)闀r(shí)間有可能相等筷屡,沒(méi)有等號(hào)可能會(huì)漏掉一些增量 -
use_column_value
: 使用遞增列的值 -
tracking_column_type
: 遞增字段的類(lèi)型涧偷,numeric
表示數(shù)值類(lèi)型,timestamp
表示時(shí)間戳類(lèi)型 -
tracking_column
: 遞增字段的名稱(chēng),這里使用 update_time 這一列毙死,這列的類(lèi)型是timestamp
-
last_run_metadata_path
: 同步點(diǎn)文件燎潮,這個(gè)文件記錄了上次的同步點(diǎn),重啟時(shí)會(huì)讀取這個(gè)文件扼倘,這個(gè)文件可以手動(dòng)修改
logstash-output-elasticsearch
output {
elasticsearch {
hosts => ["172.31.22.165", "172.31.17.241", "172.31.30.84", "172.31.18.178"]
user => "<user>"
password => "<password>"
index => "table"
document_id => "%{id}"
}
}
-
hosts
: es 集群地址 -
user
: es 用戶(hù)名 -
password
: es 密碼 -
index
: 導(dǎo)入到 es 中的 index 名确封,這里我直接設(shè)置成了 mysql 表的名字 -
document_id
: 導(dǎo)入到 es 中的文檔 id,這個(gè)需要設(shè)置成主鍵唉锌,否則同一條記錄更新后在 es 中會(huì)出現(xiàn)兩條記錄隅肥,%{id}
表示引用 mysql 表中id
字段的值
運(yùn)行
把上面的代碼保存到一個(gè)配置文件里面 sync_table.cfg
,執(zhí)行下面命令即可
cd logstash-6.2.3 && bin/logstash -f config/sync_table.cfg
如果成功了會(huì)在標(biāo)準(zhǔn)輸出輸出執(zhí)行的 sql 語(yǔ)句
[2018-04-14T18:12:00,278][INFO ][logstash.inputs.jdbc ] (0.001011s) SELECT version()
[2018-04-14T18:12:00,284][INFO ][logstash.inputs.jdbc ] (0.000723s) SELECT * FROM table WHERE update_time > '2018-04-14 17:55:00'
其他問(wèn)題
多表同步
一個(gè) logstash 實(shí)例可以借助 pipelines 機(jī)制同步多個(gè)表袄简,只需要寫(xiě)多個(gè)配置文件就可以了,假設(shè)我們有兩個(gè)表 table1 和 table2泛啸,對(duì)應(yīng)兩個(gè)配置文件 sync_table1.cfg
和 sync_table2.cfg
在 config/pipelines.yml
中配置
- pipeline.id: table1
path.config: "config/sync_table1.cfg"
- pipeline.id: table2
path.config: "config/sync_table2.cfg"
直接 bin/logstash
啟動(dòng)即可
@timestamp
字段
默認(rèn)情況下 @timestamp
字段是 logstash-input-jdbc 添加的字段绿语,默認(rèn)是當(dāng)前時(shí)間,這個(gè)字段在數(shù)據(jù)分析的時(shí)候非常有用候址,但是有時(shí)候我們希望使用數(shù)據(jù)中的某些字段來(lái)指定這個(gè)字段吕粹,這個(gè)時(shí)候可以使用 filter.date, 這個(gè)插件是專(zhuān)門(mén)用來(lái)設(shè)置 @timestamp
這個(gè)字段的
比如我有我希望用字段 timeslice
來(lái)表示 @timestamp
,timeslice
是一個(gè)字符串岗仑,格式為 %Y%m%d%H%M
filter {
date {
match => [ "timeslice", "yyyyMMddHHmm" ]
timezone => "Asia/Shanghai"
}
}
把這一段配置加到 sync_table.cfg
中匹耕,現(xiàn)在 @timestamp
和 timeslice
一致了
參考鏈接
- logstash-input-jdbc 插件: https://www.elastic.co/guide/en/logstash/current/plugins-inputs-jdbc.html
- logstash-output-elasticsearch 插件: https://www.elastic.co/guide/en/logstash/current/plugins-outputs-elasticsearch.html
- logstash-multiple-piplines: https://www.elastic.co/blog/logstash-multiple-pipelines
- logstash-filter-date 插件: https://www.elastic.co/guide/en/logstash/current/plugins-filters-date.html
轉(zhuǎn)載請(qǐng)注明出處
本文鏈接:http://www.hatlonely.com/2018/04/14/logstash-mysql-%E5%87%86%E5%AE%9E%E6%97%B6%E5%90%8C%E6%AD%A5%E5%88%B0-elasticsearch/