ActiveMQ

一. 引言

ActiveMQ擴展出:
API 接受發(fā)送
MQ 的高可用
MQ 的集群容錯配置
MQ 的持久化
延時發(fā)送
簽收機制
Spring/SpringBoot 整合

1、為什么使用MQ?

解決了耦合調(diào)用叉寂、異步模型购岗、抵御洪峰流量,保護了主業(yè)務,削峰谷羞。解耦俗批,削峰,異步!!!
如下的case秘蛔,沒有消息中間件之前陨亡,A、B深员、C负蠕、D四個人來問問題,老師一次只能處理一個人的問題倦畅,處理完A之后遮糖,才能處理B,如果每個人五分鐘叠赐,那么D就要等待15分鐘欲账。添加消息中間件之后,要求學生按照老師的要求將問題都寫在一張紙上燎悍,先交給班長敬惦,班長再交給老師,降低了老師和學生之間的耦合度(解耦)谈山、本來沖向老師的流量都沖向了班長俄删,保護了主業(yè)務(削峰)、班長收集好學生的問題之后,待老師處理完之后再送給學生返回(異步)畴椰。

學生問老師問題case

2臊诊、微服務架構后會有哪些問題?

微服務架構后斜脂,鏈式調(diào)用是我們在寫程序時候的一般流程抓艳,為了完成一個整體功能會將其拆分為多個函數(shù)(或子模塊),比如模塊A調(diào)用模塊B帚戳,模塊B調(diào)用模塊C玷或,模塊C調(diào)用模塊D。但是在大型分布式應用中片任,系統(tǒng)間的RPC交互繁雜偏友,一個功能背后調(diào)用上百個接口并非不可能,從單機架構過度到分布式微服務架構的通例对供。這種架構會出現(xiàn)哪種問題位他??产场?

  • 1鹅髓、系統(tǒng)之間耦合比較嚴重
  • 2、面對大流量并發(fā)時京景,容易被沖垮
  • 3窿冯、等待同步存在性能問題

問題一:每新增一個下游功能,都要對上游的相關接口進行改造:(不行醋粟!我作為廚師做了一個宮保雞丁靡菇,我管你大堂經(jīng)理給誰呢)
舉個例子:加入系統(tǒng)A要發(fā)送數(shù)據(jù)給系統(tǒng)B和C,發(fā)送給每個系統(tǒng)的數(shù)據(jù)可能有差異米愿,因此系統(tǒng)A對要發(fā)送給每個系統(tǒng)的數(shù)據(jù)進行了組裝,然后逐一發(fā)送鼻吮。當代碼上線后又新增了一個需求:把數(shù)據(jù)也發(fā)送給D育苟,新上了一個D系統(tǒng)也要接收A系統(tǒng)的數(shù)據(jù)。此時就需要修改A系統(tǒng)椎木,讓他感知到D的存在违柏,同時把數(shù)據(jù)處理好再給D。在這個過程會看到香椎,每接入一個下游系統(tǒng)漱竖,都要對A系統(tǒng)進行代碼改造,開發(fā)聯(lián)調(diào)的效率很低畜伐。整體架構圖如下:


架構圖

問題二:每個接口的吞吐能力是有限的馍惹,這個上限能力如果是堤壩,當大流量來臨時,容易被沖垮万矾。舉個栗子秒殺業(yè)務:上游系統(tǒng)發(fā)起下單購買操作悼吱,我就是一下單操作,下游系統(tǒng)完成秒殺業(yè)務邏輯:(讀取訂單 > 庫存檢查 > 庫存凍結 > 余額檢查 > 余額凍結 > 訂單生成 > 余額扣減 > 庫存扣減 > 生成流水 > 余額解凍 > 庫存解凍)

問題三:RPC接口基本上是同步調(diào)用良狈,整體的服務性能遵循“木桶理論”后添,即整體系統(tǒng)的耗時取決于鏈路種最慢的那個接口。比如A調(diào)用B/C/D都是50ms薪丁,但此時B又調(diào)用了B1遇西,花費2000ms,那么直接就拖累了整個服務性能严嗜。

架構圖

根據(jù)上述的幾個問題粱檀,在設計系統(tǒng)時可以明確要達到的目標:
1、要做到系統(tǒng)解耦阻问,當新的模塊接進來梧税,可以做到代碼改動最小称近;能夠解耦第队。
2、設置流量緩沖池刨秆,可以讓后端系統(tǒng)按照自身吞吐能力進行消費凳谦,不被沖垮;能夠削峰衡未。
3尸执、強弱以來梳理能將非關鍵詞調(diào)用鏈路的操作異步化提升整體系統(tǒng)的吞吐能力;能夠異步缓醋。

3如失、ActiveMQ的定義

面向消息的中間件(message-oriented-middleware)MOM能夠很好的解決以上問題。是指利用高效可靠的消息傳遞機制進行與平臺無關的數(shù)據(jù)交流送粱,并基于數(shù)據(jù)通信來進行分布系統(tǒng)的集成褪贵,通過提供消息傳遞和消息排隊模型分布式環(huán)境下提供應用解耦、彈性伸縮抗俄、冗余存儲脆丁、流量削峰、異步通信动雹、數(shù)據(jù)同步等功能槽卫。

大致過程如下:

發(fā)送者把消息發(fā)送給消息服務器,消息服務器將消息存放在若干隊列/主題中胰蝠,在合適的時候歼培,消息服務器會將消息轉發(fā)給接收者震蒋,在這個過程中,發(fā)送和接受是異步的丐怯,也就是發(fā)送無需等待喷好,而且發(fā)送者和接受者的生命周期也沒有必然關系。尤其在發(fā)布pub/訂閱sub模式下读跷,也可以完成一對多的通信梗搅,即讓一個消息有多個接受者。

activeMq的過程

4效览、ActiveMQ的特點

一无切、采用異步處理模式

消息發(fā)送者可以發(fā)送一個消息而無需等待響應,消息發(fā)送者將消息發(fā)送到一條虛擬的通道(主題或隊列)上丐枉;消息接收者則訂閱或監(jiān)聽該通道哆键。一條消息可能最終轉發(fā)給一個或多個消息接收者,這些接收者都無需對消息發(fā)送者做出同步回應瘦锹。整個過程都是異步的籍嘹。

也就是說,一個系統(tǒng)跟另外一個系統(tǒng)之間進行通信的時候弯院,假如希望系統(tǒng)A發(fā)送一個消息給系統(tǒng)B辱士,讓他處理。但是系統(tǒng)A不關注系統(tǒng)B到底怎么處理或者有沒有處理好听绳,所以系統(tǒng)A把消息發(fā)送給MQ颂碘,然后就不管這條消息的“死活”了,接著系統(tǒng)B從MQ里消費出來處理即可椅挣。至于怎么處理头岔,是否處理完畢,什么時候處理鼠证,都是系統(tǒng)B的事兒峡竣,與系統(tǒng)A無關。

image.png

二量九、應用系統(tǒng)之間的解耦

發(fā)送者和接受者不必了解對方澎胡,只需要確認消息;
發(fā)送者和接受者不必同時在線娩鹉。

image.png

二、ActiveMQ的安裝

ActiveMQ 的官網(wǎng) : http://activemq.apache.org

Linux三種查看后臺進程的方法:

ps -ef|grep activemq|grep -v grep      // grep -v  grep 可以不讓顯示grep 本來的信息
netstat -anp|grep 61616    // activemq 的默認后臺端口是61616
lsof -i:61616
image.png

安裝步驟:
2.1官網(wǎng)下載
2.2/opt目錄下面
2.3解壓縮apache-activemq-5.15.9-bin.tar.gz
2.4在根目錄下mkdir /myactiveMQ
2.5cp -r apache-activemq-5.15.9 /myactiveMQ/
2.6普通啟動mq:./activemq start/stop
2.7activemq的默認進程端口是61616
2.8帶運行日志的啟動方式 ./activemq start > /myactivemq/myrunmq.log
2.9安裝成功后訪問:192.168.x.x:8161抵達ActiveMQ前臺顯示頁面

安裝過程中遇到的問題見:http://www.reibang.com/p/a52c39859808

成功訪問頁面
JMS編碼總體架構

以上兩個Destination
在點對點的消息傳遞時稚伍,目的地稱為 隊列 queue
在發(fā)布訂閱消息傳遞中弯予,目的地稱為 主題 topic

三、ActiveMQ的初步使用

1个曙、引入依賴

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>
        <!--消息隊列連接池-->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-pool</artifactId>
            <version>5.16.1</version>
        </dependency>

1锈嫩、創(chuàng)建生產(chǎn)者


public class producer {

    public static final String ACTIVEMQ_URL = "tcp://192.168.1.115:61616";
    public static final String QUEUE_NAME = "queue01";   // 1對1 的隊列

