動(dòng)腦學(xué)院Rxjava預(yù)習(xí)資料 RxJava2 響應(yīng)式編程介紹

RxJava2 響應(yīng)式編程介紹

響應(yīng)式編程

基本概念

  • 同步/異步: 關(guān)注的是消息通信機(jī)制颂鸿,同步是指 發(fā)出一個(gè)調(diào)用净捅,在沒有得到結(jié)果之前,該調(diào)用就不返回拣技,但是一旦調(diào)用返回,就得到返回值了; 而異步是指 調(diào)用發(fā)出后藕届,調(diào)用直接返回本刽,但不會(huì)立刻得到調(diào)用的結(jié)果鲸湃。而是在調(diào)用發(fā)出后,被調(diào)用者通過狀態(tài)子寓、通知來通知調(diào)用者暗挑,或通過回調(diào)函數(shù)處理這個(gè)調(diào)用; 異步強(qiáng)調(diào)被動(dòng)通知别瞭。

  • 阻塞/非阻塞:關(guān)注的是程序在等待調(diào)用結(jié)果(消息窿祥,返回值)時(shí)的狀態(tài);阻塞調(diào)用是指調(diào)用結(jié)果返回之前蝙寨,當(dāng)前線程會(huì)被掛起晒衩。調(diào)用線程只有在得到結(jié)果之后才會(huì)返回。 非阻塞調(diào)用指在不能立刻得到結(jié)果之前墙歪,該調(diào)用不會(huì)阻塞當(dāng)前線程听系。如java8 stream為阻塞式,F(xiàn)uture為非阻塞式的虹菲; 非阻塞強(qiáng)調(diào)狀態(tài)主動(dòng)輪詢靠胜。

  • 并發(fā)(Concurrency)與并行(parallelism): 并發(fā)是一個(gè)更通用的概念,兩個(gè)并發(fā)線程被調(diào)度在一個(gè)單核處理上運(yùn)行毕源,這就是并發(fā)性浪漠,而非并行. 并行性更可能出現(xiàn)在多核,多CPU或分布式系統(tǒng)上霎褐。編寫代碼時(shí)一般不區(qū)分兩者址愿。

  • 函數(shù)式編程Functional Programming):函數(shù)式編程將程序描述為表達(dá)式和變換,以數(shù)學(xué)方程的形式建立模型冻璃,并盡量避免可變的狀態(tài)响谓。每個(gè)邏輯分類(filter,map损合,reduce等)都由不同函數(shù)所表示,這些實(shí)現(xiàn)底層次的變換娘纷,而用戶定義的高階函數(shù)作為參數(shù)來實(shí)現(xiàn)真正的業(yè)務(wù)嫁审。

  • 函數(shù)響應(yīng)式編程(Functional Reactive Programming)):響應(yīng)式編程是建立在觀者者模式上的一種編程范式,對(duì)異步數(shù)據(jù)流進(jìn)行編程赖晶,同時(shí)該事件流是按時(shí)間排序的序列(不同于java8中的stream)律适;雖然響應(yīng)式編程框架不一定要求是函數(shù)式的,但是RxJava等響應(yīng)式編程框架都是結(jié)合了函數(shù)式編程的嬉探。

  • 其它名詞: https://github.com/reactivemanifesto/reactivemanifesto/blob/master/glossary.zh-cn.md

響應(yīng)式宣言

響應(yīng)式宣言,Reactive Manifesto: 來自不同領(lǐng)域的組織正在不約而同地發(fā)現(xiàn)一些看起來如出一轍的軟件構(gòu)建模式擦耀。它們的系統(tǒng)更加穩(wěn)健,更加有可回復(fù)性涩堤,更加靈活眷蜓,并且以更好的定位來滿足現(xiàn)代的需求。

響應(yīng)式宣言針對(duì)一個(gè)系統(tǒng)而言胎围,并不等同于 響應(yīng)式編程規(guī)范吁系,響應(yīng)式系統(tǒng)應(yīng)該滿足如下特點(diǎn):

  • 反應(yīng)靈敏的[Responsive]:只要有可能,系統(tǒng)就會(huì)及時(shí)響應(yīng)白魂。
  • 有回復(fù)性的[Resilient]:系統(tǒng)在面臨故障時(shí)也能保持及時(shí)響應(yīng)汽纤。
  • 可伸縮的[Elastic]:系統(tǒng)在變化的工作負(fù)載下保持及時(shí)響應(yīng)。
  • 消息驅(qū)動(dòng)的[Message Driven]:響應(yīng)式系統(tǒng)依賴異步消息傳遞來建立組件之間的界限福荸,這一界限確保了松耦合蕴坪,隔離,位置透明性等特性的實(shí)現(xiàn)敬锐,還提供了以消息的形式把故障委派出去的手段背传。
QQ截圖20180525165803.png

響應(yīng)式擴(kuò)展

  • 響應(yīng)式擴(kuò)展(Reactive Extensions, 簡寫為ReactiveX,Rx),最初是LINQ的一個(gè)擴(kuò)展台夺,由微軟的架構(gòu)師Erik Meijer領(lǐng)導(dǎo)的團(tuán)隊(duì)開發(fā)径玖,在2012年11月開源[1]

  • ReactiveX = Observer Pattern + Iterator Pattern + Functional Programming颤介。Rx 讓開發(fā)者可以利用可觀察序列和LINQ風(fēng)格查詢操作符來編寫異步和基于事件的程序;

  • ReactiveX是一個(gè)編程模型梳星,目標(biāo)是提供一致的編程接口,幫助開發(fā)者更方便的處理異步數(shù)據(jù)流滚朵,ReactiveX的思想是跨平臺(tái)的冤灾,學(xué)習(xí)其他語言的基本語法之后,可以做到 “l(fā)earn once, write everywhere”;

  • Rx近幾年越來越流行了辕近,現(xiàn)在已經(jīng)支持幾乎全部的流行編程語言了韵吨,Rx的大部分語言庫由ReactiveX這個(gè)組織負(fù)責(zé)維護(hù),比較流行的有RxJava/RxJS/Rx.NET亏推,社區(qū)網(wǎng)站是 reactivex.io学赛。

  • RxJava 是 ReactiveX 規(guī)范的JVM平臺(tái)實(shí)現(xiàn);

