RocketMq訂閱與發(fā)送消息

使用阿里云的rocketMq訂閱與發(fā)送消息,直接上代碼:

1.引入pom依賴

<dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>ons-client</artifactId>
    <version>1.8.8.Final</version>
</dependency>

2.新建RocketMqUtil類

package com.**.***.***.utils;

import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.bean.ConsumerBean;
import com.aliyun.openservices.ons.api.bean.ProducerBean;
import com.aliyun.openservices.ons.api.bean.Subscription;
import com.***.***.***.listener.MqMessageListener;
import com.***.***.***.listener.MqTimeMessageListener;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Slf4j
@Configuration
public class RocketMqUtil {

    @Value("${rocketMq.groupId}")
    private String groupId;

    @Value("${rocketMq.accessKey}")
    private String accessKey;

    @Value("${rocketMq.secretKey}")
    private String secretKey;

    @Value("${rocketMq.nameSrvAddr}")
    private String nameSrvAddr;

    @Value("${rocketMq.topic}")
    private String topic;

    @Value("${rocketMq.startApprovalTag}")
    private String startApprovalTag;

    @Value("${rocketMq.timeout}")
    private String timeout;

    @Autowired
    private MqMessageListener messageListener;

    @Autowired
    private MqTimeMessageListener timeMessageListener;

    /**
     * 創(chuàng)建消息生產(chǎn)者
     * @return
     */
    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public Producer producer() {
        Properties properties = new Properties();
        properties.put(PropertyKeyConst.NAMESRV_ADDR, nameSrvAddr);
        properties.put(PropertyKeyConst.AccessKey, accessKey);
        properties.put(PropertyKeyConst.SecretKey, secretKey);
        properties.put(PropertyKeyConst.GROUP_ID, groupId);
        properties.put(PropertyKeyConst.SendMsgTimeoutMillis, timeout);

        ProducerBean producer = new ProducerBean();
        producer.setProperties(properties);

        log.info("rocketMq創(chuàng)建生產(chǎn)者成功");
        return producer;
    }


    /**
     * 創(chuàng)建消息訂閱
     * @return
     */
    @Bean(initMethod = "start")
    public ConsumerBean consumer() {
        ConsumerBean consumerBean = new ConsumerBean();
        Properties properties = new Properties();
        properties.setProperty(PropertyKeyConst.GROUP_ID, groupId);
        properties.put(PropertyKeyConst.AccessKey, accessKey);
        properties.put(PropertyKeyConst.SecretKey, secretKey);
        properties.put(PropertyKeyConst.NAMESRV_ADDR, nameSrvAddr);

        consumerBean.setProperties(properties);

        // 訂閱消息
        Map<Subscription, MessageListener> subscriptionTable = new HashMap<Subscription, MessageListener>();
        // 訂閱普通消息
        Subscription subscription = new Subscription();
        subscription.setTopic(topic);
        subscription.setExpression("start_approve||process_approve");
        subscriptionTable.put(subscription, messageListener);
        // 訂閱定時/延時消息
        Subscription subscriptionTime = new Subscription();
        subscriptionTime.setTopic(topic);
        subscriptionTime.setExpression("start_approve||process_approve");
        subscriptionTable.put(subscriptionTime, timeMessageListener);

        consumerBean.setSubscriptionTable(subscriptionTable);
        log.info("rocketMq訂閱成功");
        return consumerBean;
    }
}

3.普通消息監(jiān)聽類MqMessageListener

package com.***.***.***.listener;

import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;


@Component
public class MqMessageListener implements MessageListener {

    private Logger logger = LoggerFactory.getLogger(this.getClass());

    @Override
    public Action consume(Message message, ConsumeContext context) {
        logger.info("接收到MQ普通消息 -- Topic:{}, tag:{},msgId:{} , Key:{}, body:{}",
                    message.getTopic(), message.getTag(), message.getMsgID(),message.getKey(), new String(message.getBody()));
        try {
            // 處理業(yè)務
            return Action.CommitMessage;
        } catch (Exception e) {
            logger.error("消費MQ消息失敗耍休! msgId:" + message.getMsgID()+"----ExceptionMsg:"+e.getMessage());
            return Action.ReconsumeLater;
        }
    }

}

4.異步/定時/延時消息監(jiān)聽類

package com.***.***.***.listener;

import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;


@Component
public class MqTimeMessageListener implements MessageListener {

    private Logger logger = LoggerFactory.getLogger(this.getClass());

    @Value("${rocketMq.startedApprovalTag}")
    private String startedApprovalTag;

    @Value("${rocketMq.processedApprove}")
    private String processedApprove;


    @Override
    public Action consume(Message message, ConsumeContext context) {
        logger.info("接收到MQ定時/延時消息 -- Topic:{}, tag:{},msgId:{} , Key:{}, body:{}",
                    message.getTopic(), message.getTag(), message.getMsgID(),message.getKey(), new String(message.getBody()));


        try {
            // 處理業(yè)務

        } catch (Exception e) {

            logger.error("消費MQ消息失斄⌒蟆崭添! msgId:" + message.getMsgID()+"----ExceptionMsg:"+e.getMessage());
        } 
        return Action.CommitMessage;
    }




}

5.消息發(fā)送RocketMqProducer

