Springcloud Stream詳解及整合kafka

官方定義 Spring Cloud Stream 是一個(gè)構(gòu)建消息驅(qū)動(dòng)微服務(wù)的框架。

應(yīng)用程序通過(guò) inputs 或者 outputs 來(lái)與 Spring Cloud Stream 中binder 交互欣尼,通過(guò)我們配置來(lái) binding 堕阔,而 Spring Cloud Stream 的 binder 負(fù)責(zé)與消息中間件交互溉贿。所以锥余,我們只需要搞清楚如何與 Spring Cloud Stream 交互就可以方便使用消息驅(qū)動(dòng)的方式躺苦。
通過(guò)使用Spring Integration來(lái)連接消息代理中間件以實(shí)現(xiàn)消息事件驅(qū)動(dòng)瓮恭。Spring Cloud Stream 為一些供應(yīng)商的消息中間件產(chǎn)品提供了個(gè)性化的自動(dòng)化配置實(shí)現(xiàn)雄坪,引用了發(fā)布-訂閱、消費(fèi)組屯蹦、分區(qū)的三個(gè)核心概念维哈。目前僅支持RabbitMQ绳姨、Kafka。
這里還要講解一下什么是Spring Integration 阔挠? Integration 集成

企業(yè)應(yīng)用集成(EAI)是集成應(yīng)用之間數(shù)據(jù)和服務(wù)的一種應(yīng)用技術(shù)飘庄。四種集成風(fēng)格:

1.文件傳輸:兩個(gè)系統(tǒng)生成文件,文件的有效負(fù)載就是由另一個(gè)系統(tǒng)處理的消息购撼。該類風(fēng)格的例子之一是針對(duì)文件輪詢目錄或FTP目錄跪削,并處理該文件。

2.共享數(shù)據(jù)庫(kù):兩個(gè)系統(tǒng)查詢同一個(gè)數(shù)據(jù)庫(kù)以獲取要傳遞的數(shù)據(jù)迂求。一個(gè)例子是你部署了兩個(gè)EAR應(yīng)用碾盐,它們的實(shí)體類(JPA、Hibernate等)共用同一個(gè)表揩局。

3.遠(yuǎn)程過(guò)程調(diào)用:兩個(gè)系統(tǒng)都暴露另一個(gè)能調(diào)用的服務(wù)毫玖。該類例子有EJB服務(wù),或SOAP和REST服務(wù)凌盯。

4.消息:兩個(gè)系統(tǒng)連接到一個(gè)公用的消息系統(tǒng)孕豹,互相交換數(shù)據(jù),并利用消息調(diào)用行為十气。該風(fēng)格的例子就是眾所周知的中心輻射式的(hub-and-spoke)JMS架構(gòu)励背。

為什么需要SpringCloud Stream消息驅(qū)動(dòng)呢?

比方說(shuō)我們用到了RabbitMQ和Kafka砸西,由于這兩個(gè)消息中間件的架構(gòu)上的不同叶眉,像RabbitMQ有exchange,kafka有Topic芹枷,partitions分區(qū)衅疙,這些中間件的差異性導(dǎo)致我們實(shí)際項(xiàng)目開(kāi)發(fā)給我們?cè)斐闪艘欢ǖ睦_,我們?nèi)绻昧藘蓚€(gè)消息隊(duì)列的其中一種鸳慈,后面的業(yè)務(wù)需求饱溢,我想往另外一種消息隊(duì)列進(jìn)行遷移,這時(shí)候無(wú)疑就是一個(gè)災(zāi)難性的走芋,一大堆東西都要重新推倒重新做绩郎,因?yàn)樗覀兊南到y(tǒng)耦合了,這時(shí)候springcloud Stream給我們提供了一種解耦合的方式翁逞。

Spring Cloud Stream 是一個(gè)構(gòu)建消息驅(qū)動(dòng)微服務(wù)的框架肋杖。應(yīng)用程序通過(guò) inputs 或者 outputs 來(lái)與 Spring Cloud Stream 中binder 交互,通過(guò)我們配置來(lái) binding 挖函,而 Spring Cloud Stream 的 binder 負(fù)責(zé)與中間件交互状植。所以,我們只需要搞清楚如何與 Spring Cloud Stream 交互就可以方便使用消息驅(qū)動(dòng)的方式。

