rabbitmq調用

rabbitmq分為發(fā)送端和接收端

發(fā)送端代碼
  • 首先捐祠,引入依賴jar
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>1.4.0.RELEASE</version>
</dependency>
  • 先在application.properties里面配置rabbitmq地址和topic
spring.rabbitmq.addresses=IP
spring.rabbitmq.port=5672
spring.rabbitmq.username=canace(賬號)
spring.rabbitmq.password=123456(密碼)
spring.rabbitmq.connection-timeout=2000
rabbit_topickey=wsQueue
  • 新建一個類玖瘸,發(fā)送消息
@Autowired
private RabbitTemplate rabbitTemplate;

@Value("${rabbit_topickey}")
private String rabbitmqtopic;

@GetMapping("/test")
public String toRabbitmq(String username) {
    System.out.println("To Rabbitmq>>>>>>>>" + username);
    // 第一個參數為剛剛定義的隊列名稱
    this.rabbitTemplate.convertAndSend(rabbitmqtopic, username);
    return "add User to rabbitmq Successful!!";
}
    • 第二種發(fā)送,引入jar包
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.4.3</version>
</dependency>
    • 創(chuàng)建RabbitmqConfig,鏈接rabbitmq
@Configuration
public class RabbitmqConfig {
    private static String host = "IP";
    private static String userName = "canace";
    private static String passWord = "123456";
    private static int port = 5672;

    public static Channel getChannelInstance(String connectionDescription) {
        try {
            ConnectionFactory connectionFactory = getConnectionFactory();
            Connection connection = connectionFactory.newConnection(connectionDescription);
            return connection.createChannel();
        } catch (Exception e) {
            throw new RuntimeException("獲取Channel連接失敗", e);
        }
    }

    @Bean
    private static ConnectionFactory getConnectionFactory() {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 配置連接信息
        connectionFactory.setHost(host);
        connectionFactory.setPort(port);
        connectionFactory.setUsername(userName);
        connectionFactory.setPassword(passWord);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setConnectionTimeout(30000);
        connectionFactory.setHandshakeTimeout(30000);
        // 網絡異常自動連接恢復
        connectionFactory.setAutomaticRecoveryEnabled(true);
        // 每10秒嘗試重試連接一次
        connectionFactory.setNetworkRecoveryInterval(10000);
        return connectionFactory;
    }
}
    • 發(fā)送消息代碼
public String send(String exchange, String routingKey, String message) throws IOException {
    if (channel == null || !channel.isOpen()) {
        channel = RabbitmqConfig.getChannelInstance("隊列消息生產者");
    }
    message = URLEncoder.encode(message, "utf-8");
    AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder().deliveryMode(2).contentType(
            "UTF-8").build();
    channel.basicPublish(exchange, routingKey, false, basicProperties, message.getBytes());
    RedisUtil.set(message, message);
    return "send ok";
}
接收端代碼
  • 首先,引入依賴jar
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>1.4.0.RELEASE</version>
</dependency>
  • 先在application.properties里面配置一樣的rabbitmq地址,然后queues與發(fā)送端的key配置成一樣是目,group消費組可自行配置。
@Component
@RabbitListener(queues = "wsQueue", group = "wushaungRabit")
public class rabbitMQConsumer {
    /**
     * 消息消費
     * @RabbitHandler 代表此方法為接受到消息后的處理方法
     */
    @RabbitHandler
    public void receive(String msg) {
        System.out.println("[Rabbitmq] recieved message: " + msg);
    }
}
    • 第二種标捺,引入jar包spring-rabbit和spring-amqp
<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
    <version>2.3.2</version>
</dependency>
<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-amqp</artifactId>
    <version>2.3.2</version>
</dependency>
    • 配置rabbitmq.properties
rabbit.hosts=192.168.1.95
rabbit.username=canace
rabbit.password=123456
rabbit.port=5672
rabbit.virtualHost=/
    • 配置amqp-share.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:util="http://www.springframework.org/schema/util"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="
     http://www.springframework.org/schema/context
     http://www.springframework.org/schema/context/spring-context-4.3.xsd
     http://www.springframework.org/schema/beans
     http://www.springframework.org/schema/beans/spring-beans-4.3.xsd
     http://www.springframework.org/schema/util
     http://www.springframework.org/schema/util/spring-util-4.3.xsd
     http://www.springframework.org/schema/rabbit
     http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">

    <context:property-placeholder location="classpath:rabbitmq.properties"/>
    <util:properties id="appConfig" location="classpath:rabbitmq.properties"></util:properties>
    <rabbit:connection-factory id="connectionFactory" host="${rabbit.hosts}"
                               port="${rabbit.port}" username="${rabbit.username}" password="${rabbit.password}"
                               virtual-host="${rabbit.virtualHost}"
                               channel-cache-size="10"/>
    <rabbit:admin connection-factory="connectionFactory"/>

    <!--定義消息隊列-->
    <rabbit:queue name="spittle.alert.queue.3" durable="true" auto-delete="false"/>

    <!--交換機定義懊纳,綁定隊列-->
    <rabbit:fanout-exchange id="spittle.fanout" name="spittle.fanout" durable="true">
        <rabbit:bindings>
            <rabbit:binding queue="spittle.alert.queue.3"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:fanout-exchange>

