萬字長文解密數(shù)據(jù)異構最佳實踐(含完整代碼實現(xiàn))!啦膜!

寫在前面

在當今互聯(lián)網(wǎng)行業(yè)有送,尤其是現(xiàn)在分布式、微服務開發(fā)環(huán)境下僧家,為了提高搜索效率雀摘,以及搜索的精準度,會大量使用Redis八拱、Memcached等NoSQL數(shù)據(jù)庫阵赠,也會使用大量的Solr、Elasticsearch等全文檢索服務和搜索引擎肌稻。那么豌注,這個時候,就會有一個問題需要我們來思考和解決:那就是數(shù)據(jù)同步的問題灯萍!如何將實時變化的數(shù)據(jù)庫中的數(shù)據(jù)同步到Redis/Memcached或者Solr/Elasticsearch中呢轧铁?

互聯(lián)網(wǎng)背景下的數(shù)據(jù)同步需求

在當今互聯(lián)網(wǎng)行業(yè),尤其是現(xiàn)在分布式旦棉、微服務開發(fā)環(huán)境下齿风,為了提高搜索效率,以及搜索的精準度绑洛,會大量使用Redis救斑、Memcached等NoSQL數(shù)據(jù)庫,也會使用大量的Solr真屯、Elasticsearch等全文檢索服務脸候。那么,這個時候绑蔫,就會有一個問題需要我們來思考和解決:那就是數(shù)據(jù)同步的問題运沦!如何將實時變化的數(shù)據(jù)庫中的數(shù)據(jù)同步到Redis/Memcached或者Solr/Elasticsearch中呢?

例如配深,我們在分布式環(huán)境下向數(shù)據(jù)庫中不斷的寫入數(shù)據(jù)携添,而我們讀數(shù)據(jù)可能需要從Redis、Memcached或者Elasticsearch篓叶、Solr等服務中讀取烈掠。那么羞秤,數(shù)據(jù)庫與各個服務中數(shù)據(jù)的實時同步問題,成為了我們亟待解決的問題左敌。

試想瘾蛋,由于業(yè)務需要,我們引入了Redis矫限、Memcached或者Elasticsearch瘦黑、Solr等服務。使得我們的應用程序可能會從不同的服務中讀取數(shù)據(jù)奇唤,如下圖所示幸斥。

img

本質上講,無論我們引入了何種服務或者中間件咬扇,數(shù)據(jù)最終都是從我們的MySQL數(shù)據(jù)庫中讀取出來的甲葬。那么,問題來了懈贺,如何將MySQL中的數(shù)據(jù)實時同步到其他的服務或者中間件呢经窖?

注意:為了更好的說明問題,后面的內容以MySQL數(shù)據(jù)庫中的數(shù)據(jù)同步到Solr索引庫為例進行說明梭灿。

數(shù)據(jù)同步解決方案

1.在業(yè)務代碼中同步

在增加画侣、修改、刪除之后堡妒,執(zhí)行操作Solr索引庫的邏輯代碼配乱。例如下面的代碼片段。

public ResponseResult updateStatus(Long[] ids, String status){
    try{
        goodsService.updateStatus(ids, status);
        if("status_success".equals(status)){
            List<TbItem> itemList = goodsService.getItemList(ids, status);
            itemSearchService.importList(itemList);
            return new ResponseResult(true, "修改狀態(tài)成功")
        }
    }catch(Exception e){
        return new ResponseResult(false, "修改狀態(tài)失敗");
    }
}

優(yōu)點:

操作簡便皮迟。

缺點:

業(yè)務耦合度高搬泥。

執(zhí)行效率變低。

2.定時任務同步

在數(shù)據(jù)庫中執(zhí)行完增加伏尼、修改忿檩、刪除操作后,通過定時任務定時的將數(shù)據(jù)庫的數(shù)據(jù)同步到Solr索引庫中爆阶。

定時任務技術有:SpringTask燥透,Quartz。

哈哈辨图,還有我開源的mykit-delay框架班套,開源地址為:https://github.com/sunshinelyz/mykit-delay

這里執(zhí)行定時任務時徒役,需要注意的一個技巧是:第一次執(zhí)行定時任務時孽尽,從MySQL數(shù)據(jù)庫中以時間字段進行倒序排列查詢相應的數(shù)據(jù)窖壕,并記錄當前查詢數(shù)據(jù)的時間字段的最大值忧勿,以后每次執(zhí)行定時任務查詢數(shù)據(jù)的時候杉女,只要按時間字段倒序查詢數(shù)據(jù)表中的時間字段大于上次記錄的時間值的數(shù)據(jù),并且記錄本次任務查詢出的時間字段的最大值即可鸳吸,從而不需要再次查詢數(shù)據(jù)表中的所有數(shù)據(jù)熏挎。

注意:這里所說的時間字段指的是標識數(shù)據(jù)更新的時間字段,也就是說晌砾,使用定時任務同步數(shù)據(jù)時坎拐,為了避免每次執(zhí)行任務都會進行全表掃描,最好是在數(shù)據(jù)表中增加一個更新記錄的時間字段养匈。

優(yōu)點:

同步Solr索引庫的操作與業(yè)務代碼完全解耦哼勇。

缺點:

數(shù)據(jù)的實時性并不高。

3.通過MQ實現(xiàn)同步

在數(shù)據(jù)庫中執(zhí)行完增加呕乎、修改积担、刪除操作后,向MQ中發(fā)送一條消息猬仁,此時帝璧,同步程序作為MQ中的消費者,從消息隊列中獲取消息湿刽,然后執(zhí)行同步Solr索引庫的邏輯的烁。

我們可以使用下圖來簡單的標識通過MQ實現(xiàn)數(shù)據(jù)同步的過程。

img

我們可以使用如下代碼實現(xiàn)這個過程诈闺。

