RabbitMQ 核心概念

RabbitMQ簡(jiǎn)介

AMQP,即Advanced Message Queuing Protocol琼腔,高級(jí)消息隊(duì)列協(xié)議抡诞,是應(yīng)用層協(xié)議的一個(gè)開放標(biāo)準(zhǔn),為面向消息的中間件設(shè)計(jì)秤朗。消息中間件主要用于組件之間的解耦煤蹭,消息的發(fā)送者無(wú)需知道消息使用者的存在,反之亦然。
AMQP的主要特征是面向消息硝皂、隊(duì)列常挚、路由(包括點(diǎn)對(duì)點(diǎn)和發(fā)布/訂閱)、可靠性稽物、安全奄毡。
RabbitMQ是一個(gè)開源的AMQP實(shí)現(xiàn),服務(wù)器端用Erlang語(yǔ)言編寫贝或,支持多種客戶端吼过,如:Python、Ruby咪奖、.NET盗忱、Java、JMS羊赵、C趟佃、PHP、ActionScript昧捷、XMPP闲昭、STOMP等,支持AJAX靡挥。用于在分布式系統(tǒng)中存儲(chǔ)轉(zhuǎn)發(fā)消息汤纸,在易用性、擴(kuò)展性芹血、高可用性等方面表現(xiàn)不俗贮泞。
AMQP協(xié)議更多用在企業(yè)系統(tǒng)內(nèi),對(duì)數(shù)據(jù)一致性幔烛、穩(wěn)定性和可靠性要求很高的場(chǎng)景啃擦,對(duì)性能和吞吐量的要求還在其次。

下面將重點(diǎn)介紹RabbitMQ中的一些基礎(chǔ)概念饿悬,了解了這些概念令蛉,是使用好RabbitMQ的基礎(chǔ)。

為什么用RabbitMQ

  • 開源狡恬、性能優(yōu)秀珠叔、穩(wěn)定性保障
  • RabbitMQ提供可靠性消息投遞模式(confirm)、返回模式(return)
  • SpringAMQP完美的整合弟劲、API豐富
  • 集群模式豐富祷安,表達(dá)式配置、HA模式兔乞、鏡像隊(duì)列模型
  • 保證數(shù)據(jù)不丟失的前提下做到高可靠性汇鞭、可用性

RabbitMQ的高性能之道是如何做到的

Erlang語(yǔ)言最初用于交換機(jī)領(lǐng)域的架構(gòu)模式凉唐,這樣使得RabbitMQ在Broker之間進(jìn)行數(shù)據(jù)交互的性能是非常優(yōu)秀的
Erlang的優(yōu)點(diǎn):Erlang有著和原生Socket一樣的延遲

什么是AMQP高級(jí)消息隊(duì)列協(xié)議

AMQP:即Advanced Message Queuing Protocol,高級(jí)消息隊(duì)列協(xié)議霍骄。

  • 是面向消息的中間件的開放標(biāo)準(zhǔn)應(yīng)用層協(xié)議,AMQP的特征是消息導(dǎo)向台囱,排隊(duì),路由(包括點(diǎn)對(duì)點(diǎn)和發(fā)布和訂閱)读整,可靠性和安全性簿训。
  • AMQP要求消息傳遞提供商和客戶端的行為在不同供應(yīng)商實(shí)現(xiàn)可互操作的情況下,以與SMTP米间,HTTP强品,F(xiàn)TP等相同的方式創(chuàng)建了可互操作的系統(tǒng)。
  • AMQP協(xié)議是具有現(xiàn)代特征的二進(jìn)制協(xié)議车伞。一個(gè)提供統(tǒng)一消息服務(wù)的應(yīng)用層標(biāo)準(zhǔn)高級(jí)消息隊(duì)列協(xié)議,是應(yīng)用層協(xié)議的一個(gè)開發(fā)標(biāo)準(zhǔn)喻喳,為面向消息的中間件設(shè)計(jì)另玖。基于此協(xié)議的客戶端與消息中間件可傳遞消息表伦,并不受客戶端/中間件不同產(chǎn)品谦去,不同開發(fā)語(yǔ)言等條件的限制。
  • AMQP是一種二進(jìn)制應(yīng)用層協(xié)議蹦哼,旨在有效地支持各種消息應(yīng)用和通信模式鳄哭。

