RabbitMQ鏡像隊(duì)列實(shí)現(xiàn)原理

一顽冶、鏡像隊(duì)列使用

1.鏡像隊(duì)列作用

? RabbitMQ默認(rèn)集群模式采驻,并不包管隊(duì)列的高可用性宝当,盡管隊(duì)列信息视事,交換機(jī)、綁定這些可以復(fù)制到集群里的任何一個(gè)節(jié)點(diǎn)庆揩,然則隊(duì)列內(nèi)容不會(huì)復(fù)制俐东,固然該模式解決一項(xiàng)目組節(jié)點(diǎn)壓力跌穗,但隊(duì)列節(jié)點(diǎn)宕機(jī)直接導(dǎo)致該隊(duì)列無(wú)法應(yīng)用,只能守候重啟虏辫,所以要想在隊(duì)列節(jié)點(diǎn)宕機(jī)或故障也能正常應(yīng)用,就要復(fù)制隊(duì)列內(nèi)容到集群里的每個(gè)節(jié)點(diǎn)蚌吸,須要?jiǎng)?chuàng)建鏡像隊(duì)列。

2.策略設(shè)置

鏡像隊(duì)列設(shè)置可以基于策略設(shè)置砌庄,策略設(shè)置可以通過(guò)如下兩種方法:

(1)RabbitMQ 管理后臺(tái)

(2)rabbitmqctl 設(shè)置

policy 添加命令:

rabbitmqctl set_policy [-p <vhost>] [--priority <priority>] [--apply-to <apply-to>] <name> <pattern>  <definition>

指令參數(shù)詳情

參數(shù)名稱(chēng) 描述
-p 可選參數(shù)羹唠,針對(duì)指定 vhost 下的exchange或 queue
--priority 可選參數(shù),policy 的優(yōu)先級(jí)
--apply-to 可選參數(shù)娄昆,策略適用的對(duì)象類(lèi)型佩微,其值可為 "queues", "exchanges" 或 "all".默認(rèn)是"all"
name policy 的名稱(chēng)
pattern 匹配模式(正則表達(dá)式)
definition 鏡像定義,json 格式萌焰,包括三部分(ha-mode,ha-params,ha-sync-mode)具體配置見(jiàn)下表

definition參數(shù)詳情

參數(shù)名稱(chēng) 描述
ha-mode 指名鏡像隊(duì)列模式哺眯,其值可為"all","exactly"或"nodes",all:表示在集群所有節(jié)點(diǎn)上進(jìn)行鏡像扒俯;exactly:表示在指定個(gè)數(shù)的節(jié)點(diǎn)上鏡像奶卓,節(jié)點(diǎn)個(gè)數(shù)由 ha-params 指定;nodes:表示在指定節(jié)點(diǎn)上進(jìn)行鏡像撼玄,節(jié)點(diǎn)名稱(chēng)通過(guò)ha-params 指定寝杖。
ha-params ha-mode模式需要用到的參數(shù):exactly 模式下為數(shù)字表述鏡像節(jié)點(diǎn)數(shù),nodes 模式下為節(jié)點(diǎn)列表表示需要鏡像的節(jié)點(diǎn)互纯。
ha-sync-mode 鏡像隊(duì)列中消息的同步方式,其值可為"automatic"或"manually".

例如:對(duì)隊(duì)列名稱(chēng)為 hello 開(kāi)頭的所有隊(duì)列鏡像鏡像磕蒲,并且在集群的節(jié)點(diǎn) rabbit@10.18.195.57上進(jìn)行鏡像留潦,隊(duì)列消息自動(dòng)同步,policy 的設(shè)置命令:

rabbitmqctl set_policy --apply-to queues hello-ha "^hello" '{"ha-mode":"nodes","ha-params":["rabbit@10.18.195.57"],"ha-sync-mode":"automatic"}'

3.ha 策略確認(rèn)

鏡像隊(duì)列策略是否生效可以通過(guò)如下兩種方式驗(yàn)證:

(1)RabbitMQ 管理后臺(tái)

可以通過(guò)策略管理驗(yàn)證策略是否配置正確

通過(guò)隊(duì)列列表也可以查看隊(duì)列應(yīng)用的策略辣往,如果是鏡像策略兔院,可以看到當(dāng)前隊(duì)列副本數(shù)

通過(guò)隊(duì)列詳情可以查看鏡像隊(duì)列當(dāng)前主副本在哪個(gè)節(jié)點(diǎn),從副本在哪幾個(gè)節(jié)點(diǎn)

(2)rabbitmqctl 查看

查看策略詳情指令:

rabbitmqctl list_policies

返回:

查看隊(duì)列是否鏡像指令:

rabbitmqctl list_queues name pid slave_pids

返回:

二站削、鏡像隊(duì)列實(shí)現(xiàn)原理

1.整體介紹

? 通常隊(duì)列由兩部分組成:一部分是 amqqueue_process, 負(fù)責(zé)協(xié)議相關(guān)的消息處理坊萝,即接收生產(chǎn)者發(fā)布的消息,向消費(fèi)者投遞消息许起,處理消息 confirm十偶,ack 等等;另外一部分是 backing_queue园细, 作為消息存儲(chǔ)的具體形式和引擎惦积,提供了相關(guān)接口供進(jìn)程amqqueue_process調(diào)用,用來(lái)完成消息的存儲(chǔ)及可能的持久化工作等猛频。

? 鏡像隊(duì)列和普通隊(duì)列組成有所不同狮崩,鏡像隊(duì)列存在兩類(lèi)進(jìn)程:master隊(duì)列進(jìn)程為 amqqueue_process蛛勉,slave 隊(duì)列進(jìn)程為 rabbit_mirror_queue_slave,每個(gè)進(jìn)程會(huì)創(chuàng)建一個(gè) gm(guaranteed multicast)進(jìn)程睦柴,鏡像隊(duì)列中所有 gm 進(jìn)程會(huì)組成一個(gè)進(jìn)程組用于廣播和接收消息诽凌。同時(shí)和普通隊(duì)列一樣,每個(gè)進(jìn)程都包含一個(gè)用于處理消息邏輯的隊(duì)列 backing_queue(默認(rèn)為rabbit_variable_queue)坦敌。集群中每個(gè)有客戶(hù)端連接的節(jié)點(diǎn)都會(huì)啟動(dòng)若干個(gè)channel進(jìn)程侣诵,channel進(jìn)程中記錄著鏡像隊(duì)列中master和所有slave進(jìn)程的Pid,以便直接與隊(duì)列進(jìn)程通信恬试。整體結(jié)構(gòu)如下:

? gm 負(fù)責(zé)消息廣播窝趣,至于廣播消息處理,master 隊(duì)列上回掉處理是通過(guò)coordinator训柴,消息相關(guān)協(xié)議操作是通過(guò)amqqueue_process處理哑舒,而 slave 隊(duì)列都是由rabbit_mirror_queue_slave進(jìn)行處理。

注意:消息的發(fā)布和消費(fèi)都是通過(guò) master 隊(duì)列完成幻馁,master 隊(duì)列對(duì)消息進(jìn)行處理同時(shí)將消息的處理動(dòng)作通過(guò) gm 廣播給所有 slave 隊(duì)列洗鸵,slave 的 gm 收到消息后,通過(guò)回調(diào)交由 rabbit_mirror_queue_slave 進(jìn)行實(shí)際處理仗嗦。

2.gm(Guaranteed Muticast)

? 鏡像隊(duì)列 gm 組通過(guò)將所有 gm 進(jìn)程形成一個(gè)循環(huán)鏈表膘滨,每個(gè) gm 都會(huì)監(jiān)控位于自己左右兩邊的 gm,當(dāng)有 gm 新增時(shí)稀拐,相鄰的 gm 保證當(dāng)前廣播的消息會(huì)通知到新的 gm 上火邓;當(dāng)有 gm 失效時(shí),相鄰的 gm 會(huì)接管保證本次廣播消息會(huì)通知到所有 gm德撬。

? gm 組信息會(huì)記錄在本地?cái)?shù)據(jù)庫(kù)(mnesia)中铲咨,不同的鏡像隊(duì)列行程的 gm 組也是不同的。

? 消息從 master 隊(duì)列對(duì)應(yīng)的 gm 發(fā)出后蜓洪,順著鏈表依次傳送到所有 gm 進(jìn)程纤勒,由于所有 gm 進(jìn)程組成一個(gè)循環(huán)鏈表,master 隊(duì)列的 gm 線(xiàn)程最終會(huì)收到自己發(fā)送的消息隆檀,這個(gè)時(shí)候 master 隊(duì)列就知道消息已經(jīng)復(fù)制到所有 slave 隊(duì)列了摇天。

3.重要的數(shù)據(jù)結(jié)構(gòu)

queue 隊(duì)列相關(guān)信息

-record(q, 
        { q,                    %% 隊(duì)列信息數(shù)據(jù)結(jié)構(gòu)amqqueue
          exclusive_consumer,   %% 當(dāng)前隊(duì)列的獨(dú)有消費(fèi)者
          has_had_consumers,    %% 當(dāng)前隊(duì)列中是否有消費(fèi)者的標(biāo)識(shí)
          backing_queue,        %% backing_queue對(duì)應(yīng)的模塊名字
          backing_queue_state,  %% backing_queue對(duì)應(yīng)的狀態(tài)結(jié)構(gòu)
          consumers,            %% 消費(fèi)者存儲(chǔ)的優(yōu)先級(jí)隊(duì)列
          expires,              %% 當(dāng)前隊(duì)列未使用就刪除自己的時(shí)間
          sync_timer_ref,       %% 同步confirm的定時(shí)器,當(dāng)前隊(duì)列大部分接收一次消息就要確保當(dāng)前定時(shí)器的存在(200ms的定時(shí)器)
          rate_timer_ref,       %% 隊(duì)列中消息進(jìn)入和出去的速率定時(shí)器
          expiry_timer_ref,     %% 隊(duì)列中未使用就刪除自己的定時(shí)器
          stats_timer,          %% 向rabbit_event發(fā)布信息的數(shù)據(jù)結(jié)構(gòu)狀態(tài)字段
          msg_id_to_channel,    %% 當(dāng)前隊(duì)列進(jìn)程中等待confirm的消息gb_trees結(jié)構(gòu)恐仑,里面的結(jié)構(gòu)是Key:MsgId Value:{SenderPid, MsgSeqNo}
          ttl,                  %% 隊(duì)列中設(shè)置的消息存在的時(shí)間
          ttl_timer_ref,        %% 隊(duì)列中消息存在的定時(shí)器
          ttl_timer_expiry,     %% 當(dāng)前隊(duì)列頭部消息的過(guò)期時(shí)間點(diǎn)
          senders,              %% 向當(dāng)前隊(duì)列發(fā)送消息的rabbit_channel進(jìn)程列表
          dlx,                  %% 死亡消息要發(fā)送的exchange交換機(jī)(通過(guò)隊(duì)列聲明的參數(shù)或者policy接口來(lái)設(shè)置)
          dlx_routing_key,      %% 死亡消息要發(fā)送的路由規(guī)則(通過(guò)隊(duì)列聲明的參數(shù)或者policy接口來(lái)設(shè)置)
          max_length,           %% 當(dāng)前隊(duì)列中消息的最大上限(通過(guò)隊(duì)列聲明的參數(shù)或者policy接口來(lái)設(shè)置)
          max_bytes,            %% 隊(duì)列中消息內(nèi)容占的最大空間
          args_policy_version,  %% 當(dāng)前隊(duì)列中參數(shù)設(shè)置對(duì)應(yīng)的版本號(hào)泉坐,每設(shè)置一次都會(huì)將版本號(hào)加一
          status                %% 當(dāng)前隊(duì)列的狀態(tài)
        }).

state 記錄 gm 進(jìn)程狀態(tài)

