響應(yīng)式編程(reactive programming)簡介 -- 基于RxJava2

1蝗蛙、什么是響應(yīng)式編程 著角?

部分內(nèi)容參考自:《RxJava Essentials 中文版》by yuxingxin

響應(yīng)式編程是一種基于異步數(shù)據(jù)流概念的編程模式长已。數(shù)據(jù)流就像一條河:它可以被觀測砰逻,被過濾鸣驱,被操作,或者為新的消費者與另外一條流合并為一條新的流蝠咆。

1.1 時間表

  • 90年代后期
    受微軟的一名計算機科學(xué)家Erik Meijer啟發(fā)的思想踊东,用來設(shè)計和開發(fā)微軟的Rx庫。
    Rx是微軟.NET的一個響應(yīng)式擴展刚操。Rx借助可觀測的序列提供一種簡單的方式來創(chuàng)建異步的闸翅,基于事件驅(qū)動的程序。開發(fā)者可以使用Observables模擬異步數(shù)據(jù)流菊霜,使用LINQ語法查詢Observables坚冀,并且很容易管理調(diào)度器的并發(fā)。
    Rx讓眾所周知的概念變得易于實現(xiàn)和消費鉴逞,例如push方法记某。在響應(yīng)式的世界里,我們不能假裝作用戶不關(guān)注或者是不抱怨它而一味的等待函數(shù)的返回結(jié)果构捡,網(wǎng)絡(luò)調(diào)用辙纬,或者數(shù)據(jù)庫查詢的返回結(jié)果。我們時刻都在等待某些東西叭喜,這就讓我們失去了并行處理其他事情的機會贺拣,提供更好的用戶體驗,讓我們的軟件免受順序鏈的影響捂蕴,而阻塞編程譬涡。
  • 2012年
    Netflix在2012年開始意識到他們的架構(gòu)要滿足他們龐大的用戶群體已經(jīng)變得步履維艱。因此他們決定重新設(shè)計架構(gòu)來減少REST調(diào)用的次數(shù)啥辨。取代幾十次的REST調(diào)用涡匀,而是讓客戶端自己處理需要的數(shù)據(jù),他們決定基于客戶端需求創(chuàng)建一個專門優(yōu)化過的REST調(diào)用溉知。
  • 2013年
    2013年2月份陨瘩,Ben ChristensenJafar Husain發(fā)在Netflix技術(shù)博客的一篇文章第一次向世界展示了RxJava腕够。
  • 2014年
    2014年9月份,發(fā)布RxJava 1.0.0正式版舌劳。
  • 2016年
    2016年9月份帚湘,發(fā)布RxJava 2.0.0正式版。
  • 2020年
    2020年2月份甚淡,發(fā)布RxJava 3.0.0正式版大诸。

1.2 定義

這里只貼出鏈接,不做介紹贯卦,感興趣的自行查看资柔。

1.3 更友好的介紹

參考自:《什么是響應(yīng)式編程?》by 享學(xué)IT
介紹了響應(yīng)式編程的三大特點:變化傳遞(propagation of change)撵割、基于數(shù)據(jù)流(data stream)贿堰、聲明式(declarative)
具體形象的例子:“堪稱“響應(yīng)式典范”的強大的生產(chǎn)力工具——電子表格”

1.3.1 【滿199減40活動】購物計劃

下方【購物計劃表】中啡彬,【單價】【數(shù)量】是原始輸入羹与,【商品金額】跟隨【單價】和【數(shù)量】的變化而變化,【滿199減40】跟隨【商品金額】的變化而變化外遇,以此類推注簿,【訂單總金額】、【郵費】跳仿、【最終應(yīng)付款】也跟隨相應(yīng)的項的變化而變化诡渴。具體的公式以及變化的傳遞流向見【購物計劃表 公式】

  • 變化傳遞(propagation of change)
  • 基于數(shù)據(jù)流(data stream)
  • 聲明式(declarative)
購物計劃表

購物計劃表 公式

2菲语、擴展的觀察者模式

2.1 觀察者模式

觀察者模式
  • Observable
    可觀察對象妄辩,也有叫做Source,內(nèi)部維護(hù)一組觀察者observers山上,當(dāng)event有更新時眼耀,observableevent推(push)給observer
  • Observer
    觀察者佩憾,也有叫做Consumer哮伟、Subscriber,觀察observable妄帘,接收observable推(push)過來的event楞黄,做出相應(yīng)的反應(yīng)(不同的observer的反應(yīng)可能不一樣)。
  • Event
    observer所關(guān)心的事件event抡驼。
  • subscrbe
    observerobservable連接起來的操作鬼廓,叫做訂閱(subscribe)。

2.2 擴展的觀察者

上述4個概念致盟,也就是RxJava中碎税,最基本的幾個概念尤慰。Observer通過subscribe方法訂閱Observable,從而雷蹂,在Event有變化時伟端,Observable可以分發(fā)給Observer

  • Observable
  • Observer
  • Event
  • subscribe

與傳統(tǒng)的觀察者模式不同的是萎河,RxJava不光會通過onNext方法分發(fā)普通事件(相當(dāng)于上節(jié)描述的Observer中的accept方法)荔泳,另外還會通過onCompleteonError方法分發(fā)兩個特殊事件蕉饼。

  • onComplete
    事件流已完成虐杯。表明事件流已成功發(fā)出所有的事件,后續(xù)不會再有新的事件發(fā)出昧港。(成功結(jié)束)
  • onError
    事件流異常擎椰。表明由于發(fā)生異常,事件流將被打斷创肥,后續(xù)不會再有新的事件發(fā)出达舒。(異常結(jié)束;特殊情況下叹侄,可能會人為在事件流過程中刻意發(fā)出error事件)

在一個正確實現(xiàn)的事件流中巩搏,都應(yīng)該有一個onCompleteonError作為事件流的最后一個事件,并且這兩者也是互斥趾代,發(fā)出了其中一個事件贯底,另一個事件就不應(yīng)該再被發(fā)出。

2.3 RxJava = Observer + 異步處理

本節(jié)參考自馬士兵教育視頻

RxJava = Observer + 異步處理

3撒强、相關(guān)概念

