Rxjava2 Observable的數(shù)據(jù)變換詳解及實(shí)例(二)

接續(xù)上篇: Rxjava2 Observable的數(shù)據(jù)變換詳解及實(shí)例(一)

1. Window

定期將來自原始Observable的數(shù)據(jù)分解為一個(gè)Observable窗口,發(fā)射這些窗口,而不是每次發(fā)射一項(xiàng)數(shù)據(jù)畴蹭。

img-window

WindowBuffer 類似卫枝,但不是發(fā)射來自原始Observable的數(shù)據(jù)包捕虽,它發(fā)射的是 Observables簿盅,這些Observables中的每一個(gè)都發(fā)射原始Observable數(shù)據(jù)的一個(gè)子集诅病,最后發(fā) 射一個(gè) onCompleted 通知。

Buffer一樣卓鹿,Window 有很多變體菱魔,每一種都以自己的方式將原始Observable分解為多個(gè)作為結(jié)果的Observable,每一個(gè)都包含一個(gè)映射原始數(shù)據(jù)的 window 吟孙。用 Window操作符的術(shù)語(yǔ)描述就是澜倦,當(dāng)一個(gè)窗口打開(when a window "opens")意味著一個(gè)新的Observable已經(jīng)發(fā)射 (產(chǎn)生)了,而且這個(gè)Observable開始發(fā)射來自原始Observable的數(shù)據(jù)杰妓;當(dāng)一個(gè)窗口關(guān)閉 (when a window "closes")意味著發(fā)射(產(chǎn)生)的Observable停止發(fā)射原始Observable的數(shù)據(jù)藻治, 并且發(fā)射終止通知 onCompleted 給它的觀察者們。

在RxJava中有許多種Window操作符的方法稚失。

1.1 window(closingSelector)

window 的這個(gè)方法會(huì)立即打開它的第一個(gè)窗口栋艳。每當(dāng)它觀察到closingSelector返回的 Observable發(fā)射了一個(gè)對(duì)象時(shí),它就關(guān)閉當(dāng)前打開的窗口并立即打開一個(gè)新窗口句各。用這個(gè)方法吸占,這種 window 變體發(fā)射一系列不重疊的窗口,這些窗口的數(shù)據(jù)集合與原始Observable發(fā)射的數(shù)據(jù)是一一對(duì)應(yīng)的凿宾。

img-window(closingSelector)

解析: 一開始開啟一個(gè) window 接收原始數(shù)據(jù)矾屯,每當(dāng)它觀察到closingSelector返回的 Observable發(fā)射了一個(gè)對(duì)象時(shí),它就關(guān)閉當(dāng)前打開的窗口并取消此時(shí)訂閱closingSelector 的Observable ( 此時(shí)可能是沒有數(shù)據(jù) window )并立即打開一個(gè)新窗口初厚,注意: 每個(gè)窗口開啟前都會(huì)去訂閱一個(gè)closingSelector返回的 Observable件蚕。

實(shí)例代碼:

    // 1. window(Callable boundary)
    // 開啟一個(gè)window,并訂閱觀察boundary返回的Observable發(fā)射了一個(gè)數(shù)據(jù)产禾,
    // 則關(guān)閉此window排作,將收集的數(shù)據(jù)以O(shè)bservable發(fā)送, 重新訂閱boundary返回的Observable,開啟新window
    Observable.intervalRange(1, 10, 0, 1, TimeUnit.SECONDS)
        .window(new Callable<Observable<Long>>() {

            @Override
            public Observable<Long> call() throws Exception {
                System.out.println("--> call(1)");
                return Observable.timer(2, TimeUnit.SECONDS); // 兩秒后關(guān)閉當(dāng)前窗口
            }
        }).subscribe(new Consumer<Observable<Long>>() {

            @Override
            public void accept(Observable<Long> t) throws Exception {
                // 接受每個(gè)window接受的數(shù)據(jù)的Observable
                t.subscribe(new Consumer<Long>() {

                    @Override
                    public void accept(Long t) throws Exception {
                        System.out.println("--> accept(1): " + t);
                    }
                });
            }
        });

輸出:

--> call(1)
--> accept(1): 1
--> accept(1): 2
--> accept(1): 3
--> call(1)
--> accept(1): 4
--> accept(1): 5
--> call(1)
--> accept(1): 6
--> accept(1): 7
--> call(1)
--> accept(1): 8
--> accept(1): 9
--> call(1)
--> accept(1): 10

Javadoc: window(closingSelector)
Javadoc: window(closingSelector, bufferSize)

1.2 window(openingIndicator, closingIndicator)

當(dāng) openingIndicator 發(fā)射一個(gè)數(shù)據(jù)亚情,就會(huì)打開一個(gè) window, 同時(shí)訂閱 closingIndicator 返回的Observable妄痪,當(dāng)這個(gè)Observable發(fā)射一個(gè)數(shù)據(jù),就結(jié)束此 window 和 ,發(fā)送收集數(shù)據(jù)的 Observable楞件。

img-window(openingIndicator, closingIndicator)

無論何時(shí)衫生,只要 window 觀察到 windowOpenings 這個(gè)Observable發(fā)射了一個(gè) Opening 對(duì)象,它就打開一個(gè)窗口土浸,并且同時(shí)調(diào)用 closingSelector 生成一個(gè)與那個(gè)窗口關(guān)聯(lián)的關(guān)閉 (closing)Observable 罪针。當(dāng)這個(gè)關(guān)閉 (closing)Observable 發(fā)射了一個(gè)對(duì)象時(shí),window 操作符就會(huì)關(guān)閉那個(gè)窗口以及關(guān)聯(lián)的closingSelector的 Observable黄伊。

注意: 對(duì)這個(gè)方法來說泪酱,由于當(dāng)前窗口的關(guān)閉和新窗口的打開是由單獨(dú)的 Observable 管理的,它創(chuàng)建的窗口可能會(huì)存在重疊(重復(fù)某些來自原始Observable的數(shù)據(jù)) 或間隙(丟棄某些來自原始Observable的數(shù)據(jù))

