基于 Kotlin Coroutine 實現(xiàn)的 EventBus

PLMM.jpg

一. 背景

這段時間接手了一個比較緊急的項目梗脾,它是一個運行在某開發(fā)板上的 Android 項目。

該項目采用的架構比較老,例如 RxJava 還在使用 1.x 的版本帅戒。起初看到源碼,我內心是拒絕的崖技。(這大半年來逻住,我在使用 C++ 開發(fā)桌面端、 Java/Kotlin 開發(fā)后端迎献,不過沒關系瞎访。)好在該項目最近開發(fā)的部分功能采用 Kotlin 編寫,那我開發(fā)的功能也打算使用 Kotlin吁恍。

二. RxJava 版本的 EventBus

兩年前装诡,我在寫《RxJava 2.x 實戰(zhàn)》的時候,寫過一個 RxJava 2 版本的 EventBus践盼,并且在實際的項目中驗證過鸦采。

它還需要一個第三方庫 RxRelay。RxRelay中的各個 Relay 既是 Observable 也是 Consumer 的 RxJava 類型咕幻,它們是一個沒有 onComplete 和 onError 的Subject渔伯。所以不必要擔心下游的觸發(fā)的終止狀態(tài)(onComplete 或 onError)。

RxRelay的Github地址:https://github.com/JakeWharton/RxRelay

RxBus 的源碼:

package com.safframework.study.rxbus4;

import com.jakewharton.rxrelay2.PublishRelay;
import com.jakewharton.rxrelay2.Relay;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.Scheduler;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.annotations.NonNull;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Action;
import io.reactivex.functions.Consumer;

/**
 * Created by Tony Shen on 2017/6/14.
 */

public class RxBus {

    private Relay<Object> bus = null;
    private static RxBus instance;
    private final Map<Class<?>, Object> mStickyEventMap;

    //禁用構造方法
    private RxBus() {
        bus = PublishRelay.create().toSerialized();
        mStickyEventMap = new ConcurrentHashMap<>();
    }

    public static RxBus get() {
        return Holder.BUS;
    }

    public void post(Object event) {
        bus.accept(event);
    }

    public void postSticky(Object event) {
        synchronized (mStickyEventMap) {
            mStickyEventMap.put(event.getClass(), event);
        }
        bus.accept(event);
    }

    public <T> Observable<T> toObservable(Class<T> eventType) {
        return bus.ofType(eventType);
    }

    /**
     * 根據(jù)傳遞的 eventType 類型返回特定類型(eventType)的 被觀察者
     */
    public <T> Observable<T> toObservableSticky(final Class<T> eventType) {
        synchronized (mStickyEventMap) {
            Observable<T> observable = bus.ofType(eventType);
            final Object event = mStickyEventMap.get(eventType);

            if (event != null) {
                return observable.mergeWith(Observable.create(new ObservableOnSubscribe<T>() {
                    @Override
                    public void subscribe(@NonNull ObservableEmitter<T> e) throws Exception {
                        e.onNext(eventType.cast(event));
                    }
                }));
            } else {
                return observable;
            }
        }
    }

    public boolean hasObservers() {
        return bus.hasObservers();
    }

    public <T> Disposable register(Class<T> eventType, Scheduler scheduler, Consumer<T> onNext) {
        return toObservable(eventType).observeOn(scheduler).subscribe(onNext);
    }

    public <T> Disposable register(Class<T> eventType, Scheduler scheduler, Consumer<T> onNext, Consumer onError,
                                   Action onComplete, Consumer onSubscribe) {
        return toObservable(eventType).observeOn(scheduler).subscribe(onNext, onError, onComplete, onSubscribe);
    }

    public <T> Disposable register(Class<T> eventType, Scheduler scheduler, Consumer<T> onNext, Consumer onError,
                                   Action onComplete) {
        return toObservable(eventType).observeOn(scheduler).subscribe(onNext, onError, onComplete);
    }

    public <T> Disposable register(Class<T> eventType, Scheduler scheduler, Consumer<T> onNext, Consumer onError) {
        return toObservable(eventType).observeOn(scheduler).subscribe(onNext, onError);
    }

    public <T> Disposable register(Class<T> eventType, Consumer<T> onNext) {
        return toObservable(eventType).observeOn(AndroidSchedulers.mainThread()).subscribe(onNext);
    }

    public <T> Disposable register(Class<T> eventType, Consumer<T> onNext, Consumer onError,
                                   Action onComplete, Consumer onSubscribe) {
        return toObservable(eventType).observeOn(AndroidSchedulers.mainThread()).subscribe(onNext, onError, onComplete, onSubscribe);
    }

