阿里Canal框架(數(shù)據(jù)同步中間件)初步實踐

背景介紹

早期络凿,阿里巴巴B2B公司因為存在杭州和美國雙機房部署,存在跨機房同步的業(yè)務(wù)需求昂羡。不過早期的數(shù)據(jù)庫同步業(yè)務(wù)絮记,主要是基于trigger的方式獲取增量變更,不過從2010年開始紧憾,阿里系公司開始逐步的嘗試基于數(shù)據(jù)庫的日志解析到千,獲取增量變更進行同步,由此衍生出了增量訂閱&消費的業(yè)務(wù)赴穗,從此開啟了一段新紀元憔四。

適用版本

支持mysql5.7及以下版本

傳統(tǒng)的主從同步原理

master將數(shù)據(jù)記錄到了binlog日志里面膀息,然后slave會通過一個io線程去讀取master那邊指定位置點開始的binlog日志內(nèi)容,并將相應(yīng)的信息寫會到slave這邊的relay日志里面了赵,最后slave會有單獨的sql線程來讀取這些master那邊執(zhí)行的sql語句記錄潜支,達成兩端的數(shù)據(jù)同步。

傳統(tǒng)的mysql主從同步實現(xiàn)的原理圖如下所示:

Canal中間件功能

基于純java語言開發(fā)柿汛,可以用于做增量數(shù)據(jù)訂閱和消費功能冗酿。

相比于傳統(tǒng)的數(shù)據(jù)同步,我們通常需要進行先搭建主從架構(gòu)络断,然后使用binlog日志進行讀取裁替,然后指定需要同步的數(shù)據(jù)庫,數(shù)據(jù)庫表等信息貌笨。但是隨著我們業(yè)務(wù)的不斷復(fù)雜弱判,這種傳統(tǒng)的數(shù)據(jù)同步方式以及開始變得較為繁瑣,不夠靈活锥惋。

canal模擬mysql slave的交互協(xié)議昌腰,偽裝自己為mysql slave,向mysql master發(fā)送dump協(xié)議mysql master收到dump請求膀跌,開始推送binary log給slave(也就是canal)遭商,canal解析binary log對象(原始為byte流),通過對binlog數(shù)據(jù)進行解析即可獲取需要同步的數(shù)據(jù)捅伤,在進行同步數(shù)據(jù)的過程中還可以加入開發(fā)人員的一些額外邏輯處理劫流,比較開放。

Binlog的三種基本類型分別為:

STATEMENT模式只記錄了sql語句暑认,但是沒有記錄上下文信息困介,在進行數(shù)據(jù)恢復(fù)的時候可能會導致數(shù)據(jù)的丟失情況

ROW模式除了記錄sql語句之外,還會記錄每個字段的變化情況蘸际,能夠清楚的記錄每行數(shù)據(jù)的變化歷史,但是會占用較多的空間徒扶,需要使用mysqlbinlog工具進行查看粮彤。

MIX模式比較靈活的記錄励烦,例如說當遇到了表結(jié)構(gòu)變更的時候艺晴,就會記錄為statement模式。當遇到了數(shù)據(jù)更新或者刪除情況下就會變?yōu)閞ow模式

Canal環(huán)境搭建

需要先登錄mysql數(shù)據(jù)庫悠栓,檢查binlog功能是否有開啟圈澈。

mysql> show variables like 'log_bin';

+---------------+-------+

| Variable_name | Value |

+---------------+-------+

| log_bin | OFF |

+---------------+-------+

1 row in set (0.00 sec)

如果顯示狀態(tài)為OFF表示該功能未開啟惫周,那么這個時候就需要到my.ini里面進行相關(guān)配置了,在原來的my.ini配置底部插入以下內(nèi)容:

server-id=192

log-bin=mysql-bin

binlog_format = ROW

當再次通過客戶端查看log_bin狀態(tài)為ON的時候康栈,就表示binlog已經(jīng)開啟:

mysql> show variables like 'log_bin';

+---------------+-------+

| Variable_name | Value |

+---------------+-------+

| log_bin | ON |

+---------------+-------+

1 row in set (0.00 sec)

然后在mysql里面添加以下的相關(guān)用戶和權(quán)限:

CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';

GRANT SHOW VIEW, SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';

FLUSH PRIVILEGES;

開啟之后递递,我們可以前往canal的官方地址進行相應(yīng)版本的安裝包進行下載:

https://github.com/alibaba/canal/releases

下載好指定的版本之后喷橙,找到里面的bin目錄底下的startup腳本,啟動登舞。