package com.***.***.***.utils;

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.OnExceptionContext;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.SendCallback;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.bean.ProducerBean;
import com.aliyun.openservices.ons.api.exception.ONSClientException;
import com.aliyun.openservices.shade.com.alibaba.fastjson.JSON;
import java.util.Properties;
import javax.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;


@Component
@Slf4j
public class RocketMqProducer {

    @Value("${rocketMq.groupId}")
    private String groupId;

    @Value("${rocketMq.sendMegGroup}")
    private String sendMegGroup;

    @Value("${rocketMq.accessKey}")
    private String accessKey;

    @Value("${rocketMq.secretKey}")
    private String secretKey;

    @Value("${rocketMq.nameSrvAddr}")
    private String nameSrvAddr;

    @Value("${rocketMq.topic}")
    private String topic;

    @Value("${rocketMq.startApprovalTag}")
    private String startApprovalTag;

    @Value("${rocketMq.timeout}")
    private String timeout;

    @Resource
    private ProducerBean producer;


    /**
     * 發(fā)送異步消息
     * @param tag
     * @param msgKey
     * @param messageBody
     */
    public void sendAsyncMsg(String tag, String msgKey, byte[] messageBody) {
        Properties properties = new Properties();
        properties.setProperty(PropertyKeyConst.GROUP_ID, sendMegGroup);
        properties.put(PropertyKeyConst.AccessKey, accessKey);
        properties.put(PropertyKeyConst.SecretKey, secretKey);
        properties.put(PropertyKeyConst.NAMESRV_ADDR, nameSrvAddr);
        properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, timeout);

        Message msg = new Message(topic, tag, msgKey, messageBody);
        try {
            producer.sendAsync(msg, new SendCallback() {
                @Override
                public void onSuccess(final SendResult sendResult) {
                    log.info("rocketMq發(fā)送成功澈蟆,msg:{}", JSON.toJSONString(msg));
                }

                @Override
                public void onException(final OnExceptionContext context) {
                    log.info("rocketMq發(fā)送失斚韬觥:tag:{},topic:{},body:{}", tag, context.getTopic(), new String(messageBody), context.getException());
                    // todo 持久化失敗消息雾家,定時補償
                }
            });
        } catch (ONSClientException e) {
            log.info("rocketMq發(fā)送異常:", e);
        }
    }
}


6.調(diào)用消息發(fā)送

@Autowired
private RocketMqProducer rocketMqProducer;
rocketMqProducer.sendAsyncMsg("tag", "msgKey", "msg";
?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末铃彰,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子芯咧,更是在濱河造成了極大的恐慌牙捉,老刑警劉巖,帶你破解...
    沈念sama閱讀 217,734評論 6 505
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件敬飒,死亡現(xiàn)場離奇詭異邪铲,居然都是意外死亡,警方通過查閱死者的電腦和手機无拗,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,931評論 3 394
  • 文/潘曉璐 我一進店門带到,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人英染,你說我怎么就攤上這事揽惹。” “怎么了四康?”我有些...
    開封第一講書人閱讀 164,133評論 0 354
  • 文/不壞的土叔 我叫張陵搪搏,是天一觀的道長。 經(jīng)常有香客問我闪金,道長疯溺,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,532評論 1 293
  • 正文 為了忘掉前任哎垦,我火速辦了婚禮喝检,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘撼泛。我一直安慰自己挠说,他們只是感情好,可當我...
    茶點故事閱讀 67,585評論 6 392
  • 文/花漫 我一把揭開白布愿题。 她就那樣靜靜地躺著损俭,像睡著了一般蛙奖。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上杆兵,一...
    開封第一講書人閱讀 51,462評論 1 302
  • 那天雁仲,我揣著相機與錄音,去河邊找鬼琐脏。 笑死攒砖,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的日裙。 我是一名探鬼主播吹艇,決...
    沈念sama閱讀 40,262評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼昂拂!你這毒婦竟也來了受神?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,153評論 0 276
  • 序言:老撾萬榮一對情侶失蹤格侯,失蹤者是張志新(化名)和其女友劉穎鼻听,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體联四,經(jīng)...
    沈念sama閱讀 45,587評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡撑碴,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,792評論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了朝墩。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片醉拓。...
    茶點故事閱讀 39,919評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖鱼辙,靈堂內(nèi)的尸體忽然破棺而出廉嚼,到底是詐尸還是另有隱情,我是刑警寧澤倒戏,帶...
    沈念sama閱讀 35,635評論 5 345
  • 正文 年R本政府宣布怠噪,位于F島的核電站,受9級特大地震影響杜跷,放射性物質(zhì)發(fā)生泄漏傍念。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,237評論 3 329
  • 文/蒙蒙 一葛闷、第九天 我趴在偏房一處隱蔽的房頂上張望憋槐。 院中可真熱鬧,春花似錦淑趾、人聲如沸阳仔。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,855評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽近范。三九已至嘶摊,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間评矩,已是汗流浹背叶堆。 一陣腳步聲響...
    開封第一講書人閱讀 32,983評論 1 269
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留斥杜,地道東北人虱颗。 一個月前我還...
    沈念sama閱讀 48,048評論 3 370
  • 正文 我出身青樓,卻偏偏與公主長得像蔗喂,于是被迫代替她去往敵國和親忘渔。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 44,864評論 2 354

推薦閱讀更多精彩內(nèi)容