AMQP的主要特征是面向消息、隊(duì)列纲熏、路由(包括點(diǎn)對(duì)點(diǎn)和發(fā)布/訂閱)妆丘、可靠性、 安全局劲。

AMQP的協(xié)議棧

image.png

  • Modle Layer:位于協(xié)議最高層勺拣,主要定義了一些供客戶端調(diào)用的命令,客戶端可以利用這些命令實(shí)現(xiàn)自己的業(yè)務(wù)邏輯鱼填,例如药有,客戶端可以通過(guò)queue.declare聲明一個(gè)隊(duì)列,利用consume命令獲取一個(gè)隊(duì)列中的消息苹丸。
  • Session Layer:主要負(fù)責(zé)將客戶端的命令發(fā)送給服務(wù)器愤惰,在將服務(wù)器端的應(yīng)答返回給客戶端,主要為客戶端與服務(wù)器之間通信提供可靠性赘理、同步機(jī)制和錯(cuò)誤處理宦言。
  • Transport Layer:主要傳輸二進(jìn)制數(shù)據(jù)流,提供幀的處理商模、信道復(fù)用蜡励、錯(cuò)誤檢測(cè)和數(shù)據(jù)表示令花。

AMQP協(xié)議模型

image.png

AMQP核心概念是什么

Server:
又稱作Broker,用于接受客戶端的連接凉倚,實(shí)現(xiàn)AMQP實(shí)體服務(wù)

Connection:
連接兼都,應(yīng)用程序與Broker的網(wǎng)絡(luò)連接

Channel:
網(wǎng)絡(luò)信道,幾乎所有的操作都在Channel中進(jìn)行稽寒,Channel是進(jìn)行消息讀寫的通道扮碧。客戶端可建立多個(gè)Channel杏糙,每個(gè)Channel代表一個(gè)會(huì)話任務(wù)

Message:
消息慎王,服務(wù)器和應(yīng)用程序之間傳送的數(shù)據(jù),有Properties和Body組成宏侍。Properties可以對(duì)消息進(jìn)行修飾赖淤,比如消息的優(yōu)先級(jí)、延遲等高級(jí)特性谅河;Body則是消息體內(nèi)容咱旱,即我們要傳輸?shù)臄?shù)據(jù);
僅僅創(chuàng)建了客戶端到Broker之間的連接后绷耍,客戶端還是不能發(fā)送消息的吐限。需要為每一個(gè)Connection創(chuàng)建Channel,AMQP協(xié)議規(guī)定只有通過(guò)Channel才能執(zhí)行AMQP的命令褂始。一個(gè)Connection可以包含多個(gè)Channel诸典。之所以需要Channel,是因?yàn)門CP連接的建立和釋放都是十分昂貴的崎苗,如果一個(gè)客戶端每一個(gè)線程都需要與Broker交互狐粱,如果每一個(gè)線程都建立一個(gè)TCP連接,暫且不考慮TCP連接是否浪費(fèi)胆数,就算操作系統(tǒng)也無(wú)法承受每秒建立如此多的TCP連接脑奠。RabbitMQ建議客戶端線程之間不要共用Channel,至少要保證共用Channel的線程發(fā)送消息必須是串行的幅慌,但是建議盡量共用Connection宋欺。

Virtual Host:
虛擬地址,是一個(gè)邏輯概念胰伍,用于進(jìn)行邏輯隔離齿诞,是最上層的消息路由。一個(gè)Virtual Host里面可以有若干個(gè)Exchange和Queue骂租,同一個(gè)Virtual Host里面不能有相同名稱的Exchange或者Queue祷杈;
Virtual Host是權(quán)限控制的最小粒度;

Exchange:
交換機(jī)渗饮,用于接收消息但汞,可根據(jù)路由鍵將消息轉(zhuǎn)發(fā)到綁定的隊(duì)列宿刮。

Binding:
Exchange和Queue之間的虛擬連接,Exchange在與多個(gè)Message Queue發(fā)生Binding后會(huì)生成一張路由表私蕾,路由表中存儲(chǔ)著Message Queue所需消息的限制條件即Binding Key僵缺。當(dāng)Exchange收到Message時(shí)會(huì)解析其Header得到Routing Key,Exchange根據(jù)Routing Key與Exchange Type將Message路由到Message Queue踩叭。Binding Key由Consumer在Binding Exchange與Message Queue時(shí)指定磕潮,而Routing Key由Producer發(fā)送Message時(shí)指定,兩者的匹配方式由Exchange Type決定容贝。

