應(yīng)用內(nèi)消息傳遞
對于Android系統(tǒng)來說,消息傳遞是最基本的組件,每一個App內(nèi)的不同頁面,不同組件都在進(jìn)行消息傳遞膊升。消息傳遞既可以用于Android四大組件之間的通信,也可用于異步線程和主線程之間的通信谭企。對于Android開發(fā)者來說廓译,經(jīng)常使用的消息傳遞方式有很多種,從最早使用的Handler债查、BroadcastReceiver责循、接口回調(diào),到近幾年流行的通信總線類框架EventBus攀操、RxBus院仿。Android消息傳遞框架,總在不斷的演進(jìn)之中速和。
什么是事件總線呢歹垫?
在簡單觀察模式中,觀察者訂閱被觀察者颠放,單被觀察者狀態(tài)或者數(shù)據(jù)發(fā)生變化時通知觀察者排惨,這是一對一的關(guān)系。
但當(dāng)觀察者和被觀察者是多個或者不確定數(shù)量的時候碰凶,這就需要一個總線來存儲這些觀察者和被觀察者暮芭,方便在發(fā)送通知的時候找到對應(yīng)的觀察者鹿驼。
常用的事件總線方案
EventBus
詳情見另一篇文章Eventbus源碼分析
Rxbus
RxBus不是一個庫,而是一個文件辕宏,實現(xiàn)只有短短30行代碼畜晰。RxBus本身不需要過多分析,它的強(qiáng)大完全來自于它基于的RxJava技術(shù)瑞筐。
Rxbus屬于Rxjava庫下一小部分功能凄鼻,如果項目已經(jīng)使用Rxjava,就不需要再額外引入EventBus庫來使用了聚假,直接使用Rxbus就可以了撒块蚌。需要從Eventbus轉(zhuǎn)到Rxbus使用的童鞋可以放心轉(zhuǎn),使用方式大同小異膘格。
引入庫:
implementation 'io.reactivex:rxjava:1.1.0'
implementation 'io.reactivex:rxandroid:1.1.0'
創(chuàng)建Rxbus操作類:
Subject是非線程安全的峭范,在并發(fā)情況下,不推薦使用通常的Subject對象瘪贱,而是推薦使用SerializedSubject纱控。
public class RxBus {
private static volatile RxBus instance;
private Subject<Object, Object> bus;
private RxBus() {
bus = new SerializedSubject<>(PublishSubject.create());
}
public static RxBus getDefault() {
if (instance == null) {
synchronized (RxBus.class) {
instance = new RxBus();
}
}
return instance;
}
/**
* 發(fā)送事件
* @param object
*/
public void post(Object object) {
bus.onNext(object);
}
/**
* 根據(jù)類型接收相應(yīng)類型事件
* @param eventType
* @param <T>
* @return
*/
public <T> Observable toObservable(Class<T> eventType) {
return bus.ofType(eventType);
}
}
在BaseActivity中保存和取消訂閱事件。
public class BaseActivity extends AppCompatActivity {
protected ArrayList<Subscription> rxBusList = new ArrayList<>();
@Override
protected void onDestroy() {
super.onDestroy();
clearSubscription();
}
/**
* 取消該頁面所有訂閱
*/
private void clearSubscription() {
for (Subscription subscription : rxBusList) {
if (subscription != null && subscription.isUnsubscribed()) {
subscription.unsubscribe();
}
}
}
}
發(fā)送事件方式:
RxBus.getDefault().post(new EventBean(1, "聽說名字長回頭率很高"));
接收事件方式:
Subscription subscription = RxBus.getDefault().toObservable(EventBean.class)
.subscribe(new Action1<EventBean>() {
@Override
public void call(EventBean eventBean) {
tvContent.setText(eventBean.getUserId() + "------" + eventBean.getNickName());
}
});
rxBusList.add(subscription);
Rxbus原理
這里是通過Rxjava中的PublishSubject.create().toSerialized() 來創(chuàng)建總線用來存儲觀察者政敢。簡單的就把它當(dāng)做集合吧其徙。
RxBus工作流程圖
1胚迫、首先創(chuàng)建一個可同時充當(dāng)Observer和Observable的Subject喷户;
2、在需要接收事件的地方访锻,訂閱該Subject(此時Subject是作為Observable)褪尝,在這之后,一旦Subject接收到事件期犬,立即發(fā)射給該訂閱者河哑;
3、在我們需要發(fā)送事件的地方龟虎,將事件post至Subject璃谨,此時Subject作為Observer接收到事件(onNext),然后會發(fā)射給所有訂閱該Subject的訂閱者鲤妥。
LiveDataBus
為何使用liveData做事件總線佳吞?
LiveData具有的這種可觀察性和生命周期感知的能力,使其非常適合作為Android通信總線的基礎(chǔ)構(gòu)件棉安。在一對多的場景中底扳,發(fā)布消息事件后,訂閱事件的頁面只有在可見的時候才會處理事件邏輯贡耽。
使用者不用顯示調(diào)用反注冊方法衷模。LiveData具有生命周期感知能力鹊汛,所以LiveDataBus只需要調(diào)用注冊回調(diào)方法,而不需要顯示的調(diào)用反注冊方法阱冶。這樣帶來的好處不僅可以編寫更少的代碼刁憋,而且可以完全杜絕其他通信總線類框架(如EventBus、RxBus)忘記調(diào)用反注冊所帶來的內(nèi)存泄漏的風(fēng)險熙揍。
LiveDataBus的組成
- 消息: 消息可以是任何的 Object职祷,可以定義不同類型的消息,如 Boolean届囚、String有梆。也可以定義自定義類型的消息。
- 消息通道: LiveData 扮演了消息通道的角色意系,不同的消息通道用不同的名字區(qū)分泥耀,名字是 String 類型的,可以通過名字獲取到一個 LiveData 消息通道蛔添。
- 消息總線: 消息總線通過單例實現(xiàn)痰催,不同的消息通道存放在一個 HashMap 中。
- 訂閱: 訂閱者通過 getChannel() 獲取消息通道迎瞧,然后調(diào)用 observe() 訂閱這個通道的消息夸溶。
- 發(fā)布: 發(fā)布者通過 getChannel() 獲取消息通道,然后調(diào)用 setValue() 或者 postValue() 發(fā)布消息凶硅。
代碼實現(xiàn):
public final class LiveDataBus {
private final Map<String, MutableLiveData<Object>> bus;
private LiveDataBus() {
bus = new HashMap<>();
}
private static class SingletonHolder {
private static final LiveDataBus DATA_BUS = new LiveDataBus();
}
public static LiveDataBus get() {
return SingletonHolder.DATA_BUS;
}
public <T> MutableLiveData<T> getChannel(String target, Class<T> type) {
if (!bus.containsKey(target)) {
bus.put(target, new MutableLiveData<T>());
}
return (MutableLiveData<T>) bus.get(target);
}
public MutableLiveData<Object> getChannel(String target) {
return getChannel(target, Object.class);
}
}
發(fā)送/接收消息:
//發(fā)送消息
LiveDataBus.get().getChannel("mykey").setValue(text);
//接收消息
LiveDataBus.get().getChannel("mykey", String.class)
.observe(this, new Observer<String>() {
@Override
public void onChanged(@Nullable String newText) {
// 更新數(shù)據(jù)
tvText.setText(newText);
}
});
做法就是在單例的LiveDataBus中缝裁,使用Hashmap存儲,key為自定義的String足绅,value為MutablelivaData捷绑,通過發(fā)送和接收時候?qū)ey-value進(jìn)行存取。注冊利用了MutablelivaData的observer注冊觀察者氢妈。傳遞的消息對象類作為泛型粹污,與MutablelivaData的泛型一致。
LiveDataBus遇到的問題和分析思路
1.訂閱者會收到訂閱之前發(fā)布的消息首量,類似于粘性消息壮吩。對于一個消息總線來說,這是不可接受的加缘。
2.多次調(diào)用了 postValue() 方法鸭叙,只有最后次調(diào)用的值會得到更新。也就是此方法是有可能會丟失事件生百!
FlowEventBus
MutableSharedFlow作為事件載體 :
優(yōu)點:
依托協(xié)程輕松切換線程
可以通過replay實現(xiàn)粘性效果
可以被多個觀察者訂閱
無觀察者自動清除事件不會造成積壓
代碼實現(xiàn):
internal object FlowEventBus {
/**
* private mutable shared flow
*/
private val mutableSharedFlow = MutableSharedFlow<Event>()
/**
* publicly exposed as read-only shared flow
*/
private val asSharedFlow = mutableSharedFlow.asSharedFlow()
val eventBus: SharedFlow<Event>
get() = asSharedFlow
init {
GlobalScope.launch {
//日志打印當(dāng)前訂閱的訂閱者數(shù)量
mutableSharedFlow.subscriptionCount.collect {
Log.d("flow", "subscriptionCount $it")
}
}
}
/**
* 發(fā)布事件
* Launches a new coroutine without blocking the current thread and returns a reference to the coroutine as a [Job].
* The coroutine is cancelled when the resulting job is [cancelled][Job.cancel].
*/
fun <T : Event> LifecycleOwner.produceEvent(event: T): Job {
// suspends until all subscribers receive it
return lifecycleScope.launch {
mutableSharedFlow.emit(event)
}
}
/**
* 在GlobalScope中發(fā)布
*/
fun <T : Event> produceEventGlobal(event: T) {
// suspends until all subscribers receive it
GlobalScope.launch {
mutableSharedFlow.emit(event)
}
}
/**
* Launches and runs the given block when the [Lifecycle] controlling this
* [LifecycleCoroutineScope] is at least in [Lifecycle.State.CREATED] state.
*
* The returned [Job] will be cancelled when the [Lifecycle] is destroyed.
*/
fun <T : Event> LifecycleOwner.produceEventWhenCreated(event: T): Job {
// suspends until all subscribers receive it
return lifecycleScope.launchWhenCreated {
mutableSharedFlow.emit(event)
}
}
/**
* Launches and runs the given block when the [Lifecycle] controlling this
* [LifecycleCoroutineScope] is at least in [Lifecycle.State.STARTED] state.
*
* The returned [Job] will be cancelled when the [Lifecycle] is destroyed.
*/
fun <T : Event> LifecycleOwner.produceEventWhenStared(event: T): Job {
// suspends until all subscribers receive it
return lifecycleScope.launchWhenStarted {
mutableSharedFlow.emit(event)
}
}
/**
* Launches and runs the given block when the [Lifecycle] controlling this
* [LifecycleCoroutineScope] is at least in [Lifecycle.State.RESUMED] state.
*
* The returned [Job] will be cancelled when the [Lifecycle] is destroyed.
*/
fun <T : Event> LifecycleOwner.produceEventWhenResumed(event: T): Job {
// suspends until all subscribers receive it
return lifecycleScope.launchWhenResumed {
mutableSharedFlow.emit(event)
}
}
/**
* subscribe event
* The returned [Job] can be cancelled
*/
inline fun LifecycleOwner.subscribeEvent(
crossinline predicate: suspend (e: Event) -> Boolean,
crossinline action: suspend (e: Event) -> Unit,
): Job {
return eventBus
.filter { predicate.invoke(it) }
.onEach {
action.invoke(it)
}.cancellable()
.launchIn(lifecycleScope)
}
}
open class Event(open val key: String)
實現(xiàn)方案递雀,利用單例持有MutableSharedFlow,發(fā)送時調(diào)用flow的emit發(fā)送消息蚀浆。接收消息獲取flow缀程,先過濾消息是否接收搜吧,然后進(jìn)行接收、處理消息杨凑。
參考:
http://www.reibang.com/p/3fd322f2bff9
http://www.reibang.com/p/116de9c747c5
戳這里簡單demo地址