干掉RxJava系列--2. 手寫FlowBus替代RxBus/EventBus/LiveDataBus

LiveData的不足

  • LiveData 是一個專用于 Android 的具備自主生命周期感知能力的可觀察的數(shù)據(jù)存儲器類席吴,被有意簡化設(shè)計,這使得開發(fā)者很容易上手,但其不足有如下兩點:
  1. LiveData只能在主線程更新數(shù)據(jù)(postValue底層也是切換到主線程的,而且可能會有丟數(shù)據(jù)的問題)抢腐;
  2. LiveData操作符不夠強大, 對于較為復(fù)雜的交互數(shù)據(jù)流場景姑曙,建議使用 RxJava 或 Flow;
  3. LiveData與Android平臺緊密相連,雖然LiveData在表現(xiàn)層中運行良好迈倍,但它并不適合領(lǐng)域?qū)由丝浚驗轭I(lǐng)域?qū)幼詈檬仟毩⒂谄脚_的;
  • LiveData 對于 Java 開發(fā)者、初學(xué)者或是一些簡單場景而言仍是可行的解決方案啼染。對于MVVM架構(gòu)而言宴合,View和ViewModel之間可以通過LiveData交互(看了下面就知道其實也可以用StateFlow), ViewModel和Repository之間就可以通過Flow交互;

