RabbitMQ筆記十五:消息確認(rèn)之一(Publisher Confirms)

問題

企業(yè)中使用消息中間件面臨的常見問題:
1.消息莫名其妙的沒了旭旭,也不知道什么情況盖彭,有丟消息的問題。
2.發(fā)送者沒法確認(rèn)是否發(fā)送成功蚀乔,消費(fèi)者處理失敗也無法反饋烁竭。

消息可靠性的二種方式
1.事務(wù),利用AMQP協(xié)議的一部分吉挣,發(fā)送消息前設(shè)置channel為tx模式(channel.txSelect();)派撕,如果txCommit提交成功了,則消息一定到達(dá)了broker了睬魂,如果在txCommit執(zhí)行之前broker異常崩潰或者由于其他原因拋出異常终吼,這個(gè)時(shí)候我們便可以捕獲異常通過txRollback回滾事務(wù)了。(大大得削弱消息中間件的性能)
2.消息確認(rèn)(publish confirms)氯哮,設(shè)置管道為confirmSelect模式(channel.confirmSelect();)

publisher confirms,consumer Acknowledgements

生產(chǎn)者與broker之間的消息確認(rèn)稱為public confirms际跪,public confirms機(jī)制用于解決生產(chǎn)者與Rabbitmq服務(wù)器之間消息可靠傳輸,它在消息服務(wù)器持久化消息后通知消息生產(chǎn)者發(fā)送成功喉钢。

發(fā)送確認(rèn)(publisher confirms)

RabbitMQ java Client實(shí)現(xiàn)發(fā)送確認(rèn)

deliveryTag(投遞的標(biāo)識(shí))垫卤,當(dāng)Channel設(shè)置成confirm模式時(shí),發(fā)布的每一條消息都會(huì)獲得一個(gè)唯一的deliveryTag出牧,任何channel上發(fā)布的第一條消息的deliveryTag為1,此后的每一條消息都會(huì)加1歇盼,deliveryTag在channel范圍內(nèi)是唯一的舔痕。

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;

public class Send {

    static Long id = 0L;

    static TreeSet<Long> tags = new TreeSet<>();

    public static Long send(Channel channel,byte[] bytes) throws Exception{
        AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().deliveryMode(2).
                contentEncoding("UTF-8").build();
        channel.basicPublish("zhihao.direct.exchange","zhihao.miao.order",properties,bytes);
        return ++id;
    }


    public static void main(String[] args) throws Exception{
        ConnectionFactory connectionFactory = new ConnectionFactory();

        connectionFactory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");

        Connection connection = connectionFactory.newConnection();

        Channel channel = connection.createChannel();

        //是當(dāng)前的channel處于確認(rèn)模式
        channel.confirmSelect();

        //使當(dāng)前的channel處于事務(wù)模式,與上面的使channel處于確認(rèn)模式使互斥的
        //channel.txSelect();

        /**
         * deliveryTag 消息id
         * multiple 是否批量
         *      如果是true豹缀,就意味著伯复,小于等于deliveryTag的消息都處理成功了
         *      如果是false,只是成功了deliveryTag這一條消息
         */
        channel.addConfirmListener(new ConfirmListener() {
            //消息發(fā)送成功并且在broker落地邢笙,deliveryTag是唯一標(biāo)志符啸如,在channek上發(fā)布的消息的deliveryTag都會(huì)比之前加1
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("=========deliveryTag==========");
                System.out.println("deliveryTag: "+deliveryTag);
                System.out.println("multiple: "+multiple);
                //處理成功發(fā)送的消息
                if(multiple){
                    //批量操作
                    for(Long _id:new TreeSet<>(tags.headSet(deliveryTag+1))){
                        tags.remove(_id);
                    }
                }else{
                    //單個(gè)確認(rèn)
                    tags.remove(deliveryTag);
                }

                System.out.println("未處理的消息: "+tags);
            }

            /**
             * deliveryTag 消息id
             * multiple 是否批量
             *      如果是true,就意味著氮惯,小于等于deliveryTag的消息都處理失敗了
             *      如果是false叮雳,只是失敗了deliveryTag這一條消息
             */
            //消息發(fā)送失敗或者落地失敗
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("===========handleNack===========");
                System.out.println("deliveryTag: "+deliveryTag);
                System.out.println("multiple: "+multiple);
            }
        });

        /**
         * 當(dāng)Channel設(shè)置成confirm模式時(shí)想暗,發(fā)布的每一條消息都會(huì)獲得一個(gè)唯一的deliveryTag
         * deliveryTag在basicPublish執(zhí)行的時(shí)候加1
         */


        Long id = send(channel,"你的外賣已經(jīng)送達(dá)".getBytes());
        tags.add(id);
        //channel.waitForConfirms();

        id =send(channel,"你的外賣已經(jīng)送達(dá)".getBytes());
        tags.add(id);
        //channel.waitForConfirms();

        id = send(channel,"呵呵,不接電話".getBytes());
        tags.add(id);
        //channel.waitForConfirms();  

        TimeUnit.SECONDS.sleep(10);

        channel.close();
        connection.close();
    }
}