-record(state,
        { self,                 %% gm本身的ID
          left,                 %% 該節(jié)點(diǎn)左邊的節(jié)點(diǎn)
          right,                %% 該節(jié)點(diǎn)右邊的節(jié)點(diǎn)
          group_name,           %% group名稱(chēng)與隊(duì)列名一致
          module,               %% 回調(diào)模塊rabbit_mirror_queue_slave或者rabbit_mirror_queue_coordinator
          view,                 %% group成員列表視圖信息,記錄了成員的ID及每個(gè)成員的左右鄰居節(jié)點(diǎn)(組裝成一個(gè)循環(huán)列表)
          pub_count,            %% 當(dāng)前已發(fā)布的消息計(jì)數(shù)
          members_state,        %% group成員狀態(tài)列表 記錄了廣播狀態(tài):[#member{}]
          callback_args,        %% 回調(diào)函數(shù)的參數(shù)信息菊霜,rabbit_mirror_queue_slave/rabbit_mirror_queue_coordinator進(jìn)程PID
          confirms,             %% confirm列表
          broadcast_buffer,     %% 緩存待廣播的消息
          broadcast_buffer_sz,  %% 當(dāng)前緩存帶廣播中消息實(shí)體總的大小
          broadcast_timer,      %% 廣播消息定時(shí)器
          txn_executor          %% 操作Mnesia數(shù)據(jù)庫(kù)的操作函數(shù)
        }).

gm_group 整個(gè)鏡像隊(duì)列群組的信息坚冀,該信息會(huì)存儲(chǔ)到Mnesia數(shù)據(jù)庫(kù)

-record(gm_group, 
        { name,    %% group的名稱(chēng),與queue的名稱(chēng)一致
          version, %% group的版本號(hào), 新增節(jié)點(diǎn)/節(jié)點(diǎn)失效時(shí)會(huì)遞增
          members  %% group的成員列表, 按照節(jié)點(diǎn)組成的鏈表順序進(jìn)行排序
        }).

view_member 鏡像隊(duì)列群組視圖成員數(shù)據(jù)結(jié)構(gòu)

-record(view_member, 
        { id,       %% 單個(gè)鏡像隊(duì)列(結(jié)構(gòu)是{版本號(hào),該鏡像隊(duì)列的Pid})
          aliases,  %% 記錄id對(duì)應(yīng)的左側(cè)死亡的GM進(jìn)程列表
          left,     %% 當(dāng)前鏡像隊(duì)列左邊的鏡像隊(duì)列(結(jié)構(gòu)是{版本號(hào)鉴逞,該鏡像隊(duì)列的Pid})
          right     %% 當(dāng)前鏡像隊(duì)列右邊的鏡像隊(duì)列(結(jié)構(gòu)是{版本號(hào)记某,該鏡像隊(duì)列的Pid})
        }).

三司训、鏡像隊(duì)列組群維護(hù)

1.節(jié)點(diǎn)新加入組群

目前已有節(jié)點(diǎn) A,B液南,C壳猜,新加入節(jié)點(diǎn) B,如圖:

節(jié)點(diǎn)加入集群流程如下:

(1)新增節(jié)點(diǎn)先從 gm_group 中獲取對(duì)應(yīng) group 成員信息滑凉;

(2)隨機(jī)選擇一個(gè)節(jié)點(diǎn)并向這個(gè)節(jié)點(diǎn)發(fā)送加入請(qǐng)求统扳;

(3)集群節(jié)點(diǎn)收到新增節(jié)點(diǎn)請(qǐng)求后,更新 gm_group 對(duì)應(yīng)信息畅姊,同時(shí)更新左右節(jié)點(diǎn)更新鄰居信息(調(diào)整對(duì)左右節(jié)點(diǎn)的監(jiān)控)咒钟;

(4)集群節(jié)點(diǎn)回復(fù)通知新增節(jié)點(diǎn)成功加入 group;

(5)新增節(jié)點(diǎn)收到回復(fù)后更新 rabbit_queue 中的相關(guān)信息若未,同時(shí)根據(jù)策略同步消息朱嘴。

核心流程詳解:

(1)新增節(jié)點(diǎn) D 的 GM 進(jìn)程請(qǐng)求加入組群

%% 同步處理將自己加入到鏡像隊(duì)列的群組中的消息
handle_cast(join, State = #state { self          = Self,
                                   group_name    = GroupName,
                                   members_state = undefined,
                                   module        = Module,
                                   callback_args = Args,
                                   txn_executor  = TxnFun })->
    %% join_group函數(shù)主要執(zhí)行邏輯
    %% 1.判斷時(shí)候有存活節(jié)點(diǎn),如果沒(méi)有存活粗合,則重新創(chuàng)建gm_group數(shù)據(jù)庫(kù)數(shù)據(jù)
    %% 2.如果有存活GM進(jìn)程萍嬉,隨機(jī)選擇一個(gè)GM進(jìn)程
    %% 3.將當(dāng)前新增節(jié)點(diǎn)GM進(jìn)程加入到選擇的GM進(jìn)程右側(cè)
    %% 4.將所有存活的鏡像隊(duì)列組裝成鏡像隊(duì)列循環(huán)隊(duì)列視圖A->D->B->C->A
    View = join_group(Self, GroupName, TxnFun),
    MembersState =
        %% 獲取鏡像隊(duì)列視圖的所有key列表
        case alive_view_members(View) of
            %% 如果是第一個(gè)GM進(jìn)程的啟動(dòng)則初始化成員狀態(tài)數(shù)據(jù)結(jié)構(gòu)
            [Self] -> blank_member_state();
            %% 如果不是第一個(gè)GM進(jìn)程加入到Group中,則成員狀態(tài)先不做初始化隙疚,讓自己左側(cè)的GM進(jìn)程發(fā)送過(guò)來(lái)的信息進(jìn)行初始化
            _      -> undefined
        end,
    %% 檢查當(dāng)前鏡像隊(duì)列的鄰居信息(根據(jù)消息鏡像隊(duì)列的群組循環(huán)視圖更新自己最新的左右兩邊的鏡像隊(duì)列)
    State1 = check_neighbours(State #state { view = View, members_state = MembersState }),
    %% 通知啟動(dòng)該GM進(jìn)程的進(jìn)程已經(jīng)成功加入鏡像隊(duì)列群組(rabbit_mirror_queue_coordinator或rabbit_mirror_queue_slave模塊回調(diào))
    handle_callback_result(
      {Module:joined(Args, get_pids(all_known_members(View))), State1});

(2)GM 進(jìn)程 A 處理新增 GM 進(jìn)程到自己右側(cè)

%% 處理將新的鏡像隊(duì)列加入到本鏡像隊(duì)列的右側(cè)的消息
handle_call({add_on_right, NewMember}, _From,
            State = #state { self          = Self,
                             group_name    = GroupName,
                             members_state = MembersState,
                             txn_executor  = TxnFun }) ->
    %% 記錄將新的鏡像隊(duì)列成員加入到鏡像隊(duì)列組中壤追,將新加入的鏡像隊(duì)列寫(xiě)入gm_group結(jié)構(gòu)中的members字段中(有新成員加入群組的時(shí)候,則將版本號(hào)增加一)
    Group = record_new_member_in_group(NewMember, Self, 
                                       GroupName, TxnFun),
    %% 根據(jù)組成員信息生成新的鏡像隊(duì)列視圖數(shù)據(jù)結(jié)構(gòu)
    View1 = group_to_view(Group),
    %% 刪除擦除的成員
    MembersState1 = remove_erased_members(MembersState, 
                                          View1),
    %% 向新加入的成員即右邊成員發(fā)送加入成功的消息
    ok = send_right(NewMember, View1,
                    {catchup, Self,          
                     prepare_members_state(MembersState1)}),
    %% 根據(jù)新的鏡像隊(duì)列循環(huán)隊(duì)列視圖和老的視圖修改視圖供屉,同時(shí)根據(jù)鏡像隊(duì)列循環(huán)視圖更新自己左右鄰居信息
    {Result, State1} = change_view(View1, State #state {
                                                        members_state = MembersState1 }),
    %% 向請(qǐng)求加入的鏡像隊(duì)列發(fā)送最新的當(dāng)前鏡像隊(duì)列的群組信息
    handle_callback_result({Result, {ok, Group}, State1}).

