【RabbitMQ-3】連接池的配置

java NIO是IO的多路復(fù)用千劈,Channel連接是TCP的多路復(fù)用蠢莺。那么他們有什么關(guān)系呢督禽?
NIO是服務(wù)器開啟一個線程讲婚,在內(nèi)核中使用select()進(jìn)行輪詢管理一些socket赵哲,當(dāng)socket數(shù)據(jù)準(zhǔn)備好時待德,會通知應(yīng)用程序進(jìn)行讀寫請求。系統(tǒng)之間那點事-NIO(內(nèi)附IO模型)-IO/NIO/AIO到底是什么枫夺。服務(wù)器看起來就好像是一個socket在通信将宪,實現(xiàn)了多路復(fù)用。
channel復(fù)用TCP連接橡庞,是為了避免TCP連接創(chuàng)建和銷毀的性能損耗较坛,而多個channel使用一個tcp連接。

1. rabbitmq的connection連接池

RabbitMQ.png

我們看到一個Connection里面可以包含多個channel扒最。那么我們在連接broker時丑勤,connectionchannel的關(guān)系是什么?

1.1 問題提出

1.1.1 Connection對象管理以及性能

Connection連接本質(zhì)上就是TCP連接吧趣,系統(tǒng)之間那點事-問題驅(qū)動-TCP的連接和關(guān)閉是比較耗費時間的法竞。我們可以使用一個單例的Connection對象創(chuàng)建多個Channel來實現(xiàn)數(shù)據(jù)傳輸,但是在channel信息比較大的情況下强挫,Connection帶寬會限制消息的傳輸岔霸。那么需要設(shè)計Connection池,將流量分?jǐn)偟讲煌?code>connection上俯渤。

1.1.2 Channel對象管理以及性能

Channel對象的創(chuàng)建和銷毀也是非常耗時的呆细,推薦共享使用Channel,而不是每次都創(chuàng)建和銷毀Channel八匠。那如何設(shè)計一個channel線程池呢絮爷?

1.2 官網(wǎng)解讀

官網(wǎng)對于Connection的解讀:

AMQP 0-9-1 connections are typically long-lived. AMQP 0-9-1 is an application level protocol that uses TCP for reliable delivery. Connections use authentication and can be protected using TLS. When an application no longer needs to be connected to the server, it should gracefully close its AMQP 0-9-1 connection instead of abruptly closing the underlying TCP connection.

大概意思就是:AMQP 0-9-1一般是一個TCP的長鏈接,當(dāng)應(yīng)用程序不再需要連接到服務(wù)器時梨树,應(yīng)該正常關(guān)閉AMQP 0-9-1連接而不是關(guān)閉TCP連接坑夯。

官網(wǎng)對于Channel的解讀:

Some applications need multiple connections to the broker. However, it is undesirable to keep many TCP connections open at the same time because doing so consumes system resources and makes it more difficult to configure firewalls. AMQP 0-9-1 connections are multiplexed withchannels that can be thought of as "lightweight connections that share a single TCP connection".
Every protocol operation performed by a client happens on a channel. Communication on a particular channel is completely separate from communication on another channel, therefore every protocol method also carries a channel ID (a.k.a. channel number), an integer that both the broker and clients use to figure out which channel the method is for.
A channel only exists in the context of a connection and never on its own. When a connection is closed, so are all channels on it.

For applications that use multiple threads/processes for processing, it is very common to open a new channel per thread/process and not share channels between them.

大概的意思就是:一些應(yīng)用需要同時創(chuàng)建多個連接到broker也就是RabbitMQ服務(wù)器上。然而因為防火墻的存在劝萤,很難同時創(chuàng)建多個連接渊涝。 AMQP 0-9-1連接使用多個channel連接實現(xiàn)對單一Connection的復(fù)用。
客戶端的每一個協(xié)議操作都發(fā)送在channel上。每個協(xié)議方法攜帶者channel ID跨释。brokerclient使用channel ID來確定方法對應(yīng)的channel胸私。因此實現(xiàn)channel之間的數(shù)據(jù)隔離。
channel不能單獨存在鳖谈,僅存在connection上下文中岁疼。當(dāng)connection關(guān)閉時,channel也會關(guān)閉缆娃。
多線程/進(jìn)程之間打開一個channel但不共享channels是很普遍的捷绒。

通道和并發(fā)注意事項(線程安全)

As a rule of thumb, sharing Channel instances between threads is something to be avoided. Applications should prefer using a Channel per thread instead of sharing the same Channel across multiple threads.

