spark graphx最短路徑-java版本

網(wǎng)上graphx實現(xiàn)最短路徑的代碼比較多孽水,但是都是scala版本色乾,java版本的實現(xiàn)很少。
1.創(chuàng)建圖數(shù)據(jù)
使用的方法是Graph.apply()仙蛉,下面看一下scala的該方法的定義:

  def apply[VD, ED](vertices : org.apache.spark.rdd.RDD[scala.Tuple2[org.apache.spark.graphx.VertexId, VD]], edges : org.apache.spark.rdd.RDD[org.apache.spark.graphx.Edge[ED]], defaultVertexAttr : VD = { /* compiled code */ }, edgeStorageLevel : org.apache.spark.storage.StorageLevel = { /* compiled code */ }, vertexStorageLevel : org.apache.spark.storage.StorageLevel = { /* compiled code */ })(implicit evidence$18 : scala.reflect.ClassTag[VD], evidence$19 : scala.reflect.ClassTag[ED]) : org.apache.spark.graphx.Graph[VD, ED] = { /* compiled code */ }

其中VD是頂點的屬性的類型(可以是list或者Tuple)主巍,ED是邊的屬性的類型(可以是list或者Tuple)冠息。
vertices是頂點的rdd,其中rdd中的元素結(jié)構(gòu)是Tuple2<點id孕索,VD>
edges是邊的rdd逛艰,其中rdd中的元素結(jié)構(gòu)是Tuple2<邊id,VD>
defaultVertexAttr是點屬性的默認值搞旭,假設(shè)創(chuàng)建一條邊散怖,1->2,但是id為2的點我沒有創(chuàng)建肄渗,只創(chuàng)建了1的點镇眷,那么這時候就會自動生產(chǎn)一個id為2的點,點的屬性就是這個默認值恳啥。
edgeStorageLevel點的存儲等級
vertexStorageLevel邊的存儲等級
點屬性的classTag
邊屬性的classTag
其中classTag偏灿,包含實際運行時的類的類型丹诀。
創(chuàng)建的圖數(shù)據(jù)如下:


image.png

代碼:

private static Graph<Tuple2<String, Integer>, Tuple2<Integer, Integer>> createGraph(
            JavaSparkContext javaSparkContext) {
        // 必須是Tuple2<Object, Tuple2<String, Integer>>钝的,不能是Tuple2<Long, Tuple2<String, Integer>>
        List<Tuple2<Object, Tuple2<String, Integer>>> vertexList = new ArrayList<>();
        vertexList.add(new Tuple2<>(1L, new Tuple2<>("mar_1", 21)));
        vertexList.add(new Tuple2<>(2L, new Tuple2<>("mar_2", 22)));
        vertexList.add(new Tuple2<>(3L, new Tuple2<>("mar_3", 23)));
        vertexList.add(new Tuple2<>(4L, new Tuple2<>("mar_4", 24)));
        vertexList.add(new Tuple2<>(5L, new Tuple2<>("mar_5", 25)));
        vertexList.add(new Tuple2<>(6L, new Tuple2<>("mar_6", 26)));
        vertexList.add(new Tuple2<>(7L, new Tuple2<>("mar_7", 27)));
        vertexList.add(new Tuple2<>(8L, new Tuple2<>("mar_8", 28)));
        vertexList.add(new Tuple2<>(9L, new Tuple2<>("mar_9", 29)));

        JavaRDD<Tuple2<Object, Tuple2<String, Integer>>> vertexRdd = javaSparkContext
                .parallelize(vertexList);

        List<Edge<Tuple2<Integer, Integer>>> edgeList = new ArrayList<>();
        edgeList.add(new Edge<>(1, 2, new Tuple2<>(1, 1)));
        edgeList.add(new Edge<>(2, 3, new Tuple2<>(2, 2)));
        edgeList.add(new Edge<>(1, 4, new Tuple2<>(3, 3)));
        edgeList.add(new Edge<>(1, 5, new Tuple2<>(4, 4)));
        edgeList.add(new Edge<>(1, 6, new Tuple2<>(5, 5)));
        edgeList.add(new Edge<>(4, 7, new Tuple2<>(6, 6)));
        edgeList.add(new Edge<>(7, 8, new Tuple2<>(7, 7)));
        edgeList.add(new Edge<>(5, 8, new Tuple2<>(8, 8)));
        edgeList.add(new Edge<>(8, 9, new Tuple2<>(9, 9)));
        edgeList.add(new Edge<>(6, 9, new Tuple2<>(10, 10)));
        edgeList.add(new Edge<>(3, 9, new Tuple2<>(11, 11)));

        JavaRDD<Edge<Tuple2<Integer, Integer>>> edgeRdd = javaSparkContext.parallelize(edgeList);

        Tuple2<String, Integer> defaultVertex = new Tuple2<>("default", -1);
        // ClassTag$.MODULE$.apply(Tuple2.class)所有用到的都改為ClassTag$.MODULE$.apply(Object.class)否則報錯
        Graph<Tuple2<String, Integer>, Tuple2<Integer, Integer>> graph = Graph
                .apply(vertexRdd.rdd(), edgeRdd.rdd(), defaultVertex, StorageLevels.MEMORY_ONLY,
                        StorageLevels.MEMORY_ONLY, ClassTag$.MODULE$.apply(Object.class),
                        ClassTag$.MODULE$.apply(Object.class));

        graph.vertices().toJavaRDD()
                .foreach(x -> System.out.println("vertex id:: " + x._1 + " , attr:: " + x._2));

        graph.edges().toJavaRDD().foreach(
                x -> System.out.println(
                        "edge id:: " + x.attr._1 + " , src:: " + x.srcId() + " , dest:: " + x
                                .dstId() + " weigh:: " + x.attr._2));
        return graph;
    }

注意其中翁垂,點的id要用Object類型的否則會報錯;還有classTag也用Object.class的否則也報錯硝桩,目前不知道什么原因沿猜。

2.最短路徑
這里用的是Pregel,這里簡單介紹一下詳細了解可以自行百度碗脊,Pregel框架是有谷歌提出啼肩,圖并行技術(shù)框架,以頂點為中心不斷的進行算法的迭代和數(shù)據(jù)同步衙伶。

Pregel的迭代過程如下:

  1. 最開始祈坠,圖中的所有頂點都會收到一個默認的消息,這個默認值就是方法的第一個參數(shù)矢劲。
  2. 各個頂點收到消息后調(diào)用vprog函數(shù)赦拘,生產(chǎn)新的消息。
  3. 調(diào)用sendMsg函數(shù)發(fā)送消息給下一輪迭代的頂點芬沉,這個函數(shù)將決定將消息發(fā)送給誰躺同。
  4. 接收到消息的頂點,調(diào)用vprog函數(shù)(如果收到多個消息丸逸,先調(diào)用mergeMsg)蹋艺,生產(chǎn)新的消息。其中最開始時是所有頂點都能收到消息的黄刚。
  5. 如果沒有頂點收到消息捎谨,或者到底迭代次數(shù)maxIterations退出計算,完成憔维。
    下面看下用到的方法的幾個參數(shù):
  def pregel[A](initialMsg : A, maxIterations : scala.Int = { /* compiled code */ }, activeDirection : org.apache.spark.graphx.EdgeDirection = { /* compiled code */ })(vprog : scala.Function3[org.apache.spark.graphx.VertexId, VD, A, VD], sendMsg : scala.Function1[org.apache.spark.graphx.EdgeTriplet[VD, ED], scala.Iterator[scala.Tuple2[org.apache.spark.graphx.VertexId, A]]], mergeMsg : scala.Function2[A, A, A])(implicit evidence$6 : scala.reflect.ClassTag[A]) : org.apache.spark.graphx.Graph[VD, ED] = { /* compiled code */ }