Routing Key:
一個(gè)路由規(guī)則瞳步,虛擬機(jī)可用它來(lái)確定如何路由一個(gè)特定的消息插勤;

Queue:
也稱作Message Queue,即消息隊(duì)列校套,用于保存消息并將他們轉(zhuǎn)發(fā)給消費(fèi)者灌危;

RabbitMQ整體架構(gòu)模型

image.png

生產(chǎn)者把消息投遞到Exchange,Exchange投遞到Queue.
因此我們的生產(chǎn)者只需要關(guān)注把消息投遞到指定的Exchange即可,我們的消費(fèi)者只需要監(jiān)聽指定Queue即可辐啄。就是這么簡(jiǎn)單的機(jī)制作喘。
通過(guò)圖我們也能看到硝清,生產(chǎn)者不需要關(guān)注投遞到哪個(gè)隊(duì)列,消費(fèi)也不需要關(guān)注是從哪個(gè)Exchange上來(lái)的脚囊,這兩塊沒有耦合的情況龟糕。主要是應(yīng)為Exchange和Queue有一個(gè)綁定的關(guān)系桐磁。

RabbitMQ消息流轉(zhuǎn)

image.png

生產(chǎn)者publisher application 生產(chǎn)消息Message投遞到Exchange上悔耘,Exchange綁定MessageQueue,可以綁定過(guò)多個(gè)MessageQueue我擂,為什么三個(gè)隊(duì)列只有其中一個(gè)隊(duì)列收到了消息呢衬以?主要是由于Exchange是有一個(gè)路由功能的。這個(gè)路由就是routing key,這個(gè)路由有兩個(gè)非常關(guān)鍵的點(diǎn)校摩,
第一個(gè):你的消息是需要發(fā)送到哪個(gè)Exchange看峻。
第二個(gè):你發(fā)消息的時(shí)候需要帶上routing key,然后通過(guò)Exchange 和 MessageQueue 建立一個(gè)綁定關(guān)系衙吩,通過(guò)路由key把消息路由到一個(gè)指定的隊(duì)列上互妓。然后我們的消費(fèi)端直接監(jiān)聽隊(duì)列就行了,就可以消費(fèi)了坤塞。

RabbitMQ的安裝與使用

RabbitMQ官網(wǎng)地址:https://www.rabbitmq.com/
Erlang官網(wǎng)地址:https://www.erlang.org/
windows平臺(tái)安裝方式:http://www.reibang.com/p/ba3b2c62dce1
Centos7平臺(tái)安裝方式:
綠色版安裝:https://blog.csdn.net/weixin_41004350/article/details/83046842
rpm安裝步驟:

準(zhǔn)備環(huán)境:
yum install 
build-essential openssl openssl-devel unixODBC unixODBC-devel 
make gcc gcc-c++ kernel-devel m4 ncurses-devel tk tc xz

下載:
wget www.rabbitmq.com/releases/erlang/erlang-18.3-1.el7.centos.x86_64.rpm
wget http://repo.iotti.biz/CentOS/7/x86_64/socat-1.7.3.2-5.el7.lux.x86_64.rpm
wget www.rabbitmq.com/releases/rabbitmq-server/v3.6.5/rabbitmq-server-3.6.5-1.noarch.rpm

配置文件:
vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/rabbit.app
比如修改密碼冯勉、配置等等,例如:loopback_users 中的 <<"guest">>,只保留guest
服務(wù)啟動(dòng)和停止:
啟動(dòng) rabbitmq-server start &
停止 rabbitmqctl app_stop

管理插件:rabbitmq-plugins enable rabbitmq_management
訪問(wèn)地址:http://ip:15672/

RabbitMQ命令行與管控臺(tái)

基礎(chǔ)服務(wù)的命令操作

rabbitmqctl stop_app:關(guān)閉應(yīng)用

rabbitmqctl start_app:?jiǎn)?dòng)應(yīng)用

rabbtmqctl status:節(jié)點(diǎn)狀態(tài)

rabbitmqctl add_user username password:添加用戶

rabbitmqctl list_users:列出所有用戶

rabbitmqctl delete_user username:刪除用戶

