Rxjava2 可連接的Observable(ConnectableObservable)操作詳解及實例

簡要:

需求了解:

Rxjava中的普通的 Observable 在觀察者訂閱的時候就會發(fā)射數據,但是有的時候我們想自己控制數據的發(fā)射岩喷,比如在有指定的觀察者或者全部的觀察者訂閱后開始發(fā)射數據呆奕,這個時候我們就要要用到Rxjava中的可連接的Observable來完成這個需求甘畅。

這一節(jié)主要介紹 ConnectableObservable 和它的子類以及它們的操作符:

  • ConnectableObservable: 一個可連接的Observable慨蓝,在訂閱后不發(fā)射數據剧防,調用 connect() 方法后開始發(fā)射數據京革。
  • Observable.publish():將一個Observable轉換為一個可連接的Observable 奇唤。
  • ConnectableObservable.connect():指示一個可連接的Observable開始發(fā)射數據。
  • Observable.replay():確保所有的訂閱者看到相同的數據序列匹摇,即使它們在Observable開始發(fā)射數據之后才訂閱咬扇。
  • ConnectableObservable.refCount():讓一個可連接的Observable表現得像一個普通的Observable。
  • Observable.share():可以直接將Observable轉換為一個具有ConnectableObservable特性的Observable對象廊勃,等價于Observable.publish().refCount()
  • Observable.replay():保證所有的觀察者收到相同的數據序列懈贺,即使它們在Observable開始發(fā)射數據之后才訂閱。

1. ConnectableObservable

一個可連接的Observable(ConnectableObservable)與普通的Observable差不多供搀。不同之處:可連接的Observable在被訂閱時并不開始發(fā)射數據隅居,只有在它的 connect() 被調用時才開始。用這種方法葛虐,你可以等部分或者所有的潛在訂閱者都訂閱了這個Observable之后才開始發(fā)射數據胎源。

img-ConnectableObservable

注意: ConnectableObservable 的線程切換只能通過 replay 操作符來實現,普通 Observable 的 subscribeOn()observerOn() 在 ConnectableObservable 中不起作用屿脐√樵椋可以通過 replay 操作符的指定線程調度器的方式來進行線程的切換。

Javadoc: ConnectableObservable

2. Publish

將普通的Observable轉換為可連接的Observable(ConnectableObservable)的诵。

如果要使用可連接的Observable万栅,可以使用Observable的 publish 操作符,來將相應轉換為ConnectableObservable對象西疤。

有一個變體接受一個函數作為參數(publish(Function selector))烦粒。這個函數用原始Observable發(fā)射的數據作為參數,產生 一個新的數據作為 ConnectableObservable 給發(fā)射,替換原位置的數據項扰她。實質是在簽名的基礎上添加一個 Map 操作兽掰。

簡單實例:

  // 1. publish()
  // 創(chuàng)建ConnectableObservable
  ConnectableObservable<Integer> connectableObservable = Observable.range(1, 5)
          .publish();    // publish操作將Observable轉化為一個可連接的Observable

    // 2. publish(Function<Observable<T>, ObservableSource<R>> selector)
  // 接受原始Observable的數據,產生一個新的Observable徒役,可以對這個Observable進行函數處理
  Observable<String> publish = Observable.range(1, 5)
          .publish(new Function<Observable<Integer>, ObservableSource<String>>() {

              @Override
              public ObservableSource<String> apply(Observable<Integer> integerObservable) throws Exception {
                  System.out.println("--> apply(4): " + integerObservable.toString());

                  Observable<String> map = integerObservable.map(new Function<Integer, String>() {

                      @Override
                      public String apply(Integer integer) throws Exception {
                          return "[this is map value]: " + integer * integer;
                      }
                  });
                  return map;
              }
          });
          
    publish.subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            System.out.println("--> accept(4): " + s);
        }
    });

輸出:

