JAVA中階培訓(xùn)(二)-RocketMQ與EasyExcel

前情提要

本次培訓(xùn)為JAVA中階培訓(xùn)(一)的拓展,主要講述RocketMQ的相關(guān)原理與使用旭蠕,以及附帶EasyExcel的使用方法,著重于提高Java初級(jí)程序員對(duì)Java技術(shù)的更加深層次的理解恼除,進(jìn)而培養(yǎng)自己的架構(gòu)思維兢仰,在今后的技術(shù)選型中能夠更加從容。

同JAVA中階(一)培訓(xùn)一樣贩挣,所有例子均來自項(xiàng)目實(shí)例喉前。

一、RocketMQ

1王财、什么是消息隊(duì)列卵迂?

在很久很久以前,人們之間的通信方式就是面對(duì)面交談绒净,你說一句见咒,我聽一句,雖然簡單可靠疯溺,但是弊端也很大论颅。

比如,當(dāng)你成為一個(gè)軍隊(duì)的首領(lǐng)囱嫩,每個(gè)屬下一有情況就立刻向你匯報(bào)恃疯,一個(gè)還好,但當(dāng)你的屬下有幾十個(gè)幾百個(gè)的時(shí)候墨闲,他們每天不分時(shí)間不看場合今妄,都在嘰嘰喳喳和你匯報(bào)情況,那你可能什么都聽不到鸳碧,而且腦袋都要炸掉了盾鳞。這個(gè)時(shí)候,你說停瞻离,都給我停下腾仅,要匯報(bào)情況的,去門口排隊(duì)套利,一個(gè)一個(gè)的來推励,這個(gè)就叫做流量削峰鹤耍,一群人不要一擁而上,都乖乖給我排隊(duì)去验辞。

然后你就一個(gè)接一個(gè)的聽稿黄,聽了整整24個(gè)小時(shí),實(shí)在困的不行跌造,尋思著這樣不行呀杆怕,如此下去可能就要天妒英才了,于是你又說壳贪,來人陵珍,發(fā)筆和紙,都把要匯報(bào)的消息寫在紙上撑碴,寫完后告訴呂秀才撑教,然后聽呂秀才的指示,沿著屋里右面墻根醉拓,按照指示的位置疊放整齊伟姐,匯報(bào)的人就可以退下該做啥做啥去吧,等我休息一下亿卤,再來看你們的匯報(bào)內(nèi)容愤兵,這就叫做異步處理,你終于可以由自己掌控消息獲取的進(jìn)度了排吴,美滋滋的去睡覺了秆乳。

而匯報(bào)的人把內(nèi)容寫在紙上,疊放好钻哩,就可以退下自己做自己該做的事情屹堰,而不是一直在門口等待匯報(bào),這個(gè)就叫做解耦街氢。

削峰扯键,異步,解耦珊肃。這就是消息隊(duì)列最常用的三大場景荣刑。

故事中的下屬們,就是消息生產(chǎn)者角色伦乔,屋子右面墻根那塊地就是消息持久化厉亏,呂秀才就是消息調(diào)度中心,而你就是消息消費(fèi)者角色烈和。下屬們匯報(bào)的消息爱只,應(yīng)該疊放在哪里,這個(gè)消息又應(yīng)該在哪里才能找到招刹,全靠呂秀才的驚人記憶力恬试,才可以讓消息準(zhǔn)確的被投放以及消費(fèi)沥匈。

正題

消息隊(duì)列作為高并發(fā)系統(tǒng)的核心組件之一,能夠幫助業(yè)務(wù)系統(tǒng)解構(gòu)提升開發(fā)效率和系統(tǒng)穩(wěn)定性忘渔。主要具有以下優(yōu)勢:

削峰填谷(主要解決瞬時(shí)寫壓力大于應(yīng)用服務(wù)能力導(dǎo)致消息丟失、系統(tǒng)奔潰等問題)
系統(tǒng)解耦(解決不同重要程度缰儿、不同能力級(jí)別系統(tǒng)之間依賴導(dǎo)致一死全死)
提升性能(當(dāng)存在一對(duì)多調(diào)用時(shí)畦粮,可以發(fā)一條消息給消息系統(tǒng),讓消息系統(tǒng)通知相關(guān)系統(tǒng))
蓄流壓測(線上有些鏈路不好壓測乖阵,可以通過堆積一定量消息再放開來壓測)
目前主流的MQ主要是Rocketmq宣赔、kafka、Rabbitmq瞪浸,Rocketmq相比于Rabbitmq儒将、kafka具有主要優(yōu)勢特性有:
? 支持事務(wù)型消息(消息發(fā)送和DB操作保持兩方的最終一致性,rabbitmq和kafka不支持)
? 支持結(jié)合rocketmq的多個(gè)系統(tǒng)之間數(shù)據(jù)最終一致性(多方事務(wù)对蒲,二方事務(wù)是前提)
? 支持18個(gè)級(jí)別的延遲消息(rabbitmq和kafka不支持)
? 支持指定次數(shù)和時(shí)間間隔的失敗消息重發(fā)(kafka不支持钩蚊,rabbitmq需要手動(dòng)確認(rèn))
? 支持consumer端tag過濾,減少不必要的網(wǎng)絡(luò)傳輸(rabbitmq和kafka不支持)
? 支持重復(fù)消費(fèi)(rabbitmq不支持蹈矮,kafka支持)

Rocketmq砰逻、kafka、Rabbitmq的詳細(xì)對(duì)比泛鸟,請參照下表格:


各MQ對(duì)比

2蝠咆、RocketMQ詳解

部署結(jié)構(gòu)

