一昭伸、基本概念
RabbitMQ:接受吹菱、存儲和轉(zhuǎn)發(fā)二進(jìn)制數(shù)據(jù)消息
1.生產(chǎn)者
生產(chǎn)者是發(fā)送消息的程序
2.消費(fèi)者
消費(fèi)者是一個等待接收消息的程序。
3.隊列
隊列是存在于RabbitMQ內(nèi)部的郵箱的名稱。盡管消息通過RabbitMQ和應(yīng)用程序流動腐魂,但它們只能存儲在隊列中咧最。隊列只受主機(jī)的內(nèi)存和磁盤限制約束捂人,它本質(zhì)上是一個大型消息緩沖區(qū)御雕。許多生產(chǎn)者可以向一個隊列發(fā)送消息,許多消費(fèi)者可以嘗試從一個隊列接收數(shù)據(jù)先慷。
在使用隊列之前饮笛,必須聲明隊列。如果隊列不存在论熙,則聲明隊列將導(dǎo)致它被創(chuàng)建福青。如果隊列已經(jīng)存在,且其屬性與聲明中的屬性相同脓诡,則聲明無效无午。當(dāng)現(xiàn)有隊列屬性與聲明中的隊列屬性不相同時,就會出現(xiàn)通道級異常祝谚,代碼為406宪迟。
4.綁定
綁定(Bindings)是交換器為了將消息路由到隊列而使用(除其他外)的規(guī)則。要表示交換器E將消息路由到隊列Q交惯,必須將Q綁定到E. 綁定有一些交換類型使用的可選路由鍵屬性次泽。路由鍵的目的是選擇發(fā)布到要路由到綁定隊列的交換器的某些消息。換句話說席爽,路由密鑰就像一個過濾器
5.交換器和交換類型
交換器是發(fā)送消息的AMQP實體意荤。交換器接收消息并將其路由到零個或多個隊列中。所使用的路由算法依賴于稱為綁定的交換類型和規(guī)則只锻。AMQP 0-9-1經(jīng)紀(jì)人提供四種交換類型:
除了交換類型以外玖像,聲明隊列還有其他的屬性,比較重要的有名字齐饮、持久性捐寥、自動刪除、參數(shù)們祖驱。
默認(rèn)交換器:默認(rèn)交換器是直接交換器握恳,當(dāng)時用默認(rèn)交換器時,創(chuàng)建的每個隊列都會自動綁定一個與隊列名稱相同的路由鍵捺僻。舉例睡互,當(dāng)您聲明一個名為“search-indexing-online”的隊列時,AMQP 0-9-1代理將使用“search-indexing-online”作為路由鍵將其綁定到默認(rèn)交換器上陵像。因此就珠,以“search-indexing-online”路由鍵發(fā)布到默認(rèn)交換器上的消息將被路由到名字為“search-indexing-online”的隊列。
6.連接connection
AMQP連接通常是長期的醒颖。AMQP是一個應(yīng)用程序級協(xié)議妻怎,使用TCP進(jìn)行可靠的傳輸。AMQP連接使用身份驗證泞歉,可以使用TLS (SSL)進(jìn)行保護(hù)逼侦。當(dāng)應(yīng)用程序不再需要連接到AMQP代理時匿辩,它應(yīng)該優(yōu)雅地關(guān)閉AMQP連接,而不是突然地關(guān)閉底層的TCP連接榛丢。
7.信道channel
為了方便理解掰读,打個比喻猪狈,將connection比作電纜谅年,channel就是里面的光纖纽甘,一個連接可以有很多信道,各個信道相互獨(dú)立傳輸信息掖鱼。
8.虛擬主機(jī)virtural hosts
為了使單個代理能夠承載多個獨(dú)立的“環(huán)境”(用戶組然走、交換、隊列等)戏挡,AMQP包含虛擬主機(jī)(vhosts)的概念芍瑞。它們類似于許多流行Web服務(wù)器使用的虛擬主機(jī),并提供AMQP實體所在的完全獨(dú)立的環(huán)境褐墅。AMQP客戶端指定在AMQP連接協(xié)商期間希望使用的vhost拆檬。
二、代碼
package com.codesheep.product;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Date;
import java.util.concurrent.TimeoutException;
@Component(value = "producer")
public class Producer {
//隊列名
private final static String QUEUE_NAME = "hello";
/**
* 步奏:
* 1.創(chuàng)建連接工廠(需要指定ip妥凳、端口秩仆、用戶名、密碼)----連接rabbitmq服務(wù)端猾封,創(chuàng)建連接
* 2、創(chuàng)建渠道channel噪珊、聲明隊列(作用是將渠道當(dāng)中的信息傳遞到隊列)
* 3晌缘、發(fā)布消息到交換機(jī),交換機(jī)會自動將消息傳遞到指定的隊列痢站,等待消費(fèi)
*/
public static void main(String[] args) throws IOException, TimeoutException {
//創(chuàng)建連接到服務(wù)端工廠
ConnectionFactory factory=new ConnectionFactory();
//指定MQ的主機(jī)
factory.setHost("47.114.50.39");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
//創(chuàng)建一個連接:連接相當(dāng)于電纜磷箕,渠道相當(dāng)于電纜當(dāng)中的電線
Connection connection=factory.newConnection();
//一個電纜當(dāng)中可以有多個電線
Channel channel=connection.createChannel();
//聲明隊列,參數(shù)意義(隊列名阵难,是否持久化岳枷,是否是排他隊列,是否自動刪除呜叫,參數(shù)的其他屬性)聲明隊列是冪等的——它只會在它不存在的情況下被創(chuàng)建
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
String message="hello word"+new Date();
//發(fā)布消息空繁,參數(shù)意義(交換器(不填寫為默認(rèn)),路由鍵朱庆,消息路由頭的其他屬性盛泡,消息體),發(fā)布到隊列當(dāng)中(MQ會創(chuàng)建隊列)
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
System.out.println("Send:"+message);
channel.close();
connection.close();
factory.clone();
}
}
package com.codesheep.custom;
import com.rabbitmq.client.*;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
@Component
public class Consumer {
private final static String QUEUE_NAME = "hello";
/**
* 步奏:
* 1.創(chuàng)建連接工廠(需要指定ip娱颊、端口傲诵、用戶名凯砍、密碼)----連接rabbitmq服務(wù)端,創(chuàng)建連接
* 2拴竹、創(chuàng)建渠道channel悟衩、聲明隊列(通過渠道連接到隊列)
* 3、通過Consumer的DefaultConsumer緩存
* 4.通過channel.basicConsume消費(fèi)指定隊列
*/
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory=new ConnectionFactory();
factory.setHost("47.114.50.39");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
//創(chuàng)建連接
Connection connection=factory.newConnection();
//渠道
Channel channel=connection.createChannel();
//綁定隊列
channel.queueDeclare(QUEUE_NAME,false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
//從渠道當(dāng)中獲得信息
//以對象的形式提供回調(diào)栓拜,該對象將緩沖消息座泳,直到我們準(zhǔn)備好使用它們?yōu)橹埂_@就是DefaultConsumer子類所做的事情菱属。
com.rabbitmq.client.Consumer consumer=new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
super.handleDelivery(consumerTag, envelope, properties, body);
String message=new String(body,"UTF-8");
System.out.println("[消費(fèi)者]接收:'"+message+"'");
}
};
//消費(fèi)
channel.basicConsume(QUEUE_NAME, true, consumer);
channel.close();
connection.close();
factory.clone();
}
}