--> apply(4): io.reactivex.subjects.PublishSubject@3fb4f649
--> accept(4): [this is map value]: 1
--> accept(4): [this is map value]: 4
--> accept(4): [this is map value]: 9
--> accept(4): [this is map value]: 16
--> accept(4): [this is map value]: 25

Javadoc: Observable.publish()
Javadoc: Observable.publish(Function<Observable<T>,ObservableSource<R> selector)

3. Connect

讓一個可連接的Observable開始發(fā)射數據給訂閱者孽尽。

  • 可連接的Observable (connectableObservable)與普通的Observable差不多,不過它并不會在被訂閱時開始發(fā)射數據忧勿,而是直到使用了 Connect 操作符時才會開始杉女。
  • RxJava中 connect 是 ConnectableObservable 接口的一個方法,使用 publish 操作符可以將一個普通的Observable轉換為一個 ConnectableObservable 鸳吸。
  • 調用 ConnectableObservable 的 connect 方法會讓它后面的Observable開始給發(fā)射數據給訂閱 者熏挎。connect 方法返回一個 Subscription 對象,可以調用它的 unsubscribe 方法讓Observable停 止發(fā)射數據給觀察者层释。
  • 即使沒有任何訂閱者訂閱它婆瓜,你也可以使用 connect 方法讓一個Observable開始發(fā)射數據 (或者開始生成待發(fā)射的數據)。這樣贡羔,你可以將一個"冷"的Observable變?yōu)?熱"的廉白。

實例代碼:

    // 1. publish()
    // 創(chuàng)建ConnectableObservable
    ConnectableObservable<Integer> connectableObservable = Observable.range(1, 5)
            .publish();    // publish操作將Observable轉化為一個可連接的Observable

    // 創(chuàng)建普通的Observable
    Observable<Integer> range = Observable.range(1, 5);

    // 1.1 connectableObservable在被訂閱時并不開始發(fā)射數據,只有在它的 connect() 被調用時才開始
    connectableObservable.subscribe(new Observer<Integer>() {

        @Override
        public void onSubscribe(Disposable d) {
            System.out.println("--> onSubscribe(1)");
        }

        @Override
        public void onNext(Integer integer) {
            System.out.println("--> onNext(1): " + integer);
        }

        @Override
        public void onError(Throwable e) {
            System.out.println("--> onError(1): " + e);
        }

        @Override
        public void onComplete() {
            System.out.println("--> onComplete(1)");
        }
    });

    // 1.2 connectableObservable在被訂閱時并不開始發(fā)射數據乖寒,只有在它的 connect() 被調用時才開始
    connectableObservable.subscribe(new Observer<Integer>() {

        @Override
        public void onSubscribe(Disposable d) {
            System.out.println("--> onSubscribe(2)");
        }

        @Override
        public void onNext(Integer integer) {
            System.out.println("--> onNext(2): " + integer);
        }

        @Override
        public void onError(Throwable e) {
            System.out.println("--> onError(2): " + e);
        }

        @Override
        public void onComplete() {
            System.out.println("--> onComplete(2)");
        }
    });

    // 1.3 普通Observable在被訂閱時就會發(fā)射數據
    range.subscribe(new Observer<Integer>() {

        @Override
        public void onSubscribe(Disposable d) {
            System.out.println("--> onSubscribe(3)");
        }

        @Override
        public void onNext(Integer integer) {
            System.out.println("--> onNext(3): " + integer);
        }

        @Override
        public void onError(Throwable e) {
            System.out.println("--> onError(3): " + e);
        }

        @Override
        public void onComplete() {
            System.out.println("--> onComplete(3)");
        }
    });

    System.out.println("----------------start connect------------------");
    // 可連接的Observable在被訂閱時并不開始發(fā)射數據猴蹂,只有在它的connect()被調用時才開始發(fā)射數據
    // connectableObservable.connect();
    
    // 可選參數Consumer,返回一個Disposable對象楣嘁,可以獲取訂閱狀態(tài)和取消當前的訂閱
    connectableObservable.connect(new Consumer<Disposable>() {
        @Override
        public void accept(Disposable disposable) throws Exception {
            System.out.println("--> connect accept: " + disposable.isDisposed());
            // disposable.dispose();
        }
    });

