1 Producer
- 一個(gè)應(yīng)用盡可能用一個(gè)Topic,消息子類(lèi)型用tags來(lái)標(biāo)識(shí)贮勃,tags可以由應(yīng)用自由設(shè)置
只有發(fā)送消息設(shè)置了tags,消費(fèi)方在訂閱消息時(shí)医男,才可以利用tags在broker做消息過(guò)濾
message.setTags("TagA");
如有可靠性需要养渴,消息發(fā)送成功或者失敗伊诵,要打印消息日志(sendresult和key信 息)
如果相同性質(zhì)的消息量大,使用批量消息,可以提升性能
建議消息大小不超過(guò)512KB
send(msg)會(huì)阻塞,如果有性能要求,可以使用異步的方式: send(msg, callback)
如果在一個(gè)JVM中崎弃,有多個(gè)生產(chǎn)者進(jìn)行大數(shù)據(jù)處理,建議:
● 少數(shù)生產(chǎn)者使用異步發(fā)送方式(3~5個(gè)就夠了)
● 通過(guò)setInstanceName方法茸俭,給每個(gè)生產(chǎn)者設(shè)置一個(gè)實(shí)例名-
send消息方法吊履,只要不拋異常,就代表發(fā)送成功 , 但是發(fā)送成功會(huì)有多個(gè)狀態(tài)调鬓, 在
sendStatus
類(lèi)里定義
● SEND_ OK : 消息發(fā)送成功
● FLUSH_ DISK_ TIMEOUT: 消息發(fā)送成功艇炎, 但是服務(wù)器刷盤(pán)超時(shí),消息已經(jīng)進(jìn)入
服務(wù)器隊(duì)列腾窝,只有此時(shí)服務(wù)器宕機(jī)缀踪,消息才會(huì)丟失
● FLUSH SLAVE_ TIMEOUT: 消息發(fā)送成功,但是服務(wù)器同步到Slave時(shí)超時(shí),
消息已經(jīng)進(jìn)入服務(wù)器隊(duì)列虹脯,只有此時(shí)服務(wù)器宕機(jī)驴娃,消息才會(huì)丟失
● SLAVE_ NOT_ AVAILABLE: 消息發(fā)送成功, 但是此時(shí)slave不可用循集, 消息已經(jīng)進(jìn)入服務(wù)器隊(duì)列唇敞,只有此時(shí)服務(wù)器宕機(jī),消息才會(huì)丟失
● 如果狀態(tài)是FLUSH_ DISK_ TIMEOUT或FLUSH SLAVE_ _TIMEOUT,并且Broker正好關(guān)閉
此時(shí)咒彤,可以丟棄這條消息疆柔,或者重發(fā)。但建議最好重發(fā)镶柱,由消費(fèi)端去重
● Producer向Broker發(fā)送請(qǐng)求會(huì)等待響應(yīng)旷档,但如果達(dá)到最大等待時(shí)間,未得到響應(yīng)歇拆,則客戶(hù)端將拋出RemotingTimeoutException
● 默認(rèn)等待時(shí)間是3秒鞋屈,如果使用send(msg, timeout),則可以自己設(shè)定超時(shí)時(shí)間,
但超時(shí)時(shí)間不能設(shè)置太小,應(yīng)為Borker需要一些時(shí)間來(lái)刷新磁盤(pán)或與從屬設(shè)備同步
● 如果該值超過(guò)syncFlushTimeout,則該值可能影響不大故觅,因?yàn)锽roker可能會(huì)在超時(shí)之前返回FLUSH_ SLAVE_ TIMEOUT或FLUSH_ SLAVE_ TIMEOUT的響應(yīng)
- 對(duì)于消息不可丟失應(yīng)用厂庇,務(wù)必要有消息重發(fā)機(jī)制
Producer的send方法本身支持內(nèi)部重試:
● 至多重試3次
● 如果發(fā)送失敗,則輪轉(zhuǎn)到下一-個(gè)Broker
● 這個(gè)方法的總耗時(shí)時(shí)間不超過(guò)sendMsgTimeout設(shè)置的值输吏,默認(rèn)10s
所以宋列,如果本身向broker發(fā)送消息產(chǎn)生超時(shí)異常,就不會(huì)再做重試
以上策略仍然不能保證消息一定發(fā)送成功,為保證消息一定成功评也,建議將消息存儲(chǔ)到db,由后臺(tái)線(xiàn)程定時(shí)重試炼杖,保證消息一定到達(dá)Broker
2 Consumer
每個(gè)消息在業(yè)務(wù)層面的唯一標(biāo)識(shí)碼,要設(shè)置到keys字段盗迟,方便將來(lái)定位消息丟失問(wèn)題
服務(wù)器會(huì)為每個(gè)消息創(chuàng)建索引(哈希索引)坤邪,應(yīng)用可以通過(guò)topic, key來(lái)查詢(xún)這條消息內(nèi)容,以及消息被誰(shuí)消費(fèi)
由于是哈希索引,請(qǐng)務(wù)必保證key盡可能唯一,這樣可以避免潛在的哈希沖突
String orderld =“1250689524981";
message.setKeys(orderld);
console客戶(hù)端使GUI
-
mvn clean package -Dmaven.test.skip=true
--server.port=8081 --rocketmq.config. namesrvAddr=192.168.1.17:9876
2.1 消費(fèi)者組和訂閱
不同的消費(fèi)群體可以獨(dú)立地消費(fèi)同樣的主題罚缕,并且每個(gè)消費(fèi)者都有自己的消費(fèi)偏移量(offsets) 艇纺。
確保同一組中的每個(gè)消費(fèi)者訂閱相同的主題
2.2 消息監(jiān)聽(tīng)器(MessageListener)
2.2.1 順序 (Orderly)
消費(fèi)者將鎖定每個(gè)MessageQueue,以確保每個(gè)消息被一個(gè)按順序使用。
這將導(dǎo)致性能損失
如果關(guān)心消息的順序時(shí)邮弹,它就很有用了黔衡。不建議拋出異常,可以返回
ConsumeOrderlyStatus. SUSPEND_ CURRENT_ QUEUE_ A_ MOMENT代替
2.2.2 消費(fèi)狀態(tài)(Consume Status)
對(duì)于MessageListenerConcurrently,可以返回RECONSUME_ LATER告訴消費(fèi)者腌乡,當(dāng)前不能消費(fèi)它并且希望以后重新消費(fèi)盟劫。然后可以繼續(xù)使用其他消息
對(duì)于MessageListenerOrderly, 如果關(guān)心順序,就不能跳過(guò)消息与纽,可以返回SUSPEND_ CURRENT_ QUEUE_ A_ MOMENT來(lái)告訴消費(fèi)者等待片刻侣签。
阻塞(Blocking)
不建議阻塞Listener,因?yàn)樗鼤?huì)阻塞線(xiàn)程池,最終可能會(huì)停止消費(fèi)程序
線(xiàn)程數(shù)
DefaultMQPushConsumer
消費(fèi)者使用一個(gè)ThreadPoolExecutor來(lái)處理內(nèi)部的消費(fèi)急迂,因此可以通過(guò)設(shè)
置
從何處開(kāi)始消費(fèi)
● 當(dāng)建立一個(gè)新的Consumer Group時(shí)影所,需要決定是否需要消費(fèi)Broker中已經(jīng)
存在的歷史消息。
● CONSUME_ FROM LAST_ OFFSET將忽略歷史消息僚碎,并消費(fèi)此后生成的任何
內(nèi)容猴娩。
● CONSUME_ FROM_ FIRST_ OFFSET將消耗Broker中存在的所有消息。還可以使用CONSUME_ FROM_ TIMESTAMP 來(lái)消費(fèi)在指定的時(shí)間戳之后生成的消息勺阐。
重復(fù)(冪等性)
RocketMQ無(wú)法避免消息重復(fù)卷中,如果業(yè)務(wù)對(duì)重復(fù)消費(fèi)非常敏感,務(wù)必在業(yè)務(wù)層面做去重:
● 通過(guò)記錄消息唯一鍵進(jìn)行去重
● 使用業(yè)務(wù)層面的狀態(tài)機(jī)制去重
3 最佳實(shí)踐之 NameServer
在Apache RocketMQ中皆看,NameServer用于協(xié)調(diào)分布式系統(tǒng)的每個(gè)組件仓坞,主要通過(guò)管理主題路由信息
來(lái)實(shí)現(xiàn)協(xié)調(diào)。
管理由兩部分組成:
- Brokers定期更新保存在每個(gè)名稱(chēng)服務(wù)器中的元數(shù)據(jù)
- 名稱(chēng)服務(wù)器是為客戶(hù)端提供最新的路由信息服務(wù)的腰吟,包括生產(chǎn)者无埃、消費(fèi)者和命令行客戶(hù)端。
因此毛雇,在啟動(dòng)brokers和clients之前嫉称,我們需要告訴他們?nèi)绾瓮ㄟ^(guò)給他們提
供的一個(gè)名稱(chēng)服務(wù)器地址列表來(lái)訪(fǎng)問(wèn)名稱(chēng)服務(wù)器。
在Apache RocketMQ中灵疮,可以用四種方式完成织阅。
3.1 編程方式
- 對(duì)于brokers,我們可以在broker的配置文件中指定
namesrvAddr=name-server-ip1:port;name-server-ip2:port
- 對(duì)于生產(chǎn)者和消費(fèi)者,我們可以給他們提供姓名服務(wù)器地址列表如下:
DefaultMQProducer producer = new DefaultMQProducer(" please_ rename_ unique_ group name");
producer.setNamesrvAddr("name-server1-ip:port;name-server2-ip:port");
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(" please_ rename_ unique_ _group_ name");
consumer.setNamesrvAddr(" name-server1-ip:port;name-server2-ip:port");
- 如果從shell中使用管理命令行震捣,也可以這樣指定:
sh mqadmin command-name -n name-server-ip1:port;name-server-ip2:port -X OTHER-OPTION
- 一個(gè)簡(jiǎn)單的例子荔棉,在NameServer節(jié)點(diǎn)上查詢(xún)集群信息:
sh mqadmin -n localhost:9876 clusterList
- 如果將管理工具集成到自己的項(xiàng)目中闹炉,可以這樣
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(" please_ rename_ _unique_ group_ _name");
defaultMQAdminExt.setNamesrvAddr("name-server1-ip:port;name-server2-ip:port");
3.2 Java參數(shù)
NameServer的地址列表也可以通過(guò)java參數(shù)rocketmq.namesrv.addr
在啟動(dòng)之前指定
3.3 環(huán)境變量
可以設(shè)置NAMESRV_ ADDR環(huán)境變量。如果設(shè)置了润樱,Broker和clients將檢 查并使用其值
3.4 HTTP端點(diǎn)(HTTP Endpoint)
如果沒(méi)有使用前面提到的方法指定NameServer地址列表渣触,Apache RocketMQ將每2分鐘發(fā)送一次HTTP請(qǐng)求,以獲取和更新NameServer地址列表壹若,初始延遲10秒嗅钻。
默認(rèn)情況下,訪(fǎng)問(wèn)的HTTP地址是:
http://jmenv.tbsite.net:8080/rocketmq/nsaddr
通過(guò)Java參數(shù)rocketmq.namesrv.domain,可以修改jmenv.tbsite.net
通過(guò)Java參數(shù)rocketmq.namesrv.domain.subgroup,可以修改nsaddr
3.5 優(yōu)先級(jí)
編程方式> Java參數(shù)>環(huán)境變量> HTTP方式
4 JVM與Linux內(nèi)核配置
4.1 JVM配置
推薦使用JDK 1.8版本店展,使用服務(wù)器編譯器和8g堆养篓。
設(shè)置相同的Xms和Xmx值,以防止JVM動(dòng)態(tài)調(diào)整堆大小以獲得更好的性能赂蕴。
簡(jiǎn)單的JVM配置如下所示:
-server -Xms8g -Xmx8g -Xmn4g
如果不關(guān)心Broker的啟動(dòng)時(shí)間柳弄,可以預(yù)先觸摸Java堆,以確保在JVM初始化期間分配頁(yè)是更好的選擇睡腿。
-XX:+AlwaysPreTouch
- 禁用偏置鎖定可能會(huì)減少JVM暫停:
-XX: UseBiasedL ocking
- 對(duì)于垃圾回收语御,建議使用G1收集器:
-XX:+UseG1GC -XX:G1HeapRegionSize= 16m -XX:G lReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30
這些GC選項(xiàng)看起來(lái)有點(diǎn)激進(jìn),但事實(shí)證明它在生產(chǎn)環(huán)境中具有良好的性能。
-XX:MaxGCPauseMillis不要設(shè)置太小的值席怪,否則JVM將使用一個(gè)小的新生代应闯,這將導(dǎo)致非常頻繁的新生代GC。
- 推薦使用滾動(dòng)GC日志文件:
-XX:+UseGCLogFileRotation -Xx:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m
- 如果寫(xiě)入GC文件會(huì)增加代理的延遲挂捻,請(qǐng)將重定向GC日志文件考慮在內(nèi)存文件系統(tǒng)中:
-Xloggc:/dev/shm/mq_ gc. _%p.log
4.2 Linux內(nèi)核配置
- 在bin目錄中碉纺,有一個(gè)os.sh腳本列出了許多內(nèi)核參數(shù),只需要稍微的修改刻撒,就可以用于生產(chǎn)環(huán)境骨田。
- 以下參數(shù)需要注意,詳細(xì)信息請(qǐng)參考
https://www.kernel.org/doc/Documentation/sysctl/vm.txt