RabbitMq 消息發(fā)布/確認(rèn) 以及對(duì)確認(rèn)失敗的消息的處理

這部分沒有涉及到交換機(jī),所以一個(gè)消息只能被消費(fèi)一次,多個(gè)消費(fèi)者之間是競爭關(guān)系


image.png

1、連接rabbitMq

pom文件


    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

    <dependencies>
        <!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.14.2</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/commons-io/commons-io -->
        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
            <version>2.11.0</version>
        </dependency>


    </dependencies>

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
 * @Author: yokipang
 * @Date: 2022/5/10
 * 連接工廠創(chuàng)建信道
 */
public class RabbitMqUtils {
    public static Channel getChannel() throws  Exception{
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.1.xx");
        factory.setUsername("xxx");
        factory.setPassword("xxxxx");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        return  channel;
    }

}

2贤惯、具體方法

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmCallback;
import utils.RabbitMqUtils;

import java.util.UUID;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

public class ConfirmMessage {

    public static final int message_count = 1000;


    public static void main(String[] args) throws Exception {
        //單個(gè)確認(rèn) 690ms
        //publishMessage1();

        //批量確認(rèn) 160ms
        //publishMessage2();

        //異步批量確認(rèn)
        publishMessage3();
    }

    //單個(gè)確認(rèn)
    public static  void publishMessage1() throws Exception{
        Channel channel = RabbitMqUtils.getChannel();
        String queueName = UUID.randomUUID().toString();
        /**
         * 聲明隊(duì)列
         * 參數(shù)1 隊(duì)列名稱
         * 參數(shù)2 消息是否持久化
         * 參數(shù)3 是否可以由多個(gè)消費(fèi)者消費(fèi)
         * 參數(shù)4 是否自動(dòng)刪除
         * 參數(shù)5 其他
         */
        channel.queueDeclare(queueName,true,false,false,null);

        //開啟發(fā)布、確認(rèn)
        channel.confirmSelect();

        //開始時(shí)間
        long begin = System.currentTimeMillis();

        for (int i = 0 ; i<message_count;i++){
            String message = i+"";
            channel.basicPublish("",queueName,null,message.getBytes());
            boolean flag = channel.waitForConfirms();
            if(flag){
                System.out.println("消息發(fā)送成功");
            }
        }

        long end = System.currentTimeMillis();

        System.out.println("所需時(shí)間:"+(end-begin)+"ms");

    }


    //批量發(fā)布確認(rèn)
    public static  void publishMessage2() throws Exception{
        Channel channel = RabbitMqUtils.getChannel();
        String queueName = UUID.randomUUID().toString();
        /**
         * 聲明隊(duì)列
         * 參數(shù)1 隊(duì)列名稱
         * 參數(shù)2 消息是否持久化
         * 參數(shù)3 是否可以由多個(gè)消費(fèi)者消費(fèi)
         * 參數(shù)4 是否自動(dòng)刪除
         * 參數(shù)5 其他
         */
        channel.queueDeclare(queueName,true,false,false,null);

        //開啟發(fā)布棒掠、確認(rèn)
        channel.confirmSelect();

        //開始時(shí)間
        long begin = System.currentTimeMillis();

        //批量確認(rèn)數(shù)量
        int batchSize = 100;


        for (int i = 0 ; i<message_count;i++){
            String message = i+"";
            channel.basicPublish("",queueName,null,message.getBytes());

            if( i % batchSize == 0){
                channel.waitForConfirms();
            }
        }

        long end = System.currentTimeMillis();
        System.out.println("所需時(shí)間:"+(end-begin)+"ms");

    }