輸出:

--> onSubscribe(1)
--> onSubscribe(2)
--> onSubscribe(3)
--> onNext(3): 1
--> onNext(3): 2
--> onNext(3): 3
--> onNext(3): 4
--> onNext(3): 5
--> onComplete(3)
----------------start connect------------------
--> connect accept: false
--> onNext(1): 1
--> onNext(2): 1
--> onNext(1): 2
--> onNext(2): 2
--> onNext(1): 3
--> onNext(2): 3
--> onNext(1): 4
--> onNext(2): 4
--> onNext(1): 5
--> onNext(2): 5
--> onComplete(1)
--> onComplete(2)

Javadoc: ConnectableObservable.connect()
Javadoc: ConnectableObservable.connect(Consumer<Disposable> connection)

4. RefCount

RefCount 的作用是讓一個可連接的Observable行為像普通的Observable磅轻。

RefCount 操作符把從一個可連接的Observable連接和斷開的過程自動化了。它操作一個可連接的Observable逐虚,返回一個普通的Observable聋溜。當第一個訂閱者訂閱這個Observable 時, RefCount 連接到下層的可連接Observable叭爱。 RefCount 跟蹤有多少個觀察者訂閱它撮躁,直到最后一個觀察者完成才斷開與下層可連接Observable的連接。

解析: refCount() 把 ConnectableObservable 變?yōu)橐粋€普通的 Observable 但又保持了 ConnectableObservable 的特性买雾。如果出現第一個 Observer把曼,它就會自動調用 connect(),如果所有的 Observer 全部 dispose漓穿,那么它也會停止接受上游 Observable 的數據嗤军。

實例代碼:

    /**
     * refCount(int subscriberCount, long timeout, TimeUnit unit, Scheduler scheduler)
     *
     * 具有以下可選參數:
     * subscriberCount: 指定需要連接到上游的訂閱者數量。注意:當訂閱者滿足此數量后才會處理
     * timeout:         所有訂閱用戶退訂后斷開連接前的等待時間
     * unit:            時間單位
     * scheduler:        斷開連接之前要等待的目標調度器
     */
    Observable<Long> refCountObservable = Observable
            .intervalRange(1, 5, 0, 1000, TimeUnit.MILLISECONDS)
            .publish()
            .refCount()
            .subscribeOn(Schedulers.newThread())    // 指定訂閱調度在子線程
            .observeOn(Schedulers.newThread());     // 指定觀察者調度在子線程
        //  .refCount(1, 500, TimeUnit.MILLISECONDS, Schedulers.newThread());

    // 第1個訂閱者
    refCountObservable.subscribe(new Observer<Long>() {
        private  Disposable disposable;
        private  int buff = 0;

        @Override
        public void onSubscribe(Disposable d) {
            System.out.println("----> onSubscribe(1): ");
            disposable = d;
        }

        @Override
        public void onNext(Long value) {
            if (buff == 3) {
                disposable.dispose();   // 解除當前的訂閱
                System.out.println("----> Subscribe(1) is dispose! ");
            } else {
                System.out.println("--> onNext(1): " + value);
            }
            buff++;
        }

        @Override
        public void onError(Throwable e) {
            System.out.println("--> onError(1): " + e);
        }

        @Override
        public void onComplete() {
            System.out.println("--> onComplete(1): ");
        }
    });

    // 第2個訂閱者
    refCountObservable.doOnSubscribe(new Consumer<Disposable>() {

                @Override
                public void accept(Disposable disposable) throws Exception {
                    System.out.println("----> onSubscribe(2): ");
                }
            })
            .delaySubscription(2, TimeUnit.SECONDS)   // 延遲2秒后訂閱
            .subscribe(new Consumer<Long>() {

                @Override
                public void accept(Long value) throws Exception {
                    System.out.println("--> accept(2): " + value);
                }
            });

    System.in.read();