RxJava的不足

  • RxJava還是相當(dāng)強大的,基于事件流的鏈?zhǔn)秸{(diào)用迹鹅,進(jìn)行耗時任務(wù)卦洽,線程切換,是一個很好的異步操作庫, 但是對于Android開發(fā)來說其也有一些不足之處
  1. 強大意味著復(fù)雜斜棚,其繁多的操作符簡直是初學(xué)者的噩夢阀蒂;
  2. 它是非官方的,google自然也就不會花大力氣去推廣和優(yōu)化弟蚀;
  3. 為項目的包體積帶來了額外的增加蚤霞;

Flow

  • Flow 是一種 "冷流"(Cold Stream)。"冷流" 是一種數(shù)據(jù)源义钉,該類數(shù)據(jù)源的生產(chǎn)者會在每個監(jiān)聽者開始消費事件的時候執(zhí)行(即不消費則不生產(chǎn)數(shù)據(jù)昧绣,而LiveData的發(fā)送端并不依賴于接收端),從而在每個訂閱上創(chuàng)建新的數(shù)據(jù)流(有多個訂閱者的時候捶闸,他們各自的事件是獨立的)夜畴。一旦消費者停止監(jiān)聽或者生產(chǎn)者的阻塞結(jié)束,數(shù)據(jù)流將會被自動關(guān)閉删壮。
  • Flow 是 Kotlin 協(xié)程與響應(yīng)式編程模型結(jié)合的產(chǎn)物贪绘,支持線程切換、背壓央碟,通過協(xié)程取消功能提供自動清理功能兔簇,因此傾向于執(zhí)行一些重型任務(wù)。
  • 使用 take, first, toList 等操作符可以簡化 Flow 的相關(guān)代碼測試硬耍。
  • Flow本身并不了解Android的生命周期,也不提供Android生命周期狀態(tài)變化時收集器的自動暫停和恢復(fù),可以使用LifecycleCoroutineScope的擴展边酒,如 launchWhenStarted來啟動coroutine來收集我們的Flow--這些收集器將自動暫停经柴,并與組件的Lifecycle同步恢復(fù)。
  • 相較于 Channel墩朦,F(xiàn)low 末端操作符 會觸發(fā)數(shù)據(jù)流的執(zhí)行坯认,同時會根據(jù)生產(chǎn)者一側(cè)流操作來決定是成功完成操作還是拋出異常,因此 Flows 會自動地關(guān)閉數(shù)據(jù)流,不會在生產(chǎn)者一側(cè)泄漏資源牛哺;而一旦 Channel 沒有正確關(guān)閉陋气,生產(chǎn)者可能不會清理大型資源,因此 Channels 更容易造成資源泄漏引润。

Flow的一些常用操作符

//        val flow = flowOf(1,2,3,4,5)
//        val flow: Flow<Int> = flow {
//            List(20) {
//                emit(it)//發(fā)送數(shù)據(jù)
//                delay(300)
//            }
//        }
val flow = (1..10).asFlow()
lifecycleScope.launch {
    flow.flowOn(Dispatchers.IO)//設(shè)定它運行時所使用的調(diào)度器,設(shè)置的調(diào)度器只對它之前的操作有影響
        .onStart { log("onStart") }
        .flowOn(Dispatchers.Main)
        .onEach {
            log("onEach:$it")
            delay(300)
        }
        .filter {//過濾
            it % 2 == 0
        }
        .map {//變換
            log("map:$it*$it")
            it * it
        }
        .transform<Int,String> {
            "num=$it"
//                    emit("num1=$it")
//                    emit("num2=$it")
        }
        .flowOn(Dispatchers.IO)
        .onCompletion {//訂閱流的完成,執(zhí)行在流完成時的邏輯
            log("onCompletion: $it")
        }
        .catch {//捕獲 Flow 的異常,catch 函數(shù)只能捕獲它的上游的異常
            log("catch: $it")
        }
        .flowOn(Dispatchers.Main)
        .collect {//消費Flow
            log("collect1_1: $it")
        }
    //Flow 可以被重復(fù)消費
    flow.collect { log("collect1_2: $it") }
    //除了可以在 collect 處消費 Flow 的元素以外巩趁,還可以通過 onEach 來做到這一點。
    // 這樣消費的具體操作就不需要與末端操作符放到一起淳附,collect 函數(shù)可以放到其他任意位置調(diào)用
    flow.onEach {
        log("onEach2:$it")
    }
    withContext(Dispatchers.IO) {
        delay(1000)
        flow.collect()
    }
    //除了使用子協(xié)程執(zhí)行上流外议慰,我們還可以使用launchIn函數(shù)來讓Flow使用全新的協(xié)程上下文
    flow.onEach {
        log("onEach2:$it")
    }.launchIn(CoroutineScope(Dispatchers.IO))
        .join()//主線程等待這個協(xié)程執(zhí)行結(jié)束

Flow的取消

lifecycleScope.launch(Dispatchers.IO) {
    val flow2 = (1..10).asFlow().onEach { delay(1000) }
    val job: Job = lifecycleScope.launch {
        log("lifecycleScope.launch")
        flow2.flowOn(Dispatchers.IO)//設(shè)定它運行時所使用的調(diào)度器
            .collect {//消費Flow
                log("flow2:$it")
            }
    }
    delay(2000)
    job.cancelAndJoin()
}

Flow 的背壓

  • 只要是響應(yīng)式編程,就一定會有背壓問題奴曙,我們先來看看背壓究竟是什么别凹。
  • 背壓問題在生產(chǎn)者的生產(chǎn)速率高于消費者的處理速率的情況下出現(xiàn)。為了保證數(shù)據(jù)不丟失洽糟,我們也會考慮添加緩存來緩解問題:
//為 Flow 添加緩沖
flow {
    List(5) {
        emit(it)
    }
}.buffer().collect {
    log("flow buffer collect:$it")
}
  • 也可以為 buffer 指定一個容量炉菲。不過,如果我們只是單純地添加緩存坤溃,而不是從根本上解決問題就始終會造成數(shù)據(jù)積壓拍霜。
  • 問題產(chǎn)生的根本原因是生產(chǎn)和消費速率的不匹配,除直接優(yōu)化消費者的性能以外浇雹,我們也可以采取一些取舍的手段沉御。
  • 第一種是 conflate。與 Channel 的 Conflate 模式一致昭灵,新數(shù)據(jù)會覆蓋老數(shù)據(jù)吠裆,
flow {
    List(10) {
        emit(it)
    }
}
.conflate()
.collect { value ->
    log("flow conflate Collecting $value")
    delay(100)
    log("$value collected flow conflate ")
}
  • 第二種是 collectLatest。顧名思義烂完,只處理最新的數(shù)據(jù)试疙,這看上去似乎與 conflate 沒有區(qū)別,其實區(qū)別大了:它并不會直接用新數(shù)據(jù)覆蓋老數(shù)據(jù)抠蚣,而是每一個都會被處理祝旷,只不過如果前一個還沒被處理完后一個就來了的話,處理前一個數(shù)據(jù)的邏輯就會被取消除 collectLatest 之外還有 mapLatest嘶窄、flatMapLatest 等等怀跛,都是這個作用。
flow {
    List(10) {
        emit(it)
    }
}.collectLatest { value ->
    log("flow collectLatest Collecting $value")
    delay(100)
    log("$value collected flow collectLatest ")
}

使用更為安全的方式收集 Android UI 數(shù)據(jù)流

  • 在 Android 開發(fā)中柄冲,請使用 LifecycleOwner.addRepeatingJob吻谋、suspend Lifecycle.repeatOnLifecycle 或 Flow.flowWithLifecycle 從 UI 層安全地收集數(shù)據(jù)流。(lifecycle-runtime-ktx:2.4.+ 庫中所提供的)
lifecycleScope.launch {
    delay(500)
    repeatOnLifecycle(Lifecycle.State.STARTED) {
        flow.collect { log("collect2: $it") }
    }
}
lifecycleScope.launchWhenStarted {
    delay(1000)
    flow.collect { log("collect3: $it") }
}
lifecycleScope.launch {
    delay(1500)
    flow.flowWithLifecycle(lifecycle,Lifecycle.State.STARTED)
        .collect { log("collect4: $it") }
}

SharedFlow

  • 冷流和訂閱者只能是一對一的關(guān)系现横,當(dāng)我們要實現(xiàn)一個流漓拾,多個訂閱者的需求時阁最,就需要熱流了,SharedFlow就是一種熱流
  • 其構(gòu)造函數(shù)如下
public fun <T> MutableSharedFlow(
    replay: Int = 0,//當(dāng)新的訂閱者Collect時骇两,發(fā)送幾個已經(jīng)發(fā)送過的數(shù)據(jù)給它速种,默認(rèn)為0,即默認(rèn)新訂閱者不會獲取以前的數(shù)據(jù)
    extraBufferCapacity: Int = 0,//表示減去replay低千,MutableSharedFlow還緩存多少數(shù)據(jù)配阵,默認(rèn)為0
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND//表示緩存策略,即緩沖區(qū)滿了之后Flow如何處理
    //BufferOverflow.SUSPEND 策略栋操,也就是掛起策略, 默認(rèn)為掛起
    //BufferOverflow.DROP_OLDEST: 丟棄舊數(shù)據(jù)
    //BufferOverflow.DROP_LATEST: 丟棄最新的數(shù)據(jù)
)
  • 簡單使用如下
val sharedFlow = MutableSharedFlow<String>()
lifecycleScope.launch(Dispatchers.IO) {
    delay(1000)
    sharedFlow.emit("aaa")
    delay(1000)
    sharedFlow.emit("bbb")
    delay(1000)
    sharedFlow.emit("ccc")
}
lifecycleScope.launch {
    delay(500)
    sharedFlow.collect { log("collect1:$it") }
}
lifecycleScope.launch {
    delay(1500)
    sharedFlow.collect { log("collect2:$it") }
}
lifecycleScope.launch {
    delay(2500)
    sharedFlow.collect { log("collect3:$it") }
}
lifecycleScope.launch {
    delay(3500)
    sharedFlow.collect { log("collect4:$it") }
}
  • 將冷流Flow轉(zhuǎn)化為SharedFlow
lifecycleScope.launch {
    (1..5).asFlow().shareIn(
        //1. 共享開始時所在的協(xié)程作用域范圍
        scope = lifecycleScope,
        //2. 控制共享的開始和結(jié)束的策略
        // started = SharingStarted.Lazily,//當(dāng)首個訂閱者出現(xiàn)時開始闸餐,在scope指定的作用域被結(jié)束時終止
        // started = SharingStarted.Eagerly,//立即開始,而在scope指定的作用域被結(jié)束時終止
        //對于那些只執(zhí)行一次的操作矾芙,您可以使用Lazily或者Eagerly舍沙。然而,如果您需要觀察其他的流剔宪,就應(yīng)該使用WhileSubscribed來實現(xiàn)細(xì)微但又重要的優(yōu)化工作
        //WhileSubscribed策略會在沒有收集器的情況下取消上游數(shù)據(jù)流
        started = SharingStarted.WhileSubscribed(
            500,//stopTimeoutMillis 控制一個以毫秒為單位的延遲值拂铡,指的是最后一個訂閱者結(jié)束訂閱與停止上游流的時間差。默認(rèn)值是 0(比如當(dāng)用戶旋轉(zhuǎn)設(shè)備時葱绒,原來的視圖會先被銷毀感帅,然后數(shù)秒鐘內(nèi)重建)
            Long.MAX_VALUE//replayExpirationMillis表示數(shù)據(jù)重播的過時時間,如果用戶離開應(yīng)用太久地淀,此時您不想讓用戶看到陳舊的數(shù)據(jù)失球,你可以用到這個參數(shù)
        ),
        //3. 狀態(tài)流的重播個數(shù)
        replay = 0
    ).collect { log("shareIn.collect:$it") }
}

StateFlow

  • StateFlow繼承于SharedFlow,是SharedFlow的一個特殊變種
  • 構(gòu)造函數(shù)如下殷绍,只需要傳入一個默認(rèn)值
public fun <T> MutableStateFlow(value: T): MutableStateFlow<T> = StateFlowImpl(value ?: NULL)
  • StateFlow本質(zhì)上是一個replay為1部脚,并且沒有緩沖區(qū)的SharedFlow,因此第一次訂閱時會先獲得默認(rèn)值
  • StateFlow僅在值已更新忱叭,并且值發(fā)生了變化時才會返回瘫证,即如果更新后的值沒有變化,也不會回調(diào)Collect方法该互,這點與LiveData不同
  • StateFlow 與 LiveData是最接近的,因為:
1. 它始終是有值的斗蒋。
2. 它的值是唯一的煤惩。
3. 它允許被多個觀察者共用 (因此是共享的數(shù)據(jù)流)爷肝。
4. 它永遠(yuǎn)只會把最新的值重現(xiàn)給訂閱者猾浦,這與活躍觀察者的數(shù)量是無關(guān)的。
  • 簡單使用
log("StateFlow 默認(rèn)值:111")
val stateFlow = MutableStateFlow("111")

lifecycleScope.launch {
    delay(500)
    stateFlow.collect { log("StateFlow collect1:$it") }
}
lifecycleScope.launch {
    delay(1500)
    stateFlow.collect { log("StateFlow collect2:$it") }
}
lifecycleScope.launch {
    delay(2500)
    stateFlow.collect { log("StateFlow collect3:$it") }
}

lifecycleScope.launch(Dispatchers.IO) {
    delay(5000)
    log("StateFlow re emit:111")
    stateFlow.emit("111")
    delay(1000)
    log("StateFlow emit:222")
    stateFlow.emit("222")
}
  • 普通流Flow轉(zhuǎn)化成StateFlow
val stateFlow2: StateFlow<Int> = flow {
    List(10) {
        delay(300)
        emit(it)
    }
}.stateIn(
    scope = lifecycleScope,
    started = WhileSubscribed(5000),//等待5秒后仍然沒有訂閱者存在就終止協(xié)程
    initialValue = 666//默認(rèn)值
)
lifecycleScope.launchWhenStarted {//STARTED狀態(tài)時會開始收集流灯抛,并且在RESUMED狀態(tài)時保持收集金赦,進(jìn)入STOPPED狀態(tài)時結(jié)束收集過程
    stateFlow2.collect { log("StateFlow shareIn.collect:$it") }

}

StateFlow與SharedFlow 的區(qū)別

  1. SharedFlow配置更為靈活,支持配置replay,緩沖區(qū)大小等对嚼,StateFlow是SharedFlow的特化版本夹抗,replay固定為1,緩沖區(qū)大小默認(rèn)為0;
  2. StateFlow與LiveData類似猪半,支持通過myFlow.value獲取當(dāng)前狀態(tài)兔朦,如果有這個需求,必須使用StateFlow;
  3. SharedFlow支持發(fā)出和收集重復(fù)值磨确,而StateFlow當(dāng)value重復(fù)時沽甥,不會回調(diào)collect;
  4. 對于新的訂閱者,StateFlow只會重播當(dāng)前最新值乏奥,SharedFlow可配置重播元素個數(shù)(默認(rèn)為0摆舟,即不重播);

基于SharedFlow封裝FlowBus

創(chuàng)建消息類EventMessage

class EventMessage {
    /**
     * 消息的key
     */
    var key: Int

    /**
     * 消息的主體message
     */
    var message: Any? = null
    private var messageMap: HashMap<String, Any?>? = null

    constructor(key: Int, message: Any?) {
        this.key = key
        this.message = message
    }

    constructor(key: Int) {
        this.key = key
    }

    fun put(key: String, message: Any?) {
        if (messageMap == null) {
            messageMap = HashMap<String, Any?>()
        }
        messageMap?.set(key, message)
    }

    operator fun <T> get(key: String?): T? {
        if (messageMap != null) {
            try {
                return messageMap!![key] as T?
            } catch (e: ClassCastException) {
                e.printStackTrace()
            }
        }
        return null
    }
}

創(chuàng)建FlowBus

class FlowBus : ViewModel() {
    companion object {
        val instance by lazy(mode = LazyThreadSafetyMode.SYNCHRONIZED) { FlowBus() }
    }

    //正常事件
    private val events = mutableMapOf<String, Event<*>>()

    //粘性事件
    private val stickyEvents = mutableMapOf<String, Event<*>>()

    fun with(key: String, isSticky: Boolean = false): Event<Any> {
        return with(key, Any::class.java, isSticky)
    }

    fun <T> with(eventType: Class<T>, isSticky: Boolean = false): Event<T> {
        return with(eventType.name, eventType, isSticky)
    }

    @Synchronized
    fun <T> with(key: String, type: Class<T>?, isSticky: Boolean): Event<T> {
        val flows = if (isSticky) stickyEvents else events
        if (!flows.containsKey(key)) {
            flows[key] = Event<T>(key, isSticky)
        }
        return flows[key] as Event<T>
    }


    class Event<T>(private val key: String, isSticky: Boolean) {

        // private mutable shared flow
        private val _events = MutableSharedFlow<T>(
            replay = if (isSticky) 1 else 0,
            extraBufferCapacity = Int.MAX_VALUE
        )

        // publicly exposed as read-only shared flow
        val events = _events.asSharedFlow()

        /**
         * need main thread execute
         */
        fun observeEvent(
            lifecycleOwner: LifecycleOwner,
            dispatcher: CoroutineDispatcher = Dispatchers.Main.immediate,
            minActiveState: Lifecycle.State = Lifecycle.State.STARTED,
            action: (t: T) -> Unit
        ) {
            lifecycleOwner.lifecycle.addObserver(object : DefaultLifecycleObserver {
                override fun onDestroy(owner: LifecycleOwner) {
                    super.onDestroy(owner)
                    LjyLogUtil.d("EventBus.onDestroy:remove key=$key")
                    val subscriptCount = _events.subscriptionCount.value
                    if (subscriptCount <= 0)
                        instance.events.remove(key)
                }
            })
            lifecycleOwner.lifecycleScope.launch(dispatcher) {
                lifecycleOwner.lifecycle.whenStateAtLeast(minActiveState) {
                    events.collect {
                        try {
                            action(it)
                        } catch (e: Exception) {
                            LjyLogUtil.d("ker=$key , error=${e.message}")
                        }
                    }
                }
            }
        }

        /**
         * send value
         */
        suspend fun setValue(
            event: T,
            dispatcher: CoroutineDispatcher = Dispatchers.Main.immediate
        ) {
            withContext(dispatcher) {
                _events.emit(event)
            }

        }
    }
}

使用FlowBus

FlowBus.instance.with(EventMessage::class.java).observeEvent(this) {
    LjyLogUtil.d("FlowBus.register1:${GsonUtils.toJson(it)}_${Thread.currentThread().name}")
}
lifecycleScope.launch(Dispatchers.IO) {
    withContext(Dispatchers.Main) {//不創(chuàng)建新的協(xié)程,指定協(xié)程上運行代碼塊,可以切換線程
        FlowBus.instance.with(EventMessage::class.java)
            .observeEvent(this@EventBusActivity) {
                LjyLogUtil.d("FlowBus.register2:${GsonUtils.toJson(it)}_${Thread.currentThread().name}")
            }
    }
}
FlowBus.instance.with(EventMessage::class.java).observeEvent(this) {
    LjyLogUtil.d("FlowBus.register3:${GsonUtils.toJson(it)}_${Thread.currentThread().name}")
}
lifecycleScope.launch(Dispatchers.Main) {

    val event = EventMessage(111)
    LjyLogUtil.d(
        "FlowBus:send1_${Thread.currentThread().name}_${
            GsonUtils.toJson(
                event
            )
        }"
    )
    FlowBus.instance.with(EventMessage::class.java).setValue(event)
    delay(2000)
    FlowBus.instance.with(EventMessage::class.java)
        .setValue(EventMessage(101))
    FlowBus.instance.with(EventMessage::class.java)
        .setValue(EventMessage(102))
    FlowBus.instance.with(EventMessage::class.java)
        .setValue(EventMessage(103))
    FlowBus.instance.with(EventMessage::class.java)
        .setValue(EventMessage(104))
    FlowBus.instance.with(EventMessage::class.java)
        .setValue(EventMessage(105))
}
lifecycleScope.launch(Dispatchers.IO) {
    delay(4000)
    val event = EventMessage(222, "bbb")
    LjyLogUtil.d(
        "FlowBus:send2_${Thread.currentThread().name}_${
            GsonUtils.toJson(
                event
            )
        }"
    )
    FlowBus.instance.with(EventMessage::class.java).setValue(event)
}
lifecycleScope.launch(Dispatchers.Default) {
    delay(6000)
    withContext(Dispatchers.Main) {
        val event = EventMessage(333, "ccc")
        event.put("key1", 123)
        event.put("key2", "abc")
        LjyLogUtil.d(
            "FlowBus:send3_${Thread.currentThread().name}_${
                GsonUtils.toJson(
                    event
                )
            }"
        )
        FlowBus.instance.with(EventMessage::class.java).setValue(event)
    }
}

