Node.js 基于 RabbitMQ 的分布式爬蟲框架

本文是基于 Node.js 和 RabbitMQ 設(shè)計(jì)的分布式爬蟲框架,適用于每次爬取數(shù)據(jù)量不大,但是高并發(fā)的爬蟲昵时。其中該項(xiàng)目中還用到了 PM2(啟動 server 及熱更新,還能監(jiān)控程序運(yùn)行數(shù)據(jù))和 Express

現(xiàn)在的需求是,用戶向我們的 Api Server 發(fā)送請求,然后我們的 Api Server 不會真正的去爬取數(shù)據(jù),而是將這個(gè)任務(wù)放到隊(duì)列之中,然后然后真正的 Crawler Server 去爬取數(shù)據(jù)呜袁,最后將數(shù)據(jù)返回給 Api Server,響應(yīng)用戶的請求简珠。所以架構(gòu)圖是這個(gè)樣子的:

Structure

接下來就是擼碼了阶界,同時(shí)獻(xiàn)上項(xiàng)目傳送門虹钮。

1. 連接 RabbitMQ 服務(wù)

首先你需要配置一臺 RabbitMQ Server ,需要安裝 Erlang 環(huán)境和 RabbitMQ膘融,因?yàn)镽abbitMQ 是用 Erlang 寫的芙粱。具體安裝過程可以 查看 RabbitMQ 官網(wǎng)。我是在 Windows 上配置的氧映,很簡單春畔,安裝完成后,RabbitMQ 會開機(jī)自啟動岛都。

Node.js 中我們使用 RabbitMQ律姨,借助 amqplib 庫,這里面關(guān)于 RabbitMQ 的所有使用方式臼疫,都有示例代碼择份,而且由于 RabbitMQ 只用作中間的任務(wù)分發(fā),所以烫堤,代碼幾乎不用改動荣赶,當(dāng)然為了保證 RabbitMQ 的通用性,這部分代碼我也不建議大家更改鸽斟。

下面是連接 RabbitMQ 服務(wù)的代碼讯壶,需要注意以下幾點(diǎn):

  1. 代碼中我們統(tǒng)一使用 amqplib 里的 callback_api,amqplib 下也有 promise 的 api湾盗,但是官方示例中,即便使用了 promise api立轧,還是有好幾層的嵌套格粪,不是很好理解,所以不如使用 callback_api氛改,因?yàn)椴簧婕暗秸嬲龢I(yè)務(wù)邏輯帐萎,都是固定的代碼,也無所謂胜卤。

  2. RabbitMQ 的連接地址寫全的話疆导,格式如下:amqp://user:password@host:port,比如: amqp://guest:guest@192.168.10.63:5672 葛躏,其中默認(rèn)用戶名好密碼都是 guest澈段,默認(rèn)端口是 5672。如果你本機(jī)調(diào)試的話舰攒,那么直接寫成 amqp://localhost 就 OK 了败富。

  3. 如果你要配置集群可連接的話,那么摩窃,你需要更改配置文件兽叮。并重啟 RabbitMQ 服務(wù)。 Windows 下配置文件的路徑為:C:\Users\user\AppData\Roaming\RabbitMQ\rabbitmq.config,默認(rèn)請況下鹦聪,里面的內(nèi)容是空的账阻,你需要什么配置,把同級目錄下的 rabbitmq.config.example 中的內(nèi)容拷過去就好了泽本,記得打開注釋淘太,筆者當(dāng)時(shí)搞的時(shí)候,因?yàn)樗緳C(jī)不夠老观挎,沒反應(yīng)過來 %% 是注釋琴儿,搞了好幾次配置文件,重啟了后幾次服務(wù)才恍然大悟嘁捷,囧死了造成。對了,重點(diǎn)的雄嚣,你只需要把本機(jī) ip 和 port 添加到 tcp_listeners 中就好了晒屎,然后重啟 RabbitMQ 服務(wù),這樣缓升,之后你的集群就可以連接了鼓鲁。

     // rabbitmq.config
     {tcp_listeners, [{"127.0.0.1", 5672},
                      {"192.168.10.63", 5672}, %% 你添加的一行
                       {"::1",       5672}]}
    
    
     // rabbit.js
     let amqp = require('amqplib/callback_api');
    
     let mqConn;
     
     /**
      * 創(chuàng)建 RabbitMQ 連接
      */
     function createMqConnection() {
         amqp.connect('amqp://localhost', function (err, conn) {
             if (err) {
                 console.log('error --> ', err);
             } else {
                 mqConn = conn;
                 console.log('RabbitMQ 連接已建立');
             }
         });
     }
     
     
     /**
      * 獲取 RabbitMQ 連接
      *
      * @returns {*}
      */
     function getMqConnection() {
         return mqConn;
     }
     
     
     module.exports = {
         createMqConnection: createMqConnection,
         getMqConnection: getMqConnection,
     };
    