    public static void main(String[] args) throws JMSException {
        // 1 按照給定的url創(chuàng)建連接工程受楼,這個構造器采用默認的用戶名密碼
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        // 2 通過連接工廠連接 connection  和 啟動
        Connection connection = activeMQConnectionFactory.createConnection();
        // 啟動
        connection.start();
        // 3 創(chuàng)建會話  session
        // 兩個參數(shù),第一個事務呼寸, 第二個簽收
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 4 創(chuàng)建目的地 (兩種 : 隊列/主題   這里用隊列)
        Queue queue = session.createQueue(QUEUE_NAME);
        // 5 創(chuàng)建消息的生產(chǎn)者
        MessageProducer messageProducer = session.createProducer(queue);
        //
        for (int i = 0; i < 3; i++) {
            TextMessage textMessage = session.createTextMessage("MSG" + i);
            messageProducer.send(textMessage);
        }

        messageProducer.close();
        session.close();
        connection.close();
        System.out.println("消息發(fā)布到MQ完成");
    }
}
生產(chǎn)者生產(chǎn)成功后前臺顯示

public class consumer {

    public static final String ACTIVEMQ_URL = "tcp://192.168.1.115:61616";
    public static final String QUEUE_NAME = "queue01";   // 1對1 的隊列

    public static void main(String[] args) throws JMSException {
        // 1 按照給定的url創(chuàng)建連接工程艳汽,這個構造器采用默認的用戶名密碼
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        // 2 通過連接工廠連接 connection  和 啟動
        Connection connection = activeMQConnectionFactory.createConnection();
        // 啟動
        connection.start();
        // 3 創(chuàng)建會話  session
        // 兩個參數(shù),第一個事務对雪, 第二個簽收
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 4 創(chuàng)建目的地 (兩種 : 隊列/主題   這里用隊列)
        Queue queue = session.createQueue(QUEUE_NAME);
        // 5 創(chuàng)建消息的生產(chǎn)者
        MessageConsumer messageConsumer = session.createConsumer(queue);
        while (true) {
            TextMessage textMessage = (TextMessage) messageConsumer.receive();
            if (null != textMessage) {
                System.out.println("消費者接受到消息" + textMessage.getText());
            } else {
                break;
            }
        }
        messageConsumer.close();
        session.close();
        connection.close();

        System.out.println("消息發(fā)布到MQ完成");
    }
}
消費者的三個recieve方法

加了時間就是過時不候河狐,不加就是不離不棄,NoWait盲猜是只限此時瑟捣,如果取不到消息馋艺,直接返回null。

//TODO消費者消費成功前臺頁面顯示

這里的一點經(jīng)驗: activemq 好像自帶負載均衡迈套,當先啟動兩個隊列(Queue)的消費者時捐祠,在啟動生產(chǎn)者發(fā)出消息,此時的消息平均的被兩個消費者消費桑李。 并且消費者不會消費已經(jīng)被消費的消息(即為已經(jīng)出隊的消息)

四踱蛀、JMS

1、JMS基本信息

JMS開發(fā)的基本步驟
image.png

Topic隊列模式:

(1)生產(chǎn)者將消息發(fā)布到topic中贵白,每個消息可以有多個消費者率拒,屬于1:N的關系;
(2)生產(chǎn)者和消費者之間有時間上的相關性戒洼,訂閱某一個主題的消費者只能消費自它訂閱之后發(fā)布的消息俏橘。
(3)生產(chǎn)者生產(chǎn)時,topic不保存消息它是無狀態(tài)的不落地圈浇,假如無人訂閱就去生產(chǎn)寥掐,那就是一條廢消息,所以磷蜀,一般先啟動消費者再啟動生產(chǎn)者召耘。

JMS規(guī)范允許客戶創(chuàng)建持久訂閱,這在一定程度上放松了時間上的相關性要求褐隆,持久訂閱允許消費者消費它在未處于激活狀態(tài)時發(fā)送的消息污它,一句話,好比我們的微信公眾號訂閱庶弃。

topic隊列模式
Topic和Queue的區(qū)別

2衫贬、什么是javaEE?

JAVAEE 是一套使用Java 進行企業(yè)級開發(fā)的13 個核心規(guī)范工業(yè)標準 歇攻, 包括:
(1)JDBC(Java Database) 數(shù)據(jù)庫連接
(2)JNDI(Java Naming and Directory Interfaces)Java的命名和目錄接口
(3)EJB(Enterprise JavaBean)
(4)RMI(Remote Method Invoke)遠程方法調(diào)用 一般使用TCP/IP 協(xié)議
(5)Java IDL(interface Description Language) 接口定義語言
(6)JSP (Java Server Pages)
(7)Servlet
(8)XML (Extensible Markup Language)可擴展標記語言
(9)JMS (Java Message Service)Java 消息服務
(10)JTA (Java Transaction API)Java 事務API
(11)JTS (Java Transaction Service)Java 事務服務
(12)JavaMail
(13)JAF (Java Bean Activation Framework)

3固惯、什么是JMS(Java消息服務):

Java消息服務指的是兩個用程序之間進行異步通信的API,它為標準消息協(xié)議和消息服務提供了一組通用接口缴守,包括創(chuàng)建葬毫、發(fā)送镇辉、讀取消息等,用于支持Java應用程序開發(fā)贴捡,在javaEE中忽肛,當兩個應用程序使用JMS進行通信時,它們之間并不是直接相連的烂斋,而是通過一個共同的消息收發(fā)服務器組件關聯(lián)起來以達到解耦屹逛、異步、削峰的目的源祈。

java消息服務架構
幾種消息隊列的詳細比較

4煎源、JMS組成的四大元素

JMS provider:實現(xiàn)JMS 的消息中間件,也就是MQ服務器
JMS producer:消息生產(chǎn)者香缺,創(chuàng)建和發(fā)送消息的客戶端
JMS consumer:消息消費者手销,接收和處理消息的客戶端
JMS message:JMS 消息,分為消息頭图张、消息屬性锋拖、消息體(重要的)

4.1、消息頭( 5 個常用的消息頭):

① JMSDestination:設置是隊列還是主題

image.png

② JMSDeliveryMode:設置是持久還是非持久祸轮。面試題可能會問到:如何保證消息的可靠性兽埃,持久化虹钮?

一條持久化的消息:應該被傳送"一次僅僅一次"斜做,這就意味著如果JMS提供者出現(xiàn)故障,該消息并不會丟失催蝗,它會在服務器恢復之后再次傳遞苦酱。一條非持久消息:最多會傳送一次售貌,這意味著服務器出現(xiàn)故障,該消息將永遠丟失疫萤。

image.png

③ JMSExpiration:過期時間颂跨,默認永不過期。
④ JMSPriority:優(yōu)先級扯饶,默認是4級恒削,有0~9 ,5-9 是緊急的尾序,0-4 是普通的钓丰。
⑤ JMSMessageId:唯一的消息ID,由消息中間件產(chǎn)生每币。

4.2斑粱、消息體

封裝具體的消息數(shù)據(jù),發(fā)送和接收的消息類型必須一致

可以通過session創(chuàng)建以下五種message

①TextMessage:普通字符串消息脯爪,包含一個String(常用)
②Mapmessage:Map 類型的消息则北,k->Stringv -> Java 基本類型常用)
③BytesMessage:二進制數(shù)組消息,包含一個byte[]
④ StreamMessage:Java 數(shù)據(jù)流消息痕慢,用標準流操作來順序的填充讀取
⑤ObjectMessage:對象消息尚揣,包含一個可序列化的Java 對象

4.3、消息屬性

作用:識別掖举、去重快骗、重點標注
如果需要除消息頭字段以外的值,那么可以使用消息屬性塔次,如下:可以設置Boolean方篮、int、String...的屬性励负,也是以k-v鍵值對存儲的藕溅。消費者在接收時也直接get出來就行了。
比如說想給某個消息設置一個vip继榆,只拿他自己...

使用舉例

5巾表、JMS的可靠性

(1)持久性

// 在隊列為目的地的時候持久化消息
messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);

// 隊列為目的地的非持久化消息
messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

持久化的消息:服務器宕機后消息依舊存在,只是沒有入隊略吨,當服務器再次啟動集币,消息就會被消費。

非持久化的消息:服務器宕機后消息永遠丟失翠忠。 而當沒有注明是否是持久化還是非持久化時鞠苟,默認是持久化的消息

消息生產(chǎn)者:

public class producerTopic {

    public static final String ACTIVEMQ_URL = "tcp://192.168.1.115:61616";
    public static final String TOPIC_NAME = "topic-01";   // 1對1 的隊列

