RabbitMQ整理

kafka與RabbitMQ的區(qū)別?
1错览、確認機制不同
在RabbitMQ中纫雁,消息確認是指生產(chǎn)者發(fā)送消息到RabbitMQ后,等待RabbitMQ返回確認信息倾哺,確認消息已經(jīng)被正確地接收并處理轧邪。通常情況下,【消息確認是同步進行的】即必須等到 【消息隊列】 接收到和處理了消息后羞海,才會返回確認信息忌愚。這樣可以確保消息隊列完全處理了消息,避免消息的丟失却邓,保證了數(shù)據(jù)的一致性和可靠性硕糊。
而在Kafka中,為了保證高吞吐率腊徙,沒有提供消息確認機制(at-most-once和at-least-once除外)简十,數(shù)據(jù)是異步推送到broker中的,一旦生產(chǎn)者把消息推送到broker后撬腾,就不會再對該消息進行確認處理螟蝙,消費者需要自己對消息的處理情況進行確認。在Kafka中民傻,也有一些同步的模式(例如producer.send()方法中的同步調(diào)用)胰默,可以確保發(fā)送的消息被同步復(fù)制到Kafka的所有副本,并在接收到確認后才會返回饰潜。

2初坠、實施部署不同
RabbitMQ 穩(wěn)定可靠,數(shù)據(jù)?致,?持多協(xié)議,有消息確認,基于erlang語?
Kafka ?吞吐,?性能,快速持久化,?消息確認,?消息遺漏,可能會有有重復(fù)消息,依賴于zookeeper,成本高和簸,“成本高”的含義是指 Kafka 部署和運維所需要的成本相比 RabbitMQ更高需要更加復(fù)雜的基礎(chǔ)設(shè)施來支持它的運行彭雾,這些基礎(chǔ)設(shè)施包括:
一、 集群環(huán)境成本高:Kafka 集群需要依賴 ZooKeeper 來實現(xiàn)節(jié)點的選舉和協(xié)調(diào)工作锁保,而 ZooKeeper 集群的部署和配置都比 RabbitMQ 更加復(fù)雜薯酝,需要更多的服務(wù)器資源和維護成本
二. 數(shù)據(jù)持久化成本高:Kafka 依賴磁盤的高速讀寫特性來實現(xiàn)其快速的消息持久化半沽,因此需要更高的成本來維護磁盤的健康狀態(tài)和數(shù)據(jù)備份和恢復(fù)等工作。

總的來說吴菠,Kafka 的高吞吐者填、高性能的特點是與高成本密切相關(guān)的,如果一個應(yīng)用場景對性能要求很高做葵,且具備一定的技術(shù)架構(gòu)能力占哟,那么可以選擇使用 Kafka。而如果應(yīng)用場景對可靠性和穩(wěn)定性要求較高酿矢,對【性能要求不太高榨乎,且業(yè)務(wù)規(guī)模不太大 ,那么選擇 RabbitMQ 更為合適瘫筐。

邏輯結(jié)構(gòu):
用一棟大樓類比說明
1蜜暑、Broker 是指 RabbitMQ 服務(wù)器實例,并根據(jù)預(yù)先定義的規(guī)則將消息發(fā)送到合適的隊列或 Exchange
2策肝、exchange是 RabbitMQ 中用于接收和分發(fā)消息的中間組件肛捍,它負責(zé)接收來自生產(chǎn)者的消息,并將其路由到與之相關(guān)的1個或多個隊列 (生產(chǎn)者到交換機)
3之众、隊列:房間是存放消息的地方(每個房間)
4拙毫、綁定:綁定規(guī)定了交換機和隊列之間的連接(樓層之間的通道)
5疯淫、虛擬主機:虛擬主機則是 RabbitMQ 中用于【邏輯隔離】的重要概念符衔,它類似于一個命名空間(每層樓)
可以在一個 Broker 內(nèi)部創(chuàng)建多個不同的虛擬主機车要,每個虛擬主機都有自己的命名空間疗杉,并且可以獨立配置和管理其中Exchange蒂破、Queue度苔、Binding音诫、用戶權(quán)限等

