前情提要
本次培訓(xùn)為JAVA中階培訓(xùn)(一)的拓展,主要講述RocketMQ的相關(guān)原理與使用旭蠕,以及附帶EasyExcel的使用方法,著重于提高Java初級(jí)程序員對(duì)Java技術(shù)的更加深層次的理解恼除,進(jìn)而培養(yǎng)自己的架構(gòu)思維兢仰,在今后的技術(shù)選型中能夠更加從容。
同JAVA中階(一)培訓(xùn)一樣贩挣,所有例子均來自項(xiàng)目實(shí)例喉前。
一、RocketMQ
1王财、什么是消息隊(duì)列卵迂?
序
在很久很久以前,人們之間的通信方式就是面對(duì)面交談绒净,你說一句见咒,我聽一句,雖然簡單可靠疯溺,但是弊端也很大论颅。
比如,當(dāng)你成為一個(gè)軍隊(duì)的首領(lǐng)囱嫩,每個(gè)屬下一有情況就立刻向你匯報(bào)恃疯,一個(gè)還好,但當(dāng)你的屬下有幾十個(gè)幾百個(gè)的時(shí)候墨闲,他們每天不分時(shí)間不看場合今妄,都在嘰嘰喳喳和你匯報(bào)情況,那你可能什么都聽不到鸳碧,而且腦袋都要炸掉了盾鳞。這個(gè)時(shí)候,你說停瞻离,都給我停下腾仅,要匯報(bào)情況的,去門口排隊(duì)套利,一個(gè)一個(gè)的來推励,這個(gè)就叫做流量削峰鹤耍,一群人不要一擁而上,都乖乖給我排隊(duì)去验辞。
然后你就一個(gè)接一個(gè)的聽稿黄,聽了整整24個(gè)小時(shí),實(shí)在困的不行跌造,尋思著這樣不行呀杆怕,如此下去可能就要天妒英才了,于是你又說壳贪,來人陵珍,發(fā)筆和紙,都把要匯報(bào)的消息寫在紙上撑碴,寫完后告訴呂秀才撑教,然后聽呂秀才的指示,沿著屋里右面墻根醉拓,按照指示的位置疊放整齊伟姐,匯報(bào)的人就可以退下該做啥做啥去吧,等我休息一下亿卤,再來看你們的匯報(bào)內(nèi)容愤兵,這就叫做異步處理,你終于可以由自己掌控消息獲取的進(jìn)度了排吴,美滋滋的去睡覺了秆乳。
而匯報(bào)的人把內(nèi)容寫在紙上,疊放好钻哩,就可以退下自己做自己該做的事情屹堰,而不是一直在門口等待匯報(bào),這個(gè)就叫做解耦街氢。
削峰扯键,異步,解耦珊肃。這就是消息隊(duì)列最常用的三大場景荣刑。
故事中的下屬們,就是消息生產(chǎn)者角色伦乔,屋子右面墻根那塊地就是消息持久化厉亏,呂秀才就是消息調(diào)度中心,而你就是消息消費(fèi)者角色烈和。下屬們匯報(bào)的消息爱只,應(yīng)該疊放在哪里,這個(gè)消息又應(yīng)該在哪里才能找到招刹,全靠呂秀才的驚人記憶力恬试,才可以讓消息準(zhǔn)確的被投放以及消費(fèi)沥匈。
正題
消息隊(duì)列作為高并發(fā)系統(tǒng)的核心組件之一,能夠幫助業(yè)務(wù)系統(tǒng)解構(gòu)提升開發(fā)效率和系統(tǒng)穩(wěn)定性忘渔。主要具有以下優(yōu)勢:
削峰填谷(主要解決瞬時(shí)寫壓力大于應(yīng)用服務(wù)能力導(dǎo)致消息丟失、系統(tǒng)奔潰等問題)
系統(tǒng)解耦(解決不同重要程度缰儿、不同能力級(jí)別系統(tǒng)之間依賴導(dǎo)致一死全死)
提升性能(當(dāng)存在一對(duì)多調(diào)用時(shí)畦粮,可以發(fā)一條消息給消息系統(tǒng),讓消息系統(tǒng)通知相關(guān)系統(tǒng))
蓄流壓測(線上有些鏈路不好壓測乖阵,可以通過堆積一定量消息再放開來壓測)
目前主流的MQ主要是Rocketmq宣赔、kafka、Rabbitmq瞪浸,Rocketmq相比于Rabbitmq儒将、kafka具有主要優(yōu)勢特性有:
? 支持事務(wù)型消息(消息發(fā)送和DB操作保持兩方的最終一致性,rabbitmq和kafka不支持)
? 支持結(jié)合rocketmq的多個(gè)系統(tǒng)之間數(shù)據(jù)最終一致性(多方事務(wù)对蒲,二方事務(wù)是前提)
? 支持18個(gè)級(jí)別的延遲消息(rabbitmq和kafka不支持)
? 支持指定次數(shù)和時(shí)間間隔的失敗消息重發(fā)(kafka不支持钩蚊,rabbitmq需要手動(dòng)確認(rèn))
? 支持consumer端tag過濾,減少不必要的網(wǎng)絡(luò)傳輸(rabbitmq和kafka不支持)
? 支持重復(fù)消費(fèi)(rabbitmq不支持蹈矮,kafka支持)
Rocketmq砰逻、kafka、Rabbitmq的詳細(xì)對(duì)比泛鸟,請參照下表格:
2蝠咆、RocketMQ詳解
序
我們來類比一下現(xiàn)實(shí)生活,有一個(gè)人想要給另外一個(gè)人寄快件北滥,那么就需要先由這個(gè)人在網(wǎng)上查詢有哪些郵局刚操,然后選擇其中一個(gè)郵局,把快件投遞給它再芋,再由郵局配送到目標(biāo)人菊霜。
1、路由注冊
郵局在竣工后祝闻,需要與衛(wèi)星聯(lián)網(wǎng)占卧,將自己納入衛(wèi)星網(wǎng)絡(luò)管理中,這樣就相當(dāng)于對(duì)外宣布联喘,我這個(gè)郵局開始運(yùn)營了华蜒,可以收發(fā)郵件快遞了。 郵局并網(wǎng)之后,如何讓衛(wèi)星持續(xù)并及時(shí)感知這個(gè)郵局在線以及郵局自身信息的調(diào)整,使衛(wèi)星可以隨時(shí)協(xié)調(diào)這個(gè)郵局呢叠蝇?
這個(gè)時(shí)候就需要郵局定時(shí)向衛(wèi)星發(fā)一條信息: “嗶嗶嗶————我是郵局C呆细,編號(hào)SHC爷光,地址XXXXX形葬,歸屬中國上海集群合蔽,在線料皇,此時(shí)此刻2019年3月15日13點(diǎn)21秒”
衛(wèi)星接收到消息后啥辨,拿個(gè)小本本記錄下來:
“郵局B涡匀,BJB,北京溉知,2019年3月15日13點(diǎn)10秒陨瘩,活著...”
“郵局A,BJA级乍,北京舌劳,2019年3月15日13點(diǎn)15秒,活著...”
“郵局C玫荣,SHC甚淡,上海,2019年3月15日13點(diǎn)21秒捅厂,活著...”
上面這個(gè)故事贯卦,就講述了NameServer路由注冊的基本原理。
NameServer就相當(dāng)于衛(wèi)星恒傻,內(nèi)部會(huì)維護(hù)一個(gè)Broker表脸侥,用來動(dòng)態(tài)存儲(chǔ)Broker的信息。 而Broker就相當(dāng)于郵局盈厘,在啟動(dòng)的時(shí)候睁枕,會(huì)先遍歷NameServer列表,依次發(fā)起注冊請求沸手,保持長連接外遇,然后每隔30秒向NameServer發(fā)送心跳包,心跳包中包含BrokerId契吉、Broker地址跳仿、Broker名稱、Broker所屬集群名稱等等捐晶,然后NameServer接收到心跳包后菲语,會(huì)更新時(shí)間戳,記錄這個(gè)Broker的最新存活時(shí)間惑灵。
NameServer在處理心跳包的時(shí)候山上,存在多個(gè)Broker同時(shí)操作一張Broker表,為了防止并發(fā)修改Broker表導(dǎo)致不安全英支,路由注冊操作引入了ReadWriteLock讀寫鎖佩憾,這個(gè)設(shè)計(jì)亮點(diǎn)允許多個(gè)消息生產(chǎn)者并發(fā)讀,保證了消息發(fā)送時(shí)的高并發(fā),但是同一時(shí)刻N(yùn)ameServer只能處理一個(gè)Broker心跳包妄帘,多個(gè)心跳包串行處理楞黄。這也是讀寫鎖的經(jīng)典使用場景,即讀多寫少抡驼。
2鬼廓、路由剔除
忽然有一天,郵局C的機(jī)房進(jìn)老鼠了致盟,咬斷電源線宕機(jī)了桑阶,而衛(wèi)星不知道郵局C業(yè)務(wù)故障了,依舊將帶有郵局C的郵局表信息傳給寄件人(生產(chǎn)者)勾邦,寄件人聯(lián)系郵局C發(fā)送快件,但是郵局C機(jī)房宕機(jī)割择,業(yè)務(wù)暫停眷篇,處于癱瘓狀態(tài),自然也就無法接收快件了荔泳。 另一方面蕉饼,因?yàn)榭旒茨鼙秽]局C收入,也就無法將快件轉(zhuǎn)交給收件人玛歌,顧客們久久等不到自己的快件昧港,紛紛投訴,為此郵局C的管理層備受責(zé)難支子。
于是郵政總局技術(shù)部開始研究討論创肥,怎么讓衛(wèi)星可以感知到郵局“失聯(lián)了”,從而自動(dòng)排除故障郵局值朋,將其負(fù)責(zé)的業(yè)務(wù)交付給其他正常的郵局處理叹侄,這樣就不會(huì)因?yàn)槟骋粋€(gè)郵局出現(xiàn)問題,而導(dǎo)致這個(gè)郵局所管轄的部分業(yè)務(wù)無法處理昨登。
大家眾說紛紜趾代,最后敲定了一個(gè)方案,讓衛(wèi)星每隔一段時(shí)間掃描郵局信息表丰辣,如果發(fā)現(xiàn)某個(gè)郵局上報(bào)信息時(shí)間與當(dāng)時(shí)掃描時(shí)間之間的差值超過了某個(gè)預(yù)設(shè)的閾值撒强,就判定這個(gè)郵局“失聯(lián)了”,將此郵局信息從郵局表中剔除笙什。這樣寄件人查詢到的郵局表里都是正常營業(yè)的郵局信息飘哨。
新功能上線運(yùn)營后,效果不錯(cuò)得湘,大家再也不用擔(dān)心因?yàn)槟硞€(gè)郵局故障而導(dǎo)致業(yè)務(wù)停滯杖玲,又過上了泡茶報(bào)紙的生活。
這個(gè)故事同樣在RocketMQ中上演淘正。
正常情況下摆马,如果Broker關(guān)閉臼闻,則會(huì)與NameServer斷開長連接,Netty的通道關(guān)閉監(jiān)聽器會(huì)監(jiān)聽到連接斷開事件囤采,然后會(huì)將這個(gè)Broker信息剔除掉述呐。
異常情況下,NameServer中有一個(gè)定時(shí)任務(wù)蕉毯,每隔10秒掃描一下Broker表乓搬,如果某個(gè)Broker的心跳包最新時(shí)間戳距離當(dāng)前時(shí)間超多120秒,也會(huì)判定Broker失效并將其移除代虾。
細(xì)心的人會(huì)發(fā)現(xiàn)一個(gè)問題进肯,NameServer在清除失活Broker之后,并沒有主動(dòng)通知生產(chǎn)者棉磨,生產(chǎn)者每隔30秒會(huì)請求NameServer并獲取最新的路由表江掩,那么就意味著,消息生產(chǎn)者總會(huì)有30秒的延時(shí)乘瓤,無法實(shí)時(shí)感知Broker服務(wù)器的宕機(jī)环形。所以在這個(gè)30秒里,生產(chǎn)者依舊會(huì)向失活Broker發(fā)送消息衙傀,那么消息發(fā)送的高可用性如何保證呢抬吟?
3、Broker高可用原理
要解決這個(gè)問題得首先談一談Broker的負(fù)載策略统抬,消息發(fā)送隊(duì)列默認(rèn)采用輪詢機(jī)制火本,消息發(fā)送時(shí)默認(rèn)選擇異常重試機(jī)制來保證消息發(fā)送的高可用。當(dāng)Broker宕機(jī)后聪建,雖然消息發(fā)送者無法第一時(shí)間感知Broker 宕機(jī)发侵,但是當(dāng)消息生產(chǎn)者向Broker發(fā)送消息返回異常后,消息生產(chǎn)者會(huì)選擇另外一個(gè)Broker上的消息隊(duì)列妆偏,這樣就規(guī)避了發(fā)生故障的Broker刃鳄,結(jié)合重試機(jī)制,巧妙實(shí)現(xiàn)消息發(fā)送的高可用钱骂,同時(shí)由于不需要NameServer通知眾多不固定的生產(chǎn)者叔锐,也降低了NameServer實(shí)現(xiàn)的復(fù)雜性。
2见秽、在降低NameServer實(shí)現(xiàn)復(fù)雜性方面愉烙,還有一個(gè)設(shè)計(jì)亮點(diǎn)就是NameServer之間是彼此獨(dú)立無交流的,也就是說NameServer服務(wù)器之間在某個(gè)時(shí)刻的數(shù)據(jù)并不會(huì)完全相同解取,但是異常重試機(jī)制使得這種差異不會(huì)造成任何影響步责。
4、路由發(fā)現(xiàn)
天上的衛(wèi)星是有限的,不易變的蔓肯,而地上的寄件人是繁多的遂鹊,易變的。所以寄件人想要知道有哪些郵局蔗包,很明顯最適合的方式是向衛(wèi)星發(fā)請求秉扑,拉取郵局表信息,而不是等衛(wèi)星給每個(gè)人推送调限。 所以在RocketMQ中舟陆,NameServer是不主動(dòng)推送會(huì)客戶端的,而是由客戶端拉取主題的最新路由信息耻矮。
5秦躯、CAP理論
NameServer作為注冊和發(fā)現(xiàn)中心,是整個(gè)分布式消息隊(duì)列調(diào)度的靈魂裆装,談及到分布式宦赠,就逃不開CAP理論,C是Consistency(一致性)米母,A是Availability(可用性),P是(容錯(cuò)性)毡琉,對(duì)于分布式架構(gòu)铁瞒,網(wǎng)絡(luò)條件不可控,出現(xiàn)網(wǎng)絡(luò)分區(qū)是不可避免的桅滋,因此必須具備分區(qū)容錯(cuò)性慧耍,那么NameServer就是在AP還是CP中選擇了,由于NameServer之間相互獨(dú)立丐谋,很明顯芍碧,是一個(gè)AP設(shè)計(jì)。
正題
1) Name Server(路由注冊中?)(呂秀才)(衛(wèi)星)
Name Server是一個(gè)幾乎無狀態(tài)節(jié)點(diǎn)号俐,可集群部署泌豆,節(jié)點(diǎn)之間無任何信息同步。
2) Broker (消息存儲(chǔ)服務(wù)器)(墻根)(郵局)
Broker部署相對(duì)復(fù)雜吏饿,Broker分為Master與Slave踪危,一個(gè)Master可以對(duì)應(yīng)多個(gè)Slave,但是一個(gè)Slave只能對(duì)應(yīng)一個(gè)Master猪落,Master與Slave的對(duì)應(yīng)關(guān)系通過指定相同的Broker Name贞远,不同的Broker Id來定義,BrokerId為0表示Master笨忌,非0表示Slave蓝仲。Master也可以部署多個(gè)。
每個(gè)Broker與Name Server集群中的所有節(jié)點(diǎn)建立長連接,定時(shí)(每隔30s)注冊Topic信息到所有Name Server袱结。Name Server定時(shí)(每隔10s)掃描所有存活broker的連接亮隙,如果Name Server超過2分鐘沒有收到心跳,則Name Server斷開與Broker的連接擎勘。
3) Producer(消息?產(chǎn)者咱揍,?于向消息服務(wù)器發(fā)送消息)(士兵)(發(fā)件人)
Producer與Name Server集群中的其中一個(gè)節(jié)點(diǎn)(隨機(jī)選擇)建立長連接,定期從Name Server取Topic路由信息棚饵,并向提供Topic服務(wù)的Master建立長連接煤裙,且定時(shí)向Master發(fā)送心跳。Producer完全無狀態(tài)噪漾,可集群部署硼砰。
Producer每隔30s(由ClientConfig的pollNameServerInterval)從Name server獲取所有topic隊(duì)列的最新情況,這意味著如果Broker不可用欣硼,Producer最多30s能夠感知题翰,在此期間內(nèi)發(fā)往Broker的所有消息都會(huì)失敗。
Producer每隔30s(由ClientConfig中heartbeatBrokerInterval決定)向所有關(guān)聯(lián)的broker發(fā)送心跳诈胜,Broker每隔10s中掃描所有存活的連接豹障,如果Broker在2分鐘內(nèi)沒有收到心跳數(shù)據(jù),則關(guān)閉與Producer的連接焦匈。
4) Consumer(消息消費(fèi)者)(你-將軍)(收件人)
Consumer與Name Server集群中的其中一個(gè)節(jié)點(diǎn)(隨機(jī)選擇)建立長連接血公,定期從Name Server取Topic路由信息,并向提供Topic服務(wù)的Master缓熟、Slave建立長連接累魔,且定時(shí)向Master、Slave發(fā)送心跳够滑。Consumer既可以從Master訂閱消息垦写,也可以從Slave訂閱消息,訂閱規(guī)則由Broker配置決定彰触。
Consumer每隔30s從Name server獲取topic的最新隊(duì)列情況梯投,這意味著Broker不可用時(shí),Consumer最多最需要30s才能感知况毅。
Consumer每隔30s(由ClientConfig中heartbeatBrokerInterval決定)向所有關(guān)聯(lián)的broker發(fā)送心跳晚伙,Broker每隔10s掃描所有存活的連接,若某個(gè)連接2分鐘內(nèi)沒有發(fā)送心跳數(shù)據(jù)俭茧,則關(guān)閉連接咆疗;并向該Consumer Group的所有Consumer發(fā)出通知,Group內(nèi)的Consumer重新分配隊(duì)列母债,然后繼續(xù)消費(fèi)午磁。
當(dāng)Consumer得到master宕機(jī)通知后尝抖,轉(zhuǎn)向slave消費(fèi),slave不能保證master的消息100%都同步過來了迅皇,因此會(huì)有少量的消息丟失昧辽。但是一旦master恢復(fù),未同步過去的消息會(huì)被最終消費(fèi)掉登颓。
消費(fèi)者對(duì)列是消費(fèi)者連接之后(或者之前有連接過)才創(chuàng)建的搅荞。我們將原生的消費(fèi)者標(biāo)識(shí)由 {IP}@{消費(fèi)者group}擴(kuò)展為 {IP}@{消費(fèi)者group}{topic}{tag},(例如xxx.xxx.xxx.xxx@mqtest_producer-group_2m2sTest_tag-zyk)框咙。任何一個(gè)元素不同咕痛,都認(rèn)為是不同的消費(fèi)端,每個(gè)消費(fèi)端會(huì)擁有一份自己消費(fèi)對(duì)列(默認(rèn)是broker對(duì)列數(shù)量*broker數(shù)量)喇嘱。新掛載的消費(fèi)者對(duì)列中擁有commitlog中的所有數(shù)據(jù)茉贡。
3、實(shí)際代碼
一者铜、基礎(chǔ)配置
(1)腔丧、MAVEN 引入(使用阿里云的RocketMQ付費(fèi)服務(wù),不自行搭建)
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>ons-client</artifactId>
<version>1.2.6</version>
</dependency>
(2)作烟、配置相應(yīng)的賬號(hào)信息(mq-config.properties)
ProducerId=保密
ConsumerId=保密
Topic=保密
AccessKey=保密
SecretKey=保密
expression=*
二愉粤、生產(chǎn)者使用
(1)、生產(chǎn)者發(fā)送消息核心方法
/**
* MQ生產(chǎn)者-發(fā)布
*
* @author 陳豆豆
* @date 2020-03-31
*/
@Slf4j
public class RocketMqProducer {
/**
* 消息發(fā)送
*
* @param producer 生產(chǎn)者
* @param topic 主體
* @param bean 消息實(shí)體
* @throws UnsupportedEncodingException 編碼格式異常
* @throws JsonProcessingException 格式轉(zhuǎn)換異常
*/
public static void rocketMqSend(Producer producer, String topic, BaseRocketMqBean bean) throws UnsupportedEncodingException, JsonProcessingException {
SendResult sendResult = producer.send(new Message(topic, bean.getTag(), bean.getKey(), bean.parseToRocketBytes()));
log.info("[{}] RocketMQ發(fā)送消息 [{}] 反饋信息 [{}]", topic, bean.toString(), sendResult);
}
}
(2)拿撩、生產(chǎn)者發(fā)送消息組裝核心方法
/**
* 超級(jí)店長生產(chǎn)者
*
* @author 陳豆豆
* @date 2020-03-31
*/
@Slf4j
@Service
public class OrangeStoreProducer {
private static final BlockingQueue<BaseRocketMqBean> QUEUE = new LinkedBlockingQueue<>();
private static Producer producer;
private static String Topic = "";
static {
try {
Properties properties = new Properties();
properties.load(OrangeStoreProducer.class.getClassLoader().getResourceAsStream("mq-config.properties"));
properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);
Topic = properties.getProperty("Topic");
producer = ONSFactory.createProducer(properties);
producer.start();
log.info("超級(jí)店長生產(chǎn)隊(duì)列啟動(dòng)");
} catch (IOException e) {
log.error("超級(jí)店長生產(chǎn)隊(duì)列啟動(dòng)異常: {}", e.getLocalizedMessage(), e);
}
}
public static void addToQueue(BaseRocketMqBean bean) {
bean.setTag("MS_orange_store_tag");
bean.setKey("MS_orange_store");
QUEUE.offer(bean);
}
@PostConstruct
@SuppressWarnings("all")
public void batchSend() {
Thread thread = new Thread(() -> {
{
while (true) {
try {
// 超過1秒無法獲取數(shù)據(jù)就等待下次獲取
BaseRocketMqBean bean = QUEUE.poll(1, TimeUnit.SECONDS);
if (bean == null) {
continue;
}
RocketMqProducer.rocketMqSend(producer, Topic, bean);
} catch (Exception e) {
log.error("同步數(shù)據(jù)到超級(jí)店長生產(chǎn)隊(duì)列異常:{}", e.getLocalizedMessage(), e);
}
}
}
});
thread.start();
}
}
講解點(diǎn)
RocketMQ消息分發(fā)的兩種模式
廣播模式(BROADCASTING)和 集群模式(CLUSTERING)
(3)衣厘、消息傳遞結(jié)構(gòu)體
基礎(chǔ)結(jié)構(gòu)體
/**
* 阿里云rocketMq隊(duì)列實(shí)體bean父類
*
* @author 陳豆豆
* @date 2020-03-31
*/
@Data
public abstract class BaseRocketMqBean implements Serializable {
private static final long serialVersionUID = 29723539849425729L;
/**
* 隊(duì)列tag
*/
private String tag;
/**
* 隊(duì)列messageKey
*/
private String key;
/**
* bean 轉(zhuǎn)換成阿里云json
*
* @return json
* @throws JsonProcessingException 格式轉(zhuǎn)換異常
*/
public byte[] parseToRocketBytes() throws JsonProcessingException, UnsupportedEncodingException {
return new ObjectMapper().writeValueAsString(this).getBytes("UTF-8");
}
}
同步數(shù)據(jù)結(jié)構(gòu)體
/**
* 同步數(shù)據(jù)專用bean
*
* @author 陳豆豆
* @date 2020/4/01 10:04
*/
public class SyncMessageBean<T> extends BaseRocketMqBean implements Serializable {
private static final long serialVersionUID = -1535244308014581262L;
/**
* 操作結(jié)果
*/
@JsonIgnore
private Boolean success;
/**
* 標(biāo)題
*/
private String title;
/**
* 提示消息
*/
private String errorMsg;
/**
* 返回碼
*/
@JsonIgnore
private Integer errorCode;
/**
* 對(duì)象實(shí)體
*/
private Object data;
public SyncMessageBean() {
super();
this.success = success;
this.errorMsg = errorMsg;
}
public SyncMessageBean(Boolean success, String message, T data, String title) {
super();
this.success = success;
this.errorMsg = message;
this.data = data;
this.title = title;
}
public SyncMessageBean(boolean success, String message) {
super();
this.title = "操作提示";
this.success = success;
this.errorMsg = message;
}
public SyncMessageBean(Exception ex) {
super();
this.success = false;
this.errorMsg = StringUtils.defaultIfBlank(ex.getMessage(), "程序異常");
}
public static SyncMessageBean success() {
SyncMessageBean messageBean = new SyncMessageBean();
messageBean.setSuccess(true);
messageBean.setErrorMsg("操作成功");
return messageBean;
}
public static SyncMessageBean success(String msg) {
SyncMessageBean messageBean = new SyncMessageBean();
messageBean.setSuccess(true);
messageBean.setErrorMsg(msg);
return messageBean;
}
public static SyncMessageBean fail(String msg) {
SyncMessageBean messageBean = new SyncMessageBean();
messageBean.setSuccess(false);
messageBean.setErrorCode(ErrorCodeEnum.ERROR.getCode());
messageBean.setErrorMsg(msg);
messageBean.setTitle();
return messageBean;
}
public static SyncMessageBean fail() {
return fail("操作失敗");
}
public Boolean getSuccess() {
return success;
}
public void setSuccess(Boolean success) {
this.success = success;
}
public String getTitle() {
return title;
}
public void setTitle(String title) {
this.title = title;
}
public void setTitle() {
this.title = "ERROR";
}
public String getErrorMsg() {
return errorMsg;
}
public void setErrorMsg(String errorMsg) {
this.errorMsg = errorMsg;
}
public Integer getErrorCode() {
return errorCode;
}
public void setErrorCode(Integer errorCode) {
this.errorCode = errorCode;
}
public Object getData() {
return data;
}
public void setData(Object data) {
this.data = data;
}
@Override
public String toString() {
return "MessageBean{" +
"success=" + success +
", title='" + title + '\'' +
", errorMsg='" + errorMsg + '\'' +
", errorCode=" + errorCode +
", data=" + data +
'}';
}
}
(4)、生產(chǎn)消息類型
/**
* 同步類型枚舉
*
* @author 陳豆豆
* @date 2020/4/1 10:00
*/
public enum ProducerTypeEnum {
/**
* 店員信息同步
*/
POS_CASHIER(0, "POS_CASHIER"),
/**
* 門店電費(fèi)信息同步
*/
STORE_ELECTRICITY(1, "STORE_ELECTRICITY"),
;
/**
* 編碼
*/
private Integer code;
/**
* 名稱
*/
private String name;
ProducerTypeEnum(Integer code, String name) {
this.code = code;
this.name = name;
}
public Integer getCode() {
return code;
}
public void setCode(Integer code) {
this.code = code;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
(5)绷雏、調(diào)用方法
/**
* 同步門店信息到物聯(lián)網(wǎng)系統(tǒng)
*
* @param storeCode 門店編號(hào)
*/
@Override
public void syncStoreForSmart(String storeCode) {
// 查詢對(duì)應(yīng)的門店信息
StoreForSmart store = storeInfoDao.queryStoreForSmart(storeCode);
if (Objects.isNull(store)) {
return;
}
SyncMessageBean messageBean = new SyncMessageBean();
messageBean.setErrorMsg(ProducerTypeEnum.SYNC_STORE_TO_SMART.getName());
messageBean.setData(JSON.toJSONString(store));
OrangeStoreProducer.addToQueue(messageBean);
}
三、消費(fèi)者
(1)怖亭、接受消息核心組件
/**
* @description:接受消息核心組件
* @author: tangn
* @time: 2021/2/7 15:14
*/
@Slf4j
@Component
public class ReceiveHandleComponent {
public static final BlockingQueue<SyncMessageBean> RECEIVE_HANDLE = new LinkedBlockingQueue<>();
@Resource
private ConsumerService consumerService;
/**
* 創(chuàng)建線程池
*/
private static final ThreadFactory THREAD_FACTORY = new ThreadFactoryBuilder()
.setNameFormat("ReceiveHandleComponent-%d").setDaemon(true).build();
private static final ExecutorService RECEIVE_HANDLE_POLL = Executors.newFixedThreadPool(16, THREAD_FACTORY);
@PostConstruct
public void judgeFace() {
//執(zhí)行線程
RECEIVE_HANDLE_POLL.execute(() -> {
while (true) {
try {
// 設(shè)置隊(duì)列超過1秒無法獲取數(shù)據(jù)就等待下次獲取
SyncMessageBean syncMessageBean = RECEIVE_HANDLE.poll(1, TimeUnit.SECONDS);
if (Objects.isNull(syncMessageBean)) {
continue;
}
consumerService.receiveHandle(syncMessageBean);
} catch (Exception e) {
log.warn("訂閱處理線程異常[{}]", e.getLocalizedMessage(), e);
}
}
});
}
}
(2)涎显、消費(fèi)者注冊
/**
* @description:消費(fèi)者注冊
* @author: tangn
* @time: 2021/2/7 15:09
*/
@Slf4j
@Component
public class QianYunDataConsumer extends ConsumerBean {
private final QianYunDataListener qianYunDataListener;
@Autowired
public QianYunDataConsumer(QianYunDataListener qianYunDataListener) {
this.qianYunDataListener = qianYunDataListener;
}
@SuppressWarnings("Duplicates")
private Subscription postInit() {
Subscription subscription = new Subscription();
try {
Properties properties = new Properties();
properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);
properties.load(this.getClass().getClassLoader().getResourceAsStream("mq-config.properties"));
this.setProperties(properties);
String topic = properties.getProperty("Topic");
String expression = properties.getProperty("expression");
subscription.setExpression(expression);
subscription.setTopic(topic);
log.info("消費(fèi)者配置文件加載成功");
} catch (IOException e) {
log.error("消費(fèi)者初始化,配置文件加載異常 {}", e.getLocalizedMessage(), e);
}
return subscription;
}
@PostConstruct
@Override
public void start() {
Map<Subscription, MessageListener> consumers = new HashMap<>(1);
consumers.put(postInit(), qianYunDataListener);
this.setSubscriptionTable(consumers);
log.info("消費(fèi)者啟動(dòng)");
super.start();
}
@PreDestroy
@Override
public void shutdown() {
log.info("消費(fèi)者關(guān)閉");
super.shutdown();
}
}
(4)、消費(fèi)者消息監(jiān)聽
/**
* @description:消費(fèi)者消息監(jiān)聽
* @author: tangning
* @time: 2021/2/7 15:12
*/
@Slf4j
@Service
public class QianYunDataListener implements MessageListener {
@Override
public Action consume(Message message, ConsumeContext consumeContext) {
try {
log.info("[{}] 消費(fèi)隊(duì)列接收信息,body [{}] ", message, message == null ? "null" : new String(message.getBody(), "ISO8859-1"));
SyncMessageBean messageBean = MqTextUtil.convertMessageToBean(message, SyncMessageBean.class);
log.info("messageId [{}] 訂閱信息返回 [{}]", message.getMsgID(), messageBean);
if (Objects.nonNull(messageBean)){
ReceiveHandleComponent.RECEIVE_HANDLE.offer(messageBean);
}
return Action.CommitMessage;
} catch (ErrorCodeException e) {
log.warn("[{}] 消費(fèi)者異常 [{}]", message, e.getLocalizedMessage(), e);
return Action.ReconsumeLater;
} catch (Throwable e) {
log.error("[{}] 消費(fèi)者異常 [{}]", message, e.getLocalizedMessage(), e);
return Action.ReconsumeLater;
}
}
}
(5)兴猩、消費(fèi)者消息處理(根據(jù)自己的業(yè)務(wù)進(jìn)行處理)
/**
* @description: 訂閱處理
* @author: MA XIN
* @time: 2021/2/7 15:26
*/
@Slf4j
@Service
public class ConsumerServiceImpl implements ConsumerService {
@Resource
private StoreService storeService;
/**
* 訂閱處理
*
* @param messageBean 訂閱消息
*/
@Override
public void receiveHandle(SyncMessageBean<T> messageBean) {
// 門店信息同步
if (ConsumerTypeEnum.SYNC_STORE_TO_SMART.getName().equals(messageBean.getErrorMsg())) {
StoreInfo storeInfo = JSON.parseObject(messageBean.getData().toString(), StoreInfo.class);
log.info("門店信息同步訂閱解析[{}]", storeInfo);
SimpleMessage simpleMessage = null;
try {
simpleMessage = storeService.syncStoreInfo(storeInfo);
} catch (Exception e) {
log.error("門店信息同步異常[{}]", storeInfo, e);
}
log.info("店信息同步結(jié)果[{}],[{}]", storeInfo, simpleMessage);
}
}
}
二期吓、EasyExcel 使用
優(yōu)勢:EasyExcel是一個(gè)基于Java的簡單、省內(nèi)存的讀寫Excel的開源項(xiàng)目倾芝。在盡可能節(jié)約內(nèi)存的情況下支持讀寫百M(fèi)的Excel讨勤。 github地址:https://github.com/alibaba/easyexcel
直接上代碼
1、基礎(chǔ)引入
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>easyexcel</artifactId>
<version>1.1.2-beat1</version>
</dependency>
2晨另、導(dǎo)出EXCEL
(1)潭千、下載結(jié)構(gòu)體
/**
* @author: 唐寧
* @date: 2019/5/4
* @time: 21:05
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@JsonInclude(JsonInclude.Include.NON_NULL)
public class AskCarSheetDetailDownloadBean implements Serializable {
private static final long serialVersionUID = -7779152957404073436L;
/**
* 車輛VIN碼
*/
@ExcelProperty("VIN碼")
private String carCode;
/**
* 車輛狀態(tài)
*/
@ExcelProperty("車輛狀態(tài)")
private String carStateIntro;
/**
* 倉庫名稱
*/
@ExcelProperty("倉庫名稱")
private String areaName;
/**
* 區(qū)域名稱
*/
@ExcelProperty("區(qū)域名稱")
private String positionName;
/**
* 第幾排
*/
@ExcelProperty("第幾排")
private Integer row;
/**
* 第幾列
*/
@ExcelProperty("第幾列")
private Integer position;
}
(2)、核心代碼及使用方法
/**
* 下載單據(jù)詳情數(shù)據(jù)
*
* @param response 響應(yīng)
* @param dto 查詢參數(shù)
* @throws IOException
*/
@Override
public void downloadAskCarSheetDetail(HttpServletResponse response, AskCarSheetDetailDTO dto) throws IOException {
List<AskCarSheetDetailVO> askCarSheetDetailList = sheetDao.getAskCarSheetDetailList(dto);
// 處理下載數(shù)據(jù)
List<AskCarSheetDetailDownloadBean> askCarSheetDetailDownloadBeanList = new ArrayList<>();
askCarSheetDetailList.forEach(askCarSheetDetailVO -> {
// 構(gòu)建基礎(chǔ)
AskCarSheetDetailDownloadBean askCarSheetDetailDownloadBean = AskCarSheetDetailDownloadBean.builder()
.carCode(askCarSheetDetailVO.getCarCode())
.areaName(askCarSheetDetailVO.getAreaName())
.positionName(askCarSheetDetailVO.getPositionName())
.row(askCarSheetDetailVO.getRow())
.position(askCarSheetDetailVO.getPosition())
.build();
// 車輛狀態(tài)
if (Objects.nonNull(dto.getCarState())) {
askCarSheetDetailDownloadBean.setCarStateIntro(dto.getCarState().getStr());
} else {
askCarSheetDetailDownloadBean.setCarStateIntro("未找到");
}
// 添加數(shù)據(jù)
askCarSheetDetailDownloadBeanList.add(askCarSheetDetailDownloadBean);
});
// 處理下載
response.setContentType("application/vnd.ms-excel");
response.setCharacterEncoding("utf-8");
// 這里URLEncoder.encode可以防止中文亂碼 當(dāng)然和easyexcel沒有關(guān)系
String fileName = URLEncoder.encode("單據(jù)詳情信息", "UTF-8").replaceAll("\\+", "%20");
response.setHeader("Content-disposition", "attachment;filename*=utf-8''" + fileName + ".xlsx");
EasyExcel.write(response.getOutputStream(), AskCarSheetDetailDownloadBean.class).sheet("單據(jù)詳情信息").doWrite(askCarSheetDetailDownloadBeanList);
}
三借尿、導(dǎo)入Excel表格
(1)刨晴、核心處理方法
/**
* 上傳單據(jù)信息
*
* @param file 文件
* @return SimpleMessage
*/
@Override
@Transactional(rollbackFor = Exception.class)
public SimpleMessage uploadSheetInfo(MultipartFile file) throws IOException {
// 接收解析出的目標(biāo)對(duì)象(Student)
List<OutStockSheetBean> outStockSheetBeanList = new ArrayList<>();
// excel中表的列要與對(duì)象的字段相對(duì)應(yīng)
EasyExcel.read(file.getInputStream(), OutStockSheetBean.class, new AnalysisEventListener<OutStockSheetBean>() {
// 每解析一條數(shù)據(jù)都會(huì)調(diào)用該方法
@Override
public void invoke(OutStockSheetBean outStockSheetBean, AnalysisContext analysisContext) {
log.info("文件信息[{}]", JSON.toJSONString(outStockSheetBean));
outStockSheetBeanList.add(outStockSheetBean);
}
// 解析完畢的回調(diào)方法
@Override
public void doAfterAllAnalysed(AnalysisContext analysisContext) {
log.info("讀取完畢:共[{}]行", outStockSheetBeanList.size());
// 執(zhí)行后續(xù)操作
}
}).sheet().doRead();
}
return new SimpleMessage(ErrorCodeEnum.OK, "成功讀取" + outStockSheetBeanList.size() + "條數(shù)據(jù)");
}