java NIO是IO的多路復(fù)用千劈,Channel連接是TCP的多路復(fù)用蠢莺。那么他們有什么關(guān)系呢督禽?
NIO
是服務(wù)器開啟一個線程讲婚,在內(nèi)核中使用select()進(jìn)行輪詢管理一些socket赵哲,當(dāng)socket數(shù)據(jù)準(zhǔn)備好時待德,會通知應(yīng)用程序進(jìn)行讀寫請求。系統(tǒng)之間那點事-NIO(內(nèi)附IO模型)-IO/NIO/AIO到底是什么枫夺。服務(wù)器看起來就好像是一個socket在通信将宪,實現(xiàn)了多路復(fù)用。
channel
復(fù)用TCP連接橡庞,是為了避免TCP連接創(chuàng)建和銷毀的性能損耗较坛,而多個channel使用一個tcp連接。
1. rabbitmq的connection連接池
我們看到一個Connection
里面可以包含多個channel
扒最。那么我們在連接broker
時丑勤,connection
和channel
的關(guān)系是什么?
1.1 問題提出
1.1.1 Connection對象管理以及性能
Connection連接本質(zhì)上就是TCP
連接吧趣,系統(tǒng)之間那點事-問題驅(qū)動-TCP的連接和關(guān)閉是比較耗費時間的法竞。我們可以使用一個單例的Connection
對象創(chuàng)建多個Channel
來實現(xiàn)數(shù)據(jù)傳輸,但是在channel
信息比較大的情況下强挫,Connection
帶寬會限制消息的傳輸岔霸。那么需要設(shè)計Connection
池,將流量分?jǐn)偟讲煌?code>connection上俯渤。
1.1.2 Channel對象管理以及性能
Channel對象的創(chuàng)建和銷毀也是非常耗時的呆细,推薦共享使用Channel
,而不是每次都創(chuàng)建和銷毀Channel
八匠。那如何設(shè)計一個channel
線程池呢絮爷?
1.2 官網(wǎng)解讀
官網(wǎng)對于Connection的解讀:
AMQP 0-9-1 connections are typically long-lived. AMQP 0-9-1 is an application level protocol that uses TCP for reliable delivery. Connections use authentication and can be protected using TLS. When an application no longer needs to be connected to the server, it should gracefully close its AMQP 0-9-1 connection instead of abruptly closing the underlying TCP connection.
大概意思就是:AMQP 0-9-1
一般是一個TCP
的長鏈接,當(dāng)應(yīng)用程序不再需要連接到服務(wù)器時梨树,應(yīng)該正常關(guān)閉AMQP 0-9-1
連接而不是關(guān)閉TCP
連接坑夯。
官網(wǎng)對于Channel的解讀:
Some applications need multiple connections to the broker. However, it is undesirable to keep many TCP connections open at the same time because doing so consumes system resources and makes it more difficult to configure firewalls. AMQP 0-9-1 connections are multiplexed withchannels that can be thought of as "lightweight connections that share a single TCP connection".
Every protocol operation performed by a client happens on a channel. Communication on a particular channel is completely separate from communication on another channel, therefore every protocol method also carries a channel ID (a.k.a. channel number), an integer that both the broker and clients use to figure out which channel the method is for.
A channel only exists in the context of a connection and never on its own. When a connection is closed, so are all channels on it.
For applications that use multiple threads/processes for processing, it is very common to open a new channel per thread/process and not share channels between them.
大概的意思就是:一些應(yīng)用需要同時創(chuàng)建多個連接到broker
也就是RabbitMQ
服務(wù)器上。然而因為防火墻的存在劝萤,很難同時創(chuàng)建多個連接渊涝。 AMQP 0-9-1
連接使用多個channel
連接實現(xiàn)對單一Connection
的復(fù)用。
客戶端的每一個協(xié)議操作都發(fā)送在channel
上。每個協(xié)議方法攜帶者channel ID
跨释。broker
和client
使用channel ID
來確定方法對應(yīng)的channel
胸私。因此實現(xiàn)channel
之間的數(shù)據(jù)隔離。
channel
不能單獨存在鳖谈,僅存在connection
上下文中岁疼。當(dāng)connection
關(guān)閉時,channel
也會關(guān)閉缆娃。
多線程/進(jìn)程之間打開一個channel
但不共享channels
是很普遍的捷绒。
通道和并發(fā)注意事項(線程安全)
As a rule of thumb, sharing Channel instances between threads is something to be avoided. Applications should prefer using a Channel per thread instead of sharing the same Channel across multiple threads.
線程之間共享channel
是無法避免的,應(yīng)用程序跟喜歡每個線程使用一個channel
而不是跨線程共享相同的channel
贯要。
A classic anti-pattern to be avoided is opening a channel for each published message. Channels are supposed to be reasonably long-lived and opening a new one is a network round-trip which makes this pattern extremely inefficient.
要避免一個反例暖侨,為每一個發(fā)布的消息分配一個channel
,開辟一個新的channel
需要一個網(wǎng)絡(luò)的往返崇渗,這種模式是很低效的字逗。channel
保持合理的存活時間。
It is possible to use channel pooling to avoid concurrent publishing on a shared channel: once a thread is done working with a channel, it returns it to the pool, making the channel available for another thread. Channel pooling can be thought of as a specific synchronization solution. It is recommended that an existing pooling library is used instead of a homegrown solution. For example, Spring AMQP which comes with a ready-to-use channel pooling feature.
(敲黑板宅广,劃重點)可以使用channel pool
來避免共享channel
上并發(fā)發(fā)布:一旦一個線程使用完了channel
葫掉,那么它將返回到pool
中。其他線程便可使用這個Channel
跟狱。線程池是一個解決方案俭厚,可以使用 Spring AMQP線程池而不是自己開發(fā)。
總結(jié):頻繁建立TCP
連接和channel
連接是消耗性能的驶臊,于是我們希望可以共享connection
或者channel
挪挤。達(dá)到連接的復(fù)用。
1.3 Spring AMQP線程池配置
版本spring-rabbit:2.0.2.RELEASE
1.3.1 ConnectionFactory連接工廠
這個ConnectionFactory
是Spring AMQP定義的連接工廠资铡,負(fù)責(zé)創(chuàng)建連接电禀。而CacheConnectionFactory
實現(xiàn)支持對這些通道的緩存。
private static ConnectionFactory newRabbitConnectionFactory() {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setAutomaticRecoveryEnabled(false);
return connectionFactory;
}
參數(shù)分析:
1. 開啟confirm機(jī)制笤休。
connectionFactory.setPublisherConfirms(true); connectionFactory.setPublisherReturns(true);
為了消息的不丟失,生產(chǎn)者可以設(shè)置事務(wù)或者confirm
異步通知症副。但是事務(wù)性能并不是很好店雅,所以一般使用confirm
模式。
區(qū)別:(confirm保證達(dá)到交換機(jī)贞铣,return保證交換機(jī)到達(dá)隊列)
如果消息沒有到exchange
,則confirm
回調(diào),ack=false
闹啦;
如果消息到達(dá)exchange
,則confirm
回調(diào),ack=true
;
exchange
到queue
成功,則不回調(diào)return
exchange
到queue
失敗,則回調(diào)return
(需設(shè)置mandatory=true
,否則不回回調(diào),消息就丟了)
*注意:設(shè)置PublisherReturns
狀態(tài)為true
辕坝,那么需要設(shè)置 rabbitTemplate.setMandatory(true);
具體如何保證消息不丟失窍奋,請參考RabbitMQ的消息不丟失機(jī)制
2. 配置模式
緩存模式一般兩種:
connectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CONNECTION);
public static enum CacheMode {
CHANNEL,
CONNECTION;
private CacheMode() {
}
}
2.1 CHANNEL模式
程序運行期間ConnectionFactory
只維護(hù)著一個connection
,但是可以含有多個channel
,操作rabbitmq
之前必須先獲取一個channel
琳袄,否則將會阻塞江场。
相關(guān)參數(shù)配置:
connectionFactory.setChannelCacheSize(10);
設(shè)置每個Connection
中的緩存Channel
的數(shù)量。操作rabbitmq
之前(send/receive message等)要先獲取到一個Channel
窖逗,獲取Channel
時會先從緩存中找閑置的Channel
址否,如果沒有則創(chuàng)建新的Channel
,當(dāng)Channel
數(shù)量大于緩存數(shù)量時碎紊,多出來沒法放進(jìn)緩存的會被關(guān)閉佑附。
connectionFactory.setChannelCheckoutTimeout(600);
單位毫秒,當(dāng)這個值大于0時仗考,ChannelCacheSize
代表的是緩存的數(shù)量上限音同,當(dāng)緩存獲取不到可用的channel
時,不會創(chuàng)建新的channel
會等待指定的時間秃嗜,若到時間后還獲取不到可用的channel
权均,直接拋出AmqpTimeoutException
。
注意:在CONNECTION模式痪寻,這個值也會影響獲取Connection的等待時間螺句,超時獲取不到Connection也會拋出AmqpTimeoutException異常。
2.2 CONNECTION模式
CONNECTION
模式橡类。在這個模式下允許創(chuàng)建多個connection
蛇尚,會緩存一定數(shù)量的connection
,每個connection
中同樣緩存著一些channel
顾画。
相關(guān)參數(shù)配置:
connectionFactory.setConnectionCacheSize(3);
僅在CONNECTION
模式下使用取劫,指定connection
緩存數(shù)量。
connectionFactory.setConnectionLimit(10);
僅在CONNECTION
模式下使用研侣,指定connection
數(shù)量上限谱邪。
官網(wǎng)對于是否關(guān)閉channel解答:
Channels used within the framework (e.g. RabbitTemplate) will be reliably returned to the cache. If you create channels outside of the framework, (e.g. by accessing the connection(s) directly and invoking createChannel()), you must return them (by closing) reliably, perhaps in a finally block, to avoid running out of channels.
注意:若使用RabbitTemplate
創(chuàng)建channel,那么無需關(guān)閉庶诡,但是自己新建connection創(chuàng)建channel惦银,則需要手動關(guān)閉!避免channel溢出末誓。
ConnectionFactory 代碼:
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host, port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
//設(shè)置virtualHost扯俱。
connectionFactory.setVirtualHost("/");
//消息的確認(rèn)機(jī)制(confirm);
connectionFactory.setPublisherConfirms(true);
connectionFactory.setPublisherReturns(true);
//setCacheMode:設(shè)置緩存模式喇澡,共有兩種迅栅,CHANNEL和CONNECTION模式。
//1晴玖、CONNECTION模式读存,這個模式下允許創(chuàng)建多個Connection为流,會緩存一定數(shù)量的Connection,每個Connection中同樣會緩存一些Channel让簿,
// 除了可以有多個Connection敬察,其它都跟CHANNEL模式一樣。
//2拜英、CHANNEL模式静汤,程序運行期間ConnectionFactory會維護(hù)著一個Connection,
// 所有的操作都會使用這個Connection居凶,但一個Connection中可以有多個Channel虫给,
// 操作rabbitmq之前都必須先獲取到一個Channel,
// 否則就會阻塞(可以通過setChannelCheckoutTimeout()設(shè)置等待時間)侠碧,
// 這些Channel會被緩存(緩存的數(shù)量可以通過setChannelCacheSize()設(shè)置)抹估;
connectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CONNECTION); //設(shè)置CONNECTION模式,可創(chuàng)建多個Connection連接
//設(shè)置每個Connection中緩存Channel的數(shù)量弄兜,不是最大的药蜻。操作rabbitmq之前(send/receive message等)
// 要先獲取到一個Channel.獲取Channel時會先從緩存中找閑置的Channel,如果沒有則創(chuàng)建新的Channel替饿,
// 當(dāng)Channel數(shù)量大于緩存數(shù)量時语泽,多出來沒法放進(jìn)緩存的會被關(guān)閉。
connectionFactory.setChannelCacheSize(10);
//單位:毫秒视卢;配合channelCacheSize不僅是緩存數(shù)量踱卵,而且是最大的數(shù)量。
// 從緩存獲取不到可用的Channel時据过,不會創(chuàng)建新的Channel惋砂,會等待這個值設(shè)置的毫秒數(shù)
//同時,在CONNECTION模式绳锅,這個值也會影響獲取Connection的等待時間西饵,
// 超時獲取不到Connection也會拋出AmqpTimeoutException異常。
connectionFactory.setChannelCheckoutTimeout(600);
//僅在CONNECTION模式使用鳞芙,設(shè)置Connection的緩存數(shù)量眷柔。
connectionFactory.setConnectionCacheSize(3);
//setConnectionLimit:僅在CONNECTION模式使用,設(shè)置Connection的數(shù)量上限原朝。
connectionFactory.setConnectionLimit(10);
return connectionFactory;
}
RabbitTemplate 代碼:
@Autowired
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
//客戶端開啟confirm模式
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMandatory(true);
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
log.info("消息發(fā)送成功:correlationData({}),ack({}),cause({})", correlationData, ack, cause);
}
});
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.info("消息丟失:exchange({}),route({}),replyCode({}),replyText({}),message:{}", exchange, routingKey, replyCode, replyText, message);
}
});
return rabbitTemplate;
}
2.2 消費發(fā)送和接收使用不同的Connection
當(dāng)一個服務(wù)同時作為消息的發(fā)送端和接收端時闯割,建議使用不同的
Connection
避免一方出現(xiàn)故障或者阻塞影響另一方。
只需要在RabbitTemplate中加入下面的配置竿拆,那么RabbitTemplate
在創(chuàng)建Connection
時,會根據(jù)這個boolean
的值宾尚,選擇使用ConnectionFactory
本身或者ConnectionFactory
中的publisherConnectionFactory
(也即是一個ConnectionFactory
)來創(chuàng)建丙笋。
rabbitTemplate.setUsePublisherConnection(true);