C1-RabbitMQ(消息隊列)--- 分解 A --- 2021年7月4日 14:27:09

分布式框架中間件總綱

http://www.reibang.com/p/00aa796bb5b8

友情鏈接(消息三解序)

1彻坛、RabbitMQ(消息隊列)--- 分解 A
2、RabbitMQ(消息隊列)--- 分解 B
3、RabbitMQ(消息隊列)--- 分解 C
4昌屉、RabbitMQ(消息隊列)--- 面試題

本章目錄

一钙蒙、消息隊列
???????1、MQ相關概念
??????????????什么是
??????????????作用
??????????????分類
??????????????選擇
???????2间驮、RabbotMQ
??????????????概念
??????????????核心概念
??????????????核心部分
??????????????各個名詞介紹
??????????????安裝
二躬厌、hello world
???????1、搭建版本依賴(maven)
???????2竞帽、消息生產(chǎn)者
???????3扛施、消息消費者
三、Work Queues
???????1屹篓、輪訓分發(fā)消息
???????2疙渣、消息應答
???????3、RabbitMQ 持久化

一堆巧、消息隊列

1妄荔、MQ 的相關概念

1.1、什么是MQ

MQ(message queue)谍肤,從字面意思上看啦租,本質(zhì)是個隊列,F(xiàn)IFO 先入先出荒揣,只不過隊列中存放的內(nèi)容是
message 而已篷角,還是一種跨進程的通信機制,用于上下游傳遞消息系任。在互聯(lián)網(wǎng)架構(gòu)中内地,MQ 是一種非常常
見的上下游“邏輯解耦+物理解耦”的消息通信服務。使用了 MQ 之后赋除,消息發(fā)送上游只需要依賴 MQ阱缓,不
用依賴其他服務。

1.2举农、為什么要用MQ

1.2.1荆针、流量消峰

舉個例子,如果訂單系統(tǒng)最多能處理一萬次訂單颁糟,這個處理能力應付正常時段的下單時綽綽有余航背,正
常時段我們下單一秒后就能返回結(jié)果。但是在高峰期棱貌,如果有兩萬次下單操作系統(tǒng)是處理不了的玖媚,只能限
制訂單超過一萬后不允許用戶下單。使用消息隊列做緩沖婚脱,我們可以取消這個限制今魔,把一秒內(nèi)下的訂單分
散成一段時間來處理勺像,這時有些用戶可能在下單十幾秒后才能收到下單成功的操作,但是比不能下單的體
驗要好错森。(阿K:這里的十幾秒不是頁面卡主十幾秒吟宦,而是十幾秒后收到通知)

1.2.2、應用解耦

以電商應用為例涩维,應用中有訂單系統(tǒng)殃姓、庫存系統(tǒng)、物流系統(tǒng)瓦阐、支付系統(tǒng)蜗侈。用戶創(chuàng)建訂單后,如果耦合
調(diào)用庫存系統(tǒng)睡蟋、物流系統(tǒng)踏幻、支付系統(tǒng),任何一個子系統(tǒng)出了故障薄湿,都會造成下單操作異常。當轉(zhuǎn)變成基于
消息隊列的方式后偷卧,系統(tǒng)間調(diào)用的問題會減少很多豺瘤,比如物流系統(tǒng)因為發(fā)生故障,需要幾分鐘來修復听诸。在
這幾分鐘的時間里坐求,物流系統(tǒng)要處理的內(nèi)存被緩存在消息隊列中,用戶的下單操作可以正常完成晌梨。當物流
系統(tǒng)恢復后桥嗤,繼續(xù)處理訂單信息即可,中單用戶感受不到物流系統(tǒng)的故障仔蝌,提升系統(tǒng)的可用性泛领。


image.png
1.2.3耍群、異步處理

有些服務間調(diào)用是異步的巴碗,例如 A 調(diào)用 B,B 需要花費很長時間執(zhí)行基跑,但是 A 需要知道 B 什么時候可
以執(zhí)行完瞧挤,以前一般有兩種方式锡宋,A 過一段時間去調(diào)用 B 的查詢 api 查詢√靥瘢或者 A 提供一個 callback api执俩,
B 執(zhí)行完之后調(diào)用 api 通知 A 服務。這兩種方式都不是很優(yōu)雅癌刽,使用消息總線役首,可以很方便解決這個問題尝丐,
A 調(diào)用 B 服務后,只需要監(jiān)聽 B 處理完成的消息宋税,當 B 處理完成后摊崭,會發(fā)送一條消息給 MQ,MQ 會將此消
息轉(zhuǎn)發(fā)給 A 服務杰赛。這樣 A 服務既不用循環(huán)調(diào)用 B 的查詢 api呢簸,也不用提供 callback api。同樣B 服務也不用
做這些操作乏屯。A 服務還能及時的得到異步處理成功的消息根时。


image.png

1.3、MQ 的分類

1.3.1辰晕、ActiveMQ(淘汰)