    public static void main(String[] args) throws JMSException {
        // 1 按照給定的url創(chuàng)建連接工程秽之,這個構造器采用默認的用戶名密碼
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        // 2 通過連接工廠連接 connection  和 啟動
        Connection connection = activeMQConnectionFactory.createConnection();
        // 3 創(chuàng)建會話  session
        // 兩個參數(shù)当娱,第一個事務, 第二個簽收
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 4 創(chuàng)建消息生產(chǎn)者
        Topic topic = session.createTopic(TOPIC_NAME);//類似于公眾號的名字
        MessageProducer messageProducer = session.createProducer(topic);
        //設置為持久化
        messageProducer.setDeliveryDelay(DeliveryMode.PERSISTENT);
        // 5 發(fā)布訂閱|要先進行持久化再啟動了
        connection.start();
        for (int i = 0; i < 3; i++) {
            TextMessage textMessage = session.createTextMessage("msg--persist" + i);
            messageProducer.send(textMessage);
        }
        messageProducer.close();
        session.close();
        connection.close();
        System.out.println("主題發(fā)布到MQ完成");
    }
}

消息消費者:

public class consumerTopic {

    public static final String ACTIVEMQ_URL = "tcp://192.168.1.115:61616";
    public static final String TOPIC_NAME = "topic-01";   // 1對1 的隊列

    public static void main(String[] args) throws JMSException {
        System.out.println("**********zs");
        // 1 按照給定的url創(chuàng)建連接工程政溃,這個構造器采用默認的用戶名密碼
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        // 2 通過連接工廠連接 connection  和 啟動
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.setClientID("marry");//表明有一個叫張三的用戶訂閱
        // 3 創(chuàng)建會話  session
        // 兩個參數(shù)趾访,第一個事務, 第二個簽收
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 4 創(chuàng)建目的地 (兩種 : 隊列/主題   這里用隊列)
        Topic topic = session.createTopic(TOPIC_NAME);
        TopicSubscriber topicSubscriber = session.createDurableSubscriber(topic, "remark...");
        connection.start();
        Message message = topicSubscriber.receive();
        while (null != message) {
            TextMessage textMessage = (TextMessage) message;
            System.out.println("收到持久化的" + textMessage.getText());
            message = topicSubscriber.receive(5000L);
        }
        session.close();
        connection.close();
    }
}
持久化主題消費者在線
當生產(chǎn)者啟動后
持久化主題消費者下線后

持久化的消息:服務器宕機后消息依舊存在董虱,只是沒有入隊扼鞋,當服務器再次啟動,消息就會被消費愤诱。

非持久化的消息:服務器宕機后消息永遠丟失云头。 而當沒有注明是否是持久化還是非持久化時,默認是持久化的消息淫半。

隊列(queue)持久化:默認是持久化消息的溃槐,此模式保證這些消息只會被傳送一次和成功使用一次。對于這些消息科吭,可靠性是優(yōu)先考慮的因素昏滴『秭辏可靠性的另一個重要方面是確保持久化消息傳遞至目標后,消息服務在向消費者傳送它們之前不會丟失消息谣殊。

主題(topic)持久化:默認就是非持久化的拂共,讓主題的訂閱支持化的意義在于:對于訂閱了公眾號的人來說,當用戶手機關機姻几,在開機后任就可以接受到關注公眾號之前發(fā)送的消息宜狐。(這里不能看成是關注非關注)一定要先運行一次消費者,等于向MQ注冊蛇捌,我關注了這個公眾號抚恒,如果我取關了,一個月后又關注了络拌,這一個月的消息統(tǒng)統(tǒng)推送給你~

(2)事務

事務偏生產(chǎn)者俭驮,簽收偏消費者

createSession的第一個參數(shù)為true 為開啟事務,開啟事務之后必須在將消息提交盒音,才可以在隊列中看到消息表鳍,要不然這個事務就是沒結束。

生產(chǎn)者啟動事務

事務開啟的意義在于祥诽,如果對于多條必須同批次傳輸?shù)南⑵┦ィ梢允褂檬聞眨绻粭l傳輸失敗雄坪,可以將事務回滾厘熟,再次傳輸,保證數(shù)據(jù)的完整性维哈。 對于消息消費者來說绳姨,開啟事務的話,可以避免消息被多次消費阔挠,以及后臺和服務器數(shù)據(jù)的不一致性飘庄。
舉個栗子:
如果消息消費的 createSession 設置為 ture ,但是沒有 commit 购撼,此時就會造成非常嚴重的后果跪削,那就是在后臺看來消息已經(jīng)被消費,但是對于服務器來說并沒有接收到消息被消費迂求,此時就有可能被多次消費碾盐。開啟了事務注意一定要commit!?帧毫玖!生產(chǎn)事務開啟,只有commit后才能將全部消息變?yōu)橐严M。

事務提交
發(fā)生錯誤時事務回滾

(3)簽收(俗稱ack)

Session.AUTO_ACKNOWLEDGE       自動簽收付枫,默認
Session.CLIENT_ACKNOWLEDGE     手動簽收

手動簽收需要acknowledge
textMessage.acknowledge();

手動簽收示例

對于開啟事務時烹玉,設置手動簽收和自動簽收沒有多大的意義,都默認自動簽收励背,也就是說事務的優(yōu)先級更高一些春霍。可能是提交事務時叶眉,底層代碼自動調(diào)用了ack。關閉了事務芹枷,第二個簽收參數(shù)就起效衅疙。如果沒有簽收,就會重復的接收數(shù)據(jù)鸳慈。

(4)簽收和事務的關系

在事務性會話中饱溢,當一個事務被成功提交則消息被自動簽收。如果事務回滾走芋,則消息會被再次傳送绩郎。非事務性會話中,消息何時被確認取決于創(chuàng)建會話的應答模式(acknowledgement mode/是否簽收)翁逞。

6肋杖、JMS小結

1、JMS點對點總結
點對點模型是基于隊列的挖函,生產(chǎn)者發(fā)消息到隊列状植,消費者從對列接收到消息,隊列的存在使得消息的異步傳輸成為可能怨喘。和我們平時給朋友發(fā)送短信類似津畸。
①如果在Session關閉時有部分消息已被收到但還沒有被簽收,那當消費者下次連接到相同隊列時必怜,這些消息還會被再次接收肉拓。
②隊列可以長久保存消息直到消費者收到消息。消費者不需要因為擔心會丟失而時刻和隊列保持激活的連接狀態(tài)梳庆,充分體現(xiàn)了異步傳輸模式的優(yōu)勢暖途。
2、JMS發(fā)布訂閱總結
JMS Pub/Sub 模型定義了如何向一個內(nèi)容發(fā)布和訂閱消息靠益,這些節(jié)點被稱為topic丧肴,主題可以被認為是消息的傳輸中介,發(fā)布者發(fā)布消息到主題胧后,訂閱者從主題訂閱消息芋浮。主題使得消息訂閱者和消息發(fā)布者保持互相獨立,不需要接觸即可保證消息的傳送。
3纸巷、持久訂閱
客戶端先向MQ注冊一個自己的身份ID識別號镇草,當這個客戶端處于離線時,生產(chǎn)者會為這個ID保存所有發(fā)送到主題的消息瘤旨,當客戶再次連接到MQ時會根據(jù)消費者的ID得到所有當自己處于離線時發(fā)送到主題的消息梯啤。
非持久訂閱狀態(tài)下,不能恢復或重新派送一個未簽收的消息存哲,持久訂閱才能恢復或重新派送一個未簽收的消息因宇。
4、非持久訂閱
非持久訂閱只有當客戶端處于激活狀態(tài)祟偷,也就是和MQ保持連接狀態(tài)才能收到發(fā)送到某個主題的消息察滑。如果消費者處于離線狀態(tài),生產(chǎn)者發(fā)送的主題消息將會丟失作廢修肠,消費者永遠也不會收到贺辰。
一句話,非持久訂閱下嵌施,先要訂閱注冊才能接受到發(fā)布饲化,只給訂閱者發(fā)布消息

五、ActiveMQ的broker

1吗伤、是什么

broker 就是實現(xiàn)了用代碼形式啟動 ActiveMQ 吃靠,將 MQ 內(nèi)嵌到 Java 代碼中,可以隨時啟動牲芋,節(jié)省資源撩笆,提高了可靠性。就是將 MQ 服務器作為了 Java 對象

2缸浦、怎么用

使用多個配置文件啟動 activemq

//復制一份原來的配置文件
cp activemq.xml  activemq02.xml 

//以active02 啟動mq 服務器
./activemq start xbean:file:/myactivemq/apache-activemq-5.15.9/conf/activemq02.xml 

3夕冲、嵌入式Broker

①引入pom

<dependency>
  <groupId>com.fasterxml.jackson.core</groupId>
  <artifactId>jackson-databind</artifactId>
  <version>2.9.5</version>
