什么是 MQ?
MQ全稱為Message Queue, 消息隊列(MQ)是一種應(yīng)用程序?qū)?yīng)用程序的通信方法。應(yīng)用程序通過寫和檢索出入列隊的針對應(yīng)用程序的數(shù)據(jù)(消息)來通信,而無需專用連接來鏈接它們。消息傳遞指的是程序之間通過在消息中發(fā)送數(shù)據(jù)進行通信犬钢,而不是通過直接調(diào)用彼此來通信苗踪,直接調(diào)用通常是用于諸如遠程過程調(diào)用的技術(shù)跪者。
MQ的好處
- 實現(xiàn)開發(fā)語言間的解耦,比如生產(chǎn)者是Java妹卿,消費者是.NET
- 實現(xiàn)高并發(fā)場景下的分布式線程池
- 將無需即時返回且耗時的操作進行異步處理,比如短信群發(fā)
有 Broker 的 MQ
這個流派通常有一臺服務(wù)器作為 Broker蔑鹦,所有的消息都通過它中轉(zhuǎn)夺克。生產(chǎn)者把消息發(fā)送給它就結(jié)束自己的任務(wù)了,Broker 則把消息主動推送給消費者(或者消費者主動輪詢)
無 Broker 的 MQ
無 Broker 的 MQ 的代表是 ZeroMQ嚎朽。該作者非常睿智铺纽,他非常敏銳的意識到——MQ 是更高級的 Socket,它是解決通訊問題的哟忍。所以 ZeroMQ 被設(shè)計成了一個“庫”而不是一個中間件狡门,這種實現(xiàn)也可以達到——沒有 Broker 的目的
節(jié)點之間通訊的消息都是發(fā)送到彼此的隊列中,每個節(jié)點都既是生產(chǎn)者又是消費者锅很。ZeroMQ 做的事情就是封裝出一套類似于 Socket 的 API 可以完成發(fā)送數(shù)據(jù)其馏,讀取數(shù)據(jù)
ZeroMQ 其實就是一個跨語言的、重量級的 Actor 模型郵箱庫爆安。你可以把自己的程序想象成一個 Actor叛复,ZeroMQ 就是提供郵箱功能的庫;ZeroMQ 可以實現(xiàn)同一臺機器的 RPC 通訊也可以實現(xiàn)不同機器的 TCP、UDP 通訊褐奥,如果你需要一個強大的咖耘、靈活、野蠻的通訊能力撬码,別猶豫 ZeroMQ
附:Queue 和 Topic 的區(qū)別
- Queue:一個發(fā)布者發(fā)布消息儿倒,下面的接收者按隊列順序接收,比如發(fā)布了 10 個消息呜笑,兩個接收者 A,B 那就是 A,B總共會收到 10 條消息义桂,不重復(fù)。
- Topic:一個發(fā)布者發(fā)布消息蹈垢,有兩個接收者 A,B 來訂閱慷吊,那么發(fā)布了 10 條消息,A,B各收到10 條消息曹抬。
類型 | Topic | Queue |
---|---|---|
概要 | Publish Subscribe Messaging 發(fā)布訂閱消息 | Point-to-Point 點對點 |
有無狀態(tài) | Topic 數(shù)據(jù)默認不落地溉瓶,是無狀態(tài)的。 | Queue 數(shù)據(jù)默認會在 MQ 服務(wù)器上以文件形式保存谤民,比如 ActiveMQ 一般保存在?$AMQ_HOME\data\kr-store\data ?下面堰酿。也可以配置成 DB 存儲。 |
完整性保障 | 并不保證 Publisher 發(fā)布的每條數(shù)據(jù)张足,Subscriber 都能接受到触创。 | Queue 保證每條數(shù)據(jù)都能被 Receiver 接收。 |
消息是否會丟失 | 一般來說 Publisher 發(fā)布消息到某一個 Topic 時为牍,只有正在監(jiān)聽該 Topic 地址的 Sub 能夠接收到消息哼绑;如果沒有 Sub 在監(jiān)聽,該 Topic 就丟失了碉咆。 | Sender 發(fā)送消息到目標 Queue抖韩,Receiver 可以異步接收這個 Queue 上的消息。Queue 上的消息如果暫時沒有 Receiver 來取疫铜,也不會丟失茂浮。 |
消息發(fā)布接收策略 | 一對多的消息發(fā)布接收策略,監(jiān)聽同一個 Topic 地址的多個 Sub 都能收到 Publisher 發(fā)送的消息壳咕。Sub 接收完通知 MQ 服務(wù)器 | 一對一的消息發(fā)布接收策略席揽,一個 Sender 發(fā)送的消息,只能有一個 Receiver 接收谓厘。Receiver 接收完后幌羞,通知 MQ 服務(wù)器已接收,MQ 服務(wù)器對 Queue 里的消息采取刪除或其他操作庞呕。 |
RocketMQ 簡介
概述
消息隊列作為高并發(fā)系統(tǒng)的核心組件之一新翎,能夠幫助業(yè)務(wù)系統(tǒng)解構(gòu)提升開發(fā)效率和系統(tǒng)穩(wěn)定性程帕。主要具有以下優(yōu)勢:
- 削峰填谷:主要解決瞬時寫壓力大于應(yīng)用服務(wù)能力導(dǎo)致消息丟失、系統(tǒng)奔潰等問題
- 系統(tǒng)解耦:解決不同重要程度地啰、不同能力級別系統(tǒng)之間依賴導(dǎo)致一死全死
- 提升性能:當存在一對多調(diào)用時愁拭,可以發(fā)一條消息給消息系統(tǒng),讓消息系統(tǒng)通知相關(guān)系統(tǒng)
- 蓄流壓測:線上有些鏈路不好壓測亏吝,可以通過堆積一定量消息再放開來壓測
RocketMQ
Apache Alibaba RocketMQ 是一個消息中間件岭埠。消息中間件中有兩個角色:消息生產(chǎn)者和消息消費者。RocketMQ 里同樣有這兩個概念蔚鸥,消息生產(chǎn)者負責創(chuàng)建消息并發(fā)送到 RocketMQ 服務(wù)器惜论,RocketMQ 服務(wù)器會將消息持久化到磁盤,消息消費者從 RocketMQ 服務(wù)器拉取消息并提交給應(yīng)用消費止喷。
RocketMQ 特點
RocketMQ 是一款分布式馆类、隊列模型的消息中間件,具有以下特點:
- 支持嚴格的消息順序
- 支持 Topic 與 Queue 兩種模式
- 億級消息堆積能力
- 比較友好的分布式特性
- 同時支持 Push 與 Pull 方式消費消息
- 歷經(jīng)多次天貓雙十一海量消息考驗
RocketMQ 優(yōu)勢
目前主流的 MQ 主要是 RocketMQ弹谁、kafka乾巧、RabbitMQ,其主要優(yōu)勢有:
- 支持事務(wù)型消息(消息發(fā)送和 DB 操作保持兩方的最終一致性预愤,RabbitMQ 和 Kafka 不支持)
- 支持結(jié)合 RocketMQ 的多個系統(tǒng)之間數(shù)據(jù)最終一致性(多方事務(wù)沟于,二方事務(wù)是前提)
- 支持 18 個級別的延遲消息(RabbitMQ 和 Kafka 不支持)
- 支持指定次數(shù)和時間間隔的失敗消息重發(fā)(Kafka 不支持,RabbitMQ 需要手動確認)
- 支持 Consumer 端 Tag 過濾植康,減少不必要的網(wǎng)絡(luò)傳輸(RabbitMQ 和 Kafka 不支持)
- 支持重復(fù)消費(RabbitMQ 不支持旷太,Kafka 支持)
消息隊列對比參照表
基于 Docker 安裝 RocketMQ
docker-compose.yml
** 注意:啟動 RocketMQ Server + Broker + Console 至少需要 2G 內(nèi)存 **
version: '3.3'
services:
rmqnamesrv:
image: foxiswho/rocketmq:server
container_name: rmqnamesrv
ports:
- 9876:9876
volumes:
- ./data/logs:/opt/logs
- ./data/store:/opt/store
networks:
rmq:
aliases:
- rmqnamesrv
rmqbroker:
image: foxiswho/rocketmq:broker
container_name: rmqbroker
ports:
- 10909:10909
- 10911:10911
volumes:
- ./data/logs:/opt/logs
- ./data/store:/opt/store
- ./data/brokerconf/broker.conf:/etc/rocketmq/broker.conf
environment:
NAMESRV_ADDR: "rmqnamesrv:9876"
JAVA_OPTS: " -Duser.home=/opt"
JAVA_OPT_EXT: "-server -Xms128m -Xmx128m -Xmn128m"
command: mqbroker -c /etc/rocketmq/broker.conf
depends_on:
- rmqnamesrv
networks:
rmq:
aliases:
- rmqbroker
rmqconsole:
image: styletang/rocketmq-console-ng
container_name: rmqconsole
ports:
- 8080:8080
environment:
JAVA_OPTS: "-Drocketmq.namesrv.addr=rmqnamesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false"
depends_on:
- rmqnamesrv
networks:
rmq:
aliases:
- rmqconsole
networks:
rmq:
external:
name: my_net
broker.conf
RocketMQ Broker 需要一個配置文件,按照上面的 Compose 配置销睁,我們需要在./data/brokerconf/
目錄下創(chuàng)建一個名為broker.conf
的配置文件供璧,內(nèi)容如下:
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# 所屬集群名字
brokerClusterName=DefaultCluster
# broker 名字,注意此處不同的配置文件填寫的不一樣榄攀,如果在 broker-a.properties 使用: broker-a,
# 在 broker-b.properties 使用: broker-b
brokerName=broker-a
# 0 表示 Master嗜傅,> 0 表示 Slave
brokerId=0
# nameServer地址,分號分割
# namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
# 啟動IP,如果 docker 報 com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.0.120:10909> failed
# 解決方式1 加上一句 producer.setVipChannelEnabled(false);檩赢,解決方式2 brokerIP1 設(shè)置宿主機IP,不要使用docker 內(nèi)部IP
# brokerIP1=192.168.0.253
# 在發(fā)送消息時违寞,自動創(chuàng)建服務(wù)器不存在的topic贞瞒,默認創(chuàng)建的隊列數(shù)
defaultTopicQueueNums=4
# 是否允許 Broker 自動創(chuàng)建 Topic,建議線下開啟趁曼,線上關(guān)閉 >!挡闰!這里仔細看是 false乒融,false掰盘,false
autoCreateTopicEnable=true
# 是否允許 Broker 自動創(chuàng)建訂閱組,建議線下開啟赞季,線上關(guān)閉
autoCreateSubscriptionGroup=true
# Broker 對外服務(wù)的監(jiān)聽端口
listenPort=10911
# 刪除文件時間點愧捕,默認凌晨4點
deleteWhen=04
# 文件保留時間,默認48小時
fileReservedTime=120
# commitLog 每個文件的大小默認1G
mapedFileSizeCommitLog=1073741824
# ConsumeQueue 每個文件默認存 30W 條申钩,根據(jù)業(yè)務(wù)情況調(diào)整
mapedFileSizeConsumeQueue=300000
# destroyMapedFileIntervalForcibly=120000
# redeleteHangedFileInterval=120000
# 檢測物理文件磁盤空間
diskMaxUsedSpaceRatio=88
# 存儲路徑
# storePathRootDir=/home/ztztdata/rocketmq-all-4.1.0-incubating/store
# commitLog 存儲路徑
# storePathCommitLog=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/commitlog
# 消費隊列存儲
# storePathConsumeQueue=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/consumequeue
# 消息索引存儲路徑
# storePathIndex=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/index
# checkpoint 文件存儲路徑
# storeCheckpoint=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/checkpoint
# abort 文件存儲路徑
# abortFile=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/abort
# 限制的消息大小
maxMessageSize=65536
# flushCommitLogLeastPages=4
# flushConsumeQueueLeastPages=2
# flushCommitLogThoroughInterval=10000
# flushConsumeQueueThoroughInterval=60000
# Broker 的角色
# - ASYNC_MASTER 異步復(fù)制Master
# - SYNC_MASTER 同步雙寫Master
# - SLAVE
brokerRole=ASYNC_MASTER
# 刷盤方式
# - ASYNC_FLUSH 異步刷盤
# - SYNC_FLUSH 同步刷盤
flushDiskType=ASYNC_FLUSH
# 發(fā)消息線程池數(shù)量
# sendMessageThreadPoolNums=128
# 拉消息線程池數(shù)量
# pullMessageThreadPoolNums=128
RocketMQ 控制臺
訪問 http://rmqIP:8080 登入控制臺
RocketMQ的架構(gòu)
Nameserver
Nameserver的開發(fā)旨在輕量級次绘,多臺Nameserver互相獨立,彼此間互不通信撒遣,這樣的設(shè)計邮偎,保證了單臺Nameserver宕機,不影響Nameserver义黎。nameserver不會有頻繁的讀寫禾进,所以性能開銷非常小,穩(wěn)定性很高廉涕。
- NameServer用來保存活躍的broker列表泻云,包括Master和Slave。
- NameServer用來保存所有topic和該topic所有隊列的列表火的。
- NameServer用來保存所有broker的Filter列表壶愤。