2. 將任務(wù)添加到隊(duì)列

因?yàn)樵谟脩糇龀稣埱蠛螅罱K還要將數(shù)據(jù)返回給用戶港谊,所以骇吭,我們需要使用 RabbitMQ 的 RPC 模式(簡單的理解就是,能接收任務(wù)歧寺,還能將任務(wù)的執(zhí)行結(jié)果返回去)燥狰。下面直接擼碼了:

    /**
     * 添加一個(gè) Rpc 任務(wù)
     *
     * 建議不要在此處處理 Api 傳入的數(shù)據(jù),也不要處理 Worker 返回的數(shù)據(jù)斜筐。數(shù)據(jù)最好在架構(gòu)的兩端處理龙致,即交給 Api
     * 和 Feature 處理,這樣可以保證 RabbitMQ 的通用性顷链,數(shù)據(jù)只與 Api 和 Feature 兩層相關(guān)
     *
     * @param startingData
     * @param uuid 任務(wù)號目代,用于區(qū)分返回的數(shù)據(jù)是哪個(gè)任務(wù)
     * @param callback
     */
    function newRpcTask(startingData, uuid, callback) {
        rabbit.getMqConnection().createChannel(function (err, ch) {
            if (err) return handleError(err);
    
            ch.assertQueue('', {exclusive: true}, function (err, q) {
                if (err) return handleError(err);
    
                let corr = uuid;
                console.log('Starting data: %s', startingData.toString());
    
                ch.consume(q.queue, function (msg) {
                    if (msg.properties.correlationId === corr) {
                        console.log('Return data: %s', msg.content.toString());
                        // Feature 返回的數(shù)據(jù)不要處理,交回給 Api 處理
                        callback(msg.content.toString())
                    }
                }, {noAck: true});
    
                ch.sendToQueue(FIBONACCI_QUEUE, Buffer.from(startingData.toString()), {correlationId: corr, replyTo: q.queue});
            });
        });
    }


    /***
     * 異常處理
     *
     * @param err
     */
    function handleError(err) {
        console.log('Error ---> ', err);
    }

3. Worker 從隊(duì)列拿任務(wù)嗤练,并執(zhí)行返回

這里我封裝了一個(gè) BaseWorker 的基類榛了。其實(shí)現(xiàn)類只需要重寫 doFeature(),實(shí)現(xiàn)相關(guān)的業(yè)務(wù)邏輯潭苞。然后調(diào)用 startRpcConsumer() 忽冻,從隊(duì)列獲取任務(wù)后并再調(diào)用doFeature() ,完成相關(guān)操作此疹。具體使用方式可以參見項(xiàng)目中的 fibonacciWorker.js僧诚,這是一個(gè)斐波那契計(jì)算的邏輯

    // baseWorker.js
    let amqp = require('amqplib/callback_api');


    class BaseWorker {
        constructor() {
            this.queueName = '';
        }
    
    
        /**
         * 消費(fèi) Rpc 事件并返回結(jié)果
         *
         * 建議不要在此處處理 Api 傳入的數(shù)據(jù)遮婶,也不要處理 Worker 返回的數(shù)據(jù)。數(shù)據(jù)最好在架構(gòu)的兩端處理湖笨,即交給 Api
         * 和 Feature 處理旗扑,這樣可以保證 RabbitMQ 的通用性,數(shù)據(jù)只與 Api 和 Feature 兩層相關(guān)
         */
        startRpcConsumer() {
            amqp.connect('amqp://localhost', (err, conn) => {
                if (err) return BaseWorker.handleError(err);
    
                conn.createChannel((err, ch) => {
                    if (err) return BaseWorker.handleError(err);
    
                    ch.assertQueue(this.queueName, {durable: false});
                    ch.prefetch(1);
                    console.log(' [x] Awaiting RPC requests');
                    ch.consume(this.queueName, (msg) => {
                        if (msg !== null) {
                            this.doFeature(msg, ch);
                        } else {
                            console.log('msg is null.');
                        }
                    });
                });
            });
        }
    
    
        /**
         * 完成業(yè)務(wù)后刪除消息
         *
         * @param msg
         * @param ch
         * @param result
         */
        ackMsg(msg, ch, result) {
            ch.sendToQueue(msg.properties.replyTo,
                Buffer.from(result.toString()),
                {correlationId: msg.properties.correlationId});
    
            ch.ack(msg);
        }
    
    
        /**
         * 錯(cuò)誤處理
         *
         * @param err
         */
        static handleError(err) {
            console.log('Error --> ', err);
        }
    
    
        /**
         * 業(yè)務(wù)處理接口慈省,子類實(shí)現(xiàn)該接口即可
         *
         * @param msg
         * @param ch
         */
        doFeature(msg, ch) {
        }
    }

