需求
將數(shù)據(jù)庫中的數(shù)據(jù)同步到 ES,借助 ES 的全文搜索安疗,提高搜索的速度绘搞。
- 需要把新增用戶信息同步到 ElasticSearch 中;
- 用戶信息被 Update 后嚷缭,需要能被更新到 ElasticSearch 中饮亏;
- 支持增量更新耍贾;
- 用戶注銷后阅爽,不能被 ES 搜索到;
JDBC Input Plugin
JDBC Input Plugin 可以將數(shù)據(jù)從數(shù)據(jù)庫讀到 Logstash荐开。
- 需要自己提供所需的 JDBC Driver付翁;
- JDBC Input Plugin 支持定時任務(wù) Scheduling,其語法來自 Rufus-scheduler晃听,其擴展了 Cron百侧,使用 Cron 的語法可以完成任務(wù)的觸發(fā);
- JDBC Input Plugin 支持通過 Tracking_column / sql_last_value 的方式記錄 State能扒,最終實現(xiàn)增量的更新佣渴;
JDBC Input Plugin | 舉個栗子
把驅(qū)動拷貝到 Logstash 的目錄下
/home/lixinlei/application/logstash-7.7.0/logstash-core/lib/jars/mysql-connector-java-8.0.11.jar
準備好 JDBC Input Plugin 需要的 .yaml 文件
input {
jdbc {
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://localhost:3306/db_example?useSSL=false"
jdbc_user => root
jdbc_password => Jiangdi_2018
#啟用增量更新,如果為true初斑,則需要指定 tracking_column
use_column_value => true
#指定追蹤的字段辛润,
tracking_column => "last_updated"
#追蹤字段的類型,目前只有數(shù)字(numeric)和時間類型(timestamp)见秤,默認是數(shù)字類型
tracking_column_type => "numeric"
#記錄最后一次運行的結(jié)果
record_last_run => true
#最后一次運行的結(jié)果的保存位置砂竖,在 Logstash 的工作目錄下
last_run_metadata_path => "jdbc-position.txt"
# 觸發(fā)時執(zhí)行的 SQL,sql_last_value 就是保存在 jdbc-position.txt 中的
statement => "SELECT * FROM user where last_updated >:sql_last_value;"
# Cron 語法鹃答,每秒做一次觸發(fā)
schedule => " * * * * * *"
}
}
output {
elasticsearch {
document_id => "%{id}"
document_type => "_doc"
index => "users"
hosts => ["http://localhost:9200"]
}
stdout{
codec => rubydebug
}
}
準備 Springboot 的程序
- 這個程序的作用主要是對 MySQL 中的一張表做增刪改查乎澄;
用 mysql-demo.yaml 啟動 Logstash
bin/logstash -f mysql-demo.conf
通過 Springboot 程序向 MySQL 中寫入數(shù)據(jù)
curl localhost:8080/demo/add -d name=Mike -d email=mike@xyz.com -d tags=Elasticsearch,IntelliJ
curl localhost:8080/demo/add -d name=Jack -d email=jack@xyz.com -d tags=Mysql,IntelliJ
curl localhost:8080/demo/add -d name=Bob -d email=bob@xyz.com -d tags=Mysql,IntelliJ
- Logstash 讀到新數(shù)據(jù),并寫進 ElasticSearch测摔;
- 通過 Kibana 可以查詢到數(shù)據(jù)
GET users/_search
置济;
更新 MySQL 中的數(shù)據(jù)
curl -X PUT localhost:8080/demo/update -d id=3 -d name=Bob2 -d email=bob2@xyz.com -d tags=Mysql,IntelliJ
Logstash 可以讀到更新,完了把更新也寫到 ElasticSearch 中;
刪除 MySQL 中的用戶
curl -X DELETE localhost:8080/demo/delete -d id=3
Logstash 可以讀到更新浙于,完了把更新也寫到 ElasticSearch 中修噪;
通過給索引 users 設(shè)置 alias,使得查詢只查沒被刪除的用戶
- 通過 view_users 查詢用戶路媚,就只能查到?jīng)]有被刪除的用戶黄琼;
POST /_aliases
{
"actions": [
{
"add": {
"index": "users",
"alias": "view_users",
"filter" : {
"term" : {
"is_deleted" : false
}
}
}
}
]
}
POST view_users/_search
{
}