啟動之后會發(fā)現(xiàn)黑窗停止在這樣一行的內(nèi)容上贰逾,然后就不動了

Java HotSpot(TM) 64-Bit Server VM warning: ignoring option PermSize=128m; support was removed in 8.0

Listening for transport dt_socket at address: 9099

這時候需要前往日志文件夾底下canallogs,查看canal日志文件是否已經(jīng)開啟菠秒,如果顯示以下內(nèi)容疙剑,就表示啟動已經(jīng)成功

2019-05-06 10:41:56.116 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## set default uncaught exception handler

2019-05-06 10:41:56.144 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## load canal configurations

2019-05-06 10:41:56.145 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server.

2019-05-06 10:41:56.233 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[192.168.164.1:11111]

2019-05-06 10:41:58.179 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now .....

canal server的默認端口號為:11111,如果需要調(diào)整的話践叠,可以去到conf目錄底下的canal.properties文件中進行修改言缤。

啟動了canal的server之后,便是基于java的客戶端搭建了。

首先在canalconf目錄底下創(chuàng)建一個獨立的文件夾(文件命名?idea_user_data)禁灼,用于做額外的數(shù)據(jù)源配置:

然后創(chuàng)建一份特定的properties文件:(名稱最好為:instance.properties)管挟,這里面只需要創(chuàng)建properties文件即可,其余幾份文件會自動生成匾二,instance.properties可以直接從example文件夾里面進行copy哮独。

首先是導入相應(yīng)的依賴文件:

com.alibaba.otter

canal.client

1.1.0

單機版本的canal連接案例

單機版本的環(huán)境比較好搭建,相應(yīng)的代碼如下:

首先是canal客戶端的配置類

/**

* @author idea

* @date 2019/5/6

* @Version V1.0

*/

public class CanalConfig {

public static String CANAL_ADDRESS="127.0.0.1";

public static int PORT=11111;

public static String DESTINATION="idea_user_data";

public static String FILTER=".*\..*";

}

客戶端代碼:

package com.sise.client;

import com.alibaba.otter.canal.client.CanalConnector;

import com.alibaba.otter.canal.client.CanalConnectors;

import com.alibaba.otter.canal.protocol.CanalEntry.*;

import com.alibaba.otter.canal.protocol.Message;

import com.google.protobuf.InvalidProtocolBufferException;

import java.net.InetSocketAddress;

import java.util.List;

import java.util.Queue;

import java.util.concurrent.ConcurrentLinkedQueue;

import static com.sise.config.CanalConfig.*;

/**

* @author idea

* @date 2019/5/6

* @Version V1.0

*/

public class CanalClient {

private static Queue SQL_QUEUE = new ConcurrentLinkedQueue<>();

public static void main(String args[]) {

CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(CANAL_ADDRESS,

PORT), DESTINATION, "", "");

int batchSize = 1000;

try {

connector.connect();

connector.subscribe(FILTER);

connector.rollback();

try {

while (true) {

//嘗試從master那邊拉去數(shù)據(jù)batchSize條記錄察藐,有多少取多少

Message message = connector.getWithoutAck(batchSize);

long batchId = message.getId();

int size = message.getEntries().size();

if (batchId == -1 || size == 0) {

Thread.sleep(1000);

} else {

dataHandle(message.getEntries());

}

connector.ack(batchId);

//當隊列里面堆積的sql大于一定數(shù)值的時候就模擬執(zhí)行

if (SQL_QUEUE.size() >= 10) {

executeQueueSql();

}

}

} catch (InterruptedException e) {

e.printStackTrace();

} catch (InvalidProtocolBufferException e) {

e.printStackTrace();

}

} finally {

connector.disconnect();

}

}

/**

* 模擬執(zhí)行隊列里面的sql語句

*/

public static void executeQueueSql() {

int size = SQL_QUEUE.size();

for (int i = 0; i < size; i++) {

String sql = SQL_QUEUE.poll();

System.out.println("[sql]----> " + sql);

}

}

/**

* 數(shù)據(jù)處理

*

* @param entrys

*/

private static void dataHandle(List entrys) throws InvalidProtocolBufferException {

for (Entry entry : entrys) {

if (EntryType.ROWDATA == entry.getEntryType()) {

RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());

EventType eventType = rowChange.getEventType();

if (eventType == EventType.DELETE) {

saveDeleteSql(entry);

} else if (eventType == EventType.UPDATE) {

saveUpdateSql(entry);

} else if (eventType == EventType.INSERT) {

saveInsertSql(entry);

}

}

}

}