Spring Cloud Stream由一個(gè)中間件中立的核組成津畸。應(yīng)用通過(guò)Spring Cloud Stream插入的input(相當(dāng)于消費(fèi)者consumer振定,它是從隊(duì)列中接收消息的)和output(相當(dāng)于生產(chǎn)者producer,它是從隊(duì)列中發(fā)送消息的肉拓。)通道與外界交流吩案。

通道通過(guò)指定中間件的Binder實(shí)現(xiàn)與外部代理連接。業(yè)務(wù)開(kāi)發(fā)者不再關(guān)注具體消息中間件帝簇,只需關(guān)注Binder對(duì)應(yīng)用程序提供的抽象概念來(lái)使用消息中間件實(shí)現(xiàn)業(yè)務(wù)即可。


springcloudStream.png
  • Binder

Binder 是 Spring Cloud Stream 的一個(gè)抽象概念靠益,是應(yīng)用與消息中間件之間的粘合劑丧肴。目前 Spring Cloud Stream 實(shí)現(xiàn)了 Kafka 和 Rabbit MQ 的binder。

通過(guò) binder 胧后,可以很方便的連接中間件芋浮,可以動(dòng)態(tài)的改變消息的destinations(對(duì)應(yīng)于 Kafka 的topic,Rabbit MQ 的 exchanges)壳快,這些都可以通過(guò)外部配置項(xiàng)來(lái)做到纸巷。甚至可以任意的改變中間件的類型而不需要修改一行代碼。

  • Publish-Subscribe

消息的發(fā)布(Publish)和訂閱(Subscribe)是事件驅(qū)動(dòng)的經(jīng)典模式眶痰。Spring Cloud Stream 的數(shù)據(jù)交互也是基于這個(gè)思想瘤旨。生產(chǎn)者把消息通過(guò)某個(gè) topic 廣播出去(Spring Cloud Stream 中的 destinations)。其他的微服務(wù)竖伯,通過(guò)訂閱特定 topic 來(lái)獲取廣播出來(lái)的消息來(lái)觸發(fā)業(yè)務(wù)的進(jìn)行存哲。

這種模式,極大的降低了生產(chǎn)者與消費(fèi)者之間的耦合七婴。即使有新的應(yīng)用的引入祟偷,也不需要破壞當(dāng)前系統(tǒng)的整體結(jié)構(gòu)。

  • Consumer Groups

“Group”打厘,如果使用過(guò) Kafka 的童鞋并不會(huì)陌生修肠。Spring Cloud Stream 的這個(gè)分組概念的意思基本和 Kafka 一致。

微服務(wù)中動(dòng)態(tài)的縮放同一個(gè)應(yīng)用的數(shù)量以此來(lái)達(dá)到更高的處理能力是非常必須的户盯。對(duì)于這種情況嵌施,同一個(gè)事件防止被重復(fù)消費(fèi),只要把這些應(yīng)用放置于同一個(gè) “group” 中莽鸭,就能夠保證消息只會(huì)被其中一個(gè)應(yīng)用消費(fèi)一次艰管。

  • Consumer Groups

bindings 是我們通過(guò)配置把應(yīng)用和spring cloud stream 的 binder 綁定在一起,之后我們只需要修改 binding 的配置來(lái)達(dá)到動(dòng)態(tài)修改topic蒋川、exchange牲芋、type等一系列信息而不需要修改一行代碼。


Demo演練

這里新建3個(gè)模塊stream-Publish、stream-Subscribe1缸浦、stream-Subscribe2夕冲,其中stream-Publish作為消息發(fā)布模塊,stream-Subscribe1和stream-Subscribe2作為消息消費(fèi)模塊(沒(méi)有kafka的裂逐,windows下載鏈接:http://kafka.apache.org/downloads內(nèi)帶zookeeper)
springcloud版本:Hoxton.SR3

3個(gè)項(xiàng)目都需要以下依賴

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>

stream-Publish的yml配置

server:
  port: 7888
spring:
  application:
    name: producer
  cloud:
    stream:
      kafka:
        binder:
          brokers: localhost:9092         #Kafka的消息中間件服務(wù)器
          zk-nodes: localhost:2181        #Zookeeper的節(jié)點(diǎn)歹鱼,如果集群,后面加,號(hào)分隔
          auto-create-topics: true        #如果設(shè)置為false,就不會(huì)自動(dòng)創(chuàng)建Topic 有可能你Topic還沒(méi)創(chuàng)建就直接調(diào)用了卜高。
      bindings:
        output:      #這里用stream給我們提供的默認(rèn)output弥姻,后面會(huì)講到自定義output        
            destination: stream-demo    #消息發(fā)往的目的地            
            content-type: text/plain    #消息發(fā)送的格式,接收端不用指定格式掺涛,但是發(fā)送端要

stream-Publish的SendService

@EnableBinding(Source.class)
public class SendService {
     @Autowired
        private Source source;
     
     public void sendMsg(String msg) { h
         source.output().send(MessageBuilder.withPayload(msg).build());
     }
}

stream-Publish的測(cè)試Controller

@RestController
@SpringBootApplication
public class DemoApplication {
    
     @Autowired
        private SendService sendService;
     
    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }
    
    @GetMapping("/send/{msg}")
    public void send(@PathVariable("msg") String msg){
        sendService.sendMsg(msg);
    }

}

