Flink SQL w/ Blink Planner執(zhí)行流程解析(上篇)

Foreword

大數(shù)據(jù)領(lǐng)域SQL化的風(fēng)潮方興未艾(所謂"Everybody knows SQL")当窗,F(xiàn)link自然也不能“免俗”够坐。Flink SQL是Flink系統(tǒng)內(nèi)部最高級(jí)別的API,也是流批一體思想的集大成者崖面。用戶可以通過簡(jiǎn)單明了的SQL語(yǔ)句像查表一樣執(zhí)行流任務(wù)或批任務(wù)元咙,屏蔽了底層DataStream/DataSet API的復(fù)雜細(xì)節(jié),降低了使用門檻巫员。

那么庶香,一條Flink SQL語(yǔ)句是如何轉(zhuǎn)化成可執(zhí)行的任務(wù)的呢?本文以最新的Flink 1.11版本為藍(lán)本疏遏,分上下兩篇詳細(xì)梳理流程脉课。在此之前,先簡(jiǎn)要介紹Apache Calcite與Blink Planner财异。

Prerequisite: Apache Calcite & Workflow

不同于Spark專門打造Catalyst作為其SQL層核心的騷操作倘零,F(xiàn)link沒有重復(fù)造輪子,而是務(wù)實(shí)地直接利用了通用的SQL解析與優(yōu)化引擎——Apache Calcite戳寸。Calcite在Hive呈驶、Drill、Pheonix疫鹊、Kylin諸框架中都已經(jīng)有了非常成熟的應(yīng)用袖瞻,F(xiàn)link選擇它也是情理之中。

下圖示出Calcite在整個(gè)Flink Table & SQL體系中的作用拆吆。

From https://ververica.cn/developers/flink-internals-table-and-sql-api/聋迎,注意右下角的DataSet RelNode Tree有誤,應(yīng)改為DataStream RelNode Tree

可見枣耀,Calcite在此體系內(nèi)需要負(fù)責(zé)以下任務(wù):

  • 解析(parsing)——將SQL語(yǔ)句轉(zhuǎn)化為抽象語(yǔ)法樹(AST)霉晕,即SqlNode樹。
  • 驗(yàn)證(validation)——根據(jù)Catalog中的元數(shù)據(jù)進(jìn)行語(yǔ)法檢查。
  • 邏輯計(jì)劃(logical planning)——根據(jù)AST和元數(shù)據(jù)構(gòu)造出邏輯計(jì)劃牺堰,即RelNode樹拄轻。
  • 邏輯計(jì)劃優(yōu)化(logical plan optimization)——按照預(yù)定義的優(yōu)化規(guī)則RelOptRule優(yōu)化邏輯計(jì)劃。Calcite中的優(yōu)化器RelOptPlanner有兩種伟葫,一是基于規(guī)則優(yōu)化(RBO)的HepPlanner恨搓,二是基于代價(jià)優(yōu)化(CBO)的VolcanoPlanner。
  • 物理計(jì)劃(physical planning)——將優(yōu)化的邏輯計(jì)劃翻譯成對(duì)應(yīng)執(zhí)行邏輯的物理計(jì)劃筏养。

前4個(gè)階段其實(shí)就是Calcite的標(biāo)準(zhǔn)工作流斧抱,同時(shí)這5個(gè)階段也基本上涵蓋了Flink SQL執(zhí)行流程的主體部分。Table API相對(duì)于SQL只是在解析撼玄、驗(yàn)證方面有些不同(解析的不是SQL語(yǔ)句而是算子樹夺姑,再用Calcite RelBuilder生成邏輯計(jì)劃)。而在物理計(jì)劃之后掌猛,還需要通過代碼生成(code generation)最終轉(zhuǎn)化為能夠直接執(zhí)行的DataStream/DataSet API程序。下面分析Flink SQL的執(zhí)行步驟時(shí)眉睹,就以上面的5 + 1 = 6個(gè)階段為準(zhǔn)荔茬。