</dependency>

②擼代碼

public class EmbedBroker {

    public static void main(String[] args) throws Exception {
        // broker 服務
        BrokerService brokerService = new BrokerService();
        // 把小型 activemq 服務器嵌入到 java 代碼
        brokerService.setUseJmx(true);
        // 原本的是 192.……  是linux 上的服務器,而這里是本地windows 的小型mq 服務器
        brokerService.addConnector("tcp://loaclhost:61616");
        //開啟
        brokerService.start();
    }
}

六裂逐、Spring整合ActiveMQ

1.pom

<!--  activeMQ  jms 的支持  -->
<dependency>
  <groupId>org.springframework</groupId>
  <artifactId>spring-jms</artifactId>
    <version>4.3.23.RELEASE</version>
</dependency>
<dependency>    <!--  pool 池化包相關的支持  -->
  <groupId>org.apache.activemq</groupId>
  <artifactId>activemq-pool</artifactId>
  <version>5.15.9</version>
</dependency>

<!--  aop 相關的支持  -->
<dependency>
  <groupId>org.springframework</groupId>
  <artifactId>spring-core</artifactId>
  <version>4.3.23.RELEASE</version>
</dependency>
<dependency>
  <groupId>org.springframework</groupId>
  <artifactId>spring-context</artifactId>
  <version>4.3.23.RELEASE</version>
</dependency>

2.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://camel.apache.org/schema/spring"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/context
         http://www.springframework.org/schema/context/spring-context.xsd
        http://www.springframework.org/schema/aop
     http://www.springframework.org/schema/aop/spring-aop.xsd
     http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">

    <context:commponent-scan base-package="com.at.activemq"/>
    <bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory"  destroy-method="stop">
        <property name="connectionFactory">
            <bean class="org.apache.activemq.ActiveMQConnectionFactory">
                <property name="brokerURL" value="tcp://192.168.17.3:61616"></property>
            </bean>
        </property>
        <property name="maxConnections" value="100"></property>
    </bean>

    <!-- 隊列目的地 -->
    <bean id="destinationQueue" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg index="0" value="spring-active-queue"></constructor-arg>
    </bean>


    <!--  jms 的工具類 -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="jmsFactory"/>
        <property name="defaultDestination" ref="destinationQueue"/>
        <property name="messageConverter">
            <bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
        </property>
    </bean>
</beans>

3.生產(chǎn)者

@Service
public class SpringMQ_producer {
    @Autowired
    private JmsTemplate jmsTemplate;
    public static void main(String[] args) {
        ApplicationContext  ctx = new ClassPathXmlApplicationContext("applicationContext.xml");
        SpringMQ_producer producer = (SpringMQ_producer) ctx.getBean("springMQ_Producer");
        producer.jmsTemplate.send((session) -> {
            TextMessage textMessage = session.createTextMessage("spring 和 activemq 的整合");
            return textMessage;
        });
        System.out.println(" *** send task over ***");
    }
}

4.消費者

@Service
public class Spring_MQConsummer {
    @Autowired
    private JmsTemplate jmsTemplate;
    public static void main(String[] args) {
        ApplicationContext  ac = new ClassPathXmlApplicationContext("applicationContext.xml");
        Spring_MQConsummer  sm = (Spring_MQConsummer)ac.getBean("spring_MQConsummer");

        String s = (String) sm.jmsTemplate.receiveAndConvert();
        System.out.println(" *** 消費者消息"+s);
    }
}

可以在spring 中設置監(jiān)聽器歹鱼,不用啟動消費者,就可以自動監(jiān)聽到消息卜高,并處理弥姻,如下:

消費者不啟動,即可通過監(jiān)聽完成

七掺涛、SpringBoot整合ActiveMQ

1庭敦、隊列

①配置pom

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.5.RELEASE</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.atguigu.boot.activemq</groupId>
    <artifactId>boot_mq_produce</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compoler.source>1.8</maven.compoler.source>
        <maven.compoler.target>1.8</maven.compoler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
            <version>2.1.5.RELEASE</version>
        </dependency>

    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

②配置yml

server:
  port:7777

spring:
  activemq:
    broker-url: tcp://192.168.111.136:61616 #自己的MQ服務器地址,用自己的
    user: admin
    password: admin
  jms:
    pub-sub-domain: false    #false表示Queue   true表示Topic薪缆,不寫默認false

#自己定義的隊列名稱
myqueue: boot-activemq-queue

③配置bean

@Component
@EnableJms
public class ConfigBean {

    //讀取在配置文件中配置的myqueue秧廉,隊列的名字不寫死
    @Value("${myqueue}")
    private String myQueue;

    @Bean
    public Queue queue() {
        return new ActiveMQQueue(myQueue);
    }

}

④生產(chǎn)者代碼

@Component
public class Queue_Produce {

    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;

    @Autowired
    private Queue queue;

    public void produceMsg() {
        jmsMessagingTemplate.convertAndSend(queue, "*********" + UUID.randomUUID().toString().substring(0, 6));
    }
}

想要調(diào)一次,就往消息服務器上發(fā)一次消息,就要在config里面開啟一個注解@EnableJms疼电,代表開啟了jms適配的注解

⑤啟動類

@SpringBootApplication
public class MainApp_Produce {

    public static void main(String[] args) {

        SpringApplication.run(MainApp_Produce.class, args);

    }
}

⑥要求每3s發(fā)送一條消息
修改生產(chǎn)Service嚼锄,添加一個定時生產(chǎn)的方法

@Component
public class Queue_Produce {

    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;

    @Autowired
    private Queue queue;

    public void produceMsg() {
        jmsMessagingTemplate.convertAndSend(queue, "*********" + UUID.randomUUID().toString().substring(0, 6));
    }

    //每隔三秒定時投送一條消息
    @Scheduled(fixedDelay = 3000)
    public void produceMsgScheduled() {
        jmsMessagingTemplate.convertAndSend(queue, "Scheduled*********" + UUID.randomUUID().toString().substring(0, 6));
    }
}

修改主啟動類,添加注解@EnableScheduling

@SpringBootApplication
@EnableScheduling //主啟動類開啟定時投送功能
public class MainApp_Produce {

    public static void main(String[] args) {

        SpringApplication.run(MainApp_Produce.class, args);

    }
}

以上兩種點一次發(fā)一次和間隔定投足以解決工作中的大部分問題蔽豺,消費者微服務的配置大概與生產(chǎn)者相同区丑,不需要config的配置,如下消費者業(yè)務代碼

⑦消費者代碼

@Component
public class Queue_Consumer {

    //springboot不需要再編寫監(jiān)聽器修陡,只需要加一個注解JmsListener即可時刻收到生產(chǎn)者的消息
    @JmsListener(destination = "${myqueue}")
    public void receive(TextMessage textMessage) throws JMSException {
        System.out.println("**********收到消息" + textMessage.getText());
    }

}

2沧侥、主題

①pom同上不變
②yml

server:
  port:6666

spring:
  activemq:
    broker-url: tcp://192.168.111.136:61616 #自己的MQ服務器地址,用自己的
    user: admin
    password: admin
  jms:
    pub-sub-domain: true    #false表示Queue   true表示Topic魄鸦,不寫默認false

#自己定義的隊列名稱
myTopic: boot-activemq-topic

③啟動類

@SpringBootApplication
@EnableScheduling
public class Main_Topic_Produce {

    public static void main(String[] args) {
        SpringApplication.run(Main_Topic_Produce.class, args);
    }
}

④配置Bean

@Component
public class ConfigBean {

    @Value("${myTopic}")
    private String topicName;

    @Bean
    public Topic topic() {
        return new ActiveMQTempTopic();
    }
}

⑤生產(chǎn)者代碼

@Component
public class Topic_Produce {

    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;

    @Autowired
    private Topic topic;

    @Scheduled(fixedDelay = 3000)
    public void produce() {
        jmsMessagingTemplate.convertAndSend(topic, "主題消息:"+UUID.randomUUID().toString().substring(0, 6));
    }
}

消費者微服務的配置大概和生產(chǎn)者相同正什,如下是消費者代碼

⑥消費者代碼

@Component
public class Topic_Consumer {

    @JmsListener(destination = "${myTopic}")
    public void receive(TextMessage textMessage) throws JMSException {
        System.out.println("消費者接收到主題:" + textMessage.getText());

    }
}

八、ActiveMQ的傳輸協(xié)議

1号杏、面試題

①默認的端口61616怎么修改
②生產(chǎn)上的鏈接協(xié)議如何配置的,TCP嗎斯棒?

2盾致、配置

設置協(xié)議的地方是activemq 的activemq.xml配置文件

activemq.xml位置
協(xié)議配置位置