我們來類比一下現(xiàn)實(shí)生活,有一個(gè)人想要給另外一個(gè)人寄快件北滥,那么就需要先由這個(gè)人在網(wǎng)上查詢有哪些郵局刚操,然后選擇其中一個(gè)郵局,把快件投遞給它再芋,再由郵局配送到目標(biāo)人菊霜。

形象比喻圖

1、路由注冊
郵局在竣工后祝闻,需要與衛(wèi)星聯(lián)網(wǎng)占卧,將自己納入衛(wèi)星網(wǎng)絡(luò)管理中,這樣就相當(dāng)于對(duì)外宣布联喘,我這個(gè)郵局開始運(yùn)營了华蜒,可以收發(fā)郵件快遞了。 郵局并網(wǎng)之后,如何讓衛(wèi)星持續(xù)并及時(shí)感知這個(gè)郵局在線以及郵局自身信息的調(diào)整,使衛(wèi)星可以隨時(shí)協(xié)調(diào)這個(gè)郵局呢叠蝇?

這個(gè)時(shí)候就需要郵局定時(shí)向衛(wèi)星發(fā)一條信息: “嗶嗶嗶————我是郵局C呆细,編號(hào)SHC爷光,地址XXXXX形葬,歸屬中國上海集群合蔽,在線料皇,此時(shí)此刻2019年3月15日13點(diǎn)21秒”

衛(wèi)星接收到消息后啥辨,拿個(gè)小本本記錄下來:

“郵局B涡匀,BJB,北京溉知,2019年3月15日13點(diǎn)10秒陨瘩,活著...”

“郵局A,BJA级乍,北京舌劳,2019年3月15日13點(diǎn)15秒,活著...”

“郵局C玫荣,SHC甚淡,上海,2019年3月15日13點(diǎn)21秒捅厂,活著...”

路由注冊示意圖

上面這個(gè)故事贯卦,就講述了NameServer路由注冊的基本原理。

NameServer就相當(dāng)于衛(wèi)星恒傻,內(nèi)部會(huì)維護(hù)一個(gè)Broker表脸侥,用來動(dòng)態(tài)存儲(chǔ)Broker的信息。 而Broker就相當(dāng)于郵局盈厘,在啟動(dòng)的時(shí)候睁枕,會(huì)先遍歷NameServer列表,依次發(fā)起注冊請求沸手,保持長連接外遇,然后每隔30秒向NameServer發(fā)送心跳包,心跳包中包含BrokerId契吉、Broker地址跳仿、Broker名稱、Broker所屬集群名稱等等捐晶,然后NameServer接收到心跳包后菲语,會(huì)更新時(shí)間戳,記錄這個(gè)Broker的最新存活時(shí)間惑灵。

NameServer在處理心跳包的時(shí)候山上,存在多個(gè)Broker同時(shí)操作一張Broker表,為了防止并發(fā)修改Broker表導(dǎo)致不安全英支,路由注冊操作引入了ReadWriteLock讀寫鎖佩憾,這個(gè)設(shè)計(jì)亮點(diǎn)允許多個(gè)消息生產(chǎn)者并發(fā)讀,保證了消息發(fā)送時(shí)的高并發(fā),但是同一時(shí)刻N(yùn)ameServer只能處理一個(gè)Broker心跳包妄帘,多個(gè)心跳包串行處理楞黄。這也是讀寫鎖的經(jīng)典使用場景,即讀多寫少抡驼。

2鬼廓、路由剔除

路由剔除

忽然有一天,郵局C的機(jī)房進(jìn)老鼠了致盟,咬斷電源線宕機(jī)了桑阶,而衛(wèi)星不知道郵局C業(yè)務(wù)故障了,依舊將帶有郵局C的郵局表信息傳給寄件人(生產(chǎn)者)勾邦,寄件人聯(lián)系郵局C發(fā)送快件,但是郵局C機(jī)房宕機(jī)割择,業(yè)務(wù)暫停眷篇,處于癱瘓狀態(tài),自然也就無法接收快件了荔泳。 另一方面蕉饼,因?yàn)榭旒茨鼙秽]局C收入,也就無法將快件轉(zhuǎn)交給收件人玛歌,顧客們久久等不到自己的快件昧港,紛紛投訴,為此郵局C的管理層備受責(zé)難支子。

于是郵政總局技術(shù)部開始研究討論创肥,怎么讓衛(wèi)星可以感知到郵局“失聯(lián)了”,從而自動(dòng)排除故障郵局值朋,將其負(fù)責(zé)的業(yè)務(wù)交付給其他正常的郵局處理叹侄,這樣就不會(huì)因?yàn)槟骋粋€(gè)郵局出現(xiàn)問題,而導(dǎo)致這個(gè)郵局所管轄的部分業(yè)務(wù)無法處理昨登。

大家眾說紛紜趾代,最后敲定了一個(gè)方案,讓衛(wèi)星每隔一段時(shí)間掃描郵局信息表丰辣,如果發(fā)現(xiàn)某個(gè)郵局上報(bào)信息時(shí)間與當(dāng)時(shí)掃描時(shí)間之間的差值超過了某個(gè)預(yù)設(shè)的閾值撒强,就判定這個(gè)郵局“失聯(lián)了”,將此郵局信息從郵局表中剔除笙什。這樣寄件人查詢到的郵局表里都是正常營業(yè)的郵局信息飘哨。

新功能上線運(yùn)營后,效果不錯(cuò)得湘,大家再也不用擔(dān)心因?yàn)槟硞€(gè)郵局故障而導(dǎo)致業(yè)務(wù)停滯杖玲,又過上了泡茶報(bào)紙的生活。

這個(gè)故事同樣在RocketMQ中上演淘正。

