spark 基礎(chǔ)知識(shí)整理(三)- spark SQL專題

一兽埃、簡(jiǎn)介

Spark SQL是Spark中處理結(jié)構(gòu)化數(shù)據(jù)的模塊哀峻。與基礎(chǔ)的Spark RDD API不同玫氢,Spark SQL的接口提供了更多關(guān)于數(shù)據(jù)的結(jié)構(gòu)信息和計(jì)算任務(wù)的運(yùn)行時(shí)信息。在Spark內(nèi)部谜诫,Spark SQL會(huì)能夠用于做優(yōu)化的信息比RDD API更多一些。Spark SQL如今有了三種不同的API:SQL語(yǔ)句攻旦、DataFrame API和最新的Dataset API喻旷。不過(guò)真正運(yùn)行計(jì)算的時(shí)候,無(wú)論你使用哪種API或語(yǔ)言牢屋,Spark SQL使用的執(zhí)行引擎都是同一個(gè)且预。這種底層的統(tǒng)一,使開(kāi)發(fā)者可以在不同的API之間來(lái)回切換烙无,你可以選擇一種最自然的方式锋谐,來(lái)表達(dá)你的需求。
(本文針對(duì)spark1.6版本截酷,示例語(yǔ)言為Scala)

二涮拗、概念

1. SQL。Spark SQL的一種用法是直接執(zhí)行SQL查詢語(yǔ)句,你可使用最基本的SQL語(yǔ)法三热,也可以選擇HiveQL語(yǔ)法鼓择。Spark SQL可以從已有的Hive中讀取數(shù)據(jù)。更詳細(xì)的請(qǐng)參考Hive Tables 這一節(jié)就漾。如果用其他編程語(yǔ)言運(yùn)行SQL呐能,Spark SQL將以DataFrame返回結(jié)果。你還可以通過(guò)命令行command-line 或者 JDBC/ODBC 使用Spark SQL抑堡。

2. DataFrame摆出。是一種分布式數(shù)據(jù)集合,每一條數(shù)據(jù)都由幾個(gè)命名字段組成首妖。概念上來(lái)說(shuō)偎漫,她和關(guān)系型數(shù)據(jù)庫(kù)的表 或者 R和Python中的data frame等價(jià),只不過(guò)在底層悯搔,DataFrame采用了更多優(yōu)化骑丸。DataFrame可以從很多數(shù)據(jù)源(sources)加載數(shù)據(jù)并構(gòu)造得到,如:結(jié)構(gòu)化數(shù)據(jù)文件妒貌,Hive中的表通危,外部數(shù)據(jù)庫(kù),或者已有的RDD灌曙。
DataFrame API支持Scala, Java, Python, and R菊碟。


3. Datasets。是Spark-1.6新增的一種API在刺,目前還是實(shí)驗(yàn)性的逆害。Dataset想要把RDD的優(yōu)勢(shì)(強(qiáng)類型,可以使用lambda表達(dá)式函數(shù))和Spark SQL的優(yōu)化執(zhí)行引擎的優(yōu)勢(shì)結(jié)合到一起蚣驼。Dataset可以由JVM對(duì)象構(gòu)建(constructed )得到魄幕,而后Dataset上可以使用各種transformation算子(map,flatMap颖杏,filter 等)纯陨。
Dataset API 對(duì) ScalaJava的支持接口是一致的,但目前還不支持Python留储,不過(guò)Python自身就有語(yǔ)言動(dòng)態(tài)特性優(yōu)勢(shì)(例如翼抠,你可以使用字段名來(lái)訪問(wèn)數(shù)據(jù),row.columnName)获讳。對(duì)Python的完整支持在未來(lái)的版本會(huì)增加進(jìn)來(lái)阴颖。

三、創(chuàng)建并操作DataFrame

Spark應(yīng)用可以用SparkContext創(chuàng)建DataFrame丐膝,所需的數(shù)據(jù)來(lái)源可以是已有的RDD(existing RDD
)量愧,或者Hive表钾菊,或者其他數(shù)據(jù)源(data sources.)以下是一個(gè)從JSON文件創(chuàng)建并操作DataFrame的小例子:

val sc: SparkContext // 已有的 SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

val df = sqlContext.read.json("examples/src/main/resources/people.json")

