RxJava2操作符總結(jié) -- 你想要的都在這里了

對RxJava的使用還一直停留在與Retrofit一起使用做網(wǎng)絡(luò)請求,其他的還都不了解伊群,所以花了不少時間整理把RxJava的操作符基本都敲了一遍考杉,熟悉了一遍。參考了一些博客舰始,現(xiàn)在把我整理的分享出來給大家崇棠。以下代碼完全在IDEA中編寫,可以直接放到IDEA里跑起來丸卷,需要注意的是枕稀,我使用的語言是Kotlin,所以如果沒有Kotlin環(huán)境的話需要先配置一下Kotlin的環(huán)境谜嫉。

首先RxJava2 gradle集成:

implementation "io.reactivex.rxjava2:rxjava:2.2.8"
implementation 'io.reactivex.rxjava2:rxkotlin:2.0.0'

同時我使用到了RxKotlin萎坷,一個非常不錯的RxJava Kotlin擴(kuò)展庫,也是reactivex出品沐兰。

2019/5/5 更新compose操作符

好了哆档,廢話不多說,話都在代碼里??
接下來看代碼住闯,有點(diǎn)長哦??

package com.kylec.kc.rxjava

import io.reactivex.Observable
import io.reactivex.ObservableTransformer
import io.reactivex.Observer
import io.reactivex.disposables.Disposable
import io.reactivex.functions.BiFunction
import io.reactivex.functions.Consumer
import io.reactivex.rxkotlin.subscribeBy
import io.reactivex.rxkotlin.toObservable
import io.reactivex.schedulers.Schedulers
import java.util.*
import java.util.concurrent.TimeUnit
import kotlin.collections.ArrayList
import kotlin.random.Random

/**
 * 給Android開發(fā)者的RxJava詳解
 * https://gank.io/post/560e15be2dca930e00da1083
 *
 * RxKotlin
 * https://github.com/ReactiveX/RxKotlin/blob/2.x/README.md
 *
 * RxJava系列
 * http://www.reibang.com/p/823252f110b0
 *
 * RxJava2看這一篇文章就夠了
 * https://juejin.im/post/5b17560e6fb9a01e2862246f
 *
 *
 * 基礎(chǔ)類:
 * Flowable: 多個流瓜浸,響應(yīng)式流和背壓
 * Observable: 多個流,無背壓   (被觀察者)
 * Single: 只有一個元素或者錯誤的流
 * Completable: 沒有任何元素比原,只有一個完成和錯誤信號的流
 * Maybe: 沒有任何元素或者只有一個元素或者只有一個錯誤的流
 *
 *
 * 調(diào)度器種類:
 * Schedulers.computation()
 * 用于計(jì)算任務(wù)插佛,如事件循環(huán)或和回調(diào)處理,不要用于IO操作(IO操作請使用Schedulers.io())量窘;默認(rèn)線程數(shù)等于處理器的數(shù)量
 *
 * Schedulers.from(executor)            使用指定的Executor作為調(diào)度器
 *
 * Schedulers.single()                  該調(diào)度器的線程池只能同時執(zhí)行一個線程
 *
 * Schedulers.io()
 * 用于IO密集型任務(wù)雇寇,如異步阻塞IO操作,這個調(diào)度器的線程池會根據(jù)需要增長蚌铜;
 * 對于普通的計(jì)算任務(wù)锨侯,請使用Schedulers.computation();
 * 默認(rèn)是一個CachedThreadScheduler厘线,很像一個有線程緩存的新線程調(diào)度器
 *
 * Schedulers.newThread()              為每個任務(wù)創(chuàng)建一個新線程
 *
 * Schedulers.trampoline()              當(dāng)其它排隊(duì)的任務(wù)完成后识腿,在當(dāng)前線程排隊(duì)開始執(zhí)行。
 *
 * AndroidSchedulers.mainThread()          主線程造壮,UI線程渡讼,可以用于更新界面
 *
 *
 * 注意:本例部分結(jié)果未打印 因?yàn)樵谶@里有延時但線程已掛掉 正常在Android中使用是沒事的
 *
 * Created by KYLE on 2019/4/28 - 19:57
 */
fun main() {
    // ---------------- `Observable` 創(chuàng)建事件序列的方法 ----------------
    println("---------------- `Observable`創(chuàng)建事件序列的方法 ----------------")
    create()
    interval()
    defer()
    emptyNeverError()
    repeat()
    timer()
    from()
    just()
    range()
    println()


    // ---------------- `Observable` 變換操作 ----------------
    println("---------------- `Observable`變換操作 ----------------")
    mapCast()
    flatMap2contactMap()
    flatMapExample()
    flatMapIterable()
    buffer()
    groupBy()
    scan()
    window()
    println()


    // ---------------- `Observable` 過濾操作/條件操作符 ----------------
    println("---------------- `Observable` 過濾操作/條件操作符 ----------------")
    filter()
    element()
    distinct()
    skip()
    take()
    ignoreElements()
    debounce()
    ofType()
    all()
    contains()
    isEmpty()
    defaultIfEmpty()
    amb()
    println()


    // ---------------- `Observable` 組合操作 ----------------
    println("---------------- `Observable` 組合操作 ----------------")
    concat()
    merge()
    startWith()
    zip()
    combineLast()
    reduce()
    collect()
    count()
    println()


    // ------------------- 功能操作符/輔助操作 -------------------
    println("------------------- 功能操作符/輔助操作 -------------------")
    delay()
    doSeries()
    retry()
    subscribeOn()
    observeOn()
    println()


    // ---------------- RxKotlin擴(kuò)展庫 ----------------
    println("---------------- RxKotlin擴(kuò)展庫 ----------------")
    rkExExample()
    println()


    // ------------------- 額外 其他 -------------------
    println("------------------- 額外 其他 -------------------")
    compose()
    println()
}

/**
 * 使用基本`create()`方法創(chuàng)建事件序列
 */
