對RxJava的使用還一直停留在與Retrofit一起使用做網(wǎng)絡(luò)請求,其他的還都不了解伊群,所以花了不少時間整理把RxJava的操作符基本都敲了一遍考杉,熟悉了一遍。參考了一些博客舰始,現(xiàn)在把我整理的分享出來給大家崇棠。以下代碼完全在IDEA中編寫,可以直接放到IDEA里跑起來丸卷,需要注意的是枕稀,我使用的語言是Kotlin,所以如果沒有Kotlin環(huán)境的話需要先配置一下Kotlin的環(huán)境谜嫉。
首先RxJava2 gradle集成:
implementation "io.reactivex.rxjava2:rxjava:2.2.8"
implementation 'io.reactivex.rxjava2:rxkotlin:2.0.0'
同時我使用到了RxKotlin
萎坷,一個非常不錯的RxJava Kotlin擴(kuò)展庫,也是reactivex
出品沐兰。
2019/5/5 更新
compose
操作符
好了哆档,廢話不多說,話都在代碼里??
接下來看代碼住闯,有點(diǎn)長哦??
package com.kylec.kc.rxjava
import io.reactivex.Observable
import io.reactivex.ObservableTransformer
import io.reactivex.Observer
import io.reactivex.disposables.Disposable
import io.reactivex.functions.BiFunction
import io.reactivex.functions.Consumer
import io.reactivex.rxkotlin.subscribeBy
import io.reactivex.rxkotlin.toObservable
import io.reactivex.schedulers.Schedulers
import java.util.*
import java.util.concurrent.TimeUnit
import kotlin.collections.ArrayList
import kotlin.random.Random
/**
* 給Android開發(fā)者的RxJava詳解
* https://gank.io/post/560e15be2dca930e00da1083
*
* RxKotlin
* https://github.com/ReactiveX/RxKotlin/blob/2.x/README.md
*
* RxJava系列
* http://www.reibang.com/p/823252f110b0
*
* RxJava2看這一篇文章就夠了
* https://juejin.im/post/5b17560e6fb9a01e2862246f
*
*
* 基礎(chǔ)類:
* Flowable: 多個流瓜浸,響應(yīng)式流和背壓
* Observable: 多個流,無背壓 (被觀察者)
* Single: 只有一個元素或者錯誤的流
* Completable: 沒有任何元素比原,只有一個完成和錯誤信號的流
* Maybe: 沒有任何元素或者只有一個元素或者只有一個錯誤的流
*
*
* 調(diào)度器種類:
* Schedulers.computation()
* 用于計(jì)算任務(wù)插佛,如事件循環(huán)或和回調(diào)處理,不要用于IO操作(IO操作請使用Schedulers.io())量窘;默認(rèn)線程數(shù)等于處理器的數(shù)量
*
* Schedulers.from(executor) 使用指定的Executor作為調(diào)度器
*
* Schedulers.single() 該調(diào)度器的線程池只能同時執(zhí)行一個線程
*
* Schedulers.io()
* 用于IO密集型任務(wù)雇寇,如異步阻塞IO操作,這個調(diào)度器的線程池會根據(jù)需要增長蚌铜;
* 對于普通的計(jì)算任務(wù)锨侯,請使用Schedulers.computation();
* 默認(rèn)是一個CachedThreadScheduler厘线,很像一個有線程緩存的新線程調(diào)度器
*
* Schedulers.newThread() 為每個任務(wù)創(chuàng)建一個新線程
*
* Schedulers.trampoline() 當(dāng)其它排隊(duì)的任務(wù)完成后识腿,在當(dāng)前線程排隊(duì)開始執(zhí)行。
*
* AndroidSchedulers.mainThread() 主線程造壮,UI線程渡讼,可以用于更新界面
*
*
* 注意:本例部分結(jié)果未打印 因?yàn)樵谶@里有延時但線程已掛掉 正常在Android中使用是沒事的
*
* Created by KYLE on 2019/4/28 - 19:57
*/
fun main() {
// ---------------- `Observable` 創(chuàng)建事件序列的方法 ----------------
println("---------------- `Observable`創(chuàng)建事件序列的方法 ----------------")
create()
interval()
defer()
emptyNeverError()
repeat()
timer()
from()
just()
range()
println()
// ---------------- `Observable` 變換操作 ----------------
println("---------------- `Observable`變換操作 ----------------")
mapCast()
flatMap2contactMap()
flatMapExample()
flatMapIterable()
buffer()
groupBy()
scan()
window()
println()
// ---------------- `Observable` 過濾操作/條件操作符 ----------------
println("---------------- `Observable` 過濾操作/條件操作符 ----------------")
filter()
element()
distinct()
skip()
take()
ignoreElements()
debounce()
ofType()
all()
contains()
isEmpty()
defaultIfEmpty()
amb()
println()
// ---------------- `Observable` 組合操作 ----------------
println("---------------- `Observable` 組合操作 ----------------")
concat()
merge()
startWith()
zip()
combineLast()
reduce()
collect()
count()
println()
// ------------------- 功能操作符/輔助操作 -------------------
println("------------------- 功能操作符/輔助操作 -------------------")
delay()
doSeries()
retry()
subscribeOn()
observeOn()
println()
// ---------------- RxKotlin擴(kuò)展庫 ----------------
println("---------------- RxKotlin擴(kuò)展庫 ----------------")
rkExExample()
println()
// ------------------- 額外 其他 -------------------
println("------------------- 額外 其他 -------------------")
compose()
println()
}
/**
* 使用基本`create()`方法創(chuàng)建事件序列
*/
fun create() {
print("[create]: ")
Observable.create<String> { emitter ->
with(emitter) {
onNext("Hello")
onNext("Handsome")
onNext("Kotlin")
onComplete()
}
}.subscribe(object : Observer<String> {
override fun onSubscribe(d: Disposable) {
print("onSubscribe ")
}
override fun onError(e: Throwable) {}
override fun onComplete() {
print(" onComplete ")
}
override fun onNext(t: String) {
print("$t ")
}
})
println()
}
/**
* 使用`interval()`方法創(chuàng)建事件序列
* 間隔發(fā)射
*/
fun interval() {
print("[interval]: ")
// 每隔1秒發(fā)送一個整數(shù) 從0開始 (默認(rèn)執(zhí)行無數(shù)次 使用`take(int)`方法限制執(zhí)行次數(shù))
val disposable = Observable.interval(0, 1, TimeUnit.SECONDS)
.take(5)
.subscribe { print("$it ") }
if (!disposable.isDisposed) disposable.dispose()
println()
/*
其他重載方法:
public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit, Scheduler scheduler)
public static Observable<Long> interval(long period, TimeUnit unit, Scheduler scheduler)
public static Observable<Long> intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit, Scheduler scheduler)
`initialDelay`參數(shù)用來指示開始發(fā)射第一個整數(shù)的之前要停頓的時間,時間的單位與period一樣耳璧,都是通過unit參數(shù)來指定的成箫;
`period`參數(shù)用來表示每個發(fā)射之間停頓多少時間;
`unit`表示時間的單位旨枯,是TimeUnit類型的蹬昌;
`scheduler`參數(shù)指定數(shù)據(jù)發(fā)射和等待時所在的線程。
*/
}
/**
* 使用`defer()`方法創(chuàng)建事件序列
*
* `defer`直到有觀察者訂閱時才創(chuàng)建Observable攀隔,并且為每個觀察者創(chuàng)建一個新的Observable
* `defer`操作符會一直等待直到有觀察者訂閱它皂贩,然后它使用Observable工廠方法生成一個Observable
*/
fun defer() {
print("[defer]: ")
val observable =
Observable.defer { Observable.just(System.currentTimeMillis()) }
observable.subscribe { print("$it ") } // 454 訂閱時才產(chǎn)生了Observable
print(" ")
observable.subscribe { print("$it ") } // 459 訂閱時才產(chǎn)生了Observable
println()
}
/**
* 使用`empty()` `never()` `error()`方法創(chuàng)建事件序列
*
* public static <T> Observable<T> `empty()`:創(chuàng)建一個不發(fā)射任何數(shù)據(jù)但是正常終止的Observable
* public static <T> Observable<T> `never()`:創(chuàng)建一個不發(fā)射數(shù)據(jù)也不終止的Observable
* public static <T> Observable<T> `error(Throwable exception)`:創(chuàng)建一個不發(fā)射數(shù)據(jù)以一個錯誤終止的Observable
*/
fun emptyNeverError() {
print("[empty]: ")
Observable.empty<String>().subscribeBy(
onNext = { print(" next ") },
onComplete = { print(" complete ") },
onError = { print(" error ") }
)
// `empty()`只會調(diào)用onComplete方法
println()
print("[never]: ")
Observable.never<String>().subscribeBy(
onNext = { print(" next ") },
onComplete = { print(" complete ") },
onError = { print(" error ") }
)
// 什么也不會做
println()
print("[error]: ")
Observable.error<Exception>(Exception()).subscribeBy(
onNext = { print(" next ") },
onComplete = { print(" complete ") },
onError = { print(" error ") }
)
// `error()`只會調(diào)用onError方法
println()
}
/**
* 使用`repeat()`方法創(chuàng)建事件序列
*
* 表示指定的序列要發(fā)射多少次
*/
fun repeat() {
// 重載方法
// public final Observable<T> repeat(long times)
// public final Observable<T> repeatUntil(BooleanSupplier stop)
// public final Observable<T> repeatWhen(Function<? super Observable<Object>, ? extends ObservableSource<?>> handler)
print("[repeat]: ")
// 不指定次數(shù)即無限次發(fā)送(內(nèi)部調(diào)用有次數(shù)的重載方法并傳入Long.MAX_VALUE) 別執(zhí)行啊 ~ 卡的不行 ~
// Observable.range(5, 10).repeat().subscribe { print("$it ") }
Observable.range(5, 10).repeat(1).subscribe { print("$it ") }
println()
print("[repeatUntil]: ")
// repeatUntil在滿足指定要求的時候停止重復(fù)發(fā)送栖榨,否則會一直發(fā)送
// 這里當(dāng)隨機(jī)產(chǎn)生的數(shù)字`<10`時停止發(fā)送 否則繼續(xù) (這里始終為true(即停止重復(fù)) 省的瘋了似的執(zhí)行)
val numbers = arrayOf(0, 1, 2, 3, 4)
numbers.toObservable().repeatUntil {
Random(10).nextInt() < 10
}.subscribe { print("$it ") }
println()
}
/**
* 使用`timer()`方法創(chuàng)建事件序列
*
* 執(zhí)行延時任務(wù)
*
* 創(chuàng)建一個在給定的時間段之后返回一個特殊值的Observable,它在延遲一段給定的時間后發(fā)射一個簡單的數(shù)字0
*/
fun timer() {
print("[timer]: ")
// 在500毫秒之后輸出一個數(shù)字0
val disposable = Observable.timer(500, TimeUnit.MILLISECONDS).subscribe { print("$it ") }
if (!disposable.isDisposed) disposable.dispose()
println()
}
/**
* 使用`from()`方法快捷創(chuàng)建事件隊(duì)列
*
* `fromArray`
* `fromCallable`
* `fromIterable` (和上面的fromArray類似 只不過這里是集合罷了)
*
* 將傳入的數(shù)組依次發(fā)送出來
*/
fun from() {
print("[fromArray]: ")
val names = arrayOf("ha", "hello", "yummy", "kt", "world", "green", "delicious")
// 注意:使用`*`展開數(shù)組
val disposable = Observable.fromArray(*names).subscribe { print("$it ") }
if (!disposable.isDisposed) disposable.dispose()
println()
print("[fromCallable]: ")
// 可以在Callable內(nèi)執(zhí)行一段代碼 并返回一個值給觀察者
Observable.fromCallable { 1 }.subscribe { print("$it ") }
println()
}
/**
* 使用`just()`方法快捷創(chuàng)建事件隊(duì)列
*
* 將傳入的參數(shù)依次發(fā)送出來(最少1個 最多10個)
*/
fun just() {
print("[just]: ")
val disposable = Observable.just("Just1", "Just2", "Just3")
// 將會依次調(diào)用:
// onNext("Just1");
// onNext("Just2");
// onNext("Just3");
// onCompleted();
.subscribe { print("$it ") }
if (!disposable.isDisposed) disposable.dispose()
println()
}
/**
* 使用`range()`方法快捷創(chuàng)建事件隊(duì)列
*
* 創(chuàng)建一個序列
*/
fun range() {
print("[range]: ")
// 用Observable.range()方法產(chǎn)生一個序列
// 用map方法將該整數(shù)序列映射成一個字符序列
// 最后將得到的序列輸出來 forEach內(nèi)部也是調(diào)用的 subscribe(Consumer<? super T> onNext)
val disposable = Observable.range(0, 10)
.map { item -> "range$item" }
// .forEach { print("$it ") }
.subscribeBy(
onNext = { print("$it ") },
onComplete = { print("range complete !!! ") }
)
if (!disposable.isDisposed) disposable.dispose()
println()
}
/**
* 使用`map()` `cast()` 做變換操作
*
* `map`操作符對原始Observable發(fā)射的每一項(xiàng)數(shù)據(jù)應(yīng)用一個你選擇的函數(shù)明刷,然后返回一個發(fā)射這些結(jié)果的Observable婴栽。默認(rèn)不在任何特定的調(diào)度器上執(zhí)行
*
* `cast`操作符將原始Observable發(fā)射的每一項(xiàng)數(shù)據(jù)都強(qiáng)制轉(zhuǎn)換為一個指定的類型`(多態(tài))`,然后再發(fā)射數(shù)據(jù)辈末,它是map的一個特殊版本
*/
fun mapCast() {
print("[map]: ")
Observable.range(1, 5).map { item -> "to String $item" }.subscribe { print("$it ") }
println()
print("[cast]: ")
// 將`Date`轉(zhuǎn)換為`Any` (如果前面的Class無法轉(zhuǎn)換成第二個Class就會出現(xiàn)ClassCastException)
Observable.just(Date()).cast(Any::class.java).subscribe { print("$it ") }
println()
}
/*
`map`與`flatMap`的區(qū)別(出自朱凱):
`map`是在一個 item 被發(fā)射之后愚争,到達(dá) map 處經(jīng)過轉(zhuǎn)換變成另一個 item ,然后繼續(xù)往下走挤聘;
`flapMap`是 item 被發(fā)射之后轰枝,到達(dá) flatMap 處經(jīng)過轉(zhuǎn)換變成一個 Observable
而這個 Observable 并不會直接被發(fā)射出去,而是會立即被激活组去,然后把它發(fā)射出的每個 item 都傳入流中鞍陨,再繼續(xù)走下去。
*/
/**
* 使用`flatMap()` `contactMap()` 做變換操作
*
* `flatMap`將一個發(fā)送事件的上游Observable變換為多個發(fā)送事件的Observables添怔,然后將它們發(fā)射的事件合并后放進(jìn)一個單獨(dú)的Observable里
* `flatMap`不保證順序 `contactMap()`保證順序
*/
fun flatMap2contactMap() {
print("[flatMap]: ")
/*
`flatMap()` 的原理是這樣的:
1. 使用傳入的事件對象創(chuàng)建一個 Observable 對象湾戳;
2. 并不發(fā)送這個 Observable, 而是將它激活,于是它開始發(fā)送事件广料;
3. 每一個創(chuàng)建出來的 Observable 發(fā)送的事件砾脑,都被匯入同一個 Observable ,
而這個 Observable 負(fù)責(zé)將這些事件統(tǒng)一交給 Subscriber 的回調(diào)方法艾杏。
這三個步驟韧衣,把事件拆成了兩級,通過一組新創(chuàng)建的 Observable 將初始的對象『鋪平』之后通過統(tǒng)一路徑分發(fā)了下去购桑。
而這個『鋪平』就是 flatMap() 所謂的 flat畅铭。
*/
Observable.range(1, 5).flatMap {
Observable.just("$it to flat")
}.subscribe { print("$it ") }
println()
print("[contactMap]: ")
Observable.range(1, 5).concatMap {
Observable.just("$it to concat")
}.subscribe { print("$it ") }
println()
}
/**
* `flatMap`挺重要的,再舉一個例子
*
* 依次打印Person集合中每個元素中Plan的action
*/
fun flatMapExample() {
print("[flatMapExample]: ")
arrayListOf(
Person("KYLE", arrayListOf(Plan(arrayListOf("Study RxJava", "May 1 to go home")))),
Person("WEN QI", arrayListOf(Plan(arrayListOf("Study Java", "See a Movie")))),
Person("LU", arrayListOf(Plan(arrayListOf("Study Kotlin", "Play Game")))),
Person("SUNNY", arrayListOf(Plan(arrayListOf("Study PHP", "Listen to music"))))
).toObservable().flatMap {
Observable.fromIterable(it.planList)
}.flatMap {
Observable.fromIterable(it.actionList)
}.subscribeBy(
onNext = { print("$it ") }
)
println()
}
/**
* 使用`flatMapIterable()`做變換操作
*
* 將上流的任意一個元素轉(zhuǎn)換成一個Iterable對象
*/
fun flatMapIterable() {
print("[flatMapIterable]: ")
Observable.range(1, 5)
.flatMapIterable { integer ->
Collections.singletonList("$integer")
}
.subscribe { print("$it ") }
// [flatMapIterable]: 1 2 3 4 5
println()
}
/**
* 使用`buffer()`做變換操作
*
* 用于將整個流進(jìn)行分組
*/
fun buffer() {
print("[buffer]: ")
// 生成一個7個整數(shù)構(gòu)成的流勃蜘,然后使用`buffer`之后硕噩,這些整數(shù)會被3個作為一組進(jìn)行輸出
// count 是一個buffer的最大值
// skip 是指針后移的距離(不定義時就為count)
// 例如 1 2 3 4 5 buffer(3) 的結(jié)果為:[1,2,3] [4,5] (buffer(3)也就是buffer(3,3))
// 例如 1 2 3 4 5 buffer(3,2) 的結(jié)果為:[1,2,3] [3,4,5] [5]
Observable.range(1, 5).buffer(3)
.subscribe {
print("${Arrays.toString(it.toIntArray())} ")
}
println()
}
/**
* 使用`groupBy()`做變換操作
*
* 用于分組元素(根據(jù)groupBy()方法返回的值進(jìn)行分組)
* 將發(fā)送的數(shù)據(jù)進(jìn)行分組,每個分組都會返回一個被觀察者缭贡。
*/
fun groupBy() {
print("[groupBy]: ")
// 使用concat方法先將兩個Observable拼接成一個Observable炉擅,然后對其元素進(jìn)行分組。
// 這里我們的分組依據(jù)是整數(shù)的值阳惹,這樣我們將得到一個Observable<GroupedObservable<Integer, Integer>>類型的Observable谍失。
// 然后,我們再將得到的序列拼接成一個莹汤,并進(jìn)行訂閱輸出
Observable.concat(Observable.concat(
Observable.range(1, 3), Observable.range(1, 4)
).groupBy { it }
).subscribe { print("groupBy: $it ") }
// [groupBy]: groupBy: 1 groupBy: 1 groupBy: 2 groupBy: 2 groupBy: 3 groupBy: 3 groupBy: 4
println()
}
/**
* 使用`scan()`做變換操作
*
* 將數(shù)據(jù)以一定的邏輯聚合起來
*
* scan操作符對原始Observable發(fā)射的第一項(xiàng)數(shù)據(jù)應(yīng)用一個函數(shù)快鱼,然后將那個函數(shù)的結(jié)果作為自己的第一項(xiàng)數(shù)據(jù)發(fā)射。
* 它將函數(shù)的結(jié)果同第二項(xiàng)數(shù)據(jù)一起填充給這個函數(shù)來產(chǎn)生它自己的第二項(xiàng)數(shù)據(jù)。
* 它持續(xù)進(jìn)行這個過程來產(chǎn)生剩余的數(shù)據(jù)序列抹竹。這個操作符在某些情況下被叫做`accumulator`
*/
fun scan() {
print("[scan]: ")
val disposable = Observable.just(1, 2, 3, 4, 5)
.scan { t1: Int, t2: Int -> t1 + t2 }
.subscribe { print("$it ") }
if (!disposable.isDisposed) disposable.dispose()
// [scan]: 1 3 6 10 15
println()
}
/**
* 使用`window()`做變換操作
*
* 將事件分組 參數(shù)`count`就是分的組數(shù)
*
* `window`和`buffer`類似线罕,但不是發(fā)射來自原始Observable的數(shù)據(jù)包,它發(fā)射的是Observable柒莉,
* 這些Observables中的每一個都發(fā)射原始Observable數(shù)據(jù)的一個子集闻坚,最后發(fā)射一個onCompleted通知。
*/
fun window() {
print("[window]: ")
Observable.range(1, 10).window(3)
.subscribeBy(
onNext = { it.subscribe { int -> print("{${it.hashCode()} : $int} ") } },
onComplete = { print("onComplete ") }
)
println()
}
/**
* 使用`filter()`做過濾操作
*
* 對源做過濾
*/
fun filter() {
print("[filter]: ")
// 過濾掉 <=5 的數(shù)據(jù)源 只有 >5 的數(shù)據(jù)源會發(fā)送出去
Observable.range(1, 10).filter { it > 5 }.subscribe { print("$it ") }
println()
}
/**
* 使用`element()`獲取源中指定位置的數(shù)據(jù)
*
* `elementAt` 指定位置
* `firstElement` 第一個
* `lastElement` 最后一個
*/
fun element() {
print("[elementAt]: ")
// `elementAt` 在輸入的 index 超出事件序列的總數(shù)就不會出現(xiàn)任何結(jié)果
// `elementAtOrError` 則會報(bào)異常
// `first...` 和 `last...` 都類似
// 只有index=0的數(shù)據(jù)源會被發(fā)射
Observable.range(1, 10).elementAt(0).subscribe { print("$it ") }
println()
print("[firstElement]: ")
Observable.range(1, 19).firstElement().subscribe { print("$it ") }
println()
print("[lastElement]: ")
Observable.range(34, 2).lastElement().subscribe { print("$it ") }
println()
}
/**
* 使用`distinct()`對源中相同的數(shù)據(jù)進(jìn)行過濾
*/
fun distinct() {
print("[distinct]: ")
Observable.just(1, 1, 1, 2, 3, 4, 1, 5, 5, 6)
.distinct()
.subscribe { print("$it ") }
// [distinct]: 1 2 3 4 5 6
println()
print("[distinctUntilChanged]: ")
Observable.just(1, 1, 1, 2, 3, 4, 1, 5, 5, 6)
.distinctUntilChanged()
.subscribe { print("$it ") }
// [distinctUntilChanged]: 1 2 3 4 1 5 6
println()
}
/**
* 使用`skip()` 過濾掉數(shù)據(jù)的前n項(xiàng)
*
* `skip` 過濾掉數(shù)據(jù)的前n項(xiàng) 參數(shù)count代表跳過事件的數(shù)量
* `skipLast` 與`skip` 功能相反 過濾掉數(shù)據(jù)的后n項(xiàng)
* `skipUntil` 當(dāng) skipUntil() 中的 Observable 發(fā)送事件了兢孝,原來的 Observable 才會發(fā)送事件給觀察者。
* `skipWhile` 可以設(shè)置條件仅偎,當(dāng)某個數(shù)據(jù)滿足條件時不發(fā)送該數(shù)據(jù)跨蟹,反之則發(fā)送。
*/
fun skip() {
print("[skip]: ")
Observable.just(1, 2, 3, 4, 5, 6)
.skip(2)
.subscribe { print("$it ") }
// [skip]: 3 4 5 6
println()
print("[skipUntil]: ")
Observable.just(6)
.skipUntil<Int> { Observable.just(2).delay(2, TimeUnit.SECONDS).subscribe { print("$it ") } }
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.io())
.subscribe { print("$it ") }
println()
print("[skipWhile]: ")
Observable.just(1, 2, 3, 4)
.skipWhile {
it < 3
}.subscribe { print("$it ") }
// [skipWhile]: 3 4
println()
}
/**
* 使用`take()` 取數(shù)據(jù)的前n項(xiàng)
*
* `take` 取數(shù)據(jù)的前n項(xiàng) 參數(shù)count代表取的事件的數(shù)量
* `takeLast` 與`take`功能相反 取數(shù)據(jù)的后n項(xiàng)
* `takeUntil`
* `takeWhile`
*
* 與`skip...`對應(yīng)
*/
fun take() {
print("[take]: ")
Observable.range(1, 5).take(2).subscribe { print("$it ") }
// [take]: 1 2
println()
}
/**
* 使用`ignoreElements()` 過濾所有源Observable產(chǎn)生的結(jié)果
*
* 只會把Observable的`onComplete`和`onError`事件通知給訂閱者
*/
fun ignoreElements() {
print("[ignoreElements]: ")
Observable.just(1, 1, 2, 3, 4)
.ignoreElements()
.subscribeBy(
onComplete = { print(" onComplete ") },
onError = { print(" onError ") }
// 沒有`onNext`
)
println()
}
/**
* 使用`debounce()` 限制發(fā)射頻率過快
*
* 如果兩件事件發(fā)送的時間間隔小于設(shè)定的時間間隔則`前一件`事件就不會發(fā)送給觀察者
*/
fun debounce() {
print("[debounce]: ")
Observable.create<Int> { emitter ->
emitter.onNext(1)
Thread.sleep(900)
emitter.onNext(2)
}.debounce(1, TimeUnit.SECONDS)
.subscribe { print("$it ") }
// 2
println()
}
/**
* 使用`ofType()` 過濾不符合該類型事件
*/
fun ofType() {
print("[ofType]: ")
Observable.just(1, 2, 3, "k", "Y")
.ofType(String::class.java)
.subscribe { print("$it ") }
// [ofType]: k Y
println()
}
/**
* `all`
*
* 判斷事件序列是否全部滿足某個事件橘沥,如果都滿足則返回 true窗轩,反之則返回 false
*/
fun all() {
print("[all]: ")
Observable.just(1, 2, 3, 4)
.all { it < 5 }
.subscribe(Consumer {
print("$it ")
})
// [all]: true
println()
}
/**
* `contains`
*
* 判斷事件序列中是否含有某個元素,如果有則返回 true座咆,如果沒有則返回 false痢艺。
*/
fun contains() {
print("[contains]: ")
Observable.just(1, 2, 3, 4)
.contains(3)
.subscribe(Consumer {
print("$it ")
})
// [contains]: true
println()
}
/**
* `isEmpty`
*
* 判斷事件序列是否為空 是返回true 否返回false
*/
fun isEmpty() {
print("[isEmpty]: ")
Observable.create<String> { emitter ->
emitter.onComplete()
}.isEmpty
.subscribe(Consumer {
print("$it ")
})
// [isEmpty]: true
println()
}
/**
* `defaultIfEmpty`
*
* 如果觀察者只發(fā)送一個 onComplete() 事件,則可以利用這個方法發(fā)送一個值介陶。
*/
fun defaultIfEmpty() {
print("[defaultIfEmpty]: ")
Observable.create<Int> { emitter ->
emitter.onComplete()
}.defaultIfEmpty(666)
.subscribe { print("$it ") }
// [defaultIfEmpty]: 666
println()
}
/**
* `amb`
*
* amb() 要傳入一個 Observable 集合堤舒,但是只會發(fā)送最先發(fā)送事件的 Observable 中的事件,其余 Observable 將會被丟棄哺呜。
*/
fun amb() {
print("[amb]: ")
val list = ArrayList<Observable<Long>>()
list.add(Observable.intervalRange(1, 5, 2, 1, TimeUnit.SECONDS))
list.add(Observable.intervalRange(6, 5, 0, 1, TimeUnit.SECONDS))
Observable.amb(list)
.subscribe { print("$it ") }
// [amb]: 6 7 8 9 10
println()
}
/**
* 使用`concat()`做組合操作
*
* 將多個Observable拼接起來舌缤,但是它會嚴(yán)格按照傳入的Observable的順序進(jìn)行發(fā)射,一個Observable沒有發(fā)射完畢之前不會發(fā)射另一個Observable里面的數(shù)據(jù)
*
* `concat()`方法內(nèi)部還是調(diào)用的`concatArray(source1, source2)`方法某残,只是在調(diào)用前對傳入的參數(shù)做了`null`判斷
*
* 與 `merge()` 作用基本一樣国撵,只是 `merge()` 是并行發(fā)送事件,而 concat() 串行發(fā)送事件
*/
fun concat() {
print("[concat]: ")
val disposable = Observable.concat(Observable.range(1, 5), Observable.range(6, 5))
.subscribe { print("$it ") }
if (!disposable.isDisposed) disposable.dispose()
// [concat]: 1 2 3 4 5 6 7 8 9 10
println()
}
/**
* 使用`merge` 做組合操作
*
* 讓多個數(shù)據(jù)源的數(shù)據(jù)合并起來進(jìn)行發(fā)射(merge后的數(shù)據(jù)可能會交錯發(fā)射)
* 與 `concat()` 作用基本一樣玻墅,只是 `concat()` 是串行發(fā)送事件介牙,而 merge() 并行發(fā)送事件
*
* 內(nèi)部實(shí)際操作為調(diào)用了`fromArray()+flatMap`方法 只是在調(diào)用前對數(shù)據(jù)源參數(shù)做了`null`判斷
*
* 與`mergeError`的比較
* `mergeError`方法與`merge`方法的表現(xiàn)一致,
* 只是在處理由`onError`觸發(fā)的錯誤的時候有所不同澳厢。
* `mergeError`方法會等待所有的數(shù)據(jù)發(fā)射完畢之后才把錯誤發(fā)射出來环础,即使多個錯誤被觸發(fā),該方法也只會發(fā)射出一個錯誤信息赏酥。
* 而如果使用`merger`方法喳整,那么當(dāng)有錯誤被觸發(fā)的時候,該錯誤會直接被拋出來裸扶,并結(jié)束發(fā)射操作
*/
fun merge() {
print("[merge]: ")
Observable.merge(Observable.range(1, 5), Observable.range(6, 5))
.subscribe { print("$it ") }
// [merge]: 1 2 3 4 5 6 7 8 9 10
println()
}
/**
* 使用`startWith` 做組合操作 在發(fā)送事件之前追加事件
*
* `startWith` 追加一個事件
* `startWithArray` 追加多個事件
*
* `追加的事件會先發(fā)出`
*
* `startWith`方法可以用來在指定的數(shù)據(jù)源的之前插入幾個數(shù)據(jù)
*/
fun startWith() {
print("[startWith]: ")
Observable.range(5, 3)
.startWithArray(1, 2, 3, 4)
.startWith(0).subscribe { print("$it ") }
// [startWith]: 0 1 2 3 4 5 6 7
println()
}
/**
* 使用`zip` 做組合操作
*
* 用來將多個數(shù)據(jù)項(xiàng)進(jìn)行合并 根據(jù)各個被觀察者發(fā)送事件的順序一個個結(jié)合起來框都,最終發(fā)送的事件數(shù)量會與源 Observable 中最少事件的數(shù)量一樣
* 為什么呢?因?yàn)閿?shù)據(jù)源少的那個 Observable 發(fā)送完成后發(fā)送了 onComplete 方法,所以數(shù)據(jù)源多的那個就不會再發(fā)送事件了
*/
fun zip() {
print("[zip]: ")
Observable.zip(Observable.range(1, 6), Observable.range(6, 5),
BiFunction<Int, Int, Int> { t1, t2 -> t1 * t2 })
.subscribe { print("$it ") }
// 1 2 3 4 5 6
// 6 7 8 9 10
// 看上面兩行再看結(jié)果很明顯了吧
// [zip]: 6 14 24 36 50
println()
}
/**
* 使用`combineLast` 做組合操作
*
* 用第一個數(shù)據(jù)源的最后一項(xiàng)和第二個數(shù)據(jù)源的每一項(xiàng)做合并
*/
fun combineLast() {
print("[combineLast]: ")
Observable.combineLatest(Observable.range(1, 6), Observable.range(6, 5),
BiFunction<Int, Int, Int> { t1, t2 -> t1 * t2 })
.subscribe { print("$it ") }
// 1 2 3 4 5 6
// 6 7 8 9 10
// 看上面兩行再看結(jié)果很明顯了吧
// [combineLast]: 36 42 48 54 60
println()
}
/**
* 使用`reduce` 做組合操作
*
* 與 scan() 操作符的作用一樣也是將發(fā)送數(shù)據(jù)以一定邏輯聚合起來魏保,
* 這兩個的區(qū)別在于 scan() 每處理一次數(shù)據(jù)就會將事件發(fā)送給觀察者熬尺,而 reduce() 會將所有數(shù)據(jù)聚合在一起才會發(fā)送事件給觀察者
*/
fun reduce() {
print("[reduce]: ")
Observable.just(0, 1, 2, 3)
.reduce { t1, t2 -> t1 + t2 }
.subscribe { print("$it ") }
// [reduce]: 6
println()
}
/**
* 使用`collect` 做組合操作
*
* 將數(shù)據(jù)收集到數(shù)據(jù)結(jié)構(gòu)當(dāng)中
*/
fun collect() {
print("[collect]: ")
// `collect`接收兩個參數(shù) 第一個是要收集到的數(shù)據(jù)解構(gòu) 第二個是數(shù)據(jù)到數(shù)據(jù)結(jié)構(gòu)中的操作
Observable.just(1, 2, 3, 4)
.collect({ ArrayList<Int>() }, { t1, t2 -> t1.add(t2) })
.subscribe(Consumer<ArrayList<Int>> {
print("$it ")
})
// [collect]: [1, 2, 3, 4]
println()
}
/**
* 使用`count` 做組合操作
*
* 返回被觀察者發(fā)送事件的數(shù)量
*/
fun count() {
print("[count]: ")
Observable.just(1, 2, 3)
.count()
.subscribe(Consumer {
print("$it ")
})
// [count]: 3
println()
}
/**
* 用于在發(fā)射數(shù)據(jù)之前停頓指定的時間
*/
fun delay() {
print("[delay]: ")
Observable.range(1, 5).delay(1, TimeUnit.SECONDS).subscribe { print("$it ") }
println()
}
/**
* do 系列
*/
fun doSeries() {
print("[doSeries]: ")
// `doOnEach` 當(dāng)每個`onNext`調(diào)用[前]觸發(fā) 并可取出`onNext`發(fā)送的值 但是方法參數(shù)是一個`Notification<T>`的包裝 可以通過`.value`取出`onNext`的值
// `doOnNext` 在每個`onNext`調(diào)用[前]觸發(fā) 并可取出`onNext`發(fā)送的值 方法參數(shù)就是`onNext`的值
// `doAfterNext` 在每個`onNext`調(diào)用[后]觸發(fā) 并可取出`onNext`發(fā)送的值 方法參數(shù)就是`onNext`的值
// `doOnComplete` 在`onComplete`調(diào)用[前]觸發(fā)
// `doOnError` 在`onError`調(diào)用[前]觸發(fā)
// `doOnSubscribe` 在`onSubscribe`調(diào)用[前]觸發(fā)
// `doOnDispose` 在調(diào)用 Disposable 的 dispose() 之[后]回調(diào)該方法
// `doOnTerminate ` 在 onError 或者 onComplete 發(fā)送之[前]回調(diào)
// `doAfterTerminate ` 在onError 或者 onComplete 發(fā)送之[后]回調(diào) 取消訂閱后就不會回調(diào)
// `doFinally` 在所有事件發(fā)送完畢之后回調(diào)該方法 即使取消訂閱也會回調(diào)
// `onErrorReturn` 當(dāng)接受到一個 onError() 事件之后回調(diào),返回的值會回調(diào) onNext() 方法谓罗,并正常結(jié)束該事件序列
// `onErrorResumeNext` 當(dāng)接收到 onError() 事件時粱哼,返回一個新的 Observable,并正常結(jié)束事件序列
// `onExceptionResumeNext` 與 onErrorResumeNext() 作用基本一致檩咱,但是這個方法只能捕捉 Exception
// Test Code:
Observable.create<String> { emitter ->
emitter.onNext("K")
emitter.onNext("Y")
emitter.onNext("L")
emitter.onNext("E")
emitter.onComplete()
}.doOnTerminate {
print("doOnNext: $ ")
}.subscribeBy(
onNext = { print("accept: $it ") },
onComplete = { print(" onComplete ") },
onError = { print(" onError ") }
)
println()
}
/**
* `retry`
*
* 另:`retryUntil` 出現(xiàn)錯誤事件之后揭措,可以通過此方法判斷是否繼續(xù)發(fā)送事件 true不重試 false重試
*
* 如果出現(xiàn)錯誤事件,則會重新發(fā)送所有事件序列刻蚯。times 是代表重新發(fā)的次數(shù)绊含。
*/
fun retry() {
print("[retry]: ")
Observable.create<String> { emitter ->
emitter.onNext("K")
emitter.onError(Exception("404"))
}
.retry(2)
.subscribeBy(
onNext = { print("accept: $it ") },
onComplete = { print(" onComplete ") },
onError = { print(" onError ") }
)
// [retry]: accept: K accept: K accept: K onError
// 重試了2次
println()
}
/**
* `subscribeOn`
*
* 指定被觀察者的線程,要注意的時炊汹,如果多次調(diào)用此方法躬充,只有第一次有效。
*/
fun subscribeOn() {
print("[subscribeOn]: ")
// Observable.create<String> { emitter ->
// emitter.onNext("K")
// print("current thread: ${Thread.currentThread().name}")
// }.subscribeOn(Schedulers.computation()).subscribe()
// current thread: RxComputationThreadPool-2
println()
}
/**
* `observeOn`
*
* 指定觀察者的線程讨便,每指定一次就會生效一次充甚。
*/
fun observeOn() {
print("[observeOn]: ")
// Observable.create<String> { emitter ->
// emitter.onNext("K")
// }.observeOn(Schedulers.io())
// .subscribe { print("current thread: ${Thread.currentThread().name}") }
// current thread : RxCachedThreadScheduler -1
println()
}
/**
* RxKotlin擴(kuò)展庫的一個簡單使用
* 也是RxKotlin官方給出的一個例子
*
* 更多查看:https://github.com/ReactiveX/RxKotlin/blob/2.x/README.md
*/
fun rkExExample() {
print("[rkExExample]: ")
val list = listOf("Alpha", "Beta", "Gamma", "Delta", "Epsilon")
// 相當(dāng)于是Observable.fromIterable(this) 和上面的fromArray()類似 一個數(shù)組 一個集合
list.toObservable() // extension function for Iterables
.filter { it.length > 5 }
.subscribeBy( // 對應(yīng)上面`create`創(chuàng)建方式的最后調(diào)用的subscribe
onNext = { print("$it ") },
onError = { it.printStackTrace() },
onComplete = { print(" Done! ") }
)
// Result:
// [rkExExample]: Epsilon Done!
println()
}
/**
* `compose`
* 與`Transformer`連用
*
* `compose`操作符和Transformer結(jié)合使用,一方面讓代碼看起來更加簡潔化霸褒,另一方面能夠提高代碼的復(fù)用性伴找。
* RxJava提倡鏈?zhǔn)秸{(diào)用,`compose`能夠防止鏈?zhǔn)奖淮蚱啤? *
* compose操作于整個數(shù)據(jù)流中傲霸,能夠從數(shù)據(jù)流中得到原始的Observable<T>/Flowable<T>...
* 當(dāng)創(chuàng)建Observable/Flowable...時疆瑰,compose操作符會立即執(zhí)行,而不像其他的操作符需要在onNext()調(diào)用后才執(zhí)行
*/
fun compose() {
println("[compose]: ")
Observable.just(1, 2)
.compose(transformerInt2String())
.compose(applySchedulers())
.subscribe {
print("$it ")
if (it == "1") print(" ${Thread.currentThread().name} ")
}
println()
}
// 用于`flatMap`舉例子
data class Person(private val name: String, val planList: List<Plan>)
data class Plan(val actionList: List<String>)
// 用于`compose`舉例子
// 將發(fā)射的Int轉(zhuǎn)換為String
fun transformerInt2String() = ObservableTransformer<Int, String> { upstream -> upstream.map { int -> "$int" } }
// 切換線程
fun <T> applySchedulers() =
ObservableTransformer<T, T> { upstream -> upstream.observeOn(Schedulers.io()).subscribeOn(Schedulers.io()) }