// 將DataFrame內(nèi)容打印到stdout
df.show()
// age  name
// null Michael
// 30   Andy
// 19   Justin

// 打印數(shù)據(jù)樹(shù)形結(jié)構(gòu)
df.printSchema()
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)

// select "name" 字段
df.select("name").show()
// name
// Michael
// Andy
// Justin

// 展示所有人,但所有人的 age 都加1
df.select(df("name"), df("age") + 1).show()
// name    (age + 1)
// Michael null
// Andy    31
// Justin  20

// 篩選出年齡大于21的人
df.filter(df("age") > 21).show()
// age name
// 30  Andy

// 計(jì)算各個(gè)年齡的人數(shù)
df.groupBy("age").count().show()
// age  count
// null 1
// 19   1
// 30   1

SQLContext.sql可以執(zhí)行一個(gè)SQL查詢侠畔,并返回DataFrame結(jié)果结缚。

val sqlContext = ... // 已有一個(gè) SQLContext 對(duì)象
val df = sqlContext.sql("SELECT * FROM table")

三、spark SQL與RDD互操作

Spark SQL有兩種方法將RDD轉(zhuǎn)為DataFrame软棺。分別為反射機(jī)制編程方式红竭。

1. 利用反射推導(dǎo)schema。####

Spark SQL的Scala接口支持自動(dòng)將包含case class對(duì)象的RDD轉(zhuǎn)為DataFrame喘落。對(duì)應(yīng)的case class定義了表的schema茵宪。case class的參數(shù)名通過(guò)反射,映射為表的字段名瘦棋。case class還可以嵌套一些復(fù)雜類型稀火,如Seq和Array。RDD隱式轉(zhuǎn)換成DataFrame后赌朋,可以進(jìn)一步注冊(cè)成表凰狞。隨后,你就可以對(duì)表中數(shù)據(jù)使用 SQL語(yǔ)句查詢了沛慢。

// sc 是已有的 SparkContext 對(duì)象
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// 為了支持RDD到DataFrame的隱式轉(zhuǎn)換
import sqlContext.implicits._

// 定義一個(gè)case class.
// 注意:Scala 2.10的case class最多支持22個(gè)字段赡若,要繞過(guò)這一限制,
// 你可以使用自定義class团甲,并實(shí)現(xiàn)Product接口逾冬。當(dāng)然,你也可以改用編程方式定義schema
case class Person(name: String, age: Int)

// 創(chuàng)建一個(gè)包含Person對(duì)象的RDD躺苦,并將其注冊(cè)成table
val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()
people.registerTempTable("people")

// sqlContext.sql方法可以直接執(zhí)行SQL語(yǔ)句
val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19")

// SQL查詢的返回結(jié)果是一個(gè)DataFrame身腻,且能夠支持所有常見(jiàn)的RDD算子
// 查詢結(jié)果中每行的字段可以按字段索引訪問(wèn):
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)

// 或者按字段名訪問(wèn):
teenagers.map(t => "Name: " + t.getAs[String]("name")).collect().foreach(println)

// row.getValuesMap[T] 會(huì)一次性返回多列,并以Map[String, T]為返回結(jié)果類型
teenagers.map(_.getValuesMap[Any](List("name", "age"))).collect().foreach(println)
// 返回結(jié)果: Map("name" -> "Justin", "age" -> 19)

2. 編程方式定義Schema匹厘。####

如果不能事先通過(guò)case class定義schema(例如嘀趟,記錄的字段結(jié)構(gòu)是保存在一個(gè)字符串,或者其他文本數(shù)據(jù)集中愈诚,需要先解析去件,又或者字段對(duì)不同用戶有所不同),那么你可能需要按以下三個(gè)步驟扰路,以編程方式的創(chuàng)建一個(gè)DataFrame:

從已有的RDD創(chuàng)建一個(gè)包含Row對(duì)象的RDD,用StructType創(chuàng)建一個(gè)schema倔叼,和步驟1中創(chuàng)建的RDD的結(jié)構(gòu)相匹配汗唱,把得到的schema應(yīng)用于包含Row對(duì)象的RDD,調(diào)用這個(gè)方法來(lái)實(shí)現(xiàn)這一步:SQLContext.createDataFrame
例如:

// sc 是已有的SparkContext對(duì)象
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// 創(chuàng)建一個(gè)RDD
val people = sc.textFile("examples/src/main/resources/people.txt")

// 數(shù)據(jù)的schema被編碼與一個(gè)字符串中
val schemaString = "name age"

// Import Row.
import org.apache.spark.sql.Row;

// Import Spark SQL 各個(gè)數(shù)據(jù)類型
import org.apache.spark.sql.types.{StructType,StructField,StringType};

// 基于前面的字符串生成schema
val schema =
  StructType(
    schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))

// 將RDD[people]的各個(gè)記錄轉(zhuǎn)換為Rows丈攒,即:得到一個(gè)包含Row對(duì)象的RDD
val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))

// 將schema應(yīng)用到包含Row對(duì)象的RDD上哩罪,得到一個(gè)DataFrame
val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema)

// 將DataFrame注冊(cè)為table
peopleDataFrame.registerTempTable("people")

// 執(zhí)行SQL語(yǔ)句
val results = sqlContext.sql("SELECT name FROM people")

// SQL查詢的結(jié)果是DataFrame授霸,且能夠支持所有常見(jiàn)的RDD算子
// 并且其字段可以以索引訪問(wèn),也可以用字段名訪問(wèn)
results.map(t => "Name: " + t(0)).collect().foreach(println)

四际插、spark SQL與其它數(shù)據(jù)源的連接與操作

Spark SQL支持基于DataFrame操作一系列不同的數(shù)據(jù)源碘耳。DataFrame既可以當(dāng)成一個(gè)普通RDD來(lái)操作,也可以將其注冊(cè)成一個(gè)臨時(shí)表來(lái)查詢框弛。把 DataFrame注冊(cè)為table之后辛辨,你就可以基于這個(gè)table執(zhí)行SQL語(yǔ)句了。本節(jié)將描述加載和保存數(shù)據(jù)的一些通用方法瑟枫,包含了不同的 Spark數(shù)據(jù)源斗搞,然后深入介紹一下內(nèi)建數(shù)據(jù)源可用選項(xiàng)。
  在最簡(jiǎn)單的情況下慷妙,所有操作都會(huì)以默認(rèn)類型數(shù)據(jù)源來(lái)加載數(shù)據(jù)(默認(rèn)是Parquet僻焚,除非修改了spark.sql.sources.default 配置)。

val df = sqlContext.read.load("examples/src/main/resources/users.parquet")
df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")

你也可以手動(dòng)指定數(shù)據(jù)源膝擂,并設(shè)置一些額外的選項(xiàng)參數(shù)虑啤。數(shù)據(jù)源可由其全名指定(如,org.apache.spark.sql.parquet)架馋,而 對(duì)于內(nèi)建支持的數(shù)據(jù)源狞山,可以使用簡(jiǎn)寫名(json, parquet, jdbc)。任意類型數(shù)據(jù)源創(chuàng)建的DataFrame都可以用下面這種語(yǔ)法轉(zhuǎn)成其他類型數(shù)據(jù)格式绩蜻。

val df = sqlContext.read.format("json").load("examples/src/main/resources/people.json")
df.select("name", "age").write.format("parquet").save("namesAndAges.parquet")

Spark SQL還支持直接對(duì)文件使用SQL查詢铣墨,不需要用read方法把文件加載進(jìn)來(lái)。

val df = sqlContext.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")

1. 連接JSON數(shù)據(jù)集####

Spark SQL在加載JSON數(shù)據(jù)的時(shí)候办绝,可以自動(dòng)推導(dǎo)其schema并返回DataFrame伊约。用SQLContext.read.json讀取一個(gè)包含String的RDD或者JSON文件,即可實(shí)現(xiàn)這一轉(zhuǎn)換孕蝉。

注意屡律,通常所說(shuō)的json文件只是包含一些json數(shù)據(jù)的文件,而不是我們所需要的JSON格式文件降淮。JSON格式文件必須每一行是一個(gè)獨(dú)立超埋、完整的的JSON對(duì)象。因此佳鳖,一個(gè)常規(guī)的多行json文件經(jīng)常會(huì)加載失敗霍殴。