public ResponseResult updateStatus(Long[] ids, String status){
    try{
        goodsService.updateStatus(ids, status);
        if("status_success".equals(status)){
            List<TbItem> itemList = goodsService.getItemList(ids, status);
            final String jsonString = JSON.toJSONString(itemList);
            jmsTemplate.send(queueSolr, new MessageCreator(){
                @Override
                public Message createMessage(Session session) throws JMSException{
                    return session.createTextMessage(jsonString);
                }
            });
        }
        return new ResponseResult(true, "修改狀態(tài)成功");
    }catch(Exception e){
        return new ResponseResult(false, "修改狀態(tài)失敗");
    }
}

優(yōu)點:

業(yè)務代碼解耦渴庆,并且能夠做到準實時。

缺點:

需要在業(yè)務代碼中加入發(fā)送消息到MQ的代碼雅镊,數(shù)據(jù)調用接口耦合把曼。

4.通過Canal實現(xiàn)實時同步

Canal是阿里巴巴開源的一款數(shù)據(jù)庫日志增量解析組件,通過Canal來解析數(shù)據(jù)庫的日志信息漓穿,來檢測數(shù)據(jù)庫中表結構和數(shù)據(jù)的變化嗤军,從而更新Solr索引庫。

使用Canal可以做到業(yè)務代碼完全解耦晃危,API完全解耦锦聊,可以做到準實時。

Canal簡介

阿里巴巴MySQL數(shù)據(jù)庫binlog增量訂閱與消費組件孟抗,基于數(shù)據(jù)庫增量日志解析疫诽,提供增量數(shù)據(jù)訂閱與消費,目前主要支持了MySQL鳍鸵。

Canal開源地址:https://github.com/alibaba/canal苇瓣。

Canal工作原理

MySQL主從復制的實現(xiàn)

img

從上圖可以看出,主從復制主要分成三步:

  • Master節(jié)點將數(shù)據(jù)的改變記錄到二進制日志(binary log)中(這些記錄叫做二進制日志事件偿乖,binary log events击罪,可以通過show binlog events進行查看)哲嘲。
  • Slave節(jié)點將Master節(jié)點的二進制日志事件(binary log events)拷貝到它的中繼日志(relay log)。
  • Slave節(jié)點重做中繼日志中的事件將改變反映到自己本身的數(shù)據(jù)庫中媳禁。

Canal內部原理

首先眠副,我們來看下Canal的原理圖,如下所示竣稽。

img

原理大致描述如下:

  • Canal 模擬 MySQL slave 的交互協(xié)議囱怕,偽裝自己為 MySQL Slave ,向 MySQL Master 發(fā)送dump 協(xié)議
  • MySQL Master 收到 dump 請求毫别,開始推送 binary log 給 Slave (即 Canal )
  • Canal 解析 binary log 對象(原始為 byte 流)

Canal內部結構

img

說明如下:

  • Server:代表一個Canal運行實例娃弓,對應一個JVM進程。
  • Instance:對應一個數(shù)據(jù)隊列(1個Server對應1個或者多個Instance)岛宦。

接下來忘闻,我們再來看下Instance下的子模塊,如下所示恋博。

img
  • EventParser:數(shù)據(jù)源接入齐佳,模擬Slave協(xié)議和Master節(jié)點進行交互,協(xié)議解析债沮。
  • EventSink:EventParser和EventStore的連接器炼吴,對數(shù)據(jù)進行過濾、加工疫衩、歸并和分發(fā)等處理硅蹦。
  • EventSore:數(shù)據(jù)存儲。
  • MetaManager:增量訂閱和消費信息管理闷煤。

Canal環(huán)境準備

設置MySQL遠程訪問

grant all privileges on *.* to 'root'@'%' identified by '123456';
flush privileges;

MySQL配置

注意:這里的MySQL是基于5.7版本進行說明的童芹。

Canal的原理基于MySQL binlog技術,所以鲤拿,要想使用Canal就要開啟MySQL的binlog寫入功能假褪,建議配置binlog的模式為row。

可以在MySQL命令行輸入如下命令來查看binlog的模式近顷。

SHOW VARIABLES LIKE 'binlog_format';

執(zhí)行效果如下所示生音。

img

可以看到,在MySQL中默認的binlog格式為STATEMENT窒升,這里我們需要將STATEMENT修改為ROW缀遍。修改/etc/my.cnf文件。

vim /etc/my.cnf

在[mysqld]下面新增如下三項配置饱须。

log-bin=mysql-bin  #開啟MySQL二進制日志
binlog_format=ROW #將二進制日志的格式設置為ROW
server_id=1 #server_id需要唯一域醇,不能與Canal的slaveId重復

修改完my.cnf文件后,需要重啟MySQL服務。

service mysqld restart

接下來譬挚,我們再次查看binlog模式锅铅。

SHOW VARIABLES LIKE 'binlog_format';
img

可以看到,此時殴瘦,MySQL的binlog模式已經被設置為ROW了狠角。

MySQL創(chuàng)建用戶授權

Canal的原理是模式自己為MySQL Slave号杠,所以一定要設置MySQL Slave的相關權限蚪腋。這里,需要創(chuàng)建一個主從同步的賬戶姨蟋,并且賦予這個賬戶相關的權限屉凯。

CREATE USER canal@'localhost' IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'localhost';
FLUSH PRIVILEGES;
img

Canal部署安裝

下載Canal

這里,我們以Canal 1.1.1版本進行說明眼溶,小伙伴們可以到鏈接 https://github.com/alibaba/canal/releases/tag/canal-1.1.1 下載Canal 1.1.1版本悠砚。

img

上傳解壓

將下載好的Canal安裝包,上傳到服務器堂飞,并執(zhí)行如下命令進行解壓

mkdir -p /usr/local/canal
tar -zxvf canal.deployer-1.1.1.tar.gz -C /usr/local/canal/

解壓后的目錄如下所示灌旧。

img

各目錄的說明如下:

  • bin:存儲可執(zhí)行腳本。
  • conf:存放配置文件绰筛。
  • lib:存放其他依賴或者第三方庫枢泰。
  • logs:存放的是日志文件。