在 RabbitMQ 中是可以設(shè)置多個交換機的攻泼。通過設(shè)置不同類型的交換機以及它們之間的綁定關(guān)系竣付,可以實現(xiàn)復(fù)雜的消息路由和處理邏輯
交換機類型和工作模式都涉及消息在 RabbitMQ 中的傳遞和處理诡延,但它們的焦點和作用有所不同。交換機類型主要關(guān)注消息的路由和分發(fā)古胆,決定消息如何被發(fā)送到隊列中肆良;而工作模式主要關(guān)注消費者對消息的接收和處理方式,決定消息如何被消費

消息發(fā)送模式:
1逸绎、簡單模式:一個隊列只有一個消費者
2惹恃、工作模式:多個消費者監(jiān)聽同一個隊列,但消費者中只有一個消費者會成功消費消息(負載均衡)
3棺牧、發(fā)布/訂閱模式:一個交換機綁定多個隊列巫糙,消息同時被所有隊列消費(集體通知)
4、路由模式:它通過在交換機和隊列之間建立綁定關(guān)系颊乘,并定義路由鍵來實現(xiàn)消息的有選擇性地路由傳遞(訂閱廣告)

有一個即時消息系統(tǒng)或者新聞訂閱系統(tǒng)需要將消息或新聞推送給多個用戶参淹,這時可以采用發(fā)布/訂閱模式醉锄。生產(chǎn)者將消息發(fā)送到交換機 (exchange),交換機將消息廣播給多個訂閱者浙值,訂閱者通過綁定交換機來接收消息恳不。在這種模式下,每個訂閱者都會接收到相同的消息开呐,可以采用不同的隊列來對訂閱者進行分組烟勋,以實現(xiàn)訂閱者之間的隔離和不同級別的消息推送

  • 工作模式和發(fā)布訂閱模式的區(qū)別?
    在工作模式中筐付,多個消費者可以并行地處理來自同一個隊列的任務(wù)
    每個消息只能由一個消費者處理神妹,但是多個消息可以同時被多個消費者處理
    在發(fā)布訂閱模式中,雖然消息會被廣播到多個訂閱者家妆,但是每個訂閱者獨立地接收和處理消息鸵荠。因此,不同的訂閱者可以并行地處理自己接收到的消息

  • 路由模式和發(fā)布訂閱模式的區(qū)別伤极?
    發(fā)布訂閱Fanout(廣播)模式:在 Fanout 模式中蛹找,交換機會將消息廣播到綁定的所有隊列。無論隊列是否有不同的路由鍵或其他綁定條件哨坪,交換機都會將消息傳遞給所有綁定的隊列庸疾。
    路由Redirect(重定向)模式:在 Redirect 模式中,交換機會將消息發(fā)送到1個特定的隊列而不是廣播到所有綁定的隊列当编。這種模式適用于需要將消息定向發(fā)送到1個特定的隊列

創(chuàng)建用戶和創(chuàng)建交換機
創(chuàng)建用戶:創(chuàng)建用戶名届慈、密碼
創(chuàng)建交換機:
1、選擇虛擬主機
2忿偷、選擇交換機類型
3金顿、持久性……
在創(chuàng)建交換機時,durability 是一個可選的參數(shù)鲤桥,用于配置交換機的持久性揍拆。持久性指的是當消息代理(如 RabbitMQ)重新啟動時,交換機是否仍然存在茶凳。如果將 durability 設(shè)置為 true嫂拴,交換機將被標記為持久的,即使消息代理重新啟動贮喧,交換機也將保留筒狠。這意味著在消息代理恢復(fù)正常運行后,交換機將保持不變
如果你的應(yīng)用程序需要頻繁地更改或刪除交換機箱沦,并且你不希望在每次更改時手動刪除或重新創(chuàng)建交換機辩恼,那么可以選擇非持久化交換機
4、交換機綁定隊列
發(fā)布消息并設(shè)置消息屬性
在 RabbitMQ 的 Java 客戶端 API 中,可以在發(fā)布消息時設(shè)置這些消息屬性运挫。
下面是一個示例代碼,展示了如何設(shè)置消息的持久性套耕、優(yōu)先級和過期時間

// 創(chuàng)建連接和通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();

// 創(chuàng)建消息屬性
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
        .deliveryMode(2)  // 設(shè)置消息的持久性
        .priority(5)  // 設(shè)置消息的優(yōu)先級
        .expiration("10000")  // 設(shè)置消息的過期時間
        .build();