4. 服務(wù)器

我是用 Express 寫的服務(wù)器臀防,當(dāng)然你也可以根據(jù)自己的需求隨意挑選,比如 Restify 等边败。在啟動服務(wù)器時(shí)袱衷,要先連接 RabbitMQ。然后在 Api 中笑窜,不是直接操作業(yè)務(wù)邏輯致燥,而是將請求當(dāng)做一個(gè)任務(wù),發(fā)送到 RabbitMQ 的隊(duì)列排截,例如本例中通過調(diào)用 task.newRpcTask()嫌蚤,將用戶請求生成一個(gè)任務(wù)然后添加到隊(duì)列,在回調(diào)中拿到最終的結(jié)果断傲,返回給用戶脱吱。

    // server.js
    let express = require('express');
    let app = express();
    let rabbit = require('../scheduler/rabbit');
    let task = require('../scheduler/task');
    
    /**
     * 創(chuàng)建 RabbitMQ 連接
     */
    rabbit.createMqConnection();
    
    
    /**
     * Rpc 任務(wù) Api
     */
    app.get('/newRpcTask', function (req, res) {
        let queryData = req.query;
        console.log('queryData = ', queryData);
    
        // 在將數(shù)據(jù)傳遞至 RabbitMQ 層時(shí),要考慮好傳遞的格式认罩,RabbitMQ 層不要對數(shù)據(jù)進(jìn)行處理箱蝠,最終交給 Feature 時(shí)再處理
        task.newRpcTask(queryData.num, generateUuid(), function (result) {
            res.send('fibonacci = ' + result);
        });
    });
    
    app.listen(3000);
    
    
    function generateUuid() {
        return Math.random().toString() +
            Math.random().toString() +
            Math.random().toString();
    }

最后,上演示結(jié)果了

RabbitMQ 只需要在 RabbitMQ 服務(wù)器安裝垦垂,其它設(shè)備通過 amqp 協(xié)議連接即可抡锈。我在演示的時(shí)候,Api Server 和 Rabbit Server 安裝在我的 Windows 上乔外,而另外三臺 CentOS 作為 Worker Server 使用。 好的一罩,終于要發(fā)車了:

  1. 如果你的 Rabbit Server 沒有啟動需要先啟動杨幼,我在 Windows 上已經(jīng)開機(jī)自啟,不在贅述

  2. 啟動 Api Server: node server.js(如果使用 PM2啟動命令是 pm2 start server.js)

  3. 在所有 Worker Server 上啟動 worker: node fibonacciWorker.js(如果使用 PM2啟動命令是 pm2 start fibonacciWorker.js)

    startServer
  4. 我在本例中定義的是 get 請求接口聂渊。只需要一個(gè) num(指的是要計(jì)算斐波那契數(shù)列的第幾個(gè)數(shù)值) 作為 query 參數(shù)差购。通過瀏覽器發(fā)送請求:http://172.16.10.63:3000/newRpcTask?num=42, 獲取計(jì)算結(jié)果汉嗽。我們可以看到欲逃,RabbitMQ 其實(shí)達(dá)到了一個(gè)負(fù)載均衡的作用,它會根據(jù) Worker 的工作情況饼暑,將任務(wù)自動分發(fā)給集群中的 Worker稳析。

    taskTest
    taskTest