    //異步批量發(fā)布確認(rèn)
    public static  void publishMessage3() throws Exception{
        Channel channel = RabbitMqUtils.getChannel();
        String queueName = UUID.randomUUID().toString();
        /**
         * 聲明隊(duì)列
         * 參數(shù)1 隊(duì)列名稱
         * 參數(shù)2 消息是否持久化
         * 參數(shù)3 是否可以由多個(gè)消費(fèi)者消費(fèi)
         * 參數(shù)4 是否自動(dòng)刪除
         * 參數(shù)5 其他
         */
        channel.queueDeclare(queueName,true,false,false,null);
        //開啟發(fā)布孵构、確認(rèn)
        channel.confirmSelect();

        /**
         * 線程安全有序的哈希表 適用于高并發(fā)情況
         * 1、輕松地將序號(hào)與消息進(jìn)行關(guān)聯(lián)
         * 2烟很、輕松的批量刪除條目 只要給到序號(hào)
         * 3颈墅、支持高并發(fā)(多線程)
         */
        ConcurrentSkipListMap<Long,String> concurrentSkipListMap = new ConcurrentSkipListMap<>();

        //消息發(fā)送前準(zhǔn)備監(jiān)聽器 監(jiān)聽消息發(fā)送狀態(tài)
        /**
         * 消息確認(rèn)成功函數(shù)
         * 1、消息標(biāo)記
         * 2溯职、是否批量
         */
        ConfirmCallback ackConfirmCallback =(deliveryTag,multiple)->{
            //是否批量處理
            if(multiple){
                //刪除掉已確認(rèn)的消息 剩下的就是未成功發(fā)送的消息
                ConcurrentNavigableMap<Long,String> confirmd = concurrentSkipListMap.headMap(deliveryTag);
                confirmd.clear();
            }else{
                concurrentSkipListMap.remove(deliveryTag);
            }
            System.out.println("確認(rèn)成功的消息:"+ deliveryTag);
        };

        /**
         * 消息確認(rèn)失敗函數(shù)
         * 1精盅、消息標(biāo)記
         * 2帽哑、是否批量
         */
        ConfirmCallback notAckConfirmCallback =(deliveryTag,multiple)->{
            String message = concurrentSkipListMap.get(deliveryTag);
            System.out.println("確認(rèn)失敗的消息:"+message +"   消息標(biāo)記" + deliveryTag);
        };

        //開始時(shí)間
        long begin = System.currentTimeMillis();
        //監(jiān)聽器
        channel.addConfirmListener(ackConfirmCallback,notAckConfirmCallback);

        //發(fā)送消息
        for (int i = 0 ; i<message_count;i++){
            String message = i+"";
            channel.basicPublish("",queueName,null,message.getBytes());
            //記錄所有要發(fā)送的消息
            concurrentSkipListMap.put(channel.getNextPublishSeqNo(),message);
        }

        long end = System.currentTimeMillis();
        System.out.println("所需時(shí)間:"+(end-begin)+"ms");

    }
}


最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末谜酒,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子妻枕,更是在濱河造成了極大的恐慌僻族,老刑警劉巖,帶你破解...
    沈念sama閱讀 212,599評(píng)論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件屡谐,死亡現(xiàn)場(chǎng)離奇詭異述么,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)愕掏,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,629評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門度秘,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人饵撑,你說我怎么就攤上這事剑梳。” “怎么了滑潘?”我有些...
    開封第一講書人閱讀 158,084評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵垢乙,是天一觀的道長。 經(jīng)常有香客問我语卤,道長追逮,這世上最難降的妖魔是什么酪刀? 我笑而不...
    開封第一講書人閱讀 56,708評(píng)論 1 284
  • 正文 為了忘掉前任,我火速辦了婚禮钮孵,結(jié)果婚禮上骂倘,老公的妹妹穿的比我還像新娘。我一直安慰自己巴席,他們只是感情好稠茂,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,813評(píng)論 6 386
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著情妖,像睡著了一般睬关。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上毡证,一...
    開封第一講書人閱讀 50,021評(píng)論 1 291
  • 那天电爹,我揣著相機(jī)與錄音,去河邊找鬼料睛。 笑死丐箩,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的恤煞。 我是一名探鬼主播屎勘,決...
    沈念sama閱讀 39,120評(píng)論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼居扒!你這毒婦竟也來了概漱?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,866評(píng)論 0 268
  • 序言:老撾萬榮一對(duì)情侶失蹤喜喂,失蹤者是張志新(化名)和其女友劉穎瓤摧,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體玉吁,經(jīng)...
    沈念sama閱讀 44,308評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡照弥,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,633評(píng)論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了进副。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片这揣。...
    茶點(diǎn)故事閱讀 38,768評(píng)論 1 341
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖影斑,靈堂內(nèi)的尸體忽然破棺而出给赞,到底是詐尸還是另有隱情,我是刑警寧澤鸥昏,帶...
    沈念sama閱讀 34,461評(píng)論 4 333
  • 正文 年R本政府宣布塞俱,位于F島的核電站,受9級(jí)特大地震影響吏垮,放射性物質(zhì)發(fā)生泄漏障涯。R本人自食惡果不足惜罐旗,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 40,094評(píng)論 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望唯蝶。 院中可真熱鬧九秀,春花似錦、人聲如沸粘我。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,850評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽征字。三九已至都弹,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間匙姜,已是汗流浹背畅厢。 一陣腳步聲響...
    開封第一講書人閱讀 32,082評(píng)論 1 267
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留氮昧,地道東北人框杜。 一個(gè)月前我還...
    沈念sama閱讀 46,571評(píng)論 2 362
  • 正文 我出身青樓,卻偏偏與公主長得像袖肥,于是被迫代替她去往敵國和親咪辱。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,666評(píng)論 2 350

推薦閱讀更多精彩內(nèi)容