正常情況下摆马,如果Broker關(guān)閉臼闻,則會(huì)與NameServer斷開長連接,Netty的通道關(guān)閉監(jiān)聽器會(huì)監(jiān)聽到連接斷開事件囤采,然后會(huì)將這個(gè)Broker信息剔除掉述呐。

異常情況下,NameServer中有一個(gè)定時(shí)任務(wù)蕉毯,每隔10秒掃描一下Broker表乓搬,如果某個(gè)Broker的心跳包最新時(shí)間戳距離當(dāng)前時(shí)間超多120秒,也會(huì)判定Broker失效并將其移除代虾。

細(xì)心的人會(huì)發(fā)現(xiàn)一個(gè)問題进肯,NameServer在清除失活Broker之后,并沒有主動(dòng)通知生產(chǎn)者棉磨,生產(chǎn)者每隔30秒會(huì)請求NameServer并獲取最新的路由表江掩,那么就意味著,消息生產(chǎn)者總會(huì)有30秒的延時(shí)乘瓤,無法實(shí)時(shí)感知Broker服務(wù)器的宕機(jī)环形。所以在這個(gè)30秒里,生產(chǎn)者依舊會(huì)向失活Broker發(fā)送消息衙傀,那么消息發(fā)送的高可用性如何保證呢抬吟?

3、Broker高可用原理

要解決這個(gè)問題得首先談一談Broker的負(fù)載策略统抬,消息發(fā)送隊(duì)列默認(rèn)采用輪詢機(jī)制火本,消息發(fā)送時(shí)默認(rèn)選擇異常重試機(jī)制來保證消息發(fā)送的高可用。當(dāng)Broker宕機(jī)后聪建,雖然消息發(fā)送者無法第一時(shí)間感知Broker 宕機(jī)发侵,但是當(dāng)消息生產(chǎn)者向Broker發(fā)送消息返回異常后,消息生產(chǎn)者會(huì)選擇另外一個(gè)Broker上的消息隊(duì)列妆偏,這樣就規(guī)避了發(fā)生故障的Broker刃鳄,結(jié)合重試機(jī)制,巧妙實(shí)現(xiàn)消息發(fā)送的高可用钱骂,同時(shí)由于不需要NameServer通知眾多不固定的生產(chǎn)者叔锐,也降低了NameServer實(shí)現(xiàn)的復(fù)雜性。

2见秽、在降低NameServer實(shí)現(xiàn)復(fù)雜性方面愉烙,還有一個(gè)設(shè)計(jì)亮點(diǎn)就是NameServer之間是彼此獨(dú)立無交流的,也就是說NameServer服務(wù)器之間在某個(gè)時(shí)刻的數(shù)據(jù)并不會(huì)完全相同解取,但是異常重試機(jī)制使得這種差異不會(huì)造成任何影響步责。


NameServer彼此無交流圖示

4、路由發(fā)現(xiàn)
天上的衛(wèi)星是有限的,不易變的蔓肯,而地上的寄件人是繁多的遂鹊,易變的。所以寄件人想要知道有哪些郵局蔗包,很明顯最適合的方式是向衛(wèi)星發(fā)請求秉扑,拉取郵局表信息,而不是等衛(wèi)星給每個(gè)人推送调限。 所以在RocketMQ中舟陆,NameServer是不主動(dòng)推送會(huì)客戶端的,而是由客戶端拉取主題的最新路由信息耻矮。

路由發(fā)現(xiàn)示意圖

5秦躯、CAP理論
NameServer作為注冊和發(fā)現(xiàn)中心,是整個(gè)分布式消息隊(duì)列調(diào)度的靈魂裆装,談及到分布式宦赠,就逃不開CAP理論,C是Consistency(一致性)米母,A是Availability(可用性),P是(容錯(cuò)性)毡琉,對(duì)于分布式架構(gòu)铁瞒,網(wǎng)絡(luò)條件不可控,出現(xiàn)網(wǎng)絡(luò)分區(qū)是不可避免的桅滋,因此必須具備分區(qū)容錯(cuò)性慧耍,那么NameServer就是在AP還是CP中選擇了,由于NameServer之間相互獨(dú)立丐谋,很明顯芍碧,是一個(gè)AP設(shè)計(jì)。

正題

1) Name Server(路由注冊中?)(呂秀才)(衛(wèi)星)

Name Server是一個(gè)幾乎無狀態(tài)節(jié)點(diǎn)号俐,可集群部署泌豆,節(jié)點(diǎn)之間無任何信息同步。

2) Broker (消息存儲(chǔ)服務(wù)器)(墻根)(郵局)

Broker部署相對(duì)復(fù)雜吏饿,Broker分為Master與Slave踪危,一個(gè)Master可以對(duì)應(yīng)多個(gè)Slave,但是一個(gè)Slave只能對(duì)應(yīng)一個(gè)Master猪落,Master與Slave的對(duì)應(yīng)關(guān)系通過指定相同的Broker Name贞远,不同的Broker Id來定義,BrokerId為0表示Master笨忌,非0表示Slave蓝仲。Master也可以部署多個(gè)。

每個(gè)Broker與Name Server集群中的所有節(jié)點(diǎn)建立長連接,定時(shí)(每隔30s)注冊Topic信息到所有Name Server袱结。Name Server定時(shí)(每隔10s)掃描所有存活broker的連接亮隙,如果Name Server超過2分鐘沒有收到心跳,則Name Server斷開與Broker的連接擎勘。

3) Producer(消息?產(chǎn)者咱揍,?于向消息服務(wù)器發(fā)送消息)(士兵)(發(fā)件人)

Producer與Name Server集群中的其中一個(gè)節(jié)點(diǎn)(隨機(jī)選擇)建立長連接,定期從Name Server取Topic路由信息棚饵,并向提供Topic服務(wù)的Master建立長連接煤裙,且定時(shí)向Master發(fā)送心跳。Producer完全無狀態(tài)噪漾,可集群部署硼砰。

