在上一篇《RocketMQ實戰(zhàn)(一)》中已經(jīng)為大家初步介紹了下RocketMQ以及搭建了雙Master環(huán)境亭枷,接下來繼續(xù)為大家介紹袭艺!
Quick Start
寫一個簡單的生產(chǎn)者、消費者叨粘,帶大家快速體驗RocketMQ~
Maven配置:
生產(chǎn)者:
消費者:
無論生產(chǎn)者猾编、消費者都必須給出GroupName,而且具有唯一性升敲!
生產(chǎn)到哪個Topic的哪個Tag下答倡,消費者也是從Topic的哪個Tag進(jìn)行消費,可見這個Tag有點類似于JMS Selector機制驴党,即實現(xiàn)消息的過濾瘪撇。
生產(chǎn)者、消費者需要設(shè)置NameServer地址港庄。
這里倔既,采用的是Consumer Push的方式,即設(shè)置Listener機制回調(diào)鹏氧,相當(dāng)于開啟了一個線程渤涌。以后為大家介紹Consumer Pull的方式。
我們看一下運行結(jié)果:
仔細(xì)看看生產(chǎn)者結(jié)果輸出把还,你會發(fā)現(xiàn)实蓬,有的消息發(fā)往broker-a,有的在broker-b上吊履,自動實現(xiàn)了消息的負(fù)載均衡安皱!
這里消費消息是沒有什么順序的,以后我們在來談消息的順序性艇炎。
我們再來看一看管控臺:
在多Master模式中练俐,如果某個Master進(jìn)程掛了,顯然這臺broker將不可用冕臭,上面的消息也將無法消費腺晾,要知道開源版本的RocketMQ是沒有提供切換程序燕锥,來自動恢復(fù)故障的,因此在實際開發(fā)中悯蝉,我們一般提供一個監(jiān)聽程序归形,用于監(jiān)控Master的狀態(tài)。
在ActiveMQ中鼻由,生產(chǎn)消息的時候會提供是否持久化的選擇暇榴,但是對于RocketMQ而言,消息是一定會被持久化的蕉世!
上面的消費者采用的是Push Consumer的方式蔼紧,那么監(jiān)聽的Listener中的消息List到底是多少條呢?雖然提供了API狠轻,如consumer.setConsumeMessageBatchMaxSize(10)奸例,實際上即使設(shè)置了批量的條數(shù),但是注意了向楼,是最大是10查吊,并不意味著每次batch的都是10,只有在消息有擠壓的情況下才有可能湖蜕。而且Push Consumer的最佳實踐方式就是一條條的消費逻卖,如果需要batch,可以使用Pull Consumer昭抒。
務(wù)必保證先啟動消費者進(jìn)行Topic訂閱评也,然后在啟動生產(chǎn)者進(jìn)行生產(chǎn)(否則極有可能導(dǎo)致消息的重復(fù)消費,重復(fù)消費灭返,重復(fù)消費仇参!重要的事情說三遍!關(guān)于消息的重復(fù)問題后續(xù)給大家介紹~)婆殿。而且在實際開發(fā)中诈乒,有時候不會批量的處理消息,而是原子性的婆芦,單線程的去一條一條的處理消息怕磨,這樣就是實時的在處理消息。(批量的處理海量的消息消约,可以考慮Kafka)
初步了解消息失敗重試機制
消息失敗肠鲫,無非涉及到2端:從生產(chǎn)者端發(fā)往MQ的失敗或粮;消費者端從MQ消費消息的失數妓恰;
生產(chǎn)者端的失敗重試
生產(chǎn)者端的消息失敗:比如網(wǎng)絡(luò)抖動導(dǎo)致生產(chǎn)者發(fā)送消息到MQ失敗渣锦。
上圖代碼示例的處理手段是:如果該條消息在1S內(nèi)沒有發(fā)送成功硝岗,那么重試3次。
消費者端的失敗重試
消費者端的失敗袋毙,分為2種情況型檀,一個是timeout,一個是exception
timeout,比如由于網(wǎng)絡(luò)原因?qū)е孪焊蜎]有從MQ到消費者上,在RocketMQ內(nèi)部會不斷的嘗試發(fā)送這條消息甥绿,直至發(fā)送成功為止!(比如集群中一個broker失敗鼎兽,就嘗試另一個broker)
exception,消息正常的到了消費者,結(jié)果消費者發(fā)生異常,處理失敗了无埃。這里涉及到一些問題,需要我們思考下蝎困,比如,消費者消費消息的狀態(tài)有哪些定義倍啥?如果失敗禾乘,MQ將采取什么策略進(jìn)行重試?假設(shè)一次性批量PUSH了10條虽缕,其中某條數(shù)據(jù)消費異常始藕,那么消息重試是10條呢,還是1條呢氮趋?而且在重試的過程中伍派,需要保證不重復(fù)消費嗎?
消息消費的狀態(tài)剩胁,有2種诉植,一個是成功(CONSUME_SUCCESS),一個是失敗&稍后重試(RECONSUME_LATER)
在啟動broker的過程中昵观,可以觀察下日志晾腔,你會發(fā)現(xiàn)RECONSUME_LATER的策略。
如果消費失敗啊犬,那么1S后再次消費灼擂,如果失敗,那么5S后觉至,再次消費剔应,......直至2H后如果消費還失敗,那么該條消息就會終止發(fā)送給消費者了!
RocketMQ為我們提供了這么多次數(shù)的失敗重試峻贮,但是在實際中也許我們并不需要這么多重試席怪,比如重試3次,還沒有成功月洛,我們希望把這條消息存儲起來并采用另一種方式處理何恶,而且希望RocketMQ不要在重試呢,因為重試解決不了問題了嚼黔!這該如何做呢细层?
我們先來看一下一條消息MessageExt對象的輸出:
MessageExt [queueId=0, storeSize=137, queueOffset=0, sysFlag=0, bornTimestamp=1492213846916, bornHost=/192.168.99.219:50478, storeTimestamp=1492213846981, storeHost=/192.168.99.121:10911, msgId=C0A8637900002A9F0000000000000000, commitLogOffset=0, bodyCRC=613185359, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest2, flag=0, properties={TAGS=TagA, WAIT=true, MAX_OFFSET=3, MIN_OFFSET=0}, body=16]]
注意到reconsumeTimes屬性,這個屬性就代表消息重試的次數(shù)唬涧!來看一段代碼:
注意了疫赎,對于消費消息而言,存在2種指定的狀態(tài)(成功 OR 失敗重試)碎节,如果一條消息在消費端處理沒有返回這2個狀態(tài)捧搞,那么相當(dāng)于這條消息沒有達(dá)到消費者,勢必會再次發(fā)送給消費者狮荔!也即是消息的處理必須有返回值胎撇,否則就進(jìn)行重發(fā)。
天然的消息負(fù)載均衡及高效的水平擴展機制
對于RocketMQ而言殖氏,通過ConsumeGroup的機制晚树,實現(xiàn)了天然的消息負(fù)載均衡!通俗點來說雅采,RocketMQ中的消息通過ConsumeGroup實現(xiàn)了將消息分發(fā)到C1/C2/C3/......的機制爵憎,這意味著我們將非常方便的通過加機器來實現(xiàn)水平擴展!
我們考慮一下這種情況:比如C2發(fā)生了重啟婚瓜,一條消息發(fā)往C3進(jìn)行消費宝鼓,但是這條消息的處理需要0.1S,而此時C2剛好完成重啟巴刻,那么C2是否可能會收到這條消息呢愚铡?答案是肯定的,也就是consume broker的重啟胡陪,或者水平擴容茂附,或者不遵守先訂閱后生產(chǎn)消息,都可能導(dǎo)致消息的重復(fù)消費督弓!關(guān)于去重的話題會在后續(xù)中予以介紹营曼!
至于消息分發(fā)到C1/C2/C3,其實也是可以設(shè)置策略的愚隧。
集群消費 AND 廣播消費
RocketMQ的消費方式有2種蒂阱,在默認(rèn)情況下锻全,就是集群消費,也就是上面提及的消息的負(fù)載均衡消費录煤。另一種消費模式鳄厌,是廣播消費。廣播消費妈踊,類似于ActiveMQ中的發(fā)布訂閱模式了嚎,消息會發(fā)給Consume Group中的每一個消費者進(jìn)行消費。