RocketMQ 生產(chǎn)者

學(xué)習(xí)完整課程請(qǐng)移步 互聯(lián)網(wǎng) Java 全棧工程師

本節(jié)視頻

概述

RocketMQ 是一款開(kāi)源的分布式消息系統(tǒng)聋亡,基于高可用分布式集群技術(shù)抡砂,提供低延時(shí)的胚迫、高可靠的消息發(fā)布與訂閱服務(wù)。

由于本教程整個(gè)案例基于 Spring Cloud测秸,故我們采用 Spring Cloud Stream 完成一次發(fā)布和訂閱

官方教程

Spring Cloud Stream

Spring Cloud Stream 是一個(gè)用于構(gòu)建基于消息的微服務(wù)應(yīng)用框架拓售。它基于 Spring Boot 來(lái)創(chuàng)建具有生產(chǎn)級(jí)別的單機(jī) Spring 應(yīng)用瞬浓,并且使用 Spring Integration 與 Broker 進(jìn)行連接惠遏。

Spring Cloud Stream 提供了消息中間件配置的統(tǒng)一抽象,推出了 publish-subscribe拄养、consumer groups离斩、partition 這些統(tǒng)一的概念银舱。

Spring Cloud Stream 內(nèi)部有兩個(gè)概念:

  • Binder: 跟外部消息中間件集成的組件,用來(lái)創(chuàng)建 Binding跛梗,各消息中間件都有自己的 Binder 實(shí)現(xiàn)寻馏。
  • Binding: 包括 Input Binding 和 Output Binding。

Binding 在消息中間件與應(yīng)用程序提供的 Provider 和 Consumer 之間提供了一個(gè)橋梁核偿,實(shí)現(xiàn)了開(kāi)發(fā)者只需使用應(yīng)用程序的 Provider 或 Consumer 生產(chǎn)或消費(fèi)數(shù)據(jù)即可诚欠,屏蔽了開(kāi)發(fā)者與底層消息中間件的接觸。

解決連接超時(shí)問(wèn)題

在之前的 基于 Docker 安裝 RocketMQ 章節(jié)中漾岳,我們采用 Docker 部署了 RocketMQ 服務(wù)轰绵,此時(shí) RocketMQ Broker 暴露的地址和端口(10909,10911)是基于容器的尼荆,會(huì)導(dǎo)致我們開(kāi)發(fā)機(jī)無(wú)法連接左腔,從而引發(fā) org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException: sendDefaultImpl call timeout 異常

注意下圖中的 IP 地址,這個(gè)是容器的 IP捅儒,開(kāi)發(fā)機(jī)與容器不在一個(gè)局域網(wǎng)所以無(wú)法連接液样。

解決方案是在 broker.conf 配置文件中增加 brokerIP1=宿主機(jī)IP 即可

POM

主要增加了 org.springframework.cloud:spring-cloud-starter-stream-rocketmq 依賴

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>com.funtl</groupId>
        <artifactId>hello-spring-cloud-alibaba-dependencies</artifactId>
        <version>1.0.0-SNAPSHOT</version>
        <relativePath>../hello-spring-cloud-alibaba-dependencies/pom.xml</relativePath>
    </parent>

    <artifactId>hello-spring-cloud-alibaba-rocketmq-provider</artifactId>
    <packaging>jar</packaging>

    <name>hello-spring-cloud-alibaba-rocketmq-provider</name>
    <url>http://www.funtl.com</url>
    <inceptionYear>2018-Now</inceptionYear>

    <dependencies>
        <!-- Spring Boot Begin -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- Spring Boot End -->

        <!-- Spring Cloud Begin -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
        </dependency>
        <!-- Spring Cloud End -->
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <mainClass>com.funtl.hello.spring.cloud.alibaba.rocketmq.provider.RocketMQProviderApplication</mainClass>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

消息生產(chǎn)者服務(wù)

package com.funtl.hello.spring.cloud.alibaba.rocketmq.provider.service;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;

@Service
public class ProviderService {
    @Autowired
    private MessageChannel output;

    public void send(String message) {
        output.send(MessageBuilder.withPayload(message).build());
    }
}

Application

配置 Output(Source.class) 的 Binding 信息并配合 @EnableBinding 注解使其生效

package com.funtl.hello.spring.cloud.alibaba.rocketmq.provider;

