來(lái)源https://github.com/apache/rocketmq/blob/master/docs/cn/best_practice.md
推薦小白看看,看完了后感覺(jué)受益良多,更熟悉相關(guān)使用的配置
1 生產(chǎn)者
1.1 發(fā)送消息注意事項(xiàng)
1 Tags的使用
一個(gè)應(yīng)用盡可能用一個(gè)Topic,而消息子類型則可以用tags來(lái)標(biāo)識(shí)竖瘾。tags可以由應(yīng)用自由設(shè)置寂玲,只有生產(chǎn)者在發(fā)送消息設(shè)置了tags悄但,消費(fèi)方在訂閱消息時(shí)才可以利用tags通過(guò)broker做消息過(guò)濾:message.setTags("TagA")。
2 Keys的使用
每個(gè)消息在業(yè)務(wù)層面的唯一標(biāo)識(shí)碼要設(shè)置到keys字段伞梯,方便將來(lái)定位消息丟失問(wèn)題奶稠。服務(wù)器會(huì)為每個(gè)消息創(chuàng)建索引(哈希索引)酝蜒,應(yīng)用可以通過(guò)topic卦停、key來(lái)查詢這條消息內(nèi)容向胡,以及消息被誰(shuí)消費(fèi)恼蓬。由于是哈希索引惊完,請(qǐng)務(wù)必保證key盡可能唯一,這樣可以避免潛在的哈希沖突处硬。
// 訂單Id
String orderId = "20034568923546";
message.setKeys(orderId);
3 日志的打印
消息發(fā)送成功或者失敗要打印消息日志小槐,務(wù)必要打印SendResult和key字段。send消息方法只要不拋異常荷辕,就代表發(fā)送成功凿跳。發(fā)送成功會(huì)有多個(gè)狀態(tài),在sendResult里定義疮方。以下對(duì)每個(gè)狀態(tài)進(jìn)行說(shuō)明:
- SEND_OK
消息發(fā)送成功控嗜。要注意的是消息發(fā)送成功也不意味著它是可靠的。要確保不會(huì)丟失任何消息骡显,還應(yīng)啟用同步Master服務(wù)器或同步刷盤(pán)疆栏,即SYNC_MASTER或SYNC_FLUSH曾掂。
- FLUSH_DISK_TIMEOUT
消息發(fā)送成功但是服務(wù)器刷盤(pán)超時(shí)。此時(shí)消息已經(jīng)進(jìn)入服務(wù)器隊(duì)列(內(nèi)存)壁顶,只有服務(wù)器宕機(jī)珠洗,消息才會(huì)丟失。消息存儲(chǔ)配置參數(shù)中可以設(shè)置刷盤(pán)方式和同步刷盤(pán)時(shí)間長(zhǎng)度若专,如果Broker服務(wù)器設(shè)置了刷盤(pán)方式為同步刷盤(pán)许蓖,即FlushDiskType=SYNC_FLUSH(默認(rèn)為異步刷盤(pán)方式),當(dāng)Broker服務(wù)器未在同步刷盤(pán)時(shí)間內(nèi)(默認(rèn)為5s)完成刷盤(pán)调衰,則將返回該狀態(tài)——刷盤(pán)超時(shí)膊爪。
- FLUSH_SLAVE_TIMEOUT
消息發(fā)送成功,但是服務(wù)器同步到Slave時(shí)超時(shí)嚎莉。此時(shí)消息已經(jīng)進(jìn)入服務(wù)器隊(duì)列蚁飒,只有服務(wù)器宕機(jī),消息才會(huì)丟失萝喘。如果Broker服務(wù)器的角色是同步Master淮逻,即SYNC_MASTER(默認(rèn)是異步Master即ASYNC_MASTER),并且從Broker服務(wù)器未在同步刷盤(pán)時(shí)間(默認(rèn)為5秒)內(nèi)完成與主服務(wù)器的同步阁簸,則將返回該狀態(tài)——數(shù)據(jù)同步到Slave服務(wù)器超時(shí)爬早。
- SLAVE_NOT_AVAILABLE
消息發(fā)送成功,但是此時(shí)Slave不可用启妹。如果Broker服務(wù)器的角色是同步Master筛严,即SYNC_MASTER(默認(rèn)是異步Master服務(wù)器即ASYNC_MASTER),但沒(méi)有配置slave Broker服務(wù)器饶米,則將返回該狀態(tài)——無(wú)Slave服務(wù)器可用桨啃。
1.2 消息發(fā)送失敗處理方式
Producer的send方法本身支持內(nèi)部重試,重試邏輯如下:
- 至多重試2次檬输。
- 如果同步模式發(fā)送失敗照瘾,則輪轉(zhuǎn)到下一個(gè)Broker,如果異步模式發(fā)送失敗丧慈,則只會(huì)在當(dāng)前Broker進(jìn)行重試析命。這個(gè)方法的總耗時(shí)時(shí)間不超過(guò)sendMsgTimeout設(shè)置的值,默認(rèn)10s逃默。
- 如果本身向broker發(fā)送消息產(chǎn)生超時(shí)異常弄抬,就不會(huì)再重試咧欣。
以上策略也是在一定程度上保證了消息可以發(fā)送成功黍匾。如果業(yè)務(wù)對(duì)消息可靠性要求比較高旭贬,建議應(yīng)用增加相應(yīng)的重試邏輯:比如調(diào)用send同步方法發(fā)送失敗時(shí),則嘗試將消息存儲(chǔ)到db吟税,然后由后臺(tái)線程定時(shí)重試凹耙,確保消息一定到達(dá)Broker鸟蟹。
上述db重試方式為什么沒(méi)有集成到MQ客戶端內(nèi)部做,而是要求應(yīng)用自己去完成使兔,主要基于以下幾點(diǎn)考慮:首先建钥,MQ的客戶端設(shè)計(jì)為無(wú)狀態(tài)模式,方便任意的水平擴(kuò)展虐沥,且對(duì)機(jī)器資源的消耗僅僅是cpu熊经、內(nèi)存、網(wǎng)絡(luò)欲险。其次镐依,如果MQ客戶端內(nèi)部集成一個(gè)KV存儲(chǔ)模塊,那么數(shù)據(jù)只有同步落盤(pán)才能較可靠天试,而同步落盤(pán)本身性能開(kāi)銷(xiāo)較大槐壳,所以通常會(huì)采用異步落盤(pán),又由于應(yīng)用關(guān)閉過(guò)程不受MQ運(yùn)維人員控制喜每,可能經(jīng)常會(huì)發(fā)生 kill -9 這樣暴力方式關(guān)閉务唐,造成數(shù)據(jù)沒(méi)有及時(shí)落盤(pán)而丟失。第三带兜,Producer所在機(jī)器的可靠性較低枫笛,一般為虛擬機(jī),不適合存儲(chǔ)重要數(shù)據(jù)刚照。綜上刑巧,建議重試過(guò)程交由應(yīng)用來(lái)控制。
1.3選擇oneway形式發(fā)送
通常消息的發(fā)送是這樣一個(gè)過(guò)程:
- 客戶端發(fā)送請(qǐng)求到服務(wù)器
- 服務(wù)器處理請(qǐng)求
- 服務(wù)器向客戶端返回應(yīng)答
所以无畔,一次消息發(fā)送的耗時(shí)時(shí)間是上述三個(gè)步驟的總和啊楚,而某些場(chǎng)景要求耗時(shí)非常短,但是對(duì)可靠性要求并不高浑彰,例如日志收集類應(yīng)用恭理,此類應(yīng)用可以采用oneway形式調(diào)用,oneway形式只發(fā)送請(qǐng)求不等待應(yīng)答闸昨,而發(fā)送請(qǐng)求在客戶端實(shí)現(xiàn)層面僅僅是一個(gè)操作系統(tǒng)系統(tǒng)調(diào)用的開(kāi)銷(xiāo)蚯斯,即將數(shù)據(jù)寫(xiě)入客戶端的socket緩沖區(qū),此過(guò)程耗時(shí)通常在微秒級(jí)饵较。
2 消費(fèi)者
2.1 消費(fèi)過(guò)程冪等
RocketMQ無(wú)法避免消息重復(fù)(Exactly-Once),所以如果業(yè)務(wù)對(duì)消費(fèi)重復(fù)非常敏感遭赂,務(wù)必要在業(yè)務(wù)層面進(jìn)行去重處理循诉。可以借助關(guān)系數(shù)據(jù)庫(kù)進(jìn)行去重撇他。首先需要確定消息的唯一鍵茄猫,可以是msgId狈蚤,也可以是消息內(nèi)容中的唯一標(biāo)識(shí)字段,例如訂單Id等划纽。在消費(fèi)之前判斷唯一鍵是否在關(guān)系數(shù)據(jù)庫(kù)中存在脆侮。如果不存在則插入,并消費(fèi)勇劣,否則跳過(guò)靖避。(實(shí)際過(guò)程要考慮原子性問(wèn)題,判斷是否存在可以嘗試插入比默,如果報(bào)主鍵沖突幻捏,則插入失敗,直接跳過(guò))
msgId一定是全局唯一標(biāo)識(shí)符命咐,但是實(shí)際使用中篡九,可能會(huì)存在相同的消息有兩個(gè)不同msgId的情況(消費(fèi)者主動(dòng)重發(fā)、因客戶端重投機(jī)制導(dǎo)致的重復(fù)等)醋奠,這種情況就需要使業(yè)務(wù)字段進(jìn)行重復(fù)消費(fèi)榛臼。
2.2 消費(fèi)速度慢的處理方式
1 提高消費(fèi)并行度
絕大部分消息消費(fèi)行為都屬于 IO 密集型,即可能是操作數(shù)據(jù)庫(kù)窜司,或者調(diào)用 RPC讽坏,這類消費(fèi)行為的消費(fèi)速度在于后端數(shù)據(jù)庫(kù)或者外系統(tǒng)的吞吐量,通過(guò)增加消費(fèi)并行度例证,可以提高總的消費(fèi)吞吐量路呜,但是并行度增加到一定程度,反而會(huì)下降织咧。所以胀葱,應(yīng)用必須要設(shè)置合理的并行度。 如下有幾種修改消費(fèi)并行度的方法:
- 同一個(gè) ConsumerGroup 下笙蒙,通過(guò)增加 Consumer 實(shí)例數(shù)量來(lái)提高并行度(需要注意的是超過(guò)訂閱隊(duì)列數(shù)的 Consumer 實(shí)例無(wú)效)抵屿。可以通過(guò)加機(jī)器捅位,或者在已有機(jī)器啟動(dòng)多個(gè)進(jìn)程的方式轧葛。
- 提高單個(gè) Consumer 的消費(fèi)并行線程,通過(guò)修改參數(shù) consumeThreadMin艇搀、consumeThreadMax實(shí)現(xiàn)尿扯。
2 批量方式消費(fèi)
某些業(yè)務(wù)流程如果支持批量方式消費(fèi),則可以很大程度上提高消費(fèi)吞吐量焰雕,例如訂單扣款類應(yīng)用衷笋,一次處理一個(gè)訂單耗時(shí) 1 s,一次處理 10 個(gè)訂單可能也只耗時(shí) 2 s矩屁,這樣即可大幅度提高消費(fèi)的吞吐量辟宗,通過(guò)設(shè)置 consumer的 consumeMessageBatchMaxSize 返個(gè)參數(shù)爵赵,默認(rèn)是 1,即一次只消費(fèi)一條消息泊脐,例如設(shè)置為 N空幻,那么每次消費(fèi)的消息數(shù)小于等于 N。
3 跳過(guò)非重要消息
發(fā)生消息堆積時(shí)容客,如果消費(fèi)速度一直追不上發(fā)送速度秕铛,如果業(yè)務(wù)對(duì)數(shù)據(jù)要求不高的話,可以選擇丟棄不重要的消息耘柱。例如如捅,當(dāng)某個(gè)隊(duì)列的消息數(shù)堆積到100000條以上,則嘗試丟棄部分或全部消息调煎,這樣就可以快速追上發(fā)送消息的速度镜遣。示例代碼如下:
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
long offset = msgs.get(0).getQueueOffset();
String maxOffset =
msgs.get(0).getProperty(Message.PROPERTY_MAX_OFFSET);
long diff = Long.parseLong(maxOffset) - offset;
if (diff > 100000) {
// TODO 消息堆積情況的特殊處理
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
// TODO 正常消費(fèi)過(guò)程
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
4 優(yōu)化每條消息消費(fèi)過(guò)程
舉例如下,某條消息的消費(fèi)過(guò)程如下:
- 根據(jù)消息從 DB 查詢【數(shù)據(jù) 1】
- 根據(jù)消息從 DB 查詢【數(shù)據(jù) 2】
- 復(fù)雜的業(yè)務(wù)計(jì)算
- 向 DB 插入【數(shù)據(jù) 3】
- 向 DB 插入【數(shù)據(jù) 4】
這條消息的消費(fèi)過(guò)程中有4次與 DB的 交互士袄,如果按照每次 5ms 計(jì)算悲关,那么總共耗時(shí) 20ms,假設(shè)業(yè)務(wù)計(jì)算耗時(shí) 5ms娄柳,那么總過(guò)耗時(shí) 25ms寓辱,所以如果能把 4 次 DB 交互優(yōu)化為 2 次,那么總耗時(shí)就可以優(yōu)化到 15ms赤拒,即總體性能提高了 40%秫筏。所以應(yīng)用如果對(duì)時(shí)延敏感的話,可以把DB部署在SSD硬盤(pán)挎挖,相比于SCSI磁盤(pán)这敬,前者的RT會(huì)小很多。
2.3 消費(fèi)打印日志
如果消息量較少蕉朵,建議在消費(fèi)入口方法打印消息崔涂,消費(fèi)耗時(shí)等,方便后續(xù)排查問(wèn)題始衅。
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
log.info("RECEIVE_MSG_BEGIN: " + msgs.toString());
// TODO 正常消費(fèi)過(guò)程
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
如果能打印每條消息消費(fèi)耗時(shí)冷蚂,那么在排查消費(fèi)慢等線上問(wèn)題時(shí),會(huì)更方便汛闸。
2.4 其他消費(fèi)建議
1 關(guān)于消費(fèi)者和訂閱
第一件需要注意的事情是蝙茶,不同的消費(fèi)者組可以獨(dú)立的消費(fèi)一些 topic,并且每個(gè)消費(fèi)者組都有自己的消費(fèi)偏移量蛉拙,請(qǐng)確保同一組內(nèi)的每個(gè)消費(fèi)者訂閱信息保持一致尸闸。
2 關(guān)于有序消息
消費(fèi)者將鎖定每個(gè)消息隊(duì)列,以確保他們被逐個(gè)消費(fèi)孕锄,雖然這將會(huì)導(dǎo)致性能下降吮廉,但是當(dāng)你關(guān)心消息順序的時(shí)候會(huì)很有用。我們不建議拋出異常畸肆,你可以返回 ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT 作為替代宦芦。
3 關(guān)于并發(fā)消費(fèi)
顧名思義,消費(fèi)者將并發(fā)消費(fèi)這些消息轴脐,建議你使用它來(lái)獲得良好性能调卑,我們不建議拋出異常,你可以返回 ConsumeConcurrentlyStatus.RECONSUME_LATER 作為替代大咱。
4 關(guān)于消費(fèi)狀態(tài)Consume Status
對(duì)于并發(fā)的消費(fèi)監(jiān)聽(tīng)器恬涧,你可以返回 RECONSUME_LATER 來(lái)通知消費(fèi)者現(xiàn)在不能消費(fèi)這條消息,并且希望可以稍后重新消費(fèi)它碴巾。然后溯捆,你可以繼續(xù)消費(fèi)其他消息。對(duì)于有序的消息監(jiān)聽(tīng)器厦瓢,因?yàn)槟汴P(guān)心它的順序提揍,所以不能跳過(guò)消息,但是你可以返回SUSPEND_CURRENT_QUEUE_A_MOMENT 告訴消費(fèi)者等待片刻煮仇。
5 關(guān)于Blocking
不建議阻塞監(jiān)聽(tīng)器劳跃,因?yàn)樗鼤?huì)阻塞線程池,并最終可能會(huì)終止消費(fèi)進(jìn)程
6 關(guān)于線程數(shù)設(shè)置
消費(fèi)者使用 ThreadPoolExecutor 在內(nèi)部對(duì)消息進(jìn)行消費(fèi)浙垫,所以你可以通過(guò)設(shè)置 setConsumeThreadMin 或 setConsumeThreadMax 來(lái)改變它刨仑。
7 關(guān)于消費(fèi)位點(diǎn)
當(dāng)建立一個(gè)新的消費(fèi)者組時(shí),需要決定是否需要消費(fèi)已經(jīng)存在于 Broker 中的歷史消息CONSUME_FROM_LAST_OFFSET 將會(huì)忽略歷史消息夹姥,并消費(fèi)之后生成的任何消息杉武。CONSUME_FROM_FIRST_OFFSET 將會(huì)消費(fèi)每個(gè)存在于 Broker 中的信息。你也可以使用 CONSUME_FROM_TIMESTAMP 來(lái)消費(fèi)在指定時(shí)間戳后產(chǎn)生的消息佃声。
3 Broker
3.1 Broker 角色
Broker 角色分為 ASYNC_MASTER(異步主機(jī))圾亏、SYNC_MASTER(同步主機(jī))以及SLAVE(從機(jī))缰趋。如果對(duì)消息的可靠性要求比較嚴(yán)格仔涩,可以采用 SYNC_MASTER加SLAVE的部署方式霞揉。如果對(duì)消息可靠性要求不高,可以采用ASYNC_MASTER加SLAVE的部署方式胁住。如果只是測(cè)試方便跷坝,則可以選擇僅ASYNC_MASTER或僅SYNC_MASTER的部署方式淮韭。
3.2 FlushDiskType
SYNC_FLUSH(同步刷新)相比于ASYNC_FLUSH(異步處理)會(huì)損失很多性能,但是也更可靠贴届,所以需要根據(jù)實(shí)際的業(yè)務(wù)場(chǎng)景做好權(quán)衡靠粪。
3.3 Broker 配置
參數(shù)名 | 默認(rèn)值 | 說(shuō)明 |
---|---|---|
listenPort | 10911 | 接受客戶端連接的監(jiān)聽(tīng)端口 |
namesrvAddr | null | nameServer 地址 |
brokerIP1 | 網(wǎng)卡的 InetAddress | 當(dāng)前 broker 監(jiān)聽(tīng)的 IP |
brokerIP2 | 跟 brokerIP1 一樣 | 存在主從 broker 時(shí)蜡吧,如果在 broker 主節(jié)點(diǎn)上配置了 brokerIP2 屬性,broker 從節(jié)點(diǎn)會(huì)連接主節(jié)點(diǎn)配置的 brokerIP2 進(jìn)行同步 |
brokerName | null | broker 的名稱 |
brokerClusterName | DefaultCluster | 本 broker 所屬的 Cluser 名稱 |
brokerId | 0 | broker id, 0 表示 master, 其他的正整數(shù)表示 slave |
storePathCommitLog | $HOME/store/commitlog/ | 存儲(chǔ) commit log 的路徑 |
storePathConsumerQueue | $HOME/store/consumequeue/ | 存儲(chǔ) consume queue 的路徑 |
mappedFileSizeCommitLog | 1024 * 1024 * 1024(1G) | commit log 的映射文件大小 |
deleteWhen | 04 | 在每天的什么時(shí)間刪除已經(jīng)超過(guò)文件保留時(shí)間的 commit log |
fileReservedTime | 72 | 以小時(shí)計(jì)算的文件保留時(shí)間 |
brokerRole | ASYNC_MASTER | SYNC_MASTER/ASYNC_MASTER/SLAVE |
flushDiskType | ASYNC_FLUSH | SYNC_FLUSH/ASYNC_FLUSH SYNC_FLUSH 模式下的 broker 保證在收到確認(rèn)生產(chǎn)者之前將消息刷盤(pán)占键。ASYNC_FLUSH 模式下的 broker 則利用刷盤(pán)一組消息的模式昔善,可以取得更好的性能。 |
4 NameServer
RocketMQ 中捞慌,Name Servers 被設(shè)計(jì)用來(lái)做簡(jiǎn)單的路由管理耀鸦。其職責(zé)包括:
- Brokers 定期向每個(gè)名稱服務(wù)器注冊(cè)路由數(shù)據(jù)柬批。
- 名稱服務(wù)器為客戶端啸澡,包括生產(chǎn)者,消費(fèi)者和命令行客戶端提供最新的路由信息氮帐。
5 客戶端配置
相對(duì)于RocketMQ的Broker集群嗅虏,生產(chǎn)者和消費(fèi)者都是客戶端。本小節(jié)主要描述生產(chǎn)者和消費(fèi)者公共的行為配置上沐。
5.1 客戶端尋址方式
RocketMQ可以令客戶端找到Name Server, 然后通過(guò)Name Server再找到Broker皮服。如下所示有多種配置方式,優(yōu)先級(jí)由高到低参咙,高優(yōu)先級(jí)會(huì)覆蓋低優(yōu)先級(jí)龄广。
- 代碼中指定Name Server地址,多個(gè)namesrv地址之間用分號(hào)分割
producer.setNamesrvAddr("192.168.0.1:9876;192.168.0.2:9876");
consumer.setNamesrvAddr("192.168.0.1:9876;192.168.0.2:9876");
- Java啟動(dòng)參數(shù)中指定Name Server地址
-Drocketmq.namesrv.addr=192.168.0.1:9876;192.168.0.2:9876
- 環(huán)境變量指定Name Server地址
export NAMESRV_ADDR=192.168.0.1:9876;192.168.0.2:9876
- HTTP靜態(tài)服務(wù)器尋址(默認(rèn))
客戶端啟動(dòng)后蕴侧,會(huì)定時(shí)訪問(wèn)一個(gè)靜態(tài)HTTP服務(wù)器择同,地址如下:http://jmenv.tbsite.net:8080/rocketmq/nsaddr,這個(gè)URL的返回內(nèi)容如下:
192.168.0.1:9876;192.168.0.2:9876
客戶端默認(rèn)每隔2分鐘訪問(wèn)一次這個(gè)HTTP服務(wù)器净宵,并更新本地的Name Server地址敲才。URL已經(jīng)在代碼中硬編碼,可通過(guò)修改/etc/hosts文件來(lái)改變要訪問(wèn)的服務(wù)器择葡,例如在/etc/hosts增加如下配置:
10.232.22.67 jmenv.taobao.net
推薦使用HTTP靜態(tài)服務(wù)器尋址方式紧武,好處是客戶端部署簡(jiǎn)單,且Name Server集群可以熱升級(jí)敏储。
5.2 客戶端配置
DefaultMQProducer阻星、TransactionMQProducer、DefaultMQPushConsumer已添、DefaultMQPullConsumer都繼承于ClientConfig類妥箕,ClientConfig為客戶端的公共配置類≡吞迹客戶端的配置都是get矾踱、set形式,每個(gè)參數(shù)都可以用spring來(lái)配置疏哗,也可以在代碼中配置呛讲,例如namesrvAddr這個(gè)參數(shù)可以這樣配置,producer.setNamesrvAddr("192.168.0.1:9876"),其他參數(shù)同理贝搁。
1 客戶端的公共配置
參數(shù)名 | 默認(rèn)值 | 說(shuō)明 |
---|---|---|
namesrvAddr | Name Server地址列表吗氏,多個(gè)NameServer地址用分號(hào)隔開(kāi) | |
clientIP | 本機(jī)IP | 客戶端本機(jī)IP地址,某些機(jī)器會(huì)發(fā)生無(wú)法識(shí)別客戶端IP地址情況雷逆,需要應(yīng)用在代碼中強(qiáng)制指定 |
instanceName | DEFAULT | 客戶端實(shí)例名稱弦讽,客戶端創(chuàng)建的多個(gè)Producer、Consumer實(shí)際是共用一個(gè)內(nèi)部實(shí)例(這個(gè)實(shí)例包含網(wǎng)絡(luò)連接膀哲、線程資源等) |
clientCallbackExecutorThreads | 4 | 通信層異步回調(diào)線程數(shù) |
pollNameServerInteval | 30000 | 輪詢Name Server間隔時(shí)間往产,單位毫秒 |
heartbeatBrokerInterval | 30000 | 向Broker發(fā)送心跳間隔時(shí)間,單位毫秒 |
persistConsumerOffsetInterval | 5000 | 持久化Consumer消費(fèi)進(jìn)度間隔時(shí)間某宪,單位毫秒 |
2 Producer配置
參數(shù)名 | 默認(rèn)值 | 說(shuō)明 |
---|---|---|
producerGroup | DEFAULT_PRODUCER | Producer組名仿村,多個(gè)Producer如果屬于一個(gè)應(yīng)用,發(fā)送同樣的消息兴喂,則應(yīng)該將它們歸為同一組 |
createTopicKey | TBW102 | 在發(fā)送消息時(shí)蔼囊,自動(dòng)創(chuàng)建服務(wù)器不存在的topic,需要指定Key衣迷,該Key可用于配置發(fā)送消息所在topic的默認(rèn)路由畏鼓。 |
defaultTopicQueueNums | 4 | 在發(fā)送消息,自動(dòng)創(chuàng)建服務(wù)器不存在的topic時(shí)壶谒,默認(rèn)創(chuàng)建的隊(duì)列數(shù) |
sendMsgTimeout | 10000 | 發(fā)送消息超時(shí)時(shí)間云矫,單位毫秒 |
compressMsgBodyOverHowmuch | 4096 | 消息Body超過(guò)多大開(kāi)始?jí)嚎s(Consumer收到消息會(huì)自動(dòng)解壓縮),單位字節(jié) |
retryAnotherBrokerWhenNotStoreOK | FALSE | 如果發(fā)送消息返回sendResult佃迄,但是sendStatus!=SEND_OK泼差,是否重試發(fā)送 |
retryTimesWhenSendFailed | 2 | 如果消息發(fā)送失敗,最大重試次數(shù)呵俏,該參數(shù)只對(duì)同步發(fā)送模式起作用 |
maxMessageSize | 4MB | 客戶端限制的消息大小堆缘,超過(guò)報(bào)錯(cuò),同時(shí)服務(wù)端也會(huì)限制普碎,所以需要跟服務(wù)端配合使用吼肥。 |
transactionCheckListener | 事務(wù)消息回查監(jiān)聽(tīng)器,如果發(fā)送事務(wù)消息麻车,必須設(shè)置 | |
checkThreadPoolMinSize | 1 | Broker回查Producer事務(wù)狀態(tài)時(shí)缀皱,線程池最小線程數(shù) |
checkThreadPoolMaxSize | 1 | Broker回查Producer事務(wù)狀態(tài)時(shí),線程池最大線程數(shù) |
checkRequestHoldMax | 2000 | Broker回查Producer事務(wù)狀態(tài)時(shí)动猬,Producer本地緩沖請(qǐng)求隊(duì)列大小 |
RPCHook | null | 該參數(shù)是在Producer創(chuàng)建時(shí)傳入的啤斗,包含消息發(fā)送前的預(yù)處理和消息響應(yīng)后的處理兩個(gè)接口,用戶可以在第一個(gè)接口中做一些安全控制或者其他操作赁咙。 |
3 PushConsumer配置
參數(shù)名 | 默認(rèn)值 | 說(shuō)明 |
---|---|---|
consumerGroup | DEFAULT_CONSUMER | Consumer組名钮莲,多個(gè)Consumer如果屬于一個(gè)應(yīng)用免钻,訂閱同樣的消息,且消費(fèi)邏輯一致崔拥,則應(yīng)該將它們歸為同一組 |
messageModel | CLUSTERING | 消費(fèi)模型支持集群消費(fèi)和廣播消費(fèi)兩種 |
consumeFromWhere | CONSUME_FROM_LAST_OFFSET | Consumer啟動(dòng)后极舔,默認(rèn)從上次消費(fèi)的位置開(kāi)始消費(fèi),這包含兩種情況:一種是上次消費(fèi)的位置未過(guò)期链瓦,則消費(fèi)從上次中止的位置進(jìn)行拆魏;一種是上次消費(fèi)位置已經(jīng)過(guò)期,則從當(dāng)前隊(duì)列第一條消息開(kāi)始消費(fèi) |
consumeTimestamp | 半個(gè)小時(shí)前 | 只有當(dāng)consumeFromWhere值為CONSUME_FROM_TIMESTAMP時(shí)才起作用慈俯。 |
allocateMessageQueueStrategy | AllocateMessageQueueAveragely | Rebalance算法實(shí)現(xiàn)策略 |
subscription | 訂閱關(guān)系 | |
messageListener | 消息監(jiān)聽(tīng)器 | |
offsetStore | 消費(fèi)進(jìn)度存儲(chǔ) | |
consumeThreadMin | 10 | 消費(fèi)線程池最小線程數(shù) |
consumeThreadMax | 20 | 消費(fèi)線程池最大線程數(shù) |
consumeConcurrentlyMaxSpan | 2000 | 單隊(duì)列并行消費(fèi)允許的最大跨度 |
pullThresholdForQueue | 1000 | 拉消息本地隊(duì)列緩存消息最大數(shù) |
pullInterval | 0 | 拉消息間隔渤刃,由于是長(zhǎng)輪詢,所以為0肥卡,但是如果應(yīng)用為了流控溪掀,也可以設(shè)置大于0的值,單位毫秒 |
consumeMessageBatchMaxSize | 1 | 批量消費(fèi)步鉴,一次消費(fèi)多少條消息 |
pullBatchSize | 32 | 批量拉消息,一次最多拉多少條 |
4 PullConsumer配置
參數(shù)名 | 默認(rèn)值 | 說(shuō)明 |
---|---|---|
consumerGroup | DEFAULT_CONSUMER | Consumer組名璃哟,多個(gè)Consumer如果屬于一個(gè)應(yīng)用氛琢,訂閱同樣的消息,且消費(fèi)邏輯一致随闪,則應(yīng)該將它們歸為同一組 |
brokerSuspendMaxTimeMillis | 20000 | 長(zhǎng)輪詢阳似,Consumer拉消息請(qǐng)求在Broker掛起最長(zhǎng)時(shí)間,單位毫秒 |
consumerTimeoutMillisWhenSuspend | 30000 | 長(zhǎng)輪詢铐伴,Consumer拉消息請(qǐng)求在Broker掛起超過(guò)指定時(shí)間撮奏,客戶端認(rèn)為超時(shí),單位毫秒 |
consumerPullTimeoutMillis | 10000 | 非長(zhǎng)輪詢当宴,拉消息超時(shí)時(shí)間畜吊,單位毫秒 |
messageModel | BROADCASTING | 消息支持兩種模式:集群消費(fèi)和廣播消費(fèi) |
messageQueueListener | 監(jiān)聽(tīng)隊(duì)列變化 | |
offsetStore | 消費(fèi)進(jìn)度存儲(chǔ) | |
registerTopics | 注冊(cè)的topic集合 | |
allocateMessageQueueStrategy | AllocateMessageQueueAveragely | Rebalance算法實(shí)現(xiàn)策略 |
5 Message數(shù)據(jù)結(jié)構(gòu)
字段名 | 默認(rèn)值 | 說(shuō)明 |
---|---|---|
Topic | null | 必填,消息所屬topic的名稱 |
Body | null | 必填户矢,消息體 |
Tags | null | 選填玲献,消息標(biāo)簽,方便服務(wù)器過(guò)濾使用梯浪。目前只支持每個(gè)消息設(shè)置一個(gè)tag |
Keys | null | 選填捌年,代表這條消息的業(yè)務(wù)關(guān)鍵詞,服務(wù)器會(huì)根據(jù)keys創(chuàng)建哈希索引挂洛,設(shè)置后叼屠,可以在Console系統(tǒng)根據(jù)Topic驼抹、Keys來(lái)查詢消息,由于是哈希索引史辙,請(qǐng)盡可能保證key唯一,例如訂單號(hào)瞳秽,商品Id等。 |
Flag | 0 | 選填,完全由應(yīng)用來(lái)設(shè)置哈肖,RocketMQ不做干預(yù) |
DelayTimeLevel | 0 | 選填,消息延時(shí)級(jí)別念秧,0表示不延時(shí)淤井,大于0會(huì)延時(shí)特定的時(shí)間才會(huì)被消費(fèi) |
WaitStoreMsgOK | TRUE | 選填,表示消息是否在服務(wù)器落盤(pán)后才返回應(yīng)答摊趾。 |
6 系統(tǒng)配置
本小節(jié)主要介紹系統(tǒng)(JVM/OS)相關(guān)的配置币狠。
6.1 JVM選項(xiàng)
推薦使用最新發(fā)布的JDK 1.8版本。通過(guò)設(shè)置相同的Xms和Xmx值來(lái)防止JVM調(diào)整堆大小以獲得更好的性能砾层。簡(jiǎn)單的JVM配置如下所示:
-server -Xms8g -Xmx8g -Xmn4g
如果您不關(guān)心RocketMQ Broker的啟動(dòng)時(shí)間漩绵,還有一種更好的選擇,就是通過(guò)“預(yù)觸摸”Java堆以確保在JVM初始化期間每個(gè)頁(yè)面都將被分配肛炮。那些不關(guān)心啟動(dòng)時(shí)間的人可以啟用它: -XX:+AlwaysPreTouch
禁用偏置鎖定可能會(huì)減少JVM暫停止吐, -XX:-UseBiasedLocking
至于垃圾回收,建議使用帶JDK 1.8的G1收集器侨糟。
-XX:+UseG1GC -XX:G1HeapRegionSize=16m
-XX:G1ReservePercent=25
-XX:InitiatingHeapOccupancyPercent=30
這些GC選項(xiàng)看起來(lái)有點(diǎn)激進(jìn)碍扔,但事實(shí)證明它在我們的生產(chǎn)環(huán)境中具有良好的性能。另外不要把-XX:MaxGCPauseMillis的值設(shè)置太小秕重,否則JVM將使用一個(gè)小的年輕代來(lái)實(shí)現(xiàn)這個(gè)目標(biāo)不同,這將導(dǎo)致非常頻繁的minor GC,所以建議使用rolling GC日志文件:
-XX:+UseGCLogFileRotation
-XX:NumberOfGCLogFiles=5
-XX:GCLogFileSize=30m
如果寫(xiě)入GC文件會(huì)增加代理的延遲溶耘,可以考慮將GC日志文件重定向到內(nèi)存文件系統(tǒng):
-Xloggc:/dev/shm/mq_gc_%p.log123
6.2 Linux內(nèi)核參數(shù)
os.sh腳本在bin文件夾中列出了許多內(nèi)核參數(shù)二拐,可以進(jìn)行微小的更改然后用于生產(chǎn)用途。下面的參數(shù)需要注意凳兵,更多細(xì)節(jié)請(qǐng)參考/proc/sys/vm/*的文檔
- vm.extra_free_kbytes百新,告訴VM在后臺(tái)回收(kswapd)啟動(dòng)的閾值與直接回收(通過(guò)分配進(jìn)程)的閾值之間保留額外的可用內(nèi)存。RocketMQ使用此參數(shù)來(lái)避免內(nèi)存分配中的長(zhǎng)延遲留荔。(與具體內(nèi)核版本相關(guān))
- vm.min_free_kbytes吟孙,如果將其設(shè)置為低于1024KB,將會(huì)巧妙的將系統(tǒng)破壞聚蝶,并且系統(tǒng)在高負(fù)載下容易出現(xiàn)死鎖杰妓。
- vm.max_map_count,限制一個(gè)進(jìn)程可能具有的最大內(nèi)存映射區(qū)域數(shù)碘勉。RocketMQ將使用mmap加載CommitLog和ConsumeQueue巷挥,因此建議將為此參數(shù)設(shè)置較大的值。(agressiveness --> aggressiveness)
- vm.swappiness验靡,定義內(nèi)核交換內(nèi)存頁(yè)面的積極程度倍宾。較高的值會(huì)增加攻擊性雏节,較低的值會(huì)減少交換量。建議將值設(shè)置為10來(lái)避免交換延遲高职。
- File descriptor limits钩乍,RocketMQ需要為文件(CommitLog和ConsumeQueue)和網(wǎng)絡(luò)連接打開(kāi)文件描述符。我們建議設(shè)置文件描述符的值為655350怔锌。
- Disk scheduler寥粹,RocketMQ建議使用I/O截止時(shí)間調(diào)度器,它試圖為請(qǐng)求提供有保證的延遲埃元。