優(yōu)點:單機吞吐量萬級蛤迎,時效性 ms 級,可用性高含友,基于主從架構(gòu)實現(xiàn)高可用性替裆,消息可靠性較
低的概率丟失數(shù)據(jù)

缺點:官方社區(qū)現(xiàn)在對 ActiveMQ 5.x 維護越來越少,高吞吐量場景較少使用窘问。

1.3.2辆童、Kafka(大數(shù)據(jù)領域)

大數(shù)據(jù)的殺手锏,談到大數(shù)據(jù)領域內(nèi)的消息傳輸惠赫,則繞不開 Kafka把鉴,這款為大數(shù)據(jù)而生的消息中間件,
以其百萬級 TPS 的吞吐量名聲大噪儿咱,迅速成為大數(shù)據(jù)領域的寵兒庭砍,在數(shù)據(jù)采集、傳輸混埠、存儲的過程中發(fā)揮
著舉足輕重的作用怠缸。目前已經(jīng)被 LinkedIn,Uber, Twitter, Netflix 等大公司所采納钳宪。

優(yōu)點: 性能卓越凯旭,單機寫入 TPS 約在百萬條/秒,最大的優(yōu)點使套,就是吞吐量高罐呼。時效性 ms 級可用性非
常高,kafka 是分布式的侦高,一個數(shù)據(jù)多個副本嫉柴,少數(shù)機器宕機,不會丟失數(shù)據(jù)奉呛,不會導致不可用,消費者采
用 Pull 方式獲取消息, 消息有序, 通過控制能夠保證所有消息被消費且僅被消費一次;有優(yōu)秀的第三方Kafka
Web 管理界面 Kafka-Manager计螺;在日志領域比較成熟夯尽,被多家公司和多個開源項目使用;功能支持: 功能
較為簡單登馒,主要支持簡單的 MQ 功能匙握,在大數(shù)據(jù)領域的實時計算以及日志采集被大規(guī)模使用

缺點:Kafka 單機超過 64 個隊列/分區(qū),Load 會發(fā)生明顯的飆高現(xiàn)象陈轿,隊列越多圈纺,load 越高,發(fā)送消
息響應時間變長麦射,使用短輪詢方式蛾娶,實時性取決于輪詢間隔時間,消費失敗不支持重試潜秋;支持消息順序蛔琅,
但是一臺代理宕機后,就會產(chǎn)生消息亂序峻呛,社區(qū)更新較慢罗售;

1.3.3、RocketMQ(阿里親兒子)

RocketMQ 出自阿里巴巴的開源產(chǎn)品钩述,用 Java 語言實現(xiàn)寨躁,在設計時參考了 Kafka,并做出了自己的一
些改進切距。被阿里巴巴廣泛應用在訂單朽缎,交易惨远,充值谜悟,流計算,消息推送北秽,日志流式處理葡幸,binglog 分發(fā)等場
景。

優(yōu)點:單機吞吐量十萬級,可用性非常高贺氓,分布式架構(gòu),消息可以做到 0 丟失,MQ 功能較為完善蔚叨,還是分
布式的,擴展性好,支持 10 億級別的消息堆積辙培,不會因為堆積導致性能下降,源碼是 java 我們可以自己閱
讀源碼蔑水,定制自己公司的 MQ

缺點:支持的客戶端語言不多,目前是 java 及 c++扬蕊,其中 c++不成熟搀别;社區(qū)活躍度一般,沒有在MQ
核心中去實現(xiàn) JMS 等接口,有些系統(tǒng)要遷移需要修改大量代碼

1.3.4、RabbitMQ(主流人氣王)

2007 年發(fā)布尾抑,是一個在AMQP(高級消息隊列協(xié)議)基礎上完成的歇父,可復用的企業(yè)消息系統(tǒng)蒂培,是當前最
主流的消息中間件之一。

優(yōu)點:由于 erlang 語言的高并發(fā)特性榜苫,性能較好护戳;吞吐量到萬級,MQ 功能比較完備,健壯垂睬、穩(wěn)定媳荒、易
用、跨平臺羔飞、支持多種語言 如:Python肺樟、Ruby、.NET逻淌、Java么伯、JMS、C卡儒、PHP田柔、ActionScript、XMPP骨望、STOMP
等硬爆,支持 AJAX 文檔齊全;開源提供的管理界面非常棒擎鸠,用起來很好用,社區(qū)活躍度高缀磕;更新頻率相當高

缺點:商業(yè)版需要收費,學習成本較高

1.4、MQ 的選擇

1劣光、Kafka

Kafka 主要特點是基于Pull 的模式來處理消息消費袜蚕,追求高吞吐量,一開始的目的就是用于日志收集
和傳輸绢涡,適合產(chǎn)生大量數(shù)據(jù)的互聯(lián)網(wǎng)服務的數(shù)據(jù)收集業(yè)務牲剃。大型公司建議可以選用,如果有日志采集功能雄可,
肯定是首選 kafka 了凿傅。

2、RocketMQ

