RocketMQ 簡介

概述

消息隊列作為高并發(fā)系統(tǒng)的核心組件之一泳秀,能夠幫助業(yè)務(wù)系統(tǒng)解構(gòu)提升開發(fā)效率和系統(tǒng)穩(wěn)定性鞋既。主要具有以下優(yōu)勢:

  • 削峰填谷: 主要解決瞬時寫壓力大于應(yīng)用服務(wù)能力導(dǎo)致消息丟失鼓拧、系統(tǒng)奔潰等問題
  • 系統(tǒng)解耦: 解決不同重要程度、不同能力級別系統(tǒng)之間依賴導(dǎo)致一死全死
  • 提升性能: 當(dāng)存在一對多調(diào)用時娜扇,可以發(fā)一條消息給消息系統(tǒng)错沃,讓消息系統(tǒng)通知相關(guān)系統(tǒng)
  • 蓄流壓測: 線上有些鏈路不好壓測,可以通過堆積一定量消息再放開來壓測

RocketMQ

Apache Alibaba RocketMQ 是一個消息中間件雀瓢。消息中間件中有兩個角色:消息生產(chǎn)者和消息消費者枢析。RocketMQ 里同樣有這兩個概念,消息生產(chǎn)者負責(zé)創(chuàng)建消息并發(fā)送到 RocketMQ 服務(wù)器刃麸,RocketMQ 服務(wù)器會將消息持久化到磁盤醒叁,消息消費者從 RocketMQ 服務(wù)器拉取消息并提交給應(yīng)用消費。

RocketMQ 特點

RocketMQ 是一款分布式、隊列模型的消息中間件把沼,具有以下特點:

  • 支持嚴(yán)格的消息順序
  • 支持 Topic 與 Queue 兩種模式
  • 億級消息堆積能力
  • 比較友好的分布式特性
  • 同時支持 Push 與 Pull 方式消費消息
  • 歷經(jīng)多次天貓雙十一海量消息考驗

RocketMQ 優(yōu)勢

目前主流的 MQ 主要是 RocketMQ啊易、kafka、RabbitMQ饮睬,其主要優(yōu)勢有:

  • 支持事務(wù)型消息(消息發(fā)送和 DB 操作保持兩方的最終一致性租谈,RabbitMQ 和 Kafka 不支持)
  • 支持結(jié)合 RocketMQ 的多個系統(tǒng)之間數(shù)據(jù)最終一致性(多方事務(wù),二方事務(wù)是前提)
  • 支持 18 個級別的延遲消息(RabbitMQ 和 Kafka 不支持)
  • 支持指定次數(shù)和時間間隔的失敗消息重發(fā)(Kafka 不支持捆愁,RabbitMQ 需要手動確認)
  • 支持 Consumer 端 Tag 過濾割去,減少不必要的網(wǎng)絡(luò)傳輸(RabbitMQ 和 Kafka 不支持)
  • 支持重復(fù)消費(RabbitMQ 不支持,Kafka 支持)

消息隊列對比參照表

基于 Docker 安裝 RocketMQ

docker-compose.yml

注意:啟動 RocketMQ Server + Broker + Console 至少需要 2G 內(nèi)存

version: '3.5'
services:
  rmqnamesrv:
    image: foxiswho/rocketmq:server
    container_name: rmqnamesrv
    ports:
      - 9876:9876
    volumes:
      - ./data/logs:/opt/logs
      - ./data/store:/opt/store
    networks:
        rmq:
          aliases:
            - rmqnamesrv

  rmqbroker:
    image: foxiswho/rocketmq:broker
    container_name: rmqbroker
    ports:
      - 10909:10909
      - 10911:10911
    volumes:
      - ./data/logs:/opt/logs
      - ./data/store:/opt/store
      - ./data/brokerconf/broker.conf:/etc/rocketmq/broker.conf
    environment:
        NAMESRV_ADDR: "rmqnamesrv:9876"
        JAVA_OPTS: " -Duser.home=/opt"
        JAVA_OPT_EXT: "-server -Xms128m -Xmx128m -Xmn128m"
    command: mqbroker -c /etc/rocketmq/broker.conf
    depends_on:
      - rmqnamesrv
    networks:
      rmq:
        aliases:
          - rmqbroker

  rmqconsole:
    image: styletang/rocketmq-console-ng
    container_name: rmqconsole
    ports:
      - 8080:8080
    environment:
        JAVA_OPTS: "-Drocketmq.namesrv.addr=rmqnamesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false"
    depends_on:
      - rmqnamesrv
    networks:
      rmq:
        aliases:
          - rmqconsole