實(shí)例代碼:

    // 2. window(ObservableSource openingIndicator, Function<T, ObservableSource<R>> closingIndicator)
    // 當(dāng)openingIndicator發(fā)射一個(gè)數(shù)據(jù)西篓,就會(huì)打開一個(gè)window, 同時(shí)訂閱closingIndicator返回的Observable愈腾,
    // 當(dāng)這個(gè)Observable發(fā)射一個(gè)數(shù)據(jù),就結(jié)束此window以及對(duì)應(yīng)的closingIndicator,發(fā)送收集數(shù)據(jù)的 Observable岂津。
    Observable<Long> openingIndicator = Observable.intervalRange(1, 5, 0, 1, TimeUnit.SECONDS)
            .doOnSubscribe(new Consumer<Disposable>() {

                @Override
                public void accept(Disposable t) throws Exception {
                    System.out.println("--> openingIndicator is subscribe!");
                }
            }).doOnComplete(new Action() {
                
                @Override
                public void run() throws Exception {
                    System.out.println("--> openingIndicator is completed!");
                }
            }).doOnNext(new Consumer<Long>() {

                @Override
                public void accept(Long t) throws Exception {
                    System.out.println("--> openingIndicator emitter: " + t);
                }
            });
    
    Observable<Long> dataSource = Observable.intervalRange(1, 5, 0, 1, TimeUnit.SECONDS)
            .doOnSubscribe(new Consumer<Disposable>() {

                @Override
                public void accept(Disposable t) throws Exception {
                    System.out.println("--> DataSource is subscribe!");
                }
            }).doOnNext(new Consumer<Long>() {

                @Override
                public void accept(Long t) throws Exception {
                    System.out.println("--> DataSource emitter: " + t);
                }
            });
    
    dataSource.window(openingIndicator, new Function<Long, Observable<Long>>() {

                @Override
                public Observable<Long> apply(Long t) throws Exception {
                    System.out.println("--> apply(2): " + t);
                    return Observable.timer(2, TimeUnit.SECONDS).doOnSubscribe(new Consumer<Disposable>() {

                        @Override
                        public void accept(Disposable t) throws Exception {
                            System.out.println("--> closingIndicator is subscribe!");
                        }
                    });
                }
            }).subscribe(new Consumer<Observable<Long>>() {

                @Override
                public void accept(Observable<Long> t) throws Exception {
                    System.out.println("-------------------> new window data");
                    t.subscribe(new Consumer<Long>() {

                        @Override
                        public void accept(Long t) throws Exception {
                            System.out.println("--> accept(2): " + t);
                        }
                    });
                }
            });

輸出:

--> DataSource is subscribe!
--> openingIndicator is subscribe!
--> openingIndicator emitter: 1
--> DataSource emitter: 1
-------------------> new window data
--> apply(2): 1
--> closingIndicator is subscribe!
--> openingIndicator emitter: 2
--> DataSource emitter: 2
-------------------> new window data
--> apply(2): 2
--> closingIndicator is subscribe!
--> accept(2): 2
--> accept(2): 2
--> openingIndicator emitter: 3
--> DataSource emitter: 3
-------------------> new window data
--> apply(2): 3
--> closingIndicator is subscribe!
--> accept(2): 3
--> accept(2): 3
--> accept(2): 3
--> DataSource emitter: 4
--> openingIndicator emitter: 4
--> accept(2): 4
--> accept(2): 4
-------------------> new window data
--> apply(2): 4
--> closingIndicator is subscribe!
--> DataSource emitter: 5
--> accept(2): 5
--> accept(2): 5
--> openingIndicator emitter: 5

Javadoc: window(openingIndicator, closingIndicator)
Javadoc: window(openingIndicator, closingIndicator虱黄,bufferSize)

1.3 window(count)

這個(gè) window 的方法立即打開它的第一個(gè)窗口。每當(dāng)當(dāng)前窗口發(fā)射了 count 項(xiàng)數(shù)據(jù)吮成,它就關(guān)閉當(dāng)前窗口并打開一個(gè)新窗口橱乱。如果從原始Observable收到了 onErroronCompleted 通知它也會(huì)關(guān)閉當(dāng)前窗口。

這種 window 方法發(fā)射一系列不重疊的窗口粱甫,這些窗口的數(shù)據(jù)集合與原始 Observable發(fā)射的數(shù)據(jù)是 一一對(duì)應(yīng) 的泳叠。

img-window(count)

實(shí)例代碼:

    // 3. window(count)
    // 以count為緩存大小收集的不重疊的Observables對(duì)象,接受的數(shù)據(jù)與原數(shù)據(jù)彼此對(duì)應(yīng)
    Observable.range(1, 20)
        .window(5)  // 設(shè)置緩存大小為5
        .subscribe(new Consumer<Observable<Integer>>() {

            @Override
            public void accept(Observable<Integer> t) throws Exception {
                System.out.println("--------------> new data window");
                t.subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer t) throws Exception {
                        System.out.println("--> accept window(3): " + t);
                    }
                });
            }
        });

輸出:

--------------> new data window
--> accept window(3): 1
--> accept window(3): 2
--> accept window(3): 3
--> accept window(3): 4
--> accept window(3): 5
--------------> new data window
--> accept window(3): 6
--> accept window(3): 7
--> accept window(3): 8
--> accept window(3): 9
--> accept window(3): 10
--------------> new data window
--> accept window(3): 11
--> accept window(3): 12
--> accept window(3): 13
--> accept window(3): 14
--> accept window(3): 15
--------------> new data window
--> accept window(3): 16
--> accept window(3): 17
--> accept window(3): 18
--> accept window(3): 19
--> accept window(3): 20

Javadoc: window(count)

1.4 window(count, skip)

這個(gè) window 的方法立即打開它的第一個(gè)窗口茶宵。原始Observable每發(fā)射 skip 項(xiàng)數(shù)據(jù)它就打開 一個(gè)新窗口(例如危纫,如果 skip 等于3,每到第三項(xiàng)數(shù)據(jù)乌庶,它會(huì)創(chuàng)建一個(gè)新窗口)种蝶。每當(dāng)當(dāng)前窗口發(fā)射了 count 項(xiàng)數(shù)據(jù),它就關(guān)閉當(dāng)前窗口并打開一個(gè)新窗口瞒大。如果從原始Observable 收到了onErroronCompleted 通知它也會(huì)關(guān)閉當(dāng)前窗口螃征。

