一顽冶、鏡像隊(duì)列使用
1.鏡像隊(duì)列作用
? RabbitMQ默認(rèn)集群模式采驻,并不包管隊(duì)列的高可用性宝当,盡管隊(duì)列信息视事,交換機(jī)、綁定這些可以復(fù)制到集群里的任何一個(gè)節(jié)點(diǎn)庆揩,然則隊(duì)列內(nèi)容不會(huì)復(fù)制俐东,固然該模式解決一項(xiàng)目組節(jié)點(diǎn)壓力跌穗,但隊(duì)列節(jié)點(diǎn)宕機(jī)直接導(dǎo)致該隊(duì)列無(wú)法應(yīng)用,只能守候重啟虏辫,所以要想在隊(duì)列節(jié)點(diǎn)宕機(jī)或故障也能正常應(yīng)用,就要復(fù)制隊(duì)列內(nèi)容到集群里的每個(gè)節(jié)點(diǎn)蚌吸,須要?jiǎng)?chuàng)建鏡像隊(duì)列。
2.策略設(shè)置
鏡像隊(duì)列設(shè)置可以基于策略設(shè)置砌庄,策略設(shè)置可以通過(guò)如下兩種方法:
(1)RabbitMQ 管理后臺(tái)
(2)rabbitmqctl 設(shè)置
policy 添加命令:
rabbitmqctl set_policy [-p <vhost>] [--priority <priority>] [--apply-to <apply-to>] <name> <pattern> <definition>
指令參數(shù)詳情
參數(shù)名稱(chēng) | 描述 |
---|---|
-p | 可選參數(shù)羹唠,針對(duì)指定 vhost 下的exchange或 queue |
--priority | 可選參數(shù),policy 的優(yōu)先級(jí) |
--apply-to | 可選參數(shù)娄昆,策略適用的對(duì)象類(lèi)型佩微,其值可為 "queues", "exchanges" 或 "all".默認(rèn)是"all" |
name | policy 的名稱(chēng) |
pattern | 匹配模式(正則表達(dá)式) |
definition | 鏡像定義,json 格式萌焰,包括三部分(ha-mode,ha-params,ha-sync-mode)具體配置見(jiàn)下表 |
definition參數(shù)詳情
參數(shù)名稱(chēng) | 描述 |
---|---|
ha-mode | 指名鏡像隊(duì)列模式哺眯,其值可為"all","exactly"或"nodes",all:表示在集群所有節(jié)點(diǎn)上進(jìn)行鏡像扒俯;exactly:表示在指定個(gè)數(shù)的節(jié)點(diǎn)上鏡像奶卓,節(jié)點(diǎn)個(gè)數(shù)由 ha-params 指定;nodes:表示在指定節(jié)點(diǎn)上進(jìn)行鏡像撼玄,節(jié)點(diǎn)名稱(chēng)通過(guò)ha-params 指定寝杖。 |
ha-params | ha-mode模式需要用到的參數(shù):exactly 模式下為數(shù)字表述鏡像節(jié)點(diǎn)數(shù),nodes 模式下為節(jié)點(diǎn)列表表示需要鏡像的節(jié)點(diǎn)互纯。 |
ha-sync-mode | 鏡像隊(duì)列中消息的同步方式,其值可為"automatic"或"manually". |
例如:對(duì)隊(duì)列名稱(chēng)為 hello 開(kāi)頭的所有隊(duì)列鏡像鏡像磕蒲,并且在集群的節(jié)點(diǎn) rabbit@10.18.195.57上進(jìn)行鏡像留潦,隊(duì)列消息自動(dòng)同步,policy 的設(shè)置命令:
rabbitmqctl set_policy --apply-to queues hello-ha "^hello" '{"ha-mode":"nodes","ha-params":["rabbit@10.18.195.57"],"ha-sync-mode":"automatic"}'
3.ha 策略確認(rèn)
鏡像隊(duì)列策略是否生效可以通過(guò)如下兩種方式驗(yàn)證:
(1)RabbitMQ 管理后臺(tái)
可以通過(guò)策略管理驗(yàn)證策略是否配置正確
通過(guò)隊(duì)列列表也可以查看隊(duì)列應(yīng)用的策略辣往,如果是鏡像策略兔院,可以看到當(dāng)前隊(duì)列副本數(shù)
通過(guò)隊(duì)列詳情可以查看鏡像隊(duì)列當(dāng)前主副本在哪個(gè)節(jié)點(diǎn),從副本在哪幾個(gè)節(jié)點(diǎn)
(2)rabbitmqctl 查看
查看策略詳情指令:
rabbitmqctl list_policies
返回:
查看隊(duì)列是否鏡像指令:
rabbitmqctl list_queues name pid slave_pids
返回:
二站削、鏡像隊(duì)列實(shí)現(xiàn)原理
1.整體介紹
? 通常隊(duì)列由兩部分組成:一部分是 amqqueue_process, 負(fù)責(zé)協(xié)議相關(guān)的消息處理坊萝,即接收生產(chǎn)者發(fā)布的消息,向消費(fèi)者投遞消息许起,處理消息 confirm十偶,ack 等等;另外一部分是 backing_queue园细, 作為消息存儲(chǔ)的具體形式和引擎惦积,提供了相關(guān)接口供進(jìn)程amqqueue_process調(diào)用,用來(lái)完成消息的存儲(chǔ)及可能的持久化工作等猛频。
? 鏡像隊(duì)列和普通隊(duì)列組成有所不同狮崩,鏡像隊(duì)列存在兩類(lèi)進(jìn)程:master隊(duì)列進(jìn)程為 amqqueue_process蛛勉,slave 隊(duì)列進(jìn)程為 rabbit_mirror_queue_slave,每個(gè)進(jìn)程會(huì)創(chuàng)建一個(gè) gm(guaranteed multicast)進(jìn)程睦柴,鏡像隊(duì)列中所有 gm 進(jìn)程會(huì)組成一個(gè)進(jìn)程組用于廣播和接收消息诽凌。同時(shí)和普通隊(duì)列一樣,每個(gè)進(jìn)程都包含一個(gè)用于處理消息邏輯的隊(duì)列 backing_queue(默認(rèn)為rabbit_variable_queue)坦敌。集群中每個(gè)有客戶(hù)端連接的節(jié)點(diǎn)都會(huì)啟動(dòng)若干個(gè)channel進(jìn)程侣诵,channel進(jìn)程中記錄著鏡像隊(duì)列中master和所有slave進(jìn)程的Pid,以便直接與隊(duì)列進(jìn)程通信恬试。整體結(jié)構(gòu)如下:
? gm 負(fù)責(zé)消息廣播窝趣,至于廣播消息處理,master 隊(duì)列上回掉處理是通過(guò)coordinator训柴,消息相關(guān)協(xié)議操作是通過(guò)amqqueue_process處理哑舒,而 slave 隊(duì)列都是由rabbit_mirror_queue_slave進(jìn)行處理。
注意:消息的發(fā)布和消費(fèi)都是通過(guò) master 隊(duì)列完成幻馁,master 隊(duì)列對(duì)消息進(jìn)行處理同時(shí)將消息的處理動(dòng)作通過(guò) gm 廣播給所有 slave 隊(duì)列洗鸵,slave 的 gm 收到消息后,通過(guò)回調(diào)交由 rabbit_mirror_queue_slave 進(jìn)行實(shí)際處理仗嗦。
2.gm(Guaranteed Muticast)
? 鏡像隊(duì)列 gm 組通過(guò)將所有 gm 進(jìn)程形成一個(gè)循環(huán)鏈表膘滨,每個(gè) gm 都會(huì)監(jiān)控位于自己左右兩邊的 gm,當(dāng)有 gm 新增時(shí)稀拐,相鄰的 gm 保證當(dāng)前廣播的消息會(huì)通知到新的 gm 上火邓;當(dāng)有 gm 失效時(shí),相鄰的 gm 會(huì)接管保證本次廣播消息會(huì)通知到所有 gm德撬。
? gm 組信息會(huì)記錄在本地?cái)?shù)據(jù)庫(kù)(mnesia)中铲咨,不同的鏡像隊(duì)列行程的 gm 組也是不同的。
? 消息從 master 隊(duì)列對(duì)應(yīng)的 gm 發(fā)出后蜓洪,順著鏈表依次傳送到所有 gm 進(jìn)程纤勒,由于所有 gm 進(jìn)程組成一個(gè)循環(huán)鏈表,master 隊(duì)列的 gm 線(xiàn)程最終會(huì)收到自己發(fā)送的消息隆檀,這個(gè)時(shí)候 master 隊(duì)列就知道消息已經(jīng)復(fù)制到所有 slave 隊(duì)列了摇天。
3.重要的數(shù)據(jù)結(jié)構(gòu)
queue 隊(duì)列相關(guān)信息
-record(q,
{ q, %% 隊(duì)列信息數(shù)據(jù)結(jié)構(gòu)amqqueue
exclusive_consumer, %% 當(dāng)前隊(duì)列的獨(dú)有消費(fèi)者
has_had_consumers, %% 當(dāng)前隊(duì)列中是否有消費(fèi)者的標(biāo)識(shí)
backing_queue, %% backing_queue對(duì)應(yīng)的模塊名字
backing_queue_state, %% backing_queue對(duì)應(yīng)的狀態(tài)結(jié)構(gòu)
consumers, %% 消費(fèi)者存儲(chǔ)的優(yōu)先級(jí)隊(duì)列
expires, %% 當(dāng)前隊(duì)列未使用就刪除自己的時(shí)間
sync_timer_ref, %% 同步confirm的定時(shí)器,當(dāng)前隊(duì)列大部分接收一次消息就要確保當(dāng)前定時(shí)器的存在(200ms的定時(shí)器)
rate_timer_ref, %% 隊(duì)列中消息進(jìn)入和出去的速率定時(shí)器
expiry_timer_ref, %% 隊(duì)列中未使用就刪除自己的定時(shí)器
stats_timer, %% 向rabbit_event發(fā)布信息的數(shù)據(jù)結(jié)構(gòu)狀態(tài)字段
msg_id_to_channel, %% 當(dāng)前隊(duì)列進(jìn)程中等待confirm的消息gb_trees結(jié)構(gòu)恐仑,里面的結(jié)構(gòu)是Key:MsgId Value:{SenderPid, MsgSeqNo}
ttl, %% 隊(duì)列中設(shè)置的消息存在的時(shí)間
ttl_timer_ref, %% 隊(duì)列中消息存在的定時(shí)器
ttl_timer_expiry, %% 當(dāng)前隊(duì)列頭部消息的過(guò)期時(shí)間點(diǎn)
senders, %% 向當(dāng)前隊(duì)列發(fā)送消息的rabbit_channel進(jìn)程列表
dlx, %% 死亡消息要發(fā)送的exchange交換機(jī)(通過(guò)隊(duì)列聲明的參數(shù)或者policy接口來(lái)設(shè)置)
dlx_routing_key, %% 死亡消息要發(fā)送的路由規(guī)則(通過(guò)隊(duì)列聲明的參數(shù)或者policy接口來(lái)設(shè)置)
max_length, %% 當(dāng)前隊(duì)列中消息的最大上限(通過(guò)隊(duì)列聲明的參數(shù)或者policy接口來(lái)設(shè)置)
max_bytes, %% 隊(duì)列中消息內(nèi)容占的最大空間
args_policy_version, %% 當(dāng)前隊(duì)列中參數(shù)設(shè)置對(duì)應(yīng)的版本號(hào)泉坐,每設(shè)置一次都會(huì)將版本號(hào)加一
status %% 當(dāng)前隊(duì)列的狀態(tài)
}).
state 記錄 gm 進(jìn)程狀態(tài)
-record(state,
{ self, %% gm本身的ID
left, %% 該節(jié)點(diǎn)左邊的節(jié)點(diǎn)
right, %% 該節(jié)點(diǎn)右邊的節(jié)點(diǎn)
group_name, %% group名稱(chēng)與隊(duì)列名一致
module, %% 回調(diào)模塊rabbit_mirror_queue_slave或者rabbit_mirror_queue_coordinator
view, %% group成員列表視圖信息,記錄了成員的ID及每個(gè)成員的左右鄰居節(jié)點(diǎn)(組裝成一個(gè)循環(huán)列表)
pub_count, %% 當(dāng)前已發(fā)布的消息計(jì)數(shù)
members_state, %% group成員狀態(tài)列表 記錄了廣播狀態(tài):[#member{}]
callback_args, %% 回調(diào)函數(shù)的參數(shù)信息菊霜,rabbit_mirror_queue_slave/rabbit_mirror_queue_coordinator進(jìn)程PID
confirms, %% confirm列表
broadcast_buffer, %% 緩存待廣播的消息
broadcast_buffer_sz, %% 當(dāng)前緩存帶廣播中消息實(shí)體總的大小
broadcast_timer, %% 廣播消息定時(shí)器
txn_executor %% 操作Mnesia數(shù)據(jù)庫(kù)的操作函數(shù)
}).
gm_group 整個(gè)鏡像隊(duì)列群組的信息坚冀,該信息會(huì)存儲(chǔ)到Mnesia數(shù)據(jù)庫(kù)
-record(gm_group,
{ name, %% group的名稱(chēng),與queue的名稱(chēng)一致
version, %% group的版本號(hào), 新增節(jié)點(diǎn)/節(jié)點(diǎn)失效時(shí)會(huì)遞增
members %% group的成員列表, 按照節(jié)點(diǎn)組成的鏈表順序進(jìn)行排序
}).
view_member 鏡像隊(duì)列群組視圖成員數(shù)據(jù)結(jié)構(gòu)
-record(view_member,
{ id, %% 單個(gè)鏡像隊(duì)列(結(jié)構(gòu)是{版本號(hào),該鏡像隊(duì)列的Pid})
aliases, %% 記錄id對(duì)應(yīng)的左側(cè)死亡的GM進(jìn)程列表
left, %% 當(dāng)前鏡像隊(duì)列左邊的鏡像隊(duì)列(結(jié)構(gòu)是{版本號(hào)鉴逞,該鏡像隊(duì)列的Pid})
right %% 當(dāng)前鏡像隊(duì)列右邊的鏡像隊(duì)列(結(jié)構(gòu)是{版本號(hào)记某,該鏡像隊(duì)列的Pid})
}).
三司训、鏡像隊(duì)列組群維護(hù)
1.節(jié)點(diǎn)新加入組群
目前已有節(jié)點(diǎn) A,B液南,C壳猜,新加入節(jié)點(diǎn) B,如圖:
節(jié)點(diǎn)加入集群流程如下:
(1)新增節(jié)點(diǎn)先從 gm_group 中獲取對(duì)應(yīng) group 成員信息滑凉;
(2)隨機(jī)選擇一個(gè)節(jié)點(diǎn)并向這個(gè)節(jié)點(diǎn)發(fā)送加入請(qǐng)求统扳;
(3)集群節(jié)點(diǎn)收到新增節(jié)點(diǎn)請(qǐng)求后,更新 gm_group 對(duì)應(yīng)信息畅姊,同時(shí)更新左右節(jié)點(diǎn)更新鄰居信息(調(diào)整對(duì)左右節(jié)點(diǎn)的監(jiān)控)咒钟;
(4)集群節(jié)點(diǎn)回復(fù)通知新增節(jié)點(diǎn)成功加入 group;
(5)新增節(jié)點(diǎn)收到回復(fù)后更新 rabbit_queue 中的相關(guān)信息若未,同時(shí)根據(jù)策略同步消息朱嘴。
核心流程詳解:
(1)新增節(jié)點(diǎn) D 的 GM 進(jìn)程請(qǐng)求加入組群
%% 同步處理將自己加入到鏡像隊(duì)列的群組中的消息
handle_cast(join, State = #state { self = Self,
group_name = GroupName,
members_state = undefined,
module = Module,
callback_args = Args,
txn_executor = TxnFun })->
%% join_group函數(shù)主要執(zhí)行邏輯
%% 1.判斷時(shí)候有存活節(jié)點(diǎn),如果沒(méi)有存活粗合,則重新創(chuàng)建gm_group數(shù)據(jù)庫(kù)數(shù)據(jù)
%% 2.如果有存活GM進(jìn)程萍嬉,隨機(jī)選擇一個(gè)GM進(jìn)程
%% 3.將當(dāng)前新增節(jié)點(diǎn)GM進(jìn)程加入到選擇的GM進(jìn)程右側(cè)
%% 4.將所有存活的鏡像隊(duì)列組裝成鏡像隊(duì)列循環(huán)隊(duì)列視圖A->D->B->C->A
View = join_group(Self, GroupName, TxnFun),
MembersState =
%% 獲取鏡像隊(duì)列視圖的所有key列表
case alive_view_members(View) of
%% 如果是第一個(gè)GM進(jìn)程的啟動(dòng)則初始化成員狀態(tài)數(shù)據(jù)結(jié)構(gòu)
[Self] -> blank_member_state();
%% 如果不是第一個(gè)GM進(jìn)程加入到Group中,則成員狀態(tài)先不做初始化隙疚,讓自己左側(cè)的GM進(jìn)程發(fā)送過(guò)來(lái)的信息進(jìn)行初始化
_ -> undefined
end,
%% 檢查當(dāng)前鏡像隊(duì)列的鄰居信息(根據(jù)消息鏡像隊(duì)列的群組循環(huán)視圖更新自己最新的左右兩邊的鏡像隊(duì)列)
State1 = check_neighbours(State #state { view = View, members_state = MembersState }),
%% 通知啟動(dòng)該GM進(jìn)程的進(jìn)程已經(jīng)成功加入鏡像隊(duì)列群組(rabbit_mirror_queue_coordinator或rabbit_mirror_queue_slave模塊回調(diào))
handle_callback_result(
{Module:joined(Args, get_pids(all_known_members(View))), State1});
(2)GM 進(jìn)程 A 處理新增 GM 進(jìn)程到自己右側(cè)
%% 處理將新的鏡像隊(duì)列加入到本鏡像隊(duì)列的右側(cè)的消息
handle_call({add_on_right, NewMember}, _From,
State = #state { self = Self,
group_name = GroupName,
members_state = MembersState,
txn_executor = TxnFun }) ->
%% 記錄將新的鏡像隊(duì)列成員加入到鏡像隊(duì)列組中壤追,將新加入的鏡像隊(duì)列寫(xiě)入gm_group結(jié)構(gòu)中的members字段中(有新成員加入群組的時(shí)候,則將版本號(hào)增加一)
Group = record_new_member_in_group(NewMember, Self,
GroupName, TxnFun),
%% 根據(jù)組成員信息生成新的鏡像隊(duì)列視圖數(shù)據(jù)結(jié)構(gòu)
View1 = group_to_view(Group),
%% 刪除擦除的成員
MembersState1 = remove_erased_members(MembersState,
View1),
%% 向新加入的成員即右邊成員發(fā)送加入成功的消息
ok = send_right(NewMember, View1,
{catchup, Self,
prepare_members_state(MembersState1)}),
%% 根據(jù)新的鏡像隊(duì)列循環(huán)隊(duì)列視圖和老的視圖修改視圖供屉,同時(shí)根據(jù)鏡像隊(duì)列循環(huán)視圖更新自己左右鄰居信息
{Result, State1} = change_view(View1, State #state {
members_state = MembersState1 }),
%% 向請(qǐng)求加入的鏡像隊(duì)列發(fā)送最新的當(dāng)前鏡像隊(duì)列的群組信息
handle_callback_result({Result, {ok, Group}, State1}).
(3) GM進(jìn)程 D 處理 GM 進(jìn)程 A 發(fā)送過(guò)來(lái)成員狀態(tài)信息
%% 左側(cè)的GM進(jìn)程通知右側(cè)的GM進(jìn)程最新的成員狀態(tài)(此情況是本GM進(jìn)程是新加入Group的行冰,等待左側(cè)GM進(jìn)程發(fā)送過(guò)來(lái)的消息進(jìn)行初始化成員狀態(tài))
handle_msg({catchup, Left, MembersStateLeft},
State = #state { self = Self,
left = {Left, _MRefL},
right = {Right, _MRefR},
view = View,
%% 新加入的GM進(jìn)程在加入后是沒(méi)有初始化成員狀態(tài),是等待左側(cè)玩家發(fā)送消息來(lái)進(jìn)行初始化
members_state = undefined }) ->
%% 異步向自己右側(cè)的鏡像隊(duì)列發(fā)送最新的所有成員信息伶丐,讓Group中的所有成員更新成員信息
ok = send_right(Right, View, {catchup, Self, MembersStateLeft}),
%% 將成員信息轉(zhuǎn)化成字典數(shù)據(jù)結(jié)構(gòu)
MembersStateLeft1 = build_members_state(MembersStateLeft),
%% 新增加的GM進(jìn)程更新最新的成員信息
{ok, State #state { members_state = MembersStateLeft1 }};
2.節(jié)點(diǎn)失效
? 當(dāng) Slave 節(jié)點(diǎn)失效時(shí)资柔,僅僅是相鄰節(jié)點(diǎn)感知,然后重新調(diào)整鄰居節(jié)點(diǎn)信息撵割,更新 rabbit_queue, gm_group的記錄。
? 當(dāng) Master 節(jié)點(diǎn)失效時(shí)流程如下:
(1)由于所有 mirror_queue_slave進(jìn)程會(huì)對(duì) amqqueue_process 進(jìn)程監(jiān)控辙芍,如果 Master 節(jié)點(diǎn)失效啡彬,mirror_queue_slave感知后通過(guò) GM 進(jìn)行廣播;
(2)存活最久的 Slave 節(jié)點(diǎn)會(huì)提升自己為 master 節(jié)點(diǎn)故硅;
(3)該節(jié)點(diǎn)會(huì)創(chuàng)建出新的 coordinator庶灿,并通知 GM 進(jìn)程修改回調(diào)處理器為 coordinator;
(4)原來(lái)的 mirror_queue_slave 作為 amqqueue_process 處理生產(chǎn)發(fā)布的消息吃衅,向消費(fèi)者投遞消息往踢。
核心流程詳解:
(1)GM 進(jìn)程掛掉處理
%% 接收到自己左右兩邊的鏡像隊(duì)列GM進(jìn)程掛掉的消息
handle_info({'DOWN', MRef, process, _Pid, Reason},
State = #state { self = Self,
left = Left,
right = Right,
group_name = GroupName,
confirms = Confirms,
txn_executor = TxnFun }) ->
%% 得到掛掉的GM進(jìn)程
Member = case {Left, Right} of
%% 左側(cè)的鏡像隊(duì)列GM進(jìn)程掛掉的情況
{{Member1, MRef}, _} -> Member1;
%% 右側(cè)的鏡像隊(duì)列GM進(jìn)程掛掉的情況
{_, {Member1, MRef}} -> Member1;
_ -> undefined
end,
case {Member, Reason} of
{undefined, _} ->
noreply(State);
{_, {shutdown, ring_shutdown}} ->
noreply(State);
_ -> timer:sleep(100),
%% 先記錄有鏡像隊(duì)列成員死亡的信息,然后將所有存活的鏡像隊(duì)列組裝鏡像隊(duì)列群組循環(huán)隊(duì)列視圖
%% 有成員死亡的時(shí)候會(huì)將版本號(hào)增加一徘层,record_dead_member_in_group函數(shù)是更新gm_group數(shù)據(jù)庫(kù)表中的數(shù)據(jù)峻呕,將死亡信息寫(xiě)入數(shù)據(jù)庫(kù)表
View1 = group_to_view(record_dead_member_in_group(
Member, GroupName,
TxnFun)),
handle_callback_result(
case alive_view_members(View1) of
%% 當(dāng)存活的鏡像隊(duì)列GM進(jìn)程只剩自己的情況
[Self] -> maybe_erase_aliases(
State #state {
members_state = blank_member_state(),
confirms = purge_confirms(Confirms) },
View1);
%% 當(dāng)存活的鏡像隊(duì)列GM進(jìn)程不止自己(根據(jù)新的鏡像隊(duì)列循環(huán)隊(duì)列視圖和老的視圖修改視圖利职,同時(shí)根據(jù)鏡像隊(duì)列循環(huán)視圖更新自己左右鄰居信息)
%% 同時(shí)將當(dāng)前自己節(jié)點(diǎn)的消息信息發(fā)布到自己右側(cè)的GM進(jìn)程
_ -> change_view(View1, State)
end)
end.
(2)主鏡像隊(duì)列回調(diào) rabbit_mirror_queue_coordinator處理 GM 進(jìn)程掛掉
%% 處理循環(huán)鏡像隊(duì)列中有死亡的鏡像隊(duì)列(主鏡像隊(duì)列接收到死亡的鏡像隊(duì)列不可能是主鏡像隊(duì)列死亡的消息,它監(jiān)視的左右兩側(cè)的從鏡像隊(duì)列進(jìn)程)
handle_cast({gm_deaths, DeadGMPids},
State = #state { q = #amqqueue { name = QueueName, pid = MPid } })
when node(MPid) =:= node() ->
%% 返回新的主鏡像隊(duì)列進(jìn)程瘦癌,死亡的鏡像隊(duì)列進(jìn)程列表猪贪,需要新增加鏡像隊(duì)列的節(jié)點(diǎn)列表
case rabbit_mirror_queue_misc:remove_from_queue(
QueueName, MPid, DeadGMPids) of
{ok, MPid, DeadPids, ExtraNodes} ->
%% 打印鏡像隊(duì)列死亡的日志
rabbit_mirror_queue_misc:report_deaths(MPid, true, QueueName,
DeadPids),
%% 異步在ExtraNodes的所有節(jié)點(diǎn)上增加QName隊(duì)列的從鏡像隊(duì)列
rabbit_mirror_queue_misc:add_mirrors(QueueName, ExtraNodes, async),
noreply(State);
{error, not_found} ->
{stop, normal, State}
end;
(3)從鏡像隊(duì)列回調(diào) rabbit_mirror_queue_coordinator處理 GM 進(jìn)程掛掉
%% 從鏡像隊(duì)列處理有鏡像隊(duì)列成員死亡的消息(從鏡像隊(duì)列接收到主鏡像隊(duì)列死亡的消息)
handle_call({gm_deaths, DeadGMPids}, From,
State = #state { gm = GM, q = Q = #amqqueue {
name = QName, pid = MPid }}) ->
Self = self(),
%% 返回新的主鏡像隊(duì)列進(jìn)程,死亡的鏡像隊(duì)列進(jìn)程列表讯私,需要新增加鏡像隊(duì)列的節(jié)點(diǎn)列表
case rabbit_mirror_queue_misc:remove_from_queue(QName, Self, DeadGMPids) of
{error, not_found} -> gen_server2:reply(From, ok),
{stop, normal, State};
{ok, Pid, DeadPids, ExtraNodes} ->
%% 打印鏡像隊(duì)列死亡的日志(Self是副鏡像隊(duì)列)
rabbit_mirror_queue_misc:report_deaths(Self, false, QName, DeadPids),
case Pid of
%% 此情況是主鏡像隊(duì)列沒(méi)有變化
MPid ->
gen_server2:reply(From, ok),
%% 異步在ExtraNodes的所有節(jié)點(diǎn)上增加QName隊(duì)列的副鏡像隊(duì)列
rabbit_mirror_queue_misc:add_mirrors(
QName, ExtraNodes, async),
noreply(State);
%% 此情況是本從鏡像隊(duì)列成為主鏡像隊(duì)列
Self ->
%% 將自己這個(gè)從鏡像隊(duì)列提升為主鏡像隊(duì)列
QueueState = promote_me(From, State),
%% 異步在ExtraNodes的所有節(jié)點(diǎn)上增加QName隊(duì)列的副鏡像隊(duì)列
rabbit_mirror_queue_misc:add_mirrors(
QName, ExtraNodes, async),
%% 返回消息热押,告知自己這個(gè)從鏡像隊(duì)列成為主鏡像隊(duì)列
{become, rabbit_amqqueue_process, QueueState, hibernate};
_ ->
%% 主鏡像隊(duì)列已經(jīng)發(fā)生變化
gen_server2:reply(From, ok),
[] = ExtraNodes,
%% 確認(rèn)在主節(jié)點(diǎn)宕機(jī)時(shí)否有為完成傳輸?shù)臄?shù)據(jù),確認(rèn)所有從節(jié)點(diǎn)都接收到主節(jié)點(diǎn)宕機(jī)的消息斤寇,然后傳輸未傳輸?shù)南ⅰ? ok = gm:broadcast(GM, process_death),
noreply(State #state { q = Q #amqqueue { pid = Pid } })
end
end;
(4)主鏡像隊(duì)列掛掉否選取新的主鏡像隊(duì)列
%% 返回新的主鏡像隊(duì)列進(jìn)程桶癣,死亡的鏡像隊(duì)列進(jìn)程列表,需要新增加鏡像隊(duì)列的節(jié)點(diǎn)列表
remove_from_queue(QueueName, Self, DeadGMPids) ->
rabbit_misc:execute_mnesia_transaction(
fun () ->
%% 代碼運(yùn)行到這一步有可能隊(duì)列已經(jīng)被刪除
case mnesia:read({rabbit_queue, QueueName}) of
[] -> {error, not_found};
[Q = #amqqueue { pid = QPid,
slave_pids = SPids,
gm_pids = GMPids }] ->
%% 獲得死亡的GM列表和存活的GM列表
{DeadGM, AliveGM} = lists:partition(
fun ({GM, _}) ->
lists:member(GM, DeadGMPids)
end, GMPids),
%% 獲得死亡的實(shí)際進(jìn)程的Pid列表
DeadPids = [Pid || {_GM, Pid} <- DeadGM],
%% 獲得存活的實(shí)際進(jìn)程的Pid列表
AlivePids = [Pid || {_GM, Pid} <- AliveGM],
%% 獲得slave_pids字段中存活的隊(duì)列進(jìn)程Pid列表
Alive = [Pid || Pid <- [QPid | SPids],
lists:member(Pid, AlivePids)],
%% 從存活的鏡像隊(duì)列提取出第一個(gè)鏡像隊(duì)列進(jìn)程Pid娘锁,它是最老的鏡像隊(duì)列牙寞,它將作為新的主鏡像隊(duì)列進(jìn)程
{QPid1, SPids1} = promote_slave(Alive),
Extra =
case {{QPid, SPids}, {QPid1, SPids1}} of
{Same, Same} ->
[];
%% 此處的情況是主鏡像隊(duì)列沒(méi)有變化,或者調(diào)用此接口的從鏡像隊(duì)列成為新的主鏡像隊(duì)列
_ when QPid =:= QPid1 orelse QPid1 =:= Self ->
%% 主鏡像隊(duì)列已經(jīng)變化致盟,當(dāng)前從隊(duì)列變更為主隊(duì)列碎税,信息更新到數(shù)據(jù)庫(kù)(mnesia)
Q1 = Q#amqqueue{pid = QPid1,
slave_pids = SPids1,
gm_pids = AliveGM},
store_updated_slaves(Q1),
%% 根據(jù)隊(duì)列的策略如果啟動(dòng)的從鏡像隊(duì)列需要自動(dòng)同步,則進(jìn)行同步操作
maybe_auto_sync(Q1),
%% 根據(jù)當(dāng)前集群節(jié)點(diǎn)和從鏡像隊(duì)列進(jìn)程所在的節(jié)點(diǎn)得到新增加的節(jié)點(diǎn)列表
slaves_to_start_on_failure(Q1, DeadGMPids);
%% 此處的情況是主鏡像隊(duì)列已經(jīng)發(fā)生變化馏锡,且調(diào)用此接口的從鏡像隊(duì)列沒(méi)有成為新的主鏡像隊(duì)列
_ ->
%% 更新最新的存活的從鏡像隊(duì)列進(jìn)程Pid列表和存活的GM進(jìn)程列表
Q1 = Q#amqqueue{slave_pids = Alive,
gm_pids = AliveGM},
%% 存儲(chǔ)更新隊(duì)列的從鏡像隊(duì)列信息
store_updated_slaves(Q1),
[]
end,
{ok, QPid1, DeadPids, Extra}
end
end).
四雷蹂、鏡像隊(duì)列消息同步
1.消息廣播
消息廣播流程如下:
(1)Master 節(jié)點(diǎn)發(fā)出消息,順著鏡像隊(duì)列循環(huán)列表發(fā)送杯道;
(2)所有 Slave 節(jié)點(diǎn)收到消息會(huì)對(duì)消息進(jìn)行緩存(Slave 節(jié)點(diǎn)緩存消息用于在廣播過(guò)程中匪煌,有節(jié)點(diǎn)失效或者新增節(jié)點(diǎn),這樣左側(cè)節(jié)點(diǎn)感知變化后會(huì)重新將消息推送給右側(cè)節(jié)點(diǎn))党巾;
(3)當(dāng) Master 節(jié)點(diǎn)收到自己發(fā)送的消息后意味著所有節(jié)點(diǎn)都收到了消息萎庭,會(huì)再次廣播 Ack 消息;
(4)Ack 消息同樣會(huì)順著循環(huán)列表經(jīng)過(guò)所有 Slave 節(jié)點(diǎn)齿拂,通知 Slave 節(jié)點(diǎn)可以清除緩存消息驳规;
(5)當(dāng) Ack 消息回到 Master 節(jié)點(diǎn),對(duì)應(yīng)消息的廣播結(jié)束署海。
核心流程詳解:
(1)GM 組群中消息廣播
%% 節(jié)點(diǎn)掛掉的情況或者新增節(jié)點(diǎn)發(fā)送給自己右側(cè)GM進(jìn)程的信息
%% 左側(cè)的GM進(jìn)程通知右側(cè)的GM進(jìn)程最新的成員狀態(tài)(此情況是有新GM進(jìn)程加入Group吗购,但是自己不是最新加入的GM進(jìn)程,但是自己仍然需要更新成員信息)
handle_msg({catchup, Left, MembersStateLeft},
State = #state { self = Self,
left = {Left, _MRefL},
view = View,
members_state = MembersState })
when MembersState =/= undefined ->
%% 將最新的成員信息轉(zhuǎn)化成字典數(shù)據(jù)結(jié)構(gòu)
MembersStateLeft1 = build_members_state(MembersStateLeft),
%% 獲取左側(cè)鏡像隊(duì)列傳入的成員信息和自己進(jìn)程存儲(chǔ)的成員信息的ID的去重
AllMembers = lists:usort(?DICT:fetch_keys(MembersState) ++
?DICT:fetch_keys(MembersStateLeft1)),
%% 根據(jù)左側(cè)GM進(jìn)程發(fā)送過(guò)來(lái)的成員狀態(tài)和自己GM進(jìn)程里的成員狀態(tài)得到需要廣播給后續(xù)GM進(jìn)程的信息
{MembersState1, Activity} =
lists:foldl(
fun (Id, MembersStateActivity) ->
%% 獲取左側(cè)鏡像隊(duì)列傳入Id對(duì)應(yīng)的鏡像隊(duì)列成員信息
#member { pending_ack = PALeft, last_ack = LA } =
find_member_or_blank(Id, MembersStateLeft1),
with_member_acc(
%% 函數(shù)的第一個(gè)參數(shù)是Id對(duì)應(yīng)的自己進(jìn)程存儲(chǔ)的鏡像隊(duì)列成員信息
fun (#member { pending_ack = PA } = Member, Activity1) ->
%% 發(fā)送者和自己是一個(gè)人則表示消息已經(jīng)發(fā)送回來(lái)砸狞,或者判斷發(fā)送者是否在死亡列表中
case is_member_alias(Id, Self, View) of
%% 此情況是發(fā)送者和自己是同一個(gè)人或者發(fā)送者已經(jīng)死亡
true ->
%% 根據(jù)左側(cè)GM進(jìn)程發(fā)送過(guò)來(lái)的ID最新的成員信息和本GM進(jìn)程ID對(duì)應(yīng)的成員信息得到已經(jīng)發(fā)布的信息
{_AcksInFlight, Pubs, _PA1} = find_prefix_common_suffix(PALeft, PA),
%% 重新將自己的消息發(fā)布
{Member #member { last_ack = LA },
%% 組裝發(fā)送的內(nèi)容和ack消息結(jié)構(gòu)
activity_cons(Id, pubs_from_queue(Pubs), [], Activity1)};
false ->
%% 根據(jù)左側(cè)GM進(jìn)程發(fā)送過(guò)來(lái)的ID最新的成員信息和本GM進(jìn)程ID對(duì)應(yīng)的成員信息得到Ack和Pub列表
%% 上一個(gè)節(jié)點(diǎn)少的消息就是已經(jīng)得到確認(rèn)的消息捻勉,多出來(lái)的是新發(fā)布的消息
{Acks, _Common, Pubs} =
find_prefix_common_suffix(PA, PALeft),
{Member,
%% 組裝發(fā)送的發(fā)布和ack消息結(jié)構(gòu)
activity_cons(Id, pubs_from_queue(Pubs), acks_from_queue(Acks), Activity1)}
end
end, Id, MembersStateActivity)
end, {MembersState, activity_nil()}, AllMembers),
handle_msg({activity, Left, activity_finalise(Activity)},
State #state { members_state = MembersState1 });
(2) GM 進(jìn)程內(nèi)部廣播
%% GM進(jìn)程內(nèi)部廣播的接口(先調(diào)用本GM進(jìn)程的回調(diào)進(jìn)程進(jìn)行處理消息,然后將廣播數(shù)據(jù)放入廣播緩存中)
internal_broadcast(Msg, SizeHint,
State = #state { self = Self,
pub_count = PubCount,
module = Module,
callback_args = Args,
broadcast_buffer = Buffer,
broadcast_buffer_sz = BufferSize }) ->
%% 將發(fā)布次數(shù)加一
PubCount1 = PubCount + 1,
{%% 先將消息調(diào)用回調(diào)模塊進(jìn)行處理
Module:handle_msg(Args, get_pid(Self), Msg),
%% 然后將廣播消息放入廣播緩存
State #state { pub_count = PubCount1,
broadcast_buffer = [{PubCount1, Msg} | Buffer],
broadcast_buffer_sz = BufferSize + SizeHint}}.
(3)緩存消息發(fā)送定時(shí)器
%% 確保廣播定時(shí)器的關(guān)閉和開(kāi)啟刀森,當(dāng)廣播緩存中有數(shù)據(jù)則啟動(dòng)定時(shí)器踱启,當(dāng)廣播緩存中沒(méi)有數(shù)據(jù)則停止定時(shí)器
%% 廣播緩存中沒(méi)有數(shù)據(jù),同時(shí)廣播定時(shí)器不存在的情況
ensure_broadcast_timer(State = #state { broadcast_buffer = [],
broadcast_timer = undefined }) ->
State;
%% 廣播緩存中沒(méi)有數(shù)據(jù),同時(shí)廣播定時(shí)器存在埠偿,則直接將定時(shí)器刪除掉
ensure_broadcast_timer(State = #state { broadcast_buffer = [],
broadcast_timer = TRef }) ->
erlang:cancel_timer(TRef),
State #state { broadcast_timer = undefined };
%% 廣播緩存中有數(shù)據(jù)且沒(méi)有定時(shí)器的情況
ensure_broadcast_timer(State = #state { broadcast_timer = undefined }) ->
TRef = erlang:send_after(?BROADCAST_TIMER, self(), flush),
State #state { broadcast_timer = TRef };
ensure_broadcast_timer(State) ->
State.
注:當(dāng)處理消息時(shí)透罢,緩存中的內(nèi)容大小超過(guò)100M 則不會(huì)等定時(shí)器觸發(fā),會(huì)立刻將消息發(fā)給自己右側(cè)的 GM 進(jìn)程胚想。
2.消息同步
? 配置鏡像隊(duì)列時(shí)有一個(gè)屬性ha-sync-mode琐凭,支持兩種模式 automatic 或 manually 默認(rèn)為 manually。
? 當(dāng) ha-sync-mode = manually浊服,新節(jié)點(diǎn)加入到鏡像隊(duì)列組后统屈,可以從左節(jié)點(diǎn)獲取當(dāng)前正在廣播的消息,但是在加入之前已經(jīng)廣播的消息無(wú)法獲取牙躺,所以會(huì)處于鏡像隊(duì)列之間數(shù)據(jù)不一致的情況愁憔,直到加入之前的消息都被消費(fèi)后,主從鏡像隊(duì)列數(shù)據(jù)保持一致孽拷。當(dāng)加入之前的消息未全部消費(fèi)完之前吨掌,主節(jié)點(diǎn)宕機(jī),新節(jié)點(diǎn)選為主節(jié)點(diǎn)時(shí)脓恕,這部分消息將丟失膜宋。
? 當(dāng) ha-sync-mode = automatic,新加入組群的 Slave 節(jié)點(diǎn)會(huì)自動(dòng)進(jìn)行消息同步炼幔,使主從鏡像隊(duì)列數(shù)據(jù)保持一致秋茫。