響應(yīng)式流規(guī)范

  • Reactive Streams 規(guī)范提供一個(gè)非堵塞的異步流處理的背壓(backpressure)標(biāo)準(zhǔn);Reactive Streams的目標(biāo)是增加抽象層吞杭,而不是進(jìn)行底層的流處理盏浇,規(guī)范將這些問題留給了庫實(shí)現(xiàn)來解決。

  • 對(duì)于JVM芽狗,目前已經(jīng)有多個(gè)庫實(shí)現(xiàn)該標(biāo)準(zhǔn)绢掰,RxJava2, akka-streams(Playing 有使用),Reactor(Spring Reactive有使用) 等;

  • 統(tǒng)一標(biāo)準(zhǔn)的好處就是 各個(gè)實(shí)現(xiàn)產(chǎn)生的數(shù)據(jù)可以方便的轉(zhuǎn)換和消費(fèi)童擎;

示例

    Path filePath = Paths.get("build.gradle");
    // RxJava2 to Reactor
    Flowable<String> flowable = Flowable
            .fromCallable(() -> Files.readAllLines(filePath))
            .flatMap(x -> Flowable.fromIterable(x));
    Flux.from(flowable).count().subscribe(System.out::println);

    // Reactor to RxJava2
    try
    {
        Flux<String> flux = Flux.fromIterable(Files.readAllLines(filePath));
        Flowable.fromPublisher(flux).count()
                .subscribe(System.out::println);
    }
    catch (IOException e)
    {
        e.printStackTrace();
    }
  • Reactive Streams JVM接口由以下四個(gè)interface 組成:

    • Publisher : 消息發(fā)布者

    • Subscriber : 消息訂閱者

    • Subscription : 一個(gè)訂閱

    • Processor : Publisher + Subscriber 的結(jié)合體

  • Reactive Streams 規(guī)范主要目標(biāo)

    • 通過異步邊界(Asynchronous Boundary)來解耦系統(tǒng)組件滴劲。 解偶的先決條件,分離事件/數(shù)據(jù)流的發(fā)送方和接收方的資源使用;
    • 為背壓( back pressure ) 處理定義一種模型顾复。流處理的理想范式是將數(shù)據(jù)從發(fā)布者推送到訂閱者班挖,這樣發(fā)布者就可以快速發(fā)布數(shù)據(jù),同時(shí)通過壓力處理來確保速度更快的發(fā)布者不會(huì)對(duì)速度較慢的訂閱者造成過載芯砸。背壓處理通過使用流控制來確保操作的穩(wěn)定性并能實(shí)現(xiàn)優(yōu)雅降級(jí)萧芙,從而提供彈性能力。
  • 該規(guī)范將包含在 JDK 9 的java.util.concurrent.Flow 類中假丧,包含四個(gè)接口類双揪。

RxJava 基礎(chǔ)

RxJava現(xiàn)狀

  • RxJava 項(xiàng)目地址 https://github.com/ReactiveX/RxJava
  • RxJava 1.x 先于 Reactive Streams 規(guī)范出現(xiàn),部分接口支持Reactive Streams 規(guī)范;
  • RxJava 2.0 于 2016.10.29 正式發(fā)布包帚,已經(jīng)按照Reactive-Streams 規(guī)范完全的重寫, 基于Java8+;
  • RxJava 2.0已經(jīng)獨(dú)立于RxJava 1.x而存在渔期,即 RxJava2(io.reactivex.) 使用與RxJava1(rx.) 不同的包名。
  • RxJava 目前在Android 開發(fā)上應(yīng)用較多渴邦;

為什么選擇 RxJava2

與其它編程模式/庫相比

  • Rx擴(kuò)展了觀察者模式用于支持?jǐn)?shù)據(jù)和事件序列栽烂,通過一些的操作符號(hào)箭养,統(tǒng)一風(fēng)格,用戶無需關(guān)注底層的實(shí)現(xiàn):如線程、同步购城、線程安全、并發(fā)數(shù)據(jù)結(jié)構(gòu)和非阻塞IO兜辞;
單個(gè)數(shù)據(jù) 多個(gè)數(shù)據(jù)
同步 T getData() Iterable<T> getData()
異步 Future<T> getData() Observable<T> getData()
  • 對(duì)于單條數(shù)據(jù)可以選擇Future 模式蜻懦,但是多條異步數(shù)據(jù)組合,F(xiàn)uture就相對(duì)不方便纤垂;

  • 對(duì)于同步的多條數(shù)據(jù)矾策,Observable/Flowable 和 Java8 Stream 都要比Iterable更加方便。

  • 對(duì)比 Java 8 Stream,都是屬于函數(shù)式編程(Monad)峭沦,Stream主要是對(duì)數(shù)據(jù)集的阻塞處理贾虽, 而Rx 是非阻塞的,RxJava 提供更加豐富的操作符吼鱼。

  • 對(duì)比 Java8 Stream, Rx 中的數(shù)據(jù)流/事件流蓬豁,擴(kuò)展到了時(shí)間維度绰咽,在時(shí)間維度上處理發(fā)射的數(shù)據(jù),所以很多操作符都是與時(shí)間有關(guān)的地粪。

  • 對(duì)比 Reactor-core, 都遵循響應(yīng)式流規(guī)范取募,Reactor(Flux)和RxJava2(Flowable)可以相互轉(zhuǎn)換, Reactor更多的依賴java8的函數(shù)式接口,RxJava2 所有函數(shù)式接口都提供異常拋出蟆技,在寫代碼時(shí)更加便利玩敏。

  • RxJava 函數(shù)式風(fēng)格,簡化代碼(Rx的操作符通通常可以將復(fù)雜的難題簡化為很少的幾行代碼), 異步錯(cuò)誤處理,輕松使用并發(fā);