(3) GM進(jìn)程 D 處理 GM 進(jìn)程 A 發(fā)送過(guò)來(lái)成員狀態(tài)信息

%% 左側(cè)的GM進(jìn)程通知右側(cè)的GM進(jìn)程最新的成員狀態(tài)(此情況是本GM進(jìn)程是新加入Group的行冰,等待左側(cè)GM進(jìn)程發(fā)送過(guò)來(lái)的消息進(jìn)行初始化成員狀態(tài))
handle_msg({catchup, Left, MembersStateLeft},
           State = #state { self          = Self,
                            left          = {Left, _MRefL},
                            right         = {Right, _MRefR},
                            view          = View,
                            %% 新加入的GM進(jìn)程在加入后是沒(méi)有初始化成員狀態(tài),是等待左側(cè)玩家發(fā)送消息來(lái)進(jìn)行初始化
                            members_state = undefined }) ->
    %% 異步向自己右側(cè)的鏡像隊(duì)列發(fā)送最新的所有成員信息伶丐,讓Group中的所有成員更新成員信息
    ok = send_right(Right, View, {catchup, Self, MembersStateLeft}),
    %% 將成員信息轉(zhuǎn)化成字典數(shù)據(jù)結(jié)構(gòu)
    MembersStateLeft1 = build_members_state(MembersStateLeft),
    %% 新增加的GM進(jìn)程更新最新的成員信息
    {ok, State #state { members_state = MembersStateLeft1 }};

2.節(jié)點(diǎn)失效

? 當(dāng) Slave 節(jié)點(diǎn)失效時(shí)资柔,僅僅是相鄰節(jié)點(diǎn)感知,然后重新調(diào)整鄰居節(jié)點(diǎn)信息撵割,更新 rabbit_queue, gm_group的記錄。

? 當(dāng) Master 節(jié)點(diǎn)失效時(shí)流程如下:

(1)由于所有 mirror_queue_slave進(jìn)程會(huì)對(duì) amqqueue_process 進(jìn)程監(jiān)控辙芍,如果 Master 節(jié)點(diǎn)失效啡彬,mirror_queue_slave感知后通過(guò) GM 進(jìn)行廣播;

(2)存活最久的 Slave 節(jié)點(diǎn)會(huì)提升自己為 master 節(jié)點(diǎn)故硅;

(3)該節(jié)點(diǎn)會(huì)創(chuàng)建出新的 coordinator庶灿,并通知 GM 進(jìn)程修改回調(diào)處理器為 coordinator;

(4)原來(lái)的 mirror_queue_slave 作為 amqqueue_process 處理生產(chǎn)發(fā)布的消息吃衅,向消費(fèi)者投遞消息往踢。

核心流程詳解:

(1)GM 進(jìn)程掛掉處理

%% 接收到自己左右兩邊的鏡像隊(duì)列GM進(jìn)程掛掉的消息
handle_info({'DOWN', MRef, process, _Pid, Reason},
            State = #state { self          = Self,
                             left          = Left,
                             right         = Right,
                             group_name    = GroupName,
                             confirms      = Confirms,
                             txn_executor  = TxnFun }) ->
    %% 得到掛掉的GM進(jìn)程
    Member = case {Left, Right} of
                 %% 左側(cè)的鏡像隊(duì)列GM進(jìn)程掛掉的情況
                 {{Member1, MRef}, _} -> Member1;
                 %% 右側(cè)的鏡像隊(duì)列GM進(jìn)程掛掉的情況
                 {_, {Member1, MRef}} -> Member1;
                 _                    -> undefined
             end,
    case {Member, Reason} of
        {undefined, _} ->
            noreply(State);
        {_, {shutdown, ring_shutdown}} ->
            noreply(State);
        _ -> timer:sleep(100),
            %% 先記錄有鏡像隊(duì)列成員死亡的信息,然后將所有存活的鏡像隊(duì)列組裝鏡像隊(duì)列群組循環(huán)隊(duì)列視圖
            %% 有成員死亡的時(shí)候會(huì)將版本號(hào)增加一徘层,record_dead_member_in_group函數(shù)是更新gm_group數(shù)據(jù)庫(kù)表中的數(shù)據(jù)峻呕,將死亡信息寫(xiě)入數(shù)據(jù)庫(kù)表
            View1 = group_to_view(record_dead_member_in_group(
                                    Member, GroupName, 
                                    TxnFun)),
            handle_callback_result(
              case alive_view_members(View1) of
                  %% 當(dāng)存活的鏡像隊(duì)列GM進(jìn)程只剩自己的情況
                  [Self] -> maybe_erase_aliases(
                              State #state {
                                            members_state = blank_member_state(),
                                            confirms      = purge_confirms(Confirms) },
                                           View1);
                  %% 當(dāng)存活的鏡像隊(duì)列GM進(jìn)程不止自己(根據(jù)新的鏡像隊(duì)列循環(huán)隊(duì)列視圖和老的視圖修改視圖利职,同時(shí)根據(jù)鏡像隊(duì)列循環(huán)視圖更新自己左右鄰居信息)
                  %% 同時(shí)將當(dāng)前自己節(jié)點(diǎn)的消息信息發(fā)布到自己右側(cè)的GM進(jìn)程
                  _      -> change_view(View1, State)
              end)
    end.

