相關(guān)概念:
Producer:消息產(chǎn)生者
Consumer: 消息消費(fèi)者
Broker:消息中間件的服務(wù)節(jié)點(diǎn),對(duì)于Rabbitmq來(lái)說(shuō)刁赦,一個(gè)Rabbitmq broker可以認(rèn)為是一個(gè)Rabbitmq的服務(wù)實(shí)例。
Connection眉枕,與Broker的tcp長(zhǎng)連接忱嘹,Producer和Consumer都需要建立連接之后才可以使用
Channel衣迷,建立在Connection基礎(chǔ)上,每個(gè)線程分配一個(gè)channel邮破,類似于NIO的多路復(fù)用诈豌,節(jié)省連接資源仆救。大部分RabbitMQ的操作和核心概念都是基于Channel的,需要特別注意矫渔。
-
Queue:隊(duì)列派桩,RabbitMQ中用于存儲(chǔ)消息的容器。而且RabbitMQ中的消息只能存儲(chǔ)在Queue中蚌斩,這點(diǎn)跟kafka不同铆惑,kafka只能將消息放在topic中,而kafka中的queue只是topic實(shí)際存儲(chǔ)文件中的位移標(biāo)識(shí)送膳。
多個(gè)consumer可以消費(fèi)同一個(gè)queue中的消息员魏,這時(shí)候消息的處理是互斥的,即一個(gè)消息只能被一個(gè)consumer處理叠聋。
Exchange:還是不翻譯成中文了撕阎,太怪。在producer將消息發(fā)到Broker中時(shí)碌补,是通過(guò)exchange按照一定規(guī)則轉(zhuǎn)發(fā)到不同的queue中虏束,而不是直接放入queue中。
RoutingKey:producer在發(fā)送消息給exchange時(shí)厦章,一般會(huì)指定一個(gè)RoutingKey镇匀,用來(lái)指定這個(gè)消息的路由規(guī)則。
BindingKey:RoutingKey和BindingKey匹配時(shí)(注意不是相同袜啃,可能是模糊匹配)汗侵,exchange才會(huì)把消息發(fā)送到對(duì)應(yīng)的queue中
exchange類型
- fanout,會(huì)把消息路由到所有綁定的queue中群发,無(wú)視RoutingKey和BindingKey
- direct晰韵,只能將消息路由到RoutingKey和BindingKey完全匹配的queue,queue可以有多個(gè)熟妓,只要匹配就行
- topic雪猪,可以支持# * 等通配符匹配RoutingKey和BindingKey
- headers,不常用起愈,不會(huì)依賴RoutingKey只恨,而是根據(jù)消息內(nèi)容中的headers屬性跟exchange綁定的內(nèi)容進(jìn)行匹配,性能較低告材,不過(guò)非常靈活坤次。
一些關(guān)鍵點(diǎn)
大部分情況下,按照最簡(jiǎn)單的方式使用就好了斥赋,作為工具書去查詢《RabbitMQ實(shí)戰(zhàn)詳解》里面的配置缰猴。
- channel.basicQos是設(shè)置一個(gè)channel中consumer所能保持的最大未確認(rèn)消息,也就是說(shuō)疤剑,如果一個(gè)channel中的qos值已經(jīng)到了最大滑绒,那么rabbitmq就不會(huì)繼續(xù)往這個(gè)channel中push對(duì)應(yīng)的消息闷堡。
- rabbitmq的順序一致性其實(shí)是無(wú)法保證的,
- 比如事務(wù)消息或者發(fā)送消息確認(rèn)疑故,當(dāng)發(fā)送失敗需要重試時(shí)杠览,這一條(批)數(shù)據(jù)跟其之后的數(shù)據(jù)在producer端就不一致了
- 如果producer發(fā)送的時(shí)候設(shè)置了不同的超時(shí)時(shí)間,并且也設(shè)置了死信隊(duì)列纵势,那么消費(fèi)者在處理死信隊(duì)列的時(shí)候踱阿,也會(huì)出現(xiàn)數(shù)據(jù)順序與發(fā)送順序不同的情況。
- 設(shè)置優(yōu)先級(jí)钦铁,也會(huì)導(dǎo)致順序一致性收到影響软舌。
- 死信隊(duì)列、延遲隊(duì)列牛曹、消費(fèi)優(yōu)先級(jí)佛点、持久化等,查詢工具書即可黎比。
web端管理
- 使用rabbitmq-plugins enable rabbitmq_management 來(lái)啟用web管理插件超营,重啟才會(huì)生效.
- 使用rabbitmq-plugins list來(lái)查看正在使用的插件。
- 訪問(wèn) server_ip:15672可以訪問(wèn)阅虫,guest用戶無(wú)法登陸遠(yuǎn)程服務(wù)器演闭,需要使用上面創(chuàng)建的root:root123 用戶/密碼來(lái)登陸
HelloWorld
加入maven依賴:
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.2.0</version>
</dependency>
producer代碼:
package rabbitmq.server;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class RabbitMQProducer {
private static final String exchange_name = "exchange_siyu";
private static final String routing_key = "routing_key_siyu";
private static final String queue_name = "queue_siyu";
private static final String rabbitmq_server_ip_addr = "10.199.189.30";
private static final int rabbitmq_server_port = 5672;
public static void main(String[] args) throws IOException, TimeoutException {
// 連接工廠類
ConnectionFactory connectionFactory = new ConnectionFactory();
// 設(shè)置連接屬性及用戶名密碼,用戶书妻、密碼要通過(guò)rabbitmqctl設(shè)置過(guò)權(quán)限
connectionFactory.setHost(rabbitmq_server_ip_addr);
connectionFactory.setPort(rabbitmq_server_port);
connectionFactory.setUsername("root");
connectionFactory.setPassword("root123"); // 如果用戶名密碼不匹配船响,會(huì)連接失敗
// 建立連接,一個(gè)tcp長(zhǎng)連接
Connection connection = connectionFactory.newConnection();
// 創(chuàng)建信道躲履,主要操作通過(guò)channel執(zhí)行,可以認(rèn)為channel是虛擬化出來(lái)的一個(gè)Connection聊闯,用于復(fù)用
Channel channel = connection.createChannel();
// 定義路由工猜,direct是point-2-piont的,直接到對(duì)應(yīng)的單個(gè)queue中
channel.exchangeDeclare(exchange_name,"direct",true,false,null);
// 定義queue
channel.queueDeclare(queue_name,true,false,false,null);
// 通過(guò)routingkey 綁定queue和exchange
channel.queueBind(queue_name,exchange_name,routing_key);
// 開(kāi)始發(fā)送消息
String message = "Hello World!!";
/* MessageProperties中預(yù)置了一部分消息的參數(shù)菱蔬,比如PERSIST_TEXT_PLAIN篷帅,其中的定義如下:
*
*
public static final BasicProperties PERSISTENT_TEXT_PLAIN =
new BasicProperties("text/plain",
null,
null,
2,
0, null, null, null,
null, null, null, null,
null, null);
* */
channel.basicPublish(exchange_name,routing_key, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
// 關(guān)閉channel和connection
channel.close();
connection.close();
}
}
Consumer
package rabbitmq.client;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class RabbitMQConsumer {
private static final String queue_name = "queue_siyu";
private static final String rabbitmq_server_ip_address = "10.199.189.30";
private static final int port = 5672;
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
Address[] addresses = new Address[]{
new Address(rabbitmq_server_ip_address,port)
};
// 長(zhǎng)連接工廠
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setUsername("root");
connectionFactory.setPassword("root123");
// 這里創(chuàng)建連接跟server端不同,傳入了address
Connection connection = connectionFactory.newConnection(addresses);
// 創(chuàng)建channel
final Channel channel = connection.createChannel();
channel.basicQos(64);// 拴泌?魏身? 設(shè)置客戶端最多接收未被ack的消息個(gè)數(shù)
// 創(chuàng)建消費(fèi)
final Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("receive msg:" + new String(body));
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
channel.basicAck(envelope.getDeliveryTag(),false); // 發(fā)送ack之后,消息會(huì)在queue中被刪除
}
};
channel.basicConsume(queue_name,consumer);
TimeUnit.SECONDS.sleep(5);
// 如果先關(guān)閉connection蚪腐,再關(guān)閉channel箭昵,就會(huì)拋出異常:
// com.rabbitmq.client.AlreadyClosedException: connection is already closed due to clean connection shutdown;
// 所以這里一定要注意關(guān)閉的順序
channel.close();
connection.close();
}
}