Activemq 高性能開發(fā)總結(jié)

最近在開發(fā)過程中由于資源有限凰锡,要對Activemq進(jìn)行高性能處理措近。這里我們只說開發(fā)斯撮。
由下圖可以看到,Activemq是由Connection翎猛、Session、Producer接剩、Consumer切厘、Destination組成。Destination是信息的載體懊缺,通過Producer發(fā)出疫稿,再由Consumer接收。Connection和Session之間是1對多關(guān)系鹃两,Session 和 Producer/Consumer之間也是1對多關(guān)系遗座。程序創(chuàng)建Connection 、Session俊扳、Producer/Consumer都是要消耗服務(wù)器資源的途蒋。正常情況下每個Producer/Consumer都是可以永久使用的。出于網(wǎng)絡(luò)等一些不確定因素造成Producer/Consumer適時失效馋记,這時我們就需要使用到Session池和Producer/Consumer池号坡。在連接恢復(fù)正常后保證數(shù)據(jù)正常提交。這里我提供了10000并發(fā)的Producer實例梯醒,請大家參考宽堆。


image.png
public class MqProducer : BaseMq
{
    /// <summary>
    /// Session池
    /// </summary>
    List<ISession> sessions = new List<ISession>();
    /// <summary>
    /// Producer池
    /// </summary>
    List<IMessageProducer> Producers = new List<IMessageProducer>();

    public List<ProducerPool> ProducerPools { get; set; } = new List<ProducerPool>();
    /// <summary>
    /// 最大并發(fā)
    /// </summary>
    int SessionCounter = 0;
    /// <summary>
    /// 持久化
    /// </summary>
    public bool IsStore { get; set; } = false;
    /// <summary>
    /// 消息類型 Queue/Topic
    /// </summary>
    public string messageType { get; set; }
    /// <summary>
    /// 消息名稱
    /// </summary>
    public string messageName { get; set; }

    /// <summary>
    /// 默認(rèn)參數(shù)方便用戶進(jìn)行軟件測試
    /// </summary>
    /// <param name="url"></param>
    /// <param name="user"></param>
    /// <param name="password"></param>
    public MqProducer(string url = "activemq:failover:(tcp://127.0.0.1:61616)", string user = "admin", string password = "admin")
    {
        Url = url;
        User = user;
        Pwd = password;

        factory = new NMSConnectionFactory(Url);
        connection = factory.CreateConnection(User, Pwd);
        connection.ExceptionListener += ConnectionException;
    }

    /// <summary>
    /// 創(chuàng)建連接
    /// </summary>
    /// <param name="url"></param>
    /// <param name="user"></param>
    /// <param name="password"></param>
    /// <returns></returns>
    public IConnection CreateConnection(string url = "", string user = "", string password = "")
    {
        Url = string.IsNullOrWhiteSpace(url) ? Url : url;
        User = string.IsNullOrWhiteSpace(user) ? User : user;
        Pwd = string.IsNullOrWhiteSpace(password) ? Pwd : password;
        if (string.IsNullOrWhiteSpace(Url) || string.IsNullOrWhiteSpace(User) || string.IsNullOrWhiteSpace(Pwd)) return null;

        factory = new NMSConnectionFactory(Url);
        connection = factory.CreateConnection(User, Pwd);

        return connection;
    }
    /// <summary>
    /// 
    /// </summary>
    /// <param name="type"></param>
    /// <param name="name"></param>
    /// <returns></returns>
    public async Task<IMessageProducer> CreateProducer(string type = "queue", string name = "")
    {
        messageName = string.IsNullOrWhiteSpace(name) ? messageName : name.ToLower();
        messageType = string.IsNullOrWhiteSpace(type) ? messageType : type.ToLower();

        var pool = await GetProducerPool(type, name);
        return pool.Producer;
    }
    /// <summary>
    /// 發(fā)送文本內(nèi)容
    /// </summary>
    /// <param name="content"></param>
    public async Task SendMessage(string content)
    {
        ProducerPool producer = await GetProducerPool(messageType, messageName);
        await ProducerInUse(producer);
        await SendMessage(producer.Producer, producer.Session.CreateTextMessage(content));
        await ProducerUnUse(producer);
    }
    /// <summary>
    /// 發(fā)送文本內(nèi)容
    /// </summary>
    /// <param name="content"></param>
    /// <param name="type"></param>
    /// <param name="name"></param>
    public async Task SendMessage(string content, string type, string name)
    {
        messageName = string.IsNullOrWhiteSpace(name) ? messageName : name.ToLower();
        messageType = string.IsNullOrWhiteSpace(type) ? messageType : type.ToLower();

        ProducerPool producer = await GetProducerPool(type, name);
        await ProducerInUse(producer);
        await SendMessage(producer.Producer, producer.Session.CreateTextMessage(content));
        await ProducerUnUse(producer);
    }
    /// <summary>
    /// 發(fā)送對象
    /// </summary>
    /// <param name="content"></param>
    public async Task SendMessage(object content)
    {
        ProducerPool producer = await GetProducerPool(messageType, messageName);
        await ProducerInUse(producer);
        await SendMessage(producer.Producer, producer.Session.CreateObjectMessage(content));
        await ProducerUnUse(producer);
    }
    /// <summary>
    /// 發(fā)送對象
    /// </summary>
    /// <param name="content"></param>
    /// <param name="type"></param>
    /// <param name="name"></param>
    public async Task SendMessage(object content, string type, string name)
    {
        messageName = string.IsNullOrWhiteSpace(name) ? messageName : name;
        messageType = string.IsNullOrWhiteSpace(type) ? messageType : type;

        ProducerPool producer = await GetProducerPool(type, name);
        await ProducerInUse(producer);
        await SendMessage(producer.Producer, producer.Session.CreateObjectMessage(content));
        await ProducerUnUse(producer);
    }