rabbitmqctl clear_permissions - p vhostpath username: 清除用戶權(quán)限

rabbitmqctl list_user_permissions_username: 列出用戶權(quán)限

rabbitmqctl change_password username newpassword:修改密碼

rabbitmqctl set_permissions -p vhostpath username "." "." ".*" :設(shè)置用戶權(quán)限

涉及的用戶命令還有許多摹芙,這里就不一一列舉了灼狰。

對(duì)rabbitmq 具體組件的命令
對(duì)虛擬主機(jī)操作

rabbitmqctl add_vhost vhostpath:創(chuàng)建虛擬主機(jī)

rabbitmqctl list_vhosts:列出所有虛擬主機(jī)

rabbitmqctl list_permissions -p vhostpath:列出虛擬主機(jī)上所有權(quán)限

rabbitmqctl delete_vhost vhostpath:刪除虛擬主機(jī)

隊(duì)列操作

rabbitmqctl list_queues:查看所有隊(duì)列信息

rabbitmqctl -p vhostpath purge_queue bule:清除隊(duì)列里的消息

高級(jí)操作

rabbitmqctl reset:移除所有數(shù)據(jù),要在rabbitmqctl stop_app之后使用

rabbitmqctl join_clust [--ram]:組成集群命令

rabbitmqctl clustr_status:查看集群狀態(tài)

rabbitmqctl change_cluster_node_type disc|ram 修改集群節(jié)點(diǎn)的存儲(chǔ)形式

rabbitmqctl forget_cluster_node [--offline] 忘記節(jié)點(diǎn)(摘除節(jié)點(diǎn))

rabbitmqctl rename_cluster_node oldnode1 newnode1 [oldnode2] [newnode2..] (修改節(jié)點(diǎn)名稱)

更多的RabbitMQ命令行與管控臺(tái)操作:https://www.cnblogs.com/coder-programming/p/11382322.html

RabbitMQ交換機(jī)詳解

Exchange:交換機(jī)浮禾,用于接收消息交胚,可根據(jù)路由鍵將消息轉(zhuǎn)發(fā)到綁定的隊(duì)列

image.png

Exchange的屬性:

  • Name :名字份汗,同一個(gè)virtual host里面的Name不能重復(fù);不同的virtual host是可以重復(fù)的蝴簇。
  • Type:fanout杯活、direct、topic军熏、headers四種
public enum BuiltinExchangeType {
    DIRECT("direct"),
    FANOUT("fanout"),
    TOPIC("topic"),
    HEADERS("headers");
}
  • Durability:是否持久化轩猩,true:是;false:否荡澎。如果不持久化均践,當(dāng)break重啟之后,當(dāng)前的exchange會(huì)消失
  • Auto delete:當(dāng)最后一個(gè)綁定到Exchange上的隊(duì)列被刪除后摩幔,自動(dòng)刪除該Exchange
  • Internal:當(dāng)前Exchange是否是內(nèi)部RabbitMQ內(nèi)部使用彤委,默認(rèn)false,是的話或衡,就意味著我們不能往該Exchange里面發(fā)送消息
  • Arguments:擴(kuò)展參數(shù)焦影,是AMQP協(xié)議留給AMQP實(shí)現(xiàn)做擴(kuò)展用的。

RabbitMQ總共有四種Exchange模式(fanout封断、direct斯辰、topic、headers)
RabbitMQ中坡疼,所有生產(chǎn)者提交的消息都由Exchange來(lái)接受彬呻,然后Exchange按照特定的策略轉(zhuǎn)發(fā)到Queue進(jìn)行存儲(chǔ)
RabbitMQ提供了四種Exchange:fanout、direct柄瑰、topic闸氮、headers
Headers Exchange 會(huì)忽略 RoutingKey 而根據(jù)消息中的Headers和創(chuàng)建綁定關(guān)系時(shí)指定的 Arguments來(lái)匹配決定路由到哪些Queue,在實(shí)際中使用較少教沾,本文只對(duì)前三種模式進(jìn)行比較蒲跨。
性能排序:fanout > direct > topic。比例大約為11 :10 :6

Direct Exchange

image.png

