前言
Flow
是kotlin協(xié)程中的流围橡。RxJava就是流式編程的庫。Flow屬于冷流對應RxJava中的Observable Flowable Single MayBe和Completable等战坤。Kotlin協(xié)程中的熱流實現(xiàn)MutableSharedFlow和MutableStateFlow等,對應RxJava中熱流PublisherSubject和BehaviorSubject残拐。
- 冷流:較少的訪問和修改
- 熱流:頻繁地讀取和更新
Flow使用
fun main() {
runBlocking (Dispatchers.Default){
// 發(fā)送10個元素途茫,從0到9
val myFlow = flow {
repeat(10){
emit(it)
}
}
launch {
myFlow.collect{
println("Coroutine1:$it")
}
}
launch {
myFlow.collect{
println("Coroutine2:$it")
}
}
}
}
協(xié)程1和2通過Flow.collect
訂閱Flow。
fun main() {
runBlocking (Dispatchers.Default){
// 發(fā)送10個元素溪食,從0到9
val myFlow = flow {
repeat(10){
// 修改原來的CoroutineContext,會異常
withContext(Dispatchers.IO){
emit(it)
}
}
}
launch {
myFlow.collect{
println("Coroutine1:$it")
}
}
}
}
Flow限制囊卜,不能修改原來的CoroutineContext。可以使用ChannelFlow
就能正常使用栅组。
fun main() {
runBlocking (Dispatchers.Default){
// 發(fā)送10個元素雀瓢,從0到9
val myFlow = channelFlow {
repeat(10){
// 可以修改原來的CoroutineContext
withContext(Dispatchers.IO){
channel.send(it)
}
}
}
launch {
myFlow.collect{
println("Coroutine1:$it")
}
}
}
}
fun main() {
runBlocking(Dispatchers.Default) {
// 發(fā)送10個元素,從0到9
val myFlow = flow {
repeat(10) {
try {
emit(it)
} catch (e: Throwable) {
emit(22)
}
}
}
launch {
myFlow.collect {
if (it == 2) {
// 這里出現(xiàn)異常后玉掸,collect訂閱就結束了(只打印2次刃麸,第三次就異常了)
error("Error")
}
println("Coroutine1:$it")
}
}
}
}
Flow中collect異常,那么訂閱就結束了司浪。
Flow工作原理
public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)
private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {
override suspend fun collectSafely(collector: FlowCollector<T>) {
collector.block()
}
}
public final override suspend fun collect(collector: FlowCollector<T>) {
val safeCollector = SafeCollector(collector, coroutineContext)
try {
collectSafely(safeCollector)
} finally {
safeCollector.releaseIntercepted()
}
}
創(chuàng)建SafeFlow,繼承于AbstractFlow,訂閱調用的是collect泊业。檢查當前CoroutineContext和調用的collect方法傳入的是否一致,不一致就拋出異常啊易。 Flow被SafeCollector代理去檢查異常吁伺。 轉換前的流稱上游Upstream
,處理后再發(fā)送到下游Downstream
flatMap操作符
類似于RxJava中的concatMap操作符
fun main() {
runBlocking(Dispatchers.Default) {
val myFlow = flow {
repeat(3) {
emit(it)
}
}
launch {
// 將原來的流元素構建成一個新的流(按照原來的流元素輸出)
myFlow.flatMapConcat { upstreamValue ->
flow {
delay(1000L - upstreamValue * 100)
repeat(2) {
emit(upstreamValue * 10 + it)
}
}
}.collect {
println("collect $it")
}
}
}
}
輸出:
collect 0
collect 1
collect 10
collect 11
collect 20
collect 21
將原來發(fā)送3個元素,通過flatMapConcat()
發(fā)送兩個元素认罩。越是先發(fā)送的元素延遲時間越長,然后按順序輸出6個元素续捂。
flatMapMerge
類似于RxJava中的flatMap
fun main() {
runBlocking(Dispatchers.Default) {
val myFlow = flow {
repeat(3) {
emit(it)
}
}
launch {
// 將原來的流元素構建成一個新的流(默認并發(fā)16個)誰先執(zhí)行完就發(fā)送誰
myFlow.flatMapMerge { upstreamValue ->
flow {
delay(1000L - upstreamValue * 100)
repeat(2) {
emit(upstreamValue * 10 + it)
}
}
}.collect {
println("collect $it")
}
}
}
}
輸出:
collect 20
collect 21
collect 10
collect 11
collect 0
collect 1
不會保證原來的順序垦垂,哪個流先處理完就先發(fā)送數(shù)據(jù)。concurrency
默認值16牙瓢,并行執(zhí)行的數(shù)量劫拗。當concurrency為1時和flatMapConcat一樣。
fun main() {
runBlocking(Dispatchers.Default) {
val myFlow = flow {
repeat(3) {
emit(it)
}
}
launch {
// 將原來的流元素構建成一個新的流(并發(fā)數(shù)是2矾克,達到2個的時候等待然后再執(zhí)行下一個)
myFlow.flatMapMerge(2) { upstreamValue ->
flow {
delay(1000L - upstreamValue * 100)
repeat(2) {
emit(upstreamValue * 10 + it)
}
}
}.collect {
println("collect $it")
}
}
}
}
輸出:
collect 10
collect 11
collect 0
collect 1
collect 20
collect 21
flatMapLatest
類似于RxJava中的switchMap
fun main() {
runBlocking(Dispatchers.Default) {
val myFlow = flow {
repeat(3) {
emit(it)
}
}
launch {
// 前面沒執(zhí)行完的Flow會被取消页慷,然后被后續(xù)的Flow替換
myFlow.flatMapLatest { upstreamValue ->
flow {
delay(1000L - upstreamValue * 100)
repeat(2) {
emit(upstreamValue * 10 + it)
}
}
}.collect {
println("collect $it")
}
}
}
}
輸出:
collect 20
collect 21
總結
介紹了Flow常用的操作符map flatMap(串式) flatMapMerge(并發(fā)) flatmapLatest(取代舊的)等簡單使用胁附。