項(xiàng)目里遇到需要在ActiveMQ上記錄任務(wù)日志的場(chǎng)景。其實(shí)AMQ本身自帶一個(gè)LoggingPlugin灿意,但是用起來(lái)總是不切合項(xiàng)目的實(shí)際場(chǎng)景估灿。思來(lái)想去,正好前段時(shí)間為其他項(xiàng)目做了個(gè)MQTT協(xié)議認(rèn)證的插件脾歧,技術(shù)基礎(chǔ)已經(jīng)有了甲捏,還是自己給項(xiàng)目寫一個(gè)定制化版的插件吧。
在我之前的文章ActiveMQ插件開(kāi)發(fā)里介紹了如何開(kāi)發(fā)一個(gè)AMQ的插件鞭执。其實(shí)這次的功能就是基于之前的代碼里的內(nèi)容進(jìn)行修改的司顿。主要功能是每次AMQ接收到一個(gè)任務(wù)消息后,就往一臺(tái)服務(wù)器上使用HTTP POST方法發(fā)送一條消息兄纺。表示任務(wù)流經(jīng)MQ大溜。
先來(lái)看入口類,相比之前的代碼估脆,增加了兩個(gè)參數(shù)钦奋,這兩個(gè)參數(shù)可以在配置文件activemq.xml中實(shí)現(xiàn)手動(dòng)配置。
package com.cn.amqs;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.plugin.StatisticsBrokerPlugin;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
public class MessageLogPlugin implements BrokerPlugin {
private Log log = LogFactory.getLog(StatisticsBrokerPlugin.class);
private String seviceUrl;
private String sign;
public Broker installPlugin(Broker broker) throws Exception {
log.info("install MessageLogPlugin");
return new MessageLog(broker,serviceUrl,sign);
}
public void setServiceUrl(String serviceUrl) {
this.serviceUrl=serviceUrl;
}
……
}
主要功能在MessageLog類中疙赠,實(shí)現(xiàn)了幾個(gè)功能:
- 每來(lái)一個(gè)任務(wù)消息付材,判斷消息是否曾經(jīng)來(lái)過(guò),如果是第一次收到圃阳,則發(fā)送一條post消息到服務(wù)器上
- 記錄一個(gè)任務(wù)的消息數(shù)量
- 為了防止任務(wù)數(shù)量無(wú)限增長(zhǎng)厌衔,設(shè)置了定時(shí)清理機(jī)制(但是由于每個(gè)任務(wù)都設(shè)置了Timer,適用的場(chǎng)景應(yīng)該是任務(wù)較少或者任務(wù)可清理時(shí)間較短的場(chǎng)景捍岳,否則也是對(duì)資源的消耗)
- 區(qū)分任務(wù)上行還是任務(wù)下行
package com.cn.amqs;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerFilter;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.command.Message;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* 實(shí)現(xiàn)每次任務(wù)到達(dá)MQ時(shí)自動(dòng)往一個(gè)地址上送一條信息
* @author MiSterRabbit
*/
public class MessageLog extends BrokerFilter{
private Log log;
/**下行任務(wù)HashMap*/
private ConcurrentHashMap<Object, Integer> downWards;
/**上行任務(wù)HashMap*/
private ConcurrentHashMap<Object, Integer> upWards;
private String seviceUrl;
private String sign;
public MessageLog(Broker next,String seviceUrl,String sign) {
super(next);
downWards = new ConcurrentHashMap<Object, Integer>();
upWards = new ConcurrentHashMap<Object, Integer>();
this.seviceUrl=seviceUrl;
this.sign=sign.isEmpty()? "分部":"總部";
log = LogFactory.getLog(com.cn.amqs.MessageLog.class);
log.info("initialize Message Log plugin");
}
/**
* Timer類富寿,實(shí)現(xiàn)定時(shí)清理日志HashMap,防止Map的無(wú)限增長(zhǎng)
*/
class missionTimer extends TimerTask {
private String missionID;
private Log log;
private ConcurrentHashMap<Object, Integer> map;
public missionTimer(String missionID, Log log, ConcurrentHashMap<Object, Integer> map) {
this.missionID=missionID;
this.log=log;
this.map=map;
}
@Override
public void run() {
this.map.remove(missionID);
this.log.info("[FLOW_LOG] Remove expired mission: "+missionID);
}
}
/**
* 判斷日志是否在map中锣夹,如果不在页徐,則發(fā)送一條消息,若存在银萍,則增加計(jì)數(shù)器
* @param missionID 任務(wù)號(hào)
* @param map 任務(wù)下發(fā)和任務(wù)上送使用不同的map
*/
public synchronized void insertIntoMap(String missionID, ConcurrentHashMap<Object,Integer> map,String direction) {
if(map.containsKey(missionID)) {
int count = map.get(missionID)+1;
map.put(missionID,count);
this.log.debug("[FLOW_LOG] "+map);
} else{
map.put(missionID,1);
this.log.info("[FLOW_LOG] Receive a new "+direction+" mission: "+missionID);
// 開(kāi)啟一個(gè)線程發(fā)送一條任務(wù)數(shù)據(jù)变勇,這里的MissionSend類其實(shí)就是開(kāi)啟一個(gè)線程發(fā)送一條http post消息
if (direction.equalsIgnoreCase("DOWNWARD")){
MissionSend tmqs = new MissionSend(missionID, super.getBrokerName().toString().substring(3), "ActiveMQ", "/opt/activemq/apache-activemq-5.13.4/data/mission.log", this.sign+"MQ收到下行任務(wù)", "ok", this.seviceUrl);
new Thread(tmqs,"mission_send").start();
} else {
MissionSend tmqs = new MissionSend(missionID, super.getBrokerName().toString().substring(3), "ActiveMQ", "/opt/activemq/apache-activemq-5.13.4/data/mission.log", this.sign+"MQ收到上行任務(wù)", "ok", this.seviceUrl);
new Thread(tmqs,"mission_send").start();
}
// 使用Timer定時(shí)清理,1800秒后清理這個(gè)任務(wù)
Timer timer =new Timer();
TimerTask task = new missionTimer(missionID,this.log,downWards);
timer.schedule(task,1800000);
}
}
/**
* 每當(dāng)MQ收到一條生產(chǎn)者發(fā)送過(guò)來(lái)的消息的時(shí)候執(zhí)行判斷贴唇。
*/
public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
// 如果任務(wù)ID不為空贰锁,且不是經(jīng)由集群內(nèi)部發(fā)過(guò)來(lái)的消息
if ((messageSend.getProperty("misid") != null) &&
(!producerExchange.getProducerState().getInfo().getProducerId().toString().contains("MQ_"))) {
// 如果目的地不包含UPLAOD字段,則判斷為消息下行滤蝠,否則為消息上行豌熄。記錄一個(gè)日志,然后調(diào)用insertIntoMap判斷是否需要發(fā)送http post消息
if (!messageSend.getDestination().toString().toLowerCase().contains("upload")) {
this.log.info("[FLOW_LOG] Down Mission: " + messageSend.getProperty("misid") + ". Destination: " + messageSend.getDestination() + ". Producer: "+producerExchange.getConnectionContext().getConnection().getRemoteAddress());
insertIntoMap(messageSend.getProperty("misid").toString(),downWards,"DOWNWARD");
} else {
this.log.info("[FLOW_LOG] Up Mission: " + messageSend.getProperty("misid") + ". Destination: " + messageSend.getDestination()+". Producer: "+producerExchange.getConnectionContext().getConnection().getRemoteAddress());
insertIntoMap(messageSend.getProperty("misid").toString(),upWards,"UPWARD");
}
}
super.send(producerExchange, messageSend);
}
}
MissionSend的類可以自由擴(kuò)展物咳。我就不贅述了锣险。
插件功能為AMQ帶來(lái)了極強(qiáng)的擴(kuò)展性蹄皱,用戶可以實(shí)現(xiàn)在不對(duì)現(xiàn)有功能進(jìn)行修改的前提下進(jìn)行功能的二次開(kāi)發(fā)。有空我會(huì)整理一個(gè)插件可以實(shí)現(xiàn)的功能清單芯肤。其實(shí)如果有空巷折,看看BrokerFilter這個(gè)類,就能明白插件能實(shí)現(xiàn)的功能了崖咨。