首先昆码,在一個企業(yè)級的架構(gòu)應(yīng)用中气忠,究竟何時需引入消息隊列呢?本人認(rèn)為赋咽,最經(jīng)常的情況旧噪,無非這幾種:做業(yè)務(wù)解耦、事件消息廣播脓匿、消息流控處理淘钟。其中,對于業(yè)務(wù)解耦是作為消息隊列陪毡,要解決的一個首要問題米母。所謂業(yè)務(wù)解耦,就是說在一個業(yè)務(wù)流程處理上毡琉,只關(guān)注具體的流程铁瞒,盡到通知的責(zé)任即可,不必等待消息處理的結(jié)果桅滋。
總得來看慧耍,企業(yè)級系統(tǒng)模塊通信的方式通常情況下身辨,無非兩種。
同步方式:REST芍碧、RPC方式實現(xiàn)煌珊;異步方式:消息中間件(消息隊列)方式實現(xiàn)。
同步方式的優(yōu)點:可以基于http協(xié)議之上泌豆,無需中間件代理定庵,系統(tǒng)架構(gòu)相對而言比較簡單。缺點是:客戶端和服務(wù)端緊密耦合践美,并且要實時在線通信洗贰,否則會導(dǎo)致消息發(fā)送失敗。
異步方式的優(yōu)點:客戶端和服務(wù)端互相解耦陨倡,雙方可以不產(chǎn)生依賴敛滋。缺點是:由于引入了消息中間件,在編程的時候會增加難度系數(shù)兴革。此外绎晃,消息中間件的可靠性、容錯性杂曲、健壯性往往成為這類架構(gòu)的決定性因素庶艾。
舉一個本人工作中的例子向大家說明一下:移動業(yè)務(wù)中的產(chǎn)品訂購中心,每當(dāng)一個用戶通過某些渠道(營業(yè)廳擎勘、自助終端等等)開通咱揍、訂購了某個套餐之后,如果這些套餐涉及第三方平臺派單的話棚饵,產(chǎn)品訂購中心會向第三方平臺發(fā)起訂購請求操作煤裙。試想一下,如果遇到高峰受理時間段噪漾,由于業(yè)務(wù)受理量的激增硼砰,導(dǎo)致一些外圍系統(tǒng)的響應(yīng)速度降低(比如業(yè)務(wù)網(wǎng)關(guān)響應(yīng)速度不及時、網(wǎng)絡(luò)延時等等原因)欣硼,最終用戶開通一個套餐花在主流程的時間會延長很多题翰,這個會造成極不好的用戶體驗,最終可能導(dǎo)致受理失敗诈胜。在上述的場景里面豹障,我們就可以很好的引入一個消息隊列進(jìn)行業(yè)務(wù)的解耦,具體來說耘斩,產(chǎn)品訂購中心只要“通知”第三方平臺沼填,我們的套餐開通成功了,并不一定非要同步阻塞地等待其真正的開通處理完成括授。正因為如此,消息隊列逐漸成為當(dāng)下系統(tǒng)模塊通信的主要方式手段。
當(dāng)今在Java的消息隊列通信領(lǐng)域荚虚,有很多主流的消息中間件薛夜,比如RabbitMQ、ActiveMQ版述、以及炙手可熱Kafka梯澜。其中ActiveMQ是基于JMS的標(biāo)準(zhǔn)之上開發(fā)定制的一套消息隊列系統(tǒng),性能穩(wěn)定渴析,訪問接口也非常友好晚伙,但是這類的消息隊列在訪問吞吐量上有所折扣;另外一個方面俭茧,比如Kafka這樣咆疗,以高效吞吐量著稱的消息隊列系統(tǒng),但是在穩(wěn)定性和可靠性上母债,能力似乎還不夠午磁,因此更多的是用在服務(wù)日志傳輸、短消息推送等等對于可靠性不高的業(yè)務(wù)場景之中毡们⊙富剩總結(jié)起來,不管是ActiveMQ還是Kafka衙熔,其框架的背后涉及到很多異步網(wǎng)絡(luò)通信登颓、多線程、高并發(fā)處理方面的專業(yè)技術(shù)知識红氯。但本文的重點框咙,也不在于介紹這些消息中間件背后的技術(shù)細(xì)節(jié),而是想重點闡述一下脖隶,如何透過上述消息隊列的基本原理扁耐,在必要的時候,開發(fā)定制一套符合自身業(yè)務(wù)要求的消息隊列系統(tǒng)時产阱,能夠獲得更加全面的視角去設(shè)計婉称、考量這些問題。
因此本人用心開發(fā)實現(xiàn)了一個构蹬,基于Netty的消息隊列系統(tǒng):AvatarMQ王暗。當(dāng)然,在設(shè)計庄敛、實現(xiàn)AvatarMQ的時候俗壹,我會適當(dāng)參考這些成熟消息中間件中用到的很多重要的思想理念。
當(dāng)各位從github上面下載到AvatarMQ的源代碼的時候藻烤,可以發(fā)現(xiàn)绷雏,其中的包結(jié)構(gòu)如下所示:
現(xiàn)在對每個包的主要功能進(jìn)行一下簡要說明(下面省略前綴com.newlandframework.avatarmq)头滔。
broker:消息中間件的服務(wù)器模塊,主要負(fù)責(zé)消息的路由涎显、負(fù)載均衡坤检,對于生產(chǎn)者、消費者進(jìn)行消息的應(yīng)答回復(fù)處理(ACK)期吓,AvatarMQ中的中心節(jié)點早歇,是連接生產(chǎn)者、消費者的橋梁紐帶讨勤。
consumer:消息中間件中的消費者模塊箭跳,負(fù)責(zé)接收生產(chǎn)者過來的消息,在設(shè)計的時候潭千,會對消費者進(jìn)行一個集群化管理谱姓,同一個集群標(biāo)識的消費者,會構(gòu)成一個大的消費者集群脊岳,作為一個整體逝段,接收生產(chǎn)者投遞過來的消息。此外割捅,還提供消費者接收消息相關(guān)的API給客戶端進(jìn)行調(diào)用奶躯。
producer:消息中間件中的生產(chǎn)者模塊,負(fù)責(zé)生產(chǎn)特定主題(Topic)的消息亿驾,傳遞給對此主題感興趣的消費者嘹黔,同時提供生產(chǎn)者生產(chǎn)消息的API接口,給客戶端使用莫瞬。
core:AvatarMQ中消息處理的核心模塊儡蔓,負(fù)責(zé)消息的內(nèi)存存儲、應(yīng)答控制疼邀、對消息進(jìn)行多線程任務(wù)分派處理喂江。
model:主要定義了AvatarMQ中的數(shù)據(jù)模型對象,比如MessageType消息類型旁振、MessageSource消息源頭等等模型對象的定義获询。
msg:主要定義了具體的消息類型對應(yīng)的結(jié)構(gòu)模型,比如消費者訂閱消息SubscribeMessage拐袜、消費者取消訂閱消息UnSubscribeMessage吉嚣,消息服務(wù)器應(yīng)答給生產(chǎn)者的應(yīng)答消息ProducerAckMessage、消息服務(wù)器應(yīng)答給消費者的應(yīng)答消息ConsumerAckMessage蹬铺。
netty:主要封裝了Netty網(wǎng)絡(luò)通信相關(guān)的核心模塊代碼尝哆,比如訂閱消息事件的路由分派策略、消息的編碼甜攀、解碼器等等秋泄。
serialize:利用Kryo這個優(yōu)秀高效的對象序列化琐馆、反序列框架對消息對象進(jìn)行序列化網(wǎng)絡(luò)傳輸。
spring:Spring的容器管理類印衔,負(fù)責(zé)把AvatarMQ中的消息服務(wù)器模塊:Broker啡捶,進(jìn)行容器化管理姥敛。這個包里面的AvatarMQServerStartup是整個AvatarMQ消息服務(wù)器的啟動入口奸焙。
test:這個就不用多說了,就是針對AvatarMQ進(jìn)行消息路由傳遞的測試demo彤敛。
AvatarMQ運行原理示意圖:
首先是消息生產(chǎn)者客戶端(AvatarMQ Producer)發(fā)送帶有主題的消息給消息轉(zhuǎn)發(fā)服務(wù)器(AvatarMQ Broker)与帆,消息轉(zhuǎn)發(fā)服務(wù)器確認(rèn)收到生產(chǎn)者的消息,發(fā)送ACK應(yīng)答給生產(chǎn)者墨榄,然后把消息繼續(xù)投遞給消費者(AvatarMQ Consumer)玄糟。同時broker服務(wù)器接收來自消費者的訂閱、取消訂閱消息袄秩,并發(fā)送ACK應(yīng)該給對應(yīng)的消費者阵翎,整個消息系統(tǒng)就是這樣周而復(fù)始的工作。
現(xiàn)在再來看一下之剧,AvatarMQ中的核心模塊的組成郭卫,如下圖所示:
Producer Manage:消息的生產(chǎn)者,其主要代碼在(com.newlandframework.avatarmq.producer)包之下背稼,其主要代碼模塊關(guān)鍵部分簡要說明如下:
package com.newlandframework.avatarmq.producer;
import com.newlandframework.avatarmq.core.AvatarMQAction;
import com.newlandframework.avatarmq.model.MessageSource;
import com.newlandframework.avatarmq.model.MessageType;
import com.newlandframework.avatarmq.model.RequestMessage;
import com.newlandframework.avatarmq.model.ResponseMessage;
import com.newlandframework.avatarmq.msg.Message;
import com.newlandframework.avatarmq.msg.ProducerAckMessage;
import com.newlandframework.avatarmq.netty.MessageProcessor;
import java.util.concurrent.atomic.AtomicLong;
/**
* @filename:AvatarMQProducer.java
* @description:AvatarMQProducer功能模塊
* @author tangjie<https://github.com/tang-jie>
* @blog http://www.cnblogs.com/jietang/
* @since 2016-8-11
*/
public class AvatarMQProducer extends MessageProcessor implements AvatarMQAction {
private boolean brokerConnect = false;
private boolean running = false;
private String brokerServerAddress;
private String topic;
private String defaultClusterId = "AvatarMQProducerClusters";
private String clusterId = "";
private AtomicLong msgId = new AtomicLong(0L);
//連接消息轉(zhuǎn)發(fā)服務(wù)器broker的ip地址贰军,以及生產(chǎn)出來消息附帶的主題信息
public AvatarMQProducer(String brokerServerAddress, String topic) {
super(brokerServerAddress);
this.brokerServerAddress = brokerServerAddress;
this.topic = topic;
}
//沒有連接上消息轉(zhuǎn)發(fā)服務(wù)器broker就發(fā)送的話殿漠,直接應(yīng)答失敗
private ProducerAckMessage checkMode() {
if (!brokerConnect) {
ProducerAckMessage ack = new ProducerAckMessage();
ack.setStatus(ProducerAckMessage.FAIL);
return ack;
}
return null;
}
//啟動消息生產(chǎn)者
public void start() {
super.getMessageConnectFactory().connect();
brokerConnect = true;
running = true;
}
//連接消息轉(zhuǎn)發(fā)服務(wù)器broker制肮,設(shè)定生產(chǎn)者消息處理鉤子,用于處理broker過來的消息應(yīng)答
public void init() {
ProducerHookMessageEvent hook = new ProducerHookMessageEvent();
hook.setBrokerConnect(brokerConnect);
hook.setRunning(running);
super.getMessageConnectFactory().setMessageHandle(new MessageProducerHandler(this, hook));
}
//投遞消息API
public ProducerAckMessage delivery(Message message) {
if (!running || !brokerConnect) {
return checkMode();
}
message.setTopic(topic);
message.setTimeStamp(System.currentTimeMillis());
RequestMessage request = new RequestMessage();
request.setMsgId(String.valueOf(msgId.incrementAndGet()));
request.setMsgParams(message);
request.setMsgType(MessageType.AvatarMQMessage);
request.setMsgSource(MessageSource.AvatarMQProducer);
message.setMsgId(request.getMsgId());
ResponseMessage response = (ResponseMessage) sendAsynMessage(request);
if (response == null) {
ProducerAckMessage ack = new ProducerAckMessage();
ack.setStatus(ProducerAckMessage.FAIL);
return ack;
}
ProducerAckMessage result = (ProducerAckMessage) response.getMsgParams();
return result;
}
//關(guān)閉消息生產(chǎn)者
public void shutdown() {
if (running) {
running = false;
super.getMessageConnectFactory().close();
super.closeMessageConnectFactory();
}
}
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
public String getClusterId() {
return clusterId;
}
public void setClusterId(String clusterId) {
this.clusterId = clusterId;
}
}
Consumer Clusters Manage / Message Routing:消息的消費者集群管理以及消息路由模塊废境,其主要模塊在包(com.newlandframework.avatarmq.consumer)之中帘腹。其中消息消費者對象贰盗,對應(yīng)的核心代碼主要功能描述如下:
package com.newlandframework.avatarmq.consumer;
import com.google.common.base.Joiner;
import com.newlandframework.avatarmq.core.AvatarMQAction;
import com.newlandframework.avatarmq.core.MessageIdGenerator;
import com.newlandframework.avatarmq.core.MessageSystemConfig;
import com.newlandframework.avatarmq.model.MessageType;
import com.newlandframework.avatarmq.model.RequestMessage;
import com.newlandframework.avatarmq.msg.SubscribeMessage;
import com.newlandframework.avatarmq.msg.UnSubscribeMessage;
import com.newlandframework.avatarmq.netty.MessageProcessor;
/**
* @filename:AvatarMQConsumer.java
* @description:AvatarMQConsumer功能模塊
* @author tangjie<https://github.com/tang-jie>
* @blog http://www.cnblogs.com/jietang/
* @since 2016-8-11
*/
public class AvatarMQConsumer extends MessageProcessor implements AvatarMQAction {
private ProducerMessageHook hook;
private String brokerServerAddress;
private String topic;
private boolean subscribeMessage = false;
private boolean running = false;
private String defaultClusterId = "AvatarMQConsumerClusters";
private String clusterId = "";
private String consumerId = "";
//連接的消息服務(wù)器broker的ip地址以及關(guān)注的生產(chǎn)過來的消息鉤子
public AvatarMQConsumer(String brokerServerAddress, String topic, ProducerMessageHook hook) {
super(brokerServerAddress);
this.hook = hook;
this.brokerServerAddress = brokerServerAddress;
this.topic = topic;
}
//向消息服務(wù)器broker發(fā)送取消訂閱消息
private void unRegister() {
RequestMessage request = new RequestMessage();
request.setMsgType(MessageType.AvatarMQUnsubscribe);
request.setMsgId(new MessageIdGenerator().generate());
request.setMsgParams(new UnSubscribeMessage(consumerId));
sendSyncMessage(request);
super.getMessageConnectFactory().close();
super.closeMessageConnectFactory();
running = false;
}
//向消息服務(wù)器broker發(fā)送訂閱消息
private void register() {
RequestMessage request = new RequestMessage();
request.setMsgType(MessageType.AvatarMQSubscribe);
request.setMsgId(new MessageIdGenerator().generate());
SubscribeMessage subscript = new SubscribeMessage();
subscript.setClusterId((clusterId.equals("") ? defaultClusterId : clusterId));
subscript.setTopic(topic);
subscript.setConsumerId(consumerId);
request.setMsgParams(subscript);
sendAsynMessage(request);
}
public void init() {
super.getMessageConnectFactory().setMessageHandle(new MessageConsumerHandler(this, new ConsumerHookMessageEvent(hook)));
Joiner joiner = Joiner.on(MessageSystemConfig.MessageDelimiter).skipNulls();
consumerId = joiner.join((clusterId.equals("") ? defaultClusterId : clusterId), topic, new MessageIdGenerator().generate());
}
//連接消息服務(wù)器broker
public void start() {
if (isSubscribeMessage()) {
super.getMessageConnectFactory().connect();
register();
running = true;
}
}
public void receiveMode() {
setSubscribeMessage(true);
}
public void shutdown() {
if (running) {
unRegister();
}
}
public String getBrokerServerAddress() {
return brokerServerAddress;
}
public void setBrokerServerAddress(String brokerServerAddress) {
this.brokerServerAddress = brokerServerAddress;
}
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
public boolean isSubscribeMessage() {
return subscribeMessage;
}
public void setSubscribeMessage(boolean subscribeMessage) {
this.subscribeMessage = subscribeMessage;
}
public String getDefaultClusterId() {
return defaultClusterId;
}
public void setDefaultClusterId(String defaultClusterId) {
this.defaultClusterId = defaultClusterId;
}
public String getClusterId() {
return clusterId;
}
public void setClusterId(String clusterId) {
this.clusterId = clusterId;
}
}
消息的集群管理模塊,主要代碼是ConsumerContext.java阳欲、ConsumerClusters.java舵盈。先簡單說一下消費者集群模塊ConsumerClusters,主要負(fù)責(zé)定義消費者集群的行為胸完,以及負(fù)責(zé)消息的路由书释。主要的功能描述如下所示:
package com.newlandframework.avatarmq.consumer;
import com.newlandframework.avatarmq.model.RemoteChannelData;
import com.newlandframework.avatarmq.model.SubscriptionData;
import com.newlandframework.avatarmq.netty.NettyUtil;
import io.netty.channel.Channel;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.collections.Predicate;
/**
* @filename:ConsumerClusters.java
* @description:ConsumerClusters功能模塊
* @author tangjie<https://github.com/tang-jie>
* @blog http://www.cnblogs.com/jietang/
* @since 2016-8-11
*/
public class ConsumerClusters {
//輪詢調(diào)度(Round-Robin Scheduling)位置標(biāo)記
private int next = 0;
private final String clustersId;
private final ConcurrentHashMap<String/*生產(chǎn)者消息的主題*/, SubscriptionData/*消息對應(yīng)的topic信息數(shù)據(jù)結(jié)構(gòu)*/> subMap
= new ConcurrentHashMap<String, SubscriptionData>();
private final ConcurrentHashMap<String/*消費者標(biāo)識編碼*/, RemoteChannelData/*對應(yīng)的消費者的netty網(wǎng)絡(luò)通信管道信息*/> channelMap
= new ConcurrentHashMap<String, RemoteChannelData>();
private final List<RemoteChannelData> channelList = Collections.synchronizedList(new ArrayList<RemoteChannelData>());
public ConsumerClusters(String clustersId) {
this.clustersId = clustersId;
}
public String getClustersId() {
return clustersId;
}
public ConcurrentHashMap<String, SubscriptionData> getSubMap() {
return subMap;
}
public ConcurrentHashMap<String, RemoteChannelData> getChannelMap() {
return channelMap;
}
//添加一個消費者到消費者集群
public void attachRemoteChannelData(String clientId, RemoteChannelData channelinfo) {
if (findRemoteChannelData(channelinfo.getClientId()) == null) {
channelMap.put(clientId, channelinfo);
subMap.put(channelinfo.getSubcript().getTopic(), channelinfo.getSubcript());
channelList.add(channelinfo);
} else {
System.out.println("consumer clusters exists! it's clientId:" + clientId);
}
}
//從消費者集群中刪除一個消費者
public void detachRemoteChannelData(String clientId) {
channelMap.remove(clientId);
Predicate predicate = new Predicate() {
public boolean evaluate(Object object) {
String id = ((RemoteChannelData) object).getClientId();
return id.compareTo(clientId) == 0;
}
};
RemoteChannelData data = (RemoteChannelData) CollectionUtils.find(channelList, predicate);
if (data != null) {
channelList.remove(data);
}
}
//根據(jù)消費者標(biāo)識編碼,在消費者集群中查找定位一個消費者赊窥,如果不存在返回null
public RemoteChannelData findRemoteChannelData(String clientId) {
return (RemoteChannelData) MapUtils.getObject(channelMap, clientId);
}
//負(fù)載均衡爆惧,根據(jù)連接到broker的順序,依次投遞消息給消費者锨能。這里的均衡算法直接采用
//輪詢調(diào)度(Round-Robin Scheduling)扯再,后續(xù)可以加入:加權(quán)輪詢芍耘、隨機輪詢、哈希輪詢等等策略熄阻。
public RemoteChannelData nextRemoteChannelData() {
Predicate predicate = new Predicate() {
public boolean evaluate(Object object) {
RemoteChannelData data = (RemoteChannelData) object;
Channel channel = data.getChannel();
return NettyUtil.validateChannel(channel);
}
};
CollectionUtils.filter(channelList, predicate);
return channelList.get(next++ % channelList.size());
}
//根據(jù)生產(chǎn)者的主題關(guān)鍵字斋竞,定位于具體的消息結(jié)構(gòu)
public SubscriptionData findSubscriptionData(String topic) {
return this.subMap.get(topic);
}
}
而ConsumerContext主要的負(fù)責(zé)管理消費者集群的,其主要核心代碼注釋說明如下:
package com.newlandframework.avatarmq.consumer;
import com.newlandframework.avatarmq.model.RemoteChannelData;
import com.newlandframework.avatarmq.model.SubscriptionData;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.commons.collections.Predicate;
import org.apache.commons.collections.iterators.FilterIterator;
/**
* @filename:ConsumerContext.java
* @description:ConsumerContext功能模塊
* @author tangjie<https://github.com/tang-jie>
* @blog http://www.cnblogs.com/jietang/
* @since 2016-8-11
*/
public class ConsumerContext {
//消費者集群關(guān)系定義
private static final CopyOnWriteArrayList<ClustersRelation> relationArray = new CopyOnWriteArrayList<ClustersRelation>();
//消費者集群狀態(tài)
private static final CopyOnWriteArrayList<ClustersState> stateArray = new CopyOnWriteArrayList<ClustersState>();
public static void setClustersStat(String clusters, int stat) {
stateArray.add(new ClustersState(clusters, stat));
}
//根據(jù)消費者集群編碼cluster_id獲取一個消費者集群的狀態(tài)
public static int getClustersStat(String clusters) {
Predicate predicate = new Predicate() {
public boolean evaluate(Object object) {
String clustersId = ((ClustersState) object).getClusters();
return clustersId.compareTo(clusters) == 0;
}
};
Iterator iterator = new FilterIterator(stateArray.iterator(), predicate);
ClustersState state = null;
while (iterator.hasNext()) {
state = (ClustersState) iterator.next();
break;
}
return (state != null) ? state.getState() : 0;
}
//根據(jù)消費者集群編碼cluster_id查找一個消費者集群
public static ConsumerClusters selectByClusters(String clusters) {
Predicate predicate = new Predicate() {
public boolean evaluate(Object object) {
String id = ((ClustersRelation) object).getId();
return id.compareTo(clusters) == 0;
}
};
Iterator iterator = new FilterIterator(relationArray.iterator(), predicate);
ClustersRelation relation = null;
while (iterator.hasNext()) {
relation = (ClustersRelation) iterator.next();
break;
}
return (relation != null) ? relation.getClusters() : null;
}
//查找一下關(guān)注這個主題的消費者集群集合
public static List<ConsumerClusters> selectByTopic(String topic) {
List<ConsumerClusters> clusters = new ArrayList<ConsumerClusters>();
for (int i = 0; i < relationArray.size(); i++) {
ConcurrentHashMap<String, SubscriptionData> subscriptionTable = relationArray.get(i).getClusters().getSubMap();
if (subscriptionTable.containsKey(topic)) {
clusters.add(relationArray.get(i).getClusters());
}
}
return clusters;
}
//添加消費者集群
public static void addClusters(String clusters, RemoteChannelData channelinfo) {
ConsumerClusters manage = selectByClusters(clusters);
if (manage == null) {
ConsumerClusters newClusters = new ConsumerClusters(clusters);
newClusters.attachRemoteChannelData(channelinfo.getClientId(), channelinfo);
relationArray.add(new ClustersRelation(clusters, newClusters));
} else if (manage.findRemoteChannelData(channelinfo.getClientId()) != null) {
manage.detachRemoteChannelData(channelinfo.getClientId());
manage.attachRemoteChannelData(channelinfo.getClientId(), channelinfo);
} else {
String topic = channelinfo.getSubcript().getTopic();
boolean touchChannel = manage.getSubMap().containsKey(topic);
if (touchChannel) {
manage.attachRemoteChannelData(channelinfo.getClientId(), channelinfo);
} else {
manage.getSubMap().clear();
manage.getChannelMap().clear();
manage.attachRemoteChannelData(channelinfo.getClientId(), channelinfo);
}
}
}
//從一個消費者集群中刪除一個消費者
public static void unLoad(String clientId) {
for (int i = 0; i < relationArray.size(); i++) {
String id = relationArray.get(i).getId();
ConsumerClusters manage = relationArray.get(i).getClusters();
if (manage.findRemoteChannelData(clientId) != null) {
manage.detachRemoteChannelData(clientId);
}
if (manage.getChannelMap().size() == 0) {
ClustersRelation relation = new ClustersRelation();
relation.setId(id);
relationArray.remove(id);
}
}
}
}
ACK Queue Dispatch:主要是broker分別向?qū)?yīng)的消息生產(chǎn)者秃殉、消費者發(fā)送ACK消息應(yīng)答坝初,其主要核心模塊是在:com.newlandframework.avatarmq.broker包下面的AckPullMessageController和AckPushMessageController模塊,主要職責(zé)是在broker中收集生產(chǎn)者的消息钾军,確認(rèn)成功收到之后鳄袍,把其放到消息隊列容器中,然后專門安排一個工作線程池把ACK應(yīng)答發(fā)送給生產(chǎn)者吏恭。
Message Queue Dispatch:生產(chǎn)者消息的分派拗小,主要是由com.newlandframework.avatarmq.broker包下面的SendMessageController派發(fā)模塊進(jìn)行任務(wù)的分派,其中消息分派支持兩種策略樱哼,一種是內(nèi)存緩沖消息區(qū)里面只要一有消息就通知消費者哀九;還有一種是對消息進(jìn)行緩沖處理,累計到一定的數(shù)量之后進(jìn)行派發(fā)搅幅,這個是根據(jù):MessageSystemConfig類中的核心參數(shù):SystemPropertySendMessageControllerTaskCommitValue(com.newlandframework.avatarmq.system.send.taskcommit)決定的阅束,默認(rèn)是1。即一有消息就派發(fā)盏筐,如果改成大于1的數(shù)值围俘,表示消息緩沖的數(shù)量。現(xiàn)在給出SendMessageController的核心實現(xiàn)代碼:
<pre language="javascript" code_block="true">package com.newlandframework.avatarmq.broker;
import com.newlandframework.avatarmq.core.SemaphoreCache;
import com.newlandframework.avatarmq.core.MessageSystemConfig;
import com.newlandframework.avatarmq.core.MessageTaskQueue;
import com.newlandframework.avatarmq.core.SendMessageCache;
import com.newlandframework.avatarmq.model.MessageDispatchTask;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* @filename:SendMessageController.java
* @description:SendMessageController功能模塊
* @author tangjie<https://github.com/tang-jie>
* @blog http://www.cnblogs.com/jietang/
* @since 2016-8-11
*/
public class SendMessageController implements Callable<Void> {
private volatile boolean stoped = false;
private AtomicBoolean flushTask = new AtomicBoolean(false);
private ThreadLocal<ConcurrentLinkedQueue<MessageDispatchTask>> requestCacheList = new ThreadLocal<ConcurrentLinkedQueue<MessageDispatchTask>>() {
protected ConcurrentLinkedQueue<MessageDispatchTask> initialValue() {
return new ConcurrentLinkedQueue<MessageDispatchTask>();
}
};
private final Timer timer = new Timer("SendMessageTaskMonitor", true);
public void stop() {
stoped = true;
}
public boolean isStoped() {
return stoped;
}
public Void call() {
int period = MessageSystemConfig.SendMessageControllerPeriodTimeValue;
int commitNumber = MessageSystemConfig.SendMessageControllerTaskCommitValue;
int sleepTime = MessageSystemConfig.SendMessageControllerTaskSleepTimeValue;
ConcurrentLinkedQueue<MessageDispatchTask> queue = requestCacheList.get();
SendMessageCache ref = SendMessageCache.getInstance();
while (!stoped) {
SemaphoreCache.acquire(MessageSystemConfig.NotifyTaskSemaphoreValue);
MessageDispatchTask task = MessageTaskQueue.getInstance().getTask();
queue.add(task);
if (queue.size() == 0) {
try {
Thread.sleep(sleepTime);
continue;
} catch (InterruptedException ex) {
Logger.getLogger(SendMessageController.class.getName()).log(Level.SEVERE, null, ex);
}
}
if (queue.size() > 0 && (queue.size() % commitNumber == 0 || flushTask.get() == true)) {
ref.commit(queue);
queue.clear();
flushTask.compareAndSet(true, false);
}
timer.scheduleAtFixedRate(new TimerTask() {
public void run() {
try {
flushTask.compareAndSet(false, true);
} catch (Exception e) {
System.out.println("SendMessageTaskMonitor happen exception");
}
}
}, 1000 * 1, period);
}
return null;
}
}</pre>
消息分派采用多線程并行派發(fā)琢融,其內(nèi)部通過柵欄機制界牡,為消息派發(fā)設(shè)置一個屏障點,后續(xù)可以暴露給JMX接口漾抬,進(jìn)行對整個消息系統(tǒng)宿亡,消息派發(fā)情況的動態(tài)監(jiān)控。比如發(fā)現(xiàn)消息積壓太多纳令,可以加大線程并行度挽荠。消息無堆積的話,降低線程并行度平绩,減輕系統(tǒng)負(fù)荷∪Υ遥現(xiàn)在給出消息派發(fā)任務(wù)模塊SendMessageTask的核心代碼:
<pre language="javascript" code_block="true">package com.newlandframework.avatarmq.core;
import com.newlandframework.avatarmq.msg.ConsumerAckMessage;
import com.newlandframework.avatarmq.msg.Message;
import com.newlandframework.avatarmq.broker.SendMessageLauncher;
import com.newlandframework.avatarmq.consumer.ClustersState;
import com.newlandframework.avatarmq.consumer.ConsumerContext;
import com.newlandframework.avatarmq.model.MessageType;
import com.newlandframework.avatarmq.model.RequestMessage;
import com.newlandframework.avatarmq.model.ResponseMessage;
import com.newlandframework.avatarmq.model.RemoteChannelData;
import com.newlandframework.avatarmq.model.MessageSource;
import com.newlandframework.avatarmq.model.MessageDispatchTask;
import com.newlandframework.avatarmq.netty.NettyUtil;
import java.util.concurrent.Callable;
import java.util.concurrent.Phaser;
/**
* @filename:SendMessageTask.java
* @description:SendMessageTask功能模塊
* @author tangjie<https://github.com/tang-jie>
* @blog http://www.cnblogs.com/jietang/
* @since 2016-8-11
*/
public class SendMessageTask implements Callable<Void> {
private MessageDispatchTask[] tasks;
//消息柵欄器,為后續(xù)進(jìn)行消息JMX實時監(jiān)控預(yù)留接口
private Phaser phaser = null;
private SendMessageLauncher launcher = SendMessageLauncher.getInstance();
public SendMessageTask(Phaser phaser, MessageDispatchTask[] tasks) {
this.phaser = phaser;
this.tasks = tasks;
}
public Void call() throws Exception {
for (MessageDispatchTask task : tasks) {
Message msg = task.getMessage();
if (ConsumerContext.selectByClusters(task.getClusters()) != null) {
RemoteChannelData channel = ConsumerContext.selectByClusters(task.getClusters()).nextRemoteChannelData();
ResponseMessage response = new ResponseMessage();
response.setMsgSource(MessageSource.AvatarMQBroker);
response.setMsgType(MessageType.AvatarMQMessage);
response.setMsgParams(msg);
response.setMsgId(new MessageIdGenerator().generate());
try {
//消息派發(fā)的時候捏雌,發(fā)現(xiàn)管道不可達(dá)跃赚,跳過
if (!NettyUtil.validateChannel(channel.getChannel())) {
ConsumerContext.setClustersStat(task.getClusters(), ClustersState.NETWORKERR);
continue;
}
RequestMessage request = (RequestMessage) launcher.launcher(channel.getChannel(), response);
ConsumerAckMessage result = (ConsumerAckMessage) request.getMsgParams();
if (result.getStatus() == ConsumerAckMessage.SUCCESS) {
ConsumerContext.setClustersStat(task.getClusters(), ClustersState.SUCCESS);
}
} catch (Exception e) {
ConsumerContext.setClustersStat(task.getClusters(), ClustersState.ERROR);
}
}
}
//若干個并行的線程共同到達(dá)統(tǒng)一的屏障點之后,再進(jìn)行消息統(tǒng)計,把數(shù)據(jù)最終匯總給JMX
phaser.arriveAndAwaitAdvance();
return null;
}
}</pre>
Message Serialize:消息的序列化模塊纬傲,主要基于Kryo满败。其主要的核心代碼為:com.newlandframework.avatarmq.serialize包下面的KryoCodecUtil、KryoSerialize完成消息的序列化和反序列化工作叹括。其對應(yīng)的主要核心代碼模塊是:
<pre language="javascript" code_block="true">package com.newlandframework.avatarmq.serialize;
import com.esotericsoftware.kryo.pool.KryoPool;
import io.netty.buffer.ByteBuf;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
/**
* @filename:KryoCodecUtil.java
* @description:KryoCodecUtil功能模塊
* @author tangjie<https://github.com/tang-jie>
* @blog http://www.cnblogs.com/jietang/
* @since 2016-8-11
*/
public class KryoCodecUtil implements MessageCodecUtil {
private KryoPool pool;
public KryoCodecUtil(KryoPool pool) {
this.pool = pool;
}
public void encode(final ByteBuf out, final Object message) throws IOException {
ByteArrayOutputStream byteArrayOutputStream = null;
try {
byteArrayOutputStream = new ByteArrayOutputStream();
KryoSerialize kryoSerialization = new KryoSerialize(pool);
kryoSerialization.serialize(byteArrayOutputStream, message);
byte[] body = byteArrayOutputStream.toByteArray();
int dataLength = body.length;
out.writeInt(dataLength);
out.writeBytes(body);
} finally {
byteArrayOutputStream.close();
}
}
public Object decode(byte[] body) throws IOException {
ByteArrayInputStream byteArrayInputStream = null;
try {
byteArrayInputStream = new ByteArrayInputStream(body);
KryoSerialize kryoSerialization = new KryoSerialize(pool);
Object obj = kryoSerialization.deserialize(byteArrayInputStream);
return obj;
} finally {
byteArrayInputStream.close();
}
}
}</pre>
<pre language="javascript" code_block="true">package com.newlandframework.avatarmq.serialize;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.esotericsoftware.kryo.pool.KryoPool;
import com.google.common.io.Closer;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
/**
* @filename:KryoSerialize.java
* @description:KryoSerialize功能模塊
* @author tangjie<https://github.com/tang-jie>
* @blog http://www.cnblogs.com/jietang/
* @since 2016-8-11
*/
public class KryoSerialize {
private KryoPool pool = null;
private Closer closer = Closer.create();
public KryoSerialize(final KryoPool pool) {
this.pool = pool;
}
public void serialize(OutputStream output, Object object) throws IOException {
try {
Kryo kryo = pool.borrow();
Output out = new Output(output);
closer.register(out);
closer.register(output);
kryo.writeClassAndObject(out, object);
pool.release(kryo);
} finally {
closer.close();
}
}
public Object deserialize(InputStream input) throws IOException {
try {
Kryo kryo = pool.borrow();
Input in = new Input(input);
closer.register(in);
closer.register(input);
Object result = kryo.readClassAndObject(in);
pool.release(kryo);
return result;
} finally {
closer.close();
}
}
}</pre>
Netty Core:基于Netty對producer算墨、consumer、broker的網(wǎng)絡(luò)事件處理器(Handler)進(jìn)行封裝處理汁雷,核心模塊在:com.newlandframework.avatarmq.netty包之下净嘀。其中broker的Netty網(wǎng)絡(luò)事件處理器為ShareMessageEventWrapper、producer的Netty網(wǎng)絡(luò)事件處理器為MessageProducerHandler摔竿、consumer的Netty網(wǎng)絡(luò)事件處理器為MessageConsumerHandler面粮。其對應(yīng)的類圖為:
可以看到,他們共同的父類是:MessageEventWrapper继低。該類的代碼簡要說明如下:
<pre language="javascript" code_block="true">package com.newlandframework.avatarmq.netty;
import com.newlandframework.avatarmq.core.HookMessageEvent;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import org.springframework.aop.framework.ProxyFactory;
import org.springframework.aop.support.NameMatchMethodPointcutAdvisor;
/**
* @filename:MessageEventWrapper.java
* @description:MessageEventWrapper功能模塊
* @author tangjie<https://github.com/tang-jie>
* @blog http://www.cnblogs.com/jietang/
* @since 2016-8-11
*/
public class MessageEventWrapper<T> extends ChannelInboundHandlerAdapter implements MessageEventHandler, MessageEventProxy {
final public static String proxyMappedName = "handleMessage";
protected MessageProcessor processor;
protected Throwable cause;
protected HookMessageEvent<T> hook;
protected MessageConnectFactory factory;
private MessageEventWrapper<T> wrapper;
public MessageEventWrapper() {
}
public MessageEventWrapper(MessageProcessor processor) {
this(processor, null);
}
public MessageEventWrapper(MessageProcessor processor, HookMessageEvent<T> hook) {
this.processor = processor;
this.hook = hook;
this.factory = processor.getMessageConnectFactory();
}
public void handleMessage(ChannelHandlerContext ctx, Object msg) {
return;
}
public void beforeMessage(Object msg) {
}
public void afterMessage(Object msg) {
}
//管道鏈路激活
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
}
//讀管道數(shù)據(jù)
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
super.channelRead(ctx, msg);
ProxyFactory weaver = new ProxyFactory(wrapper);
NameMatchMethodPointcutAdvisor advisor = new NameMatchMethodPointcutAdvisor();
advisor.setMappedName(MessageEventWrapper.proxyMappedName);
advisor.setAdvice(new MessageEventAdvisor(wrapper, msg));
weaver.addAdvisor(advisor);
//具體的如何處理管道中的數(shù)據(jù),直接由producer稍走、consumer袁翁、broker自行決定
MessageEventHandler proxyObject = (MessageEventHandler) weaver.getProxy();
proxyObject.handleMessage(ctx, msg);
}
//管道鏈路失效,可能網(wǎng)絡(luò)連接斷開了婿脸,后續(xù)如果重連broker粱胜,可以在這里做文章
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
}
public void setWrapper(MessageEventWrapper<T> wrapper) {
this.wrapper = wrapper;
}
}</pre>
整個AvatarMQ消息隊列系統(tǒng)的運行情況,可以參考:Netty構(gòu)建分布式消息隊列(AvatarMQ)設(shè)計指南之架構(gòu)篇狐树,里面說的很詳細(xì)了焙压,本文就不具體演示了。
下圖是VisualVM監(jiān)控AvatarMQ中broker服務(wù)器的CPU使用率曲線抑钟。
可以發(fā)現(xiàn)涯曲,隨著消息的堆積,broker進(jìn)行消息投遞在塔、ACK應(yīng)答的壓力增大幻件,CPU的使用率明細(xì)提高。現(xiàn)在具體看下broker的CPU使用率增高的原因是調(diào)用哪個熱點方法呢蛔溃?
從下圖可以看出绰沥,熱點方法是:SemaphoreCache的acquire。
這個是因為broker接收來自生產(chǎn)者消息的同時贺待,會先把消息緩存起來徽曲,然后利用多線程機制進(jìn)行消息的分派,這個時候會對信號量維護(hù)的許可集合進(jìn)行獲取操作麸塞,獲取成功之后秃臣,才能進(jìn)行任務(wù)的派發(fā),主要防止臨界區(qū)的共享資源競爭喘垂。這里的Semaphore是用來控制多線程訪問共享資源(生產(chǎn)者過來的消息)甜刻,類似操作系統(tǒng)中的PV原語绍撞,P原語相當(dāng)于acquire(),V原語相當(dāng)于release()得院。
寫在最后
本文通過一個基于Netty構(gòu)建分布式消息隊列系統(tǒng)(AvatarMQ)傻铣,簡單地闡述了一個極簡消息中間件的內(nèi)部結(jié)構(gòu)、以及如何利用Netty祥绞,構(gòu)建生產(chǎn)者非洲、消費者消息路由的通信模塊。一切都是從零開始蜕径,開發(fā)两踏、實現(xiàn)出精簡版的消息中間件!本系列文章的主要靈感源自兜喻,自己業(yè)余時間梦染,閱讀到的一些消息隊列原理闡述文章以及相關(guān)開源消息中間件的源代碼,其中也結(jié)合了自己的一些理解和體會朴皆。由于自身技術(shù)水平帕识、理解能力方面的限制,不能可能擁有大師一樣高屋建瓴的視角遂铡,本文有說得不對肮疗、寫的不好的地方,懇請廣大同行批評指正“墙樱現(xiàn)在伪货,文章寫畢,算是對自己平時學(xué)習(xí)的一些經(jīng)驗總結(jié)钾怔,在這之前碱呼,對于消息中間件都是很簡單的使用別人造好的輪子,沒有更多的深入了解背后的技術(shù)細(xì)節(jié)蒂教,只是單純的覺得別人寫的很強大巍举、很高效。其實有的時候提升自己能力凝垛,要更多的深究其背后的技術(shù)原理懊悯,舉一反三,而不是簡單的蜻蜓點水梦皮,一味地點到為止炭分,長此以往、日復(fù)一日剑肯,自身的技術(shù)積累就很難有質(zhì)的飛躍捧毛。
AvatarMQ一定還有許多不足、瓶頸甚至是bug,確實它不是一個完美的消息中間件呀忧,真因為如此师痕,還需要不斷地進(jìn)行重構(gòu)優(yōu)化。后續(xù)本人還會持續(xù)更新而账、維護(hù)這個開源項目胰坟,希望有興趣的朋友,共同關(guān)注泞辐!
文章略長笔横,謝謝大家的觀賞,如果覺得不錯咐吼,還請多多推薦吹缔!