如何基于 Pulsar 和 Spark 進(jìn)行批流一體的彈性數(shù)據(jù)處理忘嫉?

作者 | yjshen缅刽,Anonymitaet編輯 | Linda批流現(xiàn)狀

在大規(guī)模并行數(shù)據(jù)分析領(lǐng)域藤滥,AMPLab 的『One stack to rule them all』提出用 Apache Spark 作為統(tǒng)一的引擎支持批處理底燎、流處理刃榨、交互查詢和機(jī)器學(xué)習(xí)等常見的數(shù)據(jù)處理場景弹砚。 2017 年 7 月双仍,Spark 2.2.0 版本正式推出的 Spark structured streaming 將 Spark SQL 作為流處理、批處理底層統(tǒng)一的執(zhí)行引擎桌吃,提供對無界表(無邊界的源源不斷到達(dá)的流數(shù)據(jù))和有界表(靜態(tài)歷史數(shù)據(jù))的優(yōu)化查詢朱沃,而向用戶提供 Dataset/DataFrame API 對批流數(shù)據(jù)聯(lián)合處理,進(jìn)一步模糊了批流數(shù)據(jù)處理的邊界茅诱。

另一方面逗物,Apache Flink 在 2016 年左右進(jìn)入大眾視野,憑借其當(dāng)時更優(yōu)的流處理引擎瑟俭,原生的 Watermark 支持『Exaclty Once』的數(shù)據(jù)一致性保證翎卓,和批流一體計算等各種場景的支持,成為 Spark 的有力挑戰(zhàn)者摆寄。無論是使用 Spark 還是 Flink失暴,用戶真正關(guān)心的是如何更好地使用數(shù)據(jù),更快地挖掘數(shù)據(jù)中的價值微饥,流數(shù)據(jù)和靜態(tài)數(shù)據(jù)不再是分離的個體逗扒,而是一份數(shù)據(jù)的兩種不同表征方式。

然而在實踐中欠橘,構(gòu)建一個批流一體的數(shù)據(jù)平臺并不只是計算引擎層的任務(wù)矩肩。因為在傳統(tǒng)解決方案中,近實時的流肃续、事件數(shù)據(jù)通常采用消息隊列(例如 RabbitMQ)黍檩、實時數(shù)據(jù)管道(例如 Apache Kafka)存儲,而批處理所需要的靜態(tài)數(shù)據(jù)通常使用文件系統(tǒng)始锚、對象存儲進(jìn)行保存建炫。這就意味著,一方面疼蛾,在數(shù)據(jù)分析過程中肛跌,為了保證結(jié)果的正確性和實時性,需要對分別存儲在兩類系統(tǒng)中數(shù)據(jù)進(jìn)行聯(lián)合查詢;另一方面衍慎,在運維過程中转唉,需要定期將流數(shù)據(jù)轉(zhuǎn)存到文件 / 對象存儲中,通過維持流形式的數(shù)據(jù)總量在閾值之下來保證消息隊列稳捆、數(shù)據(jù)管道的性能(因為這類系統(tǒng)的以分區(qū)為主的架構(gòu)設(shè)計緊耦合了消息服務(wù)和消息存儲赠法,而且多數(shù)都太過依賴文件系統(tǒng),隨著數(shù)據(jù)量的增加乔夯,系統(tǒng)性能會急劇下降)砖织,但人為的數(shù)據(jù)搬遷不但會提升系統(tǒng)的運維成本,而且搬遷過程中的數(shù)據(jù)清洗末荐、讀取侧纯、加載也是對集群資源的巨大消耗。

