一、消息中間件對比
kafka | RocketMQ | RabbitMQ | ||
---|---|---|---|---|
定位 | 設計定位 | 系統(tǒng)間的數(shù)據(jù)流管道嚷狞,實時數(shù)據(jù)處理块促。例如常規(guī)的消息系統(tǒng)、監(jiān)控數(shù)據(jù)床未、日志收集 | 可靠的消息傳輸竭翠,例如消息推送 | 可靠的消息傳輸,與RocketMQ類似薇搁。 |
開發(fā)語言 | Scala | Java | Erlang | |
客戶端語言 | Java斋扰,Python,C | Java | Java臊恋,Python梆惯,C | |
注冊中心 | Zookeeper | namespace | 無 | |
選舉方式 | 自動選舉 | 不支持自動選舉 | 無 | |
數(shù)據(jù)可靠性 | 很好。支持同步刷盤毙石,同步復制损离,但性能差哥艇。 | 很好,支持同僻澎、異步刷盤貌踏,同步雙寫,異步復制 | 好 | |
消息寫入性能 | 非常好窟勃,每條10個字符測試:百萬條/s祖乳,Topic數(shù)量60個左右后性能會下降 | 很好,每條10個字符測試:單機單broker 7w/s秉氧,單機3broker 12w/s眷昆,Topic數(shù)量支持5W條左右 | 好,2W/s左右 | |
性能穩(wěn)定性 | 隊列汁咏、分區(qū)多的時候性能不穩(wěn)定亚斋,明顯下降,消息堆積時性能穩(wěn)定 | 隊列多的時候攘滩,消息堆積時性能穩(wěn)定 | 消息堆積時性能不穩(wěn)定 | |
消息堆積能力 | 非常好 | 非常好 | 一般 | |
消息獲取 | pull | pull帅刊,push | pull,push | |
順序消費 | 支持 | 支持漂问,局部有序 | 支持 | |
定時消息 | 支持不好 | 支持赖瞒,開源只支持指定級別的延遲 | 支持不好 | |
事務消息 | 不支持 | 支持 | 不支持 | |
消息查詢 | 不支持 | 支持 | 不支持 |
二、RocketMQ架構分布圖
Apache RocketMQ是一個分布式消息傳遞和流媒體平臺蚤假,具有低延遲栏饮,高性能和可靠性, 萬億級容量和靈活的可伸縮性磷仰。 它由四個部分組成:nameserver袍嬉,broker,生產者和使用者灶平。 它們中的每一個都可以水平擴展伺通,而沒有單個故障點。
- nameserver:提供輕量級的服務發(fā)現(xiàn)和路由民逼。 每個名稱服務器記錄完整的路由信息泵殴,提供 相應的讀寫服務涮帘,并支持快速的存儲擴展拼苍。 注意:nameserver集群中每個nameserver都是相互獨立的,與zookeeper不同,nameserver節(jié)點間沒有通訊疮鲫,也沒有主從吆你、選舉概念。
- Broker:通過提供輕量級的TOPIC和QUEUE機制來存儲消息俊犯,把自身信息注冊到每個nameserver中妇多。
- 生產者:本地隨機從nameserver中維護broker的信息,并與每個master broker有心跳通訊燕侠。
- 消費者:本地隨機從nameserver中維護broker的信息者祖,并與每個master broker有心跳通訊。
三绢彤、RocketMQ環(huán)境
環(huán)境變量
#java環(huán)境
export JAVA_HOME=/usr/local/jdk
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export PATH=$JAVA_HOME/bin:$PATH
#rocketmq環(huán)境
export ROCKETMQ_HOME=/usr/local/rocketmq/rocketmq‐all‐4.1.0‐incubating
export PATH=$ROCKETMQ_HOME/bin:$PATH
broker配置
#rocketmq‐name服務地址,多個地址用;分開七问,不配置默認為localhost:9876
namesrvAddr = 192.168.241.198:9876
brokerClusterName = DefaultCluster
brokerName = broker‐a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
#主從角色SYNC_MASTER,ASYNC_MASTER,SLAVE
brokerRole = SYNC_MASTER
flushDiskType = ASYNC_FLUSH
#允許自動創(chuàng)建主題topic
autoCreateTopicEnable=true
#broker監(jiān)聽端口
listenPort=10911
#數(shù)據(jù)存儲位置
storePathRootDir=/root/rocketmq/store
- brokerClusterName:所屬集群名稱,如果節(jié)點較多可以配置多個茫舶。
- brokerName:brokerName為子集群的名稱械巡,子集群中有一個Master與多個Slave,子集群下所有節(jié)點的brokerName必須一樣饶氏,brokerId不一樣讥耗,默認brokerId = 0的為Master節(jié)點,大于0的為Salve節(jié)點疹启。
- namesrvAddr:注冊中心連接開放端口古程,可以配置多個,用分號分隔皮仁。
- deleteWhen:刪除數(shù)據(jù)的時間籍琳,04代表凌晨4點,fileReservedTime為數(shù)據(jù)保存在磁盤的時長贷祈,單位小時趋急。
- brokerRole:Master節(jié)點與Slave節(jié)點間的同步方式,有三個值:SYNC_MASTER势誊,ASYNC_MASTER呜达,SLAVE;同步和異步表示Master和Slave之間同步數(shù)據(jù)的機制粟耻,其中Slave一致性使用SLAVE查近;
- flushDiskType:刷盤策略,取值為:ASYNC_FLUSH挤忙,SYNC_FLUSH表示同步刷盤和異步刷盤霜威;SYNC_FLUSH消 息寫入磁盤后才返回成功狀態(tài),ASYNC_FLUSH不需要册烈;
- autoCreateTopicEnable:自動新建topic戈泼,默認為false。
- listenPort:啟動監(jiān)聽的端口號。
- storePathRootDir:磁盤存儲消息的根目錄大猛。
內存的設置
rocketmq集群內存的設置是針對注冊中心namesrv與broker內存的設置扭倾,分別設置rocketmq bin目錄下的runserver.sh與runbroker.sh(或者runserver.cmd與runbroker.cmd)。
- runserver配置
JAVA_OPT="${JAVA_OPT} ‐server ‐Xms256m ‐Xmx256m ‐Xmn128m ‐XX:MetaspaceSi ze=64m ‐XX:MaxMetaspaceSize=128m"
- runbroker配置
JAVA_OPT="${JAVA_OPT} ‐server ‐Xms256m ‐Xmx256m ‐Xmn128m ‐XX:MetaspaceSi ze=64m ‐XX:MaxMetaspaceSize=128m"
單機運行
#啟動注冊中心
nohup sh bin/mqnamesrv ‐n 192.168.241.198:9876
#啟動broker
nohup sh bin/mqbroker ‐n 192.168.241.198:9876 ‐c conf/broker.conf &
注意:啟動注冊中心或者broker的時候最好指定一下IP挽绩,防止在多網卡或者dockers的環(huán)境下膛壹,IP使用錯誤。
多機集群部署
在主目錄下的conf文件夾下提供了多種broker配置模式唉堪,分別有:2m-2s-async模聋,2m-2s- sync,2m-noslave唠亚。若目前2臺機器撬槽,分別部署1個 NameServer,同時分別部署一個Master和一個Slave趾撵,互為主備侄柔。
注冊中心配置
namesrvAddr配置與在單機的環(huán)境下無異。broker配置
broker節(jié)點在集群中有主從之分占调,與單機環(huán)境下的配置差異性主要體現(xiàn)如下:
- master
#broker節(jié)點注冊到多個注冊中心
namesrvAddr = 192.168.241.198:9876;192.168.241.199:9876
#主節(jié)點
brokerId = 0
#SYNC_MASTER或者ASYNC_MASTER
brokerRole = SYNC_MASTER
- slave
#broker節(jié)點注冊到多個注冊中心
namesrvAddr = 192.168.241.198:9876;192.168.241.199:9876
#非0表示從節(jié)點唯一標志
brokerId = 1
#表明從節(jié)點
brokerRole = SLAVE
環(huán)境驗證
- 查看集群監(jiān)控狀態(tài)
sh mqadmin clusterlist ‐n 192.168.241.198:9876;192.168.241.199:9876
- 測試
export NAMESRV_ADDR=192.168.241.198:9876;192.168.241.199:9876
測試發(fā)送端
> sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
測試消費端
> sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
rocketmq console
進入rocketmq-externals項目的GitHub地址暂题,如下圖,可看到RocketMQ項目的諸多擴展項目究珊,其中就包含我們需要下載的rocketmq-console薪者。
rocketmq-console是一個springboot項目,跑之前修改下配置剿涮。
四言津、基本概念
消息模型
RocketMQ主要由 Producer、Broker取试、Consumer 三部分組成悬槽,其中Producer 負責 生產消息,Consumer 負責消費消息瞬浓,Broker 負責存儲消息初婆。
消息對象
生產者(producer)
負責生產消息,一般由業(yè)務系統(tǒng)負責生產消息猿棉。一個消息生產者會把業(yè)務應用系統(tǒng)里產生的消息發(fā)送到broker服務器磅叛。RocketMQ提供多種發(fā)送方式,同步發(fā)送萨赁、異步發(fā)送弊琴、順序發(fā)送、單向發(fā)送杖爽。同步和異步方式均需要Broker返回確認信息敲董,單向發(fā)送不需要详瑞。消費者(Consumer)
負責消費消息,一般是后臺系統(tǒng)負責異步消費臣缀。一個消息消費者會從Broker服務器拉取消息、并將其提供給應用程序泻帮。從用戶應用的角度而言提供了兩種消費形式:拉取式消費 (pull consumer)精置、推動式消費(push consumer)。
主題(Topic)
表示一類消息的集合锣杂,每個主題包含若干條消息脂倦,每條消息只能屬于一個主題,是RocketMQ進行消息訂閱的基本單位元莫。
代理服務器(Broker Server)
消息中轉角色赖阻,負責存儲消息、轉發(fā)消息踱蠢。代理服務器在RocketMQ系統(tǒng)中負責接收 從生產者發(fā)送來的消息并存儲火欧、同時為消費者的拉取請求作準備。代理服務器也存儲消息相 關的元數(shù)據(jù)茎截,包括消費者組苇侵、消費進度偏移和主題和隊列消息等。
注冊中心服務(Name Server)
注冊中心服務充當路由消息的提供者企锌。生產者或消費者能夠通過名字服務查找各主題相應的 Broker IP列表榆浓。多個Namesrv實例組成集群,但相互獨立撕攒,沒有信息交換陡鹃。
組
生產者組(Producer Group)
同一類Producer的集合,這類Producer發(fā)送同一類消息且發(fā)送邏輯一致抖坪。如果發(fā)送的 是事物消息且原始生產者在發(fā)送之后崩潰萍鲸,則Broker服務器會聯(lián)系同一生產者組的其他生產者實例以提交或回溯消費。消費者組(Consumer Group)
同一類Consumer的集合擦俐,這類Consumer通常消費同一類消息且消費邏輯一致猿推。消費者組使得在消息消費方面,實現(xiàn)負載均衡和容錯的目標變得非常容易捌肴。要注意的是蹬叭,消費者組的消費者實例必須訂閱完全相同的Topic。RocketMQ 支持兩種消息模式:集群消費 (Clustering)和廣播消費(Broadcasting)状知。
客戶端消費
拉取式消費(Pull Consumer)
Consumer消費的一種類型秽五,應用通常主動調用Consumer的拉消息方法從Broker服務 器拉消息、主動權由應用控制饥悴。一旦獲取了批量消息坦喘,應用就會啟動消費過程盲再。推動式消費(Push Consumer)
Consumer消費的一種類型,該模式下Broker收到數(shù)據(jù)后會主動推送給消費端瓣铣,該消費模式一般實時性較高答朋。集群消費(Clustering)
集群消費模式下,相同Consumer Group的每個Consumer實例平均分攤消息。廣播消費(Broadcasting)
廣播消費模式下棠笑,相同Consumer Group的每個Consumer實例都接收全量的消息梦碗。
消息(Message)
消息系統(tǒng)所傳輸信息的物理載體,生產和消費數(shù)據(jù)的最小單位蓖救,每條消息必須屬于一個主題洪规。RocketMQ中每個消息擁有唯一的Message ID,且可以攜帶具有業(yè)務標識的Key循捺。 系統(tǒng)提供了通過Message ID和Key查詢消息的功能斩例。
順序消息
普通順序消息(Normal Ordered Message)
普通順序消費模式下,消費者通過同一個消費隊列收到的消息是有順序的从橘,不同消息隊列收到的消息則可能是無順序的念赶。嚴格順序消息(Strictly Ordered Message)
嚴格順序消息模式下,消費者收到的所有消息均是有順序的恰力。
標簽(Tag)
為消息設置的標志晶乔,用于同一主題下區(qū)分不同類型的消息。來自同一業(yè)務單元的消息牺勾, 可以根據(jù)不同業(yè)務目的在同一主題下設置不同標簽正罢。標簽能夠有效地保持代碼的清晰度和連 貫性,并優(yōu)化RocketMQ提供的查詢系統(tǒng)驻民。消費者可以根據(jù)Tag實現(xiàn)對不同子主題的不同消 費邏輯翻具,實現(xiàn)更好的擴展性。
注意1:每個broker中都會有一個commitlog回还,由于記錄生產者發(fā)送的消息裆泳。
注意2:每個broker中有多個Topic,每個Topic中默認有4個queue隊列柠硕,每個queue對應一個持久化文件工禾。
注意3:每個broker中會對應一個consumerOffset.json文件,用于記錄隊列消費的節(jié)點到哪了蝗柔。
注意4:consumer闻葵、producer與broker間的通信基于Netty來實現(xiàn)的,默認為Netty中的epoll模式癣丧,若系統(tǒng)不支持epoll模式槽畔,才使用nio模式。
注意5:producer在發(fā)送消息的時候胁编,會以輪循的方式放置于隊列中(比如圖上broker-master-1與broker-master-2共8個)厢钧,若有順序消息的話鳞尔,會保證所有順序消息放在同一個隊列中。
沒開始使用的broker內部的文件早直。
已經使用的broker內部的文件寥假。
config內部結構
store:存儲commitlog文件,每個broker對應一個commitlog霞扬,commitlog中存儲的是topic真正的內容數(shù)據(jù)糕韧。
index:索引。
consumequeue:存儲每個主題下的隊列祥得,默認每個主題4個隊列,這邊存儲的主要是消息的tag蒋得、消息對應在commitlog的地址级及、空間大小等,额衙。
topic.json: 存儲所有topic的信息饮焦,主要為topic的屬性信息。
consumerOffset.json:消費者偏移量信息窍侧,對應了每個主題@每個消費群組{隊列1:偏移量县踢,隊列2:偏移量,隊列3:偏移量伟件,隊列4:偏移量}
名稱 | 作用 |
---|---|
broker | broker模塊:c和p端消息存儲邏輯 |
client | 客戶端api:produce硼啤、consumer端 接受與發(fā)送api |
common | 公共組件:常量、基類斧账、數(shù)據(jù)結構 |
tools | 運維tools:命令行工具模塊 |
store | 存儲模塊:消息谴返、索引、commitlog存儲 |
namesrv | 服務管理模塊:服務注冊topic等信息存儲 |
remoting | 遠程通訊模塊:netty+fastjson |
logappender | 日志適配模塊 |
example | Demo列子 |
filtersrv | 消息過濾器模塊 |
srvutil | 輔助模塊 |
filter | 過濾模塊:消息過濾模塊 |
distribution | 部署咧织、運維相關zip包中的代碼 |
openmessaging | 兼容openmessaging分布式消息模塊 |
五嗓袱、使用
1. 同步、異步习绢、一次性
生產者
同步
public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException {
//Instantiate with a producer group name.
DefaultMQProducer producer = new DefaultMQProducer("tl_msg_student_group");
producer.setNamesrvAddr("192.168.241.198:9876");
//producer.setSendMsgTimeout(10000);
producer.start();
Message msg = new Message("TopicStudent" ,
"TagStudent" ,
"tag" ,
("Hello tuling RocketMQ ").getBytes(RemotingHelper.DEFAULT_CHARSET)
);
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
producer.shutdown();
}
異步
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("tl_message_group");
producer.setNamesrvAddr("192.168.241.198:9876;192.168.241.199:9876");
producer.start();
//設置發(fā)送失敗重試機制
producer.setRetryTimesWhenSendAsyncFailed(5);
int messageCount = 1;
final CountDownLatch countDownLatch = new CountDownLatch(messageCount);
for (int i = 0; i < messageCount; i++) {
final int index = i;
Message msg = new Message("TopicTest",
"TagSendOne",
"OrderID188",
"I m sending msg content is yangguo".getBytes(RemotingHelper.DEFAULT_CHARSET));
//消息發(fā)送成功后渠抹,執(zhí)行回調函數(shù)
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
countDownLatch.countDown();
System.out.printf("%-10d OK %s %n", index,
sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
countDownLatch.countDown();
System.out.printf("%-10d Exception %s %n", index, e);
e.printStackTrace();
}
});
}
//防止回調未回,producer就已經刪除
countDownLatch.await(5, TimeUnit.SECONDS);
producer.shutdown();
}
一次性
DefaultMQProducer producer = new DefaultMQProducer("tl_message_group");
// Specify name server addresses.
producer.setNamesrvAddr("192.168.241.198:9876;192.168.241.199:9876");
producer.setSendMsgTimeout(10000);
producer.start();
for (int i = 0; i < 1; i++) {
Message msg = new Message("TopicTest" /* Topic */,
"TagSendOne" /* Tag */,
"OrderID198",
("Hello RocketMQ test i " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
producer.sendOneway(msg);
}
//Shut down once the producer instance is not longer in use.
producer.shutdown();
消費者
// tl_msg_student_group
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("tl_student_group");
// ;192.168.241.199:9876
consumer.setNamesrvAddr("192.168.241.198:9876;192.168.241.199:9876");
// Subscribe one more more topics to consume.
consumer.subscribe("TopicStudent", "*");
// Register callback to execute on arrival of messages fetched from brokers.
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs){
System.out.println(new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
2. 廣播消息
生產者
DefaultMQProducer producer = new DefaultMQProducer("consumer_model_group");
producer.setNamesrvAddr("192.168.241.198:9876;192.168.241.199:9876");
producer.start();
for (int i = 0; i < 4; i++){
Message msg = new Message("TopicTest",
"TagA",
"OrderID188",
("Hello world"+i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
producer.shutdown();
消費者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_model_group");
consumer.setNamesrvAddr("192.168.241.198:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
//廣播,全量消費
consumer.setMessageModel(MessageModel.CLUSTERING);
consumer.subscribe("TopicTest", "TagA || TagC || TagD");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
for (MessageExt ext : msgs){
System.out.printf(Thread.currentThread().getName() + " Receive New Message: " + new String(ext.getBody()) + "%n");
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Broadcast Consumer Started.%n");
3. 批量消息
批量發(fā)送消息能顯著提高傳遞小消息的性能闪萄。限制是這些批量消息應該有相同的 topic,相同的waitStoreMsgOK败去,而且不能是延時消息篮幢。此外,這一批消息的總大小不應 超過4MB为迈。rocketmq建議每次批量消息大小大概在1MB三椿。 當消息大小超過4MB時缺菌,需要將消息進行分割。
生產者
public class ListSplitter implements Iterator<List<Message>> {
private final int SIZE_LIMIT = 1000 * 1000 * 1;//1MB
private final List<Message> messages;
private int currIndex;
public ListSplitter(List<Message> messages) {
this.messages = messages;
}
@Override
public boolean hasNext() {
return currIndex < messages.size();
}
@Override
public List<Message> next() {
int nextIndex = currIndex;
int totalSize = 0;
//遍歷消息準備拆分
for (; nextIndex < messages.size(); nextIndex++) {
Message message = messages.get(nextIndex);
int tmpSize = message.getTopic().length() + message.getBody().length;
Map<String, String> properties = message.getProperties();
for (Map.Entry<String, String> entry : properties.entrySet()) {
tmpSize += entry.getKey().length() + entry.getValue().length();
}
tmpSize = tmpSize + 20; //for log overhead
if (tmpSize > SIZE_LIMIT) {
if (nextIndex - currIndex == 0) {
nextIndex++;
}
break;
}
if (tmpSize + totalSize > SIZE_LIMIT) {
break;
} else {
totalSize += tmpSize;
}
}
List<Message> subList = messages.subList(currIndex, nextIndex);
currIndex = nextIndex;
return subList;
}
}
/**
* rocketMq 支持消息批量發(fā)送
* 同一批次的消息應具有:相同的主題搜锰,相同的waitStoreMsgOK伴郁,并且不支持定時任務。
* <strong> 同一批次消息建議大小不超過~1M </strong>,消息最大不能超過4M,需要
* 對msg進行拆分
*/
DefaultMQProducer producer = new DefaultMQProducer("batch_group");
producer.setNamesrvAddr("192.168.241.198:9876");
producer.start();
String topic = "BatchTest";
List<Message> messages = new ArrayList<>();
messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes()));
ListSplitter splitter = new ListSplitter(messages);
/**
* 對批量消息進行拆分
*/
while (splitter.hasNext()) {
try {
List<Message> listItem = splitter.next();
producer.send(listItem);
} catch (Exception e) {
e.printStackTrace();
}
}
producer.shutdown();
消費者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("batch_group");
// ;192.168.241.199:9876
consumer.setNamesrvAddr("192.168.241.198:9876");
consumer.subscribe("BatchTest", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs){
System.out.println("queueId=" + msg.getQueueId() + "," + new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
4. 過濾消息
RocketMq在消息過濾這塊做得很強大蛋叼,它可以通過Tag過濾消息焊傅,可以通過SQL表達式篩選消息,它也可以支持java腳本過濾狈涮。
其中通過SQL表達式篩選 和 java腳本過濾 需要在broker的配置文件中把對應的配置打開狐胎。
enablePropertyFilter=true
Topic 與 Tag 都是業(yè)務上用來歸類的標識,區(qū)分在于 Topic 是一級分類歌馍,而 Tag 可以說是二級分類握巢,關系如圖所示。
生產者
/***
* TAG-FILTER-1000 ---> 布隆過濾器
* 過濾掉的那些消息松却。直接就跳過了么暴浦。下次就不會繼續(xù)過濾這些了。是么晓锻。
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("filter_sample_group");
producer.setNamesrvAddr("192.168.241.198:9876");
producer.start();
for (int i = 0; i < 3; i++) {
Message msg = new Message("TopicFilter",
"TAG-FILTER",
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
msg.putUserProperty("a",String.valueOf(i));
if(i % 2 == 0){
msg.putUserProperty("b","yangguo");
}else{
msg.putUserProperty("b","xiaolong girl");
}
producer.send(msg);
}
producer.shutdown();
}
消費者
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("filter_sample_group");
/**
* 注冊中心
*/
consumer.setNamesrvAddr("192.168.241.198:9876;192.168.241.199:9876");
/**
* 訂閱主題
* 一種資源去換取另外一種資源
*/
consumer.subscribe("TopicFilter", MessageSelector.bySql("a between 0 and 3 and b = 'yangguo'"));
/**
* 注冊監(jiān)聽器歌焦,監(jiān)聽主題消息
*/
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs){
try {
System.out.println("consumeThread=" + Thread.currentThread().getName()
+ ", queueId=" + msg.getQueueId() + ", content:"
+ new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Filter Consumer Started.%n");
}
5. 延遲消息
定時消息是指消息發(fā)到 Broker 后,不能立刻被 Consumer 消費砚哆,要到特定的時間點 或者等待特定的時間后才能被消費独撇。
使用場景:如電商里,提交了一個訂單就可以發(fā)送一個延時消息躁锁,1h后去檢查這個訂單的 狀態(tài)券勺,如果還是未付款就取消訂單釋放庫存。
當前支持的延遲時間
1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
分別對應級別
1 2 3....................
生產者
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("ExampleConsumer");
//;192.168.241.199:9876
producer.setNamesrvAddr("192.168.241.198:9876;192.168.241.199:9876");
producer.start();
int totalMessagesToSend = 3;
for (int i = 0; i < totalMessagesToSend; i++) {
Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
//延時消費
message.setDelayTimeLevel(6);
// Send the message
producer.send(message);
}
System.out.printf("message send is completed .%n");
producer.shutdown();
}
消費者
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ExampleConsumer");
//;192.168.241.199:9876
consumer.setNamesrvAddr("192.168.241.198:9876");
consumer.subscribe("TestTopic", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
for (MessageExt message : messages) {
// Print approximate delay time period
System.out.println("Receive message[msgId=" + message.getMsgId() + "] "
+ "message content is :" + new String(message.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
//System.out.printf("Consumer Started.%n");
}
注意1:延遲消息發(fā)送到broker的時候灿里,broker會專門新建一個中轉主題SCHEDULED_TOPIC_XXXX來存放消息关炼,目前開原版只支持18個級別,相當于中轉主題下存在18個隊列文件分別存儲這18個級別匣吊。同時broker后臺開啟個線程儒拂,只要延遲消息的時間到了,才會把延遲消息放置于真正的topic下色鸳。
注意2:開源版下的延遲消息并不適合高并發(fā)的延遲消息社痛,若業(yè)務存在高并發(fā)的延遲消息,需要考慮使用商業(yè)版的RocketMQ命雀。
注意3:客戶端集群消息的消費來源于pullRequestQueue蒜哀,pullRequestQueue中的消息來源在于客戶端中存在一個線程從broker中主動pull。
注意4:客戶端從namesrv同步信息周期30s吏砂,客戶端與broker心跳周期30s撵儿,客戶端心跳消費偏移量同步周期5s乘客。
注意5:客戶端執(zhí)行失敗的消息,客戶端會發(fā)回到broker中淀歇,broker端會新建一個RETRY_TOPIC_XXXX來存儲易核,大概10S后會再次發(fā)給客戶端消費,默認16次浪默。
6. 順序消息
生產者
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("ordered_group_name");
producer.setNamesrvAddr("192.168.241.198:9876");
producer.start();
String[] tags = new String[]{"TagA", "TagC", "TagD"};
// 訂單列表
List<OrderStep> orderList = buildOrders();
Date date = new Date();
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String dateStr = sdf.format(date);
for (int i = 0; i < 10; i++) {
// 加個時間前綴
String body = dateStr + " Hello RocketMQ "+ i + " " + orderList.get(i);
Message msg = new Message("TopicTest", tags[i % tags.length], "KEY" + i,
body.getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Long id = (Long) arg; //根據(jù)訂單id選擇發(fā)送queue
long index = id % mqs.size();
return mqs.get((int) index);
}
}, orderList.get(i).getOrderId());//訂單id
System.out.println(String.format("SendResult status:%s, queueId:%d, body:%s",
sendResult.getSendStatus(),
sendResult.getMessageQueue().getQueueId(),
body));
}
producer.shutdown();
}
消費者
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ordered_group_name");
consumer.setNamesrvAddr("192.168.241.198:9876");
/**
* 設置消費位置
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerOrderly() {
AtomicLong consumeTimes = new AtomicLong(0);
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,ConsumeOrderlyContext context) {
context.setAutoCommit(true);
for (MessageExt msg : msgs) {
// 可以看到每個queue有唯一的consume來消費, 訂單對每個queue(分區(qū))有序
try {
System.out.println("consumeThread=" + Thread.currentThread().getName() + ", queueId=" + msg.getQueueId() + ", content:" + new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
7. 事務消息
半事務消息:暫不能投遞的消息牡直,發(fā)送方已經成功地將消息發(fā)送到了消息隊列 MQ 服務端,但是服務端未收到生產者對該消息的二次確認纳决,此時該消息被標記碰逸。半事務消息會單獨存儲在HALF_TOPIC中。
消息回查:由于網絡閃斷阔加、生產者應用重啟等原因饵史,導致某條事務消息的二次確 認丟失,消息隊列 MQ 服務端通過掃描發(fā)現(xiàn)某條消息長期處于“半事務消息”時掸哑,需要 主動向消息生產者詢問該消息的最終狀態(tài)(Commit 或是 Rollback)约急,該詢問過程即 消息回查零远。
注意:事務消息中的實現(xiàn)在于product端與broker端是雙向通信的苗分,互為客戶端和服務端
生產者
private void testTransaction() throws MessagingException {
String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 10; i++) {
try {
Message msg = MessageBuilder.withPayload("Hello RocketMQ " + i).
setHeader(RocketMQHeaders.KEYS, "KEY_" + i).build();
/**
* TX_PGROUP_NAME 必須同 {@link TransactionListenerImpl} 類的注解 txProducerGroup
* @RocketMQTransactionListener(txProducerGroup = "myTxProducerGroup")
*/
SendResult sendResult = rocketMQTemplate.sendMessageInTransaction(TX_PGROUP_NAME,
springTransTopic + ":" + tags[i % tags.length], msg, null);
System.out.printf("------ send Transactional msg body = %s , sendResult=%s %n",
msg.getPayload(), sendResult.getSendStatus());
Thread.sleep(10);
} catch (Exception e) {
e.printStackTrace();
}
}
}
監(jiān)聽
@RocketMQTransactionListener(txProducerGroup = "myTxProducerGroup")
public class TransactionListenerImpl implements RocketMQLocalTransactionListener {
private AtomicInteger transactionIndex = new AtomicInteger(0);
private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<String, Integer>();
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
String transId = (String)msg.getHeaders().get(RocketMQHeaders.PREFIX + RocketMQHeaders.TRANSACTION_ID);
System.out.printf("#### executeLocalTransaction is executed, msgTransactionId=%s %n",
transId);
int value = transactionIndex.getAndIncrement();
int status = value % 3;
localTrans.put(transId, status);
if (status == 0) {
// 事務提交
System.out.printf(" # COMMIT # Simulating msg %s related local transaction exec succeeded! ### %n", msg.getPayload());
return RocketMQLocalTransactionState.COMMIT;
}
if (status == 1) {
// 本地事務回滾
System.out.printf(" # ROLLBACK # Simulating %s related local transaction exec failed! %n", msg.getPayload());
return RocketMQLocalTransactionState.ROLLBACK;
}
// 事務狀態(tài)不確定,待Broker發(fā)起 ASK 回查本地事務狀態(tài)
System.out.printf(" # UNKNOW # Simulating %s related local transaction exec UNKNOWN! \n");
return RocketMQLocalTransactionState.UNKNOWN;
}
/**
* 在{@link TransactionListenerImpl#executeLocalTransaction(org.springframework.messaging.Message, java.lang.Object)}
* 中執(zhí)行本地事務時可能失敗,或者異步提交牵辣,導致事務狀態(tài)暫時不能確定摔癣,broker在一定時間后
* 將會發(fā)起重試,broker會向producer-group發(fā)起ask回查纬向,
* 這里producer->相當于server端择浊,broker相當于client端,所以由此可以看出broker&producer-group是
* 雙向通信的逾条。
* @param msg
* @return
*/
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
String transId = (String)msg.getHeaders().get(RocketMQHeaders.PREFIX + RocketMQHeaders.TRANSACTION_ID);
RocketMQLocalTransactionState retState = RocketMQLocalTransactionState.COMMIT;
Integer status = localTrans.get(transId);
if (null != status) {
switch (status) {
case 0:
retState = RocketMQLocalTransactionState.UNKNOWN;
break;
case 1:
retState = RocketMQLocalTransactionState.COMMIT;
break;
case 2:
retState = RocketMQLocalTransactionState.ROLLBACK;
break;
}
}
System.out.printf("------ !!! checkLocalTransaction is executed once," +
" msgTransactionId=%s, TransactionState=%s status=%s %n",
transId, retState, status);
return retState;
}
}
七琢岩、消息存儲整體架構
消息存儲架構圖中主要有下面三個跟消息存儲相關的文件構成。
- CommitLog
消息主體以及元數(shù)據(jù)的存儲主體师脂,存儲Producer端寫入的消息主體內容, 消息內容不是定長的担孔。單個文件大小默認1G ,文件名長度為20位吃警,左邊補零糕篇,剩余為起始偏移量,比如00000000000000000000代表了第一個文件酌心,起始偏移量為0拌消,文件大小為 1G=1073741824;當?shù)谝粋€文件寫滿了安券,第二個文件為00000000001073741824墩崩,起始 偏移量為1073741824氓英,以此類推。消息主要是順序寫入日志文件泰鸡,當文件滿了债蓝,寫入下一 個文件;
- ConsumeQueue
消息消費隊列盛龄,引入的目的主要是提高消息消費的性能饰迹,由于 RocketMQ是基于主題topic的訂閱模式,消息消費是針對主題進行的余舶,如果要遍歷 commitlog文件中根據(jù)topic檢索消息是非常低效的啊鸭。Consumer即可根據(jù) ConsumeQueue來查找待消費的消息。其中匿值,ConsumeQueue(邏輯消費隊列)作為消費消息的索引赠制,保存了指定Topic下的隊列消息在CommitLog中的起始物理偏移量offset, 消息大小size和消息Tag的HashCode值挟憔。consumequeue文件可以看成是基于topic的 commitlog索引文件钟些,故consumequeue文件夾的組織方式如下:topic/queue/file三層 組織結構,具體存儲路徑為: $HOME/store/consumequeue/{topic}/{queueId}/{fileName}绊谭。同樣consumequeue文 件采取定長設計政恍,每一個條目共20個字節(jié),分別為8字節(jié)的commitlog物理偏移量达传、4字節(jié) 的消息長度篙耗、8字節(jié)tag hashcode,單個文件由30W個條目組成宪赶,可以像數(shù)組一樣隨機訪 問每一個條目宗弯,每個ConsumeQueue文件大小約5.72M;
- IndexFile
IndexFile(索引文件)提供了一種可以通過key或時間區(qū)間來查詢消息的方 法搂妻。Index文件的存儲位置是:{fileName}嘀掸,文件名fileName是以 創(chuàng)建時的時間戳命名的沸版,固定的單個IndexFile文件大小:40+500Wx4+2000Wx20= 420000040個字節(jié)大小,約為400M诅愚,一個IndexFile可以保存 2000W個索引垃瞧,IndexFile 的底層存儲設計為在文件系統(tǒng)中實現(xiàn)HashMap結構酝锅,故rocketmq的索引文件其底層實現(xiàn) 為hash索引到涂。
零拷貝刷盤
以文件下載為例,服務端的主要任務是:將服務端主機磁盤中的文件不做修改地從已連接的socket發(fā)出去涤妒。操作系統(tǒng)底層I/O過程如下圖所示:
過程共產生了四次數(shù)據(jù)拷貝单雾,在此過程中,我們沒有對文件內容做任何修改,那么在內核空 間和用戶空間來回拷貝數(shù)據(jù)無疑就是一種浪費硅堆,而零拷貝主要就是為了解決這種低效性屿储。
什么是零拷貝技術?
零拷貝主要的任務就是避免CPU將數(shù)據(jù)從一塊存儲拷貝到另外一塊存儲渐逃,主要就是利用各種零拷貝技術够掠,避免讓CPU做大量的數(shù)據(jù)拷貝任務,減少不必要的拷貝茄菊,或者讓別的組件 來做這一類簡單的數(shù)據(jù)傳輸任務疯潭,讓CPU解脫出來專注于別的任務。這樣就可以讓系統(tǒng)資源的利用更加有效面殖。
原理是磁盤上的數(shù)據(jù)會通過DMA被拷貝的內核緩沖區(qū)竖哩,接著操作系統(tǒng)會把這段內核緩沖 區(qū)與應用程序共享,這樣就不需要把內核緩沖區(qū)的內容往用戶空間拷貝脊僚。應用程序再調用 write(),操作系統(tǒng)直接將內核緩沖區(qū)的內容拷貝到socket緩沖區(qū)中相叁,這一切都發(fā)生在內核 態(tài),最后辽幌,socket緩沖區(qū)再把數(shù)據(jù)發(fā)到網卡去增淹。
注意:連續(xù)的磁盤空間才不用經過用戶空間的整合,而直接實現(xiàn)頁緩存與socket緩沖區(qū)的共享乌企,從而減少了內核空間到用戶空間狀態(tài)的轉換虑润,并且減少了2次內核空間與用戶空間復制操作,進而提高了整個系統(tǒng)的性能逛犹。這就是rocketmq開辟磁盤空間的時候為什么選擇直接開啟足夠大的磁盤空間文件進行存儲消息的原因(CommitLog IndexFile)端辱。