修改配置文件

在Canal的conf目錄下有一個canal.properties文件铝噩,這個文件中配置的是Canal Server相關的配置衡蚂,在這個文件中有如下一行配置。

canal.destinations=example

這里的example就相當于Canal的一個Instance骏庸,可以在這里配置多個Instance毛甲,多個Instance之間以逗號分隔即可。同時具被,這里的example也對應著Canal的conf目錄下的一個文件夾玻募。也就是說,Canal中的每個Instance實例都對應著conf目錄下的一個子目錄一姿。

接下來补箍,我們需要修改Canal的conf目錄下的example目錄的一個配置文件instance.properties。

vim instance.properties

修改如下配置項啸蜜。

#################################################################
## canal slaveId,注意:不要與MySQL的server_id重復
canal.instance.mysql.slaveId = 1234

#position info坑雅,需要改成自己的數(shù)據(jù)庫信息
canal.instance.master.address = 127.0.0.1:3306
canal.instance.master.journal.name =
canal.instance.master.position =
canal.instance.master.timestamp =

#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =

#username/password,需要改成自己的數(shù)據(jù)庫信息
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName =canaldb
canal.instance.connectionCharset = UTF-8

#table regex
canal.instance.filter.regex = canaldb\\..*
#################################################################

選項含義:

  • canal.instance.mysql.slaveId : mysql集群配置中的serverId概念衬横,需要保證和當前mysql集群中id唯一;
  • canal.instance.master.address: mysql主庫鏈接地址;
  • canal.instance.dbUsername : mysql數(shù)據(jù)庫帳號;
  • canal.instance.dbPassword : mysql數(shù)據(jù)庫密碼;
  • canal.instance.defaultDatabaseName : mysql鏈接時默認數(shù)據(jù)庫;
  • canal.instance.connectionCharset : mysql 數(shù)據(jù)解析編碼;
  • canal.instance.filter.regex : mysql 數(shù)據(jù)解析關注的表裹粤,Perl正則表達式.

啟動Canal

配置完Canal后,就可以啟動Canal了。進入到Canal的bin目錄下遥诉,輸入如下命令啟動Canal拇泣。

./startup.sh

測試Canal

導入并修改源碼

這里,我們使用Canal的源碼進行測試矮锈,下載Canal的源碼后霉翔,將其導入到IDEA中。

img

接下來苞笨,我們找到example下的SimpleCanalClientTest類進行測試债朵。這個類的源碼如下所示。

package com.alibaba.otter.canal.example;

import java.net.InetSocketAddress;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.common.utils.AddressUtils;

/**
 * 單機模式的測試例子
 * 
 * @author jianghang 2013-4-15 下午04:19:20
 * @version 1.0.4
 */
public class SimpleCanalClientTest extends AbstractCanalClientTest {

    public SimpleCanalClientTest(String destination){
           super(destination);
     }

    public static void main(String args[]) {
        // 根據(jù)ip瀑凝,直接創(chuàng)建鏈接序芦,無HA的功能
        String destination = "example";
        String ip = AddressUtils.getHostIp();
        CanalConnector connector = CanalConnectors.newSingleConnector(
            new InetSocketAddress(ip, 11111),
                destination,
                "canal",
                "canal");

        final SimpleCanalClientTest clientTest = new SimpleCanalClientTest(destination);
        clientTest.setConnector(connector);
        clientTest.start();
        Runtime.getRuntime().addShutdownHook(new Thread() {

            public void run() {
                try {
                    logger.info("## stop the canal client");
                    clientTest.stop();
                } catch (Throwable e) {
                    logger.warn("##something goes wrong when stopping canal:", e);
                } finally {
                    logger.info("## canal client is down.");
                }
            }

        });
    }
}

可以看到,這個類中粤咪,使用的destination為example谚中。在這個類中,我們只需要將IP地址修改為Canal Server的IP即可寥枝。

具體為:將如下一行代碼宪塔。

String ip = AddressUtils.getHostIp();

修改為:

String ip = "192.168.175.100"

由于我們在配置Canal時,沒有指定用戶名和密碼囊拜,所以某筐,我們還需要將如下代碼。

CanalConnector connector = CanalConnectors.newSingleConnector(
    new InetSocketAddress(ip, 11111),
    destination,
    "canal",
    "canal");

修改為:

CanalConnector connector = CanalConnectors.newSingleConnector(
    new InetSocketAddress(ip, 11111),
    destination,
    "",
    "");

修改完成后艾疟,運行main方法啟動程序来吩。

測試數(shù)據(jù)變更

接下來,在MySQL中創(chuàng)建一個canaldb數(shù)據(jù)庫蔽莱。

create database canaldb;

此時會在IDEA的命令行輸出相關的日志信息弟疆。

****************************************************
* Batch Id: [7] ,count : [3] , memsize : [149] , Time : 2020-08-05 23:25:35
* Start : [mysql-bin.000007:6180:1540286735000(2020-08-05 23:25:35)] 
* End : [mysql-bin.000007:6356:1540286735000(2020-08-05 23:25:35)] 
****************************************************

接下來,我在canaldb數(shù)據(jù)庫中創(chuàng)建數(shù)據(jù)表盗冷,并對數(shù)據(jù)表中的數(shù)據(jù)進行增刪改查怠苔,程序輸出的日志信息如下所示。

#在mysql進行數(shù)據(jù)變更后仪糖,這里會顯示mysql的bin日志柑司。
****************************************************
* Batch Id: [7] ,count : [3] , memsize : [149] , Time : 2020-08-05 23:25:35
* Start : [mysql-bin.000007:6180:1540286735000(2020-08-05 23:25:35)] 
* End : [mysql-bin.000007:6356:1540286735000(2020-08-05 23:25:35)] 
****************************************************

================> binlog[mysql-bin.000007:6180] , executeTime : 1540286735000(2020-08-05 23:25:35) , gtid : () , delay : 393ms
 BEGIN ----> Thread id: 43