3.1 函數(shù)式編程

  • 函數(shù)式編程是與面向?qū)ο缶幊逃胁町惖囊粋€編程范式禽捆,函數(shù)式編程是一個很大的領(lǐng)域,本文不打算對此做深入分析飘哨、介紹胚想;
  • 在函數(shù)式編程范式中,函數(shù)是頭等公民芽隆,可以獨立存在(不像面向?qū)ο笞欠瘮?shù)或稱為方法,必須屬于某個類)胚吁;并且牙躺,函數(shù)可以作為方法的入?yún)ⅲ部梢宰鳛榉椒ǖ姆祷刂担?/li>
  • Java是純面向?qū)ο笳Z言囤采,本質(zhì)上是不支持函數(shù)式編程的述呐,但是,通過函數(shù)式接口(一個有且僅有一個抽象方法的接口)蕉毯,可以部分模擬函數(shù)式編程乓搬;

3.2 函數(shù)式接口

  • 直接看例子
        // 函數(shù)式接口實例思犁,最常見的Runnable接口
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                System.out.println("Runnable is a Functional Interface");
            }
        };
    
        // 函數(shù)式接口FunctionalInterface,只有一個accept方法进肯,有入?yún)⒑头祷刂?    interface FunctionalInterface {
            String accept(int i1, int i2);
        }
    
        // FunctionalInterface實例
        FunctionalInterface functionalInterface = new FunctionalInterface() {
            @Override
            public String accept(int i1, int i2) {
                return String.valueOf(i1 + i2);
            }
        };
    

3.3 lambda表達(dá)式