進(jìn)一步優(yōu)化

  • 利用擴展函數(shù)邓了,ViewModelStoreOwner恨诱,及預(yù)傳EventMessage::class.javas是當(dāng)前項目中的使用更加簡單
//利用擴展函數(shù)
fun LifecycleOwner.observeEvent(
    dispatcher: CoroutineDispatcher = Dispatchers.Main.immediate,
    minActiveState: Lifecycle.State = Lifecycle.State.STARTED,
    isSticky: Boolean = false,
    action: (t: EventMessage) -> Unit
) {
    ApplicationScopeViewModelProvider
        .getApplicationScopeViewModel(FlowBus::class.java)
        .with(EventMessage::class.java, isSticky = isSticky)
        .observeEvent(this@observeEvent, dispatcher, minActiveState, action)
}

fun postValue(
    event: EventMessage,
    delayTimeMillis: Long = 0,
    isSticky: Boolean = false,
    dispatcher: CoroutineDispatcher = Dispatchers.Main.immediate,
) {
    LjyLogUtil.d("FlowBus:send_${Thread.currentThread().name}_${GsonUtils.toJson(event)}")
    ApplicationScopeViewModelProvider
        .getApplicationScopeViewModel(FlowBus::class.java)
        .viewModelScope
        .launch(dispatcher) {
            delay(delayTimeMillis)
            ApplicationScopeViewModelProvider
                .getApplicationScopeViewModel(FlowBus::class.java)
                .with(EventMessage::class.java, isSticky = isSticky)
                .setValue(event)
        }
}

