利用SynchronousQueue多線程處理ActiveMQ消息

當(dāng)我們想通過(guò)多條線程處理activemq中的消息,直覺(jué)上會(huì)使用固定大小線程池去處理贝奇,然而這種方式并不妥當(dāng)肥荔,這么做我們只是將消息從activemq轉(zhuǎn)移到線程池的阻塞隊(duì)列之中加酵,當(dāng)線程池開(kāi)始工作,activemq中的消息快速被消費(fèi)完畢姜盈,而消息所代表的任務(wù)卻并未真正被處理低千, 他們被堆積在處理程序的內(nèi)存中,并陸續(xù)由線程中的線程處理馏颂。這會(huì)產(chǎn)生副作用示血,此時(shí)當(dāng)處理程序因?yàn)槟撤N原因而崩潰,這些待處理的任務(wù)都將丟失救拉。

如何實(shí)現(xiàn)既能通過(guò)多個(gè)線程處理任務(wù)难审,又能保證未完成的任務(wù)的安全性,此時(shí) SynchronousQueue 就有了用武之地亿絮。

我們可以把SynchronousQueue 當(dāng)作長(zhǎng)度為1的阻塞隊(duì)列告喊,當(dāng)隊(duì)列被塞入一個(gè)元素,假如這個(gè)元素未被消費(fèi)掉壹无,那么后續(xù)的塞入操作將被阻塞葱绒。我們可以利用它的這個(gè)特性,把它當(dāng)作是activemq與處理線程之間的緩沖層斗锭。在 SynchronousQueue 的一端地淀,我們從activemq中讀取一個(gè)元素,并將它put進(jìn)SynchronousQueue 岖是。在另一端帮毁,多條線程分別從 SynchronousQueue 中 take 元素進(jìn)行處理实苞,只有當(dāng) SynchronousQueue 中不存在任何元素,也就是線程們將當(dāng)前的任務(wù)都處理完畢烈疚,還有一端的從activemq中提取消息的操作才能執(zhí)行黔牵,反之則將被阻塞。 通過(guò)這種方式爷肝,我們便能保證任務(wù)不丟失的同時(shí)又能通過(guò)多線程處理它們猾浦。示例代碼如下

初始化一個(gè) SynchronousQueue

private SynchronousQueue<ActiveMQObjectMessage> synchronousQueue = new SynchronousQueue<>();

從activemq中將消息轉(zhuǎn)移至synchronousQueue,一次轉(zhuǎn)移一條灯抛,如果上一條未被處理金赦,下一條不能繼續(xù)

ConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection
                    .DEFAULT_PASSWORD, brokerUrl);
connection = factory.createConnection();
connection.start();
session = connection.createSession(Boolean.FALSE, Session.CLIENT_ACKNOWLEDGE);
Destination destination = session.createQueue(dest);
MessageConsumer consumer = session.createConsumer(destination);

while (true) {
  try {
    Message message = consumer.receive();
    if (message instanceof ActiveMQObjectMessage) {
      ActiveMQObjectMessage activeMQObjectMessage = (ActiveMQObjectMessage) message;
      synchronousQueue.put(activeMQObjectMessage);
    } else {
      if (message != null) {
        message.acknowledge();
        logger.error("消息格式錯(cuò)誤,msg={}",message.toString());
      }
    }
  } catch (JMSException | InterruptedException e) {
    e.printStackTrace();
  }
}

開(kāi)啟多條線程同時(shí)處理消息

  Runnable task = () -> {
            while (true) {
                try {
                    ActiveMQObjectMessage activeMQObjectMessage = synchronousQueue.take();
                      //消費(fèi)消息对嚼,處理成功后確認(rèn)
                       boolean complete = handle(msg);
                        if (complete) {
                            activeMQObjectMessage.acknowledge();
                        }
                } catch ( JMSException e) {
                    e.printStackTrace();
                }
            }
        };

        for (int i = 0; i < threads; i++) {
            Thread thread = new Thread(task);
            thread.setName("log-task-" + i);
            thread.start();
        }
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末夹抗,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子纵竖,更是在濱河造成了極大的恐慌漠烧,老刑警劉巖,帶你破解...
    沈念sama閱讀 211,376評(píng)論 6 491
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件靡砌,死亡現(xiàn)場(chǎng)離奇詭異已脓,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)乏奥,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,126評(píng)論 2 385
  • 文/潘曉璐 我一進(jìn)店門(mén)摆舟,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人邓了,你說(shuō)我怎么就攤上這事恨诱。” “怎么了骗炉?”我有些...
    開(kāi)封第一講書(shū)人閱讀 156,966評(píng)論 0 347
  • 文/不壞的土叔 我叫張陵照宝,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我句葵,道長(zhǎng)厕鹃,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 56,432評(píng)論 1 283
  • 正文 為了忘掉前任乍丈,我火速辦了婚禮剂碴,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘轻专。我一直安慰自己忆矛,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,519評(píng)論 6 385
  • 文/花漫 我一把揭開(kāi)白布请垛。 她就那樣靜靜地躺著催训,像睡著了一般洽议。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上漫拭,一...
    開(kāi)封第一講書(shū)人閱讀 49,792評(píng)論 1 290
  • 那天亚兄,我揣著相機(jī)與錄音,去河邊找鬼采驻。 笑死审胚,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的挑宠。 我是一名探鬼主播菲盾,決...
    沈念sama閱讀 38,933評(píng)論 3 406
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼颓影,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼各淀!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起诡挂,我...
    開(kāi)封第一講書(shū)人閱讀 37,701評(píng)論 0 266
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤碎浇,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后璃俗,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體奴璃,經(jīng)...
    沈念sama閱讀 44,143評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,488評(píng)論 2 327
  • 正文 我和宋清朗相戀三年城豁,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了苟穆。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,626評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡唱星,死狀恐怖雳旅,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情间聊,我是刑警寧澤攒盈,帶...
    沈念sama閱讀 34,292評(píng)論 4 329
  • 正文 年R本政府宣布,位于F島的核電站哎榴,受9級(jí)特大地震影響型豁,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜尚蝌,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,896評(píng)論 3 313
  • 文/蒙蒙 一迎变、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧飘言,春花似錦衣形、人聲如沸。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,742評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)泪电。三九已至,卻和暖如春纪铺,著一層夾襖步出監(jiān)牢的瞬間相速,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 31,977評(píng)論 1 265
  • 我被黑心中介騙來(lái)泰國(guó)打工鲜锚, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留突诬,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 46,324評(píng)論 2 360
  • 正文 我出身青樓芜繁,卻偏偏與公主長(zhǎng)得像旺隙,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子骏令,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,494評(píng)論 2 348