輸出:

----> onSubscribe(1): 
--> onNext(1): 1
--> onNext(1): 2
--> onNext(1): 3
----> onSubscribe(2): 
----> Subscribe(1) is dispose! 
--> accept(2): 4
--> accept(2): 5

Javadoc: ConnectableObservable.refCount(subscriberCount, timeout, unit, scheduler)

5. Share

一個普通的Observable可以通過 publish 來將其轉換為ConnectableObservable晃危,然后可以調用其 refCount() 的方法將其轉換為一個具有 ConnectableObservable 特性的Observable叙赚。

其實Observable中還有一個操作方法,可以直接完成此步驟的操作,這就是 Observable.share() 操作符纠俭。

可以來看一下share操作符的源碼:

    public final Observable<T> share() {
        return publish().refCount();
    }

通過源碼可以知道沿量,share() 方法可以直接將Observable轉換為一個具有ConnectableObservable特性的Observable對象,即Observable.publish().refCount() == Observable.share() 冤荆。

實例代碼:

    // share()
    // 通過share() 同時應用 publish 和 refCount 操作
    Observable<Long> share = Observable
            .intervalRange(1, 5, 0, 500, TimeUnit.MILLISECONDS)
      //    .publish().refCount()
            .share()  // 等價于上面的操作
            .subscribeOn(Schedulers.newThread())    // 指定訂閱調度在子線程
            .observeOn(Schedulers.newThread());     // 指定觀察者調度在子線程

    // 1. 第一個訂閱者
    share.subscribe(new Observer<Long>() {
        private  Disposable disposable;
        private  int buff = 0;

        @Override
        public void onSubscribe(Disposable d) {
            System.out.println("----> onSubscribe(1): ");
            disposable = d;
        }

        @Override
        public void onNext(Long value) {
            if (buff == 3) {
                disposable.dispose();   // 解除當前的訂閱
                System.out.println("----> Subscribe(1) is dispose! ");
            } else {
                System.out.println("--> onNext(1): " + value);
            }
            buff++;
        }

        @Override
        public void onError(Throwable e) {
            System.out.println("--> onError(1): " + e);
        }

        @Override
        public void onComplete() {
            System.out.println("--> onComplete(1): ");
        }
    });

    // 2. 第二個訂閱者
    share.doOnSubscribe(new Consumer<Disposable>() {

                @Override
                public void accept(Disposable disposable) throws Exception {
                    System.out.println("----> onSubscribe(2): ");
                }
            })
            .delaySubscription(1, TimeUnit.SECONDS)    // 延遲1秒后訂閱
            .subscribe(new Consumer<Long>() {

                @Override
                public void accept(Long value) throws Exception {
                    System.out.println("--> accept(2): " + value);
                }
            });

    System.in.read();

輸出:

----> onSubscribe(1): 
--> onNext(1): 1
--> onNext(1): 2
--> onNext(1): 3
----> onSubscribe(2): 
----> Subscribe(1) is dispose! 
--> accept(2): 4
--> accept(2): 5

Javadoc: Observable.share()

6. Replay

保證所有的觀察者收到相同的數據序列,即使它們在Observable開始發(fā)射數據之后才訂閱权纤。

img-Replay

如果在將一個Observable轉換為可連接的Observable之前對它使用 Replay 操作符钓简,產生的這個可連接Observable將總是發(fā)射完整的數據序列給任何未來的觀察者,可以緩存發(fā)射過的數據汹想,即使那些觀察者在這 個Observable開始給其它觀察者發(fā)射數據之后才訂閱外邓。

注意: replay操作符生成的 connectableObservable ,如果沒有對緩存進行限定古掏,那么無論觀察者何時去訂閱损话,都可以收到 Observable 完整的數據序列項。