線程之間共享channel是無法避免的,應(yīng)用程序跟喜歡每個線程使用一個channel而不是跨線程共享相同的channel贯要。

A classic anti-pattern to be avoided is opening a channel for each published message. Channels are supposed to be reasonably long-lived and opening a new one is a network round-trip which makes this pattern extremely inefficient.

要避免一個反例暖侨,為每一個發(fā)布的消息分配一個channel,開辟一個新的channel需要一個網(wǎng)絡(luò)的往返崇渗,這種模式是很低效的字逗。channel保持合理的存活時間。

It is possible to use channel pooling to avoid concurrent publishing on a shared channel: once a thread is done working with a channel, it returns it to the pool, making the channel available for another thread. Channel pooling can be thought of as a specific synchronization solution. It is recommended that an existing pooling library is used instead of a homegrown solution. For example, Spring AMQP which comes with a ready-to-use channel pooling feature.

(敲黑板宅广,劃重點)可以使用channel pool來避免共享channel上并發(fā)發(fā)布:一旦一個線程使用完了channel葫掉,那么它將返回到pool中。其他線程便可使用這個Channel跟狱。線程池是一個解決方案俭厚,可以使用 Spring AMQP線程池而不是自己開發(fā)。

總結(jié):頻繁建立TCP連接和channel連接是消耗性能的驶臊,于是我們希望可以共享connection或者channel挪挤。達(dá)到連接的復(fù)用。

1.3 Spring AMQP線程池配置

版本spring-rabbit:2.0.2.RELEASE

1.3.1 ConnectionFactory連接工廠

這個ConnectionFactory是Spring AMQP定義的連接工廠资铡,負(fù)責(zé)創(chuàng)建連接电禀。而CacheConnectionFactory實現(xiàn)支持對這些通道的緩存。

  private static ConnectionFactory newRabbitConnectionFactory() {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setAutomaticRecoveryEnabled(false);
        return connectionFactory;
    }

參數(shù)分析:

1. 開啟confirm機(jī)制笤休。
connectionFactory.setPublisherConfirms(true); connectionFactory.setPublisherReturns(true);
為了消息的不丟失,生產(chǎn)者可以設(shè)置事務(wù)或者confirm異步通知症副。但是事務(wù)性能并不是很好店雅,所以一般使用confirm模式。
區(qū)別:(confirm保證達(dá)到交換機(jī)贞铣,return保證交換機(jī)到達(dá)隊列)
如果消息沒有到exchange,則confirm回調(diào),ack=false闹啦;
如果消息到達(dá)exchange,則confirm回調(diào),ack=true
exchangequeue成功,則不回調(diào)return
exchangequeue失敗,則回調(diào)return(需設(shè)置mandatory=true,否則不回回調(diào),消息就丟了)

*注意:設(shè)置PublisherReturns狀態(tài)為true辕坝,那么需要設(shè)置 rabbitTemplate.setMandatory(true);
具體如何保證消息不丟失窍奋,請參考RabbitMQ的消息不丟失機(jī)制

2. 配置模式
緩存模式一般兩種:
connectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CONNECTION);

  public static enum CacheMode {
        CHANNEL,
        CONNECTION;

        private CacheMode() {
        }
    }

2.1 CHANNEL模式
程序運行期間ConnectionFactory只維護(hù)著一個connection,但是可以含有多個channel,操作rabbitmq之前必須先獲取一個channel琳袄,否則將會阻塞江场。
相關(guān)參數(shù)配置:

connectionFactory.setChannelCacheSize(10);
設(shè)置每個Connection中的緩存Channel的數(shù)量。操作rabbitmq之前(send/receive message等)要先獲取到一個Channel窖逗,獲取Channel時會先從緩存中找閑置的Channel址否,如果沒有則創(chuàng)建新的Channel,當(dāng)Channel數(shù)量大于緩存數(shù)量時碎紊,多出來沒法放進(jìn)緩存的會被關(guān)閉佑附。

connectionFactory.setChannelCheckoutTimeout(600);
單位毫秒,當(dāng)這個值大于0時仗考,ChannelCacheSize代表的是緩存的數(shù)量上限音同,當(dāng)緩存獲取不到可用的channel時,不會創(chuàng)建新的channel會等待指定的時間秃嗜,若到時間后還獲取不到可用的channel权均,直接拋出AmqpTimeoutException

注意:在CONNECTION模式痪寻,這個值也會影響獲取Connection的等待時間螺句,超時獲取不到Connection也會拋出AmqpTimeoutException異常。