img-window(count, skip)

解析: window 一開始打開一個(gè) window,每發(fā)射 skip 項(xiàng)數(shù)據(jù)就會(huì)打開一個(gè) window 獨(dú)立收集 原始數(shù)據(jù)透敌,當(dāng) window 收集了 count 個(gè)數(shù)據(jù)就會(huì)關(guān)閉盯滚,開啟另外一個(gè)。當(dāng)原始Observable發(fā)送了onError或者onCompleted通知也會(huì)關(guān)閉當(dāng)前窗口酗电。

  • skip = count: 會(huì)依次順序接受原始數(shù)據(jù)魄藕,同window(count)
  • skip > count: 兩個(gè)窗口可能會(huì)有 skip-count 項(xiàng)數(shù)據(jù)丟失
  • skip < count: 兩個(gè)窗口可能會(huì)有 count-skip 項(xiàng)數(shù)據(jù)重疊

實(shí)例代碼:

    // 4. window(count,skip)
    // window一開始打開一個(gè)window,每發(fā)射skip項(xiàng)數(shù)據(jù)就會(huì)打開一個(gè)window獨(dú)立收集原始數(shù)據(jù)
    // 當(dāng)window收集了count個(gè)數(shù)據(jù)就會(huì)關(guān)閉window撵术,開啟另外一個(gè)背率。
    // 當(dāng)原始Observable發(fā)送了onError 或者 onCompleted 通知也會(huì)關(guān)閉當(dāng)前窗口。
    // 4.1 skip = count: 會(huì)依次順序接受原始數(shù)據(jù)荷荤,同window(count)
    Observable.range(1, 10)
        .window(2, 2)   // skip = count, 數(shù)據(jù)會(huì)依次順序輸出
        .subscribe(new Consumer<Observable<Integer>>() {

            @Override
            public void accept(Observable<Integer> t) throws Exception {
                
                t.observeOn(Schedulers.newThread())
                    .subscribe(new Consumer<Integer>() {
    
                        @Override
                        public void accept(Integer t) throws Exception {
                            System.out.println("--> accept window(4-1): " + t +" , ThreadID: "+ Thread.currentThread().getId());
                        }
                    });
            }
        });
    
    // 4.2 skip > count: 兩個(gè)窗口可能會(huì)有 skip-count 項(xiàng)數(shù)據(jù)丟失
    Observable.range(1, 10)
        .window(2, 3)   // skip > count, 數(shù)據(jù)會(huì)存在丟失
        .subscribe(new Consumer<Observable<Integer>>() {

            @Override
            public void accept(Observable<Integer> t) throws Exception {
                
                t.observeOn(Schedulers.newThread())
                    .subscribe(new Consumer<Integer>() {
    
                        @Override
                        public void accept(Integer t) throws Exception {
                            System.out.println("--> accept window(4-2): " + t +" , ThreadID: "+ Thread.currentThread().getId());
                        }
                    });
            }
        });
    
    // 4.3 skip < count: 兩個(gè)窗口可能會(huì)有 count-skip 項(xiàng)數(shù)據(jù)重疊
    Observable.range(1, 10)
        .window(3, 2)   // skip < count, 數(shù)據(jù)會(huì)重疊
        .subscribe(new Consumer<Observable<Integer>>() {

            @Override
            public void accept(Observable<Integer> t) throws Exception {
                
                t.observeOn(Schedulers.newThread())
                    .subscribe(new Consumer<Integer>() {
    
                        @Override
                        public void accept(Integer t) throws Exception {
                            System.out.println("--> accept window(4-3): " + t +" , ThreadID: "+ Thread.currentThread().getId());
                        }
                    });
            }
        });

輸出:

--> accept window(4-1): 1 , ThreadID: 11
--> accept window(4-1): 2 , ThreadID: 11
--> accept window(4-1): 4 , ThreadID: 12
--> accept window(4-1): 3 , ThreadID: 11
--> accept window(4-1): 5 , ThreadID: 12
--> accept window(4-1): 6 , ThreadID: 12
--> accept window(4-1): 7 , ThreadID: 13
--> accept window(4-1): 8 , ThreadID: 13
--> accept window(4-1): 9 , ThreadID: 13
--> accept window(4-1): 10 , ThreadID: 14
--> accept window(4-2): 1 , ThreadID: 15
--> accept window(4-2): 2 , ThreadID: 15
--> accept window(4-2): 4 , ThreadID: 16
--> accept window(4-2): 5 , ThreadID: 16
--> accept window(4-2): 7 , ThreadID: 17
--> accept window(4-2): 8 , ThreadID: 17
--> accept window(4-2): 10 , ThreadID: 18
--> accept window(4-3): 1 , ThreadID: 19
--> accept window(4-3): 2 , ThreadID: 19
--> accept window(4-3): 3 , ThreadID: 19
--> accept window(4-3): 3 , ThreadID: 20
--> accept window(4-3): 4 , ThreadID: 20
--> accept window(4-3): 5 , ThreadID: 20
--> accept window(4-3): 5 , ThreadID: 21
--> accept window(4-3): 6 , ThreadID: 21
--> accept window(4-3): 7 , ThreadID: 21
--> accept window(4-3): 7 , ThreadID: 22
--> accept window(4-3): 8 , ThreadID: 22
--> accept window(4-3): 9 , ThreadID: 22
--> accept window(4-3): 9 , ThreadID: 23
--> accept window(4-3): 10 , ThreadID: 23

Javadoc: window(count, skip)

1.5 window(timespan, TimeUnit)

這個(gè) window 的方法立即打開它的第一個(gè)窗口收集數(shù)據(jù)退渗。每當(dāng)過了 timespan 這么長(zhǎng)的時(shí)間段它就關(guān)閉當(dāng)前窗口并打開一個(gè)新窗口(時(shí)間單位是 unit 移稳,可選在調(diào)度器 scheduler 上執(zhí)行)收集數(shù)據(jù)蕴纳。如果從原始 Observable 收到了 onError 或 onCompleted 通知它也會(huì)關(guān)閉當(dāng)前窗口。