Direct Exchange:處理路由鍵授翻。需要將一個(gè)隊(duì)列綁定到交換機(jī)上或悲,要求該消息與一個(gè)特定的路由鍵完全匹配。
所有發(fā)送到Direct Exchange的消息被轉(zhuǎn)發(fā)到RouteKey中指定的queue
注意:Direct模式可以使用RabbitMQ自帶的Exchange:default Exchange堪唐,所以不需要將Exchange進(jìn)行任何綁定(binding)操作巡语,消息傳遞時(shí),RouteKey必須完全匹配才會(huì)被隊(duì)列接收羔杨,否則該消息會(huì)被拋棄捌臊。

Channel channel = connection.createChannel();    
channel.exchangeDeclare("exchangeName", "direct"); //direct fanout topic    
channel.queueDeclare("queueName");    
channel.queueBind("queueName", "exchangeName", "routingKey");    
    
byte[] messageBodyBytes = "hello world".getBytes();    
//需要綁定路由鍵    
channel.basicPublish("exchangeName", "routingKey", MessageProperties.PERSISTENT_TEXT_PLAIN, messageBodyBytes);

Fanout Exchange

image.png

不處理路由鍵。你只需要簡(jiǎn)單的將隊(duì)列綁定到交換機(jī)上兜材。一個(gè)發(fā)送到交換機(jī)的消息都會(huì)被轉(zhuǎn)發(fā)到與該交換機(jī)綁定的所有隊(duì)列上理澎。很像子網(wǎng)廣播逞力,每臺(tái)子網(wǎng)內(nèi)的主機(jī)都獲得了一份復(fù)制的消息。Fanout交換機(jī)轉(zhuǎn)發(fā)消息是最快的糠爬。
任何發(fā)送到Fanout Exchange的消息都會(huì)被轉(zhuǎn)發(fā)到與該Exchange綁定(Binding)的所有Queue上寇荧。

  • 1、可以理解為路由表的模式
  • 2执隧、這種模式不需要RouteKey
  • 3揩抡、這種模式需要提前將Exchange與Queue進(jìn)行綁定,一個(gè)Exchange可以綁定多個(gè)Queue镀琉,一個(gè)Queue可以同多個(gè)Exchange進(jìn)行綁定峦嗤。
  • 4、如果接受到消息的Exchange沒有與任何Queue綁定屋摔,則消息會(huì)被拋棄烁设。
Channel channel = connection.createChannel();    
channel.exchangeDeclare("exchangeName", "fanout"); //direct fanout topic    
channel.queueDeclare("queueName");    
channel.queueBind("queueName", "exchangeName", "routingKey");    
    
channel.queueDeclare("queueName1");    
channel.queueBind("queueName1", "exchangeName", "routingKey1");    
    
byte[] messageBodyBytes = "hello world".getBytes();    
//路由鍵需要設(shè)置為空    
channel.basicPublish("exchangeName", "", MessageProperties.PERSISTENT_TEXT_PLAIN, messageBodyBytes);

Topic Exchange

image.png

Topic Exchange:將路由鍵和某模式進(jìn)行匹配。此時(shí)隊(duì)列需要綁定要一個(gè)模式上钓试。符號(hào)“#”匹配一個(gè)或多個(gè)詞装黑,符號(hào)“”匹配不多不少一個(gè)詞。因此“audit.#”能夠匹配到“audit.irs.corporate”弓熏,但是“audit.” 只會(huì)匹配到“audit.irs”
任何發(fā)送到Topic Exchange的消息都會(huì)被轉(zhuǎn)發(fā)到所有關(guān)心RouteKey中指定話題的Queue上

  • 1恋谭、這種模式較為復(fù)雜,簡(jiǎn)單來(lái)說(shuō)挽鞠,就是每個(gè)隊(duì)列都有其關(guān)心的主題疚颊,所有的消息都帶有一個(gè)“標(biāo)題”(RouteKey),Exchange會(huì)將消息轉(zhuǎn)發(fā)到所有關(guān)注主題能與RouteKey模糊匹配的隊(duì)列滞谢。
  • 2串稀、這種模式需要RouteKey除抛,也許要提前綁定Exchange與Queue狮杨。
  • 3、在進(jìn)行綁定時(shí)到忽,要提供一個(gè)該隊(duì)列關(guān)心的主題橄教,如“#.log.#”表示該隊(duì)列關(guān)心所有涉及l(fā)og的消息(一個(gè)RouteKey為”MQ.log.error”的消息會(huì)被轉(zhuǎn)發(fā)到該隊(duì)列)。
  • 4喘漏、可以使用通配符進(jìn)行模糊匹配护蝶,“#”表示0個(gè)或若干個(gè)關(guān)鍵字,“”表示一個(gè)關(guān)鍵字翩迈。如“l(fā)og.”能與“l(fā)og.warn”匹配持灰,無(wú)法與“l(fā)og.warn.timeout”匹配;但是“l(fā)og.#”能與上述兩者匹配负饲。
  • 5堤魁、同樣喂链,如果Exchange沒有發(fā)現(xiàn)能夠與RouteKey匹配的Queue,則會(huì)拋棄此消息妥泉。