Producer每隔30s(由ClientConfig的pollNameServerInterval)從Name server獲取所有topic隊(duì)列的最新情況,這意味著如果Broker不可用欣硼,Producer最多30s能夠感知题翰,在此期間內(nèi)發(fā)往Broker的所有消息都會(huì)失敗。

Producer每隔30s(由ClientConfig中heartbeatBrokerInterval決定)向所有關(guān)聯(lián)的broker發(fā)送心跳诈胜,Broker每隔10s中掃描所有存活的連接豹障,如果Broker在2分鐘內(nèi)沒有收到心跳數(shù)據(jù),則關(guān)閉與Producer的連接焦匈。

4) Consumer(消息消費(fèi)者)(你-將軍)(收件人)

Consumer與Name Server集群中的其中一個(gè)節(jié)點(diǎn)(隨機(jī)選擇)建立長連接血公,定期從Name Server取Topic路由信息,并向提供Topic服務(wù)的Master缓熟、Slave建立長連接累魔,且定時(shí)向Master、Slave發(fā)送心跳够滑。Consumer既可以從Master訂閱消息垦写,也可以從Slave訂閱消息,訂閱規(guī)則由Broker配置決定彰触。

Consumer每隔30s從Name server獲取topic的最新隊(duì)列情況梯投,這意味著Broker不可用時(shí),Consumer最多最需要30s才能感知况毅。

Consumer每隔30s(由ClientConfig中heartbeatBrokerInterval決定)向所有關(guān)聯(lián)的broker發(fā)送心跳晚伙,Broker每隔10s掃描所有存活的連接,若某個(gè)連接2分鐘內(nèi)沒有發(fā)送心跳數(shù)據(jù)俭茧,則關(guān)閉連接咆疗;并向該Consumer Group的所有Consumer發(fā)出通知,Group內(nèi)的Consumer重新分配隊(duì)列母债,然后繼續(xù)消費(fèi)午磁。

當(dāng)Consumer得到master宕機(jī)通知后尝抖,轉(zhuǎn)向slave消費(fèi),slave不能保證master的消息100%都同步過來了迅皇,因此會(huì)有少量的消息丟失昧辽。但是一旦master恢復(fù),未同步過去的消息會(huì)被最終消費(fèi)掉登颓。

消費(fèi)者對(duì)列是消費(fèi)者連接之后(或者之前有連接過)才創(chuàng)建的搅荞。我們將原生的消費(fèi)者標(biāo)識(shí)由 {IP}@{消費(fèi)者group}擴(kuò)展為 {IP}@{消費(fèi)者group}{topic}{tag},(例如xxx.xxx.xxx.xxx@mqtest_producer-group_2m2sTest_tag-zyk)框咙。任何一個(gè)元素不同咕痛,都認(rèn)為是不同的消費(fèi)端,每個(gè)消費(fèi)端會(huì)擁有一份自己消費(fèi)對(duì)列(默認(rèn)是broker對(duì)列數(shù)量*broker數(shù)量)喇嘱。新掛載的消費(fèi)者對(duì)列中擁有commitlog中的所有數(shù)據(jù)茉贡。

3、實(shí)際代碼

一者铜、基礎(chǔ)配置

(1)腔丧、MAVEN 引入(使用阿里云的RocketMQ付費(fèi)服務(wù),不自行搭建)

        <dependency>
            <groupId>com.aliyun.openservices</groupId>
            <artifactId>ons-client</artifactId>
            <version>1.2.6</version>
        </dependency>

(2)作烟、配置相應(yīng)的賬號(hào)信息(mq-config.properties)

ProducerId=保密
ConsumerId=保密
Topic=保密
AccessKey=保密
SecretKey=保密
expression=*

二愉粤、生產(chǎn)者使用

(1)、生產(chǎn)者發(fā)送消息核心方法

/**
 * MQ生產(chǎn)者-發(fā)布
 *
 * @author 陳豆豆
 * @date 2020-03-31
 */
@Slf4j
public class RocketMqProducer {

    /**
     * 消息發(fā)送
     *
     * @param producer 生產(chǎn)者
     * @param topic    主體
     * @param bean     消息實(shí)體
     * @throws UnsupportedEncodingException 編碼格式異常
     * @throws JsonProcessingException      格式轉(zhuǎn)換異常
     */
    public static void rocketMqSend(Producer producer, String topic, BaseRocketMqBean bean) throws UnsupportedEncodingException, JsonProcessingException {
        SendResult sendResult = producer.send(new Message(topic, bean.getTag(), bean.getKey(), bean.parseToRocketBytes()));
        log.info("[{}] RocketMQ發(fā)送消息 [{}] 反饋信息 [{}]", topic, bean.toString(), sendResult);
    }
}

(2)拿撩、生產(chǎn)者發(fā)送消息組裝核心方法


/**
 * 超級(jí)店長生產(chǎn)者
 *
 * @author 陳豆豆
 * @date 2020-03-31
 */
@Slf4j
@Service
public class OrangeStoreProducer {
    private static final BlockingQueue<BaseRocketMqBean> QUEUE = new LinkedBlockingQueue<>();
    private static Producer producer;
    private static String Topic = "";

