一. 背景
這段時間接手了一個比較緊急的項目梗脾,它是一個運行在某開發(fā)板上的 Android 項目。
該項目采用的架構比較老,例如 RxJava 還在使用 1.x 的版本帅戒。起初看到源碼,我內心是拒絕的崖技。(這大半年來逻住,我在使用 C++ 開發(fā)桌面端、 Java/Kotlin 開發(fā)后端迎献,不過沒關系瞎访。)好在該項目最近開發(fā)的部分功能采用 Kotlin 編寫,那我開發(fā)的功能也打算使用 Kotlin吁恍。
二. RxJava 版本的 EventBus
兩年前装诡,我在寫《RxJava 2.x 實戰(zhàn)》的時候,寫過一個 RxJava 2 版本的 EventBus践盼,并且在實際的項目中驗證過鸦采。
它還需要一個第三方庫 RxRelay。RxRelay中的各個 Relay 既是 Observable 也是 Consumer 的 RxJava 類型咕幻,它們是一個沒有 onComplete 和 onError 的Subject渔伯。所以不必要擔心下游的觸發(fā)的終止狀態(tài)(onComplete 或 onError)。
RxRelay的Github地址:https://github.com/JakeWharton/RxRelay
RxBus 的源碼:
package com.safframework.study.rxbus4;
import com.jakewharton.rxrelay2.PublishRelay;
import com.jakewharton.rxrelay2.Relay;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.Scheduler;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.annotations.NonNull;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Action;
import io.reactivex.functions.Consumer;
/**
* Created by Tony Shen on 2017/6/14.
*/
public class RxBus {
private Relay<Object> bus = null;
private static RxBus instance;
private final Map<Class<?>, Object> mStickyEventMap;
//禁用構造方法
private RxBus() {
bus = PublishRelay.create().toSerialized();
mStickyEventMap = new ConcurrentHashMap<>();
}
public static RxBus get() {
return Holder.BUS;
}
public void post(Object event) {
bus.accept(event);
}
public void postSticky(Object event) {
synchronized (mStickyEventMap) {
mStickyEventMap.put(event.getClass(), event);
}
bus.accept(event);
}
public <T> Observable<T> toObservable(Class<T> eventType) {
return bus.ofType(eventType);
}
/**
* 根據(jù)傳遞的 eventType 類型返回特定類型(eventType)的 被觀察者
*/
public <T> Observable<T> toObservableSticky(final Class<T> eventType) {
synchronized (mStickyEventMap) {
Observable<T> observable = bus.ofType(eventType);
final Object event = mStickyEventMap.get(eventType);
if (event != null) {
return observable.mergeWith(Observable.create(new ObservableOnSubscribe<T>() {
@Override
public void subscribe(@NonNull ObservableEmitter<T> e) throws Exception {
e.onNext(eventType.cast(event));
}
}));
} else {
return observable;
}
}
}
public boolean hasObservers() {
return bus.hasObservers();
}
public <T> Disposable register(Class<T> eventType, Scheduler scheduler, Consumer<T> onNext) {
return toObservable(eventType).observeOn(scheduler).subscribe(onNext);
}
public <T> Disposable register(Class<T> eventType, Scheduler scheduler, Consumer<T> onNext, Consumer onError,
Action onComplete, Consumer onSubscribe) {
return toObservable(eventType).observeOn(scheduler).subscribe(onNext, onError, onComplete, onSubscribe);
}
public <T> Disposable register(Class<T> eventType, Scheduler scheduler, Consumer<T> onNext, Consumer onError,
Action onComplete) {
return toObservable(eventType).observeOn(scheduler).subscribe(onNext, onError, onComplete);
}
public <T> Disposable register(Class<T> eventType, Scheduler scheduler, Consumer<T> onNext, Consumer onError) {
return toObservable(eventType).observeOn(scheduler).subscribe(onNext, onError);
}
public <T> Disposable register(Class<T> eventType, Consumer<T> onNext) {
return toObservable(eventType).observeOn(AndroidSchedulers.mainThread()).subscribe(onNext);
}
public <T> Disposable register(Class<T> eventType, Consumer<T> onNext, Consumer onError,
Action onComplete, Consumer onSubscribe) {
return toObservable(eventType).observeOn(AndroidSchedulers.mainThread()).subscribe(onNext, onError, onComplete, onSubscribe);
}
public <T> Disposable register(Class<T> eventType, Consumer<T> onNext, Consumer onError,
Action onComplete) {
return toObservable(eventType).observeOn(AndroidSchedulers.mainThread()).subscribe(onNext, onError, onComplete);
}
public <T> Disposable register(Class<T> eventType, Consumer<T> onNext, Consumer onError) {
return toObservable(eventType).observeOn(AndroidSchedulers.mainThread()).subscribe(onNext, onError);
}
public <T> Disposable registerSticky(Class<T> eventType, Scheduler scheduler, Consumer<T> onNext) {
return toObservableSticky(eventType).observeOn(scheduler).subscribe(onNext);
}
public <T> Disposable registerSticky(Class<T> eventType, Consumer<T> onNext) {
return toObservableSticky(eventType).observeOn(AndroidSchedulers.mainThread()).subscribe(onNext);
}
public <T> Disposable registerSticky(Class<T> eventType, Consumer<T> onNext, Consumer onError) {
return toObservableSticky(eventType).observeOn(AndroidSchedulers.mainThread()).subscribe(onNext,onError);
}
/**
* 移除指定eventType的Sticky事件
*/
public <T> T removeStickyEvent(Class<T> eventType) {
synchronized (mStickyEventMap) {
return eventType.cast(mStickyEventMap.remove(eventType));
}
}
/**
* 移除所有的Sticky事件
*/
public void removeAllStickyEvents() {
synchronized (mStickyEventMap) {
mStickyEventMap.clear();
}
}
public void unregister(Disposable disposable) {
if (disposable != null && !disposable.isDisposed()) {
disposable.dispose();
}
}
private static class Holder {
private static final RxBus BUS = new RxBus();
}
}
該版本 RxBus 支持異常處理和 Sticky 事件肄程。唯一的缺點是锣吼,不支持 Backpressure。
三. Kotlin Coroutine 版本的 EventBus
既然有了之前的 RxBus蓝厌,為何要重新寫一個呢玄叠?
首先,我們目前的項目并沒有采用 EventBus拓提。但是读恃,我寫的某一個 Service 需要跟 Activities 通信。我想偷懶代态,當然采用 EventBus 會比較簡單寺惫。但是,我們的 RxJava 版本還在用 1.x1囊伞西雀!
幸好,我們用了 Kotlin歉摧,部分代碼還用了 Coroutine艇肴,于是我想到了使用 Coroutine 的 Channel 來實現(xiàn) EventBus腔呜。
Channel 可以實現(xiàn)協(xié)程之間的數(shù)據(jù)通信。Kotlin 的 Channel 與 Java 的 BlockingQueue 類似再悼。BlockingQueue 的 put 和 take 操作核畴,相當于 Channel 的 send 和 receive 操作,但是 BlockingQueue 是阻塞操作而 Channel 都是掛起操作帮哈。
EventBus 用于注冊普通事件膛檀、Sticky 事件,事件的發(fā)布等等娘侍。
package com.safframework.eventbus
import android.util.Log
import kotlinx.coroutines.*
import java.util.concurrent.ConcurrentHashMap
import kotlin.coroutines.CoroutineContext
/**
*
* @FileName:
* com.safframework.eventbus.EventBus
* @author: Tony Shen
* @date: 2019-08-24 23:28
* @version: V1.0 <描述當前版本功能>
*/
val UI: CoroutineDispatcher = Dispatchers.Main
object EventBus: CoroutineScope {
private val TAG = "EventBus"
private val job = SupervisorJob()
override val coroutineContext: CoroutineContext = Dispatchers.Default + job
private val contextMap = ConcurrentHashMap<String, MutableMap<Class<*>, EventData<*>>>()
private val mStickyEventMap = ConcurrentHashMap<Class<*>, Any>()
@JvmStatic
fun <T> register(
contextName: String,
eventDispatcher: CoroutineDispatcher = UI,
eventClass: Class<T>,
eventCallback: (T) -> Unit
) {
val eventDataMap = if (contextMap.containsKey(contextName)) {
contextMap[contextName]!!
} else {
val eventDataMap = mutableMapOf<Class<*>, EventData<*>>()
contextMap[contextName] = eventDataMap
eventDataMap
}
eventDataMap[eventClass] = EventData(this, eventDispatcher, eventCallback)
}
@JvmStatic
fun <T> register(
contextName: String,
eventDispatcher: CoroutineDispatcher = UI,
eventClass: Class<T>,
eventCallback: (T) -> Unit,
eventFail:(Throwable)->Unit
) {
val eventDataMap = if (contextMap.containsKey(contextName)) {
contextMap[contextName]!!
} else {
val eventDataMap = mutableMapOf<Class<*>, EventData<*>>()
contextMap[contextName] = eventDataMap
eventDataMap
}
eventDataMap[eventClass] = EventData(this, eventDispatcher, eventCallback, eventFail)
}
@JvmStatic
fun <T> registerSticky(
contextName: String,
eventDispatcher: CoroutineDispatcher = UI,
eventClass: Class<T>,
eventCallback: (T) -> Unit
) {
val eventDataMap = if (contextMap.containsKey(contextName)) {
contextMap[contextName]!!
} else {
val eventDataMap = mutableMapOf<Class<*>, EventData<*>>()
contextMap[contextName] = eventDataMap
eventDataMap
}
eventDataMap[eventClass] = EventData(this, eventDispatcher, eventCallback)
val event = mStickyEventMap[eventClass]
event?.let {
postEvent(it)
}
}
@JvmStatic
fun <T> registerSticky(
contextName: String,
eventDispatcher: CoroutineDispatcher = UI,
eventClass: Class<T>,
eventCallback: (T) -> Unit,
eventFail:(Throwable)->Unit
) {
val eventDataMap = if (contextMap.containsKey(contextName)) {
contextMap[contextName]!!
} else {
val eventDataMap = mutableMapOf<Class<*>, EventData<*>>()
contextMap[contextName] = eventDataMap
eventDataMap
}
eventDataMap[eventClass] = EventData(this, eventDispatcher, eventCallback, eventFail)
val event = mStickyEventMap[eventClass]
event?.let {
postEvent(it)
}
}
@JvmStatic
fun post(event: Any, delayTime: Long = 0) {
if (delayTime > 0) {
launch {
delay(delayTime)
postEvent(event)
}
} else {
postEvent(event)
}
}
@JvmStatic
fun postSticky(event: Any) {
mStickyEventMap[event.javaClass] = event
}
@JvmStatic
fun unregisterAllEvents() {
Log.i(TAG,"unregisterAllEvents()")
coroutineContext.cancelChildren()
for ((_, eventDataMap) in contextMap) {
eventDataMap.values.forEach {
it.cancel()
}
eventDataMap.clear()
}
contextMap.clear()
}
@JvmStatic
fun unregister(contextName: String) {
Log.i(TAG,"$contextName")
val cloneContexMap = ConcurrentHashMap<String, MutableMap<Class<*>, EventData<*>>>()
cloneContexMap.putAll(contextMap)
val map = cloneContexMap.filter { it.key == contextName }
for ((_, eventDataMap) in map) {
eventDataMap.values.forEach {
it.cancel()
}
eventDataMap.clear()
}
contextMap.remove(contextName)
}
@JvmStatic
fun <T> removeStickyEvent(eventType: Class<T>) {
mStickyEventMap.remove(eventType)
}
private fun postEvent(event: Any) {
val cloneContexMap = ConcurrentHashMap<String, MutableMap<Class<*>, EventData<*>>>()
cloneContexMap.putAll(contextMap)
for ((_, eventDataMap) in cloneContexMap) {
eventDataMap.keys
.firstOrNull { it == event.javaClass || it == event.javaClass.superclass }
?.let { key -> eventDataMap[key]?.postEvent(event) }
}
}
}
EventData 通過 channel 實現(xiàn)真正的發(fā)送咖刃、消費事件。
package com.safframework.eventbus
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.launch
import java.lang.Exception
/**
*
* @FileName:
* com.safframework.eventbus.EventData
* @author: Tony Shen
* @date: 2019-08-25 00:20
* @version: V1.0 <描述當前版本功能>
*/
data class EventData<T>(
val coroutineScope: CoroutineScope,
val eventDispatcher: CoroutineDispatcher,
val onEvent: (T) -> Unit,
val exception: ((Throwable)->Unit)? = null
) {
private val channel = Channel<T>()
init {
coroutineScope.launch {
channel.consumeEach { // 消費者循環(huán)地消費消息
launch(eventDispatcher) {
if (exception!=null) {
try{
onEvent(it)
} catch (e:Exception) {
exception.invoke(e)
}
} else {
onEvent(it)
}
}
}
}
}
fun postEvent(event: Any) {
if (!channel.isClosedForSend) {
coroutineScope.launch {
channel.send(event as T)
}
} else {
println("Channel is closed for send")
}
}
fun cancel() {
channel.cancel()
}
}
EventBus github 地址:https://github.com/fengzhizi715/EventBus
該版本的 EventBus 跟 RxBus 的功能基本一致憾筏。上述 github 地址中嚎杨,包含有 demo 介紹了 EventBus 的具體使用,其實也跟 RxBus 的使用一致氧腰。
題外話枫浙,最近還抽空優(yōu)化了另外一個跟 Coroutines 相關的項目:https://github.com/fengzhizi715/Lifecycle-Coroutines-Extension
四. 總結
該版本的 EventBus 是給不使用 RxBus 或者其他版本 EventBus 提供了另一種選擇。當然古拴,該項目未來還有優(yōu)化的空間箩帚。