private object ApplicationScopeViewModelProvider : ViewModelStoreOwner {

    private val eventViewModelStore: ViewModelStore = ViewModelStore()

    override fun getViewModelStore(): ViewModelStore {
        return eventViewModelStore
    }

    private val mApplicationProvider: ViewModelProvider by lazy {
        ViewModelProvider(
            ApplicationScopeViewModelProvider,
            ViewModelProvider.AndroidViewModelFactory.getInstance(FlowBusInitializer.application)
        )
    }

    fun <T : ViewModel> getApplicationScopeViewModel(modelClass: Class<T>): T {
        return mApplicationProvider[modelClass]
    }

}

object FlowBusInitializer {
    lateinit var application: Application
    //在Application中初始化
    fun init(application: Application) {
        FlowBusInitializer.application = application
    }
}
  • 使用
lifecycleScope.launch(Dispatchers.IO) {
    observeEvent {
        LjyLogUtil.d("FlowBus.register1:${GsonUtils.toJson(it)}_${Thread.currentThread().name}")
    }
    observeEvent(Dispatchers.IO) {
        LjyLogUtil.d("FlowBus.register2:${GsonUtils.toJson(it)}_${Thread.currentThread().name}")
    }

    observeEvent(Dispatchers.Main) {
        LjyLogUtil.d("FlowBus.register3:${GsonUtils.toJson(it)}_${Thread.currentThread().name}")
    }
}