networks:
  rmq:
    name: rmq
    driver: bridge

broker.conf

RocketMQ Broker 需要一個配置文件昼丑,按照上面的 Compose 配置劫拗,我們需要在 ./data/brokerconf/ 目錄下創(chuàng)建一個名為 broker.conf 的配置文件,內(nèi)容如下:

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.

# 所屬集群名字
brokerClusterName=DefaultCluster

# broker 名字矾克,注意此處不同的配置文件填寫的不一樣页慷,如果在 broker-a.properties 使用: broker-a,
# 在 broker-b.properties 使用: broker-b
brokerName=broker-a

# 0 表示 Master,> 0 表示 Slave
brokerId=0

# nameServer地址胁附,分號分割
# namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876

# 啟動IP,如果 docker 報 com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.0.120:10909> failed
# 解決方式1 加上一句 producer.setVipChannelEnabled(false);酒繁,解決方式2 brokerIP1 設(shè)置宿主機IP,不要使用docker 內(nèi)部IP
# brokerIP1=192.168.0.253

# 在發(fā)送消息時控妻,自動創(chuàng)建服務(wù)器不存在的topic州袒,默認創(chuàng)建的隊列數(shù)
defaultTopicQueueNums=4

# 是否允許 Broker 自動創(chuàng)建 Topic,建議線下開啟弓候,線上關(guān)閉 @煽蕖!菇存!這里仔細看是 false夸研,false,false
autoCreateTopicEnable=true

# 是否允許 Broker 自動創(chuàng)建訂閱組依鸥,建議線下開啟亥至,線上關(guān)閉
autoCreateSubscriptionGroup=true

# Broker 對外服務(wù)的監(jiān)聽端口
listenPort=10911

# 刪除文件時間點,默認凌晨4點
deleteWhen=04

# 文件保留時間贱迟,默認48小時
fileReservedTime=120

# commitLog 每個文件的大小默認1G
mapedFileSizeCommitLog=1073741824

# ConsumeQueue 每個文件默認存 30W 條姐扮,根據(jù)業(yè)務(wù)情況調(diào)整
mapedFileSizeConsumeQueue=300000

# destroyMapedFileIntervalForcibly=120000
# redeleteHangedFileInterval=120000
# 檢測物理文件磁盤空間
diskMaxUsedSpaceRatio=88
# 存儲路徑
# storePathRootDir=/home/ztztdata/rocketmq-all-4.1.0-incubating/store
# commitLog 存儲路徑
# storePathCommitLog=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/commitlog
# 消費隊列存儲
# storePathConsumeQueue=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/consumequeue
# 消息索引存儲路徑
# storePathIndex=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/index
# checkpoint 文件存儲路徑
# storeCheckpoint=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/checkpoint
# abort 文件存儲路徑
# abortFile=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/abort
# 限制的消息大小
maxMessageSize=65536

# flushCommitLogLeastPages=4
# flushConsumeQueueLeastPages=2
# flushCommitLogThoroughInterval=10000
# flushConsumeQueueThoroughInterval=60000

# Broker 的角色
# - ASYNC_MASTER 異步復(fù)制Master
# - SYNC_MASTER 同步雙寫Master
# - SLAVE
brokerRole=ASYNC_MASTER

# 刷盤方式
# - ASYNC_FLUSH 異步刷盤
# - SYNC_FLUSH 同步刷盤
flushDiskType=ASYNC_FLUSH

# 發(fā)消息線程池數(shù)量
# sendMessageThreadPoolNums=128
# 拉消息線程池數(shù)量
# pullMessageThreadPoolNums=128

RocketMQ 控制臺

訪問 http://192.168.198.131:8070 登入控制臺

由于本教程整個案例基于 Spring Cloud,故我們采用 Spring Cloud Stream 完成一次發(fā)布和訂閱

官方教程
Spring Cloud Alibaba RocketMQ

Spring Cloud Stream

Spring Cloud Stream 是一個用于構(gòu)建基于消息的微服務(wù)應(yīng)用框架衣吠。它基于 SpringBoot 來創(chuàng)建具有生產(chǎn)級別的單機 Spring 應(yīng)用茶敏,并且使用 Spring Integration 與 Broker 進行連接。

Spring Cloud Stream 提供了消息中間件配置的統(tǒng)一抽象缚俏,推出了 publish-subscribe惊搏、consumer groups贮乳、partition 這些統(tǒng)一的概念。

Spring Cloud Stream 內(nèi)部有兩個概念:Binder 和 Binding胀屿。

  • Binder: 跟外部消息中間件集成的組件,用來創(chuàng)建 Binding包雀,各消息中間件都有自己的 Binder 實現(xiàn)宿崭。