(2)主鏡像隊(duì)列回調(diào) rabbit_mirror_queue_coordinator處理 GM 進(jìn)程掛掉

%% 處理循環(huán)鏡像隊(duì)列中有死亡的鏡像隊(duì)列(主鏡像隊(duì)列接收到死亡的鏡像隊(duì)列不可能是主鏡像隊(duì)列死亡的消息,它監(jiān)視的左右兩側(cè)的從鏡像隊(duì)列進(jìn)程)
handle_cast({gm_deaths, DeadGMPids},
            State = #state { q  = #amqqueue { name = QueueName, pid = MPid } })
  when node(MPid) =:= node() ->
    %% 返回新的主鏡像隊(duì)列進(jìn)程瘦癌,死亡的鏡像隊(duì)列進(jìn)程列表猪贪,需要新增加鏡像隊(duì)列的節(jié)點(diǎn)列表
    case rabbit_mirror_queue_misc:remove_from_queue(
           QueueName, MPid, DeadGMPids) of
        {ok, MPid, DeadPids, ExtraNodes} ->
            %% 打印鏡像隊(duì)列死亡的日志
            rabbit_mirror_queue_misc:report_deaths(MPid, true, QueueName,
                                                   DeadPids),
            %% 異步在ExtraNodes的所有節(jié)點(diǎn)上增加QName隊(duì)列的從鏡像隊(duì)列
            rabbit_mirror_queue_misc:add_mirrors(QueueName, ExtraNodes, async),
            noreply(State);
        {error, not_found} ->
            {stop, normal, State}
    end;

(3)從鏡像隊(duì)列回調(diào) rabbit_mirror_queue_coordinator處理 GM 進(jìn)程掛掉

%% 從鏡像隊(duì)列處理有鏡像隊(duì)列成員死亡的消息(從鏡像隊(duì)列接收到主鏡像隊(duì)列死亡的消息)
handle_call({gm_deaths, DeadGMPids}, From,
            State = #state { gm = GM, q = Q = #amqqueue {
                                                         name = QName, pid = MPid }}) ->
    Self = self(),
    %% 返回新的主鏡像隊(duì)列進(jìn)程,死亡的鏡像隊(duì)列進(jìn)程列表讯私,需要新增加鏡像隊(duì)列的節(jié)點(diǎn)列表
    case rabbit_mirror_queue_misc:remove_from_queue(QName, Self, DeadGMPids) of
        {error, not_found} -> gen_server2:reply(From, ok),
        {stop, normal, State};
        {ok, Pid, DeadPids, ExtraNodes} ->
            %% 打印鏡像隊(duì)列死亡的日志(Self是副鏡像隊(duì)列)
            rabbit_mirror_queue_misc:report_deaths(Self, false, QName, DeadPids),
            case Pid of
                %% 此情況是主鏡像隊(duì)列沒(méi)有變化
                MPid ->
                    gen_server2:reply(From, ok),
                    %% 異步在ExtraNodes的所有節(jié)點(diǎn)上增加QName隊(duì)列的副鏡像隊(duì)列
                    rabbit_mirror_queue_misc:add_mirrors(
                      QName, ExtraNodes, async),
                    noreply(State);
                %% 此情況是本從鏡像隊(duì)列成為主鏡像隊(duì)列
                Self ->
                    %% 將自己這個(gè)從鏡像隊(duì)列提升為主鏡像隊(duì)列
                    QueueState = promote_me(From, State),
                    %% 異步在ExtraNodes的所有節(jié)點(diǎn)上增加QName隊(duì)列的副鏡像隊(duì)列
                    rabbit_mirror_queue_misc:add_mirrors(
                      QName, ExtraNodes, async),
                    %% 返回消息热押,告知自己這個(gè)從鏡像隊(duì)列成為主鏡像隊(duì)列
                    {become, rabbit_amqqueue_process, QueueState, hibernate};
                _ ->
                    %% 主鏡像隊(duì)列已經(jīng)發(fā)生變化
                    gen_server2:reply(From, ok),
                    [] = ExtraNodes,
                    %% 確認(rèn)在主節(jié)點(diǎn)宕機(jī)時(shí)否有為完成傳輸?shù)臄?shù)據(jù),確認(rèn)所有從節(jié)點(diǎn)都接收到主節(jié)點(diǎn)宕機(jī)的消息斤寇,然后傳輸未傳輸?shù)南ⅰ?                    ok = gm:broadcast(GM, process_death),
                    noreply(State #state { q = Q #amqqueue { pid = Pid } })
            end
    end;

(4)主鏡像隊(duì)列掛掉否選取新的主鏡像隊(duì)列

%% 返回新的主鏡像隊(duì)列進(jìn)程桶癣,死亡的鏡像隊(duì)列進(jìn)程列表,需要新增加鏡像隊(duì)列的節(jié)點(diǎn)列表
remove_from_queue(QueueName, Self, DeadGMPids) ->
    rabbit_misc:execute_mnesia_transaction(
      fun () ->
               %% 代碼運(yùn)行到這一步有可能隊(duì)列已經(jīng)被刪除
               case mnesia:read({rabbit_queue, QueueName}) of
                   [] -> {error, not_found};
                   [Q = #amqqueue { pid        = QPid,
                                    slave_pids = SPids,
                                    gm_pids    = GMPids }] ->
                       %% 獲得死亡的GM列表和存活的GM列表
                       {DeadGM, AliveGM} = lists:partition(
                                             fun ({GM, _}) ->
                                                      lists:member(GM, DeadGMPids)
                                             end, GMPids),
                       %% 獲得死亡的實(shí)際進(jìn)程的Pid列表
                       DeadPids  = [Pid || {_GM, Pid} <- DeadGM],
                       %% 獲得存活的實(shí)際進(jìn)程的Pid列表
                       AlivePids = [Pid || {_GM, Pid} <- AliveGM],
                       %% 獲得slave_pids字段中存活的隊(duì)列進(jìn)程Pid列表
                       Alive     = [Pid || Pid <- [QPid | SPids],
                                           lists:member(Pid, AlivePids)],
                       %% 從存活的鏡像隊(duì)列提取出第一個(gè)鏡像隊(duì)列進(jìn)程Pid娘锁,它是最老的鏡像隊(duì)列牙寞,它將作為新的主鏡像隊(duì)列進(jìn)程
                       {QPid1, SPids1} = promote_slave(Alive),
                       Extra =
                           case {{QPid, SPids}, {QPid1, SPids1}} of
                               {Same, Same} ->
                                   [];
                               %% 此處的情況是主鏡像隊(duì)列沒(méi)有變化,或者調(diào)用此接口的從鏡像隊(duì)列成為新的主鏡像隊(duì)列
                               _ when QPid =:= QPid1 orelse QPid1 =:= Self ->
                                   %% 主鏡像隊(duì)列已經(jīng)變化致盟,當(dāng)前從隊(duì)列變更為主隊(duì)列碎税,信息更新到數(shù)據(jù)庫(kù)(mnesia)
                                   Q1 = Q#amqqueue{pid        = QPid1,
                                                   slave_pids = SPids1,
                                                   gm_pids    = AliveGM},
                                   store_updated_slaves(Q1),
                                   
                                   %% 根據(jù)隊(duì)列的策略如果啟動(dòng)的從鏡像隊(duì)列需要自動(dòng)同步,則進(jìn)行同步操作
                                   maybe_auto_sync(Q1),
                                   %% 根據(jù)當(dāng)前集群節(jié)點(diǎn)和從鏡像隊(duì)列進(jìn)程所在的節(jié)點(diǎn)得到新增加的節(jié)點(diǎn)列表
                               slaves_to_start_on_failure(Q1, DeadGMPids);
                               %% 此處的情況是主鏡像隊(duì)列已經(jīng)發(fā)生變化馏锡,且調(diào)用此接口的從鏡像隊(duì)列沒(méi)有成為新的主鏡像隊(duì)列
                               _ ->
                                   %% 更新最新的存活的從鏡像隊(duì)列進(jìn)程Pid列表和存活的GM進(jìn)程列表
                                   Q1 = Q#amqqueue{slave_pids = Alive,
                                                   gm_pids    = AliveGM},
                                   %% 存儲(chǔ)更新隊(duì)列的從鏡像隊(duì)列信息
                                   store_updated_slaves(Q1),
                                   []
                           end,
                       {ok, QPid1, DeadPids, Extra}
               end
      end).