lifecycleScope.launch(Dispatchers.IO) {
    delay(1000)
    postValue(EventMessage(100))
    postValue(EventMessage(101), 1000)
    postValue(EventMessage(102, "bbb"), dispatcher = Dispatchers.IO)
    val event3 = EventMessage(103, "ccc")
    event3.put("key1", 123)
    event3.put("key2", "abc")
    postValue(event3, 2000, dispatcher = Dispatchers.Main)
}

參考

我是今陽句葵,如果想要進(jìn)階和了解更多的干貨厕鹃,歡迎關(guān)注微信公眾號 “今陽說” 接收我的最新文章

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市乍丈,隨后出現(xiàn)的幾起案子剂碴,更是在濱河造成了極大的恐慌,老刑警劉巖轻专,帶你破解...
    沈念sama閱讀 218,122評論 6 505
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件忆矛,死亡現(xiàn)場離奇詭異,居然都是意外死亡请垛,警方通過查閱死者的電腦和手機催训,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,070評論 3 395
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來叼屠,“玉大人瞳腌,你說我怎么就攤上這事【涤辏” “怎么了嫂侍?”我有些...
    開封第一講書人閱讀 164,491評論 0 354
  • 文/不壞的土叔 我叫張陵,是天一觀的道長荚坞。 經(jīng)常有香客問我挑宠,道長,這世上最難降的妖魔是什么颓影? 我笑而不...
    開封第一講書人閱讀 58,636評論 1 293
  • 正文 為了忘掉前任各淀,我火速辦了婚禮,結(jié)果婚禮上诡挂,老公的妹妹穿的比我還像新娘碎浇。我一直安慰自己临谱,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 67,676評論 6 392
  • 文/花漫 我一把揭開白布奴璃。 她就那樣靜靜地躺著悉默,像睡著了一般。 火紅的嫁衣襯著肌膚如雪苟穆。 梳的紋絲不亂的頭發(fā)上抄课,一...
    開封第一講書人閱讀 51,541評論 1 305
  • 那天,我揣著相機與錄音雳旅,去河邊找鬼跟磨。 笑死,一個胖子當(dāng)著我的面吹牛攒盈,可吹牛的內(nèi)容都是我干的抵拘。 我是一名探鬼主播,決...
    沈念sama閱讀 40,292評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼沦童,長吁一口氣:“原來是場噩夢啊……” “哼仑濒!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起偷遗,我...
    開封第一講書人閱讀 39,211評論 0 276
  • 序言:老撾萬榮一對情侶失蹤墩瞳,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后氏豌,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體喉酌,經(jīng)...
    沈念sama閱讀 45,655評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,846評論 3 336
  • 正文 我和宋清朗相戀三年泵喘,在試婚紗的時候發(fā)現(xiàn)自己被綠了泪电。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 39,965評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡纪铺,死狀恐怖相速,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情鲜锚,我是刑警寧澤突诬,帶...
    沈念sama閱讀 35,684評論 5 347
  • 正文 年R本政府宣布,位于F島的核電站芜繁,受9級特大地震影響旺隙,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜骏令,卻給世界環(huán)境...
    茶點故事閱讀 41,295評論 3 329
  • 文/蒙蒙 一蔬捷、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧榔袋,春花似錦周拐、人聲如沸铡俐。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,894評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽高蜂。三九已至,卻和暖如春罕容,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背稿饰。 一陣腳步聲響...
    開封第一講書人閱讀 33,012評論 1 269
  • 我被黑心中介騙來泰國打工锦秒, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人喉镰。 一個月前我還...
    沈念sama閱讀 48,126評論 3 370
  • 正文 我出身青樓旅择,卻偏偏與公主長得像,于是被迫代替她去往敵國和親侣姆。 傳聞我的和親對象是個殘疾皇子生真,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,914評論 2 355

推薦閱讀更多精彩內(nèi)容