RocketMq應用實踐

一紧阔、使用rocketmq-spring-boot-starter組件

1、添加依賴


<!-- rocketmq依賴 -->

<dependency>

  <groupId>org.apache.rocketmq</groupId>

  <artifactId>rocketmq-spring-boot-starter</artifactId>

  <version>2.2.2</version>

</dependency>

2烤黍、添加配置

rocketmq:
  nameServer: xx.xx.xx.xx:9876
  producer:
    group: test-group # 必須指定group
    send-message-timeout: 3000 # 消息發(fā)送超時時長知市,默認3s
    retry-times-when-send-failed: 3 # 同步發(fā)送消息失敗重試次數,默認2
    retry-times-when-send-async-failed: 3 # 異步發(fā)送消息失敗重試次數速蕊,默認2
  consumer:
    group: test-group
  topic: test-topic

3嫂丙、配置生產者

常用方法匯總,根據實際需要進行封裝

import com.alibaba.fastjson.JSON;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

@Component
public class MQProducerService {

    @Value("${rocketmq.producer.send-message-timeout}")
    private Integer messageTimeOut;

    // 直接注入使用规哲,用于發(fā)送消息到broker服務器
    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    /**
     * 發(fā)送同步消息跟啤,異步消費(阻塞當前線程,等待broker響應發(fā)送結果唉锌,這樣不太容易丟失消息)
     * (msgBody也可以是對象隅肥,sendResult為返回的發(fā)送結果)
     */
    public SendResult sendMsg(String topic,String msgBody) {
        SendResult sendResult = rocketMQTemplate.syncSend(topic, MessageBuilder.withPayload(msgBody).build());
        return sendResult;
    }
    public SendResult sendKeyMsg(String topic,String msgBody,String key){
        SendResult sendResult = rocketMQTemplate.syncSend(topic, MessageBuilder.withPayload(msgBody).setHeader("KEYS",key).build());
        return sendResult;
    }

    /**
     * 發(fā)送異步消息(通過線程池執(zhí)行發(fā)送到broker的消息任務,執(zhí)行完后回調:在SendCallback中可處理相關成功失敗時的邏輯)
     * (適合對響應時間敏感的業(yè)務場景)
     */
    public void sendAsyncMsg(String topic,String msgBody) {
        rocketMQTemplate.asyncSend(topic, MessageBuilder.withPayload(msgBody).build(), new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                // 處理消息發(fā)送成功邏輯
            }
            @Override
            public void onException(Throwable throwable) {
                // 處理消息發(fā)送異常邏輯
            }
        });
    }
    public void sendAsyncKeyMsg(String topic,String msgBody,String key) {
        rocketMQTemplate.asyncSend(topic, MessageBuilder.withPayload(msgBody).setHeader("KEYS",key).build(), new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                // 處理消息發(fā)送成功邏輯
            }
            @Override
            public void onException(Throwable throwable) {
                // 處理消息發(fā)送異常邏輯
            }
        });
    }

    /**
     * 發(fā)送延時消息(上面的發(fā)送同步消息袄简,delayLevel的值就為0腥放,因為不延時)
     * 在start版本中 延時消息一共分為18個等級分別為:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
     */
    public void sendDelayMsg(String topic,String msgBody, int delayLevel) {
        rocketMQTemplate.syncSend(topic, MessageBuilder.withPayload(msgBody).build(), messageTimeOut, delayLevel);
    }
    public void sendDelayKeyMsg(String topic,String msgBody, int delayLevel,String key) {
        rocketMQTemplate.syncSend(topic, MessageBuilder.withPayload(msgBody).setHeader("KEYS",key).build(), messageTimeOut, delayLevel);
    }

    /**
     * 發(fā)送單向消息(只負責發(fā)送消息,不等待應答绿语,不關心發(fā)送結果秃症,如日志)
     */
    public void sendOneWayMsg(String topic,String msgBody) {
        rocketMQTemplate.sendOneWay(topic, MessageBuilder.withPayload(msgBody).build());
    }
    public void sendOneWayKeyMsg(String topic,String msgBody,String key) {
        rocketMQTemplate.sendOneWay(topic, MessageBuilder.withPayload(msgBody).setHeader("KEYS",key).build());
    }

    /**
     * 發(fā)送帶tag的消息候址,直接在topic后面加上":tag"
     */
    public SendResult sendTagMsg(String topic,String msgBody,String tag) {
        return rocketMQTemplate.syncSend(topic + ":"+tag, MessageBuilder.withPayload(msgBody).build());
    }
    public SendResult sendTagKeyMsg(String topic,String msgBody,String tag,String key) {
        return rocketMQTemplate.syncSend(topic + ":"+tag, MessageBuilder.withPayload(msgBody).setHeader("KEYS",key).build());
    }
}