四雷蹂、鏡像隊(duì)列消息同步

1.消息廣播

消息廣播流程如下:

(1)Master 節(jié)點(diǎn)發(fā)出消息,順著鏡像隊(duì)列循環(huán)列表發(fā)送杯道;

(2)所有 Slave 節(jié)點(diǎn)收到消息會(huì)對(duì)消息進(jìn)行緩存(Slave 節(jié)點(diǎn)緩存消息用于在廣播過(guò)程中匪煌,有節(jié)點(diǎn)失效或者新增節(jié)點(diǎn),這樣左側(cè)節(jié)點(diǎn)感知變化后會(huì)重新將消息推送給右側(cè)節(jié)點(diǎn))党巾;

(3)當(dāng) Master 節(jié)點(diǎn)收到自己發(fā)送的消息后意味著所有節(jié)點(diǎn)都收到了消息萎庭,會(huì)再次廣播 Ack 消息;

(4)Ack 消息同樣會(huì)順著循環(huán)列表經(jīng)過(guò)所有 Slave 節(jié)點(diǎn)齿拂,通知 Slave 節(jié)點(diǎn)可以清除緩存消息驳规;

(5)當(dāng) Ack 消息回到 Master 節(jié)點(diǎn),對(duì)應(yīng)消息的廣播結(jié)束署海。

核心流程詳解:

(1)GM 組群中消息廣播