與此同時甲脏,從 Mesos 和 YARN 的流行眶熬、Docker 的興起到現(xiàn)在的 Kubernetes 被廣泛采用,整個基礎(chǔ)架構(gòu)正在全面地向容器化方向發(fā)展块请,傳統(tǒng)緊耦合消息服務(wù)和消息計算的架構(gòu)并不能很好地適應(yīng)容器化的架構(gòu)娜氏。以 Kafka 為例,其以分區(qū)為中心的架構(gòu)緊耦合了消息服務(wù)和消息存儲墩新。Kafka 的分區(qū)與一臺或者一組物理機(jī)強(qiáng)綁定贸弥,這帶來的問題是在機(jī)器失效或集群擴(kuò)容中,需要進(jìn)行昂貴且漫長的分區(qū)數(shù)據(jù)重新均衡的過程海渊;其以分區(qū)為粒度的存儲設(shè)計也不能很好利用已有的云存儲資源绵疲;此外,過于簡單的設(shè)計導(dǎo)致其為了進(jìn)行容器化需要解決多租戶管理切省、IO 隔離等方面很多架構(gòu)上的缺陷最岗。

Pulsar 簡介

Apache Pulsar 是一個多租戶、高性能的企業(yè)級消息發(fā)布訂閱系統(tǒng)朝捆,最初由 Yahoo 研發(fā)般渡, 2018 年 9 月從 Apache 孵化器畢業(yè),成為 Apache 基金會的頂級開源項目芙盘。Pulsar 基于發(fā)布訂閱模式(pub-sub)構(gòu)建驯用,生產(chǎn)者(producer)發(fā)布消息(message)到主題(topic),消費者可以訂閱主題儒老,處理收到的消息蝴乔,并在消息處理完成后發(fā)送確認(rèn)(Ack)。Pulsar 提供了四種訂閱類型驮樊,它們可以共存在同一個主題上薇正,以訂閱名進(jìn)行區(qū)分:

  • 獨享(exclusive)訂閱——一個訂閱名下同時只能有一個消費者片酝。

  • 共享(shared)訂閱——可以由多個消費者訂閱,每個消費者接收其中一部分消息挖腰。

  • 失效備援(failover)訂閱——允許多個消費者連接到同一個主題雕沿,但只有一個消費者能夠接收消息。只有在當(dāng)前消費者發(fā)生失效時猴仑,其他消費者才開始接收消息审轮。

  • 鍵劃分(key-shared)訂閱(測試版功能)——多個消費者連接到同一主題,相同 Key 總會發(fā)送給同一個消費者辽俗。

Pulsar 從設(shè)計之初就支持多租戶(multi-tenancy)的概念疾渣,租戶(tenant)可以橫跨多個集群(clusters),每個租戶都有其認(rèn)證和鑒權(quán)方式崖飘,租戶也是存儲配額榴捡、消息生存時間(TTL)和隔離策略的管理單元。Pulsar 多租戶的特性可以在 topic URL 上得到充分體現(xiàn)坐漏,其結(jié)構(gòu)是 persistent://tenant/namespace/topic薄疚。命名空間(namespace)是 Pulsar 中最基本的管理單元碧信,我們可以設(shè)置權(quán)限赊琳、調(diào)整復(fù)制選項、管理跨集群的數(shù)據(jù)復(fù)制砰碴、控制消息的過期時間或執(zhí)行其他關(guān)鍵任務(wù)躏筏。

Pulsar 獨特架構(gòu)

Pulsar 和其他消息系統(tǒng)的最根本區(qū)別在于其采用計算和存儲分離的分層架構(gòu)。Pulsar 集群由兩層組成:無狀態(tài)服務(wù)層呈枉,它由一組接受和傳遞消息的 broker 組成趁尼;分布式存儲層,它由一組名為 bookies 的 Apache BookKeeper 存儲節(jié)點組成猖辫,具備高可用酥泞、強(qiáng)一致、低延時的特點啃憎。

image