一開始不習(xí)慣的情況下激蹲,可以先像上一節(jié)那樣,先按顯性new實例的方式寫出代碼江掩,然后光標(biāo)移動到Android Studio標(biāo)成灰色字的部分(new FunctionalInterface處)学辱,敲擊alt + enter,即可通過IDE直接進(jìn)行l(wèi)ambda改造环形。

  • Lambda 表達(dá)式策泣,也可稱為閉包,它是推動 Java 8 發(fā)布的最重要新特性抬吟。
  • Lambda 允許把函數(shù)作為一個方法的參數(shù)(函數(shù)作為參數(shù)傳遞進(jìn)方法中)萨咕。
  • 使用 Lambda 表達(dá)式可以使代碼變的更加簡潔緊湊。(下一章火本,看示例代碼會深有感觸
  • 我們把上節(jié)的例子做一下lambda改造
        // Runnable實例危队,lambda形式,() -> { statement; };
        Runnable runnable = () -> {
            System.out.println("Runnable is a Functional Interface");
        };
    
        // 當(dāng)方法體只有一行時钙畔,可以進(jìn)一步簡寫茫陆,() -> statement;
        Runnable runnable = () -> System.out.println("Runnable is a Functional Interface");
        
        // FunctionalInterface實例,lambda形式擎析,(param1, param2, ...) -> { return expression; };
        FunctionalInterface functionalInterface = (i1, i2) -> {
            return String.valueOf(i1 + i2);
        };
    
        // 當(dāng)方法體只有一行時簿盅,可以進(jìn)一步簡寫,(param1, param2, ...) -> expression;
        FunctionalInterface functionalInterface = (i1, i2) -> String.valueOf(i1 + i2);
    

4叔锐、RxJava2的使用

基于以下RxJava版本:'io.reactivex.rxjava2:rxjava:2.2.11'

4.1 最簡單的示例(create創(chuàng)建挪鹏、subscribe訂閱)

  • 示例代碼

        Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                System.out.println(Thread.currentThread().getName() + " start to emit");
                emitter.onNext("Hello");
                emitter.onNext("CodingDog1024");
            }
        });
    
        Consumer<String> consumer = new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                System.out.println(Thread.currentThread().getName() + " consumer accept: " + s);
            }
        };
    
        observable.subscribe(consumer);
    
  • 輸出

        main start to emit
        main consumer accept: Hello
        main consumer accept: CodingDog1024
    
  • lambda化 (對比前面的代碼,可以看到代碼明顯簡潔緊湊很多)

        Observable<String> observable = Observable.create(emitter -> {
            System.out.println(Thread.currentThread().getName() + " start to emit");
            emitter.onNext("Hello");
            emitter.onNext("CodingDog1024");
        });
    
        Consumer<String> consumer = s -> System.out.println(Thread.currentThread().getName() + " consumer accept: " + s);
    
        observable.subscribe(consumer);
    
  • 鏈?zhǔn)秸{(diào)用 (可以看到愉烙,結(jié)合lambda和鏈?zhǔn)秸{(diào)用讨盒,代碼更加的緊湊,少了很多干擾性的代碼步责,讓我們可以更加聚焦于業(yè)務(wù)邏輯)

        Observable
                .<String>create(emitter -> {
                    System.out.println(Thread.currentThread().getName() + " start to emit");
                    emitter.onNext("Hello");
                    emitter.onNext("CodingDog1024");
                })
                .subscribe(s -> System.out.println(Thread.currentThread().getName() + " consumer accept: " + s));
    
  • 后續(xù)的示例代碼都會以lambda和鏈?zhǔn)秸{(diào)用結(jié)合的方式給出返顺,除非特別需要說明具體類型的情況

4.2 map轉(zhuǎn)換

  • 現(xiàn)在我們來將上一節(jié)例子里的String轉(zhuǎn)成大寫字母;

  • 相信很多人的第一反應(yīng)是在subscribe方法里蔓肯,打印時調(diào)用stoUpperCase方法遂鹊;這個方式當(dāng)然可以實現(xiàn)轉(zhuǎn)換成大寫的需求;

  • 但是蔗包,從業(yè)務(wù)邏輯解耦秉扑、代碼復(fù)用的角度,我們希望不要改動到原有代碼(只擴展新邏輯、不修改原邏輯舟陆,開閉原則)误澳。consumer的邏輯保持最簡單(拿到String,顯示就是秦躯,沒有任何其他復(fù)雜邏輯)

  • 此時忆谓,我們可以使用map操作符

        Observable
                .<String>create(emitter -> {
                    System.out.println(Thread.currentThread().getName() + " start to emit");
                    emitter.onNext("Hello");
                    emitter.onNext("CodingDog1024");
                })
                .map(s -> {
                    String upperCase = s.toUpperCase();
                    System.out.println(Thread.currentThread().getName() + " map "  + s + " -> " + upperCase);
                    return upperCase;
                })
                .subscribe(s -> System.out.println(Thread.currentThread().getName() + " consumer accept: " + s));
    
  • 輸出

        main start to emit
        main map Hello -> HELLO
        main consumer accept: HELLO
        main map CodingDog1024 -> CODINGDOG1024
        main consumer accept: CODINGDOG1024
    
  • Function<T, R>
    map方法傳入的是一個Function<T, R>,泛型TR都為String踱承,為了更清楚的看下map方法倡缠,我們將map方法里的lambda恢復(fù)成匿名類實例的樣子(同時,再次感受lambda的簡潔茎活、緊湊)

        Observable
                .<String>create(emitter -> {
                    System.out.println(Thread.currentThread().getName() + " start to emit");
                    emitter.onNext("Hello");
                    emitter.onNext("CodingDog1024");
                })
                .map(new Function<String, String>() {
                    @Override
                    public String apply(String s) throws Exception {
                        String upperCase = s.toUpperCase();
                        System.out.println(Thread.currentThread().getName() + " map "  + s + " -> " + upperCase);
                        return upperCase;
                    }
                })
                .subscribe(s -> System.out.println(Thread.currentThread().getName() + " consumer accept: " + s));
    
  • 輸出(與上面的輸出是一樣的)

        main start to emit
        main map Hello -> HELLO
        main consumer accept: HELLO
        main map CodingDog1024 -> CODINGDOG1024
        main consumer accept: CODINGDOG1024
    

4.3 異步處理

  • 從上面的輸出可以看到昙沦,目前的邏輯都是跑在主線程的。現(xiàn)在妙色,我們假設(shè)emitter發(fā)出事件的操作是耗時操作桅滋,我們希望這個操作不要阻塞主線程
  • 此時慧耍,我們可以使用調(diào)度器(Scheduler)身辨,新增一行代碼,即可切換線程
  • Thread.sleep(500)只是為了模擬耗時芍碧、CountDownLatch只是為了測試代碼的順利執(zhí)行煌珊,對這個流程沒有任何影響,忽略即可
        CountDownLatch countDown = new CountDownLatch(2);
    
        Observable
                .<String>create(emitter -> {
                    System.out.println(Thread.currentThread().getName() + " start to emit");
                    Thread.sleep(500);
                    emitter.onNext("Hello");
                    Thread.sleep(500);
                    emitter.onNext("CodingDog1024");
                })
                .subscribeOn(Schedulers.newThread()) // 這一行是新增泌豆,其他的完全沒變
                .map(s -> {
                    String upperCase = s.toUpperCase();
                    System.out.println(Thread.currentThread().getName() + " map " + s + " -> " + upperCase);
                    return upperCase;
                })
                .subscribe(s -> {
                    System.out.println(Thread.currentThread().getName() + " consumer accept: " + s);
                    countDown.countDown();
                });
    
        countDown.await();
    
  • 輸出(可以看到定庵,已經(jīng)切換到新線程執(zhí)行)
        RxNewThreadScheduler-1 start to emit
        RxNewThreadScheduler-1 map Hello -> HELLO
        RxNewThreadScheduler-1 consumer accept: HELLO
        RxNewThreadScheduler-1 map CodingDog1024 -> CODINGDOG1024
        RxNewThreadScheduler-1 consumer accept: CODINGDOG1024
    

4.4 不同操作執(zhí)行在不同線程

  • 示例
        CountDownLatch countDown = new CountDownLatch(2);
    
        Observable
                .<String>create(emitter -> {
                    System.out.println(Thread.currentThread().getName() + " start to emit");
                    Thread.sleep(500);
                    emitter.onNext("Hello");
                    Thread.sleep(500);
                    emitter.onNext("CodingDog1024");
                })
                .subscribeOn(Schedulers.newThread()) // emit操作執(zhí)行在newThread
                .observeOn(Schedulers.computation()) // 接下去的操作(即map操作)執(zhí)行在computation
                .map(s -> {
                    String upperCase = s.toUpperCase();
                    System.out.println(Thread.currentThread().getName() + " map " + s + " -> " + upperCase);
                    return upperCase;
                })
                .observeOn(Schedulers.single()) // 接下去的操作(即consumer)執(zhí)行在single
                .subscribe(s -> {
                    System.out.println(Thread.currentThread().getName() + " consumer accept: " + s);
                    countDown.countDown();
                });
    
        countDown.await();
    
  • 輸出
    emit操作執(zhí)行在RxNewThreadScheduler-1
    map操作執(zhí)行在RxComputationThreadPool-1
    consumer執(zhí)行在RxSingleScheduler-1
        RxNewThreadScheduler-1 start to emit
        RxComputationThreadPool-1 map Hello -> HELLO
        RxSingleScheduler-1 consumer accept: HELLO
        RxComputationThreadPool-1 map CodingDog1024 -> CODINGDOG1024
        RxSingleScheduler-1 consumer accept: CODINGDOG1024
    

4.5 subscribeOn 和 observeOn

  • subscribeOn
    • subscribeOn作用于發(fā)射事件處(如上一節(jié)中的create方法),多次調(diào)用subscribeOn方法踪危,將只有離create最近的一處生效蔬浙。當(dāng)你需要提供接口給外部調(diào)用,如果想要保證發(fā)射事件代碼執(zhí)行在指定調(diào)度器贞远,則可以直接通過subscribeOn方法設(shè)置調(diào)度器畴博,接口調(diào)用方就算通過subscribeOn方法設(shè)置其他的調(diào)度器,最終結(jié)果也是在你指定的調(diào)度器里執(zhí)行蓝仲。
    • 示例
          CountDownLatch countDown = new CountDownLatch(2);
      
          Observable
                  .<String>create(emitter -> {
                      System.out.println(Thread.currentThread().getName() + " start to emit");
                      Thread.sleep(500);
                      emitter.onNext("Hello");
                      Thread.sleep(500);
                      emitter.onNext("CodingDog1024");
                  })
                  .subscribeOn(Schedulers.computation()) // 設(shè)置為computation調(diào)度器俱病,最終結(jié)果為執(zhí)行在computation調(diào)度器
                  .subscribeOn(Schedulers.io())  // 設(shè)置為io調(diào)度器
                  .map(s -> {
                      String upperCase = s.toUpperCase();
                      System.out.println(Thread.currentThread().getName() + " map " + s + " -> " + upperCase);
                      return upperCase;
                  })
                  .subscribeOn(Schedulers.newThread())  // 設(shè)置newThread調(diào)度器
                  .subscribe(s -> {
                      System.out.println(Thread.currentThread().getName() + " consumer accept: " + s);
                      countDown.countDown();
                  });
      
          countDown.await();
      
    • 輸出
          RxComputationThreadPool-1 start to emit
          RxComputationThreadPool-1 map Hello -> HELLO
          RxComputationThreadPool-1 consumer accept: HELLO
          RxComputationThreadPool-1 map CodingDog1024 -> CODINGDOG1024
          RxComputationThreadPool-1 consumer accept: CODINGDOG1024
      
  • observeOn
    • observeOn影響的是該操作符后續(xù)的事件所運行的線程,多次調(diào)用observeOn方法袱结,每次調(diào)用互不影響亮隙,可以實現(xiàn)多次切換不同線程。
    • 上一節(jié)示例代碼已經(jīng)做了使用到該特性垢夹,兩次調(diào)用observeOn溢吻,使map操作執(zhí)行在computation調(diào)度器、subscribe執(zhí)行在single調(diào)度器果元。
    • Android開發(fā)中最常用的場景為:Observable前面的操作都是在工作線程執(zhí)行(io促王、computation等調(diào)度器)掩完,一切處理邏輯執(zhí)行妥當(dāng)后,調(diào)用observeOn方法將最終的subscribe切換到UI線程執(zhí)行(AndroidSchedulers.mainThread()調(diào)度器)硼砰,從而可以在subscribe方法里更新UI且蓬。

4.6 Scheduler類型

4.6.1 RxJava內(nèi)置了5種調(diào)度器

本節(jié)內(nèi)容參考自 《RxJava Essentials 中文版》by yuxingxin

  • Schedulers.io()
    這個調(diào)度器時用于I/O操作。它基于根據(jù)需要题翰,增長或縮減來自適應(yīng)的線程池恶阴。我們將使用它來修復(fù)我們之前看到的StrictMode違規(guī)做法。由于它專用于I/O操作豹障,所以并不是RxJava的默認(rèn)方法冯事;正確的使用它是由開發(fā)者決定的。
    重點需要注意的是線程池是無限制的血公,大量的I/O調(diào)度操作將創(chuàng)建許多個線程并占用內(nèi)存昵仅。一如既往的是,我們需要在性能和簡捷兩者之間找到一個有效的平衡點累魔。
  • Schedulers.computation()
    這個是計算工作默認(rèn)的調(diào)度器摔笤,它與I/O操作無關(guān)。它也是許多RxJava方法的默認(rèn)調(diào)度器:buffer(),debounce(),delay(),interval(),sample(),skip()垦写。
  • Schedulers.immediate()
    這個調(diào)度器允許你立即在當(dāng)前線程執(zhí)行你指定的工作吕世。它是timeout(),timeInterval(),以及timestamp()方法默認(rèn)的調(diào)度器。
  • Schedulers.newThread()
    這個調(diào)度器正如它所看起來的那樣:它為指定任務(wù)啟動一個新的線程梯投。
  • Schedulers.trampoline()
    當(dāng)我們想在當(dāng)前線程執(zhí)行一個任務(wù)時命辖,并不是立即,我們可以用trampoline()將它入隊分蓖。這個調(diào)度器將會處理它的隊列并且按序運行隊列中每一個任務(wù)尔艇。它是repeat()retry()方法默認(rèn)的調(diào)度器。

4.6.2 RxAndroid額外提供了一個對應(yīng)UI主線程的調(diào)度器

RxAndroid: 'io.reactivex.rxjava2:rxandroid:2.1.1'

  • AndroidSchedulers.mainThread()
    在該調(diào)度器執(zhí)行的操作么鹤,會被封裝到Message终娃,sendUI Handler執(zhí)行。

4.6.3 自定義調(diào)度器

  • Schedulers.from(Executor executor)
    如果上述的5種內(nèi)置調(diào)度器都不能滿足需求,我們也可以傳入自己定義的Executor

5呆盖、操作符(Operators)

完整介紹見:ReactiveX Operators

Rx提供了非常非常豐富的操作符纫谅,為方便查看操作符含義,ReactiveX官網(wǎng)提供了一種操作符示意圖,下面會舉幾個例子做一下介紹。

5.1 操作符示意圖

  • create
    看圖的下半部分,橫向箭頭代表整個事件流搅荞、有顏色的圖形代表事件、豎線代表完成。


    create

以我們上面寫過的示例為例咕痛,則是:

----------------Hello------------CodingDog1024---------------|----->
  • map
    下圖顯示將原事件流里的每個數(shù)都乘以10的轉(zhuǎn)換痢甘。


    map

    以我們上面寫過的示例為例,則是:

----------------Hello------------CodingDog1024---------------|----->
                   map (s -> s.toUpperCase())
----------------HELLO -----------CODINGDOG1024---------------|----->
  • flatMap
    map操作符返回的是事件(如我們例子中的s.toUpperCase())不同茉贡,flatMap操作符返回的是Observable塞栅。但是,consumer收到的依舊是Observable內(nèi)包含的數(shù)據(jù)腔丧,因此放椰,稱為扁平化(flat)。

    flatMap

    • 可以將我們上面寫過的例子做一下改造愉粤,使用flatMap操作符達(dá)到同樣的功能:
          Observable
                  .<String>create(emitter -> {
                      System.out.println(Thread.currentThread().getName() + " start to emit");
                      emitter.onNext("Hello");
                      emitter.onNext("CodingDog1024");
                  })
                  .flatMap(s -> getUpperCase(s))
                  .subscribe(s -> {
                      System.out.println(Thread.currentThread().getName() + " consumer accept: " + s);
                  });
      
      private Observable<String> getUpperCase(String s) {
          return Observable.create(emitter -> {
              String upperCase = s.toUpperCase();
              System.out.println(Thread.currentThread().getName() + " getUpperCase " + s + " -> " + upperCase);
              emitter.onNext(upperCase);
              emitter.onComplete();
          });
      }
      
    • 有人可能會有疑問砾医,flatMap比起map明顯難理解得多,為什么要用它衣厘。一個比較常見的場景是如蚜,想象一下如果上述的getUpperCase方法是其他模塊或sdk提供的接口,我們并不清楚其內(nèi)部實現(xiàn)影暴,但是需要依賴這個接口的功能错邦,此時,就是flatMap的一個派上用場的時候坤检。
  • filter
    小于等于10的數(shù)據(jù)將被過濾掉兴猩。


    filter
    • 示例
          Observable
                  .<String>create(emitter -> {
                      System.out.println(Thread.currentThread().getName() + " start to emit");
                      emitter.onNext("Hello");
                      emitter.onNext("CodingDog1024");
                  })
                  .filter(s -> s.startsWith("C"))
                  .subscribe(s -> {
                      System.out.println(Thread.currentThread().getName() + " consumer accept: " + s);
                  });
      
    • 輸出
          main start to emit
          main consumer accept: CodingDog1024
      

以我們上面寫過的示例為例,則是:

----------------Hello------------CodingDog1024---------------|----->
                   filter(s -> s.startsWith("C"))
---------------------------------CodingDog1024---------------|----->
  • merge
    將兩個以上Observable合并一起早歇,得到的事件將是所有事件流的一個合集。
    merge
    • 示例
          Observable hello = Observable.create(emitter -> {
              System.out.println(Thread.currentThread().getName() + " hello start to emit");
              emitter.onNext("Hello");
          });
      
          Observable codingDog = Observable.create(emitter -> {
              System.out.println(Thread.currentThread().getName() + " codingDog start to emit");
              emitter.onNext("CodingDog1024");
          });
      
          Observable
                  .merge(hello, codingDog)
                  .subscribe(s -> {
                      System.out.println(Thread.currentThread().getName() + " consumer accept: " + s);
                  });
      
    • 輸出
          main hello start to emit
          main consumer accept: Hello
          main codingDog start to emit
          main consumer accept: CodingDog1024
      
----------------Hello----------------------------------------|----->
---------------------------------CodingDog1024---------------|----->
                           merge
----------------Hello------------CodingDog1024---------------|----->

5.2 Creating Observables

Operators that originate new Observables.

  • Create — create an Observable from scratch by calling observer methods programmatically
  • Defer — do not create the Observable until the observer subscribes, and create a fresh Observable for each observer
  • Empty/Never/Throw — create Observables that have very precise and limited behavior
  • From — convert some other object or data structure into an Observable
  • Interval — create an Observable that emits a sequence of integers spaced by a particular time interval
  • Just — convert an object or a set of objects into an Observable that emits that or those objects
  • Range — create an Observable that emits a range of sequential integers
  • Repeat — create an Observable that emits a particular item or sequence of items repeatedly
  • Start — create an Observable that emits the return value of a function
  • Timer — create an Observable that emits a single item after a given delay

5.3 Transforming Observables

Operators that transform items that are emitted by an Observable.

  • Buffer — periodically gather items from an Observable into bundles and emit these bundles rather than emitting the items one at a time
  • FlatMap — transform the items emitted by an Observable into Observables, then flatten the emissions from those into a single Observable
  • GroupBy — divide an Observable into a set of Observables that each emit a different group of items from the original Observable, organized by key
  • Map — transform the items emitted by an Observable by applying a function to each item
  • Scan — apply a function to each item emitted by an Observable, sequentially, and emit each successive value
  • Window — periodically subdivide items from an Observable into Observable windows and emit these windows rather than emitting the items one at a time

5.4 Filtering Observables

Operators that selectively emit items from a source Observable.

  • Debounce — only emit an item from an Observable if a particular timespan has passed without it emitting another item
  • Distinct — suppress duplicate items emitted by an Observable
  • ElementAt — emit only item n emitted by an Observable
  • Filter — emit only those items from an Observable that pass a predicate test
  • First — emit only the first item, or the first item that meets a condition, from an Observable
  • IgnoreElements — do not emit any items from an Observable but mirror its termination notification
  • Last — emit only the last item emitted by an Observable
  • Sample — emit the most recent item emitted by an Observable within periodic time intervals
  • Skip — suppress the first n items emitted by an Observable
  • SkipLast — suppress the last n items emitted by an Observable
  • Take — emit only the first n items emitted by an Observable
  • TakeLast — emit only the last n items emitted by an Observable

5.5 Combining Observables

Operators that work with multiple source Observables to create a single Observable

  • And/Then/When — combine sets of items emitted by two or more Observables by means of Pattern and Plan intermediaries
  • CombineLatest — when an item is emitted by either of two Observables, combine the latest item emitted by each Observable via a specified function and emit items based on the results of this function
  • Join — combine items emitted by two Observables whenever an item from one Observable is emitted during a time window defined according to an item emitted by the other Observable
  • Merge — combine multiple Observables into one by merging their emissions
  • StartWith — emit a specified sequence of items before beginning to emit the items from the source Observable
  • Switch — convert an Observable that emits Observables into a single Observable that emits the items emitted by the most-recently-emitted of those Observables
  • Zip — combine the emissions of multiple Observables together via a specified function and emit single items for each combination based on the results of this function

6讨勤、Subject = Observable + Observer

subject是一個神奇的對象箭跳,它可以是一個Observable同時也可以是一個Observer:它作為連接這兩個世界的一座橋梁。一個Subject可以訂閱一個Observable潭千,就像一個Observer谱姓,并且它可以發(fā)射新的數(shù)據(jù),或者傳遞它接受到的數(shù)據(jù)刨晴,就像一個Observable屉来。很明顯,作為一個Observable狈癞,觀察者們或者其它Subject都可以訂閱它茄靠。
RxJava提供了4種不同的Subject

  • PublishSubject
    PublishSubject會向他的訂閱者發(fā)送訂閱后的數(shù)據(jù)流。
  • BehaviorSubject
    BehaviorSubject會首先向他的訂閱者發(fā)送截至訂閱前最新的一個數(shù)據(jù)對象(或初始值)蝶桶,然后正常發(fā)送訂閱后的數(shù)據(jù)流慨绳。
  • ReplaySubject
    ReplaySubject會緩存它所訂閱的所有數(shù)據(jù),向任意一個訂閱它的觀察者重發(fā)。
  • AsyncSubject
    當(dāng)Observable完成時AsyncSubject只會發(fā)布最后一個數(shù)據(jù)給已經(jīng)訂閱的每一個觀察者脐雪。

6.1 PublishSubject

  • 示例
        PublishSubject<String> publishSubject = PublishSubject.create();
    
        Consumer<String> consumer0 = s -> System.out.println(Thread.currentThread().getName() + " consumer0 accept: " + s);
        publishSubject.subscribe(consumer0);
    
        publishSubject.onNext("Hello");
        publishSubject.onNext("CodingDog1024");
    
        Consumer<String> consumer1 = s -> System.out.println(Thread.currentThread().getName() + " consumer1 accept: " + s);
        publishSubject.subscribe(consumer1);
    
        publishSubject.onNext("I");
        publishSubject.onNext("am");
        publishSubject.onNext("RxJava");
    
  • 輸出
        main consumer0 accept: Hello
        main consumer0 accept: CodingDog1024
        main consumer0 accept: I
        main consumer1 accept: I
        main consumer0 accept: am
        main consumer1 accept: am
        main consumer0 accept: RxJava
        main consumer1 accept: RxJava
    
  • 說明
    consumer0收到完整的"Hello"厌小、"CodingDog1024""I"战秋、"am"璧亚、"RxJava"
    consumer1只收到其訂閱之后來到的"I"脂信、"am"涨岁、"RxJava"
    PublishSubject的行為就類似我們常見的addXXXListener注冊監(jiān)聽吉嚣,consumer可以接收到其訂閱之后的所有event梢薪。

6.1 BehaviorSubject

  • 示例
        BehaviorSubject<String> behaviorSubject = BehaviorSubject.create();
    
        Consumer<String> consumer0 = s -> System.out.println(Thread.currentThread().getName() + " consumer0 accept: " + s);
        behaviorSubject.subscribe(consumer0);
    
        behaviorSubject.onNext("Hello");
        behaviorSubject.onNext("CodingDog1024");
    
        Consumer<String> consumer1 = s -> System.out.println(Thread.currentThread().getName() + " consumer1 accept: " + s);
        behaviorSubject.subscribe(consumer1);
    
        behaviorSubject.onNext("I");
        behaviorSubject.onNext("am");
        behaviorSubject.onNext("RxJava");
    
  • 輸出
        main consumer0 accept: Hello
        main consumer0 accept: CodingDog1024
        main consumer1 accept: CodingDog1024
        main consumer0 accept: I
        main consumer1 accept: I
        main consumer0 accept: am
        main consumer1 accept: am
        main consumer0 accept: RxJava
        main consumer1 accept: RxJava
    
  • 說明
    consumer0收到完整的"Hello""CodingDog1024"尝哆、"I"秉撇、"am""RxJava"秋泄。
    consumer1收到"CodingDog1024"琐馆、"I""am"恒序、"RxJava"瘦麸。(比上節(jié)PublishSubject的例子,多了"CodingDog1024")
    訂閱BehaviorSubject時歧胁,consumer會先收到最新的一個event滋饲,然后再接收到之后到來的所有event
  • 使用場景
    在很常見的【先獲取一次值喊巍,執(zhí)行邏輯屠缭,然后值變化時需要重新執(zhí)行邏輯】的場景下,使用BehaviorSubject可以很自然的實現(xiàn)崭参。

7呵曹、How it works ?

基于以下RxJava版本:'io.reactivex.rxjava2:rxjava:2.2.11'

7.1 Observable.create() & subscribe()

7.1.1 示例 & 實現(xiàn)代碼走讀

  • sample

        // (1)實例化【observableOnSubscribe1】
        ObservableOnSubscribe<String> observableOnSubscribe1 = new ObservableOnSubscribe<String>() {
            // (11)第(10)步中何暮,調(diào)用了subscribe方法奄喂,入?yún)閜arent,parent為第(5)步observer的一個代理
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                // (12)調(diào)用emitter onNext方法海洼。通過parent跨新,最終轉(zhuǎn)調(diào)到第(5)步observer的onNext方法
                emitter.onNext("CodingDog1024");
            }
        };
    
        // (4)返回值為一個ObservableCreate實例
        Observable<String> observable1 = Observable.create(observableOnSubscribe1);
    
        // (5)實例化【observer1】
        Observer observer1 = new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("onSubscribe");
            }
    
            // (13) 收到事件/數(shù)據(jù)
            @Override
            public void onNext(String s) {
                System.out.println("onNext " + s);
            }
    
            @Override
            public void onError(Throwable e) {
                System.out.println("Throwable " + e.getMessage());
            }
    
            @Override
            public void onComplete() {
                System.out.println("onComplete");
            }
        };
    
        // (6)訂閱
        observable1.subscribe(observer1);
    
  • 輸出

        onSubscribe
        onNext CodingDog1024
    
  • create源碼

        // 代碼出處:Observable
        // (2)source即為【observableOnSubscribe1】
        public static <T> Observable<T> create(ObservableOnSubscribe<T> source) { 
            ObjectHelper.requireNonNull(source, "source is null");
            return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
        }
    
        // 代碼出處:RxJavaPlugins
        public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
            // 默認(rèn)onObservableAssembly為null,因此贰军,該方法可以視為直接return source
            Function<? super Observable, ? extends Observable> f = onObservableAssembly;
            if (f != null) {
                return apply(f, source);
            }
            return source;
        }
        
        // 代碼出處:ObservableCreate
        public final class ObservableCreate<T> extends Observable<T> {
            // source即為【observableOnSubscribe1】
            final ObservableOnSubscribe<T> source; 
        
            // (3)source即為【observableOnSubscribe1】
            public ObservableCreate(ObservableOnSubscribe<T> source) { 
                this.source = source;
            }
        
             // (9)observer即為【observer1】
            @Override
            protected void subscribeActual(Observer<? super T> observer) { 
                // CreateEmitter為observer的代理玻蝌,增加一些異常處理蟹肘,可以先直接理解為只是轉(zhuǎn)調(diào)observer的方法
                CreateEmitter<T> parent = new CreateEmitter<T>(observer); 
                observer.onSubscribe(parent);
        
                try {
                     // (10)該subscribe方法即為第(11)步的【subscribe】方法,這里就是create和subscribe的連接處8┦鳌A备埂!
                    source.subscribe(parent);
                } catch (Throwable ex) {
                    Exceptions.throwIfFatal(ex);
                    parent.onError(ex);
                }
            }
            ... ...
        }
    
  • subscribe源碼

         // 代碼出處:Observable
         // (7)observer即為【observer1】
        public final void subscribe(Observer<? super T> observer) {
                ... ...
                observer = RxJavaPlugins.onSubscribe(this, observer);
                subscribeActual(observer);
                ... ...
        }
        
        // 代碼出處:Observable
        // (8)observer即為【observer1】
        protected abstract void subscribeActual(Observer<? super T> observer); 
    