2.2 CONNECTION模式

CONNECTION模式橡类。在這個模式下允許創(chuàng)建多個connection蛇尚,會緩存一定數(shù)量的connection,每個connection中同樣緩存著一些channel顾画。

相關(guān)參數(shù)配置:

connectionFactory.setConnectionCacheSize(3);
僅在CONNECTION模式下使用取劫,指定connection緩存數(shù)量。

connectionFactory.setConnectionLimit(10);
僅在CONNECTION模式下使用研侣,指定connection數(shù)量上限谱邪。

官網(wǎng)對于是否關(guān)閉channel解答:

Channels used within the framework (e.g. RabbitTemplate) will be reliably returned to the cache. If you create channels outside of the framework, (e.g. by accessing the connection(s) directly and invoking createChannel()), you must return them (by closing) reliably, perhaps in a finally block, to avoid running out of channels.

注意:若使用RabbitTemplate創(chuàng)建channel,那么無需關(guān)閉庶诡,但是自己新建connection創(chuàng)建channel惦银,則需要手動關(guān)閉!避免channel溢出末誓。

ConnectionFactory 代碼:

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host, port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        //設(shè)置virtualHost扯俱。
        connectionFactory.setVirtualHost("/");
        //消息的確認(rèn)機(jī)制(confirm);
        connectionFactory.setPublisherConfirms(true);
        connectionFactory.setPublisherReturns(true);
        //setCacheMode:設(shè)置緩存模式喇澡,共有兩種迅栅,CHANNEL和CONNECTION模式。
        //1晴玖、CONNECTION模式读存,這個模式下允許創(chuàng)建多個Connection为流,會緩存一定數(shù)量的Connection,每個Connection中同樣會緩存一些Channel让簿,
        // 除了可以有多個Connection敬察,其它都跟CHANNEL模式一樣。
        //2拜英、CHANNEL模式静汤,程序運行期間ConnectionFactory會維護(hù)著一個Connection,
        // 所有的操作都會使用這個Connection居凶,但一個Connection中可以有多個Channel虫给,
        // 操作rabbitmq之前都必須先獲取到一個Channel,
        // 否則就會阻塞(可以通過setChannelCheckoutTimeout()設(shè)置等待時間)侠碧,
        // 這些Channel會被緩存(緩存的數(shù)量可以通過setChannelCacheSize()設(shè)置)抹估;
        connectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CONNECTION);   //設(shè)置CONNECTION模式,可創(chuàng)建多個Connection連接
        //設(shè)置每個Connection中緩存Channel的數(shù)量弄兜,不是最大的药蜻。操作rabbitmq之前(send/receive message等)
        // 要先獲取到一個Channel.獲取Channel時會先從緩存中找閑置的Channel,如果沒有則創(chuàng)建新的Channel替饿,
        // 當(dāng)Channel數(shù)量大于緩存數(shù)量時语泽,多出來沒法放進(jìn)緩存的會被關(guān)閉。
        connectionFactory.setChannelCacheSize(10);
        //單位:毫秒视卢;配合channelCacheSize不僅是緩存數(shù)量踱卵,而且是最大的數(shù)量。
        // 從緩存獲取不到可用的Channel時据过,不會創(chuàng)建新的Channel惋砂,會等待這個值設(shè)置的毫秒數(shù)
        //同時,在CONNECTION模式绳锅,這個值也會影響獲取Connection的等待時間西饵,
        // 超時獲取不到Connection也會拋出AmqpTimeoutException異常。
        connectionFactory.setChannelCheckoutTimeout(600);

        //僅在CONNECTION模式使用鳞芙,設(shè)置Connection的緩存數(shù)量眷柔。
        connectionFactory.setConnectionCacheSize(3);
        //setConnectionLimit:僅在CONNECTION模式使用,設(shè)置Connection的數(shù)量上限原朝。
        connectionFactory.setConnectionLimit(10);
        return connectionFactory;
    }

RabbitTemplate 代碼:

  @Autowired
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        //客戶端開啟confirm模式
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                log.info("消息發(fā)送成功:correlationData({}),ack({}),cause({})", correlationData, ack, cause);
            }
        });
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                log.info("消息丟失:exchange({}),route({}),replyCode({}),replyText({}),message:{}", exchange, routingKey, replyCode, replyText, message);
            }
        });
        return rabbitTemplate;
    }

2.2 消費發(fā)送和接收使用不同的Connection

當(dāng)一個服務(wù)同時作為消息的發(fā)送端和接收端時闯割,建議使用不同的Connection避免一方出現(xiàn)故障或者阻塞影響另一方。