4、配置消費者

@RocketMQMessageListener(topic = "${rocketmq.topic}",nameServer = "${rocketmq.nameServer}",consumerGroup = "${rocketmq.consumer.group}")
@Component
public class ComsumerListener implements RocketMQListener<String> {

    @Override
    public void onMessage(String str) {
        try {
            Thread.sleep(1000);
            System.out.println("睡眠結束");
        }catch (Exception e){
            System.out.println("異常");
        }
        System.out.println("接收信息:"+str);

    }
}

5种柑、測試推送/消費mq

@RestController("/mq")
public class RocketMqTestController {
    @Autowired
    private MQProducerService mqProducerService;

    @GetMapping(value = "/sned")
    @ResponseBody
    public Boolean snedMqMsg(){
        mqProducerService.sendMsg("test-topic","發(fā)送mq消息:1111");
        System.out.println("發(fā)送成功");
        return true;
    }
}

測試結果:


image.png

二宗雇、使用rocketmq-client組件

1、添加依賴


<dependency>

  <groupId>org.apache.rocketmq</groupId>

  <artifactId>rocketmq-client</artifactId>

  <version>4.9.1</version>

</dependency>

2莹规、添加配置

rocketmq:
  config:
    topic: test-topic
    nameServerAddress: xx.xx.xxx.xxx:9876;xx.xx.xxx.xxx:9876
    consumerGroupId: test-group

添加配置類

@Component
public class RocketMqProducerConfig {
    private static String nameServerAddress;
    private static String topic;
    private static String groupId;



    @Value("${rocketmq.nameServerAddress}")
    private String appAddress;
    @Value("${rocketmq.config.topic}")
    private String appTopic;
    @Value("${rocketmq.esSync.consumerGroupId}")
    private String appGroupId;
    

    @PostConstruct
    public void getConfig() {
        topic = this.appTopic;
        nameServerAddress = this.appAddress;
        groupId = this.appGroupId;
    }

    public static String getNameServerAddress() {
        return nameServerAddress;
    }
    public static String getTopic() {
        return topic;
    }
    public static String getGroupId() {
        return groupId;
    }
 
}

添加消息dto

public class MqMsgDto implements Serializable {
    private String pushId;

    public String getPushId() {
        return pushId;
    }
    public void setPushId(String pushId) {
        this.pushId = pushId;
    }

}

3、配置生產者

@Service
@DependsOn("rocketMqProducerConfig")
public class CalendarPushMqServiceImpl implements ICalendarPushMqApi {
    protected static final Logger logger = LoggerFactory.getLogger(CalendarPushMqServiceImpl.class);
    private static DefaultMQProducer rocketProducer = null;

static {
        try {
            // 實例化消息生產者Producer
            rocketProducer = new DefaultMQProducer(RocketMqProducerConfig.getGroupId());
            // 設置NameServer的地址
            rocketProducer.setNamesrvAddr(RocketMqProducerConfig.getNameServerAddress());
            rocketProducer.start();
            logger.error("rocketMq生產者初始化成功");
        } catch (Exception e) {
            logger.error("rocketMq生產者初始化失斆谏瘛:", e);
        }
  }

    public void pushMsg(MqMsgDto mqMsgDto) {
        try {
            Message message = new Message();
            message.setTopic(RocketMqProducerConfig.getTopic());
            message.setBody(JsonUtil.json(mqMsgDto).getBytes());
            message.setKeys(calendarMqMsgDto.getCalendarId());
            message.setTags(Long.toString(new Date().getTime()));
            //延時消費,第二個等級-5s
            message.setDelayTimeLevel(2);
            rocketProducer.send(message);
        } catch (Exception e) {
            logger.error("推送mq出錯:", e);
        }
    }
}

4良漱、配置消費者

@Configuration
public class RocketMqConfiguration {
    protected static final Logger logger = LoggerFactory.getLogger(RocketMqConfiguration.class);
    /**
     * -推送開放搜索-普通消費隊列
     * */
    @Bean
    public DefaultMQPushConsumer testConsumer() {
        try {
            // 實例化消費者
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(RocketMqProducerConfig.getGroupId());
            // 設置NameServer的地址
            consumer.setNamesrvAddr(RocketMqProducerConfig.getNameServerAddress());
            // 訂閱一個或者多個Topic,以及Tag來過濾需要消費的消息
            consumer.subscribe(RocketMqProducerConfig.getTopic(), "*");
            consumer.registerMessageListener(new RocketMqMessageListener());
            consumer.start();
            logger.info("初始化消費者成功");
            return consumer;
        }catch (Exception e){
            logger.error("初始化消費隊列失敾都省:",e);
        }
        return null;
    }
}

