由于之前做的項目中需要在多個節(jié)點之間可靠地通信沮峡,所以廢棄了之前使用的Redis pub/sub(因為集群有單點問題,且有諸多限制),改用了RabbitMQ。
使用期間得到不少收獲责语,也踩了不少坑炮障,所以在此分享下心得目派。
1 - 怎么保證可靠性的?
RabbitMQ提供了幾種特性胁赢,犧牲了一點性能代價企蹭,提供了可靠性的保證。
-
持久化
當RabbitMQ退出時,默認會將消息和隊列都清除谅摄,所以需要在第一次聲明隊列和發(fā)送消息時指定其持久化屬性為true
徒河,
這樣RabbitMQ會將隊列、消息和狀態(tài)存到RabbitMQ本地的數(shù)據(jù)庫送漠,重啟后會恢復顽照。durable=true channel.queueDeclare("task_queue", durable, false, false, null);//隊列 channel.basicPublish("", "task_queue", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());//消息
注:當聲明的隊列已經(jīng)存在時,嘗試重新定義它的durable是不生效的闽寡。
-
接收應答
客戶端接收消息的模式默認是自動應答
代兵,通過設置autoAck為false
可以讓客戶端主動應答
消息.
當客戶端拒絕
此消息或者未應答
便斷開連接時,就會使得此消息重新入隊(2.7.0以前是重新加入到隊尾爷狈,2.7.0及以后是保留在隊列中的原來位置).autoAck = false; requeue = true; channel.basicConsume(queue, autoAck, callback); channel.basicAck();//應答 channel.basicReject(deliveryTag, requeue);//拒絕 channel.basicRecover(requeue);//恢復
-
發(fā)送確認
默認情況下植影,發(fā)送端不關注
發(fā)出去的消息是否
被消費掉了∠延溃可設置channel為confirm
模式.
所有發(fā)送的消息都會被確認一次思币,用戶可以自行根據(jù)server發(fā)回的確認消息查看狀態(tài)。詳細介紹見:confirmschannel.confirmSelect(); // 進入confirm模式 do publish messages... // 每個消息都會被編號羡微,從1開始 channel.getNextPublishSeqNo() // 查看下一個要發(fā)送的消息的序號 channel.waitForConfirms(); // 等待所有消息發(fā)送并確認
-
事務
和confirm模式不能同時使用谷饿,而且會帶來大量的多余開銷,導致吞吐量下降很多拷淘,故而不推薦各墨。channel.txSelect(); try { do something... channel.txCommit(); } catch (e){ channel.txRollback(); }
-
消息隊列的高可用(主備模式)
相比于路由和綁定,可以視為是共享于所有的節(jié)點的启涯,消息隊列默認只存在于第一次聲明它的節(jié)點上, 這樣一旦這個節(jié)點掛了
這個隊列中未處理的消息就沒有了. 幸好贬堵,RabbitMQ提供了將它備份到其他節(jié)點的機制. 任何時候都有
一個master
負責處理請求,其他slaves負責備份结洼,當master掛掉黎做,會將最早創(chuàng)建的那個slave提升為master命令:
rabbitmqctl set_policy ha-all “^ha\.” ‘{“ha-mode”:”all”}’
:
設置所有以’ha’開頭的queue在所有節(jié)點上擁有備份。詳細語法點這里.也可以在界面上配置
20160516071133333.png注:由于exclusive類型的隊列會在client和server連接斷開時被刪掉松忍,所以對它設置持久化屬性和備份都是沒有意義的
順序保證
直接上圖好了:
2 - 一些需要注意的地方
集群配置
一個集群中多個節(jié)點共享一份.erlang.cookie文件蒸殿;若是沒有啟用RABBITMQ_USE_LONGNAME,需要在每個節(jié)點的hosts文件中指定其他節(jié)點的地址鸣峭,不然會找不到其他集群中的節(jié)點宏所。-
腦裂
RabbitMQ集群對于網(wǎng)絡分區(qū)的處理和忍受能力不太好,推薦使用federation
或者shovel
插件去解決摊溶。federation詳見高級->Federation爬骤。但是,情況已經(jīng)發(fā)生了莫换,怎么去解決呢霞玄?放心骤铃,還是有辦法恢復的。當網(wǎng)絡斷斷續(xù)續(xù)時坷剧,會使得節(jié)點之間的通信斷掉惰爬,進而造成集群被分隔開的情況。這樣惫企,每個小集群之后便只處理各自本地的連接和消息撕瞧,從而導致數(shù)據(jù)不同步。當重新恢復網(wǎng)絡連接時狞尔,它們彼此都認為是對方掛了-_-||风范,便可以判斷出有網(wǎng)絡分區(qū)出現(xiàn)了。但是RabbitMQ默認是忽略掉不處理的沪么,造成兩個節(jié)點繼續(xù)各自為政(路由硼婿,綁定關系,隊列等可以獨立地創(chuàng)建刪除禽车,甚至主備隊列也會每一方擁有自己的master)寇漫。可以更改配置使得連接恢復時殉摔,會根據(jù)配置自動恢復州胳。- ignore:默認,不做任何處理
- pause-minority:斷開連接時逸月,判斷當前節(jié)點是否屬于少數(shù)派(節(jié)點數(shù)少于或者等于一半)栓撞,如果是,則暫停直到恢復連接碗硬。
- {pause_if_all_down, [nodes], ignore | autoheal}:斷開連接時瓤湘,判斷當前集群中節(jié)點是否有節(jié)點在nodes中,如果有恩尾,則繼續(xù)運行弛说,否則暫停直到恢復連接。這種策略下翰意,當恢復連接時木人,可能會有多個分區(qū)存活,所以冀偶,最后一個參數(shù)決定它們怎么合并醒第。
- autoheal:當恢復連接時,選擇客戶端連接數(shù)最多的節(jié)點狀態(tài)為主进鸠,重啟其他節(jié)點稠曼。
配置:**【詳見下文:集群配置】
- 多次ack:客戶端多次應答同一條消息,會使得該客戶端收不到后續(xù)消息堤如。
3 - 結合Docker使用
集群版本的實現(xiàn):詳見我自己寫的一個例子rabbitmq-server-cluster
4 - 消息隊列中間件的比較
- RabbitMQ:
- 優(yōu)點:支持很多協(xié)議如:AMQP蒲列,XMPP,STMP搀罢,STOMP蝗岖;靈活的路由;成熟穩(wěn)定的集群方案榔至;負載均衡抵赢;數(shù)據(jù)持久化等。
- 缺點:速度較慢唧取;比較重量級铅鲤,安裝需要依賴Erlang環(huán)境。
- Redis:
- 優(yōu)點:比較輕量級枫弟,易上手
- 缺點:單點問題邢享,功能單一
- Kafka:
- 優(yōu)點:高吞吐;分布式淡诗;快速持久化骇塘;負載均衡;輕量級
- 缺點:極端情況下會丟消息
最后附一張網(wǎng)上截取的測試結果:
5 - 幾個重要的概念
- Virtual Host:包含若干個Exchange和Queue韩容,表示一個節(jié)點款违;
- Exchange:接受客戶端發(fā)送的消息,并根據(jù)Binding將消息路由給服務器中的隊列群凶,Exchange分為direct插爹、fanout、topic三種请梢;
- Binding:連接Exchange和Queue赠尾,包含路由規(guī)則;
- Queue:消息隊列毅弧,存儲還未被消費的消息萍虽;
- Message:Header+Body;
- Channel:通道形真,執(zhí)行AMQP的命令杉编;一個連接可創(chuàng)建多個通道以節(jié)省資源。
6 - Client
RabbitMQ官方實現(xiàn)了很多熱門語言的客戶端咆霜,就不一一列舉啦邓馒,以java為例,直接開始正題:
- 建立連接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
// 可以加上斷開重試機制:
factory.setAutomaticRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(10000);
- 創(chuàng)建連接和通道
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
- 一對一:一個生產(chǎn)者蛾坯,一個消費者
代碼同上光酣,只不過會有多個消費者,消息會輪序發(fā)給各個消費者脉课。
如果設置了autoAck=false救军,那么可以實現(xiàn)公平分發(fā)(即對于某個特定的消費者财异,每次最多只發(fā)送指定條數(shù)的消息,直到其中一條消息應答后唱遭,再發(fā)送下一條)戳寸。需要在消費者中加上:
int prefetchCount = 1;
channel.basicQos(prefetchCount);
- 廣播
生產(chǎn)者:
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "");
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
消費者同上
- Routing
生產(chǎn)者:支持通配符的Routing
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, routingKey);
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
消費者同上