// 發(fā)布消息
String exchangeName = "exchange1";
String routingKey = "routingKey1";
String message = "Hello, RabbitMQ!";
channel.basicPublish(exchangeName, routingKey, properties, message.getBytes());

// 關(guān)閉連接和通道
channel.close();
connection.close();

在上述代碼中谁帕,首先創(chuàng)建了連接和通道。然后使用 AMQP.BasicProperties.Builder 類來創(chuàng)建消息屬性對象冯袍,通過鏈式調(diào)用方法來設(shè)置屬性匈挖,例如使用 deliveryMode 方法設(shè)置消息的持久性(1非持久2持久),使用 priority 方法設(shè)置消息的優(yōu)先級康愤,使用 expiration 方法設(shè)置消息的過期時間儡循。最后,調(diào)用 basicPublish 方法發(fā)布消息時征冷,將消息屬性對象作為參數(shù)傳遞择膝,即可在消息中包含相應(yīng)的屬性。

使用RabbitMQ傳遞對象
發(fā)送和接收的都是字符串/字節(jié)數(shù)組類型的消息

  • 使用序列化對象

消息提供者

@Service
public class MQService {
   @Resource
   private AmqpTemplate amqpTemplate;

   public void sendGoodsToMq(Goods goods){
   //消息隊列可以發(fā)送 字符串检激、字節(jié)數(shù)組肴捉、序列化對象
   byte[] bytes = SerializationUtils.serialize(goods);
   amqpTemplate.convertAndSend("","queue1",bytes);
   }
}

消息消費者

@Component
@RabbitListener(queues = "queue1")
public class ReceiveService {

   @RabbitHandler
   public void receiveMsg(byte[] bs){
   Goods goods = (Goods) SerializationUtils.deserialize(bs);
   System.out.println("byte[]---"+goods);
   }
}

PS:為什么要實現(xiàn)序列化接口?
1、易于傳輸和存儲:將 Java 對象序列化為字節(jié)數(shù)組后叔收,可以將其轉(zhuǎn)換為不同的格式齿穗,例如 JSON 或 XML,用于傳輸?shù)讲煌南到y(tǒng)之間饺律。字節(jié)數(shù)組還可以在數(shù)據(jù)庫中進行持久化存儲窃页,以便以后檢索和使用。
2复濒、平臺無關(guān)性:通過使用序列化對象脖卖,可以將 Java 對象轉(zhuǎn)換為平臺無關(guān)的字節(jié)序列,這樣它們可以在不同的系統(tǒng)之間進行傳輸和存儲巧颈,包括跨語言和跨平臺的場景胚嘲。
3、可擴展性:可以在對象中添加新字段和屬性洛二,并保證可以與舊版本兼容馋劈,以便在不同版本的應(yīng)用程序之間進行傳輸和存儲。這可以通過維護對象的序列化版本來實現(xiàn)晾嘶。
4妓雾、高效性:通過序列化對象,可以減少在網(wǎng)絡(luò)或磁盤 I/O 期間傳輸數(shù)據(jù)的大小垒迂,從而提高傳輸效率并減少網(wǎng)絡(luò)流量械姻。這是因為序列化過程通常會去除對象中的一些額外信息或元數(shù)據(jù),只保留必要的數(shù)據(jù)以及用于恢復(fù)對象的必要信息机断。

  • 使用JSON字符串傳遞

消息提供者

@Service
public class MQService {
   @Resource
   private AmqpTemplate amqpTemplate;
   public void sendGoodsToMq(Goods goods) throws 
JsonProcessingException {
    //消息隊列可以發(fā)送 字符串楷拳、字節(jié)數(shù)組绣夺、序列化對象
    ObjectMapper objectMapper = new ObjectMapper();
    String msg = objectMapper.writeValueAsString(goods);
    amqpTemplate.convertAndSend("","queue1",msg);
  }
}

RabbitMQ事務(wù)、消息確認和return機制欢揖、手動ACK
RabbitMQ事務(wù)指的是基于客戶端實現(xiàn)的事務(wù)管理陶耍,當在消息發(fā)送過程中添加了事務(wù),處理效率降低幾十倍甚至上百倍

Connection connection = RabbitMQUtil.getConnection(); //connection 表
示與 host1的連接

Channel channel = connection.createChannel();

channel.txSelect(); //開啟事務(wù)