7.1.2 簡單概括一下

  • sample代碼

        // (1)實例化【observableOnSubscribe1】
        ObservableOnSubscribe<String> observableOnSubscribe1 = new ObservableOnSubscribe<String>() {
            // (11)第(10)步中许饿,調(diào)用了subscribe方法阳欲,入?yún)閜arent,parent為第(5)步observer的一個代理
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                // (12)調(diào)用emitter onNext方法陋率。通過parent球化,最終轉(zhuǎn)調(diào)到第(5)步observer的onNext方法
                emitter.onNext("CodingDog1024");
            }
        };
    
        // (4)返回值為一個ObservableCreate實例
        Observable<String> observable1 = Observable.create(observableOnSubscribe1);
    
        // (5)實例化【observer1】
        Observer observer1 = new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("onSubscribe");
            }
    
            // (13) 收到事件/數(shù)據(jù)
            @Override
            public void onNext(String s) {
                System.out.println("onNext " + s);
            }
    
            @Override
            public void onError(Throwable e) {
                System.out.println("Throwable " + e.getMessage());
            }
    
            @Override
            public void onComplete() {
                System.out.println("onComplete");
            }
        };
    
        // (6)訂閱
        observable1.subscribe(observer1);
    
  • 文字概括

    1. Observable.create方法入?yún)橐粋€ObservableOnSubscribe實例observableOnSubscribe1,返回值為一個ObservableCreate實例observable1瓦糟,observable1持有observableOnSubscribe1source
    2. 以入?yún)?code>Observer實例observer1調(diào)用subscribe方法時筒愚,實際調(diào)用的就是ObservableCreatesubscribe方法,接著調(diào)到ObservableCreatesubscribeActual(Observer observer)方法
    3. subscribeActual方法里new了一個CreateEmitter實例parent菩浙,parent持有上述observer1
    4. subscribeActual方法里執(zhí)行source.subscribe(parent)巢掺,這個source即為第1步的入?yún)?code>source,因此劲蜻,此處就是執(zhí)行了observableOnSubscribe1subscribe方法
    5. observableOnSubscribe1subscribe方法執(zhí)行了
              emitter.onNext("CodingDog1024");
      
    6. 也就是執(zhí)行了第3步中CreateEmitter實例parentonNext方法
    7. CreateEmitter的核心邏輯是調(diào)用持有的Observer實例observer的對應(yīng)方法陆淀,即調(diào)用第2步入?yún)?code>Observer實例observer1onNext方法,即打印log的方法
          @Override
          public void onNext(String s) {
              System.out.println("onNext " + s);
          }
      
  • 提取最核心邏輯先嬉,簡化理解
    前一段總共列了7步轧苫,出現(xiàn)了數(shù)量眾多的類,理解起來稍微有點復(fù)雜疫蔓。我們做一下簡化含懊,提取最核心邏輯,上述分析中鳄袍,ObservableCreateObservable描述的是同一個東西绢要,我們統(tǒng)一視為ObservableCreateEmitter對象parent其實就是Observer對象observer的一個代理拗小,最終調(diào)用的是observer的方法,我們暫時忽略代理類邏輯樱哼,直接將兩者統(tǒng)一視為observer哀九;這么合并之后,前一段的分析就變成了:

    1. Observable.create方法入?yún)?code>observableOnSubscribe1(或者稱為source)搅幅,返回值為observable阅束,observable持有source
    2. observable.subscribe(observer),最終會調(diào)用到sourcesubscribe方法(入?yún)?code>observer)
    3. sourcesubscribe方法里茄唐,執(zhí)行了observeronNext方法
