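This article was first published in 泊浮目's Jianshu column: http://www.reibang.com/nb/21050520
Preface
Event-driven and message-driven designs are both powerful ways to decouple a system. As a large software project, ZStack uses both to decouple its overall architecture.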
EventFacade
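EventFacade is an interesting component because it is almost self-contained. Interested readers can copy and paste it, make a few small changes, and have it working in their own projects.
How to use it
The ZStack repo provides a corresponding test case: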
package org.zstack.test.core.cloudbus;
/**
* Created with IntelliJ IDEA.
* User: frank
* Time: 12:38 AM
* To change this template use File | Settings | File Templates.
*/
public class TestCanonicalEvent {
CLogger logger = Utils.getLogger(TestCanonicalEvent.class);
ComponentLoader loader;
EventFacade evtf;
boolean success;
@Before
public void setUp() throws Exception {
BeanConstructor con = new BeanConstructor();
loader = con.build();
evtf = loader.getComponent(EventFacade.class);
((EventFacadeImpl) evtf).start();
}
@Test
public void test() throws InterruptedException {
String path = "/test/event";
evtf.on(path, new EventRunnable() {
@Override
public void run() {
success = true;
}
});
evtf.fire(path, null);
TimeUnit.SECONDS.sleep(1);
Assert.assertTrue(success);
}
}
Usage is simple: register a callback on a path to receive events, then fire an event on that path, and the callback registered for it is invoked.
Interface definition
package org.zstack.core.cloudbus;
import java.util.Map;
/**
* Created with IntelliJ IDEA.
* User: frank
* Time: 11:29 PM
* To change this template use File | Settings | File Templates.
*/
public interface EventFacade {
void on(String path, AutoOffEventCallback cb);
void on(String path, EventCallback cb);
void on(String path, EventRunnable runnable);
void off(AbstractEventFacadeCallback cb);
void onLocal(String path, AutoOffEventCallback cb);
void onLocal(String path, EventCallback cb);
void onLocal(String path, EventRunnable runnable);
void fire(String path, Object data);
boolean isFromThisManagementNode(Map tokens);
String META_DATA_MANAGEMENT_NODE_ID = "metadata::managementNodeId";
String META_DATA_PATH = "metadata::path";
String WEBHOOK_TYPE = "CanonicalEvent";
}
Source code walkthrough
on
@Override
public void on(String path, AutoOffEventCallback cb) {
global.put(cb.uniqueIdentity, new CallbackWrapper(path, cb));
}
@Override
public void on(String path, final EventCallback cb) {
global.put(cb.uniqueIdentity, new CallbackWrapper(path, cb));
}
@Override
public void on(String path, EventRunnable cb) {
global.put(cb.uniqueIdentity, new CallbackWrapper(path, cb));
}
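The on method simply puts the callback into the global map, using the callback's own UUID (uniqueIdentity) as the key and a CallbackWrapper around the callback as the value. Why key by UUID? Map keys are unique, so using the path as the key would let a later registration on the same path overwrite an earlier one.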
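The effect of keying by a unique id rather than by path can be sketched as follows. This is a minimal illustration, not ZStack code: `CallbackRegistrySketch`, its `Entry` class, and the generated id are hypothetical stand-ins for `EventFacadeImpl`, `CallbackWrapper`, and `uniqueIdentity`.

```java
import java.util.ArrayList;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical illustration, not ZStack code: callbacks are keyed by their own id.
final class CallbackRegistrySketch {
    // Remembers the path a callback was registered on.
    static final class Entry {
        final String path;
        final Runnable callback;

        Entry(String path, Runnable callback) {
            this.path = path;
            this.callback = callback;
        }
    }

    private final Map<String, Entry> byId = new ConcurrentHashMap<>();

    // Register a callback; the returned id plays the role of uniqueIdentity.
    String on(String path, Runnable callback) {
        String id = UUID.randomUUID().toString();
        byId.put(id, new Entry(path, callback));
        return id;
    }

    // Remove exactly one callback without touching others on the same path.
    void off(String id) {
        byId.remove(id);
    }

    // Invoke every callback registered on the path and return how many ran.
    int fire(String path) {
        int invoked = 0;
        for (Entry e : new ArrayList<>(byId.values())) {
            if (e.path.equals(path)) {
                e.callback.run();
                invoked++;
            }
        }
        return invoked;
    }

    public static void main(String[] args) {
        CallbackRegistrySketch registry = new CallbackRegistrySketch();
        String first = registry.on("/test/event", () -> System.out.println("first"));
        registry.on("/test/event", () -> System.out.println("second"));
        System.out.println(registry.fire("/test/event")); // both callbacks run
        registry.off(first);
        System.out.println(registry.fire("/test/event")); // only the second remains
    }
}
```

Had the map been keyed by path, the second `on` call would have replaced the first callback instead of adding to it.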
The class declaration and member variables of EventFacadeImpl:
public class EventFacadeImpl implements EventFacade, CloudBusEventListener, Component, GlobalApiMessageInterceptor {
@Autowired
private CloudBus bus;
private final Map<String, CallbackWrapper> global = Collections.synchronizedMap(new HashMap<>());
private final Map<String, CallbackWrapper> local = new ConcurrentHashMap<>();
private EventSubscriberReceipt unsubscriber;
fire
The corresponding fire method:
@Override
public void fire(String path, Object data) {
assert path != null;
CanonicalEvent evt = new CanonicalEvent();
evt.setPath(path);
evt.setManagementNodeId(Platform.getManagementServerId());
if (data != null) {
/*
if (!TypeUtils.isPrimitiveOrWrapper(data.getClass()) && !data.getClass().isAnnotationPresent(NeedJsonSchema.class)) {
throw new CloudRuntimeException(String.format("data[%s] passed to canonical event is not annotated by @NeedJsonSchema", data.getClass().getName()));
}
*/
evt.setContent(data);
}
// find the matching callbacks in the local map and invoke them
fireLocal(evt);
// send the event to the matching webhooks
callWebhooks(evt);
// publish the event through CloudBus; the CloudBus source is covered later
bus.publish(evt);
}
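The difference between onLocal and on
The analysis above does not show how global events are triggered. To understand that fully we have to start from CloudBus, which is covered shortly. Still, it is already possible to guess why on and onLocal are kept separate: one is triggered through the message bus, the other inside the current JVM process. In other words, one supports events across a cluster of management nodes (MNs), while the other only supports events within a single MN. This comes from ZStack's business scenarios: some things must be done by all MNs together, while others need to be done by only one MN. For reasons of space the details are left out here; interested readers can browse the code themselves.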
WebHook
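WebHook is one of the ways ZStack actively pushes notifications to a front end. Once a webhook is registered for an event path, firing an event on that path sends the content to the registered URL. The test cases CanonicalEventWebhookCase and WebhookCase show the correct usage.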
CanonicalEventWebhookCase
class CanonicalEventWebhookCase extends SubCase {
EnvSpec envSpec
@Override
void clean() {
envSpec.delete()
}
@Override
void setup() {
INCLUDE_CORE_SERVICES = false
spring {
include("webhook.xml")
}
}
String WEBHOOK_PATH = "/canonical-event-webhook"
void testErrorToCreateWebhookifOpaqueFieldMissing() {
expect(AssertionError.class) {
createWebhook {
name = "webhook1"
url = "http://127.0.0.1:8989$WEBHOOK_PATH"
type = EventFacade.WEBHOOK_TYPE
}
}
}
void testCanonicalEventWithVariableInPath() {
String path = "/test/{uuid}/event"
int count = 0
WebhookInventory hook1 = createWebhook {
name = "webhook1"
url = "http://127.0.0.1:8989$WEBHOOK_PATH"
type = EventFacade.WEBHOOK_TYPE
opaque = path
}
// this webhook will not be called because path unmatching
WebhookInventory hook2 = createWebhook {
name = "webhook1"
url = "http://127.0.0.1:8989$WEBHOOK_PATH"
type = EventFacade.WEBHOOK_TYPE
opaque = "/this-path-does-not-match"
}
CanonicalEvent evt
envSpec.simulator(WEBHOOK_PATH) { HttpEntity<String> e ->
evt = json(e.getBody(), CanonicalEvent.class)
count ++
return [:]
}
String content = "hello world"
String eventPath = "/test/${Platform.uuid}/event"
bean(EventFacade.class).fire(eventPath, content)
retryInSecs {
assert count == 1
assert evt != null
assert evt.path == eventPath
assert evt.content == content
assert evt.managementNodeId == Platform.getManagementServerId()
}
}
void testCanonicalEventUseWebhook() {
String path = "/test/event"
WebhookInventory hook1 = createWebhook {
name = "webhook1"
url = "http://127.0.0.1:8989$WEBHOOK_PATH"
type = EventFacade.WEBHOOK_TYPE
opaque = path
}
WebhookInventory hook2 = createWebhook {
name = "webhook2"
url = "http://127.0.0.1:8989$WEBHOOK_PATH"
type = EventFacade.WEBHOOK_TYPE
opaque = path
}
def testFireTwoEvents = {
List<CanonicalEvent> evts = []
envSpec.simulator(WEBHOOK_PATH) { HttpEntity<String> e ->
CanonicalEvent evt = json(e.getBody(), CanonicalEvent.class)
evts.add(evt)
return [:]
}
String content = "hello world"
bean(EventFacade.class).fire(path, content)
retryInSecs {
assert evts.size() == 2
CanonicalEvent evt1 = evts[0]
CanonicalEvent evt2 = evts[1]
assert evt1.path == path
assert evt1.content == content
assert evt1.managementNodeId == Platform.getManagementServerId()
assert evt2.path == path
assert evt2.content == content
assert evt2.managementNodeId == Platform.getManagementServerId()
}
}
def testOneEventsGetAfterDeleteOneHook = {
deleteWebhook { uuid = hook1.uuid }
List<CanonicalEvent> evts = []
envSpec.simulator(WEBHOOK_PATH) { HttpEntity<String> e ->
CanonicalEvent evt = json(e.getBody(), CanonicalEvent.class)
evts.add(evt)
return [:]
}
String content = "hello world"
bean(EventFacade.class).fire(path, content)
retryInSecs {
assert evts.size() == 1
}
}
def testNoEventGetAfterDeleteAllHooks = {
deleteWebhook { uuid = hook2.uuid }
List<CanonicalEvent> evts = []
envSpec.simulator(WEBHOOK_PATH) { HttpEntity<String> e ->
CanonicalEvent evt = json(e.getBody(), CanonicalEvent.class)
evts.add(evt)
return [:]
}
String content = "hello world"
bean(EventFacade.class).fire(path, content)
retryInSecs {
assert evts.size() == 0
}
}
testFireTwoEvents()
testOneEventsGetAfterDeleteOneHook()
testNoEventGetAfterDeleteAllHooks()
}
@Override
void environment() {
envSpec = env {
// nothing
}
}
@Override
void test() {
envSpec.create {
testCanonicalEventUseWebhook()
testCanonicalEventWithVariableInPath()
testErrorToCreateWebhookifOpaqueFieldMissing()
}
}
}
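The case above registers a webhook whose opaque field is "/test/{uuid}/event" and expects it to be called for a concrete path such as "/test/<some uuid>/event". One way such matching could work is sketched below. This is an assumption for illustration, not necessarily how ZStack matches paths; `PathTemplateSketch` and its methods are hypothetical names.

```java
import java.util.regex.Pattern;

// Hypothetical sketch: match a concrete event path against a template with {variables}.
final class PathTemplateSketch {
    // Turn "/test/{uuid}/event" into a regex in which each {name} matches one path segment.
    static Pattern compile(String template) {
        String[] segments = template.split("/", -1);
        StringBuilder regex = new StringBuilder();
        for (int i = 0; i < segments.length; i++) {
            if (i > 0) {
                regex.append('/');
            }
            String s = segments[i];
            if (s.startsWith("{") && s.endsWith("}")) {
                regex.append("[^/]+");          // a variable matches exactly one segment
            } else {
                regex.append(Pattern.quote(s)); // literal segments must match verbatim
            }
        }
        return Pattern.compile(regex.toString());
    }

    static boolean matches(String template, String path) {
        return compile(template).matcher(path).matches();
    }

    public static void main(String[] args) {
        System.out.println(matches("/test/{uuid}/event", "/test/1234/event"));
        System.out.println(matches("/this-path-does-not-match", "/test/1234/event"));
    }
}
```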
WebhookCase
class WebhookCase extends SubCase {
EnvSpec envSpec
@Override
void clean() {
envSpec.delete()
}
@Override
void setup() {
INCLUDE_CORE_SERVICES = false
spring {
include("webhook.xml")
}
}
@Override
void environment() {
envSpec = env {
// nothing
}
}
void testWebhooksCRUD() {
WebhookInventory hook = null
def testCreateWebhook = {
def params = null
hook = createWebhook {
name = "webhook"
type = "custom-type"
url = "http://127.0.0.1:8080/webhook"
description = "desc"
opaque = "test data"
params = delegate
}
assert dbIsExists(hook.uuid, WebhookVO.class)
assert hook.name == params.name
assert hook.type == params.type
assert hook.url == params.url
assert hook.description == params.description
assert hook.opaque == params.opaque
}
def testQueryWebhook = {
List<WebhookInventory> invs = queryWebhook {
conditions = ["name=${hook.name}"]
}
assert invs.size() == 1
assert invs[0].uuid == hook.uuid
}
def testDeleteWebhook = {
deleteWebhook {
uuid = hook.uuid
}
assert !dbIsExists(hook.uuid, WebhookVO.class)
}
testCreateWebhook()
testQueryWebhook()
testDeleteWebhook()
}
void testInvalidUrl() {
expect(AssertionError.class) {
createWebhook {
name = "webhook"
type = "custom-type"
url = "this is not a url"
description = "desc"
opaque = "test data"
}
}
}
@Override
void test() {
envSpec.create {
testWebhooksCRUD()
testInvalidUrl()
}
}
}
CloudBus
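CloudBus is arguably the most important component in ZStack. All communication between ZStack modules is done through Messages, and CloudBus is the medium that carries them. Let's look at its source.
This section is aimed at readers who already know some AMQP. If you do not, you may want to read my blog post 《MQ學習小記》 (MQ study notes) first.
How to use it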
package org.zstack.test.core.cloudbus;
import junit.framework.Assert;
import org.junit.Before;
import org.junit.Test;
import org.zstack.core.cloudbus.CloudBusIN;
import org.zstack.core.componentloader.ComponentLoader;
import org.zstack.header.AbstractService;
import org.zstack.header.Service;
import org.zstack.header.message.Message;
import org.zstack.header.message.MessageReply;
import org.zstack.header.message.NeedReplyMessage;
import org.zstack.test.BeanConstructor;
import org.zstack.utils.Utils;
import org.zstack.utils.logging.CLogger;
import java.util.concurrent.TimeUnit;
public class TestCloudBusCall {
CLogger logger = Utils.getLogger(TestCloudBusCall.class);
ComponentLoader loader;
CloudBusIN bus;
Service serv;
public static class HelloWorldMsg extends NeedReplyMessage {
private String greet;
public String getGreet() {
return greet;
}
public void setGreet(String greet) {
this.greet = greet;
}
}
public static class HelloWorldReply extends MessageReply {
private String greet;
public String getGreet() {
return greet;
}
public void setGreet(String greet) {
this.greet = greet;
}
}
class FakeService extends AbstractService {
@Override
public boolean start() {
bus.registerService(this);
bus.activeService(this);
return true;
}
@Override
public boolean stop() {
bus.deActiveService(this);
bus.unregisterService(this);
return true;
}
@Override
public void handleMessage(Message msg) {
if (msg.getClass() == HelloWorldMsg.class) {
HelloWorldMsg hmsg = (HelloWorldMsg) msg;
HelloWorldReply r = new HelloWorldReply();
r.setGreet(hmsg.getGreet());
bus.reply(msg, r);
}
}
@Override
public String getId() {
return this.getClass().getCanonicalName();
}
}
@Before
public void setUp() throws Exception {
BeanConstructor con = new BeanConstructor();
loader = con.build();
bus = loader.getComponent(CloudBusIN.class);
serv = new FakeService();
serv.start();
}
@Test
public void test() throws InterruptedException, ClassNotFoundException {
HelloWorldMsg msg = new HelloWorldMsg();
msg.setGreet("Hello");
msg.setServiceId(FakeService.class.getCanonicalName());
msg.setTimeout(TimeUnit.SECONDS.toMillis(10));
HelloWorldReply r = (HelloWorldReply) bus.call(msg);
serv.stop();
Assert.assertEquals("Hello", r.getGreet());
}
}
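We register a Service and override its handleMessage method. In the test case, the service receives the message, the reply comes back, and the assertion passes.
Here is another one: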
~~~java
package org.zstack.test.core.cloudbus;
import junit.framework.Assert;
import org.junit.Before;
import org.junit.Test;
import org.zstack.core.cloudbus.CloudBusCallBack;
import org.zstack.core.cloudbus.CloudBusIN;
import org.zstack.core.componentloader.ComponentLoader;
import org.zstack.header.AbstractService;
import org.zstack.header.Service;
import org.zstack.header.message.Message;
import org.zstack.header.message.MessageReply;
import org.zstack.header.message.NeedReplyMessage;
import org.zstack.test.BeanConstructor;
import org.zstack.utils.Utils;
import org.zstack.utils.logging.CLogger;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
public class TestCloudBusSendCallback {
CLogger logger = Utils.getLogger(TestCloudBusSendCallback.class);
ComponentLoader loader;
CloudBusIN bus;
CountDownLatch latch = new CountDownLatch(1);
boolean isSuccess = false;
Service serv;
public static class HelloWorldMsg extends NeedReplyMessage {
private String greet;
public String getGreet() {
return greet;
}
public void setGreet(String greet) {
this.greet = greet;
}
}
public static class HelloWorldReply extends MessageReply {
private String greet;
public String getGreet() {
return greet;
}
public void setGreet(String greet) {
this.greet = greet;
}
}
class FakeService extends AbstractService {
@Override
public boolean start() {
bus.registerService(this);
bus.activeService(this);
return true;
}
@Override
public boolean stop() {
bus.deActiveService(this);
bus.unregisterService(this);
return true;
}
@Override
public void handleMessage(Message msg) {
if (msg.getClass() == HelloWorldMsg.class) {
HelloWorldMsg hmsg = (HelloWorldMsg) msg;
HelloWorldReply r = new HelloWorldReply();
r.setGreet(hmsg.getGreet());
bus.reply(msg, r);
}
}
@Override
public String getId() {
return this.getClass().getCanonicalName();
}
}
@Before
public void setUp() throws Exception {
BeanConstructor con = new BeanConstructor();
loader = con.build();
bus = loader.getComponent(CloudBusIN.class);
serv = new FakeService();
serv.start();
}
@Test
public void test() throws InterruptedException, ClassNotFoundException {
HelloWorldMsg msg = new HelloWorldMsg();
msg.setGreet("Hello");
msg.setServiceId(FakeService.class.getCanonicalName());
msg.setTimeout(TimeUnit.SECONDS.toMillis(10));
bus.send(msg, new CloudBusCallBack(null) {
@Override
public void run(MessageReply reply) {
if (reply instanceof HelloWorldReply) {
HelloWorldReply hr = (HelloWorldReply) reply;
if ("Hello".equals(hr.getGreet())) {
isSuccess = true;
}
}
latch.countDown();
}
});
latch.await(15, TimeUnit.SECONDS);
serv.stop();
Assert.assertEquals(true, isSuccess);
}
}
~~~
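This case also registers a Service, but this time it uses a CallBack. Running it shows that the assertion passes, which means the CallBack was invoked.
In short, using CloudBus is simple: register your Service, send a message through CloudBus addressed to that Service, and the Service receives it. Registering a CallBack for the reply is just as easy.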
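The relationship between the two styles, a blocking call and a send with a callback, can be sketched with a tiny in-memory bus. This is a hedged illustration only: `MiniBusSketch` is hypothetical, it models messages as plain strings, and it says nothing about how CloudBus implements `call` internally.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical in-memory stand-in for a message bus; not ZStack's CloudBus.
final class MiniBusSketch {
    private final Map<String, Function<String, String>> services = new ConcurrentHashMap<>();
    private final ExecutorService pool = Executors.newFixedThreadPool(2);

    // A service is modelled as a function from request text to reply text.
    void registerService(String serviceId, Function<String, String> handler) {
        services.put(serviceId, handler);
    }

    // Asynchronous style: return immediately and deliver the reply to the callback later.
    void send(String serviceId, String request, Consumer<String> callback) {
        Function<String, String> handler = services.get(serviceId);
        if (handler == null) {
            throw new IllegalArgumentException("no such service: " + serviceId);
        }
        pool.execute(() -> callback.accept(handler.apply(request)));
    }

    // Synchronous style: built on send, blocks until the reply arrives or the timeout expires.
    String call(String serviceId, String request, long timeoutMillis) {
        CompletableFuture<String> reply = new CompletableFuture<>();
        send(serviceId, request, reply::complete);
        try {
            return reply.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for a reply", e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException("no reply from " + serviceId, e);
        }
    }

    void stop() {
        pool.shutdown();
    }

    public static void main(String[] args) {
        MiniBusSketch bus = new MiniBusSketch();
        bus.registerService("echo", greet -> greet);
        System.out.println(bus.call("echo", "Hello", 10_000));
        bus.stop();
    }
}
```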
## Interface definition
Something this convenient is unlikely to be simple on the inside. Let's start with the [interface](https://github.com/zstackio/zstack/blob/d6f511e6c15a2fab3e57a93637ada63cc4b3ee6c/core/src/main/java/org/zstack/core/cloudbus/CloudBus.java):
~~~java
package org.zstack.core.cloudbus;
import org.zstack.header.Component;
import org.zstack.header.Service;
import org.zstack.header.errorcode.ErrorCode;
import org.zstack.header.exception.CloudConfigureFailException;
import org.zstack.header.message.*;
import java.util.List;
public interface CloudBus extends Component {
void send(Message msg);
<T extends Message> void send(List<T> msgs);
void send(NeedReplyMessage msg, CloudBusCallBack callback);
@Deprecated
void send(List<? extends NeedReplyMessage> msgs, CloudBusListCallBack callBack);
@Deprecated
void send(List<? extends NeedReplyMessage> msgs, int parallelLevel, CloudBusListCallBack callBack);
@Deprecated
void send(List<? extends NeedReplyMessage> msgs, int parallelLevel, CloudBusSteppingCallback callback);
void route(List<Message> msgs);
void route(Message msg);
void reply(Message request, MessageReply reply);
void publish(List<Event> events);
void publish(Event event);
MessageReply call(NeedReplyMessage msg);
<T extends NeedReplyMessage> List<MessageReply> call(List<T> msg);
void registerService(Service serv) throws CloudConfigureFailException;
void unregisterService(Service serv);
EventSubscriberReceipt subscribeEvent(CloudBusEventListener listener, Event...events);
void dealWithUnknownMessage(Message msg);
void replyErrorByMessageType(Message msg, Exception e);
void replyErrorByMessageType(Message msg, String err);
void replyErrorByMessageType(Message msg, ErrorCode err);
void logExceptionWithMessageDump(Message msg, Throwable e);
String makeLocalServiceId(String serviceId);
void makeLocalServiceId(Message msg, String serviceId);
String makeServiceIdByManagementNodeId(String serviceId, String managementNodeId);
void makeServiceIdByManagementNodeId(Message msg, String serviceId, String managementNodeId);
String makeTargetServiceIdByResourceUuid(String serviceId, String resourceUuid);
void makeTargetServiceIdByResourceUuid(Message msg, String serviceId, String resourceUuid);
void installBeforeDeliveryMessageInterceptor(BeforeDeliveryMessageInterceptor interceptor, Class<? extends Message>...classes);
void installBeforeSendMessageInterceptor(BeforeSendMessageInterceptor interceptor, Class<? extends Message>...classes);
void installBeforePublishEventInterceptor(BeforePublishEventInterceptor interceptor, Class<? extends Event>...classes);
}
~~~
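The method names are fairly self-explanatory, so they are not explained one by one here. Let's start reading the source.
Source code walkthrough
What does CloudBus do when ZStack starts?
init
init is a hook provided by Spring that runs while the bean is being loaded. Its declaration can be seen in the XML (default-init-method="init"):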
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:tx="http://www.springframework.org/schema/tx" xmlns:zstack="http://zstack.org/schema/zstack"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/aop
http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
http://www.springframework.org/schema/tx
http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
http://zstack.org/schema/zstack
http://zstack.org/schema/zstack/plugin.xsd"
default-init-method="init" default-destroy-method="destroy">
<bean id="TimeoutManager" class="org.zstack.core.timeout.ApiTimeoutManagerImpl" />
<bean id="CloudBus" class = "org.zstack.core.cloudbus.CloudBusImpl2" depends-on="ThreadFacade,ThreadAspectj">
<zstack:plugin>
<zstack:extension interface="org.zstack.header.managementnode.ManagementNodeChangeListener" order="9999"/>
</zstack:plugin>
</bean>
<bean id="EventFacade" class = "org.zstack.core.cloudbus.EventFacadeImpl">
<zstack:plugin>
<zstack:extension interface="org.zstack.header.Component" />
<zstack:extension interface="org.zstack.header.apimediator.GlobalApiMessageInterceptor" />
</zstack:plugin>
</bean>
<bean id="ResourceDestinationMaker" class="org.zstack.core.cloudbus.ResourceDestinationMakerImpl" />
<bean id="MessageIntegrityChecker" class="org.zstack.core.cloudbus.MessageIntegrityChecker">
<zstack:plugin>
<zstack:extension interface="org.zstack.header.Component" />
</zstack:plugin>
</bean>
</beans>
The init method:
void init() {
trackerClose = CloudBusGlobalProperty.CLOSE_TRACKER;
serverIps = CloudBusGlobalProperty.SERVER_IPS;
tracker = new MessageTracker();
ConnectionFactory connFactory = new ConnectionFactory();
List<Address> addresses = CollectionUtils.transformToList(serverIps, new Function<Address, String>() {
@Override
public Address call(String arg) {
return Address.parseAddress(arg);
}
});
connFactory.setAutomaticRecoveryEnabled(true);
connFactory.setRequestedHeartbeat(CloudBusGlobalProperty.RABBITMQ_HEART_BEAT_TIMEOUT);
connFactory.setNetworkRecoveryInterval((int) TimeUnit.SECONDS.toMillis(CloudBusGlobalProperty.RABBITMQ_NETWORK_RECOVER_INTERVAL));
connFactory.setConnectionTimeout((int) TimeUnit.SECONDS.toMillis(CloudBusGlobalProperty.RABBITMQ_CONNECTION_TIMEOUT));
logger.info(String.format("use RabbitMQ server IPs: %s", serverIps));
try {
if (CloudBusGlobalProperty.RABBITMQ_USERNAME != null) {
connFactory.setUsername(CloudBusGlobalProperty.RABBITMQ_USERNAME);
logger.info(String.format("use RabbitMQ username: %s", CloudBusGlobalProperty.RABBITMQ_USERNAME));
}
if (CloudBusGlobalProperty.RABBITMQ_PASSWORD != null) {
connFactory.setPassword(CloudBusGlobalProperty.RABBITMQ_PASSWORD);
logger.info("use RabbitMQ password: ******");
}
if (CloudBusGlobalProperty.RABBITMQ_VIRTUAL_HOST != null) {
connFactory.setVirtualHost(CloudBusGlobalProperty.RABBITMQ_VIRTUAL_HOST);
logger.info(String.format("use RabbitMQ virtual host: %s", CloudBusGlobalProperty.RABBITMQ_VIRTUAL_HOST));
}
conn = connFactory.newConnection(addresses.toArray(new Address[]{}));
logger.debug(String.format("rabbitmq connection is established on %s", conn.getAddress()));
((Recoverable)conn).addRecoveryListener(new RecoveryListener() {
@Override
public void handleRecovery(Recoverable recoverable) {
logger.info(String.format("rabbitmq connection is recovering on %s", conn.getAddress().toString()));
}
});
channelPool = new ChannelPool(CloudBusGlobalProperty.CHANNEL_POOL_SIZE, conn);
createExchanges();
outboundQueue = new BusQueue(makeMessageQueueName(SERVICE_ID), BusExchange.P2P);
Channel chan = channelPool.acquire();
chan.queueDeclare(outboundQueue.getName(), false, false, true, queueArguments());
chan.basicConsume(outboundQueue.getName(), true, consumer);
chan.queueBind(outboundQueue.getName(), outboundQueue.getBusExchange().toString(), outboundQueue.getBindingKey());
channelPool.returnChannel(chan);
maid.construct();
noRouteEndPoint.construct();
tracker.construct();
tracker.trackService(SERVICE_ID);
} catch (Exception e) {
throw new CloudRuntimeException(e);
}
}
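In short, this function reads the RabbitMQ-related settings from the configuration, initializes the Connection, and creates the channel pool on top of it. It then declares a message queue on a channel, starts consuming from it, and binds it to the exchange. It also constructs EventMaid, noRouteEndPoint, and tracker, all of which consume messages. As the names suggest, EventMaid serves the publish/subscribe model (every queue bound to that exchange receives the message), and tracker is used for tracking.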
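The acquire and returnChannel pairing seen in init can be illustrated with a generic fixed-size pool. This is only a sketch under assumptions: `SimplePoolSketch` is a hypothetical class built on a blocking queue, and ZStack's ChannelPool may be implemented quite differently.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Supplier;

// Hypothetical fixed-size pool; ZStack's ChannelPool may be implemented differently.
final class SimplePoolSketch<T> {
    private final BlockingQueue<T> idle;

    SimplePoolSketch(int size, Supplier<T> factory) {
        idle = new ArrayBlockingQueue<>(size);
        for (int i = 0; i < size; i++) {
            idle.add(factory.get());
        }
    }

    // Block until a resource is free, then hand it to the caller.
    T acquire() {
        try {
            return idle.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for a pooled resource", e);
        }
    }

    // Give the resource back so that another caller can reuse it.
    void returnResource(T resource) {
        idle.add(resource);
    }

    int idleCount() {
        return idle.size();
    }

    public static void main(String[] args) {
        int[] created = {0};
        SimplePoolSketch<String> pool = new SimplePoolSketch<>(2, () -> "channel-" + created[0]++);
        String chan = pool.acquire();
        try {
            System.out.println("publishing on " + chan);
        } finally {
            pool.returnResource(chan); // always return, even if publishing fails
        }
        System.out.println(pool.idleCount());
    }
}
```

Returning the resource in a finally block keeps the pool from draining when a publish throws.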
start
start is a hook defined by ZStack; it is called when the management node comes up.
@Override
public boolean start() {
populateExtension();
prepareStatistics();
for (Service serv : services) {
assert serv.getId() != null : String.format("service id can not be null[%s]", serv.getClass().getName());
registerService(serv);
}
jmxf.registerBean("CloudBus", this);
return true;
}
Let's go through them one by one:
private void populateExtension() {
services = pluginRgty.getExtensionList(Service.class);
for (ReplyMessagePreSendingExtensionPoint extp : pluginRgty.getExtensionList(ReplyMessagePreSendingExtensionPoint.class)) {
List<Class> clazzs = extp.getReplyMessageClassForPreSendingExtensionPoint();
if (clazzs == null || clazzs.isEmpty()) {
continue;
}
for (Class clz : clazzs) {
if (!(APIEvent.class.isAssignableFrom(clz)) && !(MessageReply.class.isAssignableFrom(clz))) {
throw new CloudRuntimeException(String.format("ReplyMessagePreSendingExtensionPoint can only marshal APIEvent or MessageReply. %s claimed by %s is neither APIEvent nor MessageReply",
clz.getName(), extp.getClass().getName()));
}
List<ReplyMessagePreSendingExtensionPoint> exts = replyMessageMarshaller.get(clz);
if (exts == null) {
exts = new ArrayList<ReplyMessagePreSendingExtensionPoint>();
replyMessageMarshaller.put(clz, exts);
}
exts.add(extp);
}
}
}
It first collects all Service implementations, then loads the extension points that can modify message replies.
private void prepareStatistics() {
List<Class> needReplyMsgs = BeanUtils.scanClassByType("org.zstack", NeedReplyMessage.class);
needReplyMsgs = CollectionUtils.transformToList(needReplyMsgs, new Function<Class, Class>() {
@Override
public Class call(Class arg) {
return !APIMessage.class.isAssignableFrom(arg) || APISyncCallMessage.class.isAssignableFrom(arg) ? arg : null;
}
});
for (Class clz : needReplyMsgs) {
MessageStatistic stat = new MessageStatistic();
stat.setMessageClassName(clz.getName());
statistics.put(stat.getMessageClassName(), stat);
}
}
This sets up statistics for messages that need a reply.
After that, all the collected Services are registered so that messages can be dispatched to them.
Common methods
CloudBus.makeLocalServiceId
@Override
public String makeLocalServiceId(String serviceId) {
return serviceId + "." + Platform.getManagementServerId();
}
@Override
public void makeLocalServiceId(Message msg, String serviceId) {
msg.setServiceId(makeLocalServiceId(serviceId));
}
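As described in "ZStack's Scalability Secret Weapon: Stateless Services", every management node registers a set of service queues. The service ID therefore has to be assembled in that format so that the service can receive the message.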
CloudBus.makeTargetServiceIdByResourceUuid
@Override
public String makeTargetServiceIdByResourceUuid(String serviceId, String resourceUuid) {
DebugUtils.Assert(serviceId!=null, "serviceId cannot be null");
DebugUtils.Assert(resourceUuid!=null, "resourceUuid cannot be null");
// get the UUID of the MN that owns the resource
String mgmtUuid = destMaker.makeDestination(resourceUuid);
return serviceId + "." + mgmtUuid;
}
@Override
public void makeTargetServiceIdByResourceUuid(Message msg, String serviceId, String resourceUuid) {
String targetService = makeTargetServiceIdByResourceUuid(serviceId, resourceUuid);
msg.setServiceId(targetService);
}
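In ZStack, management nodes are very likely deployed as a cluster, with each MN managing different resources. A consistent hash ring is therefore needed to determine which MN owns a given resource.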
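A consistent hash ring of the kind mentioned here can be sketched as follows. This is a generic textbook version under stated assumptions: `HashRingSketch`, the MD5-based hash, and the number of virtual nodes are all hypothetical choices, and the real logic behind `destMaker.makeDestination` may differ in detail.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical consistent hash ring; not ZStack's ResourceDestinationMaker.
final class HashRingSketch {
    private static final int VIRTUAL_NODES = 100; // assumed number of points per node
    private final TreeMap<Long, String> ring = new TreeMap<>();

    void addNode(String nodeUuid) {
        for (int i = 0; i < VIRTUAL_NODES; i++) {
            ring.put(hash(nodeUuid + "#" + i), nodeUuid);
        }
    }

    void removeNode(String nodeUuid) {
        for (int i = 0; i < VIRTUAL_NODES; i++) {
            ring.remove(hash(nodeUuid + "#" + i));
        }
    }

    // Walk clockwise from the resource's hash to the first node point on the ring.
    String nodeFor(String resourceUuid) {
        if (ring.isEmpty()) {
            throw new IllegalStateException("no management node on the ring");
        }
        Map.Entry<Long, String> e = ring.ceilingEntry(hash(resourceUuid));
        return e != null ? e.getValue() : ring.firstEntry().getValue();
    }

    // Build a target service id in the "serviceId.managementNodeUuid" format.
    String makeTargetServiceId(String serviceId, String resourceUuid) {
        return serviceId + "." + nodeFor(resourceUuid);
    }

    // Use the first 8 bytes of the MD5 digest as a 64-bit position on the ring.
    private static long hash(String key) {
        try {
            byte[] d = MessageDigest.getInstance("MD5").digest(key.getBytes(StandardCharsets.UTF_8));
            long h = 0;
            for (int i = 0; i < 8; i++) {
                h = (h << 8) | (d[i] & 0xff);
            }
            return h;
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        HashRingSketch ring = new HashRingSketch();
        ring.addNode("mn-1");
        ring.addNode("mn-2");
        System.out.println(ring.makeTargetServiceId("host", "some-resource-uuid"));
    }
}
```

The useful property is that removing one node only moves the resources that node owned; resources owned by the remaining nodes keep their owner.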
CloudBus.send
@Override
public void send(final NeedReplyMessage msg, final CloudBusCallBack callback) {
//assign a timeout to the msg
evaluateMessageTimeout(msg);
//create an anonymous subclass of Envelope
Envelope e = new Envelope() {
//guards the callback so that it runs only once, whether triggered by a reply or by the timeout
AtomicBoolean called = new AtomicBoolean(false);
final Envelope self = this;
//submit a task to the thread pool that fires when the msg times out
TimeoutTaskReceipt timeoutTaskReceipt = thdf.submitTimeoutTask(new Runnable() {
@Override
public void run() {
self.timeout();
}
}, TimeUnit.MILLISECONDS, msg.getTimeout());
@Override
//called when the reply to the msg arrives
public void ack(MessageReply reply) {
//record how long the msg took
count(msg);
//remove the record from the map by the msg's unique UUID
envelopes.remove(msg.getId());
//if the compare-and-set fails, the callback has already been invoked (for example by the timeout), so return
if (!called.compareAndSet(false, true)) {
return;
}
//cancel the timeout task
timeoutTaskReceipt.cancel();
//invoke the registered callback
callback.run(reply);
}
//logic that runs when the message times out
@Override
public void timeout() {
// remove the record from the map by the msg's unique UUID
envelopes.remove(msg.getId());
// if the callback has already been invoked, return
if (!called.compareAndSet(false, true)) {
return;
}
//build a timeout reply internally and pass it to the callback
callback.run(createTimeoutReply(msg));
}
//used by getWaitingReplyMessageStatistic
@Override
List<Message> getRequests() {
List<Message> requests = new ArrayList<Message>();
requests.add(msg);
return requests;
}
};
//put the msg's unique UUID and the envelope just built into the envelopes map
envelopes.put(msg.getId(), e);
//send the message
send(msg, false);
}
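The core of the Envelope is a race between the reply and the timeout, settled by a single compareAndSet so that the callback runs exactly once. A stripped-down sketch of that pattern is shown below. `OneShotEnvelopeSketch` is hypothetical: it uses a plain ScheduledExecutorService in place of ZStack's thread facade and a string in place of MessageReply.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Hypothetical sketch of the "reply or timeout, whichever comes first" pattern.
final class OneShotEnvelopeSketch {
    private final AtomicBoolean called = new AtomicBoolean(false);
    private final Consumer<String> callback;
    private final ScheduledFuture<?> timeoutTask;

    OneShotEnvelopeSketch(ScheduledExecutorService timer, long timeoutMillis, Consumer<String> callback) {
        this.callback = callback;
        // Schedule the timeout up front, before the request would be sent.
        this.timeoutTask = timer.schedule(this::timeout, timeoutMillis, TimeUnit.MILLISECONDS);
    }

    // A reply arrived: cancel the timer and deliver it, unless the timeout already won.
    void ack(String reply) {
        if (!called.compareAndSet(false, true)) {
            return;
        }
        timeoutTask.cancel(false);
        callback.accept(reply);
    }

    // The timer fired: deliver a synthetic timeout reply, unless a real reply already won.
    void timeout() {
        if (!called.compareAndSet(false, true)) {
            return;
        }
        callback.accept("TIMEOUT");
    }

    public static void main(String[] args) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        OneShotEnvelopeSketch envelope =
                new OneShotEnvelopeSketch(timer, 5_000, r -> System.out.println("got " + r));
        envelope.ack("Hello"); // delivered
        envelope.ack("again"); // ignored, the callback has already run
        timer.shutdownNow();
    }
}
```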
The private method send
private void send(Message msg, Boolean noNeedReply) {
//the msg's service ID must not be null, otherwise it cannot be sent
if (msg.getServiceId() == null) {
throw new IllegalArgumentException(String.format("service id cannot be null: %s", msg.getClass().getName()));
}
//build the basic properties of the msg
basicProperty(msg);
//set the msg header entries
msg.putHeaderEntry(CORRELATION_ID, msg.getId());
//set the reply queue of the message
msg.putHeaderEntry(REPLY_TO, outboundQueue.getBindingKey());
if (msg instanceof APIMessage) {
// API always need reply
msg.putHeaderEntry(NO_NEED_REPLY_MSG, Boolean.FALSE.toString());
} else if (msg instanceof NeedReplyMessage) {
// for NeedReplyMessage sent without requiring receiver to reply,
// mark it, then it will not be tracked and replied
msg.putHeaderEntry(NO_NEED_REPLY_MSG, noNeedReply.toString());
}
buildRequestMessageMetaData(msg);
wire.send(msg);
}
This function is shared logic: every message passes through it before RabbitMQ sends it out, so it deserves a few more words.
protected void basicProperty(Message msg) {
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
msg.setAMQPProperties(builder.deliveryMode(1).expiration(String.valueOf(TimeUnit.SECONDS.toMillis(CloudBusGlobalProperty.MESSAGE_TTL))).build());
}
This function sets the basic properties of the msg: the delivery mode (1, non-persistent) and the expiration.
Next, look at the buildRequestMessageMetaData method:
private void buildRequestMessageMetaData(Message msg) {
if (msg instanceof APIMessage || (msg instanceof NeedReplyMessage && !Boolean.valueOf((String)msg.getHeaderEntry(NO_NEED_REPLY_MSG)))) {
RequestMessageMetaData metaData;
if (msg instanceof LockResourceMessage) {
LockResourceMessage lmsg = (LockResourceMessage) msg;
LockMessageMetaData lmetaData = new LockMessageMetaData();
lmetaData.unlockKey = lmsg.getUnlockKey();
lmetaData.reason = lmsg.getReason();
lmetaData.senderManagementUuid = Platform.getManagementServerId();
metaData = lmetaData;
} else {
metaData = new RequestMessageMetaData();
}
metaData.needApiEvent = msg instanceof APIMessage && !(msg instanceof APISyncCallMessage);
metaData.msgId = msg.getId();
metaData.replyTo = msg.getHeaderEntry(REPLY_TO);
metaData.timeout = msg instanceof NeedReplyMessage ? ((NeedReplyMessage) msg).getTimeout() : null;
metaData.serviceId = msg.getServiceId();
metaData.messageName = msg.getClass().getName();
metaData.className = metaData.getClass().getName();
msg.getAMQPHeaders().put(MESSAGE_META_DATA, JSONObjectUtil.toJsonString(metaData));
}
}
buildRequestMessageMetaData extracts the metadata the message needs from the msg and puts it into the msg's actual AMQP headers.
Then comes wire.send:
public void send(Message msg) {
// for unit test finding invocation chain
MessageCommandRecorder.record(msg.getClass());
List<BeforeSendMessageInterceptor> interceptors = beforeSendMessageInterceptors.get(msg.getClass());
if (interceptors != null) {
for (BeforeSendMessageInterceptor interceptor : interceptors) {
interceptor.intercept(msg);
/*
if (logger.isTraceEnabled()) {
logger.trace(String.format("called %s for message[%s]", interceptor.getClass(), msg.getClass()));
}
*/
}
}
for (BeforeSendMessageInterceptor interceptor : beforeSendMessageInterceptorsForAll) {
interceptor.intercept(msg);
/*
if (logger.isTraceEnabled()) {
logger.trace(String.format("called %s for message[%s]", interceptor.getClass(), msg.getClass()));
}
*/
}
send(msg, true);
}
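The logic is straightforward:
- Record the message class in the invocation chain (used by unit tests)
- Run the before-send interceptors registered specifically for this message class
- Run the before-send interceptors that apply to all messages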
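The two interceptor stages in the list above can be sketched as follows. This is a hypothetical illustration: `InterceptorChainSketch`, `Msg`, and `HelloMsg` are made-up names, and the rule that an interceptor installed without any class applies to every message is an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical sketch of class-specific and global "before send" interceptors.
final class InterceptorChainSketch {
    static class Msg {
        final List<String> trace = new ArrayList<>();
    }

    static final class HelloMsg extends Msg {
    }

    private final Map<Class<?>, List<Consumer<Msg>>> byClass = new ConcurrentHashMap<>();
    private final List<Consumer<Msg>> forAll = new CopyOnWriteArrayList<>();

    // Assumption of this sketch: with no classes given, the interceptor applies to every message.
    void install(Consumer<Msg> interceptor, Class<?>... classes) {
        if (classes.length == 0) {
            forAll.add(interceptor);
            return;
        }
        for (Class<?> clz : classes) {
            byClass.computeIfAbsent(clz, k -> new CopyOnWriteArrayList<>()).add(interceptor);
        }
    }

    // Run class-specific interceptors first, then the global ones, then "send".
    void send(Msg msg) {
        List<Consumer<Msg>> specific = byClass.get(msg.getClass());
        if (specific != null) {
            for (Consumer<Msg> interceptor : specific) {
                interceptor.accept(msg);
            }
        }
        for (Consumer<Msg> interceptor : forAll) {
            interceptor.accept(msg);
        }
        msg.trace.add("sent"); // stands in for the real publish
    }

    public static void main(String[] args) {
        InterceptorChainSketch chain = new InterceptorChainSketch();
        chain.install(m -> m.trace.add("global"));
        chain.install(m -> m.trace.add("specific"), HelloMsg.class);
        HelloMsg hello = new HelloMsg();
        chain.send(hello);
        System.out.println(hello.trace);
    }
}
```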
Then comes send(msg, true):
public void send(final Message msg, boolean makeQueueName) {
/*
StopWatch watch = new StopWatch();
watch.start();
*/
String serviceId = msg.getServiceId();
if (makeQueueName) {
//get the real queue name
serviceId = makeMessageQueueName(serviceId);
}
// build json schema
buildSchema(msg);
//copy the required information from the current thread context; this is how the UUID carried by each API call is propagated
evalThreadContextToMessage(msg);
if (logger.isTraceEnabled() && logMessage(msg)) {
logger.trace(String.format("[msg send]: %s", wire.dumpMessage(msg)));
}
//acquire a channel from the channel pool
Channel chan = channelPool.acquire();
try {
//explained separately below
new RecoverableSend(chan, msg, serviceId, outboundQueue.getBusExchange()).send();
/*
watch.stop();
logger.debug(String.format("sending %s cost %sms", msg.getClass().getName(), watch.getTime()));
*/
} catch (IOException e) {
throw new CloudRuntimeException(e);
} finally {
//return the channel to the channel pool
channelPool.returnChannel(chan);
}
}
Now look at new RecoverableSend(chan, msg, serviceId, outboundQueue.getBusExchange()).send() on its own:
private class RecoverableSend {
Channel chan;
byte[] data;
String serviceId;
Message msg;
BusExchange exchange;
RecoverableSend(Channel chan, Message msg, String serviceId, BusExchange exchange) throws IOException {
data = compressMessageIfNeeded(msg);
this.chan = chan;
this.serviceId = serviceId;
this.msg = msg;
this.exchange = exchange;
}
void send() throws IOException {
try {
chan.basicPublish(exchange.toString(), serviceId,
true, msg.getAMQPProperties(), data);
} catch (ShutdownSignalException e) {
if (!(conn instanceof AutorecoveringConnection) || serverIps.size() <= 1 || !Platform.IS_RUNNING) {
// the connection is not recoverable
throw e;
}
logger.warn(String.format("failed to send a message because %s; as the connection is recoverable," +
"we are doing recoverable send right now", e.getMessage()));
if (!recoverSend()) {
throw e;
}
}
}
private byte[] compressMessageIfNeeded(Message msg) throws IOException {
if (!CloudBusGlobalProperty.COMPRESS_NON_API_MESSAGE || msg instanceof APIEvent || msg instanceof APIMessage) {
return gson.toJson(msg, Message.class).getBytes();
}
msg.getAMQPHeaders().put(AMQP_PROPERTY_HEADER__COMPRESSED, "true");
return Compresser.deflate(gson.toJson(msg, Message.class).getBytes());
}
private boolean recoverSend() throws IOException {
int interval = conn.getHeartbeat() / 2;
interval = interval > 0 ? interval : 1;
int count = 0;
// as the connection is lost, there is no need to wait heart beat missing 8 times
// so we use reflection to fast the process
RecoveryAwareAMQConnection delegate = FieldUtils.getFieldValue("delegate", conn);
DebugUtils.Assert(delegate != null, "cannot get RecoveryAwareAMQConnection");
Field _missedHeartbeats = FieldUtils.getField("_missedHeartbeats", RecoveryAwareAMQConnection.class);
DebugUtils.Assert(_missedHeartbeats!=null, "cannot find _missedHeartbeats");
_missedHeartbeats.setAccessible(true);
try {
_missedHeartbeats.set(delegate, 100);
} catch (IllegalAccessException e) {
throw new CloudRuntimeException(e);
}
while (count < CloudBusGlobalProperty.RABBITMQ_RECOVERABLE_SEND_TIMES) {
try {
TimeUnit.SECONDS.sleep(interval);
} catch (InterruptedException e1) {
logger.warn(e1.getMessage());
}
try {
chan.basicPublish(exchange.toString(), serviceId,
true, msg.getAMQPProperties(), data);
return true;
} catch (ShutdownSignalException e) {
logger.warn(String.format("recoverable send fails %s times, will continue to retry %s times; %s",
count, CloudBusGlobalProperty.RABBITMQ_RECOVERABLE_SEND_TIMES-count, e.getMessage()));
count ++;
}
}
return false;
}
}
The core of this code is:
chan.basicPublish(exchange.toString(), serviceId,
true, msg.getAMQPProperties(), data);
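This publishes the message to RabbitMQ using the exchange, the routing key, the message's basic properties, and the already serialized message body.
Let's look at the method signature: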
/**
* Publish a message
* @see com.rabbitmq.client.AMQP.Basic.Publish
* @param exchange the exchange to publish the message to
* @param routingKey the routing key
* @param mandatory true if the 'mandatory' flag is to be set
* @param props other properties for the message - routing headers etc
* @param body the message body
* @throws java.io.IOException if an error is encountered
*/
void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body)
throws IOException;
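When the mandatory flag is set to true and the exchange cannot find a queue that matches its type and the message's routing key, the broker hands the message back to the producer via basic.return. When mandatory is false, the broker simply drops the message in that situation.
There is also an overload that takes an immediate flag: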
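The routing rule above can be sketched with a toy in-memory exchange. This is only a model for illustration, not the RabbitMQ client: the class `MandatoryFlagSketch` and all of its methods are hypothetical, and the `returned` list merely stands in for basic.return (with the real client, returned messages arrive through a ReturnListener registered on the channel).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of a direct exchange; hypothetical, not the RabbitMQ client API.
public class MandatoryFlagSketch {
    // routing key -> messages queued under that key (one queue per key, for brevity)
    private final Map<String, List<String>> queues = new HashMap<>();
    // messages handed back to the producer; stands in for basic.return
    private final List<String> returned = new ArrayList<>();

    public void bind(String routingKey) {
        queues.putIfAbsent(routingKey, new ArrayList<>());
    }

    // Returns true if the message was routed to a queue.
    public boolean publish(String routingKey, boolean mandatory, String body) {
        List<String> queue = queues.get(routingKey);
        if (queue != null) {
            queue.add(body);
            return true;
        }
        if (mandatory) {
            returned.add(body); // unroutable and mandatory: give it back to the producer
        }
        // unroutable and not mandatory: the message is silently dropped
        return false;
    }

    public List<String> returnedMessages() {
        return returned;
    }

    public static void main(String[] args) {
        MandatoryFlagSketch exchange = new MandatoryFlagSketch();
        exchange.bind("service.a");
        exchange.publish("service.a", true, "routed");
        exchange.publish("service.b", true, "returned to producer");
        exchange.publish("service.b", false, "dropped");
        System.out.println(exchange.returnedMessages());
    }
}
```

Since RecoverableSend always passes true for mandatory, an unroutable message is returned to the sender rather than lost silently.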
/**
* Publish a message
* @see com.rabbitmq.client.AMQP.Basic.Publish
* @param exchange the exchange to publish the message to
* @param routingKey the routing key
* @param mandatory true if the 'mandatory' flag is to be set
* @param immediate true if the 'immediate' flag is to be
* set. Note that the RabbitMQ server does not support this flag.
* @param props other properties for the message - routing headers etc
* @param body the message body
* @throws java.io.IOException if an error is encountered
*/
void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body)
throws IOException;
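When the immediate flag is set to true and the exchange, while routing the message to its queue(s), finds that a queue has no consumers, the message is not placed in that queue. If none of the queues associated with the routing key (one or more) has a consumer, the message is returned to the producer via basic.return. As the Javadoc above notes, the RabbitMQ server does not support this flag.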
CloudBus.reply
@Override
public void reply(Message request, MessageReply reply) {
if (Boolean.valueOf((String) request.getHeaderEntry(NO_NEED_REPLY_MSG))) {
if (logger.isTraceEnabled()) {
logger.trace(String.format("%s in message%s is set, drop reply%s", NO_NEED_REPLY_MSG,
wire.dumpMessage(request), wire.dumpMessage(reply)));
}
return;
}
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
reply.setAMQPProperties(builder.deliveryMode(1).build());
reply.getHeaders().put(IS_MESSAGE_REPLY, Boolean.TRUE.toString());
reply.putHeaderEntry(CORRELATION_ID, request.getId());
reply.setServiceId((String) request.getHeaderEntry(REPLY_TO));
buildResponseMessageMetaData(reply);
if (request instanceof NeedReplyMessage) {
callReplyPreSendingExtensions(reply, (NeedReplyMessage) request);
}
wire.send(reply, false);
}
The other properties have all been covered earlier. reply.setServiceId((String) request.getHeaderEntry(REPLY_TO)); sends every reply through the outboundQueue queue, and the correlationId is used to return it to the original sender. callReplyPreSendingExtensions then modifies the reply as needed. After that comes wire.send, which was analyzed earlier.
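The correlationId matching described above can be sketched as follows. This is a minimal illustration of the general request/reply pattern and not ZStack's implementation; `CorrelationSketch`, `expectReply`, and `onReply` are hypothetical names.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: match replies to pending requests by correlation id.
public class CorrelationSketch {
    // correlation id -> future that the original sender is waiting on
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Sender side: remember the request id before the request goes out.
    public CompletableFuture<String> expectReply(String requestId) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(requestId, future);
        return future;
    }

    // Reply-queue side: hand the reply to whoever sent the matching request.
    // Returns false when nobody is waiting for this correlation id.
    public boolean onReply(String correlationId, String payload) {
        CompletableFuture<String> future = pending.remove(correlationId);
        if (future == null) {
            return false;
        }
        future.complete(payload);
        return true;
    }

    public static void main(String[] args) {
        CorrelationSketch bus = new CorrelationSketch();
        CompletableFuture<String> reply = bus.expectReply("request-1");
        bus.onReply("request-1", "done");
        System.out.println(reply.getNow("no reply yet"));
    }
}
```

Removing the entry on delivery means a duplicate reply with the same id is ignored rather than delivered twice.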
CloudBus.publish
@Override
public void publish(Event event) {
if (event instanceof APIEvent) {
APIEvent aevt = (APIEvent) event;
DebugUtils.Assert(aevt.getApiId() != null, String.format("apiId of %s cannot be null", aevt.getClass().getName()));
}
// same as msgProperty earlier
eventProperty(event);
// build the metadata
buildResponseMessageMetaData(event);
// analyzed earlier
callReplyPreSendingExtensions(event, null);
// invoke the beforeEventPublishInterceptors; this variable records the current interceptor so it can be tracked when an exception is thrown
BeforePublishEventInterceptor c = null;
try {
List<BeforePublishEventInterceptor> is = beforeEventPublishInterceptors.get(event.getClass());
if (is != null) {
for (BeforePublishEventInterceptor i : is) {
c = i;
i.beforePublishEvent(event);
}
}
for (BeforePublishEventInterceptor i : beforeEventPublishInterceptorsForAll) {
c = i;
i.beforePublishEvent(event);
}
} catch (StopRoutingException e) {
if (logger.isTraceEnabled()) {
logger.trace(String.format("BeforePublishEventInterceptor[%s] stop publishing event: %s",
c == null ? "null" : c.getClass().getName(), JSONObjectUtil.toJsonString(event)));
}
return;
}
wire.publish(event);
}
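The interceptor loop in publish can be reduced to the following sketch. It only illustrates the pattern (remember the current interceptor, stop publishing when one throws); `InterceptorChainSketch`, `StopPublishingException`, and the `Interceptor` interface are hypothetical stand-ins for the ZStack types, and events are plain strings here.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of an interceptor chain that can veto publishing.
public class InterceptorChainSketch {
    // stand-in for StopRoutingException
    public static class StopPublishingException extends RuntimeException {
    }

    public interface Interceptor {
        void beforePublish(String event);
    }

    private final List<Interceptor> interceptors = new ArrayList<>();
    private final List<String> published = new ArrayList<>();
    private Interceptor lastStopper; // which interceptor vetoed most recently

    public void addInterceptor(Interceptor interceptor) {
        interceptors.add(interceptor);
    }

    // Returns true if the event passed every interceptor and was published.
    public boolean publish(String event) {
        Interceptor current = null; // tracked so the veto can be attributed
        try {
            for (Interceptor i : interceptors) {
                current = i;
                i.beforePublish(event);
            }
        } catch (StopPublishingException e) {
            lastStopper = current;
            return false;
        }
        published.add(event);
        return true;
    }

    public List<String> published() {
        return published;
    }

    public Interceptor lastStopper() {
        return lastStopper;
    }

    public static void main(String[] args) {
        InterceptorChainSketch bus = new InterceptorChainSketch();
        bus.addInterceptor(e -> {
            if (e.startsWith("internal")) {
                throw new StopPublishingException();
            }
        });
        bus.publish("vm.started");
        bus.publish("internal.tick");
        System.out.println(bus.published());
    }
}
```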
Next, look at the wire.publish method:
public void publish(Event evt) {
/*
StopWatch watch = new StopWatch();
watch.start();
*/
buildSchema(evt);
evalThreadContextToMessage(evt);
if (logger.isTraceEnabled() && logMessage(evt)) {
logger.trace(String.format("[event publish]: %s", wire.dumpMessage(evt)));
}
Channel chan = channelPool.acquire();
try {
new RecoverableSend(chan, evt, evt.getType().toString(), BusExchange.BROADCAST).send();
/*
watch.stop();
logger.debug(String.format("sending %s cost %sms", evt.getClass().getName(), watch.getTime()));
*/
} catch (IOException e) {
throw new CloudRuntimeException(e);
} finally {
channelPool.returnChannel(chan);
}
}
Most of this is no different from send. The Event class, however, defines two kinds of Type:
package org.zstack.header.message;
import org.zstack.header.rest.APINoSee;
public abstract class Event extends Message {
/**
* @ignore
*/
@APINoSee
private String avoidKey;
public String getAvoidKey() {
return avoidKey;
}
public void setAvoidKey(String avoidKey) {
this.avoidKey = avoidKey;
}
public abstract Type getType();
public abstract String getSubCategory();
public static final String BINDING_KEY_PERFIX = "key.event.";
public static enum Category {
LOCAL,
API,
}
public static class Type {
private final String _name;
public Type(Category ctg, String subCtg) {
_name = BINDING_KEY_PERFIX + ctg.toString() + "." + subCtg;
}
@Override
public String toString() {
return _name;
}
@Override
public int hashCode() {
return _name.hashCode();
}
@Override
public boolean equals(Object t) {
if (!(t instanceof Type)) {
return false;
}
Type type = (Type) t;
return _name.equals(type.toString());
}
}
}
These are LOCAL and API. As the names suggest, API is used to reply to an APIMsg and LOCAL is used to publish local messages. To understand the details, though, we need to look at EventMaid.
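To make the Type string concrete, here is a small sketch that rebuilds the key the same way the Type constructor above does (prefix, category, dot, sub-category). The class `EventTypeKeySketch` and the sub-category name "vm" are assumptions made for illustration only.

```java
// Hypothetical sketch: how an Event.Type name is assembled.
public class EventTypeKeySketch {
    public enum Category {
        LOCAL,
        API,
    }

    // same value as BINDING_KEY_PERFIX in the Event class shown above
    public static final String PREFIX = "key.event.";

    // Mirrors the Type constructor: prefix + category + "." + sub-category.
    public static String bindingKey(Category category, String subCategory) {
        return PREFIX + category.toString() + "." + subCategory;
    }

    public static void main(String[] args) {
        // "vm" is a made-up sub-category used only for this example
        System.out.println(bindingKey(Category.API, "vm"));   // key.event.API.vm
        System.out.println(bindingKey(Category.LOCAL, "vm")); // key.event.LOCAL.vm
    }
}
```

This string is what wire.publish passes as the routing key (evt.getType().toString()), and it is also the key EventMaid uses for queue binding and for its listeners map.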
EventMaid
private class EventMaid extends AbstractConsumer {
Map<String, List<EventListenerWrapper>> listeners = new ConcurrentHashMap<String, List<EventListenerWrapper>>();
Channel eventChan;
String queueName = makeEventQueueName(String.format("eventMaid.%s", Platform.getUuid()));
public void construct() {
try {
eventChan = conn.createChannel();
eventChan.queueDeclare(queueName, false, false, true, queueArguments());
eventChan.basicConsume(queueName, true, this);
} catch (IOException e) {
throw new CloudRuntimeException(e);
}
}
public void destruct() {
try {
eventChan.close();
} catch (IOException e) {
throw new CloudRuntimeException(e);
}
}
public void listen(Event evt, EventListenerWrapper l) {
String type = evt.getType().toString();
try {
synchronized (listeners) {
List<EventListenerWrapper> lst = listeners.get(type);
if (lst == null) {
lst = new CopyOnWriteArrayList<EventListenerWrapper>();
listeners.put(type, lst);
eventChan.queueBind(queueName, BusExchange.BROADCAST.toString(), type);
logger.debug(String.format("[listening event]: %s", type));
}
if (!lst.contains(l)) {
lst.add(l);
}
}
} catch (IOException e) {
throw new CloudRuntimeException(e);
}
}
public void unlisten(Event evt, EventListenerWrapper l) {
String type = evt.getType().toString();
try {
synchronized (listeners) {
List<EventListenerWrapper> lst = listeners.get(type);
if (lst == null) {
return;
}
lst.remove(l);
if (lst.isEmpty()) {
listeners.remove(type);
eventChan.queueUnbind(queueName, BusExchange.BROADCAST.toString(), type);
logger.debug(String.format("[unlistening event]: %s", type));
}
}
} catch (IOException e) {
throw new CloudRuntimeException(e);
}
}
@SyncThread(level = 10)
@MessageSafe
private void dispatch(Event evt, EventListenerWrapper l) {
setThreadLoggingContext(evt);
l.callEventListener(evt);
}
private void handle(Event evt) {
String type = evt.getType().toString();
List<EventListenerWrapper> lst = listeners.get(type);
if (lst == null) {
return;
}
if (logger.isTraceEnabled()) {
logger.trace(String.format("[event received]: %s", wire.dumpMessage(evt)));
}
for (EventListenerWrapper l : lst) {
dispatch(evt, l);
}
}
@Override
public void handleDelivery(String s, com.rabbitmq.client.Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bytes) throws IOException {
Event evt = null;
try {
evt = (Event) wire.toMessage(bytes, basicProperties);
handle(evt);
} catch (final Throwable t) {
final Event fevt = evt;
throwableSafe(new Runnable() {
@Override
public void run() {
if (fevt != null) {
logger.warn(String.format("unhandled throwable when handling event[%s], dump: %s", fevt.getClass().getName(), wire.dumpMessage(fevt)), t);
} else {
logger.warn(String.format("unhandled throwable"), t);
}
}
});
}
}
}
This code is best read starting from handleDelivery:
@Override
public void handleDelivery(String s, com.rabbitmq.client.Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bytes) throws IOException {
Event evt = null;
try {
evt = (Event) wire.toMessage(bytes, basicProperties);
handle(evt);
} catch (final Throwable t) {
final Event fevt = evt;
throwableSafe(new Runnable() {
@Override
public void run() {
if (fevt != null) {
logger.warn(String.format("unhandled throwable when handling event[%s], dump: %s", fevt.getClass().getName(), wire.dumpMessage(fevt)), t);
} else {
logger.warn(String.format("unhandled throwable"), t);
}
}
});
}
}
As you can see, this overrides handleDelivery from the Consumer interface. Let's look at its Javadoc:
/**
* Called when a <code><b>basic.deliver</b></code> is received for this consumer.
* @param consumerTag the <i>consumer tag</i> associated with the consumer
* @param envelope packaging data for the message
* @param properties content header data for the message
* @param body the message body (opaque, client-specific byte array)
* @throws IOException if the consumer encounters an I/O error while processing the message
* @see Envelope
*/
void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException;
This ensures that the EventMaid object can receive messages. In the try block, the Event is deserialized from the byte array and then passed on to the handle logic.
private void handle(Event evt) {
// as mentioned earlier, there are two categories: API and LOCAL
String type = evt.getType().toString();
// look up the listeners registered under this type string
List<EventListenerWrapper> lst = listeners.get(type);
if (lst == null) {
return;
}
if (logger.isTraceEnabled()) {
logger.trace(String.format("[event received]: %s", wire.dumpMessage(evt)));
}
for (EventListenerWrapper l : lst) {
// move on to the next step
dispatch(evt, l);
}
}
@SyncThread(level = 10)
@MessageSafe
private void dispatch(Event evt, EventListenerWrapper l) {
setThreadLoggingContext(evt);
// move on to the next step
l.callEventListener(evt);
}
@Override
public EventSubscriberReceipt subscribeEvent(final CloudBusEventListener listener, final Event... events) {
final EventListenerWrapper wrapper = new EventListenerWrapper() {
@Override
public void callEventListener(Event e) {
// run the listener's own handle logic; if it returns true, unlisten
if (listener.handleEvent(e)) {
maid.unlisten(e, this);
}
}
};
// register the wrapper once for each event
for (Event e : events) {
maid.listen(e, wrapper);
}
return new EventSubscriberReceipt() {
@Override
public void unsubscribe(Event e) {
maid.unlisten(e, wrapper);
}
@Override
public void unsubscribeAll() {
for (Event e : events) {
maid.unlisten(e, wrapper);
}
}
};
}
Now look at listen:
public void listen(Event evt, EventListenerWrapper l) {
String type = evt.getType().toString();
try {
synchronized (listeners) {
List<EventListenerWrapper> lst = listeners.get(type);
if (lst == null) {
lst = new CopyOnWriteArrayList<EventListenerWrapper>();
listeners.put(type, lst);
eventChan.queueBind(queueName, BusExchange.BROADCAST.toString(), type);
logger.debug(String.format("[listening event]: %s", type));
}
if (!lst.contains(l)) {
lst.add(l);
}
}
} catch (IOException e) {
throw new CloudRuntimeException(e);
}
}
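It first locks the listeners map and fetches the list for the given type. If no list exists yet, it creates a CopyOnWriteArrayList, stores it under that type, and binds the event queue to the BROADCAST exchange with the type as the binding key. A CopyOnWriteArrayList lets handle iterate over the list without taking the lock while listeners are being added or removed. Finally, if the list does not already contain the submitted EventListenerWrapper, the wrapper is added.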
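After all of this, some readers may be a little lost. Here is a short summary of the call logic around EventMaid:
- When each ZStack Component starts, it subscribes to events on CloudBus.
- When CloudBus receives an event to publish, it delivers the event to the subscribed objects that implement the CloudBusEventListener interface, and each of them decides whether to handle it.
This is how CloudBus and EventFacade work together.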
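The subscribe, dispatch, and auto-unlisten flow summarized above can be sketched without a broker. This is a simplified model and not the ZStack code: `ListenerRegistrySketch` is a hypothetical class, listeners are plain `Predicate<String>` objects that return true to unsubscribe, and dispatch runs inline instead of on a thread pool.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Predicate;

// Hypothetical, broker-free model of EventMaid's listen/unlisten/handle.
public class ListenerRegistrySketch {
    private final Map<String, List<Predicate<String>>> listeners = new ConcurrentHashMap<>();

    public void listen(String type, Predicate<String> listener) {
        synchronized (listeners) {
            List<Predicate<String>> lst = listeners.get(type);
            if (lst == null) {
                lst = new CopyOnWriteArrayList<>();
                listeners.put(type, lst);
                // the real code binds the queue to the broadcast exchange here
            }
            if (!lst.contains(listener)) {
                lst.add(listener);
            }
        }
    }

    public void unlisten(String type, Predicate<String> listener) {
        synchronized (listeners) {
            List<Predicate<String>> lst = listeners.get(type);
            if (lst == null) {
                return;
            }
            lst.remove(listener);
            if (lst.isEmpty()) {
                listeners.remove(type); // the real code unbinds the queue here
            }
        }
    }

    // Calls every listener registered for the type; returns how many were called.
    public int handle(String type, String event) {
        List<Predicate<String>> lst = listeners.get(type);
        if (lst == null) {
            return 0;
        }
        int called = 0;
        // iterating a CopyOnWriteArrayList works on a snapshot, so removing inside the loop is safe
        for (Predicate<String> listener : lst) {
            called++;
            if (listener.test(event)) {
                unlisten(type, listener); // true means "I am done, unsubscribe me"
            }
        }
        return called;
    }

    public static void main(String[] args) {
        ListenerRegistrySketch maid = new ListenerRegistrySketch();
        maid.listen("key.event.LOCAL.demo", e -> true); // one-shot listener
        System.out.println(maid.handle("key.event.LOCAL.demo", "first"));
        System.out.println(maid.handle("key.event.LOCAL.demo", "second"));
    }
}
```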
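Summary
In this article we walked through the source code of the ZStack components that provide message-driven features. The APIs of these two components are clearly easy to use, concise, and clear. Still, a few points in the implementation could be improved:
- handleEvent returns a boolean, and returning true cancels the listener; these semantics are not easy to understand.
- The listen method takes a lock on listeners; relying on the operations of the concurrent container, ConcurrentHashMap, instead would increase throughput.
- The values of listeners could just as well be a Set, so CopyOnWriteArrayList could be replaced by CopyOnWriteArraySet. In listen we can see that l is added only if lst does not already contain it, which shows that lst is not supposed to hold duplicates.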
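The three suggestions above might look like the sketch below. It is one possible shape and not a patch for ZStack: `SetRegistrySketch`, `Outcome`, and `Listener` are hypothetical names, and the queue bind/unbind calls that the real listen and unlisten perform are left out, so empty sets are simply kept in the map.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;

// Hypothetical sketch of the suggested improvements (no broker binding).
public class SetRegistrySketch {
    // an explicit result instead of a bare boolean
    public enum Outcome {
        KEEP_LISTENING,
        STOP_LISTENING,
    }

    public interface Listener {
        Outcome onEvent(String event);
    }

    private final Map<String, Set<Listener>> listeners = new ConcurrentHashMap<>();

    // Lock-free registration; returns false if the listener was already present.
    public boolean listen(String type, Listener listener) {
        return listeners.computeIfAbsent(type, k -> new CopyOnWriteArraySet<>()).add(listener);
    }

    // Calls every listener for the type; returns how many were called.
    public int dispatch(String type, String event) {
        Set<Listener> set = listeners.get(type);
        if (set == null) {
            return 0;
        }
        int called = 0;
        // the set iterates over a snapshot, so removing during the loop is safe
        for (Listener listener : set) {
            called++;
            if (listener.onEvent(event) == Outcome.STOP_LISTENING) {
                set.remove(listener);
            }
        }
        return called;
    }

    public static void main(String[] args) {
        SetRegistrySketch registry = new SetRegistrySketch();
        Listener oneShot = e -> Outcome.STOP_LISTENING;
        System.out.println(registry.listen("key.event.LOCAL.demo", oneShot));
        System.out.println(registry.listen("key.event.LOCAL.demo", oneShot));
        System.out.println(registry.dispatch("key.event.LOCAL.demo", "first"));
        System.out.println(registry.dispatch("key.event.LOCAL.demo", "second"));
    }
}
```

One caveat with this design: in the real code the first listen for a type also binds the queue and the last unlisten unbinds it, so dropping the lock would require making those broker calls safe against concurrent registration as well.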