在這篇文章中袋马,我們會(huì)先分析一下 RxJava2 中的 Subject ;然后,我們會(huì)使用 Subject 制作一個(gè)類(lèi)似于 EventBus 的全局的通信工具损话。
在了解本篇文章的內(nèi)容之前,你需要先了解 RxJava2 中的一些基本的用法槽唾,比如 Observable 以及背壓的概念丧枪,你可以參考我的其他兩篇文章來(lái)獲取這部分內(nèi)容:《RxJava2 系列 (1):一篇的比較全面的 RxJava2 方法總結(jié)》和《RxJava2 系列 (2):背壓和Flowable》。
1庞萍、Subject
1.1 Subject 的兩個(gè)特性
Subject 可以同時(shí)代表 Observer 和 Observable拧烦,允許從數(shù)據(jù)源中多次發(fā)送結(jié)果給多個(gè)觀察者。除了 onSubscribe(), onNext(), onError() 和 onComplete() 之外钝计,所有的方法都是線程安全的恋博。此外,你還可以使用 toSerialized() 方法私恬,也就是轉(zhuǎn)換成串行的债沮,將這些方法設(shè)置成線程安全的。
如果你已經(jīng)了解了 Observable 和 Observer 本鸣,那么也許直接看 Subject 的源碼定義會(huì)更容易理解:
public abstract class Subject<T> extends Observable<T> implements Observer<T> {
// ...
}
從上面看出疫衩,Subject 同時(shí)繼承了 Observable 和 Observer 兩個(gè)接口,說(shuō)明它既是被觀察的對(duì)象永高,同時(shí)又是觀察對(duì)象隧土,也就是可以生產(chǎn)、可以消費(fèi)命爬、也可以自己生產(chǎn)自己消費(fèi)曹傀。所以,我們可以項(xiàng)下面這樣來(lái)使用它饲宛。這里我們用到的是該接口的一個(gè)實(shí)現(xiàn) PublishSubject :
public static void main(String...args) {
PublishSubject<Integer> subject = PublishSubject.create();
subject.subscribe(System.out::println);
Executor executor = Executors.newFixedThreadPool(5);
Disposable disposable = Observable.range(1, 5).subscribe(i ->
executor.execute(() -> {
try {
Thread.sleep(i * 200);
subject.onNext(i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}));
}
根據(jù)程序的執(zhí)行結(jié)果皆愉,程序在第200, 400, 600, 800, 1000毫秒依次輸出了1到5的數(shù)字。
在這里,我們用 PublishSubject 創(chuàng)建了一個(gè)主題并對(duì)其監(jiān)聽(tīng)幕庐,然后在線程當(dāng)中又通知該主題內(nèi)容變化久锥,整個(gè)過(guò)程我們都只操作了 PublishSubject 一個(gè)對(duì)象。顯然异剥,使用 Subject 我們可以達(dá)到對(duì)一個(gè)指定類(lèi)型的值的結(jié)果進(jìn)行監(jiān)聽(tīng)的目的——我們把值改變之后對(duì)應(yīng)的邏輯寫(xiě)在 subscribe() 方法中瑟由,然后每次調(diào)用 onNext() 等方法通知結(jié)果之后就可以自動(dòng)調(diào)用 subscribe() 方法進(jìn)行更新操作。
同時(shí)冤寿,因?yàn)?Subject 實(shí)現(xiàn)了 Observer 接口歹苦,并且在 Observable 等的 subscribe() 方法中存在一個(gè)以 Observer 作為參數(shù)的方法(如下),所以督怜,Subject 也是可以作為消費(fèi)者來(lái)對(duì)事件進(jìn)行消費(fèi)的殴瘦。
public final void subscribe(Observer<? super T> observer)
以上就是 Subject 的兩個(gè)主要的特性。
1.2 Subject 的實(shí)現(xiàn)類(lèi)
在 RxJava2 号杠,Subject 有幾個(gè)默認(rèn)的實(shí)現(xiàn)蚪腋,下面我們對(duì)它們之間的區(qū)別做簡(jiǎn)單的說(shuō)明:
-
AsyncSubject
:只有當(dāng) Subject 調(diào)用 onComplete 方法時(shí),才會(huì)將 Subject 中的最后一個(gè)事件傳遞給所有的 Observer姨蟋。 -
BehaviorSubject
:該類(lèi)有創(chuàng)建時(shí)需要一個(gè)默認(rèn)參數(shù)屉凯,該默認(rèn)參數(shù)會(huì)在 Subject 未發(fā)送過(guò)其他的事件時(shí),向注冊(cè)的 Observer 發(fā)送芬探;新注冊(cè)的 Observer 不會(huì)收到之前發(fā)送的事件神得,這點(diǎn)和 PublishSubject 一致。 -
PublishSubject
:不會(huì)改變事件的發(fā)送順序偷仿;在已經(jīng)發(fā)送了一部分事件之后注冊(cè)的 Observer 不會(huì)收到之前發(fā)送的事件哩簿。 -
ReplaySubject
:無(wú)論什么時(shí)候注冊(cè) Observer 都可以接收到任何時(shí)候通過(guò)該 Observable 發(fā)射的事件。 -
UnicastSubject
:只允許一個(gè) Observer 進(jìn)行監(jiān)聽(tīng)酝静,在該 Observer 注冊(cè)之前會(huì)將發(fā)射的所有的事件放進(jìn)一個(gè)隊(duì)列中节榜,并在 Observer 注冊(cè)的時(shí)候一起通知給它。
對(duì)比 PublishSubject 和 ReplaySubject别智,它們的區(qū)別在于新注冊(cè)的 Observer 是否能夠收到在它注冊(cè)之前發(fā)送的事件宗苍。這個(gè)類(lèi)似于 EventBus 中的 StickyEvent 即黏性事件,為了說(shuō)明這一點(diǎn)薄榛,我們準(zhǔn)備了下面兩段代碼:
private static void testPublishSubject() throws InterruptedException {
PublishSubject<Integer> subject = PublishSubject.create();
subject.subscribe(i -> System.out.print("(1: " + i + ") "));
Executor executor = Executors.newFixedThreadPool(5);
Disposable disposable = Observable.range(1, 5).subscribe(i -> executor.execute(() -> {
try {
Thread.sleep(i * 200);
subject.onNext(i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}));
Thread.sleep(500);
subject.subscribe(i -> System.out.print("(2: " + i + ") "));
Observable.timer(2, TimeUnit.SECONDS).subscribe(i -> ((ExecutorService) executor).shutdown());
}
private static void testReplaySubject() throws InterruptedException {
ReplaySubject<Integer> subject = ReplaySubject.create();
subject.subscribe(i -> System.out.print("(1: " + i + ") "));
Executor executor = Executors.newFixedThreadPool(5);
Disposable disposable = Observable.range(1, 5).subscribe(i -> executor.execute(() -> {
try {
Thread.sleep(i * 200);
subject.onNext(i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}));
Thread.sleep(500);
subject.subscribe(i -> System.out.print("(2: " + i + ") "));
Observable.timer(2, TimeUnit.SECONDS).subscribe(i -> ((ExecutorService) executor).shutdown());
}
它們的輸出結(jié)果依次是
PublishSubject的結(jié)果:(1: 1) (1: 2) (1: 3) (2: 3) (1: 4) (2: 4) (1: 5) (2: 5)
ReplaySubject的結(jié)果: (1: 1) (1: 2) (2: 1) (2: 2) (1: 3) (2: 3) (1: 4) (2: 4) (1: 5) (2: 5)
從上面的結(jié)果對(duì)比中讳窟,我們可以看出前者與后者的區(qū)別在于新注冊(cè)的 Observer 并沒(méi)有收到在它注冊(cè)之前發(fā)送的事件。試驗(yàn)的結(jié)果與上面的敘述是一致的敞恋。
其他的測(cè)試代碼這不一并給出了丽啡,詳細(xì)的代碼可以參考Github - Java Advanced。
2硬猫、用 RxJava 打造 EventBus
2.1 打造 EventBus
清楚了 Subject 的概念之后补箍,讓我們來(lái)做一個(gè)實(shí)踐——用 RxJava 打造 EventBus改执。
我們先考慮用一個(gè)全局的 PublishSubject 來(lái)解決這個(gè)問(wèn)題沦疾,當(dāng)然穴墅,這意味著我們發(fā)送的事件不是黏性事件碰镜。不過(guò)古劲,沒(méi)關(guān)系,只要這種實(shí)現(xiàn)方式搞懂了颁褂,用 ReplaySubject 做一個(gè)發(fā)送黏性事件的 EventBus 也非難事席里。
考慮一下签则,如果要實(shí)現(xiàn)這個(gè)功能我們需要做哪些準(zhǔn)備:
- 我們需要發(fā)送事件并能夠正確地接收到事件遥诉。要實(shí)現(xiàn)這個(gè)目的并不難后豫,因?yàn)?Subject 本身就具有發(fā)送和接收兩個(gè)能力,作為全局的之后就具有了全局的注冊(cè)和通知的能力突那。因此,不論你在什么位置發(fā)送了事件构眯,任何訂閱的地方都能收到該事件愕难。
- 首先,我們要在合適的位置對(duì)事件進(jìn)行監(jiān)聽(tīng)惫霸,并在合適的位置取消事件的監(jiān)聽(tīng)猫缭。如果我們沒(méi)有在適當(dāng)?shù)臅r(shí)機(jī)釋放事件,會(huì)不會(huì)造成內(nèi)存泄漏呢壹店?這還是有可能的猜丹。所以,我們需要對(duì)注冊(cè)監(jiān)聽(tīng)的觀察者進(jìn)行記錄硅卢,并提供注冊(cè)和取消注冊(cè)的方法射窒,給它們?cè)谥付ǖ纳芷谥羞M(jìn)行調(diào)用。
好了将塑,首先是全局的 Subject 的問(wèn)題脉顿,我們可以實(shí)現(xiàn)一個(gè)靜態(tài)的或者單例的 Subject。這里我們選擇使用后者点寥,所以艾疟,我們需要一個(gè)單例的方式來(lái)使用 Subject:
public class RxBus {
private static volatile RxBus rxBus;
private final Subject<Object> subject = PublishSubject.create().toSerialized();
public static RxBus getRxBus() {
if (rxBus == null) {
synchronized (RxBus.class) {
if(rxBus == null) {
rxBus = new RxBus();
}
}
}
return rxBus;
}
}
這里我們應(yīng)用了 DCL 的單例模式提供一個(gè)單例的 RxBus,對(duì)應(yīng)一個(gè)唯一的 Subject. 這里我們用到了 Subject 的toSerialized()
敢辩,我們上面已經(jīng)提到過(guò)它的作用蔽莱,就是用來(lái)保證 onNext() 等方法的線程安全性。
另外戚长,因?yàn)?Observalbe 本身是不支持背壓的盗冷,所以,我們還需要將該 Observable 轉(zhuǎn)換成 Flowable 來(lái)實(shí)現(xiàn)背壓的效果:
public <T> Flowable<T> getObservable(Class<T> type){
return subject.toFlowable(BackpressureStrategy.BUFFER).ofType(type);
}
這里我們用到的背壓的策略是BackpressureStrategy.BUFFER
历葛,它會(huì)緩存發(fā)射結(jié)果正塌,直到有消費(fèi)者訂閱了它嘀略。而這里的ofType()
方法的作用是用來(lái)過(guò)濾發(fā)射的事件的類(lèi)型,只有指定類(lèi)型的事件會(huì)被發(fā)布乓诽。
然后帜羊,我們需要記錄訂閱者的信息以便在適當(dāng)?shù)臅r(shí)機(jī)取消訂閱,這里我們用一個(gè)Map<String, CompositeDisposable>
類(lèi)型的哈希表來(lái)解決鸠天。這里的CompositeDisposable
用來(lái)存儲(chǔ) Disposable讼育,從而達(dá)到一個(gè)訂閱者對(duì)應(yīng)多個(gè) Disposable 的目的。CompositeDisposable
是一個(gè) Disposable 的容器稠集,聲稱(chēng)可以達(dá)到 O(1) 的增奶段、刪的復(fù)雜度。這里的做法目的是使用注冊(cè)觀察之后的 Disposable 的 dispose() 方法來(lái)取消訂閱剥纷。所以痹籍,我們可以得到下面的這段代碼:
public void addSubscription(Object o, Disposable disposable) {
String key = o.getClass().getName();
if (disposableMap.get(key) != null) {
disposableMap.get(key).add(disposable);
} else {
CompositeDisposable disposables = new CompositeDisposable();
disposables.add(disposable);
disposableMap.put(key, disposables);
}
}
public void unSubscribe(Object o) {
String key = o.getClass().getName();
if (!disposableMap.containsKey(key)) {
return;
}
if (disposableMap.get(key) != null) {
disposableMap.get(key).dispose();
}
disposableMap.remove(key);
}
最后,對(duì)外提供一下 Subject 的訂閱和發(fā)布方法晦鞋,整個(gè) EventBus 就制作完成了:
public void post(Object o){
subject.onNext(o);
}
public <T> Disposable doSubscribe(Class<T> type, Consumer<T> next, Consumer<Throwable> error){
return getObservable(type)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(next,error);
}
2.2 測(cè)試效果
我們只需要在最頂層的 Activity 基類(lèi)中加入如下的代碼蹲缠。這樣,我們就不需要在各個(gè) Activity 中取消注冊(cè)了悠垛。然后线定,就可以使用這些頂層的方法來(lái)進(jìn)行操作了。
protected void postEvent(Object object) {
RxBus.getRxBus().post(object);
}
protected <M> void addSubscription(Class<M> eventType, Consumer<M> action) {
Disposable disposable = RxBus.getRxBus().doSubscribe(eventType, action, LogUtils::d);
RxBus.getRxBus().addSubscription(this, disposable);
}
protected <M> void addSubscription(Class<M> eventType, Consumer<M> action, Consumer<Throwable> error) {
Disposable disposable = RxBus.getRxBus().doSubscribe(eventType, action, error);
RxBus.getRxBus().addSubscription(this, disposable);
}
@Override
protected void onDestroy() {
super.onDestroy();
RxBus.getRxBus().unSubscribe(this);
}
在第一個(gè) Activity 中我們對(duì)指定的類(lèi)型的結(jié)果進(jìn)行監(jiān)聽(tīng):
addSubscription(RxMessage.class, rxMessage -> ToastUtils.makeToast(rxMessage.message));
然后确买,我們?cè)诹硪粋€(gè) Activity 中發(fā)布事件:
postEvent(new RxMessage("Hello world!"));
這樣當(dāng)?shù)诙€(gè) Activity 中調(diào)用指定的發(fā)送事件的方法之后斤讥,第一個(gè) Activity 就可以接收到發(fā)射的事件了。
總結(jié)
好了湾趾,以上就是 Subject 的使用芭商,如果要用一個(gè)詞來(lái)形容它的話,那么只能是“自給自足”了撑帖。就是說(shuō)蓉坎,它同時(shí)做了 Observable 和 Observer 的工作,既可以發(fā)射事件又可以對(duì)事件進(jìn)行消費(fèi)胡嘿,可謂身兼數(shù)職蛉艾。它在那種想要對(duì)某個(gè)值進(jìn)行監(jiān)聽(tīng)并處理的情形特別有用。因?yàn)樗恍枰銓?xiě)多個(gè)冗余的類(lèi)衷敌,只要它一個(gè)就完成了其他兩個(gè)類(lèi)來(lái)完成的任務(wù)勿侯,因而代碼更加簡(jiǎn)潔。