    private async Task ProducerInUse(ProducerPool pool)
    {
        await Task.Run(() =>
        {
            int index = ProducerPools.IndexOf(pool);
            ProducerPools[index].IsEnable = false;
        });
    }

    private async Task ProducerUnUse(ProducerPool pool)
    {
        await Task.Run(() =>
        {
            int index = ProducerPools.IndexOf(pool);
            ProducerPools[index].IsEnable = true;
        });
    }

    public async Task<ProducerPool> GetProducerPool(string type, string name)
    {
        if (string.IsNullOrWhiteSpace(type) || string.IsNullOrWhiteSpace(name)) return null;

        var canUseProducers = ProducerPools.Where(m => m.IsEnable == true && m.Type == type.ToLower() && m.Name == name.ToLower());
        var canUseSessions = ProducerPools.Where(m => m.CurrentSessionCounter < 100);

        if (canUseProducers.Count() == 0 && canUseSessions.Count() > 0)
        {
            ProducerPool producerPool = ProducerPools.Where(m => m.CurrentSessionCounter < 100).FirstOrDefault();
            producerPool = await CreateProducerPool(producerPool.Session, type.ToLower(), name.ToLower());
            return producerPool;
        }
        else if (canUseSessions.Count() == 0 && SessionCounter < 100)
        {
            ProducerPool producerPool = await CreateSessionPool();
            producerPool = await CreateProducerPool(producerPool.Session, type.ToLower(), name.ToLower());
            return producerPool;
        }
        else
        {
            while (canUseProducers.Count() == 0)
            {
                canUseProducers = ProducerPools.Where(m => m.IsEnable == true && m.Type == type.ToLower() && m.Name == name.ToLower());
            }
            return canUseProducers.FirstOrDefault();
        }
    }

    private async Task<ProducerPool> CreateProducerPool(ISession session, string type, string name)
    {
        IMessageProducer producer = await CreateProducer(session, type, name);
        ProducerPool producerPool = new ProducerPool() { Session = session, Producer = producer, Type = type.ToLower(), Name = name.ToLower(), CurrentSessionCounter = 0 };
        ProducerPools.Insert(0, producerPool);
        return producerPool;
    }

    private async Task<ProducerPool> CreateSessionPool()
    {
        ISession session = await CreateSession();
        ProducerPool producerPool = new ProducerPool() { Session = session, CurrentSessionCounter = 0 };
        return producerPool;
    }

    private IDestination CreateDestination(ISession session, string type, string name)
    {
        IDestination destination = null;
        if (session == null || string.IsNullOrWhiteSpace(type) || string.IsNullOrWhiteSpace(name)) return destination;
        if (type == "topic") { destination = session.GetTopic(name); }
        else { destination = session.GetQueue(name); }
        return destination;
    }

    private async Task<IMessageProducer> CreateProducer(ISession session, string type, string name)
    {
        return await Task.Run(() =>
        {
            IDestination dest = CreateDestination(session, type, name);
            return session.CreateProducer(dest);
        });
    }

