1震庭,摘要
從安裝環(huán)境,配置入門你雌,到HelloWorld實(shí)操器联,各種類型消息傳遞的演示代碼,原理介紹婿崭,答疑解惑拨拓,面試題,全面介紹RabbitMQ消息隊(duì)列氓栈。
RabbitMQ集群搭建另外一篇文章介紹渣磷。
2,內(nèi)容
2.1 Ubuntu 16.04 安裝 RabbitMQ
參考:
(1)Ubuntu 16.04 RabbitMq 安裝與運(yùn)行(安裝篇)【成功】
https://blog.csdn.net/qq_22638399/article/details/81704372
2.1.1 安裝 Erlang
由于rabbitMq需要erlang語言的支持授瘦,在安裝rabbitMq之前需要安裝erlang醋界,執(zhí)行命令:
apt-get install erlang-nox # 安裝erlang
erl # 查看relang語言版本,成功執(zhí)行則說明relang安裝成功
2.1.2 安裝 RabbitMQ 3.5.7
2提完,添加公鑰
wget -O- https://www.rabbitmq.com/rabbitmq-release-signing-key.asc | sudo apt-key add -
3.更新軟件包
apt-get update
4.安裝 RabbitMQ
apt-get install erlang #安裝erlang形纺。
apt-get install rabbitmq-server #安裝成功自動啟動。
通過系統(tǒng)默認(rèn)源安裝得到的rabbitmq的版本是3.5.7氯葬。
檢查rabbitmq的版本:
rabbitmqctl status |grep rabbit #檢查rabbit版本
5.查看 RabbitMq狀態(tài)
systemctl status rabbitmq-server #Active: active (running) 說明處于運(yùn)行狀態(tài)
# service rabbitmq-server status 用service指令也可以查看挡篓,同systemctl指令
6.啟動、停止、重啟
service rabbitmq-server start # 啟動
service rabbitmq-server stop # 停止
service rabbitmq-server restart # 重啟
執(zhí)行了上面的步驟官研,rabbitMq已經(jīng)安裝成功秽澳。
2.1.2 安裝 RabbitMQ 3.8.4
(1)如果之前安裝過RabbitMQ,此處需要卸載掉。
apt-get remove rabbitmq
apt-get purge erlang
apt-get autoremove
Erlang跟RabbitMQ匹配戏羽,才能安裝得成功担神。
Erlang Release Series | Apt Repositories that provide it | Notes |
---|---|---|
23.x | Debian packages of Erlang by Team RabbitMQErlang Solutions | Supported starting with 3.8.4. See Erlang compatibility guide. |
(2)Erlang安裝最新版本 (20+版本):
配置源
echo "deb http://packages.erlang-solutions.com/ubuntu trusty contrib" | tee -a /etc/apt/sources.list.d/erlang_solutions.list
導(dǎo)入key
wget -O- https://dl.bintray.com/rabbitmq/Keys/rabbitmq-release-signing-key.asc |apt-key add -
更新和安裝
apt-get update
apt-get install erlang-nox
(3)下載RabbitMQ版本。
wget -c https://bintray.com/rabbitmq/debian/download_file?file_path=pool%2Frabbitmq-server%2Frabbitmq-server_3.8.4-1_all.deb
mv download_file?file_path=pool%2Frabbitmq-server%2Frabbitmq-server_3.8.4-1_all.deb rabbitmq-server_3.8.4-1_all.deb
安裝RabbitMQ3.8.4軟件始花。
apt get install socat
dpkg -i ./rabbitmq-server_3.8.4-1_all.deb
(4)查看 RabbitMq狀態(tài)
systemctl status rabbitmq-server #Active: active (running) 說明處于運(yùn)行狀態(tài)
# service rabbitmq-server status 用service指令也可以查看妄讯,同systemctl指令
(5)啟動、停止酷宵、重啟
service rabbitmq-server start # 啟動
service rabbitmq-server stop # 停止
service rabbitmq-server restart # 重啟
執(zhí)行了上面的步驟亥贸,rabbitMq已經(jīng)安裝成功。
2.1.3 啟用 RabbitMQ web 管理插件
- 啟用 web端可視化操作界面浇垦,我們還需要配置Management Plugin插件
rabbitmq-plugins enable rabbitmq_management # 啟用插件
service rabbitmq-server restart # 重啟
注意:RabbitMQ 3.3 及后續(xù)版本炕置,guest 只能在服務(wù)本機(jī)登錄。
8.查看用戶
rabbitmqctl list_users
9.添加管理用戶
rabbitmqctl add_user admin [yourpassword] # 增加普通用戶男韧,密碼為admin
rabbitmqctl set_user_tags admin administrator # 給普通用戶分配管理員角色
10.設(shè)置權(quán)限朴摊,指定允許訪問的vhost以及write/read
set_permissions [-p <vhostpath>] <user> <conf> <write> <read>
rabbitmqctl set_permissions -p "/" admin "." "." ".*"
rabbitmqctl list_permissions -p / #查看vhost(/)允許哪些用戶訪問
-
修改配置
在/etc/rabbitmq 目錄下面增加文件
rabbitmq.config
,編輯內(nèi)容如下,loopback_users表示guest用戶只能本地訪問此虑,admin 賬戶能夠外網(wǎng)訪問甚纲。[
{rabbit, [{tcp_listeners, [5672]}, {loopback_users, [“guest”]}]}
].
**端口信息:**
1. client端通信口amqp: 5672
2. http管理口15672
3. server間內(nèi)部通信口25672
4. erlang發(fā)現(xiàn)口:4369
想要修改默認(rèn)端口可修改,修改安裝目錄下 etc/rabbitmq.config文件朦前。
**配置相關(guān)信息:**
=INFO REPORT==== 22-Jan-2021::11:32:41 ===
node : rabbit@JD3
home dir : /var/lib/rabbitmq
config file(s) : /etc/rabbitmq/rabbitmq.config
cookie hash : VrYYogHAkySebZfCPZM8DQ==
log : /var/log/rabbitmq/rabbit@JD3.log
sasl log : /var/log/rabbitmq/rabbit@JD3-sasl.log
database dir : /var/lib/rabbitmq/mnesia/rabbit@JD3
-
重啟服務(wù)器
systemctl restart rabbitmq-server
ok介杆,你可以在你的瀏覽器上輸入:http://服務(wù)器Ip:15672/ 來訪問你的rabbitmq監(jiān)控頁面。使用剛剛添加的新用戶登錄韭寸。
2.1.4 管理命令參考
命令全集:
Commands:
stop [<pid_file>]
stop_app
start_app
wait <pid_file>
reset
force_reset
rotate_logs <suffix>
join_cluster <clusternode> [--ram]
cluster_status
change_cluster_node_type disc | ram
forget_cluster_node [--offline]
rename_cluster_node oldnode1 newnode1 [oldnode2] [newnode2 ...]
update_cluster_nodes clusternode
force_boot
sync_queue queue
cancel_sync_queue queue
purge_queue queue
set_cluster_name name
add_user <username> <password>
delete_user <username>
change_password <username> <newpassword>
clear_password <username>
authenticate_user <username> <password>
set_user_tags <username> <tag> ...
list_users
add_vhost <vhostpath>
delete_vhost <vhostpath>
list_vhosts [<vhostinfoitem> ...]
set_permissions [-p <vhostpath>] <user> <conf> <write> <read>
clear_permissions [-p <vhostpath>] <username>
list_permissions [-p <vhostpath>]
list_user_permissions <username>set_parameter [-p <vhostpath>] <component_name> <name> <value>
clear_parameter [-p <vhostpath>] <component_name> <key>
list_parameters [-p <vhostpath>]
?set_policy [-p <vhostpath>] [--priority <priority>] [--apply-to <apply-to>] <name> <pattern> <definition>
clear_policy [-p <vhostpath>] <name>
list_policies [-p <vhostpath>]list_queues [-p <vhostpath>] [<queueinfoitem> ...]
list_exchanges [-p <vhostpath>] [<exchangeinfoitem> ...]
list_bindings [-p <vhostpath>] [<bindinginfoitem> ...]
list_connections [<connectioninfoitem> ...]
list_channels [<channelinfoitem> ...]
list_consumers [-p <vhostpath>]
status
environment
report
eval <expr>close_connection <connectionpid> <explanation>
trace_on [-p <vhost>]
trace_off [-p <vhost>]
set_vm_memory_high_watermark <fraction>
set_vm_memory_high_watermark absolute <memory_limit_in_bytes>
具體命令信息:
在rabbitmq的內(nèi)部數(shù)據(jù)庫添加用戶这溅;
add_user刪除一個(gè)用戶;
delete_user改變用戶密碼(也是改變web管理登陸密碼)棒仍;
change_password清除用戶的密碼悲靴,該用戶將不能使用密碼登陸,但是可以通過SASL登陸如果配置了SASL認(rèn)證莫其;
clear_password設(shè)置用戶tags癞尚;
set_user_tags …列出用戶;
list_users創(chuàng)建一個(gè)vhosts乱陡;
add_vhost刪除一個(gè)vhosts浇揩;
delete_vhost列出vhosts;
list_vhosts [ …]針對一個(gè)vhosts給用戶賦予相關(guān)權(quán)限憨颠;
set_permissions [-p ]清除一個(gè)用戶對vhosts的權(quán)限胳徽;
clear_permissions [-p ]列出哪些用戶可以訪問該vhosts积锅;
list_permissions [-p ]-
列出該用戶的訪問權(quán)限;
list_user_permissionsset_parameter [-p ] <component_name>
clear_parameter [-p ] <component_name>
list_parameters [-p ] 獲取RabbitMQ的版本號
? rabbitmqctl status | grep rabbit
2.2 RabbitMQ配置
參考:https://www.rabbitmq.com/configure.html
2.2.1 修舊配置格式如何區(qū)分
RabbitMQ新的版本(V3.7以后)支持類似ini养盗,sysctrl配置文件格式的配置文件缚陷。這個(gè)文件被命名為rabbitmq.conf。
Configuration File | Format Used | Purpose |
---|---|---|
rabbitmq.conf | New style format (sysctl or ini-like) | Primary configuration file. Should be used for most settings. It is easier for humans to read and machines (deployment tools) to generate. Not every setting can be expressed in this format. |
advanced.config | Classic (Erlang terms) | A limited number of settings that cannot be expressed in the new style configuration format, such as LDAP queries. Only should be used when necessary. |
rabbitmq-env.conf (rabbitmq-env.conf.bat on Windows) | Environment variable pairs | Used to set environment variables relevant to RabbitMQ in one place. |
sysctl 格式舉例: rabbitmq.conf
# A new style format snippet. This format is used by rabbitmq.conf files.
ssl_options.cacertfile = /path/to/ca_certificate.pem
ssl_options.certfile = /path/to/server_certificate.pem
ssl_options.keyfile = /path/to/server_key.pem
ssl_options.verify = verify_peer
ssl_options.fail_if_no_peer_cert = true
經(jīng)典格式舉例: rabbitmq.config
%% A classic format snippet, now used by advanced.config files.
[
{rabbit, [{ssl_options, [{cacertfile, "/path/to/ca_certificate.pem"},
{certfile, "/path/to/server_certificate.pem"},
{keyfile, "/path/to/server_key.pem"},
{verify, verify_peer},
{fail_if_no_peer_cert, true}]}]}
].
2.2.2 典型配置文件
新格式:
[rabbitmq.conf]
https://github.com/rabbitmq/rabbitmq-server/blob/master/deps/rabbit/docs/rabbitmq.conf.example
經(jīng)典格式:
[advanced.config]
https://github.com/rabbitmq/rabbitmq-server/blob/master/deps/rabbit/docs/advanced.config.example
有些配置設(shè)置是不可能或者很難通過sysctl格式進(jìn)行配置往核。因此箫爷,可以使用Erlang術(shù)語配置格式的附加配置文件,該文件通常被命名為advanced.config聂儒。它將與rabbitmq.conf中提供的配置合并虎锚。
經(jīng)典格式:
[rabbitmq.config.example]
https://github.com/rabbitmq/rabbitmq-server/blob/v3.7.x/deps/rabbit/docs/rabbitmq.config.example
更多配置說明可參考文章《RabbitMQ系列(六)RabbitMQ的配置》
2.2.3 rabbitmq.config詳細(xì)配置參數(shù)
Key | Documentation |
---|---|
tcp_listeners | 用于監(jiān)聽 AMQP連接的端口列表(無SSL). 可以包含整數(shù) (即"監(jiān)聽所有接口")或者元組如 {"127.0.0.1", 5672} 用于監(jiān)聽一個(gè)或多個(gè)接口.Default: [5672] listeners.tcp.default = 5672
|
num_tcp_acceptors | 接受TCP偵聽器連接的Erlang進(jìn)程數(shù)。Default: 10 num_acceptors.tcp = 10
|
handshake_timeout | AMQP 0-8/0-9/0-9-1 handshake (在 socket連接和SSL 握手之后)的最大時(shí)間, 毫秒為單位.Default: 10000 handshake_timeout = 10000
|
ssl_listeners | 如上所述衩婚,用于SSL連接窜护。Default: [] |
num_ssl_acceptors | 接受SSL偵聽器連接的Erlang進(jìn)程數(shù)。Default: 1 num_acceptors.ssl = 10
|
ssl_options | SSL配置.參考SSL documentation.Default: [] ssl_options = none
|
ssl_handshake_timeout | SSL handshake超時(shí)時(shí)間,毫秒為單位.Default: 5000 ssl_handshake_timeout = 5000
|
vm_memory_high_watermark | 流程控制觸發(fā)的內(nèi)存閥值.相看memory-based flow control 文檔.Default: 0.4 vm_memory_high_watermark.relative = 0.6 vm_memory_high_watermark.absolute = 2GB
|
vm_memory_high_watermark_paging_ratio | 高水位限制的分?jǐn)?shù)非春,當(dāng)達(dá)到閥值時(shí)柄慰,隊(duì)列中消息消息會轉(zhuǎn)移到磁盤上以釋放內(nèi)存. 參考memory-based flow control 文檔.Default: 0.5 vm_memory_high_watermark_paging_ratio = 0.5
|
disk_free_limit | RabbitMQ存儲數(shù)據(jù)分區(qū)的可用磁盤空間限制.當(dāng)可用空間值低于閥值時(shí)合蔽,流程控制將被觸發(fā).此值可根據(jù)RAM的總大小來相對設(shè)置 (如.{mem_relative, 1.0}).此值也可以設(shè)為整數(shù)(單位為bytes)或者使用數(shù)字單位(如."50MB").默認(rèn)情況下贱枣,可用磁盤空間必須超過50MB.參考 Disk Alarms 文檔.Default: 50000000 disk_free_limit.relative = 3.0 disk_free_limit.absolute = 2GB
|
log_levels | 控制日志的粒度.其值是日志事件類別(category)和日志級別(level)成對的列表.level 可以是 'none' (不記錄日志事件), 'error' (只記錄錯(cuò)誤), 'warning' (只記錄錯(cuò)誤和警告), 'info' (記錄錯(cuò)誤件甥,警告和信息), or 'debug' (記錄錯(cuò)誤,警告敬矩,信息以及調(diào)試信息).目前定義了4種日志類別. 它們是:channel -針對所有與AMQP channels相關(guān)的事件connection - 針對所有與網(wǎng)絡(luò)連接相關(guān)的事件federation - 針對所有與federation相關(guān)的事件mirroring -針對所有與 mirrored queues相關(guān)的事件Default: [{connection, info}] log.file.level = info
|
frame_max | 與客戶端協(xié)商的允許最大frame大小. 設(shè)置為0表示無限制,但在某些QPid客戶端會引發(fā)bug. 設(shè)置較大的值可以提高吞吐量;設(shè)置一個(gè)較小的值可能會提高延遲. Default: 131072 |
channel_max | 與客戶端協(xié)商的允許最大chanel大小. 設(shè)置為0表示無限制.該數(shù)值越大蠢挡,則broker使用的內(nèi)存就越高.Default: 2047channel_max = 2047
|
channel_operation_timeout | Channel 操作超時(shí)時(shí)間(毫秒為單位) (內(nèi)部使用弧岳,因?yàn)橄f(xié)議的區(qū)別和限制,不暴露給客戶端).Default: 5000 channel_operation_timeout = 15000
|
heartbeat | 表示心跳延遲(單位為秒) 业踏,服務(wù)器將在connection.tune frame中發(fā)送.如果設(shè)置為 0, 心跳將被禁用. 客戶端可以不用遵循服務(wù)器的建議, 查看 AMQP reference 來了解詳情. 禁用心跳可以在有大量連接的場景中提高性能禽炬,但可能會造成關(guān)閉了非活動連接的網(wǎng)絡(luò)設(shè)備上的連接落下.Default: 60 (3.5.5之前的版本是580) heartbeat = 60
|
default_vhost | 當(dāng)RabbitMQ從頭開始創(chuàng)建數(shù)據(jù)庫時(shí)創(chuàng)建的虛擬主機(jī). amq.rabbitmq.log交換器會存在于這個(gè)虛擬主機(jī)中.Default: <<"/">> default_vhost = /
|
default_user | RabbitMQ從頭開始創(chuàng)建數(shù)據(jù)庫時(shí),創(chuàng)建的用戶名.Default: <<"guest">> default_user = guest
|
default_pass | 默認(rèn)用戶的密碼.Default: <<"guest">> default_pass = guest
|
default_user_tags | 默認(rèn)用戶的Tags.Default: [administrator] default_user_tags.administrator = true
|
default_permissions | 創(chuàng)建用戶時(shí)分配給它的默認(rèn)Permissions .Default: [<<".">>, <<".">>, <<".*">>] default_permissions.configure = .* default_permissions.read = .* default_permissions.write = .*
|
loopback_users | 只能通過環(huán)回接口(即localhost)連接broker的用戶列表如果你希望默認(rèn)的guest用戶能遠(yuǎn)程連接,你必須將其修改為[].Default: [<<"guest">>] ``loopback_users = none |
cluster_nodes | 當(dāng)節(jié)點(diǎn)第一次啟動的時(shí)候勤家,設(shè)置此選項(xiàng)會導(dǎo)致集群動作自動發(fā)生. 元組的第一個(gè)元素是其它節(jié)點(diǎn)想與其建立集群的節(jié)點(diǎn). 第二個(gè)元素是節(jié)點(diǎn)的類型腹尖,要么是disc,要么是ramDefault: {[], disc} cluster_formation.classic_config.nodes.1 = rabbit@hostname1 cluster_formation.classic_config.nodes.2 = rabbit@hostname2
|
server_properties | 連接時(shí)向客戶端聲明的鍵值對列表Default: [] |
collect_statistics | 統(tǒng)計(jì)收集模式。主要與管理插件相關(guān)伐脖。選項(xiàng):none (不發(fā)出統(tǒng)計(jì)事件)coarse (發(fā)出每個(gè)隊(duì)列 /每個(gè)通道 /每個(gè)連接的統(tǒng)計(jì)事件)fine (也發(fā)出每個(gè)消息統(tǒng)計(jì)事件)你自已可不用修改此選項(xiàng).Default: none `` |
collect_statistics_interval | 統(tǒng)計(jì)收集時(shí)間間隔(毫秒為單位). 主要針對于 management plugin.Default: 5000 collect_statistics_interval = 5000
|
auth_mechanisms | 提供給客戶端的SASL authentication mechanisms.Default: ['PLAIN', 'AMQPLAIN'] auth_mechanisms.1 = PLAIN auth_mechanisms.2 = AMQPLAIN
|
auth_backends | 用于 authentication / authorisation backends 的列表. 此列表可包含模塊的名稱(在模塊相同的情況下热幔,將同時(shí)用于認(rèn)證來授權(quán))或像{ModN, ModZ}這樣的元組,在這里ModN將用于認(rèn)證讼庇,ModZ將用于授權(quán).在2元組的情況中, ModZ可由列表代替,列表中的所有元素必須通過每個(gè)授權(quán)的確認(rèn)绎巨,如{ModN, [ModZ1, ModZ2]}.這就允許授權(quán)插件進(jìn)行組合提供額外的安全約束.除rabbit_auth_backend_internal外,其它數(shù)據(jù)庫可以通常 plugins來使用.Default: [rabbit_auth_backend_internal] |
reverse_dns_lookups | 設(shè)置為true,可讓客戶端在連接時(shí)讓RabbitMQ 執(zhí)行一個(gè)反向DNS查找, 然后通過 rabbitmqctl 和 管理插件來展現(xiàn)信息. Default: false |
delegate_count | 內(nèi)部集群通信中蠕啄,委派進(jìn)程的數(shù)目. 在一個(gè)有非常多核的機(jī)器(集群的一部分)上,你可以增加此值.Default: 16 |
trace_vhosts | tracer內(nèi)部使用.你不應(yīng)該修改.Default: [] |
tcp_listen_options | 默認(rèn)socket選項(xiàng). 你可能不想修改這個(gè)選項(xiàng).Default:[{backlog, 128}, {nodelay, true}, {exit_on_close, false}] |
hipe_compile | 將此選項(xiàng)設(shè)置為true,將會使用HiPE預(yù)編譯部分RabbitMQ,Erlang的即時(shí)編譯器. 這可以增加服務(wù)器吞吐量场勤,但會增加服務(wù)器的啟動時(shí)間. 你可以看到花費(fèi)幾分鐘延遲啟動的成本戈锻,就可以帶來20-50% 更好性能.這些數(shù)字與高度依賴于工作負(fù)載和硬件.HiPE 支持可能沒有編譯進(jìn)你的Erlang安裝中.如果沒有的話,啟用這個(gè)選項(xiàng),并啟動RabbitMQ時(shí)和媳,會看到警告消息. 例如, Debian / Ubuntu 用戶需要安裝erlang-base-hipe 包.HiPE并非在所有平臺上都可用,尤其是Windows.在 Erlang/OTP 17.5版本之前格遭,HiPE有明顯的問題 . 對于HiPE,使用最新的OTP版本是高度推薦的.Default: false |
cluster_partition_handling | 如何處理網(wǎng)絡(luò)分區(qū).可用模式有:ignorepause_minority{pause_if_all_down, [nodes], ignore | autoheal}where [nodes] is a list of node names (ex: ['rabbit@node1', 'rabbit@node2'])autoheal參考documentation on partitions 來了解更多信息Default: ignore |
cluster_keepalive_interval | 節(jié)點(diǎn)向其它節(jié)點(diǎn)發(fā)送存活消息和頻率(毫秒). 注意,這與 net_ticktime是不同的;丟失存活消息不會引起節(jié)點(diǎn)掉線Default: 10000 |
queue_index_embed_msgs_below | 消息大小在此之下的會直接內(nèi)嵌在隊(duì)列索引中. 在修改此值時(shí)窗价,建議你先閱讀 persister tuning 文檔.Default: 4096 |
msg_store_index_module | 隊(duì)列索引的實(shí)現(xiàn)模塊. 在修改此值時(shí)如庭,建議你先閱讀 persister tuning 文檔.Default: rabbit_msg_store_ets_index |
backing_queue_module | 隊(duì)列內(nèi)容的實(shí)現(xiàn)模塊. 你可能不想修改此值.Default: rabbit_variable_queue |
msg_store_file_size_limit | Tunable value for the persister. 你幾乎肯定不應(yīng)該改變此值。Default: 16777216 |
mnesia_table_loading_timeout | 在集群中等待使用Mnesia表可用的超時(shí)時(shí)間撼港。Default: 30000 |
queue_index_max_ journal_entries | Tunable value for the persister. 你幾乎肯定不應(yīng)該改變此值坪它。Default: 65536 |
queue_master_locator | Queue master 位置策略.可用策略有:<<"min-masters">><<"client-local">><<"random">>查看documentation on queue master location 來了解更多信息.Default: <<"client-local">> |
2.3 RabbitMQ實(shí)操入門
參考:
(1)官方入門文檔 https://www.rabbitmq.com/getstarted.html
(2)中文譯文入門:http://raylei.cn/index.php/archives/48/
(3) rabbitmq官方的六種工作模式 https://blog.csdn.net/qq_33040219/article/details/82383127
(4)RABBITMP的GO消息接口:https://godoc.org/github.com/streadway/amqp
(5)所有的樣例代碼
https://github.com/rabbitmq/rabbitmq-tutorials/tree/master/go
2.3.1 HelloWorld 入門
(1)實(shí)現(xiàn)原理
我們將實(shí)現(xiàn)一個(gè)使用RabbitMQ的Hello World示例,其中包含兩個(gè)小程序帝牡,一個(gè)用于發(fā)送消息的send.go, 一個(gè)用于接收消息的receive.go.
在下面的圖示中:"P"表示生產(chǎn)者往毡,"C"表示消費(fèi)者,中間的表格表示隊(duì)列靶溜, 即為RabbitMQ中的消息池开瞭。
關(guān)于Go語言使用RabbitMQ的API,可參考GO amqp.
我們在這里使用amqp庫罩息,因此我們需要先安裝GO amqp客戶端嗤详。
首先,使用go get安裝amqp:
go get github.com/streadway/amqp
(2)Sending 發(fā)送端代碼實(shí)現(xiàn)
我們使用send.go稱把消息生產(chǎn)者瓷炮,receive.go成為消費(fèi)者葱色。send.go連接到RabbitMQ后,發(fā)送一條消息后便退出娘香。
首先苍狰,需要導(dǎo)入amqp:
package main
import (
"fmt"
"log"
"github.com/streadway/amqp"
)
然后,我們編寫一個(gè)通用的輔助方法烘绽,用來檢查每一步amqp調(diào)用的結(jié)果淋昭。另外,在Go語言中經(jīng)常需要使用if語句來檢查操作結(jié)果安接,為了避免在代碼中到處散落if(err != nil)語句翔忽,可以使用下列方法:
func failOnError(err error, msg string){
if err != nil {
log.Fatalf("%s:%s", msg, err)
panic(fmt.Sprintf("%s:%s", msg, err))
}
}
下面,我們來實(shí)現(xiàn)main函數(shù):
1盏檐,連接RabbitMQ服務(wù)器:
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
上面代碼會建立一個(gè)socket連接呀打,處理一些協(xié)議轉(zhuǎn)換及版本對接和登錄授權(quán)的問題。
func Dial接口說明:
func Dial(url string) (*Connection, error)
Dial accepts a string in the AMQP URI format and returns a new Connection over TCP using PlainAuth. Defaults to a server heartbeat interval of 10 seconds and sets the handshake deadline to 30 seconds. After handshake, deadlines are cleared.Dial uses the zero value of tls.Config when it encounters an amqps:// scheme. It is equivalent to calling DialTLS(amqp, nil).
2糯笙,建立連接之后贬丛,我們需要創(chuàng)建一個(gè)通道channel,之后我們的大多數(shù)API操作都是圍繞通道來實(shí)現(xiàn)的:
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
func (Connection) Channel接口說明:*
func (c Connection) Channel() (Channel, error)通道打開唯一的给涕,并發(fā)的服務(wù)通道豺憔,用于處于一堆的AMQP消息额获。當(dāng)該方法在接收端遇到任何錯(cuò)誤時(shí),會導(dǎo)致接收端失效恭应,并且一個(gè)新的通道會被打開抄邀。
Channel opens a unique, concurrent server channel to process the bulk of AMQP messages. Any error from methods on this receiver will render the receiver invalid and a new Channel should be opened.
3, 最后昼榛,我們需要定義一個(gè)隊(duì)列用來存儲境肾、轉(zhuǎn)發(fā)消息,然后我們的sender只需要將消息發(fā)送到這個(gè)隊(duì)列中胆屿,就完成了消息的publish操作奥喻。隊(duì)列Queue待名字,區(qū)分唯一性非迹。
q, err := ch.QueueDeclare(
"hello", //name
false, //durable
false, //delete when unused
false, //exclusive
false, //no wait
nil, //arguments
)
failOnError(err, "Failed to declare q queue")
body := "Hello"
err = ch.Publish(
"", //exchange
q.Name, // routing key
false, //mandatory
false, //immediate
amqp.Publishing{
ContentType: "text/plain",
Body : []byte(body),
}
)
failOnError(err, "Failed to publish a message")
func (Channel) QueueDeclare接口:*
func (ch *Channel) QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args Table) (Queue, error)QueueDeclare聲明一個(gè)保存和傳遞消息的隊(duì)列queue环鲤。 如果一個(gè)channel不存在一個(gè)queque,則Declaring函數(shù)創(chuàng)建一個(gè)queue,如果channel已經(jīng)存在了一個(gè)queue憎兽,該函數(shù)用于確認(rèn)存在queque的參數(shù)是否同聲明的冷离。
Every queue declared gets a default binding to the empty exchange "" which has the type "direct" with the routing key matching the queue's name. With this default binding, it is possible to publish messages that route directly to this queue by publishing to "" with the routing key of the queue name.
QueueDeclare("alerts", true, false, false, false, nil)
Publish("", "alerts", false, false, Publishing{Body: []byte("...")})Delivery Exchange Key Queue
key: alerts -> "" -> alerts -> alerts
The queue name may be empty, in which case the server will generate a unique name which will be returned in the Name field of Queue struct.Durable and Non-Auto-Deleted queues will survive server restarts and remain when there are no remaining consumers or bindings. Persistent publishings will be restored in this queue on server restart. These queues are only able to be bound to durable exchanges.
Non-Durable and Auto-Deleted queues will not be redeclared on server restart and will be deleted by the server after a short time when the last consumer is canceled or the last consumer's channel is closed. Queues with this lifetime can also be deleted normally with QueueDelete. These durable queues can only be bound to non-durable exchanges.
Non-Durable and Non-Auto-Deleted queues will remain declared as long as the server is running regardless of how many consumers. This lifetime is useful for temporary topologies that may have long delays between consumer activity. These queues can only be bound to non-durable exchanges.
Durable and Auto-Deleted queues will be restored on server restart, but without active consumers will not survive and be removed. This Lifetime is unlikely to be useful.
Exclusive queues are only accessible by the connection that declares them and will be deleted when the connection closes. Channels on other connections will receive an error when attempting to declare, bind, consume, purge or delete a queue with the same name.
When noWait is true, the queue will assume to be declared on the server. A channel exception will arrive if the conditions are met for existing queues or attempting to modify an existing queue from a different connection.
When the error return value is not nil, you can assume the queue could not be declared with these parameters, and the channel will be closed.
func (Channel) Publish接口說明:*
func (ch *Channel) Publish(exchange, key string, mandatory, immediate bool, msg Publishing) error
Publish sends a Publishing from the client to an exchange on the server.從客戶端發(fā)布一個(gè)消息給服務(wù)器端的交換機(jī)exchange。
When you want a single message to be delivered to a single queue, you can publish to the default exchange with the routingKey of the queue name. This is because every declared queue gets an implicit route to the default exchange.
Since publishings are asynchronous, any undeliverable message will get returned by the server. Add a listener with Channel.NotifyReturn to handle any undeliverable message when calling publish with either the mandatory or immediate parameters as true.
Publishings can be undeliverable when the mandatory flag is true and no queue is bound that matches the routing key, or when the immediate flag is true and no consumer on the matched queue is ready to accept the delivery.
This can return an error when the channel, connection or socket is closed. The error or lack of an error does not indicate whether the server has received this publishing.
It is possible for publishing to not reach the broker if the underlying socket is shut down without pending publishing packets being flushed from the kernel buffers. The easy way of making it probable that all publishings reach the server is to always call Connection.Close before terminating your publishing application. The way to ensure that all publishings reach the server is to add a listener to Channel.NotifyPublish and put the channel in confirm mode with Channel.Confirm. Publishing delivery tags and their corresponding confirmations start at 1. Exit when all publishings are confirmed.
When Publish does not return an error and the channel is in confirm mode, the internal counter for DeliveryTags with the first confirmation starts at 1.
type Publishing類型說明
type Publishing struct {
// Application or exchange specific fields,
// the headers exchange will inspect this field.
Headers Table// Properties
ContentType string // MIME content type
ContentEncoding string // MIME content encoding
DeliveryMode uint8 // Transient (0 or 1) or Persistent (2)
Priority uint8 // 0 to 9
CorrelationId string // correlation identifier
ReplyTo string // address to to reply to (ex: RPC)
Expiration string // message expiration spec
MessageId string // message identifier
Timestamp time.Time // message timestamp
Type string // message type name
UserId string // creating user id - ex: "guest"
AppId string // creating application id// The application specific payload of the message
Body []byte
}
Publishing captures the client message sent to the server. The fields outside of the Headers table included in this struct mirror the underlying fields in the content frame. They use native types for convenience and efficiency.
定義隊(duì)列操作具備冪等性纯命,也就是說多次重復(fù)定義西剥,相同名稱的隊(duì)列只會創(chuàng)建一個(gè)。發(fā)送給隊(duì)列的內(nèi)容是byte數(shù)組亿汞,將任意格式數(shù)據(jù)轉(zhuǎn)換成byte數(shù)組是一件很簡單的事情瞭空,因此對于任何格式的數(shù)據(jù),要將它發(fā)送到隊(duì)列中是很容易的留夜。
(3) Receiving 接收端代碼實(shí)現(xiàn)
以上完成了發(fā)送消息的程序,現(xiàn)在來實(shí)現(xiàn)從RabbitMQ隊(duì)列中接收消息的消費(fèi)者程序图甜。不同于消息發(fā)送程序只需要將單一的消息推送至隊(duì)列后推出碍粥,消息接收者需要保持一個(gè)監(jiān)聽程序從隊(duì)列中不斷的接收消息。
1黑毅,首先嚼摩,同樣的導(dǎo)入包和實(shí)現(xiàn)輔助函數(shù):
package main
import {
"fmt"
"log"
"github.com/streadway/amqp"
}
func failOnError(err error, msg string){
if err != nil {
log.Fatalf("%s:%s", msg, err)
panic(fmt.Sprintf("%s:%s", msg, err))
}
}
2,接著矿瘦,與生產(chǎn)者一樣枕面,打開連接并創(chuàng)建通道,注意這里的參數(shù)必須與send中的queue name相一致缚去,這樣才能實(shí)現(xiàn)發(fā)送/接受的配對潮秘。
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672")
failOnError(err, "Failed to connect to server")
defer conn.Close();
ch, err := conn.Channel()
failOnError(err, "Failed to connect to channel")
defer ch.Close()
q, err := ch.QueueDeclare(
"hello", //name
false, //durable
false, //delete when usused
false, // exclusive
false, //no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
一般來說,接收消息的程序會先于發(fā)送者運(yùn)行易结,因此在這里我們先定義一個(gè)queue枕荞,確保后面發(fā)送者連接到這個(gè)queue時(shí)柜候,當(dāng)前接收消息程序以運(yùn)行。
3躏精,接下來渣刷,需要RabbitMQ服務(wù)器讓它將消息分發(fā)到我們的消費(fèi)者程序中,消息轉(zhuǎn)發(fā)操作是異步執(zhí)行的矗烛,這里使用goroutine來完成從隊(duì)列中的讀取消息操作:
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to register a consumer")
forever := make(chan bool)
go func(){
for d:= range msgs{
log.Printf("Received a message : %s", d.Body)
}
}()
log.Printf(" [*] Waiting for messages, To exit press CTRL+C")
<-forever
func (*Channel) Consume
func (ch *Channel) Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args Table) (<-chan Delivery, error)
Consume immediately starts delivering queued messages.
Begin receiving on the returned chan Delivery before any other operation on the Connection or Channel.
Continues deliveries to the returned chan Delivery until Channel.Cancel, Connection.Close, Channel.Close, or an AMQP exception occurs. Consumers must range over the chan to ensure all deliveries are received. Unreceived deliveries will block all methods on the same connection.
All deliveries in AMQP must be acknowledged. It is expected of the consumer to call Delivery.Ack after it has successfully processed the delivery. If the consumer is cancelled or the channel or connection is closed any unacknowledged deliveries will be requeued at the end of the same queue.
The consumer is identified by a string that is unique and scoped for all consumers on this channel. If you wish to eventually cancel the consumer, use the same non-empty identifier in Channel.Cancel. An empty string will cause the library to generate a unique identity. The consumer identity will be included in every Delivery in the ConsumerTag field
When autoAck (also known as noAck) is true, the server will acknowledge deliveries to this consumer prior to writing the delivery to the network. When autoAck is true, the consumer should not call Delivery.Ack. Automatically acknowledging deliveries means that some deliveries may get lost if the consumer is unable to process them after the server delivers them. See http://www.rabbitmq.com/confirms.html for more details.
When exclusive is true, the server will ensure that this is the sole consumer from this queue. When exclusive is false, the server will fairly distribute deliveries across multiple consumers.
The noLocal flag is not supported by RabbitMQ.
It's advisable to use separate connections for Channel.Publish and Channel.Consume so not to have TCP pushback on publishing affect the ability to consume messages, so this parameter is here mostly for completeness.
When noWait is true, do not wait for the server to confirm the request and immediately begin deliveries. If it is not possible to consume, a channel exception will be raised and the channel will be closed.
Optional arguments can be provided that have specific semantics for the queue or server.
Inflight messages, limited by Channel.Qos will be buffered until received from the returned chan.
When the Channel or Connection is closed, all buffered and inflight messages will be dropped.
When the consumer tag is cancelled, all inflight messages will be delivered until the returned chan is closed.
(4) Running運(yùn)行
首先辅柴,在命令行中先運(yùn)行消費(fèi)者:
go run receive.go
當(dāng)前程序會一直監(jiān)聽RabbitMQ的隊(duì)列消息,一旦接收到消息后會直接打印出來瞭吃,使用Ctrl+C可以終止程序碌嘀;
接著,在另一個(gè)命令終端中運(yùn)行生產(chǎn)者:
go run send.go
可以看到receive.go程序接收到了當(dāng)前信息:
發(fā)送端:
接收端:
如上就是本系列的RabbitMQ的第一個(gè)例子虱而。你可以在這里下載完整的send.go, receive.go文件筏餐。
2.3.2 工作隊(duì)列
本篇將實(shí)現(xiàn)一個(gè)將耗時(shí)任務(wù)分發(fā)到多個(gè)消費(fèi)者程序的工作隊(duì)列。
工作隊(duì)列的主要思想是避免對資源密集型任務(wù)處理時(shí)的等待牡拇,而是先將任務(wù)壓入隊(duì)列魁瞪,后期再進(jìn)行計(jì)劃處理。我們將任務(wù)封裝成消息發(fā)送給隊(duì)列惠呼,由隊(duì)列程序按策略分發(fā)到所有的在線工作者程序執(zhí)行导俘。當(dāng)有多個(gè)工作程序同時(shí)在線時(shí),多項(xiàng)任務(wù)同時(shí)被多個(gè)不同的工作者處理便成為可能剔蹋。
在Web應(yīng)用程序領(lǐng)域旅薄,如果需要在一個(gè)HTTP短連接中完成一些復(fù)雜的耗時(shí)任務(wù)時(shí),工作隊(duì)列的思想能大幅提高處理效率而帶來了更好的用戶體驗(yàn)泣崩。
(1)準(zhǔn)備
在上一篇介紹中我們發(fā)送一個(gè)固定的字符串“Hello world"到隊(duì)列少梁,然后在接收程序中打印出來。這里沒有實(shí)現(xiàn)諸如圖片大小的調(diào)整矫付、PDF文件的渲染等真實(shí)的復(fù)雜任務(wù)凯沪,而是用特定字符串來表示復(fù)雜任務(wù),導(dǎo)致消息處理程序忙碌买优。任務(wù)處理程序通過time.Sleep函數(shù)讓線程睡眠來模擬復(fù)雜度妨马,以一連串的字符"."來表示任務(wù)的復(fù)雜度,每一個(gè)點(diǎn)表示停頓1秒鐘杀赢,如"Hello..."表示任務(wù)耗時(shí)3秒鐘烘跺。
還是在之前例子的send.go文件上進(jìn)行修改,讓程序通過命令行將任意個(gè)消息參數(shù)傳遞到隊(duì)列脂崔,姑且將新文件命名為new_task.go:
body := bodyFrom(os.Args)
err = ch.Publish(
"", //exchange
q.Name, //routing key
false, //mandatory
false,
amqp.Publishing{
DeliveryMode: amqp.Persistent,
ContentType: "text/plain",
Body: []byte(body),
}
)
failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s", body)
同樣的滤淳,receive.go文件也需要進(jìn)行修改,根據(jù)消息體中"."的個(gè)數(shù)來模擬任務(wù)的耗時(shí)長度砌左。該文件的任務(wù)還是從隊(duì)列中取出一個(gè)任務(wù)并執(zhí)行娇钱,我們姑且稱之為work.go:
msgs, err := ch.Consume(
q.Name, //queue
"", //consumer
true, //auto-ack
false, //exclusive
false, //no-local
false, //no-wait
nil, //args
)
failOnError(err, "Failed to register a consumer")
forever := make(chan bool)
go func(){
for d:= range msgs{
log.Printf("Received a message: %s", d.Body)
dot_count := bytes.Count(d.Body, []byte("."))
t := time.Duration(dot_count)
time.Sleep(t* time.Second)
log.Printf("Done")
}
}()
log.Printf(" [*] Waiting for message. To exit press CTRL+C")
<-forever
好了伤柄,到此為止,我們先來看看上述的模擬是否成功:
先運(yùn)行worker.go
#shell 1
go run worker.go
然后另起一個(gè)終端運(yùn)行new_task.go
#shell 2
go run new_task.go First message.
go run new_task.go Second message..
go run new_task.go Third message...
go run new_task.go Fourth message....
go run new_task.go Fifth message.....
接收端的信息:
接收端打印Done輸出分別有1秒文搂,2秒适刀,3秒,4秒煤蹭,5秒的延遲笔喉。
(2)輪詢調(diào)度(Round-robin dispatching)
工作隊(duì)列的優(yōu)勢是能輕松處理多個(gè)積壓的任務(wù),如果有一個(gè)已經(jīng)堆滿的任務(wù)隊(duì)列待處理硝皂,只需添加多個(gè)消費(fèi)者常挚,這些消費(fèi)者便都能對隊(duì)列進(jìn)行消耗。
1稽物,首先奄毡,想象一下如果同時(shí)運(yùn)行兩個(gè)worker.go腳本,當(dāng)生產(chǎn)者不斷發(fā)送消息到隊(duì)列時(shí)贝或,會出現(xiàn)什么情況吼过?
我們需要三個(gè)終端來運(yùn)行這個(gè)小例子,兩個(gè)運(yùn)行worker.go咪奖,我們將他們看成兩個(gè)消費(fèi)者C1和C2.
#shell 1
go run worker.go
#shell 2
go run worker.go
然后在第三個(gè)終端中盗忱,發(fā)送消息到隊(duì)列,你可以嘗試多次羊赵,如下:
#shell 3
go run new_task.go First message..
go run new_task.go Second message....
go run new_task.go Third message......
go run new_task.go Fourth message........
go run new_task.go Fifth message..........
...
這種消息分發(fā)方式叫做round-robin(輪詢調(diào)度).
shell1接收端:(3)消息確認(rèn)
當(dāng)處理一個(gè)長耗時(shí)任務(wù)時(shí)趟佃,任務(wù)處理程序(消費(fèi)者)可能由于某種原因以外崩潰,那么此時(shí)會發(fā)生什么事情呢昧捷?在我們目前的代碼中闲昭,一旦RabbitMQ將消息發(fā)送到消費(fèi)者時(shí)就會將其標(biāo)記并刪除,而不會去關(guān)心消費(fèi)者程序是否執(zhí)行完畢靡挥。因此在這種情形下序矩,如果你關(guān)閉了一個(gè)正在處理某項(xiàng)任務(wù)的消費(fèi)者時(shí),會導(dǎo)致其正在處理的及已分發(fā)給它卻還沒來得及處理的任務(wù)丟失芹血。
然而在很多真實(shí)情況下贮泞,我們并不希望丟失掉任何一條消息楞慈,如訂單信息幔烛、支付信息等。當(dāng)某一消費(fèi)者突然崩潰后囊蓝,我們希望將其未處理完畢的消息轉(zhuǎn)發(fā)到其他消費(fèi)者進(jìn)行處理饿悬,這種思想有如我們常見的主備設(shè)置策略。
RabbitMQ提供消息確認(rèn)機(jī)制來確保每一個(gè)消息都不會丟失聚霜,其原理是當(dāng)RabbitMQ接收到一個(gè)從消費(fèi)者發(fā)出的表明任務(wù)已處理完畢的確認(rèn)包(ack)后狡恬,才其從隊(duì)列中釋放刪除珠叔。
如果某一個(gè)消費(fèi)者突然崩潰(如通道關(guān)閉、連接關(guān)閉或TCP連接丟失)而沒有發(fā)出確認(rèn)包弟劲,RabbitMQ將會認(rèn)為該消息并沒有被完全處理祷安,因此會重新將其加入到隊(duì)列中。如果在此時(shí)還有其他消費(fèi)者在線兔乞,那么當(dāng)前消息也會很快被分發(fā)處理掉汇鞭,這樣即使在某些消費(fèi)者意外掉線關(guān)閉的情況下,我們也能確保所有消息會被丟失庸追。
消息確認(rèn)沒有超時(shí)機(jī)制霍骄,RabbitMQ只會在消費(fèi)者Down掉之后才進(jìn)行重新分發(fā),因此即使對于某些耗時(shí)很長的任務(wù)也不會有影響淡溯。
在這個(gè)Demo里面读整,我們將Consume()函數(shù)的auto-ack參數(shù)設(shè)為false,然后當(dāng)任務(wù)處理完畢之后通過d.Ack(false)手動發(fā)送一個(gè)確認(rèn)消息.
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
forever := make(chan bool)
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
dot_count := bytes.Count(d.Body, []byte("."))
t := time.Duration(dot_count)
time.Sleep(t * time.Second)
log.Printf("Done")
d.Ack(false)
}
}()
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever
這樣,即使我們通過Ctrl+C來關(guān)閉某一個(gè)正在處理消息的消費(fèi)者咱娶,其消息也不會丟失米间,RabbitMQ馬上就會將當(dāng)前未確認(rèn)的消息轉(zhuǎn)發(fā)的其他消費(fèi)者處理。
需要注意的是消息確認(rèn)包的目的地必須是當(dāng)前消息的接收通道豺总,如果將確認(rèn)包發(fā)送到其他通道時(shí)會引發(fā)異常车伞。更多的信息科參考doc guide on confirmations.
Forgotten acknowledgment
忘記對消息進(jìn)行確認(rèn)是一個(gè)比較常見的錯(cuò)誤,這個(gè)錯(cuò)誤很容易犯喻喳,但是后果很嚴(yán)重另玖。
當(dāng)消費(fèi)者退出后消息會重發(fā),卻永遠(yuǎn)沒有確認(rèn)刪除的包表伦,因此RabbitMQ消息越積越多就會吃掉越來越多的內(nèi)存谦去,最后可能導(dǎo)致崩潰。
對于這種未確認(rèn)的消息調(diào)試蹦哼,我們可以使用rabbitmqcrl命令來打印message_unacknowledged的內(nèi)容:
# rabbitmqctl list_queues
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name messages
task_queue 5
hello 0
(4)消息持久化
前面我們講解了當(dāng)消費(fèi)者程序Down掉如何保證消息不丟失鳄哭。可如果是RabbitMQ崩潰呢纲熏?消息還能保證不丟失嗎妆丘?
當(dāng)RabbitMQ退出或崩潰時(shí),除非你明確地指定局劲,否則所有的隊(duì)列和消息都會丟失勺拣。要做到消息不丟失需滿足兩個(gè)條件:隊(duì)列和消息的持久化。
首先鱼填,要保證隊(duì)列不會丟失药有,可將隊(duì)列聲明為持久化:
q, err := ch.QueueDeclare(
"hello", //name
true, //durable
false, //delete when unused
false, //exclusive
false, //no-wait
nil, //arguments
)
failOnError(err, "Failed to declare a queue")
上述代碼看起來沒有問題,但到目前為止,如果直接就這樣運(yùn)行愤惰,那么隊(duì)列還是無法持久化而導(dǎo)致丟失苇经。這是因?yàn)槲覀冎耙呀?jīng)定義了一個(gè)名為“Hello”的隊(duì)列,RabbitMQ不允許創(chuàng)建多個(gè)名稱相同而參數(shù)不同的隊(duì)列宦言,這個(gè)跟函數(shù)重載有區(qū)別残制,但這種情況發(fā)生時(shí)篓足,RabbitMQ會返回錯(cuò)誤。既然如此,直接換個(gè)名字:task_queue掏父,
q, err := ch.QueueDeclare(
"task_queue", //name
true, //durable
false, //delete when unused
false, //exclusive
false, //no-wait
nil, //arguments
)
failOnError(err, "Failed to declare a queue")
注意:durable參數(shù)在生產(chǎn)者和消費(fèi)者程序中都要指定為True鸯匹。
現(xiàn)在渺氧,task_queue隊(duì)列即使在RabbitMQ重啟之后也不會丟失了洞渔。接著就需要實(shí)現(xiàn)對消息的持久化,這個(gè)也很簡單稽寒,只需要在amqp.Publishing函數(shù)中設(shè)置一下amqp.Persistent參數(shù)即可:
err = ch.Publishing(
"", //exchange
q.Name, // routing key
false, // mandatory
false,
amqp.Publishing{
DeliveryMode: amqp.Persistent,
ContentType: "text/plain",
Body: []byte(body),
}
)
關(guān)于消息持久化
將消息設(shè)置為Persistent并不能百分百地完全保證消息不會丟失扮碧。雖然RabbitMQ知道要將消息寫到磁盤,但在RabbitMQ接收到消息和寫入磁盤前還是有個(gè)時(shí)間空檔杏糙。
因?yàn)镽abbitMQ并不會對每一個(gè)消息都執(zhí)行fsync(2),因此消息可能只是寫入緩存而不是磁盤慎王。
所以Persistent選項(xiàng)并不是完全強(qiáng)一致性的,但在應(yīng)付我們的簡單場景已經(jīng)足夠宏侍。如需對消息完全持久化赖淤,可參考publisher confirms.
(5)公平分發(fā)
有時(shí)候隊(duì)列的輪詢調(diào)度并不能滿足我們的需求,假設(shè)有這么一個(gè)場景谅河,存在兩個(gè)消費(fèi)者程序咱旱,所有的單數(shù)序列消息都是長耗時(shí)任務(wù)而雙數(shù)序列消息則都是簡單任務(wù),那么結(jié)果將是一個(gè)消費(fèi)者一直處于繁忙狀態(tài)而另外一個(gè)則幾乎沒有任務(wù)被掛起绷耍。當(dāng)RabbitMQ對此情況卻是視而不見吐限,仍然根據(jù)輪詢來分發(fā)消息。
導(dǎo)致這種情況發(fā)生的根本原因是RabbitMQ是根據(jù)消息的入隊(duì)順序進(jìn)行派發(fā)褂始,而并不關(guān)心在線消費(fèi)者還有多少未確認(rèn)的消息诸典,它只是簡單的將第N條消息分發(fā)到第N個(gè)消費(fèi)者:
為了避免這種情況,我們可以給隊(duì)列設(shè)置預(yù)取數(shù)(prefect count)為1崎苗。它告訴RabbitMQ不要一次性分發(fā)超過1個(gè)的消息給某一個(gè)消費(fèi)者狐粱,換句話說,就是當(dāng)分發(fā)給該消費(fèi)者的前一個(gè)消息還沒有收到ack確認(rèn)時(shí)胆数,RabbitMQ將不會再給它派發(fā)消息肌蜻,而是尋找下一個(gè)空閑的消費(fèi)者目標(biāo)進(jìn)行分發(fā)。
err = ch.Qos(
1, // prefetch count
0, // prefetch size
false, // global
)
failOnError(err, "Failed to set Qos")
關(guān)于隊(duì)列長度
NOTE:如果所有的消費(fèi)者都繁忙幅慌,隊(duì)列可能會被消息填滿宋欺。你需要注意這種情況轰豆,要么通過增加消費(fèi)者來處理胰伍,要么改用其他的策略齿诞。
(6)整合上面的代碼
我們將上面的片段整合起來,那么new_task.go:
package main
import (
"fmt"
"log"
"os"
"strings"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
panic(fmt.Sprintf("%s: %s", msg, err))
}
}
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
q, err := ch.QueueDeclare(
"task_queue", // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
body := bodyFrom(os.Args)
err = ch.Publish(
"", // exchange
q.Name, // routing key
false, // mandatory
false,
amqp.Publishing{
DeliveryMode: amqp.Persistent,
ContentType: "text/plain",
Body: []byte(body),
})
failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s", body)
}
func bodyFrom(args []string) string {
var s string
if (len(args) < 2) || os.Args[1] == "" {
s = "hello"
} else {
s = strings.Join(args[1:], " ")
}
return s
}
Github地址 new_task.go.
worker.go文件如下:
package main
import (
"bytes"
"fmt"
"github.com/streadway/amqp"
"log"
"time"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
panic(fmt.Sprintf("%s: %s", msg, err))
}
}
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
q, err := ch.QueueDeclare(
"task_queue", // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
err = ch.Qos(
1, // prefetch count
0, // prefetch size
false, // global
)
failOnError(err, "Failed to set QoS")
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
forever := make(chan bool)
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
dot_count := bytes.Count(d.Body, []byte("."))
t := time.Duration(dot_count)
time.Sleep(t * time.Second)
log.Printf("Done")
d.Ack(false)
}
}()
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever
}
Github地址:worker.go.
最后骂租,為了驗(yàn)證上面輪詢調(diào)度祷杈、消息持久化和公平分發(fā)的特性,你可以多開幾個(gè)Shell窗口渗饮,發(fā)幾條長耗時(shí)的消息但汞,然后停掉某一些worker或重啟RabbitMQ就能觀察到與之相符的現(xiàn)象。
(7)總結(jié)
本篇介紹了通過消息確認(rèn)機(jī)制和設(shè)置預(yù)取消息長度的方式來實(shí)現(xiàn)一個(gè)工作隊(duì)列互站,而持久化選項(xiàng)的設(shè)置可以保證隊(duì)列和消息在出現(xiàn)消費(fèi)者崩潰或RabbitMQ重啟的異常情況下都不會丟失私蕾。
有關(guān)于更多amqp.Channel方法和消息屬性的內(nèi)容,可以參考amqp API reference.
2.3.3 發(fā)布/訂閱模式
在本篇中胡桃,我們嘗試將同一個(gè)消息發(fā)送給多個(gè)消費(fèi)者進(jìn)行處理踩叭,這就是廣為人知的發(fā)布/訂閱模式。
本篇通過搭建一個(gè)日志系統(tǒng)來闡述發(fā)布/訂閱模式翠胰,它包含兩部分內(nèi)容:一個(gè)用于產(chǎn)生日志消息的程序容贝,另一個(gè)用于接收和打印消息。
在這個(gè)日志系統(tǒng)中之景,每一份接收者程序的拷貝都能收到消息斤富,因此我們可以輕易地使用一個(gè)程序?qū)⑷罩緦懭氪疟P,而另一個(gè)程序直接在屏幕顯示锻狗。
本質(zhì)上來說满力,當(dāng)系統(tǒng)收到一個(gè)日志處理請求時(shí),會把這個(gè)消息廣播給所有的接收者轻纪。
(1)Exchanges
之前的介紹中脚囊,我們都是以隊(duì)列為中介進(jìn)行消息的發(fā)送和接收,現(xiàn)在將完整的介紹一下RabbitMQ的消息模式桐磁。
對前述內(nèi)容做一個(gè)簡單總結(jié):
- 一個(gè)producter(生產(chǎn)者)是指用于發(fā)送消息的用戶程序悔耘;
- 一個(gè)queue(隊(duì)列)是用來存儲消息的緩沖區(qū);
- 一個(gè)consumer(消費(fèi)者)是用來接收消息的用戶程序我擂;
RabbitMQ消息模式的核心內(nèi)容是一個(gè)producter永遠(yuǎn)不會將消息直接發(fā)送給隊(duì)列衬以。
Producter甚至都不知道其產(chǎn)生的消息會被分發(fā)到哪一個(gè)隊(duì)列,實(shí)際上校摩,producter只會將消息發(fā)送給exchange.Exchange很好理解看峻,類似于一個(gè)中轉(zhuǎn)站,它就是將從producter中接收到的消息轉(zhuǎn)發(fā)給與之綁定的隊(duì)列衙吩。
當(dāng)Exchange接收到消息后互妓,它是如何來確定對消息進(jìn)行處理的呢?是將消息發(fā)送到指定的一個(gè)隊(duì)列,還是廣播到所有隊(duì)列冯勉,或者是直接將其忽略澈蚌?
這一切都由Exchange定義是的類型(type)來控制。
RabbitMQ共有四種exchange類型:direct, topic, headers, fanout.我們這里使用的最后一種fanout灼狰,現(xiàn)在就讓我們來定義一個(gè)type宛瞄,取名logs:
err = ch.ExchangeDeclare(
"logs", //name
"fanout", //type
"true", //durable
false, //auto-deleted
false, //internal
false, //no-wait
nil, //arguments
)
這個(gè)fanout類型的exchange很簡單,顧名思義:它將所接收到的消息廣播給所有綁定的隊(duì)列交胚。這也正是日志系統(tǒng)說要做的工作份汗。
查看exchange
可以通過rabbitmqctl命令來查看所有的exchanges:sodu rabbitmqctl list_exchanges
命令執(zhí)行后,會列出一些amq.*名稱的exchanges,這是系統(tǒng)默認(rèn)存在的蝴簇,我們現(xiàn)在還用不到這些杯活。
默認(rèn)的exchange
你可能會感到奇怪,前面例子并沒有提及生產(chǎn)者只能將消息發(fā)送給exchange熬词,為什么程序仍能將消息發(fā)送給隊(duì)列轩猩?原因在于我們使用了一個(gè)默認(rèn)的exchange,代碼中就是Publish函數(shù)的參數(shù)使用了空字符串"":
來看看之前的publish代碼:
> err = ch.Publish(
> "", //exchange
> q.Name, //routing key
> false, //mandatory
> false, //immediate
> amqp.Publishing(
> ContentType: "text/plain",
> Body: []byte(body),
> )
> )
使用一個(gè)無命名的或默認(rèn)的exchange荡澎,消息將會根據(jù)routing_key所指定的參數(shù)進(jìn)行查找均践,如果存在就會分發(fā)到相應(yīng)的隊(duì)列。
既然講到的exchange,那么我們可以使用前面定義的exchange來代替默認(rèn)值:
err = ch.ExchangeDeclare(
"logs", //name
"fanout", //type
true, //durable
false, //auto-deleted
false, //internal
false, //no-wait
nil, //arguments
)
failOnError(err, "Failed to declare an exchange")
body:= bodyForm(os.Args)
err = ch.Publish(
"logs", //exchange
"", //routing key
false, //mandatory
false, //immediate
amqp.Publishing(
ContentType: "text/plain",
Body: []byte(body),
)
)
(2)臨時(shí)隊(duì)列
不出意外摩幔,你應(yīng)該還對前面使用過的兩個(gè)命名隊(duì)列(hello和task_queue)有印象,在使用命名隊(duì)列時(shí)必須讓生產(chǎn)者和消費(fèi)者都是用同一個(gè)名稱的隊(duì)列彤委,否則消息將無法在兩者之間進(jìn)行傳遞。
但在這里名字不是關(guān)心的重點(diǎn)或衡,因?yàn)槲覀兊娜罩鞠到y(tǒng)需要記錄所有的消息焦影,而不是其中的一部分。我們比較關(guān)心的是消費(fèi)者程序接收和處理的消息都應(yīng)該是未處理過的封断。
為了確保這一點(diǎn)斯辰,我們需要兩個(gè)條件:
首先,無論何時(shí)當(dāng)消費(fèi)者連接到Rabbit時(shí)我們需要一個(gè)新的坡疼、空的隊(duì)列彬呻,因此就不會存在之前的消息。我們可以通過創(chuàng)建一個(gè)隨機(jī)名字的隊(duì)列來實(shí)現(xiàn)柄瑰,而更好的方法是:讓服務(wù)器自己選擇一個(gè)隨機(jī)隊(duì)列給我們闸氮。
再者,當(dāng)我們的消費(fèi)者程序斷開連接時(shí)教沾,這個(gè)隊(duì)列要能自動的刪除蒲跨。
在amqp客戶端中,當(dāng)我們將空字符串指定為隊(duì)列名字時(shí)授翻,將會創(chuàng)建一個(gè)非持久化的或悲、帶有隨機(jī)命名的隊(duì)列:
q, err := ch.QueueDeclare(
"", //name
false, //durable
false, //delete when unused
true, //exclusive
false, //no-wait
nil, //arguments
)
當(dāng)這個(gè)函數(shù)返回時(shí)孙咪,RabbitMQ將創(chuàng)建一個(gè)帶有隨機(jī)名字的隊(duì)列,如amq.gen-JzTY20BRgKO-HjmUJj0wLg.
當(dāng)這個(gè)連接被關(guān)閉時(shí)巡语,隊(duì)列將會被刪除翎蹈,因?yàn)槠浔欢x為獨(dú)有的(exclusive)。
(3)綁定
到現(xiàn)在捌臊,我們已經(jīng)創(chuàng)建了一個(gè)fanout類型的exchange和一個(gè)隊(duì)列,然而exchange并不知道它要哪個(gè)隊(duì)列是應(yīng)該被分發(fā)消息的兜材。因此理澎,我們需要明確的指定exchange和隊(duì)列隊(duì)列之間的關(guān)系,這個(gè)操作稱之為綁定曙寡。
err = ch.QueueBind(
q.Name, //queue name
"", //routing key
"logs", //exchange
false,
nil
)
現(xiàn)在糠爬,logs exchange中的消息就會被分發(fā)到我們的隊(duì)列。
查看綁定列表
仍然可以通過命令來查看:
rabbitmqctl list_bindings
(4)完整的例子
生產(chǎn)者程序看起來跟之前例子區(qū)別不大举庶,最重要的不同就是這里將消息發(fā)送給名為logs的exchange执隧,而不是直接發(fā)送到默認(rèn)隊(duì)列。在發(fā)送消息時(shí)需要提供一個(gè)routingKey户侥,但是在fanout類型的exchange中這個(gè)值是被忽略的镀琉。
那么,emit_log.go的腳本就是:
package main
import (
"fmt"
"log"
"os"
"strings"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
panic(fmt.Sprintf("%s: %s", msg, err))
}
}
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
err = ch.ExchangeDeclare(
"logs", // name
"fanout", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare an exchange")
body := bodyFrom(os.Args)
err = ch.Publish(
"logs", // exchange
"", // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
})
failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s", body)
}
func bodyFrom(args []string) string {
var s string
if (len(args) < 2) || os.Args[1] == "" {
s = "hello"
} else {
s = strings.Join(args[1:], " ")
}
return s
}
Github地址:emit_log.go
需要注意蕊唐,我們必須在建立了連接之后才能定義exchange屋摔,否則會報(bào)錯(cuò)。
如果exchange沒有綁定任何一個(gè)隊(duì)列替梨,那么消息將會丟失而沒有得到處理钓试,但在這個(gè)例子里,這種情況是允許的副瀑,如果沒有任何一個(gè)隊(duì)列來消費(fèi)這些消息弓熏,那么就直接忽略掉就好。
下面是receive_logs.go的代碼:
package main
import (
"fmt"
"log"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
panic(fmt.Sprintf("%s: %s", msg, err))
}
}
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
err = ch.ExchangeDeclare(
"logs", // name
"fanout", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare an exchange")
q, err := ch.QueueDeclare(
"", // name
false, // durable
false, // delete when usused
true, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
err = ch.QueueBind(
q.Name, // queue name
"", // routing key
"logs", // exchange
false,
nil)
failOnError(err, "Failed to bind a queue")
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
forever := make(chan bool)
go func() {
for d := range msgs {
log.Printf(" [x] %s", d.Body)
}
}()
log.Printf(" [*] Waiting for logs. To exit press CTRL+C")
<-forever
}
Github地址:receive_logs.go
如果你想將日志消息保存到文件糠睡,只需在命令終端中執(zhí)行下面的命令:
go run receive_logs.go > logs_from_rabbit.log
如果想直接打印到屏幕上挽鞠,在另一個(gè)終端中執(zhí)行:
go run receive_logs.go
當(dāng)然,發(fā)送消息的命令如下:
go run emit_log.go
使用rabbitmqctl list_bindings命令可以查看上面代碼所創(chuàng)建的綁定關(guān)系狈孔,如當(dāng)運(yùn)行兩個(gè)receive_logs.go之后滞谢,可能會得到如下的結(jié)果:
sudo rabbitmqctl list_bindings
# => Listing bindings ...
# => logs exchange amq.gen-JzTY20BRgKO-HjmUJj0wLg queue []
# => logs exchange amq.gen-vso0PVvyiRIL2WoV3i48Yg queue []
# => ...done.
其結(jié)果是很容易理解的:從logs exchange中的消息會被轉(zhuǎn)發(fā)到兩個(gè)由系統(tǒng)命名的隊(duì)列中。這也正是我們所期望的除抛。
下一節(jié)將會介紹如何對消息進(jìn)行篩選狮杨,不監(jiān)聽所有消息,而是監(jiān)聽其中的一個(gè)子集到忽。
2.3.4 路由
在本篇中橄教,將介紹如何對消息進(jìn)行過濾清寇,從而只處理我們感興趣的消息。如只把一些嚴(yán)重的錯(cuò)誤信息寫入磁盤护蝶,但對所有類型的消息都打印到屏幕华烟。
(1)Binding
前面例子中我們使用了如下綁定:
err = ch.QueueBind(
q.Name, //queue name
"", //routing key
"logs", //exchange
false,
nil)
綁定是交換器(exchange)和隊(duì)列之間的關(guān)系,我們可以簡單的理解為:隊(duì)列對其綁定的交換器的消息感興趣持灰。
綁定函數(shù)Bind()可以指定routing_key參數(shù)盔夜,為了避免跟之前的Channel.Publish的參數(shù)混淆,我們稱之為binding_key堤魁。下面就是使用了綁定參數(shù)的例子:
err = ch.QueueBind(
q.Name, //queue name
"black", //routing key
"logs", //exchange
false,
nil)
綁定參數(shù)的作用取決于exchange的類型喂链,前面例子中使用的fanout類型的exchange是會忽略掉這個(gè)值的。
(2)Direct exchange
前面文章中的日志系統(tǒng)會將所有消息廣播分發(fā)到所有的消費(fèi)者處理程序⊥兹現(xiàn)在我們想要擴(kuò)展成為根據(jù)消息的嚴(yán)重程度來過濾分發(fā)椭微,如只將嚴(yán)重的error級別的日志寫入磁盤,而不寫入info和warn類型的日志消息以節(jié)省磁盤空間盲链。
前面使用的fanout exchange蝇率,只是對消息一味的廣播轉(zhuǎn)發(fā),可擴(kuò)展性差刽沾,無法滿足我們的需求本慕。所以,我們使用Direct exchange進(jìn)行替代侧漓。Direct exchange的路由算法很簡單:就是將exchange的binding_key和消息的routing_key進(jìn)行比較间狂,如果完全匹配這說明是需要分發(fā)的隊(duì)列。
如下圖配置:
在圖中火架,direct exchange X有兩個(gè)隊(duì)列與之綁定鉴象。隊(duì)列Q1的binding_key是orange, 而隊(duì)列Q2有兩個(gè)binding_key: black,green.
由此,當(dāng)發(fā)送routing_key為orange的消息時(shí)會被路由到Q1何鸡,而帶有black或green的routing_key的消息則會被分發(fā)到Q2纺弊,其他類型的消息都會被忽略。
(3)多重綁定
Direct exchange會將消息廣播至所有匹配的綁定隊(duì)列骡男,因此很容易實(shí)現(xiàn)對同一個(gè)binding_key需要分發(fā)到多個(gè)隊(duì)列的情況淆游。如圖,帶有routing_key的消息會被分發(fā)到Q1和Q2兩個(gè)隊(duì)列隔盛。
(4)發(fā)送日志
考慮下如何實(shí)現(xiàn)犹菱?首先需要使用direct類型的exchange替換掉fanout類型,然后在發(fā)送消息是用routing_key來表示日志級別吮炕,而接收消息的腳本需要指定接收哪些級別的消息腊脱。先來看看如何發(fā)送日志:
首先創(chuàng)建exchange:
err = ch.ExchangeDeclare(
"logs_direct", //name
"direct", //type
true, //durable
false, //auto-deleted
false, //internal
false, //no-wait
nil, //arguments
)
然后準(zhǔn)備發(fā)送消息:
err = ch.ExchangeDeclare(
"logs_direct", //name
"direct", //type
true, //durable
false, //auto-deleted
false, //internal
false, //no-wait
nil, //arguments
)
failOnError(err, "Failed declare an exchange")
body := bodyFrom(os.Args)
err = ch.Publish(
"logs_direct", //exchange
severityFrom(os.Args), //routing key
false, //mandatory
false, //immediate
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
}
)
為了簡單,我們假設(shè)消息的級別為"info", "warning", "error"三者中的一個(gè).
(5)訂閱
接收消息的程序跟之前的大體相同龙亲,只是需要為每一種級別的日志消息新建一個(gè)綁定:
q, err := ch.QueueDeclare(
"", //name
false, //durable
false, //delete when unused
true, //exclusive
false, //no-wait
nil, //arguemnts
)
failOnError(err, "Failed to declare queue.")
if len(os.Args) < 2 {
log.Printf("Usage: %s [info] [warnint] [error]", os.Args[0])
os.Exit(0)
}
for _, s := range os.Args[1:] {
log.Printf("Binding queue %s to exchange %s with routing key %s",
q.Name, "logs_direct", s)
err = ch.QueueBind(
q.Name, // queue name
s, // routing key
"logs_direct", //exchange
false,
nil)
failOnError(err, "Failed to bind a queue")
}
(6)整個(gè)文件
emit_log_direct.go文件:
package main
import (
"fmt"
"log"
"os"
"strings"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string){
if err != nil {
log.Fatalf("%s: %s", msg, err)
panic(fmt.Sprintf("%s: %s", msg, err))
}
}
func main(){
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open an channel")
defer ch.Close()
err = ch.ExchangeDeclare(
"logs_direct", //name
"direct", //type
true,
false,
false,
false,
nil,
)
failOnError(err, "Failed to declare an exchange")
body := bodyFrom(os.Args)
err = ch.Publish(
"logs_direct", // exchange
severityFrom(os.Args), //routing key
false,
false,
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
})
failOnError(err, "Failed to publish a message")
log.Printf(" [x] sent %s", body)
}
func bodyFrom(args []string) string{
var s string
if(len(args) < 3) || os.Args[2] == "" {
s = "hello"
}else{
s = strings.Join(args[2:], " ")
}
return s
}
func severityFrom(args []string) string {
var s string
if len(args) < 2 || args[1] == "" {
s = "info"
}else {
s = os.Args[1]
}
return s
}
GitHub地址:emit_log.direct.go
receive_logs_direct.go文件:
package main
import(
"fmt"
"log"
"os"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string){
if err != nil {
log.Fatalf("%s: %s", msg, err)
panic(fmt.Sprintf("%s: %s", msg, err))
}
}
func main(){
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
err = ch.ExchangeDeclare(
"logs_direct",
"direct",
true,
false,
false,
false,
nil,
)
failOnError(err, "Failed to declare an exchange")
q, err := ch.QueueDeclare(
"", //name
false,
false,
true,
false,
nil,
)
failOnError(err, "Failed to declare a queue")
if len(os.Args) < 2 {
log.Printf("Usage: %s [info] [warning] [error]")
os.Exit(0)
}
for _, s := range os.Args[1:] {
log.Printf("Binding queue %s to exchange %s with routing key %s",
q.Name, "logs_direct", s)
err = ch.QueueBind(
q.Name, //queue name
s, //routing key
"logs_direct", //exchange
false,
nil,
)
failOnError(err, "Failed to bind a queue")
}
msgs, err := ch.Consume(
q.Name, //name
"", //consumer
true,
false,
false,
false,
nil,
)
failOnError(err, "Failed to register a consumer")
forever := make(chan bool)
go func(){
for d:= range msgs{
log.Printf(" [x] %s", d.Body)
}
}()
log.Printf(" [*] Waiting for logs. To Exit press Ctrl+c")
<-forever
}
GitHub地址:receive_logs.direct.go
(7)運(yùn)行結(jié)果
將"warning"和"error"級別的消息都寫入磁盤陕凹,只需運(yùn)行:
go run receive_logs_direct.go warning error>logs_from_rabbit.log
將所有消息都打印到屏幕:
go run receive_logs_direct.go warning error info
而發(fā)送消息:
go run emit_log_direct.go error "this is a log message"
2.3.5 主題交換器
如果想讓系統(tǒng)不能僅根據(jù)日志級別來定義悍抑,還能根據(jù)發(fā)送日志的源信息來訂閱。如unix工具syslog杜耙,就是根據(jù)日志級別(info/warn/crit..)和設(shè)備(auth/cron/kern)來進(jìn)行路由的搜骡。這會提供更多的靈活性,如可以做到監(jiān)聽所有來自'cron'和'kern'設(shè)備的error信息佑女。
為了實(shí)現(xiàn)這種靈活性记靡,需要來學(xué)習(xí)下另外一個(gè)功能更綜合的topic類型的交換器(exchange).
(1)Topic exchange
Topic類型的exchange消息的routing key是有一定限制的,必須是一組使用“.”分開的單詞团驱。單詞可以是任意的摸吠,但是一般來說以能準(zhǔn)確的表達(dá)功能的為佳。如以下的例子都是合法的:"stock.usd.nyse", "nyse.vmw","quick.orange.rabbit".Routing key可以是任意多個(gè)單詞組成店茶,但其總長度不能超過255個(gè)字節(jié)蜕便。
Topic exchange的binding key跟之前的沒有太大區(qū)別劫恒,其邏輯跟direct一樣贩幻,其接收到的消息會分發(fā)到所有與其routing key相匹配的綁定隊(duì)列。以下兩個(gè)通配符也可作為binding key使用:
* 表示一個(gè)單詞
# 表示0或多個(gè)單詞
看例子:
上圖中两嘴,消息的routing key用三個(gè)單詞來表示丛楚,依次表示速度、顏色憔辫、種類趣些,其形式如:"<speed>.<color>.<species>”。
圖中有三個(gè)綁定:Q1的綁定鍵是".orange.", Q2的綁定鍵是"..rabbit"和"lazy.#".
這三個(gè)綁定規(guī)則可以簡單概括為:
Q1對orange顏色的動物感興趣贰您;
Q2則對所有物種是rabbit的坏平、速度是lazy的所有動物感興趣;
舉例來說明帶有不同routing key的消息會被路由到哪個(gè)隊(duì)列:
"quick.orange.rabbit": 分發(fā)到Q1和Q2锦亦;
"lazy.orange.elephant": 分發(fā)到Q1和Q2;
"quick.orange.fox": 分發(fā)到Q1舶替;
"lazy.brown.fox": 分發(fā)到Q2;
"lazy.pink.rabbit": 分發(fā)到Q2,且只會分發(fā)一次杠园,雖然它匹配了Q2的兩個(gè)綁定顾瞪;
"quick.brown.fox": Q1、Q2都不分發(fā)抛蚁,因?yàn)闆]有與之匹配的綁定規(guī)則陈醒,將會被忽略;
"orange": 無匹配規(guī)則瞧甩,丟失
"quick.orange.male.rabbit": 無匹配規(guī)則钉跷,丟失
"lazy.orange.male.rabbit": 分發(fā)到Q2,雖然有4個(gè)單詞肚逸,但符合"lazy.#"規(guī)則的通配符
Topic exchange
Topic exchange很靈活尘应,也很容易用此實(shí)現(xiàn)其他類型的功能.
如果將"#"指定為綁定鍵惶凝,那么就會接收所有的消息,相當(dāng)于fanout類型的廣播犬钢;
如果通配符"*","#"均不作為綁定鍵使用苍鲜,那么其功能實(shí)現(xiàn)就等同于direct類型;
(2)整個(gè)文件
在前一篇的基礎(chǔ)上實(shí)現(xiàn)topic exchange玷犹,只需做少許改的即可,這里我們假設(shè)routing key由兩個(gè)單詞組成混滔,類似于"<facility>.<severity>".
emit_log_topic.go:
package main
import (
"fmt"
"log"
"os"
"strings"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string){
if err != nil {
log.Fatalf("%s: %s", msg, err)
panic(fmt.Sprintf("%s: %s", msg, err))
}
}
func main(){
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open an channel")
defer ch.Close()
err = ch.ExchangeDeclare(
"logs_topic", //name
"topic", //type
true,
false,
false,
false,
nil,
)
failOnError(err, "Failed to declare an exchange")
body := bodyFrom(os.Args)
err = ch.Publish(
"logs_topic", // exchange
severityFrom(os.Args), //routing key
false,
false,
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
})
failOnError(err, "Failed to publish a message")
log.Printf(" [x] sent %s", body)
}
func bodyFrom(args []string) string{
var s string
if(len(args) < 3) || os.Args[2] == "" {
s = "hello"
}else{
s = strings.Join(args[2:], " ")
}
return s
}
func severityFrom(args []string) string {
var s string
if len(args) < 2 || args[1] == "" {
s = "info"
}else {
s = os.Args[1]
}
return s
}
receive_logs_topic.go:
package main
import(
"fmt"
"log"
"os"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string){
if err != nil {
log.Fatalf("%s: %s", msg, err)
panic(fmt.Sprintf("%s: %s", msg, err))
}
}
func main(){
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
err = ch.ExchangeDeclare(
"logs_topic",
"topic",
true,
false,
false,
false,
nil,
)
failOnError(err, "Failed to declare an exchange")
q, err := ch.QueueDeclare(
"", //name
false,
false,
true,
false,
nil,
)
failOnError(err, "Failed to declare a queue")
if len(os.Args) < 2 {
log.Printf("Usage: %s [info] [warning] [error]", os.Args[0])
os.Exit(0)
}
for _, s := range os.Args[1:] {
log.Printf("Binding queue %s to exchange %s with routing key %s",
q.Name, "logs_topic", s)
err = ch.QueueBind(
q.Name,
s,
"logs_topic",
false,
nil)
failOnError(err, "Failed to bind a queue")
}
msgs, err := ch.Consume(
q.Name,
"",
true,
false,
false,
false,
nil,
)
failOnError(err, "Failed to register a consumer")
forever := make(chan bool)
go func(){
for d:= range msgs {
log.Printf(" [x] %s", d.Body)
}
}()
log.Printf(" [*] Waiting for logs. To exit press CTRL+C")
<-forever
}
(3)運(yùn)行
接收所有消息:
go run receive_logs_topic.go "#"
接收來自"kern"設(shè)備的消息:
go run receive_logs_topic.go "kern.*"
接收所有以"critical"結(jié)尾的消息:
go run receive_logs_topic.go "*.critical"
創(chuàng)建多重綁定:
go run receive_logs_topic.go "kern.*" "*.critical"
發(fā)送消息:
go run emit_log_topic.go "kern.critical" "A critical kernal error"
上述假設(shè)都是基于兩個(gè)單詞的,你也可以試一下設(shè)置其他長度單詞的routing key看看會發(fā)生什么歹颓。
2.3.6 遠(yuǎn)程過程調(diào)用(RPC)
如果需要在一個(gè)遠(yuǎn)程機(jī)器上執(zhí)行一個(gè)函數(shù)然后等待它的返回結(jié)果應(yīng)該怎樣坯屿?這個(gè)過程稱之為遠(yuǎn)程過程調(diào)用(RPC:Remote Procedure Call).
本篇將介紹如何利用RabbitMQ實(shí)現(xiàn)一個(gè)包含客戶端和可擴(kuò)展服務(wù)端的RPC系統(tǒng),仍然跟之前的一樣巍扛,利用模擬計(jì)算來替代真實(shí)的耗時(shí)任務(wù)领跛,這里使用計(jì)算斐波那契數(shù)列函數(shù)。
(1)回調(diào)隊(duì)列(Callback queue)
RPC系統(tǒng)的模式是客戶端發(fā)送請求給服務(wù)器撤奸,服務(wù)器接收處理后回復(fù)一條響應(yīng)消息吠昭。RabbitMQ對此提供了很好的支持,通過在請求中指定callback的回調(diào)隊(duì)列地址來實(shí)現(xiàn)接收服務(wù)器的響應(yīng)消息:
q, err := ch.QueueDeclare(
"", //name
false, //durable
false, //delete when usused
true, //exclusive
false, //nowait
nil, //argments
)
err = ch.Publish(
"", //exchange
"rpc_queue" //routing key
false, //mandatory
false, //immediate
amqp.Publishing{
ContentType: "text/plain",
CorrelationId: corrId,
ReplyTo: q.Name,
Body: []byte(strconv.Itoa(n)),
}
)
Message properties
AMQP 0-9-1協(xié)議中共定義了14個(gè)消息屬性胧瓜,其中大部分是不常用的矢棚,常用的有以下幾個(gè):
- persistent: 標(biāo)記消息是持久化(true)或者臨時(shí)的(false),該屬性在第二篇文章中有介紹;
- content_type: 用來描述mime-type的編碼,如JSON類型:application/json;
- reply_to: 用于標(biāo)記回調(diào)隊(duì)列的名稱府喳;
- correlation_id: 用來表示request和response的關(guān)聯(lián)關(guān)系蒲肋;
(2)Correlation Id
RPC server對Client請求的響應(yīng)同樣需要通過消息隊(duì)列來傳遞,可以對每一次請求創(chuàng)建一個(gè)回調(diào)隊(duì)列钝满,但這種方式效率很低兜粘,更好的方式是:對于每一個(gè)客戶端只創(chuàng)建一個(gè)回調(diào)隊(duì)列铆农。
但這樣會帶來一個(gè)問題:回調(diào)隊(duì)列接收到一個(gè)response之后耽装,如何確定其對應(yīng)的request?這就需要 correlataion_id來標(biāo)識晾嘶∈炖簦客戶端在request中添加一個(gè)唯一的correlation_id距糖,在接收到服務(wù)器返回的response時(shí),根據(jù)該值來確定與之匹配的request并處理牵寺。如果未能找到與之匹配的correlation_id悍引,說明該response并不屬于當(dāng)前client的請求,為了安全起見帽氓,將其忽略即可趣斤。
我們可能會問:為什么在沒有找到與之匹配的correlation_id時(shí)是將其忽略而不是失敗報(bào)錯(cuò)?這是考慮到服務(wù)端的競爭條件:假設(shè)RPC server在發(fā)送response后宕機(jī)了黎休,而此時(shí)卻沒能對當(dāng)前request發(fā)出確認(rèn)消息(ack).如果這種情況出現(xiàn)浓领,該請求還在隊(duì)列中會被再次派發(fā)玉凯。因此當(dāng)前Request會在服務(wù)端處理兩次,也會給客戶端發(fā)送兩次Response联贩,故而漫仆,client要能處理重復(fù)的response,而server端對于Request需要實(shí)現(xiàn)冪等泪幌。
(3)總結(jié)
RPC的工作過程如下:
當(dāng)Client啟動時(shí)盲厌,會創(chuàng)建一個(gè)匿名的、獨(dú)有的回調(diào)隊(duì)列祸泪;
對每一個(gè)RPC Request吗浩,Client都會設(shè)置兩個(gè)參數(shù):用于標(biāo)識回調(diào)隊(duì)列的reply_to和用于唯一標(biāo)識的correlation_id;
Request被發(fā)送到rpc_queue隊(duì)列。
RPC服務(wù)器等待rpc_queue的消息没隘,一旦消息到達(dá)懂扼,處理任務(wù)后將響應(yīng)結(jié)果消息發(fā)送到reply_to指定的隊(duì)列;
Client等待callback隊(duì)列的消息右蒲,一旦消息到達(dá)阀湿,查找與correlation_id匹配的request,然后返回給應(yīng)用程序品嚣。
(4)代碼
首先看下斐波那契數(shù)列(Fibonacci)函數(shù):
斐波那契數(shù)列指的是這樣一個(gè)數(shù)列:
這個(gè)數(shù)列從第3項(xiàng)開始炕倘,每一項(xiàng)都等于前兩項(xiàng)之和钧大。
func fib(n int) int {
if n== 0 {
return 0
}else if n==1 {
return 1
}else {
return fib(n-1) + fib(n-2)
}
}
這里假設(shè)輸入都是正整數(shù)翰撑,且不指望它對大整數(shù)有效,因?yàn)檫@個(gè)方式可能是效率最差的了啊央。
rpc_server.go文件:
package main
import (
"fmt"
"log"
"strconv"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
panic(fmt.Sprintf("%s: %s", msg, err))
}
}
func fib(n int) int {
if n== 0 {
return 0
}else if n==1 {
return 1
}else {
return fib(n-1) + fib(n-2)
}
}
func main(){
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
q, err := ch.QueueDeclare(
"rpc_queue", //name
false, //durables
false, //delete when unused
false, //exclusive
false, //no wait
nil, //args
)
failOnError(err, "Failed to declare a queue")
err = ch.Qos(
1, // prefetch count
0, // prefetch size
false, //global
)
failOnError(err, "Failed to set Qos")
msgs, err := ch.Consume(
q.Name, //queue
"", //exchange
false, // auto-ack
false, //exclusive
false, //no-local
false, //no-wait
nil, //args
)
failOnError(err, "Failed to register a consumer")
forever := make(chan bool)
go func() {
for d := range msgs {
n, err := strconv.Atoi(string(d.Body))
failOnError(err, "Failed to convert body to an integer")
log.Printf(" [.] fib(%d)", n)
response := fib(n)
err = ch.Publish(
"", //exchange
d.ReplyTo, //routing key
false, //mandatory
false, //immediate
amqp.Publishing{
ContentType : "text/plain",
CorrelationId: d.CorrelationId,
Body: []byte(strconv.Itoa(response)),
})
failOnError(err, "Failed to publish a message")
d.Ack(false)
}
}()
log.Printf(" [*] Awaiting RPC reqeusts")
<-forever
}
服務(wù)端的代碼簡單明了:
首先建立RabbitMQ的連接眶诈、創(chuàng)建通道和定義隊(duì)列;
其次如果是多服務(wù)器進(jìn)程瓜饥,可以通過prefetch值得設(shè)置實(shí)現(xiàn)的負(fù)載均衡逝撬;
最后通過Channel.Consume監(jiān)聽隊(duì)列消息,然后通過goroutine來實(shí)現(xiàn)對消息的處理和發(fā)送response乓土。
rpc_client.go文件:
package main
import (
"fmt"
"log"
"strconv"
"os"
"math/rand"
"strings"
"time"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
panic(fmt.Sprintf("%s: %s", msg, err))
}
}
func randomString(l int) string {
bytes := make([]byte, l)
for i:=0; i<l; i++ {
bytes[i] = byte(randInt(65, 90))
}
return string(bytes)
}
func randInt(min int, max int) int {
return min + rand.Intn(max - min)
}
func bodyFrom(args []string) int {
var s string
if(len(args) < 2 || os.Args[1]==""){
s = "30"
}else{
s = strings.Join(args[1:], " ")
}
n, err := strconv.Atoi(s)
failOnError(err, "Failed to convert arg to integer")
return n
}
func fibonacciRPC(n int) (res int, err error) {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
q, err := ch.QueueDeclare(
"", //name
false, //durables
false, //delete when unused
true, //exclusive
false, //no wait
nil, //args
)
failOnError(err, "Failed to declare a queue")
msgs , err := ch.Consume(
q.Name, //queue
"", //consumer
true, //auto-ack
false, //exclusive
false, //no-lock
false, //nowait
nil,
)
failOnError(err, "Faield to register a consumer")
corrId := randomString(32)
err = ch.Publish(
"", //exchange
"rpc_queue", //routing key
false, //mandatory
false, //immediate
amqp.Publishing{
ContentType: "text/plain",
CorrelationId: corrId,
ReplyTo: q.Name,
Body: []byte(strconv.Itoa(n)),
})
failOnError(err, "Failed to publish a message")
for d:= range msgs {
if corrId == d.CorrelationId {
res, err = strconv.Atoi(string(d.Body))
failOnError(err, "Failed to convert body to integer")
break
}
}
return
}
func main(){
rand.Seed(time.Now().UTC().UnixNano())
n:= bodyFrom(os.Args)
log.Printf(" [x] Requesting fib(%d)", n)
res, err := fibonacciRPC(n)
failOnError(err, "Failed to handle RPC request")
log.Printf(" [.] Got %d", res)
}
(5)運(yùn)行
首先運(yùn)行RPC server:
go run rpc_server.go
# => [x] Awaiting RPC requests
客戶端計(jì)算斐波那契數(shù)列:
go run rpc_client.go 30
# => [x] Requesting fib(30)
目前為止設(shè)計(jì)的RPC系統(tǒng)宪潮,不僅僅是能提供RPC服務(wù),還具備其他優(yōu)點(diǎn):
* 如果單臺RPC服務(wù)器性能緩慢趣苏,可以很容易的進(jìn)行擴(kuò)展狡相,只需在新窗口運(yùn)行一個(gè)rpc_server.go腳本即可;
* 在客戶端食磕,RPC模式要求對消息進(jìn)行一次發(fā)送和接收操作尽棕,因此只需要一次網(wǎng)絡(luò)往返即可完成一次RPC請求;
當(dāng)然彬伦,這里的Demo過于簡單滔悉,并沒有考慮實(shí)際應(yīng)用中復(fù)雜而重要的諸多問題伊诵,如:
* 客戶端如何處理服務(wù)端掉線的情況?
* 客戶端如何處理服務(wù)端超時(shí)情況回官?
* 如果服務(wù)端故障導(dǎo)致異常曹宴,是否需要將異常轉(zhuǎn)發(fā)給客戶端處理?
* 在對消息處理前未對消息的合法性進(jìn)行檢查歉提,如邊界值浙炼、類型信息等;
如果你對上述問題有興趣唯袄,期望了解更多的使用知識弯屈,請參考Management UI.
2.4 RabbitMQ原理
2.4.1 RabbitMQ基本概念
首先來看看RabbitMQ里的幾個(gè)重要概念:
- 生產(chǎn)者(Producer):發(fā)送消息的應(yīng)用。
- 消費(fèi)者(Consumer):接收消息的應(yīng)用恋拷。
- 隊(duì)列(Queue):存儲消息的緩存资厉。
- 消息(Message):由生產(chǎn)者通過RabbitMQ發(fā)送給消費(fèi)者的信息。
- 連接(Connection):連接RabbitMQ和應(yīng)用服務(wù)器的TCP連接蔬顾。
- 通道(Channel):連接里的一個(gè)虛擬通道宴偿。當(dāng)你通過消息隊(duì)列發(fā)送或者接收消息時(shí),這個(gè)操作都是通過通道進(jìn)行的诀豁。
- 交換機(jī)(Exchange):交換機(jī)負(fù)責(zé)從生產(chǎn)者那里接收消息窄刘,并根據(jù)交換類型分發(fā)到對應(yīng)的消息列隊(duì)里。要實(shí)現(xiàn)消息的接收舷胜,一個(gè)隊(duì)列必須到綁定一個(gè)交換機(jī)娩践。
- 綁定(Binding):綁定是隊(duì)列和交換機(jī)的一個(gè)關(guān)聯(lián)連接。
- 路由鍵(Routing Key):路由鍵是供交換機(jī)查看并根據(jù)鍵來決定如何分發(fā)消息到列隊(duì)的一個(gè)鍵烹骨。路由鍵可以說是消息的目的地址翻伺。
生產(chǎn)者(Producer)發(fā)送/發(fā)布消息到代理->消費(fèi)者(Consumer)從代理那里接收消息。哪怕生產(chǎn)者和消費(fèi)者運(yùn)行在不同的機(jī)器上沮焕,RabbitMQ也能扮演代理中間件的角色吨岭。
當(dāng)生產(chǎn)者發(fā)送消息時(shí),它并不是直接把消息發(fā)送到隊(duì)列里的峦树,而是使用交換機(jī)(Exchange)來發(fā)送辣辫。下面的設(shè)計(jì)圖簡單展示了這三個(gè)主要的組件之間是如何連接起來的。
交換機(jī)代理(exchange agent)負(fù)責(zé)把消息分發(fā)到不同的隊(duì)列里魁巩。這樣的話急灭,消息就能夠從生產(chǎn)者發(fā)送到交換機(jī),然后被分發(fā)到消息隊(duì)列里歪赢。這就是常見的“發(fā)布”方法化戳。
Producer
然后,消息會被消費(fèi)者從隊(duì)列里讀取并消費(fèi),這就是“消費(fèi)”点楼。
2.4.2 RabbitMQ架構(gòu)
2.4.3 交換機(jī)(Exchange)
消息并不是直接發(fā)布到隊(duì)里里的扫尖,而是被生產(chǎn)者發(fā)送到一個(gè)交換機(jī)上。交換機(jī)負(fù)責(zé)把消息發(fā)布到不同的隊(duì)列里掠廓。交換機(jī)從生產(chǎn)者應(yīng)用上接收消息换怖,然后根據(jù)綁定和路由鍵將消息發(fā)送到對應(yīng)的隊(duì)列里。綁定是交換機(jī)和隊(duì)列之間的一個(gè)關(guān)系連接蟀瞧。
2.4.4 RabbitMQ里的消息流程
生產(chǎn)者(producer)把消息發(fā)送給交換機(jī)沉颂。當(dāng)你創(chuàng)建交換機(jī)的時(shí)候,你需要指定類型悦污。交換機(jī)的類型接下來會講到铸屉。
交換機(jī)(exchange)接收消息并且負(fù)責(zé)對消息進(jìn)行路由。根據(jù)交換機(jī)的類型切端,消息的多個(gè)屬性會被使用彻坛,例如路由鍵。
綁定(binding)需要從交換機(jī)到隊(duì)列的這種方式來進(jìn)行創(chuàng)建踏枣。在這個(gè)例子里昌屉,我們可以看到交換機(jī)有到兩個(gè)不同隊(duì)列的綁定。交換機(jī)根據(jù)消息的屬性來把消息分發(fā)到不同的隊(duì)列上茵瀑。
消息(message)消息會一直留在隊(duì)列里直到被消費(fèi)间驮。
消費(fèi)者(consumer)處理消息。
2.4.5 交換機(jī)類型的3種類型
直接(Direct):直接交換機(jī)通過消息上的路由鍵直接對消息進(jìn)行分發(fā)马昨。
扇出(Fanout):一個(gè)扇出交換機(jī)會將消息發(fā)送到所有和它進(jìn)行綁定的隊(duì)列上竞帽。
主題(Topic):這個(gè)交換機(jī)會將路由鍵和綁定上的模式進(jìn)行通配符匹配。
2.4.6 虛擬主機(jī)
RabbitMQ是一個(gè)多組件系統(tǒng):Connection偏陪、Exchange抢呆、Queue煮嫌、Binding笛谦、User Permision、Policies和其他屬于虛擬主機(jī)(實(shí)體的邏輯分組)的組件昌阿。
創(chuàng)建虛擬主機(jī)
虛擬主機(jī)可以通過rabbitmqctl的add_vhost命令進(jìn)行創(chuàng)建饥脑,該命令接收一個(gè)必填參數(shù)——虛擬主機(jī)名稱。例如:
> rabbitmqctl add_vhost qa1
更多內(nèi)容參考《RabbitMQ系列(四) RabbitMQ的虛擬主機(jī)》
3.問題及解答
3.1 RabbitMQ啟動失敗
失敗原因信息:
root@JD3:/etc/rabbitmq# systemctl start rabbitmq-server
Job for rabbitmq-server.service failed because the control process exited with error code. See "systemctl status rabbitmq-server.service" and "journalctl -xe" for details.
使用systemctl status rabbitmq-server.service命令進(jìn)一步查看問題:
root@JD3:/etc/rabbitmq# systemctl status rabbitmq-server.service
rabbitmq-server.service - RabbitMQ Messaging Server
Loaded: loaded (/lib/systemd/system/rabbitmq-server.service; enabled; vendor preset: enabled)
Active: activating (auto-restart) (Result: exit-code) since Fri 2021-01-22 11:26:52 CST; 4s ago
Process: 6389 ExecStop=/usr/sbin/rabbitmqctl stop (code=exited, status=0/SUCCESS)
Process: 27911 ExecStartPost=/usr/lib/rabbitmq/bin/rabbitmq-server-wait (code=exited, status=2)
Process: 27910 ExecStart=/usr/sbin/rabbitmq-server (code=exited, status=140)
Main PID: 27910 (code=exited, status=140)
Jan 22 11:26:52 JD3 systemd[1]: Failed to start RabbitMQ Messaging Server.
Jan 22 11:26:52 JD3 systemd[1]: rabbitmq-server.service: Unit entered failed state.
Jan 22 11:26:52 JD3 systemd[1]: rabbitmq-server.service: Failed with result 'exit-code'.
沒有看到有效信息懦冰。
查看 /var/log/rabbitmq/startup_log的啟動日志灶轰,發(fā)現(xiàn)異常情況:
{"could not start kernel pid",application_controller,"error in config file "/etc/rabbitmq/rabbitmq.config" (1): syntax error before: '['"}
解決方法:
發(fā)現(xiàn)自己編輯 /etc/rabbitmq/rabbitmq.config 文件,少了最后的.導(dǎo)致刷钢,此字符表示配置結(jié)束笋颤。修改后就不會報(bào)錯(cuò)了。
[
{rabbit, [{tcp_listeners, [5672]}, {loopback_users, [“guest”]}]}
].
3.2 RabbitMQ的配置文件格式如何選擇?
/etc/rabbitmq下伴澄,增加rabbitmq.conf文件赋除,刪除rabbitmq.config文件。
4, RabbitMQ常見面試題
參考:
(1) 消息隊(duì)列 RabbitMQ 常見面試題總結(jié)! (附答案)-專題 http://www.mianshigee.com/topic/1009ada/
(2) 技術(shù)干貨:RabbitMQ面試題及答案 http://blog.itpub.net/69902581/viewspace-2673724/
5非凌,參考
(1)Ubuntu 16.04 RabbitMq 安裝與運(yùn)行(安裝篇)【成功】
https://blog.csdn.net/qq_22638399/article/details/81704372
(2)RabbitMQ 從入門到精通 (一)
https://www.cnblogs.com/dwlovelife/p/10982735.html
https://www.cnblogs.com/dwlovelife/p/10991371.html
(3)RabbitMQ核心概念以及工作原理
http://www.reibang.com/p/256c502d09cd
(4)RabbitMQ 中文文檔
http://rabbitmq.mr-ping.com/
https://www.rabbitmq.com
(5)RabbitMQ入門詳解以及使用【操作】
https://www.cnblogs.com/huangting/p/11989597.html
(6)RabbitMQ 第一個(gè)入門實(shí)例hello world【goLang版本】
http://rabbitmq.mr-ping.com/tutorials_with_golang/[1]Hello_World.html
(7)RabbitMQ 第一個(gè)入門實(shí)例hello world【PHP版本】
http://www.apeit.cn/rabbitmq-hello-world-1xvq3
(8)老司機(jī)帶你入門RabbitMQ消息中間件【MQ對比举农,一般】
https://www.imooc.com/article/79039
(9)rabbitmq官方的六種工作模式
https://blog.csdn.net/qq_33040219/article/details/82383127
(10)rabbitmq的配置參考
https://www.rabbitmq.com/configure.html
https://github.com/rabbitmq/rabbitmq-server/blob/v3.8.x/deps/rabbit/docs/rabbitmq.conf.example
(11)RABBITMP的GO消息接口
https://godoc.org/github.com/streadway/amqp
(12)攀登者007的RabbitMQ系列 入門
RabbitMQ系列(一)RabbitMQ的架構(gòu)
https://juejin.cn/post/6844904136941518862
RabbitMQ系列(二)RabbitMQ Server的安裝(基于二進(jìn)制)
https://juejin.cn/post/6844904111826026510
RabbitMQ系列(三)RabbitMQ Server的安裝(基于Linux RPM)
https://juejin.cn/post/6844904111545008141
RabbitMQ系列(四) RabbitMQ的虛擬主機(jī)
https://juejin.cn/post/6844904057274892296
RabbitMQ系列(五) RabbitMQ的文件和目錄位置
https://juejin.cn/post/6844904139063820295
RabbitMQ系列(六)RabbitMQ的配置
https://juejin.cn/post/6844904057845334029
RabbitMQ系列(七)RabbitMQ的參數(shù)與策略
https://juejin.cn/post/6844904136949891085
RabbitMQ系列(八)RabbitMQ的日志
https://juejin.cn/post/6844904061829939208