之前的文章有講過(guò)MySQL到Elasticsearch的多種數(shù)據(jù)同步方案(多種MySQL與Elasticsearch的數(shù)據(jù)同步解決方案)损俭,今天再來(lái)講下MySQL到Redis的幾種數(shù)據(jù)同步方案。
第一種方案:使用 canal 工具
在之前MySQL同步ES的文章中有簡(jiǎn)單提過(guò)這款工具,但是因?yàn)闆](méi)有用過(guò)所以沒(méi)有詳細(xì)講刚盈,今天就使用這款工具來(lái)實(shí)現(xiàn)MySQL到Redis的數(shù)據(jù)同步(同理肛著,同步到ES甲喝、MySQL等也是一樣的操作)
canal 是阿里巴巴開(kāi)源的一款提供增量數(shù)據(jù)訂閱和消費(fèi)的工具悠咱,應(yīng)用場(chǎng)景有:數(shù)據(jù)庫(kù)鏡像荣瑟、數(shù)據(jù)庫(kù)實(shí)時(shí)備份扫皱、索引構(gòu)建和實(shí)時(shí)維護(hù)足绅、業(yè)務(wù) cache 刷新捷绑、帶業(yè)務(wù)邏輯的增量數(shù)據(jù)處理等。
原理就與MySQL主從復(fù)制相似编检,canal 模擬 MySQL slave 的交互協(xié)議胎食,偽裝自己為 MySQL slave ,向 MySQL master 發(fā)送dump 協(xié)議允懂,從binlog日志中獲取數(shù)據(jù)厕怜。
-
MySQL配置
MySQL需要開(kāi)啟 Binlog 寫(xiě)入功能,配置 binlog-format 為 ROW 模式
[mysqld] log-bin=mysql-bin # 開(kāi)啟 binlog binlog-format=ROW # 選擇 ROW 模式 server_id=1 # 配置 MySQL replaction 需要定義蕾总,不要和 canal 的 slaveId 重復(fù)
授權(quán) canal 鏈接 MySQL 賬號(hào)具有作為 MySQL slave 的權(quán)限, 如果已有賬戶可直接 grant
CREATE USER canal IDENTIFIED BY 'canal'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; FLUSH PRIVILEGES;
-
下載 canal
下載地址粥航,選擇自己需要的版本:https://github.com/alibaba/canal/releases
wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gz mkdir /usr/local/canal tar -zxvf canal.deployer-1.1.5.tar.gz -C /usr/local/canal/ rm -rf canal.deployer-1.1.5.tar.gz cd /usr/local/canal/
-
修改配置
vi conf/example/instance.properties
## mysql serverId 不能與mysql的server-id一致 canal.instance.mysql.slaveId = 1234 #position info,需要改成自己的數(shù)據(jù)庫(kù)信息 canal.instance.master.address = 127.0.0.1:3306 canal.instance.master.journal.name = canal.instance.master.position = canal.instance.master.timestamp = #canal.instance.standby.address = #canal.instance.standby.journal.name = #canal.instance.standby.position = #canal.instance.standby.timestamp = #username/password生百,需要改成自己的數(shù)據(jù)庫(kù)信息 canal.instance.dbUsername = canal canal.instance.dbPassword = canal canal.instance.defaultDatabaseName = canal.instance.connectionCharset = UTF-8 #table regex canal.instance.filter.regex = .\*\\\\..\*
-
安裝java的JDK
# 查看可安裝jdk版本 yum search java | grep -i --color JDK # 選擇某一版本進(jìn)行安裝 yum install java-1.8.0-openjdk.x86_64 # 安裝完成后確認(rèn)JDK安裝完畢递雀,如果輸出了版本號(hào),證明安裝正確 java -version
PS:這里有個(gè)坑蚀浆,要安裝 JDK8缀程,我用的 JDK11的環(huán)境,發(fā)現(xiàn)啟動(dòng)不了canal市俊,報(bào)錯(cuò)
Error: Could not create the Java Virtual Machine.
杨凑,切換成 JDK8 就好了 -
啟動(dòng)
sh bin/startup.sh
-
檢查
查看 server 日志 和 instance 的日志,有正確的內(nèi)容輸出證明啟動(dòng)成功
vi logs/canal/canal.log vi logs/example/example.log
或者使用
ps -ef | grep canal
查看canal進(jìn)程 -
關(guān)閉命令
sh bin/stop.sh
-
安裝 canal-php
canal 提供了多語(yǔ)言的客戶端摆昧,可采用不同語(yǔ)言實(shí)現(xiàn)不同的消費(fèi)邏輯撩满,我用的PHP客戶端,它的詳細(xì)介紹以及其他語(yǔ)言客戶端看文檔:https://github.com/alibaba/canal/wiki#%E5%A4%9A%E8%AF%AD%E8%A8%80
composer require xingwenge/canal_php
我這通過(guò)創(chuàng)建一個(gè)命令來(lái)測(cè)試
php artisan make:command canal
內(nèi)容
<?php namespace App\Console\Commands; use App\Services\CanalToRedisService; use Illuminate\Console\Command; use xingwenge\canal_php\CanalClient; use xingwenge\canal_php\CanalConnectorFactory; use xingwenge\canal_php\Fmt; class canal extends Command { /** * The name and signature of the console command. * * @var string */ protected $signature = 'canal'; /** * The console command description. * * @var string */ protected $description = 'canal客戶端'; /** * Create a new command instance. * * @return void */ public function __construct() { parent::__construct(); } /** * */ public function handle() { try { // 創(chuàng)建客戶端绅你,默認(rèn)使用 socket 來(lái)通信 $client = CanalConnectorFactory::createClient(CanalClient::TYPE_SOCKET_CLUE); // 這個(gè)是使用 swoole //$client = CanalConnectorFactory::createClient(CanalClient::TYPE_SWOOLE); $client->connect("127.0.0.1", 11111); $client->checkValid(); // $client->subscribe("1001", "example", ".*\\..*"); // 設(shè)置過(guò)濾伺帘,指定要同步的表,上邊那種方式是不限制 $client->subscribe("1001", "example", "lmrs.lmrs_shops"); while (true) { $message = $client->get(100); if ($entries = $message->getEntries()) { foreach ($entries as $entry) { // Fmt::println($entry); // 在這里進(jìn)行具體業(yè)務(wù)的邏輯處理忌锯,比如同步數(shù)據(jù)到 redis伪嫁,es,mysql等 CanalToRedisService::println($entry); } } sleep(1); } $client->disConnect(); } catch (\Exception $e) { echo $e->getMessage(), PHP_EOL; } } }
這里我創(chuàng)建了個(gè)service來(lái)處理同步到Redis的邏輯偶垮,內(nèi)容如下
<?php namespace App\Services; use Com\Alibaba\Otter\Canal\Protocol\Column; use Com\Alibaba\Otter\Canal\Protocol\Entry; use Com\Alibaba\Otter\Canal\Protocol\EntryType; use Com\Alibaba\Otter\Canal\Protocol\EventType; use Com\Alibaba\Otter\Canal\Protocol\RowChange; use Com\Alibaba\Otter\Canal\Protocol\RowData; class CanalToRedisService { /** * @param Entry $entry * @throws \Exception */ public static function println($entry) { switch ($entry->getEntryType()) { case EntryType::TRANSACTIONBEGIN: case EntryType::TRANSACTIONEND: return; break; } $rowChange = new RowChange(); $rowChange->mergeFromString($entry->getStoreValue()); $evenType = $rowChange->getEventType(); $header = $entry->getHeader(); $table = $header->getSchemaName().'_'.$header->getTableName(); /** @var RowData $rowData */ foreach ($rowChange->getRowDatas() as $rowData) { switch ($evenType) { case EventType::DELETE: // 刪除 self::delete($table, self::ptColumn($rowData->getBeforeColumns())); break; case EventType::INSERT: // 新增 self::insert($table, self::ptColumn($rowData->getAfterColumns())); break; default: // 更新 self::update($table, self::ptColumn($rowData->getBeforeColumns()), self::ptColumn($rowData->getAfterColumns())); break; } } } /** * 將數(shù)據(jù)表的字段名和值組裝成數(shù)組 * @param $columns * @return array */ private static function ptColumn($columns) { $argv = []; foreach ($columns as $value) { $argv[$value->getName()] = $value->getValue(); } // dump($argv); return $argv; } /** * 新增操作 * 可以根據(jù)表名進(jìn)行判斷具體的業(yè)務(wù)操作 * @param string $table 數(shù)據(jù)表名 * @param array $data 數(shù)據(jù) */ private static function insert($table, $data) { app('redis')->set("shop::".$data['id'], serialize($data)); } /** * 刪除操作 * 業(yè)務(wù)處理很簡(jiǎn)單礼殊,這里就不寫(xiě)了,自己完善 * @param string $table 數(shù)據(jù)表名 * @param array $data 數(shù)據(jù) */ private static function delete($table, $data) { // } /** * 更新操作 * 業(yè)務(wù)處理很簡(jiǎn)單针史,這里就不寫(xiě)了晶伦,自己完善 * @param string $table 數(shù)據(jù)表名 * @param array $befor_data 更改前的數(shù)據(jù) * @param array $after_data 更改后的數(shù)據(jù) */ private static function update($table, $befor_data, $after_data) { // } }
-
測(cè)試
可以在service里多dump一些參數(shù),運(yùn)行
php artisan canal
查看輸出啄枕,新增MySQL數(shù)據(jù)婚陪,查看Redis是否有變化 canal 還可以結(jié)合消息中間件來(lái)實(shí)現(xiàn)更高效的數(shù)據(jù)同步,比如:Kafka/RocketMQ 频祝。使用文檔:https://github.com/alibaba/canal/wiki/Canal-Kafka-RocketMQ-QuickStart
第二種方案:使用 RabbitMQ 消息隊(duì)列
RabbitMQ 是一個(gè)由erlang語(yǔ)言編寫(xiě)的泌参、開(kāi)源的脆淹、在AMQP基礎(chǔ)上完整的、可復(fù)用的企業(yè)消息系統(tǒng)沽一。支持多種語(yǔ)言盖溺,包括java、Python铣缠、ruby烘嘱、PHP、C/C++等蝗蛙。
RabbitMQ 的核心概念:
- 生產(chǎn)者(Producer):發(fā)送消息的應(yīng)用
- 消費(fèi)者(Consumer):接收消息的應(yīng)用
- 隊(duì)列(Queue):存儲(chǔ)消息的緩存
- 消息(Message):由生產(chǎn)者通過(guò)RabbitMQ發(fā)送給消費(fèi)者的信息
- 連接(Connection):連接RabbitMQ和應(yīng)用服務(wù)器的TCP連接
- 通道(Channel):連接里的一個(gè)虛擬通道蝇庭。當(dāng)你通過(guò)消息隊(duì)列發(fā)送或者接收消息時(shí),這個(gè)操作都是通過(guò)通道進(jìn)行的
- 交換機(jī)(Exchange):交換機(jī)負(fù)責(zé)從生產(chǎn)者那里接收消息捡硅,并根據(jù)交換類(lèi)型分發(fā)到對(duì)應(yīng)的消息列隊(duì)里哮内。要實(shí)現(xiàn)消息的接收,一個(gè)隊(duì)列必須到綁定一個(gè)交換機(jī)
- 綁定(Binding):綁定是隊(duì)列和交換機(jī)的一個(gè)關(guān)聯(lián)連接
- 路由鍵(Routing Key):路由鍵是供交換機(jī)查看并根據(jù)鍵來(lái)決定如何分發(fā)消息到列隊(duì)的一個(gè)鍵壮韭。路由鍵可以說(shuō)是消息的目的地址
RabbitMQ 的工作模式:
- 簡(jiǎn)單隊(duì)列
- 工作隊(duì)列
- 發(fā)布訂閱模式
- 路由模式
- 主題模式
理論介紹完畢北发,接下來(lái)進(jìn)入實(shí)操
-
安裝 RabbitMQ
手動(dòng)編譯安裝 RabbitMQ 很麻煩,還得先安裝 erlang 環(huán)境喷屋,所以這里我就直接使用docker安裝了鲫竞。附上erlang和RabbitMQ的下載地址,之后有時(shí)間再去嘗試手動(dòng)安裝
erlang:https://www.erlang.org/downloads
RabbitMQ:https://github.com/rabbitmq/rabbitmq-server/releases/
拉取docker鏡像
docker pull rabbitmq
構(gòu)建容器
docker run -d -p 5672:5672 -p 15672:15672 --hostname my-rabbit -v /docker/rabbitmq:/var/lib/rabbitmq --privileged=true --name rabbitmq rabbitmq
-
進(jìn)入RabbitMQ 容器安裝可視化界面:rabbitmq_management
docker exec -it rabbitmq bash rabbitmq-plugins enable rabbitmq_management
在瀏覽器訪問(wèn) ip:15672 打開(kāi)可視化界面逼蒙,賬號(hào)密碼默認(rèn)都是:guest
-
安裝擴(kuò)展
PHP調(diào)用RabbitMQ需要amqp的擴(kuò)展,下載地址:https://pecl.php.net/package/amqp
wget https://pecl.php.net/get/amqp-1.10.2.tgz tar -zxvf amqp-1.10.2.tgz cd amqp-1.10.2 phpize ./configure --with-php-config=/usr/local/bin/php-config
到這里報(bào)了一個(gè)錯(cuò)寄疏,
configure: error: librabbitmq not found
意思是還缺少個(gè)rabbitmq-c接著下載是牢,地址:https://github.com/alanxz/rabbitmq-c/releases
wget https://github.com/alanxz/rabbitmq-c/archive/refs/tags/v0.11.0.tar.gz tar -zxvf v0.11.0.tar.gz cd rabbitmq-c-0.11.0/ yum -y install cmake cmake . -DCMAKE_INSTALL_PREFIX=/usr/local/rabbitmq-c make && make install
重新編譯amqp
cd amqp-1.10.2 ./configure --with-php-config=/usr/local/bin/php-config --with-amqp --with-librabbitmq-dir=/usr/local/rabbitmq-c make && make install
在 php.ini 中加入
extension=amqp.so
,然后重啟php -
rabbimq在laravel中使用
安裝組件
composer require vladimir-yuldashev/laravel-queue-rabbitmq "10.X" --ignore-platform-reqs
在 config/queue.php文件的 connections 中加入配置
'rabbitmq' => [ 'driver' => 'rabbitmq', 'queue' => env('RABBITMQ_QUEUE', 'default'), 'connection' => PhpAmqpLib\Connection\AMQPLazyConnection::class, 'hosts' => [ [ 'host' => env('RABBITMQ_HOST', '127.0.0.1'), 'port' => env('RABBITMQ_PORT', 5672), 'user' => env('RABBITMQ_USER', 'guest'), 'password' => env('RABBITMQ_PASSWORD', 'guest'), 'vhost' => env('RABBITMQ_VHOST', '/'), ], ], 'options' => [ 'ssl_options' => [ 'cafile' => env('RABBITMQ_SSL_CAFILE', null), 'local_cert' => env('RABBITMQ_SSL_LOCALCERT', null), 'local_key' => env('RABBITMQ_SSL_LOCALKEY', null), 'verify_peer' => env('RABBITMQ_SSL_VERIFY_PEER', true), 'passphrase' => env('RABBITMQ_SSL_PASSPHRASE', null), ], 'queue' => [ 'job' => VladimirYuldashev\LaravelQueueRabbitMQ\Queue\Jobs\RabbitMQJob::class, ], ], /* * Set to "horizon" if you wish to use Laravel Horizon. */ 'worker' => env('RABBITMQ_WORKER', 'default'), ],
在 .env 文件中加入配置
# 將默認(rèn)的 sync 改為 rabbitmq QUEUE_CONNECTION=rabbitmq # mq的ip地址 RABBITMQ_HOST=172.17.0.10 # mq的端口 RABBITMQ_PORT=5672 # mq的賬號(hào) RABBITMQ_USER=guest # mq的密碼 RABBITMQ_PASSWORD=guest # 默認(rèn)的虛擬主機(jī) RABBITMQ_VHOST=my_vhost # 默認(rèn)隊(duì)列名稱 RABBITMQ_QUEUE=lmrs
-
創(chuàng)建 service
<?php /** * Created by PhpStorm * User: Ricky Wong * Date: 2021/8/5 * Time: 0:49 */ namespace App\Services; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; class RabbitmqService { public static function getConnect() { //RABBITMQ 配置項(xiàng) $config = [ 'host' => env('RABBITMQ_HOST', '127.0.0.1'), 'port' => env('RABBITMQ_PORT', 5672), 'user' => env('RABBITMQ_USER', 'guest'), 'password' => env('RABBITMQ_PASSWORD', 'guest'), 'vhost' => env('RABBITMQ_VHOST', '/'), ]; return new AMQPStreamConnection($config["host"],$config["port"],$config["user"],$config["password"],$config["vhost"]); } /** * 生產(chǎn)者 * @param $queue * @param $messageBody * @param string $exchange */ public static function push($queue,$messageBody,$exchange='router') { //獲取連接 $connection = self::getConnect(); //構(gòu)建通道 $channel = $connection->channel(); //聲明一個(gè)隊(duì)列 $channel->queue_declare($queue,false,true,false,false); //指定交換機(jī) 以路由模式 $channel->exchange_declare($exchange,'direct',false,true,false); //綁定隊(duì)列和類(lèi)型 $channel->queue_bind($queue,$exchange); $message = new AMQPMessage($messageBody,array('content_type' => 'text/plain', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)); //消息推送 $channel->basic_publish($message,$exchange); $channel->close(); $connection->close(); } /** * 消費(fèi)者 * @param $queue * @param $callback * @param string $exchange */ public static function pop($queue,$callback,$exchange='router') { $connection = self::getConnect(); $channel = $connection->channel(); //從隊(duì)列中取出消息 $message = $channel->basic_get($queue); $res = $callback($message->getBody()); if ($res){ //ack 驗(yàn)證 $channel->basic_ack($message->getDeliveryTag()); } $channel->close(); $connection->close(); } }
-
創(chuàng)建異步任務(wù)
php artisan make:job SyncToRedis
編輯內(nèi)容
<?php namespace App\Jobs; use Illuminate\Bus\Queueable; use Illuminate\Contracts\Queue\ShouldQueue; use Illuminate\Foundation\Bus\Dispatchable; use Illuminate\Queue\InteractsWithQueue; use Illuminate\Queue\SerializesModels; use App\Services\RabbitmqService; class SyncToRedis implements ShouldQueue { use Dispatchable, InteractsWithQueue, Queueable, SerializesModels; protected $key; /** * Create a new job instance. * * @param $data */ public function __construct($data) { $this->key = 'lmrs::product::info::'.$data->id; //寫(xiě)入隊(duì)列 RabbitmqService::push('update_queue', $data); } /** * Execute the job. * * @return void */ public function handle() { // 消費(fèi)消息 RabbitmqService::pop('update_queue', function ($message) { $product = app('redis')->set($this->key, serialize($message)); if (!$product){ return; } return true; }); } /** * 異常處理 * @param \Exception $exception */ public function failed(\Exception $exception) { print_r($exception->getMessage()); } }
-
在需要同步的地方觸發(fā)任務(wù)
dispatch(new SyncToRedis(Product::find($request->input("id"))));