replay 操作符最好根據實際情況限定緩存的大小槽唾,否則數據發(fā)射過快或者較多時會占用很高的內存丧枪。replay 操作符有可以接受不同參數的變體,有的可以指定 replay 的最大緩存數量或者指定緩存時間庞萍,還可以指定調度器拧烦。

  • replay不僅可以緩存Observable的所有數據序列,也可以進行限定緩存大小的操作钝计。
  • 還有有一種 replay 返回一個普通的Observable恋博。它可以接受一個變換函數為參數,這個函數接受原始Observable發(fā)射的數據項為參數私恬,返回結果Observable要發(fā)射的一項數據债沮。因此,這個操作符其實是 replay 變換之后的數據項本鸣。

實例代碼:

    // 創(chuàng)建發(fā)射數據的Observable
    Observable<Long> observable = Observable
            .intervalRange(1,
                    10,
                    1,
                    500,
                    TimeUnit.MILLISECONDS,
                    Schedulers.newThread());

    /**
     * 1.1 replay(Scheduler scheduler)
     * 可選參數:scheduler, 指定線程調度器
     * 接受原始數據的所有數據
     */
//  ConnectableObservable<Long> replay1 = observable.replay();

    /**
     * 1.2 replay(int bufferSize, Scheduler scheduler)
     * 可選參數:scheduler, 指定線程調度器
     * 只緩存 bufferSize 個最近的原始數據
     */
//  ConnectableObservable<Long> replay1 = observable.replay(1); // 設置緩存大小為1, 從原數據中緩存最近的1個數據

    /**
     * 1.3 replay(int bufferSize, long time, TimeUnit unit, Scheduler scheduler)
     * 可選參數:scheduler, 指定線程調度器
     * 在訂閱前指定的時間段內緩存 bufferSize 個數據, 注意計時開始是原始數據發(fā)射第1個數據項之后開始
     */
//  ConnectableObservable<Long> replay1 = observable.replay(5, 1000, TimeUnit.MILLISECONDS);

    /**
     * 1.4 replay(long time, TimeUnit unit, Scheduler scheduler)
     * 可選參數:scheduler, 指定線程調度器
     * 在訂閱前指定的時間段內緩存數據, 注意計時開始是原始數據發(fā)射第1個數據項之后開始
     */
   ConnectableObservable<Long> replay1 = observable.replay( 1000, TimeUnit.MILLISECONDS);

    // 進行 connect 操作
    replay1.connect();

    // 第一個觀察者
    replay1.doOnSubscribe(new Consumer<Disposable>() {
        @Override
        public void accept(Disposable disposable) throws Exception {
            System.out.println("----> onSubScribe(1-1)");
        }
    }).subscribe(new Consumer<Long>() {
        @Override
        public void accept(Long aLong) throws Exception {
            System.out.println("--> accept(1-1): " + aLong);
        }
    });

    // 第二個觀察者(延遲1秒后訂閱)
    replay1.doOnSubscribe(new Consumer<Disposable>() {
        @Override
        public void accept(Disposable disposable) throws Exception {
            System.out.println("----> onSubScribe(1-2)");
        }
    }).delaySubscription(1, TimeUnit.SECONDS)
      .subscribe(new Consumer<Long>() {
            @Override
            public void accept(Long aLong) throws Exception {
                System.out.println("--> accept(1-2): " + aLong);
            }
      });

    // 第三個觀察者(延遲2秒后訂閱)
    replay1.doOnSubscribe(new Consumer<Disposable>() {
        @Override
        public void accept(Disposable disposable) throws Exception {
            System.out.println("----> onSubScribe(1-3)");
        }
    }).delaySubscription(2, TimeUnit.SECONDS)
       .subscribe(new Consumer<Long>() {
           @Override
           public void accept(Long aLong) throws Exception {
               System.out.println("--> accept(1-3): " + aLong);
           }
       });

    System.in.read();
    System.out.println("----------------------------------------------------------");
    /**
     * 2. replay(Function<Observable<T>, ObservableSource<R>> selector,
     * int bufferSize,                              可選參數: 指定從元數據序列數據的緩存大小
     * long time, TimeUnit unit,        可選參數: 指定緩存指定時間段的數據序列
     * Scheduler scheduler)                 可選參數: 指定線程調度器
     *
     * 接受一個變換函數 function 為參數疫衩,這個函數接受原始Observable發(fā)射的數據項為參數
     * 通過指定的函數處理后,返回一個處理后的Observable
     */
    Observable<String> replayObservable = observable.replay(new Function<Observable<Long>, ObservableSource<String>>() {
        @Override
        public ObservableSource<String> apply(Observable<Long> longObservable) throws Exception {
            // 對原始數據進行處理
            Observable<String> map = longObservable.map(new Function<Long, String>() {
                @Override
                public String apply(Long aLong) throws Exception {
                    return aLong + "2 = " + aLong * aLong;  // 將原始數據進行平方處理永高,并轉換為字符串數據類型
                }
            });

            return map;
        }
    }, 1, Schedulers.newThread());

    replayObservable.subscribeOn(Schedulers.newThread())
            .observeOn(Schedulers.newThread());

    // 第一個觀察者
    replayObservable.doOnSubscribe(new Consumer<Disposable>() {
        @Override
        public void accept(Disposable disposable) throws Exception {
            System.out.println("--> onSubScribe(2-1)");
        }
    }).subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            System.out.println("--> accept(2-1): " + s);
        }
    });

    // 訂閱第二個觀察者 (延遲2秒后訂閱)
    replayObservable.doOnSubscribe(new Consumer<Disposable>() {
        @Override
        public void accept(Disposable disposable) throws Exception {
            System.out.println("--> onSubScribe(2-2)");
        }
    }).delaySubscription(2, TimeUnit.SECONDS)
      .subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                System.out.println("--> accept(2-2): " + s);
            }
       });

    System.in.read();