和 Kafka 一樣芝囤,Pulsar 也是基于主題分區(qū)(Topic partition)的邏輯概念進(jìn)行主題數(shù)據(jù)的存儲。不同的是辛萍,Kafka 的物理存儲也是以分區(qū)為單位悯姊,每個 partition 必須作為一個整體(一個目錄)被存儲在一個 broker 上,而 Pulsar 的每個主題分區(qū)本質(zhì)上都是存儲在 BookKeeper 上的分布式日志贩毕,每個日志又被分成分段(Segment)悯许。每個 Segment 作為 BookKeeper 上的一個 Ledger,均勻分布并存儲在多個 bookie 中辉阶。存儲分層的架構(gòu)和以 Segment 為中心的分片存儲是 Pulsar 的兩個關(guān)鍵設(shè)計理念先壕。以此為基礎(chǔ)為 Pulsar 提供了很多重要的優(yōu)勢:無限制的主題分區(qū)瘩扼、存儲即時擴(kuò)展,無需數(shù)據(jù)遷移 垃僚、無縫 broker 故障恢復(fù)邢隧、無縫集群擴(kuò)展、無縫的存儲(Bookie)故障恢復(fù)和獨立的可擴(kuò)展性冈在。

消息系統(tǒng)解耦了生產(chǎn)者與消費者倒慧,但實際的消息本質(zhì)上仍是有結(jié)構(gòu)的,因此生產(chǎn)者和消費者之間需要一種協(xié)調(diào)機(jī)制包券,達(dá)到生產(chǎn)纫谅、消費過程中對消息結(jié)構(gòu)的共識,以達(dá)到類型安全的目的溅固。Pulsar 有內(nèi)置的 Schema 注冊方式在消息系統(tǒng)端提供傳輸消息類型約定的方式付秕,客戶端可以通過上傳 Schema 來約定主題級別的消息類型信息,而由 Pulsar 負(fù)責(zé)消息的類型檢查和有類型消息的自動序列化侍郭、反序列化询吴,從而降低多應(yīng)用間的消息解析代碼反復(fù)開發(fā)、維護(hù)的成本亮元。當(dāng)然猛计,Schema 定義與類型安全是一種可選的機(jī)制,并不會給非類型化消息的發(fā)布爆捞、消費產(chǎn)生任何性能開銷奉瘤。

在 Spark 中實現(xiàn)對 Pulsar 數(shù)據(jù)的讀寫

自 Spark 2.2 版本 Structured Streaming 正式發(fā)布,Spark 只保留了 SparkSession 作為主程序入口煮甥,你只需編寫 DataSet/DataFrame API 程序盗温,以聲明形式對數(shù)據(jù)的操作,而將具體的查詢優(yōu)化與批流處理執(zhí)行的細(xì)節(jié)交由 Spark SQL 引擎進(jìn)行處理成肘。對于一個數(shù)據(jù)處理作業(yè)卖局,需要定義 DataFrame 的產(chǎn)生、變換和寫出三個部分双霍,而將 Pulsar 作為流數(shù)據(jù)平臺與 Spark 進(jìn)行集成正是要解決如何從 Pulsar 中讀取數(shù)據(jù)(Source)和如何向 Pulsar 寫出運算結(jié)果(Sink)兩個問題砚偶。

為了實現(xiàn)以 Pulsar 為源讀取批流數(shù)據(jù)與支持批流數(shù)據(jù)向 Pulsar 的寫入,我們構(gòu)建了 Spark Pulsar Connector店煞。

對 Structured Streaming 的支持

image

