1梢什、RocketMQ
[官網(wǎng)地址] (http://rocketmq.apache.org)
function : 應(yīng)用解耦践图、流量消峰、消息分發(fā)菇曲、保證最終一致性冠绢、方便動態(tài)擴容等
2、linux 單機RocketMQ
1常潮、準(zhǔn)備RocketMQ
-
從官網(wǎng)下載編譯好的二進制文件弟胀,或者下載源碼自己編譯。
RocketMQ 當(dāng)前的最新版本是4.2.0
系統(tǒng)要求: 64bit 的Linux 喊式、Unix 或Mac 孵户。
Java 版本大于等于JDKl.8 。
如果需要從GitHub 上下載源碼和編譯的話需要安裝Maven 3.2.x 和Git 岔留。
[root@aliyun rocketmq-all-4.2.0-bin]#> unzip rocketmq-all-4.2.0-bin-release.zip -d ./rocketmq-all-4.2.0-bin
[root@aliyun rocketmq-all-4.2.0-bin]# cd rocketmq-all-4.2.0-bin
[root@aliyun rocketmq-all-4.2.0-bin]# ls
里面含有以下內(nèi)容: LICENSE NOTICE README.md benchmark/ bin/ conf/ lib/
+ LICENSE 夏哭、NOTICE 和README.md 包括一些版權(quán)聲明和功能說明信息;
+ benchmark 里包括運行benchmark 程序的shell 腳本献联;
+ bin 文件夾里含有各種使用RocketMQ的shell腳本和cmd 腳本竖配,比如啟動NameServer的mqnamesrv啟動Broker的mqbroker,集群管理腳本mqadmin 等里逆;
+ conf 文件夾里有一些示例配置文件械念,包括三種方式的broker 配置文件、logback 日志配置文件等运悲,用戶在寫配置文件的時候,一般基于這些示例配置文件项钮,加上自己特殊的需求即可班眯;
+ lib 文件夾里包括RocketMQ各個模塊編譯成的jar 包,以及RocketMQ 依賴的一些jar包烁巫,比如Netty署隘、commons-lang禽绪、FastJSON 等况鸣。
2、啟動RocketMQ服務(wù)
啟動單機的消息隊列服務(wù)比較簡單幼衰,不需要寫配置文件,只需要依次啟動本機的NameServer 和Broker 即可诊霹。
啟動NameServer:
[root@aliyun rocketmq-all-4.2.0-bin]#> nohup sh bin/mqnamesrv &
[root@aliyun rocketmq-all-4.2.0-bin]# tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success . ..
啟動B roker :
[root@aliyun rocketmq-all-4.2.0-bin]# nohup sh bin/mqbroker -n localhost:9876 &
[root@aliyun rocketmq-all-4.2.0-bin]# tail -f ~/logs/rocketmqlogs/broker.log
The broker[%s, 192.168.0.233 : 10911] boot success .. .
!!!!!! 1->內(nèi)存不足
如果Java運行時環(huán)境的內(nèi)存不足羞延,修改jdk參數(shù)配置
rocketmq-all-4.2.0-bin/bin/runserver.sh
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn125m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
rocketmq-all-4.2.0-bin/bin/runbroker.sh
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn125m"
!!!!!! 2-> org.apache.rocketmq.client.exception.MQClientException: No route info of this topic
nohup sh bin/mqbroker -n localhost:9876 autoCreateTopicEnable=true &
3、用命令行發(fā)送和接收消息
實際上就是運行寫好的demo 程序脾还,后續(xù)我們可以參考這些demo 來寫自己的發(fā)送和接收程序伴箩。
運行示例程序,發(fā)送和接收消息:
[root@aliyun rocketmq-all-4.2.0-bin]# export NAMESRV_ADDR=localhost:9876
[root@aliyun rocketmq-all-4.2.0-bin]# sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
SendResult [sendStatus=SEND OK, msgid= ...
[root@aliyun rocketmq-all-4.2.0-bin]# sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
ConsumeMessageThread 主d Receive New Messages : [MessageExt . ..
4鄙漏、關(guān)閉消息隊列
[root@aliyun rocketmq-all-4.2.0-bin]# sh bin/mqshutdown broker
Send shutdown request to mqbroker (36695 ) OK
[root@aliyun rocketmq-all-4.2.0-bin]# sh bin/mqshutdown namesrv
Send shutdown request t o mqnamesrv (36664) OK
5嗤谚、java client發(fā)送、消費消息 demo
- java client
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.0.0-incubating</version>
</dependency>
- Producer
創(chuàng)建DefaultMQProducer對象怔蚌,設(shè)置好GroupName和NameServer后啟動巩步,把待發(fā)送的消息拼裝成Message對象,用Producer發(fā)送桦踊。
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("producer1");
// 設(shè)置NameServer地址 , 多個地址之間用椅野;分隔 ,但是也可以通過環(huán)境變量的方式設(shè)置 钞钙。
producer.setNamesrvAddr("47.104.209.137:9876");
producer.start();
for (int i = 0; i < 10; i++) {
try {
Message msg = new Message(
"TopicTest",
"TagA",
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
//這里調(diào)用的是同步的方式鳄橘,所以會有返回結(jié)果
SendResult sendResult = producer.send(msg);
//打印返回結(jié)果,可以看到消息發(fā)送的狀態(tài)以及一些相關(guān)信息
System.out.println(sendResult);
} catch (Exception e) {
e.printStackTrace();
Thread.sleep(1000);
}
}
producer.shutdown();
}
}
- Consumer
設(shè)置GroupName 芒炼、NameServer 地址以及端口號瘫怜。然后指明要操作的Topic 名稱,最后進入發(fā)送和接收邏輯本刽。
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer1");
consumer.setNamesrvAddr("47.104.209.137:9876");
//這里設(shè)置的是一個consumer的消費策略
//CONSUME_FROM_LAST_OFFSET 默認(rèn)策略鲸湃,從該隊列最尾開始消費,即跳過歷史消息
//CONSUME_FROM_FIRST_OFFSET 從隊列最開始開始消費子寓,即歷史消息(還儲存在broker的)全部消費一遍
//CONSUME_FROM_TIMESTAMP 從某個時間點開始消費暗挑,和setConsumeTimestamp()配合使用,默認(rèn)是半個小時以前
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//設(shè)置consumer所訂閱的Topic和Tag斜友,*代表全部的Tag
consumer.subscribe("TopicTest", "*");
//設(shè)置一個Listener炸裆,主要進行消息的邏輯處理
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
//返回消費狀態(tài)
//CONSUME_SUCCESS 消費成功
//RECONSUME_LATER 消費失敗,需要稍后重新消費
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//調(diào)用start()方法啟動consumer
consumer.start();
System.out.println("Consumer Started.");
}
}