initialMsg:
第一輪迭代計算時侍芝,所有頂點收到的消息。(A表示消息類型)

maxIterations:
最大迭代次數(shù)(整型)

activeDirection:
沿著邊迭代的方向埋同。

vprog : scala.Function3[org.apache.spark.graphx.VertexId, VD, A, VD]:
在步驟2中調(diào)用的函數(shù)州叠,接收消息,然后生產(chǎn)頂點的新的屬性凶赁。
可以看到這個函數(shù)是scala.Function3類型的咧栗,3個入?yún)ⅲ?個返回值。
第一個入?yún)⑹屈cid虱肄,第二個參數(shù)是點的原來的屬性致板,第三個參數(shù)是接收到的消息,返回值是點的新的屬性咏窿。

scala.Function1[org.apache.spark.graphx.EdgeTriplet[VD, ED], scala.Iterator[scala.Tuple2[org.apache.spark.graphx.VertexId, A]]]:
發(fā)送消息的函數(shù)斟或,1個入?yún)ⅲ?個返回值。
入?yún)⑹荅dgeTriplet類型集嵌,這是保存邊的信息的一個類萝挤,這個類包括源點的屬性御毅、目的點的屬性、源點的id怜珍、目的點的id端蛆、以及邊的屬性黑竞。例如踱卵,如果a點收到消息,那么這個入?yún)⒕褪且詀為源的邊颜骤,a->b柔袁。返回值是發(fā)送消息到的點的id(例如b的id)呆躲,和發(fā)送的消息。