%% 節(jié)點(diǎn)掛掉的情況或者新增節(jié)點(diǎn)發(fā)送給自己右側(cè)GM進(jìn)程的信息
%% 左側(cè)的GM進(jìn)程通知右側(cè)的GM進(jìn)程最新的成員狀態(tài)(此情況是有新GM進(jìn)程加入Group吗购,但是自己不是最新加入的GM進(jìn)程,但是自己仍然需要更新成員信息)
handle_msg({catchup, Left, MembersStateLeft},
           State = #state { self = Self,
                            left = {Left, _MRefL},
                            view = View,
                            members_state = MembersState })
  when MembersState =/= undefined ->
    %% 將最新的成員信息轉(zhuǎn)化成字典數(shù)據(jù)結(jié)構(gòu)
    MembersStateLeft1 = build_members_state(MembersStateLeft),
    %% 獲取左側(cè)鏡像隊(duì)列傳入的成員信息和自己進(jìn)程存儲(chǔ)的成員信息的ID的去重
    AllMembers = lists:usort(?DICT:fetch_keys(MembersState) ++
                        ?DICT:fetch_keys(MembersStateLeft1)),
    %% 根據(jù)左側(cè)GM進(jìn)程發(fā)送過(guò)來(lái)的成員狀態(tài)和自己GM進(jìn)程里的成員狀態(tài)得到需要廣播給后續(xù)GM進(jìn)程的信息
    {MembersState1, Activity} =
        lists:foldl(
          fun (Id, MembersStateActivity) ->
                   %% 獲取左側(cè)鏡像隊(duì)列傳入Id對(duì)應(yīng)的鏡像隊(duì)列成員信息
                   #member { pending_ack = PALeft, last_ack = LA } =
                               find_member_or_blank(Id, MembersStateLeft1),
                   with_member_acc(
                     %% 函數(shù)的第一個(gè)參數(shù)是Id對(duì)應(yīng)的自己進(jìn)程存儲(chǔ)的鏡像隊(duì)列成員信息
                     fun (#member { pending_ack = PA } = Member, Activity1) ->
                              %% 發(fā)送者和自己是一個(gè)人則表示消息已經(jīng)發(fā)送回來(lái)砸狞,或者判斷發(fā)送者是否在死亡列表中
                              case is_member_alias(Id, Self, View) of
                                  %% 此情況是發(fā)送者和自己是同一個(gè)人或者發(fā)送者已經(jīng)死亡
                                  true ->
                                      %% 根據(jù)左側(cè)GM進(jìn)程發(fā)送過(guò)來(lái)的ID最新的成員信息和本GM進(jìn)程ID對(duì)應(yīng)的成員信息得到已經(jīng)發(fā)布的信息
                                      {_AcksInFlight, Pubs, _PA1} = find_prefix_common_suffix(PALeft, PA),
                                      %% 重新將自己的消息發(fā)布
                                      {Member #member { last_ack = LA },
                                    %% 組裝發(fā)送的內(nèi)容和ack消息結(jié)構(gòu)
                                      activity_cons(Id, pubs_from_queue(Pubs), [], Activity1)};
                                  false ->
                                      %% 根據(jù)左側(cè)GM進(jìn)程發(fā)送過(guò)來(lái)的ID最新的成員信息和本GM進(jìn)程ID對(duì)應(yīng)的成員信息得到Ack和Pub列表
                                      %% 上一個(gè)節(jié)點(diǎn)少的消息就是已經(jīng)得到確認(rèn)的消息捻勉,多出來(lái)的是新發(fā)布的消息
                                      {Acks, _Common, Pubs} =
                      find_prefix_common_suffix(PA, PALeft),
                                      {Member,
                                       %% 組裝發(fā)送的發(fā)布和ack消息結(jié)構(gòu)
                                       activity_cons(Id, pubs_from_queue(Pubs), acks_from_queue(Acks), Activity1)}
                              end
                     end, Id, MembersStateActivity)
          end, {MembersState, activity_nil()}, AllMembers),
    handle_msg({activity, Left, activity_finalise(Activity)},
               State #state { members_state = MembersState1 });

(2) GM 進(jìn)程內(nèi)部廣播

%% GM進(jìn)程內(nèi)部廣播的接口(先調(diào)用本GM進(jìn)程的回調(diào)進(jìn)程進(jìn)行處理消息,然后將廣播數(shù)據(jù)放入廣播緩存中)
internal_broadcast(Msg, SizeHint,
                   State = #state { self                = Self,
                                    pub_count           = PubCount,
                                    module              = Module,
                                    callback_args       = Args,
                                    broadcast_buffer    = Buffer,
                                    broadcast_buffer_sz = BufferSize }) ->
    %% 將發(fā)布次數(shù)加一
    PubCount1 = PubCount + 1,
    {%% 先將消息調(diào)用回調(diào)模塊進(jìn)行處理
     Module:handle_msg(Args, get_pid(Self), Msg),
     %% 然后將廣播消息放入廣播緩存
     State #state { pub_count           = PubCount1,
                    broadcast_buffer    = [{PubCount1, Msg} | Buffer],
                    broadcast_buffer_sz = BufferSize + SizeHint}}.

(3)緩存消息發(fā)送定時(shí)器

%% 確保廣播定時(shí)器的關(guān)閉和開(kāi)啟刀森,當(dāng)廣播緩存中有數(shù)據(jù)則啟動(dòng)定時(shí)器踱启,當(dāng)廣播緩存中沒(méi)有數(shù)據(jù)則停止定時(shí)器
%% 廣播緩存中沒(méi)有數(shù)據(jù),同時(shí)廣播定時(shí)器不存在的情況
ensure_broadcast_timer(State = #state { broadcast_buffer = [],
                                        broadcast_timer  = undefined }) ->
    State;
%% 廣播緩存中沒(méi)有數(shù)據(jù),同時(shí)廣播定時(shí)器存在埠偿,則直接將定時(shí)器刪除掉
ensure_broadcast_timer(State = #state { broadcast_buffer = [],
                                        broadcast_timer  = TRef }) ->
    erlang:cancel_timer(TRef),
    State #state { broadcast_timer = undefined };
