概述
消息隊列作為高并發(fā)系統(tǒng)的核心組件之一泳秀,能夠幫助業(yè)務(wù)系統(tǒng)解構(gòu)提升開發(fā)效率和系統(tǒng)穩(wěn)定性鞋既。主要具有以下優(yōu)勢:
- 削峰填谷: 主要解決瞬時寫壓力大于應(yīng)用服務(wù)能力導(dǎo)致消息丟失鼓拧、系統(tǒng)奔潰等問題
- 系統(tǒng)解耦: 解決不同重要程度、不同能力級別系統(tǒng)之間依賴導(dǎo)致一死全死
- 提升性能: 當(dāng)存在一對多調(diào)用時娜扇,可以發(fā)一條消息給消息系統(tǒng)错沃,讓消息系統(tǒng)通知相關(guān)系統(tǒng)
- 蓄流壓測: 線上有些鏈路不好壓測,可以通過堆積一定量消息再放開來壓測
RocketMQ
Apache Alibaba RocketMQ 是一個消息中間件雀瓢。消息中間件中有兩個角色:消息生產(chǎn)者和消息消費者枢析。RocketMQ 里同樣有這兩個概念,消息生產(chǎn)者負責(zé)創(chuàng)建消息并發(fā)送到 RocketMQ 服務(wù)器刃麸,RocketMQ 服務(wù)器會將消息持久化到磁盤醒叁,消息消費者從 RocketMQ 服務(wù)器拉取消息并提交給應(yīng)用消費。
RocketMQ 特點
RocketMQ 是一款分布式、隊列模型的消息中間件把沼,具有以下特點:
- 支持嚴(yán)格的消息順序
- 支持 Topic 與 Queue 兩種模式
- 億級消息堆積能力
- 比較友好的分布式特性
- 同時支持 Push 與 Pull 方式消費消息
- 歷經(jīng)多次天貓雙十一海量消息考驗
RocketMQ 優(yōu)勢
目前主流的 MQ 主要是 RocketMQ啊易、kafka、RabbitMQ饮睬,其主要優(yōu)勢有:
- 支持事務(wù)型消息(消息發(fā)送和 DB 操作保持兩方的最終一致性租谈,RabbitMQ 和 Kafka 不支持)
- 支持結(jié)合 RocketMQ 的多個系統(tǒng)之間數(shù)據(jù)最終一致性(多方事務(wù),二方事務(wù)是前提)
- 支持 18 個級別的延遲消息(RabbitMQ 和 Kafka 不支持)
- 支持指定次數(shù)和時間間隔的失敗消息重發(fā)(Kafka 不支持捆愁,RabbitMQ 需要手動確認)
- 支持 Consumer 端 Tag 過濾割去,減少不必要的網(wǎng)絡(luò)傳輸(RabbitMQ 和 Kafka 不支持)
- 支持重復(fù)消費(RabbitMQ 不支持,Kafka 支持)
消息隊列對比參照表
基于 Docker 安裝 RocketMQ
docker-compose.yml
注意:啟動 RocketMQ Server + Broker + Console 至少需要 2G 內(nèi)存
version: '3.5'
services:
rmqnamesrv:
image: foxiswho/rocketmq:server
container_name: rmqnamesrv
ports:
- 9876:9876
volumes:
- ./data/logs:/opt/logs
- ./data/store:/opt/store
networks:
rmq:
aliases:
- rmqnamesrv
rmqbroker:
image: foxiswho/rocketmq:broker
container_name: rmqbroker
ports:
- 10909:10909
- 10911:10911
volumes:
- ./data/logs:/opt/logs
- ./data/store:/opt/store
- ./data/brokerconf/broker.conf:/etc/rocketmq/broker.conf
environment:
NAMESRV_ADDR: "rmqnamesrv:9876"
JAVA_OPTS: " -Duser.home=/opt"
JAVA_OPT_EXT: "-server -Xms128m -Xmx128m -Xmn128m"
command: mqbroker -c /etc/rocketmq/broker.conf
depends_on:
- rmqnamesrv
networks:
rmq:
aliases:
- rmqbroker
rmqconsole:
image: styletang/rocketmq-console-ng
container_name: rmqconsole
ports:
- 8080:8080
environment:
JAVA_OPTS: "-Drocketmq.namesrv.addr=rmqnamesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false"
depends_on:
- rmqnamesrv
networks:
rmq:
aliases:
- rmqconsole
networks:
rmq:
name: rmq
driver: bridge
broker.conf
RocketMQ Broker 需要一個配置文件昼丑,按照上面的 Compose 配置劫拗,我們需要在 ./data/brokerconf/
目錄下創(chuàng)建一個名為 broker.conf
的配置文件,內(nèi)容如下:
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# 所屬集群名字
brokerClusterName=DefaultCluster
# broker 名字矾克,注意此處不同的配置文件填寫的不一樣页慷,如果在 broker-a.properties 使用: broker-a,
# 在 broker-b.properties 使用: broker-b
brokerName=broker-a
# 0 表示 Master,> 0 表示 Slave
brokerId=0
# nameServer地址胁附,分號分割
# namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
# 啟動IP,如果 docker 報 com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.0.120:10909> failed
# 解決方式1 加上一句 producer.setVipChannelEnabled(false);酒繁,解決方式2 brokerIP1 設(shè)置宿主機IP,不要使用docker 內(nèi)部IP
# brokerIP1=192.168.0.253
# 在發(fā)送消息時控妻,自動創(chuàng)建服務(wù)器不存在的topic州袒,默認創(chuàng)建的隊列數(shù)
defaultTopicQueueNums=4
# 是否允許 Broker 自動創(chuàng)建 Topic,建議線下開啟弓候,線上關(guān)閉 @煽蕖!菇存!這里仔細看是 false夸研,false,false
autoCreateTopicEnable=true
# 是否允許 Broker 自動創(chuàng)建訂閱組依鸥,建議線下開啟亥至,線上關(guān)閉
autoCreateSubscriptionGroup=true
# Broker 對外服務(wù)的監(jiān)聽端口
listenPort=10911
# 刪除文件時間點,默認凌晨4點
deleteWhen=04
# 文件保留時間贱迟,默認48小時
fileReservedTime=120
# commitLog 每個文件的大小默認1G
mapedFileSizeCommitLog=1073741824
# ConsumeQueue 每個文件默認存 30W 條姐扮,根據(jù)業(yè)務(wù)情況調(diào)整
mapedFileSizeConsumeQueue=300000
# destroyMapedFileIntervalForcibly=120000
# redeleteHangedFileInterval=120000
# 檢測物理文件磁盤空間
diskMaxUsedSpaceRatio=88
# 存儲路徑
# storePathRootDir=/home/ztztdata/rocketmq-all-4.1.0-incubating/store
# commitLog 存儲路徑
# storePathCommitLog=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/commitlog
# 消費隊列存儲
# storePathConsumeQueue=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/consumequeue
# 消息索引存儲路徑
# storePathIndex=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/index
# checkpoint 文件存儲路徑
# storeCheckpoint=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/checkpoint
# abort 文件存儲路徑
# abortFile=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/abort
# 限制的消息大小
maxMessageSize=65536
# flushCommitLogLeastPages=4
# flushConsumeQueueLeastPages=2
# flushCommitLogThoroughInterval=10000
# flushConsumeQueueThoroughInterval=60000
# Broker 的角色
# - ASYNC_MASTER 異步復(fù)制Master
# - SYNC_MASTER 同步雙寫Master
# - SLAVE
brokerRole=ASYNC_MASTER
# 刷盤方式
# - ASYNC_FLUSH 異步刷盤
# - SYNC_FLUSH 同步刷盤
flushDiskType=ASYNC_FLUSH
# 發(fā)消息線程池數(shù)量
# sendMessageThreadPoolNums=128
# 拉消息線程池數(shù)量
# pullMessageThreadPoolNums=128
RocketMQ 控制臺
訪問 http://192.168.198.131:8070 登入控制臺
由于本教程整個案例基于 Spring Cloud,故我們采用 Spring Cloud Stream 完成一次發(fā)布和訂閱
官方教程
Spring Cloud Alibaba RocketMQ
Spring Cloud Stream
Spring Cloud Stream 是一個用于構(gòu)建基于消息的微服務(wù)應(yīng)用框架衣吠。它基于 SpringBoot 來創(chuàng)建具有生產(chǎn)級別的單機 Spring 應(yīng)用茶敏,并且使用 Spring Integration
與 Broker 進行連接。
Spring Cloud Stream 提供了消息中間件配置的統(tǒng)一抽象缚俏,推出了 publish-subscribe惊搏、consumer groups贮乳、partition 這些統(tǒng)一的概念。
Spring Cloud Stream 內(nèi)部有兩個概念:Binder 和 Binding胀屿。
- Binder: 跟外部消息中間件集成的組件,用來創(chuàng)建 Binding包雀,各消息中間件都有自己的 Binder 實現(xiàn)宿崭。
比如 Kafka
的實現(xiàn) KafkaMessageChannelBinder
,RabbitMQ
的實現(xiàn) RabbitMessageChannelBinder
以及 RocketMQ
的實現(xiàn) RocketMQMessageChannelBinder
才写。
- Binding: 包括 Input Binding 和 Output Binding葡兑。
Binding 在消息中間件與應(yīng)用程序提供的 Provider 和 Consumer 之間提供了一個橋梁,實現(xiàn)了開發(fā)者只需使用應(yīng)用程序的 Provider 或 Consumer 生產(chǎn)或消費數(shù)據(jù)即可赞草,屏蔽了開發(fā)者與底層消息中間件的接觸讹堤。
Figure 4. Spring Cloud Stream
使用 Spring Cloud Stream 完成一段簡單的消息發(fā)送和消息接收代碼:
MessageChannel messageChannel = new DirectChannel();
// 消息訂閱
((SubscribableChannel) messageChannel).subscribe(new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
System.out.println("receive msg: " + message.getPayload());
}
});
// 消息發(fā)送
messageChannel.send(MessageBuilder.withPayload("simple msg").build());
這段代碼所有的消息類都是 spring-messaging
模塊里提供的。屏蔽具體消息中間件的底層實現(xiàn)厨疙,如果想用更換消息中間件洲守,在配置文件里配置相關(guān)消息中間件信息以及修改 binder 依賴即可。
Spring Cloud Stream 底層基于這段代碼去做了各種抽象沾凄。
解決連接超時問題
我們采用 Docker 部署了 RocketMQ 服務(wù)梗醇,此時 RocketMQ Broker 暴露的地址和端口(10909,10911)是基于容器的撒蟀,會導(dǎo)致我們開發(fā)機無法連接叙谨,從而引發(fā) org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException: sendDefaultImpl call timeout
異常
注意下圖中的 IP 地址,這個是容器的 IP保屯,開發(fā)機與容器不在一個局域網(wǎng)所以無法連接手负。
解決方案是在 broker.conf
配置文件中增加 brokerIP1=宿主機IP
即可
POM
參考文檔,此處使用com.alibaba.cloud:spring-cloud-starter-stream-rocketmq
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.wj</groupId>
<artifactId>hello-spring-cloud-alibaba-dependencies</artifactId>
<version>1.0.0-SNAPSHOT</version>
<relativePath>../hello-spring-cloud-alibaba-dependencies/pom.xml</relativePath>
</parent>
<artifactId>hello-spring-cloud-alibaba-rocketmq-provider</artifactId>
<packaging>jar</packaging>
<name>hello-spring-cloud-alibaba-rocketmq-provider</name>
<dependencies>
<!-- Spring Boot Begin -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- Spring Boot End -->
<!-- Spring Cloud Begin -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>
<!-- Spring Cloud End -->
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<mainClass>com.wj.hello.spring.cloud.alibaba.rocketmq.provider.RocketMQProviderApplication</mainClass>
</configuration>
</plugin>
</plugins>
</build>
</project>
Application
package com.wj.hello.spring.cloud.alibaba.rocketmq.provider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
@SpringBootApplication
@EnableBinding({Source.class})
public class RocketMQProviderApplication implements CommandLineRunner {
public static void main(String[] args) {
SpringApplication.run(RocketMQProviderApplication.class,args);
}
@Autowired
private MessageChannel output;
@Override
public void run(String... args) throws Exception {
output.send(MessageBuilder.withPayload("Hello RockerMQ").build());
}
}
application.yml
spring:
application:
name: rocketmq-provider
cloud:
stream:
rocketmq:
binder:
# RocketMQ 服務(wù)器地址
namesrv-server: 192.168.198.131:9876
bindings:
# 這里是個 Map 類型參數(shù)姑尺,{} 為 YAML 中 Map 的行內(nèi)寫法
output: {destination: test-topic, content-type: application/json}
server:
port: 9093
management:
endpoints:
web:
exposure:
include: '*'
運行成功后即可在 RocketMQ 控制臺的 消息 列表中選擇 test-topic
主題即可看到發(fā)送的消息
RocketMQ 消費者
POM
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.wj</groupId>
<artifactId>hello-spring-cloud-alibaba-dependencies</artifactId>
<version>1.0.0-SNAPSHOT</version>
<relativePath>../hello-spring-cloud-alibaba-dependencies/pom.xml</relativePath>
</parent>
<artifactId>hello-spring-cloud-alibaba-rocketmq-consumer</artifactId>
<packaging>jar</packaging>
<name>hello-spring-cloud-alibaba-rocketmq-consumer</name>
<dependencies>
<!-- Spring Boot Begin -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- Spring Boot End -->
<!-- Spring Cloud Begin -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>
<!-- Spring Cloud End -->
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<mainClass>com.wj.hello.spring.cloud.alibaba.rocketmq.consumer.RocketMQConsumerApplication</mainClass>
</configuration>
</plugin>
</plugins>
</build>
</project>
Application
package com.wj.hello.spring.cloud.alibaba.rocketmq.consumer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Sink;
@SpringBootApplication
@EnableBinding({Sink.class})
public class RocketMQConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(RocketMQConsumerApplication.class, args);
}
@StreamListener("input")
private void inputMessage(String message){
System.out.printf("input receive: " + message);
}
}
application.yml
spring:
application:
name: rocketmq-consumer
cloud:
stream:
rocketmq:
binder:
namesrv-server: 192.168.198.131:9876
bindings:
input: {consumer.orderly: true}
bindings:
input: {destination: test-topic, content-type: text/plain, group: test-group, consumer.maxAttempts: 1}
server:
port: 9094
management:
endpoints:
web:
exposure:
include: '*'
運行成功后即可在控制臺接收到消息:input receive:: Hello RocketMQ
rocketmq導(dǎo)圖
推薦文章:
https://www.cnblogs.com/goodAndyxublog/p/11457164.html
https://www.funtl.com/zh/spring-cloud-alibaba/RocketMQ-%E7%AE%80%E4%BB%8B.html