本文作為上篇,先講解比較簡(jiǎn)單的解析竹海、驗(yàn)證和邏輯計(jì)劃慕蔚,下篇再講解比較復(fù)雜的邏輯計(jì)劃優(yōu)化、物理計(jì)劃和代碼生成斋配。

關(guān)于Calcite細(xì)節(jié)的講解有珠玉在前孔飒,看官可直接參考以下幾篇文章,本文不再班門弄斧了艰争。

Prerequisite: Blink Planner

Flink Table/SQL體系中的Planner(即查詢處理器)是溝通Flink與Calcite的橋梁坏瞄,為Table/SQL API提供完整的解析、優(yōu)化和執(zhí)行環(huán)境甩卓。Blink Planner從1.9版本開始合并到Flink主干鸠匀,并從1.11版本開始成為默認(rèn)Planner,而原有的Old Planner將會(huì)逐漸退役逾柿。

Blink Planner真正地踐行了流批一體的處理方式缀棍。它根據(jù)流處理作業(yè)和批處理作業(yè)的不同,分別提供了StreamPlanner和BatchPlanner兩種實(shí)現(xiàn)机错。這兩種Planner的底層共享了基類PlannerBase的很多源碼爬范,且作業(yè)最終都會(huì)翻譯成基于DataStream Transformation API的執(zhí)行邏輯(即將批處理視為流處理的特殊情況)。通過如下類圖即可看出一二弱匪。

Blink Planner正式發(fā)布時(shí)社區(qū)的介紹見這里青瀑,不多廢話了。

Example Preparation

為了方便講解,使用一個(gè)簡(jiǎn)單的基于官方StreamSQLExample改造而來的示例狱窘,完整代碼如下杜顺。

