1蟹演,簡單使用
首先引入guave的依賴;
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>23.6-jre</version>
</dependency>
再來看一下基礎(chǔ)的使用:
EventBus eventBus = new EventBus();
eventBus.register(new Object() {
@Subscribe
public void hehe(String name) {
System.out.println(name);
}
@Subscribe
public void haha(Object object) {
System.out.println(object);
}
});
eventBus.post("huang");//輸出兩遍huang臭墨;
eventBus.post(17);//輸出一遍17;
- 首先需要一個(gè)總線EventBus膘盖,直接new出來;
- 然后需要一個(gè)觀察者尤误,可以理解為監(jiān)聽器侠畔,最簡潔的方式是直接new一個(gè)Object,然后向其中新增方法损晤;
- 然后需要將觀察者注冊到總線中软棺;
- 最后發(fā)布事件;
注意點(diǎn):
- 使用@Subscribe注解來標(biāo)注方法尤勋,該方法的返回值不會被處理喘落,所以有無返回值都是一樣的茵宪,方法的參數(shù)類型是接受的事件類型,所以方法有且只能有一個(gè)參數(shù)瘦棋;
- 當(dāng)多個(gè)標(biāo)注了@Subscribe的方法的參數(shù)存在父子關(guān)系的時(shí)候稀火,當(dāng)發(fā)布子類型的事件時(shí),父類型的方法也將被執(zhí)行赌朋,這就是上面代碼輸出兩遍huang的原因凰狞;
- 如果方法只標(biāo)注了@Subscribe,那么該方法的執(zhí)行是同步的沛慢,即使是多線程發(fā)布同一事件赡若,那多個(gè)線程之間存在互斥鎖,同一時(shí)間點(diǎn)团甲,只能有一個(gè)或零個(gè)執(zhí)行該方法逾冬,如下面的代碼所示;
EventBus eventBus = new EventBus();
eventBus.register(new Object() {
@Subscribe
public void hehe(Integer num) throws InterruptedException {
System.out.println(num + ":" + System.currentTimeMillis());
Thread.currentThread().sleep(100);
}
});
for (int i = 0; i < 10; i++) {
int finalI = i;
new Thread(new Runnable() {
@Override
public void run() {
eventBus.post(finalI);
}
}).start();
}
//4:1517538494324
//1:1517538494425
//8:1517538494529
//0:1517538494632
//5:1517538494735
//2:1517538494838
//9:1517538494942
//6:1517538495043
//3:1517538495147
//7:1517538495250
2躺苦,關(guān)于EventBus的并行
線程安全問題是每個(gè)java程序猿都應(yīng)該時(shí)刻注意的身腻,當(dāng)帶有@Subscribe的方法被多個(gè)方法同時(shí)執(zhí)行,且該方法內(nèi)部邏輯涉及到更改成員變量時(shí)圾另,就會出現(xiàn)線程安全問題雨涛,在默認(rèn)情況下,如果只有@Subscribe注解時(shí)蛔钙,方法時(shí)是異步執(zhí)行的笨农,即使多個(gè)線程同時(shí)調(diào)用,也需要競爭方法的同步鎖扰路,然后依次執(zhí)行尤溜;
當(dāng)需要@Subscribe標(biāo)注的方法能被多個(gè)線程同時(shí)調(diào)用,需要配合@AllowConcurrentEvents注解使用汗唱,該注解表示允許并行執(zhí)行該方法宫莱,當(dāng)有多個(gè)線程同時(shí)調(diào)用方法時(shí),因?yàn)榉椒o鎖哩罪,所以線程可以同時(shí)進(jìn)入執(zhí)行授霸,有鎖和無鎖,取決于@AllowConcurrentEvents际插,當(dāng)沒有該注解時(shí)碘耳,EventBus在生成Subscriber時(shí),使用了SynchronizedSubscriber框弛,該類型在真實(shí)調(diào)用帶有@Subscribe方法時(shí)辛辨,使用了同步鎖,具體后面講解;
EventBus eventBus = new EventBus();
eventBus.register(new Object() {
@Subscribe
@AllowConcurrentEvents
public void hehe(Integer num) throws InterruptedException {
System.out.println(num + ":" + System.currentTimeMillis());
Thread.currentThread().sleep(100);
}
});
for (int i = 0; i < 10; i++) {
int finalI = i;
new Thread(new Runnable() {
@Override
public void run() {
eventBus.post(finalI);
}
}).start();
}
//7:1517543101034
//9:1517543101034
//1:1517543101034
//4:1517543101034
//5:1517543101034
//8:1517543101034
//0:1517543101034
//6:1517543101034
//2:1517543101034
//3:1517543101034
關(guān)于多線程調(diào)用:
- 當(dāng)直接new一個(gè)EventBus時(shí)斗搞,當(dāng)使用post方法發(fā)布一個(gè)事件是指攒,那么調(diào)用帶有@Subscribe的方法的線程是當(dāng)前線程,在哪個(gè)線程中調(diào)用僻焚,就在哪個(gè)線程中執(zhí)行允悦;
- 當(dāng)需要在多線程中執(zhí)行同一個(gè)標(biāo)注了@Subscribe的方法時(shí),需要使用AsyncEventBus類溅呢,并指定一個(gè)Executor線程池澡屡,那么發(fā)布事件時(shí),調(diào)用的方法都將在指定的線程池中執(zhí)行咐旧;
- 即使指定了線程池驶鹉,如果沒有使用@AllowConcurrentEvents,那么即使調(diào)用方法(同一方法)的線程不一樣铣墨,因?yàn)橥芥i的存在室埋,執(zhí)行的時(shí)機(jī)還是依次執(zhí)行,多個(gè)線程并不會同時(shí)執(zhí)行同一方法伊约;
- 只有指定了線程池姚淆,并且使用了@AllowConcurrentEvents注解,才能實(shí)現(xiàn)在多個(gè)線程中同時(shí)調(diào)用某個(gè)標(biāo)注了@Subscribe的方法屡律,這時(shí)需要格外注意線程并發(fā)導(dǎo)致的線程安全問題腌逢;
EventBus eventBus = new AsyncEventBus(Executors.newFixedThreadPool(3));
eventBus.register(new Object() {
@Subscribe
@AllowConcurrentEvents
public void hehe(Integer num) throws InterruptedException {
Thread.currentThread().sleep(100);
System.out.println(Thread.currentThread().getName() + "-" + num + "-" + System.currentTimeMillis());
}
});
for (int i = 0; i < 9; i++) {
eventBus.post(i);
}
//pool-1-thread-2-1-1517546343599
//pool-1-thread-1-0-1517546343599
//pool-1-thread-3-2-1517546343599
//pool-1-thread-2-3-1517546343704
//pool-1-thread-3-5-1517546343704
//pool-1-thread-1-4-1517546343704
//pool-1-thread-3-7-1517546343807
//pool-1-thread-1-8-1517546343807
//pool-1-thread-2-6-1517546343807
3,源碼解析
基本組件:
- Executor:執(zhí)行的線程池超埋,默認(rèn)是DirectExecutor搏讶,他未開啟新的線程,而是在當(dāng)前線程中直接執(zhí)行霍殴;
- SubscriberRegistry:Subscriber注冊器媒惕,每個(gè)帶有@Subscribe的方法會被注冊到該類中;
- Dispatcher:調(diào)度器来庭,負(fù)責(zé)將事件妒蔚,分發(fā)給事件對應(yīng)的Subscriber,并使用Executor執(zhí)行這些Subscriber月弛;
- SubscriberExceptionHandler:異常處理器肴盏,用來處理異常;
3.1帽衙,Executor
private enum DirectExecutor implements Executor {
INSTANCE;
@Override
public void execute(Runnable command) {
command.run();
}
@Override
public String toString() {
return "MoreExecutors.directExecutor()";
}
}
- 默認(rèn)的執(zhí)行器叁鉴,直接實(shí)現(xiàn)了Executor接口,然后覆寫方法佛寿,因?yàn)闆]有開啟新的線程,所以默認(rèn)是在當(dāng)前線程中執(zhí)行;
- 如果是在主線程中調(diào)用冀泻,那么DirectExecutor對應(yīng)的線程就是主線程常侣,如果在其他線程中執(zhí)行,那么DirectExecutor對應(yīng)的就是其他線程弹渔,總之胳施,就是調(diào)用線程;
3.2肢专,Dispatcher
private static final class PerThreadQueuedDispatcher extends Dispatcher {
private final ThreadLocal<Queue<Event>> queue =
new ThreadLocal<Queue<Event>>() {
@Override
protected Queue<Event> initialValue() {
return Queues.newArrayDeque();
}
};
private final ThreadLocal<Boolean> dispatching =
new ThreadLocal<Boolean>() {
@Override
protected Boolean initialValue() {
return false;
}
};
@Override
void dispatch(Object event, Iterator<Subscriber> subscribers) {
checkNotNull(event);
checkNotNull(subscribers);
Queue<Event> queueForThread = queue.get();
queueForThread.offer(new Event(event, subscribers));
if (!dispatching.get()) {
dispatching.set(true);
try
Event nextEvent;
while ((nextEvent = queueForThread.poll()) != null) {
while (nextEvent.subscribers.hasNext()) {
nextEvent.subscribers.next().dispatchEvent(nextEvent.event);
}
}
} finally {
dispatching.remove();
queue.remove();
}
}
}
}
- 該調(diào)度器名為單線程調(diào)度器舞肆,當(dāng)某個(gè)線程調(diào)用dispatch方法時(shí),首先從當(dāng)前線程中獲取queue博杖,這是一個(gè)ThreadLocal類型的Event隊(duì)列椿胯,然后將當(dāng)前事件放置進(jìn)去;
- 然后根據(jù)同樣是ThreadLocal類型的dispatching字段剃根,判斷是否正在調(diào)用中哩盲,如果不是,則開始執(zhí)行具體的調(diào)度任務(wù)狈醉;
- 但是此處的if判斷我不是很理解廉油,因?yàn)榻Y(jié)果必然一定成立;
- 如果是單個(gè)線程苗傅,if條件中的邏輯沒有執(zhí)行完時(shí)抒线,是不可能再次調(diào)用dispatch方法的;
- 如果是多線程調(diào)用渣慕,每個(gè)線程拿到的dispatching都是不同的嘶炭,相互之間不存在干擾,所以這個(gè)if條件是必然成立的摇庙,望大神排異解惑旱物;
下面的代碼是具體的調(diào)用邏輯,使用的是Executor進(jìn)行具體調(diào)用卫袒,并執(zhí)行方法:
final void dispatchEvent(final Object event) {
executor.execute(
new Runnable() {
@Override
public void run() {
try {
invokeSubscriberMethod(event);
} catch (InvocationTargetException e) {
bus.handleSubscriberException(e.getCause(), context(event));
}
}
});
}
@VisibleForTesting
void invokeSubscriberMethod(Object event) throws InvocationTargetException {
try {
method.invoke(target, checkNotNull(event));
} catch (IllegalArgumentException e) {
throw new Error("Method rejected target/argument: " + event, e);
} catch (IllegalAccessException e) {
throw new Error("Method became inaccessible: " + event, e);
} catch (InvocationTargetException e) {
if (e.getCause() instanceof Error) {
throw (Error) e.getCause();
}
throw e;
}
}
3.3宵呛,SubscriberRegistry
- SubscriberRegistry只有一個(gè)類就是SubscriberRegistry;
- 該類可以注冊也可以取消注冊Subscriber夕凝;
- 還需要注意的是宝穗,當(dāng)沒有@AllowConcurrentEvents注解時(shí),Subscriber使用的是SynchronizedSubscriber類型码秉,而有@AllowConcurrentEvents注解時(shí)逮矛,使用的是Subscriber類型;
- 當(dāng)方法被調(diào)用時(shí)都是調(diào)用invokeSubscriberMethod方法转砖;
- SynchronizedSubscriber類繼承了Subscriber類须鼎,并重寫了invokeSubscriberMethod方法鲸伴;
- 不同的是SynchronizedSubscriber類型對方法使用了同步鎖,導(dǎo)致的結(jié)果就是晋控,沒有@AllowConcurrentEvents注解時(shí)invokeSubscriberMethod方法會在多個(gè)線程中同步執(zhí)行汞窗;
static final class SynchronizedSubscriber extends Subscriber {
private SynchronizedSubscriber(EventBus bus, Object target, Method method) {
super(bus, target, method);
}
@Override
void invokeSubscriberMethod(Object event) throws InvocationTargetException {
synchronized (this) {
super.invokeSubscriberMethod(event);
}
}
}