RxJava 1 vs RxJava 2

  • RxJava 2x 不再支持 null 值质礼,如果傳入一個(gè)null會(huì)拋出 NullPointerException
    Observable.just(null);
    Single.just(null);
    Flowable.just(null);
    Maybe.just(null);
    Observable.fromCallable(() -> null)
            .subscribe(System.out::println, Throwable::printStackTrace);
    Observable.just(1).map(v -> null)
            .subscribe(System.out::println, Throwable::printStackTrace);
  • RxJava2 所有的函數(shù)接口(Function/Action/Consumer)均設(shè)計(jì)為可拋出Exception旺聚,解決編譯異常需要轉(zhuǎn)換問題。

  • RxJava1 中Observable不能很好支持背壓眶蕉,在RxJava2 中將Oberservable實(shí)現(xiàn)成不支持背壓砰粹,而新增Flowable 來支持背壓。

  • 詳細(xì)參考請(qǐng)參考[5].

RxJava2中的響應(yīng)式類

RxJava2 主要類關(guān)系圖

如下圖所示妻坝,為RxJava2中的主要類關(guān)系圖伸眶,可清晰知道各響應(yīng)式類的聯(lián)系和區(qū)別。后面無特別說明均以Flowable說明刽宪。
Publisher-Subscriber-class-relation.png

Flowable & Observable

  • Observable: 不支持背壓厘贼;

  • Flowable : Observable新的實(shí)現(xiàn),支持背壓圣拄,同時(shí)實(shí)現(xiàn)Reactive Streams 的 Publisher 接口嘴秸。

  • 什么時(shí)候用 Observable:

    • 一般處理最大不超過1000條數(shù)據(jù),并且?guī)缀醪粫?huì)出現(xiàn)內(nèi)存溢出庇谆;
    • 如果式GUI 鼠標(biāo)事件岳掐,頻率不超過1000 Hz,基本上不會(huì)背壓(可以結(jié)合 sampling/debouncing 操作);
    • 如果處理的式同步流而你的Java平臺(tái)又不支持Java Stream(如果有異常處理饭耳,Observable 比Stream也更適合);
  • 什么時(shí)候用 Flowable:

    • 處理以某種方式產(chǎn)生超過10K的元素串述;
    • 文件讀取與分析,例如 讀取指定行數(shù)的請(qǐng)求寞肖;
    • 通過JDBC 讀取數(shù)據(jù)庫記錄纲酗, 也是一個(gè)阻塞的和基于拉取模式,并且由ResultSet.next() 控制新蟆;
    • 網(wǎng)絡(luò)IO流;
    • 有很多的阻塞和/或 基于拉取的數(shù)據(jù)源觅赊,但是又想得到一個(gè)響應(yīng)式非阻塞接口的。

Single & Completable & Maybe

  • Single: 可以發(fā)射一個(gè)單獨(dú)onSuccess 或 onError消息琼稻。它現(xiàn)在按照Reactive-Streams規(guī)范被重新設(shè)計(jì),并遵循協(xié)議 onSubscribe (onSuccess | onError)? .SingleObserver改成了如下的接口;
interface SingleObserver<T> {
    void onSubscribe(Disposable d);
    void onSuccess(T value);
    void onError(Throwable error);
}
  • Completable: 可以發(fā)送一個(gè)單獨(dú)的成功或異常的信號(hào)吮螺,按照Reactive-Streams規(guī)范被重新設(shè)計(jì),并遵循協(xié)議onSubscribe (onComplete | onError)?
    Completable.create(new CompletableOnSubscribe()
    {
        @Override
        public void subscribe(CompletableEmitter e) throws Exception
        {
            Path filePath = Paths.get("build.gradle");
            Files.readAllLines(filePath);
            e.onComplete();
        }
    }).subscribe(() -> System.out.println("OK!"),
            Throwable::printStackTrace);
  • Maybe:從概念上來說,它是Single 和 Completable 的結(jié)合體。它可以發(fā)射0個(gè)或1個(gè)通知或錯(cuò)誤的信號(hào), 遵循協(xié)議 onSubscribe (onSuccess | onError | onComplete)?鸠补。
Maybe.just(1)
        .map(v -> v + 1)
        .filter(v -> v == 1)
        .defaultIfEmpty(2)
        .test()
        .assertResult(21);
//        java.lang.AssertionError: Values at position 0 differ; Expected: 21 (class: Integer), Actual: 2 (class: Integer) (latch = 0, values = 1, errors = 0, completions = 1)
//
//        at io.reactivex.observers.BaseTestConsumer.fail(BaseTestConsumer.java:133)
//        ....

RxJava2 的主要操作

我們已經(jīng)知道 RxJava主要特性為為一個(gè)擴(kuò)展的觀察者模式萝风、流式操作和異步編程,支持ReactiveX 規(guī)范給出的一些操作紫岩, 同時(shí)RxJava2 符合響應(yīng)式流規(guī)范闹丐,接下來以Flowable為例,按照功能分類講解RxJava2中的重要操作[9];