Observable create subscribe

7.2 Observable.map()

7.2.1 示例 & 實現(xiàn)代碼走讀

  • sample
    在上一節(jié)例子基礎(chǔ)上息裸,修改3處蝇更,見下方修改點1 2 3。

        // (1)實例化【observableOnSubscribe1】
        ObservableOnSubscribe<String> observableOnSubscribe1 = new ObservableOnSubscribe<String>() {
            // (11)第(10)步中呼盆,調(diào)用了subscribe方法年扩,入?yún)閜arent,parent為第(5)步observer的一個代理
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                // (12)調(diào)用emitter onNext方法访圃。通過parent厨幻,最終轉(zhuǎn)調(diào)到第(5)步observer的onNext方法
                emitter.onNext("CodingDog1024");
            }
        };
    
        // (4)返回值為一個ObservableCreate實例
        Observable<String> observable1 = Observable.create(observableOnSubscribe1);
    
        // 修改點1:新增將String轉(zhuǎn)成大寫的mapper
        Function<String, String> mapper1 = new Function<String, String>() {
            @Override
            public String apply(String s) throws Exception {
                return s.toUpperCase();
            }
        };
    
        // 修改點2:observable1應(yīng)用mapper得到新的Observable實例observable2,具體類型為ObservableMap
        Observable<String> observable2 = observable1.map(mapper1);
    
        // (5)實例化【observer1】
        Observer observer1 = new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("onSubscribe");
            }
    
            // (13) 收到事件/數(shù)據(jù)
            @Override
            public void onNext(String s) {
                System.out.println("onNext " + s);
            }
    
            @Override
            public void onError(Throwable e) {
                System.out.println("Throwable " + e.getMessage());
            }
    
            @Override
            public void onComplete() {
                System.out.println("onComplete");
            }
        };
    
        // 修改點3:訂閱的是應(yīng)用了mapper后的observable2
        // (6)訂閱
        observable2.subscribe(observer1);
    
  • 輸出

        onSubscribe
        onNext CODINGDOG1024
    
  • map源碼

       // 代碼出處:Observable
        public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
            ObjectHelper.requireNonNull(mapper, "mapper is null");
            // observable1上調(diào)用的map方法腿时,因此况脆,this就是observable1,mapper就是mapper1
            return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
        }
    
        // 代碼出處:ObservableMap
        public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
            final Function<? super T, ? extends U> function;
        
             // source就是observable1批糟,function就是mapper1
            public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
                super(source);
                this.function = function;
            }
        
            @Override
            public void subscribeActual(Observer<? super U> t) {
                // source就是observable1格了,function就是mapper1,t就是observer1
                source.subscribe(new MapObserver<T, U>(t, function));
            }
            ... ...
        }
    
        // 代碼出處:ObservableMap
        static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
            final Function<? super T, ? extends U> mapper;
    
            // actual就是observer1徽鼎,mapper就是mapper1
            MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
                // super方法盛末,將actual賦給downstream,也就是說:
                // downstream就是observer1
                super(actual);
                this.mapper = mapper;
            }
    
            @Override
            public void onNext(T t) {
                ... ...
                U v;
                ... ...
                    // 調(diào)用mapper.apply纬傲,將上游的輸入 t 轉(zhuǎn)為 v 
                    v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
                ... ...
                // 調(diào)用下游的onNext方法满败,入?yún)?v 
                downstream.onNext(v);
            }
            ... ...
        }
    
  • 分析


    Observable create map subscribe