輸出:

----> onSubScribe(1-1)
--> accept(1-1): 1
--> accept(1-1): 2
--> accept(1-1): 3
----> onSubScribe(1-2)
--> accept(1-2): 2
--> accept(1-2): 3
--> accept(1-1): 4
--> accept(1-2): 4
--> accept(1-1): 5
--> accept(1-2): 5
----> onSubScribe(1-3)
--> accept(1-3): 4
--> accept(1-3): 5
--> accept(1-1): 6
--> accept(1-2): 6
--> accept(1-3): 6
--> accept(1-1): 7
--> accept(1-2): 7
--> accept(1-3): 7
--> accept(1-1): 8
--> accept(1-2): 8
--> accept(1-3): 8
--> accept(1-1): 9
--> accept(1-2): 9
--> accept(1-3): 9
--> accept(1-1): 10
--> accept(1-2): 10
--> accept(1-3): 10
----------------------------------------------------------
--> onSubScribe(2-1)
--> accept(2-1): 12 = 1
--> accept(2-1): 22 = 4
--> accept(2-1): 32 = 9
--> accept(2-1): 42 = 16
--> onSubScribe(2-2)
--> accept(2-1): 52 = 25
--> accept(2-2): 12 = 1
--> accept(2-2): 22 = 4
--> accept(2-1): 62 = 36
--> accept(2-2): 32 = 9
--> accept(2-1): 72 = 49
--> accept(2-1): 82 = 64
--> accept(2-2): 42 = 16
--> accept(2-2): 52 = 25
--> accept(2-1): 92 = 81
--> accept(2-2): 62 = 36
--> accept(2-1): 102 = 100
--> accept(2-2): 72 = 49
--> accept(2-2): 82 = 64
--> accept(2-2): 92 = 81
--> accept(2-2): 102 = 100

Javadoc: Observable.replay(int bufferSize, long time, TimeUnit unit, Scheduler scheduler)
Javadoc: Observable.replay(Function<Observable<T>,ObservableSource<R>> selector, int bufferSize, long time, TimeUnit unit, Scheduler scheduler)

小結