創(chuàng)建一個(gè)Flowable

  • fromArray & fromIterable & just,直接從數(shù)組或迭代器中產(chǎn)生被因;
    List<String> list = Arrays.asList(
            "blue", "red", "green", "yellow", "orange", "cyan", "purple"
    );
    Flowable.fromIterable(list).skip(2).subscribe(System.out::println);
    Flowable.fromArray(list.toArray()).subscribe(System.out::println);
    Flowable.just("blue").subscribe(System.out::println);
  • fromFuture & fromCallable:

    fromFuture, 事件從非主線程中產(chǎn)生; fromCallable, 事件從主線程中產(chǎn)生衫仑, 在需要消費(fèi)時(shí)生產(chǎn)梨与;

    ExecutorService executor = Executors.newFixedThreadPool(2);
    System.out.println("MAIN: " + Thread.currentThread().getId());
    Callable<String> callable = () -> {
        System.out.println("callable [" + Thread.currentThread().getId() + "]: ");
        Path filePath = Paths.get("build.gradle");
        return Files.readAllLines(filePath).stream().flatMap(s -> Arrays.stream(s.split
                (""))).count() + "";
    };

    Future<String> future = executor.submit(callable);

    Consumer<String> onNext = v -> System.out
            .println("consumer[" + Thread.currentThread().getId() + "]:" + v);

    Flowable.fromCallable(callable).subscribe(onNext);
    Flowable.fromFuture(future).subscribe(onNext);
    System.out.println("END");
  • fromPublisher ,從標(biāo)準(zhǔn)(Reactive Streams)的發(fā)布者中產(chǎn)生文狱;

  • 自定義創(chuàng)建(generate & create)

    下面以斐波那契數(shù)列產(chǎn)生為例說明 generate & create的使用粥鞋, generate為RxJava2新增的創(chuàng)建方式。

    class Fib
    {
        long a;
        long b;

        public Fib(long a, long b)
        {
            this.a = a;
            this.b = b;
        }

        public long fib()
        {
            return a + b;
        }
    }

    //斐波那契數(shù)列
    Flowable.create(new FlowableOnSubscribe<Fib>()
    {
        @Override
        public void subscribe(FlowableEmitter<Fib> e) throws Exception
        {
            Fib start = new Fib(1L, 1L);

            while (!e.isCancelled()) {
                e.onNext(start);
                start = new Fib(start.b, start.fib());
            }
            e.onComplete();
        }
    }, BackpressureStrategy.BUFFER).map(x -> x.fib()).take(10).subscribe(System.out::println);

    Flowable.generate(() -> new Fib(1L, 1L), (x, y) -> {
        Fib fib = new Fib(x.b, x.fib());
        y.onNext(fib);
        return fib;
    }).ofType(Fib.class).map(x -> x.fib()).take(10).subscribe(System.out::println);
  • amb & concat & merge, 由多個(gè)Flowable產(chǎn)生結(jié)合;

    • amb: 給定兩個(gè)或多個(gè)Flowable瞄崇,只發(fā)射最先發(fā)射數(shù)據(jù)的Flowable呻粹,如下面示例中的f1被發(fā)射;

    • concat: 給定多個(gè)Flowable苏研, 按照Flowable數(shù)組順序,依次發(fā)射數(shù)據(jù)等浊,不會(huì)交錯(cuò),下面示例中f1,f2中數(shù)據(jù)依次發(fā)射;

    • merge: 給定多個(gè)Flowable摹蘑, 按照Flowable數(shù)組中數(shù)據(jù)發(fā)射的順序組合成新的Flowable筹燕,各Flowable數(shù)據(jù)可能會(huì)交錯(cuò)(等價(jià)于轉(zhuǎn)換操作中的flatMap);

    • switchOnNext:給定能發(fā)射多個(gè)Flowable的Flowable,順序發(fā)射各子Flowable,最新發(fā)射的子Flowable覆蓋當(dāng)前子Flowable中還未發(fā)射的元素(由switchMap實(shí)現(xiàn))衅鹿。

      zip

    Flowable<String> f1 = Flowable.intervalRange(1, 10, 1, 1, TimeUnit.SECONDS).map(index -> "f1-" + index);
    Flowable<String> f2 = Flowable.intervalRange(1, 3, 2, 2, TimeUnit.SECONDS).map(index -> "f2-" + index);

    Flowable.ambArray(f1, f2).map(x -> "amb: " + x).subscribe(System.out::println);
    System.out.println("----------concat-----------");
    Flowable.concat(f1, f2).map(x -> "concat: " + x).subscribe(System.out::println);

    System.out.println("----------merge-----------");
    Flowable.merge(f1, f2).map(x -> "merge: " + x).subscribe(System.out::println);

    Flowable<String>[] flowables = new Flowable[]{f1, f2};
    Flowable.switchOnNext(Flowable.intervalRange(0, 2, 0, 3, TimeUnit.SECONDS).map(i -> flowables[i.intValue()]))
            .map(x -> "switchOnNext-" + x).subscribe(System.out::println);
    Flowable.intervalRange(0, 2, 0, 3, TimeUnit.SECONDS).map(i -> flowables[i.intValue()])
            .switchMap((io.reactivex.functions.Function) Functions.identity())
            .map(x -> "switchMap-" + x).subscribe(System.out::println);
  • zip & combineLatest, 多Flowable中元素結(jié)合變換

    • zip :每個(gè)Flowable中的元素都按順序結(jié)合變換撒踪,直到元素最少Flowable的已經(jīng)發(fā)射完畢;

      zip

    • combineLatest: 每個(gè)Flowable中的發(fā)射的元素都與其他Flowable最近發(fā)射的元素結(jié)合變換大渤,知道所有的Flowable的元素發(fā)射完畢制妄;

      zip

轉(zhuǎn)換、過濾與聚合操作

在Java8中Stream也有包含這些功能的操作泵三,由于多了時(shí)間這個(gè)維度耕捞,在 RxJava 中操作相對(duì)更加豐富。 這里主要介紹一些重點(diǎn)操作切黔。

  • buffer & groupBy & window

    buffer 和 window 都可以按時(shí)間或者元素?cái)?shù)量窗口砸脊,buffer是直接轉(zhuǎn)換成元素集,window是將元素集轉(zhuǎn)換成另一個(gè)Flowable纬霞, groupBy,按照key 來分組凌埂,需要元素發(fā)射完成才能消費(fèi),如果只是對(duì)數(shù)據(jù)處理使用Java8 groupBy更方便诗芜;

    Flowable<String> f1 = Flowable.intervalRange(1, 10, 1, 1, TimeUnit.SECONDS).delay((t) ->
            Flowable.timer(t % 3 + new Random().nextLong() % 3, TimeUnit.SECONDS))
            .map(index -> index % 3 + "-f1-" + index);
    f1.buffer(5, TimeUnit.SECONDS).map(x -> "buffer-" + x).subscribe(System.out::println);

    f1.window(5, TimeUnit.SECONDS).map(x -> x.toList())
            .subscribe(x -> x.subscribe(System.out::println));

    Disposable b = f1.groupBy((x) -> x.split("-", 2)[0])
            .subscribe(x -> x.toList().subscribe(System.out::println));
    Map<String, List<String>> map = f1.toList().blockingGet().stream()
                .collect(Collectors.groupingBy((x) -> x.split
                        ("-", 2)[0]));
    System.out.println(map);

    while (!b.isDisposed()) {
    }
  • debounce & throttleFirst & sample 按照時(shí)間區(qū)間采集數(shù)據(jù)