這種 window 方法發(fā)射一系列不重疊的窗口个粱,這些窗口的數(shù)據(jù)集合與原始Observable發(fā)射的數(shù)據(jù)也是 一一對(duì)應(yīng) 的古毛。

實(shí)例代碼:

    // 5. window(long timespan, TimeUnit unit)
    // 每當(dāng)過了 timespan 的時(shí)間段,它就關(guān)閉當(dāng)前窗口并打開另一個(gè)新window收集數(shù)據(jù)
    Observable.intervalRange(1, 10, 0, 1, TimeUnit.SECONDS)
        .window(2, TimeUnit.SECONDS)                            // 間隔2秒關(guān)閉當(dāng)前 window 并打開一個(gè)新 window 收集數(shù)據(jù)
    //  .window(2, TimeUnit.SECONDS, Schedulers.newThread())    // 指定在 newThread 線程中
        .subscribe(new Consumer<Observable<Long>>() {

            @Override
            public void accept(Observable<Long> t) throws Exception {
                t.observeOn(Schedulers.newThread())
                    .subscribe(new Consumer<Long>() {
    
                            @Override
                            public void accept(Long t) throws Exception {
                                System.out.println("--> accept window(5): " + t +" , ThreadID: "+ Thread.currentThread().getId() );
                            }
                        });
            }
        });

輸出:

--> accept window(5): 1 , ThreadID: 11
--> accept window(5): 2 , ThreadID: 11
--> accept window(5): 3 , ThreadID: 11
--> accept window(5): 4 , ThreadID: 14
--> accept window(5): 5 , ThreadID: 14
--> accept window(5): 6 , ThreadID: 15
--> accept window(5): 7 , ThreadID: 16
--> accept window(5): 8 , ThreadID: 16
--> accept window(5): 9 , ThreadID: 17
--> accept window(5): 10 , ThreadID: 17

Javadoc: window(timespan, TimeUnit)
Javadoc: window(timespan, TimeUnit, scheduler)

1.6 window(timespan, TimeUnit, count)

這個(gè) window 的方法立即打開它的第一個(gè)窗口。這個(gè)變體是 window(count) 和 window(timespan, unit[, scheduler]) 的結(jié)合稻薇,每當(dāng)過了 timespan 的時(shí)長(zhǎng)或者當(dāng)前窗口收到了 count 項(xiàng)數(shù)據(jù)嫂冻,它就關(guān)閉當(dāng)前窗口并打開另一個(gè)。如果從原始 Observable收到了 onErroronCompleted 通知它也會(huì)關(guān)閉當(dāng)前窗口塞椎。

這種window方法發(fā)射 一系列不重疊的窗口桨仿,這些窗口的數(shù)據(jù)集合與原始Observable發(fā)射的數(shù)據(jù)也是 一一對(duì)應(yīng) 的。

img-window(timespan, TimeUnit, count)

實(shí)例代碼:

    // 6. window(long timespan, TimeUnit unit, long count)
    // 每當(dāng)過了timespan的時(shí)間段或者當(dāng)前窗口收到了count項(xiàng)數(shù)據(jù)案狠,它就關(guān)閉當(dāng)前window并打開另一個(gè)window收集數(shù)據(jù)
    Observable.intervalRange(1, 12, 0, 500, TimeUnit.MILLISECONDS)
        .window(2, TimeUnit.SECONDS, 5) // 每隔2秒關(guān)閉當(dāng)前收集數(shù)據(jù)的window并開啟一個(gè)window收集5項(xiàng)數(shù)據(jù)
    //  .window(2, TimeUnit.SECONDS,Schedulers.newThread(), 5 ) // 指定在 newThread 線程中
        .subscribe(new Consumer<Observable<Long>>() {

            @Override
            public void accept(Observable<Long> t) throws Exception {
                t.observeOn(Schedulers.newThread())
                    .subscribe(new Consumer<Long>() {

                            @Override
                            public void accept(Long t) throws Exception {
                                System.out.println("--> accept window(6): " + t + " , ThreadID: "+ Thread.currentThread().getId() );
                            }
                        });
            }
        });

輸出:

--> accept window(6): 1 , ThreadID: 11
--> accept window(6): 2 , ThreadID: 11
--> accept window(6): 3 , ThreadID: 11
--> accept window(6): 4 , ThreadID: 11
--> accept window(6): 5 , ThreadID: 11
--> accept window(6): 6 , ThreadID: 14
--> accept window(6): 7 , ThreadID: 14
--> accept window(6): 8 , ThreadID: 14
--> accept window(6): 9 , ThreadID: 14
--> accept window(6): 10 , ThreadID: 14
--> accept window(6): 11 , ThreadID: 15
--> accept window(6): 12 , ThreadID: 15

Javadoc: window(timespan, TimeUnit, count)
Javadoc: window(timespan, TimeUnit, scheduler, count)

1.7 window(timespan, timeskip, TimeUnit)

這個(gè) window 的方法立即打開它的第一個(gè)窗口服傍。隨后每當(dāng)過了 timeskip 的時(shí)長(zhǎng)就打開一個(gè)新窗口(時(shí)間單位是 unit ,可選在調(diào)度器 scheduler 上執(zhí)行)骂铁,當(dāng)窗口打開的時(shí)長(zhǎng)達(dá) 到 timespan 吹零,它就關(guān)閉當(dāng)前打開的窗口。如果從原始Observable收到 了 onError 或 onCompleted 通知它也會(huì)關(guān)閉當(dāng)前窗口拉庵。窗口的數(shù)據(jù)可能重疊也可能有間隙灿椅,取決于你設(shè)置的 timeskiptimespan 的值。

img-window(timespan,timeskip, TimeUnit)