</beans>

    • 配置amqp-consumer.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="
     http://www.springframework.org/schema/beans
     http://www.springframework.org/schema/beans/spring-beans-4.3.xsd
     http://www.springframework.org/schema/rabbit
     http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">

    <import resource="amqp-share.xml"/>
    <!-- 配置監(jiān)聽器 -->
    <rabbit:listener-container connection-factory="connectionFactory" type="simple">
        <rabbit:listener ref="spittleListener" method="onMessage"
                         queues="spittle.alert.queue.3"/>
    </rabbit:listener-container>
    <bean id="spittleListener" class="com.pamirs.agent.rabbitmq2xc.demo.handler.SpittleAlertHandler"/>
</beans>
    • 消費端接收端
public class SpittleAlertHandler implements MessageListener {
    @Override
    public void onMessage(Message message) {
        try {
            String body = new String(message.getBody(), "UTF-8").replace("\"", "");
            System.out.println("body>>>>>>>>>>>>>>>" + body);
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
    }
}
      • 第三種,引入jar包amqp-client
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>3.5.0</version>
</dependency>
      • 消費端代碼
@Component
public class rabbitMGconsumer {
    @PostConstruct
    public void init() {
        try {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("192.168.1.95");
            factory.setPort(5672);
            factory.setUsername("canace");
            factory.setPassword("123456");
            factory.setVirtualHost("/");
            //// 網絡異常自動連接恢復
            //factory.setAutomaticRecoveryEnabled(true);
            //// 每10秒嘗試重試連接一次
            //factory.setNetworkRecoveryInterval(10000);
            
            Connection connect = factory.newConnection();
            Channel channel = connect.createChannel();
            QueueingConsumer consumer = new QueueingConsumer(channel);
            channel.basicConsume("test", false, tag, new DeliverCallback() {
              @Override
              public void handle(String consumerTag, Delivery message) throws IOException {
                  System.out.printf("exchange : %s routingKey : %s consumer tag : %s thread : %s 消息內容 : %s%n", message.getEnvelope().getExchange(), message.getEnvelope().getRoutingKey(), consumerTag, Thread.currentThread().getName(), new String(message.getBody()));
                  channel.basicAck(message.getEnvelope().getDeliveryTag(), true);
              }
            }, new CancelCallback() {
                @Override
                public void handle(String consumerTag) throws IOException {
                    System.out.println("cancel" + consumerTag);
                }
            });
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
?著作權歸作者所有,轉載或內容合作請聯系作者
  • 序言:七十年代末亡容,一起剝皮案震驚了整個濱河市嗤疯,隨后出現的幾起案子,更是在濱河造成了極大的恐慌闺兢,老刑警劉巖茂缚,帶你破解...
    沈念sama閱讀 212,718評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現場離奇詭異屋谭,居然都是意外死亡脚囊,警方通過查閱死者的電腦和手機,發(fā)現死者居然都...
    沈念sama閱讀 90,683評論 3 385
  • 文/潘曉璐 我一進店門桐磁,熙熙樓的掌柜王于貴愁眉苦臉地迎上來悔耘,“玉大人,你說我怎么就攤上這事我擂〕囊裕” “怎么了?”我有些...
    開封第一講書人閱讀 158,207評論 0 348
  • 文/不壞的土叔 我叫張陵校摩,是天一觀的道長看峻。 經常有香客問我,道長衙吩,這世上最難降的妖魔是什么互妓? 我笑而不...
    開封第一講書人閱讀 56,755評論 1 284
  • 正文 為了忘掉前任,我火速辦了婚禮坤塞,結果婚禮上冯勉,老公的妹妹穿的比我還像新娘。我一直安慰自己尺锚,他們只是感情好珠闰,可當我...
    茶點故事閱讀 65,862評論 6 386
  • 文/花漫 我一把揭開白布惜浅。 她就那樣靜靜地躺著瘫辩,像睡著了一般。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上伐厌,一...
    開封第一講書人閱讀 50,050評論 1 291
  • 那天承绸,我揣著相機與錄音,去河邊找鬼挣轨。 笑死军熏,一個胖子當著我的面吹牛,可吹牛的內容都是我干的卷扮。 我是一名探鬼主播荡澎,決...
    沈念sama閱讀 39,136評論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼扫俺,長吁一口氣:“原來是場噩夢啊……” “哼危喉!你這毒婦竟也來了?” 一聲冷哼從身側響起铐尚,我...
    開封第一講書人閱讀 37,882評論 0 268
  • 序言:老撾萬榮一對情侶失蹤鞭铆,失蹤者是張志新(化名)和其女友劉穎或衡,沒想到半個月后,有當地人在樹林里發(fā)現了一具尸體车遂,經...
    沈念sama閱讀 44,330評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡封断,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 36,651評論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現自己被綠了舶担。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片坡疼。...
    茶點故事閱讀 38,789評論 1 341
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖衣陶,靈堂內的尸體忽然破棺而出回梧,到底是詐尸還是另有隱情,我是刑警寧澤祖搓,帶...
    沈念sama閱讀 34,477評論 4 333
  • 正文 年R本政府宣布狱意,位于F島的核電站,受9級特大地震影響拯欧,放射性物質發(fā)生泄漏详囤。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 40,135評論 3 317
  • 文/蒙蒙 一镐作、第九天 我趴在偏房一處隱蔽的房頂上張望藏姐。 院中可真熱鬧,春花似錦该贾、人聲如沸羔杨。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,864評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽兜材。三九已至理澎,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間曙寡,已是汗流浹背糠爬。 一陣腳步聲響...
    開封第一講書人閱讀 32,099評論 1 267
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留举庶,地道東北人执隧。 一個月前我還...
    沈念sama閱讀 46,598評論 2 362
  • 正文 我出身青樓,卻偏偏與公主長得像户侥,于是被迫代替她去往敵國和親镀琉。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 43,697評論 2 351

推薦閱讀更多精彩內容