如上默認是使用 openwire 也就是 tcp 協(xié)議,默認的Broker 配置荣暮,TCP 的Client 監(jiān)聽端口 61616 庭惜,在網(wǎng)絡上傳輸數(shù)據(jù),必須序列化數(shù)據(jù)穗酥,消息是通過一個 write protocol 來序列化為字節(jié)流护赊。默認情況 ActiveMQ 會把 wire protocol 叫做 Open Wire ,它的目的是促使網(wǎng)絡上的效率和數(shù)據(jù)快速交互 砾跃。

3骏啰、有哪些協(xié)議

ActiveMQ 支持的協(xié)議有 TCP 、 UDP抽高、NIO判耕、SSL、HTTP(S) 翘骂、VM
參考(https://activemq.apache.org/configuring-version-5-transports.html)

#使用tcp 的一些優(yōu)化方案:
tcp://hostname:port?key=value

它的參數(shù)詳情參考:http://activemq.apache.org/tcp-transport-reference

1.NIO協(xié)議和TCP協(xié)議類似壁熄,但是更偏底層操作,它允許開發(fā)人員對同一資源有更多的client調(diào)用和服務端有更多的負載碳竟。
2.適合NIO 使用的場景:
①當有大量的Client 連接到Broker 上 草丧, 使用NIO 比使用 tcp 需要更少的線程數(shù)量,所以使用 NIO
②可能對于Broker有一個很遲鈍的網(wǎng)絡傳輸莹桅,NIO比TCP提供更好的性能昌执。
3.NIO鏈接的URI形式:nio://hostname:port?key=value
4.Transport Connector配置示例,參考官網(wǎng):http://activemq.apache.org/configuring-version-5-transports.html

activemq支持的網(wǎng)絡協(xié)議

4、NIO案例演示

先停掉activemq仙蚜,然后在activemq.xml添加以下配置

 <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumCon    nections=1000&amp;wireFormat.maxFrameSize=104857600"/>
         <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnect    ions=1000&amp;wireFormat.maxFrameSize=104857600"/>
         <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConn    ections=1000&amp;wireFormat.maxFrameSize=104857600"/>
         <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnect    ions=1000&amp;wireFormat.maxFrameSize=104857600"/>
         <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnection    s=1000&amp;wireFormat.maxFrameSize=104857600"/>
         <transportConnector name="nio" uri="nio://0.0.0.0:61618?trace=true"/>   <!-- 這是添加的 -->
 </transportConnectors>
添加NIO協(xié)議后

使用 NIO 協(xié)議后此洲,代碼修改量極小,只需同時將消息生產(chǎn)者和消費者的 URL 修改即可:
//public static final String ACTIVEMQ_URL = "tcp://192.168.17.3:61616";
public static final String ACTIVEMQ_URL = "nio://192.168.17.3:61618";

如果不特別指定ActiveMQ的網(wǎng)絡監(jiān)聽端口委粉,那么這些端口都將使用BIO網(wǎng)絡的IO模型呜师。(OpenWire,STOMP贾节,AMQP......)汁汗,所以為了首先提高單節(jié)點的網(wǎng)絡吞吐性能,需要明確指定Active的網(wǎng)絡IO模型栗涂,BIO中B其實就是Blocked(阻塞)知牌,BIO就是阻塞的IO,NIO就是非阻塞的IO斤程。

5角寸、NIO案例增強

image.png

URI 格式以 nio 開頭,表示這個端口使用 tcp 協(xié)議為基礎的NIO 網(wǎng)絡 IO 模型忿墅,但這樣設置它只支持 tcp 扁藕、 nio 的連接協(xié)議。如何讓它支持多種協(xié)議疚脐?

Starting with version 5.13.0, ActiveMQ supports wire format protocol detection. OpenWire, STOMP, AMQP, and MQTT can be automatically detected. This allows one transport to be shared for all 4 types of clients.

配置方法: http://activemq.apache.org/auto

使用 auto 的方式就相當于四合一協(xié)議 : STOMP AMQP MQTT TCP NIO

<transportConnector name="auto+nio" uri="auto+nio://localhost:5671"/>
<transportConnector name="auto" uri="auto://localhost:5671?auto.protocols=default,stomp"/>

將activemq.xml的配置該為如下:

<transportConnector name="auto+nio" uri="auto+nio://0.0.0.0:61608?maximumConnections=1000
&amp;wireFormat.maxFrameSize=104857600&amp;org.apache.activemq.transport.nio.SelectorManager.corelPoolSize=20
&amp;org.apache.activemq.transport.nio.SelectorManager.maximumPoolSize=50"/>

此時再啟動activemq

配置nio+auto的結果
public static final String ACTIVEMQ_URL = "nio://192.168.17.3:61608";

其實就是一個端口適配多種協(xié)議奠蹬,同樣代碼只需修改 URI 废膘,對于 nio 和 tcp 的只需要修改如上URI即可,但不代表使用其他協(xié)議代碼相同,因為底層配置不同腊尚,其他協(xié)議如果使用需要去修改代碼

九帅韧、ActiveMQ的消息存儲和持久化

1鸳兽、什么是MQ的持久化

官網(wǎng) : http://activemq.apache.org/persistence

為了避免意外宕機以后丟失消息待讳,需要重啟后可以恢復消息隊列,消息系統(tǒng)一般都會采用持久化機制役耕。ActiveMQ的消息持久化機制有JDBC采转、AMQ、KahaDB和LevelDB瞬痘,無論采用哪種方式故慈,消息的存儲邏輯都是一致的。

就是在發(fā)送者將消息發(fā)送出去后框全,消息中心首先將消息存儲到本地數(shù)據(jù)文件察绷、內(nèi)存數(shù)據(jù)庫或者遠程數(shù)據(jù)庫等再試圖將消息發(fā)送給接收者,成功則將消息從存儲中刪除津辩,失敗則繼續(xù)嘗試發(fā)送拆撼。消息中心啟動以后首先要檢查指定的存儲位置容劳。如果有未發(fā)送成功的消息,則需要把消息發(fā)送出去闸度。

2竭贩、幾種MQ持久化的方式

持久化方案
①AMQ (了解)

AMQ是基于文件存儲形式,寫入快莺禁、易恢復 默認 32M 在 ActiveMQ 5.3 之后不再適用留量。

②KahaDB :

(1)KahaDB在5.4 之后基于日志文件的持久化插件,默認持久化插件哟冬,提高了性能和恢復能力

(2)KahaDB的參數(shù)配置參考官網(wǎng):https://activemq.apache.org/kahadb

kahaDB在activeMq中的配置
kahadb的位置

(3)KahaDB 的存儲原理
KahaDB 的屬性配置 : http://activemq.apache.org/kahadb

KahaDB是默認的存儲方式楼熄,可用于任何場景,提高了性能和恢復能力浩峡。消息存儲使用一個事務日志和僅僅用一個索引文件來存儲他所有的地址可岂。KahaDB是一個專門針對消息持久化的解決方案,它對典型的消息使用模式進行了優(yōu)化翰灾。數(shù)據(jù)被追加到data logs中缕粹。當不再需要log文件中的數(shù)據(jù)的時候,log文件會被丟棄纸淮。

kahadb文件中的內(nèi)容

db-<數(shù)字>.log: 存儲數(shù)據(jù)致开,一個存滿會再次創(chuàng)建 db-2 db-3 …… ,當不會有引用到數(shù)據(jù)文件的內(nèi)容時萎馅,文件會被刪除或歸檔
db.data: 是一個BTree 索引,索引了消息數(shù)據(jù)記錄的消息虹蒋,是消息索引文件糜芳,它作為索引指向了 db-<x>.log 里的消息
db.free: 存儲空閑頁 ID 有時會被清除
db.redo: 當 KahaDB 消息存儲在強制退出后啟動,用于恢復 BTree 索引
lock: 顧名思義就是鎖 魄衅,表示獲取到當前讀取權限的broker峭竣。

(一點題外話:就像mysql 數(shù)據(jù)庫,新建一張表晃虫,就有這個表對應的 .MYD 文件皆撩,作為它的數(shù)據(jù)文件,就有一個 .MYI 作為索引文件哲银。)

四類文件 + 一把鎖 ==>> KahaDB

③JDBC消息存儲

(1)修改配置文件扛吞,默認 kahaDB

##修改之前:
<persistenceAdapter>
       <kahaDB directory="${activemq.data}/kahadb"/>  
 </persistenceAdapter>
##修改之后:
<persistenceAdapter>
      <jdbcPersistenceAdapter dataSource="#mysql-ds"/>
 </persistenceAdapter>

(2)在activemq 的lib 目錄下添加 jdbc 的jar 包 ,如果用了druid還要導入druid的jar包
(3)改配置文件 : activemq.xml荆责,配置位置如下</broker>之后滥比,<import>之前,使其連接自己windows 上的數(shù)據(jù)庫做院,并在本地創(chuàng)建名為activemq 的數(shù)據(jù)庫

配置位置
詳細配置

(4)讓linux 上activemq 可以訪問到 mysql盲泛,之后產(chǎn)生消息濒持。ActiveMQ 啟動后會自動在 mysql 的activemq 數(shù)據(jù)庫下創(chuàng)建三張表:activemq_msgs 、activemq_acks寺滚、activemq_lock

activemq_msgs:用于存儲消息柑营,Queue和Topic都存儲在這個表中

activemq_msgs

activemq_acks:用于存儲訂閱關系。如果是持久化Topic村视,訂閱者和服務器的訂閱關系在這個表保存

activemq_acks

activemq_lock:在集群環(huán)境中才有用官套,只有一個Broker可以獲得消息,稱為Master Broker蓖议,此表用于記錄當前哪一個broker作為Master Broker

activemq_lock

如果新建數(shù)據(jù)庫OK+上述配置OK+代碼運行OK虏杰,上述三表會自動生成,萬一情況勒虾,手動建表SQL如下:

-- auto-generated definition
create table ACTIVEMQ_ACKS
(
    CONTAINER     varchar(250)     not null comment '消息的Destination',
    SUB_DEST      varchar(250)     null comment '如果使用的是Static集群纺阔,這個字段會有集群其他系統(tǒng)的信息',
    CLIENT_ID     varchar(250)     not null comment '每個訂閱者都必須有一個唯一的客戶端ID用以區(qū)分',
    SUB_NAME      varchar(250)     not null comment '訂閱者名稱',
    SELECTOR      varchar(250)     null comment '選擇器,可以選擇只消費滿足條件的消息修然,條件可以用自定義屬性實現(xiàn)笛钝,可支持多屬性AND和OR操作',
    LAST_ACKED_ID bigint           null comment '記錄消費過消息的ID',
    PRIORITY      bigint default 5 not null comment '優(yōu)先級,默認5',
    XID           varchar(250)     null,
    primary key (CONTAINER, CLIENT_ID, SUB_NAME, PRIORITY)
)
    comment '用于存儲訂閱關系愕宋。如果是持久化Topic玻靡,訂閱者和服務器的訂閱關系在這個表保存';

create index ACTIVEMQ_ACKS_XIDX
    on ACTIVEMQ_ACKS (XID);

 
-- auto-generated definition
create table ACTIVEMQ_LOCK
(
    ID          bigint       not null
        primary key,
    TIME        bigint       null,
    BROKER_NAME varchar(250) null
);

 
-- auto-generated definition
create table ACTIVEMQ_MSGS
(
    ID         bigint       not null
        primary key,
    CONTAINER  varchar(250) not null,
    MSGID_PROD varchar(250) null,
    MSGID_SEQ  bigint       null,
    EXPIRATION bigint       null,
    MSG        blob         null,
    PRIORITY   bigint       null,
    XID        varchar(250) null
);

create index ACTIVEMQ_MSGS_CIDX
    on ACTIVEMQ_MSGS (CONTAINER);

create index ACTIVEMQ_MSGS_EIDX
    on ACTIVEMQ_MSGS (EXPIRATION);

create index ACTIVEMQ_MSGS_MIDX
    on ACTIVEMQ_MSGS (MSGID_PROD, MSGID_SEQ);

create index ACTIVEMQ_MSGS_PIDX
    on ACTIVEMQ_MSGS (PRIORITY);

create index ACTIVEMQ_MSGS_XIDX
    on ACTIVEMQ_MSGS (XID);

(5)小總結

在點對點類型中:當DeliveryMode設置為NON_PERSISTENCE時,消息被保存在內(nèi)存中中贝;當DeliveryMode設置為PERSISTENCE時囤捻,消息保存在broker的相應的文件或者數(shù)據(jù)庫中,而且點對點類型中一旦被consumer消費就從broker中刪除邻寿。點對點會在數(shù)據(jù)庫的數(shù)據(jù)表 ACTIVEMQ_MSGS 中加入消息的數(shù)據(jù)蝎土,且在點對點時,消息被消費就會從數(shù)據(jù)庫中刪除 绣否。
但是對于主題誊涯,訂閱方式接受到的消息,會在 ACTIVEMQ_MSGS 存儲消息蒜撮,即使MQ 服務器下線暴构,并在 ACTIVEMQ_ACKS 中存儲消費者信息。并且存儲以 activemq 為主段磨,當activemq 中的消息被刪除后取逾,數(shù)據(jù)庫中的也會自動被刪除。

(6)開發(fā)中的坑

注意點1:要將使用到的jar文件房知道ActiveMQ安裝目錄下的lib中苹支。包括mysql_jdbc和對應的連接池的jar包
注意點2:在jdbcPersistenceAdapter標簽中設置了createTableOnStartup屬性為true時候菌赖,第一次啟動ActiveMQ時,將會自動創(chuàng)建數(shù)據(jù)表沐序,但是啟動之后可以更改為false
下劃線坑爹:java.lang.IllageStateException:BeanFactory not initialized or aleady closed:這是因為您的操作機器中有_符號琉用,更改機器名后重啟就行堕绩。

④LevalDB (了解)

LeavelDB : 希望作為以后的存儲引擎,5.8 以后引進邑时,也是基于文件的本地數(shù)據(jù)存儲形式奴紧,但是比 KahaDB 更快,它比KahaDB 更快的原因是她不使用BTree 索引晶丘,而是使用本身自帶的 LeavelDB 索引
題外話:為什么LeavelDB 更快黍氮,并且5.8 以后就支持,為什么還是默認 KahaDB引擎浅浮,因為activemq 官網(wǎng)本身沒有定論沫浆,LeavelDB之后又出了可復制的LeavelDB比LeavelDB 更性能更優(yōu)越,但需要基于 Zookeeper 所以這些官方還沒有定論滚秩,默認就使用 KahaDB

⑤JDBC Message store with ActiveMQ Journal

這種方式克服了JDBC Store的不足专执,JDBC每次消息過來,都需要去寫庫讀庫郁油。ActiveMQ Journal本股,使用高速緩存寫入技術,大大提高了性能桐腌。當消費者的速度能夠及時跟上生產(chǎn)者消息的生產(chǎn)速度時拄显,journal文件能夠大大減少需要寫入到DB中的消息。

舉個例子:生產(chǎn)者生產(chǎn)了1000條消息案站,這1000條消息會保存到journal文件躬审,如果消費者的消費速度很快的情況下,在journal文件還沒有同步到DB之前蟆盐,消費者已經(jīng)消費了90%的以上消息盒件,那么這個時候只需要同步剩余的10%的消息到DB。如果消費者的速度很慢舱禽,這個時候journal文件可以使消息以批量方式寫到DB。

為了高性能恩沽,這種方式使用日志文件存儲+數(shù)據(jù)庫存儲誊稚。先將消息持久到日志文件,等待一段時間再將未消費的消息持久到數(shù)據(jù)庫罗心。該方式要比JDBC性能要高里伯。

配置修改
配置修改

3、幾種持久化方式總結

① jdbc效率低渤闷,kahaDB效率高疾瓮,jdbc+Journal效率較高。
② 持久化消息主要指的是:MQ所在服務器宕機了消息不會丟試的機制飒箭。
③ 持久化機制演變的過程:
從最初的AMQ Message Store方案到ActiveMQ V4版本退出的High Performance Journal(高性能事務支持)附件狼电,并且同步推出了關于關系型數(shù)據(jù)庫的存儲方案蜒灰。ActiveMQ5.3版本又推出了對KahaDB的支持(5.4版本后被作為默認的持久化方案),后來ActiveMQ 5.8版本開始支持LevelDB肩碟,到現(xiàn)在5.9提供了標準的Zookeeper+LevelDB集群化方案强窖。

十、ActiveMQ的多節(jié)點集群

1削祈、引入消息隊列后如何保證高可用翅溺?

2、是什么:

基于ZooKeeper和LevelDB搭建ActiveMQ集群髓抑。集群僅供主備方式的高可用集群功能咙崎,避免單點故障。

3吨拍、zookeeper+replicated+leveldb-store的主從集群(三種方式)

①基于shareFileSystem共享文件
②基于JDBC
③基于可復制的LevelDB

集群官網(wǎng)參考: http://activemq.apache.org/replicated-leveldb-store

image.png

這幅圖的意思就是 當 Master 宕機后褪猛,zookeper 監(jiān)測到?jīng)]有心跳信號, 則認為 master 宕機了密末,然后選舉機制會從剩下的 Slave 中選出一個作為新的 Master