fun create() {
    print("[create]: ")
    Observable.create<String> { emitter ->
        with(emitter) {
            onNext("Hello")
            onNext("Handsome")
            onNext("Kotlin")
            onComplete()
        }
    }.subscribe(object : Observer<String> {
        override fun onSubscribe(d: Disposable) {
            print("onSubscribe ")
        }

        override fun onError(e: Throwable) {}

        override fun onComplete() {
            print(" onComplete ")
        }

        override fun onNext(t: String) {
            print("$t  ")
        }
    })
    println()
}

/**
 * 使用`interval()`方法創(chuàng)建事件序列
 * 間隔發(fā)射
 */
fun interval() {
    print("[interval]: ")

    // 每隔1秒發(fā)送一個整數(shù) 從0開始 (默認(rèn)執(zhí)行無數(shù)次 使用`take(int)`方法限制執(zhí)行次數(shù))
    val disposable = Observable.interval(0, 1, TimeUnit.SECONDS)
        .take(5)
        .subscribe { print("$it  ") }
    if (!disposable.isDisposed) disposable.dispose()

    println()

    /*
        其他重載方法:
        public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit, Scheduler scheduler)
        public static Observable<Long> interval(long period, TimeUnit unit, Scheduler scheduler)
        public static Observable<Long> intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit, Scheduler scheduler)

        `initialDelay`參數(shù)用來指示開始發(fā)射第一個整數(shù)的之前要停頓的時間,時間的單位與period一樣耳璧,都是通過unit參數(shù)來指定的成箫;
        `period`參數(shù)用來表示每個發(fā)射之間停頓多少時間;
        `unit`表示時間的單位旨枯,是TimeUnit類型的蹬昌;
        `scheduler`參數(shù)指定數(shù)據(jù)發(fā)射和等待時所在的線程。
     */
}

/**
 * 使用`defer()`方法創(chuàng)建事件序列
 *
 * `defer`直到有觀察者訂閱時才創(chuàng)建Observable攀隔,并且為每個觀察者創(chuàng)建一個新的Observable
 * `defer`操作符會一直等待直到有觀察者訂閱它皂贩,然后它使用Observable工廠方法生成一個Observable
 */
fun defer() {
    print("[defer]: ")

    val observable =
        Observable.defer { Observable.just(System.currentTimeMillis()) }
    observable.subscribe { print("$it ") }   // 454  訂閱時才產(chǎn)生了Observable
    print("   ")
    observable.subscribe { print("$it ") }   // 459  訂閱時才產(chǎn)生了Observable

    println()
}

/**
 * 使用`empty()` `never()` `error()`方法創(chuàng)建事件序列
 *
 * public static <T> Observable<T> `empty()`:創(chuàng)建一個不發(fā)射任何數(shù)據(jù)但是正常終止的Observable
 * public static <T> Observable<T> `never()`:創(chuàng)建一個不發(fā)射數(shù)據(jù)也不終止的Observable
 * public static <T> Observable<T> `error(Throwable exception)`:創(chuàng)建一個不發(fā)射數(shù)據(jù)以一個錯誤終止的Observable
 */
fun emptyNeverError() {
    print("[empty]: ")

    Observable.empty<String>().subscribeBy(
        onNext = { print(" next ") },
        onComplete = { print(" complete ") },
        onError = { print(" error ") }
    )
    // `empty()`只會調(diào)用onComplete方法

    println()

    print("[never]: ")

    Observable.never<String>().subscribeBy(
        onNext = { print(" next ") },
        onComplete = { print(" complete ") },
        onError = { print(" error ") }
    )
    // 什么也不會做

    println()

    print("[error]: ")

    Observable.error<Exception>(Exception()).subscribeBy(
        onNext = { print(" next ") },
        onComplete = { print(" complete ") },
        onError = { print(" error ") }
    )
    // `error()`只會調(diào)用onError方法

    println()
}

/**
 * 使用`repeat()`方法創(chuàng)建事件序列
 *
 * 表示指定的序列要發(fā)射多少次
 */
fun repeat() {
    // 重載方法
    // public final Observable<T> repeat(long times)
    // public final Observable<T> repeatUntil(BooleanSupplier stop)
    // public final Observable<T> repeatWhen(Function<? super Observable<Object>, ? extends ObservableSource<?>> handler)

    print("[repeat]: ")

    // 不指定次數(shù)即無限次發(fā)送(內(nèi)部調(diào)用有次數(shù)的重載方法并傳入Long.MAX_VALUE)  別執(zhí)行啊 ~ 卡的不行 ~
    // Observable.range(5, 10).repeat().subscribe { print("$it  ") }

    Observable.range(5, 10).repeat(1).subscribe { print("$it  ") }

    println()

    print("[repeatUntil]: ")

    // repeatUntil在滿足指定要求的時候停止重復(fù)發(fā)送栖榨,否則會一直發(fā)送
    // 這里當(dāng)隨機(jī)產(chǎn)生的數(shù)字`<10`時停止發(fā)送 否則繼續(xù)  (這里始終為true(即停止重復(fù)) 省的瘋了似的執(zhí)行)
    val numbers = arrayOf(0, 1, 2, 3, 4)
    numbers.toObservable().repeatUntil {
        Random(10).nextInt() < 10
    }.subscribe { print("$it  ") }

    println()
}

/**
 * 使用`timer()`方法創(chuàng)建事件序列
 *
 * 執(zhí)行延時任務(wù)
 *
 * 創(chuàng)建一個在給定的時間段之后返回一個特殊值的Observable,它在延遲一段給定的時間后發(fā)射一個簡單的數(shù)字0
 */
fun timer() {
    print("[timer]: ")

    // 在500毫秒之后輸出一個數(shù)字0
    val disposable = Observable.timer(500, TimeUnit.MILLISECONDS).subscribe { print("$it  ") }
    if (!disposable.isDisposed) disposable.dispose()

    println()
}