    public <T> Disposable register(Class<T> eventType, Consumer<T> onNext, Consumer onError,
                                   Action onComplete) {
        return toObservable(eventType).observeOn(AndroidSchedulers.mainThread()).subscribe(onNext, onError, onComplete);
    }

    public <T> Disposable register(Class<T> eventType, Consumer<T> onNext, Consumer onError) {
        return toObservable(eventType).observeOn(AndroidSchedulers.mainThread()).subscribe(onNext, onError);
    }

    public <T> Disposable registerSticky(Class<T> eventType, Scheduler scheduler, Consumer<T> onNext) {
        return toObservableSticky(eventType).observeOn(scheduler).subscribe(onNext);
    }

    public <T> Disposable registerSticky(Class<T> eventType, Consumer<T> onNext) {
        return toObservableSticky(eventType).observeOn(AndroidSchedulers.mainThread()).subscribe(onNext);
    }

    public <T> Disposable registerSticky(Class<T> eventType, Consumer<T> onNext, Consumer onError) {
        return toObservableSticky(eventType).observeOn(AndroidSchedulers.mainThread()).subscribe(onNext,onError);
    }

    /**
     * 移除指定eventType的Sticky事件
     */
    public <T> T removeStickyEvent(Class<T> eventType) {
        synchronized (mStickyEventMap) {
            return eventType.cast(mStickyEventMap.remove(eventType));
        }
    }

    /**
     * 移除所有的Sticky事件
     */
    public void removeAllStickyEvents() {
        synchronized (mStickyEventMap) {
            mStickyEventMap.clear();
        }
    }

    public void unregister(Disposable disposable) {
        if (disposable != null && !disposable.isDisposed()) {
            disposable.dispose();
        }
    }

    private static class Holder {
        private static final RxBus BUS = new RxBus();
    }

}

該版本 RxBus 支持異常處理和 Sticky 事件肄程。唯一的缺點是锣吼,不支持 Backpressure。

三. Kotlin Coroutine 版本的 EventBus

既然有了之前的 RxBus蓝厌,為何要重新寫一個呢玄叠?

首先,我們目前的項目并沒有采用 EventBus拓提。但是读恃,我寫的某一個 Service 需要跟 Activities 通信。我想偷懶代态,當然采用 EventBus 會比較簡單寺惫。但是,我們的 RxJava 版本還在用 1.x1囊伞西雀!

幸好,我們用了 Kotlin歉摧,部分代碼還用了 Coroutine艇肴,于是我想到了使用 Coroutine 的 Channel 來實現(xiàn) EventBus腔呜。

Channel 可以實現(xiàn)協(xié)程之間的數(shù)據(jù)通信。Kotlin 的 Channel 與 Java 的 BlockingQueue 類似再悼。BlockingQueue 的 put 和 take 操作核畴,相當于 Channel 的 send 和 receive 操作,但是 BlockingQueue 是阻塞操作而 Channel 都是掛起操作帮哈。

EventBus 用于注冊普通事件膛檀、Sticky 事件,事件的發(fā)布等等娘侍。

package com.safframework.eventbus

import android.util.Log
import kotlinx.coroutines.*
import java.util.concurrent.ConcurrentHashMap
import kotlin.coroutines.CoroutineContext

/**
 *
 * @FileName:
 *          com.safframework.eventbus.EventBus
 * @author: Tony Shen
 * @date: 2019-08-24 23:28
 * @version: V1.0 <描述當前版本功能>
 */
val UI: CoroutineDispatcher = Dispatchers.Main

object EventBus: CoroutineScope {

    private val TAG = "EventBus"

    private val job = SupervisorJob()

    override val coroutineContext: CoroutineContext = Dispatchers.Default + job

    private val contextMap = ConcurrentHashMap<String, MutableMap<Class<*>, EventData<*>>>()
    private val mStickyEventMap = ConcurrentHashMap<Class<*>, Any>()

    @JvmStatic
    fun <T> register(
        contextName: String,
        eventDispatcher: CoroutineDispatcher = UI,
        eventClass: Class<T>,
        eventCallback: (T) -> Unit
    ) {
        val eventDataMap = if (contextMap.containsKey(contextName)) {
            contextMap[contextName]!!
        } else {
            val eventDataMap = mutableMapOf<Class<*>, EventData<*>>()
            contextMap[contextName] = eventDataMap
            eventDataMap
        }

        eventDataMap[eventClass] = EventData(this, eventDispatcher, eventCallback)
    }