比如 Kafka 的實現(xiàn) KafkaMessageChannelBinderRabbitMQ 的實現(xiàn) RabbitMessageChannelBinder 以及 RocketMQ 的實現(xiàn) RocketMQMessageChannelBinder才写。

  • Binding: 包括 Input Binding 和 Output Binding葡兑。

Binding 在消息中間件與應(yīng)用程序提供的 Provider 和 Consumer 之間提供了一個橋梁,實現(xiàn)了開發(fā)者只需使用應(yīng)用程序的 Provider 或 Consumer 生產(chǎn)或消費數(shù)據(jù)即可赞草,屏蔽了開發(fā)者與底層消息中間件的接觸讹堤。

SCSt overview

Figure 4. Spring Cloud Stream

使用 Spring Cloud Stream 完成一段簡單的消息發(fā)送和消息接收代碼:

MessageChannel messageChannel = new DirectChannel();

// 消息訂閱
((SubscribableChannel) messageChannel).subscribe(new MessageHandler() {
    @Override
    public void handleMessage(Message<?> message) throws MessagingException {
        System.out.println("receive msg: " + message.getPayload());
    }
});

// 消息發(fā)送
messageChannel.send(MessageBuilder.withPayload("simple msg").build());

這段代碼所有的消息類都是 spring-messaging 模塊里提供的。屏蔽具體消息中間件的底層實現(xiàn)厨疙,如果想用更換消息中間件洲守,在配置文件里配置相關(guān)消息中間件信息以及修改 binder 依賴即可。

Spring Cloud Stream 底層基于這段代碼去做了各種抽象沾凄。

解決連接超時問題

我們采用 Docker 部署了 RocketMQ 服務(wù)梗醇,此時 RocketMQ Broker 暴露的地址和端口(10909,10911)是基于容器的撒蟀,會導(dǎo)致我們開發(fā)機無法連接叙谨,從而引發(fā) org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException: sendDefaultImpl call timeout 異常

注意下圖中的 IP 地址,這個是容器的 IP保屯,開發(fā)機與容器不在一個局域網(wǎng)所以無法連接手负。

解決方案是在 broker.conf 配置文件中增加 brokerIP1=宿主機IP 即可

POM

參考文檔,此處使用com.alibaba.cloud:spring-cloud-starter-stream-rocketmq

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>com.wj</groupId>
        <artifactId>hello-spring-cloud-alibaba-dependencies</artifactId>
        <version>1.0.0-SNAPSHOT</version>
        <relativePath>../hello-spring-cloud-alibaba-dependencies/pom.xml</relativePath>
    </parent>

    <artifactId>hello-spring-cloud-alibaba-rocketmq-provider</artifactId>
    <packaging>jar</packaging>

    <name>hello-spring-cloud-alibaba-rocketmq-provider</name>


    <dependencies>
        <!-- Spring Boot Begin -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- Spring Boot End -->

        <!-- Spring Cloud Begin -->
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
        </dependency>
        <!-- Spring Cloud End -->
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <mainClass>com.wj.hello.spring.cloud.alibaba.rocketmq.provider.RocketMQProviderApplication</mainClass>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

Application

package com.wj.hello.spring.cloud.alibaba.rocketmq.provider;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;

@SpringBootApplication
@EnableBinding({Source.class})
public class RocketMQProviderApplication implements CommandLineRunner {
    public static void main(String[] args) {
        SpringApplication.run(RocketMQProviderApplication.class,args);
    }

    @Autowired
    private MessageChannel output;

    @Override
    public void run(String... args) throws Exception {
        output.send(MessageBuilder.withPayload("Hello RockerMQ").build());
    }
}

application.yml

spring:
  application:
    name: rocketmq-provider
  cloud:
    stream:
      rocketmq:
        binder:
          # RocketMQ 服務(wù)器地址
          namesrv-server: 192.168.198.131:9876
      bindings:
        # 這里是個 Map 類型參數(shù)姑尺,{} 為 YAML 中 Map 的行內(nèi)寫法
        output: {destination: test-topic, content-type: application/json}

server:
  port: 9093

management:
  endpoints:
    web:
      exposure:
        include: '*'

運行成功后即可在 RocketMQ 控制臺的 消息 列表中選擇 test-topic 主題即可看到發(fā)送的消息

RocketMQ 消費者