/**
 * 使用`from()`方法快捷創(chuàng)建事件隊(duì)列
 *
 * `fromArray`
 * `fromCallable`
 * `fromIterable`  (和上面的fromArray類似 只不過這里是集合罷了)
 *
 * 將傳入的數(shù)組依次發(fā)送出來
 */
fun from() {
    print("[fromArray]: ")

    val names = arrayOf("ha", "hello", "yummy", "kt", "world", "green", "delicious")
    // 注意:使用`*`展開數(shù)組
    val disposable = Observable.fromArray(*names).subscribe { print("$it  ") }
    if (!disposable.isDisposed) disposable.dispose()

    println()

    print("[fromCallable]: ")

    // 可以在Callable內(nèi)執(zhí)行一段代碼 并返回一個值給觀察者
    Observable.fromCallable { 1 }.subscribe { print("$it  ") }

    println()
}

/**
 * 使用`just()`方法快捷創(chuàng)建事件隊(duì)列
 *
 * 將傳入的參數(shù)依次發(fā)送出來(最少1個 最多10個)
 */
fun just() {
    print("[just]: ")

    val disposable = Observable.just("Just1", "Just2", "Just3")
        // 將會依次調(diào)用:
        // onNext("Just1");
        // onNext("Just2");
        // onNext("Just3");
        // onCompleted();
        .subscribe { print("$it  ") }
    if (!disposable.isDisposed) disposable.dispose()

    println()
}

/**
 * 使用`range()`方法快捷創(chuàng)建事件隊(duì)列
 *
 * 創(chuàng)建一個序列
 */
fun range() {
    print("[range]: ")

    // 用Observable.range()方法產(chǎn)生一個序列
    // 用map方法將該整數(shù)序列映射成一個字符序列
    // 最后將得到的序列輸出來 forEach內(nèi)部也是調(diào)用的 subscribe(Consumer<? super T> onNext)
    val disposable = Observable.range(0, 10)
        .map { item -> "range$item" }
//        .forEach { print("$it  ") }
        .subscribeBy(
            onNext = { print("$it  ") },
            onComplete = { print("range complete !!! ") }
        )
    if (!disposable.isDisposed) disposable.dispose()

    println()
}

/**
 * 使用`map()` `cast()` 做變換操作
 *
 * `map`操作符對原始Observable發(fā)射的每一項(xiàng)數(shù)據(jù)應(yīng)用一個你選擇的函數(shù)明刷,然后返回一個發(fā)射這些結(jié)果的Observable婴栽。默認(rèn)不在任何特定的調(diào)度器上執(zhí)行
 *
 * `cast`操作符將原始Observable發(fā)射的每一項(xiàng)數(shù)據(jù)都強(qiáng)制轉(zhuǎn)換為一個指定的類型`(多態(tài))`,然后再發(fā)射數(shù)據(jù)辈末,它是map的一個特殊版本
 */
fun mapCast() {
    print("[map]: ")

    Observable.range(1, 5).map { item -> "to String $item" }.subscribe { print("$it  ") }

    println()

    print("[cast]: ")

    // 將`Date`轉(zhuǎn)換為`Any` (如果前面的Class無法轉(zhuǎn)換成第二個Class就會出現(xiàn)ClassCastException)
    Observable.just(Date()).cast(Any::class.java).subscribe { print("$it  ") }

    println()
}

/*
    `map`與`flatMap`的區(qū)別(出自朱凱):
    `map`是在一個 item 被發(fā)射之后愚争,到達(dá) map 處經(jīng)過轉(zhuǎn)換變成另一個 item ,然后繼續(xù)往下走挤聘;
    `flapMap`是 item 被發(fā)射之后轰枝,到達(dá) flatMap 處經(jīng)過轉(zhuǎn)換變成一個 Observable
    而這個 Observable 并不會直接被發(fā)射出去,而是會立即被激活组去,然后把它發(fā)射出的每個 item 都傳入流中鞍陨,再繼續(xù)走下去。
 */

/**
 * 使用`flatMap()` `contactMap()` 做變換操作
 *
 * `flatMap`將一個發(fā)送事件的上游Observable變換為多個發(fā)送事件的Observables添怔,然后將它們發(fā)射的事件合并后放進(jìn)一個單獨(dú)的Observable里
 * `flatMap`不保證順序  `contactMap()`保證順序
 */
fun flatMap2contactMap() {
    print("[flatMap]: ")

    /*
        `flatMap()` 的原理是這樣的:
        1. 使用傳入的事件對象創(chuàng)建一個 Observable 對象湾戳;
        2. 并不發(fā)送這個 Observable, 而是將它激活,于是它開始發(fā)送事件广料;
        3. 每一個創(chuàng)建出來的 Observable 發(fā)送的事件砾脑,都被匯入同一個 Observable ,
        而這個 Observable 負(fù)責(zé)將這些事件統(tǒng)一交給 Subscriber 的回調(diào)方法艾杏。
        這三個步驟韧衣,把事件拆成了兩級,通過一組新創(chuàng)建的 Observable 將初始的對象『鋪平』之后通過統(tǒng)一路徑分發(fā)了下去购桑。
        而這個『鋪平』就是 flatMap() 所謂的 flat畅铭。
     */

    Observable.range(1, 5).flatMap {
        Observable.just("$it to flat")
    }.subscribe { print("$it  ") }

    println()

    print("[contactMap]: ")

    Observable.range(1, 5).concatMap {
        Observable.just("$it to concat")
    }.subscribe { print("$it  ") }

    println()

}

/**
 * `flatMap`挺重要的,再舉一個例子
 *
 * 依次打印Person集合中每個元素中Plan的action
 */
fun flatMapExample() {
    print("[flatMapExample]: ")

    arrayListOf(
        Person("KYLE", arrayListOf(Plan(arrayListOf("Study RxJava", "May 1 to go home")))),
        Person("WEN QI", arrayListOf(Plan(arrayListOf("Study Java", "See a Movie")))),
        Person("LU", arrayListOf(Plan(arrayListOf("Study Kotlin", "Play Game")))),
        Person("SUNNY", arrayListOf(Plan(arrayListOf("Study PHP", "Listen to music"))))
    ).toObservable().flatMap {
        Observable.fromIterable(it.planList)
    }.flatMap {
        Observable.fromIterable(it.actionList)
    }.subscribeBy(
        onNext = { print("$it  ") }
    )

    println()
}