天生為金融互聯(lián)網(wǎng)領域而生数苫,對于可靠性要求很高的場景聪舒,尤其是電商里面的訂單扣款,以及業(yè)務削
峰虐急,在大量交易涌入時箱残,后端可能無法及時處理的情況。RoketMQ 在穩(wěn)定性上可能更值得信賴戏仓,這些業(yè)務
場景在阿里雙 11 已經(jīng)經(jīng)歷了多次考驗疚宇,如果你的業(yè)務有上述并發(fā)場景亡鼠,建議可以選擇 RocketMQ。

3敷待、RabbitMQ

結(jié)合 erlang 語言本身的并發(fā)優(yōu)勢间涵,性能好時效性微秒級,社區(qū)活躍度也比較高榜揖,管理界面用起來十分
方便勾哩,如果你的數(shù)據(jù)量沒有那么大,中小型公司優(yōu)先選擇功能比較完備的 RabbitMQ举哟。

2思劳、RabbitMQ

2.1、RabbitMQ 的概念

RabbitMQ 是一個消息中間件:它接受并轉(zhuǎn)發(fā)消息妨猩。你可以把它當做一個快遞站點潜叛,當你要發(fā)送一個包
裹時,你把你的包裹放到快遞站壶硅,快遞員最終會把你的快遞送到收件人那里威兜,按照這種邏輯 RabbitMQ 是
一個快遞站,一個快遞員幫你傳遞快件庐椒。RabbitMQ 與快遞站的主要區(qū)別在于椒舵,它不處理快件而是接收,
存儲和轉(zhuǎn)發(fā)消息數(shù)據(jù)约谈。

2.2笔宿、四大核心概念

(1)生產(chǎn)者:產(chǎn)生數(shù)據(jù)發(fā)送消息的程序是生產(chǎn)者

(2)交換機:
交換機是 RabbitMQ 非常重要的一個部件,一方面它接收來自生產(chǎn)者的消息棱诱,另一方面它將消息
推送到隊列中泼橘。交換機必須確切知道如何處理它接收到的消息,是將這些消息推送到特定隊列還是推
送到多個隊列军俊,亦或者是把消息丟棄侥加,這個得有交換機類型決定(阿K:有點jvm的味道)

(3)隊列:
隊列是 RabbitMQ 內(nèi)部使用的一種數(shù)據(jù)結(jié)構(gòu)捧存,盡管消息流經(jīng) RabbitMQ 和應用程序粪躬,但它們只能存
儲在隊列中。隊列僅受主機的內(nèi)存和磁盤限制的約束昔穴,本質(zhì)上是一個大的消息緩沖區(qū)镰官。許多生產(chǎn)者可
以將消息發(fā)送到一個隊列,許多消費者可以嘗試從一個隊列接收數(shù)據(jù)吗货。這就是我們使用隊列的方式(阿K:不歸普通人管理的營業(yè)員)

(4)消費者:
消費與接收具有相似的含義泳唠。消費者大多時候是一個等待接收消息的程序。請注意生產(chǎn)者宙搬,消費
者和消息中間件很多時候并不在同一機器上笨腥。同一個應用程序既可以是生產(chǎn)者又是可以是消費者拓哺。(阿K:你可以喜歡女的,也可以喜歡男的脖母,也可以同時)

2.3士鸥、Rabbit 核心部分

image.png

2.4、各個名詞介紹

image.png

(1)Broker:接收和分發(fā)消息的應用谆级,RabbitMQ Server 就是 Message Broker

(2)Virtual host:出于多租戶和安全因素設計的烤礁,把 AMQP 的基本組件劃分到一個虛擬的分組中,類似
于網(wǎng)絡中的 namespace 概念肥照。當多個不同的用戶使用同一個 RabbitMQ server 提供的服務時脚仔,可以劃分出
多個 vhost,每個用戶在自己的 vhost 創(chuàng)建 exchange/queue 等

(3)Connection:publisher/consumer 和 broker 之間的 TCP 連接

(4)Channel:如果每一次訪問 RabbitMQ 都建立一個 Connection舆绎,在消息量大的時候建立 TCP
Connection 的開銷將是巨大的鲤脏,效率也較低。Channel 是在 connection 內(nèi)部建立的邏輯連接吕朵,如果應用程
序支持多線程凑兰,通常每個 thread 創(chuàng)建單獨的 channel 進行通訊,AMQP method 包含了 channel id 幫助客
戶端和 message broker 識別 channel边锁,所以 channel 之間是完全隔離的姑食。( Channel 作為輕量級的 Connection
極大減少了操作系統(tǒng)建立 TCP connection 的開銷 )

(5)Exchange:message 到達 broker 的第一站,根據(jù)分發(fā)規(guī)則茅坛,匹配查詢表中的 routing key音半,分發(fā)
消息到 queue 中去。常用的類型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)

(6)Queue:消息最終被送到這里等待 consumer 取走

(7)Binding:exchange 和 queue 之間的虛擬連接贡蓖,binding 中可以包含 routing key曹鸠,Binding 信息被保
存到 exchange 中的查詢表中,用于 message 的分發(fā)依據(jù)

2.5斥铺、安裝(實操)