----------------> binlog[mysql-bin.000007:6311] , name[canal,canal_table] , eventType : DELETE , executeTime : 1540286735000(2020-08-05 23:25:35) , gtid : () , delay : 393 ms
id : 8    type=int(10) unsigned
name : 512    type=varchar(255)
----------------
 END ----> transaction id: 249
================> binlog[mysql-bin.000007:6356] , executeTime : 1540286735000(2020-08-05 23:25:35) , gtid : () , delay : 394ms

****************************************************
* Batch Id: [8] ,count : [3] , memsize : [149] , Time : 2020-08-05 23:25:35
* Start : [mysql-bin.000007:6387:1540286869000(2020-08-05 23:25:49)] 
* End : [mysql-bin.000007:6563:1540286869000(2020-08-05 23:25:49)] 
****************************************************

================> binlog[mysql-bin.000007:6387] , executeTime : 1540286869000(2020-08-05 23:25:49) , gtid : () , delay : 976ms
 BEGIN ----> Thread id: 43
----------------> binlog[mysql-bin.000007:6518] , name[canal,canal_table] , eventType : INSERT , executeTime : 1540286869000(2020-08-05 23:25:49) , gtid : () , delay : 976 ms
id : 21    type=int(10) unsigned    update=true
name : aaa    type=varchar(255)    update=true
----------------
 END ----> transaction id: 250
================> binlog[mysql-bin.000007:6563] , executeTime : 1540286869000(2020-08-05 23:25:49) , gtid : () , delay : 977ms

****************************************************
* Batch Id: [9] ,count : [3] , memsize : [161] , Time : 2020-08-05 23:26:22
* Start : [mysql-bin.000007:6594:1540286902000(2020-08-05 23:26:22)] 
* End : [mysql-bin.000007:6782:1540286902000(2020-08-05 23:26:22)] 
****************************************************

================> binlog[mysql-bin.000007:6594] , executeTime : 1540286902000(2020-08-05 23:26:22) , gtid : () , delay : 712ms
 BEGIN ----> Thread id: 43
----------------> binlog[mysql-bin.000007:6725] , name[canal,canal_table] , eventType : UPDATE , executeTime : 1540286902000(2020-08-05 23:26:22) , gtid : () , delay : 712 ms
id : 21    type=int(10) unsigned
name : aaac    type=varchar(255)    update=true
----------------
 END ----> transaction id: 252
================> binlog[mysql-bin.000007:6782] , executeTime : 1540286902000(2020-08-05 23:26:22) , gtid : () , delay : 713ms

數(shù)據(jù)同步實現(xiàn)

需求

將數(shù)據(jù)庫數(shù)據(jù)的變化, 通過canal解析binlog日志, 實時更新到solr的索引庫中。

具體實現(xiàn)

創(chuàng)建工程

創(chuàng)建Maven工程mykit-canal-demo锅劝,并在pom.xml文件中添加如下配置攒驰。

<dependencies>
    <dependency>
        <groupId>com.alibaba.otter</groupId>
        <artifactId>canal.client</artifactId>
        <version>1.0.24</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba.otter</groupId>
        <artifactId>canal.protocol</artifactId>
        <version>1.0.24</version>
    </dependency>
    <dependency>
        <groupId>commons-lang</groupId>
        <artifactId>commons-lang</artifactId>
        <version>2.6</version>
    </dependency>
    <dependency>
        <groupId>org.codehaus.jackson</groupId>
        <artifactId>jackson-mapper-asl</artifactId>
        <version>1.8.9</version>
    </dependency>

    <dependency>
        <groupId>org.apache.solr</groupId>
        <artifactId>solr-solrj</artifactId>
        <version>4.10.3</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.9</version>
        <scope>test</scope>
    </dependency>

</dependencies>

創(chuàng)建log4j配置文件xml

在工程的src/main/resources目錄下創(chuàng)建log4j.properties文件,內容如下所示故爵。

log4j.rootCategory=debug, CONSOLE

# CONSOLE is set to be a ConsoleAppender using a PatternLayout.
log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} %-6r [%15.15t] %-5p %30.30c %x - %m\n

# LOGFILE is set to be a File appender using a PatternLayout.
# log4j.appender.LOGFILE=org.apache.log4j.FileAppender
# log4j.appender.LOGFILE.File=d:\axis.log
# log4j.appender.LOGFILE.Append=true
# log4j.appender.LOGFILE.layout=org.apache.log4j.PatternLayout
# log4j.appender.LOGFILE.layout.ConversionPattern=%d{ISO8601} %-6r [%15.15t] %-5p %30.30c %x - %m\n

創(chuàng)建實體類

在io.mykit.canal.demo.bean包下創(chuàng)建一個Book實體類玻粪,用于測試Canal的數(shù)據(jù)傳輸,如下所示。

package io.mykit.canal.demo.bean;
import org.apache.solr.client.solrj.beans.Field;
import java.util.Date;
public class Book implements Serializable {
    private static final long serialVersionUID = -6350345408771427834L;{

    @Field("id")
    private Integer id;

    @Field("book_name")
    private String name;

    @Field("book_author")
    private String author;

    @Field("book_publishtime")
    private Date publishtime;

    @Field("book_price")
    private Double price;

    @Field("book_publishgroup")
    private String publishgroup;

    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getAuthor() {
        return author;
    }

    public void setAuthor(String author) {
        this.author = author;
    }

    public Date getPublishtime() {
        return publishtime;
    }

    public void setPublishtime(Date publishtime) {
        this.publishtime = publishtime;
    }

    public Double getPrice() {
        return price;
    }

    public void setPrice(Double price) {
        this.price = price;
    }

    public String getPublishgroup() {
        return publishgroup;
    }

    public void setPublishgroup(String publishgroup) {
        this.publishgroup = publishgroup;
    }

    @Override
    public String toString() {
        return "Book{" +
                "id=" + id +
                ", name='" + name + '\'' +
                ", author='" + author + '\'' +
                ", publishtime=" + publishtime +
                ", price=" + price +
                ", publishgroup='" + publishgroup + '\'' +
                '}';
    }
}

其中劲室,我們在Book實體類中伦仍,使用Solr的注解@Field定義了實體類字段與Solr域之間的關系。

各種工具類的實現(xiàn)

接下來很洋,我們就在io.mykit.canal.demo.utils包下創(chuàng)建各種工具類充蓝。