    static {
        try {
            Properties properties = new Properties();
            properties.load(OrangeStoreProducer.class.getClassLoader().getResourceAsStream("mq-config.properties"));
            properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);
            Topic = properties.getProperty("Topic");
            producer = ONSFactory.createProducer(properties);
            producer.start();
            log.info("超級(jí)店長生產(chǎn)隊(duì)列啟動(dòng)");
        } catch (IOException e) {
            log.error("超級(jí)店長生產(chǎn)隊(duì)列啟動(dòng)異常: {}", e.getLocalizedMessage(), e);
        }
    }

    public static void addToQueue(BaseRocketMqBean bean) {
        bean.setTag("MS_orange_store_tag");
        bean.setKey("MS_orange_store");
        QUEUE.offer(bean);
    }

    @PostConstruct
    @SuppressWarnings("all")
    public void batchSend() {
        Thread thread = new Thread(() -> {
            {
                while (true) {
                    try {
                        // 超過1秒無法獲取數(shù)據(jù)就等待下次獲取
                        BaseRocketMqBean bean = QUEUE.poll(1, TimeUnit.SECONDS);
                        if (bean == null) {
                            continue;
                        }
                        RocketMqProducer.rocketMqSend(producer, Topic, bean);
                    } catch (Exception e) {
                        log.error("同步數(shù)據(jù)到超級(jí)店長生產(chǎn)隊(duì)列異常:{}", e.getLocalizedMessage(), e);
                    }
                }
            }
        });
        thread.start();
    }
}

添加隊(duì)列方法
  • 講解點(diǎn)
    RocketMQ消息分發(fā)的兩種模式
    廣播模式(BROADCASTING)和 集群模式(CLUSTERING)


    集群模式-平均分?jǐn)偹惴ǎJ(rèn))

    集群模式-環(huán)狀輪流算法

    廣播模式

(3)衣厘、消息傳遞結(jié)構(gòu)體

基礎(chǔ)結(jié)構(gòu)體

/**
 * 阿里云rocketMq隊(duì)列實(shí)體bean父類
 *
 * @author 陳豆豆
 * @date 2020-03-31
 */
@Data
public abstract class BaseRocketMqBean implements Serializable {
    private static final long serialVersionUID = 29723539849425729L;
    /**
     * 隊(duì)列tag
     */
    private String tag;
    /**
     * 隊(duì)列messageKey
     */
    private String key;

    /**
     * bean 轉(zhuǎn)換成阿里云json
     *
     * @return json
     * @throws JsonProcessingException 格式轉(zhuǎn)換異常
     */
    public byte[] parseToRocketBytes() throws JsonProcessingException, UnsupportedEncodingException {
        return new ObjectMapper().writeValueAsString(this).getBytes("UTF-8");
    }
}

同步數(shù)據(jù)結(jié)構(gòu)體

/**
 * 同步數(shù)據(jù)專用bean
 *
 * @author 陳豆豆
 * @date 2020/4/01 10:04
 */
public class SyncMessageBean<T> extends BaseRocketMqBean implements Serializable {
    private static final long serialVersionUID = -1535244308014581262L;
    /**
     * 操作結(jié)果
     */
    @JsonIgnore
    private Boolean success;
    /**
     * 標(biāo)題
     */
    private String title;
    /**
     * 提示消息
     */
    private String errorMsg;
    /**
     * 返回碼
     */
    @JsonIgnore
    private Integer errorCode;
    /**
     * 對(duì)象實(shí)體
     */
    private Object data;

    public SyncMessageBean() {
        super();
        this.success = success;
        this.errorMsg = errorMsg;
    }

    public SyncMessageBean(Boolean success, String message, T data, String title) {
        super();
        this.success = success;
        this.errorMsg = message;
        this.data = data;
        this.title = title;
    }

    public SyncMessageBean(boolean success, String message) {
        super();
        this.title = "操作提示";
        this.success = success;
        this.errorMsg = message;
    }

    public SyncMessageBean(Exception ex) {
        super();
        this.success = false;
        this.errorMsg = StringUtils.defaultIfBlank(ex.getMessage(), "程序異常");
    }

    public static SyncMessageBean success() {
        SyncMessageBean messageBean = new SyncMessageBean();
        messageBean.setSuccess(true);
        messageBean.setErrorMsg("操作成功");
        return messageBean;
    }

    public static SyncMessageBean success(String msg) {
        SyncMessageBean messageBean = new SyncMessageBean();
        messageBean.setSuccess(true);
        messageBean.setErrorMsg(msg);
        return messageBean;
    }

    public static SyncMessageBean fail(String msg) {
        SyncMessageBean messageBean = new SyncMessageBean();
        messageBean.setSuccess(false);
        messageBean.setErrorCode(ErrorCodeEnum.ERROR.getCode());
        messageBean.setErrorMsg(msg);
        messageBean.setTitle();
        return messageBean;
    }

    public static SyncMessageBean fail() {
        return fail("操作失敗");
    }

    public Boolean getSuccess() {
        return success;
    }

    public void setSuccess(Boolean success) {
        this.success = success;
    }

    public String getTitle() {
        return title;
    }

    public void setTitle(String title) {
        this.title = title;
    }

    public void setTitle() {
        this.title = "ERROR";
    }

    public String getErrorMsg() {
        return errorMsg;
    }

    public void setErrorMsg(String errorMsg) {
        this.errorMsg = errorMsg;
    }

    public Integer getErrorCode() {
        return errorCode;
    }

    public void setErrorCode(Integer errorCode) {
        this.errorCode = errorCode;
    }

    public Object getData() {
        return data;
    }

    public void setData(Object data) {
        this.data = data;
    }

    @Override
    public String toString() {
        return "MessageBean{" +
                "success=" + success +
                ", title='" + title + '\'' +
                ", errorMsg='" + errorMsg + '\'' +
                ", errorCode=" + errorCode +
                ", data=" + data +
                '}';
    }
}

(4)、生產(chǎn)消息類型

/**
 * 同步類型枚舉
 *
 * @author 陳豆豆
 * @date 2020/4/1 10:00
 */