至此洗做,本文全部結(jié)束(項(xiàng)目傳送門)。感謝您的閱讀彰居,也歡迎您拍磚诚纸。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市陈惰,隨后出現(xiàn)的幾起案子畦徘,更是在濱河造成了極大的恐慌,老刑警劉巖抬闯,帶你破解...
    沈念sama閱讀 221,198評論 6 514
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件井辆,死亡現(xiàn)場離奇詭異,居然都是意外死亡溶握,警方通過查閱死者的電腦和手機(jī)杯缺,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,334評論 3 398
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來奈虾,“玉大人夺谁,你說我怎么就攤上這事∪馕ⅲ” “怎么了匾鸥?”我有些...
    開封第一講書人閱讀 167,643評論 0 360
  • 文/不壞的土叔 我叫張陵,是天一觀的道長碉纳。 經(jīng)常有香客問我勿负,道長,這世上最難降的妖魔是什么劳曹? 我笑而不...
    開封第一講書人閱讀 59,495評論 1 296
  • 正文 為了忘掉前任奴愉,我火速辦了婚禮,結(jié)果婚禮上铁孵,老公的妹妹穿的比我還像新娘锭硼。我一直安慰自己,他們只是感情好蜕劝,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,502評論 6 397
  • 文/花漫 我一把揭開白布檀头。 她就那樣靜靜地躺著,像睡著了一般岖沛。 火紅的嫁衣襯著肌膚如雪暑始。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 52,156評論 1 308
  • 那天婴削,我揣著相機(jī)與錄音廊镜,去河邊找鬼。 笑死唉俗,一個(gè)胖子當(dāng)著我的面吹牛嗤朴,可吹牛的內(nèi)容都是我干的配椭。 我是一名探鬼主播,決...
    沈念sama閱讀 40,743評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼播赁,長吁一口氣:“原來是場噩夢啊……” “哼颂郎!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起容为,我...
    開封第一講書人閱讀 39,659評論 0 276
  • 序言:老撾萬榮一對情侶失蹤乓序,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后坎背,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體替劈,經(jīng)...
    沈念sama閱讀 46,200評論 1 319
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,282評論 3 340
  • 正文 我和宋清朗相戀三年得滤,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了陨献。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,424評論 1 352
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡懂更,死狀恐怖眨业,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情沮协,我是刑警寧澤龄捡,帶...
    沈念sama閱讀 36,107評論 5 349
  • 正文 年R本政府宣布,位于F島的核電站慷暂,受9級特大地震影響聘殖,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜行瑞,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,789評論 3 333
  • 文/蒙蒙 一奸腺、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧血久,春花似錦突照、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,264評論 0 23
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至副砍,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間庄岖,已是汗流浹背豁翎。 一陣腳步聲響...
    開封第一講書人閱讀 33,390評論 1 271
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留隅忿,地道東北人心剥。 一個(gè)月前我還...
    沈念sama閱讀 48,798評論 3 376
  • 正文 我出身青樓邦尊,卻偏偏與公主長得像,于是被迫代替她去往敵國和親优烧。 傳聞我的和親對象是個(gè)殘疾皇子蝉揍,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,435評論 2 359

推薦閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn)畦娄,斷路器又沾,智...
    卡卡羅2017閱讀 134,693評論 18 139
  • 序言第1章 并行和分布式計(jì)算介紹第2章 異步編程第3章 Python的并行計(jì)算第4章 Celery分布式應(yīng)用第5章...
    SeanCheney閱讀 12,520評論 3 35
  • 來源 RabbitMQ是用Erlang實(shí)現(xiàn)的一個(gè)高并發(fā)高可靠AMQP消息隊(duì)列服務(wù)器。支持消息的持久化熙卡、事務(wù)杖刷、擁塞控...
    jiangmo閱讀 10,367評論 2 34
  • 1.RabbitMQ概述 簡介: MQ全稱為Message Queue,消息隊(duì)列是應(yīng)用程序和應(yīng)用程序之間的通信方法...
    梁朋舉閱讀 49,765評論 0 47
  • 落日蕭瑟 看夕陽 乘清風(fēng) 斷劍驳癌,殘簫 斯人滑燃,烈酒 事隔經(jīng)年 誰曾在意你的孤獨(dú) 只道是 一派瀟灑景象 英雄 誰又是你...
    黑小生閱讀 628評論 21 15