owt-server 的集群管理者麸折、集群工作站锡凝、消息隊(duì)列(一)

轉(zhuǎn)載請(qǐng)注明:

原始地址: http://www.reibang.com/p/9f2fb27062cb

原作者:wonder


1、寫(xiě)在前面

owt-server使用node.js開(kāi)發(fā)垢啼,涉及node.js c++混合開(kāi)發(fā)窜锯。

owt-server的目錄結(jié)構(gòu)如下:


root@ubuntu:/home/wonder/OWT/owt-server-master# ls

build  cert  doc  docker  LICENSE  node_modules  README.md  scripts  source  test  third_party

各種環(huán)境安裝腳本在 scripts/ 下,參考README.md進(jìn)行編譯芭析、安裝锚扎、運(yùn)行、測(cè)試即可


2馁启、owt-server簡(jiǎn)要介紹

owt-server是集群式的媒體服務(wù)驾孔。每種功能模塊可以是集群(cluster)的一個(gè)工作站(worker),多個(gè)worker由中心管理者(manager)管理惯疙,管理者有主(master)/備(slave)/候選者(candidate)之分翠勉。有些模塊可以復(fù)用同一個(gè)worker。

worker霉颠、manager之間通過(guò)消息隊(duì)列進(jìn)行 任務(wù)傳遞/rpc調(diào)用对碌。owt-server使用了node.js中的amqp庫(kù)模塊連接宿主機(jī)中運(yùn)行的rabbitmq,以此作為消息隊(duì)列的底層實(shí)現(xiàn)蒿偎。


3朽们、clusterManager集群管理者模塊概述

查看目錄 source/cluster_manager/


root@ubuntu:/home/wonder/OWT/owt-server-master# ls source/cluster_manager/

clusterManager.js  cluster_manager.toml  dist.json  index.js  log4js_configuration.json  matcher.js  package.json  scheduler.js  strategy.js

其中index.js,為該模塊的啟動(dòng)入口诉位。**index.js **代碼前段是引用必要的庫(kù)华坦,重點(diǎn)庫(kù)有:

1)amqp_client (庫(kù)實(shí)現(xiàn)位于source/common/amqp_client.js):主要用是owt-server的 rpc 封裝(利用amqp實(shí)現(xiàn)RPC角色的封裝定義,如rpcClient不从、rpcServer等),后文會(huì)介紹

2)clusterManager(位于source/cluster_manager/clusterManager.js):主要定義了集群管理中心之管理者(manager)犁跪、候選者(candidate)自薦競(jìng)選椿息、主(master)/備(slave)、同步坷衍、鼻抻牛活等方法。


3.1 clusterManager模塊詳細(xì)分解

clusterManager.js定義了主要的4內(nèi)部函數(shù)變量(var ClusterManager枫耳、var runAsSlave乏矾、var runAsMaster和var runAsCandidate)和一個(gè)導(dǎo)出函數(shù)變量(exports.run)