    @JvmStatic
    fun <T> register(
        contextName: String,
        eventDispatcher: CoroutineDispatcher = UI,
        eventClass: Class<T>,
        eventCallback: (T) -> Unit,
        eventFail:(Throwable)->Unit
    ) {
        val eventDataMap = if (contextMap.containsKey(contextName)) {
            contextMap[contextName]!!
        } else {
            val eventDataMap = mutableMapOf<Class<*>, EventData<*>>()
            contextMap[contextName] = eventDataMap
            eventDataMap
        }

        eventDataMap[eventClass] = EventData(this, eventDispatcher, eventCallback, eventFail)
    }

    @JvmStatic
    fun <T> registerSticky(
        contextName: String,
        eventDispatcher: CoroutineDispatcher = UI,
        eventClass: Class<T>,
        eventCallback: (T) -> Unit
    ) {
        val eventDataMap = if (contextMap.containsKey(contextName)) {
            contextMap[contextName]!!
        } else {
            val eventDataMap = mutableMapOf<Class<*>, EventData<*>>()
            contextMap[contextName] = eventDataMap
            eventDataMap
        }

        eventDataMap[eventClass] = EventData(this, eventDispatcher, eventCallback)

        val event = mStickyEventMap[eventClass]
        event?.let {

            postEvent(it)
        }
    }

    @JvmStatic
    fun <T> registerSticky(
        contextName: String,
        eventDispatcher: CoroutineDispatcher = UI,
        eventClass: Class<T>,
        eventCallback: (T) -> Unit,
        eventFail:(Throwable)->Unit
    ) {
        val eventDataMap = if (contextMap.containsKey(contextName)) {
            contextMap[contextName]!!
        } else {
            val eventDataMap = mutableMapOf<Class<*>, EventData<*>>()
            contextMap[contextName] = eventDataMap
            eventDataMap
        }

        eventDataMap[eventClass] = EventData(this, eventDispatcher, eventCallback, eventFail)

        val event = mStickyEventMap[eventClass]
        event?.let {

            postEvent(it)
        }
    }

    @JvmStatic
    fun post(event: Any, delayTime: Long = 0) {

        if (delayTime > 0) {
            launch {
                delay(delayTime)
                postEvent(event)
            }
        } else {
            postEvent(event)
        }
    }

    @JvmStatic
    fun postSticky(event: Any) {

        mStickyEventMap[event.javaClass] = event
    }

    @JvmStatic
    fun unregisterAllEvents() {

        Log.i(TAG,"unregisterAllEvents()")

        coroutineContext.cancelChildren()
        for ((_, eventDataMap) in contextMap) {
            eventDataMap.values.forEach {
                it.cancel()
            }
            eventDataMap.clear()
        }
        contextMap.clear()
    }

    @JvmStatic
    fun unregister(contextName: String) {

        Log.i(TAG,"$contextName")

        val cloneContexMap = ConcurrentHashMap<String, MutableMap<Class<*>, EventData<*>>>()
        cloneContexMap.putAll(contextMap)
        val map = cloneContexMap.filter { it.key == contextName }
        for ((_, eventDataMap) in map) {
            eventDataMap.values.forEach {
                it.cancel()
            }
            eventDataMap.clear()
        }
        contextMap.remove(contextName)
    }

    @JvmStatic
    fun <T> removeStickyEvent(eventType: Class<T>) {
        mStickyEventMap.remove(eventType)
    }

    private fun postEvent(event: Any) {
        
        val cloneContexMap = ConcurrentHashMap<String, MutableMap<Class<*>, EventData<*>>>()
        cloneContexMap.putAll(contextMap)
        for ((_, eventDataMap) in cloneContexMap) {
            eventDataMap.keys
                .firstOrNull { it == event.javaClass || it == event.javaClass.superclass }
                ?.let { key -> eventDataMap[key]?.postEvent(event) }
        }
    }
}

EventData 通過 channel 實現(xiàn)真正的發(fā)送咖刃、消費事件。

package com.safframework.eventbus

import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.launch
import java.lang.Exception

/**
 *
 * @FileName:
 *          com.safframework.eventbus.EventData
 * @author: Tony Shen
 * @date: 2019-08-25 00:20
 * @version: V1.0 <描述當前版本功能>
 */