public enum ProducerTypeEnum {
    /**
     * 店員信息同步
     */
    POS_CASHIER(0, "POS_CASHIER"),
    /**
     * 門店電費(fèi)信息同步
     */
    STORE_ELECTRICITY(1, "STORE_ELECTRICITY"),
  ;

    /**
     * 編碼
     */
    private Integer code;
    /**
     * 名稱
     */
    private String name;

    ProducerTypeEnum(Integer code, String name) {
        this.code = code;
        this.name = name;
    }

    public Integer getCode() {
        return code;
    }

    public void setCode(Integer code) {
        this.code = code;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }
}

(5)绷雏、調(diào)用方法

    /**
     * 同步門店信息到物聯(lián)網(wǎng)系統(tǒng)
     *
     * @param storeCode 門店編號(hào)
     */
    @Override
    public void syncStoreForSmart(String storeCode) {
        // 查詢對(duì)應(yīng)的門店信息
        StoreForSmart store = storeInfoDao.queryStoreForSmart(storeCode);
        if (Objects.isNull(store)) {
            return;
        }
        SyncMessageBean messageBean = new SyncMessageBean();
        messageBean.setErrorMsg(ProducerTypeEnum.SYNC_STORE_TO_SMART.getName());
        messageBean.setData(JSON.toJSONString(store));
        OrangeStoreProducer.addToQueue(messageBean);
    }

三、消費(fèi)者

(1)怖亭、接受消息核心組件

/**
 * @description:接受消息核心組件
 * @author: tangn
 * @time: 2021/2/7 15:14
 */
@Slf4j
@Component
public class ReceiveHandleComponent {

    public static final BlockingQueue<SyncMessageBean> RECEIVE_HANDLE = new LinkedBlockingQueue<>();
    @Resource
    private ConsumerService consumerService;

    /**
     * 創(chuàng)建線程池
     */
    private static final ThreadFactory THREAD_FACTORY = new ThreadFactoryBuilder()
            .setNameFormat("ReceiveHandleComponent-%d").setDaemon(true).build();
    private static final ExecutorService RECEIVE_HANDLE_POLL = Executors.newFixedThreadPool(16, THREAD_FACTORY);

    @PostConstruct
    public void judgeFace() {
        //執(zhí)行線程
        RECEIVE_HANDLE_POLL.execute(() -> {
            while (true) {
                try {
                    // 設(shè)置隊(duì)列超過1秒無法獲取數(shù)據(jù)就等待下次獲取
                    SyncMessageBean syncMessageBean = RECEIVE_HANDLE.poll(1, TimeUnit.SECONDS);
                    if (Objects.isNull(syncMessageBean)) {
                        continue;
                    }
                    consumerService.receiveHandle(syncMessageBean);
                } catch (Exception e) {
                    log.warn("訂閱處理線程異常[{}]", e.getLocalizedMessage(), e);
                }
            }
        });
    }
}

(2)涎显、消費(fèi)者注冊

/**
 * @description:消費(fèi)者注冊
 * @author: tangn
 * @time: 2021/2/7 15:09
 */
@Slf4j
@Component
public class QianYunDataConsumer extends ConsumerBean {
    private final QianYunDataListener qianYunDataListener;

    @Autowired
    public QianYunDataConsumer(QianYunDataListener qianYunDataListener) {
        this.qianYunDataListener = qianYunDataListener;
    }

    @SuppressWarnings("Duplicates")
    private Subscription postInit() {
        Subscription subscription = new Subscription();
        try {
            Properties properties = new Properties();
            properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);
            properties.load(this.getClass().getClassLoader().getResourceAsStream("mq-config.properties"));
            this.setProperties(properties);

            String topic = properties.getProperty("Topic");
            String expression = properties.getProperty("expression");
            subscription.setExpression(expression);
            subscription.setTopic(topic);
            log.info("消費(fèi)者配置文件加載成功");
        } catch (IOException e) {
            log.error("消費(fèi)者初始化,配置文件加載異常 {}", e.getLocalizedMessage(), e);
        }
        return subscription;
    }

    @PostConstruct
    @Override
    public void start() {
        Map<Subscription, MessageListener> consumers = new HashMap<>(1);
        consumers.put(postInit(), qianYunDataListener);
        this.setSubscriptionTable(consumers);
        log.info("消費(fèi)者啟動(dòng)");
        super.start();
    }

    @PreDestroy
    @Override
    public void shutdown() {
        log.info("消費(fèi)者關(guān)閉");
        super.shutdown();
    }
}

(4)、消費(fèi)者消息監(jiān)聽

/**
 * @description:消費(fèi)者消息監(jiān)聽
 * @author: tangning
 * @time: 2021/2/7 15:12
 */
@Slf4j
@Service
public class QianYunDataListener implements MessageListener {
    @Override
    public Action consume(Message message, ConsumeContext consumeContext) {
        try {
            log.info("[{}] 消費(fèi)隊(duì)列接收信息,body [{}] ", message, message == null ? "null" : new String(message.getBody(), "ISO8859-1"));
            SyncMessageBean messageBean = MqTextUtil.convertMessageToBean(message, SyncMessageBean.class);
            log.info("messageId [{}] 訂閱信息返回 [{}]", message.getMsgID(), messageBean);
            if (Objects.nonNull(messageBean)){
                ReceiveHandleComponent.RECEIVE_HANDLE.offer(messageBean);
            }
            return Action.CommitMessage;
        } catch (ErrorCodeException e) {
            log.warn("[{}] 消費(fèi)者異常 [{}]", message, e.getLocalizedMessage(), e);
            return Action.ReconsumeLater;
        } catch (Throwable e) {
            log.error("[{}] 消費(fèi)者異常 [{}]", message, e.getLocalizedMessage(), e);
            return Action.ReconsumeLater;
        }
    }
}