解析: 在每一個(gè) timeskip 時(shí)期內(nèi)都創(chuàng)建一個(gè)新的 window钞支,然后獨(dú)立收集 timespan 時(shí)間段的原始Observable發(fā)射的每一項(xiàng)數(shù)據(jù)茫蛹。注意:因?yàn)槊總€(gè) window 都是獨(dú)立接收數(shù)據(jù),當(dāng)接收數(shù)據(jù)的時(shí)間與創(chuàng)建新 window 的時(shí)間不一致時(shí)會(huì)有數(shù)據(jù)項(xiàng)重復(fù)伸辟,丟失等情況麻惶。

  • skip = timespan: 會(huì)依次順序接受原始數(shù)據(jù),同window(count)
  • skip > timespan: 兩個(gè)窗口可能會(huì)有 skip-timespan 項(xiàng)數(shù)據(jù)丟失
  • skip < timespan: 兩個(gè)窗口可能會(huì)有 timespan-skip 項(xiàng)數(shù)據(jù)重疊

實(shí)例代碼:

        // 7. window(long timespan, long timeskip, TimeUnit unit)
        // 在每一個(gè)timeskip時(shí)期內(nèi)都創(chuàng)建一個(gè)新的window,然后獨(dú)立收集timespan時(shí)間段的原始Observable發(fā)射的每一項(xiàng)數(shù)據(jù)信夫,
        // 如果timespan長(zhǎng)于timeskip窃蹋,它發(fā)射的數(shù)據(jù)包將會(huì)重疊,因此可能包含重復(fù)的數(shù)據(jù)項(xiàng)静稻。
        // 7.1 skip = timespan: 會(huì)依次順序接受原始數(shù)據(jù)警没,同window(count)
        Observable.intervalRange(1, 5, 0, 1000, TimeUnit.MILLISECONDS)
            .window(1, 1, TimeUnit.SECONDS)                             // 設(shè)置每秒創(chuàng)建一個(gè)window,收集2秒的數(shù)據(jù)
        //  .window(2, 1, TimeUnit.SECONDS, Schedulers.newThread())     // 指定在 newThread 線程中
            .subscribe(new Consumer<Observable<Long>>() {

                @Override
                public void accept(Observable<Long> t) throws Exception {
                    t.observeOn(Schedulers.newThread())
                        .subscribe(new Consumer<Long>() {
    
                            @Override
                            public void accept(Long t) throws Exception {
                                System.out.println("--> accept window(7-1): " + t + " , ThreadID: "+ Thread.currentThread().getId());
                            }
                        });
                }
            });
        
        // 7.2 skip > timespan: 兩個(gè)窗口可能會(huì)有 skip-timespan 項(xiàng)數(shù)據(jù)丟失
        Observable.intervalRange(1, 5, 0, 1000, TimeUnit.MILLISECONDS)
            .window(1, 2, TimeUnit.SECONDS)                             // 設(shè)置每秒創(chuàng)建一個(gè)window振湾,收集2秒的數(shù)據(jù)
        //  .window(2, 1, TimeUnit.SECONDS, Schedulers.newThread())     // 指定在 newThread 線程中
            .subscribe(new Consumer<Observable<Long>>() {

                @Override
                public void accept(Observable<Long> t) throws Exception {
                    t.observeOn(Schedulers.newThread())
                        .subscribe(new Consumer<Long>() {
    
                            @Override
                            public void accept(Long t) throws Exception {
                                System.out.println("--> accept window(7-2): " + t + " , ThreadID: "+ Thread.currentThread().getId());
                            }
                        });
                }
            });
        
        // 7.3 skip < timespan: 兩個(gè)窗口可能會(huì)有 timespan-skip 項(xiàng)數(shù)據(jù)重疊
        Observable.intervalRange(1, 5, 0, 1000, TimeUnit.MILLISECONDS)
            .window(2, 1, TimeUnit.SECONDS)                             // 設(shè)置每秒創(chuàng)建一個(gè)window杀迹,收集2秒的數(shù)據(jù)
        //  .window(2, 1, TimeUnit.SECONDS, Schedulers.newThread())     // 指定在 newThread 線程中
            .subscribe(new Consumer<Observable<Long>>() {

                @Override
                public void accept(Observable<Long> t) throws Exception {
                    t.observeOn(Schedulers.newThread())
                        .subscribe(new Consumer<Long>() {
    
                            @Override
                            public void accept(Long t) throws Exception {
                                System.out.println("--> accept window(7-3): " + t + " , ThreadID: "+ Thread.currentThread().getId());
                            }
                        });
                }
            });

輸出:

--> accept window(7-1): 1 , ThreadID: 11
--> accept window(7-1): 2 , ThreadID: 11
--> accept window(7-1): 3 , ThreadID: 14
--> accept window(7-1): 4 , ThreadID: 15
--> accept window(7-1): 5 , ThreadID: 17
----------------------------------------------------------------------
--> accept window(7-2): 1 , ThreadID: 11
--> accept window(7-2): 3 , ThreadID: 14
--> accept window(7-2): 5 , ThreadID: 15
----------------------------------------------------------------------
--> accept window(7-3): 1 , ThreadID: 11
--> accept window(7-3): 2 , ThreadID: 11
--> accept window(7-3): 2 , ThreadID: 14
--> accept window(7-3): 3 , ThreadID: 14
--> accept window(7-3): 3 , ThreadID: 15
--> accept window(7-3): 4 , ThreadID: 15
--> accept window(7-3): 4 , ThreadID: 16
--> accept window(7-3): 5 , ThreadID: 16
--> accept window(7-3): 5 , ThreadID: 17

Javadoc: window(timespan, timeskip, TimeUnit)
Javadoc: window(timespan, timeskip, TimeUnit, scheduler)

2. GroupBy

將一個(gè) Observable 分拆為一些 Observables 集合,它們中的每一個(gè)發(fā)射原始 Observable 的一個(gè)子序列押搪。

RxJava實(shí)現(xiàn)了 groupBy 操作符树酪。它返回Observable的一個(gè)特殊子類 GroupedObservable ,實(shí)現(xiàn)了GroupedObservable 接口的對(duì)象有一個(gè)額外的方法 getKey 大州,這個(gè) Key 用于將數(shù)據(jù)分組到指定的Observable续语。有一個(gè)版本的 groupBy 允許你傳遞一個(gè)變換函數(shù),這樣它可以在發(fā)射結(jié)果 GroupedObservable 之前改變數(shù)據(jù)項(xiàng)厦画。