debounce 防抖動(dòng)瞳抓,元素發(fā)射后在設(shè)定的超時(shí)時(shí)間內(nèi)沒有其它元素發(fā)射埃疫,則將此元素用于后續(xù)處理, 在前端APP應(yīng)用較多孩哑。 如果是空間上的防抖動(dòng)可以利用distinctUntilChanged操作符栓霜。
debounce

throttle 限流操作,對(duì)于 throttleFirst是 取發(fā)射后元素横蜒,經(jīng)過間隔時(shí)間后的第一個(gè)元素進(jìn)行發(fā)射胳蛮。
throttleFirst

sample 數(shù)據(jù)采樣, 對(duì)于源數(shù)據(jù),發(fā)射間隔時(shí)間內(nèi)的最后出現(xiàn)的元素丛晌。
sample
  • take & skip & first & emlmentAt,精確獲取數(shù)據(jù)(集)

    take, 類似java8 limit 操作仅炊,但是這里支持更多的操作(take/takeLast/takeUntil/takeWhen),同時(shí)支持在時(shí)間區(qū)間上獲取數(shù)據(jù)集澎蛛; skip, 類似java8 skip 操作,但是這里的可以擴(kuò)展到時(shí)間區(qū)間上 first/firstElement/last/lastElement, 由 Flowable -> Single/Maybe.

        Flowable<String> f1 = Flowable
                .fromArray("blue", "red", "green", "yellow11", "orange", "cyan", "purple"
                );

        f1.elementAt(4, "hello").subscribe(System.out::println);
        //out: orange
        f1.takeUntil(x -> x.length() > 5).map(x -> "takeUntil-" + x).toList()
                .subscribe(System.out::println);
        //out: [takeUntil-blue, takeUntil-red, takeUntil-green, takeUntil-yellow11]
        f1.takeWhile(x -> x.length() <= 5).map(x -> "takeWhile-" + x).toList()
                .subscribe(System.out::println);
        //out: [takeWhile-blue, takeWhile-red, takeWhile-green]

        f1.skipWhile(x -> x.length() <= 5).map(x -> "skipWhile-" + x).toList()
                .subscribe(System.out::println);
        //[skipWhile-yellow11, skipWhile-orange, skipWhile-cyan, skipWhile-purple]

        Disposable d = f1.delay(v -> Flowable.timer(v.length(), TimeUnit.SECONDS))
                .skipUntil(Flowable.timer(5, TimeUnit.SECONDS)).map(x -> "skipUntil-" + x)
                .subscribe(System.out::println);
//        skipUntil-green
//        skipUntil-orange
//        skipUntil-purple
//        skipUntil-yellow11
        while (!d.isDisposed()) {
        }

異步與并發(fā)(Asynchronized & Concurrency)

RxJava 通過一些操作統(tǒng)一了 同步和異步抚垄,阻塞與非阻塞,并行與并發(fā)編程谋逻。

observeOn & subscribeOn & Scheduler

  • subscribeOn 和 observeOn 都是用來切換線程用的,都需要參數(shù) Scheduler.

  • Scheduler ,調(diào)度器, 是RxJava 對(duì)線程控制器 的 一個(gè)抽象,RxJava 已經(jīng)內(nèi)置了幾個(gè) Scheduler 呆馁,它們已經(jīng)適合大多數(shù)的使用場(chǎng)景:

    • trampoline, 直接在當(dāng)前線程運(yùn)行(繼續(xù)上一個(gè)操作中,最后處理完成的數(shù)據(jù)源所處線程毁兆,并不一定是主線程)浙滤,相當(dāng)于不指定線程;

    • computation, 這個(gè) Scheduler 使用的固定的線程池(FixedSchedulerPool),大小為 CPU 核數(shù), 適用于CPU 密集型計(jì)算气堕。

    • io,I/O 操作(讀寫文件瓷叫、讀寫數(shù)據(jù)庫、網(wǎng)絡(luò)信息交互等)所使用的 Scheduler送巡。行為模式和 newThread() 差不多摹菠,區(qū)別在于 io() 的內(nèi)部實(shí)現(xiàn)是是用一個(gè)無數(shù)量上限的線程池,可以重用空閑的線程骗爆,因此多數(shù)情況下 io() 比 newThread() 更有效率;

    • newThread, 總是啟用新線程次氨,并在新線程中執(zhí)行操作;

    • single摘投, 使用定長為1 的線程池(newScheduledThreadPool(1))煮寡,重復(fù)利用這個(gè)線程;

    • Schedulers.from, 將java.util.concurrent.Executor 轉(zhuǎn)換成一個(gè)調(diào)度器實(shí)例犀呼。

      java.util.function.Consumer<Object> pc = x -> System.out
             .println("Thread[" + Thread.currentThread().getName() + " ," + Thread
                     .currentThread().getId() + "] :" + x);
      Executor executor = Executors.newFixedThreadPool(2);
      Schedulers.from(executor).scheduleDirect(() -> pc.accept("executor one"));
      Schedulers.from(executor).scheduleDirect(() -> pc.accept("executor two"));
      Schedulers.trampoline().scheduleDirect(() -> pc.accept("trampoline"), 1, TimeUnit.SECONDS);
      Schedulers.single().scheduleDirect(() -> pc.accept("single one DONE"));
      Schedulers.single().scheduleDirect(() -> pc.accept("single two DONE"));
      Schedulers.computation()
             .scheduleDirect(() -> pc.accept("computation one DONE"), 1, TimeUnit.SECONDS);
      Schedulers.computation()
             .scheduleDirect(() -> pc.accept("computation two DONE"), 1, TimeUnit.SECONDS);
      Schedulers.io().scheduleDirect(() -> pc.accept("io one DONE"));
      Schedulers.io().scheduleDirect(() -> pc.accept("io two DONE"), 1, TimeUnit.SECONDS);
      Schedulers.io().scheduleDirect(() -> pc.accept("io tree DONE"), 1, TimeUnit.SECONDS);
      Schedulers.newThread().scheduleDirect(() -> pc.accept("newThread tree DONE"));
      
  • subscribeOn 將Flowable 的數(shù)據(jù)發(fā)射 切換到 Scheduler 所定義的線程幸撕, 只有第一個(gè) subscribeOn 操作有效 ;

  • observeOn 指定 observeOn 后續(xù)操作所在線程外臂,可以聯(lián)合多個(gè) observeOn 將切換多次 線程 坐儿;

    示例 Schedulers.newThread() 定義的線程發(fā)送數(shù)據(jù);

    Schedulers.computation() 定義的線程 執(zhí)行doOnNext;

    Schedulers.single() 執(zhí)行 subscribe

      Consumer<Object> threadConsumer = x -> System.out
              .println("Thread[" + Thread.currentThread().getName() + " ," + Thread
                      .currentThread().getId() + "] :" + x);
    
      Flowable<Path> f1 = Flowable.create((FlowableEmitter<Path> e) -> {
          Path dir = Paths.get("/home/clouder/berk/workspaces/cattle").toRealPath();
          try (DirectoryStream<Path> dirStream = Files.newDirectoryStream(dir)) {
              Iterator<Path> iter = dirStream.iterator();
              while (iter.hasNext() && !e.isCancelled()) {
                  e.onNext(iter.next());
              }
              e.onComplete();
          }
      }, BackpressureStrategy.BUFFER);
      f1.subscribeOn(Schedulers.newThread()).subscribeOn(Schedulers.io())
              .observeOn(Schedulers.computation()).take(5).doOnNext(consumer).observeOn(Schedulers
              .single()).subscribe(consumer);
    