channel.waitForConfirms():表示等待已經(jīng)發(fā)送給broker的消息act或者nack之后才會(huì)繼續(xù)執(zhí)行帘不。
channel.waitForConfirmsOrDie():表示等待已經(jīng)發(fā)送給broker的消息act或者nack之后才會(huì)繼續(xù)執(zhí)行说莫,如果有任何一個(gè)消息觸發(fā)了nack則拋出IOException。

總結(jié)
生產(chǎn)者與broker之間的消息可靠性保證的基本思路就是

  • 當(dāng)消息發(fā)送到broker的時(shí)候寞焙,會(huì)執(zhí)行監(jiān)聽的回調(diào)函數(shù)储狭,其中deliveryTag是消息id(在同一個(gè)channel中這個(gè)數(shù)值是遞增的,而multiple表示是否批量確認(rèn)消息捣郊。
  • 在生產(chǎn)端要維護(hù)一個(gè)消息發(fā)送的表辽狈,消息發(fā)送的時(shí)候記錄消息id,在消息成功落地broker磁盤并且進(jìn)行回調(diào)確認(rèn)(ack)的時(shí)候呛牲,根據(jù)本地消息表和回調(diào)確認(rèn)的消息id進(jìn)行對(duì)比刮萌,這樣可以確保生產(chǎn)端的消息表中的沒有進(jìn)行回調(diào)確認(rèn)(或者回調(diào)確認(rèn)時(shí)網(wǎng)絡(luò)問題)的消息進(jìn)行補(bǔ)救式的重發(fā),當(dāng)然不可避免的就會(huì)在消息端可能會(huì)造成消息的重復(fù)消息侈净。針對(duì)消費(fèi)端重復(fù)消息尊勿,在消費(fèi)端進(jìn)行冪等處理。(丟消息和重復(fù)消息是不可避免的二個(gè)極端畜侦,比起丟消息元扔,重復(fù)消息還有補(bǔ)救措施,而消息丟失就真的丟失了旋膳。

Spring AMQP實(shí)現(xiàn)實(shí)現(xiàn)發(fā)送確認(rèn)

示列
定義消息內(nèi)容

public class Order {

    private String orderId;

    private String createTime;

    private double price;

    public String getOrderId() {
        return orderId;
    }

    public void setOrderId(String orderId) {
        this.orderId = orderId;
    }

    public String getCreateTime() {
        return createTime;
    }

    public void setCreateTime(String createTime) {
        this.createTime = createTime;
    }

    public double getPrice() {
        return price;
    }

    public void setPrice(double price) {
        this.price = price;
    }
}

配置項(xiàng):

import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MQConfig {

    @Bean
    public ConnectionFactory connectionFactory(){
        CachingConnectionFactory factory = new CachingConnectionFactory();
        factory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
        factory.setPublisherConfirms(true);
        return factory;
    }

    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        return rabbitAdmin;
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {

            /**
             * @param correlationData 唯一標(biāo)識(shí)澎语,有了這個(gè)唯一標(biāo)識(shí),我們就知道可以確認(rèn)(失斞榘谩)哪一條消息了
             * @param ack
             * @param cause
             */
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                System.out.println("=====消息進(jìn)行消費(fèi)了======");
                if(ack){
                    System.out.println("消息id為: "+correlationData+"的消息擅羞,已經(jīng)被ack成功");
                }else{
                    System.out.println("消息id為: "+correlationData+"的消息,消息nack义图,失敗原因是:"+cause);
                }
            }
        });
        return rabbitTemplate;
    }

}