  • BinlogValue

用于存儲binlog分析的每行每列的value值,代碼如下所示喉磁。

package io.mykit.canal.demo.utils;
import java.io.Serializable;
/**
 * 
 * ClassName: BinlogValue <br/> 
 * 
 * binlog分析的每行每列的value值谓苟;<br>
 * 新增數(shù)據(jù):beforeValue 和 value 均為現(xiàn)有值;<br>
 * 修改數(shù)據(jù):beforeValue是修改前的值线定;value為修改后的值娜谊;<br>
 * 刪除數(shù)據(jù):beforeValue和value均是刪除前的值确买; 這個比較特殊主要是為了刪除數(shù)據(jù)時方便獲取刪除前的值<br>
 */
public class BinlogValue implements Serializable {

    private static final long serialVersionUID = -6350345408773943086L;
    
    private String value;
    private String beforeValue;
    
    /**
     * binlog分析的每行每列的value值斤讥;<br>
     * 新增數(shù)據(jù): value:為現(xiàn)有值;<br>
     * 修改數(shù)據(jù):value為修改后的值湾趾;<br>
     * 刪除數(shù)據(jù):value是刪除前的值芭商; 這個比較特殊主要是為了刪除數(shù)據(jù)時方便獲取刪除前的值<br>
     */
    public String getValue() {
        return value;
    }
    public void setValue(String value) {
        this.value = value;
    }
    
    /**
     * binlog分析的每行每列的beforeValue值;<br>
     * 新增數(shù)據(jù):beforeValue為現(xiàn)有值搀缠;<br>
     * 修改數(shù)據(jù):beforeValue是修改前的值铛楣;<br>
     * 刪除數(shù)據(jù):beforeValue為刪除前的值; <br>
     */
    public String getBeforeValue() {
        return beforeValue;
    }
    public void setBeforeValue(String beforeValue) {
        this.beforeValue = beforeValue;
    }
}
  • CanalDataParser

用于解析數(shù)據(jù)艺普,代碼如下所示簸州。

package io.mykit.canal.demo.utils;

import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.commons.lang.SystemUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;

import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
import com.alibaba.otter.canal.protocol.CanalEntry.TransactionBegin;
import com.alibaba.otter.canal.protocol.CanalEntry.TransactionEnd;
import com.google.protobuf.InvalidProtocolBufferException;

/**
 * 解析數(shù)據(jù)
 */
public class CanalDataParser {
    
    protected static final String DATE_FORMAT   = "yyyy-MM-dd HH:mm:ss";
    protected static final String yyyyMMddHHmmss = "yyyyMMddHHmmss";
    protected static final String yyyyMMdd      = "yyyyMMdd";
    protected static final String SEP           = SystemUtils.LINE_SEPARATOR;
    protected static String  context_format     = null;
    protected static String  row_format         = null;
    protected static String  transaction_format = null;
    protected static String row_log = null;
    
    private static Logger logger = LoggerFactory.getLogger(CanalDataParser.class);
    
    static {
        context_format = SEP + "****************************************************" + SEP;
        context_format += "* Batch Id: [{}] ,count : [{}] , memsize : [{}] , Time : {}" + SEP;
        context_format += "* Start : [{}] " + SEP;
        context_format += "* End : [{}] " + SEP;
        context_format += "****************************************************" + SEP;

        row_format = SEP
                     + "----------------> binlog[{}:{}] , name[{},{}] , eventType : {} , executeTime : {} , delay : {}ms"
                     + SEP;

        transaction_format = SEP + "================> binlog[{}:{}] , executeTime : {} , delay : {}ms" + SEP;

        row_log = "schema[{}], table[{}]";
    }