2.5.1彻桃、文件上傳(加QQ群 62263397 可以獲取)
image.png
2.5.2晾蜘、安裝文件(分別按照以下順序安裝)

rpm -ivh erlang-21.3-1.el7.x86_64.rpm
yum install socat -y
rpm -ivh rabbitmq-server-3.8.8-1.el7.noarch.rpm

2.5.3邻眷、常用命令(按照以下順序執(zhí)行)

(1)添加開機啟動 RabbitMQ 服務
chkconfig rabbitmq-server on
(2)啟動服務
/sbin/service rabbitmq-server start
(3)查看服務狀態(tài)
/sbin/service rabbitmq-server status

image.png

(4)停止服務(選擇執(zhí)行)
/sbin/service rabbitmq-server stop
(5)開啟 web 管理插件
rabbitmq-plugins enable rabbitmq_management

用默認賬號密碼(guest)訪問地址 http://106.52.23.202:15672 出現(xiàn)權(quán)限問題


image.png
2.5.4、添加一個新的用戶

(1)創(chuàng)建賬號:rabbitmqctl add_user mykk abc666

(2)設置用戶角色:rabbitmqctl set_user_tags mykk administrator

(3)設置用戶權(quán)限:
set_permissions [-p <vhostpath>] <user> <conf> <write> <read>
rabbitmqctl set_permissions -p "/" mykk ".*" ".*" ".*"
用戶 mykk 具有/vhost1 這個 virtual host 中所有資源的配置剔交、寫肆饶、讀權(quán)限

(4)當前用戶和角色:rabbitmqctl list_users

2.5.5、再次利用 admin 用戶登錄
image.png
2.5.6岖常、重置命令

(1)關閉應用的命令:rabbitmqctl stop_app
(2)清除的命令:rabbitmqctl reset (會清除掉剛才設置的用戶驯镊,初始化的意思)
(3)重新啟動命令:rabbitmqctl start_app

二、Hello world

我們將用 Java 編寫兩個程序。發(fā)送單個消息的生產(chǎn)者和接收消息并打印
出來的消費者板惑。我們將介紹 Java API 中的一些細節(jié)橄镜。
在下圖中,“ P”是我們的生產(chǎn)者冯乘,“ C”是我們的消費者蛉鹿。中間的框是一個隊列-RabbitMQ 代
表使用者保留的消息緩沖區(qū)


image.png

1、版本依賴

    <!--指定 jdk 編譯版本-->
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
    <dependencies>
        <!--rabbitmq 依賴客戶端-->
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.8.0</version>
        </dependency>
        <!--操作文件流的一個依賴-->
        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
            <version>2.6</version>
        </dependency>
    </dependencies>

2往湿、消息生產(chǎn)者

public class Producer {
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) {
        //創(chuàng)建一個連接工廠
        ConnectionFactory factory = new ConnectionFactory ( );
        factory.setHost ("106.52.23.202");
        factory.setUsername ("mykk");
        factory.setPassword ("abc666");

        //channel 實現(xiàn)了自動 close 接口 自動關閉 不需要顯示關閉
        try {
            Connection connection = factory.newConnection ( );
            Channel channel = connection.createChannel ( );


            /**
             * 生成一個隊列
             * 1.隊列名稱
             * 2.隊列里面的消息是否持久化 默認消息存儲在內(nèi)存中
             * 3.該隊列是否只供一個消費者進行消費 是否進行共享 true 可以多個消費者消費
             * 4.是否自動刪除 最后一個消費者端開連接以后 該隊列是否自動刪除 true 自動刪除
             * 5.其他參數(shù)
             */
            channel.queueDeclare (QUEUE_NAME, false, false, false, null);
            String message = "hello world妖异,請求阿K接收消息";

            /**
             * 發(fā)送一個消息
             * 1.發(fā)送到那個交換機
             * 2.路由的 key 是哪個
             * 3.其他的參數(shù)信息
             * 4.發(fā)送消息的消息體
             */
            channel.basicPublish ("", QUEUE_NAME, null, message.getBytes ( ));
            System.out.println ("消息發(fā)送完畢");

        } catch (Exception e) {
            e.getStackTrace ( );
        }
    }
}

3、消息消費者

public class Consumer {
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) {
        //創(chuàng)建一個連接工廠
        ConnectionFactory factory = new ConnectionFactory ( );
        factory.setHost ("106.52.23.202");
        factory.setUsername ("mykk");
        factory.setPassword ("abc666");

        try {
            Connection connection = factory.newConnection ( );
            Channel channel = connection.createChannel ( );
            System.out.println ("等待接收消息.........");

            //推送的消息如何進行消費的接口回調(diào)
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String (delivery.getBody ( ));
                System.out.println (message);
            };

            //取消消費的一個回調(diào)接口 如在消費的時候隊列被刪除掉了
            CancelCallback cancelCallback = (consumerTag) -> {
                System.out.println ("消息消費被中斷");
            };

            /**
             * 消費者消費消息
             * 1.消費哪個隊列
             * 2.消費成功之后是否要自動應答 true 代表自動應答 false 手動應答
             * 3.消費者未成功消費的回調(diào)
             */
            channel.basicConsume (QUEUE_NAME, true, deliverCallback, cancelCallback);

        } catch (IOException e) {
            e.printStackTrace ( );
        } catch (TimeoutException e) {
            e.printStackTrace ( );
        }
    }
}