啟動(dòng)應(yīng)用類:

import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.ComponentScan;

import java.time.LocalDateTime;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

@ComponentScan
public class Application {

    public static Order createOrder(){
        Order order = new Order();
        order.setOrderId(UUID.randomUUID().toString());
        order.setCreateTime(LocalDateTime.now().toString());
        order.setPrice(100L);
        return order;
    }

    public static void saveOrder(Order order){
        //入庫操作
        System.out.println("入庫操作");
    }

    public static void main(String[] args) throws Exception{
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);

        RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);

        Order order  = createOrder();

        saveOrder(order);

        ObjectMapper objectMapper = new ObjectMapper();
        byte[] body = objectMapper.writeValueAsBytes(order);

        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setContentType("json");

        Message message = new Message(body,messageProperties);

        System.out.println("id: "+order.getOrderId());

        //指定correlationData的值
        rabbitTemplate.send("zhihao.direct.exchange","zhihao.miao.order",message,new CorrelationData(order.getOrderId().toString()));

        TimeUnit.SECONDS.sleep(10);

        context.close();
    }
}

控制臺(tái)打釉群濉:

入庫操作
id: 11bc9eb3-fbcb-4777-9596-b6f6db81cafc
十月 22, 2017 7:14:14 下午 org.springframework.amqp.rabbit.connection.CachingConnectionFactory createBareConnection
信息: Created new connection: connectionFactory#50ad3bc1:0/SimpleConnection@4efc180e [delegate=amqp://zhihao.miao@192.168.1.131:5672/, localPort= 61095]
=====消息進(jìn)行消費(fèi)了======
消息id為: CorrelationData [id=11bc9eb3-fbcb-4777-9596-b6f6db81cafc]的消息,已經(jīng)被ack成功

原理其實(shí)和java client是一樣的舰讹,我們?cè)诎l(fā)送消息的時(shí)候落地本地的消息表(有表示confirm字段)北苟,然后進(jìn)行回調(diào)確認(rèn)的方法中進(jìn)行狀態(tài)的更新,最后輪詢表中狀態(tài)不正確的消息進(jìn)行輪詢重發(fā)怕篷。

步驟

  • 在容器中的ConnectionFactory實(shí)例中加上setPublisherConfirms屬性
    factory.setPublisherConfirms(true);
  • 在RabbitTemplate實(shí)例中增加setConfirmCallback回調(diào)方法历筝。
  • 發(fā)送消息的時(shí)候,需要指定CorrelationData廊谓,用于標(biāo)識(shí)該發(fā)送的唯一id梳猪。

對(duì)比與java client的publisher confirm:
1.spring amqp不支持批量確認(rèn),底層的rabbitmq java client方式支持批量確認(rèn)蒸痹。
2.spring amqp提供的方式更加的簡單明了春弥。

參考資料