try{
 channel.basicPublish("ex4", "k1", null, msg.getBytes());
 channel.txCommit(); //提交事務(wù)

}catch (Exception e){
 channel.txRollback(); //事務(wù)回滾

}finally{
 channel.close();
 connection.close();
}

同步等待:為了實現(xiàn)事務(wù)的原子性她混,事務(wù)提交是一個同步操作烈钞,即發(fā)送端會等待事務(wù)提交的結(jié)果,只有在返回提交成功的響應(yīng)后才會繼續(xù)發(fā)送下一條消息坤按。
重復(fù)操作:如果發(fā)送端在發(fā)送消息的過程中出現(xiàn)錯誤或異常毯欣,事務(wù)會回滾并且消息會重發(fā),這種方式可以保證消息的可靠性傳遞
所以臭脓,在實際場景中酗钞,如果對消息的實時性要求較高或?qū)ο⑻幚淼耐掏铝坑休^高要求,建議盡量避免使用事務(wù)

PS:消息吞吐量(Message Throughput):消息吞吐量指的是系統(tǒng)在單位時間內(nèi)處理的消息量来累,對于需要高吞吐量的應(yīng)用來說算吩,高效地傳遞和處理消息是非常重要的。使用事務(wù)機制會對消息發(fā)送的性能造成一定的影響佃扼,因為事務(wù)機制需要等待事務(wù)提交的確認偎巢,會增加發(fā)送消息的延遲。

事務(wù)如何確保消息發(fā)送的可靠性?
消息的可靠性:從 生產(chǎn)者發(fā)送消息 —— 消息隊列存儲消息 —— 消費者消費消息的整個過程中消息的安全性及可控性

RabbitMQ提供了消息確認機制及return機制
意義:消息確認機制可以幫助發(fā)送方確保消息成功發(fā)送到交換機兼耀,并可以跟蹤消息是否被一個或多個消費者成功消費压昼。Return 機制則是確保了消息成功從交換機分發(fā)到隊列的過程,當消息無法成功路由到隊列時瘤运,消息會被 Return給消息提供者

  • 消息確認機制:當消息提供者將消息發(fā)送到交換機時窍霞,交換機會對消息提供者進行反饋(到達交換機)
  • return機制:當消息達到交換機之后,交換機會將消息分發(fā)到隊列拯坟,MQ會將分發(fā)的結(jié)果也會反饋給消息提供者(到達隊列)

在 RabbitMQ 中使用消息確認機制可以確保消息被成功發(fā)送到交換機但金,并被一個或多個消費者成功消費。

RabbitMQ 提供了同步和異步兩種消息確認方式

同步消息確認指的是郁季,在消息發(fā)送之后冷溃,發(fā)送方會阻塞等待確認結(jié)果。具體來說梦裂,發(fā)送方通過 waitForConfirms() 方法等待 RabbitMQ 發(fā)送確認結(jié)果似枕,直到接收到確認結(jié)果或者超時時間到達。如果確認結(jié)果為消息成功發(fā)送到交換機并被一個或多個消費者消費年柠,則 waitForConfirms()方法返回 true凿歼,否則返回 false。

異步消息確認指的是,在消息發(fā)送之后答憔,發(fā)送方不會阻塞等待確認結(jié)果味赃。相反,它通過添加ConfirmListener來異步處理確認回調(diào)虐拓。確認回調(diào)會在消息成功發(fā)送到交換機并被一個或多個消費者消費或者發(fā)送失敗時觸發(fā)心俗。

異步消息確認相較于同步消息確認,其對消息發(fā)送方更加友好侯嘀。使用異步確認的優(yōu)點是可以提高發(fā)送消息的吞吐量,因為發(fā)送方能夠繼續(xù)發(fā)送下一條消息谱轨,而不需要等待每條消息的確認結(jié)果戒幔。另外,異步確認還可以通過監(jiān)聽確認回調(diào)在消息發(fā)送失敗時及時發(fā)現(xiàn)并處理這種情況土童,而同步確認則只能通過超時等待的方式判斷消息是否發(fā)送成功诗茎。