%% 廣播緩存中有數(shù)據(jù)且沒(méi)有定時(shí)器的情況
ensure_broadcast_timer(State = #state { broadcast_timer = undefined }) ->
    TRef = erlang:send_after(?BROADCAST_TIMER, self(), flush),
    State #state { broadcast_timer = TRef };
ensure_broadcast_timer(State) ->
    State.

注:當(dāng)處理消息時(shí)透罢,緩存中的內(nèi)容大小超過(guò)100M 則不會(huì)等定時(shí)器觸發(fā),會(huì)立刻將消息發(fā)給自己右側(cè)的 GM 進(jìn)程胚想。

2.消息同步

? 配置鏡像隊(duì)列時(shí)有一個(gè)屬性ha-sync-mode琐凭,支持兩種模式 automatic 或 manually 默認(rèn)為 manually。

? 當(dāng) ha-sync-mode = manually浊服,新節(jié)點(diǎn)加入到鏡像隊(duì)列組后统屈,可以從左節(jié)點(diǎn)獲取當(dāng)前正在廣播的消息,但是在加入之前已經(jīng)廣播的消息無(wú)法獲取牙躺,所以會(huì)處于鏡像隊(duì)列之間數(shù)據(jù)不一致的情況愁憔,直到加入之前的消息都被消費(fèi)后,主從鏡像隊(duì)列數(shù)據(jù)保持一致孽拷。當(dāng)加入之前的消息未全部消費(fèi)完之前吨掌,主節(jié)點(diǎn)宕機(jī),新節(jié)點(diǎn)選為主節(jié)點(diǎn)時(shí)脓恕,這部分消息將丟失膜宋。

? 當(dāng) ha-sync-mode = automatic,新加入組群的 Slave 節(jié)點(diǎn)會(huì)自動(dòng)進(jìn)行消息同步炼幔,使主從鏡像隊(duì)列數(shù)據(jù)保持一致秋茫。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市乃秀,隨后出現(xiàn)的幾起案子肛著,更是在濱河造成了極大的恐慌,老刑警劉巖跺讯,帶你破解...
    沈念sama閱讀 221,820評(píng)論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件枢贿,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡刀脏,警方通過(guò)查閱死者的電腦和手機(jī)局荚,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,648評(píng)論 3 399
  • 文/潘曉璐 我一進(jìn)店門(mén),熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)愈污,“玉大人危队,你說(shuō)我怎么就攤上這事「婆希” “怎么了?”我有些...
    開(kāi)封第一講書(shū)人閱讀 168,324評(píng)論 0 360
  • 文/不壞的土叔 我叫張陵金麸,是天一觀的道長(zhǎng)擎析。 經(jīng)常有香客問(wèn)我,道長(zhǎng),這世上最難降的妖魔是什么揍魂? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 59,714評(píng)論 1 297
  • 正文 為了忘掉前任桨醋,我火速辦了婚禮,結(jié)果婚禮上现斋,老公的妹妹穿的比我還像新娘喜最。我一直安慰自己,他們只是感情好庄蹋,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,724評(píng)論 6 397
  • 文/花漫 我一把揭開(kāi)白布瞬内。 她就那樣靜靜地躺著,像睡著了一般限书。 火紅的嫁衣襯著肌膚如雪虫蝶。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書(shū)人閱讀 52,328評(píng)論 1 310
  • 那天倦西,我揣著相機(jī)與錄音能真,去河邊找鬼。 笑死扰柠,一個(gè)胖子當(dāng)著我的面吹牛粉铐,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播卤档,決...
    沈念sama閱讀 40,897評(píng)論 3 421
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼蝙泼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了裆装?” 一聲冷哼從身側(cè)響起踱承,我...
    開(kāi)封第一講書(shū)人閱讀 39,804評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎哨免,沒(méi)想到半個(gè)月后茎活,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,345評(píng)論 1 318
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡琢唾,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,431評(píng)論 3 340
  • 正文 我和宋清朗相戀三年载荔,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片采桃。...
    茶點(diǎn)故事閱讀 40,561評(píng)論 1 352
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡懒熙,死狀恐怖询刹,靈堂內(nèi)的尸體忽然破棺而出蒸矛,到底是詐尸還是另有隱情刊橘,我是刑警寧澤儒老,帶...
    沈念sama閱讀 36,238評(píng)論 5 350
  • 正文 年R本政府宣布比原,位于F島的核電站莫其,受9級(jí)特大地震影響墓阀,放射性物質(zhì)發(fā)生泄漏塌碌。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,928評(píng)論 3 334
  • 文/蒙蒙 一橱健、第九天 我趴在偏房一處隱蔽的房頂上張望而钞。 院中可真熱鬧,春花似錦拘荡、人聲如沸臼节。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 32,417評(píng)論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)网缝。三九已至,卻和暖如春亮隙,著一層夾襖步出監(jiān)牢的瞬間途凫,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,528評(píng)論 1 272
  • 我被黑心中介騙來(lái)泰國(guó)打工溢吻, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留维费,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,983評(píng)論 3 376
  • 正文 我出身青樓促王,卻偏偏與公主長(zhǎng)得像犀盟,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子蝇狼,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,573評(píng)論 2 359

推薦閱讀更多精彩內(nèi)容

  • Spring Cloud為開(kāi)發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見(jiàn)模式的工具(例如配置管理阅畴,服務(wù)發(fā)現(xiàn),斷路器迅耘,智...
    卡卡羅2017閱讀 134,702評(píng)論 18 139
  • 1.什么是消息隊(duì)列 消息隊(duì)列允許應(yīng)用間通過(guò)消息的發(fā)送與接收的方式進(jìn)行通信贱枣,當(dāng)消息接收方服務(wù)忙或不可用時(shí),其提供了一...
    zhuke閱讀 4,476評(píng)論 0 12
  • RabbitMQ集群 集群架構(gòu) RabbitMQ始終記錄以下4種類(lèi)型的數(shù)據(jù) 隊(duì)列元數(shù)據(jù):隊(duì)列名稱(chēng)颤专、屬性(是否持久化...
    JAVA覓音閣閱讀 1,439評(píng)論 1 0
  • 關(guān)于消息隊(duì)列纽哥,從前年開(kāi)始斷斷續(xù)續(xù)看了些資料,想寫(xiě)很久了栖秕,但一直沒(méi)騰出空春塌,近來(lái)分別碰到幾個(gè)朋友聊這塊的技術(shù)選型,是時(shí)...
    預(yù)流閱讀 584,928評(píng)論 51 786
  • 整體架構(gòu) 部署步驟 基于 Docker 基本概念內(nèi)存節(jié)點(diǎn)只保存狀態(tài)到內(nèi)存簇捍,例外情況是:持久的 queue 的內(nèi)容將...
    mvictor閱讀 12,758評(píng)論 5 30