本文是基于 Node.js 和 RabbitMQ 設(shè)計(jì)的分布式爬蟲框架,適用于每次爬取數(shù)據(jù)量不大,但是高并發(fā)的爬蟲昵时。其中該項(xiàng)目中還用到了 PM2(啟動 server 及熱更新,還能監(jiān)控程序運(yùn)行數(shù)據(jù))和 Express
現(xiàn)在的需求是,用戶向我們的 Api Server 發(fā)送請求,然后我們的 Api Server 不會真正的去爬取數(shù)據(jù),而是將這個(gè)任務(wù)放到隊(duì)列之中,然后然后真正的 Crawler Server 去爬取數(shù)據(jù)呜袁,最后將數(shù)據(jù)返回給 Api Server,響應(yīng)用戶的請求简珠。所以架構(gòu)圖是這個(gè)樣子的:
接下來就是擼碼了阶界,同時(shí)獻(xiàn)上項(xiàng)目傳送門虹钮。
1. 連接 RabbitMQ 服務(wù)
首先你需要配置一臺 RabbitMQ Server ,需要安裝 Erlang 環(huán)境和 RabbitMQ膘融,因?yàn)镽abbitMQ 是用 Erlang 寫的芙粱。具體安裝過程可以 查看 RabbitMQ 官網(wǎng)。我是在 Windows 上配置的氧映,很簡單春畔,安裝完成后,RabbitMQ 會開機(jī)自啟動岛都。
Node.js 中我們使用 RabbitMQ律姨,借助 amqplib 庫,這里面關(guān)于 RabbitMQ 的所有使用方式臼疫,都有示例代碼择份,而且由于 RabbitMQ 只用作中間的任務(wù)分發(fā),所以烫堤,代碼幾乎不用改動荣赶,當(dāng)然為了保證 RabbitMQ 的通用性,這部分代碼我也不建議大家更改鸽斟。
下面是連接 RabbitMQ 服務(wù)的代碼讯壶,需要注意以下幾點(diǎn):
代碼中我們統(tǒng)一使用 amqplib 里的 callback_api,amqplib 下也有 promise 的 api湾盗,但是官方示例中,即便使用了 promise api立轧,還是有好幾層的嵌套格粪,不是很好理解,所以不如使用 callback_api氛改,因?yàn)椴簧婕暗秸嬲龢I(yè)務(wù)邏輯帐萎,都是固定的代碼,也無所謂胜卤。
RabbitMQ 的連接地址寫全的話疆导,格式如下:
amqp://user:password@host:port
,比如:amqp://guest:guest@192.168.10.63:5672
葛躏,其中默認(rèn)用戶名好密碼都是 guest澈段,默認(rèn)端口是 5672。如果你本機(jī)調(diào)試的話舰攒,那么直接寫成amqp://localhost
就 OK 了败富。-
如果你要配置集群可連接的話,那么摩窃,你需要更改配置文件兽叮。并重啟 RabbitMQ 服務(wù)。 Windows 下配置文件的路徑為:
C:\Users\user\AppData\Roaming\RabbitMQ\rabbitmq.config
,默認(rèn)請況下鹦聪,里面的內(nèi)容是空的账阻,你需要什么配置,把同級目錄下的rabbitmq.config.example
中的內(nèi)容拷過去就好了泽本,記得打開注釋淘太,筆者當(dāng)時(shí)搞的時(shí)候,因?yàn)樗緳C(jī)不夠老观挎,沒反應(yīng)過來%%
是注釋琴儿,搞了好幾次配置文件,重啟了后幾次服務(wù)才恍然大悟嘁捷,囧死了造成。對了,重點(diǎn)的雄嚣,你只需要把本機(jī) ip 和 port 添加到 tcp_listeners 中就好了晒屎,然后重啟 RabbitMQ 服務(wù),這樣缓升,之后你的集群就可以連接了鼓鲁。// rabbitmq.config {tcp_listeners, [{"127.0.0.1", 5672}, {"192.168.10.63", 5672}, %% 你添加的一行 {"::1", 5672}]} // rabbit.js let amqp = require('amqplib/callback_api'); let mqConn; /** * 創(chuàng)建 RabbitMQ 連接 */ function createMqConnection() { amqp.connect('amqp://localhost', function (err, conn) { if (err) { console.log('error --> ', err); } else { mqConn = conn; console.log('RabbitMQ 連接已建立'); } }); } /** * 獲取 RabbitMQ 連接 * * @returns {*} */ function getMqConnection() { return mqConn; } module.exports = { createMqConnection: createMqConnection, getMqConnection: getMqConnection, };
2. 將任務(wù)添加到隊(duì)列
因?yàn)樵谟脩糇龀稣埱蠛螅罱K還要將數(shù)據(jù)返回給用戶港谊,所以骇吭,我們需要使用 RabbitMQ 的 RPC 模式(簡單的理解就是,能接收任務(wù)歧寺,還能將任務(wù)的執(zhí)行結(jié)果返回去)燥狰。下面直接擼碼了:
/**
* 添加一個(gè) Rpc 任務(wù)
*
* 建議不要在此處處理 Api 傳入的數(shù)據(jù),也不要處理 Worker 返回的數(shù)據(jù)斜筐。數(shù)據(jù)最好在架構(gòu)的兩端處理龙致,即交給 Api
* 和 Feature 處理,這樣可以保證 RabbitMQ 的通用性顷链,數(shù)據(jù)只與 Api 和 Feature 兩層相關(guān)
*
* @param startingData
* @param uuid 任務(wù)號目代,用于區(qū)分返回的數(shù)據(jù)是哪個(gè)任務(wù)
* @param callback
*/
function newRpcTask(startingData, uuid, callback) {
rabbit.getMqConnection().createChannel(function (err, ch) {
if (err) return handleError(err);
ch.assertQueue('', {exclusive: true}, function (err, q) {
if (err) return handleError(err);
let corr = uuid;
console.log('Starting data: %s', startingData.toString());
ch.consume(q.queue, function (msg) {
if (msg.properties.correlationId === corr) {
console.log('Return data: %s', msg.content.toString());
// Feature 返回的數(shù)據(jù)不要處理,交回給 Api 處理
callback(msg.content.toString())
}
}, {noAck: true});
ch.sendToQueue(FIBONACCI_QUEUE, Buffer.from(startingData.toString()), {correlationId: corr, replyTo: q.queue});
});
});
}
/***
* 異常處理
*
* @param err
*/
function handleError(err) {
console.log('Error ---> ', err);
}
3. Worker 從隊(duì)列拿任務(wù)嗤练,并執(zhí)行返回
這里我封裝了一個(gè) BaseWorker 的基類榛了。其實(shí)現(xiàn)類只需要重寫 doFeature()
,實(shí)現(xiàn)相關(guān)的業(yè)務(wù)邏輯潭苞。然后調(diào)用 startRpcConsumer()
忽冻,從隊(duì)列獲取任務(wù)后并再調(diào)用doFeature()
,完成相關(guān)操作此疹。具體使用方式可以參見項(xiàng)目中的 fibonacciWorker.js僧诚,這是一個(gè)斐波那契計(jì)算的邏輯
// baseWorker.js
let amqp = require('amqplib/callback_api');
class BaseWorker {
constructor() {
this.queueName = '';
}
/**
* 消費(fèi) Rpc 事件并返回結(jié)果
*
* 建議不要在此處處理 Api 傳入的數(shù)據(jù)遮婶,也不要處理 Worker 返回的數(shù)據(jù)。數(shù)據(jù)最好在架構(gòu)的兩端處理湖笨,即交給 Api
* 和 Feature 處理旗扑,這樣可以保證 RabbitMQ 的通用性,數(shù)據(jù)只與 Api 和 Feature 兩層相關(guān)
*/
startRpcConsumer() {
amqp.connect('amqp://localhost', (err, conn) => {
if (err) return BaseWorker.handleError(err);
conn.createChannel((err, ch) => {
if (err) return BaseWorker.handleError(err);
ch.assertQueue(this.queueName, {durable: false});
ch.prefetch(1);
console.log(' [x] Awaiting RPC requests');
ch.consume(this.queueName, (msg) => {
if (msg !== null) {
this.doFeature(msg, ch);
} else {
console.log('msg is null.');
}
});
});
});
}
/**
* 完成業(yè)務(wù)后刪除消息
*
* @param msg
* @param ch
* @param result
*/
ackMsg(msg, ch, result) {
ch.sendToQueue(msg.properties.replyTo,
Buffer.from(result.toString()),
{correlationId: msg.properties.correlationId});
ch.ack(msg);
}
/**
* 錯(cuò)誤處理
*
* @param err
*/
static handleError(err) {
console.log('Error --> ', err);
}
/**
* 業(yè)務(wù)處理接口慈省,子類實(shí)現(xiàn)該接口即可
*
* @param msg
* @param ch
*/
doFeature(msg, ch) {
}
}
4. 服務(wù)器
我是用 Express 寫的服務(wù)器臀防,當(dāng)然你也可以根據(jù)自己的需求隨意挑選,比如 Restify 等边败。在啟動服務(wù)器時(shí)袱衷,要先連接 RabbitMQ。然后在 Api 中笑窜,不是直接操作業(yè)務(wù)邏輯致燥,而是將請求當(dāng)做一個(gè)任務(wù),發(fā)送到 RabbitMQ 的隊(duì)列排截,例如本例中通過調(diào)用 task.newRpcTask()
嫌蚤,將用戶請求生成一個(gè)任務(wù)然后添加到隊(duì)列,在回調(diào)中拿到最終的結(jié)果断傲,返回給用戶脱吱。
// server.js
let express = require('express');
let app = express();
let rabbit = require('../scheduler/rabbit');
let task = require('../scheduler/task');
/**
* 創(chuàng)建 RabbitMQ 連接
*/
rabbit.createMqConnection();
/**
* Rpc 任務(wù) Api
*/
app.get('/newRpcTask', function (req, res) {
let queryData = req.query;
console.log('queryData = ', queryData);
// 在將數(shù)據(jù)傳遞至 RabbitMQ 層時(shí),要考慮好傳遞的格式认罩,RabbitMQ 層不要對數(shù)據(jù)進(jìn)行處理箱蝠,最終交給 Feature 時(shí)再處理
task.newRpcTask(queryData.num, generateUuid(), function (result) {
res.send('fibonacci = ' + result);
});
});
app.listen(3000);
function generateUuid() {
return Math.random().toString() +
Math.random().toString() +
Math.random().toString();
}
最后,上演示結(jié)果了
RabbitMQ 只需要在 RabbitMQ 服務(wù)器安裝垦垂,其它設(shè)備通過 amqp 協(xié)議連接即可抡锈。我在演示的時(shí)候,Api Server 和 Rabbit Server 安裝在我的 Windows 上乔外,而另外三臺 CentOS 作為 Worker Server 使用。 好的一罩,終于要發(fā)車了:
如果你的 Rabbit Server 沒有啟動需要先啟動杨幼,我在 Windows 上已經(jīng)開機(jī)自啟,不在贅述
啟動 Api Server:
node server.js
(如果使用 PM2啟動命令是pm2 start server.js
)-
在所有 Worker Server 上啟動 worker:
node fibonacciWorker.js
(如果使用 PM2啟動命令是pm2 start fibonacciWorker.js
)startServer -
我在本例中定義的是 get 請求接口聂渊。只需要一個(gè) num(指的是要計(jì)算斐波那契數(shù)列的第幾個(gè)數(shù)值) 作為 query 參數(shù)差购。通過瀏覽器發(fā)送請求:
http://172.16.10.63:3000/newRpcTask?num=42
, 獲取計(jì)算結(jié)果汉嗽。我們可以看到欲逃,RabbitMQ 其實(shí)達(dá)到了一個(gè)負(fù)載均衡的作用,它會根據(jù) Worker 的工作情況饼暑,將任務(wù)自動分發(fā)給集群中的 Worker稳析。taskTest