下面就是 clusterManager.js,源碼沒(méi)有注釋?zhuān)P者在走讀源碼后根據(jù)自己的理解添加了注釋。涉及到對(duì)專(zhuān)業(yè)術(shù)語(yǔ)有疑問(wèn)的钻心,如rpc凄硼、主\備\候選者(master/slave/candidate)、以及 js 語(yǔ)法捷沸,請(qǐng)自行百度摊沉。

    推薦兩篇介紹消息隊(duì)列的網(wǎng)址,個(gè)人覺(jué)得很不錯(cuò):

        【推薦看這篇痒给,全面且有拓展】:[http://www.reibang.com/p/a4d92d0d7e19](http://www.reibang.com/p/a4d92d0d7e19)

          rabbitmq 對(duì)于AMQP的介紹:[https://www.rabbitmq.com/tutorials/amqp-concepts.html#message-acknowledge]

var ClusterManager = function (clusterName, selfId, spec) { //集群管理者定義**

     //省略一些定義

    ......

    //   var  ClusterManager  定義了集群管理中心(manager)的內(nèi)部函數(shù)變量和返回值:**

    //    以下為內(nèi)部函數(shù)變量说墨,函數(shù)體均省略,請(qǐng)查看源碼**

    var createScheduler = function (purpose);    /*創(chuàng)建某一類(lèi)任務(wù)的調(diào)度器(Scheduler,記錄\管理\執(zhí)行)*/

    var checkAlive = function ();    /*檢查該manager 所管理的工作站 (worker)的存活情況*/

    var workerJoin = function (purpose, worker, info);    /*執(zhí)行某一類(lèi)任務(wù)的worker加入該manager */

    var workerQuit = function (worker);    /*一個(gè) worker 從該manager 退出該*/

    var keepAlive = function (worker, on_result);    /*一個(gè) worker 向該 manager  申請(qǐng)辈园兀活*/

    var reportState = function (worker, state);    /* 該 manager  報(bào)告某 worker 的狀態(tài)*/

    var reportLoad = function (worker, load);    /*該 manager 報(bào)告某 woker 的負(fù)載*/

    var pickUpTasks = function (worker, tasks);    /*令某 worker 執(zhí)行某些任務(wù)*/

    var layDownTask = function (worker, task);    /*令某 worker 放棄執(zhí)行某任務(wù)*/

    var schedule = function (purpose, task, preference, reserveTime, on_ok, on_error);    /*對(duì)某類(lèi)型(purpose) 的任務(wù) (task) 按照指定配置 (preference尼斧,reserveTime) 分配worker*/

    var unschedule = function (worker, task);    /*撤銷(xiāo)某 worker 下分配的任務(wù)*/

    var getWorkerAttr = function (worker, on_ok, on_error);    /*獲取某 worker 的屬性*/

    var getWorkers = function (purpose, on_ok);    /*獲取某類(lèi) workers*/

    var getTasks = function (worker, on_ok);    /*獲取某 worker 的任務(wù)*/

    var getScheduled = function (purpose, task, on_ok, on_error);    /*獲取某類(lèi)型任務(wù)的 worker*/

    //    以下為返回值:

    var that = {name: clusterName,  id: selfId};    /*ClusterManager(...)的返回值that*/

    that.getRuntimeData = function (on_data);    /*收集該 manager 管理的每類(lèi)Scheduler、每個(gè) worker 试吁、每個(gè) task*/

    that.registerDataUpdate = function (on_updated_data);    /*向該 manager 注冊(cè)消息同步實(shí)例*/

    that.setRuntimeData = function (data);    /*向該 manager 配置 data 中記錄的 Scheduler棺棵、worker和 task*/

    that.setUpdatedData = function (data);    /*向該 manager 更新信息,data.type∈{"worker_join "," worker_quit"," worker_state"," worker_load","worker_pickup "," worker_laydown"," scheduled"," unscheduled" }潘悼,data具體數(shù)據(jù)結(jié)構(gòu)律秃,請(qǐng)查看源碼*/

    that.serve = function (monitoringTgt);    /*啟用 manager 服務(wù),并注冊(cè)管理目標(biāo)*/

     that.rpcAPI = {    /*rpc接口函數(shù)治唤,以下函數(shù)體均省略棒动,請(qǐng)查看源碼。它們與內(nèi)部函數(shù)變量是對(duì)應(yīng)的*/

                join: function (purpose, worker, info, callback) { ...},

                quit: function (worker) { ...},

                keepAlive: function (worker, callback) { ...},

                reportState: function (worker, state) { ...},

                reportLoad: function (worker, load) { ...},

                pickUpTasks: function (worker, tasks) { ...},

               layDownTask: function (worker, task) { ...},

               schedule: function (purpose, task, preference, reserveTime, callback) { ...},

               unschedule: function (worker, task) { ...},

               getWorkerAttr: function (worker, callback) { ...},

                getWorkers: function (purpose, callback) {  ...},

               getTasks: function (worker, callback) {  ...},

                getScheduled: function (purpose, task, callback) { ...}

    }

}

var runAsSlave= function(topicChannel, manager) {    //集群管理者作為 “備份”(salve) 的身份運(yùn)行**

    //省略一些定義 

    ......

    **//以下為內(nèi)部函數(shù)變量宾添,函數(shù)體均省略船惨,請(qǐng)查看源碼**

    var requestRuntimeData = function ();     /*向首要集群管理者 (master)  請(qǐng)求運(yùn)行期間的數(shù)據(jù),數(shù)據(jù)內(nèi)容參閱上文var ClusterManager 返回值中的 getRuntimeData  函數(shù)*/

   var onTopicMessage = function(message);     /*接收到主題消息時(shí)的處理函數(shù)缕陕,message.type∈{ “runtimeData”粱锐,“updateData”,“declareMaster” }扛邑,分別對(duì)應(yīng)著 “收到運(yùn)行期間的數(shù)據(jù)”怜浅,“收到數(shù)據(jù)更新”,“收到 master 的角色申明 ” */

    var superviseMaster = function ();    /*監(jiān)督 master 的定時(shí)任務(wù)(30ms檢查一次)蔬崩,若當(dāng)前master失聯(lián)(matster心跳超時(shí)大于2次)恶座,則該 salve 將進(jìn)入候選者身份(candidate) 的狀態(tài)*/

    **//以下為調(diào)用 runAsSlave  將執(zhí)行的函數(shù)體**

    topicChannel.subscribe(     //在指定的主題信道下(基于消息隊(duì)列) 訂閱兩種主題消息

        ['clusterManager.slave.#', 'clusterManager.*.' + manager.id] ,    //兩種主題的關(guān)鍵id

        onTopicMessage 沥阳,    //主題消息處理函數(shù)

        function () {    //訂閱成功后跨琳,執(zhí)行的函數(shù)體

            requestRuntimeData();     //向 master 請(qǐng)求運(yùn)行期間的數(shù)據(jù)

            superviseMaster();    //監(jiān)督 master

        }

    )

**}**

**var runAsMaster = function(topicChannel, manager) {**

    //省略一些定義     

    ...... 

     topicChannel.bus.asRpcServer(    //啟用遠(yuǎn)程調(diào)用服務(wù)

            manager.name,    //master 名稱(chēng)

            manager.rpcAPI,    //master的rpc接口

            function(rpcSvr) {     //rpc服務(wù)啟用成功后執(zhí)行的函數(shù)體

               topicChannel.bus.asMonitoringTarget(function(monitoringTgt) {     //啟用worker監(jiān)管服務(wù),主要用于在worker遠(yuǎn)程調(diào)用登出master時(shí)的消息回傳桐罕,可類(lèi)比為消息確認(rèn)ACK

                        manager.serve(monitoringTgt);     //啟用 master 集群管理服務(wù) 

                        setInterval( //設(shè)置定時(shí)器

                                function () {    // 向消息隊(duì)列的三種主題 發(fā)送 “declareMaster ”消息  

                                        topicChannel.publish(    //主題 'clusterManager.slave' 消息

                                            'clusterManager.slave', 

                                            {type: 'declareMaster', data: {id: manager.id, life_time: life_time}}         

                                        ); 

                                        topicChannel.publish(      //主題  'clusterManager.candidate' 消息

                                            'clusterManager.candidate', 

                                            {type: 'declareMaster', data: {id: manager.id, life_time: life_time}}

                                        );                

                                         topicChannel.publish(     //主題   'clusterManager.master' 消息 

                                            'clusterManager.master', 

                                            {type: 'declareMaster', data: {id: manager.id, life_time: life_time}}

                                        );   

                                 }脉让,

                                 20     //時(shí)間間隔20ms 

                         );

                        var onTopicMessage = function (message);  //消息處理函數(shù)

                        topicChannel.subscribe(     //訂閱主題消息

                                ['clusterManager.master.#', 'clusterManager.*.' + manager.id],

                                onTopicMessage,    //消息處理函數(shù)

                                  function () {     //訂閱成功執(zhí)行的函數(shù)體

                                            manager.registerDataUpdate(    //注冊(cè)通知slave的具體方法

                                                    topicChannel.publish(    //通過(guò)消息隊(duì)列發(fā)送主題為 'clusterManager.slave' 的消息

                                                             'clusterManager.slave',

                                                              {type: 'updateData', data: data}  

                                                    )

                                            );

                                    }

                            );

                        },

                        function(reason) { process.exit();}桂敛; //asMonitoringTarget 失敗

                   },

                    function(reason) {process.exit(); };    //as RPC server 失敗

            }

    }

} 

var runAsCandidate = function(topicChannel, manager) {

     //省略一些定義    

     ......  

    var electMaster = function () { ...}     //該候選者決定自身身份:是 master 還是 slave

    var selfRecommend = function () { ...}    //該候選者自薦溅潜,每30ms向消息隊(duì)列發(fā)送'clusterManager.candidate'  主題消息“selfRecommend”

      var onTopicMessage = function (message) { ...}    //消息處理函數(shù)术唬。初始化后定時(shí)160ms決定自身身份;收到“selfRecommend”伟恶,若消息中id大于自身id碴开,放棄晉升master;收到“declareMaster”博秫,停止自薦潦牛,清除定時(shí),成為slave身份

    topicChannel.subscribe(     //訂閱 “clusterManager.candidate.#”主題消息

        ['clusterManager.candidate.#'],

        onTopicMessage,    //消息處理函數(shù)

        function () {     

            selfRecommend();      //訂閱成功挡育,該參與者開(kāi)始自薦

        }

    );

} 

exports.run= function (topicChannel, clusterName, id, spec) {

    //該js庫(kù)的導(dǎo)出函數(shù)

    var manager = new ClusterManager(clusterName, id, spec);    //生成一個(gè)集群管理者(manager)實(shí)例

    runAsCandidate(topicChannel, manager);    //該manager立即作為候選者(candiate)運(yùn)行

}

思考:主/備方式的好處巴碗,在于:一定程度減少了中心式集群管理的風(fēng)險(xiǎn),即中心管理者宕機(jī)造成集群失效的風(fēng)險(xiǎn)即寒。其缺點(diǎn)也是存在的橡淆,即調(diào)度集中于主管理者,主管理者僅于備管理者進(jìn)行同步母赵,在調(diào)度請(qǐng)求非常頻繁時(shí)逸爵,主管理者性能會(huì)成為瓶頸,這也是中心式網(wǎng)絡(luò)應(yīng)用的通病凹嘲。

閱讀clusterManager.js文件的總結(jié):

a) 通過(guò)源碼走讀师倔,可以明確管理者(manager)、主(master)/備(slave)/候選者(candidate)的分工以及競(jìng)選方式周蹭。

b) 這部分僅是對(duì)集群管理者(cluster_manager)的定義趋艘,對(duì)于集群工作站(worker)的定義還沒(méi)有概念。目前僅知道凶朗,master暴露了一下rpc接口供調(diào)用瓷胧。