    public static List<InnerBinlogEntry> convertToInnerBinlogEntry(Message message) {
        List<InnerBinlogEntry> innerBinlogEntryList = new ArrayList<InnerBinlogEntry>();
        
        if(message == null) {
            logger.info("接收到空的 message; 忽略");
            return innerBinlogEntryList;
        }
        
        long batchId = message.getId();
        int size = message.getEntries().size();
        if (batchId == -1 || size == 0) {
            logger.info("接收到空的message[size=" + size + "]; 忽略");
            return innerBinlogEntryList;
        }

        printLog(message, batchId, size);
        List<Entry> entrys = message.getEntries();

        //輸出日志
        for (Entry entry : entrys) {
            long executeTime = entry.getHeader().getExecuteTime();
            long delayTime = new Date().getTime() - executeTime;
            
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN) {
                    TransactionBegin begin = null;
                    try {
                        begin = TransactionBegin.parseFrom(entry.getStoreValue());
                    } catch (InvalidProtocolBufferException e) {
                        throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);
                    }
                    // 打印事務頭信息,執(zhí)行的線程id歧譬,事務耗時
                    logger.info("BEGIN ----> Thread id: {}",  begin.getThreadId());
                    logger.info(transaction_format, new Object[] {entry.getHeader().getLogfileName(),
                                String.valueOf(entry.getHeader().getLogfileOffset()), String.valueOf(entry.getHeader().getExecuteTime()), String.valueOf(delayTime) });

                } else if (entry.getEntryType() == EntryType.TRANSACTIONEND) {
                    TransactionEnd end = null;
                    try {
                        end = TransactionEnd.parseFrom(entry.getStoreValue());
                    } catch (InvalidProtocolBufferException e) {
                        throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);
                    }
                    // 打印事務提交信息岸浑,事務id
                    logger.info("END ----> transaction id: {}", end.getTransactionId());
                    logger.info(transaction_format,
                        new Object[] {entry.getHeader().getLogfileName(),  String.valueOf(entry.getHeader().getLogfileOffset()),
                                String.valueOf(entry.getHeader().getExecuteTime()), String.valueOf(delayTime) });
                }
                continue;
            }

            //解析結果
            if (entry.getEntryType() == EntryType.ROWDATA) {
                RowChange rowChage = null;
                try {
                    rowChage = RowChange.parseFrom(entry.getStoreValue());
                } catch (Exception e) {
                    throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);
                }

                EventType eventType = rowChage.getEventType();

                logger.info(row_format, new Object[] { entry.getHeader().getLogfileName(),
                            String.valueOf(entry.getHeader().getLogfileOffset()), entry.getHeader().getSchemaName(),
                            entry.getHeader().getTableName(), eventType, String.valueOf(entry.getHeader().getExecuteTime()), String.valueOf(delayTime) });

                //組裝數(shù)據(jù)結果
                if (eventType == EventType.INSERT || eventType == EventType.DELETE || eventType == EventType.UPDATE) {
                    String schemaName = entry.getHeader().getSchemaName();
                    String tableName = entry.getHeader().getTableName();
                    List<Map<String, BinlogValue>> rows = parseEntry(entry);

                    InnerBinlogEntry innerBinlogEntry = new InnerBinlogEntry();
                    innerBinlogEntry.setEntry(entry);
                    innerBinlogEntry.setEventType(eventType);
                    innerBinlogEntry.setSchemaName(schemaName);
                    innerBinlogEntry.setTableName(tableName.toLowerCase());
                    innerBinlogEntry.setRows(rows);

                    innerBinlogEntryList.add(innerBinlogEntry);
                } else {
                    logger.info(" 存在 INSERT INSERT UPDATE 操作之外的SQL [" + eventType.toString() + "]");
                }
                continue;
            }
        }
        return innerBinlogEntryList;
    }

    private static List<Map<String, BinlogValue>> parseEntry(Entry entry) {
        List<Map<String, BinlogValue>> rows = new ArrayList<Map<String, BinlogValue>>();
        try {
            String schemaName = entry.getHeader().getSchemaName();
            String tableName = entry.getHeader().getTableName();
            RowChange rowChage = RowChange.parseFrom(entry.getStoreValue());
            EventType eventType = rowChage.getEventType();

            // 處理每個Entry中的每行數(shù)據(jù)
            for (RowData rowData : rowChage.getRowDatasList()) {
                StringBuilder rowlog = new StringBuilder("rowlog schema[" + schemaName + "], table[" + tableName + "], event[" + eventType.toString() + "]");
                
                Map<String, BinlogValue> row = new HashMap<String, BinlogValue>();
                List<Column> beforeColumns = rowData.getBeforeColumnsList();
                List<Column> afterColumns = rowData.getAfterColumnsList();
                beforeColumns = rowData.getBeforeColumnsList();
                if (eventType == EventType.DELETE) {//delete
                    for(Column column : beforeColumns) {
                        BinlogValue binlogValue = new BinlogValue();
                        binlogValue.setValue(column.getValue());
                        binlogValue.setBeforeValue(column.getValue());
                        row.put(column.getName(), binlogValue);
                    }
                } else if(eventType == EventType.UPDATE) {//update
                    for(Column column : beforeColumns) {
                        BinlogValue binlogValue = new BinlogValue();
                        binlogValue.setBeforeValue(column.getValue());
                        row.put(column.getName(), binlogValue);
                    }
                    for(Column column : afterColumns) {
                        BinlogValue binlogValue = row.get(column.getName());
                        if(binlogValue == null) {
                            binlogValue = new BinlogValue();
                        }
                        binlogValue.setValue(column.getValue());
                        row.put(column.getName(), binlogValue);
                    }
                } else { // insert
                    for(Column column : afterColumns) {
                        BinlogValue binlogValue = new BinlogValue();
                        binlogValue.setValue(column.getValue());
                        binlogValue.setBeforeValue(column.getValue());
                        row.put(column.getName(), binlogValue);
                    }
                } 
               
                rows.add(row);
                String rowjson = JacksonUtil.obj2str(row);
                
                logger.info("#################################### Data Parse Result ####################################");
                logger.info(rowlog + " , " + rowjson);
                logger.info("#################################### Data Parse Result ####################################");
                logger.info("");
            }
        } catch (InvalidProtocolBufferException e) {
            throw new RuntimeException("parseEntry has an error , data:" + entry.toString(), e);
        }
        return rows;
    }

    private static void printLog(Message message, long batchId, int size) {
        long memsize = 0;
        for (Entry entry : message.getEntries()) {
            memsize += entry.getHeader().getEventLength();
        }

        String startPosition = null;
        String endPosition = null;
        if (!CollectionUtils.isEmpty(message.getEntries())) {
            startPosition = buildPositionForDump(message.getEntries().get(0));
            endPosition = buildPositionForDump(message.getEntries().get(message.getEntries().size() - 1));
        }

        SimpleDateFormat format = new SimpleDateFormat(DATE_FORMAT);
        logger.info(context_format, new Object[] {batchId, size, memsize, format.format(new Date()), startPosition, endPosition });
    }

    private static String buildPositionForDump(Entry entry) {
        long time = entry.getHeader().getExecuteTime();
        Date date = new Date(time);
        SimpleDateFormat format = new SimpleDateFormat(DATE_FORMAT);
        return entry.getHeader().getLogfileName() + ":" + entry.getHeader().getLogfileOffset() + ":" + entry.getHeader().getExecuteTime() + "(" + format.format(date) + ")";
    }
}
  • DateUtils

時間工具類,代碼如下所示瑰步。

package io.mykit.canal.demo.utils;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

public class DateUtils {
    
    private static final String FORMAT_PATTERN = "yyyy-MM-dd HH:mm:ss";
    