只需要在RabbitTemplate中加入下面的配置竿拆,那么RabbitTemplate在創(chuàng)建Connection時,會根據(jù)這個boolean的值宾尚,選擇使用ConnectionFactory本身或者ConnectionFactory中的publisherConnectionFactory(也即是一個ConnectionFactory)來創(chuàng)建丙笋。

rabbitTemplate.setUsePublisherConnection(true);
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末谢澈,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子御板,更是在濱河造成了極大的恐慌锥忿,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,941評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件怠肋,死亡現(xiàn)場離奇詭異敬鬓,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)笙各,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,397評論 3 395
  • 文/潘曉璐 我一進(jìn)店門钉答,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人杈抢,你說我怎么就攤上這事数尿。” “怎么了惶楼?”我有些...
    開封第一講書人閱讀 165,345評論 0 356
  • 文/不壞的土叔 我叫張陵右蹦,是天一觀的道長。 經(jīng)常有香客問我歼捐,道長何陆,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,851評論 1 295
  • 正文 為了忘掉前任豹储,我火速辦了婚禮贷盲,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘颂翼。我一直安慰自己晃洒,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 67,868評論 6 392
  • 文/花漫 我一把揭開白布朦乏。 她就那樣靜靜地躺著球及,像睡著了一般。 火紅的嫁衣襯著肌膚如雪呻疹。 梳的紋絲不亂的頭發(fā)上吃引,一...
    開封第一講書人閱讀 51,688評論 1 305
  • 那天,我揣著相機(jī)與錄音刽锤,去河邊找鬼镊尺。 笑死,一個胖子當(dāng)著我的面吹牛并思,可吹牛的內(nèi)容都是我干的庐氮。 我是一名探鬼主播,決...
    沈念sama閱讀 40,414評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼宋彼,長吁一口氣:“原來是場噩夢啊……” “哼弄砍!你這毒婦竟也來了仙畦?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,319評論 0 276
  • 序言:老撾萬榮一對情侶失蹤音婶,失蹤者是張志新(化名)和其女友劉穎慨畸,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體衣式,經(jīng)...
    沈念sama閱讀 45,775評論 1 315
  • 正文 獨居荒郊野嶺守林人離奇死亡寸士,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,945評論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了碴卧。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片弱卡。...
    茶點故事閱讀 40,096評論 1 350
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖螟深,靈堂內(nèi)的尸體忽然破棺而出谐宙,到底是詐尸還是另有隱情,我是刑警寧澤界弧,帶...
    沈念sama閱讀 35,789評論 5 346
  • 正文 年R本政府宣布凡蜻,位于F島的核電站,受9級特大地震影響垢箕,放射性物質(zhì)發(fā)生泄漏划栓。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,437評論 3 331
  • 文/蒙蒙 一条获、第九天 我趴在偏房一處隱蔽的房頂上張望忠荞。 院中可真熱鬧,春花似錦帅掘、人聲如沸委煤。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,993評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽碧绞。三九已至,卻和暖如春吱窝,著一層夾襖步出監(jiān)牢的瞬間讥邻,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,107評論 1 271
  • 我被黑心中介騙來泰國打工院峡, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留兴使,地道東北人。 一個月前我還...
    沈念sama閱讀 48,308評論 3 372
  • 正文 我出身青樓照激,卻偏偏與公主長得像发魄,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子俩垃,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,037評論 2 355

推薦閱讀更多精彩內(nèi)容

  • http://liuxing.info/2017/06/30/Spring%20AMQP%E4%B8%AD%E6%...
    sherlock_6981閱讀 15,925評論 2 11
  • rljs by sennchi Timeline of History Part One The Cognitiv...
    sennchi閱讀 7,334評論 0 10
  • 本文章翻譯自http://www.rabbitmq.com/api-guide.html欠母,并沒有及時更新欢策。 術(shù)語對...
    joyenlee閱讀 7,660評論 0 3
  • 昨天周四,早上起來也是七點多了赏淌,沒出操也被班主任給抓了,于是說是下午四點半去操場跑步啄清,起來以后就去上課去了六水,第一節(jié)...
    堅志閱讀 204評論 0 0
  • 一 初春的無界村,山青水秀辣卒,阡陌塘池掷贾,柏楊垂柳,一幅世外桃源的田園風(fēng)光荣茫。 蕭寧走在村外的田埂上想帅,眼睛環(huán)視著四周,目...
    書成我讀閱讀 455評論 0 10