普通maven項目的消息確認機制
下面是 RabbitMQ 中 Java 客戶端使用消息確認和批量發(fā)送消息的示例代碼,以及使用確認監(jiān)聽器異步處理確認結(jié)果:

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class MessageSender {
    private static final String EXCHANGE_NAME = "ex1";
    private static final String ROUTING_KEY = "a";
    private static final String MESSAGE = "Hello, RabbitMQ!";
    private static final int MESSAGE_COUNT = 10;

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        //開啟消息確Z
        channel.confirmSelect();

        //批量發(fā)送消息
        for (int i=0 ; i<MESSAGE_COUNT ; i++) {
            channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, MESSAGE.getBytes());
        }献汗、
        //假如發(fā)送消息需要10s敢订,waitForConfirms會進?阻塞狀態(tài)(同步)
        //boolean b = channel.waitForConfirms();

        //使用監(jiān)聽器異步處理確認結(jié)果
        channel.addConfirmListener(new ConfirmListener() {
            //參數(shù)1:long deliveryTag 返回消息的標識
            //參數(shù)2:boolean multiple 是否為批量confirm
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("~~~~~消息成功發(fā)送到交換機");
            }

            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("~~~~~消息發(fā)送到交換機失敗");
            }
        });
    }
}

當調(diào)用 channel.waitForConfirms() 時,它會阻塞當前線程罢吃,直到 RabbitMQ 確認消息是否成功發(fā)送到隊列或交換機楚午。
換句話說,在以下情況下尿招,waitForConfirms() 會阻塞程序繼續(xù)執(zhí)行:

  • 當你發(fā)送一條消息后矾柜,還沒有收到 RabbitMQ 的確認消息;
  • 當發(fā)送消息時發(fā)生異常就谜,如網(wǎng)絡(luò)中斷或發(fā)送超時怪蔑;
  • 當你設(shè)置了批量發(fā)送消息的模式,并且尚未收到足夠數(shù)量的消息確認丧荐,或者超過了設(shè)置的超時時間缆瓣。
    簡而言之,waitForConfirms() 會等待確認消息的返回虹统,確保消息成功發(fā)送到 RabbitMQ弓坞。
    消息確認機制不光監(jiān)聽成功的消息同時也監(jiān)聽失敗的消息

spring項目的return機制

@Component
public class MyReturnListener implements RabbitTemplate.ReturnsCallback {
 @Autowired
 private AmqpTemplate amqpTemplate;

 @Autowired
 private RabbitTemplate rabbitTemplate;

 @PostConstruct
 public void init(){
   rabbitTemplate.setReturnsCallback(this);
 }
 @Override
  public void returnedMessage(ReturnedMessage returnedMessage) {
     System.out.println("消息從交換機分發(fā)到隊列失敗");
     String exchange = returnedMessage.getExchange();
     String routingKey = returnedMessage.getRoutingKey();
     String msg = returnedMessage.getMessage().toString();
     amqpTemplate.convertAndSend(exchange,routingKey,msg);
  }
}

在上述代碼中,returnedMessage方法是用于處理消息從交換機分發(fā)到隊列失敗的回調(diào)方法车荔。當消息無法路由到指定的隊列時昼丑,會觸發(fā)該方法。
下面是代碼中各個參數(shù)的含義:
returnedMessage對象:表示被退回的消息的相關(guān)信息夸赫。
getExchange():獲取退回消息所使用的交換機名稱菩帝。
getRoutingKey():獲取退回消息時所使用的路由鍵。
getMessage():獲取退回的消息對象,可以通過toString()方法將其轉(zhuǎn)換為字符串呼奢。
在該方法中宜雀,它首先打印了一條消息提示,表示消息從交換機分發(fā)到隊列失敗握础。然后辐董,通過 amqpTemplate.convertAndSend(exchange,routingKey, msg) 方法將消息重新發(fā)送到指定的交換機和路由鍵,以便重新分發(fā)消息

RabbitMQ消費者手動應(yīng)答(手動ACK)

@Component
@RabbitListener(queues = "queue01")
public class Consumer1 {

    @RabbitHandler
    public void process(String msg, Channel channel, Message message) throws IOException {
        try {
            System.out.println("成功接收到消息 msg = " + msg);
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
            System.err.println("接收消息失敗 msg = " + msg);
        }
    }
}