/**
 * 使用`flatMapIterable()`做變換操作
 *
 * 將上流的任意一個元素轉(zhuǎn)換成一個Iterable對象
 */
fun flatMapIterable() {
    print("[flatMapIterable]: ")

    Observable.range(1, 5)
        .flatMapIterable { integer ->
            Collections.singletonList("$integer")
        }
        .subscribe { print("$it  ") }

    // [flatMapIterable]: 1  2  3  4  5

    println()
}

/**
 * 使用`buffer()`做變換操作
 *
 * 用于將整個流進(jìn)行分組
 */
fun buffer() {
    print("[buffer]: ")

    // 生成一個7個整數(shù)構(gòu)成的流勃蜘,然后使用`buffer`之后硕噩,這些整數(shù)會被3個作為一組進(jìn)行輸出

    // count 是一個buffer的最大值
    // skip 是指針后移的距離(不定義時就為count)
    // 例如 1 2 3 4 5 buffer(3) 的結(jié)果為:[1,2,3] [4,5]      (buffer(3)也就是buffer(3,3))
    // 例如 1 2 3 4 5 buffer(3,2) 的結(jié)果為:[1,2,3] [3,4,5] [5]
    Observable.range(1, 5).buffer(3)
        .subscribe {
            print("${Arrays.toString(it.toIntArray())}  ")
        }

    println()
}

/**
 * 使用`groupBy()`做變換操作
 *
 * 用于分組元素(根據(jù)groupBy()方法返回的值進(jìn)行分組)
 * 將發(fā)送的數(shù)據(jù)進(jìn)行分組,每個分組都會返回一個被觀察者缭贡。
 */
fun groupBy() {
    print("[groupBy]: ")

    // 使用concat方法先將兩個Observable拼接成一個Observable炉擅,然后對其元素進(jìn)行分組。
    // 這里我們的分組依據(jù)是整數(shù)的值阳惹,這樣我們將得到一個Observable<GroupedObservable<Integer, Integer>>類型的Observable谍失。
    // 然后,我們再將得到的序列拼接成一個莹汤,并進(jìn)行訂閱輸出

    Observable.concat(Observable.concat(
        Observable.range(1, 3), Observable.range(1, 4)
    ).groupBy { it }
    ).subscribe { print("groupBy: $it  ") }

    // [groupBy]: groupBy: 1  groupBy: 1  groupBy: 2  groupBy: 2  groupBy: 3  groupBy: 3  groupBy: 4

    println()
}

/**
 * 使用`scan()`做變換操作
 *
 * 將數(shù)據(jù)以一定的邏輯聚合起來
 *
 * scan操作符對原始Observable發(fā)射的第一項(xiàng)數(shù)據(jù)應(yīng)用一個函數(shù)快鱼,然后將那個函數(shù)的結(jié)果作為自己的第一項(xiàng)數(shù)據(jù)發(fā)射。
 * 它將函數(shù)的結(jié)果同第二項(xiàng)數(shù)據(jù)一起填充給這個函數(shù)來產(chǎn)生它自己的第二項(xiàng)數(shù)據(jù)。
 * 它持續(xù)進(jìn)行這個過程來產(chǎn)生剩余的數(shù)據(jù)序列抹竹。這個操作符在某些情況下被叫做`accumulator`
 */
fun scan() {
    print("[scan]: ")

    val disposable = Observable.just(1, 2, 3, 4, 5)
        .scan { t1: Int, t2: Int -> t1 + t2 }
        .subscribe { print("$it  ") }
    if (!disposable.isDisposed) disposable.dispose()

    // [scan]: 1  3  6  10  15

    println()
}

/**
 * 使用`window()`做變換操作
 *
 * 將事件分組 參數(shù)`count`就是分的組數(shù)
 *
 * `window`和`buffer`類似线罕,但不是發(fā)射來自原始Observable的數(shù)據(jù)包,它發(fā)射的是Observable柒莉,
 * 這些Observables中的每一個都發(fā)射原始Observable數(shù)據(jù)的一個子集闻坚,最后發(fā)射一個onCompleted通知。
 */
fun window() {
    print("[window]: ")

    Observable.range(1, 10).window(3)
        .subscribeBy(
            onNext = { it.subscribe { int -> print("{${it.hashCode()} : $int} ") } },
            onComplete = { print("onComplete ") }
        )

    println()
}

/**
 * 使用`filter()`做過濾操作
 *
 * 對源做過濾
 */
fun filter() {
    print("[filter]: ")

    // 過濾掉 <=5 的數(shù)據(jù)源 只有 >5 的數(shù)據(jù)源會發(fā)送出去
    Observable.range(1, 10).filter { it > 5 }.subscribe { print("$it  ") }

    println()
}

/**
 * 使用`element()`獲取源中指定位置的數(shù)據(jù)
 *
 * `elementAt`  指定位置
 * `firstElement`  第一個
 * `lastElement`   最后一個
 */
fun element() {
    print("[elementAt]: ")

    // `elementAt` 在輸入的 index 超出事件序列的總數(shù)就不會出現(xiàn)任何結(jié)果
    // `elementAtOrError` 則會報(bào)異常
    // `first...` 和 `last...` 都類似

    // 只有index=0的數(shù)據(jù)源會被發(fā)射
    Observable.range(1, 10).elementAt(0).subscribe { print("$it  ") }

    println()

    print("[firstElement]: ")

    Observable.range(1, 19).firstElement().subscribe { print("$it  ") }

    println()

    print("[lastElement]: ")

    Observable.range(34, 2).lastElement().subscribe { print("$it  ") }

    println()
}

/**
 * 使用`distinct()`對源中相同的數(shù)據(jù)進(jìn)行過濾
 */