7.3 Observable.filter()

7.3.1 示例 & 實現(xiàn)代碼走讀

  • sample

            ObservableOnSubscribe<String> observableOnSubscribe1 = new ObservableOnSubscribe<String>() {
                @Override
                public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                    emitter.onNext("Hello");
                    emitter.onNext("CodingDog1024");
                }
            };
    
            // (4)返回值為一個ObservableCreate實例
            Observable<String> observable1 = Observable.create(observableOnSubscribe1);
    
            Predicate<String> predicate1 = new Predicate<String>() {
                @Override
                public boolean test(String s) throws Exception {
                    return s.startsWith("C");
                }
            };
    
            // observable2具體類型為ObservableFilter,內(nèi)部只有predicate1
            Observable<String> observable2 = observable1.filter(predicate1);
    
            Observer observer1 = new Observer<String>() {
                @Override
                public void onSubscribe(Disposable d) {
                    System.out.println("onSubscribe");
                }
    
                @Override
                public void onNext(String s) {
                    System.out.println("onNext " + s);
                }
    
                @Override
                public void onError(Throwable e) {
                    System.out.println("Throwable " + e.getMessage());
                }
    
                @Override
                public void onComplete() {
                    System.out.println("onComplete");
                }
            };
    
            observable2.subscribe(observer1);
    
  • 輸出

        onSubscribe
        onNext CodingDog1024
    
  • filter源碼

        // 代碼出處:Observable
        public final Observable<T> filter(Predicate<? super T> predicate) {
            ObjectHelper.requireNonNull(predicate, "predicate is null");
            // observable1上調(diào)用的filter方法叹括,因此算墨,this就是observable1,predicate就是predicate1
            return RxJavaPlugins.onAssembly(new ObservableFilter<T>(this, predicate));
        }
    
        // 代碼出處:ObservableFilter
        public final class ObservableFilter<T> extends AbstractObservableWithUpstream<T, T> {
            final Predicate<? super T> predicate;
    
            // source就是observable1汁雷,predicate就是predicate1
            public ObservableFilter(ObservableSource<T> source, Predicate<? super T> predicate) {
                super(source);
                this.predicate = predicate;
            }
        
            @Override
            public void subscribeActual(Observer<? super T> observer) {
                // source就是observable1净嘀,predicate就是predicate1,observer就是observer1
                source.subscribe(new FilterObserver<T>(observer, predicate));
            }
            ... ...
        }
    
        // 代碼出處:ObservableFilter
        static final class FilterObserver<T> extends BasicFuseableObserver<T, T> {
            final Predicate<? super T> filter;
    
            // actual就是observer1侠讯,filter就是predicate1
            FilterObserver(Observer<? super T> actual, Predicate<? super T> filter) {
                // super方法挖藏,將actual賦給downstream,也就是說:
                // downstream就是observer1
                super(actual);
                this.filter = filter;
            }
    
            @Override
            public void onNext(T t) {
                    ... ...
                    boolean b;
                    ... ...
                        // 調(diào)用filter.test方法厢漩,入?yún)樯嫌蔚妮斎?t 
                        b = filter.test(t);
                    ... ...
                    // filter.test結(jié)果為true膜眠,才調(diào)用下游的onNext方法,入?yún)?t
                    if (b) {
                        downstream.onNext(t);
                    }
                    ... ...
            }
            ... ...
        }
    
  • 分析


    Observable create filter subscribe
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末溜嗜,一起剝皮案震驚了整個濱河市宵膨,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌炸宵,老刑警劉巖辟躏,帶你破解...
    沈念sama閱讀 222,865評論 6 518
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異土全,居然都是意外死亡捎琐,警方通過查閱死者的電腦和手機会涎,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 95,296評論 3 399
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來瑞凑,“玉大人末秃,你說我怎么就攤上這事〔η” “怎么了蛔溃?”我有些...
    開封第一講書人閱讀 169,631評論 0 364
  • 文/不壞的土叔 我叫張陵,是天一觀的道長篱蝇。 經(jīng)常有香客問我贺待,道長,這世上最難降的妖魔是什么零截? 我笑而不...
    開封第一講書人閱讀 60,199評論 1 300
  • 正文 為了忘掉前任麸塞,我火速辦了婚禮,結(jié)果婚禮上涧衙,老公的妹妹穿的比我還像新娘哪工。我一直安慰自己,他們只是感情好弧哎,可當(dāng)我...
    茶點故事閱讀 69,196評論 6 398
  • 文/花漫 我一把揭開白布雁比。 她就那樣靜靜地躺著,像睡著了一般撤嫩。 火紅的嫁衣襯著肌膚如雪偎捎。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 52,793評論 1 314
  • 那天序攘,我揣著相機與錄音茴她,去河邊找鬼。 笑死程奠,一個胖子當(dāng)著我的面吹牛丈牢,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播瞄沙,決...
    沈念sama閱讀 41,221評論 3 423
  • 文/蒼蘭香墨 我猛地睜開眼己沛,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了距境?” 一聲冷哼從身側(cè)響起泛粹,我...
    開封第一講書人閱讀 40,174評論 0 277
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎肮疗,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體扒接,經(jīng)...
    沈念sama閱讀 46,699評論 1 320
  • 正文 獨居荒郊野嶺守林人離奇死亡伪货,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,770評論 3 343
  • 正文 我和宋清朗相戀三年们衙,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片碱呼。...
    茶點故事閱讀 40,918評論 1 353
  • 序言:一個原本活蹦亂跳的男人離奇死亡蒙挑,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出愚臀,到底是詐尸還是另有隱情忆蚀,我是刑警寧澤,帶...
    沈念sama閱讀 36,573評論 5 351
  • 正文 年R本政府宣布姑裂,位于F島的核電站馋袜,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏舶斧。R本人自食惡果不足惜欣鳖,卻給世界環(huán)境...
    茶點故事閱讀 42,255評論 3 336
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望茴厉。 院中可真熱鬧泽台,春花似錦、人聲如沸矾缓。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,749評論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽嗜闻。三九已至蜕依,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間泞辐,已是汗流浹背笔横。 一陣腳步聲響...
    開封第一講書人閱讀 33,862評論 1 274
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留咐吼,地道東北人吹缔。 一個月前我還...
    沈念sama閱讀 49,364評論 3 379
  • 正文 我出身青樓,卻偏偏與公主長得像锯茄,于是被迫代替她去往敵國和親厢塘。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,926評論 2 361