在上述代碼中禀综,channel.basicAck 是用于手動確認消息的方法简烘,用于告知 RabbitMQ 已經(jīng)成功處理(消費)了一個消息。下面是 channel.basicAck 方法的各個參數(shù)的詳細說明:

  1. deliveryTag:這是一個消息的唯一標識符定枷。每個消息都有一個獨特的 deliveryTag孤澎,用于標識消息的順序。通過 message.getMessageProperties().getDeliveryTag() 可以獲取當前處理消息的 deliveryTag欠窒。
    PS:每條消息在 RabbitMQ 中都會被分配一個唯一deliveryTag覆旭,用于標識消息的順序。deliveryTag 是一個連續(xù)遞增的正整數(shù)岖妄,每次消費一條消息時型将,deliveryTag 會被分配并隨消息一起傳遞給消費者。消費者在處理完一條消息后荐虐,需要對該消息進行手動 ACK 才能告知 RabbitMQ 已經(jīng)成功處理了該消息七兜,此時會將該消息的 deliveryTag 返回給 RabbitMQ 作為確認

  2. multiple:這是一個布爾值,用于指定是否確認多個消息福扬。對于簡單地確認單個消息惊搏,可以將其設(shè)置為 false。如果要確認多個消息忧换,可以將其設(shè)置為 true恬惯,這將確認所有比當前 deliveryTag 小(包括當前 deliveryTag)的未確認消息亚茬。
    PS:在消息隊列系統(tǒng)中酪耳,通常會有多個消息同時被發(fā)送到隊列中等待消費。當消費者處理消息時刹缝,通常會以一批消息的方式進行確認碗暗,而非逐條確認。因此梢夯,multiple參數(shù)的存在是為了提供一種批量確認消息的機制

    在這種情況下言疗,如果設(shè)置為 true,則表示當前消息之前的所有消息也被認為已經(jīng)處理完成颂砸,全部移除噪奄。如果設(shè)置為 false死姚,則僅確認當前 deliveryTag 的消息,然后將其從隊列中移除勤篮。通常設(shè)置為false都毒。

  3. requeue:這是一個布爾值,用于指定當確認消息時碰缔,是否將消息重新加入到隊列账劲。如果設(shè)置為 false,則代表不重新加入隊列金抡,即認為消息已被完全處理瀑焦。如果設(shè)置為 true,則代表將消息重新加入隊列梗肝,以便重新被消費榛瓮。通常情況下,如果消息處理成功统捶,則設(shè)置為 false榆芦,如果消息處理失敗柄粹,則設(shè)置為 true喘鸟。

在上述代碼中,第一個 channel.basicAck 用于確認消息已經(jīng)成功處理驻右,同時使用了 message.getMessageProperties().getDeliveryTag() 來獲取當前消息的 deliveryTag 值什黑。
而第二個 channel.basicAck 是在處理消息出現(xiàn)異常時的回退機制。它使用與第一個 channel.basicAck 相同的參數(shù)堪夭,以確認消息處理失敗愕把,然后將消息重新加入隊列(requeue 設(shè)置為 true),以便重新消費森爽。同時恨豁,使用了 message.getMessageProperties().getDeliveryTag() 獲取當前消息的 deliveryTag 值。

手動ACK的意義體現(xiàn)在以下幾個方面:
1爬迟、確遍倜郏可靠性:消費者在處理消息時,可能會發(fā)生異掣杜唬或出現(xiàn)錯誤的情況计福。如果沒有手動ACK機制,當消費者在處理消息過程中出現(xiàn)異常時徽职,消息隊列將無法得知消息是否被成功處理象颖,可能會導(dǎo)致消息的丟失重復(fù)消費。手動ACK可以確保消費者在處理消息成功后再發(fā)送確認姆钉,從而確保消息的可靠性说订。
2抄瓦、提高消息處理效率:手動ACK機制可以確保消息隊列在收到消費者的ACK確認后,【將該消息從隊列中移除】克蚂,從而減少隊列的負荷闺鲸。通過手動ACK,消費者可以在處理完一條消息后立即發(fā)送確認埃叭,減少了消息在隊列中滯留的時間摸恍,提高了消息的處理效率

手動ACK與消息確認機制和return機制之間的區(qū)別赤屋?
手動 ACK 與消息確認機制:手動 ACK 是消費者在處理完一條消息后立镶,向 RabbitMQ 發(fā)送確認信號,告知 RabbitMQ 已成功處理該消息的機制类早。通過手動 ACK媚媒,消費者可以明確地通知 RabbitMQ 消息已經(jīng)被處理,避免消息丟失重復(fù)消費的情況發(fā)生涩僻,手動ACK是消費端的機制缭召。消息確認機制則是生產(chǎn)者端的機制,用于確認消息已經(jīng)被發(fā)送到交換機并被消費者接收逆日,確保消息的可靠傳遞嵌巷。手動 ACK 和消息確認機制共同保證了消息在生產(chǎn)者和消費者之間的可靠傳遞。