然后需要來(lái)寫(xiě)消息消費(fèi)者的代碼庭敦,stream-Subscribe1、stream-Subscribe2(同前一個(gè)薪缆,換個(gè)端口)
stream-Subscribe1的yml文件

server:
  port: 7889
spring:
  application:
    name: consumer_1
  cloud:
    stream:
      kafka:
        binder:
          brokers: localhost:9092
          zk-nodes: localhost:2181
          auto-create-topics: true
      bindings:
#input是接收秧廉,注意這里不能再像前面一樣寫(xiě)output了
          input:
            destination: stream-demo

stream-Subscribe1的接受消息的Service(啟動(dòng)類默認(rèn)就OK)

@EnableBinding(Sink.class)
public class RecieveService {
 
    @StreamListener(Sink.INPUT)
    public void recieve(Object payload){
        System.out.println(payload);
    }
}

然后我們啟動(dòng)zookeeper,和Kafka拣帽,然后啟動(dòng)這三個(gè)項(xiàng)目
訪問(wèn)我們stream-Publish的測(cè)試接口疼电,如下:


測(cè)試.png

請(qǐng)求成功后我們可以看到消息消費(fèi)者的控制臺(tái)打印出消息hello


image.png

自定義信道

好了到現(xiàn)在為止,我們進(jìn)行了一個(gè)簡(jiǎn)單的消息發(fā)送和接收减拭,用的是Stream給我們提供的默認(rèn)Source蔽豺,Sink,接下來(lái)我們要自己進(jìn)行自定義拧粪,這種方式在工作中還是用的比較多的茫虽,因?yàn)槲覀円煌南⑼ǖ腊l(fā)消息,必然不能全都叫input,output的既们,那樣的話就亂套了濒析,因此首先自定義一個(gè)接口,如下:

  • Source(發(fā)射器): 一個(gè)接口類啥纸,內(nèi)部定義了一個(gè)輸出管道号杏,例如定義一個(gè)輸出管道 @output("XXOO")。說(shuō)明這個(gè)發(fā)射器將會(huì)向這個(gè)管道發(fā)射數(shù)據(jù)斯棒。

  • Sink(接收器):一個(gè)接口類盾致,內(nèi)部定義了一個(gè)輸入管道,例如定義一個(gè)輸入管道 @input("XXOO")荣暮。說(shuō)明這個(gè)接收器將會(huì)從這個(gè)管道接收數(shù)據(jù)庭惜。

  • Binder(綁定器):用于與管道進(jìn)行綁定。Binder將于消息中間件進(jìn)行關(guān)聯(lián)穗酥。@ EnableBinding (Source.class/Sink.class)护赊。@EnableBinding()里面是可以定義多個(gè)發(fā)射器/接收器

自定義MySource:
在stream-Publish中創(chuàng)建自定義接口

public interface MySource {    
    @Output("myOutput")//管道名稱為"myOutput,對(duì)應(yīng)在yml文件里
    MessageChannel myOutput();
}

修改stream-Publish中的SendService文件

@EnableBinding(MySource.class) //使用我們自定義的Mysource
public class SendService {
     @Autowired
        private MySource mySource;
     
     public void sendMsg(String msg) { 
         mySource.myOutput().send(MessageBuilder.withPayload(msg).build());
     }
}

修改stream-Publish中yml文件

server:
  port: 7888
spring:
  application:
    name: producer
  cloud:
    stream:
      kafka:
        binder:
          brokers: localhost:9092         #Kafka的消息中間件服務(wù)器
          zk-nodes: localhost:2181        #Zookeeper的節(jié)點(diǎn)惠遏,如果集群,后面加,號(hào)分隔
          auto-create-topics: true        #如果設(shè)置為false,就不會(huì)自動(dòng)創(chuàng)建Topic 有可能你Topic還沒(méi)創(chuàng)建就直接調(diào)用了骏啰。
      bindings:
        myOutput:   #自定義output
            destination: stream-demo    #消息發(fā)往的目的地
            content-type: text/plain    #消息發(fā)送的格式节吮,接收端不用指定格式,但是發(fā)送端要      #這里用stream給我們提供的默認(rèn)output判耕,后面會(huì)講到自定義output        