三领追、Work Queues

工作隊列(又稱任務隊列)的主要思想是避免立即執(zhí)行資源密集型任務他膳,而不得不等待它完成。
相反我們安排任務在之后執(zhí)行绒窑。我們把任務封裝為消息并將其發(fā)送到隊列棕孙。在后臺運行的工作進
程將彈出任務并最終執(zhí)行作業(yè)。當有多個工作線程時些膨,這些工作線程將一起處理這些任務蟀俊。

1、輪訓分發(fā)消息

在這個案例中我們會啟動兩個工作線程订雾,一個消息發(fā)送線程肢预,我們來看看他們兩個工作線程是如何工作的。

1.1洼哎、抽取工具類

public class RabbitMqUtils {
    //得到一個連接的 channel
    public static Channel getChannel() throws Exception {
        //創(chuàng)建一個連接工廠
        ConnectionFactory factory = new ConnectionFactory ( );
        factory.setHost ("106.52.23.202");
        factory.setUsername ("mykk");
        factory.setPassword ("abc666");
        Connection connection = factory.newConnection ( );
        Channel channel = connection.createChannel ( );
        return channel;
    }
}

1.2烫映、啟動兩個工作線程

class Work1 {
    private static final String QUEUE_NAME = "hello";
    private static SimpleDateFormat sdf = new SimpleDateFormat ("yyyy-MM-dd HH:mm:ss");

    public static void main(String[] args) {
        try {
            Channel channel = RabbitMqUtils.getChannel ( );

            //推送的消息如何進行消費的接口回調(diào)
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String receivedMessage = new String (delivery.getBody ( ));
                System.out.println (sdf.format (new Date ().getTime ())+"接收到消息:" + receivedMessage);
            };

            //取消消費的一個回調(diào)接口 如在消費的時候隊列被刪除掉了
            CancelCallback cancelCallback = (consumerTag) -> {
                System.out.println (consumerTag + "消費者取消消費接口回調(diào)邏輯");
            };

            System.out.println ("C2 消費者啟動等待消費.................. ");
            channel.basicConsume (QUEUE_NAME, true, deliverCallback, cancelCallback);

        } catch (Exception e) {
            e.printStackTrace ( );
        }
    }
}

設置多開


image.png

image.png

1.3、啟動一個發(fā)送線程

class Task1 {
    private static final String QUEUE_NAME = "hello";

    public static void main(String[] args) {
        try {
            Channel channel = RabbitMqUtils.getChannel ( );
            channel.queueDeclare (QUEUE_NAME, false, false, false, null);

            //從控制臺當中接受信息
            Scanner scanner = new Scanner (System.in);
            while (scanner.hasNext ( )) {
                String message = scanner.next ( );
                channel.basicPublish ("", QUEUE_NAME, null, message.getBytes ( ));
                System.out.println ("發(fā)送消息完成:" + message);
            }
        } catch (Exception e) {
            e.printStackTrace ( );
        }
    }
}

1.4噩峦、測試

通過程序執(zhí)行發(fā)現(xiàn)生產(chǎn)者總共發(fā)送 6 個消息锭沟,消費者 1 和消費者 2 分別分得三個消息,
并且是按照有序的一個接收一次消息


image.png

2识补、消息應答

2.1族淮、概念

消費者完成一個任務可能需要一段時間,如果其中一個消費者處理一個長的任務并僅只完成
了部分突然它掛掉了凭涂,會發(fā)生什么情況。RabbitMQ 一旦向消費者傳遞了一條消息导盅,便立即將該消
息標記為刪除揍瑟。在這種情況下,突然有個消費者掛掉了滤馍,我們將丟失正在處理的消息。以及后續(xù)
發(fā)送給該消費這的消息槐瑞,因為它無法接收到阁苞。

為了保證消息在發(fā)送過程中不丟失,rabbitmq 引入消息應答機制那槽,消息應答就是:消費者在接收
到消息并且處理該消息之后悼沿,告訴 rabbitmq 它已經(jīng)處理了,rabbitmq 可以把該消息刪除了骚灸。

2.2糟趾、自動應答

消息發(fā)送后立即被認為已經(jīng)傳送成功,這種模式需要在高吞吐量和數(shù)據(jù)傳輸安全性方面做權(quán)
衡,因為這種模式如果消息在接收到之前甚牲,消費者那邊出現(xiàn)連接或者 channel 關閉义郑,那么消息就丟失 了,
當然另一方面這種模式消費者那邊可以傳遞過載的消息,沒有對傳遞的消息數(shù)量進行限制丈钙,當
然這樣有可能使得消費者這邊由于接收太多還來不及處理的消息非驮,導致這些消息的積壓,最終使
得內(nèi)存耗盡雏赦,最終這些消費者線程被操作系統(tǒng)殺死院尔,所以這種模式僅適用在消費者可以高效并以
某種速率能夠處理這些消息的情況下使用。