多線程并發(fā)示例

上小節(jié)給出示例 發(fā)射元素都會(huì)經(jīng)過同樣的線程切換貌矿,元素間不會(huì)產(chǎn)生并行執(zhí)行的效果炭菌。 如果需要達(dá)到 類似 Java8 parallel 執(zhí)行效果」渎可以采用FlatMap 變換 自定義并發(fā)操作黑低,在返回的Flowable進(jìn)行線程操作,如下示例所示:

  • f1 中元素會(huì)在Schedulers.newThread()中發(fā)射酌毡;

  • 讀取文本內(nèi)容的操作(Files::readAllLines, Collection::size) 會(huì)在 Schedulers.io() 所指定的線程池執(zhí)行克握;

  • sorted 操作會(huì)在 Schedulers.computation() 所指定的線程池中執(zhí)行;

  • subscribe() 同樣會(huì)在 Schedulers.computation() 所指定的線程池中執(zhí)行枷踏;

    f1.filter(Files::isRegularFile).doOnNext(consumer).subscribeOn(Schedulers.newThread())
            .flatMap(y -> Flowable.just(y).subscribeOn(Schedulers.io())
                    .map(Files::readAllLines)).map(Collection::size)
            .observeOn(Schedulers.computation()).doOnNext(consumer)
            .sorted(Comparator.naturalOrder())
            .observeOn(Schedulers.trampoline()).subscribe(consumer);

阻塞與非阻塞示例

  • 從阻塞到非阻塞 我們可以通過 subscribeOn() 來達(dá)到玛荞;

     //f1 為 主線程發(fā)射數(shù)據(jù)的Flowable
     //會(huì)阻塞主線程知道消費(fèi)完成
     f1.subscribe(System.out::println);
    
     // d會(huì)理解返回.
     Disposable d = f1.subscribeOn(Schedulers.newThread())
             .subscribe(System.out::println);
     while (!d.isDisposed()) {
     }
    
    • 從非阻塞到阻塞,可以通過blocking* 相關(guān)操作來實(shí)現(xiàn)
    Flowable<Path> f2 = f1.subscribeOn(Schedulers.newThread());
    //f2 為非阻塞flowable
    // 可以通過 blockingSubscribe 變?yōu)樵谥骶€程上消費(fèi)
    f2.blockingSubscribe(System.out::println);
    // 也可以通過下面操作返回結(jié)果呕寝。
    List<Path> list = f2.toList().blockingGet();
    Iterable<Path> iterator = f2.blockingIterable();

錯(cuò)誤處理 (error handling)

  • RxJava

  • 無需顯示的catch 編譯異常,RxJava2 已經(jīng)支持所有函數(shù)接口拋出Exception婴梧;

  • 更方便在異步多線程環(huán)境下進(jìn)行錯(cuò)誤處理下梢;

    如下示例 會(huì)打印第一個(gè)異常;

    Flowable<Long> f1 = Flowable.interval(500, TimeUnit.MILLISECONDS).map(index -> {
        throw new IOException(index + "");
    }).map(index -> {
        throw new IllegalArgumentException(index + "");
    });
    Disposable d = f1.subscribe(System.out::println, Throwable::printStackTrace);
    while (!d.isDisposed()) {
    }
  • 異橙洌可以被轉(zhuǎn)換,源數(shù)據(jù)發(fā)射終止
    • 當(dāng)出現(xiàn)異常時(shí)孽江,可以通過 onErrorReturn* 轉(zhuǎn)換成一個(gè)正常值返回;
    • 當(dāng)出現(xiàn)異常時(shí)番电,通過 onErrorResumeNext 自定義一個(gè)Publisher返回岗屏,意味著可以轉(zhuǎn)換一個(gè)異常類型;
    Flowable<Long> f1 = Flowable.interval(500, TimeUnit.MILLISECONDS).map(index -> {
        throw new IOException(index + "");
    }).map(index -> {
        throw new IllegalArgumentException(index + "");
    });
    f1.onErrorReturnItem(-1L).take(5)
            .subscribe(System.out::println, Throwable::printStackTrace);
    // 打印 -1 
    Disposable d = f1.onErrorResumeNext(e -> {
        if (e instanceof IOException) {
            return Flowable.error(new UncheckedIOException((IOException) e));
        }
        return Flowable.error(e);
    }).subscribe(System.out::println, Throwable::printStackTrace);
    // 打印 UncheckedIOException 異常
    while (!d.isDisposed()) {
    }
  • Flowable,map拋出異常,但數(shù)據(jù)繼續(xù)發(fā)射

    暫沒有找到直接方法可以達(dá)到漱办,但可以采取如下兩種方法達(dá)到

    Function<Long, Long> exceptionMap = x -> {
        if (new Random().nextInt(5) > 2) {
            throw new IOException(x + "");
        }
        return x;
    };

    // 使用flatMap + onErrorReturnItem
    Flowable<Long> f1 = Flowable.interval(500, TimeUnit.MILLISECONDS);
    f1.flatMap(index -> Flowable.just(index).map(exceptionMap).onErrorReturnItem(-1L))
    .take(5).subscribe(System.out::println);

   //直接封裝lift 操作
   public class ErrorResumeOperator<D, U> implements FlowableOperator<D, U>
    {
        private final Function<U, D> function;
        private final D defaultValue;

        public ErrorResumeOperator(Function<U, D> function, D defaultValue)
        {
            this.function = function;
            this.defaultValue = defaultValue;
        }

        @Override
        public Subscriber<? super U> apply(Subscriber<? super D> observer) throws Exception
        {
            Subscriber<U> subscriber = new Subscriber<U>()
            {
                @Override
                public void onSubscribe(Subscription s)
                {
                    observer.onSubscribe(s);
                }

                @Override
                public void onNext(U onNext)
                {
                    try {
                        observer.onNext(function.apply(onNext));
                    }
                    catch (Exception e) {
                        observer.onNext(defaultValue);
                    }
                }

                @Override
                public void onError(Throwable t)
                {
                    observer.onError(t);
                }

                @Override
                public void onComplete()
                {
                    observer.onComplete();
                }
            };
            return subscriber;
        }
    }
    Disposable d = f1.lift(new ErrorResumeOperator<>(exceptionMap, -1L)).take(5)
            .subscribe(System.out::println);

    while (!d.isDisposed()) {
    }
  • 出錯(cuò)重試(retry)

    RxJava 提供了retry以及相關(guān)的多個(gè)操作这刷,提供出錯(cuò)后重新發(fā)射數(shù)據(jù)功能;


    Function<Long, Long> exceptionMap = x -> {
        if (new Random().nextInt(5) > 3) {
            throw new IOException(x + "");
        }
        if (new Random().nextInt(6) < 1) {
            throw new SQLException(x + "");
        }
        return x;
    };
    Flowable<Long> f1 = Flowable.interval(500, TimeUnit.MILLISECONDS);