    private static SimpleDateFormat sdf = new SimpleDateFormat(FORMAT_PATTERN);
    
    public static Date parseDate(String datetime) throws ParseException{
        if(datetime != null && !"".equals(datetime)){
            return sdf.parse(datetime);
        }
        return null;
    }
    
    
    public static String formatDate(Date datetime) throws ParseException{
        if(datetime != null ){
            return sdf.format(datetime);
        }
        return null;
    }
    
    public static Long formatStringDateToLong(String datetime) throws ParseException{
        if(datetime != null && !"".equals(datetime)){
            Date d =  sdf.parse(datetime);
            return d.getTime();
        }
        return null;
    }
    
    public static Long formatDateToLong(Date datetime) throws ParseException{
        if(datetime != null){
            return datetime.getTime();
        }
        return null;
    }
}
  • InnerBinlogEntry

Binlog實體類矢洲,代碼如下所示。

package io.mykit.canal.demo.utils;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;

public class InnerBinlogEntry {
    
    /**
     * canal原生的Entry
     */
    private Entry entry;
    
    /**
     * 該Entry歸屬于的表名
     */
    private String tableName;
    
    /**
     * 該Entry歸屬數(shù)據(jù)庫名
     */
    private String schemaName;
    
    /**
     * 該Entry本次的操作類型缩焦,對應canal原生的枚舉读虏;EventType.INSERT; EventType.UPDATE; EventType.DELETE;
     */
    private EventType eventType;
    
    private List<Map<String, BinlogValue>> rows = new ArrayList<Map<String, BinlogValue>>();
    
    
    public Entry getEntry() {
        return entry;
    }
    public void setEntry(Entry entry) {
        this.entry = entry;
    }
    public String getTableName() {
        return tableName;
    }
    public void setTableName(String tableName) {
        this.tableName = tableName;
    }
    public EventType getEventType() {
        return eventType;
    }
    public void setEventType(EventType eventType) {
        this.eventType = eventType;
    }
    public String getSchemaName() {
        return schemaName;
    }
    public void setSchemaName(String schemaName) {
        this.schemaName = schemaName;
    }
    public List<Map<String, BinlogValue>> getRows() {
        return rows;
    }
    public void setRows(List<Map<String, BinlogValue>> rows) {
        this.rows = rows;
    }
}
  • JacksonUtil

Json工具類,代碼如下所示袁滥。

package io.mykit.canal.demo.utils;

import java.io.IOException;
import org.codehaus.jackson.JsonGenerationException;
import org.codehaus.jackson.JsonParseException;
import org.codehaus.jackson.map.JsonMappingException;
import org.codehaus.jackson.map.ObjectMapper;


public class JacksonUtil {
    private static ObjectMapper mapper = new ObjectMapper();