上圖展示了 Structured Streaming(以下簡稱 SS )的主要組件:

  • 輸入和輸出——為了提供細(xì)粒度的容錯蟹演,SS 要求輸入數(shù)據(jù)源(Source)是可重放(replayable)的;為了提供端到端的 Exactly-Once 的語義顷蟀,需要輸出(Sink)支持冪等寫出(一條消息被多次寫入與一次寫入效果一致酒请,可由 DBMS、KV 系統(tǒng)通過鍵約束的方式支持)鸣个。

  • API——用戶通過編寫 Spark SQL 的 batch API(SQL 或 DataFrame)指定對一個或多個流羞反、表的查詢布朦,并定義一個輸出表保存所有的輸出結(jié)果,而引擎內(nèi)部決定如何將結(jié)果增量地寫到 Sink 中昼窗。為了支持流處理是趴,SS 在原有的 Spark SQL API 上添加了一些接口:

  • 觸發(fā)器(Trigger)——控制引擎觸發(fā)流處理執(zhí)行、在 Sink 中更新結(jié)果的頻率澄惊。

  • 水印機(jī)制(Watermark policy)——用戶通過指定字段做 event time唆途,來決定對晚到數(shù)據(jù)的處理。

  • 有狀態(tài)算子(Stateful operator)——用戶可以根據(jù) Key 跟蹤和更新算子內(nèi)部的可變狀態(tài)掸驱,完成復(fù)雜的業(yè)務(wù)需求(例如肛搬,基于會話的窗口)。

  • 執(zhí)行層——當(dāng)收到一個查詢時毕贼,SS 決定它的增量執(zhí)行方式温赔,進(jìn)行優(yōu)化、并開始執(zhí)行鬼癣。SS 有兩種可選的執(zhí)行模型:

  • Microbatch model(微批處理模式)——默認(rèn)的執(zhí)行方式陶贼,與 Spark Streaming 的 DStream 類似,將流切成 micro batch待秃,對每個 batch 分別處理拜秧。這種模式支持動態(tài)負(fù)載均衡、故障恢復(fù)等機(jī)制锥余,適合將吞吐率作為主要性能指標(biāo)的應(yīng)用腹纳。

  • Continuous mode(持續(xù)模式)——在集群上啟動長時間運行的算子痢掠,適合處理較為簡單驱犹、延遲敏感類應(yīng)用。

  • Log 和 State Store —— SS 利用兩種持久化存儲來提供容錯保障:一個 Write-ahead-Log(WAL)足画,記錄被成功消費且持久化寫出的每個數(shù)據(jù)源中的位置雄驹;一個大規(guī)模的 state store, 存儲長期運行的聚集算子內(nèi)部的狀態(tài)快照淹辞。當(dāng)故障發(fā)生時医舆,SS 會根據(jù)快照的位置,通過重放之后的消息完成流處理狀態(tài)的恢復(fù)象缀。

具體到源碼層面蔬将,Source 接口定義了可重放數(shù)據(jù)源需要提供的功能。

trait Source {  def schema: StructType  def getOffset: Option[Offset]  def getBatch(start: Option[Offset], end: Offset): DataFrame  def commit(end: Offset): Unit  def stop(): Unit}trait Sink {  def addBatch(batchId: Long, data: DataFrame): Unit}

以 microbatch 執(zhí)行模式為例:

  1. 在每個 microbatch 的最開始央星,SS 會向 source 詢問當(dāng)前的最新進(jìn)度(getOffset)霞怀,并將其持久化到 WAL 中。

  2. 隨后莉给,source 根據(jù) SS 提供的 start end 偏移量毙石,提供區(qū)間范圍的數(shù)據(jù)(getBatch)廉沮。

  3. SS 觸發(fā)計算邏輯的優(yōu)化和編譯,把計算結(jié)果寫出給 sink(addBatch)徐矩,這時才觸發(fā)實際的取數(shù)據(jù)操作以及計算過程滞时。

  4. 在數(shù)據(jù)完整寫出到 sink 后,SS 通知 source 可以廢棄數(shù)據(jù)(commit)滤灯,并將成功執(zhí)行的 batchId 寫入內(nèi)部維護(hù)的 commitLog 中坪稽。

具體到 Pulsar 的 connector 實現(xiàn)中:

  1. 在所有批次開始執(zhí)行前,SS 會調(diào)用 schema 方法返回消息的結(jié)構(gòu)信息鳞骤,在 schema 方法內(nèi)部刽漂,我們從 Pulsar 的 Schema Registry 提取出所有主題的 Schema,并進(jìn)行一致性檢查弟孟。

  2. 隨后贝咙,我們?yōu)槊總€主題分區(qū)創(chuàng)建一個消費者,按照 (start, end] 返回主題分區(qū)中的數(shù)據(jù)拂募。

  3. 當(dāng)收到 SS 的 commit 通知時庭猩,通過 topics 中的 resetCursor 向 Pulsar 標(biāo)志消息消費的完成。Sink 中構(gòu)建的生產(chǎn)者則將 addBatch 中獲取的實際數(shù)據(jù)以消息形式追加寫入相應(yīng)的主題中陈症。

    對批處理作業(yè)的支持