POM

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>com.wj</groupId>
        <artifactId>hello-spring-cloud-alibaba-dependencies</artifactId>
        <version>1.0.0-SNAPSHOT</version>
        <relativePath>../hello-spring-cloud-alibaba-dependencies/pom.xml</relativePath>
    </parent>

    <artifactId>hello-spring-cloud-alibaba-rocketmq-consumer</artifactId>
    <packaging>jar</packaging>

    <name>hello-spring-cloud-alibaba-rocketmq-consumer</name>

    <dependencies>
        <!-- Spring Boot Begin -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- Spring Boot End -->

        <!-- Spring Cloud Begin -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
        </dependency>
        <!-- Spring Cloud End -->
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <mainClass>com.wj.hello.spring.cloud.alibaba.rocketmq.consumer.RocketMQConsumerApplication</mainClass>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

Application

package com.wj.hello.spring.cloud.alibaba.rocketmq.consumer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Sink;

@SpringBootApplication
@EnableBinding({Sink.class})
public class RocketMQConsumerApplication {
    public static void main(String[] args) {
        SpringApplication.run(RocketMQConsumerApplication.class, args);
    }

    @StreamListener("input")
    private void inputMessage(String message){
        System.out.printf("input receive: " + message);
    }
}

application.yml

spring:
  application:
    name: rocketmq-consumer
  cloud:
    stream:
      rocketmq:
        binder:
          namesrv-server: 192.168.198.131:9876
        bindings:
          input: {consumer.orderly: true}
      bindings:
        input: {destination: test-topic, content-type: text/plain, group: test-group, consumer.maxAttempts: 1}

server:
  port: 9094

management:
  endpoints:
    web:
      exposure:
        include: '*'

運行成功后即可在控制臺接收到消息:input receive:: Hello RocketMQ

rocketmq導(dǎo)圖

推薦文章:
https://www.cnblogs.com/goodAndyxublog/p/11457164.html
https://www.funtl.com/zh/spring-cloud-alibaba/RocketMQ-%E7%AE%80%E4%BB%8B.html

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末竟终,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子切蟋,更是在濱河造成了極大的恐慌衡楞,老刑警劉巖,帶你破解...
    沈念sama閱讀 217,542評論 6 504
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件敦姻,死亡現(xiàn)場離奇詭異瘾境,居然都是意外死亡,警方通過查閱死者的電腦和手機镰惦,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,822評論 3 394
  • 文/潘曉璐 我一進店門迷守,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人旺入,你說我怎么就攤上這事兑凿】Γ” “怎么了?”我有些...
    開封第一講書人閱讀 163,912評論 0 354
  • 文/不壞的土叔 我叫張陵礼华,是天一觀的道長咐鹤。 經(jīng)常有香客問我,道長圣絮,這世上最難降的妖魔是什么祈惶? 我笑而不...
    開封第一講書人閱讀 58,449評論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮扮匠,結(jié)果婚禮上捧请,老公的妹妹穿的比我還像新娘。我一直安慰自己棒搜,他們只是感情好疹蛉,可當(dāng)我...
    茶點故事閱讀 67,500評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著力麸,像睡著了一般可款。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上克蚂,一...
    開封第一講書人閱讀 51,370評論 1 302
  • 那天筑舅,我揣著相機與錄音,去河邊找鬼陨舱。 笑死翠拣,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的游盲。 我是一名探鬼主播误墓,決...
    沈念sama閱讀 40,193評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼益缎!你這毒婦竟也來了谜慌?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,074評論 0 276
  • 序言:老撾萬榮一對情侶失蹤莺奔,失蹤者是張志新(化名)和其女友劉穎欣范,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體令哟,經(jīng)...
    沈念sama閱讀 45,505評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡恼琼,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,722評論 3 335
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了屏富。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片晴竞。...
    茶點故事閱讀 39,841評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖狠半,靈堂內(nèi)的尸體忽然破棺而出噩死,到底是詐尸還是另有隱情颤难,我是刑警寧澤,帶...
    沈念sama閱讀 35,569評論 5 345
  • 正文 年R本政府宣布已维,位于F島的核電站行嗤,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏垛耳。R本人自食惡果不足惜栅屏,卻給世界環(huán)境...
    茶點故事閱讀 41,168評論 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望艾扮。 院中可真熱鬧既琴,春花似錦占婉、人聲如沸泡嘴。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,783評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽酌予。三九已至,卻和暖如春奖慌,著一層夾襖步出監(jiān)牢的瞬間抛虫,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,918評論 1 269
  • 我被黑心中介騙來泰國打工简僧, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留建椰,地道東北人。 一個月前我還...
    沈念sama閱讀 47,962評論 2 370
  • 正文 我出身青樓岛马,卻偏偏與公主長得像棉姐,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子啦逆,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,781評論 2 354

推薦閱讀更多精彩內(nèi)容