/**

* 保存更新語句

*

* @param entry

*/

private static void saveUpdateSql(Entry entry) {

try {

RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());

List rowDatasList = rowChange.getRowDatasList();

for (RowData rowData : rowDatasList) {

List newColumnList = rowData.getAfterColumnsList();

StringBuffer sql = new StringBuffer("update " + entry.getHeader().getSchemaName() + "." + entry.getHeader().getTableName() + " set ");

for (int i = 0; i < newColumnList.size(); i++) {

sql.append(" " + newColumnList.get(i).getName()

+ " = '" + newColumnList.get(i).getValue() + "'");

if (i != newColumnList.size() - 1) {

sql.append(",");

}

}

sql.append(" where ");

List oldColumnList = rowData.getBeforeColumnsList();

for (Column column : oldColumnList) {

if (column.getIsKey()) {

//暫時只支持單一主鍵

sql.append(column.getName() + "=" + column.getValue());

break;

}

}

SQL_QUEUE.add(sql.toString());

}

} catch (InvalidProtocolBufferException e) {

e.printStackTrace();

}

}

/**

* 保存刪除語句

*

* @param entry

*/

private static void saveDeleteSql(Entry entry) {

try {

RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());

List rowDatasList = rowChange.getRowDatasList();

for (RowData rowData : rowDatasList) {

List columnList = rowData.getBeforeColumnsList();

StringBuffer sql = new StringBuffer("delete from " + entry.getHeader().getSchemaName() + "." + entry.getHeader().getTableName() + " where ");

for (Column column : columnList) {

if (column.getIsKey()) {

//暫時只支持單一主鍵

sql.append(column.getName() + "=" + column.getValue());

break;

}

}

SQL_QUEUE.add(sql.toString());

}

} catch (InvalidProtocolBufferException e) {

e.printStackTrace();

}

}

/**

* 保存插入語句

*

* @param entry

*/

private static void saveInsertSql(Entry entry) {

try {

RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());

List rowDatasList = rowChange.getRowDatasList();

for (RowData rowData : rowDatasList) {

List columnList = rowData.getAfterColumnsList();

StringBuffer sql = new StringBuffer("insert into " + entry.getHeader().getSchemaName() + "." + entry.getHeader().getTableName() + " (");

for (int i = 0; i < columnList.size(); i++) {

sql.append(columnList.get(i).getName());

if (i != columnList.size() - 1) {

sql.append(",");

}

}

sql.append(") VALUES (");

for (int i = 0; i < columnList.size(); i++) {

sql.append("'" + columnList.get(i).getValue() + "'");

if (i != columnList.size() - 1) {

sql.append(",");

}

}

sql.append(")");

SQL_QUEUE.add(sql.toString());

}

} catch (InvalidProtocolBufferException e) {

e.printStackTrace();

}

}

}

啟動程序之后皮璧,我們對數(shù)據(jù)庫表進行10次左右的修改操作之后,便可以從控制臺中看到sql的打印信息分飞。

關(guān)于canal集群搭建的一些坑

在實際開發(fā)中悴务,如果只有一臺canal機器作為server,當該臺機器掛掉之后譬猫,服務(wù)就會終止讯檐,那么這個時候我們便需要引入集群部署的方式了。

搭建canal集群的環(huán)境需要先搭建好相應(yīng)的zk集群模式染服。zk的集群搭建網(wǎng)上資料很多别洪,這里就不進行講解了。

canal搭建集群的一些資料可以參考以下鏈接:

https://github.com/alibaba/canal/wiki/AdminGuide

canal在搭建HA模式的時候有幾個容易掉坑的步驟:

canal.properties配置里面需要添加zk的地址柳刮,同時canal.instance.global.spring.xml

需要修改為classpath:spring/default-instance.xml

每臺機子的canal里面的具體instance所在目錄的名稱需要統(tǒng)一挖垛,每個實例都有對應(yīng)的slaveId,他們的id需要保證不重復(fù)秉颗。搭建好了canal集群環(huán)境之后痢毒,然后代碼部分需要在鏈接的那個模塊進行稍微的調(diào)整:

CanalConnector connector = CanalConnectors.newClusterConnector(CLUSTER_ADDRESS, DESTINATION, "", "");

為了保證master在某些特殊場景下掛掉,mysql需要搭建為雙M模式蚕甥,那么我們這個時候可以在每個canal機器的instance配置文件中加入master的地址和standby的地址:

canal.instance.master.address=******

canal.instance.standby.address = ******