如果你取消訂閱一個(gè) GroupedObservable 疮茄,那個(gè) Observable 將會(huì)終止滥朱。如果之后原始的 Observable又發(fā)射了一個(gè)與這個(gè)Observable的Key匹配的數(shù)據(jù), groupBy 將會(huì)為這個(gè) Key 創(chuàng)建一個(gè)新的 GroupedObservable力试。

img-GroupBy

注意: groupBy 將原始 Observable 分解為一個(gè)發(fā)射多個(gè) GroupedObservable 的Observable徙邻,一旦有訂閱,每個(gè) GroupedObservable 就開始緩存數(shù)據(jù)畸裳。因此缰犁,如果你忽略這 些 GroupedObservable 中的任何一個(gè),這個(gè)緩存可能形成一個(gè)潛在的內(nèi)存泄露怖糊。因此民鼓,如果你不想觀察,也不要忽略 GroupedObservable 蓬抄。你應(yīng)該使用像 take(0) 這樣會(huì)丟棄自己的緩存的操作符丰嘉。

2.1 groupBy(keySelector)

GroupBy 操作符將原始 Observable 分拆為一些 Observables 集合,它們中的每一個(gè)發(fā)射原始 Observable 數(shù)據(jù)序列的一個(gè)子序列嚷缭。哪個(gè)數(shù)據(jù)項(xiàng)由哪一個(gè) Observable 發(fā)射是由一個(gè)函數(shù)判定的饮亏,這個(gè)函數(shù)給每一項(xiàng)指定一個(gè)KeyKey相同的數(shù)據(jù)會(huì)被同一個(gè) Observable 發(fā)射阅爽。還有一個(gè) delayError 參數(shù)的方法路幸,指定是否延遲 Error 通知的Observable。

實(shí)例代碼:

    // 1. groupBy(keySelector)
    // 將原始數(shù)據(jù)處理后加上分組tag付翁,通過GroupedObservable發(fā)射分組數(shù)據(jù)
    Observable.range(1, 10)
        .groupBy(new Function<Integer, String>() {

            @Override
            public String apply(Integer t) throws Exception {
                // 不同的key將會(huì)產(chǎn)生不同分組的Observable
                return t % 2 == 0 ? "Even" : "Odd"; // 將數(shù)據(jù)奇偶數(shù)進(jìn)行分組,
            }
        }).observeOn(Schedulers.newThread())
            .subscribe(new Consumer<GroupedObservable<String, Integer>>() {

                @Override
                public void accept(GroupedObservable<String, Integer> grouped) throws Exception {
                    // 得到每個(gè)分組數(shù)據(jù)的的Observable
                    grouped.subscribe(new Consumer<Integer>() {

                        @Override
                        public void accept(Integer t) throws Exception {
                            // 得到數(shù)據(jù)
                            System.out.println("--> accept groupBy(1):   groupKey: " + grouped.getKey() + ", value: " + t);
                        }
                    });
                }
            });

輸出:

--> accept groupBy(1):   groupKey: Odd, value: 1
--> accept groupBy(1):   groupKey: Odd, value: 3
--> accept groupBy(1):   groupKey: Odd, value: 5
--> accept groupBy(1):   groupKey: Odd, value: 7
--> accept groupBy(1):   groupKey: Odd, value: 9
--> accept groupBy(1):   groupKey: Even, value: 2
--> accept groupBy(1):   groupKey: Even, value: 4
--> accept groupBy(1):   groupKey: Even, value: 6
--> accept groupBy(1):   groupKey: Even, value: 8
--> accept groupBy(1):   groupKey: Even, value: 10

Javadoc: groupBy(keySelector)
Javadoc: groupBy(keySelector, delayError)

2.2 groupBy(keySelector, valueSelector)

GroupBy 操作符通過 keySelector 將原始 Observable 按照 Key 分組简肴,產(chǎn)生不同的 Observable,再通過 valueSelector 對(duì)原始的數(shù)據(jù)進(jìn)行處理百侧,在發(fā)送每一個(gè)被處理完成的數(shù)據(jù)砰识。

實(shí)例代碼:

    // 2. groupBy(Function(T,R),F(xiàn)unction(T,R))
    // 第一個(gè)func對(duì)原數(shù)據(jù)進(jìn)行分組處理(僅僅分組添加key佣渴,不處理原始數(shù)據(jù))辫狼,第二個(gè)func對(duì)原始數(shù)據(jù)進(jìn)行處理
    Observable.range(1, 10)
        .groupBy(new Function<Integer, String>() {

            @Override
            public String apply(Integer t) throws Exception {
                // 對(duì)原始數(shù)據(jù)進(jìn)行分組處理
                return t % 2 == 0 ? "even" : "odd";
            }
        },new Function<Integer, String>() {

            @Override
            public String apply(Integer t) throws Exception {
                // 對(duì)原始數(shù)據(jù)進(jìn)行數(shù)據(jù)轉(zhuǎn)換處理
                return t + " is " + (t % 2 == 0 ? "even" : "odd");
            }
            }).observeOn(Schedulers.newThread()).subscribe(new Consumer<GroupedObservable<String, String>>() {

                @Override
                public void accept(GroupedObservable<String, String> grouped) throws Exception {
                    grouped.subscribe(new Consumer<String>() {

                        @Override
                        public void accept(String t) throws Exception {
                            // 接受最終的分組處理以及原數(shù)據(jù)處理后的數(shù)據(jù)
                            System.out.println("--> accept groupBy(2):   groupKey = " + grouped.getKey()
                                    + ", value = " + t);
                        }
                    });
                }
            });

輸出:

--> accept groupBy(2):   groupKey = odd, value = 1 is odd
--> accept groupBy(2):   groupKey = odd, value = 3 is odd
--> accept groupBy(2):   groupKey = odd, value = 5 is odd
--> accept groupBy(2):   groupKey = odd, value = 7 is odd
--> accept groupBy(2):   groupKey = odd, value = 9 is odd
--> accept groupBy(2):   groupKey = even, value = 2 is even
--> accept groupBy(2):   groupKey = even, value = 4 is even
--> accept groupBy(2):   groupKey = even, value = 6 is even
--> accept groupBy(2):   groupKey = even, value = 8 is even
--> accept groupBy(2):   groupKey = even, value = 10 is even