在某個時間點執(zhí)行的批作業(yè)蔼水,可以看作是對 Pulsar 平臺中的流數(shù)據(jù)在一個時間點的快照進(jìn)行的數(shù)據(jù)分析。Spark 對歷史數(shù)據(jù)的查詢是以 Relation 為單位录肯,Spark Pulsar Connector 提供 createRelation 方法的實現(xiàn)根據(jù)用戶指定的多個主題分區(qū)構(gòu)建表趴腋,并返回包含 Schema 信息的 DataSet。在查詢計劃階段论咏,Connector 的功能分成兩步:首先优炬,根據(jù)用戶提供的一個或多個主題,在 Pulsar Schema Registry 中查找主題 Schema厅贪,并檢查多個主題 Schema 的一致性蠢护;其次,將用戶指定的所有主題分區(qū)進(jìn)行任務(wù)劃分(Partition)养涮,得到的分片即是 Spark source task 的執(zhí)行粒度葵硕。

Pulsar 提供了兩層的接口對其中的數(shù)據(jù)進(jìn)行訪問,基于主題分區(qū)的 Consumer/Reader 接口贯吓,以傳統(tǒng)消息接收為語義的順序數(shù)據(jù)讀刃赴肌;Segment 級的讀接口悄谐,提供對 Segment 數(shù)據(jù)的直接讀取介评。因此,相應(yīng)地從 Pulsar 讀數(shù)據(jù)執(zhí)行批作業(yè)可以分成兩種粒度(即讀取數(shù)據(jù)的并行度)進(jìn)行:以主題分區(qū)為粒度(每個主題分區(qū)作為一個分片)尊沸;以 Segment 為粒度(將一個主題分區(qū)的多個 Segment 組織成一個分片威沫,因此一個主題分區(qū)會有多個對應(yīng)的分片)贤惯。你可以按照批作業(yè)的并行度需求和可分配計算資源選擇合適的消息讀取的并行粒度。另一方面棒掠,將批作業(yè)的執(zhí)行存儲到 Pulsar 也很直觀孵构,你只需指定寫入的主題和消息路由規(guī)則(RoundRobin 或者按 Key 劃分),在 Sink task 中創(chuàng)建的每個生產(chǎn)者會將待寫出的消息送至對應(yīng)的主題分區(qū)烟很。

如何使用 Spark Pulsar Connector

根據(jù)一個或多個主題創(chuàng)建流處理 Source颈墅。

val df = spark  .readStream  .format("pulsar")  .option("service.url", "pulsar://localhost:6650")  .option("admin.url", "http://localhost:8080")  .option("topicsPattern", "topic.*") // Subscribe to a pattern  // .option("topics", "topic1,topic2")    // Subscribe to multiple topics  // .option("topic", "topic1"). //subscribe to a single topic  .option("startingOffsets", startingOffsets)  .load()df.selectExpr("CAST(__key AS STRING)", "CAST(value AS STRING)")  .as[(String, String)]

構(gòu)建批處理 Source。

val df = spark  .read  .format("pulsar")  .option("service.url", "pulsar://localhost:6650")  .option("admin.url", "http://localhost:8080")  .option("topicsPattern", "topic.*")  .option("startingOffsets", "earliest")  .option("endingOffsets", "latest")  .load()df.selectExpr("CAST(__key AS STRING)", "CAST(value AS STRING)")  .as[(String, String)]

使用數(shù)據(jù)中本身的 topic 字段向多個主題進(jìn)行持續(xù) Sink雾袱。

