es常用來解決大數據量下mysql查詢的性能問題顽分,而他們之間的數據同步問題就很關鍵。mysql和es的數據同步方案網上有很多顽馋,在這里總結記錄下我使用過的三種方案皱碘。
還有一個阿里云的開源軟件 canal 也可以解決這個同步問題,他的原理與下邊的 go-mysql-elasticsearch 很像均唉,都是通過監(jiān)控MySQL的binlog日志來實現同步的是晨。但是我沒具體使用過,就不多說它了舔箭,感興趣的自己去搜一下這款工具署鸡。
第一種方案:代碼層面實現
項目開發(fā)我使用的是Laravel框架,所以采用了 Laravel Redis 隊列 + ES API 的方式來實現的數據同步限嫌。
原理:使用 Laravel Redis 隊列靴庆,在代碼中MySQL新增數據之后觸發(fā)異步任務調用 ES 的 API,將數據同步到ES中怒医。
這種方案的好處就是實現和維護簡單炉抒,缺點就是與業(yè)務代碼耦合太重
PS:如何在laravel中接入es這個就不說了,不了解的可以看之前的文章:docker安裝es以及在Laravel中的接入
-
在es中先創(chuàng)建好相應的索引(這是個商城項目稚叹,以新增商品為例)
PUT /products/ { "mappings": { "properties": { "name":{ "type": "text", "analyzer": "ik_smart" }, "long_name":{ "type": "text", "analyzer": "ik_smart" }, "brand_id":{ "type": "integer" }, "category_id":{ "type":"integer" }, "shop_id":{ "type":"integer" }, "price":{ "type":"scaled_float", "scaling_factor":100 }, "sold_count":{ "type":"integer" }, "review_count":{ "type":"integer" }, "status":{ "type":"integer" }, "create_time" : { "type" : "date" }, "last_time" : { "type" : "date" } } } }
-
修改laravel隊列驅動為Redis
# 在.env文件中修改 QUEUE_CONNECTION=redis # 如果要修改更多默認配置在 config/queue.php 文件中
-
在商品模型(App\Models\Product.php)中配置
/** * 取出要同步到 es中的數據 * @return array */ public function toESArray() { $arr = Arr::only($this->toArray(), [ 'id', 'name', 'long_name', 'brand_id', 'category_id', 'shop_id', 'price', 'sold_count', 'review_count', 'status', 'create_time', 'last_time' ]); return $arr; }
-
創(chuàng)建監(jiān)聽任務
php artisan make:job SyncProductToES
-
編寫任務中的代碼
<?php namespace App\Jobs; use App\Models\Product; use Illuminate\Bus\Queueable; use Illuminate\Contracts\Queue\ShouldQueue; use Illuminate\Foundation\Bus\Dispatchable; use Illuminate\Queue\InteractsWithQueue; use Illuminate\Queue\SerializesModels; class SyncProductToES implements ShouldQueue { use Dispatchable, InteractsWithQueue, Queueable, SerializesModels; protected $product; /** * Create a new job instance. * * @return void */ public function __construct(Product $product) { $this->product = $product; } /** * Execute the job. * * @return void */ public function handle() { $data = $this->product->toESArray(); app('es')->index([ 'index' => 'products', 'type' => '_doc', 'id' => $data['id'], 'body' => $data, ]); } }
-
在需要數據同步的地方觸發(fā)這個任務
$form->saved(function (Form $form) { $product = $form->model(); dispatch(new SyncProductToES($product)); });
-
啟動隊列
php artisan queue:work
-
將mysql中已有的數據導入到es中
上述一系列操作焰薄,可以實現增量同步,在每次新增數據時都會寫入es扒袖。舊數據的全量同步我這里通過創(chuàng)建一個 Artisan 命令來實現塞茅。
創(chuàng)建命令
php artisan make:command Elasticsearch/SyncProducts
編寫代碼
<?php namespace App\Console\Commands\Elasticsearch; use App\Models\Product; use Illuminate\Console\Command; class SyncProducts extends Command { /** * The name and signature of the console command. * * @var string */ protected $signature = 'es:sync-products'; /** * The console command description. * * @var string */ protected $description = '將商品數據同步到 Elasticsearch'; /** * Create a new command instance. * * @return void */ public function __construct() { parent::__construct(); } /** * Execute the console command. */ public function handle() { // 獲取 es 對象 $es = app('es'); Product::query() // 使用 chunkById 避免一次性加載過多數據 ->chunkById(100, function ($products) use ($es) { $this->info(sprintf('正在同步 ID 范圍為 %s 至 %s 的商品', $products->first()->id, $products->last()->id)); // 初始化請求體 $req = ['body' => []]; // 遍歷商品 foreach ($products as $product) { // 將商品模型轉為 es 所用的數組 $data = $product->toESArray(); $req['body'][] = [ 'index' => [ '_index' => 'products', '_type' => '_doc', '_id' => $data['id'], ], ]; $req['body'][] = $data; } try { // 使用 bulk 方法批量創(chuàng)建 $es->bulk($req); } catch (\Exception $e) { $this->error($e->getMessage()); } }); $this->info('同步完成'); } }
測試命令
php artisan es:sync-products
-
線上部署
在生產環(huán)境中,一般需要安裝 Horizon 隊列管理工具 和 Supervisor 進程監(jiān)視器 來更好的管理隊列以及提高穩(wěn)定性季率。這兩款工具的安裝配置直接看laravel官方文檔就好野瘦,寫的很詳細:https://learnku.com/docs/laravel/7.x/horizon/7514
第二種方案:使用 go-mysql-elasticsearch 工具
go-mysql-elasticsearch是一款開源的高性能的MySQL數據同步到ES的工具,由go語言開發(fā),編譯及使用非常簡單鞭光。
原理:使用mysqldump獲取當前MySQL的數據吏廉,然后再通過此時binlog的name和position獲取增量數據,再根據binlog構建restful api寫入數據到ES中惰许。
這種方案的好處是數據同步性能非常高席覆,而且與業(yè)務代碼完全解耦;缺點是增加了開發(fā)成本汹买,使用相對復雜佩伤,需要安裝go語言的運行環(huán)境,在多表關聯(lián)同步下操作比較繁瑣
注意事項:(很重要晦毙,一定要看)
GitHub文檔中說使用的版本要求是:MySQL < 8.0 ES < 6.0
但經過測試畦戒,我的版本是 MySQL:8.0.26,ES:7.12.1结序,也可以實現增量同步障斋。只不過不能用mysqldump來同步舊數據,因為MySQL8.0之后與之前版本相比改變挺多徐鹤,目前的 go-mysql-elasticsearch 版本還不支持MySQL8.0的mysqldump
MySQL binlog 格式必須是ROW模式
必須在MySQL配置文件中修改此參數垃环,改為row:binlog_format=row
要同步的MySQL數據表必須包含主鍵,否則直接忽略返敬。這是因為如果數據表沒有主鍵遂庄,UPDATE和DELETE操作就會因為在ES中找不到對應的document而無法進行同步
在 go-mysql-elasticsearch 運行時不能更改MySQL表結構
-
安裝 go
官網下載地址,自行選擇版本:https://golang.google.cn/dl/
[root@VM-0-8-centos]# wget https://golang.google.cn/dl/go1.15.5.linux-amd64.tar.gz [root@VM-0-8-centos]# tar -C /usr/local -zxvf go1.15.5.linux-amd64.tar.gz
或者centos下直接使用yum源安裝
yum install -y go
配置環(huán)境變量(GOPATH 是go項目代碼放置的目錄)
[root@VM-0-8-centos go]# vim /etc/profile export GOROOT=/usr/local/go export GOPATH=/usr/local/app/go export PATH=$PATH:/usr/local/go/bin [root@VM-0-8-centos go]# source /etc/profile
測試劲赠,查看go版本
[root@VM-0-8-centos]# go version go version go1.15.5 linux/amd64 [root@VM-0-8-centos]#
-
安裝 go-mysql-elasticsearch
安裝依賴包
yum install -y gettext-devel openssl-devel perl-CPAN perl-devel zlib-devel
安裝 go-mysql-elasticsearch
PS:因為GitHub國內時常上不去涛目,所以這條命令如果拉取失敗的話就翻個墻,去GitHub下載安裝包
go get github.com/siddontang/go-mysql-elasticsearch
下載完成后會存放到上邊環(huán)境變量中配置的項目地址中凛澎,進入執(zhí)行 make 操作
[root@VM-0-8-centos ~]# cd $GOPATH/src/github.com/siddontang/go-mysql-elasticsearch [root@VM-0-8-centos go-mysql-elasticsearch]# ls clear_vendor.sh cmd Dockerfile elastic etc go.mod go.sum LICENSE Makefile README.md river [root@VM-0-8-centos go-mysql-elasticsearch]# make
安裝完成修改配置文件霹肝,配置文件路徑就是下載的這個安裝包的 etc 目錄下
需要修改的地方我都加了注釋,其他的配置用默認的即可
[root@VM-0-8-centos go-mysql-elasticsearch]# vim etc/river.toml # MySQL address, user and password # user must have replication privilege in MySQL. my_addr = "127.0.0.1:3306" # mysql地址與端口 my_user = "root" # mysql用戶名 my_pass = "" # mysql密碼 my_charset = "utf8" # mysql字符集 # Set true when elasticsearch use https #es_https = false # Elasticsearch address es_addr = "127.0.0.1:9200" # es的地址與端口 # Elasticsearch user and password, maybe set by shield, nginx, or x-pack es_user = "" # es用戶名塑煎,沒有默認為空即可 es_pass = "" # es密碼沫换,沒有默認為空即可 # Path to store data, like master.info, if not set or empty, # we must use this to support breakpoint resume syncing. # TODO: support other storage, like etcd. data_dir = "./var" # 數據存儲目錄 # Inner Http status address stat_addr = "127.0.0.1:12800" stat_path = "/metrics" # pseudo server id like a slave server_id = 1001 # mysql or mariadb flavor = "mysql" # mysqldump execution path # if not set or empty, ignore mysqldump. mysqldump = "mysqldump" # 如果設置為空,則不會同步mysql中現有的舊數據 # if we have no privilege to use mysqldump with --master-data, # we must skip it. #skip_master_data = false # minimal items to be inserted in one bulk bulk_size = 128 # force flush the pending requests if we don't have enough items >= bulk_size flush_bulk_time = "200ms" # Ignore table without primary key skip_no_pk_table = false # MySQL data source [[source]] schema = "test" # 需要同步的mysql數據庫 # Only below tables will be synced into Elasticsearch. # "t_[0-9]{4}" is a wildcard table format, you can use it if you have many sub tables, like table_0000 - table_1023 # I don't think it is necessary to sync all tables in a database. tables = ["t", "t_[0-9]{4}", "tfield", "tfilter"] # 需要同步的mysql數據表 # Below is for special rule mapping # Very simple example # # desc t; # +-------+--------------+------+-----+---------+-------+ # | Field | Type | Null | Key | Default | Extra | # +-------+--------------+------+-----+---------+-------+ # | id | int(11) | NO | PRI | NULL | | # | name | varchar(256) | YES | | NULL | | # +-------+--------------+------+-----+---------+-------+ # # The table `t` will be synced to ES index `test` and type `t`. # 定義mysql和es同步的對應關系最铁,有幾個寫幾個讯赏,下邊多余的可以刪掉 [[rule]] schema = "test" # 需要同步的mysql數據庫 table = "t" # 需要同步的mysql數據表 index = "test" # 需要同步的es索引 type = "t" # 需要同步的es類型,es7之后類型只有一種冷尉,只能設為 _doc # Wildcard table rule, the wildcard table must be in source tables # All tables which match the wildcard format will be synced to ES index `test` and type `t`. # In this example, all tables must have same schema with above table `t`; [[rule]] schema = "test" table = "t_[0-9]{4}" index = "test" type = "t" # Simple field rule # # desc tfield; # +----------+--------------+------+-----+---------+-------+ # | Field | Type | Null | Key | Default | Extra | # +----------+--------------+------+-----+---------+-------+ # | id | int(11) | NO | PRI | NULL | | # | tags | varchar(256) | YES | | NULL | | # | keywords | varchar(256) | YES | | NULL | | # +----------+--------------+------+-----+---------+-------+ # [[rule]] schema = "test" table = "tfield" index = "test" type = "tfield" # 這個配置是定義mysql中的字段對應es中的字段漱挎,如果全都一致可以刪掉這個配置 [rule.field] # Map column `id` to ES field `es_id` id="es_id" # 這個就是指mysql中的id字段對應es中的es_id字段,下邊同理 # Map column `tags` to ES field `es_tags` with array type tags="es_tags,list" # Map column `keywords` to ES with array type keywords=",list" # Filter rule # # desc tfilter; # +-------+--------------+------+-----+---------+-------+ # | Field | Type | Null | Key | Default | Extra | # +-------+--------------+------+-----+---------+-------+ # | id | int(11) | NO | PRI | NULL | | # | c1 | int(11) | YES | | 0 | | # | c2 | int(11) | YES | | 0 | | # | name | varchar(256) | YES | | NULL | | # +-------+--------------+------+-----+---------+-------+ # [[rule]] schema = "test" table = "tfilter" index = "test" type = "tfilter" # Only sync following columns filter = ["id", "name"] # 指定mysql中哪些字段需要同步 # id rule # # desc tid_[0-9]{4}; # +----------+--------------+------+-----+---------+-------+ # | Field | Type | Null | Key | Default | Extra | # +----------+--------------+------+-----+---------+-------+ # | id | int(11) | NO | PRI | NULL | | # | tag | varchar(256) | YES | | NULL | | # | desc | varchar(256) | YES | | NULL | | # +----------+--------------+------+-----+---------+-------+ # [[rule]] schema = "test" table = "tid_[0-9]{4}" index = "test" type = "t" # The es doc's id will be `id`:`tag` # It is useful for merge muliple table into one type while theses tables have same PK id = ["id", "tag"]
再提供個本次測試使用的配置文件雀哨,去掉了所有的注釋磕谅,這樣看起來簡潔一點
my_addr = "172.17.0.4:3306" my_user = "root" my_pass = "root" my_charset = "utf8" es_addr = "172.17.0.7:9200" es_user = "" es_pass = "" data_dir = "/docker/data" stat_addr = "127.0.0.1:12800" stat_path = "/metrics" server_id = 1001 flavor = "mysql" mysqldump = "" bulk_size = 128 flush_bulk_time = "200ms" skip_no_pk_table = false [[source]] schema = "lmrs" tables = ["lmrs_products"] [[rule]] schema = "lmrs" table = "lmrs_products" index = "products" type = "_doc" filter = ["id","name","long_name","brand_id","shop_id","price","sold_count","review_count","status","create_time","last_time","three_category_id"] [rule.field] mysql = "three_category_id" elastic = "category_id"
啟動 go-mysql-elasticsearch,輸出以下信息證明成功
[root@VM-0-8-centos go-mysql-elasticsearch]# ./bin/go-mysql-elasticsearch -config=./etc/river.toml [2021/08/01 13:37:06] [info] binlogsyncer.go:141 create BinlogSyncer with config {1001 mysql 127.0.0.1 3306 root utf8mb4 false false <nil> false UTC false 0 0s 0s 0 false 0} [2021/08/01 13:37:06] [info] dump.go:180 skip dump, use last binlog replication pos (mysql-bin.000001, 2606) or GTID set <nil> [2021/08/01 13:37:06] [info] binlogsyncer.go:362 begin to sync binlog from position (mysql-bin.000001, 2606) [2021/08/01 13:37:06] [info] binlogsyncer.go:211 register slave for master server 127.0.0.1:3306 [2021/08/01 13:37:06] [info] sync.go:25 start sync binlog at binlog file (mysql-bin.000001, 2606) [2021/08/01 13:37:06] [info] binlogsyncer.go:731 rotate to (mysql-bin.000001, 2606) [2021/08/01 13:37:06] [info] sync.go:71 rotate binlog to (mysql-bin.000001, 2606) [2021/08/01 13:37:06] [info] master.go:54 save position (mysql-bin.000001, 2606)
-
如果覺得上述兩步太麻煩,可以直接使用docker來安裝 go-mysql-elasticsearch怜庸,鏡像中自帶了go語言環(huán)境
拉取鏡像
docker pull gozer/go-mysql-elasticsearch
構建容器,其中 river.toml 配置文件與上邊的內容一樣
docker run -p 12345:12345 -d --name go-mysql-es -v /docker/go-mysql-es/river.toml:/config/river.toml --privileged=true gozer/go-mysql-elasticsearch
第三種方案:使用 Logstash 工具
Logstash 是免費且開放的服務器端數據處理管道垢村,能夠從多個來源采集數據割疾,轉換數據,然后將數據發(fā)送到您最喜歡的“存儲庫”中嘉栓,可與各種部署集成宏榕。 它提供了大量插件,可幫助你解析侵佃,豐富麻昼,轉換和緩沖來自各種來源的數據。 如果你的數據需要 Beats 中沒有的其他處理馋辈,則需要將 Logstash 添加到部署中抚芦。
這個工具不止可以用來做mysql到es的數據同步,它的應用場景還有:日志搜索器( logstash采集迈螟、處理叉抡、轉發(fā)到elasticsearch存儲,在kibana進行展示)答毫、elk日志分析(elasticsearch + logstash + kibana)等褥民。
它既可以全量同步舊數據,也可以增量同步新數據洗搂,而且對mysql和es沒有版本方面的限制消返,只需對應版本即可
-
安裝
官方下載地址:https://www.elastic.co/cn/downloads/past-releases#logstash
PS: logstash 的版本一定要和 es 保持一致,我的 es 是 7.12.1 版本耘拇,所以 logstash 也下載的 7.12.1 版本
wget https://artifacts.elastic.co/downloads/logstash/logstash-7.12.1-linux-x86_64.tar.gz
也可以直接使用docker安裝撵颊,更方便
docker pull logstash:7.12.1
-
安裝兩個插件
logstash-input-jdbc:連接讀取mysql中數據的插件(6.0之后的版本已經自帶了,再次安裝會提示報錯)
logstash-output-elasticsearch:數據輸出到es的插件
[root@localhost]# tar -C /usr/local -zxvf logstash-7.12.1-linux-x86_64.tar.gz [root@localhost]# cd /usr/local/logstash-7.12.1/bin [root@localhost bin]# ./logstash-plugin install logstash-input-jdbc ... ERROR: Installation aborted, plugin 'logstash-input-jdbc' is already provided by 'logstash-integration-jdbc' [root@localhost bin]# ./logstash-plugin install logstash-output-elasticsearch ... Installation successful [root@localhost bin]#
-
下載 jdbc 的 mysql-connection.jar 包惫叛,版本與自己的 mysql 版本保持一致
[root@localhost logstash-7.12.1]# mkdir pipeline [root@localhost logstash-7.12.1]# cd pipeline/ [root@localhost pipeline]# wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.26/mysql-connector-java-8.0.26.jar
-
更改配置文件
[root@localhost logstash-7.12.1]# vi config/logstash.yml # 加入以下內容秦驯,下邊那個是es的地址,根據自己的情況改 http.host: "0.0.0.0" xpack.monitoring.elasticsearch.hosts: ["http://172.17.0.2:9200"] [root@localhost logstash-7.12.1]# vi config/pipelines.yml # 加入以下內容挣棕,路徑同樣也是根據自己實際的來 pipeline.id: table1 path.config: "/usr/local/logstash-7.12.1/pipeline/logstash.config"
-
創(chuàng)建上邊配置里的指定的配置文件 logstash.config
vi pipeline/logstash.config
input { stdin {} # 可以有多個jdbc译隘,來同步不同的數據表 jdbc { # 類型,區(qū)分開每個 jdbc洛心,以便輸出的時候做判斷 type => "product" # 注意mysql連接地址一定要用ip固耘,不能使用localhost等 jdbc_connection_string => "jdbc:mysql://172.17.0.4:3306/lmrs" jdbc_user => "root" jdbc_password => "root" # 數據庫重連嘗試次數 connection_retry_attempts => "3" # 數據庫連接校驗超時時間,默認為3600s jdbc_validation_timeout => "3600" # 這個jar包就是上邊下載那個词身,可以是絕對路徑也可以是相對路徑厅目,把地址寫對 jdbc_driver_library => "/usr/local/logstash-7.12.1/pipeline/mysql-connector-java-8.0.26.jar" # 驅動類名 jdbc_driver_class => "com.mysql.jdbc.Driver" # 開啟分頁,默認是 false jdbc_paging_enabled => "true" # 單次分頁查詢條數(默認100000,字段較多的話损敷,可以適當調整這個數值) jdbc_page_size => "50000" # 要執(zhí)行的sql葫笼,從這查出的數據就會同步到es中 statement => "select id,`name`,long_name,brand_id,three_category_id as category_id,shop_id,price,status,sold_count,review_count,create_time,last_time from lmrs_products" # 執(zhí)行的sql文件路徑,這與上邊的 statement 參數 二選一 # statement_filepath => "/usr/local/logstash-7.12.1/pipeline/products.sql" # 是否將字段名轉為小寫拗馒,默認為true(如果具備序列化或者反序列化路星,建議設置為false) lowercase_column_names => false # 需要記錄查詢結果某字段的值時,此字段為true诱桂,否則默認tracking_colum為timestamp的值 use_column_value => true # 需要記錄的字段洋丐,同于增量同步,需要是數據庫字段 tracking_column => id # 記錄字段的數據類型 tracking_column_type => numeric # 上次數據存放位置 record_last_run => true # 上一個sql_last_value的存放路徑挥等,必須在文件中指定字段的初始值友绝,手動創(chuàng)建文件并賦予讀寫權限 last_run_metadata_path => "/usr/local/logstash-7.12.1/pipeline/products.txt" # 是否清除last_run_metadata_path的記錄,需要增量同步這個字段的值必須為false clean_run => false # 設置定時任務間隔 含義:分肝劲、時猜揪、天碧库、月蝇裤、年柄错,全部為*默認為每分鐘跑一次任務 schedule => "* * * * *" } } output { # 判斷類型 if [type] == "product" { # es的配置 elasticsearch { hosts => "172.17.0.2:9200" index => "products" document_type => "_doc" document_id => "%{id}" } } # 日志輸出 stdout { codec => json_lines } }
-
啟動 Logstash(--config.reload.automatic 選項啟用自動配置重新加載,不必在每次修改配置文件時停止并重新啟動 Logstash)
[root@localhost logstash-7.12.1]# ./bin/logstash -f pipeline/logstash.config --config.reload.automatic
瀏覽器訪問 ip:9600 可以打印出以下信息證明啟動成功
{"host":"localhost.localdomain","version":"7.12.1","http_address":"0.0.0.0:9600","id":"15320442-569b-4bfd-a0d6-4c71619bc06d","name":"localhost.localdomain","ephemeral_id":"f6868c4c-fff1-4b6a-89d9-4ca7ea469c6e","status":"green","snapshot":false,"pipeline":{"workers":4,"batch_size":125,"batch_delay":50},"build_date":"2021-04-20T19:51:54Z","build_sha":"a0a95c823ae2da19a75f44a01784665e7ad23d15","build_snapshot":false}
總結
go-mysql-elasticsearch 和 Logstash 工具都可以放到 Supervisor 中來管控催蝗,來提高穩(wěn)定性切威。
這三種方案總的來說各有利弊,至于選擇哪種個人認為:如果項目不是特別大丙号,數據量增長速度也不快先朦,對性能沒太高的要求的話可以考慮第一種,因為實現簡單犬缨,利于維護喳魏;如果對雙方同步性能要求比較高,或者數據量很大的情況下怀薛,就考慮后兩種刺彩。