Javadoc: groupBy(keySelector, valueSelector)
Javadoc: groupBy(keySelector, valueSelector, delayError)
Javadoc: groupBy(keySelector, valueSelector, delayError, bufferSize)

3. Scan

連續(xù)地對(duì)數(shù)據(jù)序列的每一項(xiàng)應(yīng)用一個(gè)函數(shù),然后連續(xù)發(fā)射結(jié)果辛润。

3.1 scan(accumulator)

Scan 操作符對(duì)原始 Observable 發(fā)射的第一項(xiàng)數(shù)據(jù)應(yīng)用一個(gè)函數(shù)膨处,然后將那個(gè)函數(shù)的結(jié)果作為 自己的第一項(xiàng)數(shù)據(jù)發(fā)射。它將函數(shù)的結(jié)果同第二項(xiàng)數(shù)據(jù)一起填充給這個(gè)函數(shù)來產(chǎn)生它自己的第二項(xiàng)數(shù)據(jù)砂竖。它持續(xù)進(jìn)行這個(gè)過程來產(chǎn)生剩余的數(shù)據(jù)序列真椿。這個(gè)操作符在某些情況下被叫做 accumulator

img-scan(accumulator)

解析: 先發(fā)送原始數(shù)據(jù)第一項(xiàng)數(shù)據(jù)乎澄,然后將這個(gè)數(shù)據(jù)與下一個(gè)原始數(shù)據(jù)作為參數(shù)傳遞給 accumulator突硝, 處理后發(fā)送這個(gè)數(shù)據(jù),并與下一個(gè)原始數(shù)據(jù)一起傳遞到下一次 accumulator 三圆,直到數(shù)據(jù)序列結(jié)束狞换。類似一個(gè)累積的過程

實(shí)例代碼:

        // 1. scan(BiFunction(Integer sum, Integer t2)) 
        // 接受數(shù)據(jù)序列舟肉,從第二個(gè)數(shù)據(jù)開始修噪,每次會(huì)將上次處理數(shù)據(jù)和現(xiàn)在接受的數(shù)據(jù)進(jìn)行處理后發(fā)送
        Observable.range(1, 10)
            .scan(new BiFunction<Integer, Integer, Integer>() {
                
                @Override
                public Integer apply(Integer LastItem, Integer item) throws Exception {
                    System.out.println("--> apply: LastItem = " + LastItem + ", CurrentItem = " + item);
                    return LastItem + item; // 實(shí)現(xiàn)求和操作
                }
            }).subscribe(new Consumer<Integer>() {

                @Override
                public void accept(Integer t) throws Exception {
                    System.out.println("--> accept scan(1): " + t);
                    
                }
            });

輸出:

--> accept scan(1): 1
--> apply: LastItem = 1, CurrentItem = 2
--> accept scan(1): 3
--> apply: LastItem = 3, CurrentItem = 3
--> accept scan(1): 6
--> apply: LastItem = 6, CurrentItem = 4
--> accept scan(1): 10
--> apply: LastItem = 10, CurrentItem = 5
--> accept scan(1): 15
--> apply: LastItem = 15, CurrentItem = 6
--> accept scan(1): 21
--> apply: LastItem = 21, CurrentItem = 7
--> accept scan(1): 28
--> apply: LastItem = 28, CurrentItem = 8
--> accept scan(1): 36
--> apply: LastItem = 36, CurrentItem = 9
--> accept scan(1): 45
--> apply: LastItem = 45, CurrentItem = 10
--> accept scan(1): 55

Javadoc: scan(accumulator)

3.2 scan(initialValue, accumulator)

有一個(gè) scan 操作符的方法,你可以傳遞一個(gè)種子值給累加器函數(shù)的第一次調(diào)用(Observable 發(fā)射的第一項(xiàng)數(shù)據(jù))路媚。如果你使用這個(gè)版本黄琼,scan 將發(fā)射種子值作為自己的第一項(xiàng)數(shù)據(jù)。

注意: 傳遞 null 作為種子值與不傳遞是不同的整慎,null 種子值是合法的脏款。

img-scan(initialValue, accumulator)

解析: 指定初始種子值,第一次發(fā)送種子值裤园,后續(xù)發(fā)送原始數(shù)據(jù)序列以及累計(jì)處理數(shù)據(jù)撤师。

實(shí)例代碼:

    // 2. scan(R,Func2)
    // 指定初始種子值,第一次發(fā)送種子值,后續(xù)發(fā)送原始數(shù)據(jù)序列以及累計(jì)處理數(shù)據(jù)
    Observable.range(1, 10)
        .scan(100, new BiFunction<Integer, Integer, Integer>() {    // 指定初始種子數(shù)據(jù)為100

            @Override
            public Integer apply(Integer lastValue, Integer item) throws Exception {
                System.out.println("--> apply: lastValue = " + lastValue + ", item = " + item);
                return lastValue + item;    // 指定初值的求和操作
            }
        }).subscribe(new Consumer<Integer>() {

            @Override
            public void accept(Integer t) throws Exception {
                System.out.println("--> accept sacn(2) = " + t);
            }
        });

輸出:

--> accept sacn(2) = 100
--> apply: lastValue = 100, item = 1
--> accept sacn(2) = 101
--> apply: lastValue = 101, item = 2
--> accept sacn(2) = 103
--> apply: lastValue = 103, item = 3
--> accept sacn(2) = 106
--> apply: lastValue = 106, item = 4
--> accept sacn(2) = 110
--> apply: lastValue = 110, item = 5
--> accept sacn(2) = 115
--> apply: lastValue = 115, item = 6
--> accept sacn(2) = 121
--> apply: lastValue = 121, item = 7
--> accept sacn(2) = 128
--> apply: lastValue = 128, item = 8
--> accept sacn(2) = 136
--> apply: lastValue = 136, item = 9
--> accept sacn(2) = 145
--> apply: lastValue = 145, item = 10
--> accept sacn(2) = 155

注意: 這個(gè)操作符默認(rèn)不在任何特定的調(diào)度器上執(zhí)行拧揽。
Javadoc: scan(initialValue, accumulator)

4. Cast