c) 這部分對(duì)rpc的調(diào)用是比較高層次的,確實(shí)在owt-server的代碼中棚愤,amqp_client.js文件對(duì)node.js中的amqp進(jìn)行了封裝搓萧,以amqp為基礎(chǔ)實(shí)現(xiàn)了底層的消息收發(fā)、通知機(jī)制宛畦。

d) 這部分提到了Scheduler矛绘,它是作為某種類(lèi)型的任務(wù)的管理器,供clusterManager.js使用的刃永。它內(nèi)部實(shí)現(xiàn)了task的記錄、worker的記錄羊精、超時(shí)管理斯够、task與worker的關(guān)聯(lián)囚玫、task和worker的調(diào)度分配細(xì)節(jié)。對(duì)于記錄读规、關(guān)聯(lián)抓督、超時(shí)管理等功能,下文不做詳細(xì)描述束亏,因?yàn)橄嚓P(guān)的接口基本與clusterManager.js 文件中 var ClusterManager 提供的接口一致铃在。

因此,下文將僅對(duì)scheduler.js中的任務(wù)調(diào)度部分做詳細(xì)分解碍遍。

       ( 消息隊(duì)列重要文件amqp_client.js將在下一篇《owt-server 的集群管理定铜、集群工作站、消息隊(duì)列(二)》進(jìn)行分解怕敬; 集群工作站(worker)將在《owt-server 的集群管理揣炕、集群工作站、消息隊(duì)列(三)》結(jié)合具體應(yīng)用類(lèi)型進(jìn)行分解)