同時對于detecing也需要進行配置修改

canal.instance.detecting.enable = true ## 需要開啟心跳檢查

canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now() ##心跳檢查sql

canal.instance.detecting.interval.time = 3 ##心跳檢查頻率

canal.instance.detecting.retry.threshold = 3 ## 心跳檢查失敗次數(shù)閥值哪替,當超過這個次數(shù)之后,就會自動切換到standby上邊的機器進行binlog的訂閱讀取

canal.instance.detecting.heartbeatHaEnable = true ## 是否開啟master和standby的主動切換

ps: master和standby進行切換機器的時候可能會有時間延遲菇怀。

啟動2臺canal機器凭舶,可以在zk里面查看到canal注冊的節(jié)點信息:

通過模擬測試晌块,關(guān)閉當前端口為11111的canal機器,節(jié)點信息會自動更換為第二臺canal進行替換:

ClusterCanalConnector和SimpleCanalConnector類發(fā)現(xiàn)了username和password的參數(shù)库快,但是似乎具體配置中并沒有做具體的設(shè)置摸袁,這是為什么呢?

后來也在github上邊查看到了一些網(wǎng)友的相關(guān)討論:

canal結(jié)合kafka發(fā)送sql數(shù)據(jù)案例

pom依賴:

org.apache.kafka

kafka_2.11

1.0.1

org.apache.kafka

kafka-clients

1.0.1

kafka的配置類:

public class KafkaProperties

{

public final static String ZK_CONNECTION = "XXX.XXX.XXX.XXX:2181";

public final static String BROKER_LIST_ADDRESS = "XXX.XXX.XXX.XXX:9092";

public final static String GROUP_ID = "group1";

public final static String TOPIC = "USER-DATA";

}

關(guān)于kafka的環(huán)境搭建步驟比較簡單义屏,網(wǎng)上有很多的資料靠汁,這里就不多一一介紹了。

首先是kafka的producer部分代碼:

import org.apache.kafka.clients.producer.*;

import org.apache.kafka.common.serialization.StringSerializer;

import org.apache.log4j.Logger;

import java.util.Properties;

import static com.sise.kafka.KafkaProperties.TOPIC;

/**

* @author idea

* @date 2019/5/7

* @Version V1.0

*/

public class KafkaProducerDemo extends Thread {

public static Logger log = Logger.getLogger(KafkaProducerDemo.class);

//kafka的鏈接地址要使用hostname 默認9092端口

private static final String BROKER_LIST = BROKER_LIST_ADDRESS;

private static KafkaProducer producer = null;

static {

Properties configs = initConfig();

producer = new KafkaProducer(configs);

}

/*

初始化配置

*/

private static Properties initConfig() {

Properties properties = new Properties();

properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);

properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

return properties;

}

public static void sendMsg(String msg) {

ProducerRecord record = new ProducerRecord<>(TOPIC, msg);

producer.send(record, new Callback() {

@Override

public void onCompletion(RecordMetadata recordMetadata, Exception e) {

if (null != e) {

log.info("send error" + e.getMessage());

} else {

System.out.println("send success");

}

}

});

}

}

接著是consumer部分的代碼:

import kafka.consumer.ConsumerConfig;

import kafka.consumer.ConsumerIterator;

import kafka.consumer.KafkaStream;

import kafka.javaapi.consumer.ConsumerConnector;

import java.util.HashMap;

import java.util.List;

import java.util.Map;

import java.util.Properties;

/**

* @author idea

* @date 2019/5/7

* @Version V1.0

*/

public class KafkaConsumerDemo extends Thread {

private final ConsumerConnector consumer;

private final String topic;

public KafkaConsumerDemo(String topic) {

consumer = kafka.consumer.Consumer.createJavaConsumerConnector(

createConsumerConfig());

this.topic = topic;

}

private static ConsumerConfig createConsumerConfig() {

Properties props = new Properties();

props.put("zookeeper.connect", KafkaProperties.ZK_CONNECTION);

props.put("group.id", KafkaProperties.GROUP_ID);

props.put("zookeeper.session.timeout.ms", "40000");

props.put("zookeeper.sync.time.ms", "200");

props.put("auto.commit.interval.ms", "1000");

return new ConsumerConfig(props);

}

@Override

public void run() {

Map topicCountMap = new HashMap();

topicCountMap.put(topic, new Integer(1));

Map>> consumerMap = consumer.createMessageStreams(topicCountMap);

KafkaStream stream = consumerMap.get(topic).get(0);

ConsumerIterator it = stream.iterator();

while (it.hasNext()) {

System.out.println("【receive】" + new String(it.next().message()));

}

}

}