Channel channel = connection.createChannel();    
channel.exchangeDeclare("exchangeName", "topic"); //direct fanout topic    
channel.queueDeclare("queueName");    
channel.queueBind("queueName", "exchangeName", "routingKey.*");    
    
byte[] messageBodyBytes = "hello world".getBytes();    
channel.basicPublish("exchangeName", "routingKey.one", MessageProperties.PERSISTENT_TEXT_PLAIN, messageBodyBytes); 

Headers Exchange
Headers Exchange 會(huì)忽略 RoutingKey 而根據(jù)消息中的 Headers 和創(chuàng)建綁定關(guān)系時(shí)指定的 Arguments 來(lái)匹配決定路由到哪些 Queue椭微。
Headers Exchange 的性能比較差,而且 Direct Exchange 完全可以代替它盲链,所以不建議使用蝇率。

Default Exchange
Default Exchange 是一種特殊的 Direct Exchange。當(dāng)你手動(dòng)創(chuàng)建一個(gè)隊(duì)列時(shí)刽沾,后臺(tái)會(huì)自動(dòng)將這個(gè)隊(duì)列綁定到一個(gè)名稱為空的 Direct Exchange 上本慕,綁定 RoutingKey 與隊(duì)列名稱相同。有了這個(gè)默認(rèn)的交換機(jī)和綁定侧漓,使我們只關(guān)心隊(duì)列這一層即可间狂,這個(gè)比較適合做一些簡(jiǎn)單的應(yīng)用。

RabbitMQ隊(duì)列火架、綁定鉴象、虛擬主機(jī)、消息

Binding:
綁定Exchange和Exchange何鸡、Queue之間的連接關(guān)系纺弊,Binding中可以包含RoutingKey或者參數(shù)

Queue:
消息隊(duì)列,實(shí)際存儲(chǔ)消息數(shù)據(jù)

  • Durability:是否持久化骡男,Durable:是淆游,Transient:否
  • Auto Delete:如選yes,代表當(dāng)最后一個(gè)監(jiān)聽被移除之后隔盛,該Queue會(huì)自動(dòng)被刪除

Message:
服務(wù)器和應(yīng)用程序之間傳送的數(shù)據(jù)犹菱,本質(zhì)上是一段數(shù)據(jù),有Properties和Payload(Body)組成
常用屬性:delivery mode吮炕、headers(自定義屬性)
其他屬性:
content_type腊脱、content_encoding(字符集)、priority(消息優(yōu)先級(jí)0-9,從小到大,優(yōu)先級(jí)越來(lái)越高)龙亲、correlation_id(消息唯一id)陕凹、reply_to(重回隊(duì)列,返回哪個(gè)隊(duì)列)、expiration(消息過(guò)期時(shí)間)鳄炉、message_id(消息id)杜耙、timestamp(時(shí)間戳)、type拂盯、user_id佑女、app_id、cluster_id

  • Producer代碼:
public class Producer {

    public static void main(String[] args) throws IOException, TimeoutException {
        //1、創(chuàng)建一個(gè)ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        //2团驱、通過(guò)連接工廠創(chuàng)建連接
        Connection connection = connectionFactory.newConnection();

        //3簸呈、通過(guò)Connection創(chuàng)建一個(gè)Channel
        Channel channel = connection.createChannel();

        Map<String,Object> map = new HashMap<>();
        map.put("將故事寫成我們",111);
        map.put("夜曲",222);

        AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
                .deliveryMode(2)
                .contentEncoding("UTF-8")
                .expiration("10000")
                .headers(map)
                .build();

        //4、通過(guò)Channel發(fā)送數(shù)據(jù)
        String msg = "hello rabbitmq!";
        //1 exchange   2 routingKey
        channel.basicPublish("","test001",properties,msg.getBytes("UTF-8"));

        //5店茶、關(guān)閉相關(guān)連接
        channel.close();
        connection.close();
    }
}
  • Consumer端代碼:
public class Consumer {

    public static void main(String[] args) throws IOException, TimeoutException {
        //1蜕便、創(chuàng)建一個(gè)ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        //2、通過(guò)連接工廠創(chuàng)建連接
        Connection connection = connectionFactory.newConnection();

        //3贩幻、通過(guò)Connection創(chuàng)建一個(gè)Channel
        Channel channel = connection.createChannel();

        //4轿腺、聲明(創(chuàng)建)一個(gè)隊(duì)列
        String queueName = "test001";
        channel.queueDeclare(queueName,true,false,false,null);

        //5、創(chuàng)建消費(fèi)者
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("RoutingKey="+envelope.getRoutingKey() + "丛楚,message=" + message);
                Map<String, Object> headers = properties.getHeaders();
                System.out.println("properties的屬性:headers="+headers);
            }
        };

        //6族壳、設(shè)置channel
        channel.basicConsume(queueName,true,consumer);
    }
}

Virtual Host:
虛擬主機(jī),用于進(jìn)行邏輯隔離趣些,最上層的消息路由仿荆,一個(gè)Virtual Host里面可以有若干個(gè)Exchange和Queue,同一個(gè)Virtual Host不能有相同名稱的Exchange和Queue

參考:
https://www.cnblogs.com/shenyixin/p/9084249.html

https://www.cnblogs.com/luhan777/p/11162649.html

https://www.cnblogs.com/wuzhenzhao/p/10319677.html

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末坏平,一起剝皮案震驚了整個(gè)濱河市拢操,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌舶替,老刑警劉巖令境,帶你破解...
    沈念sama閱讀 211,884評(píng)論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異顾瞪,居然都是意外死亡舔庶,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,347評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門陈醒,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)惕橙,“玉大人,你說(shuō)我怎么就攤上這事钉跷∶逐校” “怎么了?”我有些...
    開封第一講書人閱讀 157,435評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵尘应,是天一觀的道長(zhǎng)惶凝。 經(jīng)常有香客問(wèn)我吼虎,道長(zhǎng)犬钢,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,509評(píng)論 1 284
  • 正文 為了忘掉前任思灰,我火速辦了婚禮玷犹,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘洒疚。我一直安慰自己歹颓,他們只是感情好坯屿,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,611評(píng)論 6 386
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著巍扛,像睡著了一般领跛。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上撤奸,一...
    開封第一講書人閱讀 49,837評(píng)論 1 290
  • 那天吠昭,我揣著相機(jī)與錄音,去河邊找鬼胧瓜。 笑死矢棚,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的府喳。 我是一名探鬼主播蒲肋,決...
    沈念sama閱讀 38,987評(píng)論 3 408
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼钝满!你這毒婦竟也來(lái)了兜粘?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,730評(píng)論 0 267
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤弯蚜,失蹤者是張志新(化名)和其女友劉穎妹沙,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體熟吏,經(jīng)...
    沈念sama閱讀 44,194評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡距糖,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,525評(píng)論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了牵寺。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片悍引。...
    茶點(diǎn)故事閱讀 38,664評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖帽氓,靈堂內(nèi)的尸體忽然破棺而出趣斤,到底是詐尸還是另有隱情,我是刑警寧澤黎休,帶...
    沈念sama閱讀 34,334評(píng)論 4 330
  • 正文 年R本政府宣布浓领,位于F島的核電站,受9級(jí)特大地震影響势腮,放射性物質(zhì)發(fā)生泄漏联贩。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,944評(píng)論 3 313
  • 文/蒙蒙 一捎拯、第九天 我趴在偏房一處隱蔽的房頂上張望泪幌。 院中可真熱鬧,春花似錦、人聲如沸祸泪。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,764評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)没隘。三九已至懂扼,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間右蒲,已是汗流浹背微王。 一陣腳步聲響...
    開封第一講書人閱讀 31,997評(píng)論 1 266
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留品嚣,地道東北人炕倘。 一個(gè)月前我還...
    沈念sama閱讀 46,389評(píng)論 2 360
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像翰撑,于是被迫代替她去往敵國(guó)和親罩旋。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,554評(píng)論 2 349