到這里透绩,我們的消息發(fā)送服務(wù)已經(jīng)修改完啦,接下來(lái)修改消息消費(fèi)服務(wù)stream-Subscribe1壁熄,類似于上面帚豪,之類我直接貼代碼了:

public interface MySink {
    @Input("myInput")
    SubscribableChannel myInput();
}

@EnableBinding(MySink.class)
public class RecieveService {
 
    @StreamListener("myInput")
    public void recieve(Object payload){
        System.out.println(payload);
    }
}
server:
  port: 7889
spring:
  application:
    name: consumer_1
  cloud:
    stream:
      kafka:
        binder:
          brokers: localhost:9092
          zk-nodes: localhost:2181
          auto-create-topics: true
      bindings:
#修改為我們自己的myInput
          myInput:
            destination: stream-demo

然后測(cè)試成功,和上面一樣就不放圖了

消息分組(Consumer Groups)

“Group”草丧,如果使用過(guò) Kafka 的讀者并不會(huì)陌生狸臣。Spring Cloud Stream 的這個(gè)分組概念的意思基本和 Kafka 一致。微服務(wù)中動(dòng)態(tài)的縮放同一個(gè)應(yīng)用的數(shù)量以此來(lái)達(dá)到更高的處理能力是非常必須的方仿。對(duì)于這種情況,同一個(gè)事件防止被重復(fù)消費(fèi)统翩,

只要把這些應(yīng)用放置于同一個(gè) “group” 中仙蚜,就能夠保證消息只會(huì)被其中一個(gè)應(yīng)用消費(fèi)一次。不同的組是可以消費(fèi)的厂汗,同一個(gè)組內(nèi)會(huì)發(fā)生競(jìng)爭(zhēng)關(guān)系委粉,只有其中一個(gè)可以消費(fèi)。

我們只需要修改yml文件就可以啦娶桦,兩個(gè)消費(fèi)者服務(wù)都配置相同的名稱的group

server:
  port: 7889
spring:
  application:
    name: consumer_1
  cloud:
    stream:
      kafka:
        binder:
          brokers: localhost:9092
          zk-nodes: localhost:2181
          auto-create-topics: true
      bindings:
          input:
            destination: stream-demo
            group: group #加上一條group就可以啦

可以看到stream-Subscribe1和stream-Subscribe2是屬于同一組的贾节。springcloud-stream模塊的發(fā)的消息只能被stream-Subscribe1或stream-Subscribe2其中一個(gè)接收到,這樣避免了重復(fù)消費(fèi)衷畦。

消息分區(qū)

Spring Cloud Stream對(duì)給定應(yīng)用的多個(gè)實(shí)例之間分隔數(shù)據(jù)予以支持栗涂。在分隔方案中,物理交流媒介(如:代理主題)被視為分隔成了多個(gè)片(partitions)祈争。一個(gè)或者多個(gè)生產(chǎn)者應(yīng)用實(shí)例給多個(gè)消費(fèi)者應(yīng)用實(shí)例發(fā)送消息并確保相同特征的數(shù)據(jù)被同一消費(fèi)者實(shí)例處理斤程。

Spring Cloud Stream對(duì)分割的進(jìn)程實(shí)例實(shí)現(xiàn)進(jìn)行了抽象。使得Spring Cloud Stream 為不具備分區(qū)功能的消息中間件(RabbitMQ)也增加了分區(qū)功能擴(kuò)展菩混。

那么我們就要進(jìn)行一些配置了忿墅,比如我只想要stream-Subscribe2模塊接收到消息,stream-Subscribe2配置如下:

server:
  port: 7890
spring:
  application:
    name: consumer_2
  cloud:
    stream:
      kafka:
        binder:
          brokers: localhost:9092
          zk-nodes: localhost:2181
          auto-create-topics: true
      bindings:
          input:
            destination: stream-demo
            group: group
            consumer:
              partitioned: true   #開(kāi)啟分區(qū)
      instance-count: 2     #分區(qū)數(shù)量

stream-Publish模塊配置如下:

server:
  port: 7888