fun distinct() {
    print("[distinct]: ")

    Observable.just(1, 1, 1, 2, 3, 4, 1, 5, 5, 6)
        .distinct()
        .subscribe { print("$it  ") }

    // [distinct]: 1  2  3  4  5  6

    println()

    print("[distinctUntilChanged]: ")

    Observable.just(1, 1, 1, 2, 3, 4, 1, 5, 5, 6)
        .distinctUntilChanged()
        .subscribe { print("$it  ") }

    // [distinctUntilChanged]: 1  2  3  4  1  5  6

    println()
}

/**
 * 使用`skip()` 過濾掉數(shù)據(jù)的前n項(xiàng)
 *
 * `skip`         過濾掉數(shù)據(jù)的前n項(xiàng) 參數(shù)count代表跳過事件的數(shù)量
 * `skipLast`     與`skip` 功能相反 過濾掉數(shù)據(jù)的后n項(xiàng)
 * `skipUntil`    當(dāng) skipUntil() 中的 Observable 發(fā)送事件了兢孝,原來的 Observable 才會發(fā)送事件給觀察者。
 * `skipWhile`    可以設(shè)置條件仅偎,當(dāng)某個數(shù)據(jù)滿足條件時不發(fā)送該數(shù)據(jù)跨蟹,反之則發(fā)送。
 */
fun skip() {
    print("[skip]: ")

    Observable.just(1, 2, 3, 4, 5, 6)
        .skip(2)
        .subscribe { print("$it  ") }

    // [skip]: 3  4  5  6

    println()

    print("[skipUntil]: ")

    Observable.just(6)
        .skipUntil<Int> { Observable.just(2).delay(2, TimeUnit.SECONDS).subscribe { print("$it  ") } }
        .subscribeOn(Schedulers.io())
        .observeOn(Schedulers.io())
        .subscribe { print("$it  ") }

    println()

    print("[skipWhile]: ")

    Observable.just(1, 2, 3, 4)
        .skipWhile {
            it < 3
        }.subscribe { print("$it  ") }

    // [skipWhile]: 3  4

    println()
}

/**
 * 使用`take()` 取數(shù)據(jù)的前n項(xiàng)
 *
 * `take`          取數(shù)據(jù)的前n項(xiàng) 參數(shù)count代表取的事件的數(shù)量
 * `takeLast`      與`take`功能相反 取數(shù)據(jù)的后n項(xiàng)
 * `takeUntil`
 * `takeWhile`
 *
 * 與`skip...`對應(yīng)
 */
fun take() {
    print("[take]: ")

    Observable.range(1, 5).take(2).subscribe { print("$it  ") }

    // [take]: 1  2

    println()
}

/**
 * 使用`ignoreElements()` 過濾所有源Observable產(chǎn)生的結(jié)果
 *
 * 只會把Observable的`onComplete`和`onError`事件通知給訂閱者
 */
fun ignoreElements() {
    print("[ignoreElements]: ")

    Observable.just(1, 1, 2, 3, 4)
        .ignoreElements()
        .subscribeBy(
            onComplete = { print(" onComplete ") },
            onError = { print(" onError ") }
            // 沒有`onNext`
        )

    println()
}

/**
 * 使用`debounce()` 限制發(fā)射頻率過快
 *
 * 如果兩件事件發(fā)送的時間間隔小于設(shè)定的時間間隔則`前一件`事件就不會發(fā)送給觀察者
 */
fun debounce() {

    print("[debounce]: ")

    Observable.create<Int> { emitter ->
        emitter.onNext(1)
        Thread.sleep(900)
        emitter.onNext(2)
    }.debounce(1, TimeUnit.SECONDS)
        .subscribe { print("$it  ") }

    // 2

    println()
}

/**
 * 使用`ofType()` 過濾不符合該類型事件
 */
fun ofType() {
    print("[ofType]: ")

    Observable.just(1, 2, 3, "k", "Y")
        .ofType(String::class.java)
        .subscribe { print("$it  ") }

    // [ofType]: k  Y

    println()
}

/**
 * `all`
 *
 * 判斷事件序列是否全部滿足某個事件橘沥,如果都滿足則返回 true窗轩,反之則返回 false
 */
fun all() {
    print("[all]: ")

    Observable.just(1, 2, 3, 4)
        .all { it < 5 }
        .subscribe(Consumer {
            print("$it  ")
        })

    // [all]: true

    println()
}

/**
 * `contains`
 *
 * 判斷事件序列中是否含有某個元素,如果有則返回 true座咆,如果沒有則返回 false痢艺。
 */
fun contains() {
    print("[contains]: ")

    Observable.just(1, 2, 3, 4)
        .contains(3)
        .subscribe(Consumer {
            print("$it  ")
        })

    // [contains]: true

    println()
}

/**
 * `isEmpty`
 *
 * 判斷事件序列是否為空  是返回true  否返回false
 */
fun isEmpty() {
    print("[isEmpty]: ")

    Observable.create<String> { emitter ->
        emitter.onComplete()
    }.isEmpty
        .subscribe(Consumer {
            print("$it  ")
        })

    // [isEmpty]: true

    println()
}

/**
 * `defaultIfEmpty`
 *
 * 如果觀察者只發(fā)送一個 onComplete() 事件,則可以利用這個方法發(fā)送一個值介陶。
 */
fun defaultIfEmpty() {
    print("[defaultIfEmpty]: ")

    Observable.create<Int> { emitter ->
        emitter.onComplete()
    }.defaultIfEmpty(666)
        .subscribe { print("$it  ") }

    // [defaultIfEmpty]: 666

    println()

}

/**
 * `amb`
 *
 * amb() 要傳入一個 Observable 集合堤舒,但是只會發(fā)送最先發(fā)送事件的 Observable 中的事件,其余 Observable 將會被丟棄哺呜。
 */