3.2 scheduler模塊---任務(wù)調(diào)度部分詳細(xì)分解

話不多說(shuō)东跪,上干貨畸陡。

首先,放兩個(gè)scheduler 模塊---任務(wù)調(diào)度部分需要使用的模塊虽填。

1) strategy.js

調(diào)度策略模塊丁恭,描述了不同的調(diào)度準(zhǔn)則:最近使用、最常使用斋日、最少使用牲览、roundRobin(輪詢(xún))、隨機(jī)選取

這里貼兩個(gè)(最常使用桑驱、roundRobin)進(jìn)行說(shuō)明


var mostUsed = function () {    

    this.allocate = function (workers, candidates, on_ok, on_error) {    //獲取該策略選中的某個(gè) work 在 workers 中的標(biāo)號(hào)竭恬,candidates中存放標(biāo)號(hào)

        var most = 0, found = undefined;

        for (var i in candidates) {    //在提前篩選出的候選candidates中搜索,(提前篩選好處是縮小策略算法運(yùn)算的空間范圍)

            var id = candidates[i];

            if (workers[id].load >= most) {    //檢查id所對(duì)應(yīng)work的負(fù)載熬的,選取最大負(fù)載的work的標(biāo)號(hào)

                most = workers[id].load;

                found = id;

            }

        }

        on_ok(found);    //回調(diào)選中的標(biāo)號(hào)

    };};