2.3喉誊、消息應答的方法

  1. Channel.basicAck(用于肯定確認) 邀摆,RabbitMQ 已知道該消息并且成功的處理消息,可以將其丟棄了
  2. Channel.basicNack(用于否定確認)
  3. Channel.basicReject(用于否定確認) 伍茄,與 Channel.basicNack 相比少一個參數(shù)栋盹,不處理該消息了
    直接拒絕,可以將其丟棄了

2.4敷矫、Multiple 的解釋

手動應答的好處是可以批量應答并且減少網(wǎng)絡擁堵


image.png

multiple 的 true 和 false 代表不同意思

  1. true 代表批量應答 channel 上未應答的消息 例获,比如說 channel 上有傳送 tag 的消息 5,6,7,8
    當前 tag 是8 那么此時,5-8 的這些還未應答的消息都會被確認收到消息應答(所有)


    image.png
  2. false 同上面相比曹仗, 只會應答 tag=8 的消息 5,6,7 這三個消息依然不會被確認收到消息應答(最后一個)


    image.png

2.5怎茫、消息自動重新入隊

如果消費者由于某些原因失去連接(其通道已關閉,連接已關閉或 TCP 連接丟失)虫埂,導致消息
未發(fā)送 ACK 確認掉伏,RabbitMQ 將了解到消息未完全處理斧散,并將對其重新排隊颅湘。如果此時其他消費者
可以處理闯参,它將很快將其重新分發(fā)給另一個消費者。這樣脚草,即使某個消費者偶爾死亡馏慨,也可以確
保不會丟失任何消息写隶。


image.png

2.6、消息手動應答代碼

默認消息采用的是自動應答鄙陡,所以我們要想實現(xiàn)消息消費過程中不丟失耙册,需要把自動應答改
為手動應答详拙,消費者在上面代碼的基礎上增加下面畫紅色部分代碼溪厘。

消費者和生產(chǎn)者代碼(為了方便全部寫在內(nèi)部類畸悬,并且為靜態(tài)蹋宦,方便調(diào)用)

package com.kk.rabbitmq;


import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

import java.util.Scanner;

// 模擬手動應答冷冗,消費者A睡眠,模擬宕機思灌,消費者B代替消費
public class Ack {
}

/**
 * 生產(chǎn)者
 */
class Tsk{
    private static final String TASK_QUEUE_NAME = "ack_queue";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel ( );
        channel.queueDeclare (TASK_QUEUE_NAME,false,false,false,null);

        Scanner sc = new Scanner (System.in);
        System.out.println("請輸入信息");
        while (sc.hasNext()) {
            String message = sc.nextLine ( );
            channel.basicPublish("", TASK_QUEUE_NAME, null, message.getBytes("UTF-8"));
            System.out.println("生產(chǎn)者發(fā)出消息" + message);
        }
    }
}
// 睡眠工具類
class SleepUtils {
    public static void sleep(int second) {
        try {
            Thread.sleep (1000 * second);
        } catch (InterruptedException _ignored) {
            Thread.currentThread ( ).interrupt ( );
        }
    }
}
/**
 * 消費者1
 */
class Work1 {
    private static final String ACK_QUEUE_NAME = "ack_queue";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel ( );
        System.out.println ("C1 等待接收消息處理時間較短");
        // 消息消費的時候如何處理消息
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String (delivery.getBody ( ));
            SleepUtils.sleep (1);
            System.out.println ("接收到消息:" + message);

            /**
             * 1.消息標記 tag
             * 2.是否批量應答未應答消息
             */
            channel.basicAck (delivery.getEnvelope ().getDeliveryTag (),false);
        };

        // 采用手動應答
        boolean autoAck = false;
        channel.basicConsume (ACK_QUEUE_NAME,autoAck,deliverCallback,(consumerTag)->{
            System.out.println (consumerTag+"消費者取消消費接口回調(diào)邏輯" );
        });
    }
}

/**
 * 消費者2
 */
class Work2 {
    private static final String ACK_QUEUE_NAME = "ack_queue";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel ( );
        System.out.println ("C2 等待接收消息處理時間較長");
        // 消息消費的時候如何處理消息
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String (delivery.getBody ( ));
            SleepUtils.sleep (5);
            System.out.println ("接收到消息:" + message);

            /**
             * 1.消息標記 tag
             * 2.是否批量應答未應答消息
             */
            channel.basicAck (delivery.getEnvelope ().getDeliveryTag (),false);
        };

        // 采用手動應答
        boolean autoAck = false;
        channel.basicConsume (ACK_QUEUE_NAME,autoAck,deliverCallback,(consumerTag)->{
            System.out.println (consumerTag+"消費者取消消費接口回調(diào)邏輯" );
        });
    }
}

2.7、手動應答效果

正常情況下消息發(fā)送方發(fā)送兩個消息 C1 和 C2 分別接收到消息并進行處理