import com.funtl.hello.spring.cloud.alibaba.rocketmq.provider.service.ProviderService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;

@SpringBootApplication
@EnableBinding({Source.class})
public class RocketMQProviderApplication implements CommandLineRunner {

    @Autowired
    private ProviderService providerService;

    public static void main(String[] args) {
        SpringApplication.run(RocketMQProviderApplication.class, args);
    }

    /**
     * 實(shí)現(xiàn)了 CommandLineRunner 接口,只是為了 Spring Boot 啟動(dòng)時(shí)執(zhí)行任務(wù)巧还,不必特別在意
     * @param args
     * @throws Exception
     */
    @Override
    public void run(String... args) throws Exception {
        providerService.send("Hello RocketMQ");
    }
}

application.yml

spring:
  application:
    name: rocketmq-provider
  cloud:
    stream:
      rocketmq:
        binder:
          # RocketMQ 服務(wù)器地址
          namesrv-addr: 192.168.10.149:9876
      bindings:
        # 這里是個(gè) Map 類(lèi)型參數(shù)鞭莽,{} 為 YAML 中 Map 的行內(nèi)寫(xiě)法
        output: {destination: test-topic, content-type: application/json}

server:
  port: 9093

management:
  endpoints:
    web:
      exposure:
        include: '*'

運(yùn)行成功后即可在 RocketMQ 控制臺(tái)的 消息 列表中選擇 test-topic 主題即可看到發(fā)送的消息

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市麸祷,隨后出現(xiàn)的幾起案子澎怒,更是在濱河造成了極大的恐慌,老刑警劉巖阶牍,帶你破解...
    沈念sama閱讀 212,029評(píng)論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件丹拯,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡荸恕,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,395評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門(mén)死相,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)融求,“玉大人,你說(shuō)我怎么就攤上這事算撮∩穑” “怎么了?”我有些...
    開(kāi)封第一講書(shū)人閱讀 157,570評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵肮柜,是天一觀的道長(zhǎng)陷舅。 經(jīng)常有香客問(wèn)我,道長(zhǎng)审洞,這世上最難降的妖魔是什么莱睁? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 56,535評(píng)論 1 284
  • 正文 為了忘掉前任待讳,我火速辦了婚禮,結(jié)果婚禮上仰剿,老公的妹妹穿的比我還像新娘创淡。我一直安慰自己,他們只是感情好南吮,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,650評(píng)論 6 386
  • 文/花漫 我一把揭開(kāi)白布琳彩。 她就那樣靜靜地躺著,像睡著了一般部凑。 火紅的嫁衣襯著肌膚如雪露乏。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書(shū)人閱讀 49,850評(píng)論 1 290
  • 那天涂邀,我揣著相機(jī)與錄音瘟仿,去河邊找鬼。 笑死必孤,一個(gè)胖子當(dāng)著我的面吹牛猾骡,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播敷搪,決...
    沈念sama閱讀 39,006評(píng)論 3 408
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼兴想,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了赡勘?” 一聲冷哼從身側(cè)響起嫂便,我...
    開(kāi)封第一講書(shū)人閱讀 37,747評(píng)論 0 268
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎闸与,沒(méi)想到半個(gè)月后毙替,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,207評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡践樱,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,536評(píng)論 2 327
  • 正文 我和宋清朗相戀三年厂画,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片拷邢。...
    茶點(diǎn)故事閱讀 38,683評(píng)論 1 341
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡袱院,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出瞭稼,到底是詐尸還是另有隱情忽洛,我是刑警寧澤,帶...
    沈念sama閱讀 34,342評(píng)論 4 330
  • 正文 年R本政府宣布环肘,位于F島的核電站欲虚,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏悔雹。R本人自食惡果不足惜复哆,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,964評(píng)論 3 315
  • 文/蒙蒙 一欣喧、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧寂恬,春花似錦续誉、人聲如沸。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,772評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至牙咏,卻和暖如春臼隔,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背妄壶。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 32,004評(píng)論 1 266
  • 我被黑心中介騙來(lái)泰國(guó)打工摔握, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人丁寄。 一個(gè)月前我還...
    沈念sama閱讀 46,401評(píng)論 2 360
  • 正文 我出身青樓氨淌,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親伊磺。 傳聞我的和親對(duì)象是個(gè)殘疾皇子盛正,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,566評(píng)論 2 349

推薦閱讀更多精彩內(nèi)容