    public static String obj2str(Object obj) {
        String json = null;
        try {
            json = mapper.writeValueAsString(obj);
        } catch (JsonGenerationException e) {
            e.printStackTrace();
        } catch (JsonMappingException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return json;
    }

    public static <T> T str2obj(String content, Class<T> valueType) {
        try {
            return mapper.readValue(content, valueType);
        } catch (JsonParseException e) {
            e.printStackTrace();
        } catch (JsonMappingException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }
}

同步程序的實現(xiàn)

準備好實體類和工具類后盖桥,我們就可以編寫同步程序來實現(xiàn)MySQL數(shù)據(jù)庫中的數(shù)據(jù)實時同步到Solr索引庫了,我們在io.mykit.canal.demo.main包中常見MykitCanalDemoSync類题翻,代碼如下所示揩徊。

package io.mykit.canal.demo.main;

import io.mykit.canal.demo.bean.Book;
import io.mykit.canal.demo.utils.BinlogValue;
import io.mykit.canal.demo.utils.CanalDataParser;
import io.mykit.canal.demo.utils.DateUtils;
import io.mykit.canal.demo.utils.InnerBinlogEntry;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetSocketAddress;
import java.text.ParseException;
import java.util.List;
import java.util.Map;

public class SyncDataBootStart {

    private static Logger logger = LoggerFactory.getLogger(SyncDataBootStart.class);

    public static void main(String[] args) throws Exception {

        String hostname = "192.168.175.100";
        Integer port = 11111;
        String destination = "example";

        //獲取CanalServer 連接
        CanalConnector canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress(hostname, port), destination, "", "");

        //連接CanalServer
        canalConnector.connect();

        //訂閱Destination
        canalConnector.subscribe();

        //輪詢拉取數(shù)據(jù)
        Integer batchSize = 5*1024;
        while (true){
            Message message = canalConnector.getWithoutAck(batchSize);

            long messageId = message.getId();
            int size = message.getEntries().size();

            if(messageId == -1 || size == 0){
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }else{
                //進行數(shù)據(jù)同步
                //1. 解析Message對象
                List<InnerBinlogEntry> innerBinlogEntries = CanalDataParser.convertToInnerBinlogEntry(message);

                //2. 將解析后的數(shù)據(jù)信息 同步到Solr的索引庫中.
                syncDataToSolr(innerBinlogEntries);
            }

            //提交確認
            canalConnector.ack(messageId);

        }

    }
    private static void syncDataToSolr(List<InnerBinlogEntry> innerBinlogEntries) throws Exception {
        //獲取solr的連接
        SolrServer solrServer = new HttpSolrServer("http://192.168.175.101:8080/solr");

        //遍歷數(shù)據(jù)集合 , 根據(jù)數(shù)據(jù)集合中的數(shù)據(jù)信息, 來決定執(zhí)行增加, 修改 , 刪除操作 .
        if(innerBinlogEntries != null){
            for (InnerBinlogEntry innerBinlogEntry : innerBinlogEntries) {

                CanalEntry.EventType eventType = innerBinlogEntry.getEventType();

                //如果是Insert, update , 則需要同步數(shù)據(jù)到 solr 索引庫
                if(eventType == CanalEntry.EventType.INSERT || eventType == CanalEntry.EventType.UPDATE){
                    List<Map<String, BinlogValue>> rows = innerBinlogEntry.getRows();
                    if(rows != null){
                        for (Map<String, BinlogValue> row : rows) {
                            BinlogValue id = row.get("id");
                            BinlogValue name = row.get("name");
                            BinlogValue author = row.get("author");
                            BinlogValue publishtime = row.get("publishtime");
                            BinlogValue price = row.get("price");
                            BinlogValue publishgroup = row.get("publishgroup");

                            Book book = new Book();
                            book.setId(Integer.parseInt(id.getValue()));
                            book.setName(name.getValue());
                            book.setAuthor(author.getValue());
                            book.setPrice(Double.parseDouble(price.getValue()));
                            book.setPublishgroup(publishgroup.getValue());
                            book.setPublishtime(DateUtils.parseDate(publishtime.getValue()));


                            //導入數(shù)據(jù)到solr索引庫
                            solrServer.addBean(book);
                            solrServer.commit();
                        }
                    }

                }else if(eventType == CanalEntry.EventType.DELETE){
                    //如果是Delete操作, 則需要刪除solr索引庫中的數(shù)據(jù) .
                    List<Map<String, BinlogValue>> rows = innerBinlogEntry.getRows();
                    if(rows != null){
                        for (Map<String, BinlogValue> row : rows) {
                            BinlogValue id = row.get("id");

                            //根據(jù)ID刪除solr的索引庫
                            solrServer.deleteById(id.getValue());
                            solrServer.commit();
                        }
                    }

                }
            }
        }
    }
}

接下來,啟動SyncDataBootStart類的main方法,監(jiān)聽Canal Server靴拱,而Canal Server監(jiān)聽MySQL binlog的日志變化垃喊,一旦MySQL的binlog日志發(fā)生變化,則SyncDataBootStart會立刻收到變更信息袜炕,并將變更信息解析成Book對象實時更新到Solr庫中本谜。如果在MySQL數(shù)據(jù)庫中刪除了數(shù)據(jù),則也會實時刪除Solr庫中的數(shù)據(jù)偎窘。

部分參考Canal官方文檔:https://github.com/alibaba/canal乌助。

好了,今天就到這兒吧陌知,我是冰河他托,我們下期見~~

?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市仆葡,隨后出現(xiàn)的幾起案子赏参,更是在濱河造成了極大的恐慌,老刑警劉巖沿盅,帶你破解...
    沈念sama閱讀 222,807評論 6 518
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件把篓,死亡現(xiàn)場離奇詭異,居然都是意外死亡腰涧,警方通過查閱死者的電腦和手機韧掩,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 95,284評論 3 399
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來窖铡,“玉大人疗锐,你說我怎么就攤上這事》驯耍” “怎么了滑臊?”我有些...
    開封第一講書人閱讀 169,589評論 0 363
  • 文/不壞的土叔 我叫張陵,是天一觀的道長敌买。 經常有香客問我简珠,道長,這世上最難降的妖魔是什么虹钮? 我笑而不...
    開封第一講書人閱讀 60,188評論 1 300
  • 正文 為了忘掉前任聋庵,我火速辦了婚禮,結果婚禮上芙粱,老公的妹妹穿的比我還像新娘祭玉。我一直安慰自己,他們只是感情好春畔,可當我...
    茶點故事閱讀 69,185評論 6 398
  • 文/花漫 我一把揭開白布脱货。 她就那樣靜靜地躺著岛都,像睡著了一般。 火紅的嫁衣襯著肌膚如雪振峻。 梳的紋絲不亂的頭發(fā)上臼疫,一...
    開封第一講書人閱讀 52,785評論 1 314
  • 那天,我揣著相機與錄音扣孟,去河邊找鬼烫堤。 笑死,一個胖子當著我的面吹牛凤价,可吹牛的內容都是我干的鸽斟。 我是一名探鬼主播,決...
    沈念sama閱讀 41,220評論 3 423
  • 文/蒼蘭香墨 我猛地睜開眼利诺,長吁一口氣:“原來是場噩夢啊……” “哼富蓄!你這毒婦竟也來了?” 一聲冷哼從身側響起慢逾,我...
    開封第一講書人閱讀 40,167評論 0 277
  • 序言:老撾萬榮一對情侶失蹤立倍,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后氛改,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體帐萎,經...
    沈念sama閱讀 46,698評論 1 320
  • 正文 獨居荒郊野嶺守林人離奇死亡比伏,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 38,767評論 3 343
  • 正文 我和宋清朗相戀三年胜卤,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片赁项。...
    茶點故事閱讀 40,912評論 1 353
  • 序言:一個原本活蹦亂跳的男人離奇死亡葛躏,死狀恐怖,靈堂內的尸體忽然破棺而出悠菜,到底是詐尸還是另有隱情舰攒,我是刑警寧澤,帶...
    沈念sama閱讀 36,572評論 5 351
  • 正文 年R本政府宣布悔醋,位于F島的核電站摩窃,受9級特大地震影響,放射性物質發(fā)生泄漏芬骄。R本人自食惡果不足惜猾愿,卻給世界環(huán)境...
    茶點故事閱讀 42,254評論 3 336
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望账阻。 院中可真熱鬧蒂秘,春花似錦、人聲如沸淘太。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,746評論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至撇贺,卻和暖如春赌莺,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背松嘶。 一陣腳步聲響...
    開封第一講書人閱讀 33,859評論 1 274
  • 我被黑心中介騙來泰國打工雄嚣, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人喘蟆。 一個月前我還...
    沈念sama閱讀 49,359評論 3 379
  • 正文 我出身青樓缓升,卻偏偏與公主長得像,于是被迫代替她去往敵國和親蕴轨。 傳聞我的和親對象是個殘疾皇子港谊,可洞房花燭夜當晚...
    茶點故事閱讀 45,922評論 2 361

推薦閱讀更多精彩內容