Rxjava 的連接操作符主要的核心是 ConnectableObservable 這個可連接的Observable對象的概念隧土。可連接的 Observable 在被訂閱時并不會直接發(fā)射數據命爬,只有在他的 connect() 方法被調用時才會發(fā)射數據曹傀。便于更好的對數據的發(fā)射行為的控制,同時也對數據有很好的操作能力饲宛,可以緩存數據皆愉,指定緩存大小,時間片段緩存等。

提示:以上使用的Rxjava2版本: 2.2.12

Rx介紹與講解及完整目錄參考:Rxjava2 介紹與詳解實例

實例代碼:

?著作權歸作者所有,轉載或內容合作請聯系作者
  • 序言:七十年代末幕庐,一起剝皮案震驚了整個濱河市久锥,隨后出現的幾起案子,更是在濱河造成了極大的恐慌异剥,老刑警劉巖瑟由,帶你破解...
    沈念sama閱讀 218,284評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現場離奇詭異冤寿,居然都是意外死亡歹苦,警方通過查閱死者的電腦和手機,發(fā)現死者居然都...
    沈念sama閱讀 93,115評論 3 395
  • 文/潘曉璐 我一進店門督怜,熙熙樓的掌柜王于貴愁眉苦臉地迎上來殴瘦,“玉大人,你說我怎么就攤上這事号杠◎揭福” “怎么了?”我有些...
    開封第一講書人閱讀 164,614評論 0 354
  • 文/不壞的土叔 我叫張陵姨蟋,是天一觀的道長屉凯。 經常有香客問我,道長芬探,這世上最難降的妖魔是什么神得? 我笑而不...
    開封第一講書人閱讀 58,671評論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮偷仿,結果婚禮上哩簿,老公的妹妹穿的比我還像新娘。我一直安慰自己酝静,他們只是感情好节榜,可當我...
    茶點故事閱讀 67,699評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著别智,像睡著了一般宗苍。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上薄榛,一...
    開封第一講書人閱讀 51,562評論 1 305
  • 那天讳窟,我揣著相機與錄音,去河邊找鬼敞恋。 笑死丽啡,一個胖子當著我的面吹牛,可吹牛的內容都是我干的硬猫。 我是一名探鬼主播补箍,決...
    沈念sama閱讀 40,309評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼改执,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了坑雅?” 一聲冷哼從身側響起辈挂,我...
    開封第一講書人閱讀 39,223評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎裹粤,沒想到半個月后终蒂,有當地人在樹林里發(fā)現了一具尸體,經...
    沈念sama閱讀 45,668評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡遥诉,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 37,859評論 3 336
  • 正文 我和宋清朗相戀三年后豫,在試婚紗的時候發(fā)現自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片突那。...
    茶點故事閱讀 39,981評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖构眯,靈堂內的尸體忽然破棺而出愕难,到底是詐尸還是另有隱情,我是刑警寧澤惫霸,帶...
    沈念sama閱讀 35,705評論 5 347
  • 正文 年R本政府宣布猫缭,位于F島的核電站,受9級特大地震影響壹店,放射性物質發(fā)生泄漏猜丹。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,310評論 3 330
  • 文/蒙蒙 一硅卢、第九天 我趴在偏房一處隱蔽的房頂上張望射窒。 院中可真熱鬧,春花似錦将塑、人聲如沸脉顿。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,904評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽艾疟。三九已至,卻和暖如春敢辩,著一層夾襖步出監(jiān)牢的瞬間蔽莱,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,023評論 1 270
  • 我被黑心中介騙來泰國打工戚长, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留盗冷,地道東北人。 一個月前我還...
    沈念sama閱讀 48,146評論 3 370
  • 正文 我出身青樓历葛,卻偏偏與公主長得像正塌,于是被迫代替她去往敵國和親嘀略。 傳聞我的和親對象是個殘疾皇子沦辙,可洞房花燭夜當晚...
    茶點故事閱讀 44,933評論 2 355

推薦閱讀更多精彩內容