關(guān)于另外一種Publisher Confirms事務(wù)機(jī)制可以參考下面這篇博客呛哟,很是簡單
深入學(xué)習(xí)RabbitMQ(二):AMQP事務(wù)機(jī)制

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市惕稻,隨后出現(xiàn)的幾起案子竖共,更是在濱河造成了極大的恐慌,老刑警劉巖俺祠,帶你破解...
    沈念sama閱讀 223,207評(píng)論 6 521
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件公给,死亡現(xiàn)場離奇詭異,居然都是意外死亡蜘渣,警方通過查閱死者的電腦和手機(jī)淌铐,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 95,455評(píng)論 3 400
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來蔫缸,“玉大人腿准,你說我怎么就攤上這事∈奥担” “怎么了吐葱?”我有些...
    開封第一講書人閱讀 170,031評(píng)論 0 366
  • 文/不壞的土叔 我叫張陵,是天一觀的道長校翔。 經(jīng)常有香客問我弟跑,道長,這世上最難降的妖魔是什么防症? 我笑而不...
    開封第一講書人閱讀 60,334評(píng)論 1 300
  • 正文 為了忘掉前任孟辑,我火速辦了婚禮,結(jié)果婚禮上蔫敲,老公的妹妹穿的比我還像新娘饲嗽。我一直安慰自己,他們只是感情好奈嘿,可當(dāng)我...
    茶點(diǎn)故事閱讀 69,322評(píng)論 6 398
  • 文/花漫 我一把揭開白布貌虾。 她就那樣靜靜地躺著,像睡著了一般裙犹。 火紅的嫁衣襯著肌膚如雪酝惧。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 52,895評(píng)論 1 314
  • 那天伯诬,我揣著相機(jī)與錄音,去河邊找鬼巫财。 笑死盗似,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的平项。 我是一名探鬼主播赫舒,決...
    沈念sama閱讀 41,300評(píng)論 3 424
  • 文/蒼蘭香墨 我猛地睜開眼悍及,長吁一口氣:“原來是場噩夢(mèng)啊……” “哼!你這毒婦竟也來了接癌?” 一聲冷哼從身側(cè)響起心赶,我...
    開封第一講書人閱讀 40,264評(píng)論 0 277
  • 序言:老撾萬榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎缺猛,沒想到半個(gè)月后缨叫,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,784評(píng)論 1 321
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡荔燎,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,870評(píng)論 3 343
  • 正文 我和宋清朗相戀三年耻姥,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片有咨。...
    茶點(diǎn)故事閱讀 40,989評(píng)論 1 354
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡琐簇,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出座享,到底是詐尸還是另有隱情婉商,我是刑警寧澤,帶...
    沈念sama閱讀 36,649評(píng)論 5 351
  • 正文 年R本政府宣布渣叛,位于F島的核電站丈秩,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏诗箍。R本人自食惡果不足惜癣籽,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 42,331評(píng)論 3 336
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望滤祖。 院中可真熱鬧筷狼,春花似錦、人聲如沸匠童。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,814評(píng)論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽汤求。三九已至俏险,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間扬绪,已是汗流浹背竖独。 一陣腳步聲響...
    開封第一講書人閱讀 33,940評(píng)論 1 275
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留挤牛,地道東北人莹痢。 一個(gè)月前我還...
    沈念sama閱讀 49,452評(píng)論 3 379
  • 正文 我出身青樓,卻偏偏與公主長得像,于是被迫代替她去往敵國和親竞膳。 傳聞我的和親對(duì)象是個(gè)殘疾皇子航瞭,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,995評(píng)論 2 361

推薦閱讀更多精彩內(nèi)容

  • 來源 RabbitMQ是用Erlang實(shí)現(xiàn)的一個(gè)高并發(fā)高可靠AMQP消息隊(duì)列服務(wù)器。支持消息的持久化坦辟、事務(wù)刊侯、擁塞控...
    jiangmo閱讀 10,369評(píng)論 2 34
  • 1.什么是消息隊(duì)列 消息隊(duì)列允許應(yīng)用間通過消息的發(fā)送與接收的方式進(jìn)行通信,當(dāng)消息接收方服務(wù)忙或不可用時(shí)锉走,其提供了一...
    zhuke閱讀 4,476評(píng)論 0 12
  • 本文章翻譯自http://www.rabbitmq.com/api-guide.html滨彻,并沒有及時(shí)更新。 術(shù)語對(duì)...
    joyenlee閱讀 7,678評(píng)論 0 3
  • 1. 歷史 RabbitMQ是一個(gè)由erlang開發(fā)的AMQP(Advanced Message Queue )的...
    高廣超閱讀 6,098評(píng)論 3 51
  • 關(guān)于消息隊(duì)列挠日,從前年開始斷斷續(xù)續(xù)看了些資料疮绷,想寫很久了,但一直沒騰出空嚣潜,近來分別碰到幾個(gè)朋友聊這塊的技術(shù)選型冬骚,是時(shí)...
    預(yù)流閱讀 584,978評(píng)論 51 786