監(jiān)聽器配置

@Component
public class RocketMqMessageListener implements MessageListenerConcurrently {

    private static final Logger logger = LoggerFactory.getLogger(RocketMqMessageListener.class);

    private Test test= SpringBeanUtil.getBean(Test.class);
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        if(CollectionUtils.isEmpty(list)){
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
        for (MessageExt message : list) {
            String mdmqMessage = null;
            MqMsgDto dto = null;
            try {
                mdmqMessage = new String(message.getBody(), StandardCharsets.UTF_8);
                dto = JSON.parseObject(mdmqMessage, MqMsgDto.class);
                if (Objects.isNull(dto)){
                    continue;
                }
               logger.error("接收信息:"+JSON.toJSONString(dto));
               test.update(dto);
            }catch (Exception e){
                logger.error("接收無效消息母市,自動過濾",e);
                continue;
            }
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}
最后編輯于
?著作權歸作者所有,轉載或內容合作請聯系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市损趋,隨后出現的幾起案子患久,更是在濱河造成了極大的恐慌,老刑警劉巖浑槽,帶你破解...
    沈念sama閱讀 206,723評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件蒋失,死亡現場離奇詭異,居然都是意外死亡桐玻,警方通過查閱死者的電腦和手機篙挽,發(fā)現死者居然都...
    沈念sama閱讀 88,485評論 2 382
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來镊靴,“玉大人铣卡,你說我怎么就攤上這事∑梗” “怎么了煮落?”我有些...
    開封第一講書人閱讀 152,998評論 0 344
  • 文/不壞的土叔 我叫張陵,是天一觀的道長踊谋。 經常有香客問我蝉仇,道長,這世上最難降的妖魔是什么褪子? 我笑而不...
    開封第一講書人閱讀 55,323評論 1 279
  • 正文 為了忘掉前任量淌,我火速辦了婚禮,結果婚禮上嫌褪,老公的妹妹穿的比我還像新娘呀枢。我一直安慰自己,他們只是感情好笼痛,可當我...
    茶點故事閱讀 64,355評論 5 374
  • 文/花漫 我一把揭開白布裙秋。 她就那樣靜靜地躺著琅拌,像睡著了一般。 火紅的嫁衣襯著肌膚如雪摘刑。 梳的紋絲不亂的頭發(fā)上进宝,一...
    開封第一講書人閱讀 49,079評論 1 285
  • 那天,我揣著相機與錄音枷恕,去河邊找鬼党晋。 笑死,一個胖子當著我的面吹牛徐块,可吹牛的內容都是我干的未玻。 我是一名探鬼主播,決...
    沈念sama閱讀 38,389評論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼胡控,長吁一口氣:“原來是場噩夢啊……” “哼扳剿!你這毒婦竟也來了?” 一聲冷哼從身側響起昼激,我...
    開封第一講書人閱讀 37,019評論 0 259
  • 序言:老撾萬榮一對情侶失蹤庇绽,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后橙困,有當地人在樹林里發(fā)現了一具尸體瞧掺,經...
    沈念sama閱讀 43,519評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 35,971評論 2 325
  • 正文 我和宋清朗相戀三年纷宇,在試婚紗的時候發(fā)現自己被綠了夸盟。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,100評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡像捶,死狀恐怖上陕,靈堂內的尸體忽然破棺而出,到底是詐尸還是另有隱情拓春,我是刑警寧澤释簿,帶...
    沈念sama閱讀 33,738評論 4 324
  • 正文 年R本政府宣布,位于F島的核電站硼莽,受9級特大地震影響庶溶,放射性物質發(fā)生泄漏。R本人自食惡果不足惜懂鸵,卻給世界環(huán)境...
    茶點故事閱讀 39,293評論 3 307
  • 文/蒙蒙 一偏螺、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧匆光,春花似錦套像、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,289評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽贞让。三九已至,卻和暖如春柳譬,著一層夾襖步出監(jiān)牢的瞬間喳张,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,517評論 1 262
  • 我被黑心中介騙來泰國打工美澳, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留销部,地道東北人。 一個月前我還...
    沈念sama閱讀 45,547評論 2 354
  • 正文 我出身青樓制跟,卻偏偏與公主長得像柴墩,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子凫岖,可洞房花燭夜當晚...
    茶點故事閱讀 42,834評論 2 345