文章參考:Rabbit實戰(zhàn)指南
交換機的類型
-
fanout
- 它會把所有發(fā)送到該交換機的消息路由到所有與該交換機綁定的隊列中伶椿。
-
direct
- 它會把消息路由到那些BindingKey和RoutingKey完全匹配的隊列中辜伟。
-
topic
- 將消息路由到BindingKey和RoutingKey相匹配的隊列中,它約定:
- RoutingKey作為一個點號"."分割的字符串(被點號"."分隔開的每一段獨立的字符串稱為一個單詞)
- BindingKey和RoutingKey一樣也是點號"."分割的字符串脊另。
- BindingKey中可以存在兩種特殊字符串"*"和"#"导狡,用于做模糊匹配,其中"#"用來匹配一個單詞偎痛,"*"用于匹配多規(guī)格單詞旱捧。
- 路由鍵為“com.rabbitmq.client”的消息會同時路由到Queue1和Queue2;
- 路由鍵為“com.hidden.client”的消息只會路由到Queue2中;
- 路由鍵為java.util.concurrent的消息會被丟棄或者返回給生產(chǎn)者(需要設(shè)置)
- 將消息路由到BindingKey和RoutingKey相匹配的隊列中,它約定:
-
headers
- headers類型的交換機不依賴路由鍵的匹配規(guī)則來路由信息踩麦,而是根據(jù)發(fā)送的消息內(nèi)容中的headers屬性進行匹配枚赡。在綁定隊列和交換器時制定一組鍵值對,當發(fā)送信息到交換器時谓谦,RabbitMQ會獲取到該信息的headers贫橙,對比其中鍵值對是否完全匹配隊列和交換器綁定時指定的鍵值對,如果完全匹配則消息會路由到該隊列反粥。
RabbitMQ運轉(zhuǎn)流程
- 生產(chǎn)者發(fā)送消息:
- 生產(chǎn)者連接到RabbitMQ Broker卢肃,建立一個連接(Connection),開啟一個信道(Channel)
- 生產(chǎn)者聲明一個交換器才顿,并設(shè)置相關(guān)屬性莫湘,比如交互器類型,是否持久化等
- 生產(chǎn)者聲明一個隊列并設(shè)置相關(guān)屬性郑气,比如是否排他幅垮,是否持久化,是否自動刪除等
- 生產(chǎn)者通過路由鍵將交換器和隊列綁定起來
- 生產(chǎn)者發(fā)送消息至RabbitMQ Broker尾组,其中包含路由鍵忙芒,交換器等信息
- 相應的交換器根據(jù)接受到的路由鍵查找相匹配的隊列
- 如果找到,則將從生產(chǎn)者發(fā)送過來的信息存入相應的隊列中
- 如果沒有找到讳侨,則根據(jù)生產(chǎn)者配置的屬性選擇丟棄還是回退給生產(chǎn)者
- 關(guān)閉信道
- 關(guān)閉連接
- 消費者接受消息:
- 消費者連接到RabbitMQ Broker匕争,建立一個連接,開啟一個信道
- 消費者向RabbitMQ Broker 請求消費相應隊列中的消息爷耀,可能會設(shè)置相應的回調(diào)函數(shù)甘桑,以及做一些準備工作
- 等待RabbitMQ Broker 回應并投遞相應隊列中的消息,消費者接受消息
- 消費者確認接收到的消息
- RabbitMQ從隊列中刪除相應已經(jīng)被確認的消息
- 關(guān)閉信道
- 關(guān)閉連接
連接RabbitMQ
下面代碼用來在給定的參數(shù)(IP地址歹叮、端口號跑杭、用戶名、密碼等)下連接RabbitMQ:
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername(USERNAME);
factory.setPassword(PASSWORD);
factory.setVirtualHost(virtualHost);
factory.setHost(IP_ADDRESS);
factory.setPort(PORT);
Connection conn = factory.newConnection();
也可以選擇使用URI的方式來實現(xiàn)咆耿,
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://userName:password@ipAddress:portNumber/virtualHost");
Connection conn = factory.newConnection();
創(chuàng)建之后德谅,Channel可以用來發(fā)送或者接收消息了。
Channel channel = conn.createChannel();
Channel或者Connection中有個isOpen方法可以用來檢測其是否處于開啟狀態(tài)萨螺,不推薦在生產(chǎn)環(huán)境上使用isOpen方法窄做,這個方法的返回值依賴于shutdownCause的存在愧驱,有可能會產(chǎn)生競爭。
isOpen方法的源碼
public boolean isOpen(){
synchronized(this.monitor){
return this.shutdownCause == null;
}
}
錯誤的使用isOpen方法
public void brokenMethod(Channel channel){
if(channel.isOpen()){
//The following code depends on the channel being in open state.
//However there is a possibility of the change in the channel state
//between isOpen() and basicQos(1) call
...
channel.baseicQos(1);
}
}
通常情況下椭盏,在調(diào)用createXXX或者newXXX方法之后组砚,我們可以簡單地認為Connection或者Channel已經(jīng)成功地處于開啟狀態(tài),而并不會在代碼中使用isOpen這個檢測方法掏颊。如果在使用Channel的時候其已經(jīng)處于關(guān)閉狀態(tài)糟红,那么程序會拋出個 com. rabbitmq. client. ShutdownSignalException,我們只需捕獲這個異常即可。當然同時也要試著捕獲I0Exception 或者SocketException, 以防Connection意外關(guān)閉乌叶。示例代碼如下:
public void validMethod(Channel channel){
try{
...
channel.basicQos(1);
} catch (ShutdownSignalException sse){
//possibly check if channel was closed
//by the time we started action and reasons for
//closing it
...
} catch (IOException ioe) {
// check why connection was closed
...
}
}