配置流程
4.三臺機器
5.創(chuàng)建三臺集群目錄握爷。新建 /mq_cluster 將原始的解壓文件復制三個

6.在jetty.xml修改端口

7.hostname名字映射

vim /etc/hosts

將最下面修改為而自己的IP地址和地址映射

8、activemq集群配置

首先三臺服務器的路徑
然后將broker的名字改為相同
最后進行三個節(jié)點的持久化配置

修改broker名字
將broker名字修改為相同

三個節(jié)點的持久化配置參考官網(wǎng):https://activemq.apache.org/replicated-leveldb-store.html

修改持久化配置:

<persistenceAdapter>
    <replicatedLevelDB
      directory="{activemq.data}/leveldb"
      replicas="3"
      bind="tcp://0.0.0.0:63631"
      zkAddress="localhost:2191,localhost:2192,localhost:2193"
      zkPassword="123456"
      sync="local_disk"
      zkPath="/activemq/leveldb-stores"
      hostname="wh-mq-server"
      />
  </persistenceAdapter>

三臺服務器都要修改如上的配置严里,只不過將端口號改了就好

9.修改各節(jié)點的消息端口

image.jpeg

10新啼、按順序啟動三個Activemq節(jié)點,首先是zk集群已經(jīng)成功啟動運行了的