// 僅為 IOException 異常時(shí)最多重試3次,其它異常立即打印異常
    Disposable d = f1.map(exceptionMap).retry(3, e -> e instanceof IOException)
            .subscribe(System.out::println, Throwable::printStackTrace);
    while (!d.isDisposed()) {
    }

冷熱數(shù)據(jù)流

ConnectableFlowable & publish & connect

  • ConnectableFlowable 可連接的Flowable娩井, 不管是否消費(fèi)暇屋,只有調(diào)用了connect, 數(shù)據(jù)就一直在發(fā)射洞辣,不受消費(fèi)影響 ('冷' 的Flowable 變成'熱'的)
  • publish 將 普通 Flowable咐刨,變成 ConnectableFlowable ;
publishConnect

;

    ConnectableFlowable<String> f1 = Flowable.generate(() -> new BufferedReader(new InputStreamReader(System.in))
            , (reader, e) -> {
                while (true) {
                    String line = reader.readLine();
                    if (line == null || line.equalsIgnoreCase("exit")) {
                        break;
                    }
                    e.onNext(line);
                }
                e.onComplete();
            }).ofType(String.class).subscribeOn(Schedulers.io()).publish();

    TimeUnit.SECONDS.sleep(5);
    f1.connect(System.out::println);
    TimeUnit.SECONDS.sleep(5);
    f1.observeOn(Schedulers.newThread()).map(x -> "s0- " + x).subscribe(System.out::println);
    TimeUnit.SECONDS.sleep(5);
    f1.map(x -> "s1- " + x).subscribe(System.out::println);
    TimeUnit.SECONDS.sleep(50);

replay

replay 將Flowable變成 ConnectableFlowable, 在connect之后扬霜,確保每次消費(fèi)都使用相同數(shù)據(jù)定鸟。

    java.util.function.Function<String, Consumer<Object>> m = s -> v -> System.out
            .println("[" + System.currentTimeMillis() / 100 + "] " + s + "-" + v);
    ConnectableFlowable<Long> f1 = Flowable.intervalRange(1, 100, 0, 1, TimeUnit.SECONDS)
            .onBackpressureBuffer().replay();
    m.apply("").accept("start");
    TimeUnit.SECONDS.sleep(5);
    f1.connect();
    TimeUnit.SECONDS.sleep(5);
    f1.subscribe(m.apply("o1"));

    TimeUnit.SECONDS.sleep(5);
    f1.subscribe(m.apply("o2"));
    TimeUnit.SECONDS.sleep(20);

cache

緩存功能,將Flowable進(jìn)行緩存

    java.util.function.Function<String, Consumer<Object>> m = s -> v -> System.out
            .println("[" + System.currentTimeMillis() / 100 + "] " + s + "-" + v);

    Flowable<Path> f1 = Flowable.create((FlowableEmitter<Path> e) -> {
        Path dir = Paths.get("/home/clouder/berk/workspaces/cattle").toRealPath();
        try (DirectoryStream<Path> dirStream = Files.newDirectoryStream(dir)) {
            Iterator<Path> iter = dirStream.iterator();
            while (iter.hasNext() && !e.isCancelled()) {
                Path path = iter.next();
                m.apply("-----create").accept(path);
                e.onNext(path);
            }
            e.onComplete();
        }
    }, BackpressureStrategy.BUFFER).cache();

    f1.count().subscribe(m.apply("count"));
    f1.filter(Files::isDirectory).subscribe(m.apply("filter"));

背壓(Backpressure)問題

問題描述: 在rxjava中會(huì)經(jīng)常遇到一種情況就是被觀察者發(fā)送消息太快以至于它的操作符或者訂閱者不能及時(shí)處理相關(guān)的消息著瓶。那么隨之而來的就是如何處理這些未處理的消息联予。