mergeMsg : scala.Function2[A, A, A]:
合并函數(shù)捶索,每個點可能收到多個消息歼秽,需要對消息進行合并。合并后在將消息作為入?yún)⒄{(diào)用vprog情组。

求最短路徑(指定起始點)的思路:

  1. 先對圖的點進行加工燥筷,即mapVertices,如果是起點院崇,那么該點的屬性為0肆氓,否則為整型最大值(這個值要大于圖中的最長路徑),這個屬性的意思就是路徑的長度底瓣,起始點路徑長度是0谢揪,其他的點路徑長度是一個很大的值。
    2.然后就開始用Pregel進行迭代捐凭,第一次所有點都收到消息(代碼里面設(shè)置的是整型最大值)拨扶,收到消息后點的屬性和消息取最小值,結(jié)果作為點的新的屬性茁肠。所以起始點的屬性為0患民,其他的為整型最大值。
    3.發(fā)送消息給下一輪迭代的頂點垦梆,函數(shù)中判斷srcAttr + 1 < dstAttr匹颤,如果滿足這發(fā)送,不滿足則不發(fā)送托猩,這樣只有和起始頂點直接連接的頂點才能收到消息印蓖。發(fā)送的消息是srcAttr + 1,這正好表示路徑的長度京腥。
    4.頂點收到消息后赦肃,進行merge操作,取最小的,這就模擬了最短路徑他宛,例如到該點有兩條路徑船侧,一條長度為3,一條為長度為2堕汞,那么把2作為該點的新的屬性勺爱。
    5.滿足迭代條件后結(jié)束計算晃琳,最后生產(chǎn)的點讯检,包含id和一個屬性,這個屬性就是起始點到該點的路徑長度卫旱。

整體代碼:

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.StorageLevels;
import org.apache.spark.graphx.Edge;
import org.apache.spark.graphx.EdgeDirection;
import org.apache.spark.graphx.EdgeTriplet;
import org.apache.spark.graphx.Graph;
import scala.Tuple2;
import scala.collection.Iterator;
import scala.collection.JavaConversions;
import scala.reflect.ClassTag$;
import scala.runtime.AbstractFunction1;
import scala.runtime.AbstractFunction2;
import scala.runtime.AbstractFunction3;

public class ShortestPath {

    public static void main(String[] args) {

        SparkConf sparkConf = new SparkConf().setAppName("shortest_path").setMaster("local[2]");
        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
        Graph<Tuple2<String, Integer>, Tuple2<Integer, Integer>> graph = createGraph(
                javaSparkContext);

        
        Object obj = Predef1.reflexivity();
        scala.Predef.$eq$colon$eq<Tuple2<String, Integer>, Long> ev = (scala.Predef.$eq$colon$eq<Tuple2<String, Integer>, Long>) obj;
        Graph<Long, Tuple2<Integer, Integer>> initGraph = graph
                .mapVertices(new MapVerticesFunction(), ClassTag$.MODULE$.apply(Object.class), ev);

//        initGraph.vertices().toJavaRDD().foreach(x -> System.out.println(x));

        Graph<Long, Tuple2<Integer, Integer>> sssp = initGraph.ops()
                .pregel((long)Integer.MAX_VALUE, 1, EdgeDirection.Out(), new VertexProgram(), new SendMsgFunction(),
                        new MergeMsgFunction(), ClassTag$.MODULE$.apply(Object.class));

        sssp.vertices().toJavaRDD().foreach(x -> System.out.println(x));
    }


    static class VertexProgram extends AbstractFunction3<Object, Long, Long, Long> implements
            Serializable {

        @Override
        public Long apply(Object id, Long vertexAttr, Long newVertexAttr) {

            Long min = Math.min(vertexAttr, newVertexAttr);
            System.out.println("id:: "+id+" :: "+vertexAttr+" :: "+newVertexAttr+" min "+min);
            return min;
        }
    }

    static class SendMsgFunction extends
            AbstractFunction1<EdgeTriplet<Long, Tuple2<Integer, Integer>>, Iterator<Tuple2<Object, Long>>> implements
            Serializable {

        @Override
        public Iterator<Tuple2<Object, Long>> apply(
                EdgeTriplet<Long, Tuple2<Integer, Integer>> triplet) {

            long srcAttr = triplet.srcAttr();
            long dstAttr = triplet.dstAttr();

//      System.out.println(srcAttr+" "+attr+" "+dstAttr);
            System.out.println("srcid: "+triplet.srcId()+" destid: "+triplet.dstId()+" srca: "+srcAttr+" desa: "+dstAttr);
            if (srcAttr + 1 < dstAttr) {
                return JavaConversions.asScalaIterator(
                        Collections.singletonList(
                                new Tuple2<Object, Long>(triplet.dstId(), srcAttr + 1))
                                .iterator());
            } else {
                return JavaConversions.asScalaIterator(Collections.emptyIterator());
            }
        }
    }

    static class MergeMsgFunction extends AbstractFunction2<Long, Long, Long>
            implements Serializable {

        @Override
        public Long apply(Long a, Long b) {
//            System.out.println("marge ->" + Math.min((long) a, (long) b) + " ->  " + a + " " + b);
            return Math.min(a, b);
        }
    }

    public static class Predef1 {

        static public <T> scala.Predef.$eq$colon$eq<T, T> reflexivity() {
            return scala.Predef.$eq$colon$eq$.MODULE$.tpEquals();
        }
    }

    static class MapVerticesFunction extends
            AbstractFunction2<Object, Tuple2<String, Integer>, Long> implements Serializable {

        @Override
        public Long apply(Object v1, Tuple2<String, Integer> v2) {
            if ((long) v1 == 1) {
                return 0L;
            } else {
                return (long) Integer.MAX_VALUE;
            }
        }
    }

    private static Graph<Tuple2<String, Integer>, Tuple2<Integer, Integer>> createGraph(
            JavaSparkContext javaSparkContext) {
        // 必須是Tuple2<Object, Tuple2<String, Integer>>人灼,不能是Tuple2<Long, Tuple2<String, Integer>>
        List<Tuple2<Object, Tuple2<String, Integer>>> vertexList = new ArrayList<>();
        vertexList.add(new Tuple2<>(1L, new Tuple2<>("mar_1", 21)));
        vertexList.add(new Tuple2<>(2L, new Tuple2<>("mar_2", 22)));
        vertexList.add(new Tuple2<>(3L, new Tuple2<>("mar_3", 23)));
        vertexList.add(new Tuple2<>(4L, new Tuple2<>("mar_4", 24)));
        vertexList.add(new Tuple2<>(5L, new Tuple2<>("mar_5", 25)));
        vertexList.add(new Tuple2<>(6L, new Tuple2<>("mar_6", 26)));
        vertexList.add(new Tuple2<>(7L, new Tuple2<>("mar_7", 27)));
        vertexList.add(new Tuple2<>(8L, new Tuple2<>("mar_8", 28)));
        vertexList.add(new Tuple2<>(9L, new Tuple2<>("mar_9", 29)));

        JavaRDD<Tuple2<Object, Tuple2<String, Integer>>> vertexRdd = javaSparkContext
                .parallelize(vertexList);

        List<Edge<Tuple2<Integer, Integer>>> edgeList = new ArrayList<>();
        edgeList.add(new Edge<>(1, 2, new Tuple2<>(1, 1)));
        edgeList.add(new Edge<>(2, 3, new Tuple2<>(2, 2)));
        edgeList.add(new Edge<>(1, 4, new Tuple2<>(3, 3)));
        edgeList.add(new Edge<>(1, 5, new Tuple2<>(4, 4)));
        edgeList.add(new Edge<>(1, 6, new Tuple2<>(5, 5)));
        edgeList.add(new Edge<>(4, 7, new Tuple2<>(6, 6)));
        edgeList.add(new Edge<>(7, 8, new Tuple2<>(7, 7)));
        edgeList.add(new Edge<>(5, 8, new Tuple2<>(8, 8)));
        edgeList.add(new Edge<>(8, 9, new Tuple2<>(9, 9)));
        edgeList.add(new Edge<>(6, 9, new Tuple2<>(10, 10)));
        edgeList.add(new Edge<>(3, 9, new Tuple2<>(11, 11)));

        JavaRDD<Edge<Tuple2<Integer, Integer>>> edgeRdd = javaSparkContext.parallelize(edgeList);

        Tuple2<String, Integer> defaultVertex = new Tuple2<>("default", -1);
        // ClassTag$.MODULE$.apply(Tuple2.class)所有用到的都改為ClassTag$.MODULE$.apply(Object.class)否則報錯
        Graph<Tuple2<String, Integer>, Tuple2<Integer, Integer>> graph = Graph
                .apply(vertexRdd.rdd(), edgeRdd.rdd(), defaultVertex, StorageLevels.MEMORY_ONLY,
                        StorageLevels.MEMORY_ONLY, ClassTag$.MODULE$.apply(Object.class),
                        ClassTag$.MODULE$.apply(Object.class));

        graph.vertices().toJavaRDD()
                .foreach(x -> System.out.println("vertex id:: " + x._1 + " , attr:: " + x._2));

        graph.edges().toJavaRDD().foreach(
                x -> System.out.println(
                        "edge id:: " + x.attr._1 + " , src:: " + x.srcId() + " , dest:: " + x
                                .dstId() + " weigh:: " + x.attr._2));
        return graph;
    }
}

看下輸出結(jié)果:

(1,0)
(3,2147483647)
(7,2147483647)
(4,1)
(9,2147483647)
(5,1)
(6,1)
(8,2147483647)
(2,1)

上面的迭代次數(shù)只設(shè)置為1次,就是以起始點開始向外第一層(可以自行修改)顾翼,對比圖看結(jié)果投放,和1點直接連接的是2、4适贸、5灸芳、6,結(jié)果中對應的長度為1拜姿,其他點為Integer.MAX_VALUE烙样,如果想取固定的目的點,加過過濾即可蕊肥。
注意:
由于java和scala的兼容性問題谒获,如果你的idea爆紅,不用理會壁却,不影響編譯和運行批狱。
maven依賴:

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>2.3.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.11</artifactId>
      <version>2.3.0</version>
    </dependency>
   <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-mllib_2.11</artifactId>
      <version>2.3.0</version>
    </dependency>

版權(quán)聲明
1.以上文章為本人原創(chuàng),首發(fā)于簡書網(wǎng)展东,文責自負赔硫。
2.未經(jīng)作者同意不得轉(zhuǎn)載,如需轉(zhuǎn)載請聯(lián)系作者盐肃。感謝卦停。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市恼蓬,隨后出現(xiàn)的幾起案子惊完,更是在濱河造成了極大的恐慌,老刑警劉巖处硬,帶你破解...
    沈念sama閱讀 217,542評論 6 504
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件小槐,死亡現(xiàn)場離奇詭異,居然都是意外死亡,警方通過查閱死者的電腦和手機凿跳,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,822評論 3 394
  • 文/潘曉璐 我一進店門件豌,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人控嗜,你說我怎么就攤上這事茧彤。” “怎么了疆栏?”我有些...
    開封第一講書人閱讀 163,912評論 0 354
  • 文/不壞的土叔 我叫張陵曾掂,是天一觀的道長。 經(jīng)常有香客問我壁顶,道長珠洗,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,449評論 1 293
  • 正文 為了忘掉前任若专,我火速辦了婚禮许蓖,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘调衰。我一直安慰自己膊爪,他們只是感情好,可當我...
    茶點故事閱讀 67,500評論 6 392
  • 文/花漫 我一把揭開白布嚎莉。 她就那樣靜靜地躺著米酬,像睡著了一般。 火紅的嫁衣襯著肌膚如雪萝喘。 梳的紋絲不亂的頭發(fā)上淮逻,一...
    開封第一講書人閱讀 51,370評論 1 302
  • 那天,我揣著相機與錄音阁簸,去河邊找鬼爬早。 笑死,一個胖子當著我的面吹牛启妹,可吹牛的內(nèi)容都是我干的筛严。 我是一名探鬼主播,決...
    沈念sama閱讀 40,193評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼饶米,長吁一口氣:“原來是場噩夢啊……” “哼桨啃!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起檬输,我...
    開封第一講書人閱讀 39,074評論 0 276
  • 序言:老撾萬榮一對情侶失蹤照瘾,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后丧慈,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體析命,經(jīng)...
    沈念sama閱讀 45,505評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡主卫,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,722評論 3 335
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了鹃愤。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片簇搅。...
    茶點故事閱讀 39,841評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖软吐,靈堂內(nèi)的尸體忽然破棺而出瘩将,到底是詐尸還是另有隱情,我是刑警寧澤凹耙,帶...
    沈念sama閱讀 35,569評論 5 345
  • 正文 年R本政府宣布姿现,位于F島的核電站,受9級特大地震影響使兔,放射性物質(zhì)發(fā)生泄漏建钥。R本人自食惡果不足惜藤韵,卻給世界環(huán)境...
    茶點故事閱讀 41,168評論 3 328
  • 文/蒙蒙 一虐沥、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧泽艘,春花似錦欲险、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,783評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至然低,卻和暖如春喜每,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背雳攘。 一陣腳步聲響...
    開封第一講書人閱讀 32,918評論 1 269
  • 我被黑心中介騙來泰國打工带兜, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人吨灭。 一個月前我還...
    沈念sama閱讀 47,962評論 2 370
  • 正文 我出身青樓刚照,卻偏偏與公主長得像,于是被迫代替她去往敵國和親喧兄。 傳聞我的和親對象是個殘疾皇子无畔,可洞房花燭夜當晚...
    茶點故事閱讀 44,781評論 2 354