val ds = df  .selectExpr("topic", "CAST(__key AS STRING)", "CAST(value AS STRING)")  .writeStream  .format("pulsar")  .option("service.url", "pulsar://localhost:6650")  .start()

將批處理結(jié)果寫回 Pulsar恤筛。

df.selectExpr("CAST(__key AS STRING)", "CAST(value AS STRING)")  .write  .format("pulsar")  .option("service.url", "pulsar://localhost:6650")  .option("topic", "topic1")  .save()

注意

由于 Spark Pulsar Connector 支持結(jié)構(gòu)化消息的消費和寫入,為了避免消息負(fù)載中字段和消息元數(shù)據(jù)(event time芹橡、publish time肠虽、key 和 messageId)的潛在命名沖突吝秕,消息元數(shù)據(jù)字段在 Spark schema 中以雙下劃線做為前綴(例如,__eventTime)。

參考資料

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末合愈,一起剝皮案震驚了整個濱河市芜辕,隨后出現(xiàn)的幾起案子浙巫,更是在濱河造成了極大的恐慌雇逞,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,451評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件珠移,死亡現(xiàn)場離奇詭異弓乙,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)钧惧,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,172評論 3 394
  • 文/潘曉璐 我一進(jìn)店門暇韧,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人垢乙,你說我怎么就攤上這事锨咙。” “怎么了追逮?”我有些...
    開封第一講書人閱讀 164,782評論 0 354
  • 文/不壞的土叔 我叫張陵,是天一觀的道長粹舵。 經(jīng)常有香客問我钮孵,道長,這世上最難降的妖魔是什么眼滤? 我笑而不...
    開封第一講書人閱讀 58,709評論 1 294
  • 正文 為了忘掉前任巴席,我火速辦了婚禮,結(jié)果婚禮上诅需,老公的妹妹穿的比我還像新娘漾唉。我一直安慰自己荧库,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 67,733評論 6 392
  • 文/花漫 我一把揭開白布赵刑。 她就那樣靜靜地躺著分衫,像睡著了一般。 火紅的嫁衣襯著肌膚如雪般此。 梳的紋絲不亂的頭發(fā)上蚪战,一...
    開封第一講書人閱讀 51,578評論 1 305
  • 那天,我揣著相機(jī)與錄音铐懊,去河邊找鬼邀桑。 笑死,一個胖子當(dāng)著我的面吹牛科乎,可吹牛的內(nèi)容都是我干的壁畸。 我是一名探鬼主播,決...
    沈念sama閱讀 40,320評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼茅茂,長吁一口氣:“原來是場噩夢啊……” “哼瓤摧!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起玉吁,我...
    開封第一講書人閱讀 39,241評論 0 276
  • 序言:老撾萬榮一對情侶失蹤照弥,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后进副,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體这揣,經(jīng)...
    沈念sama閱讀 45,686評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,878評論 3 336
  • 正文 我和宋清朗相戀三年影斑,在試婚紗的時候發(fā)現(xiàn)自己被綠了给赞。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 39,992評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡矫户,死狀恐怖片迅,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情皆辽,我是刑警寧澤柑蛇,帶...
    沈念sama閱讀 35,715評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站驱闷,受9級特大地震影響耻台,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜空另,卻給世界環(huán)境...
    茶點故事閱讀 41,336評論 3 330
  • 文/蒙蒙 一盆耽、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧,春花似錦摄杂、人聲如沸坝咐。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,912評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽墨坚。三九已至,卻和暖如春氮昧,著一層夾襖步出監(jiān)牢的瞬間框杜,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,040評論 1 270
  • 我被黑心中介騙來泰國打工袖肥, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留咪辱,地道東北人。 一個月前我還...
    沈念sama閱讀 48,173評論 3 370
  • 正文 我出身青樓椎组,卻偏偏與公主長得像油狂,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子寸癌,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,947評論 2 355

推薦閱讀更多精彩內(nèi)容