fun amb() {
    print("[amb]: ")

    val list = ArrayList<Observable<Long>>()
    list.add(Observable.intervalRange(1, 5, 2, 1, TimeUnit.SECONDS))
    list.add(Observable.intervalRange(6, 5, 0, 1, TimeUnit.SECONDS))

    Observable.amb(list)
        .subscribe { print("$it  ") }

    // [amb]:  6  7  8  9  10

    println()
}

/**
 * 使用`concat()`做組合操作
 *
 * 將多個Observable拼接起來舌缤,但是它會嚴(yán)格按照傳入的Observable的順序進(jìn)行發(fā)射,一個Observable沒有發(fā)射完畢之前不會發(fā)射另一個Observable里面的數(shù)據(jù)
 *
 * `concat()`方法內(nèi)部還是調(diào)用的`concatArray(source1, source2)`方法某残,只是在調(diào)用前對傳入的參數(shù)做了`null`判斷
 *
 * 與 `merge()` 作用基本一樣国撵,只是 `merge()` 是并行發(fā)送事件,而 concat() 串行發(fā)送事件
 */
fun concat() {
    print("[concat]: ")

    val disposable = Observable.concat(Observable.range(1, 5), Observable.range(6, 5))
        .subscribe { print("$it  ") }
    if (!disposable.isDisposed) disposable.dispose()

    // [concat]: 1  2  3  4  5  6  7  8  9  10

    println()
}

/**
 * 使用`merge` 做組合操作
 *
 * 讓多個數(shù)據(jù)源的數(shù)據(jù)合并起來進(jìn)行發(fā)射(merge后的數(shù)據(jù)可能會交錯發(fā)射)
 * 與 `concat()` 作用基本一樣玻墅,只是 `concat()` 是串行發(fā)送事件介牙,而 merge() 并行發(fā)送事件
 *
 * 內(nèi)部實(shí)際操作為調(diào)用了`fromArray()+flatMap`方法 只是在調(diào)用前對數(shù)據(jù)源參數(shù)做了`null`判斷
 *
 * 與`mergeError`的比較
 * `mergeError`方法與`merge`方法的表現(xiàn)一致,
 * 只是在處理由`onError`觸發(fā)的錯誤的時候有所不同澳厢。
 * `mergeError`方法會等待所有的數(shù)據(jù)發(fā)射完畢之后才把錯誤發(fā)射出來环础,即使多個錯誤被觸發(fā),該方法也只會發(fā)射出一個錯誤信息赏酥。
 * 而如果使用`merger`方法喳整,那么當(dāng)有錯誤被觸發(fā)的時候,該錯誤會直接被拋出來裸扶,并結(jié)束發(fā)射操作
 */
fun merge() {
    print("[merge]: ")

    Observable.merge(Observable.range(1, 5), Observable.range(6, 5))
        .subscribe { print("$it  ") }

    // [merge]: 1  2  3  4  5  6  7  8  9  10

    println()
}

/**
 * 使用`startWith` 做組合操作 在發(fā)送事件之前追加事件
 *
 * `startWith`         追加一個事件
 * `startWithArray`    追加多個事件
 *
 * `追加的事件會先發(fā)出`
 *
 * `startWith`方法可以用來在指定的數(shù)據(jù)源的之前插入幾個數(shù)據(jù)
 */
fun startWith() {
    print("[startWith]: ")

    Observable.range(5, 3)
        .startWithArray(1, 2, 3, 4)
        .startWith(0).subscribe { print("$it  ") }

    // [startWith]: 0  1  2  3  4  5  6  7

    println()
}

/**
 * 使用`zip` 做組合操作
 *
 * 用來將多個數(shù)據(jù)項(xiàng)進(jìn)行合并 根據(jù)各個被觀察者發(fā)送事件的順序一個個結(jié)合起來框都,最終發(fā)送的事件數(shù)量會與源 Observable 中最少事件的數(shù)量一樣
 * 為什么呢?因?yàn)閿?shù)據(jù)源少的那個 Observable 發(fā)送完成后發(fā)送了 onComplete 方法,所以數(shù)據(jù)源多的那個就不會再發(fā)送事件了
 */
fun zip() {
    print("[zip]: ")

    Observable.zip(Observable.range(1, 6), Observable.range(6, 5),
        BiFunction<Int, Int, Int> { t1, t2 -> t1 * t2 })
        .subscribe { print("$it  ") }

    // 1 2 3 4  5 6
    // 6 7 8 9 10
    // 看上面兩行再看結(jié)果很明顯了吧

    // [zip]: 6  14  24  36  50

    println()
}

/**
 * 使用`combineLast` 做組合操作
 *
 * 用第一個數(shù)據(jù)源的最后一項(xiàng)和第二個數(shù)據(jù)源的每一項(xiàng)做合并
 */
fun combineLast() {
    print("[combineLast]: ")

    Observable.combineLatest(Observable.range(1, 6), Observable.range(6, 5),
        BiFunction<Int, Int, Int> { t1, t2 -> t1 * t2 })
        .subscribe { print("$it  ") }

    // 1 2 3 4  5 6
    // 6 7 8 9 10
    // 看上面兩行再看結(jié)果很明顯了吧

    // [combineLast]: 36  42  48  54  60

    println()
}

/**
 * 使用`reduce` 做組合操作
 *
 * 與 scan() 操作符的作用一樣也是將發(fā)送數(shù)據(jù)以一定邏輯聚合起來魏保,
 * 這兩個的區(qū)別在于 scan() 每處理一次數(shù)據(jù)就會將事件發(fā)送給觀察者熬尺,而 reduce() 會將所有數(shù)據(jù)聚合在一起才會發(fā)送事件給觀察者
 */
fun reduce() {
    print("[reduce]: ")

    Observable.just(0, 1, 2, 3)
        .reduce { t1, t2 -> t1 + t2 }
        .subscribe { print("$it  ") }

    // [reduce]: 6

    println()
}

/**
 * 使用`collect` 做組合操作
 *
 * 將數(shù)據(jù)收集到數(shù)據(jù)結(jié)構(gòu)當(dāng)中
 */