(5)兴猩、消費(fèi)者消息處理(根據(jù)自己的業(yè)務(wù)進(jìn)行處理)

/**
 * @description: 訂閱處理
 * @author: MA XIN
 * @time: 2021/2/7 15:26
 */
@Slf4j
@Service
public class ConsumerServiceImpl implements ConsumerService {

    @Resource
    private StoreService storeService;

    /**
     * 訂閱處理
     *
     * @param messageBean 訂閱消息
     */
    @Override
    public void receiveHandle(SyncMessageBean<T> messageBean) {
        // 門店信息同步
        if (ConsumerTypeEnum.SYNC_STORE_TO_SMART.getName().equals(messageBean.getErrorMsg())) {
            StoreInfo storeInfo = JSON.parseObject(messageBean.getData().toString(), StoreInfo.class);
            log.info("門店信息同步訂閱解析[{}]", storeInfo);
            SimpleMessage simpleMessage = null;
            try {
                simpleMessage = storeService.syncStoreInfo(storeInfo);
            } catch (Exception e) {
                log.error("門店信息同步異常[{}]", storeInfo, e);
            }
            log.info("店信息同步結(jié)果[{}],[{}]", storeInfo, simpleMessage);
        }
    }
}

二期吓、EasyExcel 使用

優(yōu)勢:EasyExcel是一個(gè)基于Java的簡單、省內(nèi)存的讀寫Excel的開源項(xiàng)目倾芝。在盡可能節(jié)約內(nèi)存的情況下支持讀寫百M(fèi)的Excel讨勤。 github地址:https://github.com/alibaba/easyexcel

直接上代碼

1、基礎(chǔ)引入

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>easyexcel</artifactId>
            <version>1.1.2-beat1</version>
        </dependency>

2晨另、導(dǎo)出EXCEL

(1)潭千、下載結(jié)構(gòu)體

/**
 * @author: 唐寧
 * @date: 2019/5/4
 * @time: 21:05
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@JsonInclude(JsonInclude.Include.NON_NULL)
public class AskCarSheetDetailDownloadBean implements Serializable {
    private static final long serialVersionUID = -7779152957404073436L;
    /**
     * 車輛VIN碼
     */
    @ExcelProperty("VIN碼")
    private String carCode;
    /**
     * 車輛狀態(tài)
     */
    @ExcelProperty("車輛狀態(tài)")
    private String carStateIntro;
    /**
     * 倉庫名稱
     */
    @ExcelProperty("倉庫名稱")
    private String areaName;
    /**
     * 區(qū)域名稱
     */
    @ExcelProperty("區(qū)域名稱")
    private String positionName;
    /**
     * 第幾排
     */
    @ExcelProperty("第幾排")
    private Integer row;
    /**
     * 第幾列
     */
    @ExcelProperty("第幾列")
    private Integer position;

}

(2)、核心代碼及使用方法

    /**
     * 下載單據(jù)詳情數(shù)據(jù)
     *
     * @param response 響應(yīng)
     * @param dto      查詢參數(shù)
     * @throws IOException
     */
    @Override
    public void downloadAskCarSheetDetail(HttpServletResponse response, AskCarSheetDetailDTO dto) throws IOException {
        List<AskCarSheetDetailVO> askCarSheetDetailList = sheetDao.getAskCarSheetDetailList(dto);
        // 處理下載數(shù)據(jù)
        List<AskCarSheetDetailDownloadBean> askCarSheetDetailDownloadBeanList = new ArrayList<>();
        askCarSheetDetailList.forEach(askCarSheetDetailVO -> {
            // 構(gòu)建基礎(chǔ)
            AskCarSheetDetailDownloadBean askCarSheetDetailDownloadBean = AskCarSheetDetailDownloadBean.builder()
                    .carCode(askCarSheetDetailVO.getCarCode())
                    .areaName(askCarSheetDetailVO.getAreaName())
                    .positionName(askCarSheetDetailVO.getPositionName())
                    .row(askCarSheetDetailVO.getRow())
                    .position(askCarSheetDetailVO.getPosition())
                    .build();
            // 車輛狀態(tài)
            if (Objects.nonNull(dto.getCarState())) {
                askCarSheetDetailDownloadBean.setCarStateIntro(dto.getCarState().getStr());
            } else {
                askCarSheetDetailDownloadBean.setCarStateIntro("未找到");
            }
            // 添加數(shù)據(jù)
            askCarSheetDetailDownloadBeanList.add(askCarSheetDetailDownloadBean);
        });
        // 處理下載
        response.setContentType("application/vnd.ms-excel");
        response.setCharacterEncoding("utf-8");
        // 這里URLEncoder.encode可以防止中文亂碼 當(dāng)然和easyexcel沒有關(guān)系
        String fileName = URLEncoder.encode("單據(jù)詳情信息", "UTF-8").replaceAll("\\+", "%20");
        response.setHeader("Content-disposition", "attachment;filename*=utf-8''" + fileName + ".xlsx");
        EasyExcel.write(response.getOutputStream(), AskCarSheetDetailDownloadBean.class).sheet("單據(jù)詳情信息").doWrite(askCarSheetDetailDownloadBeanList);
    }

三借尿、導(dǎo)入Excel表格

