(一)安裝與啟動(dòng)
下載rocketmq-4.8.0二進(jìn)制包,啟動(dòng)name server和broker:
nohup sh /usr/rocketmq-4.8.0/bin/mqnamesrv > /dev/null 2>&1 &
nohup sh /usr/rocketmq-4.8.0/bin/mqbroker -n localhost:9876 -c conf/broker.conf > /dev/null 2>&1 &
注:RocketMQ默認(rèn)需求的jvm內(nèi)存比較大庭再,可以酌情減少,修改runserver.sh
牺堰、runbroker.sh
兩個(gè)啟動(dòng)腳本里邊關(guān)于jvm內(nèi)存參數(shù)的設(shè)置拄轻。
修改broker.conf配置文件,添加:
namesrvAddr=外網(wǎng)IP:9876
brokerIP1=外網(wǎng)IP
這樣是為了能夠使客戶端能夠遠(yuǎn)程的訪問(wèn)mq
停止mq:
./mqshutdown broker
./mqshutdown namesrv
(二)添加后臺(tái)管理端
修改/etc/profile
伟葫,添加環(huán)境變量:
export NAMESRV_ADDR=localhost:9876
然后source /etc/profile
使配置環(huán)境變量生效恨搓。
到https://github.com/apache/rocketmq-externals下載rocketmq-console項(xiàng)目源代碼,修改配置文件application.properties
啟用web端登陸密碼驗(yàn)證筏养,并在users.properties
里邊配置ACL的用戶和密碼:
rocketmq.config.loginRequired=true #開(kāi)啟web端登錄密碼驗(yàn)證
# Define Admin
admin=password,1
# Define Users
#user1=user1
#user2=user2
user1=query
maven編譯打包:
mvn clean package -Dmaven.test.skip=true
啟動(dòng)應(yīng)用:
nohup java -jar rocketmq-console-ng-2.0.0.jar --server.port=19876 --rocketmq.config.namesrvAddr=127.0.0.1:9876 > /dev/null 2>&1 &
訪問(wèn)地址: http://122.51.112.187:19876/#/login
(三)ACL訪問(wèn)控制列表
默認(rèn)搞好的RocketMQ是任意客戶端只要網(wǎng)絡(luò)通的話都可以進(jìn)行發(fā)布訂閱的斧抱,而ACL機(jī)制可以使得mq對(duì)客戶端的訪問(wèn)進(jìn)行校驗(yàn)。mq服務(wù)端需要配置broker.conf
和plain_acl.yml
兩個(gè)配置文件渐溶,前者是配置開(kāi)啟acl=true辉浦,后者是配置acl具體的賬戶與權(quán)限信息,權(quán)限信息支持設(shè)置的很細(xì)茎辐,具體到topic和group級(jí)別盏浙。
broker.conf
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
namesrvAddr=訪問(wèn)IP:9876
brokerIP1=訪問(wèn)IP
aclEnable=true
plain_acl.yml
globalWhiteRemoteAddresses:
accounts:
- accessKey: wanganmq
secretKey: rocket008
whiteRemoteAddress:
admin: false
defaultTopicPerm: PUB|SUB
defaultGroupPerm: PUB|SUB
topicPerms:
groupPerms:
# the group should convert to retry topic
- accessKey: mqadmin
secretKey: password
whiteRemoteAddress:
# if it is admin, it could access all resources
admin: true
(四)實(shí)踐心得
1、推薦開(kāi)啟ACL荔茬,然后admin賬戶交給管理員通過(guò)console和cli工具進(jìn)行管理使用废膘,應(yīng)用中配置普通賬戶,這樣方便進(jìn)行管理慕蔚。
2丐黄、一定要開(kāi)發(fā)規(guī)范中約定好topic和tag,以及producer/consumer group這些個(gè)條目的命名規(guī)范孔飒,以及約定代表的含義灌闺。
例如艰争,topic和tag首字母大寫駝峰,分別表示業(yè)務(wù)域和業(yè)務(wù)域所發(fā)生的事件桂对。用“應(yīng)用名-producer-group”來(lái)命名生產(chǎn)者group名字甩卓,"業(yè)務(wù)處理名-consumer-group" 等等。 見(jiàn)名知意的命名能夠提高團(tuán)隊(duì)開(kāi)發(fā)效率蕉斜。
3逾柿、跟其他開(kāi)源中間件一樣,了解底層實(shí)現(xiàn)原理永遠(yuǎn)都是對(duì)的宅此。不僅出現(xiàn)問(wèn)題能夠搞定和排查机错,應(yīng)用實(shí)踐上也能更給出最佳實(shí)踐。
4父腕、關(guān)于每個(gè)Consumer啟動(dòng)多少個(gè)線程并發(fā)去消費(fèi)弱匪。RocketMQ源碼里寫死默認(rèn)是20個(gè)線程,用的是jdk線程池璧亮∠艚耄可以調(diào)整。DefaultMQPushConsumer.setConsumeThreadMin(4)
附: 搞acl時(shí)的一個(gè)小插曲枝嘶,RocketMQ消息發(fā)布報(bào)錯(cuò):No accessKey is configured
No accessKey is configured 本來(lái)這個(gè)錯(cuò)誤是很簡(jiǎn)單個(gè)錯(cuò)誤财搁,本意是mq上邊開(kāi)啟了acl驗(yàn)證機(jī)制,而客戶端沒(méi)有配置accessKey躬络。但是筆者這次遇到的這個(gè)問(wèn)題比較有意思尖奔,廢了我半天時(shí)間去研究。在這里記錄一下穷当。
當(dāng)時(shí)筆者是先行自己封裝好了一個(gè)rocket的工具庫(kù)提茁,考慮到安全問(wèn)題,決定引入acl機(jī)制馁菜,然后主要是參照了網(wǎng)上的兩個(gè)文章茴扁。配置mq的broker和plain_acl兩個(gè)配置文件,客戶端加入acl相關(guān)代碼汪疮,主要是加入了AclRPCHook峭火,在發(fā)送消息之前插入accessKey和簽名供mq進(jìn)行校驗(yàn)。之后用測(cè)試工程引用本地maven庫(kù)上安裝的rocket工具庫(kù)進(jìn)行測(cè)試智嚷,結(jié)果之前能用的代碼在用了acl之后一直報(bào)錯(cuò):提示accessKey沒(méi)配置卖丸。
然后發(fā)現(xiàn)直接弄個(gè)帶main方法的類運(yùn)行例子里的代碼是可以的,一樣的代碼copy到測(cè)試工程里就不行了盏道。至此確認(rèn)應(yīng)該是客戶端問(wèn)題稍浆,mq的acl配置應(yīng)該是對(duì)的。
接下來(lái)1天各種實(shí)驗(yàn)無(wú)果,開(kāi)始回到起點(diǎn)衅枫,“源碼之下無(wú)秘密”嫁艇,潛心來(lái)看源代碼,搞清楚客戶端的mq發(fā)送消息以及acl的源碼是怎么寫的弦撩。
當(dāng)調(diào)用DefaultMQProducer的send方法步咪,同步的向mq投遞消息的時(shí)候,實(shí)際上是defaultMQProducerImpl.send(msg)益楼,也即DefaultMQProducerImpl的sendKernelImpl方法猾漫,關(guān)鍵代碼:sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage
,里邊其實(shí)是MQClientAPIImpl的如下方法:
private SendResult sendMessageSync(
final String addr,
final String brokerName,
final Message msg,
final long timeoutMillis,
final RemotingCommand request
) throws RemotingException, MQBrokerException, InterruptedException {
RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
assert response != null;
return this.processSendResponse(brokerName, msg, response,addr);
}
其中remotingClient是接口偏形,其實(shí)現(xiàn)是netty NettyRemotingClient invokeSync()静袖,后者會(huì)doBeforeRpcHooks(addr, request);
if (rpcHooks.size() > 0) {
for (RPCHook rpcHook: rpcHooks) {
rpcHook.doBeforeRequest(addr, request);
}
}
rpcHooks這里相當(dāng)于是NettyRemotingClient的成員變量觉鼻。
對(duì)于acl的DefaultMQProducer
DefaultMQProducer實(shí)例化的時(shí)候俊扭,是把rpcHook都給到了自己的defaultMQProducerImpl。
factory是在defaultMQProducerImpl.start()也就是DefaultMQProducer.start()的時(shí)候?qū)嵗摹?/p>
MQClientInstance是factory的實(shí)現(xiàn)坠陈,實(shí)例化的時(shí)候內(nèi)部,
this.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig, this.clientRemotingProcessor, rpcHook, clientConfig);
然后factory提供這個(gè)mQClientAPIImpl實(shí)例萨惑。
defaultMQProducerImpl的start方法中實(shí)例化factory時(shí),會(huì)判斷如果生成過(guò)仇矾,就不再生成了庸蔼。也就是說(shuō)整個(gè)進(jìn)程只會(huì)有一個(gè)factory實(shí)例。關(guān)鍵代碼:
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);
到這里贮匕,筆者搞清楚問(wèn)題了姐仅,剛好工程里有另外一個(gè)transactionProducer的代碼,是筆者用來(lái)測(cè)試事務(wù)消息的刻盐,沒(méi)加acl代碼掏膏,所以一旦先transactionProducer.start()創(chuàng)建了factory,那之后一直用這個(gè)無(wú)acl的敦锌,所以發(fā)送消息到mq的時(shí)候一直提示沒(méi)有accessKey就是這個(gè)原因馒疹。
(五)Spring整合RocketMQ開(kāi)發(fā)
編寫自己的rocketutil開(kāi)發(fā)包,然后maven install到本地maven庫(kù)上乙墙。需要使用的gradle項(xiàng)目在repositories添加mavenLocal()
整體思路:使用集群消息模式颖变,然后每個(gè)應(yīng)用服務(wù)使用1個(gè)單例的GeneralMqProducer
工具類,用于發(fā)送消息听想。再提供一個(gè)MQMsgHandler
接口腥刹,用于應(yīng)用服務(wù)自己編寫接口實(shí)現(xiàn):聲明訂閱消息topic和tag,收到訂閱消息之后需要回調(diào)執(zhí)行的業(yè)務(wù)邏輯汉买。MQMsgHandler
接口的實(shí)現(xiàn)類肛走、也可以稱為是消費(fèi)者添加自定義注解MsgConsumer
、通過(guò)Spring BeanPostProcessor機(jī)制加載到Spring容器。
消息Pojo:
/**
* 用于在MQ中進(jìn)行傳輸?shù)氖录? * */
@Data
public class EventMessage {
private String msgId; //消息的唯一標(biāo)識(shí)
private String producerGroup; //消息的生產(chǎn)者所屬的分組
private String topic; //消息所屬的主題(業(yè)務(wù)域)
private String tag; //消息tag朽色,即該主題下的什么事件
private String msgBody; //消息體, 一般為json string
private String publishTime; //發(fā)布時(shí)間
}
生產(chǎn)者工具類:
/**
* 每個(gè)應(yīng)用有一個(gè)producer單例邻吞。
* producergroup表示消息由哪個(gè)應(yīng)用集群投遞。
* topic代表消息屬于哪個(gè)業(yè)務(wù)域的消息葫男,tag代表該業(yè)務(wù)域下的哪種事件發(fā)生了抱冷。
* msgId是消息的唯一標(biāo)識(shí)
* */
@Slf4j
@Component
public class GeneralMqProducer {
@Value("${rocketmq.url}")
private String mqurl;
@Value("${rocketmq.accessKey}")
private String accessKey;
@Value("${rocketmq.secretKey}")
private String secretKey;
@Value("${rocketmq.producergroup.name}")
private String producerGroupName;
private DefaultMQProducer producer;
/**
* 同步發(fā)布消息
* */
public SendResult syncPublish(EventMessage eventMsg) {
try {
Message msg = new Message(eventMsg.getTopic(), eventMsg.getTag(), eventMsg.getMsgId(), eventMsg.getMsgBody().getBytes("utf-8"));
SendResult sendResult = producer.send(msg);
return sendResult;
} catch (Exception e) {
log.error("向RocketMQ發(fā)布消息失敗:" + e.getMessage(), e);
e.printStackTrace();
}
return null;
}
/**
* 異步發(fā)布消息
* SendCallback為消息發(fā)送完畢后的回調(diào)方法
* */
public void asyncPublish(EventMessage eventMsg, SendCallback callBack) {
try {
eventMsg.setMsgId(UUID.randomUUID().toString().replaceAll("-",""));
eventMsg.setProducerGroup(producerGroupName);
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss:SSS");
eventMsg.setPublishTime(LocalDateTime.now().format(formatter));
Message msg = new Message(eventMsg.getTopic(),
eventMsg.getTag(),
eventMsg.getMsgId(),
JSON.toJSONString(eventMsg).getBytes("utf-8"));
producer.send(msg, callBack);
log.info("消息已發(fā)布 msgID:{}, producer group:{}, topic:{}, tag:{}", eventMsg.getMsgId(), producerGroupName, eventMsg.getTopic(), eventMsg.getTag());
} catch (Exception e) {
log.error("向RocketMQ發(fā)布消息失斏液帧:" + e.getMessage(), e);
e.printStackTrace();
}
}
@PostConstruct
public void init() {
producer = new DefaultMQProducer(getAclRPCHook());
producer.setProducerGroup(producerGroupName);
producer.setNamesrvAddr(mqurl);
try {
producer.start();
log.info("RocketMQ客戶端producer初始化...");
} catch (MQClientException e) {
log.error("RocketMQ客戶端producer初始化失斖凇:" + e.getErrorMessage(), e);
}
}
private RPCHook getAclRPCHook() {
return new AclClientRPCHook(new SessionCredentials(accessKey, secretKey));
}
}
消費(fèi)者Handler接口:
/**
* 消息消費(fèi)者interface
* 支持批量消費(fèi)
* */
public interface MQMsgHandler {
public void handleMsg(List<EventMessage> eventMessages);
}
消費(fèi)者注解:
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface MsgConsumer {
public String consumerGroup();
public String topic();
public String tag();
}
最后是將消費(fèi)者Handler實(shí)現(xiàn)注入到Spring:
/**
* regist a consumer to MQ for all MQMsgHandler implementations
*
* */
@Slf4j
@Component
public class MQConsumersRegister implements BeanPostProcessor{
@Value("${rocketmq.url}")
private String mqurl;
@Value("${rocketmq.accessKey}")
private String accessKey;
@Value("${rocketmq.secretKey}")
private String secretKey;
@Value("${rocketmq.consumeThreadCorePoolSize:20}")
private int consumeThreadCorePoolSize;
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
return bean;
}
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
MsgConsumer msgConsumer = bean.getClass().getAnnotation(MsgConsumer.class);
if(null != msgConsumer) { //如果有@MsgConsumer注解
String consumerGroup = msgConsumer.consumerGroup();
String topic = msgConsumer.topic();
String tag = msgConsumer.tag();
MQMsgHandler msgHandler = (MQMsgHandler) bean; //當(dāng)前bean應(yīng)是MQMsgHandler接口的實(shí)現(xiàn)
registConsumer(msgHandler, consumerGroup, topic, tag);
log.info("消費(fèi)者組{}已訂閱主題{}下{}事件" , consumerGroup, topic , tag );
}
return bean;
}
//注冊(cè)consumer,并使其訂閱相應(yīng)的topic盈咳、tag
private void registConsumer(MQMsgHandler msgHandler, String consumerGroup, String topic, String tag) {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup, getAclRPCHook(), new AllocateMessageQueueAveragely());
try {
consumer.setNamesrvAddr(mqurl);
consumer.setConsumeThreadMin(consumeThreadCorePoolSize);
consumer.setConsumeThreadMax(consumeThreadCorePoolSize);
consumer.setPullBatchSize(32); //一次長(zhǎng)輪詢最多從mq里拿多少個(gè)消息耿眉,默認(rèn)32
consumer.subscribe(topic, tag);
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
List<EventMessage> eventMsgs = new ArrayList<>();
String msgContent = null;
try {
for(MessageExt msg : msgs) {
msgContent = new String(msg.getBody(), "utf-8");
EventMessage eventMsg = JSON.parseObject(msgContent, EventMessage.class);
log.debug(JSON.toJSONString(eventMsg));
eventMsgs.add(eventMsg);
}
msgHandler.handleMsg(eventMsgs); //批量處理本次拉取的消息,執(zhí)行業(yè)務(wù)邏輯
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
log.error("消息編碼錯(cuò)誤:" + e.getMessage(), e);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}catch(Exception e) {
log.error("注冊(cè)消費(fèi)者出錯(cuò)" + e.getMessage(), e);
}
}
//Access Control List控制
private RPCHook getAclRPCHook() {
return new AclClientRPCHook(new SessionCredentials(accessKey, secretKey));
}
}
具體使用的話,筆者在使用OpenResty+Redis+RocketMQ構(gòu)建秒殺系統(tǒng) - 簡(jiǎn)書 (jianshu.com)一文里實(shí)際已經(jīng)用到了這個(gè)RocketMQ小工具庫(kù)鱼响。用來(lái)做訂單的異步入庫(kù)鸣剪。這里再貼一下代碼:
/**
* 異步寫預(yù)約訂單記錄
* */
private void sendAppointmentToMq(AppointmentDetail appointDetail) {
String appointJson = JSON.toJSONString(appointDetail);
EventMessage eventMsg = new EventMessage();
eventMsg.setTopic("order");
eventMsg.setTag("newOrder");
eventMsg.setMsgBody(appointJson);
generalMqProducer.asyncPublish(eventMsg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("預(yù)約訂單入庫(kù)消息寫入rocketmq成功,消息ID:{}", sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
//如果與mq通信故障了丈积,那么可以從日志文件里找到預(yù)約記錄筐骇,手工執(zhí)行寫入mysql
log.error("預(yù)約訂單寫入rocketmq失敗:{}, exception detail:{}" , appointJson , e.getMessage());
}
});
}
然后消費(fèi)者是可以批量的對(duì)消息進(jìn)行消費(fèi)的:
@Slf4j
@Component
@MsgConsumer(consumerGroup = "newOrder-consumer-group", tag = "newOrder", topic = "order")
public class NewOrderMsgHandler implements MQMsgHandler{
@Autowired
private AppointmentDetailRepository appointmentDetailRepository;
@Override
public void handleMsg(List<EventMessage> eventMessages) {
log.debug("收到mq消息: {}", JSON.toJSONString(eventMessages));
List<AppointmentDetail> appointmentDetails = new ArrayList<>();
for(EventMessage eventMsg : eventMessages) {
AppointmentDetail appointmentDetail = JSON.parseObject(eventMsg.getMsgBody(), AppointmentDetail.class);
appointmentDetails.add(appointmentDetail);
}
appointmentDetailRepository.saveAll(appointmentDetails); //批量入庫(kù)
}
}
總結(jié)一下,這個(gè)封裝庫(kù)就是典型的消息發(fā)布訂閱模式:發(fā)消息用生產(chǎn)者工具類GeneralMqProducer
的相應(yīng)方法直接發(fā)就行了江滨,訂閱的話需要實(shí)現(xiàn)MQMsgHandler
接口铛纬、在接口實(shí)現(xiàn)類上使用@MsgConsumer
注解來(lái)聲明訂閱的topic和tag、重寫handleMsg(eventMessages)
方法來(lái)編寫收到消息后的業(yè)務(wù)回調(diào)邏輯即可唬滑。