fun collect() {
    print("[collect]: ")

    // `collect`接收兩個參數(shù) 第一個是要收集到的數(shù)據(jù)解構(gòu) 第二個是數(shù)據(jù)到數(shù)據(jù)結(jié)構(gòu)中的操作
    Observable.just(1, 2, 3, 4)
        .collect({ ArrayList<Int>() }, { t1, t2 -> t1.add(t2) })
        .subscribe(Consumer<ArrayList<Int>> {
            print("$it  ")
        })

    // [collect]: [1, 2, 3, 4]

    println()
}

/**
 * 使用`count` 做組合操作
 *
 * 返回被觀察者發(fā)送事件的數(shù)量
 */
fun count() {
    print("[count]: ")

    Observable.just(1, 2, 3)
        .count()
        .subscribe(Consumer {
            print("$it  ")
        })

    // [count]: 3

    println()
}

/**
 * 用于在發(fā)射數(shù)據(jù)之前停頓指定的時間
 */
fun delay() {
    print("[delay]: ")

    Observable.range(1, 5).delay(1, TimeUnit.SECONDS).subscribe { print("$it  ") }

    println()
}

/**
 * do 系列
 */
fun doSeries() {
    print("[doSeries]: ")
    // `doOnEach`  當(dāng)每個`onNext`調(diào)用[前]觸發(fā) 并可取出`onNext`發(fā)送的值  但是方法參數(shù)是一個`Notification<T>`的包裝 可以通過`.value`取出`onNext`的值
    // `doOnNext`  在每個`onNext`調(diào)用[前]觸發(fā) 并可取出`onNext`發(fā)送的值  方法參數(shù)就是`onNext`的值
    // `doAfterNext`   在每個`onNext`調(diào)用[后]觸發(fā) 并可取出`onNext`發(fā)送的值  方法參數(shù)就是`onNext`的值
    // `doOnComplete`  在`onComplete`調(diào)用[前]觸發(fā)
    // `doOnError`  在`onError`調(diào)用[前]觸發(fā)
    // `doOnSubscribe`  在`onSubscribe`調(diào)用[前]觸發(fā)
    // `doOnDispose`  在調(diào)用 Disposable 的 dispose() 之[后]回調(diào)該方法
    // `doOnTerminate `  在 onError 或者 onComplete 發(fā)送之[前]回調(diào)
    // `doAfterTerminate `   在onError 或者 onComplete 發(fā)送之[后]回調(diào)  取消訂閱后就不會回調(diào)
    // `doFinally`   在所有事件發(fā)送完畢之后回調(diào)該方法   即使取消訂閱也會回調(diào)
    // `onErrorReturn`   當(dāng)接受到一個 onError() 事件之后回調(diào),返回的值會回調(diào) onNext() 方法谓罗,并正常結(jié)束該事件序列
    // `onErrorResumeNext`   當(dāng)接收到 onError() 事件時粱哼,返回一個新的 Observable,并正常結(jié)束事件序列
    // `onExceptionResumeNext`   與 onErrorResumeNext() 作用基本一致檩咱,但是這個方法只能捕捉 Exception

    // Test Code:
    Observable.create<String> { emitter ->
        emitter.onNext("K")
        emitter.onNext("Y")
        emitter.onNext("L")
        emitter.onNext("E")
        emitter.onComplete()
    }.doOnTerminate {
        print("doOnNext: $  ")
    }.subscribeBy(
        onNext = { print("accept: $it  ") },
        onComplete = { print("  onComplete  ") },
        onError = { print("  onError  ") }
    )

    println()
}

/**
 * `retry`
 *
 * 另:`retryUntil` 出現(xiàn)錯誤事件之后揭措,可以通過此方法判斷是否繼續(xù)發(fā)送事件 true不重試 false重試
 *
 * 如果出現(xiàn)錯誤事件,則會重新發(fā)送所有事件序列刻蚯。times 是代表重新發(fā)的次數(shù)绊含。
 */
fun retry() {
    print("[retry]: ")

    Observable.create<String> { emitter ->
        emitter.onNext("K")
        emitter.onError(Exception("404"))
    }
        .retry(2)
        .subscribeBy(
            onNext = { print("accept: $it  ") },
            onComplete = { print("  onComplete  ") },
            onError = { print("  onError  ") }
        )

    // [retry]: accept: K  accept: K  accept: K    onError
    // 重試了2次

    println()
}

/**
 * `subscribeOn`
 *
 * 指定被觀察者的線程,要注意的時炊汹,如果多次調(diào)用此方法躬充,只有第一次有效。
 */
fun subscribeOn() {
    print("[subscribeOn]: ")

//    Observable.create<String> { emitter ->
//        emitter.onNext("K")
//        print("current thread: ${Thread.currentThread().name}")
//    }.subscribeOn(Schedulers.computation()).subscribe()

    // current thread: RxComputationThreadPool-2

    println()
}

/**
 * `observeOn`
 *
 * 指定觀察者的線程讨便,每指定一次就會生效一次充甚。
 */
fun observeOn() {
    print("[observeOn]: ")

//    Observable.create<String> { emitter ->
//        emitter.onNext("K")
//    }.observeOn(Schedulers.io())
//        .subscribe { print("current thread: ${Thread.currentThread().name}") }

    // current thread : RxCachedThreadScheduler -1

    println()
}

/**
 * RxKotlin擴(kuò)展庫的一個簡單使用
 * 也是RxKotlin官方給出的一個例子
 *
 * 更多查看:https://github.com/ReactiveX/RxKotlin/blob/2.x/README.md
 */
fun rkExExample() {
    print("[rkExExample]: ")

    val list = listOf("Alpha", "Beta", "Gamma", "Delta", "Epsilon")

    // 相當(dāng)于是Observable.fromIterable(this) 和上面的fromArray()類似 一個數(shù)組 一個集合
    list.toObservable()  // extension function for Iterables
        .filter { it.length > 5 }
        .subscribeBy(   // 對應(yīng)上面`create`創(chuàng)建方式的最后調(diào)用的subscribe
            onNext = { print("$it  ") },
            onError = { it.printStackTrace() },
            onComplete = { print(" Done! ") }
        )

    // Result:
    // [rkExExample]: Epsilon   Done!
    println()
}