(1)刨晴、核心處理方法

 /**
     * 上傳單據(jù)信息
     *
     * @param file 文件
     * @return SimpleMessage
     */
    @Override
    @Transactional(rollbackFor = Exception.class)
    public SimpleMessage uploadSheetInfo(MultipartFile file) throws IOException {
        // 接收解析出的目標(biāo)對(duì)象(Student)
        List<OutStockSheetBean> outStockSheetBeanList = new ArrayList<>();
        // excel中表的列要與對(duì)象的字段相對(duì)應(yīng)
        EasyExcel.read(file.getInputStream(), OutStockSheetBean.class, new AnalysisEventListener<OutStockSheetBean>() {
            // 每解析一條數(shù)據(jù)都會(huì)調(diào)用該方法
            @Override
            public void invoke(OutStockSheetBean outStockSheetBean, AnalysisContext analysisContext) {
                log.info("文件信息[{}]", JSON.toJSONString(outStockSheetBean));
                outStockSheetBeanList.add(outStockSheetBean);
            }

            // 解析完畢的回調(diào)方法
            @Override
            public void doAfterAllAnalysed(AnalysisContext analysisContext) {
                log.info("讀取完畢:共[{}]行", outStockSheetBeanList.size());
                // 執(zhí)行后續(xù)操作
            }
        }).sheet().doRead();
    }
 return new SimpleMessage(ErrorCodeEnum.OK, "成功讀取" + outStockSheetBeanList.size() + "條數(shù)據(jù)");
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末屉来,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子狈癞,更是在濱河造成了極大的恐慌茄靠,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,122評(píng)論 6 505
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件蝶桶,死亡現(xiàn)場離奇詭異慨绳,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)真竖,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,070評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門脐雪,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人疼邀,你說我怎么就攤上這事喂江。” “怎么了旁振?”我有些...
    開封第一講書人閱讀 164,491評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵获询,是天一觀的道長。 經(jīng)常有香客問我拐袜,道長吉嚣,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,636評(píng)論 1 293
  • 正文 為了忘掉前任蹬铺,我火速辦了婚禮尝哆,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘甜攀。我一直安慰自己秋泄,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,676評(píng)論 6 392
  • 文/花漫 我一把揭開白布规阀。 她就那樣靜靜地躺著恒序,像睡著了一般。 火紅的嫁衣襯著肌膚如雪谁撼。 梳的紋絲不亂的頭發(fā)上歧胁,一...
    開封第一講書人閱讀 51,541評(píng)論 1 305
  • 那天,我揣著相機(jī)與錄音厉碟,去河邊找鬼喊巍。 笑死,一個(gè)胖子當(dāng)著我的面吹牛箍鼓,可吹牛的內(nèi)容都是我干的崭参。 我是一名探鬼主播,決...
    沈念sama閱讀 40,292評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼款咖,長吁一口氣:“原來是場噩夢啊……” “哼阵翎!你這毒婦竟也來了逢并?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,211評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤郭卫,失蹤者是張志新(化名)和其女友劉穎砍聊,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體贰军,經(jīng)...
    沈念sama閱讀 45,655評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡玻蝌,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,846評(píng)論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了词疼。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片俯树。...
    茶點(diǎn)故事閱讀 39,965評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖贰盗,靈堂內(nèi)的尸體忽然破棺而出许饿,到底是詐尸還是另有隱情,我是刑警寧澤舵盈,帶...
    沈念sama閱讀 35,684評(píng)論 5 347
  • 正文 年R本政府宣布陋率,位于F島的核電站,受9級(jí)特大地震影響秽晚,放射性物質(zhì)發(fā)生泄漏瓦糟。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,295評(píng)論 3 329
  • 文/蒙蒙 一赴蝇、第九天 我趴在偏房一處隱蔽的房頂上張望菩浙。 院中可真熱鬧,春花似錦句伶、人聲如沸劲蜻。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,894評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽先嬉。三九已至,卻和暖如春秃殉,著一層夾襖步出監(jiān)牢的瞬間坝初,已是汗流浹背浸剩。 一陣腳步聲響...
    開封第一講書人閱讀 33,012評(píng)論 1 269
  • 我被黑心中介騙來泰國打工钾军, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人绢要。 一個(gè)月前我還...
    沈念sama閱讀 48,126評(píng)論 3 370
  • 正文 我出身青樓吏恭,卻偏偏與公主長得像,于是被迫代替她去往敵國和親重罪。 傳聞我的和親對(duì)象是個(gè)殘疾皇子樱哼,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,914評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容

  • 一哀九、 MQ背景&選型 消息隊(duì)列作為高并發(fā)系統(tǒng)的核心組件之一,能夠幫助業(yè)務(wù)系統(tǒng)解構(gòu)提升開發(fā)效率和系統(tǒng)穩(wěn)定性搅幅。主要具有...
    designer閱讀 344評(píng)論 0 0
  • 一阅束、 MQ背景&選型 消息隊(duì)列作為高并發(fā)系統(tǒng)的核心組件之一,能夠幫助業(yè)務(wù)系統(tǒng)解構(gòu)提升開發(fā)效率和系統(tǒng)穩(wěn)定性茄唐。主要具有...
    拖拉機(jī)看簡書閱讀 381評(píng)論 0 0
  • 一息裸、 MQ背景&選型 消息隊(duì)列作為高并發(fā)系統(tǒng)的核心組件之一,能夠幫助業(yè)務(wù)系統(tǒng)解構(gòu)提升開發(fā)效率和系統(tǒng)穩(wěn)定性沪编。主要具有...
    彥幀閱讀 472,741評(píng)論 21 300
  • 今天青石的票圈出鏡率最高的腿时,莫過于張藝謀的新片終于定檔了。 一張滿溢著水墨風(fēng)的海報(bào)一次次的出現(xiàn)在票圈里平绩,也就是老謀...
    青石電影閱讀 10,333評(píng)論 1 2
  • 今天主要學(xué)習(xí)了flex布局圈匆,學(xué)習(xí)筆記如下: 1.指定flex布局: display:flex(任意容器)...
    riku_lu閱讀 3,144評(píng)論 2 3