var roundRobin = function () {

    var latest_used = 65536 * 65536;

    this.allocate = function (workers, candidates, on_ok, on_error) {    

        var i = candidates.indexOf(latest_used);    //初始返回-1

        if (i === -1) {

            latest_used = candidates[0];    //初始選第一個(gè)候選

        } else {

            latest_used = (i === candidates.length - 1) ? candidates[0] : candidates[i + 1];  //選擇下一個(gè)candidates[xxx]中存放的標(biāo)號(hào)

        }

        on_ok(latest_used);     //回調(diào)選中的標(biāo)號(hào)

    };};

2) matcher.js

條件匹配模塊痊硕,描述了不同類(lèi)型work的匹配準(zhǔn)則。owt-server提供了多種類(lèi)型的服務(wù):portal押框、webrtc岔绸、video、audio橡伞、analytics盒揉、conference、recording兑徘、streaming刚盈。其中有些服務(wù)需要有獨(dú)特的任務(wù)task和工作站worker的匹配準(zhǔn)則。

舉兩個(gè)栗子(上干貨):


var webrtcMatcher = function () {

    this.match = function (preference, workers, candidates) {    //參數(shù)1是配置喜好

        var result = [],

            found_sweet = false;    //找到甜心?!?!?!?!!!挂脑,源碼作者有點(diǎn)意思的(奸笑~)

        for (var i in candidates) {

            var id = candidates[i];

            var capacity = workers[id].info.capacity;    //每個(gè)worker在向master登記時(shí)藕漱,都會(huì)把自身能力帶上

            if (is_isp_applicable(capacity.isps, preference.isp)) {    //這個(gè)isp是什么作用還不清楚欲侮,直譯是“運(yùn)營(yíng)商”?懂的朋友可以交流一下

                if (is_region_suited(capacity.regions, preference.region)) {    //這個(gè)region也不太明確肋联,根據(jù)字面直覺(jué)上和域控相關(guān)

                    if (!found_sweet) {

                        found_sweet = true;

                        result = [id];

                    } else {

                        result.push(id);

                    }

                } else {    //不在region里威蕉,并且沒(méi)有找到甜心,強(qiáng)行指定甜心嗎橄仍?有點(diǎn)迷

                    if (!found_sweet) {

                        result.push(id);

                    }

                }

            }

        }

        return result;

    };};