然后需要在CanalClient 的executeQueueSql函數(shù)出進行部分功能的修改:

/**

* 給kafka發(fā)送sql語句

*/

public static void executeQueueSql() {

int size = SQL_QUEUE.size();

for (int i = 0; i < size; i++) {

String sql = SQL_QUEUE.poll();

//發(fā)送sql給kafka

KafkaProducerDemo.sendMsg(sql);

}

}

為了驗證程序是否正常闽铐,啟動canal和kafka之后蝶怔,對canal監(jiān)聽的數(shù)據(jù)庫里面的表進行數(shù)據(jù)信息的修改,然后canal會將修改的binlog里面的sql放入隊列中兄墅,當隊列滿了之后便向kafka中進行發(fā)送:

consumer端接受到數(shù)據(jù)之后控制臺便打印出相應(yīng)內(nèi)容:

如果想學習Java工程化踢星、高性能及分布式、深入淺出隙咸。微服務(wù)沐悦、Spring,MyBatis五督,Netty源碼分析的朋友可以加我的Java高級交流:787707172藏否,群里有阿里大牛直播講解技術(shù),以及Java大型互聯(lián)網(wǎng)技術(shù)的視頻免費分享給大家充包。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末副签,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子基矮,更是在濱河造成了極大的恐慌淆储,老刑警劉巖,帶你破解...
    沈念sama閱讀 210,978評論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件家浇,死亡現(xiàn)場離奇詭異本砰,居然都是意外死亡,警方通過查閱死者的電腦和手機钢悲,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 89,954評論 2 384
  • 文/潘曉璐 我一進店門灌具,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人譬巫,你說我怎么就攤上這事《桨剩” “怎么了芦昔?”我有些...
    開封第一講書人閱讀 156,623評論 0 345
  • 文/不壞的土叔 我叫張陵,是天一觀的道長娃肿。 經(jīng)常有香客問我咕缎,道長珠十,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,324評論 1 282
  • 正文 為了忘掉前任凭豪,我火速辦了婚禮焙蹭,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘嫂伞。我一直安慰自己孔厉,他們只是感情好,可當我...
    茶點故事閱讀 65,390評論 5 384
  • 文/花漫 我一把揭開白布帖努。 她就那樣靜靜地躺著撰豺,像睡著了一般。 火紅的嫁衣襯著肌膚如雪拼余。 梳的紋絲不亂的頭發(fā)上污桦,一...
    開封第一講書人閱讀 49,741評論 1 289
  • 那天,我揣著相機與錄音匙监,去河邊找鬼凡橱。 笑死,一個胖子當著我的面吹牛亭姥,可吹牛的內(nèi)容都是我干的稼钩。 我是一名探鬼主播,決...
    沈念sama閱讀 38,892評論 3 405
  • 文/蒼蘭香墨 我猛地睜開眼致份,長吁一口氣:“原來是場噩夢啊……” “哼变抽!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起氮块,我...
    開封第一講書人閱讀 37,655評論 0 266
  • 序言:老撾萬榮一對情侶失蹤绍载,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后滔蝉,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體击儡,經(jīng)...
    沈念sama閱讀 44,104評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,451評論 2 325
  • 正文 我和宋清朗相戀三年蝠引,在試婚紗的時候發(fā)現(xiàn)自己被綠了阳谍。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,569評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡螃概,死狀恐怖矫夯,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情吊洼,我是刑警寧澤训貌,帶...
    沈念sama閱讀 34,254評論 4 328
  • 正文 年R本政府宣布,位于F島的核電站,受9級特大地震影響递沪,放射性物質(zhì)發(fā)生泄漏豺鼻。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,834評論 3 312
  • 文/蒙蒙 一款慨、第九天 我趴在偏房一處隱蔽的房頂上張望儒飒。 院中可真熱鬧,春花似錦檩奠、人聲如沸桩了。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,725評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽圣猎。三九已至,卻和暖如春乞而,著一層夾襖步出監(jiān)牢的瞬間送悔,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,950評論 1 264
  • 我被黑心中介騙來泰國打工爪模, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留欠啤,地道東北人。 一個月前我還...
    沈念sama閱讀 46,260評論 2 360
  • 正文 我出身青樓屋灌,卻偏偏與公主長得像洁段,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子共郭,可洞房花燭夜當晚...
    茶點故事閱讀 43,446評論 2 348

推薦閱讀更多精彩內(nèi)容