在上一篇《RocketMQ實戰(zhàn)(一)》中已經(jīng)為大家初步介紹了下RocketMQ以及搭建了雙Master環(huán)境,接下來繼續(xù)為大家介紹湾盒!
Quick Start
寫一個簡單的生產(chǎn)者倔撞、消費者,帶大家快速體驗RocketMQ~
Maven配置:
生產(chǎn)者:
消費者:
無論生產(chǎn)者、消費者都必須給出GroupName缰揪,而且具有唯一性!
生產(chǎn)到哪個Topic的哪個Tag下葱淳,消費者也是從Topic的哪個Tag進行消費钝腺,可見這個Tag有點類似于JMS Selector機制,即實現(xiàn)消息的過濾赞厕。
生產(chǎn)者艳狐、消費者需要設(shè)置NameServer地址。
這里皿桑,采用的是Consumer Push的方式毫目,即設(shè)置Listener機制回調(diào)蔬啡,相當(dāng)于開啟了一個線程。以后為大家介紹Consumer Pull的方式镀虐。
我們看一下運行結(jié)果:
仔細(xì)看看生產(chǎn)者結(jié)果輸出箱蟆,你會發(fā)現(xiàn),有的消息發(fā)往broker-a刮便,有的在broker-b上空猜,自動實現(xiàn)了消息的負(fù)載均衡!
這里消費消息是沒有什么順序的恨旱,以后我們在來談消息的順序性辈毯。
我們再來看一看管控臺:
在多Master模式中,如果某個Master進程掛了窖杀,顯然這臺broker將不可用漓摩,上面的消息也將無法消費,要知道開源版本的RocketMQ是沒有提供切換程序入客,來自動恢復(fù)故障的管毙,因此在實際開發(fā)中,我們一般提供一個監(jiān)聽程序桌硫,用于監(jiān)控Master的狀態(tài)夭咬。
在ActiveMQ中,生產(chǎn)消息的時候會提供是否持久化的選擇铆隘,但是對于RocketMQ而言卓舵,消息是一定會被持久化的!
上面的消費者采用的是Push Consumer的方式膀钠,那么監(jiān)聽的Listener中的消息List到底是多少條呢掏湾?雖然提供了API,如consumer.setConsumeMessageBatchMaxSize(10)肿嘲,實際上即使設(shè)置了批量的條數(shù)迹缀,但是注意了磷雇,是最大是10一铅,并不意味著每次batch的都是10票腰,只有在消息有擠壓的情況下才有可能。而且Push Consumer的最佳實踐方式就是一條條的消費封救,如果需要batch拇涤,可以使用Pull Consumer。
務(wù)必保證先啟動消費者進行Topic訂閱誉结,然后在啟動生產(chǎn)者進行生產(chǎn)(否則極有可能導(dǎo)致消息的重復(fù)消費鹅士,重復(fù)消費,重復(fù)消費惩坑!重要的事情說三遍如绸!關(guān)于消息的重復(fù)問題后續(xù)給大家介紹~)嘱朽。而且在實際開發(fā)中旭贬,有時候不會批量的處理消息怔接,而是原子性的,單線程的去一條一條的處理消息稀轨,這樣就是實時的在處理消息扼脐。(批量的處理海量的消息,可以考慮Kafka)
初步了解消息失敗重試機制
消息失敗奋刽,無非涉及到2端:從生產(chǎn)者端發(fā)往MQ的失斖呶辍;消費者端從MQ消費消息的失斢缎场肚吏;
生產(chǎn)者端的失敗重試
生產(chǎn)者端的消息失敗:比如網(wǎng)絡(luò)抖動導(dǎo)致生產(chǎn)者發(fā)送消息到MQ失敗狭魂。
上圖代碼示例的處理手段是:如果該條消息在1S內(nèi)沒有發(fā)送成功罚攀,那么重試3次。
消費者端的失敗重試
消費者端的失敗雌澄,分為2種情況斋泄,一個是timeout,一個是exception
timeout镐牺,比如由于網(wǎng)絡(luò)原因?qū)е孪焊蜎]有從MQ到消費者上炫掐,在RocketMQ內(nèi)部會不斷的嘗試發(fā)送這條消息,直至發(fā)送成功為止2墙А(比如集群中一個broker失敗募胃,就嘗試另一個broker)
exception,消息正常的到了消費者畦浓,結(jié)果消費者發(fā)生異常痹束,處理失敗了。這里涉及到一些問題宅粥,需要我們思考下参袱,比如,消費者消費消息的狀態(tài)有哪些定義秽梅?如果失敗抹蚀,MQ將采取什么策略進行重試?假設(shè)一次性批量PUSH了10條企垦,其中某條數(shù)據(jù)消費異常环壤,那么消息重試是10條呢,還是1條呢钞诡?而且在重試的過程中郑现,需要保證不重復(fù)消費嗎湃崩?
消息消費的狀態(tài),有2種接箫,一個是成功(CONSUME_SUCCESS)攒读,一個是失敗&稍后重試(RECONSUME_LATER)
在啟動broker的過程中,可以觀察下日志辛友,你會發(fā)現(xiàn)RECONSUME_LATER的策略薄扁。
如果消費失敗,那么1S后再次消費废累,如果失敗邓梅,那么5S后,再次消費邑滨,......直至2H后如果消費還失敗日缨,那么該條消息就會終止發(fā)送給消費者了!
RocketMQ為我們提供了這么多次數(shù)的失敗重試掖看,但是在實際中也許我們并不需要這么多重試匣距,比如重試3次,還沒有成功乙各,我們希望把這條消息存儲起來并采用另一種方式處理墨礁,而且希望RocketMQ不要在重試呢,因為重試解決不了問題了耳峦!這該如何做呢恩静?
我們先來看一下一條消息MessageExt對象的輸出:
MessageExt [queueId=0, storeSize=137, queueOffset=0, sysFlag=0, bornTimestamp=1492213846916, bornHost=/192.168.99.219:50478, storeTimestamp=1492213846981, storeHost=/192.168.99.121:10911, msgId=C0A8637900002A9F0000000000000000, commitLogOffset=0, bodyCRC=613185359, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest2, flag=0, properties={TAGS=TagA, WAIT=true, MAX_OFFSET=3, MIN_OFFSET=0}, body=16]]
注意到reconsumeTimes屬性,這個屬性就代表消息重試的次數(shù)蹲坷!來看一段代碼:
注意了驶乾,對于消費消息而言,存在2種指定的狀態(tài)(成功 OR 失敗重試)循签,如果一條消息在消費端處理沒有返回這2個狀態(tài)级乐,那么相當(dāng)于這條消息沒有達到消費者,勢必會再次發(fā)送給消費者县匠!也即是消息的處理必須有返回值风科,否則就進行重發(fā)。
天然的消息負(fù)載均衡及高效的水平擴展機制
對于RocketMQ而言乞旦,通過ConsumeGroup的機制贼穆,實現(xiàn)了天然的消息負(fù)載均衡!通俗點來說兰粉,RocketMQ中的消息通過ConsumeGroup實現(xiàn)了將消息分發(fā)到C1/C2/C3/......的機制故痊,這意味著我們將非常方便的通過加機器來實現(xiàn)水平擴展!
我們考慮一下這種情況:比如C2發(fā)生了重啟玖姑,一條消息發(fā)往C3進行消費愕秫,但是這條消息的處理需要0.1S慨菱,而此時C2剛好完成重啟,那么C2是否可能會收到這條消息呢戴甩?答案是肯定的符喝,也就是consume broker的重啟,或者水平擴容等恐,或者不遵守先訂閱后生產(chǎn)消息洲劣,都可能導(dǎo)致消息的重復(fù)消費!關(guān)于去重的話題會在后續(xù)中予以介紹课蔬!
至于消息分發(fā)到C1/C2/C3,其實也是可以設(shè)置策略的郊尝。
集群消費 AND 廣播消費
RocketMQ的消費方式有2種二跋,在默認(rèn)情況下,就是集群消費流昏,也就是上面提及的消息的負(fù)載均衡消費扎即。另一種消費模式,是廣播消費况凉。廣播消費谚鄙,類似于ActiveMQ中的發(fā)布訂閱模式,消息會發(fā)給Consume Group中的每一個消費者進行消費刁绒。
OK闷营,到這里,本期的RocketMQ就結(jié)束了知市,咱們下期見~
周末愉快傻盟!