var videoMatcher = function () {

    this.match = function (preference, workers, candidates) {

        if (!preference || !preference.video)    

            return candidates;

        var formatContain = function (listA, listB) {    //函數(shù)韧涨,統(tǒng)計(jì)B在A中的數(shù)量

            var count = 0;

            listB.forEach((fmtB) => {

                if (listA.indexOf(fmtB) > -1)

                    count++;

            });

            return (count === listB.length);

        };

        var result = candidates.filter(function(cid) {    //篩選結(jié)果

            var capacity = workers[cid].info.capacity;

            var encodeOk = false;

            var decodeOk = false;

            if (capacity.video) {    

                encodeOk = formatContain(capacity.video.encode, preference.video.encode);    //判斷偏好的視頻編碼器是否在worker的能力中

                decodeOk = formatContain(capacity.video.decode, preference.video.decode);    //判斷偏好的視頻解碼器是否在worker的能力中

            }

            if (!encodeOk) {    //編碼不匹配

                log.warn('No available workers for encoding:', JSON.stringify(preference.video.encode));

            }

            if (!decodeOk) {    //解碼不匹配

                log.warn('No available workers for decoding:', JSON.stringify(preference.video.decode));

            }

            return (encodeOk && decodeOk);   //編解碼都匹配才行嘛!

        });

        return result;

    };};

終于侮繁,

3)scheduler.js

代碼不多虑粥,就是淦~


exports.Scheduler = function(spec) {

    /*State <- [0 | 1 | 2]*/        //官方注釋最為致命,這種魔數(shù)是看代碼最大的障礙之一鼎天,尤其是表示狀態(tài)的魔數(shù)

    /*{WorkerId: {state: State, load: Number, info: info, tasks:[Task]}*/     // Scheduler  中worker表中的屬性舀奶,以及tasks表屬性

    var workers = {};

    /*{Task: {reserve_timer: TimerId,

              reserve_time: Number,

              worker: WorkerId}      }*/

    var tasks = {};

    var matcher = Matcher.create(spec.purpose),     //根據(jù)指定的應(yīng)用類(lèi)型名稱(chēng)創(chuàng)建對(duì)應(yīng)matcher 

        strategy = Strategy.create(spec.strategy),     //根據(jù)指定的策略創(chuàng)建對(duì)應(yīng)matcher 

        schedule_reserve_time = spec.scheduleReserveTime; 

that.schedule = function (task, preference, reserveTime, on_ok, on_error) {    //參數(shù)1是需要調(diào)度的任務(wù)編號(hào),參數(shù)2是該任務(wù)的偏好配置

        if (tasks[task]) {    //該任務(wù)編號(hào)在處理記錄中

            var newReserveTime = reserveTime && tasks[task].reserve_time < reserveTime ? reserveTime : tasks[task].reserve_time,  //更新保留時(shí)長(zhǎng)

                worker = tasks[task].worker;    //正在處理該 task 的 worker

            if (workers[worker]) {    // worker還在記錄中

                if (isTaskInExecution(task, worker)) {    //該任務(wù)正在執(zhí)行

                    tasks[task].reserve_time = newReserveTime;    //更新任務(wù)保留時(shí)長(zhǎng)

                } else {    //任務(wù)沒(méi)在執(zhí)行

                    reserveWorkerForTask(task, worker, newReserveTime);    //向worker申請(qǐng)?jiān)搕ask執(zhí)行時(shí)長(zhǎng)

                }

                return on_ok(worker, workers[worker].info);    //回調(diào)指定的 worker 和 它的信息斋射,并返回

            } else {    //如果 worker 沒(méi)了

                repealTask(task);      //清理該任務(wù)記錄育勺,準(zhǔn)備重新分派

            }

        }

        var candidates = [];

        for (var worker in workers) {

            if (isWorkerAvailable(workers[worker])) {    //衡量worker負(fù)載和狀態(tài)

                candidates.push(worker);    //加入候選

            }

        }

        if (candidates.length < 1) {    

            return on_error('No worker available, all in full load.');

        }

        candidates = matcher.match(preference, workers, candidates);    //matcher它來(lái)了,基于任務(wù)偏好罗岖、matcher類(lèi)型篩選候選者

        if (candidates.length < 1) {

            return on_error('No worker matches the preference.');

        } else {

            strategy.allocate(workers, candidates, function (worker) {    //strategy它來(lái)了涧至,基于strategy 策略選擇合適的 worker

                reserveWorkerForTask(task, worker, (reserveTime && reserveTime > 0 ? reserveTime : schedule_reserve_time));    //為該task分配記錄

                on_ok(worker, workers[worker].info);    //回調(diào)

            }, on_error);

        }

    };

}