    private async Task<ISession> CreateSession()
    {
        return await Task.Run(() =>
        {
            if (connection == null) CreateConnection();

            ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
            SessionCounter++;
            return session;
        });
    }
    /// <summary>
    /// 發(fā)送消息
    /// </summary>
    /// <param name="producer"></param>
    /// <param name="message"></param>
    /// <returns></returns>
    public async Task SendMessage(IMessageProducer producer, IMessage message)
    {
        await Task.Run(() =>
        {
            producer.Send(message);
        });
    }
    /// <summary>
    /// 連接異常監(jiān)控
    /// </summary>
    /// <param name="ex"></param>
    private void ConnectionException(Exception ex)
    {
        MessageBox.Show(ex.Message + "\r\n" + ex.StackTrace);
    }
}

public class ProducerPool
{
    /// <summary>
    /// Activemq Session
    /// </summary>
    public ISession Session { get; set; }
    /// <summary>
    /// 生產(chǎn)者
    /// </summary>
    public IMessageProducer Producer { get; set; }
    /// <summary>
    /// 生產(chǎn)者類型
    /// </summary>
    public string Type { get; set; }
    /// <summary>
    /// 生產(chǎn)者名稱
    /// </summary>
    public string Name { get; set; }
    /// <summary>
    /// 當(dāng)前Session產(chǎn)生的會話數(shù)量
    /// </summary>
    public int CurrentSessionCounter { get; set; }
    /// <summary>
    /// Session下Producer計數(shù)器
    /// </summary>
    public bool IsEnable { get; set; } = true;
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市冤馏,隨后出現(xiàn)的幾起案子日麸,更是在濱河造成了極大的恐慌寄啼,老刑警劉巖逮光,帶你破解...
    沈念sama閱讀 207,113評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異墩划,居然都是意外死亡涕刚,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,644評論 2 381
  • 文/潘曉璐 我一進(jìn)店門乙帮,熙熙樓的掌柜王于貴愁眉苦臉地迎上來杜漠,“玉大人,你說我怎么就攤上這事〖蒈睿” “怎么了盼樟?”我有些...
    開封第一講書人閱讀 153,340評論 0 344
  • 文/不壞的土叔 我叫張陵,是天一觀的道長锈至。 經(jīng)常有香客問我晨缴,道長,這世上最難降的妖魔是什么峡捡? 我笑而不...
    開封第一講書人閱讀 55,449評論 1 279
  • 正文 為了忘掉前任击碗,我火速辦了婚禮,結(jié)果婚禮上们拙,老公的妹妹穿的比我還像新娘稍途。我一直安慰自己,他們只是感情好砚婆,可當(dāng)我...
    茶點故事閱讀 64,445評論 5 374
  • 文/花漫 我一把揭開白布械拍。 她就那樣靜靜地躺著,像睡著了一般装盯。 火紅的嫁衣襯著肌膚如雪殊者。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,166評論 1 284
  • 那天验夯,我揣著相機(jī)與錄音猖吴,去河邊找鬼。 笑死挥转,一個胖子當(dāng)著我的面吹牛海蔽,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播绑谣,決...
    沈念sama閱讀 38,442評論 3 401
  • 文/蒼蘭香墨 我猛地睜開眼党窜,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了借宵?” 一聲冷哼從身側(cè)響起幌衣,我...
    開封第一講書人閱讀 37,105評論 0 261
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎壤玫,沒想到半個月后豁护,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 43,601評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡欲间,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,066評論 2 325
  • 正文 我和宋清朗相戀三年楚里,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片猎贴。...
    茶點故事閱讀 38,161評論 1 334
  • 序言:一個原本活蹦亂跳的男人離奇死亡班缎,死狀恐怖蝴光,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情达址,我是刑警寧澤蔑祟,帶...
    沈念sama閱讀 33,792評論 4 323
  • 正文 年R本政府宣布,位于F島的核電站沉唠,受9級特大地震影響做瞪,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜右冻,卻給世界環(huán)境...
    茶點故事閱讀 39,351評論 3 307
  • 文/蒙蒙 一装蓬、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧纱扭,春花似錦牍帚、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,352評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至肃叶,卻和暖如春蹂随,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背因惭。 一陣腳步聲響...
    開封第一講書人閱讀 31,584評論 1 261
  • 我被黑心中介騙來泰國打工岳锁, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人蹦魔。 一個月前我還...
    沈念sama閱讀 45,618評論 2 355
  • 正文 我出身青樓激率,卻偏偏與公主長得像,于是被迫代替她去往敵國和親勿决。 傳聞我的和親對象是個殘疾皇子乒躺,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 42,916評論 2 344

推薦閱讀更多精彩內(nèi)容