/**
 * `compose`
 * 與`Transformer`連用
 *
 * `compose`操作符和Transformer結(jié)合使用,一方面讓代碼看起來更加簡潔化霸褒,另一方面能夠提高代碼的復(fù)用性伴找。
 * RxJava提倡鏈?zhǔn)秸{(diào)用,`compose`能夠防止鏈?zhǔn)奖淮蚱啤? *
 * compose操作于整個數(shù)據(jù)流中傲霸,能夠從數(shù)據(jù)流中得到原始的Observable<T>/Flowable<T>...
 * 當(dāng)創(chuàng)建Observable/Flowable...時疆瑰,compose操作符會立即執(zhí)行,而不像其他的操作符需要在onNext()調(diào)用后才執(zhí)行
 */
fun compose() {
    println("[compose]: ")

    Observable.just(1, 2)
        .compose(transformerInt2String())
        .compose(applySchedulers())
        .subscribe {
            print("$it  ")
            if (it == "1") print(" ${Thread.currentThread().name} ")
        }

    println()
}


// 用于`flatMap`舉例子
data class Person(private val name: String, val planList: List<Plan>)

data class Plan(val actionList: List<String>)

// 用于`compose`舉例子
// 將發(fā)射的Int轉(zhuǎn)換為String
fun transformerInt2String() = ObservableTransformer<Int, String> { upstream -> upstream.map { int -> "$int" } }

// 切換線程
fun <T> applySchedulers() =
    ObservableTransformer<T, T> { upstream -> upstream.observeOn(Schedulers.io()).subscribeOn(Schedulers.io()) }

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末昙啄,一起剝皮案震驚了整個濱河市穆役,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌梳凛,老刑警劉巖耿币,帶你破解...
    沈念sama閱讀 219,039評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異韧拒,居然都是意外死亡淹接,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,426評論 3 395
  • 文/潘曉璐 我一進(jìn)店門叛溢,熙熙樓的掌柜王于貴愁眉苦臉地迎上來塑悼,“玉大人,你說我怎么就攤上這事楷掉∠崴猓” “怎么了?”我有些...
    開封第一講書人閱讀 165,417評論 0 356
  • 文/不壞的土叔 我叫張陵,是天一觀的道長斑鸦。 經(jīng)常有香客問我愕贡,道長,這世上最難降的妖魔是什么巷屿? 我笑而不...
    開封第一講書人閱讀 58,868評論 1 295
  • 正文 為了忘掉前任固以,我火速辦了婚禮,結(jié)果婚禮上嘱巾,老公的妹妹穿的比我還像新娘憨琳。我一直安慰自己,他們只是感情好浓冒,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,892評論 6 392
  • 文/花漫 我一把揭開白布栽渴。 她就那樣靜靜地躺著,像睡著了一般稳懒。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上慢味,一...
    開封第一講書人閱讀 51,692評論 1 305
  • 那天场梆,我揣著相機(jī)與錄音,去河邊找鬼纯路。 笑死或油,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的驰唬。 我是一名探鬼主播顶岸,決...
    沈念sama閱讀 40,416評論 3 419
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼叫编!你這毒婦竟也來了辖佣?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,326評論 0 276
  • 序言:老撾萬榮一對情侶失蹤搓逾,失蹤者是張志新(化名)和其女友劉穎卷谈,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體霞篡,經(jīng)...
    沈念sama閱讀 45,782評論 1 316
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡世蔗,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,957評論 3 337
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了朗兵。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片污淋。...
    茶點(diǎn)故事閱讀 40,102評論 1 350
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖余掖,靈堂內(nèi)的尸體忽然破棺而出寸爆,到底是詐尸還是另有隱情,我是刑警寧澤,帶...
    沈念sama閱讀 35,790評論 5 346
  • 正文 年R本政府宣布而昨,位于F島的核電站救氯,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏歌憨。R本人自食惡果不足惜着憨,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,442評論 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望务嫡。 院中可真熱鬧甲抖,春花似錦、人聲如沸心铃。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,996評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽去扣。三九已至柱衔,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間愉棱,已是汗流浹背唆铐。 一陣腳步聲響...
    開封第一講書人閱讀 33,113評論 1 272
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留奔滑,地道東北人艾岂。 一個月前我還...
    沈念sama閱讀 48,332評論 3 373
  • 正文 我出身青樓,卻偏偏與公主長得像朋其,于是被迫代替她去往敵國和親王浴。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,044評論 2 355

推薦閱讀更多精彩內(nèi)容

  • 本文首發(fā)于“隨手記技術(shù)團(tuán)隊(duì)”公眾號 大概從2015年開始梅猿,RxJava1.0開始快速流行起來氓辣,短短兩年時間,RxJ...
    HolenZhou閱讀 2,162評論 0 19
  • 請?jiān)试S我借鑒前輩們的東西~~~~ 感激不盡~~~~~ 以下為Android 框架排行榜 么么噠~ Android...
    嗯_新閱讀 2,057評論 3 32
  • 注意Rxjava配合Retrofit進(jìn)行網(wǎng)絡(luò)請求進(jìn)行了更新,對Rxjava生命周期處理更加合理,詳情請看Demo ...
    sweetying閱讀 11,988評論 3 97
  • 一粒没、Retrofit詳解 ·Retrofit的官網(wǎng)地址為 : http://square.github.io/re...
    余生_d630閱讀 1,862評論 0 5
  • 人物:M君(已婚已育)筛婉,H君(已婚未育),W君(大齡單身)癞松,三人為大學(xué)同寢爽撒,關(guān)系非常好,畢業(yè)多年响蓉,依然樂于互相八卦...
    陸拾雜記閱讀 322評論 2 5