總結(jié):scheduler 使用了策略方式,結(jié)合strategy 和matcher 模塊桑包,將不同配置對(duì)應(yīng)的策略與策略的執(zhí)行進(jìn)行解耦南蓬,方便擴(kuò)展,是良好設(shè)計(jì)模式的體現(xiàn)哑了。


3.3 index.js --- cluster_manager 模塊入口

在節(jié)3中赘方,提到了該模塊下的index.js是入口程序,下面簡(jiǎn)單介紹一下弱左,以形成模塊到進(jìn)程(程序)的概念窄陡。

首先是引用的庫(kù)


var amqper = require('./amqp_client'));    //amqp封裝模塊

var logger = require('./logger').logger;    //日志

var log = logger.getLogger('Main');    

var ClusterManager = require('./clusterManager');    //cluster_manager模塊

var toml = require('toml');    //配置文件模塊

var fs = require('fs');    //文件系統(tǒng)模塊

其次,配置文件讀取拆火、配置設(shè)置


var config;

try {  

config = toml.parse(fs.readFileSync('./cluster_manager.toml'));      //可以讀一下配置文件跳夭,更加清晰

} catch (e) {

  log.error('Parsing config error on line ' + e.line + ', column ' + e.column + ': ' + e.message);

  process.exit(1);

}

config.manager = config.manager || {};    //manager配置

config.manager.name = config.manager.name || 'owt-cluster';    //manager名字

config.manager.initial_time = config.manager.initial_time || 10 * 1000;    //啟動(dòng)時(shí)間

config.manager.check_alive_interval = config.manager.check_alive_interval || 1000;    //manager 檢查 worker 失聯(lián)的時(shí)間間隔

config.manager.check_alive_count = config.manager.check_alive_count || 10;    //manager 剔除失聯(lián) worker 前的最大檢查次數(shù)

config.manager.schedule_reserve_time = config.manager.schedule_reserve_time || 60 * 1000;    //調(diào)度默認(rèn)保留時(shí)間(僅當(dāng)調(diào)度請(qǐng)求沒(méi)有該字段時(shí))

config.strategy = config.strategy || {};    //調(diào)度策略, 以下為各種應(yīng)用類(lèi)型的默認(rèn)調(diào)度策略

config.strategy.general = config.strategy.general || 'round-robin';

config.strategy.portal = config.strategy.portal || 'last-used';

config.strategy.conference = config.strategy.conference || 'last-used';

config.strategy.webrtc = config.strategy.webrtc || 'last-used';

config.strategy.sip = config.strategy.sip || 'round-robin';

config.strategy.streaming = config.strategy.streaming || 'round-robin';

config.strategy.recording = config.strategy.recording || 'randomly-pick';

config.strategy.audio = config.strategy.audio || 'most-used';

config.strategy.video = config.strategy.video || 'least-used';

config.strategy.analytics = config.strategy.analytics || 'least-used';

config.rabbit = config.rabbit || {};    //rabbitmq配置

config.rabbit.host = config.rabbit.host || 'localhost';    //rabbitmq地址

config.rabbit.port = config.rabbit.port || 5672;    //rabbitmq端口

最后们镜,