object StreamSQLExample {
  def main(args: Array[String]): Unit = {
    val settings = EnvironmentSettings.newInstance()
        .useBlinkPlanner()
        .inStreamingMode()
        .build()

    // set up execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val tEnv = StreamTableEnvironment.create(env, settings)

    val orders: DataStream[Order] = env.fromCollection(Seq(
      Order(20200819177L, 1L, "beer", 3),
      Order(20200819234L, 2L, "diaper", 4),
      Order(20200819239L, 2L, "beef", 6),
      Order(20200820066L, 3L, "rubber", 2),
      Order(20200820100L, 3L, "beer", 5)))

    val users: DataStream[User] = env.fromCollection(Seq(
      User(1L, "Alice", 27),
      User(2L, "Bob", 26),
      User(3L, "Charlie", 25)))

    // register DataStream as Table
    val tableA = tEnv.createTemporaryView("orders", orders, 'id, 'uid, 'product, 'amount)
    val tableB = tEnv.createTemporaryView("users", users, 'id, 'name, 'age)

    // join the two tables
    val sql =
      s"""
         |SELECT u.name,sum(o.amount) AS total
         |FROM orders o
         |INNER JOIN users u ON o.uid = u.id
         |WHERE u.age < 27
         |GROUP BY u.name
        """.stripMargin

    print(tEnv.explainSql(sql))

    val result = tEnv.sqlQuery(sql)
    result.toRetractStream[Row].print()

    env.execute()
  }

  case class Order(id: Long, uid: Long, product: String, amount: Int)

  case class User(id: Long, name: String, age: Int)
}

通過TableEnvironment.explainSql()方法可以直接以文本形式獲取到上述SQL語(yǔ)句的查詢計(jì)劃,包括抽象語(yǔ)法樹蘸炸、優(yōu)化的邏輯計(jì)劃和物理執(zhí)行計(jì)劃三部分躬络,在接下來的行文中會(huì)逐漸將查詢計(jì)劃貼出來。

好了搭儒,Let's get our hands dirty.

Stage 1: Parsing

首先來到執(zhí)行SQL語(yǔ)句的入口TableEnvironmentImpl.sqlQuery()方法穷当,第一句就是調(diào)用Parser.parse()方法解析SQL。

// TableEnvironmentImpl.sqlQuery()
@Override
public Table sqlQuery(String query) {
    List<Operation> operations = parser.parse(query);
    // ......
}

繼續(xù)來到ParserImpl.parse()以及它調(diào)用的CalciteParser.parse()方法淹禾。

// ParserImpl.parse()
@Override
public List<Operation> parse(String statement) {
    CalciteParser parser = calciteParserSupplier.get();
    FlinkPlannerImpl planner = validatorSupplier.get();
    // parse the sql query
    SqlNode parsed = parser.parse(statement);

    Operation operation = SqlToOperationConverter.convert(planner, catalogManager, parsed)
        .orElseThrow(() -> new TableException("Unsupported query: " + statement));
    return Collections.singletonList(operation);
}

// CalciteParser.parse()
public SqlNode parse(String sql) {
    try {
        SqlParser parser = SqlParser.create(sql, config);
        return parser.parseStmt();
    } catch (SqlParseException e) {
        throw new SqlParserException("SQL parse failed. " + e.getMessage(), e);
    }
}

可見是直接調(diào)用Calcite的SQL解析器SqlParser進(jìn)行解析的馁菜。限于篇幅原因,本文就不繼續(xù)向下追蹤了铃岔,看官可在上文提到的參考文檔中找到詳細(xì)的Calcite源碼分析汪疮。來觀察一下解析出的SqlNode數(shù)據(jù)吧。

我們知道毁习,F(xiàn)link的SQL方言與標(biāo)準(zhǔn)SQL相比有很大差別智嚷,那么如何實(shí)現(xiàn)Flink SQL專用的解析器呢?注意到構(gòu)造SqlParser的配置類SqlParser.Config時(shí)纺且,需要傳入解析器工廠SqlParserImplFactory盏道,對(duì)應(yīng)代碼如下。

// PlanningConfigurationBuilder.getSqlParserConfig()
public SqlParser.Config getSqlParserConfig() {
    return JavaScalaConversionUtil.toJava(calciteConfig(tableConfig).sqlParserConfig()).orElseGet(() ->
        // we use Java lex because back ticks are easier than double quotes in programming
        // and cases are preserved
        SqlParser
            .configBuilder()
            .setParserFactory(FlinkSqlParserImpl.FACTORY)
            .setConformance(getSqlConformance())
            .setLex(Lex.JAVA)
            .build());
}

但是flink-sql-parser模塊中默認(rèn)并沒有FlinkSqlParserImpl這個(gè)類载碌。我們只需要將該模塊build一下猜嘱,就會(huì)發(fā)現(xiàn)JavaCC開始編譯Flink SQL的語(yǔ)法描述文件(包含Calcite內(nèi)置的Parser.jj與Flink定制好的Freemarker模板),輸出如下信息:

[INFO] --- javacc-maven-plugin:2.4:javacc (javacc) @ flink-sql-parser ---
Java Compiler Compiler Version 4.0 (Parser Generator)
(type "javacc" with no arguments for help)
Reading from file /Users/lmagic/workspace-new/gitee/flink/flink-table/flink-sql-parser/target/generated-sources/javacc/Parser.jj . . .
Note: UNICODE_INPUT option is specified. Please make sure you create the parser/lexer using a Reader with the correct character encoding.
File "TokenMgrError.java" does not exist.  Will create one.
File "ParseException.java" does not exist.  Will create one.
File "Token.java" does not exist.  Will create one.
File "SimpleCharStream.java" does not exist.  Will create one.
Parser generated successfully.
[INFO] Processed 1 grammar

最終在generated-sources目錄下生成了FlinkSqlParserImpl及其附屬的類嫁艇,Calcite會(huì)利用它們進(jìn)行Flink SQL的解析朗伶。codegen目錄下則是語(yǔ)法描述文件的本體。

// FlinkSqlParserImpl
/**
 * SQL parser, generated from Parser.jj by JavaCC.
 *
 * <p>The public wrapper for this parser is {@link SqlParser}.
 */
public class FlinkSqlParserImpl extends SqlAbstractParserImpl implements FlinkSqlParserImplConstants 

Stage 2: Validation

SQL解析完成后裳仆,上文所述ParserImpl.parse()方法緊接著就會(huì)調(diào)用驗(yàn)證相關(guān)的邏輯腕让。查看SqlToOperationConverter.convert()方法的代碼。

// SqlToOperationConverter.convert()
public static Optional<Operation> convert(
        FlinkPlannerImpl flinkPlanner,
        CatalogManager catalogManager,
        SqlNode sqlNode) {
    // validate the query
    final SqlNode validated = flinkPlanner.validate(sqlNode);
    // ......
}

FlinkPlannerImpl.validate()方法與其調(diào)用的validateInternal()方法如下所示歧斟。

def validate(sqlNode: SqlNode): SqlNode = {
  val validator = getOrCreateSqlValidator()
  validateInternal(sqlNode, validator)
}

private def validateInternal(sqlNode: SqlNode, validator: FlinkCalciteSqlValidator): SqlNode = {
  try {
    sqlNode.accept(new PreValidateReWriter(
      validator.getCatalogReader.unwrap(classOf[CatalogReader]), typeFactory))
    // do extended validation.
    sqlNode match {
      case node: ExtendedSqlNode =>
        node.validate()
      case _ =>
    }
    // no need to validate row type for DDL and insert nodes.
    if (sqlNode.getKind.belongsTo(SqlKind.DDL)
      || sqlNode.getKind == SqlKind.INSERT
      || sqlNode.getKind == SqlKind.CREATE_FUNCTION
      || sqlNode.getKind == SqlKind.DROP_FUNCTION
      || sqlNode.getKind == SqlKind.OTHER_DDL
      || sqlNode.isInstanceOf[SqlShowCatalogs]
      || sqlNode.isInstanceOf[SqlShowDatabases]
      || sqlNode.isInstanceOf[SqlShowTables]
      || sqlNode.isInstanceOf[SqlShowFunctions]
      || sqlNode.isInstanceOf[SqlShowViews]
      || sqlNode.isInstanceOf[SqlRichDescribeTable]) {
      return sqlNode
    }
    sqlNode match {
      case explain: SqlExplain =>
        val validated = validator.validate(explain.getExplicandum)
        explain.setOperand(0, validated)
        explain
      case _ =>
        validator.validate(sqlNode)
    }
  }
  catch {
    case e: RuntimeException =>
      throw new ValidationException(s"SQL validation failed. ${e.getMessage}", e)
  }
}

可見纯丸,對(duì)于某些SqlNode類型是不需要驗(yàn)證的,直接返回静袖。FlinkCalciteSqlValidator繼承了Calcite的默認(rèn)驗(yàn)證器SqlValidatorImpl觉鼻,并額外規(guī)定了對(duì)字面量和join的驗(yàn)證邏輯,代碼就不再貼出來了队橙。

觀察驗(yàn)證過后的SqlNode數(shù)據(jù)坠陈,可以發(fā)現(xiàn)多出了catalog和database的名稱萨惑,說明確實(shí)根據(jù)元數(shù)據(jù)校驗(yàn)了各個(gè)元素(表名、列名及類型仇矾、函數(shù)名等)庸蔼。

經(jīng)歷了解析和驗(yàn)證階段之后,我們的查詢計(jì)劃仍然停留在SqlNode樹的形態(tài)贮匕。如果用AST的方式表達(dá)姐仅,如下所示。

== Abstract Syntax Tree ==
LogicalAggregate(group=[{0}], total=[SUM($1)])
+- LogicalProject(name=[$5], amount=[$3])
   +- LogicalFilter(condition=[<($6, 27)])
      +- LogicalJoin(condition=[=($1, $4)], joinType=[inner])
         :- LogicalTableScan(table=[[default_catalog, default_database, orders]])
         +- LogicalTableScan(table=[[default_catalog, default_database, users]])

Stage 3: Logical Planning

在邏輯計(jì)劃階段刻盐,SqlNode將被轉(zhuǎn)化成RelNode掏膏,從單純的語(yǔ)句轉(zhuǎn)化為對(duì)數(shù)據(jù)的處理邏輯,即關(guān)系代數(shù)的具體操作敦锌,如Scan馒疹、Project、Filter乙墙、Join等颖变。接著上一節(jié)SqlToOperationConverter.convert()方法來看。

// SqlToOperationConverter.convert()
public static Optional<Operation> convert(
        FlinkPlannerImpl flinkPlanner,
        CatalogManager catalogManager,
        SqlNode sqlNode) {
    // validate the query
    final SqlNode validated = flinkPlanner.validate(sqlNode);
    SqlToOperationConverter converter = new SqlToOperationConverter(flinkPlanner, catalogManager);
    if (validated instanceof SqlCreateTable) {
        return Optional.of(converter.createTableConverter.convertCreateTable((SqlCreateTable) validated));
    } else if (validated instanceof SqlDropTable) {
        return Optional.of(converter.convertDropTable((SqlDropTable) validated));
    } else if (validated instanceof SqlAlterTable) {
        return Optional.of(converter.convertAlterTable((SqlAlterTable) validated));
    } else if (validated instanceof SqlAlterView) {
        return Optional.of(converter.convertAlterView((SqlAlterView) validated));
    } else if (validated instanceof SqlCreateFunction) {
        return Optional.of(converter.convertCreateFunction((SqlCreateFunction) validated));
    } else if (/*...*/) { 
        // ......
    } else if (validated instanceof SqlRichDescribeTable) {
        return Optional.of(converter.convertDescribeTable((SqlRichDescribeTable) validated));
    } else if (validated.getKind().belongsTo(SqlKind.QUERY)) {
        return Optional.of(converter.convertSqlQuery(validated));
    } else {
        return Optional.empty();
    }
}

這個(gè)方法會(huì)用很多重if-else判斷驗(yàn)證之后的SqlNode屬于何種類型听想,再分別調(diào)用不同的方法觸發(fā)轉(zhuǎn)換為RelNode的操作悼做。由于示例中執(zhí)行的是一個(gè)SELECT語(yǔ)句,所以從convertSqlQuery()方法繼續(xù)哗魂。

// SqlToOperationConverter.convertSqlQuery()
private Operation convertSqlQuery(SqlNode node) {
    return toQueryOperation(flinkPlanner, node);
}

// SqlToOperationConverter.toQueryOperation()
private PlannerQueryOperation toQueryOperation(FlinkPlannerImpl planner, SqlNode validated) {
    // transform to a relational tree
    RelRoot relational = planner.rel(validated);
    return new PlannerQueryOperation(relational.project());
}

注釋已經(jīng)寫得很明白了,F(xiàn)linkPlannerImpl.rel()方法將SqlNode樹轉(zhuǎn)化為RelNode樹漓雅,并返回其根RelRoot录别。而rel()方法直接利用Calcite內(nèi)置的SqlToRelConverter組件來遞歸地轉(zhuǎn)換,其具體邏輯仍然可見參考文檔邻吞。

// FlinkPlannerImpl.rel()
private def rel(validatedSqlNode: SqlNode, sqlValidator: FlinkCalciteSqlValidator) = {
  try {
    assert(validatedSqlNode != null)
    val sqlToRelConverter: SqlToRelConverter = new SqlToRelConverter(
      createToRelContext(),
      sqlValidator,
      sqlValidator.getCatalogReader.unwrap(classOf[CalciteCatalogReader]),
      cluster,
      convertletTable,
      sqlToRelConverterConfig)
    sqlToRelConverter.convertQuery(validatedSqlNode, false, true)
    // ......
  } catch {
    case e: RelConversionException => throw new TableException(e.getMessage)
  }
}

下圖示出RelRoot所表示的RelNode樹形結(jié)構(gòu)组题,注意LogicalAggregate、LogicalProject等都是Calcite中AbstractRelNode的實(shí)現(xiàn)類抱冷。

To Be Continued...

民那晚安晚安崔列。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市旺遮,隨后出現(xiàn)的幾起案子赵讯,更是在濱河造成了極大的恐慌,老刑警劉巖耿眉,帶你破解...
    沈念sama閱讀 210,835評(píng)論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件边翼,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡鸣剪,警方通過查閱死者的電腦和手機(jī)组底,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 89,900評(píng)論 2 383
  • 文/潘曉璐 我一進(jìn)店門丈积,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人债鸡,你說我怎么就攤上這事江滨。” “怎么了厌均?”我有些...
    開封第一講書人閱讀 156,481評(píng)論 0 345
  • 文/不壞的土叔 我叫張陵唬滑,是天一觀的道長(zhǎng)。 經(jīng)常有香客問我莫秆,道長(zhǎng)间雀,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,303評(píng)論 1 282
  • 正文 為了忘掉前任镊屎,我火速辦了婚禮惹挟,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘缝驳。我一直安慰自己连锯,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,375評(píng)論 5 384
  • 文/花漫 我一把揭開白布用狱。 她就那樣靜靜地躺著运怖,像睡著了一般。 火紅的嫁衣襯著肌膚如雪夏伊。 梳的紋絲不亂的頭發(fā)上摇展,一...
    開封第一講書人閱讀 49,729評(píng)論 1 289
  • 那天,我揣著相機(jī)與錄音溺忧,去河邊找鬼避诽。 笑死争剿,一個(gè)胖子當(dāng)著我的面吹牛功氨,可吹牛的內(nèi)容都是我干的麦到。 我是一名探鬼主播,決...
    沈念sama閱讀 38,877評(píng)論 3 404
  • 文/蒼蘭香墨 我猛地睜開眼歌溉,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼垄懂!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起痛垛,我...
    開封第一講書人閱讀 37,633評(píng)論 0 266
  • 序言:老撾萬榮一對(duì)情侶失蹤草慧,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后榜晦,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體冠蒋,經(jīng)...
    沈念sama閱讀 44,088評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,443評(píng)論 2 326
  • 正文 我和宋清朗相戀三年乾胶,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了抖剿。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片朽寞。...
    茶點(diǎn)故事閱讀 38,563評(píng)論 1 339
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖斩郎,靈堂內(nèi)的尸體忽然破棺而出脑融,到底是詐尸還是另有隱情,我是刑警寧澤缩宜,帶...
    沈念sama閱讀 34,251評(píng)論 4 328
  • 正文 年R本政府宣布肘迎,位于F島的核電站,受9級(jí)特大地震影響锻煌,放射性物質(zhì)發(fā)生泄漏妓布。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,827評(píng)論 3 312
  • 文/蒙蒙 一宋梧、第九天 我趴在偏房一處隱蔽的房頂上張望匣沼。 院中可真熱鬧,春花似錦捂龄、人聲如沸释涛。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,712評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)唇撬。三九已至,卻和暖如春展融,著一層夾襖步出監(jiān)牢的瞬間窖认,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,943評(píng)論 1 264
  • 我被黑心中介騙來泰國(guó)打工告希, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留耀态,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 46,240評(píng)論 2 360
  • 正文 我出身青樓暂雹,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親创夜。 傳聞我的和親對(duì)象是個(gè)殘疾皇子杭跪,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,435評(píng)論 2 348