Cast 將原始Observable發(fā)射的每一項(xiàng)數(shù)據(jù)都強(qiáng)制轉(zhuǎn)換為一個(gè)指定的類型剃盾,然后再發(fā)射數(shù)據(jù),它是 map 的一個(gè)特殊版本淤袜。轉(zhuǎn)換失敗會(huì)有Error通知痒谴。

4.1 cast(clazz)

將原始數(shù)據(jù)強(qiáng)制轉(zhuǎn)換為指定的 clazz 類型,如果轉(zhuǎn)換成功發(fā)送轉(zhuǎn)換后的數(shù)據(jù)铡羡,否則發(fā)送Error通知积蔚。一般用于 數(shù)據(jù)類型的轉(zhuǎn)換數(shù)據(jù)實(shí)際類型的檢查(多態(tài))

img-cast(clazz)

實(shí)例代碼:

    //  cast(clazz) 
    // 1. 基本類型轉(zhuǎn)換
    Observable.range(1, 5)
        .cast(Integer.class)
        .subscribe(new Consumer<Integer>() {

            @Override
            public void accept(Integer t) throws Exception {
                System.out.println("-- accept cast(1): " + t);
            }
        });
            
    // 2. 轉(zhuǎn)換失敗通知
    System.out.println("------------------------------------");
    Observable.just((byte)1)
        .cast(Integer.class)
        .subscribe(new Observer<Integer>() {

            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("--> onSubscribe(2)");
            }

            @Override
            public void onNext(Integer t) {
                System.out.println("--> onNext(2) = " + t);
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("--> onError(2) = " + e.toString());
            }

            @Override
            public void onComplete() {
                System.out.println("--> onComplete(2)");
            }
        });
    
    System.out.println("------------------------------------");
    class Animal{
        public int id;
    }

    class Dog extends Animal{
        public String name;

        @Override
        public String toString() {
            return "Dog [name=" + name + ", id=" + id + "]";
        }
    }
    
    //  3. 多態(tài)轉(zhuǎn)換烦周,檢查數(shù)據(jù)的實(shí)際類型
    Animal animal = new Dog();
    animal.id = 666;
    Observable.just(animal)
        .cast(Dog.class)
        .subscribe(new Consumer<Dog>() {

            @Override
            public void accept(Dog t) throws Exception {
                System.out.println("--> accept cast(3): " + t);
            }
        });

輸出:

-- accept cast(1): 1
-- accept cast(1): 2
-- accept cast(1): 3
-- accept cast(1): 4
-- accept cast(1): 5
------------------------------------
--> onSubscribe(2)
--> onError(2) = java.lang.ClassCastException: Cannot cast java.lang.Byte to java.lang.Integer
------------------------------------
--> accept cast(3): Dog [name=null, id=666]

Javadoc: cast(clazz)

小結(jié):

在實(shí)際開發(fā)場(chǎng)景中尽爆,比如網(wǎng)絡(luò)數(shù)據(jù)請(qǐng)求場(chǎng)景,原始的數(shù)據(jù)格式或類型可能并不滿足開發(fā)的實(shí)際需要读慎,需要對(duì)數(shù)據(jù)進(jìn)行處理教翩。數(shù)據(jù)變換操作在實(shí)際開發(fā)場(chǎng)景中還是非常多的,所以數(shù)據(jù)的變換是非常重要的贪壳。使用Rx的數(shù)據(jù)變換操作可以輕松完成大多數(shù)場(chǎng)景的數(shù)據(jù)變換操作饱亿,提高開發(fā)效率。

Rx介紹與講解及完整目錄參考:Rxjava2 介紹與詳解實(shí)例

實(shí)例代碼:

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末闰靴,一起剝皮案震驚了整個(gè)濱河市彪笼,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌蚂且,老刑警劉巖配猫,帶你破解...
    沈念sama閱讀 218,284評(píng)論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異杏死,居然都是意外死亡泵肄,警方通過查閱死者的電腦和手機(jī)捆交,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,115評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來腐巢,“玉大人品追,你說我怎么就攤上這事》氡” “怎么了肉瓦?”我有些...
    開封第一講書人閱讀 164,614評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)胃惜。 經(jīng)常有香客問我泞莉,道長(zhǎng),這世上最難降的妖魔是什么船殉? 我笑而不...
    開封第一講書人閱讀 58,671評(píng)論 1 293
  • 正文 為了忘掉前任鲫趁,我火速辦了婚禮,結(jié)果婚禮上利虫,老公的妹妹穿的比我還像新娘饮寞。我一直安慰自己,他們只是感情好列吼,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,699評(píng)論 6 392
  • 文/花漫 我一把揭開白布幽崩。 她就那樣靜靜地躺著,像睡著了一般寞钥。 火紅的嫁衣襯著肌膚如雪慌申。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,562評(píng)論 1 305
  • 那天理郑,我揣著相機(jī)與錄音蹄溉,去河邊找鬼。 笑死您炉,一個(gè)胖子當(dāng)著我的面吹牛柒爵,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播赚爵,決...
    沈念sama閱讀 40,309評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼棉胀,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來了冀膝?” 一聲冷哼從身側(cè)響起唁奢,我...
    開封第一講書人閱讀 39,223評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎窝剖,沒想到半個(gè)月后麻掸,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,668評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡赐纱,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,859評(píng)論 3 336
  • 正文 我和宋清朗相戀三年脊奋,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了熬北。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,981評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡诚隙,死狀恐怖讶隐,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情最楷,我是刑警寧澤,帶...
    沈念sama閱讀 35,705評(píng)論 5 347
  • 正文 年R本政府宣布待错,位于F島的核電站籽孙,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏火俄。R本人自食惡果不足惜犯建,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,310評(píng)論 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望瓜客。 院中可真熱鬧适瓦,春花似錦、人聲如沸谱仪。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,904評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)疯攒。三九已至嗦随,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間敬尺,已是汗流浹背枚尼。 一陣腳步聲響...
    開封第一講書人閱讀 33,023評(píng)論 1 270
  • 我被黑心中介騙來泰國(guó)打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留砂吞,地道東北人署恍。 一個(gè)月前我還...
    沈念sama閱讀 48,146評(píng)論 3 370
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像蜻直,于是被迫代替她去往敵國(guó)和親盯质。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,933評(píng)論 2 355