image.png

在發(fā)送者發(fā)送消息 dd,發(fā)出消息之后的把 C2 消費者停掉调塌,按理說該 C2 來處理該消息烟阐,但是
由于它處理時間較長蜒茄,在還未處理完檀葛,也就是說 C2 還沒有執(zhí)行 ack 代碼的時候屿聋,C2 被停掉了转锈,
此時會看到消息被 C1 接收到了撮慨,說明消息 dd 被重新入隊砌溺,然后分配給能處理消息的 C1 處理了

3规伐、RabbitMQ 持久化

3.1猖闪、概念

剛剛我們已經(jīng)看到了如何處理任務不丟失的情況,但是如何保障當 RabbitMQ 服務停掉以后消
息生產(chǎn)者發(fā)送過來的消息不丟失检柬。默認情況下 RabbitMQ 退出或由于某種原因崩潰時何址,它忽視隊列
和消息用爪,除非告知它不要這樣做偎血。確保消息不會丟失需要做兩件事:我們需要將隊列和消息都標
記為持久化

3.2颇玷、隊列如何實現(xiàn)持久化

之前我們創(chuàng)建的隊列都是非持久化的,rabbitmq 如果重啟的話空郊,該隊列就會被刪除掉,如果
要隊列實現(xiàn)持久化 需要在聲明隊列的時候把 durable 參數(shù)設置為持久化

        // 讓消息隊列持久化
        boolean durable = true;
        channel.queueDeclare (ACK_QUEUE_NAME,durable,false,false,null);

手動刪除截圖


image.png

需要注意的就是如果之前聲明的隊列不是持久化的,需要把原先隊列先刪除谐腰,或者重新
創(chuàng)建一個持久化的隊列怔蚌,不然就會出現(xiàn)錯誤


image.png

以下為控制臺中持久化與非持久化隊列的 UI 顯示區(qū)、


image.png

image.png

這個時候即使重啟 rabbitmq 隊列也依然存在

3.3、消息如何實現(xiàn)持久化

要想讓消息實現(xiàn)持久化需要在消息生產(chǎn)者修改代碼离福,MessageProperties.PERSISTENT_TEXT_PLAIN 添
加這個屬性妖爷。

            AMQP.BasicProperties Persistence = MessageProperties.PERSISTENT_TEXT_PLAIN;
            channel.basicPublish("", TASK_QUEUE_NAME, Persistence, message.getBytes("UTF-8"));

將消息標記為持久化并不能完全保證不會丟失消息絮识。盡管它告訴 RabbitMQ 將消息保存到磁盤次舌,但是
這里依然存在當消息剛準備存儲在磁盤的時候 但是還沒有存儲完彼念,消息還在緩存的一個間隔點。此時并沒
有真正寫入磁盤吩案。持久性保證并不強务热,但是對于我們的簡單任務隊列而言,這已經(jīng)綽綽有余了捆毫。如果需要
更強有力的持久化策略绩卤,參考后邊《發(fā)布確認》章節(jié)。

3.4凛驮、不公平分發(fā)

在最開始的時候我們學習到 RabbitMQ 分發(fā)消息采用的輪訓分發(fā)黔夭,但是在某種場景下這種策略并不是
很好本姥,比方說有兩個消費者在處理任務,其中有個消費者 1 處理任務的速度非郴曜Γ快密浑,而另外一個消費者 2
處理速度卻很慢尔破,這個時候我們還是采用輪訓分發(fā)的化就會到這處理速度快的這個消費者很大一部分時間
處于空閑狀態(tài)懒构,而處理慢的那個消費者一直在干活胆剧,這種分配方式在這種情況下其實就不太好篙悯,但是
RabbitMQ 并不知道這種情況它依然很公平的進行分發(fā)鸽照。

為了避免這種情況良漱,我們可以設置參數(shù) channel.basicQos(1);

        //  公平分發(fā)(能者多勞)
        channel.basicQos (1); 
image.png
image.png

意思就是如果這個任務我還沒有處理完或者我還沒有應答你,你先別分配給我峡谊,我目前只能處理一個
任務靖苇,然后 rabbitmq 就會把該任務分配給沒有那么忙的那個空閑消費者,當然如果所有的消費者都沒有完
成手上任務埠忘,隊列還在不停的添加新任務莹妒,隊列有可能就會遇到隊列被撐滿的情況旨怠,這個時候就只能添加
新的 worker 或者改變其他存儲任務的策略。

3.5爽哎、說明:以上三段配置,前兩段持久化在生產(chǎn)者祈秕,后一段公平分發(fā)在消費者

/**
 * 生產(chǎn)者
 */
class Tsk{
    private static final String TASK_QUEUE_NAME = "ack_queue";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel ( );
        // 聲明創(chuàng)建時隊列
        boolean durable = true;// 需要讓Queue持久化
        channel.queueDeclare (TASK_QUEUE_NAME,durable,false,false,null);

