馬桶??Java 上廁所就能看完的小知識(shí)! 歡迎關(guān)注润绵、點(diǎn)贊 持續(xù)更新!
什么是MQ?
? MQ(Message Quene) : 翻譯為 消息隊(duì)列,運(yùn)用生產(chǎn)者和消費(fèi)者模型,生產(chǎn)者不斷向消息隊(duì)列中生產(chǎn)消息咐容,消費(fèi)者不斷的從隊(duì)列中獲取(消費(fèi))消息蚂维。因?yàn)橄⒌纳a(chǎn)和消費(fèi)都是異步的戳粒,而且只關(guān)心消息的發(fā)送和接收路狮,沒有業(yè)務(wù)邏輯的侵入,輕松的實(shí)現(xiàn)系統(tǒng)間解耦。
MQ種類介紹
# 1.ActiveMQ
ActiveMQ 是Apache出品蔚约,最流行的奄妨,能力強(qiáng)勁的開源消息總線。它是一個(gè)完全支持JMS規(guī)范的的消息中間件苹祟。豐富的API,多種集群架構(gòu)模式讓ActiveMQ在業(yè)界成為老牌的消息中間件,在中小型企業(yè)頗受歡迎!
# 2.Kafka
Kafka是LinkedIn開源的分布式發(fā)布-訂閱消息系統(tǒng)砸抛,目前歸屬于Apache頂級(jí)項(xiàng)目。Kafka主要特點(diǎn)是基于Pull的模式來處理消息消費(fèi)树枫,追求高吞吐量直焙,一開始的目的就是用于日志收集和傳輸。0.8版本開始支持復(fù)制砂轻,不支持事務(wù)奔誓,對(duì)消息的重復(fù)、丟失搔涝、錯(cuò)誤沒有嚴(yán)格要求厨喂,適合產(chǎn)生大量數(shù)據(jù)的互聯(lián)網(wǎng)服務(wù)的數(shù)據(jù)收集業(yè)務(wù)。
# 3.RocketMQ
RocketMQ是阿里開源的消息中間件庄呈,它是純Java開發(fā)蜕煌,具有高吞吐量、高可用性诬留、適合大規(guī)模分布式系統(tǒng)應(yīng)用的特點(diǎn)斜纪。RocketMQ思路起源于Kafka,但并不是復(fù)制kafka故响,它對(duì)消息的可靠傳輸及事務(wù)性做了優(yōu)化傀广,目前在阿里集團(tuán)被廣泛應(yīng)用于交易、充值彩届、流計(jì)算伪冰、消息推送、日志流式處理樟蠕、binglog分發(fā)等場(chǎng)景贮聂。
# 4.RabbitMQ
RabbitMQ是使用Erlang語言開發(fā)的開源消息隊(duì)列系統(tǒng),基于AMQP協(xié)議來實(shí)現(xiàn)寨辩。AMQP的主要特征是面向消息吓懈、隊(duì)列、路由(包括點(diǎn)對(duì)點(diǎn)和發(fā)布/訂閱)靡狞、可靠性耻警、安全。AMQP協(xié)議更多用在企業(yè)系統(tǒng)內(nèi)對(duì)數(shù)據(jù)一致性、**穩(wěn)定性和可靠性**要求很高的場(chǎng)景甘穿,對(duì)性能和吞吐量的要求還在其次腮恩。
RabbitMQ比Kafka可靠,Kafka更適合IO高吞吐的處理温兼,一般應(yīng)用在大數(shù)據(jù)日志處理或?qū)?shí)時(shí)性(少量延遲)秸滴,可靠性(少量丟數(shù)據(jù))要求稍低的場(chǎng)景使用,比如ELK日志收集募判。
RabbitMQ引言
什么是rabbitMQ荡含?
RabbitMQ是實(shí)現(xiàn)了高級(jí)消息隊(duì)列協(xié)議(AMQP)的開源消息代理軟件(亦稱面向消息的中間件)。RabbitMQ服務(wù)器是用Erlang語言編寫的届垫,而群集和故障轉(zhuǎn)移是構(gòu)建在開放電信平臺(tái)框架上的释液。
官網(wǎng)
: https://www.rabbitmq.com/
官方教程
: https://www.rabbitmq.com/#getstarted
RabbitMQ安裝
RabbitMQ安裝首先要安裝Erlang
安裝地址:https://www.rabbitmq.com/download.html
具體安裝調(diào)試請(qǐng)參考。
RabbitMQ界面初識(shí)
主界面
登錄管理界面 username: guest password: guest
Connections:連接装处。無論是生產(chǎn)者和消費(fèi)者都需要在與RabbitMQ建立連接的情況下完成消息的生產(chǎn)與消費(fèi)均澳,這里可以查看相應(yīng)的消費(fèi)情況。
Channels:通道符衔。建立連接后會(huì)形成相應(yīng)的通道,消息的投遞與獲取都依賴于通道糟袁。
Exchanges: 交換機(jī)判族。用于實(shí)現(xiàn)消息的路由功能。
Queues:消息隊(duì)列项戴。消息存在在消息隊(duì)列中等待消費(fèi)形帮。
用戶
tags為指定用戶
-
超級(jí)管理員(administrator)
可登陸管理控制臺(tái),可查看所有的信息周叮,并且可以對(duì)用戶辩撑,策略(policy)進(jìn)行操作。
-
監(jiān)控者(monitoring)
可登陸管理控制臺(tái)仿耽,同時(shí)可以查看rabbitmq節(jié)點(diǎn)的相關(guān)信息(進(jìn)程數(shù)合冀,內(nèi)存使用情況,磁盤使用情況等)
-
策略制定者(policymaker)
可登陸管理控制臺(tái), 同時(shí)可以對(duì)policy進(jìn)行管理项贺。但無法查看節(jié)點(diǎn)的相關(guān)信息(上圖紅框標(biāo)識(shí)的部分)君躺。
-
普通管理者(management)
僅可登陸管理控制臺(tái),無法看到節(jié)點(diǎn)信息开缎,也無法對(duì)策略進(jìn)行管理棕叫。
-
其他
無法登陸管理控制臺(tái),通常就是普通的生產(chǎn)者和消費(fèi)者奕删。
虛擬機(jī)
為了讓各個(gè)用戶互相不干預(yù)工作俺泣,RabbitMQ 添加了虛擬機(jī)概念,就是一個(gè)獨(dú)立的訪問路徑,不同用戶使用不同路徑伏钠,各自有自己的隊(duì)列横漏、交換機(jī),互相不會(huì)影響贝润。
進(jìn)入相關(guān)新創(chuàng)建的虛擬機(jī)绊茧。
為創(chuàng)建好的虛擬機(jī)綁定對(duì)應(yīng)用戶。
RabbitMQ 消息模型
引入依賴
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.7.2</version>
</dependency>
創(chuàng)建工具
public class RabbitMqConnectionUtil {
private static final ConnectionFactory CONNECTION_FACTORY;
private static Connection connection;
static {
//通過連接工廠并設(shè)置相應(yīng)的ip 端口 用戶名密碼 及綁定的虛擬機(jī)創(chuàng)建連接
CONNECTION_FACTORY = new ConnectionFactory();
CONNECTION_FACTORY.setHost("127.0.0.1");
CONNECTION_FACTORY.setPort(5672);
CONNECTION_FACTORY.setUsername("test");
CONNECTION_FACTORY.setPassword("test");
CONNECTION_FACTORY.setVirtualHost("/test");
}
public static Connection createConnection() {
try {
connection = CONNECTION_FACTORY.newConnection();
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
return connection;
}
public static void close(Channel channel, Connection connection) throws IOException, TimeoutException {
channel.close();
connection.close();
}
}
工作隊(duì)列
讓多個(gè)消費(fèi)者綁定到一個(gè)隊(duì)列打掘,共同消費(fèi)隊(duì)列中的消息华畏。隊(duì)列中的消息一旦消費(fèi),就會(huì)消失尊蚁,因此任務(wù)是不會(huì)被重復(fù)執(zhí)行的亡笑。
在模型中,有以下概念:
- P:生產(chǎn)者横朋,也就是要發(fā)送消息的程序
- C:消費(fèi)者:消息的接受者仑乌,會(huì)一直等待消息到來。
- queue:消息隊(duì)列琴锭,圖中紅色部分晰甚。類似一個(gè)郵箱,可以緩存消息决帖;生產(chǎn)者向其中投遞消息厕九,消費(fèi)者從其中取出消息。
生產(chǎn)者如下:
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
// 獲取連接
Connection connection = RabbitMqConnectionUtil.createConnection();
// 獲取通道
Channel channel = connection.createChannel();
/**
創(chuàng)建隊(duì)列
參數(shù)如下:
queue - 隊(duì)列的名稱
durable - 宣布一個(gè)持久的隊(duì)列(隊(duì)列不受服務(wù)器重啟地回,數(shù)據(jù)會(huì)消失)
exclusive - 宣布獨(dú)占隊(duì)列(限于此連接)
autoDelete - 一個(gè)自動(dòng)刪除隊(duì)列(服務(wù)器將刪除它在使用時(shí)不再)
參數(shù) - 其它性質(zhì)(結(jié)構(gòu)參數(shù))隊(duì)列
*/
channel.queueDeclare("work", true, false, false, null);
for (int i = 0; i < 10; i++) {
channel.basicPublish("", "work", null, ("work queue message has been sent --"+i).getBytes());
}
RabbitMqConnectionUtil.close(channel,connection);
}
}
消費(fèi)者獲取消息:
再創(chuàng)建一個(gè)Consumer02 代碼相同扁远。
public class Consumer01 {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMqConnectionUtil.createConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("work", true, false, false, null);
// 獲取對(duì)應(yīng)消息 autoAck 自動(dòng)確認(rèn)消息 在這里使用的默認(rèn)消息消費(fèi)者
channel.basicConsume("work",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(new String(body));
}
});
}
}
先運(yùn)行消費(fèi)者等待消費(fèi)隊(duì)列中數(shù)據(jù),在打開生產(chǎn)者生產(chǎn)消息結(jié)果如下:
這種形式是公平分配形式刻像,在程序運(yùn)行時(shí)將對(duì)應(yīng)的未確認(rèn)消息分配給對(duì)應(yīng)的消費(fèi)者畅买。然后再進(jìn)行消費(fèi)。(自動(dòng)確認(rèn)消息)
接下來我們切換為手動(dòng)確認(rèn)消息细睡,實(shí)現(xiàn)類似于能者多勞的分配模式谷羞。
未完待續(xù)。溜徙。洒宝。