// sc是已有的SparkContext對(duì)象
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// 數(shù)據(jù)集是由路徑指定的
// 路徑既可以是單個(gè)文件,也可以還是存儲(chǔ)文本文件的目錄
val path = "examples/src/main/resources/people.json"
val people = sqlContext.read.json(path)

// 推導(dǎo)出來(lái)的schema系吩,可由printSchema打印出來(lái)
people.printSchema()
// root
//  |-- age: integer (nullable = true)
//  |-- name: string (nullable = true)

// 將DataFrame注冊(cè)為table
people.registerTempTable("people")

// 跑SQL語(yǔ)句吧来庭!
val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

// 另一種方法是,用一個(gè)包含JSON字符串的RDD來(lái)創(chuàng)建DataFrame
val anotherPeopleRDD = sc.parallelize(
  """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
val anotherPeople = sqlContext.read.json(anotherPeopleRDD)
2. 連接Hive表

Spark SQL支持從Apache Hive讀 寫數(shù)據(jù)穿挨。然而月弛,Hive依賴項(xiàng)太多肴盏,所以沒(méi)有把Hive包含在默認(rèn)的Spark發(fā)布包里。要支持Hive帽衙,需要在編譯spark的時(shí)候增加-Phive和 -Phive-thriftserver標(biāo)志菜皂。這樣編譯打包的時(shí)候?qū)?huì)把Hive也包含進(jìn)來(lái)。注意厉萝,hive的jar包也必須出現(xiàn)在所有的worker節(jié) 點(diǎn)上恍飘,訪問(wèn)Hive數(shù)據(jù)時(shí)候會(huì)用到(如:使用hive的序列化和反序列化SerDes時(shí))。
  Hive配置在conf/目錄下hive-site.xml冀泻,core-site.xml(安全配置)常侣,hdfs-site.xml(HDFS配 置)文件中。請(qǐng)注意弹渔,如果在YARN cluster(yarn-cluster mode)模式下執(zhí)行一個(gè)查詢的話胳施,lib_mananged/jar/下面的datanucleus 的jar包,和conf/下的hive-site.xml必須在驅(qū)動(dòng)器(driver)和所有執(zhí)行器(executor)都可用肢专。一種簡(jiǎn)便的方法是舞肆,通過(guò) spark-submit命令的–jars和–file選項(xiàng)來(lái)提交這些文件。
  如果使用Hive博杖,則必須構(gòu)建一個(gè)HiveContext椿胯,HiveContext是派生于SQLContext的,添加了在Hive Metastore里查詢表的支持剃根,以及對(duì)HiveQL的支持哩盲。用戶沒(méi)有現(xiàn)有的Hive部署,也可以創(chuàng)建一個(gè)HiveContext狈醉。如果沒(méi)有在 hive-site.xml里配置廉油,那么HiveContext將會(huì)自動(dòng)在當(dāng)前目錄下創(chuàng)建一個(gè)metastore_db目錄,再根據(jù)HiveConf設(shè)置 創(chuàng)建一個(gè)warehouse目錄(默認(rèn)/user/hive/warehourse)苗傅。所以請(qǐng)注意抒线,你必須把/user/hive/warehouse的 寫權(quán)限賦予啟動(dòng)spark應(yīng)用程序的用戶。

// sc是一個(gè)已有的SparkContext對(duì)象
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)

sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
sqlContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

// 這里用的是HiveQL
sqlContext.sql("FROM src SELECT key, value").collect().foreach(println)

3. 用JDBC連接其他數(shù)據(jù)庫(kù)####

Spark SQL也可以用JDBC訪問(wèn)其他數(shù)據(jù)庫(kù)渣慕。這一功能應(yīng)該優(yōu)先于使用JdbcRDD嘶炭。因?yàn)樗祷匾粋€(gè)DataFrame,而DataFrame在Spark SQL中操作更簡(jiǎn)單逊桦,且更容易和來(lái)自其他數(shù)據(jù)源的數(shù)據(jù)進(jìn)行交互關(guān)聯(lián)眨猎。JDBC數(shù)據(jù)源在java和python中用起來(lái)也很簡(jiǎn)單,不需要用戶提供額外的 ClassTag强经。(注意宵呛,這與Spark SQL JDBC server不同,Spark SQL JDBC server允許其他應(yīng)用執(zhí)行Spark SQL查詢)
  首先夕凝,你需要在spark classpath中包含對(duì)應(yīng)數(shù)據(jù)庫(kù)的JDBC driver宝穗,下面這行包括了用于訪問(wèn)postgres的數(shù)據(jù)庫(kù)driver
SPARK_CLASSPATH=postgresql-9.3-1102-jdbc41.jar bin/spark-shell

val jdbcDF = sqlContext.read.format("jdbc").options(
  Map("url" -> "jdbc:postgresql:dbserver",
  "dbtable" -> "schema.tablename")).load()

注意:

  • JDBC driver class必須在所有client session或者executor上,對(duì)java的原生classloader可見(jiàn)码秉。這是因?yàn)镴ava的DriverManager在打開(kāi)一個(gè)連接之 前逮矛,會(huì)做安全檢查,并忽略所有對(duì)原聲classloader不可見(jiàn)的driver转砖。最簡(jiǎn)單的一種方法须鼎,就是在所有worker節(jié)點(diǎn)上修改 compute_classpath.sh,并包含你所需的driver jar包府蔗。
  • 一些數(shù)據(jù)庫(kù)晋控,如H2,會(huì)把所有的名字轉(zhuǎn)大寫姓赤。對(duì)于這些數(shù)據(jù)庫(kù)赡译,在Spark SQL中必須也使用大寫。
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末不铆,一起剝皮案震驚了整個(gè)濱河市蝌焚,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌誓斥,老刑警劉巖只洒,帶你破解...
    沈念sama閱讀 206,126評(píng)論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異劳坑,居然都是意外死亡毕谴,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,254評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門距芬,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)涝开,“玉大人,你說(shuō)我怎么就攤上這事蔑穴≈已埃” “怎么了?”我有些...
    開(kāi)封第一講書(shū)人閱讀 152,445評(píng)論 0 341
  • 文/不壞的土叔 我叫張陵存和,是天一觀的道長(zhǎng)奕剃。 經(jīng)常有香客問(wèn)我,道長(zhǎng)捐腿,這世上最難降的妖魔是什么纵朋? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 55,185評(píng)論 1 278
  • 正文 為了忘掉前任,我火速辦了婚禮茄袖,結(jié)果婚禮上操软,老公的妹妹穿的比我還像新娘。我一直安慰自己宪祥,他們只是感情好聂薪,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,178評(píng)論 5 371
  • 文/花漫 我一把揭開(kāi)白布家乘。 她就那樣靜靜地躺著,像睡著了一般藏澳。 火紅的嫁衣襯著肌膚如雪仁锯。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書(shū)人閱讀 48,970評(píng)論 1 284
  • 那天翔悠,我揣著相機(jī)與錄音业崖,去河邊找鬼。 笑死蓄愁,一個(gè)胖子當(dāng)著我的面吹牛双炕,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播撮抓,決...
    沈念sama閱讀 38,276評(píng)論 3 399
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼妇斤,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了胀滚?” 一聲冷哼從身側(cè)響起趟济,我...
    開(kāi)封第一講書(shū)人閱讀 36,927評(píng)論 0 259
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎咽笼,沒(méi)想到半個(gè)月后顷编,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 43,400評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡剑刑,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,883評(píng)論 2 323
  • 正文 我和宋清朗相戀三年媳纬,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片施掏。...
    茶點(diǎn)故事閱讀 37,997評(píng)論 1 333
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡钮惠,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出七芭,到底是詐尸還是另有隱情素挽,我是刑警寧澤,帶...
    沈念sama閱讀 33,646評(píng)論 4 322
  • 正文 年R本政府宣布狸驳,位于F島的核電站预明,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏耙箍。R本人自食惡果不足惜撰糠,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,213評(píng)論 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望辩昆。 院中可真熱鬧阅酪,春花似錦、人聲如沸。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,204評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至辉词,卻和暖如春尉辑,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背较屿。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 31,423評(píng)論 1 260
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留卓练,地道東北人隘蝎。 一個(gè)月前我還...
    沈念sama閱讀 45,423評(píng)論 2 352
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像襟企,于是被迫代替她去往敵國(guó)和親嘱么。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,722評(píng)論 2 345

推薦閱讀更多精彩內(nèi)容