        Scanner sc = new Scanner (System.in);
        System.out.println("請輸入信息");
        while (sc.hasNext()) {
            String message = sc.nextLine ( );

            // 消息持久化
            AMQP.BasicProperties Persistence = MessageProperties.PERSISTENT_TEXT_PLAIN;
            channel.basicPublish("", TASK_QUEUE_NAME, Persistence, message.getBytes("UTF-8"));
            System.out.println("生產(chǎn)者發(fā)出消息" + message);
        }
    }
}
/**
 * 消費者1
 */
class Work1 {
    private static final String ACK_QUEUE_NAME = "ack_queue";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel ( );
        System.out.println ("C1 等待接收消息處理時間較短");
        // 消息消費的時候如何處理消息
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String (delivery.getBody ( ));
            SleepUtils.sleep (1);
            System.out.println ("接收到消息:" + message);

            /**
             * 1.消息標記 tag
             * 2.是否批量應答未應答消息
             */
            channel.basicAck (delivery.getEnvelope ().getDeliveryTag (),false);
        };

        //  公平分發(fā)(能者多勞)
        channel.basicQos (1);

        // 采用手動應答
        boolean autoAck = false;
        channel.basicConsume (ACK_QUEUE_NAME,autoAck,deliverCallback,(consumerTag)->{
            System.out.println (consumerTag+"消費者取消消費接口回調(diào)邏輯" );
        });
    }
}

3.6丑掺、預期值

image.png

簡單來說就是:一開始預估你要拿多少的消息街州,當然這是不公平的唆缴,C2目前設置處理是緩慢的,如果設置為5匣掸,C1為2霎匈,
則會堆積消息在C2上


image.png

參考文章

https://www.cnblogs.com/nbf-156cwl/p/8641165.html

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市袭厂,隨后出現(xiàn)的幾起案子帖烘,更是在濱河造成了極大的恐慌秘症,老刑警劉巖历极,帶你破解...
    沈念sama閱讀 221,198評論 6 514
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件趟卸,死亡現(xiàn)場離奇詭異图云,居然都是意外死亡竣况,警方通過查閱死者的電腦和手機筒严,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,334評論 3 398
  • 文/潘曉璐 我一進店門摹恨,熙熙樓的掌柜王于貴愁眉苦臉地迎上來晒哄,“玉大人寝凌,你說我怎么就攤上這事较木〗儆常” “怎么了?”我有些...
    開封第一講書人閱讀 167,643評論 0 360
  • 文/不壞的土叔 我叫張陵喇喉,是天一觀的道長拣技。 經(jīng)常有香客問我膏斤,道長莫辨,這世上最難降的妖魔是什么盘榨? 我笑而不...
    開封第一講書人閱讀 59,495評論 1 296
  • 正文 為了忘掉前任草巡,我火速辦了婚禮山憨,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘枪孩。我一直安慰自己藻肄,他們只是感情好攻询,可當我...
    茶點故事閱讀 68,502評論 6 397
  • 文/花漫 我一把揭開白布钧栖。 她就那樣靜靜地躺著拯杠,像睡著了一般潭陪。 火紅的嫁衣襯著肌膚如雪依溯。 梳的紋絲不亂的頭發(fā)上瘟则,一...
    開封第一講書人閱讀 52,156評論 1 308
  • 那天慷嗜,我揣著相機與錄音洪添,去河邊找鬼痊焊。 笑死薄啥,一個胖子當著我的面吹牛垄惧,可吹牛的內(nèi)容都是我干的到逊。 我是一名探鬼主播觉壶,決...
    沈念sama閱讀 40,743評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼争剿!你這毒婦竟也來了蚕苇?” 一聲冷哼從身側(cè)響起捆蜀,我...
    開封第一講書人閱讀 39,659評論 0 276
  • 序言:老撾萬榮一對情侶失蹤誊薄,失蹤者是張志新(化名)和其女友劉穎呢蔫,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體协屡,經(jīng)...
    沈念sama閱讀 46,200評論 1 319
  • 正文 獨居荒郊野嶺守林人離奇死亡肤晓,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,282評論 3 340
  • 正文 我和宋清朗相戀三年认然,在試婚紗的時候發(fā)現(xiàn)自己被綠了补憾。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,424評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡卷员,死狀恐怖盈匾,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情毕骡,我是刑警寧澤削饵,帶...
    沈念sama閱讀 36,107評論 5 349
  • 正文 年R本政府宣布,位于F島的核電站未巫,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏宰啦。R本人自食惡果不足惜师抄,卻給世界環(huán)境...
    茶點故事閱讀 41,789評論 3 333
  • 文/蒙蒙 一锋玲、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧,春花似錦、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,264評論 0 23
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,390評論 1 271
  • 我被黑心中介騙來泰國打工擒权, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人。 一個月前我還...
    沈念sama閱讀 48,798評論 3 376
  • 正文 我出身青樓爷光,卻偏偏與公主長得像徐裸,于是被迫代替她去往敵國和親气笙。 傳聞我的和親對象是個殘疾皇子秉犹,可洞房花燭夜當晚...
    茶點故事閱讀 45,435評論 2 359

推薦閱讀更多精彩內(nèi)容