function startup () {

    var enableService = function () {

        var id = Math.floor(Math.random() * 1000000000);    //生成隨機(jī)id

        var spec = {initialTime: config.manager.initial_time,

                    checkAlivePeriod: config.manager.check_alive_interval,

                    checkAliveCount: config.manager.check_alive_count,

                    scheduleKeepTime: config.manager.schedule_reserve_time,

                    strategy: config.strategy

                   };

        amqper.asTopicParticipant(config.manager.name + '.management', function(channel) {  //利用amqp封裝庫(kù)加入主題币叹,得到句柄channel

            log.info('Cluster manager up! id:', id);    

            ClusterManager.run(channel, config.manager.name, id, spec);    //使用配置、隨機(jī)模狭、句柄小压,啟動(dòng)ClusterManager

        }, function(reason) {

            log.error('Cluster manager initializing failed, reason:', reason);

            process.exit();

        });

    };

    amqper.connect(config.rabbit, function () {    //amqp封裝庫(kù)連接rabbitmq消息隊(duì)列

        enableService();    //啟動(dòng)上述服務(wù)

    }, function(reason) {

        log.error('Cluster manager connect to rabbitMQ server failed, reason:', reason);

        process.exit();

    });

}

startup();    //啟動(dòng)

...    //省略其他系統(tǒng)信號(hào)設(shè)置


至此降瞳,集群管理者模塊墨缘,即cluster_manager模塊以及基本分解完畢。下一步就是需要理清集群工作站九妈,以及二者如何協(xié)調(diào)工作。此外就是二者所依賴(lài)的底層消息隊(duì)列究竟做了些什么雾鬼,還有怎么實(shí)現(xiàn)的。

image
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末宴树,一起剝皮案震驚了整個(gè)濱河市策菜,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌酒贬,老刑警劉巖又憨,帶你破解...
    沈念sama閱讀 211,639評(píng)論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異锭吨,居然都是意外死亡蠢莺,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,277評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門(mén)零如,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)躏将,“玉大人,你說(shuō)我怎么就攤上這事考蕾』霰铮” “怎么了?”我有些...
    開(kāi)封第一講書(shū)人閱讀 157,221評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵肖卧,是天一觀的道長(zhǎng)蚯窥。 經(jīng)常有香客問(wèn)我,道長(zhǎng)塞帐,這世上最難降的妖魔是什么拦赠? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 56,474評(píng)論 1 283
  • 正文 為了忘掉前任,我火速辦了婚禮葵姥,結(jié)果婚禮上荷鼠,老公的妹妹穿的比我還像新娘。我一直安慰自己牌里,他們只是感情好颊咬,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,570評(píng)論 6 386
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著牡辽,像睡著了一般喳篇。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上态辛,一...
    開(kāi)封第一講書(shū)人閱讀 49,816評(píng)論 1 290
  • 那天麸澜,我揣著相機(jī)與錄音,去河邊找鬼奏黑。 笑死炊邦,一個(gè)胖子當(dāng)著我的面吹牛编矾,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播馁害,決...
    沈念sama閱讀 38,957評(píng)論 3 408
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼窄俏,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了碘菜?” 一聲冷哼從身側(cè)響起凹蜈,我...
    開(kāi)封第一講書(shū)人閱讀 37,718評(píng)論 0 266
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎忍啸,沒(méi)想到半個(gè)月后仰坦,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,176評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡计雌,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,511評(píng)論 2 327
  • 正文 我和宋清朗相戀三年悄晃,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片凿滤。...
    茶點(diǎn)故事閱讀 38,646評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡妈橄,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出鸭巴,到底是詐尸還是另有隱情眷细,我是刑警寧澤,帶...
    沈念sama閱讀 34,322評(píng)論 4 330
  • 正文 年R本政府宣布鹃祖,位于F島的核電站溪椎,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏恬口。R本人自食惡果不足惜校读,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,934評(píng)論 3 313
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望祖能。 院中可真熱鬧歉秫,春花似錦、人聲如沸养铸。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,755評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)钞螟。三九已至兔甘,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間鳞滨,已是汗流浹背洞焙。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 31,987評(píng)論 1 266
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人澡匪。 一個(gè)月前我還...
    沈念sama閱讀 46,358評(píng)論 2 360
  • 正文 我出身青樓熔任,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親唁情。 傳聞我的和親對(duì)象是個(gè)殘疾皇子疑苔,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,514評(píng)論 2 348