如下示例: f1 比 f2 元素發(fā)射速度快一倍。而zip是按照發(fā)射順序結(jié)合,所以出現(xiàn)f1的產(chǎn)生速度快于其消費(fèi)速度躯泰,因此會(huì)有背壓問題產(chǎn)生(當(dāng)發(fā)射到一定數(shù)量時(shí)會(huì)有異常拋出)谭羔。

    Consumer<Object> consumer = v -> System.out.println("[" + System.currentTimeMillis() / 100 + "] " + v);
    Flowable<Long> f1 = Flowable.interval(100, TimeUnit.MILLISECONDS);
    Flowable<Long> f2 = Flowable.interval(200, TimeUnit.MILLISECONDS);

    Flowable<Long> f3 = Flowable.zip(f1, f2, (x, y) -> x * 10000 + y);
    f3.subscribe(consumer);

對(duì)于出現(xiàn)的背壓問題: - Flowable默認(rèn)隊(duì)列大小為128,并且規(guī)范要求麦向,所有的操作符強(qiáng)制支持背壓瘟裸。 - 通過操作節(jié)流(Throttling)相關(guān)操作(sample 、throttleLast诵竭、throttleFirst话告、throttleWithTimeout、debounce等)來改變Flowable的發(fā)射數(shù)率卵慰;

  • 通過設(shè)置緩沖區(qū)和窗口(buffer,window)操作,來緩存過剩的數(shù)據(jù)沙郭,然后發(fā)送特定數(shù)據(jù)。

  • 設(shè)置背壓策略(onBackpressurebuffer & onBackpressureDrop & onBackpressureLatest)

RxJava 測(cè)試

RxJava2 支持test() 操作符裳朋,將Flowable轉(zhuǎn)變?yōu)?TestSubscriber,從而支持多種斷言操作病线。

    List<String> list = Arrays.asList(
            "orange", "blue", "red", "green", "yellow", "cyan", "purple");

    Flowable.fromIterable(list).subscribeOn(Schedulers.newThread()).sorted().test().assertValues(list.stream().sorted().toArray(String[]::new));
    Flowable.fromIterable(list).count().test().assertValue(Integer.valueOf(list.size()).longValue());
    List<String> out1 = Flowable.fromIterable(list).sorted().test().values();

Reference

  1. 響應(yīng)式宣言.https://github.com/reactivemanifesto/reactivemanifesto/blob/master/README.zh-cn.md

  2. RxJava 2.0 Released with Support for Reactive Streams Specification. https://www.infoq.com/news/2016/11/rxjava-2-with-reactive-streams

  3. https://www.lightbend.com/blog/7-ways-washing-dishes-and-message-driven-reactive-systems

  4. Use reactive streams API to combine akka-streams with rxJava. http://www.smartjava.org/content/use-reactive-streams-api-combine-akka-streams-rxjava

  5. What's different in 2.0. https://github.com/ReactiveX/RxJava/wiki/What's-different-in-2.0

  6. Learning Reactive Programming with Java8. https://github.com/zouzhberk/rxjava-study/raw/master/docs/LearningReactiveProgramming.pdf

示例代碼位于: https://github.com/zouzhberk/rxjava-study

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市鲤嫡,隨后出現(xiàn)的幾起案子送挑,更是在濱河造成了極大的恐慌,老刑警劉巖暖眼,帶你破解...
    沈念sama閱讀 207,113評(píng)論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件惕耕,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡诫肠,警方通過查閱死者的電腦和手機(jī)司澎,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,644評(píng)論 2 381
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來栋豫,“玉大人挤安,你說我怎么就攤上這事∩パ欤” “怎么了漱受?”我有些...
    開封第一講書人閱讀 153,340評(píng)論 0 344
  • 文/不壞的土叔 我叫張陵,是天一觀的道長骡送。 經(jīng)常有香客問我昂羡,道長,這世上最難降的妖魔是什么摔踱? 我笑而不...
    開封第一講書人閱讀 55,449評(píng)論 1 279
  • 正文 為了忘掉前任虐先,我火速辦了婚禮,結(jié)果婚禮上派敷,老公的妹妹穿的比我還像新娘蛹批。我一直安慰自己撰洗,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,445評(píng)論 5 374
  • 文/花漫 我一把揭開白布腐芍。 她就那樣靜靜地躺著差导,像睡著了一般。 火紅的嫁衣襯著肌膚如雪猪勇。 梳的紋絲不亂的頭發(fā)上设褐,一...
    開封第一講書人閱讀 49,166評(píng)論 1 284
  • 那天,我揣著相機(jī)與錄音泣刹,去河邊找鬼助析。 笑死,一個(gè)胖子當(dāng)著我的面吹牛椅您,可吹牛的內(nèi)容都是我干的外冀。 我是一名探鬼主播,決...
    沈念sama閱讀 38,442評(píng)論 3 401
  • 文/蒼蘭香墨 我猛地睜開眼掀泳,長吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼雪隧!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起员舵,我...
    開封第一講書人閱讀 37,105評(píng)論 0 261
  • 序言:老撾萬榮一對(duì)情侶失蹤脑沿,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后固灵,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 43,601評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡劫流,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,066評(píng)論 2 325
  • 正文 我和宋清朗相戀三年巫玻,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片祠汇。...
    茶點(diǎn)故事閱讀 38,161評(píng)論 1 334
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡仍秤,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出可很,到底是詐尸還是另有隱情诗力,我是刑警寧澤,帶...
    沈念sama閱讀 33,792評(píng)論 4 323
  • 正文 年R本政府宣布我抠,位于F島的核電站苇本,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏菜拓。R本人自食惡果不足惜瓣窄,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,351評(píng)論 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望纳鼎。 院中可真熱鬧俺夕,春花似錦裳凸、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,352評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至映九,卻和暖如春梦湘,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背氯迂。 一陣腳步聲響...
    開封第一講書人閱讀 31,584評(píng)論 1 261
  • 我被黑心中介騙來泰國打工践叠, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人嚼蚀。 一個(gè)月前我還...
    沈念sama閱讀 45,618評(píng)論 2 355
  • 正文 我出身青樓禁灼,卻偏偏與公主長得像,于是被迫代替她去往敵國和親轿曙。 傳聞我的和親對(duì)象是個(gè)殘疾皇子弄捕,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,916評(píng)論 2 344

推薦閱讀更多精彩內(nèi)容