spring:
  application:
    name: producer
  cloud:
    stream:
      kafka:
        binder:
          brokers: localhost:9092
          zk-nodes: localhost:2181
          auto-create-topics: true
      bindings:
        myOutput:
            destination: stream-demo
            content-type: text/plain
            producer:
              partitionKeyExpression:  payload.id(你自己的key)   #分區(qū)的主鍵沮峡,根據(jù)什么來(lái)分區(qū)疚脐,下面的payload.id只是一個(gè)對(duì)象的id用于做為Key,用來(lái)說(shuō)明的邢疙。希望不要誤解
              partitionCount: 2    #Key和分區(qū)數(shù)量進(jìn)行取模去分配消息棍弄,這里分區(qū)數(shù)量配置為2

其他的代碼基本不變望薄,這里就不演示了。這里要給大家說(shuō)明一下照卦,比如分區(qū)的Key是一個(gè)對(duì)象的id式矫,比如說(shuō)id=1,每次發(fā)送消息的對(duì)象的id為相同值1,則消息只會(huì)被同一個(gè)消費(fèi)者消費(fèi)役耕,比如說(shuō)Key和分區(qū)數(shù)量取模計(jì)算的結(jié)果是分到stream2模塊中采转,那么下一次進(jìn)行進(jìn)行消息發(fā)送,

只要分組的key即id的值依然還是1的話瞬痘,消息永遠(yuǎn)只會(huì)分配到stream2模塊中故慈。

參考:https://blog.csdn.net/JinXYan/article/details/90813592

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市框全,隨后出現(xiàn)的幾起案子察绷,更是在濱河造成了極大的恐慌,老刑警劉巖津辩,帶你破解...
    沈念sama閱讀 216,692評(píng)論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件拆撼,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡喘沿,警方通過(guò)查閱死者的電腦和手機(jī)闸度,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,482評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門(mén),熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)蚜印,“玉大人莺禁,你說(shuō)我怎么就攤上這事≌常” “怎么了哟冬?”我有些...
    開(kāi)封第一講書(shū)人閱讀 162,995評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)忆绰。 經(jīng)常有香客問(wèn)我浩峡,道長(zhǎng),這世上最難降的妖魔是什么错敢? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,223評(píng)論 1 292
  • 正文 為了忘掉前任红符,我火速辦了婚禮,結(jié)果婚禮上伐债,老公的妹妹穿的比我還像新娘预侯。我一直安慰自己,他們只是感情好峰锁,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,245評(píng)論 6 388
  • 文/花漫 我一把揭開(kāi)白布萎馅。 她就那樣靜靜地躺著,像睡著了一般虹蒋。 火紅的嫁衣襯著肌膚如雪糜芳。 梳的紋絲不亂的頭發(fā)上飒货,一...
    開(kāi)封第一講書(shū)人閱讀 51,208評(píng)論 1 299
  • 那天,我揣著相機(jī)與錄音峭竣,去河邊找鬼塘辅。 笑死,一個(gè)胖子當(dāng)著我的面吹牛皆撩,可吹牛的內(nèi)容都是我干的扣墩。 我是一名探鬼主播,決...
    沈念sama閱讀 40,091評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼扛吞,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼呻惕!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起滥比,我...
    開(kāi)封第一講書(shū)人閱讀 38,929評(píng)論 0 274
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤亚脆,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后盲泛,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體濒持,經(jīng)...
    沈念sama閱讀 45,346評(píng)論 1 311
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,570評(píng)論 2 333
  • 正文 我和宋清朗相戀三年寺滚,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了柑营。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,739評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡玛迄,死狀恐怖由境,靈堂內(nèi)的尸體忽然破棺而出棚亩,到底是詐尸還是另有隱情蓖议,我是刑警寧澤,帶...
    沈念sama閱讀 35,437評(píng)論 5 344
  • 正文 年R本政府宣布讥蟆,位于F島的核電站勒虾,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏瘸彤。R本人自食惡果不足惜修然,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,037評(píng)論 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望质况。 院中可真熱鬧愕宋,春花似錦、人聲如沸结榄。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,677評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)臼朗。三九已至邻寿,卻和暖如春蝎土,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背绣否。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 32,833評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工誊涯, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人蒜撮。 一個(gè)月前我還...
    沈念sama閱讀 47,760評(píng)論 2 369
  • 正文 我出身青樓暴构,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親淀弹。 傳聞我的和親對(duì)象是個(gè)殘疾皇子丹壕,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,647評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容