data class EventData<T>(
    val coroutineScope: CoroutineScope,
    val eventDispatcher: CoroutineDispatcher,
    val onEvent: (T) -> Unit,
    val exception: ((Throwable)->Unit)? = null
) {

    private val channel = Channel<T>()

    init {
        coroutineScope.launch {
            channel.consumeEach { // 消費者循環(huán)地消費消息
                launch(eventDispatcher) {

                    if (exception!=null) {

                        try{
                            onEvent(it)
                        } catch (e:Exception) {

                            exception.invoke(e)
                        }
                    } else {

                        onEvent(it)
                    }
                }
            }
        }
    }

    fun postEvent(event: Any) {
        if (!channel.isClosedForSend) {

            coroutineScope.launch {
                channel.send(event as T)
            }
        } else {
            println("Channel is closed for send")
        }
    }

    fun cancel() {
        channel.cancel()
    }
}

EventBus github 地址:https://github.com/fengzhizi715/EventBus

該版本的 EventBus 跟 RxBus 的功能基本一致憾筏。上述 github 地址中嚎杨,包含有 demo 介紹了 EventBus 的具體使用,其實也跟 RxBus 的使用一致氧腰。

題外話枫浙,最近還抽空優(yōu)化了另外一個跟 Coroutines 相關的項目:https://github.com/fengzhizi715/Lifecycle-Coroutines-Extension

四. 總結

該版本的 EventBus 是給不使用 RxBus 或者其他版本 EventBus 提供了另一種選擇。當然古拴,該項目未來還有優(yōu)化的空間箩帚。

?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市黄痪,隨后出現(xiàn)的幾起案子紧帕,更是在濱河造成了極大的恐慌,老刑警劉巖桅打,帶你破解...
    沈念sama閱讀 219,427評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件是嗜,死亡現(xiàn)場離奇詭異,居然都是意外死亡挺尾,警方通過查閱死者的電腦和手機鹅搪,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,551評論 3 395
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來遭铺,“玉大人丽柿,你說我怎么就攤上這事〉嘟” “怎么了航厚?”我有些...
    開封第一講書人閱讀 165,747評論 0 356
  • 文/不壞的土叔 我叫張陵,是天一觀的道長锰蓬。 經(jīng)常有香客問我,道長眯漩,這世上最難降的妖魔是什么芹扭? 我笑而不...
    開封第一講書人閱讀 58,939評論 1 295
  • 正文 為了忘掉前任麻顶,我火速辦了婚禮,結果婚禮上舱卡,老公的妹妹穿的比我還像新娘辅肾。我一直安慰自己,他們只是感情好轮锥,可當我...
    茶點故事閱讀 67,955評論 6 392
  • 文/花漫 我一把揭開白布矫钓。 她就那樣靜靜地躺著,像睡著了一般舍杜。 火紅的嫁衣襯著肌膚如雪新娜。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,737評論 1 305
  • 那天既绩,我揣著相機與錄音概龄,去河邊找鬼。 笑死饲握,一個胖子當著我的面吹牛私杜,可吹牛的內容都是我干的。 我是一名探鬼主播救欧,決...
    沈念sama閱讀 40,448評論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼衰粹,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了笆怠?” 一聲冷哼從身側響起铝耻,我...
    開封第一講書人閱讀 39,352評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎骑疆,沒想到半個月后田篇,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,834評論 1 317
  • 正文 獨居荒郊野嶺守林人離奇死亡箍铭,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 37,992評論 3 338
  • 正文 我和宋清朗相戀三年泊柬,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片诈火。...
    茶點故事閱讀 40,133評論 1 351
  • 序言:一個原本活蹦亂跳的男人離奇死亡兽赁,死狀恐怖,靈堂內的尸體忽然破棺而出冷守,到底是詐尸還是另有隱情刀崖,我是刑警寧澤,帶...
    沈念sama閱讀 35,815評論 5 346
  • 正文 年R本政府宣布拍摇,位于F島的核電站亮钦,受9級特大地震影響,放射性物質發(fā)生泄漏充活。R本人自食惡果不足惜蜂莉,卻給世界環(huán)境...
    茶點故事閱讀 41,477評論 3 331
  • 文/蒙蒙 一蜡娶、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧映穗,春花似錦窖张、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,022評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至辕录,卻和暖如春睦霎,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背踏拜。 一陣腳步聲響...
    開封第一講書人閱讀 33,147評論 1 272
  • 我被黑心中介騙來泰國打工碎赢, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人速梗。 一個月前我還...
    沈念sama閱讀 48,398評論 3 373
  • 正文 我出身青樓肮塞,卻偏偏與公主長得像,于是被迫代替她去往敵國和親姻锁。 傳聞我的和親對象是個殘疾皇子枕赵,可洞房花燭夜當晚...
    茶點故事閱讀 45,077評論 2 355

推薦閱讀更多精彩內容