RabbitMQ簡(jiǎn)介
AMQP,即Advanced Message Queuing Protocol琼腔,高級(jí)消息隊(duì)列協(xié)議抡诞,是應(yīng)用層協(xié)議的一個(gè)開放標(biāo)準(zhǔn),為面向消息的中間件設(shè)計(jì)秤朗。消息中間件主要用于組件之間的解耦煤蹭,消息的發(fā)送者無(wú)需知道消息使用者的存在,反之亦然。
AMQP的主要特征是面向消息硝皂、隊(duì)列常挚、路由(包括點(diǎn)對(duì)點(diǎn)和發(fā)布/訂閱)、可靠性稽物、安全奄毡。
RabbitMQ是一個(gè)開源的AMQP實(shí)現(xiàn),服務(wù)器端用Erlang語(yǔ)言編寫贝或,支持多種客戶端吼过,如:Python、Ruby咪奖、.NET盗忱、Java、JMS羊赵、C趟佃、PHP、ActionScript昧捷、XMPP闲昭、STOMP等,支持AJAX靡挥。用于在分布式系統(tǒng)中存儲(chǔ)轉(zhuǎn)發(fā)消息汤纸,在易用性、擴(kuò)展性芹血、高可用性等方面表現(xiàn)不俗贮泞。
AMQP協(xié)議更多用在企業(yè)系統(tǒng)內(nèi),對(duì)數(shù)據(jù)一致性幔烛、穩(wěn)定性和可靠性要求很高的場(chǎng)景啃擦,對(duì)性能和吞吐量的要求還在其次。
下面將重點(diǎn)介紹RabbitMQ中的一些基礎(chǔ)概念饿悬,了解了這些概念令蛉,是使用好RabbitMQ的基礎(chǔ)。
為什么用RabbitMQ
- 開源狡恬、性能優(yōu)秀珠叔、穩(wěn)定性保障
- RabbitMQ提供可靠性消息投遞模式(confirm)、返回模式(return)
- SpringAMQP完美的整合弟劲、API豐富
- 集群模式豐富祷安,表達(dá)式配置、HA模式兔乞、鏡像隊(duì)列模型
- 保證數(shù)據(jù)不丟失的前提下做到高可靠性汇鞭、可用性
RabbitMQ的高性能之道是如何做到的
Erlang語(yǔ)言最初用于交換機(jī)領(lǐng)域的架構(gòu)模式凉唐,這樣使得RabbitMQ在Broker之間進(jìn)行數(shù)據(jù)交互的性能是非常優(yōu)秀的
Erlang的優(yōu)點(diǎn):Erlang有著和原生Socket一樣的延遲
什么是AMQP高級(jí)消息隊(duì)列協(xié)議
AMQP:即Advanced Message Queuing Protocol,高級(jí)消息隊(duì)列協(xié)議霍骄。
- 是面向消息的中間件的開放標(biāo)準(zhǔn)應(yīng)用層協(xié)議,AMQP的特征是消息導(dǎo)向台囱,排隊(duì),路由(包括點(diǎn)對(duì)點(diǎn)和發(fā)布和訂閱)读整,可靠性和安全性簿训。
- AMQP要求消息傳遞提供商和客戶端的行為在不同供應(yīng)商實(shí)現(xiàn)可互操作的情況下,以與SMTP米间,HTTP强品,F(xiàn)TP等相同的方式創(chuàng)建了可互操作的系統(tǒng)。
- AMQP協(xié)議是具有現(xiàn)代特征的二進(jìn)制協(xié)議车伞。一個(gè)提供統(tǒng)一消息服務(wù)的應(yīng)用層標(biāo)準(zhǔn)高級(jí)消息隊(duì)列協(xié)議,是應(yīng)用層協(xié)議的一個(gè)開發(fā)標(biāo)準(zhǔn)喻喳,為面向消息的中間件設(shè)計(jì)另玖。基于此協(xié)議的客戶端與消息中間件可傳遞消息表伦,并不受客戶端/中間件不同產(chǎn)品谦去,不同開發(fā)語(yǔ)言等條件的限制。
- AMQP是一種二進(jìn)制應(yīng)用層協(xié)議蹦哼,旨在有效地支持各種消息應(yīng)用和通信模式鳄哭。
AMQP的主要特征是面向消息、隊(duì)列纲熏、路由(包括點(diǎn)對(duì)點(diǎn)和發(fā)布/訂閱)妆丘、可靠性、 安全局劲。
AMQP的協(xié)議棧
- Modle Layer:位于協(xié)議最高層勺拣,主要定義了一些供客戶端調(diào)用的命令,客戶端可以利用這些命令實(shí)現(xiàn)自己的業(yè)務(wù)邏輯鱼填,例如药有,客戶端可以通過(guò)queue.declare聲明一個(gè)隊(duì)列,利用consume命令獲取一個(gè)隊(duì)列中的消息苹丸。
- Session Layer:主要負(fù)責(zé)將客戶端的命令發(fā)送給服務(wù)器愤惰,在將服務(wù)器端的應(yīng)答返回給客戶端,主要為客戶端與服務(wù)器之間通信提供可靠性赘理、同步機(jī)制和錯(cuò)誤處理宦言。
- Transport Layer:主要傳輸二進(jìn)制數(shù)據(jù)流,提供幀的處理商模、信道復(fù)用蜡励、錯(cuò)誤檢測(cè)和數(shù)據(jù)表示令花。
AMQP協(xié)議模型
AMQP核心概念是什么
Server:
又稱作Broker,用于接受客戶端的連接凉倚,實(shí)現(xiàn)AMQP實(shí)體服務(wù)
Connection:
連接兼都,應(yīng)用程序與Broker的網(wǎng)絡(luò)連接
Channel:
網(wǎng)絡(luò)信道,幾乎所有的操作都在Channel中進(jìn)行稽寒,Channel是進(jìn)行消息讀寫的通道扮碧。客戶端可建立多個(gè)Channel杏糙,每個(gè)Channel代表一個(gè)會(huì)話任務(wù)
Message:
消息慎王,服務(wù)器和應(yīng)用程序之間傳送的數(shù)據(jù),有Properties和Body組成宏侍。Properties可以對(duì)消息進(jìn)行修飾赖淤,比如消息的優(yōu)先級(jí)、延遲等高級(jí)特性谅河;Body則是消息體內(nèi)容咱旱,即我們要傳輸?shù)臄?shù)據(jù);
僅僅創(chuàng)建了客戶端到Broker之間的連接后绷耍,客戶端還是不能發(fā)送消息的吐限。需要為每一個(gè)Connection創(chuàng)建Channel,AMQP協(xié)議規(guī)定只有通過(guò)Channel才能執(zhí)行AMQP的命令褂始。一個(gè)Connection可以包含多個(gè)Channel诸典。之所以需要Channel,是因?yàn)門CP連接的建立和釋放都是十分昂貴的崎苗,如果一個(gè)客戶端每一個(gè)線程都需要與Broker交互狐粱,如果每一個(gè)線程都建立一個(gè)TCP連接,暫且不考慮TCP連接是否浪費(fèi)胆数,就算操作系統(tǒng)也無(wú)法承受每秒建立如此多的TCP連接脑奠。RabbitMQ建議客戶端線程之間不要共用Channel,至少要保證共用Channel的線程發(fā)送消息必須是串行的幅慌,但是建議盡量共用Connection宋欺。
Virtual Host:
虛擬地址,是一個(gè)邏輯概念胰伍,用于進(jìn)行邏輯隔離齿诞,是最上層的消息路由。一個(gè)Virtual Host里面可以有若干個(gè)Exchange和Queue骂租,同一個(gè)Virtual Host里面不能有相同名稱的Exchange或者Queue祷杈;
Virtual Host是權(quán)限控制的最小粒度;
Exchange:
交換機(jī)渗饮,用于接收消息但汞,可根據(jù)路由鍵將消息轉(zhuǎn)發(fā)到綁定的隊(duì)列宿刮。
Binding:
Exchange和Queue之間的虛擬連接,Exchange在與多個(gè)Message Queue發(fā)生Binding后會(huì)生成一張路由表私蕾,路由表中存儲(chǔ)著Message Queue所需消息的限制條件即Binding Key僵缺。當(dāng)Exchange收到Message時(shí)會(huì)解析其Header得到Routing Key,Exchange根據(jù)Routing Key與Exchange Type將Message路由到Message Queue踩叭。Binding Key由Consumer在Binding Exchange與Message Queue時(shí)指定磕潮,而Routing Key由Producer發(fā)送Message時(shí)指定,兩者的匹配方式由Exchange Type決定容贝。
Routing Key:
一個(gè)路由規(guī)則瞳步,虛擬機(jī)可用它來(lái)確定如何路由一個(gè)特定的消息插勤;
Queue:
也稱作Message Queue,即消息隊(duì)列校套,用于保存消息并將他們轉(zhuǎn)發(fā)給消費(fèi)者灌危;
RabbitMQ整體架構(gòu)模型
生產(chǎn)者把消息投遞到Exchange,Exchange投遞到Queue.
因此我們的生產(chǎn)者只需要關(guān)注把消息投遞到指定的Exchange即可,我們的消費(fèi)者只需要監(jiān)聽指定Queue即可辐啄。就是這么簡(jiǎn)單的機(jī)制作喘。
通過(guò)圖我們也能看到硝清,生產(chǎn)者不需要關(guān)注投遞到哪個(gè)隊(duì)列,消費(fèi)也不需要關(guān)注是從哪個(gè)Exchange上來(lái)的脚囊,這兩塊沒有耦合的情況龟糕。主要是應(yīng)為Exchange和Queue有一個(gè)綁定的關(guān)系桐磁。
RabbitMQ消息流轉(zhuǎn)
生產(chǎn)者publisher application 生產(chǎn)消息Message投遞到Exchange上悔耘,Exchange綁定MessageQueue,可以綁定過(guò)多個(gè)MessageQueue我擂,為什么三個(gè)隊(duì)列只有其中一個(gè)隊(duì)列收到了消息呢衬以?主要是由于Exchange是有一個(gè)路由功能的。這個(gè)路由就是routing key,這個(gè)路由有兩個(gè)非常關(guān)鍵的點(diǎn)校摩,
第一個(gè):你的消息是需要發(fā)送到哪個(gè)Exchange看峻。
第二個(gè):你發(fā)消息的時(shí)候需要帶上routing key,然后通過(guò)Exchange 和 MessageQueue 建立一個(gè)綁定關(guān)系衙吩,通過(guò)路由key把消息路由到一個(gè)指定的隊(duì)列上互妓。然后我們的消費(fèi)端直接監(jiān)聽隊(duì)列就行了,就可以消費(fèi)了坤塞。
RabbitMQ的安裝與使用
RabbitMQ官網(wǎng)地址:https://www.rabbitmq.com/
Erlang官網(wǎng)地址:https://www.erlang.org/
windows平臺(tái)安裝方式:http://www.reibang.com/p/ba3b2c62dce1
Centos7平臺(tái)安裝方式:
綠色版安裝:https://blog.csdn.net/weixin_41004350/article/details/83046842
rpm安裝步驟:
準(zhǔn)備環(huán)境:
yum install
build-essential openssl openssl-devel unixODBC unixODBC-devel
make gcc gcc-c++ kernel-devel m4 ncurses-devel tk tc xz
下載:
wget www.rabbitmq.com/releases/erlang/erlang-18.3-1.el7.centos.x86_64.rpm
wget http://repo.iotti.biz/CentOS/7/x86_64/socat-1.7.3.2-5.el7.lux.x86_64.rpm
wget www.rabbitmq.com/releases/rabbitmq-server/v3.6.5/rabbitmq-server-3.6.5-1.noarch.rpm
配置文件:
vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/rabbit.app
比如修改密碼冯勉、配置等等,例如:loopback_users 中的 <<"guest">>,只保留guest
服務(wù)啟動(dòng)和停止:
啟動(dòng) rabbitmq-server start &
停止 rabbitmqctl app_stop
管理插件:rabbitmq-plugins enable rabbitmq_management
訪問(wèn)地址:http://ip:15672/
RabbitMQ命令行與管控臺(tái)
基礎(chǔ)服務(wù)的命令操作
rabbitmqctl stop_app:關(guān)閉應(yīng)用
rabbitmqctl start_app:?jiǎn)?dòng)應(yīng)用
rabbtmqctl status:節(jié)點(diǎn)狀態(tài)
rabbitmqctl add_user username password:添加用戶
rabbitmqctl list_users:列出所有用戶
rabbitmqctl delete_user username:刪除用戶
rabbitmqctl clear_permissions - p vhostpath username: 清除用戶權(quán)限
rabbitmqctl list_user_permissions_username: 列出用戶權(quán)限
rabbitmqctl change_password username newpassword:修改密碼
rabbitmqctl set_permissions -p vhostpath username "." "." ".*" :設(shè)置用戶權(quán)限
涉及的用戶命令還有許多摹芙,這里就不一一列舉了灼狰。
對(duì)rabbitmq 具體組件的命令
對(duì)虛擬主機(jī)操作
rabbitmqctl add_vhost vhostpath:創(chuàng)建虛擬主機(jī)
rabbitmqctl list_vhosts:列出所有虛擬主機(jī)
rabbitmqctl list_permissions -p vhostpath:列出虛擬主機(jī)上所有權(quán)限
rabbitmqctl delete_vhost vhostpath:刪除虛擬主機(jī)
隊(duì)列操作
rabbitmqctl list_queues:查看所有隊(duì)列信息
rabbitmqctl -p vhostpath purge_queue bule:清除隊(duì)列里的消息
高級(jí)操作
rabbitmqctl reset:移除所有數(shù)據(jù),要在rabbitmqctl stop_app之后使用
rabbitmqctl join_clust [--ram]:組成集群命令
rabbitmqctl clustr_status:查看集群狀態(tài)
rabbitmqctl change_cluster_node_type disc|ram 修改集群節(jié)點(diǎn)的存儲(chǔ)形式
rabbitmqctl forget_cluster_node [--offline] 忘記節(jié)點(diǎn)(摘除節(jié)點(diǎn))
rabbitmqctl rename_cluster_node oldnode1 newnode1 [oldnode2] [newnode2..] (修改節(jié)點(diǎn)名稱)
更多的RabbitMQ命令行與管控臺(tái)操作:https://www.cnblogs.com/coder-programming/p/11382322.html
RabbitMQ交換機(jī)詳解
Exchange:交換機(jī)浮禾,用于接收消息交胚,可根據(jù)路由鍵將消息轉(zhuǎn)發(fā)到綁定的隊(duì)列
Exchange的屬性:
- Name :名字份汗,同一個(gè)virtual host里面的Name不能重復(fù);不同的virtual host是可以重復(fù)的蝴簇。
- Type:fanout杯活、direct、topic军熏、headers四種
public enum BuiltinExchangeType {
DIRECT("direct"),
FANOUT("fanout"),
TOPIC("topic"),
HEADERS("headers");
}
- Durability:是否持久化轩猩,true:是;false:否荡澎。如果不持久化均践,當(dāng)break重啟之后,當(dāng)前的exchange會(huì)消失
- Auto delete:當(dāng)最后一個(gè)綁定到Exchange上的隊(duì)列被刪除后摩幔,自動(dòng)刪除該Exchange
- Internal:當(dāng)前Exchange是否是內(nèi)部RabbitMQ內(nèi)部使用彤委,默認(rèn)false,是的話或衡,就意味著我們不能往該Exchange里面發(fā)送消息
- Arguments:擴(kuò)展參數(shù)焦影,是AMQP協(xié)議留給AMQP實(shí)現(xiàn)做擴(kuò)展用的。
RabbitMQ總共有四種Exchange模式(fanout封断、direct斯辰、topic、headers)
RabbitMQ中坡疼,所有生產(chǎn)者提交的消息都由Exchange來(lái)接受彬呻,然后Exchange按照特定的策略轉(zhuǎn)發(fā)到Queue進(jìn)行存儲(chǔ)
RabbitMQ提供了四種Exchange:fanout、direct柄瑰、topic闸氮、headers
Headers Exchange 會(huì)忽略 RoutingKey 而根據(jù)消息中的Headers和創(chuàng)建綁定關(guān)系時(shí)指定的 Arguments來(lái)匹配決定路由到哪些Queue,在實(shí)際中使用較少教沾,本文只對(duì)前三種模式進(jìn)行比較蒲跨。
性能排序:fanout > direct > topic。比例大約為11 :10 :6
Direct Exchange
Direct Exchange:處理路由鍵授翻。需要將一個(gè)隊(duì)列綁定到交換機(jī)上或悲,要求該消息與一個(gè)特定的路由鍵完全匹配。
所有發(fā)送到Direct Exchange的消息被轉(zhuǎn)發(fā)到RouteKey中指定的queue
注意:Direct模式可以使用RabbitMQ自帶的Exchange:default Exchange堪唐,所以不需要將Exchange進(jìn)行任何綁定(binding)操作巡语,消息傳遞時(shí),RouteKey必須完全匹配才會(huì)被隊(duì)列接收羔杨,否則該消息會(huì)被拋棄捌臊。
Channel channel = connection.createChannel();
channel.exchangeDeclare("exchangeName", "direct"); //direct fanout topic
channel.queueDeclare("queueName");
channel.queueBind("queueName", "exchangeName", "routingKey");
byte[] messageBodyBytes = "hello world".getBytes();
//需要綁定路由鍵
channel.basicPublish("exchangeName", "routingKey", MessageProperties.PERSISTENT_TEXT_PLAIN, messageBodyBytes);
Fanout Exchange
不處理路由鍵。你只需要簡(jiǎn)單的將隊(duì)列綁定到交換機(jī)上兜材。一個(gè)發(fā)送到交換機(jī)的消息都會(huì)被轉(zhuǎn)發(fā)到與該交換機(jī)綁定的所有隊(duì)列上理澎。很像子網(wǎng)廣播逞力,每臺(tái)子網(wǎng)內(nèi)的主機(jī)都獲得了一份復(fù)制的消息。Fanout交換機(jī)轉(zhuǎn)發(fā)消息是最快的糠爬。
任何發(fā)送到Fanout Exchange的消息都會(huì)被轉(zhuǎn)發(fā)到與該Exchange綁定(Binding)的所有Queue上寇荧。
- 1、可以理解為路由表的模式
- 2执隧、這種模式不需要RouteKey
- 3揩抡、這種模式需要提前將Exchange與Queue進(jìn)行綁定,一個(gè)Exchange可以綁定多個(gè)Queue镀琉,一個(gè)Queue可以同多個(gè)Exchange進(jìn)行綁定峦嗤。
- 4、如果接受到消息的Exchange沒有與任何Queue綁定屋摔,則消息會(huì)被拋棄烁设。
Channel channel = connection.createChannel();
channel.exchangeDeclare("exchangeName", "fanout"); //direct fanout topic
channel.queueDeclare("queueName");
channel.queueBind("queueName", "exchangeName", "routingKey");
channel.queueDeclare("queueName1");
channel.queueBind("queueName1", "exchangeName", "routingKey1");
byte[] messageBodyBytes = "hello world".getBytes();
//路由鍵需要設(shè)置為空
channel.basicPublish("exchangeName", "", MessageProperties.PERSISTENT_TEXT_PLAIN, messageBodyBytes);
Topic Exchange
Topic Exchange:將路由鍵和某模式進(jìn)行匹配。此時(shí)隊(duì)列需要綁定要一個(gè)模式上钓试。符號(hào)“#”匹配一個(gè)或多個(gè)詞装黑,符號(hào)“”匹配不多不少一個(gè)詞。因此“audit.#”能夠匹配到“audit.irs.corporate”弓熏,但是“audit.” 只會(huì)匹配到“audit.irs”
任何發(fā)送到Topic Exchange的消息都會(huì)被轉(zhuǎn)發(fā)到所有關(guān)心RouteKey中指定話題的Queue上
- 1恋谭、這種模式較為復(fù)雜,簡(jiǎn)單來(lái)說(shuō)挽鞠,就是每個(gè)隊(duì)列都有其關(guān)心的主題疚颊,所有的消息都帶有一個(gè)“標(biāo)題”(RouteKey),Exchange會(huì)將消息轉(zhuǎn)發(fā)到所有關(guān)注主題能與RouteKey模糊匹配的隊(duì)列滞谢。
- 2串稀、這種模式需要RouteKey除抛,也許要提前綁定Exchange與Queue狮杨。
- 3、在進(jìn)行綁定時(shí)到忽,要提供一個(gè)該隊(duì)列關(guān)心的主題橄教,如“#.log.#”表示該隊(duì)列關(guān)心所有涉及l(fā)og的消息(一個(gè)RouteKey為”MQ.log.error”的消息會(huì)被轉(zhuǎn)發(fā)到該隊(duì)列)。
- 4喘漏、可以使用通配符進(jìn)行模糊匹配护蝶,“#”表示0個(gè)或若干個(gè)關(guān)鍵字,“”表示一個(gè)關(guān)鍵字翩迈。如“l(fā)og.”能與“l(fā)og.warn”匹配持灰,無(wú)法與“l(fā)og.warn.timeout”匹配;但是“l(fā)og.#”能與上述兩者匹配负饲。
- 5堤魁、同樣喂链,如果Exchange沒有發(fā)現(xiàn)能夠與RouteKey匹配的Queue,則會(huì)拋棄此消息妥泉。
Channel channel = connection.createChannel();
channel.exchangeDeclare("exchangeName", "topic"); //direct fanout topic
channel.queueDeclare("queueName");
channel.queueBind("queueName", "exchangeName", "routingKey.*");
byte[] messageBodyBytes = "hello world".getBytes();
channel.basicPublish("exchangeName", "routingKey.one", MessageProperties.PERSISTENT_TEXT_PLAIN, messageBodyBytes);
Headers Exchange
Headers Exchange 會(huì)忽略 RoutingKey 而根據(jù)消息中的 Headers 和創(chuàng)建綁定關(guān)系時(shí)指定的 Arguments 來(lái)匹配決定路由到哪些 Queue椭微。
Headers Exchange 的性能比較差,而且 Direct Exchange 完全可以代替它盲链,所以不建議使用蝇率。
Default Exchange
Default Exchange 是一種特殊的 Direct Exchange。當(dāng)你手動(dòng)創(chuàng)建一個(gè)隊(duì)列時(shí)刽沾,后臺(tái)會(huì)自動(dòng)將這個(gè)隊(duì)列綁定到一個(gè)名稱為空的 Direct Exchange 上本慕,綁定 RoutingKey 與隊(duì)列名稱相同。有了這個(gè)默認(rèn)的交換機(jī)和綁定侧漓,使我們只關(guān)心隊(duì)列這一層即可间狂,這個(gè)比較適合做一些簡(jiǎn)單的應(yīng)用。
RabbitMQ隊(duì)列火架、綁定鉴象、虛擬主機(jī)、消息
Binding:
綁定Exchange和Exchange何鸡、Queue之間的連接關(guān)系纺弊,Binding中可以包含RoutingKey或者參數(shù)
Queue:
消息隊(duì)列,實(shí)際存儲(chǔ)消息數(shù)據(jù)
- Durability:是否持久化骡男,Durable:是淆游,Transient:否
- Auto Delete:如選yes,代表當(dāng)最后一個(gè)監(jiān)聽被移除之后隔盛,該Queue會(huì)自動(dòng)被刪除
Message:
服務(wù)器和應(yīng)用程序之間傳送的數(shù)據(jù)犹菱,本質(zhì)上是一段數(shù)據(jù),有Properties和Payload(Body)組成
常用屬性:delivery mode吮炕、headers(自定義屬性)
其他屬性:
content_type腊脱、content_encoding(字符集)、priority(消息優(yōu)先級(jí)0-9,從小到大,優(yōu)先級(jí)越來(lái)越高)龙亲、correlation_id(消息唯一id)陕凹、reply_to(重回隊(duì)列,返回哪個(gè)隊(duì)列)、expiration(消息過(guò)期時(shí)間)鳄炉、message_id(消息id)杜耙、timestamp(時(shí)間戳)、type拂盯、user_id佑女、app_id、cluster_id
- Producer代碼:
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
//1、創(chuàng)建一個(gè)ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//2团驱、通過(guò)連接工廠創(chuàng)建連接
Connection connection = connectionFactory.newConnection();
//3簸呈、通過(guò)Connection創(chuàng)建一個(gè)Channel
Channel channel = connection.createChannel();
Map<String,Object> map = new HashMap<>();
map.put("將故事寫成我們",111);
map.put("夜曲",222);
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
.deliveryMode(2)
.contentEncoding("UTF-8")
.expiration("10000")
.headers(map)
.build();
//4、通過(guò)Channel發(fā)送數(shù)據(jù)
String msg = "hello rabbitmq!";
//1 exchange 2 routingKey
channel.basicPublish("","test001",properties,msg.getBytes("UTF-8"));
//5店茶、關(guān)閉相關(guān)連接
channel.close();
connection.close();
}
}
- Consumer端代碼:
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
//1蜕便、創(chuàng)建一個(gè)ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//2、通過(guò)連接工廠創(chuàng)建連接
Connection connection = connectionFactory.newConnection();
//3贩幻、通過(guò)Connection創(chuàng)建一個(gè)Channel
Channel channel = connection.createChannel();
//4轿腺、聲明(創(chuàng)建)一個(gè)隊(duì)列
String queueName = "test001";
channel.queueDeclare(queueName,true,false,false,null);
//5、創(chuàng)建消費(fèi)者
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("RoutingKey="+envelope.getRoutingKey() + "丛楚,message=" + message);
Map<String, Object> headers = properties.getHeaders();
System.out.println("properties的屬性:headers="+headers);
}
};
//6族壳、設(shè)置channel
channel.basicConsume(queueName,true,consumer);
}
}
Virtual Host:
虛擬主機(jī),用于進(jìn)行邏輯隔離趣些,最上層的消息路由仿荆,一個(gè)Virtual Host里面可以有若干個(gè)Exchange和Queue,同一個(gè)Virtual Host不能有相同名稱的Exchange和Queue
參考:
https://www.cnblogs.com/shenyixin/p/9084249.html