手動 ACK 與消息返回機制:消息返回機制會在消息無法被路由到隊列時將消息返回給生產(chǎn)者室抽。在這種情況下搪哪,消費者可能無法接收到消息,因此消費者不會發(fā)送 ACK坪圾。手動 ACK 可以確保消費者在成功處理消息后再發(fā)送確認信號晓折,而不是直接丟棄未處理的消息。通過手動 ACK兽泄,消費者能夠在實際處理消息后再確認消息的處理情況漓概,提高了消息傳遞的完整性和可靠性

消息的冪等性問題
要在 RabbitMQ 中實現(xiàn)冪等性,可以使用 Redis 的 SETNX`(Set if Not Exists)命令來進行輔助病梢。

  • 意義:
    確保消息被處理一次而不會重復(fù)處理
    確保消息處理的唯一性和正確性

SETNX 是 Redis 提供的一種原子性命令胃珍,用于設(shè)置指定鍵的值,僅當該鍵不存在時才會設(shè)置成功飘千。在 RabbitMQ 的消息處理中堂鲜,可以將消息的唯一標識或關(guān)鍵信息作為鍵,將已處理消息的標記作為值存儲在 Redis 中护奈,并利用 SETNX 來保證只有第一次處理消息時才會成功設(shè)置標記缔莲,從而保證消息的冪等性

下面是一個示例代碼:

@Component
@RabbitListener(queues = "queue01")
public class Consumer1 {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    private final String LOCK_PREFIX = "messageLock:";
    private final long LOCK_EXPIRE_TIME = 60 * 1000; // 鎖的過期時間,單位毫秒

    @RabbitHandler
    public void process(String msg, Channel channel, Message message) throws IOException {
        // 使用消息的唯一標識作為鎖的 key
        String lockKey = LOCK_PREFIX + message.getMessageProperties().getDeliveryTag();
        Boolean isLocked = false;

        try {
            // 嘗試獲取鎖
            isLocked = redisTemplate.opsForValue().setIfAbsent(lockKey, "");
            // 設(shè)置鎖的過期時間
            redisTemplate.expire(lockKey, LOCK_EXPIRE_TIME, TimeUnit.MILLISECONDS);

            // 如果成功獲取到了鎖霉旗,則處理消息
            if (isLocked) {
                System.out.println("成功獲取到鎖并處理消息 msg = " + msg);
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            } else {
                System.out.println("未獲取到鎖痴奏,消息已被其他消費者處理 msg = " + msg);
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            }
        } catch (Exception e) {
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
            System.err.println("處理消息失敗 msg = " + msg);
        } finally {
            // 釋放鎖
            if (isLocked) {
                redisTemplate.delete(lockKey);
            }
        }
    }
}
  1. 添加了 RedisTemplate<String, Object> 對象的自動裝配蛀骇,該對象用于與 Redis 進行交互。
  2. 聲明了一個以 messageLock: 作為前綴的鎖定鍵读拆,使用消息的唯一標識作為鎖的唯一標識擅憔。
  3. 使用 RedisTemplateopsForValue().setIfAbsent() 方法嘗試獲取鎖,如果成功獲取到鎖則繼續(xù)處理該消息檐晕,否則說明該消息已被其他消費者處理暑诸。
  4. 使用 redisTemplate.expire() 設(shè)置鎖的過期時間,避免死鎖辟灰。
  5. finally 塊中个榕,如果成功獲取到了鎖,則釋放鎖芥喇。

延遲機制
未完待續(xù)

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末西采,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子继控,更是在濱河造成了極大的恐慌械馆,老刑警劉巖,帶你破解...
    沈念sama閱讀 206,968評論 6 482
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件武通,死亡現(xiàn)場離奇詭異霹崎,居然都是意外死亡,警方通過查閱死者的電腦和手機厅须,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,601評論 2 382
  • 文/潘曉璐 我一進店門仿畸,熙熙樓的掌柜王于貴愁眉苦臉地迎上來食棕,“玉大人朗和,你說我怎么就攤上這事〔鞠” “怎么了眶拉?”我有些...
    開封第一講書人閱讀 153,220評論 0 344
  • 文/不壞的土叔 我叫張陵,是天一觀的道長憔儿。 經(jīng)常有香客問我忆植,道長,這世上最難降的妖魔是什么谒臼? 我笑而不...
    開封第一講書人閱讀 55,416評論 1 279
  • 正文 為了忘掉前任朝刊,我火速辦了婚禮,結(jié)果婚禮上蜈缤,老公的妹妹穿的比我還像新娘拾氓。我一直安慰自己,他們只是感情好底哥,可當我...
    茶點故事閱讀 64,425評論 5 374
  • 文/花漫 我一把揭開白布咙鞍。 她就那樣靜靜地躺著房官,像睡著了一般。 火紅的嫁衣襯著肌膚如雪续滋。 梳的紋絲不亂的頭發(fā)上翰守,一...
    開封第一講書人閱讀 49,144評論 1 285
  • 那天,我揣著相機與錄音疲酌,去河邊找鬼蜡峰。 笑死,一個胖子當著我的面吹牛朗恳,可吹牛的內(nèi)容都是我干的事示。 我是一名探鬼主播,決...
    沈念sama閱讀 38,432評論 3 401
  • 文/蒼蘭香墨 我猛地睜開眼僻肖,長吁一口氣:“原來是場噩夢啊……” “哼肖爵!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起臀脏,我...
    開封第一講書人閱讀 37,088評論 0 261
  • 序言:老撾萬榮一對情侶失蹤劝堪,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后揉稚,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體秒啦,經(jīng)...
    沈念sama閱讀 43,586評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,028評論 2 325
  • 正文 我和宋清朗相戀三年搀玖,在試婚紗的時候發(fā)現(xiàn)自己被綠了余境。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,137評論 1 334
  • 序言:一個原本活蹦亂跳的男人離奇死亡灌诅,死狀恐怖芳来,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情猜拾,我是刑警寧澤即舌,帶...
    沈念sama閱讀 33,783評論 4 324
  • 正文 年R本政府宣布,位于F島的核電站挎袜,受9級特大地震影響顽聂,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜盯仪,卻給世界環(huán)境...
    茶點故事閱讀 39,343評論 3 307
  • 文/蒙蒙 一紊搪、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧全景,春花似錦耀石、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,333評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽奔浅。三九已至,卻和暖如春诗良,著一層夾襖步出監(jiān)牢的瞬間汹桦,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,559評論 1 262
  • 我被黑心中介騙來泰國打工鉴裹, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留舞骆,地道東北人。 一個月前我還...
    沈念sama閱讀 45,595評論 2 355
  • 正文 我出身青樓径荔,卻偏偏與公主長得像督禽,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子总处,可洞房花燭夜當晚...
    茶點故事閱讀 42,901評論 2 345

推薦閱讀更多精彩內(nèi)容

  • 悲觀鎖 有鎖狈惫,每次只有一個人能訪問某個數(shù)據(jù),這和Golang中的互斥鎖一樣鹦马。 如果是在分布式下使用鎖胧谈,可以使用如 ...
    bysir閱讀 533評論 0 1
  • 什么是冪等性? HTTP/1.1中對冪等性的定義是:一次和多次請求某一個資源對于資源本身應(yīng)該具有同樣的結(jié)果(網(wǎng)絡(luò)超...
    侯華明閱讀 1,346評論 0 0
  • 一分鐘教你知道樂觀鎖和悲觀鎖的區(qū)別 悲觀鎖(Pessimistic Lock), 顧名思義,就是很悲觀旭从,每次去拿數(shù)...
    php紅薯閱讀 4,542評論 2 58
  • 樂觀鎖和悲觀鎖 樂觀鎖:使用無鎖結(jié)構(gòu),無非是對發(fā)生沖突保有樂觀態(tài)度摹闽,覺得大多數(shù)情況下沖突不會發(fā)生蹄咖,一旦發(fā)生就采取重...
    L千年老妖閱讀 2,275評論 0 0
  • 在數(shù)據(jù)庫的鎖機制中介紹過褐健,數(shù)據(jù)庫管理系統(tǒng)(DBMS)中的并發(fā)控制的任務(wù)是確保在多個事務(wù)同時存取數(shù)據(jù)庫中同一數(shù)據(jù)時不...
    java成功之路閱讀 289評論 0 1