主要說zookeper 復制三份后改配置文件刹碾,并讓之自動生成 myid 文件燥撞,并將zk的端口改為之前表格中對應的端口 。修改conf 下的配置文件zoo.cfg

zoo.cfg
#配置如下
tickTime=2000
initLimit=10
syncLimit=5
clientPort=2191    // 自行設置
server.1=192.168.17.3:2888:3888
server.2=192.168.17.3:2887:3887
server.3=192.168.17.3:286:3886
dataDir=/zk_server/data/log1    // 自行設置

修改完成后就可以啟動zk了迷帜,批處理啟動zk的腳本如下

#!/bin/sh

cd /zk_server/zk_01/bin
./zkServer.sh  start

cd /zk_server/zk_02/bin
./zkServer.sh  start

cd /zk_server/zk_03/bin
./zkServer.sh  start  

編寫這個 zk_batch.sh 之后

#命令即可讓它變?yōu)榭蓤?zhí)行腳本
chmod  700    zk_batch.sh
#啟動了三個zk 的服務
 ./zk_batch.sh   start  即可

同理可以寫一個批處理關閉zk 服務的腳本和 批處理開啟mq 服務 關閉 mq 服務的腳本物舒。

#完成上述之后連接zk 的一個客戶端
./zkCli.sh -server 127.0.0.1:2191
三個activemq已經(jīng)掛上
也有可能名字不是如上
#如果要查看哪一個是master,可以使用如下命令
get /activemq/leveldb-stores/00000000003
image.jpeg

此次驗證表明 00000003 的節(jié)點狀態(tài)是master (即為63631 的那個mq 服務) 而其余的(00000004 00000005) activemq 的節(jié)點是 slave戏锹。此時集群已經(jīng)順利搭建成功 冠胯!

測試可用性

image.jpeg

如上所示:此次測試表明只有 8161 的端口可以使用 經(jīng)測試只有 61 可以使用,也就是61 代表的就是master

測試集群可用性:

集群可用性測試

生產(chǎn)者和消費者都修改如下代碼:

public static final String ACTIVEMQ_URL = "failover:(tcp://192.168.17.3:61616,
tcp://192.168.17.3:61617,tcp://192.168.17.3:61618)?randomize=false";

public static final String QUEUE_NAME = "queue_cluster";
測試通過連上61616
MQ服務收到三條消息
接受到消息
MQ將消息出列
將一臺activemq殺死
8161掛掉
8162被activemq選出锦针,成為新的Master
自動連接到 61617

至此:activemq集群測試成功

十一荠察、ActiveMQ的高級特性和大廠面試重點

1、引入消息隊列后 如何保證高可用性

持久化奈搜、事務悉盆、簽收、 以及帶復制的 Leavel DB + zookeeper 主從集群搭建

2馋吗、 異步投遞 Async send

(官網(wǎng)參考)https://activemq.apache.org/async-sends

對于一個慢消費者焕盟,使用同步有可能造成堵塞,消息消費較慢時適合用異步發(fā)送消息宏粤。activemq 支持同步異步發(fā)送的消息脚翘,默認異步灼卢。如果沒有使用事務,且發(fā)送的是持久化消息堰怨,這樣是同步的芥玉,每次發(fā)送都會阻塞一個生產(chǎn)者直到 broker 發(fā)回一個確認,這樣做保證了消息的安全送達备图,但是會阻塞客戶端灿巧,造成很大延時 。

在高性能要求下揽涮,可以使用異步提高producer的性能抠藕。但會消耗較多的client端內(nèi)存,也不能完全保證消息發(fā)送成功蒋困。在useAsyncSend = true 情況下需要容忍消息丟失的可能盾似。

三種開啟的方式如下:(參考自官網(wǎng))

  • 1、 Configuring Async Send using a Connection URI(url 后面加參數(shù))

You can use the Connection Configuration URI to configure async sends as follows

cf = new ActiveMQConnectionFactory("tcp://locahost:61616?jms.useAsyncSend=true");

  • 2雪标、Configuring Async Send at the ConnectionFactory Level(開啟ActiveMQConnectionFactory 的Async 為true)

You can enable this feature on the ActiveMQConnectionFactory object using the property.

((ActiveMQConnectionFactory)connectionFactory).setUseAsyncSend(true);
  • 3零院、Configuring Async Send at the Connection Level(將ActiveMQConnection 設Async 為true)

Configuring the dispatchAsync setting at this level overrides the settings at the connection factory level.You can enable this feature on the ActiveMQConnection object using the property.

((ActiveMQConnection)connection).setUseAsyncSend(true);

3、 異步發(fā)送如何確保發(fā)送成功

異步發(fā)送消息丟失的情況場景是: UseAsyncSend 為 true 使用 producer(send)持續(xù)發(fā)送消息村刨,消息不會阻塞告抄,生產(chǎn)者會認為所有的 send 消息均會被發(fā)送到 MQ ,如果MQ 突然宕機嵌牺,此時生產(chǎn)者端尚未同步到 MQ 的消息均會丟失打洼。

所以:正確的異步發(fā)送方法需要接收回調(diào) 。同步發(fā)送和異步發(fā)送的區(qū)別就在于——同步發(fā)送send 不阻塞就代表消息發(fā)送成功逆粹。異步發(fā)送需要接收回執(zhí)并又客戶端在判斷一次是否發(fā)送

代碼截圖1
代碼截圖2
//其他代碼不變
activeMQConnectionFactory.setUseAsyncSend(true);
……  
    
 for (int i = 1; i < 4 ; i++) {
         textMessage = session.createTextMessage("msg--" + i);
      textMessage.setJMSMessageID(UUID.randomUUID().toString()+"--  orderr");
     String msgid = textMessage.getJMSMessageID();
            messageProducer.send(textMessage, new AsyncCallback() {
                @Override
                public void onSuccess() {
                    // 發(fā)送成功怎么樣
                    System.out.println(msgid+"has been successful send ");
                }

                @Override
                public void onException(JMSException e) {
                    // 發(fā)送失敗怎么樣
                    System.out.println(msgid+" has been failure send ");
                }
            });
}    

4募疮、延遲投遞和定時投遞

官網(wǎng)參考文檔

第一步:如官網(wǎng)所說,設置schedulerSupport為true

schedulerSupport
延遲投遞設置的參數(shù)

第二步:修改代碼如下

long delay = 3 * 1000 ;
long perid = 4 * 1000 ;
int repeat = 7 ;
for (int i = 1; i < 4 ; i++) {
    TextMessage textMessage = session.createTextMessage("delay msg--" + i);
    // 消息每過 3 秒投遞僻弹,每 4 秒重復投遞一次 阿浓,一共重復投遞 7 次
    textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY,delay);
    textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD,perid);
    textMessage.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT,repeat);

    messageProducer.send(textMessage);
}

完整代碼:

package com.activemq.demo;

import org.apache.activemq.*;
import javax.jms.*;
import java.util.UUID;

public class Jms_TX_Producer {

    private static final String ACTIVEMQ_URL = "tcp://118.24.20.3:61626";

    private static final String ACTIVEMQ_QUEUE_NAME = "Schedule01";

    public static void main(String[] args) throws JMSException {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue(ACTIVEMQ_QUEUE_NAME);
        MessageProducer messageProducer = session.createProducer(queue);
        long delay =  10*1000;
        long period = 5*1000;
        int repeat = 3 ;
        try {
            for (int i = 0; i < 3; i++) {
                TextMessage textMessage = session.createTextMessage("tx msg--" + i);
                // 延遲的時間
                textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
                // 重復投遞的時間間隔
                textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period);
                // 重復投遞的次數(shù)
                textMessage.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat);
                // 此處的意思:該條消息,等待10秒蹋绽,之后每5秒發(fā)送一次芭毙,重復發(fā)送3次。
                messageProducer.send(textMessage);
            }
            System.out.println("消息發(fā)送完成");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            messageProducer.close();
            session.close();
            connection.close();
        }
    }
}

5蟋字、activemq的消息重試機制

官網(wǎng)參考文檔

5.2、具體哪些情況會導致activemq的消息重發(fā)

  • 1扭勉、客戶端用了事務鹊奖,并且調(diào)用rollback()。
  • 2涂炎、客戶端用了事務忠聚,但是未commit()设哗。
  • 3、客戶端在CLIENT_ACK情況下两蟀,調(diào)用了session的recover()网梢。
  • 4、投遞失敗赂毯,標記有毒消息战虏,DLQ死性隊列。

5.3党涕、RedeliveryPolicy(消息重試)屬性說明

屬性

修改重試次數(shù)為3(默認是6)烦感。更多的設置請參考官網(wǎng)文檔。

public class Jms_TX_Consumer {
    private static final String ACTIVEMQ_URL = "tcp://118.24.20.3:61626";
    private static final String ACTIVEMQ_QUEUE_NAME = "dead01";

    public static void main(String[] args) throws JMSException, IOException {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        // 修改默認參數(shù)膛堤,設置消息消費重試3次
        RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
        redeliveryPolicy.setMaximumRedeliveries(3);
        activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue(ACTIVEMQ_QUEUE_NAME);
        MessageConsumer messageConsumer = session.createConsumer(queue);
        messageConsumer.setMessageListener(new MessageListener() {
            public void onMessage(Message message) {
                if (message instanceof TextMessage) {
                    TextMessage textMessage = (TextMessage) message;
                    try {
                        System.out.println("***消費者接收到的消息:   " + textMessage.getText());
                        //session.commit();
                    }catch (Exception e){
                        e.printStackTrace();
                    }
                }
            }
        });
        System.in.read();
        messageConsumer.close();
        session.close();
        connection.close();
    }
}

在spring中使用重發(fā)機制

spring中使用重發(fā)機制

5.3手趣、請說說消息重發(fā)時間間隔和重發(fā)次數(shù)

  1. 處理失敗指的是MessageListener的onMessage方法里拋出RuntimeException。
  2. Message頭里有兩個相關字段:Redelivered默認為false肥荔,redeliveryCounter默認為0绿渣。
  3. 消息先由broker發(fā)送給consumer,consumer調(diào)用listener燕耿,若是處理失敗中符,本地redeliveryCounter++,給broker一個特定應答缸棵,broker端的message里redeliveryCounter++舟茶,延遲一點時間繼續(xù)調(diào)用,默認1s堵第。超過6次吧凉,則給broker另外一個特定應答,broker就直接發(fā)送消息到DLQ踏志。內(nèi)存
  4. 若是失敗2次阀捅,consumer重啟,則broker再推過來的消息里针余,redeliveryCounter=2饲鄙,本地只能再重試4次即會進入DLQ。
  5. 重試的特定應答發(fā)送到broker圆雁,broker即會在內(nèi)存將消息的redelivered設置為true忍级,redeliveryCounter++,可是這兩個字段都沒有持久化伪朽,即沒有修改存儲中的消息記錄轴咱。因此broker重啟時這兩個字段會被重置為默認值。

默認是:間隔每1秒鐘重發(fā)一次,共計6次朴肺。6次的意思是窖剑,當消息重試機制被觸發(fā),第一次戈稿,第二次....第六次消費者都能接收到消息西土,但是當?shù)谄撸ㄊ∫院?+6=7次)次發(fā)送消息的時候就會被發(fā)送到死信隊列中,不在進行接收消息鞍盗。

5.4需了、有毒消息Poison ACK 談談你的理解

一個消息被redelivedred超過默認的最大重發(fā)次數(shù)(默認6次)時,消費者會個MQ發(fā)一個“poison ack”表示這個消息有毒橡疼,告訴broker不要再發(fā)了援所。這個時候broker會把這個消息放到DLQ(死信隊列)。

6欣除、死信隊列

6.1是什么

官網(wǎng)文檔: http://activemq.apache.org/redelivery-policy
死信隊列:異常消息規(guī)避處理的集合住拭,主要處理失敗的消息。

死信隊列的使用

在業(yè)務邏輯中历帚,如果一個訂單系統(tǒng)沒有問題滔岳,則使用正常的業(yè)務隊列,當出現(xiàn)問題挽牢,則加入死信隊列 谱煤,此時可以選擇人工干預還是機器處理 。

6.2死信隊列的配置(一般采用默認的)

死信隊列默認是全部共享的禽拔,但是也可以設置獨立的死信隊列

死信隊列的獨立設置
直接刪除而不存儲到死信隊列
存放非持久消息到死信隊列

7刘离、冪等性,即為保證消息不被重復消費

冪等性睹栖,即為保證消息不被重復消費硫惕。在網(wǎng)絡傳輸延遲中,會造成MQ的重試中,在重試中,可能會造成重復消費嬉愧。
解決方案:
如果這個消息是做數(shù)據(jù)庫的插入操作,給這個消息做一個唯一主鍵豁辉,就算是出現(xiàn)了重復消費和情況,就會導致主鍵沖突舀患。實在不行徽级,準備一個第三方的服務去做消費記錄,以redis為例聊浅,給消息分配一個id餐抢,只要是消費過該消息堵幽,會以<id,message>以k-v形式寫入redis,先去redis中查詢有無消費記錄即可弹澎。

最后編輯于
?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市努咐,隨后出現(xiàn)的幾起案子苦蒿,更是在濱河造成了極大的恐慌,老刑警劉巖渗稍,帶你破解...
    沈念sama閱讀 206,311評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件佩迟,死亡現(xiàn)場離奇詭異,居然都是意外死亡竿屹,警方通過查閱死者的電腦和手機报强,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,339評論 2 382
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來拱燃,“玉大人秉溉,你說我怎么就攤上這事⊥胗” “怎么了召嘶?”我有些...
    開封第一講書人閱讀 152,671評論 0 342
  • 文/不壞的土叔 我叫張陵,是天一觀的道長哮缺。 經(jīng)常有香客問我弄跌,道長,這世上最難降的妖魔是什么尝苇? 我笑而不...
    開封第一講書人閱讀 55,252評論 1 279
  • 正文 為了忘掉前任铛只,我火速辦了婚禮,結果婚禮上糠溜,老公的妹妹穿的比我還像新娘淳玩。我一直安慰自己,他們只是感情好诵冒,可當我...
    茶點故事閱讀 64,253評論 5 371
  • 文/花漫 我一把揭開白布凯肋。 她就那樣靜靜地躺著,像睡著了一般汽馋。 火紅的嫁衣襯著肌膚如雪侮东。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,031評論 1 285
  • 那天豹芯,我揣著相機與錄音悄雅,去河邊找鬼。 笑死铁蹈,一個胖子當著我的面吹牛宽闲,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播,決...
    沈念sama閱讀 38,340評論 3 399
  • 文/蒼蘭香墨 我猛地睜開眼容诬,長吁一口氣:“原來是場噩夢啊……” “哼娩梨!你這毒婦竟也來了?” 一聲冷哼從身側響起览徒,我...
    開封第一講書人閱讀 36,973評論 0 259
  • 序言:老撾萬榮一對情侶失蹤狈定,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后习蓬,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體纽什,經(jīng)...
    沈念sama閱讀 43,466評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 35,937評論 2 323
  • 正文 我和宋清朗相戀三年躲叼,在試婚紗的時候發(fā)現(xiàn)自己被綠了芦缰。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,039評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡枫慷,死狀恐怖让蕾,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情或听,我是刑警寧澤涕俗,帶...
    沈念sama閱讀 33,701評論 4 323
  • 正文 年R本政府宣布,位于F島的核電站神帅,受9級特大地震影響再姑,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜找御,卻給世界環(huán)境...
    茶點故事閱讀 39,254評論 3 307
  • 文/蒙蒙 一元镀、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧霎桅,春花似錦栖疑、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,259評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至揭糕,卻和暖如春萝快,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背著角。 一陣腳步聲響...
    開封第一講書人閱讀 31,485評論 1 262
  • 我被黑心中介騙來泰國打工揪漩, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人吏口。 一個月前我還...
    沈念sama閱讀 45,497評論 2 354
  • 正文 我出身青樓奄容,卻偏偏與公主長得像冰更,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子昂勒,可洞房花燭夜當晚...
    茶點故事閱讀 42,786評論 2 345

推薦閱讀更多精彩內(nèi)容